1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
// SPDX-License-Identifier: BUSL-1.1
//! Audit-retention purge: drop *superseded* edge versions older than the
//! system-time cutoff.
//!
//! Live (latest-per-base) versions and any version newer than the cutoff are
//! preserved. The last surviving version of a base is never deleted, even if
//! it is older than the cutoff — "audit retain" reclaims only history, not
//! the current state.
use super::keys::{parse_versioned_edge_key, versioned_edge_key};
use crate::engine::graph::edge_store::store::{EDGES, EdgeStore, REVERSE_EDGES, redb_err};
use nodedb_types::TenantId;
use nodedb_types::temporal::ms_to_ordinal_upper;
impl EdgeStore {
    /// Purge superseded versions for `(tid, collection)` whose
    /// `system_from` ordinal is strictly less than
    /// `ms_to_ordinal_upper(cutoff_system_ms)`.
    ///
    /// Preserves:
    /// - Every version whose `system_from >= cutoff_ordinal`.
    /// - The latest-per-base version (tombstones / GDPR markers included)
    ///   even if it is older than the cutoff — otherwise the base would be
    ///   silently resurrected on a subsequent scan.
    ///
    /// Candidates are collected in a read transaction (pass 1), then removed
    /// from both `EDGES` and `REVERSE_EDGES` inside a single write transaction
    /// (pass 2) so the two indexes never diverge mid-purge.
    ///
    /// Returns the number of `EDGES` rows the write transaction actually
    /// removed; a candidate whose row is already gone by then is not counted.
    /// Every candidate also has its `REVERSE_EDGES` key removed, but those
    /// removals are not counted separately. Returns `0` without opening a
    /// write transaction when there is nothing to purge.
    ///
    /// # Errors
    /// Returns an error if opening a transaction or table, scanning, removing
    /// a row, or committing fails, or if a candidate's versioned key cannot be
    /// rebuilt.
    pub fn purge_superseded_versions(
        &self,
        tid: TenantId,
        collection: &str,
        cutoff_system_ms: i64,
    ) -> crate::Result<usize> {
        let cutoff_ordinal = ms_to_ordinal_upper(cutoff_system_ms);
        let t = tid.as_u64();
        // Pass 1: scan read-only, group by base, collect delete candidates.
        let victims = self.collect_purge_victims(t, collection, cutoff_ordinal)?;
        if victims.is_empty() {
            return Ok(0);
        }
        // NOTE(review): pass 1 and pass 2 run in separate transactions. If
        // another writer can delete the newest version of a base in between,
        // a collected candidate could become that base's last version and
        // still be removed here. Confirm that no such delete path exists, or
        // move the scan into the write transaction.
        //
        // Pass 2: delete forward + reverse inside one write txn.
        let write_txn = self
            .db
            .begin_write()
            .map_err(|e| redb_err("begin_write purge", e))?;
        let removed = {
            let mut edges = write_txn
                .open_table(EDGES)
                .map_err(|e| redb_err("open edges purge", e))?;
            let mut rev = write_txn
                .open_table(REVERSE_EDGES)
                .map_err(|e| redb_err("open reverse purge", e))?;
            let mut removed = 0usize;
            for v in &victims {
                // Count only rows that were really present, so the return
                // value matches what this transaction deleted.
                let existed = edges
                    .remove((t, v.fwd_key.as_str()))
                    .map_err(|e| redb_err("remove edge", e))?
                    .is_some();
                if existed {
                    removed += 1;
                }
                rev.remove((t, v.rev_key.as_str()))
                    .map_err(|e| redb_err("remove reverse", e))?;
            }
            removed
        };
        write_txn
            .commit()
            .map_err(|e| redb_err("commit purge", e))?;
        Ok(removed)
    }

