velesdb_core/collection/graph_collection.rs
1//! `GraphCollection`: knowledge graph with optional node embeddings.
2//!
3//! # Design
4//!
5//! `GraphCollection` is a pure newtype over `Collection` (C-02).
6//! All graph state (edge store, property/range indexes, node payloads, optional
7//! HNSW for node embeddings) lives inside the single `inner: Collection`.
8//! The graph schema and embedding dimension are persisted in `config.json`.
9//! There are no separate engine fields — no dual-storage risk.
10
11use std::path::PathBuf;
12
13use crate::collection::graph::{GraphEdge, GraphSchema, TraversalConfig, TraversalResult};
14use crate::collection::types::Collection;
15use crate::distance::DistanceMetric;
16use crate::error::Result;
17use crate::point::{Point, SearchResult};
18
19/// A graph collection storing typed relationships between nodes.
20///
21/// Node embeddings are optional: if `dimension` is `None`, no vector index is created.
22///
23/// # Examples
24///
25/// ```rust,no_run
26/// use velesdb_core::{GraphCollection, GraphSchema, GraphEdge, DistanceMetric};
27///
28/// let coll = GraphCollection::create(
29/// "./data/kg".into(),
30/// "knowledge",
31/// None, // no embeddings
32/// DistanceMetric::Cosine, // unused when no embeddings
33/// GraphSchema::schemaless(),
34/// )?;
35///
36/// let edge = GraphEdge::new(1, 100, 200, "KNOWS")?;
37/// coll.add_edge(edge)?;
38/// # Ok::<(), velesdb_core::Error>(())
39/// ```
40#[derive(Clone)]
41pub struct GraphCollection {
42 /// Single source of truth — all graph state lives here (C-02 pure newtype).
43 pub(crate) inner: Collection,
44}
45
46impl GraphCollection {
47 // -------------------------------------------------------------------------
48 // Lifecycle
49 // -------------------------------------------------------------------------
50
51 /// Creates a new `GraphCollection`.
52 ///
53 /// # Errors
54 ///
55 /// Returns an error if the directory cannot be created or storage fails.
56 pub fn create(
57 path: PathBuf,
58 name: &str,
59 dimension: Option<usize>,
60 metric: DistanceMetric,
61 schema: GraphSchema,
62 ) -> Result<Self> {
63 Ok(Self {
64 inner: Collection::create_graph_collection(path, name, schema, dimension, metric)?,
65 })
66 }
67
68 /// Opens an existing `GraphCollection` from disk.
69 ///
70 /// # Errors
71 ///
72 /// Returns an error if config or storage cannot be opened.
73 pub fn open(path: PathBuf) -> Result<Self> {
74 Ok(Self {
75 inner: Collection::open(path)?,
76 })
77 }
78
79 /// Flushes all state to disk.
80 ///
81 /// Issue #423: This fast-path flush skips `vectors.idx` serialization.
82 /// The WAL provides crash recovery for the vector index.
83 ///
84 /// # Errors
85 ///
86 /// Returns an error if any flush operation fails.
87 pub fn flush(&self) -> Result<()> {
88 self.inner.flush()
89 }
90
91 /// Full durability flush including `vectors.idx` serialization.
92 ///
93 /// Issue #423: Use on graceful shutdown to avoid a full WAL replay
94 /// on the next startup.
95 ///
96 /// # Errors
97 ///
98 /// Returns an error if any flush operation fails.
99 pub fn flush_full(&self) -> Result<()> {
100 self.inner.flush_full()
101 }
102
103 // -------------------------------------------------------------------------
104 // Metadata
105 // -------------------------------------------------------------------------
106
107 /// Returns the collection name.
108 #[must_use]
109 pub fn name(&self) -> String {
110 self.inner.config().name
111 }
112
113 /// Returns the graph schema stored in config.
114 ///
115 /// Returns `GraphSchema::schemaless()` for collections that have no schema set.
116 #[must_use]
117 pub fn schema(&self) -> GraphSchema {
118 self.inner
119 .graph_schema()
120 .unwrap_or_else(GraphSchema::schemaless)
121 }
122
123 /// Returns `true` if this collection stores node embeddings.
124 #[must_use]
125 pub fn has_embeddings(&self) -> bool {
126 self.inner.has_embeddings()
127 }
128
129 // -------------------------------------------------------------------------
130 // Graph operations — delegate to Collection graph API
131 // -------------------------------------------------------------------------
132
133 /// Adds an edge between two nodes.
134 ///
135 /// # Errors
136 ///
137 /// - Returns `Error::EdgeExists` if an edge with the same ID already exists.
138 ///
139 /// # Examples
140 ///
141 /// ```rust,no_run
142 /// # use velesdb_core::{GraphCollection, GraphSchema, GraphEdge, DistanceMetric};
143 /// # let coll = GraphCollection::create("./data/kg".into(), "kg", None, DistanceMetric::Cosine, GraphSchema::schemaless())?;
144 /// let edge = GraphEdge::new(1, 100, 200, "KNOWS")?;
145 /// coll.add_edge(edge)?;
146 /// # Ok::<(), velesdb_core::Error>(())
147 /// ```
148 pub fn add_edge(&self, edge: GraphEdge) -> Result<()> {
149 self.inner.add_edge(edge)
150 }
151
152 /// Adds multiple edges in batch (much faster than calling add_edge in a loop).
153 ///
154 /// Acquires locks once for the entire batch and rebuilds the CSR snapshot
155 /// once at the end. Duplicate edge IDs are silently skipped.
156 ///
157 /// # Returns
158 ///
159 /// Number of edges successfully added.
160 ///
161 /// # Errors
162 ///
163 /// Returns an error if WAL durability logging fails for graph
164 /// collections (fail-closed: the in-memory store is not mutated).
165 pub fn add_edges_batch(&self, edges: Vec<GraphEdge>) -> Result<usize> {
166 self.inner.add_edges_batch(edges)
167 }
168
169 /// Returns edges, optionally filtered by label.
170 #[must_use]
171 pub fn get_edges(&self, label: Option<&str>) -> Vec<GraphEdge> {
172 match label {
173 Some(lbl) => self.inner.get_edges_by_label(lbl),
174 None => self.inner.get_all_edges(),
175 }
176 }
177
178 /// Returns all outgoing edges from a node.
179 #[must_use]
180 pub fn get_outgoing(&self, node_id: u64) -> Vec<GraphEdge> {
181 self.inner.get_outgoing_edges(node_id)
182 }
183
184 /// Returns all incoming edges to a node.
185 #[must_use]
186 pub fn get_incoming(&self, node_id: u64) -> Vec<GraphEdge> {
187 self.inner.get_incoming_edges(node_id)
188 }
189
190 /// Returns the total number of edges in the graph without materializing them.
191 #[must_use]
192 pub fn edge_count(&self) -> usize {
193 self.inner.edge_count()
194 }
195
196 /// Returns `(in_degree, out_degree)` for a node.
197 #[must_use]
198 pub fn node_degree(&self, node_id: u64) -> (usize, usize) {
199 self.inner.get_node_degree(node_id)
200 }
201
202 /// Returns the IDs of all nodes that have a stored payload.
203 ///
204 /// Nodes that appear only as edge endpoints without a stored payload
205 /// are not included. Use [`GraphCollection::get_edges`] to discover
206 /// all referenced node IDs.
207 #[must_use]
208 pub fn all_node_ids(&self) -> Vec<u64> {
209 self.inner.all_ids()
210 }
211
212 /// Returns the next batch of points for scroll iteration.
213 ///
214 /// Delegates to the inner collection's `scroll_batch` (parallel
215 /// implementation to [`VectorCollection::scroll_batch`](crate::VectorCollection::scroll_batch)).
216 ///
217 /// # Errors
218 ///
219 /// Returns an error if `batch_size` is 0.
220 pub fn scroll_batch(
221 &self,
222 cursor: Option<u64>,
223 batch_size: usize,
224 filter: Option<&crate::filter::Filter>,
225 ) -> Result<crate::collection::ScrollBatch> {
226 self.inner.scroll_batch(cursor, batch_size, filter)
227 }
228
229 /// Returns the number of nodes (points) stored in this collection.
230 #[must_use]
231 pub fn len(&self) -> usize {
232 self.inner.len()
233 }
234
235 /// Returns `true` if the collection contains no nodes.
236 #[must_use]
237 pub fn is_empty(&self) -> bool {
238 self.inner.is_empty()
239 }
240
241 /// Retrieves nodes by IDs, returning `None` for missing entries.
242 #[must_use]
243 pub fn get(&self, ids: &[u64]) -> Vec<Option<Point>> {
244 self.inner.get(ids)
245 }
246
247 /// Deletes nodes by IDs.
248 ///
249 /// Missing IDs are silently ignored.
250 ///
251 /// # Errors
252 ///
253 /// Returns an error if storage operations fail.
254 pub fn delete(&self, ids: &[u64]) -> Result<()> {
255 self.inner.delete(ids)
256 }
257
258 /// Removes an edge from the graph by ID.
259 ///
260 /// Returns `true` if the edge existed and was removed, `false` otherwise.
261 #[must_use]
262 pub fn remove_edge(&self, edge_id: u64) -> bool {
263 self.inner.remove_edge(edge_id)
264 }
265
266 /// Returns `true` if an edge with `edge_id` exists in the graph.
267 #[must_use]
268 pub fn has_edge(&self, edge_id: u64) -> bool {
269 self.inner.edge_exists(edge_id)
270 }
271
272 /// Performs BFS traversal from a source node.
273 ///
274 /// # Examples
275 ///
276 /// ```rust,no_run
277 /// # use velesdb_core::{GraphCollection, GraphSchema, GraphEdge, DistanceMetric};
278 /// # use velesdb_core::collection::graph::TraversalConfig;
279 /// # let coll = GraphCollection::create("./data/kg".into(), "kg", None, DistanceMetric::Cosine, GraphSchema::schemaless())?;
280 /// let config = TraversalConfig { max_depth: 3, ..TraversalConfig::default() };
281 /// let results = coll.traverse_bfs(100, &config);
282 /// for r in &results {
283 /// println!("node={} depth={}", r.target_id, r.depth);
284 /// }
285 /// # Ok::<(), velesdb_core::Error>(())
286 /// ```
287 #[must_use]
288 pub fn traverse_bfs(&self, source_id: u64, config: &TraversalConfig) -> Vec<TraversalResult> {
289 self.inner.traverse_bfs_config(source_id, config)
290 }
291
292 /// Performs DFS traversal from a source node.
293 #[must_use]
294 pub fn traverse_dfs(&self, source_id: u64, config: &TraversalConfig) -> Vec<TraversalResult> {
295 self.inner.traverse_dfs_config(source_id, config)
296 }
297
298 /// Performs parallel BFS traversal from multiple start nodes.
299 ///
300 /// When `start_nodes` exceeds the parallel threshold (100 nodes), rayon
301 /// distributes independent per-start-node BFS traversals across CPU cores.
302 /// Results are deduplicated by path signature and truncated to `config.limit`.
303 ///
304 /// # Examples
305 ///
306 /// ```rust,no_run
307 /// # use velesdb_core::{GraphCollection, GraphSchema, DistanceMetric};
308 /// # use velesdb_core::collection::graph::TraversalConfig;
309 /// # let coll = GraphCollection::create("./data/kg".into(), "kg", None, DistanceMetric::Cosine, GraphSchema::schemaless())?;
310 /// let config = TraversalConfig { max_depth: 3, ..TraversalConfig::default() };
311 /// let results = coll.traverse_bfs_parallel(&[100, 200, 300], &config);
312 /// for r in &results {
313 /// println!("node={} depth={}", r.target_id, r.depth);
314 /// }
315 /// # Ok::<(), velesdb_core::Error>(())
316 /// ```
317 #[must_use]
318 pub fn traverse_bfs_parallel(
319 &self,
320 start_nodes: &[u64],
321 config: &TraversalConfig,
322 ) -> Vec<TraversalResult> {
323 self.inner.traverse_bfs_parallel(start_nodes, config)
324 }
325
326 // -------------------------------------------------------------------------
327 // Payload / node properties
328 // -------------------------------------------------------------------------
329
330 /// Inserts or updates node payload (properties).
331 ///
332 /// # Errors
333 ///
334 /// Returns an error if storage fails.
335 pub fn upsert_node_payload(&self, node_id: u64, payload: &serde_json::Value) -> Result<()> {
336 self.inner.store_node_payload(node_id, payload)
337 }
338
339 /// Inserts or updates a node payload, optionally with an embedding vector.
340 ///
341 /// # Errors
342 ///
343 /// Returns an error if storage fails, the vector dimension is invalid, or
344 /// an embedding is supplied for a graph collection without embeddings.
345 pub fn upsert_node(
346 &self,
347 node_id: u64,
348 payload: &serde_json::Value,
349 vector: Option<Vec<f32>>,
350 ) -> Result<()> {
351 match vector {
352 Some(vector) => self
353 .inner
354 .upsert([Point::new(node_id, vector, Some(payload.clone()))]),
355 None => self.upsert_node_payload(node_id, payload),
356 }
357 }
358
359 /// Inserts or updates node payload (properties).
360 ///
361 /// # Errors
362 ///
363 /// Returns an error if storage fails.
364 #[deprecated(since = "1.6.0", note = "Use upsert_node_payload() instead")]
365 pub fn store_node_payload(&self, node_id: u64, payload: &serde_json::Value) -> Result<()> {
366 self.upsert_node_payload(node_id, payload)
367 }
368
369 /// Retrieves node payload.
370 ///
371 /// # Errors
372 ///
373 /// Returns an error if retrieval fails.
374 pub fn get_node_payload(&self, node_id: u64) -> Result<Option<serde_json::Value>> {
375 self.inner.get_node_payload(node_id)
376 }
377
378 // -------------------------------------------------------------------------
379 // Optional embedding search
380 // -------------------------------------------------------------------------
381
382 /// Searches for similar nodes by embedding (only available if `has_embeddings()`).
383 ///
384 /// # Errors
385 ///
386 /// Returns `Error::VectorNotAllowed` if this collection has no embeddings,
387 /// or `Error::DimensionMismatch` if the query dimension is wrong.
388 pub fn search_by_embedding(&self, query: &[f32], k: usize) -> Result<Vec<SearchResult>> {
389 self.inner.search_by_embedding(query, k)
390 }
391
392 /// Alias for [`search_by_embedding`](Self::search_by_embedding).
393 ///
394 /// Provided for API parity with [`crate::VectorCollection::search`].
395 ///
396 /// # Errors
397 ///
398 /// Returns `Error::VectorNotAllowed` if this collection has no embeddings,
399 /// or `Error::DimensionMismatch` if the query dimension is wrong.
400 pub fn search(&self, query: &[f32], k: usize) -> Result<Vec<SearchResult>> {
401 self.search_by_embedding(query, k)
402 }
403}
404
#[cfg(test)]
mod tests {
    use super::*;
    use crate::collection::graph::GraphSchema;
    use crate::distance::DistanceMetric;
    use std::collections::{HashMap, HashSet};
    use tempfile::{tempdir, TempDir};

