Skip to main content

spec_ai/spec_ai_knowledge_graph/
graph_store.rs

1use crate::spec_ai_knowledge_graph::types::{EdgeType, GraphEdge, GraphNode, GraphPath, NodeType, TraversalDirection};
2use crate::spec_ai_knowledge_graph::vector_clock::VectorClock;
3use anyhow::Result;
4use chrono::{DateTime, Duration, Utc};
5use duckdb::{Connection, params};
6use serde_json::{Map, Value as JsonValue};
7use std::collections::{HashMap, HashSet, VecDeque};
8use std::sync::{Arc, Mutex, MutexGuard};
9
/// Store for knowledge-graph nodes, edges and sync bookkeeping, backed by a
/// DuckDB connection.
///
/// The type derives `Clone`; a clone shares the same `Arc`-wrapped connection
/// and carries a copy of the instance id.
#[derive(Clone)]
pub struct KnowledgeGraphStore {
    /// Shared connection handle; operations lock this mutex while they run queries.
    conn: Arc<Mutex<Connection>>,
    /// Identifier of this instance. It is written as `last_modified_by` and is the
    /// key incremented in vector clocks.
    instance_id: String,
}
15
16impl KnowledgeGraphStore {
17    pub fn new(conn: Arc<Mutex<Connection>>, instance_id: impl Into<String>) -> Self {
18        Self {
19            conn,
20            instance_id: instance_id.into(),
21        }
22    }
23
24    pub fn from_connection(conn: Connection, instance_id: impl Into<String>) -> Self {
25        Self {
26            conn: Arc::new(Mutex::new(conn)),
27            instance_id: instance_id.into(),
28        }
29    }
30
31    pub fn instance_id(&self) -> &str {
32        &self.instance_id
33    }
34
35    fn conn(&self) -> MutexGuard<'_, Connection> {
36        self.conn.lock().expect("database connection poisoned")
37    }
38
39    // ---------- Graph Node Operations ----------
40
41    pub fn insert_graph_node(
42        &self,
43        session_id: &str,
44        node_type: NodeType,
45        label: &str,
46        properties: &JsonValue,
47        embedding_id: Option<i64>,
48    ) -> Result<i64> {
49        let sync_enabled = self
50            .graph_get_sync_enabled(session_id, "default")
51            .unwrap_or(false);
52
53        let mut vector_clock = VectorClock::new();
54        vector_clock.increment(&self.instance_id);
55        let vc_json = vector_clock.to_json()?;
56
57        let conn = self.conn();
58
59        let mut stmt = conn.prepare(
60            "INSERT INTO graph_nodes (session_id, node_type, label, properties, embedding_id,
61                                     vector_clock, last_modified_by, sync_enabled)
62             VALUES (?, ?, ?, ?, ?, ?, ?, ?) RETURNING id",
63        )?;
64        let id: i64 = stmt.query_row(
65            params![
66                session_id,
67                node_type.as_str(),
68                label,
69                properties.to_string(),
70                embedding_id,
71                vc_json,
72                self.instance_id,
73                sync_enabled,
74            ],
75            |row| row.get(0),
76        )?;
77
78        if sync_enabled {
79            let node_data = serde_json::json!({
80                "id": id,
81                "session_id": session_id,
82                "node_type": node_type.as_str(),
83                "label": label,
84                "properties": properties,
85                "embedding_id": embedding_id,
86            });
87
88            self.graph_changelog_append(
89                session_id,
90                &self.instance_id,
91                "node",
92                id,
93                "create",
94                &vc_json,
95                Some(&node_data.to_string()),
96            )?;
97        }
98
99        Ok(id)
100    }
101
102    pub fn get_graph_node(&self, node_id: i64) -> Result<Option<GraphNode>> {
103        let conn = self.conn();
104        let mut stmt = conn.prepare(
105            "SELECT id, session_id, node_type, label, properties, embedding_id,
106                    CAST(created_at AS TEXT), CAST(updated_at AS TEXT)
107             FROM graph_nodes WHERE id = ?",
108        )?;
109        let mut rows = stmt.query(params![node_id])?;
110        if let Some(row) = rows.next()? {
111            Ok(Some(Self::row_to_graph_node(row)?))
112        } else {
113            Ok(None)
114        }
115    }
116
117    pub fn list_graph_nodes(
118        &self,
119        session_id: &str,
120        node_type: Option<NodeType>,
121        limit: Option<i64>,
122    ) -> Result<Vec<GraphNode>> {
123        let conn = self.conn();
124
125        let nodes = if let Some(nt) = node_type {
126            let mut stmt = conn.prepare(
127                "SELECT id, session_id, node_type, label, properties, embedding_id,
128                        CAST(created_at AS TEXT), CAST(updated_at AS TEXT)
129                 FROM graph_nodes WHERE session_id = ? AND node_type = ?
130                 ORDER BY id DESC LIMIT ?",
131            )?;
132            let query = stmt.query(params![session_id, nt.as_str(), limit.unwrap_or(100)])?;
133            Self::collect_graph_nodes(query)?
134        } else {
135            let mut stmt = conn.prepare(
136                "SELECT id, session_id, node_type, label, properties, embedding_id,
137                        CAST(created_at AS TEXT), CAST(updated_at AS TEXT)
138                 FROM graph_nodes WHERE session_id = ?
139                 ORDER BY id DESC LIMIT ?",
140            )?;
141            let query = stmt.query(params![session_id, limit.unwrap_or(100)])?;
142            Self::collect_graph_nodes(query)?
143        };
144
145        Ok(nodes)
146    }
147
148    pub fn count_graph_nodes(&self, session_id: &str) -> Result<i64> {
149        let conn = self.conn();
150        let mut stmt = conn.prepare("SELECT COUNT(*) FROM graph_nodes WHERE session_id = ?")?;
151        let count: i64 = stmt.query_row(params![session_id], |row| row.get(0))?;
152        Ok(count)
153    }
154
155    pub fn update_graph_node(&self, node_id: i64, properties: &JsonValue) -> Result<()> {
156        let conn = self.conn();
157
158        let mut stmt = conn.prepare(
159            "SELECT session_id, node_type, label, vector_clock, sync_enabled
160             FROM graph_nodes WHERE id = ?",
161        )?;
162
163        let (session_id, node_type, label, current_vc_json, sync_enabled): (
164            String,
165            String,
166            String,
167            Option<String>,
168            bool,
169        ) = stmt.query_row(params![node_id], |row| {
170            Ok((
171                row.get(0)?,
172                row.get(1)?,
173                row.get(2)?,
174                row.get(3)?,
175                row.get(4).unwrap_or(false),
176            ))
177        })?;
178
179        let mut vector_clock = if let Some(vc_json) = current_vc_json {
180            VectorClock::from_json(&vc_json).unwrap_or_else(|_| VectorClock::new())
181        } else {
182            VectorClock::new()
183        };
184        vector_clock.increment(&self.instance_id);
185        let vc_json = vector_clock.to_json()?;
186
187        conn.execute(
188            "UPDATE graph_nodes
189             SET properties = ?,
190                 vector_clock = ?,
191                 last_modified_by = ?,
192                 updated_at = CURRENT_TIMESTAMP
193             WHERE id = ?",
194            params![properties.to_string(), vc_json, self.instance_id, node_id],
195        )?;
196
197        if sync_enabled {
198            let node_data = serde_json::json!({
199                "id": node_id,
200                "session_id": session_id,
201                "node_type": node_type,
202                "label": label,
203                "properties": properties,
204            });
205
206            self.graph_changelog_append(
207                &session_id,
208                &self.instance_id,
209                "node",
210                node_id,
211                "update",
212                &vc_json,
213                Some(&node_data.to_string()),
214            )?;
215        }
216
217        Ok(())
218    }
219
220    pub fn delete_graph_node(&self, node_id: i64) -> Result<()> {
221        let conn = self.conn();
222
223        let mut stmt = conn.prepare(
224            "SELECT session_id, node_type, label, properties, vector_clock, sync_enabled
225             FROM graph_nodes WHERE id = ?",
226        )?;
227
228        let result = stmt.query_row(params![node_id], |row| {
229            Ok((
230                row.get::<_, String>(0)?,
231                row.get::<_, String>(1)?,
232                row.get::<_, String>(2)?,
233                row.get::<_, String>(3)?,
234                row.get::<_, Option<String>>(4)?,
235                row.get::<_, bool>(5).unwrap_or(false),
236            ))
237        });
238
239        if let Ok((session_id, node_type, label, properties, current_vc_json, sync_enabled)) =
240            result
241        {
242            if sync_enabled {
243                let mut vector_clock = if let Some(vc_json) = current_vc_json {
244                    VectorClock::from_json(&vc_json).unwrap_or_else(|_| VectorClock::new())
245                } else {
246                    VectorClock::new()
247                };
248                vector_clock.increment(&self.instance_id);
249                let vc_json = vector_clock.to_json()?;
250
251                conn.execute(
252                    "INSERT INTO graph_tombstones
253                     (session_id, entity_type, entity_id, deleted_by, vector_clock)
254                     VALUES (?, ?, ?, ?, ?)",
255                    params![session_id, "node", node_id, self.instance_id, vc_json],
256                )?;
257
258                let node_data = serde_json::json!({
259                    "id": node_id,
260                    "session_id": session_id,
261                    "node_type": node_type,
262                    "label": label,
263                    "properties": properties,
264                });
265
266                self.graph_changelog_append(
267                    &session_id,
268                    &self.instance_id,
269                    "node",
270                    node_id,
271                    "delete",
272                    &vc_json,
273                    Some(&node_data.to_string()),
274                )?;
275            }
276        }
277
278        conn.execute("DELETE FROM graph_nodes WHERE id = ?", params![node_id])?;
279        Ok(())
280    }
281
282    // ---------- Graph Edge Operations ----------
283
284    #[allow(clippy::too_many_arguments)]
285    pub fn insert_graph_edge(
286        &self,
287        session_id: &str,
288        source_id: i64,
289        target_id: i64,
290        edge_type: EdgeType,
291        predicate: Option<&str>,
292        properties: Option<&JsonValue>,
293        weight: f32,
294    ) -> Result<i64> {
295        let sync_enabled = self
296            .graph_get_sync_enabled(session_id, "default")
297            .unwrap_or(false);
298
299        let mut vector_clock = VectorClock::new();
300        vector_clock.increment(&self.instance_id);
301        let vc_json = vector_clock.to_json()?;
302
303        let conn = self.conn();
304
305        let mut stmt = conn.prepare(
306            "INSERT INTO graph_edges (session_id, source_id, target_id, edge_type, predicate, properties, weight,
307                                     vector_clock, last_modified_by, sync_enabled)
308             VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?) RETURNING id",
309        )?;
310        let props_str = properties.map(|p| p.to_string());
311        let id: i64 = stmt.query_row(
312            params![
313                session_id,
314                source_id,
315                target_id,
316                edge_type.as_str(),
317                predicate,
318                props_str,
319                weight,
320                vc_json,
321                self.instance_id,
322                sync_enabled,
323            ],
324            |row| row.get(0),
325        )?;
326
327        if sync_enabled {
328            let edge_data = serde_json::json!({
329                "id": id,
330                "session_id": session_id,
331                "source_id": source_id,
332                "target_id": target_id,
333                "edge_type": edge_type.as_str(),
334                "predicate": predicate,
335                "properties": properties,
336                "weight": weight,
337            });
338
339            self.graph_changelog_append(
340                session_id,
341                &self.instance_id,
342                "edge",
343                id,
344                "insert",
345                &vc_json,
346                Some(&edge_data.to_string()),
347            )?;
348        }
349
350        Ok(id)
351    }
352
353    pub fn get_graph_edge(&self, edge_id: i64) -> Result<Option<GraphEdge>> {
354        let conn = self.conn();
355        let mut stmt = conn.prepare(
356            "SELECT id, session_id, source_id, target_id, edge_type, predicate, properties, weight,
357                    CAST(temporal_start AS TEXT), CAST(temporal_end AS TEXT), CAST(created_at AS TEXT)
358             FROM graph_edges WHERE id = ?",
359        )?;
360        let mut rows = stmt.query(params![edge_id])?;
361        if let Some(row) = rows.next()? {
362            Ok(Some(Self::row_to_graph_edge(row)?))
363        } else {
364            Ok(None)
365        }
366    }
367
368    pub fn list_graph_edges(
369        &self,
370        session_id: &str,
371        source_id: Option<i64>,
372        target_id: Option<i64>,
373    ) -> Result<Vec<GraphEdge>> {
374        let conn = self.conn();
375
376        let edges = match (source_id, target_id) {
377            (Some(src), Some(tgt)) => {
378                let mut stmt = conn.prepare(
379                    "SELECT id, session_id, source_id, target_id, edge_type, predicate, properties, weight,
380                            CAST(temporal_start AS TEXT), CAST(temporal_end AS TEXT), CAST(created_at AS TEXT)
381                     FROM graph_edges WHERE session_id = ? AND source_id = ? AND target_id = ?",
382                )?;
383                let query = stmt.query(params![session_id, src, tgt])?;
384                Self::collect_graph_edges(query)?
385            }
386            (Some(src), None) => {
387                let mut stmt = conn.prepare(
388                    "SELECT id, session_id, source_id, target_id, edge_type, predicate, properties, weight,
389                            CAST(temporal_start AS TEXT), CAST(temporal_end AS TEXT), CAST(created_at AS TEXT)
390                     FROM graph_edges WHERE session_id = ? AND source_id = ?",
391                )?;
392                let query = stmt.query(params![session_id, src])?;
393                Self::collect_graph_edges(query)?
394            }
395            (None, Some(tgt)) => {
396                let mut stmt = conn.prepare(
397                    "SELECT id, session_id, source_id, target_id, edge_type, predicate, properties, weight,
398                            CAST(temporal_start AS TEXT), CAST(temporal_end AS TEXT), CAST(created_at AS TEXT)
399                     FROM graph_edges WHERE session_id = ? AND target_id = ?",
400                )?;
401                let query = stmt.query(params![session_id, tgt])?;
402                Self::collect_graph_edges(query)?
403            }
404            (None, None) => {
405                let mut stmt = conn.prepare(
406                    "SELECT id, session_id, source_id, target_id, edge_type, predicate, properties, weight,
407                            CAST(temporal_start AS TEXT), CAST(temporal_end AS TEXT), CAST(created_at AS TEXT)
408                     FROM graph_edges WHERE session_id = ?",
409                )?;
410                let query = stmt.query(params![session_id])?;
411                Self::collect_graph_edges(query)?
412            }
413        };
414
415        Ok(edges)
416    }
417
418    pub fn count_graph_edges(&self, session_id: &str) -> Result<i64> {
419        let conn = self.conn();
420        let mut stmt = conn.prepare("SELECT COUNT(*) FROM graph_edges WHERE session_id = ?")?;
421        let count: i64 = stmt.query_row(params![session_id], |row| row.get(0))?;
422        Ok(count)
423    }
424
425    pub fn delete_graph_edge(&self, edge_id: i64) -> Result<()> {
426        let conn = self.conn();
427
428        let mut stmt = conn.prepare(
429            "SELECT session_id, source_id, target_id, edge_type, predicate, properties, weight,
430                    vector_clock, sync_enabled
431             FROM graph_edges WHERE id = ?",
432        )?;
433
434        let result = stmt.query_row(params![edge_id], |row| {
435            Ok((
436                row.get::<_, String>(0)?,
437                row.get::<_, i64>(1)?,
438                row.get::<_, i64>(2)?,
439                row.get::<_, String>(3)?,
440                row.get::<_, Option<String>>(4)?,
441                row.get::<_, Option<String>>(5)?,
442                row.get::<_, f32>(6)?,
443                row.get::<_, Option<String>>(7)?,
444                row.get::<_, bool>(8).unwrap_or(false),
445            ))
446        });
447
448        if let Ok((
449            session_id,
450            source_id,
451            target_id,
452            edge_type,
453            predicate,
454            properties,
455            weight,
456            current_vc_json,
457            sync_enabled,
458        )) = result
459        {
460            if sync_enabled {
461                let mut vector_clock = if let Some(vc_json) = current_vc_json {
462                    VectorClock::from_json(&vc_json).unwrap_or_else(|_| VectorClock::new())
463                } else {
464                    VectorClock::new()
465                };
466                vector_clock.increment(&self.instance_id);
467                let vc_json = vector_clock.to_json()?;
468
469                conn.execute(
470                    "INSERT INTO graph_tombstones
471                     (session_id, entity_type, entity_id, deleted_by, vector_clock)
472                     VALUES (?, ?, ?, ?, ?)",
473                    params![session_id, "edge", edge_id, self.instance_id, vc_json],
474                )?;
475
476                let edge_data = serde_json::json!({
477                    "id": edge_id,
478                    "session_id": session_id,
479                    "source_id": source_id,
480                    "target_id": target_id,
481                    "edge_type": edge_type,
482                    "predicate": predicate,
483                    "properties": properties,
484                    "weight": weight,
485                });
486
487                self.graph_changelog_append(
488                    &session_id,
489                    &self.instance_id,
490                    "edge",
491                    edge_id,
492                    "delete",
493                    &vc_json,
494                    Some(&edge_data.to_string()),
495                )?;
496            }
497        }
498
499        conn.execute("DELETE FROM graph_edges WHERE id = ?", params![edge_id])?;
500        Ok(())
501    }
502
503    // ---------- Graph Traversal Operations ----------
504
505    pub fn find_shortest_path(
506        &self,
507        session_id: &str,
508        source_id: i64,
509        target_id: i64,
510        max_hops: Option<usize>,
511    ) -> Result<Option<GraphPath>> {
512        let max_depth = max_hops.unwrap_or(10);
513
514        let mut visited = HashSet::new();
515        let mut queue = VecDeque::new();
516        let mut parent_map = HashMap::new();
517
518        queue.push_back((source_id, 0));
519        visited.insert(source_id);
520
521        while let Some((current_id, depth)) = queue.pop_front() {
522            if current_id == target_id {
523                let path = self.reconstruct_path(&parent_map, source_id, target_id)?;
524                return Ok(Some(path));
525            }
526
527            if depth >= max_depth {
528                continue;
529            }
530
531            let edges = self.list_graph_edges(session_id, Some(current_id), None)?;
532            for edge in edges {
533                let target = edge.target_id;
534                if !visited.contains(&target) {
535                    visited.insert(target);
536                    parent_map.insert(target, (current_id, edge));
537                    queue.push_back((target, depth + 1));
538                }
539            }
540        }
541
542        Ok(None)
543    }
544
545    pub fn traverse_neighbors(
546        &self,
547        session_id: &str,
548        node_id: i64,
549        direction: TraversalDirection,
550        depth: usize,
551    ) -> Result<Vec<GraphNode>> {
552        if depth == 0 {
553            return Ok(vec![]);
554        }
555
556        let mut visited = HashSet::new();
557        let mut result = Vec::new();
558        let mut queue = VecDeque::new();
559
560        queue.push_back((node_id, 0));
561        visited.insert(node_id);
562
563        while let Some((current_id, current_depth)) = queue.pop_front() {
564            if current_depth > 0 {
565                if let Some(node) = self.get_graph_node(current_id)? {
566                    result.push(node);
567                }
568            }
569
570            if current_depth >= depth {
571                continue;
572            }
573
574            let edges = match direction {
575                TraversalDirection::Outgoing => {
576                    self.list_graph_edges(session_id, Some(current_id), None)?
577                }
578                TraversalDirection::Incoming => {
579                    self.list_graph_edges(session_id, None, Some(current_id))?
580                }
581                TraversalDirection::Both => {
582                    let mut out_edges =
583                        self.list_graph_edges(session_id, Some(current_id), None)?;
584                    let in_edges = self.list_graph_edges(session_id, None, Some(current_id))?;
585                    out_edges.extend(in_edges);
586                    out_edges
587                }
588            };
589
590            for edge in edges {
591                let next_id = match direction {
592                    TraversalDirection::Outgoing => edge.target_id,
593                    TraversalDirection::Incoming => edge.source_id,
594                    TraversalDirection::Both => {
595                        if edge.source_id == current_id {
596                            edge.target_id
597                        } else {
598                            edge.source_id
599                        }
600                    }
601                };
602
603                if !visited.contains(&next_id) {
604                    visited.insert(next_id);
605                    queue.push_back((next_id, current_depth + 1));
606                }
607            }
608        }
609
610        Ok(result)
611    }
612
613    fn row_to_graph_node(row: &duckdb::Row) -> Result<GraphNode> {
614        let id: i64 = row.get(0)?;
615        let session_id: String = row.get(1)?;
616        let node_type: String = row.get(2)?;
617        let label: String = row.get(3)?;
618        let properties: String = row.get(4)?;
619        let embedding_id: Option<i64> = row.get(5)?;
620        let created_at: String = row.get(6)?;
621        let updated_at: String = row.get(7)?;
622
623        Ok(GraphNode {
624            id,
625            session_id,
626            node_type: NodeType::from_str(&node_type),
627            label,
628            properties: serde_json::from_str(&properties).unwrap_or(JsonValue::Null),
629            embedding_id,
630            created_at: created_at.parse().unwrap_or_else(|_| Utc::now()),
631            updated_at: updated_at.parse().unwrap_or_else(|_| Utc::now()),
632        })
633    }
634
635    fn row_to_graph_edge(row: &duckdb::Row) -> Result<GraphEdge> {
636        let id: i64 = row.get(0)?;
637        let session_id: String = row.get(1)?;
638        let source_id: i64 = row.get(2)?;
639        let target_id: i64 = row.get(3)?;
640        let edge_type: String = row.get(4)?;
641        let predicate: Option<String> = row.get(5)?;
642        let properties: Option<String> = row.get(6)?;
643        let weight: f32 = row.get(7)?;
644        let temporal_start: Option<String> = row.get(8)?;
645        let temporal_end: Option<String> = row.get(9)?;
646        let created_at: String = row.get(10)?;
647
648        Ok(GraphEdge {
649            id,
650            session_id,
651            source_id,
652            target_id,
653            edge_type: EdgeType::from_str(&edge_type),
654            predicate,
655            properties: properties.and_then(|p| serde_json::from_str(&p).ok()),
656            weight,
657            temporal_start: temporal_start.and_then(|s| s.parse().ok()),
658            temporal_end: temporal_end.and_then(|s| s.parse().ok()),
659            created_at: created_at.parse().unwrap_or_else(|_| Utc::now()),
660        })
661    }
662
663    fn collect_graph_nodes(mut rows: duckdb::Rows) -> Result<Vec<GraphNode>> {
664        let mut nodes = Vec::new();
665        while let Some(row) = rows.next()? {
666            nodes.push(Self::row_to_graph_node(row)?);
667        }
668        Ok(nodes)
669    }
670
671    fn collect_graph_edges(mut rows: duckdb::Rows) -> Result<Vec<GraphEdge>> {
672        let mut edges = Vec::new();
673        while let Some(row) = rows.next()? {
674            edges.push(Self::row_to_graph_edge(row)?);
675        }
676        Ok(edges)
677    }
678
679    fn reconstruct_path(
680        &self,
681        parent_map: &HashMap<i64, (i64, GraphEdge)>,
682        source_id: i64,
683        target_id: i64,
684    ) -> Result<GraphPath> {
685        let mut path_edges = Vec::new();
686        let mut path_nodes = Vec::new();
687        let mut current = target_id;
688        let mut total_weight = 0.0;
689
690        while current != source_id {
691            if let Some((parent, edge)) = parent_map.get(&current) {
692                path_edges.push(edge.clone());
693                total_weight += edge.weight;
694                current = *parent;
695            } else {
696                break;
697            }
698        }
699
700        path_edges.reverse();
701
702        if let Some(node) = self.get_graph_node(source_id)? {
703            path_nodes.push(node);
704        }
705        for edge in &path_edges {
706            if let Some(node) = self.get_graph_node(edge.target_id)? {
707                path_nodes.push(node);
708            }
709        }
710
711        Ok(GraphPath {
712            length: path_edges.len(),
713            weight: total_weight,
714            nodes: path_nodes,
715            edges: path_edges,
716        })
717    }
718
719    // ===== Graph Synchronization Methods =====
720
721    #[allow(clippy::too_many_arguments)]
722    pub fn graph_changelog_append(
723        &self,
724        session_id: &str,
725        instance_id: &str,
726        entity_type: &str,
727        entity_id: i64,
728        operation: &str,
729        vector_clock: &str,
730        data: Option<&str>,
731    ) -> Result<i64> {
732        let conn = self.conn();
733        let mut stmt = conn.prepare(
734            "INSERT INTO graph_changelog (session_id, instance_id, entity_type, entity_id, operation, vector_clock, data)
735             VALUES (?, ?, ?, ?, ?, ?, ?) RETURNING id",
736        )?;
737        let id: i64 = stmt.query_row(
738            params![
739                session_id,
740                instance_id,
741                entity_type,
742                entity_id,
743                operation,
744                vector_clock,
745                data
746            ],
747            |row| row.get(0),
748        )?;
749        Ok(id)
750    }
751
752    pub fn graph_changelog_get_since(
753        &self,
754        session_id: &str,
755        since_timestamp: &str,
756    ) -> Result<Vec<ChangelogEntry>> {
757        let conn = self.conn();
758        let mut stmt = conn.prepare(
759            "SELECT id, session_id, instance_id, entity_type, entity_id, operation, vector_clock, data, CAST(created_at AS TEXT)
760             FROM graph_changelog
761             WHERE session_id = ? AND created_at > ?
762             ORDER BY created_at ASC",
763        )?;
764        let mut rows = stmt.query(params![session_id, since_timestamp])?;
765        let mut entries = Vec::new();
766        while let Some(row) = rows.next()? {
767            entries.push(ChangelogEntry::from_row(row)?);
768        }
769        Ok(entries)
770    }
771
772    pub fn graph_list_conflicts(
773        &self,
774        session_id: Option<&str>,
775        limit: usize,
776    ) -> Result<Vec<ChangelogEntry>> {
777        let conn = self.conn();
778        let mut entries = Vec::new();
779
780        if let Some(sid) = session_id {
781            let mut stmt = conn.prepare(
782                "SELECT id, session_id, instance_id, entity_type, entity_id, operation, vector_clock, data, CAST(created_at AS TEXT)
783                 FROM graph_changelog
784                 WHERE operation = 'conflict' AND session_id = ?
785                 ORDER BY created_at DESC
786                 LIMIT ?",
787            )?;
788            let mut rows = stmt.query(params![sid, limit as i64])?;
789            while let Some(row) = rows.next()? {
790                entries.push(ChangelogEntry::from_row(row)?);
791            }
792        } else {
793            let mut stmt = conn.prepare(
794                "SELECT id, session_id, instance_id, entity_type, entity_id, operation, vector_clock, data, CAST(created_at AS TEXT)
795                 FROM graph_changelog
796                 WHERE operation = 'conflict'
797                 ORDER BY created_at DESC
798                 LIMIT ?",
799            )?;
800            let mut rows = stmt.query(params![limit as i64])?;
801            while let Some(row) = rows.next()? {
802                entries.push(ChangelogEntry::from_row(row)?);
803            }
804        }
805
806        Ok(entries)
807    }
808
809    pub fn graph_changelog_prune(&self, days_to_keep: i64) -> Result<usize> {
810        let conn = self.conn();
811        let cutoff = Utc::now() - Duration::days(days_to_keep);
812        let cutoff_str = cutoff.to_rfc3339();
813        let deleted = conn.execute(
814            "DELETE FROM graph_changelog WHERE created_at < ?",
815            params![cutoff_str],
816        )?;
817        Ok(deleted)
818    }
819
820    pub fn graph_sync_state_get_metadata(
821        &self,
822        instance_id: &str,
823        session_id: &str,
824        graph_name: &str,
825    ) -> Result<Option<SyncStateRecord>> {
826        let conn = self.conn();
827        let result: Result<SyncStateRecord, _> = conn.query_row(
828            "SELECT vector_clock, CAST(last_sync_at AS TEXT)
829             FROM graph_sync_state WHERE instance_id = ? AND session_id = ? AND graph_name = ?",
830            params![instance_id, session_id, graph_name],
831            |row| {
832                Ok(SyncStateRecord {
833                    vector_clock: row.get(0)?,
834                    last_sync_at: row.get(1).ok(),
835                })
836            },
837        );
838
839        match result {
840            Ok(record) => Ok(Some(record)),
841            Err(duckdb::Error::QueryReturnedNoRows) => Ok(None),
842            Err(e) => Err(e.into()),
843        }
844    }
845
846    pub fn graph_sync_state_get(
847        &self,
848        instance_id: &str,
849        session_id: &str,
850        graph_name: &str,
851    ) -> Result<Option<String>> {
852        Ok(self
853            .graph_sync_state_get_metadata(instance_id, session_id, graph_name)?
854            .map(|r| r.vector_clock))
855    }
856
857    pub fn graph_sync_state_update(
858        &self,
859        instance_id: &str,
860        session_id: &str,
861        graph_name: &str,
862        vector_clock: &str,
863    ) -> Result<()> {
864        let conn = self.conn();
865        conn.execute(
866            "INSERT INTO graph_sync_state (instance_id, session_id, graph_name, vector_clock, last_sync_at)
867             VALUES (?, ?, ?, ?, now())
868             ON CONFLICT (instance_id, session_id, graph_name)
869             DO UPDATE SET vector_clock = EXCLUDED.vector_clock, last_sync_at = now()",
870            params![instance_id, session_id, graph_name, vector_clock],
871        )?;
872        Ok(())
873    }
874
875    pub fn graph_set_sync_enabled(
876        &self,
877        session_id: &str,
878        graph_name: &str,
879        enabled: bool,
880    ) -> Result<()> {
881        let conn = self.conn();
882        // Upsert: insert if not exists, update if exists
883        conn.execute(
884            "INSERT INTO graph_metadata (session_id, graph_name, sync_enabled)
885             VALUES (?, ?, ?)
886             ON CONFLICT (session_id, graph_name) DO UPDATE SET sync_enabled = EXCLUDED.sync_enabled",
887            params![session_id, graph_name, enabled],
888        )?;
889        Ok(())
890    }
891
892    pub fn graph_get_sync_enabled(&self, session_id: &str, graph_name: &str) -> Result<bool> {
893        let conn = self.conn();
894        let result: Result<bool, _> = conn.query_row(
895            "SELECT sync_enabled FROM graph_metadata WHERE session_id = ? AND graph_name = ?",
896            params![session_id, graph_name],
897            |row| row.get(0),
898        );
899        match result {
900            Ok(enabled) => Ok(enabled),
901            Err(duckdb::Error::QueryReturnedNoRows) => Ok(false),
902            Err(e) => Err(e.into()),
903        }
904    }
905
906    pub fn graph_set_sync_config(
907        &self,
908        session_id: &str,
909        graph_name: &str,
910        sync_enabled: bool,
911        conflict_resolution_strategy: Option<&str>,
912        sync_interval_seconds: Option<u64>,
913    ) -> Result<GraphSyncConfig> {
914        let conn = self.conn();
915
916        // Load existing config to preserve unrelated keys
917        let existing_config_value: JsonValue = conn
918            .query_row(
919                "SELECT config FROM graph_metadata WHERE session_id = ? AND graph_name = ?",
920                params![session_id, graph_name],
921                |row| row.get::<_, Option<String>>(0),
922            )
923            .unwrap_or(None)
924            .as_deref()
925            .and_then(|s| serde_json::from_str(s).ok())
926            .unwrap_or_else(|| JsonValue::Object(Map::new()));
927
928        let mut root_obj = existing_config_value
929            .as_object()
930            .cloned()
931            .unwrap_or_default();
932        let mut sync_obj = root_obj
933            .get("sync")
934            .and_then(|v| v.as_object())
935            .cloned()
936            .unwrap_or_default();
937
938        let final_strategy = conflict_resolution_strategy
939            .map(|s| s.to_string())
940            .or_else(|| {
941                sync_obj
942                    .get("conflict_resolution_strategy")
943                    .and_then(|v| v.as_str().map(|s| s.to_string()))
944            })
945            .or_else(|| Some("vector_clock".to_string()));
946
947        let final_interval = sync_interval_seconds
948            .or_else(|| {
949                sync_obj
950                    .get("sync_interval_seconds")
951                    .and_then(|v| v.as_u64())
952            })
953            .or(Some(60));
954
955        if let Some(strategy) = final_strategy.clone() {
956            sync_obj.insert(
957                "conflict_resolution_strategy".to_string(),
958                JsonValue::String(strategy),
959            );
960        }
961        if let Some(interval) = final_interval {
962            sync_obj.insert(
963                "sync_interval_seconds".to_string(),
964                JsonValue::from(interval),
965            );
966        }
967
968        root_obj.insert("sync".to_string(), JsonValue::Object(sync_obj));
969        let merged_config = JsonValue::Object(root_obj).to_string();
970
971        conn.execute(
972            "INSERT INTO graph_metadata (session_id, graph_name, sync_enabled, config, updated_at)
973             VALUES (?, ?, ?, ?, now())
974             ON CONFLICT (session_id, graph_name)
975             DO UPDATE SET sync_enabled = EXCLUDED.sync_enabled,
976                           config = EXCLUDED.config,
977                           updated_at = now()",
978            params![session_id, graph_name, sync_enabled, merged_config],
979        )?;
980
981        Ok(GraphSyncConfig {
982            sync_enabled,
983            conflict_resolution_strategy: final_strategy,
984            sync_interval_seconds: final_interval,
985        })
986    }
987
988    pub fn graph_get_sync_config(
989        &self,
990        session_id: &str,
991        graph_name: &str,
992    ) -> Result<GraphSyncConfig> {
993        let conn = self.conn();
994        let result: Result<(bool, Option<String>), _> = conn.query_row(
995            "SELECT sync_enabled, config FROM graph_metadata WHERE session_id = ? AND graph_name = ?",
996            params![session_id, graph_name],
997            |row| Ok((row.get(0)?, row.get(1)?)),
998        );
999
1000        match result {
1001            Ok((sync_enabled, config_json)) => {
1002                let config_value: JsonValue = config_json
1003                    .as_deref()
1004                    .and_then(|s| serde_json::from_str(s).ok())
1005                    .unwrap_or_else(|| JsonValue::Object(Map::new()));
1006                let sync_obj = config_value
1007                    .get("sync")
1008                    .and_then(|v| v.as_object())
1009                    .cloned()
1010                    .unwrap_or_default();
1011
1012                Ok(GraphSyncConfig {
1013                    sync_enabled,
1014                    conflict_resolution_strategy: sync_obj
1015                        .get("conflict_resolution_strategy")
1016                        .and_then(|v| v.as_str())
1017                        .map(|s| s.to_string())
1018                        .or_else(|| Some("vector_clock".to_string())),
1019                    sync_interval_seconds: sync_obj
1020                        .get("sync_interval_seconds")
1021                        .and_then(|v| v.as_u64())
1022                        .or(Some(60)),
1023                })
1024            }
1025            Err(duckdb::Error::QueryReturnedNoRows) => Ok(GraphSyncConfig::default()),
1026            Err(e) => Err(e.into()),
1027        }
1028    }
1029
1030    pub fn graph_list(&self, session_id: &str) -> Result<Vec<String>> {
1031        let conn = self.conn();
1032        let mut stmt = conn.prepare(
1033            "SELECT DISTINCT graph_name FROM graph_metadata WHERE session_id = ?
1034             UNION
1035             SELECT DISTINCT 'default' as graph_name
1036             FROM graph_nodes WHERE session_id = ?
1037             ORDER BY graph_name",
1038        )?;
1039
1040        let mut graphs = Vec::new();
1041        let mut rows = stmt.query(params![session_id, session_id])?;
1042        while let Some(row) = rows.next()? {
1043            let graph_name: String = row.get(0)?;
1044            graphs.push(graph_name);
1045        }
1046
1047        if graphs.is_empty() {
1048            let node_count: i64 = conn.query_row(
1049                "SELECT COUNT(*) FROM graph_nodes WHERE session_id = ?",
1050                params![session_id],
1051                |row| row.get(0),
1052            )?;
1053            if node_count > 0 {
1054                graphs.push("default".to_string());
1055            }
1056        }
1057
1058        Ok(graphs)
1059    }
1060
1061    /// List all sync-enabled graphs across all sessions
1062    pub fn graph_list_sync_enabled(&self) -> Result<Vec<(String, String)>> {
1063        let conn = self.conn();
1064        let mut stmt = conn.prepare(
1065            "SELECT session_id, graph_name FROM graph_metadata WHERE sync_enabled = TRUE ORDER BY session_id, graph_name",
1066        )?;
1067
1068        let mut results = Vec::new();
1069        let mut rows = stmt.query(params![])?;
1070        while let Some(row) = rows.next()? {
1071            let session_id: String = row.get(0)?;
1072            let graph_name: String = row.get(1)?;
1073            results.push((session_id, graph_name));
1074        }
1075
1076        Ok(results)
1077    }
1078
1079    pub fn graph_get_node_with_sync(&self, node_id: i64) -> Result<Option<SyncedNodeRecord>> {
1080        let conn = self.conn();
1081        let result: Result<SyncedNodeRecord, _> = conn.query_row(
1082            "SELECT id, session_id, node_type, label, properties, embedding_id,
1083                    CAST(created_at AS TEXT), CAST(updated_at AS TEXT),
1084                    COALESCE(vector_clock, '{}'), last_modified_by, is_deleted, sync_enabled
1085             FROM graph_nodes WHERE id = ?",
1086            params![node_id],
1087            SyncedNodeRecord::from_row,
1088        );
1089        match result {
1090            Ok(node) => Ok(Some(node)),
1091            Err(duckdb::Error::QueryReturnedNoRows) => Ok(None),
1092            Err(e) => Err(e.into()),
1093        }
1094    }
1095
1096    pub fn graph_list_nodes_with_sync(
1097        &self,
1098        session_id: &str,
1099        sync_enabled_only: bool,
1100        include_deleted: bool,
1101    ) -> Result<Vec<SyncedNodeRecord>> {
1102        let conn = self.conn();
1103        let mut query = String::from(
1104            "SELECT id, session_id, node_type, label, properties, embedding_id,
1105                    CAST(created_at AS TEXT), CAST(updated_at AS TEXT),
1106                    COALESCE(vector_clock, '{}'), last_modified_by, is_deleted, sync_enabled
1107             FROM graph_nodes WHERE session_id = ?",
1108        );
1109
1110        if sync_enabled_only {
1111            query.push_str(" AND sync_enabled = TRUE");
1112        }
1113        if !include_deleted {
1114            query.push_str(" AND is_deleted = FALSE");
1115        }
1116        query.push_str(" ORDER BY created_at ASC");
1117
1118        let mut stmt = conn.prepare(&query)?;
1119        let mut rows = stmt.query(params![session_id])?;
1120        let mut nodes = Vec::new();
1121        while let Some(row) = rows.next()? {
1122            nodes.push(SyncedNodeRecord::from_row(row)?);
1123        }
1124        Ok(nodes)
1125    }
1126
1127    pub fn graph_get_edge_with_sync(&self, edge_id: i64) -> Result<Option<SyncedEdgeRecord>> {
1128        let conn = self.conn();
1129        let result: Result<SyncedEdgeRecord, _> = conn.query_row(
1130            "SELECT id, session_id, source_id, target_id, edge_type, predicate, properties, weight,
1131                    CAST(temporal_start AS TEXT), CAST(temporal_end AS TEXT), CAST(created_at AS TEXT),
1132                    COALESCE(vector_clock, '{}'), last_modified_by, is_deleted, sync_enabled
1133             FROM graph_edges WHERE id = ?",
1134            params![edge_id],
1135            SyncedEdgeRecord::from_row,
1136        );
1137        match result {
1138            Ok(edge) => Ok(Some(edge)),
1139            Err(duckdb::Error::QueryReturnedNoRows) => Ok(None),
1140            Err(e) => Err(e.into()),
1141        }
1142    }
1143
1144    pub fn graph_list_edges_with_sync(
1145        &self,
1146        session_id: &str,
1147        sync_enabled_only: bool,
1148        include_deleted: bool,
1149    ) -> Result<Vec<SyncedEdgeRecord>> {
1150        let conn = self.conn();
1151        let mut query = String::from(
1152            "SELECT id, session_id, source_id, target_id, edge_type, predicate, properties, weight,
1153                    CAST(temporal_start AS TEXT), CAST(temporal_end AS TEXT), CAST(created_at AS TEXT),
1154                    COALESCE(vector_clock, '{}'), last_modified_by, is_deleted, sync_enabled
1155             FROM graph_edges WHERE session_id = ?",
1156        );
1157
1158        if sync_enabled_only {
1159            query.push_str(" AND sync_enabled = TRUE");
1160        }
1161        if !include_deleted {
1162            query.push_str(" AND is_deleted = FALSE");
1163        }
1164        query.push_str(" ORDER BY created_at ASC");
1165
1166        let mut stmt = conn.prepare(&query)?;
1167        let mut rows = stmt.query(params![session_id])?;
1168        let mut edges = Vec::new();
1169        while let Some(row) = rows.next()? {
1170            edges.push(SyncedEdgeRecord::from_row(row)?);
1171        }
1172        Ok(edges)
1173    }
1174
1175    pub fn graph_update_node_sync_metadata(
1176        &self,
1177        node_id: i64,
1178        vector_clock: &str,
1179        last_modified_by: &str,
1180        sync_enabled: bool,
1181    ) -> Result<()> {
1182        let conn = self.conn();
1183        conn.execute(
1184            "UPDATE graph_nodes SET vector_clock = ?, last_modified_by = ?, sync_enabled = ?, updated_at = CURRENT_TIMESTAMP
1185             WHERE id = ?",
1186            params![vector_clock, last_modified_by, sync_enabled, node_id],
1187        )?;
1188        Ok(())
1189    }
1190
1191    pub fn graph_update_edge_sync_metadata(
1192        &self,
1193        edge_id: i64,
1194        vector_clock: &str,
1195        last_modified_by: &str,
1196        sync_enabled: bool,
1197    ) -> Result<()> {
1198        let conn = self.conn();
1199        conn.execute(
1200            "UPDATE graph_edges SET vector_clock = ?, last_modified_by = ?, sync_enabled = ?
1201             WHERE id = ?",
1202            params![vector_clock, last_modified_by, sync_enabled, edge_id],
1203        )?;
1204        Ok(())
1205    }
1206
1207    pub fn graph_mark_node_deleted(
1208        &self,
1209        node_id: i64,
1210        vector_clock: &str,
1211        deleted_by: &str,
1212    ) -> Result<()> {
1213        let conn = self.conn();
1214        conn.execute(
1215            "UPDATE graph_nodes SET is_deleted = TRUE, vector_clock = ?, last_modified_by = ?, updated_at = CURRENT_TIMESTAMP
1216             WHERE id = ?",
1217            params![vector_clock, deleted_by, node_id],
1218        )?;
1219        Ok(())
1220    }
1221
1222    pub fn graph_mark_edge_deleted(
1223        &self,
1224        edge_id: i64,
1225        vector_clock: &str,
1226        deleted_by: &str,
1227    ) -> Result<()> {
1228        let conn = self.conn();
1229        conn.execute(
1230            "UPDATE graph_edges SET is_deleted = TRUE, vector_clock = ?, last_modified_by = ?
1231             WHERE id = ?",
1232            params![vector_clock, deleted_by, edge_id],
1233        )?;
1234        Ok(())
1235    }
1236}
1237
/// Sync bookkeeping stored per `(instance, session, graph)` in `graph_sync_state`.
///
/// Derives `PartialEq`/`Eq` so records can be compared by value.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct SyncStateRecord {
    /// Serialized vector clock exactly as stored in the database.
    pub vector_clock: String,
    /// Textual rendering of the `last_sync_at` column; `None` when it is NULL.
    pub last_sync_at: Option<String>,
}
1243
/// Effective sync settings of a single graph.
///
/// Derives `PartialEq`/`Eq` so configurations can be compared by value.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct GraphSyncConfig {
    /// Whether synchronization is switched on for the graph.
    pub sync_enabled: bool,
    /// Name of the conflict resolution strategy, e.g. `"vector_clock"`.
    pub conflict_resolution_strategy: Option<String>,
    /// Interval between sync rounds, in seconds.
    pub sync_interval_seconds: Option<u64>,
}

