1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
//! Issue #584 — DeclarativeRetention slice 12.
//!
//! Background sweeper that physically removes rows whose timestamp
//! column has fallen beyond the collection's retention window. The
//! lazy-on-scan filter from slice 11 (`retention_filter`) hides
//! expired rows from reads the moment a policy is set; this slice
//! complements that filter with a low-priority background task that
//! reclaims storage in bounded batches.
//!
//! The sweeper executes deletes through the standard
//! `RedDBRuntime::execute_query` chokepoint (`DELETE FROM <collection>
//! WHERE id IN (...)`) so that WAL participation, snapshot guards and
//! event emission ride on the same single code path as user-issued
//! DELETEs — replicas replay sweeper deletes deterministically with
//! no special handling on the replication side.
//!
//! Per-collection runtime state (`last_sweep_at_ms`, `rows_swept_total`,
//! `last_pending_estimate`) lives on `RuntimeInner::retention_sweeper`
//! and is surfaced via the three extra columns on `red.retention`.
//! State is in-memory only — counters reset across restart, mirroring
//! the existing materialized-view scheduler state.
use std::collections::HashMap;
/// Default per-tick batch size for the background sweeper. Acceptance
/// criterion: `default batch size red.retention.sweeper_batch (e.g.
/// 1000)`. Kept low enough that one tick is bounded work — the
/// sweeper never holds locks long enough to block the write path.
pub(crate) const DEFAULT_SWEEPER_BATCH: usize = 1_000;
/// Per-collection sweeper state surfaced on `red.retention`.
#[derive(Debug, Clone, Default)]
pub(crate) struct SweeperState {
/// Wall-clock millis of the last sweep attempt — `0` until the
/// sweeper first ticked the collection.
pub last_sweep_at_ms: u64,
/// Cumulative rows reclaimed since boot.
pub rows_swept_total: u64,
/// Number of rows the last tick observed as expired *but not yet
/// swept* — i.e. either still inside the batch or queued for the
/// next tick. Surfaced as
/// `red.retention.current_rows_pending_sweep_estimate`.
pub last_pending_estimate: u64,
}
/// Per-runtime sweeper state map. Keyed by collection name.
#[derive(Debug, Default)]
pub(crate) struct RetentionSweeperState {
states: HashMap<String, SweeperState>,
}
impl RetentionSweeperState {
pub(crate) fn new() -> Self {
Self {
states: HashMap::new(),
}
}
/// Snapshot of every tracked collection's sweeper state.
pub(crate) fn snapshot(&self) -> Vec<(String, SweeperState)> {
self.states
.iter()
.map(|(name, state)| (name.clone(), state.clone()))
.collect()
}
/// Lookup the sweeper state for `collection`. Returns a fresh
/// `SweeperState::default()` when the collection has never been
/// ticked — keeps the call-site free of `Option` plumbing.
pub(crate) fn get(&self, collection: &str) -> SweeperState {
self.states
.get(collection)
.cloned()
.unwrap_or_default()
}
/// Record the outcome of a sweeper tick.
pub(crate) fn record_tick(
&mut self,
collection: &str,
rows_swept: u64,
pending_estimate: u64,
at_unix_ms: u64,
) {
let entry = self.states.entry(collection.to_string()).or_default();
entry.last_sweep_at_ms = at_unix_ms;
entry.rows_swept_total = entry.rows_swept_total.saturating_add(rows_swept);
entry.last_pending_estimate = pending_estimate;
}
/// Drop bookkeeping for a collection (DROP TABLE / DROP COLLECTION).
pub(crate) fn forget(&mut self, collection: &str) {
self.states.remove(collection);
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn record_tick_accumulates_rows_swept_total() {
let mut state = RetentionSweeperState::new();
state.record_tick("events", 100, 50, 1_000);
state.record_tick("events", 50, 0, 2_000);
let s = state.get("events");
assert_eq!(s.rows_swept_total, 150);
assert_eq!(s.last_pending_estimate, 0);
assert_eq!(s.last_sweep_at_ms, 2_000);
}
#[test]
fn get_unknown_collection_returns_zeroed_state() {
let state = RetentionSweeperState::new();
let s = state.get("missing");
assert_eq!(s.last_sweep_at_ms, 0);
assert_eq!(s.rows_swept_total, 0);
assert_eq!(s.last_pending_estimate, 0);
}
#[test]
fn forget_removes_bookkeeping() {
let mut state = RetentionSweeperState::new();
state.record_tick("events", 10, 0, 1);
state.forget("events");
assert_eq!(state.get("events").rows_swept_total, 0);
}
}