spec_ai_knowledge_graph/
graph_store.rs

1use crate::types::{EdgeType, GraphEdge, GraphNode, GraphPath, NodeType, TraversalDirection};
2use crate::vector_clock::VectorClock;
3use anyhow::Result;
4use chrono::{DateTime, Duration, Utc};
5use duckdb::{params, Connection};
6use serde_json::Value as JsonValue;
7use std::collections::{HashMap, HashSet, VecDeque};
8use std::sync::{Arc, Mutex, MutexGuard};
9
10#[derive(Clone)]
11pub struct KnowledgeGraphStore {
12    conn: Arc<Mutex<Connection>>,
13    instance_id: String,
14}
15
16impl KnowledgeGraphStore {
17    pub fn new(conn: Arc<Mutex<Connection>>, instance_id: impl Into<String>) -> Self {
18        Self {
19            conn,
20            instance_id: instance_id.into(),
21        }
22    }
23
24    pub fn from_connection(conn: Connection, instance_id: impl Into<String>) -> Self {
25        Self {
26            conn: Arc::new(Mutex::new(conn)),
27            instance_id: instance_id.into(),
28        }
29    }
30
31    pub fn instance_id(&self) -> &str {
32        &self.instance_id
33    }
34
35    fn conn(&self) -> MutexGuard<'_, Connection> {
36        self.conn.lock().expect("database connection poisoned")
37    }
38
39    // ---------- Graph Node Operations ----------
40
41    pub fn insert_graph_node(
42        &self,
43        session_id: &str,
44        node_type: NodeType,
45        label: &str,
46        properties: &JsonValue,
47        embedding_id: Option<i64>,
48    ) -> Result<i64> {
49        let sync_enabled = self
50            .graph_get_sync_enabled(session_id, "default")
51            .unwrap_or(false);
52
53        let mut vector_clock = VectorClock::new();
54        vector_clock.increment(&self.instance_id);
55        let vc_json = vector_clock.to_json()?;
56
57        let conn = self.conn();
58
59        let mut stmt = conn.prepare(
60            "INSERT INTO graph_nodes (session_id, node_type, label, properties, embedding_id,
61                                     vector_clock, last_modified_by, sync_enabled)
62             VALUES (?, ?, ?, ?, ?, ?, ?, ?) RETURNING id",
63        )?;
64        let id: i64 = stmt.query_row(
65            params![
66                session_id,
67                node_type.as_str(),
68                label,
69                properties.to_string(),
70                embedding_id,
71                vc_json,
72                self.instance_id,
73                sync_enabled,
74            ],
75            |row| row.get(0),
76        )?;
77
78        if sync_enabled {
79            let node_data = serde_json::json!({
80                "id": id,
81                "session_id": session_id,
82                "node_type": node_type.as_str(),
83                "label": label,
84                "properties": properties,
85                "embedding_id": embedding_id,
86            });
87
88            self.graph_changelog_append(
89                session_id,
90                &self.instance_id,
91                "node",
92                id,
93                "create",
94                &vc_json,
95                Some(&node_data.to_string()),
96            )?;
97        }
98
99        Ok(id)
100    }
101
102    pub fn get_graph_node(&self, node_id: i64) -> Result<Option<GraphNode>> {
103        let conn = self.conn();
104        let mut stmt = conn.prepare(
105            "SELECT id, session_id, node_type, label, properties, embedding_id,
106                    CAST(created_at AS TEXT), CAST(updated_at AS TEXT)
107             FROM graph_nodes WHERE id = ?",
108        )?;
109        let mut rows = stmt.query(params![node_id])?;
110        if let Some(row) = rows.next()? {
111            Ok(Some(Self::row_to_graph_node(row)?))
112        } else {
113            Ok(None)
114        }
115    }
116
117    pub fn list_graph_nodes(
118        &self,
119        session_id: &str,
120        node_type: Option<NodeType>,
121        limit: Option<i64>,
122    ) -> Result<Vec<GraphNode>> {
123        let conn = self.conn();
124
125        let nodes = if let Some(nt) = node_type {
126            let mut stmt = conn.prepare(
127                "SELECT id, session_id, node_type, label, properties, embedding_id,
128                        CAST(created_at AS TEXT), CAST(updated_at AS TEXT)
129                 FROM graph_nodes WHERE session_id = ? AND node_type = ?
130                 ORDER BY id DESC LIMIT ?",
131            )?;
132            let query = stmt.query(params![session_id, nt.as_str(), limit.unwrap_or(100)])?;
133            Self::collect_graph_nodes(query)?
134        } else {
135            let mut stmt = conn.prepare(
136                "SELECT id, session_id, node_type, label, properties, embedding_id,
137                        CAST(created_at AS TEXT), CAST(updated_at AS TEXT)
138                 FROM graph_nodes WHERE session_id = ?
139                 ORDER BY id DESC LIMIT ?",
140            )?;
141            let query = stmt.query(params![session_id, limit.unwrap_or(100)])?;
142            Self::collect_graph_nodes(query)?
143        };
144
145        Ok(nodes)
146    }
147
148    pub fn count_graph_nodes(&self, session_id: &str) -> Result<i64> {
149        let conn = self.conn();
150        let mut stmt = conn.prepare("SELECT COUNT(*) FROM graph_nodes WHERE session_id = ?")?;
151        let count: i64 = stmt.query_row(params![session_id], |row| row.get(0))?;
152        Ok(count)
153    }
154
155    pub fn update_graph_node(&self, node_id: i64, properties: &JsonValue) -> Result<()> {
156        let conn = self.conn();
157
158        let mut stmt = conn.prepare(
159            "SELECT session_id, node_type, label, vector_clock, sync_enabled
160             FROM graph_nodes WHERE id = ?",
161        )?;
162
163        let (session_id, node_type, label, current_vc_json, sync_enabled): (
164            String,
165            String,
166            String,
167            Option<String>,
168            bool,
169        ) = stmt.query_row(params![node_id], |row| {
170            Ok((
171                row.get(0)?,
172                row.get(1)?,
173                row.get(2)?,
174                row.get(3)?,
175                row.get(4).unwrap_or(false),
176            ))
177        })?;
178
179        let mut vector_clock = if let Some(vc_json) = current_vc_json {
180            VectorClock::from_json(&vc_json).unwrap_or_else(|_| VectorClock::new())
181        } else {
182            VectorClock::new()
183        };
184        vector_clock.increment(&self.instance_id);
185        let vc_json = vector_clock.to_json()?;
186
187        conn.execute(
188            "UPDATE graph_nodes
189             SET properties = ?,
190                 vector_clock = ?,
191                 last_modified_by = ?,
192                 updated_at = CURRENT_TIMESTAMP
193             WHERE id = ?",
194            params![properties.to_string(), vc_json, self.instance_id, node_id],
195        )?;
196
197        if sync_enabled {
198            let node_data = serde_json::json!({
199                "id": node_id,
200                "session_id": session_id,
201                "node_type": node_type,
202                "label": label,
203                "properties": properties,
204            });
205
206            self.graph_changelog_append(
207                &session_id,
208                &self.instance_id,
209                "node",
210                node_id,
211                "update",
212                &vc_json,
213                Some(&node_data.to_string()),
214            )?;
215        }
216
217        Ok(())
218    }
219
220    pub fn delete_graph_node(&self, node_id: i64) -> Result<()> {
221        let conn = self.conn();
222
223        let mut stmt = conn.prepare(
224            "SELECT session_id, node_type, label, properties, vector_clock, sync_enabled
225             FROM graph_nodes WHERE id = ?",
226        )?;
227
228        let result = stmt.query_row(params![node_id], |row| {
229            Ok((
230                row.get::<_, String>(0)?,
231                row.get::<_, String>(1)?,
232                row.get::<_, String>(2)?,
233                row.get::<_, String>(3)?,
234                row.get::<_, Option<String>>(4)?,
235                row.get::<_, bool>(5).unwrap_or(false),
236            ))
237        });
238
239        if let Ok((session_id, node_type, label, properties, current_vc_json, sync_enabled)) =
240            result
241        {
242            if sync_enabled {
243                let mut vector_clock = if let Some(vc_json) = current_vc_json {
244                    VectorClock::from_json(&vc_json).unwrap_or_else(|_| VectorClock::new())
245                } else {
246                    VectorClock::new()
247                };
248                vector_clock.increment(&self.instance_id);
249                let vc_json = vector_clock.to_json()?;
250
251                conn.execute(
252                    "INSERT INTO graph_tombstones
253                     (session_id, entity_type, entity_id, deleted_by, vector_clock)
254                     VALUES (?, ?, ?, ?, ?)",
255                    params![session_id, "node", node_id, self.instance_id, vc_json],
256                )?;
257
258                let node_data = serde_json::json!({
259                    "id": node_id,
260                    "session_id": session_id,
261                    "node_type": node_type,
262                    "label": label,
263                    "properties": properties,
264                });
265
266                self.graph_changelog_append(
267                    &session_id,
268                    &self.instance_id,
269                    "node",
270                    node_id,
271                    "delete",
272                    &vc_json,
273                    Some(&node_data.to_string()),
274                )?;
275            }
276        }
277
278        conn.execute("DELETE FROM graph_nodes WHERE id = ?", params![node_id])?;
279        Ok(())
280    }
281
282    // ---------- Graph Edge Operations ----------
283
284    pub fn insert_graph_edge(
285        &self,
286        session_id: &str,
287        source_id: i64,
288        target_id: i64,
289        edge_type: EdgeType,
290        predicate: Option<&str>,
291        properties: Option<&JsonValue>,
292        weight: f32,
293    ) -> Result<i64> {
294        let sync_enabled = self
295            .graph_get_sync_enabled(session_id, "default")
296            .unwrap_or(false);
297
298        let mut vector_clock = VectorClock::new();
299        vector_clock.increment(&self.instance_id);
300        let vc_json = vector_clock.to_json()?;
301
302        let conn = self.conn();
303
304        let mut stmt = conn.prepare(
305            "INSERT INTO graph_edges (session_id, source_id, target_id, edge_type, predicate, properties, weight,
306                                     vector_clock, last_modified_by, sync_enabled)
307             VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?) RETURNING id",
308        )?;
309        let props_str = properties.map(|p| p.to_string());
310        let id: i64 = stmt.query_row(
311            params![
312                session_id,
313                source_id,
314                target_id,
315                edge_type.as_str(),
316                predicate,
317                props_str,
318                weight,
319                vc_json,
320                self.instance_id,
321                sync_enabled,
322            ],
323            |row| row.get(0),
324        )?;
325
326        if sync_enabled {
327            let edge_data = serde_json::json!({
328                "id": id,
329                "session_id": session_id,
330                "source_id": source_id,
331                "target_id": target_id,
332                "edge_type": edge_type.as_str(),
333                "predicate": predicate,
334                "properties": properties,
335                "weight": weight,
336            });
337
338            self.graph_changelog_append(
339                session_id,
340                &self.instance_id,
341                "edge",
342                id,
343                "insert",
344                &vc_json,
345                Some(&edge_data.to_string()),
346            )?;
347        }
348
349        Ok(id)
350    }
351
352    pub fn get_graph_edge(&self, edge_id: i64) -> Result<Option<GraphEdge>> {
353        let conn = self.conn();
354        let mut stmt = conn.prepare(
355            "SELECT id, session_id, source_id, target_id, edge_type, predicate, properties, weight,
356                    CAST(temporal_start AS TEXT), CAST(temporal_end AS TEXT), CAST(created_at AS TEXT)
357             FROM graph_edges WHERE id = ?",
358        )?;
359        let mut rows = stmt.query(params![edge_id])?;
360        if let Some(row) = rows.next()? {
361            Ok(Some(Self::row_to_graph_edge(row)?))
362        } else {
363            Ok(None)
364        }
365    }
366
367    pub fn list_graph_edges(
368        &self,
369        session_id: &str,
370        source_id: Option<i64>,
371        target_id: Option<i64>,
372    ) -> Result<Vec<GraphEdge>> {
373        let conn = self.conn();
374
375        let edges = match (source_id, target_id) {
376            (Some(src), Some(tgt)) => {
377                let mut stmt = conn.prepare(
378                    "SELECT id, session_id, source_id, target_id, edge_type, predicate, properties, weight,
379                            CAST(temporal_start AS TEXT), CAST(temporal_end AS TEXT), CAST(created_at AS TEXT)
380                     FROM graph_edges WHERE session_id = ? AND source_id = ? AND target_id = ?",
381                )?;
382                let query = stmt.query(params![session_id, src, tgt])?;
383                Self::collect_graph_edges(query)?
384            }
385            (Some(src), None) => {
386                let mut stmt = conn.prepare(
387                    "SELECT id, session_id, source_id, target_id, edge_type, predicate, properties, weight,
388                            CAST(temporal_start AS TEXT), CAST(temporal_end AS TEXT), CAST(created_at AS TEXT)
389                     FROM graph_edges WHERE session_id = ? AND source_id = ?",
390                )?;
391                let query = stmt.query(params![session_id, src])?;
392                Self::collect_graph_edges(query)?
393            }
394            (None, Some(tgt)) => {
395                let mut stmt = conn.prepare(
396                    "SELECT id, session_id, source_id, target_id, edge_type, predicate, properties, weight,
397                            CAST(temporal_start AS TEXT), CAST(temporal_end AS TEXT), CAST(created_at AS TEXT)
398                     FROM graph_edges WHERE session_id = ? AND target_id = ?",
399                )?;
400                let query = stmt.query(params![session_id, tgt])?;
401                Self::collect_graph_edges(query)?
402            }
403            (None, None) => {
404                let mut stmt = conn.prepare(
405                    "SELECT id, session_id, source_id, target_id, edge_type, predicate, properties, weight,
406                            CAST(temporal_start AS TEXT), CAST(temporal_end AS TEXT), CAST(created_at AS TEXT)
407                     FROM graph_edges WHERE session_id = ?",
408                )?;
409                let query = stmt.query(params![session_id])?;
410                Self::collect_graph_edges(query)?
411            }
412        };
413
414        Ok(edges)
415    }
416
417    pub fn count_graph_edges(&self, session_id: &str) -> Result<i64> {
418        let conn = self.conn();
419        let mut stmt = conn.prepare("SELECT COUNT(*) FROM graph_edges WHERE session_id = ?")?;
420        let count: i64 = stmt.query_row(params![session_id], |row| row.get(0))?;
421        Ok(count)
422    }
423
424    pub fn delete_graph_edge(&self, edge_id: i64) -> Result<()> {
425        let conn = self.conn();
426
427        let mut stmt = conn.prepare(
428            "SELECT session_id, source_id, target_id, edge_type, predicate, properties, weight,
429                    vector_clock, sync_enabled
430             FROM graph_edges WHERE id = ?",
431        )?;
432
433        let result = stmt.query_row(params![edge_id], |row| {
434            Ok((
435                row.get::<_, String>(0)?,
436                row.get::<_, i64>(1)?,
437                row.get::<_, i64>(2)?,
438                row.get::<_, String>(3)?,
439                row.get::<_, Option<String>>(4)?,
440                row.get::<_, Option<String>>(5)?,
441                row.get::<_, f32>(6)?,
442                row.get::<_, Option<String>>(7)?,
443                row.get::<_, bool>(8).unwrap_or(false),
444            ))
445        });
446
447        if let Ok((
448            session_id,
449            source_id,
450            target_id,
451            edge_type,
452            predicate,
453            properties,
454            weight,
455            current_vc_json,
456            sync_enabled,
457        )) = result
458        {
459            if sync_enabled {
460                let mut vector_clock = if let Some(vc_json) = current_vc_json {
461                    VectorClock::from_json(&vc_json).unwrap_or_else(|_| VectorClock::new())
462                } else {
463                    VectorClock::new()
464                };
465                vector_clock.increment(&self.instance_id);
466                let vc_json = vector_clock.to_json()?;
467
468                conn.execute(
469                    "INSERT INTO graph_tombstones
470                     (session_id, entity_type, entity_id, deleted_by, vector_clock)
471                     VALUES (?, ?, ?, ?, ?)",
472                    params![session_id, "edge", edge_id, self.instance_id, vc_json],
473                )?;
474
475                let edge_data = serde_json::json!({
476                    "id": edge_id,
477                    "session_id": session_id,
478                    "source_id": source_id,
479                    "target_id": target_id,
480                    "edge_type": edge_type,
481                    "predicate": predicate,
482                    "properties": properties,
483                    "weight": weight,
484                });
485
486                self.graph_changelog_append(
487                    &session_id,
488                    &self.instance_id,
489                    "edge",
490                    edge_id,
491                    "delete",
492                    &vc_json,
493                    Some(&edge_data.to_string()),
494                )?;
495            }
496        }
497
498        conn.execute("DELETE FROM graph_edges WHERE id = ?", params![edge_id])?;
499        Ok(())
500    }
501
502    // ---------- Graph Traversal Operations ----------
503
504    pub fn find_shortest_path(
505        &self,
506        session_id: &str,
507        source_id: i64,
508        target_id: i64,
509        max_hops: Option<usize>,
510    ) -> Result<Option<GraphPath>> {
511        let max_depth = max_hops.unwrap_or(10);
512
513        let mut visited = HashSet::new();
514        let mut queue = VecDeque::new();
515        let mut parent_map = HashMap::new();
516
517        queue.push_back((source_id, 0));
518        visited.insert(source_id);
519
520        while let Some((current_id, depth)) = queue.pop_front() {
521            if current_id == target_id {
522                let path = self.reconstruct_path(&parent_map, source_id, target_id)?;
523                return Ok(Some(path));
524            }
525
526            if depth >= max_depth {
527                continue;
528            }
529
530            let edges = self.list_graph_edges(session_id, Some(current_id), None)?;
531            for edge in edges {
532                let target = edge.target_id;
533                if !visited.contains(&target) {
534                    visited.insert(target);
535                    parent_map.insert(target, (current_id, edge));
536                    queue.push_back((target, depth + 1));
537                }
538            }
539        }
540
541        Ok(None)
542    }
543
544    pub fn traverse_neighbors(
545        &self,
546        session_id: &str,
547        node_id: i64,
548        direction: TraversalDirection,
549        depth: usize,
550    ) -> Result<Vec<GraphNode>> {
551        if depth == 0 {
552            return Ok(vec![]);
553        }
554
555        let mut visited = HashSet::new();
556        let mut result = Vec::new();
557        let mut queue = VecDeque::new();
558
559        queue.push_back((node_id, 0));
560        visited.insert(node_id);
561
562        while let Some((current_id, current_depth)) = queue.pop_front() {
563            if current_depth > 0 {
564                if let Some(node) = self.get_graph_node(current_id)? {
565                    result.push(node);
566                }
567            }
568
569            if current_depth >= depth {
570                continue;
571            }
572
573            let edges = match direction {
574                TraversalDirection::Outgoing => {
575                    self.list_graph_edges(session_id, Some(current_id), None)?
576                }
577                TraversalDirection::Incoming => {
578                    self.list_graph_edges(session_id, None, Some(current_id))?
579                }
580                TraversalDirection::Both => {
581                    let mut out_edges =
582                        self.list_graph_edges(session_id, Some(current_id), None)?;
583                    let in_edges = self.list_graph_edges(session_id, None, Some(current_id))?;
584                    out_edges.extend(in_edges);
585                    out_edges
586                }
587            };
588
589            for edge in edges {
590                let next_id = match direction {
591                    TraversalDirection::Outgoing => edge.target_id,
592                    TraversalDirection::Incoming => edge.source_id,
593                    TraversalDirection::Both => {
594                        if edge.source_id == current_id {
595                            edge.target_id
596                        } else {
597                            edge.source_id
598                        }
599                    }
600                };
601
602                if !visited.contains(&next_id) {
603                    visited.insert(next_id);
604                    queue.push_back((next_id, current_depth + 1));
605                }
606            }
607        }
608
609        Ok(result)
610    }
611
612    fn row_to_graph_node(row: &duckdb::Row) -> Result<GraphNode> {
613        let id: i64 = row.get(0)?;
614        let session_id: String = row.get(1)?;
615        let node_type: String = row.get(2)?;
616        let label: String = row.get(3)?;
617        let properties: String = row.get(4)?;
618        let embedding_id: Option<i64> = row.get(5)?;
619        let created_at: String = row.get(6)?;
620        let updated_at: String = row.get(7)?;
621
622        Ok(GraphNode {
623            id,
624            session_id,
625            node_type: NodeType::from_str(&node_type),
626            label,
627            properties: serde_json::from_str(&properties).unwrap_or(JsonValue::Null),
628            embedding_id,
629            created_at: created_at.parse().unwrap_or_else(|_| Utc::now()),
630            updated_at: updated_at.parse().unwrap_or_else(|_| Utc::now()),
631        })
632    }
633
634    fn row_to_graph_edge(row: &duckdb::Row) -> Result<GraphEdge> {
635        let id: i64 = row.get(0)?;
636        let session_id: String = row.get(1)?;
637        let source_id: i64 = row.get(2)?;
638        let target_id: i64 = row.get(3)?;
639        let edge_type: String = row.get(4)?;
640        let predicate: Option<String> = row.get(5)?;
641        let properties: Option<String> = row.get(6)?;
642        let weight: f32 = row.get(7)?;
643        let temporal_start: Option<String> = row.get(8)?;
644        let temporal_end: Option<String> = row.get(9)?;
645        let created_at: String = row.get(10)?;
646
647        Ok(GraphEdge {
648            id,
649            session_id,
650            source_id,
651            target_id,
652            edge_type: EdgeType::from_str(&edge_type),
653            predicate,
654            properties: properties.and_then(|p| serde_json::from_str(&p).ok()),
655            weight,
656            temporal_start: temporal_start.and_then(|s| s.parse().ok()),
657            temporal_end: temporal_end.and_then(|s| s.parse().ok()),
658            created_at: created_at.parse().unwrap_or_else(|_| Utc::now()),
659        })
660    }
661
662    fn collect_graph_nodes(mut rows: duckdb::Rows) -> Result<Vec<GraphNode>> {
663        let mut nodes = Vec::new();
664        while let Some(row) = rows.next()? {
665            nodes.push(Self::row_to_graph_node(row)?);
666        }
667        Ok(nodes)
668    }
669
670    fn collect_graph_edges(mut rows: duckdb::Rows) -> Result<Vec<GraphEdge>> {
671        let mut edges = Vec::new();
672        while let Some(row) = rows.next()? {
673            edges.push(Self::row_to_graph_edge(row)?);
674        }
675        Ok(edges)
676    }
677
678    fn reconstruct_path(
679        &self,
680        parent_map: &HashMap<i64, (i64, GraphEdge)>,
681        source_id: i64,
682        target_id: i64,
683    ) -> Result<GraphPath> {
684        let mut path_edges = Vec::new();
685        let mut path_nodes = Vec::new();
686        let mut current = target_id;
687        let mut total_weight = 0.0;
688
689        while current != source_id {
690            if let Some((parent, edge)) = parent_map.get(&current) {
691                path_edges.push(edge.clone());
692                total_weight += edge.weight;
693                current = *parent;
694            } else {
695                break;
696            }
697        }
698
699        path_edges.reverse();
700
701        if let Some(node) = self.get_graph_node(source_id)? {
702            path_nodes.push(node);
703        }
704        for edge in &path_edges {
705            if let Some(node) = self.get_graph_node(edge.target_id)? {
706                path_nodes.push(node);
707            }
708        }
709
710        Ok(GraphPath {
711            length: path_edges.len(),
712            weight: total_weight,
713            nodes: path_nodes,
714            edges: path_edges,
715        })
716    }
717
718    // ===== Graph Synchronization Methods =====
719
720    pub fn graph_changelog_append(
721        &self,
722        session_id: &str,
723        instance_id: &str,
724        entity_type: &str,
725        entity_id: i64,
726        operation: &str,
727        vector_clock: &str,
728        data: Option<&str>,
729    ) -> Result<i64> {
730        let conn = self.conn();
731        conn.execute(
732            "INSERT INTO graph_changelog (session_id, instance_id, entity_type, entity_id, operation, vector_clock, data)
733             VALUES (?, ?, ?, ?, ?, ?, ?)",
734            params![session_id, instance_id, entity_type, entity_id, operation, vector_clock, data],
735        )?;
736        let id: i64 = conn.query_row("SELECT last_insert_rowid()", params![], |row| row.get(0))?;
737        Ok(id)
738    }
739
740    pub fn graph_changelog_get_since(
741        &self,
742        session_id: &str,
743        since_timestamp: &str,
744    ) -> Result<Vec<ChangelogEntry>> {
745        let conn = self.conn();
746        let mut stmt = conn.prepare(
747            "SELECT id, session_id, instance_id, entity_type, entity_id, operation, vector_clock, data, CAST(created_at AS TEXT)
748             FROM graph_changelog
749             WHERE session_id = ? AND created_at > ?
750             ORDER BY created_at ASC",
751        )?;
752        let mut rows = stmt.query(params![session_id, since_timestamp])?;
753        let mut entries = Vec::new();
754        while let Some(row) = rows.next()? {
755            entries.push(ChangelogEntry::from_row(row)?);
756        }
757        Ok(entries)
758    }
759
760    pub fn graph_changelog_prune(&self, days_to_keep: i64) -> Result<usize> {
761        let conn = self.conn();
762        let cutoff = Utc::now() - Duration::days(days_to_keep);
763        let cutoff_str = cutoff.to_rfc3339();
764        let deleted = conn.execute(
765            "DELETE FROM graph_changelog WHERE created_at < ?",
766            params![cutoff_str],
767        )?;
768        Ok(deleted)
769    }
770
771    pub fn graph_sync_state_get(
772        &self,
773        instance_id: &str,
774        session_id: &str,
775        graph_name: &str,
776    ) -> Result<Option<String>> {
777        let conn = self.conn();
778        let result: Result<String, _> = conn.query_row(
779            "SELECT vector_clock FROM graph_sync_state WHERE instance_id = ? AND session_id = ? AND graph_name = ?",
780            params![instance_id, session_id, graph_name],
781            |row| row.get(0),
782        );
783        match result {
784            Ok(vc) => Ok(Some(vc)),
785            Err(duckdb::Error::QueryReturnedNoRows) => Ok(None),
786            Err(e) => Err(e.into()),
787        }
788    }
789
790    pub fn graph_sync_state_update(
791        &self,
792        instance_id: &str,
793        session_id: &str,
794        graph_name: &str,
795        vector_clock: &str,
796    ) -> Result<()> {
797        let conn = self.conn();
798        conn.execute("BEGIN TRANSACTION", params![])?;
799        conn.execute(
800            "DELETE FROM graph_sync_state WHERE instance_id = ? AND session_id = ? AND graph_name = ?",
801            params![instance_id, session_id, graph_name],
802        )?;
803        conn.execute(
804            "INSERT INTO graph_sync_state (instance_id, session_id, graph_name, vector_clock) VALUES (?, ?, ?, ?)",
805            params![instance_id, session_id, graph_name, vector_clock],
806        )?;
807        conn.execute("COMMIT", params![])?;
808        Ok(())
809    }
810
811    pub fn graph_set_sync_enabled(
812        &self,
813        session_id: &str,
814        graph_name: &str,
815        enabled: bool,
816    ) -> Result<()> {
817        let conn = self.conn();
818        // Upsert: insert if not exists, update if exists
819        conn.execute(
820            "INSERT INTO graph_metadata (session_id, graph_name, sync_enabled)
821             VALUES (?, ?, ?)
822             ON CONFLICT (session_id, graph_name) DO UPDATE SET sync_enabled = EXCLUDED.sync_enabled",
823            params![session_id, graph_name, enabled],
824        )?;
825        Ok(())
826    }
827
828    pub fn graph_get_sync_enabled(&self, session_id: &str, graph_name: &str) -> Result<bool> {
829        let conn = self.conn();
830        let result: Result<bool, _> = conn.query_row(
831            "SELECT sync_enabled FROM graph_metadata WHERE session_id = ? AND graph_name = ?",
832            params![session_id, graph_name],
833            |row| row.get(0),
834        );
835        match result {
836            Ok(enabled) => Ok(enabled),
837            Err(duckdb::Error::QueryReturnedNoRows) => Ok(false),
838            Err(e) => Err(e.into()),
839        }
840    }
841
842    pub fn graph_list(&self, session_id: &str) -> Result<Vec<String>> {
843        let conn = self.conn();
844        let mut stmt = conn.prepare(
845            "SELECT DISTINCT graph_name FROM graph_metadata WHERE session_id = ?
846             UNION
847             SELECT DISTINCT 'default' as graph_name
848             FROM graph_nodes WHERE session_id = ?
849             ORDER BY graph_name",
850        )?;
851
852        let mut graphs = Vec::new();
853        let mut rows = stmt.query(params![session_id, session_id])?;
854        while let Some(row) = rows.next()? {
855            let graph_name: String = row.get(0)?;
856            graphs.push(graph_name);
857        }
858
859        if graphs.is_empty() {
860            let node_count: i64 = conn.query_row(
861                "SELECT COUNT(*) FROM graph_nodes WHERE session_id = ?",
862                params![session_id],
863                |row| row.get(0),
864            )?;
865            if node_count > 0 {
866                graphs.push("default".to_string());
867            }
868        }
869
870        Ok(graphs)
871    }
872
873    /// List all sync-enabled graphs across all sessions
874    pub fn graph_list_sync_enabled(&self) -> Result<Vec<(String, String)>> {
875        let conn = self.conn();
876        let mut stmt = conn.prepare(
877            "SELECT session_id, graph_name FROM graph_metadata WHERE sync_enabled = TRUE ORDER BY session_id, graph_name",
878        )?;
879
880        let mut results = Vec::new();
881        let mut rows = stmt.query(params![])?;
882        while let Some(row) = rows.next()? {
883            let session_id: String = row.get(0)?;
884            let graph_name: String = row.get(1)?;
885            results.push((session_id, graph_name));
886        }
887
888        Ok(results)
889    }
890
891    pub fn graph_get_node_with_sync(&self, node_id: i64) -> Result<Option<SyncedNodeRecord>> {
892        let conn = self.conn();
893        let result: Result<SyncedNodeRecord, _> = conn.query_row(
894            "SELECT id, session_id, node_type, label, properties, embedding_id,
895                    CAST(created_at AS TEXT), CAST(updated_at AS TEXT),
896                    COALESCE(vector_clock, '{}'), last_modified_by, is_deleted, sync_enabled
897             FROM graph_nodes WHERE id = ?",
898            params![node_id],
899            SyncedNodeRecord::from_row,
900        );
901        match result {
902            Ok(node) => Ok(Some(node)),
903            Err(duckdb::Error::QueryReturnedNoRows) => Ok(None),
904            Err(e) => Err(e.into()),
905        }
906    }
907
908    pub fn graph_list_nodes_with_sync(
909        &self,
910        session_id: &str,
911        sync_enabled_only: bool,
912        include_deleted: bool,
913    ) -> Result<Vec<SyncedNodeRecord>> {
914        let conn = self.conn();
915        let mut query = String::from(
916            "SELECT id, session_id, node_type, label, properties, embedding_id,
917                    CAST(created_at AS TEXT), CAST(updated_at AS TEXT),
918                    COALESCE(vector_clock, '{}'), last_modified_by, is_deleted, sync_enabled
919             FROM graph_nodes WHERE session_id = ?",
920        );
921
922        if sync_enabled_only {
923            query.push_str(" AND sync_enabled = TRUE");
924        }
925        if !include_deleted {
926            query.push_str(" AND is_deleted = FALSE");
927        }
928        query.push_str(" ORDER BY created_at ASC");
929
930        let mut stmt = conn.prepare(&query)?;
931        let mut rows = stmt.query(params![session_id])?;
932        let mut nodes = Vec::new();
933        while let Some(row) = rows.next()? {
934            nodes.push(SyncedNodeRecord::from_row(row)?);
935        }
936        Ok(nodes)
937    }
938
939    pub fn graph_get_edge_with_sync(&self, edge_id: i64) -> Result<Option<SyncedEdgeRecord>> {
940        let conn = self.conn();
941        let result: Result<SyncedEdgeRecord, _> = conn.query_row(
942            "SELECT id, session_id, source_id, target_id, edge_type, predicate, properties, weight,
943                    CAST(temporal_start AS TEXT), CAST(temporal_end AS TEXT), CAST(created_at AS TEXT),
944                    COALESCE(vector_clock, '{}'), last_modified_by, is_deleted, sync_enabled
945             FROM graph_edges WHERE id = ?",
946            params![edge_id],
947            SyncedEdgeRecord::from_row,
948        );
949        match result {
950            Ok(edge) => Ok(Some(edge)),
951            Err(duckdb::Error::QueryReturnedNoRows) => Ok(None),
952            Err(e) => Err(e.into()),
953        }
954    }
955
956    pub fn graph_list_edges_with_sync(
957        &self,
958        session_id: &str,
959        sync_enabled_only: bool,
960        include_deleted: bool,
961    ) -> Result<Vec<SyncedEdgeRecord>> {
962        let conn = self.conn();
963        let mut query = String::from(
964            "SELECT id, session_id, source_id, target_id, edge_type, predicate, properties, weight,
965                    CAST(temporal_start AS TEXT), CAST(temporal_end AS TEXT), CAST(created_at AS TEXT),
966                    COALESCE(vector_clock, '{}'), last_modified_by, is_deleted, sync_enabled
967             FROM graph_edges WHERE session_id = ?",
968        );
969
970        if sync_enabled_only {
971            query.push_str(" AND sync_enabled = TRUE");
972        }
973        if !include_deleted {
974            query.push_str(" AND is_deleted = FALSE");
975        }
976        query.push_str(" ORDER BY created_at ASC");
977
978        let mut stmt = conn.prepare(&query)?;
979        let mut rows = stmt.query(params![session_id])?;
980        let mut edges = Vec::new();
981        while let Some(row) = rows.next()? {
982            edges.push(SyncedEdgeRecord::from_row(row)?);
983        }
984        Ok(edges)
985    }
986
987    pub fn graph_update_node_sync_metadata(
988        &self,
989        node_id: i64,
990        vector_clock: &str,
991        last_modified_by: &str,
992        sync_enabled: bool,
993    ) -> Result<()> {
994        let conn = self.conn();
995        conn.execute(
996            "UPDATE graph_nodes SET vector_clock = ?, last_modified_by = ?, sync_enabled = ?, updated_at = CURRENT_TIMESTAMP
997             WHERE id = ?",
998            params![vector_clock, last_modified_by, sync_enabled, node_id],
999        )?;
1000        Ok(())
1001    }
1002
1003    pub fn graph_update_edge_sync_metadata(
1004        &self,
1005        edge_id: i64,
1006        vector_clock: &str,
1007        last_modified_by: &str,
1008        sync_enabled: bool,
1009    ) -> Result<()> {
1010        let conn = self.conn();
1011        conn.execute(
1012            "UPDATE graph_edges SET vector_clock = ?, last_modified_by = ?, sync_enabled = ?
1013             WHERE id = ?",
1014            params![vector_clock, last_modified_by, sync_enabled, edge_id],
1015        )?;
1016        Ok(())
1017    }
1018
1019    pub fn graph_mark_node_deleted(
1020        &self,
1021        node_id: i64,
1022        vector_clock: &str,
1023        deleted_by: &str,
1024    ) -> Result<()> {
1025        let conn = self.conn();
1026        conn.execute(
1027            "UPDATE graph_nodes SET is_deleted = TRUE, vector_clock = ?, last_modified_by = ?, updated_at = CURRENT_TIMESTAMP
1028             WHERE id = ?",
1029            params![vector_clock, deleted_by, node_id],
1030        )?;
1031        Ok(())
1032    }
1033
1034    pub fn graph_mark_edge_deleted(
1035        &self,
1036        edge_id: i64,
1037        vector_clock: &str,
1038        deleted_by: &str,
1039    ) -> Result<()> {
1040        let conn = self.conn();
1041        conn.execute(
1042            "UPDATE graph_edges SET is_deleted = TRUE, vector_clock = ?, last_modified_by = ?
1043             WHERE id = ?",
1044            params![vector_clock, deleted_by, edge_id],
1045        )?;
1046        Ok(())
1047    }
1048}
1049
1050#[derive(Debug, Clone)]
1051pub struct ChangelogEntry {
1052    pub id: i64,
1053    pub session_id: String,
1054    pub instance_id: String,
1055    pub entity_type: String,
1056    pub entity_id: i64,
1057    pub operation: String,
1058    pub vector_clock: String,
1059    pub data: Option<String>,
1060    pub created_at: DateTime<Utc>,
1061}
1062
1063impl ChangelogEntry {
1064    fn from_row(row: &duckdb::Row) -> Result<Self, duckdb::Error> {
1065        let id: i64 = row.get(0)?;
1066        let session_id: String = row.get(1)?;
1067        let instance_id: String = row.get(2)?;
1068        let entity_type: String = row.get(3)?;
1069        let entity_id: i64 = row.get(4)?;
1070        let operation: String = row.get(5)?;
1071        let vector_clock: String = row.get(6)?;
1072        let data: Option<String> = row.get(7)?;
1073        let created_at_str: String = row.get(8)?;
1074
1075        Ok(ChangelogEntry {
1076            id,
1077            session_id,
1078            instance_id,
1079            entity_type,
1080            entity_id,
1081            operation,
1082            vector_clock,
1083            data,
1084            created_at: created_at_str.parse().unwrap_or_else(|_| Utc::now()),
1085        })
1086    }
1087}
1088
1089#[derive(Debug, Clone)]
1090pub struct SyncedNodeRecord {
1091    pub id: i64,
1092    pub session_id: String,
1093    pub node_type: String,
1094    pub label: String,
1095    pub properties: JsonValue,
1096    pub embedding_id: Option<i64>,
1097    pub created_at: DateTime<Utc>,
1098    pub updated_at: DateTime<Utc>,
1099    pub vector_clock: String,
1100    pub last_modified_by: Option<String>,
1101    pub is_deleted: bool,
1102    pub sync_enabled: bool,
1103}
1104
1105impl SyncedNodeRecord {
1106    fn from_row(row: &duckdb::Row) -> Result<Self, duckdb::Error> {
1107        let id: i64 = row.get(0)?;
1108        let session_id: String = row.get(1)?;
1109        let node_type: String = row.get(2)?;
1110        let label: String = row.get(3)?;
1111        let properties_str: String = row.get(4)?;
1112        let properties: JsonValue = serde_json::from_str(&properties_str).map_err(|e| {
1113            duckdb::Error::FromSqlConversionFailure(4, duckdb::types::Type::Text, Box::new(e))
1114        })?;
1115        let embedding_id: Option<i64> = row.get(5)?;
1116        let created_at_str: String = row.get(6)?;
1117        let updated_at_str: String = row.get(7)?;
1118        let vector_clock: String = row.get(8)?;
1119        let last_modified_by: Option<String> = row.get(9)?;
1120        let is_deleted: bool = row.get(10)?;
1121        let sync_enabled: bool = row.get(11)?;
1122
1123        Ok(SyncedNodeRecord {
1124            id,
1125            session_id,
1126            node_type,
1127            label,
1128            properties,
1129            embedding_id,
1130            created_at: created_at_str.parse().unwrap_or_else(|_| Utc::now()),
1131            updated_at: updated_at_str.parse().unwrap_or_else(|_| Utc::now()),
1132            vector_clock,
1133            last_modified_by,
1134            is_deleted,
1135            sync_enabled,
1136        })
1137    }
1138}
1139
1140#[derive(Debug, Clone)]
1141pub struct SyncedEdgeRecord {
1142    pub id: i64,
1143    pub session_id: String,
1144    pub source_id: i64,
1145    pub target_id: i64,
1146    pub edge_type: String,
1147    pub predicate: Option<String>,
1148    pub properties: Option<JsonValue>,
1149    pub weight: f32,
1150    pub temporal_start: Option<DateTime<Utc>>,
1151    pub temporal_end: Option<DateTime<Utc>>,
1152    pub created_at: DateTime<Utc>,
1153    pub vector_clock: String,
1154    pub last_modified_by: Option<String>,
1155    pub is_deleted: bool,
1156    pub sync_enabled: bool,
1157}
1158
1159impl SyncedEdgeRecord {
1160    fn from_row(row: &duckdb::Row) -> Result<Self, duckdb::Error> {
1161        let id: i64 = row.get(0)?;
1162        let session_id: String = row.get(1)?;
1163        let source_id: i64 = row.get(2)?;
1164        let target_id: i64 = row.get(3)?;
1165        let edge_type: String = row.get(4)?;
1166        let predicate: Option<String> = row.get(5)?;
1167        let properties_str: Option<String> = row.get(6)?;
1168        let properties: Option<JsonValue> = properties_str
1169            .as_ref()
1170            .and_then(|s| serde_json::from_str(s).ok());
1171        let weight: f32 = row.get(7)?;
1172        let temporal_start_str: Option<String> = row.get(8)?;
1173        let temporal_end_str: Option<String> = row.get(9)?;
1174        let created_at_str: String = row.get(10)?;
1175        let vector_clock: String = row.get(11)?;
1176        let last_modified_by: Option<String> = row.get(12)?;
1177        let is_deleted: bool = row.get(13)?;
1178        let sync_enabled: bool = row.get(14)?;
1179
1180        Ok(SyncedEdgeRecord {
1181            id,
1182            session_id,
1183            source_id,
1184            target_id,
1185            edge_type,
1186            predicate,
1187            properties,
1188            weight,
1189            temporal_start: temporal_start_str.and_then(|s| s.parse().ok()),
1190            temporal_end: temporal_end_str.and_then(|s| s.parse().ok()),
1191            created_at: created_at_str.parse().unwrap_or_else(|_| Utc::now()),
1192            vector_clock,
1193            last_modified_by,
1194            is_deleted,
1195            sync_enabled,
1196        })
1197    }
1198}
1199
#[cfg(test)]
mod tests {
    use super::*;
    use anyhow::Result;
    use serde_json::json;

