// rust_logic_graph/distributed/versioning.rs
use crate::distributed::context::{ContextSnapshot, DistributedContext};
6use anyhow::{bail, Result};
7use serde::{Deserialize, Serialize};
8
/// Version record for a context: a version number plus creation time and
/// optional lineage information.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub struct ContextVersion {
    /// Version number of this record.
    pub version: u64,

    /// Creation time; `ContextVersion::new` fills this with milliseconds
    /// since the Unix epoch.
    pub timestamp: u64,

    /// Identifier of whoever created this version, if recorded. The
    /// constructors in this file leave it as `None`.
    pub created_by: Option<String>,

    /// Version this one was derived from, if any (set by
    /// `ContextVersion::with_parent`).
    pub parent_version: Option<u64>,
}
24
25impl ContextVersion {
26 pub fn new(version: u64) -> Self {
28 let timestamp = std::time::SystemTime::now()
29 .duration_since(std::time::UNIX_EPOCH)
30 .unwrap()
31 .as_millis() as u64;
32
33 Self {
34 version,
35 timestamp,
36 created_by: None,
37 parent_version: None,
38 }
39 }
40
41 pub fn with_parent(version: u64, parent: u64) -> Self {
43 let mut v = Self::new(version);
44 v.parent_version = Some(parent);
45 v
46 }
47}
48
/// Strategy `VersionedContext::merge_with_resolution` uses when an incoming
/// context is merged into the current one.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ConflictResolution {
    /// The incoming context replaces the current one only when its
    /// `metadata.updated_at` is strictly newer.
    LastWriteWins,

    /// The incoming context replaces the current one only when its
    /// `metadata.version` is strictly greater.
    HigherVersionWins,

    /// Merging fails with an error when the two `metadata.version` values
    /// differ; otherwise the incoming context replaces the current one.
    FailOnConflict,

    /// The incoming context is always merged into the current one through
    /// `DistributedContext::merge`.
    MergeAll,
}
64
/// A `DistributedContext` together with a bounded history of snapshots and
/// the strategy used to resolve merges.
#[derive(Debug, Clone)]
pub struct VersionedContext {
    /// The live context.
    pub current: DistributedContext,

    /// Recorded snapshots, oldest first (new snapshots are pushed to the end
    /// and trimming removes from the front).
    pub history: Vec<ContextSnapshot>,

    /// Upper bound on `history.len()` enforced when a snapshot is recorded.
    /// `VersionedContext::new` uses 10.
    pub max_history: usize,

