use std::collections::{BTreeMap, BTreeSet};
use serde_json::Value;
use crate::domain::changeset::Changeset;
use crate::domain::conflict::ConflictReport;
use crate::domain::diff_result::DiffResult;
use crate::domain::fingerprint::fingerprint;
use crate::domain::ports::SnapshotProvider;
use crate::domain::table_diff::RowMap;
use crate::domain::value_objects::{ColumnName, Fingerprint, TableName};
use crate::infrastructure::db::sql_utils::pk_key;
/// Stateless service that checks a [`Changeset`] for three-way conflicts
/// between a base snapshot, the source-side changes and the current target rows.
///
/// The type carries no data; all inputs are passed to its methods.
pub struct ConflictService;
impl ConflictService {
    /// Creates the service. It holds no state, so this is free.
    pub fn new() -> Self {
        Self
    }

    /// Runs a three-way, column-level conflict check for every table in `changeset`.
    ///
    /// For each table diff:
    ///
    /// 1. Tables without an entry in `pk_cols_by_table` are skipped.
    /// 2. If both a stored fingerprint and current target rows exist for the table
    ///    and the fingerprint of the current rows equals the stored one, the table
    ///    is skipped (nothing changed on the target since the fingerprint was taken).
    /// 3. Tables with no rows in `base` or no entry in `current_target_rows` are
    ///    skipped.
    /// 4. For every row the source inserts or updates (an update overrides an
    ///    insert with the same primary key), each column present in the base,
    ///    current or source version of that row is compared. A column missing
    ///    from a version (or a missing row) is treated as `Value::Null`.
    ///    A conflict is reported when the target value differs from base, the
    ///    source value differs from base, and source and target disagree.
    ///
    /// Deletes in the changeset are not inspected.
    ///
    /// # Returns
    ///
    /// `DiffResult::Clean(changeset)` when no conflict was found, otherwise
    /// `DiffResult::Conflicted` carrying the changeset and all collected reports.
    pub fn check(
        &self,
        changeset: Changeset,
        base: &dyn SnapshotProvider,
        stored_fingerprints: &BTreeMap<String, Fingerprint>,
        current_target_rows: &BTreeMap<String, Vec<RowMap>>,
        pk_cols_by_table: &BTreeMap<String, Vec<ColumnName>>,
    ) -> DiffResult {
        // Stand-in for "column/row absent"; hoisted so it is built once.
        let null = Value::Null;
        let mut all_conflicts: Vec<ConflictReport> = Vec::new();

        for table_diff in &changeset.tables {
            let pk_cols = match pk_cols_by_table.get(&table_diff.table_name) {
                Some(cols) => cols,
                None => continue,
            };

            // Looked up once; used both by the fingerprint fast path and the diff below.
            let current_rows = current_target_rows.get(&table_diff.table_name);

            // Fast path: target unchanged since the stored fingerprint was taken.
            if let (Some(stored_fp), Some(rows)) = (
                stored_fingerprints.get(&table_diff.table_name),
                current_rows,
            ) {
                let current_fp = fingerprint(rows);
                if &current_fp == stored_fp {
                    continue;
                }
            }

            let table_name = TableName(table_diff.table_name.clone());
            let base_rows = match base.get(&table_name) {
                Some(rows) => rows,
                None => continue,
            };
            let current_rows = match current_rows {
                Some(rows) => rows,
                None => continue,
            };

            let base_index: BTreeMap<String, &RowMap> = base_rows
                .iter()
                .map(|r| (pk_key(r, pk_cols), r))
                .collect();
            let current_index: BTreeMap<String, &RowMap> = current_rows
                .iter()
                .map(|r| (pk_key(r, pk_cols), r))
                .collect();

            // Source-side view of each touched row, borrowed from the changeset
            // (no row clones). Updates are inserted last so they win over an
            // insert carrying the same primary key.
            let mut source_index: BTreeMap<String, &RowMap> = BTreeMap::new();
            for ins in &table_diff.inserts {
                source_index.insert(pk_key(&ins.data, pk_cols), &ins.data);
            }
            for upd in &table_diff.updates {
                source_index.insert(pk_key(&upd.after, pk_cols), &upd.after);
            }

            for (pk_str, &source_row) in &source_index {
                let base_row: Option<&RowMap> = base_index.get(pk_str).copied();
                let current_row: Option<&RowMap> = current_index.get(pk_str).copied();

                // Union of the column names seen in any version of the row.
                let all_cols: BTreeSet<&str> = [base_row, current_row, Some(source_row)]
                    .iter()
                    .copied()
                    .flatten()
                    .flat_map(|r| r.keys().map(String::as_str))
                    .collect();

                // Row used to read the primary-key values for a report. When a
                // conflict exists the target value differs from base, so at
                // least one of these two rows is present.
                let pk_row = base_row.or(current_row);

                for &col in &all_cols {
                    let base_val = base_row.and_then(|r| r.get(col)).unwrap_or(&null);
                    let current_val = current_row.and_then(|r| r.get(col)).unwrap_or(&null);
                    let source_val = source_row.get(col).unwrap_or(&null);

                    let target_changed = current_val != base_val;
                    let source_changed = source_val != base_val;
                    if !(target_changed && source_changed && source_val != current_val) {
                        continue;
                    }

                    let pk_map: BTreeMap<String, Value> = pk_cols
                        .iter()
                        .filter_map(|c| {
                            pk_row
                                .and_then(|r| r.get(c.0.as_str()))
                                .map(|v| (c.0.clone(), v.clone()))
                        })
                        .collect();

                    all_conflicts.push(ConflictReport {
                        table_name: table_diff.table_name.clone(),
                        pk: pk_map,
                        column: col.to_owned(),
                        base_value: base_val.clone(),
                        source_value: source_val.clone(),
                        target_value: current_val.clone(),
                    });
                }
            }
        }

        if all_conflicts.is_empty() {
            DiffResult::Clean(changeset)
        } else {
            DiffResult::Conflicted {
                changeset,
                conflicts: all_conflicts,
            }
        }
    }
}
impl Default for ConflictService {
fn default() -> Self {
Self::new()
}
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::domain::changeset::Changeset;
    use crate::domain::table_diff::{ColumnDiff, RowUpdate, TableDiff};
    use serde_json::json;

