Skip to main content

oxide_mirror/
conflict.rs

//! Conflict resolution strategies for incoming deltas.
//!
//! When a [`Delta`](crate::event::Delta) arrives for a record that already
//! exists in the local mirror, the [`MirrorStore`](crate::store::MirrorStore)
//! asks a [`ConflictStrategy`] what to do. The strategy returns a
//! [`ConflictResolution`], which is one of:
//!
//! * [`Apply`](ConflictResolution::Apply) — overwrite the existing record
//!   with the incoming delta verbatim.
//! * [`Skip`](ConflictResolution::Skip) — leave the existing record
//!   untouched. The delta is still recorded in the event log for audit, but
//!   `mirror_records` is not updated.
//! * [`ApplyMerged(value)`](ConflictResolution::ApplyMerged) — apply a
//!   strategy-computed payload (e.g. a JSON-level merge of existing and
//!   incoming).
//!
//! Four built-in strategies ship today:
//!
//! | Strategy | Behaviour |
//! |----------|-----------|
//! | [`LastWriteWins`] | Always `Apply` — newer wins. Default. |
//! | [`HighestConfidence`] | `Apply` when incoming confidence ≥ existing, else `Skip`. |
//! | [`KeepLocal`] | `Skip` whenever the record already exists — local writes are authoritative. |
//! | [`MergeJson`] | Deep-merge JSON objects: incoming keys override, missing keys preserved. Non-object payloads are applied as-is. |
//!
//! Every built-in strategy returns `Apply` when the record does not exist yet.
//!
//! Custom strategies implement [`ConflictStrategy`] directly.
27
28use serde_json::Value;
29
30use crate::event::{Delta, MirroredRecord};
31
32/// What the store should do with an incoming delta.
33#[derive(Debug, Clone)]
34pub enum ConflictResolution {
35    /// Overwrite the existing record with the incoming delta payload.
36    Apply,
37    /// Leave the existing record alone; log the delta only.
38    Skip,
39    /// Overwrite the existing record with a custom payload.
40    ApplyMerged(Value),
41}
42
43/// Pluggable strategy used by [`MirrorStore::apply_delta`](crate::store::MirrorStore::apply_delta).
44pub trait ConflictStrategy: Send + Sync {
45    /// Decide what to do when `incoming` collides with `existing`.
46    ///
47    /// `existing` is `None` when the record does not yet exist; strategies
48    /// should typically return `Apply` in that case.
49    fn resolve(&self, existing: Option<&MirroredRecord>, incoming: &Delta) -> ConflictResolution;
50
51    /// Human-readable label for logs / events.
52    fn label(&self) -> &'static str {
53        "custom"
54    }
55}
56
57/// Always overwrite; the most recent delta wins.
58#[derive(Debug, Default, Clone, Copy)]
59pub struct LastWriteWins;
60
61impl ConflictStrategy for LastWriteWins {
62    fn resolve(&self, _existing: Option<&MirroredRecord>, _incoming: &Delta) -> ConflictResolution {
63        ConflictResolution::Apply
64    }
65    fn label(&self) -> &'static str {
66        "last-write-wins"
67    }
68}
69
70/// Apply the incoming delta only if its [`Provenance::confidence`] is
71/// greater than or equal to the existing record's confidence.
72#[derive(Debug, Default, Clone, Copy)]
73pub struct HighestConfidence;
74
75impl ConflictStrategy for HighestConfidence {
76    fn resolve(&self, existing: Option<&MirroredRecord>, incoming: &Delta) -> ConflictResolution {
77        match existing {
78            None => ConflictResolution::Apply,
79            Some(rec) if incoming.provenance.confidence >= rec.confidence => {
80                ConflictResolution::Apply
81            }
82            _ => ConflictResolution::Skip,
83        }
84    }
85    fn label(&self) -> &'static str {
86        "highest-confidence"
87    }
88}
89
90/// Always skip when the record already exists. Useful when the mirror is
91/// authoritative and external syncs should only fill in missing rows.
92#[derive(Debug, Default, Clone, Copy)]
93pub struct KeepLocal;
94
95impl ConflictStrategy for KeepLocal {
96    fn resolve(&self, existing: Option<&MirroredRecord>, _incoming: &Delta) -> ConflictResolution {
97        match existing {
98            None => ConflictResolution::Apply,
99            Some(_) => ConflictResolution::Skip,
100        }
101    }
102    fn label(&self) -> &'static str {
103        "keep-local"
104    }
105}
106
107/// Deep-merge JSON objects. Incoming keys overwrite existing ones; keys
108/// missing from the incoming payload survive. Non-object payloads fall back
109/// to [`LastWriteWins`].
110#[derive(Debug, Default, Clone, Copy)]
111pub struct MergeJson;
112
113impl ConflictStrategy for MergeJson {
114    fn resolve(&self, existing: Option<&MirroredRecord>, incoming: &Delta) -> ConflictResolution {
115        let Some(rec) = existing else {
116            return ConflictResolution::Apply;
117        };
118        match (&rec.payload, &incoming.payload) {
119            (Value::Object(_), Value::Object(_)) => {
120                let mut merged = rec.payload.clone();
121                merge(&mut merged, incoming.payload.clone());
122                ConflictResolution::ApplyMerged(merged)
123            }
124            _ => ConflictResolution::Apply,
125        }
126    }
127    fn label(&self) -> &'static str {
128        "merge-json"
129    }
130}
131
132fn merge(base: &mut Value, patch: Value) {
133    match (base, patch) {
134        (Value::Object(b), Value::Object(p)) => {
135            for (k, v) in p {
136                match b.get_mut(&k) {
137                    Some(slot) => merge(slot, v),
138                    None => {
139                        b.insert(k, v);
140                    }
141                }
142            }
143        }
144        (slot, v) => *slot = v,
145    }
146}
147
#[cfg(test)]
mod tests {
    //! Unit tests for the built-in strategies.
    //!
    //! Besides the happy paths, these cover the first-insert case
    //! (`existing == None`), the confidence tie, and the `MergeJson`
    //! fallbacks for non-object payloads.

