Skip to main content

mur_core/team/
crdt.rs

1//! Lightweight LWW-Element-Set CRDT for workflow YAML synchronization.
2//!
3//! Implements a Last-Writer-Wins Element Set where each element is a
4//! workflow identified by its ID, and the value is the YAML content.
5//! Conflicts are resolved by comparing (timestamp, peer_id) tuples.
6
7use std::collections::HashMap;
8
9use chrono::{DateTime, Utc};
10use schemars::JsonSchema;
11use serde::{Deserialize, Serialize};
12use uuid::Uuid;
13
14use super::{ConflictResolution, SyncConflict, SyncState, TeamSync};
15
/// A timestamped value in the LWW register.
///
/// The same shape is used for live entries and for tombstones; a tombstone
/// written by `LwwElementSet::remove` carries an empty `value`.
#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema)]
pub struct LwwEntry {
    /// The workflow YAML content (empty for tombstones).
    pub value: String,
    /// Wall-clock timestamp of the write; the primary key for merge ordering.
    pub timestamp: DateTime<Utc>,
    /// Peer that performed the write; secondary key for merge ordering.
    pub peer_id: Uuid,
    /// Version counter kept per workflow ID and bumped by local writes.
    /// Merge ordering does not consult it.
    pub version: u64,
}
28
/// LWW-Element-Set for synchronizing workflow YAML across peers.
///
/// Each workflow ID maps to an `LwwEntry`. When merging, the entry with the
/// higher timestamp wins. Entries from different peers that share a timestamp
/// are reported as a conflict and settled by the configured
/// `ConflictResolution` strategy.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct LwwElementSet {
    /// The local peer's identity; stamped on every entry written locally.
    local_peer: Uuid,
    /// Strategy applied when two peers wrote with the same timestamp.
    resolution: ConflictResolution,
    /// The element set: workflow_id → LwwEntry (live workflows only).
    elements: HashMap<String, LwwEntry>,
    /// Tombstones for removed workflows, keyed by workflow ID.
    removed: HashMap<String, LwwEntry>,
}
44
45impl LwwElementSet {
46    /// Create a new empty LWW-Element-Set for the given peer.
47    pub fn new(local_peer: Uuid, resolution: ConflictResolution) -> Self {
48        Self {
49            local_peer,
50            resolution,
51            elements: HashMap::new(),
52            removed: HashMap::new(),
53        }
54    }
55
56    /// Insert or update a workflow YAML value.
57    pub fn set(&mut self, workflow_id: &str, yaml: &str) -> LwwEntry {
58        let version = self
59            .elements
60            .get(workflow_id)
61            .map(|e| e.version + 1)
62            .unwrap_or(1);
63
64        let entry = LwwEntry {
65            value: yaml.to_string(),
66            timestamp: Utc::now(),
67            peer_id: self.local_peer,
68            version,
69        };
70
71        self.elements.insert(workflow_id.to_string(), entry.clone());
72        self.removed.remove(workflow_id);
73        entry
74    }
75
76    /// Remove a workflow (add to tombstone set).
77    pub fn remove(&mut self, workflow_id: &str) -> Option<LwwEntry> {
78        if let Some(entry) = self.elements.remove(workflow_id) {
79            let tombstone = LwwEntry {
80                value: String::new(),
81                timestamp: Utc::now(),
82                peer_id: self.local_peer,
83                version: entry.version + 1,
84            };
85            self.removed.insert(workflow_id.to_string(), tombstone.clone());
86            Some(tombstone)
87        } else {
88            None
89        }
90    }
91
92    /// Get the current value for a workflow.
93    pub fn get(&self, workflow_id: &str) -> Option<&LwwEntry> {
94        self.elements.get(workflow_id)
95    }
96
97    /// List all active (non-removed) workflow IDs.
98    pub fn workflows(&self) -> Vec<String> {
99        self.elements.keys().cloned().collect()
100    }
101
102    /// Merge a remote entry for a given workflow ID.
103    /// Returns the winning entry and any conflict detected.
104    pub fn merge(
105        &mut self,
106        workflow_id: &str,
107        remote: &LwwEntry,
108    ) -> (LwwEntry, Option<SyncConflict>) {
109        // Check if remote is a tombstone (empty value after our element)
110        if let Some(tombstone) = self.removed.get(workflow_id) {
111            if Self::entry_wins(tombstone, remote) {
112                return (tombstone.clone(), None);
113            }
114        }
115
116        if let Some(local) = self.elements.get(workflow_id) {
117            if local.timestamp == remote.timestamp && local.peer_id != remote.peer_id {
118                // True conflict: same timestamp, different peers
119                let conflict = SyncConflict {
120                    workflow_id: workflow_id.to_string(),
121                    local_version: local.version,
122                    remote_version: remote.version,
123                    local_peer: local.peer_id.to_string(),
124                    remote_peer: remote.peer_id.to_string(),
125                    detected_at: Utc::now(),
126                };
127
128                let winner = self.resolve(local, remote);
129                self.elements.insert(workflow_id.to_string(), winner.clone());
130                (winner, Some(conflict))
131            } else if Self::entry_wins(remote, local) {
132                // Remote wins
133                self.elements.insert(workflow_id.to_string(), remote.clone());
134                (remote.clone(), None)
135            } else {
136                // Local wins
137                (local.clone(), None)
138            }
139        } else {
140            // No local entry, accept remote
141            self.elements.insert(workflow_id.to_string(), remote.clone());
142            (remote.clone(), None)
143        }
144    }
145
146    /// Merge an entire remote element set into the local one.
147    pub fn merge_set(&mut self, remote_set: &LwwElementSet) -> Vec<SyncConflict> {
148        let mut conflicts = Vec::new();
149
150        for (wf_id, remote_entry) in &remote_set.elements {
151            let (_, conflict) = self.merge(wf_id, remote_entry);
152            if let Some(c) = conflict {
153                conflicts.push(c);
154            }
155        }
156
157        // Merge tombstones
158        for (wf_id, remote_tombstone) in &remote_set.removed {
159            if let Some(local) = self.elements.get(wf_id) {
160                if Self::entry_wins(remote_tombstone, local) {
161                    self.elements.remove(wf_id);
162                    self.removed.insert(wf_id.clone(), remote_tombstone.clone());
163                }
164            }
165            // Merge tombstone timestamps (keep newer)
166            if let Some(local_tomb) = self.removed.get(wf_id) {
167                if Self::entry_wins(remote_tombstone, local_tomb) {
168                    self.removed.insert(wf_id.clone(), remote_tombstone.clone());
169                }
170            } else if !self.elements.contains_key(wf_id) {
171                self.removed.insert(wf_id.clone(), remote_tombstone.clone());
172            }
173        }
174
175        conflicts
176    }
177
178    /// Compare two entries: returns true if `a` wins over `b`.
179    fn entry_wins(a: &LwwEntry, b: &LwwEntry) -> bool {
180        (a.timestamp, a.peer_id) > (b.timestamp, b.peer_id)
181    }
182
183    /// Resolve a conflict between two entries using the configured strategy.
184    fn resolve(&self, local: &LwwEntry, remote: &LwwEntry) -> LwwEntry {
185        match self.resolution {
186            ConflictResolution::LastWriterWins => {
187                if Self::entry_wins(local, remote) {
188                    local.clone()
189                } else {
190                    remote.clone()
191                }
192            }
193            ConflictResolution::HighestNodeId => {
194                if local.peer_id >= remote.peer_id {
195                    local.clone()
196                } else {
197                    remote.clone()
198                }
199            }
200            ConflictResolution::ForkAndMerge => {
201                // In fork mode, keep local but the caller should handle the conflict
202                local.clone()
203            }
204        }
205    }
206}
207
208impl TeamSync for LwwElementSet {
209    fn push(&self, workflow_id: &str, _yaml: &str) -> anyhow::Result<SyncState> {
210        match self.elements.get(workflow_id) {
211            Some(_) => Ok(SyncState::Synced),
212            None => Ok(SyncState::LocalAhead {
213                pending_changes: 1,
214            }),
215        }
216    }
217
218    fn pull(&self, workflow_id: &str) -> anyhow::Result<(String, SyncState)> {
219        match self.elements.get(workflow_id) {
220            Some(entry) => Ok((entry.value.clone(), SyncState::Synced)),
221            None => anyhow::bail!("workflow '{workflow_id}' not found in element set"),
222        }
223    }
224
225    fn state(&self, workflow_id: &str) -> anyhow::Result<SyncState> {
226        if self.elements.contains_key(workflow_id) {
227            Ok(SyncState::Synced)
228        } else {
229            Ok(SyncState::LocalAhead {
230                pending_changes: 0,
231            })
232        }
233    }
234
235    fn resolve_conflict(
236        &self,
237        workflow_id: &str,
238        _resolution: &ConflictResolution,
239    ) -> anyhow::Result<String> {
240        self.elements
241            .get(workflow_id)
242            .map(|e| e.value.clone())
243            .ok_or_else(|| anyhow::anyhow!("workflow '{workflow_id}' not found"))
244    }
245
246    fn local_peer_id(&self) -> Uuid {
247        self.local_peer
248    }
249}
250
#[cfg(test)]
mod tests {
    use super::*;

