Skip to main content

kevy_store/
expire.rs

1//! Active TTL reaper — Redis's `activeExpireCycle`, adapted to the
2//! thread-per-core / single-shard `Store`.
3//!
4//! Lazy expiry (in `live_entry[_mut]`) still handles the common case where
5//! the next access to a TTL'd key removes it. The active reaper exists for
6//! the harder case: a key has TTL but is never touched again, so without an
7//! explicit sweep it would sit in the map until the next FLUSH or eviction.
8//!
9//! Entry point: [`Store::tick_expire`]. The shard runtime calls it at the
10//! configured `[expiry].hz` cadence (default 10 Hz / every 100 ms);
11//! embedded users without a runtime call it themselves from whatever event
12//! loop they have (mandatory for WASM, which has no threads).
13
14use crate::Store;
15use std::time::Instant;
16
/// Outcome of one [`Store::tick_expire`] call: how much was looked at and
/// how much was reclaimed. Exposed for tests, INFO keyspace, and
/// (eventually) Wave 2 task #4's crash-safe verifier.
///
/// All counters start at zero (`Default`), which is also what a tick that
/// short-circuits without sampling reports.
#[derive(Clone, Copy, Debug, Default, Eq, PartialEq)]
pub struct ExpireStats {
    /// Number of TTL-bearing keys inspected, summed over every round of
    /// the tick.
    pub sampled: u32,
    /// Subset of `sampled` whose deadline had already passed and which
    /// were therefore removed.
    pub expired: u32,
    /// Number of sampling rounds that ran before the tick stopped — because
    /// `max_rounds` was reached, the in-batch expire-rate fell below the
    /// continuation threshold, or several rounds in a row sampled nothing.
    pub rounds: u32,
}
29
/// Continuation threshold, expressed as a whole percentage: a round whose
/// in-batch expire-rate (`expired / sampled`) is at or above this value
/// triggers another round (the keyspace is "expiry-heavy"); a round below
/// it ends the tick. Mirrors Redis's 25% from `activeExpireCycle`.
const EXPIRE_RATE_CONTINUATION: u32 = 25;
34
35/// Sample a single round of up to `samples` TTL-bearing keys starting at a
36/// random bucket; remove any that are past their deadline. Returns
37/// `(sampled, expired)` counts for this round. Walking is `O(visited)` —
38/// bounded by `2 * map.capacity()` to keep a sparsely-populated table from
39/// spinning the inner scan forever.
40pub(crate) fn sample_round(
41    store: &mut Store,
42    samples: usize,
43    now: Instant,
44) -> (u32, u32) {
45    let cap = store.map.capacity();
46    if cap == 0 || store.map.is_empty() {
47        return (0, 0);
48    }
49    // Random start derived from the access-ordinal clock; Fibonacci-hash
50    // multiplier shifts the sampling window every call so we don't re-visit
51    // the same bucket range twice in a row. (No-quality PRNG needed for
52    // sampling, just want to spread starting positions.)
53    store.clock_counter = store.clock_counter.wrapping_add(1);
54    let start = (store
55        .clock_counter
56        .wrapping_mul(0x9E37_79B9_7F4A_7C15) as usize)
57        % cap;
58    let mut victims: Vec<Vec<u8>> = Vec::with_capacity(samples);
59    let mut sampled = 0u32;
60    // Single-pass walk from `start`, bounded in *visited entries*, not just
61    // in TTL-bearing samples: without the bound, a keyspace with few (or
62    // zero) TTL'd keys made every round walk to the end of the table
63    // looking for them — measured at 6 % of server CPU on a 300k-key
64    // TTL-free shard (the pinned 8sh profile, 2026-06-10), for a reaper
65    // with nothing to reap. With it, a TTL-free round costs O(samples)
66    // buckets; sparse-TTL keyspaces sample fewer keys per round and rely
67    // on the rotating random start (plus lazy expiry) for coverage —
68    // the same time-boxing trade Redis's activeExpireCycle makes.
69    let visit_cap = samples.saturating_mul(8);
70    let mut visited = 0usize;
71    {
72        for (k, e) in store.map.iter_from_bucket(start) {
73            visited += 1;
74            if sampled as usize >= samples || visited > visit_cap {
75                break;
76            }
77            let Some(deadline_ns) = e.expire_at_ns else {
78                continue;
79            };
80            sampled += 1;
81            if crate::unpack_deadline(deadline_ns) <= now {
82                victims.push(k.to_vec());
83            }
84        }
85    }
86    let expired = victims.len() as u32;
87    for k in &victims {
88        store.remove_entry(k);
89    }
90    // Active-expire-driven removals are still expirations from the shard's
91    // perspective — surface them under the same counter `MEMORY STATS` /
92    // `INFO memory` already exposes.
93    if expired > 0 {
94        store.expired_keys_total = store
95            .expired_keys_total
96            .saturating_add(expired as u64);
97    }
98    let _ = sampled; // silence unused warning if all returned early
99    (sampled, expired)
100}
101
102impl Store {
103    /// Run up to `max_rounds` of active-expiry sampling against this shard.
104    ///
105    /// Per round: sample `samples_per_round` TTL-bearing keys at random and
106    /// drop any whose deadline has passed. Stop early as soon as the
107    /// in-batch expire-rate drops below 25 % (Redis's `activeExpireCycle`
108    /// continuation threshold) — that's the signal the keyspace doesn't
109    /// have a "thick band" of expired keys to clean up right now.
110    ///
111    /// Cost when there are no TTL-bearing keys at all: one map-emptiness
112    /// check + a single bucket-iter probe per round. Designed so the active
113    /// reaper is never a tax on TTL-free workloads.
114    pub fn tick_expire(&mut self, samples_per_round: usize, max_rounds: u32) -> ExpireStats {
115        // Refresh the coarse cached clock every tick (the read path's lazy
116        // expiry compares against it) — even when there's nothing to reap.
117        self.refresh_clock();
118        if samples_per_round == 0 || max_rounds == 0 || self.map.is_empty() {
119            return ExpireStats::default();
120        }
121        let now = Instant::now();
122        let mut total_sampled = 0u32;
123        let mut total_expired = 0u32;
124        let mut rounds = 0u32;
125        // Single-pass sample_round can return sampled=0 when the random
126        // start lands in an empty bucket region (sparse tables / unlucky
127        // starts). Allow 3 consecutive zero-sample rounds before declaring
128        // the keyspace TTL-free this tick, so a small table doesn't miss
129        // its expired keys for several ticks.
130        let mut consecutive_zero = 0u32;
131        for _ in 0..max_rounds {
132            let (sampled, expired) = sample_round(self, samples_per_round, now);
133            rounds += 1;
134            total_sampled = total_sampled.saturating_add(sampled);
135            total_expired = total_expired.saturating_add(expired);
136            if sampled == 0 {
137                consecutive_zero += 1;
138                if consecutive_zero >= 3 {
139                    break;
140                }
141                continue;
142            }
143            consecutive_zero = 0;
144            // Continuation gate: only push another round if THIS round was
145            // expiry-heavy. A round that finds nothing expired-enough exits.
146            if expired * 100 < sampled * EXPIRE_RATE_CONTINUATION {
147                break;
148            }
149        }
150        ExpireStats {
151            sampled: total_sampled,
152            expired: total_expired,
153            rounds,
154        }
155    }
156
157    /// Total keys expired (by lazy reap OR active reaper). Surfaced via
158    /// `INFO keyspace` and `MEMORY STATS` once those grow the field.
159    #[inline]
160    pub fn expired_keys_total(&self) -> u64 {
161        self.expired_keys_total
162    }
163}
164
#[cfg(test)]
mod tests {
    use super::*;
    use std::time::Duration;

