spec_ai_knowledge_graph/
graph_store.rs

1use crate::types::{EdgeType, GraphEdge, GraphNode, GraphPath, NodeType, TraversalDirection};
2use crate::vector_clock::VectorClock;
3use anyhow::Result;
4use chrono::{DateTime, Duration, Utc};
5use duckdb::{params, Connection};
6use serde_json::{Map, Value as JsonValue};
7use std::collections::{HashMap, HashSet, VecDeque};
8use std::sync::{Arc, Mutex, MutexGuard};
9
10#[derive(Clone)]
11pub struct KnowledgeGraphStore {
12    conn: Arc<Mutex<Connection>>,
13    instance_id: String,
14}
15
16impl KnowledgeGraphStore {
17    pub fn new(conn: Arc<Mutex<Connection>>, instance_id: impl Into<String>) -> Self {
18        Self {
19            conn,
20            instance_id: instance_id.into(),
21        }
22    }
23
24    pub fn from_connection(conn: Connection, instance_id: impl Into<String>) -> Self {
25        Self {
26            conn: Arc::new(Mutex::new(conn)),
27            instance_id: instance_id.into(),
28        }
29    }
30
31    pub fn instance_id(&self) -> &str {
32        &self.instance_id
33    }
34
35    fn conn(&self) -> MutexGuard<'_, Connection> {
36        self.conn.lock().expect("database connection poisoned")
37    }
38
39    // ---------- Graph Node Operations ----------
40
41    pub fn insert_graph_node(
42        &self,
43        session_id: &str,
44        node_type: NodeType,
45        label: &str,
46        properties: &JsonValue,
47        embedding_id: Option<i64>,
48    ) -> Result<i64> {
49        let sync_enabled = self
50            .graph_get_sync_enabled(session_id, "default")
51            .unwrap_or(false);
52
53        let mut vector_clock = VectorClock::new();
54        vector_clock.increment(&self.instance_id);
55        let vc_json = vector_clock.to_json()?;
56
57        let conn = self.conn();
58
59        let mut stmt = conn.prepare(
60            "INSERT INTO graph_nodes (session_id, node_type, label, properties, embedding_id,
61                                     vector_clock, last_modified_by, sync_enabled)
62             VALUES (?, ?, ?, ?, ?, ?, ?, ?) RETURNING id",
63        )?;
64        let id: i64 = stmt.query_row(
65            params![
66                session_id,
67                node_type.as_str(),
68                label,
69                properties.to_string(),
70                embedding_id,
71                vc_json,
72                self.instance_id,
73                sync_enabled,
74            ],
75            |row| row.get(0),
76        )?;
77
78        if sync_enabled {
79            let node_data = serde_json::json!({
80                "id": id,
81                "session_id": session_id,
82                "node_type": node_type.as_str(),
83                "label": label,
84                "properties": properties,
85                "embedding_id": embedding_id,
86            });
87
88            self.graph_changelog_append(
89                session_id,
90                &self.instance_id,
91                "node",
92                id,
93                "create",
94                &vc_json,
95                Some(&node_data.to_string()),
96            )?;
97        }
98
99        Ok(id)
100    }
101
102    pub fn get_graph_node(&self, node_id: i64) -> Result<Option<GraphNode>> {
103        let conn = self.conn();
104        let mut stmt = conn.prepare(
105            "SELECT id, session_id, node_type, label, properties, embedding_id,
106                    CAST(created_at AS TEXT), CAST(updated_at AS TEXT)
107             FROM graph_nodes WHERE id = ?",
108        )?;
109        let mut rows = stmt.query(params![node_id])?;
110        if let Some(row) = rows.next()? {
111            Ok(Some(Self::row_to_graph_node(row)?))
112        } else {
113            Ok(None)
114        }
115    }
116
117    pub fn list_graph_nodes(
118        &self,
119        session_id: &str,
120        node_type: Option<NodeType>,
121        limit: Option<i64>,
122    ) -> Result<Vec<GraphNode>> {
123        let conn = self.conn();
124
125        let nodes = if let Some(nt) = node_type {
126            let mut stmt = conn.prepare(
127                "SELECT id, session_id, node_type, label, properties, embedding_id,
128                        CAST(created_at AS TEXT), CAST(updated_at AS TEXT)
129                 FROM graph_nodes WHERE session_id = ? AND node_type = ?
130                 ORDER BY id DESC LIMIT ?",
131            )?;
132            let query = stmt.query(params![session_id, nt.as_str(), limit.unwrap_or(100)])?;
133            Self::collect_graph_nodes(query)?
134        } else {
135            let mut stmt = conn.prepare(
136                "SELECT id, session_id, node_type, label, properties, embedding_id,
137                        CAST(created_at AS TEXT), CAST(updated_at AS TEXT)
138                 FROM graph_nodes WHERE session_id = ?
139                 ORDER BY id DESC LIMIT ?",
140            )?;
141            let query = stmt.query(params![session_id, limit.unwrap_or(100)])?;
142            Self::collect_graph_nodes(query)?
143        };
144
145        Ok(nodes)
146    }
147
148    pub fn count_graph_nodes(&self, session_id: &str) -> Result<i64> {
149        let conn = self.conn();
150        let mut stmt = conn.prepare("SELECT COUNT(*) FROM graph_nodes WHERE session_id = ?")?;
151        let count: i64 = stmt.query_row(params![session_id], |row| row.get(0))?;
152        Ok(count)
153    }
154
155    pub fn update_graph_node(&self, node_id: i64, properties: &JsonValue) -> Result<()> {
156        let conn = self.conn();
157
158        let mut stmt = conn.prepare(
159            "SELECT session_id, node_type, label, vector_clock, sync_enabled
160             FROM graph_nodes WHERE id = ?",
161        )?;
162
163        let (session_id, node_type, label, current_vc_json, sync_enabled): (
164            String,
165            String,
166            String,
167            Option<String>,
168            bool,
169        ) = stmt.query_row(params![node_id], |row| {
170            Ok((
171                row.get(0)?,
172                row.get(1)?,
173                row.get(2)?,
174                row.get(3)?,
175                row.get(4).unwrap_or(false),
176            ))
177        })?;
178
179        let mut vector_clock = if let Some(vc_json) = current_vc_json {
180            VectorClock::from_json(&vc_json).unwrap_or_else(|_| VectorClock::new())
181        } else {
182            VectorClock::new()
183        };
184        vector_clock.increment(&self.instance_id);
185        let vc_json = vector_clock.to_json()?;
186
187        conn.execute(
188            "UPDATE graph_nodes
189             SET properties = ?,
190                 vector_clock = ?,
191                 last_modified_by = ?,
192                 updated_at = CURRENT_TIMESTAMP
193             WHERE id = ?",
194            params![properties.to_string(), vc_json, self.instance_id, node_id],
195        )?;
196
197        if sync_enabled {
198            let node_data = serde_json::json!({
199                "id": node_id,
200                "session_id": session_id,
201                "node_type": node_type,
202                "label": label,
203                "properties": properties,
204            });
205
206            self.graph_changelog_append(
207                &session_id,
208                &self.instance_id,
209                "node",
210                node_id,
211                "update",
212                &vc_json,
213                Some(&node_data.to_string()),
214            )?;
215        }
216
217        Ok(())
218    }
219
220    pub fn delete_graph_node(&self, node_id: i64) -> Result<()> {
221        let conn = self.conn();
222
223        let mut stmt = conn.prepare(
224            "SELECT session_id, node_type, label, properties, vector_clock, sync_enabled
225             FROM graph_nodes WHERE id = ?",
226        )?;
227
228        let result = stmt.query_row(params![node_id], |row| {
229            Ok((
230                row.get::<_, String>(0)?,
231                row.get::<_, String>(1)?,
232                row.get::<_, String>(2)?,
233                row.get::<_, String>(3)?,
234                row.get::<_, Option<String>>(4)?,
235                row.get::<_, bool>(5).unwrap_or(false),
236            ))
237        });
238
239        if let Ok((session_id, node_type, label, properties, current_vc_json, sync_enabled)) =
240            result
241        {
242            if sync_enabled {
243                let mut vector_clock = if let Some(vc_json) = current_vc_json {
244                    VectorClock::from_json(&vc_json).unwrap_or_else(|_| VectorClock::new())
245                } else {
246                    VectorClock::new()
247                };
248                vector_clock.increment(&self.instance_id);
249                let vc_json = vector_clock.to_json()?;
250
251                conn.execute(
252                    "INSERT INTO graph_tombstones
253                     (session_id, entity_type, entity_id, deleted_by, vector_clock)
254                     VALUES (?, ?, ?, ?, ?)",
255                    params![session_id, "node", node_id, self.instance_id, vc_json],
256                )?;
257
258                let node_data = serde_json::json!({
259                    "id": node_id,
260                    "session_id": session_id,
261                    "node_type": node_type,
262                    "label": label,
263                    "properties": properties,
264                });
265
266                self.graph_changelog_append(
267                    &session_id,
268                    &self.instance_id,
269                    "node",
270                    node_id,
271                    "delete",
272                    &vc_json,
273                    Some(&node_data.to_string()),
274                )?;
275            }
276        }
277
278        conn.execute("DELETE FROM graph_nodes WHERE id = ?", params![node_id])?;
279        Ok(())
280    }
281
282    // ---------- Graph Edge Operations ----------
283
284    pub fn insert_graph_edge(
285        &self,
286        session_id: &str,
287        source_id: i64,
288        target_id: i64,
289        edge_type: EdgeType,
290        predicate: Option<&str>,
291        properties: Option<&JsonValue>,
292        weight: f32,
293    ) -> Result<i64> {
294        let sync_enabled = self
295            .graph_get_sync_enabled(session_id, "default")
296            .unwrap_or(false);
297
298        let mut vector_clock = VectorClock::new();
299        vector_clock.increment(&self.instance_id);
300        let vc_json = vector_clock.to_json()?;
301
302        let conn = self.conn();
303
304        let mut stmt = conn.prepare(
305            "INSERT INTO graph_edges (session_id, source_id, target_id, edge_type, predicate, properties, weight,
306                                     vector_clock, last_modified_by, sync_enabled)
307             VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?) RETURNING id",
308        )?;
309        let props_str = properties.map(|p| p.to_string());
310        let id: i64 = stmt.query_row(
311            params![
312                session_id,
313                source_id,
314                target_id,
315                edge_type.as_str(),
316                predicate,
317                props_str,
318                weight,
319                vc_json,
320                self.instance_id,
321                sync_enabled,
322            ],
323            |row| row.get(0),
324        )?;
325
326        if sync_enabled {
327            let edge_data = serde_json::json!({
328                "id": id,
329                "session_id": session_id,
330                "source_id": source_id,
331                "target_id": target_id,
332                "edge_type": edge_type.as_str(),
333                "predicate": predicate,
334                "properties": properties,
335                "weight": weight,
336            });
337
338            self.graph_changelog_append(
339                session_id,
340                &self.instance_id,
341                "edge",
342                id,
343                "insert",
344                &vc_json,
345                Some(&edge_data.to_string()),
346            )?;
347        }
348
349        Ok(id)
350    }
351
352    pub fn get_graph_edge(&self, edge_id: i64) -> Result<Option<GraphEdge>> {
353        let conn = self.conn();
354        let mut stmt = conn.prepare(
355            "SELECT id, session_id, source_id, target_id, edge_type, predicate, properties, weight,
356                    CAST(temporal_start AS TEXT), CAST(temporal_end AS TEXT), CAST(created_at AS TEXT)
357             FROM graph_edges WHERE id = ?",
358        )?;
359        let mut rows = stmt.query(params![edge_id])?;
360        if let Some(row) = rows.next()? {
361            Ok(Some(Self::row_to_graph_edge(row)?))
362        } else {
363            Ok(None)
364        }
365    }
366
367    pub fn list_graph_edges(
368        &self,
369        session_id: &str,
370        source_id: Option<i64>,
371        target_id: Option<i64>,
372    ) -> Result<Vec<GraphEdge>> {
373        let conn = self.conn();
374
375        let edges = match (source_id, target_id) {
376            (Some(src), Some(tgt)) => {
377                let mut stmt = conn.prepare(
378                    "SELECT id, session_id, source_id, target_id, edge_type, predicate, properties, weight,
379                            CAST(temporal_start AS TEXT), CAST(temporal_end AS TEXT), CAST(created_at AS TEXT)
380                     FROM graph_edges WHERE session_id = ? AND source_id = ? AND target_id = ?",
381                )?;
382                let query = stmt.query(params![session_id, src, tgt])?;
383                Self::collect_graph_edges(query)?
384            }
385            (Some(src), None) => {
386                let mut stmt = conn.prepare(
387                    "SELECT id, session_id, source_id, target_id, edge_type, predicate, properties, weight,
388                            CAST(temporal_start AS TEXT), CAST(temporal_end AS TEXT), CAST(created_at AS TEXT)
389                     FROM graph_edges WHERE session_id = ? AND source_id = ?",
390                )?;
391                let query = stmt.query(params![session_id, src])?;
392                Self::collect_graph_edges(query)?
393            }
394            (None, Some(tgt)) => {
395                let mut stmt = conn.prepare(
396                    "SELECT id, session_id, source_id, target_id, edge_type, predicate, properties, weight,
397                            CAST(temporal_start AS TEXT), CAST(temporal_end AS TEXT), CAST(created_at AS TEXT)
398                     FROM graph_edges WHERE session_id = ? AND target_id = ?",
399                )?;
400                let query = stmt.query(params![session_id, tgt])?;
401                Self::collect_graph_edges(query)?
402            }
403            (None, None) => {
404                let mut stmt = conn.prepare(
405                    "SELECT id, session_id, source_id, target_id, edge_type, predicate, properties, weight,
406                            CAST(temporal_start AS TEXT), CAST(temporal_end AS TEXT), CAST(created_at AS TEXT)
407                     FROM graph_edges WHERE session_id = ?",
408                )?;
409                let query = stmt.query(params![session_id])?;
410                Self::collect_graph_edges(query)?
411            }
412        };
413
414        Ok(edges)
415    }
416
417    pub fn count_graph_edges(&self, session_id: &str) -> Result<i64> {
418        let conn = self.conn();
419        let mut stmt = conn.prepare("SELECT COUNT(*) FROM graph_edges WHERE session_id = ?")?;
420        let count: i64 = stmt.query_row(params![session_id], |row| row.get(0))?;
421        Ok(count)
422    }
423
424    pub fn delete_graph_edge(&self, edge_id: i64) -> Result<()> {
425        let conn = self.conn();
426
427        let mut stmt = conn.prepare(
428            "SELECT session_id, source_id, target_id, edge_type, predicate, properties, weight,
429                    vector_clock, sync_enabled
430             FROM graph_edges WHERE id = ?",
431        )?;
432
433        let result = stmt.query_row(params![edge_id], |row| {
434            Ok((
435                row.get::<_, String>(0)?,
436                row.get::<_, i64>(1)?,
437                row.get::<_, i64>(2)?,
438                row.get::<_, String>(3)?,
439                row.get::<_, Option<String>>(4)?,
440                row.get::<_, Option<String>>(5)?,
441                row.get::<_, f32>(6)?,
442                row.get::<_, Option<String>>(7)?,
443                row.get::<_, bool>(8).unwrap_or(false),
444            ))
445        });
446
447        if let Ok((
448            session_id,
449            source_id,
450            target_id,
451            edge_type,
452            predicate,
453            properties,
454            weight,
455            current_vc_json,
456            sync_enabled,
457        )) = result
458        {
459            if sync_enabled {
460                let mut vector_clock = if let Some(vc_json) = current_vc_json {
461                    VectorClock::from_json(&vc_json).unwrap_or_else(|_| VectorClock::new())
462                } else {
463                    VectorClock::new()
464                };
465                vector_clock.increment(&self.instance_id);
466                let vc_json = vector_clock.to_json()?;
467
468                conn.execute(
469                    "INSERT INTO graph_tombstones
470                     (session_id, entity_type, entity_id, deleted_by, vector_clock)
471                     VALUES (?, ?, ?, ?, ?)",
472                    params![session_id, "edge", edge_id, self.instance_id, vc_json],
473                )?;
474
475                let edge_data = serde_json::json!({
476                    "id": edge_id,
477                    "session_id": session_id,
478                    "source_id": source_id,
479                    "target_id": target_id,
480                    "edge_type": edge_type,
481                    "predicate": predicate,
482                    "properties": properties,
483                    "weight": weight,
484                });
485
486                self.graph_changelog_append(
487                    &session_id,
488                    &self.instance_id,
489                    "edge",
490                    edge_id,
491                    "delete",
492                    &vc_json,
493                    Some(&edge_data.to_string()),
494                )?;
495            }
496        }
497
498        conn.execute("DELETE FROM graph_edges WHERE id = ?", params![edge_id])?;
499        Ok(())
500    }
501
502    // ---------- Graph Traversal Operations ----------
503
504    pub fn find_shortest_path(
505        &self,
506        session_id: &str,
507        source_id: i64,
508        target_id: i64,
509        max_hops: Option<usize>,
510    ) -> Result<Option<GraphPath>> {
511        let max_depth = max_hops.unwrap_or(10);
512
513        let mut visited = HashSet::new();
514        let mut queue = VecDeque::new();
515        let mut parent_map = HashMap::new();
516
517        queue.push_back((source_id, 0));
518        visited.insert(source_id);
519
520        while let Some((current_id, depth)) = queue.pop_front() {
521            if current_id == target_id {
522                let path = self.reconstruct_path(&parent_map, source_id, target_id)?;
523                return Ok(Some(path));
524            }
525
526            if depth >= max_depth {
527                continue;
528            }
529
530            let edges = self.list_graph_edges(session_id, Some(current_id), None)?;
531            for edge in edges {
532                let target = edge.target_id;
533                if !visited.contains(&target) {
534                    visited.insert(target);
535                    parent_map.insert(target, (current_id, edge));
536                    queue.push_back((target, depth + 1));
537                }
538            }
539        }
540
541        Ok(None)
542    }
543
544    pub fn traverse_neighbors(
545        &self,
546        session_id: &str,
547        node_id: i64,
548        direction: TraversalDirection,
549        depth: usize,
550    ) -> Result<Vec<GraphNode>> {
551        if depth == 0 {
552            return Ok(vec![]);
553        }
554
555        let mut visited = HashSet::new();
556        let mut result = Vec::new();
557        let mut queue = VecDeque::new();
558
559        queue.push_back((node_id, 0));
560        visited.insert(node_id);
561
562        while let Some((current_id, current_depth)) = queue.pop_front() {
563            if current_depth > 0 {
564                if let Some(node) = self.get_graph_node(current_id)? {
565                    result.push(node);
566                }
567            }
568
569            if current_depth >= depth {
570                continue;
571            }
572
573            let edges = match direction {
574                TraversalDirection::Outgoing => {
575                    self.list_graph_edges(session_id, Some(current_id), None)?
576                }
577                TraversalDirection::Incoming => {
578                    self.list_graph_edges(session_id, None, Some(current_id))?
579                }
580                TraversalDirection::Both => {
581                    let mut out_edges =
582                        self.list_graph_edges(session_id, Some(current_id), None)?;
583                    let in_edges = self.list_graph_edges(session_id, None, Some(current_id))?;
584                    out_edges.extend(in_edges);
585                    out_edges
586                }
587            };
588
589            for edge in edges {
590                let next_id = match direction {
591                    TraversalDirection::Outgoing => edge.target_id,
592                    TraversalDirection::Incoming => edge.source_id,
593                    TraversalDirection::Both => {
594                        if edge.source_id == current_id {
595                            edge.target_id
596                        } else {
597                            edge.source_id
598                        }
599                    }
600                };
601
602                if !visited.contains(&next_id) {
603                    visited.insert(next_id);
604                    queue.push_back((next_id, current_depth + 1));
605                }
606            }
607        }
608
609        Ok(result)
610    }
611
612    fn row_to_graph_node(row: &duckdb::Row) -> Result<GraphNode> {
613        let id: i64 = row.get(0)?;
614        let session_id: String = row.get(1)?;
615        let node_type: String = row.get(2)?;
616        let label: String = row.get(3)?;
617        let properties: String = row.get(4)?;
618        let embedding_id: Option<i64> = row.get(5)?;
619        let created_at: String = row.get(6)?;
620        let updated_at: String = row.get(7)?;
621
622        Ok(GraphNode {
623            id,
624            session_id,
625            node_type: NodeType::from_str(&node_type),
626            label,
627            properties: serde_json::from_str(&properties).unwrap_or(JsonValue::Null),
628            embedding_id,
629            created_at: created_at.parse().unwrap_or_else(|_| Utc::now()),
630            updated_at: updated_at.parse().unwrap_or_else(|_| Utc::now()),
631        })
632    }
633
634    fn row_to_graph_edge(row: &duckdb::Row) -> Result<GraphEdge> {
635        let id: i64 = row.get(0)?;
636        let session_id: String = row.get(1)?;
637        let source_id: i64 = row.get(2)?;
638        let target_id: i64 = row.get(3)?;
639        let edge_type: String = row.get(4)?;
640        let predicate: Option<String> = row.get(5)?;
641        let properties: Option<String> = row.get(6)?;
642        let weight: f32 = row.get(7)?;
643        let temporal_start: Option<String> = row.get(8)?;
644        let temporal_end: Option<String> = row.get(9)?;
645        let created_at: String = row.get(10)?;
646
647        Ok(GraphEdge {
648            id,
649            session_id,
650            source_id,
651            target_id,
652            edge_type: EdgeType::from_str(&edge_type),
653            predicate,
654            properties: properties.and_then(|p| serde_json::from_str(&p).ok()),
655            weight,
656            temporal_start: temporal_start.and_then(|s| s.parse().ok()),
657            temporal_end: temporal_end.and_then(|s| s.parse().ok()),
658            created_at: created_at.parse().unwrap_or_else(|_| Utc::now()),
659        })
660    }
661
662    fn collect_graph_nodes(mut rows: duckdb::Rows) -> Result<Vec<GraphNode>> {
663        let mut nodes = Vec::new();
664        while let Some(row) = rows.next()? {
665            nodes.push(Self::row_to_graph_node(row)?);
666        }
667        Ok(nodes)
668    }
669
670    fn collect_graph_edges(mut rows: duckdb::Rows) -> Result<Vec<GraphEdge>> {
671        let mut edges = Vec::new();
672        while let Some(row) = rows.next()? {
673            edges.push(Self::row_to_graph_edge(row)?);
674        }
675        Ok(edges)
676    }
677
678    fn reconstruct_path(
679        &self,
680        parent_map: &HashMap<i64, (i64, GraphEdge)>,
681        source_id: i64,
682        target_id: i64,
683    ) -> Result<GraphPath> {
684        let mut path_edges = Vec::new();
685        let mut path_nodes = Vec::new();
686        let mut current = target_id;
687        let mut total_weight = 0.0;
688
689        while current != source_id {
690            if let Some((parent, edge)) = parent_map.get(&current) {
691                path_edges.push(edge.clone());
692                total_weight += edge.weight;
693                current = *parent;
694            } else {
695                break;
696            }
697        }
698
699        path_edges.reverse();
700
701        if let Some(node) = self.get_graph_node(source_id)? {
702            path_nodes.push(node);
703        }
704        for edge in &path_edges {
705            if let Some(node) = self.get_graph_node(edge.target_id)? {
706                path_nodes.push(node);
707            }
708        }
709
710        Ok(GraphPath {
711            length: path_edges.len(),
712            weight: total_weight,
713            nodes: path_nodes,
714            edges: path_edges,
715        })
716    }
717
718    // ===== Graph Synchronization Methods =====
719
720    pub fn graph_changelog_append(
721        &self,
722        session_id: &str,
723        instance_id: &str,
724        entity_type: &str,
725        entity_id: i64,
726        operation: &str,
727        vector_clock: &str,
728        data: Option<&str>,
729    ) -> Result<i64> {
730        let conn = self.conn();
731        let mut stmt = conn.prepare(
732            "INSERT INTO graph_changelog (session_id, instance_id, entity_type, entity_id, operation, vector_clock, data)
733             VALUES (?, ?, ?, ?, ?, ?, ?) RETURNING id",
734        )?;
735        let id: i64 = stmt.query_row(
736            params![
737                session_id,
738                instance_id,
739                entity_type,
740                entity_id,
741                operation,
742                vector_clock,
743                data
744            ],
745            |row| row.get(0),
746        )?;
747        Ok(id)
748    }
749
750    pub fn graph_changelog_get_since(
751        &self,
752        session_id: &str,
753        since_timestamp: &str,
754    ) -> Result<Vec<ChangelogEntry>> {
755        let conn = self.conn();
756        let mut stmt = conn.prepare(
757            "SELECT id, session_id, instance_id, entity_type, entity_id, operation, vector_clock, data, CAST(created_at AS TEXT)
758             FROM graph_changelog
759             WHERE session_id = ? AND created_at > ?
760             ORDER BY created_at ASC",
761        )?;
762        let mut rows = stmt.query(params![session_id, since_timestamp])?;
763        let mut entries = Vec::new();
764        while let Some(row) = rows.next()? {
765            entries.push(ChangelogEntry::from_row(row)?);
766        }
767        Ok(entries)
768    }
769
770    pub fn graph_list_conflicts(
771        &self,
772        session_id: Option<&str>,
773        limit: usize,
774    ) -> Result<Vec<ChangelogEntry>> {
775        let conn = self.conn();
776        let mut entries = Vec::new();
777
778        if let Some(sid) = session_id {
779            let mut stmt = conn.prepare(
780                "SELECT id, session_id, instance_id, entity_type, entity_id, operation, vector_clock, data, CAST(created_at AS TEXT)
781                 FROM graph_changelog
782                 WHERE operation = 'conflict' AND session_id = ?
783                 ORDER BY created_at DESC
784                 LIMIT ?",
785            )?;
786            let mut rows = stmt.query(params![sid, limit as i64])?;
787            while let Some(row) = rows.next()? {
788                entries.push(ChangelogEntry::from_row(row)?);
789            }
790        } else {
791            let mut stmt = conn.prepare(
792                "SELECT id, session_id, instance_id, entity_type, entity_id, operation, vector_clock, data, CAST(created_at AS TEXT)
793                 FROM graph_changelog
794                 WHERE operation = 'conflict'
795                 ORDER BY created_at DESC
796                 LIMIT ?",
797            )?;
798            let mut rows = stmt.query(params![limit as i64])?;
799            while let Some(row) = rows.next()? {
800                entries.push(ChangelogEntry::from_row(row)?);
801            }
802        }
803
804        Ok(entries)
805    }
806
807    pub fn graph_changelog_prune(&self, days_to_keep: i64) -> Result<usize> {
808        let conn = self.conn();
809        let cutoff = Utc::now() - Duration::days(days_to_keep);
810        let cutoff_str = cutoff.to_rfc3339();
811        let deleted = conn.execute(
812            "DELETE FROM graph_changelog WHERE created_at < ?",
813            params![cutoff_str],
814        )?;
815        Ok(deleted)
816    }
817
818    pub fn graph_sync_state_get_metadata(
819        &self,
820        instance_id: &str,
821        session_id: &str,
822        graph_name: &str,
823    ) -> Result<Option<SyncStateRecord>> {
824        let conn = self.conn();
825        let result: Result<SyncStateRecord, _> = conn.query_row(
826            "SELECT vector_clock, CAST(last_sync_at AS TEXT)
827             FROM graph_sync_state WHERE instance_id = ? AND session_id = ? AND graph_name = ?",
828            params![instance_id, session_id, graph_name],
829            |row| {
830                Ok(SyncStateRecord {
831                    vector_clock: row.get(0)?,
832                    last_sync_at: row.get(1).ok(),
833                })
834            },
835        );
836
837        match result {
838            Ok(record) => Ok(Some(record)),
839            Err(duckdb::Error::QueryReturnedNoRows) => Ok(None),
840            Err(e) => Err(e.into()),
841        }
842    }
843
844    pub fn graph_sync_state_get(
845        &self,
846        instance_id: &str,
847        session_id: &str,
848        graph_name: &str,
849    ) -> Result<Option<String>> {
850        Ok(self
851            .graph_sync_state_get_metadata(instance_id, session_id, graph_name)?
852            .map(|r| r.vector_clock))
853    }
854
855    pub fn graph_sync_state_update(
856        &self,
857        instance_id: &str,
858        session_id: &str,
859        graph_name: &str,
860        vector_clock: &str,
861    ) -> Result<()> {
862        let conn = self.conn();
863        conn.execute(
864            "INSERT INTO graph_sync_state (instance_id, session_id, graph_name, vector_clock, last_sync_at)
865             VALUES (?, ?, ?, ?, now())
866             ON CONFLICT (instance_id, session_id, graph_name)
867             DO UPDATE SET vector_clock = EXCLUDED.vector_clock, last_sync_at = now()",
868            params![instance_id, session_id, graph_name, vector_clock],
869        )?;
870        Ok(())
871    }
872
873    pub fn graph_set_sync_enabled(
874        &self,
875        session_id: &str,
876        graph_name: &str,
877        enabled: bool,
878    ) -> Result<()> {
879        let conn = self.conn();
880        // Upsert: insert if not exists, update if exists
881        conn.execute(
882            "INSERT INTO graph_metadata (session_id, graph_name, sync_enabled)
883             VALUES (?, ?, ?)
884             ON CONFLICT (session_id, graph_name) DO UPDATE SET sync_enabled = EXCLUDED.sync_enabled",
885            params![session_id, graph_name, enabled],
886        )?;
887        Ok(())
888    }
889
890    pub fn graph_get_sync_enabled(&self, session_id: &str, graph_name: &str) -> Result<bool> {
891        let conn = self.conn();
892        let result: Result<bool, _> = conn.query_row(
893            "SELECT sync_enabled FROM graph_metadata WHERE session_id = ? AND graph_name = ?",
894            params![session_id, graph_name],
895            |row| row.get(0),
896        );
897        match result {
898            Ok(enabled) => Ok(enabled),
899            Err(duckdb::Error::QueryReturnedNoRows) => Ok(false),
900            Err(e) => Err(e.into()),
901        }
902    }
903
904    pub fn graph_set_sync_config(
905        &self,
906        session_id: &str,
907        graph_name: &str,
908        sync_enabled: bool,
909        conflict_resolution_strategy: Option<&str>,
910        sync_interval_seconds: Option<u64>,
911    ) -> Result<GraphSyncConfig> {
912        let conn = self.conn();
913
914        // Load existing config to preserve unrelated keys
915        let existing_config_value: JsonValue = conn
916            .query_row(
917                "SELECT config FROM graph_metadata WHERE session_id = ? AND graph_name = ?",
918                params![session_id, graph_name],
919                |row| row.get::<_, Option<String>>(0),
920            )
921            .unwrap_or(None)
922            .as_deref()
923            .and_then(|s| serde_json::from_str(s).ok())
924            .unwrap_or_else(|| JsonValue::Object(Map::new()));
925
926        let mut root_obj = existing_config_value
927            .as_object()
928            .cloned()
929            .unwrap_or_default();
930        let mut sync_obj = root_obj
931            .get("sync")
932            .and_then(|v| v.as_object())
933            .cloned()
934            .unwrap_or_default();
935
936        let final_strategy = conflict_resolution_strategy
937            .map(|s| s.to_string())
938            .or_else(|| {
939                sync_obj
940                    .get("conflict_resolution_strategy")
941                    .and_then(|v| v.as_str().map(|s| s.to_string()))
942            })
943            .or_else(|| Some("vector_clock".to_string()));
944
945        let final_interval = sync_interval_seconds
946            .or_else(|| {
947                sync_obj
948                    .get("sync_interval_seconds")
949                    .and_then(|v| v.as_u64())
950            })
951            .or(Some(60));
952
953        if let Some(strategy) = final_strategy.clone() {
954            sync_obj.insert(
955                "conflict_resolution_strategy".to_string(),
956                JsonValue::String(strategy),
957            );
958        }
959        if let Some(interval) = final_interval {
960            sync_obj.insert(
961                "sync_interval_seconds".to_string(),
962                JsonValue::from(interval),
963            );
964        }
965
966        root_obj.insert("sync".to_string(), JsonValue::Object(sync_obj));
967        let merged_config = JsonValue::Object(root_obj).to_string();
968
969        conn.execute(
970            "INSERT INTO graph_metadata (session_id, graph_name, sync_enabled, config, updated_at)
971             VALUES (?, ?, ?, ?, now())
972             ON CONFLICT (session_id, graph_name)
973             DO UPDATE SET sync_enabled = EXCLUDED.sync_enabled,
974                           config = EXCLUDED.config,
975                           updated_at = now()",
976            params![session_id, graph_name, sync_enabled, merged_config],
977        )?;
978
979        Ok(GraphSyncConfig {
980            sync_enabled,
981            conflict_resolution_strategy: final_strategy,
982            sync_interval_seconds: final_interval,
983        })
984    }
985
986    pub fn graph_get_sync_config(
987        &self,
988        session_id: &str,
989        graph_name: &str,
990    ) -> Result<GraphSyncConfig> {
991        let conn = self.conn();
992        let result: Result<(bool, Option<String>), _> = conn.query_row(
993            "SELECT sync_enabled, config FROM graph_metadata WHERE session_id = ? AND graph_name = ?",
994            params![session_id, graph_name],
995            |row| Ok((row.get(0)?, row.get(1)?)),
996        );
997
998        match result {
999            Ok((sync_enabled, config_json)) => {
1000                let config_value: JsonValue = config_json
1001                    .as_deref()
1002                    .and_then(|s| serde_json::from_str(s).ok())
1003                    .unwrap_or_else(|| JsonValue::Object(Map::new()));
1004                let sync_obj = config_value
1005                    .get("sync")
1006                    .and_then(|v| v.as_object())
1007                    .cloned()
1008                    .unwrap_or_default();
1009
1010                Ok(GraphSyncConfig {
1011                    sync_enabled,
1012                    conflict_resolution_strategy: sync_obj
1013                        .get("conflict_resolution_strategy")
1014                        .and_then(|v| v.as_str())
1015                        .map(|s| s.to_string())
1016                        .or_else(|| Some("vector_clock".to_string())),
1017                    sync_interval_seconds: sync_obj
1018                        .get("sync_interval_seconds")
1019                        .and_then(|v| v.as_u64())
1020                        .or(Some(60)),
1021                })
1022            }
1023            Err(duckdb::Error::QueryReturnedNoRows) => Ok(GraphSyncConfig::default()),
1024            Err(e) => Err(e.into()),
1025        }
1026    }
1027
1028    pub fn graph_list(&self, session_id: &str) -> Result<Vec<String>> {
1029        let conn = self.conn();
1030        let mut stmt = conn.prepare(
1031            "SELECT DISTINCT graph_name FROM graph_metadata WHERE session_id = ?
1032             UNION
1033             SELECT DISTINCT 'default' as graph_name
1034             FROM graph_nodes WHERE session_id = ?
1035             ORDER BY graph_name",
1036        )?;
1037
1038        let mut graphs = Vec::new();
1039        let mut rows = stmt.query(params![session_id, session_id])?;
1040        while let Some(row) = rows.next()? {
1041            let graph_name: String = row.get(0)?;
1042            graphs.push(graph_name);
1043        }
1044
1045        if graphs.is_empty() {
1046            let node_count: i64 = conn.query_row(
1047                "SELECT COUNT(*) FROM graph_nodes WHERE session_id = ?",
1048                params![session_id],
1049                |row| row.get(0),
1050            )?;
1051            if node_count > 0 {
1052                graphs.push("default".to_string());
1053            }
1054        }
1055
1056        Ok(graphs)
1057    }
1058
1059    /// List all sync-enabled graphs across all sessions
1060    pub fn graph_list_sync_enabled(&self) -> Result<Vec<(String, String)>> {
1061        let conn = self.conn();
1062        let mut stmt = conn.prepare(
1063            "SELECT session_id, graph_name FROM graph_metadata WHERE sync_enabled = TRUE ORDER BY session_id, graph_name",
1064        )?;
1065
1066        let mut results = Vec::new();
1067        let mut rows = stmt.query(params![])?;
1068        while let Some(row) = rows.next()? {
1069            let session_id: String = row.get(0)?;
1070            let graph_name: String = row.get(1)?;
1071            results.push((session_id, graph_name));
1072        }
1073
1074        Ok(results)
1075    }
1076
1077    pub fn graph_get_node_with_sync(&self, node_id: i64) -> Result<Option<SyncedNodeRecord>> {
1078        let conn = self.conn();
1079        let result: Result<SyncedNodeRecord, _> = conn.query_row(
1080            "SELECT id, session_id, node_type, label, properties, embedding_id,
1081                    CAST(created_at AS TEXT), CAST(updated_at AS TEXT),
1082                    COALESCE(vector_clock, '{}'), last_modified_by, is_deleted, sync_enabled
1083             FROM graph_nodes WHERE id = ?",
1084            params![node_id],
1085            SyncedNodeRecord::from_row,
1086        );
1087        match result {
1088            Ok(node) => Ok(Some(node)),
1089            Err(duckdb::Error::QueryReturnedNoRows) => Ok(None),
1090            Err(e) => Err(e.into()),
1091        }
1092    }
1093
1094    pub fn graph_list_nodes_with_sync(
1095        &self,
1096        session_id: &str,
1097        sync_enabled_only: bool,
1098        include_deleted: bool,
1099    ) -> Result<Vec<SyncedNodeRecord>> {
1100        let conn = self.conn();
1101        let mut query = String::from(
1102            "SELECT id, session_id, node_type, label, properties, embedding_id,
1103                    CAST(created_at AS TEXT), CAST(updated_at AS TEXT),
1104                    COALESCE(vector_clock, '{}'), last_modified_by, is_deleted, sync_enabled
1105             FROM graph_nodes WHERE session_id = ?",
1106        );
1107
1108        if sync_enabled_only {
1109            query.push_str(" AND sync_enabled = TRUE");
1110        }
1111        if !include_deleted {
1112            query.push_str(" AND is_deleted = FALSE");
1113        }
1114        query.push_str(" ORDER BY created_at ASC");
1115
1116        let mut stmt = conn.prepare(&query)?;
1117        let mut rows = stmt.query(params![session_id])?;
1118        let mut nodes = Vec::new();
1119        while let Some(row) = rows.next()? {
1120            nodes.push(SyncedNodeRecord::from_row(row)?);
1121        }
1122        Ok(nodes)
1123    }
1124
1125    pub fn graph_get_edge_with_sync(&self, edge_id: i64) -> Result<Option<SyncedEdgeRecord>> {
1126        let conn = self.conn();
1127        let result: Result<SyncedEdgeRecord, _> = conn.query_row(
1128            "SELECT id, session_id, source_id, target_id, edge_type, predicate, properties, weight,
1129                    CAST(temporal_start AS TEXT), CAST(temporal_end AS TEXT), CAST(created_at AS TEXT),
1130                    COALESCE(vector_clock, '{}'), last_modified_by, is_deleted, sync_enabled
1131             FROM graph_edges WHERE id = ?",
1132            params![edge_id],
1133            SyncedEdgeRecord::from_row,
1134        );
1135        match result {
1136            Ok(edge) => Ok(Some(edge)),
1137            Err(duckdb::Error::QueryReturnedNoRows) => Ok(None),
1138            Err(e) => Err(e.into()),
1139        }
1140    }
1141
1142    pub fn graph_list_edges_with_sync(
1143        &self,
1144        session_id: &str,
1145        sync_enabled_only: bool,
1146        include_deleted: bool,
1147    ) -> Result<Vec<SyncedEdgeRecord>> {
1148        let conn = self.conn();
1149        let mut query = String::from(
1150            "SELECT id, session_id, source_id, target_id, edge_type, predicate, properties, weight,
1151                    CAST(temporal_start AS TEXT), CAST(temporal_end AS TEXT), CAST(created_at AS TEXT),
1152                    COALESCE(vector_clock, '{}'), last_modified_by, is_deleted, sync_enabled
1153             FROM graph_edges WHERE session_id = ?",
1154        );
1155
1156        if sync_enabled_only {
1157            query.push_str(" AND sync_enabled = TRUE");
1158        }
1159        if !include_deleted {
1160            query.push_str(" AND is_deleted = FALSE");
1161        }
1162        query.push_str(" ORDER BY created_at ASC");
1163
1164        let mut stmt = conn.prepare(&query)?;
1165        let mut rows = stmt.query(params![session_id])?;
1166        let mut edges = Vec::new();
1167        while let Some(row) = rows.next()? {
1168            edges.push(SyncedEdgeRecord::from_row(row)?);
1169        }
1170        Ok(edges)
1171    }
1172
1173    pub fn graph_update_node_sync_metadata(
1174        &self,
1175        node_id: i64,
1176        vector_clock: &str,
1177        last_modified_by: &str,
1178        sync_enabled: bool,
1179    ) -> Result<()> {
1180        let conn = self.conn();
1181        conn.execute(
1182            "UPDATE graph_nodes SET vector_clock = ?, last_modified_by = ?, sync_enabled = ?, updated_at = CURRENT_TIMESTAMP
1183             WHERE id = ?",
1184            params![vector_clock, last_modified_by, sync_enabled, node_id],
1185        )?;
1186        Ok(())
1187    }
1188
1189    pub fn graph_update_edge_sync_metadata(
1190        &self,
1191        edge_id: i64,
1192        vector_clock: &str,
1193        last_modified_by: &str,
1194        sync_enabled: bool,
1195    ) -> Result<()> {
1196        let conn = self.conn();
1197        conn.execute(
1198            "UPDATE graph_edges SET vector_clock = ?, last_modified_by = ?, sync_enabled = ?
1199             WHERE id = ?",
1200            params![vector_clock, last_modified_by, sync_enabled, edge_id],
1201        )?;
1202        Ok(())
1203    }
1204
1205    pub fn graph_mark_node_deleted(
1206        &self,
1207        node_id: i64,
1208        vector_clock: &str,
1209        deleted_by: &str,
1210    ) -> Result<()> {
1211        let conn = self.conn();
1212        conn.execute(
1213            "UPDATE graph_nodes SET is_deleted = TRUE, vector_clock = ?, last_modified_by = ?, updated_at = CURRENT_TIMESTAMP
1214             WHERE id = ?",
1215            params![vector_clock, deleted_by, node_id],
1216        )?;
1217        Ok(())
1218    }
1219
1220    pub fn graph_mark_edge_deleted(
1221        &self,
1222        edge_id: i64,
1223        vector_clock: &str,
1224        deleted_by: &str,
1225    ) -> Result<()> {
1226        let conn = self.conn();
1227        conn.execute(
1228            "UPDATE graph_edges SET is_deleted = TRUE, vector_clock = ?, last_modified_by = ?
1229             WHERE id = ?",
1230            params![vector_clock, deleted_by, edge_id],
1231        )?;
1232        Ok(())
1233    }
1234}
1235
/// Sync state read from `graph_sync_state` for one (instance, session, graph) triple,
/// as returned by `graph_sync_state_get_metadata`.
#[derive(Debug, Clone)]
pub struct SyncStateRecord {
    /// Contents of the `vector_clock` column, exactly as stored.
    pub vector_clock: String,
    /// The `last_sync_at` column cast to text by the query; `None` when it could not
    /// be read as a string.
    pub last_sync_at: Option<String>,
}
1241
/// Per-graph sync settings, as produced by `graph_get_sync_config` and
/// `graph_set_sync_config`.
#[derive(Debug, Clone)]
pub struct GraphSyncConfig {
    /// Value of the `sync_enabled` column in `graph_metadata`.
    pub sync_enabled: bool,
    /// Name of the conflict resolution strategy. The accessors in this file always
    /// populate it, falling back to `"vector_clock"`.
    pub conflict_resolution_strategy: Option<String>,
    /// Sync interval in seconds. The accessors in this file always populate it,
    /// falling back to `60`.
    pub sync_interval_seconds: Option<u64>,
}
1248
1249impl Default for GraphSyncConfig {
1250    fn default() -> Self {
1251        Self {
1252            sync_enabled: false,
1253            conflict_resolution_strategy: Some("vector_clock".to_string()),
1254            sync_interval_seconds: Some(60),
1255        }
1256    }
1257}
1258
/// One changelog record describing an operation on a graph entity.
///
/// NOTE(review): presumably one row of `graph_changelog` — the field names match
/// that table's columns; confirm against the query that feeds `from_row`.
#[derive(Debug, Clone)]
pub struct ChangelogEntry {
    /// Identifier of the changelog record.
    pub id: i64,
    /// Session the change belongs to.
    pub session_id: String,
    /// Instance recorded for the change.
    pub instance_id: String,
    /// Kind of entity affected; the tests in this file use `"node"` and `"edge"`.
    pub entity_type: String,
    /// Identifier of the affected entity.
    pub entity_id: i64,
    /// Operation name; the tests in this file use `"conflict"`.
    pub operation: String,
    /// Vector clock recorded with the change, as a string.
    pub vector_clock: String,
    /// Optional payload string attached to the change.
    pub data: Option<String>,
    /// Creation time, parsed from a string column by `from_row`.
    pub created_at: DateTime<Utc>,
}
1271
1272impl ChangelogEntry {
1273    fn from_row(row: &duckdb::Row) -> Result<Self, duckdb::Error> {
1274        let id: i64 = row.get(0)?;
1275        let session_id: String = row.get(1)?;
1276        let instance_id: String = row.get(2)?;
1277        let entity_type: String = row.get(3)?;
1278        let entity_id: i64 = row.get(4)?;
1279        let operation: String = row.get(5)?;
1280        let vector_clock: String = row.get(6)?;
1281        let data: Option<String> = row.get(7)?;
1282        let created_at_str: String = row.get(8)?;
1283
1284        Ok(ChangelogEntry {
1285            id,
1286            session_id,
1287            instance_id,
1288            entity_type,
1289            entity_id,
1290            operation,
1291            vector_clock,
1292            data,
1293            created_at: created_at_str.parse().unwrap_or_else(|_| Utc::now()),
1294        })
1295    }
1296}
1297
/// A `graph_nodes` row together with its sync bookkeeping columns, as selected by
/// `graph_get_node_with_sync` and `graph_list_nodes_with_sync`.
#[derive(Debug, Clone)]
pub struct SyncedNodeRecord {
    /// Primary key of the node.
    pub id: i64,
    /// Session the node belongs to.
    pub session_id: String,
    /// Node type, kept as the raw column string.
    pub node_type: String,
    /// Node label.
    pub label: String,
    /// Node properties, parsed from the JSON text stored in the `properties` column.
    pub properties: JsonValue,
    /// Optional `embedding_id` column value.
    pub embedding_id: Option<i64>,
    /// Creation time, parsed from the text-cast `created_at` column.
    pub created_at: DateTime<Utc>,
    /// Last update time, parsed from the text-cast `updated_at` column.
    pub updated_at: DateTime<Utc>,
    /// Vector clock string; the queries substitute `'{}'` when the column is NULL.
    pub vector_clock: String,
    /// Value of the `last_modified_by` column, if set.
    pub last_modified_by: Option<String>,
    /// Soft-delete flag.
    pub is_deleted: bool,
    /// Value of the node's own `sync_enabled` column.
    pub sync_enabled: bool,
}
1313
1314impl SyncedNodeRecord {
1315    fn from_row(row: &duckdb::Row) -> Result<Self, duckdb::Error> {
1316        let id: i64 = row.get(0)?;
1317        let session_id: String = row.get(1)?;
1318        let node_type: String = row.get(2)?;
1319        let label: String = row.get(3)?;
1320        let properties_str: String = row.get(4)?;
1321        let properties: JsonValue = serde_json::from_str(&properties_str).map_err(|e| {
1322            duckdb::Error::FromSqlConversionFailure(4, duckdb::types::Type::Text, Box::new(e))
1323        })?;
1324        let embedding_id: Option<i64> = row.get(5)?;
1325        let created_at_str: String = row.get(6)?;
1326        let updated_at_str: String = row.get(7)?;
1327        let vector_clock: String = row.get(8)?;
1328        let last_modified_by: Option<String> = row.get(9)?;
1329        let is_deleted: bool = row.get(10)?;
1330        let sync_enabled: bool = row.get(11)?;
1331
1332        Ok(SyncedNodeRecord {
1333            id,
1334            session_id,
1335            node_type,
1336            label,
1337            properties,
1338            embedding_id,
1339            created_at: created_at_str.parse().unwrap_or_else(|_| Utc::now()),
1340            updated_at: updated_at_str.parse().unwrap_or_else(|_| Utc::now()),
1341            vector_clock,
1342            last_modified_by,
1343            is_deleted,
1344            sync_enabled,
1345        })
1346    }
1347}
1348
/// A `graph_edges` row together with its sync bookkeeping columns, as selected by
/// `graph_get_edge_with_sync` and `graph_list_edges_with_sync`.
#[derive(Debug, Clone)]
pub struct SyncedEdgeRecord {
    /// Primary key of the edge.
    pub id: i64,
    /// Session the edge belongs to.
    pub session_id: String,
    /// Value of the `source_id` column.
    pub source_id: i64,
    /// Value of the `target_id` column.
    pub target_id: i64,
    /// Edge type, kept as the raw column string.
    pub edge_type: String,
    /// Optional `predicate` column value.
    pub predicate: Option<String>,
    /// Edge properties parsed from JSON text; `None` when the column is NULL or the
    /// text is not valid JSON.
    pub properties: Option<JsonValue>,
    /// Value of the `weight` column.
    pub weight: f32,
    /// Parsed from the text-cast `temporal_start` column; `None` when NULL or unparseable.
    pub temporal_start: Option<DateTime<Utc>>,
    /// Parsed from the text-cast `temporal_end` column; `None` when NULL or unparseable.
    pub temporal_end: Option<DateTime<Utc>>,
    /// Creation time, parsed from the text-cast `created_at` column.
    pub created_at: DateTime<Utc>,
    /// Vector clock string; the queries substitute `'{}'` when the column is NULL.
    pub vector_clock: String,
    /// Value of the `last_modified_by` column, if set.
    pub last_modified_by: Option<String>,
    /// Soft-delete flag.
    pub is_deleted: bool,
    /// Value of the edge's own `sync_enabled` column.
    pub sync_enabled: bool,
}
1367
1368impl SyncedEdgeRecord {
1369    fn from_row(row: &duckdb::Row) -> Result<Self, duckdb::Error> {
1370        let id: i64 = row.get(0)?;
1371        let session_id: String = row.get(1)?;
1372        let source_id: i64 = row.get(2)?;
1373        let target_id: i64 = row.get(3)?;
1374        let edge_type: String = row.get(4)?;
1375        let predicate: Option<String> = row.get(5)?;
1376        let properties_str: Option<String> = row.get(6)?;
1377        let properties: Option<JsonValue> = properties_str
1378            .as_ref()
1379            .and_then(|s| serde_json::from_str(s).ok());
1380        let weight: f32 = row.get(7)?;
1381        let temporal_start_str: Option<String> = row.get(8)?;
1382        let temporal_end_str: Option<String> = row.get(9)?;
1383        let created_at_str: String = row.get(10)?;
1384        let vector_clock: String = row.get(11)?;
1385        let last_modified_by: Option<String> = row.get(12)?;
1386        let is_deleted: bool = row.get(13)?;
1387        let sync_enabled: bool = row.get(14)?;
1388
1389        Ok(SyncedEdgeRecord {
1390            id,
1391            session_id,
1392            source_id,
1393            target_id,
1394            edge_type,
1395            predicate,
1396            properties,
1397            weight,
1398            temporal_start: temporal_start_str.and_then(|s| s.parse().ok()),
1399            temporal_end: temporal_end_str.and_then(|s| s.parse().ok()),
1400            created_at: created_at_str.parse().unwrap_or_else(|_| Utc::now()),
1401            vector_clock,
1402            last_modified_by,
1403            is_deleted,
1404            sync_enabled,
1405        })
1406    }
1407}
1408
/// Tests that run the store against an in-memory DuckDB database created from a
/// schema defined locally in `setup_store_with`.
#[cfg(test)]
mod tests {
    use super::*;
    use anyhow::Result;
    use serde_json::json;

