1#![allow(clippy::mutable_key_type)]
2use std::collections::{BTreeMap, VecDeque};
3use std::{cell::Cell, cell::RefCell, ops, rc::Rc, time::Duration, time::Instant};
4
5use ntex_util::time::{now, sleep, Seconds};
6use ntex_util::{spawn, HashSet};
7
8use crate::{io::IoState, IoRef};
9
10const CAP: usize = 64;
11const SEC: Duration = Duration::from_secs(1);
12
13thread_local! {
14 static TIMER: Inner = Inner {
15 running: Cell::new(false),
16 base: Cell::new(Instant::now()),
17 current: Cell::new(0),
18 storage: RefCell::new(InnerMut {
19 cache: VecDeque::with_capacity(CAP),
20 notifications: BTreeMap::default(),
21 })
22 }
23}
24
25#[derive(Copy, Clone, Default, Debug, Eq, PartialEq, Ord, PartialOrd, Hash)]
26pub struct TimerHandle(u32);
27
28impl TimerHandle {
29 pub const ZERO: TimerHandle = TimerHandle(0);
30
31 pub fn is_set(&self) -> bool {
32 self.0 != 0
33 }
34
35 pub fn remains(&self) -> Seconds {
36 TIMER.with(|timer| {
37 let cur = timer.current.get();
38 if self.0 <= cur {
39 Seconds::ZERO
40 } else {
41 Seconds((self.0 - cur) as u16)
42 }
43 })
44 }
45
46 pub fn instant(&self) -> Instant {
47 TIMER.with(|timer| timer.base.get() + Duration::from_secs(self.0 as u64))
48 }
49}
50
51impl ops::Add<Seconds> for TimerHandle {
52 type Output = TimerHandle;
53
54 #[inline]
55 fn add(self, other: Seconds) -> TimerHandle {
56 TimerHandle(self.0 + other.0 as u32)
57 }
58}
59
60struct Inner {
61 running: Cell<bool>,
62 base: Cell<Instant>,
63 current: Cell<u32>,
64 storage: RefCell<InnerMut>,
65}
66
67struct InnerMut {
68 cache: VecDeque<HashSet<Rc<IoState>>>,
69 notifications: BTreeMap<u32, HashSet<Rc<IoState>>>,
70}
71
72impl InnerMut {
73 fn unregister(&mut self, hnd: TimerHandle, io: &IoRef) {
74 if let Some(states) = self.notifications.get_mut(&hnd.0) {
75 states.remove(&io.0);
76 }
77 }
78}
79
80pub(crate) fn unregister(hnd: TimerHandle, io: &IoRef) {
81 TIMER.with(|timer| {
82 timer.storage.borrow_mut().unregister(hnd, io);
83 })
84}
85
86pub(crate) fn update(hnd: TimerHandle, timeout: Seconds, io: &IoRef) -> TimerHandle {
87 TIMER.with(|timer| {
88 let new_hnd = timer.current.get() + timeout.0 as u32;
89 if hnd.0 == new_hnd || hnd.0 == new_hnd + 1 {
90 hnd
91 } else {
92 timer.storage.borrow_mut().unregister(hnd, io);
93 register(timeout, io)
94 }
95 })
96}
97
98pub(crate) fn register(timeout: Seconds, io: &IoRef) -> TimerHandle {
99 TIMER.with(|timer| {
100 if !timer.running.get() {
102 let current = (now() - timer.base.get()).as_secs() as u32;
103 timer.current.set(current);
104 log::debug!(
105 "{}: Timer driver does not run, current: {}",
106 io.tag(),
107 current
108 );
109 }
110
111 let hnd = {
112 let hnd = timer.current.get() + timeout.0 as u32;
113 let mut inner = timer.storage.borrow_mut();
114
115 if let Some(item) = inner.notifications.range_mut(hnd..hnd + 1).next() {
117 item.1.insert(io.0.clone());
118 *item.0
119 } else {
120 let mut items = inner.cache.pop_front().unwrap_or_default();
121 items.insert(io.0.clone());
122 inner.notifications.insert(hnd, items);
123 hnd
124 }
125 };
126
127 if !timer.running.get() {
128 timer.running.set(true);
129
130 #[allow(clippy::let_underscore_future)]
131 let _ = spawn(async move {
132 let guard = TimerGuard;
133 loop {
134 sleep(SEC).await;
135 let stop = TIMER.with(|timer| {
136 let current = timer.current.get();
137 timer.current.set(current + 1);
138
139 let mut inner = timer.storage.borrow_mut();
141 while let Some(key) = inner.notifications.keys().next() {
142 let key = *key;
143 if key <= current {
144 let mut items = inner.notifications.remove(&key).unwrap();
145 items.drain().for_each(|st| st.notify_timeout());
146 if inner.cache.len() <= CAP {
147 inner.cache.push_back(items);
148 }
149 } else {
150 break;
151 }
152 }
153
154 if inner.notifications.is_empty() {
156 timer.running.set(false);
157 true
158 } else {
159 false
160 }
161 });
162
163 if stop {
164 break;
165 }
166 }
167 drop(guard);
168 });
169 }
170
171 TimerHandle(hnd)
172 })
173}
174
175struct TimerGuard;
176
177impl Drop for TimerGuard {
178 fn drop(&mut self) {
179 TIMER.with(|timer| {
180 timer.running.set(false);
181 timer.storage.borrow_mut().notifications.clear();
182 })
183 }
184}