1use std::fmt;
2use std::marker::Unpin;
3use std::pin::Pin;
4use std::task::{Context, Poll};
5
6use futures::Stream;
7
8#[allow(unreachable_pub)]
9pub use self::queue_sender::QueueSender;
10#[allow(unreachable_pub)]
11pub use self::queue_stream::QueueStream;
12
13mod queue_sender;
14mod queue_stream;
15
/// Wake-up and shutdown hooks required of a queue wrapper before it can be
/// passed to [`QueueExt::queue_sender`] (that method carries a `Self: Waker`
/// bound).
///
/// Note that this trait shares its name with [`std::task::Waker`]; the
/// standard-library type is always spelled with its full path here.
///
/// NOTE(review): the per-method descriptions are inferred from the method
/// names and signatures only — confirm them against the implementors.
pub trait Waker {
    /// Presumably wakes the task parked on the receiving side.
    fn rx_wake(&self);

    /// Hands a task waker over to the implementor; presumably it is stored so
    /// the sending task can be woken later.
    fn tx_park(&self, waker: std::task::Waker);

    /// Presumably marks the channel as closed.
    fn close_channel(&self);

    /// Reports whether the channel is closed.
    fn is_closed(&self) -> bool;
}
22
23impl<T: ?Sized> QueueExt for T {}
24
25pub trait QueueExt {
26 #[inline]
27 fn queue_stream<Item, F>(self, f: F) -> QueueStream<Self, Item, F>
28 where
29 Self: Sized + Unpin,
30 F: Fn(Pin<&mut Self>, &mut Context<'_>) -> Poll<Option<Item>>,
31 {
32 assert_stream::<Item, _>(QueueStream::new(self, f))
33 }
34
35 #[inline]
36 fn queue_sender<Item, F, R>(self, f: F) -> QueueSender<Self, Item, F, R>
37 where
38 Self: Sized + Waker,
39 F: Fn(&mut Self, Action<Item>) -> Reply<R>,
40 {
41 QueueSender::new(self, f)
42 }
43
44 #[inline]
45 #[allow(clippy::type_complexity)]
46 fn queue_channel<Item, F1, R, F2>(
47 self,
48 f1: F1,
49 f2: F2,
50 ) -> (
51 QueueSender<QueueStream<Self, Item, F2>, Item, F1, R>,
52 QueueStream<Self, Item, F2>,
53 )
54 where
55 Self: Sized + Unpin + Clone,
56 F1: Fn(&mut QueueStream<Self, Item, F2>, Action<Item>) -> Reply<R>,
57 F2: Fn(Pin<&mut Self>, &mut Context<'_>) -> Poll<Option<Item>> + Clone + Unpin,
58 {
59 queue_channel(self, f1, f2)
60 }
61}
62
63#[allow(clippy::type_complexity)]
64#[inline]
65pub fn queue_channel<Q, Item, F1, R, F2>(
66 q: Q,
67 f1: F1,
68 f2: F2,
69) -> (
70 QueueSender<QueueStream<Q, Item, F2>, Item, F1, R>,
71 QueueStream<Q, Item, F2>,
72)
73where
74 Q: Sized + Unpin + Clone,
75 F1: Fn(&mut QueueStream<Q, Item, F2>, Action<Item>) -> Reply<R>,
76 F2: Fn(Pin<&mut Q>, &mut Context<'_>) -> Poll<Option<Item>> + Clone + Unpin,
77{
78 let rx = QueueStream::new(q, f2);
79 let tx = QueueSender::new(rx.clone(), f1);
80 (tx, rx)
81}
82
/// A request passed to the sender-side closure (the `F` / `F1` parameter of
/// `QueueExt::queue_sender` and `queue_channel`), which answers it with a
/// [`Reply`].
///
/// The derives are conditional on `Item`, so they place no new requirement on
/// existing users; they make the type printable and comparable.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum Action<Item> {
    /// Carries an item to be sent.
    Send(Item),
    /// Query matched by the `Reply::IsFull` variant.
    IsFull,
    /// Query matched by the `Reply::IsEmpty` variant.
    IsEmpty,
    /// Query matched by the `Reply::Len` variant.
    Len,
}
89
/// The answer returned by the sender-side closure (the `F` / `F1` parameter
/// of `QueueExt::queue_sender` and `queue_channel`) for an [`Action`].
///
/// Each variant mirrors the `Action` variant of the same name. The derives
/// are conditional on `R`, so they place no new requirement on existing
/// users; they make the type printable and comparable.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum Reply<R> {
    /// Result of an `Action::Send`; the payload type is chosen by the closure.
    Send(R),
    /// Boolean answer to `Action::IsFull`.
    IsFull(bool),
    /// Boolean answer to `Action::IsEmpty`.
    IsEmpty(bool),
    /// Numeric answer to `Action::Len`.
    Len(usize),
}
96
/// Alias of [`SendError`]; both names denote the same type.
pub type TrySendError<T> = SendError<T>;

/// Error describing a failed send, optionally carrying the value that could
/// not be delivered.
#[derive(Clone, PartialEq, Eq)]
pub struct SendError<T> {
    /// Why the send failed.
    kind: SendErrorKind,
    /// The rejected value, when there is one to hand back.
    val: Option<T>,
}

/// The reason recorded inside a [`SendError`].
// NOTE(review): the reason for `allow(dead_code)` is not visible in this file;
// presumably a variant is unused in some build configurations — confirm.
#[allow(dead_code)]
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum SendErrorKind {
    /// Reported by [`SendError::is_full`].
    Full,
    /// Reported by [`SendError::is_disconnected`].
    Disconnected,
}

impl<T> SendError<T> {
    /// Creates a "full" error that keeps hold of the rejected value `val`.
    #[inline]
    pub fn full(val: T) -> Self {
        Self {
            kind: SendErrorKind::Full,
            val: Some(val),
        }
    }

    /// Creates a "disconnected" error; `val` is the rejected value, if any.
    #[inline]
    pub fn disconnected(val: Option<T>) -> Self {
        Self {
            kind: SendErrorKind::Disconnected,
            val,
        }
    }

    /// Returns `true` when the error kind is [`SendErrorKind::Full`].
    #[inline]
    pub fn is_full(&self) -> bool {
        self.kind == SendErrorKind::Full
    }

    /// Returns `true` when the error kind is [`SendErrorKind::Disconnected`].
    #[inline]
    pub fn is_disconnected(&self) -> bool {
        self.kind == SendErrorKind::Disconnected
    }

    /// Consumes the error and returns the value it was carrying, if any.
    #[inline]
    pub fn into_inner(self) -> Option<T> {
        self.val
    }
}

/// Prints only the error kind, so `T` does not need to implement `Debug`.
impl<T> fmt::Debug for SendError<T> {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let mut out = f.debug_struct("SendError");
        out.field("kind", &self.kind);
        out.finish()
    }
}

/// Human-readable description of the failure, chosen by error kind.
impl<T> fmt::Display for SendError<T> {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        // Exhaustive match: a new `SendErrorKind` variant forces a decision here.
        let reason = match self.kind {
            SendErrorKind::Full => "send failed because mpsc is full",
            SendErrorKind::Disconnected => "send failed because receiver is gone",
        };
        f.write_str(reason)
    }
}

impl<T: std::any::Any> std::error::Error for SendError<T> {}
169
170#[inline]
173pub(crate) fn assert_stream<T, S>(stream: S) -> S
174where
175 S: Stream<Item = T>,
176{
177 stream
178}