1use chrono::{DateTime, Utc};
4use serde::{Deserialize, Serialize};
5use crate::spec_ai_knowledge_graph::{EdgeType, NodeType, VectorClock};
6
7#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
9#[serde(rename_all = "snake_case")]
10pub enum SyncType {
11 RequestFull,
13 RequestIncremental,
15 Full,
17 Incremental,
19 Ack,
21 Conflict,
23}
24
25#[derive(Debug, Clone, Serialize, Deserialize)]
27pub struct GraphSyncPayload {
28 pub sync_type: SyncType,
30 pub session_id: String,
32 pub graph_name: Option<String>,
34 pub vector_clock: VectorClock,
36 #[serde(default)]
38 pub nodes: Vec<SyncedNode>,
39 #[serde(default)]
41 pub edges: Vec<SyncedEdge>,
42 #[serde(default)]
44 pub tombstones: Vec<Tombstone>,
45 pub correlation_id: Option<String>,
47 pub conflict_info: Option<String>,
49}
50
51#[derive(Debug, Clone, Serialize, Deserialize)]
53pub struct SyncedNode {
54 pub id: i64,
56 pub session_id: String,
57 pub node_type: NodeType,
58 pub label: String,
59 pub properties: serde_json::Value,
60 pub embedding_id: Option<i64>,
61 pub created_at: DateTime<Utc>,
62 pub updated_at: DateTime<Utc>,
63
64 pub vector_clock: VectorClock,
66 pub last_modified_by: Option<String>,
67 pub is_deleted: bool,
68 pub sync_enabled: bool,
69}
70
71#[derive(Debug, Clone, Serialize, Deserialize)]
73pub struct SyncedEdge {
74 pub id: i64,
76 pub session_id: String,
77 pub source_id: i64,
78 pub target_id: i64,
79 pub edge_type: EdgeType,
80 pub predicate: Option<String>,
81 pub properties: Option<serde_json::Value>,
82 pub weight: f32,
83 pub temporal_start: Option<DateTime<Utc>>,
84 pub temporal_end: Option<DateTime<Utc>>,
85 pub created_at: DateTime<Utc>,
86
87 pub vector_clock: VectorClock,
89 pub last_modified_by: Option<String>,
90 pub is_deleted: bool,
91 pub sync_enabled: bool,
92}
93
94#[derive(Debug, Clone, Serialize, Deserialize)]
96pub struct Tombstone {
97 pub entity_type: String,
99 pub entity_id: i64,
101 pub vector_clock: VectorClock,
103 pub deleted_by: String,
105 pub deleted_at: DateTime<Utc>,
107}
108
109#[derive(Debug, Clone, Serialize, Deserialize)]
111pub struct SyncFullRequest {
112 pub session_id: String,
113 pub graph_name: Option<String>,
114 pub requesting_instance: String,
115}
116
117#[derive(Debug, Clone, Serialize, Deserialize)]
119pub struct SyncIncrementalRequest {
120 pub session_id: String,
121 pub graph_name: Option<String>,
122 pub requesting_instance: String,
123 pub since_vector_clock: VectorClock,
124}
125
126#[derive(Debug, Clone, Serialize, Deserialize)]
128pub struct SyncResponse {
129 pub session_id: String,
130 pub graph_name: Option<String>,
131 pub vector_clock: VectorClock,
132 pub nodes: Vec<SyncedNode>,
133 pub edges: Vec<SyncedEdge>,
134 pub tombstones: Vec<Tombstone>,
135 pub is_incremental: bool,
136}
137
138#[derive(Debug, Clone, Serialize, Deserialize)]
140pub struct SyncAck {
141 pub session_id: String,
142 pub graph_name: Option<String>,
143 pub vector_clock: VectorClock,
144 pub nodes_applied: usize,
145 pub edges_applied: usize,
146 pub tombstones_applied: usize,
147 pub conflicts_detected: usize,
148}
149
150#[derive(Debug, Clone, Serialize, Deserialize)]
152pub struct SyncConflict {
153 pub session_id: String,
154 pub graph_name: Option<String>,
155 pub entity_type: String,
156 pub entity_id: i64,
157 pub local_vector_clock: VectorClock,
158 pub remote_vector_clock: VectorClock,
159 pub description: String,
160}
161
162impl GraphSyncPayload {
163 pub fn request_full(
165 session_id: String,
166 graph_name: Option<String>,
167 requesting_instance: String,
168 ) -> Self {
169 let mut vector_clock = VectorClock::new();
170 vector_clock.increment(&requesting_instance);
171
172 Self {
173 sync_type: SyncType::RequestFull,
174 session_id,
175 graph_name,
176 vector_clock,
177 nodes: Vec::new(),
178 edges: Vec::new(),
179 tombstones: Vec::new(),
180 correlation_id: Some(uuid::Uuid::new_v4().to_string()),
181 conflict_info: None,
182 }
183 }
184
185 pub fn request_incremental(
187 session_id: String,
188 graph_name: Option<String>,
189 requesting_instance: String,
190 since_vector_clock: VectorClock,
191 ) -> Self {
192 let mut vector_clock = since_vector_clock.clone();
193 vector_clock.increment(&requesting_instance);
194
195 Self {
196 sync_type: SyncType::RequestIncremental,
197 session_id,
198 graph_name,
199 vector_clock,
200 nodes: Vec::new(),
201 edges: Vec::new(),
202 tombstones: Vec::new(),
203 correlation_id: Some(uuid::Uuid::new_v4().to_string()),
204 conflict_info: None,
205 }
206 }
207
208 pub fn response_full(
210 session_id: String,
211 graph_name: Option<String>,
212 vector_clock: VectorClock,
213 nodes: Vec<SyncedNode>,
214 edges: Vec<SyncedEdge>,
215 tombstones: Vec<Tombstone>,
216 correlation_id: Option<String>,
217 ) -> Self {
218 Self {
219 sync_type: SyncType::Full,
220 session_id,
221 graph_name,
222 vector_clock,
223 nodes,
224 edges,
225 tombstones,
226 correlation_id,
227 conflict_info: None,
228 }
229 }
230
231 pub fn response_incremental(
233 session_id: String,
234 graph_name: Option<String>,
235 vector_clock: VectorClock,
236 nodes: Vec<SyncedNode>,
237 edges: Vec<SyncedEdge>,
238 tombstones: Vec<Tombstone>,
239 correlation_id: Option<String>,
240 ) -> Self {
241 Self {
242 sync_type: SyncType::Incremental,
243 session_id,
244 graph_name,
245 vector_clock,
246 nodes,
247 edges,
248 tombstones,
249 correlation_id,
250 conflict_info: None,
251 }
252 }
253
254 #[allow(clippy::too_many_arguments)]
256 pub fn ack(
257 session_id: String,
258 graph_name: Option<String>,
259 vector_clock: VectorClock,
260 nodes_applied: usize,
261 edges_applied: usize,
262 tombstones_applied: usize,
263 conflicts_detected: usize,
264 correlation_id: Option<String>,
265 ) -> Self {
266 Self {
267 sync_type: SyncType::Ack,
268 session_id,
269 graph_name,
270 vector_clock,
271 nodes: Vec::new(),
272 edges: Vec::new(),
273 tombstones: Vec::new(),
274 correlation_id,
275 conflict_info: Some(format!(
276 "Applied {}/{}/{} (nodes/edges/tombstones), {} conflicts",
277 nodes_applied, edges_applied, tombstones_applied, conflicts_detected
278 )),
279 }
280 }
281
282 pub fn conflict(
284 session_id: String,
285 graph_name: Option<String>,
286 entity_type: String,
287 entity_id: i64,
288 local_vector_clock: VectorClock,
289 remote_vector_clock: VectorClock,
290 correlation_id: Option<String>,
291 ) -> Self {
292 Self {
293 sync_type: SyncType::Conflict,
294 session_id,
295 graph_name,
296 vector_clock: local_vector_clock.clone(),
297 nodes: Vec::new(),
298 edges: Vec::new(),
299 tombstones: Vec::new(),
300 correlation_id,
301 conflict_info: Some(format!(
302 "Conflict detected for {} {}: local={}, remote={}",
303 entity_type, entity_id, local_vector_clock, remote_vector_clock
304 )),
305 }
306 }
307}
308
309impl Tombstone {
310 pub fn new(
312 entity_type: String,
313 entity_id: i64,
314 vector_clock: VectorClock,
315 deleted_by: String,
316 ) -> Self {
317 Self {
318 entity_type,
319 entity_id,
320 vector_clock,
321 deleted_by,
322 deleted_at: Utc::now(),
323 }
324 }
325}