use crate::storage::direction::Direction;
use dashmap::DashMap;
use std::collections::HashMap;
use uni_common::core::id::{Eid, Vid};
/// One edge record held by [`ShadowCsr`], together with the version range
/// over which it is reported by [`ShadowCsr::get_entries_at_version`].
///
/// A record matches a query at `version` when
/// `created_version <= version < deleted_version`.
#[derive(Clone, Debug)]
pub struct ShadowEdge {
/// Vertex id returned as the neighbor when this record matches a query.
pub neighbor_vid: Vid,
/// Edge id returned alongside `neighbor_vid` when this record matches a query.
pub eid: Eid,
/// Edge type id; `ShadowCsr::add_deleted_edge` uses it as part of the bucket key.
pub edge_type: u32,
/// Inclusive lower bound of the version range in which the record matches.
pub created_version: u64,
/// Exclusive upper bound of the version range in which the record matches;
/// also the value `ShadowCsr::gc` compares against its cutoff.
pub deleted_version: u64,
}
/// Store of [`ShadowEdge`] records, grouped first by `(edge_type, direction)`
/// and then by the vertex id the record was added under.
///
/// The outer map is a `DashMap`, which is why every method takes `&self`
/// rather than `&mut self`.
pub struct ShadowCsr {
/// Outer key: `(edge_type, direction)`. Inner map: vertex id -> records
/// added for that vertex, in insertion order.
entries: DashMap<(u32, Direction), HashMap<Vid, Vec<ShadowEdge>>>,
}
impl ShadowCsr {
    /// Creates an empty store with no buckets and no edges.
    pub fn new() -> Self {
        Self {
            entries: DashMap::new(),
        }
    }

    /// Records `edge` under `src_vid`.
    ///
    /// The record is filed in the bucket keyed by `(edge.edge_type, direction)`;
    /// the bucket and the per-vertex list are created on first use. Records are
    /// appended, so a vertex's list keeps insertion order.
    pub fn add_deleted_edge(&self, src_vid: Vid, edge: ShadowEdge, direction: Direction) {
        self.entries
            .entry((edge.edge_type, direction))
            .or_default()
            .entry(src_vid)
            .or_default()
            .push(edge);
    }

    /// Returns `(neighbor_vid, eid)` for every record stored under `vid` in the
    /// `(edge_type, direction)` bucket that matches `version`.
    ///
    /// A record matches when `created_version <= version < deleted_version`.
    /// The result is empty when the bucket or the vertex has no records.
    pub fn get_entries_at_version(
        &self,
        vid: Vid,
        edge_type: u32,
        direction: Direction,
        version: u64,
    ) -> Vec<(Vid, Eid)> {
        let Some(bucket) = self.entries.get(&(edge_type, direction)) else {
            return Vec::new();
        };
        let Some(edges) = bucket.get(&vid) else {
            return Vec::new();
        };
        edges
            .iter()
            .filter(|e| e.created_version <= version && version < e.deleted_version)
            .map(|e| (e.neighbor_vid, e.eid))
            .collect()
    }

    /// Returns a copy of every record stored under `vid` in the
    /// `(edge_type, direction)` bucket, regardless of version.
    ///
    /// The result is empty when the bucket or the vertex has no records.
    pub fn get_entries(&self, vid: Vid, edge_type: u32, direction: Direction) -> Vec<ShadowEdge> {
        self.entries
            .get(&(edge_type, direction))
            .and_then(|bucket| bucket.get(&vid).cloned())
            .unwrap_or_default()
    }

    /// Drops every record whose `deleted_version` is less than or equal to
    /// `oldest_active_snapshot_version`.
    ///
    /// Such records cannot match any query at a version greater than or equal
    /// to the cutoff, because matching requires `version < deleted_version`.
    /// Per-vertex lists that become empty are removed, and so are
    /// `(edge_type, direction)` buckets left without any vertex, so that fully
    /// collected buckets do not linger in the map.
    pub fn gc(&self, oldest_active_snapshot_version: u64) {
        self.entries.retain(|_, per_vertex| {
            per_vertex.retain(|_, edges| {
                edges.retain(|e| e.deleted_version > oldest_active_snapshot_version);
                !edges.is_empty()
            });
            // Returning false removes the whole bucket from the DashMap.
            !per_vertex.is_empty()
        });
    }
}
impl Default for ShadowCsr {
fn default() -> Self {
Self::new()
}
}
/// Summary-only `Debug`: prints the number of buckets and the total number of
/// stored edge records instead of dumping every record.
impl std::fmt::Debug for ShadowCsr {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        // Walk every bucket and add up the lengths of its per-vertex lists.
        let mut total_edges: usize = 0;
        for bucket in self.entries.iter() {
            for edge_list in bucket.value().values() {
                total_edges += edge_list.len();
            }
        }

