// velesdb_core/collection/graph/edge_concurrent/mod.rs

1//! Concurrent edge store with sharded locking.
2//!
3//! This module provides `ConcurrentEdgeStore`, a thread-safe wrapper around
4//! `EdgeStore` that uses sharding to reduce lock contention.
5//!
6//! Read-only queries and traversal are in `query.rs`.
7
8// SAFETY: Numeric casts in edge store sharding are intentional:
9// - u64->usize for node ID hashing: Node IDs are generated sequentially and fit in usize
10// - Used for sharding only, actual storage uses u64 for persistence
11#![allow(clippy::cast_possible_truncation)]
12
13mod query;
14
15use super::edge::{EdgeStore, GraphEdge};
16use crate::error::{Error, Result};
17use parking_lot::RwLock;
18use std::collections::{HashMap, HashSet};
19
/// Default number of shards for concurrent edge store.
/// Increased from 64 to 256 for better scalability with 10M+ edges (EPIC-019 US-001).
const DEFAULT_NUM_SHARDS: usize = 256;

/// Minimum edges per shard for efficiency.
/// Below this threshold, having more shards adds overhead without benefit.
/// Used by `with_estimated_edges` to derive the target shard count.
const MIN_EDGES_PER_SHARD: usize = 1000;

/// Maximum recommended shards to limit memory overhead from RwLock + EdgeStore structures.
/// `with_estimated_edges` clamps its computed shard count to this bound.
const MAX_SHARDS: usize = 512;
30
/// A thread-safe edge store using sharded locking.
///
/// Distributes edges across multiple shards based on source node ID
/// to reduce lock contention in multi-threaded scenarios.
///
/// # Cross-Shard Edge Storage Pattern
///
/// Edges that span different shards (source and target in different shards) are stored
/// in BOTH shards:
/// - **Source shard**: Full edge with outgoing + label indices (`add_edge`)
/// - **Target shard**: Edge copy with incoming index only (`add_edge_incoming_only`)
///
/// # Lock Ordering
///
/// When acquiring multiple shard locks, always acquire in ascending
/// shard index order to prevent deadlocks.
// NOTE(review): align(64) presumably targets cache-line alignment to avoid
// false sharing of the struct header — confirm this is intentional.
#[repr(C, align(64))]
pub struct ConcurrentEdgeStore {
    /// Per-shard edge storage; an edge's shard is `node_id % num_shards`
    /// (computed by `shard_index`) for its source and/or target node.
    pub(super) shards: Vec<RwLock<EdgeStore>>,
    /// Number of shards; fixed at construction and asserted to be >= 1
    /// (see `with_shards`).
    pub(super) num_shards: usize,
    /// Global registry of edge IDs with source node for optimized removal.
    /// Maps edge_id -> source_node_id for O(1) shard lookup during removal.
    pub(super) edge_ids: RwLock<HashMap<u64, u64>>,
}
55
impl ConcurrentEdgeStore {
    /// Creates a new concurrent edge store with the default number of shards
    /// (`DEFAULT_NUM_SHARDS`).
    #[must_use]
    pub fn new() -> Self {
        Self::with_shards(DEFAULT_NUM_SHARDS)
    }

    /// Creates a new concurrent edge store with a specific number of shards.
    ///
    /// # Panics
    ///
    /// Panics if `num_shards` is 0 (would cause division-by-zero in shard_index).
    #[must_use]
    pub fn with_shards(num_shards: usize) -> Self {
        assert!(num_shards > 0, "num_shards must be at least 1");
        let shards = (0..num_shards)
            .map(|_| RwLock::new(EdgeStore::new()))
            .collect();
        Self {
            shards,
            num_shards,
            edge_ids: RwLock::new(HashMap::new()),
        }
    }

