micro_http/protocol/body/req_body.rs
1use std::pin::Pin;
2use std::task::{ready, Context, Poll};
3
4use bytes::Bytes;
5
6use futures::channel::{mpsc, oneshot};
7use futures::{FutureExt, SinkExt, Stream, StreamExt};
8
9use http_body::{Body, Frame};
10use tracing::{error, info};
11
12use crate::protocol::{Message, ParseError, PayloadItem, RequestHeader};
13
/// ReqBody implements an asynchronous streaming mechanism for HTTP request bodies.
///
/// # Design Goals
///
/// The main design goals of ReqBody are:
/// 1. Provide efficient streaming of request bodies without buffering the entire payload in memory
/// 2. Bridge the gap between low-level payload streams and the high-level http_body::Body interface
/// 3. Support concurrent processing of request handling and body streaming
/// 4. Allow proper cleanup of unread body data to maintain protocol correctness
///
/// # Architecture
///
/// ReqBody uses a channel-based architecture:
/// - `ReqBody`: Consumer side that implements http_body::Body
/// - `ReqBodySender`: Producer side that reads from the raw payload stream
/// - They communicate through an mpsc channel and oneshot channels: the consumer
///   sends a oneshot sender over the mpsc channel to request a chunk, and the
///   producer answers through that oneshot sender
///
/// # Example Flow
///
/// 1. HttpConnection creates ReqBody/ReqBodySender pair
/// 2. ReqBody is passed to request handler for body consumption
/// 3. ReqBodySender runs concurrently to stream payload chunks
/// 4. If handler doesn't read entire body, remaining data is skipped
pub struct ReqBody {
    /// Request side of the mpsc channel. Each message is the oneshot sender through
    /// which the producer is expected to deliver the next payload item.
    signal: mpsc::Sender<oneshot::Sender<PayloadItem>>,
    /// Reply side of the request currently in flight. `None` while no chunk has been
    /// requested, or after the previous reply has been consumed.
    receiving: Option<oneshot::Receiver<PayloadItem>>,
}
41
42impl ReqBody {
43 /// Creates a new ReqBody with a channel for signaling payload requests.
44 ///
45 /// The signal sender is used to request new payload chunks from the producer side.
46 fn new(signal: mpsc::Sender<oneshot::Sender<PayloadItem>>) -> Self {
47 Self { signal, receiving: None }
48 }
49
50 /// Creates a body streaming channel pair for processing HTTP request bodies.
51 ///
52 /// This is the main entry point for setting up request body streaming. It creates
53 /// the necessary channels and returns both consumer and producer components.
54 ///
55 /// The returned ReqBody implements http_body::Body and can be passed to request handlers,
56 /// while ReqBodySender handles reading from the underlying stream.
57 pub fn body_channel<S>(payload_stream: &mut S) -> (ReqBody, ReqBodySender<S>)
58 where
59 S: Stream + Unpin,
60 {
61 let (tx, receiver) = mpsc::channel(16);
62
63 let req_body = ReqBody::new(tx);
64
65 let body_sender = ReqBodySender { payload_stream, receiver, eof: false };
66
67 (req_body, body_sender)
68 }
69}
70
/// ReqBodySender handles reading body chunks from the raw payload stream.
///
/// This component runs concurrently with request processing to ensure:
/// 1. Body chunks are available when the handler needs them
/// 2. All body data is properly drained from the connection
/// 3. Resources are cleaned up appropriately
///
/// The sender maintains an EOF flag to track when the complete body has been read,
/// which is crucial for proper connection handling.
pub struct ReqBodySender<'conn, S>
where
    S: Stream + Unpin,
{
    /// Mutable borrow of the raw payload stream, held for the lifetime `'conn`.
    payload_stream: &'conn mut S,
    /// Incoming chunk requests; each one carries the oneshot sender to reply on.
    receiver: mpsc::Receiver<oneshot::Sender<PayloadItem>>,
    /// Set to `true` once an EOF payload item has been read from `payload_stream`.
    eof: bool,
}
88
89impl<S> ReqBodySender<'_, S>
90where
91 S: Stream<Item = Result<Message<RequestHeader>, ParseError>> + Unpin,
92{
93 /// Streams body chunks from payload stream to ReqBody consumer.
94 ///
95 /// This method runs in a loop, responding to chunk requests from the ReqBody
96 /// until either:
97 /// - The complete body is streamed (EOF)
98 /// - An error occurs during streaming
99 pub async fn send_body(&mut self) -> Result<(), ParseError> {
100 loop {
101 if self.eof {
102 return Ok(());
103 }
104
105 if let Some(sender) = self.receiver.next().await {
106 match self.payload_stream.next().await {
107 Some(Ok(Message::Payload(payload_item))) => {
108 if payload_item.is_eof() {
109 self.eof = true;
110 }
111 sender.send(payload_item).unwrap();
112 }
113
114 Some(Ok(Message::Header(_header))) => {
115 error!("received header from receive body phase");
116 return Err(ParseError::invalid_body("received header from receive body phase"));
117 }
118
119 Some(Err(e)) => {
120 return Err(e);
121 }
122
123 None => {
124 error!("cant read body");
125 return Err(ParseError::invalid_body("cant read body"));
126 }
127 }
128 }
129 }
130 }
131
132 /// Drains any remaining body chunks from the payload stream.
133 ///
134 /// This is critical for maintaining HTTP protocol correctness when:
135 /// - The handler doesn't read the complete body
136 /// - The connection will be reused for future requests
137 ///
138 /// It ensures the connection is in a clean state for the next request.
139 pub async fn skip_body(&mut self) {
140 if !self.eof {
141 let mut size: usize = 0;
142 while let Some(Ok(Message::Payload(payload_item))) = self.payload_stream.next().await {
143 if payload_item.is_eof() {
144 self.eof = true;
145 if size > 0 {
146 info!(size = size, "skip request body");
147 }
148 break;
149 }
150
151 if let Some(bytes) = payload_item.as_bytes() {
152 size += bytes.len();
153 }
154 }
155 }
156 }
157}
158
159/// Implements standard HTTP body interface for request bodies.
160///
161/// This implementation bridges our custom streaming mechanism with the standard
162/// http_body::Body trait, allowing ReqBody to work seamlessly with HTTP
163/// handlers and middleware that expect the standard interface.
164impl Body for ReqBody {
165 type Data = Bytes;
166 type Error = ParseError;
167
168 fn poll_frame(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Result<Frame<Self::Data>, Self::Error>>> {
169 loop {
170 if let Some(oneshot_receiver) = &mut self.receiving {
171 return match ready!(oneshot_receiver.poll_unpin(cx)) {
172 Ok(PayloadItem::Chunk(bytes)) => {
173 self.receiving.take();
174 Poll::Ready(Some(Ok(Frame::data(bytes))))
175 }
176 Ok(PayloadItem::Eof) => {
177 self.receiving.take();
178 Poll::Ready(None)
179 }
180 Err(_) => {
181 self.receiving.take();
182 Poll::Ready(Some(Err(ParseError::invalid_body("parse body canceled"))))
183 }
184 };
185 }
186
187 match ready!(self.signal.poll_ready_unpin(cx)) {
188 Ok(_) => {
189 let (tx, rx) = oneshot::channel();
190 match self.signal.start_send(tx) {
191 Ok(_) => {
192 self.receiving = Some(rx);
193 continue;
194 }
195 Err(e) => return Poll::Ready(Some(Err(ParseError::invalid_body(e)))),
196 }
197 }
198 Err(e) => return Poll::Ready(Some(Err(ParseError::invalid_body(e)))),
199 };
200 }
201 }
202}