velesdb_core/collection/graph_collection.rs
//! `GraphCollection`: knowledge graph with optional node embeddings.
//!
//! # Design
//!
//! `GraphCollection` is a pure newtype over `Collection` (C-02).
//! All graph state (edge store, property/range indexes, node payloads, optional
//! HNSW for node embeddings) lives inside the single `inner: Collection`.
//! The graph schema and embedding dimension are persisted in `config.json`.
//! There are no separate engine fields — no dual-storage risk.
10
11use std::path::PathBuf;
12
13use crate::collection::graph::{GraphEdge, GraphSchema, TraversalConfig, TraversalResult};
14use crate::collection::types::Collection;
15use crate::distance::DistanceMetric;
16use crate::error::Result;
17use crate::point::{Point, SearchResult};
18
19/// A graph collection storing typed relationships between nodes.
20///
21/// Node embeddings are optional: if `dimension` is `None`, no vector index is created.
22///
23/// # Examples
24///
25/// ```rust,no_run
26/// use velesdb_core::{GraphCollection, GraphSchema, GraphEdge, DistanceMetric};
27///
28/// let coll = GraphCollection::create(
29/// "./data/kg".into(),
30/// "knowledge",
31/// None, // no embeddings
32/// DistanceMetric::Cosine, // unused when no embeddings
33/// GraphSchema::schemaless(),
34/// )?;
35///
36/// let edge = GraphEdge::new(1, 100, 200, "KNOWS")?;
37/// coll.add_edge(edge)?;
38/// # Ok::<(), velesdb_core::Error>(())
39/// ```
40#[derive(Clone)]
41pub struct GraphCollection {
42 /// Single source of truth — all graph state lives here (C-02 pure newtype).
43 pub(crate) inner: Collection,
44}
45
46impl GraphCollection {
47 // -------------------------------------------------------------------------
48 // Lifecycle
49 // -------------------------------------------------------------------------
50
51 /// Creates a new `GraphCollection`.
52 ///
53 /// # Errors
54 ///
55 /// Returns an error if the directory cannot be created or storage fails.
56 pub fn create(
57 path: PathBuf,
58 name: &str,
59 dimension: Option<usize>,
60 metric: DistanceMetric,
61 schema: GraphSchema,
62 ) -> Result<Self> {
63 Ok(Self {
64 inner: Collection::create_graph_collection(path, name, schema, dimension, metric)?,
65 })
66 }
67
68 /// Opens an existing `GraphCollection` from disk.
69 ///
70 /// # Errors
71 ///
72 /// Returns an error if config or storage cannot be opened.
73 pub fn open(path: PathBuf) -> Result<Self> {
74 Ok(Self {
75 inner: Collection::open(path)?,
76 })
77 }
78
79 /// Flushes all state to disk.
80 ///
81 /// Issue #423: This fast-path flush skips `vectors.idx` serialization.
82 /// The WAL provides crash recovery for the vector index.
83 ///
84 /// # Errors
85 ///
86 /// Returns an error if any flush operation fails.
87 pub fn flush(&self) -> Result<()> {
88 self.inner.flush()
89 }
90
91 /// Full durability flush including `vectors.idx` serialization.
92 ///
93 /// Issue #423: Use on graceful shutdown to avoid a full WAL replay
94 /// on the next startup.
95 ///
96 /// # Errors
97 ///
98 /// Returns an error if any flush operation fails.
99 pub fn flush_full(&self) -> Result<()> {
100 self.inner.flush_full()
101 }
102
103 // -------------------------------------------------------------------------
104 // Metadata
105 // -------------------------------------------------------------------------
106
107 /// Returns the collection name.
108 #[must_use]
109 pub fn name(&self) -> String {
110 self.inner.config().name
111 }
112
113 /// Returns the graph schema stored in config.
114 ///
115 /// Returns `GraphSchema::schemaless()` for collections that have no schema set.
116 #[must_use]
117 pub fn schema(&self) -> GraphSchema {
118 self.inner
119 .graph_schema()
120 .unwrap_or_else(GraphSchema::schemaless)
121 }
122
123 /// Returns `true` if this collection stores node embeddings.
124 #[must_use]
125 pub fn has_embeddings(&self) -> bool {
126 self.inner.has_embeddings()
127 }
128
129 // -------------------------------------------------------------------------
130 // Graph operations — delegate to Collection graph API
131 // -------------------------------------------------------------------------
132
133 /// Adds an edge between two nodes.
134 ///
135 /// # Errors
136 ///
137 /// - Returns `Error::EdgeExists` if an edge with the same ID already exists.
138 ///
139 /// # Examples
140 ///
141 /// ```rust,no_run
142 /// # use velesdb_core::{GraphCollection, GraphSchema, GraphEdge, DistanceMetric};
143 /// # let coll = GraphCollection::create("./data/kg".into(), "kg", None, DistanceMetric::Cosine, GraphSchema::schemaless())?;
144 /// let edge = GraphEdge::new(1, 100, 200, "KNOWS")?;
145 /// coll.add_edge(edge)?;
146 /// # Ok::<(), velesdb_core::Error>(())
147 /// ```
148 pub fn add_edge(&self, edge: GraphEdge) -> Result<()> {
149 self.inner.add_edge(edge)
150 }
151
152 /// Adds multiple edges in batch (much faster than calling add_edge in a loop).
153 ///
154 /// Acquires locks once for the entire batch and rebuilds the CSR snapshot
155 /// once at the end. Duplicate edge IDs are silently skipped.
156 ///
157 /// # Returns
158 ///
159 /// Number of edges successfully added.
160 ///
161 /// # Errors
162 ///
163 /// Returns an error if WAL durability logging fails for graph
164 /// collections (fail-closed: the in-memory store is not mutated).
165 pub fn add_edges_batch(&self, edges: Vec<GraphEdge>) -> Result<usize> {
166 self.inner.add_edges_batch(edges)
167 }
168
169 /// Returns edges, optionally filtered by label.
170 #[must_use]
171 pub fn get_edges(&self, label: Option<&str>) -> Vec<GraphEdge> {
172 match label {
173 Some(lbl) => self.inner.get_edges_by_label(lbl),
174 None => self.inner.get_all_edges(),
175 }
176 }
177
178 /// Returns all outgoing edges from a node.
179 #[must_use]
180 pub fn get_outgoing(&self, node_id: u64) -> Vec<GraphEdge> {
181 self.inner.get_outgoing_edges(node_id)
182 }
183
184 /// Returns all incoming edges to a node.
185 #[must_use]
186 pub fn get_incoming(&self, node_id: u64) -> Vec<GraphEdge> {
187 self.inner.get_incoming_edges(node_id)
188 }
189
190 /// Returns the total number of edges in the graph without materializing them.
191 #[must_use]
192 pub fn edge_count(&self) -> usize {
193 self.inner.edge_count()
194 }
195
196 /// Returns `(in_degree, out_degree)` for a node.
197 #[must_use]
198 pub fn node_degree(&self, node_id: u64) -> (usize, usize) {
199 self.inner.get_node_degree(node_id)
200 }
201
202 /// Returns the IDs of all nodes that have a stored payload.
203 ///
204 /// Nodes that appear only as edge endpoints without a stored payload
205 /// are not included. Use [`GraphCollection::get_edges`] to discover
206 /// all referenced node IDs.
207 #[must_use]
208 pub fn all_node_ids(&self) -> Vec<u64> {
209 self.inner.all_ids()
210 }
211
212 /// Returns the next batch of points for scroll iteration.
213 ///
214 /// Delegates to the inner collection's `scroll_batch` (parallel
215 /// implementation to [`VectorCollection::scroll_batch`](crate::VectorCollection::scroll_batch)).
216 ///
217 /// # Errors
218 ///
219 /// Returns an error if `batch_size` is 0.
220 pub fn scroll_batch(
221 &self,
222 cursor: Option<u64>,
223 batch_size: usize,
224 filter: Option<&crate::filter::Filter>,
225 ) -> Result<crate::collection::ScrollBatch> {
226 self.inner.scroll_batch(cursor, batch_size, filter)
227 }
228
229 /// Returns the number of nodes (points) stored in this collection.
230 #[must_use]
231 pub fn len(&self) -> usize {
232 self.inner.len()
233 }
234
235 /// Returns `true` if the collection contains no nodes.
236 #[must_use]
237 pub fn is_empty(&self) -> bool {
238 self.inner.is_empty()
239 }
240
241 /// Retrieves nodes by IDs, returning `None` for missing entries.
242 #[must_use]
243 pub fn get(&self, ids: &[u64]) -> Vec<Option<Point>> {
244 self.inner.get(ids)
245 }
246
247 /// Deletes nodes by IDs.
248 ///
249 /// Missing IDs are silently ignored.
250 ///
251 /// # Errors
252 ///
253 /// Returns an error if storage operations fail.
254 pub fn delete(&self, ids: &[u64]) -> Result<()> {
255 self.inner.delete(ids)
256 }
257
258 /// Removes an edge from the graph by ID.
259 ///
260 /// Returns `true` if the edge existed and was removed, `false` otherwise.
261 #[must_use]
262 pub fn remove_edge(&self, edge_id: u64) -> bool {
263 self.inner.remove_edge(edge_id)
264 }
265
266 /// Performs BFS traversal from a source node.
267 ///
268 /// # Examples
269 ///
270 /// ```rust,no_run
271 /// # use velesdb_core::{GraphCollection, GraphSchema, GraphEdge, DistanceMetric};
272 /// # use velesdb_core::collection::graph::TraversalConfig;
273 /// # let coll = GraphCollection::create("./data/kg".into(), "kg", None, DistanceMetric::Cosine, GraphSchema::schemaless())?;
274 /// let config = TraversalConfig { max_depth: 3, ..TraversalConfig::default() };
275 /// let results = coll.traverse_bfs(100, &config);
276 /// for r in &results {
277 /// println!("node={} depth={}", r.target_id, r.depth);
278 /// }
279 /// # Ok::<(), velesdb_core::Error>(())
280 /// ```
281 #[must_use]
282 pub fn traverse_bfs(&self, source_id: u64, config: &TraversalConfig) -> Vec<TraversalResult> {
283 self.inner.traverse_bfs_config(source_id, config)
284 }
285
286 /// Performs DFS traversal from a source node.
287 #[must_use]
288 pub fn traverse_dfs(&self, source_id: u64, config: &TraversalConfig) -> Vec<TraversalResult> {
289 self.inner.traverse_dfs_config(source_id, config)
290 }
291
292 /// Performs parallel BFS traversal from multiple start nodes.
293 ///
294 /// When `start_nodes` exceeds the parallel threshold (100 nodes), rayon
295 /// distributes independent per-start-node BFS traversals across CPU cores.
296 /// Results are deduplicated by path signature and truncated to `config.limit`.
297 ///
298 /// # Examples
299 ///
300 /// ```rust,no_run
301 /// # use velesdb_core::{GraphCollection, GraphSchema, DistanceMetric};
302 /// # use velesdb_core::collection::graph::TraversalConfig;
303 /// # let coll = GraphCollection::create("./data/kg".into(), "kg", None, DistanceMetric::Cosine, GraphSchema::schemaless())?;
304 /// let config = TraversalConfig { max_depth: 3, ..TraversalConfig::default() };
305 /// let results = coll.traverse_bfs_parallel(&[100, 200, 300], &config);
306 /// for r in &results {
307 /// println!("node={} depth={}", r.target_id, r.depth);
308 /// }
309 /// # Ok::<(), velesdb_core::Error>(())
310 /// ```
311 #[must_use]
312 pub fn traverse_bfs_parallel(
313 &self,
314 start_nodes: &[u64],
315 config: &TraversalConfig,
316 ) -> Vec<TraversalResult> {
317 self.inner.traverse_bfs_parallel(start_nodes, config)
318 }
319
320 // -------------------------------------------------------------------------
321 // Payload / node properties
322 // -------------------------------------------------------------------------
323
324 /// Inserts or updates node payload (properties).
325 ///
326 /// # Errors
327 ///
328 /// Returns an error if storage fails.
329 pub fn upsert_node_payload(&self, node_id: u64, payload: &serde_json::Value) -> Result<()> {
330 self.inner.store_node_payload(node_id, payload)
331 }
332
333 /// Inserts or updates node payload (properties).
334 ///
335 /// # Errors
336 ///
337 /// Returns an error if storage fails.
338 #[deprecated(since = "1.6.0", note = "Use upsert_node_payload() instead")]
339 pub fn store_node_payload(&self, node_id: u64, payload: &serde_json::Value) -> Result<()> {
340 self.upsert_node_payload(node_id, payload)
341 }
342
343 /// Retrieves node payload.
344 ///
345 /// # Errors
346 ///
347 /// Returns an error if retrieval fails.
348 pub fn get_node_payload(&self, node_id: u64) -> Result<Option<serde_json::Value>> {
349 self.inner.get_node_payload(node_id)
350 }
351
352 // -------------------------------------------------------------------------
353 // Optional embedding search
354 // -------------------------------------------------------------------------
355
356 /// Searches for similar nodes by embedding (only available if `has_embeddings()`).
357 ///
358 /// # Errors
359 ///
360 /// Returns `Error::VectorNotAllowed` if this collection has no embeddings,
361 /// or `Error::DimensionMismatch` if the query dimension is wrong.
362 pub fn search_by_embedding(&self, query: &[f32], k: usize) -> Result<Vec<SearchResult>> {
363 self.inner.search_by_embedding(query, k)
364 }
365
366 /// Alias for [`search_by_embedding`](Self::search_by_embedding).
367 ///
368 /// Provided for API parity with [`crate::VectorCollection::search`].
369 ///
370 /// # Errors
371 ///
372 /// Returns `Error::VectorNotAllowed` if this collection has no embeddings,
373 /// or `Error::DimensionMismatch` if the query dimension is wrong.
374 pub fn search(&self, query: &[f32], k: usize) -> Result<Vec<SearchResult>> {
375 self.search_by_embedding(query, k)
376 }
377}
378
#[cfg(test)]
mod tests {
    use super::*;
    use crate::collection::graph::GraphSchema;
    use crate::distance::DistanceMetric;
    use std::collections::HashMap;
    use tempfile::tempdir;