    /// Creates a store over a fresh in-memory database with the test schema and no
    /// extra setup.
    fn setup_store() -> KnowledgeGraphStore {
        setup_store_with(|_| {})
    }

    /// Creates a store over a fresh in-memory database.
    ///
    /// The graph tables and their id sequences are created first, then `extra` is run
    /// on the raw connection so a test can seed or alter the database before the
    /// connection is wrapped. The store's instance id is `"test-instance"`.
    fn setup_store_with<F>(extra: F) -> KnowledgeGraphStore
    where
        F: FnOnce(&Connection),
    {
        let conn = Connection::open_in_memory().expect("open in-memory database");
        conn.execute_batch(
            r#"
            CREATE SEQUENCE IF NOT EXISTS graph_nodes_id_seq START 1;
            CREATE SEQUENCE IF NOT EXISTS graph_edges_id_seq START 1;
            CREATE SEQUENCE IF NOT EXISTS graph_metadata_id_seq START 1;
            CREATE SEQUENCE IF NOT EXISTS graph_changelog_id_seq START 1;
            CREATE SEQUENCE IF NOT EXISTS graph_tombstones_id_seq START 1;

            CREATE TABLE graph_nodes (
                id BIGINT PRIMARY KEY DEFAULT nextval('graph_nodes_id_seq'),
                session_id TEXT NOT NULL,
                node_type TEXT NOT NULL,
                label TEXT NOT NULL,
                properties TEXT NOT NULL,
                embedding_id BIGINT,
                created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                vector_clock TEXT DEFAULT '{}',
                last_modified_by TEXT,
                is_deleted BOOLEAN DEFAULT FALSE,
                sync_enabled BOOLEAN DEFAULT FALSE
            );

            CREATE TABLE graph_edges (
                id BIGINT PRIMARY KEY DEFAULT nextval('graph_edges_id_seq'),
                session_id TEXT NOT NULL,
                source_id BIGINT NOT NULL,
                target_id BIGINT NOT NULL,
                edge_type TEXT NOT NULL,
                predicate TEXT,
                properties TEXT,
                weight REAL DEFAULT 1.0,
                temporal_start TIMESTAMP,
                temporal_end TIMESTAMP,
                created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                vector_clock TEXT DEFAULT '{}',
                last_modified_by TEXT,
                is_deleted BOOLEAN DEFAULT FALSE,
                sync_enabled BOOLEAN DEFAULT FALSE
            );

            CREATE TABLE graph_metadata (
                id BIGINT PRIMARY KEY DEFAULT nextval('graph_metadata_id_seq'),
                session_id TEXT NOT NULL,
                graph_name TEXT NOT NULL,
                is_created BOOLEAN DEFAULT FALSE,
                schema_version INTEGER DEFAULT 1,
                config TEXT,
                created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                sync_enabled BOOLEAN DEFAULT FALSE,
                UNIQUE(session_id, graph_name)
            );

            CREATE TABLE graph_changelog (
                id BIGINT PRIMARY KEY DEFAULT nextval('graph_changelog_id_seq'),
                session_id TEXT NOT NULL,
                instance_id TEXT NOT NULL,
                entity_type TEXT NOT NULL,
                entity_id BIGINT NOT NULL,
                operation TEXT NOT NULL,
                vector_clock TEXT NOT NULL,
                data TEXT,
                created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
            );

            CREATE TABLE graph_sync_state (
                instance_id TEXT NOT NULL,
                session_id TEXT NOT NULL,
                graph_name TEXT NOT NULL,
                vector_clock TEXT NOT NULL,
                last_sync_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                PRIMARY KEY (instance_id, session_id, graph_name)
            );

            CREATE TABLE graph_tombstones (
                id BIGINT PRIMARY KEY DEFAULT nextval('graph_tombstones_id_seq'),
                session_id TEXT NOT NULL,
                entity_type TEXT NOT NULL,
                entity_id BIGINT NOT NULL,
                deleted_by TEXT NOT NULL,
                vector_clock TEXT NOT NULL,
                created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
            );
            "#,
        )
        .expect("create graph schema");

        extra(&conn);

        KnowledgeGraphStore::from_connection(conn, "test-instance")
    }

