Skip to main content

uni_store/storage/
adjacency_manager.rs

1// SPDX-License-Identifier: Apache-2.0
2// Copyright 2024-2026 Dragonscale Team
3
4//! Unified adjacency manager orchestrating Main CSR, L0-csr overlay, and Shadow CSR.
5//!
6//! Implements a dual-CSR architecture where:
7//! - **Main CSR**: packed adjacency for all alive edges (one per edge_type + direction)
8//! - **L0-csr overlay**: concurrent insert/delete buffer that survives data flush
9//! - **Shadow CSR**: tracks deleted edges with version ranges for time-travel queries
10//!
11//! Regular queries read Main CSR + overlay with zero version filtering.
12//! Snapshot queries additionally filter by version and resurrect shadow entries.
13
14use crate::storage::adjacency_overlay::{FrozenCsrSegment, L0CsrSegment};
15use crate::storage::csr::MainCsr;
16use crate::storage::direction::Direction;
17use crate::storage::manager::StorageManager;
18use crate::storage::shadow_csr::{ShadowCsr, ShadowEdge};
19use dashmap::DashMap;
20use parking_lot::RwLock;
21use std::collections::{HashMap, HashSet};
22use std::sync::Arc;
23use std::sync::atomic::{AtomicUsize, Ordering};
24use uni_common::core::id::{Eid, Vid};
25
26/// Unified adjacency manager for the dual-CSR architecture.
27///
28/// Orchestrates Main CSR (packed alive edges), L0-csr overlay
29/// (in-memory mutations), and Shadow CSR (deleted edges for time-travel).
30/// Data flush never invalidates or rebuilds the CSR.
31pub struct AdjacencyManager {
32    /// Main CSR per `(edge_type, direction)` — all alive edges.
33    /// Edge type is u32 with bit 31 = 0 for schema'd, 1 for schemaless.
34    main_csr: DashMap<(u32, Direction), Arc<MainCsr>>,
35
36    /// Active L0-csr segment (current writes go here).
37    active_overlay: Arc<RwLock<L0CsrSegment>>,
38
39    /// Frozen segments awaiting compaction (oldest first).
40    frozen_segments: RwLock<Vec<Arc<FrozenCsrSegment>>>,
41
42    /// Shadow CSR for time-travel deleted edge tracking.
43    shadow: ShadowCsr,
44
45    /// Current approximate memory usage in bytes.
46    current_bytes: AtomicUsize,
47
48    /// Maximum memory budget in bytes.
49    max_bytes: usize,
50
51    /// Coalescing locks for warm() operations — prevents cache stampede.
52    /// Key: (edge_type_id, Direction), Value: Mutex guard for that warm operation.
53    warm_guards: DashMap<(u32, Direction), Arc<tokio::sync::Mutex<()>>>,
54}
55
56impl AdjacencyManager {
57    /// Creates a new adjacency manager with the given memory budget.
58    pub fn new(max_bytes: usize) -> Self {
59        Self {
60            main_csr: DashMap::new(),
61            active_overlay: Arc::new(RwLock::new(L0CsrSegment::new())),
62            frozen_segments: RwLock::new(Vec::new()),
63            shadow: ShadowCsr::new(),
64            current_bytes: AtomicUsize::new(0),
65            max_bytes,
66            warm_guards: DashMap::new(),
67        }
68    }
69
70    /// Returns neighbors for the current state (hot path, no version filtering).
71    ///
72    /// Reads Main CSR + frozen segments + active overlay, minus tombstones.
73    /// Tombstones from any layer remove edges from all lower layers.
74    pub fn get_neighbors(&self, vid: Vid, edge_type: u32, direction: Direction) -> Vec<(Vid, Eid)> {
75        let mut result: HashMap<Eid, Vid> = HashMap::new();
76
77        for &dir in direction.expand() {
78            // 1. Main CSR
79            if let Some(csr) = self.main_csr.get(&(edge_type, dir)) {
80                for entry in csr.get_entries(vid) {
81                    result.insert(entry.eid, entry.neighbor_vid);
82                }
83            }
84
85            // 2. Frozen segments (oldest first) — add inserts, then remove tombstones
86            for segment in self.frozen_segments.read().iter() {
87                if let Some(adj) = segment.inserts.get(&(edge_type, dir))
88                    && let Some(neighbors) = adj.get(&vid)
89                {
90                    for &(neighbor, eid, _version) in neighbors {
91                        result.insert(eid, neighbor);
92                    }
93                }
94                // Apply tombstones against ALL prior results (Main CSR + older segments)
95                result.retain(|eid, _| !segment.tombstones.contains_key(eid));
96            }
97
98            // 3. Active overlay — add inserts, then remove tombstones
99            let active = self.active_overlay.read();
100            if let Some(adj) = active.inserts.get(&(edge_type, dir))
101                && let Some(neighbors) = adj.get(&vid)
102            {
103                for &(neighbor, eid, _version) in neighbors {
104                    result.insert(eid, neighbor);
105                }
106            }
107            // Apply active overlay tombstones against ALL prior results
108            result.retain(|eid, _| !active.tombstones.contains_key(eid));
109        }
110
111        result.into_iter().map(|(e, n)| (n, e)).collect()
112    }
113
114    /// Returns neighbors visible at a specific snapshot version.
115    ///
116    /// Filters Main CSR entries by `created_version`, applies frozen/active
117    /// overlay with version filtering, and resurrects Shadow CSR entries
118    /// that were alive at the given version.
119    pub fn get_neighbors_at_version(
120        &self,
121        vid: Vid,
122        edge_type: u32,
123        direction: Direction,
124        version: u64,
125    ) -> Vec<(Vid, Eid)> {
126        let mut result: HashMap<Eid, Vid> = HashMap::new();
127
128        for &dir in direction.expand() {
129            // 1. Main CSR — filter by created_version
130            if let Some(csr) = self.main_csr.get(&(edge_type, dir)) {
131                for entry in csr.get_entries(vid) {
132                    if entry.created_version <= version {
133                        result.insert(entry.eid, entry.neighbor_vid);
134                    }
135                }
136            }
137
138            // 2. Frozen segments — filter inserts by version, apply tombstones
139            for segment in self.frozen_segments.read().iter() {
140                if let Some(adj) = segment.inserts.get(&(edge_type, dir))
141                    && let Some(neighbors) = adj.get(&vid)
142                {
143                    for &(neighbor, eid, ver) in neighbors {
144                        if ver <= version {
145                            result.insert(eid, neighbor);
146                        }
147                    }
148                }
149                result.retain(|eid, _| {
150                    segment
151                        .tombstones
152                        .get(eid)
153                        .is_none_or(|ts| ts.version > version)
154                });
155            }
156
157            // 3. Active overlay — add version-filtered inserts, then apply tombstones
158            let active = self.active_overlay.read();
159            if let Some(adj) = active.inserts.get(&(edge_type, dir))
160                && let Some(neighbors) = adj.get(&vid)
161            {
162                for &(neighbor, eid, ver) in neighbors {
163                    let not_tombstoned = active
164                        .tombstones
165                        .get(&eid)
166                        .is_none_or(|ts| ts.version > version);
167                    if ver <= version && not_tombstoned {
168                        result.insert(eid, neighbor);
169                    }
170                }
171            }
172            // Apply active overlay tombstones against ALL prior results
173            result.retain(|eid, _| {
174                active
175                    .tombstones
176                    .get(eid)
177                    .is_none_or(|ts| ts.version > version)
178            });
179
180            // 4. Shadow CSR — resurrect edges alive at version
181            for (neighbor, eid) in self
182                .shadow
183                .get_entries_at_version(vid, edge_type, dir, version)
184            {
185                result.insert(eid, neighbor);
186            }
187        }
188
189        result.into_iter().map(|(e, n)| (n, e)).collect()
190    }
191
192    /// Records an edge insertion into the L0-csr overlay (both directions).
193    pub fn insert_edge(&self, src: Vid, dst: Vid, eid: Eid, edge_type: u32, version: u64) {
194        let active = self.active_overlay.read();
195        active.insert_edge(src, dst, eid, edge_type, version, Direction::Outgoing);
196        active.insert_edge(dst, src, eid, edge_type, version, Direction::Incoming);
197    }
198
199    /// Records a tombstone for a deleted edge in the L0-csr overlay.
200    pub fn add_tombstone(&self, eid: Eid, src: Vid, dst: Vid, edge_type: u32, version: u64) {
201        let active = self.active_overlay.read();
202        active.add_tombstone(eid, src, dst, edge_type, version);
203    }
204
205    /// Sets the Main CSR for a specific edge type and direction.
206    ///
207    /// Used by `warm()` to install a freshly built CSR from storage.
208    pub fn set_main_csr(&self, edge_type: u32, direction: Direction, csr: MainCsr) {
209        let size = csr.memory_usage();
210        self.main_csr.insert((edge_type, direction), Arc::new(csr));
211        self.current_bytes.fetch_add(size, Ordering::Relaxed);
212    }
213
214    /// Checks whether a Main CSR exists for the given edge type and direction.
215    pub fn has_csr(&self, edge_type: u32, direction: Direction) -> bool {
216        self.main_csr.contains_key(&(edge_type, direction))
217    }
218
219    /// Checks whether this manager has been activated for the given edge type.
220    ///
221    /// Returns `true` if a Main CSR exists or the overlay has entries for
222    /// this edge type and direction.
223    pub fn is_active_for(&self, edge_type: u32, direction: Direction) -> bool {
224        let active = self.active_overlay.read();
225        direction.expand().iter().any(|&d| {
226            self.main_csr.contains_key(&(edge_type, d)) || active.has_entries_for(edge_type, d)
227        })
228    }
229
230    /// Returns the number of frozen segments awaiting compaction.
231    pub fn frozen_segment_count(&self) -> usize {
232        self.frozen_segments.read().len()
233    }
234
235    /// Returns whether compaction should be triggered based on segment count.
236    pub fn should_compact(&self, threshold: usize) -> bool {
237        self.frozen_segment_count() >= threshold
238    }
239
240    /// Compacts frozen overlay segments into the Main CSR.
241    ///
242    /// Freezes the active overlay, merges all frozen segments with the
243    /// existing Main CSR, moves tombstoned edges to Shadow CSR, and
244    /// atomically swaps in the new Main CSR.
245    ///
246    /// CRITICAL: Frozen segments remain readable until the new CSR is installed,
247    /// eliminating the visibility gap where edges would be invisible.
248    pub fn compact(&self) {
249        // Step 1: Freeze active overlay and push to frozen list
250        let frozen = {
251            let mut active = self.active_overlay.write();
252            let old = std::mem::take(&mut *active);
253            Arc::new(old.freeze())
254        };
255        self.frozen_segments.write().push(frozen);
256
257        // Step 2: CLONE frozen segments for building (DON'T drain yet)
258        // This ensures they remain readable during CSR construction
259        let segments = self.frozen_segments.read().clone();
260
261        // Step 3: Collect all (edge_type, direction) keys from segments + existing CSRs
262        let mut all_keys: HashSet<(u32, Direction)> = HashSet::new();
263        for segment in &segments {
264            for key in segment.inserts.keys() {
265                all_keys.insert(*key);
266            }
267        }
268        for entry in self.main_csr.iter() {
269            all_keys.insert(*entry.key());
270        }
271
272        // Step 4: For each key, merge
273        for (edge_type, direction) in all_keys {
274            let mut entries: Vec<(u64, Vid, Eid, u64)> = Vec::new();
275            let mut max_offset: u64 = 0;
276
277            // Collect all tombstone EIDs
278            let mut tombstoned_eids: HashSet<Eid> = HashSet::new();
279            for segment in &segments {
280                for (eid, ts) in &segment.tombstones {
281                    if ts.edge_type == edge_type {
282                        tombstoned_eids.insert(*eid);
283
284                        // Move to shadow CSR
285                        self.shadow.add_deleted_edge(
286                            ts.src_vid,
287                            ShadowEdge {
288                                neighbor_vid: ts.dst_vid,
289                                eid: *eid,
290                                edge_type,
291                                created_version: 0, // unknown; overlay tombstones don't track creation version
292                                deleted_version: ts.version,
293                            },
294                            direction,
295                        );
296                    }
297                }
298            }
299
300            // Add entries from old Main CSR
301            if let Some(old_csr) = self.main_csr.get(&(edge_type, direction)) {
302                for vid_offset in 0..old_csr.num_vertices() {
303                    let vid = Vid::new(vid_offset as u64);
304                    for entry in old_csr.get_entries(vid) {
305                        if !tombstoned_eids.contains(&entry.eid) {
306                            entries.push((
307                                vid_offset as u64,
308                                entry.neighbor_vid,
309                                entry.eid,
310                                entry.created_version,
311                            ));
312                            max_offset = max_offset.max(vid_offset as u64);
313                        }
314                    }
315                }
316            }
317
318            // Overlay frozen segments (oldest first)
319            for segment in &segments {
320                if let Some(adj) = segment.inserts.get(&(edge_type, direction)) {
321                    for (vid, neighbors) in adj {
322                        for &(neighbor, eid, version) in neighbors {
323                            if !tombstoned_eids.contains(&eid) {
324                                let offset = vid.as_u64();
325                                entries.push((offset, neighbor, eid, version));
326                                max_offset = max_offset.max(offset);
327                            }
328                        }
329                    }
330                }
331            }
332
333            // Deduplicate by Eid — keep entry with highest version for each Eid
334            {
335                use std::collections::hash_map::Entry;
336
337                let mut best: HashMap<Eid, usize> = HashMap::new();
338                for (idx, (_, _, eid, ver)) in entries.iter().enumerate() {
339                    match best.entry(*eid) {
340                        Entry::Vacant(e) => {
341                            e.insert(idx);
342                        }
343                        Entry::Occupied(mut e) => {
344                            if *ver > entries[*e.get()].3 {
345                                e.insert(idx);
346                            }
347                        }
348                    }
349                }
350                let keep: HashSet<usize> = best.into_values().collect();
351                let mut idx = 0;
352                entries.retain(|_| {
353                    let k = keep.contains(&idx);
354                    idx += 1;
355                    k
356                });
357            }
358
359            // Build new Main CSR and install
360            let new_csr = MainCsr::from_edge_entries(max_offset as usize, entries);
361            let size = new_csr.memory_usage();
362
363            // Remove old size, add new
364            if let Some(old) = self.main_csr.get(&(edge_type, direction)) {
365                self.current_bytes
366                    .fetch_sub(old.memory_usage(), Ordering::Relaxed);
367            }
368
369            self.main_csr
370                .insert((edge_type, direction), Arc::new(new_csr));
371            self.current_bytes.fetch_add(size, Ordering::Relaxed);
372        }
373
374        // Step 5: ONLY NOW clear frozen segments and reset active overlay
375        // New CSR contains all their data, so they're safe to discard
376        self.frozen_segments.write().clear();
377    }
378
379    /// Warms the Main CSR from storage (L2 adjacency + L1 delta) for a specific edge type and direction.
380    ///
381    /// Reads L2 adjacency datasets and L1 delta entries from Lance,
382    /// builds a [`MainCsr`] with version metadata, and populates the
383    /// [`ShadowCsr`] with L1 tombstones. Called once at startup or
384    /// lazily on first access per edge type.
385    pub async fn warm(
386        &self,
387        storage: &StorageManager,
388        edge_type_id: u32,
389        direction: Direction,
390        version: Option<u64>,
391    ) -> anyhow::Result<()> {
392        let schema = storage.schema_manager().schema();
393
394        // Use unified lookup to support both schema'd and schemaless edge types
395        let edge_type_name = schema
396            .edge_type_name_by_id_unified(edge_type_id)
397            .ok_or_else(|| anyhow::anyhow!("Edge type {} not found", edge_type_id))?;
398
399        // Determine which labels to load adjacency for based on edge type metadata
400        let labels_to_load: Vec<String> = {
401            let edge_meta = schema.edge_types.get(&edge_type_name);
402            match (direction, edge_meta) {
403                (Direction::Outgoing, Some(meta)) => meta.src_labels.clone(),
404                (Direction::Incoming, Some(meta)) => meta.dst_labels.clone(),
405                (Direction::Both, Some(meta)) => {
406                    let mut labels = meta.src_labels.clone();
407                    labels.extend(meta.dst_labels.iter().cloned());
408                    labels.sort();
409                    labels.dedup();
410                    labels
411                }
412                _ => Vec::new(),
413            }
414        };
415
416        use arrow_array::{ListArray, UInt8Array, UInt64Array};
417        use futures::TryStreamExt;
418        use lancedb::query::{ExecutableQuery, QueryBase};
419
420        let mut entries: Vec<(u64, Vid, Eid, u64)> = Vec::new();
421        let mut deleted_eids = HashSet::new();
422
423        for &read_dir in direction.expand() {
424            let dir_str = read_dir.as_str();
425            for label_name in &labels_to_load {
426                // 1. Read L2 (Adjacency Dataset)
427                let adj_ds = storage.adjacency_dataset(&edge_type_name, label_name, dir_str);
428                let lancedb_store = storage.lancedb_store();
429
430                if let Ok(adj_ds) = adj_ds
431                    && let Ok(table) = adj_ds.open_lancedb(lancedb_store).await
432                {
433                    let mut query = table.query();
434                    if let Some(hwm) = version {
435                        query = query.only_if(format!("_version <= {}", hwm));
436                    }
437
438                    if let Ok(stream) = query.execute().await {
439                        let batches: Vec<arrow_array::RecordBatch> =
440                            stream.try_collect().await.unwrap_or_default();
441
442                        for batch in batches {
443                            let src_col = batch
444                                .column_by_name("src_vid")
445                                .unwrap()
446                                .as_any()
447                                .downcast_ref::<UInt64Array>()
448                                .unwrap();
449                            let neighbors_list = batch
450                                .column_by_name("neighbors")
451                                .unwrap()
452                                .as_any()
453                                .downcast_ref::<ListArray>()
454                                .unwrap();
455                            let eids_list = batch
456                                .column_by_name("edge_ids")
457                                .unwrap()
458                                .as_any()
459                                .downcast_ref::<ListArray>()
460                                .unwrap();
461
462                            for i in 0..batch.num_rows() {
463                                let src_offset = src_col.value(i);
464                                let neighbors_array_ref = neighbors_list.value(i);
465                                let neighbors = neighbors_array_ref
466                                    .as_any()
467                                    .downcast_ref::<UInt64Array>()
468                                    .unwrap();
469                                let eids_array_ref = eids_list.value(i);
470                                let eids = eids_array_ref
471                                    .as_any()
472                                    .downcast_ref::<UInt64Array>()
473                                    .unwrap();
474
475                                for j in 0..neighbors.len() {
476                                    // L2 adjacency rows don't carry per-edge _version.
477                                    // Version 0 means "from base storage" — the `_version <= hwm` filter on
478                                    // the query already ensures we only load rows within the snapshot window.
479                                    // At query time, get_neighbors_at_version() uses created_version to filter,
480                                    // so version=0 edges are always visible (which is correct for compacted L2 data).
481                                    entries.push((
482                                        src_offset,
483                                        Vid::from(neighbors.value(j)),
484                                        Eid::from(eids.value(j)),
485                                        0,
486                                    ));
487                                }
488                            }
489                        }
490                    }
491                }
492            }
493
494            // 2. Read L1 (Delta)
495            let delta_ds = storage.delta_dataset(&edge_type_name, dir_str)?;
496            let lancedb_store = storage.lancedb_store();
497
498            if let Ok(table) = delta_ds.open_lancedb(lancedb_store).await {
499                let mut query = table.query();
500                if let Some(hwm) = version {
501                    query = query.only_if(format!("_version <= {}", hwm));
502                }
503
504                if let Ok(stream) = query.execute().await {
505                    let batches: Vec<arrow_array::RecordBatch> =
506                        stream.try_collect().await.unwrap_or_default();
507
508                    for batch in batches {
509                        let src_col = batch
510                            .column_by_name("src_vid")
511                            .unwrap()
512                            .as_any()
513                            .downcast_ref::<UInt64Array>()
514                            .unwrap();
515                        let dst_col = batch
516                            .column_by_name("dst_vid")
517                            .unwrap()
518                            .as_any()
519                            .downcast_ref::<UInt64Array>()
520                            .unwrap();
521                        let eid_col = batch
522                            .column_by_name("eid")
523                            .unwrap()
524                            .as_any()
525                            .downcast_ref::<UInt64Array>()
526                            .unwrap();
527                        let op_col = batch
528                            .column_by_name("op")
529                            .unwrap()
530                            .as_any()
531                            .downcast_ref::<UInt8Array>()
532                            .unwrap();
533
534                        // Optionally read _version column
535                        let version_col = batch
536                            .column_by_name("_version")
537                            .and_then(|c| c.as_any().downcast_ref::<UInt64Array>().cloned());
538
539                        for i in 0..batch.num_rows() {
540                            let src_vid = Vid::from(src_col.value(i));
541                            let dst_vid = Vid::from(dst_col.value(i));
542                            let eid = Eid::from(eid_col.value(i));
543                            let op = op_col.value(i); // 0=Insert, 1=Delete
544                            let row_version = version_col.as_ref().map_or(0, |vc| vc.value(i));
545
546                            // For incoming edges, the CSR key is dst (the vertex
547                            // receiving the edge) and the neighbor is src.
548                            let is_incoming = read_dir == Direction::Incoming;
549                            let (key_vid, neighbor_vid) = if is_incoming {
550                                (dst_vid, src_vid)
551                            } else {
552                                (src_vid, dst_vid)
553                            };
554
555                            if op == 0 {
556                                entries.push((key_vid.as_u64(), neighbor_vid, eid, row_version));
557                            } else {
558                                deleted_eids.insert(eid);
559                                self.shadow.add_deleted_edge(
560                                    key_vid,
561                                    ShadowEdge {
562                                        neighbor_vid,
563                                        eid,
564                                        edge_type: edge_type_id,
565                                        created_version: 0,
566                                        deleted_version: row_version,
567                                    },
568                                    read_dir,
569                                );
570                            }
571                        }
572                    }
573                }
574            }
575        }
576
577        // Filter out deleted edges
578        if !deleted_eids.is_empty() {
579            entries.retain(|(_, _, eid, _)| !deleted_eids.contains(eid));
580        }
581
582        // Deduplicate by Eid — keep entry with highest version for each Eid
583        // Multiple versions of the same edge can exist in L2+L1 or across L1 runs
584        {
585            use std::collections::hash_map::Entry;
586            use std::collections::{HashMap, HashSet};
587
588            let mut best: HashMap<Eid, usize> = HashMap::new();
589            for (idx, (_, _, eid, ver)) in entries.iter().enumerate() {
590                match best.entry(*eid) {
591                    Entry::Vacant(e) => {
592                        e.insert(idx);
593                    }
594                    Entry::Occupied(mut e) => {
595                        if *ver > entries[*e.get()].3 {
596                            e.insert(idx);
597                        }
598                    }
599                }
600            }
601            let keep: HashSet<usize> = best.into_values().collect();
602            let mut idx = 0;
603            entries.retain(|_| {
604                let k = keep.contains(&idx);
605                idx += 1;
606                k
607            });
608        }
609
610        // Build MainCsr
611        let max_offset = entries.iter().map(|(o, _, _, _)| *o).max().unwrap_or(0);
612        let csr = MainCsr::from_edge_entries(max_offset as usize, entries);
613        self.set_main_csr(edge_type_id, direction, csr);
614
615        Ok(())
616    }
617
618    /// Coalesced warm() operation to prevent cache stampede (Issue #13).
619    ///
620    /// Uses double-checked locking: fast-path checks if CSR already loaded,
621    /// then acquires per-(edge_type, direction) lock to ensure only one concurrent
622    /// warm() per adjacency key. Other readers wait for the first warm() to complete.
623    pub async fn warm_coalesced(
624        &self,
625        storage: &StorageManager,
626        edge_type_id: u32,
627        direction: Direction,
628        version: Option<u64>,
629    ) -> anyhow::Result<()> {
630        // Fast path: already loaded
631        if self.has_csr(edge_type_id, direction) {
632            return Ok(());
633        }
634
635        // Coalesce: only one concurrent warm per (type, dir)
636        let guard = self
637            .warm_guards
638            .entry((edge_type_id, direction))
639            .or_insert_with(|| Arc::new(tokio::sync::Mutex::new(())))
640            .value()
641            .clone();
642        let _lock = guard.lock().await;
643
644        // Double-check after acquiring lock
645        if self.has_csr(edge_type_id, direction) {
646            return Ok(());
647        }
648
649        self.warm(storage, edge_type_id, direction, version).await
650    }
651
652    /// Returns the current approximate memory usage in bytes.
653    pub fn memory_usage(&self) -> usize {
654        self.current_bytes.load(Ordering::Relaxed)
655    }
656
657    /// Returns the maximum memory budget in bytes.
658    pub fn max_bytes(&self) -> usize {
659        self.max_bytes
660    }
661
662    /// Provides access to the shadow CSR for time-travel queries.
663    pub fn shadow(&self) -> &ShadowCsr {
664        &self.shadow
665    }
666}
667
668impl std::fmt::Debug for AdjacencyManager {
669    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
670        f.debug_struct("AdjacencyManager")
671            .field("main_csr_count", &self.main_csr.len())
672            .field("frozen_segments", &self.frozen_segments.read().len())
673            .field("current_bytes", &self.current_bytes.load(Ordering::Relaxed))
674            .field("max_bytes", &self.max_bytes)
675            .finish()
676    }
677}
678
679#[cfg(test)]
680mod tests {
681    use super::*;
682
/// An inserted edge is visible from its source (outgoing) and from its
/// destination (incoming).
#[test]
fn test_insert_and_get_neighbors() {
    let manager = AdjacencyManager::new(1024 * 1024);
    let (src, dst, eid) = (Vid::new(1), Vid::new(2), Eid::new(100));

    manager.insert_edge(src, dst, eid, 1, 1);

    // Outgoing adjacency of the source lists the destination.
    let outgoing = manager.get_neighbors(src, 1, Direction::Outgoing);
    assert_eq!(outgoing.len(), 1);
    assert_eq!(outgoing[0], (dst, eid));

    // Incoming adjacency of the destination lists the source.
    let incoming = manager.get_neighbors(dst, 1, Direction::Incoming);
    assert_eq!(incoming.len(), 1);
    assert_eq!(incoming[0], (src, eid));
}
701
/// A lookup against an installed Main CSR returns only the queried vertex's edges.
#[test]
fn test_main_csr_lookup() {
    let manager = AdjacencyManager::new(1024 * 1024);

    let edges = vec![
        (0, Vid::new(10), Eid::new(100), 1),
        (1, Vid::new(20), Eid::new(101), 2),
    ];
    manager.set_main_csr(1, Direction::Outgoing, MainCsr::from_edge_entries(1, edges));

    let found = manager.get_neighbors(Vid::new(0), 1, Direction::Outgoing);
    assert_eq!(found.len(), 1);
    assert_eq!(found[0], (Vid::new(10), Eid::new(100)));
}
719
/// Overlay inserts are returned together with Main CSR entries.
#[test]
fn test_overlay_on_top_of_main_csr() {
    let manager = AdjacencyManager::new(1024 * 1024);

    // One edge lives in the Main CSR ...
    let base = MainCsr::from_edge_entries(0, vec![(0, Vid::new(10), Eid::new(100), 1)]);
    manager.set_main_csr(1, Direction::Outgoing, base);

    // ... and a second one is added through the overlay.
    manager.insert_edge(Vid::new(0), Vid::new(20), Eid::new(101), 1, 2);

    let found = manager.get_neighbors(Vid::new(0), 1, Direction::Outgoing);
    assert_eq!(found.len(), 2);

    let eids: HashSet<Eid> = found.iter().map(|&(_, eid)| eid).collect();
    for expected in [Eid::new(100), Eid::new(101)] {
        assert!(eids.contains(&expected));
    }
}
738
739    #[test]
740    fn test_tombstone_removes_edge() {
741        let am = AdjacencyManager::new(1024 * 1024);
742
743        am.insert_edge(Vid::new(0), Vid::new(10), Eid::new(100), 1, 1);
744        am.add_tombstone(Eid::new(100), Vid::new(0), Vid::new(10), 1, 2);
745
746        let n = am.get_neighbors(Vid::new(0), 1, Direction::Outgoing);
747        assert!(n.is_empty());
748    }
749
750    #[test]
751    fn test_version_filtered_query() {
752        let am = AdjacencyManager::new(1024 * 1024);
753
754        // Main CSR with two edges at different versions
755        let csr = MainCsr::from_edge_entries(
756            0,
757            vec![
758                (0, Vid::new(10), Eid::new(100), 1),
759                (0, Vid::new(20), Eid::new(101), 5),
760            ],
761        );
762        am.set_main_csr(1, Direction::Outgoing, csr);
763
764        // At version 3: only first edge visible
765        let n = am.get_neighbors_at_version(Vid::new(0), 1, Direction::Outgoing, 3);
766        assert_eq!(n.len(), 1);
767        assert_eq!(n[0], (Vid::new(10), Eid::new(100)));
768
769        // At version 5: both visible
770        let n = am.get_neighbors_at_version(Vid::new(0), 1, Direction::Outgoing, 5);
771        assert_eq!(n.len(), 2);
772    }
773
774    #[test]
775    fn test_shadow_csr_resurrects_deleted_edges() {
776        let am = AdjacencyManager::new(1024 * 1024);
777
778        // Add a deleted edge to shadow: created at v1, deleted at v5
779        am.shadow().add_deleted_edge(
780            Vid::new(0),
781            ShadowEdge {
782                neighbor_vid: Vid::new(10),
783                eid: Eid::new(100),
784                edge_type: 1,
785                created_version: 1,
786                deleted_version: 5,
787            },
788            Direction::Outgoing,
789        );
790
791        // At version 3: shadow edge should be visible
792        let n = am.get_neighbors_at_version(Vid::new(0), 1, Direction::Outgoing, 3);
793        assert_eq!(n.len(), 1);
794        assert_eq!(n[0], (Vid::new(10), Eid::new(100)));
795
796        // At version 5: deleted, not visible
797        let n = am.get_neighbors_at_version(Vid::new(0), 1, Direction::Outgoing, 5);
798        assert!(n.is_empty());
799    }
800
801    #[test]
802    fn test_compact_merges_into_main_csr() {
803        let am = AdjacencyManager::new(1024 * 1024);
804
805        // Insert edges into overlay
806        am.insert_edge(Vid::new(0), Vid::new(10), Eid::new(100), 1, 1);
807        am.insert_edge(Vid::new(0), Vid::new(20), Eid::new(101), 1, 2);
808
809        // Compact: overlay → Main CSR
810        am.compact();
811
812        // Frozen segments should be empty after compaction
813        assert_eq!(am.frozen_segment_count(), 0);
814
815        // Edges should still be accessible via Main CSR
816        let n = am.get_neighbors(Vid::new(0), 1, Direction::Outgoing);
817        assert_eq!(n.len(), 2);
818
819        assert!(am.has_csr(1, Direction::Outgoing));
820    }
821
822    #[test]
823    fn test_compact_removes_tombstoned_edges() {
824        let am = AdjacencyManager::new(1024 * 1024);
825
826        // Set up Main CSR with one edge
827        let csr = MainCsr::from_edge_entries(0, vec![(0, Vid::new(10), Eid::new(100), 1)]);
828        am.set_main_csr(1, Direction::Outgoing, csr);
829
830        // Add new edge + tombstone for old edge in overlay
831        am.insert_edge(Vid::new(0), Vid::new(20), Eid::new(101), 1, 2);
832        am.add_tombstone(Eid::new(100), Vid::new(0), Vid::new(10), 1, 3);
833
834        am.compact();
835
836        // Only the new edge should remain
837        let n = am.get_neighbors(Vid::new(0), 1, Direction::Outgoing);
838        assert_eq!(n.len(), 1);
839        assert_eq!(n[0], (Vid::new(20), Eid::new(101)));
840    }
841
842    #[test]
843    fn test_should_compact() {
844        let am = AdjacencyManager::new(1024 * 1024);
845        assert!(!am.should_compact(4));
846
847        // Manually freeze the active overlay multiple times
848        for _ in 0..4 {
849            let frozen = {
850                let mut active = am.active_overlay.write();
851                let old = std::mem::take(&mut *active);
852                Arc::new(old.freeze())
853            };
854            am.frozen_segments.write().push(frozen);
855        }
856
857        assert!(am.should_compact(4));
858    }
859
860    #[test]
861    fn test_empty_manager() {
862        let am = AdjacencyManager::new(1024 * 1024);
863        assert!(
864            am.get_neighbors(Vid::new(0), 1, Direction::Outgoing)
865                .is_empty()
866        );
867        assert!(!am.has_csr(1, Direction::Outgoing));
868    }
869
870    #[test]
871    fn test_overlay_tombstone_removes_main_csr_edge() {
872        // Simulates: insert edge → flush/compact into Main CSR → delete edge (tombstone in overlay)
873        let am = AdjacencyManager::new(1024 * 1024);
874
875        // Edge already compacted into Main CSR
876        let csr = MainCsr::from_edge_entries(0, vec![(0, Vid::new(10), Eid::new(100), 1)]);
877        am.set_main_csr(1, Direction::Outgoing, csr);
878
879        // Verify edge is visible before deletion
880        let n = am.get_neighbors(Vid::new(0), 1, Direction::Outgoing);
881        assert_eq!(n.len(), 1);
882
883        // Delete via overlay tombstone (simulates Writer::delete_edge dual-write)
884        am.add_tombstone(Eid::new(100), Vid::new(0), Vid::new(10), 1, 2);
885
886        // Tombstone in overlay must remove edge from Main CSR results
887        let n = am.get_neighbors(Vid::new(0), 1, Direction::Outgoing);
888        assert!(
889            n.is_empty(),
890            "Edge should be removed by overlay tombstone, got {:?}",
891            n
892        );
893    }
894
895    #[test]
896    fn test_overlay_tombstone_removes_main_csr_edge_versioned() {
897        // Same scenario but via get_neighbors_at_version
898        let am = AdjacencyManager::new(1024 * 1024);
899
900        let csr = MainCsr::from_edge_entries(0, vec![(0, Vid::new(10), Eid::new(100), 1)]);
901        am.set_main_csr(1, Direction::Outgoing, csr);
902
903        am.add_tombstone(Eid::new(100), Vid::new(0), Vid::new(10), 1, 5);
904
905        // At version 3: edge created at v1, tombstone at v5 → visible
906        let n = am.get_neighbors_at_version(Vid::new(0), 1, Direction::Outgoing, 3);
907        assert_eq!(n.len(), 1);
908
909        // At version 5: tombstone applies → not visible
910        let n = am.get_neighbors_at_version(Vid::new(0), 1, Direction::Outgoing, 5);
911        assert!(
912            n.is_empty(),
913            "Edge should be removed by overlay tombstone at version 5"
914        );
915    }
916
917    #[test]
918    fn test_frozen_tombstone_removes_main_csr_edge() {
919        // Edge in Main CSR, tombstone in a frozen segment
920        let am = AdjacencyManager::new(1024 * 1024);
921
922        let csr = MainCsr::from_edge_entries(0, vec![(0, Vid::new(10), Eid::new(100), 1)]);
923        am.set_main_csr(1, Direction::Outgoing, csr);
924
925        // Add tombstone to active overlay, then compact to freeze it
926        am.add_tombstone(Eid::new(100), Vid::new(0), Vid::new(10), 1, 2);
927
928        // Freeze the overlay manually
929        {
930            let mut active = am.active_overlay.write();
931            let old = std::mem::take(&mut *active);
932            let frozen = std::sync::Arc::new(old.freeze());
933            am.frozen_segments.write().push(frozen);
934        }
935
936        // The frozen segment's tombstone should remove the Main CSR edge
937        let n = am.get_neighbors(Vid::new(0), 1, Direction::Outgoing);
938        assert!(n.is_empty(), "Frozen tombstone should remove Main CSR edge");
939    }
940
941    #[test]
942    fn test_per_edge_version_filtering() {
943        // Test that edges inserted at different versions are correctly filtered
944        // by get_neighbors_at_version()
945        let am = AdjacencyManager::new(1024 * 1024);
946
947        let src = Vid::new(0);
948        let dst_a = Vid::new(10);
949        let dst_b = Vid::new(20);
950        let eid_a = Eid::new(100);
951        let eid_b = Eid::new(200);
952        let etype = 1;
953
954        // Insert edge A at version 3
955        am.insert_edge(src, dst_a, eid_a, etype, 3);
956
957        // Insert edge B at version 7
958        am.insert_edge(src, dst_b, eid_b, etype, 7);
959
960        // Query at version 2 → neither edge visible
961        let neighbors_v2 = am.get_neighbors_at_version(src, etype, Direction::Outgoing, 2);
962        assert!(
963            neighbors_v2.is_empty(),
964            "No edges should be visible at version 2"
965        );
966
967        // Query at version 5 → only edge A visible
968        let neighbors_v5 = am.get_neighbors_at_version(src, etype, Direction::Outgoing, 5);
969        assert_eq!(
970            neighbors_v5.len(),
971            1,
972            "Only edge A should be visible at version 5"
973        );
974        assert_eq!(neighbors_v5[0].0, dst_a, "Edge A destination should match");
975        assert_eq!(neighbors_v5[0].1, eid_a, "Edge A ID should match");
976
977        // Query at version 7 → both edges visible
978        let neighbors_v7 = am.get_neighbors_at_version(src, etype, Direction::Outgoing, 7);
979        assert_eq!(
980            neighbors_v7.len(),
981            2,
982            "Both edges should be visible at version 7"
983        );
984
985        // Query at version 10 → both edges visible
986        let neighbors_v10 = am.get_neighbors_at_version(src, etype, Direction::Outgoing, 10);
987        assert_eq!(
988            neighbors_v10.len(),
989            2,
990            "Both edges should be visible at version 10"
991        );
992    }
993
994    #[test]
995    fn test_duplicate_edges_deduplicated_by_eid() {
996        // Test Issue #41: Same Eid in MainCsr (v1) and overlay (v3) → only 1 result from get_neighbors
997        let am = AdjacencyManager::new(1024 * 1024);
998
999        let src = Vid::new(0);
1000        let dst = Vid::new(10);
1001        let eid = Eid::new(100);
1002        let etype = 1;
1003
1004        // Set up Main CSR with edge at version 1
1005        let csr = MainCsr::from_edge_entries(0, vec![(0, dst, eid, 1)]);
1006        am.set_main_csr(etype, Direction::Outgoing, csr);
1007
1008        // Insert same Eid into overlay at version 3 (update scenario)
1009        am.insert_edge(src, dst, eid, etype, 3);
1010
1011        // get_neighbors should return only 1 edge (HashMap<Eid, Vid> deduplicates)
1012        let neighbors = am.get_neighbors(src, etype, Direction::Outgoing);
1013        assert_eq!(
1014            neighbors.len(),
1015            1,
1016            "Duplicate Eid should result in single entry"
1017        );
1018        assert_eq!(neighbors[0], (dst, eid));
1019    }
1020
1021    #[test]
1022    fn test_compact_deduplicates_edges_keeps_highest_version() {
1023        // Test Issue #41: Same Eid at v1 in CSR and v5 in overlay
1024        // After compact: get_neighbors_at_version(v5) → visible
1025        //               get_neighbors_at_version(v1) → NOT visible (compaction kept v5)
1026        let am = AdjacencyManager::new(1024 * 1024);
1027
1028        let src = Vid::new(0);
1029        let dst = Vid::new(10);
1030        let eid = Eid::new(100);
1031        let etype = 1;
1032
1033        // Set up Main CSR with edge at version 1
1034        let csr = MainCsr::from_edge_entries(0, vec![(0, dst, eid, 1)]);
1035        am.set_main_csr(etype, Direction::Outgoing, csr);
1036
1037        // Insert same Eid into overlay at version 5 (newer version)
1038        am.insert_edge(src, dst, eid, etype, 5);
1039
1040        // Before compact: both versions exist in different layers
1041        // After compact: only highest version (v5) should remain
1042
1043        am.compact();
1044
1045        // At version 5: edge should be visible (highest version kept)
1046        let neighbors_v5 = am.get_neighbors_at_version(src, etype, Direction::Outgoing, 5);
1047        assert_eq!(neighbors_v5.len(), 1, "Edge should be visible at version 5");
1048        assert_eq!(neighbors_v5[0], (dst, eid));
1049
1050        // At version 4: edge should still be visible (v5 edge has created_version=5)
1051        // Actually, the edge at v5 replaces v1, so the edge has version 5
1052        // So at version 4, we should NOT see it
1053        let neighbors_v4 = am.get_neighbors_at_version(src, etype, Direction::Outgoing, 4);
1054        assert_eq!(
1055            neighbors_v4.len(),
1056            0,
1057            "After compaction, only version 5 exists; version 4 should not see it"
1058        );
1059
1060        // At version 1: edge should NOT be visible (old version discarded)
1061        let neighbors_v1 = am.get_neighbors_at_version(src, etype, Direction::Outgoing, 1);
1062        assert_eq!(
1063            neighbors_v1.len(),
1064            0,
1065            "Old version discarded during compaction deduplication"
1066        );
1067
1068        // At version 6: edge should be visible (v5 edge still exists)
1069        let neighbors_v6 = am.get_neighbors_at_version(src, etype, Direction::Outgoing, 6);
1070        assert_eq!(neighbors_v6.len(), 1, "Edge should be visible at version 6");
1071    }
1072
1073    /// Test that tombstone filtering is O(result_size), not O(tombstone_count).
1074    /// This verifies fix for issue #140 (inverted tombstone scan).
1075    #[test]
1076    fn test_tombstone_scan_performance() {
1077        let am = AdjacencyManager::new(1024 * 1024);
1078        let vertex_a = Vid::new(1);
1079        let vertex_b = Vid::new(2);
1080        let etype = 1;
1081
1082        // Create 5 edges from vertex_a
1083        let mut a_edges = Vec::new();
1084        for i in 0..5 {
1085            let dst = Vid::new(100 + i);
1086            let eid = Eid::new(1000 + i);
1087            am.insert_edge(vertex_a, dst, eid, etype, 1);
1088            a_edges.push((dst, eid));
1089        }
1090
1091        // Create 100 deleted edges from vertex_b (creates 100 tombstones)
1092        for i in 0..100 {
1093            let dst = Vid::new(200 + i);
1094            let eid = Eid::new(2000 + i);
1095            am.insert_edge(vertex_b, dst, eid, etype, 1);
1096            am.add_tombstone(eid, vertex_b, dst, etype, 2);
1097        }
1098
1099        // Query neighbors of vertex_a
1100        // With O(T) scan, this would iterate 100 tombstones
1101        // With O(result) scan, this only checks 5 edges against tombstone map
1102        let neighbors = am.get_neighbors(vertex_a, etype, Direction::Outgoing);
1103
1104        // Verify all 5 edges are returned correctly
1105        assert_eq!(
1106            neighbors.len(),
1107            5,
1108            "Should return all 5 edges from vertex_a"
1109        );
1110        for (dst, eid) in &a_edges {
1111            assert!(
1112                neighbors.contains(&(*dst, *eid)),
1113                "Edge {:?} should be in results",
1114                (dst, eid)
1115            );
1116        }
1117
1118        // Verify vertex_b has no neighbors (all tombstoned)
1119        let b_neighbors = am.get_neighbors(vertex_b, etype, Direction::Outgoing);
1120        assert_eq!(
1121            b_neighbors.len(),
1122            0,
1123            "Vertex B should have no neighbors (all deleted)"
1124        );
1125    }
1126}