1use std::collections::HashMap;
8
9use chrono::{DateTime, Utc};
10use schemars::JsonSchema;
11use serde::{Deserialize, Serialize};
12use uuid::Uuid;
13
14use super::{ConflictResolution, SyncConflict, SyncState, TeamSync};
15
16#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema)]
18pub struct LwwEntry {
19 pub value: String,
21 pub timestamp: DateTime<Utc>,
23 pub peer_id: Uuid,
25 pub version: u64,
27}
28
29#[derive(Debug, Clone, Serialize, Deserialize)]
34pub struct LwwElementSet {
35 local_peer: Uuid,
37 resolution: ConflictResolution,
39 elements: HashMap<String, LwwEntry>,
41 removed: HashMap<String, LwwEntry>,
43}
44
45impl LwwElementSet {
46 pub fn new(local_peer: Uuid, resolution: ConflictResolution) -> Self {
48 Self {
49 local_peer,
50 resolution,
51 elements: HashMap::new(),
52 removed: HashMap::new(),
53 }
54 }
55
56 pub fn set(&mut self, workflow_id: &str, yaml: &str) -> LwwEntry {
58 let version = self
59 .elements
60 .get(workflow_id)
61 .map(|e| e.version + 1)
62 .unwrap_or(1);
63
64 let entry = LwwEntry {
65 value: yaml.to_string(),
66 timestamp: Utc::now(),
67 peer_id: self.local_peer,
68 version,
69 };
70
71 self.elements.insert(workflow_id.to_string(), entry.clone());
72 self.removed.remove(workflow_id);
73 entry
74 }
75
76 pub fn remove(&mut self, workflow_id: &str) -> Option<LwwEntry> {
78 if let Some(entry) = self.elements.remove(workflow_id) {
79 let tombstone = LwwEntry {
80 value: String::new(),
81 timestamp: Utc::now(),
82 peer_id: self.local_peer,
83 version: entry.version + 1,
84 };
85 self.removed.insert(workflow_id.to_string(), tombstone.clone());
86 Some(tombstone)
87 } else {
88 None
89 }
90 }
91
92 pub fn get(&self, workflow_id: &str) -> Option<&LwwEntry> {
94 self.elements.get(workflow_id)
95 }
96
97 pub fn workflows(&self) -> Vec<String> {
99 self.elements.keys().cloned().collect()
100 }
101
102 pub fn merge(
105 &mut self,
106 workflow_id: &str,
107 remote: &LwwEntry,
108 ) -> (LwwEntry, Option<SyncConflict>) {
109 if let Some(tombstone) = self.removed.get(workflow_id) {
111 if Self::entry_wins(tombstone, remote) {
112 return (tombstone.clone(), None);
113 }
114 }
115
116 if let Some(local) = self.elements.get(workflow_id) {
117 if local.timestamp == remote.timestamp && local.peer_id != remote.peer_id {
118 let conflict = SyncConflict {
120 workflow_id: workflow_id.to_string(),
121 local_version: local.version,
122 remote_version: remote.version,
123 local_peer: local.peer_id.to_string(),
124 remote_peer: remote.peer_id.to_string(),
125 detected_at: Utc::now(),
126 };
127
128 let winner = self.resolve(local, remote);
129 self.elements.insert(workflow_id.to_string(), winner.clone());
130 (winner, Some(conflict))
131 } else if Self::entry_wins(remote, local) {
132 self.elements.insert(workflow_id.to_string(), remote.clone());
134 (remote.clone(), None)
135 } else {
136 (local.clone(), None)
138 }
139 } else {
140 self.elements.insert(workflow_id.to_string(), remote.clone());
142 (remote.clone(), None)
143 }
144 }
145
146 pub fn merge_set(&mut self, remote_set: &LwwElementSet) -> Vec<SyncConflict> {
148 let mut conflicts = Vec::new();
149
150 for (wf_id, remote_entry) in &remote_set.elements {
151 let (_, conflict) = self.merge(wf_id, remote_entry);
152 if let Some(c) = conflict {
153 conflicts.push(c);
154 }
155 }
156
157 for (wf_id, remote_tombstone) in &remote_set.removed {
159 if let Some(local) = self.elements.get(wf_id) {
160 if Self::entry_wins(remote_tombstone, local) {
161 self.elements.remove(wf_id);
162 self.removed.insert(wf_id.clone(), remote_tombstone.clone());
163 }
164 }
165 if let Some(local_tomb) = self.removed.get(wf_id) {
167 if Self::entry_wins(remote_tombstone, local_tomb) {
168 self.removed.insert(wf_id.clone(), remote_tombstone.clone());
169 }
170 } else if !self.elements.contains_key(wf_id) {
171 self.removed.insert(wf_id.clone(), remote_tombstone.clone());
172 }
173 }
174
175 conflicts
176 }
177
178 fn entry_wins(a: &LwwEntry, b: &LwwEntry) -> bool {
180 (a.timestamp, a.peer_id) > (b.timestamp, b.peer_id)
181 }
182
183 fn resolve(&self, local: &LwwEntry, remote: &LwwEntry) -> LwwEntry {
185 match self.resolution {
186 ConflictResolution::LastWriterWins => {
187 if Self::entry_wins(local, remote) {
188 local.clone()
189 } else {
190 remote.clone()
191 }
192 }
193 ConflictResolution::HighestNodeId => {
194 if local.peer_id >= remote.peer_id {
195 local.clone()
196 } else {
197 remote.clone()
198 }
199 }
200 ConflictResolution::ForkAndMerge => {
201 local.clone()
203 }
204 }
205 }
206}
207
208impl TeamSync for LwwElementSet {
209 fn push(&self, workflow_id: &str, _yaml: &str) -> anyhow::Result<SyncState> {
210 match self.elements.get(workflow_id) {
211 Some(_) => Ok(SyncState::Synced),
212 None => Ok(SyncState::LocalAhead {
213 pending_changes: 1,
214 }),
215 }
216 }
217
218 fn pull(&self, workflow_id: &str) -> anyhow::Result<(String, SyncState)> {
219 match self.elements.get(workflow_id) {
220 Some(entry) => Ok((entry.value.clone(), SyncState::Synced)),
221 None => anyhow::bail!("workflow '{workflow_id}' not found in element set"),
222 }
223 }
224
225 fn state(&self, workflow_id: &str) -> anyhow::Result<SyncState> {
226 if self.elements.contains_key(workflow_id) {
227 Ok(SyncState::Synced)
228 } else {
229 Ok(SyncState::LocalAhead {
230 pending_changes: 0,
231 })
232 }
233 }
234
235 fn resolve_conflict(
236 &self,
237 workflow_id: &str,
238 _resolution: &ConflictResolution,
239 ) -> anyhow::Result<String> {
240 self.elements
241 .get(workflow_id)
242 .map(|e| e.value.clone())
243 .ok_or_else(|| anyhow::anyhow!("workflow '{workflow_id}' not found"))
244 }
245
246 fn local_peer_id(&self) -> Uuid {
247 self.local_peer
248 }
249}
250
#[cfg(test)]
mod tests {
    use super::*;

