Skip to main content

velesdb_core/collection/
graph_collection.rs

1//! `GraphCollection`: knowledge graph with optional node embeddings.
2//!
3//! # Design
4//!
5//! `GraphCollection` is a pure newtype over `Collection` (C-02).
6//! All graph state (edge store, property/range indexes, node payloads, optional
7//! HNSW for node embeddings) lives inside the single `inner: Collection`.
8//! The graph schema and embedding dimension are persisted in `config.json`.
9//! There are no separate engine fields — no dual-storage risk.
10
11use std::path::PathBuf;
12
13use crate::collection::graph::{GraphEdge, GraphSchema, TraversalConfig, TraversalResult};
14use crate::collection::types::Collection;
15use crate::distance::DistanceMetric;
16use crate::error::Result;
17use crate::point::{Point, SearchResult};
18
/// A graph collection storing typed relationships between nodes.
///
/// Node embeddings are optional: if the `dimension` argument passed to
/// [`GraphCollection::create`] is `None`, no vector index is created.
///
/// This type is a thin wrapper: every operation is forwarded to the single
/// wrapped `Collection` held in `inner`.
///
/// # Examples
///
/// ```rust,no_run
/// use velesdb_core::{GraphCollection, GraphSchema, GraphEdge, DistanceMetric};
///
/// let coll = GraphCollection::create(
///     "./data/kg".into(),
///     "knowledge",
///     None,                    // no embeddings
///     DistanceMetric::Cosine,  // unused when no embeddings
///     GraphSchema::schemaless(),
/// )?;
///
/// let edge = GraphEdge::new(1, 100, 200, "KNOWS")?;
/// coll.add_edge(edge)?;
/// # Ok::<(), velesdb_core::Error>(())
/// ```
// NOTE(review): whether a clone shares state with the original (handle
// semantics) or deep-copies it depends on `Collection`'s `Clone` impl, which
// is not visible here — confirm before relying on either behavior.
#[derive(Clone)]
pub struct GraphCollection {
    /// Single source of truth — all graph state lives here (C-02 pure newtype).
    pub(crate) inner: Collection,
}
45
46impl GraphCollection {
47    // -------------------------------------------------------------------------
48    // Lifecycle
49    // -------------------------------------------------------------------------
50
51    /// Creates a new `GraphCollection`.
52    ///
53    /// # Errors
54    ///
55    /// Returns an error if the directory cannot be created or storage fails.
56    pub fn create(
57        path: PathBuf,
58        name: &str,
59        dimension: Option<usize>,
60        metric: DistanceMetric,
61        schema: GraphSchema,
62    ) -> Result<Self> {
63        Ok(Self {
64            inner: Collection::create_graph_collection(path, name, schema, dimension, metric)?,
65        })
66    }
67
68    /// Opens an existing `GraphCollection` from disk.
69    ///
70    /// # Errors
71    ///
72    /// Returns an error if config or storage cannot be opened.
73    pub fn open(path: PathBuf) -> Result<Self> {
74        Ok(Self {
75            inner: Collection::open(path)?,
76        })
77    }
78
79    /// Flushes all state to disk.
80    ///
81    /// Issue #423: This fast-path flush skips `vectors.idx` serialization.
82    /// The WAL provides crash recovery for the vector index.
83    ///
84    /// # Errors
85    ///
86    /// Returns an error if any flush operation fails.
87    pub fn flush(&self) -> Result<()> {
88        self.inner.flush()
89    }
90
91    /// Full durability flush including `vectors.idx` serialization.
92    ///
93    /// Issue #423: Use on graceful shutdown to avoid a full WAL replay
94    /// on the next startup.
95    ///
96    /// # Errors
97    ///
98    /// Returns an error if any flush operation fails.
99    pub fn flush_full(&self) -> Result<()> {
100        self.inner.flush_full()
101    }
102
103    // -------------------------------------------------------------------------
104    // Metadata
105    // -------------------------------------------------------------------------
106
107    /// Returns the collection name.
108    #[must_use]
109    pub fn name(&self) -> String {
110        self.inner.config().name
111    }
112
113    /// Returns the graph schema stored in config.
114    ///
115    /// Returns `GraphSchema::schemaless()` for collections that have no schema set.
116    #[must_use]
117    pub fn schema(&self) -> GraphSchema {
118        self.inner
119            .graph_schema()
120            .unwrap_or_else(GraphSchema::schemaless)
121    }
122
123    /// Returns `true` if this collection stores node embeddings.
124    #[must_use]
125    pub fn has_embeddings(&self) -> bool {
126        self.inner.has_embeddings()
127    }
128
129    // -------------------------------------------------------------------------
130    // Graph operations — delegate to Collection graph API
131    // -------------------------------------------------------------------------
132
133    /// Adds an edge between two nodes.
134    ///
135    /// # Errors
136    ///
137    /// - Returns `Error::EdgeExists` if an edge with the same ID already exists.
138    ///
139    /// # Examples
140    ///
141    /// ```rust,no_run
142    /// # use velesdb_core::{GraphCollection, GraphSchema, GraphEdge, DistanceMetric};
143    /// # let coll = GraphCollection::create("./data/kg".into(), "kg", None, DistanceMetric::Cosine, GraphSchema::schemaless())?;
144    /// let edge = GraphEdge::new(1, 100, 200, "KNOWS")?;
145    /// coll.add_edge(edge)?;
146    /// # Ok::<(), velesdb_core::Error>(())
147    /// ```
148    pub fn add_edge(&self, edge: GraphEdge) -> Result<()> {
149        self.inner.add_edge(edge)
150    }
151
152    /// Adds multiple edges in batch (much faster than calling add_edge in a loop).
153    ///
154    /// Acquires locks once for the entire batch and rebuilds the CSR snapshot
155    /// once at the end. Duplicate edge IDs are silently skipped.
156    ///
157    /// # Returns
158    ///
159    /// Number of edges successfully added.
160    ///
161    /// # Errors
162    ///
163    /// Returns an error if WAL durability logging fails for graph
164    /// collections (fail-closed: the in-memory store is not mutated).
165    pub fn add_edges_batch(&self, edges: Vec<GraphEdge>) -> Result<usize> {
166        self.inner.add_edges_batch(edges)
167    }
168
169    /// Returns edges, optionally filtered by label.
170    #[must_use]
171    pub fn get_edges(&self, label: Option<&str>) -> Vec<GraphEdge> {
172        match label {
173            Some(lbl) => self.inner.get_edges_by_label(lbl),
174            None => self.inner.get_all_edges(),
175        }
176    }
177
178    /// Returns all outgoing edges from a node.
179    #[must_use]
180    pub fn get_outgoing(&self, node_id: u64) -> Vec<GraphEdge> {
181        self.inner.get_outgoing_edges(node_id)
182    }
183
184    /// Returns all incoming edges to a node.
185    #[must_use]
186    pub fn get_incoming(&self, node_id: u64) -> Vec<GraphEdge> {
187        self.inner.get_incoming_edges(node_id)
188    }
189
190    /// Returns the total number of edges in the graph without materializing them.
191    #[must_use]
192    pub fn edge_count(&self) -> usize {
193        self.inner.edge_count()
194    }
195
196    /// Returns `(in_degree, out_degree)` for a node.
197    #[must_use]
198    pub fn node_degree(&self, node_id: u64) -> (usize, usize) {
199        self.inner.get_node_degree(node_id)
200    }
201
202    /// Returns the IDs of all nodes that have a stored payload.
203    ///
204    /// Nodes that appear only as edge endpoints without a stored payload
205    /// are not included. Use [`GraphCollection::get_edges`] to discover
206    /// all referenced node IDs.
207    #[must_use]
208    pub fn all_node_ids(&self) -> Vec<u64> {
209        self.inner.all_ids()
210    }
211
212    /// Returns the next batch of points for scroll iteration.
213    ///
214    /// Delegates to the inner collection's `scroll_batch` (parallel
215    /// implementation to [`VectorCollection::scroll_batch`](crate::VectorCollection::scroll_batch)).
216    ///
217    /// # Errors
218    ///
219    /// Returns an error if `batch_size` is 0.
220    pub fn scroll_batch(
221        &self,
222        cursor: Option<u64>,
223        batch_size: usize,
224        filter: Option<&crate::filter::Filter>,
225    ) -> Result<crate::collection::ScrollBatch> {
226        self.inner.scroll_batch(cursor, batch_size, filter)
227    }
228
229    /// Returns the number of nodes (points) stored in this collection.
230    #[must_use]
231    pub fn len(&self) -> usize {
232        self.inner.len()
233    }
234
235    /// Returns `true` if the collection contains no nodes.
236    #[must_use]
237    pub fn is_empty(&self) -> bool {
238        self.inner.is_empty()
239    }
240
241    /// Retrieves nodes by IDs, returning `None` for missing entries.
242    #[must_use]
243    pub fn get(&self, ids: &[u64]) -> Vec<Option<Point>> {
244        self.inner.get(ids)
245    }
246
247    /// Deletes nodes by IDs.
248    ///
249    /// Missing IDs are silently ignored.
250    ///
251    /// # Errors
252    ///
253    /// Returns an error if storage operations fail.
254    pub fn delete(&self, ids: &[u64]) -> Result<()> {
255        self.inner.delete(ids)
256    }
257
258    /// Removes an edge from the graph by ID.
259    ///
260    /// Returns `true` if the edge existed and was removed, `false` otherwise.
261    #[must_use]
262    pub fn remove_edge(&self, edge_id: u64) -> bool {
263        self.inner.remove_edge(edge_id)
264    }
265
266    /// Performs BFS traversal from a source node.
267    ///
268    /// # Examples
269    ///
270    /// ```rust,no_run
271    /// # use velesdb_core::{GraphCollection, GraphSchema, GraphEdge, DistanceMetric};
272    /// # use velesdb_core::collection::graph::TraversalConfig;
273    /// # let coll = GraphCollection::create("./data/kg".into(), "kg", None, DistanceMetric::Cosine, GraphSchema::schemaless())?;
274    /// let config = TraversalConfig { max_depth: 3, ..TraversalConfig::default() };
275    /// let results = coll.traverse_bfs(100, &config);
276    /// for r in &results {
277    ///     println!("node={} depth={}", r.target_id, r.depth);
278    /// }
279    /// # Ok::<(), velesdb_core::Error>(())
280    /// ```
281    #[must_use]
282    pub fn traverse_bfs(&self, source_id: u64, config: &TraversalConfig) -> Vec<TraversalResult> {
283        self.inner.traverse_bfs_config(source_id, config)
284    }
285
286    /// Performs DFS traversal from a source node.
287    #[must_use]
288    pub fn traverse_dfs(&self, source_id: u64, config: &TraversalConfig) -> Vec<TraversalResult> {
289        self.inner.traverse_dfs_config(source_id, config)
290    }
291
292    /// Performs parallel BFS traversal from multiple start nodes.
293    ///
294    /// When `start_nodes` exceeds the parallel threshold (100 nodes), rayon
295    /// distributes independent per-start-node BFS traversals across CPU cores.
296    /// Results are deduplicated by path signature and truncated to `config.limit`.
297    ///
298    /// # Examples
299    ///
300    /// ```rust,no_run
301    /// # use velesdb_core::{GraphCollection, GraphSchema, DistanceMetric};
302    /// # use velesdb_core::collection::graph::TraversalConfig;
303    /// # let coll = GraphCollection::create("./data/kg".into(), "kg", None, DistanceMetric::Cosine, GraphSchema::schemaless())?;
304    /// let config = TraversalConfig { max_depth: 3, ..TraversalConfig::default() };
305    /// let results = coll.traverse_bfs_parallel(&[100, 200, 300], &config);
306    /// for r in &results {
307    ///     println!("node={} depth={}", r.target_id, r.depth);
308    /// }
309    /// # Ok::<(), velesdb_core::Error>(())
310    /// ```
311    #[must_use]
312    pub fn traverse_bfs_parallel(
313        &self,
314        start_nodes: &[u64],
315        config: &TraversalConfig,
316    ) -> Vec<TraversalResult> {
317        self.inner.traverse_bfs_parallel(start_nodes, config)
318    }
319
320    // -------------------------------------------------------------------------
321    // Payload / node properties
322    // -------------------------------------------------------------------------
323
324    /// Inserts or updates node payload (properties).
325    ///
326    /// # Errors
327    ///
328    /// Returns an error if storage fails.
329    pub fn upsert_node_payload(&self, node_id: u64, payload: &serde_json::Value) -> Result<()> {
330        self.inner.store_node_payload(node_id, payload)
331    }
332
333    /// Inserts or updates node payload (properties).
334    ///
335    /// # Errors
336    ///
337    /// Returns an error if storage fails.
338    #[deprecated(since = "1.6.0", note = "Use upsert_node_payload() instead")]
339    pub fn store_node_payload(&self, node_id: u64, payload: &serde_json::Value) -> Result<()> {
340        self.upsert_node_payload(node_id, payload)
341    }
342
343    /// Retrieves node payload.
344    ///
345    /// # Errors
346    ///
347    /// Returns an error if retrieval fails.
348    pub fn get_node_payload(&self, node_id: u64) -> Result<Option<serde_json::Value>> {
349        self.inner.get_node_payload(node_id)
350    }
351
352    // -------------------------------------------------------------------------
353    // Optional embedding search
354    // -------------------------------------------------------------------------
355
356    /// Searches for similar nodes by embedding (only available if `has_embeddings()`).
357    ///
358    /// # Errors
359    ///
360    /// Returns `Error::VectorNotAllowed` if this collection has no embeddings,
361    /// or `Error::DimensionMismatch` if the query dimension is wrong.
362    pub fn search_by_embedding(&self, query: &[f32], k: usize) -> Result<Vec<SearchResult>> {
363        self.inner.search_by_embedding(query, k)
364    }
365
366    /// Alias for [`search_by_embedding`](Self::search_by_embedding).
367    ///
368    /// Provided for API parity with [`crate::VectorCollection::search`].
369    ///
370    /// # Errors
371    ///
372    /// Returns `Error::VectorNotAllowed` if this collection has no embeddings,
373    /// or `Error::DimensionMismatch` if the query dimension is wrong.
374    pub fn search(&self, query: &[f32], k: usize) -> Result<Vec<SearchResult>> {
375        self.search_by_embedding(query, k)
376    }
377}
378
#[cfg(test)]
mod tests {
    // `super::*` already brings `GraphCollection`, `GraphEdge`, `GraphSchema`,
    // `DistanceMetric` and `TraversalConfig` into scope.
    use super::*;
    use std::collections::{HashMap, HashSet};
    use tempfile::tempdir;

