1use crate::NotificationId;
3use crossbeam_queue::{ArrayQueue, SegQueue};
4use mio::Waker;
5use std::sync::Arc;
6use std::{error, fmt, io};
7
8pub type Result = std::result::Result<(), NotificationError<NotificationId>>;
10
11pub trait Notifier: Send + Sync + fmt::Debug {
13 fn notify(&self, id: NotificationId) -> Result;
15}
16
17pub trait NotificationReceiver: Send + Sync {
19 fn receive(&self) -> Option<NotificationId>;
21 fn len(&self) -> usize;
23 fn is_empty(&self) -> bool;
25}
26
27#[derive(Debug)]
30pub struct NotificationQueue {
31 waker: Arc<Waker>,
33 queue: SegQueue<NotificationId>,
35}
36
37impl NotificationQueue {
38 pub fn new(waker: Arc<Waker>) -> NotificationQueue {
40 NotificationQueue {
41 waker,
42 queue: SegQueue::new(),
43 }
44 }
45
46 pub fn push(&self, id: NotificationId) -> io::Result<()> {
48 self.queue.push(id);
49 self.waker.wake()
50 }
51
52 pub fn pop(&self) -> Option<NotificationId> {
55 self.queue.pop()
56 }
57
58 pub fn is_empty(&self) -> bool {
60 self.queue.is_empty()
61 }
62
63 pub fn len(&self) -> usize {
65 self.queue.len()
66 }
67}
68
69impl Notifier for NotificationQueue {
70 fn notify(&self, id: NotificationId) -> Result {
71 self.push(id).map_err(From::from)
72 }
73}
74
75impl NotificationReceiver for NotificationQueue {
76 fn receive(&self) -> Option<NotificationId> {
77 self.pop()
78 }
79
80 fn len(&self) -> usize {
81 self.len()
82 }
83
84 fn is_empty(&self) -> bool {
85 self.is_empty()
86 }
87}
88
89#[derive(Debug)]
92pub struct BoundedNotificationQueue {
93 waker: Arc<Waker>,
95 queue: ArrayQueue<NotificationId>,
97}
98
99impl BoundedNotificationQueue {
100 pub fn new(size: usize, waker: Arc<Waker>) -> BoundedNotificationQueue {
102 BoundedNotificationQueue {
103 waker,
104 queue: ArrayQueue::new(size),
105 }
106 }
107
108 pub fn push(&self, id: NotificationId) -> Result {
110 self.queue
111 .push(id)
112 .map_err(NotificationError::Full)
113 .and_then(|_| self.waker.wake().map_err(From::from))
114 }
115
116 pub fn pop(&self) -> Option<NotificationId> {
119 self.queue.pop()
120 }
121
122 pub fn is_empty(&self) -> bool {
124 self.queue.is_empty()
125 }
126
127 pub fn len(&self) -> usize {
129 self.queue.len()
130 }
131}
132
133impl Notifier for BoundedNotificationQueue {
134 fn notify(&self, id: NotificationId) -> Result {
135 self.push(id)
136 }
137}
138
139impl NotificationReceiver for BoundedNotificationQueue {
140 fn receive(&self) -> Option<NotificationId> {
141 self.pop()
142 }
143
144 fn len(&self) -> usize {
145 self.len()
146 }
147
148 fn is_empty(&self) -> bool {
149 self.queue.is_empty()
150 }
151}
152
153pub enum NotificationError<T> {
155 Io(io::Error),
157
158 Full(T),
160}
161
162impl<T> From<io::Error> for NotificationError<T> {
169 fn from(src: io::Error) -> Self {
170 NotificationError::Io(src)
171 }
172}
173
174impl<T> error::Error for NotificationError<T> {}
181
182impl<T> fmt::Debug for NotificationError<T> {
183 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
184 match self {
185 NotificationError::Io(io_err) => write!(f, "{:?}", io_err),
186 NotificationError::Full(..) => write!(f, "Full(..)"),
187 }
188 }
189}
190
191impl<T> fmt::Display for NotificationError<T> {
192 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
193 match self {
194 NotificationError::Io(io_err) => write!(f, "{}", io_err),
195 NotificationError::Full(..) => write!(f, "sending on a full notification queue"),
196 }
197 }
198}