    /// Fresh random peer identity for tests that do not care about ordering.
    fn make_peer() -> Uuid {
        Uuid::new_v4()
    }

    /// A first `set` stores the value at version 1 under the local peer.
    #[test]
    fn test_set_and_get() {
        let peer = make_peer();
        let mut set = LwwElementSet::new(peer, ConflictResolution::LastWriterWins);

        set.set("wf-1", "name: test\nsteps: []");
        let entry = set.get("wf-1").unwrap();
        assert_eq!(entry.value, "name: test\nsteps: []");
        assert_eq!(entry.version, 1);
        assert_eq!(entry.peer_id, peer);
    }

    /// Overwriting an existing workflow bumps its version.
    #[test]
    fn test_set_increments_version() {
        let peer = make_peer();
        let mut set = LwwElementSet::new(peer, ConflictResolution::LastWriterWins);

        set.set("wf-1", "v1");
        set.set("wf-1", "v2");
        let entry = set.get("wf-1").unwrap();
        assert_eq!(entry.value, "v2");
        assert_eq!(entry.version, 2);
    }

    /// Removing a live workflow yields a tombstone and hides the workflow.
    #[test]
    fn test_remove() {
        let peer = make_peer();
        let mut set = LwwElementSet::new(peer, ConflictResolution::LastWriterWins);

        set.set("wf-1", "content");
        let tombstone = set.remove("wf-1");
        assert!(tombstone.is_some());
        assert!(set.get("wf-1").is_none());
        assert!(set.workflows().is_empty());
    }