    /// Creates a concurrent edge store with optimal shard count for estimated edge count.
    ///
    /// Targets roughly `MIN_EDGES_PER_SHARD` edges per shard, rounded up to a
    /// power of two and clamped to `[1, MAX_SHARDS]`.
    ///
    /// **FLAG-6: Uses integer bit manipulation for ceiling log2.**
    #[must_use]
    pub fn with_estimated_edges(estimated_edges: usize) -> Self {
        let optimal_shards = if estimated_edges < MIN_EDGES_PER_SHARD {
            1
        } else {
            let target_shards = estimated_edges / MIN_EDGES_PER_SHARD;
            // Ceiling log2: the smallest p with 2^p >= target_shards is
            // BITS - leading_zeros(target_shards - 1). The `<= 1` guard avoids
            // underflow of `target_shards - 1`.
            let power_of_2 = if target_shards <= 1 {
                0
            } else {
                usize::BITS - (target_shards - 1).leading_zeros()
            };
            (1usize << power_of_2).clamp(1, MAX_SHARDS)
        };
        Self::with_shards(optimal_shards)
    }

    /// Returns the shard index for a given node ID.
    ///
    /// The u64 -> usize cast may truncate on 32-bit targets (covered by the
    /// module-level allow); the result is only used to pick a shard.
    #[inline]
    pub(super) fn shard_index(&self, node_id: u64) -> usize {
        (node_id as usize) % self.num_shards
    }

    /// Adds an edge to the store (thread-safe).
    ///
    /// Edges are stored in BOTH source and target shards:
    /// - Source shard: for outgoing index lookups
    /// - Target shard: for incoming index lookups
    ///
    /// When source and target are in different shards, locks are acquired
    /// in ascending shard index order to prevent deadlocks.
    ///
    /// # Errors
    ///
    /// Returns `Error::EdgeExists` if an edge with the same ID already exists.
    pub fn add_edge(&self, edge: GraphEdge) -> Result<()> {
        let edge_id = edge.id();

        // CRITICAL: Hold edge_ids lock throughout the entire operation to prevent race
        // condition where remove_edge could free an ID while we're still inserting.
        // Lock ordering: edge_ids FIRST, then shards in ascending order.
        let mut ids = self.edge_ids.write();
        if ids.contains_key(&edge_id) {
            return Err(Error::EdgeExists(edge_id));
        }

        let source_id = edge.source();
        let source_shard = self.shard_index(source_id);
        let target_shard = self.shard_index(edge.target());

        if source_shard == target_shard {
            // Same shard: single lock, EdgeStore handles both indices
            let mut guard = self.shards[source_shard].write();
            guard.add_edge(edge)?;
            ids.insert(edge_id, source_id);
        } else {
            // Different shards: acquire locks in ascending order to prevent deadlock
            let (first_idx, second_idx) = if source_shard < target_shard {
                (source_shard, target_shard)
            } else {
                (target_shard, source_shard)
            };

            let mut first_guard = self.shards[first_idx].write();
            let mut second_guard = self.shards[second_idx].write();

            // Insert the outgoing half into the source shard first; if the
            // incoming half fails, roll back the outgoing half so the store
            // stays consistent. Which guard is "source" depends on ordering.
            if source_shard < target_shard {
                first_guard.add_edge_outgoing_only(edge.clone())?;
                if let Err(e) = second_guard.add_edge_incoming_only(edge) {
                    first_guard.remove_edge_outgoing_only(edge_id);
                    return Err(e);
                }
            } else {
                second_guard.add_edge_outgoing_only(edge.clone())?;
                if let Err(e) = first_guard.add_edge_incoming_only(edge) {
                    second_guard.remove_edge_outgoing_only(edge_id);
                    return Err(e);
                }
            }
            ids.insert(edge_id, source_id);
        }
        Ok(())
    }

