// Source file: velesdb_core/collection/graph_collection.rs

//! `GraphCollection`: knowledge graph with optional node embeddings.
//!
//! # Design
//!
//! `GraphCollection` is a pure newtype over `Collection` (C-02).
//! All graph state (edge store, property/range indexes, node payloads, and the
//! optional HNSW index for node embeddings) lives inside the single
//! `inner: Collection`. The graph schema and embedding dimension are persisted
//! in `config.json`. There are no separate engine fields, so the same data is
//! never stored in two places that could drift out of sync.

use std::path::PathBuf;

use crate::collection::graph::{GraphEdge, GraphSchema, TraversalConfig, TraversalResult};
use crate::collection::types::Collection;
use crate::distance::DistanceMetric;
use crate::error::Result;
use crate::point::{Point, SearchResult};

19/// A graph collection storing typed relationships between nodes.
20///
21/// Node embeddings are optional: if `dimension` is `None`, no vector index is created.
22///
23/// # Examples
24///
25/// ```rust,no_run
26/// use velesdb_core::{GraphCollection, GraphSchema, GraphEdge, DistanceMetric};
27///
28/// let coll = GraphCollection::create(
29///     "./data/kg".into(),
30///     "knowledge",
31///     None,                    // no embeddings
32///     DistanceMetric::Cosine,  // unused when no embeddings
33///     GraphSchema::schemaless(),
34/// )?;
35///
36/// let edge = GraphEdge::new(1, 100, 200, "KNOWS")?;
37/// coll.add_edge(edge)?;
38/// # Ok::<(), velesdb_core::Error>(())
39/// ```
40#[derive(Clone)]
41pub struct GraphCollection {
42    /// Single source of truth — all graph state lives here (C-02 pure newtype).
43    pub(crate) inner: Collection,
44}
45
46impl GraphCollection {
47    // -------------------------------------------------------------------------
48    // Lifecycle
49    // -------------------------------------------------------------------------
50
51    /// Creates a new `GraphCollection`.
52    ///
53    /// # Errors
54    ///
55    /// Returns an error if the directory cannot be created or storage fails.
56    pub fn create(
57        path: PathBuf,
58        name: &str,
59        dimension: Option<usize>,
60        metric: DistanceMetric,
61        schema: GraphSchema,
62    ) -> Result<Self> {
63        Ok(Self {
64            inner: Collection::create_graph_collection(path, name, schema, dimension, metric)?,
65        })
66    }
67
68    /// Opens an existing `GraphCollection` from disk.
69    ///
70    /// # Errors
71    ///
72    /// Returns an error if config or storage cannot be opened.
73    pub fn open(path: PathBuf) -> Result<Self> {
74        Ok(Self {
75            inner: Collection::open(path)?,
76        })
77    }
78
79    /// Flushes all state to disk.
80    ///
81    /// Issue #423: This fast-path flush skips `vectors.idx` serialization.
82    /// The WAL provides crash recovery for the vector index.
83    ///
84    /// # Errors
85    ///
86    /// Returns an error if any flush operation fails.
87    pub fn flush(&self) -> Result<()> {
88        self.inner.flush()
89    }
90
91    /// Full durability flush including `vectors.idx` serialization.
92    ///
93    /// Issue #423: Use on graceful shutdown to avoid a full WAL replay
94    /// on the next startup.
95    ///
96    /// # Errors
97    ///
98    /// Returns an error if any flush operation fails.
99    pub fn flush_full(&self) -> Result<()> {
100        self.inner.flush_full()
101    }
102
103    // -------------------------------------------------------------------------
104    // Metadata
105    // -------------------------------------------------------------------------
106
107    /// Returns the collection name.
108    #[must_use]
109    pub fn name(&self) -> String {
110        self.inner.config().name
111    }
112
113    /// Returns the graph schema stored in config.
114    ///
115    /// Returns `GraphSchema::schemaless()` for collections that have no schema set.
116    #[must_use]
117    pub fn schema(&self) -> GraphSchema {
118        self.inner
119            .graph_schema()
120            .unwrap_or_else(GraphSchema::schemaless)
121    }
122
123    /// Returns `true` if this collection stores node embeddings.
124    #[must_use]
125    pub fn has_embeddings(&self) -> bool {
126        self.inner.has_embeddings()
127    }
128
129    // -------------------------------------------------------------------------
130    // Graph operations — delegate to Collection graph API
131    // -------------------------------------------------------------------------
132
133    /// Adds an edge between two nodes.
134    ///
135    /// # Errors
136    ///
137    /// - Returns `Error::EdgeExists` if an edge with the same ID already exists.
138    ///
139    /// # Examples
140    ///
141    /// ```rust,no_run
142    /// # use velesdb_core::{GraphCollection, GraphSchema, GraphEdge, DistanceMetric};
143    /// # let coll = GraphCollection::create("./data/kg".into(), "kg", None, DistanceMetric::Cosine, GraphSchema::schemaless())?;
144    /// let edge = GraphEdge::new(1, 100, 200, "KNOWS")?;
145    /// coll.add_edge(edge)?;
146    /// # Ok::<(), velesdb_core::Error>(())
147    /// ```
148    pub fn add_edge(&self, edge: GraphEdge) -> Result<()> {
149        self.inner.add_edge(edge)
150    }
151
152    /// Adds multiple edges in batch (much faster than calling add_edge in a loop).
153    ///
154    /// Acquires locks once for the entire batch and rebuilds the CSR snapshot
155    /// once at the end. Duplicate edge IDs are silently skipped.
156    ///
157    /// # Returns
158    ///
159    /// Number of edges successfully added.
160    ///
161    /// # Errors
162    ///
163    /// Returns an error if WAL durability logging fails for graph
164    /// collections (fail-closed: the in-memory store is not mutated).
165    pub fn add_edges_batch(&self, edges: Vec<GraphEdge>) -> Result<usize> {
166        self.inner.add_edges_batch(edges)
167    }
168
169    /// Returns edges, optionally filtered by label.
170    #[must_use]
171    pub fn get_edges(&self, label: Option<&str>) -> Vec<GraphEdge> {
172        match label {
173            Some(lbl) => self.inner.get_edges_by_label(lbl),
174            None => self.inner.get_all_edges(),
175        }
176    }
177
178    /// Returns all outgoing edges from a node.
179    #[must_use]
180    pub fn get_outgoing(&self, node_id: u64) -> Vec<GraphEdge> {
181        self.inner.get_outgoing_edges(node_id)
182    }
183
184    /// Returns all incoming edges to a node.
185    #[must_use]
186    pub fn get_incoming(&self, node_id: u64) -> Vec<GraphEdge> {
187        self.inner.get_incoming_edges(node_id)
188    }
189
190    /// Returns the total number of edges in the graph without materializing them.
191    #[must_use]
192    pub fn edge_count(&self) -> usize {
193        self.inner.edge_count()
194    }
195
196    /// Returns `(in_degree, out_degree)` for a node.
197    #[must_use]
198    pub fn node_degree(&self, node_id: u64) -> (usize, usize) {
199        self.inner.get_node_degree(node_id)
200    }
201
202    /// Returns the IDs of all nodes that have a stored payload.
203    ///
204    /// Nodes that appear only as edge endpoints without a stored payload
205    /// are not included. Use [`GraphCollection::get_edges`] to discover
206    /// all referenced node IDs.
207    #[must_use]
208    pub fn all_node_ids(&self) -> Vec<u64> {
209        self.inner.all_ids()
210    }
211
212    /// Returns the next batch of points for scroll iteration.
213    ///
214    /// Delegates to the inner collection's `scroll_batch` (parallel
215    /// implementation to [`VectorCollection::scroll_batch`](crate::VectorCollection::scroll_batch)).
216    ///
217    /// # Errors
218    ///
219    /// Returns an error if `batch_size` is 0.
220    pub fn scroll_batch(
221        &self,
222        cursor: Option<u64>,
223        batch_size: usize,
224        filter: Option<&crate::filter::Filter>,
225    ) -> Result<crate::collection::ScrollBatch> {
226        self.inner.scroll_batch(cursor, batch_size, filter)
227    }
228
229    /// Returns the number of nodes (points) stored in this collection.
230    #[must_use]
231    pub fn len(&self) -> usize {
232        self.inner.len()
233    }
234
235    /// Returns `true` if the collection contains no nodes.
236    #[must_use]
237    pub fn is_empty(&self) -> bool {
238        self.inner.is_empty()
239    }
240
241    /// Retrieves nodes by IDs, returning `None` for missing entries.
242    #[must_use]
243    pub fn get(&self, ids: &[u64]) -> Vec<Option<Point>> {
244        self.inner.get(ids)
245    }
246
247    /// Deletes nodes by IDs.
248    ///
249    /// Missing IDs are silently ignored.
250    ///
251    /// # Errors
252    ///
253    /// Returns an error if storage operations fail.
254    pub fn delete(&self, ids: &[u64]) -> Result<()> {
255        self.inner.delete(ids)
256    }
257
258    /// Removes an edge from the graph by ID.
259    ///
260    /// Returns `true` if the edge existed and was removed, `false` otherwise.
261    #[must_use]
262    pub fn remove_edge(&self, edge_id: u64) -> bool {
263        self.inner.remove_edge(edge_id)
264    }
265
266    /// Returns `true` if an edge with `edge_id` exists in the graph.
267    #[must_use]
268    pub fn has_edge(&self, edge_id: u64) -> bool {
269        self.inner.edge_exists(edge_id)
270    }
271
272    /// Performs BFS traversal from a source node.
273    ///
274    /// # Examples
275    ///
276    /// ```rust,no_run
277    /// # use velesdb_core::{GraphCollection, GraphSchema, GraphEdge, DistanceMetric};
278    /// # use velesdb_core::collection::graph::TraversalConfig;
279    /// # let coll = GraphCollection::create("./data/kg".into(), "kg", None, DistanceMetric::Cosine, GraphSchema::schemaless())?;
280    /// let config = TraversalConfig { max_depth: 3, ..TraversalConfig::default() };
281    /// let results = coll.traverse_bfs(100, &config);
282    /// for r in &results {
283    ///     println!("node={} depth={}", r.target_id, r.depth);
284    /// }
285    /// # Ok::<(), velesdb_core::Error>(())
286    /// ```
287    #[must_use]
288    pub fn traverse_bfs(&self, source_id: u64, config: &TraversalConfig) -> Vec<TraversalResult> {
289        self.inner.traverse_bfs_config(source_id, config)
290    }
291
292    /// Performs DFS traversal from a source node.
293    #[must_use]
294    pub fn traverse_dfs(&self, source_id: u64, config: &TraversalConfig) -> Vec<TraversalResult> {
295        self.inner.traverse_dfs_config(source_id, config)
296    }
297
298    /// Performs parallel BFS traversal from multiple start nodes.
299    ///
300    /// When `start_nodes` exceeds the parallel threshold (100 nodes), rayon
301    /// distributes independent per-start-node BFS traversals across CPU cores.
302    /// Results are deduplicated by path signature and truncated to `config.limit`.
303    ///
304    /// # Examples
305    ///
306    /// ```rust,no_run
307    /// # use velesdb_core::{GraphCollection, GraphSchema, DistanceMetric};
308    /// # use velesdb_core::collection::graph::TraversalConfig;
309    /// # let coll = GraphCollection::create("./data/kg".into(), "kg", None, DistanceMetric::Cosine, GraphSchema::schemaless())?;
310    /// let config = TraversalConfig { max_depth: 3, ..TraversalConfig::default() };
311    /// let results = coll.traverse_bfs_parallel(&[100, 200, 300], &config);
312    /// for r in &results {
313    ///     println!("node={} depth={}", r.target_id, r.depth);
314    /// }
315    /// # Ok::<(), velesdb_core::Error>(())
316    /// ```
317    #[must_use]
318    pub fn traverse_bfs_parallel(
319        &self,
320        start_nodes: &[u64],
321        config: &TraversalConfig,
322    ) -> Vec<TraversalResult> {
323        self.inner.traverse_bfs_parallel(start_nodes, config)
324    }
325
326    // -------------------------------------------------------------------------
327    // Payload / node properties
328    // -------------------------------------------------------------------------
329
330    /// Inserts or updates node payload (properties).
331    ///
332    /// # Errors
333    ///
334    /// Returns an error if storage fails.
335    pub fn upsert_node_payload(&self, node_id: u64, payload: &serde_json::Value) -> Result<()> {
336        self.inner.store_node_payload(node_id, payload)
337    }
338
339    /// Inserts or updates a node payload, optionally with an embedding vector.
340    ///
341    /// # Errors
342    ///
343    /// Returns an error if storage fails, the vector dimension is invalid, or
344    /// an embedding is supplied for a graph collection without embeddings.
345    pub fn upsert_node(
346        &self,
347        node_id: u64,
348        payload: &serde_json::Value,
349        vector: Option<Vec<f32>>,
350    ) -> Result<()> {
351        match vector {
352            Some(vector) => self
353                .inner
354                .upsert([Point::new(node_id, vector, Some(payload.clone()))]),
355            None => self.upsert_node_payload(node_id, payload),
356        }
357    }
358
359    /// Inserts or updates node payload (properties).
360    ///
361    /// # Errors
362    ///
363    /// Returns an error if storage fails.
364    #[deprecated(since = "1.6.0", note = "Use upsert_node_payload() instead")]
365    pub fn store_node_payload(&self, node_id: u64, payload: &serde_json::Value) -> Result<()> {
366        self.upsert_node_payload(node_id, payload)
367    }
368
369    /// Retrieves node payload.
370    ///
371    /// # Errors
372    ///
373    /// Returns an error if retrieval fails.
374    pub fn get_node_payload(&self, node_id: u64) -> Result<Option<serde_json::Value>> {
375        self.inner.get_node_payload(node_id)
376    }
377
378    // -------------------------------------------------------------------------
379    // Optional embedding search
380    // -------------------------------------------------------------------------
381
382    /// Searches for similar nodes by embedding (only available if `has_embeddings()`).
383    ///
384    /// # Errors
385    ///
386    /// Returns `Error::VectorNotAllowed` if this collection has no embeddings,
387    /// or `Error::DimensionMismatch` if the query dimension is wrong.
388    pub fn search_by_embedding(&self, query: &[f32], k: usize) -> Result<Vec<SearchResult>> {
389        self.inner.search_by_embedding(query, k)
390    }
391
392    /// Alias for [`search_by_embedding`](Self::search_by_embedding).
393    ///
394    /// Provided for API parity with [`crate::VectorCollection::search`].
395    ///
396    /// # Errors
397    ///
398    /// Returns `Error::VectorNotAllowed` if this collection has no embeddings,
399    /// or `Error::DimensionMismatch` if the query dimension is wrong.
400    pub fn search(&self, query: &[f32], k: usize) -> Result<Vec<SearchResult>> {
401        self.search_by_embedding(query, k)
402    }
403}
404
#[cfg(test)]
mod tests {
    use super::*;
    use crate::collection::graph::GraphSchema;
    use crate::distance::DistanceMetric;
    use std::collections::{HashMap, HashSet};
    use tempfile::{tempdir, TempDir};