    /// Strategy applied by `merge_with_resolution`.
    /// `VersionedContext::new` uses `ConflictResolution::LastWriteWins`.
    pub resolution_strategy: ConflictResolution,
}
80
81impl VersionedContext {
82 pub fn new(session_id: impl Into<String>) -> Self {
84 Self {
85 current: DistributedContext::new(session_id),
86 history: Vec::new(),
87 max_history: 10,
88 resolution_strategy: ConflictResolution::LastWriteWins,
89 }
90 }
91
92 pub fn with_config(
94 session_id: impl Into<String>,
95 max_history: usize,
96 strategy: ConflictResolution,
97 ) -> Self {
98 Self {
99 current: DistributedContext::new(session_id),
100 history: Vec::new(),
101 max_history,
102 resolution_strategy: strategy,
103 }
104 }
105
106 pub fn update(&mut self, new_context: DistributedContext) -> Result<()> {
108 let snapshot = self.current.snapshot();
110 self.history.push(snapshot);
111
112 if self.history.len() > self.max_history {
114 self.history.remove(0);
115 }
116
117 self.current = new_context;
118 Ok(())
119 }
120
121 pub fn merge_with_resolution(&mut self, other: &DistributedContext) -> Result<()> {
123 match self.resolution_strategy {
124 ConflictResolution::LastWriteWins => self.merge_last_write_wins(other),
125 ConflictResolution::HigherVersionWins => self.merge_higher_version_wins(other),
126 ConflictResolution::FailOnConflict => self.merge_fail_on_conflict(other),
127 ConflictResolution::MergeAll => self.merge_all(other),
128 }
129 }
130
131 fn merge_last_write_wins(&mut self, other: &DistributedContext) -> Result<()> {
132 if other.metadata.updated_at > self.current.metadata.updated_at {
134 self.update(other.clone())?;
135 }
136 Ok(())
137 }
138
139 fn merge_higher_version_wins(&mut self, other: &DistributedContext) -> Result<()> {
140 if other.metadata.version > self.current.metadata.version {
141 self.update(other.clone())?;
142 }
143 Ok(())
144 }
145
146 fn merge_fail_on_conflict(&mut self, other: &DistributedContext) -> Result<()> {
147 if self.current.metadata.version != other.metadata.version {
149 bail!(
150 "Version conflict: current={}, other={}. Manual resolution required.",
151 self.current.metadata.version,
152 other.metadata.version
153 );
154 }
155
156 self.update(other.clone())?;
157 Ok(())
158 }
159
160 fn merge_all(&mut self, other: &DistributedContext) -> Result<()> {
161 self.current.merge(other);
163
164 let snapshot = self.current.snapshot();
166 self.history.push(snapshot);
167
168 if self.history.len() > self.max_history {
169 self.history.remove(0);
170 }
171
172 Ok(())
173 }
174
175 pub fn get_version(&self, version: u64) -> Option<&ContextSnapshot> {
177 self.history.iter().find(|s| s.version == version)
178 }
179
180 pub fn rollback_to(&mut self, version: u64) -> Result<()> {
182 let snapshot = self
183 .get_version(version)
184 .ok_or_else(|| anyhow::anyhow!("Version {} not found in history", version))?;
185
186 let mut new_context = DistributedContext::new(&snapshot.session_id);
188 new_context.data = snapshot.data.clone();
189 new_context.metadata.version = snapshot.version + 1; self.update(new_context)?;
192 Ok(())
193 }
194
195 pub fn get_history(&self) -> &[ContextSnapshot] {
197 &self.history
198 }
199
200 pub fn clear_history(&mut self) {
202 self.history.clear();
203 }
204}
205
206pub struct ThreeWayMerge {
208 pub base: ContextSnapshot,
210
211 pub local: ContextSnapshot,
213
214 pub remote: ContextSnapshot,
216}
217
218impl ThreeWayMerge {
219 pub fn new(base: ContextSnapshot, local: ContextSnapshot, remote: ContextSnapshot) -> Self {
221 Self {
222 base,
223 local,
224 remote,
225 }
226 }
227
228 pub fn merge(&self) -> Result<DistributedContext> {
230 use serde_json::Value;
231
232 let mut merged = DistributedContext::new(&self.local.session_id);
233
234 let mut all_keys: std::collections::HashSet<String> = std::collections::HashSet::new();
236 all_keys.extend(self.base.data.keys().cloned());
237 all_keys.extend(self.local.data.keys().cloned());
238 all_keys.extend(self.remote.data.keys().cloned());
239
240 for key in all_keys {
242 let base_val = self.base.data.get(&key);
243 let local_val = self.local.data.get(&key);
244 let remote_val = self.remote.data.get(&key);
245
246 let merged_val = match (base_val, local_val, remote_val) {
247 (Some(_), None, None) => None,
249
250 (Some(b), None, Some(r)) if b == r => None,
252
253 (Some(b), Some(l), None) if b == l => None,
255
256 (Some(_), Some(l), Some(r)) if l == r => Some(l.clone()),
258
259 (Some(b), Some(l), Some(r)) if b == r => Some(l.clone()),
261
262 (Some(b), Some(l), Some(r)) if b == l => Some(r.clone()),
264
265 (Some(_), Some(l), Some(r)) if l != r => {
267 Some(r.clone())
269 }
270
271 (None, Some(l), Some(r)) if l == r => Some(l.clone()),
273
274 (None, Some(l), None) => Some(l.clone()),
276
277 (None, None, Some(r)) => Some(r.clone()),
279
280 (None, Some(_), Some(r)) => Some(r.clone()), _ => None,
284 };
285
286 if let Some(val) = merged_val {
287 merged.set(key, val);
288 }
289 }
290
291 Ok(merged)
292 }
293}
294
#[cfg(test)]
mod tests {
    use super::*;
    use serde_json::json;