    /// Creates a schemaless cosine `GraphCollection` in a fresh temp dir.
    ///
    /// Returns the `TempDir` guard alongside the collection so the backing
    /// directory outlives the test. `dimension` controls embedding support
    /// (`None` for payload/edge-only collections, `Some(n)` for searchable ones).
    fn make_test_collection(dimension: Option<usize>) -> (TempDir, GraphCollection) {
        let dir = tempdir().unwrap();
        let col = GraphCollection::create(
            dir.path().to_path_buf(),
            "kg",
            dimension,
            DistanceMetric::Cosine,
            GraphSchema::schemaless(),
        )
        .unwrap();
        (dir, col)
    }

    /// `all_node_ids` reports exactly the nodes that had a payload stored.
    #[test]
    fn test_all_node_ids_returns_ids_with_payload() {
        let (_dir, col) = make_test_collection(None);

        // Store payloads on two nodes
        col.upsert_node_payload(10, &serde_json::json!({"name": "Alice"}))
            .unwrap();
        col.upsert_node_payload(20, &serde_json::json!({"name": "Bob"}))
            .unwrap();

        let ids = col.all_node_ids();
        assert!(ids.contains(&10), "node 10 should be present");
        assert!(ids.contains(&20), "node 20 should be present");
        assert_eq!(ids.len(), 2);
    }

