mio_wakeq/lib.rs
1//! # mio-wakeq
2//!
3//! A simple custom event delivery mechanism based on `mio`'s `Waker`
4//! functionality for `mio`-based systems. `mio`'s `Poll` is limited to a single
5//! `Waker`, which restricts it to handling only one type of external event.
6//! However, `mio-wakeq` allows multiple events to be managed and exposed
7//! through a single `Waker` within the same `Poll` instance.
8//!
9//! ```rust
10//! use mio::{Events, Poll, Token};
11//! use mio_wakeq::{EventId, EventQ, WakeQ};
12//!
13//! const WAKER: Token = Token(0);
14//! const EVENT0: EventId = EventId(0);
15//!
16//! let mut poll = Poll::new().unwrap();
17//! let wakeq = EventQ::new(&poll, WAKER, 8).unwrap();
18//!
19//! let event_sender = wakeq.get_sender();
20//! std::thread::spawn(move || {
21//! event_sender.trigger_event(EVENT0);
22//! });
23//!
24//! let mut events = Events::with_capacity(32);
25//! poll.poll(&mut events, None).unwrap();
26//!
27//! for event in events.iter() {
28//! match event.token() {
29//! WAKER => {
30//! for wev in wakeq.triggered_events() {
31//! assert_eq!(wev, EVENT0);
32//! }
33//! }
34//!
35//! _ => unimplemented!(),
36//! }
37//! }
38//! ```
39
40use mio::{Poll, Token, Waker};
41use std::io;
42use std::sync::atomic::{AtomicUsize, Ordering};
43use std::sync::{
44 mpsc::{self, Receiver, Sender},
45 Arc,
46};
47
48//
49//
50// EventId
51//
52//
53
/// Identifier of a unique custom event.
///
/// A thin newtype over `usize`; it converts to and from the raw value via
/// the `From` implementations below.
#[derive(Copy, Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub struct EventId(pub usize);

// Wrap a raw `usize` into an `EventId`.
impl From<usize> for EventId {
    fn from(raw: usize) -> Self {
        Self(raw)
    }
}

