1use crate::types::{EdgeType, GraphEdge, GraphNode, GraphPath, NodeType, TraversalDirection};
2use crate::vector_clock::VectorClock;
3use anyhow::Result;
4use chrono::{DateTime, Duration, Utc};
5use duckdb::{params, Connection};
6use serde_json::Value as JsonValue;
7use std::collections::{HashMap, HashSet, VecDeque};
8use std::sync::{Arc, Mutex, MutexGuard};
9
/// Store for knowledge-graph nodes, edges and sync bookkeeping, backed by a
/// DuckDB connection shared behind a mutex.
///
/// Clones share the same underlying connection handle.
#[derive(Clone)]
pub struct KnowledgeGraphStore {
    // Shared connection; each operation locks this mutex while it talks to the database.
    conn: Arc<Mutex<Connection>>,
    // Identifier of this instance: used as the vector-clock key and written to
    // `last_modified_by` / changelog rows.
    instance_id: String,
}
15
16impl KnowledgeGraphStore {
17 pub fn new(conn: Arc<Mutex<Connection>>, instance_id: impl Into<String>) -> Self {
18 Self {
19 conn,
20 instance_id: instance_id.into(),
21 }
22 }
23
24 pub fn from_connection(conn: Connection, instance_id: impl Into<String>) -> Self {
25 Self {
26 conn: Arc::new(Mutex::new(conn)),
27 instance_id: instance_id.into(),
28 }
29 }
30
31 pub fn instance_id(&self) -> &str {
32 &self.instance_id
33 }
34
35 fn conn(&self) -> MutexGuard<'_, Connection> {
36 self.conn.lock().expect("database connection poisoned")
37 }
38
39 pub fn insert_graph_node(
42 &self,
43 session_id: &str,
44 node_type: NodeType,
45 label: &str,
46 properties: &JsonValue,
47 embedding_id: Option<i64>,
48 ) -> Result<i64> {
49 let sync_enabled = self
50 .graph_get_sync_enabled(session_id, "default")
51 .unwrap_or(false);
52
53 let mut vector_clock = VectorClock::new();
54 vector_clock.increment(&self.instance_id);
55 let vc_json = vector_clock.to_json()?;
56
57 let conn = self.conn();
58
59 let mut stmt = conn.prepare(
60 "INSERT INTO graph_nodes (session_id, node_type, label, properties, embedding_id,
61 vector_clock, last_modified_by, sync_enabled)
62 VALUES (?, ?, ?, ?, ?, ?, ?, ?) RETURNING id",
63 )?;
64 let id: i64 = stmt.query_row(
65 params![
66 session_id,
67 node_type.as_str(),
68 label,
69 properties.to_string(),
70 embedding_id,
71 vc_json,
72 self.instance_id,
73 sync_enabled,
74 ],
75 |row| row.get(0),
76 )?;
77
78 if sync_enabled {
79 let node_data = serde_json::json!({
80 "id": id,
81 "session_id": session_id,
82 "node_type": node_type.as_str(),
83 "label": label,
84 "properties": properties,
85 "embedding_id": embedding_id,
86 });
87
88 self.graph_changelog_append(
89 session_id,
90 &self.instance_id,
91 "node",
92 id,
93 "create",
94 &vc_json,
95 Some(&node_data.to_string()),
96 )?;
97 }
98
99 Ok(id)
100 }
101
102 pub fn get_graph_node(&self, node_id: i64) -> Result<Option<GraphNode>> {
103 let conn = self.conn();
104 let mut stmt = conn.prepare(
105 "SELECT id, session_id, node_type, label, properties, embedding_id,
106 CAST(created_at AS TEXT), CAST(updated_at AS TEXT)
107 FROM graph_nodes WHERE id = ?",
108 )?;
109 let mut rows = stmt.query(params![node_id])?;
110 if let Some(row) = rows.next()? {
111 Ok(Some(Self::row_to_graph_node(row)?))
112 } else {
113 Ok(None)
114 }
115 }
116
117 pub fn list_graph_nodes(
118 &self,
119 session_id: &str,
120 node_type: Option<NodeType>,
121 limit: Option<i64>,
122 ) -> Result<Vec<GraphNode>> {
123 let conn = self.conn();
124
125 let nodes = if let Some(nt) = node_type {
126 let mut stmt = conn.prepare(
127 "SELECT id, session_id, node_type, label, properties, embedding_id,
128 CAST(created_at AS TEXT), CAST(updated_at AS TEXT)
129 FROM graph_nodes WHERE session_id = ? AND node_type = ?
130 ORDER BY id DESC LIMIT ?",
131 )?;
132 let query = stmt.query(params![session_id, nt.as_str(), limit.unwrap_or(100)])?;
133 Self::collect_graph_nodes(query)?
134 } else {
135 let mut stmt = conn.prepare(
136 "SELECT id, session_id, node_type, label, properties, embedding_id,
137 CAST(created_at AS TEXT), CAST(updated_at AS TEXT)
138 FROM graph_nodes WHERE session_id = ?
139 ORDER BY id DESC LIMIT ?",
140 )?;
141 let query = stmt.query(params![session_id, limit.unwrap_or(100)])?;
142 Self::collect_graph_nodes(query)?
143 };
144
145 Ok(nodes)
146 }
147
148 pub fn count_graph_nodes(&self, session_id: &str) -> Result<i64> {
149 let conn = self.conn();
150 let mut stmt = conn.prepare("SELECT COUNT(*) FROM graph_nodes WHERE session_id = ?")?;
151 let count: i64 = stmt.query_row(params![session_id], |row| row.get(0))?;
152 Ok(count)
153 }
154
155 pub fn update_graph_node(&self, node_id: i64, properties: &JsonValue) -> Result<()> {
156 let conn = self.conn();
157
158 let mut stmt = conn.prepare(
159 "SELECT session_id, node_type, label, vector_clock, sync_enabled
160 FROM graph_nodes WHERE id = ?",
161 )?;
162
163 let (session_id, node_type, label, current_vc_json, sync_enabled): (
164 String,
165 String,
166 String,
167 Option<String>,
168 bool,
169 ) = stmt.query_row(params![node_id], |row| {
170 Ok((
171 row.get(0)?,
172 row.get(1)?,
173 row.get(2)?,
174 row.get(3)?,
175 row.get(4).unwrap_or(false),
176 ))
177 })?;
178
179 let mut vector_clock = if let Some(vc_json) = current_vc_json {
180 VectorClock::from_json(&vc_json).unwrap_or_else(|_| VectorClock::new())
181 } else {
182 VectorClock::new()
183 };
184 vector_clock.increment(&self.instance_id);
185 let vc_json = vector_clock.to_json()?;
186
187 conn.execute(
188 "UPDATE graph_nodes
189 SET properties = ?,
190 vector_clock = ?,
191 last_modified_by = ?,
192 updated_at = CURRENT_TIMESTAMP
193 WHERE id = ?",
194 params![properties.to_string(), vc_json, self.instance_id, node_id],
195 )?;
196
197 if sync_enabled {
198 let node_data = serde_json::json!({
199 "id": node_id,
200 "session_id": session_id,
201 "node_type": node_type,
202 "label": label,
203 "properties": properties,
204 });
205
206 self.graph_changelog_append(
207 &session_id,
208 &self.instance_id,
209 "node",
210 node_id,
211 "update",
212 &vc_json,
213 Some(&node_data.to_string()),
214 )?;
215 }
216
217 Ok(())
218 }
219
220 pub fn delete_graph_node(&self, node_id: i64) -> Result<()> {
221 let conn = self.conn();
222
223 let mut stmt = conn.prepare(
224 "SELECT session_id, node_type, label, properties, vector_clock, sync_enabled
225 FROM graph_nodes WHERE id = ?",
226 )?;
227
228 let result = stmt.query_row(params![node_id], |row| {
229 Ok((
230 row.get::<_, String>(0)?,
231 row.get::<_, String>(1)?,
232 row.get::<_, String>(2)?,
233 row.get::<_, String>(3)?,
234 row.get::<_, Option<String>>(4)?,
235 row.get::<_, bool>(5).unwrap_or(false),
236 ))
237 });
238
239 if let Ok((session_id, node_type, label, properties, current_vc_json, sync_enabled)) =
240 result
241 {
242 if sync_enabled {
243 let mut vector_clock = if let Some(vc_json) = current_vc_json {
244 VectorClock::from_json(&vc_json).unwrap_or_else(|_| VectorClock::new())
245 } else {
246 VectorClock::new()
247 };
248 vector_clock.increment(&self.instance_id);
249 let vc_json = vector_clock.to_json()?;
250
251 conn.execute(
252 "INSERT INTO graph_tombstones
253 (session_id, entity_type, entity_id, deleted_by, vector_clock)
254 VALUES (?, ?, ?, ?, ?)",
255 params![session_id, "node", node_id, self.instance_id, vc_json],
256 )?;
257
258 let node_data = serde_json::json!({
259 "id": node_id,
260 "session_id": session_id,
261 "node_type": node_type,
262 "label": label,
263 "properties": properties,
264 });
265
266 self.graph_changelog_append(
267 &session_id,
268 &self.instance_id,
269 "node",
270 node_id,
271 "delete",
272 &vc_json,
273 Some(&node_data.to_string()),
274 )?;
275 }
276 }
277
278 conn.execute("DELETE FROM graph_nodes WHERE id = ?", params![node_id])?;
279 Ok(())
280 }
281
282 pub fn insert_graph_edge(
285 &self,
286 session_id: &str,
287 source_id: i64,
288 target_id: i64,
289 edge_type: EdgeType,
290 predicate: Option<&str>,
291 properties: Option<&JsonValue>,
292 weight: f32,
293 ) -> Result<i64> {
294 let sync_enabled = self
295 .graph_get_sync_enabled(session_id, "default")
296 .unwrap_or(false);
297
298 let mut vector_clock = VectorClock::new();
299 vector_clock.increment(&self.instance_id);
300 let vc_json = vector_clock.to_json()?;
301
302 let conn = self.conn();
303
304 let mut stmt = conn.prepare(
305 "INSERT INTO graph_edges (session_id, source_id, target_id, edge_type, predicate, properties, weight,
306 vector_clock, last_modified_by, sync_enabled)
307 VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?) RETURNING id",
308 )?;
309 let props_str = properties.map(|p| p.to_string());
310 let id: i64 = stmt.query_row(
311 params![
312 session_id,
313 source_id,
314 target_id,
315 edge_type.as_str(),
316 predicate,
317 props_str,
318 weight,
319 vc_json,
320 self.instance_id,
321 sync_enabled,
322 ],
323 |row| row.get(0),
324 )?;
325
326 if sync_enabled {
327 let edge_data = serde_json::json!({
328 "id": id,
329 "session_id": session_id,
330 "source_id": source_id,
331 "target_id": target_id,
332 "edge_type": edge_type.as_str(),
333 "predicate": predicate,
334 "properties": properties,
335 "weight": weight,
336 });
337
338 self.graph_changelog_append(
339 session_id,
340 &self.instance_id,
341 "edge",
342 id,
343 "insert",
344 &vc_json,
345 Some(&edge_data.to_string()),
346 )?;
347 }
348
349 Ok(id)
350 }
351
352 pub fn get_graph_edge(&self, edge_id: i64) -> Result<Option<GraphEdge>> {
353 let conn = self.conn();
354 let mut stmt = conn.prepare(
355 "SELECT id, session_id, source_id, target_id, edge_type, predicate, properties, weight,
356 CAST(temporal_start AS TEXT), CAST(temporal_end AS TEXT), CAST(created_at AS TEXT)
357 FROM graph_edges WHERE id = ?",
358 )?;
359 let mut rows = stmt.query(params![edge_id])?;
360 if let Some(row) = rows.next()? {
361 Ok(Some(Self::row_to_graph_edge(row)?))
362 } else {
363 Ok(None)
364 }
365 }
366
367 pub fn list_graph_edges(
368 &self,
369 session_id: &str,
370 source_id: Option<i64>,
371 target_id: Option<i64>,
372 ) -> Result<Vec<GraphEdge>> {
373 let conn = self.conn();
374
375 let edges = match (source_id, target_id) {
376 (Some(src), Some(tgt)) => {
377 let mut stmt = conn.prepare(
378 "SELECT id, session_id, source_id, target_id, edge_type, predicate, properties, weight,
379 CAST(temporal_start AS TEXT), CAST(temporal_end AS TEXT), CAST(created_at AS TEXT)
380 FROM graph_edges WHERE session_id = ? AND source_id = ? AND target_id = ?",
381 )?;
382 let query = stmt.query(params![session_id, src, tgt])?;
383 Self::collect_graph_edges(query)?
384 }
385 (Some(src), None) => {
386 let mut stmt = conn.prepare(
387 "SELECT id, session_id, source_id, target_id, edge_type, predicate, properties, weight,
388 CAST(temporal_start AS TEXT), CAST(temporal_end AS TEXT), CAST(created_at AS TEXT)
389 FROM graph_edges WHERE session_id = ? AND source_id = ?",
390 )?;
391 let query = stmt.query(params![session_id, src])?;
392 Self::collect_graph_edges(query)?
393 }
394 (None, Some(tgt)) => {
395 let mut stmt = conn.prepare(
396 "SELECT id, session_id, source_id, target_id, edge_type, predicate, properties, weight,
397 CAST(temporal_start AS TEXT), CAST(temporal_end AS TEXT), CAST(created_at AS TEXT)
398 FROM graph_edges WHERE session_id = ? AND target_id = ?",
399 )?;
400 let query = stmt.query(params![session_id, tgt])?;
401 Self::collect_graph_edges(query)?
402 }
403 (None, None) => {
404 let mut stmt = conn.prepare(
405 "SELECT id, session_id, source_id, target_id, edge_type, predicate, properties, weight,
406 CAST(temporal_start AS TEXT), CAST(temporal_end AS TEXT), CAST(created_at AS TEXT)
407 FROM graph_edges WHERE session_id = ?",
408 )?;
409 let query = stmt.query(params![session_id])?;
410 Self::collect_graph_edges(query)?
411 }
412 };
413
414 Ok(edges)
415 }
416
417 pub fn count_graph_edges(&self, session_id: &str) -> Result<i64> {
418 let conn = self.conn();
419 let mut stmt = conn.prepare("SELECT COUNT(*) FROM graph_edges WHERE session_id = ?")?;
420 let count: i64 = stmt.query_row(params![session_id], |row| row.get(0))?;
421 Ok(count)
422 }
423
424 pub fn delete_graph_edge(&self, edge_id: i64) -> Result<()> {
425 let conn = self.conn();
426
427 let mut stmt = conn.prepare(
428 "SELECT session_id, source_id, target_id, edge_type, predicate, properties, weight,
429 vector_clock, sync_enabled
430 FROM graph_edges WHERE id = ?",
431 )?;
432
433 let result = stmt.query_row(params![edge_id], |row| {
434 Ok((
435 row.get::<_, String>(0)?,
436 row.get::<_, i64>(1)?,
437 row.get::<_, i64>(2)?,
438 row.get::<_, String>(3)?,
439 row.get::<_, Option<String>>(4)?,
440 row.get::<_, Option<String>>(5)?,
441 row.get::<_, f32>(6)?,
442 row.get::<_, Option<String>>(7)?,
443 row.get::<_, bool>(8).unwrap_or(false),
444 ))
445 });
446
447 if let Ok((
448 session_id,
449 source_id,
450 target_id,
451 edge_type,
452 predicate,
453 properties,
454 weight,
455 current_vc_json,
456 sync_enabled,
457 )) = result
458 {
459 if sync_enabled {
460 let mut vector_clock = if let Some(vc_json) = current_vc_json {
461 VectorClock::from_json(&vc_json).unwrap_or_else(|_| VectorClock::new())
462 } else {
463 VectorClock::new()
464 };
465 vector_clock.increment(&self.instance_id);
466 let vc_json = vector_clock.to_json()?;
467
468 conn.execute(
469 "INSERT INTO graph_tombstones
470 (session_id, entity_type, entity_id, deleted_by, vector_clock)
471 VALUES (?, ?, ?, ?, ?)",
472 params![session_id, "edge", edge_id, self.instance_id, vc_json],
473 )?;
474
475 let edge_data = serde_json::json!({
476 "id": edge_id,
477 "session_id": session_id,
478 "source_id": source_id,
479 "target_id": target_id,
480 "edge_type": edge_type,
481 "predicate": predicate,
482 "properties": properties,
483 "weight": weight,
484 });
485
486 self.graph_changelog_append(
487 &session_id,
488 &self.instance_id,
489 "edge",
490 edge_id,
491 "delete",
492 &vc_json,
493 Some(&edge_data.to_string()),
494 )?;
495 }
496 }
497
498 conn.execute("DELETE FROM graph_edges WHERE id = ?", params![edge_id])?;
499 Ok(())
500 }
501
502 pub fn find_shortest_path(
505 &self,
506 session_id: &str,
507 source_id: i64,
508 target_id: i64,
509 max_hops: Option<usize>,
510 ) -> Result<Option<GraphPath>> {
511 let max_depth = max_hops.unwrap_or(10);
512
513 let mut visited = HashSet::new();
514 let mut queue = VecDeque::new();
515 let mut parent_map = HashMap::new();
516
517 queue.push_back((source_id, 0));
518 visited.insert(source_id);
519
520 while let Some((current_id, depth)) = queue.pop_front() {
521 if current_id == target_id {
522 let path = self.reconstruct_path(&parent_map, source_id, target_id)?;
523 return Ok(Some(path));
524 }
525
526 if depth >= max_depth {
527 continue;
528 }
529
530 let edges = self.list_graph_edges(session_id, Some(current_id), None)?;
531 for edge in edges {
532 let target = edge.target_id;
533 if !visited.contains(&target) {
534 visited.insert(target);
535 parent_map.insert(target, (current_id, edge));
536 queue.push_back((target, depth + 1));
537 }
538 }
539 }
540
541 Ok(None)
542 }
543
544 pub fn traverse_neighbors(
545 &self,
546 session_id: &str,
547 node_id: i64,
548 direction: TraversalDirection,
549 depth: usize,
550 ) -> Result<Vec<GraphNode>> {
551 if depth == 0 {
552 return Ok(vec![]);
553 }
554
555 let mut visited = HashSet::new();
556 let mut result = Vec::new();
557 let mut queue = VecDeque::new();
558
559 queue.push_back((node_id, 0));
560 visited.insert(node_id);
561
562 while let Some((current_id, current_depth)) = queue.pop_front() {
563 if current_depth > 0 {
564 if let Some(node) = self.get_graph_node(current_id)? {
565 result.push(node);
566 }
567 }
568
569 if current_depth >= depth {
570 continue;
571 }
572
573 let edges = match direction {
574 TraversalDirection::Outgoing => {
575 self.list_graph_edges(session_id, Some(current_id), None)?
576 }
577 TraversalDirection::Incoming => {
578 self.list_graph_edges(session_id, None, Some(current_id))?
579 }
580 TraversalDirection::Both => {
581 let mut out_edges =
582 self.list_graph_edges(session_id, Some(current_id), None)?;
583 let in_edges = self.list_graph_edges(session_id, None, Some(current_id))?;
584 out_edges.extend(in_edges);
585 out_edges
586 }
587 };
588
589 for edge in edges {
590 let next_id = match direction {
591 TraversalDirection::Outgoing => edge.target_id,
592 TraversalDirection::Incoming => edge.source_id,
593 TraversalDirection::Both => {
594 if edge.source_id == current_id {
595 edge.target_id
596 } else {
597 edge.source_id
598 }
599 }
600 };
601
602 if !visited.contains(&next_id) {
603 visited.insert(next_id);
604 queue.push_back((next_id, current_depth + 1));
605 }
606 }
607 }
608
609 Ok(result)
610 }
611
612 fn row_to_graph_node(row: &duckdb::Row) -> Result<GraphNode> {
613 let id: i64 = row.get(0)?;
614 let session_id: String = row.get(1)?;
615 let node_type: String = row.get(2)?;
616 let label: String = row.get(3)?;
617 let properties: String = row.get(4)?;
618 let embedding_id: Option<i64> = row.get(5)?;
619 let created_at: String = row.get(6)?;
620 let updated_at: String = row.get(7)?;
621
622 Ok(GraphNode {
623 id,
624 session_id,
625 node_type: NodeType::from_str(&node_type),
626 label,
627 properties: serde_json::from_str(&properties).unwrap_or(JsonValue::Null),
628 embedding_id,
629 created_at: created_at.parse().unwrap_or_else(|_| Utc::now()),
630 updated_at: updated_at.parse().unwrap_or_else(|_| Utc::now()),
631 })
632 }
633
634 fn row_to_graph_edge(row: &duckdb::Row) -> Result<GraphEdge> {
635 let id: i64 = row.get(0)?;
636 let session_id: String = row.get(1)?;
637 let source_id: i64 = row.get(2)?;
638 let target_id: i64 = row.get(3)?;
639 let edge_type: String = row.get(4)?;
640 let predicate: Option<String> = row.get(5)?;
641 let properties: Option<String> = row.get(6)?;
642 let weight: f32 = row.get(7)?;
643 let temporal_start: Option<String> = row.get(8)?;
644 let temporal_end: Option<String> = row.get(9)?;
645 let created_at: String = row.get(10)?;
646
647 Ok(GraphEdge {
648 id,
649 session_id,
650 source_id,
651 target_id,
652 edge_type: EdgeType::from_str(&edge_type),
653 predicate,
654 properties: properties.and_then(|p| serde_json::from_str(&p).ok()),
655 weight,
656 temporal_start: temporal_start.and_then(|s| s.parse().ok()),
657 temporal_end: temporal_end.and_then(|s| s.parse().ok()),
658 created_at: created_at.parse().unwrap_or_else(|_| Utc::now()),
659 })
660 }
661
662 fn collect_graph_nodes(mut rows: duckdb::Rows) -> Result<Vec<GraphNode>> {
663 let mut nodes = Vec::new();
664 while let Some(row) = rows.next()? {
665 nodes.push(Self::row_to_graph_node(row)?);
666 }
667 Ok(nodes)
668 }
669
670 fn collect_graph_edges(mut rows: duckdb::Rows) -> Result<Vec<GraphEdge>> {
671 let mut edges = Vec::new();
672 while let Some(row) = rows.next()? {
673 edges.push(Self::row_to_graph_edge(row)?);
674 }
675 Ok(edges)
676 }
677
678 fn reconstruct_path(
679 &self,
680 parent_map: &HashMap<i64, (i64, GraphEdge)>,
681 source_id: i64,
682 target_id: i64,
683 ) -> Result<GraphPath> {
684 let mut path_edges = Vec::new();
685 let mut path_nodes = Vec::new();
686 let mut current = target_id;
687 let mut total_weight = 0.0;
688
689 while current != source_id {
690 if let Some((parent, edge)) = parent_map.get(¤t) {
691 path_edges.push(edge.clone());
692 total_weight += edge.weight;
693 current = *parent;
694 } else {
695 break;
696 }
697 }
698
699 path_edges.reverse();
700
701 if let Some(node) = self.get_graph_node(source_id)? {
702 path_nodes.push(node);
703 }
704 for edge in &path_edges {
705 if let Some(node) = self.get_graph_node(edge.target_id)? {
706 path_nodes.push(node);
707 }
708 }
709
710 Ok(GraphPath {
711 length: path_edges.len(),
712 weight: total_weight,
713 nodes: path_nodes,
714 edges: path_edges,
715 })
716 }
717
718 pub fn graph_changelog_append(
721 &self,
722 session_id: &str,
723 instance_id: &str,
724 entity_type: &str,
725 entity_id: i64,
726 operation: &str,
727 vector_clock: &str,
728 data: Option<&str>,
729 ) -> Result<i64> {
730 let conn = self.conn();
731 conn.execute(
732 "INSERT INTO graph_changelog (session_id, instance_id, entity_type, entity_id, operation, vector_clock, data)
733 VALUES (?, ?, ?, ?, ?, ?, ?)",
734 params![session_id, instance_id, entity_type, entity_id, operation, vector_clock, data],
735 )?;
736 let id: i64 = conn.query_row("SELECT last_insert_rowid()", params![], |row| row.get(0))?;
737 Ok(id)
738 }
739
740 pub fn graph_changelog_get_since(
741 &self,
742 session_id: &str,
743 since_timestamp: &str,
744 ) -> Result<Vec<ChangelogEntry>> {
745 let conn = self.conn();
746 let mut stmt = conn.prepare(
747 "SELECT id, session_id, instance_id, entity_type, entity_id, operation, vector_clock, data, CAST(created_at AS TEXT)
748 FROM graph_changelog
749 WHERE session_id = ? AND created_at > ?
750 ORDER BY created_at ASC",
751 )?;
752 let mut rows = stmt.query(params![session_id, since_timestamp])?;
753 let mut entries = Vec::new();
754 while let Some(row) = rows.next()? {
755 entries.push(ChangelogEntry::from_row(row)?);
756 }
757 Ok(entries)
758 }
759
760 pub fn graph_changelog_prune(&self, days_to_keep: i64) -> Result<usize> {
761 let conn = self.conn();
762 let cutoff = Utc::now() - Duration::days(days_to_keep);
763 let cutoff_str = cutoff.to_rfc3339();
764 let deleted = conn.execute(
765 "DELETE FROM graph_changelog WHERE created_at < ?",
766 params![cutoff_str],
767 )?;
768 Ok(deleted)
769 }
770
771 pub fn graph_sync_state_get(
772 &self,
773 instance_id: &str,
774 session_id: &str,
775 graph_name: &str,
776 ) -> Result<Option<String>> {
777 let conn = self.conn();
778 let result: Result<String, _> = conn.query_row(
779 "SELECT vector_clock FROM graph_sync_state WHERE instance_id = ? AND session_id = ? AND graph_name = ?",
780 params![instance_id, session_id, graph_name],
781 |row| row.get(0),
782 );
783 match result {
784 Ok(vc) => Ok(Some(vc)),
785 Err(duckdb::Error::QueryReturnedNoRows) => Ok(None),
786 Err(e) => Err(e.into()),
787 }
788 }
789
790 pub fn graph_sync_state_update(
791 &self,
792 instance_id: &str,
793 session_id: &str,
794 graph_name: &str,
795 vector_clock: &str,
796 ) -> Result<()> {
797 let conn = self.conn();
798 conn.execute("BEGIN TRANSACTION", params![])?;
799 conn.execute(
800 "DELETE FROM graph_sync_state WHERE instance_id = ? AND session_id = ? AND graph_name = ?",
801 params![instance_id, session_id, graph_name],
802 )?;
803 conn.execute(
804 "INSERT INTO graph_sync_state (instance_id, session_id, graph_name, vector_clock) VALUES (?, ?, ?, ?)",
805 params![instance_id, session_id, graph_name, vector_clock],
806 )?;
807 conn.execute("COMMIT", params![])?;
808 Ok(())
809 }
810
811 pub fn graph_set_sync_enabled(
812 &self,
813 session_id: &str,
814 graph_name: &str,
815 enabled: bool,
816 ) -> Result<()> {
817 let conn = self.conn();
818 conn.execute(
820 "INSERT INTO graph_metadata (session_id, graph_name, sync_enabled)
821 VALUES (?, ?, ?)
822 ON CONFLICT (session_id, graph_name) DO UPDATE SET sync_enabled = EXCLUDED.sync_enabled",
823 params![session_id, graph_name, enabled],
824 )?;
825 Ok(())
826 }
827
828 pub fn graph_get_sync_enabled(&self, session_id: &str, graph_name: &str) -> Result<bool> {
829 let conn = self.conn();
830 let result: Result<bool, _> = conn.query_row(
831 "SELECT sync_enabled FROM graph_metadata WHERE session_id = ? AND graph_name = ?",
832 params![session_id, graph_name],
833 |row| row.get(0),
834 );
835 match result {
836 Ok(enabled) => Ok(enabled),
837 Err(duckdb::Error::QueryReturnedNoRows) => Ok(false),
838 Err(e) => Err(e.into()),
839 }
840 }
841
842 pub fn graph_list(&self, session_id: &str) -> Result<Vec<String>> {
843 let conn = self.conn();
844 let mut stmt = conn.prepare(
845 "SELECT DISTINCT graph_name FROM graph_metadata WHERE session_id = ?
846 UNION
847 SELECT DISTINCT 'default' as graph_name
848 FROM graph_nodes WHERE session_id = ?
849 ORDER BY graph_name",
850 )?;
851
852 let mut graphs = Vec::new();
853 let mut rows = stmt.query(params![session_id, session_id])?;
854 while let Some(row) = rows.next()? {
855 let graph_name: String = row.get(0)?;
856 graphs.push(graph_name);
857 }
858
859 if graphs.is_empty() {
860 let node_count: i64 = conn.query_row(
861 "SELECT COUNT(*) FROM graph_nodes WHERE session_id = ?",
862 params![session_id],
863 |row| row.get(0),
864 )?;
865 if node_count > 0 {
866 graphs.push("default".to_string());
867 }
868 }
869
870 Ok(graphs)
871 }
872
873 pub fn graph_list_sync_enabled(&self) -> Result<Vec<(String, String)>> {
875 let conn = self.conn();
876 let mut stmt = conn.prepare(
877 "SELECT session_id, graph_name FROM graph_metadata WHERE sync_enabled = TRUE ORDER BY session_id, graph_name",
878 )?;
879
880 let mut results = Vec::new();
881 let mut rows = stmt.query(params![])?;
882 while let Some(row) = rows.next()? {
883 let session_id: String = row.get(0)?;
884 let graph_name: String = row.get(1)?;
885 results.push((session_id, graph_name));
886 }
887
888 Ok(results)
889 }
890
891 pub fn graph_get_node_with_sync(&self, node_id: i64) -> Result<Option<SyncedNodeRecord>> {
892 let conn = self.conn();
893 let result: Result<SyncedNodeRecord, _> = conn.query_row(
894 "SELECT id, session_id, node_type, label, properties, embedding_id,
895 CAST(created_at AS TEXT), CAST(updated_at AS TEXT),
896 COALESCE(vector_clock, '{}'), last_modified_by, is_deleted, sync_enabled
897 FROM graph_nodes WHERE id = ?",
898 params![node_id],
899 SyncedNodeRecord::from_row,
900 );
901 match result {
902 Ok(node) => Ok(Some(node)),
903 Err(duckdb::Error::QueryReturnedNoRows) => Ok(None),
904 Err(e) => Err(e.into()),
905 }
906 }
907
908 pub fn graph_list_nodes_with_sync(
909 &self,
910 session_id: &str,
911 sync_enabled_only: bool,
912 include_deleted: bool,
913 ) -> Result<Vec<SyncedNodeRecord>> {
914 let conn = self.conn();
915 let mut query = String::from(
916 "SELECT id, session_id, node_type, label, properties, embedding_id,
917 CAST(created_at AS TEXT), CAST(updated_at AS TEXT),
918 COALESCE(vector_clock, '{}'), last_modified_by, is_deleted, sync_enabled
919 FROM graph_nodes WHERE session_id = ?",
920 );
921
922 if sync_enabled_only {
923 query.push_str(" AND sync_enabled = TRUE");
924 }
925 if !include_deleted {
926 query.push_str(" AND is_deleted = FALSE");
927 }
928 query.push_str(" ORDER BY created_at ASC");
929
930 let mut stmt = conn.prepare(&query)?;
931 let mut rows = stmt.query(params![session_id])?;
932 let mut nodes = Vec::new();
933 while let Some(row) = rows.next()? {
934 nodes.push(SyncedNodeRecord::from_row(row)?);
935 }
936 Ok(nodes)
937 }
938
939 pub fn graph_get_edge_with_sync(&self, edge_id: i64) -> Result<Option<SyncedEdgeRecord>> {
940 let conn = self.conn();
941 let result: Result<SyncedEdgeRecord, _> = conn.query_row(
942 "SELECT id, session_id, source_id, target_id, edge_type, predicate, properties, weight,
943 CAST(temporal_start AS TEXT), CAST(temporal_end AS TEXT), CAST(created_at AS TEXT),
944 COALESCE(vector_clock, '{}'), last_modified_by, is_deleted, sync_enabled
945 FROM graph_edges WHERE id = ?",
946 params![edge_id],
947 SyncedEdgeRecord::from_row,
948 );
949 match result {
950 Ok(edge) => Ok(Some(edge)),
951 Err(duckdb::Error::QueryReturnedNoRows) => Ok(None),
952 Err(e) => Err(e.into()),
953 }
954 }
955
956 pub fn graph_list_edges_with_sync(
957 &self,
958 session_id: &str,
959 sync_enabled_only: bool,
960 include_deleted: bool,
961 ) -> Result<Vec<SyncedEdgeRecord>> {
962 let conn = self.conn();
963 let mut query = String::from(
964 "SELECT id, session_id, source_id, target_id, edge_type, predicate, properties, weight,
965 CAST(temporal_start AS TEXT), CAST(temporal_end AS TEXT), CAST(created_at AS TEXT),
966 COALESCE(vector_clock, '{}'), last_modified_by, is_deleted, sync_enabled
967 FROM graph_edges WHERE session_id = ?",
968 );
969
970 if sync_enabled_only {
971 query.push_str(" AND sync_enabled = TRUE");
972 }
973 if !include_deleted {
974 query.push_str(" AND is_deleted = FALSE");
975 }
976 query.push_str(" ORDER BY created_at ASC");
977
978 let mut stmt = conn.prepare(&query)?;
979 let mut rows = stmt.query(params![session_id])?;
980 let mut edges = Vec::new();
981 while let Some(row) = rows.next()? {
982 edges.push(SyncedEdgeRecord::from_row(row)?);
983 }
984 Ok(edges)
985 }
986
987 pub fn graph_update_node_sync_metadata(
988 &self,
989 node_id: i64,
990 vector_clock: &str,
991 last_modified_by: &str,
992 sync_enabled: bool,
993 ) -> Result<()> {
994 let conn = self.conn();
995 conn.execute(
996 "UPDATE graph_nodes SET vector_clock = ?, last_modified_by = ?, sync_enabled = ?, updated_at = CURRENT_TIMESTAMP
997 WHERE id = ?",
998 params![vector_clock, last_modified_by, sync_enabled, node_id],
999 )?;
1000 Ok(())
1001 }
1002
1003 pub fn graph_update_edge_sync_metadata(
1004 &self,
1005 edge_id: i64,
1006 vector_clock: &str,
1007 last_modified_by: &str,
1008 sync_enabled: bool,
1009 ) -> Result<()> {
1010 let conn = self.conn();
1011 conn.execute(
1012 "UPDATE graph_edges SET vector_clock = ?, last_modified_by = ?, sync_enabled = ?
1013 WHERE id = ?",
1014 params![vector_clock, last_modified_by, sync_enabled, edge_id],
1015 )?;
1016 Ok(())
1017 }
1018
1019 pub fn graph_mark_node_deleted(
1020 &self,
1021 node_id: i64,
1022 vector_clock: &str,
1023 deleted_by: &str,
1024 ) -> Result<()> {
1025 let conn = self.conn();
1026 conn.execute(
1027 "UPDATE graph_nodes SET is_deleted = TRUE, vector_clock = ?, last_modified_by = ?, updated_at = CURRENT_TIMESTAMP
1028 WHERE id = ?",
1029 params![vector_clock, deleted_by, node_id],
1030 )?;
1031 Ok(())
1032 }
1033
1034 pub fn graph_mark_edge_deleted(
1035 &self,
1036 edge_id: i64,
1037 vector_clock: &str,
1038 deleted_by: &str,
1039 ) -> Result<()> {
1040 let conn = self.conn();
1041 conn.execute(
1042 "UPDATE graph_edges SET is_deleted = TRUE, vector_clock = ?, last_modified_by = ?
1043 WHERE id = ?",
1044 params![vector_clock, deleted_by, edge_id],
1045 )?;
1046 Ok(())
1047 }
1048}
1049
/// One row of the `graph_changelog` table.
#[derive(Debug, Clone)]
pub struct ChangelogEntry {
    // Row id of the changelog entry.
    pub id: i64,
    // Session the change belongs to.
    pub session_id: String,
    // Instance that recorded the change.
    pub instance_id: String,
    // Kind of entity affected; this file writes "node" and "edge".
    pub entity_type: String,
    // Id of the affected node or edge.
    pub entity_id: i64,
    // Operation name; this file writes "create", "insert", "update" and "delete".
    pub operation: String,
    // Vector clock of the change, as stored (a serialized string).
    pub vector_clock: String,
    // Optional payload; this file stores a JSON description of the entity here.
    pub data: Option<String>,
    // Creation time parsed from the row's `created_at` column.
    pub created_at: DateTime<Utc>,
}
1062
1063impl ChangelogEntry {
1064 fn from_row(row: &duckdb::Row) -> Result<Self, duckdb::Error> {
1065 let id: i64 = row.get(0)?;
1066 let session_id: String = row.get(1)?;
1067 let instance_id: String = row.get(2)?;
1068 let entity_type: String = row.get(3)?;
1069 let entity_id: i64 = row.get(4)?;
1070 let operation: String = row.get(5)?;
1071 let vector_clock: String = row.get(6)?;
1072 let data: Option<String> = row.get(7)?;
1073 let created_at_str: String = row.get(8)?;
1074
1075 Ok(ChangelogEntry {
1076 id,
1077 session_id,
1078 instance_id,
1079 entity_type,
1080 entity_id,
1081 operation,
1082 vector_clock,
1083 data,
1084 created_at: created_at_str.parse().unwrap_or_else(|_| Utc::now()),
1085 })
1086 }
1087}
1088
/// A `graph_nodes` row together with its sync metadata columns.
#[derive(Debug, Clone)]
pub struct SyncedNodeRecord {
    // Row id of the node.
    pub id: i64,
    // Session the node belongs to.
    pub session_id: String,
    // Node type as stored in the database (kept as a raw string).
    pub node_type: String,
    // Label of the node.
    pub label: String,
    // Node properties parsed from the stored JSON text.
    pub properties: JsonValue,
    // Optional id of an associated embedding.
    pub embedding_id: Option<i64>,
    // Creation time parsed from the `created_at` column.
    pub created_at: DateTime<Utc>,
    // Last update time parsed from the `updated_at` column.
    pub updated_at: DateTime<Utc>,
    // Vector clock as stored (a serialized string).
    pub vector_clock: String,
    // Instance that last modified the row, if recorded.
    pub last_modified_by: Option<String>,
    // Soft-delete flag.
    pub is_deleted: bool,
    // Whether the row takes part in sync.
    pub sync_enabled: bool,
}
1104
1105impl SyncedNodeRecord {
1106 fn from_row(row: &duckdb::Row) -> Result<Self, duckdb::Error> {
1107 let id: i64 = row.get(0)?;
1108 let session_id: String = row.get(1)?;
1109 let node_type: String = row.get(2)?;
1110 let label: String = row.get(3)?;
1111 let properties_str: String = row.get(4)?;
1112 let properties: JsonValue = serde_json::from_str(&properties_str).map_err(|e| {
1113 duckdb::Error::FromSqlConversionFailure(4, duckdb::types::Type::Text, Box::new(e))
1114 })?;
1115 let embedding_id: Option<i64> = row.get(5)?;
1116 let created_at_str: String = row.get(6)?;
1117 let updated_at_str: String = row.get(7)?;
1118 let vector_clock: String = row.get(8)?;
1119 let last_modified_by: Option<String> = row.get(9)?;
1120 let is_deleted: bool = row.get(10)?;
1121 let sync_enabled: bool = row.get(11)?;
1122
1123 Ok(SyncedNodeRecord {
1124 id,
1125 session_id,
1126 node_type,
1127 label,
1128 properties,
1129 embedding_id,
1130 created_at: created_at_str.parse().unwrap_or_else(|_| Utc::now()),
1131 updated_at: updated_at_str.parse().unwrap_or_else(|_| Utc::now()),
1132 vector_clock,
1133 last_modified_by,
1134 is_deleted,
1135 sync_enabled,
1136 })
1137 }
1138}
1139
1140#[derive(Debug, Clone)]
1141pub struct SyncedEdgeRecord {
1142 pub id: i64,
1143 pub session_id: String,
1144 pub source_id: i64,
1145 pub target_id: i64,
1146 pub edge_type: String,
1147 pub predicate: Option<String>,
1148 pub properties: Option<JsonValue>,
1149 pub weight: f32,
1150 pub temporal_start: Option<DateTime<Utc>>,
1151 pub temporal_end: Option<DateTime<Utc>>,
1152 pub created_at: DateTime<Utc>,
1153 pub vector_clock: String,
1154 pub last_modified_by: Option<String>,
1155 pub is_deleted: bool,
1156 pub sync_enabled: bool,
1157}
1158
1159impl SyncedEdgeRecord {
1160 fn from_row(row: &duckdb::Row) -> Result<Self, duckdb::Error> {
1161 let id: i64 = row.get(0)?;
1162 let session_id: String = row.get(1)?;
1163 let source_id: i64 = row.get(2)?;
1164 let target_id: i64 = row.get(3)?;
1165 let edge_type: String = row.get(4)?;
1166 let predicate: Option<String> = row.get(5)?;
1167 let properties_str: Option<String> = row.get(6)?;
1168 let properties: Option<JsonValue> = properties_str
1169 .as_ref()
1170 .and_then(|s| serde_json::from_str(s).ok());
1171 let weight: f32 = row.get(7)?;
1172 let temporal_start_str: Option<String> = row.get(8)?;
1173 let temporal_end_str: Option<String> = row.get(9)?;
1174 let created_at_str: String = row.get(10)?;
1175 let vector_clock: String = row.get(11)?;
1176 let last_modified_by: Option<String> = row.get(12)?;
1177 let is_deleted: bool = row.get(13)?;
1178 let sync_enabled: bool = row.get(14)?;
1179
1180 Ok(SyncedEdgeRecord {
1181 id,
1182 session_id,
1183 source_id,
1184 target_id,
1185 edge_type,
1186 predicate,
1187 properties,
1188 weight,
1189 temporal_start: temporal_start_str.and_then(|s| s.parse().ok()),
1190 temporal_end: temporal_end_str.and_then(|s| s.parse().ok()),
1191 created_at: created_at_str.parse().unwrap_or_else(|_| Utc::now()),
1192 vector_clock,
1193 last_modified_by,
1194 is_deleted,
1195 sync_enabled,
1196 })
1197 }
1198}
1199
#[cfg(test)]
mod tests {
    use super::*;
    use anyhow::Result;
    use serde_json::json;

