Skip to main content

velesdb_core/collection/core/
graph_api.rs

1//! Graph API methods for Collection (EPIC-015 US-001).
2//!
3//! Exposes Knowledge Graph operations on Collection for use by
4//! Tauri plugin, REST API, and other consumers.
5
6use std::collections::HashSet;
7
8use crate::collection::graph::{
9    EdgeStore, GraphEdge, GraphSchema, TraversalConfig, TraversalResult,
10};
11use crate::collection::types::Collection;
12use crate::error::{Error, Result};
13use crate::index::VectorIndex;
14use crate::point::{Point, SearchResult};
15use crate::storage::{PayloadStorage, VectorStorage};
16
17/// Returns `true` if the edge's label is accepted by the relationship filter.
18///
19/// An empty `rel_types` slice means "accept all".
20#[inline]
21fn edge_passes_rel_filter(edge: &GraphEdge, rel_types: &[&str]) -> bool {
22    rel_types.is_empty() || rel_types.contains(&edge.label())
23}
24
25/// Collects unvisited, rel-type-filtered neighbor expansions for a node.
26///
27/// Each returned tuple is `(target_id, next_depth, path_to_target)`.
28/// Visited targets are inserted into `visited` before returning, so
29/// duplicate expansion is impossible even when the caller enqueues lazily.
30#[inline]
31fn collect_neighbor_expansions<'a>(
32    edges: impl Iterator<Item = &'a GraphEdge>,
33    depth: u32,
34    path: &[u64],
35    rel_types: &[&str],
36    visited: &mut HashSet<u64>,
37) -> Vec<(u64, u32, Vec<u64>)> {
38    edges
39        .filter(|e| edge_passes_rel_filter(e, rel_types))
40        .filter(|e| visited.insert(e.target()))
41        .map(|e| {
42            let mut new_path = path.to_vec();
43            new_path.push(e.id());
44            (e.target(), depth + 1, new_path)
45        })
46        .collect()
47}
48
49/// Pushes unvisited, rel-type-filtered neighbors onto the DFS stack.
50///
51/// Iterates outgoing edges in reverse so that the first outgoing edge
52/// is processed first after `stack.pop()` (LIFO order preservation).
53#[inline]
54fn expand_dfs_neighbors(
55    store: &EdgeStore,
56    node_id: u64,
57    depth: u32,
58    path: &[u64],
59    rel_filter: &HashSet<&str>,
60    visited: &HashSet<u64>,
61    stack: &mut Vec<(u64, u32, Vec<u64>)>,
62) {
63    for edge in store.get_outgoing(node_id).into_iter().rev() {
64        if !rel_filter.is_empty() && !rel_filter.contains(edge.label()) {
65            continue;
66        }
67        if visited.contains(&edge.target()) {
68            continue;
69        }
70        let mut new_path = path.to_vec();
71        // Use edge IDs in path, consistent with bfs_traverse/bfs_stream.
72        new_path.push(edge.id());
73        stack.push((edge.target(), depth + 1, new_path));
74    }
75}
76
77/// Shared traversal loop for both BFS and DFS.
78///
79/// The caller provides a mutable frontier (pre-seeded with the source node)
80/// and two function pointers:
81/// - `pop_fn`: extracts the next element (FIFO for BFS, LIFO for DFS)
82/// - `push_fn`: enqueues a new element
83///
84/// This eliminates the duplicated loop bodies in `traverse_bfs` and
85/// `traverse_dfs`.
86type TraversalEntry = (u64, u32, Vec<u64>);
87
88/// Bundled parameters for `traverse_with_frontier` (avoids too-many-arguments).
89struct TraversalParams<'a> {
90    store: &'a EdgeStore,
91    filter: &'a [&'a str],
92    limit: usize,
93    max_depth: u32,
94    source: u64,
95}
96
97fn traverse_with_frontier<F>(
98    params: &TraversalParams<'_>,
99    pop_fn: fn(&mut F) -> Option<TraversalEntry>,
100    push_fn: fn(&mut F, TraversalEntry),
101    frontier: &mut F,
102) -> Vec<TraversalResult> {
103    let mut visited = HashSet::new();
104    let mut results = Vec::new();
105    visited.insert(params.source);
106
107    while let Some((node, depth, path)) = (pop_fn)(frontier) {
108        if results.len() >= params.limit {
109            break;
110        }
111        if depth >= params.max_depth {
112            continue;
113        }
114
115        let neighbors = collect_neighbor_expansions(
116            params.store.get_outgoing(node).into_iter(),
117            depth,
118            &path,
119            params.filter,
120            &mut visited,
121        );
122
123        for (target, next_depth, new_path) in neighbors {
124            results.push(TraversalResult {
125                target_id: target,
126                depth: next_depth,
127                path: new_path.clone(),
128            });
129            if results.len() >= params.limit {
130                break;
131            }
132            (push_fn)(frontier, (target, next_depth, new_path));
133        }
134    }
135
136    results
137}
138
139/// BFS pop: removes from the front of the `VecDeque`.
140fn bfs_pop(q: &mut std::collections::VecDeque<TraversalEntry>) -> Option<TraversalEntry> {
141    q.pop_front()
142}
143
144/// BFS push: appends to the back of the `VecDeque`.
145fn bfs_push(q: &mut std::collections::VecDeque<TraversalEntry>, item: TraversalEntry) {
146    q.push_back(item);
147}
148
149/// DFS pop: removes from the end of the `Vec`.
150fn dfs_pop(s: &mut Vec<TraversalEntry>) -> Option<TraversalEntry> {
151    s.pop()
152}
153
154/// DFS push: appends to the end of the `Vec`.
155fn dfs_push(s: &mut Vec<TraversalEntry>, item: TraversalEntry) {
156    s.push(item);
157}
158
159impl Collection {
160    /// Adds an edge to the collection's knowledge graph.
161    ///
162    /// # Arguments
163    ///
164    /// * `edge` - The edge to add (id, source, target, label, properties)
165    ///
166    /// # Errors
167    ///
168    /// Returns `Error::EdgeExists` if an edge with the same ID already exists.
169    ///
170    /// # Example
171    ///
172    /// ```rust,ignore
173    /// use velesdb_core::collection::graph::GraphEdge;
174    ///
175    /// let edge = GraphEdge::new(1, 100, 200, "KNOWS")?;
176    /// collection.add_edge(edge)?;
177    /// ```
178    pub fn add_edge(&self, edge: GraphEdge) -> Result<()> {
179        self.edge_store.write().add_edge(edge)?;
180        // Bump write generation so any cached plan for this collection is
181        // invalidated on the next query (CACHE-01).
182        self.write_generation
183            .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
184        Ok(())
185    }
186
187    /// Gets all edges from the collection's knowledge graph.
188    ///
189    /// Note: This iterates through all stored edges. For large graphs,
190    /// consider using `get_edges_by_label` or `get_outgoing_edges` for
191    /// more targeted queries.
192    ///
193    /// # Returns
194    ///
195    /// Vector of all edges in the graph (cloned).
196    #[must_use]
197    pub fn get_all_edges(&self) -> Vec<GraphEdge> {
198        let store = self.edge_store.read();
199        store.all_edges().into_iter().cloned().collect()
200    }
201
202    /// Gets edges filtered by label.
203    ///
204    /// # Arguments
205    ///
206    /// * `label` - The edge label (relationship type) to filter by
207    ///
208    /// # Returns
209    ///
210    /// Vector of edges with the specified label (cloned).
211    #[must_use]
212    pub fn get_edges_by_label(&self, label: &str) -> Vec<GraphEdge> {
213        self.edge_store
214            .read()
215            .get_edges_by_label(label)
216            .into_iter()
217            .cloned()
218            .collect()
219    }
220
221    /// Gets outgoing edges from a specific node.
222    ///
223    /// # Arguments
224    ///
225    /// * `node_id` - The source node ID
226    ///
227    /// # Returns
228    ///
229    /// Vector of edges originating from the specified node (cloned).
230    #[must_use]
231    pub fn get_outgoing_edges(&self, node_id: u64) -> Vec<GraphEdge> {
232        self.edge_store
233            .read()
234            .get_outgoing(node_id)
235            .into_iter()
236            .cloned()
237            .collect()
238    }
239
240    /// Gets incoming edges to a specific node.
241    ///
242    /// # Arguments
243    ///
244    /// * `node_id` - The target node ID
245    ///
246    /// # Returns
247    ///
248    /// Vector of edges pointing to the specified node (cloned).
249    #[must_use]
250    pub fn get_incoming_edges(&self, node_id: u64) -> Vec<GraphEdge> {
251        self.edge_store
252            .read()
253            .get_incoming(node_id)
254            .into_iter()
255            .cloned()
256            .collect()
257    }
258
259    /// Traverses the graph using BFS from a source node.
260    ///
261    /// # Arguments
262    ///
263    /// * `source` - Starting node ID
264    /// * `max_depth` - Maximum traversal depth
265    /// * `rel_types` - Optional filter by relationship types
266    /// * `limit` - Maximum number of results
267    ///
268    /// # Returns
269    ///
270    /// Vector of traversal results with target nodes and paths.
271    ///
272    /// # Errors
273    ///
274    /// Returns an error if traversal fails.
275    pub fn traverse_bfs(
276        &self,
277        source: u64,
278        max_depth: u32,
279        rel_types: Option<&[&str]>,
280        limit: usize,
281    ) -> Result<Vec<TraversalResult>> {
282        let store = self.edge_store.read();
283        let filter: &[&str] = rel_types.unwrap_or(&[]);
284        let params = TraversalParams {
285            store: &store,
286            filter,
287            limit,
288            max_depth,
289            source,
290        };
291        let mut frontier = std::collections::VecDeque::new();
292        frontier.push_back((source, 0u32, Vec::new()));
293
294        Ok(traverse_with_frontier(
295            &params,
296            bfs_pop,
297            bfs_push,
298            &mut frontier,
299        ))
300    }
301
302    /// Traverses the graph using DFS from a source node.
303    ///
304    /// # Arguments
305    ///
306    /// * `source` - Starting node ID
307    /// * `max_depth` - Maximum traversal depth
308    /// * `rel_types` - Optional filter by relationship types
309    /// * `limit` - Maximum number of results
310    ///
311    /// # Returns
312    ///
313    /// Vector of traversal results with target nodes and paths.
314    ///
315    /// # Errors
316    ///
317    /// Returns an error if traversal fails.
318    pub fn traverse_dfs(
319        &self,
320        source: u64,
321        max_depth: u32,
322        rel_types: Option<&[&str]>,
323        limit: usize,
324    ) -> Result<Vec<TraversalResult>> {
325        let store = self.edge_store.read();
326        let filter: &[&str] = rel_types.unwrap_or(&[]);
327        let params = TraversalParams {
328            store: &store,
329            filter,
330            limit,
331            max_depth,
332            source,
333        };
334        let mut frontier = vec![(source, 0u32, Vec::new())];
335
336        Ok(traverse_with_frontier(
337            &params,
338            dfs_pop,
339            dfs_push,
340            &mut frontier,
341        ))
342    }
343
344    /// Gets the in-degree and out-degree of a node.
345    ///
346    /// # Arguments
347    ///
348    /// * `node_id` - The node ID
349    ///
350    /// # Returns
351    ///
352    /// Tuple of (`in_degree`, `out_degree`).
353    #[must_use]
354    pub fn get_node_degree(&self, node_id: u64) -> (usize, usize) {
355        let store = self.edge_store.read();
356        let in_degree = store.get_incoming(node_id).len();
357        let out_degree = store.get_outgoing(node_id).len();
358        (in_degree, out_degree)
359    }
360
361    /// Removes an edge from the graph by ID.
362    ///
363    /// # Arguments
364    ///
365    /// * `edge_id` - The edge ID to remove
366    ///
367    /// # Returns
368    ///
369    /// `true` if the edge existed and was removed, `false` if it didn't exist.
370    #[must_use]
371    pub fn remove_edge(&self, edge_id: u64) -> bool {
372        let mut store = self.edge_store.write();
373        if store.contains_edge(edge_id) {
374            store.remove_edge(edge_id);
375            // Bump only when a mutation actually occurred (CACHE-01).
376            // Releasing the write lock before the atomic bump is intentional:
377            // the bump is a best-effort cache invalidation hint, not part of
378            // the edge-store transaction.
379            drop(store);
380            self.write_generation
381                .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
382            true
383        } else {
384            false
385        }
386    }
387
388    /// Returns the total number of edges in the graph.
389    #[must_use]
390    pub fn edge_count(&self) -> usize {
391        self.edge_store.read().len()
392    }
393
394    // -------------------------------------------------------------------------
395    // Graph schema
396    // -------------------------------------------------------------------------
397
398    /// Returns the graph schema stored in the collection config, if any.
399    #[must_use]
400    pub fn graph_schema(&self) -> Option<GraphSchema> {
401        self.config.read().graph_schema.clone()
402    }
403
404    /// Returns `true` if this collection was created as a graph collection.
405    #[must_use]
406    pub fn is_graph(&self) -> bool {
407        self.config.read().graph_schema.is_some()
408    }
409
410    /// Returns `true` if this graph collection stores node embeddings.
411    #[must_use]
412    pub fn has_embeddings(&self) -> bool {
413        self.config.read().embedding_dimension.is_some()
414    }
415
416    // -------------------------------------------------------------------------
417    // Node payload (graph node properties)
418    // -------------------------------------------------------------------------
419
420    /// Stores a JSON payload for a graph node.
421    ///
422    /// # Errors
423    ///
424    /// Returns an error if storage fails.
425    pub fn store_node_payload(&self, node_id: u64, payload: &serde_json::Value) -> Result<()> {
426        let mut storage = self.payload_storage.write();
427        storage.store(node_id, payload)?;
428        // Bump write generation so any cached plan for this collection is
429        // invalidated on the next query (CACHE-01).
430        self.write_generation
431            .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
432        Ok(())
433    }
434
435    /// Retrieves the JSON payload for a graph node.
436    ///
437    /// # Errors
438    ///
439    /// Returns an error if retrieval fails.
440    pub fn get_node_payload(&self, node_id: u64) -> Result<Option<serde_json::Value>> {
441        Ok(self.payload_storage.read().retrieve(node_id)?)
442    }
443
444    // -------------------------------------------------------------------------
445    // Graph traversal with TraversalConfig
446    // -------------------------------------------------------------------------
447
448    /// BFS traversal using the core `bfs_stream` iterator.
449    #[must_use]
450    pub fn traverse_bfs_config(
451        &self,
452        source_id: u64,
453        config: &TraversalConfig,
454    ) -> Vec<TraversalResult> {
455        use crate::collection::graph::{bfs_stream, StreamingConfig};
456        let store = self.edge_store.read();
457        let streaming = StreamingConfig {
458            max_depth: config.max_depth,
459            rel_types: config.rel_types.clone(),
460            limit: Some(config.limit),
461            max_visited_size: 100_000,
462        };
463        bfs_stream(&store, source_id, streaming)
464            .filter(|result| result.depth >= config.min_depth)
465            .take(config.limit)
466            .collect()
467    }
468
469    /// DFS traversal (iterative) using `TraversalConfig`.
470    #[must_use]
471    pub fn traverse_dfs_config(
472        &self,
473        source_id: u64,
474        config: &TraversalConfig,
475    ) -> Vec<TraversalResult> {
476        use std::collections::HashSet;
477        let store = self.edge_store.read();
478        let rel_filter: HashSet<&str> = config.rel_types.iter().map(String::as_str).collect();
479
480        let mut results = Vec::new();
481        let mut visited: HashSet<u64> = HashSet::new();
482        let mut stack: Vec<(u64, u32, Vec<u64>)> = vec![(source_id, 0, Vec::new())];
483
484        while let Some((node_id, depth, path)) = stack.pop() {
485            if results.len() >= config.limit {
486                break;
487            }
488            if !visited.insert(node_id) {
489                continue;
490            }
491            if depth >= config.min_depth && depth > 0 {
492                results.push(TraversalResult::new(node_id, path.clone(), depth));
493                if results.len() >= config.limit {
494                    break;
495                }
496            }
497            if depth < config.max_depth {
498                expand_dfs_neighbors(
499                    &store,
500                    node_id,
501                    depth,
502                    &path,
503                    &rel_filter,
504                    &visited,
505                    &mut stack,
506                );
507            }
508        }
509        results
510    }
511
512    // -------------------------------------------------------------------------
513    // Embedding search on graph nodes
514    // -------------------------------------------------------------------------
515
516    /// Searches for similar graph nodes by embedding vector.
517    ///
518    /// Only available if `has_embeddings()` returns `true`.
519    ///
520    /// # Errors
521    ///
522    /// Returns `Error::VectorNotAllowed` if no embeddings are configured,
523    /// or `Error::DimensionMismatch` if the query dimension is wrong.
524    pub fn search_by_embedding(&self, query: &[f32], k: usize) -> Result<Vec<SearchResult>> {
525        let config = self.config.read();
526        let emb_dim = config
527            .embedding_dimension
528            .ok_or_else(|| Error::VectorNotAllowed(config.name.clone()))?;
529        drop(config);
530
531        if query.len() != emb_dim {
532            return Err(Error::DimensionMismatch {
533                expected: emb_dim,
534                actual: query.len(),
535            });
536        }
537
538        // Reason: we reuse the existing HNSW index (dimension == emb_dim when created
539        // via create_graph_collection_with_embeddings). For graph-without-embeddings
540        // the HNSW has dimension 0 and the guard above already rejected the call.
541        let metric = self.config.read().metric;
542        let ids = self.index.search(query, k);
543        let ids = self.merge_delta(ids, query, k, metric);
544
545        // Acquire each lock once: collect vector data, then collect payload data.
546        // This avoids holding vector_storage while locking payload_storage per item.
547        let vectors: Vec<(u64, f32, Option<Vec<f32>>)> = {
548            let vector_storage = self.vector_storage.read();
549            ids.into_iter()
550                .map(|sr| {
551                    let vec = vector_storage.retrieve(sr.id).ok().flatten();
552                    (sr.id, sr.score, vec)
553                })
554                .collect()
555        };
556        let results = {
557            let payload_storage = self.payload_storage.read();
558            vectors
559                .into_iter()
560                .filter_map(|(id, score, vector)| {
561                    let vector = vector?;
562                    let payload = payload_storage.retrieve(id).ok().flatten();
563                    Some(SearchResult::new(
564                        Point {
565                            id,
566                            vector,
567                            payload,
568                            sparse_vectors: None,
569                        },
570                        score,
571                    ))
572                })
573                .collect()
574        };
575        Ok(results)
576    }
577}