    /// A node upserted with an embedding keeps its payload and is returned by
    /// an embedding search for the same vector.
    #[test]
    fn test_upsert_node_with_embedding_is_searchable() {
        let (_dir, col) = make_test_collection(Some(4));

        col.upsert_node(
            10,
            &serde_json::json!({"name": "Alice"}),
            Some(vec![1.0, 0.0, 0.0, 0.0]),
        )
        .unwrap();

        assert_eq!(
            col.get_node_payload(10).unwrap(),
            Some(serde_json::json!({"name": "Alice"}))
        );
        let results = col.search_by_embedding(&[1.0, 0.0, 0.0, 0.0], 1).unwrap();
        // Guard the index below so an empty result fails with a clear message
        // instead of an out-of-bounds panic.
        assert!(
            !results.is_empty(),
            "search should return the upserted node"
        );
        assert_eq!(results[0].point.id, 10);
    }

    /// `edge_count` starts at zero and grows by one per added edge.
    #[test]
    fn test_edge_count_returns_correct_count() {
        let (_dir, col) = make_test_collection(None);

        assert_eq!(col.edge_count(), 0);

        let edge1 = GraphEdge::new(1, 10, 20, "knows").unwrap();
        col.add_edge(edge1).unwrap();
        assert_eq!(col.edge_count(), 1);

        let edge2 = GraphEdge::new(2, 20, 30, "likes").unwrap();
        col.add_edge(edge2).unwrap();
        assert_eq!(col.edge_count(), 2);
    }

