Skip to main content

velesdb_core/collection/graph/
traversal.rs

1//! Graph traversal algorithms for multi-hop queries.
2//!
3//! This module provides BFS-based traversal for variable-length path patterns
4//! like `(a)-[*1..3]->(b)` in MATCH clauses.
5//!
6//! # Streaming Mode (EPIC-019 US-005)
7//!
8//! For large graphs, the module provides streaming iterators that yield results
9//! lazily without loading all visited nodes into memory at once.
10
11#![allow(dead_code)] // WIP: Will be used by MATCH clause execution
12
13use super::EdgeStore;
14use std::collections::{HashSet, VecDeque};
15
16/// Default maximum depth for unbounded traversals.
17pub const DEFAULT_MAX_DEPTH: u32 = 3;
18
19/// Safety cap for maximum depth to prevent runaway traversals.
20/// Only applied when user requests unbounded traversal (*).
21///
22/// Note: Neo4j and ArangoDB do NOT impose hard limits.
23/// 100 is chosen to cover most real-world use cases:
24/// - Social networks (6 degrees of separation)
25/// - Dependency graphs (deep npm/cargo trees)
26/// - Organizational hierarchies
27/// - Knowledge graphs
28pub const SAFETY_MAX_DEPTH: u32 = 100;
29
30/// Result of a graph traversal operation.
31#[derive(Debug, Clone)]
32pub struct TraversalResult {
33    /// The target node ID reached.
34    pub target_id: u64,
35    /// The path taken (list of edge IDs).
36    pub path: Vec<u64>,
37    /// Depth of the traversal (number of hops).
38    pub depth: u32,
39}
40
41impl TraversalResult {
42    /// Creates a new traversal result.
43    #[must_use]
44    pub fn new(target_id: u64, path: Vec<u64>, depth: u32) -> Self {
45        Self {
46            target_id,
47            path,
48            depth,
49        }
50    }
51}
52
53/// Configuration for graph traversal.
54#[derive(Debug, Clone)]
55pub struct TraversalConfig {
56    /// Minimum number of hops (inclusive).
57    pub min_depth: u32,
58    /// Maximum number of hops (inclusive).
59    pub max_depth: u32,
60    /// Maximum number of results to return.
61    pub limit: usize,
62    /// Filter by relationship types (empty = all types).
63    pub rel_types: Vec<String>,
64}
65
66impl Default for TraversalConfig {
67    fn default() -> Self {
68        Self {
69            min_depth: 1,
70            max_depth: DEFAULT_MAX_DEPTH,
71            limit: 100,
72            rel_types: Vec::new(),
73        }
74    }
75}
76
77impl TraversalConfig {
78    /// Creates a config for a specific range (e.g., *1..3).
79    ///
80    /// Respects the caller's max_depth without artificial capping.
81    /// For unbounded traversals, use `with_unbounded_range()` instead.
82    #[must_use]
83    pub fn with_range(min: u32, max: u32) -> Self {
84        Self {
85            min_depth: min,
86            max_depth: max,
87            ..Self::default()
88        }
89    }
90
91    /// Creates a config for unbounded traversal (e.g., *1..).
92    ///
93    /// Applies SAFETY_MAX_DEPTH cap to prevent runaway traversals.
94    #[must_use]
95    pub fn with_unbounded_range(min: u32) -> Self {
96        Self {
97            min_depth: min,
98            max_depth: SAFETY_MAX_DEPTH,
99            ..Self::default()
100        }
101    }
102
103    /// Sets the result limit.
104    #[must_use]
105    pub fn with_limit(mut self, limit: usize) -> Self {
106        self.limit = limit;
107        self
108    }
109
110    /// Filters by relationship types.
111    #[must_use]
112    pub fn with_rel_types(mut self, types: Vec<String>) -> Self {
113        self.rel_types = types;
114        self
115    }
116
117    /// Sets a custom max depth (for advanced use cases).
118    #[must_use]
119    pub fn with_max_depth(mut self, max_depth: u32) -> Self {
120        self.max_depth = max_depth;
121        self
122    }
123}
124
125/// BFS state for traversal.
126#[derive(Debug)]
127pub(super) struct BfsState {
128    /// Current node ID.
129    pub(super) node_id: u64,
130    /// Path taken to reach this node (edge IDs).
131    pub(super) path: Vec<u64>,
132    /// Current depth.
133    pub(super) depth: u32,
134}
135
136/// Performs BFS traversal from a source node.
137///
138/// Finds all paths from `source_id` within the configured depth range.
139/// Uses iterative BFS with `VecDeque` for better cache locality.
140///
141/// # Arguments
142///
143/// * `edge_store` - The edge storage to traverse.
144/// * `source_id` - Starting node ID.
145/// * `config` - Traversal configuration.
146///
147/// # Returns
148///
149/// Vector of traversal results, limited by `config.limit`.
150#[must_use]
151pub fn bfs_traverse(
152    edge_store: &EdgeStore,
153    source_id: u64,
154    config: &TraversalConfig,
155) -> Vec<TraversalResult> {
156    let mut results = Vec::new();
157    let mut visited = HashSet::new();
158    let mut queue = VecDeque::new();
159
160    // CRITICAL FIX: Mark source node as visited before traversal
161    // to prevent cycles back to source causing duplicate work
162    visited.insert(source_id);
163
164    // Initialize with source node
165    queue.push_back(BfsState {
166        node_id: source_id,
167        path: Vec::new(),
168        depth: 0,
169    });
170
171    while let Some(state) = queue.pop_front() {
172        // Check if we've collected enough results
173        if results.len() >= config.limit {
174            break;
175        }
176
177        // Get outgoing edges from current node
178        let edges = edge_store.get_outgoing(state.node_id);
179
180        for edge in edges {
181            // Filter by relationship type if specified
182            if !config.rel_types.is_empty() && !config.rel_types.contains(&edge.label().to_string())
183            {
184                continue;
185            }
186
187            let target = edge.target();
188            let new_depth = state.depth + 1;
189
190            // Skip if exceeds max depth
191            if new_depth > config.max_depth {
192                continue;
193            }
194
195            // Build new path
196            let mut new_path = state.path.clone();
197            new_path.push(edge.id());
198
199            // Add to results if within valid depth range
200            if new_depth >= config.min_depth {
201                results.push(TraversalResult::new(target, new_path.clone(), new_depth));
202
203                if results.len() >= config.limit {
204                    break;
205                }
206            }
207
208            // Continue traversal if not at max depth and not visited
209            if new_depth < config.max_depth && !visited.contains(&target) {
210                visited.insert(target);
211                queue.push_back(BfsState {
212                    node_id: target,
213                    path: new_path,
214                    depth: new_depth,
215                });
216            }
217        }
218    }
219
220    results
221}
222
223/// Performs BFS traversal in the reverse direction (following incoming edges).
224#[must_use]
225pub fn bfs_traverse_reverse(
226    edge_store: &EdgeStore,
227    source_id: u64,
228    config: &TraversalConfig,
229) -> Vec<TraversalResult> {
230    let mut results = Vec::new();
231    let mut visited = HashSet::new();
232    let mut queue = VecDeque::new();
233
234    // CRITICAL FIX: Mark source node as visited before traversal
235    visited.insert(source_id);
236
237    queue.push_back(BfsState {
238        node_id: source_id,
239        path: Vec::new(),
240        depth: 0,
241    });
242
243    while let Some(state) = queue.pop_front() {
244        if results.len() >= config.limit {
245            break;
246        }
247
248        let edges = edge_store.get_incoming(state.node_id);
249
250        for edge in edges {
251            if !config.rel_types.is_empty() {
252                let label = edge.label().to_string();
253                if !config.rel_types.contains(&label) {
254                    continue;
255                }
256            }
257
258            let source = edge.source();
259            let new_depth = state.depth + 1;
260
261            if new_depth > config.max_depth {
262                continue;
263            }
264
265            let mut new_path = state.path.clone();
266            new_path.push(edge.id());
267
268            if new_depth >= config.min_depth {
269                results.push(TraversalResult::new(source, new_path.clone(), new_depth));
270
271                if results.len() >= config.limit {
272                    break;
273                }
274            }
275
276            if new_depth < config.max_depth && !visited.contains(&source) {
277                visited.insert(source);
278                queue.push_back(BfsState {
279                    node_id: source,
280                    path: new_path,
281                    depth: new_depth,
282                });
283            }
284        }
285    }
286
287    results
288}
289
290/// Performs bidirectional BFS (follows both directions).
291#[must_use]
292pub fn bfs_traverse_both(
293    edge_store: &EdgeStore,
294    source_id: u64,
295    config: &TraversalConfig,
296) -> Vec<TraversalResult> {
297    let mut results = Vec::new();
298    let half_limit = config.limit / 2 + 1;
299
300    let config_half = TraversalConfig {
301        limit: half_limit,
302        ..config.clone()
303    };
304
305    // Forward traversal
306    let forward = bfs_traverse(edge_store, source_id, &config_half);
307    results.extend(forward);
308
309    // Reverse traversal
310    if results.len() < config.limit {
311        let reverse = bfs_traverse_reverse(edge_store, source_id, &config_half);
312        for r in reverse {
313            if results.len() >= config.limit {
314                break;
315            }
316            // Avoid duplicates
317            if !results
318                .iter()
319                .any(|existing| existing.target_id == r.target_id && existing.path == r.path)
320            {
321                results.push(r);
322            }
323        }
324    }
325
326    results.truncate(config.limit);
327    results
328}
329
330// Tests moved to traversal_tests.rs per project rules