Skip to main content

spec_ai/spec_ai_graph_sync/
resolver.rs

1//! Conflict resolution for graph synchronization.
2
3use crate::spec_ai_graph_sync::protocol::{SyncedEdge, SyncedNode};
4use anyhow::Result;
5use chrono::{DateTime, Utc};
6use serde_json::{Value as JsonValue, json};
7use crate::spec_ai_knowledge_graph::{ClockOrder, VectorClock};
8use tracing::{debug, info, warn};
9
10/// Represents a detected conflict for audit trail
11#[derive(Debug, Clone)]
12pub struct ConflictRecord {
13    pub node_id: String,
14    pub conflict_type: ConflictType,
15    pub local_version: JsonValue,
16    pub remote_version: JsonValue,
17    pub resolution: ConflictResolution,
18    pub timestamp: DateTime<Utc>,
19}
20
/// Category of a detected synchronization conflict.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum ConflictType {
    /// Recorded when the local and incoming vector clocks compare as concurrent.
    VectorClockConcurrent,
    /// Application-level disagreement between the versions; carries a description.
    SemanticConflict(String),
    /// Local and remote versions disagree on the node type.
    TypeMismatch,
    /// One side deleted the element while the other updated it.
    DeleteUpdate,
}
28
29#[derive(Debug, Clone)]
30pub enum ConflictResolution {
31    AcceptRemote,
32    KeepLocal,
33    Merged(JsonValue),
34    RequiresManualReview,
35}
36
37/// Conflict resolution strategies for graph synchronization
38pub struct ConflictResolver {
39    instance_id: String,
40    conflict_log: std::sync::Arc<std::sync::Mutex<Vec<ConflictRecord>>>,
41}
42
43impl ConflictResolver {
44    pub fn new(instance_id: String) -> Self {
45        Self {
46            instance_id,
47            conflict_log: std::sync::Arc::new(std::sync::Mutex::new(Vec::new())),
48        }
49    }
50
51    /// Resolve a node conflict using vector clock merge and property reconciliation
52    pub fn resolve_node_conflict(
53        &self,
54        incoming: &SyncedNode,
55        our_node: Option<&SyncedNode>,
56        our_vector_clock: &mut VectorClock,
57    ) -> Result<ConflictResolution> {
58        // Get incoming vector clock
59        let incoming_vc = &incoming.vector_clock;
60
61        // Determine clock ordering
62        let clock_order = our_vector_clock.compare(incoming_vc);
63
64        debug!(
65            "Resolving node conflict for {}: clock_order = {:?}",
66            incoming.id, clock_order
67        );
68
69        let resolution = match clock_order {
70            ClockOrder::Before => {
71                // Our version is older, accept incoming
72                info!(
73                    "Node {} - our version is older, accepting remote",
74                    incoming.id
75                );
76                our_vector_clock.merge(incoming_vc);
77                ConflictResolution::AcceptRemote
78            }
79            ClockOrder::After => {
80                // Our version is newer, keep local
81                info!("Node {} - our version is newer, keeping local", incoming.id);
82                ConflictResolution::KeepLocal
83            }
84            ClockOrder::Equal => {
85                // Same version, no conflict
86                debug!(
87                    "Node {} - versions are equal, no changes needed",
88                    incoming.id
89                );
90                ConflictResolution::KeepLocal
91            }
92            ClockOrder::Concurrent => {
93                // True conflict - need to merge
94                warn!("Node {} - concurrent modification detected", incoming.id);
95
96                if let Some(local_node) = our_node {
97                    // Detect semantic conflicts
98                    let semantic_conflicts = self.detect_semantic_conflicts(local_node, incoming);
99
100                    if !semantic_conflicts.is_empty() {
101                        warn!(
102                            "Semantic conflicts detected for node {}: {:?}",
103                            incoming.id, semantic_conflicts
104                        );
105                    }
106
107                    // Get timestamps (already DateTime<Utc>)
108                    let local_ts = local_node.updated_at;
109                    let remote_ts = incoming.updated_at;
110
111                    // Apply type-specific merge strategy
112                    let merged_properties = if incoming.node_type == local_node.node_type {
113                        self.apply_type_specific_merge(
114                            incoming.node_type.as_str(),
115                            &local_node.properties,
116                            &incoming.properties,
117                        )
118                    } else {
119                        // Type mismatch - this is a serious conflict
120                        warn!(
121                            "Node type mismatch for {}: local={:?}, remote={:?}",
122                            incoming.id, local_node.node_type, incoming.node_type
123                        );
124
125                        // Record for manual review
126                        self.record_conflict(ConflictRecord {
127                            node_id: incoming.id.to_string(),
128                            conflict_type: ConflictType::TypeMismatch,
129                            local_version: serde_json::to_value(local_node)?,
130                            remote_version: serde_json::to_value(incoming)?,
131                            resolution: ConflictResolution::RequiresManualReview,
132                            timestamp: Utc::now(),
133                        });
134
135                        return Ok(ConflictResolution::RequiresManualReview);
136                    };
137
138                    // Choose label based on timestamps
139                    let merged_label = if remote_ts > local_ts {
140                        incoming.label.clone()
141                    } else {
142                        local_node.label.clone()
143                    };
144
145                    // Merge vector clocks
146                    our_vector_clock.merge(incoming_vc);
147                    our_vector_clock.increment(&self.instance_id);
148
149                    // Create merged node
150                    let merged_node = json!({
151                        "id": incoming.id,
152                        "label": merged_label,
153                        "node_type": incoming.node_type,
154                        "properties": merged_properties,
155                        "vector_clock": our_vector_clock.to_json()?,
156                        "updated_at": Utc::now().to_rfc3339(),
157                    });
158
159                    // Record the conflict and resolution
160                    self.record_conflict(ConflictRecord {
161                        node_id: incoming.id.to_string(),
162                        conflict_type: ConflictType::VectorClockConcurrent,
163                        local_version: serde_json::to_value(local_node)?,
164                        remote_version: serde_json::to_value(incoming)?,
165                        resolution: ConflictResolution::Merged(merged_node.clone()),
166                        timestamp: Utc::now(),
167                    });
168
169                    ConflictResolution::Merged(merged_node)
170                } else {
171                    // Node doesn't exist locally but we have a concurrent clock
172                    // This could happen if node was deleted locally
173                    warn!(
174                        "Node {} exists remotely but not locally with concurrent clock",
175                        incoming.id
176                    );
177
178                    // Check if we have a tombstone for this node
179                    // For now, accept the remote version
180                    our_vector_clock.merge(incoming_vc);
181                    ConflictResolution::AcceptRemote
182                }
183            }
184        };
185
186        Ok(resolution)
187    }
188
189    /// Resolve an edge conflict
190    pub fn resolve_edge_conflict(
191        &self,
192        incoming: &SyncedEdge,
193        our_edge: Option<&SyncedEdge>,
194        our_vector_clock: &mut VectorClock,
195    ) -> Result<ConflictResolution> {
196        // Get incoming vector clock
197        let incoming_vc = &incoming.vector_clock;
198
199        // Determine clock ordering
200        let clock_order = our_vector_clock.compare(incoming_vc);
201
202        debug!(
203            "Resolving edge conflict for {}: clock_order = {:?}",
204            incoming.id, clock_order
205        );
206
207        let resolution = match clock_order {
208            ClockOrder::Before => {
209                // Our version is older, accept incoming
210                info!(
211                    "Edge {} - our version is older, accepting remote",
212                    incoming.id
213                );
214                our_vector_clock.merge(incoming_vc);
215                ConflictResolution::AcceptRemote
216            }
217            ClockOrder::After => {
218                // Our version is newer, keep local
219                info!("Edge {} - our version is newer, keeping local", incoming.id);
220                ConflictResolution::KeepLocal
221            }
222            ClockOrder::Equal => {
223                // Same version, no conflict
224                debug!(
225                    "Edge {} - versions are equal, no changes needed",
226                    incoming.id
227                );
228                ConflictResolution::KeepLocal
229            }
230            ClockOrder::Concurrent => {
231                // True conflict - need to merge
232                warn!("Edge {} - concurrent modification detected", incoming.id);
233
234                if let Some(local_edge) = our_edge {
235                    // Get timestamps
236                    let local_ts = local_edge.created_at; // Edges don't have updated_at
237                    let remote_ts = incoming.created_at;
238
239                    // Merge properties based on timestamps
240                    let empty_props = serde_json::json!({});
241                    let local_props = local_edge.properties.as_ref().unwrap_or(&empty_props);
242                    let remote_props = incoming.properties.as_ref().unwrap_or(&empty_props);
243                    let merged_properties =
244                        self.merge_json_properties(local_props, remote_props, local_ts, remote_ts);
245
246                    // Choose weight and predicate based on timestamps
247                    let (merged_weight, merged_predicate) = if remote_ts > local_ts {
248                        (incoming.weight, incoming.predicate.clone())
249                    } else {
250                        (local_edge.weight, local_edge.predicate.clone())
251                    };
252
253                    // Verify edge endpoints haven't changed
254                    if local_edge.source_id != incoming.source_id
255                        || local_edge.target_id != incoming.target_id
256                    {
257                        warn!(
258                            "Edge {} endpoints changed - requires manual review",
259                            incoming.id
260                        );
261
262                        self.record_conflict(ConflictRecord {
263                            node_id: incoming.id.to_string(),
264                            conflict_type: ConflictType::SemanticConflict(
265                                "Edge endpoints mismatch".to_string(),
266                            ),
267                            local_version: serde_json::to_value(local_edge)?,
268                            remote_version: serde_json::to_value(incoming)?,
269                            resolution: ConflictResolution::RequiresManualReview,
270                            timestamp: Utc::now(),
271                        });
272
273                        return Ok(ConflictResolution::RequiresManualReview);
274                    }
275
276                    // Merge vector clocks
277                    our_vector_clock.merge(incoming_vc);
278                    our_vector_clock.increment(&self.instance_id);
279
280                    // Create merged edge
281                    let merged_edge = json!({
282                        "id": incoming.id,
283                        "session_id": incoming.session_id,
284                        "source_id": incoming.source_id,
285                        "target_id": incoming.target_id,
286                        "edge_type": incoming.edge_type,
287                        "predicate": merged_predicate,
288                        "weight": merged_weight,
289                        "properties": Some(merged_properties),
290                        "temporal_start": incoming.temporal_start,
291                        "temporal_end": incoming.temporal_end,
292                        "created_at": incoming.created_at,
293                        "vector_clock": our_vector_clock,
294                        "last_modified_by": incoming.last_modified_by,
295                        "is_deleted": false,
296                        "sync_enabled": true,
297                    });
298
299                    // Record the conflict and resolution
300                    self.record_conflict(ConflictRecord {
301                        node_id: incoming.id.to_string(),
302                        conflict_type: ConflictType::VectorClockConcurrent,
303                        local_version: serde_json::to_value(local_edge)?,
304                        remote_version: serde_json::to_value(incoming)?,
305                        resolution: ConflictResolution::Merged(merged_edge.clone()),
306                        timestamp: Utc::now(),
307                    });
308
309                    ConflictResolution::Merged(merged_edge)
310                } else {
311                    // Edge doesn't exist locally but we have a concurrent clock
312                    our_vector_clock.merge(incoming_vc);
313                    ConflictResolution::AcceptRemote
314                }
315            }
316        };
317
318        Ok(resolution)
319    }
320
321    /// Record a conflict for audit trail
322    fn record_conflict(&self, record: ConflictRecord) {
323        if let Ok(mut log) = self.conflict_log.lock() {
324            log.push(record);
325        }
326    }
327
328    /// Get all recorded conflicts
329    pub fn get_conflict_log(&self) -> Vec<ConflictRecord> {
330        self.conflict_log
331            .lock()
332            .map(|log| log.clone())
333            .unwrap_or_default()
334    }
335
336    /// Clear the conflict log
337    pub fn clear_conflict_log(&self) {
338        if let Ok(mut log) = self.conflict_log.lock() {
339            log.clear();
340        }
341    }
342
343    /// Merge two JSON objects, preferring newer values based on timestamps
344    #[allow(dead_code)]
345    #[allow(clippy::only_used_in_recursion)]
346    pub fn merge_json_properties(
347        &self,
348        local: &JsonValue,
349        remote: &JsonValue,
350        local_timestamp: chrono::DateTime<chrono::Utc>,
351        remote_timestamp: chrono::DateTime<chrono::Utc>,
352    ) -> JsonValue {
353        match (local, remote) {
354            (JsonValue::Object(local_map), JsonValue::Object(remote_map)) => {
355                let mut merged = serde_json::Map::new();
356
357                // Start with local properties
358                for (key, value) in local_map {
359                    merged.insert(key.clone(), value.clone());
360                }
361
362                // Merge remote properties
363                for (key, remote_value) in remote_map {
364                    if let Some(local_value) = local_map.get(key) {
365                        // Key exists in both - recursively merge or use timestamp
366                        if local_value.is_object() && remote_value.is_object() {
367                            merged.insert(
368                                key.clone(),
369                                self.merge_json_properties(
370                                    local_value,
371                                    remote_value,
372                                    local_timestamp,
373                                    remote_timestamp,
374                                ),
375                            );
376                        } else {
377                            // Use timestamp to decide
378                            if remote_timestamp > local_timestamp {
379                                merged.insert(key.clone(), remote_value.clone());
380                            }
381                        }
382                    } else {
383                        // Key only in remote, add it
384                        merged.insert(key.clone(), remote_value.clone());
385                    }
386                }
387
388                JsonValue::Object(merged)
389            }
390            (JsonValue::Array(local_arr), JsonValue::Array(remote_arr)) => {
391                // For arrays, merge and deduplicate
392                let mut merged_arr = local_arr.clone();
393                for item in remote_arr {
394                    if !merged_arr.contains(item) {
395                        merged_arr.push(item.clone());
396                    }
397                }
398                JsonValue::Array(merged_arr)
399            }
400            _ => {
401                // For scalar values, use timestamp
402                if remote_timestamp > local_timestamp {
403                    remote.clone()
404                } else {
405                    local.clone()
406                }
407            }
408        }
409    }
410
411    /// Detect semantic conflicts (application-specific logic)
412    #[allow(dead_code)]
413    pub fn detect_semantic_conflicts(
414        &self,
415        local: &SyncedNode,
416        remote: &SyncedNode,
417    ) -> Vec<String> {
418        let mut conflicts = Vec::new();
419
420        // Example: Check if critical fields differ
421        if local.label != remote.label {
422            conflicts.push(format!(
423                "Label mismatch: '{}' vs '{}'",
424                local.label, remote.label
425            ));
426        }
427
428        if local.node_type != remote.node_type {
429            conflicts.push(format!(
430                "Node type mismatch: {:?} vs {:?}",
431                local.node_type, remote.node_type
432            ));
433        }
434
435        conflicts
436    }
437
438    /// Apply a merge strategy based on node type
439    #[allow(dead_code)]
440    pub fn apply_type_specific_merge(
441        &self,
442        node_type: &str,
443        local: &JsonValue,
444        remote: &JsonValue,
445    ) -> JsonValue {
446        // Application-specific merge rules
447        match node_type {
448            "entity" => {
449                // For entities, merge properties but preserve local identifiers
450                self.merge_preserving_keys(local, remote, &["id", "created_by"])
451            }
452            "concept" => {
453                // For concepts, prefer remote definitions
454                remote.clone()
455            }
456            "fact" => {
457                // For facts, combine evidence from both
458                self.merge_combining_arrays(local, remote, &["evidence", "sources"])
459            }
460            _ => {
461                // Default: prefer newer (remote in conflict scenarios)
462                remote.clone()
463            }
464        }
465    }
466
467    fn merge_preserving_keys(
468        &self,
469        local: &JsonValue,
470        remote: &JsonValue,
471        preserve_keys: &[&str],
472    ) -> JsonValue {
473        if let (JsonValue::Object(local_map), JsonValue::Object(remote_map)) = (local, remote) {
474            let mut merged = remote_map.clone();
475            for key in preserve_keys {
476                if let Some(value) = local_map.get(*key) {
477                    merged.insert(key.to_string(), value.clone());
478                }
479            }
480            JsonValue::Object(merged)
481        } else {
482            remote.clone()
483        }
484    }
485
486    fn merge_combining_arrays(
487        &self,
488        local: &JsonValue,
489        remote: &JsonValue,
490        array_keys: &[&str],
491    ) -> JsonValue {
492        if let (JsonValue::Object(local_map), JsonValue::Object(remote_map)) = (local, remote) {
493            let mut merged = local_map.clone();
494
495            for (key, remote_value) in remote_map {
496                if array_keys.contains(&key.as_str()) {
497                    // Combine arrays
498                    if let Some(JsonValue::Array(local_arr)) = merged.get(key) {
499                        if let JsonValue::Array(remote_arr) = remote_value {
500                            let mut combined = local_arr.clone();
501                            for item in remote_arr {
502                                if !combined.contains(item) {
503                                    combined.push(item.clone());
504                                }
505                            }
506                            merged.insert(key.clone(), JsonValue::Array(combined));
507                        }
508                    } else {
509                        merged.insert(key.clone(), remote_value.clone());
510                    }
511                } else {
512                    // Overwrite with remote value
513                    merged.insert(key.clone(), remote_value.clone());
514                }
515            }
516
517            JsonValue::Object(merged)
518        } else {
519            remote.clone()
520        }
521    }
522}
523
#[cfg(test)]
mod tests {
    use super::*;
    use serde_json::json;