impl Default for GraphSyncConfig {
    /// Sync disabled, `"vector_clock"` conflict resolution, 60 second interval.
    fn default() -> Self {
        Self {
            sync_enabled: false,
            conflict_resolution_strategy: Some("vector_clock".to_string()),
            sync_interval_seconds: Some(60),
        }
    }
}
1260
1261#[derive(Debug, Clone)]
1262pub struct ChangelogEntry {
1263    pub id: i64,
1264    pub session_id: String,
1265    pub instance_id: String,
1266    pub entity_type: String,
1267    pub entity_id: i64,
1268    pub operation: String,
1269    pub vector_clock: String,
1270    pub data: Option<String>,
1271    pub created_at: DateTime<Utc>,
1272}
1273
1274impl ChangelogEntry {
1275    fn from_row(row: &duckdb::Row) -> Result<Self, duckdb::Error> {
1276        let id: i64 = row.get(0)?;
1277        let session_id: String = row.get(1)?;
1278        let instance_id: String = row.get(2)?;
1279        let entity_type: String = row.get(3)?;
1280        let entity_id: i64 = row.get(4)?;
1281        let operation: String = row.get(5)?;
1282        let vector_clock: String = row.get(6)?;
1283        let data: Option<String> = row.get(7)?;
1284        let created_at_str: String = row.get(8)?;
1285
1286        Ok(ChangelogEntry {
1287            id,
1288            session_id,
1289            instance_id,
1290            entity_type,
1291            entity_id,
1292            operation,
1293            vector_clock,
1294            data,
1295            created_at: created_at_str.parse().unwrap_or_else(|_| Utc::now()),
1296        })
1297    }
1298}
1299
1300#[derive(Debug, Clone)]
1301pub struct SyncedNodeRecord {
1302    pub id: i64,
1303    pub session_id: String,
1304    pub node_type: String,
1305    pub label: String,
1306    pub properties: JsonValue,
1307    pub embedding_id: Option<i64>,
1308    pub created_at: DateTime<Utc>,
1309    pub updated_at: DateTime<Utc>,
1310    pub vector_clock: String,
1311    pub last_modified_by: Option<String>,
1312    pub is_deleted: bool,
1313    pub sync_enabled: bool,
1314}
1315
1316impl SyncedNodeRecord {
1317    fn from_row(row: &duckdb::Row) -> Result<Self, duckdb::Error> {
1318        let id: i64 = row.get(0)?;
1319        let session_id: String = row.get(1)?;
1320        let node_type: String = row.get(2)?;
1321        let label: String = row.get(3)?;
1322        let properties_str: String = row.get(4)?;
1323        let properties: JsonValue = serde_json::from_str(&properties_str).map_err(|e| {
1324            duckdb::Error::FromSqlConversionFailure(4, duckdb::types::Type::Text, Box::new(e))
1325        })?;
1326        let embedding_id: Option<i64> = row.get(5)?;
1327        let created_at_str: String = row.get(6)?;
1328        let updated_at_str: String = row.get(7)?;
1329        let vector_clock: String = row.get(8)?;
1330        let last_modified_by: Option<String> = row.get(9)?;
1331        let is_deleted: bool = row.get(10)?;
1332        let sync_enabled: bool = row.get(11)?;
1333
1334        Ok(SyncedNodeRecord {
1335            id,
1336            session_id,
1337            node_type,
1338            label,
1339            properties,
1340            embedding_id,
1341            created_at: created_at_str.parse().unwrap_or_else(|_| Utc::now()),
1342            updated_at: updated_at_str.parse().unwrap_or_else(|_| Utc::now()),
1343            vector_clock,
1344            last_modified_by,
1345            is_deleted,
1346            sync_enabled,
1347        })
1348    }
1349}
1350
1351#[derive(Debug, Clone)]
1352pub struct SyncedEdgeRecord {
1353    pub id: i64,
1354    pub session_id: String,
1355    pub source_id: i64,
1356    pub target_id: i64,
1357    pub edge_type: String,
1358    pub predicate: Option<String>,
1359    pub properties: Option<JsonValue>,
1360    pub weight: f32,
1361    pub temporal_start: Option<DateTime<Utc>>,
1362    pub temporal_end: Option<DateTime<Utc>>,
1363    pub created_at: DateTime<Utc>,
1364    pub vector_clock: String,
1365    pub last_modified_by: Option<String>,
1366    pub is_deleted: bool,
1367    pub sync_enabled: bool,
1368}
1369
1370impl SyncedEdgeRecord {
1371    fn from_row(row: &duckdb::Row) -> Result<Self, duckdb::Error> {
1372        let id: i64 = row.get(0)?;
1373        let session_id: String = row.get(1)?;
1374        let source_id: i64 = row.get(2)?;
1375        let target_id: i64 = row.get(3)?;
1376        let edge_type: String = row.get(4)?;
1377        let predicate: Option<String> = row.get(5)?;
1378        let properties_str: Option<String> = row.get(6)?;
1379        let properties: Option<JsonValue> = properties_str
1380            .as_ref()
1381            .and_then(|s| serde_json::from_str(s).ok());
1382        let weight: f32 = row.get(7)?;
1383        let temporal_start_str: Option<String> = row.get(8)?;
1384        let temporal_end_str: Option<String> = row.get(9)?;
1385        let created_at_str: String = row.get(10)?;
1386        let vector_clock: String = row.get(11)?;
1387        let last_modified_by: Option<String> = row.get(12)?;
1388        let is_deleted: bool = row.get(13)?;
1389        let sync_enabled: bool = row.get(14)?;
1390
1391        Ok(SyncedEdgeRecord {
1392            id,
1393            session_id,
1394            source_id,
1395            target_id,
1396            edge_type,
1397            predicate,
1398            properties,
1399            weight,
1400            temporal_start: temporal_start_str.and_then(|s| s.parse().ok()),
1401            temporal_end: temporal_end_str.and_then(|s| s.parse().ok()),
1402            created_at: created_at_str.parse().unwrap_or_else(|_| Utc::now()),
1403            vector_clock,
1404            last_modified_by,
1405            is_deleted,
1406            sync_enabled,
1407        })
1408    }
1409}
1410
#[cfg(test)]
mod tests {
    use super::*;
    use anyhow::Result;
    use serde_json::json;

