Skip to main content

velesdb_core/collection/
graph_collection.rs

1//! `GraphCollection`: knowledge graph with optional node embeddings.
2//!
3//! # Design
4//!
5//! `GraphCollection` is a pure newtype over `Collection` (C-02).
6//! All graph state (edge store, property/range indexes, node payloads, optional
7//! HNSW for node embeddings) lives inside the single `inner: Collection`.
8//! The graph schema and embedding dimension are persisted in `config.json`.
9//! There are no separate engine fields — no dual-storage risk.
10
11use std::path::PathBuf;
12
13use crate::collection::graph::{GraphEdge, GraphSchema, TraversalConfig, TraversalResult};
14use crate::collection::types::Collection;
15use crate::distance::DistanceMetric;
16use crate::error::Result;
17use crate::point::{Point, SearchResult};
18
19/// A graph collection storing typed relationships between nodes.
20///
21/// Node embeddings are optional: if `dimension` is `None`, no vector index is created.
22///
23/// # Examples
24///
25/// ```rust,no_run
26/// use velesdb_core::{GraphCollection, GraphSchema, GraphEdge, DistanceMetric};
27///
28/// let coll = GraphCollection::create(
29///     "./data/kg".into(),
30///     "knowledge",
31///     None,                    // no embeddings
32///     DistanceMetric::Cosine,  // unused when no embeddings
33///     GraphSchema::schemaless(),
34/// )?;
35///
36/// let edge = GraphEdge::new(1, 100, 200, "KNOWS")?;
37/// coll.add_edge(edge)?;
38/// # Ok::<(), velesdb_core::Error>(())
39/// ```
40#[derive(Clone)]
41pub struct GraphCollection {
42    /// Single source of truth — all graph state lives here (C-02 pure newtype).
43    pub(crate) inner: Collection,
44}
45
46impl GraphCollection {
47    // -------------------------------------------------------------------------
48    // Lifecycle
49    // -------------------------------------------------------------------------
50
51    /// Creates a new `GraphCollection`.
52    ///
53    /// # Errors
54    ///
55    /// Returns an error if the directory cannot be created or storage fails.
56    pub fn create(
57        path: PathBuf,
58        name: &str,
59        dimension: Option<usize>,
60        metric: DistanceMetric,
61        schema: GraphSchema,
62    ) -> Result<Self> {
63        Ok(Self {
64            inner: Collection::create_graph_collection(path, name, schema, dimension, metric)?,
65        })
66    }
67
68    /// Opens an existing `GraphCollection` from disk.
69    ///
70    /// # Errors
71    ///
72    /// Returns an error if config or storage cannot be opened.
73    pub fn open(path: PathBuf) -> Result<Self> {
74        Ok(Self {
75            inner: Collection::open(path)?,
76        })
77    }
78
79    /// Flushes all state to disk.
80    ///
81    /// Issue #423: This fast-path flush skips `vectors.idx` serialization.
82    /// The WAL provides crash recovery for the vector index.
83    ///
84    /// # Errors
85    ///
86    /// Returns an error if any flush operation fails.
87    pub fn flush(&self) -> Result<()> {
88        self.inner.flush()
89    }
90
91    /// Full durability flush including `vectors.idx` serialization.
92    ///
93    /// Issue #423: Use on graceful shutdown to avoid a full WAL replay
94    /// on the next startup.
95    ///
96    /// # Errors
97    ///
98    /// Returns an error if any flush operation fails.
99    pub fn flush_full(&self) -> Result<()> {
100        self.inner.flush_full()
101    }
102
103    // -------------------------------------------------------------------------
104    // Metadata
105    // -------------------------------------------------------------------------
106
107    /// Returns the collection name.
108    #[must_use]
109    pub fn name(&self) -> String {
110        self.inner.config().name
111    }
112
113    /// Returns the graph schema stored in config.
114    ///
115    /// Returns `GraphSchema::schemaless()` for collections that have no schema set.
116    #[must_use]
117    pub fn schema(&self) -> GraphSchema {
118        self.inner
119            .graph_schema()
120            .unwrap_or_else(GraphSchema::schemaless)
121    }
122
123    /// Returns `true` if this collection stores node embeddings.
124    #[must_use]
125    pub fn has_embeddings(&self) -> bool {
126        self.inner.has_embeddings()
127    }
128
129    // -------------------------------------------------------------------------
130    // Graph operations — delegate to Collection graph API
131    // -------------------------------------------------------------------------
132
133    /// Adds an edge between two nodes.
134    ///
135    /// # Errors
136    ///
137    /// - Returns `Error::EdgeExists` if an edge with the same ID already exists.
138    ///
139    /// # Examples
140    ///
141    /// ```rust,no_run
142    /// # use velesdb_core::{GraphCollection, GraphSchema, GraphEdge, DistanceMetric};
143    /// # let coll = GraphCollection::create("./data/kg".into(), "kg", None, DistanceMetric::Cosine, GraphSchema::schemaless())?;
144    /// let edge = GraphEdge::new(1, 100, 200, "KNOWS")?;
145    /// coll.add_edge(edge)?;
146    /// # Ok::<(), velesdb_core::Error>(())
147    /// ```
148    pub fn add_edge(&self, edge: GraphEdge) -> Result<()> {
149        self.inner.add_edge(edge)
150    }
151
152    /// Adds multiple edges in batch (much faster than calling add_edge in a loop).
153    ///
154    /// Acquires locks once for the entire batch and rebuilds the CSR snapshot
155    /// once at the end. Duplicate edge IDs are silently skipped.
156    ///
157    /// # Returns
158    ///
159    /// Number of edges successfully added.
160    #[must_use]
161    pub fn add_edges_batch(&self, edges: Vec<GraphEdge>) -> usize {
162        let count = self.inner.edge_store.add_edges_batch(edges);
163        if count > 0 {
164            self.inner
165                .write_generation
166                .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
167        }
168        count
169    }
170
171    /// Returns edges, optionally filtered by label.
172    #[must_use]
173    pub fn get_edges(&self, label: Option<&str>) -> Vec<GraphEdge> {
174        match label {
175            Some(lbl) => self.inner.get_edges_by_label(lbl),
176            None => self.inner.get_all_edges(),
177        }
178    }
179
180    /// Returns all outgoing edges from a node.
181    #[must_use]
182    pub fn get_outgoing(&self, node_id: u64) -> Vec<GraphEdge> {
183        self.inner.get_outgoing_edges(node_id)
184    }
185
186    /// Returns all incoming edges to a node.
187    #[must_use]
188    pub fn get_incoming(&self, node_id: u64) -> Vec<GraphEdge> {
189        self.inner.get_incoming_edges(node_id)
190    }
191
192    /// Returns the total number of edges in the graph without materializing them.
193    #[must_use]
194    pub fn edge_count(&self) -> usize {
195        self.inner.edge_count()
196    }
197
198    /// Returns `(in_degree, out_degree)` for a node.
199    #[must_use]
200    pub fn node_degree(&self, node_id: u64) -> (usize, usize) {
201        self.inner.get_node_degree(node_id)
202    }
203
204    /// Returns the IDs of all nodes that have a stored payload.
205    ///
206    /// Nodes that appear only as edge endpoints without a stored payload
207    /// are not included. Use [`GraphCollection::get_edges`] to discover
208    /// all referenced node IDs.
209    #[must_use]
210    pub fn all_node_ids(&self) -> Vec<u64> {
211        self.inner.all_ids()
212    }
213
214    /// Returns the next batch of points for scroll iteration.
215    ///
216    /// Delegates to the inner collection's `scroll_batch` (parallel
217    /// implementation to [`VectorCollection::scroll_batch`](crate::VectorCollection::scroll_batch)).
218    ///
219    /// # Errors
220    ///
221    /// Returns an error if `batch_size` is 0.
222    pub fn scroll_batch(
223        &self,
224        cursor: Option<u64>,
225        batch_size: usize,
226        filter: Option<&crate::filter::Filter>,
227    ) -> Result<crate::collection::ScrollBatch> {
228        self.inner.scroll_batch(cursor, batch_size, filter)
229    }
230
231    /// Returns the number of nodes (points) stored in this collection.
232    #[must_use]
233    pub fn len(&self) -> usize {
234        self.inner.len()
235    }
236
237    /// Returns `true` if the collection contains no nodes.
238    #[must_use]
239    pub fn is_empty(&self) -> bool {
240        self.inner.is_empty()
241    }
242
243    /// Retrieves nodes by IDs, returning `None` for missing entries.
244    #[must_use]
245    pub fn get(&self, ids: &[u64]) -> Vec<Option<Point>> {
246        self.inner.get(ids)
247    }
248
249    /// Deletes nodes by IDs.
250    ///
251    /// Missing IDs are silently ignored.
252    ///
253    /// # Errors
254    ///
255    /// Returns an error if storage operations fail.
256    pub fn delete(&self, ids: &[u64]) -> Result<()> {
257        self.inner.delete(ids)
258    }
259
260    /// Removes an edge from the graph by ID.
261    ///
262    /// Returns `true` if the edge existed and was removed, `false` otherwise.
263    #[must_use]
264    pub fn remove_edge(&self, edge_id: u64) -> bool {
265        self.inner.remove_edge(edge_id)
266    }
267
268    /// Performs BFS traversal from a source node.
269    ///
270    /// # Examples
271    ///
272    /// ```rust,no_run
273    /// # use velesdb_core::{GraphCollection, GraphSchema, GraphEdge, DistanceMetric};
274    /// # use velesdb_core::collection::graph::TraversalConfig;
275    /// # let coll = GraphCollection::create("./data/kg".into(), "kg", None, DistanceMetric::Cosine, GraphSchema::schemaless())?;
276    /// let config = TraversalConfig { max_depth: 3, ..TraversalConfig::default() };
277    /// let results = coll.traverse_bfs(100, &config);
278    /// for r in &results {
279    ///     println!("node={} depth={}", r.target_id, r.depth);
280    /// }
281    /// # Ok::<(), velesdb_core::Error>(())
282    /// ```
283    #[must_use]
284    pub fn traverse_bfs(&self, source_id: u64, config: &TraversalConfig) -> Vec<TraversalResult> {
285        self.inner.traverse_bfs_config(source_id, config)
286    }
287
288    /// Performs DFS traversal from a source node.
289    #[must_use]
290    pub fn traverse_dfs(&self, source_id: u64, config: &TraversalConfig) -> Vec<TraversalResult> {
291        self.inner.traverse_dfs_config(source_id, config)
292    }
293
294    /// Performs parallel BFS traversal from multiple start nodes.
295    ///
296    /// When `start_nodes` exceeds the parallel threshold (100 nodes), rayon
297    /// distributes independent per-start-node BFS traversals across CPU cores.
298    /// Results are deduplicated by path signature and truncated to `config.limit`.
299    ///
300    /// # Examples
301    ///
302    /// ```rust,no_run
303    /// # use velesdb_core::{GraphCollection, GraphSchema, DistanceMetric};
304    /// # use velesdb_core::collection::graph::TraversalConfig;
305    /// # let coll = GraphCollection::create("./data/kg".into(), "kg", None, DistanceMetric::Cosine, GraphSchema::schemaless())?;
306    /// let config = TraversalConfig { max_depth: 3, ..TraversalConfig::default() };
307    /// let results = coll.traverse_bfs_parallel(&[100, 200, 300], &config);
308    /// for r in &results {
309    ///     println!("node={} depth={}", r.target_id, r.depth);
310    /// }
311    /// # Ok::<(), velesdb_core::Error>(())
312    /// ```
313    #[must_use]
314    pub fn traverse_bfs_parallel(
315        &self,
316        start_nodes: &[u64],
317        config: &TraversalConfig,
318    ) -> Vec<TraversalResult> {
319        self.inner.traverse_bfs_parallel(start_nodes, config)
320    }
321
322    // -------------------------------------------------------------------------
323    // Payload / node properties
324    // -------------------------------------------------------------------------
325
326    /// Inserts or updates node payload (properties).
327    ///
328    /// # Errors
329    ///
330    /// Returns an error if storage fails.
331    pub fn upsert_node_payload(&self, node_id: u64, payload: &serde_json::Value) -> Result<()> {
332        self.inner.store_node_payload(node_id, payload)
333    }
334
335    /// Inserts or updates node payload (properties).
336    ///
337    /// # Errors
338    ///
339    /// Returns an error if storage fails.
340    #[deprecated(since = "1.6.0", note = "Use upsert_node_payload() instead")]
341    pub fn store_node_payload(&self, node_id: u64, payload: &serde_json::Value) -> Result<()> {
342        self.upsert_node_payload(node_id, payload)
343    }
344
345    /// Retrieves node payload.
346    ///
347    /// # Errors
348    ///
349    /// Returns an error if retrieval fails.
350    pub fn get_node_payload(&self, node_id: u64) -> Result<Option<serde_json::Value>> {
351        self.inner.get_node_payload(node_id)
352    }
353
354    // -------------------------------------------------------------------------
355    // Optional embedding search
356    // -------------------------------------------------------------------------
357
358    /// Searches for similar nodes by embedding (only available if `has_embeddings()`).
359    ///
360    /// # Errors
361    ///
362    /// Returns `Error::VectorNotAllowed` if this collection has no embeddings,
363    /// or `Error::DimensionMismatch` if the query dimension is wrong.
364    pub fn search_by_embedding(&self, query: &[f32], k: usize) -> Result<Vec<SearchResult>> {
365        self.inner.search_by_embedding(query, k)
366    }
367
368    /// Alias for [`search_by_embedding`](Self::search_by_embedding).
369    ///
370    /// Provided for API parity with [`crate::VectorCollection::search`].
371    ///
372    /// # Errors
373    ///
374    /// Returns `Error::VectorNotAllowed` if this collection has no embeddings,
375    /// or `Error::DimensionMismatch` if the query dimension is wrong.
376    pub fn search(&self, query: &[f32], k: usize) -> Result<Vec<SearchResult>> {
377        self.search_by_embedding(query, k)
378    }
379}
380
#[cfg(test)]
mod tests {
    use super::*;
    use crate::collection::graph::GraphSchema;
    use crate::distance::DistanceMetric;
    use std::collections::{HashMap, HashSet};
    use tempfile::{tempdir, TempDir};

