velesdb_core/collection/graph_collection.rs
1//! `GraphCollection`: knowledge graph with optional node embeddings.
2//!
3//! # Design
4//!
5//! `GraphCollection` is a pure newtype over `Collection` (C-02).
6//! All graph state (edge store, property/range indexes, node payloads, optional
7//! HNSW for node embeddings) lives inside the single `inner: Collection`.
8//! The graph schema and embedding dimension are persisted in `config.json`.
9//! There are no separate engine fields — no dual-storage risk.
10
11use std::path::PathBuf;
12
13use crate::collection::graph::{GraphEdge, GraphSchema, TraversalConfig, TraversalResult};
14use crate::collection::types::Collection;
15use crate::distance::DistanceMetric;
16use crate::error::Result;
17use crate::point::{Point, SearchResult};
18
19/// A graph collection storing typed relationships between nodes.
20///
21/// Node embeddings are optional: if `dimension` is `None`, no vector index is created.
22///
23/// # Examples
24///
25/// ```rust,no_run
26/// use velesdb_core::{GraphCollection, GraphSchema, GraphEdge, DistanceMetric};
27///
28/// let coll = GraphCollection::create(
29/// "./data/kg".into(),
30/// "knowledge",
31/// None, // no embeddings
32/// DistanceMetric::Cosine, // unused when no embeddings
33/// GraphSchema::schemaless(),
34/// )?;
35///
36/// let edge = GraphEdge::new(1, 100, 200, "KNOWS")?;
37/// coll.add_edge(edge)?;
38/// # Ok::<(), velesdb_core::Error>(())
39/// ```
40#[derive(Clone)]
41pub struct GraphCollection {
42 /// Single source of truth — all graph state lives here (C-02 pure newtype).
43 pub(crate) inner: Collection,
44}
45
46impl GraphCollection {
47 // -------------------------------------------------------------------------
48 // Lifecycle
49 // -------------------------------------------------------------------------
50
51 /// Creates a new `GraphCollection`.
52 ///
53 /// # Errors
54 ///
55 /// Returns an error if the directory cannot be created or storage fails.
56 pub fn create(
57 path: PathBuf,
58 name: &str,
59 dimension: Option<usize>,
60 metric: DistanceMetric,
61 schema: GraphSchema,
62 ) -> Result<Self> {
63 Ok(Self {
64 inner: Collection::create_graph_collection(path, name, schema, dimension, metric)?,
65 })
66 }
67
68 /// Opens an existing `GraphCollection` from disk.
69 ///
70 /// # Errors
71 ///
72 /// Returns an error if config or storage cannot be opened.
73 pub fn open(path: PathBuf) -> Result<Self> {
74 Ok(Self {
75 inner: Collection::open(path)?,
76 })
77 }
78
79 /// Flushes all state to disk.
80 ///
81 /// Issue #423: This fast-path flush skips `vectors.idx` serialization.
82 /// The WAL provides crash recovery for the vector index.
83 ///
84 /// # Errors
85 ///
86 /// Returns an error if any flush operation fails.
87 pub fn flush(&self) -> Result<()> {
88 self.inner.flush()
89 }
90
91 /// Full durability flush including `vectors.idx` serialization.
92 ///
93 /// Issue #423: Use on graceful shutdown to avoid a full WAL replay
94 /// on the next startup.
95 ///
96 /// # Errors
97 ///
98 /// Returns an error if any flush operation fails.
99 pub fn flush_full(&self) -> Result<()> {
100 self.inner.flush_full()
101 }
102
103 // -------------------------------------------------------------------------
104 // Metadata
105 // -------------------------------------------------------------------------
106
107 /// Returns the collection name.
108 #[must_use]
109 pub fn name(&self) -> String {
110 self.inner.config().name
111 }
112
113 /// Returns the graph schema stored in config.
114 ///
115 /// Returns `GraphSchema::schemaless()` for collections that have no schema set.
116 #[must_use]
117 pub fn schema(&self) -> GraphSchema {
118 self.inner
119 .graph_schema()
120 .unwrap_or_else(GraphSchema::schemaless)
121 }
122
123 /// Returns `true` if this collection stores node embeddings.
124 #[must_use]
125 pub fn has_embeddings(&self) -> bool {
126 self.inner.has_embeddings()
127 }
128
129 // -------------------------------------------------------------------------
130 // Graph operations — delegate to Collection graph API
131 // -------------------------------------------------------------------------
132
133 /// Adds an edge between two nodes.
134 ///
135 /// # Errors
136 ///
137 /// - Returns `Error::EdgeExists` if an edge with the same ID already exists.
138 ///
139 /// # Examples
140 ///
141 /// ```rust,no_run
142 /// # use velesdb_core::{GraphCollection, GraphSchema, GraphEdge, DistanceMetric};
143 /// # let coll = GraphCollection::create("./data/kg".into(), "kg", None, DistanceMetric::Cosine, GraphSchema::schemaless())?;
144 /// let edge = GraphEdge::new(1, 100, 200, "KNOWS")?;
145 /// coll.add_edge(edge)?;
146 /// # Ok::<(), velesdb_core::Error>(())
147 /// ```
148 pub fn add_edge(&self, edge: GraphEdge) -> Result<()> {
149 self.inner.add_edge(edge)
150 }
151
152 /// Adds multiple edges in batch (much faster than calling add_edge in a loop).
153 ///
154 /// Acquires locks once for the entire batch and rebuilds the CSR snapshot
155 /// once at the end. Duplicate edge IDs are silently skipped.
156 ///
157 /// # Returns
158 ///
159 /// Number of edges successfully added.
160 #[must_use]
161 pub fn add_edges_batch(&self, edges: Vec<GraphEdge>) -> usize {
162 let count = self.inner.edge_store.add_edges_batch(edges);
163 if count > 0 {
164 self.inner
165 .write_generation
166 .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
167 }
168 count
169 }
170
171 /// Returns edges, optionally filtered by label.
172 #[must_use]
173 pub fn get_edges(&self, label: Option<&str>) -> Vec<GraphEdge> {
174 match label {
175 Some(lbl) => self.inner.get_edges_by_label(lbl),
176 None => self.inner.get_all_edges(),
177 }
178 }
179
180 /// Returns all outgoing edges from a node.
181 #[must_use]
182 pub fn get_outgoing(&self, node_id: u64) -> Vec<GraphEdge> {
183 self.inner.get_outgoing_edges(node_id)
184 }
185
186 /// Returns all incoming edges to a node.
187 #[must_use]
188 pub fn get_incoming(&self, node_id: u64) -> Vec<GraphEdge> {
189 self.inner.get_incoming_edges(node_id)
190 }
191
192 /// Returns the total number of edges in the graph without materializing them.
193 #[must_use]
194 pub fn edge_count(&self) -> usize {
195 self.inner.edge_count()
196 }
197
198 /// Returns `(in_degree, out_degree)` for a node.
199 #[must_use]
200 pub fn node_degree(&self, node_id: u64) -> (usize, usize) {
201 self.inner.get_node_degree(node_id)
202 }
203
204 /// Returns the IDs of all nodes that have a stored payload.
205 ///
206 /// Nodes that appear only as edge endpoints without a stored payload
207 /// are not included. Use [`GraphCollection::get_edges`] to discover
208 /// all referenced node IDs.
209 #[must_use]
210 pub fn all_node_ids(&self) -> Vec<u64> {
211 self.inner.all_ids()
212 }
213
214 /// Returns the next batch of points for scroll iteration.
215 ///
216 /// Delegates to the inner collection's `scroll_batch` (parallel
217 /// implementation to [`VectorCollection::scroll_batch`](crate::VectorCollection::scroll_batch)).
218 ///
219 /// # Errors
220 ///
221 /// Returns an error if `batch_size` is 0.
222 pub fn scroll_batch(
223 &self,
224 cursor: Option<u64>,
225 batch_size: usize,
226 filter: Option<&crate::filter::Filter>,
227 ) -> Result<crate::collection::ScrollBatch> {
228 self.inner.scroll_batch(cursor, batch_size, filter)
229 }
230
231 /// Returns the number of nodes (points) stored in this collection.
232 #[must_use]
233 pub fn len(&self) -> usize {
234 self.inner.len()
235 }
236
237 /// Returns `true` if the collection contains no nodes.
238 #[must_use]
239 pub fn is_empty(&self) -> bool {
240 self.inner.is_empty()
241 }
242
243 /// Retrieves nodes by IDs, returning `None` for missing entries.
244 #[must_use]
245 pub fn get(&self, ids: &[u64]) -> Vec<Option<Point>> {
246 self.inner.get(ids)
247 }
248
249 /// Deletes nodes by IDs.
250 ///
251 /// Missing IDs are silently ignored.
252 ///
253 /// # Errors
254 ///
255 /// Returns an error if storage operations fail.
256 pub fn delete(&self, ids: &[u64]) -> Result<()> {
257 self.inner.delete(ids)
258 }
259
260 /// Removes an edge from the graph by ID.
261 ///
262 /// Returns `true` if the edge existed and was removed, `false` otherwise.
263 #[must_use]
264 pub fn remove_edge(&self, edge_id: u64) -> bool {
265 self.inner.remove_edge(edge_id)
266 }
267
268 /// Performs BFS traversal from a source node.
269 ///
270 /// # Examples
271 ///
272 /// ```rust,no_run
273 /// # use velesdb_core::{GraphCollection, GraphSchema, GraphEdge, DistanceMetric};
274 /// # use velesdb_core::collection::graph::TraversalConfig;
275 /// # let coll = GraphCollection::create("./data/kg".into(), "kg", None, DistanceMetric::Cosine, GraphSchema::schemaless())?;
276 /// let config = TraversalConfig { max_depth: 3, ..TraversalConfig::default() };
277 /// let results = coll.traverse_bfs(100, &config);
278 /// for r in &results {
279 /// println!("node={} depth={}", r.target_id, r.depth);
280 /// }
281 /// # Ok::<(), velesdb_core::Error>(())
282 /// ```
283 #[must_use]
284 pub fn traverse_bfs(&self, source_id: u64, config: &TraversalConfig) -> Vec<TraversalResult> {
285 self.inner.traverse_bfs_config(source_id, config)
286 }
287
288 /// Performs DFS traversal from a source node.
289 #[must_use]
290 pub fn traverse_dfs(&self, source_id: u64, config: &TraversalConfig) -> Vec<TraversalResult> {
291 self.inner.traverse_dfs_config(source_id, config)
292 }
293
294 /// Performs parallel BFS traversal from multiple start nodes.
295 ///
296 /// When `start_nodes` exceeds the parallel threshold (100 nodes), rayon
297 /// distributes independent per-start-node BFS traversals across CPU cores.
298 /// Results are deduplicated by path signature and truncated to `config.limit`.
299 ///
300 /// # Examples
301 ///
302 /// ```rust,no_run
303 /// # use velesdb_core::{GraphCollection, GraphSchema, DistanceMetric};
304 /// # use velesdb_core::collection::graph::TraversalConfig;
305 /// # let coll = GraphCollection::create("./data/kg".into(), "kg", None, DistanceMetric::Cosine, GraphSchema::schemaless())?;
306 /// let config = TraversalConfig { max_depth: 3, ..TraversalConfig::default() };
307 /// let results = coll.traverse_bfs_parallel(&[100, 200, 300], &config);
308 /// for r in &results {
309 /// println!("node={} depth={}", r.target_id, r.depth);
310 /// }
311 /// # Ok::<(), velesdb_core::Error>(())
312 /// ```
313 #[must_use]
314 pub fn traverse_bfs_parallel(
315 &self,
316 start_nodes: &[u64],
317 config: &TraversalConfig,
318 ) -> Vec<TraversalResult> {
319 self.inner.traverse_bfs_parallel(start_nodes, config)
320 }
321
322 // -------------------------------------------------------------------------
323 // Payload / node properties
324 // -------------------------------------------------------------------------
325
326 /// Inserts or updates node payload (properties).
327 ///
328 /// # Errors
329 ///
330 /// Returns an error if storage fails.
331 pub fn upsert_node_payload(&self, node_id: u64, payload: &serde_json::Value) -> Result<()> {
332 self.inner.store_node_payload(node_id, payload)
333 }
334
335 /// Inserts or updates node payload (properties).
336 ///
337 /// # Errors
338 ///
339 /// Returns an error if storage fails.
340 #[deprecated(since = "1.6.0", note = "Use upsert_node_payload() instead")]
341 pub fn store_node_payload(&self, node_id: u64, payload: &serde_json::Value) -> Result<()> {
342 self.upsert_node_payload(node_id, payload)
343 }
344
345 /// Retrieves node payload.
346 ///
347 /// # Errors
348 ///
349 /// Returns an error if retrieval fails.
350 pub fn get_node_payload(&self, node_id: u64) -> Result<Option<serde_json::Value>> {
351 self.inner.get_node_payload(node_id)
352 }
353
354 // -------------------------------------------------------------------------
355 // Optional embedding search
356 // -------------------------------------------------------------------------
357
358 /// Searches for similar nodes by embedding (only available if `has_embeddings()`).
359 ///
360 /// # Errors
361 ///
362 /// Returns `Error::VectorNotAllowed` if this collection has no embeddings,
363 /// or `Error::DimensionMismatch` if the query dimension is wrong.
364 pub fn search_by_embedding(&self, query: &[f32], k: usize) -> Result<Vec<SearchResult>> {
365 self.inner.search_by_embedding(query, k)
366 }
367
368 /// Alias for [`search_by_embedding`](Self::search_by_embedding).
369 ///
370 /// Provided for API parity with [`crate::VectorCollection::search`].
371 ///
372 /// # Errors
373 ///
374 /// Returns `Error::VectorNotAllowed` if this collection has no embeddings,
375 /// or `Error::DimensionMismatch` if the query dimension is wrong.
376 pub fn search(&self, query: &[f32], k: usize) -> Result<Vec<SearchResult>> {
377 self.search_by_embedding(query, k)
378 }
379}
380
#[cfg(test)]
mod tests {
    use super::*;
    use crate::collection::graph::GraphSchema;
    use crate::distance::DistanceMetric;
    use std::collections::HashMap;
    use tempfile::tempdir;