        let bucket_count = self.entries.len();
        let mut summary = f.debug_struct("ShadowCsr");
        summary.field("buckets", &bucket_count);
        summary.field("total_edges", &total_edges);
        summary.finish()
    }
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Edge type carried by every fixture edge in this module.
    const EDGE_TYPE: u32 = 1;

    /// Builds a fixture edge of type `EDGE_TYPE` from raw ids and a version range.
    fn make_shadow_edge(neighbor: u64, eid: u64, created: u64, deleted: u64) -> ShadowEdge {
        let neighbor_vid = Vid::new(neighbor);
        let eid = Eid::new(eid);
        ShadowEdge {
            neighbor_vid,
            eid,
            edge_type: EDGE_TYPE,
            created_version: created,
            deleted_version: deleted,
        }
    }

    /// Queries the outgoing `EDGE_TYPE` records of `src` that match `version`.
    fn outgoing_at(shadow: &ShadowCsr, src: Vid, version: u64) -> Vec<(Vid, Eid)> {
        shadow.get_entries_at_version(src, EDGE_TYPE, Direction::Outgoing, version)
    }

    #[test]
    fn test_add_and_query() {
        let shadow = ShadowCsr::new();
        let src = Vid::new(1);
        shadow.add_deleted_edge(src, make_shadow_edge(2, 100, 1, 5), Direction::Outgoing);

        // Inside [created, deleted): the record is reported.
        let inside = outgoing_at(&shadow, src, 3);
        assert_eq!(inside.len(), 1);
        assert_eq!(inside[0], (Vid::new(2), Eid::new(100)));

        // At the deletion version the record no longer matches.
        let at_deletion = outgoing_at(&shadow, src, 5);
        assert!(at_deletion.is_empty());

        // Before the creation version the record does not match yet.
        let before_creation = outgoing_at(&shadow, src, 0);
        assert!(before_creation.is_empty());
    }

    #[test]
    fn test_gc_removes_old_entries() {
        let shadow = ShadowCsr::new();
        let src = Vid::new(1);
        let fixtures = [
            make_shadow_edge(2, 100, 1, 3),
            make_shadow_edge(3, 101, 2, 10),
        ];
        for edge in fixtures {
            shadow.add_deleted_edge(src, edge, Direction::Outgoing);
        }

        // Cutoff 5 drops the record deleted at 3 and keeps the one deleted at 10.
        shadow.gc(5);

        let survivors = shadow.get_entries(src, EDGE_TYPE, Direction::Outgoing);
        assert_eq!(survivors.len(), 1);
        assert_eq!(survivors[0].eid, Eid::new(101));
    }

    #[test]
    fn test_empty_shadow() {
        let shadow = ShadowCsr::new();
        let found = outgoing_at(&shadow, Vid::new(0), 5);
        assert!(found.is_empty());
    }

    #[test]
    fn test_multiple_edges_same_vertex() {
        let shadow = ShadowCsr::new();
        let src = Vid::new(1);
        let fixtures = [
            make_shadow_edge(2, 100, 1, 5),
            make_shadow_edge(3, 101, 2, 8),
        ];
        for edge in fixtures {
            shadow.add_deleted_edge(src, edge, Direction::Outgoing);
        }

        // Version 4 falls inside both ranges.
        let both = outgoing_at(&shadow, src, 4);
        assert_eq!(both.len(), 2);

        // Version 6 is past the first record's deletion, so only the second remains.
        let later = outgoing_at(&shadow, src, 6);
        assert_eq!(later.len(), 1);
        assert_eq!(later[0].1, Eid::new(101));
    }
}