Skip to main content

kevy_store/
expire.rs

1//! Active TTL reaper — Redis's `activeExpireCycle`, adapted to the
2//! thread-per-core / single-shard `Store`.
3//!
4//! Lazy expiry (in `live_entry[_mut]`) still handles the common case where
5//! the next access to a TTL'd key removes it. The active reaper exists for
6//! the harder case: a key has TTL but is never touched again, so without an
7//! explicit sweep it would sit in the map until the next FLUSH or eviction.
8//!
9//! Entry point: [`Store::tick_expire`]. The shard runtime calls it at the
10//! configured `[expiry].hz` cadence (default 10 Hz / every 100 ms);
11//! embedded users without a runtime call it themselves from whatever event
12//! loop they have (mandatory for WASM, which has no threads).
13
14use crate::{Store, now_ns};
15
/// Outcome of one [`Store::tick_expire`] call: how much was looked at and
/// how much was reaped. Surfaced for tests, INFO keyspace, and (eventually)
/// Wave 2 task #4's crash-safe verifier.
///
/// `Default` is the all-zero value, which is also what a short-circuited
/// tick reports.
#[derive(Clone, Copy, Debug, Default, Eq, PartialEq)]
pub struct ExpireStats {
    /// Number of TTL-bearing keys inspected, summed over every round of the
    /// tick.
    pub sampled: u32,
    /// Subset of `sampled` whose deadline had already passed and which were
    /// therefore removed.
    pub expired: u32,
    /// Number of sampling rounds that ran before the tick stopped — because
    /// `max_rounds` was reached, the in-batch expire-rate fell below the
    /// continuation threshold, or several rounds in a row sampled nothing.
    pub rounds: u32,
}
28
/// Continuation threshold, in percent: when a round's in-batch expire-rate
/// is at or above this value, run another round (the keyspace is
/// "expiry-heavy"); a round below it ends the tick. Mirrors Redis's 25%
/// from `activeExpireCycle`.
const EXPIRE_RATE_CONTINUATION: u32 = 25;
33
34/// Sample a single round of up to `samples` TTL-bearing keys starting at a
35/// random bucket; remove any that are past their deadline. Returns
36/// `(sampled, expired)` counts for this round. Walking is `O(visited)` —
37/// bounded by `2 * map.capacity()` to keep a sparsely-populated table from
38/// spinning the inner scan forever.
39pub(crate) fn sample_round(store: &mut Store, samples: usize, now: u64) -> (u32, u32) {
40    let cap = store.map.capacity();
41    if cap == 0 || store.map.is_empty() {
42        return (0, 0);
43    }
44    // Random start derived from the access-ordinal clock; Fibonacci-hash
45    // multiplier shifts the sampling window every call so we don't re-visit
46    // the same bucket range twice in a row. (No-quality PRNG needed for
47    // sampling, just want to spread starting positions.)
48    store.clock_counter = store.clock_counter.wrapping_add(1);
49    let start = (store
50        .clock_counter
51        .wrapping_mul(0x9E37_79B9_7F4A_7C15) as usize)
52        % cap;
53    let mut victims: Vec<Vec<u8>> = Vec::with_capacity(samples);
54    let mut sampled = 0u32;
55    // Single-pass walk from `start`, bounded in *visited entries*, not just
56    // in TTL-bearing samples: without the bound, a keyspace with few (or
57    // zero) TTL'd keys made every round walk to the end of the table
58    // looking for them — measured at 6 % of server CPU on a 300k-key
59    // TTL-free shard (the pinned 8sh profile, 2026-06-10), for a reaper
60    // with nothing to reap. With it, a TTL-free round costs O(samples)
61    // buckets; sparse-TTL keyspaces sample fewer keys per round and rely
62    // on the rotating random start (plus lazy expiry) for coverage —
63    // the same time-boxing trade Redis's activeExpireCycle makes.
64    let visit_cap = samples.saturating_mul(8);
65    let mut visited = 0usize;
66    {
67        for (k, e) in store.map.iter_from_bucket(start) {
68            visited += 1;
69            if sampled as usize >= samples || visited > visit_cap {
70                break;
71            }
72            let Some(deadline_ns) = e.expire_at_ns else {
73                continue;
74            };
75            sampled += 1;
76            if deadline_ns.get() <= now {
77                victims.push(k.to_vec());
78            }
79        }
80    }
81    let expired = victims.len() as u32;
82    for k in &victims {
83        store.remove_entry(k);
84    }
85    // Active-expire-driven removals are still expirations from the shard's
86    // perspective — surface them under the same counter `MEMORY STATS` /
87    // `INFO memory` already exposes.
88    if expired > 0 {
89        store.expired_keys_total = store
90            .expired_keys_total
91            .saturating_add(u64::from(expired));
92    }
93    let _ = sampled; // silence unused warning if all returned early
94    (sampled, expired)
95}
96
97impl Store {
98    /// Run up to `max_rounds` of active-expiry sampling against this shard.
99    ///
100    /// Per round: sample `samples_per_round` TTL-bearing keys at random and
101    /// drop any whose deadline has passed. Stop early as soon as the
102    /// in-batch expire-rate drops below 25 % (Redis's `activeExpireCycle`
103    /// continuation threshold) — that's the signal the keyspace doesn't
104    /// have a "thick band" of expired keys to clean up right now.
105    ///
106    /// Cost when there are no TTL-bearing keys at all: one map-emptiness
107    /// check + a single bucket-iter probe per round. Designed so the active
108    /// reaper is never a tax on TTL-free workloads.
109    pub fn tick_expire(&mut self, samples_per_round: usize, max_rounds: u32) -> ExpireStats {
110        // Refresh the coarse cached clock every tick (the read path's lazy
111        // expiry compares against it) — even when there's nothing to reap.
112        self.refresh_clock();
113        // A13 (2026-06-20): skip the sampling loop entirely when no key
114        // carries a TTL. `expires` is the O(1)-maintained count of
115        // TTL-bearing keys (incremented/decremented in `adjust_expires`).
116        // The standard redis-benchmark workload sets no TTLs, so
117        // `expires == 0` is the common case — saving up to `max_rounds *
118        // samples_per_round` probe lookups per tick (~256 at the default
119        // 16×16 budget). For TTL-bearing workloads (cache patterns) this
120        // adds one comparison; the bigger "splay / skip-list" reaper
121        // structure that the task entry mentioned would only beat the
122        // current random-sample algorithm at very high TTL fractions,
123        // and is left as a future workload-driven follow-up.
124        if samples_per_round == 0
125            || max_rounds == 0
126            || self.map.is_empty()
127            || self.expires == 0
128        {
129            return ExpireStats::default();
130        }
131        let now = now_ns();
132        let mut total_sampled = 0u32;
133        let mut total_expired = 0u32;
134        let mut rounds = 0u32;
135        // Single-pass sample_round can return sampled=0 when the random
136        // start lands in an empty bucket region (sparse tables / unlucky
137        // starts). Allow 3 consecutive zero-sample rounds before declaring
138        // the keyspace TTL-free this tick, so a small table doesn't miss
139        // its expired keys for several ticks.
140        let mut consecutive_zero = 0u32;
141        for _ in 0..max_rounds {
142            let (sampled, expired) = sample_round(self, samples_per_round, now);
143            rounds += 1;
144            total_sampled = total_sampled.saturating_add(sampled);
145            total_expired = total_expired.saturating_add(expired);
146            if sampled == 0 {
147                consecutive_zero += 1;
148                if consecutive_zero >= 3 {
149                    break;
150                }
151                continue;
152            }
153            consecutive_zero = 0;
154            // Continuation gate: only push another round if THIS round was
155            // expiry-heavy. A round that finds nothing expired-enough exits.
156            if expired * 100 < sampled * EXPIRE_RATE_CONTINUATION {
157                break;
158            }
159        }
160        ExpireStats {
161            sampled: total_sampled,
162            expired: total_expired,
163            rounds,
164        }
165    }
166
167    /// Total keys expired (by lazy reap OR active reaper). Surfaced via
168    /// `INFO keyspace` and `MEMORY STATS` once those grow the field.
169    #[inline]
170    pub fn expired_keys_total(&self) -> u64 {
171        self.expired_keys_total
172    }
173}
174
/// Unit tests for the active reaper. Tests that depend on real deadlines
/// passing use a bounded sleep-and-tick loop rather than a single fixed
/// sleep, because a single tick only samples a window of the table and the
/// monotonic clock may lag wall-clock sleeps on starved CI machines.
#[cfg(test)]
mod tests {
    use super::*;
    use std::time::Duration;

