h3_util/
client_body.rs

1use h3::client::RequestStream;
2use hyper::body::{Body, Buf, Bytes};
3
4pub struct H3IncomingClient<S, B>
5where
6    B: Buf,
7    S: h3::quic::RecvStream,
8{
9    s: RequestStream<S, B>,
10    data_done: bool,
11}
12
13impl<S, B> H3IncomingClient<S, B>
14where
15    B: Buf,
16    S: h3::quic::RecvStream,
17{
18    pub fn new(s: RequestStream<S, B>) -> Self {
19        Self {
20            s,
21            data_done: false,
22        }
23    }
24}
25
26impl<S, B> hyper::body::Body for H3IncomingClient<S, B>
27where
28    B: Buf,
29    S: h3::quic::RecvStream,
30{
31    type Data = hyper::body::Bytes;
32
33    type Error = h3::error::StreamError;
34
35    fn poll_frame(
36        mut self: std::pin::Pin<&mut Self>,
37        cx: &mut std::task::Context<'_>,
38    ) -> std::task::Poll<Option<Result<hyper::body::Frame<Self::Data>, Self::Error>>> {
39        if !self.data_done {
40            match futures::ready!(self.s.poll_recv_data(cx)) {
41                Ok(data_opt) => match data_opt {
42                    Some(mut data) => {
43                        let f = hyper::body::Frame::data(data.copy_to_bytes(data.remaining()));
44                        std::task::Poll::Ready(Some(Ok(f)))
45                    }
46                    None => {
47                        self.data_done = true;
48                        // try again for trailers
49                        cx.waker().wake_by_ref();
50                        std::task::Poll::Pending
51                    }
52                },
53                Err(e) => std::task::Poll::Ready(Some(Err(e))),
54            }
55        } else {
56            // TODO: need poll trailers api.
57            match futures::ready!(self.s.poll_recv_trailers(cx))? {
58                Some(tr) => std::task::Poll::Ready(Some(Ok(hyper::body::Frame::trailers(tr)))),
59                None => std::task::Poll::Ready(None),
60            }
61        }
62    }
63
64    fn is_end_stream(&self) -> bool {
65        false
66    }
67
68    fn size_hint(&self) -> hyper::body::SizeHint {
69        hyper::body::SizeHint::default()
70    }
71}
72
73pub async fn send_h3_client_body<S, B>(
74    w: &mut h3::client::RequestStream<<S as h3::quic::BidiStream<Bytes>>::SendStream, Bytes>,
75    bd: B,
76) -> Result<(), crate::Error>
77where
78    S: h3::quic::BidiStream<hyper::body::Bytes>,
79    B: Body + Send + 'static + Unpin,
80    B::Data: Send,
81    B::Error: Into<crate::Error>,
82{
83    let mut p_b = std::pin::pin!(bd);
84    // let mut sent_trailers = false;
85    while let Some(d) = futures::future::poll_fn(|cx| p_b.as_mut().poll_frame(cx)).await {
86        // send body
87        let d = d.map_err(|e| e.into())?;
88        if d.is_data() {
89            let mut d = d.into_data().ok().unwrap();
90            tracing::debug!("client write data");
91            // in most cases the copy is saved by using Bytes.
92            w.send_data(d.copy_to_bytes(d.remaining())).await?;
93        } else if d.is_trailers() {
94            let d = d.into_trailers().ok().unwrap();
95            tracing::debug!("client write trailer: {:?}", d);
96            w.send_trailers(d).await?;
97            //     sent_trailers = true;
98        }
99    }
100    // if !sent_trailers {
101    w.finish().await?;
102    // }
103    Ok(())
104}