//! Concurrent, sharded edge store (`velesdb_core/collection/graph/edge_concurrent/mod.rs`).

#![allow(clippy::cast_possible_truncation)]
12
13mod query;
14
15use super::edge::{EdgeStore, GraphEdge};
16use crate::error::{Error, Result};
17use parking_lot::RwLock;
18use std::collections::{HashMap, HashSet};
19
/// Shard count used by [`ConcurrentEdgeStore::new`] / `Default`.
const DEFAULT_NUM_SHARDS: usize = 256;

/// Sizing target for `with_estimated_edges`: aim for at least this many
/// edges per shard before adding more shards.
const MIN_EDGES_PER_SHARD: usize = 1000;

/// Upper bound on the shard count chosen by `with_estimated_edges`.
const MAX_SHARDS: usize = 512;
30
/// Sharded, lock-based edge store: each shard is an independent
/// [`EdgeStore`] behind its own [`RwLock`], selected by node id
/// (`node_id % num_shards`), so writes touching different shards
/// proceed in parallel.
///
/// `align(64)` pads the struct to a cache line — presumably to avoid
/// false sharing with neighboring data; NOTE(review): confirm intent.
#[repr(C, align(64))]
pub struct ConcurrentEdgeStore {
    /// Per-shard edge storage; a node's shard is `shard_index(node_id)`.
    pub(super) shards: Vec<RwLock<EdgeStore>>,
    /// Number of shards (equals `shards.len()`); fixed at construction.
    pub(super) num_shards: usize,
    /// Global registry: edge id -> source node id. Used for duplicate
    /// detection on insert and to locate an edge's source shard on remove.
    pub(super) edge_ids: RwLock<HashMap<u64, u64>>,
}
55
impl ConcurrentEdgeStore {
    /// Creates a store with the default shard count ([`DEFAULT_NUM_SHARDS`]).
    #[must_use]
    pub fn new() -> Self {
        Self::with_shards(DEFAULT_NUM_SHARDS)
    }

    /// Creates a store with an explicit shard count.
    ///
    /// # Panics
    ///
    /// Panics if `num_shards` is 0.
    #[must_use]
    pub fn with_shards(num_shards: usize) -> Self {
        assert!(num_shards > 0, "num_shards must be at least 1");
        let shards = (0..num_shards)
            .map(|_| RwLock::new(EdgeStore::new()))
            .collect();
        Self {
            shards,
            num_shards,
            edge_ids: RwLock::new(HashMap::new()),
        }
    }

    /// Creates a store sized for roughly `estimated_edges` edges.
    ///
    /// Chooses the smallest power of two giving at least
    /// [`MIN_EDGES_PER_SHARD`] edges per shard, clamped to
    /// `1..=MAX_SHARDS`; stores below the per-shard minimum get a
    /// single shard.
    #[must_use]
    pub fn with_estimated_edges(estimated_edges: usize) -> Self {
        let optimal_shards = if estimated_edges < MIN_EDGES_PER_SHARD {
            1
        } else {
            let target_shards = estimated_edges / MIN_EDGES_PER_SHARD;
            // ceil(log2(target_shards)): smallest p with (1 << p) >= target_shards.
            let power_of_2 = if target_shards <= 1 {
                0
            } else {
                usize::BITS - (target_shards - 1).leading_zeros()
            };
            (1usize << power_of_2).clamp(1, MAX_SHARDS)
        };
        Self::with_shards(optimal_shards)
    }

    /// Maps a node id to its owning shard via simple modulo hashing.
    /// (On 32-bit targets the `as usize` cast truncates the id, which is
    /// harmless here — it only affects shard placement, consistently.)
    #[inline]
    pub(super) fn shard_index(&self, node_id: u64) -> usize {
        (node_id as usize) % self.num_shards
    }

    /// Inserts `edge`, writing the outgoing half of the adjacency into the
    /// source node's shard and the incoming half into the target node's
    /// shard, then registering the id in `edge_ids`.
    ///
    /// The `edge_ids` write lock is held for the entire call so the
    /// duplicate check and both shard writes are atomic with respect to
    /// the other mutating methods (which also take `edge_ids` first).
    ///
    /// # Errors
    ///
    /// Returns [`Error::EdgeExists`] if the edge id is already registered;
    /// propagates shard-insert errors, rolling back the already-written
    /// outgoing half if the cross-shard incoming insert fails.
    pub fn add_edge(&self, edge: GraphEdge) -> Result<()> {
        let edge_id = edge.id();

        let mut ids = self.edge_ids.write();
        if ids.contains_key(&edge_id) {
            return Err(Error::EdgeExists(edge_id));
        }

        let source_id = edge.source();
        let source_shard = self.shard_index(source_id);
        let target_shard = self.shard_index(edge.target());

        if source_shard == target_shard {
            // Both halves live in one shard: single lock, combined insert.
            let mut guard = self.shards[source_shard].write();
            guard.add_edge(edge)?;
            ids.insert(edge_id, source_id);
        } else {
            // Cross-shard: always lock the lower-indexed shard first so
            // concurrent mutators on the same shard pair cannot deadlock.
            let (first_idx, second_idx) = if source_shard < target_shard {
                (source_shard, target_shard)
            } else {
                (target_shard, source_shard)
            };

            let mut first_guard = self.shards[first_idx].write();
            let mut second_guard = self.shards[second_idx].write();

            if source_shard < target_shard {
                first_guard.add_edge_outgoing_only(edge.clone())?;
                if let Err(e) = second_guard.add_edge_incoming_only(edge) {
                    // Roll back the outgoing half so no dangling half-edge remains.
                    first_guard.remove_edge_outgoing_only(edge_id);
                    return Err(e);
                }
            } else {
                second_guard.add_edge_outgoing_only(edge.clone())?;
                if let Err(e) = first_guard.add_edge_incoming_only(edge) {
                    second_guard.remove_edge_outgoing_only(edge_id);
                    return Err(e);
                }
            }
            ids.insert(edge_id, source_id);
        }
        Ok(())
    }

