Skip to main content

uni_store/storage/
adjacency_manager.rs

1// SPDX-License-Identifier: Apache-2.0
2// Copyright 2024-2026 Dragonscale Team
3
4//! Unified adjacency manager orchestrating Main CSR, L0-csr overlay, and Shadow CSR.
5//!
6//! Implements a dual-CSR architecture where:
7//! - **Main CSR**: packed adjacency for all alive edges (one per edge_type + direction)
8//! - **L0-csr overlay**: concurrent insert/delete buffer that survives data flush
9//! - **Shadow CSR**: tracks deleted edges with version ranges for time-travel queries
10//!
11//! Regular queries read Main CSR + overlay with zero version filtering.
12//! Snapshot queries additionally filter by version and resurrect shadow entries.
13
14use crate::storage::adjacency_overlay::{FrozenCsrSegment, L0CsrSegment};
15use crate::storage::csr::MainCsr;
16use crate::storage::direction::Direction;
17use crate::storage::manager::StorageManager;
18use crate::storage::shadow_csr::{ShadowCsr, ShadowEdge};
19use dashmap::DashMap;
20use parking_lot::RwLock;
21use std::collections::{HashMap, HashSet};
22use std::sync::Arc;
23use std::sync::atomic::{AtomicUsize, Ordering};
24use uni_common::core::id::{Eid, Vid};
25
/// Unified adjacency manager for the dual-CSR architecture.
///
/// Orchestrates Main CSR (packed alive edges), L0-csr overlay
/// (in-memory mutations), and Shadow CSR (deleted edges for time-travel).
/// Data flush never invalidates or rebuilds the CSR.
///
/// Reads layer the structures in a fixed order: Main CSR entries first, then
/// every frozen segment (oldest first), then the active overlay. Tombstones
/// found in a layer remove matching edge ids collected from that layer and
/// from every layer before it.
pub struct AdjacencyManager {
    /// Main CSR per `(edge_type, direction)` — all alive edges.
    /// Edge type is u32 with bit 31 = 0 for schema'd, 1 for schemaless.
    /// Each CSR sits behind an `Arc`, and installing a new one replaces the
    /// map entry wholesale.
    main_csr: DashMap<(u32, Direction), Arc<MainCsr>>,

    /// Active L0-csr segment (current writes go here).
    /// Inserts and tombstones are recorded through a *read* guard; the write
    /// guard is only taken by `compact()` to swap the segment out.
    active_overlay: Arc<RwLock<L0CsrSegment>>,

    /// Frozen segments awaiting compaction (oldest first).
    /// `compact()` appends the frozen active overlay here and empties the
    /// list once the rebuilt Main CSRs are installed.
    frozen_segments: RwLock<Vec<Arc<FrozenCsrSegment>>>,

    /// Shadow CSR for time-travel deleted edge tracking.
    /// Populated by `compact()` (overlay tombstones) and `warm()` (L1 delete rows).
    shadow: ShadowCsr,

    /// Current approximate memory usage in bytes.
    /// Only Main CSR sizes (`MainCsr::memory_usage`) are counted; overlay and
    /// shadow memory are not tracked by the code in this file section.
    current_bytes: AtomicUsize,

    /// Maximum memory budget in bytes.
    /// Stored and exposed via `max_bytes()`; nothing visible in this file
    /// section compares `current_bytes` against it.
    max_bytes: usize,

