1#![doc(html_root_url = "https://docs.rs/notifier/0.1.3")]
14#![warn(
15 trivial_casts,
19 trivial_numeric_casts,
20 unused_import_braces,
21 unused_qualifications,
22 unused_results,
23 clippy::pedantic
24)] #![allow(
26 clippy::new_without_default,
27 clippy::indexing_slicing,
28 clippy::needless_pass_by_value,
29 clippy::inline_always
30)]
31
32mod heap;
33mod timer;
34
35use either::Either;
36use log::trace;
37use std::{cmp, collections::HashSet, hash::Hash, marker, mem, sync, time};
38
// Raw OS handle type taken by `add_fd` / `remove_fd`.
// On Unix this is the raw file descriptor type.
#[cfg(unix)]
type Fd = std::os::unix::io::RawFd;
// On Windows this is the raw handle type.
#[cfg(windows)]
type Fd = std::os::windows::io::RawHandle;
43
44pub struct NotifierContext<'a, Key: 'a>
45where
46 Key: Clone + Eq + Hash + Into<usize>,
47 usize: Into<Key>,
48{
49 executor: &'a Notifier<Key>,
50 key: Key,
51}
52impl<'a, Key: 'a> NotifierContext<'a, Key>
53where
54 Key: Clone + Eq + Hash + Into<usize>,
55 usize: Into<Key>,
56{
57 #[inline(always)]
58 pub fn add_trigger(&self) -> (Triggerer, Triggeree) {
59 self.executor.add_trigger(self.key.clone())
60 }
61 #[inline(always)]
62 pub fn queue(&self) {
63 self.executor.queue(self.key.clone())
64 }
65 #[inline(always)]
66 pub fn add_fd(&self, fd: Fd) {
67 self.executor.add_fd(fd, self.key.clone())
68 }
69 #[inline(always)]
70 pub fn remove_fd(&self, fd: Fd) {
71 self.executor.remove_fd(fd, self.key.clone())
72 }
73 #[inline(always)]
74 pub fn add_instant(&self, instant: time::Instant) -> heap::Slot {
75 self.executor.add_instant(instant, self.key.clone())
76 }
77 #[inline(always)]
78 pub fn remove_instant(&self, slot: heap::Slot) {
79 self.executor.remove_instant(slot)
80 }
81}
82
// Adapter so a `NotifierContext` can be used wherever `tcp_typed` expects a
// notifier. Every trait method forwards to the inherent method of the same
// name; the calls are written as paths so it is visible that the inherent
// method (which takes precedence over the trait method) is the target.
#[cfg(feature = "tcp_typed")]
impl<'a, Key: 'a> tcp_typed::Notifier for NotifierContext<'a, Key>
where
    Key: Clone + Eq + Hash + Into<usize>,
    usize: Into<Key>,
{
    type InstantSlot = heap::Slot;

    #[inline(always)]
    fn queue(&self) {
        NotifierContext::queue(self)
    }

    #[inline(always)]
    fn add_fd(&self, fd: Fd) {
        NotifierContext::add_fd(self, fd)
    }

    #[inline(always)]
    fn remove_fd(&self, fd: Fd) {
        NotifierContext::remove_fd(self, fd)
    }

    #[inline(always)]
    fn add_instant(&self, instant: time::Instant) -> heap::Slot {
        NotifierContext::add_instant(self, instant)
    }

    #[inline(always)]
    fn remove_instant(&self, slot: heap::Slot) {
        NotifierContext::remove_instant(self, slot)
    }
}
111
/// A pending timer: the instant it fires at, plus the key to report.
///
/// Equality and ordering look at the instant only; the key never takes part
/// in a comparison, so two events with different keys but the same instant
/// compare as equal.
struct TimeEvent<Key>(time::Instant, Key);

impl<Key> Ord for TimeEvent<Key> {
    /// Orders events by their instant; this is the comparison every other
    /// comparison trait on this type is derived from.
    #[inline(always)]
    fn cmp(&self, other: &Self) -> cmp::Ordering {
        Ord::cmp(&self.0, &other.0)
    }
}

impl<Key> PartialOrd for TimeEvent<Key> {
    #[inline(always)]
    fn partial_cmp(&self, other: &Self) -> Option<cmp::Ordering> {
        Some(self.cmp(other))
    }
}

impl<Key> PartialEq for TimeEvent<Key> {
    #[inline(always)]
    fn eq(&self, other: &Self) -> bool {
        self.cmp(other) == cmp::Ordering::Equal
    }
}

