velesdb-core 1.15.0

High-performance vector database engine written in Rust
Documentation
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
//! Graph traversal algorithms for multi-hop queries.
//!
//! This module provides BFS-based traversal for variable-length path patterns
//! like `(a)-[*1..3]->(b)` in MATCH clauses.
//!
//! # Streaming Mode (EPIC-019 US-005)
//!
//! For large graphs, the module provides streaming iterators that yield results
//! lazily without loading all visited nodes into memory at once.

use super::EdgeStore;
use rustc_hash::{FxHashMap, FxHashSet};
use smallvec::SmallVec;
use std::collections::VecDeque;

/// Maximum traversal depth used by `TraversalConfig::default()`.
///
/// Explicit ranges built with `TraversalConfig::with_range` replace it, and
/// unbounded patterns built with `TraversalConfig::with_unbounded_range` use
/// [`SAFETY_MAX_DEPTH`] instead.
pub const DEFAULT_MAX_DEPTH: u32 = 3;

/// Safety cap for maximum depth to prevent runaway traversals.
/// Only applied when the user requests an unbounded traversal (`*`, `*min..`),
/// i.e. through `TraversalConfig::with_unbounded_range`.
///
/// Note: Neo4j and ArangoDB do NOT impose hard limits.
/// 100 is chosen to cover most real-world use cases:
/// - Social networks (6 degrees of separation)
/// - Dependency graphs (deep npm/cargo trees)
/// - Organizational hierarchies
/// - Knowledge graphs
pub const SAFETY_MAX_DEPTH: u32 = 100;

/// Inline-capacity edge-ID path for traversal code that carries a path per state.
///
/// Most graph queries are depth 1-3 (social networks, knowledge graphs).
/// `SmallVec<[u64; 4]>` stores up to 4 edge IDs inline (32 bytes of inline
/// storage) and only heap-allocates for deeper paths, avoiding a per-path
/// allocation in the common case.
///
/// The BFS in this module tracks parent pointers instead and does not use this
/// type. NOTE(review): presumably still used by other traversal code (DFS /
/// streaming); confirm. [`TraversalResult::path`] uses `Vec<u64>` at the
/// public API boundary. This type is exposed for advanced internal use only.
pub type TraversalPath = SmallVec<[u64; 4]>;

/// Result of a graph traversal operation: one reached node plus how it was reached.
///
/// `path` is a plain `Vec<u64>` at the public API boundary. The BFS in this
/// module does not carry paths in its queue. It rebuilds `path` from parent
/// pointers only once a node qualifies as a result. Code that holds an
/// inline-capacity `TraversalPath` converts it with `from_smallvec`.
#[derive(Debug, Clone)]
pub struct TraversalResult {
    /// The target node ID reached.
    pub target_id: u64,
    /// Edge IDs traversed from the traversal source to `target_id`, in hop order.
    pub path: Vec<u64>,
    /// Depth of the traversal (number of hops).
    pub depth: u32,
}

impl TraversalResult {
    /// Creates a new traversal result.
    ///
    /// # Arguments
    ///
    /// * `target_id` - ID of the node that was reached.
    /// * `path` - Edge IDs walked from the traversal source to `target_id`, in order.
    /// * `depth` - Number of hops taken.
    #[must_use]
    pub fn new(target_id: u64, path: Vec<u64>, depth: u32) -> Self {
        Self {
            target_id,
            path,
            depth,
        }
    }

    /// Creates a traversal result from an internal `SmallVec` path.
    ///
    /// Converts the inline-capacity path to a `Vec` at the public API boundary.
    /// `into_vec` consumes the `SmallVec`. If the path has already spilled to
    /// the heap (more than 4 edges), its buffer is reused without copying.
    /// Only short, inline paths are copied into a fresh allocation.
    #[must_use]
    // Reason: Constructor used by MATCH clause traversal (Wave 6 wiring).
    #[allow(dead_code)]
    pub(crate) fn from_smallvec(target_id: u64, path: TraversalPath, depth: u32) -> Self {
        Self::new(target_id, path.into_vec(), depth)
    }
}

/// Configuration for graph traversal.
///
/// Depth bounds count hops (edges) away from the source node and are both
/// inclusive. Start from `TraversalConfig::default()`, `with_range` or
/// `with_unbounded_range`, then refine with the `with_*` builder methods.
#[derive(Debug, Clone)]
pub struct TraversalConfig {
    /// Minimum number of hops (inclusive).
    pub min_depth: u32,
    /// Maximum number of hops (inclusive).
    pub max_depth: u32,
    /// Maximum number of results to return.
    pub limit: usize,
    /// Filter by relationship types (empty = all types).
    pub rel_types: Vec<String>,
}