    /// Creates the fixture shared by every test: a schemaless graph collection
    /// named "kg" without embeddings, stored under `dir`.
    ///
    /// The caller keeps ownership of `dir` so the temporary directory outlives
    /// the collection for the duration of the test.
    fn schemaless_graph(dir: &tempfile::TempDir) -> GraphCollection {
        GraphCollection::create(
            dir.path().to_path_buf(),
            "kg",
            None,
            DistanceMetric::Cosine,
            GraphSchema::schemaless(),
        )
        .unwrap()
    }

    /// `all_node_ids` must list exactly the nodes that received a payload.
    #[test]
    fn test_all_node_ids_returns_ids_with_payload() {
        let dir = tempdir().unwrap();
        let col = schemaless_graph(&dir);

        // Store payloads on two nodes
        col.upsert_node_payload(10, &serde_json::json!({"name": "Alice"}))
            .unwrap();
        col.upsert_node_payload(20, &serde_json::json!({"name": "Bob"}))
            .unwrap();

        let ids = col.all_node_ids();
        assert!(ids.contains(&10), "node 10 should be present");
        assert!(ids.contains(&20), "node 20 should be present");
        assert_eq!(ids.len(), 2);
    }

    /// `edge_count` must track each successful `add_edge` call.
    #[test]
    fn test_edge_count_returns_correct_count() {
        let dir = tempdir().unwrap();
        let col = schemaless_graph(&dir);

        assert_eq!(col.edge_count(), 0);

        let edge1 = GraphEdge::new(1, 10, 20, "knows").unwrap();
        col.add_edge(edge1).unwrap();
        assert_eq!(col.edge_count(), 1);

        let edge2 = GraphEdge::new(2, 20, 30, "likes").unwrap();
        col.add_edge(edge2).unwrap();
        assert_eq!(col.edge_count(), 2);
    }

