Skip to main content

spec_ai/spec_ai_graph_sync/
protocol.rs

1//! Synchronization protocol types for graph synchronization.
2
3use chrono::{DateTime, Utc};
4use serde::{Deserialize, Serialize};
5use crate::spec_ai_knowledge_graph::{EdgeType, NodeType, VectorClock};
6
7/// Type of graph synchronization operation
8#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
9#[serde(rename_all = "snake_case")]
10pub enum SyncType {
11    /// Request complete graph snapshot
12    RequestFull,
13    /// Request incremental changes since given vector clock
14    RequestIncremental,
15    /// Full graph snapshot response
16    Full,
17    /// Incremental delta update response
18    Incremental,
19    /// Acknowledgment of received sync
20    Ack,
21    /// Conflict notification requiring resolution
22    Conflict,
23}
24
25/// Main payload for MessageType::GraphSync messages
26#[derive(Debug, Clone, Serialize, Deserialize)]
27pub struct GraphSyncPayload {
28    /// Type of sync operation
29    pub sync_type: SyncType,
30    /// Session ID for the graph being synced
31    pub session_id: String,
32    /// Graph name (from graph_metadata)
33    pub graph_name: Option<String>,
34    /// Vector clock representing the state of this sync
35    pub vector_clock: VectorClock,
36    /// Nodes to sync (empty for requests)
37    #[serde(default)]
38    pub nodes: Vec<SyncedNode>,
39    /// Edges to sync (empty for requests)
40    #[serde(default)]
41    pub edges: Vec<SyncedEdge>,
42    /// Tombstones for deleted entities
43    #[serde(default)]
44    pub tombstones: Vec<Tombstone>,
45    /// Optional correlation ID for request/response matching
46    pub correlation_id: Option<String>,
47    /// For Conflict type: description of the conflict
48    pub conflict_info: Option<String>,
49}
50
51/// Graph node with sync metadata
52#[derive(Debug, Clone, Serialize, Deserialize)]
53pub struct SyncedNode {
54    /// Core node data
55    pub id: i64,
56    pub session_id: String,
57    pub node_type: NodeType,
58    pub label: String,
59    pub properties: serde_json::Value,
60    pub embedding_id: Option<i64>,
61    pub created_at: DateTime<Utc>,
62    pub updated_at: DateTime<Utc>,
63
64    /// Sync metadata
65    pub vector_clock: VectorClock,
66    pub last_modified_by: Option<String>,
67    pub is_deleted: bool,
68    pub sync_enabled: bool,
69}
70
71/// Graph edge with sync metadata
72#[derive(Debug, Clone, Serialize, Deserialize)]
73pub struct SyncedEdge {
74    /// Core edge data
75    pub id: i64,
76    pub session_id: String,
77    pub source_id: i64,
78    pub target_id: i64,
79    pub edge_type: EdgeType,
80    pub predicate: Option<String>,
81    pub properties: Option<serde_json::Value>,
82    pub weight: f32,
83    pub temporal_start: Option<DateTime<Utc>>,
84    pub temporal_end: Option<DateTime<Utc>>,
85    pub created_at: DateTime<Utc>,
86
87    /// Sync metadata
88    pub vector_clock: VectorClock,
89    pub last_modified_by: Option<String>,
90    pub is_deleted: bool,
91    pub sync_enabled: bool,
92}
93
94/// Tombstone for tracking deleted entities
95#[derive(Debug, Clone, Serialize, Deserialize)]
96pub struct Tombstone {
97    /// Type of entity: 'node' or 'edge'
98    pub entity_type: String,
99    /// ID of the deleted entity
100    pub entity_id: i64,
101    /// Vector clock at time of deletion
102    pub vector_clock: VectorClock,
103    /// Instance that performed the deletion
104    pub deleted_by: String,
105    /// When the deletion occurred
106    pub deleted_at: DateTime<Utc>,
107}
108
109/// Request for full graph sync
110#[derive(Debug, Clone, Serialize, Deserialize)]
111pub struct SyncFullRequest {
112    pub session_id: String,
113    pub graph_name: Option<String>,
114    pub requesting_instance: String,
115}
116
117/// Request for incremental sync since a given vector clock
118#[derive(Debug, Clone, Serialize, Deserialize)]
119pub struct SyncIncrementalRequest {
120    pub session_id: String,
121    pub graph_name: Option<String>,
122    pub requesting_instance: String,
123    pub since_vector_clock: VectorClock,
124}
125
126/// Response containing graph data
127#[derive(Debug, Clone, Serialize, Deserialize)]
128pub struct SyncResponse {
129    pub session_id: String,
130    pub graph_name: Option<String>,
131    pub vector_clock: VectorClock,
132    pub nodes: Vec<SyncedNode>,
133    pub edges: Vec<SyncedEdge>,
134    pub tombstones: Vec<Tombstone>,
135    pub is_incremental: bool,
136}
137
138/// Acknowledgment of successful sync
139#[derive(Debug, Clone, Serialize, Deserialize)]
140pub struct SyncAck {
141    pub session_id: String,
142    pub graph_name: Option<String>,
143    pub vector_clock: VectorClock,
144    pub nodes_applied: usize,
145    pub edges_applied: usize,
146    pub tombstones_applied: usize,
147    pub conflicts_detected: usize,
148}
149
150/// Conflict notification
151#[derive(Debug, Clone, Serialize, Deserialize)]
152pub struct SyncConflict {
153    pub session_id: String,
154    pub graph_name: Option<String>,
155    pub entity_type: String,
156    pub entity_id: i64,
157    pub local_vector_clock: VectorClock,
158    pub remote_vector_clock: VectorClock,
159    pub description: String,
160}
161
162impl GraphSyncPayload {
163    /// Create a full sync request
164    pub fn request_full(
165        session_id: String,
166        graph_name: Option<String>,
167        requesting_instance: String,
168    ) -> Self {
169        let mut vector_clock = VectorClock::new();
170        vector_clock.increment(&requesting_instance);
171
172        Self {
173            sync_type: SyncType::RequestFull,
174            session_id,
175            graph_name,
176            vector_clock,
177            nodes: Vec::new(),
178            edges: Vec::new(),
179            tombstones: Vec::new(),
180            correlation_id: Some(uuid::Uuid::new_v4().to_string()),
181            conflict_info: None,
182        }
183    }
184
185    /// Create an incremental sync request
186    pub fn request_incremental(
187        session_id: String,
188        graph_name: Option<String>,
189        requesting_instance: String,
190        since_vector_clock: VectorClock,
191    ) -> Self {
192        let mut vector_clock = since_vector_clock.clone();
193        vector_clock.increment(&requesting_instance);
194
195        Self {
196            sync_type: SyncType::RequestIncremental,
197            session_id,
198            graph_name,
199            vector_clock,
200            nodes: Vec::new(),
201            edges: Vec::new(),
202            tombstones: Vec::new(),
203            correlation_id: Some(uuid::Uuid::new_v4().to_string()),
204            conflict_info: None,
205        }
206    }
207
208    /// Create a full sync response
209    pub fn response_full(
210        session_id: String,
211        graph_name: Option<String>,
212        vector_clock: VectorClock,
213        nodes: Vec<SyncedNode>,
214        edges: Vec<SyncedEdge>,
215        tombstones: Vec<Tombstone>,
216        correlation_id: Option<String>,
217    ) -> Self {
218        Self {
219            sync_type: SyncType::Full,
220            session_id,
221            graph_name,
222            vector_clock,
223            nodes,
224            edges,
225            tombstones,
226            correlation_id,
227            conflict_info: None,
228        }
229    }
230
231    /// Create an incremental sync response
232    pub fn response_incremental(
233        session_id: String,
234        graph_name: Option<String>,
235        vector_clock: VectorClock,
236        nodes: Vec<SyncedNode>,
237        edges: Vec<SyncedEdge>,
238        tombstones: Vec<Tombstone>,
239        correlation_id: Option<String>,
240    ) -> Self {
241        Self {
242            sync_type: SyncType::Incremental,
243            session_id,
244            graph_name,
245            vector_clock,
246            nodes,
247            edges,
248            tombstones,
249            correlation_id,
250            conflict_info: None,
251        }
252    }
253
254    /// Create an acknowledgment
255    #[allow(clippy::too_many_arguments)]
256    pub fn ack(
257        session_id: String,
258        graph_name: Option<String>,
259        vector_clock: VectorClock,
260        nodes_applied: usize,
261        edges_applied: usize,
262        tombstones_applied: usize,
263        conflicts_detected: usize,
264        correlation_id: Option<String>,
265    ) -> Self {
266        Self {
267            sync_type: SyncType::Ack,
268            session_id,
269            graph_name,
270            vector_clock,
271            nodes: Vec::new(),
272            edges: Vec::new(),
273            tombstones: Vec::new(),
274            correlation_id,
275            conflict_info: Some(format!(
276                "Applied {}/{}/{} (nodes/edges/tombstones), {} conflicts",
277                nodes_applied, edges_applied, tombstones_applied, conflicts_detected
278            )),
279        }
280    }
281
282    /// Create a conflict notification
283    pub fn conflict(
284        session_id: String,
285        graph_name: Option<String>,
286        entity_type: String,
287        entity_id: i64,
288        local_vector_clock: VectorClock,
289        remote_vector_clock: VectorClock,
290        correlation_id: Option<String>,
291    ) -> Self {
292        Self {
293            sync_type: SyncType::Conflict,
294            session_id,
295            graph_name,
296            vector_clock: local_vector_clock.clone(),
297            nodes: Vec::new(),
298            edges: Vec::new(),
299            tombstones: Vec::new(),
300            correlation_id,
301            conflict_info: Some(format!(
302                "Conflict detected for {} {}: local={}, remote={}",
303                entity_type, entity_id, local_vector_clock, remote_vector_clock
304            )),
305        }
306    }
307}
308
309impl Tombstone {
310    /// Create a new tombstone for a deleted entity
311    pub fn new(
312        entity_type: String,
313        entity_id: i64,
314        vector_clock: VectorClock,
315        deleted_by: String,
316    ) -> Self {
317        Self {
318            entity_type,
319            entity_id,
320            vector_clock,
321            deleted_by,
322            deleted_at: Utc::now(),
323        }
324    }
325}