impl Default for TraversalConfig {
    /// One to `DEFAULT_MAX_DEPTH` hops, at most 100 results, no relationship filter.
    fn default() -> Self {
        Self {
            rel_types: Vec::new(),
            limit: 100,
            max_depth: DEFAULT_MAX_DEPTH,
            min_depth: 1,
        }
    }
}

impl TraversalConfig {
    /// Builds a config for an explicit hop range such as `*1..3`.
    ///
    /// `max` is taken verbatim; no safety cap is applied. Open-ended patterns
    /// should use `with_unbounded_range()`, which does apply one.
    #[must_use]
    pub fn with_range(min: u32, max: u32) -> Self {
        Self {
            min_depth: min,
            max_depth: max,
            ..Default::default()
        }
    }

    /// Builds a config for an open-ended pattern such as `*1..`.
    ///
    /// The upper bound is `SAFETY_MAX_DEPTH`. This keeps a traversal over a
    /// large or densely connected graph from expanding without bound.
    #[must_use]
    pub fn with_unbounded_range(min: u32) -> Self {
        Self::with_range(min, SAFETY_MAX_DEPTH)
    }

    /// Replaces the maximum number of results.
    #[must_use]
    pub fn with_limit(self, limit: usize) -> Self {
        Self { limit, ..self }
    }

    /// Restricts traversal to edges labelled with one of `types`.
    /// An empty list disables filtering.
    #[must_use]
    pub fn with_rel_types(self, types: Vec<String>) -> Self {
        Self {
            rel_types: types,
            ..self
        }
    }

    /// Overrides the maximum depth as given, without any cap (advanced use).
    #[must_use]
    pub fn with_max_depth(self, max_depth: u32) -> Self {
        Self { max_depth, ..self }
    }
}

/// Queue entry for the BFS (parent-pointer variant).
///
/// Deliberately carries no path. The BFS records how each node was discovered
/// in a separate `FxHashMap<u64, (u64, u64)>` (node -> (parent, edge_id)). It
/// rebuilds paths with `reconstruct_path` only for nodes that become results,
/// which avoids cloning a `SmallVec` path on every edge expansion.
#[derive(Debug)]
pub(super) struct BfsState {
    /// Node to expand when this entry is dequeued.
    pub(super) node_id: u64,
    /// Hop distance of `node_id` from the traversal source (0 for the source itself).
    pub(super) depth: u32,
}

/// Rebuilds the edge-ID path from `source` to `target` out of BFS parent pointers.
///
/// Walks `target -> parent -> ... -> source`, collecting the edge used at each
/// step, then returns the edges in source-to-target order. If
/// `target == source` the result is empty, since the root has no parent entry.
///
/// The chain can be broken, meaning some node other than `source` has no
/// entry. In that case the walk stops and the edges gathered so far are
/// returned. This cannot happen for a map built by the BFS in this module.
#[must_use]
pub(super) fn reconstruct_path(
    target: u64,
    source: u64,
    parent_map: &FxHashMap<u64, (u64, u64)>,
) -> Vec<u64> {
    let mut edges_backwards: Vec<u64> = Vec::new();
    let mut cursor = target;

    loop {
        if cursor == source {
            break;
        }
        match parent_map.get(&cursor) {
            Some(&(parent, edge_id)) => {
                edges_backwards.push(edge_id);
                cursor = parent;
            }
            // Broken chain: defensive stop, unreachable for a BFS-built map.
            None => break,
        }
    }

    // Collected target-first; callers expect source-first order.
    edges_backwards.reverse();
    edges_backwards
}

/// Which edge list the shared BFS helper follows when expanding a node.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum BfsDirection {
    /// Follow outgoing edges (source -> target); the next node is `edge.target()`.
    Forward,
    /// Follow incoming edges (target -> source); the next node is `edge.source()`.
    Reverse,
}