    /// Inserts a node, lists it, updates its properties, then deletes it and checks
    /// that it can no longer be fetched.
    #[test]
    fn insert_update_delete_node_flow() -> Result<()> {
        let store = setup_store();
        let props = json!({ "kind": "repository" });
        let node_id =
            store.insert_graph_node("session", NodeType::Entity, "SpecAI", &props, None)?;

        let nodes = store.list_graph_nodes("session", None, Some(10))?;
        assert_eq!(nodes.len(), 1);
        assert_eq!(nodes[0].label, "SpecAI");

        let updated_props = json!({ "kind": "repository", "stars": 42 });
        store.update_graph_node(node_id, &updated_props)?;
        let updated = store.get_graph_node(node_id)?.expect("node exists");
        assert_eq!(updated.properties["stars"], 42);

        store.delete_graph_node(node_id)?;
        assert!(store.get_graph_node(node_id)?.is_none());
        Ok(())
    }

    /// Builds the chain A -> B -> C and checks that the shortest path from A to C
    /// contains three nodes and two edges.
    #[test]
    fn create_edges_and_find_paths() -> Result<()> {
        let store = setup_store();
        let a = store.insert_graph_node("session", NodeType::Entity, "A", &json!({}), None)?;
        let b = store.insert_graph_node("session", NodeType::Entity, "B", &json!({}), None)?;
        let c = store.insert_graph_node("session", NodeType::Entity, "C", &json!({}), None)?;

        store.insert_graph_edge("session", a, b, EdgeType::RelatesTo, None, None, 1.0)?;
        store.insert_graph_edge("session", b, c, EdgeType::RelatesTo, None, None, 1.0)?;

        let path = store
            .find_shortest_path("session", a, c, Some(5))?
            .expect("path exists");
        assert_eq!(path.nodes.len(), 3);
        assert_eq!(path.edges.len(), 2);
        assert_eq!(path.length, 2);
        assert_eq!(path.nodes.first().unwrap().label, "A");
        assert_eq!(path.nodes.last().unwrap().label, "C");

        let edges = store.list_graph_edges("session", None, None)?;
        assert_eq!(edges.len(), 2);

        Ok(())
    }

