1use h3::server::RequestStream;
2use hyper::body::{Body, Buf, Bytes};
3
4pub struct H3IncomingServer<S, B>
5where
6 B: Buf,
7 S: h3::quic::RecvStream,
8{
9 s: RequestStream<S, B>,
10 data_done: bool,
11}
12
13impl<S, B> H3IncomingServer<S, B>
14where
15 B: Buf,
16 S: h3::quic::RecvStream,
17{
18 pub fn new(s: RequestStream<S, B>) -> Self {
19 Self {
20 s,
21 data_done: false,
22 }
23 }
24}
25
26impl<S, B> hyper::body::Body for H3IncomingServer<S, B>
27where
28 B: Buf,
29 S: h3::quic::RecvStream,
30{
31 type Data = hyper::body::Bytes;
32
33 type Error = h3::error::StreamError;
34
35 fn poll_frame(
36 mut self: std::pin::Pin<&mut Self>,
37 cx: &mut std::task::Context<'_>,
38 ) -> std::task::Poll<Option<Result<hyper::body::Frame<Self::Data>, Self::Error>>> {
39 if !self.data_done {
40 tracing::debug!("server incomming poll_frame recv_data");
41 match futures::ready!(self.s.poll_recv_data(cx)) {
42 Ok(data_opt) => match data_opt {
43 Some(mut data) => {
44 let f = hyper::body::Frame::data(data.copy_to_bytes(data.remaining()));
45 std::task::Poll::Ready(Some(Ok(f)))
46 }
47 None => {
48 self.data_done = true;
49 cx.waker().wake_by_ref();
51 std::task::Poll::Pending
52 }
53 },
54 Err(e) => std::task::Poll::Ready(Some(Err(e))),
55 }
56 } else {
57 tracing::debug!("server incomming poll_frame recv_trailers");
58 match futures::ready!(self.s.poll_recv_trailers(cx))? {
59 Some(tr) => std::task::Poll::Ready(Some(Ok(hyper::body::Frame::trailers(tr)))),
60 None => std::task::Poll::Ready(None),
61 }
62 }
63 }
64
65 fn is_end_stream(&self) -> bool {
66 false
67 }
68
69 fn size_hint(&self) -> hyper::body::SizeHint {
70 hyper::body::SizeHint::default()
71 }
72}
73
74pub async fn send_h3_server_body<BD, S>(
75 w: &mut h3::server::RequestStream<<S as h3::quic::BidiStream<Bytes>>::SendStream, Bytes>,
76 bd: BD,
77) -> Result<(), crate::Error>
78where
79 BD: Body + 'static,
80 BD::Error: Into<crate::Error>,
81 <BD as Body>::Error: Into<crate::Error> + std::error::Error + Send + Sync,
82 <BD as Body>::Data: Send + Sync,
83 S: h3::quic::BidiStream<hyper::body::Bytes>,
84{
85 let mut p_b = std::pin::pin!(bd);
86 while let Some(d) = futures::future::poll_fn(|cx| p_b.as_mut().poll_frame(cx)).await {
87 let d = d.map_err(crate::Error::from)?;
89 if d.is_data() {
90 let mut d = d.into_data().ok().unwrap();
91 tracing::debug!("serving request write data");
92 w.send_data(d.copy_to_bytes(d.remaining())).await?;
94 } else if d.is_trailers() {
95 let d = d.into_trailers().ok().unwrap();
96 tracing::debug!("serving request write trailer: {:?}", d);
97 w.send_trailers(d).await?;
98 }
99 }
100 w.finish().await?;
104 Ok(())
105}