    /// Creates a store backed by a fresh in-memory database with the graph
    /// schema installed and no extra setup.
    fn setup_store() -> KnowledgeGraphStore {
        setup_store_with(|_| {})
    }

    /// Creates a store backed by a fresh in-memory database, installs the
    /// graph schema, then lets `extra` run additional statements on the raw
    /// connection before it is wrapped in the store.
    fn setup_store_with<F>(extra: F) -> KnowledgeGraphStore
    where
        F: FnOnce(&Connection),
    {
        let conn = Connection::open_in_memory().expect("open in-memory database");
        // `graph_metadata` carries a UNIQUE (session_id, graph_name) key
        // because the store's sync-flag upsert names that pair as its
        // ON CONFLICT target; without the key the statement cannot bind.
        conn.execute_batch(
            r#"
            CREATE SEQUENCE IF NOT EXISTS graph_nodes_id_seq START 1;
            CREATE SEQUENCE IF NOT EXISTS graph_edges_id_seq START 1;
            CREATE SEQUENCE IF NOT EXISTS graph_metadata_id_seq START 1;
            CREATE SEQUENCE IF NOT EXISTS graph_changelog_id_seq START 1;
            CREATE SEQUENCE IF NOT EXISTS graph_tombstones_id_seq START 1;

            CREATE TABLE graph_nodes (
                id BIGINT PRIMARY KEY DEFAULT nextval('graph_nodes_id_seq'),
                session_id TEXT NOT NULL,
                node_type TEXT NOT NULL,
                label TEXT NOT NULL,
                properties TEXT NOT NULL,
                embedding_id BIGINT,
                created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                vector_clock TEXT DEFAULT '{}',
                last_modified_by TEXT,
                is_deleted BOOLEAN DEFAULT FALSE,
                sync_enabled BOOLEAN DEFAULT FALSE
            );

            CREATE TABLE graph_edges (
                id BIGINT PRIMARY KEY DEFAULT nextval('graph_edges_id_seq'),
                session_id TEXT NOT NULL,
                source_id BIGINT NOT NULL,
                target_id BIGINT NOT NULL,
                edge_type TEXT NOT NULL,
                predicate TEXT,
                properties TEXT,
                weight REAL DEFAULT 1.0,
                temporal_start TIMESTAMP,
                temporal_end TIMESTAMP,
                created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                vector_clock TEXT DEFAULT '{}',
                last_modified_by TEXT,
                is_deleted BOOLEAN DEFAULT FALSE,
                sync_enabled BOOLEAN DEFAULT FALSE
            );

            CREATE TABLE graph_metadata (
                id BIGINT PRIMARY KEY DEFAULT nextval('graph_metadata_id_seq'),
                session_id TEXT NOT NULL,
                graph_name TEXT NOT NULL,
                sync_enabled BOOLEAN DEFAULT FALSE,
                UNIQUE (session_id, graph_name)
            );

            CREATE TABLE graph_changelog (
                id BIGINT PRIMARY KEY DEFAULT nextval('graph_changelog_id_seq'),
                session_id TEXT NOT NULL,
                instance_id TEXT NOT NULL,
                entity_type TEXT NOT NULL,
                entity_id BIGINT NOT NULL,
                operation TEXT NOT NULL,
                vector_clock TEXT NOT NULL,
                data TEXT,
                created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
            );

            CREATE TABLE graph_sync_state (
                instance_id TEXT NOT NULL,
                session_id TEXT NOT NULL,
                graph_name TEXT NOT NULL,
                vector_clock TEXT NOT NULL,
                last_sync_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
            );

            CREATE TABLE graph_tombstones (
                id BIGINT PRIMARY KEY DEFAULT nextval('graph_tombstones_id_seq'),
                session_id TEXT NOT NULL,
                entity_type TEXT NOT NULL,
                entity_id BIGINT NOT NULL,
                deleted_by TEXT NOT NULL,
                vector_clock TEXT NOT NULL,
                created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
            );
            "#,
        )
        .expect("create graph schema");

        extra(&conn);

        KnowledgeGraphStore::from_connection(conn, "test-instance")
    }

