1use crate::spec_ai_graph_sync::protocol::{SyncedEdge, SyncedNode};
4use anyhow::Result;
5use chrono::{DateTime, Utc};
6use serde_json::{Value as JsonValue, json};
7use crate::spec_ai_knowledge_graph::{ClockOrder, VectorClock};
8use tracing::{debug, info, warn};
9
10#[derive(Debug, Clone)]
12pub struct ConflictRecord {
13 pub node_id: String,
14 pub conflict_type: ConflictType,
15 pub local_version: JsonValue,
16 pub remote_version: JsonValue,
17 pub resolution: ConflictResolution,
18 pub timestamp: DateTime<Utc>,
19}
20
21#[derive(Debug, Clone)]
22pub enum ConflictType {
23 VectorClockConcurrent,
24 SemanticConflict(String),
25 TypeMismatch,
26 DeleteUpdate, }
28
29#[derive(Debug, Clone)]
30pub enum ConflictResolution {
31 AcceptRemote,
32 KeepLocal,
33 Merged(JsonValue),
34 RequiresManualReview,
35}
36
37pub struct ConflictResolver {
39 instance_id: String,
40 conflict_log: std::sync::Arc<std::sync::Mutex<Vec<ConflictRecord>>>,
41}
42
43impl ConflictResolver {
44 pub fn new(instance_id: String) -> Self {
45 Self {
46 instance_id,
47 conflict_log: std::sync::Arc::new(std::sync::Mutex::new(Vec::new())),
48 }
49 }
50
51 pub fn resolve_node_conflict(
53 &self,
54 incoming: &SyncedNode,
55 our_node: Option<&SyncedNode>,
56 our_vector_clock: &mut VectorClock,
57 ) -> Result<ConflictResolution> {
58 let incoming_vc = &incoming.vector_clock;
60
61 let clock_order = our_vector_clock.compare(incoming_vc);
63
64 debug!(
65 "Resolving node conflict for {}: clock_order = {:?}",
66 incoming.id, clock_order
67 );
68
69 let resolution = match clock_order {
70 ClockOrder::Before => {
71 info!(
73 "Node {} - our version is older, accepting remote",
74 incoming.id
75 );
76 our_vector_clock.merge(incoming_vc);
77 ConflictResolution::AcceptRemote
78 }
79 ClockOrder::After => {
80 info!("Node {} - our version is newer, keeping local", incoming.id);
82 ConflictResolution::KeepLocal
83 }
84 ClockOrder::Equal => {
85 debug!(
87 "Node {} - versions are equal, no changes needed",
88 incoming.id
89 );
90 ConflictResolution::KeepLocal
91 }
92 ClockOrder::Concurrent => {
93 warn!("Node {} - concurrent modification detected", incoming.id);
95
96 if let Some(local_node) = our_node {
97 let semantic_conflicts = self.detect_semantic_conflicts(local_node, incoming);
99
100 if !semantic_conflicts.is_empty() {
101 warn!(
102 "Semantic conflicts detected for node {}: {:?}",
103 incoming.id, semantic_conflicts
104 );
105 }
106
107 let local_ts = local_node.updated_at;
109 let remote_ts = incoming.updated_at;
110
111 let merged_properties = if incoming.node_type == local_node.node_type {
113 self.apply_type_specific_merge(
114 incoming.node_type.as_str(),
115 &local_node.properties,
116 &incoming.properties,
117 )
118 } else {
119 warn!(
121 "Node type mismatch for {}: local={:?}, remote={:?}",
122 incoming.id, local_node.node_type, incoming.node_type
123 );
124
125 self.record_conflict(ConflictRecord {
127 node_id: incoming.id.to_string(),
128 conflict_type: ConflictType::TypeMismatch,
129 local_version: serde_json::to_value(local_node)?,
130 remote_version: serde_json::to_value(incoming)?,
131 resolution: ConflictResolution::RequiresManualReview,
132 timestamp: Utc::now(),
133 });
134
135 return Ok(ConflictResolution::RequiresManualReview);
136 };
137
138 let merged_label = if remote_ts > local_ts {
140 incoming.label.clone()
141 } else {
142 local_node.label.clone()
143 };
144
145 our_vector_clock.merge(incoming_vc);
147 our_vector_clock.increment(&self.instance_id);
148
149 let merged_node = json!({
151 "id": incoming.id,
152 "label": merged_label,
153 "node_type": incoming.node_type,
154 "properties": merged_properties,
155 "vector_clock": our_vector_clock.to_json()?,
156 "updated_at": Utc::now().to_rfc3339(),
157 });
158
159 self.record_conflict(ConflictRecord {
161 node_id: incoming.id.to_string(),
162 conflict_type: ConflictType::VectorClockConcurrent,
163 local_version: serde_json::to_value(local_node)?,
164 remote_version: serde_json::to_value(incoming)?,
165 resolution: ConflictResolution::Merged(merged_node.clone()),
166 timestamp: Utc::now(),
167 });
168
169 ConflictResolution::Merged(merged_node)
170 } else {
171 warn!(
174 "Node {} exists remotely but not locally with concurrent clock",
175 incoming.id
176 );
177
178 our_vector_clock.merge(incoming_vc);
181 ConflictResolution::AcceptRemote
182 }
183 }
184 };
185
186 Ok(resolution)
187 }
188
189 pub fn resolve_edge_conflict(
191 &self,
192 incoming: &SyncedEdge,
193 our_edge: Option<&SyncedEdge>,
194 our_vector_clock: &mut VectorClock,
195 ) -> Result<ConflictResolution> {
196 let incoming_vc = &incoming.vector_clock;
198
199 let clock_order = our_vector_clock.compare(incoming_vc);
201
202 debug!(
203 "Resolving edge conflict for {}: clock_order = {:?}",
204 incoming.id, clock_order
205 );
206
207 let resolution = match clock_order {
208 ClockOrder::Before => {
209 info!(
211 "Edge {} - our version is older, accepting remote",
212 incoming.id
213 );
214 our_vector_clock.merge(incoming_vc);
215 ConflictResolution::AcceptRemote
216 }
217 ClockOrder::After => {
218 info!("Edge {} - our version is newer, keeping local", incoming.id);
220 ConflictResolution::KeepLocal
221 }
222 ClockOrder::Equal => {
223 debug!(
225 "Edge {} - versions are equal, no changes needed",
226 incoming.id
227 );
228 ConflictResolution::KeepLocal
229 }
230 ClockOrder::Concurrent => {
231 warn!("Edge {} - concurrent modification detected", incoming.id);
233
234 if let Some(local_edge) = our_edge {
235 let local_ts = local_edge.created_at; let remote_ts = incoming.created_at;
238
239 let empty_props = serde_json::json!({});
241 let local_props = local_edge.properties.as_ref().unwrap_or(&empty_props);
242 let remote_props = incoming.properties.as_ref().unwrap_or(&empty_props);
243 let merged_properties =
244 self.merge_json_properties(local_props, remote_props, local_ts, remote_ts);
245
246 let (merged_weight, merged_predicate) = if remote_ts > local_ts {
248 (incoming.weight, incoming.predicate.clone())
249 } else {
250 (local_edge.weight, local_edge.predicate.clone())
251 };
252
253 if local_edge.source_id != incoming.source_id
255 || local_edge.target_id != incoming.target_id
256 {
257 warn!(
258 "Edge {} endpoints changed - requires manual review",
259 incoming.id
260 );
261
262 self.record_conflict(ConflictRecord {
263 node_id: incoming.id.to_string(),
264 conflict_type: ConflictType::SemanticConflict(
265 "Edge endpoints mismatch".to_string(),
266 ),
267 local_version: serde_json::to_value(local_edge)?,
268 remote_version: serde_json::to_value(incoming)?,
269 resolution: ConflictResolution::RequiresManualReview,
270 timestamp: Utc::now(),
271 });
272
273 return Ok(ConflictResolution::RequiresManualReview);
274 }
275
276 our_vector_clock.merge(incoming_vc);
278 our_vector_clock.increment(&self.instance_id);
279
280 let merged_edge = json!({
282 "id": incoming.id,
283 "session_id": incoming.session_id,
284 "source_id": incoming.source_id,
285 "target_id": incoming.target_id,
286 "edge_type": incoming.edge_type,
287 "predicate": merged_predicate,
288 "weight": merged_weight,
289 "properties": Some(merged_properties),
290 "temporal_start": incoming.temporal_start,
291 "temporal_end": incoming.temporal_end,
292 "created_at": incoming.created_at,
293 "vector_clock": our_vector_clock,
294 "last_modified_by": incoming.last_modified_by,
295 "is_deleted": false,
296 "sync_enabled": true,
297 });
298
299 self.record_conflict(ConflictRecord {
301 node_id: incoming.id.to_string(),
302 conflict_type: ConflictType::VectorClockConcurrent,
303 local_version: serde_json::to_value(local_edge)?,
304 remote_version: serde_json::to_value(incoming)?,
305 resolution: ConflictResolution::Merged(merged_edge.clone()),
306 timestamp: Utc::now(),
307 });
308
309 ConflictResolution::Merged(merged_edge)
310 } else {
311 our_vector_clock.merge(incoming_vc);
313 ConflictResolution::AcceptRemote
314 }
315 }
316 };
317
318 Ok(resolution)
319 }
320
321 fn record_conflict(&self, record: ConflictRecord) {
323 if let Ok(mut log) = self.conflict_log.lock() {
324 log.push(record);
325 }
326 }
327
328 pub fn get_conflict_log(&self) -> Vec<ConflictRecord> {
330 self.conflict_log
331 .lock()
332 .map(|log| log.clone())
333 .unwrap_or_default()
334 }
335
336 pub fn clear_conflict_log(&self) {
338 if let Ok(mut log) = self.conflict_log.lock() {
339 log.clear();
340 }
341 }
342
343 #[allow(dead_code)]
345 #[allow(clippy::only_used_in_recursion)]
346 pub fn merge_json_properties(
347 &self,
348 local: &JsonValue,
349 remote: &JsonValue,
350 local_timestamp: chrono::DateTime<chrono::Utc>,
351 remote_timestamp: chrono::DateTime<chrono::Utc>,
352 ) -> JsonValue {
353 match (local, remote) {
354 (JsonValue::Object(local_map), JsonValue::Object(remote_map)) => {
355 let mut merged = serde_json::Map::new();
356
357 for (key, value) in local_map {
359 merged.insert(key.clone(), value.clone());
360 }
361
362 for (key, remote_value) in remote_map {
364 if let Some(local_value) = local_map.get(key) {
365 if local_value.is_object() && remote_value.is_object() {
367 merged.insert(
368 key.clone(),
369 self.merge_json_properties(
370 local_value,
371 remote_value,
372 local_timestamp,
373 remote_timestamp,
374 ),
375 );
376 } else {
377 if remote_timestamp > local_timestamp {
379 merged.insert(key.clone(), remote_value.clone());
380 }
381 }
382 } else {
383 merged.insert(key.clone(), remote_value.clone());
385 }
386 }
387
388 JsonValue::Object(merged)
389 }
390 (JsonValue::Array(local_arr), JsonValue::Array(remote_arr)) => {
391 let mut merged_arr = local_arr.clone();
393 for item in remote_arr {
394 if !merged_arr.contains(item) {
395 merged_arr.push(item.clone());
396 }
397 }
398 JsonValue::Array(merged_arr)
399 }
400 _ => {
401 if remote_timestamp > local_timestamp {
403 remote.clone()
404 } else {
405 local.clone()
406 }
407 }
408 }
409 }
410
411 #[allow(dead_code)]
413 pub fn detect_semantic_conflicts(
414 &self,
415 local: &SyncedNode,
416 remote: &SyncedNode,
417 ) -> Vec<String> {
418 let mut conflicts = Vec::new();
419
420 if local.label != remote.label {
422 conflicts.push(format!(
423 "Label mismatch: '{}' vs '{}'",
424 local.label, remote.label
425 ));
426 }
427
428 if local.node_type != remote.node_type {
429 conflicts.push(format!(
430 "Node type mismatch: {:?} vs {:?}",
431 local.node_type, remote.node_type
432 ));
433 }
434
435 conflicts
436 }
437
438 #[allow(dead_code)]
440 pub fn apply_type_specific_merge(
441 &self,
442 node_type: &str,
443 local: &JsonValue,
444 remote: &JsonValue,
445 ) -> JsonValue {
446 match node_type {
448 "entity" => {
449 self.merge_preserving_keys(local, remote, &["id", "created_by"])
451 }
452 "concept" => {
453 remote.clone()
455 }
456 "fact" => {
457 self.merge_combining_arrays(local, remote, &["evidence", "sources"])
459 }
460 _ => {
461 remote.clone()
463 }
464 }
465 }
466
467 fn merge_preserving_keys(
468 &self,
469 local: &JsonValue,
470 remote: &JsonValue,
471 preserve_keys: &[&str],
472 ) -> JsonValue {
473 if let (JsonValue::Object(local_map), JsonValue::Object(remote_map)) = (local, remote) {
474 let mut merged = remote_map.clone();
475 for key in preserve_keys {
476 if let Some(value) = local_map.get(*key) {
477 merged.insert(key.to_string(), value.clone());
478 }
479 }
480 JsonValue::Object(merged)
481 } else {
482 remote.clone()
483 }
484 }
485
486 fn merge_combining_arrays(
487 &self,
488 local: &JsonValue,
489 remote: &JsonValue,
490 array_keys: &[&str],
491 ) -> JsonValue {
492 if let (JsonValue::Object(local_map), JsonValue::Object(remote_map)) = (local, remote) {
493 let mut merged = local_map.clone();
494
495 for (key, remote_value) in remote_map {
496 if array_keys.contains(&key.as_str()) {
497 if let Some(JsonValue::Array(local_arr)) = merged.get(key) {
499 if let JsonValue::Array(remote_arr) = remote_value {
500 let mut combined = local_arr.clone();
501 for item in remote_arr {
502 if !combined.contains(item) {
503 combined.push(item.clone());
504 }
505 }
506 merged.insert(key.clone(), JsonValue::Array(combined));
507 }
508 } else {
509 merged.insert(key.clone(), remote_value.clone());
510 }
511 } else {
512 merged.insert(key.clone(), remote_value.clone());
514 }
515 }
516
517 JsonValue::Object(merged)
518 } else {
519 remote.clone()
520 }
521 }
522}
523
524#[cfg(test)]
525mod tests {
526 use super::*;
527 use serde_json::json;
528
529 #[test]
530 fn test_merge_json_objects() {
531 let resolver = ConflictResolver::new("test-instance".to_string());
532
533 let local = json!({
534 "name": "Alice",
535 "age": 30,
536 "city": "NYC"
537 });
538
539 let remote = json!({
540 "name": "Alice",
541 "age": 31,
542 "country": "USA"
543 });
544
545 let local_time = chrono::Utc::now();
546 let remote_time = local_time + chrono::Duration::seconds(10);
547
548 let merged = resolver.merge_json_properties(&local, &remote, local_time, remote_time);
549
550 assert_eq!(merged["name"], "Alice");
551 assert_eq!(merged["age"], 31); assert_eq!(merged["city"], "NYC"); assert_eq!(merged["country"], "USA"); }
555
556 #[test]
557 fn test_merge_arrays() {
558 let resolver = ConflictResolver::new("test-instance".to_string());
559
560 let local = json!(["a", "b", "c"]);
561 let remote = json!(["b", "c", "d"]);
562
563 let local_time = chrono::Utc::now();
564 let remote_time = local_time + chrono::Duration::seconds(10);
565
566 let merged = resolver.merge_json_properties(&local, &remote, local_time, remote_time);
567
568 if let JsonValue::Array(arr) = merged {
569 assert!(arr.contains(&json!("a")));
570 assert!(arr.contains(&json!("b")));
571 assert!(arr.contains(&json!("c")));
572 assert!(arr.contains(&json!("d")));
573 } else {
574 panic!("Expected array");
575 }
576 }
577
578 #[test]
579 fn test_preserve_keys() {
580 let resolver = ConflictResolver::new("test-instance".to_string());
581
582 let local = json!({
583 "id": "123",
584 "name": "Local",
585 "created_by": "user1"
586 });
587
588 let remote = json!({
589 "id": "456",
590 "name": "Remote",
591 "created_by": "user2"
592 });
593
594 let merged = resolver.merge_preserving_keys(&local, &remote, &["id", "created_by"]);
595
596 assert_eq!(merged["id"], "123"); assert_eq!(merged["name"], "Remote"); assert_eq!(merged["created_by"], "user1"); }
600}