1use crate::types::{EdgeType, GraphEdge, GraphNode, GraphPath, NodeType, TraversalDirection};
2use crate::vector_clock::VectorClock;
3use anyhow::Result;
4use chrono::{DateTime, Duration, Utc};
5use duckdb::{params, Connection};
6use serde_json::{Map, Value as JsonValue};
7use std::collections::{HashMap, HashSet, VecDeque};
8use std::sync::{Arc, Mutex, MutexGuard};
9
10#[derive(Clone)]
11pub struct KnowledgeGraphStore {
12 conn: Arc<Mutex<Connection>>,
13 instance_id: String,
14}
15
16impl KnowledgeGraphStore {
17 pub fn new(conn: Arc<Mutex<Connection>>, instance_id: impl Into<String>) -> Self {
18 Self {
19 conn,
20 instance_id: instance_id.into(),
21 }
22 }
23
24 pub fn from_connection(conn: Connection, instance_id: impl Into<String>) -> Self {
25 Self {
26 conn: Arc::new(Mutex::new(conn)),
27 instance_id: instance_id.into(),
28 }
29 }
30
31 pub fn instance_id(&self) -> &str {
32 &self.instance_id
33 }
34
35 fn conn(&self) -> MutexGuard<'_, Connection> {
36 self.conn.lock().expect("database connection poisoned")
37 }
38
39 pub fn insert_graph_node(
42 &self,
43 session_id: &str,
44 node_type: NodeType,
45 label: &str,
46 properties: &JsonValue,
47 embedding_id: Option<i64>,
48 ) -> Result<i64> {
49 let sync_enabled = self
50 .graph_get_sync_enabled(session_id, "default")
51 .unwrap_or(false);
52
53 let mut vector_clock = VectorClock::new();
54 vector_clock.increment(&self.instance_id);
55 let vc_json = vector_clock.to_json()?;
56
57 let conn = self.conn();
58
59 let mut stmt = conn.prepare(
60 "INSERT INTO graph_nodes (session_id, node_type, label, properties, embedding_id,
61 vector_clock, last_modified_by, sync_enabled)
62 VALUES (?, ?, ?, ?, ?, ?, ?, ?) RETURNING id",
63 )?;
64 let id: i64 = stmt.query_row(
65 params![
66 session_id,
67 node_type.as_str(),
68 label,
69 properties.to_string(),
70 embedding_id,
71 vc_json,
72 self.instance_id,
73 sync_enabled,
74 ],
75 |row| row.get(0),
76 )?;
77
78 if sync_enabled {
79 let node_data = serde_json::json!({
80 "id": id,
81 "session_id": session_id,
82 "node_type": node_type.as_str(),
83 "label": label,
84 "properties": properties,
85 "embedding_id": embedding_id,
86 });
87
88 self.graph_changelog_append(
89 session_id,
90 &self.instance_id,
91 "node",
92 id,
93 "create",
94 &vc_json,
95 Some(&node_data.to_string()),
96 )?;
97 }
98
99 Ok(id)
100 }
101
102 pub fn get_graph_node(&self, node_id: i64) -> Result<Option<GraphNode>> {
103 let conn = self.conn();
104 let mut stmt = conn.prepare(
105 "SELECT id, session_id, node_type, label, properties, embedding_id,
106 CAST(created_at AS TEXT), CAST(updated_at AS TEXT)
107 FROM graph_nodes WHERE id = ?",
108 )?;
109 let mut rows = stmt.query(params![node_id])?;
110 if let Some(row) = rows.next()? {
111 Ok(Some(Self::row_to_graph_node(row)?))
112 } else {
113 Ok(None)
114 }
115 }
116
117 pub fn list_graph_nodes(
118 &self,
119 session_id: &str,
120 node_type: Option<NodeType>,
121 limit: Option<i64>,
122 ) -> Result<Vec<GraphNode>> {
123 let conn = self.conn();
124
125 let nodes = if let Some(nt) = node_type {
126 let mut stmt = conn.prepare(
127 "SELECT id, session_id, node_type, label, properties, embedding_id,
128 CAST(created_at AS TEXT), CAST(updated_at AS TEXT)
129 FROM graph_nodes WHERE session_id = ? AND node_type = ?
130 ORDER BY id DESC LIMIT ?",
131 )?;
132 let query = stmt.query(params![session_id, nt.as_str(), limit.unwrap_or(100)])?;
133 Self::collect_graph_nodes(query)?
134 } else {
135 let mut stmt = conn.prepare(
136 "SELECT id, session_id, node_type, label, properties, embedding_id,
137 CAST(created_at AS TEXT), CAST(updated_at AS TEXT)
138 FROM graph_nodes WHERE session_id = ?
139 ORDER BY id DESC LIMIT ?",
140 )?;
141 let query = stmt.query(params![session_id, limit.unwrap_or(100)])?;
142 Self::collect_graph_nodes(query)?
143 };
144
145 Ok(nodes)
146 }
147
148 pub fn count_graph_nodes(&self, session_id: &str) -> Result<i64> {
149 let conn = self.conn();
150 let mut stmt = conn.prepare("SELECT COUNT(*) FROM graph_nodes WHERE session_id = ?")?;
151 let count: i64 = stmt.query_row(params![session_id], |row| row.get(0))?;
152 Ok(count)
153 }
154
155 pub fn update_graph_node(&self, node_id: i64, properties: &JsonValue) -> Result<()> {
156 let conn = self.conn();
157
158 let mut stmt = conn.prepare(
159 "SELECT session_id, node_type, label, vector_clock, sync_enabled
160 FROM graph_nodes WHERE id = ?",
161 )?;
162
163 let (session_id, node_type, label, current_vc_json, sync_enabled): (
164 String,
165 String,
166 String,
167 Option<String>,
168 bool,
169 ) = stmt.query_row(params![node_id], |row| {
170 Ok((
171 row.get(0)?,
172 row.get(1)?,
173 row.get(2)?,
174 row.get(3)?,
175 row.get(4).unwrap_or(false),
176 ))
177 })?;
178
179 let mut vector_clock = if let Some(vc_json) = current_vc_json {
180 VectorClock::from_json(&vc_json).unwrap_or_else(|_| VectorClock::new())
181 } else {
182 VectorClock::new()
183 };
184 vector_clock.increment(&self.instance_id);
185 let vc_json = vector_clock.to_json()?;
186
187 conn.execute(
188 "UPDATE graph_nodes
189 SET properties = ?,
190 vector_clock = ?,
191 last_modified_by = ?,
192 updated_at = CURRENT_TIMESTAMP
193 WHERE id = ?",
194 params![properties.to_string(), vc_json, self.instance_id, node_id],
195 )?;
196
197 if sync_enabled {
198 let node_data = serde_json::json!({
199 "id": node_id,
200 "session_id": session_id,
201 "node_type": node_type,
202 "label": label,
203 "properties": properties,
204 });
205
206 self.graph_changelog_append(
207 &session_id,
208 &self.instance_id,
209 "node",
210 node_id,
211 "update",
212 &vc_json,
213 Some(&node_data.to_string()),
214 )?;
215 }
216
217 Ok(())
218 }
219
220 pub fn delete_graph_node(&self, node_id: i64) -> Result<()> {
221 let conn = self.conn();
222
223 let mut stmt = conn.prepare(
224 "SELECT session_id, node_type, label, properties, vector_clock, sync_enabled
225 FROM graph_nodes WHERE id = ?",
226 )?;
227
228 let result = stmt.query_row(params![node_id], |row| {
229 Ok((
230 row.get::<_, String>(0)?,
231 row.get::<_, String>(1)?,
232 row.get::<_, String>(2)?,
233 row.get::<_, String>(3)?,
234 row.get::<_, Option<String>>(4)?,
235 row.get::<_, bool>(5).unwrap_or(false),
236 ))
237 });
238
239 if let Ok((session_id, node_type, label, properties, current_vc_json, sync_enabled)) =
240 result
241 {
242 if sync_enabled {
243 let mut vector_clock = if let Some(vc_json) = current_vc_json {
244 VectorClock::from_json(&vc_json).unwrap_or_else(|_| VectorClock::new())
245 } else {
246 VectorClock::new()
247 };
248 vector_clock.increment(&self.instance_id);
249 let vc_json = vector_clock.to_json()?;
250
251 conn.execute(
252 "INSERT INTO graph_tombstones
253 (session_id, entity_type, entity_id, deleted_by, vector_clock)
254 VALUES (?, ?, ?, ?, ?)",
255 params![session_id, "node", node_id, self.instance_id, vc_json],
256 )?;
257
258 let node_data = serde_json::json!({
259 "id": node_id,
260 "session_id": session_id,
261 "node_type": node_type,
262 "label": label,
263 "properties": properties,
264 });
265
266 self.graph_changelog_append(
267 &session_id,
268 &self.instance_id,
269 "node",
270 node_id,
271 "delete",
272 &vc_json,
273 Some(&node_data.to_string()),
274 )?;
275 }
276 }
277
278 conn.execute("DELETE FROM graph_nodes WHERE id = ?", params![node_id])?;
279 Ok(())
280 }
281
282 pub fn insert_graph_edge(
285 &self,
286 session_id: &str,
287 source_id: i64,
288 target_id: i64,
289 edge_type: EdgeType,
290 predicate: Option<&str>,
291 properties: Option<&JsonValue>,
292 weight: f32,
293 ) -> Result<i64> {
294 let sync_enabled = self
295 .graph_get_sync_enabled(session_id, "default")
296 .unwrap_or(false);
297
298 let mut vector_clock = VectorClock::new();
299 vector_clock.increment(&self.instance_id);
300 let vc_json = vector_clock.to_json()?;
301
302 let conn = self.conn();
303
304 let mut stmt = conn.prepare(
305 "INSERT INTO graph_edges (session_id, source_id, target_id, edge_type, predicate, properties, weight,
306 vector_clock, last_modified_by, sync_enabled)
307 VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?) RETURNING id",
308 )?;
309 let props_str = properties.map(|p| p.to_string());
310 let id: i64 = stmt.query_row(
311 params![
312 session_id,
313 source_id,
314 target_id,
315 edge_type.as_str(),
316 predicate,
317 props_str,
318 weight,
319 vc_json,
320 self.instance_id,
321 sync_enabled,
322 ],
323 |row| row.get(0),
324 )?;
325
326 if sync_enabled {
327 let edge_data = serde_json::json!({
328 "id": id,
329 "session_id": session_id,
330 "source_id": source_id,
331 "target_id": target_id,
332 "edge_type": edge_type.as_str(),
333 "predicate": predicate,
334 "properties": properties,
335 "weight": weight,
336 });
337
338 self.graph_changelog_append(
339 session_id,
340 &self.instance_id,
341 "edge",
342 id,
343 "insert",
344 &vc_json,
345 Some(&edge_data.to_string()),
346 )?;
347 }
348
349 Ok(id)
350 }
351
352 pub fn get_graph_edge(&self, edge_id: i64) -> Result<Option<GraphEdge>> {
353 let conn = self.conn();
354 let mut stmt = conn.prepare(
355 "SELECT id, session_id, source_id, target_id, edge_type, predicate, properties, weight,
356 CAST(temporal_start AS TEXT), CAST(temporal_end AS TEXT), CAST(created_at AS TEXT)
357 FROM graph_edges WHERE id = ?",
358 )?;
359 let mut rows = stmt.query(params![edge_id])?;
360 if let Some(row) = rows.next()? {
361 Ok(Some(Self::row_to_graph_edge(row)?))
362 } else {
363 Ok(None)
364 }
365 }
366
367 pub fn list_graph_edges(
368 &self,
369 session_id: &str,
370 source_id: Option<i64>,
371 target_id: Option<i64>,
372 ) -> Result<Vec<GraphEdge>> {
373 let conn = self.conn();
374
375 let edges = match (source_id, target_id) {
376 (Some(src), Some(tgt)) => {
377 let mut stmt = conn.prepare(
378 "SELECT id, session_id, source_id, target_id, edge_type, predicate, properties, weight,
379 CAST(temporal_start AS TEXT), CAST(temporal_end AS TEXT), CAST(created_at AS TEXT)
380 FROM graph_edges WHERE session_id = ? AND source_id = ? AND target_id = ?",
381 )?;
382 let query = stmt.query(params![session_id, src, tgt])?;
383 Self::collect_graph_edges(query)?
384 }
385 (Some(src), None) => {
386 let mut stmt = conn.prepare(
387 "SELECT id, session_id, source_id, target_id, edge_type, predicate, properties, weight,
388 CAST(temporal_start AS TEXT), CAST(temporal_end AS TEXT), CAST(created_at AS TEXT)
389 FROM graph_edges WHERE session_id = ? AND source_id = ?",
390 )?;
391 let query = stmt.query(params![session_id, src])?;
392 Self::collect_graph_edges(query)?
393 }
394 (None, Some(tgt)) => {
395 let mut stmt = conn.prepare(
396 "SELECT id, session_id, source_id, target_id, edge_type, predicate, properties, weight,
397 CAST(temporal_start AS TEXT), CAST(temporal_end AS TEXT), CAST(created_at AS TEXT)
398 FROM graph_edges WHERE session_id = ? AND target_id = ?",
399 )?;
400 let query = stmt.query(params![session_id, tgt])?;
401 Self::collect_graph_edges(query)?
402 }
403 (None, None) => {
404 let mut stmt = conn.prepare(
405 "SELECT id, session_id, source_id, target_id, edge_type, predicate, properties, weight,
406 CAST(temporal_start AS TEXT), CAST(temporal_end AS TEXT), CAST(created_at AS TEXT)
407 FROM graph_edges WHERE session_id = ?",
408 )?;
409 let query = stmt.query(params![session_id])?;
410 Self::collect_graph_edges(query)?
411 }
412 };
413
414 Ok(edges)
415 }
416
417 pub fn count_graph_edges(&self, session_id: &str) -> Result<i64> {
418 let conn = self.conn();
419 let mut stmt = conn.prepare("SELECT COUNT(*) FROM graph_edges WHERE session_id = ?")?;
420 let count: i64 = stmt.query_row(params![session_id], |row| row.get(0))?;
421 Ok(count)
422 }
423
424 pub fn delete_graph_edge(&self, edge_id: i64) -> Result<()> {
425 let conn = self.conn();
426
427 let mut stmt = conn.prepare(
428 "SELECT session_id, source_id, target_id, edge_type, predicate, properties, weight,
429 vector_clock, sync_enabled
430 FROM graph_edges WHERE id = ?",
431 )?;
432
433 let result = stmt.query_row(params![edge_id], |row| {
434 Ok((
435 row.get::<_, String>(0)?,
436 row.get::<_, i64>(1)?,
437 row.get::<_, i64>(2)?,
438 row.get::<_, String>(3)?,
439 row.get::<_, Option<String>>(4)?,
440 row.get::<_, Option<String>>(5)?,
441 row.get::<_, f32>(6)?,
442 row.get::<_, Option<String>>(7)?,
443 row.get::<_, bool>(8).unwrap_or(false),
444 ))
445 });
446
447 if let Ok((
448 session_id,
449 source_id,
450 target_id,
451 edge_type,
452 predicate,
453 properties,
454 weight,
455 current_vc_json,
456 sync_enabled,
457 )) = result
458 {
459 if sync_enabled {
460 let mut vector_clock = if let Some(vc_json) = current_vc_json {
461 VectorClock::from_json(&vc_json).unwrap_or_else(|_| VectorClock::new())
462 } else {
463 VectorClock::new()
464 };
465 vector_clock.increment(&self.instance_id);
466 let vc_json = vector_clock.to_json()?;
467
468 conn.execute(
469 "INSERT INTO graph_tombstones
470 (session_id, entity_type, entity_id, deleted_by, vector_clock)
471 VALUES (?, ?, ?, ?, ?)",
472 params![session_id, "edge", edge_id, self.instance_id, vc_json],
473 )?;
474
475 let edge_data = serde_json::json!({
476 "id": edge_id,
477 "session_id": session_id,
478 "source_id": source_id,
479 "target_id": target_id,
480 "edge_type": edge_type,
481 "predicate": predicate,
482 "properties": properties,
483 "weight": weight,
484 });
485
486 self.graph_changelog_append(
487 &session_id,
488 &self.instance_id,
489 "edge",
490 edge_id,
491 "delete",
492 &vc_json,
493 Some(&edge_data.to_string()),
494 )?;
495 }
496 }
497
498 conn.execute("DELETE FROM graph_edges WHERE id = ?", params![edge_id])?;
499 Ok(())
500 }
501
502 pub fn find_shortest_path(
505 &self,
506 session_id: &str,
507 source_id: i64,
508 target_id: i64,
509 max_hops: Option<usize>,
510 ) -> Result<Option<GraphPath>> {
511 let max_depth = max_hops.unwrap_or(10);
512
513 let mut visited = HashSet::new();
514 let mut queue = VecDeque::new();
515 let mut parent_map = HashMap::new();
516
517 queue.push_back((source_id, 0));
518 visited.insert(source_id);
519
520 while let Some((current_id, depth)) = queue.pop_front() {
521 if current_id == target_id {
522 let path = self.reconstruct_path(&parent_map, source_id, target_id)?;
523 return Ok(Some(path));
524 }
525
526 if depth >= max_depth {
527 continue;
528 }
529
530 let edges = self.list_graph_edges(session_id, Some(current_id), None)?;
531 for edge in edges {
532 let target = edge.target_id;
533 if !visited.contains(&target) {
534 visited.insert(target);
535 parent_map.insert(target, (current_id, edge));
536 queue.push_back((target, depth + 1));
537 }
538 }
539 }
540
541 Ok(None)
542 }
543
544 pub fn traverse_neighbors(
545 &self,
546 session_id: &str,
547 node_id: i64,
548 direction: TraversalDirection,
549 depth: usize,
550 ) -> Result<Vec<GraphNode>> {
551 if depth == 0 {
552 return Ok(vec![]);
553 }
554
555 let mut visited = HashSet::new();
556 let mut result = Vec::new();
557 let mut queue = VecDeque::new();
558
559 queue.push_back((node_id, 0));
560 visited.insert(node_id);
561
562 while let Some((current_id, current_depth)) = queue.pop_front() {
563 if current_depth > 0 {
564 if let Some(node) = self.get_graph_node(current_id)? {
565 result.push(node);
566 }
567 }
568
569 if current_depth >= depth {
570 continue;
571 }
572
573 let edges = match direction {
574 TraversalDirection::Outgoing => {
575 self.list_graph_edges(session_id, Some(current_id), None)?
576 }
577 TraversalDirection::Incoming => {
578 self.list_graph_edges(session_id, None, Some(current_id))?
579 }
580 TraversalDirection::Both => {
581 let mut out_edges =
582 self.list_graph_edges(session_id, Some(current_id), None)?;
583 let in_edges = self.list_graph_edges(session_id, None, Some(current_id))?;
584 out_edges.extend(in_edges);
585 out_edges
586 }
587 };
588
589 for edge in edges {
590 let next_id = match direction {
591 TraversalDirection::Outgoing => edge.target_id,
592 TraversalDirection::Incoming => edge.source_id,
593 TraversalDirection::Both => {
594 if edge.source_id == current_id {
595 edge.target_id
596 } else {
597 edge.source_id
598 }
599 }
600 };
601
602 if !visited.contains(&next_id) {
603 visited.insert(next_id);
604 queue.push_back((next_id, current_depth + 1));
605 }
606 }
607 }
608
609 Ok(result)
610 }
611
612 fn row_to_graph_node(row: &duckdb::Row) -> Result<GraphNode> {
613 let id: i64 = row.get(0)?;
614 let session_id: String = row.get(1)?;
615 let node_type: String = row.get(2)?;
616 let label: String = row.get(3)?;
617 let properties: String = row.get(4)?;
618 let embedding_id: Option<i64> = row.get(5)?;
619 let created_at: String = row.get(6)?;
620 let updated_at: String = row.get(7)?;
621
622 Ok(GraphNode {
623 id,
624 session_id,
625 node_type: NodeType::from_str(&node_type),
626 label,
627 properties: serde_json::from_str(&properties).unwrap_or(JsonValue::Null),
628 embedding_id,
629 created_at: created_at.parse().unwrap_or_else(|_| Utc::now()),
630 updated_at: updated_at.parse().unwrap_or_else(|_| Utc::now()),
631 })
632 }
633
634 fn row_to_graph_edge(row: &duckdb::Row) -> Result<GraphEdge> {
635 let id: i64 = row.get(0)?;
636 let session_id: String = row.get(1)?;
637 let source_id: i64 = row.get(2)?;
638 let target_id: i64 = row.get(3)?;
639 let edge_type: String = row.get(4)?;
640 let predicate: Option<String> = row.get(5)?;
641 let properties: Option<String> = row.get(6)?;
642 let weight: f32 = row.get(7)?;
643 let temporal_start: Option<String> = row.get(8)?;
644 let temporal_end: Option<String> = row.get(9)?;
645 let created_at: String = row.get(10)?;
646
647 Ok(GraphEdge {
648 id,
649 session_id,
650 source_id,
651 target_id,
652 edge_type: EdgeType::from_str(&edge_type),
653 predicate,
654 properties: properties.and_then(|p| serde_json::from_str(&p).ok()),
655 weight,
656 temporal_start: temporal_start.and_then(|s| s.parse().ok()),
657 temporal_end: temporal_end.and_then(|s| s.parse().ok()),
658 created_at: created_at.parse().unwrap_or_else(|_| Utc::now()),
659 })
660 }
661
662 fn collect_graph_nodes(mut rows: duckdb::Rows) -> Result<Vec<GraphNode>> {
663 let mut nodes = Vec::new();
664 while let Some(row) = rows.next()? {
665 nodes.push(Self::row_to_graph_node(row)?);
666 }
667 Ok(nodes)
668 }
669
670 fn collect_graph_edges(mut rows: duckdb::Rows) -> Result<Vec<GraphEdge>> {
671 let mut edges = Vec::new();
672 while let Some(row) = rows.next()? {
673 edges.push(Self::row_to_graph_edge(row)?);
674 }
675 Ok(edges)
676 }
677
678 fn reconstruct_path(
679 &self,
680 parent_map: &HashMap<i64, (i64, GraphEdge)>,
681 source_id: i64,
682 target_id: i64,
683 ) -> Result<GraphPath> {
684 let mut path_edges = Vec::new();
685 let mut path_nodes = Vec::new();
686 let mut current = target_id;
687 let mut total_weight = 0.0;
688
689 while current != source_id {
690 if let Some((parent, edge)) = parent_map.get(¤t) {
691 path_edges.push(edge.clone());
692 total_weight += edge.weight;
693 current = *parent;
694 } else {
695 break;
696 }
697 }
698
699 path_edges.reverse();
700
701 if let Some(node) = self.get_graph_node(source_id)? {
702 path_nodes.push(node);
703 }
704 for edge in &path_edges {
705 if let Some(node) = self.get_graph_node(edge.target_id)? {
706 path_nodes.push(node);
707 }
708 }
709
710 Ok(GraphPath {
711 length: path_edges.len(),
712 weight: total_weight,
713 nodes: path_nodes,
714 edges: path_edges,
715 })
716 }
717
718 pub fn graph_changelog_append(
721 &self,
722 session_id: &str,
723 instance_id: &str,
724 entity_type: &str,
725 entity_id: i64,
726 operation: &str,
727 vector_clock: &str,
728 data: Option<&str>,
729 ) -> Result<i64> {
730 let conn = self.conn();
731 let mut stmt = conn.prepare(
732 "INSERT INTO graph_changelog (session_id, instance_id, entity_type, entity_id, operation, vector_clock, data)
733 VALUES (?, ?, ?, ?, ?, ?, ?) RETURNING id",
734 )?;
735 let id: i64 = stmt.query_row(
736 params![
737 session_id,
738 instance_id,
739 entity_type,
740 entity_id,
741 operation,
742 vector_clock,
743 data
744 ],
745 |row| row.get(0),
746 )?;
747 Ok(id)
748 }
749
750 pub fn graph_changelog_get_since(
751 &self,
752 session_id: &str,
753 since_timestamp: &str,
754 ) -> Result<Vec<ChangelogEntry>> {
755 let conn = self.conn();
756 let mut stmt = conn.prepare(
757 "SELECT id, session_id, instance_id, entity_type, entity_id, operation, vector_clock, data, CAST(created_at AS TEXT)
758 FROM graph_changelog
759 WHERE session_id = ? AND created_at > ?
760 ORDER BY created_at ASC",
761 )?;
762 let mut rows = stmt.query(params![session_id, since_timestamp])?;
763 let mut entries = Vec::new();
764 while let Some(row) = rows.next()? {
765 entries.push(ChangelogEntry::from_row(row)?);
766 }
767 Ok(entries)
768 }
769
770 pub fn graph_list_conflicts(
771 &self,
772 session_id: Option<&str>,
773 limit: usize,
774 ) -> Result<Vec<ChangelogEntry>> {
775 let conn = self.conn();
776 let mut entries = Vec::new();
777
778 if let Some(sid) = session_id {
779 let mut stmt = conn.prepare(
780 "SELECT id, session_id, instance_id, entity_type, entity_id, operation, vector_clock, data, CAST(created_at AS TEXT)
781 FROM graph_changelog
782 WHERE operation = 'conflict' AND session_id = ?
783 ORDER BY created_at DESC
784 LIMIT ?",
785 )?;
786 let mut rows = stmt.query(params![sid, limit as i64])?;
787 while let Some(row) = rows.next()? {
788 entries.push(ChangelogEntry::from_row(row)?);
789 }
790 } else {
791 let mut stmt = conn.prepare(
792 "SELECT id, session_id, instance_id, entity_type, entity_id, operation, vector_clock, data, CAST(created_at AS TEXT)
793 FROM graph_changelog
794 WHERE operation = 'conflict'
795 ORDER BY created_at DESC
796 LIMIT ?",
797 )?;
798 let mut rows = stmt.query(params![limit as i64])?;
799 while let Some(row) = rows.next()? {
800 entries.push(ChangelogEntry::from_row(row)?);
801 }
802 }
803
804 Ok(entries)
805 }
806
807 pub fn graph_changelog_prune(&self, days_to_keep: i64) -> Result<usize> {
808 let conn = self.conn();
809 let cutoff = Utc::now() - Duration::days(days_to_keep);
810 let cutoff_str = cutoff.to_rfc3339();
811 let deleted = conn.execute(
812 "DELETE FROM graph_changelog WHERE created_at < ?",
813 params![cutoff_str],
814 )?;
815 Ok(deleted)
816 }
817
818 pub fn graph_sync_state_get_metadata(
819 &self,
820 instance_id: &str,
821 session_id: &str,
822 graph_name: &str,
823 ) -> Result<Option<SyncStateRecord>> {
824 let conn = self.conn();
825 let result: Result<SyncStateRecord, _> = conn.query_row(
826 "SELECT vector_clock, CAST(last_sync_at AS TEXT)
827 FROM graph_sync_state WHERE instance_id = ? AND session_id = ? AND graph_name = ?",
828 params![instance_id, session_id, graph_name],
829 |row| {
830 Ok(SyncStateRecord {
831 vector_clock: row.get(0)?,
832 last_sync_at: row.get(1).ok(),
833 })
834 },
835 );
836
837 match result {
838 Ok(record) => Ok(Some(record)),
839 Err(duckdb::Error::QueryReturnedNoRows) => Ok(None),
840 Err(e) => Err(e.into()),
841 }
842 }
843
844 pub fn graph_sync_state_get(
845 &self,
846 instance_id: &str,
847 session_id: &str,
848 graph_name: &str,
849 ) -> Result<Option<String>> {
850 Ok(self
851 .graph_sync_state_get_metadata(instance_id, session_id, graph_name)?
852 .map(|r| r.vector_clock))
853 }
854
855 pub fn graph_sync_state_update(
856 &self,
857 instance_id: &str,
858 session_id: &str,
859 graph_name: &str,
860 vector_clock: &str,
861 ) -> Result<()> {
862 let conn = self.conn();
863 conn.execute(
864 "INSERT INTO graph_sync_state (instance_id, session_id, graph_name, vector_clock, last_sync_at)
865 VALUES (?, ?, ?, ?, now())
866 ON CONFLICT (instance_id, session_id, graph_name)
867 DO UPDATE SET vector_clock = EXCLUDED.vector_clock, last_sync_at = now()",
868 params![instance_id, session_id, graph_name, vector_clock],
869 )?;
870 Ok(())
871 }
872
873 pub fn graph_set_sync_enabled(
874 &self,
875 session_id: &str,
876 graph_name: &str,
877 enabled: bool,
878 ) -> Result<()> {
879 let conn = self.conn();
880 conn.execute(
882 "INSERT INTO graph_metadata (session_id, graph_name, sync_enabled)
883 VALUES (?, ?, ?)
884 ON CONFLICT (session_id, graph_name) DO UPDATE SET sync_enabled = EXCLUDED.sync_enabled",
885 params![session_id, graph_name, enabled],
886 )?;
887 Ok(())
888 }
889
890 pub fn graph_get_sync_enabled(&self, session_id: &str, graph_name: &str) -> Result<bool> {
891 let conn = self.conn();
892 let result: Result<bool, _> = conn.query_row(
893 "SELECT sync_enabled FROM graph_metadata WHERE session_id = ? AND graph_name = ?",
894 params![session_id, graph_name],
895 |row| row.get(0),
896 );
897 match result {
898 Ok(enabled) => Ok(enabled),
899 Err(duckdb::Error::QueryReturnedNoRows) => Ok(false),
900 Err(e) => Err(e.into()),
901 }
902 }
903
904 pub fn graph_set_sync_config(
905 &self,
906 session_id: &str,
907 graph_name: &str,
908 sync_enabled: bool,
909 conflict_resolution_strategy: Option<&str>,
910 sync_interval_seconds: Option<u64>,
911 ) -> Result<GraphSyncConfig> {
912 let conn = self.conn();
913
914 let existing_config_value: JsonValue = conn
916 .query_row(
917 "SELECT config FROM graph_metadata WHERE session_id = ? AND graph_name = ?",
918 params![session_id, graph_name],
919 |row| row.get::<_, Option<String>>(0),
920 )
921 .unwrap_or(None)
922 .as_deref()
923 .and_then(|s| serde_json::from_str(s).ok())
924 .unwrap_or_else(|| JsonValue::Object(Map::new()));
925
926 let mut root_obj = existing_config_value
927 .as_object()
928 .cloned()
929 .unwrap_or_default();
930 let mut sync_obj = root_obj
931 .get("sync")
932 .and_then(|v| v.as_object())
933 .cloned()
934 .unwrap_or_default();
935
936 let final_strategy = conflict_resolution_strategy
937 .map(|s| s.to_string())
938 .or_else(|| {
939 sync_obj
940 .get("conflict_resolution_strategy")
941 .and_then(|v| v.as_str().map(|s| s.to_string()))
942 })
943 .or_else(|| Some("vector_clock".to_string()));
944
945 let final_interval = sync_interval_seconds
946 .or_else(|| {
947 sync_obj
948 .get("sync_interval_seconds")
949 .and_then(|v| v.as_u64())
950 })
951 .or(Some(60));
952
953 if let Some(strategy) = final_strategy.clone() {
954 sync_obj.insert(
955 "conflict_resolution_strategy".to_string(),
956 JsonValue::String(strategy),
957 );
958 }
959 if let Some(interval) = final_interval {
960 sync_obj.insert(
961 "sync_interval_seconds".to_string(),
962 JsonValue::from(interval),
963 );
964 }
965
966 root_obj.insert("sync".to_string(), JsonValue::Object(sync_obj));
967 let merged_config = JsonValue::Object(root_obj).to_string();
968
969 conn.execute(
970 "INSERT INTO graph_metadata (session_id, graph_name, sync_enabled, config, updated_at)
971 VALUES (?, ?, ?, ?, now())
972 ON CONFLICT (session_id, graph_name)
973 DO UPDATE SET sync_enabled = EXCLUDED.sync_enabled,
974 config = EXCLUDED.config,
975 updated_at = now()",
976 params![session_id, graph_name, sync_enabled, merged_config],
977 )?;
978
979 Ok(GraphSyncConfig {
980 sync_enabled,
981 conflict_resolution_strategy: final_strategy,
982 sync_interval_seconds: final_interval,
983 })
984 }
985
986 pub fn graph_get_sync_config(
987 &self,
988 session_id: &str,
989 graph_name: &str,
990 ) -> Result<GraphSyncConfig> {
991 let conn = self.conn();
992 let result: Result<(bool, Option<String>), _> = conn.query_row(
993 "SELECT sync_enabled, config FROM graph_metadata WHERE session_id = ? AND graph_name = ?",
994 params![session_id, graph_name],
995 |row| Ok((row.get(0)?, row.get(1)?)),
996 );
997
998 match result {
999 Ok((sync_enabled, config_json)) => {
1000 let config_value: JsonValue = config_json
1001 .as_deref()
1002 .and_then(|s| serde_json::from_str(s).ok())
1003 .unwrap_or_else(|| JsonValue::Object(Map::new()));
1004 let sync_obj = config_value
1005 .get("sync")
1006 .and_then(|v| v.as_object())
1007 .cloned()
1008 .unwrap_or_default();
1009
1010 Ok(GraphSyncConfig {
1011 sync_enabled,
1012 conflict_resolution_strategy: sync_obj
1013 .get("conflict_resolution_strategy")
1014 .and_then(|v| v.as_str())
1015 .map(|s| s.to_string())
1016 .or_else(|| Some("vector_clock".to_string())),
1017 sync_interval_seconds: sync_obj
1018 .get("sync_interval_seconds")
1019 .and_then(|v| v.as_u64())
1020 .or(Some(60)),
1021 })
1022 }
1023 Err(duckdb::Error::QueryReturnedNoRows) => Ok(GraphSyncConfig::default()),
1024 Err(e) => Err(e.into()),
1025 }
1026 }
1027
1028 pub fn graph_list(&self, session_id: &str) -> Result<Vec<String>> {
1029 let conn = self.conn();
1030 let mut stmt = conn.prepare(
1031 "SELECT DISTINCT graph_name FROM graph_metadata WHERE session_id = ?
1032 UNION
1033 SELECT DISTINCT 'default' as graph_name
1034 FROM graph_nodes WHERE session_id = ?
1035 ORDER BY graph_name",
1036 )?;
1037
1038 let mut graphs = Vec::new();
1039 let mut rows = stmt.query(params![session_id, session_id])?;
1040 while let Some(row) = rows.next()? {
1041 let graph_name: String = row.get(0)?;
1042 graphs.push(graph_name);
1043 }
1044
1045 if graphs.is_empty() {
1046 let node_count: i64 = conn.query_row(
1047 "SELECT COUNT(*) FROM graph_nodes WHERE session_id = ?",
1048 params![session_id],
1049 |row| row.get(0),
1050 )?;
1051 if node_count > 0 {
1052 graphs.push("default".to_string());
1053 }
1054 }
1055
1056 Ok(graphs)
1057 }
1058
1059 pub fn graph_list_sync_enabled(&self) -> Result<Vec<(String, String)>> {
1061 let conn = self.conn();
1062 let mut stmt = conn.prepare(
1063 "SELECT session_id, graph_name FROM graph_metadata WHERE sync_enabled = TRUE ORDER BY session_id, graph_name",
1064 )?;
1065
1066 let mut results = Vec::new();
1067 let mut rows = stmt.query(params![])?;
1068 while let Some(row) = rows.next()? {
1069 let session_id: String = row.get(0)?;
1070 let graph_name: String = row.get(1)?;
1071 results.push((session_id, graph_name));
1072 }
1073
1074 Ok(results)
1075 }
1076
1077 pub fn graph_get_node_with_sync(&self, node_id: i64) -> Result<Option<SyncedNodeRecord>> {
1078 let conn = self.conn();
1079 let result: Result<SyncedNodeRecord, _> = conn.query_row(
1080 "SELECT id, session_id, node_type, label, properties, embedding_id,
1081 CAST(created_at AS TEXT), CAST(updated_at AS TEXT),
1082 COALESCE(vector_clock, '{}'), last_modified_by, is_deleted, sync_enabled
1083 FROM graph_nodes WHERE id = ?",
1084 params![node_id],
1085 SyncedNodeRecord::from_row,
1086 );
1087 match result {
1088 Ok(node) => Ok(Some(node)),
1089 Err(duckdb::Error::QueryReturnedNoRows) => Ok(None),
1090 Err(e) => Err(e.into()),
1091 }
1092 }
1093
1094 pub fn graph_list_nodes_with_sync(
1095 &self,
1096 session_id: &str,
1097 sync_enabled_only: bool,
1098 include_deleted: bool,
1099 ) -> Result<Vec<SyncedNodeRecord>> {
1100 let conn = self.conn();
1101 let mut query = String::from(
1102 "SELECT id, session_id, node_type, label, properties, embedding_id,
1103 CAST(created_at AS TEXT), CAST(updated_at AS TEXT),
1104 COALESCE(vector_clock, '{}'), last_modified_by, is_deleted, sync_enabled
1105 FROM graph_nodes WHERE session_id = ?",
1106 );
1107
1108 if sync_enabled_only {
1109 query.push_str(" AND sync_enabled = TRUE");
1110 }
1111 if !include_deleted {
1112 query.push_str(" AND is_deleted = FALSE");
1113 }
1114 query.push_str(" ORDER BY created_at ASC");
1115
1116 let mut stmt = conn.prepare(&query)?;
1117 let mut rows = stmt.query(params![session_id])?;
1118 let mut nodes = Vec::new();
1119 while let Some(row) = rows.next()? {
1120 nodes.push(SyncedNodeRecord::from_row(row)?);
1121 }
1122 Ok(nodes)
1123 }
1124
1125 pub fn graph_get_edge_with_sync(&self, edge_id: i64) -> Result<Option<SyncedEdgeRecord>> {
1126 let conn = self.conn();
1127 let result: Result<SyncedEdgeRecord, _> = conn.query_row(
1128 "SELECT id, session_id, source_id, target_id, edge_type, predicate, properties, weight,
1129 CAST(temporal_start AS TEXT), CAST(temporal_end AS TEXT), CAST(created_at AS TEXT),
1130 COALESCE(vector_clock, '{}'), last_modified_by, is_deleted, sync_enabled
1131 FROM graph_edges WHERE id = ?",
1132 params![edge_id],
1133 SyncedEdgeRecord::from_row,
1134 );
1135 match result {
1136 Ok(edge) => Ok(Some(edge)),
1137 Err(duckdb::Error::QueryReturnedNoRows) => Ok(None),
1138 Err(e) => Err(e.into()),
1139 }
1140 }
1141
1142 pub fn graph_list_edges_with_sync(
1143 &self,
1144 session_id: &str,
1145 sync_enabled_only: bool,
1146 include_deleted: bool,
1147 ) -> Result<Vec<SyncedEdgeRecord>> {
1148 let conn = self.conn();
1149 let mut query = String::from(
1150 "SELECT id, session_id, source_id, target_id, edge_type, predicate, properties, weight,
1151 CAST(temporal_start AS TEXT), CAST(temporal_end AS TEXT), CAST(created_at AS TEXT),
1152 COALESCE(vector_clock, '{}'), last_modified_by, is_deleted, sync_enabled
1153 FROM graph_edges WHERE session_id = ?",
1154 );
1155
1156 if sync_enabled_only {
1157 query.push_str(" AND sync_enabled = TRUE");
1158 }
1159 if !include_deleted {
1160 query.push_str(" AND is_deleted = FALSE");
1161 }
1162 query.push_str(" ORDER BY created_at ASC");
1163
1164 let mut stmt = conn.prepare(&query)?;
1165 let mut rows = stmt.query(params![session_id])?;
1166 let mut edges = Vec::new();
1167 while let Some(row) = rows.next()? {
1168 edges.push(SyncedEdgeRecord::from_row(row)?);
1169 }
1170 Ok(edges)
1171 }
1172
1173 pub fn graph_update_node_sync_metadata(
1174 &self,
1175 node_id: i64,
1176 vector_clock: &str,
1177 last_modified_by: &str,
1178 sync_enabled: bool,
1179 ) -> Result<()> {
1180 let conn = self.conn();
1181 conn.execute(
1182 "UPDATE graph_nodes SET vector_clock = ?, last_modified_by = ?, sync_enabled = ?, updated_at = CURRENT_TIMESTAMP
1183 WHERE id = ?",
1184 params![vector_clock, last_modified_by, sync_enabled, node_id],
1185 )?;
1186 Ok(())
1187 }
1188
1189 pub fn graph_update_edge_sync_metadata(
1190 &self,
1191 edge_id: i64,
1192 vector_clock: &str,
1193 last_modified_by: &str,
1194 sync_enabled: bool,
1195 ) -> Result<()> {
1196 let conn = self.conn();
1197 conn.execute(
1198 "UPDATE graph_edges SET vector_clock = ?, last_modified_by = ?, sync_enabled = ?
1199 WHERE id = ?",
1200 params![vector_clock, last_modified_by, sync_enabled, edge_id],
1201 )?;
1202 Ok(())
1203 }
1204
1205 pub fn graph_mark_node_deleted(
1206 &self,
1207 node_id: i64,
1208 vector_clock: &str,
1209 deleted_by: &str,
1210 ) -> Result<()> {
1211 let conn = self.conn();
1212 conn.execute(
1213 "UPDATE graph_nodes SET is_deleted = TRUE, vector_clock = ?, last_modified_by = ?, updated_at = CURRENT_TIMESTAMP
1214 WHERE id = ?",
1215 params![vector_clock, deleted_by, node_id],
1216 )?;
1217 Ok(())
1218 }
1219
1220 pub fn graph_mark_edge_deleted(
1221 &self,
1222 edge_id: i64,
1223 vector_clock: &str,
1224 deleted_by: &str,
1225 ) -> Result<()> {
1226 let conn = self.conn();
1227 conn.execute(
1228 "UPDATE graph_edges SET is_deleted = TRUE, vector_clock = ?, last_modified_by = ?
1229 WHERE id = ?",
1230 params![vector_clock, deleted_by, edge_id],
1231 )?;
1232 Ok(())
1233 }
1234}
1235
/// Stored synchronization state for one `(instance, session, graph)` combination.
///
/// Derives `PartialEq`/`Eq` so callers and tests can compare states directly.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct SyncStateRecord {
    /// Serialized vector clock text recorded for this state (JSON in the tests).
    pub vector_clock: String,
    /// Time of the last sync as text, or `None` if none was recorded.
    pub last_sync_at: Option<String>,
}
1241
/// Per-graph synchronization settings.
///
/// Derives `PartialEq`/`Eq` so a stored config can be compared with a requested one.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct GraphSyncConfig {
    /// Whether synchronization is turned on for the graph.
    pub sync_enabled: bool,
    /// Name of the conflict-resolution strategy (e.g. `"vector_clock"`, `"last_write_wins"`).
    pub conflict_resolution_strategy: Option<String>,
    /// Interval between sync rounds, in seconds.
    pub sync_interval_seconds: Option<u64>,
}