    /// Session identifier shared by every test in this module.
    const SESSION: &str = "session";

    /// Sequences and tables the store needs, created fresh for each test.
    const SCHEMA: &str = r#"
            CREATE SEQUENCE IF NOT EXISTS graph_nodes_id_seq START 1;
            CREATE SEQUENCE IF NOT EXISTS graph_edges_id_seq START 1;
            CREATE SEQUENCE IF NOT EXISTS graph_metadata_id_seq START 1;
            CREATE SEQUENCE IF NOT EXISTS graph_changelog_id_seq START 1;
            CREATE SEQUENCE IF NOT EXISTS graph_tombstones_id_seq START 1;

            CREATE TABLE graph_nodes (
                id BIGINT PRIMARY KEY DEFAULT nextval('graph_nodes_id_seq'),
                session_id TEXT NOT NULL,
                node_type TEXT NOT NULL,
                label TEXT NOT NULL,
                properties TEXT NOT NULL,
                embedding_id BIGINT,
                created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                vector_clock TEXT DEFAULT '{}',
                last_modified_by TEXT,
                is_deleted BOOLEAN DEFAULT FALSE,
                sync_enabled BOOLEAN DEFAULT FALSE
            );

            CREATE TABLE graph_edges (
                id BIGINT PRIMARY KEY DEFAULT nextval('graph_edges_id_seq'),
                session_id TEXT NOT NULL,
                source_id BIGINT NOT NULL,
                target_id BIGINT NOT NULL,
                edge_type TEXT NOT NULL,
                predicate TEXT,
                properties TEXT,
                weight REAL DEFAULT 1.0,
                temporal_start TIMESTAMP,
                temporal_end TIMESTAMP,
                created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                vector_clock TEXT DEFAULT '{}',
                last_modified_by TEXT,
                is_deleted BOOLEAN DEFAULT FALSE,
                sync_enabled BOOLEAN DEFAULT FALSE
            );

