velesdb_core/collection/graph/edge_concurrent/query.rs
1//! Read-only query and traversal methods for `ConcurrentEdgeStore`.
2//!
3//! Extracted from the main module for single-responsibility:
4//! - Edge lookups (by node, by label, by ID)
5//! - BFS traversal
6//! - Edge count
7
8use super::{ConcurrentEdgeStore, GraphEdge};
9use std::collections::{HashSet, VecDeque};
10
11impl ConcurrentEdgeStore {
12 /// Gets all outgoing edges from a node (thread-safe).
13 #[must_use]
14 pub fn get_outgoing(&self, node_id: u64) -> Vec<GraphEdge> {
15 let shard = &self.shards[self.shard_index(node_id)];
16 let guard = shard.read();
17 guard.get_outgoing(node_id).into_iter().cloned().collect()
18 }
19
20 /// Gets all incoming edges to a node (thread-safe).
21 #[must_use]
22 pub fn get_incoming(&self, node_id: u64) -> Vec<GraphEdge> {
23 let shard = &self.shards[self.shard_index(node_id)];
24 let guard = shard.read();
25 guard.get_incoming(node_id).into_iter().cloned().collect()
26 }
27
28 /// Gets neighbors (target nodes) of a given node.
29 #[must_use]
30 pub fn get_neighbors(&self, node_id: u64) -> Vec<u64> {
31 self.get_outgoing(node_id)
32 .iter()
33 .map(GraphEdge::target)
34 .collect()
35 }
36
37 /// Gets outgoing edges filtered by label (thread-safe).
38 ///
39 /// # Performance Note
40 ///
41 /// This method delegates to the underlying `EdgeStore::get_outgoing_by_label`
42 /// which uses the composite index `(source_id, label) -> edge_ids` for O(1) lookup
43 /// when available (EPIC-019 US-003). Falls back to filtering if index not populated.
44 #[must_use]
45 pub fn get_outgoing_by_label(&self, node_id: u64, label: &str) -> Vec<GraphEdge> {
46 let shard_idx = self.shard_index(node_id);
47 let shard = self.shards[shard_idx].read();
48 shard
49 .get_outgoing_by_label(node_id, label)
50 .into_iter()
51 .cloned()
52 .collect()
53 }
54
55 /// Gets incoming edges filtered by label (thread-safe).
56 #[must_use]
57 pub fn get_incoming_by_label(&self, node_id: u64, label: &str) -> Vec<GraphEdge> {
58 self.get_incoming(node_id)
59 .into_iter()
60 .filter(|e| e.label() == label)
61 .collect()
62 }
63
64 /// Gets all edges with a specific label across all shards.
65 ///
66 /// # Performance Warning
67 ///
68 /// This method iterates through ALL shards and aggregates results.
69 /// For large graphs with many shards, this can be expensive.
70 /// Consider using `get_outgoing_by_label(node_id, label)` if you know
71 /// the source node, which is O(k) instead of O(shards × edges_per_label).
72 #[must_use]
73 pub fn get_edges_by_label(&self, label: &str) -> Vec<GraphEdge> {
74 self.shards
75 .iter()
76 .flat_map(|shard| {
77 shard
78 .read()
79 .get_edges_by_label(label)
80 .into_iter()
81 .cloned()
82 .collect::<Vec<_>>()
83 })
84 .collect()
85 }
86
87 /// Checks if an edge with the given ID exists.
88 #[must_use]
89 pub fn contains_edge(&self, edge_id: u64) -> bool {
90 self.edge_ids.read().contains_key(&edge_id)
91 }
92
93 /// Gets an edge by ID using optimized source shard lookup.
94 ///
95 /// Returns `None` if the edge doesn't exist.
96 #[must_use]
97 pub fn get_edge(&self, edge_id: u64) -> Option<GraphEdge> {
98 // Get source_id from registry for direct shard lookup
99 let source_id = *self.edge_ids.read().get(&edge_id)?;
100 let shard_idx = self.shard_index(source_id);
101 self.shards[shard_idx].read().get_edge(edge_id).cloned()
102 }
103
104 /// Traverses the graph using BFS from a starting node.
105 ///
106 /// Returns all nodes reachable within `max_depth` hops.
107 ///
108 /// Uses Read-Copy-Drop pattern to avoid holding locks during traversal.
109 #[must_use]
110 pub fn traverse_bfs(&self, start: u64, max_depth: u32) -> Vec<u64> {
111 let mut visited = HashSet::new();
112 let mut queue = VecDeque::new();
113 queue.push_back((start, 0u32));
114
115 while let Some((node, depth)) = queue.pop_front() {
116 if depth > max_depth || !visited.insert(node) {
117 continue;
118 }
119
120 // Read-Copy-Drop pattern: copy neighbors and drop guard immediately
121 let neighbors: Vec<u64> = {
122 let shard = &self.shards[self.shard_index(node)];
123 let guard = shard.read();
124 guard
125 .get_outgoing(node)
126 .iter()
127 .map(|e| e.target())
128 .collect()
129 }; // Guard dropped here
130
131 for neighbor in neighbors {
132 if !visited.contains(&neighbor) {
133 queue.push_back((neighbor, depth + 1));
134 }
135 }
136 }
137
138 visited.into_iter().collect()
139 }
140
141 /// Returns the total edge count across all shards.
142 ///
143 /// Uses outgoing edge count to avoid double-counting edges that span shards.
144 #[must_use]
145 pub fn edge_count(&self) -> usize {
146 self.shards
147 .iter()
148 .map(|s| s.read().outgoing_edge_count())
149 .sum()
150 }
151}