1use crate::storage::adjacency_overlay::{FrozenCsrSegment, L0CsrSegment};
15use crate::storage::csr::MainCsr;
16use crate::storage::direction::Direction;
17use crate::storage::manager::StorageManager;
18use crate::storage::shadow_csr::{ShadowCsr, ShadowEdge};
19use dashmap::DashMap;
20use parking_lot::RwLock;
21use std::collections::{HashMap, HashSet};
22use std::sync::Arc;
23use std::sync::atomic::{AtomicUsize, Ordering};
24use uni_common::core::id::{Eid, Vid};
25
26pub struct AdjacencyManager {
32 main_csr: DashMap<(u32, Direction), Arc<MainCsr>>,
35
36 active_overlay: Arc<RwLock<L0CsrSegment>>,
38
39 frozen_segments: RwLock<Vec<Arc<FrozenCsrSegment>>>,
41
42 shadow: ShadowCsr,
44
45 current_bytes: AtomicUsize,
47
48 max_bytes: usize,
50
51 warm_guards: DashMap<(u32, Direction), Arc<tokio::sync::Mutex<()>>>,
54}
55
56impl AdjacencyManager {
57 pub fn new(max_bytes: usize) -> Self {
59 Self {
60 main_csr: DashMap::new(),
61 active_overlay: Arc::new(RwLock::new(L0CsrSegment::new())),
62 frozen_segments: RwLock::new(Vec::new()),
63 shadow: ShadowCsr::new(),
64 current_bytes: AtomicUsize::new(0),
65 max_bytes,
66 warm_guards: DashMap::new(),
67 }
68 }
69
70 pub fn get_neighbors(&self, vid: Vid, edge_type: u32, direction: Direction) -> Vec<(Vid, Eid)> {
75 let mut result: HashMap<Eid, Vid> = HashMap::new();
76
77 for &dir in direction.expand() {
78 if let Some(csr) = self.main_csr.get(&(edge_type, dir)) {
80 for entry in csr.get_entries(vid) {
81 result.insert(entry.eid, entry.neighbor_vid);
82 }
83 }
84
85 for segment in self.frozen_segments.read().iter() {
87 if let Some(adj) = segment.inserts.get(&(edge_type, dir))
88 && let Some(neighbors) = adj.get(&vid)
89 {
90 for &(neighbor, eid, _version) in neighbors {
91 result.insert(eid, neighbor);
92 }
93 }
94 result.retain(|eid, _| !segment.tombstones.contains_key(eid));
96 }
97
98 let active = self.active_overlay.read();
100 if let Some(adj) = active.inserts.get(&(edge_type, dir))
101 && let Some(neighbors) = adj.get(&vid)
102 {
103 for &(neighbor, eid, _version) in neighbors {
104 result.insert(eid, neighbor);
105 }
106 }
107 result.retain(|eid, _| !active.tombstones.contains_key(eid));
109 }
110
111 result.into_iter().map(|(e, n)| (n, e)).collect()
112 }
113
114 pub fn get_neighbors_at_version(
120 &self,
121 vid: Vid,
122 edge_type: u32,
123 direction: Direction,
124 version: u64,
125 ) -> Vec<(Vid, Eid)> {
126 let mut result: HashMap<Eid, Vid> = HashMap::new();
127
128 for &dir in direction.expand() {
129 if let Some(csr) = self.main_csr.get(&(edge_type, dir)) {
131 for entry in csr.get_entries(vid) {
132 if entry.created_version <= version {
133 result.insert(entry.eid, entry.neighbor_vid);
134 }
135 }
136 }
137
138 for segment in self.frozen_segments.read().iter() {
140 if let Some(adj) = segment.inserts.get(&(edge_type, dir))
141 && let Some(neighbors) = adj.get(&vid)
142 {
143 for &(neighbor, eid, ver) in neighbors {
144 if ver <= version {
145 result.insert(eid, neighbor);
146 }
147 }
148 }
149 result.retain(|eid, _| {
150 segment
151 .tombstones
152 .get(eid)
153 .is_none_or(|ts| ts.version > version)
154 });
155 }
156
157 let active = self.active_overlay.read();
159 if let Some(adj) = active.inserts.get(&(edge_type, dir))
160 && let Some(neighbors) = adj.get(&vid)
161 {
162 for &(neighbor, eid, ver) in neighbors {
163 let not_tombstoned = active
164 .tombstones
165 .get(&eid)
166 .is_none_or(|ts| ts.version > version);
167 if ver <= version && not_tombstoned {
168 result.insert(eid, neighbor);
169 }
170 }
171 }
172 result.retain(|eid, _| {
174 active
175 .tombstones
176 .get(eid)
177 .is_none_or(|ts| ts.version > version)
178 });
179
180 for (neighbor, eid) in self
182 .shadow
183 .get_entries_at_version(vid, edge_type, dir, version)
184 {
185 result.insert(eid, neighbor);
186 }
187 }
188
189 result.into_iter().map(|(e, n)| (n, e)).collect()
190 }
191
192 pub fn insert_edge(&self, src: Vid, dst: Vid, eid: Eid, edge_type: u32, version: u64) {
194 let active = self.active_overlay.read();
195 active.insert_edge(src, dst, eid, edge_type, version, Direction::Outgoing);
196 active.insert_edge(dst, src, eid, edge_type, version, Direction::Incoming);
197 }
198
199 pub fn add_tombstone(&self, eid: Eid, src: Vid, dst: Vid, edge_type: u32, version: u64) {
201 let active = self.active_overlay.read();
202 active.add_tombstone(eid, src, dst, edge_type, version);
203 }
204
205 pub fn set_main_csr(&self, edge_type: u32, direction: Direction, csr: MainCsr) {
209 let size = csr.memory_usage();
210 self.main_csr.insert((edge_type, direction), Arc::new(csr));
211 self.current_bytes.fetch_add(size, Ordering::Relaxed);
212 }
213
214 pub fn has_csr(&self, edge_type: u32, direction: Direction) -> bool {
216 self.main_csr.contains_key(&(edge_type, direction))
217 }
218
219 pub fn is_active_for(&self, edge_type: u32, direction: Direction) -> bool {
224 let active = self.active_overlay.read();
225 direction.expand().iter().any(|&d| {
226 self.main_csr.contains_key(&(edge_type, d)) || active.has_entries_for(edge_type, d)
227 })
228 }
229
230 pub fn frozen_segment_count(&self) -> usize {
232 self.frozen_segments.read().len()
233 }
234
235 pub fn should_compact(&self, threshold: usize) -> bool {
237 self.frozen_segment_count() >= threshold
238 }
239
240 pub fn compact(&self) {
249 let frozen = {
251 let mut active = self.active_overlay.write();
252 let old = std::mem::take(&mut *active);
253 Arc::new(old.freeze())
254 };
255 self.frozen_segments.write().push(frozen);
256
257 let segments = self.frozen_segments.read().clone();
260
261 let mut all_keys: HashSet<(u32, Direction)> = HashSet::new();
263 for segment in &segments {
264 for key in segment.inserts.keys() {
265 all_keys.insert(*key);
266 }
267 }
268 for entry in self.main_csr.iter() {
269 all_keys.insert(*entry.key());
270 }
271
272 for (edge_type, direction) in all_keys {
274 let mut entries: Vec<(u64, Vid, Eid, u64)> = Vec::new();
275 let mut max_offset: u64 = 0;
276
277 let mut tombstoned_eids: HashSet<Eid> = HashSet::new();
279 for segment in &segments {
280 for (eid, ts) in &segment.tombstones {
281 if ts.edge_type == edge_type {
282 tombstoned_eids.insert(*eid);
283
284 self.shadow.add_deleted_edge(
286 ts.src_vid,
287 ShadowEdge {
288 neighbor_vid: ts.dst_vid,
289 eid: *eid,
290 edge_type,
291 created_version: 0, deleted_version: ts.version,
293 },
294 direction,
295 );
296 }
297 }
298 }
299
300 if let Some(old_csr) = self.main_csr.get(&(edge_type, direction)) {
302 for vid_offset in 0..old_csr.num_vertices() {
303 let vid = Vid::new(vid_offset as u64);
304 for entry in old_csr.get_entries(vid) {
305 if !tombstoned_eids.contains(&entry.eid) {
306 entries.push((
307 vid_offset as u64,
308 entry.neighbor_vid,
309 entry.eid,
310 entry.created_version,
311 ));
312 max_offset = max_offset.max(vid_offset as u64);
313 }
314 }
315 }
316 }
317
318 for segment in &segments {
320 if let Some(adj) = segment.inserts.get(&(edge_type, direction)) {
321 for (vid, neighbors) in adj {
322 for &(neighbor, eid, version) in neighbors {
323 if !tombstoned_eids.contains(&eid) {
324 let offset = vid.as_u64();
325 entries.push((offset, neighbor, eid, version));
326 max_offset = max_offset.max(offset);
327 }
328 }
329 }
330 }
331 }
332
333 {
335 use std::collections::hash_map::Entry;
336
337 let mut best: HashMap<Eid, usize> = HashMap::new();
338 for (idx, (_, _, eid, ver)) in entries.iter().enumerate() {
339 match best.entry(*eid) {
340 Entry::Vacant(e) => {
341 e.insert(idx);
342 }
343 Entry::Occupied(mut e) => {
344 if *ver > entries[*e.get()].3 {
345 e.insert(idx);
346 }
347 }
348 }
349 }
350 let keep: HashSet<usize> = best.into_values().collect();
351 let mut idx = 0;
352 entries.retain(|_| {
353 let k = keep.contains(&idx);
354 idx += 1;
355 k
356 });
357 }
358
359 let new_csr = MainCsr::from_edge_entries(max_offset as usize, entries);
361 let size = new_csr.memory_usage();
362
363 if let Some(old) = self.main_csr.get(&(edge_type, direction)) {
365 self.current_bytes
366 .fetch_sub(old.memory_usage(), Ordering::Relaxed);
367 }
368
369 self.main_csr
370 .insert((edge_type, direction), Arc::new(new_csr));
371 self.current_bytes.fetch_add(size, Ordering::Relaxed);
372 }
373
374 self.frozen_segments.write().clear();
377 }
378
379 pub async fn warm(
386 &self,
387 storage: &StorageManager,
388 edge_type_id: u32,
389 direction: Direction,
390 version: Option<u64>,
391 ) -> anyhow::Result<()> {
392 let schema = storage.schema_manager().schema();
393
394 let edge_type_name = schema
396 .edge_type_name_by_id_unified(edge_type_id)
397 .ok_or_else(|| anyhow::anyhow!("Edge type {} not found", edge_type_id))?;
398
399 let labels_to_load: Vec<String> = {
401 let edge_meta = schema.edge_types.get(&edge_type_name);
402 match (direction, edge_meta) {
403 (Direction::Outgoing, Some(meta)) => meta.src_labels.clone(),
404 (Direction::Incoming, Some(meta)) => meta.dst_labels.clone(),
405 (Direction::Both, Some(meta)) => {
406 let mut labels = meta.src_labels.clone();
407 labels.extend(meta.dst_labels.iter().cloned());
408 labels.sort();
409 labels.dedup();
410 labels
411 }
412 _ => Vec::new(),
413 }
414 };
415
416 use arrow_array::{ListArray, UInt8Array, UInt64Array};
417
418 let mut entries: Vec<(u64, Vid, Eid, u64)> = Vec::new();
419 let mut deleted_eids = HashSet::new();
420
421 for &read_dir in direction.expand() {
422 let dir_str = read_dir.as_str();
423 for label_name in &labels_to_load {
424 let adj_ds = storage.adjacency_dataset(&edge_type_name, label_name, dir_str);
426 let backend = storage.backend();
427
428 if let Ok(adj_ds) = adj_ds {
429 let adj_table_name = adj_ds.table_name();
430 let adj_exists = backend.table_exists(&adj_table_name).await.unwrap_or(false);
431
432 if adj_exists {
433 let mut request = crate::backend::types::ScanRequest::all(&adj_table_name);
434 if let Some(hwm) = version {
435 request = request.with_filter(format!("_version <= {}", hwm));
436 }
437
438 let batches: Vec<arrow_array::RecordBatch> =
439 backend.scan(request).await.unwrap_or_default();
440
441 for batch in batches {
442 let src_col = batch
443 .column_by_name("src_vid")
444 .unwrap()
445 .as_any()
446 .downcast_ref::<UInt64Array>()
447 .unwrap();
448 let neighbors_list = batch
449 .column_by_name("neighbors")
450 .unwrap()
451 .as_any()
452 .downcast_ref::<ListArray>()
453 .unwrap();
454 let eids_list = batch
455 .column_by_name("edge_ids")
456 .unwrap()
457 .as_any()
458 .downcast_ref::<ListArray>()
459 .unwrap();
460
461 for i in 0..batch.num_rows() {
462 let src_offset = src_col.value(i);
463 let neighbors_array_ref = neighbors_list.value(i);
464 let neighbors = neighbors_array_ref
465 .as_any()
466 .downcast_ref::<UInt64Array>()
467 .unwrap();
468 let eids_array_ref = eids_list.value(i);
469 let eids = eids_array_ref
470 .as_any()
471 .downcast_ref::<UInt64Array>()
472 .unwrap();
473
474 for j in 0..neighbors.len() {
475 entries.push((
481 src_offset,
482 Vid::from(neighbors.value(j)),
483 Eid::from(eids.value(j)),
484 0,
485 ));
486 }
487 }
488 }
489 }
490 }
491 }
492
493 let delta_ds = storage.delta_dataset(&edge_type_name, dir_str)?;
495 let backend = storage.backend();
496 let delta_table_name = delta_ds.table_name();
497
498 if backend
499 .table_exists(&delta_table_name)
500 .await
501 .unwrap_or(false)
502 {
503 let mut request = crate::backend::types::ScanRequest::all(&delta_table_name);
504 if let Some(hwm) = version {
505 request = request.with_filter(format!("_version <= {}", hwm));
506 }
507
508 if let Ok(batches) = backend.scan(request).await {
509 for batch in batches {
510 let src_col = batch
511 .column_by_name("src_vid")
512 .unwrap()
513 .as_any()
514 .downcast_ref::<UInt64Array>()
515 .unwrap();
516 let dst_col = batch
517 .column_by_name("dst_vid")
518 .unwrap()
519 .as_any()
520 .downcast_ref::<UInt64Array>()
521 .unwrap();
522 let eid_col = batch
523 .column_by_name("eid")
524 .unwrap()
525 .as_any()
526 .downcast_ref::<UInt64Array>()
527 .unwrap();
528 let op_col = batch
529 .column_by_name("op")
530 .unwrap()
531 .as_any()
532 .downcast_ref::<UInt8Array>()
533 .unwrap();
534
535 let version_col = batch
537 .column_by_name("_version")
538 .and_then(|c| c.as_any().downcast_ref::<UInt64Array>().cloned());
539
540 for i in 0..batch.num_rows() {
541 let src_vid = Vid::from(src_col.value(i));
542 let dst_vid = Vid::from(dst_col.value(i));
543 let eid = Eid::from(eid_col.value(i));
544 let op = op_col.value(i); let row_version = version_col.as_ref().map_or(0, |vc| vc.value(i));
546
547 let is_incoming = read_dir == Direction::Incoming;
550 let (key_vid, neighbor_vid) = if is_incoming {
551 (dst_vid, src_vid)
552 } else {
553 (src_vid, dst_vid)
554 };
555
556 if op == 0 {
557 entries.push((key_vid.as_u64(), neighbor_vid, eid, row_version));
558 } else {
559 deleted_eids.insert(eid);
560 self.shadow.add_deleted_edge(
561 key_vid,
562 ShadowEdge {
563 neighbor_vid,
564 eid,
565 edge_type: edge_type_id,
566 created_version: 0,
567 deleted_version: row_version,
568 },
569 read_dir,
570 );
571 }
572 }
573 }
574 }
575 }
576 }
577
578 if !deleted_eids.is_empty() {
580 entries.retain(|(_, _, eid, _)| !deleted_eids.contains(eid));
581 }
582
583 {
586 use std::collections::hash_map::Entry;
587 use std::collections::{HashMap, HashSet};
588
589 let mut best: HashMap<Eid, usize> = HashMap::new();
590 for (idx, (_, _, eid, ver)) in entries.iter().enumerate() {
591 match best.entry(*eid) {
592 Entry::Vacant(e) => {
593 e.insert(idx);
594 }
595 Entry::Occupied(mut e) => {
596 if *ver > entries[*e.get()].3 {
597 e.insert(idx);
598 }
599 }
600 }
601 }
602 let keep: HashSet<usize> = best.into_values().collect();
603 let mut idx = 0;
604 entries.retain(|_| {
605 let k = keep.contains(&idx);
606 idx += 1;
607 k
608 });
609 }
610
611 let max_offset = entries.iter().map(|(o, _, _, _)| *o).max().unwrap_or(0);
613 let csr = MainCsr::from_edge_entries(max_offset as usize, entries);
614 self.set_main_csr(edge_type_id, direction, csr);
615
616 Ok(())
617 }
618
619 pub async fn warm_coalesced(
625 &self,
626 storage: &StorageManager,
627 edge_type_id: u32,
628 direction: Direction,
629 version: Option<u64>,
630 ) -> anyhow::Result<()> {
631 if self.has_csr(edge_type_id, direction) {
633 return Ok(());
634 }
635
636 let guard = self
638 .warm_guards
639 .entry((edge_type_id, direction))
640 .or_insert_with(|| Arc::new(tokio::sync::Mutex::new(())))
641 .value()
642 .clone();
643 let _lock = guard.lock().await;
644
645 if self.has_csr(edge_type_id, direction) {
647 return Ok(());
648 }
649
650 self.warm(storage, edge_type_id, direction, version).await
651 }
652
653 pub fn memory_usage(&self) -> usize {
655 self.current_bytes.load(Ordering::Relaxed)
656 }
657
658 pub fn max_bytes(&self) -> usize {
660 self.max_bytes
661 }
662
663 pub fn shadow(&self) -> &ShadowCsr {
665 &self.shadow
666 }
667}
668
669impl std::fmt::Debug for AdjacencyManager {
670 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
671 f.debug_struct("AdjacencyManager")
672 .field("main_csr_count", &self.main_csr.len())
673 .field("frozen_segments", &self.frozen_segments.read().len())
674 .field("current_bytes", &self.current_bytes.load(Ordering::Relaxed))
675 .field("max_bytes", &self.max_bytes)
676 .finish()
677 }
678}
679
680#[cfg(test)]
681mod tests {
682 use super::*;
683
/// An inserted edge is visible as outgoing from its source and as incoming
/// at its destination.
#[test]
fn test_insert_and_get_neighbors() {
    let manager = AdjacencyManager::new(1024 * 1024);
    let (from, to, edge) = (Vid::new(1), Vid::new(2), Eid::new(100));

    manager.insert_edge(from, to, edge, 1, 1);

    let outgoing = manager.get_neighbors(from, 1, Direction::Outgoing);
    assert_eq!(outgoing.len(), 1);
    assert_eq!(outgoing[0], (to, edge));

    let incoming = manager.get_neighbors(to, 1, Direction::Incoming);
    assert_eq!(incoming.len(), 1);
    assert_eq!(incoming[0], (from, edge));
}
702
/// Edges stored in a main CSR are returned for the vertex they belong to.
#[test]
fn test_main_csr_lookup() {
    let manager = AdjacencyManager::new(1024 * 1024);

    let edges = vec![
        (0, Vid::new(10), Eid::new(100), 1),
        (1, Vid::new(20), Eid::new(101), 2),
    ];
    manager.set_main_csr(1, Direction::Outgoing, MainCsr::from_edge_entries(1, edges));

    let found = manager.get_neighbors(Vid::new(0), 1, Direction::Outgoing);
    assert_eq!(found.len(), 1);
    assert_eq!(found[0], (Vid::new(10), Eid::new(100)));
}
720
/// Overlay inserts are merged with what the main CSR already holds.
#[test]
fn test_overlay_on_top_of_main_csr() {
    let manager = AdjacencyManager::new(1024 * 1024);

    let base = MainCsr::from_edge_entries(0, vec![(0, Vid::new(10), Eid::new(100), 1)]);
    manager.set_main_csr(1, Direction::Outgoing, base);

    manager.insert_edge(Vid::new(0), Vid::new(20), Eid::new(101), 1, 2);

    let found = manager.get_neighbors(Vid::new(0), 1, Direction::Outgoing);
    assert_eq!(found.len(), 2);

    let seen: HashSet<Eid> = found.iter().map(|&(_, eid)| eid).collect();
    assert!(seen.contains(&Eid::new(100)));
    assert!(seen.contains(&Eid::new(101)));
}
739
/// A tombstone in the overlay hides an edge inserted into the same overlay.
#[test]
fn test_tombstone_removes_edge() {
    let manager = AdjacencyManager::new(1024 * 1024);
    let (from, to, edge) = (Vid::new(0), Vid::new(10), Eid::new(100));

    manager.insert_edge(from, to, edge, 1, 1);
    manager.add_tombstone(edge, from, to, 1, 2);

    let found = manager.get_neighbors(from, 1, Direction::Outgoing);
    assert!(found.is_empty());
}
750
/// Versioned reads only return main-CSR edges created at or before the
/// requested version.
#[test]
fn test_version_filtered_query() {
    let manager = AdjacencyManager::new(1024 * 1024);

    let edges = vec![
        (0, Vid::new(10), Eid::new(100), 1),
        (0, Vid::new(20), Eid::new(101), 5),
    ];
    manager.set_main_csr(1, Direction::Outgoing, MainCsr::from_edge_entries(0, edges));

    let at_v3 = manager.get_neighbors_at_version(Vid::new(0), 1, Direction::Outgoing, 3);
    assert_eq!(at_v3.len(), 1);
    assert_eq!(at_v3[0], (Vid::new(10), Eid::new(100)));

    let at_v5 = manager.get_neighbors_at_version(Vid::new(0), 1, Direction::Outgoing, 5);
    assert_eq!(at_v5.len(), 2);
}
774
/// A shadow entry makes a deleted edge visible to reads before its deletion
/// version, and invisible from that version on.
#[test]
fn test_shadow_csr_resurrects_deleted_edges() {
    let manager = AdjacencyManager::new(1024 * 1024);
    let owner = Vid::new(0);

    let deleted = ShadowEdge {
        neighbor_vid: Vid::new(10),
        eid: Eid::new(100),
        edge_type: 1,
        created_version: 1,
        deleted_version: 5,
    };
    manager
        .shadow()
        .add_deleted_edge(owner, deleted, Direction::Outgoing);

    let before = manager.get_neighbors_at_version(owner, 1, Direction::Outgoing, 3);
    assert_eq!(before.len(), 1);
    assert_eq!(before[0], (Vid::new(10), Eid::new(100)));

    let at_delete = manager.get_neighbors_at_version(owner, 1, Direction::Outgoing, 5);
    assert!(at_delete.is_empty());
}
801
/// Compaction moves overlay edges into a main CSR and leaves no frozen
/// segments behind.
#[test]
fn test_compact_merges_into_main_csr() {
    let manager = AdjacencyManager::new(1024 * 1024);
    let owner = Vid::new(0);

    manager.insert_edge(owner, Vid::new(10), Eid::new(100), 1, 1);
    manager.insert_edge(owner, Vid::new(20), Eid::new(101), 1, 2);

    manager.compact();

    assert_eq!(manager.frozen_segment_count(), 0);

    let found = manager.get_neighbors(owner, 1, Direction::Outgoing);
    assert_eq!(found.len(), 2);

    assert!(manager.has_csr(1, Direction::Outgoing));
}
822
/// Compaction drops a main-CSR edge that was tombstoned in the overlay and
/// keeps the edge that was not.
#[test]
fn test_compact_removes_tombstoned_edges() {
    let manager = AdjacencyManager::new(1024 * 1024);
    let owner = Vid::new(0);

    let base = MainCsr::from_edge_entries(0, vec![(0, Vid::new(10), Eid::new(100), 1)]);
    manager.set_main_csr(1, Direction::Outgoing, base);

    manager.insert_edge(owner, Vid::new(20), Eid::new(101), 1, 2);
    manager.add_tombstone(Eid::new(100), owner, Vid::new(10), 1, 3);

    manager.compact();

    let found = manager.get_neighbors(owner, 1, Direction::Outgoing);
    assert_eq!(found.len(), 1);
    assert_eq!(found[0], (Vid::new(20), Eid::new(101)));
}
842
/// `should_compact` flips to true once the frozen list reaches the threshold.
#[test]
fn test_should_compact() {
    let manager = AdjacencyManager::new(1024 * 1024);
    assert!(!manager.should_compact(4));

    // Freeze the (empty) overlay four times by hand.
    let mut rounds = 0;
    while rounds < 4 {
        let segment = {
            let mut overlay = manager.active_overlay.write();
            Arc::new(std::mem::take(&mut *overlay).freeze())
        };
        manager.frozen_segments.write().push(segment);
        rounds += 1;
    }

    assert!(manager.should_compact(4));
}
860
/// A fresh manager has no neighbors and no CSRs.
#[test]
fn test_empty_manager() {
    let manager = AdjacencyManager::new(1024 * 1024);

    let found = manager.get_neighbors(Vid::new(0), 1, Direction::Outgoing);
    assert!(found.is_empty());
    assert!(!manager.has_csr(1, Direction::Outgoing));
}
870
/// A tombstone in the active overlay hides an edge that lives in the main CSR.
#[test]
fn test_overlay_tombstone_removes_main_csr_edge() {
    let manager = AdjacencyManager::new(1024 * 1024);
    let (owner, neighbor, edge) = (Vid::new(0), Vid::new(10), Eid::new(100));

    let base = MainCsr::from_edge_entries(0, vec![(0, neighbor, edge, 1)]);
    manager.set_main_csr(1, Direction::Outgoing, base);

    let before = manager.get_neighbors(owner, 1, Direction::Outgoing);
    assert_eq!(before.len(), 1);

    manager.add_tombstone(edge, owner, neighbor, 1, 2);

    let n = manager.get_neighbors(owner, 1, Direction::Outgoing);
    assert!(
        n.is_empty(),
        "Edge should be removed by overlay tombstone, got {:?}",
        n
    );
}
895
/// In versioned reads an overlay tombstone only hides a main-CSR edge from
/// the tombstone's version onward.
#[test]
fn test_overlay_tombstone_removes_main_csr_edge_versioned() {
    let manager = AdjacencyManager::new(1024 * 1024);
    let (owner, neighbor, edge) = (Vid::new(0), Vid::new(10), Eid::new(100));

    let base = MainCsr::from_edge_entries(0, vec![(0, neighbor, edge, 1)]);
    manager.set_main_csr(1, Direction::Outgoing, base);

    manager.add_tombstone(edge, owner, neighbor, 1, 5);

    let before = manager.get_neighbors_at_version(owner, 1, Direction::Outgoing, 3);
    assert_eq!(before.len(), 1);

    let at_delete = manager.get_neighbors_at_version(owner, 1, Direction::Outgoing, 5);
    assert!(
        at_delete.is_empty(),
        "Edge should be removed by overlay tombstone at version 5"
    );
}
917
/// A tombstone that has moved into a frozen segment still hides the
/// corresponding main-CSR edge.
#[test]
fn test_frozen_tombstone_removes_main_csr_edge() {
    let manager = AdjacencyManager::new(1024 * 1024);
    let (owner, neighbor, edge) = (Vid::new(0), Vid::new(10), Eid::new(100));

    let base = MainCsr::from_edge_entries(0, vec![(0, neighbor, edge, 1)]);
    manager.set_main_csr(1, Direction::Outgoing, base);

    manager.add_tombstone(edge, owner, neighbor, 1, 2);

    // Freeze the overlay by hand, without running a full compaction.
    {
        let mut overlay = manager.active_overlay.write();
        let segment = std::sync::Arc::new(std::mem::take(&mut *overlay).freeze());
        manager.frozen_segments.write().push(segment);
    }

    let found = manager.get_neighbors(owner, 1, Direction::Outgoing);
    assert!(found.is_empty(), "Frozen tombstone should remove Main CSR edge");
}
941
/// Overlay edges become visible to versioned reads one by one, each at its
/// own insert version.
#[test]
fn test_per_edge_version_filtering() {
    let manager = AdjacencyManager::new(1024 * 1024);

    let owner = Vid::new(0);
    let (neighbor_a, edge_a) = (Vid::new(10), Eid::new(100));
    let (neighbor_b, edge_b) = (Vid::new(20), Eid::new(200));
    let etype = 1;

    // Edge A appears at version 3, edge B at version 7.
    manager.insert_edge(owner, neighbor_a, edge_a, etype, 3);
    manager.insert_edge(owner, neighbor_b, edge_b, etype, 7);

    let read_at =
        |version: u64| manager.get_neighbors_at_version(owner, etype, Direction::Outgoing, version);

    let at_v2 = read_at(2);
    assert!(at_v2.is_empty(), "No edges should be visible at version 2");

    let at_v5 = read_at(5);
    assert_eq!(at_v5.len(), 1, "Only edge A should be visible at version 5");
    assert_eq!(at_v5[0].0, neighbor_a, "Edge A destination should match");
    assert_eq!(at_v5[0].1, edge_a, "Edge A ID should match");

    let at_v7 = read_at(7);
    assert_eq!(at_v7.len(), 2, "Both edges should be visible at version 7");

    let at_v10 = read_at(10);
    assert_eq!(at_v10.len(), 2, "Both edges should be visible at version 10");
}
994
/// The same edge id present in both the main CSR and the overlay is
/// reported once.
#[test]
fn test_duplicate_edges_deduplicated_by_eid() {
    let manager = AdjacencyManager::new(1024 * 1024);

    let (owner, neighbor, edge) = (Vid::new(0), Vid::new(10), Eid::new(100));
    let etype = 1;

    let base = MainCsr::from_edge_entries(0, vec![(0, neighbor, edge, 1)]);
    manager.set_main_csr(etype, Direction::Outgoing, base);

    // Same edge id again, this time through the overlay.
    manager.insert_edge(owner, neighbor, edge, etype, 3);

    let found = manager.get_neighbors(owner, etype, Direction::Outgoing);
    assert_eq!(found.len(), 1, "Duplicate Eid should result in single entry");
    assert_eq!(found[0], (neighbor, edge));
}
1021
/// When compaction sees the same edge id twice it keeps only the entry with
/// the highest version, so the edge is invisible to reads below that version.
#[test]
fn test_compact_deduplicates_edges_keeps_highest_version() {
    let manager = AdjacencyManager::new(1024 * 1024);

    let (owner, neighbor, edge) = (Vid::new(0), Vid::new(10), Eid::new(100));
    let etype = 1;

    // Version 1 in the main CSR, version 5 in the overlay.
    let base = MainCsr::from_edge_entries(0, vec![(0, neighbor, edge, 1)]);
    manager.set_main_csr(etype, Direction::Outgoing, base);
    manager.insert_edge(owner, neighbor, edge, etype, 5);

    manager.compact();

    let read_at =
        |version: u64| manager.get_neighbors_at_version(owner, etype, Direction::Outgoing, version);

    let at_v5 = read_at(5);
    assert_eq!(at_v5.len(), 1, "Edge should be visible at version 5");
    assert_eq!(at_v5[0], (neighbor, edge));

    let at_v4 = read_at(4);
    assert_eq!(
        at_v4.len(),
        0,
        "After compaction, only version 5 exists; version 4 should not see it"
    );

    let at_v1 = read_at(1);
    assert_eq!(
        at_v1.len(),
        0,
        "Old version discarded during compaction deduplication"
    );

    let at_v6 = read_at(6);
    assert_eq!(at_v6.len(), 1, "Edge should be visible at version 6");
}
1073
/// Many tombstones belonging to one vertex do not affect the neighbors
/// reported for another vertex, and that other vertex's own edges are all
/// hidden.
#[test]
fn test_tombstone_scan_performance() {
    let manager = AdjacencyManager::new(1024 * 1024);
    let vertex_a = Vid::new(1);
    let vertex_b = Vid::new(2);
    let etype = 1;

    // Five live edges on vertex A.
    let a_edges: Vec<(Vid, Eid)> = (0..5)
        .map(|i| (Vid::new(100 + i), Eid::new(1000 + i)))
        .collect();
    for &(dst, eid) in &a_edges {
        manager.insert_edge(vertex_a, dst, eid, etype, 1);
    }

    // A hundred edges on vertex B, each deleted right away.
    for i in 0..100 {
        let (dst, eid) = (Vid::new(200 + i), Eid::new(2000 + i));
        manager.insert_edge(vertex_b, dst, eid, etype, 1);
        manager.add_tombstone(eid, vertex_b, dst, etype, 2);
    }

    let neighbors = manager.get_neighbors(vertex_a, etype, Direction::Outgoing);
    assert_eq!(neighbors.len(), 5, "Should return all 5 edges from vertex_a");
    for (dst, eid) in &a_edges {
        assert!(
            neighbors.contains(&(*dst, *eid)),
            "Edge {:?} should be in results",
            (dst, eid)
        );
    }

    let b_neighbors = manager.get_neighbors(vertex_b, etype, Direction::Outgoing);
    assert_eq!(
        b_neighbors.len(),
        0,
        "Vertex B should have no neighbors (all deleted)"
    );
}
1127}