    /// Builds the chain Alpha -> Beta -> Gamma and checks that a depth-2 traversal
    /// finds the other two nodes when following outgoing edges from Alpha and when
    /// following incoming edges from Gamma.
    #[test]
    fn traverse_neighbors_respects_direction() -> Result<()> {
        let store = setup_store();
        let alpha =
            store.insert_graph_node("session", NodeType::Entity, "Alpha", &json!({}), None)?;
        let beta =
            store.insert_graph_node("session", NodeType::Entity, "Beta", &json!({}), None)?;
        let gamma =
            store.insert_graph_node("session", NodeType::Entity, "Gamma", &json!({}), None)?;

        store.insert_graph_edge("session", alpha, beta, EdgeType::RelatesTo, None, None, 1.0)?;
        store.insert_graph_edge("session", beta, gamma, EdgeType::RelatesTo, None, None, 1.0)?;

        let outgoing =
            store.traverse_neighbors("session", alpha, TraversalDirection::Outgoing, 2)?;
        assert_eq!(outgoing.len(), 2);
        assert!(outgoing.iter().any(|node| node.label == "Beta"));
        assert!(outgoing.iter().any(|node| node.label == "Gamma"));

        let incoming =
            store.traverse_neighbors("session", gamma, TraversalDirection::Incoming, 2)?;
        assert_eq!(incoming.len(), 2);
        assert!(incoming.iter().any(|node| node.label == "Beta"));
        assert!(incoming.iter().any(|node| node.label == "Alpha"));

        Ok(())
    }

