Skip to main content

reddb_server/runtime/
lease_timer_wheel.rs

//! Edge-triggered timer wheel for lease expiry / refresh scheduling.
//!
//! Replaces the `thread::sleep(ttl/3)` polling loop in `lease_loop` with
//! a bucket-granular timer wheel: the worker thread sleeps until the next
//! bucket is due, fires all leases in that bucket, then sleeps again.
//!
//! ## Design (top-half / bottom-half)
//!
//! * **Top-half** (`schedule` / `cancel`): callable from any thread, O(log n).
//! * **Bottom-half** (`run_until_shutdown`): dedicated worker thread; wakes
//!   only when a bucket is due or when `schedule()` / `shutdown()` signals
//!   it. Zero CPU for idle leases.
//!
//! Bucket granularity is configurable (default 100 ms). All expirations
//! within the same granularity window coalesce into one wake-up.
//!
//! ## Scalability
//!
//! With N idle leases all expiring far in the future, the worker sleeps
//! exactly until the earliest bucket — not once per lease. CPU overhead
//! is effectively zero while idle and O(k) per wake-up, where k is the
//! number of leases firing in the current bucket.
22
use std::collections::{BTreeMap, BTreeSet, HashMap};
use std::sync::{Condvar, Mutex};
use std::time::{Duration, Instant};
26
pub type LeaseId = String;

/// Mutable state shared by the producer-facing API and the worker, guarded
/// by `LeaseTimerWheel::state`.
struct WheelState {
    /// Sorted map: fire instant → ids firing at that bucket boundary.
    ///
    /// Buckets are sets so removing a single id is O(log k) even when many
    /// leases coalesce into the same bucket (a `Vec` would need an O(k) scan
    /// on every cancel / reschedule).
    schedule: BTreeMap<Instant, BTreeSet<LeaseId>>,
    /// Reverse index: id → fire instant, used to find an id's bucket on
    /// cancel / reschedule.
    id_to_fire: HashMap<LeaseId, Instant>,
    /// Set by `shutdown()`; never cleared.
    shutdown: bool,
}

/// Timer wheel that schedules leases to fire at specific monotonic
/// ([`Instant`]) times.
///
/// Construct once, share via `Arc`. Call [`schedule`](Self::schedule) /
/// [`cancel`](Self::cancel) from any thread and call
/// [`run_until_shutdown`](Self::run_until_shutdown) on a dedicated worker
/// thread.
///
/// Every method panics if the internal mutex has been poisoned.
pub struct LeaseTimerWheel {
    state: Mutex<WheelState>,
    /// Wakes the worker when an earlier deadline appears or on shutdown.
    cvar: Condvar,
    /// Bucket width (at least 1 ms).
    granularity: Duration,
    /// Fixed anchor of the bucket grid: every fire instant is
    /// `origin + k * granularity`. Sharing one anchor across all
    /// `schedule` calls is what lets leases scheduled at different moments
    /// land in the same bucket.
    origin: Instant,
}

impl LeaseTimerWheel {
    /// New wheel with `granularity_ms` bucket width (values below 1 are
    /// clamped to 1 ms). The bucket grid is anchored at construction time.
    pub fn new(granularity_ms: u64) -> Self {
        Self {
            state: Mutex::new(WheelState {
                schedule: BTreeMap::new(),
                id_to_fire: HashMap::new(),
                shutdown: false,
            }),
            cvar: Condvar::new(),
            granularity: Duration::from_millis(granularity_ms.max(1)),
            origin: Instant::now(),
        }
    }