    /// Parallel BFS exposed on `GraphCollection` must reach every node of a chain.
    #[test]
    fn test_traverse_bfs_parallel_through_graph_collection() {
        let dir = tempdir().unwrap();
        let col = schemaless_graph(&dir);

        // Build chain: 1->2->3
        col.add_edge(GraphEdge::new(1, 1, 2, "NEXT").unwrap())
            .unwrap();
        col.add_edge(GraphEdge::new(2, 2, 3, "NEXT").unwrap())
            .unwrap();

        let config = TraversalConfig {
            max_depth: 3,
            min_depth: 1,
            ..TraversalConfig::default()
        };
        let results = col.traverse_bfs_parallel(&[1], &config);
        let target_ids: std::collections::HashSet<u64> =
            results.iter().map(|r| r.target_id).collect();
        assert!(target_ids.contains(&2), "should reach node 2");
        assert!(target_ids.contains(&3), "should reach node 3");
    }

    /// A two-node outgoing MATCH pattern must find the stored KNOWS edge.
    #[test]
    fn test_execute_match_finds_edges() {
        let dir = tempdir().unwrap();
        let col = schemaless_graph(&dir);

        // Store node payloads with labels
        col.upsert_node_payload(
            10,
            &serde_json::json!({"_labels": ["Person"], "name": "Alice"}),
        )
        .unwrap();
        col.upsert_node_payload(
            20,
            &serde_json::json!({"_labels": ["Person"], "name": "Bob"}),
        )
        .unwrap();

        // Add edge: Alice -> Bob
        let edge = GraphEdge::new(1, 10, 20, "KNOWS").unwrap();
        col.add_edge(edge).unwrap();

        // MATCH query through the GraphCollection delegate
        let match_clause = crate::velesql::MatchClause {
            patterns: vec![crate::velesql::GraphPattern {
                name: None,
                nodes: vec![
                    crate::velesql::NodePattern::new().with_alias("a"),
                    crate::velesql::NodePattern::new().with_alias("b"),
                ],
                relationships: vec![crate::velesql::RelationshipPattern::new(
                    crate::velesql::Direction::Outgoing,
                )],
            }],
            where_clause: None,
            return_clause: crate::velesql::ReturnClause {
                items: vec![],
                order_by: None,
                limit: Some(10),
            },
        };

        // NOTE(review): `execute_match` is not defined in this file's `impl`
        // block; presumably it lives in another `impl GraphCollection` — confirm.
        let params = HashMap::new();
        let results = col.execute_match(&match_clause, &params).unwrap();
        assert!(
            !results.is_empty(),
            "execute_match should find the KNOWS edge"
        );
        assert_eq!(results[0].node_id, 20, "target should be Bob (id=20)");
    }
}