/// Core BFS loop shared by forward and reverse traversal.
///
/// Uses parent-pointer map instead of per-state path cloning.
/// Paths are reconstructed lazily only for nodes that qualify as results
/// (depth >= min_depth), eliminating O(visited * avg_depth) clone overhead.
///
/// For forward direction, uses CSR zero-copy path when snapshot exists,
/// avoiding `GraphEdge` cloning. Reverse direction always uses legacy path
/// since the CSR snapshot only covers outgoing edges.
#[must_use]
fn bfs_traverse_directed(
    edge_store: &EdgeStore,
    source_id: u64,
    config: &TraversalConfig,
    direction: BfsDirection,
) -> Vec<TraversalResult> {
    let mut results = Vec::new();
    let mut visited = FxHashSet::default();
    let mut queue = VecDeque::new();
    // Parent-pointer map: target_node -> (parent_node, edge_id).
    // O(visited_nodes) memory vs O(visited_nodes * avg_depth) for path cloning.
    let mut parent_map: FxHashMap<u64, (u64, u64)> = FxHashMap::default();

    // Pre-build a FxHashSet<&str> once for the entire traversal, not per-node.
    let rel_filter: FxHashSet<&str> = config.rel_types.iter().map(String::as_str).collect();

    // CRITICAL FIX: Mark source node as visited before traversal
    // to prevent cycles back to source causing duplicate work
    visited.insert(source_id);

    queue.push_back(BfsState {
        node_id: source_id,
        depth: 0,
    });

    // Use CSR zero-copy path for forward traversal when snapshot exists.
    let use_csr = direction == BfsDirection::Forward && edge_store.has_csr_snapshot();

    while let Some(state) = queue.pop_front() {
        if results.len() >= config.limit {
            break;
        }
        if use_csr {
            process_bfs_csr(
                edge_store,
                &state,
                config,
                source_id,
                &rel_filter,
                &mut results,
                &mut visited,
                &mut queue,
                &mut parent_map,
            );
        } else {
            let edges = match direction {
                BfsDirection::Forward => edge_store.get_outgoing(state.node_id),
                BfsDirection::Reverse => edge_store.get_incoming(state.node_id),
            };
            process_bfs_neighbors(
                &edges,
                &state,
                config,
                source_id,
                &rel_filter,
                direction,
                &mut results,
                &mut visited,
                &mut queue,
                &mut parent_map,
            );
        }
    }

    results
}

/// CSR zero-copy BFS expansion for forward traversal.
///
/// Reads neighbour IDs, edge IDs and interned labels straight from the
/// snapshot's contiguous arrays instead of cloning full `GraphEdge` objects.
/// Each surviving `(target, edge_id)` pair is passed to
/// `process_bfs_candidate`. That helper records the parent pointer and emits
/// results only for newly discovered nodes, so there are no duplicates.
/// Expansion stops once `config.limit` results exist.
///
/// The edge label is resolved only when `rel_filter` is non-empty. With no
/// filter every edge is accepted, so the lookup would be wasted work. When a
/// filter is active, edges whose label cannot be resolved are excluded.
///
/// # Panics
///
/// Panics if `edge_store.csr_snapshot()` yields no snapshot. Callers must
/// check `has_csr_snapshot()` first.
#[inline]
#[allow(clippy::too_many_arguments)] // Reason: BFS helper passes parent_map alongside traversal state; private fn
fn process_bfs_csr(
    edge_store: &EdgeStore,
    state: &BfsState,
    config: &TraversalConfig,
    source_id: u64,
    rel_filter: &FxHashSet<&str>,
    results: &mut Vec<TraversalResult>,
    visited: &mut FxHashSet<u64>,
    queue: &mut VecDeque<BfsState>,
    parent_map: &mut FxHashMap<u64, (u64, u64)>,
) {
    let snapshot = edge_store
        .csr_snapshot()
        .expect("invariant: CSR snapshot checked before calling process_bfs_csr");
    let targets = snapshot.neighbors(state.node_id);
    let edge_ids = snapshot.edge_ids(state.node_id);
    // Hoisted out of the loop: labels only matter when a filter is configured.
    let filtering = !rel_filter.is_empty();

    for (i, (&target, &eid)) in targets.iter().zip(edge_ids.iter()).enumerate() {
        if results.len() >= config.limit {
            break;
        }
        if filtering {
            match snapshot.label_at(state.node_id, i) {
                Some(label) if rel_filter.contains(label) => {}
                // Label not in the filter, or unresolvable: exclude the edge.
                _ => continue,
            }
        }
        process_bfs_candidate(
            target,
            eid,
            state.node_id,
            state.depth,
            config,
            source_id,
            results,
            visited,
            queue,
            parent_map,
        );
    }
}

/// Expands one BFS state from an explicit list of `GraphEdge` references.
///
/// This is the non-CSR path, used for reverse traversal and whenever no
/// snapshot exists. When `rel_filter` is non-empty, only edges whose label it
/// contains are considered.
///
/// For each remaining edge, the node on the far side is handed to
/// `process_bfs_candidate`: `target()` for [`BfsDirection::Forward`], `source()`
/// for [`BfsDirection::Reverse`]. That helper records parent pointers and
/// emits results only for newly discovered nodes. Expansion stops once
/// `config.limit` results have been collected.
#[inline]
#[allow(clippy::too_many_arguments)] // Reason: BFS helper passes parent_map alongside traversal state; private fn
fn process_bfs_neighbors(
    edges: &[&super::GraphEdge],
    state: &BfsState,
    config: &TraversalConfig,
    source_id: u64,
    rel_filter: &FxHashSet<&str>,
    direction: BfsDirection,
    results: &mut Vec<TraversalResult>,
    visited: &mut FxHashSet<u64>,
    queue: &mut VecDeque<BfsState>,
    parent_map: &mut FxHashMap<u64, (u64, u64)>,
) {
    let wanted = edges
        .iter()
        .filter(|edge| rel_filter.is_empty() || rel_filter.contains(edge.label()));

    for edge in wanted {
        if results.len() >= config.limit {
            break;
        }
        let neighbour = match direction {
            BfsDirection::Forward => edge.target(),
            BfsDirection::Reverse => edge.source(),
        };
        process_bfs_candidate(
            neighbour,
            edge.id(),
            state.node_id,
            state.depth,
            config,
            source_id,
            results,
            visited,
            queue,
            parent_map,
        );
    }
}

