Skip to main content

wreq_proto/
dispatch.rs

1use std::{
2    future::Future,
3    pin::Pin,
4    task::{Context, Poll},
5};
6
7use http::{Request, Response};
8use http_body::Body;
9use pin_project_lite::pin_project;
10use tokio::sync::{mpsc, oneshot};
11
12use super::{body::Incoming, proto::http2::client::ResponseFutMap, Error};
13
14type RetryPromise<T, U> = oneshot::Receiver<Result<U, TrySendError<T>>>;
15
16pub(crate) fn channel<T, U>() -> (Sender<T, U>, Receiver<T, U>) {
17    let (tx, rx) = mpsc::unbounded_channel();
18    let (giver, taker) = want::new();
19    (
20        Sender {
21            buffered_once: false,
22            giver,
23            inner: tx,
24        },
25        Receiver { inner: rx, taker },
26    )
27}
28
29/// An error when calling `try_send_request`.
30///
31/// There is a possibility of an error occurring on a connection in-between the
32/// time that a request is queued and when it is actually written to the IO
33/// transport. If that happens, it is safe to return the request back to the
34/// caller, as it was never fully sent.
35#[derive(Debug)]
36pub struct TrySendError<T> {
37    pub(crate) error: Error,
38    pub(crate) message: Option<T>,
39}
40
41/// A bounded sender of requests and callbacks for when responses are ready.
42///
43/// While the inner sender is unbounded, the Giver is used to determine
44/// if the Receiver is ready for another request.
45pub(crate) struct Sender<T, U> {
46    /// One message is always allowed, even if the Receiver hasn't asked
47    /// for it yet. This boolean keeps track of whether we've sent one
48    /// without notice.
49    buffered_once: bool,
50    /// The Giver helps watch that the Receiver side has been polled
51    /// when the queue is empty. This helps us know when a request and
52    /// response have been fully processed, and a connection is ready
53    /// for more.
54    giver: want::Giver,
55    /// Actually bounded by the Giver, plus `buffered_once`.
56    inner: mpsc::UnboundedSender<Envelope<T, U>>,
57}
58
59/// An unbounded version.
60///
61/// Cannot poll the Giver, but can still use it to determine if the Receiver
62/// has been dropped. However, this version can be cloned.
63pub(crate) struct UnboundedSender<T, U> {
64    /// Only used for `is_closed`, since mpsc::UnboundedSender cannot be checked.
65    giver: want::SharedGiver,
66    inner: mpsc::UnboundedSender<Envelope<T, U>>,
67}
68
69impl<T, U> Sender<T, U> {
70    #[inline]
71    pub(crate) fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<super::Result<()>> {
72        self.giver.poll_want(cx).map_err(|_| Error::new_closed())
73    }
74
75    #[inline]
76    pub(crate) fn is_ready(&self) -> bool {
77        self.giver.is_wanting()
78    }
79
80    pub(crate) fn try_send(&mut self, val: T) -> Result<RetryPromise<T, U>, T> {
81        if self.giver.give() || !self.buffered_once {
82            // If the receiver is ready *now*, then of course we can send.
83            //
84            // If the receiver isn't ready yet, but we don't have anything
85            // in the channel yet, then allow one message.
86            self.buffered_once = true;
87        } else {
88            return Err(val);
89        };
90
91        let (tx, rx) = oneshot::channel();
92        self.inner
93            .send(Envelope(Some((val, Callback(Some(tx))))))
94            .map(move |_| rx)
95            .map_err(|mut e| (e.0).0.take().expect("envelope not dropped").0)
96    }
97
98    #[inline]
99    pub(crate) fn unbound(self) -> UnboundedSender<T, U> {
100        UnboundedSender {
101            giver: self.giver.shared(),
102            inner: self.inner,
103        }
104    }
105}
106
107impl<T, U> UnboundedSender<T, U> {
108    #[inline]
109    pub(crate) fn is_ready(&self) -> bool {
110        !self.giver.is_canceled()
111    }
112
113    #[inline]
114    pub(crate) fn is_closed(&self) -> bool {
115        self.giver.is_canceled()
116    }
117
118    pub(crate) fn try_send(&mut self, val: T) -> Result<RetryPromise<T, U>, T> {
119        let (tx, rx) = oneshot::channel();
120        self.inner
121            .send(Envelope(Some((val, Callback(Some(tx))))))
122            .map(move |_| rx)
123            .map_err(|mut e| (e.0).0.take().expect("envelope not dropped").0)
124    }
125}
126
127impl<T, U> Clone for UnboundedSender<T, U> {
128    #[inline]
129    fn clone(&self) -> Self {
130        UnboundedSender {
131            giver: self.giver.clone(),
132            inner: self.inner.clone(),
133        }
134    }
135}
136
137pub(crate) struct Receiver<T, U> {
138    inner: mpsc::UnboundedReceiver<Envelope<T, U>>,
139    taker: want::Taker,
140}
141
142impl<T, U> Receiver<T, U> {
143    pub(crate) fn poll_recv(&mut self, cx: &mut Context<'_>) -> Poll<Option<(T, Callback<T, U>)>> {
144        match self.inner.poll_recv(cx) {
145            Poll::Ready(item) => {
146                Poll::Ready(item.map(|mut env| env.0.take().expect("envelope not dropped")))
147            }
148            Poll::Pending => {
149                self.taker.want();
150                Poll::Pending
151            }
152        }
153    }
154
155    #[inline]
156    pub(crate) fn close(&mut self) {
157        self.taker.cancel();
158        self.inner.close();
159    }
160
161    #[inline]
162    pub(crate) fn try_recv(&mut self) -> Option<(T, Callback<T, U>)> {
163        use futures_util::FutureExt;
164        match self.inner.recv().now_or_never() {
165            Some(Some(mut env)) => env.0.take(),
166            _ => None,
167        }
168    }
169}
170
171impl<T, U> Drop for Receiver<T, U> {
172    #[inline]
173    fn drop(&mut self) {
174        // Notify the giver about the closure first, before dropping
175        // the mpsc::Receiver.
176        self.taker.cancel();
177    }
178}
179
180struct Envelope<T, U>(Option<(T, Callback<T, U>)>);
181
182impl<T, U> Drop for Envelope<T, U> {
183    fn drop(&mut self) {
184        if let Some((val, cb)) = self.0.take() {
185            cb.send(Err(TrySendError {
186                error: Error::new_canceled().with("connection closed"),
187                message: Some(val),
188            }));
189        }
190    }
191}
192
193pub(crate) struct Callback<T, U>(Option<oneshot::Sender<Result<U, TrySendError<T>>>>);
194
195impl<T, U> Drop for Callback<T, U> {
196    fn drop(&mut self) {
197        if let Some(tx) = self.0.take() {
198            let _ = tx.send(Err(TrySendError {
199                error: dispatch_gone(),
200                message: None,
201            }));
202        }
203    }
204}
205
206#[cold]
207fn dispatch_gone() -> Error {
208    // FIXME(nox): What errors do we want here?
209    Error::new_user_dispatch_gone().with(if std::thread::panicking() {
210        "user code panicked"
211    } else {
212        "runtime dropped the dispatch task"
213    })
214}
215
216impl<T, U> Callback<T, U> {
217    const MISSING_SENDER: &'static str = "callback sender missing";
218
219    #[inline]
220    pub(crate) fn is_canceled(&self) -> bool {
221        self.0.as_ref().expect(Self::MISSING_SENDER).is_closed()
222    }
223
224    #[inline]
225    pub(crate) fn poll_canceled(&mut self, cx: &mut Context<'_>) -> Poll<()> {
226        self.0.as_mut().expect(Self::MISSING_SENDER).poll_closed(cx)
227    }
228
229    #[inline]
230    pub(crate) fn send(mut self, val: Result<U, TrySendError<T>>) {
231        let _ = self.0.take().expect(Self::MISSING_SENDER).send(val);
232    }
233}
234
235impl<T> TrySendError<T> {
236    /// Take the message from this error.
237    ///
238    /// The message will not always have been recovered. If an error occurs
239    /// after the message has been serialized onto the connection, it will not
240    /// be available here.
241    #[inline]
242    pub fn take_message(&mut self) -> Option<T> {
243        self.message.take()
244    }
245
246    /// Consumes this to return the inner error.
247    #[inline]
248    pub fn into_error(self) -> Error {
249        self.error
250    }
251}
252
253pin_project! {
254    pub(crate) struct SendWhen<B>
255    where
256        B: Body,
257        B: 'static,
258    {
259        #[pin]
260        pub(crate) when: ResponseFutMap<B>,
261        #[pin]
262        pub(crate) call_back: Option<Callback<Request<B>, Response<Incoming>>>,
263    }
264}
265
266impl<B> Future for SendWhen<B>
267where
268    B: Body + 'static,
269    B::Data: Send,
270{
271    type Output = ();
272
273    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
274        let mut this = self.project();
275        let mut call_back = this.call_back.take().expect("polled after complete");
276
277        match Pin::new(&mut this.when).poll(cx) {
278            Poll::Ready(Ok(res)) => {
279                call_back.send(Ok(res));
280                Poll::Ready(())
281            }
282            Poll::Pending => {
283                // check if the callback is canceled
284                match call_back.poll_canceled(cx) {
285                    Poll::Ready(v) => v,
286                    Poll::Pending => {
287                        // Move call_back back to struct before return
288                        this.call_back.set(Some(call_back));
289                        return Poll::Pending;
290                    }
291                };
292                trace!("send_when canceled");
293                // Tell pipe_task to reset the h2 stream so that
294                // RST_STREAM is sent and flow-control capacity freed.
295                this.when.as_mut().cancel();
296                Poll::Ready(())
297            }
298            Poll::Ready(Err((error, message))) => {
299                call_back.send(Err(TrySendError { error, message }));
300                Poll::Ready(())
301            }
302        }
303    }
304}