impl<Key> Eq for TimeEvent<Key> {}
/// Event notifier combining I/O readiness, timers and an explicit queue.
///
/// Keys are converted to and from `usize` so that they can be carried as
/// poll tokens (see `NotifierTimeout::add` / `NotifierTimeout::wait`).
pub struct Notifier<Key>
where
    Key: Clone + Eq + Hash + Into<usize>,
    usize: Into<Key>,
{
    // Poller and its wake-up timer; all fd/trigger registrations go through it.
    notifier_timeout: NotifierTimeout<Key>,
    // Keys added by `queue`; `wait` drains the whole set and reports each key.
    queue: sync::RwLock<HashSet<Key>>,
    // Pending timers. `wait` peeks at the head and pops it while it is not
    // after "now" -- presumably the heap yields the earliest instant first;
    // confirm against `heap::Heap`.
    timer: sync::RwLock<heap::Heap<TimeEvent<Key>>>,
}
141impl<Key> Notifier<Key>
142where
143 Key: Clone + Eq + Hash + Into<usize>,
144 usize: Into<Key>,
145{
146 pub fn new() -> Self {
147 Self {
148 notifier_timeout: NotifierTimeout::new(),
149 queue: sync::RwLock::new(HashSet::new()),
150 timer: sync::RwLock::new(heap::Heap::new()),
151 }
152 }
153
154 pub fn context(&self, key: Key) -> NotifierContext<Key> {
155 NotifierContext {
156 executor: self,
157 key,
158 }
159 }
160
161 fn queue(&self, data: Key) {
162 let _ = self.queue.write().unwrap().insert(data);
163 self.notifier_timeout.update_timeout(time::Instant::now());
164 }
165
166 fn add_fd(&self, fd: Fd, data: Key) {
167 self.notifier_timeout.add(
168 &mio::unix::EventedFd(&fd),
169 mio::Ready::readable()
170 | mio::Ready::writable()
171 | mio::unix::UnixReady::hup()
172 | mio::unix::UnixReady::error(), data,
174 );
175 }
176
177 fn remove_fd(&self, fd: Fd, data: Key) {
178 self.notifier_timeout
179 .delete(&mio::unix::EventedFd(&fd), data);
180 }
181
182 fn add_instant(&self, instant: time::Instant, data: Key) -> heap::Slot {
183 trace!("add_instant {:?}", instant);
184 let mut timer = self.timer.write().unwrap();
185 let slot = timer.push(TimeEvent(instant, data));
186 self.notifier_timeout.update_timeout(instant);
187 slot
188 }
189
190 fn remove_instant(&self, slot: heap::Slot) {
191 let _ = self.timer.write().unwrap().remove(slot); }
193
194 fn add_trigger(&self, data: Key) -> (Triggerer, Triggeree) {
195 let (registration, set_readiness) = mio::Registration::new2();
196 self.notifier_timeout
197 .add(®istration, mio::Ready::readable(), data);
198 (Triggerer(set_readiness), Triggeree(registration))
199 }
200
201 pub fn wait<F: FnMut(Either<mio::Ready, time::Instant>, Key)>(&self, mut f: F) {
202 let mut done_any = false;
203 let now = time::Instant::now();
204 let timeout = {
205 loop {
206 let TimeEvent(timeout, poll_key) = {
207 let timer = &mut *self.timer.write().unwrap();
208 if timer.peek().is_some() && timer.peek().unwrap().0 <= now {
209 trace!(
210 "timeout unelapsed {:?} <= {:?}",
211 timer.peek().unwrap().0,
212 now
213 );
214 }
215 if timer.peek().is_none() || timer.peek().unwrap().0 > now {
216 break;
217 }
218 timer.pop().unwrap()
219 };
220 done_any = true;
221 trace!("ran timeout {:?}", timeout);
222 f(Either::Right(timeout), poll_key)
223 }
224 self.timer.read().unwrap().peek().map(|x| x.0)
225 };
226 let done_any = done_any || !self.queue.read().unwrap().is_empty();
227 trace!("\\wait {:?}", timeout);
228 if let Some(timeout) = timeout {
229 self.notifier_timeout.update_timeout(timeout);
230 }
231 self.notifier_timeout
232 .wait(done_any, |flags, poll_key| f(Either::Left(flags), poll_key));
233 trace!("/wait");
234 let now = time::Instant::now();
235 let queue = mem::replace(&mut *self.queue.write().unwrap(), HashSet::new());
236 for poll_key in queue {
237 f(Either::Right(now), poll_key)
238 }
239 loop {
240 let TimeEvent(timeout, poll_key) = {
241 let timer = &mut *self.timer.write().unwrap();
242 if timer.peek().is_some() && timer.peek().unwrap().0 <= now {
243 trace!(
244 "timeout unelapsed {:?} <= {:?}",
245 timer.peek().unwrap().0,
246 now
247 );
248 }
249 if timer.peek().is_none() || timer.peek().unwrap().0 > now {
250 break;
251 }
252 timer.pop().unwrap()
253 };
254 trace!("ran timeout {:?}", timeout);
255 f(Either::Right(timeout), poll_key)
256 }
257 }
258}
259pub struct Triggerer(mio::SetReadiness);
260impl Drop for Triggerer {
261 fn drop(&mut self) {
262 self.0.set_readiness(mio::Ready::readable()).unwrap();
263 }
264}
/// The receiving half of a trigger pair: owns the `mio::Registration` that
/// `add_trigger` registered with the poller.
pub struct Triggeree(mio::Registration);
266
// Capacity of the event buffer handed to each poll call.
const POLL_BUF_LENGTH: usize = 100;
// Token reserved for the internal timer; `add` asserts no key maps onto it.
const POLL_TIMER: mio::Token = mio::Token(usize::max_value() - 1);

