use anyhow::Result;
use std::collections::BTreeMap;
use std::sync::Arc;
pub mod application;
pub mod domain;
pub mod infrastructure;
pub mod presentation;
/// Verbosity selector for the crate's logging.
///
/// [`LogLevel::Info`] is the value produced by `LogLevel::default()`.
#[derive(Clone, Copy, Debug, Default, Eq, PartialEq)]
pub enum LogLevel {
    /// Selects the `error` filter level.
    Error,
    /// Selects the `info` filter level; this is the default variant.
    #[default]
    Info,
    /// Selects the `debug` filter level.
    Debug,
}
/// Installs the global `tracing` subscriber (only built with the `cli` feature).
///
/// The filter is taken from the environment when
/// `EnvFilter::try_from_default_env` succeeds; otherwise a `diffly=<level>`
/// directive derived from `level` is used. Span close events are emitted.
///
/// NOTE(review): `init()` is documented by `tracing-subscriber` to panic when a
/// global subscriber is already installed, so call this once per process.
#[cfg(feature = "cli")]
pub fn init_tracing(level: LogLevel) {
    use tracing_subscriber::fmt::format::FmtSpan;
    use tracing_subscriber::EnvFilter;

    // Directive applied only when the environment does not yield a usable filter.
    let fallback_directive = match level {
        LogLevel::Error => "diffly=error",
        LogLevel::Info => "diffly=info",
        LogLevel::Debug => "diffly=debug",
    };

    let env_filter = EnvFilter::try_from_default_env()
        .unwrap_or_else(|_| EnvFilter::from(fallback_directive));

    tracing_subscriber::fmt()
        .with_span_events(FmtSpan::CLOSE)
        .with_env_filter(env_filter)
        .init();
}
pub use application::monitoring::PerfReport;
pub use domain::changeset::{Changeset, Summary};
pub use domain::conflict::ConflictReport;
pub use domain::diff_result::DiffResult;
pub use domain::fingerprint::fingerprint;
pub use domain::ports::SnapshotProvider;
pub use domain::snapshot::MapSnapshotProvider;
pub use domain::table_diff::{ColumnDiff, RowChange, RowMap, RowUpdate, TableDiff};
pub use domain::value_objects::{ColumnName, ExcludedColumns, Fingerprint, Schema, TableName};
pub use infrastructure::config::{AppConfig, DbConfig, DiffConfig, OutputConfig, TableConfig};
use crate::application::conflict::ConflictService;
use crate::application::diff::{DiffService, TableDiffer};
use crate::application::monitoring::{MonitoringDiffer, MonitoringRowRepository};
use crate::application::snapshot::SnapshotService;
use crate::domain::ports::RowRepository;
use crate::infrastructure::db::client::connect;
/// Runs the diff described by `cfg` and returns only the resulting [`Changeset`].
///
/// This delegates to [`run_with_timing`] and drops the separately returned
/// [`PerfReport`].
///
/// # Errors
///
/// Returns whatever error [`run_with_timing`] reports.
pub async fn run(cfg: &AppConfig) -> Result<Changeset> {
    run_with_timing(cfg)
        .await
        .map(|(changeset, _perf)| changeset)
}
/// Runs the diff described by `cfg` and returns the changeset together with
/// the collected [`PerfReport`].
///
/// A monitored repository is built for `cfg.source` and for `cfg.target`, and
/// a [`TableDiffer`] is wrapped in a [`MonitoringDiffer`]; all three share the
/// same report handle. The diff is then run over `cfg.diff.tables` using the
/// source and target schemas and the *source* driver.
///
/// The returned changeset has a copy of the report attached via `with_perf`;
/// the same report is also returned as the second tuple element.
///
/// # Errors
///
/// Returns an error if either repository cannot be built or if the diff
/// itself fails.
pub async fn run_with_timing(cfg: &AppConfig) -> Result<(Changeset, PerfReport)> {
    let report = PerfReport::new();

    let source_repo = build_repo(&cfg.source, Arc::clone(&report)).await?;
    let target_repo = build_repo(&cfg.target, Arc::clone(&report)).await?;
    let differ = Arc::new(MonitoringDiffer::new(
        Arc::new(TableDiffer::new()),
        Arc::clone(&report),
    ));
    let service = DiffService::new(source_repo, target_repo, differ);

    let source_schema = Schema(cfg.source.schema.clone());
    let target_schema = Schema(cfg.target.schema.clone());
    let changeset = service
        .run_diff(
            &source_schema,
            &target_schema,
            &cfg.source.driver,
            &cfg.diff.tables,
        )
        .await?;

    // The report is diagnostic data only. If a holder of the lock panicked,
    // recover the inner value instead of panicking here and discarding a diff
    // that has already completed successfully. The guard is a temporary and is
    // released at the end of this statement.
    let perf = report
        .lock()
        .unwrap_or_else(std::sync::PoisonError::into_inner)
        .clone();

    let changeset = changeset.with_perf(perf.clone());
    Ok((changeset, perf))
}
/// Captures the target rows described by `cfg` and returns only the row map.
///
/// This delegates to [`snapshot_with_timing`] and drops the separately
/// returned [`PerfReport`].
///
/// # Errors
///
/// Returns whatever error [`snapshot_with_timing`] reports.
pub async fn snapshot(cfg: &AppConfig) -> Result<BTreeMap<String, Vec<RowMap>>> {
    snapshot_with_timing(cfg)
        .await
        .map(|(rows, _perf)| rows)
}
/// Captures rows from the target database and returns them together with the
/// collected [`PerfReport`].
///
/// A monitored repository is built for `cfg.target` only, and
/// `SnapshotService::capture` is called with the target schema and
/// `cfg.diff.tables`.
///
/// # Errors
///
/// Returns an error if the target repository cannot be built or if the
/// capture fails.
pub async fn snapshot_with_timing(
    cfg: &AppConfig,
) -> Result<(BTreeMap<String, Vec<RowMap>>, PerfReport)> {
    let report = PerfReport::new();

    let target_repo = build_repo(&cfg.target, Arc::clone(&report)).await?;
    let svc = SnapshotService::new(target_repo);
    let target_schema = Schema(cfg.target.schema.clone());
    let raw = svc.capture(&target_schema, &cfg.diff.tables).await?;

    // The report is diagnostic data only. If a holder of the lock panicked,
    // recover the inner value instead of panicking here and discarding a
    // capture that has already completed successfully.
    let perf = report
        .lock()
        .unwrap_or_else(std::sync::PoisonError::into_inner)
        .clone();

    Ok((raw, perf))
}
/// Wraps an in-memory row map in a [`MapSnapshotProvider`].
///
/// `data` is handed unchanged to `MapSnapshotProvider::new`. Its type matches
/// the map returned by [`snapshot`]; presumably the keys are table names
/// (verify against `MapSnapshotProvider`).
pub fn snapshot_provider(data: BTreeMap<String, Vec<RowMap>>) -> MapSnapshotProvider {
MapSnapshotProvider::new(data)
}
/// Runs the diff for `cfg`, then passes the changeset through
/// `ConflictService::check`.
///
/// Alongside the caller-supplied `base`, `stored_fps` and
/// `current_target_rows`, the check receives a map from each configured
/// table's name to its primary-key columns, built from `cfg.diff.tables`.
///
/// # Errors
///
/// Returns whatever error [`run`] reports; the conflict check itself is not
/// fallible here.
pub async fn run_with_conflicts(
    cfg: &AppConfig,
    base: &dyn SnapshotProvider,
    stored_fps: &BTreeMap<String, Fingerprint>,
    current_target_rows: &BTreeMap<String, Vec<RowMap>>,
) -> Result<DiffResult> {
    let changeset = run(cfg).await?;

    // Table name -> primary-key columns; a later entry with the same table
    // name replaces an earlier one.
    let mut pk_cols_by_table: BTreeMap<String, Vec<ColumnName>> = BTreeMap::new();
    for table in cfg.diff.tables.iter() {
        let key_columns: Vec<ColumnName> = table
            .primary_key
            .iter()
            .map(|pk| ColumnName(pk.clone()))
            .collect();
        pk_cols_by_table.insert(table.name.clone(), key_columns);
    }

    let outcome = ConflictService::new().check(
        changeset,
        base,
        stored_fps,
        current_target_rows,
        &pk_cols_by_table,
    );
    Ok(outcome)
}
/// Connects using `cfg` and wraps the resulting repository in a
/// [`MonitoringRowRepository`] that shares `report`.
///
/// # Errors
///
/// Returns the error from `connect` if the connection cannot be established.
async fn build_repo(
    cfg: &DbConfig,
    report: Arc<std::sync::Mutex<PerfReport>>,
) -> Result<Arc<dyn RowRepository>> {
    let connection = connect(cfg).await?;
    let monitored = MonitoringRowRepository::new(Arc::new(connection), report);
    Ok(Arc::new(monitored))
}