rust_logic_graph/distributed/
versioning.rs1use crate::distributed::context::{DistributedContext, ContextSnapshot};
6use serde::{Serialize, Deserialize};
7use anyhow::{Result, bail};
8
9#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
11pub struct ContextVersion {
12 pub version: u64,
14
15 pub timestamp: u64,
17
18 pub created_by: Option<String>,
20
21 pub parent_version: Option<u64>,
23}
24
25impl ContextVersion {
26 pub fn new(version: u64) -> Self {
28 let timestamp = std::time::SystemTime::now()
29 .duration_since(std::time::UNIX_EPOCH)
30 .unwrap()
31 .as_millis() as u64;
32
33 Self {
34 version,
35 timestamp,
36 created_by: None,
37 parent_version: None,
38 }
39 }
40
41 pub fn with_parent(version: u64, parent: u64) -> Self {
43 let mut v = Self::new(version);
44 v.parent_version = Some(parent);
45 v
46 }
47}
48
49#[derive(Debug, Clone, Copy, PartialEq, Eq)]
51pub enum ConflictResolution {
52 LastWriteWins,
54
55 HigherVersionWins,
57
58 FailOnConflict,
60
61 MergeAll,
63}
64
65#[derive(Debug, Clone)]
67pub struct VersionedContext {
68 pub current: DistributedContext,
70
71 pub history: Vec<ContextSnapshot>,
73
74 pub max_history: usize,
76
77 pub resolution_strategy: ConflictResolution,
79}
80
81impl VersionedContext {
82 pub fn new(session_id: impl Into<String>) -> Self {
84 Self {
85 current: DistributedContext::new(session_id),
86 history: Vec::new(),
87 max_history: 10,
88 resolution_strategy: ConflictResolution::LastWriteWins,
89 }
90 }
91
92 pub fn with_config(
94 session_id: impl Into<String>,
95 max_history: usize,
96 strategy: ConflictResolution,
97 ) -> Self {
98 Self {
99 current: DistributedContext::new(session_id),
100 history: Vec::new(),
101 max_history,
102 resolution_strategy: strategy,
103 }
104 }
105
106 pub fn update(&mut self, new_context: DistributedContext) -> Result<()> {
108 let snapshot = self.current.snapshot();
110 self.history.push(snapshot);
111
112 if self.history.len() > self.max_history {
114 self.history.remove(0);
115 }
116
117 self.current = new_context;
118 Ok(())
119 }
120
121 pub fn merge_with_resolution(&mut self, other: &DistributedContext) -> Result<()> {
123 match self.resolution_strategy {
124 ConflictResolution::LastWriteWins => {
125 self.merge_last_write_wins(other)
126 }
127 ConflictResolution::HigherVersionWins => {
128 self.merge_higher_version_wins(other)
129 }
130 ConflictResolution::FailOnConflict => {
131 self.merge_fail_on_conflict(other)
132 }
133 ConflictResolution::MergeAll => {
134 self.merge_all(other)
135 }
136 }
137 }
138
139 fn merge_last_write_wins(&mut self, other: &DistributedContext) -> Result<()> {
140 if other.metadata.updated_at > self.current.metadata.updated_at {
142 self.update(other.clone())?;
143 }
144 Ok(())
145 }
146
147 fn merge_higher_version_wins(&mut self, other: &DistributedContext) -> Result<()> {
148 if other.metadata.version > self.current.metadata.version {
149 self.update(other.clone())?;
150 }
151 Ok(())
152 }
153
154 fn merge_fail_on_conflict(&mut self, other: &DistributedContext) -> Result<()> {
155 if self.current.metadata.version != other.metadata.version {
157 bail!(
158 "Version conflict: current={}, other={}. Manual resolution required.",
159 self.current.metadata.version,
160 other.metadata.version
161 );
162 }
163
164 self.update(other.clone())?;
165 Ok(())
166 }
167
168 fn merge_all(&mut self, other: &DistributedContext) -> Result<()> {
169 self.current.merge(other);
171
172 let snapshot = self.current.snapshot();
174 self.history.push(snapshot);
175
176 if self.history.len() > self.max_history {
177 self.history.remove(0);
178 }
179
180 Ok(())
181 }
182
183 pub fn get_version(&self, version: u64) -> Option<&ContextSnapshot> {
185 self.history.iter().find(|s| s.version == version)
186 }
187
188 pub fn rollback_to(&mut self, version: u64) -> Result<()> {
190 let snapshot = self.get_version(version)
191 .ok_or_else(|| anyhow::anyhow!("Version {} not found in history", version))?;
192
193 let mut new_context = DistributedContext::new(&snapshot.session_id);
195 new_context.data = snapshot.data.clone();
196 new_context.metadata.version = snapshot.version + 1; self.update(new_context)?;
199 Ok(())
200 }
201
202 pub fn get_history(&self) -> &[ContextSnapshot] {
204 &self.history
205 }
206
207 pub fn clear_history(&mut self) {
209 self.history.clear();
210 }
211}
212
213pub struct ThreeWayMerge {
215 pub base: ContextSnapshot,
217
218 pub local: ContextSnapshot,
220
221 pub remote: ContextSnapshot,
223}
224
225impl ThreeWayMerge {
226 pub fn new(
228 base: ContextSnapshot,
229 local: ContextSnapshot,
230 remote: ContextSnapshot,
231 ) -> Self {
232 Self { base, local, remote }
233 }
234
235 pub fn merge(&self) -> Result<DistributedContext> {
237 use serde_json::Value;
238
239 let mut merged = DistributedContext::new(&self.local.session_id);
240
241 let mut all_keys: std::collections::HashSet<String> = std::collections::HashSet::new();
243 all_keys.extend(self.base.data.keys().cloned());
244 all_keys.extend(self.local.data.keys().cloned());
245 all_keys.extend(self.remote.data.keys().cloned());
246
247 for key in all_keys {
249 let base_val = self.base.data.get(&key);
250 let local_val = self.local.data.get(&key);
251 let remote_val = self.remote.data.get(&key);
252
253 let merged_val = match (base_val, local_val, remote_val) {
254 (Some(_), None, None) => None,
256
257 (Some(b), None, Some(r)) if b == r => None,
259
260 (Some(b), Some(l), None) if b == l => None,
262
263 (Some(_), Some(l), Some(r)) if l == r => Some(l.clone()),
265
266 (Some(b), Some(l), Some(r)) if b == r => Some(l.clone()),
268
269 (Some(b), Some(l), Some(r)) if b == l => Some(r.clone()),
271
272 (Some(_), Some(l), Some(r)) if l != r => {
274 Some(r.clone())
276 }
277
278 (None, Some(l), Some(r)) if l == r => Some(l.clone()),
280
281 (None, Some(l), None) => Some(l.clone()),
283
284 (None, None, Some(r)) => Some(r.clone()),
286
287 (None, Some(_), Some(r)) => Some(r.clone()), _ => None,
291 };
292
293 if let Some(val) = merged_val {
294 merged.set(key, val);
295 }
296 }
297
298 Ok(merged)
299 }
300}
301
302#[cfg(test)]
303mod tests {
304 use super::*;
305 use serde_json::json;
306
307 #[test]
308 fn test_context_version() {
309 let v1 = ContextVersion::new(1);
310 assert_eq!(v1.version, 1);
311 assert!(v1.timestamp > 0);
312
313 let v2 = ContextVersion::with_parent(2, 1);
314 assert_eq!(v2.version, 2);
315 assert_eq!(v2.parent_version, Some(1));
316 }
317
318 #[test]
319 fn test_versioned_context() {
320 let mut vctx = VersionedContext::new("test");
321
322 assert_eq!(vctx.current.metadata.version, 1);
324
325 let mut new_ctx = DistributedContext::new("test");
327 new_ctx.set("key1", json!("value1"));
328 vctx.update(new_ctx).unwrap();
329
330 assert_eq!(vctx.history.len(), 1);
332 }
333
334 #[test]
335 fn test_last_write_wins() {
336 let mut vctx = VersionedContext::new("test");
337 vctx.resolution_strategy = ConflictResolution::LastWriteWins;
338
339 let mut ctx1 = DistributedContext::new("test");
341 ctx1.set("key1", json!("value1"));
342 std::thread::sleep(std::time::Duration::from_millis(10));
343
344 vctx.update(ctx1).unwrap();
345
346 let mut ctx2 = DistributedContext::new("test");
348 ctx2.set("key1", json!("value2"));
349 std::thread::sleep(std::time::Duration::from_millis(10));
350
351 vctx.merge_with_resolution(&ctx2).unwrap();
352
353 assert_eq!(vctx.current.get("key1"), Some(&json!("value2")));
355 }
356
357 #[test]
358 fn test_fail_on_conflict() {
359 let mut vctx = VersionedContext::new("test");
360 vctx.resolution_strategy = ConflictResolution::FailOnConflict;
361
362 let mut ctx1 = DistributedContext::new("test");
364 ctx1.set("key1", json!("value1"));
365 vctx.update(ctx1).unwrap();
366
367 let ctx2 = DistributedContext::new("test");
369 let result = vctx.merge_with_resolution(&ctx2);
370
371 assert!(result.is_err());
372 }
373
374 #[test]
375 fn test_rollback() {
376 let mut vctx = VersionedContext::new("test");
377
378 let mut ctx1 = DistributedContext::new("test");
380 ctx1.set("key1", json!("v1"));
381 vctx.update(ctx1).unwrap();
382 let v1 = vctx.current.metadata.version;
383
384 let mut ctx2 = DistributedContext::new("test");
386 ctx2.set("key1", json!("v2"));
387 vctx.update(ctx2).unwrap();
388
389 vctx.rollback_to(v1).unwrap();
391 assert_eq!(vctx.current.get("key1"), Some(&json!("v1")));
392 }
393
394 #[test]
395 fn test_three_way_merge() {
396 let mut base = DistributedContext::new("test");
398 base.set("key1", json!("base"));
399 let base_snapshot = base.snapshot();
400
401 let mut local = base.clone();
403 local.set("key1", json!("local"));
404 local.set("key2", json!("local-only"));
405 let local_snapshot = local.snapshot();
406
407 let mut remote = base.clone();
409 remote.set("key1", json!("remote"));
410 remote.set("key3", json!("remote-only"));
411 let remote_snapshot = remote.snapshot();
412
413 let merger = ThreeWayMerge::new(base_snapshot, local_snapshot, remote_snapshot);
415 let merged = merger.merge().unwrap();
416
417 assert_eq!(merged.get("key1"), Some(&json!("remote")));
419 assert_eq!(merged.get("key2"), Some(&json!("local-only")));
421 assert_eq!(merged.get("key3"), Some(&json!("remote-only")));
423 }
424}