impl Default for GraphSyncConfig {
    /// Sync disabled, `"vector_clock"` conflict resolution, 60-second interval.
    fn default() -> Self {
        Self {
            sync_enabled: false,
            conflict_resolution_strategy: Some("vector_clock".to_string()),
            sync_interval_seconds: Some(60),
        }
    }
}
1258
1259#[derive(Debug, Clone)]
1260pub struct ChangelogEntry {
1261 pub id: i64,
1262 pub session_id: String,
1263 pub instance_id: String,
1264 pub entity_type: String,
1265 pub entity_id: i64,
1266 pub operation: String,
1267 pub vector_clock: String,
1268 pub data: Option<String>,
1269 pub created_at: DateTime<Utc>,
1270}
1271
1272impl ChangelogEntry {
1273 fn from_row(row: &duckdb::Row) -> Result<Self, duckdb::Error> {
1274 let id: i64 = row.get(0)?;
1275 let session_id: String = row.get(1)?;
1276 let instance_id: String = row.get(2)?;
1277 let entity_type: String = row.get(3)?;
1278 let entity_id: i64 = row.get(4)?;
1279 let operation: String = row.get(5)?;
1280 let vector_clock: String = row.get(6)?;
1281 let data: Option<String> = row.get(7)?;
1282 let created_at_str: String = row.get(8)?;
1283
1284 Ok(ChangelogEntry {
1285 id,
1286 session_id,
1287 instance_id,
1288 entity_type,
1289 entity_id,
1290 operation,
1291 vector_clock,
1292 data,
1293 created_at: created_at_str.parse().unwrap_or_else(|_| Utc::now()),
1294 })
1295 }
1296}
1297
1298#[derive(Debug, Clone)]
1299pub struct SyncedNodeRecord {
1300 pub id: i64,
1301 pub session_id: String,
1302 pub node_type: String,
1303 pub label: String,
1304 pub properties: JsonValue,
1305 pub embedding_id: Option<i64>,
1306 pub created_at: DateTime<Utc>,
1307 pub updated_at: DateTime<Utc>,
1308 pub vector_clock: String,
1309 pub last_modified_by: Option<String>,
1310 pub is_deleted: bool,
1311 pub sync_enabled: bool,
1312}
1313
1314impl SyncedNodeRecord {
1315 fn from_row(row: &duckdb::Row) -> Result<Self, duckdb::Error> {
1316 let id: i64 = row.get(0)?;
1317 let session_id: String = row.get(1)?;
1318 let node_type: String = row.get(2)?;
1319 let label: String = row.get(3)?;
1320 let properties_str: String = row.get(4)?;
1321 let properties: JsonValue = serde_json::from_str(&properties_str).map_err(|e| {
1322 duckdb::Error::FromSqlConversionFailure(4, duckdb::types::Type::Text, Box::new(e))
1323 })?;
1324 let embedding_id: Option<i64> = row.get(5)?;
1325 let created_at_str: String = row.get(6)?;
1326 let updated_at_str: String = row.get(7)?;
1327 let vector_clock: String = row.get(8)?;
1328 let last_modified_by: Option<String> = row.get(9)?;
1329 let is_deleted: bool = row.get(10)?;
1330 let sync_enabled: bool = row.get(11)?;
1331
1332 Ok(SyncedNodeRecord {
1333 id,
1334 session_id,
1335 node_type,
1336 label,
1337 properties,
1338 embedding_id,
1339 created_at: created_at_str.parse().unwrap_or_else(|_| Utc::now()),
1340 updated_at: updated_at_str.parse().unwrap_or_else(|_| Utc::now()),
1341 vector_clock,
1342 last_modified_by,
1343 is_deleted,
1344 sync_enabled,
1345 })
1346 }
1347}
1348
1349#[derive(Debug, Clone)]
1350pub struct SyncedEdgeRecord {
1351 pub id: i64,
1352 pub session_id: String,
1353 pub source_id: i64,
1354 pub target_id: i64,
1355 pub edge_type: String,
1356 pub predicate: Option<String>,
1357 pub properties: Option<JsonValue>,
1358 pub weight: f32,
1359 pub temporal_start: Option<DateTime<Utc>>,
1360 pub temporal_end: Option<DateTime<Utc>>,
1361 pub created_at: DateTime<Utc>,
1362 pub vector_clock: String,
1363 pub last_modified_by: Option<String>,
1364 pub is_deleted: bool,
1365 pub sync_enabled: bool,
1366}
1367
1368impl SyncedEdgeRecord {
1369 fn from_row(row: &duckdb::Row) -> Result<Self, duckdb::Error> {
1370 let id: i64 = row.get(0)?;
1371 let session_id: String = row.get(1)?;
1372 let source_id: i64 = row.get(2)?;
1373 let target_id: i64 = row.get(3)?;
1374 let edge_type: String = row.get(4)?;
1375 let predicate: Option<String> = row.get(5)?;
1376 let properties_str: Option<String> = row.get(6)?;
1377 let properties: Option<JsonValue> = properties_str
1378 .as_ref()
1379 .and_then(|s| serde_json::from_str(s).ok());
1380 let weight: f32 = row.get(7)?;
1381 let temporal_start_str: Option<String> = row.get(8)?;
1382 let temporal_end_str: Option<String> = row.get(9)?;
1383 let created_at_str: String = row.get(10)?;
1384 let vector_clock: String = row.get(11)?;
1385 let last_modified_by: Option<String> = row.get(12)?;
1386 let is_deleted: bool = row.get(13)?;
1387 let sync_enabled: bool = row.get(14)?;
1388
1389 Ok(SyncedEdgeRecord {
1390 id,
1391 session_id,
1392 source_id,
1393 target_id,
1394 edge_type,
1395 predicate,
1396 properties,
1397 weight,
1398 temporal_start: temporal_start_str.and_then(|s| s.parse().ok()),
1399 temporal_end: temporal_end_str.and_then(|s| s.parse().ok()),
1400 created_at: created_at_str.parse().unwrap_or_else(|_| Utc::now()),
1401 vector_clock,
1402 last_modified_by,
1403 is_deleted,
1404 sync_enabled,
1405 })
1406 }
1407}
1408
#[cfg(test)]
mod tests {
    use super::*;
    use anyhow::Result;
    use serde_json::json;

