kevy_store/expire.rs
//! Active TTL reaper — Redis's `activeExpireCycle`, adapted to the
//! thread-per-core / single-shard `Store`.
//!
//! Lazy expiry (in `live_entry[_mut]`) still handles the common case where
//! the next access to a TTL'd key removes it. The active reaper exists for
//! the harder case: a key has TTL but is never touched again, so without an
//! explicit sweep it would sit in the map until the next FLUSH or eviction.
//!
//! Entry point: [`Store::tick_expire`]. The shard runtime calls it at the
//! configured `[expiry].hz` cadence (default 10 Hz / every 100 ms);
//! embedded users without a runtime call it themselves from whatever event
//! loop they have (mandatory for WASM, which has no threads).
13
14use crate::{Store, now_ns};
15
/// Summary of one [`Store::tick_expire`] call: how many keys it looked at,
/// how many it reaped, and how many rounds that took.
///
/// Exposed for tests and `INFO keyspace`; (eventually) Wave 2 task #4's
/// crash-safe verifier is expected to consume it as well.
#[derive(Clone, Copy, Debug, Default, Eq, PartialEq)]
pub struct ExpireStats {
    /// TTL-bearing keys inspected, summed over every round of the tick.
    pub sampled: u32,
    /// Inspected keys whose deadline had already passed and were removed.
    pub expired: u32,
    /// Sampling rounds executed before the tick stopped (round budget
    /// exhausted, or the per-round expire-rate fell below the continuation
    /// threshold).
    pub rounds: u32,
}
28
/// Continuation threshold, in percent. A round whose share of expired keys
/// among the sampled ones is at or above this value marks the keyspace as
/// "expiry-heavy" and earns another round; anything lower ends the tick.
/// Same 25% figure Redis uses in `activeExpireCycle`.
const EXPIRE_RATE_CONTINUATION: u32 = 25;
33
34/// Sample a single round of up to `samples` TTL-bearing keys starting at a
35/// random bucket; remove any that are past their deadline. Returns
36/// `(sampled, expired)` counts for this round. Walking is `O(visited)` —
37/// bounded by `2 * map.capacity()` to keep a sparsely-populated table from
38/// spinning the inner scan forever.
39pub(crate) fn sample_round(store: &mut Store, samples: usize, now: u64) -> (u32, u32) {
40 let cap = store.map.capacity();
41 if cap == 0 || store.map.is_empty() {
42 return (0, 0);
43 }
44 // Random start derived from the access-ordinal clock; Fibonacci-hash
45 // multiplier shifts the sampling window every call so we don't re-visit
46 // the same bucket range twice in a row. (No-quality PRNG needed for
47 // sampling, just want to spread starting positions.)
48 store.clock_counter = store.clock_counter.wrapping_add(1);
49 let start = (store
50 .clock_counter
51 .wrapping_mul(0x9E37_79B9_7F4A_7C15) as usize)
52 % cap;
53 let mut victims: Vec<Vec<u8>> = Vec::with_capacity(samples);
54 let mut sampled = 0u32;
55 // Single-pass walk from `start`, bounded in *visited entries*, not just
56 // in TTL-bearing samples: without the bound, a keyspace with few (or
57 // zero) TTL'd keys made every round walk to the end of the table
58 // looking for them — measured at 6 % of server CPU on a 300k-key
59 // TTL-free shard (the pinned 8sh profile, 2026-06-10), for a reaper
60 // with nothing to reap. With it, a TTL-free round costs O(samples)
61 // buckets; sparse-TTL keyspaces sample fewer keys per round and rely
62 // on the rotating random start (plus lazy expiry) for coverage —
63 // the same time-boxing trade Redis's activeExpireCycle makes.
64 let visit_cap = samples.saturating_mul(8);
65 let mut visited = 0usize;
66 {
67 for (k, e) in store.map.iter_from_bucket(start) {
68 visited += 1;
69 if sampled as usize >= samples || visited > visit_cap {
70 break;
71 }
72 let Some(deadline_ns) = e.expire_at_ns else {
73 continue;
74 };
75 sampled += 1;
76 if deadline_ns.get() <= now {
77 victims.push(k.to_vec());
78 }
79 }
80 }
81 let expired = victims.len() as u32;
82 for k in &victims {
83 store.remove_entry(k);
84 }
85 // Active-expire-driven removals are still expirations from the shard's
86 // perspective — surface them under the same counter `MEMORY STATS` /
87 // `INFO memory` already exposes.
88 if expired > 0 {
89 store.expired_keys_total = store
90 .expired_keys_total
91 .saturating_add(u64::from(expired));
92 }
93 let _ = sampled; // silence unused warning if all returned early
94 (sampled, expired)
95}
96
97impl Store {
98 /// Run up to `max_rounds` of active-expiry sampling against this shard.
99 ///
100 /// Per round: sample `samples_per_round` TTL-bearing keys at random and
101 /// drop any whose deadline has passed. Stop early as soon as the
102 /// in-batch expire-rate drops below 25 % (Redis's `activeExpireCycle`
103 /// continuation threshold) — that's the signal the keyspace doesn't
104 /// have a "thick band" of expired keys to clean up right now.
105 ///
106 /// Cost when there are no TTL-bearing keys at all: one map-emptiness
107 /// check + a single bucket-iter probe per round. Designed so the active
108 /// reaper is never a tax on TTL-free workloads.
109 pub fn tick_expire(&mut self, samples_per_round: usize, max_rounds: u32) -> ExpireStats {
110 // Refresh the coarse cached clock every tick (the read path's lazy
111 // expiry compares against it) — even when there's nothing to reap.
112 self.refresh_clock();
113 if samples_per_round == 0 || max_rounds == 0 || self.map.is_empty() {
114 return ExpireStats::default();
115 }
116 let now = now_ns();
117 let mut total_sampled = 0u32;
118 let mut total_expired = 0u32;
119 let mut rounds = 0u32;
120 // Single-pass sample_round can return sampled=0 when the random
121 // start lands in an empty bucket region (sparse tables / unlucky
122 // starts). Allow 3 consecutive zero-sample rounds before declaring
123 // the keyspace TTL-free this tick, so a small table doesn't miss
124 // its expired keys for several ticks.
125 let mut consecutive_zero = 0u32;
126 for _ in 0..max_rounds {
127 let (sampled, expired) = sample_round(self, samples_per_round, now);
128 rounds += 1;
129 total_sampled = total_sampled.saturating_add(sampled);
130 total_expired = total_expired.saturating_add(expired);
131 if sampled == 0 {
132 consecutive_zero += 1;
133 if consecutive_zero >= 3 {
134 break;
135 }
136 continue;
137 }
138 consecutive_zero = 0;
139 // Continuation gate: only push another round if THIS round was
140 // expiry-heavy. A round that finds nothing expired-enough exits.
141 if expired * 100 < sampled * EXPIRE_RATE_CONTINUATION {
142 break;
143 }
144 }
145 ExpireStats {
146 sampled: total_sampled,
147 expired: total_expired,
148 rounds,
149 }
150 }
151
152 /// Total keys expired (by lazy reap OR active reaper). Surfaced via
153 /// `INFO keyspace` and `MEMORY STATS` once those grow the field.
154 #[inline]
155 pub fn expired_keys_total(&self) -> u64 {
156 self.expired_keys_total
157 }
158}
159
#[cfg(test)]
mod tests {
    use super::*;
    use std::time::Duration;

