1use crate::spec_ai_knowledge_graph::types::{EdgeType, GraphEdge, GraphNode, GraphPath, NodeType, TraversalDirection};
2use crate::spec_ai_knowledge_graph::vector_clock::VectorClock;
3use anyhow::Result;
4use chrono::{DateTime, Duration, Utc};
5use duckdb::{Connection, params};
6use serde_json::{Map, Value as JsonValue};
7use std::collections::{HashMap, HashSet, VecDeque};
8use std::sync::{Arc, Mutex, MutexGuard};
9
10#[derive(Clone)]
11pub struct KnowledgeGraphStore {
12 conn: Arc<Mutex<Connection>>,
13 instance_id: String,
14}
15
16impl KnowledgeGraphStore {
17 pub fn new(conn: Arc<Mutex<Connection>>, instance_id: impl Into<String>) -> Self {
18 Self {
19 conn,
20 instance_id: instance_id.into(),
21 }
22 }
23
24 pub fn from_connection(conn: Connection, instance_id: impl Into<String>) -> Self {
25 Self {
26 conn: Arc::new(Mutex::new(conn)),
27 instance_id: instance_id.into(),
28 }
29 }
30
31 pub fn instance_id(&self) -> &str {
32 &self.instance_id
33 }
34
35 fn conn(&self) -> MutexGuard<'_, Connection> {
36 self.conn.lock().expect("database connection poisoned")
37 }
38
39 pub fn insert_graph_node(
42 &self,
43 session_id: &str,
44 node_type: NodeType,
45 label: &str,
46 properties: &JsonValue,
47 embedding_id: Option<i64>,
48 ) -> Result<i64> {
49 let sync_enabled = self
50 .graph_get_sync_enabled(session_id, "default")
51 .unwrap_or(false);
52
53 let mut vector_clock = VectorClock::new();
54 vector_clock.increment(&self.instance_id);
55 let vc_json = vector_clock.to_json()?;
56
57 let conn = self.conn();
58
59 let mut stmt = conn.prepare(
60 "INSERT INTO graph_nodes (session_id, node_type, label, properties, embedding_id,
61 vector_clock, last_modified_by, sync_enabled)
62 VALUES (?, ?, ?, ?, ?, ?, ?, ?) RETURNING id",
63 )?;
64 let id: i64 = stmt.query_row(
65 params![
66 session_id,
67 node_type.as_str(),
68 label,
69 properties.to_string(),
70 embedding_id,
71 vc_json,
72 self.instance_id,
73 sync_enabled,
74 ],
75 |row| row.get(0),
76 )?;
77
78 if sync_enabled {
79 let node_data = serde_json::json!({
80 "id": id,
81 "session_id": session_id,
82 "node_type": node_type.as_str(),
83 "label": label,
84 "properties": properties,
85 "embedding_id": embedding_id,
86 });
87
88 self.graph_changelog_append(
89 session_id,
90 &self.instance_id,
91 "node",
92 id,
93 "create",
94 &vc_json,
95 Some(&node_data.to_string()),
96 )?;
97 }
98
99 Ok(id)
100 }
101
102 pub fn get_graph_node(&self, node_id: i64) -> Result<Option<GraphNode>> {
103 let conn = self.conn();
104 let mut stmt = conn.prepare(
105 "SELECT id, session_id, node_type, label, properties, embedding_id,
106 CAST(created_at AS TEXT), CAST(updated_at AS TEXT)
107 FROM graph_nodes WHERE id = ?",
108 )?;
109 let mut rows = stmt.query(params![node_id])?;
110 if let Some(row) = rows.next()? {
111 Ok(Some(Self::row_to_graph_node(row)?))
112 } else {
113 Ok(None)
114 }
115 }
116
117 pub fn list_graph_nodes(
118 &self,
119 session_id: &str,
120 node_type: Option<NodeType>,
121 limit: Option<i64>,
122 ) -> Result<Vec<GraphNode>> {
123 let conn = self.conn();
124
125 let nodes = if let Some(nt) = node_type {
126 let mut stmt = conn.prepare(
127 "SELECT id, session_id, node_type, label, properties, embedding_id,
128 CAST(created_at AS TEXT), CAST(updated_at AS TEXT)
129 FROM graph_nodes WHERE session_id = ? AND node_type = ?
130 ORDER BY id DESC LIMIT ?",
131 )?;
132 let query = stmt.query(params![session_id, nt.as_str(), limit.unwrap_or(100)])?;
133 Self::collect_graph_nodes(query)?
134 } else {
135 let mut stmt = conn.prepare(
136 "SELECT id, session_id, node_type, label, properties, embedding_id,
137 CAST(created_at AS TEXT), CAST(updated_at AS TEXT)
138 FROM graph_nodes WHERE session_id = ?
139 ORDER BY id DESC LIMIT ?",
140 )?;
141 let query = stmt.query(params![session_id, limit.unwrap_or(100)])?;
142 Self::collect_graph_nodes(query)?
143 };
144
145 Ok(nodes)
146 }
147
148 pub fn count_graph_nodes(&self, session_id: &str) -> Result<i64> {
149 let conn = self.conn();
150 let mut stmt = conn.prepare("SELECT COUNT(*) FROM graph_nodes WHERE session_id = ?")?;
151 let count: i64 = stmt.query_row(params![session_id], |row| row.get(0))?;
152 Ok(count)
153 }
154
155 pub fn update_graph_node(&self, node_id: i64, properties: &JsonValue) -> Result<()> {
156 let conn = self.conn();
157
158 let mut stmt = conn.prepare(
159 "SELECT session_id, node_type, label, vector_clock, sync_enabled
160 FROM graph_nodes WHERE id = ?",
161 )?;
162
163 let (session_id, node_type, label, current_vc_json, sync_enabled): (
164 String,
165 String,
166 String,
167 Option<String>,
168 bool,
169 ) = stmt.query_row(params![node_id], |row| {
170 Ok((
171 row.get(0)?,
172 row.get(1)?,
173 row.get(2)?,
174 row.get(3)?,
175 row.get(4).unwrap_or(false),
176 ))
177 })?;
178
179 let mut vector_clock = if let Some(vc_json) = current_vc_json {
180 VectorClock::from_json(&vc_json).unwrap_or_else(|_| VectorClock::new())
181 } else {
182 VectorClock::new()
183 };
184 vector_clock.increment(&self.instance_id);
185 let vc_json = vector_clock.to_json()?;
186
187 conn.execute(
188 "UPDATE graph_nodes
189 SET properties = ?,
190 vector_clock = ?,
191 last_modified_by = ?,
192 updated_at = CURRENT_TIMESTAMP
193 WHERE id = ?",
194 params![properties.to_string(), vc_json, self.instance_id, node_id],
195 )?;
196
197 if sync_enabled {
198 let node_data = serde_json::json!({
199 "id": node_id,
200 "session_id": session_id,
201 "node_type": node_type,
202 "label": label,
203 "properties": properties,
204 });
205
206 self.graph_changelog_append(
207 &session_id,
208 &self.instance_id,
209 "node",
210 node_id,
211 "update",
212 &vc_json,
213 Some(&node_data.to_string()),
214 )?;
215 }
216
217 Ok(())
218 }
219
220 pub fn delete_graph_node(&self, node_id: i64) -> Result<()> {
221 let conn = self.conn();
222
223 let mut stmt = conn.prepare(
224 "SELECT session_id, node_type, label, properties, vector_clock, sync_enabled
225 FROM graph_nodes WHERE id = ?",
226 )?;
227
228 let result = stmt.query_row(params![node_id], |row| {
229 Ok((
230 row.get::<_, String>(0)?,
231 row.get::<_, String>(1)?,
232 row.get::<_, String>(2)?,
233 row.get::<_, String>(3)?,
234 row.get::<_, Option<String>>(4)?,
235 row.get::<_, bool>(5).unwrap_or(false),
236 ))
237 });
238
239 if let Ok((session_id, node_type, label, properties, current_vc_json, sync_enabled)) =
240 result
241 {
242 if sync_enabled {
243 let mut vector_clock = if let Some(vc_json) = current_vc_json {
244 VectorClock::from_json(&vc_json).unwrap_or_else(|_| VectorClock::new())
245 } else {
246 VectorClock::new()
247 };
248 vector_clock.increment(&self.instance_id);
249 let vc_json = vector_clock.to_json()?;
250
251 conn.execute(
252 "INSERT INTO graph_tombstones
253 (session_id, entity_type, entity_id, deleted_by, vector_clock)
254 VALUES (?, ?, ?, ?, ?)",
255 params![session_id, "node", node_id, self.instance_id, vc_json],
256 )?;
257
258 let node_data = serde_json::json!({
259 "id": node_id,
260 "session_id": session_id,
261 "node_type": node_type,
262 "label": label,
263 "properties": properties,
264 });
265
266 self.graph_changelog_append(
267 &session_id,
268 &self.instance_id,
269 "node",
270 node_id,
271 "delete",
272 &vc_json,
273 Some(&node_data.to_string()),
274 )?;
275 }
276 }
277
278 conn.execute("DELETE FROM graph_nodes WHERE id = ?", params![node_id])?;
279 Ok(())
280 }
281
282 #[allow(clippy::too_many_arguments)]
285 pub fn insert_graph_edge(
286 &self,
287 session_id: &str,
288 source_id: i64,
289 target_id: i64,
290 edge_type: EdgeType,
291 predicate: Option<&str>,
292 properties: Option<&JsonValue>,
293 weight: f32,
294 ) -> Result<i64> {
295 let sync_enabled = self
296 .graph_get_sync_enabled(session_id, "default")
297 .unwrap_or(false);
298
299 let mut vector_clock = VectorClock::new();
300 vector_clock.increment(&self.instance_id);
301 let vc_json = vector_clock.to_json()?;
302
303 let conn = self.conn();
304
305 let mut stmt = conn.prepare(
306 "INSERT INTO graph_edges (session_id, source_id, target_id, edge_type, predicate, properties, weight,
307 vector_clock, last_modified_by, sync_enabled)
308 VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?) RETURNING id",
309 )?;
310 let props_str = properties.map(|p| p.to_string());
311 let id: i64 = stmt.query_row(
312 params![
313 session_id,
314 source_id,
315 target_id,
316 edge_type.as_str(),
317 predicate,
318 props_str,
319 weight,
320 vc_json,
321 self.instance_id,
322 sync_enabled,
323 ],
324 |row| row.get(0),
325 )?;
326
327 if sync_enabled {
328 let edge_data = serde_json::json!({
329 "id": id,
330 "session_id": session_id,
331 "source_id": source_id,
332 "target_id": target_id,
333 "edge_type": edge_type.as_str(),
334 "predicate": predicate,
335 "properties": properties,
336 "weight": weight,
337 });
338
339 self.graph_changelog_append(
340 session_id,
341 &self.instance_id,
342 "edge",
343 id,
344 "insert",
345 &vc_json,
346 Some(&edge_data.to_string()),
347 )?;
348 }
349
350 Ok(id)
351 }
352
353 pub fn get_graph_edge(&self, edge_id: i64) -> Result<Option<GraphEdge>> {
354 let conn = self.conn();
355 let mut stmt = conn.prepare(
356 "SELECT id, session_id, source_id, target_id, edge_type, predicate, properties, weight,
357 CAST(temporal_start AS TEXT), CAST(temporal_end AS TEXT), CAST(created_at AS TEXT)
358 FROM graph_edges WHERE id = ?",
359 )?;
360 let mut rows = stmt.query(params![edge_id])?;
361 if let Some(row) = rows.next()? {
362 Ok(Some(Self::row_to_graph_edge(row)?))
363 } else {
364 Ok(None)
365 }
366 }
367
368 pub fn list_graph_edges(
369 &self,
370 session_id: &str,
371 source_id: Option<i64>,
372 target_id: Option<i64>,
373 ) -> Result<Vec<GraphEdge>> {
374 let conn = self.conn();
375
376 let edges = match (source_id, target_id) {
377 (Some(src), Some(tgt)) => {
378 let mut stmt = conn.prepare(
379 "SELECT id, session_id, source_id, target_id, edge_type, predicate, properties, weight,
380 CAST(temporal_start AS TEXT), CAST(temporal_end AS TEXT), CAST(created_at AS TEXT)
381 FROM graph_edges WHERE session_id = ? AND source_id = ? AND target_id = ?",
382 )?;
383 let query = stmt.query(params![session_id, src, tgt])?;
384 Self::collect_graph_edges(query)?
385 }
386 (Some(src), None) => {
387 let mut stmt = conn.prepare(
388 "SELECT id, session_id, source_id, target_id, edge_type, predicate, properties, weight,
389 CAST(temporal_start AS TEXT), CAST(temporal_end AS TEXT), CAST(created_at AS TEXT)
390 FROM graph_edges WHERE session_id = ? AND source_id = ?",
391 )?;
392 let query = stmt.query(params![session_id, src])?;
393 Self::collect_graph_edges(query)?
394 }
395 (None, Some(tgt)) => {
396 let mut stmt = conn.prepare(
397 "SELECT id, session_id, source_id, target_id, edge_type, predicate, properties, weight,
398 CAST(temporal_start AS TEXT), CAST(temporal_end AS TEXT), CAST(created_at AS TEXT)
399 FROM graph_edges WHERE session_id = ? AND target_id = ?",
400 )?;
401 let query = stmt.query(params![session_id, tgt])?;
402 Self::collect_graph_edges(query)?
403 }
404 (None, None) => {
405 let mut stmt = conn.prepare(
406 "SELECT id, session_id, source_id, target_id, edge_type, predicate, properties, weight,
407 CAST(temporal_start AS TEXT), CAST(temporal_end AS TEXT), CAST(created_at AS TEXT)
408 FROM graph_edges WHERE session_id = ?",
409 )?;
410 let query = stmt.query(params![session_id])?;
411 Self::collect_graph_edges(query)?
412 }
413 };
414
415 Ok(edges)
416 }
417
418 pub fn count_graph_edges(&self, session_id: &str) -> Result<i64> {
419 let conn = self.conn();
420 let mut stmt = conn.prepare("SELECT COUNT(*) FROM graph_edges WHERE session_id = ?")?;
421 let count: i64 = stmt.query_row(params![session_id], |row| row.get(0))?;
422 Ok(count)
423 }
424
425 pub fn delete_graph_edge(&self, edge_id: i64) -> Result<()> {
426 let conn = self.conn();
427
428 let mut stmt = conn.prepare(
429 "SELECT session_id, source_id, target_id, edge_type, predicate, properties, weight,
430 vector_clock, sync_enabled
431 FROM graph_edges WHERE id = ?",
432 )?;
433
434 let result = stmt.query_row(params![edge_id], |row| {
435 Ok((
436 row.get::<_, String>(0)?,
437 row.get::<_, i64>(1)?,
438 row.get::<_, i64>(2)?,
439 row.get::<_, String>(3)?,
440 row.get::<_, Option<String>>(4)?,
441 row.get::<_, Option<String>>(5)?,
442 row.get::<_, f32>(6)?,
443 row.get::<_, Option<String>>(7)?,
444 row.get::<_, bool>(8).unwrap_or(false),
445 ))
446 });
447
448 if let Ok((
449 session_id,
450 source_id,
451 target_id,
452 edge_type,
453 predicate,
454 properties,
455 weight,
456 current_vc_json,
457 sync_enabled,
458 )) = result
459 {
460 if sync_enabled {
461 let mut vector_clock = if let Some(vc_json) = current_vc_json {
462 VectorClock::from_json(&vc_json).unwrap_or_else(|_| VectorClock::new())
463 } else {
464 VectorClock::new()
465 };
466 vector_clock.increment(&self.instance_id);
467 let vc_json = vector_clock.to_json()?;
468
469 conn.execute(
470 "INSERT INTO graph_tombstones
471 (session_id, entity_type, entity_id, deleted_by, vector_clock)
472 VALUES (?, ?, ?, ?, ?)",
473 params![session_id, "edge", edge_id, self.instance_id, vc_json],
474 )?;
475
476 let edge_data = serde_json::json!({
477 "id": edge_id,
478 "session_id": session_id,
479 "source_id": source_id,
480 "target_id": target_id,
481 "edge_type": edge_type,
482 "predicate": predicate,
483 "properties": properties,
484 "weight": weight,
485 });
486
487 self.graph_changelog_append(
488 &session_id,
489 &self.instance_id,
490 "edge",
491 edge_id,
492 "delete",
493 &vc_json,
494 Some(&edge_data.to_string()),
495 )?;
496 }
497 }
498
499 conn.execute("DELETE FROM graph_edges WHERE id = ?", params![edge_id])?;
500 Ok(())
501 }
502
503 pub fn find_shortest_path(
506 &self,
507 session_id: &str,
508 source_id: i64,
509 target_id: i64,
510 max_hops: Option<usize>,
511 ) -> Result<Option<GraphPath>> {
512 let max_depth = max_hops.unwrap_or(10);
513
514 let mut visited = HashSet::new();
515 let mut queue = VecDeque::new();
516 let mut parent_map = HashMap::new();
517
518 queue.push_back((source_id, 0));
519 visited.insert(source_id);
520
521 while let Some((current_id, depth)) = queue.pop_front() {
522 if current_id == target_id {
523 let path = self.reconstruct_path(&parent_map, source_id, target_id)?;
524 return Ok(Some(path));
525 }
526
527 if depth >= max_depth {
528 continue;
529 }
530
531 let edges = self.list_graph_edges(session_id, Some(current_id), None)?;
532 for edge in edges {
533 let target = edge.target_id;
534 if !visited.contains(&target) {
535 visited.insert(target);
536 parent_map.insert(target, (current_id, edge));
537 queue.push_back((target, depth + 1));
538 }
539 }
540 }
541
542 Ok(None)
543 }
544
545 pub fn traverse_neighbors(
546 &self,
547 session_id: &str,
548 node_id: i64,
549 direction: TraversalDirection,
550 depth: usize,
551 ) -> Result<Vec<GraphNode>> {
552 if depth == 0 {
553 return Ok(vec![]);
554 }
555
556 let mut visited = HashSet::new();
557 let mut result = Vec::new();
558 let mut queue = VecDeque::new();
559
560 queue.push_back((node_id, 0));
561 visited.insert(node_id);
562
563 while let Some((current_id, current_depth)) = queue.pop_front() {
564 if current_depth > 0 {
565 if let Some(node) = self.get_graph_node(current_id)? {
566 result.push(node);
567 }
568 }
569
570 if current_depth >= depth {
571 continue;
572 }
573
574 let edges = match direction {
575 TraversalDirection::Outgoing => {
576 self.list_graph_edges(session_id, Some(current_id), None)?
577 }
578 TraversalDirection::Incoming => {
579 self.list_graph_edges(session_id, None, Some(current_id))?
580 }
581 TraversalDirection::Both => {
582 let mut out_edges =
583 self.list_graph_edges(session_id, Some(current_id), None)?;
584 let in_edges = self.list_graph_edges(session_id, None, Some(current_id))?;
585 out_edges.extend(in_edges);
586 out_edges
587 }
588 };
589
590 for edge in edges {
591 let next_id = match direction {
592 TraversalDirection::Outgoing => edge.target_id,
593 TraversalDirection::Incoming => edge.source_id,
594 TraversalDirection::Both => {
595 if edge.source_id == current_id {
596 edge.target_id
597 } else {
598 edge.source_id
599 }
600 }
601 };
602
603 if !visited.contains(&next_id) {
604 visited.insert(next_id);
605 queue.push_back((next_id, current_depth + 1));
606 }
607 }
608 }
609
610 Ok(result)
611 }
612
613 fn row_to_graph_node(row: &duckdb::Row) -> Result<GraphNode> {
614 let id: i64 = row.get(0)?;
615 let session_id: String = row.get(1)?;
616 let node_type: String = row.get(2)?;
617 let label: String = row.get(3)?;
618 let properties: String = row.get(4)?;
619 let embedding_id: Option<i64> = row.get(5)?;
620 let created_at: String = row.get(6)?;
621 let updated_at: String = row.get(7)?;
622
623 Ok(GraphNode {
624 id,
625 session_id,
626 node_type: NodeType::from_str(&node_type),
627 label,
628 properties: serde_json::from_str(&properties).unwrap_or(JsonValue::Null),
629 embedding_id,
630 created_at: created_at.parse().unwrap_or_else(|_| Utc::now()),
631 updated_at: updated_at.parse().unwrap_or_else(|_| Utc::now()),
632 })
633 }
634
635 fn row_to_graph_edge(row: &duckdb::Row) -> Result<GraphEdge> {
636 let id: i64 = row.get(0)?;
637 let session_id: String = row.get(1)?;
638 let source_id: i64 = row.get(2)?;
639 let target_id: i64 = row.get(3)?;
640 let edge_type: String = row.get(4)?;
641 let predicate: Option<String> = row.get(5)?;
642 let properties: Option<String> = row.get(6)?;
643 let weight: f32 = row.get(7)?;
644 let temporal_start: Option<String> = row.get(8)?;
645 let temporal_end: Option<String> = row.get(9)?;
646 let created_at: String = row.get(10)?;
647
648 Ok(GraphEdge {
649 id,
650 session_id,
651 source_id,
652 target_id,
653 edge_type: EdgeType::from_str(&edge_type),
654 predicate,
655 properties: properties.and_then(|p| serde_json::from_str(&p).ok()),
656 weight,
657 temporal_start: temporal_start.and_then(|s| s.parse().ok()),
658 temporal_end: temporal_end.and_then(|s| s.parse().ok()),
659 created_at: created_at.parse().unwrap_or_else(|_| Utc::now()),
660 })
661 }
662
663 fn collect_graph_nodes(mut rows: duckdb::Rows) -> Result<Vec<GraphNode>> {
664 let mut nodes = Vec::new();
665 while let Some(row) = rows.next()? {
666 nodes.push(Self::row_to_graph_node(row)?);
667 }
668 Ok(nodes)
669 }
670
671 fn collect_graph_edges(mut rows: duckdb::Rows) -> Result<Vec<GraphEdge>> {
672 let mut edges = Vec::new();
673 while let Some(row) = rows.next()? {
674 edges.push(Self::row_to_graph_edge(row)?);
675 }
676 Ok(edges)
677 }
678
679 fn reconstruct_path(
680 &self,
681 parent_map: &HashMap<i64, (i64, GraphEdge)>,
682 source_id: i64,
683 target_id: i64,
684 ) -> Result<GraphPath> {
685 let mut path_edges = Vec::new();
686 let mut path_nodes = Vec::new();
687 let mut current = target_id;
688 let mut total_weight = 0.0;
689
690 while current != source_id {
691 if let Some((parent, edge)) = parent_map.get(¤t) {
692 path_edges.push(edge.clone());
693 total_weight += edge.weight;
694 current = *parent;
695 } else {
696 break;
697 }
698 }
699
700 path_edges.reverse();
701
702 if let Some(node) = self.get_graph_node(source_id)? {
703 path_nodes.push(node);
704 }
705 for edge in &path_edges {
706 if let Some(node) = self.get_graph_node(edge.target_id)? {
707 path_nodes.push(node);
708 }
709 }
710
711 Ok(GraphPath {
712 length: path_edges.len(),
713 weight: total_weight,
714 nodes: path_nodes,
715 edges: path_edges,
716 })
717 }
718
719 #[allow(clippy::too_many_arguments)]
722 pub fn graph_changelog_append(
723 &self,
724 session_id: &str,
725 instance_id: &str,
726 entity_type: &str,
727 entity_id: i64,
728 operation: &str,
729 vector_clock: &str,
730 data: Option<&str>,
731 ) -> Result<i64> {
732 let conn = self.conn();
733 let mut stmt = conn.prepare(
734 "INSERT INTO graph_changelog (session_id, instance_id, entity_type, entity_id, operation, vector_clock, data)
735 VALUES (?, ?, ?, ?, ?, ?, ?) RETURNING id",
736 )?;
737 let id: i64 = stmt.query_row(
738 params![
739 session_id,
740 instance_id,
741 entity_type,
742 entity_id,
743 operation,
744 vector_clock,
745 data
746 ],
747 |row| row.get(0),
748 )?;
749 Ok(id)
750 }
751
752 pub fn graph_changelog_get_since(
753 &self,
754 session_id: &str,
755 since_timestamp: &str,
756 ) -> Result<Vec<ChangelogEntry>> {
757 let conn = self.conn();
758 let mut stmt = conn.prepare(
759 "SELECT id, session_id, instance_id, entity_type, entity_id, operation, vector_clock, data, CAST(created_at AS TEXT)
760 FROM graph_changelog
761 WHERE session_id = ? AND created_at > ?
762 ORDER BY created_at ASC",
763 )?;
764 let mut rows = stmt.query(params![session_id, since_timestamp])?;
765 let mut entries = Vec::new();
766 while let Some(row) = rows.next()? {
767 entries.push(ChangelogEntry::from_row(row)?);
768 }
769 Ok(entries)
770 }
771
772 pub fn graph_list_conflicts(
773 &self,
774 session_id: Option<&str>,
775 limit: usize,
776 ) -> Result<Vec<ChangelogEntry>> {
777 let conn = self.conn();
778 let mut entries = Vec::new();
779
780 if let Some(sid) = session_id {
781 let mut stmt = conn.prepare(
782 "SELECT id, session_id, instance_id, entity_type, entity_id, operation, vector_clock, data, CAST(created_at AS TEXT)
783 FROM graph_changelog
784 WHERE operation = 'conflict' AND session_id = ?
785 ORDER BY created_at DESC
786 LIMIT ?",
787 )?;
788 let mut rows = stmt.query(params![sid, limit as i64])?;
789 while let Some(row) = rows.next()? {
790 entries.push(ChangelogEntry::from_row(row)?);
791 }
792 } else {
793 let mut stmt = conn.prepare(
794 "SELECT id, session_id, instance_id, entity_type, entity_id, operation, vector_clock, data, CAST(created_at AS TEXT)
795 FROM graph_changelog
796 WHERE operation = 'conflict'
797 ORDER BY created_at DESC
798 LIMIT ?",
799 )?;
800 let mut rows = stmt.query(params![limit as i64])?;
801 while let Some(row) = rows.next()? {
802 entries.push(ChangelogEntry::from_row(row)?);
803 }
804 }
805
806 Ok(entries)
807 }
808
809 pub fn graph_changelog_prune(&self, days_to_keep: i64) -> Result<usize> {
810 let conn = self.conn();
811 let cutoff = Utc::now() - Duration::days(days_to_keep);
812 let cutoff_str = cutoff.to_rfc3339();
813 let deleted = conn.execute(
814 "DELETE FROM graph_changelog WHERE created_at < ?",
815 params![cutoff_str],
816 )?;
817 Ok(deleted)
818 }
819
820 pub fn graph_sync_state_get_metadata(
821 &self,
822 instance_id: &str,
823 session_id: &str,
824 graph_name: &str,
825 ) -> Result<Option<SyncStateRecord>> {
826 let conn = self.conn();
827 let result: Result<SyncStateRecord, _> = conn.query_row(
828 "SELECT vector_clock, CAST(last_sync_at AS TEXT)
829 FROM graph_sync_state WHERE instance_id = ? AND session_id = ? AND graph_name = ?",
830 params![instance_id, session_id, graph_name],
831 |row| {
832 Ok(SyncStateRecord {
833 vector_clock: row.get(0)?,
834 last_sync_at: row.get(1).ok(),
835 })
836 },
837 );
838
839 match result {
840 Ok(record) => Ok(Some(record)),
841 Err(duckdb::Error::QueryReturnedNoRows) => Ok(None),
842 Err(e) => Err(e.into()),
843 }
844 }
845
846 pub fn graph_sync_state_get(
847 &self,
848 instance_id: &str,
849 session_id: &str,
850 graph_name: &str,
851 ) -> Result<Option<String>> {
852 Ok(self
853 .graph_sync_state_get_metadata(instance_id, session_id, graph_name)?
854 .map(|r| r.vector_clock))
855 }
856
857 pub fn graph_sync_state_update(
858 &self,
859 instance_id: &str,
860 session_id: &str,
861 graph_name: &str,
862 vector_clock: &str,
863 ) -> Result<()> {
864 let conn = self.conn();
865 conn.execute(
866 "INSERT INTO graph_sync_state (instance_id, session_id, graph_name, vector_clock, last_sync_at)
867 VALUES (?, ?, ?, ?, now())
868 ON CONFLICT (instance_id, session_id, graph_name)
869 DO UPDATE SET vector_clock = EXCLUDED.vector_clock, last_sync_at = now()",
870 params![instance_id, session_id, graph_name, vector_clock],
871 )?;
872 Ok(())
873 }
874
875 pub fn graph_set_sync_enabled(
876 &self,
877 session_id: &str,
878 graph_name: &str,
879 enabled: bool,
880 ) -> Result<()> {
881 let conn = self.conn();
882 conn.execute(
884 "INSERT INTO graph_metadata (session_id, graph_name, sync_enabled)
885 VALUES (?, ?, ?)
886 ON CONFLICT (session_id, graph_name) DO UPDATE SET sync_enabled = EXCLUDED.sync_enabled",
887 params![session_id, graph_name, enabled],
888 )?;
889 Ok(())
890 }
891
892 pub fn graph_get_sync_enabled(&self, session_id: &str, graph_name: &str) -> Result<bool> {
893 let conn = self.conn();
894 let result: Result<bool, _> = conn.query_row(
895 "SELECT sync_enabled FROM graph_metadata WHERE session_id = ? AND graph_name = ?",
896 params![session_id, graph_name],
897 |row| row.get(0),
898 );
899 match result {
900 Ok(enabled) => Ok(enabled),
901 Err(duckdb::Error::QueryReturnedNoRows) => Ok(false),
902 Err(e) => Err(e.into()),
903 }
904 }
905
906 pub fn graph_set_sync_config(
907 &self,
908 session_id: &str,
909 graph_name: &str,
910 sync_enabled: bool,
911 conflict_resolution_strategy: Option<&str>,
912 sync_interval_seconds: Option<u64>,
913 ) -> Result<GraphSyncConfig> {
914 let conn = self.conn();
915
916 let existing_config_value: JsonValue = conn
918 .query_row(
919 "SELECT config FROM graph_metadata WHERE session_id = ? AND graph_name = ?",
920 params![session_id, graph_name],
921 |row| row.get::<_, Option<String>>(0),
922 )
923 .unwrap_or(None)
924 .as_deref()
925 .and_then(|s| serde_json::from_str(s).ok())
926 .unwrap_or_else(|| JsonValue::Object(Map::new()));
927
928 let mut root_obj = existing_config_value
929 .as_object()
930 .cloned()
931 .unwrap_or_default();
932 let mut sync_obj = root_obj
933 .get("sync")
934 .and_then(|v| v.as_object())
935 .cloned()
936 .unwrap_or_default();
937
938 let final_strategy = conflict_resolution_strategy
939 .map(|s| s.to_string())
940 .or_else(|| {
941 sync_obj
942 .get("conflict_resolution_strategy")
943 .and_then(|v| v.as_str().map(|s| s.to_string()))
944 })
945 .or_else(|| Some("vector_clock".to_string()));
946
947 let final_interval = sync_interval_seconds
948 .or_else(|| {
949 sync_obj
950 .get("sync_interval_seconds")
951 .and_then(|v| v.as_u64())
952 })
953 .or(Some(60));
954
955 if let Some(strategy) = final_strategy.clone() {
956 sync_obj.insert(
957 "conflict_resolution_strategy".to_string(),
958 JsonValue::String(strategy),
959 );
960 }
961 if let Some(interval) = final_interval {
962 sync_obj.insert(
963 "sync_interval_seconds".to_string(),
964 JsonValue::from(interval),
965 );
966 }
967
968 root_obj.insert("sync".to_string(), JsonValue::Object(sync_obj));
969 let merged_config = JsonValue::Object(root_obj).to_string();
970
971 conn.execute(
972 "INSERT INTO graph_metadata (session_id, graph_name, sync_enabled, config, updated_at)
973 VALUES (?, ?, ?, ?, now())
974 ON CONFLICT (session_id, graph_name)
975 DO UPDATE SET sync_enabled = EXCLUDED.sync_enabled,
976 config = EXCLUDED.config,
977 updated_at = now()",
978 params![session_id, graph_name, sync_enabled, merged_config],
979 )?;
980
981 Ok(GraphSyncConfig {
982 sync_enabled,
983 conflict_resolution_strategy: final_strategy,
984 sync_interval_seconds: final_interval,
985 })
986 }
987
988 pub fn graph_get_sync_config(
989 &self,
990 session_id: &str,
991 graph_name: &str,
992 ) -> Result<GraphSyncConfig> {
993 let conn = self.conn();
994 let result: Result<(bool, Option<String>), _> = conn.query_row(
995 "SELECT sync_enabled, config FROM graph_metadata WHERE session_id = ? AND graph_name = ?",
996 params![session_id, graph_name],
997 |row| Ok((row.get(0)?, row.get(1)?)),
998 );
999
1000 match result {
1001 Ok((sync_enabled, config_json)) => {
1002 let config_value: JsonValue = config_json
1003 .as_deref()
1004 .and_then(|s| serde_json::from_str(s).ok())
1005 .unwrap_or_else(|| JsonValue::Object(Map::new()));
1006 let sync_obj = config_value
1007 .get("sync")
1008 .and_then(|v| v.as_object())
1009 .cloned()
1010 .unwrap_or_default();
1011
1012 Ok(GraphSyncConfig {
1013 sync_enabled,
1014 conflict_resolution_strategy: sync_obj
1015 .get("conflict_resolution_strategy")
1016 .and_then(|v| v.as_str())
1017 .map(|s| s.to_string())
1018 .or_else(|| Some("vector_clock".to_string())),
1019 sync_interval_seconds: sync_obj
1020 .get("sync_interval_seconds")
1021 .and_then(|v| v.as_u64())
1022 .or(Some(60)),
1023 })
1024 }
1025 Err(duckdb::Error::QueryReturnedNoRows) => Ok(GraphSyncConfig::default()),
1026 Err(e) => Err(e.into()),
1027 }
1028 }
1029
1030 pub fn graph_list(&self, session_id: &str) -> Result<Vec<String>> {
1031 let conn = self.conn();
1032 let mut stmt = conn.prepare(
1033 "SELECT DISTINCT graph_name FROM graph_metadata WHERE session_id = ?
1034 UNION
1035 SELECT DISTINCT 'default' as graph_name
1036 FROM graph_nodes WHERE session_id = ?
1037 ORDER BY graph_name",
1038 )?;
1039
1040 let mut graphs = Vec::new();
1041 let mut rows = stmt.query(params![session_id, session_id])?;
1042 while let Some(row) = rows.next()? {
1043 let graph_name: String = row.get(0)?;
1044 graphs.push(graph_name);
1045 }
1046
1047 if graphs.is_empty() {
1048 let node_count: i64 = conn.query_row(
1049 "SELECT COUNT(*) FROM graph_nodes WHERE session_id = ?",
1050 params![session_id],
1051 |row| row.get(0),
1052 )?;
1053 if node_count > 0 {
1054 graphs.push("default".to_string());
1055 }
1056 }
1057
1058 Ok(graphs)
1059 }
1060
1061 pub fn graph_list_sync_enabled(&self) -> Result<Vec<(String, String)>> {
1063 let conn = self.conn();
1064 let mut stmt = conn.prepare(
1065 "SELECT session_id, graph_name FROM graph_metadata WHERE sync_enabled = TRUE ORDER BY session_id, graph_name",
1066 )?;
1067
1068 let mut results = Vec::new();
1069 let mut rows = stmt.query(params![])?;
1070 while let Some(row) = rows.next()? {
1071 let session_id: String = row.get(0)?;
1072 let graph_name: String = row.get(1)?;
1073 results.push((session_id, graph_name));
1074 }
1075
1076 Ok(results)
1077 }
1078
1079 pub fn graph_get_node_with_sync(&self, node_id: i64) -> Result<Option<SyncedNodeRecord>> {
1080 let conn = self.conn();
1081 let result: Result<SyncedNodeRecord, _> = conn.query_row(
1082 "SELECT id, session_id, node_type, label, properties, embedding_id,
1083 CAST(created_at AS TEXT), CAST(updated_at AS TEXT),
1084 COALESCE(vector_clock, '{}'), last_modified_by, is_deleted, sync_enabled
1085 FROM graph_nodes WHERE id = ?",
1086 params![node_id],
1087 SyncedNodeRecord::from_row,
1088 );
1089 match result {
1090 Ok(node) => Ok(Some(node)),
1091 Err(duckdb::Error::QueryReturnedNoRows) => Ok(None),
1092 Err(e) => Err(e.into()),
1093 }
1094 }
1095
1096 pub fn graph_list_nodes_with_sync(
1097 &self,
1098 session_id: &str,
1099 sync_enabled_only: bool,
1100 include_deleted: bool,
1101 ) -> Result<Vec<SyncedNodeRecord>> {
1102 let conn = self.conn();
1103 let mut query = String::from(
1104 "SELECT id, session_id, node_type, label, properties, embedding_id,
1105 CAST(created_at AS TEXT), CAST(updated_at AS TEXT),
1106 COALESCE(vector_clock, '{}'), last_modified_by, is_deleted, sync_enabled
1107 FROM graph_nodes WHERE session_id = ?",
1108 );
1109
1110 if sync_enabled_only {
1111 query.push_str(" AND sync_enabled = TRUE");
1112 }
1113 if !include_deleted {
1114 query.push_str(" AND is_deleted = FALSE");
1115 }
1116 query.push_str(" ORDER BY created_at ASC");
1117
1118 let mut stmt = conn.prepare(&query)?;
1119 let mut rows = stmt.query(params![session_id])?;
1120 let mut nodes = Vec::new();
1121 while let Some(row) = rows.next()? {
1122 nodes.push(SyncedNodeRecord::from_row(row)?);
1123 }
1124 Ok(nodes)
1125 }
1126
1127 pub fn graph_get_edge_with_sync(&self, edge_id: i64) -> Result<Option<SyncedEdgeRecord>> {
1128 let conn = self.conn();
1129 let result: Result<SyncedEdgeRecord, _> = conn.query_row(
1130 "SELECT id, session_id, source_id, target_id, edge_type, predicate, properties, weight,
1131 CAST(temporal_start AS TEXT), CAST(temporal_end AS TEXT), CAST(created_at AS TEXT),
1132 COALESCE(vector_clock, '{}'), last_modified_by, is_deleted, sync_enabled
1133 FROM graph_edges WHERE id = ?",
1134 params![edge_id],
1135 SyncedEdgeRecord::from_row,
1136 );
1137 match result {
1138 Ok(edge) => Ok(Some(edge)),
1139 Err(duckdb::Error::QueryReturnedNoRows) => Ok(None),
1140 Err(e) => Err(e.into()),
1141 }
1142 }
1143
1144 pub fn graph_list_edges_with_sync(
1145 &self,
1146 session_id: &str,
1147 sync_enabled_only: bool,
1148 include_deleted: bool,
1149 ) -> Result<Vec<SyncedEdgeRecord>> {
1150 let conn = self.conn();
1151 let mut query = String::from(
1152 "SELECT id, session_id, source_id, target_id, edge_type, predicate, properties, weight,
1153 CAST(temporal_start AS TEXT), CAST(temporal_end AS TEXT), CAST(created_at AS TEXT),
1154 COALESCE(vector_clock, '{}'), last_modified_by, is_deleted, sync_enabled
1155 FROM graph_edges WHERE session_id = ?",
1156 );
1157
1158 if sync_enabled_only {
1159 query.push_str(" AND sync_enabled = TRUE");
1160 }
1161 if !include_deleted {
1162 query.push_str(" AND is_deleted = FALSE");
1163 }
1164 query.push_str(" ORDER BY created_at ASC");
1165
1166 let mut stmt = conn.prepare(&query)?;
1167 let mut rows = stmt.query(params![session_id])?;
1168 let mut edges = Vec::new();
1169 while let Some(row) = rows.next()? {
1170 edges.push(SyncedEdgeRecord::from_row(row)?);
1171 }
1172 Ok(edges)
1173 }
1174
1175 pub fn graph_update_node_sync_metadata(
1176 &self,
1177 node_id: i64,
1178 vector_clock: &str,
1179 last_modified_by: &str,
1180 sync_enabled: bool,
1181 ) -> Result<()> {
1182 let conn = self.conn();
1183 conn.execute(
1184 "UPDATE graph_nodes SET vector_clock = ?, last_modified_by = ?, sync_enabled = ?, updated_at = CURRENT_TIMESTAMP
1185 WHERE id = ?",
1186 params![vector_clock, last_modified_by, sync_enabled, node_id],
1187 )?;
1188 Ok(())
1189 }
1190
1191 pub fn graph_update_edge_sync_metadata(
1192 &self,
1193 edge_id: i64,
1194 vector_clock: &str,
1195 last_modified_by: &str,
1196 sync_enabled: bool,
1197 ) -> Result<()> {
1198 let conn = self.conn();
1199 conn.execute(
1200 "UPDATE graph_edges SET vector_clock = ?, last_modified_by = ?, sync_enabled = ?
1201 WHERE id = ?",
1202 params![vector_clock, last_modified_by, sync_enabled, edge_id],
1203 )?;
1204 Ok(())
1205 }
1206
1207 pub fn graph_mark_node_deleted(
1208 &self,
1209 node_id: i64,
1210 vector_clock: &str,
1211 deleted_by: &str,
1212 ) -> Result<()> {
1213 let conn = self.conn();
1214 conn.execute(
1215 "UPDATE graph_nodes SET is_deleted = TRUE, vector_clock = ?, last_modified_by = ?, updated_at = CURRENT_TIMESTAMP
1216 WHERE id = ?",
1217 params![vector_clock, deleted_by, node_id],
1218 )?;
1219 Ok(())
1220 }
1221
1222 pub fn graph_mark_edge_deleted(
1223 &self,
1224 edge_id: i64,
1225 vector_clock: &str,
1226 deleted_by: &str,
1227 ) -> Result<()> {
1228 let conn = self.conn();
1229 conn.execute(
1230 "UPDATE graph_edges SET is_deleted = TRUE, vector_clock = ?, last_modified_by = ?
1231 WHERE id = ?",
1232 params![vector_clock, deleted_by, edge_id],
1233 )?;
1234 Ok(())
1235 }
1236}
1237
/// Stored sync state for one `(instance, session, graph)` combination.
///
/// Derives `PartialEq`/`Eq` so callers and tests can compare states directly.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct SyncStateRecord {
    /// Vector clock serialized as JSON text (e.g. `{"a":1}`).
    pub vector_clock: String,
    /// Time of the last sync as text; `None` when no value is stored.
    pub last_sync_at: Option<String>,
}
1243
/// Synchronization settings for one graph.
///
/// Derives `PartialEq`/`Eq` so configurations can be compared directly.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct GraphSyncConfig {
    /// Whether sync is turned on for the graph.
    pub sync_enabled: bool,
    /// Name of the conflict-resolution strategy, e.g. `"vector_clock"` or `"last_write_wins"`.
    pub conflict_resolution_strategy: Option<String>,
    /// Sync interval in seconds.
    pub sync_interval_seconds: Option<u64>,
}