    /// Coalescing locks for warm() operations — prevents cache stampede.
    /// Key: (edge_type_id, Direction), Value: Mutex guard for that warm operation.
    /// Entries are created lazily by `warm_coalesced()` and are never removed.
    warm_guards: DashMap<(u32, Direction), Arc<tokio::sync::Mutex<()>>>,
}
55
56impl AdjacencyManager {
57    /// Creates a new adjacency manager with the given memory budget.
58    pub fn new(max_bytes: usize) -> Self {
59        Self {
60            main_csr: DashMap::new(),
61            active_overlay: Arc::new(RwLock::new(L0CsrSegment::new())),
62            frozen_segments: RwLock::new(Vec::new()),
63            shadow: ShadowCsr::new(),
64            current_bytes: AtomicUsize::new(0),
65            max_bytes,
66            warm_guards: DashMap::new(),
67        }
68    }
69
70    /// Returns neighbors for the current state (hot path, no version filtering).
71    ///
72    /// Reads Main CSR + frozen segments + active overlay, minus tombstones.
73    /// Tombstones from any layer remove edges from all lower layers.
74    pub fn get_neighbors(&self, vid: Vid, edge_type: u32, direction: Direction) -> Vec<(Vid, Eid)> {
75        let mut result: HashMap<Eid, Vid> = HashMap::new();
76
77        for &dir in direction.expand() {
78            // 1. Main CSR
79            if let Some(csr) = self.main_csr.get(&(edge_type, dir)) {
80                for entry in csr.get_entries(vid) {
81                    result.insert(entry.eid, entry.neighbor_vid);
82                }
83            }
84
85            // 2. Frozen segments (oldest first) — add inserts, then remove tombstones
86            for segment in self.frozen_segments.read().iter() {
87                if let Some(adj) = segment.inserts.get(&(edge_type, dir))
88                    && let Some(neighbors) = adj.get(&vid)
89                {
90                    for &(neighbor, eid, _version) in neighbors {
91                        result.insert(eid, neighbor);
92                    }
93                }
94                // Apply tombstones against ALL prior results (Main CSR + older segments)
95                result.retain(|eid, _| !segment.tombstones.contains_key(eid));
96            }
97
98            // 3. Active overlay — add inserts, then remove tombstones
99            let active = self.active_overlay.read();
100            if let Some(adj) = active.inserts.get(&(edge_type, dir))
101                && let Some(neighbors) = adj.get(&vid)
102            {
103                for &(neighbor, eid, _version) in neighbors {
104                    result.insert(eid, neighbor);
105                }
106            }
107            // Apply active overlay tombstones against ALL prior results
108            result.retain(|eid, _| !active.tombstones.contains_key(eid));
109        }
110
111        result.into_iter().map(|(e, n)| (n, e)).collect()
112    }
113
114    /// Returns neighbors visible at a specific snapshot version.
115    ///
116    /// Filters Main CSR entries by `created_version`, applies frozen/active
117    /// overlay with version filtering, and resurrects Shadow CSR entries
118    /// that were alive at the given version.
119    pub fn get_neighbors_at_version(
120        &self,
121        vid: Vid,
122        edge_type: u32,
123        direction: Direction,
124        version: u64,
125    ) -> Vec<(Vid, Eid)> {
126        let mut result: HashMap<Eid, Vid> = HashMap::new();
127
128        for &dir in direction.expand() {
129            // 1. Main CSR — filter by created_version
130            if let Some(csr) = self.main_csr.get(&(edge_type, dir)) {
131                for entry in csr.get_entries(vid) {
132                    if entry.created_version <= version {
133                        result.insert(entry.eid, entry.neighbor_vid);
134                    }
135                }
136            }
137
138            // 2. Frozen segments — filter inserts by version, apply tombstones
139            for segment in self.frozen_segments.read().iter() {
140                if let Some(adj) = segment.inserts.get(&(edge_type, dir))
141                    && let Some(neighbors) = adj.get(&vid)
142                {
143                    for &(neighbor, eid, ver) in neighbors {
144                        if ver <= version {
145                            result.insert(eid, neighbor);
146                        }
147                    }
148                }
149                result.retain(|eid, _| {
150                    segment
151                        .tombstones
152                        .get(eid)
153                        .is_none_or(|ts| ts.version > version)
154                });
155            }
156
157            // 3. Active overlay — add version-filtered inserts, then apply tombstones
158            let active = self.active_overlay.read();
159            if let Some(adj) = active.inserts.get(&(edge_type, dir))
160                && let Some(neighbors) = adj.get(&vid)
161            {
162                for &(neighbor, eid, ver) in neighbors {
163                    let not_tombstoned = active
164                        .tombstones
165                        .get(&eid)
166                        .is_none_or(|ts| ts.version > version);
167                    if ver <= version && not_tombstoned {
168                        result.insert(eid, neighbor);
169                    }
170                }
171            }
172            // Apply active overlay tombstones against ALL prior results
173            result.retain(|eid, _| {
174                active
175                    .tombstones
176                    .get(eid)
177                    .is_none_or(|ts| ts.version > version)
178            });
179
180            // 4. Shadow CSR — resurrect edges alive at version
181            for (neighbor, eid) in self
182                .shadow
183                .get_entries_at_version(vid, edge_type, dir, version)
184            {
185                result.insert(eid, neighbor);
186            }
187        }
188
189        result.into_iter().map(|(e, n)| (n, e)).collect()
190    }
191
192    /// Records an edge insertion into the L0-csr overlay (both directions).
193    pub fn insert_edge(&self, src: Vid, dst: Vid, eid: Eid, edge_type: u32, version: u64) {
194        let active = self.active_overlay.read();
195        active.insert_edge(src, dst, eid, edge_type, version, Direction::Outgoing);
196        active.insert_edge(dst, src, eid, edge_type, version, Direction::Incoming);
197    }
198
199    /// Records a tombstone for a deleted edge in the L0-csr overlay.
200    pub fn add_tombstone(&self, eid: Eid, src: Vid, dst: Vid, edge_type: u32, version: u64) {
201        let active = self.active_overlay.read();
202        active.add_tombstone(eid, src, dst, edge_type, version);
203    }
204
205    /// Sets the Main CSR for a specific edge type and direction.
206    ///
207    /// Used by `warm()` to install a freshly built CSR from storage.
208    pub fn set_main_csr(&self, edge_type: u32, direction: Direction, csr: MainCsr) {
209        let size = csr.memory_usage();
210        self.main_csr.insert((edge_type, direction), Arc::new(csr));
211        self.current_bytes.fetch_add(size, Ordering::Relaxed);
212    }
213
214    /// Checks whether a Main CSR exists for the given edge type and direction.
215    pub fn has_csr(&self, edge_type: u32, direction: Direction) -> bool {
216        self.main_csr.contains_key(&(edge_type, direction))
217    }
218
219    /// Checks whether this manager has been activated for the given edge type.
220    ///
221    /// Returns `true` if a Main CSR exists or the overlay has entries for
222    /// this edge type and direction.
223    pub fn is_active_for(&self, edge_type: u32, direction: Direction) -> bool {
224        let active = self.active_overlay.read();
225        direction.expand().iter().any(|&d| {
226            self.main_csr.contains_key(&(edge_type, d)) || active.has_entries_for(edge_type, d)
227        })
228    }
229
230    /// Returns the number of frozen segments awaiting compaction.
231    pub fn frozen_segment_count(&self) -> usize {
232        self.frozen_segments.read().len()
233    }
234
235    /// Returns whether compaction should be triggered based on segment count.
236    pub fn should_compact(&self, threshold: usize) -> bool {
237        self.frozen_segment_count() >= threshold
238    }
239
240    /// Compacts frozen overlay segments into the Main CSR.
241    ///
242    /// Freezes the active overlay, merges all frozen segments with the
243    /// existing Main CSR, moves tombstoned edges to Shadow CSR, and
244    /// atomically swaps in the new Main CSR.
245    ///
246    /// CRITICAL: Frozen segments remain readable until the new CSR is installed,
247    /// eliminating the visibility gap where edges would be invisible.
248    pub fn compact(&self) {
249        // Step 1: Freeze active overlay and push to frozen list
250        let frozen = {
251            let mut active = self.active_overlay.write();
252            let old = std::mem::take(&mut *active);
253            Arc::new(old.freeze())
254        };
255        self.frozen_segments.write().push(frozen);
256
257        // Step 2: CLONE frozen segments for building (DON'T drain yet)
258        // This ensures they remain readable during CSR construction
259        let segments = self.frozen_segments.read().clone();
260
261        // Step 3: Collect all (edge_type, direction) keys from segments + existing CSRs
262        let mut all_keys: HashSet<(u32, Direction)> = HashSet::new();
263        for segment in &segments {
264            for key in segment.inserts.keys() {
265                all_keys.insert(*key);
266            }
267        }
268        for entry in self.main_csr.iter() {
269            all_keys.insert(*entry.key());
270        }
271
272        // Step 4: For each key, merge
273        for (edge_type, direction) in all_keys {
274            let mut entries: Vec<(u64, Vid, Eid, u64)> = Vec::new();
275            let mut max_offset: u64 = 0;
276
277            // Collect all tombstone EIDs
278            let mut tombstoned_eids: HashSet<Eid> = HashSet::new();
279            for segment in &segments {
280                for (eid, ts) in &segment.tombstones {
281                    if ts.edge_type == edge_type {
282                        tombstoned_eids.insert(*eid);
283
284                        // Move to shadow CSR
285                        self.shadow.add_deleted_edge(
286                            ts.src_vid,
287                            ShadowEdge {
288                                neighbor_vid: ts.dst_vid,
289                                eid: *eid,
290                                edge_type,
291                                created_version: 0, // unknown; overlay tombstones don't track creation version
292                                deleted_version: ts.version,
293                            },
294                            direction,
295                        );
296                    }
297                }
298            }
299
300            // Add entries from old Main CSR
301            if let Some(old_csr) = self.main_csr.get(&(edge_type, direction)) {
302                for vid_offset in 0..old_csr.num_vertices() {
303                    let vid = Vid::new(vid_offset as u64);
304                    for entry in old_csr.get_entries(vid) {
305                        if !tombstoned_eids.contains(&entry.eid) {
306                            entries.push((
307                                vid_offset as u64,
308                                entry.neighbor_vid,
309                                entry.eid,
310                                entry.created_version,
311                            ));
312                            max_offset = max_offset.max(vid_offset as u64);
313                        }
314                    }
315                }
316            }
317
318            // Overlay frozen segments (oldest first)
319            for segment in &segments {
320                if let Some(adj) = segment.inserts.get(&(edge_type, direction)) {
321                    for (vid, neighbors) in adj {
322                        for &(neighbor, eid, version) in neighbors {
323                            if !tombstoned_eids.contains(&eid) {
324                                let offset = vid.as_u64();
325                                entries.push((offset, neighbor, eid, version));
326                                max_offset = max_offset.max(offset);
327                            }
328                        }
329                    }
330                }
331            }
332
333            // Deduplicate by Eid — keep entry with highest version for each Eid
334            {
335                use std::collections::hash_map::Entry;
336
337                let mut best: HashMap<Eid, usize> = HashMap::new();
338                for (idx, (_, _, eid, ver)) in entries.iter().enumerate() {
339                    match best.entry(*eid) {
340                        Entry::Vacant(e) => {
341                            e.insert(idx);
342                        }
343                        Entry::Occupied(mut e) => {
344                            if *ver > entries[*e.get()].3 {
345                                e.insert(idx);
346                            }
347                        }
348                    }
349                }
350                let keep: HashSet<usize> = best.into_values().collect();
351                let mut idx = 0;
352                entries.retain(|_| {
353                    let k = keep.contains(&idx);
354                    idx += 1;
355                    k
356                });
357            }
358
359            // Build new Main CSR and install
360            let new_csr = MainCsr::from_edge_entries(max_offset as usize, entries);
361            let size = new_csr.memory_usage();
362
363            // Remove old size, add new
364            if let Some(old) = self.main_csr.get(&(edge_type, direction)) {
365                self.current_bytes
366                    .fetch_sub(old.memory_usage(), Ordering::Relaxed);
367            }
368
369            self.main_csr
370                .insert((edge_type, direction), Arc::new(new_csr));
371            self.current_bytes.fetch_add(size, Ordering::Relaxed);
372        }
373
374        // Step 5: ONLY NOW clear frozen segments and reset active overlay
375        // New CSR contains all their data, so they're safe to discard
376        self.frozen_segments.write().clear();
377    }
378
379    /// Warms the Main CSR from storage (L2 adjacency + L1 delta) for a specific edge type and direction.
380    ///
381    /// Reads L2 adjacency datasets and L1 delta entries from Lance,
382    /// builds a [`MainCsr`] with version metadata, and populates the
383    /// [`ShadowCsr`] with L1 tombstones. Called once at startup or
384    /// lazily on first access per edge type.
385    pub async fn warm(
386        &self,
387        storage: &StorageManager,
388        edge_type_id: u32,
389        direction: Direction,
390        version: Option<u64>,
391    ) -> anyhow::Result<()> {
392        let schema = storage.schema_manager().schema();
393
394        // Use unified lookup to support both schema'd and schemaless edge types
395        let edge_type_name = schema
396            .edge_type_name_by_id_unified(edge_type_id)
397            .ok_or_else(|| anyhow::anyhow!("Edge type {} not found", edge_type_id))?;
398
399        // Determine which labels to load adjacency for based on edge type metadata
400        let labels_to_load: Vec<String> = {
401            let edge_meta = schema.edge_types.get(&edge_type_name);
402            match (direction, edge_meta) {
403                (Direction::Outgoing, Some(meta)) => meta.src_labels.clone(),
404                (Direction::Incoming, Some(meta)) => meta.dst_labels.clone(),
405                (Direction::Both, Some(meta)) => {
406                    let mut labels = meta.src_labels.clone();
407                    labels.extend(meta.dst_labels.iter().cloned());
408                    labels.sort();
409                    labels.dedup();
410                    labels
411                }
412                _ => Vec::new(),
413            }
414        };
415
416        use arrow_array::{ListArray, UInt8Array, UInt64Array};
417
418        let mut entries: Vec<(u64, Vid, Eid, u64)> = Vec::new();
419        let mut deleted_eids = HashSet::new();
420
421        for &read_dir in direction.expand() {
422            let dir_str = read_dir.as_str();
423            for label_name in &labels_to_load {
424                // 1. Read L2 (Adjacency Dataset)
425                let adj_ds = storage.adjacency_dataset(&edge_type_name, label_name, dir_str);
426                let backend = storage.backend();
427
428                if let Ok(adj_ds) = adj_ds {
429                    let adj_table_name = adj_ds.table_name();
430                    let adj_exists = backend.table_exists(&adj_table_name).await.unwrap_or(false);
431
432                    if adj_exists {
433                        let mut request = crate::backend::types::ScanRequest::all(&adj_table_name);
434                        if let Some(hwm) = version {
435                            request = request.with_filter(format!("_version <= {}", hwm));
436                        }
437
438                        let batches: Vec<arrow_array::RecordBatch> =
439                            backend.scan(request).await.unwrap_or_default();
440
441                        for batch in batches {
442                            let src_col = batch
443                                .column_by_name("src_vid")
444                                .unwrap()
445                                .as_any()
446                                .downcast_ref::<UInt64Array>()
447                                .unwrap();
448                            let neighbors_list = batch
449                                .column_by_name("neighbors")
450                                .unwrap()
451                                .as_any()
452                                .downcast_ref::<ListArray>()
453                                .unwrap();
454                            let eids_list = batch
455                                .column_by_name("edge_ids")
456                                .unwrap()
457                                .as_any()
458                                .downcast_ref::<ListArray>()
459                                .unwrap();
460
461                            for i in 0..batch.num_rows() {
462                                let src_offset = src_col.value(i);
463                                let neighbors_array_ref = neighbors_list.value(i);
464                                let neighbors = neighbors_array_ref
465                                    .as_any()
466                                    .downcast_ref::<UInt64Array>()
467                                    .unwrap();
468                                let eids_array_ref = eids_list.value(i);
469                                let eids = eids_array_ref
470                                    .as_any()
471                                    .downcast_ref::<UInt64Array>()
472                                    .unwrap();
473
474                                for j in 0..neighbors.len() {
475                                    // L2 adjacency rows don't carry per-edge _version.
476                                    // Version 0 means "from base storage" — the `_version <= hwm` filter on
477                                    // the query already ensures we only load rows within the snapshot window.
478                                    // At query time, get_neighbors_at_version() uses created_version to filter,
479                                    // so version=0 edges are always visible (which is correct for compacted L2 data).
480                                    entries.push((
481                                        src_offset,
482                                        Vid::from(neighbors.value(j)),
483                                        Eid::from(eids.value(j)),
484                                        0,
485                                    ));
486                                }
487                            }
488                        }
489                    }
490                }
491            }
492
493            // 2. Read L1 (Delta)
494            let delta_ds = storage.delta_dataset(&edge_type_name, dir_str)?;
495            let backend = storage.backend();
496            let delta_table_name = delta_ds.table_name();
497
498            if backend
499                .table_exists(&delta_table_name)
500                .await
501                .unwrap_or(false)
502            {
503                let mut request = crate::backend::types::ScanRequest::all(&delta_table_name);
504                if let Some(hwm) = version {
505                    request = request.with_filter(format!("_version <= {}", hwm));
506                }
507
508                if let Ok(batches) = backend.scan(request).await {
509                    for batch in batches {
510                        let src_col = batch
511                            .column_by_name("src_vid")
512                            .unwrap()
513                            .as_any()
514                            .downcast_ref::<UInt64Array>()
515                            .unwrap();
516                        let dst_col = batch
517                            .column_by_name("dst_vid")
518                            .unwrap()
519                            .as_any()
520                            .downcast_ref::<UInt64Array>()
521                            .unwrap();
522                        let eid_col = batch
523                            .column_by_name("eid")
524                            .unwrap()
525                            .as_any()
526                            .downcast_ref::<UInt64Array>()
527                            .unwrap();
528                        let op_col = batch
529                            .column_by_name("op")
530                            .unwrap()
531                            .as_any()
532                            .downcast_ref::<UInt8Array>()
533                            .unwrap();
534
535                        // Optionally read _version column
536                        let version_col = batch
537                            .column_by_name("_version")
538                            .and_then(|c| c.as_any().downcast_ref::<UInt64Array>().cloned());
539
540                        for i in 0..batch.num_rows() {
541                            let src_vid = Vid::from(src_col.value(i));
542                            let dst_vid = Vid::from(dst_col.value(i));
543                            let eid = Eid::from(eid_col.value(i));
544                            let op = op_col.value(i); // 0=Insert, 1=Delete
545                            let row_version = version_col.as_ref().map_or(0, |vc| vc.value(i));
546
547                            // For incoming edges, the CSR key is dst (the vertex
548                            // receiving the edge) and the neighbor is src.
549                            let is_incoming = read_dir == Direction::Incoming;
550                            let (key_vid, neighbor_vid) = if is_incoming {
551                                (dst_vid, src_vid)
552                            } else {
553                                (src_vid, dst_vid)
554                            };
555
556                            if op == 0 {
557                                entries.push((key_vid.as_u64(), neighbor_vid, eid, row_version));
558                            } else {
559                                deleted_eids.insert(eid);
560                                self.shadow.add_deleted_edge(
561                                    key_vid,
562                                    ShadowEdge {
563                                        neighbor_vid,
564                                        eid,
565                                        edge_type: edge_type_id,
566                                        created_version: 0,
567                                        deleted_version: row_version,
568                                    },
569                                    read_dir,
570                                );
571                            }
572                        }
573                    }
574                }
575            }
576        }
577
578        // Filter out deleted edges
579        if !deleted_eids.is_empty() {
580            entries.retain(|(_, _, eid, _)| !deleted_eids.contains(eid));
581        }
582
583        // Deduplicate by Eid — keep entry with highest version for each Eid
584        // Multiple versions of the same edge can exist in L2+L1 or across L1 runs
585        {
586            use std::collections::hash_map::Entry;
587            use std::collections::{HashMap, HashSet};
588
589            let mut best: HashMap<Eid, usize> = HashMap::new();
590            for (idx, (_, _, eid, ver)) in entries.iter().enumerate() {
591                match best.entry(*eid) {
592                    Entry::Vacant(e) => {
593                        e.insert(idx);
594                    }
595                    Entry::Occupied(mut e) => {
596                        if *ver > entries[*e.get()].3 {
597                            e.insert(idx);
598                        }
599                    }
600                }
601            }
602            let keep: HashSet<usize> = best.into_values().collect();
603            let mut idx = 0;
604            entries.retain(|_| {
605                let k = keep.contains(&idx);
606                idx += 1;
607                k
608            });
609        }
610
611        // Build MainCsr
612        let max_offset = entries.iter().map(|(o, _, _, _)| *o).max().unwrap_or(0);
613        let csr = MainCsr::from_edge_entries(max_offset as usize, entries);
614        self.set_main_csr(edge_type_id, direction, csr);
615
616        Ok(())
617    }
618
619    /// Coalesced warm() operation to prevent cache stampede (Issue #13).
620    ///
621    /// Uses double-checked locking: fast-path checks if CSR already loaded,
622    /// then acquires per-(edge_type, direction) lock to ensure only one concurrent
623    /// warm() per adjacency key. Other readers wait for the first warm() to complete.
624    pub async fn warm_coalesced(
625        &self,
626        storage: &StorageManager,
627        edge_type_id: u32,
628        direction: Direction,
629        version: Option<u64>,
630    ) -> anyhow::Result<()> {
631        // Fast path: already loaded
632        if self.has_csr(edge_type_id, direction) {
633            return Ok(());
634        }
635
636        // Coalesce: only one concurrent warm per (type, dir)
637        let guard = self
638            .warm_guards
639            .entry((edge_type_id, direction))
640            .or_insert_with(|| Arc::new(tokio::sync::Mutex::new(())))
641            .value()
642            .clone();
643        let _lock = guard.lock().await;
644
645        // Double-check after acquiring lock
646        if self.has_csr(edge_type_id, direction) {
647            return Ok(());
648        }
649
650        self.warm(storage, edge_type_id, direction, version).await
651    }
652
653    /// Returns the current approximate memory usage in bytes.
654    pub fn memory_usage(&self) -> usize {
655        self.current_bytes.load(Ordering::Relaxed)
656    }
657
658    /// Returns the maximum memory budget in bytes.
659    pub fn max_bytes(&self) -> usize {
660        self.max_bytes
661    }
662
663    /// Provides access to the shadow CSR for time-travel queries.
664    pub fn shadow(&self) -> &ShadowCsr {
665        &self.shadow
666    }
667}
668
669impl std::fmt::Debug for AdjacencyManager {
670    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
671        f.debug_struct("AdjacencyManager")
672            .field("main_csr_count", &self.main_csr.len())
673            .field("frozen_segments", &self.frozen_segments.read().len())
674            .field("current_bytes", &self.current_bytes.load(Ordering::Relaxed))
675            .field("max_bytes", &self.max_bytes)
676            .finish()
677    }
678}
679
680#[cfg(test)]
681mod tests {
682    use super::*;
683
684    #[test]
685    fn test_insert_and_get_neighbors() {
686        let am = AdjacencyManager::new(1024 * 1024);
687        let src = Vid::new(1);
688        let dst = Vid::new(2);
689        let eid = Eid::new(100);
690
691        am.insert_edge(src, dst, eid, 1, 1);
692
693        let neighbors = am.get_neighbors(src, 1, Direction::Outgoing);
694        assert_eq!(neighbors.len(), 1);
695        assert_eq!(neighbors[0], (dst, eid));
696
697        // Incoming direction
698        let incoming = am.get_neighbors(dst, 1, Direction::Incoming);
699        assert_eq!(incoming.len(), 1);
700        assert_eq!(incoming[0], (src, eid));
701    }
702
703    #[test]
704    fn test_main_csr_lookup() {
705        let am = AdjacencyManager::new(1024 * 1024);
706
707        let csr = MainCsr::from_edge_entries(
708            1,
709            vec![
710                (0, Vid::new(10), Eid::new(100), 1),
711                (1, Vid::new(20), Eid::new(101), 2),
712            ],
713        );
714        am.set_main_csr(1, Direction::Outgoing, csr);
715
716        let n = am.get_neighbors(Vid::new(0), 1, Direction::Outgoing);
717        assert_eq!(n.len(), 1);
718        assert_eq!(n[0], (Vid::new(10), Eid::new(100)));
719    }
720
721    #[test]
722    fn test_overlay_on_top_of_main_csr() {
723        let am = AdjacencyManager::new(1024 * 1024);
724
725        // Main CSR has one edge
726        let csr = MainCsr::from_edge_entries(0, vec![(0, Vid::new(10), Eid::new(100), 1)]);
727        am.set_main_csr(1, Direction::Outgoing, csr);
728
729        // Overlay adds another
730        am.insert_edge(Vid::new(0), Vid::new(20), Eid::new(101), 1, 2);
731
732        let n = am.get_neighbors(Vid::new(0), 1, Direction::Outgoing);
733        assert_eq!(n.len(), 2);
734
735        let eids: HashSet<Eid> = n.iter().map(|(_, e)| *e).collect();
736        assert!(eids.contains(&Eid::new(100)));
737        assert!(eids.contains(&Eid::new(101)));
738    }
739
740    #[test]
741    fn test_tombstone_removes_edge() {
742        let am = AdjacencyManager::new(1024 * 1024);
743
744        am.insert_edge(Vid::new(0), Vid::new(10), Eid::new(100), 1, 1);
745        am.add_tombstone(Eid::new(100), Vid::new(0), Vid::new(10), 1, 2);
746
747        let n = am.get_neighbors(Vid::new(0), 1, Direction::Outgoing);
748        assert!(n.is_empty());
749    }
750
751    #[test]
752    fn test_version_filtered_query() {
753        let am = AdjacencyManager::new(1024 * 1024);
754
755        // Main CSR with two edges at different versions
756        let csr = MainCsr::from_edge_entries(
757            0,
758            vec![
759                (0, Vid::new(10), Eid::new(100), 1),
760                (0, Vid::new(20), Eid::new(101), 5),
761            ],
762        );
763        am.set_main_csr(1, Direction::Outgoing, csr);
764
765        // At version 3: only first edge visible
766        let n = am.get_neighbors_at_version(Vid::new(0), 1, Direction::Outgoing, 3);
767        assert_eq!(n.len(), 1);
768        assert_eq!(n[0], (Vid::new(10), Eid::new(100)));
769
770        // At version 5: both visible
771        let n = am.get_neighbors_at_version(Vid::new(0), 1, Direction::Outgoing, 5);
772        assert_eq!(n.len(), 2);
773    }
774
775    #[test]
776    fn test_shadow_csr_resurrects_deleted_edges() {
777        let am = AdjacencyManager::new(1024 * 1024);
778
779        // Add a deleted edge to shadow: created at v1, deleted at v5
780        am.shadow().add_deleted_edge(
781            Vid::new(0),
782            ShadowEdge {
783                neighbor_vid: Vid::new(10),
784                eid: Eid::new(100),
785                edge_type: 1,
786                created_version: 1,
787                deleted_version: 5,
788            },
789            Direction::Outgoing,
790        );
791
792        // At version 3: shadow edge should be visible
793        let n = am.get_neighbors_at_version(Vid::new(0), 1, Direction::Outgoing, 3);
794        assert_eq!(n.len(), 1);
795        assert_eq!(n[0], (Vid::new(10), Eid::new(100)));
796
797        // At version 5: deleted, not visible
798        let n = am.get_neighbors_at_version(Vid::new(0), 1, Direction::Outgoing, 5);
799        assert!(n.is_empty());
800    }
801
802    #[test]
803    fn test_compact_merges_into_main_csr() {
804        let am = AdjacencyManager::new(1024 * 1024);
805
806        // Insert edges into overlay
807        am.insert_edge(Vid::new(0), Vid::new(10), Eid::new(100), 1, 1);
808        am.insert_edge(Vid::new(0), Vid::new(20), Eid::new(101), 1, 2);
809
810        // Compact: overlay → Main CSR
811        am.compact();
812
813        // Frozen segments should be empty after compaction
814        assert_eq!(am.frozen_segment_count(), 0);
815
816        // Edges should still be accessible via Main CSR
817        let n = am.get_neighbors(Vid::new(0), 1, Direction::Outgoing);
818        assert_eq!(n.len(), 2);
819
820        assert!(am.has_csr(1, Direction::Outgoing));
821    }
822
823    #[test]
824    fn test_compact_removes_tombstoned_edges() {
825        let am = AdjacencyManager::new(1024 * 1024);
826
827        // Set up Main CSR with one edge
828        let csr = MainCsr::from_edge_entries(0, vec![(0, Vid::new(10), Eid::new(100), 1)]);
829        am.set_main_csr(1, Direction::Outgoing, csr);
830
831        // Add new edge + tombstone for old edge in overlay
832        am.insert_edge(Vid::new(0), Vid::new(20), Eid::new(101), 1, 2);
833        am.add_tombstone(Eid::new(100), Vid::new(0), Vid::new(10), 1, 3);
834
835        am.compact();
836
837        // Only the new edge should remain
838        let n = am.get_neighbors(Vid::new(0), 1, Direction::Outgoing);
839        assert_eq!(n.len(), 1);
840        assert_eq!(n[0], (Vid::new(20), Eid::new(101)));
841    }
842
843    #[test]
844    fn test_should_compact() {
845        let am = AdjacencyManager::new(1024 * 1024);
846        assert!(!am.should_compact(4));
847
848        // Manually freeze the active overlay multiple times
849        for _ in 0..4 {
850            let frozen = {
851                let mut active = am.active_overlay.write();
852                let old = std::mem::take(&mut *active);
853                Arc::new(old.freeze())
854            };
855            am.frozen_segments.write().push(frozen);
856        }
857
858        assert!(am.should_compact(4));
859    }
860
861    #[test]
862    fn test_empty_manager() {
863        let am = AdjacencyManager::new(1024 * 1024);
864        assert!(
865            am.get_neighbors(Vid::new(0), 1, Direction::Outgoing)
866                .is_empty()
867        );
868        assert!(!am.has_csr(1, Direction::Outgoing));
869    }
870
871    #[test]
872    fn test_overlay_tombstone_removes_main_csr_edge() {
873        // Simulates: insert edge → flush/compact into Main CSR → delete edge (tombstone in overlay)
874        let am = AdjacencyManager::new(1024 * 1024);
875
876        // Edge already compacted into Main CSR
877        let csr = MainCsr::from_edge_entries(0, vec![(0, Vid::new(10), Eid::new(100), 1)]);
878        am.set_main_csr(1, Direction::Outgoing, csr);
879
880        // Verify edge is visible before deletion
881        let n = am.get_neighbors(Vid::new(0), 1, Direction::Outgoing);
882        assert_eq!(n.len(), 1);
883
884        // Delete via overlay tombstone (simulates Writer::delete_edge dual-write)
885        am.add_tombstone(Eid::new(100), Vid::new(0), Vid::new(10), 1, 2);
886
887        // Tombstone in overlay must remove edge from Main CSR results
888        let n = am.get_neighbors(Vid::new(0), 1, Direction::Outgoing);
889        assert!(
890            n.is_empty(),
891            "Edge should be removed by overlay tombstone, got {:?}",
892            n
893        );
894    }
895
896    #[test]
897    fn test_overlay_tombstone_removes_main_csr_edge_versioned() {
898        // Same scenario but via get_neighbors_at_version
899        let am = AdjacencyManager::new(1024 * 1024);
900
901        let csr = MainCsr::from_edge_entries(0, vec![(0, Vid::new(10), Eid::new(100), 1)]);
902        am.set_main_csr(1, Direction::Outgoing, csr);
903
904        am.add_tombstone(Eid::new(100), Vid::new(0), Vid::new(10), 1, 5);
905
906        // At version 3: edge created at v1, tombstone at v5 → visible
907        let n = am.get_neighbors_at_version(Vid::new(0), 1, Direction::Outgoing, 3);
908        assert_eq!(n.len(), 1);
909
910        // At version 5: tombstone applies → not visible
911        let n = am.get_neighbors_at_version(Vid::new(0), 1, Direction::Outgoing, 5);
912        assert!(
913            n.is_empty(),
914            "Edge should be removed by overlay tombstone at version 5"
915        );
916    }
917
918    #[test]
919    fn test_frozen_tombstone_removes_main_csr_edge() {
920        // Edge in Main CSR, tombstone in a frozen segment
921        let am = AdjacencyManager::new(1024 * 1024);
922
923        let csr = MainCsr::from_edge_entries(0, vec![(0, Vid::new(10), Eid::new(100), 1)]);
924        am.set_main_csr(1, Direction::Outgoing, csr);
925
926        // Add tombstone to active overlay, then compact to freeze it
927        am.add_tombstone(Eid::new(100), Vid::new(0), Vid::new(10), 1, 2);
928
929        // Freeze the overlay manually
930        {
931            let mut active = am.active_overlay.write();
932            let old = std::mem::take(&mut *active);
933            let frozen = std::sync::Arc::new(old.freeze());
934            am.frozen_segments.write().push(frozen);
935        }
936
937        // The frozen segment's tombstone should remove the Main CSR edge
938        let n = am.get_neighbors(Vid::new(0), 1, Direction::Outgoing);
939        assert!(n.is_empty(), "Frozen tombstone should remove Main CSR edge");
940    }
941
942    #[test]
943    fn test_per_edge_version_filtering() {
944        // Test that edges inserted at different versions are correctly filtered
945        // by get_neighbors_at_version()
946        let am = AdjacencyManager::new(1024 * 1024);
947
948        let src = Vid::new(0);
949        let dst_a = Vid::new(10);
950        let dst_b = Vid::new(20);
951        let eid_a = Eid::new(100);
952        let eid_b = Eid::new(200);
953        let etype = 1;
954
955        // Insert edge A at version 3
956        am.insert_edge(src, dst_a, eid_a, etype, 3);
957
958        // Insert edge B at version 7
959        am.insert_edge(src, dst_b, eid_b, etype, 7);
960
961        // Query at version 2 → neither edge visible
962        let neighbors_v2 = am.get_neighbors_at_version(src, etype, Direction::Outgoing, 2);
963        assert!(
964            neighbors_v2.is_empty(),
965            "No edges should be visible at version 2"
966        );
967
968        // Query at version 5 → only edge A visible
969        let neighbors_v5 = am.get_neighbors_at_version(src, etype, Direction::Outgoing, 5);
970        assert_eq!(
971            neighbors_v5.len(),
972            1,
973            "Only edge A should be visible at version 5"
974        );
975        assert_eq!(neighbors_v5[0].0, dst_a, "Edge A destination should match");
976        assert_eq!(neighbors_v5[0].1, eid_a, "Edge A ID should match");
977
978        // Query at version 7 → both edges visible
979        let neighbors_v7 = am.get_neighbors_at_version(src, etype, Direction::Outgoing, 7);
980        assert_eq!(
981            neighbors_v7.len(),
982            2,
983            "Both edges should be visible at version 7"
984        );
985
986        // Query at version 10 → both edges visible
987        let neighbors_v10 = am.get_neighbors_at_version(src, etype, Direction::Outgoing, 10);
988        assert_eq!(
989            neighbors_v10.len(),
990            2,
991            "Both edges should be visible at version 10"
992        );
993    }
994
995    #[test]
996    fn test_duplicate_edges_deduplicated_by_eid() {
997        // Test Issue #41: Same Eid in MainCsr (v1) and overlay (v3) → only 1 result from get_neighbors
998        let am = AdjacencyManager::new(1024 * 1024);
999
1000        let src = Vid::new(0);
1001        let dst = Vid::new(10);
1002        let eid = Eid::new(100);
1003        let etype = 1;
1004
1005        // Set up Main CSR with edge at version 1
1006        let csr = MainCsr::from_edge_entries(0, vec![(0, dst, eid, 1)]);
1007        am.set_main_csr(etype, Direction::Outgoing, csr);
1008
1009        // Insert same Eid into overlay at version 3 (update scenario)
1010        am.insert_edge(src, dst, eid, etype, 3);
1011
1012        // get_neighbors should return only 1 edge (HashMap<Eid, Vid> deduplicates)
1013        let neighbors = am.get_neighbors(src, etype, Direction::Outgoing);
1014        assert_eq!(
1015            neighbors.len(),
1016            1,
1017            "Duplicate Eid should result in single entry"
1018        );
1019        assert_eq!(neighbors[0], (dst, eid));
1020    }
1021
1022    #[test]
1023    fn test_compact_deduplicates_edges_keeps_highest_version() {
1024        // Test Issue #41: Same Eid at v1 in CSR and v5 in overlay
1025        // After compact: get_neighbors_at_version(v5) → visible
1026        //               get_neighbors_at_version(v1) → NOT visible (compaction kept v5)
1027        let am = AdjacencyManager::new(1024 * 1024);
1028
1029        let src = Vid::new(0);
1030        let dst = Vid::new(10);
1031        let eid = Eid::new(100);
1032        let etype = 1;
1033
1034        // Set up Main CSR with edge at version 1
1035        let csr = MainCsr::from_edge_entries(0, vec![(0, dst, eid, 1)]);
1036        am.set_main_csr(etype, Direction::Outgoing, csr);
1037
1038        // Insert same Eid into overlay at version 5 (newer version)
1039        am.insert_edge(src, dst, eid, etype, 5);
1040
1041        // Before compact: both versions exist in different layers
1042        // After compact: only highest version (v5) should remain
1043
1044        am.compact();
1045
1046        // At version 5: edge should be visible (highest version kept)
1047        let neighbors_v5 = am.get_neighbors_at_version(src, etype, Direction::Outgoing, 5);
1048        assert_eq!(neighbors_v5.len(), 1, "Edge should be visible at version 5");
1049        assert_eq!(neighbors_v5[0], (dst, eid));
1050
1051        // At version 4: edge should still be visible (v5 edge has created_version=5)
1052        // Actually, the edge at v5 replaces v1, so the edge has version 5
1053        // So at version 4, we should NOT see it
1054        let neighbors_v4 = am.get_neighbors_at_version(src, etype, Direction::Outgoing, 4);
1055        assert_eq!(
1056            neighbors_v4.len(),
1057            0,
1058            "After compaction, only version 5 exists; version 4 should not see it"
1059        );
1060
1061        // At version 1: edge should NOT be visible (old version discarded)
1062        let neighbors_v1 = am.get_neighbors_at_version(src, etype, Direction::Outgoing, 1);
1063        assert_eq!(
1064            neighbors_v1.len(),
1065            0,
1066            "Old version discarded during compaction deduplication"
1067        );
1068
1069        // At version 6: edge should be visible (v5 edge still exists)
1070        let neighbors_v6 = am.get_neighbors_at_version(src, etype, Direction::Outgoing, 6);
1071        assert_eq!(neighbors_v6.len(), 1, "Edge should be visible at version 6");
1072    }
1073
1074    /// Test that tombstone filtering is O(result_size), not O(tombstone_count).
1075    /// This verifies fix for issue #140 (inverted tombstone scan).
1076    #[test]
1077    fn test_tombstone_scan_performance() {
1078        let am = AdjacencyManager::new(1024 * 1024);
1079        let vertex_a = Vid::new(1);
1080        let vertex_b = Vid::new(2);
1081        let etype = 1;
1082
1083        // Create 5 edges from vertex_a
1084        let mut a_edges = Vec::new();
1085        for i in 0..5 {
1086            let dst = Vid::new(100 + i);
1087            let eid = Eid::new(1000 + i);
1088            am.insert_edge(vertex_a, dst, eid, etype, 1);
1089            a_edges.push((dst, eid));
1090        }
1091
1092        // Create 100 deleted edges from vertex_b (creates 100 tombstones)
1093        for i in 0..100 {
1094            let dst = Vid::new(200 + i);
1095            let eid = Eid::new(2000 + i);
1096            am.insert_edge(vertex_b, dst, eid, etype, 1);
1097            am.add_tombstone(eid, vertex_b, dst, etype, 2);
1098        }
1099
1100        // Query neighbors of vertex_a
1101        // With O(T) scan, this would iterate 100 tombstones
1102        // With O(result) scan, this only checks 5 edges against tombstone map
1103        let neighbors = am.get_neighbors(vertex_a, etype, Direction::Outgoing);
1104
1105        // Verify all 5 edges are returned correctly
1106        assert_eq!(
1107            neighbors.len(),
1108            5,
1109            "Should return all 5 edges from vertex_a"
1110        );
1111        for (dst, eid) in &a_edges {
1112            assert!(
1113                neighbors.contains(&(*dst, *eid)),
1114                "Edge {:?} should be in results",
1115                (dst, eid)
1116            );
1117        }
1118
1119        // Verify vertex_b has no neighbors (all tombstoned)
1120        let b_neighbors = am.get_neighbors(vertex_b, etype, Direction::Outgoing);
1121        assert_eq!(
1122            b_neighbors.len(),
1123            0,
1124            "Vertex B should have no neighbors (all deleted)"
1125        );
1126    }
1127}