pdk_classy/hl/
request.rs

1// Copyright (c) 2025, Salesforce, Inc.,
2// All rights reserved.
3// For full license text, see the LICENSE.txt file
4
5use std::cell::RefCell;
6use std::future::{ready, Ready};
7use std::rc::Rc;
8
9use crate::event::{EventKind, RequestBody, RequestHeaders};
10use crate::BoxFuture;
11
12use super::body::BodyExchange;
13use super::body_stream::{BodyStream, BodyStreamExchange};
14use super::dynamic_exchange::DynamicExchange;
15use super::entity::{BodyStreamState, IntoBodyState, IntoBodyStreamState};
16use super::headers::HeadersExchange;
17
18#[cfg(feature = "experimental_enable_stop_iteration")]
19use super::{
20    entity::{HeadersBodyState, IntoHeadersBodyState},
21    headers_body::HeadersBodyExchange,
22};
23
24#[cfg(feature = "experimental")]
25use super::{BodyError, Chunk};
26use super::{BodyState, EntityState, HeadersState};
27
28/// Initial state of the Request state-machine.
29/// This type is intended to be injected as a parameter in Request filters.
30pub struct RequestState {
31    contains_body: bool,
32    exchange: Rc<RefCell<DynamicExchange>>,
33}
34
35/// Error that represents an invalid state in the Request state-machine.
36#[derive(thiserror::Error, Debug)]
37pub enum InvalidRequestState {
38    /// Unexpected low-level event.
39    #[error("Invalid request state: {0:?}")]
40    Event(EventKind),
41
42    /// Unexpected early response.
43    #[error("Early response")]
44    EarlyResponse,
45}
46
47impl RequestState {
48    #[allow(clippy::await_holding_refcell_ref)]
49    pub(crate) async fn new(
50        exchange: Rc<RefCell<DynamicExchange>>,
51    ) -> Result<Self, InvalidRequestState> {
52        let contains_body;
53        {
54            let mut exchange_ref = exchange.borrow_mut();
55            let Some(exchange) = exchange_ref.wait_for_event::<RequestHeaders>().await else {
56                let error = exchange
57                    .borrow()
58                    .current_event()
59                    .map(InvalidRequestState::Event)
60                    .unwrap_or(InvalidRequestState::EarlyResponse);
61                return Err(error);
62            };
63            contains_body = !exchange
64                .event_data()
65                .expect("Must contain headers")
66                .event
67                .end_of_stream;
68        }
69
70        Ok(Self {
71            contains_body,
72            exchange,
73        })
74    }
75}
76
77impl IntoBodyState for RequestState {
78    type BodyState = RequestBodyState;
79    type BodyStateFuture = BoxFuture<'static, Self::BodyState>;
80
81    fn into_body_state(self) -> Self::BodyStateFuture {
82        Box::pin(async {
83            RequestBodyState {
84                inner: BodyExchange::new(self.exchange, self.contains_body).await,
85            }
86        })
87    }
88}
89
90impl IntoBodyStreamState for RequestState {
91    type BodyStreamState = RequestBodyStreamState;
92
93    type BodyStreamStateFuture = BoxFuture<'static, Self::BodyStreamState>;
94
95    fn into_body_stream_state(self) -> Self::BodyStreamStateFuture {
96        Box::pin(async {
97            RequestBodyStreamState {
98                inner: BodyStreamExchange::new(self.exchange, self.contains_body).await,
99            }
100        })
101    }
102}
103
#[cfg(feature = "experimental_enable_stop_iteration")]
impl IntoHeadersBodyState for RequestState {
    type HeadersBodyState = RequestHeadersBodyState;
    type HeadersBodyStateFuture = BoxFuture<'static, Self::HeadersBodyState>;