    /// Opens a fresh embedding-free, schemaless collection named `kg` under `root`.
    fn schemaless_graph(root: &std::path::Path) -> GraphCollection {
        GraphCollection::create(
            root.to_path_buf(),
            "kg",
            None,
            DistanceMetric::Cosine,
            GraphSchema::schemaless(),
        )
        .unwrap()
    }

    #[test]
    fn test_all_node_ids_returns_ids_with_payload() {
        let dir = tempdir().unwrap();
        let col = schemaless_graph(dir.path());

        // Give exactly two nodes a payload; only those should be listed.
        for (node, who) in [(10_u64, "Alice"), (20, "Bob")] {
            col.upsert_node_payload(node, &serde_json::json!({ "name": who }))
                .unwrap();
        }

        let ids = col.all_node_ids();
        assert!(ids.contains(&10), "node 10 should be present");
        assert!(ids.contains(&20), "node 20 should be present");
        assert_eq!(ids.len(), 2);
    }

    #[test]
    fn test_edge_count_returns_correct_count() {
        let dir = tempdir().unwrap();
        let col = schemaless_graph(dir.path());

        assert_eq!(col.edge_count(), 0);

        col.add_edge(GraphEdge::new(1, 10, 20, "knows").unwrap())
            .unwrap();
        assert_eq!(col.edge_count(), 1);

        col.add_edge(GraphEdge::new(2, 20, 30, "likes").unwrap())
            .unwrap();
        assert_eq!(col.edge_count(), 2);
    }