    #[test]
    fn test_merge_json_objects() {
        let resolver = ConflictResolver::new("test-instance".to_string());

        let local = json!({
            "name": "Alice",
            "age": 30,
            "city": "NYC"
        });

        let remote = json!({
            "name": "Alice",
            "age": 31,
            "country": "USA"
        });

        let local_time = chrono::Utc::now();
        let remote_time = local_time + chrono::Duration::seconds(10);

        let merged = resolver.merge_json_properties(&local, &remote, local_time, remote_time);

        assert_eq!(merged["name"], "Alice");
        assert_eq!(merged["age"], 31); // Remote is newer
        assert_eq!(merged["city"], "NYC"); // Preserved from local
        assert_eq!(merged["country"], "USA"); // Added from remote
    }

    #[test]
    fn test_merge_keeps_local_scalars_when_remote_is_older() {
        let resolver = ConflictResolver::new("test-instance".to_string());

        let local = json!({ "age": 30, "nested": { "a": 1 } });
        let remote = json!({ "age": 31, "nested": { "a": 2, "b": 3 } });

        let remote_time = chrono::Utc::now();
        let local_time = remote_time + chrono::Duration::seconds(10);

        let merged = resolver.merge_json_properties(&local, &remote, local_time, remote_time);

        assert_eq!(merged["age"], 30); // Local is newer
        assert_eq!(merged["nested"]["a"], 1); // Recursive merge keeps newer local scalar
        assert_eq!(merged["nested"]["b"], 3); // Remote-only nested key is added
    }

