Skip to main content

uni_store/storage/
shadow_csr.rs

1// SPDX-License-Identifier: Apache-2.0
2// Copyright 2024-2026 Dragonscale Team
3
4//! Shadow CSR for time-travel deleted edge tracking.
5//!
6//! Stores edges that have been deleted from the Main CSR along with
7//! their version lifecycle (created_version, deleted_version). Only
8//! queried during snapshot/time-travel reads — never on the hot path.
9
10use crate::storage::direction::Direction;
11use dashmap::DashMap;
12use std::collections::HashMap;
13use uni_common::core::id::{Eid, Vid};
14
15/// A deleted edge with version range for time-travel reconstruction.
16#[derive(Clone, Debug)]
17pub struct ShadowEdge {
18    /// Neighbor vertex ID.
19    pub neighbor_vid: Vid,
20    /// Edge ID.
21    pub eid: Eid,
22    /// Edge type ID (bit 31 = 0 for schema'd, 1 for schemaless).
23    pub edge_type: u32,
24    /// Version at which this edge was created.
25    pub created_version: u64,
26    /// Version at which this edge was deleted.
27    pub deleted_version: u64,
28}
29
30/// Shadow CSR storing deleted edges with their version lifecycle.
31///
32/// Only queried during snapshot/time-travel reads. Uses `HashMap`
33/// rather than packed CSR because deleted edges are typically few,
34/// append-heavy, and never on the regular query hot path.
35pub struct ShadowCsr {
36    /// `(edge_type, direction) -> vid -> Vec<ShadowEdge>`.
37    /// Edge type is u32 with bit 31 = 0 for schema'd, 1 for schemaless.
38    entries: DashMap<(u32, Direction), HashMap<Vid, Vec<ShadowEdge>>>,
39}
40
41impl ShadowCsr {
42    /// Creates an empty shadow CSR.
43    pub fn new() -> Self {
44        Self {
45            entries: DashMap::new(),
46        }
47    }
48
49    /// Records a deleted edge with its version lifecycle.
50    pub fn add_deleted_edge(&self, src_vid: Vid, edge: ShadowEdge, direction: Direction) {
51        self.entries
52            .entry((edge.edge_type, direction))
53            .or_default()
54            .entry(src_vid)
55            .or_default()
56            .push(edge);
57    }
58
59    /// Returns edges that were alive at the given `version`.
60    ///
61    /// An edge is considered alive at `version` when
62    /// `created_version <= version < deleted_version`.
63    pub fn get_entries_at_version(
64        &self,
65        vid: Vid,
66        edge_type: u32,
67        direction: Direction,
68        version: u64,
69    ) -> Vec<(Vid, Eid)> {
70        let mut result = Vec::new();
71
72        if let Some(map) = self.entries.get(&(edge_type, direction))
73            && let Some(edges) = map.get(&vid)
74        {
75            for edge in edges {
76                if edge.created_version <= version && edge.deleted_version > version {
77                    result.push((edge.neighbor_vid, edge.eid));
78                }
79            }
80        }
81
82        result
83    }
84
85    /// Returns raw shadow entries for a vertex (all versions).
86    pub fn get_entries(&self, vid: Vid, edge_type: u32, direction: Direction) -> Vec<ShadowEdge> {
87        if let Some(map) = self.entries.get(&(edge_type, direction))
88            && let Some(edges) = map.get(&vid)
89        {
90            return edges.clone();
91        }
92        Vec::new()
93    }
94
95    /// Garbage-collects shadow entries no longer needed.
96    ///
97    /// Removes entries where `deleted_version <= oldest_active_snapshot_version`,
98    /// since no active snapshot can reference those edges.
99    pub fn gc(&self, oldest_active_snapshot_version: u64) {
100        for mut entry in self.entries.iter_mut() {
101            let map = entry.value_mut();
102            for edge_list in map.values_mut() {
103                edge_list.retain(|e| e.deleted_version > oldest_active_snapshot_version);
104            }
105            // Remove empty vertex entries
106            map.retain(|_, edges| !edges.is_empty());
107        }
108    }
109}
110
111impl Default for ShadowCsr {
112    fn default() -> Self {
113        Self::new()
114    }
115}
116
117impl std::fmt::Debug for ShadowCsr {
118    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
119        let total_edges: usize = self
120            .entries
121            .iter()
122            .map(|e| e.value().values().map(|v| v.len()).sum::<usize>())
123            .sum();
124        f.debug_struct("ShadowCsr")
125            .field("buckets", &self.entries.len())
126            .field("total_edges", &total_edges)
127            .finish()
128    }
129}
130
131#[cfg(test)]
132mod tests {
133    use super::*;
134
135    fn make_shadow_edge(neighbor: u64, eid: u64, created: u64, deleted: u64) -> ShadowEdge {
136        ShadowEdge {
137            neighbor_vid: Vid::new(neighbor),
138            eid: Eid::new(eid),
139            edge_type: 1,
140            created_version: created,
141            deleted_version: deleted,
142        }
143    }
144
145    #[test]
146    fn test_add_and_query() {
147        let shadow = ShadowCsr::new();
148        let src = Vid::new(1);
149
150        // Edge created at v1, deleted at v5
151        shadow.add_deleted_edge(src, make_shadow_edge(2, 100, 1, 5), Direction::Outgoing);
152
153        // At v3 the edge should be visible
154        let result = shadow.get_entries_at_version(src, 1, Direction::Outgoing, 3);
155        assert_eq!(result.len(), 1);
156        assert_eq!(result[0], (Vid::new(2), Eid::new(100)));
157
158        // At v5 the edge is deleted
159        let result = shadow.get_entries_at_version(src, 1, Direction::Outgoing, 5);
160        assert!(result.is_empty());
161
162        // At v0 the edge doesn't exist yet
163        let result = shadow.get_entries_at_version(src, 1, Direction::Outgoing, 0);
164        assert!(result.is_empty());
165    }
166
167    #[test]
168    fn test_gc_removes_old_entries() {
169        let shadow = ShadowCsr::new();
170        let src = Vid::new(1);
171
172        shadow.add_deleted_edge(src, make_shadow_edge(2, 100, 1, 3), Direction::Outgoing);
173        shadow.add_deleted_edge(src, make_shadow_edge(3, 101, 2, 10), Direction::Outgoing);
174
175        // GC with oldest snapshot at v5 — first entry (deleted_version=3) gets removed
176        shadow.gc(5);
177
178        let entries = shadow.get_entries(src, 1, Direction::Outgoing);
179        assert_eq!(entries.len(), 1);
180        assert_eq!(entries[0].eid, Eid::new(101));
181    }
182
183    #[test]
184    fn test_empty_shadow() {
185        let shadow = ShadowCsr::new();
186        let result = shadow.get_entries_at_version(Vid::new(0), 1, Direction::Outgoing, 5);
187        assert!(result.is_empty());
188    }
189
190    #[test]
191    fn test_multiple_edges_same_vertex() {
192        let shadow = ShadowCsr::new();
193        let src = Vid::new(1);
194
195        shadow.add_deleted_edge(src, make_shadow_edge(2, 100, 1, 5), Direction::Outgoing);
196        shadow.add_deleted_edge(src, make_shadow_edge(3, 101, 2, 8), Direction::Outgoing);
197
198        // At v4: both alive
199        let result = shadow.get_entries_at_version(src, 1, Direction::Outgoing, 4);
200        assert_eq!(result.len(), 2);
201
202        // At v6: only second alive
203        let result = shadow.get_entries_at_version(src, 1, Direction::Outgoing, 6);
204        assert_eq!(result.len(), 1);
205        assert_eq!(result[0].1, Eid::new(101));
206    }
207}