    /// Removing an unknown workflow is a no-op that returns `None`.
    #[test]
    fn test_remove_nonexistent() {
        let peer = make_peer();
        let mut set = LwwElementSet::new(peer, ConflictResolution::LastWriterWins);
        assert!(set.remove("wf-1").is_none());
    }

    /// A remote entry for an unknown workflow is accepted without conflict.
    #[test]
    fn test_merge_new_entry() {
        let peer1 = make_peer();
        let peer2 = make_peer();
        let mut set = LwwElementSet::new(peer1, ConflictResolution::LastWriterWins);

        let remote = LwwEntry {
            value: "remote content".into(),
            timestamp: Utc::now(),
            peer_id: peer2,
            version: 1,
        };

        let (winner, conflict) = set.merge("wf-1", &remote);
        assert!(conflict.is_none());
        assert_eq!(winner.value, "remote content");
        assert_eq!(set.get("wf-1").unwrap().value, "remote content");
    }

    /// A remote entry with a later timestamp replaces the local one.
    #[test]
    fn test_merge_remote_wins() {
        let peer1 = make_peer();
        let peer2 = make_peer();
        let mut set = LwwElementSet::new(peer1, ConflictResolution::LastWriterWins);

        set.set("wf-1", "old");

        // Stamp the remote entry one second in the future so it is strictly
        // newer than the local write (no actual sleep needed).
        let remote = LwwEntry {
            value: "newer".into(),
            timestamp: Utc::now() + chrono::Duration::seconds(1),
            peer_id: peer2,
            version: 2,
        };

        let (winner, _) = set.merge("wf-1", &remote);
        assert_eq!(winner.value, "newer");
    }

