liminal/durability/dedup/
sweep.rs1use std::collections::HashMap;
2use std::time::Duration;
3
4use crate::durability::{DurabilityError, StoredEntry};
5
6use super::codec::DedupRecord;
7use super::{DedupCache, DedupEntry};
8
/// Slack, in milliseconds, that `is_expired` adds on top of the TTL before an
/// entry counts as expired. Judging by the name, it tolerates clock skew
/// between whoever stamped the entry and the clock supplying `now_millis`.
const CLOCK_SKEW_GRACE_MILLIS: u64 = 1000;
10
11#[derive(Clone, Copy, Debug, PartialEq, Eq)]
13pub struct DedupSweepReport {
14 scanned: usize,
15 expired: usize,
16 retained: usize,
17}
18
19impl DedupSweepReport {
20 #[must_use]
22 pub const fn scanned_entries(self) -> usize {
23 self.scanned
24 }
25
26 #[must_use]
28 pub const fn expired_entries(self) -> usize {
29 self.expired
30 }
31
32 #[must_use]
34 pub const fn retained_entries(self) -> usize {
35 self.retained
36 }
37}
38
39#[derive(Clone, Debug)]
41pub struct DedupSweeper {
42 cache: DedupCache,
43 ttl: Duration,
44 sweep_interval: Duration,
45}
46
47impl DedupSweeper {
48 #[must_use]
50 pub const fn new(cache: DedupCache, ttl: Duration, sweep_interval: Duration) -> Self {
51 Self {
52 cache,
53 ttl,
54 sweep_interval,
55 }
56 }
57
58 #[must_use]
60 pub const fn ttl(&self) -> Duration {
61 self.ttl
62 }
63
64 #[must_use]
66 pub const fn sweep_interval(&self) -> Duration {
67 self.sweep_interval
68 }
69
70 pub async fn sweep_once(&self, now_millis: u64) -> Result<DedupSweepReport, DurabilityError> {
76 let scanned = self.cache.store.scan(&self.cache.scan_prefix()).await?;
77 let scanned_entries = scanned.len();
78 let latest = latest_records_by_key(scanned)?;
79 let ttl_millis = duration_millis_saturating(self.ttl);
80 let mut expired_entries = 0;
81 let mut retained_entries = 0;
82
83 for candidate in latest.into_values() {
84 let DedupRecord::Active(entry) = candidate.record else {
85 continue;
86 };
87 if is_expired(entry.timestamp_millis(), now_millis, ttl_millis) {
88 self.tombstone(&entry, candidate.sequence).await?;
89 expired_entries += 1;
90 } else {
91 retained_entries += 1;
92 }
93 }
94
95 Ok(DedupSweepReport {
96 scanned: scanned_entries,
97 expired: expired_entries,
98 retained: retained_entries,
99 })
100 }
101
102 async fn tombstone(&self, entry: &DedupEntry, sequence: u64) -> Result<(), DurabilityError> {
103 let stream_key = self.cache.stream_key_for(entry.idempotency_key());
104 let expected_seq = sequence.checked_add(1).ok_or_else(|| {
105 DurabilityError::ConfigError("dedup sweep sequence overflow".to_owned())
106 })?;
107 let tombstone =
108 DedupRecord::tombstone(entry.idempotency_key().to_owned(), entry.timestamp_millis());
109 self.cache
110 .store
111 .append(&stream_key, tombstone.serialize()?, expected_seq)
112 .await?;
113 Ok(())
114 }
115}
116
117struct SweepCandidate {
118 record: DedupRecord,
119 sequence: u64,
120}
121
122impl SweepCandidate {
123 const fn new(record: DedupRecord, sequence: u64) -> Self {
124 Self { record, sequence }
125 }
126}
127
128fn latest_records_by_key(
129 entries: Vec<StoredEntry>,
130) -> Result<HashMap<String, SweepCandidate>, DurabilityError> {
131 let mut latest: HashMap<String, SweepCandidate> = HashMap::new();
132 for stored in entries {
133 let record = DedupRecord::deserialize(&stored.payload)?;
134 let key = record.idempotency_key().to_owned();
135 match latest.entry(key) {
136 std::collections::hash_map::Entry::Occupied(mut existing) => {
137 if stored.sequence >= existing.get().sequence {
138 existing.insert(SweepCandidate::new(record, stored.sequence));
139 }
140 }
141 std::collections::hash_map::Entry::Vacant(vacant) => {
142 vacant.insert(SweepCandidate::new(record, stored.sequence));
143 }
144 }
145 }
146 Ok(latest)
147}
148
149const fn is_expired(timestamp_millis: u64, now_millis: u64, ttl_millis: u64) -> bool {
150 let expiry = timestamp_millis
151 .saturating_add(ttl_millis)
152 .saturating_add(CLOCK_SKEW_GRACE_MILLIS);
153 now_millis > expiry
154}
155
/// Converts `duration` to whole milliseconds, discarding sub-millisecond
/// remainder. Durations too long to fit in a `u64` clamp to `u64::MAX`.
fn duration_millis_saturating(duration: Duration) -> u64 {
    let millis = duration.as_millis();
    match u64::try_from(millis) {
        Ok(fits) => fits,
        Err(_) => u64::MAX,
    }
}