queue_ext/
lib.rs

1use std::fmt;
2use std::marker::Unpin;
3use std::pin::Pin;
4use std::task::{Context, Poll};
5
6use futures::Stream;
7
8#[allow(unreachable_pub)]
9pub use self::queue_sender::QueueSender;
10#[allow(unreachable_pub)]
11pub use self::queue_stream::QueueStream;
12
13mod queue_sender;
14mod queue_stream;
15
16pub trait Waker {
17    fn rx_wake(&self);
18    fn tx_park(&self, w: std::task::Waker);
19    fn close_channel(&self);
20    fn is_closed(&self) -> bool;
21}
22
23impl<T: ?Sized> QueueExt for T {}
24
25pub trait QueueExt {
26    #[inline]
27    fn queue_stream<Item, F>(self, f: F) -> QueueStream<Self, Item, F>
28    where
29        Self: Sized + Unpin,
30        F: Fn(Pin<&mut Self>, &mut Context<'_>) -> Poll<Option<Item>>,
31    {
32        assert_stream::<Item, _>(QueueStream::new(self, f))
33    }
34
35    #[inline]
36    fn queue_sender<Item, F, R>(self, f: F) -> QueueSender<Self, Item, F, R>
37    where
38        Self: Sized + Waker,
39        F: Fn(&mut Self, Action<Item>) -> Reply<R>,
40    {
41        QueueSender::new(self, f)
42    }
43
44    #[inline]
45    #[allow(clippy::type_complexity)]
46    fn queue_channel<Item, F1, R, F2>(
47        self,
48        f1: F1,
49        f2: F2,
50    ) -> (
51        QueueSender<QueueStream<Self, Item, F2>, Item, F1, R>,
52        QueueStream<Self, Item, F2>,
53    )
54    where
55        Self: Sized + Unpin + Clone,
56        F1: Fn(&mut QueueStream<Self, Item, F2>, Action<Item>) -> Reply<R>,
57        F2: Fn(Pin<&mut Self>, &mut Context<'_>) -> Poll<Option<Item>> + Clone + Unpin,
58    {
59        queue_channel(self, f1, f2)
60    }
61}
62
63#[allow(clippy::type_complexity)]
64#[inline]
65pub fn queue_channel<Q, Item, F1, R, F2>(
66    q: Q,
67    f1: F1,
68    f2: F2,
69) -> (
70    QueueSender<QueueStream<Q, Item, F2>, Item, F1, R>,
71    QueueStream<Q, Item, F2>,
72)
73where
74    Q: Sized + Unpin + Clone,
75    F1: Fn(&mut QueueStream<Q, Item, F2>, Action<Item>) -> Reply<R>,
76    F2: Fn(Pin<&mut Q>, &mut Context<'_>) -> Poll<Option<Item>> + Clone + Unpin,
77{
78    let rx = QueueStream::new(q, f2);
79    let tx = QueueSender::new(rx.clone(), f1);
80    (tx, rx)
81}
82
83pub enum Action<Item> {
84    Send(Item),
85    IsFull,
86    IsEmpty,
87    Len,
88}
89
90pub enum Reply<R> {
91    Send(R),
92    IsFull(bool),
93    IsEmpty(bool),
94    Len(usize),
95}
96
97pub type TrySendError<T> = SendError<T>;
98
99#[derive(Clone, PartialEq, Eq)]
100pub struct SendError<T> {
101    kind: SendErrorKind,
102    val: Option<T>,
103}
104
105impl<T> SendError<T> {
106    #[inline]
107    pub fn full(val: T) -> Self {
108        SendError {
109            kind: SendErrorKind::Full,
110            val: Some(val),
111        }
112    }
113
114    #[inline]
115    pub fn disconnected(val: Option<T>) -> Self {
116        SendError {
117            kind: SendErrorKind::Disconnected,
118            val,
119        }
120    }
121}
122
123impl<T> fmt::Debug for SendError<T> {
124    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
125        f.debug_struct("SendError")
126            .field("kind", &self.kind)
127            .finish()
128    }
129}
130
131impl<T> fmt::Display for SendError<T> {
132    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
133        if self.is_full() {
134            write!(f, "send failed because mpsc is full")
135        } else {
136            write!(f, "send failed because receiver is gone")
137        }
138    }
139}
140
141#[allow(dead_code)]
142#[derive(Clone, Debug, PartialEq, Eq)]
143pub enum SendErrorKind {
144    Full,
145    Disconnected,
146}
147
148impl<T: core::any::Any> std::error::Error for SendError<T> {}
149
150impl<T> SendError<T> {
151    /// Returns `true` if this error is a result of the mpsc being full.
152    #[inline]
153    pub fn is_full(&self) -> bool {
154        matches!(self.kind, SendErrorKind::Full)
155    }
156
157    /// Returns `true` if this error is a result of the receiver being dropped.
158    #[inline]
159    pub fn is_disconnected(&self) -> bool {
160        matches!(self.kind, SendErrorKind::Disconnected)
161    }
162
163    /// Returns the message that was attempted to be sent but failed.
164    #[inline]
165    pub fn into_inner(self) -> Option<T> {
166        self.val
167    }
168}
169
170// Just a helper function to ensure the streams we're returning all have the
171// right implementations.
172#[inline]
173pub(crate) fn assert_stream<T, S>(stream: S) -> S
174where
175    S: Stream<Item = T>,
176{
177    stream
178}