xitca_http/h1/
body.rs

use core::{
    cell::{RefCell, RefMut},
    future::poll_fn,
    ops::DerefMut,
    pin::Pin,
    task::{Context, Poll, Waker},
};

use std::{collections::VecDeque, io, rc::Rc};

use futures_core::stream::Stream;

use crate::bytes::Bytes;

/// max buffer size 32k
pub(crate) const MAX_BUFFER_SIZE: usize = 32_768;

#[derive(Clone, Debug)]
enum RequestBodyInner {
    Some(Rc<RefCell<Inner>>),
    #[cfg(feature = "io-uring")]
    Completion(super::dispatcher_uring::Body),
    None,
}

impl RequestBodyInner {
    fn new(eof: bool) -> Self {
        match eof {
            true => Self::None,
            false => Self::Some(Default::default()),
        }
    }
}

/// Buffered stream of request body chunks.
///
/// Implements the [Stream] trait to produce chunks as [Bytes] in an async manner.
#[derive(Debug)]
pub struct RequestBody(RequestBodyInner);

impl Default for RequestBody {
    fn default() -> Self {
        Self(RequestBodyInner::new(true))
    }
}

impl RequestBody {
    // an async spsc channel where RequestBodySender is used to push data that is popped from RequestBody.
    pub(super) fn channel(eof: bool) -> (RequestBodySender, Self) {
        let inner = RequestBodyInner::new(eof);
        (RequestBodySender(inner.clone()), RequestBody(inner))
    }

    #[cfg(feature = "io-uring")]
    pub(super) fn io_uring(body: super::dispatcher_uring::Body) -> Self {
        RequestBody(RequestBodyInner::Completion(body))
    }
}

impl Stream for RequestBody {
    type Item = io::Result<Bytes>;

    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<io::Result<Bytes>>> {
        match self.get_mut().0 {
            RequestBodyInner::Some(ref mut inner) => inner.borrow_mut().poll_next_unpin(cx),
            RequestBodyInner::None => Poll::Ready(None),
            #[cfg(feature = "io-uring")]
            RequestBodyInner::Completion(ref mut body) => Pin::new(body).poll_next(cx),
        }
    }
}

impl From<RequestBody> for crate::body::RequestBody {
    fn from(body: RequestBody) -> Self {
        Self::H1(body)
    }
}

/// Sender part of the payload stream
pub struct RequestBodySender(RequestBodyInner);

// TODO: rework early eof error handling.
impl Drop for RequestBodySender {
    fn drop(&mut self) {
        if let Some(mut inner) = self.try_inner() {
            if !inner.eof {
                inner.feed_error(io::ErrorKind::UnexpectedEof.into());
            }
        }
    }
}

impl RequestBodySender {
    // try to get a mutable reference to inner, ignoring the RequestBodyInner::None variant.
    fn try_inner(&mut self) -> Option<RefMut<'_, Inner>> {
        self.try_inner_on_none_with(|| {})
    }

    // try to get a mutable reference to inner, panicking on the RequestBodyInner::None variant.
    // this is a runtime check for an internal optimization to avoid unnecessary operations.
    // the public api must not be able to trigger this panic.
    fn try_inner_infallible(&mut self) -> Option<RefMut<'_, Inner>> {
        self.try_inner_on_none_with(|| panic!("No Request Body found. Do not waste operation on Sender."))
    }

    fn try_inner_on_none_with<F>(&mut self, func: F) -> Option<RefMut<'_, Inner>>
    where
        F: FnOnce(),
    {
        match self.0 {
            RequestBodyInner::Some(ref inner) => {
                // the request body is a shared pointer with only two owners and no weak references.
                debug_assert!(Rc::strong_count(inner) <= 2);
                debug_assert_eq!(Rc::weak_count(inner), 0);
                (Rc::strong_count(inner) != 1).then_some(inner.borrow_mut())
            }
            _ => {
                func();
                None
            }
        }
    }

    pub(super) fn feed_error(&mut self, e: io::Error) {
        if let Some(mut inner) = self.try_inner_infallible() {
            inner.feed_error(e);
        }
    }

    pub(super) fn feed_eof(&mut self) {
        if let Some(mut inner) = self.try_inner_infallible() {
            inner.feed_eof();
        }
    }

    pub(super) fn feed_data(&mut self, data: Bytes) {
        if let Some(mut inner) = self.try_inner_infallible() {
            inner.feed_data(data);
        }
    }

    pub(super) fn ready(&mut self) -> impl Future<Output = io::Result<()>> + '_ {
        self.ready_with(|inner| !inner.backpressure())
    }

    // Lazily wait until RequestBody has been polled.
    // For certain use cases the body must not be eagerly polled.
    // For example: a request with an `Expect: 100-continue` header.
    pub(super) fn wait_for_poll(&mut self) -> impl Future<Output = io::Result<()>> + '_ {
        self.ready_with(|inner| inner.waiting())
    }

    async fn ready_with<F>(&mut self, func: F) -> io::Result<()>
    where
        F: Fn(&mut Inner) -> bool,
    {
        poll_fn(|cx| {
            // only perform the check when the payload (other side) is still alive. otherwise always return an io error.
            match self.try_inner_infallible() {
                Some(mut inner) => {
                    if func(inner.deref_mut()) {
                        Poll::Ready(Ok(()))
                    } else {
                        // when the payload is not ready, register the current task's waker and wait.
                        inner.register_io(cx);
                        Poll::Pending
                    }
                }
                None => Poll::Ready(Err(io::ErrorKind::UnexpectedEof.into())),
            }
        })
        .await
    }
}

#[derive(Debug, Default)]
struct Inner {
    // set to true once the sender has fed eof.
    eof: bool,
    // total length of buffered bytes. used for the backpressure check.
    len: usize,
    // error to be emitted to the stream reader.
    err: Option<io::Error>,
    // buffered body chunks.
    items: VecDeque<Bytes>,
    // waker registered by the stream side (RequestBody) waiting for data.
    task: Option<Waker>,
    // waker registered by the io side (RequestBodySender) waiting for readiness.
    io_task: Option<Waker>,
}

impl Inner {
    /// Wake up future waiting for payload data to be available.
    fn wake(&mut self) {
        if let Some(waker) = self.task.take() {
            waker.wake();
        }
    }

    /// Wake up future feeding data to Payload.
    fn wake_io(&mut self) {
        if let Some(waker) = self.io_task.take() {
            waker.wake();
        }
    }

    /// true when a future is waiting for payload data.
    fn waiting(&self) -> bool {
        self.task.is_some()
    }

    /// Register the future waiting for data from the payload.
    /// The waker will be used in `Inner::wake`.
    fn register(&mut self, cx: &Context<'_>) {
        if self.task.as_ref().map(|w| !cx.waker().will_wake(w)).unwrap_or(true) {
            self.task = Some(cx.waker().clone());
        }
    }

    /// Register the future feeding data to the payload.
    /// The waker will be used in `Inner::wake_io`.
    fn register_io(&mut self, cx: &Context<'_>) {
        if self.io_task.as_ref().map(|w| !cx.waker().will_wake(w)).unwrap_or(true) {
            self.io_task = Some(cx.waker().clone());
        }
    }

    fn feed_error(&mut self, err: io::Error) {
        self.err = Some(err);
        self.wake();
    }

    fn feed_eof(&mut self) {
        self.eof = true;
        self.wake();
    }

    fn feed_data(&mut self, data: Bytes) {
        self.len += data.len();
        self.items.push_back(data);
        self.wake();
    }

    fn backpressure(&self) -> bool {
        self.len >= MAX_BUFFER_SIZE
    }

    fn poll_next_unpin(&mut self, cx: &Context<'_>) -> Poll<Option<io::Result<Bytes>>> {
        if let Some(data) = self.items.pop_front() {
            self.len -= data.len();
            Poll::Ready(Some(Ok(data)))
        } else if let Some(err) = self.err.take() {
            Poll::Ready(Some(Err(err)))
        } else if self.eof {
            Poll::Ready(None)
        } else {
            self.register(cx);
            self.wake_io();
            Poll::Pending
        }
    }
}
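
// A minimal sketch of how the channel above is expected to behave, written as
// plain unit tests. These are illustrative and not part of the module's
// original test suite. They assume `Waker::noop` (stable since Rust 1.85) is
// available for building a no-op `Context`; any other no-op waker works too.
#[cfg(test)]
mod test {
    use core::{
        future::Future,
        pin::{pin, Pin},
        task::{Context, Poll, Waker},
    };

    use futures_core::stream::Stream;

    use crate::bytes::Bytes;

    use super::*;

    #[test]
    fn feed_then_poll() {
        let (mut sender, mut body) = RequestBody::channel(false);
        let mut cx = Context::from_waker(Waker::noop());

        // nothing has been fed yet so the stream is pending.
        assert!(Pin::new(&mut body).poll_next(&mut cx).is_pending());

        // feed a chunk followed by eof. the stream yields the chunk and then ends.
        sender.feed_data(Bytes::from_static(b"hello"));
        sender.feed_eof();

        match Pin::new(&mut body).poll_next(&mut cx) {
            Poll::Ready(Some(Ok(bytes))) => assert_eq!(bytes, Bytes::from_static(b"hello")),
            _ => panic!("expected a data chunk"),
        }
        assert!(matches!(Pin::new(&mut body).poll_next(&mut cx), Poll::Ready(None)));
    }

    #[test]
    fn backpressure() {
        let (mut sender, mut body) = RequestBody::channel(false);
        let mut cx = Context::from_waker(Waker::noop());

        // buffering MAX_BUFFER_SIZE bytes triggers backpressure on the sender.
        sender.feed_data(Bytes::from(vec![0; MAX_BUFFER_SIZE]));
        assert!(pin!(sender.ready()).poll(&mut cx).is_pending());

        // draining the stream releases the backpressure.
        assert!(Pin::new(&mut body).poll_next(&mut cx).is_ready());
        assert!(pin!(sender.ready()).poll(&mut cx).is_ready());
    }

    #[test]
    fn wait_for_poll() {
        let (mut sender, mut body) = RequestBody::channel(false);
        let mut cx = Context::from_waker(Waker::noop());

        // the sender is not released until the body has been polled at least once.
        assert!(pin!(sender.wait_for_poll()).poll(&mut cx).is_pending());
        assert!(Pin::new(&mut body).poll_next(&mut cx).is_pending());
        assert!(pin!(sender.wait_for_poll()).poll(&mut cx).is_ready());
    }
}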