    /// Store backed by a fresh in-memory database with the graph schema installed.
    fn setup_store() -> KnowledgeGraphStore {
        setup_store_with(|_conn| {})
    }

    /// Like [`setup_store`], but runs `extra` against the raw connection after
    /// the schema has been created and before the store takes ownership.
    fn setup_store_with<F>(extra: F) -> KnowledgeGraphStore
    where
        F: FnOnce(&Connection),
    {
        let schema = r#"
            CREATE SEQUENCE IF NOT EXISTS graph_nodes_id_seq START 1;
            CREATE SEQUENCE IF NOT EXISTS graph_edges_id_seq START 1;
            CREATE SEQUENCE IF NOT EXISTS graph_metadata_id_seq START 1;
            CREATE SEQUENCE IF NOT EXISTS graph_changelog_id_seq START 1;
            CREATE SEQUENCE IF NOT EXISTS graph_tombstones_id_seq START 1;

            CREATE TABLE graph_nodes (
                id BIGINT PRIMARY KEY DEFAULT nextval('graph_nodes_id_seq'),
                session_id TEXT NOT NULL,
                node_type TEXT NOT NULL,
                label TEXT NOT NULL,
                properties TEXT NOT NULL,
                embedding_id BIGINT,
                created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                vector_clock TEXT DEFAULT '{}',
                last_modified_by TEXT,
                is_deleted BOOLEAN DEFAULT FALSE,
                sync_enabled BOOLEAN DEFAULT FALSE
            );

            CREATE TABLE graph_edges (
                id BIGINT PRIMARY KEY DEFAULT nextval('graph_edges_id_seq'),
                session_id TEXT NOT NULL,
                source_id BIGINT NOT NULL,
                target_id BIGINT NOT NULL,
                edge_type TEXT NOT NULL,
                predicate TEXT,
                properties TEXT,
                weight REAL DEFAULT 1.0,
                temporal_start TIMESTAMP,
                temporal_end TIMESTAMP,
                created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                vector_clock TEXT DEFAULT '{}',
                last_modified_by TEXT,
                is_deleted BOOLEAN DEFAULT FALSE,
                sync_enabled BOOLEAN DEFAULT FALSE
            );

            CREATE TABLE graph_metadata (
                id BIGINT PRIMARY KEY DEFAULT nextval('graph_metadata_id_seq'),
                session_id TEXT NOT NULL,
                graph_name TEXT NOT NULL,
                is_created BOOLEAN DEFAULT FALSE,
                schema_version INTEGER DEFAULT 1,
                config TEXT,
                created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                sync_enabled BOOLEAN DEFAULT FALSE,
                UNIQUE(session_id, graph_name)
            );

            CREATE TABLE graph_changelog (
                id BIGINT PRIMARY KEY DEFAULT nextval('graph_changelog_id_seq'),
                session_id TEXT NOT NULL,
                instance_id TEXT NOT NULL,
                entity_type TEXT NOT NULL,
                entity_id BIGINT NOT NULL,
                operation TEXT NOT NULL,
                vector_clock TEXT NOT NULL,
                data TEXT,
                created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
            );

            CREATE TABLE graph_sync_state (
                instance_id TEXT NOT NULL,
                session_id TEXT NOT NULL,
                graph_name TEXT NOT NULL,
                vector_clock TEXT NOT NULL,
                last_sync_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                PRIMARY KEY (instance_id, session_id, graph_name)
            );

            CREATE TABLE graph_tombstones (
                id BIGINT PRIMARY KEY DEFAULT nextval('graph_tombstones_id_seq'),
                session_id TEXT NOT NULL,
                entity_type TEXT NOT NULL,
                entity_id BIGINT NOT NULL,
                deleted_by TEXT NOT NULL,
                vector_clock TEXT NOT NULL,
                created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
            );
            "#;

        let conn = Connection::open_in_memory().expect("open in-memory database");
        conn.execute_batch(schema).expect("create graph schema");

        extra(&conn);

        KnowledgeGraphStore::from_connection(conn, "test-instance")
    }

