queue_ext/
queue_sender.rs

1use std::fmt;
2use std::marker::PhantomData;
3use std::pin::Pin;
4use std::sync::atomic::{AtomicUsize, Ordering};
5use std::sync::Arc;
6use std::task::{Context, Poll};
7
8use futures::Sink;
9use pin_project::{pin_project, pinned_drop};
10
11use super::{Action, Reply, SendError, TrySendError, Waker};
12
13#[pin_project(PinnedDrop)]
14pub struct QueueSender<S: Waker, Item, F, R> {
15    #[pin]
16    s: S,
17    #[pin]
18    f: F,
19    num_senders: Arc<AtomicUsize>,
20    _item: PhantomData<Item>,
21    _r: PhantomData<R>,
22}
23
24unsafe impl<S: Waker, Item, F, R> Sync for QueueSender<S, Item, F, R> {}
25
26unsafe impl<S: Waker, Item, F, R> Send for QueueSender<S, Item, F, R> {}
27
28impl<S, Item, F, R> Clone for QueueSender<S, Item, F, R>
29where
30    S: Clone + Waker,
31    F: Clone,
32{
33    #[inline]
34    fn clone(&self) -> Self {
35        //if !self.s.is_closed() {
36        self.num_senders.fetch_add(1, Ordering::SeqCst);
37        //}
38        Self {
39            s: self.s.clone(),
40            f: self.f.clone(),
41            num_senders: self.num_senders.clone(),
42            _item: PhantomData,
43            _r: PhantomData,
44        }
45    }
46}
47
48#[pinned_drop]
49impl<S: Waker, Item, F, R> PinnedDrop for QueueSender<S, Item, F, R> {
50    fn drop(self: Pin<&mut Self>) {
51        self.set_closed();
52    }
53}
54
55impl<S, Item, F, R> fmt::Debug for QueueSender<S, Item, F, R>
56where
57    S: fmt::Debug + Waker,
58{
59    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
60        f.debug_struct("QueueSender")
61            .field("stream", &self.s)
62            .finish()
63    }
64}
65
66impl<S, Item, F, R> QueueSender<S, Item, F, R>
67where
68    S: Waker,
69{
70    #[inline]
71    fn set_closed(&self) -> usize {
72        let prev = self.num_senders.fetch_sub(1, Ordering::SeqCst);
73        if prev == 1 {
74            self.s.close_channel();
75        }
76        prev
77    }
78}
79
80impl<S, Item, F, R> QueueSender<S, Item, F, R>
81where
82    S: Waker,
83    F: Fn(&mut S, Action<Item>) -> Reply<R>,
84{
85    #[inline]
86    pub(super) fn new(s: S, f: F) -> Self {
87        Self {
88            s,
89            f,
90            num_senders: Arc::new(AtomicUsize::new(1)),
91            _item: PhantomData,
92            _r: PhantomData,
93        }
94    }
95
96    #[inline]
97    pub fn try_send(&mut self, item: Item) -> Result<R, TrySendError<Item>> {
98        if self.s.is_closed() {
99            return Err(SendError::disconnected(Some(item)));
100        }
101        if self.is_full() {
102            return Err(TrySendError::full(item));
103        }
104        let res = (self.f)(&mut self.s, Action::Send(item));
105        self.s.rx_wake();
106        if let Reply::Send(r) = res {
107            Ok(r)
108        } else {
109            unreachable!()
110        }
111    }
112
113    #[inline]
114    pub fn is_full(&mut self) -> bool {
115        match (self.f)(&mut self.s, Action::IsFull) {
116            Reply::IsFull(reply) => reply,
117            _ => unreachable!(),
118        }
119    }
120
121    #[inline]
122    pub fn is_empty(&mut self) -> bool {
123        match (self.f)(&mut self.s, Action::IsEmpty) {
124            Reply::IsEmpty(reply) => reply,
125            _ => unreachable!(),
126        }
127    }
128
129    #[inline]
130    pub fn len(&mut self) -> usize {
131        match (self.f)(&mut self.s, Action::Len) {
132            Reply::Len(reply) => reply,
133            _ => unreachable!(),
134        }
135    }
136}
137
138impl<S, Item, F, R> Sink<Item> for QueueSender<S, Item, F, R>
139where
140    S: Waker + Unpin,
141    F: Fn(&mut S, Action<Item>) -> Reply<R>,
142{
143    type Error = SendError<Item>;
144
145    fn poll_ready(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
146        if self.s.is_closed() {
147            return Poll::Ready(Err(SendError::disconnected(None)));
148        }
149        let mut this = self.project();
150        match (this.f)(&mut this.s, Action::IsFull) {
151            Reply::IsFull(true) => {
152                this.s.tx_park(cx.waker().clone());
153                Poll::Pending
154            }
155            Reply::IsFull(false) => Poll::Ready(Ok(())),
156            _ => unreachable!(),
157        }
158    }
159
160    fn start_send(self: Pin<&mut Self>, item: Item) -> Result<(), Self::Error> {
161        if self.s.is_closed() {
162            return Err(SendError::disconnected(Some(item)));
163        }
164        let mut this = self.project();
165        let _ = (this.f)(&mut this.s, Action::Send(item));
166        this.s.rx_wake();
167        Ok(())
168    }
169
170    fn poll_flush(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
171        if self.s.is_closed() {
172            return Poll::Ready(Err(SendError::disconnected(None)));
173        }
174        Poll::Ready(Ok(()))
175    }
176
177    fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
178        if self.s.is_closed() {
179            return Poll::Ready(Err(SendError::disconnected(None)));
180        }
181        if self.set_closed() > 1 {
182            return Poll::Ready(Ok(()));
183        }
184
185        let mut this = self.project();
186        match (this.f)(&mut this.s, Action::IsEmpty) {
187            Reply::IsEmpty(true) => Poll::Ready(Ok(())),
188            Reply::IsEmpty(false) => {
189                this.s.tx_park(cx.waker().clone());
190                Poll::Pending
191            }
192            _ => unreachable!(),
193        }
194    }
195}
196
197impl<S: Unpin + Waker, Item, F, R> std::convert::AsMut<S> for QueueSender<S, Item, F, R> {
198    #[inline]
199    fn as_mut(&mut self) -> &mut S {
200        &mut self.s
201    }
202}
203
204impl<S: Waker, Item, F, R> std::convert::AsRef<S> for QueueSender<S, Item, F, R> {
205    #[inline]
206    fn as_ref(&self) -> &S {
207        &self.s
208    }
209}