// Unwrap an `EventId` back into its raw `usize`.
impl From<EventId> for usize {
    fn from(EventId(raw): EventId) -> Self {
        raw
    }
}
69
70//
71//
72// WakeQSender
73//
74//
75
76/// Queue Sender for the custom type message
77///
78/// Used from the app thread to notify poll loop.
79/// Consist of async Tx channel and the mio Waker
80pub struct WakeQSender<T> {
81 tx: Sender<T>,
82 waker: Arc<Waker>,
83}
84
85impl<T> WakeQSender<T> {
86 /// Send the custom type message and wake the poll loop
87 pub fn send_event(&self, event: T) -> io::Result<()> {
88 self.tx.send(event).map_err(|err| {
89 io::Error::new(
90 io::ErrorKind::Other,
91 format!("Failed to send message: {}", err),
92 )
93 })?;
94
95 self.waker.wake()
96 }
97}
98
99//
100//
101// WakeQ
102//
103//
104
105/// Waker Queue for custom type message
106///
107/// Should be single instance per poll loop.
108/// There are two halfs distributed between threads:
109/// - `rx` - process messages inside the poll loop
110/// - `sender` - shared between notifiers thread to expose custom messages to the poll loop
111pub struct WakeQ<T> {
112 rx: Receiver<T>,
113 sender: WakeQSender<T>,
114}
115
116impl<T> WakeQ<T> {
117 /// Create new Waker Queue. Register Waker into the exist Poll instance
118 /// and create async channel as a message delivery mechanism.
119 pub fn new(poll: &Poll, token: Token) -> io::Result<Self> {
120 let (tx, rx) = mpsc::channel();
121 let waker = Waker::new(poll.registry(), token)?;
122 let sender = WakeQSender {
123 tx,
124 waker: Arc::new(waker),
125 };
126 Ok(WakeQ { rx, sender })
127 }
128
129 /// Get the cloned `WakeQSender`
130 pub fn get_sender(&self) -> WakeQSender<T> {
131 self.sender.clone()
132 }
133
134 /// Iterator for messages in the channel.
135 /// Obtain all the messages via non-blocking way.
136 /// Used for the processing side in the poll loop.
137 pub fn iter_pending_events(&self) -> impl Iterator<Item = T> + '_ {
138 std::iter::from_fn(|| self.rx.try_recv().ok())
139 }
140}
141
142impl<T> Clone for WakeQSender<T> {
143 fn clone(&self) -> Self {
144 WakeQSender {
145 tx: self.tx.clone(),
146 waker: Arc::clone(&self.waker),
147 }
148 }
149}
150
151//
152//
153// EventQSender
154//
155//
156
157/// Event Sender for the event trigerring mechanism.
158///
159/// Used by the app thread to notify when an event happens.
160/// Consist of poll Waker and shared reference to Events storage cloned
161/// from the `EventQ`
162#[derive(Clone)]
163pub struct EventQSender {
164 events: Arc<Vec<AtomicUsize>>,
165 waker: Arc<Waker>,
166}
167
168impl EventQSender {
169 /// Trigger the event by EventId and wake the poll loop from notifier thread
170 pub fn trigger_event(&self, event_id: EventId) -> io::Result<()> {
171 let event_id = usize::from(event_id);
172 let (chunk_idx, bit_idx) = Self::event_position(event_id);
173 self.events[chunk_idx].fetch_or(1 << bit_idx, Ordering::SeqCst);
174 self.waker.wake()
175 }
176
177 fn event_position(event_id: usize) -> (usize, u32) {
178 let chunk_idx = event_id / usize::BITS as usize;
179 let bit_idx = (event_id % usize::BITS as usize) as u32;
180 (chunk_idx, bit_idx)
181 }
182}
183
184//
185//
186// EventQ
187//
188//
189
190/// Events storage and processor
191///
192/// Should be single instance per poll loop.
193/// `events` is a event storage that shared by reference with the `sender`s.
194/// `sender` shared between notifiers thread to expose custom messages to the poll loop
195pub struct EventQ {
196 events: Arc<Vec<AtomicUsize>>,
197 sender: EventQSender,
198}
199
200impl EventQ {
201 /// Create new `EventQ`. Register Waker into the exist Poll instance
202 /// and create shared storage for the specified number of events
203 pub fn new(poll: &Poll, token: Token, num_events: usize) -> io::Result<Self> {
204 let num_chunks = (num_events + usize::BITS as usize - 1) / usize::BITS as usize;
205 let events = Arc::new((0..num_chunks).map(|_| AtomicUsize::new(0)).collect());
206 let waker = Arc::new(Waker::new(poll.registry(), token)?);
207
208 let sender = EventQSender {
209 events: Arc::clone(&events),
210 waker,
211 };
212 Ok(EventQ { events, sender })
213 }
214
215 /// Get the cloned `EventQSender`
216 pub fn get_sender(&self) -> EventQSender {
217 self.sender.clone()
218 }
219
220 /// Iterator for pending events.
221 /// Created over the already triggered events at the moment.
222 /// All events in the `EventQ` are reset before the iterator is created, which allows
223 /// triggering new events and scheduling the poll wake during current proccesing
224 pub fn triggered_events(&self) -> EventQIterator {
225 let triggered_events = self
226 .events
227 .iter()
228 .map(|chunk| chunk.swap(0, Ordering::SeqCst))
229 .collect();
230 EventQIterator {
231 events: triggered_events,
232 chunk_idx: 0,
233 bit_idx: 0,
234 }
235 }
236}
237
238//
239//
240// EventQIterator
241//
242//
243
244/// Event's iterator for the pendings events from `EventQ`
245pub struct EventQIterator {
246 events: Vec<usize>,
247 chunk_idx: usize,
248 bit_idx: u32,
249}
250
251impl Iterator for EventQIterator {
252 type Item = EventId;
253
254 fn next(&mut self) -> Option<Self::Item> {
255 while self.chunk_idx < self.events.len() {
256 let chunk = self.events[self.chunk_idx];
257 while self.bit_idx < usize::BITS {
258 let bit = 1 << self.bit_idx;
259 let bit_position = self.chunk_idx * usize::BITS as usize + self.bit_idx as usize;
260 self.bit_idx += 1;
261 if chunk & bit != 0 {
262 return Some(EventId(bit_position));
263 }
264 }
265 self.chunk_idx += 1;
266 self.bit_idx = 0;
267 }
268 None
269 }
270}