use crate::storage::direction::Direction;
use dashmap::DashMap;
use std::collections::HashMap;
use uni_common::core::id::{Eid, Vid};
/// Deletion marker for a single edge held in the overlay.
///
/// Neighbor lookups in this file skip any inserted edge whose id has a
/// tombstone (subject to `version` for versioned reads).
#[derive(Clone, Debug)]
pub struct OverlayTombstone {
/// Id of the edge being hidden; also the key under which the tombstone is stored.
pub eid: Eid,
/// Source vertex as passed to `add_tombstone`; not consulted by the lookups in this file.
pub src_vid: Vid,
/// Destination vertex as passed to `add_tombstone`; not consulted by the lookups in this file.
pub dst_vid: Vid,
/// Edge type as passed to `add_tombstone`; not consulted by the lookups in this file.
pub edge_type: u32,
/// Version of the tombstone. A versioned read at `v` treats the edge as
/// deleted when `version <= v`.
pub version: u64,
}
/// Bucket key for inserted edges: `(edge_type, direction)`.
type OverlayKey = (u32, Direction);
/// Per-bucket adjacency: the vertex a lookup starts from, mapped to its
/// `(neighbor, edge id, insert version)` entries in insertion order.
type NeighborMap = HashMap<Vid, Vec<(Vid, Eid, u64)>>;
/// Mutable overlay of edge inserts and tombstones.
///
/// Both maps are `DashMap`s, so every mutating method on this type takes `&self`.
/// Call `freeze` to convert the segment into a plain-`HashMap` [`FrozenCsrSegment`].
pub struct L0CsrSegment {
/// Inserted edges, bucketed by `(edge_type, direction)` and then by vertex.
pub(crate) inserts: DashMap<OverlayKey, NeighborMap>,
/// Tombstones keyed by edge id; at most one tombstone is kept per edge id.
pub(crate) tombstones: DashMap<Eid, OverlayTombstone>,
}
impl L0CsrSegment {
    /// Creates a segment with no inserted edges and no tombstones.
    pub fn new() -> Self {
        let inserts = DashMap::new();
        let tombstones = DashMap::new();
        Self {
            inserts,
            tombstones,
        }
    }

    /// Records an edge in the `(edge_type, direction)` bucket.
    ///
    /// `(dst, eid, version)` is appended to the adjacency list stored under
    /// `src`; the bucket and the list are created on first use. No
    /// de-duplication is performed.
    pub fn insert_edge(
        &self,
        src: Vid,
        dst: Vid,
        eid: Eid,
        edge_type: u32,
        version: u64,
        direction: Direction,
    ) {
        let mut bucket = self.inserts.entry((edge_type, direction)).or_default();
        let adjacency = bucket.entry(src).or_default();
        adjacency.push((dst, eid, version));
    }

    /// Stores a tombstone for `eid`, replacing any tombstone already held
    /// for that edge id.
    pub fn add_tombstone(&self, eid: Eid, src: Vid, dst: Vid, edge_type: u32, version: u64) {
        let tombstone = OverlayTombstone {
            eid,
            src_vid: src,
            dst_vid: dst,
            edge_type,
            version,
        };
        self.tombstones.insert(eid, tombstone);
    }

    /// Returns the `(neighbor, edge id)` pairs recorded for `vid` in the
    /// `(edge_type, direction)` bucket, in insertion order.
    ///
    /// Versions are ignored here: an edge is dropped whenever any tombstone
    /// exists for its id. An unknown bucket or vertex yields an empty vector.
    pub fn get_neighbors(&self, vid: Vid, edge_type: u32, direction: Direction) -> Vec<(Vid, Eid)> {
        let Some(adj) = self.inserts.get(&(edge_type, direction)) else {
            return Vec::new();
        };
        let Some(neighbors) = adj.get(&vid) else {
            return Vec::new();
        };
        neighbors
            .iter()
            .filter_map(|&(neighbor, eid, _)| {
                let live = !self.tombstones.contains_key(&eid);
                live.then_some((neighbor, eid))
            })
            .collect()
    }