    /// Creates a schemaless cosine `GraphCollection` in a fresh temp dir.
    ///
    /// Returns the `TempDir` guard alongside the collection so the backing
    /// directory outlives the test. `dimension` controls embedding support
    /// (`None` for payload/edge-only collections, `Some(n)` for searchable ones).
    fn make_test_collection(dimension: Option<usize>) -> (TempDir, GraphCollection) {
        let dir = tempdir().unwrap();
        let col = GraphCollection::create(
            dir.path().to_path_buf(),
            "kg",
            dimension,
            DistanceMetric::Cosine,
            GraphSchema::schemaless(),
        )
        .unwrap();
        (dir, col)
    }

    #[test]
    fn test_all_node_ids_returns_ids_with_payload() {
        let (_dir, col) = make_test_collection(None);

        // Store payloads on two nodes
        col.upsert_node_payload(10, &serde_json::json!({"name": "Alice"}))
            .unwrap();
        col.upsert_node_payload(20, &serde_json::json!({"name": "Bob"}))
            .unwrap();

        let ids = col.all_node_ids();
        assert!(ids.contains(&10), "node 10 should be present");
        assert!(ids.contains(&20), "node 20 should be present");
        assert_eq!(ids.len(), 2);
    }

    #[test]
    fn test_upsert_node_with_embedding_is_searchable() {
        let (_dir, col) = make_test_collection(Some(4));

        col.upsert_node(
            10,
            &serde_json::json!({"name": "Alice"}),
            Some(vec![1.0, 0.0, 0.0, 0.0]),
        )
        .unwrap();

        assert_eq!(
            col.get_node_payload(10).unwrap(),
            Some(serde_json::json!({"name": "Alice"}))
        );
        let results = col.search_by_embedding(&[1.0, 0.0, 0.0, 0.0], 1).unwrap();
        // Compare via `first()` so an empty result reports a clear assertion
        // failure instead of an index-out-of-bounds panic.
        assert_eq!(
            results.first().map(|r| r.point.id),
            Some(10),
            "the only stored node should be the nearest neighbour"
        );
    }