    #[test]
    fn test_merge_arrays() {
        let resolver = ConflictResolver::new("test-instance".to_string());

        let local = json!(["a", "b", "c"]);
        let remote = json!(["b", "c", "d"]);

        let local_time = chrono::Utc::now();
        let remote_time = local_time + chrono::Duration::seconds(10);

        let merged = resolver.merge_json_properties(&local, &remote, local_time, remote_time);

        assert_eq!(merged, json!(["a", "b", "c", "d"]));
    }

    #[test]
    fn test_preserve_keys() {
        let resolver = ConflictResolver::new("test-instance".to_string());

        let local = json!({
            "id": "123",
            "name": "Local",
            "created_by": "user1"
        });

        let remote = json!({
            "id": "456",
            "name": "Remote",
            "created_by": "user2"
        });

        let merged = resolver.merge_preserving_keys(&local, &remote, &["id", "created_by"]);

        assert_eq!(merged["id"], "123"); // Preserved
        assert_eq!(merged["name"], "Remote"); // From remote
        assert_eq!(merged["created_by"], "user1"); // Preserved
    }

    #[test]
    fn test_merge_combining_arrays() {
        let resolver = ConflictResolver::new("test-instance".to_string());

        let local = json!({ "evidence": ["e1", "e2"], "note": "local", "only_local": true });
        let remote = json!({ "evidence": ["e2", "e3"], "note": "remote", "sources": ["s1"] });

        let merged = resolver.merge_combining_arrays(&local, &remote, &["evidence", "sources"]);

        assert_eq!(merged["evidence"], json!(["e1", "e2", "e3"])); // Unioned
        assert_eq!(merged["sources"], json!(["s1"])); // Remote-only array added
        assert_eq!(merged["note"], "remote"); // Non-array key overwritten by remote
        assert_eq!(merged["only_local"], true); // Local-only key kept
    }

