Skip to main content

velesdb_core/collection/graph/edge_concurrent/
query.rs

//! Read-only query and traversal methods for `ConcurrentEdgeStore`.
//!
//! Extracted from the main module for single-responsibility:
//! - Edge lookups (by node, by label, by ID)
//! - BFS traversal
6//! - Edge count
7
8use super::{ConcurrentEdgeStore, GraphEdge};
9use std::collections::{HashSet, VecDeque};
10
11impl ConcurrentEdgeStore {
12    /// Gets all outgoing edges from a node (thread-safe).
13    #[must_use]
14    pub fn get_outgoing(&self, node_id: u64) -> Vec<GraphEdge> {
15        let shard = &self.shards[self.shard_index(node_id)];
16        let guard = shard.read();
17        guard.get_outgoing(node_id).into_iter().cloned().collect()
18    }
19
20    /// Gets all incoming edges to a node (thread-safe).
21    #[must_use]
22    pub fn get_incoming(&self, node_id: u64) -> Vec<GraphEdge> {
23        let shard = &self.shards[self.shard_index(node_id)];
24        let guard = shard.read();
25        guard.get_incoming(node_id).into_iter().cloned().collect()
26    }
27
28    /// Gets neighbors (target nodes) of a given node.
29    #[must_use]
30    pub fn get_neighbors(&self, node_id: u64) -> Vec<u64> {
31        self.get_outgoing(node_id)
32            .iter()
33            .map(GraphEdge::target)
34            .collect()
35    }
36
37    /// Gets outgoing edges filtered by label (thread-safe).
38    ///
39    /// # Performance Note
40    ///
41    /// This method delegates to the underlying `EdgeStore::get_outgoing_by_label`
42    /// which uses the composite index `(source_id, label) -> edge_ids` for O(1) lookup
43    /// when available (EPIC-019 US-003). Falls back to filtering if index not populated.
44    #[must_use]
45    pub fn get_outgoing_by_label(&self, node_id: u64, label: &str) -> Vec<GraphEdge> {
46        let shard_idx = self.shard_index(node_id);
47        let shard = self.shards[shard_idx].read();
48        shard
49            .get_outgoing_by_label(node_id, label)
50            .into_iter()
51            .cloned()
52            .collect()
53    }
54
55    /// Gets incoming edges filtered by label (thread-safe).
56    #[must_use]
57    pub fn get_incoming_by_label(&self, node_id: u64, label: &str) -> Vec<GraphEdge> {
58        self.get_incoming(node_id)
59            .into_iter()
60            .filter(|e| e.label() == label)
61            .collect()
62    }
63
64    /// Gets all edges with a specific label across all shards.
65    ///
66    /// # Performance Warning
67    ///
68    /// This method iterates through ALL shards and aggregates results.
69    /// For large graphs with many shards, this can be expensive.
70    /// Consider using `get_outgoing_by_label(node_id, label)` if you know
71    /// the source node, which is O(k) instead of O(shards × edges_per_label).
72    #[must_use]
73    pub fn get_edges_by_label(&self, label: &str) -> Vec<GraphEdge> {
74        self.shards
75            .iter()
76            .flat_map(|shard| {
77                shard
78                    .read()
79                    .get_edges_by_label(label)
80                    .into_iter()
81                    .cloned()
82                    .collect::<Vec<_>>()
83            })
84            .collect()
85    }
86
87    /// Checks if an edge with the given ID exists.
88    #[must_use]
89    pub fn contains_edge(&self, edge_id: u64) -> bool {
90        self.edge_ids.read().contains_key(&edge_id)
91    }
92
93    /// Gets an edge by ID using optimized source shard lookup.
94    ///
95    /// Returns `None` if the edge doesn't exist.
96    #[must_use]
97    pub fn get_edge(&self, edge_id: u64) -> Option<GraphEdge> {
98        // Get source_id from registry for direct shard lookup
99        let source_id = *self.edge_ids.read().get(&edge_id)?;
100        let shard_idx = self.shard_index(source_id);
101        self.shards[shard_idx].read().get_edge(edge_id).cloned()
102    }
103
104    /// Traverses the graph using BFS from a starting node.
105    ///
106    /// Returns all nodes reachable within `max_depth` hops.
107    ///
108    /// Uses Read-Copy-Drop pattern to avoid holding locks during traversal.
109    #[must_use]
110    pub fn traverse_bfs(&self, start: u64, max_depth: u32) -> Vec<u64> {
111        let mut visited = HashSet::new();
112        let mut queue = VecDeque::new();
113        queue.push_back((start, 0u32));
114
115        while let Some((node, depth)) = queue.pop_front() {
116            if depth > max_depth || !visited.insert(node) {
117                continue;
118            }
119
120            // Read-Copy-Drop pattern: copy neighbors and drop guard immediately
121            let neighbors: Vec<u64> = {
122                let shard = &self.shards[self.shard_index(node)];
123                let guard = shard.read();
124                guard
125                    .get_outgoing(node)
126                    .iter()
127                    .map(|e| e.target())
128                    .collect()
129            }; // Guard dropped here
130
131            for neighbor in neighbors {
132                if !visited.contains(&neighbor) {
133                    queue.push_back((neighbor, depth + 1));
134                }
135            }
136        }
137
138        visited.into_iter().collect()
139    }
140
141    /// Returns the total edge count across all shards.
142    ///
143    /// Uses outgoing edge count to avoid double-counting edges that span shards.
144    #[must_use]
145    pub fn edge_count(&self) -> usize {
146        self.shards
147            .iter()
148            .map(|s| s.read().outgoing_edge_count())
149            .sum()
150    }
151}