1use graphos_common::types::{EdgeId, NodeId};
10use graphos_common::utils::hash::{FxHashMap, FxHashSet};
11use parking_lot::RwLock;
12use smallvec::SmallVec;
13use std::sync::atomic::{AtomicUsize, Ordering};
14
/// Chunk capacity (in edges) used by `ChunkedAdjacency::new`.
const DEFAULT_CHUNK_CAPACITY: usize = 64;

/// Number of buffered delta inserts at which `compact_if_needed` compacts a
/// node's adjacency list (the comparison is `>=`).
const DELTA_COMPACTION_THRESHOLD: usize = 64;
23
24#[derive(Debug, Clone)]
26struct AdjacencyChunk {
27 destinations: Vec<NodeId>,
29 edge_ids: Vec<EdgeId>,
31 capacity: usize,
33}
34
35impl AdjacencyChunk {
36 fn new(capacity: usize) -> Self {
37 Self {
38 destinations: Vec::with_capacity(capacity),
39 edge_ids: Vec::with_capacity(capacity),
40 capacity,
41 }
42 }
43
44 fn len(&self) -> usize {
45 self.destinations.len()
46 }
47
48 fn is_full(&self) -> bool {
49 self.destinations.len() >= self.capacity
50 }
51
52 fn push(&mut self, dst: NodeId, edge_id: EdgeId) -> bool {
53 if self.is_full() {
54 return false;
55 }
56 self.destinations.push(dst);
57 self.edge_ids.push(edge_id);
58 true
59 }
60
61 fn iter(&self) -> impl Iterator<Item = (NodeId, EdgeId)> + '_ {
62 self.destinations
63 .iter()
64 .copied()
65 .zip(self.edge_ids.iter().copied())
66 }
67}
68
69#[derive(Debug)]
71struct AdjacencyList {
72 chunks: Vec<AdjacencyChunk>,
74 delta_inserts: SmallVec<[(NodeId, EdgeId); 8]>,
76 deleted: FxHashSet<EdgeId>,
78}
79
80impl AdjacencyList {
81 fn new() -> Self {
82 Self {
83 chunks: Vec::new(),
84 delta_inserts: SmallVec::new(),
85 deleted: FxHashSet::default(),
86 }
87 }
88
89 fn add_edge(&mut self, dst: NodeId, edge_id: EdgeId) {
90 if let Some(last) = self.chunks.last_mut() {
92 if last.push(dst, edge_id) {
93 return;
94 }
95 }
96
97 self.delta_inserts.push((dst, edge_id));
99 }
100
101 fn mark_deleted(&mut self, edge_id: EdgeId) {
102 self.deleted.insert(edge_id);
103 }
104
105 fn compact(&mut self, chunk_capacity: usize) {
106 if self.delta_inserts.is_empty() {
107 return;
108 }
109
110 let last_has_room = self.chunks.last().is_some_and(|c| !c.is_full());
113 let mut current_chunk = if last_has_room {
114 self.chunks
116 .pop()
117 .expect("chunks is non-empty: is_some_and() succeeded on previous line")
118 } else {
119 AdjacencyChunk::new(chunk_capacity)
120 };
121
122 for (dst, edge_id) in self.delta_inserts.drain(..) {
123 if !current_chunk.push(dst, edge_id) {
124 self.chunks.push(current_chunk);
125 current_chunk = AdjacencyChunk::new(chunk_capacity);
126 current_chunk.push(dst, edge_id);
127 }
128 }
129
130 if current_chunk.len() > 0 {
131 self.chunks.push(current_chunk);
132 }
133 }
134
135 fn iter(&self) -> impl Iterator<Item = (NodeId, EdgeId)> + '_ {
136 let deleted = &self.deleted;
137
138 self.chunks
139 .iter()
140 .flat_map(|c| c.iter())
141 .chain(self.delta_inserts.iter().copied())
142 .filter(move |(_, edge_id)| !deleted.contains(edge_id))
143 }
144
145 fn neighbors(&self) -> impl Iterator<Item = NodeId> + '_ {
146 self.iter().map(|(dst, _)| dst)
147 }
148
149 fn degree(&self) -> usize {
150 self.iter().count()
151 }
152}
153
154pub struct ChunkedAdjacency {
160 lists: RwLock<FxHashMap<NodeId, AdjacencyList>>,
162 chunk_capacity: usize,
164 edge_count: AtomicUsize,
166 deleted_count: AtomicUsize,
168}
169
170impl ChunkedAdjacency {
171 #[must_use]
173 pub fn new() -> Self {
174 Self::with_chunk_capacity(DEFAULT_CHUNK_CAPACITY)
175 }
176
177 #[must_use]
179 pub fn with_chunk_capacity(capacity: usize) -> Self {
180 Self {
181 lists: RwLock::new(FxHashMap::default()),
182 chunk_capacity: capacity,
183 edge_count: AtomicUsize::new(0),
184 deleted_count: AtomicUsize::new(0),
185 }
186 }
187
188 pub fn add_edge(&self, src: NodeId, dst: NodeId, edge_id: EdgeId) {
190 let mut lists = self.lists.write();
191 lists
192 .entry(src)
193 .or_insert_with(AdjacencyList::new)
194 .add_edge(dst, edge_id);
195 self.edge_count.fetch_add(1, Ordering::Relaxed);
196 }
197
198 pub fn mark_deleted(&self, src: NodeId, edge_id: EdgeId) {
200 let mut lists = self.lists.write();
201 if let Some(list) = lists.get_mut(&src) {
202 list.mark_deleted(edge_id);
203 self.deleted_count.fetch_add(1, Ordering::Relaxed);
204 }
205 }
206
207 #[must_use]
213 pub fn neighbors(&self, src: NodeId) -> Vec<NodeId> {
214 let lists = self.lists.read();
215 lists
216 .get(&src)
217 .map(|list| list.neighbors().collect())
218 .unwrap_or_default()
219 }
220
221 #[must_use]
227 pub fn edges_from(&self, src: NodeId) -> Vec<(NodeId, EdgeId)> {
228 let lists = self.lists.read();
229 lists
230 .get(&src)
231 .map(|list| list.iter().collect())
232 .unwrap_or_default()
233 }
234
235 pub fn out_degree(&self, src: NodeId) -> usize {
237 let lists = self.lists.read();
238 lists.get(&src).map_or(0, |list| list.degree())
239 }
240
241 pub fn compact(&self) {
243 let mut lists = self.lists.write();
244 for list in lists.values_mut() {
245 list.compact(self.chunk_capacity);
246 }
247 }
248
249 pub fn compact_if_needed(&self) {
251 let mut lists = self.lists.write();
252 for list in lists.values_mut() {
253 if list.delta_inserts.len() >= DELTA_COMPACTION_THRESHOLD {
254 list.compact(self.chunk_capacity);
255 }
256 }
257 }
258
259 pub fn total_edge_count(&self) -> usize {
261 self.edge_count.load(Ordering::Relaxed)
262 }
263
264 pub fn active_edge_count(&self) -> usize {
266 self.edge_count.load(Ordering::Relaxed) - self.deleted_count.load(Ordering::Relaxed)
267 }
268
269 pub fn node_count(&self) -> usize {
271 self.lists.read().len()
272 }
273
274 pub fn clear(&self) {
276 let mut lists = self.lists.write();
277 lists.clear();
278 self.edge_count.store(0, Ordering::Relaxed);
279 self.deleted_count.store(0, Ordering::Relaxed);
280 }
281}
282
283impl Default for ChunkedAdjacency {
284 fn default() -> Self {
285 Self::new()
286 }
287}
288
#[cfg(test)]
mod tests {
    use super::*;

