1#![allow(clippy::cast_possible_truncation)]
2use std::collections::{BTreeMap, VecDeque};
3use std::{cell::Cell, mem, num::NonZeroUsize, ops, time::Duration, time::Instant};
4
5use ntex_rt::with_item;
6use ntex_util::time::{Seconds, now, sleep};
7use ntex_util::{HashSet, spawn};
8use slab::Slab;
9
10use crate::IoRef;
11
12const CAP: usize = 64;
13const SEC: Duration = Duration::from_secs(1);
14
15#[derive(Copy, Clone, PartialEq, Eq, PartialOrd, Ord, Hash, Debug, Default)]
16pub struct Id(Option<NonZeroUsize>);
17
18#[derive(Copy, Clone, Default, Debug, Eq, PartialEq, Ord, PartialOrd, Hash)]
19pub struct TimerHandle(u32);
20
21impl TimerHandle {
22 pub const ZERO: TimerHandle = TimerHandle(0);
23
24 pub fn is_set(&self) -> bool {
25 self.0 != 0
26 }
27
28 pub fn remains(&self) -> Seconds {
29 IoManager::with(|mgr| {
30 let cur = mgr.timers.current;
31 if self.0 <= cur {
32 Seconds::ZERO
33 } else {
34 #[allow(clippy::cast_possible_truncation)]
35 Seconds((self.0 - cur) as u16)
36 }
37 })
38 }
39
40 pub fn instant(&self) -> Instant {
41 IoManager::with(|mgr| mgr.timers.base + Duration::from_secs(u64::from(self.0)))
42 }
43
44 pub(crate) fn update(self, timeout: Seconds, io: &IoRef) -> TimerHandle {
45 IoManager::with(|mgr| {
46 let new_hnd = mgr.timers.current + u32::from(timeout.0);
47 if self.0 == new_hnd || self.0 == new_hnd + 1 {
48 self
49 } else {
50 mgr.timers.unregister(self, io);
51 mgr.timers.register(timeout, io)
52 }
53 })
54 }
55
56 pub(crate) fn unregister(self, io: &IoRef) {
57 IoManager::with(|manager| manager.timers.unregister(self, io));
58 }
59
60 pub(crate) fn register(timeout: Seconds, io: &IoRef) -> TimerHandle {
61 IoManager::with(move |mgr| mgr.timers.register(timeout, io))
62 }
63}
64
65impl ops::Add<Seconds> for TimerHandle {
66 type Output = TimerHandle;
67
68 #[inline]
69 fn add(self, other: Seconds) -> TimerHandle {
70 TimerHandle(self.0 + u32::from(other.0))
71 }
72}
73
74struct TimerStorage {
75 running: bool,
76 base: Instant,
77 current: u32,
78 cache: VecDeque<HashSet<Id>>,
79 notifications: BTreeMap<u32, HashSet<Id>>,
80}
81
82impl TimerStorage {
83 fn unregister(&mut self, hnd: TimerHandle, io: &IoRef) {
84 if let Some(states) = self.notifications.get_mut(&hnd.0) {
85 states.remove(&io.id());
86 }
87 }
88
89 fn register(&mut self, timeout: Seconds, io: &IoRef) -> TimerHandle {
90 if !self.running {
92 self.current = (now() - self.base).as_secs() as u32;
93 }
94
95 let hnd = {
96 let hnd = self.current + u32::from(timeout.0);
97
98 if let Some(item) = self.notifications.range_mut(hnd..=hnd).next() {
100 item.1.insert(io.id());
101 *item.0
102 } else {
103 let mut items = self.cache.pop_front().unwrap_or_default();
104 items.insert(io.id());
105 self.notifications.insert(hnd, items);
106 hnd
107 }
108 };
109
110 self.run_timer();
111
112 TimerHandle(hnd)
113 }
114
115 fn run_timer(&mut self) {
116 if self.running {
117 return;
118 }
119 self.running = true;
120
121 spawn(async move {
122 let guard = TimerGuard;
123 loop {
124 sleep(SEC).await;
125 let stop = IoManager::with(|mgr| {
126 let current = mgr.timers.current;
127 mgr.timers.current = current + 1;
128
129 while let Some(key) = mgr.timers.notifications.keys().next() {
131 let key = *key;
132 if key <= current {
133 let mut items = mgr.timers.notifications.remove(&key).unwrap();
134 for id in items.drain() {
135 if let Some(io) = mgr.get(id) {
136 io.notify_timeout();
137 }
138 }
139 if mgr.timers.cache.len() <= CAP {
140 mgr.timers.cache.push_back(items);
141 }
142 } else {
143 break;
144 }
145 }
146
147 if mgr.timers.notifications.is_empty() {
149 mgr.timers.running = false;
150 true
151 } else {
152 false
153 }
154 });
155
156 if stop {
157 break;
158 }
159 }
160 drop(guard);
161 });
162 }
163}
164
165struct TimerGuard;
166
167impl Drop for TimerGuard {
168 fn drop(&mut self) {
169 IoManager::with(|mgr| {
170 mgr.timers.running = false;
171 mgr.timers.notifications.clear();
172 });
173 }
174}
175
176struct IoStorage(Cell<Option<Box<IoManager>>>);
177
178pub(crate) struct IoManager {
179 storage: Slab<Option<IoRef>>,
180 timers: TimerStorage,
181 pub(crate) iops: Iops,
182}
183
184impl Default for IoStorage {
185 fn default() -> IoStorage {
186 IoStorage(Cell::new(Some(Box::new(IoManager::default()))))
187 }
188}
189
190impl Default for IoManager {
191 fn default() -> IoManager {
192 let mut storage = Slab::new();
193 assert_eq!(storage.insert(None), 0);
194
195 IoManager {
196 storage,
197 timers: TimerStorage {
198 running: false,
199 base: Instant::now(),
200 current: 0,
201 cache: VecDeque::with_capacity(CAP),
202 notifications: BTreeMap::default(),
203 },
204 iops: Iops {
205 running: false,
206 ops: Vec::with_capacity(32),
207 },
208 }
209 }
210}
211
212impl IoManager {
213 fn with<F, R>(f: F) -> R
214 where
215 F: FnOnce(&mut IoManager) -> R,
216 {
217 with_item::<IoStorage, _, _>(|st| {
218 let mut mgr = st.0.take().unwrap();
219 let result = f(&mut mgr);
220 st.0.set(Some(mgr));
221 result
222 })
223 }
224
225 fn get(&self, id: Id) -> Option<&IoRef> {
226 if let Some(id) = id.0 {
227 self.storage.get(id.get()).and_then(|item| item.as_ref())
228 } else {
229 None
230 }
231 }
232
233 pub(crate) fn register(io: &IoRef) -> Id {
234 IoManager::with(|manager| {
235 let entry = manager.storage.vacant_entry();
236 let id = Id(NonZeroUsize::new(entry.key()));
237 entry.insert(Some(io.clone()));
238 id
239 })
240 }
241
242 pub(crate) fn unregister(io: &IoRef) {
243 if let Some(id) = io.id().0 {
244 io.0.id.set(Id(None));
245 IoManager::with(|manager| {
246 if manager.storage.contains(id.get()) {
247 manager.storage.remove(id.get());
248 }
249 });
250 }
251 }
252}
253
254pub(crate) struct Iops {
255 running: bool,
256 pub(crate) ops: Vec<Id>,
257}
258
259impl Iops {
260 pub(crate) fn schedule_write(id: Id) {
261 IoManager::with(|mgr| {
262 mgr.iops.ops.push(id);
263
264 if !mgr.iops.running {
265 mgr.iops.running = true;
266 spawn(async move { Iops::run() });
267 }
268 });
269 }
270
271 pub(crate) fn run() {
272 IoManager::with(|mgr| {
273 mgr.iops.running = false;
274
275 let mut ops = mem::take(&mut mgr.iops.ops);
276 for id in ops.drain(..) {
277 if let Some(io) = mgr.get(id) {
278 io.ops_send_buf();
279 }
280 }
281 let _ = mem::replace(&mut mgr.iops.ops, ops);
282 });
283 }
284
285 #[cfg(test)]
286 pub(crate) fn is_registered(io: &IoRef) -> bool {
287 IoManager::with(|mgr| mgr.iops.ops.contains(&io.id()))
288 }
289}