// grafeo_core/index/adjacency.rs

//! Chunked adjacency lists - the core data structure for graph traversal.
//!
//! Every graph database needs fast neighbor lookups. This implementation uses
//! chunked storage with delta buffers, giving you:
//!
//! - **O(1) amortized inserts** - new edges go into a delta buffer
//! - **Cache-friendly scans** - chunks are sized for L1/L2 cache
//! - **Soft deletes** - deletions don't require recompaction
//! - **Concurrent reads** - RwLock allows many simultaneous traversals
//! - **Compression** - cold chunks can be compressed using DeltaBitPacked
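//!
//! A minimal sketch of the hot-to-cold lifecycle, using only APIs defined in
//! this module (compaction also runs automatically via `compact_if_needed`,
//! and `freeze_all` is typically reserved for memory pressure or shutdown):
//!
//! ```
//! use grafeo_core::index::ChunkedAdjacency;
//! use grafeo_common::types::{NodeId, EdgeId};
//!
//! let adj = ChunkedAdjacency::new();
//! for i in 0..100 {
//!     adj.add_edge(NodeId::new(0), NodeId::new(i + 1), EdgeId::new(i));
//! }
//! adj.compact();    // drain the delta buffer into hot chunks
//! adj.freeze_all(); // compress every hot chunk into cold storage
//!
//! let stats = adj.memory_stats();
//! assert_eq!(stats.hot_entries, 0);
//! assert_eq!(stats.cold_entries, 100);
//! ```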

use crate::storage::{BitPackedInts, DeltaBitPacked};
use grafeo_common::types::{EdgeId, NodeId};
use grafeo_common::utils::hash::{FxHashMap, FxHashSet};
use parking_lot::RwLock;
use smallvec::SmallVec;
use std::sync::atomic::{AtomicUsize, Ordering};

/// Default chunk capacity (number of edges per chunk).
const DEFAULT_CHUNK_CAPACITY: usize = 64;

/// Threshold for delta buffer compaction.
///
/// Lower values reduce memory overhead and iteration cost for delta buffers,
/// but increase compaction frequency. 64 provides a good balance for typical workloads.
const DELTA_COMPACTION_THRESHOLD: usize = 64;

/// Threshold for cold chunk compression.
///
/// When the number of hot chunks exceeds this threshold, the oldest hot chunks
/// are compressed and moved to cold storage. This balances memory usage with
/// the cost of compression/decompression.
const COLD_COMPRESSION_THRESHOLD: usize = 4;

/// A chunk of adjacency entries.
#[derive(Debug, Clone)]
struct AdjacencyChunk {
    /// Destination node IDs.
    destinations: Vec<NodeId>,
    /// Edge IDs (parallel to destinations).
    edge_ids: Vec<EdgeId>,
    /// Capacity of this chunk.
    capacity: usize,
}

impl AdjacencyChunk {
    fn new(capacity: usize) -> Self {
        Self {
            destinations: Vec::with_capacity(capacity),
            edge_ids: Vec::with_capacity(capacity),
            capacity,
        }
    }

    fn len(&self) -> usize {
        self.destinations.len()
    }

    fn is_full(&self) -> bool {
        self.destinations.len() >= self.capacity
    }

    fn push(&mut self, dst: NodeId, edge_id: EdgeId) -> bool {
        if self.is_full() {
            return false;
        }
        self.destinations.push(dst);
        self.edge_ids.push(edge_id);
        true
    }

    fn iter(&self) -> impl Iterator<Item = (NodeId, EdgeId)> + '_ {
        self.destinations
            .iter()
            .copied()
            .zip(self.edge_ids.iter().copied())
    }

    /// Compresses this chunk into a `CompressedAdjacencyChunk`.
    ///
    /// The entries are sorted by destination node ID for better delta compression.
    /// Use this for cold chunks that won't be modified.
    fn compress(&self) -> CompressedAdjacencyChunk {
        // Sort entries by destination for better delta compression
        let mut entries: Vec<_> = self
            .destinations
            .iter()
            .copied()
            .zip(self.edge_ids.iter().copied())
            .collect();
        entries.sort_by_key(|(dst, _)| dst.as_u64());
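        // Illustrative example (the exact delta bit width is an internal
        // detail of `DeltaBitPacked`): destinations [110, 100, 180, 105]
        // sort to [100, 105, 110, 180], which encode as base 100 plus the
        // small deltas [5, 5, 70] instead of four full 64-bit values.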

        // Extract sorted destinations and corresponding edge IDs
        let sorted_dsts: Vec<u64> = entries.iter().map(|(d, _)| d.as_u64()).collect();
        let sorted_edges: Vec<u64> = entries.iter().map(|(_, e)| e.as_u64()).collect();

        let max_destination = sorted_dsts.last().copied().unwrap_or(0);

        CompressedAdjacencyChunk {
            destinations: DeltaBitPacked::encode(&sorted_dsts),
            edge_ids: BitPackedInts::pack(&sorted_edges),
            count: entries.len(),
            max_destination,
        }
    }
}

/// A compressed chunk of adjacency entries.
///
/// Uses DeltaBitPacked for destination node IDs (sorted for good compression)
/// and BitPackedInts for edge IDs. Typical compression ratio is 5-10x for
/// dense adjacency lists.
#[derive(Debug, Clone)]
struct CompressedAdjacencyChunk {
    /// Delta + bit-packed destination node IDs (sorted).
    destinations: DeltaBitPacked,
    /// Bit-packed edge IDs.
    edge_ids: BitPackedInts,
    /// Number of entries.
    count: usize,
    /// Maximum destination node ID (last element in sorted order).
    max_destination: u64,
}

impl CompressedAdjacencyChunk {
    /// Returns the number of entries in this chunk.
    fn len(&self) -> usize {
        self.count
    }

    /// Returns true if this chunk is empty.
    #[cfg(test)]
    fn is_empty(&self) -> bool {
        self.count == 0
    }

    /// Decompresses and iterates over all entries.
    fn iter(&self) -> impl Iterator<Item = (NodeId, EdgeId)> + '_ {
        let dsts = self.destinations.decode();
        let edges = self.edge_ids.unpack();

        dsts.into_iter()
            .zip(edges)
            .map(|(d, e)| (NodeId::new(d), EdgeId::new(e)))
    }

    /// Returns the approximate memory size in bytes.
    fn memory_size(&self) -> usize {
        // DeltaBitPacked: 8 bytes base + packed deltas
        // BitPackedInts: packed data
        let dest_size = 8 + self.destinations.to_bytes().len();
        let edge_size = self.edge_ids.data().len() * 8;
        dest_size + edge_size
    }

    /// Returns the minimum destination ID in this chunk.
    #[must_use]
    fn min_destination(&self) -> u64 {
        self.destinations.base()
    }

    /// Returns `(destination, edge_id)` pairs where destination is in `[min, max]`.
    ///
    /// Returns early (a zone-map check against this chunk's min/max) when the
    /// range doesn't overlap this chunk at all, then uses `partition_point`
    /// for efficient sub-range extraction.
    fn destinations_in_range(&self, min: u64, max: u64) -> Vec<(NodeId, EdgeId)> {
        if min > self.max_destination || max < self.destinations.base() {
            return Vec::new();
        }
        let destinations = self.destinations.decode();
        let edges = self.edge_ids.unpack();
        let start = destinations.partition_point(|&d| d < min);
        let end = destinations.partition_point(|&d| d <= max);
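        // Example: decoded destinations [3, 7, 7, 9] with range [7, 8] give
        // start = 1 and end = 3, selecting both entries with destination 7.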
        destinations[start..end]
            .iter()
            .zip(&edges[start..end])
            .map(|(&d, &e)| (NodeId::new(d), EdgeId::new(e)))
            .collect()
    }

    /// Returns the compression ratio compared to uncompressed storage.
    #[cfg(test)]
    fn compression_ratio(&self) -> f64 {
        if self.count == 0 {
            return 1.0;
        }
        let uncompressed = self.count * 16; // 8 bytes each for NodeId and EdgeId
        let compressed = self.memory_size();
        if compressed == 0 {
            return f64::INFINITY;
        }
        uncompressed as f64 / compressed as f64
    }
}

/// Zone map entry for a single compressed cold chunk.
///
/// The skip index stores one entry per cold chunk, sorted by `min_destination`.
/// Checking these `[min, max]` ranges identifies which chunks might contain a
/// target destination, so lookups decompress only the candidate chunks.
#[derive(Debug, Clone, Copy)]
struct SkipIndexEntry {
    /// Minimum destination ID in the chunk (from `DeltaBitPacked::base()`).
    min_destination: u64,
    /// Maximum destination ID in the chunk.
    max_destination: u64,
    /// Index into `AdjacencyList::cold_chunks`.
    chunk_index: usize,
}

/// Adjacency list for a single node.
///
/// Uses a tiered storage model:
/// - **Hot chunks**: Recent data, uncompressed for fast modification
/// - **Cold chunks**: Older data, compressed for memory efficiency
/// - **Delta buffer**: Very recent insertions, not yet compacted
/// - **Skip index**: Zone map over cold chunks that prunes point and range lookups
#[derive(Debug)]
struct AdjacencyList {
    /// Hot chunks (mutable, uncompressed) - for recent data.
    hot_chunks: Vec<AdjacencyChunk>,
    /// Cold chunks (immutable, compressed) - for older data.
    cold_chunks: Vec<CompressedAdjacencyChunk>,
    /// Delta buffer for recent insertions.
    /// Uses SmallVec with 16 inline entries for cache-friendly access.
    /// Most nodes see fewer than 16 insertions between compactions, so this
    /// avoids heap allocation in the common case.
    delta_inserts: SmallVec<[(NodeId, EdgeId); 16]>,
    /// Set of deleted edge IDs.
    deleted: FxHashSet<EdgeId>,
    /// Zone map skip index over cold chunks, sorted by `min_destination`.
    /// Enables point lookups and range queries that decompress only the
    /// cold chunks whose ranges overlap the target.
    skip_index: Vec<SkipIndexEntry>,
}

impl AdjacencyList {
    fn new() -> Self {
        Self {
            hot_chunks: Vec::new(),
            cold_chunks: Vec::new(),
            delta_inserts: SmallVec::new(),
            deleted: FxHashSet::default(),
            skip_index: Vec::new(),
        }
    }

    fn add_edge(&mut self, dst: NodeId, edge_id: EdgeId) {
        // Try to add to the last hot chunk
        if let Some(last) = self.hot_chunks.last_mut()
            && last.push(dst, edge_id)
        {
            return;
        }

        // Add to delta buffer
        self.delta_inserts.push((dst, edge_id));
    }

    fn mark_deleted(&mut self, edge_id: EdgeId) {
        self.deleted.insert(edge_id);
    }

    fn compact(&mut self, chunk_capacity: usize) {
        if self.delta_inserts.is_empty() {
            return;
        }

        // Create new chunks from the delta buffer.
        // If the last hot chunk has room, pop it and continue filling it.
        let last_has_room = self.hot_chunks.last().is_some_and(|c| !c.is_full());
        let mut current_chunk = if last_has_room {
            // Invariant: is_some_and() returned true, so hot_chunks is non-empty
            self.hot_chunks
                .pop()
                .expect("hot_chunks is non-empty: is_some_and() succeeded on previous line")
        } else {
            AdjacencyChunk::new(chunk_capacity)
        };

        for (dst, edge_id) in self.delta_inserts.drain(..) {
            if !current_chunk.push(dst, edge_id) {
                self.hot_chunks.push(current_chunk);
                current_chunk = AdjacencyChunk::new(chunk_capacity);
                current_chunk.push(dst, edge_id);
            }
        }

        if current_chunk.len() > 0 {
            self.hot_chunks.push(current_chunk);
        }

        // Check if we should compress some hot chunks to cold
        self.maybe_compress_to_cold();
    }

    /// Compresses the oldest hot chunks to cold storage if the threshold is exceeded.
    ///
    /// Builds a skip index entry for each new cold chunk, enabling zone-map
    /// pruning during point lookups and range queries.
    fn maybe_compress_to_cold(&mut self) {
        // Keep at least COLD_COMPRESSION_THRESHOLD hot chunks for write performance
        while self.hot_chunks.len() > COLD_COMPRESSION_THRESHOLD {
            // Remove the oldest (first) hot chunk
            let oldest = self.hot_chunks.remove(0);

            // Skip empty chunks
            if oldest.len() == 0 {
                continue;
            }

            // Compress and add to cold storage
            let compressed = oldest.compress();
            let chunk_index = self.cold_chunks.len();
            self.skip_index.push(SkipIndexEntry {
                min_destination: compressed.min_destination(),
                max_destination: compressed.max_destination,
                chunk_index,
            });
            self.cold_chunks.push(compressed);
        }
        // Keep the index sorted by min_destination
        self.skip_index.sort_unstable_by_key(|e| e.min_destination);
    }

    /// Forces all hot chunks to be compressed to cold storage.
    ///
    /// Useful when memory pressure is high or the node is rarely accessed.
    /// Extends and re-sorts the skip index to cover the newly compressed chunks.
    fn freeze_all(&mut self) {
        for chunk in self.hot_chunks.drain(..) {
            if chunk.len() > 0 {
                let compressed = chunk.compress();
                let chunk_index = self.cold_chunks.len();
                self.skip_index.push(SkipIndexEntry {
                    min_destination: compressed.min_destination(),
                    max_destination: compressed.max_destination,
                    chunk_index,
                });
                self.cold_chunks.push(compressed);
            }
        }
        self.skip_index.sort_unstable_by_key(|e| e.min_destination);
    }

    fn iter(&self) -> impl Iterator<Item = (NodeId, EdgeId)> + '_ {
        let deleted = &self.deleted;

        // Iterate cold chunks first (oldest data)
        let cold_iter = self.cold_chunks.iter().flat_map(|c| c.iter());

        // Then hot chunks
        let hot_iter = self.hot_chunks.iter().flat_map(|c| c.iter());

        // Finally delta buffer (newest data)
        let delta_iter = self.delta_inserts.iter().copied();

        cold_iter
            .chain(hot_iter)
            .chain(delta_iter)
            .filter(move |(_, edge_id)| !deleted.contains(edge_id))
    }

    /// Checks whether a specific destination node exists in this list.
    ///
    /// Uses the skip index to prune cold chunks: only chunks whose zone maps
    /// cover the target are decompressed, and their sorted destinations are
    /// then binary-searched. Hot chunks and the delta buffer are scanned
    /// linearly. Respects soft-deleted edges.
    fn contains(&self, destination: NodeId) -> bool {
        let dst_raw = destination.as_u64();
        let deleted = &self.deleted;

        // Cold chunks: use the skip index to find candidate chunks
        for entry in &self.skip_index {
            if dst_raw < entry.min_destination || dst_raw > entry.max_destination {
                continue;
            }
            let chunk = &self.cold_chunks[entry.chunk_index];
            let decoded_dsts = chunk.destinations.decode();
            if let Ok(pos) = decoded_dsts.binary_search(&dst_raw) {
                // Only unpack edge IDs once the destination is known to be present
                let decoded_edges = chunk.edge_ids.unpack();
                // Check this position and adjacent duplicates
                let mut i = pos;
                while i > 0 && decoded_dsts[i - 1] == dst_raw {
                    i -= 1;
                }
                for j in i..decoded_dsts.len() {
                    if decoded_dsts[j] != dst_raw {
                        break;
                    }
                    if !deleted.contains(&EdgeId::new(decoded_edges[j])) {
                        return true;
                    }
                }
            }
        }

        // Hot chunks: linear scan (small, unsorted)
        for chunk in &self.hot_chunks {
            for (dst, edge_id) in chunk.iter() {
                if dst == destination && !deleted.contains(&edge_id) {
                    return true;
                }
            }
        }

        // Delta buffer: linear scan
        for &(dst, edge_id) in &self.delta_inserts {
            if dst == destination && !deleted.contains(&edge_id) {
                return true;
            }
        }

        false
    }

    /// Returns edges whose destination falls in `[min, max]` (inclusive).
    ///
    /// Uses skip index zone-map pruning over cold chunks, then a linear scan
    /// of hot chunks and the delta buffer. Respects soft-deleted edges.
    fn destinations_in_range(&self, min: NodeId, max: NodeId) -> Vec<(NodeId, EdgeId)> {
        let min_raw = min.as_u64();
        let max_raw = max.as_u64();
        let deleted = &self.deleted;
        let mut results = Vec::new();

        // Cold chunks: skip index prunes non-overlapping chunks
        for entry in &self.skip_index {
            if entry.max_destination < min_raw || entry.min_destination > max_raw {
                continue;
            }
            let chunk = &self.cold_chunks[entry.chunk_index];
            results.extend(
                chunk
                    .destinations_in_range(min_raw, max_raw)
                    .into_iter()
                    .filter(|(_, eid)| !deleted.contains(eid)),
            );
        }

        // Hot chunks: linear scan
        for chunk in &self.hot_chunks {
            for (dst, edge_id) in chunk.iter() {
                if dst.as_u64() >= min_raw && dst.as_u64() <= max_raw && !deleted.contains(&edge_id)
                {
                    results.push((dst, edge_id));
                }
            }
        }

        // Delta buffer: linear scan
        for &(dst, edge_id) in &self.delta_inserts {
            if dst.as_u64() >= min_raw && dst.as_u64() <= max_raw && !deleted.contains(&edge_id) {
                results.push((dst, edge_id));
            }
        }

        results
    }

    fn neighbors(&self) -> impl Iterator<Item = NodeId> + '_ {
        self.iter().map(|(dst, _)| dst)
    }

    fn degree(&self) -> usize {
        self.iter().count()
    }

    /// Returns the number of entries in hot storage.
    fn hot_count(&self) -> usize {
        self.hot_chunks.iter().map(|c| c.len()).sum::<usize>() + self.delta_inserts.len()
    }

    /// Returns the number of entries in cold storage.
    fn cold_count(&self) -> usize {
        self.cold_chunks.iter().map(|c| c.len()).sum()
    }

    /// Returns the approximate memory size in bytes.
    #[cfg(test)]
    fn memory_size(&self) -> usize {
        // Hot chunks: full uncompressed size
        let hot_size = self.hot_chunks.iter().map(|c| c.len() * 16).sum::<usize>();

        // Cold chunks: compressed size
        let cold_size = self
            .cold_chunks
            .iter()
            .map(|c| c.memory_size())
            .sum::<usize>();

        // Delta buffer
        let delta_size = self.delta_inserts.len() * 16;

        // Deleted set (rough estimate)
        let deleted_size = self.deleted.len() * 16;

        hot_size + cold_size + delta_size + deleted_size
    }
}

/// The main structure for traversing graph edges.
///
/// Given a node, this tells you all its neighbors and the edges connecting them.
/// Internally uses chunked storage (64 edges per chunk by default) with a delta
/// buffer for recent inserts. Deletions are soft (tombstones) until compaction.
///
/// # Example
///
/// ```
/// use grafeo_core::index::ChunkedAdjacency;
/// use grafeo_common::types::{NodeId, EdgeId};
///
/// let adj = ChunkedAdjacency::new();
///
/// // Build a star graph: node 0 connects to nodes 1, 2, 3
/// adj.add_edge(NodeId::new(0), NodeId::new(1), EdgeId::new(100));
/// adj.add_edge(NodeId::new(0), NodeId::new(2), EdgeId::new(101));
/// adj.add_edge(NodeId::new(0), NodeId::new(3), EdgeId::new(102));
///
/// // Fast neighbor lookup
/// let neighbors = adj.neighbors(NodeId::new(0));
/// assert_eq!(neighbors.len(), 3);
/// ```
pub struct ChunkedAdjacency {
    /// Adjacency lists indexed by source node.
    /// Lock order: 10 (nested, acquired via LpgStore::forward_adj/backward_adj)
    lists: RwLock<FxHashMap<NodeId, AdjacencyList>>,
    /// Chunk capacity for new chunks.
    chunk_capacity: usize,
    /// Total number of edges (including deleted).
    edge_count: AtomicUsize,
    /// Number of deleted edges.
    deleted_count: AtomicUsize,
}

impl ChunkedAdjacency {
    /// Creates a new chunked adjacency structure.
    #[must_use]
    pub fn new() -> Self {
        Self::with_chunk_capacity(DEFAULT_CHUNK_CAPACITY)
    }

    /// Creates a new chunked adjacency with custom chunk capacity.
    #[must_use]
    pub fn with_chunk_capacity(capacity: usize) -> Self {
        Self {
            lists: RwLock::new(FxHashMap::default()),
            chunk_capacity: capacity,
            edge_count: AtomicUsize::new(0),
            deleted_count: AtomicUsize::new(0),
        }
    }

    /// Adds an edge from src to dst.
    pub fn add_edge(&self, src: NodeId, dst: NodeId, edge_id: EdgeId) {
        let mut lists = self.lists.write();
        lists
            .entry(src)
            .or_insert_with(AdjacencyList::new)
            .add_edge(dst, edge_id);
        self.edge_count.fetch_add(1, Ordering::Relaxed);
    }

    /// Adds multiple edges in a single lock acquisition.
    ///
    /// Each tuple is `(src, dst, edge_id)`. Takes the write lock once and
    /// inserts all edges, then compacts any lists that exceed the delta
    /// threshold. Significantly faster than calling `add_edge()` in a loop
    /// for bulk imports.
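    ///
    /// A bulk-insert sketch:
    ///
    /// ```
    /// use grafeo_core::index::ChunkedAdjacency;
    /// use grafeo_common::types::{NodeId, EdgeId};
    ///
    /// let adj = ChunkedAdjacency::new();
    /// adj.batch_add_edges(&[
    ///     (NodeId::new(0), NodeId::new(1), EdgeId::new(10)),
    ///     (NodeId::new(0), NodeId::new(2), EdgeId::new(11)),
    ///     (NodeId::new(1), NodeId::new(2), EdgeId::new(12)),
    /// ]);
    /// assert_eq!(adj.out_degree(NodeId::new(0)), 2);
    /// assert_eq!(adj.total_edge_count(), 3);
    /// ```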
    pub fn batch_add_edges(&self, edges: &[(NodeId, NodeId, EdgeId)]) {
        if edges.is_empty() {
            return;
        }
        let mut lists = self.lists.write();
        for &(src, dst, edge_id) in edges {
            lists
                .entry(src)
                .or_insert_with(AdjacencyList::new)
                .add_edge(dst, edge_id);
        }
        self.edge_count.fetch_add(edges.len(), Ordering::Relaxed);

        // Compact any lists that overflowed their delta buffer
        for list in lists.values_mut() {
            if list.delta_inserts.len() >= DELTA_COMPACTION_THRESHOLD {
                list.compact(self.chunk_capacity);
            }
        }
    }

    /// Marks an edge as deleted.
    pub fn mark_deleted(&self, src: NodeId, edge_id: EdgeId) {
        let mut lists = self.lists.write();
        if let Some(list) = lists.get_mut(&src) {
            list.mark_deleted(edge_id);
            self.deleted_count.fetch_add(1, Ordering::Relaxed);
        }
    }

    /// Restores a previously deleted edge (removes it from the deleted set).
    ///
    /// Used during transaction rollback to undo a soft-delete.
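    ///
    /// A rollback sketch:
    ///
    /// ```
    /// use grafeo_core::index::ChunkedAdjacency;
    /// use grafeo_common::types::{NodeId, EdgeId};
    ///
    /// let adj = ChunkedAdjacency::new();
    /// adj.add_edge(NodeId::new(0), NodeId::new(1), EdgeId::new(7));
    /// adj.mark_deleted(NodeId::new(0), EdgeId::new(7));
    /// assert_eq!(adj.active_edge_count(), 0);
    ///
    /// adj.unmark_deleted(NodeId::new(0), EdgeId::new(7));
    /// assert_eq!(adj.active_edge_count(), 1);
    /// ```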
    pub fn unmark_deleted(&self, src: NodeId, edge_id: EdgeId) {
        let mut lists = self.lists.write();
        if let Some(list) = lists.get_mut(&src)
            && list.deleted.remove(&edge_id)
        {
            self.deleted_count.fetch_sub(1, Ordering::Relaxed);
        }
    }

    /// Returns all neighbors of a node.
    ///
    /// Note: This allocates a Vec to collect neighbors while the internal lock
    /// is held, then returns the Vec. For traversal performance, consider using
    /// `edges_from` if you also need edge IDs, to avoid multiple lookups.
    #[must_use]
    pub fn neighbors(&self, src: NodeId) -> Vec<NodeId> {
        let lists = self.lists.read();
        lists
            .get(&src)
            .map(|list| list.neighbors().collect())
            .unwrap_or_default()
    }

    /// Returns all (neighbor, edge_id) pairs for outgoing edges from a node.
    ///
    /// Note: This allocates a Vec to collect edges while the internal lock
    /// is held, then returns the Vec. This is intentional to avoid holding
    /// the lock across iteration.
    #[must_use]
    pub fn edges_from(&self, src: NodeId) -> Vec<(NodeId, EdgeId)> {
        let lists = self.lists.read();
        lists
            .get(&src)
            .map(|list| list.iter().collect())
            .unwrap_or_default()
    }

    /// Returns the out-degree of a node (number of outgoing edges).
    ///
    /// For forward adjacency, this counts edges where `src` is the source.
    pub fn out_degree(&self, src: NodeId) -> usize {
        let lists = self.lists.read();
        lists.get(&src).map_or(0, |list| list.degree())
    }

    /// Returns the in-degree of a node (number of incoming edges).
    ///
    /// This is semantically equivalent to `out_degree` but named differently
    /// for use with backward adjacency, where edges are stored in reverse.
    /// When called on `backward_adj`, this returns the count of edges
    /// where `node` is the destination.
    pub fn in_degree(&self, node: NodeId) -> usize {
        let lists = self.lists.read();
        lists.get(&node).map_or(0, |list| list.degree())
    }

    /// Compacts all adjacency lists.
    pub fn compact(&self) {
        let mut lists = self.lists.write();
        for list in lists.values_mut() {
            list.compact(self.chunk_capacity);
        }
    }

    /// Compacts delta buffers that exceed the threshold.
    pub fn compact_if_needed(&self) {
        let mut lists = self.lists.write();
        for list in lists.values_mut() {
            if list.delta_inserts.len() >= DELTA_COMPACTION_THRESHOLD {
                list.compact(self.chunk_capacity);
            }
        }
    }

    /// Returns the total number of edges (including deleted).
    pub fn total_edge_count(&self) -> usize {
        self.edge_count.load(Ordering::Relaxed)
    }

    /// Returns the number of active (non-deleted) edges.
    pub fn active_edge_count(&self) -> usize {
        self.edge_count.load(Ordering::Relaxed) - self.deleted_count.load(Ordering::Relaxed)
    }

    /// Returns the number of nodes with adjacency lists.
    pub fn node_count(&self) -> usize {
        self.lists.read().len()
    }

    /// Checks if an edge from `src` to `dst` exists (not deleted).
    ///
    /// Uses the zone-map skip index so that only cold chunks whose ranges
    /// cover `dst` are decompressed. Hot chunks and the delta buffer
    /// are scanned linearly.
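    ///
    /// ```
    /// use grafeo_core::index::ChunkedAdjacency;
    /// use grafeo_common::types::{NodeId, EdgeId};
    ///
    /// let adj = ChunkedAdjacency::new();
    /// adj.add_edge(NodeId::new(1), NodeId::new(2), EdgeId::new(0));
    /// assert!(adj.contains_edge(NodeId::new(1), NodeId::new(2)));
    /// assert!(!adj.contains_edge(NodeId::new(2), NodeId::new(1))); // direction matters
    /// ```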
    #[must_use]
    pub fn contains_edge(&self, src: NodeId, dst: NodeId) -> bool {
        let lists = self.lists.read();
        lists.get(&src).is_some_and(|list| list.contains(dst))
    }

    /// Returns edges from `src` whose destination is in `[min_dst, max_dst]`.
    ///
    /// Only decompresses cold chunks whose zone maps overlap the requested range.
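    ///
    /// ```
    /// use grafeo_core::index::ChunkedAdjacency;
    /// use grafeo_common::types::{NodeId, EdgeId};
    ///
    /// let adj = ChunkedAdjacency::new();
    /// adj.add_edge(NodeId::new(0), NodeId::new(5), EdgeId::new(1));
    /// adj.add_edge(NodeId::new(0), NodeId::new(9), EdgeId::new(2));
    ///
    /// let hits = adj.edges_in_range(NodeId::new(0), NodeId::new(4), NodeId::new(6));
    /// assert_eq!(hits, vec![(NodeId::new(5), EdgeId::new(1))]);
    /// ```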
    #[must_use]
    pub fn edges_in_range(
        &self,
        src: NodeId,
        min_dst: NodeId,
        max_dst: NodeId,
    ) -> Vec<(NodeId, EdgeId)> {
        let lists = self.lists.read();
        lists
            .get(&src)
            .map(|list| list.destinations_in_range(min_dst, max_dst))
            .unwrap_or_default()
    }

    /// Clears all adjacency lists.
    pub fn clear(&self) {
        let mut lists = self.lists.write();
        lists.clear();
        self.edge_count.store(0, Ordering::Relaxed);
        self.deleted_count.store(0, Ordering::Relaxed);
    }

    /// Returns memory statistics for this adjacency structure.
    #[must_use]
    pub fn memory_stats(&self) -> AdjacencyMemoryStats {
        let lists = self.lists.read();

        let mut hot_entries = 0usize;
        let mut cold_entries = 0usize;
        let mut hot_bytes = 0usize;
        let mut cold_bytes = 0usize;

        for list in lists.values() {
            hot_entries += list.hot_count();
            cold_entries += list.cold_count();

            // Hot: uncompressed (16 bytes per entry)
            hot_bytes += list.hot_count() * 16;

            // Cold: compressed size from memory_size()
            for cold_chunk in &list.cold_chunks {
                cold_bytes += cold_chunk.memory_size();
            }
        }

        AdjacencyMemoryStats {
            hot_entries,
            cold_entries,
            hot_bytes,
            cold_bytes,
            node_count: lists.len(),
        }
    }

    /// Returns estimated heap memory in bytes.
    #[must_use]
    pub fn heap_memory_bytes(&self) -> usize {
        let lists = self.lists.read();
        // Outer hash map overhead
        let map_overhead = lists.capacity()
            * (std::mem::size_of::<NodeId>() + std::mem::size_of::<AdjacencyList>() + 1);
        // Per-list memory: hot chunks + cold chunks + deltas + deleted set
        let mut list_bytes = 0usize;
        for list in lists.values() {
            // Hot chunks: Vec<AdjacencyChunk> capacity + each chunk's Vec capacity
            list_bytes += list.hot_chunks.capacity() * std::mem::size_of::<AdjacencyChunk>();
            for chunk in &list.hot_chunks {
                list_bytes += chunk.destinations.capacity() * std::mem::size_of::<NodeId>();
                list_bytes += chunk.edge_ids.capacity() * std::mem::size_of::<EdgeId>();
            }
            // Cold chunks: compressed data
            list_bytes +=
                list.cold_chunks.capacity() * std::mem::size_of::<CompressedAdjacencyChunk>();
            for cold in &list.cold_chunks {
                list_bytes += cold.memory_size();
            }
            // Delta buffer (SmallVec stores up to 16 entries inline)
            if list.delta_inserts.spilled() {
                list_bytes += list.delta_inserts.capacity() * 16;
            }
            // Deleted set
            list_bytes += list.deleted.capacity() * (std::mem::size_of::<EdgeId>() + 1);
            // Skip index
            list_bytes += list.skip_index.capacity() * std::mem::size_of::<SkipIndexEntry>();
        }
        map_overhead + list_bytes
    }

    /// Forces all hot chunks to be compressed for all adjacency lists.
    ///
    /// This is useful when memory pressure is high or during shutdown.
    pub fn freeze_all(&self) {
        let mut lists = self.lists.write();
        for list in lists.values_mut() {
            list.freeze_all();
        }
    }
}

/// Memory statistics for the adjacency structure.
#[derive(Debug, Clone)]
pub struct AdjacencyMemoryStats {
    /// Number of entries in hot (uncompressed) storage.
    pub hot_entries: usize,
    /// Number of entries in cold (compressed) storage.
    pub cold_entries: usize,
    /// Bytes used by hot storage.
    pub hot_bytes: usize,
    /// Bytes used by cold storage.
    pub cold_bytes: usize,
    /// Number of nodes with adjacency lists.
    pub node_count: usize,
}

impl AdjacencyMemoryStats {
    /// Returns the total number of entries.
    #[must_use]
    pub fn total_entries(&self) -> usize {
        self.hot_entries + self.cold_entries
    }

    /// Returns the total memory used in bytes.
    #[must_use]
    pub fn total_bytes(&self) -> usize {
        self.hot_bytes + self.cold_bytes
    }

    /// Returns the compression ratio for cold storage.
    ///
    /// Values > 1.0 indicate actual compression.
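    ///
    /// A sketch of the arithmetic (the import path assumes this struct is
    /// re-exported alongside `ChunkedAdjacency`):
    ///
    /// ```
    /// use grafeo_core::index::AdjacencyMemoryStats;
    ///
    /// let stats = AdjacencyMemoryStats {
    ///     hot_entries: 0,
    ///     cold_entries: 100,
    ///     hot_bytes: 0,
    ///     cold_bytes: 400,
    ///     node_count: 1,
    /// };
    /// // 100 entries * 16 bytes = 1600 bytes uncompressed vs 400 compressed
    /// assert_eq!(stats.cold_compression_ratio(), 4.0);
    /// ```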
    #[must_use]
    pub fn cold_compression_ratio(&self) -> f64 {
        if self.cold_entries == 0 || self.cold_bytes == 0 {
            return 1.0;
        }
        let uncompressed = self.cold_entries * 16;
        uncompressed as f64 / self.cold_bytes as f64
    }

    /// Returns the overall compression ratio.
    #[must_use]
    pub fn overall_compression_ratio(&self) -> f64 {
        let total_entries = self.total_entries();
        if total_entries == 0 || self.total_bytes() == 0 {
            return 1.0;
        }
        let uncompressed = total_entries * 16;
        uncompressed as f64 / self.total_bytes() as f64
    }
}

impl Default for ChunkedAdjacency {
    fn default() -> Self {
        Self::new()
    }
}

#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_basic_adjacency() {
        let adj = ChunkedAdjacency::new();

        adj.add_edge(NodeId::new(0), NodeId::new(1), EdgeId::new(0));
        adj.add_edge(NodeId::new(0), NodeId::new(2), EdgeId::new(1));
        adj.add_edge(NodeId::new(0), NodeId::new(3), EdgeId::new(2));

        let neighbors = adj.neighbors(NodeId::new(0));
        assert_eq!(neighbors.len(), 3);
        assert!(neighbors.contains(&NodeId::new(1)));
        assert!(neighbors.contains(&NodeId::new(2)));
        assert!(neighbors.contains(&NodeId::new(3)));
    }

    #[test]
    fn test_out_degree() {
        let adj = ChunkedAdjacency::new();

        adj.add_edge(NodeId::new(0), NodeId::new(1), EdgeId::new(0));
        adj.add_edge(NodeId::new(0), NodeId::new(2), EdgeId::new(1));

        assert_eq!(adj.out_degree(NodeId::new(0)), 2);
        assert_eq!(adj.out_degree(NodeId::new(1)), 0);
    }

    #[test]
    fn test_mark_deleted() {
        let adj = ChunkedAdjacency::new();

        adj.add_edge(NodeId::new(0), NodeId::new(1), EdgeId::new(0));
        adj.add_edge(NodeId::new(0), NodeId::new(2), EdgeId::new(1));

        adj.mark_deleted(NodeId::new(0), EdgeId::new(0));

        let neighbors = adj.neighbors(NodeId::new(0));
        assert_eq!(neighbors.len(), 1);
        assert!(neighbors.contains(&NodeId::new(2)));
    }

    #[test]
    fn test_edges_from() {
        let adj = ChunkedAdjacency::new();

        adj.add_edge(NodeId::new(0), NodeId::new(1), EdgeId::new(10));
        adj.add_edge(NodeId::new(0), NodeId::new(2), EdgeId::new(20));

        let edges = adj.edges_from(NodeId::new(0));
        assert_eq!(edges.len(), 2);
        assert!(edges.contains(&(NodeId::new(1), EdgeId::new(10))));
        assert!(edges.contains(&(NodeId::new(2), EdgeId::new(20))));
    }

    #[test]
    fn test_compaction() {
        let adj = ChunkedAdjacency::with_chunk_capacity(4);

        // Add more edges than chunk capacity
        for i in 0..10 {
            adj.add_edge(NodeId::new(0), NodeId::new(i + 1), EdgeId::new(i));
        }

        adj.compact();

        // All edges should still be accessible
        let neighbors = adj.neighbors(NodeId::new(0));
        assert_eq!(neighbors.len(), 10);
    }

    #[test]
    fn test_edge_counts() {
        let adj = ChunkedAdjacency::new();

        adj.add_edge(NodeId::new(0), NodeId::new(1), EdgeId::new(0));
        adj.add_edge(NodeId::new(0), NodeId::new(2), EdgeId::new(1));
        adj.add_edge(NodeId::new(1), NodeId::new(2), EdgeId::new(2));

        assert_eq!(adj.total_edge_count(), 3);
        assert_eq!(adj.active_edge_count(), 3);

        adj.mark_deleted(NodeId::new(0), EdgeId::new(0));

        assert_eq!(adj.total_edge_count(), 3);
        assert_eq!(adj.active_edge_count(), 2);
    }

    #[test]
    fn test_clear() {
        let adj = ChunkedAdjacency::new();

        adj.add_edge(NodeId::new(0), NodeId::new(1), EdgeId::new(0));
        adj.add_edge(NodeId::new(0), NodeId::new(2), EdgeId::new(1));

        adj.clear();

        assert_eq!(adj.total_edge_count(), 0);
        assert_eq!(adj.node_count(), 0);
    }

    #[test]
    fn test_chunk_compression() {
        // Create a chunk with some edges
        let mut chunk = AdjacencyChunk::new(64);

        // Add edges with various destination IDs
        for i in 0..20 {
            chunk.push(NodeId::new(100 + i * 5), EdgeId::new(1000 + i));
        }

        // Compress the chunk
        let compressed = chunk.compress();

        // Verify all data is preserved
        assert_eq!(compressed.len(), 20);

        // Decompress and verify
        let entries: Vec<_> = compressed.iter().collect();
        assert_eq!(entries.len(), 20);

        // After compression, entries are sorted by destination
        // Verify destinations are sorted
        for window in entries.windows(2) {
            assert!(window[0].0.as_u64() <= window[1].0.as_u64());
        }

        // Verify all original destinations are present
        let original_dsts: std::collections::HashSet<_> =
            (0..20).map(|i| NodeId::new(100 + i * 5)).collect();
        let compressed_dsts: std::collections::HashSet<_> =
            entries.iter().map(|(d, _)| *d).collect();
        assert_eq!(original_dsts, compressed_dsts);

        // Check compression ratio (should be > 1.0 for sorted data)
        let ratio = compressed.compression_ratio();
        assert!(
            ratio > 1.0,
            "Expected compression ratio > 1.0, got {}",
            ratio
        );
    }

    #[test]
    fn test_empty_chunk_compression() {
        let chunk = AdjacencyChunk::new(64);
        let compressed = chunk.compress();

        assert_eq!(compressed.len(), 0);
        assert!(compressed.is_empty());
        assert_eq!(compressed.iter().count(), 0);
    }

    #[test]
    fn test_hot_to_cold_migration() {
        // Use small chunk capacity to trigger cold compression faster
        let adj = ChunkedAdjacency::with_chunk_capacity(8);

        // Add many edges to force multiple chunks and cold compression
        // With chunk_capacity=8 and COLD_COMPRESSION_THRESHOLD=4,
        // we need more than 4 * 8 = 32 edges to trigger cold compression
        for i in 0..100 {
            adj.add_edge(NodeId::new(0), NodeId::new(i + 1), EdgeId::new(i));
        }

        // Force compaction to trigger hot→cold migration
        adj.compact();

        // All edges should still be accessible
        let neighbors = adj.neighbors(NodeId::new(0));
        assert_eq!(neighbors.len(), 100);

        // Check memory stats
        let stats = adj.memory_stats();
        assert_eq!(stats.total_entries(), 100);

        // With 100 edges and threshold of 4 hot chunks (32 edges),
        // we should have some cold entries
        assert!(
            stats.cold_entries > 0,
            "Expected some cold entries, got {}",
            stats.cold_entries
        );
    }

    #[test]
    fn test_memory_stats() {
        let adj = ChunkedAdjacency::with_chunk_capacity(8);

        // Add edges
        for i in 0..20 {
            adj.add_edge(NodeId::new(0), NodeId::new(i + 1), EdgeId::new(i));
        }

        adj.compact();

        let stats = adj.memory_stats();
        assert_eq!(stats.total_entries(), 20);
        assert_eq!(stats.node_count, 1);
        assert!(stats.total_bytes() > 0);
    }

    #[test]
    fn test_freeze_all() {
        let adj = ChunkedAdjacency::with_chunk_capacity(8);

        // Add some edges
        for i in 0..30 {
            adj.add_edge(NodeId::new(0), NodeId::new(i + 1), EdgeId::new(i));
        }

        adj.compact();

        // Get initial stats
        let before = adj.memory_stats();

        // Freeze all hot chunks
        adj.freeze_all();

        // Get stats after freeze
        let after = adj.memory_stats();

        // All data should now be in cold storage
        assert_eq!(after.hot_entries, 0);
        assert_eq!(after.cold_entries, before.total_entries());

        // All edges should still be accessible
        let neighbors = adj.neighbors(NodeId::new(0));
        assert_eq!(neighbors.len(), 30);
    }

    #[test]
    fn test_cold_compression_ratio() {
        let adj = ChunkedAdjacency::with_chunk_capacity(8);

        // Add many edges with sequential IDs (compresses well)
        for i in 0..200 {
            adj.add_edge(NodeId::new(0), NodeId::new(100 + i), EdgeId::new(i));
        }

        adj.compact();
        adj.freeze_all();

        let stats = adj.memory_stats();

        // Sequential data should compress well
        let ratio = stats.cold_compression_ratio();
        assert!(
            ratio > 1.5,
            "Expected cold compression ratio > 1.5, got {}",
            ratio
        );
    }

    #[test]
    fn test_deleted_edges_with_cold_storage() {
        let adj = ChunkedAdjacency::with_chunk_capacity(8);

        // Add edges
        for i in 0..50 {
            adj.add_edge(NodeId::new(0), NodeId::new(i + 1), EdgeId::new(i));
        }

        adj.compact();

        // Delete some edges (some will be in cold storage after compaction)
        for i in (0..50).step_by(2) {
            adj.mark_deleted(NodeId::new(0), EdgeId::new(i));
        }

        // Should have half the edges
        let neighbors = adj.neighbors(NodeId::new(0));
        assert_eq!(neighbors.len(), 25);

        // Verify only even-numbered destinations remain: edges with even i
        // were deleted, and destinations are i + 1, so the odd ones are gone.
        for neighbor in neighbors {
            assert!(neighbor.as_u64() % 2 == 0);
        }
    }

    #[test]
    fn test_adjacency_list_memory_size() {
        let mut list = AdjacencyList::new();

        // Add edges
        for i in 0..50 {
            list.add_edge(NodeId::new(i + 1), EdgeId::new(i));
        }

        // Compact with small chunk capacity to get multiple chunks
        list.compact(8);

        let size = list.memory_size();
        assert!(size > 0);

        // Size should be roughly proportional to entry count
        // Each entry is 16 bytes uncompressed
        assert!(size <= 50 * 16 + 200); // Allow some overhead
    }

    #[test]
    fn test_cold_iteration_order() {
        let adj = ChunkedAdjacency::with_chunk_capacity(8);

        // Add edges in order
        for i in 0..50 {
            adj.add_edge(NodeId::new(0), NodeId::new(i + 1), EdgeId::new(i));
        }

        adj.compact();

        // Collect all edges
        let edges = adj.edges_from(NodeId::new(0));

        // All edges should be present
        assert_eq!(edges.len(), 50);

        // Verify edge IDs are present (may be reordered due to compression sorting)
        let edge_ids: std::collections::HashSet<_> = edges.iter().map(|(_, e)| *e).collect();
        for i in 0..50 {
            assert!(edge_ids.contains(&EdgeId::new(i)));
        }
    }

    #[test]
    fn test_in_degree() {
        let adj = ChunkedAdjacency::new();

        // Simulate backward adjacency: edges stored as (dst, src)
        // Edge 1->2: backward stores (2, 1)
        // Edge 3->2: backward stores (2, 3)
        adj.add_edge(NodeId::new(2), NodeId::new(1), EdgeId::new(0)); // 1->2 in backward
        adj.add_edge(NodeId::new(2), NodeId::new(3), EdgeId::new(1)); // 3->2 in backward

        // In-degree of node 2 is 2 (two edges point to it)
        assert_eq!(adj.in_degree(NodeId::new(2)), 2);

        // Node 1 has no incoming edges
        assert_eq!(adj.in_degree(NodeId::new(1)), 0);
    }

    #[test]
    fn test_bidirectional_edges() {
        let forward = ChunkedAdjacency::new();
        let backward = ChunkedAdjacency::new();

        // Add edge: 1 -> 2
        let edge_id = EdgeId::new(100);
        forward.add_edge(NodeId::new(1), NodeId::new(2), edge_id);
        backward.add_edge(NodeId::new(2), NodeId::new(1), edge_id); // Reverse for backward!

        // Forward: edges from node 1 → returns (dst=2, edge_id)
        let forward_edges = forward.edges_from(NodeId::new(1));
        assert_eq!(forward_edges.len(), 1);
        assert_eq!(forward_edges[0], (NodeId::new(2), edge_id));

        // Forward: node 2 has no outgoing edges
        assert_eq!(forward.edges_from(NodeId::new(2)).len(), 0);

        // Backward: edges to node 2 → stored as edges_from(2) → returns (src=1, edge_id)
        let backward_edges = backward.edges_from(NodeId::new(2));
        assert_eq!(backward_edges.len(), 1);
        assert_eq!(backward_edges[0], (NodeId::new(1), edge_id));

        // Backward: node 1 has no incoming edges
        assert_eq!(backward.edges_from(NodeId::new(1)).len(), 0);
    }

    #[test]
    fn test_bidirectional_chain() {
        // Test chain: A -> B -> C
        let forward = ChunkedAdjacency::new();
        let backward = ChunkedAdjacency::new();

        let a = NodeId::new(1);
        let b = NodeId::new(2);
        let c = NodeId::new(3);

        // Edge A -> B
        let edge_ab = EdgeId::new(10);
        forward.add_edge(a, b, edge_ab);
        backward.add_edge(b, a, edge_ab);

        // Edge B -> C
        let edge_bc = EdgeId::new(20);
        forward.add_edge(b, c, edge_bc);
        backward.add_edge(c, b, edge_bc);

        // Forward traversal from A: should reach B
        let from_a = forward.edges_from(a);
        assert_eq!(from_a.len(), 1);
        assert_eq!(from_a[0].0, b);

        // Forward traversal from B: should reach C
        let from_b = forward.edges_from(b);
        assert_eq!(from_b.len(), 1);
        assert_eq!(from_b[0].0, c);

        // Backward traversal to C: should find B
        let to_c = backward.edges_from(c);
        assert_eq!(to_c.len(), 1);
        assert_eq!(to_c[0].0, b);

        // Backward traversal to B: should find A
        let to_b = backward.edges_from(b);
        assert_eq!(to_b.len(), 1);
        assert_eq!(to_b[0].0, a);

        // Node A has no incoming edges
        assert_eq!(backward.edges_from(a).len(), 0);

        // Node C has no outgoing edges
        assert_eq!(forward.edges_from(c).len(), 0);
    }

    // === Skip Index Tests ===

    #[test]
    fn test_contains_edge_basic() {
        let adj = ChunkedAdjacency::new();

        adj.add_edge(NodeId::new(0), NodeId::new(1), EdgeId::new(0));
        adj.add_edge(NodeId::new(0), NodeId::new(2), EdgeId::new(1));
        adj.add_edge(NodeId::new(0), NodeId::new(3), EdgeId::new(2));

        assert!(adj.contains_edge(NodeId::new(0), NodeId::new(1)));
        assert!(adj.contains_edge(NodeId::new(0), NodeId::new(2)));
        assert!(adj.contains_edge(NodeId::new(0), NodeId::new(3)));
        assert!(!adj.contains_edge(NodeId::new(0), NodeId::new(4)));
        assert!(!adj.contains_edge(NodeId::new(1), NodeId::new(0)));
    }

    #[test]
    fn test_contains_edge_after_delete() {
        let adj = ChunkedAdjacency::new();

        adj.add_edge(NodeId::new(0), NodeId::new(1), EdgeId::new(10));
        adj.add_edge(NodeId::new(0), NodeId::new(2), EdgeId::new(20));

        assert!(adj.contains_edge(NodeId::new(0), NodeId::new(1)));

        adj.mark_deleted(NodeId::new(0), EdgeId::new(10));

        assert!(!adj.contains_edge(NodeId::new(0), NodeId::new(1)));
        assert!(adj.contains_edge(NodeId::new(0), NodeId::new(2)));
    }

    #[test]
    fn test_contains_edge_in_cold_storage() {
        let adj = ChunkedAdjacency::with_chunk_capacity(8);

        // Add enough edges to trigger hot→cold compression
        for i in 0..100 {
            adj.add_edge(NodeId::new(0), NodeId::new(100 + i), EdgeId::new(i));
        }

        adj.compact();
        adj.freeze_all();

        // All edges should be findable via skip index
        for i in 0..100 {
            assert!(
                adj.contains_edge(NodeId::new(0), NodeId::new(100 + i)),
                "Should find destination {} in cold storage",
                100 + i
            );
        }

        // Non-existent destinations should not be found
        assert!(!adj.contains_edge(NodeId::new(0), NodeId::new(0)));
        assert!(!adj.contains_edge(NodeId::new(0), NodeId::new(99)));
        assert!(!adj.contains_edge(NodeId::new(0), NodeId::new(200)));
    }

    #[test]
    fn test_contains_edge_in_delta_only() {
        let adj = ChunkedAdjacency::new();

        // Add just a few edges (stays in delta buffer)
        adj.add_edge(NodeId::new(0), NodeId::new(5), EdgeId::new(0));
        adj.add_edge(NodeId::new(0), NodeId::new(10), EdgeId::new(1));

        assert!(adj.contains_edge(NodeId::new(0), NodeId::new(5)));
        assert!(adj.contains_edge(NodeId::new(0), NodeId::new(10)));
        assert!(!adj.contains_edge(NodeId::new(0), NodeId::new(7)));
    }

    #[test]
    fn test_edges_in_range() {
        let adj = ChunkedAdjacency::with_chunk_capacity(8);

        // Add edges with destinations 100..200
        for i in 0..100 {
            adj.add_edge(NodeId::new(0), NodeId::new(100 + i), EdgeId::new(i));
        }

        adj.compact();

        // Range [130, 140] should return 11 edges (130..=140)
        let results = adj.edges_in_range(NodeId::new(0), NodeId::new(130), NodeId::new(140));
        assert_eq!(
            results.len(),
            11,
            "Expected 11 edges in range [130, 140], got {}",
            results.len()
        );

        // Verify all results are in range
        for (dst, _) in &results {
            assert!(dst.as_u64() >= 130 && dst.as_u64() <= 140);
        }

        // Out-of-range query should return empty
        let empty = adj.edges_in_range(NodeId::new(0), NodeId::new(200), NodeId::new(300));
        assert!(empty.is_empty());
    }

    #[test]
    fn test_skip_index_prunes_chunks() {
        let adj = ChunkedAdjacency::with_chunk_capacity(8);

        // Add edges in distinct ranges to create separate cold chunks
        // Range A: destinations 100-107, Range B: 200-207, Range C: 300-307
        for i in 0..8 {
            adj.add_edge(NodeId::new(0), NodeId::new(100 + i), EdgeId::new(i));
        }
        adj.compact();

        for i in 0..8 {
            adj.add_edge(NodeId::new(0), NodeId::new(200 + i), EdgeId::new(100 + i));
        }
        adj.compact();

        for i in 0..8 {
            adj.add_edge(NodeId::new(0), NodeId::new(300 + i), EdgeId::new(200 + i));
        }
        adj.compact();
        adj.freeze_all();

        // contains_edge for each range
        assert!(adj.contains_edge(NodeId::new(0), NodeId::new(103)));
        assert!(adj.contains_edge(NodeId::new(0), NodeId::new(205)));
        assert!(adj.contains_edge(NodeId::new(0), NodeId::new(307)));

        // Values between ranges should not exist
        assert!(!adj.contains_edge(NodeId::new(0), NodeId::new(150)));
        assert!(!adj.contains_edge(NodeId::new(0), NodeId::new(250)));

        // Range query spanning only one chunk range
        let range_b = adj.edges_in_range(NodeId::new(0), NodeId::new(200), NodeId::new(207));
        assert_eq!(range_b.len(), 8);
    }

    #[test]
    fn test_contains_after_freeze_all() {
        let adj = ChunkedAdjacency::with_chunk_capacity(8);

        for i in 0..50 {
            adj.add_edge(NodeId::new(0), NodeId::new(i + 1), EdgeId::new(i));
        }

        adj.compact();
        adj.freeze_all();

        // Verify all edges are in cold storage
        let stats = adj.memory_stats();
        assert_eq!(stats.hot_entries, 0);
        assert_eq!(stats.cold_entries, 50);

        // All edges should still be findable
        for i in 0..50 {
            assert!(
                adj.contains_edge(NodeId::new(0), NodeId::new(i + 1)),
                "Should find destination {} after freeze_all",
                i + 1
            );
        }
    }
}