impl Default for GraphSyncConfig {
    /// Sync disabled, `"vector_clock"` conflict resolution, and a 60-second interval.
    fn default() -> Self {
        Self {
            sync_enabled: false,
            conflict_resolution_strategy: Some("vector_clock".to_string()),
            sync_interval_seconds: Some(60),
        }
    }
}
1260
/// One row of the sync changelog (`graph_changelog` in the test schema).
#[derive(Debug, Clone)]
pub struct ChangelogEntry {
    pub id: i64,
    pub session_id: String,
    /// Instance that produced the entry.
    pub instance_id: String,
    /// Kind of entity the entry refers to (the tests use `"node"` and `"edge"`).
    pub entity_type: String,
    pub entity_id: i64,
    /// Operation name (the tests use `"conflict"`).
    pub operation: String,
    /// Vector clock serialized as text.
    pub vector_clock: String,
    /// Optional payload text (JSON in the tests).
    pub data: Option<String>,
    /// Creation time; `from_row` falls back to the current time if the stored text
    /// cannot be parsed.
    pub created_at: DateTime<Utc>,
}
1273
1274impl ChangelogEntry {
1275 fn from_row(row: &duckdb::Row) -> Result<Self, duckdb::Error> {
1276 let id: i64 = row.get(0)?;
1277 let session_id: String = row.get(1)?;
1278 let instance_id: String = row.get(2)?;
1279 let entity_type: String = row.get(3)?;
1280 let entity_id: i64 = row.get(4)?;
1281 let operation: String = row.get(5)?;
1282 let vector_clock: String = row.get(6)?;
1283 let data: Option<String> = row.get(7)?;
1284 let created_at_str: String = row.get(8)?;
1285
1286 Ok(ChangelogEntry {
1287 id,
1288 session_id,
1289 instance_id,
1290 entity_type,
1291 entity_id,
1292 operation,
1293 vector_clock,
1294 data,
1295 created_at: created_at_str.parse().unwrap_or_else(|_| Utc::now()),
1296 })
1297 }
1298}
1299
/// A `graph_nodes` row together with its sync metadata: vector clock, last writer,
/// soft-delete flag, and per-row sync flag.
#[derive(Debug, Clone)]
pub struct SyncedNodeRecord {
    pub id: i64,
    pub session_id: String,
    /// Node type as stored in the `node_type` column (written from `NodeType::as_str`).
    pub node_type: String,
    pub label: String,
    /// Parsed JSON from the `properties` column.
    pub properties: JsonValue,
    pub embedding_id: Option<i64>,
    /// Creation time; falls back to the current time if the stored text cannot be parsed.
    pub created_at: DateTime<Utc>,
    /// Last update time; same fallback as `created_at`.
    pub updated_at: DateTime<Utc>,
    /// Serialized vector clock; the `*_with_sync` queries substitute `'{}'` for NULL.
    pub vector_clock: String,
    /// Instance id of the last writer, if recorded.
    pub last_modified_by: Option<String>,
    /// Soft-delete flag; `graph_mark_node_deleted` sets it to `true`.
    pub is_deleted: bool,
    /// Per-row sync flag; `insert_graph_node` copies it from the graph's sync setting.
    pub sync_enabled: bool,
}
1315
1316impl SyncedNodeRecord {
1317 fn from_row(row: &duckdb::Row) -> Result<Self, duckdb::Error> {
1318 let id: i64 = row.get(0)?;
1319 let session_id: String = row.get(1)?;
1320 let node_type: String = row.get(2)?;
1321 let label: String = row.get(3)?;
1322 let properties_str: String = row.get(4)?;
1323 let properties: JsonValue = serde_json::from_str(&properties_str).map_err(|e| {
1324 duckdb::Error::FromSqlConversionFailure(4, duckdb::types::Type::Text, Box::new(e))
1325 })?;
1326 let embedding_id: Option<i64> = row.get(5)?;
1327 let created_at_str: String = row.get(6)?;
1328 let updated_at_str: String = row.get(7)?;
1329 let vector_clock: String = row.get(8)?;
1330 let last_modified_by: Option<String> = row.get(9)?;
1331 let is_deleted: bool = row.get(10)?;
1332 let sync_enabled: bool = row.get(11)?;
1333
1334 Ok(SyncedNodeRecord {
1335 id,
1336 session_id,
1337 node_type,
1338 label,
1339 properties,
1340 embedding_id,
1341 created_at: created_at_str.parse().unwrap_or_else(|_| Utc::now()),
1342 updated_at: updated_at_str.parse().unwrap_or_else(|_| Utc::now()),
1343 vector_clock,
1344 last_modified_by,
1345 is_deleted,
1346 sync_enabled,
1347 })
1348 }
1349}
1350
/// A `graph_edges` row together with its sync metadata: vector clock, last writer,
/// soft-delete flag, and per-row sync flag.
#[derive(Debug, Clone)]
pub struct SyncedEdgeRecord {
    pub id: i64,
    pub session_id: String,
    /// Id of the edge's source node.
    pub source_id: i64,
    /// Id of the edge's target node.
    pub target_id: i64,
    /// Edge type as stored in the `edge_type` column (text).
    pub edge_type: String,
    pub predicate: Option<String>,
    /// Parsed JSON properties; `None` when the column is NULL or holds invalid JSON
    /// (`from_row` decodes this field best-effort).
    pub properties: Option<JsonValue>,
    pub weight: f32,
    /// Optional start of the edge's validity window; `None` when NULL or unparseable.
    pub temporal_start: Option<DateTime<Utc>>,
    /// Optional end of the edge's validity window; `None` when NULL or unparseable.
    pub temporal_end: Option<DateTime<Utc>>,
    /// Creation time; falls back to the current time if the stored text cannot be parsed.
    pub created_at: DateTime<Utc>,
    /// Serialized vector clock; the `*_with_sync` queries substitute `'{}'` for NULL.
    pub vector_clock: String,
    /// Instance id of the last writer, if recorded.
    pub last_modified_by: Option<String>,
    /// Soft-delete flag; `graph_mark_edge_deleted` sets it to `true`.
    pub is_deleted: bool,
    /// Per-row sync flag as stored in the `sync_enabled` column.
    pub sync_enabled: bool,
}
1369
1370impl SyncedEdgeRecord {
1371 fn from_row(row: &duckdb::Row) -> Result<Self, duckdb::Error> {
1372 let id: i64 = row.get(0)?;
1373 let session_id: String = row.get(1)?;
1374 let source_id: i64 = row.get(2)?;
1375 let target_id: i64 = row.get(3)?;
1376 let edge_type: String = row.get(4)?;
1377 let predicate: Option<String> = row.get(5)?;
1378 let properties_str: Option<String> = row.get(6)?;
1379 let properties: Option<JsonValue> = properties_str
1380 .as_ref()
1381 .and_then(|s| serde_json::from_str(s).ok());
1382 let weight: f32 = row.get(7)?;
1383 let temporal_start_str: Option<String> = row.get(8)?;
1384 let temporal_end_str: Option<String> = row.get(9)?;
1385 let created_at_str: String = row.get(10)?;
1386 let vector_clock: String = row.get(11)?;
1387 let last_modified_by: Option<String> = row.get(12)?;
1388 let is_deleted: bool = row.get(13)?;
1389 let sync_enabled: bool = row.get(14)?;
1390
1391 Ok(SyncedEdgeRecord {
1392 id,
1393 session_id,
1394 source_id,
1395 target_id,
1396 edge_type,
1397 predicate,
1398 properties,
1399 weight,
1400 temporal_start: temporal_start_str.and_then(|s| s.parse().ok()),
1401 temporal_end: temporal_end_str.and_then(|s| s.parse().ok()),
1402 created_at: created_at_str.parse().unwrap_or_else(|_| Utc::now()),
1403 vector_clock,
1404 last_modified_by,
1405 is_deleted,
1406 sync_enabled,
1407 })
1408 }
1409}
1410
// Unit tests for `KnowledgeGraphStore`. Each test runs against a fresh in-memory
// DuckDB database whose schema is created by `setup_store_with`.
#[cfg(test)]
mod tests {
    use super::*;
    use anyhow::Result;
    use serde_json::json;