    #[test]
    fn tick_expire_drops_past_deadline() {
        let mut s = Store::new();
        s.set(b"k1", b"v".to_vec(), Some(Duration::from_millis(1)), false, false);
        s.set(b"k2", b"v".to_vec(), Some(Duration::from_millis(1)), false, false);
        s.set(b"perm", b"v".to_vec(), None, false, false);
        // Two flake sources, both observed on virtualized CI runners:
        // a single tick may legitimately miss a key (the sampling walk is
        // time-boxed with a rotating start — the a635d65 trade; coverage
        // comes from repeated ticks), and on a starved macOS VM the
        // monotonic clock (`Instant`, mach_absolute_time) can advance far
        // slower than the wall-clock `sleep`, so a 1 ms deadline may not
        // have passed yet. Sleep-and-tick until converged (bounded), like
        // the production reaper drives it — the eventual contract.
        for _ in 0..500 {
            s.tick_expire(20, 16);
            if s.dbsize() == 1 {
                break;
            }
            std::thread::sleep(Duration::from_millis(10));
        }
        assert_eq!(s.dbsize(), 1, "perm survives, both TTL'd keys reaped");
        assert!(s.expired_keys_total() >= 2);
    }

    #[test]
    fn tick_expire_no_op_on_fresh_ttls() {
        let mut s = Store::new();
        s.set(b"k1", b"v".to_vec(), Some(Duration::from_hours(1)), false, false);
        s.set(b"k2", b"v".to_vec(), Some(Duration::from_hours(1)), false, false);
        let stats = s.tick_expire(20, 16);
        assert_eq!(stats.expired, 0, "no fresh TTL should expire");
        // sampled may be 0..=2 depending on how many our walk hit
        assert_eq!(s.dbsize(), 2);
    }