    /// Removes an edge by ID using optimized 2-shard lookup.
    ///
    /// No-op if the edge ID is unknown.
    ///
    /// # Concurrency Safety
    ///
    /// Lock ordering: edge_ids FIRST, then shards in ascending order.
    pub fn remove_edge(&self, edge_id: u64) {
        let mut ids = self.edge_ids.write();

        let Some(&source_id) = ids.get(&edge_id) else {
            return;
        };

        // Read the source shard only to discover the target node. Dropping the
        // read lock before the write locks below is safe because every mutation
        // path takes the edge_ids write lock first, which we already hold.
        let source_shard_idx = self.shard_index(source_id);
        let target_id = {
            let guard = self.shards[source_shard_idx].read();
            if let Some(edge) = guard.get_edge(edge_id) {
                edge.target()
            } else {
                // Stale registry entry: the shard no longer has the edge.
                ids.remove(&edge_id);
                return;
            }
        };

        let target_shard_idx = self.shard_index(target_id);

        if source_shard_idx == target_shard_idx {
            self.shards[source_shard_idx].write().remove_edge(edge_id);
        } else {
            let (first_idx, second_idx) = if source_shard_idx < target_shard_idx {
                (source_shard_idx, target_shard_idx)
            } else {
                (target_shard_idx, source_shard_idx)
            };
            let mut first = self.shards[first_idx].write();
            let mut second = self.shards[second_idx].write();

            // Full removal on the source shard (outgoing + labels), incoming
            // index cleanup on the target shard — mirroring add_edge's layout.
            if source_shard_idx < target_shard_idx {
                first.remove_edge(edge_id);
                second.remove_edge_incoming_only(edge_id);
            } else {
                first.remove_edge_incoming_only(edge_id);
                second.remove_edge(edge_id);
            }
        }

        ids.remove(&edge_id);
    }

    /// Removes all edges connected to a node (cascade delete, thread-safe).
    ///
    /// # Concurrency Safety
    ///
    /// Lock ordering: edge_ids FIRST, then shards in ascending order.
    pub fn remove_node_edges(&self, node_id: u64) {
        let mut ids = self.edge_ids.write();

        let node_shard = self.shard_index(node_id);

        // Phase 1: Collect all edges connected to this node (read-only)
        let (outgoing_edges, incoming_edges): (Vec<_>, Vec<_>) = {
            let guard = self.shards[node_shard].read();
            let outgoing: Vec<_> = guard
                .get_outgoing(node_id)
                .iter()
                .map(|e| (e.id(), e.target()))
                .collect();
            let incoming: Vec<_> = guard
                .get_incoming(node_id)
                .iter()
                .map(|e| (e.id(), e.source()))
                .collect();
            (outgoing, incoming)
        };

        // Phase 2: Collect all shards that need cleanup (BTreeSet = sorted ascending)
        let mut shards_to_clean: std::collections::BTreeSet<usize> =
            std::collections::BTreeSet::new();
        shards_to_clean.insert(node_shard);

        for (_, target) in &outgoing_edges {
            shards_to_clean.insert(self.shard_index(*target));
        }
        for (_, source) in &incoming_edges {
            shards_to_clean.insert(self.shard_index(*source));
        }

        // Phase 3: Acquire shard locks in ascending order and perform cleanup
        // (BTreeSet iteration order guarantees the ascending lock ordering).
        let mut guards: Vec<_> = shards_to_clean
            .iter()
            .map(|&idx| (idx, self.shards[idx].write()))
            .collect();

        // Phase 4: Clean up edges in all shards
        for (shard_idx, guard) in &mut guards {
            if *shard_idx == node_shard {
                // The node's own shard removes everything touching the node.
                guard.remove_node_edges(node_id);
            } else {
                // Other shards only hold half of each cross-shard edge:
                // incoming copies for our outgoing edges, outgoing copies for
                // our incoming edges (see the struct-level storage pattern).
                for (edge_id, target) in &outgoing_edges {
                    if self.shard_index(*target) == *shard_idx {
                        guard.remove_edge_incoming_only(*edge_id);
                    }
                }
                for (edge_id, source) in &incoming_edges {
                    if self.shard_index(*source) == *shard_idx {
                        guard.remove_edge_outgoing_only(*edge_id);
                    }
                }
            }
        }

        // Phase 5: Remove edge IDs from global registry.
        // The HashSet deduplicates self-loops, which appear in both the
        // outgoing and incoming lists.
        let mut removed: HashSet<u64> = HashSet::new();
        for (edge_id, _) in &outgoing_edges {
            if removed.insert(*edge_id) {
                ids.remove(edge_id);
            }
        }
        for (edge_id, _) in &incoming_edges {
            if removed.insert(*edge_id) {
                ids.remove(edge_id);
            }
        }
    }
}
291
292impl Default for ConcurrentEdgeStore {
293    fn default() -> Self {
294        Self::new()
295    }
296}
297
298// Compile-time check: ConcurrentEdgeStore must be Send + Sync
299const _: () = {
300    const fn assert_send_sync<T: Send + Sync>() {}
301    assert_send_sync::<ConcurrentEdgeStore>();
302};