1use crate::storage::adjacency_overlay::{FrozenCsrSegment, L0CsrSegment};
15use crate::storage::csr::MainCsr;
16use crate::storage::direction::Direction;
17use crate::storage::manager::StorageManager;
18use crate::storage::shadow_csr::{ShadowCsr, ShadowEdge};
19use dashmap::DashMap;
20use parking_lot::RwLock;
21use std::collections::{HashMap, HashSet};
22use std::sync::Arc;
23use std::sync::atomic::{AtomicUsize, Ordering};
24use uni_common::core::id::{Eid, Vid};
25
/// Adjacency index that layers in-memory write overlays on top of compacted
/// per-edge-type CSR structures.
///
/// Reads merge, in order: the main CSR, every frozen segment, and the active
/// overlay. Versioned reads additionally consult the shadow CSR.
pub struct AdjacencyManager {
    /// Compacted adjacency, one CSR per `(edge type id, direction)` key.
    main_csr: DashMap<(u32, Direction), Arc<MainCsr>>,

    /// Overlay segment that receives new edge inserts and tombstones.
    active_overlay: Arc<RwLock<L0CsrSegment>>,

    /// Overlay segments that were frozen by `compact` and are waiting to be
    /// folded into `main_csr`.
    frozen_segments: RwLock<Vec<Arc<FrozenCsrSegment>>>,

    /// Record of deleted edges, consulted by versioned reads.
    shadow: ShadowCsr,

    /// Running total of `MainCsr::memory_usage()` maintained by
    /// `set_main_csr` and `compact`.
    current_bytes: AtomicUsize,

    /// Byte budget supplied at construction and exposed via `max_bytes()`.
    /// NOTE(review): no enforcement of this budget is visible in this file.
    max_bytes: usize,