    /// Builds a store over a fresh in-memory database with the graph schema applied.
    fn setup_store() -> KnowledgeGraphStore {
        setup_store_with(|_| {})
    }

    /// Like [`setup_store`], but lets the caller run extra setup on the connection after
    /// the schema is created and before the store takes ownership of it.
    fn setup_store_with<F>(extra: F) -> KnowledgeGraphStore
    where
        F: FnOnce(&Connection),
    {
        let conn = Connection::open_in_memory().expect("open in-memory database");
        conn.execute_batch(
            r#"
            CREATE SEQUENCE IF NOT EXISTS graph_nodes_id_seq START 1;
            CREATE SEQUENCE IF NOT EXISTS graph_edges_id_seq START 1;
            CREATE SEQUENCE IF NOT EXISTS graph_metadata_id_seq START 1;
            CREATE SEQUENCE IF NOT EXISTS graph_changelog_id_seq START 1;
            CREATE SEQUENCE IF NOT EXISTS graph_tombstones_id_seq START 1;

            CREATE TABLE graph_nodes (
                id BIGINT PRIMARY KEY DEFAULT nextval('graph_nodes_id_seq'),
                session_id TEXT NOT NULL,
                node_type TEXT NOT NULL,
                label TEXT NOT NULL,
                properties TEXT NOT NULL,
                embedding_id BIGINT,
                created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                vector_clock TEXT DEFAULT '{}',
                last_modified_by TEXT,
                is_deleted BOOLEAN DEFAULT FALSE,
                sync_enabled BOOLEAN DEFAULT FALSE
            );

            CREATE TABLE graph_edges (
                id BIGINT PRIMARY KEY DEFAULT nextval('graph_edges_id_seq'),
                session_id TEXT NOT NULL,
                source_id BIGINT NOT NULL,
                target_id BIGINT NOT NULL,
                edge_type TEXT NOT NULL,
                predicate TEXT,
                properties TEXT,
                weight REAL DEFAULT 1.0,
                temporal_start TIMESTAMP,
                temporal_end TIMESTAMP,
                created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                vector_clock TEXT DEFAULT '{}',
                last_modified_by TEXT,
                is_deleted BOOLEAN DEFAULT FALSE,
                sync_enabled BOOLEAN DEFAULT FALSE
            );

            CREATE TABLE graph_metadata (
                id BIGINT PRIMARY KEY DEFAULT nextval('graph_metadata_id_seq'),
                session_id TEXT NOT NULL,
                graph_name TEXT NOT NULL,
                is_created BOOLEAN DEFAULT FALSE,
                schema_version INTEGER DEFAULT 1,
                config TEXT,
                created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                sync_enabled BOOLEAN DEFAULT FALSE,
                UNIQUE(session_id, graph_name)
            );

            CREATE TABLE graph_changelog (
                id BIGINT PRIMARY KEY DEFAULT nextval('graph_changelog_id_seq'),
                session_id TEXT NOT NULL,
                instance_id TEXT NOT NULL,
                entity_type TEXT NOT NULL,
                entity_id BIGINT NOT NULL,
                operation TEXT NOT NULL,
                vector_clock TEXT NOT NULL,
                data TEXT,
                created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
            );

            CREATE TABLE graph_sync_state (
                instance_id TEXT NOT NULL,
                session_id TEXT NOT NULL,
                graph_name TEXT NOT NULL,
                vector_clock TEXT NOT NULL,
                last_sync_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                PRIMARY KEY (instance_id, session_id, graph_name)
            );

            CREATE TABLE graph_tombstones (
                id BIGINT PRIMARY KEY DEFAULT nextval('graph_tombstones_id_seq'),
                session_id TEXT NOT NULL,
                entity_type TEXT NOT NULL,
                entity_id BIGINT NOT NULL,
                deleted_by TEXT NOT NULL,
                vector_clock TEXT NOT NULL,
                created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
            );
            "#,
        )
        .expect("create graph schema");

        extra(&conn);

        KnowledgeGraphStore::from_connection(conn, "test-instance")
    }

