micro_http/protocol/body/
req_body.rs1use crate::protocol::body::body_channel::{BodyReceiver, BodySender, create_body_sender_receiver};
2use crate::protocol::{Message, ParseError, PayloadSize, RequestHeader};
3use bytes::Bytes;
4use futures::Stream;
5use http_body::{Body, Frame, SizeHint};
6use std::pin::Pin;
7use std::task::{Context, Poll};
8
9pub struct ReqBody {
10 inner: ReqBodyRepr,
11}
12pub(crate) enum ReqBodyRepr {
13 Receiver(BodyReceiver),
14 NoBody,
15}
16
17impl ReqBody {
18 pub(crate) fn create_req_body<S>(body_stream: &mut S, payload_size: PayloadSize) -> (ReqBody, Option<BodySender<S>>)
19 where
20 S: Stream<Item = Result<Message<(RequestHeader, PayloadSize)>, ParseError>> + Unpin,
21 {
22 match payload_size {
23 PayloadSize::Empty | PayloadSize::Length(0) => (ReqBody::no_body(), None),
24 _ => {
25 let (sender, receiver) = create_body_sender_receiver(body_stream, payload_size);
26 (ReqBody::receiver(receiver), Some(sender))
27 }
28 }
29 }
30
31 pub(crate) fn no_body() -> Self {
32 Self { inner: ReqBodyRepr::NoBody }
33 }
34
35 pub(crate) fn receiver(receiver: BodyReceiver) -> Self {
36 Self { inner: ReqBodyRepr::Receiver(receiver) }
37 }
38}
39
40impl Body for ReqBody {
41 type Data = Bytes;
42 type Error = ParseError;
43
44 fn poll_frame(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Result<Frame<Self::Data>, Self::Error>>> {
45 let this = self.get_mut();
46 match &mut this.inner {
47 ReqBodyRepr::Receiver(body_receiver) => Pin::new(body_receiver).poll_frame(cx),
48 ReqBodyRepr::NoBody => Poll::Ready(None),
49 }
50 }
51
52 fn is_end_stream(&self) -> bool {
53 match &self.inner {
54 ReqBodyRepr::NoBody => true,
55 ReqBodyRepr::Receiver(body_receiver) => body_receiver.is_end_stream(),
56 }
57 }
58
59 fn size_hint(&self) -> SizeHint {
60 match &self.inner {
61 ReqBodyRepr::NoBody => SizeHint::with_exact(0),
62 ReqBodyRepr::Receiver(body_receiver) => body_receiver.size_hint(),
63 }
64 }
65}