    /// Builds an empty, schemaless, embedding-free graph named "kg" in a fresh
    /// temporary directory.
    ///
    /// The `TempDir` guard is returned alongside the collection so the caller
    /// keeps the directory alive for the duration of the test.
    fn empty_graph() -> (TempDir, GraphCollection) {
        let dir = tempdir().unwrap();
        let graph = GraphCollection::create(
            dir.path().to_path_buf(),
            "kg",
            None,
            DistanceMetric::Cosine,
            GraphSchema::schemaless(),
        )
        .unwrap();
        (dir, graph)
    }

    /// `all_node_ids` reports exactly the nodes that had a payload stored.
    #[test]
    fn test_all_node_ids_returns_ids_with_payload() {
        let (_dir, graph) = empty_graph();

        // Store payloads on two nodes
        graph
            .upsert_node_payload(10, &serde_json::json!({"name": "Alice"}))
            .unwrap();
        graph
            .upsert_node_payload(20, &serde_json::json!({"name": "Bob"}))
            .unwrap();

        let node_ids = graph.all_node_ids();
        assert!(node_ids.contains(&10), "node 10 should be present");
        assert!(node_ids.contains(&20), "node 20 should be present");
        assert_eq!(node_ids.len(), 2);
    }

    /// `edge_count` starts at zero and grows by one per added edge.
    #[test]
    fn test_edge_count_returns_correct_count() {
        let (_dir, graph) = empty_graph();

        assert_eq!(graph.edge_count(), 0);

        let knows = GraphEdge::new(1, 10, 20, "knows").unwrap();
        graph.add_edge(knows).unwrap();
        assert_eq!(graph.edge_count(), 1);

        let likes = GraphEdge::new(2, 20, 30, "likes").unwrap();
        graph.add_edge(likes).unwrap();
        assert_eq!(graph.edge_count(), 2);
    }

