ntex_io/
timer.rs

1#![allow(clippy::mutable_key_type)]
2use std::collections::{BTreeMap, VecDeque};
3use std::{cell::Cell, cell::RefCell, ops, rc::Rc, time::Duration, time::Instant};
4
5use ntex_util::time::{now, sleep, Seconds};
6use ntex_util::{spawn, HashSet};
7
8use crate::{io::IoState, IoRef};
9
10const CAP: usize = 64;
11const SEC: Duration = Duration::from_secs(1);
12
13thread_local! {
14    static TIMER: Inner = Inner {
15        running: Cell::new(false),
16        base: Cell::new(Instant::now()),
17        current: Cell::new(0),
18        storage: RefCell::new(InnerMut {
19            cache: VecDeque::with_capacity(CAP),
20            notifications: BTreeMap::default(),
21        })
22    }
23}
24
25#[derive(Copy, Clone, Default, Debug, Eq, PartialEq, Ord, PartialOrd, Hash)]
26pub struct TimerHandle(u32);
27
28impl TimerHandle {
29    pub const ZERO: TimerHandle = TimerHandle(0);
30
31    pub fn is_set(&self) -> bool {
32        self.0 != 0
33    }
34
35    pub fn remains(&self) -> Seconds {
36        TIMER.with(|timer| {
37            let cur = timer.current.get();
38            if self.0 <= cur {
39                Seconds::ZERO
40            } else {
41                Seconds((self.0 - cur) as u16)
42            }
43        })
44    }
45
46    pub fn instant(&self) -> Instant {
47        TIMER.with(|timer| timer.base.get() + Duration::from_secs(self.0 as u64))
48    }
49}
50
51impl ops::Add<Seconds> for TimerHandle {
52    type Output = TimerHandle;
53
54    #[inline]
55    fn add(self, other: Seconds) -> TimerHandle {
56        TimerHandle(self.0 + other.0 as u32)
57    }
58}
59
60struct Inner {
61    running: Cell<bool>,
62    base: Cell<Instant>,
63    current: Cell<u32>,
64    storage: RefCell<InnerMut>,
65}
66
67struct InnerMut {
68    cache: VecDeque<HashSet<Rc<IoState>>>,
69    notifications: BTreeMap<u32, HashSet<Rc<IoState>>>,
70}
71
72impl InnerMut {
73    fn unregister(&mut self, hnd: TimerHandle, io: &IoRef) {
74        if let Some(states) = self.notifications.get_mut(&hnd.0) {
75            states.remove(&io.0);
76        }
77    }
78}
79
80pub(crate) fn unregister(hnd: TimerHandle, io: &IoRef) {
81    TIMER.with(|timer| {
82        timer.storage.borrow_mut().unregister(hnd, io);
83    })
84}
85
86pub(crate) fn update(hnd: TimerHandle, timeout: Seconds, io: &IoRef) -> TimerHandle {
87    TIMER.with(|timer| {
88        let new_hnd = timer.current.get() + timeout.0 as u32;
89        if hnd.0 == new_hnd || hnd.0 == new_hnd + 1 {
90            hnd
91        } else {
92            timer.storage.borrow_mut().unregister(hnd, io);
93            register(timeout, io)
94        }
95    })
96}
97
98pub(crate) fn register(timeout: Seconds, io: &IoRef) -> TimerHandle {
99    TIMER.with(|timer| {
100        // setup current delta
101        if !timer.running.get() {
102            let current = (now() - timer.base.get()).as_secs() as u32;
103            timer.current.set(current);
104            log::debug!(
105                "{}: Timer driver does not run, current: {}",
106                io.tag(),
107                current
108            );
109        }
110
111        let hnd = {
112            let hnd = timer.current.get() + timeout.0 as u32;
113            let mut inner = timer.storage.borrow_mut();
114
115            // insert key
116            if let Some(item) = inner.notifications.range_mut(hnd..hnd + 1).next() {
117                item.1.insert(io.0.clone());
118                *item.0
119            } else {
120                let mut items = inner.cache.pop_front().unwrap_or_default();
121                items.insert(io.0.clone());
122                inner.notifications.insert(hnd, items);
123                hnd
124            }
125        };
126
127        if !timer.running.get() {
128            timer.running.set(true);
129
130            #[allow(clippy::let_underscore_future)]
131            let _ = spawn(async move {
132                let guard = TimerGuard;
133                loop {
134                    sleep(SEC).await;
135                    let stop = TIMER.with(|timer| {
136                        let current = timer.current.get();
137                        timer.current.set(current + 1);
138
139                        // notify io dispatcher
140                        let mut inner = timer.storage.borrow_mut();
141                        while let Some(key) = inner.notifications.keys().next() {
142                            let key = *key;
143                            if key <= current {
144                                let mut items = inner.notifications.remove(&key).unwrap();
145                                items.drain().for_each(|st| st.notify_timeout());
146                                if inner.cache.len() <= CAP {
147                                    inner.cache.push_back(items);
148                                }
149                            } else {
150                                break;
151                            }
152                        }
153
154                        // new tick
155                        if inner.notifications.is_empty() {
156                            timer.running.set(false);
157                            true
158                        } else {
159                            false
160                        }
161                    });
162
163                    if stop {
164                        break;
165                    }
166                }
167                drop(guard);
168            });
169        }
170
171        TimerHandle(hnd)
172    })
173}
174
175struct TimerGuard;
176
177impl Drop for TimerGuard {
178    fn drop(&mut self) {
179        TIMER.with(|timer| {
180            timer.running.set(false);
181            timer.storage.borrow_mut().notifications.clear();
182        })
183    }
184}