micro_http/protocol/body/req_body.rs
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205
use std::pin::Pin;
use std::task::{ready, Context, Poll};
use bytes::Bytes;
use futures::channel::{mpsc, oneshot};
use futures::{FutureExt, SinkExt, Stream, StreamExt};
use http_body::{Body, Frame};
use tracing::{error, info};
use crate::protocol::{Message, ParseError, PayloadItem, RequestHeader};
/// ReqBody implements an asynchronous streaming mechanism for HTTP request bodies.
///
/// # Design Goals
///
/// The main design goals of ReqBody are:
/// 1. Provide efficient streaming of request bodies without buffering entire payload in memory
/// 2. Bridge the gap between low-level payload streams and high-level http_body::Body interface
/// 3. Support concurrent processing of request handling and body streaming
/// 4. Allow proper cleanup of unread body data to maintain protocol correctness
///
/// # Architecture
///
/// ReqBody uses a channel-based architecture:
/// - `ReqBody`: Consumer side that implements http_body::Body
/// - `ReqBodySender`: Producer side that reads from raw payload stream
/// - They communicate through a mpsc channel and oneshot channels
///
/// # Example Flow
///
/// 1. HttpConnection creates ReqBody/ReqBodySender pair
/// 2. ReqBody is passed to request handler for body consumption
/// 3. ReqBodySender runs concurrently to stream payload chunks
/// 4. If handler doesn't read entire body, remaining data is skipped
pub struct ReqBody {
signal: mpsc::Sender<oneshot::Sender<PayloadItem>>,
receiving: Option<oneshot::Receiver<PayloadItem>>,
}
impl ReqBody {
/// Creates a new ReqBody with a channel for signaling payload requests.
///
/// The signal sender is used to request new payload chunks from the producer side.
fn new(signal: mpsc::Sender<oneshot::Sender<PayloadItem>>) -> Self {
Self { signal, receiving: None }
}
/// Creates a body streaming channel pair for processing HTTP request bodies.
///
/// This is the main entry point for setting up request body streaming. It creates
/// the necessary channels and returns both consumer and producer components.
///
/// The returned ReqBody implements http_body::Body and can be passed to request handlers,
/// while ReqBodySender handles reading from the underlying stream.
pub fn body_channel<S>(payload_stream: &mut S) -> (ReqBody, ReqBodySender<S>)
where
S: Stream + Unpin,
{
let (tx, receiver) = mpsc::channel(16);
let req_body = ReqBody::new(tx);
let body_sender = ReqBodySender { payload_stream, receiver, eof: false };
(req_body, body_sender)
}
}
/// ReqBodySender handles reading body chunks from the raw payload stream.
///
/// This component runs concurrently with request processing to ensure:
/// 1. Body chunks are available when the handler needs them
/// 2. All body data is properly drained from the connection
/// 3. Resources are cleaned up appropriately
///
/// The sender maintains an EOF flag to track when the complete body has been read,
/// which is crucial for proper connection handling.
pub struct ReqBodySender<'conn, S>
where
S: Stream + Unpin,
{
payload_stream: &'conn mut S,
receiver: mpsc::Receiver<oneshot::Sender<PayloadItem>>,
eof: bool,
}
impl<S> ReqBodySender<'_, S>
where
S: Stream<Item = Result<Message<RequestHeader>, ParseError>> + Unpin,
{
/// Streams body chunks from payload stream to ReqBody consumer.
///
/// This method runs in a loop, responding to chunk requests from the ReqBody
/// until either:
/// - The complete body is streamed (EOF)
/// - An error occurs during streaming
pub async fn send_body(&mut self) -> Result<(), ParseError> {
loop {
if self.eof {
return Ok(());
}
if let Some(sender) = self.receiver.next().await {
match self.payload_stream.next().await {
Some(Ok(Message::Payload(payload_item))) => {
if payload_item.is_eof() {
self.eof = true;
}
sender.send(payload_item).unwrap();
}
Some(Ok(Message::Header(_header))) => {
error!("received header from receive body phase");
return Err(ParseError::invalid_body("received header from receive body phase"));
}
Some(Err(e)) => {
return Err(e);
}
None => {
error!("cant read body");
return Err(ParseError::invalid_body("cant read body"));
}
}
}
}
}
/// Drains any remaining body chunks from the payload stream.
///
/// This is critical for maintaining HTTP protocol correctness when:
/// - The handler doesn't read the complete body
/// - The connection will be reused for future requests
///
/// It ensures the connection is in a clean state for the next request.
pub async fn skip_body(&mut self) {
if !self.eof {
let mut size: usize = 0;
while let Some(Ok(Message::Payload(payload_item))) = self.payload_stream.next().await {
if payload_item.is_eof() {
self.eof = true;
if size > 0 {
info!(size = size, "skip request body");
}
break;
}
if let Some(bytes) = payload_item.as_bytes() {
size += bytes.len();
}
}
}
}
}
/// Implements standard HTTP body interface for request bodies.
///
/// This implementation bridges our custom streaming mechanism with the standard
/// http_body::Body trait, allowing ReqBody to work seamlessly with HTTP
/// handlers and middleware that expect the standard interface.
impl Body for ReqBody {
type Data = Bytes;
type Error = ParseError;
fn poll_frame(
mut self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<Option<Result<Frame<Self::Data>, Self::Error>>> {
loop {
if let Some(oneshot_receiver) = &mut self.receiving {
return match ready!(oneshot_receiver.poll_unpin(cx)) {
Ok(PayloadItem::Chunk(bytes)) => {
self.receiving.take();
Poll::Ready(Some(Ok(Frame::data(bytes))))
}
Ok(PayloadItem::Eof) => {
self.receiving.take();
Poll::Ready(None)
}
Err(_) => {
self.receiving.take();
Poll::Ready(Some(Err(ParseError::invalid_body("parse body canceled"))))
}
};
}
match ready!(self.signal.poll_ready_unpin(cx)) {
Ok(_) => {
let (tx, rx) = oneshot::channel();
match self.signal.start_send(tx) {
Ok(_) => {
self.receiving = Some(rx);
continue;
}
Err(e) => return Poll::Ready(Some(Err(ParseError::invalid_body(e)))),
}
}
Err(e) => return Poll::Ready(Some(Err(ParseError::invalid_body(e)))),
};
}
}
}