/// Handles one node reached by following an edge out of `parent_node`.
///
/// In order, it:
/// 1. drops the node if one more hop would exceed `config.max_depth`;
/// 2. drops the node if it is already visited (BFS reached it at an equal or
///    smaller depth, so the existing parent pointer stays);
/// 3. records `target -> (parent_node, edge_id)` in `parent_map`;
/// 4. emits a result with its reconstructed path when the hop count is at
///    least `config.min_depth`;
/// 5. queues the node for further expansion while another hop is still allowed.
#[inline]
#[allow(clippy::too_many_arguments)] // Reason: shared by both expansion helpers, which thread all BFS state through it; private fn
fn process_bfs_candidate(
    target: u64,
    edge_id: u64,
    parent_node: u64,
    current_depth: u32,
    config: &TraversalConfig,
    source_id: u64,
    results: &mut Vec<TraversalResult>,
    visited: &mut FxHashSet<u64>,
    queue: &mut VecDeque<BfsState>,
    parent_map: &mut FxHashMap<u64, (u64, u64)>,
) {
    let hops = current_depth + 1;

    // `||` short-circuits, so nodes beyond the depth budget are never marked visited.
    if hops > config.max_depth || !visited.insert(target) {
        return;
    }

    parent_map.insert(target, (parent_node, edge_id));

    if hops >= config.min_depth {
        let path = reconstruct_path(target, source_id, parent_map);
        results.push(TraversalResult::new(target, path, hops));
    }

    if hops < config.max_depth {
        queue.push_back(BfsState {
            node_id: target,
            depth: hops,
        });
    }
}

/// Performs a breadth-first traversal along outgoing edges from `source_id`.
///
/// Every node reachable within `config.max_depth` hops is discovered once, at
/// its shortest hop distance. It is reported if that distance is at least
/// `config.min_depth`. Each result carries one shortest path (edge IDs in
/// source-to-target order), not every path to the node.
///
/// As a consequence, a node first reached below `min_depth` is not reported.
/// The source itself is never reported, even for `min_depth == 0`.
/// `config.rel_types`, when non-empty, restricts which edge labels may be followed.
///
/// Uses the edge store's CSR snapshot when one is available.
///
/// # Arguments
///
/// * `edge_store` - The edge storage to traverse.
/// * `source_id` - Starting node ID.
/// * `config` - Depth range, result limit and relationship-type filter.
///
/// # Returns
///
/// Results in discovery order (non-decreasing depth), at most `config.limit` of them.
#[must_use]
pub fn bfs_traverse(
    edge_store: &EdgeStore,
    source_id: u64,
    config: &TraversalConfig,
) -> Vec<TraversalResult> {
    bfs_traverse_directed(edge_store, source_id, config, BfsDirection::Forward)
}

/// Performs BFS traversal in the reverse direction (following incoming edges).
///
/// Same semantics as [`bfs_traverse`] with the edge direction flipped:
/// - each node is reported at most once, at its shortest hop distance;
/// - nodes first reached below `config.min_depth` and the source itself are
///   never reported;
/// - `config.rel_types`, when non-empty, restricts which edge labels may be
///   followed.
///
/// Each `path` lists edge IDs starting with the edge adjacent to `source_id`.
/// The CSR snapshot is not used here because it only covers outgoing edges.
///
/// # Arguments
///
/// * `edge_store` - The edge storage to traverse.
/// * `source_id` - Starting node ID.
/// * `config` - Depth range, result limit and relationship-type filter.
///
/// # Returns
///
/// Results in discovery order (non-decreasing depth), at most `config.limit` of them.
#[must_use]
pub fn bfs_traverse_reverse(
    edge_store: &EdgeStore,
    source_id: u64,
    config: &TraversalConfig,
) -> Vec<TraversalResult> {
    bfs_traverse_directed(edge_store, source_id, config, BfsDirection::Reverse)
}

// Tests moved to traversal_tests.rs per project rules