velesdb_core/collection/core/graph_api.rs
1//! Graph API methods for Collection (EPIC-015 US-001).
2//!
//! Exposes knowledge-graph operations on `Collection` for use by the
//! Tauri plugin, the REST API, and other consumers.
5
6use std::collections::HashSet;
7
8use crate::collection::graph::{
9 EdgeStore, GraphEdge, GraphSchema, TraversalConfig, TraversalResult,
10};
11use crate::collection::types::Collection;
12use crate::error::{Error, Result};
13use crate::index::VectorIndex;
14use crate::point::{Point, SearchResult};
15use crate::storage::{PayloadStorage, VectorStorage};
16
17/// Returns `true` if the edge's label is accepted by the relationship filter.
18///
19/// An empty `rel_types` slice means "accept all".
20#[inline]
21fn edge_passes_rel_filter(edge: &GraphEdge, rel_types: &[&str]) -> bool {
22 rel_types.is_empty() || rel_types.contains(&edge.label())
23}
24
25/// Collects unvisited, rel-type-filtered neighbor expansions for a node.
26///
27/// Each returned tuple is `(target_id, next_depth, path_to_target)`.
28/// Visited targets are inserted into `visited` before returning, so
29/// duplicate expansion is impossible even when the caller enqueues lazily.
30#[inline]
31fn collect_neighbor_expansions<'a>(
32 edges: impl Iterator<Item = &'a GraphEdge>,
33 depth: u32,
34 path: &[u64],
35 rel_types: &[&str],
36 visited: &mut HashSet<u64>,
37) -> Vec<(u64, u32, Vec<u64>)> {
38 edges
39 .filter(|e| edge_passes_rel_filter(e, rel_types))
40 .filter(|e| visited.insert(e.target()))
41 .map(|e| {
42 let mut new_path = path.to_vec();
43 new_path.push(e.id());
44 (e.target(), depth + 1, new_path)
45 })
46 .collect()
47}
48
49/// Pushes unvisited, rel-type-filtered neighbors onto the DFS stack.
50///
51/// Iterates outgoing edges in reverse so that the first outgoing edge
52/// is processed first after `stack.pop()` (LIFO order preservation).
53#[inline]
54fn expand_dfs_neighbors(
55 store: &EdgeStore,
56 node_id: u64,
57 depth: u32,
58 path: &[u64],
59 rel_filter: &HashSet<&str>,
60 visited: &HashSet<u64>,
61 stack: &mut Vec<(u64, u32, Vec<u64>)>,
62) {
63 for edge in store.get_outgoing(node_id).into_iter().rev() {
64 if !rel_filter.is_empty() && !rel_filter.contains(edge.label()) {
65 continue;
66 }
67 if visited.contains(&edge.target()) {
68 continue;
69 }
70 let mut new_path = path.to_vec();
71 // Use edge IDs in path, consistent with bfs_traverse/bfs_stream.
72 new_path.push(edge.id());
73 stack.push((edge.target(), depth + 1, new_path));
74 }
75}
76
/// Frontier entry used by the shared traversal loop: `(node_id, depth, path)`.
///
/// `depth` is the number of hops from the source, and `path` holds the IDs of
/// the edges followed from the source to reach `node_id`.
type TraversalEntry = (u64, u32, Vec<u64>);
87
88/// Bundled parameters for `traverse_with_frontier` (avoids too-many-arguments).
89struct TraversalParams<'a> {
90 store: &'a EdgeStore,
91 filter: &'a [&'a str],
92 limit: usize,
93 max_depth: u32,
94 source: u64,
95}
96
97fn traverse_with_frontier<F>(
98 params: &TraversalParams<'_>,
99 pop_fn: fn(&mut F) -> Option<TraversalEntry>,
100 push_fn: fn(&mut F, TraversalEntry),
101 frontier: &mut F,
102) -> Vec<TraversalResult> {
103 let mut visited = HashSet::new();
104 let mut results = Vec::new();
105 visited.insert(params.source);
106
107 while let Some((node, depth, path)) = (pop_fn)(frontier) {
108 if results.len() >= params.limit {
109 break;
110 }
111 if depth >= params.max_depth {
112 continue;
113 }
114
115 let neighbors = collect_neighbor_expansions(
116 params.store.get_outgoing(node).into_iter(),
117 depth,
118 &path,
119 params.filter,
120 &mut visited,
121 );
122
123 for (target, next_depth, new_path) in neighbors {
124 results.push(TraversalResult {
125 target_id: target,
126 depth: next_depth,
127 path: new_path.clone(),
128 });
129 if results.len() >= params.limit {
130 break;
131 }
132 (push_fn)(frontier, (target, next_depth, new_path));
133 }
134 }
135
136 results
137}
138
139/// BFS pop: removes from the front of the `VecDeque`.
140fn bfs_pop(q: &mut std::collections::VecDeque<TraversalEntry>) -> Option<TraversalEntry> {
141 q.pop_front()
142}
143
144/// BFS push: appends to the back of the `VecDeque`.
145fn bfs_push(q: &mut std::collections::VecDeque<TraversalEntry>, item: TraversalEntry) {
146 q.push_back(item);
147}
148
149/// DFS pop: removes from the end of the `Vec`.
150fn dfs_pop(s: &mut Vec<TraversalEntry>) -> Option<TraversalEntry> {
151 s.pop()
152}
153
154/// DFS push: appends to the end of the `Vec`.
155fn dfs_push(s: &mut Vec<TraversalEntry>, item: TraversalEntry) {
156 s.push(item);
157}
158
159impl Collection {
160 /// Adds an edge to the collection's knowledge graph.
161 ///
162 /// # Arguments
163 ///
164 /// * `edge` - The edge to add (id, source, target, label, properties)
165 ///
166 /// # Errors
167 ///
168 /// Returns `Error::EdgeExists` if an edge with the same ID already exists.
169 ///
170 /// # Example
171 ///
172 /// ```rust,ignore
173 /// use velesdb_core::collection::graph::GraphEdge;
174 ///
175 /// let edge = GraphEdge::new(1, 100, 200, "KNOWS")?;
176 /// collection.add_edge(edge)?;
177 /// ```
178 pub fn add_edge(&self, edge: GraphEdge) -> Result<()> {
179 self.edge_store.write().add_edge(edge)?;
180 // Bump write generation so any cached plan for this collection is
181 // invalidated on the next query (CACHE-01).
182 self.write_generation
183 .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
184 Ok(())
185 }
186
187 /// Gets all edges from the collection's knowledge graph.
188 ///
189 /// Note: This iterates through all stored edges. For large graphs,
190 /// consider using `get_edges_by_label` or `get_outgoing_edges` for
191 /// more targeted queries.
192 ///
193 /// # Returns
194 ///
195 /// Vector of all edges in the graph (cloned).
196 #[must_use]
197 pub fn get_all_edges(&self) -> Vec<GraphEdge> {
198 let store = self.edge_store.read();
199 store.all_edges().into_iter().cloned().collect()
200 }
201
202 /// Gets edges filtered by label.
203 ///
204 /// # Arguments
205 ///
206 /// * `label` - The edge label (relationship type) to filter by
207 ///
208 /// # Returns
209 ///
210 /// Vector of edges with the specified label (cloned).
211 #[must_use]
212 pub fn get_edges_by_label(&self, label: &str) -> Vec<GraphEdge> {
213 self.edge_store
214 .read()
215 .get_edges_by_label(label)
216 .into_iter()
217 .cloned()
218 .collect()
219 }
220
221 /// Gets outgoing edges from a specific node.
222 ///
223 /// # Arguments
224 ///
225 /// * `node_id` - The source node ID
226 ///
227 /// # Returns
228 ///
229 /// Vector of edges originating from the specified node (cloned).
230 #[must_use]
231 pub fn get_outgoing_edges(&self, node_id: u64) -> Vec<GraphEdge> {
232 self.edge_store
233 .read()
234 .get_outgoing(node_id)
235 .into_iter()
236 .cloned()
237 .collect()
238 }
239
240 /// Gets incoming edges to a specific node.
241 ///
242 /// # Arguments
243 ///
244 /// * `node_id` - The target node ID
245 ///
246 /// # Returns
247 ///
248 /// Vector of edges pointing to the specified node (cloned).
249 #[must_use]
250 pub fn get_incoming_edges(&self, node_id: u64) -> Vec<GraphEdge> {
251 self.edge_store
252 .read()
253 .get_incoming(node_id)
254 .into_iter()
255 .cloned()
256 .collect()
257 }
258
259 /// Traverses the graph using BFS from a source node.
260 ///
261 /// # Arguments
262 ///
263 /// * `source` - Starting node ID
264 /// * `max_depth` - Maximum traversal depth
265 /// * `rel_types` - Optional filter by relationship types
266 /// * `limit` - Maximum number of results
267 ///
268 /// # Returns
269 ///
270 /// Vector of traversal results with target nodes and paths.
271 ///
272 /// # Errors
273 ///
274 /// Returns an error if traversal fails.
275 pub fn traverse_bfs(
276 &self,
277 source: u64,
278 max_depth: u32,
279 rel_types: Option<&[&str]>,
280 limit: usize,
281 ) -> Result<Vec<TraversalResult>> {
282 let store = self.edge_store.read();
283 let filter: &[&str] = rel_types.unwrap_or(&[]);
284 let params = TraversalParams {
285 store: &store,
286 filter,
287 limit,
288 max_depth,
289 source,
290 };
291 let mut frontier = std::collections::VecDeque::new();
292 frontier.push_back((source, 0u32, Vec::new()));
293
294 Ok(traverse_with_frontier(
295 ¶ms,
296 bfs_pop,
297 bfs_push,
298 &mut frontier,
299 ))
300 }
301
302 /// Traverses the graph using DFS from a source node.
303 ///
304 /// # Arguments
305 ///
306 /// * `source` - Starting node ID
307 /// * `max_depth` - Maximum traversal depth
308 /// * `rel_types` - Optional filter by relationship types
309 /// * `limit` - Maximum number of results
310 ///
311 /// # Returns
312 ///
313 /// Vector of traversal results with target nodes and paths.
314 ///
315 /// # Errors
316 ///
317 /// Returns an error if traversal fails.
318 pub fn traverse_dfs(
319 &self,
320 source: u64,
321 max_depth: u32,
322 rel_types: Option<&[&str]>,
323 limit: usize,
324 ) -> Result<Vec<TraversalResult>> {
325 let store = self.edge_store.read();
326 let filter: &[&str] = rel_types.unwrap_or(&[]);
327 let params = TraversalParams {
328 store: &store,
329 filter,
330 limit,
331 max_depth,
332 source,
333 };
334 let mut frontier = vec![(source, 0u32, Vec::new())];
335
336 Ok(traverse_with_frontier(
337 ¶ms,
338 dfs_pop,
339 dfs_push,
340 &mut frontier,
341 ))
342 }
343
344 /// Gets the in-degree and out-degree of a node.
345 ///
346 /// # Arguments
347 ///
348 /// * `node_id` - The node ID
349 ///
350 /// # Returns
351 ///
352 /// Tuple of (`in_degree`, `out_degree`).
353 #[must_use]
354 pub fn get_node_degree(&self, node_id: u64) -> (usize, usize) {
355 let store = self.edge_store.read();
356 let in_degree = store.get_incoming(node_id).len();
357 let out_degree = store.get_outgoing(node_id).len();
358 (in_degree, out_degree)
359 }
360
361 /// Removes an edge from the graph by ID.
362 ///
363 /// # Arguments
364 ///
365 /// * `edge_id` - The edge ID to remove
366 ///
367 /// # Returns
368 ///
369 /// `true` if the edge existed and was removed, `false` if it didn't exist.
370 #[must_use]
371 pub fn remove_edge(&self, edge_id: u64) -> bool {
372 let mut store = self.edge_store.write();
373 if store.contains_edge(edge_id) {
374 store.remove_edge(edge_id);
375 // Bump only when a mutation actually occurred (CACHE-01).
376 // Releasing the write lock before the atomic bump is intentional:
377 // the bump is a best-effort cache invalidation hint, not part of
378 // the edge-store transaction.
379 drop(store);
380 self.write_generation
381 .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
382 true
383 } else {
384 false
385 }
386 }
387
388 /// Returns the total number of edges in the graph.
389 #[must_use]
390 pub fn edge_count(&self) -> usize {
391 self.edge_store.read().len()
392 }
393
394 // -------------------------------------------------------------------------
395 // Graph schema
396 // -------------------------------------------------------------------------
397
398 /// Returns the graph schema stored in the collection config, if any.
399 #[must_use]
400 pub fn graph_schema(&self) -> Option<GraphSchema> {
401 self.config.read().graph_schema.clone()
402 }
403
404 /// Returns `true` if this collection was created as a graph collection.
405 #[must_use]
406 pub fn is_graph(&self) -> bool {
407 self.config.read().graph_schema.is_some()
408 }
409
410 /// Returns `true` if this graph collection stores node embeddings.
411 #[must_use]
412 pub fn has_embeddings(&self) -> bool {
413 self.config.read().embedding_dimension.is_some()
414 }
415
416 // -------------------------------------------------------------------------
417 // Node payload (graph node properties)
418 // -------------------------------------------------------------------------
419
420 /// Stores a JSON payload for a graph node.
421 ///
422 /// # Errors
423 ///
424 /// Returns an error if storage fails.
425 pub fn store_node_payload(&self, node_id: u64, payload: &serde_json::Value) -> Result<()> {
426 let mut storage = self.payload_storage.write();
427 storage.store(node_id, payload)?;
428 // Bump write generation so any cached plan for this collection is
429 // invalidated on the next query (CACHE-01).
430 self.write_generation
431 .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
432 Ok(())
433 }
434
435 /// Retrieves the JSON payload for a graph node.
436 ///
437 /// # Errors
438 ///
439 /// Returns an error if retrieval fails.
440 pub fn get_node_payload(&self, node_id: u64) -> Result<Option<serde_json::Value>> {
441 Ok(self.payload_storage.read().retrieve(node_id)?)
442 }
443
444 // -------------------------------------------------------------------------
445 // Graph traversal with TraversalConfig
446 // -------------------------------------------------------------------------
447
448 /// BFS traversal using the core `bfs_stream` iterator.
449 #[must_use]
450 pub fn traverse_bfs_config(
451 &self,
452 source_id: u64,
453 config: &TraversalConfig,
454 ) -> Vec<TraversalResult> {
455 use crate::collection::graph::{bfs_stream, StreamingConfig};
456 let store = self.edge_store.read();
457 let streaming = StreamingConfig {
458 max_depth: config.max_depth,
459 rel_types: config.rel_types.clone(),
460 limit: Some(config.limit),
461 max_visited_size: 100_000,
462 };
463 bfs_stream(&store, source_id, streaming)
464 .filter(|result| result.depth >= config.min_depth)
465 .take(config.limit)
466 .collect()
467 }
468
469 /// DFS traversal (iterative) using `TraversalConfig`.
470 #[must_use]
471 pub fn traverse_dfs_config(
472 &self,
473 source_id: u64,
474 config: &TraversalConfig,
475 ) -> Vec<TraversalResult> {
476 use std::collections::HashSet;
477 let store = self.edge_store.read();
478 let rel_filter: HashSet<&str> = config.rel_types.iter().map(String::as_str).collect();
479
480 let mut results = Vec::new();
481 let mut visited: HashSet<u64> = HashSet::new();
482 let mut stack: Vec<(u64, u32, Vec<u64>)> = vec![(source_id, 0, Vec::new())];
483
484 while let Some((node_id, depth, path)) = stack.pop() {
485 if results.len() >= config.limit {
486 break;
487 }
488 if !visited.insert(node_id) {
489 continue;
490 }
491 if depth >= config.min_depth && depth > 0 {
492 results.push(TraversalResult::new(node_id, path.clone(), depth));
493 if results.len() >= config.limit {
494 break;
495 }
496 }
497 if depth < config.max_depth {
498 expand_dfs_neighbors(
499 &store,
500 node_id,
501 depth,
502 &path,
503 &rel_filter,
504 &visited,
505 &mut stack,
506 );
507 }
508 }
509 results
510 }
511
512 // -------------------------------------------------------------------------
513 // Embedding search on graph nodes
514 // -------------------------------------------------------------------------
515
516 /// Searches for similar graph nodes by embedding vector.
517 ///
518 /// Only available if `has_embeddings()` returns `true`.
519 ///
520 /// # Errors
521 ///
522 /// Returns `Error::VectorNotAllowed` if no embeddings are configured,
523 /// or `Error::DimensionMismatch` if the query dimension is wrong.
524 pub fn search_by_embedding(&self, query: &[f32], k: usize) -> Result<Vec<SearchResult>> {
525 let config = self.config.read();
526 let emb_dim = config
527 .embedding_dimension
528 .ok_or_else(|| Error::VectorNotAllowed(config.name.clone()))?;
529 drop(config);
530
531 if query.len() != emb_dim {
532 return Err(Error::DimensionMismatch {
533 expected: emb_dim,
534 actual: query.len(),
535 });
536 }
537
538 // Reason: we reuse the existing HNSW index (dimension == emb_dim when created
539 // via create_graph_collection_with_embeddings). For graph-without-embeddings
540 // the HNSW has dimension 0 and the guard above already rejected the call.
541 let metric = self.config.read().metric;
542 let ids = self.index.search(query, k);
543 let ids = self.merge_delta(ids, query, k, metric);
544
545 // Acquire each lock once: collect vector data, then collect payload data.
546 // This avoids holding vector_storage while locking payload_storage per item.
547 let vectors: Vec<(u64, f32, Option<Vec<f32>>)> = {
548 let vector_storage = self.vector_storage.read();
549 ids.into_iter()
550 .map(|sr| {
551 let vec = vector_storage.retrieve(sr.id).ok().flatten();
552 (sr.id, sr.score, vec)
553 })
554 .collect()
555 };
556 let results = {
557 let payload_storage = self.payload_storage.read();
558 vectors
559 .into_iter()
560 .filter_map(|(id, score, vector)| {
561 let vector = vector?;
562 let payload = payload_storage.retrieve(id).ok().flatten();
563 Some(SearchResult::new(
564 Point {
565 id,
566 vector,
567 payload,
568 sparse_vectors: None,
569 },
570 score,
571 ))
572 })
573 .collect()
574 };
575 Ok(results)
576 }
577}