    /// Schedule `id` to fire at or after `expiry`.
    ///
    /// The expiry is snapped forward to the next boundary of the wheel's
    /// shared bucket grid, so expirations that fall in the same granularity
    /// window share one bucket and one wake-up. An `expiry` in the past fires
    /// at the next boundary. Re-scheduling an existing id silently replaces
    /// the prior entry.
    pub fn schedule(&self, id: LeaseId, expiry: Instant) {
        let fire_time = self.snap(expiry);
        let mut state = self.state.lock().expect("wheel mutex poisoned");
        self.remove_existing(&mut state, &id);

        // The worker is always due to wake no later than the current earliest
        // deadline, so it only needs a nudge when this entry moves that
        // deadline earlier. Skipping the notify otherwise avoids one worker
        // wake-up per lease during bulk (re)scheduling.
        let becomes_earliest = match state.schedule.keys().next() {
            Some(&first) => fire_time < first,
            None => true,
        };

        state
            .schedule
            .entry(fire_time)
            .or_default()
            .insert(id.clone());
        state.id_to_fire.insert(id, fire_time);

        if becomes_earliest {
            self.cvar.notify_one();
        }
    }

    /// Cancel a scheduled lease. No-op if `id` is not scheduled, which
    /// includes ids already drained into the worker's current batch.
    pub fn cancel(&self, id: &str) {
        let mut state = self.state.lock().expect("wheel mutex poisoned");
        self.remove_existing(&mut state, id);
    }

    /// Signal the worker to stop.
    ///
    /// A worker parked between buckets returns promptly. A worker that is
    /// running the handler stops once the current batch is finished.
    /// Shutdown is permanent: later calls to `run_until_shutdown` return
    /// immediately.
    pub fn shutdown(&self) {
        let mut state = self.state.lock().expect("wheel mutex poisoned");
        state.shutdown = true;
        self.cvar.notify_all();
    }

    /// Block until `shutdown()` is called or `handler` returns `false`.
    ///
    /// For each fired lease id, calls `handler(id)` without holding the
    /// internal lock, so the handler may call `schedule` / `cancel` (e.g. to
    /// refresh the lease it was given). If the handler returns `false`, this
    /// returns immediately, even mid-batch. Ids left in that batch are
    /// dropped, because draining already removed them from the wheel.
    ///
    /// CPU: between buckets the thread is parked in `Condvar::wait_timeout`
    /// until the earliest bucket is due, so idle leases cost no CPU.
    pub fn run_until_shutdown(&self, mut handler: impl FnMut(LeaseId) -> bool + Send) {
        while let Some(batch) = self.next_batch() {
            for id in batch {
                if !handler(id) {
                    return;
                }
            }
        }
    }

    /// Wait until the earliest bucket is due (or a notification arrives),
    /// then drain every due bucket.
    ///
    /// Returns `None` once shutdown has been requested. May return an empty
    /// batch after an early or spurious wake-up. Ids in the same bucket are
    /// returned in ascending order.
    fn next_batch(&self) -> Option<Vec<LeaseId>> {
        // Park length when nothing is scheduled; `schedule()` / `shutdown()`
        // unpark the worker sooner.
        const IDLE_PARK: Duration = Duration::from_secs(3600);

        // The lock is held from reading the earliest deadline until
        // `wait_timeout` atomically releases it. A `schedule()` notification
        // therefore cannot slip into that window and be lost.
        let mut state = self.state.lock().expect("wheel mutex poisoned");
        if state.shutdown {
            return None;
        }

        let sleep_for = state
            .schedule
            .keys()
            .next()
            .map_or(IDLE_PARK, |&first| {
                first.saturating_duration_since(Instant::now())
            });
        if !sleep_for.is_zero() {
            let (guard, _timed_out) = self
                .cvar
                .wait_timeout(state, sleep_for)
                .expect("condvar wait_timeout failed");
            state = guard;
            if state.shutdown {
                return None;
            }
        }

        // Drain every due bucket while still holding the lock.
        let now = Instant::now();
        let WheelState {
            schedule,
            id_to_fire,
            ..
        } = &mut *state;
        let mut due = Vec::new();
        while let Some(bucket) = schedule.first_entry() {
            if *bucket.key() > now {
                break;
            }
            for id in bucket.remove() {
                id_to_fire.remove(&id);
                due.push(id);
            }
        }
        Some(due)
    }

