Skip to main content

spec_ai/spec_ai_graph_sync/
engine.rs

1//! Graph synchronization engine with adaptive strategy.
2
3use crate::spec_ai_graph_sync::persistence::SyncPersistence;
4use crate::spec_ai_graph_sync::protocol::{GraphSyncPayload, SyncType, SyncedEdge, SyncedNode, Tombstone};
5use crate::spec_ai_graph_sync::resolver::{ConflictResolution, ConflictResolver};
6use crate::spec_ai_graph_sync::types::{SyncedEdgeRecord, SyncedNodeRecord};
7use anyhow::Result;
8use serde::Serialize;
9use serde_json::json;
10use crate::spec_ai_knowledge_graph::{ClockOrder, EdgeType, NodeType, VectorClock};
11use std::collections::HashSet;
12
/// Change-ratio cutoff separating incremental from full sync.
///
/// `decide_sync_strategy` divides the number of recent changelog entries by the
/// node count; a ratio strictly greater than this value (0.3, i.e. 30%) selects a
/// full sync.
const INCREMENTAL_THRESHOLD: f32 = 0.3;
16
17/// Graph synchronization engine with adaptive strategy.
18pub struct SyncEngine<P: SyncPersistence> {
19    persistence: P,
20    instance_id: String,
21    resolver: ConflictResolver,
22}
23
/// Counters describing the outcome of one sync operation.
#[derive(Debug, Clone)]
pub struct SyncStats {
    /// Nodes sent to the peer. `apply_sync` leaves this at 0.
    pub nodes_sent: usize,
    /// Edges sent to the peer. `apply_sync` leaves this at 0.
    pub edges_sent: usize,
    /// Tombstones sent to the peer. `apply_sync` leaves this at 0.
    pub tombstones_sent: usize,
    /// Incoming nodes that were written locally (including conflict resolutions
    /// that accepted or merged the remote version).
    pub nodes_applied: usize,
    /// Incoming edges that were written locally (including conflict resolutions
    /// that accepted or merged the remote version).
    pub edges_applied: usize,
    /// Incoming tombstones that were applied locally.
    pub tombstones_applied: usize,
    /// Node and edge conflicts encountered while applying a payload.
    pub conflicts_detected: usize,
    /// Conflicts for which the resolver produced a resolution that was carried out.
    pub conflicts_resolved: usize,
    /// `Debug` rendering of the payload's sync type.
    pub sync_type: String,
}
37
38impl<P: SyncPersistence> SyncEngine<P> {
39    /// Create a new sync engine.
40    pub fn new(persistence: P, instance_id: String) -> Self {
41        Self {
42            persistence,
43            instance_id: instance_id.clone(),
44            resolver: ConflictResolver::new(instance_id),
45        }
46    }
47
48    /// Get a reference to the persistence layer.
49    pub fn persistence(&self) -> &P {
50        &self.persistence
51    }
52
53    /// Get the instance ID.
54    pub fn instance_id(&self) -> &str {
55        &self.instance_id
56    }
57
58    /// Get a reference to the conflict resolver.
59    pub fn resolver(&self) -> &ConflictResolver {
60        &self.resolver
61    }
62
63    /// Decide whether to use full or incremental sync based on changelog size.
64    pub async fn decide_sync_strategy(
65        &self,
66        session_id: &str,
67        graph_name: &str,
68        their_vector_clock: &VectorClock,
69    ) -> Result<SyncType> {
70        // Get our current vector clock
71        let our_vc_str = self
72            .persistence
73            .graph_sync_state_get(&self.instance_id, session_id, graph_name)?
74            .unwrap_or_else(|| "{}".to_string());
75        let our_vc = VectorClock::from_json(&our_vc_str)?;
76
77        // If they're way behind or we have no common history, do full sync
78        if their_vector_clock.is_empty() || our_vc.is_empty() {
79            return Ok(SyncType::Full);
80        }
81
82        // Count total nodes in the graph
83        let total_nodes = self.persistence.count_graph_nodes(session_id)?;
84
85        if total_nodes == 0 {
86            return Ok(SyncType::Full);
87        }
88
89        // Estimate changed nodes by checking changelog
90        // This is an approximation - in production you'd want a more precise count
91        let since_timestamp = chrono::Utc::now()
92            .checked_sub_signed(chrono::Duration::hours(24))
93            .unwrap()
94            .to_rfc3339();
95
96        let changelog_entries = self
97            .persistence
98            .graph_changelog_get_since(session_id, &since_timestamp)?;
99
100        // Calculate change ratio
101        let changed_count = changelog_entries.len();
102        let change_ratio = (changed_count as f32) / (total_nodes as f32);
103
104        if change_ratio > INCREMENTAL_THRESHOLD {
105            Ok(SyncType::Full)
106        } else {
107            Ok(SyncType::Incremental)
108        }
109    }
110
111    /// Perform a full graph sync - send entire graph.
112    pub async fn sync_full(&self, session_id: &str, graph_name: &str) -> Result<GraphSyncPayload> {
113        // Get all synced nodes and edges
114        let nodes = self
115            .persistence
116            .graph_list_nodes_with_sync(session_id, true, false)?;
117        let edges = self
118            .persistence
119            .graph_list_edges_with_sync(session_id, true, false)?;
120
121        // Get our current vector clock
122        let vc_str = self
123            .persistence
124            .graph_sync_state_get(&self.instance_id, session_id, graph_name)?
125            .unwrap_or_else(|| "{}".to_string());
126        let vector_clock = VectorClock::from_json(&vc_str)?;
127
128        // Convert to sync protocol types
129        let synced_nodes: Vec<SyncedNode> = nodes
130            .into_iter()
131            .map(|n| Self::node_record_to_synced(n))
132            .collect();
133        let synced_edges: Vec<SyncedEdge> = edges
134            .into_iter()
135            .map(|e| Self::edge_record_to_synced(e))
136            .collect();
137
138        Ok(GraphSyncPayload::response_full(
139            session_id.to_string(),
140            Some(graph_name.to_string()),
141            vector_clock,
142            synced_nodes,
143            synced_edges,
144            Vec::new(), // No tombstones in full sync
145            None,
146        ))
147    }
148
149    /// Perform incremental sync - send only changes since their vector clock.
150    pub async fn sync_incremental(
151        &self,
152        session_id: &str,
153        graph_name: &str,
154        their_vector_clock: &VectorClock,
155    ) -> Result<GraphSyncPayload> {
156        // Get our current vector clock
157        let our_vc_str = self
158            .persistence
159            .graph_sync_state_get(&self.instance_id, session_id, graph_name)?
160            .unwrap_or_else(|| "{}".to_string());
161        let our_vector_clock = VectorClock::from_json(&our_vc_str)?;
162
163        // Get changelog entries since their last sync
164        // For simplicity, we'll get recent changes and filter by vector clock
165        let since_timestamp = chrono::Utc::now()
166            .checked_sub_signed(chrono::Duration::days(7))
167            .unwrap()
168            .to_rfc3339();
169
170        let changelog = self
171            .persistence
172            .graph_changelog_get_since(session_id, &since_timestamp)?;
173
174        // Filter changelog entries that happened after their vector clock
175        let relevant_changes: Vec<_> = changelog
176            .iter()
177            .filter(|entry| {
178                if let Ok(entry_vc) = VectorClock::from_json(&entry.vector_clock) {
179                    their_vector_clock.happens_before(&entry_vc)
180                        || their_vector_clock.is_concurrent(&entry_vc)
181                } else {
182                    false
183                }
184            })
185            .collect();
186
187        // Group by entity type and ID
188        let mut node_ids: HashSet<i64> = HashSet::new();
189        let mut edge_ids: HashSet<i64> = HashSet::new();
190        let mut tombstones: Vec<Tombstone> = Vec::new();
191
192        for entry in relevant_changes {
193            match entry.entity_type.as_str() {
194                "node" => {
195                    if entry.operation == "delete" {
196                        let vc = VectorClock::from_json(&entry.vector_clock)?;
197                        tombstones.push(Tombstone::new(
198                            "node".to_string(),
199                            entry.entity_id,
200                            vc,
201                            entry.instance_id.clone(),
202                        ));
203                    } else {
204                        node_ids.insert(entry.entity_id);
205                    }
206                }
207                "edge" => {
208                    if entry.operation == "delete" {
209                        let vc = VectorClock::from_json(&entry.vector_clock)?;
210                        tombstones.push(Tombstone::new(
211                            "edge".to_string(),
212                            entry.entity_id,
213                            vc,
214                            entry.instance_id.clone(),
215                        ));
216                    } else {
217                        edge_ids.insert(entry.entity_id);
218                    }
219                }
220                _ => {}
221            }
222        }
223
224        // Fetch full entities for changed nodes/edges
225        let mut synced_nodes = Vec::new();
226        for node_id in node_ids {
227            if let Some(node) = self.persistence.graph_get_node_with_sync(node_id)? {
228                if node.sync_enabled && !node.is_deleted {
229                    synced_nodes.push(Self::node_record_to_synced(node));
230                }
231            }
232        }
233
234        let mut synced_edges = Vec::new();
235        for edge_id in edge_ids {
236            if let Some(edge) = self.persistence.graph_get_edge_with_sync(edge_id)? {
237                if edge.sync_enabled && !edge.is_deleted {
238                    synced_edges.push(Self::edge_record_to_synced(edge));
239                }
240            }
241        }
242
243        Ok(GraphSyncPayload::response_incremental(
244            session_id.to_string(),
245            Some(graph_name.to_string()),
246            our_vector_clock,
247            synced_nodes,
248            synced_edges,
249            tombstones,
250            None,
251        ))
252    }
253
254    /// Apply incoming sync payload to local graph.
255    pub async fn apply_sync(
256        &self,
257        payload: &GraphSyncPayload,
258        graph_name: &str,
259    ) -> Result<SyncStats> {
260        let mut stats = SyncStats {
261            nodes_sent: 0,
262            edges_sent: 0,
263            tombstones_sent: 0,
264            nodes_applied: 0,
265            edges_applied: 0,
266            tombstones_applied: 0,
267            conflicts_detected: 0,
268            conflicts_resolved: 0,
269            sync_type: format!("{:?}", payload.sync_type),
270        };
271
272        // Get our current vector clock
273        let our_vc_str = self
274            .persistence
275            .graph_sync_state_get(&self.instance_id, &payload.session_id, graph_name)?
276            .unwrap_or_else(|| "{}".to_string());
277        let mut our_vector_clock = VectorClock::from_json(&our_vc_str)?;
278
279        // Apply nodes
280        for node in &payload.nodes {
281            match self.apply_synced_node(node, &mut our_vector_clock).await {
282                Ok(applied) => {
283                    if applied {
284                        stats.nodes_applied += 1;
285                    }
286                }
287                Err(e) if e.to_string().contains("conflict") => {
288                    stats.conflicts_detected += 1;
289                    // Get existing node for conflict resolution
290                    let existing_node = self
291                        .persistence
292                        .graph_get_node_with_sync(node.id)?
293                        .map(|n| Self::node_record_to_synced(n));
294
295                    let resolution = self.resolver.resolve_node_conflict(
296                        node,
297                        existing_node.as_ref(),
298                        &mut our_vector_clock,
299                    );
300
301                    self.record_conflict(
302                        &node.session_id,
303                        graph_name,
304                        "node",
305                        node.id,
306                        existing_node.as_ref(),
307                        node,
308                        &our_vector_clock,
309                        resolution.as_ref().ok(),
310                    );
311
312                    // Try to resolve conflict
313                    match resolution {
314                        Ok(ConflictResolution::AcceptRemote) => {
315                            // Apply the remote version
316                            self.update_node_from_synced(node)?;
317                            stats.conflicts_resolved += 1;
318                            stats.nodes_applied += 1;
319                        }
320                        Ok(ConflictResolution::KeepLocal) => {
321                            // Keep our version, no action needed
322                            stats.conflicts_resolved += 1;
323                        }
324                        Ok(ConflictResolution::Merged(merged_value)) => {
325                            // Apply the merged version
326                            if let Ok(merged_node) =
327                                serde_json::from_value::<SyncedNode>(merged_value)
328                            {
329                                self.update_node_from_synced(&merged_node)?;
330                                stats.conflicts_resolved += 1;
331                                stats.nodes_applied += 1;
332                            }
333                        }
334                        Ok(ConflictResolution::RequiresManualReview) => {
335                            tracing::warn!("Node {} conflict requires manual review", node.id);
336                            // Don't count as resolved
337                        }
338                        Err(e) => {
339                            tracing::warn!(
340                                "Failed to resolve conflict for node {}: {}",
341                                node.id,
342                                e
343                            );
344                        }
345                    }
346                }
347                Err(e) => {
348                    tracing::warn!("Failed to apply node {}: {}", node.id, e);
349                }
350            }
351        }
352
353        // Apply edges
354        for edge in &payload.edges {
355            match self.apply_synced_edge(edge, &mut our_vector_clock).await {
356                Ok(applied) => {
357                    if applied {
358                        stats.edges_applied += 1;
359                    }
360                }
361                Err(e) if e.to_string().contains("conflict") => {
362                    stats.conflicts_detected += 1;
363                    // Get existing edge for conflict resolution
364                    let existing_edge = self
365                        .persistence
366                        .graph_get_edge_with_sync(edge.id)?
367                        .map(|e| Self::edge_record_to_synced(e));
368
369                    let resolution = self.resolver.resolve_edge_conflict(
370                        edge,
371                        existing_edge.as_ref(),
372                        &mut our_vector_clock,
373                    );
374
375                    self.record_conflict(
376                        &edge.session_id,
377                        graph_name,
378                        "edge",
379                        edge.id,
380                        existing_edge.as_ref(),
381                        edge,
382                        &our_vector_clock,
383                        resolution.as_ref().ok(),
384                    );
385
386                    // Try to resolve conflict
387                    match resolution {
388                        Ok(ConflictResolution::AcceptRemote) => {
389                            // Apply the remote version
390                            self.update_edge_from_synced(edge)?;
391                            stats.conflicts_resolved += 1;
392                            stats.edges_applied += 1;
393                        }
394                        Ok(ConflictResolution::KeepLocal) => {
395                            // Keep our version, no action needed
396                            stats.conflicts_resolved += 1;
397                        }
398                        Ok(ConflictResolution::Merged(merged_value)) => {
399                            // Apply the merged version
400                            if let Ok(merged_edge) =
401                                serde_json::from_value::<SyncedEdge>(merged_value)
402                            {
403                                self.update_edge_from_synced(&merged_edge)?;
404                                stats.conflicts_resolved += 1;
405                                stats.edges_applied += 1;
406                            }
407                        }
408                        Ok(ConflictResolution::RequiresManualReview) => {
409                            tracing::warn!("Edge {} conflict requires manual review", edge.id);
410                            // Don't count as resolved
411                        }
412                        Err(e) => {
413                            tracing::warn!(
414                                "Failed to resolve conflict for edge {}: {}",
415                                edge.id,
416                                e
417                            );
418                        }
419                    }
420                }
421                Err(e) => {
422                    tracing::warn!("Failed to apply edge {}: {}", edge.id, e);
423                }
424            }
425        }
426
427        // Apply tombstones
428        for tombstone in &payload.tombstones {
429            match self.apply_tombstone(tombstone, &mut our_vector_clock).await {
430                Ok(applied) => {
431                    if applied {
432                        stats.tombstones_applied += 1;
433                    }
434                }
435                Err(e) => {
436                    tracing::warn!(
437                        "Failed to apply tombstone for {} {}: {}",
438                        tombstone.entity_type,
439                        tombstone.entity_id,
440                        e
441                    );
442                }
443            }
444        }
445
446        // Merge their vector clock into ours
447        our_vector_clock.merge(&payload.vector_clock);
448
449        // Update our sync state
450        let updated_vc_str = our_vector_clock.to_json()?;
451        self.persistence.graph_sync_state_update(
452            &self.instance_id,
453            &payload.session_id,
454            graph_name,
455            &updated_vc_str,
456        )?;
457
458        Ok(stats)
459    }
460
461    #[allow(clippy::too_many_arguments)]
462    fn record_conflict<V: Serialize>(
463        &self,
464        session_id: &str,
465        graph_name: &str,
466        entity_type: &str,
467        entity_id: i64,
468        local_version: Option<&V>,
469        remote_version: &V,
470        vector_clock: &VectorClock,
471        resolution: Option<&ConflictResolution>,
472    ) {
473        let vc_json = match vector_clock.to_json() {
474            Ok(vc) => vc,
475            Err(e) => {
476                tracing::warn!("Failed to serialize vector clock for conflict log: {}", e);
477                return;
478            }
479        };
480
481        let local_json = local_version.and_then(|v| serde_json::to_value(v).ok());
482        let remote_json = serde_json::to_value(remote_version).ok();
483
484        let data = json!({
485            "graph_name": graph_name,
486            "resolution": resolution.map(|r| format!("{:?}", r)),
487            "local_version": local_json,
488            "remote_version": remote_json,
489        })
490        .to_string();
491
492        if let Err(e) = self.persistence.graph_changelog_append(
493            session_id,
494            &self.instance_id,
495            entity_type,
496            entity_id,
497            "conflict",
498            &vc_json,
499            Some(&data),
500        ) {
501            tracing::warn!(
502                "Failed to append conflict log for {} {}: {}",
503                entity_type,
504                entity_id,
505                e
506            );
507        }
508    }
509
510    /// Apply a single synced node with conflict detection.
511    async fn apply_synced_node(
512        &self,
513        node: &SyncedNode,
514        our_vector_clock: &mut VectorClock,
515    ) -> Result<bool> {
516        // Check if node exists locally
517        let existing = self.persistence.graph_get_node_with_sync(node.id)?;
518
519        if let Some(existing_node) = existing {
520            // Node exists - check for conflicts
521            let existing_vc = VectorClock::from_json(&existing_node.vector_clock)?;
522            let incoming_vc = &node.vector_clock;
523
524            match incoming_vc.compare(&existing_vc) {
525                ClockOrder::After => {
526                    // Incoming is newer, apply it
527                    self.update_node_from_synced(node)?;
528                    our_vector_clock.merge(incoming_vc);
529                    Ok(true)
530                }
531                ClockOrder::Before | ClockOrder::Equal => {
532                    // Our version is newer or equal, skip
533                    Ok(false)
534                }
535                ClockOrder::Concurrent => {
536                    // Conflict - let resolver handle it
537                    anyhow::bail!("conflict detected for node {}", node.id);
538                }
539            }
540        } else {
541            // Node doesn't exist, insert it
542            self.insert_node_from_synced(node)?;
543            our_vector_clock.merge(&node.vector_clock);
544            Ok(true)
545        }
546    }
547
548    /// Apply a single synced edge with conflict detection.
549    async fn apply_synced_edge(
550        &self,
551        edge: &SyncedEdge,
552        our_vector_clock: &mut VectorClock,
553    ) -> Result<bool> {
554        let existing = self.persistence.graph_get_edge_with_sync(edge.id)?;
555
556        if let Some(existing_edge) = existing {
557            let existing_vc = VectorClock::from_json(&existing_edge.vector_clock)?;
558            let incoming_vc = &edge.vector_clock;
559
560            match incoming_vc.compare(&existing_vc) {
561                ClockOrder::After => {
562                    self.update_edge_from_synced(edge)?;
563                    our_vector_clock.merge(incoming_vc);
564                    Ok(true)
565                }
566                ClockOrder::Before | ClockOrder::Equal => Ok(false),
567                ClockOrder::Concurrent => {
568                    anyhow::bail!("conflict detected for edge {}", edge.id);
569                }
570            }
571        } else {
572            self.insert_edge_from_synced(edge)?;
573            our_vector_clock.merge(&edge.vector_clock);
574            Ok(true)
575        }
576    }
577
578    /// Apply a tombstone (deleted entity).
579    async fn apply_tombstone(
580        &self,
581        tombstone: &Tombstone,
582        our_vector_clock: &mut VectorClock,
583    ) -> Result<bool> {
584        let vc_str = tombstone.vector_clock.to_json()?;
585
586        match tombstone.entity_type.as_str() {
587            "node" => {
588                self.persistence.graph_mark_node_deleted(
589                    tombstone.entity_id,
590                    &vc_str,
591                    &tombstone.deleted_by,
592                )?;
593            }
594            "edge" => {
595                self.persistence.graph_mark_edge_deleted(
596                    tombstone.entity_id,
597                    &vc_str,
598                    &tombstone.deleted_by,
599                )?;
600            }
601            _ => {
602                anyhow::bail!("unknown entity type: {}", tombstone.entity_type);
603            }
604        }
605
606        our_vector_clock.merge(&tombstone.vector_clock);
607        Ok(true)
608    }
609
610    // Helper methods for converting between record types
611
612    fn node_record_to_synced(record: SyncedNodeRecord) -> SyncedNode {
613        SyncedNode {
614            id: record.id,
615            session_id: record.session_id,
616            node_type: NodeType::from_str(&record.node_type),
617            label: record.label,
618            properties: record.properties,
619            embedding_id: record.embedding_id,
620            created_at: record.created_at,
621            updated_at: record.updated_at,
622            vector_clock: VectorClock::from_json(&record.vector_clock).unwrap_or_default(),
623            last_modified_by: record.last_modified_by,
624            is_deleted: record.is_deleted,
625            sync_enabled: record.sync_enabled,
626        }
627    }
628
629    fn edge_record_to_synced(record: SyncedEdgeRecord) -> SyncedEdge {
630        SyncedEdge {
631            id: record.id,
632            session_id: record.session_id,
633            source_id: record.source_id,
634            target_id: record.target_id,
635            edge_type: EdgeType::from_str(&record.edge_type),
636            predicate: record.predicate,
637            properties: record.properties,
638            weight: record.weight,
639            temporal_start: record.temporal_start,
640            temporal_end: record.temporal_end,
641            created_at: record.created_at,
642            vector_clock: VectorClock::from_json(&record.vector_clock).unwrap_or_default(),
643            last_modified_by: record.last_modified_by,
644            is_deleted: record.is_deleted,
645            sync_enabled: record.sync_enabled,
646        }
647    }
648
649    fn update_node_from_synced(&self, node: &SyncedNode) -> Result<()> {
650        let vc_str = node.vector_clock.to_json()?;
651        let last_modified = node.last_modified_by.as_deref().unwrap_or("unknown");
652
653        self.persistence.graph_update_node_sync_metadata(
654            node.id,
655            &vc_str,
656            last_modified,
657            node.sync_enabled,
658        )?;
659
660        // Also update the node properties
661        self.persistence
662            .update_graph_node(node.id, &node.properties)?;
663
664        Ok(())
665    }
666
667    fn update_edge_from_synced(&self, edge: &SyncedEdge) -> Result<()> {
668        let vc_str = edge.vector_clock.to_json()?;
669        let last_modified = edge.last_modified_by.as_deref().unwrap_or("unknown");
670
671        self.persistence.graph_update_edge_sync_metadata(
672            edge.id,
673            &vc_str,
674            last_modified,
675            edge.sync_enabled,
676        )?;
677
678        Ok(())
679    }
680
681    fn insert_node_from_synced(&self, node: &SyncedNode) -> Result<()> {
682        // Insert the node first
683        let node_id = self.persistence.insert_graph_node(
684            &node.session_id,
685            node.node_type.clone(),
686            &node.label,
687            &node.properties,
688            node.embedding_id,
689        )?;
690
691        // Then update its sync metadata
692        let vc_str = node.vector_clock.to_json()?;
693        let last_modified = node.last_modified_by.as_deref().unwrap_or("unknown");
694
695        self.persistence.graph_update_node_sync_metadata(
696            node_id,
697            &vc_str,
698            last_modified,
699            node.sync_enabled,
700        )?;
701
702        Ok(())
703    }
704
705    fn insert_edge_from_synced(&self, edge: &SyncedEdge) -> Result<()> {
706        // Insert the edge first
707        let edge_id = self.persistence.insert_graph_edge(
708            &edge.session_id,
709            edge.source_id,
710            edge.target_id,
711            edge.edge_type.clone(),
712            edge.predicate.as_deref(),
713            edge.properties.as_ref(),
714            edge.weight,
715        )?;
716
717        // Then update its sync metadata
718        let vc_str = edge.vector_clock.to_json()?;
719        let last_modified = edge.last_modified_by.as_deref().unwrap_or("unknown");
720
721        self.persistence.graph_update_edge_sync_metadata(
722            edge_id,
723            &vc_str,
724            last_modified,
725            edge.sync_enabled,
726        )?;
727
728        Ok(())
729    }
730}