    /// Pass 1 of [`Self::purge_superseded_versions`]: scan every `EDGES` row of
    /// tenant `t` and return the versions of `collection` that are both
    /// superseded by a newer version of the same base and strictly older
    /// than `cutoff_ordinal`.
    ///
    /// Rows that do not parse as versioned edge keys are skipped.
    fn collect_purge_victims(
        &self,
        t: u64,
        collection: &str,
        cutoff_ordinal: i64,
    ) -> crate::Result<Vec<Victim>> {
        let read_txn = self
            .db
            .begin_read()
            .map_err(|e| redb_err("begin_read purge", e))?;
        let edges = read_txn
            .open_table(EDGES)
            .map_err(|e| redb_err("open edges read purge", e))?;
        // Every key of tenant `t`, whatever the collection. `checked_add`
        // keeps the largest tenant id (`u64::MAX`) from overflowing the
        // exclusive upper bound.
        let low = std::ops::Bound::Included((t, ""));
        let high = match t.checked_add(1) {
            Some(next) => std::ops::Bound::Excluded((next, "")),
            None => std::ops::Bound::Unbounded,
        };
        let range = edges
            .range((low, high))
            .map_err(|e| redb_err("range purge", e))?;
        // Versioned keys sort chronologically within a base (ascending
        // system_from). This assumes the key encoding makes the base a
        // strict lex-prefix, so all versions of a base are adjacent.
        //
        // A single pass that holds only the newest version seen so far is
        // then enough:
        // - If the next row continues the same base, the held version is
        //   provably superseded. It becomes a victim when it is below the
        //   cutoff.
        // - If the base changes, the held version was the latest for its
        //   base and is kept.
        let mut pending: Option<PendingVersion> = None;
        let mut victims: Vec<Victim> = Vec::new();
        for entry in range {
            let (k, _v) = entry.map_err(|e| redb_err("entry purge", e))?;
            let (_tid, key_str) = k.value();
            let Some((parsed_coll, src, label, dst, system_from)) =
                parse_versioned_edge_key(key_str)
            else {
                // Not a versioned edge key: skip it. The held version is
                // left untouched.
                continue;
            };
            if parsed_coll != collection {
                // Leaving our collection. Whatever was held is the latest
                // for its base and is kept, so just forget it.
                pending = None;
                continue;
            }
            // Compare borrowed key parts against the held base before
            // allocating. A run of versions for one base then builds its
            // `Base` only once.
            let held = pending
                .as_mut()
                .filter(|p| p.base.src == src && p.base.label == label && p.base.dst == dst);
            if let Some(prev) = held {
                // `prev` is older than the current row (ascending order), so
                // it is not the latest for this base.
                if prev.system_from < cutoff_ordinal {
                    victims.push(Victim::new(collection, &prev.base, prev.system_from)?);
                }
                prev.system_from = system_from;
            } else {
                // First row of our collection, or a base boundary: the
                // previously held version (if any) is kept.
                pending = Some(PendingVersion {
                    base: Base {
                        src: src.to_string(),
                        label: label.to_string(),
                        dst: dst.to_string(),
                    },
                    system_from,
                });
            }
        }
        // Whatever is still held is the latest for its base and is kept.
        Ok(victims)
    }
}
/// Version-independent identity of an edge, `(src, label, dst)`, within the
/// collection being purged.
///
/// The fields are owned rather than borrowed from the scanned key because the
/// purge scan keeps a `Base` across loop iterations.
#[derive(Debug, PartialEq, Eq)]
struct Base {
    /// Source node, as parsed from the versioned key.
    src: String,
    /// Edge label.
    label: String,
    /// Destination node.
    dst: String,
}
/// Newest version seen so far for the base currently being scanned.
///
/// It only becomes a purge candidate once a newer version of the same base
/// is encountered.
struct PendingVersion {
    /// Base the held version belongs to.
    base: Base,
    /// `system_from` ordinal of the held version.
    system_from: i64,
}
/// A version selected for deletion: the pair of index keys removed in the
/// purge write transaction.
struct Victim {
    /// Key into `EDGES`, built from `(collection, src, label, dst, system_from)`.
    fwd_key: String,
    /// Key into `REVERSE_EDGES`, built the same way with `src` and `dst` swapped.
    rev_key: String,
}
impl Victim {
fn new(collection: &str, base: &Base, system_from: i64) -> crate::Result<Self> {
let fwd_key =
versioned_edge_key(collection, &base.src, &base.label, &base.dst, system_from)?;
let rev_key =
versioned_edge_key(collection, &base.dst, &base.label, &base.src, system_from)?;
Ok(Self { fwd_key, rev_key })
}
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::engine::graph::edge_store::temporal::keys::EdgeRef;

