http_body_io/
body_writer.rs

1use core::{
2    future::Future,
3    pin::{pin, Pin},
4    task::{Context, Poll},
5};
6
7use bytes::Bytes;
8use tokio::sync::mpsc::error::TrySendError;
9
10/// A writer for the body of an HTTP request or response.
11///
12/// This writer can be used to write the body of an HTTP request or response
13/// using the [`std::io::Write`] or [`tokio::io::AsyncWrite`] traits.
14///
15/// In order for the reader to stop reading, the writer must be dropped.
16pub struct BodyWriter {
17    pub(crate) sender: tokio::sync::mpsc::Sender<Bytes>,
18}
19
20impl std::fmt::Debug for BodyWriter {
21    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
22        f.debug_struct("BodyWriter").finish()
23    }
24}
25
26impl std::io::Write for BodyWriter {
27    fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> {
28        let mut bytes = Bytes::copy_from_slice(buf);
29        loop {
30            match self.sender.try_send(bytes) {
31                Ok(()) => return Ok(buf.len()),
32                Err(TrySendError::Full(bytes_ret)) => {
33                    bytes = bytes_ret;
34                    std::thread::yield_now();
35                }
36                Err(TrySendError::Closed(_)) => {
37                    return Err(std::io::Error::new(
38                        std::io::ErrorKind::BrokenPipe,
39                        "BodyWriter closed",
40                    ));
41                }
42            }
43        }
44    }
45    fn flush(&mut self) -> std::io::Result<()> {
46        Ok(())
47    }
48}
49
50impl tokio::io::AsyncWrite for BodyWriter {
51    fn poll_write(
52        self: Pin<&mut Self>,
53        cx: &mut Context<'_>,
54        buf: &[u8],
55    ) -> Poll<Result<usize, std::io::Error>> {
56        let mut this = pin!(self.sender.send(Bytes::copy_from_slice(buf)));
57        match this.as_mut().poll(cx) {
58            Poll::Pending => {
59                cx.waker().wake_by_ref();
60                Poll::Pending
61            }
62            Poll::Ready(Ok(())) => Poll::Ready(Ok(buf.len())),
63            Poll::Ready(Err(_)) => Poll::Ready(Err(std::io::Error::new(
64                std::io::ErrorKind::BrokenPipe,
65                "BodyWriter closed",
66            ))),
67        }
68    }
69    fn poll_flush(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Result<(), std::io::Error>> {
70        Poll::Ready(Ok(()))
71    }
72    fn poll_shutdown(
73        self: Pin<&mut Self>,
74        _cx: &mut Context<'_>,
75    ) -> Poll<Result<(), std::io::Error>> {
76        Poll::Ready(Ok(()))
77    }
78}