    /// Builds a store over a new in-memory database with the graph schema installed.
    fn setup_store() -> KnowledgeGraphStore {
        setup_store_with(|_| {})
    }

    /// Like `setup_store`, but runs `extra` against the connection after the schema
    /// has been created and before the store takes ownership of the connection.
    fn setup_store_with<F>(extra: F) -> KnowledgeGraphStore
    where
        F: FnOnce(&Connection),
    {
        let conn = Connection::open_in_memory().expect("open in-memory database");
        // NOTE(review): this DDL is a hand-written copy of the graph schema; presumably
        // it must be kept in step with the production schema/migrations — confirm.
        conn.execute_batch(
            r#"
            CREATE SEQUENCE IF NOT EXISTS graph_nodes_id_seq START 1;
            CREATE SEQUENCE IF NOT EXISTS graph_edges_id_seq START 1;
            CREATE SEQUENCE IF NOT EXISTS graph_metadata_id_seq START 1;
            CREATE SEQUENCE IF NOT EXISTS graph_changelog_id_seq START 1;
            CREATE SEQUENCE IF NOT EXISTS graph_tombstones_id_seq START 1;

            CREATE TABLE graph_nodes (
                id BIGINT PRIMARY KEY DEFAULT nextval('graph_nodes_id_seq'),
                session_id TEXT NOT NULL,
                node_type TEXT NOT NULL,
                label TEXT NOT NULL,
                properties TEXT NOT NULL,
                embedding_id BIGINT,
                created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                vector_clock TEXT DEFAULT '{}',
                last_modified_by TEXT,
                is_deleted BOOLEAN DEFAULT FALSE,
                sync_enabled BOOLEAN DEFAULT FALSE
            );

            CREATE TABLE graph_edges (
                id BIGINT PRIMARY KEY DEFAULT nextval('graph_edges_id_seq'),
                session_id TEXT NOT NULL,
                source_id BIGINT NOT NULL,
                target_id BIGINT NOT NULL,
                edge_type TEXT NOT NULL,
                predicate TEXT,
                properties TEXT,
                weight REAL DEFAULT 1.0,
                temporal_start TIMESTAMP,
                temporal_end TIMESTAMP,
                created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                vector_clock TEXT DEFAULT '{}',
                last_modified_by TEXT,
                is_deleted BOOLEAN DEFAULT FALSE,
                sync_enabled BOOLEAN DEFAULT FALSE
            );

            CREATE TABLE graph_metadata (
                id BIGINT PRIMARY KEY DEFAULT nextval('graph_metadata_id_seq'),
                session_id TEXT NOT NULL,
                graph_name TEXT NOT NULL,
                is_created BOOLEAN DEFAULT FALSE,
                schema_version INTEGER DEFAULT 1,
                config TEXT,
                created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                sync_enabled BOOLEAN DEFAULT FALSE,
                UNIQUE(session_id, graph_name)
            );

            CREATE TABLE graph_changelog (
                id BIGINT PRIMARY KEY DEFAULT nextval('graph_changelog_id_seq'),
                session_id TEXT NOT NULL,
                instance_id TEXT NOT NULL,
                entity_type TEXT NOT NULL,
                entity_id BIGINT NOT NULL,
                operation TEXT NOT NULL,
                vector_clock TEXT NOT NULL,
                data TEXT,
                created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
            );

            CREATE TABLE graph_sync_state (
                instance_id TEXT NOT NULL,
                session_id TEXT NOT NULL,
                graph_name TEXT NOT NULL,
                vector_clock TEXT NOT NULL,
                last_sync_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                PRIMARY KEY (instance_id, session_id, graph_name)
            );

            CREATE TABLE graph_tombstones (
                id BIGINT PRIMARY KEY DEFAULT nextval('graph_tombstones_id_seq'),
                session_id TEXT NOT NULL,
                entity_type TEXT NOT NULL,
                entity_id BIGINT NOT NULL,
                deleted_by TEXT NOT NULL,
                vector_clock TEXT NOT NULL,
                created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
            );
            "#,
        )
        .expect("create graph schema");

        // Test-specific setup hook; runs before the store wraps the connection.
        extra(&conn);

        KnowledgeGraphStore::from_connection(conn, "test-instance")
    }

