Skip to main content

graphos_core/index/
adjacency.rs

1//! Chunked adjacency lists with delta buffers.
2//!
3//! This is the primary edge storage structure, optimized for:
4//! - O(1) amortized edge insertion
5//! - Cache-friendly sequential scans
6//! - MVCC-compatible copy-on-write at chunk granularity
7//! - Optional backward adjacency for incoming edge queries
8
9use graphos_common::types::{EdgeId, NodeId};
10use graphos_common::utils::hash::{FxHashMap, FxHashSet};
11use parking_lot::RwLock;
12use smallvec::SmallVec;
13use std::sync::atomic::{AtomicUsize, Ordering};
14
/// Number of edges a freshly allocated chunk can hold when the caller does
/// not request a specific capacity (64).
const DEFAULT_CHUNK_CAPACITY: usize = 1 << 6;
17
/// Delta-buffer length at which `compact_if_needed` folds a node's pending
/// inserts into chunks.
///
/// Delta buffers are scanned linearly on every read, so a smaller threshold
/// keeps reads cheap at the cost of compacting more often. The buffer keeps
/// only 8 entries inline, so a buffer that reaches this threshold has already
/// spilled to the heap. The value is a tuning knob, not a correctness
/// requirement.
const DELTA_COMPACTION_THRESHOLD: usize = 64;
23
24/// A chunk of adjacency entries.
25#[derive(Debug, Clone)]
26struct AdjacencyChunk {
27    /// Destination node IDs.
28    destinations: Vec<NodeId>,
29    /// Edge IDs (parallel to destinations).
30    edge_ids: Vec<EdgeId>,
31    /// Capacity of this chunk.
32    capacity: usize,
33}
34
35impl AdjacencyChunk {
36    fn new(capacity: usize) -> Self {
37        Self {
38            destinations: Vec::with_capacity(capacity),
39            edge_ids: Vec::with_capacity(capacity),
40            capacity,
41        }
42    }
43
44    fn len(&self) -> usize {
45        self.destinations.len()
46    }
47
48    fn is_full(&self) -> bool {
49        self.destinations.len() >= self.capacity
50    }
51
52    fn push(&mut self, dst: NodeId, edge_id: EdgeId) -> bool {
53        if self.is_full() {
54            return false;
55        }
56        self.destinations.push(dst);
57        self.edge_ids.push(edge_id);
58        true
59    }
60
61    fn iter(&self) -> impl Iterator<Item = (NodeId, EdgeId)> + '_ {
62        self.destinations
63            .iter()
64            .copied()
65            .zip(self.edge_ids.iter().copied())
66    }
67}
68
69/// Adjacency list for a single node.
70#[derive(Debug)]
71struct AdjacencyList {
72    /// Compacted chunks of adjacency entries.
73    chunks: Vec<AdjacencyChunk>,
74    /// Delta buffer for recent insertions.
75    delta_inserts: SmallVec<[(NodeId, EdgeId); 8]>,
76    /// Set of deleted edge IDs.
77    deleted: FxHashSet<EdgeId>,
78}
79
80impl AdjacencyList {
81    fn new() -> Self {
82        Self {
83            chunks: Vec::new(),
84            delta_inserts: SmallVec::new(),
85            deleted: FxHashSet::default(),
86        }
87    }
88
89    fn add_edge(&mut self, dst: NodeId, edge_id: EdgeId) {
90        // Try to add to the last chunk
91        if let Some(last) = self.chunks.last_mut() {
92            if last.push(dst, edge_id) {
93                return;
94            }
95        }
96
97        // Add to delta buffer
98        self.delta_inserts.push((dst, edge_id));
99    }
100
101    fn mark_deleted(&mut self, edge_id: EdgeId) {
102        self.deleted.insert(edge_id);
103    }
104
105    fn compact(&mut self, chunk_capacity: usize) {
106        if self.delta_inserts.is_empty() {
107            return;
108        }
109
110        // Create new chunks from delta buffer
111        // Check if last chunk has room, and if so, pop it to continue filling
112        let last_has_room = self.chunks.last().is_some_and(|c| !c.is_full());
113        let mut current_chunk = if last_has_room {
114            // Invariant: is_some_and() returned true, so chunks is non-empty
115            self.chunks
116                .pop()
117                .expect("chunks is non-empty: is_some_and() succeeded on previous line")
118        } else {
119            AdjacencyChunk::new(chunk_capacity)
120        };
121
122        for (dst, edge_id) in self.delta_inserts.drain(..) {
123            if !current_chunk.push(dst, edge_id) {
124                self.chunks.push(current_chunk);
125                current_chunk = AdjacencyChunk::new(chunk_capacity);
126                current_chunk.push(dst, edge_id);
127            }
128        }
129
130        if current_chunk.len() > 0 {
131            self.chunks.push(current_chunk);
132        }
133    }
134
135    fn iter(&self) -> impl Iterator<Item = (NodeId, EdgeId)> + '_ {
136        let deleted = &self.deleted;
137
138        self.chunks
139            .iter()
140            .flat_map(|c| c.iter())
141            .chain(self.delta_inserts.iter().copied())
142            .filter(move |(_, edge_id)| !deleted.contains(edge_id))
143    }
144
145    fn neighbors(&self) -> impl Iterator<Item = NodeId> + '_ {
146        self.iter().map(|(dst, _)| dst)
147    }
148
149    fn degree(&self) -> usize {
150        self.iter().count()
151    }
152}
153
154/// Chunked adjacency lists with delta buffers.
155///
156/// This is the primary structure for storing edge connectivity.
157/// It supports efficient insertion, deletion (via tombstones),
158/// and sequential scanning.
159pub struct ChunkedAdjacency {
160    /// Adjacency lists indexed by source node.
161    lists: RwLock<FxHashMap<NodeId, AdjacencyList>>,
162    /// Chunk capacity for new chunks.
163    chunk_capacity: usize,
164    /// Total number of edges (including deleted).
165    edge_count: AtomicUsize,
166    /// Number of deleted edges.
167    deleted_count: AtomicUsize,
168}
169
170impl ChunkedAdjacency {
171    /// Creates a new chunked adjacency structure.
172    #[must_use]
173    pub fn new() -> Self {
174        Self::with_chunk_capacity(DEFAULT_CHUNK_CAPACITY)
175    }
176
177    /// Creates a new chunked adjacency with custom chunk capacity.
178    #[must_use]
179    pub fn with_chunk_capacity(capacity: usize) -> Self {
180        Self {
181            lists: RwLock::new(FxHashMap::default()),
182            chunk_capacity: capacity,
183            edge_count: AtomicUsize::new(0),
184            deleted_count: AtomicUsize::new(0),
185        }
186    }
187
188    /// Adds an edge from src to dst.
189    pub fn add_edge(&self, src: NodeId, dst: NodeId, edge_id: EdgeId) {
190        let mut lists = self.lists.write();
191        lists
192            .entry(src)
193            .or_insert_with(AdjacencyList::new)
194            .add_edge(dst, edge_id);
195        self.edge_count.fetch_add(1, Ordering::Relaxed);
196    }
197
198    /// Marks an edge as deleted.
199    pub fn mark_deleted(&self, src: NodeId, edge_id: EdgeId) {
200        let mut lists = self.lists.write();
201        if let Some(list) = lists.get_mut(&src) {
202            list.mark_deleted(edge_id);
203            self.deleted_count.fetch_add(1, Ordering::Relaxed);
204        }
205    }
206
207    /// Returns all neighbors of a node.
208    ///
209    /// Note: This allocates a Vec to collect neighbors while the internal lock
210    /// is held, then returns the Vec. For traversal performance, consider using
211    /// `edges_from` if you also need edge IDs, to avoid multiple lookups.
212    #[must_use]
213    pub fn neighbors(&self, src: NodeId) -> Vec<NodeId> {
214        let lists = self.lists.read();
215        lists
216            .get(&src)
217            .map(|list| list.neighbors().collect())
218            .unwrap_or_default()
219    }
220
221    /// Returns all (neighbor, edge_id) pairs for outgoing edges from a node.
222    ///
223    /// Note: This allocates a Vec to collect edges while the internal lock
224    /// is held, then returns the Vec. This is intentional to avoid holding
225    /// the lock across iteration.
226    #[must_use]
227    pub fn edges_from(&self, src: NodeId) -> Vec<(NodeId, EdgeId)> {
228        let lists = self.lists.read();
229        lists
230            .get(&src)
231            .map(|list| list.iter().collect())
232            .unwrap_or_default()
233    }
234
235    /// Returns the out-degree of a node.
236    pub fn out_degree(&self, src: NodeId) -> usize {
237        let lists = self.lists.read();
238        lists.get(&src).map_or(0, |list| list.degree())
239    }
240
241    /// Compacts all adjacency lists.
242    pub fn compact(&self) {
243        let mut lists = self.lists.write();
244        for list in lists.values_mut() {
245            list.compact(self.chunk_capacity);
246        }
247    }
248
249    /// Compacts delta buffers that exceed the threshold.
250    pub fn compact_if_needed(&self) {
251        let mut lists = self.lists.write();
252        for list in lists.values_mut() {
253            if list.delta_inserts.len() >= DELTA_COMPACTION_THRESHOLD {
254                list.compact(self.chunk_capacity);
255            }
256        }
257    }
258
259    /// Returns the total number of edges (including deleted).
260    pub fn total_edge_count(&self) -> usize {
261        self.edge_count.load(Ordering::Relaxed)
262    }
263
264    /// Returns the number of active (non-deleted) edges.
265    pub fn active_edge_count(&self) -> usize {
266        self.edge_count.load(Ordering::Relaxed) - self.deleted_count.load(Ordering::Relaxed)
267    }
268
269    /// Returns the number of nodes with adjacency lists.
270    pub fn node_count(&self) -> usize {
271        self.lists.read().len()
272    }
273
274    /// Clears all adjacency lists.
275    pub fn clear(&self) {
276        let mut lists = self.lists.write();
277        lists.clear();
278        self.edge_count.store(0, Ordering::Relaxed);
279        self.deleted_count.store(0, Ordering::Relaxed);
280    }
281}
282
283impl Default for ChunkedAdjacency {
284    fn default() -> Self {
285        Self::new()
286    }
287}
288
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_basic_adjacency() {
        let adj = ChunkedAdjacency::new();

        adj.add_edge(NodeId::new(0), NodeId::new(1), EdgeId::new(0));
        adj.add_edge(NodeId::new(0), NodeId::new(2), EdgeId::new(1));
        adj.add_edge(NodeId::new(0), NodeId::new(3), EdgeId::new(2));

        let neighbors = adj.neighbors(NodeId::new(0));
        assert_eq!(neighbors.len(), 3);
        assert!(neighbors.contains(&NodeId::new(1)));
        assert!(neighbors.contains(&NodeId::new(2)));
        assert!(neighbors.contains(&NodeId::new(3)));
    }

    #[test]
    fn test_out_degree() {
        let adj = ChunkedAdjacency::new();

        adj.add_edge(NodeId::new(0), NodeId::new(1), EdgeId::new(0));
        adj.add_edge(NodeId::new(0), NodeId::new(2), EdgeId::new(1));

        assert_eq!(adj.out_degree(NodeId::new(0)), 2);
        assert_eq!(adj.out_degree(NodeId::new(1)), 0);
    }

    #[test]
    fn test_mark_deleted() {
        let adj = ChunkedAdjacency::new();

        adj.add_edge(NodeId::new(0), NodeId::new(1), EdgeId::new(0));
        adj.add_edge(NodeId::new(0), NodeId::new(2), EdgeId::new(1));

        adj.mark_deleted(NodeId::new(0), EdgeId::new(0));

        let neighbors = adj.neighbors(NodeId::new(0));
        assert_eq!(neighbors.len(), 1);
        assert!(neighbors.contains(&NodeId::new(2)));
    }

    #[test]
    fn test_mark_deleted_unknown_source_is_noop() {
        let adj = ChunkedAdjacency::new();
        adj.add_edge(NodeId::new(0), NodeId::new(1), EdgeId::new(0));

        // `src` 42 has no adjacency list, so nothing is tombstoned or counted.
        adj.mark_deleted(NodeId::new(42), EdgeId::new(0));

        assert_eq!(adj.active_edge_count(), 1);
        assert_eq!(adj.neighbors(NodeId::new(0)), vec![NodeId::new(1)]);
        assert_eq!(adj.node_count(), 1);
    }

    #[test]
    fn test_edges_from() {
        let adj = ChunkedAdjacency::new();

        adj.add_edge(NodeId::new(0), NodeId::new(1), EdgeId::new(10));
        adj.add_edge(NodeId::new(0), NodeId::new(2), EdgeId::new(20));

        let edges = adj.edges_from(NodeId::new(0));
        assert_eq!(edges.len(), 2);
        assert!(edges.contains(&(NodeId::new(1), EdgeId::new(10))));
        assert!(edges.contains(&(NodeId::new(2), EdgeId::new(20))));
    }

    #[test]
    fn test_compaction() {
        let adj = ChunkedAdjacency::with_chunk_capacity(4);

        // Add more edges than chunk capacity
        for i in 0..10 {
            adj.add_edge(NodeId::new(0), NodeId::new(i + 1), EdgeId::new(i));
        }

        adj.compact();

        // All edges should still be accessible
        let neighbors = adj.neighbors(NodeId::new(0));
        assert_eq!(neighbors.len(), 10);
    }

    #[test]
    fn test_compaction_preserves_order_and_tombstones() {
        let adj = ChunkedAdjacency::with_chunk_capacity(4);
        let src = NodeId::new(0);

        for i in 0..10 {
            adj.add_edge(src, NodeId::new(i + 1), EdgeId::new(i));
        }
        adj.mark_deleted(src, EdgeId::new(3));
        adj.compact();

        // After compaction the last chunk is half full: the next two inserts
        // fill it, and the third spills into the delta buffer.
        for i in 10..13 {
            adj.add_edge(src, NodeId::new(i + 1), EdgeId::new(i));
        }

        let expected: Vec<_> = (0..13)
            .filter(|&i| i != 3)
            .map(|i| (NodeId::new(i + 1), EdgeId::new(i)))
            .collect();
        assert_eq!(adj.edges_from(src), expected);
        assert_eq!(adj.out_degree(src), 12);
        assert_eq!(adj.total_edge_count(), 13);
        assert_eq!(adj.active_edge_count(), 12);
    }

    #[test]
    fn test_compact_if_needed_threshold() {
        let adj = ChunkedAdjacency::new();
        let busy = NodeId::new(0);
        let quiet = NodeId::new(1);

        for i in (0..).take(DELTA_COMPACTION_THRESHOLD) {
            adj.add_edge(busy, NodeId::new(i + 100), EdgeId::new(i));
        }
        adj.add_edge(quiet, NodeId::new(0), EdgeId::new(10_000));

        adj.compact_if_needed();

        {
            let lists = adj.lists.read();
            // Only the node whose buffer reached the threshold is compacted.
            assert_eq!(lists.get(&busy).map(|l| l.delta_inserts.len()), Some(0));
            assert_eq!(lists.get(&quiet).map(|l| l.delta_inserts.len()), Some(1));
        }

        assert_eq!(adj.out_degree(busy), DELTA_COMPACTION_THRESHOLD);
        assert_eq!(adj.out_degree(quiet), 1);
    }

    #[test]
    fn test_edge_counts() {
        let adj = ChunkedAdjacency::new();

        adj.add_edge(NodeId::new(0), NodeId::new(1), EdgeId::new(0));
        adj.add_edge(NodeId::new(0), NodeId::new(2), EdgeId::new(1));
        adj.add_edge(NodeId::new(1), NodeId::new(2), EdgeId::new(2));

        assert_eq!(adj.total_edge_count(), 3);
        assert_eq!(adj.active_edge_count(), 3);

        adj.mark_deleted(NodeId::new(0), EdgeId::new(0));

        assert_eq!(adj.total_edge_count(), 3);
        assert_eq!(adj.active_edge_count(), 2);
    }

    #[test]
    fn test_clear() {
        let adj = ChunkedAdjacency::new();

        adj.add_edge(NodeId::new(0), NodeId::new(1), EdgeId::new(0));
        adj.add_edge(NodeId::new(0), NodeId::new(2), EdgeId::new(1));

        adj.clear();

        assert_eq!(adj.total_edge_count(), 0);
        assert_eq!(adj.node_count(), 0);
    }
}