    #[test]
    fn test_edge_count_returns_correct_count() {
        let (_dir, col) = make_test_collection(None);

        assert_eq!(col.edge_count(), 0);

        col.add_edge(GraphEdge::new(1, 10, 20, "knows").unwrap())
            .unwrap();
        assert_eq!(col.edge_count(), 1);

        col.add_edge(GraphEdge::new(2, 20, 30, "likes").unwrap())
            .unwrap();
        assert_eq!(col.edge_count(), 2);
    }

    #[test]
    fn test_add_edge_rejects_duplicate_id() {
        let (_dir, col) = make_test_collection(None);

        col.add_edge(GraphEdge::new(1, 10, 20, "KNOWS").unwrap())
            .unwrap();
        let duplicate = GraphEdge::new(1, 30, 40, "KNOWS").unwrap();
        assert!(
            col.add_edge(duplicate).is_err(),
            "an edge with an already-used id must be rejected"
        );
        assert_eq!(col.edge_count(), 1);
    }

    #[test]
    fn test_remove_edge_roundtrip() {
        let (_dir, col) = make_test_collection(None);

        col.add_edge(GraphEdge::new(7, 1, 2, "LINKS").unwrap())
            .unwrap();
        assert!(col.has_edge(7));

        assert!(col.remove_edge(7), "existing edge should be removed");
        assert!(!col.has_edge(7));
        assert_eq!(col.edge_count(), 0);
        assert!(
            !col.remove_edge(7),
            "removing an absent edge should report false"
        );
    }

