pingora_timeout/
timer.rs

1// Copyright 2025 Cloudflare, Inc.
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7// http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
//! Lightweight timer for systems that run a high rate of operations, each with
//! an associated timeout
17//!
18//! Users don't need to interact with this module.
19//!
20//! The idea is to bucket timers into finite time slots so that operations that
21//! start and end quickly don't have to create their own timers all the time
22//!
23//! Benchmark:
24//! - create 7.809622ms total, 78ns avg per iteration
25//! - drop: 1.348552ms total, 13ns avg per iteration
26//!
27//! tokio timer:
28//! - create 34.317439ms total, 343ns avg per iteration
29//! - drop: 10.694154ms total, 106ns avg per iteration
30
31use parking_lot::RwLock;
32use std::collections::BTreeMap;
33use std::sync::atomic::{AtomicBool, AtomicI64, Ordering};
34use std::sync::Arc;
35use std::time::{Duration, Instant};
36use thread_local::ThreadLocal;
37use tokio::sync::Notify;
38
/// Width of one timer slot in milliseconds; `Time` values are rounded up to a multiple of this.
const RESOLUTION_MS: u64 = 10;
/// [`RESOLUTION_MS`] as a [`Duration`]; the clock thread sleeps this long between ticks.
const RESOLUTION_DURATION: Duration = Duration::from_millis(RESOLUTION_MS);
41
/// Round `raw` up to the nearest multiple of `resolution`.
///
/// Values that already sit on a multiple are returned unchanged. That includes `0`, which
/// the former `raw - 1` formulation could not handle: it underflowed, panicking in debug
/// builds and producing a wrapped, meaningless value in release builds.
///
/// # Panics
///
/// Panics if `resolution` is `0` (remainder by zero).
#[inline]
fn round_to(raw: u128, resolution: u128) -> u128 {
    match raw % resolution {
        // already on a slot boundary
        0 => raw,
        // move forward by whatever is missing to reach the next boundary
        rem => raw + (resolution - rem),
    }
}
/// A timestamp in milliseconds (millisecond resolution at most).
///
/// Values built through the `From` impls are rounded up to a multiple of `RESOLUTION_MS`.
#[derive(PartialEq, PartialOrd, Eq, Ord, Clone, Copy, Debug)]
struct Time(u128);
50
51impl From<u128> for Time {
52    fn from(raw_ms: u128) -> Self {
53        Time(round_to(raw_ms, RESOLUTION_MS as u128))
54    }
55}
56
57impl From<Duration> for Time {
58    fn from(d: Duration) -> Self {
59        Time(round_to(d.as_millis(), RESOLUTION_MS as u128))
60    }
61}
62
63impl Time {
64    pub fn not_after(&self, ts: u128) -> bool {
65        self.0 <= ts
66    }
67}
68
/// The stub for waiting for a timer to expire.
///
/// Holds the `Notify` and the "fired" flag shared with the timer it was subscribed from.
pub struct TimerStub(Arc<Notify>, Arc<AtomicBool>);
71
72impl TimerStub {
73    /// Wait for the timer to expire.
74    pub async fn poll(self) {
75        if self.1.load(Ordering::SeqCst) {
76            return;
77        }
78        self.0.notified().await;
79    }
80}
81
/// One timer slot: the `Notify` used to wake waiters plus a flag recording that it has fired.
struct Timer(Arc<Notify>, Arc<AtomicBool>);
83
84impl Timer {
85    pub fn new() -> Self {
86        Timer(Arc::new(Notify::new()), Arc::new(AtomicBool::new(false)))
87    }
88
89    pub fn fire(&self) {
90        self.1.store(true, Ordering::SeqCst);
91        self.0.notify_waiters();
92    }
93
94    pub fn subscribe(&self) -> TimerStub {
95        TimerStub(self.0.clone(), self.1.clone())
96    }
97}
98
/// The object that holds all the timers registered to it.
pub struct TimerManager {
    // Each thread inserts into its own local timer tree to avoid lock contention.
    timers: ThreadLocal<RwLock<BTreeMap<Time, Timer>>>,
    zero: Instant, // the reference zero point of all timestamps
    // Seconds since `zero` at which the clock thread last reported in. A new clock thread
    // should be started when this value is stale; the clock thread must keep updating it.
    // It is initialized to `-DELAYS_SEC`, so a fresh manager reads as "clock not running".
    clock_watchdog: AtomicI64,
    // Set while paused for fork(): the clock thread stops taking locks and
    // `register_timer()` hands out stubs that have already fired.
    paused: AtomicBool,
}
108
109// Consider the clock thread is dead after it fails to update the thread in DELAYS_SEC
110const DELAYS_SEC: i64 = 2; // TODO: make sure this value is larger than RESOLUTION_DURATION
111
112impl Default for TimerManager {
113    fn default() -> Self {
114        TimerManager {
115            timers: ThreadLocal::new(),
116            zero: Instant::now(),
117            clock_watchdog: AtomicI64::new(-DELAYS_SEC),
118            paused: AtomicBool::new(false),
119        }
120    }
121}
122
123impl TimerManager {
124    /// Create a new [TimerManager]
125    pub fn new() -> Self {
126        Self::default()
127    }
128
129    // This thread sleeps for a resolution time and then fires all the timers that are due to fire
130    pub(crate) fn clock_thread(&self) {
131        loop {
132            std::thread::sleep(RESOLUTION_DURATION);
133            let now = Instant::now() - self.zero;
134            self.clock_watchdog
135                .store(now.as_secs() as i64, Ordering::Relaxed);
136            if self.is_paused_for_fork() {
137                // just stop acquiring the locks, waiting for fork to happen
138                continue;
139            }
140            let now = now.as_millis();
141            // iterate through the timer tree for all threads
142            for thread_timer in self.timers.iter() {
143                let mut timers = thread_timer.write();
144                // Fire all timers until now
145                loop {
146                    let key_to_remove = timers.iter().next().and_then(|(k, _)| {
147                        if k.not_after(now) {
148                            Some(*k)
149                        } else {
150                            None
151                        }
152                    });
153                    if let Some(k) = key_to_remove {
154                        let timer = timers.remove(&k);
155                        // safe to unwrap, the key is from iter().next()
156                        timer.unwrap().fire();
157                    } else {
158                        break;
159                    }
160                }
161                // write lock drops here
162            }
163        }
164    }
165
166    // False if the clock is already started
167    // If true, the caller must start the clock thread next
168    pub(crate) fn should_i_start_clock(&self) -> bool {
169        let Err(prev) = self.is_clock_running() else {
170            return false;
171        };
172        let now = Instant::now().duration_since(self.zero).as_secs() as i64;
173        let res =
174            self.clock_watchdog
175                .compare_exchange(prev, now, Ordering::SeqCst, Ordering::SeqCst);
176        res.is_ok()
177    }
178
179    // Ok(()) if clock is running (watch dog is within DELAYS_SEC of now)
180    // Err(time) if watch do stopped at `time`
181    pub(crate) fn is_clock_running(&self) -> Result<(), i64> {
182        let now = Instant::now().duration_since(self.zero).as_secs() as i64;
183        let prev = self.clock_watchdog.load(Ordering::SeqCst);
184        if now < prev + DELAYS_SEC {
185            Ok(())
186        } else {
187            Err(prev)
188        }
189    }
190
191    /// Register a timer.
192    ///
193    /// When the timer expires, the [TimerStub] will be notified.
194    pub fn register_timer(&self, duration: Duration) -> TimerStub {
195        if self.is_paused_for_fork() {
196            // Return a dummy TimerStub that will trigger right away.
197            // This is fine assuming pause_for_fork() is called right before fork().
198            // The only possible register_timer() is from another thread which will
199            // be entirely lost after fork()
200            // TODO: buffer these register calls instead (without a lock)
201            let timer = Timer::new();
202            timer.fire();
203            return timer.subscribe();
204        }
205        let now: Time = (Instant::now() + duration - self.zero).into();
206        {
207            let timers = self.timers.get_or(|| RwLock::new(BTreeMap::new())).read();
208            if let Some(t) = timers.get(&now) {
209                return t.subscribe();
210            }
211        } // drop read lock
212
213        let timer = Timer::new();
214        let mut timers = self.timers.get_or(|| RwLock::new(BTreeMap::new())).write();
215        // Usually we check if another thread has insert the same node before we get the write lock,
216        // but because only this thread will insert anything to its local timers tree, there
217        // is no possible race that can happen. The only other thread is the clock thread who
218        // only removes timer from the tree
219        let stub = timer.subscribe();
220        timers.insert(now, timer);
221        stub
222    }
223
224    fn is_paused_for_fork(&self) -> bool {
225        self.paused.load(Ordering::SeqCst)
226    }
227
228    /// Pause the timer for fork()
229    ///
230    /// Because RwLock across fork() is undefined behavior, this function makes sure that no one
231    /// holds any locks.
232    ///
233    /// This function should be called right before fork().
234    pub fn pause_for_fork(&self) {
235        self.paused.store(true, Ordering::SeqCst);
236        // wait for everything to get out of their locks
237        std::thread::sleep(RESOLUTION_DURATION * 2);
238    }
239
240    /// Unpause the timer after fork()
241    ///
242    /// This function should be called right after fork().
243    pub fn unpause(&self) {
244        self.paused.store(false, Ordering::SeqCst)
245    }
246}
247
// Unit tests. Several of them measure wall-clock time with second granularity and
// therefore sleep for real.
#[cfg(test)]
mod tests {
    use super::*;

