1use crate::storage::{BitPackedInts, DeltaBitPacked};
13use grafeo_common::types::{EdgeId, NodeId};
14use grafeo_common::utils::hash::{FxHashMap, FxHashSet};
15use parking_lot::RwLock;
16use smallvec::SmallVec;
17use std::sync::atomic::{AtomicUsize, Ordering};
18
/// Default number of (destination, edge-id) entries a hot chunk holds;
/// used by `ChunkedAdjacency::new`.
const DEFAULT_CHUNK_CAPACITY: usize = 64;

/// Once a list has buffered this many delta inserts,
/// `compact_if_needed` folds them into hot chunks.
const DELTA_COMPACTION_THRESHOLD: usize = 64;

/// Hot chunks beyond this count are compressed into cold storage by
/// `maybe_compress_to_cold`.
const COLD_COMPRESSION_THRESHOLD: usize = 4;
/// A fixed-capacity, append-only run of parallel (destination, edge-id)
/// arrays — the uncompressed ("hot") storage unit of an adjacency list.
#[derive(Debug, Clone)]
struct AdjacencyChunk {
    // Destination node of each edge, in insertion order.
    destinations: Vec<NodeId>,
    // Edge ids parallel to `destinations` (same index = same edge).
    edge_ids: Vec<EdgeId>,
    // Maximum entry count; `push` refuses once this is reached.
    capacity: usize,
}
45
46impl AdjacencyChunk {
47 fn new(capacity: usize) -> Self {
48 Self {
49 destinations: Vec::with_capacity(capacity),
50 edge_ids: Vec::with_capacity(capacity),
51 capacity,
52 }
53 }
54
55 fn len(&self) -> usize {
56 self.destinations.len()
57 }
58
59 fn is_full(&self) -> bool {
60 self.destinations.len() >= self.capacity
61 }
62
63 fn push(&mut self, dst: NodeId, edge_id: EdgeId) -> bool {
64 if self.is_full() {
65 return false;
66 }
67 self.destinations.push(dst);
68 self.edge_ids.push(edge_id);
69 true
70 }
71
72 fn iter(&self) -> impl Iterator<Item = (NodeId, EdgeId)> + '_ {
73 self.destinations
74 .iter()
75 .copied()
76 .zip(self.edge_ids.iter().copied())
77 }
78
79 #[allow(dead_code)]
84 fn compress(&self) -> CompressedAdjacencyChunk {
85 let mut entries: Vec<_> = self
87 .destinations
88 .iter()
89 .copied()
90 .zip(self.edge_ids.iter().copied())
91 .collect();
92 entries.sort_by_key(|(dst, _)| dst.as_u64());
93
94 let sorted_dsts: Vec<u64> = entries.iter().map(|(d, _)| d.as_u64()).collect();
96 let sorted_edges: Vec<u64> = entries.iter().map(|(_, e)| e.as_u64()).collect();
97
98 CompressedAdjacencyChunk {
99 destinations: DeltaBitPacked::encode(&sorted_dsts),
100 edge_ids: BitPackedInts::pack(&sorted_edges),
101 count: entries.len(),
102 }
103 }
104}
105
/// Immutable, compressed form of an `AdjacencyChunk`: destinations are
/// stored sorted (see `AdjacencyChunk::compress`) and delta+bit-packed,
/// edge ids are bit-packed in the matching order.
#[derive(Debug, Clone)]
struct CompressedAdjacencyChunk {
    // Delta-encoded, bit-packed destination ids (ascending).
    destinations: DeltaBitPacked,
    // Bit-packed edge ids, parallel to `destinations`.
    edge_ids: BitPackedInts,
    // Number of (destination, edge-id) entries stored.
    count: usize,
}
120
121impl CompressedAdjacencyChunk {
122 fn len(&self) -> usize {
124 self.count
125 }
126
127 #[allow(dead_code)]
129 fn is_empty(&self) -> bool {
130 self.count == 0
131 }
132
133 fn iter(&self) -> impl Iterator<Item = (NodeId, EdgeId)> + '_ {
135 let dsts = self.destinations.decode();
136 let edges = self.edge_ids.unpack();
137
138 dsts.into_iter()
139 .zip(edges)
140 .map(|(d, e)| (NodeId::new(d), EdgeId::new(e)))
141 }
142
143 #[allow(dead_code)]
145 fn memory_size(&self) -> usize {
146 let dest_size = 8 + self.destinations.to_bytes().len();
149 let edge_size = self.edge_ids.data().len() * 8;
150 dest_size + edge_size
151 }
152
153 #[allow(dead_code)]
155 fn compression_ratio(&self) -> f64 {
156 if self.count == 0 {
157 return 1.0;
158 }
159 let uncompressed = self.count * 16; let compressed = self.memory_size();
161 if compressed == 0 {
162 return f64::INFINITY;
163 }
164 uncompressed as f64 / compressed as f64
165 }
166}
167
/// Per-source adjacency storage, tiered by write recency: fresh inserts
/// land in `delta_inserts`, `compact` folds them into `hot_chunks`, and
/// surplus hot chunks are compressed into `cold_chunks`.
#[derive(Debug)]
struct AdjacencyList {
    // Uncompressed chunks, oldest first.
    hot_chunks: Vec<AdjacencyChunk>,
    // Compressed chunks, oldest first.
    cold_chunks: Vec<CompressedAdjacencyChunk>,
    // Writes not yet folded into a chunk (stored inline up to 8 entries).
    delta_inserts: SmallVec<[(NodeId, EdgeId); 8]>,
    // Tombstoned edge ids, filtered out of every iteration.
    deleted: FxHashSet<EdgeId>,
}
185
186impl AdjacencyList {
187 fn new() -> Self {
188 Self {
189 hot_chunks: Vec::new(),
190 cold_chunks: Vec::new(),
191 delta_inserts: SmallVec::new(),
192 deleted: FxHashSet::default(),
193 }
194 }
195
196 fn add_edge(&mut self, dst: NodeId, edge_id: EdgeId) {
197 if let Some(last) = self.hot_chunks.last_mut() {
199 if last.push(dst, edge_id) {
200 return;
201 }
202 }
203
204 self.delta_inserts.push((dst, edge_id));
206 }
207
208 fn mark_deleted(&mut self, edge_id: EdgeId) {
209 self.deleted.insert(edge_id);
210 }
211
212 fn compact(&mut self, chunk_capacity: usize) {
213 if self.delta_inserts.is_empty() {
214 return;
215 }
216
217 let last_has_room = self.hot_chunks.last().is_some_and(|c| !c.is_full());
220 let mut current_chunk = if last_has_room {
221 self.hot_chunks
223 .pop()
224 .expect("hot_chunks is non-empty: is_some_and() succeeded on previous line")
225 } else {
226 AdjacencyChunk::new(chunk_capacity)
227 };
228
229 for (dst, edge_id) in self.delta_inserts.drain(..) {
230 if !current_chunk.push(dst, edge_id) {
231 self.hot_chunks.push(current_chunk);
232 current_chunk = AdjacencyChunk::new(chunk_capacity);
233 current_chunk.push(dst, edge_id);
234 }
235 }
236
237 if current_chunk.len() > 0 {
238 self.hot_chunks.push(current_chunk);
239 }
240
241 self.maybe_compress_to_cold();
243 }
244
245 fn maybe_compress_to_cold(&mut self) {
247 while self.hot_chunks.len() > COLD_COMPRESSION_THRESHOLD {
249 let oldest = self.hot_chunks.remove(0);
251
252 if oldest.len() == 0 {
254 continue;
255 }
256
257 let compressed = oldest.compress();
259 self.cold_chunks.push(compressed);
260 }
261 }
262
263 #[allow(dead_code)]
267 fn freeze_all(&mut self) {
268 for chunk in self.hot_chunks.drain(..) {
269 if chunk.len() > 0 {
270 self.cold_chunks.push(chunk.compress());
271 }
272 }
273 }
274
275 fn iter(&self) -> impl Iterator<Item = (NodeId, EdgeId)> + '_ {
276 let deleted = &self.deleted;
277
278 let cold_iter = self.cold_chunks.iter().flat_map(|c| c.iter());
280
281 let hot_iter = self.hot_chunks.iter().flat_map(|c| c.iter());
283
284 let delta_iter = self.delta_inserts.iter().copied();
286
287 cold_iter
288 .chain(hot_iter)
289 .chain(delta_iter)
290 .filter(move |(_, edge_id)| !deleted.contains(edge_id))
291 }
292
293 fn neighbors(&self) -> impl Iterator<Item = NodeId> + '_ {
294 self.iter().map(|(dst, _)| dst)
295 }
296
297 fn degree(&self) -> usize {
298 self.iter().count()
299 }
300
301 #[allow(dead_code)]
303 fn hot_count(&self) -> usize {
304 self.hot_chunks.iter().map(|c| c.len()).sum::<usize>() + self.delta_inserts.len()
305 }
306
307 #[allow(dead_code)]
309 fn cold_count(&self) -> usize {
310 self.cold_chunks.iter().map(|c| c.len()).sum()
311 }
312
313 #[allow(dead_code)]
315 fn memory_size(&self) -> usize {
316 let hot_size = self.hot_chunks.iter().map(|c| c.len() * 16).sum::<usize>();
318
319 let cold_size = self
321 .cold_chunks
322 .iter()
323 .map(|c| c.memory_size())
324 .sum::<usize>();
325
326 let delta_size = self.delta_inserts.len() * 16;
328
329 let deleted_size = self.deleted.len() * 16;
331
332 hot_size + cold_size + delta_size + deleted_size
333 }
334
335 #[allow(dead_code)]
337 fn cold_compression_ratio(&self) -> f64 {
338 let total_cold_entries: usize = self.cold_chunks.iter().map(|c| c.len()).sum();
339 if total_cold_entries == 0 {
340 return 1.0;
341 }
342
343 let uncompressed_size = total_cold_entries * 16;
344 let compressed_size: usize = self.cold_chunks.iter().map(|c| c.memory_size()).sum();
345
346 if compressed_size == 0 {
347 return f64::INFINITY;
348 }
349
350 uncompressed_size as f64 / compressed_size as f64
351 }
352}
353
/// Concurrent, chunked out-adjacency index. Per-source lists are tiered
/// (delta -> hot -> cold) behind a single `RwLock`; edge and delete
/// counters are tracked atomically so counts need no iteration.
pub struct ChunkedAdjacency {
    // Source node -> its adjacency list.
    lists: RwLock<FxHashMap<NodeId, AdjacencyList>>,
    // Capacity used for newly created hot chunks.
    chunk_capacity: usize,
    // Total edges ever added (tombstoned edges included).
    edge_count: AtomicUsize,
    // Total edges tombstoned via `mark_deleted`.
    deleted_count: AtomicUsize,
}
387
388impl ChunkedAdjacency {
389 #[must_use]
391 pub fn new() -> Self {
392 Self::with_chunk_capacity(DEFAULT_CHUNK_CAPACITY)
393 }
394
395 #[must_use]
397 pub fn with_chunk_capacity(capacity: usize) -> Self {
398 Self {
399 lists: RwLock::new(FxHashMap::default()),
400 chunk_capacity: capacity,
401 edge_count: AtomicUsize::new(0),
402 deleted_count: AtomicUsize::new(0),
403 }
404 }
405
406 pub fn add_edge(&self, src: NodeId, dst: NodeId, edge_id: EdgeId) {
408 let mut lists = self.lists.write();
409 lists
410 .entry(src)
411 .or_insert_with(AdjacencyList::new)
412 .add_edge(dst, edge_id);
413 self.edge_count.fetch_add(1, Ordering::Relaxed);
414 }
415
416 pub fn mark_deleted(&self, src: NodeId, edge_id: EdgeId) {
418 let mut lists = self.lists.write();
419 if let Some(list) = lists.get_mut(&src) {
420 list.mark_deleted(edge_id);
421 self.deleted_count.fetch_add(1, Ordering::Relaxed);
422 }
423 }
424
425 #[must_use]
431 pub fn neighbors(&self, src: NodeId) -> Vec<NodeId> {
432 let lists = self.lists.read();
433 lists
434 .get(&src)
435 .map(|list| list.neighbors().collect())
436 .unwrap_or_default()
437 }
438
439 #[must_use]
445 pub fn edges_from(&self, src: NodeId) -> Vec<(NodeId, EdgeId)> {
446 let lists = self.lists.read();
447 lists
448 .get(&src)
449 .map(|list| list.iter().collect())
450 .unwrap_or_default()
451 }
452
453 pub fn out_degree(&self, src: NodeId) -> usize {
455 let lists = self.lists.read();
456 lists.get(&src).map_or(0, |list| list.degree())
457 }
458
459 pub fn compact(&self) {
461 let mut lists = self.lists.write();
462 for list in lists.values_mut() {
463 list.compact(self.chunk_capacity);
464 }
465 }
466
467 pub fn compact_if_needed(&self) {
469 let mut lists = self.lists.write();
470 for list in lists.values_mut() {
471 if list.delta_inserts.len() >= DELTA_COMPACTION_THRESHOLD {
472 list.compact(self.chunk_capacity);
473 }
474 }
475 }
476
477 pub fn total_edge_count(&self) -> usize {
479 self.edge_count.load(Ordering::Relaxed)
480 }
481
482 pub fn active_edge_count(&self) -> usize {
484 self.edge_count.load(Ordering::Relaxed) - self.deleted_count.load(Ordering::Relaxed)
485 }
486
487 pub fn node_count(&self) -> usize {
489 self.lists.read().len()
490 }
491
492 pub fn clear(&self) {
494 let mut lists = self.lists.write();
495 lists.clear();
496 self.edge_count.store(0, Ordering::Relaxed);
497 self.deleted_count.store(0, Ordering::Relaxed);
498 }
499
500 #[must_use]
502 pub fn memory_stats(&self) -> AdjacencyMemoryStats {
503 let lists = self.lists.read();
504
505 let mut hot_entries = 0usize;
506 let mut cold_entries = 0usize;
507 let mut hot_bytes = 0usize;
508 let mut cold_bytes = 0usize;
509
510 for list in lists.values() {
511 hot_entries += list.hot_count();
512 cold_entries += list.cold_count();
513
514 hot_bytes += list.hot_count() * 16;
516
517 for cold_chunk in &list.cold_chunks {
519 cold_bytes += cold_chunk.memory_size();
520 }
521 }
522
523 AdjacencyMemoryStats {
524 hot_entries,
525 cold_entries,
526 hot_bytes,
527 cold_bytes,
528 node_count: lists.len(),
529 }
530 }
531
532 pub fn freeze_all(&self) {
536 let mut lists = self.lists.write();
537 for list in lists.values_mut() {
538 list.freeze_all();
539 }
540 }
541}
542
/// Aggregate memory statistics for a `ChunkedAdjacency`, split into the
/// hot (uncompressed) and cold (compressed) tiers.
#[derive(Debug, Clone)]
pub struct AdjacencyMemoryStats {
    /// Entries held in hot chunks and delta buffers.
    pub hot_entries: usize,
    /// Entries held in compressed cold chunks.
    pub cold_entries: usize,
    /// Estimated bytes used by hot storage.
    pub hot_bytes: usize,
    /// Estimated bytes used by cold storage.
    pub cold_bytes: usize,
    /// Number of source nodes with an adjacency list.
    pub node_count: usize,
}

