micro_http/protocol/body/
req_body.rs

1use std::pin::Pin;
2use std::task::{ready, Context, Poll};
3
4use bytes::Bytes;
5
6use futures::channel::{mpsc, oneshot};
7use futures::{FutureExt, SinkExt, Stream, StreamExt};
8
9use http_body::{Body, Frame};
10use tracing::{error, info};
11
12use crate::protocol::{Message, ParseError, PayloadItem, RequestHeader};
13
14/// ReqBody implements an asynchronous streaming mechanism for HTTP request bodies.
15///
16/// # Design Goals
17///
18/// The main design goals of ReqBody are:
19/// 1. Provide efficient streaming of request bodies without buffering entire payload in memory
20/// 2. Bridge the gap between low-level payload streams and high-level http_body::Body interface
21/// 3. Support concurrent processing of request handling and body streaming
22/// 4. Allow proper cleanup of unread body data to maintain protocol correctness
23///
24/// # Architecture
25///
26/// ReqBody uses a channel-based architecture:
27/// - `ReqBody`: Consumer side that implements http_body::Body
28/// - `ReqBodySender`: Producer side that reads from raw payload stream
29/// - They communicate through a mpsc channel and oneshot channels
30///
31/// # Example Flow
32///
33/// 1. HttpConnection creates ReqBody/ReqBodySender pair
34/// 2. ReqBody is passed to request handler for body consumption
35/// 3. ReqBodySender runs concurrently to stream payload chunks
36/// 4. If handler doesn't read entire body, remaining data is skipped
37pub struct ReqBody {
38    signal: mpsc::Sender<oneshot::Sender<PayloadItem>>,
39    receiving: Option<oneshot::Receiver<PayloadItem>>,
40}
41
42impl ReqBody {
43    /// Creates a new ReqBody with a channel for signaling payload requests.
44    ///
45    /// The signal sender is used to request new payload chunks from the producer side.
46    fn new(signal: mpsc::Sender<oneshot::Sender<PayloadItem>>) -> Self {
47        Self { signal, receiving: None }
48    }
49
50    /// Creates a body streaming channel pair for processing HTTP request bodies.
51    ///
52    /// This is the main entry point for setting up request body streaming. It creates
53    /// the necessary channels and returns both consumer and producer components.
54    ///
55    /// The returned ReqBody implements http_body::Body and can be passed to request handlers,
56    /// while ReqBodySender handles reading from the underlying stream.
57    pub fn body_channel<S>(payload_stream: &mut S) -> (ReqBody, ReqBodySender<S>)
58    where
59        S: Stream + Unpin,
60    {
61        let (tx, receiver) = mpsc::channel(16);
62
63        let req_body = ReqBody::new(tx);
64
65        let body_sender = ReqBodySender { payload_stream, receiver, eof: false };
66
67        (req_body, body_sender)
68    }
69}
70
71/// ReqBodySender handles reading body chunks from the raw payload stream.
72///
73/// This component runs concurrently with request processing to ensure:
74/// 1. Body chunks are available when the handler needs them
75/// 2. All body data is properly drained from the connection
76/// 3. Resources are cleaned up appropriately
77///
78/// The sender maintains an EOF flag to track when the complete body has been read,
79/// which is crucial for proper connection handling.
80pub struct ReqBodySender<'conn, S>
81where
82    S: Stream + Unpin,
83{
84    payload_stream: &'conn mut S,
85    receiver: mpsc::Receiver<oneshot::Sender<PayloadItem>>,
86    eof: bool,
87}
88
89impl<S> ReqBodySender<'_, S>
90where
91    S: Stream<Item = Result<Message<RequestHeader>, ParseError>> + Unpin,
92{
93    /// Streams body chunks from payload stream to ReqBody consumer.
94    ///
95    /// This method runs in a loop, responding to chunk requests from the ReqBody
96    /// until either:
97    /// - The complete body is streamed (EOF)
98    /// - An error occurs during streaming
99    pub async fn send_body(&mut self) -> Result<(), ParseError> {
100        loop {
101            if self.eof {
102                return Ok(());
103            }
104
105            if let Some(sender) = self.receiver.next().await {
106                match self.payload_stream.next().await {
107                    Some(Ok(Message::Payload(payload_item))) => {
108                        if payload_item.is_eof() {
109                            self.eof = true;
110                        }
111                        sender.send(payload_item).unwrap();
112                    }
113
114                    Some(Ok(Message::Header(_header))) => {
115                        error!("received header from receive body phase");
116                        return Err(ParseError::invalid_body("received header from receive body phase"));
117                    }
118
119                    Some(Err(e)) => {
120                        return Err(e);
121                    }
122
123                    None => {
124                        error!("cant read body");
125                        return Err(ParseError::invalid_body("cant read body"));
126                    }
127                }
128            }
129        }
130    }
131
132    /// Drains any remaining body chunks from the payload stream.
133    ///
134    /// This is critical for maintaining HTTP protocol correctness when:
135    /// - The handler doesn't read the complete body
136    /// - The connection will be reused for future requests
137    ///
138    /// It ensures the connection is in a clean state for the next request.
139    pub async fn skip_body(&mut self) {
140        if !self.eof {
141            let mut size: usize = 0;
142            while let Some(Ok(Message::Payload(payload_item))) = self.payload_stream.next().await {
143                if payload_item.is_eof() {
144                    self.eof = true;
145                    if size > 0 {
146                        info!(size = size, "skip request body");
147                    }
148                    break;
149                }
150
151                if let Some(bytes) = payload_item.as_bytes() {
152                    size += bytes.len();
153                }
154            }
155        }
156    }
157}
158
159/// Implements standard HTTP body interface for request bodies.
160///
161/// This implementation bridges our custom streaming mechanism with the standard
162/// http_body::Body trait, allowing ReqBody to work seamlessly with HTTP
163/// handlers and middleware that expect the standard interface.
164impl Body for ReqBody {
165    type Data = Bytes;
166    type Error = ParseError;
167
168    fn poll_frame(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Result<Frame<Self::Data>, Self::Error>>> {
169        loop {
170            if let Some(oneshot_receiver) = &mut self.receiving {
171                return match ready!(oneshot_receiver.poll_unpin(cx)) {
172                    Ok(PayloadItem::Chunk(bytes)) => {
173                        self.receiving.take();
174                        Poll::Ready(Some(Ok(Frame::data(bytes))))
175                    }
176                    Ok(PayloadItem::Eof) => {
177                        self.receiving.take();
178                        Poll::Ready(None)
179                    }
180                    Err(_) => {
181                        self.receiving.take();
182                        Poll::Ready(Some(Err(ParseError::invalid_body("parse body canceled"))))
183                    }
184                };
185            }
186
187            match ready!(self.signal.poll_ready_unpin(cx)) {
188                Ok(_) => {
189                    let (tx, rx) = oneshot::channel();
190                    match self.signal.start_send(tx) {
191                        Ok(_) => {
192                            self.receiving = Some(rx);
193                            continue;
194                        }
195                        Err(e) => return Poll::Ready(Some(Err(ParseError::invalid_body(e)))),
196                    }
197                }
198                Err(e) => return Poll::Ready(Some(Err(ParseError::invalid_body(e)))),
199            };
200        }
201    }
202}