1use std::fmt;
2use std::marker::Unpin;
3use std::pin::Pin;
4use std::task::{Context, Poll};
5
6use futures::Stream;
7
8#[allow(unreachable_pub)]
9pub use self::queue_sender::QueueSender;
10#[allow(unreachable_pub)]
11pub use self::queue_stream::QueueStream;
12
13mod queue_sender;
14mod queue_stream;
15
/// Notification and shutdown hooks exposed by a channel endpoint.
///
/// `QueueExt::queue_sender` requires its receiver-side value to implement this
/// trait. Every method takes `&self`.
///
/// NOTE(review): the per-method descriptions below are inferred from the method
/// names and from the tests in this file; the implementations live in other
/// modules — confirm against them.
pub trait Waker {
    /// Presumably wakes the receiving task so that it polls again — confirm.
    fn rx_wake(&self);

    /// Hands over the task waker `w`; presumably it is stored so a parked
    /// sender can be woken later — confirm.
    fn tx_park(&self, w: std::task::Waker);

    /// Marks the channel as closed. The tests in this file expect
    /// [`Waker::is_closed`] to return `true` afterwards.
    fn close_channel(&self);

    /// Reports whether the channel has been closed.
    fn is_closed(&self) -> bool;
}
22
23impl<T: ?Sized> QueueExt for T {}
24
25pub trait QueueExt {
26 #[inline]
27 fn queue_stream<Item, F>(self, f: F) -> QueueStream<Self, Item, F>
28 where
29 Self: Sized + Unpin,
30 F: Fn(Pin<&mut Self>, &mut Context<'_>) -> Poll<Option<Item>>,
31 {
32 assert_stream::<Item, _>(QueueStream::new(self, f))
33 }
34
35 #[inline]
36 fn queue_sender<Item, F, R>(self, f: F) -> QueueSender<Self, Item, F, R>
37 where
38 Self: Sized + Waker,
39 F: Fn(&mut Self, Action<Item>) -> Reply<R>,
40 {
41 QueueSender::new(self, f)
42 }
43
44 #[inline]
45 #[allow(clippy::type_complexity)]
46 fn queue_channel<Item, F1, R, F2>(
47 self,
48 f1: F1,
49 f2: F2,
50 ) -> (
51 QueueSender<QueueStream<Self, Item, F2>, Item, F1, R>,
52 QueueStream<Self, Item, F2>,
53 )
54 where
55 Self: Sized + Unpin + Clone,
56 F1: Fn(&mut QueueStream<Self, Item, F2>, Action<Item>) -> Reply<R>,
57 F2: Fn(Pin<&mut Self>, &mut Context<'_>) -> Poll<Option<Item>> + Clone + Unpin,
58 {
59 queue_channel(self, f1, f2)
60 }
61}
62
63#[allow(clippy::type_complexity)]
64#[inline]
65pub fn queue_channel<Q, Item, F1, R, F2>(
66 q: Q,
67 f1: F1,
68 f2: F2,
69) -> (
70 QueueSender<QueueStream<Q, Item, F2>, Item, F1, R>,
71 QueueStream<Q, Item, F2>,
72)
73where
74 Q: Sized + Unpin + Clone,
75 F1: Fn(&mut QueueStream<Q, Item, F2>, Action<Item>) -> Reply<R>,
76 F2: Fn(Pin<&mut Q>, &mut Context<'_>) -> Poll<Option<Item>> + Clone + Unpin,
77{
78 let rx = QueueStream::new(q, f2);
79 let tx = QueueSender::new(rx.clone(), f1);
80 (tx, rx)
81}
82
/// Request handed to a sender's handler function.
///
/// The handler answers with a [`Reply`]; the test handler in this file answers
/// each action with the reply variant of the same name.
pub enum Action<Item> {
    /// Carries the item to be sent.
    Send(Item),
    /// Asks whether the queue is full.
    IsFull,
    /// Asks whether the queue is empty.
    IsEmpty,
    /// Asks for the number of queued items.
    Len,
}
89
/// Answer produced by a sender's handler function in response to an [`Action`].
pub enum Reply<R> {
    /// Result of a send, with a handler-defined payload.
    Send(R),
    /// Whether the queue is full.
    IsFull(bool),
    /// Whether the queue is empty.
    IsEmpty(bool),
    /// Number of queued items.
    Len(usize),
}
96
97pub type TrySendError<T> = SendError<T>;
98
99#[derive(Clone, PartialEq, Eq)]
100pub struct SendError<T> {
101 kind: SendErrorKind,
102 val: Option<T>,
103}
104
105impl<T> SendError<T> {
106 #[inline]
107 pub fn full(val: T) -> Self {
108 SendError {
109 kind: SendErrorKind::Full,
110 val: Some(val),
111 }
112 }
113
114 #[inline]
115 pub fn disconnected(val: Option<T>) -> Self {
116 SendError {
117 kind: SendErrorKind::Disconnected,
118 val,
119 }
120 }
121}
122
123impl<T> fmt::Debug for SendError<T> {
124 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
125 f.debug_struct("SendError")
126 .field("kind", &self.kind)
127 .finish()
128 }
129}
130
131impl<T> fmt::Display for SendError<T> {
132 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
133 if self.is_full() {
134 write!(f, "send failed because mpsc is full")
135 } else {
136 write!(f, "send failed because receiver is gone")
137 }
138 }
139}
140
/// Reason a send failed.
///
/// A fieldless enum, so it derives `Copy` and `Hash` in addition to the
/// comparison traits; callers can pass it by value and use it as a map or set
/// key.
// NOTE(review): `dead_code` is presumably allowed because not every variant is
// constructed in every build configuration — confirm before removing.
#[allow(dead_code)]
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)]
pub enum SendErrorKind {
    /// The channel had no room for the value.
    Full,
    /// The receiving side is gone.
    Disconnected,
}
147
148impl<T: core::any::Any> std::error::Error for SendError<T> {}
149
150impl<T> SendError<T> {
151 #[inline]
153 pub fn is_full(&self) -> bool {
154 matches!(self.kind, SendErrorKind::Full)
155 }
156
157 #[inline]
159 pub fn is_disconnected(&self) -> bool {
160 matches!(self.kind, SendErrorKind::Disconnected)
161 }
162
163 #[inline]
165 pub fn into_inner(self) -> Option<T> {
166 self.val
167 }
168}
169
170#[inline]
173pub(crate) fn assert_stream<T, S>(stream: S) -> S
174where
175 S: Stream<Item = T>,
176{
177 stream
178}
179
180#[cfg(test)]
181use futures::pin_mut;
182#[cfg(test)]
183use futures::task::noop_waker;
184#[cfg(test)]
185use std::collections::VecDeque;
186#[cfg(test)]
187use std::sync::{Arc, Mutex};
188
/// Test poll function: pops the front element of the deque and reports it as
/// immediately ready. It yields `Ready(None)` once the deque is empty and
/// never returns `Pending`.
#[cfg(test)]
fn poll_deque(pin_q: Pin<&mut VecDeque<i32>>, _cx: &mut Context<'_>) -> Poll<Option<i32>> {
    let queue = pin_q.get_mut();
    let next = queue.pop_front();
    Poll::Ready(next)
}
197
/// Test queue whose clones share one underlying deque: `Clone` copies the
/// `Arc`, not the data.
#[cfg(test)]
#[derive(Clone)]
struct SharedQueue(Arc<Mutex<VecDeque<i32>>>);
206
/// Function-pointer type of the receiver poll function used by the
/// shared-queue tests.
#[cfg(test)]
type SharedPollFn = fn(Pin<&mut SharedQueue>, &mut Context<'_>) -> Poll<Option<i32>>;
209
/// Function-pointer type of the sender handler used by the shared-queue tests.
#[cfg(test)]
type SharedHandlerFn =
    fn(&mut QueueStream<SharedQueue, i32, SharedPollFn>, Action<i32>) -> Reply<i32>;
213
/// Test poll function for [`SharedQueue`]: locks the shared deque, pops its
/// front element and reports it as immediately ready (`Ready(None)` when
/// empty). Panics if the mutex is poisoned.
#[cfg(test)]
fn poll_shared(pin_q: Pin<&mut SharedQueue>, _cx: &mut Context<'_>) -> Poll<Option<i32>> {
    let shared = pin_q.get_mut();
    let mut items = shared.0.lock().unwrap();
    Poll::Ready(items.pop_front())
}
218
/// Test sender handler for the shared-queue channel.
///
/// `Send` appends the item to the shared deque and echoes it back. `IsFull`
/// always answers `false`. `IsEmpty` and `Len` report the current state of the
/// deque. Panics if the mutex is poisoned.
#[cfg(test)]
fn push_shared(
    s: &mut QueueStream<SharedQueue, i32, SharedPollFn>,
    action: Action<i32>,
) -> Reply<i32> {
    match action {
        Action::Send(item) => {
            let mut items = s.0.lock().unwrap();
            items.push_back(item);
            Reply::Send(item)
        }
        // The test queue has no capacity limit, so it never takes the lock here.
        Action::IsFull => Reply::IsFull(false),
        Action::IsEmpty => {
            let items = s.0.lock().unwrap();
            Reply::IsEmpty(items.is_empty())
        }
        Action::Len => {
            let items = s.0.lock().unwrap();
            Reply::Len(items.len())
        }
    }
}
234
/// Builds a sender/receiver pair over a fresh, empty [`SharedQueue`], wired
/// with `push_shared` as the handler and `poll_shared` as the poll function.
#[cfg(test)]
fn make_shared_channel() -> (
    QueueSender<QueueStream<SharedQueue, i32, SharedPollFn>, i32, SharedHandlerFn, i32>,
    QueueStream<SharedQueue, i32, SharedPollFn>,
) {
    let storage = Arc::new(Mutex::new(VecDeque::new()));
    // Coerce the fn items to the pointer types named in the return type.
    let handler: SharedHandlerFn = push_shared;
    let poller: SharedPollFn = poll_shared;
    queue_channel(SharedQueue(storage), handler, poller)
}
247
/// Items sent through the channel arrive in FIFO order, followed by `None`
/// once the queue is drained.
#[test]
fn queue_channel_send_receive() {
    let (mut tx, rx) = make_shared_channel();

    for value in 1..=3 {
        tx.try_send(value).unwrap();
    }

    pin_mut!(rx);
    let waker = noop_waker();
    let mut cx = Context::from_waker(&waker);

    for expected in [Some(1), Some(2), Some(3), None] {
        assert_eq!(rx.as_mut().poll_next(&mut cx), Poll::Ready(expected));
    }
}
269
/// A fresh sender is not full, and `len` reflects a successful send.
#[test]
fn queue_channel_sender_is_full() {
    let (mut sender, _receiver) = make_shared_channel();

    assert!(!sender.is_full());

    sender.try_send(1).unwrap();
    assert_eq!(sender.len(), 1);
}
277
/// Dropping the only sender closes the receiver, which then yields `None`.
#[test]
fn queue_channel_receiver_closed_when_sender_dropped() {
    let (sender, rx) = make_shared_channel();

    assert!(!rx.is_closed());
    drop(sender);
    assert!(rx.is_closed());

    pin_mut!(rx);
    let waker = noop_waker();
    let mut cx = Context::from_waker(&waker);
    let polled = rx.as_mut().poll_next(&mut cx);
    assert_eq!(polled, Poll::Ready(None));
}
290
/// `QueueExt::queue_stream` turns a plain `VecDeque` into a stream that yields
/// its elements front to back and then `None`.
#[test]
fn queue_ext_queue_stream() {
    type DequePollFn = fn(Pin<&mut VecDeque<i32>>, &mut Context<'_>) -> Poll<Option<i32>>;

    let source: VecDeque<i32> = VecDeque::from([5, 10]);
    let stream = source.queue_stream(poll_deque as DequePollFn);
    pin_mut!(stream);

    let waker = noop_waker();
    let mut cx = Context::from_waker(&waker);

    for expected in [Some(5), Some(10), None] {
        assert_eq!(stream.as_mut().poll_next(&mut cx), Poll::Ready(expected));
    }
}
310
/// `QueueExt::queue_sender` wraps a stream in a sender whose `try_send`
/// returns the handler's `Send` payload.
#[test]
fn queue_ext_queue_sender() {
    let storage = SharedQueue(Arc::new(Mutex::new(VecDeque::new())));
    let stream_obj: QueueStream<SharedQueue, i32, SharedPollFn> =
        QueueStream::new(storage, poll_shared as SharedPollFn);

    let mut sender = stream_obj.queue_sender(push_shared as SharedHandlerFn);

    let outcome = sender.try_send(7);
    assert!(outcome.is_ok());
    assert_eq!(outcome.unwrap(), 7);
}
322
/// `QueueExt::queue_channel` yields a working pair: an item sent through the
/// sender is collected from the receiver.
#[test]
fn queue_ext_queue_channel() {
    use futures::StreamExt;

    let shared = SharedQueue(Arc::new(Mutex::new(VecDeque::new())));
    let (mut tx, rx) =
        shared.queue_channel(push_shared as SharedHandlerFn, poll_shared as SharedPollFn);
    tx.try_send(100).unwrap();

    let collected = futures::executor::block_on(rx.collect::<Vec<_>>());
    assert_eq!(collected, vec![100]);
}
334
/// Cloned senders feed the same queue: all three items are counted by `len`
/// and received in send order.
#[test]
fn multiple_senders() {
    let (mut first, rx) = make_shared_channel();
    let mut second = first.clone();
    let mut third = first.clone();

    first.try_send(1).unwrap();
    second.try_send(2).unwrap();
    third.try_send(3).unwrap();

    assert_eq!(first.len(), 3);

    pin_mut!(rx);
    let waker = noop_waker();
    let mut cx = Context::from_waker(&waker);

    for expected in 1..=3 {
        assert_eq!(
            rx.as_mut().poll_next(&mut cx),
            Poll::Ready(Some(expected))
        );
    }
}
359
/// The receiver stays open while any sender clone is alive and closes only
/// after the last one is dropped.
#[test]
fn channel_closes_only_when_last_sender_dropped() {
    let (first, rx) = make_shared_channel();
    let second = first.clone();
    let third = first.clone();

    assert!(!rx.is_closed());

    drop(first);
    assert!(!rx.is_closed(), "still have tx2, tx3");

    drop(second);
    assert!(!rx.is_closed(), "still have tx3");

    drop(third);
    assert!(rx.is_closed(), "all senders gone");
}
374
/// Smoke test: `rx_wake` can be called on a fresh stream; nothing is asserted
/// beyond the call returning.
#[test]
fn queue_stream_waker_rx_wake() {
    type DequePollFn = fn(Pin<&mut VecDeque<i32>>, &mut Context<'_>) -> Poll<Option<i32>>;

    let stream: QueueStream<VecDeque<i32>, i32, DequePollFn> =
        QueueStream::new(VecDeque::new(), poll_deque as DequePollFn);
    stream.rx_wake();
}
388
/// Through the `Waker` trait, a fresh stream reports open and reports closed
/// after `close_channel`.
#[test]
fn queue_stream_waker_is_closed() {
    type DequePollFn = fn(Pin<&mut VecDeque<i32>>, &mut Context<'_>) -> Poll<Option<i32>>;

    let stream: QueueStream<VecDeque<i32>, i32, DequePollFn> =
        QueueStream::new(VecDeque::new(), poll_deque as DequePollFn);

    assert!(!Waker::is_closed(&stream));
    stream.close_channel();
    assert!(Waker::is_closed(&stream));
}
399}