    /// A node can be inserted, listed, updated and finally deleted.
    #[test]
    fn insert_update_delete_node_flow() -> Result<()> {
        let store = setup_store();

        // Insert, then confirm the node is listed.
        let initial_props = json!({ "kind": "repository" });
        let id =
            store.insert_graph_node("session", NodeType::Entity, "SpecAI", &initial_props, None)?;
        let listed = store.list_graph_nodes("session", None, Some(10))?;
        assert_eq!(listed.len(), 1);
        assert_eq!(listed[0].label, "SpecAI");

        // Update the properties and read them back.
        let new_props = json!({ "kind": "repository", "stars": 42 });
        store.update_graph_node(id, &new_props)?;
        let reloaded = store.get_graph_node(id)?.expect("node exists");
        assert_eq!(reloaded.properties["stars"], 42);

        // Delete and confirm the node is gone.
        store.delete_graph_node(id)?;
        let after_delete = store.get_graph_node(id)?;
        assert!(after_delete.is_none());
        Ok(())
    }

    /// The shortest path along a two-edge chain A -> B -> C visits all three nodes.
    #[test]
    fn create_edges_and_find_paths() -> Result<()> {
        let store = setup_store();
        let empty = json!({});
        let first = store.insert_graph_node("session", NodeType::Entity, "A", &empty, None)?;
        let middle = store.insert_graph_node("session", NodeType::Entity, "B", &empty, None)?;
        let last = store.insert_graph_node("session", NodeType::Entity, "C", &empty, None)?;

        store.insert_graph_edge("session", first, middle, EdgeType::RelatesTo, None, None, 1.0)?;
        store.insert_graph_edge("session", middle, last, EdgeType::RelatesTo, None, None, 1.0)?;

        let found = store.find_shortest_path("session", first, last, Some(5))?;
        let path = found.expect("path exists");
        assert_eq!(path.nodes.len(), 3);
        assert_eq!(path.edges.len(), 2);
        assert_eq!(path.length, 2);
        assert_eq!(path.nodes.first().unwrap().label, "A");
        assert_eq!(path.nodes.last().unwrap().label, "C");

        let all_edges = store.list_graph_edges("session", None, None)?;
        assert_eq!(all_edges.len(), 2);

        Ok(())
    }