            CREATE TABLE graph_metadata (
                id BIGINT PRIMARY KEY DEFAULT nextval('graph_metadata_id_seq'),
                session_id TEXT NOT NULL,
                graph_name TEXT NOT NULL,
                sync_enabled BOOLEAN DEFAULT FALSE
            );

            CREATE TABLE graph_changelog (
                id BIGINT PRIMARY KEY DEFAULT nextval('graph_changelog_id_seq'),
                session_id TEXT NOT NULL,
                instance_id TEXT NOT NULL,
                entity_type TEXT NOT NULL,
                entity_id BIGINT NOT NULL,
                operation TEXT NOT NULL,
                vector_clock TEXT NOT NULL,
                data TEXT,
                created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
            );

            CREATE TABLE graph_sync_state (
                instance_id TEXT NOT NULL,
                session_id TEXT NOT NULL,
                graph_name TEXT NOT NULL,
                vector_clock TEXT NOT NULL,
                last_sync_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
            );

            CREATE TABLE graph_tombstones (
                id BIGINT PRIMARY KEY DEFAULT nextval('graph_tombstones_id_seq'),
                session_id TEXT NOT NULL,
                entity_type TEXT NOT NULL,
                entity_id BIGINT NOT NULL,
                deleted_by TEXT NOT NULL,
                vector_clock TEXT NOT NULL,
                created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
            );
            "#;

