mio_misc/
queue.rs

1//! Thread safe queues that that trigger notifications on `mio::Waker`
2use crate::NotificationId;
3use crossbeam_queue::{ArrayQueue, SegQueue};
4use mio::Waker;
5use std::sync::Arc;
6use std::{error, fmt, io};
7
8///
9pub type Result = std::result::Result<(), NotificationError<NotificationId>>;
10
11/// Represents the side that notifies
12pub trait Notifier: Send + Sync + fmt::Debug {
13    /// Notifies `Poll`
14    fn notify(&self, id: NotificationId) -> Result;
15}
16
17/// Represents the side that receives event notifications
18pub trait NotificationReceiver: Send + Sync {
19    /// Retrieves the next notification, if there's any
20    fn receive(&self) -> Option<NotificationId>;
21    /// Returns number of notifications
22    fn len(&self) -> usize;
23    /// Returns `true` if the queue is empty.
24    fn is_empty(&self) -> bool;
25}
26
27/// An unbounded queue that helps with simulation of registering event sources with `Poll`.
28/// It keeps track of `NotificationId`s associated with `Waker`
29#[derive(Debug)]
30pub struct NotificationQueue {
31    /// Waker to notify Poll
32    waker: Arc<Waker>,
33    /// Queue of `NotificationId`s
34    queue: SegQueue<NotificationId>,
35}
36
37impl NotificationQueue {
38    /// Creates a notification queue
39    pub fn new(waker: Arc<Waker>) -> NotificationQueue {
40        NotificationQueue {
41            waker,
42            queue: SegQueue::new(),
43        }
44    }
45
46    /// Queues the `NotificationId` and notifies the `Poll` associated with `Waker`
47    pub fn push(&self, id: NotificationId) -> io::Result<()> {
48        self.queue.push(id);
49        self.waker.wake()
50    }
51
52    /// Attempts to remove an element from the queue
53    /// If the queue is empty, None is returned.
54    pub fn pop(&self) -> Option<NotificationId> {
55        self.queue.pop()
56    }
57
58    /// Returns `true` if the queue is empty
59    pub fn is_empty(&self) -> bool {
60        self.queue.is_empty()
61    }
62
63    /// Returns queue length
64    pub fn len(&self) -> usize {
65        self.queue.len()
66    }
67}
68
69impl Notifier for NotificationQueue {
70    fn notify(&self, id: NotificationId) -> Result {
71        self.push(id).map_err(From::from)
72    }
73}
74
75impl NotificationReceiver for NotificationQueue {
76    fn receive(&self) -> Option<NotificationId> {
77        self.pop()
78    }
79
80    fn len(&self) -> usize {
81        self.len()
82    }
83
84    fn is_empty(&self) -> bool {
85        self.is_empty()
86    }
87}
88
89/// A bounded queue that helps with simulation of registering event sources with `Poll`.
90/// It keeps track of `NotificationId`s associated with Waker
91#[derive(Debug)]
92pub struct BoundedNotificationQueue {
93    /// Waker to notify Poll
94    waker: Arc<Waker>,
95    /// Queue of `NotificationId`'s
96    queue: ArrayQueue<NotificationId>,
97}
98
99impl BoundedNotificationQueue {
100    /// Creates the queue
101    pub fn new(size: usize, waker: Arc<Waker>) -> BoundedNotificationQueue {
102        BoundedNotificationQueue {
103            waker,
104            queue: ArrayQueue::new(size),
105        }
106    }
107
108    /// Queues the `NotificationId` and notifies the `Poll` associated with `Waker`
109    pub fn push(&self, id: NotificationId) -> Result {
110        self.queue
111            .push(id)
112            .map_err(NotificationError::Full)
113            .and_then(|_| self.waker.wake().map_err(From::from))
114    }
115
116    /// Attempts to remove an element from the queue
117    /// If the queue is empty, None is returned.
118    pub fn pop(&self) -> Option<NotificationId> {
119        self.queue.pop()
120    }
121
122    /// Returns `true` if the queue is empty.
123    pub fn is_empty(&self) -> bool {
124        self.queue.is_empty()
125    }
126
127    /// Returns queue length
128    pub fn len(&self) -> usize {
129        self.queue.len()
130    }
131}
132
133impl Notifier for BoundedNotificationQueue {
134    fn notify(&self, id: NotificationId) -> Result {
135        self.push(id)
136    }
137}
138
139impl NotificationReceiver for BoundedNotificationQueue {
140    fn receive(&self) -> Option<NotificationId> {
141        self.pop()
142    }
143
144    fn len(&self) -> usize {
145        self.len()
146    }
147
148    fn is_empty(&self) -> bool {
149        self.queue.is_empty()
150    }
151}
152
153/// An error returned from the `SyncEventNotificationQueue::push` function.
154pub enum NotificationError<T> {
155    /// An IO error.
156    Io(io::Error),
157
158    /// Notification could not be sent because the queue is full.
159    Full(T),
160}
161
162/*
163 *
164 * ===== Implement Error conversions =====
165 *
166 */
167
168impl<T> From<io::Error> for NotificationError<T> {
169    fn from(src: io::Error) -> Self {
170        NotificationError::Io(src)
171    }
172}
173
174/*
175 *
176 * ===== Implement Error, Debug, and Display for Errors =====
177 *
178 */
179
180impl<T> error::Error for NotificationError<T> {}
181
182impl<T> fmt::Debug for NotificationError<T> {
183    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
184        match self {
185            NotificationError::Io(io_err) => write!(f, "{:?}", io_err),
186            NotificationError::Full(..) => write!(f, "Full(..)"),
187        }
188    }
189}
190
191impl<T> fmt::Display for NotificationError<T> {
192    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
193        match self {
194            NotificationError::Io(io_err) => write!(f, "{}", io_err),
195            NotificationError::Full(..) => write!(f, "sending on a full notification queue"),
196        }
197    }
198}