Skip to main content

diffly/
lib.rs

1use anyhow::Result;
2use std::collections::BTreeMap;
3use std::sync::Arc;
4
5pub mod application;
6pub mod domain;
7pub mod infrastructure;
8pub mod presentation;
9
10// ─── Log level ────────────────────────────────────────────────────────────────
11
/// Verbosity of diffly's own `tracing` output.
///
/// Hand one of these to [`init_tracing`] before calling any async entry point.
/// [`LogLevel::Info`] is the default.
///
/// | Variant | `tracing` level | When to use                         |
/// |---------|-----------------|-------------------------------------|
/// | `Error` | `error`         | `--quiet` / CI scripting            |
/// | `Info`  | `info`          | Default — shows per-table timings   |
/// | `Debug` | `debug`         | `--verbose` — shows SQL queries too |
#[derive(Clone, Copy, Debug, Default, Eq, PartialEq)]
pub enum LogLevel {
    /// Error events only.
    Error,
    /// Info-level events and anything more severe (the default).
    #[default]
    Info,
    /// Debug-level events and anything more severe.
    Debug,
}
28
/// Install diffly's default global `tracing` subscriber.
///
/// Thin convenience layer over `tracing_subscriber::fmt()`. If `RUST_LOG`
/// contains a parseable filter it is used as-is; otherwise only events from the
/// `diffly` target at `level` (or more severe) are enabled. An event is also
/// emitted whenever a span closes (`FmtSpan::CLOSE`).
///
/// Invoke this **once**, at startup, before any diffly async function. Library
/// consumers who install their own subscriber should not call it and should
/// configure tracing themselves instead.
///
/// Compiled only with the `cli` feature, which brings in `tracing-subscriber`.
///
/// # Panics
///
/// Panics if a global subscriber has already been installed. The panic comes
/// from `tracing_subscriber`'s `init()`.
#[cfg(feature = "cli")]
pub fn init_tracing(level: LogLevel) {
    use tracing_subscriber::fmt::format::FmtSpan;
    use tracing_subscriber::EnvFilter;

    let fallback_directive = match level {
        LogLevel::Error => "diffly=error",
        LogLevel::Info => "diffly=info",
        LogLevel::Debug => "diffly=debug",
    };

    // `try_from_default_env` errors both when RUST_LOG is unset and when it
    // fails to parse; in either case use the level-derived directive.
    let filter = EnvFilter::try_from_default_env()
        .unwrap_or_else(|_| EnvFilter::new(fallback_directive));

    let builder = tracing_subscriber::fmt()
        .with_span_events(FmtSpan::CLOSE)
        .with_env_filter(filter);
    builder.init();
}
58
59// ─── Public API Facade ───
60
61pub use application::monitoring::PerfReport;
62pub use domain::changeset::{Changeset, Summary};
63pub use domain::conflict::ConflictReport;
64pub use domain::diff_result::DiffResult;
65pub use domain::fingerprint::fingerprint;
66pub use domain::ports::SnapshotProvider;
67pub use domain::snapshot::MapSnapshotProvider;
68pub use domain::table_diff::{ColumnDiff, RowChange, RowMap, RowUpdate, TableDiff};
69pub use domain::value_objects::{ColumnName, ExcludedColumns, Fingerprint, Schema, TableName};
70pub use infrastructure::config::{AppConfig, DbConfig, DiffConfig, OutputConfig, TableConfig};
71
72use crate::application::conflict::ConflictService;
73use crate::application::diff::{DiffService, TableDiffer};
74use crate::application::monitoring::{MonitoringDiffer, MonitoringRowRepository};
75use crate::application::snapshot::SnapshotService;
76use crate::domain::ports::RowRepository;
77use crate::infrastructure::db::client::connect;
78
79// ─── Public entry points ───
80
81/// 2-way diff only (no conflict detection).
82///
83/// Returns the raw `Changeset` (source vs. current target).
84/// Use [`run_with_conflicts`] if you need the 3-way merge.
85/// Use [`run_with_timing`] if you also want a performance report.
86pub async fn run(cfg: &AppConfig) -> Result<Changeset> {
87    let (changeset, _) = run_with_timing(cfg).await?;
88    Ok(changeset)
89}
90
91/// 2-way diff with performance timing.
92///
93/// Returns the `Changeset` and a [`PerfReport`] containing per-table
94/// fetch and diff timings.
95pub async fn run_with_timing(cfg: &AppConfig) -> Result<(Changeset, PerfReport)> {
96    let report = PerfReport::new();
97
98    let source_repo = build_repo(&cfg.source, Arc::clone(&report)).await?;
99    let target_repo = build_repo(&cfg.target, Arc::clone(&report)).await?;
100    let differ = Arc::new(MonitoringDiffer::new(
101        Arc::new(TableDiffer::new()),
102        Arc::clone(&report),
103    ));
104
105    let service = DiffService::new(source_repo, target_repo, differ);
106
107    let source_schema = Schema(cfg.source.schema.clone());
108    let target_schema = Schema(cfg.target.schema.clone());
109
110    let changeset = service
111        .run_diff(
112            &source_schema,
113            &target_schema,
114            &cfg.source.driver,
115            &cfg.diff.tables,
116        )
117        .await?;
118
119    let perf = report.lock().unwrap().clone();
120    // Embed the perf report inside the changeset so JSON/HTML writers include it automatically.
121    let changeset = changeset.with_perf(perf.clone());
122    Ok((changeset, perf))
123}
124
125/// Capture a point-in-time snapshot of the **target** DB for all configured tables.
126///
127/// Call this at **source-clone time**. The returned map is the data you
128/// should serialise (JSON, DynamoDB, S3…) and restore via [`snapshot_provider`].
129/// ```
130pub async fn snapshot(cfg: &AppConfig) -> Result<BTreeMap<String, Vec<RowMap>>> {
131    let (raw, _) = snapshot_with_timing(cfg).await?;
132    Ok(raw)
133}
134
135/// Capture a snapshot and return a [`PerfReport`] alongside the rows.
136pub async fn snapshot_with_timing(
137    cfg: &AppConfig,
138) -> Result<(BTreeMap<String, Vec<RowMap>>, PerfReport)> {
139    let report = PerfReport::new();
140    let target_repo = build_repo(&cfg.target, Arc::clone(&report)).await?;
141    let svc = SnapshotService::new(target_repo);
142    let target_schema = Schema(cfg.target.schema.clone());
143    let raw = svc.capture(&target_schema, &cfg.diff.tables).await?;
144    let perf = report.lock().unwrap().clone();
145    Ok((raw, perf))
146}
147
148/// Wrap a previously-captured snapshot map as a [`SnapshotProvider`].
149///
150/// Counterpart of [`snapshot`]: deserialise the stored JSON back into
151/// `BTreeMap<String, Vec<RowMap>>` and pass it here. The returned value is
152/// ready to use as the `base` argument to [`run_with_conflicts`].
153pub fn snapshot_provider(data: BTreeMap<String, Vec<RowMap>>) -> MapSnapshotProvider {
154    MapSnapshotProvider::new(data)
155}
156
157/// 2-way diff + 3-way conflict detection.
158///
159/// # Arguments
160/// * `cfg`                 — application configuration (same as [`run`])
161/// * `base`                — snapshot of target at **source-clone time**, obtained via [`snapshot_provider`]
162/// * `stored_fps`          — per-table SHA-256 fingerprints stored at clone time
163/// * `current_target_rows` — current target rows per table
164///
165/// The caller is responsible for persisting and restoring `base` and
166/// `stored_fps`. Diffly has no opinion on storage.
167pub async fn run_with_conflicts(
168    cfg: &AppConfig,
169    base: &dyn SnapshotProvider,
170    stored_fps: &BTreeMap<String, Fingerprint>,
171    current_target_rows: &BTreeMap<String, Vec<RowMap>>,
172) -> Result<DiffResult> {
173    let changeset = run(cfg).await?;
174
175    let pk_cols_by_table: BTreeMap<String, Vec<ColumnName>> = cfg
176        .diff
177        .tables
178        .iter()
179        .map(|t| {
180            let cols = t
181                .primary_key
182                .iter()
183                .map(|pk| ColumnName(pk.clone()))
184                .collect();
185            (t.name.clone(), cols)
186        })
187        .collect();
188
189    let conflict_svc = ConflictService::new();
190    Ok(conflict_svc.check(
191        changeset,
192        base,
193        stored_fps,
194        current_target_rows,
195        &pk_cols_by_table,
196    ))
197}
198
199// ─── Private helpers ───────────────────────────────────────────────────────────
200
201/// Connect to a DB and wrap the repository in the monitoring decorator.
202///
203/// The shared `report` accumulates timings from all repos created for the
204/// same run, giving a unified view across source and target.
205async fn build_repo(
206    cfg: &DbConfig,
207    report: Arc<std::sync::Mutex<PerfReport>>,
208) -> Result<Arc<dyn RowRepository>> {
209    let repo = Arc::new(connect(cfg).await?);
210    Ok(Arc::new(MonitoringRowRepository::new(repo, report)))
211}