    /// Snap `expiry` forward to the first grid boundary
    /// `origin + k * granularity` at or after `max(expiry, now)`. The result
    /// is never earlier than `expiry` or than the current instant.
    ///
    /// If the snapped instant cannot be represented (an expiry centuries
    /// away, or an enormous granularity), the unsnapped `max(expiry, now)` is
    /// returned instead of truncating.
    fn snap(&self, expiry: Instant) -> Instant {
        let base = expiry.max(Instant::now());
        // u128 arithmetic: large offsets and granularities are not truncated.
        let offset = base.duration_since(self.origin).as_nanos();
        let gran = self.granularity.as_nanos(); // >= 1 ms, never zero
        let snapped = offset.div_ceil(gran) * gran;
        u64::try_from(snapped)
            .ok()
            .and_then(|nanos| self.origin.checked_add(Duration::from_nanos(nanos)))
            .unwrap_or(base)
    }

    /// Remove `id` from both indexes. No-op if it is not scheduled. A bucket
    /// left empty is removed so it never counts as the earliest deadline.
    fn remove_existing(&self, state: &mut WheelState, id: &str) {
        let Some(old_time) = state.id_to_fire.remove(id) else {
            return;
        };
        if let Some(bucket) = state.schedule.get_mut(&old_time) {
            bucket.remove(id);
            if bucket.is_empty() {
                state.schedule.remove(&old_time);
            }
        }
    }
}
191
#[cfg(test)]
mod tests {
    use super::*;
    use std::sync::atomic::{AtomicUsize, Ordering};
    use std::sync::Arc;
    use std::thread;
    use std::time::Duration;

    fn ms(n: u64) -> Duration {
        Duration::from_millis(n)
    }

    #[test]
    fn single_lease_fires_within_granularity() {
        let wheel = Arc::new(LeaseTimerWheel::new(50));
        let fired = Arc::new(AtomicUsize::new(0));
        let fired_clone = Arc::clone(&fired);
        let wheel_clone = Arc::clone(&wheel);

        // Schedule to fire in 50 ms.
        wheel.schedule("lease-1".to_string(), Instant::now() + ms(50));

        let t = thread::spawn(move || {
            wheel_clone.run_until_shutdown(move |_id| {
                fired_clone.fetch_add(1, Ordering::SeqCst);
                false // stop after first fire
            });
        });

        t.join().unwrap();
        assert_eq!(fired.load(Ordering::SeqCst), 1);
    }

    #[test]
    fn fires_at_roughly_the_right_time() {
        let wheel = Arc::new(LeaseTimerWheel::new(50));
        let start = Instant::now();
        let fired_at = Arc::new(Mutex::new(None::<Instant>));
        let fired_at_clone = Arc::clone(&fired_at);
        let wheel_clone = Arc::clone(&wheel);

        wheel.schedule("t".to_string(), start + ms(100));

        let t = thread::spawn(move || {
            wheel_clone.run_until_shutdown(move |_id| {
                *fired_at_clone.lock().unwrap() = Some(Instant::now());
                false
            });
        });
        t.join().unwrap();

        let elapsed = fired_at.lock().unwrap().unwrap() - start;
        // Must never fire before the requested expiry. The upper bound
        // (100 ms + 6 * granularity) covers the bucket snap plus generous
        // scheduling slack for slow CI machines.
        assert!(elapsed >= ms(100), "fired too early: {elapsed:?}");
        assert!(
            elapsed < ms(400),
            "fired too late (CI slowness?): {elapsed:?}"
        );
    }

    #[test]
    fn coalesces_multiple_leases_in_same_bucket() {
        let wheel = Arc::new(LeaseTimerWheel::new(100));
        let fired = Arc::new(AtomicUsize::new(0));
        let fired_clone = Arc::clone(&fired);
        let wheel_clone = Arc::clone(&wheel);

        // Three leases scheduled within the same 100 ms bucket.
        let deadline = Instant::now() + ms(100);
        for id in ["a", "b", "c"] {
            wheel.schedule(id.to_string(), deadline);
        }

        let t = thread::spawn(move || {
            wheel_clone.run_until_shutdown(move |_id| {
                // Keep going until the third lease has fired.
                fired_clone.fetch_add(1, Ordering::SeqCst) + 1 < 3
            });
        });
        t.join().unwrap();
        assert_eq!(fired.load(Ordering::SeqCst), 3);
    }

