1use std::cell::RefCell;
6use std::future::{ready, Ready};
7use std::rc::Rc;
8
9use crate::event::{EventKind, RequestBody, RequestHeaders};
10use crate::BoxFuture;
11
12use super::body::BodyExchange;
13use super::body_stream::{BodyStream, BodyStreamExchange};
14use super::dynamic_exchange::DynamicExchange;
15use super::entity::{BodyStreamState, IntoBodyState, IntoBodyStreamState};
16use super::headers::HeadersExchange;
17
18#[cfg(feature = "experimental_enable_stop_iteration")]
19use super::{
20 entity::{HeadersBodyState, IntoHeadersBodyState},
21 headers_body::HeadersBodyExchange,
22};
23
24#[cfg(feature = "experimental")]
25use super::{BodyError, Chunk};
26use super::{BodyState, EntityState, HeadersState};
27
/// Entry state of a request, from which the headers, body, or body-stream
/// states are derived.
pub struct RequestState {
    /// Whether the request carries a body; `RequestState::new` sets this to the
    /// negation of the headers event's `end_of_stream` flag.
    contains_body: bool,
    /// Shared, interior-mutable handle to the exchange this request belongs to.
    exchange: Rc<RefCell<DynamicExchange>>,
}
34
/// Error returned by `RequestState::new` when waiting for the request-headers
/// event yields nothing.
#[derive(thiserror::Error, Debug)]
pub enum InvalidRequestState {
    /// The exchange reported a current event, which is carried here.
    #[error("Invalid request state: {0:?}")]
    Event(EventKind),

    /// The exchange reported no current event.
    // NOTE(review): the name suggests a response was produced before the request
    // headers were observed; confirm against `DynamicExchange`.
    #[error("Early response")]
    EarlyResponse,
}
46
47impl RequestState {
48 #[allow(clippy::await_holding_refcell_ref)]
49 pub(crate) async fn new(
50 exchange: Rc<RefCell<DynamicExchange>>,
51 ) -> Result<Self, InvalidRequestState> {
52 let contains_body;
53 {
54 let mut exchange_ref = exchange.borrow_mut();
55 let Some(exchange) = exchange_ref.wait_for_event::<RequestHeaders>().await else {
56 let error = exchange
57 .borrow()
58 .current_event()
59 .map(InvalidRequestState::Event)
60 .unwrap_or(InvalidRequestState::EarlyResponse);
61 return Err(error);
62 };
63 contains_body = !exchange
64 .event_data()
65 .expect("Must contain headers")
66 .event
67 .end_of_stream;
68 }
69
70 Ok(Self {
71 contains_body,
72 exchange,
73 })
74 }
75}
76
77impl IntoBodyState for RequestState {
78 type BodyState = RequestBodyState;
79 type BodyStateFuture = BoxFuture<'static, Self::BodyState>;
80
81 fn into_body_state(self) -> Self::BodyStateFuture {
82 Box::pin(async {
83 RequestBodyState {
84 inner: BodyExchange::new(self.exchange, self.contains_body).await,
85 }
86 })
87 }
88}
89
90impl IntoBodyStreamState for RequestState {
91 type BodyStreamState = RequestBodyStreamState;
92
93 type BodyStreamStateFuture = BoxFuture<'static, Self::BodyStreamState>;
94
95 fn into_body_stream_state(self) -> Self::BodyStreamStateFuture {
96 Box::pin(async {
97 RequestBodyStreamState {
98 inner: BodyStreamExchange::new(self.exchange, self.contains_body).await,
99 }
100 })
101 }
102}
103
#[cfg(feature = "experimental_enable_stop_iteration")]
impl IntoHeadersBodyState for RequestState {
    type HeadersBodyState = RequestHeadersBodyState;
    type HeadersBodyStateFuture = BoxFuture<'static, Self::HeadersBodyState>;