    // `round_to` keeps exact multiples and rounds everything else up to the next multiple.
    #[test]
    fn test_round() {
        assert_eq!(round_to(30, 10), 30);
        assert_eq!(round_to(31, 10), 40);
        assert_eq!(round_to(29, 10), 30);
    }

    // `Time` rounds on construction; `not_after` compares against the rounded value.
    #[test]
    fn test_time() {
        let t: Time = 128.into(); // t will round to 130
        assert_eq!(t, Duration::from_millis(130).into());
        assert!(!t.not_after(128));
        assert!(!t.not_after(129));
        assert!(t.not_after(130));
        assert!(t.not_after(131));
    }

    // Two timers registered for the same duration are expected to fire together.
    #[tokio::test]
    async fn test_timer_manager() {
        let tm_a = Arc::new(TimerManager::new());
        let tm = tm_a.clone();
        // the clock thread loops forever and is never joined
        std::thread::spawn(move || tm_a.clock_thread());

        let now = Instant::now();
        let t1 = tm.register_timer(Duration::from_secs(1));
        let t2 = tm.register_timer(Duration::from_secs(1));
        t1.poll().await;
        assert_eq!(now.elapsed().as_secs(), 1);
        let now = Instant::now();
        t2.poll().await;
        // t2 fired along t1 so no extra wait time
        assert_eq!(now.elapsed().as_secs(), 0);
    }