    /// Traversal follows edges forwards for `Outgoing` and backwards for `Incoming`.
    #[test]
    fn traverse_neighbors_respects_direction() -> Result<()> {
        let store = setup_store();
        let empty = json!({});
        let alpha = store.insert_graph_node("session", NodeType::Entity, "Alpha", &empty, None)?;
        let beta = store.insert_graph_node("session", NodeType::Entity, "Beta", &empty, None)?;
        let gamma = store.insert_graph_node("session", NodeType::Entity, "Gamma", &empty, None)?;

        store.insert_graph_edge("session", alpha, beta, EdgeType::RelatesTo, None, None, 1.0)?;
        store.insert_graph_edge("session", beta, gamma, EdgeType::RelatesTo, None, None, 1.0)?;

        let downstream =
            store.traverse_neighbors("session", alpha, TraversalDirection::Outgoing, 2)?;
        assert_eq!(downstream.len(), 2);
        for expected in ["Beta", "Gamma"] {
            assert!(downstream.iter().any(|node| node.label == expected));
        }

        let upstream =
            store.traverse_neighbors("session", gamma, TraversalDirection::Incoming, 2)?;
        assert_eq!(upstream.len(), 2);
        for expected in ["Beta", "Alpha"] {
            assert!(upstream.iter().any(|node| node.label == expected));
        }

        Ok(())
    }

