reddb_server/runtime/
lease_timer_wheel.rs1use std::collections::{BTreeMap, HashMap};
24use std::sync::{Condvar, Mutex};
25use std::time::{Duration, Instant};
26
27pub type LeaseId = String;
28
29struct WheelState {
30 schedule: BTreeMap<Instant, Vec<LeaseId>>,
32 id_to_fire: HashMap<LeaseId, Instant>,
34 shutdown: bool,
35}
36
37pub struct LeaseTimerWheel {
42 state: Mutex<WheelState>,
43 cvar: Condvar,
44 granularity: Duration,
45}
46
47impl LeaseTimerWheel {
48 pub fn new(granularity_ms: u64) -> Self {
50 Self {
51 state: Mutex::new(WheelState {
52 schedule: BTreeMap::new(),
53 id_to_fire: HashMap::new(),
54 shutdown: false,
55 }),
56 cvar: Condvar::new(),
57 granularity: Duration::from_millis(granularity_ms.max(1)),
58 }
59 }
60
61 pub fn schedule(&self, id: LeaseId, expiry: Instant) {
67 let fire_time = self.snap(expiry);
68 let mut state = self.state.lock().expect("wheel mutex poisoned");
69 self.remove_existing(&mut state, &id);
70 state
71 .schedule
72 .entry(fire_time)
73 .or_default()
74 .push(id.clone());
75 state.id_to_fire.insert(id, fire_time);
76 self.cvar.notify_one();
77 }
78
79 pub fn cancel(&self, id: &str) {
81 let mut state = self.state.lock().expect("wheel mutex poisoned");
82 self.remove_existing(&mut state, id);
83 }
84
85 pub fn shutdown(&self) {
87 let mut state = self.state.lock().expect("wheel mutex poisoned");
88 state.shutdown = true;
89 self.cvar.notify_all();
90 }
91
92 pub fn run_until_shutdown(&self, mut handler: impl FnMut(LeaseId) -> bool + Send) {
101 loop {
102 let fired: Vec<LeaseId> = {
108 let state = self.state.lock().expect("wheel mutex poisoned");
109 if state.shutdown {
110 return;
111 }
112
113 let sleep_for = match state.schedule.keys().next().copied() {
114 Some(t) => {
115 let now = Instant::now();
116 if t <= now {
117 Duration::ZERO
118 } else {
119 t - now
120 }
121 }
122 None => Duration::from_secs(3600),
125 };
126
127 let mut state = if sleep_for > Duration::ZERO {
130 let (guard, _) = self
131 .cvar
132 .wait_timeout(state, sleep_for)
133 .expect("condvar wait_timeout failed");
134 if guard.shutdown {
135 return;
136 }
137 guard
138 } else {
139 state
140 };
141
142 let now = Instant::now();
144 let mut ids = Vec::new();
145 while let Some((&fire_time, _)) = state.schedule.iter().next() {
146 if fire_time > now {
147 break;
148 }
149 let bucket = state.schedule.remove(&fire_time).unwrap_or_default();
150 for id in &bucket {
151 state.id_to_fire.remove(id.as_str());
152 }
153 ids.extend(bucket);
154 }
155 ids
156 };
158
159 for id in fired {
162 if !handler(id) {
163 return;
164 }
165 }
166 }
167 }
168
169 fn snap(&self, expiry: Instant) -> Instant {
172 let now = Instant::now();
173 let base = expiry.max(now);
174 let nanos_from_now = (base - now).as_nanos() as u64;
175 let gran_nanos = self.granularity.as_nanos() as u64;
176 let snapped = nanos_from_now.div_ceil(gran_nanos) * gran_nanos;
177 now + Duration::from_nanos(snapped)
178 }
179
180 fn remove_existing(&self, state: &mut WheelState, id: &str) {
181 if let Some(old_time) = state.id_to_fire.remove(id) {
182 if let Some(v) = state.schedule.get_mut(&old_time) {
183 v.retain(|x| x != id);
184 if v.is_empty() {
185 state.schedule.remove(&old_time);
186 }
187 }
188 }
189 }
190}
191
192#[cfg(test)]
193mod tests {
194 use super::*;
195 use std::sync::atomic::{AtomicUsize, Ordering};
196 use std::sync::Arc;
197 use std::thread;
198 use std::time::Duration;
199
200 fn ms(n: u64) -> Duration {
201 Duration::from_millis(n)
202 }
203
204 #[test]
205 fn single_lease_fires_within_granularity() {
206 let wheel = Arc::new(LeaseTimerWheel::new(50));
207 let fired = Arc::new(AtomicUsize::new(0));
208 let fired_clone = Arc::clone(&fired);
209 let wheel_clone = Arc::clone(&wheel);
210
211 wheel.schedule("lease-1".to_string(), Instant::now() + ms(50));
213
214 let t = thread::spawn(move || {
215 wheel_clone.run_until_shutdown(move |_id| {
216 fired_clone.fetch_add(1, Ordering::SeqCst);
217 false });
219 });
220
221 t.join().unwrap();
222 assert_eq!(fired.load(Ordering::SeqCst), 1);
223 }
224
225 #[test]
226 fn fires_at_roughly_the_right_time() {
227 let wheel = Arc::new(LeaseTimerWheel::new(50));
228 let start = Instant::now();
229 let fired_at = Arc::new(Mutex::new(None::<Instant>));
230 let fired_at_clone = Arc::clone(&fired_at);
231 let wheel_clone = Arc::clone(&wheel);
232
233 wheel.schedule("t".to_string(), start + ms(100));
234
235 let t = thread::spawn(move || {
236 wheel_clone.run_until_shutdown(move |_id| {
237 *fired_at_clone.lock().unwrap() = Some(Instant::now());
238 false
239 });
240 });
241 t.join().unwrap();
242
243 let elapsed = fired_at.lock().unwrap().unwrap() - start;
244 assert!(elapsed >= ms(100), "fired too early: {elapsed:?}");
246 assert!(
247 elapsed < ms(400),
248 "fired too late (CI slowness?): {elapsed:?}"
249 );
250 }
251
252 #[test]
253 fn coalesces_multiple_leases_in_same_bucket() {
254 let wheel = Arc::new(LeaseTimerWheel::new(100));
255 let fired = Arc::new(AtomicUsize::new(0));
256 let fired_clone = Arc::clone(&fired);
257 let wheel_clone = Arc::clone(&wheel);
258 let total = Arc::new(AtomicUsize::new(0));
259 let total_clone = Arc::clone(&total);
260
261 let deadline = Instant::now() + ms(100);
263 wheel.schedule("a".to_string(), deadline);
264 wheel.schedule("b".to_string(), deadline);
265 wheel.schedule("c".to_string(), deadline);
266
267 let t = thread::spawn(move || {
268 wheel_clone.run_until_shutdown(move |_id| {
269 let n = fired_clone.fetch_add(1, Ordering::SeqCst) + 1;
270 total_clone.store(n, Ordering::SeqCst);
271 n < 3 });
273 });
274 t.join().unwrap();
275 assert_eq!(total.load(Ordering::SeqCst), 3);
276 }
277
278 #[test]
279 fn cancel_prevents_fire() {
280 let wheel = Arc::new(LeaseTimerWheel::new(50));
281 let fired = Arc::new(AtomicUsize::new(0));
282 let fired_clone = Arc::clone(&fired);
283 let wheel_clone = Arc::clone(&wheel);
284
285 wheel.schedule("doomed".to_string(), Instant::now() + ms(200));
286 wheel.schedule("survivor".to_string(), Instant::now() + ms(100));
287 wheel.cancel("doomed");
288
289 let t = thread::spawn(move || {
290 wheel_clone.run_until_shutdown(move |id| {
291 fired_clone.fetch_add(1, Ordering::SeqCst);
292 assert_eq!(id, "survivor", "cancelled lease must not fire");
293 false
294 });
295 });
296 t.join().unwrap();
297 assert_eq!(fired.load(Ordering::SeqCst), 1);
298 }
299
300 #[test]
301 fn reschedule_replaces_prior_entry() {
302 let wheel = Arc::new(LeaseTimerWheel::new(50));
303 let fire_times = Arc::new(Mutex::new(Vec::<Instant>::new()));
304 let fire_times_clone = Arc::clone(&fire_times);
305 let wheel_clone = Arc::clone(&wheel);
306
307 wheel.schedule("x".to_string(), Instant::now() + ms(50));
309 wheel.schedule("x".to_string(), Instant::now() + ms(150));
310
311 let start = Instant::now();
312 let t = thread::spawn(move || {
313 wheel_clone.run_until_shutdown(move |_id| {
314 fire_times_clone.lock().unwrap().push(Instant::now());
315 false
316 });
317 });
318 t.join().unwrap();
319
320 let times = fire_times.lock().unwrap();
321 assert_eq!(times.len(), 1, "only one fire expected after reschedule");
322 assert!(
324 times[0] - start >= ms(150),
325 "fired at wrong time: {:?}",
326 times[0] - start
327 );
328 }
329
330 #[test]
331 fn shutdown_stops_worker_with_no_scheduled_leases() {
332 let wheel = Arc::new(LeaseTimerWheel::new(100));
333 let wheel_clone = Arc::clone(&wheel);
334
335 let t = thread::spawn(move || {
336 wheel_clone.run_until_shutdown(|_| true);
337 });
338
339 thread::sleep(ms(20));
341 wheel.shutdown();
342 t.join().unwrap(); }
344
345 #[test]
346 fn idle_10k_leases_do_not_spin() {
347 let wheel = Arc::new(LeaseTimerWheel::new(100));
351 let far_future = Instant::now() + Duration::from_secs(60);
352 for i in 0..10_000usize {
353 wheel.schedule(format!("lease-{i}"), far_future);
354 }
355
356 let fired = Arc::new(AtomicUsize::new(0));
357 let fired_clone = Arc::clone(&fired);
358 let wheel_clone = Arc::clone(&wheel);
359
360 thread::spawn(move || {
361 wheel_clone.run_until_shutdown(move |_| {
362 fired_clone.fetch_add(1, Ordering::SeqCst);
363 true
364 });
365 });
366
367 thread::sleep(ms(50));
369 assert_eq!(
370 fired.load(Ordering::SeqCst),
371 0,
372 "no leases should fire during idle period"
373 );
374 wheel.shutdown();
375 }
376}