http_body_io/
body_writer.rs1use core::{
2 future::Future,
3 pin::{pin, Pin},
4 task::{Context, Poll},
5};
6
7use bytes::Bytes;
8use tokio::sync::mpsc::error::TrySendError;
9
10pub struct BodyWriter {
17 pub(crate) sender: tokio::sync::mpsc::Sender<Bytes>,
18}
19
20impl std::fmt::Debug for BodyWriter {
21 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
22 f.debug_struct("BodyWriter").finish()
23 }
24}
25
26impl std::io::Write for BodyWriter {
27 fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> {
28 let mut bytes = Bytes::copy_from_slice(buf);
29 loop {
30 match self.sender.try_send(bytes) {
31 Ok(()) => return Ok(buf.len()),
32 Err(TrySendError::Full(bytes_ret)) => {
33 bytes = bytes_ret;
34 std::thread::yield_now();
35 }
36 Err(TrySendError::Closed(_)) => {
37 return Err(std::io::Error::new(
38 std::io::ErrorKind::BrokenPipe,
39 "BodyWriter closed",
40 ));
41 }
42 }
43 }
44 }
45 fn flush(&mut self) -> std::io::Result<()> {
46 Ok(())
47 }
48}
49
50impl tokio::io::AsyncWrite for BodyWriter {
51 fn poll_write(
52 self: Pin<&mut Self>,
53 cx: &mut Context<'_>,
54 buf: &[u8],
55 ) -> Poll<Result<usize, std::io::Error>> {
56 let mut this = pin!(self.sender.send(Bytes::copy_from_slice(buf)));
57 match this.as_mut().poll(cx) {
58 Poll::Pending => {
59 cx.waker().wake_by_ref();
60 Poll::Pending
61 }
62 Poll::Ready(Ok(())) => Poll::Ready(Ok(buf.len())),
63 Poll::Ready(Err(_)) => Poll::Ready(Err(std::io::Error::new(
64 std::io::ErrorKind::BrokenPipe,
65 "BodyWriter closed",
66 ))),
67 }
68 }
69 fn poll_flush(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Result<(), std::io::Error>> {
70 Poll::Ready(Ok(()))
71 }
72 fn poll_shutdown(
73 self: Pin<&mut Self>,
74 _cx: &mut Context<'_>,
75 ) -> Poll<Result<(), std::io::Error>> {
76 Poll::Ready(Ok(()))
77 }
78}