    /// Saved sync settings are returned on read; unknown graphs yield the defaults.
    #[test]
    fn sync_config_round_trip() -> Result<()> {
        let store = setup_store();

        let written = store.graph_set_sync_config(
            "session",
            "default",
            true,
            Some("last_write_wins"),
            Some(120),
        )?;
        assert!(written.sync_enabled);
        assert_eq!(
            written.conflict_resolution_strategy.as_deref(),
            Some("last_write_wins")
        );
        assert_eq!(written.sync_interval_seconds, Some(120));

        let read_back = store.graph_get_sync_config("session", "default")?;
        assert!(read_back.sync_enabled);
        assert_eq!(
            read_back.conflict_resolution_strategy.as_deref(),
            Some("last_write_wins")
        );
        assert_eq!(read_back.sync_interval_seconds, Some(120));

        let fallback = store.graph_get_sync_config("other_session", "missing")?;
        assert!(!fallback.sync_enabled);
        assert_eq!(
            fallback.conflict_resolution_strategy.as_deref(),
            Some("vector_clock")
        );
        assert_eq!(fallback.sync_interval_seconds, Some(60));

        Ok(())
    }

    /// An updated sync state can be read back, including its timestamp.
    #[test]
    fn sync_state_metadata_round_trip() -> Result<()> {
        let store = setup_store();
        let clock = r#"{"a":1}"#;
        store.graph_sync_state_update("instance", "session", "graph", clock)?;

        let loaded = store.graph_sync_state_get_metadata("instance", "session", "graph")?;
        let state = loaded.expect("state exists");

        assert_eq!(state.vector_clock, r#"{"a":1}"#);
        assert!(state.last_sync_at.is_some());

        Ok(())
    }

    /// Conflict entries can be listed globally or restricted to one session.
    #[test]
    fn graph_conflict_listing_filters() -> Result<()> {
        let store = setup_store();
        let vc_json = VectorClock::new().to_json()?;

        let first_payload = json!({
            "graph_name": "g1",
            "local_version": { "id": 1 },
            "remote_version": { "id": 2 },
            "resolution": "Test"
        })
        .to_string();
        store.graph_changelog_append(
            "session_one",
            "inst1",
            "node",
            1,
            "conflict",
            &vc_json,
            Some(&first_payload),
        )?;

        let second_payload = json!({
            "graph_name": "g2",
            "local_version": { "id": 3 },
            "remote_version": { "id": 4 }
        })
        .to_string();
        store.graph_changelog_append(
            "session_two",
            "inst1",
            "edge",
            2,
            "conflict",
            &vc_json,
            Some(&second_payload),
        )?;

        let everything = store.graph_list_conflicts(None, 10)?;
        assert_eq!(everything.len(), 2);

        let only_first_session = store.graph_list_conflicts(Some("session_one"), 10)?;
        assert_eq!(only_first_session.len(), 1);
        assert_eq!(only_first_session[0].session_id, "session_one");
        assert_eq!(only_first_session[0].entity_id, 1);

        Ok(())
    }
}