lambda_runtime_api_client/body/
sender.rs1use crate::Error;
5use std::task::{Context, Poll};
6
7use bytes::Bytes;
8use futures_channel::{mpsc, oneshot};
9use http::HeaderMap;
10
11use super::watch;
12
13type BodySender = mpsc::Sender<Result<Bytes, Error>>;
14type TrailersSender = oneshot::Sender<HeaderMap>;
15
/// Want-signal state for which `Sender::poll_want` reports `Poll::Pending`.
pub(crate) const WANT_PENDING: usize = 1;

/// Want-signal state for which `Sender::poll_want` reports readiness.
pub(crate) const WANT_READY: usize = 2;
18
19#[must_use = "Sender does nothing unless sent on"]
33pub struct Sender {
34 pub(crate) want_rx: watch::Receiver,
35 pub(crate) data_tx: BodySender,
36 pub(crate) trailers_tx: Option<TrailersSender>,
37}
38
39impl Sender {
40 pub fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Error>> {
42 ready!(self.poll_want(cx)?);
44 self.data_tx
45 .poll_ready(cx)
46 .map_err(|_| Error::new(SenderError::ChannelClosed))
47 }
48
49 fn poll_want(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Error>> {
50 match self.want_rx.load(cx) {
51 WANT_READY => Poll::Ready(Ok(())),
52 WANT_PENDING => Poll::Pending,
53 watch::CLOSED => Poll::Ready(Err(Error::new(SenderError::ChannelClosed))),
54 unexpected => unreachable!("want_rx value: {}", unexpected),
55 }
56 }
57
58 async fn ready(&mut self) -> Result<(), Error> {
59 futures_util::future::poll_fn(|cx| self.poll_ready(cx)).await
60 }
61
62 #[allow(unused)]
64 pub async fn send_data(&mut self, chunk: Bytes) -> Result<(), Error> {
65 self.ready().await?;
66 self.data_tx
67 .try_send(Ok(chunk))
68 .map_err(|_| Error::new(SenderError::ChannelClosed))
69 }
70
71 #[allow(unused)]
73 pub async fn send_trailers(&mut self, trailers: HeaderMap) -> Result<(), Error> {
74 let tx = match self.trailers_tx.take() {
75 Some(tx) => tx,
76 None => return Err(Error::new(SenderError::ChannelClosed)),
77 };
78 tx.send(trailers).map_err(|_| Error::new(SenderError::ChannelClosed))
79 }
80
81 pub fn try_send_data(&mut self, chunk: Bytes) -> Result<(), Bytes> {
94 self.data_tx
95 .try_send(Ok(chunk))
96 .map_err(|err| err.into_inner().expect("just sent Ok"))
97 }
98
99 #[allow(unused)]
101 pub fn abort(mut self) {
102 self.send_error(Error::new(SenderError::BodyWriteAborted));
103 }
104
105 pub fn send_error(&mut self, err: Error) {
107 let _ = self
108 .data_tx
109 .clone()
111 .try_send(Err(err));
112 }
113}
114
/// Failure causes reported by `Sender`.
#[derive(Debug)]
enum SenderError {
    /// A channel could not be used for sending.
    ChannelClosed,
    /// The body was aborted through `Sender::abort`.
    BodyWriteAborted,
}

impl SenderError {
    /// Returns the human-readable message for this error.
    fn description(&self) -> &str {
        match *self {
            Self::ChannelClosed => "channel closed",
            Self::BodyWriteAborted => "user body write aborted",
        }
    }
}

impl std::fmt::Display for SenderError {
    /// Writes the message from `description` verbatim.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let message = self.description();
        f.write_str(message)
    }
}

impl std::error::Error for SenderError {}