1use crate::spec_ai_graph_sync::persistence::SyncPersistence;
4use crate::spec_ai_graph_sync::protocol::{GraphSyncPayload, SyncType, SyncedEdge, SyncedNode, Tombstone};
5use crate::spec_ai_graph_sync::resolver::{ConflictResolution, ConflictResolver};
6use crate::spec_ai_graph_sync::types::{SyncedEdgeRecord, SyncedNodeRecord};
7use anyhow::Result;
8use serde::Serialize;
9use serde_json::json;
10use crate::spec_ai_knowledge_graph::{ClockOrder, EdgeType, NodeType, VectorClock};
11use std::collections::HashSet;
12
13const INCREMENTAL_THRESHOLD: f32 = 0.3; pub struct SyncEngine<P: SyncPersistence> {
19 persistence: P,
20 instance_id: String,
21 resolver: ConflictResolver,
22}
23
/// Counters describing the outcome of one sync exchange.
///
/// `SyncEngine::apply_sync` fills in the `*_applied` and `conflicts_*` counters and
/// `sync_type`. It leaves the `*_sent` counters at zero; presumably the sending side
/// populates them (TODO confirm against callers).
///
/// `Default` yields an all-zero value with an empty `sync_type`.
#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub struct SyncStats {
    /// Nodes included in an outgoing payload.
    pub nodes_sent: usize,
    /// Edges included in an outgoing payload.
    pub edges_sent: usize,
    /// Tombstones included in an outgoing payload.
    pub tombstones_sent: usize,
    /// Incoming nodes written locally (including conflict resolutions that accepted
    /// or merged the remote version).
    pub nodes_applied: usize,
    /// Incoming edges written locally (including accepted/merged conflicts).
    pub edges_applied: usize,
    /// Incoming tombstones applied locally.
    pub tombstones_applied: usize,
    /// Entities whose incoming clock was concurrent with the local clock.
    pub conflicts_detected: usize,
    /// Conflicts settled by the resolver (accept remote, keep local, or merge).
    pub conflicts_resolved: usize,
    /// `Debug` rendering of the payload's sync type.
    pub sync_type: String,
}
37
38impl<P: SyncPersistence> SyncEngine<P> {
39 pub fn new(persistence: P, instance_id: String) -> Self {
41 Self {
42 persistence,
43 instance_id: instance_id.clone(),
44 resolver: ConflictResolver::new(instance_id),
45 }
46 }
47
48 pub fn persistence(&self) -> &P {
50 &self.persistence
51 }
52
53 pub fn instance_id(&self) -> &str {
55 &self.instance_id
56 }
57
58 pub fn resolver(&self) -> &ConflictResolver {
60 &self.resolver
61 }
62
63 pub async fn decide_sync_strategy(
65 &self,
66 session_id: &str,
67 graph_name: &str,
68 their_vector_clock: &VectorClock,
69 ) -> Result<SyncType> {
70 let our_vc_str = self
72 .persistence
73 .graph_sync_state_get(&self.instance_id, session_id, graph_name)?
74 .unwrap_or_else(|| "{}".to_string());
75 let our_vc = VectorClock::from_json(&our_vc_str)?;
76
77 if their_vector_clock.is_empty() || our_vc.is_empty() {
79 return Ok(SyncType::Full);
80 }
81
82 let total_nodes = self.persistence.count_graph_nodes(session_id)?;
84
85 if total_nodes == 0 {
86 return Ok(SyncType::Full);
87 }
88
89 let since_timestamp = chrono::Utc::now()
92 .checked_sub_signed(chrono::Duration::hours(24))
93 .unwrap()
94 .to_rfc3339();
95
96 let changelog_entries = self
97 .persistence
98 .graph_changelog_get_since(session_id, &since_timestamp)?;
99
100 let changed_count = changelog_entries.len();
102 let change_ratio = (changed_count as f32) / (total_nodes as f32);
103
104 if change_ratio > INCREMENTAL_THRESHOLD {
105 Ok(SyncType::Full)
106 } else {
107 Ok(SyncType::Incremental)
108 }
109 }
110
111 pub async fn sync_full(&self, session_id: &str, graph_name: &str) -> Result<GraphSyncPayload> {
113 let nodes = self
115 .persistence
116 .graph_list_nodes_with_sync(session_id, true, false)?;
117 let edges = self
118 .persistence
119 .graph_list_edges_with_sync(session_id, true, false)?;
120
121 let vc_str = self
123 .persistence
124 .graph_sync_state_get(&self.instance_id, session_id, graph_name)?
125 .unwrap_or_else(|| "{}".to_string());
126 let vector_clock = VectorClock::from_json(&vc_str)?;
127
128 let synced_nodes: Vec<SyncedNode> = nodes
130 .into_iter()
131 .map(|n| Self::node_record_to_synced(n))
132 .collect();
133 let synced_edges: Vec<SyncedEdge> = edges
134 .into_iter()
135 .map(|e| Self::edge_record_to_synced(e))
136 .collect();
137
138 Ok(GraphSyncPayload::response_full(
139 session_id.to_string(),
140 Some(graph_name.to_string()),
141 vector_clock,
142 synced_nodes,
143 synced_edges,
144 Vec::new(), None,
146 ))
147 }
148
149 pub async fn sync_incremental(
151 &self,
152 session_id: &str,
153 graph_name: &str,
154 their_vector_clock: &VectorClock,
155 ) -> Result<GraphSyncPayload> {
156 let our_vc_str = self
158 .persistence
159 .graph_sync_state_get(&self.instance_id, session_id, graph_name)?
160 .unwrap_or_else(|| "{}".to_string());
161 let our_vector_clock = VectorClock::from_json(&our_vc_str)?;
162
163 let since_timestamp = chrono::Utc::now()
166 .checked_sub_signed(chrono::Duration::days(7))
167 .unwrap()
168 .to_rfc3339();
169
170 let changelog = self
171 .persistence
172 .graph_changelog_get_since(session_id, &since_timestamp)?;
173
174 let relevant_changes: Vec<_> = changelog
176 .iter()
177 .filter(|entry| {
178 if let Ok(entry_vc) = VectorClock::from_json(&entry.vector_clock) {
179 their_vector_clock.happens_before(&entry_vc)
180 || their_vector_clock.is_concurrent(&entry_vc)
181 } else {
182 false
183 }
184 })
185 .collect();
186
187 let mut node_ids: HashSet<i64> = HashSet::new();
189 let mut edge_ids: HashSet<i64> = HashSet::new();
190 let mut tombstones: Vec<Tombstone> = Vec::new();
191
192 for entry in relevant_changes {
193 match entry.entity_type.as_str() {
194 "node" => {
195 if entry.operation == "delete" {
196 let vc = VectorClock::from_json(&entry.vector_clock)?;
197 tombstones.push(Tombstone::new(
198 "node".to_string(),
199 entry.entity_id,
200 vc,
201 entry.instance_id.clone(),
202 ));
203 } else {
204 node_ids.insert(entry.entity_id);
205 }
206 }
207 "edge" => {
208 if entry.operation == "delete" {
209 let vc = VectorClock::from_json(&entry.vector_clock)?;
210 tombstones.push(Tombstone::new(
211 "edge".to_string(),
212 entry.entity_id,
213 vc,
214 entry.instance_id.clone(),
215 ));
216 } else {
217 edge_ids.insert(entry.entity_id);
218 }
219 }
220 _ => {}
221 }
222 }
223
224 let mut synced_nodes = Vec::new();
226 for node_id in node_ids {
227 if let Some(node) = self.persistence.graph_get_node_with_sync(node_id)? {
228 if node.sync_enabled && !node.is_deleted {
229 synced_nodes.push(Self::node_record_to_synced(node));
230 }
231 }
232 }
233
234 let mut synced_edges = Vec::new();
235 for edge_id in edge_ids {
236 if let Some(edge) = self.persistence.graph_get_edge_with_sync(edge_id)? {
237 if edge.sync_enabled && !edge.is_deleted {
238 synced_edges.push(Self::edge_record_to_synced(edge));
239 }
240 }
241 }
242
243 Ok(GraphSyncPayload::response_incremental(
244 session_id.to_string(),
245 Some(graph_name.to_string()),
246 our_vector_clock,
247 synced_nodes,
248 synced_edges,
249 tombstones,
250 None,
251 ))
252 }
253
254 pub async fn apply_sync(
256 &self,
257 payload: &GraphSyncPayload,
258 graph_name: &str,
259 ) -> Result<SyncStats> {
260 let mut stats = SyncStats {
261 nodes_sent: 0,
262 edges_sent: 0,
263 tombstones_sent: 0,
264 nodes_applied: 0,
265 edges_applied: 0,
266 tombstones_applied: 0,
267 conflicts_detected: 0,
268 conflicts_resolved: 0,
269 sync_type: format!("{:?}", payload.sync_type),
270 };
271
272 let our_vc_str = self
274 .persistence
275 .graph_sync_state_get(&self.instance_id, &payload.session_id, graph_name)?
276 .unwrap_or_else(|| "{}".to_string());
277 let mut our_vector_clock = VectorClock::from_json(&our_vc_str)?;
278
279 for node in &payload.nodes {
281 match self.apply_synced_node(node, &mut our_vector_clock).await {
282 Ok(applied) => {
283 if applied {
284 stats.nodes_applied += 1;
285 }
286 }
287 Err(e) if e.to_string().contains("conflict") => {
288 stats.conflicts_detected += 1;
289 let existing_node = self
291 .persistence
292 .graph_get_node_with_sync(node.id)?
293 .map(|n| Self::node_record_to_synced(n));
294
295 let resolution = self.resolver.resolve_node_conflict(
296 node,
297 existing_node.as_ref(),
298 &mut our_vector_clock,
299 );
300
301 self.record_conflict(
302 &node.session_id,
303 graph_name,
304 "node",
305 node.id,
306 existing_node.as_ref(),
307 node,
308 &our_vector_clock,
309 resolution.as_ref().ok(),
310 );
311
312 match resolution {
314 Ok(ConflictResolution::AcceptRemote) => {
315 self.update_node_from_synced(node)?;
317 stats.conflicts_resolved += 1;
318 stats.nodes_applied += 1;
319 }
320 Ok(ConflictResolution::KeepLocal) => {
321 stats.conflicts_resolved += 1;
323 }
324 Ok(ConflictResolution::Merged(merged_value)) => {
325 if let Ok(merged_node) =
327 serde_json::from_value::<SyncedNode>(merged_value)
328 {
329 self.update_node_from_synced(&merged_node)?;
330 stats.conflicts_resolved += 1;
331 stats.nodes_applied += 1;
332 }
333 }
334 Ok(ConflictResolution::RequiresManualReview) => {
335 tracing::warn!("Node {} conflict requires manual review", node.id);
336 }
338 Err(e) => {
339 tracing::warn!(
340 "Failed to resolve conflict for node {}: {}",
341 node.id,
342 e
343 );
344 }
345 }
346 }
347 Err(e) => {
348 tracing::warn!("Failed to apply node {}: {}", node.id, e);
349 }
350 }
351 }
352
353 for edge in &payload.edges {
355 match self.apply_synced_edge(edge, &mut our_vector_clock).await {
356 Ok(applied) => {
357 if applied {
358 stats.edges_applied += 1;
359 }
360 }
361 Err(e) if e.to_string().contains("conflict") => {
362 stats.conflicts_detected += 1;
363 let existing_edge = self
365 .persistence
366 .graph_get_edge_with_sync(edge.id)?
367 .map(|e| Self::edge_record_to_synced(e));
368
369 let resolution = self.resolver.resolve_edge_conflict(
370 edge,
371 existing_edge.as_ref(),
372 &mut our_vector_clock,
373 );
374
375 self.record_conflict(
376 &edge.session_id,
377 graph_name,
378 "edge",
379 edge.id,
380 existing_edge.as_ref(),
381 edge,
382 &our_vector_clock,
383 resolution.as_ref().ok(),
384 );
385
386 match resolution {
388 Ok(ConflictResolution::AcceptRemote) => {
389 self.update_edge_from_synced(edge)?;
391 stats.conflicts_resolved += 1;
392 stats.edges_applied += 1;
393 }
394 Ok(ConflictResolution::KeepLocal) => {
395 stats.conflicts_resolved += 1;
397 }
398 Ok(ConflictResolution::Merged(merged_value)) => {
399 if let Ok(merged_edge) =
401 serde_json::from_value::<SyncedEdge>(merged_value)
402 {
403 self.update_edge_from_synced(&merged_edge)?;
404 stats.conflicts_resolved += 1;
405 stats.edges_applied += 1;
406 }
407 }
408 Ok(ConflictResolution::RequiresManualReview) => {
409 tracing::warn!("Edge {} conflict requires manual review", edge.id);
410 }
412 Err(e) => {
413 tracing::warn!(
414 "Failed to resolve conflict for edge {}: {}",
415 edge.id,
416 e
417 );
418 }
419 }
420 }
421 Err(e) => {
422 tracing::warn!("Failed to apply edge {}: {}", edge.id, e);
423 }
424 }
425 }
426
427 for tombstone in &payload.tombstones {
429 match self.apply_tombstone(tombstone, &mut our_vector_clock).await {
430 Ok(applied) => {
431 if applied {
432 stats.tombstones_applied += 1;
433 }
434 }
435 Err(e) => {
436 tracing::warn!(
437 "Failed to apply tombstone for {} {}: {}",
438 tombstone.entity_type,
439 tombstone.entity_id,
440 e
441 );
442 }
443 }
444 }
445
446 our_vector_clock.merge(&payload.vector_clock);
448
449 let updated_vc_str = our_vector_clock.to_json()?;
451 self.persistence.graph_sync_state_update(
452 &self.instance_id,
453 &payload.session_id,
454 graph_name,
455 &updated_vc_str,
456 )?;
457
458 Ok(stats)
459 }
460
461 #[allow(clippy::too_many_arguments)]
462 fn record_conflict<V: Serialize>(
463 &self,
464 session_id: &str,
465 graph_name: &str,
466 entity_type: &str,
467 entity_id: i64,
468 local_version: Option<&V>,
469 remote_version: &V,
470 vector_clock: &VectorClock,
471 resolution: Option<&ConflictResolution>,
472 ) {
473 let vc_json = match vector_clock.to_json() {
474 Ok(vc) => vc,
475 Err(e) => {
476 tracing::warn!("Failed to serialize vector clock for conflict log: {}", e);
477 return;
478 }
479 };
480
481 let local_json = local_version.and_then(|v| serde_json::to_value(v).ok());
482 let remote_json = serde_json::to_value(remote_version).ok();
483
484 let data = json!({
485 "graph_name": graph_name,
486 "resolution": resolution.map(|r| format!("{:?}", r)),
487 "local_version": local_json,
488 "remote_version": remote_json,
489 })
490 .to_string();
491
492 if let Err(e) = self.persistence.graph_changelog_append(
493 session_id,
494 &self.instance_id,
495 entity_type,
496 entity_id,
497 "conflict",
498 &vc_json,
499 Some(&data),
500 ) {
501 tracing::warn!(
502 "Failed to append conflict log for {} {}: {}",
503 entity_type,
504 entity_id,
505 e
506 );
507 }
508 }
509
510 async fn apply_synced_node(
512 &self,
513 node: &SyncedNode,
514 our_vector_clock: &mut VectorClock,
515 ) -> Result<bool> {
516 let existing = self.persistence.graph_get_node_with_sync(node.id)?;
518
519 if let Some(existing_node) = existing {
520 let existing_vc = VectorClock::from_json(&existing_node.vector_clock)?;
522 let incoming_vc = &node.vector_clock;
523
524 match incoming_vc.compare(&existing_vc) {
525 ClockOrder::After => {
526 self.update_node_from_synced(node)?;
528 our_vector_clock.merge(incoming_vc);
529 Ok(true)
530 }
531 ClockOrder::Before | ClockOrder::Equal => {
532 Ok(false)
534 }
535 ClockOrder::Concurrent => {
536 anyhow::bail!("conflict detected for node {}", node.id);
538 }
539 }
540 } else {
541 self.insert_node_from_synced(node)?;
543 our_vector_clock.merge(&node.vector_clock);
544 Ok(true)
545 }
546 }
547
548 async fn apply_synced_edge(
550 &self,
551 edge: &SyncedEdge,
552 our_vector_clock: &mut VectorClock,
553 ) -> Result<bool> {
554 let existing = self.persistence.graph_get_edge_with_sync(edge.id)?;
555
556 if let Some(existing_edge) = existing {
557 let existing_vc = VectorClock::from_json(&existing_edge.vector_clock)?;
558 let incoming_vc = &edge.vector_clock;
559
560 match incoming_vc.compare(&existing_vc) {
561 ClockOrder::After => {
562 self.update_edge_from_synced(edge)?;
563 our_vector_clock.merge(incoming_vc);
564 Ok(true)
565 }
566 ClockOrder::Before | ClockOrder::Equal => Ok(false),
567 ClockOrder::Concurrent => {
568 anyhow::bail!("conflict detected for edge {}", edge.id);
569 }
570 }
571 } else {
572 self.insert_edge_from_synced(edge)?;
573 our_vector_clock.merge(&edge.vector_clock);
574 Ok(true)
575 }
576 }
577
578 async fn apply_tombstone(
580 &self,
581 tombstone: &Tombstone,
582 our_vector_clock: &mut VectorClock,
583 ) -> Result<bool> {
584 let vc_str = tombstone.vector_clock.to_json()?;
585
586 match tombstone.entity_type.as_str() {
587 "node" => {
588 self.persistence.graph_mark_node_deleted(
589 tombstone.entity_id,
590 &vc_str,
591 &tombstone.deleted_by,
592 )?;
593 }
594 "edge" => {
595 self.persistence.graph_mark_edge_deleted(
596 tombstone.entity_id,
597 &vc_str,
598 &tombstone.deleted_by,
599 )?;
600 }
601 _ => {
602 anyhow::bail!("unknown entity type: {}", tombstone.entity_type);
603 }
604 }
605
606 our_vector_clock.merge(&tombstone.vector_clock);
607 Ok(true)
608 }
609
610 fn node_record_to_synced(record: SyncedNodeRecord) -> SyncedNode {
613 SyncedNode {
614 id: record.id,
615 session_id: record.session_id,
616 node_type: NodeType::from_str(&record.node_type),
617 label: record.label,
618 properties: record.properties,
619 embedding_id: record.embedding_id,
620 created_at: record.created_at,
621 updated_at: record.updated_at,
622 vector_clock: VectorClock::from_json(&record.vector_clock).unwrap_or_default(),
623 last_modified_by: record.last_modified_by,
624 is_deleted: record.is_deleted,
625 sync_enabled: record.sync_enabled,
626 }
627 }
628
629 fn edge_record_to_synced(record: SyncedEdgeRecord) -> SyncedEdge {
630 SyncedEdge {
631 id: record.id,
632 session_id: record.session_id,
633 source_id: record.source_id,
634 target_id: record.target_id,
635 edge_type: EdgeType::from_str(&record.edge_type),
636 predicate: record.predicate,
637 properties: record.properties,
638 weight: record.weight,
639 temporal_start: record.temporal_start,
640 temporal_end: record.temporal_end,
641 created_at: record.created_at,
642 vector_clock: VectorClock::from_json(&record.vector_clock).unwrap_or_default(),
643 last_modified_by: record.last_modified_by,
644 is_deleted: record.is_deleted,
645 sync_enabled: record.sync_enabled,
646 }
647 }
648
649 fn update_node_from_synced(&self, node: &SyncedNode) -> Result<()> {
650 let vc_str = node.vector_clock.to_json()?;
651 let last_modified = node.last_modified_by.as_deref().unwrap_or("unknown");
652
653 self.persistence.graph_update_node_sync_metadata(
654 node.id,
655 &vc_str,
656 last_modified,
657 node.sync_enabled,
658 )?;
659
660 self.persistence
662 .update_graph_node(node.id, &node.properties)?;
663
664 Ok(())
665 }
666
667 fn update_edge_from_synced(&self, edge: &SyncedEdge) -> Result<()> {
668 let vc_str = edge.vector_clock.to_json()?;
669 let last_modified = edge.last_modified_by.as_deref().unwrap_or("unknown");
670
671 self.persistence.graph_update_edge_sync_metadata(
672 edge.id,
673 &vc_str,
674 last_modified,
675 edge.sync_enabled,
676 )?;
677
678 Ok(())
679 }
680
681 fn insert_node_from_synced(&self, node: &SyncedNode) -> Result<()> {
682 let node_id = self.persistence.insert_graph_node(
684 &node.session_id,
685 node.node_type.clone(),
686 &node.label,
687 &node.properties,
688 node.embedding_id,
689 )?;
690
691 let vc_str = node.vector_clock.to_json()?;
693 let last_modified = node.last_modified_by.as_deref().unwrap_or("unknown");
694
695 self.persistence.graph_update_node_sync_metadata(
696 node_id,
697 &vc_str,
698 last_modified,
699 node.sync_enabled,
700 )?;
701
702 Ok(())
703 }
704
705 fn insert_edge_from_synced(&self, edge: &SyncedEdge) -> Result<()> {
706 let edge_id = self.persistence.insert_graph_edge(
708 &edge.session_id,
709 edge.source_id,
710 edge.target_id,
711 edge.edge_type.clone(),
712 edge.predicate.as_deref(),
713 edge.properties.as_ref(),
714 edge.weight,
715 )?;
716
717 let vc_str = edge.vector_clock.to_json()?;
719 let last_modified = edge.last_modified_by.as_deref().unwrap_or("unknown");
720
721 self.persistence.graph_update_edge_sync_metadata(
722 edge_id,
723 &vc_str,
724 last_modified,
725 edge.sync_enabled,
726 )?;
727
728 Ok(())
729 }
730}