mio_wakeq/
lib.rs

1//! # mio-wakeq
2//!
3//! A simple custom event delivery mechanism based on `mio`'s `Waker`
4//! functionality for `mio`-based systems. `mio`'s `Poll` is limited to a single
5//! `Waker`, which restricts it to handling only one type of external event.
6//! However, `mio-wakeq` allows multiple events to be managed and exposed
7//! through a single `Waker` within the same `Poll` instance.
8//!
9//! ```rust
10//! use mio::{Events, Poll, Token};
11//! use mio_wakeq::{EventId, EventQ, WakeQ};
12//!
13//! const WAKER: Token = Token(0);
14//! const EVENT0: EventId = EventId(0);
15//!
16//! let mut poll = Poll::new().unwrap();
17//! let wakeq = EventQ::new(&poll, WAKER, 8).unwrap();
18//!
19//! let event_sender = wakeq.get_sender();
20//! std::thread::spawn(move || {
21//!     event_sender.trigger_event(EVENT0);
22//! });
23//!
24//! let mut events = Events::with_capacity(32);
25//! poll.poll(&mut events, None).unwrap();
26//!
27//! for event in events.iter() {
28//!     match event.token() {
29//!         WAKER => {
30//!             for wev in wakeq.triggered_events() {
31//!                 assert_eq!(wev, EVENT0);
32//!             }
33//!         }
34//!
35//!         _ => unimplemented!(),
36//!     }
37//! }
38//! ```
39
40use mio::{Poll, Token, Waker};
41use std::io;
42use std::sync::atomic::{AtomicUsize, Ordering};
43use std::sync::{
44    mpsc::{self, Receiver, Sender},
45    Arc,
46};
47
48//
49//
50// EventId
51//
52//
53
/// Identifier of a unique custom event.
///
/// A thin newtype around `usize`; it converts to and from `usize` through
/// the `From` implementations that follow.
#[derive(Copy, Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub struct EventId(pub usize);

impl From<usize> for EventId {
    /// Wrap a raw index into an `EventId`.
    fn from(raw: usize) -> Self {
        Self(raw)
    }
}

