rust_logic_graph/distributed/
versioning.rs

1//! Context Versioning and Conflict Resolution
2//!
3//! Provides version tracking and conflict resolution strategies for distributed contexts.
4
5use crate::distributed::context::{ContextSnapshot, DistributedContext};
6use anyhow::{bail, Result};
7use serde::{Deserialize, Serialize};
8
9/// Version information for a context
10#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
11pub struct ContextVersion {
12    /// Version number
13    pub version: u64,
14
15    /// Timestamp of this version
16    pub timestamp: u64,
17
18    /// Service that created this version
19    pub created_by: Option<String>,
20
21    /// Parent version (for tracking lineage)
22    pub parent_version: Option<u64>,
23}
24
25impl ContextVersion {
26    /// Create a new version
27    pub fn new(version: u64) -> Self {
28        let timestamp = std::time::SystemTime::now()
29            .duration_since(std::time::UNIX_EPOCH)
30            .unwrap()
31            .as_millis() as u64;
32
33        Self {
34            version,
35            timestamp,
36            created_by: None,
37            parent_version: None,
38        }
39    }
40
41    /// Create a new version with parent
42    pub fn with_parent(version: u64, parent: u64) -> Self {
43        let mut v = Self::new(version);
44        v.parent_version = Some(parent);
45        v
46    }
47}
48
49/// Conflict resolution strategies
50#[derive(Debug, Clone, Copy, PartialEq, Eq)]
51pub enum ConflictResolution {
52    /// Last write wins (based on timestamp)
53    LastWriteWins,
54
55    /// Higher version wins
56    HigherVersionWins,
57
58    /// Fail on conflict (require manual resolution)
59    FailOnConflict,
60
61    /// Merge all changes (may cause data loss)
62    MergeAll,
63}
64
65/// A versioned context with history tracking
66#[derive(Debug, Clone)]
67pub struct VersionedContext {
68    /// Current context
69    pub current: DistributedContext,
70
71    /// Version history (limited to last N versions)
72    pub history: Vec<ContextSnapshot>,
73
74    /// Maximum history size
75    pub max_history: usize,
76
77    /// Conflict resolution strategy
78    pub resolution_strategy: ConflictResolution,
79}
80
81impl VersionedContext {
82    /// Create a new versioned context
83    pub fn new(session_id: impl Into<String>) -> Self {
84        Self {
85            current: DistributedContext::new(session_id),
86            history: Vec::new(),
87            max_history: 10,
88            resolution_strategy: ConflictResolution::LastWriteWins,
89        }
90    }
91
92    /// Create with custom settings
93    pub fn with_config(
94        session_id: impl Into<String>,
95        max_history: usize,
96        strategy: ConflictResolution,
97    ) -> Self {
98        Self {
99            current: DistributedContext::new(session_id),
100            history: Vec::new(),
101            max_history,
102            resolution_strategy: strategy,
103        }
104    }
105
106    /// Update the context and save to history
107    pub fn update(&mut self, new_context: DistributedContext) -> Result<()> {
108        // Save current state to history
109        let snapshot = self.current.snapshot();
110        self.history.push(snapshot);
111
112        // Trim history if needed
113        if self.history.len() > self.max_history {
114            self.history.remove(0);
115        }
116
117        self.current = new_context;
118        Ok(())
119    }
120
121    /// Merge another context with conflict resolution
122    pub fn merge_with_resolution(&mut self, other: &DistributedContext) -> Result<()> {
123        match self.resolution_strategy {
124            ConflictResolution::LastWriteWins => self.merge_last_write_wins(other),
125            ConflictResolution::HigherVersionWins => self.merge_higher_version_wins(other),
126            ConflictResolution::FailOnConflict => self.merge_fail_on_conflict(other),
127            ConflictResolution::MergeAll => self.merge_all(other),
128        }
129    }
130
131    fn merge_last_write_wins(&mut self, other: &DistributedContext) -> Result<()> {
132        // Compare timestamps
133        if other.metadata.updated_at > self.current.metadata.updated_at {
134            self.update(other.clone())?;
135        }
136        Ok(())
137    }
138
139    fn merge_higher_version_wins(&mut self, other: &DistributedContext) -> Result<()> {
140        if other.metadata.version > self.current.metadata.version {
141            self.update(other.clone())?;
142        }
143        Ok(())
144    }
145
146    fn merge_fail_on_conflict(&mut self, other: &DistributedContext) -> Result<()> {
147        // Check if versions diverged
148        if self.current.metadata.version != other.metadata.version {
149            bail!(
150                "Version conflict: current={}, other={}. Manual resolution required.",
151                self.current.metadata.version,
152                other.metadata.version
153            );
154        }
155
156        self.update(other.clone())?;
157        Ok(())
158    }
159
160    fn merge_all(&mut self, other: &DistributedContext) -> Result<()> {
161        // Merge all fields from other into current
162        self.current.merge(other);
163
164        // Save snapshot
165        let snapshot = self.current.snapshot();
166        self.history.push(snapshot);
167
168        if self.history.len() > self.max_history {
169            self.history.remove(0);
170        }
171
172        Ok(())
173    }
174
175    /// Get a specific version from history
176    pub fn get_version(&self, version: u64) -> Option<&ContextSnapshot> {
177        self.history.iter().find(|s| s.version == version)
178    }
179
180    /// Rollback to a previous version
181    pub fn rollback_to(&mut self, version: u64) -> Result<()> {
182        let snapshot = self
183            .get_version(version)
184            .ok_or_else(|| anyhow::anyhow!("Version {} not found in history", version))?;
185
186        // Reconstruct context from snapshot
187        let mut new_context = DistributedContext::new(&snapshot.session_id);
188        new_context.data = snapshot.data.clone();
189        new_context.metadata.version = snapshot.version + 1; // Increment version
190
191        self.update(new_context)?;
192        Ok(())
193    }
194
195    /// Get version history
196    pub fn get_history(&self) -> &[ContextSnapshot] {
197        &self.history
198    }
199
200    /// Clear history
201    pub fn clear_history(&mut self) {
202        self.history.clear();
203    }
204}
205
206/// Three-way merge for complex conflict resolution
207pub struct ThreeWayMerge {
208    /// Base version (common ancestor)
209    pub base: ContextSnapshot,
210
211    /// Local changes
212    pub local: ContextSnapshot,
213
214    /// Remote changes
215    pub remote: ContextSnapshot,
216}
217
218impl ThreeWayMerge {
219    /// Create a new three-way merge
220    pub fn new(base: ContextSnapshot, local: ContextSnapshot, remote: ContextSnapshot) -> Self {
221        Self {
222            base,
223            local,
224            remote,
225        }
226    }
227
228    /// Perform three-way merge
229    pub fn merge(&self) -> Result<DistributedContext> {
230        use serde_json::Value;
231
232        let mut merged = DistributedContext::new(&self.local.session_id);
233
234        // Collect all keys
235        let mut all_keys: std::collections::HashSet<String> = std::collections::HashSet::new();
236        all_keys.extend(self.base.data.keys().cloned());
237        all_keys.extend(self.local.data.keys().cloned());
238        all_keys.extend(self.remote.data.keys().cloned());
239
240        // Merge each key
241        for key in all_keys {
242            let base_val = self.base.data.get(&key);
243            let local_val = self.local.data.get(&key);
244            let remote_val = self.remote.data.get(&key);
245
246            let merged_val = match (base_val, local_val, remote_val) {
247                // Both sides deleted
248                (Some(_), None, None) => None,
249
250                // Local deleted, remote unchanged
251                (Some(b), None, Some(r)) if b == r => None,
252
253                // Remote deleted, local unchanged
254                (Some(b), Some(l), None) if b == l => None,
255
256                // Both sides modified to same value
257                (Some(_), Some(l), Some(r)) if l == r => Some(l.clone()),
258
259                // Local modified, remote unchanged
260                (Some(b), Some(l), Some(r)) if b == r => Some(l.clone()),
261
262                // Remote modified, local unchanged
263                (Some(b), Some(l), Some(r)) if b == l => Some(r.clone()),
264
265                // Conflict: both modified differently
266                (Some(_), Some(l), Some(r)) if l != r => {
267                    // Last write wins (prefer remote in case of conflict)
268                    Some(r.clone())
269                }
270
271                // New key on both sides with same value
272                (None, Some(l), Some(r)) if l == r => Some(l.clone()),
273
274                // New key on local only
275                (None, Some(l), None) => Some(l.clone()),
276
277                // New key on remote only
278                (None, None, Some(r)) => Some(r.clone()),
279
280                // Conflict: new key on both sides with different values
281                (None, Some(_), Some(r)) => Some(r.clone()), // Prefer remote
282
283                _ => None,
284            };
285
286            if let Some(val) = merged_val {
287                merged.set(key, val);
288            }
289        }
290
291        Ok(merged)
292    }
293}
294
295#[cfg(test)]
296mod tests {
297    use super::*;
298    use serde_json::json;
299
300    #[test]
301    fn test_context_version() {
302        let v1 = ContextVersion::new(1);
303        assert_eq!(v1.version, 1);
304        assert!(v1.timestamp > 0);
305
306        let v2 = ContextVersion::with_parent(2, 1);
307        assert_eq!(v2.version, 2);
308        assert_eq!(v2.parent_version, Some(1));
309    }
310
311    #[test]
312    fn test_versioned_context() {
313        let mut vctx = VersionedContext::new("test");
314
315        // Initial version
316        assert_eq!(vctx.current.metadata.version, 1);
317
318        // Update
319        let mut new_ctx = DistributedContext::new("test");
320        new_ctx.set("key1", json!("value1"));
321        vctx.update(new_ctx).unwrap();
322
323        // Check history
324        assert_eq!(vctx.history.len(), 1);
325    }
326
327    #[test]
328    fn test_last_write_wins() {
329        let mut vctx = VersionedContext::new("test");
330        vctx.resolution_strategy = ConflictResolution::LastWriteWins;
331
332        // First update
333        let mut ctx1 = DistributedContext::new("test");
334        ctx1.set("key1", json!("value1"));
335        std::thread::sleep(std::time::Duration::from_millis(10));
336
337        vctx.update(ctx1).unwrap();
338
339        // Second update (newer timestamp)
340        let mut ctx2 = DistributedContext::new("test");
341        ctx2.set("key1", json!("value2"));
342        std::thread::sleep(std::time::Duration::from_millis(10));
343
344        vctx.merge_with_resolution(&ctx2).unwrap();
345
346        // Should have newer value
347        assert_eq!(vctx.current.get("key1"), Some(&json!("value2")));
348    }
349
350    #[test]
351    fn test_fail_on_conflict() {
352        let mut vctx = VersionedContext::new("test");
353        vctx.resolution_strategy = ConflictResolution::FailOnConflict;
354
355        // Update to version 2
356        let mut ctx1 = DistributedContext::new("test");
357        ctx1.set("key1", json!("value1"));
358        vctx.update(ctx1).unwrap();
359
360        // Try to merge version 1 (conflict)
361        let ctx2 = DistributedContext::new("test");
362        let result = vctx.merge_with_resolution(&ctx2);
363
364        assert!(result.is_err());
365    }
366
367    #[test]
368    fn test_rollback() {
369        let mut vctx = VersionedContext::new("test");
370
371        // Version 1
372        let mut ctx1 = DistributedContext::new("test");
373        ctx1.set("key1", json!("v1"));
374        vctx.update(ctx1).unwrap();
375        let v1 = vctx.current.metadata.version;
376
377        // Version 2
378        let mut ctx2 = DistributedContext::new("test");
379        ctx2.set("key1", json!("v2"));
380        vctx.update(ctx2).unwrap();
381
382        // Rollback to v1
383        vctx.rollback_to(v1).unwrap();
384        assert_eq!(vctx.current.get("key1"), Some(&json!("v1")));
385    }
386
387    #[test]
388    fn test_three_way_merge() {
389        // Base version
390        let mut base = DistributedContext::new("test");
391        base.set("key1", json!("base"));
392        let base_snapshot = base.snapshot();
393
394        // Local changes
395        let mut local = base.clone();
396        local.set("key1", json!("local"));
397        local.set("key2", json!("local-only"));
398        let local_snapshot = local.snapshot();
399
400        // Remote changes
401        let mut remote = base.clone();
402        remote.set("key1", json!("remote"));
403        remote.set("key3", json!("remote-only"));
404        let remote_snapshot = remote.snapshot();
405
406        // Perform merge
407        let merger = ThreeWayMerge::new(base_snapshot, local_snapshot, remote_snapshot);
408        let merged = merger.merge().unwrap();
409
410        // Remote wins on conflict (key1)
411        assert_eq!(merged.get("key1"), Some(&json!("remote")));
412        // Local-only key preserved
413        assert_eq!(merged.get("key2"), Some(&json!("local-only")));
414        // Remote-only key preserved
415        assert_eq!(merged.get("key3"), Some(&json!("remote-only")));
416    }
417}