micro_http/protocol/body/
req_body.rs1use crate::protocol::body::body_channel::{BodyReceiver, BodySender, create_body_sender_receiver};
2use crate::protocol::{Message, ParseError, PayloadSize, RequestHeader};
3use bytes::Bytes;
4use futures::Stream;
5use http_body::{Body, Frame, SizeHint};
6use std::pin::Pin;
7use std::task::{Context, Poll};
8
9#[derive(Debug)]
10pub struct ReqBody {
11 inner: ReqBodyRepr,
12}
13
14#[derive(Debug)]
15pub(crate) enum ReqBodyRepr {
16 Receiver(BodyReceiver),
17 NoBody,
18}
19
20impl ReqBody {
21 pub(crate) fn create_req_body<S>(body_stream: &mut S, payload_size: PayloadSize) -> (ReqBody, Option<BodySender<'_, S>>)
22 where
23 S: Stream<Item = Result<Message<(RequestHeader, PayloadSize)>, ParseError>> + Unpin,
24 {
25 match payload_size {
26 PayloadSize::Empty | PayloadSize::Length(0) => (ReqBody::no_body(), None),
27 _ => {
28 let (sender, receiver) = create_body_sender_receiver(body_stream, payload_size);
29 (ReqBody::receiver(receiver), Some(sender))
30 }
31 }
32 }
33
34 pub(crate) fn no_body() -> Self {
35 Self { inner: ReqBodyRepr::NoBody }
36 }
37
38 pub(crate) fn receiver(receiver: BodyReceiver) -> Self {
39 Self { inner: ReqBodyRepr::Receiver(receiver) }
40 }
41}
42
43impl Body for ReqBody {
44 type Data = Bytes;
45 type Error = ParseError;
46
47 fn poll_frame(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Result<Frame<Self::Data>, Self::Error>>> {
48 let this = self.get_mut();
49 match &mut this.inner {
50 ReqBodyRepr::Receiver(body_receiver) => Pin::new(body_receiver).poll_frame(cx),
51 ReqBodyRepr::NoBody => Poll::Ready(None),
52 }
53 }
54
55 fn is_end_stream(&self) -> bool {
56 match &self.inner {
57 ReqBodyRepr::NoBody => true,
58 ReqBodyRepr::Receiver(body_receiver) => body_receiver.is_end_stream(),
59 }
60 }
61
62 fn size_hint(&self) -> SizeHint {
63 match &self.inner {
64 ReqBodyRepr::NoBody => SizeHint::with_exact(0),
65 ReqBodyRepr::Receiver(body_receiver) => body_receiver.size_hint(),
66 }
67 }
68}