    use super::*;
    use crate::event::{DeltaOp, Provenance};
    use chrono::Utc;
    use serde_json::json;

    /// Builds a stored record with the given payload and confidence.
    fn rec(payload: Value, confidence: f32) -> MirroredRecord {
        MirroredRecord {
            resource: "pets".into(),
            record_id: "1".into(),
            payload,
            source: "old".into(),
            last_synced_at: Utc::now(),
            confidence,
            version: 1,
        }
    }

    /// Builds an incoming upsert delta with the given payload and confidence.
    fn delta(payload: Value, confidence: f32) -> Delta {
        Delta {
            resource: "pets".into(),
            record_id: "1".into(),
            op: DeltaOp::Upsert,
            payload,
            occurred_at: Utc::now(),
            provenance: Provenance {
                source: "new".into(),
                confidence,
            },
        }
    }

    #[test]
    fn last_write_wins_always_applies() {
        let s = LastWriteWins;
        let existing = rec(json!({"a": 1}), 1.0);
        let incoming = delta(json!({"a": 2}), 0.1);
        assert!(matches!(
            s.resolve(Some(&existing), &incoming),
            ConflictResolution::Apply
        ));
        assert!(matches!(
            s.resolve(None, &incoming),
            ConflictResolution::Apply
        ));
    }

    #[test]
    fn highest_confidence_skips_when_lower() {
        let s = HighestConfidence;
        let existing = rec(json!({"a": 1}), 0.9);
        let incoming = delta(json!({"a": 2}), 0.5);
        assert!(matches!(
            s.resolve(Some(&existing), &incoming),
            ConflictResolution::Skip
        ));
        let incoming2 = delta(json!({"a": 3}), 0.95);
        assert!(matches!(
            s.resolve(Some(&existing), &incoming2),
            ConflictResolution::Apply
        ));
    }