    /// Expired TTL'd keys are eventually reaped; keys without a TTL survive.
    #[test]
    fn tick_expire_drops_past_deadline() {
        let mut s = Store::new();
        s.set(b"k1", b"v".to_vec(), Some(Duration::from_millis(1)), false, false);
        s.set(b"k2", b"v".to_vec(), Some(Duration::from_millis(1)), false, false);
        s.set(b"perm", b"v".to_vec(), None, false, false);
        // Two flake sources, both observed on virtualized CI runners:
        // a single tick may legitimately miss a key (the sampling walk is
        // time-boxed with a rotating start — the a635d65 trade; coverage
        // comes from repeated ticks), and on a starved macOS VM the
        // monotonic clock (`Instant`, mach_absolute_time) can advance far
        // slower than the wall-clock `sleep`, so a 1 ms deadline may not
        // have passed yet. Sleep-and-tick until converged (bounded), like
        // the production reaper drives it — the eventual contract.
        for _ in 0..500 {
            s.tick_expire(20, 16);
            if s.dbsize() == 1 {
                break;
            }
            std::thread::sleep(Duration::from_millis(10));
        }
        assert_eq!(s.dbsize(), 1, "perm survives, both TTL'd keys reaped");
        assert!(s.expired_keys_total() >= 2);
    }