    /// Opens an `EdgeStore` in a fresh temporary directory. The `TempDir` is
    /// returned alongside the store so the backing file outlives the test body.
    fn fresh_store() -> (tempfile::TempDir, EdgeStore) {
        let dir = tempfile::tempdir().unwrap();
        let path = dir.path().join("graph.redb");
        let store = EdgeStore::open(&path).unwrap();
        (dir, store)
    }

    /// Writes one version of `edge` with a fixed payload at `system_from`.
    fn put(store: &EdgeStore, edge: EdgeRef<'_>, system_from: i64) {
        store
            .put_edge_versioned(edge, b"v1", system_from, 0, i64::MAX)
            .unwrap();
    }

    /// Writes one version of `edge` per millisecond timestamp in `ms`,
    /// converted with `ms_to_ordinal_upper`.
    fn put_versions(store: &EdgeStore, edge: EdgeRef<'_>, ms: &[i64]) {
        for &m in ms {
            put(store, edge, ms_to_ordinal_upper(m));
        }
    }

    /// Counts the `EDGES` rows of `(tid, collection)` that parse as
    /// versioned edge keys.
    fn count_edges(store: &EdgeStore, tid: TenantId, collection: &str) -> usize {
        let read_txn = store.db.begin_read().unwrap();
        let edges = read_txn.open_table(EDGES).unwrap();
        let low = (tid.as_u64(), "");
        let high = (tid.as_u64() + 1, "");
        edges
            .range(low..high)
            .unwrap()
            .filter_map(|r| {
                let (k, _) = r.ok()?;
                let (_t, s) = k.value();
                let (c, ..) = parse_versioned_edge_key(s)?;
                (c == collection).then_some(())
            })
            .count()
    }

    #[test]
    fn purge_drops_superseded_versions_below_cutoff() {
        let (_dir, store) = fresh_store();
        let tid = TenantId::new(1);
        let edge = EdgeRef::new(tid, "c", "a", "L", "b");
        // Three versions at ms = 100, 200, 300 with a cutoff at 150:
        // - v@100 is below the cutoff and superseded, so it is dropped.
        // - v@200 is superseded but not below the cutoff, so it is kept.
        // - v@300 is the latest, so it is always kept.
        put_versions(&store, edge, &[100, 200, 300]);
        assert_eq!(count_edges(&store, tid, "c"), 3);
        let purged = store.purge_superseded_versions(tid, "c", 150).unwrap();
        assert_eq!(purged, 1, "only v@100 is below cutoff AND superseded");
        assert_eq!(count_edges(&store, tid, "c"), 2);
    }

    #[test]
    fn purge_drops_every_superseded_version_below_cutoff() {
        let (_dir, store) = fresh_store();
        let tid = TenantId::new(1);
        let edge = EdgeRef::new(tid, "c", "a", "L", "b");
        // Cutoff at 250: v@100 and v@200 are both superseded and below the
        // cutoff. v@300 is the latest and survives.
        put_versions(&store, edge, &[100, 200, 300]);
        let purged = store.purge_superseded_versions(tid, "c", 250).unwrap();
        assert_eq!(purged, 2);
        assert_eq!(count_edges(&store, tid, "c"), 1);
    }