    /// Versioned variant of [`Self::get_neighbors`].
    ///
    /// An edge is returned when it was inserted at or before `version` and
    /// has no tombstone whose version is at or before `version`.
    pub fn get_neighbors_at_version(
        &self,
        vid: Vid,
        edge_type: u32,
        direction: Direction,
        version: u64,
    ) -> Vec<(Vid, Eid)> {
        let Some(adj) = self.inserts.get(&(edge_type, direction)) else {
            return Vec::new();
        };
        let Some(neighbors) = adj.get(&vid) else {
            return Vec::new();
        };
        neighbors
            .iter()
            .filter_map(|&(neighbor, eid, inserted_at)| {
                let deleted = self
                    .tombstones
                    .get(&eid)
                    .is_some_and(|ts| ts.version <= version);
                let visible = inserted_at <= version && !deleted;
                visible.then_some((neighbor, eid))
            })
            .collect()
    }

    /// Reports whether a bucket exists for `(edge_type, direction)`, i.e.
    /// whether at least one edge was ever inserted under that key.
    pub fn has_entries_for(&self, edge_type: u32, direction: Direction) -> bool {
        let key = (edge_type, direction);
        self.inserts.contains_key(&key)
    }

    /// Consumes the segment and moves its contents into a
    /// [`FrozenCsrSegment`] backed by plain `HashMap`s.
    pub fn freeze(self) -> FrozenCsrSegment {
        let Self {
            inserts,
            tombstones,
        } = self;
        FrozenCsrSegment {
            inserts: inserts.into_iter().collect(),
            tombstones: tombstones.into_iter().collect(),
        }
    }
}
impl Default for L0CsrSegment {
fn default() -> Self {
Self::new()
}
}
/// Compact debug output: prints only the number of insert buckets and the
/// number of tombstones rather than the full map contents.
impl std::fmt::Debug for L0CsrSegment {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let bucket_count = self.inserts.len();
        let tombstone_count = self.tombstones.len();
        let mut out = f.debug_struct("L0CsrSegment");
        out.field("insert_buckets", &bucket_count);
        out.field("tombstones", &tombstone_count);
        out.finish()
    }
}
/// Snapshot of an `L0CsrSegment` produced by `L0CsrSegment::freeze`, holding
/// the same inserts and tombstones in plain `HashMap`s.
#[derive(Debug)]
pub struct FrozenCsrSegment {
/// Inserted edges, bucketed by `(edge_type, direction)` and then by vertex.
pub(crate) inserts: HashMap<OverlayKey, NeighborMap>,
/// Tombstones keyed by edge id.
pub(crate) tombstones: HashMap<Eid, OverlayTombstone>,
}
impl FrozenCsrSegment {
pub fn get_neighbors(&self, vid: Vid, edge_type: u32, direction: Direction) -> Vec<(Vid, Eid)> {
let mut result = Vec::new();
if let Some(adj) = self.inserts.get(&(edge_type, direction))
&& let Some(neighbors) = adj.get(&vid)
{
for &(neighbor, eid, _version) in neighbors {
if !self.tombstones.contains_key(&eid) {
result.push((neighbor, eid));
}
}
}
result
}
}
#[cfg(test)]
mod tests {
    use super::*;

    /// A single inserted edge is returned by an unversioned lookup.
    #[test]
    fn test_insert_and_get_neighbors() {
        let segment = L0CsrSegment::new();
        let src = Vid::new(1);
        let dst = Vid::new(2);
        let eid = Eid::new(100);
        segment.insert_edge(src, dst, eid, 1, 1, Direction::Outgoing);
        let neighbors = segment.get_neighbors(src, 1, Direction::Outgoing);
        assert_eq!(neighbors.len(), 1);
        assert_eq!(neighbors[0], (dst, eid));
    }

    /// A tombstone for the edge id hides the edge from unversioned lookups.
    #[test]
    fn test_tombstone_excludes_edge() {
        let segment = L0CsrSegment::new();
        let src = Vid::new(1);
        let dst = Vid::new(2);
        let eid = Eid::new(100);
        segment.insert_edge(src, dst, eid, 1, 1, Direction::Outgoing);
        segment.add_tombstone(eid, src, dst, 1, 2);
        let neighbors = segment.get_neighbors(src, 1, Direction::Outgoing);
        assert!(neighbors.is_empty());
    }

    /// Repeated inserts for the same vertex append to its adjacency list.
    #[test]
    fn test_multiple_inserts_accumulate() {
        let segment = L0CsrSegment::new();
        let src = Vid::new(1);
        segment.insert_edge(src, Vid::new(2), Eid::new(100), 1, 1, Direction::Outgoing);
        segment.insert_edge(src, Vid::new(3), Eid::new(101), 1, 1, Direction::Outgoing);
        let neighbors = segment.get_neighbors(src, 1, Direction::Outgoing);
        assert_eq!(neighbors.len(), 2);
    }

