rust_logic_graph/distributed/
versioning.rs

//! Context versioning and conflict resolution.
//!
//! Provides version tracking and conflict-resolution strategies for distributed contexts.
4
5use crate::distributed::context::{DistributedContext, ContextSnapshot};
6use serde::{Serialize, Deserialize};
7use anyhow::{Result, bail};
8
9/// Version information for a context
10#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
11pub struct ContextVersion {
12    /// Version number
13    pub version: u64,
14    
15    /// Timestamp of this version
16    pub timestamp: u64,
17    
18    /// Service that created this version
19    pub created_by: Option<String>,
20    
21    /// Parent version (for tracking lineage)
22    pub parent_version: Option<u64>,
23}
24
25impl ContextVersion {
26    /// Create a new version
27    pub fn new(version: u64) -> Self {
28        let timestamp = std::time::SystemTime::now()
29            .duration_since(std::time::UNIX_EPOCH)
30            .unwrap()
31            .as_millis() as u64;
32        
33        Self {
34            version,
35            timestamp,
36            created_by: None,
37            parent_version: None,
38        }
39    }
40    
41    /// Create a new version with parent
42    pub fn with_parent(version: u64, parent: u64) -> Self {
43        let mut v = Self::new(version);
44        v.parent_version = Some(parent);
45        v
46    }
47}
48
/// Strategies for resolving a conflict between the current context and an
/// incoming one.
///
/// The default strategy is [`ConflictResolution::LastWriteWins`].
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Default)]
pub enum ConflictResolution {
    /// Last write wins (based on timestamp).
    #[default]
    LastWriteWins,

    /// Higher version wins.
    HigherVersionWins,

    /// Fail on conflict (require manual resolution).
    FailOnConflict,