impl AdjacencyMemoryStats {
    /// Hot plus cold entry count.
    #[must_use]
    pub fn total_entries(&self) -> usize {
        self.hot_entries + self.cold_entries
    }

    /// Hot plus cold byte count.
    #[must_use]
    pub fn total_bytes(&self) -> usize {
        self.hot_bytes + self.cold_bytes
    }

    /// Ratio of the cold tier's uncompressed size (16 bytes per entry)
    /// to its actual byte size; 1.0 when the cold tier is empty or
    /// reports zero bytes.
    #[must_use]
    pub fn cold_compression_ratio(&self) -> f64 {
        match (self.cold_entries, self.cold_bytes) {
            (0, _) | (_, 0) => 1.0,
            (entries, bytes) => (entries * 16) as f64 / bytes as f64,
        }
    }

    /// Ratio of the total uncompressed size (16 bytes per entry) to the
    /// actual total byte size; 1.0 when nothing is stored.
    #[must_use]
    pub fn overall_compression_ratio(&self) -> f64 {
        let entries = self.total_entries();
        let bytes = self.total_bytes();
        if entries == 0 || bytes == 0 {
            return 1.0;
        }
        (entries * 16) as f64 / bytes as f64
    }
}
594
impl Default for ChunkedAdjacency {
    /// Equivalent to [`ChunkedAdjacency::new`]: default chunk capacity.
    fn default() -> Self {
        Self::new()
    }
}
600
#[cfg(test)]
mod tests {
    use super::*;

