// liminal/durability/dedup/sweep.rs

use std::collections::HashMap;
use std::time::Duration;

use crate::durability::{DurabilityError, StoredEntry};

use super::codec::DedupRecord;
use super::{DedupCache, DedupEntry};

/// Extra slack, in milliseconds, added on top of the configured TTL before a dedup
/// entry is treated as expired by the sweep's expiry check.
///
/// Judging by the name, this absorbs clock skew between whoever stamped the entry and
/// the sweeper's notion of "now" — NOTE(review): confirm the intended skew bound.
const CLOCK_SKEW_GRACE_MILLIS: u64 = 1000;

/// Counters describing the outcome of one dedup TTL sweep pass.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct DedupSweepReport {
    /// Stored dedup events returned by the prefix scan, counting every stored version.
    scanned: usize,
    /// Logical cache entries that had a tombstone appended during the pass.
    expired: usize,
    /// Logical cache entries whose newest record is active and still within TTL.
    retained: usize,
}

impl DedupSweepReport {
    /// How many stored dedup events the sweep scanned.
    #[must_use]
    pub const fn scanned_entries(self) -> usize {
        let Self { scanned, .. } = self;
        scanned
    }

    /// How many logical cache entries the sweep tombstoned.
    #[must_use]
    pub const fn expired_entries(self) -> usize {
        let Self { expired, .. } = self;
        expired
    }

    /// How many newest-version active entries survived the sweep.
    #[must_use]
    pub const fn retained_entries(self) -> usize {
        let Self { retained, .. } = self;
        retained
    }
}

39/// Configurable TTL sweeper for haematite-backed dedup entries.
40#[derive(Clone, Debug)]
41pub struct DedupSweeper {
42    cache: DedupCache,
43    ttl: Duration,
44    sweep_interval: Duration,
45}
46
47impl DedupSweeper {
48    /// Creates a sweeper with caller-configured TTL and interval.
49    #[must_use]
50    pub const fn new(cache: DedupCache, ttl: Duration, sweep_interval: Duration) -> Self {
51        Self {
52            cache,
53            ttl,
54            sweep_interval,
55        }
56    }
57
58    /// Returns the configured dedup TTL.
59    #[must_use]
60    pub const fn ttl(&self) -> Duration {
61        self.ttl
62    }
63
64    /// Returns the configured sweep interval.
65    #[must_use]
66    pub const fn sweep_interval(&self) -> Duration {
67        self.sweep_interval
68    }
69
70    /// Runs one scan-based sweep at the supplied epoch millisecond timestamp.
71    ///
72    /// # Errors
73    ///
74    /// Propagates store scan/append errors and serialized-entry decode errors.
75    pub async fn sweep_once(&self, now_millis: u64) -> Result<DedupSweepReport, DurabilityError> {
76        let scanned = self.cache.store.scan(&self.cache.scan_prefix()).await?;
77        let scanned_entries = scanned.len();
78        let latest = latest_records_by_key(scanned)?;
79        let ttl_millis = duration_millis_saturating(self.ttl);
80        let mut expired_entries = 0;
81        let mut retained_entries = 0;
82
83        for candidate in latest.into_values() {
84            let DedupRecord::Active(entry) = candidate.record else {
85                continue;
86            };
87            if is_expired(entry.timestamp_millis(), now_millis, ttl_millis) {
88                self.tombstone(&entry, candidate.sequence).await?;
89                expired_entries += 1;
90            } else {
91                retained_entries += 1;
92            }
93        }
94
95        Ok(DedupSweepReport {
96            scanned: scanned_entries,
97            expired: expired_entries,
98            retained: retained_entries,
99        })
100    }
101
102    async fn tombstone(&self, entry: &DedupEntry, sequence: u64) -> Result<(), DurabilityError> {
103        let stream_key = self.cache.stream_key_for(entry.idempotency_key());
104        let expected_seq = sequence.checked_add(1).ok_or_else(|| {
105            DurabilityError::ConfigError("dedup sweep sequence overflow".to_owned())
106        })?;
107        let tombstone =
108            DedupRecord::tombstone(entry.idempotency_key().to_owned(), entry.timestamp_millis());
109        self.cache
110            .store
111            .append(&stream_key, tombstone.serialize()?, expected_seq)
112            .await?;
113        Ok(())
114    }
115}
116
117struct SweepCandidate {
118    record: DedupRecord,
119    sequence: u64,
120}
121
122impl SweepCandidate {
123    const fn new(record: DedupRecord, sequence: u64) -> Self {
124        Self { record, sequence }
125    }
126}
127
128fn latest_records_by_key(
129    entries: Vec<StoredEntry>,
130) -> Result<HashMap<String, SweepCandidate>, DurabilityError> {
131    let mut latest: HashMap<String, SweepCandidate> = HashMap::new();
132    for stored in entries {
133        let record = DedupRecord::deserialize(&stored.payload)?;
134        let key = record.idempotency_key().to_owned();
135        match latest.entry(key) {
136            std::collections::hash_map::Entry::Occupied(mut existing) => {
137                if stored.sequence >= existing.get().sequence {
138                    existing.insert(SweepCandidate::new(record, stored.sequence));
139                }
140            }
141            std::collections::hash_map::Entry::Vacant(vacant) => {
142                vacant.insert(SweepCandidate::new(record, stored.sequence));
143            }
144        }
145    }
146    Ok(latest)
147}
148
149const fn is_expired(timestamp_millis: u64, now_millis: u64, ttl_millis: u64) -> bool {
150    let expiry = timestamp_millis
151        .saturating_add(ttl_millis)
152        .saturating_add(CLOCK_SKEW_GRACE_MILLIS);
153    now_millis > expiry
154}
155
/// Converts `duration` to whole milliseconds, dropping any sub-millisecond remainder.
///
/// Durations too long to fit in a `u64` of milliseconds clamp to `u64::MAX`.
fn duration_millis_saturating(duration: Duration) -> u64 {
    let whole_millis = duration.as_millis();
    u64::try_from(whole_millis).unwrap_or(u64::MAX)
}