    /// Generates a random peer id.
    fn make_peer() -> Uuid {
        Uuid::new_v4()
    }

    /// Builds an empty set owned by `peer` using last-writer-wins resolution.
    fn lww_set(peer: Uuid) -> LwwElementSet {
        LwwElementSet::new(peer, ConflictResolution::LastWriterWins)
    }

    /// Builds an entry as if it had been written by `peer_id`.
    fn entry_from(
        peer_id: Uuid,
        value: &str,
        timestamp: DateTime<Utc>,
        version: u64,
    ) -> LwwEntry {
        LwwEntry {
            value: value.into(),
            timestamp,
            peer_id,
            version,
        }
    }

    #[test]
    fn test_set_and_get() {
        let peer = make_peer();
        let mut set = lww_set(peer);

        set.set("wf-1", "name: test\nsteps: []");

        let stored = set.get("wf-1").unwrap();
        assert_eq!(stored.value, "name: test\nsteps: []");
        assert_eq!(stored.version, 1);
        assert_eq!(stored.peer_id, peer);
    }

    #[test]
    fn test_set_increments_version() {
        let mut set = lww_set(make_peer());

        for body in ["v1", "v2"] {
            set.set("wf-1", body);
        }

        let stored = set.get("wf-1").unwrap();
        assert_eq!(stored.value, "v2");
        assert_eq!(stored.version, 2);
    }

    #[test]
    fn test_remove() {
        let mut set = lww_set(make_peer());
        set.set("wf-1", "content");

        let tombstone = set.remove("wf-1");

        assert!(tombstone.is_some());
        assert!(set.get("wf-1").is_none());
        assert!(set.workflows().is_empty());
    }

    #[test]
    fn test_remove_nonexistent() {
        let mut set = lww_set(make_peer());
        assert!(set.remove("wf-1").is_none());
    }