// Poller plus a timer registered with it, used to wake a blocking poll.
struct NotifierTimeout<Key>
where
    Key: Clone + Eq + Hash + Into<usize>,
    usize: Into<Key>,
{
    // The poller every fd, trigger and the timer are registered with.
    poll: mio::Poll,
    // Timer registered under `POLL_TIMER`.
    timer: timer::Timer,
    // Instant the timer is currently armed for; reset to `None` once the
    // timer reports it has elapsed.
    timeout: sync::Mutex<Option<time::Instant>>,
    // `Some(set)` while `wait` is polling: tokens passed to `delete` in that
    // window are recorded so their events can be filtered out afterwards.
    strip: sync::Mutex<Option<HashSet<usize>>>,
    // Ties the type to `Key` without storing one.
    marker: marker::PhantomData<fn(Key)>,
}
281impl<Key> NotifierTimeout<Key>
282where
283 Key: Clone + Eq + Hash + Into<usize>,
284 usize: Into<Key>,
285{
286 fn new() -> Self {
287 let poll = mio::Poll::new().unwrap();
288 let timer = timer::Timer::new();
289 poll.register(
290 &timer,
291 POLL_TIMER,
292 mio::Ready::readable(),
293 mio::PollOpt::edge(),
294 )
295 .unwrap();
296 Self {
297 poll,
298 timer,
299 timeout: sync::Mutex::new(None),
300 strip: sync::Mutex::new(None),
301 marker: marker::PhantomData,
302 }
303 }
304
305 fn add<E: mio::event::Evented + ?Sized>(&self, fd: &E, events: mio::Ready, data: Key) {
306 let data: usize = data.into();
307 assert_ne!(mio::Token(data), POLL_TIMER);
308 if let Some(ref mut strip) = *self.strip.lock().unwrap() {
309 let _ = strip.remove(&data);
310 }
311 self.poll
312 .register(fd, mio::Token(data), events, mio::PollOpt::edge())
313 .unwrap();
314 }
315
316 fn delete<E: mio::event::Evented + ?Sized>(&self, fd: &E, data: Key) {
317 self.poll.deregister(fd).unwrap();
318 if let Some(ref mut strip) = *self.strip.lock().unwrap() {
319 let x = strip.insert(data.into());
320 assert!(x);
321 }
322 }
323
324 fn update_timeout(&self, timeout: time::Instant) {
325 let mut current_timeout = self.timeout.lock().unwrap();
326 trace!("update_timeout {:?} {:?}", current_timeout, timeout);
327 if current_timeout.is_none() || timeout < current_timeout.unwrap() {
328 *current_timeout = Some(timeout);
329 self.timer.set_timeout(timeout);
330 }
331 }
332
333 fn wait<F: FnMut(mio::Ready, Key)>(&self, mut nonblock: bool, mut f: F) {
334 let mut events = mio::Events::with_capacity(POLL_BUF_LENGTH);
335 loop {
336 let x = mem::replace(&mut *self.strip.lock().unwrap(), Some(HashSet::new()));
337 assert!(x.is_none());
338 let n = loop {
339 trace!("\\mio_wait {:?}", nonblock);
340 let n = self
341 .poll
342 .poll(
343 &mut events,
344 if nonblock {
345 Some(time::Duration::new(0, 0))
346 } else {
347 None
348 },
349 )
350 .unwrap();
351 trace!("/mio_wait: {:?}", n);
352 if !nonblock && n == 0 {
353 continue;
354 }
355 let mut current_timeout = self.timeout.lock().unwrap();
356 if self.timer.elapsed() {
357 *current_timeout = None;
358 }
359 break n;
360 };
361 assert!(n <= events.capacity());
362 let strip = mem::replace(&mut *self.strip.lock().unwrap(), None).unwrap(); for x in events
364 .iter()
365 .filter(|x| x.token() != POLL_TIMER && !strip.contains(&x.token().0))
366 {
367 f(x.readiness(), x.token().0.into())
368 }
369 if n < events.capacity() {
370 break;
371 }
372 nonblock = true;
373 }
374 }
375}
376impl<Key> Drop for NotifierTimeout<Key>
377where
378 Key: Clone + Eq + Hash + Into<usize>,
379 usize: Into<Key>,
380{
381 fn drop(&mut self) {
382 self.poll.deregister(&self.timer).unwrap();
383 }
384}