1use core::{
2 cell::{RefCell, RefMut},
3 future::poll_fn,
4 ops::DerefMut,
5 pin::Pin,
6 task::{Context, Poll, Waker},
7};
8
9use std::{collections::VecDeque, io, rc::Rc};
10
11use futures_core::stream::Stream;
12
13use crate::bytes::Bytes;
14
15pub(crate) const MAX_BUFFER_SIZE: usize = 32_768;
17
18#[derive(Clone, Debug)]
19enum RequestBodyInner {
20 Some(Rc<RefCell<Inner>>),
21 #[cfg(feature = "io-uring")]
22 Completion(super::dispatcher_uring::Body),
23 None,
24}
25
26impl RequestBodyInner {
27 fn new(eof: bool) -> Self {
28 match eof {
29 true => Self::None,
30 false => Self::Some(Default::default()),
31 }
32 }
33}
34
35#[derive(Debug)]
39pub struct RequestBody(RequestBodyInner);
40
41impl Default for RequestBody {
42 fn default() -> Self {
43 Self(RequestBodyInner::new(true))
44 }
45}
46
47impl RequestBody {
48 pub(super) fn channel(eof: bool) -> (RequestBodySender, Self) {
50 let inner = RequestBodyInner::new(eof);
51 (RequestBodySender(inner.clone()), RequestBody(inner))
52 }
53
54 #[cfg(feature = "io-uring")]
55 pub(super) fn io_uring(body: super::dispatcher_uring::Body) -> Self {
56 RequestBody(RequestBodyInner::Completion(body))
57 }
58}
59
60impl Stream for RequestBody {
61 type Item = io::Result<Bytes>;
62
63 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<io::Result<Bytes>>> {
64 match self.get_mut().0 {
65 RequestBodyInner::Some(ref mut inner) => inner.borrow_mut().poll_next_unpin(cx),
66 RequestBodyInner::None => Poll::Ready(None),
67 #[cfg(feature = "io-uring")]
68 RequestBodyInner::Completion(ref mut body) => Pin::new(body).poll_next(cx),
69 }
70 }
71}
72
73impl From<RequestBody> for crate::body::RequestBody {
74 fn from(body: RequestBody) -> Self {
75 Self::H1(body)
76 }
77}
78
79pub struct RequestBodySender(RequestBodyInner);
81
82impl Drop for RequestBodySender {
84 fn drop(&mut self) {
85 if let Some(mut inner) = self.try_inner() {
86 if !inner.eof {
87 inner.feed_error(io::ErrorKind::UnexpectedEof.into());
88 }
89 }
90 }
91}
92
93impl RequestBodySender {
94 fn try_inner(&mut self) -> Option<RefMut<'_, Inner>> {
96 self.try_inner_on_none_with(|| {})
97 }
98
99 fn try_inner_infallible(&mut self) -> Option<RefMut<'_, Inner>> {
103 self.try_inner_on_none_with(|| panic!("No Request Body found. Do not waste operation on Sender."))
104 }
105
106 fn try_inner_on_none_with<F>(&mut self, func: F) -> Option<RefMut<'_, Inner>>
107 where
108 F: FnOnce(),
109 {
110 match self.0 {
111 RequestBodyInner::Some(ref inner) => {
112 debug_assert!(Rc::strong_count(inner) <= 2);
114 debug_assert_eq!(Rc::weak_count(inner), 0);
115 (Rc::strong_count(inner) != 1).then_some(inner.borrow_mut())
116 }
117 _ => {
118 func();
119 None
120 }
121 }
122 }
123
124 pub(super) fn feed_error(&mut self, e: io::Error) {
125 if let Some(mut inner) = self.try_inner_infallible() {
126 inner.feed_error(e);
127 }
128 }
129
130 pub(super) fn feed_eof(&mut self) {
131 if let Some(mut inner) = self.try_inner_infallible() {
132 inner.feed_eof();
133 }
134 }
135
136 pub(super) fn feed_data(&mut self, data: Bytes) {
137 if let Some(mut inner) = self.try_inner_infallible() {
138 inner.feed_data(data);
139 }
140 }
141
142 pub(super) fn ready(&mut self) -> impl Future<Output = io::Result<()>> + '_ {
143 self.ready_with(|inner| !inner.backpressure())
144 }
145
146 pub(super) fn wait_for_poll(&mut self) -> impl Future<Output = io::Result<()>> + '_ {
150 self.ready_with(|inner| inner.waiting())
151 }
152
153 async fn ready_with<F>(&mut self, func: F) -> io::Result<()>
154 where
155 F: Fn(&mut Inner) -> bool,
156 {
157 poll_fn(|cx| {
158 match self.try_inner_infallible() {
160 Some(mut inner) => {
161 if func(inner.deref_mut()) {
162 Poll::Ready(Ok(()))
163 } else {
164 inner.register_io(cx);
166 Poll::Pending
167 }
168 }
169 None => Poll::Ready(Err(io::ErrorKind::UnexpectedEof.into())),
170 }
171 })
172 .await
173 }
174}
175
176#[derive(Debug, Default)]
177struct Inner {
178 eof: bool,
179 len: usize,
180 err: Option<io::Error>,
181 items: VecDeque<Bytes>,
182 task: Option<Waker>,
183 io_task: Option<Waker>,
184}
185
186impl Inner {
187 fn wake(&mut self) {
189 if let Some(waker) = self.task.take() {
190 waker.wake();
191 }
192 }
193
194 fn wake_io(&mut self) {
196 if let Some(waker) = self.io_task.take() {
197 waker.wake();
198 }
199 }
200
201 fn waiting(&self) -> bool {
203 self.task.is_some()
204 }
205
206 fn register(&mut self, cx: &Context<'_>) {
209 if self.task.as_ref().map(|w| !cx.waker().will_wake(w)).unwrap_or(true) {
210 self.task = Some(cx.waker().clone());
211 }
212 }
213
214 fn register_io(&mut self, cx: &Context<'_>) {
217 if self.io_task.as_ref().map(|w| !cx.waker().will_wake(w)).unwrap_or(true) {
218 self.io_task = Some(cx.waker().clone());
219 }
220 }
221
222 fn feed_error(&mut self, err: io::Error) {
223 self.err = Some(err);
224 self.wake();
225 }
226
227 fn feed_eof(&mut self) {
228 self.eof = true;
229 self.wake();
230 }
231
232 fn feed_data(&mut self, data: Bytes) {
233 self.len += data.len();
234 self.items.push_back(data);
235 self.wake();
236 }
237
238 fn backpressure(&self) -> bool {
239 self.len >= MAX_BUFFER_SIZE
240 }
241
242 fn poll_next_unpin(&mut self, cx: &Context<'_>) -> Poll<Option<io::Result<Bytes>>> {
243 if let Some(data) = self.items.pop_front() {
244 self.len -= data.len();
245 Poll::Ready(Some(Ok(data)))
246 } else if let Some(err) = self.err.take() {
247 Poll::Ready(Some(Err(err)))
248 } else if self.eof {
249 Poll::Ready(None)
250 } else {
251 self.register(cx);
252 self.wake_io();
253 Poll::Pending
254 }
255 }
256}