    /// Every edge added from a node shows up among that node's neighbors.
    #[test]
    fn test_basic_adjacency() {
        let adj = ChunkedAdjacency::new();
        let src = NodeId::new(0);

        for i in 0..3 {
            adj.add_edge(src, NodeId::new(i + 1), EdgeId::new(i));
        }

        let neighbors = adj.neighbors(src);
        assert_eq!(neighbors.len(), 3);
        for expected in 1..=3 {
            assert!(neighbors.contains(&NodeId::new(expected)));
        }
    }

    /// Out-degree counts the edges of a source and is zero for nodes without any.
    #[test]
    fn test_out_degree() {
        let adj = ChunkedAdjacency::new();
        let (src, first, second) = (NodeId::new(0), NodeId::new(1), NodeId::new(2));

        adj.add_edge(src, first, EdgeId::new(0));
        adj.add_edge(src, second, EdgeId::new(1));

        assert_eq!(adj.out_degree(src), 2);
        assert_eq!(adj.out_degree(first), 0);
    }

    /// An edge marked deleted no longer contributes a neighbor.
    #[test]
    fn test_mark_deleted() {
        let adj = ChunkedAdjacency::new();
        let src = NodeId::new(0);
        let removed = EdgeId::new(0);

        adj.add_edge(src, NodeId::new(1), removed);
        adj.add_edge(src, NodeId::new(2), EdgeId::new(1));

        adj.mark_deleted(src, removed);

        let neighbors = adj.neighbors(src);
        assert_eq!(neighbors.len(), 1);
        assert!(neighbors.contains(&NodeId::new(2)));
    }

    /// `edges_from` reports each destination together with its edge id.
    #[test]
    fn test_edges_from() {
        let adj = ChunkedAdjacency::new();
        let src = NodeId::new(0);
        let first = (NodeId::new(1), EdgeId::new(10));
        let second = (NodeId::new(2), EdgeId::new(20));

        adj.add_edge(src, first.0, first.1);
        adj.add_edge(src, second.0, second.1);

        let edges = adj.edges_from(src);
        assert_eq!(edges.len(), 2);
        assert!(edges.contains(&first));
        assert!(edges.contains(&second));
    }

    /// Compaction keeps every edge even when the edges span several chunks.
    #[test]
    fn test_compaction() {
        let adj = ChunkedAdjacency::with_chunk_capacity(4);
        let src = NodeId::new(0);

        for i in 0..10 {
            adj.add_edge(src, NodeId::new(i + 1), EdgeId::new(i));
        }

        adj.compact();

        assert_eq!(adj.neighbors(src).len(), 10);
    }

    /// Deleting an edge lowers the active count but not the total count.
    #[test]
    fn test_edge_counts() {
        let adj = ChunkedAdjacency::new();
        let (n0, n1, n2) = (NodeId::new(0), NodeId::new(1), NodeId::new(2));

        adj.add_edge(n0, n1, EdgeId::new(0));
        adj.add_edge(n0, n2, EdgeId::new(1));
        adj.add_edge(n1, n2, EdgeId::new(2));

        assert_eq!(adj.total_edge_count(), 3);
        assert_eq!(adj.active_edge_count(), 3);

        adj.mark_deleted(n0, EdgeId::new(0));

        assert_eq!(adj.total_edge_count(), 3);
        assert_eq!(adj.active_edge_count(), 2);
    }

    /// Clearing drops all nodes and resets the edge total.
    #[test]
    fn test_clear() {
        let adj = ChunkedAdjacency::new();
        let src = NodeId::new(0);

        adj.add_edge(src, NodeId::new(1), EdgeId::new(0));
        adj.add_edge(src, NodeId::new(2), EdgeId::new(1));

        adj.clear();

        assert_eq!(adj.total_edge_count(), 0);
        assert_eq!(adj.node_count(), 0);
    }
}