1use std::fmt;
2use std::marker::PhantomData;
3use std::pin::Pin;
4use std::sync::atomic::{AtomicUsize, Ordering};
5use std::sync::Arc;
6use std::task::{Context, Poll};
7
8use futures::Sink;
9use pin_project::{pin_project, pinned_drop};
10
11use super::{Action, Reply, SendError, TrySendError, Waker};
12
13#[pin_project(PinnedDrop)]
14pub struct QueueSender<S: Waker, Item, F, R> {
15 #[pin]
16 s: S,
17 #[pin]
18 f: F,
19 num_senders: Arc<AtomicUsize>,
20 _item: PhantomData<Item>,
21 _r: PhantomData<R>,
22}
23
24unsafe impl<S: Waker, Item, F, R> Sync for QueueSender<S, Item, F, R> {}
25
26unsafe impl<S: Waker, Item, F, R> Send for QueueSender<S, Item, F, R> {}
27
28impl<S, Item, F, R> Clone for QueueSender<S, Item, F, R>
29where
30 S: Clone + Waker,
31 F: Clone,
32{
33 #[inline]
34 fn clone(&self) -> Self {
35 self.num_senders.fetch_add(1, Ordering::SeqCst);
37 Self {
39 s: self.s.clone(),
40 f: self.f.clone(),
41 num_senders: self.num_senders.clone(),
42 _item: PhantomData,
43 _r: PhantomData,
44 }
45 }
46}
47
48#[pinned_drop]
49impl<S: Waker, Item, F, R> PinnedDrop for QueueSender<S, Item, F, R> {
50 fn drop(self: Pin<&mut Self>) {
51 self.set_closed();
52 }
53}
54
55impl<S, Item, F, R> fmt::Debug for QueueSender<S, Item, F, R>
56where
57 S: fmt::Debug + Waker,
58{
59 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
60 f.debug_struct("QueueSender")
61 .field("stream", &self.s)
62 .finish()
63 }
64}
65
66impl<S, Item, F, R> QueueSender<S, Item, F, R>
67where
68 S: Waker,
69{
70 #[inline]
71 fn set_closed(&self) -> usize {
72 let prev = self.num_senders.fetch_sub(1, Ordering::SeqCst);
73 if prev == 1 {
74 self.s.close_channel();
75 }
76 prev
77 }
78}
79
80impl<S, Item, F, R> QueueSender<S, Item, F, R>
81where
82 S: Waker,
83 F: Fn(&mut S, Action<Item>) -> Reply<R>,
84{
85 #[inline]
86 pub(super) fn new(s: S, f: F) -> Self {
87 Self {
88 s,
89 f,
90 num_senders: Arc::new(AtomicUsize::new(1)),
91 _item: PhantomData,
92 _r: PhantomData,
93 }
94 }
95
96 #[inline]
97 pub fn try_send(&mut self, item: Item) -> Result<R, TrySendError<Item>> {
98 if self.s.is_closed() {
99 return Err(SendError::disconnected(Some(item)));
100 }
101 if self.is_full() {
102 return Err(TrySendError::full(item));
103 }
104 let res = (self.f)(&mut self.s, Action::Send(item));
105 self.s.rx_wake();
106 if let Reply::Send(r) = res {
107 Ok(r)
108 } else {
109 unreachable!()
110 }
111 }
112
113 #[inline]
114 pub fn is_full(&mut self) -> bool {
115 match (self.f)(&mut self.s, Action::IsFull) {
116 Reply::IsFull(reply) => reply,
117 _ => unreachable!(),
118 }
119 }
120
121 #[inline]
122 pub fn is_empty(&mut self) -> bool {
123 match (self.f)(&mut self.s, Action::IsEmpty) {
124 Reply::IsEmpty(reply) => reply,
125 _ => unreachable!(),
126 }
127 }
128
129 #[inline]
130 pub fn len(&mut self) -> usize {
131 match (self.f)(&mut self.s, Action::Len) {
132 Reply::Len(reply) => reply,
133 _ => unreachable!(),
134 }
135 }
136}
137
138impl<S, Item, F, R> Sink<Item> for QueueSender<S, Item, F, R>
139where
140 S: Waker + Unpin,
141 F: Fn(&mut S, Action<Item>) -> Reply<R>,
142{
143 type Error = SendError<Item>;
144
145 fn poll_ready(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
146 if self.s.is_closed() {
147 return Poll::Ready(Err(SendError::disconnected(None)));
148 }
149 let mut this = self.project();
150 match (this.f)(&mut this.s, Action::IsFull) {
151 Reply::IsFull(true) => {
152 this.s.tx_park(cx.waker().clone());
153 Poll::Pending
154 }
155 Reply::IsFull(false) => Poll::Ready(Ok(())),
156 _ => unreachable!(),
157 }
158 }
159
160 fn start_send(self: Pin<&mut Self>, item: Item) -> Result<(), Self::Error> {
161 if self.s.is_closed() {
162 return Err(SendError::disconnected(Some(item)));
163 }
164 let mut this = self.project();
165 let _ = (this.f)(&mut this.s, Action::Send(item));
166 this.s.rx_wake();
167 Ok(())
168 }
169
170 fn poll_flush(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
171 if self.s.is_closed() {
172 return Poll::Ready(Err(SendError::disconnected(None)));
173 }
174 Poll::Ready(Ok(()))
175 }
176
177 fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
178 if self.s.is_closed() {
179 return Poll::Ready(Err(SendError::disconnected(None)));
180 }
181 if self.set_closed() > 1 {
182 return Poll::Ready(Ok(()));
183 }
184
185 let mut this = self.project();
186 match (this.f)(&mut this.s, Action::IsEmpty) {
187 Reply::IsEmpty(true) => Poll::Ready(Ok(())),
188 Reply::IsEmpty(false) => {
189 this.s.tx_park(cx.waker().clone());
190 Poll::Pending
191 }
192 _ => unreachable!(),
193 }
194 }
195}
196
197impl<S: Unpin + Waker, Item, F, R> std::convert::AsMut<S> for QueueSender<S, Item, F, R> {
198 #[inline]
199 fn as_mut(&mut self) -> &mut S {
200 &mut self.s
201 }
202}
203
204impl<S: Waker, Item, F, R> std::convert::AsRef<S> for QueueSender<S, Item, F, R> {
205 #[inline]
206 fn as_ref(&self) -> &S {
207 &self.s
208 }
209}