1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
//! Body::channel utilities. Extracted from Hyper under MIT license.
//! https://github.com/hyperium/hyper/blob/master/LICENSE

use crate::Error;
use std::task::{Context, Poll};

use bytes::Bytes;
use futures_channel::{mpsc, oneshot};
use http::HeaderMap;

use super::watch;

type BodySender = mpsc::Sender<Result<Bytes, Error>>;
type TrailersSender = oneshot::Sender<HeaderMap>;

pub(crate) const WANT_PENDING: usize = 1;
pub(crate) const WANT_READY: usize = 2;

/// A sender half created through [`Body::channel()`].
///
/// Useful when wanting to stream chunks from another thread.
///
/// ## Body Closing
///
/// Note that the request body will always be closed normally when the sender is dropped (meaning
/// that the empty terminating chunk will be sent to the remote). If you desire to close the
/// connection with an incomplete response (e.g. in the case of an error during asynchronous
/// processing), call the [`Sender::abort()`] method to abort the body in an abnormal fashion.
///
/// [`Body::channel()`]: struct.Body.html#method.channel
/// [`Sender::abort()`]: struct.Sender.html#method.abort
#[must_use = "Sender does nothing unless sent on"]
pub struct Sender {
    pub(crate) want_rx: watch::Receiver,
    pub(crate) data_tx: BodySender,
    pub(crate) trailers_tx: Option<TrailersSender>,
}

impl Sender {
    /// Check to see if this `Sender` can send more data.
    pub fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Error>> {
        // Check if the receiver end has tried polling for the body yet
        ready!(self.poll_want(cx)?);
        self.data_tx
            .poll_ready(cx)
            .map_err(|_| Error::new(SenderError::ChannelClosed))
    }

    fn poll_want(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Error>> {
        match self.want_rx.load(cx) {
            WANT_READY => Poll::Ready(Ok(())),
            WANT_PENDING => Poll::Pending,
            watch::CLOSED => Poll::Ready(Err(Error::new(SenderError::ChannelClosed))),
            unexpected => unreachable!("want_rx value: {}", unexpected),
        }
    }

    async fn ready(&mut self) -> Result<(), Error> {
        futures_util::future::poll_fn(|cx| self.poll_ready(cx)).await
    }

    /// Send data on data channel when it is ready.
    #[allow(unused)]
    pub async fn send_data(&mut self, chunk: Bytes) -> Result<(), Error> {
        self.ready().await?;
        self.data_tx
            .try_send(Ok(chunk))
            .map_err(|_| Error::new(SenderError::ChannelClosed))
    }

    /// Send trailers on trailers channel.
    #[allow(unused)]
    pub async fn send_trailers(&mut self, trailers: HeaderMap) -> Result<(), Error> {
        let tx = match self.trailers_tx.take() {
            Some(tx) => tx,
            None => return Err(Error::new(SenderError::ChannelClosed)),
        };
        tx.send(trailers).map_err(|_| Error::new(SenderError::ChannelClosed))
    }

    /// Try to send data on this channel.
    ///
    /// # Errors
    ///
    /// Returns `Err(Bytes)` if the channel could not (currently) accept
    /// another `Bytes`.
    ///
    /// # Note
    ///
    /// This is mostly useful for when trying to send from some other thread
    /// that doesn't have an async context. If in an async context, prefer
    /// `send_data()` instead.
    pub fn try_send_data(&mut self, chunk: Bytes) -> Result<(), Bytes> {
        self.data_tx
            .try_send(Ok(chunk))
            .map_err(|err| err.into_inner().expect("just sent Ok"))
    }

    /// Send a `SenderError::BodyWriteAborted` error and terminate the stream.
    #[allow(unused)]
    pub fn abort(mut self) {
        self.send_error(Error::new(SenderError::BodyWriteAborted));
    }

    /// Terminate the stream with an error.
    pub fn send_error(&mut self, err: Error) {
        let _ = self
            .data_tx
            // clone so the send works even if buffer is full
            .clone()
            .try_send(Err(err));
    }
}

#[derive(Debug)]
enum SenderError {
    ChannelClosed,
    BodyWriteAborted,
}

impl SenderError {
    fn description(&self) -> &str {
        match self {
            SenderError::BodyWriteAborted => "user body write aborted",
            SenderError::ChannelClosed => "channel closed",
        }
    }
}

impl std::fmt::Display for SenderError {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.write_str(self.description())
    }
}
impl std::error::Error for SenderError {}