    /// Per-key async mutexes used by `warm_coalesced` to serialize warming of
    /// the same `(edge type id, direction)`.
    warm_guards: DashMap<(u32, Direction), Arc<tokio::sync::Mutex<()>>>,
}
55
56impl AdjacencyManager {
57 pub fn new(max_bytes: usize) -> Self {
59 Self {
60 main_csr: DashMap::new(),
61 active_overlay: Arc::new(RwLock::new(L0CsrSegment::new())),
62 frozen_segments: RwLock::new(Vec::new()),
63 shadow: ShadowCsr::new(),
64 current_bytes: AtomicUsize::new(0),
65 max_bytes,
66 warm_guards: DashMap::new(),
67 }
68 }
69
70 pub fn get_neighbors(&self, vid: Vid, edge_type: u32, direction: Direction) -> Vec<(Vid, Eid)> {
75 let mut result: HashMap<Eid, Vid> = HashMap::new();
76
77 for &dir in direction.expand() {
78 if let Some(csr) = self.main_csr.get(&(edge_type, dir)) {
80 for entry in csr.get_entries(vid) {
81 result.insert(entry.eid, entry.neighbor_vid);
82 }
83 }
84
85 for segment in self.frozen_segments.read().iter() {
87 if let Some(adj) = segment.inserts.get(&(edge_type, dir))
88 && let Some(neighbors) = adj.get(&vid)
89 {
90 for &(neighbor, eid, _version) in neighbors {
91 result.insert(eid, neighbor);
92 }
93 }
94 result.retain(|eid, _| !segment.tombstones.contains_key(eid));
96 }
97
98 let active = self.active_overlay.read();
100 if let Some(adj) = active.inserts.get(&(edge_type, dir))
101 && let Some(neighbors) = adj.get(&vid)
102 {
103 for &(neighbor, eid, _version) in neighbors {
104 result.insert(eid, neighbor);
105 }
106 }
107 result.retain(|eid, _| !active.tombstones.contains_key(eid));
109 }
110
111 result.into_iter().map(|(e, n)| (n, e)).collect()
112 }
113
114 pub fn get_neighbors_at_version(
120 &self,
121 vid: Vid,
122 edge_type: u32,
123 direction: Direction,
124 version: u64,
125 ) -> Vec<(Vid, Eid)> {
126 let mut result: HashMap<Eid, Vid> = HashMap::new();
127
128 for &dir in direction.expand() {
129 if let Some(csr) = self.main_csr.get(&(edge_type, dir)) {
131 for entry in csr.get_entries(vid) {
132 if entry.created_version <= version {
133 result.insert(entry.eid, entry.neighbor_vid);
134 }
135 }
136 }
137
138 for segment in self.frozen_segments.read().iter() {
140 if let Some(adj) = segment.inserts.get(&(edge_type, dir))
141 && let Some(neighbors) = adj.get(&vid)
142 {
143 for &(neighbor, eid, ver) in neighbors {
144 if ver <= version {
145 result.insert(eid, neighbor);
146 }
147 }
148 }
149 result.retain(|eid, _| {
150 segment
151 .tombstones
152 .get(eid)
153 .is_none_or(|ts| ts.version > version)
154 });
155 }
156
157 let active = self.active_overlay.read();
159 if let Some(adj) = active.inserts.get(&(edge_type, dir))
160 && let Some(neighbors) = adj.get(&vid)
161 {
162 for &(neighbor, eid, ver) in neighbors {
163 let not_tombstoned = active
164 .tombstones
165 .get(&eid)
166 .is_none_or(|ts| ts.version > version);
167 if ver <= version && not_tombstoned {
168 result.insert(eid, neighbor);
169 }
170 }
171 }
172 result.retain(|eid, _| {
174 active
175 .tombstones
176 .get(eid)
177 .is_none_or(|ts| ts.version > version)
178 });
179
180 for (neighbor, eid) in self
182 .shadow
183 .get_entries_at_version(vid, edge_type, dir, version)
184 {
185 result.insert(eid, neighbor);
186 }
187 }
188
189 result.into_iter().map(|(e, n)| (n, e)).collect()
190 }
191
192 pub fn insert_edge(&self, src: Vid, dst: Vid, eid: Eid, edge_type: u32, version: u64) {
194 let active = self.active_overlay.read();
195 active.insert_edge(src, dst, eid, edge_type, version, Direction::Outgoing);
196 active.insert_edge(dst, src, eid, edge_type, version, Direction::Incoming);
197 }
198
199 pub fn add_tombstone(&self, eid: Eid, src: Vid, dst: Vid, edge_type: u32, version: u64) {
201 let active = self.active_overlay.read();
202 active.add_tombstone(eid, src, dst, edge_type, version);
203 }
204
205 pub fn set_main_csr(&self, edge_type: u32, direction: Direction, csr: MainCsr) {
209 let size = csr.memory_usage();
210 self.main_csr.insert((edge_type, direction), Arc::new(csr));
211 self.current_bytes.fetch_add(size, Ordering::Relaxed);
212 }
213
214 pub fn has_csr(&self, edge_type: u32, direction: Direction) -> bool {
216 self.main_csr.contains_key(&(edge_type, direction))
217 }
218
219 pub fn is_active_for(&self, edge_type: u32, direction: Direction) -> bool {
224 let active = self.active_overlay.read();
225 direction.expand().iter().any(|&d| {
226 self.main_csr.contains_key(&(edge_type, d)) || active.has_entries_for(edge_type, d)
227 })
228 }
229
230 pub fn frozen_segment_count(&self) -> usize {
232 self.frozen_segments.read().len()
233 }
234
235 pub fn should_compact(&self, threshold: usize) -> bool {
237 self.frozen_segment_count() >= threshold
238 }
239
240 pub fn compact(&self) {
249 let frozen = {
251 let mut active = self.active_overlay.write();
252 let old = std::mem::take(&mut *active);
253 Arc::new(old.freeze())
254 };
255 self.frozen_segments.write().push(frozen);
256
257 let segments = self.frozen_segments.read().clone();
260
261 let mut all_keys: HashSet<(u32, Direction)> = HashSet::new();
263 for segment in &segments {
264 for key in segment.inserts.keys() {
265 all_keys.insert(*key);
266 }
267 }
268 for entry in self.main_csr.iter() {
269 all_keys.insert(*entry.key());
270 }
271
272 for (edge_type, direction) in all_keys {
274 let mut entries: Vec<(u64, Vid, Eid, u64)> = Vec::new();
275 let mut max_offset: u64 = 0;
276
277 let mut tombstoned_eids: HashSet<Eid> = HashSet::new();
279 for segment in &segments {
280 for (eid, ts) in &segment.tombstones {
281 if ts.edge_type == edge_type {
282 tombstoned_eids.insert(*eid);
283
284 self.shadow.add_deleted_edge(
286 ts.src_vid,
287 ShadowEdge {
288 neighbor_vid: ts.dst_vid,
289 eid: *eid,
290 edge_type,
291 created_version: 0, deleted_version: ts.version,
293 },
294 direction,
295 );
296 }
297 }
298 }
299
300 if let Some(old_csr) = self.main_csr.get(&(edge_type, direction)) {
302 for vid_offset in 0..old_csr.num_vertices() {
303 let vid = Vid::new(vid_offset as u64);
304 for entry in old_csr.get_entries(vid) {
305 if !tombstoned_eids.contains(&entry.eid) {
306 entries.push((
307 vid_offset as u64,
308 entry.neighbor_vid,
309 entry.eid,
310 entry.created_version,
311 ));
312 max_offset = max_offset.max(vid_offset as u64);
313 }
314 }
315 }
316 }
317
318 for segment in &segments {
320 if let Some(adj) = segment.inserts.get(&(edge_type, direction)) {
321 for (vid, neighbors) in adj {
322 for &(neighbor, eid, version) in neighbors {
323 if !tombstoned_eids.contains(&eid) {
324 let offset = vid.as_u64();
325 entries.push((offset, neighbor, eid, version));
326 max_offset = max_offset.max(offset);
327 }
328 }
329 }
330 }
331 }
332
333 {
335 use std::collections::hash_map::Entry;
336
337 let mut best: HashMap<Eid, usize> = HashMap::new();
338 for (idx, (_, _, eid, ver)) in entries.iter().enumerate() {
339 match best.entry(*eid) {
340 Entry::Vacant(e) => {
341 e.insert(idx);
342 }
343 Entry::Occupied(mut e) => {
344 if *ver > entries[*e.get()].3 {
345 e.insert(idx);
346 }
347 }
348 }
349 }
350 let keep: HashSet<usize> = best.into_values().collect();
351 let mut idx = 0;
352 entries.retain(|_| {
353 let k = keep.contains(&idx);
354 idx += 1;
355 k
356 });
357 }
358
359 let new_csr = MainCsr::from_edge_entries(max_offset as usize, entries);
361 let size = new_csr.memory_usage();
362
363 if let Some(old) = self.main_csr.get(&(edge_type, direction)) {
365 self.current_bytes
366 .fetch_sub(old.memory_usage(), Ordering::Relaxed);
367 }
368
369 self.main_csr
370 .insert((edge_type, direction), Arc::new(new_csr));
371 self.current_bytes.fetch_add(size, Ordering::Relaxed);
372 }
373
374 self.frozen_segments.write().clear();
377 }
378
379 pub async fn warm(
386 &self,
387 storage: &StorageManager,
388 edge_type_id: u32,
389 direction: Direction,
390 version: Option<u64>,
391 ) -> anyhow::Result<()> {
392 let schema = storage.schema_manager().schema();
393
394 let edge_type_name = schema
396 .edge_type_name_by_id_unified(edge_type_id)
397 .ok_or_else(|| anyhow::anyhow!("Edge type {} not found", edge_type_id))?;
398
399 let labels_to_load: Vec<String> = {
401 let edge_meta = schema.edge_types.get(&edge_type_name);
402 match (direction, edge_meta) {
403 (Direction::Outgoing, Some(meta)) => meta.src_labels.clone(),
404 (Direction::Incoming, Some(meta)) => meta.dst_labels.clone(),
405 (Direction::Both, Some(meta)) => {
406 let mut labels = meta.src_labels.clone();
407 labels.extend(meta.dst_labels.iter().cloned());
408 labels.sort();
409 labels.dedup();
410 labels
411 }
412 _ => Vec::new(),
413 }
414 };
415
416 use arrow_array::{ListArray, UInt8Array, UInt64Array};
417 use futures::TryStreamExt;
418 use lancedb::query::{ExecutableQuery, QueryBase};
419
420 let mut entries: Vec<(u64, Vid, Eid, u64)> = Vec::new();
421 let mut deleted_eids = HashSet::new();
422
423 for &read_dir in direction.expand() {
424 let dir_str = read_dir.as_str();
425 for label_name in &labels_to_load {
426 let adj_ds = storage.adjacency_dataset(&edge_type_name, label_name, dir_str);
428 let lancedb_store = storage.lancedb_store();
429
430 if let Ok(adj_ds) = adj_ds
431 && let Ok(table) = adj_ds.open_lancedb(lancedb_store).await
432 {
433 let mut query = table.query();
434 if let Some(hwm) = version {
435 query = query.only_if(format!("_version <= {}", hwm));
436 }
437
438 if let Ok(stream) = query.execute().await {
439 let batches: Vec<arrow_array::RecordBatch> =
440 stream.try_collect().await.unwrap_or_default();
441
442 for batch in batches {
443 let src_col = batch
444 .column_by_name("src_vid")
445 .unwrap()
446 .as_any()
447 .downcast_ref::<UInt64Array>()
448 .unwrap();
449 let neighbors_list = batch
450 .column_by_name("neighbors")
451 .unwrap()
452 .as_any()
453 .downcast_ref::<ListArray>()
454 .unwrap();
455 let eids_list = batch
456 .column_by_name("edge_ids")
457 .unwrap()
458 .as_any()
459 .downcast_ref::<ListArray>()
460 .unwrap();
461
462 for i in 0..batch.num_rows() {
463 let src_offset = src_col.value(i);
464 let neighbors_array_ref = neighbors_list.value(i);
465 let neighbors = neighbors_array_ref
466 .as_any()
467 .downcast_ref::<UInt64Array>()
468 .unwrap();
469 let eids_array_ref = eids_list.value(i);
470 let eids = eids_array_ref
471 .as_any()
472 .downcast_ref::<UInt64Array>()
473 .unwrap();
474
475 for j in 0..neighbors.len() {
476 entries.push((
482 src_offset,
483 Vid::from(neighbors.value(j)),
484 Eid::from(eids.value(j)),
485 0,
486 ));
487 }
488 }
489 }
490 }
491 }
492 }
493
494 let delta_ds = storage.delta_dataset(&edge_type_name, dir_str)?;
496 let lancedb_store = storage.lancedb_store();
497
498 if let Ok(table) = delta_ds.open_lancedb(lancedb_store).await {
499 let mut query = table.query();
500 if let Some(hwm) = version {
501 query = query.only_if(format!("_version <= {}", hwm));
502 }
503
504 if let Ok(stream) = query.execute().await {
505 let batches: Vec<arrow_array::RecordBatch> =
506 stream.try_collect().await.unwrap_or_default();
507
508 for batch in batches {
509 let src_col = batch
510 .column_by_name("src_vid")
511 .unwrap()
512 .as_any()
513 .downcast_ref::<UInt64Array>()
514 .unwrap();
515 let dst_col = batch
516 .column_by_name("dst_vid")
517 .unwrap()
518 .as_any()
519 .downcast_ref::<UInt64Array>()
520 .unwrap();
521 let eid_col = batch
522 .column_by_name("eid")
523 .unwrap()
524 .as_any()
525 .downcast_ref::<UInt64Array>()
526 .unwrap();
527 let op_col = batch
528 .column_by_name("op")
529 .unwrap()
530 .as_any()
531 .downcast_ref::<UInt8Array>()
532 .unwrap();
533
534 let version_col = batch
536 .column_by_name("_version")
537 .and_then(|c| c.as_any().downcast_ref::<UInt64Array>().cloned());
538
539 for i in 0..batch.num_rows() {
540 let src_vid = Vid::from(src_col.value(i));
541 let dst_vid = Vid::from(dst_col.value(i));
542 let eid = Eid::from(eid_col.value(i));
543 let op = op_col.value(i); let row_version = version_col.as_ref().map_or(0, |vc| vc.value(i));
545
546 let is_incoming = read_dir == Direction::Incoming;
549 let (key_vid, neighbor_vid) = if is_incoming {
550 (dst_vid, src_vid)
551 } else {
552 (src_vid, dst_vid)
553 };
554
555 if op == 0 {
556 entries.push((key_vid.as_u64(), neighbor_vid, eid, row_version));
557 } else {
558 deleted_eids.insert(eid);
559 self.shadow.add_deleted_edge(
560 key_vid,
561 ShadowEdge {
562 neighbor_vid,
563 eid,
564 edge_type: edge_type_id,
565 created_version: 0,
566 deleted_version: row_version,
567 },
568 read_dir,
569 );
570 }
571 }
572 }
573 }
574 }
575 }
576
577 if !deleted_eids.is_empty() {
579 entries.retain(|(_, _, eid, _)| !deleted_eids.contains(eid));
580 }
581
582 {
585 use std::collections::hash_map::Entry;
586 use std::collections::{HashMap, HashSet};
587
588 let mut best: HashMap<Eid, usize> = HashMap::new();
589 for (idx, (_, _, eid, ver)) in entries.iter().enumerate() {
590 match best.entry(*eid) {
591 Entry::Vacant(e) => {
592 e.insert(idx);
593 }
594 Entry::Occupied(mut e) => {
595 if *ver > entries[*e.get()].3 {
596 e.insert(idx);
597 }
598 }
599 }
600 }
601 let keep: HashSet<usize> = best.into_values().collect();
602 let mut idx = 0;
603 entries.retain(|_| {
604 let k = keep.contains(&idx);
605 idx += 1;
606 k
607 });
608 }
609
610 let max_offset = entries.iter().map(|(o, _, _, _)| *o).max().unwrap_or(0);
612 let csr = MainCsr::from_edge_entries(max_offset as usize, entries);
613 self.set_main_csr(edge_type_id, direction, csr);
614
615 Ok(())
616 }
617
618 pub async fn warm_coalesced(
624 &self,
625 storage: &StorageManager,
626 edge_type_id: u32,
627 direction: Direction,
628 version: Option<u64>,
629 ) -> anyhow::Result<()> {
630 if self.has_csr(edge_type_id, direction) {
632 return Ok(());
633 }
634
635 let guard = self
637 .warm_guards
638 .entry((edge_type_id, direction))
639 .or_insert_with(|| Arc::new(tokio::sync::Mutex::new(())))
640 .value()
641 .clone();
642 let _lock = guard.lock().await;
643
644 if self.has_csr(edge_type_id, direction) {
646 return Ok(());
647 }
648
649 self.warm(storage, edge_type_id, direction, version).await
650 }
651
652 pub fn memory_usage(&self) -> usize {
654 self.current_bytes.load(Ordering::Relaxed)
655 }
656
657 pub fn max_bytes(&self) -> usize {
659 self.max_bytes
660 }
661
662 pub fn shadow(&self) -> &ShadowCsr {
664 &self.shadow
665 }
666}
667
668impl std::fmt::Debug for AdjacencyManager {
669 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
670 f.debug_struct("AdjacencyManager")
671 .field("main_csr_count", &self.main_csr.len())
672 .field("frozen_segments", &self.frozen_segments.read().len())
673 .field("current_bytes", &self.current_bytes.load(Ordering::Relaxed))
674 .field("max_bytes", &self.max_bytes)
675 .finish()
676 }
677}
678
679#[cfg(test)]
680mod tests {
681 use super::*;
682
683 #[test]
684 fn test_insert_and_get_neighbors() {
685 let am = AdjacencyManager::new(1024 * 1024);
686 let src = Vid::new(1);
687 let dst = Vid::new(2);
688 let eid = Eid::new(100);
689
690 am.insert_edge(src, dst, eid, 1, 1);
691
692 let neighbors = am.get_neighbors(src, 1, Direction::Outgoing);
693 assert_eq!(neighbors.len(), 1);
694 assert_eq!(neighbors[0], (dst, eid));
695
696 let incoming = am.get_neighbors(dst, 1, Direction::Incoming);
698 assert_eq!(incoming.len(), 1);
699 assert_eq!(incoming[0], (src, eid));
700 }
701
702 #[test]
703 fn test_main_csr_lookup() {
704 let am = AdjacencyManager::new(1024 * 1024);
705
706 let csr = MainCsr::from_edge_entries(
707 1,
708 vec![
709 (0, Vid::new(10), Eid::new(100), 1),
710 (1, Vid::new(20), Eid::new(101), 2),
711 ],
712 );
713 am.set_main_csr(1, Direction::Outgoing, csr);
714
715 let n = am.get_neighbors(Vid::new(0), 1, Direction::Outgoing);
716 assert_eq!(n.len(), 1);
717 assert_eq!(n[0], (Vid::new(10), Eid::new(100)));
718 }
719
720 #[test]
721 fn test_overlay_on_top_of_main_csr() {
722 let am = AdjacencyManager::new(1024 * 1024);
723
724 let csr = MainCsr::from_edge_entries(0, vec![(0, Vid::new(10), Eid::new(100), 1)]);
726 am.set_main_csr(1, Direction::Outgoing, csr);
727
728 am.insert_edge(Vid::new(0), Vid::new(20), Eid::new(101), 1, 2);
730
731 let n = am.get_neighbors(Vid::new(0), 1, Direction::Outgoing);
732 assert_eq!(n.len(), 2);
733
734 let eids: HashSet<Eid> = n.iter().map(|(_, e)| *e).collect();
735 assert!(eids.contains(&Eid::new(100)));
736 assert!(eids.contains(&Eid::new(101)));
737 }
738
739 #[test]
740 fn test_tombstone_removes_edge() {
741 let am = AdjacencyManager::new(1024 * 1024);
742
743 am.insert_edge(Vid::new(0), Vid::new(10), Eid::new(100), 1, 1);
744 am.add_tombstone(Eid::new(100), Vid::new(0), Vid::new(10), 1, 2);
745
746 let n = am.get_neighbors(Vid::new(0), 1, Direction::Outgoing);
747 assert!(n.is_empty());
748 }
749
750 #[test]
751 fn test_version_filtered_query() {
752 let am = AdjacencyManager::new(1024 * 1024);
753
754 let csr = MainCsr::from_edge_entries(
756 0,
757 vec![
758 (0, Vid::new(10), Eid::new(100), 1),
759 (0, Vid::new(20), Eid::new(101), 5),
760 ],
761 );
762 am.set_main_csr(1, Direction::Outgoing, csr);
763
764 let n = am.get_neighbors_at_version(Vid::new(0), 1, Direction::Outgoing, 3);
766 assert_eq!(n.len(), 1);
767 assert_eq!(n[0], (Vid::new(10), Eid::new(100)));
768
769 let n = am.get_neighbors_at_version(Vid::new(0), 1, Direction::Outgoing, 5);
771 assert_eq!(n.len(), 2);
772 }
773
774 #[test]
775 fn test_shadow_csr_resurrects_deleted_edges() {
776 let am = AdjacencyManager::new(1024 * 1024);
777
778 am.shadow().add_deleted_edge(
780 Vid::new(0),
781 ShadowEdge {
782 neighbor_vid: Vid::new(10),
783 eid: Eid::new(100),
784 edge_type: 1,
785 created_version: 1,
786 deleted_version: 5,
787 },
788 Direction::Outgoing,
789 );
790
791 let n = am.get_neighbors_at_version(Vid::new(0), 1, Direction::Outgoing, 3);
793 assert_eq!(n.len(), 1);
794 assert_eq!(n[0], (Vid::new(10), Eid::new(100)));
795
796 let n = am.get_neighbors_at_version(Vid::new(0), 1, Direction::Outgoing, 5);
798 assert!(n.is_empty());
799 }
800
801 #[test]
802 fn test_compact_merges_into_main_csr() {
803 let am = AdjacencyManager::new(1024 * 1024);
804
805 am.insert_edge(Vid::new(0), Vid::new(10), Eid::new(100), 1, 1);
807 am.insert_edge(Vid::new(0), Vid::new(20), Eid::new(101), 1, 2);
808
809 am.compact();
811
812 assert_eq!(am.frozen_segment_count(), 0);
814
815 let n = am.get_neighbors(Vid::new(0), 1, Direction::Outgoing);
817 assert_eq!(n.len(), 2);
818
819 assert!(am.has_csr(1, Direction::Outgoing));
820 }
821
822 #[test]
823 fn test_compact_removes_tombstoned_edges() {
824 let am = AdjacencyManager::new(1024 * 1024);
825
826 let csr = MainCsr::from_edge_entries(0, vec![(0, Vid::new(10), Eid::new(100), 1)]);
828 am.set_main_csr(1, Direction::Outgoing, csr);
829
830 am.insert_edge(Vid::new(0), Vid::new(20), Eid::new(101), 1, 2);
832 am.add_tombstone(Eid::new(100), Vid::new(0), Vid::new(10), 1, 3);
833
834 am.compact();
835
836 let n = am.get_neighbors(Vid::new(0), 1, Direction::Outgoing);
838 assert_eq!(n.len(), 1);
839 assert_eq!(n[0], (Vid::new(20), Eid::new(101)));
840 }
841
842 #[test]
843 fn test_should_compact() {
844 let am = AdjacencyManager::new(1024 * 1024);
845 assert!(!am.should_compact(4));
846
847 for _ in 0..4 {
849 let frozen = {
850 let mut active = am.active_overlay.write();
851 let old = std::mem::take(&mut *active);
852 Arc::new(old.freeze())
853 };
854 am.frozen_segments.write().push(frozen);
855 }
856
857 assert!(am.should_compact(4));
858 }
859
860 #[test]
861 fn test_empty_manager() {
862 let am = AdjacencyManager::new(1024 * 1024);
863 assert!(
864 am.get_neighbors(Vid::new(0), 1, Direction::Outgoing)
865 .is_empty()
866 );
867 assert!(!am.has_csr(1, Direction::Outgoing));
868 }
869
870 #[test]
871 fn test_overlay_tombstone_removes_main_csr_edge() {
872 let am = AdjacencyManager::new(1024 * 1024);
874
875 let csr = MainCsr::from_edge_entries(0, vec![(0, Vid::new(10), Eid::new(100), 1)]);
877 am.set_main_csr(1, Direction::Outgoing, csr);
878
879 let n = am.get_neighbors(Vid::new(0), 1, Direction::Outgoing);
881 assert_eq!(n.len(), 1);
882
883 am.add_tombstone(Eid::new(100), Vid::new(0), Vid::new(10), 1, 2);
885
886 let n = am.get_neighbors(Vid::new(0), 1, Direction::Outgoing);
888 assert!(
889 n.is_empty(),
890 "Edge should be removed by overlay tombstone, got {:?}",
891 n
892 );
893 }
894
895 #[test]
896 fn test_overlay_tombstone_removes_main_csr_edge_versioned() {
897 let am = AdjacencyManager::new(1024 * 1024);
899
900 let csr = MainCsr::from_edge_entries(0, vec![(0, Vid::new(10), Eid::new(100), 1)]);
901 am.set_main_csr(1, Direction::Outgoing, csr);
902
903 am.add_tombstone(Eid::new(100), Vid::new(0), Vid::new(10), 1, 5);
904
905 let n = am.get_neighbors_at_version(Vid::new(0), 1, Direction::Outgoing, 3);
907 assert_eq!(n.len(), 1);
908
909 let n = am.get_neighbors_at_version(Vid::new(0), 1, Direction::Outgoing, 5);
911 assert!(
912 n.is_empty(),
913 "Edge should be removed by overlay tombstone at version 5"
914 );
915 }
916
917 #[test]
918 fn test_frozen_tombstone_removes_main_csr_edge() {
919 let am = AdjacencyManager::new(1024 * 1024);
921
922 let csr = MainCsr::from_edge_entries(0, vec![(0, Vid::new(10), Eid::new(100), 1)]);
923 am.set_main_csr(1, Direction::Outgoing, csr);
924
925 am.add_tombstone(Eid::new(100), Vid::new(0), Vid::new(10), 1, 2);
927
928 {
930 let mut active = am.active_overlay.write();
931 let old = std::mem::take(&mut *active);
932 let frozen = std::sync::Arc::new(old.freeze());
933 am.frozen_segments.write().push(frozen);
934 }
935
936 let n = am.get_neighbors(Vid::new(0), 1, Direction::Outgoing);
938 assert!(n.is_empty(), "Frozen tombstone should remove Main CSR edge");
939 }
940
941 #[test]
942 fn test_per_edge_version_filtering() {
943 let am = AdjacencyManager::new(1024 * 1024);
946
947 let src = Vid::new(0);
948 let dst_a = Vid::new(10);
949 let dst_b = Vid::new(20);
950 let eid_a = Eid::new(100);
951 let eid_b = Eid::new(200);
952 let etype = 1;
953
954 am.insert_edge(src, dst_a, eid_a, etype, 3);
956
957 am.insert_edge(src, dst_b, eid_b, etype, 7);
959
960 let neighbors_v2 = am.get_neighbors_at_version(src, etype, Direction::Outgoing, 2);
962 assert!(
963 neighbors_v2.is_empty(),
964 "No edges should be visible at version 2"
965 );
966
967 let neighbors_v5 = am.get_neighbors_at_version(src, etype, Direction::Outgoing, 5);
969 assert_eq!(
970 neighbors_v5.len(),
971 1,
972 "Only edge A should be visible at version 5"
973 );
974 assert_eq!(neighbors_v5[0].0, dst_a, "Edge A destination should match");
975 assert_eq!(neighbors_v5[0].1, eid_a, "Edge A ID should match");
976
977 let neighbors_v7 = am.get_neighbors_at_version(src, etype, Direction::Outgoing, 7);
979 assert_eq!(
980 neighbors_v7.len(),
981 2,
982 "Both edges should be visible at version 7"
983 );
984
985 let neighbors_v10 = am.get_neighbors_at_version(src, etype, Direction::Outgoing, 10);
987 assert_eq!(
988 neighbors_v10.len(),
989 2,
990 "Both edges should be visible at version 10"
991 );
992 }
993
994 #[test]
995 fn test_duplicate_edges_deduplicated_by_eid() {
996 let am = AdjacencyManager::new(1024 * 1024);
998
999 let src = Vid::new(0);
1000 let dst = Vid::new(10);
1001 let eid = Eid::new(100);
1002 let etype = 1;
1003
1004 let csr = MainCsr::from_edge_entries(0, vec![(0, dst, eid, 1)]);
1006 am.set_main_csr(etype, Direction::Outgoing, csr);
1007
1008 am.insert_edge(src, dst, eid, etype, 3);
1010
1011 let neighbors = am.get_neighbors(src, etype, Direction::Outgoing);
1013 assert_eq!(
1014 neighbors.len(),
1015 1,
1016 "Duplicate Eid should result in single entry"
1017 );
1018 assert_eq!(neighbors[0], (dst, eid));
1019 }
1020
1021 #[test]
1022 fn test_compact_deduplicates_edges_keeps_highest_version() {
1023 let am = AdjacencyManager::new(1024 * 1024);
1027
1028 let src = Vid::new(0);
1029 let dst = Vid::new(10);
1030 let eid = Eid::new(100);
1031 let etype = 1;
1032
1033 let csr = MainCsr::from_edge_entries(0, vec![(0, dst, eid, 1)]);
1035 am.set_main_csr(etype, Direction::Outgoing, csr);
1036
1037 am.insert_edge(src, dst, eid, etype, 5);
1039
1040 am.compact();
1044
1045 let neighbors_v5 = am.get_neighbors_at_version(src, etype, Direction::Outgoing, 5);
1047 assert_eq!(neighbors_v5.len(), 1, "Edge should be visible at version 5");
1048 assert_eq!(neighbors_v5[0], (dst, eid));
1049
1050 let neighbors_v4 = am.get_neighbors_at_version(src, etype, Direction::Outgoing, 4);
1054 assert_eq!(
1055 neighbors_v4.len(),
1056 0,
1057 "After compaction, only version 5 exists; version 4 should not see it"
1058 );
1059
1060 let neighbors_v1 = am.get_neighbors_at_version(src, etype, Direction::Outgoing, 1);
1062 assert_eq!(
1063 neighbors_v1.len(),
1064 0,
1065 "Old version discarded during compaction deduplication"
1066 );
1067
1068 let neighbors_v6 = am.get_neighbors_at_version(src, etype, Direction::Outgoing, 6);
1070 assert_eq!(neighbors_v6.len(), 1, "Edge should be visible at version 6");
1071 }
1072
1073 #[test]
1076 fn test_tombstone_scan_performance() {
1077 let am = AdjacencyManager::new(1024 * 1024);
1078 let vertex_a = Vid::new(1);
1079 let vertex_b = Vid::new(2);
1080 let etype = 1;
1081
1082 let mut a_edges = Vec::new();
1084 for i in 0..5 {
1085 let dst = Vid::new(100 + i);
1086 let eid = Eid::new(1000 + i);
1087 am.insert_edge(vertex_a, dst, eid, etype, 1);
1088 a_edges.push((dst, eid));
1089 }
1090
1091 for i in 0..100 {
1093 let dst = Vid::new(200 + i);
1094 let eid = Eid::new(2000 + i);
1095 am.insert_edge(vertex_b, dst, eid, etype, 1);
1096 am.add_tombstone(eid, vertex_b, dst, etype, 2);
1097 }
1098
1099 let neighbors = am.get_neighbors(vertex_a, etype, Direction::Outgoing);
1103
1104 assert_eq!(
1106 neighbors.len(),
1107 5,
1108 "Should return all 5 edges from vertex_a"
1109 );
1110 for (dst, eid) in &a_edges {
1111 assert!(
1112 neighbors.contains(&(*dst, *eid)),
1113 "Edge {:?} should be in results",
1114 (dst, eid)
1115 );
1116 }
1117
1118 let b_neighbors = am.get_neighbors(vertex_b, etype, Direction::Outgoing);
1120 assert_eq!(
1121 b_neighbors.len(),
1122 0,
1123 "Vertex B should have no neighbors (all deleted)"
1124 );
1125 }
1126}