1use serde_json::Value;
29
30use crate::event::{Delta, MirroredRecord};
31
32#[derive(Debug, Clone)]
34pub enum ConflictResolution {
35 Apply,
37 Skip,
39 ApplyMerged(Value),
41}
42
43pub trait ConflictStrategy: Send + Sync {
45 fn resolve(&self, existing: Option<&MirroredRecord>, incoming: &Delta) -> ConflictResolution;
50
51 fn label(&self) -> &'static str {
53 "custom"
54 }
55}
56
57#[derive(Debug, Default, Clone, Copy)]
59pub struct LastWriteWins;
60
61impl ConflictStrategy for LastWriteWins {
62 fn resolve(&self, _existing: Option<&MirroredRecord>, _incoming: &Delta) -> ConflictResolution {
63 ConflictResolution::Apply
64 }
65 fn label(&self) -> &'static str {
66 "last-write-wins"
67 }
68}
69
70#[derive(Debug, Default, Clone, Copy)]
73pub struct HighestConfidence;
74
75impl ConflictStrategy for HighestConfidence {
76 fn resolve(&self, existing: Option<&MirroredRecord>, incoming: &Delta) -> ConflictResolution {
77 match existing {
78 None => ConflictResolution::Apply,
79 Some(rec) if incoming.provenance.confidence >= rec.confidence => {
80 ConflictResolution::Apply
81 }
82 _ => ConflictResolution::Skip,
83 }
84 }
85 fn label(&self) -> &'static str {
86 "highest-confidence"
87 }
88}
89
90#[derive(Debug, Default, Clone, Copy)]
93pub struct KeepLocal;
94
95impl ConflictStrategy for KeepLocal {
96 fn resolve(&self, existing: Option<&MirroredRecord>, _incoming: &Delta) -> ConflictResolution {
97 match existing {
98 None => ConflictResolution::Apply,
99 Some(_) => ConflictResolution::Skip,
100 }
101 }
102 fn label(&self) -> &'static str {
103 "keep-local"
104 }
105}
106
107#[derive(Debug, Default, Clone, Copy)]
111pub struct MergeJson;
112
113impl ConflictStrategy for MergeJson {
114 fn resolve(&self, existing: Option<&MirroredRecord>, incoming: &Delta) -> ConflictResolution {
115 let Some(rec) = existing else {
116 return ConflictResolution::Apply;
117 };
118 match (&rec.payload, &incoming.payload) {
119 (Value::Object(_), Value::Object(_)) => {
120 let mut merged = rec.payload.clone();
121 merge(&mut merged, incoming.payload.clone());
122 ConflictResolution::ApplyMerged(merged)
123 }
124 _ => ConflictResolution::Apply,
125 }
126 }
127 fn label(&self) -> &'static str {
128 "merge-json"
129 }
130}
131
132fn merge(base: &mut Value, patch: Value) {
133 match (base, patch) {
134 (Value::Object(b), Value::Object(p)) => {
135 for (k, v) in p {
136 match b.get_mut(&k) {
137 Some(slot) => merge(slot, v),
138 None => {
139 b.insert(k, v);
140 }
141 }
142 }
143 }
144 (slot, v) => *slot = v,
145 }
146}
147
#[cfg(test)]
mod tests {
    use super::*;
    use crate::event::{DeltaOp, Provenance};
    use chrono::Utc;
    use serde_json::json;

    /// Fixture: an existing `pets`/`1` record at version 1 from source `"old"`.
    fn rec(payload: Value, confidence: f32) -> MirroredRecord {
        MirroredRecord {
            resource: "pets".into(),
            record_id: "1".into(),
            payload,
            source: "old".into(),
            last_synced_at: Utc::now(),
            confidence,
            version: 1,
        }
    }

    /// Fixture: an upsert delta for `pets`/`1` from source `"new"`.
    fn delta(payload: Value, confidence: f32) -> Delta {
        let provenance = Provenance {
            source: "new".into(),
            confidence,
        };
        Delta {
            resource: "pets".into(),
            record_id: "1".into(),
            op: DeltaOp::Upsert,
            payload,
            occurred_at: Utc::now(),
            provenance,
        }
    }

    #[test]
    fn last_write_wins_always_applies() {
        // Even a much less confident delta is accepted.
        let stored = rec(json!({"a": 1}), 1.0);
        let update = delta(json!({"a": 2}), 0.1);

        let outcome = LastWriteWins.resolve(Some(&stored), &update);
        assert!(matches!(outcome, ConflictResolution::Apply));
    }

    #[test]
    fn highest_confidence_skips_when_lower() {
        let strategy = HighestConfidence;
        let stored = rec(json!({"a": 1}), 0.9);

        let weaker = delta(json!({"a": 2}), 0.5);
        let outcome = strategy.resolve(Some(&stored), &weaker);
        assert!(matches!(outcome, ConflictResolution::Skip));

        let stronger = delta(json!({"a": 3}), 0.95);
        let outcome = strategy.resolve(Some(&stored), &stronger);
        assert!(matches!(outcome, ConflictResolution::Apply));
    }

    #[test]
    fn keep_local_skips_when_exists() {
        let strategy = KeepLocal;
        let stored = rec(json!({}), 1.0);
        let update = delta(json!({}), 1.0);

        let with_existing = strategy.resolve(Some(&stored), &update);
        assert!(matches!(with_existing, ConflictResolution::Skip));

        let without_existing = strategy.resolve(None, &update);
        assert!(matches!(without_existing, ConflictResolution::Apply));
    }

    #[test]
    fn merge_json_deep_merges_objects() {
        let stored = rec(json!({"a": 1, "nested": {"x": 1, "y": 2}}), 1.0);
        let update = delta(json!({"b": 2, "nested": {"y": 99, "z": 3}}), 1.0);

        let merged = match MergeJson.resolve(Some(&stored), &update) {
            ConflictResolution::ApplyMerged(v) => v,
            other => panic!("unexpected: {other:?}"),
        };

        // Untouched keys survive, new keys appear, overlapping keys take the
        // incoming value, and nested objects are merged rather than replaced.
        assert_eq!(merged["a"], 1);
        assert_eq!(merged["b"], 2);
        assert_eq!(merged["nested"]["x"], 1);
        assert_eq!(merged["nested"]["y"], 99);
        assert_eq!(merged["nested"]["z"], 3);
    }
}