    #[test]
    fn test_type_specific_merge_dispatch() {
        let resolver = ConflictResolver::new("test-instance".to_string());

        let local = json!({ "id": "1", "name": "Local" });
        let remote = json!({ "id": "2", "name": "Remote" });

        assert_eq!(
            resolver.apply_type_specific_merge("concept", &local, &remote),
            remote
        );
        assert_eq!(
            resolver.apply_type_specific_merge("unknown", &local, &remote),
            remote
        );

        let entity = resolver.apply_type_specific_merge("entity", &local, &remote);
        assert_eq!(entity["id"], "1");
        assert_eq!(entity["name"], "Remote");
    }

    #[test]
    fn test_conflict_log_record_and_clear() {
        let resolver = ConflictResolver::new("test-instance".to_string());
        assert!(resolver.get_conflict_log().is_empty());

        resolver.record_conflict(ConflictRecord {
            node_id: "n1".to_string(),
            conflict_type: ConflictType::TypeMismatch,
            local_version: json!({ "v": 1 }),
            remote_version: json!({ "v": 2 }),
            resolution: ConflictResolution::RequiresManualReview,
            timestamp: chrono::Utc::now(),
        });

        let log = resolver.get_conflict_log();
        assert_eq!(log.len(), 1);
        assert_eq!(log[0].node_id, "n1");

        resolver.clear_conflict_log();
        assert!(resolver.get_conflict_log().is_empty());
    }
}