    #[test]
    fn purge_keeps_version_exactly_at_cutoff() {
        let (_dir, store) = fresh_store();
        let tid = TenantId::new(1);
        let edge = EdgeRef::new(tid, "c", "a", "L", "b");
        // The comparison is strict (`system_from < cutoff`), so v@200
        // survives a cutoff of exactly 200.
        put_versions(&store, edge, &[100, 200, 300]);
        let purged = store.purge_superseded_versions(tid, "c", 200).unwrap();
        assert_eq!(purged, 1);
        assert_eq!(count_edges(&store, tid, "c"), 2);
    }

    #[test]
    fn purge_never_deletes_latest_version() {
        let (_dir, store) = fresh_store();
        let tid = TenantId::new(1);
        let edge = EdgeRef::new(tid, "c", "a", "L", "b");
        put(&store, edge, ms_to_ordinal_upper(100));
        // Cutoff way in the future. Only one version: keep it.
        let purged = store.purge_superseded_versions(tid, "c", 10_000).unwrap();
        assert_eq!(purged, 0);
        assert_eq!(count_edges(&store, tid, "c"), 1);
    }

    #[test]
    fn purge_groups_versions_per_base() {
        let (_dir, store) = fresh_store();
        let tid = TenantId::new(1);
        let ab = EdgeRef::new(tid, "c", "a", "L", "b");
        let ac = EdgeRef::new(tid, "c", "a", "L", "c");
        let single = EdgeRef::new(tid, "c", "x", "L", "y");
        put_versions(&store, ab, &[100, 200, 300]);
        put_versions(&store, ac, &[100, 200, 300]);
        put_versions(&store, single, &[100]);
        // Each three-version base loses v@100 and v@200. The single-version
        // base is its own latest and is kept despite being below the cutoff.
        let purged = store.purge_superseded_versions(tid, "c", 250).unwrap();
        assert_eq!(purged, 4);
        assert_eq!(count_edges(&store, tid, "c"), 3);
    }

    #[test]
    fn purge_on_empty_collection_returns_zero() {
        let (_dir, store) = fresh_store();
        let tid = TenantId::new(1);
        let purged = store.purge_superseded_versions(tid, "c", 10_000).unwrap();
        assert_eq!(purged, 0);
        assert_eq!(count_edges(&store, tid, "c"), 0);
    }

    #[test]
    fn purge_is_per_tenant_collection_scoped() {
        let (_dir, store) = fresh_store();
        let tid = TenantId::new(1);
        let other_tenant = TenantId::new(2);
        let e1 = EdgeRef::new(tid, "c", "a", "L", "b");
        let e_other_tenant = EdgeRef::new(other_tenant, "c", "a", "L", "b");
        let e_other_coll = EdgeRef::new(tid, "d", "a", "L", "b");
        for ms in [100i64, 200, 300] {
            put(&store, e1, ms_to_ordinal_upper(ms));
            put(&store, e_other_tenant, ms_to_ordinal_upper(ms));
            put(&store, e_other_coll, ms_to_ordinal_upper(ms));
        }
        // Purge only (tid=1, "c") below 150: just its v@100 goes.
        let purged = store.purge_superseded_versions(tid, "c", 150).unwrap();
        assert_eq!(purged, 1);
        assert_eq!(count_edges(&store, tid, "c"), 2);
        assert_eq!(count_edges(&store, tid, "d"), 3);
        assert_eq!(count_edges(&store, other_tenant, "c"), 3);
    }

    #[test]
    fn purge_is_idempotent() {
        let (_dir, store) = fresh_store();
        let tid = TenantId::new(1);
        let edge = EdgeRef::new(tid, "c", "a", "L", "b");
        put_versions(&store, edge, &[100, 200, 300]);
        let first = store.purge_superseded_versions(tid, "c", 150).unwrap();
        let second = store.purge_superseded_versions(tid, "c", 150).unwrap();
        assert_eq!(first, 1);
        assert_eq!(second, 0);
    }
}