Skip to main content

ntex_io/
timer.rs

1#![allow(clippy::mutable_key_type)]
2use std::collections::{BTreeMap, VecDeque};
3use std::{cell::Cell, cell::RefCell, ops, rc::Rc, time::Duration, time::Instant};
4
5use ntex_util::time::{Seconds, now, sleep};
6use ntex_util::{HashSet, spawn};
7
8use crate::{IoRef, io::IoState};
9
10const CAP: usize = 64;
11const SEC: Duration = Duration::from_secs(1);
12
13thread_local! {
14    static TIMER: Inner = Inner {
15        running: Cell::new(false),
16        base: Cell::new(Instant::now()),
17        current: Cell::new(0),
18        storage: RefCell::new(InnerMut {
19            cache: VecDeque::with_capacity(CAP),
20            notifications: BTreeMap::default(),
21        })
22    }
23}
24
25#[derive(Copy, Clone, Default, Debug, Eq, PartialEq, Ord, PartialOrd, Hash)]
26pub struct TimerHandle(u32);
27
28impl TimerHandle {
29    pub const ZERO: TimerHandle = TimerHandle(0);
30
31    pub fn is_set(&self) -> bool {
32        self.0 != 0
33    }
34
35    pub fn remains(&self) -> Seconds {
36        TIMER.with(|timer| {
37            let cur = timer.current.get();
38            if self.0 <= cur {
39                Seconds::ZERO
40            } else {
41                #[allow(clippy::cast_possible_truncation)]
42                Seconds((self.0 - cur) as u16)
43            }
44        })
45    }
46
47    pub fn instant(&self) -> Instant {
48        TIMER.with(|timer| timer.base.get() + Duration::from_secs(u64::from(self.0)))
49    }
50}
51
52impl ops::Add<Seconds> for TimerHandle {
53    type Output = TimerHandle;
54
55    #[inline]
56    fn add(self, other: Seconds) -> TimerHandle {
57        TimerHandle(self.0 + u32::from(other.0))
58    }
59}
60
61struct Inner {
62    running: Cell<bool>,
63    base: Cell<Instant>,
64    current: Cell<u32>,
65    storage: RefCell<InnerMut>,
66}
67
68struct InnerMut {
69    cache: VecDeque<HashSet<Rc<IoState>>>,
70    notifications: BTreeMap<u32, HashSet<Rc<IoState>>>,
71}
72
73impl InnerMut {
74    fn unregister(&mut self, hnd: TimerHandle, io: &IoRef) {
75        if let Some(states) = self.notifications.get_mut(&hnd.0) {
76            states.remove(&io.0);
77        }
78    }
79}
80
81pub(crate) fn unregister(hnd: TimerHandle, io: &IoRef) {
82    TIMER.with(|timer| {
83        timer.storage.borrow_mut().unregister(hnd, io);
84    });
85}
86
87pub(crate) fn update(hnd: TimerHandle, timeout: Seconds, io: &IoRef) -> TimerHandle {
88    TIMER.with(|timer| {
89        let new_hnd = timer.current.get() + u32::from(timeout.0);
90        if hnd.0 == new_hnd || hnd.0 == new_hnd + 1 {
91            hnd
92        } else {
93            timer.storage.borrow_mut().unregister(hnd, io);
94            register(timeout, io)
95        }
96    })
97}
98
99pub(crate) fn register(timeout: Seconds, io: &IoRef) -> TimerHandle {
100    TIMER.with(|timer| {
101        // setup current delta
102        if !timer.running.get() {
103            #[allow(clippy::cast_possible_truncation)]
104            let current = (now() - timer.base.get()).as_secs() as u32;
105            timer.current.set(current);
106            log::debug!(
107                "{}: Timer driver does not run, current: {}",
108                io.tag(),
109                current
110            );
111        }
112
113        let hnd = {
114            let hnd = timer.current.get() + u32::from(timeout.0);
115            let mut inner = timer.storage.borrow_mut();
116
117            // insert key
118            if let Some(item) = inner.notifications.range_mut(hnd..=hnd).next() {
119                item.1.insert(io.0.clone());
120                *item.0
121            } else {
122                let mut items = inner.cache.pop_front().unwrap_or_default();
123                items.insert(io.0.clone());
124                inner.notifications.insert(hnd, items);
125                hnd
126            }
127        };
128
129        if !timer.running.get() {
130            timer.running.set(true);
131
132            #[allow(clippy::let_underscore_future)]
133            let _ = spawn(async move {
134                let guard = TimerGuard;
135                loop {
136                    sleep(SEC).await;
137                    let stop = TIMER.with(|timer| {
138                        let current = timer.current.get();
139                        timer.current.set(current + 1);
140
141                        // notify io dispatcher
142                        let mut inner = timer.storage.borrow_mut();
143                        while let Some(key) = inner.notifications.keys().next() {
144                            let key = *key;
145                            if key <= current {
146                                let mut items = inner.notifications.remove(&key).unwrap();
147                                for st in items.drain() {
148                                    st.notify_timeout();
149                                }
150                                if inner.cache.len() <= CAP {
151                                    inner.cache.push_back(items);
152                                }
153                            } else {
154                                break;
155                            }
156                        }
157
158                        // new tick
159                        if inner.notifications.is_empty() {
160                            timer.running.set(false);
161                            true
162                        } else {
163                            false
164                        }
165                    });
166
167                    if stop {
168                        break;
169                    }
170                }
171                drop(guard);
172            });
173        }
174
175        TimerHandle(hnd)
176    })
177}
178
179struct TimerGuard;
180
181impl Drop for TimerGuard {
182    fn drop(&mut self) {
183        TIMER.with(|timer| {
184            timer.running.set(false);
185            timer.storage.borrow_mut().notifications.clear();
186        });
187    }
188}