    // All edges added for one source are visible via `neighbors`.
    #[test]
    fn test_basic_adjacency() {
        let adj = ChunkedAdjacency::new();

        adj.add_edge(NodeId::new(0), NodeId::new(1), EdgeId::new(0));
        adj.add_edge(NodeId::new(0), NodeId::new(2), EdgeId::new(1));
        adj.add_edge(NodeId::new(0), NodeId::new(3), EdgeId::new(2));

        let neighbors = adj.neighbors(NodeId::new(0));
        assert_eq!(neighbors.len(), 3);
        assert!(neighbors.contains(&NodeId::new(1)));
        assert!(neighbors.contains(&NodeId::new(2)));
        assert!(neighbors.contains(&NodeId::new(3)));
    }

    // Degree counts only the queried source; unknown sources report 0.
    #[test]
    fn test_out_degree() {
        let adj = ChunkedAdjacency::new();

        adj.add_edge(NodeId::new(0), NodeId::new(1), EdgeId::new(0));
        adj.add_edge(NodeId::new(0), NodeId::new(2), EdgeId::new(1));

        assert_eq!(adj.out_degree(NodeId::new(0)), 2);
        assert_eq!(adj.out_degree(NodeId::new(1)), 0);
    }

    // Tombstoned edges disappear from `neighbors`.
    #[test]
    fn test_mark_deleted() {
        let adj = ChunkedAdjacency::new();

        adj.add_edge(NodeId::new(0), NodeId::new(1), EdgeId::new(0));
        adj.add_edge(NodeId::new(0), NodeId::new(2), EdgeId::new(1));

        adj.mark_deleted(NodeId::new(0), EdgeId::new(0));

        let neighbors = adj.neighbors(NodeId::new(0));
        assert_eq!(neighbors.len(), 1);
        assert!(neighbors.contains(&NodeId::new(2)));
    }

