h3_util/
server_body.rs

1use h3::server::RequestStream;
2use hyper::body::{Body, Buf, Bytes};
3
4pub struct H3IncomingServer<S, B>
5where
6    B: Buf,
7    S: h3::quic::RecvStream,
8{
9    s: RequestStream<S, B>,
10    data_done: bool,
11}
12
13impl<S, B> H3IncomingServer<S, B>
14where
15    B: Buf,
16    S: h3::quic::RecvStream,
17{
18    pub fn new(s: RequestStream<S, B>) -> Self {
19        Self {
20            s,
21            data_done: false,
22        }
23    }
24}
25
26impl<S, B> hyper::body::Body for H3IncomingServer<S, B>
27where
28    B: Buf,
29    S: h3::quic::RecvStream,
30{
31    type Data = hyper::body::Bytes;
32
33    type Error = h3::error::StreamError;
34
35    fn poll_frame(
36        mut self: std::pin::Pin<&mut Self>,
37        cx: &mut std::task::Context<'_>,
38    ) -> std::task::Poll<Option<Result<hyper::body::Frame<Self::Data>, Self::Error>>> {
39        if !self.data_done {
40            tracing::debug!("server incomming poll_frame recv_data");
41            match futures::ready!(self.s.poll_recv_data(cx)) {
42                Ok(data_opt) => match data_opt {
43                    Some(mut data) => {
44                        let f = hyper::body::Frame::data(data.copy_to_bytes(data.remaining()));
45                        std::task::Poll::Ready(Some(Ok(f)))
46                    }
47                    None => {
48                        self.data_done = true;
49                        // try again to get trailers
50                        cx.waker().wake_by_ref();
51                        std::task::Poll::Pending
52                    }
53                },
54                Err(e) => std::task::Poll::Ready(Some(Err(e))),
55            }
56        } else {
57            tracing::debug!("server incomming poll_frame recv_trailers");
58            match futures::ready!(self.s.poll_recv_trailers(cx))? {
59                Some(tr) => std::task::Poll::Ready(Some(Ok(hyper::body::Frame::trailers(tr)))),
60                None => std::task::Poll::Ready(None),
61            }
62        }
63    }
64
65    fn is_end_stream(&self) -> bool {
66        false
67    }
68
69    fn size_hint(&self) -> hyper::body::SizeHint {
70        hyper::body::SizeHint::default()
71    }
72}
73
74pub async fn send_h3_server_body<BD, S>(
75    w: &mut h3::server::RequestStream<<S as h3::quic::BidiStream<Bytes>>::SendStream, Bytes>,
76    bd: BD,
77) -> Result<(), crate::Error>
78where
79    BD: Body + 'static,
80    BD::Error: Into<crate::Error>,
81    <BD as Body>::Error: Into<crate::Error> + std::error::Error + Send + Sync,
82    <BD as Body>::Data: Send + Sync,
83    S: h3::quic::BidiStream<hyper::body::Bytes>,
84{
85    let mut p_b = std::pin::pin!(bd);
86    while let Some(d) = futures::future::poll_fn(|cx| p_b.as_mut().poll_frame(cx)).await {
87        // send body
88        let d = d.map_err(crate::Error::from)?;
89        if d.is_data() {
90            let mut d = d.into_data().ok().unwrap();
91            tracing::debug!("serving request write data");
92            // Bytes optimizes the shallow copy.
93            w.send_data(d.copy_to_bytes(d.remaining())).await?;
94        } else if d.is_trailers() {
95            let d = d.into_trailers().ok().unwrap();
96            tracing::debug!("serving request write trailer: {:?}", d);
97            w.send_trailers(d).await?;
98        }
99    }
100    // Close the stream gracefully.
101    // This is technically only needed when not writing trailers.
102    // But msquic-h3 requires stream be gracefully closed all the time.
103    w.finish().await?;
104    Ok(())
105}