    // Only the first caller is told to start the clock; after that the watchdog reads as running.
    #[test]
    fn test_timer_manager_start_check() {
        let tm = Arc::new(TimerManager::new());
        assert!(tm.should_i_start_clock());
        assert!(!tm.should_i_start_clock());
        assert!(tm.is_clock_running().is_ok());
    }

    // Once the watchdog has not been refreshed for longer than DELAYS_SEC, the clock is
    // reported as stopped and a caller is asked to start it again.
    #[test]
    fn test_timer_manager_watchdog() {
        let tm = Arc::new(TimerManager::new());
        assert!(tm.should_i_start_clock());
        assert!(!tm.should_i_start_clock());

        // we don't actually start the clock thread, sleep for the watchdog to expire
        std::thread::sleep(Duration::from_secs(DELAYS_SEC as u64 + 1));
        assert!(tm.is_clock_running().is_err());
        assert!(tm.should_i_start_clock());
    }

    // While paused, newly registered timers fire immediately; a timer registered before the
    // pause still fires at its original deadline once the manager is unpaused.
    #[tokio::test]
    async fn test_timer_manager_pause() {
        let tm_a = Arc::new(TimerManager::new());
        let tm = tm_a.clone();
        // the clock thread loops forever and is never joined
        std::thread::spawn(move || tm_a.clock_thread());

        let now = Instant::now();
        let t1 = tm.register_timer(Duration::from_secs(2));
        tm.pause_for_fork();
        // no actual fork happen, we just test that pause and unpause work

        // any timer in this critical section is timed out right away
        let t2 = tm.register_timer(Duration::from_secs(2));
        t2.poll().await;
        assert_eq!(now.elapsed().as_secs(), 0);

        // still paused for this second; t1's 2s deadline has not been reached yet
        std::thread::sleep(Duration::from_secs(1));
        tm.unpause();
        t1.poll().await;
        assert_eq!(now.elapsed().as_secs(), 2);
    }
}