    /// Parallel BFS through the wrapper reaches every node of a small chain.
    #[test]
    fn test_traverse_bfs_parallel_through_graph_collection() {
        let (_dir, graph) = empty_graph();

        // Build chain: 1->2->3
        let first_hop = GraphEdge::new(1, 1, 2, "NEXT").unwrap();
        graph.add_edge(first_hop).unwrap();
        let second_hop = GraphEdge::new(2, 2, 3, "NEXT").unwrap();
        graph.add_edge(second_hop).unwrap();

        let config = TraversalConfig {
            max_depth: 3,
            min_depth: 1,
            ..TraversalConfig::default()
        };
        let reached: HashSet<u64> = graph
            .traverse_bfs_parallel(&[1], &config)
            .iter()
            .map(|hit| hit.target_id)
            .collect();
        assert!(reached.contains(&2), "should reach node 2");
        assert!(reached.contains(&3), "should reach node 3");
    }

    /// A two-node outgoing MATCH pattern finds the single KNOWS edge.
    #[test]
    fn test_execute_match_finds_edges() {
        use crate::velesql::{
            Direction, GraphPattern, MatchClause, NodePattern, RelationshipPattern, ReturnClause,
        };

        let (_dir, graph) = empty_graph();

        // Store node payloads with labels
        let alice = serde_json::json!({"_labels": ["Person"], "name": "Alice"});
        graph.upsert_node_payload(10, &alice).unwrap();
        let bob = serde_json::json!({"_labels": ["Person"], "name": "Bob"});
        graph.upsert_node_payload(20, &bob).unwrap();

        // Add edge: Alice -> Bob
        graph
            .add_edge(GraphEdge::new(1, 10, 20, "KNOWS").unwrap())
            .unwrap();

        // MATCH query through the GraphCollection delegate
        let pattern = GraphPattern {
            name: None,
            nodes: vec![
                NodePattern::new().with_alias("a"),
                NodePattern::new().with_alias("b"),
            ],
            relationships: vec![RelationshipPattern::new(Direction::Outgoing)],
        };
        let match_clause = MatchClause {
            patterns: vec![pattern],
            where_clause: None,
            return_clause: ReturnClause {
                items: vec![],
                order_by: None,
                limit: Some(10),
            },
        };

        let params = HashMap::new();
        let results = graph.execute_match(&match_clause, &params).unwrap();
        assert!(
            !results.is_empty(),
            "execute_match should find the KNOWS edge"
        );
        assert_eq!(results[0].node_id, 20, "target should be Bob (id=20)");
    }
}