    #[test]
    fn insert_update_delete_node_flow() -> Result<()> {
        let store = setup_store();
        let props = json!({ "kind": "repository" });
        let node_id =
            store.insert_graph_node("session", NodeType::Entity, "SpecAI", &props, None)?;

        let nodes = store.list_graph_nodes("session", None, Some(10))?;
        assert_eq!(nodes.len(), 1);
        assert_eq!(nodes[0].label, "SpecAI");

        let updated_props = json!({ "kind": "repository", "stars": 42 });
        store.update_graph_node(node_id, &updated_props)?;
        let updated = store.get_graph_node(node_id)?.expect("node exists");
        assert_eq!(updated.properties["stars"], 42);

        store.delete_graph_node(node_id)?;
        assert!(store.get_graph_node(node_id)?.is_none());
        Ok(())
    }

    #[test]
    fn create_edges_and_find_paths() -> Result<()> {
        let store = setup_store();
        let a = store.insert_graph_node("session", NodeType::Entity, "A", &json!({}), None)?;
        let b = store.insert_graph_node("session", NodeType::Entity, "B", &json!({}), None)?;
        let c = store.insert_graph_node("session", NodeType::Entity, "C", &json!({}), None)?;

        store.insert_graph_edge("session", a, b, EdgeType::RelatesTo, None, None, 1.0)?;
        store.insert_graph_edge("session", b, c, EdgeType::RelatesTo, None, None, 1.0)?;

        let path = store
            .find_shortest_path("session", a, c, Some(5))?
            .expect("path exists");
        assert_eq!(path.nodes.len(), 3);
        assert_eq!(path.edges.len(), 2);
        assert_eq!(path.length, 2);
        assert_eq!(path.nodes.first().unwrap().label, "A");
        assert_eq!(path.nodes.last().unwrap().label, "C");

        let edges = store.list_graph_edges("session", None, None)?;
        assert_eq!(edges.len(), 2);

        Ok(())
    }

