Skip to main content

ntex_io/
ops.rs

1#![allow(clippy::cast_possible_truncation)]
2use std::collections::{BTreeMap, VecDeque};
3use std::{cell::Cell, mem, num::NonZeroUsize, ops, time::Duration, time::Instant};
4
5use ntex_rt::with_item;
6use ntex_util::time::{Seconds, now, sleep};
7use ntex_util::{HashSet, spawn};
8use slab::Slab;
9
10use crate::IoRef;
11
// Size of the pool of recycled id sets: used as the initial capacity of
// `TimerStorage::cache` and as the bound checked before a drained set is
// returned to that cache.
const CAP: usize = 64;
// Interval the timer task sleeps between two ticks.
const SEC: Duration = Duration::from_secs(1);
14
/// Identifier of an io object inside the `IoManager` slab.
///
/// `None` (the default) means "not registered"; a registered io carries the
/// non-zero slab key it was stored under.
#[derive(Debug, Default, Copy, Clone, Hash, PartialEq, Eq, PartialOrd, Ord)]
pub struct Id(Option<NonZeroUsize>);
17
/// Handle of a registered timeout.
///
/// Wraps the tick (in seconds, relative to the timer storage base instant) at
/// which the timeout fires. The default value `0` means "no timer set".
#[derive(Debug, Default, Copy, Clone, Hash, PartialEq, Eq, PartialOrd, Ord)]
pub struct TimerHandle(u32);
20
21impl TimerHandle {
22    pub const ZERO: TimerHandle = TimerHandle(0);
23
24    pub fn is_set(&self) -> bool {
25        self.0 != 0
26    }
27
28    pub fn remains(&self) -> Seconds {
29        IoManager::with(|mgr| {
30            let cur = mgr.timers.current;
31            if self.0 <= cur {
32                Seconds::ZERO
33            } else {
34                #[allow(clippy::cast_possible_truncation)]
35                Seconds((self.0 - cur) as u16)
36            }
37        })
38    }
39
40    pub fn instant(&self) -> Instant {
41        IoManager::with(|mgr| mgr.timers.base + Duration::from_secs(u64::from(self.0)))
42    }
43
44    pub(crate) fn update(self, timeout: Seconds, io: &IoRef) -> TimerHandle {
45        IoManager::with(|mgr| {
46            let new_hnd = mgr.timers.current + u32::from(timeout.0);
47            if self.0 == new_hnd || self.0 == new_hnd + 1 {
48                self
49            } else {
50                mgr.timers.unregister(self, io);
51                mgr.timers.register(timeout, io)
52            }
53        })
54    }
55
56    pub(crate) fn unregister(self, io: &IoRef) {
57        IoManager::with(|manager| manager.timers.unregister(self, io));
58    }
59
60    pub(crate) fn register(timeout: Seconds, io: &IoRef) -> TimerHandle {
61        IoManager::with(move |mgr| mgr.timers.register(timeout, io))
62    }
63}
64
65impl ops::Add<Seconds> for TimerHandle {
66    type Output = TimerHandle;
67
68    #[inline]
69    fn add(self, other: Seconds) -> TimerHandle {
70        TimerHandle(self.0 + u32::from(other.0))
71    }
72}
73
/// Bookkeeping for second-granularity io timeouts.
struct TimerStorage {
    // `true` while the tick task spawned by `run_timer` is active.
    running: bool,
    // Instant that tick `0` corresponds to; ticks are whole seconds since it.
    base: Instant,
    // Current tick; advanced by the tick task, or recomputed from the clock
    // in `register` when the task is not running.
    current: u32,
    // Drained id sets kept around for reuse by `register`.
    cache: VecDeque<HashSet<Id>>,
    // Tick -> ids of the ios to notify once that tick is reached.
    notifications: BTreeMap<u32, HashSet<Id>>,
}
81
82impl TimerStorage {
83    fn unregister(&mut self, hnd: TimerHandle, io: &IoRef) {
84        if let Some(states) = self.notifications.get_mut(&hnd.0) {
85            states.remove(&io.id());
86        }
87    }
88
89    fn register(&mut self, timeout: Seconds, io: &IoRef) -> TimerHandle {
90        // setup current delta
91        if !self.running {
92            self.current = (now() - self.base).as_secs() as u32;
93        }
94
95        let hnd = {
96            let hnd = self.current + u32::from(timeout.0);
97
98            // insert key
99            if let Some(item) = self.notifications.range_mut(hnd..=hnd).next() {
100                item.1.insert(io.id());
101                *item.0
102            } else {
103                let mut items = self.cache.pop_front().unwrap_or_default();
104                items.insert(io.id());
105                self.notifications.insert(hnd, items);
106                hnd
107            }
108        };
109
110        self.run_timer();
111
112        TimerHandle(hnd)
113    }
114
115    fn run_timer(&mut self) {
116        if self.running {
117            return;
118        }
119        self.running = true;
120
121        spawn(async move {
122            let guard = TimerGuard;
123            loop {
124                sleep(SEC).await;
125                let stop = IoManager::with(|mgr| {
126                    let current = mgr.timers.current;
127                    mgr.timers.current = current + 1;
128
129                    // notify io dispatcher
130                    while let Some(key) = mgr.timers.notifications.keys().next() {
131                        let key = *key;
132                        if key <= current {
133                            let mut items = mgr.timers.notifications.remove(&key).unwrap();
134                            for id in items.drain() {
135                                if let Some(io) = mgr.get(id) {
136                                    io.notify_timeout();
137                                }
138                            }
139                            if mgr.timers.cache.len() <= CAP {
140                                mgr.timers.cache.push_back(items);
141                            }
142                        } else {
143                            break;
144                        }
145                    }
146
147                    // new tick
148                    if mgr.timers.notifications.is_empty() {
149                        mgr.timers.running = false;
150                        true
151                    } else {
152                        false
153                    }
154                });
155
156                if stop {
157                    break;
158                }
159            }
160            drop(guard);
161        });
162    }
163}
164
165struct TimerGuard;
166
167impl Drop for TimerGuard {
168    fn drop(&mut self) {
169        IoManager::with(|mgr| {
170            mgr.timers.running = false;
171            mgr.timers.notifications.clear();
172        });
173    }
174}
175
/// Cell holding the boxed `IoManager`; `IoManager::with` takes the manager
/// out for the duration of a callback and stores it back afterwards.
struct IoStorage(Cell<Option<Box<IoManager>>>);
177
/// Registry of io objects together with their timers and pending io
/// operations.
pub(crate) struct IoManager {
    // Registered ios, addressed by the slab key wrapped in `Id`; slot `0` is
    // filled with `None` by `Default` so that real keys are never zero.
    storage: Slab<Option<IoRef>>,
    // Timeout bookkeeping.
    timers: TimerStorage,
    // Queue of ids that have a scheduled write.
    pub(crate) iops: Iops,
}
183
184impl Default for IoStorage {
185    fn default() -> IoStorage {
186        IoStorage(Cell::new(Some(Box::new(IoManager::default()))))
187    }
188}
189
190impl Default for IoManager {
191    fn default() -> IoManager {
192        let mut storage = Slab::new();
193        assert_eq!(storage.insert(None), 0);
194
195        IoManager {
196            storage,
197            timers: TimerStorage {
198                running: false,
199                base: Instant::now(),
200                current: 0,
201                cache: VecDeque::with_capacity(CAP),
202                notifications: BTreeMap::default(),
203            },
204            iops: Iops {
205                running: false,
206                ops: Vec::with_capacity(32),
207            },
208        }
209    }
210}
211
212impl IoManager {
213    fn with<F, R>(f: F) -> R
214    where
215        F: FnOnce(&mut IoManager) -> R,
216    {
217        with_item::<IoStorage, _, _>(|st| {
218            let mut mgr = st.0.take().unwrap();
219            let result = f(&mut mgr);
220            st.0.set(Some(mgr));
221            result
222        })
223    }
224
225    fn get(&self, id: Id) -> Option<&IoRef> {
226        if let Some(id) = id.0 {
227            self.storage.get(id.get()).and_then(|item| item.as_ref())
228        } else {
229            None
230        }
231    }
232
233    pub(crate) fn register(io: &IoRef) -> Id {
234        IoManager::with(|manager| {
235            let entry = manager.storage.vacant_entry();
236            let id = Id(NonZeroUsize::new(entry.key()));
237            entry.insert(Some(io.clone()));
238            id
239        })
240    }
241
242    pub(crate) fn unregister(io: &IoRef) {
243        if let Some(id) = io.id().0 {
244            io.0.id.set(Id(None));
245            IoManager::with(|manager| {
246                if manager.storage.contains(id.get()) {
247                    manager.storage.remove(id.get());
248                }
249            });
250        }
251    }
252}
253
/// Queue of io operations processed in batches by `Iops::run`.
pub(crate) struct Iops {
    // `true` once a task calling `Iops::run` has been spawned and until that
    // call starts processing the queue.
    running: bool,
    // Ids of the ios whose `ops_send_buf` is to be called by the next run.
    pub(crate) ops: Vec<Id>,
}
258
259impl Iops {
260    pub(crate) fn schedule_write(id: Id) {
261        IoManager::with(|mgr| {
262            mgr.iops.ops.push(id);
263
264            if !mgr.iops.running {
265                mgr.iops.running = true;
266                spawn(async move { Iops::run() });
267            }
268        });
269    }
270
271    pub(crate) fn run() {
272        IoManager::with(|mgr| {
273            mgr.iops.running = false;
274
275            let mut ops = mem::take(&mut mgr.iops.ops);
276            for id in ops.drain(..) {
277                if let Some(io) = mgr.get(id) {
278                    io.ops_send_buf();
279                }
280            }
281            let _ = mem::replace(&mut mgr.iops.ops, ops);
282        });
283    }
284
285    #[cfg(test)]
286    pub(crate) fn is_registered(io: &IoRef) -> bool {
287        IoManager::with(|mgr| mgr.iops.ops.contains(&io.id()))
288    }
289}