    /// Inserting, updating and deleting a node is visible through the
    /// corresponding read methods.
    #[test]
    fn insert_update_delete_node_flow() -> Result<()> {
        let store = setup_store();
        let props = json!({ "kind": "repository" });
        let node_id =
            store.insert_graph_node("session", NodeType::Entity, "SpecAI", &props, None)?;

        let nodes = store.list_graph_nodes("session", None, Some(10))?;
        assert_eq!(nodes.len(), 1);
        assert_eq!(nodes[0].label, "SpecAI");

        let updated_props = json!({ "kind": "repository", "stars": 42 });
        store.update_graph_node(node_id, &updated_props)?;
        let updated = store.get_graph_node(node_id)?.expect("node exists");
        assert_eq!(updated.properties["stars"], 42);

        store.delete_graph_node(node_id)?;
        assert!(store.get_graph_node(node_id)?.is_none());
        Ok(())
    }

    /// A two-hop chain A -> B -> C is found as a path of three nodes and two
    /// edges.
    #[test]
    fn create_edges_and_find_paths() -> Result<()> {
        let store = setup_store();
        let a = store.insert_graph_node("session", NodeType::Entity, "A", &json!({}), None)?;
        let b = store.insert_graph_node("session", NodeType::Entity, "B", &json!({}), None)?;
        let c = store.insert_graph_node("session", NodeType::Entity, "C", &json!({}), None)?;

        store.insert_graph_edge("session", a, b, EdgeType::RelatesTo, None, None, 1.0)?;
        store.insert_graph_edge("session", b, c, EdgeType::RelatesTo, None, None, 1.0)?;

        let path = store
            .find_shortest_path("session", a, c, Some(5))?
            .expect("path exists");
        assert_eq!(path.nodes.len(), 3);
        assert_eq!(path.edges.len(), 2);
        assert_eq!(path.length, 2);
        assert_eq!(path.nodes.first().unwrap().label, "A");
        assert_eq!(path.nodes.last().unwrap().label, "C");

        let edges = store.list_graph_edges("session", None, None)?;
        assert_eq!(edges.len(), 2);

        Ok(())
    }

