1use anyhow::Result;
2use std::collections::BTreeMap;
3use std::sync::Arc;
4
5pub mod application;
6pub mod domain;
7pub mod infrastructure;
8pub mod presentation;
9
10#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
22pub enum LogLevel {
23 Error,
24 #[default]
25 Info,
26 Debug,
27}
28
29#[cfg(feature = "cli")]
41pub fn init_tracing(level: LogLevel) {
42 use tracing_subscriber::fmt::format::FmtSpan;
43
44 let default_filter = match level {
45 LogLevel::Error => "diffly=error",
46 LogLevel::Info => "diffly=info",
47 LogLevel::Debug => "diffly=debug",
48 };
49
50 tracing_subscriber::fmt()
51 .with_span_events(FmtSpan::CLOSE)
52 .with_env_filter(
53 tracing_subscriber::EnvFilter::try_from_default_env()
54 .unwrap_or_else(|_| default_filter.into()),
55 )
56 .init();
57}
58
59pub use application::monitoring::PerfReport;
62pub use domain::changeset::{Changeset, Summary};
63pub use domain::conflict::ConflictReport;
64pub use domain::diff_result::DiffResult;
65pub use domain::fingerprint::fingerprint;
66pub use domain::ports::SnapshotProvider;
67pub use domain::snapshot::MapSnapshotProvider;
68pub use domain::table_diff::{ColumnDiff, RowChange, RowMap, RowUpdate, TableDiff};
69pub use domain::value_objects::{ColumnName, ExcludedColumns, Fingerprint, Schema, TableName};
70pub use infrastructure::config::{AppConfig, DbConfig, DiffConfig, OutputConfig, TableConfig};
71
72use crate::application::conflict::ConflictService;
73use crate::application::diff::{DiffService, TableDiffer};
74use crate::application::monitoring::{MonitoringDiffer, MonitoringRowRepository};
75use crate::application::snapshot::SnapshotService;
76use crate::domain::ports::RowRepository;
77use crate::infrastructure::db::client::connect;
78
79pub async fn run(cfg: &AppConfig) -> Result<Changeset> {
87 let (changeset, _) = run_with_timing(cfg).await?;
88 Ok(changeset)
89}
90
91pub async fn run_with_timing(cfg: &AppConfig) -> Result<(Changeset, PerfReport)> {
96 let report = PerfReport::new();
97
98 let source_repo = build_repo(&cfg.source, Arc::clone(&report)).await?;
99 let target_repo = build_repo(&cfg.target, Arc::clone(&report)).await?;
100 let differ = Arc::new(MonitoringDiffer::new(
101 Arc::new(TableDiffer::new()),
102 Arc::clone(&report),
103 ));
104
105 let service = DiffService::new(source_repo, target_repo, differ);
106
107 let source_schema = Schema(cfg.source.schema.clone());
108 let target_schema = Schema(cfg.target.schema.clone());
109
110 let changeset = service
111 .run_diff(
112 &source_schema,
113 &target_schema,
114 &cfg.source.driver,
115 &cfg.diff.tables,
116 )
117 .await?;
118
119 let perf = report.lock().unwrap().clone();
120 let changeset = changeset.with_perf(perf.clone());
122 Ok((changeset, perf))
123}
124
125pub async fn snapshot(cfg: &AppConfig) -> Result<BTreeMap<String, Vec<RowMap>>> {
131 let (raw, _) = snapshot_with_timing(cfg).await?;
132 Ok(raw)
133}
134
135pub async fn snapshot_with_timing(
137 cfg: &AppConfig,
138) -> Result<(BTreeMap<String, Vec<RowMap>>, PerfReport)> {
139 let report = PerfReport::new();
140 let target_repo = build_repo(&cfg.target, Arc::clone(&report)).await?;
141 let svc = SnapshotService::new(target_repo);
142 let target_schema = Schema(cfg.target.schema.clone());
143 let raw = svc.capture(&target_schema, &cfg.diff.tables).await?;
144 let perf = report.lock().unwrap().clone();
145 Ok((raw, perf))
146}
147
148pub fn snapshot_provider(data: BTreeMap<String, Vec<RowMap>>) -> MapSnapshotProvider {
154 MapSnapshotProvider::new(data)
155}
156
157pub async fn run_with_conflicts(
168 cfg: &AppConfig,
169 base: &dyn SnapshotProvider,
170 stored_fps: &BTreeMap<String, Fingerprint>,
171 current_target_rows: &BTreeMap<String, Vec<RowMap>>,
172) -> Result<DiffResult> {
173 let changeset = run(cfg).await?;
174
175 let pk_cols_by_table: BTreeMap<String, Vec<ColumnName>> = cfg
176 .diff
177 .tables
178 .iter()
179 .map(|t| {
180 let cols = t
181 .primary_key
182 .iter()
183 .map(|pk| ColumnName(pk.clone()))
184 .collect();
185 (t.name.clone(), cols)
186 })
187 .collect();
188
189 let conflict_svc = ConflictService::new();
190 Ok(conflict_svc.check(
191 changeset,
192 base,
193 stored_fps,
194 current_target_rows,
195 &pk_cols_by_table,
196 ))
197}
198
199async fn build_repo(
206 cfg: &DbConfig,
207 report: Arc<std::sync::Mutex<PerfReport>>,
208) -> Result<Arc<dyn RowRepository>> {
209 let repo = Arc::new(connect(cfg).await?);
210 Ok(Arc::new(MonitoringRowRepository::new(repo, report)))
211}