    /// Parallel BFS from a single start node reaches every node of a chain.
    #[test]
    fn test_traverse_bfs_parallel_through_graph_collection() {
        let (_dir, col) = make_test_collection(None);

        // Build chain: 1->2->3
        col.add_edge(GraphEdge::new(1, 1, 2, "NEXT").unwrap())
            .unwrap();
        col.add_edge(GraphEdge::new(2, 2, 3, "NEXT").unwrap())
            .unwrap();

        let config = TraversalConfig {
            max_depth: 3,
            min_depth: 1,
            ..TraversalConfig::default()
        };
        let results = col.traverse_bfs_parallel(&[1], &config);
        let target_ids: HashSet<u64> = results.iter().map(|r| r.target_id).collect();
        assert!(target_ids.contains(&2), "should reach node 2");
        assert!(target_ids.contains(&3), "should reach node 3");
    }

    /// A two-node outgoing MATCH pattern finds the single stored edge.
    #[test]
    fn test_execute_match_finds_edges() {
        use crate::velesql::{
            Direction, GraphPattern, MatchClause, NodePattern, RelationshipPattern, ReturnClause,
        };

        let (_dir, col) = make_test_collection(None);

        // Store node payloads with labels
        col.upsert_node_payload(
            10,
            &serde_json::json!({"_labels": ["Person"], "name": "Alice"}),
        )
        .unwrap();
        col.upsert_node_payload(
            20,
            &serde_json::json!({"_labels": ["Person"], "name": "Bob"}),
        )
        .unwrap();

        // Add edge: Alice -> Bob
        let edge = GraphEdge::new(1, 10, 20, "KNOWS").unwrap();
        col.add_edge(edge).unwrap();

        // MATCH query through the GraphCollection delegate
        let match_clause = MatchClause {
            patterns: vec![GraphPattern {
                name: None,
                nodes: vec![
                    NodePattern::new().with_alias("a"),
                    NodePattern::new().with_alias("b"),
                ],
                relationships: vec![RelationshipPattern::new(Direction::Outgoing)],
            }],
            where_clause: None,
            return_clause: ReturnClause {
                items: vec![],
                order_by: None,
                limit: Some(10),
            },
        };

        // NOTE(review): `execute_match` is not defined in this file; presumably
        // it comes from a sibling `impl GraphCollection` block — confirm.
        let params = HashMap::new();
        let results = col.execute_match(&match_clause, &params).unwrap();
        assert!(
            !results.is_empty(),
            "execute_match should find the KNOWS edge"
        );
        assert_eq!(results[0].node_id, 20, "target should be Bob (id=20)");
    }
}