    /// Creates an empty, schemaless graph collection without embeddings in `dir`.
    ///
    /// The caller must keep the owning temp directory alive while the
    /// returned collection is in use.
    fn schemaless_graph(dir: &std::path::Path) -> GraphCollection {
        GraphCollection::create(
            dir.to_path_buf(),
            "kg",
            None,
            DistanceMetric::Cosine,
            GraphSchema::schemaless(),
        )
        .expect("creating a graph collection in a fresh temp dir should succeed")
    }

    #[test]
    fn test_all_node_ids_returns_ids_with_payload() {
        let dir = tempdir().unwrap();
        let col = schemaless_graph(dir.path());

        // Store payloads on two nodes.
        col.upsert_node_payload(10, &serde_json::json!({"name": "Alice"}))
            .unwrap();
        col.upsert_node_payload(20, &serde_json::json!({"name": "Bob"}))
            .unwrap();

        // Order is not part of the contract, so sort before comparing.
        let mut ids = col.all_node_ids();
        ids.sort_unstable();
        assert_eq!(ids, vec![10, 20], "exactly nodes 10 and 20 should be present");
    }

    #[test]
    fn test_edge_count_returns_correct_count() {
        let dir = tempdir().unwrap();
        let col = schemaless_graph(dir.path());

        assert_eq!(col.edge_count(), 0);

        col.add_edge(GraphEdge::new(1, 10, 20, "knows").unwrap())
            .unwrap();
        assert_eq!(col.edge_count(), 1);

        col.add_edge(GraphEdge::new(2, 20, 30, "likes").unwrap())
            .unwrap();
        assert_eq!(col.edge_count(), 2);
    }