    #[test]
    fn test_get_edges_label_filter_and_adjacency() {
        let (_dir, col) = make_test_collection(None);

        col.add_edge(GraphEdge::new(1, 10, 20, "KNOWS").unwrap())
            .unwrap();
        col.add_edge(GraphEdge::new(2, 10, 30, "LIKES").unwrap())
            .unwrap();

        assert_eq!(col.get_edges(None).len(), 2);
        assert_eq!(col.get_edges(Some("KNOWS")).len(), 1);
        assert!(col.get_edges(Some("MISSING")).is_empty());

        assert_eq!(col.get_outgoing(10).len(), 2);
        assert_eq!(col.get_incoming(20).len(), 1);
        // node_degree returns (in_degree, out_degree).
        assert_eq!(col.node_degree(10), (0, 2));
    }

    #[test]
    fn test_traverse_bfs_parallel_through_graph_collection() {
        let (_dir, col) = make_test_collection(None);

        // Build chain: 1->2->3
        col.add_edge(GraphEdge::new(1, 1, 2, "NEXT").unwrap())
            .unwrap();
        col.add_edge(GraphEdge::new(2, 2, 3, "NEXT").unwrap())
            .unwrap();

        let config = TraversalConfig {
            max_depth: 3,
            min_depth: 1,
            ..TraversalConfig::default()
        };
        let results = col.traverse_bfs_parallel(&[1], &config);
        let target_ids: HashSet<u64> = results.iter().map(|r| r.target_id).collect();
        assert!(target_ids.contains(&2), "should reach node 2");
        assert!(target_ids.contains(&3), "should reach node 3");
    }

