lambda_runtime_api_client/body/
sender.rs

1//! Body::channel utilities. Extracted from Hyper under MIT license.
2//! <https://github.com/hyperium/hyper/blob/master/LICENSE>
3
4use crate::Error;
5use std::task::{Context, Poll};
6
7use bytes::Bytes;
8use futures_channel::{mpsc, oneshot};
9use http::HeaderMap;
10
11use super::watch;
12
13type BodySender = mpsc::Sender<Result<Bytes, Error>>;
14type TrailersSender = oneshot::Sender<HeaderMap>;
15
/// Want-signal value for which `Sender::poll_want` reports `Poll::Pending`,
/// i.e. the sender must keep waiting before pushing data.
pub(crate) const WANT_PENDING: usize = 1;
/// Want-signal value for which `Sender::poll_want` reports readiness, i.e. the
/// sender may go on to check the data channel for capacity.
pub(crate) const WANT_READY: usize = 2;
18
19/// A sender half created through [`Body::channel()`].
20///
21/// Useful when wanting to stream chunks from another thread.
22///
23/// ## Body Closing
24///
25/// Note that the request body will always be closed normally when the sender is dropped (meaning
26/// that the empty terminating chunk will be sent to the remote). If you desire to close the
27/// connection with an incomplete response (e.g. in the case of an error during asynchronous
28/// processing), call the [`Sender::abort()`] method to abort the body in an abnormal fashion.
29///
30/// [`Body::channel()`]: struct.Body.html#method.channel
31/// [`Sender::abort()`]: struct.Sender.html#method.abort
32#[must_use = "Sender does nothing unless sent on"]
33pub struct Sender {
34    pub(crate) want_rx: watch::Receiver,
35    pub(crate) data_tx: BodySender,
36    pub(crate) trailers_tx: Option<TrailersSender>,
37}
38
39impl Sender {
40    /// Check to see if this `Sender` can send more data.
41    pub fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Error>> {
42        // Check if the receiver end has tried polling for the body yet
43        ready!(self.poll_want(cx)?);
44        self.data_tx
45            .poll_ready(cx)
46            .map_err(|_| Error::new(SenderError::ChannelClosed))
47    }
48
49    fn poll_want(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Error>> {
50        match self.want_rx.load(cx) {
51            WANT_READY => Poll::Ready(Ok(())),
52            WANT_PENDING => Poll::Pending,
53            watch::CLOSED => Poll::Ready(Err(Error::new(SenderError::ChannelClosed))),
54            unexpected => unreachable!("want_rx value: {}", unexpected),
55        }
56    }
57
58    async fn ready(&mut self) -> Result<(), Error> {
59        futures_util::future::poll_fn(|cx| self.poll_ready(cx)).await
60    }
61
62    /// Send data on data channel when it is ready.
63    #[allow(unused)]
64    pub async fn send_data(&mut self, chunk: Bytes) -> Result<(), Error> {
65        self.ready().await?;
66        self.data_tx
67            .try_send(Ok(chunk))
68            .map_err(|_| Error::new(SenderError::ChannelClosed))
69    }
70
71    /// Send trailers on trailers channel.
72    #[allow(unused)]
73    pub async fn send_trailers(&mut self, trailers: HeaderMap) -> Result<(), Error> {
74        let tx = match self.trailers_tx.take() {
75            Some(tx) => tx,
76            None => return Err(Error::new(SenderError::ChannelClosed)),
77        };
78        tx.send(trailers).map_err(|_| Error::new(SenderError::ChannelClosed))
79    }
80
81    /// Try to send data on this channel.
82    ///
83    /// # Errors
84    ///
85    /// Returns `Err(Bytes)` if the channel could not (currently) accept
86    /// another `Bytes`.
87    ///
88    /// # Note
89    ///
90    /// This is mostly useful for when trying to send from some other thread
91    /// that doesn't have an async context. If in an async context, prefer
92    /// `send_data()` instead.
93    pub fn try_send_data(&mut self, chunk: Bytes) -> Result<(), Bytes> {
94        self.data_tx
95            .try_send(Ok(chunk))
96            .map_err(|err| err.into_inner().expect("just sent Ok"))
97    }
98
99    /// Send a `SenderError::BodyWriteAborted` error and terminate the stream.
100    #[allow(unused)]
101    pub fn abort(mut self) {
102        self.send_error(Error::new(SenderError::BodyWriteAborted));
103    }
104
105    /// Terminate the stream with an error.
106    pub fn send_error(&mut self, err: Error) {
107        let _ = self
108            .data_tx
109            // clone so the send works even if buffer is full
110            .clone()
111            .try_send(Err(err));
112    }
113}
114
/// Errors produced by the `Sender` half of a body channel.
#[derive(Debug)]
enum SenderError {
    /// The channel is closed, or it otherwise could not accept the item.
    ChannelClosed,
    /// The body was terminated on purpose through `Sender::abort`.
    BodyWriteAborted,
}

impl SenderError {
    /// Returns the fixed, human-readable message for this error.
    ///
    /// Every message is a string literal, so the result is `&'static str` and
    /// is not tied to the lifetime of the `SenderError` it was read from.
    fn description(&self) -> &'static str {
        match self {
            SenderError::BodyWriteAborted => "user body write aborted",
            SenderError::ChannelClosed => "channel closed",
        }
    }
}

impl std::fmt::Display for SenderError {
    /// Writes the same text that `description` returns.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.write_str(self.description())
    }
}

impl std::error::Error for SenderError {}