    /// In-memory `SnapshotProvider` backed by a table-name → rows map.
    struct MapSnapshot(BTreeMap<String, Vec<RowMap>>);

    impl SnapshotProvider for MapSnapshot {
        fn get(&self, table: &TableName) -> Option<&[RowMap]> {
            self.0.get(&table.0).map(|v| v.as_slice())
        }
    }

    /// Builds a row from `(column, value)` pairs.
    fn row(pairs: &[(&str, Value)]) -> RowMap {
        pairs
            .iter()
            .map(|(k, v)| (k.to_string(), v.clone()))
            .collect()
    }

    /// Wraps a column name in the `ColumnName` newtype.
    fn pk_col(s: &str) -> ColumnName {
        ColumnName(s.to_string())
    }

    /// Changeset with no table diffs at all.
    fn empty_changeset() -> Changeset {
        Changeset::new("source", "target", "postgres", vec![])
    }

    /// An empty changeset checked against empty inputs yields a clean result.
    #[test]
    fn clean_when_no_base_snapshot() {
        let svc = ConflictService::new();
        let cs = empty_changeset();
        let base = MapSnapshot(BTreeMap::new());
        let result = svc.check(
            cs,
            &base,
            &BTreeMap::new(),
            &BTreeMap::new(),
            &BTreeMap::new(),
        );
        assert!(result.is_clean());
    }

    /// The stored fingerprint is computed from the very rows passed as the
    /// current target, so the table is skipped and the result is clean.
    #[test]
    fn clean_when_fingerprints_match() {
        let svc = ConflictService::new();
        let target_rows = vec![row(&[("id", json!(1)), ("val", json!("x"))])];
        let stored_fp = fingerprint(&target_rows);
        let table = "t";
        let cs = Changeset::new(
            "s",
            "target",
            "postgres",
            vec![TableDiff {
                table_name: table.to_string(),
                primary_key: vec!["id".to_string()],
                inserts: vec![],
                updates: vec![],
                deletes: vec![],
            }],
        );
        let base = MapSnapshot([(table.to_string(), target_rows.clone())].into());
        let stored_fps = [(table.to_string(), stored_fp)].into();
        let current_rows = [(table.to_string(), target_rows)].into();
        let pk_map = [(table.to_string(), vec![pk_col("id")])].into();
        let result = svc.check(cs, &base, &stored_fps, &current_rows, &pk_map);
        assert!(result.is_clean());
    }