    /// Store `key` with a throwaway one-byte value and the given TTL
    /// (`None` = no expiry). The two trailing `set` flags are always
    /// `false` in these tests.
    fn put(store: &mut Store, key: &[u8], ttl: Option<Duration>) {
        store.set(key, b"v".to_vec(), ttl, false, false);
    }

    /// Expired keys are eventually reaped by repeated ticks, and keys
    /// without a TTL are left alone.
    #[test]
    fn tick_expire_drops_past_deadline() {
        let mut s = Store::new();
        put(&mut s, b"k1", Some(Duration::from_millis(1)));
        put(&mut s, b"k2", Some(Duration::from_millis(1)));
        put(&mut s, b"perm", None);
        std::thread::sleep(Duration::from_millis(20));
        // A single tick may legitimately miss a key: the sampling walk is
        // time-boxed and starts at a rotating bucket (the a635d65 trade —
        // coverage comes from repeated ticks + lazy expiry, not from one
        // exhaustive pass). Assert the *eventual* contract, like the
        // production reaper exercises it.
        for _ in 0..64 {
            if s.dbsize() == 1 {
                break;
            }
            s.tick_expire(20, 16);
        }
        assert_eq!(s.dbsize(), 1, "perm survives, both TTL'd keys reaped");
        assert!(s.expired_keys_total() >= 2);
    }