    // `edges_from` returns (destination, edge id) pairs.
    #[test]
    fn test_edges_from() {
        let adj = ChunkedAdjacency::new();

        adj.add_edge(NodeId::new(0), NodeId::new(1), EdgeId::new(10));
        adj.add_edge(NodeId::new(0), NodeId::new(2), EdgeId::new(20));

        let edges = adj.edges_from(NodeId::new(0));
        assert_eq!(edges.len(), 2);
        assert!(edges.contains(&(NodeId::new(1), EdgeId::new(10))));
        assert!(edges.contains(&(NodeId::new(2), EdgeId::new(20))));
    }

    // Compaction (capacity 4, 10 edges -> multiple chunks) loses nothing.
    #[test]
    fn test_compaction() {
        let adj = ChunkedAdjacency::with_chunk_capacity(4);

        for i in 0..10 {
            adj.add_edge(NodeId::new(0), NodeId::new(i + 1), EdgeId::new(i));
        }

        adj.compact();

        let neighbors = adj.neighbors(NodeId::new(0));
        assert_eq!(neighbors.len(), 10);
    }

    // total count includes deletions; active count excludes them.
    #[test]
    fn test_edge_counts() {
        let adj = ChunkedAdjacency::new();

        adj.add_edge(NodeId::new(0), NodeId::new(1), EdgeId::new(0));
        adj.add_edge(NodeId::new(0), NodeId::new(2), EdgeId::new(1));
        adj.add_edge(NodeId::new(1), NodeId::new(2), EdgeId::new(2));

        assert_eq!(adj.total_edge_count(), 3);
        assert_eq!(adj.active_edge_count(), 3);

        adj.mark_deleted(NodeId::new(0), EdgeId::new(0));

        assert_eq!(adj.total_edge_count(), 3);
        assert_eq!(adj.active_edge_count(), 2);
    }