    #[test]
    fn traverse_neighbors_respects_direction() -> Result<()> {
        let store = setup_store();
        let alpha =
            store.insert_graph_node("session", NodeType::Entity, "Alpha", &json!({}), None)?;
        let beta =
            store.insert_graph_node("session", NodeType::Entity, "Beta", &json!({}), None)?;
        let gamma =
            store.insert_graph_node("session", NodeType::Entity, "Gamma", &json!({}), None)?;

        store.insert_graph_edge("session", alpha, beta, EdgeType::RelatesTo, None, None, 1.0)?;
        store.insert_graph_edge("session", beta, gamma, EdgeType::RelatesTo, None, None, 1.0)?;

        let outgoing =
            store.traverse_neighbors("session", alpha, TraversalDirection::Outgoing, 2)?;
        assert_eq!(outgoing.len(), 2);
        assert!(outgoing.iter().any(|node| node.label == "Beta"));
        assert!(outgoing.iter().any(|node| node.label == "Gamma"));

        let incoming =
            store.traverse_neighbors("session", gamma, TraversalDirection::Incoming, 2)?;
        assert_eq!(incoming.len(), 2);
        assert!(incoming.iter().any(|node| node.label == "Beta"));
        assert!(incoming.iter().any(|node| node.label == "Alpha"));

        Ok(())
    }

