// queue_ext/lib.rs

1use std::fmt;
2use std::marker::Unpin;
3use std::pin::Pin;
4use std::task::{Context, Poll};
5
6use futures::Stream;
7
8#[allow(unreachable_pub)]
9pub use self::queue_sender::QueueSender;
10#[allow(unreachable_pub)]
11pub use self::queue_stream::QueueStream;
12
13mod queue_sender;
14mod queue_stream;
15
/// Wake-up and shutdown hooks of a queue endpoint.
///
/// `QueueExt::queue_sender` requires `Self` to implement this trait.
/// NOTE(review): the per-method notes below are inferred from the method
/// names and the tests in this file — confirm against the implementors.
pub trait Waker {
    /// Presumably wakes the task waiting on the receiving side — TODO confirm.
    fn rx_wake(&self);

    /// Receives the task waker `w` of a sender, presumably so that sender can
    /// be woken later — TODO confirm.
    fn tx_park(&self, w: std::task::Waker);

    /// Closes the channel; afterwards `is_closed` is expected to report `true`.
    fn close_channel(&self);

    /// Reports whether the channel has been closed.
    fn is_closed(&self) -> bool;
}
22
23impl<T: ?Sized> QueueExt for T {}
24
25pub trait QueueExt {
26    #[inline]
27    fn queue_stream<Item, F>(self, f: F) -> QueueStream<Self, Item, F>
28    where
29        Self: Sized + Unpin,
30        F: Fn(Pin<&mut Self>, &mut Context<'_>) -> Poll<Option<Item>>,
31    {
32        assert_stream::<Item, _>(QueueStream::new(self, f))
33    }
34
35    #[inline]
36    fn queue_sender<Item, F, R>(self, f: F) -> QueueSender<Self, Item, F, R>
37    where
38        Self: Sized + Waker,
39        F: Fn(&mut Self, Action<Item>) -> Reply<R>,
40    {
41        QueueSender::new(self, f)
42    }
43
44    #[inline]
45    #[allow(clippy::type_complexity)]
46    fn queue_channel<Item, F1, R, F2>(
47        self,
48        f1: F1,
49        f2: F2,
50    ) -> (
51        QueueSender<QueueStream<Self, Item, F2>, Item, F1, R>,
52        QueueStream<Self, Item, F2>,
53    )
54    where
55        Self: Sized + Unpin + Clone,
56        F1: Fn(&mut QueueStream<Self, Item, F2>, Action<Item>) -> Reply<R>,
57        F2: Fn(Pin<&mut Self>, &mut Context<'_>) -> Poll<Option<Item>> + Clone + Unpin,
58    {
59        queue_channel(self, f1, f2)
60    }
61}
62
63#[allow(clippy::type_complexity)]
64#[inline]
65pub fn queue_channel<Q, Item, F1, R, F2>(
66    q: Q,
67    f1: F1,
68    f2: F2,
69) -> (
70    QueueSender<QueueStream<Q, Item, F2>, Item, F1, R>,
71    QueueStream<Q, Item, F2>,
72)
73where
74    Q: Sized + Unpin + Clone,
75    F1: Fn(&mut QueueStream<Q, Item, F2>, Action<Item>) -> Reply<R>,
76    F2: Fn(Pin<&mut Q>, &mut Context<'_>) -> Poll<Option<Item>> + Clone + Unpin,
77{
78    let rx = QueueStream::new(q, f2);
79    let tx = QueueSender::new(rx.clone(), f1);
80    (tx, rx)
81}
82
/// Request handed to the handler closure of a sender (see the `F` bound of
/// `QueueExt::queue_sender`).
pub enum Action<Item> {
    /// Send the carried item.
    Send(Item),
    /// Query whether the queue is full.
    IsFull,
    /// Query whether the queue is empty.
    IsEmpty,
    /// Query the number of queued items.
    Len,
}
89
/// Answer returned by the handler closure of a sender; each variant mirrors
/// the [`Action`] variant of the same name.
pub enum Reply<R> {
    /// Result of a send, of the handler-defined type `R`.
    Send(R),
    /// Whether the queue is full.
    IsFull(bool),
    /// Whether the queue is empty.
    IsEmpty(bool),
    /// Number of queued items.
    Len(usize),
}
96
97pub type TrySendError<T> = SendError<T>;
98
99#[derive(Clone, PartialEq, Eq)]
100pub struct SendError<T> {
101    kind: SendErrorKind,
102    val: Option<T>,
103}
104
105impl<T> SendError<T> {
106    #[inline]
107    pub fn full(val: T) -> Self {
108        SendError {
109            kind: SendErrorKind::Full,
110            val: Some(val),
111        }
112    }
113
114    #[inline]
115    pub fn disconnected(val: Option<T>) -> Self {
116        SendError {
117            kind: SendErrorKind::Disconnected,
118            val,
119        }
120    }
121}
122
123impl<T> fmt::Debug for SendError<T> {
124    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
125        f.debug_struct("SendError")
126            .field("kind", &self.kind)
127            .finish()
128    }
129}
130
131impl<T> fmt::Display for SendError<T> {
132    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
133        if self.is_full() {
134            write!(f, "send failed because mpsc is full")
135        } else {
136            write!(f, "send failed because receiver is gone")
137        }
138    }
139}
140
/// Reason stored inside a `SendError`.
#[allow(dead_code)]
#[derive(Clone, Debug, PartialEq, Eq)]
pub enum SendErrorKind {
    /// The queue had no room for the value.
    Full,
    /// The receiving side is gone.
    Disconnected,
}
147
148impl<T: core::any::Any> std::error::Error for SendError<T> {}
149
150impl<T> SendError<T> {
151    /// Returns `true` if this error is a result of the mpsc being full.
152    #[inline]
153    pub fn is_full(&self) -> bool {
154        matches!(self.kind, SendErrorKind::Full)
155    }
156
157    /// Returns `true` if this error is a result of the receiver being dropped.
158    #[inline]
159    pub fn is_disconnected(&self) -> bool {
160        matches!(self.kind, SendErrorKind::Disconnected)
161    }
162
163    /// Returns the message that was attempted to be sent but failed.
164    #[inline]
165    pub fn into_inner(self) -> Option<T> {
166        self.val
167    }
168}
169
170// Just a helper function to ensure the streams we're returning all have the
171// right implementations.
172#[inline]
173pub(crate) fn assert_stream<T, S>(stream: S) -> S
174where
175    S: Stream<Item = T>,
176{
177    stream
178}
179
180#[cfg(test)]
181use futures::pin_mut;
182#[cfg(test)]
183use futures::task::noop_waker;
184#[cfg(test)]
185use std::collections::VecDeque;
186#[cfg(test)]
187use std::sync::{Arc, Mutex};
188
189// ---------------------------------------------------------------------------
190// VecDeque-based helpers (no sharing — works when Q is not cloned)
191// ---------------------------------------------------------------------------
192
/// Test polling function for a plain `VecDeque<i32>`: always ready, yielding
/// the front element or `None` when the deque is empty.
#[cfg(test)]
fn poll_deque(pin_q: Pin<&mut VecDeque<i32>>, _cx: &mut Context<'_>) -> Poll<Option<i32>> {
    let deque = pin_q.get_mut();
    let front = deque.pop_front();
    Poll::Ready(front)
}
197
198// ---------------------------------------------------------------------------
199// SharedQueue — Arc-backed queue that shares data on Clone
200// Required by queue_channel() which clones the underlying Q.
201// ---------------------------------------------------------------------------
202
/// Test queue whose clones all point at the same `Arc<Mutex<VecDeque<i32>>>`,
/// so items pushed through one clone are visible through every other clone.
#[cfg(test)]
#[derive(Clone)]
struct SharedQueue(Arc<Mutex<VecDeque<i32>>>);
206
/// Function-pointer type of the polling function used with `SharedQueue`.
#[cfg(test)]
type SharedPollFn = fn(Pin<&mut SharedQueue>, &mut Context<'_>) -> Poll<Option<i32>>;
209
/// Function-pointer type of the sender-side handler used with a
/// `SharedQueue`-backed stream.
#[cfg(test)]
type SharedHandlerFn =
    fn(&mut QueueStream<SharedQueue, i32, SharedPollFn>, Action<i32>) -> Reply<i32>;
213
/// Test polling function for `SharedQueue`: locks the shared deque and is
/// always ready with its front element, or `None` when it is empty.
#[cfg(test)]
fn poll_shared(pin_q: Pin<&mut SharedQueue>, _cx: &mut Context<'_>) -> Poll<Option<i32>> {
    let shared = pin_q.get_mut();
    let mut deque = shared.0.lock().unwrap();
    Poll::Ready(deque.pop_front())
}
218
/// Test handler for a `SharedQueue`-backed stream.
///
/// `Send` appends the item to the shared deque and echoes it back; `IsFull`
/// always answers `false`; `IsEmpty` and `Len` report the deque's state.
#[cfg(test)]
fn push_shared(
    s: &mut QueueStream<SharedQueue, i32, SharedPollFn>,
    action: Action<i32>,
) -> Reply<i32> {
    match action {
        // No lock is taken here: the test queue never reports itself full.
        Action::IsFull => Reply::IsFull(false),
        Action::IsEmpty => {
            let empty = s.0.lock().unwrap().is_empty();
            Reply::IsEmpty(empty)
        }
        Action::Len => {
            let count = s.0.lock().unwrap().len();
            Reply::Len(count)
        }
        Action::Send(value) => {
            s.0.lock().unwrap().push_back(value);
            Reply::Send(value)
        }
    }
}
234
/// Builds a `(sender, stream)` pair over a fresh, empty `SharedQueue`, wired
/// up with `push_shared` as the handler and `poll_shared` as the poller.
#[cfg(test)]
fn make_shared_channel() -> (
    QueueSender<QueueStream<SharedQueue, i32, SharedPollFn>, i32, SharedHandlerFn, i32>,
    QueueStream<SharedQueue, i32, SharedPollFn>,
) {
    let storage = Arc::new(Mutex::new(VecDeque::new()));
    let handler = push_shared as SharedHandlerFn;
    let poller = poll_shared as SharedPollFn;
    queue_channel(SharedQueue(storage), handler, poller)
}
247
248// ---------------------------------------------------------------------------
249// queue_channel end-to-end
250// ---------------------------------------------------------------------------
251
/// Values sent through the sender come out of the stream in the same order,
/// followed by `None` once the queue is drained.
#[test]
fn queue_channel_send_receive() {
    let (mut tx, rx) = make_shared_channel();

    for value in 1..=3 {
        tx.try_send(value).unwrap();
    }

    pin_mut!(rx);
    let waker = noop_waker();
    let mut cx = Context::from_waker(&waker);

    for expected in [Some(1), Some(2), Some(3), None] {
        assert_eq!(rx.as_mut().poll_next(&mut cx), Poll::Ready(expected));
    }
}
269
/// A fresh sender is not full, and its length reflects a successful send.
#[test]
fn queue_channel_sender_is_full() {
    let (mut tx, _rx) = make_shared_channel();

    let full_before_send = tx.is_full();
    assert!(!full_before_send);

    tx.try_send(1).unwrap();
    let queued = tx.len();
    assert_eq!(queued, 1);
}
277
/// Dropping the only sender closes the stream, which then yields `None`.
#[test]
fn queue_channel_receiver_closed_when_sender_dropped() {
    let (tx, rx) = make_shared_channel();

    let closed_while_sender_alive = rx.is_closed();
    assert!(!closed_while_sender_alive);

    drop(tx);
    let closed_after_drop = rx.is_closed();
    assert!(closed_after_drop);

    pin_mut!(rx);
    let waker = noop_waker();
    let mut cx = Context::from_waker(&waker);
    let polled = rx.as_mut().poll_next(&mut cx);
    assert_eq!(polled, Poll::Ready(None));
}
290
291// ---------------------------------------------------------------------------
292// QueueExt trait
293// ---------------------------------------------------------------------------
294
/// `QueueExt::queue_stream` turns a `VecDeque` into a stream that yields its
/// elements front to back and then `None`.
#[test]
fn queue_ext_queue_stream() {
    type DequePollFn = fn(Pin<&mut VecDeque<i32>>, &mut Context<'_>) -> Poll<Option<i32>>;

    let q: VecDeque<i32> = VecDeque::from([5, 10]);
    let stream = q.queue_stream(poll_deque as DequePollFn);
    pin_mut!(stream);

    let waker = noop_waker();
    let mut cx = Context::from_waker(&waker);

    for expected in [Some(5), Some(10), None] {
        assert_eq!(stream.as_mut().poll_next(&mut cx), Poll::Ready(expected));
    }
}
310
/// `QueueExt::queue_sender` wraps a stream into a sender whose `try_send`
/// returns the handler's reply value.
#[test]
fn queue_ext_queue_sender() {
    let backing = SharedQueue(Arc::new(Mutex::new(VecDeque::new())));
    let stream_obj: QueueStream<SharedQueue, i32, SharedPollFn> =
        QueueStream::new(backing, poll_shared as SharedPollFn);

    let mut sender = stream_obj.queue_sender(push_shared as SharedHandlerFn);
    let outcome = sender.try_send(7);

    assert!(outcome.is_ok());
    assert_eq!(outcome.unwrap(), 7);
}
322
/// `QueueExt::queue_channel` yields a working pair: an item sent through the
/// sender is collected from the stream.
#[test]
fn queue_ext_queue_channel() {
    use futures::StreamExt;

    let storage = Arc::new(Mutex::new(VecDeque::new()));
    let q = SharedQueue(storage);
    let handler = push_shared as SharedHandlerFn;
    let poller = poll_shared as SharedPollFn;

    let (mut tx, rx) = q.queue_channel(handler, poller);
    tx.try_send(100).unwrap();

    let collected = futures::executor::block_on(rx.collect::<Vec<i32>>());
    assert_eq!(collected, vec![100]);
}
334
335// ---------------------------------------------------------------------------
336// Multiple senders
337// ---------------------------------------------------------------------------
338
/// Cloned senders feed the same queue: all three items are counted by `len`
/// and arrive at the stream in send order.
#[test]
fn multiple_senders() {
    let (mut first, rx) = make_shared_channel();
    let mut second = first.clone();
    let mut third = first.clone();

    first.try_send(1).unwrap();
    second.try_send(2).unwrap();
    third.try_send(3).unwrap();

    assert_eq!(first.len(), 3);

    pin_mut!(rx);
    let waker = noop_waker();
    let mut cx = Context::from_waker(&waker);

    for expected in 1..=3 {
        assert_eq!(
            rx.as_mut().poll_next(&mut cx),
            Poll::Ready(Some(expected))
        );
    }
}
359
/// The stream stays open while any sender clone is alive and reports closed
/// only after the last one is dropped.
#[test]
fn channel_closes_only_when_last_sender_dropped() {
    let (first, rx) = make_shared_channel();
    let second = first.clone();
    let third = first.clone();

    assert!(!rx.is_closed());

    drop(first);
    assert!(!rx.is_closed(), "still have tx2, tx3");

    drop(second);
    assert!(!rx.is_closed(), "still have tx3");

    drop(third);
    assert!(rx.is_closed(), "all senders gone");
}
374
375// ---------------------------------------------------------------------------
376// Waker trait impl on QueueStream
377// ---------------------------------------------------------------------------
378
/// Calling `rx_wake` on a stream that nobody is waiting on must not panic.
#[test]
fn queue_stream_waker_rx_wake() {
    type DequePollFn = fn(Pin<&mut VecDeque<i32>>, &mut Context<'_>) -> Poll<Option<i32>>;

    let stream: QueueStream<VecDeque<i32>, i32, DequePollFn> =
        QueueStream::new(VecDeque::new(), poll_deque);
    stream.rx_wake();
}
388
/// Through the `Waker` trait, a fresh stream is open and `close_channel`
/// flips it to closed.
#[test]
fn queue_stream_waker_is_closed() {
    type DequePollFn = fn(Pin<&mut VecDeque<i32>>, &mut Context<'_>) -> Poll<Option<i32>>;

    let stream: QueueStream<VecDeque<i32>, i32, DequePollFn> =
        QueueStream::new(VecDeque::new(), poll_deque);

    let closed_initially = Waker::is_closed(&stream);
    assert!(!closed_initially);

    stream.close_channel();
    let closed_afterwards = Waker::is_closed(&stream);
    assert!(closed_afterwards);
}