    /// Walks one node through insert -> list -> property update -> read -> delete.
    #[test]
    fn insert_update_delete_node_flow() -> Result<()> {
        let store = setup_store();
        let props = json!({ "kind": "repository" });
        let node_id =
            store.insert_graph_node("session", NodeType::Entity, "SpecAI", &props, None)?;

        let nodes = store.list_graph_nodes("session", None, Some(10))?;
        assert_eq!(nodes.len(), 1);
        assert_eq!(nodes[0].label, "SpecAI");

        // The update replaces the properties; the new "stars" key must be readable back.
        let updated_props = json!({ "kind": "repository", "stars": 42 });
        store.update_graph_node(node_id, &updated_props)?;
        let updated = store.get_graph_node(node_id)?.expect("node exists");
        assert_eq!(updated.properties["stars"], 42);

        // After deletion the node is no longer returned by `get_graph_node`.
        store.delete_graph_node(node_id)?;
        assert!(store.get_graph_node(node_id)?.is_none());
        Ok(())
    }

    /// Builds the chain A -> B -> C and checks the shortest path from A to C.
    #[test]
    fn create_edges_and_find_paths() -> Result<()> {
        let store = setup_store();
        let a = store.insert_graph_node("session", NodeType::Entity, "A", &json!({}), None)?;
        let b = store.insert_graph_node("session", NodeType::Entity, "B", &json!({}), None)?;
        let c = store.insert_graph_node("session", NodeType::Entity, "C", &json!({}), None)?;

        store.insert_graph_edge("session", a, b, EdgeType::RelatesTo, None, None, 1.0)?;
        store.insert_graph_edge("session", b, c, EdgeType::RelatesTo, None, None, 1.0)?;

        // Expect 3 nodes, 2 edges, and a length of 2 (counted in edges), from A to C.
        let path = store
            .find_shortest_path("session", a, c, Some(5))?
            .expect("path exists");
        assert_eq!(path.nodes.len(), 3);
        assert_eq!(path.edges.len(), 2);
        assert_eq!(path.length, 2);
        assert_eq!(path.nodes.first().unwrap().label, "A");
        assert_eq!(path.nodes.last().unwrap().label, "C");

        let edges = store.list_graph_edges("session", None, None)?;
        assert_eq!(edges.len(), 2);

        Ok(())
    }

