notifier/
lib.rs

1//! A wrapper around platform event notification APIs (currently via [mio](https://github.com/carllerche/mio)) that can also handle high-resolution timer events, including those set (on another thread) *during* a `notifier.wait()` call.
2//!
3//! **[Crates.io](https://crates.io/crates/notifier) │ [Repo](https://github.com/alecmocatta/notifier)**
4//!
5//! Delivers **edge-triggered** notifications for file descriptor state changes (corresponding to `mio::Ready::readable() | mio::Ready::writable() | mio::unix::UnixReady::hup() | mio::unix::UnixReady::error()`) as well as elapsing of instants.
6//!
7//! It's designed to be used in conjunction with a library that exhaustively collects events (e.g. connected, data in, data available to be written, remote closed, bytes acked, connection errors) upon each edge-triggered notification – for example [`tcp_typed`](https://github.com/alecmocatta/tcp_typed).
8//!
9//! # Note
10//!
11//! Currently doesn't support Windows.
12
13#![doc(html_root_url = "https://docs.rs/notifier/0.1.3")]
14#![warn(
15	// missing_copy_implementations,
16	// missing_debug_implementations,
17	// missing_docs,
18	trivial_casts,
19	trivial_numeric_casts,
20	unused_import_braces,
21	unused_qualifications,
22	unused_results,
23	clippy::pedantic
24)] // from https://github.com/rust-unofficial/patterns/blob/master/anti_patterns/deny-warnings.md
25#![allow(
26	clippy::new_without_default,
27	clippy::indexing_slicing,
28	clippy::needless_pass_by_value,
29	clippy::inline_always
30)]
31
32mod heap;
33mod timer;
34
35use either::Either;
36use log::trace;
37use std::{cmp, collections::HashSet, hash::Hash, marker, mem, sync, time};
38
39#[cfg(unix)]
40type Fd = std::os::unix::io::RawFd;
41#[cfg(windows)]
42type Fd = std::os::windows::io::RawHandle;
43
44pub struct NotifierContext<'a, Key: 'a>
45where
46	Key: Clone + Eq + Hash + Into<usize>,
47	usize: Into<Key>,
48{
49	executor: &'a Notifier<Key>,
50	key: Key,
51}
52impl<'a, Key: 'a> NotifierContext<'a, Key>
53where
54	Key: Clone + Eq + Hash + Into<usize>,
55	usize: Into<Key>,
56{
57	#[inline(always)]
58	pub fn add_trigger(&self) -> (Triggerer, Triggeree) {
59		self.executor.add_trigger(self.key.clone())
60	}
61	#[inline(always)]
62	pub fn queue(&self) {
63		self.executor.queue(self.key.clone())
64	}
65	#[inline(always)]
66	pub fn add_fd(&self, fd: Fd) {
67		self.executor.add_fd(fd, self.key.clone())
68	}
69	#[inline(always)]
70	pub fn remove_fd(&self, fd: Fd) {
71		self.executor.remove_fd(fd, self.key.clone())
72	}
73	#[inline(always)]
74	pub fn add_instant(&self, instant: time::Instant) -> heap::Slot {
75		self.executor.add_instant(instant, self.key.clone())
76	}
77	#[inline(always)]
78	pub fn remove_instant(&self, slot: heap::Slot) {
79		self.executor.remove_instant(slot)
80	}
81}
82
83#[cfg(feature = "tcp_typed")]
84impl<'a, Key: 'a> tcp_typed::Notifier for NotifierContext<'a, Key>
85where
86	Key: Clone + Eq + Hash + Into<usize>,
87	usize: Into<Key>,
88{
89	type InstantSlot = heap::Slot;
90	#[inline(always)]
91	fn queue(&self) {
92		self.queue()
93	}
94	#[inline(always)]
95	fn add_fd(&self, fd: Fd) {
96		self.add_fd(fd)
97	}
98	#[inline(always)]
99	fn remove_fd(&self, fd: Fd) {
100		self.remove_fd(fd)
101	}
102	#[inline(always)]
103	fn add_instant(&self, instant: time::Instant) -> heap::Slot {
104		self.add_instant(instant)
105	}
106	#[inline(always)]
107	fn remove_instant(&self, slot: heap::Slot) {
108		self.remove_instant(slot)
109	}
110}
111
112struct TimeEvent<Key>(time::Instant, Key);
113impl<Key> PartialEq for TimeEvent<Key> {
114	#[inline(always)]
115	fn eq(&self, other: &Self) -> bool {
116		self.0.eq(&other.0)
117	}
118}
119impl<Key> Eq for TimeEvent<Key> {}
120impl<Key> PartialOrd for TimeEvent<Key> {
121	#[inline(always)]
122	fn partial_cmp(&self, other: &Self) -> Option<cmp::Ordering> {
123		Some(self.0.cmp(&other.0))
124	}
125}
126impl<Key> Ord for TimeEvent<Key> {
127	#[inline(always)]
128	fn cmp(&self, other: &Self) -> cmp::Ordering {
129		self.0.cmp(&other.0)
130	}
131}
132pub struct Notifier<Key>
133where
134	Key: Clone + Eq + Hash + Into<usize>,
135	usize: Into<Key>,
136{
137	notifier_timeout: NotifierTimeout<Key>,
138	queue: sync::RwLock<HashSet<Key>>,
139	timer: sync::RwLock<heap::Heap<TimeEvent<Key>>>,
140}
141impl<Key> Notifier<Key>
142where
143	Key: Clone + Eq + Hash + Into<usize>,
144	usize: Into<Key>,
145{
146	pub fn new() -> Self {
147		Self {
148			notifier_timeout: NotifierTimeout::new(),
149			queue: sync::RwLock::new(HashSet::new()),
150			timer: sync::RwLock::new(heap::Heap::new()),
151		}
152	}
153
154	pub fn context(&self, key: Key) -> NotifierContext<Key> {
155		NotifierContext {
156			executor: self,
157			key,
158		}
159	}
160
161	fn queue(&self, data: Key) {
162		let _ = self.queue.write().unwrap().insert(data);
163		self.notifier_timeout.update_timeout(time::Instant::now());
164	}
165
166	fn add_fd(&self, fd: Fd, data: Key) {
167		self.notifier_timeout.add(
168			&mio::unix::EventedFd(&fd),
169			mio::Ready::readable()
170				| mio::Ready::writable()
171				| mio::unix::UnixReady::hup()
172				| mio::unix::UnixReady::error(), // EPOLLRDHUP?
173			data,
174		);
175	}
176
177	fn remove_fd(&self, fd: Fd, data: Key) {
178		self.notifier_timeout
179			.delete(&mio::unix::EventedFd(&fd), data);
180	}
181
182	fn add_instant(&self, instant: time::Instant, data: Key) -> heap::Slot {
183		trace!("add_instant {:?}", instant);
184		let mut timer = self.timer.write().unwrap();
185		let slot = timer.push(TimeEvent(instant, data));
186		self.notifier_timeout.update_timeout(instant);
187		slot
188	}
189
190	fn remove_instant(&self, slot: heap::Slot) {
191		let _ = self.timer.write().unwrap().remove(slot); // TODO
192	}
193
194	fn add_trigger(&self, data: Key) -> (Triggerer, Triggeree) {
195		let (registration, set_readiness) = mio::Registration::new2();
196		self.notifier_timeout
197			.add(&registration, mio::Ready::readable(), data);
198		(Triggerer(set_readiness), Triggeree(registration))
199	}
200
201	pub fn wait<F: FnMut(Either<mio::Ready, time::Instant>, Key)>(&self, mut f: F) {
202		let mut done_any = false;
203		let now = time::Instant::now();
204		let timeout = {
205			loop {
206				let TimeEvent(timeout, poll_key) = {
207					let timer = &mut *self.timer.write().unwrap();
208					if timer.peek().is_some() && timer.peek().unwrap().0 <= now {
209						trace!(
210							"timeout unelapsed {:?} <= {:?}",
211							timer.peek().unwrap().0,
212							now
213						);
214					}
215					if timer.peek().is_none() || timer.peek().unwrap().0 > now {
216						break;
217					}
218					timer.pop().unwrap()
219				};
220				done_any = true;
221				trace!("ran timeout {:?}", timeout);
222				f(Either::Right(timeout), poll_key)
223			}
224			self.timer.read().unwrap().peek().map(|x| x.0)
225		};
226		let done_any = done_any || !self.queue.read().unwrap().is_empty();
227		trace!("\\wait {:?}", timeout);
228		if let Some(timeout) = timeout {
229			self.notifier_timeout.update_timeout(timeout);
230		}
231		self.notifier_timeout
232			.wait(done_any, |flags, poll_key| f(Either::Left(flags), poll_key));
233		trace!("/wait");
234		let now = time::Instant::now();
235		let queue = mem::replace(&mut *self.queue.write().unwrap(), HashSet::new());
236		for poll_key in queue {
237			f(Either::Right(now), poll_key)
238		}
239		loop {
240			let TimeEvent(timeout, poll_key) = {
241				let timer = &mut *self.timer.write().unwrap();
242				if timer.peek().is_some() && timer.peek().unwrap().0 <= now {
243					trace!(
244						"timeout unelapsed {:?} <= {:?}",
245						timer.peek().unwrap().0,
246						now
247					);
248				}
249				if timer.peek().is_none() || timer.peek().unwrap().0 > now {
250					break;
251				}
252				timer.pop().unwrap()
253			};
254			trace!("ran timeout {:?}", timeout);
255			f(Either::Right(timeout), poll_key)
256		}
257	}
258}
259pub struct Triggerer(mio::SetReadiness);
260impl Drop for Triggerer {
261	fn drop(&mut self) {
262		self.0.set_readiness(mio::Ready::readable()).unwrap();
263	}
264}
265pub struct Triggeree(mio::Registration);
266
267const POLL_BUF_LENGTH: usize = 100;
268const POLL_TIMER: mio::Token = mio::Token(usize::max_value() - 1); // max_value() is taken by mio
269
270struct NotifierTimeout<Key>
271where
272	Key: Clone + Eq + Hash + Into<usize>,
273	usize: Into<Key>,
274{
275	poll: mio::Poll,
276	timer: timer::Timer,
277	timeout: sync::Mutex<Option<time::Instant>>,
278	strip: sync::Mutex<Option<HashSet<usize>>>,
279	marker: marker::PhantomData<fn(Key)>,
280}
281impl<Key> NotifierTimeout<Key>
282where
283	Key: Clone + Eq + Hash + Into<usize>,
284	usize: Into<Key>,
285{
286	fn new() -> Self {
287		let poll = mio::Poll::new().unwrap();
288		let timer = timer::Timer::new();
289		poll.register(
290			&timer,
291			POLL_TIMER,
292			mio::Ready::readable(),
293			mio::PollOpt::edge(),
294		)
295		.unwrap();
296		Self {
297			poll,
298			timer,
299			timeout: sync::Mutex::new(None),
300			strip: sync::Mutex::new(None),
301			marker: marker::PhantomData,
302		}
303	}
304
305	fn add<E: mio::event::Evented + ?Sized>(&self, fd: &E, events: mio::Ready, data: Key) {
306		let data: usize = data.into();
307		assert_ne!(mio::Token(data), POLL_TIMER);
308		if let Some(ref mut strip) = *self.strip.lock().unwrap() {
309			let _ = strip.remove(&data);
310		}
311		self.poll
312			.register(fd, mio::Token(data), events, mio::PollOpt::edge())
313			.unwrap();
314	}
315
316	fn delete<E: mio::event::Evented + ?Sized>(&self, fd: &E, data: Key) {
317		self.poll.deregister(fd).unwrap();
318		if let Some(ref mut strip) = *self.strip.lock().unwrap() {
319			let x = strip.insert(data.into());
320			assert!(x);
321		}
322	}
323
324	fn update_timeout(&self, timeout: time::Instant) {
325		let mut current_timeout = self.timeout.lock().unwrap();
326		trace!("update_timeout {:?} {:?}", current_timeout, timeout);
327		if current_timeout.is_none() || timeout < current_timeout.unwrap() {
328			*current_timeout = Some(timeout);
329			self.timer.set_timeout(timeout);
330		}
331	}
332
333	fn wait<F: FnMut(mio::Ready, Key)>(&self, mut nonblock: bool, mut f: F) {
334		let mut events = mio::Events::with_capacity(POLL_BUF_LENGTH);
335		loop {
336			let x = mem::replace(&mut *self.strip.lock().unwrap(), Some(HashSet::new()));
337			assert!(x.is_none());
338			let n = loop {
339				trace!("\\mio_wait {:?}", nonblock);
340				let n = self
341					.poll
342					.poll(
343						&mut events,
344						if nonblock {
345							Some(time::Duration::new(0, 0))
346						} else {
347							None
348						},
349					)
350					.unwrap();
351				trace!("/mio_wait: {:?}", n);
352				if !nonblock && n == 0 {
353					continue;
354				}
355				let mut current_timeout = self.timeout.lock().unwrap();
356				if self.timer.elapsed() {
357					*current_timeout = None;
358				}
359				break n;
360			};
361			assert!(n <= events.capacity());
362			let strip = mem::replace(&mut *self.strip.lock().unwrap(), None).unwrap(); // TODO: currently Context needs to do its own check for strips added after this point
363			for x in events
364				.iter()
365				.filter(|x| x.token() != POLL_TIMER && !strip.contains(&x.token().0))
366			{
367				f(x.readiness(), x.token().0.into())
368			}
369			if n < events.capacity() {
370				break;
371			}
372			nonblock = true;
373		}
374	}
375}
376impl<Key> Drop for NotifierTimeout<Key>
377where
378	Key: Clone + Eq + Hash + Into<usize>,
379	usize: Into<Key>,
380{
381	fn drop(&mut self) {
382		self.poll.deregister(&self.timer).unwrap();
383	}
384}