    #[test]
    fn highest_confidence_applies_on_tie_and_when_missing() {
        let s = HighestConfidence;
        let existing = rec(json!({"a": 1}), 0.5);
        let tie = delta(json!({"a": 2}), 0.5);
        // Equal confidence goes to the incoming delta (`>=`).
        assert!(matches!(
            s.resolve(Some(&existing), &tie),
            ConflictResolution::Apply
        ));
        // No stored record: confidence is irrelevant.
        let weak = delta(json!({"a": 3}), 0.0);
        assert!(matches!(
            s.resolve(None, &weak),
            ConflictResolution::Apply
        ));
    }

    #[test]
    fn keep_local_skips_when_exists() {
        let s = KeepLocal;
        let existing = rec(json!({}), 1.0);
        let incoming = delta(json!({}), 1.0);
        assert!(matches!(
            s.resolve(Some(&existing), &incoming),
            ConflictResolution::Skip
        ));
        assert!(matches!(
            s.resolve(None, &incoming),
            ConflictResolution::Apply
        ));
    }

    #[test]
    fn merge_json_deep_merges_objects() {
        let s = MergeJson;
        let existing = rec(json!({"a": 1, "nested": {"x": 1, "y": 2}}), 1.0);
        let incoming = delta(json!({"b": 2, "nested": {"y": 99, "z": 3}}), 1.0);
        match s.resolve(Some(&existing), &incoming) {
            ConflictResolution::ApplyMerged(v) => {
                assert_eq!(v["a"], 1);
                assert_eq!(v["b"], 2);
                assert_eq!(v["nested"]["x"], 1);
                assert_eq!(v["nested"]["y"], 99);
                assert_eq!(v["nested"]["z"], 3);
            }
            other => panic!("unexpected: {other:?}"),
        }
    }

    #[test]
    fn merge_json_replaces_non_object_leaves() {
        let s = MergeJson;
        let existing = rec(json!({"obj": {"x": 1}, "list": [1, 2]}), 1.0);
        let incoming = delta(json!({"obj": 5, "list": [3]}), 1.0);
        match s.resolve(Some(&existing), &incoming) {
            ConflictResolution::ApplyMerged(v) => {
                // A scalar from the delta replaces a stored object outright.
                assert_eq!(v["obj"], 5);
                // Arrays are replaced, not concatenated.
                assert_eq!(v["list"], json!([3]));
            }
            other => panic!("unexpected: {other:?}"),
        }
    }

    #[test]
    fn merge_json_applies_when_record_missing() {
        let s = MergeJson;
        let incoming = delta(json!({"a": 1}), 1.0);
        assert!(matches!(
            s.resolve(None, &incoming),
            ConflictResolution::Apply
        ));
    }

    #[test]
    fn merge_json_falls_back_to_apply_for_non_objects() {
        let s = MergeJson;
        // Stored object, incoming scalar.
        let existing = rec(json!({"a": 1}), 1.0);
        let scalar = delta(json!(42), 1.0);
        assert!(matches!(
            s.resolve(Some(&existing), &scalar),
            ConflictResolution::Apply
        ));
        // Stored array, incoming object.
        let existing_list = rec(json!([1, 2, 3]), 1.0);
        let object = delta(json!({"a": 1}), 1.0);
        assert!(matches!(
            s.resolve(Some(&existing_list), &object),
            ConflictResolution::Apply
        ));
    }

    #[test]
    fn labels_identify_each_strategy() {
        assert_eq!(LastWriteWins.label(), "last-write-wins");
        assert_eq!(HighestConfidence.label(), "highest-confidence");
        assert_eq!(KeepLocal.label(), "keep-local");
        assert_eq!(MergeJson.label(), "merge-json");
    }
}
239}