1use crate::storage::direction::Direction;
11use dashmap::DashMap;
12use std::collections::HashMap;
13use uni_common::core::id::{Eid, Vid};
14
15#[derive(Clone, Debug)]
17pub struct ShadowEdge {
18 pub neighbor_vid: Vid,
20 pub eid: Eid,
22 pub edge_type: u32,
24 pub created_version: u64,
26 pub deleted_version: u64,
28}
29
30pub struct ShadowCsr {
36 entries: DashMap<(u32, Direction), HashMap<Vid, Vec<ShadowEdge>>>,
39}
40
41impl ShadowCsr {
42 pub fn new() -> Self {
44 Self {
45 entries: DashMap::new(),
46 }
47 }
48
49 pub fn add_deleted_edge(&self, src_vid: Vid, edge: ShadowEdge, direction: Direction) {
51 self.entries
52 .entry((edge.edge_type, direction))
53 .or_default()
54 .entry(src_vid)
55 .or_default()
56 .push(edge);
57 }
58
59 pub fn get_entries_at_version(
64 &self,
65 vid: Vid,
66 edge_type: u32,
67 direction: Direction,
68 version: u64,
69 ) -> Vec<(Vid, Eid)> {
70 let mut result = Vec::new();
71
72 if let Some(map) = self.entries.get(&(edge_type, direction))
73 && let Some(edges) = map.get(&vid)
74 {
75 for edge in edges {
76 if edge.created_version <= version && edge.deleted_version > version {
77 result.push((edge.neighbor_vid, edge.eid));
78 }
79 }
80 }
81
82 result
83 }
84
85 pub fn get_entries(&self, vid: Vid, edge_type: u32, direction: Direction) -> Vec<ShadowEdge> {
87 if let Some(map) = self.entries.get(&(edge_type, direction))
88 && let Some(edges) = map.get(&vid)
89 {
90 return edges.clone();
91 }
92 Vec::new()
93 }
94
95 pub fn gc(&self, oldest_active_snapshot_version: u64) {
100 for mut entry in self.entries.iter_mut() {
101 let map = entry.value_mut();
102 for edge_list in map.values_mut() {
103 edge_list.retain(|e| e.deleted_version > oldest_active_snapshot_version);
104 }
105 map.retain(|_, edges| !edges.is_empty());
107 }
108 }
109}
110
111impl Default for ShadowCsr {
112 fn default() -> Self {
113 Self::new()
114 }
115}
116
117impl std::fmt::Debug for ShadowCsr {
118 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
119 let total_edges: usize = self
120 .entries
121 .iter()
122 .map(|e| e.value().values().map(|v| v.len()).sum::<usize>())
123 .sum();
124 f.debug_struct("ShadowCsr")
125 .field("buckets", &self.entries.len())
126 .field("total_edges", &total_edges)
127 .finish()
128 }
129}
130
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a record of edge type `1` with the given neighbor, edge id and
    /// `[created, deleted)` version window.
    fn make_shadow_edge(neighbor: u64, eid: u64, created: u64, deleted: u64) -> ShadowEdge {
        let neighbor_vid = Vid::new(neighbor);
        let eid = Eid::new(eid);
        ShadowEdge {
            neighbor_vid,
            eid,
            edge_type: 1,
            created_version: created,
            deleted_version: deleted,
        }
    }

    #[test]
    fn test_add_and_query() {
        let csr = ShadowCsr::new();
        let source = Vid::new(1);
        csr.add_deleted_edge(source, make_shadow_edge(2, 100, 1, 5), Direction::Outgoing);

        // Inside the window: the record is reported.
        let inside = csr.get_entries_at_version(source, 1, Direction::Outgoing, 3);
        assert_eq!(inside.len(), 1);
        assert_eq!(inside[0], (Vid::new(2), Eid::new(100)));

        // At the deletion version the upper bound is exclusive.
        let at_deletion = csr.get_entries_at_version(source, 1, Direction::Outgoing, 5);
        assert!(at_deletion.is_empty());

        // Before the creation version nothing is reported.
        let before_creation = csr.get_entries_at_version(source, 1, Direction::Outgoing, 0);
        assert!(before_creation.is_empty());
    }

    #[test]
    fn test_gc_removes_old_entries() {
        let csr = ShadowCsr::new();
        let source = Vid::new(1);
        csr.add_deleted_edge(source, make_shadow_edge(2, 100, 1, 3), Direction::Outgoing);
        csr.add_deleted_edge(source, make_shadow_edge(3, 101, 2, 10), Direction::Outgoing);

        // Only the record deleted after version 5 survives collection.
        csr.gc(5);

        let survivors = csr.get_entries(source, 1, Direction::Outgoing);
        assert_eq!(survivors.len(), 1);
        assert_eq!(survivors[0].eid, Eid::new(101));
    }

    #[test]
    fn test_empty_shadow() {
        let csr = ShadowCsr::new();
        let found = csr.get_entries_at_version(Vid::new(0), 1, Direction::Outgoing, 5);
        assert!(found.is_empty());
    }

    #[test]
    fn test_multiple_edges_same_vertex() {
        let csr = ShadowCsr::new();
        let source = Vid::new(1);
        csr.add_deleted_edge(source, make_shadow_edge(2, 100, 1, 5), Direction::Outgoing);
        csr.add_deleted_edge(source, make_shadow_edge(3, 101, 2, 8), Direction::Outgoing);

        // Version 4 falls inside both windows.
        let at_four = csr.get_entries_at_version(source, 1, Direction::Outgoing, 4);
        assert_eq!(at_four.len(), 2);

        // Version 6 is past the first record's deletion version.
        let at_six = csr.get_entries_at_version(source, 1, Direction::Outgoing, 6);
        assert_eq!(at_six.len(), 1);
        assert_eq!(at_six[0].1, Eid::new(101));
    }
}