1use std::cell::RefCell;
6use std::future::{ready, Ready};
7use std::rc::Rc;
8
9use super::body::BodyExchange;
10use super::body_stream::{BodyStream, BodyStreamExchange};
11use super::dynamic_exchange::DynamicExchange;
12use super::entity::{BodyStreamState, IntoBodyState, IntoBodyStreamState};
13use super::headers::HeadersExchange;
14use crate::event::{EventKind, RequestBody, RequestHeaders};
15use crate::http_constants::{
16 DEFAULT_PATH, HEADER_AUTHORITY, HEADER_METHOD, HEADER_PATH, HEADER_SCHEME,
17};
18use crate::BoxFuture;
19
20#[cfg(feature = "experimental_enable_stop_iteration")]
21use super::{
22 entity::{HeadersBodyState, IntoHeadersBodyState},
23 headers_body::HeadersBodyExchange,
24};
25
26#[cfg(feature = "experimental")]
27use super::{BodyError, Chunk};
28use super::{BodyState, EntityState, HeadersState};
29
30pub struct RequestState {
33 contains_body: bool,
34 exchange: Rc<RefCell<DynamicExchange>>,
35}
36
37#[derive(thiserror::Error, Debug)]
39pub enum InvalidRequestState {
40 #[error("Invalid request state: {0:?}")]
42 Event(EventKind),
43
44 #[error("Early response")]
46 EarlyResponse,
47}
48
49impl RequestState {
50 #[allow(clippy::await_holding_refcell_ref)]
51 pub(crate) async fn new(
52 exchange: Rc<RefCell<DynamicExchange>>,
53 ) -> Result<Self, InvalidRequestState> {
54 let contains_body;
55 {
56 let mut exchange_ref = exchange.borrow_mut();
57 let Some(exchange) = exchange_ref.wait_for_event::<RequestHeaders>().await else {
58 let error = exchange
59 .borrow()
60 .current_event()
61 .map(InvalidRequestState::Event)
62 .unwrap_or(InvalidRequestState::EarlyResponse);
63 return Err(error);
64 };
65 contains_body = !exchange
66 .event_data()
67 .expect("Must contain headers")
68 .event
69 .end_of_stream;
70 }
71
72 Ok(Self {
73 contains_body,
74 exchange,
75 })
76 }
77}
78
79impl IntoBodyState for RequestState {
80 type BodyState = RequestBodyState;
81 type BodyStateFuture = BoxFuture<'static, Self::BodyState>;
82
83 fn into_body_state(self) -> Self::BodyStateFuture {
84 Box::pin(async {
85 RequestBodyState {
86 inner: BodyExchange::new(self.exchange, self.contains_body).await,
87 }
88 })
89 }
90}
91
92impl IntoBodyStreamState for RequestState {
93 type BodyStreamState = RequestBodyStreamState;
94
95 type BodyStreamStateFuture = BoxFuture<'static, Self::BodyStreamState>;
96
97 fn into_body_stream_state(self) -> Self::BodyStreamStateFuture {
98 Box::pin(async {
99 RequestBodyStreamState {
100 inner: BodyStreamExchange::new(self.exchange, self.contains_body).await,
101 }
102 })
103 }
104}
105
106#[cfg(feature = "experimental_enable_stop_iteration")]
107impl IntoHeadersBodyState for RequestState {
108 type HeadersBodyState = RequestHeadersBodyState;
109 type HeadersBodyStateFuture = BoxFuture<'static, Self::HeadersBodyState>;
110
111 fn into_headers_body_state(self) -> Self::HeadersBodyStateFuture {
112 Box::pin(async {
113 RequestHeadersBodyState {
114 inner: HeadersBodyExchange::new(self.exchange, self.contains_body).await,
115 }
116 })
117 }
118}
119
120impl EntityState for RequestState {
121 type HeadersState = RequestHeadersState;
122 type HeadersStateFuture = Ready<Self::HeadersState>;
123
124 fn into_headers_state(self) -> Self::HeadersStateFuture {
125 ready(RequestHeadersState {
126 inner: HeadersExchange::new(self.exchange, self.contains_body),
127 })
128 }
129}
130
131pub struct RequestHeadersState {
133 inner: HeadersExchange<RequestHeaders, RequestBody>,
134}
135
136impl RequestHeadersState {
137 pub fn method(&self) -> String {
138 self.handler().header(HEADER_METHOD).unwrap_or_default()
139 }
140
141 pub fn scheme(&self) -> String {
142 self.handler().header(HEADER_SCHEME).unwrap_or_default()
143 }
144
145 pub fn authority(&self) -> String {
146 self.handler().header(HEADER_AUTHORITY).unwrap_or_default()
147 }
148
149 pub fn path(&self) -> String {
150 self.handler()
151 .header(HEADER_PATH)
152 .unwrap_or_else(|| DEFAULT_PATH.to_string())
153 }
154}
155
156impl IntoBodyState for RequestHeadersState {
157 type BodyState = RequestBodyState;
158
159 type BodyStateFuture = BoxFuture<'static, Self::BodyState>;
160
161 fn into_body_state(self) -> Self::BodyStateFuture {
162 Box::pin(async {
163 RequestBodyState {
164 inner: self.inner.into_body_state().await,
165 }
166 })
167 }
168}
169
170impl IntoBodyStreamState for RequestHeadersState {
171 type BodyStreamState = RequestBodyStreamState;
172
173 type BodyStreamStateFuture = BoxFuture<'static, Self::BodyStreamState>;
174
175 fn into_body_stream_state(self) -> Self::BodyStreamStateFuture {
176 Box::pin(async {
177 RequestBodyStreamState {
178 inner: self.inner.into_body_stream_state().await,
179 }
180 })
181 }
182}
183
184#[cfg(feature = "experimental_enable_stop_iteration")]
185impl IntoHeadersBodyState for RequestHeadersState {
186 type HeadersBodyState = RequestHeadersBodyState;
187 type HeadersBodyStateFuture = BoxFuture<'static, Self::HeadersBodyState>;
188
189 fn into_headers_body_state(self) -> Self::HeadersBodyStateFuture {
190 Box::pin(async {
191 RequestHeadersBodyState {
192 inner: self.inner.into_headers_body_state().await,
193 }
194 })
195 }
196}
197
198impl HeadersState for RequestHeadersState {
199 fn handler(&self) -> &dyn super::HeadersHandler {
200 self.inner.handler()
201 }
202
203 fn contains_body(&self) -> bool {
204 self.inner.contains_body
205 }
206}
207
208pub struct RequestBodyState {
210 inner: BodyExchange<RequestBody>,
211}
212
213impl BodyState for RequestBodyState {
214 fn handler(&self) -> &dyn super::BodyHandler {
215 &self.inner
216 }
217
218 fn contains_body(&self) -> bool {
219 self.inner.contains_body()
220 }
221}
222
223pub struct RequestBodyStreamState {
224 inner: BodyStreamExchange<RequestBody>,
225}
226
227impl BodyStreamState for RequestBodyStreamState {
228 type Stream<'b>
229 = BodyStream<'b>
230 where
231 Self: 'b;
232
233 fn contains_body(&self) -> bool {
234 self.inner.contains_body()
235 }
236
237 fn stream(&self) -> Self::Stream<'_> {
238 self.inner.stream()
239 }
240
241 #[cfg(feature = "experimental")]
242 fn write_chunk(&self, chunk: Chunk) -> Result<(), BodyError> {
243 self.inner.write_chunk(chunk)
244 }
245}
246
247#[cfg(feature = "experimental_enable_stop_iteration")]
248pub struct RequestHeadersBodyState {
250 inner: HeadersBodyExchange<RequestBody, RequestHeaders>,
251}
252
253#[cfg(feature = "experimental_enable_stop_iteration")]
254impl HeadersBodyState for RequestHeadersBodyState {
255 fn handler(&self) -> &dyn super::HeadersBodyHandler {
256 self.inner.handler()
257 }
258
259 fn contains_body(&self) -> bool {
260 self.inner.contains_body()
261 }
262}