    #[test]
    fn test_add_edges_batch_returns_inserted_count() {
        let dir = tempdir().unwrap();
        let col = schemaless_graph(dir.path());

        let added = col.add_edges_batch(vec![
            GraphEdge::new(1, 10, 20, "knows").unwrap(),
            GraphEdge::new(2, 20, 30, "knows").unwrap(),
        ]);
        assert_eq!(added, 2, "both distinct edges should be inserted");
        assert_eq!(col.edge_count(), 2);
    }

    #[test]
    fn test_traverse_bfs_parallel_through_graph_collection() {
        let dir = tempdir().unwrap();
        let col = schemaless_graph(dir.path());

        // Build chain: 1 -> 2 -> 3
        col.add_edge(GraphEdge::new(1, 1, 2, "NEXT").unwrap())
            .unwrap();
        col.add_edge(GraphEdge::new(2, 2, 3, "NEXT").unwrap())
            .unwrap();

        let config = TraversalConfig {
            max_depth: 3,
            min_depth: 1,
            ..TraversalConfig::default()
        };
        let results = col.traverse_bfs_parallel(&[1], &config);
        let target_ids: std::collections::HashSet<u64> =
            results.iter().map(|r| r.target_id).collect();
        assert!(target_ids.contains(&2), "should reach node 2");
        assert!(target_ids.contains(&3), "should reach node 3");
    }