    /// Keys whose deadline lies far in the future are never removed.
    #[test]
    fn tick_expire_no_op_on_fresh_ttls() {
        let mut s = Store::new();
        put(&mut s, b"k1", Some(Duration::from_secs(3600)));
        put(&mut s, b"k2", Some(Duration::from_secs(3600)));
        let stats = s.tick_expire(20, 16);
        assert_eq!(stats.expired, 0, "no fresh TTL should expire");
        // `sampled` may be 0..=2 depending on how many keys the walk hit.
        assert_eq!(s.dbsize(), 2);
    }

    /// A keyspace without any TTL samples nothing and exits quickly.
    #[test]
    fn tick_expire_no_op_on_ttl_free_keyspace() {
        let mut s = Store::new();
        for i in 0..50 {
            put(&mut s, format!("k{i}").as_bytes(), None);
        }
        let stats = s.tick_expire(20, 16);
        assert_eq!(stats.expired, 0);
        assert_eq!(stats.sampled, 0, "no TTL'd keys ⇒ nothing sampled");
        // Loop tolerates up to 3 consecutive zero-sample rounds (the
        // unlucky-start guard) before exiting, so a TTL-free keyspace
        // costs at most 3 cheap bucket-iter passes per tick.
        assert!(stats.rounds <= 3, "got {}", stats.rounds);
    }

    /// Zero samples or zero rounds disables the reaper for that tick.
    #[test]
    fn tick_expire_zero_args_short_circuit() {
        let mut s = Store::new();
        put(&mut s, b"k", Some(Duration::from_millis(1)));
        std::thread::sleep(Duration::from_millis(5));
        assert_eq!(s.tick_expire(0, 16), ExpireStats::default());
        assert_eq!(s.tick_expire(20, 0), ExpireStats::default());
        // Store still has the expired key (active reaper disabled).
        assert_eq!(s.dbsize(), 1);
    }

    /// An expiry-heavy keyspace makes a tick run more than one round, and
    /// repeated ticks reap every expired key exactly once.
    #[test]
    fn tick_expire_loops_on_heavy_batch() {
        let mut s = Store::new();
        // 40 TTL'd keys (all expired) + 1 perm. A single tick samples from
        // a rotating bucket window, so we may need several ticks for full
        // coverage of a 40-key keyspace — that matches how `activeExpire`
        // converges in production (10 ticks/sec until everything's cleaned).
        for i in 0..40 {
            put(
                &mut s,
                format!("k{i}").as_bytes(),
                Some(Duration::from_millis(1)),
            );
        }
        put(&mut s, b"perm", None);
        std::thread::sleep(Duration::from_millis(20));

        let mut total_expired = 0u32;
        let mut any_tick_looped = false;
        for _ in 0..20 {
            let stats = s.tick_expire(20, 16);
            total_expired += stats.expired;
            any_tick_looped |= stats.rounds >= 2;
            if s.dbsize() == 1 {
                break;
            }
        }
        assert_eq!(total_expired, 40);
        assert!(any_tick_looped, "at least one heavy-batch tick should loop");
        assert_eq!(s.dbsize(), 1);
    }
}