    #[test]
    fn sync_config_round_trip() -> Result<()> {
        let store = setup_store();

        let saved = store.graph_set_sync_config(
            "session",
            "default",
            true,
            Some("last_write_wins"),
            Some(120),
        )?;
        assert!(saved.sync_enabled);
        assert_eq!(
            saved.conflict_resolution_strategy.as_deref(),
            Some("last_write_wins")
        );
        assert_eq!(saved.sync_interval_seconds, Some(120));

        let fetched = store.graph_get_sync_config("session", "default")?;
        assert!(fetched.sync_enabled);
        assert_eq!(
            fetched.conflict_resolution_strategy.as_deref(),
            Some("last_write_wins")
        );
        assert_eq!(fetched.sync_interval_seconds, Some(120));

        let defaults = store.graph_get_sync_config("other_session", "missing")?;
        assert!(!defaults.sync_enabled);
        assert_eq!(
            defaults.conflict_resolution_strategy.as_deref(),
            Some("vector_clock")
        );
        assert_eq!(defaults.sync_interval_seconds, Some(60));

        Ok(())
    }

    #[test]
    fn sync_state_metadata_round_trip() -> Result<()> {
        let store = setup_store();
        store.graph_sync_state_update("instance", "session", "graph", r#"{"a":1}"#)?;

        let state = store
            .graph_sync_state_get_metadata("instance", "session", "graph")?
            .expect("state exists");

        assert_eq!(state.vector_clock, r#"{"a":1}"#);
        assert!(state.last_sync_at.is_some());

        Ok(())
    }

    #[test]
    fn graph_conflict_listing_filters() -> Result<()> {
        let store = setup_store();
        let vc_json = VectorClock::new().to_json()?;

        let conflict_payload = json!({
            "graph_name": "g1",
            "local_version": { "id": 1 },
            "remote_version": { "id": 2 },
            "resolution": "Test"
        });

        store.graph_changelog_append(
            "session_one",
            "inst1",
            "node",
            1,
            "conflict",
            &vc_json,
            Some(&conflict_payload.to_string()),
        )?;

        let second_payload = json!({
            "graph_name": "g2",
            "local_version": { "id": 3 },
            "remote_version": { "id": 4 }
        });

        store.graph_changelog_append(
            "session_two",
            "inst1",
            "edge",
            2,
            "conflict",
            &vc_json,
            Some(&second_payload.to_string()),
        )?;

        let all_conflicts = store.graph_list_conflicts(None, 10)?;
        assert_eq!(all_conflicts.len(), 2);

        let session_filtered = store.graph_list_conflicts(Some("session_one"), 10)?;
        assert_eq!(session_filtered.len(), 1);
        assert_eq!(session_filtered[0].session_id, "session_one");
        assert_eq!(session_filtered[0].entity_id, 1);

        Ok(())
    }

    /// Exercises the sync-aware node listing filters, sync-metadata updates, and
    /// soft deletion.
    #[test]
    fn node_sync_metadata_and_soft_delete_filters() -> Result<()> {
        let store = setup_store();
        let a = store.insert_graph_node("session", NodeType::Entity, "A", &json!({}), None)?;
        let b = store.insert_graph_node("session", NodeType::Entity, "B", &json!({}), None)?;

        let live = store.graph_list_nodes_with_sync("session", false, false)?;
        assert_eq!(live.len(), 2);

        store.graph_update_node_sync_metadata(a, r#"{"peer":1}"#, "peer", true)?;
        let synced = store.graph_list_nodes_with_sync("session", true, false)?;
        assert_eq!(synced.len(), 1);
        assert_eq!(synced[0].id, a);
        assert_eq!(synced[0].vector_clock, r#"{"peer":1}"#);
        assert_eq!(synced[0].last_modified_by.as_deref(), Some("peer"));
        assert!(synced[0].sync_enabled);

        store.graph_mark_node_deleted(b, r#"{"peer":2}"#, "peer")?;
        assert_eq!(
            store.graph_list_nodes_with_sync("session", false, false)?.len(),
            1
        );
        let everything = store.graph_list_nodes_with_sync("session", false, true)?;
        assert_eq!(everything.len(), 2);
        assert!(everything.iter().any(|node| node.id == b && node.is_deleted));

        Ok(())
    }

    /// Exercises the sync-aware edge listing filters, single-edge lookup, sync-metadata
    /// updates, and soft deletion.
    #[test]
    fn edge_sync_metadata_and_soft_delete_filters() -> Result<()> {
        let store = setup_store();
        let a = store.insert_graph_node("session", NodeType::Entity, "A", &json!({}), None)?;
        let b = store.insert_graph_node("session", NodeType::Entity, "B", &json!({}), None)?;
        let c = store.insert_graph_node("session", NodeType::Entity, "C", &json!({}), None)?;
        store.insert_graph_edge("session", a, b, EdgeType::RelatesTo, None, None, 1.0)?;
        store.insert_graph_edge("session", b, c, EdgeType::RelatesTo, None, None, 1.0)?;

        let edges = store.graph_list_edges_with_sync("session", false, false)?;
        assert_eq!(edges.len(), 2);
        let first = edges
            .iter()
            .find(|edge| edge.source_id == a)
            .expect("a -> b edge")
            .id;
        let second = edges
            .iter()
            .find(|edge| edge.source_id == b)
            .expect("b -> c edge")
            .id;

        store.graph_update_edge_sync_metadata(first, r#"{"peer":1}"#, "peer", true)?;
        let synced = store.graph_list_edges_with_sync("session", true, false)?;
        assert_eq!(synced.len(), 1);
        assert_eq!(synced[0].id, first);
        assert_eq!(synced[0].vector_clock, r#"{"peer":1}"#);
        assert_eq!(synced[0].last_modified_by.as_deref(), Some("peer"));

        store.graph_mark_edge_deleted(second, r#"{"peer":2}"#, "peer")?;
        assert_eq!(
            store.graph_list_edges_with_sync("session", false, false)?.len(),
            1
        );
        let everything = store.graph_list_edges_with_sync("session", false, true)?;
        assert_eq!(everything.len(), 2);
        assert!(everything.iter().any(|edge| edge.id == second && edge.is_deleted));

        let fetched = store
            .graph_get_edge_with_sync(second)?
            .expect("soft-deleted edge is still readable");
        assert!(fetched.is_deleted);
        assert!(store.graph_get_edge_with_sync(i64::MAX)?.is_none());

        Ok(())
    }
}