    /// A remote entry with an earlier timestamp loses to the local one.
    #[test]
    fn test_merge_local_wins() {
        let peer1 = make_peer();
        let peer2 = make_peer();
        let mut set = LwwElementSet::new(peer1, ConflictResolution::LastWriterWins);

        set.set("wf-1", "local");

        // Ten seconds in the past: strictly older than the local write.
        let remote = LwwEntry {
            value: "older".into(),
            timestamp: Utc::now() - chrono::Duration::seconds(10),
            peer_id: peer2,
            version: 1,
        };

        let (winner, _) = set.merge("wf-1", &remote);
        assert_eq!(winner.value, "local");
    }

    /// Merging two sets with disjoint workflows yields their union.
    #[test]
    fn test_merge_set() {
        let peer1 = make_peer();
        let peer2 = make_peer();
        let mut set1 = LwwElementSet::new(peer1, ConflictResolution::LastWriterWins);
        let mut set2 = LwwElementSet::new(peer2, ConflictResolution::LastWriterWins);

        set1.set("wf-1", "from peer1");
        set2.set("wf-2", "from peer2");

        let conflicts = set1.merge_set(&set2);
        assert!(conflicts.is_empty());
        assert_eq!(set1.get("wf-1").unwrap().value, "from peer1");
        assert_eq!(set1.get("wf-2").unwrap().value, "from peer2");
    }

    /// `workflows` lists every live ID (sorted here because map order is
    /// unspecified).
    #[test]
    fn test_workflows_list() {
        let peer = make_peer();
        let mut set = LwwElementSet::new(peer, ConflictResolution::LastWriterWins);

        set.set("wf-a", "a");
        set.set("wf-b", "b");
        set.set("wf-c", "c");

        let mut wfs = set.workflows();
        wfs.sort();
        assert_eq!(wfs, vec!["wf-a", "wf-b", "wf-c"]);
    }

    /// An entry survives a JSON round trip.
    #[test]
    fn test_lww_entry_serialization() {
        let entry = LwwEntry {
            value: "test yaml".into(),
            timestamp: Utc::now(),
            peer_id: Uuid::new_v4(),
            version: 42,
        };
        let json = serde_json::to_string(&entry).unwrap();
        let back: LwwEntry = serde_json::from_str(&json).unwrap();
        assert_eq!(entry.value, back.value);
        assert_eq!(entry.version, back.version);
    }

    /// `push` reports `Synced` for a workflow that is live locally.
    #[test]
    fn test_team_sync_trait_push() {
        let peer = make_peer();
        let mut set = LwwElementSet::new(peer, ConflictResolution::LastWriterWins);
        set.set("wf-1", "content");

        let state = set.push("wf-1", "content").unwrap();
        assert_eq!(state, SyncState::Synced);
    }

    /// `pull` returns the stored YAML together with `Synced`.
    #[test]
    fn test_team_sync_trait_pull() {
        let peer = make_peer();
        let mut set = LwwElementSet::new(peer, ConflictResolution::LastWriterWins);
        set.set("wf-1", "yaml content");

        let (yaml, state) = set.pull("wf-1").unwrap();
        assert_eq!(yaml, "yaml content");
        assert_eq!(state, SyncState::Synced);
    }

    /// `pull` fails for a workflow the set does not hold.
    #[test]
    fn test_team_sync_trait_pull_missing() {
        let peer = make_peer();
        let set = LwwElementSet::new(peer, ConflictResolution::LastWriterWins);
        assert!(set.pull("nonexistent").is_err());
    }

    /// With `HighestNodeId`, a same-timestamp conflict goes to the larger
    /// peer ID and is reported to the caller.
    #[test]
    fn test_highest_node_id_resolution() {
        let peer1 = Uuid::parse_str("00000000-0000-0000-0000-000000000001").unwrap();
        let peer2 = Uuid::parse_str("ffffffff-ffff-ffff-ffff-ffffffffffff").unwrap();
        let mut set = LwwElementSet::new(peer1, ConflictResolution::HighestNodeId);

        let ts = Utc::now();
        set.set("wf-1", "from low peer");

        // Force same timestamp for conflict
        let remote = LwwEntry {
            value: "from high peer".into(),
            timestamp: ts,
            peer_id: peer2,
            version: 1,
        };

        // Overwrite local entry timestamp to match
        set.elements.get_mut("wf-1").unwrap().timestamp = ts;

        let (winner, conflict) = set.merge("wf-1", &remote);
        assert!(conflict.is_some());
        assert_eq!(winner.value, "from high peer");
    }
}