    #[test]
    fn test_execute_match_finds_edges() {
        let (_dir, col) = make_test_collection(None);

        // Store node payloads with labels
        col.upsert_node_payload(
            10,
            &serde_json::json!({"_labels": ["Person"], "name": "Alice"}),
        )
        .unwrap();
        col.upsert_node_payload(
            20,
            &serde_json::json!({"_labels": ["Person"], "name": "Bob"}),
        )
        .unwrap();

        // Add edge: Alice -> Bob
        col.add_edge(GraphEdge::new(1, 10, 20, "KNOWS").unwrap())
            .unwrap();

        // MATCH query through the GraphCollection delegate
        let match_clause = crate::velesql::MatchClause {
            patterns: vec![crate::velesql::GraphPattern {
                name: None,
                nodes: vec![
                    crate::velesql::NodePattern::new().with_alias("a"),
                    crate::velesql::NodePattern::new().with_alias("b"),
                ],
                relationships: vec![crate::velesql::RelationshipPattern::new(
                    crate::velesql::Direction::Outgoing,
                )],
            }],
            where_clause: None,
            return_clause: crate::velesql::ReturnClause {
                items: vec![],
                order_by: None,
                limit: Some(10),
            },
        };

        let params = HashMap::new();
        let results = col.execute_match(&match_clause, &params).unwrap();
        assert!(
            !results.is_empty(),
            "execute_match should find the KNOWS edge"
        );
        assert_eq!(results[0].node_id, 20, "target should be Bob (id=20)");
    }
}