    /// With the chain Alpha -> Beta -> Gamma, a depth-2 traversal reaches the other two
    /// nodes outgoing from Alpha and incoming to Gamma.
    #[test]
    fn traverse_neighbors_respects_direction() -> Result<()> {
        let store = setup_store();
        let alpha =
            store.insert_graph_node("session", NodeType::Entity, "Alpha", &json!({}), None)?;
        let beta =
            store.insert_graph_node("session", NodeType::Entity, "Beta", &json!({}), None)?;
        let gamma =
            store.insert_graph_node("session", NodeType::Entity, "Gamma", &json!({}), None)?;

        store.insert_graph_edge("session", alpha, beta, EdgeType::RelatesTo, None, None, 1.0)?;
        store.insert_graph_edge("session", beta, gamma, EdgeType::RelatesTo, None, None, 1.0)?;

        let outgoing =
            store.traverse_neighbors("session", alpha, TraversalDirection::Outgoing, 2)?;
        assert_eq!(outgoing.len(), 2);
        assert!(outgoing.iter().any(|node| node.label == "Beta"));
        assert!(outgoing.iter().any(|node| node.label == "Gamma"));

        let incoming =
            store.traverse_neighbors("session", gamma, TraversalDirection::Incoming, 2)?;
        assert_eq!(incoming.len(), 2);
        assert!(incoming.iter().any(|node| node.label == "Beta"));
        assert!(incoming.iter().any(|node| node.label == "Alpha"));

        Ok(())
    }