    #[test]
    fn cancel_prevents_fire() {
        let wheel = Arc::new(LeaseTimerWheel::new(50));
        let fired = Arc::new(AtomicUsize::new(0));
        let fired_clone = Arc::clone(&fired);
        let wheel_clone = Arc::clone(&wheel);

        wheel.schedule("doomed".to_string(), Instant::now() + ms(200));
        wheel.schedule("survivor".to_string(), Instant::now() + ms(100));
        wheel.cancel("doomed");

        let t = thread::spawn(move || {
            wheel_clone.run_until_shutdown(move |id| {
                fired_clone.fetch_add(1, Ordering::SeqCst);
                assert_eq!(id, "survivor", "cancelled lease must not fire");
                false
            });
        });
        t.join().unwrap();
        assert_eq!(fired.load(Ordering::SeqCst), 1);
    }

    #[test]
    fn reschedule_replaces_prior_entry() {
        let wheel = Arc::new(LeaseTimerWheel::new(50));
        let fire_times = Arc::new(Mutex::new(Vec::<Instant>::new()));
        let fire_times_clone = Arc::clone(&fire_times);
        let wheel_clone = Arc::clone(&wheel);

        // Take `start` BEFORE deriving the expiries from it. The wheel never
        // fires before the expiry, so the lower bound below is guaranteed
        // rather than dependent on timing jitter.
        let start = Instant::now();
        // Schedule at start+50, then immediately reschedule to start+150.
        wheel.schedule("x".to_string(), start + ms(50));
        wheel.schedule("x".to_string(), start + ms(150));

        let t = thread::spawn(move || {
            wheel_clone.run_until_shutdown(move |_id| {
                fire_times_clone.lock().unwrap().push(Instant::now());
                false
            });
        });
        t.join().unwrap();

        let times = fire_times.lock().unwrap();
        assert_eq!(times.len(), 1, "only one fire expected after reschedule");
        // Should not fire before 150 ms.
        assert!(
            times[0] - start >= ms(150),
            "fired at wrong time: {:?}",
            times[0] - start
        );
    }

    #[test]
    fn shutdown_stops_worker_with_no_scheduled_leases() {
        let wheel = Arc::new(LeaseTimerWheel::new(100));
        let wheel_clone = Arc::clone(&wheel);

        let t = thread::spawn(move || {
            wheel_clone.run_until_shutdown(|_| true);
        });

        // Worker is sleeping (nothing scheduled). Signal shutdown.
        thread::sleep(ms(20));
        wheel.shutdown();
        t.join().unwrap(); // must not hang
    }

    #[test]
    fn idle_10k_leases_do_not_spin() {
        // 10 k leases all expiring 60 s from now. The worker should stay
        // parked until then: nothing may fire during a short idle window,
        // and shutdown must still stop a worker that has pending leases.
        let wheel = Arc::new(LeaseTimerWheel::new(100));
        let far_future = Instant::now() + Duration::from_secs(60);
        for i in 0..10_000usize {
            wheel.schedule(format!("lease-{i}"), far_future);
        }

        let fired = Arc::new(AtomicUsize::new(0));
        let fired_clone = Arc::clone(&fired);
        let wheel_clone = Arc::clone(&wheel);

        let worker = thread::spawn(move || {
            wheel_clone.run_until_shutdown(move |_| {
                fired_clone.fetch_add(1, Ordering::SeqCst);
                true
            });
        });

        // Sleep 50 ms; nothing should have fired.
        thread::sleep(ms(50));
        let fired_while_idle = fired.load(Ordering::SeqCst);

        // Stop and join the worker before asserting so it never outlives the test.
        wheel.shutdown();
        worker.join().unwrap();

        assert_eq!(
            fired_while_idle, 0,
            "no leases should fire during idle period"
        );
    }
}