1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135
//! Body::channel utilities. Extracted from Hyper under MIT license.
//! https://github.com/hyperium/hyper/blob/master/LICENSE
use crate::Error;
use std::task::{Context, Poll};
use bytes::Bytes;
use futures_channel::{mpsc, oneshot};
use http::HeaderMap;
use super::watch;
type BodySender = mpsc::Sender<Result<Bytes, Error>>;
type TrailersSender = oneshot::Sender<HeaderMap>;
pub(crate) const WANT_PENDING: usize = 1;
pub(crate) const WANT_READY: usize = 2;
/// A sender half created through [`Body::channel()`].
///
/// Useful when wanting to stream chunks from another thread.
///
/// ## Body Closing
///
/// Note that the request body will always be closed normally when the sender is dropped (meaning
/// that the empty terminating chunk will be sent to the remote). If you desire to close the
/// connection with an incomplete response (e.g. in the case of an error during asynchronous
/// processing), call the [`Sender::abort()`] method to abort the body in an abnormal fashion.
///
/// [`Body::channel()`]: struct.Body.html#method.channel
/// [`Sender::abort()`]: struct.Sender.html#method.abort
#[must_use = "Sender does nothing unless sent on"]
pub struct Sender {
pub(crate) want_rx: watch::Receiver,
pub(crate) data_tx: BodySender,
pub(crate) trailers_tx: Option<TrailersSender>,
}
impl Sender {
/// Check to see if this `Sender` can send more data.
pub fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Error>> {
// Check if the receiver end has tried polling for the body yet
ready!(self.poll_want(cx)?);
self.data_tx
.poll_ready(cx)
.map_err(|_| Error::new(SenderError::ChannelClosed))
}
fn poll_want(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Error>> {
match self.want_rx.load(cx) {
WANT_READY => Poll::Ready(Ok(())),
WANT_PENDING => Poll::Pending,
watch::CLOSED => Poll::Ready(Err(Error::new(SenderError::ChannelClosed))),
unexpected => unreachable!("want_rx value: {}", unexpected),
}
}
async fn ready(&mut self) -> Result<(), Error> {
futures_util::future::poll_fn(|cx| self.poll_ready(cx)).await
}
/// Send data on data channel when it is ready.
#[allow(unused)]
pub async fn send_data(&mut self, chunk: Bytes) -> Result<(), Error> {
self.ready().await?;
self.data_tx
.try_send(Ok(chunk))
.map_err(|_| Error::new(SenderError::ChannelClosed))
}
/// Send trailers on trailers channel.
#[allow(unused)]
pub async fn send_trailers(&mut self, trailers: HeaderMap) -> Result<(), Error> {
let tx = match self.trailers_tx.take() {
Some(tx) => tx,
None => return Err(Error::new(SenderError::ChannelClosed)),
};
tx.send(trailers).map_err(|_| Error::new(SenderError::ChannelClosed))
}
/// Try to send data on this channel.
///
/// # Errors
///
/// Returns `Err(Bytes)` if the channel could not (currently) accept
/// another `Bytes`.
///
/// # Note
///
/// This is mostly useful for when trying to send from some other thread
/// that doesn't have an async context. If in an async context, prefer
/// `send_data()` instead.
pub fn try_send_data(&mut self, chunk: Bytes) -> Result<(), Bytes> {
self.data_tx
.try_send(Ok(chunk))
.map_err(|err| err.into_inner().expect("just sent Ok"))
}
/// Send a `SenderError::BodyWriteAborted` error and terminate the stream.
#[allow(unused)]
pub fn abort(mut self) {
self.send_error(Error::new(SenderError::BodyWriteAborted));
}
/// Terminate the stream with an error.
pub fn send_error(&mut self, err: Error) {
let _ = self
.data_tx
// clone so the send works even if buffer is full
.clone()
.try_send(Err(err));
}
}
#[derive(Debug)]
enum SenderError {
ChannelClosed,
BodyWriteAborted,
}
impl SenderError {
fn description(&self) -> &str {
match self {
SenderError::BodyWriteAborted => "user body write aborted",
SenderError::ChannelClosed => "channel closed",
}
}
}
impl std::fmt::Display for SenderError {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.write_str(self.description())
}
}
impl std::error::Error for SenderError {}