    /// `new` stores the version and a non-zero timestamp; `with_parent`
    /// additionally records the parent version.
    #[test]
    fn test_context_version() {
        let root = ContextVersion::new(1);
        assert_eq!(root.version, 1);
        assert!(root.timestamp > 0);

        let child = ContextVersion::with_parent(2, 1);
        assert_eq!(child.version, 2);
        assert_eq!(child.parent_version, Some(1));
    }

    /// A fresh context starts at version 1 and one update records exactly
    /// one history snapshot.
    #[test]
    fn test_versioned_context() {
        let mut versioned = VersionedContext::new("test");

        assert_eq!(versioned.current.metadata.version, 1);

        let mut replacement = DistributedContext::new("test");
        replacement.set("key1", json!("value1"));
        versioned.update(replacement).unwrap();

        assert_eq!(versioned.history.len(), 1);
    }

    /// Under last-write-wins the more recently written context is adopted.
    #[test]
    fn test_last_write_wins() {
        let pause = std::time::Duration::from_millis(10);
        let mut versioned =
            VersionedContext::with_config("test", 10, ConflictResolution::LastWriteWins);

        let mut older = DistributedContext::new("test");
        older.set("key1", json!("value1"));
        std::thread::sleep(pause);

        versioned.update(older).unwrap();

        let mut newer = DistributedContext::new("test");
        newer.set("key1", json!("value2"));
        std::thread::sleep(pause);

        versioned.merge_with_resolution(&newer).unwrap();

        assert_eq!(versioned.current.get("key1"), Some(&json!("value2")));
    }

    /// Under fail-on-conflict, merging a context with a different version
    /// is rejected.
    #[test]
    fn test_fail_on_conflict() {
        let mut versioned =
            VersionedContext::with_config("test", 10, ConflictResolution::FailOnConflict);

        let mut modified = DistributedContext::new("test");
        modified.set("key1", json!("value1"));
        versioned.update(modified).unwrap();

        let untouched = DistributedContext::new("test");

        assert!(versioned.merge_with_resolution(&untouched).is_err());
    }

    /// Rolling back to an earlier version restores that version's data.
    #[test]
    fn test_rollback() {
        let mut versioned = VersionedContext::new("test");

        let mut first = DistributedContext::new("test");
        first.set("key1", json!("v1"));
        versioned.update(first).unwrap();
        let first_version = versioned.current.metadata.version;

        let mut second = DistributedContext::new("test");
        second.set("key1", json!("v2"));
        versioned.update(second).unwrap();

        versioned.rollback_to(first_version).unwrap();
        assert_eq!(versioned.current.get("key1"), Some(&json!("v1")));
    }

    /// Conflicting edits resolve to the remote value, while one-sided
    /// additions from either side are both kept.
    #[test]
    fn test_three_way_merge() {
        let mut ancestor = DistributedContext::new("test");
        ancestor.set("key1", json!("base"));
        let ancestor_snapshot = ancestor.snapshot();

        let mut ours = ancestor.clone();
        ours.set("key1", json!("local"));
        ours.set("key2", json!("local-only"));
        let ours_snapshot = ours.snapshot();

        let mut theirs = ancestor.clone();
        theirs.set("key1", json!("remote"));
        theirs.set("key3", json!("remote-only"));
        let theirs_snapshot = theirs.snapshot();

        let result = ThreeWayMerge::new(ancestor_snapshot, ours_snapshot, theirs_snapshot)
            .merge()
            .unwrap();

        assert_eq!(result.get("key1"), Some(&json!("remote")));
        assert_eq!(result.get("key2"), Some(&json!("local-only")));
        assert_eq!(result.get("key3"), Some(&json!("remote-only")));
    }
}