    /// Base 0.10, source 0.20, target 0.15 on the same row and column:
    /// both sides changed it to different values, so one conflict is reported.
    #[test]
    fn detects_conflict_on_same_row_same_column() {
        let svc = ConflictService::new();
        let table = "pricing_rules";
        let base_rows = vec![row(&[("id", json!(1)), ("discount_rate", json!(0.10))])];
        let source_after = row(&[("id", json!(1)), ("discount_rate", json!(0.20))]);
        let target_rows = vec![row(&[("id", json!(1)), ("discount_rate", json!(0.15))])];
        let stored_fp = fingerprint(&base_rows);
        let cs = Changeset::new(
            "source",
            "target",
            "postgres",
            vec![TableDiff {
                table_name: table.to_string(),
                primary_key: vec!["id".to_string()],
                inserts: vec![],
                updates: vec![RowUpdate {
                    pk: [("id".to_string(), json!(1))].into(),
                    before: row(&[("id", json!(1)), ("discount_rate", json!(0.15))]),
                    after: source_after,
                    changed_columns: vec![ColumnDiff {
                        column: "discount_rate".to_string(),
                        before: json!(0.15),
                        after: json!(0.20),
                    }],
                }],
                deletes: vec![],
            }],
        );
        let base = MapSnapshot([(table.to_string(), base_rows)].into());
        let stored_fps = [(table.to_string(), stored_fp)].into();
        let current_rows = [(table.to_string(), target_rows)].into();
        let pk_map = [(table.to_string(), vec![pk_col("id")])].into();
        let result = svc.check(cs, &base, &stored_fps, &current_rows, &pk_map);
        assert!(!result.is_clean());
        let conflicts = result.conflicts();
        assert_eq!(conflicts.len(), 1);
        assert_eq!(conflicts[0].column, "discount_rate");
        assert_eq!(conflicts[0].base_value, json!(0.10));
        assert_eq!(conflicts[0].source_value, json!(0.20));
        assert_eq!(conflicts[0].target_value, json!(0.15));
    }

    /// Source updates row 1 while the target only changed row 2: the changes
    /// touch different rows, so no conflict is reported.
    #[test]
    fn no_conflict_when_different_rows_changed() {
        let svc = ConflictService::new();
        let table = "rules";
        let base_rows = vec![
            row(&[("id", json!(1)), ("val", json!("a"))]),
            row(&[("id", json!(2)), ("val", json!("b"))]),
        ];
        let source_after = row(&[("id", json!(1)), ("val", json!("source"))]);
        let target_rows = vec![
            row(&[("id", json!(1)), ("val", json!("a"))]),
            row(&[("id", json!(2)), ("val", json!("target"))]),
        ];
        let stored_fp = fingerprint(&base_rows);
        let cs = Changeset::new(
            "s",
            "t",
            "postgres",
            vec![TableDiff {
                table_name: table.to_string(),
                primary_key: vec!["id".to_string()],
                inserts: vec![],
                updates: vec![RowUpdate {
                    pk: [("id".to_string(), json!(1))].into(),
                    before: row(&[("id", json!(1)), ("val", json!("a"))]),
                    after: source_after,
                    changed_columns: vec![ColumnDiff {
                        column: "val".to_string(),
                        before: json!("a"),
                        after: json!("source"),
                    }],
                }],
                deletes: vec![],
            }],
        );
        let base = MapSnapshot([(table.to_string(), base_rows)].into());
        let stored_fps = [(table.to_string(), stored_fp)].into();
        let current_rows = [(table.to_string(), target_rows)].into();
        let pk_map = [(table.to_string(), vec![pk_col("id")])].into();
        let result = svc.check(cs, &base, &stored_fps, &current_rows, &pk_map);
        assert!(result.is_clean(), "Different rows changed → no conflict");
    }
}