1#![allow(clippy::mutable_key_type)]
2use std::collections::{BTreeMap, VecDeque};
3use std::{cell::Cell, cell::RefCell, ops, rc::Rc, time::Duration, time::Instant};
4
5use ntex_util::time::{Seconds, now, sleep};
6use ntex_util::{HashSet, spawn};
7
8use crate::{IoRef, io::IoState};
9
10const CAP: usize = 64;
11const SEC: Duration = Duration::from_secs(1);
12
13thread_local! {
14 static TIMER: Inner = Inner {
15 running: Cell::new(false),
16 base: Cell::new(Instant::now()),
17 current: Cell::new(0),
18 storage: RefCell::new(InnerMut {
19 cache: VecDeque::with_capacity(CAP),
20 notifications: BTreeMap::default(),
21 })
22 }
23}
24
25#[derive(Copy, Clone, Default, Debug, Eq, PartialEq, Ord, PartialOrd, Hash)]
26pub struct TimerHandle(u32);
27
28impl TimerHandle {
29 pub const ZERO: TimerHandle = TimerHandle(0);
30
31 pub fn is_set(&self) -> bool {
32 self.0 != 0
33 }
34
35 pub fn remains(&self) -> Seconds {
36 TIMER.with(|timer| {
37 let cur = timer.current.get();
38 if self.0 <= cur {
39 Seconds::ZERO
40 } else {
41 #[allow(clippy::cast_possible_truncation)]
42 Seconds((self.0 - cur) as u16)
43 }
44 })
45 }
46
47 pub fn instant(&self) -> Instant {
48 TIMER.with(|timer| timer.base.get() + Duration::from_secs(u64::from(self.0)))
49 }
50}
51
52impl ops::Add<Seconds> for TimerHandle {
53 type Output = TimerHandle;
54
55 #[inline]
56 fn add(self, other: Seconds) -> TimerHandle {
57 TimerHandle(self.0 + u32::from(other.0))
58 }
59}
60
61struct Inner {
62 running: Cell<bool>,
63 base: Cell<Instant>,
64 current: Cell<u32>,
65 storage: RefCell<InnerMut>,
66}
67
68struct InnerMut {
69 cache: VecDeque<HashSet<Rc<IoState>>>,
70 notifications: BTreeMap<u32, HashSet<Rc<IoState>>>,
71}
72
73impl InnerMut {
74 fn unregister(&mut self, hnd: TimerHandle, io: &IoRef) {
75 if let Some(states) = self.notifications.get_mut(&hnd.0) {
76 states.remove(&io.0);
77 }
78 }
79}
80
81pub(crate) fn unregister(hnd: TimerHandle, io: &IoRef) {
82 TIMER.with(|timer| {
83 timer.storage.borrow_mut().unregister(hnd, io);
84 });
85}
86
87pub(crate) fn update(hnd: TimerHandle, timeout: Seconds, io: &IoRef) -> TimerHandle {
88 TIMER.with(|timer| {
89 let new_hnd = timer.current.get() + u32::from(timeout.0);
90 if hnd.0 == new_hnd || hnd.0 == new_hnd + 1 {
91 hnd
92 } else {
93 timer.storage.borrow_mut().unregister(hnd, io);
94 register(timeout, io)
95 }
96 })
97}
98
99pub(crate) fn register(timeout: Seconds, io: &IoRef) -> TimerHandle {
100 TIMER.with(|timer| {
101 if !timer.running.get() {
103 #[allow(clippy::cast_possible_truncation)]
104 let current = (now() - timer.base.get()).as_secs() as u32;
105 timer.current.set(current);
106 log::debug!(
107 "{}: Timer driver does not run, current: {}",
108 io.tag(),
109 current
110 );
111 }
112
113 let hnd = {
114 let hnd = timer.current.get() + u32::from(timeout.0);
115 let mut inner = timer.storage.borrow_mut();
116
117 if let Some(item) = inner.notifications.range_mut(hnd..=hnd).next() {
119 item.1.insert(io.0.clone());
120 *item.0
121 } else {
122 let mut items = inner.cache.pop_front().unwrap_or_default();
123 items.insert(io.0.clone());
124 inner.notifications.insert(hnd, items);
125 hnd
126 }
127 };
128
129 if !timer.running.get() {
130 timer.running.set(true);
131
132 #[allow(clippy::let_underscore_future)]
133 let _ = spawn(async move {
134 let guard = TimerGuard;
135 loop {
136 sleep(SEC).await;
137 let stop = TIMER.with(|timer| {
138 let current = timer.current.get();
139 timer.current.set(current + 1);
140
141 let mut inner = timer.storage.borrow_mut();
143 while let Some(key) = inner.notifications.keys().next() {
144 let key = *key;
145 if key <= current {
146 let mut items = inner.notifications.remove(&key).unwrap();
147 for st in items.drain() {
148 st.notify_timeout();
149 }
150 if inner.cache.len() <= CAP {
151 inner.cache.push_back(items);
152 }
153 } else {
154 break;
155 }
156 }
157
158 if inner.notifications.is_empty() {
160 timer.running.set(false);
161 true
162 } else {
163 false
164 }
165 });
166
167 if stop {
168 break;
169 }
170 }
171 drop(guard);
172 });
173 }
174
175 TimerHandle(hnd)
176 })
177}
178
179struct TimerGuard;
180
181impl Drop for TimerGuard {
182 fn drop(&mut self) {
183 TIMER.with(|timer| {
184 timer.running.set(false);
185 timer.storage.borrow_mut().notifications.clear();
186 });
187 }
188}