    /// Outgoing traversal from the head and incoming traversal from the tail
    /// of a chain each reach the other two nodes within depth 2.
    #[test]
    fn traverse_neighbors_respects_direction() -> Result<()> {
        let store = setup_store();
        let alpha =
            store.insert_graph_node("session", NodeType::Entity, "Alpha", &json!({}), None)?;
        let beta =
            store.insert_graph_node("session", NodeType::Entity, "Beta", &json!({}), None)?;
        let gamma =
            store.insert_graph_node("session", NodeType::Entity, "Gamma", &json!({}), None)?;

        store.insert_graph_edge("session", alpha, beta, EdgeType::RelatesTo, None, None, 1.0)?;
        store.insert_graph_edge("session", beta, gamma, EdgeType::RelatesTo, None, None, 1.0)?;

        let outgoing =
            store.traverse_neighbors("session", alpha, TraversalDirection::Outgoing, 2)?;
        assert_eq!(outgoing.len(), 2);
        assert!(outgoing.iter().any(|node| node.label == "Beta"));
        assert!(outgoing.iter().any(|node| node.label == "Gamma"));

        let incoming =
            store.traverse_neighbors("session", gamma, TraversalDirection::Incoming, 2)?;
        assert_eq!(incoming.len(), 2);
        assert!(incoming.iter().any(|node| node.label == "Beta"));
        assert!(incoming.iter().any(|node| node.label == "Alpha"));

        Ok(())
    }

    /// The fixture schema accepts the sync-flag upsert (insert, then
    /// conflict-update on the same key) and the read side reflects the final
    /// value.
    #[test]
    fn sync_metadata_upsert_is_visible_to_readers() -> Result<()> {
        let store = setup_store_with(|conn| {
            // Same statement shape the store uses to set the flag; the second
            // round hits the (session_id, graph_name) conflict path.
            for enabled in [false, true] {
                conn.execute(
                    "INSERT INTO graph_metadata (session_id, graph_name, sync_enabled)
                     VALUES (?, ?, ?)
                     ON CONFLICT (session_id, graph_name) DO UPDATE SET sync_enabled = EXCLUDED.sync_enabled",
                    params!["session", "default", enabled],
                )
                .expect("upsert graph metadata");
            }
        });

        assert!(store.graph_get_sync_enabled("session", "default")?);
        assert!(!store.graph_get_sync_enabled("session", "missing")?);
        assert_eq!(
            store.graph_list_sync_enabled()?,
            vec![("session".to_string(), "default".to_string())]
        );

        Ok(())
    }
}