    /// Merge all changes (may cause data loss).
    MergeAll,
}
64
65/// A versioned context with history tracking
66#[derive(Debug, Clone)]
67pub struct VersionedContext {
68    /// Current context
69    pub current: DistributedContext,
70    
71    /// Version history (limited to last N versions)
72    pub history: Vec<ContextSnapshot>,
73    
74    /// Maximum history size
75    pub max_history: usize,
76    
77    /// Conflict resolution strategy
78    pub resolution_strategy: ConflictResolution,
79}
80
81impl VersionedContext {
82    /// Create a new versioned context
83    pub fn new(session_id: impl Into<String>) -> Self {
84        Self {
85            current: DistributedContext::new(session_id),
86            history: Vec::new(),
87            max_history: 10,
88            resolution_strategy: ConflictResolution::LastWriteWins,
89        }
90    }
91    
92    /// Create with custom settings
93    pub fn with_config(
94        session_id: impl Into<String>,
95        max_history: usize,
96        strategy: ConflictResolution,
97    ) -> Self {
98        Self {
99            current: DistributedContext::new(session_id),
100            history: Vec::new(),
101            max_history,
102            resolution_strategy: strategy,
103        }
104    }
105    
106    /// Update the context and save to history
107    pub fn update(&mut self, new_context: DistributedContext) -> Result<()> {
108        // Save current state to history
109        let snapshot = self.current.snapshot();
110        self.history.push(snapshot);
111        
112        // Trim history if needed
113        if self.history.len() > self.max_history {
114            self.history.remove(0);
115        }
116        
117        self.current = new_context;
118        Ok(())
119    }
120    
121    /// Merge another context with conflict resolution
122    pub fn merge_with_resolution(&mut self, other: &DistributedContext) -> Result<()> {
123        match self.resolution_strategy {
124            ConflictResolution::LastWriteWins => {
125                self.merge_last_write_wins(other)
126            }
127            ConflictResolution::HigherVersionWins => {
128                self.merge_higher_version_wins(other)
129            }
130            ConflictResolution::FailOnConflict => {
131                self.merge_fail_on_conflict(other)
132            }
133            ConflictResolution::MergeAll => {
134                self.merge_all(other)
135            }
136        }
137    }
138    
139    fn merge_last_write_wins(&mut self, other: &DistributedContext) -> Result<()> {
140        // Compare timestamps
141        if other.metadata.updated_at > self.current.metadata.updated_at {
142            self.update(other.clone())?;
143        }
144        Ok(())
145    }
146    
147    fn merge_higher_version_wins(&mut self, other: &DistributedContext) -> Result<()> {
148        if other.metadata.version > self.current.metadata.version {
149            self.update(other.clone())?;
150        }
151        Ok(())
152    }
153    
154    fn merge_fail_on_conflict(&mut self, other: &DistributedContext) -> Result<()> {
155        // Check if versions diverged
156        if self.current.metadata.version != other.metadata.version {
157            bail!(
158                "Version conflict: current={}, other={}. Manual resolution required.",
159                self.current.metadata.version,
160                other.metadata.version
161            );
162        }
163        
164        self.update(other.clone())?;
165        Ok(())
166    }
167    
168    fn merge_all(&mut self, other: &DistributedContext) -> Result<()> {
169        // Merge all fields from other into current
170        self.current.merge(other);
171        
172        // Save snapshot
173        let snapshot = self.current.snapshot();
174        self.history.push(snapshot);
175        
176        if self.history.len() > self.max_history {
177            self.history.remove(0);
178        }
179        
180        Ok(())
181    }
182    
183    /// Get a specific version from history
184    pub fn get_version(&self, version: u64) -> Option<&ContextSnapshot> {
185        self.history.iter().find(|s| s.version == version)
186    }
187    
188    /// Rollback to a previous version
189    pub fn rollback_to(&mut self, version: u64) -> Result<()> {
190        let snapshot = self.get_version(version)
191            .ok_or_else(|| anyhow::anyhow!("Version {} not found in history", version))?;
192        
193        // Reconstruct context from snapshot
194        let mut new_context = DistributedContext::new(&snapshot.session_id);
195        new_context.data = snapshot.data.clone();
196        new_context.metadata.version = snapshot.version + 1; // Increment version
197        
198        self.update(new_context)?;
199        Ok(())
200    }
201    
202    /// Get version history
203    pub fn get_history(&self) -> &[ContextSnapshot] {
204        &self.history
205    }
206    
207    /// Clear history
208    pub fn clear_history(&mut self) {
209        self.history.clear();
210    }
211}
212
213/// Three-way merge for complex conflict resolution
214pub struct ThreeWayMerge {
215    /// Base version (common ancestor)
216    pub base: ContextSnapshot,
217    
218    /// Local changes
219    pub local: ContextSnapshot,
220    
221    /// Remote changes
222    pub remote: ContextSnapshot,
223}
224
225impl ThreeWayMerge {
226    /// Create a new three-way merge
227    pub fn new(
228        base: ContextSnapshot,
229        local: ContextSnapshot,
230        remote: ContextSnapshot,
231    ) -> Self {
232        Self { base, local, remote }
233    }
234    
235    /// Perform three-way merge
236    pub fn merge(&self) -> Result<DistributedContext> {
237        use serde_json::Value;
238        
239        let mut merged = DistributedContext::new(&self.local.session_id);
240        
241        // Collect all keys
242        let mut all_keys: std::collections::HashSet<String> = std::collections::HashSet::new();
243        all_keys.extend(self.base.data.keys().cloned());
244        all_keys.extend(self.local.data.keys().cloned());
245        all_keys.extend(self.remote.data.keys().cloned());
246        
247        // Merge each key
248        for key in all_keys {
249            let base_val = self.base.data.get(&key);
250            let local_val = self.local.data.get(&key);
251            let remote_val = self.remote.data.get(&key);
252            
253            let merged_val = match (base_val, local_val, remote_val) {
254                // Both sides deleted
255                (Some(_), None, None) => None,
256                
257                // Local deleted, remote unchanged
258                (Some(b), None, Some(r)) if b == r => None,
259                
260                // Remote deleted, local unchanged
261                (Some(b), Some(l), None) if b == l => None,
262                
263                // Both sides modified to same value
264                (Some(_), Some(l), Some(r)) if l == r => Some(l.clone()),
265                
266                // Local modified, remote unchanged
267                (Some(b), Some(l), Some(r)) if b == r => Some(l.clone()),
268                
269                // Remote modified, local unchanged
270                (Some(b), Some(l), Some(r)) if b == l => Some(r.clone()),
271                
272                // Conflict: both modified differently
273                (Some(_), Some(l), Some(r)) if l != r => {
274                    // Last write wins (prefer remote in case of conflict)
275                    Some(r.clone())
276                }
277                
278                // New key on both sides with same value
279                (None, Some(l), Some(r)) if l == r => Some(l.clone()),
280                
281                // New key on local only
282                (None, Some(l), None) => Some(l.clone()),
283                
284                // New key on remote only
285                (None, None, Some(r)) => Some(r.clone()),
286                
287                // Conflict: new key on both sides with different values
288                (None, Some(_), Some(r)) => Some(r.clone()), // Prefer remote
289                
290                _ => None,
291            };
292            
293            if let Some(val) = merged_val {
294                merged.set(key, val);
295            }
296        }
297        
298        Ok(merged)
299    }
300}
301
#[cfg(test)]
mod tests {
    use super::*;
    use serde_json::json;

    /// `ContextVersion` constructors fill in version, timestamp and parent.
    #[test]
    fn test_context_version() {
        let v1 = ContextVersion::new(1);
        assert_eq!(v1.version, 1);
        assert!(v1.timestamp > 0);

        let v2 = ContextVersion::with_parent(2, 1);
        assert_eq!(v2.version, 2);
        assert_eq!(v2.parent_version, Some(1));
    }

