1use h3::client::RequestStream;
2use hyper::body::{Body, Buf, Bytes};
3
4pub struct H3IncomingClient<S, B>
5where
6 B: Buf,
7 S: h3::quic::RecvStream,
8{
9 s: RequestStream<S, B>,
10 data_done: bool,
11}
12
13impl<S, B> H3IncomingClient<S, B>
14where
15 B: Buf,
16 S: h3::quic::RecvStream,
17{
18 pub fn new(s: RequestStream<S, B>) -> Self {
19 Self {
20 s,
21 data_done: false,
22 }
23 }
24}
25
26impl<S, B> hyper::body::Body for H3IncomingClient<S, B>
27where
28 B: Buf,
29 S: h3::quic::RecvStream,
30{
31 type Data = hyper::body::Bytes;
32
33 type Error = h3::error::StreamError;
34
35 fn poll_frame(
36 mut self: std::pin::Pin<&mut Self>,
37 cx: &mut std::task::Context<'_>,
38 ) -> std::task::Poll<Option<Result<hyper::body::Frame<Self::Data>, Self::Error>>> {
39 if !self.data_done {
40 match futures::ready!(self.s.poll_recv_data(cx)) {
41 Ok(data_opt) => match data_opt {
42 Some(mut data) => {
43 let f = hyper::body::Frame::data(data.copy_to_bytes(data.remaining()));
44 std::task::Poll::Ready(Some(Ok(f)))
45 }
46 None => {
47 self.data_done = true;
48 cx.waker().wake_by_ref();
50 std::task::Poll::Pending
51 }
52 },
53 Err(e) => std::task::Poll::Ready(Some(Err(e))),
54 }
55 } else {
56 match futures::ready!(self.s.poll_recv_trailers(cx))? {
58 Some(tr) => std::task::Poll::Ready(Some(Ok(hyper::body::Frame::trailers(tr)))),
59 None => std::task::Poll::Ready(None),
60 }
61 }
62 }
63
64 fn is_end_stream(&self) -> bool {
65 false
66 }
67
68 fn size_hint(&self) -> hyper::body::SizeHint {
69 hyper::body::SizeHint::default()
70 }
71}
72
73pub async fn send_h3_client_body<S, B>(
74 w: &mut h3::client::RequestStream<<S as h3::quic::BidiStream<Bytes>>::SendStream, Bytes>,
75 bd: B,
76) -> Result<(), crate::Error>
77where
78 S: h3::quic::BidiStream<hyper::body::Bytes>,
79 B: Body + Send + 'static + Unpin,
80 B::Data: Send,
81 B::Error: Into<crate::Error>,
82{
83 let mut p_b = std::pin::pin!(bd);
84 while let Some(d) = futures::future::poll_fn(|cx| p_b.as_mut().poll_frame(cx)).await {
86 let d = d.map_err(|e| e.into())?;
88 if d.is_data() {
89 let mut d = d.into_data().ok().unwrap();
90 tracing::debug!("client write data");
91 w.send_data(d.copy_to_bytes(d.remaining())).await?;
93 } else if d.is_trailers() {
94 let d = d.into_trailers().ok().unwrap();
95 tracing::debug!("client write trailer: {:?}", d);
96 w.send_trailers(d).await?;
97 }
99 }
100 w.finish().await?;
102 Ok(())
104}