    // `clear` resets both the lists and the counters.
    #[test]
    fn test_clear() {
        let adj = ChunkedAdjacency::new();

        adj.add_edge(NodeId::new(0), NodeId::new(1), EdgeId::new(0));
        adj.add_edge(NodeId::new(0), NodeId::new(2), EdgeId::new(1));

        adj.clear();

        assert_eq!(adj.total_edge_count(), 0);
        assert_eq!(adj.node_count(), 0);
    }

    // Compression preserves the entry set, sorts by destination, and
    // actually shrinks regular (stride-5) destination ids.
    #[test]
    fn test_chunk_compression() {
        let mut chunk = AdjacencyChunk::new(64);

        for i in 0..20 {
            chunk.push(NodeId::new(100 + i * 5), EdgeId::new(1000 + i));
        }

        let compressed = chunk.compress();

        assert_eq!(compressed.len(), 20);

        let entries: Vec<_> = compressed.iter().collect();
        assert_eq!(entries.len(), 20);

        // Destinations come back in ascending order.
        for window in entries.windows(2) {
            assert!(window[0].0.as_u64() <= window[1].0.as_u64());
        }

        // Same destination set as was inserted.
        let original_dsts: std::collections::HashSet<_> =
            (0..20).map(|i| NodeId::new(100 + i * 5)).collect();
        let compressed_dsts: std::collections::HashSet<_> =
            entries.iter().map(|(d, _)| *d).collect();
        assert_eq!(original_dsts, compressed_dsts);

        let ratio = compressed.compression_ratio();
        assert!(
            ratio > 1.0,
            "Expected compression ratio > 1.0, got {}",
            ratio
        );
    }