    /// Consumes the initial state and returns a future resolving to the
    /// request headers-body state.
    fn into_headers_body_state(self) -> Self::HeadersBodyStateFuture {
        let Self {
            contains_body,
            exchange,
        } = self;

        Box::pin(async move {
            let inner = HeadersBodyExchange::new(exchange, contains_body).await;
            RequestHeadersBodyState { inner }
        })
    }
}
117
118impl EntityState for RequestState {
119    type HeadersState = RequestHeadersState;
120    type HeadersStateFuture = Ready<Self::HeadersState>;
121
122    fn into_headers_state(self) -> Self::HeadersStateFuture {
123        ready(RequestHeadersState {
124            inner: HeadersExchange::new(self.exchange, self.contains_body),
125        })
126    }
127}
128
129/// Headers state of the Request state-machine.
130pub struct RequestHeadersState {
131    inner: HeadersExchange<RequestHeaders, RequestBody>,
132}
133
134impl RequestHeadersState {
135    /// Returns the HTTP method of the current Request.
136    pub fn method(&self) -> String {
137        self.inner.with_event_data(|e| e.method())
138    }
139
140    /// Returns the HTTP scheme of the current Request.
141    pub fn scheme(&self) -> String {
142        self.inner.with_event_data(|e| e.scheme())
143    }
144
145    /// Returns the HTTP authority of the current Request.
146    pub fn authority(&self) -> String {
147        self.inner.with_event_data(|e| e.authority())
148    }
149
150    /// Returns the HTTP path of the current Request.
151    pub fn path(&self) -> String {
152        self.inner.with_event_data(|e| e.path())
153    }
154}
155
156impl IntoBodyState for RequestHeadersState {
157    type BodyState = RequestBodyState;
158
159    type BodyStateFuture = BoxFuture<'static, Self::BodyState>;
160
161    fn into_body_state(self) -> Self::BodyStateFuture {
162        Box::pin(async {
163            RequestBodyState {
164                inner: self.inner.into_body_state().await,
165            }
166        })
167    }
168}
169
170impl IntoBodyStreamState for RequestHeadersState {
171    type BodyStreamState = RequestBodyStreamState;
172
173    type BodyStreamStateFuture = BoxFuture<'static, Self::BodyStreamState>;
174
175    fn into_body_stream_state(self) -> Self::BodyStreamStateFuture {
176        Box::pin(async {
177            RequestBodyStreamState {
178                inner: self.inner.into_body_stream_state().await,
179            }
180        })
181    }
182}
183
#[cfg(feature = "experimental_enable_stop_iteration")]
impl IntoHeadersBodyState for RequestHeadersState {
    type HeadersBodyState = RequestHeadersBodyState;
    type HeadersBodyStateFuture = BoxFuture<'static, Self::HeadersBodyState>;

    /// Consumes the headers state and returns a future resolving to the
    /// request headers-body state.
    fn into_headers_body_state(self) -> Self::HeadersBodyStateFuture {
        let headers_exchange = self.inner;

        Box::pin(async move {
            let inner = headers_exchange.into_headers_body_state().await;
            RequestHeadersBodyState { inner }
        })
    }
}
197
198impl HeadersState for RequestHeadersState {
199    fn handler(&self) -> &dyn super::HeadersHandler {
200        self.inner.handler()
201    }
202
203    fn contains_body(&self) -> bool {
204        self.inner.contains_body
205    }
206}
207
208/// Body state of the Request state-machine.
209pub struct RequestBodyState {
210    inner: BodyExchange<RequestBody>,
211}
212
213impl BodyState for RequestBodyState {
214    fn handler(&self) -> &dyn super::BodyHandler {
215        &self.inner
216    }
217
218    fn contains_body(&self) -> bool {
219        self.inner.contains_body()
220    }
221}
222
223pub struct RequestBodyStreamState {
224    inner: BodyStreamExchange<RequestBody>,
225}
226
227impl BodyStreamState for RequestBodyStreamState {
228    type Stream<'b>
229        = BodyStream<'b>
230    where
231        Self: 'b;
232
233    fn contains_body(&self) -> bool {
234        self.inner.contains_body()
235    }
236
237    fn stream(&self) -> Self::Stream<'_> {
238        self.inner.stream()
239    }
240
241    #[cfg(feature = "experimental")]
242    fn write_chunk(&self, chunk: Chunk) -> Result<(), BodyError> {
243        self.inner.write_chunk(chunk)
244    }
245}
246
/// State of the Request state-machine that combines headers and body.
#[cfg(feature = "experimental_enable_stop_iteration")]
pub struct RequestHeadersBodyState {
    /// Headers-body exchange that every operation of this state delegates to.
    inner: HeadersBodyExchange<RequestBody>,
}

#[cfg(feature = "experimental_enable_stop_iteration")]
impl HeadersBodyState for RequestHeadersBodyState {
    /// Delegates to the underlying exchange to report whether a body is present.
    fn contains_body(&self) -> bool {
        self.inner.contains_body()
    }

    /// Returns the headers-body handler provided by the underlying exchange.
    fn handler(&self) -> &dyn super::HeadersBodyHandler {
        self.inner.handler()
    }
}