    /// Store over a fresh in-memory database with the schema and nothing else.
    fn setup_store() -> KnowledgeGraphStore {
        setup_store_with(|_| {})
    }

    /// Store over a fresh in-memory database; `extra` runs against the raw
    /// connection after the schema exists and before the store takes ownership.
    fn setup_store_with(extra: impl FnOnce(&Connection)) -> KnowledgeGraphStore {
        let conn = Connection::open_in_memory().expect("open in-memory database");
        conn.execute_batch(SCHEMA).expect("create graph schema");
        extra(&conn);
        KnowledgeGraphStore::from_connection(conn, "test-instance")
    }

    /// Inserts an entity node with empty properties and returns its id.
    fn add_entity(store: &KnowledgeGraphStore, label: &str) -> Result<i64> {
        store.insert_graph_node(SESSION, NodeType::Entity, label, &json!({}), None)
    }

    /// Inserts a `RelatesTo` edge of weight 1.0 from `from` to `to`.
    fn link(store: &KnowledgeGraphStore, from: i64, to: i64) -> Result<()> {
        store.insert_graph_edge(SESSION, from, to, EdgeType::RelatesTo, None, None, 1.0)?;
        Ok(())
    }

    /// A node can be inserted, listed, updated, and is gone after deletion.
    #[test]
    fn insert_update_delete_node_flow() -> Result<()> {
        let store = setup_store();

        let initial = json!({ "kind": "repository" });
        let node_id =
            store.insert_graph_node(SESSION, NodeType::Entity, "SpecAI", &initial, None)?;

        let listed = store.list_graph_nodes(SESSION, None, Some(10))?;
        assert_eq!(listed.len(), 1);
        assert_eq!(listed[0].label, "SpecAI");

        let revised = json!({ "kind": "repository", "stars": 42 });
        store.update_graph_node(node_id, &revised)?;
        let fetched = store.get_graph_node(node_id)?.expect("node exists");
        assert_eq!(fetched.properties["stars"], 42);

        store.delete_graph_node(node_id)?;
        assert!(store.get_graph_node(node_id)?.is_none());
        Ok(())
    }