    /// Keys whose TTL is far in the future are never removed by a tick.
    #[test]
    fn tick_expire_no_op_on_fresh_ttls() {
        let mut s = Store::new();
        s.set(b"k1", b"v".to_vec(), Some(Duration::from_hours(1)), false, false);
        s.set(b"k2", b"v".to_vec(), Some(Duration::from_hours(1)), false, false);
        let stats = s.tick_expire(20, 16);
        assert_eq!(stats.expired, 0, "no fresh TTL should expire");
        // sampled may be 0..=2 depending on how many our walk hit
        assert_eq!(s.dbsize(), 2);
    }

    /// A keyspace without any TTL samples nothing and exits quickly.
    #[test]
    fn tick_expire_no_op_on_ttl_free_keyspace() {
        let mut s = Store::new();
        for i in 0..50 {
            s.set(format!("k{i}").as_bytes(), b"v".to_vec(), None, false, false);
        }
        let stats = s.tick_expire(20, 16);
        assert_eq!(stats.expired, 0);
        assert_eq!(stats.sampled, 0, "no TTL'd keys ⇒ nothing sampled");
        // Loop tolerates up to 3 consecutive zero-sample rounds (the
        // unlucky-start guard) before exiting, so a TTL-free keyspace
        // costs at most 3 cheap bucket-iter passes per tick.
        assert!(stats.rounds <= 3, "got {}", stats.rounds);
    }

    /// Zero samples or zero rounds disables the reaper for that call.
    #[test]
    fn tick_expire_zero_args_short_circuit() {
        let mut s = Store::new();
        s.set(b"k", b"v".to_vec(), Some(Duration::from_millis(1)), false, false);
        std::thread::sleep(Duration::from_millis(5));
        assert_eq!(s.tick_expire(0, 16), ExpireStats::default());
        assert_eq!(s.tick_expire(20, 0), ExpireStats::default());
        // store still has the expired key (active reaper disabled).
        assert_eq!(s.dbsize(), 1);
    }

    /// An expiry-heavy batch makes at least one tick run more than one round.
    #[test]
    fn tick_expire_loops_on_heavy_batch() {
        let mut s = Store::new();
        // 40 TTL'd keys (all expired) + 1 perm. A single tick samples from
        // a random bucket window, so we may need several ticks for full
        // coverage of a 40-key keyspace — that matches how `activeExpire`
        // converges in production (10 ticks/sec until everything's cleaned).
        for i in 0..40 {
            s.set(
                format!("k{i}").as_bytes(),
                b"v".to_vec(),
                Some(Duration::from_millis(1)),
                false,
                false,
            );
        }
        s.set(b"perm", b"v".to_vec(), None, false, false);
        // Sleep-and-tick until converged: on a starved CI VM the monotonic
        // clock can lag the wall-clock sleep, so a fixed pre-sleep + a
        // bounded dry tick loop under-counts (see
        // tick_expire_drops_past_deadline).
        let mut total_expired = 0u32;
        let mut any_round_ge_2 = false;
        for _ in 0..500 {
            let stats = s.tick_expire(20, 16);
            total_expired += stats.expired;
            if stats.rounds >= 2 {
                any_round_ge_2 = true;
            }
            if s.dbsize() == 1 {
                break;
            }
            std::thread::sleep(Duration::from_millis(10));
        }
        assert_eq!(total_expired, 40);
        assert!(any_round_ge_2, "at least one heavy-batch tick should loop");
        assert_eq!(s.dbsize(), 1);
    }
}