    /// `update` records the replaced state in the history.
    #[test]
    fn test_versioned_context() {
        let mut vctx = VersionedContext::new("test");

        // Initial version
        assert_eq!(vctx.current.metadata.version, 1);

        // Update
        let mut new_ctx = DistributedContext::new("test");
        new_ctx.set("key1", json!("value1"));
        vctx.update(new_ctx).unwrap();

        // The replaced initial state is now the only history entry
        assert_eq!(vctx.history.len(), 1);
    }

    /// The history never grows beyond `max_history` entries.
    #[test]
    fn test_history_is_trimmed_to_max_history() {
        let mut vctx =
            VersionedContext::with_config("test", 2, ConflictResolution::LastWriteWins);

        for _ in 0..5 {
            vctx.update(DistributedContext::new("test")).unwrap();
        }

        assert_eq!(vctx.get_history().len(), 2);
    }

    /// With `LastWriteWins`, a context with a newer timestamp replaces the
    /// current one.
    #[test]
    fn test_last_write_wins() {
        let mut vctx = VersionedContext::new("test");
        vctx.resolution_strategy = ConflictResolution::LastWriteWins;

        // First update
        let mut ctx1 = DistributedContext::new("test");
        ctx1.set("key1", json!("value1"));
        vctx.update(ctx1).unwrap();

        // Let the clock advance so the second context is created measurably
        // later than the first (assumes `updated_at` has a resolution finer
        // than 10 ms).
        std::thread::sleep(std::time::Duration::from_millis(10));

        // Second context (newer timestamp)
        let mut ctx2 = DistributedContext::new("test");
        ctx2.set("key1", json!("value2"));

        vctx.merge_with_resolution(&ctx2).unwrap();

        // Should have newer value
        assert_eq!(vctx.current.get("key1"), Some(&json!("value2")));
    }

    /// With `HigherVersionWins`, only a strictly higher version replaces the
    /// current context.
    #[test]
    fn test_higher_version_wins() {
        let mut vctx =
            VersionedContext::with_config("test", 10, ConflictResolution::HigherVersionWins);
        let current_version = vctx.current.metadata.version;

        // Same version: nothing is replaced, nothing is recorded
        let mut same = DistributedContext::new("test");
        same.metadata.version = current_version;
        vctx.merge_with_resolution(&same).unwrap();
        assert!(vctx.get_history().is_empty());

        // Higher version: adopted, previous state recorded
        let mut newer = DistributedContext::new("test");
        newer.metadata.version = current_version + 1;
        vctx.merge_with_resolution(&newer).unwrap();
        assert_eq!(vctx.current.metadata.version, current_version + 1);
        assert_eq!(vctx.get_history().len(), 1);
    }

    /// With `FailOnConflict`, differing versions produce an error.
    #[test]
    fn test_fail_on_conflict() {
        let mut vctx = VersionedContext::new("test");
        vctx.resolution_strategy = ConflictResolution::FailOnConflict;

        // Update to version 2
        let mut ctx1 = DistributedContext::new("test");
        ctx1.set("key1", json!("value1"));
        vctx.update(ctx1).unwrap();

        // Try to merge version 1 (conflict)
        let ctx2 = DistributedContext::new("test");
        let result = vctx.merge_with_resolution(&ctx2);

        assert!(result.is_err());
    }

    /// `rollback_to` restores the data of a recorded version.
    #[test]
    fn test_rollback() {
        let mut vctx = VersionedContext::new("test");

        // First state
        let mut ctx1 = DistributedContext::new("test");
        ctx1.set("key1", json!("v1"));
        vctx.update(ctx1).unwrap();
        let v1 = vctx.current.metadata.version;

        // Second state (pushes the first one into the history)
        let mut ctx2 = DistributedContext::new("test");
        ctx2.set("key1", json!("v2"));
        vctx.update(ctx2).unwrap();

        // Rollback to the first state
        vctx.rollback_to(v1).unwrap();
        assert_eq!(vctx.current.get("key1"), Some(&json!("v1")));
    }

    /// Three-way merge keeps one-sided additions and prefers remote on
    /// conflicting modifications.
    #[test]
    fn test_three_way_merge() {
        // Base version
        let mut base = DistributedContext::new("test");
        base.set("key1", json!("base"));
        let base_snapshot = base.snapshot();

        // Local changes
        let mut local = base.clone();
        local.set("key1", json!("local"));
        local.set("key2", json!("local-only"));
        let local_snapshot = local.snapshot();

        // Remote changes
        let mut remote = base.clone();
        remote.set("key1", json!("remote"));
        remote.set("key3", json!("remote-only"));
        let remote_snapshot = remote.snapshot();

        // Perform merge
        let merger = ThreeWayMerge::new(base_snapshot, local_snapshot, remote_snapshot);
        let merged = merger.merge().unwrap();

        // Remote wins on conflict (key1)
        assert_eq!(merged.get("key1"), Some(&json!("remote")));
        // Local-only key preserved
        assert_eq!(merged.get("key2"), Some(&json!("local-only")));
        // Remote-only key preserved
        assert_eq!(merged.get("key3"), Some(&json!("remote-only")));
    }
}