    /// Removes the edge with `edge_id` from both shards and from the id
    /// registry. A no-op if the id is unknown.
    ///
    /// Holds the `edge_ids` write lock throughout, so the read of the
    /// edge's target and the subsequent shard writes cannot race with
    /// other mutations. Shard locks are again taken in ascending index
    /// order to match `add_edge`.
    pub fn remove_edge(&self, edge_id: u64) {
        let mut ids = self.edge_ids.write();

        let Some(&source_id) = ids.get(&edge_id) else {
            return;
        };

        let source_shard_idx = self.shard_index(source_id);
        // Look up the target so we know which shard holds the incoming half.
        let target_id = {
            let guard = self.shards[source_shard_idx].read();
            if let Some(edge) = guard.get_edge(edge_id) {
                edge.target()
            } else {
                // Registry says the edge exists but the shard disagrees:
                // drop the stale registry entry and bail.
                ids.remove(&edge_id);
                return;
            }
        };

        let target_shard_idx = self.shard_index(target_id);

        if source_shard_idx == target_shard_idx {
            self.shards[source_shard_idx].write().remove_edge(edge_id);
        } else {
            let (first_idx, second_idx) = if source_shard_idx < target_shard_idx {
                (source_shard_idx, target_shard_idx)
            } else {
                (target_shard_idx, source_shard_idx)
            };
            let mut first = self.shards[first_idx].write();
            let mut second = self.shards[second_idx].write();

            // Full removal on the source shard, incoming-half removal on
            // the target shard — whichever order the indices dictate.
            if source_shard_idx < target_shard_idx {
                first.remove_edge(edge_id);
                second.remove_edge_incoming_only(edge_id);
            } else {
                first.remove_edge_incoming_only(edge_id);
                second.remove_edge(edge_id);
            }
        }

        ids.remove(&edge_id);
    }

    /// Removes every edge incident to `node_id` (both outgoing and
    /// incoming) from all affected shards and from the id registry.
    ///
    /// Holds the `edge_ids` write lock throughout; shard write locks are
    /// acquired in ascending index order (via `BTreeSet` iteration) to
    /// stay deadlock-free with `add_edge`/`remove_edge`.
    pub fn remove_node_edges(&self, node_id: u64) {
        let mut ids = self.edge_ids.write();

        let node_shard = self.shard_index(node_id);

        // Snapshot the incident edges (id + far-end node) under a read
        // lock before taking any write locks.
        let (outgoing_edges, incoming_edges): (Vec<_>, Vec<_>) = {
            let guard = self.shards[node_shard].read();
            let outgoing: Vec<_> = guard
                .get_outgoing(node_id)
                .iter()
                .map(|e| (e.id(), e.target()))
                .collect();
            let incoming: Vec<_> = guard
                .get_incoming(node_id)
                .iter()
                .map(|e| (e.id(), e.source()))
                .collect();
            (outgoing, incoming)
        };

        // BTreeSet => deterministic ascending lock-acquisition order.
        let mut shards_to_clean: std::collections::BTreeSet<usize> =
            std::collections::BTreeSet::new();
        shards_to_clean.insert(node_shard);

        for (_, target) in &outgoing_edges {
            shards_to_clean.insert(self.shard_index(*target));
        }
        for (_, source) in &incoming_edges {
            shards_to_clean.insert(self.shard_index(*source));
        }

        let mut guards: Vec<_> = shards_to_clean
            .iter()
            .map(|&idx| (idx, self.shards[idx].write()))
            .collect();

        for (shard_idx, guard) in &mut guards {
            if *shard_idx == node_shard {
                // The node's own shard drops everything incident to it.
                guard.remove_node_edges(node_id);
            } else {
                // Remote shards hold only the far half of cross-shard
                // edges: incoming halves for our outgoing edges, outgoing
                // halves for our incoming edges.
                for (edge_id, target) in &outgoing_edges {
                    if self.shard_index(*target) == *shard_idx {
                        guard.remove_edge_incoming_only(*edge_id);
                    }
                }
                for (edge_id, source) in &incoming_edges {
                    if self.shard_index(*source) == *shard_idx {
                        guard.remove_edge_outgoing_only(*edge_id);
                    }
                }
            }
        }

        // Deregister each edge id once (self-loops appear in both lists).
        let mut removed: HashSet<u64> = HashSet::new();
        for (edge_id, _) in &outgoing_edges {
            if removed.insert(*edge_id) {
                ids.remove(edge_id);
            }
        }
        for (edge_id, _) in &incoming_edges {
            if removed.insert(*edge_id) {
                ids.remove(edge_id);
            }
        }
    }
}
291
292impl Default for ConcurrentEdgeStore {
293 fn default() -> Self {
294 Self::new()
295 }
296}
297
298const _: () = {
300 const fn assert_send_sync<T: Send + Sync>() {}
301 assert_send_sync::<ConcurrentEdgeStore>();
302};