    // Compressing an empty chunk yields an empty compressed chunk.
    #[test]
    fn test_empty_chunk_compression() {
        let chunk = AdjacencyChunk::new(64);
        let compressed = chunk.compress();

        assert_eq!(compressed.len(), 0);
        assert!(compressed.is_empty());
        assert_eq!(compressed.iter().count(), 0);
    }

    // Enough edges (100 at capacity 8) force some chunks cold, with no
    // loss of entries.
    #[test]
    fn test_hot_to_cold_migration() {
        let adj = ChunkedAdjacency::with_chunk_capacity(8);

        for i in 0..100 {
            adj.add_edge(NodeId::new(0), NodeId::new(i + 1), EdgeId::new(i));
        }

        adj.compact();

        let neighbors = adj.neighbors(NodeId::new(0));
        assert_eq!(neighbors.len(), 100);

        let stats = adj.memory_stats();
        assert_eq!(stats.total_entries(), 100);

        assert!(
            stats.cold_entries > 0,
            "Expected some cold entries, got {}",
            stats.cold_entries
        );
    }

    // Stats account for every entry and report a nonzero footprint.
    #[test]
    fn test_memory_stats() {
        let adj = ChunkedAdjacency::with_chunk_capacity(8);

        for i in 0..20 {
            adj.add_edge(NodeId::new(0), NodeId::new(i + 1), EdgeId::new(i));
        }

        adj.compact();

        let stats = adj.memory_stats();
        assert_eq!(stats.total_entries(), 20);
        assert_eq!(stats.node_count, 1);
        assert!(stats.total_bytes() > 0);
    }