    /// Consumes the request state and returns a boxed future resolving to a
    /// [`RequestHeadersBodyState`] backed by a newly created [`HeadersBodyExchange`].
    fn into_headers_body_state(self) -> Self::HeadersBodyStateFuture {
        let Self {
            contains_body,
            exchange,
        } = self;

        // The future owns both pieces of state, so it can be `'static`.
        Box::pin(async move {
            let inner: HeadersBodyExchange<RequestBody> =
                HeadersBodyExchange::new(exchange, contains_body).await;
            RequestHeadersBodyState { inner }
        })
    }
}
117
118impl EntityState for RequestState {
119 type HeadersState = RequestHeadersState;
120 type HeadersStateFuture = Ready<Self::HeadersState>;
121
122 fn into_headers_state(self) -> Self::HeadersStateFuture {
123 ready(RequestHeadersState {
124 inner: HeadersExchange::new(self.exchange, self.contains_body),
125 })
126 }
127}
128
/// Request state that gives access to the request headers.
pub struct RequestHeadersState {
    /// Headers exchange that every accessor and state transition delegates to.
    inner: HeadersExchange<RequestHeaders, RequestBody>,
}
133
134impl RequestHeadersState {
135 pub fn method(&self) -> String {
137 self.inner.with_event_data(|e| e.method())
138 }
139
140 pub fn scheme(&self) -> String {
142 self.inner.with_event_data(|e| e.scheme())
143 }
144
145 pub fn authority(&self) -> String {
147 self.inner.with_event_data(|e| e.authority())
148 }
149
150 pub fn path(&self) -> String {
152 self.inner.with_event_data(|e| e.path())
153 }
154}
155
156impl IntoBodyState for RequestHeadersState {
157 type BodyState = RequestBodyState;
158
159 type BodyStateFuture = BoxFuture<'static, Self::BodyState>;
160
161 fn into_body_state(self) -> Self::BodyStateFuture {
162 Box::pin(async {
163 RequestBodyState {
164 inner: self.inner.into_body_state().await,
165 }
166 })
167 }
168}
169
170impl IntoBodyStreamState for RequestHeadersState {
171 type BodyStreamState = RequestBodyStreamState;
172
173 type BodyStreamStateFuture = BoxFuture<'static, Self::BodyStreamState>;
174
175 fn into_body_stream_state(self) -> Self::BodyStreamStateFuture {
176 Box::pin(async {
177 RequestBodyStreamState {
178 inner: self.inner.into_body_stream_state().await,
179 }
180 })
181 }
182}
183
#[cfg(feature = "experimental_enable_stop_iteration")]
impl IntoHeadersBodyState for RequestHeadersState {
    type HeadersBodyState = RequestHeadersBodyState;
    type HeadersBodyStateFuture = BoxFuture<'static, Self::HeadersBodyState>;

    /// Consumes the headers state and returns a boxed future resolving to the
    /// [`RequestHeadersBodyState`] produced by the underlying headers exchange.
    fn into_headers_body_state(self) -> Self::HeadersBodyStateFuture {
        let headers_exchange = self.inner;

        // The transition is started only when the future is first polled.
        Box::pin(async move {
            let inner = headers_exchange.into_headers_body_state().await;
            RequestHeadersBodyState { inner }
        })
    }
}
197
198impl HeadersState for RequestHeadersState {
199 fn handler(&self) -> &dyn super::HeadersHandler {
200 self.inner.handler()
201 }
202
203 fn contains_body(&self) -> bool {
204 self.inner.contains_body
205 }
206}
207
/// Request state that gives access to the request body.
pub struct RequestBodyState {
    /// Body exchange that serves as the body handler for this state.
    inner: BodyExchange<RequestBody>,
}
212
213impl BodyState for RequestBodyState {
214 fn handler(&self) -> &dyn super::BodyHandler {
215 &self.inner
216 }
217
218 fn contains_body(&self) -> bool {
219 self.inner.contains_body()
220 }
221}
222
/// Request state that exposes the request body as a stream.
pub struct RequestBodyStreamState {
    /// Body-stream exchange that all operations of this state delegate to.
    inner: BodyStreamExchange<RequestBody>,
}
226
227impl BodyStreamState for RequestBodyStreamState {
228 type Stream<'b>
229 = BodyStream<'b>
230 where
231 Self: 'b;
232
233 fn contains_body(&self) -> bool {
234 self.inner.contains_body()
235 }
236
237 fn stream(&self) -> Self::Stream<'_> {
238 self.inner.stream()
239 }
240
241 #[cfg(feature = "experimental")]
242 fn write_chunk(&self, chunk: Chunk) -> Result<(), BodyError> {
243 self.inner.write_chunk(chunk)
244 }
245}
246
/// Request state backed by a combined headers-and-body exchange.
///
/// Only compiled with the `experimental_enable_stop_iteration` feature.
#[cfg(feature = "experimental_enable_stop_iteration")]
pub struct RequestHeadersBodyState {
    /// Headers-body exchange that all operations of this state delegate to.
    inner: HeadersBodyExchange<RequestBody>,
}
252
#[cfg(feature = "experimental_enable_stop_iteration")]
impl HeadersBodyState for RequestHeadersBodyState {
    /// Returns the headers-body handler exposed by the underlying exchange.
    fn handler(&self) -> &dyn super::HeadersBodyHandler {
        let exchange = &self.inner;
        exchange.handler()
    }

    /// Reports whether the underlying headers-body exchange contains a body.
    fn contains_body(&self) -> bool {
        let exchange = &self.inner;
        exchange.contains_body()
    }
}