    #[test]
    fn test_execute_match_finds_edges() {
        let dir = tempdir().unwrap();
        let col = schemaless_graph(dir.path());

        // Store node payloads with labels.
        col.upsert_node_payload(
            10,
            &serde_json::json!({"_labels": ["Person"], "name": "Alice"}),
        )
        .unwrap();
        col.upsert_node_payload(
            20,
            &serde_json::json!({"_labels": ["Person"], "name": "Bob"}),
        )
        .unwrap();

        // Add edge: Alice -> Bob
        col.add_edge(GraphEdge::new(1, 10, 20, "KNOWS").unwrap())
            .unwrap();

        // MATCH query through the GraphCollection delegate.
        let match_clause = crate::velesql::MatchClause {
            patterns: vec![crate::velesql::GraphPattern {
                name: None,
                nodes: vec![
                    crate::velesql::NodePattern::new().with_alias("a"),
                    crate::velesql::NodePattern::new().with_alias("b"),
                ],
                relationships: vec![crate::velesql::RelationshipPattern::new(
                    crate::velesql::Direction::Outgoing,
                )],
            }],
            where_clause: None,
            return_clause: crate::velesql::ReturnClause {
                items: vec![],
                order_by: None,
                limit: Some(10),
            },
        };

        let params = HashMap::new();
        // Was `¶ms` in the original: a mis-encoded `&params` that did not compile.
        let results = col.execute_match(&match_clause, &params).unwrap();
        assert!(
            !results.is_empty(),
            "execute_match should find the KNOWS edge"
        );
        assert_eq!(results[0].node_id, 20, "target should be Bob (id=20)");
    }
}