    /// Outgoing and incoming edges live in separate buckets.
    #[test]
    fn test_different_directions_are_separate() {
        let segment = L0CsrSegment::new();
        let src = Vid::new(1);
        let dst = Vid::new(2);
        segment.insert_edge(src, dst, Eid::new(100), 1, 1, Direction::Outgoing);
        segment.insert_edge(dst, src, Eid::new(100), 1, 1, Direction::Incoming);
        let out = segment.get_neighbors(src, 1, Direction::Outgoing);
        assert_eq!(out.len(), 1);
        let inc = segment.get_neighbors(dst, 1, Direction::Incoming);
        assert_eq!(inc.len(), 1);
        assert!(
            segment
                .get_neighbors(src, 1, Direction::Incoming)
                .is_empty()
        );
    }

    /// Freezing carries over both inserts and tombstones.
    #[test]
    fn test_freeze_preserves_data() {
        let segment = L0CsrSegment::new();
        let src = Vid::new(1);
        let dst = Vid::new(2);
        let eid = Eid::new(100);
        segment.insert_edge(src, dst, eid, 1, 1, Direction::Outgoing);
        segment.add_tombstone(Eid::new(999), Vid::new(5), Vid::new(6), 2, 3);
        let frozen = segment.freeze();
        let neighbors = frozen.get_neighbors(src, 1, Direction::Outgoing);
        assert_eq!(neighbors.len(), 1);
        assert_eq!(neighbors[0], (dst, eid));
        assert!(frozen.tombstones.contains_key(&Eid::new(999)));
    }

    /// Lookups on an empty segment return nothing.
    #[test]
    fn test_empty_segment() {
        let segment = L0CsrSegment::new();
        assert!(
            segment
                .get_neighbors(Vid::new(0), 1, Direction::Outgoing)
                .is_empty()
        );
    }

    /// Versioned reads honour both the insert version and the tombstone
    /// version, with inclusive (`<=`) boundaries on each.
    #[test]
    fn test_get_neighbors_at_version_boundaries() {
        let segment = L0CsrSegment::new();
        let src = Vid::new(1);
        let dst = Vid::new(2);
        let eid = Eid::new(100);
        segment.insert_edge(src, dst, eid, 1, 5, Direction::Outgoing);
        segment.add_tombstone(eid, src, dst, 1, 8);

        // Before the insert version the edge does not exist yet.
        assert!(
            segment
                .get_neighbors_at_version(src, 1, Direction::Outgoing, 4)
                .is_empty()
        );
        // Visible from the insert version up to just before the tombstone.
        assert_eq!(
            segment.get_neighbors_at_version(src, 1, Direction::Outgoing, 5),
            vec![(dst, eid)]
        );
        assert_eq!(
            segment.get_neighbors_at_version(src, 1, Direction::Outgoing, 7),
            vec![(dst, eid)]
        );
        // Hidden from the tombstone version onwards.
        assert!(
            segment
                .get_neighbors_at_version(src, 1, Direction::Outgoing, 8)
                .is_empty()
        );
        assert!(
            segment
                .get_neighbors_at_version(src, 1, Direction::Outgoing, 9)
                .is_empty()
        );
    }

    /// `has_entries_for` is keyed on the exact `(edge_type, direction)` pair.
    #[test]
    fn test_has_entries_for() {
        let segment = L0CsrSegment::new();
        assert!(!segment.has_entries_for(1, Direction::Outgoing));
        segment.insert_edge(
            Vid::new(1),
            Vid::new(2),
            Eid::new(100),
            1,
            1,
            Direction::Outgoing,
        );
        assert!(segment.has_entries_for(1, Direction::Outgoing));
        assert!(!segment.has_entries_for(1, Direction::Incoming));
        assert!(!segment.has_entries_for(2, Direction::Outgoing));
    }

    /// Tombstones keep filtering edges after the segment has been frozen.
    #[test]
    fn test_frozen_tombstone_excludes_edge() {
        let segment = L0CsrSegment::new();
        let src = Vid::new(1);
        let dst = Vid::new(2);
        let eid = Eid::new(100);
        segment.insert_edge(src, dst, eid, 1, 1, Direction::Outgoing);
        segment.add_tombstone(eid, src, dst, 1, 2);
        let frozen = segment.freeze();
        assert!(
            frozen
                .get_neighbors(src, 1, Direction::Outgoing)
                .is_empty()
        );
    }
}