    /// A two-hop chain A -> B -> C yields a three-node, two-edge shortest path.
    #[test]
    fn create_edges_and_find_paths() -> Result<()> {
        let store = setup_store();
        let a = add_entity(&store, "A")?;
        let b = add_entity(&store, "B")?;
        let c = add_entity(&store, "C")?;

        link(&store, a, b)?;
        link(&store, b, c)?;

        let found = store.find_shortest_path(SESSION, a, c, Some(5))?;
        let path = found.expect("path exists");
        assert_eq!(path.nodes.len(), 3);
        assert_eq!(path.edges.len(), 2);
        assert_eq!(path.length, 2);
        assert_eq!(path.nodes.first().unwrap().label, "A");
        assert_eq!(path.nodes.last().unwrap().label, "C");

        let all_edges = store.list_graph_edges(SESSION, None, None)?;
        assert_eq!(all_edges.len(), 2);

        Ok(())
    }

    /// Depth-2 traversal follows edges only in the requested direction.
    #[test]
    fn traverse_neighbors_respects_direction() -> Result<()> {
        let store = setup_store();
        let alpha = add_entity(&store, "Alpha")?;
        let beta = add_entity(&store, "Beta")?;
        let gamma = add_entity(&store, "Gamma")?;

        link(&store, alpha, beta)?;
        link(&store, beta, gamma)?;

        let outgoing =
            store.traverse_neighbors(SESSION, alpha, TraversalDirection::Outgoing, 2)?;
        assert_eq!(outgoing.len(), 2);
        for &expected in &["Beta", "Gamma"] {
            assert!(outgoing.iter().any(|node| node.label == expected));
        }

        let incoming =
            store.traverse_neighbors(SESSION, gamma, TraversalDirection::Incoming, 2)?;
        assert_eq!(incoming.len(), 2);
        for &expected in &["Beta", "Alpha"] {
            assert!(incoming.iter().any(|node| node.label == expected));
        }

        Ok(())
    }
}