    /// Saves a sync config, reads it back unchanged, and checks that a graph without
    /// stored metadata reports the default config.
    #[test]
    fn sync_config_round_trip() -> Result<()> {
        let store = setup_store();

        let saved = store.graph_set_sync_config(
            "session",
            "default",
            true,
            Some("last_write_wins"),
            Some(120),
        )?;
        assert!(saved.sync_enabled);
        assert_eq!(
            saved.conflict_resolution_strategy.as_deref(),
            Some("last_write_wins")
        );
        assert_eq!(saved.sync_interval_seconds, Some(120));

        let fetched = store.graph_get_sync_config("session", "default")?;
        assert!(fetched.sync_enabled);
        assert_eq!(
            fetched.conflict_resolution_strategy.as_deref(),
            Some("last_write_wins")
        );
        assert_eq!(fetched.sync_interval_seconds, Some(120));

        // No metadata row exists for this pair, so the defaults are expected.
        let defaults = store.graph_get_sync_config("other_session", "missing")?;
        assert!(!defaults.sync_enabled);
        assert_eq!(
            defaults.conflict_resolution_strategy.as_deref(),
            Some("vector_clock")
        );
        assert_eq!(defaults.sync_interval_seconds, Some(60));

        Ok(())
    }

    /// Writes a sync state row and checks that the vector clock is read back verbatim
    /// and that a last-sync time is present.
    #[test]
    fn sync_state_metadata_round_trip() -> Result<()> {
        let store = setup_store();
        store.graph_sync_state_update("instance", "session", "graph", r#"{"a":1}"#)?;

        let state = store
            .graph_sync_state_get_metadata("instance", "session", "graph")?
            .expect("state exists");

        assert_eq!(state.vector_clock, r#"{"a":1}"#);
        assert!(state.last_sync_at.is_some());

        Ok(())
    }

    /// Appends two `conflict` changelog entries in different sessions and checks that
    /// listing without a session returns both while filtering by session returns one.
    #[test]
    fn graph_conflict_listing_filters() -> Result<()> {
        let store = setup_store();
        let vc_json = VectorClock::new().to_json()?;

        let conflict_payload = json!({
            "graph_name": "g1",
            "local_version": { "id": 1 },
            "remote_version": { "id": 2 },
            "resolution": "Test"
        });

        store.graph_changelog_append(
            "session_one",
            "inst1",
            "node",
            1,
            "conflict",
            &vc_json,
            Some(&conflict_payload.to_string()),
        )?;

        // The second payload carries no "resolution" key.
        let second_payload = json!({
            "graph_name": "g2",
            "local_version": { "id": 3 },
            "remote_version": { "id": 4 }
        });

        store.graph_changelog_append(
            "session_two",
            "inst1",
            "edge",
            2,
            "conflict",
            &vc_json,
            Some(&second_payload.to_string()),
        )?;

        let all_conflicts = store.graph_list_conflicts(None, 10)?;
        assert_eq!(all_conflicts.len(), 2);

        let session_filtered = store.graph_list_conflicts(Some("session_one"), 10)?;
        assert_eq!(session_filtered.len(), 1);
        assert_eq!(session_filtered[0].session_id, "session_one");
        assert_eq!(session_filtered[0].entity_id, 1);

        Ok(())
    }
}