    #[test]
    fn test_traverse_bfs_parallel_through_graph_collection() {
        let dir = tempdir().unwrap();
        let col = schemaless_graph(dir.path());

        // Chain 1 -> 2 -> 3.
        for (edge_id, from, to) in [(1_u64, 1_u64, 2_u64), (2, 2, 3)] {
            col.add_edge(GraphEdge::new(edge_id, from, to, "NEXT").unwrap())
                .unwrap();
        }

        let config = TraversalConfig {
            max_depth: 3,
            min_depth: 1,
            ..TraversalConfig::default()
        };
        let reached: HashSet<u64> = col
            .traverse_bfs_parallel(&[1], &config)
            .iter()
            .map(|hit| hit.target_id)
            .collect();
        assert!(reached.contains(&2), "should reach node 2");
        assert!(reached.contains(&3), "should reach node 3");
    }

    #[test]
    fn test_execute_match_finds_edges() {
        use crate::velesql::{
            Direction, GraphPattern, MatchClause, NodePattern, RelationshipPattern, ReturnClause,
        };

        let dir = tempdir().unwrap();
        let col = schemaless_graph(dir.path());

        // Two labelled Person nodes...
        col.upsert_node_payload(
            10,
            &serde_json::json!({"_labels": ["Person"], "name": "Alice"}),
        )
        .unwrap();
        col.upsert_node_payload(
            20,
            &serde_json::json!({"_labels": ["Person"], "name": "Bob"}),
        )
        .unwrap();

        // ...joined by Alice -> Bob.
        col.add_edge(GraphEdge::new(1, 10, 20, "KNOWS").unwrap())
            .unwrap();

        // (a)-->(b) pattern, run through the GraphCollection delegate.
        let pattern = GraphPattern {
            name: None,
            nodes: vec![
                NodePattern::new().with_alias("a"),
                NodePattern::new().with_alias("b"),
            ],
            relationships: vec![RelationshipPattern::new(Direction::Outgoing)],
        };
        let match_clause = MatchClause {
            patterns: vec![pattern],
            where_clause: None,
            return_clause: ReturnClause {
                items: vec![],
                order_by: None,
                limit: Some(10),
            },
        };

        let params = HashMap::new();
        let results = col.execute_match(&match_clause, &params).unwrap();
        assert!(
            !results.is_empty(),
            "execute_match should find the KNOWS edge"
        );
        assert_eq!(results[0].node_id, 20, "target should be Bob (id=20)");
    }
}
520}