    #[test]
    fn tick_expire_no_op_on_ttl_free_keyspace() {
        let mut s = Store::new();
        for i in 0..50 {
            s.set(format!("k{i}").as_bytes(), b"v".to_vec(), None, false, false);
        }
        let stats = s.tick_expire(20, 16);
        assert_eq!(stats.expired, 0);
        assert_eq!(stats.sampled, 0, "no TTL'd keys ⇒ nothing sampled");
        // With no TTL-bearing key, `tick_expire` short-circuits on
        // `expires == 0` before the sampling loop, so not even one round
        // runs on a TTL-free keyspace.
        assert_eq!(stats.rounds, 0, "TTL-free keyspace must skip sampling");
    }

    #[test]
    fn tick_expire_zero_args_short_circuit() {
        let mut s = Store::new();
        s.set(b"k", b"v".to_vec(), Some(Duration::from_millis(1)), false, false);
        std::thread::sleep(Duration::from_millis(5));
        assert_eq!(s.tick_expire(0, 16), ExpireStats::default());
        assert_eq!(s.tick_expire(20, 0), ExpireStats::default());
        // store still has the expired key (active reaper disabled).
        assert_eq!(s.dbsize(), 1);
    }

    #[test]
    fn tick_expire_loops_on_heavy_batch() {
        let mut s = Store::new();
        // 40 TTL'd keys (all expired) + 1 perm. A single tick samples from
        // a random bucket window, so we may need several ticks for full
        // coverage of a 40-key keyspace — that matches how `activeExpire`
        // converges in production (10 ticks/sec until everything's cleaned).
        for i in 0..40 {
            s.set(
                format!("k{i}").as_bytes(),
                b"v".to_vec(),
                Some(Duration::from_millis(1)),
                false,
                false,
            );
        }
        s.set(b"perm", b"v".to_vec(), None, false, false);
        // Sleep-and-tick until converged: on a starved CI VM the monotonic
        // clock can lag the wall-clock sleep, so a fixed pre-sleep + a
        // bounded dry tick loop under-counts (see
        // tick_expire_drops_past_deadline).
        let mut total_expired = 0u32;
        let mut any_round_ge_2 = false;
        for _ in 0..500 {
            let stats = s.tick_expire(20, 16);
            total_expired += stats.expired;
            if stats.rounds >= 2 {
                any_round_ge_2 = true;
            }
            if s.dbsize() == 1 {
                break;
            }
            std::thread::sleep(Duration::from_millis(10));
        }
        assert_eq!(total_expired, 40);
        assert!(any_round_ge_2, "at least one heavy-batch tick should loop");
        assert_eq!(s.dbsize(), 1);
    }
}