impl From<EventId> for usize {
    /// Extract the raw index carried by an `EventId`.
    fn from(EventId(raw): EventId) -> Self {
        raw
    }
}
69
70//
71//
72// WakeQSender
73//
74//
75
76/// Queue Sender for the custom type message
77///
78/// Used from the app thread to notify poll loop.
79/// Consist of async Tx channel and the mio Waker
80pub struct WakeQSender<T> {
81    tx: Sender<T>,
82    waker: Arc<Waker>,
83}
84
85impl<T> WakeQSender<T> {
86    /// Send the custom type message and wake the poll loop
87    pub fn send_event(&self, event: T) -> io::Result<()> {
88        self.tx.send(event).map_err(|err| {
89            io::Error::new(
90                io::ErrorKind::Other,
91                format!("Failed to send message: {}", err),
92            )
93        })?;
94
95        self.waker.wake()
96    }
97}
98
99//
100//
101// WakeQ
102//
103//
104
105/// Waker Queue for custom type message
106///
107/// Should be single instance per poll loop.
108/// There are two halfs distributed between threads:
109/// - `rx` - process messages inside the poll loop
110/// - `sender` - shared between notifiers thread to expose custom messages to the poll loop
111pub struct WakeQ<T> {
112    rx: Receiver<T>,
113    sender: WakeQSender<T>,
114}
115
116impl<T> WakeQ<T> {
117    /// Create new Waker Queue. Register Waker into the exist Poll instance
118    /// and create async channel as a message delivery mechanism.
119    pub fn new(poll: &Poll, token: Token) -> io::Result<Self> {
120        let (tx, rx) = mpsc::channel();
121        let waker = Waker::new(poll.registry(), token)?;
122        let sender = WakeQSender {
123            tx,
124            waker: Arc::new(waker),
125        };
126        Ok(WakeQ { rx, sender })
127    }
128
129    /// Get the cloned `WakeQSender`
130    pub fn get_sender(&self) -> WakeQSender<T> {
131        self.sender.clone()
132    }
133
134    /// Iterator for messages in the channel.
135    /// Obtain all the messages via non-blocking way.
136    /// Used for the processing side in the poll loop.
137    pub fn iter_pending_events(&self) -> impl Iterator<Item = T> + '_ {
138        std::iter::from_fn(|| self.rx.try_recv().ok())
139    }
140}
141
142impl<T> Clone for WakeQSender<T> {
143    fn clone(&self) -> Self {
144        WakeQSender {
145            tx: self.tx.clone(),
146            waker: Arc::clone(&self.waker),
147        }
148    }
149}
150
151//
152//
153// EventQSender
154//
155//
156
157/// Event Sender for the event trigerring mechanism.
158///
159/// Used by the app thread to notify when an event happens.
160/// Consist of poll Waker and shared reference to Events storage cloned
161/// from the `EventQ`
162#[derive(Clone)]
163pub struct EventQSender {
164    events: Arc<Vec<AtomicUsize>>,
165    waker: Arc<Waker>,
166}
167
168impl EventQSender {
169    /// Trigger the event by EventId and wake the poll loop from notifier thread
170    pub fn trigger_event(&self, event_id: EventId) -> io::Result<()> {
171        let event_id = usize::from(event_id);
172        let (chunk_idx, bit_idx) = Self::event_position(event_id);
173        self.events[chunk_idx].fetch_or(1 << bit_idx, Ordering::SeqCst);
174        self.waker.wake()
175    }
176
177    fn event_position(event_id: usize) -> (usize, u32) {
178        let chunk_idx = event_id / usize::BITS as usize;
179        let bit_idx = (event_id % usize::BITS as usize) as u32;
180        (chunk_idx, bit_idx)
181    }
182}
183
184//
185//
186// EventQ
187//
188//
189
190/// Events storage and processor
191///
192/// Should be single instance per poll loop.
193/// `events` is a event storage that shared by reference with the `sender`s.
194/// `sender` shared between notifiers thread to expose custom messages to the poll loop
195pub struct EventQ {
196    events: Arc<Vec<AtomicUsize>>,
197    sender: EventQSender,
198}
199
200impl EventQ {
201    /// Create new `EventQ`. Register Waker into the exist Poll instance
202    /// and create shared storage for the specified number of events
203    pub fn new(poll: &Poll, token: Token, num_events: usize) -> io::Result<Self> {
204        let num_chunks = (num_events + usize::BITS as usize - 1) / usize::BITS as usize;
205        let events = Arc::new((0..num_chunks).map(|_| AtomicUsize::new(0)).collect());
206        let waker = Arc::new(Waker::new(poll.registry(), token)?);
207
208        let sender = EventQSender {
209            events: Arc::clone(&events),
210            waker,
211        };
212        Ok(EventQ { events, sender })
213    }
214
215    /// Get the cloned `EventQSender`
216    pub fn get_sender(&self) -> EventQSender {
217        self.sender.clone()
218    }
219
220    /// Iterator for pending events.
221    /// Created over the already triggered events at the moment.
222    /// All events in the `EventQ` are reset before the iterator is created, which allows
223    /// triggering new events and scheduling the poll wake during current proccesing
224    pub fn triggered_events(&self) -> EventQIterator {
225        let triggered_events = self
226            .events
227            .iter()
228            .map(|chunk| chunk.swap(0, Ordering::SeqCst))
229            .collect();
230        EventQIterator {
231            events: triggered_events,
232            chunk_idx: 0,
233            bit_idx: 0,
234        }
235    }
236}
237
238//
239//
240// EventQIterator
241//
242//
243
244/// Event's iterator for the pendings events from `EventQ`
245pub struct EventQIterator {
246    events: Vec<usize>,
247    chunk_idx: usize,
248    bit_idx: u32,
249}
250
251impl Iterator for EventQIterator {
252    type Item = EventId;
253
254    fn next(&mut self) -> Option<Self::Item> {
255        while self.chunk_idx < self.events.len() {
256            let chunk = self.events[self.chunk_idx];
257            while self.bit_idx < usize::BITS {
258                let bit = 1 << self.bit_idx;
259                let bit_position = self.chunk_idx * usize::BITS as usize + self.bit_idx as usize;
260                self.bit_idx += 1;
261                if chunk & bit != 0 {
262                    return Some(EventId(bit_position));
263                }
264            }
265            self.chunk_idx += 1;
266            self.bit_idx = 0;
267        }
268        None
269    }
270}