Skip to main content

ntex_io/
timer.rs

1#![allow(clippy::mutable_key_type)]
2use std::collections::{BTreeMap, VecDeque};
3use std::{cell::Cell, cell::RefCell, ops, rc::Rc, time::Duration, time::Instant};
4
5use ntex_util::time::{Seconds, now, sleep};
6use ntex_util::{HashSet, spawn};
7
8use crate::{IoRef, io::IoState};
9
10const CAP: usize = 64;
11const SEC: Duration = Duration::from_secs(1);
12
13thread_local! {
14    static TIMER: Inner = Inner {
15        running: Cell::new(false),
16        base: Cell::new(Instant::now()),
17        current: Cell::new(0),
18        storage: RefCell::new(InnerMut {
19            cache: VecDeque::with_capacity(CAP),
20            notifications: BTreeMap::default(),
21        })
22    }
23}
24
25#[derive(Copy, Clone, Default, Debug, Eq, PartialEq, Ord, PartialOrd, Hash)]
26pub struct TimerHandle(u32);
27
28impl TimerHandle {
29    pub const ZERO: TimerHandle = TimerHandle(0);
30
31    pub fn is_set(&self) -> bool {
32        self.0 != 0
33    }
34
35    pub fn remains(&self) -> Seconds {
36        TIMER.with(|timer| {
37            let cur = timer.current.get();
38            if self.0 <= cur {
39                Seconds::ZERO
40            } else {
41                #[allow(clippy::cast_possible_truncation)]
42                Seconds((self.0 - cur) as u16)
43            }
44        })
45    }
46
47    pub fn instant(&self) -> Instant {
48        TIMER.with(|timer| timer.base.get() + Duration::from_secs(u64::from(self.0)))
49    }
50}
51
52impl ops::Add<Seconds> for TimerHandle {
53    type Output = TimerHandle;
54
55    #[inline]
56    fn add(self, other: Seconds) -> TimerHandle {
57        TimerHandle(self.0 + u32::from(other.0))
58    }
59}
60
61struct Inner {
62    running: Cell<bool>,
63    base: Cell<Instant>,
64    current: Cell<u32>,
65    storage: RefCell<InnerMut>,
66}
67
68struct InnerMut {
69    cache: VecDeque<HashSet<Rc<IoState>>>,
70    notifications: BTreeMap<u32, HashSet<Rc<IoState>>>,
71}
72
73impl InnerMut {
74    fn unregister(&mut self, hnd: TimerHandle, io: &IoRef) {
75        if let Some(states) = self.notifications.get_mut(&hnd.0) {
76            states.remove(&io.0);
77        }
78    }
79}
80
81pub(crate) fn unregister(hnd: TimerHandle, io: &IoRef) {
82    TIMER.with(|timer| {
83        timer.storage.borrow_mut().unregister(hnd, io);
84    });
85}
86
87pub(crate) fn update(hnd: TimerHandle, timeout: Seconds, io: &IoRef) -> TimerHandle {
88    TIMER.with(|timer| {
89        let new_hnd = timer.current.get() + u32::from(timeout.0);
90        if hnd.0 == new_hnd || hnd.0 == new_hnd + 1 {
91            hnd
92        } else {
93            timer.storage.borrow_mut().unregister(hnd, io);
94            register(timeout, io)
95        }
96    })
97}
98
99pub(crate) fn register(timeout: Seconds, io: &IoRef) -> TimerHandle {
100    TIMER.with(|timer| {
101        // setup current delta
102        if !timer.running.get() {
103            #[allow(clippy::cast_possible_truncation)]
104            let current = (now() - timer.base.get()).as_secs() as u32;
105            timer.current.set(current);
106            log::debug!(
107                "{}: Timer driver does not run, current: {}",
108                io.tag(),
109                current
110            );
111        }
112
113        let hnd = {
114            let hnd = timer.current.get() + u32::from(timeout.0);
115            let mut inner = timer.storage.borrow_mut();
116
117            // insert key
118            if let Some(item) = inner.notifications.range_mut(hnd..=hnd).next() {
119                item.1.insert(io.0.clone());
120                *item.0
121            } else {
122                let mut items = inner.cache.pop_front().unwrap_or_default();
123                items.insert(io.0.clone());
124                inner.notifications.insert(hnd, items);
125                hnd
126            }
127        };
128
129        if !timer.running.get() {
130            timer.running.set(true);
131
132            spawn(async move {
133                let guard = TimerGuard;
134                loop {
135                    sleep(SEC).await;
136                    let stop = TIMER.with(|timer| {
137                        let current = timer.current.get();
138                        timer.current.set(current + 1);
139
140                        // notify io dispatcher
141                        let mut inner = timer.storage.borrow_mut();
142                        while let Some(key) = inner.notifications.keys().next() {
143                            let key = *key;
144                            if key <= current {
145                                let mut items = inner.notifications.remove(&key).unwrap();
146                                for st in items.drain() {
147                                    st.notify_timeout();
148                                }
149                                if inner.cache.len() <= CAP {
150                                    inner.cache.push_back(items);
151                                }
152                            } else {
153                                break;
154                            }
155                        }
156
157                        // new tick
158                        if inner.notifications.is_empty() {
159                            timer.running.set(false);
160                            true
161                        } else {
162                            false
163                        }
164                    });
165
166                    if stop {
167                        break;
168                    }
169                }
170                drop(guard);
171            });
172        }
173
174        TimerHandle(hnd)
175    })
176}
177
178struct TimerGuard;
179
180impl Drop for TimerGuard {
181    fn drop(&mut self) {
182        TIMER.with(|timer| {
183            timer.running.set(false);
184            timer.storage.borrow_mut().notifications.clear();
185        });
186    }
187}