    /// A saved sync config is returned both by the setter and by a later fetch. An
    /// unknown (session, graph) pair yields the `GraphSyncConfig` defaults.
    #[test]
    fn sync_config_round_trip() -> Result<()> {
        let store = setup_store();

        let saved = store.graph_set_sync_config(
            "session",
            "default",
            true,
            Some("last_write_wins"),
            Some(120),
        )?;
        assert!(saved.sync_enabled);
        assert_eq!(
            saved.conflict_resolution_strategy.as_deref(),
            Some("last_write_wins")
        );
        assert_eq!(saved.sync_interval_seconds, Some(120));

        let fetched = store.graph_get_sync_config("session", "default")?;
        assert!(fetched.sync_enabled);
        assert_eq!(
            fetched.conflict_resolution_strategy.as_deref(),
            Some("last_write_wins")
        );
        assert_eq!(fetched.sync_interval_seconds, Some(120));

        let defaults = store.graph_get_sync_config("other_session", "missing")?;
        assert!(!defaults.sync_enabled);
        assert_eq!(
            defaults.conflict_resolution_strategy.as_deref(),
            Some("vector_clock")
        );
        assert_eq!(defaults.sync_interval_seconds, Some(60));

        Ok(())
    }

    /// A stored vector clock reads back unchanged, with a populated `last_sync_at`.
    #[test]
    fn sync_state_metadata_round_trip() -> Result<()> {
        let store = setup_store();
        store.graph_sync_state_update("instance", "session", "graph", r#"{"a":1}"#)?;

        let state = store
            .graph_sync_state_get_metadata("instance", "session", "graph")?
            .expect("state exists");

        assert_eq!(state.vector_clock, r#"{"a":1}"#);
        assert!(state.last_sync_at.is_some());

        Ok(())
    }

    /// Appends one "conflict" changelog entry in each of two sessions. It then checks
    /// that `graph_list_conflicts` returns both without a filter and only the matching
    /// one when filtered by session.
    #[test]
    fn graph_conflict_listing_filters() -> Result<()> {
        let store = setup_store();
        let vc_json = VectorClock::new().to_json()?;

        let conflict_payload = json!({
            "graph_name": "g1",
            "local_version": { "id": 1 },
            "remote_version": { "id": 2 },
            "resolution": "Test"
        });

        store.graph_changelog_append(
            "session_one",
            "inst1",
            "node",
            1,
            "conflict",
            &vc_json,
            Some(&conflict_payload.to_string()),
        )?;

        let second_payload = json!({
            "graph_name": "g2",
            "local_version": { "id": 3 },
            "remote_version": { "id": 4 }
        });

        store.graph_changelog_append(
            "session_two",
            "inst1",
            "edge",
            2,
            "conflict",
            &vc_json,
            Some(&second_payload.to_string()),
        )?;

        let all_conflicts = store.graph_list_conflicts(None, 10)?;
        assert_eq!(all_conflicts.len(), 2);

        let session_filtered = store.graph_list_conflicts(Some("session_one"), 10)?;
        assert_eq!(session_filtered.len(), 1);
        assert_eq!(session_filtered[0].session_id, "session_one");
        assert_eq!(session_filtered[0].entity_id, 1);

        Ok(())
    }
}