    // `freeze_all` moves every hot entry cold and keeps reads working.
    #[test]
    fn test_freeze_all() {
        let adj = ChunkedAdjacency::with_chunk_capacity(8);

        for i in 0..30 {
            adj.add_edge(NodeId::new(0), NodeId::new(i + 1), EdgeId::new(i));
        }

        adj.compact();

        let before = adj.memory_stats();

        adj.freeze_all();

        let after = adj.memory_stats();

        assert_eq!(after.hot_entries, 0);
        assert_eq!(after.cold_entries, before.total_entries());

        let neighbors = adj.neighbors(NodeId::new(0));
        assert_eq!(neighbors.len(), 30);
    }

    // Consecutive destination ids should compress well (> 1.5x).
    #[test]
    fn test_cold_compression_ratio() {
        let adj = ChunkedAdjacency::with_chunk_capacity(8);

        for i in 0..200 {
            adj.add_edge(NodeId::new(0), NodeId::new(100 + i), EdgeId::new(i));
        }

        adj.compact();
        adj.freeze_all();

        let stats = adj.memory_stats();

        let ratio = stats.cold_compression_ratio();
        assert!(
            ratio > 1.5,
            "Expected cold compression ratio > 1.5, got {}",
            ratio
        );
    }

    // Tombstone filtering works across hot and cold tiers alike.
    #[test]
    fn test_deleted_edges_with_cold_storage() {
        let adj = ChunkedAdjacency::with_chunk_capacity(8);

        for i in 0..50 {
            adj.add_edge(NodeId::new(0), NodeId::new(i + 1), EdgeId::new(i));
        }

        adj.compact();

        // Delete the even edge ids.
        for i in (0..50).step_by(2) {
            adj.mark_deleted(NodeId::new(0), EdgeId::new(i));
        }

        let neighbors = adj.neighbors(NodeId::new(0));
        assert_eq!(neighbors.len(), 25);

        // Edge i targets node i + 1, so the surviving (odd-id) edges
        // all point at even-numbered nodes.
        for neighbor in neighbors {
            assert!(neighbor.as_u64() % 2 == 0);
        }
    }

    // Footprint is positive and bounded near 16 bytes/entry plus slack.
    #[test]
    fn test_adjacency_list_memory_size() {
        let mut list = AdjacencyList::new();

        for i in 0..50 {
            list.add_edge(NodeId::new(i + 1), EdgeId::new(i));
        }

        list.compact(8);

        let size = list.memory_size();
        assert!(size > 0);

        assert!(size <= 50 * 16 + 200);
    }

    // After compaction every edge id is still present exactly once.
    #[test]
    fn test_cold_iteration_order() {
        let adj = ChunkedAdjacency::with_chunk_capacity(8);

        for i in 0..50 {
            adj.add_edge(NodeId::new(0), NodeId::new(i + 1), EdgeId::new(i));
        }

        adj.compact();

        let edges = adj.edges_from(NodeId::new(0));

        assert_eq!(edges.len(), 50);

        let edge_ids: std::collections::HashSet<_> = edges.iter().map(|(_, e)| *e).collect();
        for i in 0..50 {
            assert!(edge_ids.contains(&EdgeId::new(i)));
        }
    }
}