    #[test]
    fn test_merge_new_entry() {
        let local_peer = make_peer();
        let remote_peer = make_peer();
        let mut set = lww_set(local_peer);

        let remote = entry_from(remote_peer, "remote content", Utc::now(), 1);
        let (winner, conflict) = set.merge("wf-1", &remote);

        assert!(conflict.is_none());
        assert_eq!(winner.value, "remote content");
        assert_eq!(set.get("wf-1").unwrap().value, "remote content");
    }

    #[test]
    fn test_merge_remote_wins() {
        let local_peer = make_peer();
        let remote_peer = make_peer();
        let mut set = lww_set(local_peer);
        set.set("wf-1", "old");

        // Stamped after the local write so the remote entry is strictly newer.
        let later = Utc::now() + chrono::Duration::seconds(1);
        let remote = entry_from(remote_peer, "newer", later, 2);

        let (winner, _) = set.merge("wf-1", &remote);
        assert_eq!(winner.value, "newer");
    }

    #[test]
    fn test_merge_local_wins() {
        let local_peer = make_peer();
        let remote_peer = make_peer();
        let mut set = lww_set(local_peer);
        set.set("wf-1", "local");

        // Ten seconds in the past, so the local write is strictly newer.
        let earlier = Utc::now() - chrono::Duration::seconds(10);
        let remote = entry_from(remote_peer, "older", earlier, 1);

        let (winner, _) = set.merge("wf-1", &remote);
        assert_eq!(winner.value, "local");
    }

    #[test]
    fn test_merge_set() {
        let mut ours = lww_set(make_peer());
        let mut theirs = lww_set(make_peer());
        ours.set("wf-1", "from peer1");
        theirs.set("wf-2", "from peer2");

        let conflicts = ours.merge_set(&theirs);

        assert!(conflicts.is_empty());
        assert_eq!(ours.get("wf-1").unwrap().value, "from peer1");
        assert_eq!(ours.get("wf-2").unwrap().value, "from peer2");
    }

    #[test]
    fn test_workflows_list() {
        let mut set = lww_set(make_peer());
        for (id, body) in [("wf-a", "a"), ("wf-b", "b"), ("wf-c", "c")] {
            set.set(id, body);
        }

        // `workflows` gives no ordering guarantee, so sort before comparing.
        let mut ids = set.workflows();
        ids.sort();
        assert_eq!(ids, vec!["wf-a", "wf-b", "wf-c"]);
    }

    #[test]
    fn test_lww_entry_serialization() {
        let original = entry_from(Uuid::new_v4(), "test yaml", Utc::now(), 42);

        let json = serde_json::to_string(&original).unwrap();
        let decoded: LwwEntry = serde_json::from_str(&json).unwrap();

        assert_eq!(original.value, decoded.value);
        assert_eq!(original.version, decoded.version);
    }

    #[test]
    fn test_team_sync_trait_push() {
        let mut set = lww_set(make_peer());
        set.set("wf-1", "content");

        let state = set.push("wf-1", "content").unwrap();
        assert_eq!(state, SyncState::Synced);
    }

    #[test]
    fn test_team_sync_trait_pull() {
        let mut set = lww_set(make_peer());
        set.set("wf-1", "yaml content");

        let (yaml, state) = set.pull("wf-1").unwrap();
        assert_eq!(yaml, "yaml content");
        assert_eq!(state, SyncState::Synced);
    }

    #[test]
    fn test_team_sync_trait_pull_missing() {
        let set = lww_set(make_peer());
        assert!(set.pull("nonexistent").is_err());
    }

    #[test]
    fn test_highest_node_id_resolution() {
        let low_peer = Uuid::parse_str("00000000-0000-0000-0000-000000000001").unwrap();
        let high_peer = Uuid::parse_str("ffffffff-ffff-ffff-ffff-ffffffffffff").unwrap();
        let mut set = LwwElementSet::new(low_peer, ConflictResolution::HighestNodeId);

        let shared_ts = Utc::now();
        set.set("wf-1", "from low peer");
        let remote = entry_from(high_peer, "from high peer", shared_ts, 1);

        // Force both entries onto the same timestamp so the conflict branch
        // (and therefore the node-id comparison) is exercised.
        set.elements.get_mut("wf-1").unwrap().timestamp = shared_ts;

        let (winner, conflict) = set.merge("wf-1", &remote);
        assert!(conflict.is_some());
        assert_eq!(winner.value, "from high peer");
    }
}