Skip to main content

pdk_classy/hl/
request.rs

1// Copyright (c) 2026, Salesforce, Inc.,
2// All rights reserved.
3// For full license text, see the LICENSE.txt file
4
5use std::cell::RefCell;
6use std::future::{ready, Ready};
7use std::rc::Rc;
8
9use super::body::BodyExchange;
10use super::body_stream::{BodyStream, BodyStreamExchange};
11use super::dynamic_exchange::DynamicExchange;
12use super::entity::{BodyStreamState, IntoBodyState, IntoBodyStreamState};
13use super::headers::HeadersExchange;
14use crate::event::{EventKind, RequestBody, RequestHeaders};
15use crate::http_constants::{
16    DEFAULT_PATH, HEADER_AUTHORITY, HEADER_METHOD, HEADER_PATH, HEADER_SCHEME,
17};
18use crate::BoxFuture;
19
20#[cfg(feature = "experimental_enable_stop_iteration")]
21use super::{
22    entity::{HeadersBodyState, IntoHeadersBodyState},
23    headers_body::HeadersBodyExchange,
24};
25
26#[cfg(feature = "experimental")]
27use super::{BodyError, Chunk};
28use super::{BodyState, EntityState, HeadersState};
29
30/// Initial state of the Request state-machine.
31/// This type is intended to be injected as a parameter in Request filters.
32pub struct RequestState {
33    contains_body: bool,
34    exchange: Rc<RefCell<DynamicExchange>>,
35}
36
37/// Error that represents an invalid state in the Request state-machine.
38#[derive(thiserror::Error, Debug)]
39pub enum InvalidRequestState {
40    /// Unexpected low-level event.
41    #[error("Invalid request state: {0:?}")]
42    Event(EventKind),
43
44    /// Unexpected early response.
45    #[error("Early response")]
46    EarlyResponse,
47}
48
49impl RequestState {
50    #[allow(clippy::await_holding_refcell_ref)]
51    pub(crate) async fn new(
52        exchange: Rc<RefCell<DynamicExchange>>,
53    ) -> Result<Self, InvalidRequestState> {
54        let contains_body;
55        {
56            let mut exchange_ref = exchange.borrow_mut();
57            let Some(exchange) = exchange_ref.wait_for_event::<RequestHeaders>().await else {
58                let error = exchange
59                    .borrow()
60                    .current_event()
61                    .map(InvalidRequestState::Event)
62                    .unwrap_or(InvalidRequestState::EarlyResponse);
63                return Err(error);
64            };
65            contains_body = !exchange
66                .event_data()
67                .expect("Must contain headers")
68                .event
69                .end_of_stream;
70        }
71
72        Ok(Self {
73            contains_body,
74            exchange,
75        })
76    }
77}
78
79impl IntoBodyState for RequestState {
80    type BodyState = RequestBodyState;
81    type BodyStateFuture = BoxFuture<'static, Self::BodyState>;
82
83    fn into_body_state(self) -> Self::BodyStateFuture {
84        Box::pin(async {
85            RequestBodyState {
86                inner: BodyExchange::new(self.exchange, self.contains_body).await,
87            }
88        })
89    }
90}
91
92impl IntoBodyStreamState for RequestState {
93    type BodyStreamState = RequestBodyStreamState;
94
95    type BodyStreamStateFuture = BoxFuture<'static, Self::BodyStreamState>;
96
97    fn into_body_stream_state(self) -> Self::BodyStreamStateFuture {
98        Box::pin(async {
99            RequestBodyStreamState {
100                inner: BodyStreamExchange::new(self.exchange, self.contains_body).await,
101            }
102        })
103    }
104}
105
106#[cfg(feature = "experimental_enable_stop_iteration")]
107impl IntoHeadersBodyState for RequestState {
108    type HeadersBodyState = RequestHeadersBodyState;
109    type HeadersBodyStateFuture = BoxFuture<'static, Self::HeadersBodyState>;
110
111    fn into_headers_body_state(self) -> Self::HeadersBodyStateFuture {
112        Box::pin(async {
113            RequestHeadersBodyState {
114                inner: HeadersBodyExchange::new(self.exchange, self.contains_body).await,
115            }
116        })
117    }
118}
119
120impl EntityState for RequestState {
121    type HeadersState = RequestHeadersState;
122    type HeadersStateFuture = Ready<Self::HeadersState>;
123
124    fn into_headers_state(self) -> Self::HeadersStateFuture {
125        ready(RequestHeadersState {
126            inner: HeadersExchange::new(self.exchange, self.contains_body),
127        })
128    }
129}
130
131/// Headers state of the Request state-machine.
132pub struct RequestHeadersState {
133    inner: HeadersExchange<RequestHeaders, RequestBody>,
134}
135
136impl RequestHeadersState {
137    pub fn method(&self) -> String {
138        self.handler().header(HEADER_METHOD).unwrap_or_default()
139    }
140
141    pub fn scheme(&self) -> String {
142        self.handler().header(HEADER_SCHEME).unwrap_or_default()
143    }
144
145    pub fn authority(&self) -> String {
146        self.handler().header(HEADER_AUTHORITY).unwrap_or_default()
147    }
148
149    pub fn path(&self) -> String {
150        self.handler()
151            .header(HEADER_PATH)
152            .unwrap_or_else(|| DEFAULT_PATH.to_string())
153    }
154}
155
156impl IntoBodyState for RequestHeadersState {
157    type BodyState = RequestBodyState;
158
159    type BodyStateFuture = BoxFuture<'static, Self::BodyState>;
160
161    fn into_body_state(self) -> Self::BodyStateFuture {
162        Box::pin(async {
163            RequestBodyState {
164                inner: self.inner.into_body_state().await,
165            }
166        })
167    }
168}
169
170impl IntoBodyStreamState for RequestHeadersState {
171    type BodyStreamState = RequestBodyStreamState;
172
173    type BodyStreamStateFuture = BoxFuture<'static, Self::BodyStreamState>;
174
175    fn into_body_stream_state(self) -> Self::BodyStreamStateFuture {
176        Box::pin(async {
177            RequestBodyStreamState {
178                inner: self.inner.into_body_stream_state().await,
179            }
180        })
181    }
182}
183
184#[cfg(feature = "experimental_enable_stop_iteration")]
185impl IntoHeadersBodyState for RequestHeadersState {
186    type HeadersBodyState = RequestHeadersBodyState;
187    type HeadersBodyStateFuture = BoxFuture<'static, Self::HeadersBodyState>;
188
189    fn into_headers_body_state(self) -> Self::HeadersBodyStateFuture {
190        Box::pin(async {
191            RequestHeadersBodyState {
192                inner: self.inner.into_headers_body_state().await,
193            }
194        })
195    }
196}
197
198impl HeadersState for RequestHeadersState {
199    fn handler(&self) -> &dyn super::HeadersHandler {
200        self.inner.handler()
201    }
202
203    fn contains_body(&self) -> bool {
204        self.inner.contains_body
205    }
206}
207
208/// Body state of the Request state-machine.
209pub struct RequestBodyState {
210    inner: BodyExchange<RequestBody>,
211}
212
213impl BodyState for RequestBodyState {
214    fn handler(&self) -> &dyn super::BodyHandler {
215        &self.inner
216    }
217
218    fn contains_body(&self) -> bool {
219        self.inner.contains_body()
220    }
221}
222
223pub struct RequestBodyStreamState {
224    inner: BodyStreamExchange<RequestBody>,
225}
226
227impl BodyStreamState for RequestBodyStreamState {
228    type Stream<'b>
229        = BodyStream<'b>
230    where
231        Self: 'b;
232
233    fn contains_body(&self) -> bool {
234        self.inner.contains_body()
235    }
236
237    fn stream(&self) -> Self::Stream<'_> {
238        self.inner.stream()
239    }
240
241    #[cfg(feature = "experimental")]
242    fn write_chunk(&self, chunk: Chunk) -> Result<(), BodyError> {
243        self.inner.write_chunk(chunk)
244    }
245}
246
247#[cfg(feature = "experimental_enable_stop_iteration")]
248/// HeadersBody state of the Request state-machine.
249pub struct RequestHeadersBodyState {
250    inner: HeadersBodyExchange<RequestBody, RequestHeaders>,
251}
252
253#[cfg(feature = "experimental_enable_stop_iteration")]
254impl HeadersBodyState for RequestHeadersBodyState {
255    fn handler(&self) -> &dyn super::HeadersBodyHandler {
256        self.inner.handler()
257    }
258
259    fn contains_body(&self) -> bool {
260        self.inner.contains_body()
261    }
262}