pdk_classy/hl/
response.rs

1// Copyright (c) 2025, Salesforce, Inc.,
2// All rights reserved.
3// For full license text, see the LICENSE.txt file
4
5use std::{
6    cell::RefCell,
7    future::{ready, Ready},
8    rc::Rc,
9};
10
11use crate::{
12    event::{EventKind, ResponseBody, ResponseHeaders},
13    BoxFuture,
14};
15
16use super::{
17    body::BodyExchange,
18    body_stream::{BodyStream, BodyStreamExchange},
19    dynamic_exchange::DynamicExchange,
20    entity::{BodyStreamState, IntoBodyState, IntoBodyStreamState},
21    headers::HeadersExchange,
22    BodyState, EntityState, HeadersState,
23};
24#[cfg(feature = "experimental_enable_stop_iteration")]
25use super::{
26    entity::{HeadersBodyState, IntoHeadersBodyState},
27    headers_body::HeadersBodyExchange,
28};
29#[cfg(feature = "experimental")]
30use super::{BodyError, Chunk};
31
/// Initial state of the Response state-machine.
///
/// This type is intended to be injected as a parameter in Response filters.
/// From here the state-machine can be advanced to the headers state or directly
/// to one of the body states through the conversion traits implemented below.
pub struct ResponseState {
    // `true` when the response headers event was not flagged as end of stream.
    contains_body: bool,
    // Shared handle to the underlying exchange driving this state-machine.
    exchange: Rc<RefCell<DynamicExchange>>,
}
38
/// Error that represents an invalid state in the Response state-machine.
///
/// Produced by `ResponseState::new` when the response headers event cannot be obtained.
#[derive(thiserror::Error, Debug)]
pub enum InvalidResponseState {
    /// Unexpected low-level event: the exchange reported this event instead of
    /// the expected response headers.
    #[error("Invalid response state: {0:?}")]
    Event(EventKind),

    /// Unexpected early response: the exchange reported no current event at all.
    #[error("Early response")]
    EarlyResponse,
}
50
51impl ResponseState {
52    #[allow(clippy::await_holding_refcell_ref)]
53    pub(crate) async fn new(
54        exchange: Rc<RefCell<DynamicExchange>>,
55    ) -> Result<Self, InvalidResponseState> {
56        let contains_body;
57        {
58            let mut e = exchange.borrow_mut();
59            let Some(exchange) = e.wait_for_event::<ResponseHeaders>().await else {
60                let error = exchange
61                    .borrow()
62                    .current_event()
63                    .map(InvalidResponseState::Event)
64                    .unwrap_or(InvalidResponseState::EarlyResponse);
65                return Err(error);
66            };
67            contains_body = !exchange
68                .event_data()
69                .expect("Must contain headers")
70                .event
71                .end_of_stream;
72        }
73
74        Ok(Self {
75            contains_body,
76            exchange,
77        })
78    }
79}
80
81impl IntoBodyState for ResponseState {
82    type BodyState = ResponseBodyState;
83    type BodyStateFuture = BoxFuture<'static, Self::BodyState>;
84
85    fn into_body_state(self) -> Self::BodyStateFuture {
86        Box::pin(async {
87            ResponseBodyState {
88                inner: BodyExchange::new(self.exchange, self.contains_body).await,
89            }
90        })
91    }
92}
93
94impl IntoBodyStreamState for ResponseState {
95    type BodyStreamState = ResponseBodyStreamState;
96
97    type BodyStreamStateFuture = BoxFuture<'static, Self::BodyStreamState>;
98
99    fn into_body_stream_state(self) -> Self::BodyStreamStateFuture {
100        Box::pin(async {
101            ResponseBodyStreamState {
102                inner: BodyStreamExchange::new(self.exchange, self.contains_body).await,
103            }
104        })
105    }
106}
107
/// Moves from the initial state to the combined headers-and-body state.
#[cfg(feature = "experimental_enable_stop_iteration")]
impl IntoHeadersBodyState for ResponseState {
    type HeadersBodyState = ResponseHeadersBodyState;
    type HeadersBodyStateFuture = BoxFuture<'static, Self::HeadersBodyState>;

    /// Consumes the initial state and resolves to a [`ResponseHeadersBodyState`] once
    /// the underlying `HeadersBodyExchange` has been created.
    fn into_headers_body_state(self) -> Self::HeadersBodyStateFuture {
        let Self {
            contains_body,
            exchange,
        } = self;

        Box::pin(async move {
            let inner = HeadersBodyExchange::new(exchange, contains_body).await;
            ResponseHeadersBodyState { inner }
        })
    }
}
121
122impl EntityState for ResponseState {
123    type HeadersState = ResponseHeadersState;
124    type HeadersStateFuture = Ready<Self::HeadersState>;
125
126    fn into_headers_state(self) -> Self::HeadersStateFuture {
127        ready(ResponseHeadersState {
128            inner: HeadersExchange::new(self.exchange, self.contains_body),
129        })
130    }
131}
132
/// Headers state of the Response state-machine.
///
/// Gives access to the response headers handler and status code, and can be
/// advanced to any of the body states.
pub struct ResponseHeadersState {
    // Exchange positioned on the response headers event.
    inner: HeadersExchange<ResponseHeaders, ResponseBody>,
}
137
138impl ResponseHeadersState {
139    pub fn status_code(&self) -> u32 {
140        self.inner.with_event_data(|e| e.status_code())
141    }
142}
143
144impl IntoBodyState for ResponseHeadersState {
145    type BodyState = ResponseBodyState;
146
147    type BodyStateFuture = BoxFuture<'static, Self::BodyState>;
148
149    fn into_body_state(self) -> Self::BodyStateFuture {
150        Box::pin(async {
151            ResponseBodyState {
152                inner: self.inner.into_body_state().await,
153            }
154        })
155    }
156}
157
158impl IntoBodyStreamState for ResponseHeadersState {
159    type BodyStreamState = ResponseBodyStreamState;
160
161    type BodyStreamStateFuture = BoxFuture<'static, Self::BodyStreamState>;
162
163    fn into_body_stream_state(self) -> Self::BodyStreamStateFuture {
164        Box::pin(async {
165            ResponseBodyStreamState {
166                inner: self.inner.into_body_stream_state().await,
167            }
168        })
169    }
170}
171
/// Moves from the headers state to the combined headers-and-body state.
#[cfg(feature = "experimental_enable_stop_iteration")]
impl IntoHeadersBodyState for ResponseHeadersState {
    type HeadersBodyState = ResponseHeadersBodyState;
    type HeadersBodyStateFuture = BoxFuture<'static, Self::HeadersBodyState>;

    /// Consumes the headers state and resolves to a [`ResponseHeadersBodyState`] once
    /// the underlying headers exchange has transitioned to its headers-body state.
    fn into_headers_body_state(self) -> Self::HeadersBodyStateFuture {
        let headers = self.inner;

        Box::pin(async move {
            let inner = headers.into_headers_body_state().await;
            ResponseHeadersBodyState { inner }
        })
    }
}
185
186impl HeadersState for ResponseHeadersState {
187    fn handler(&self) -> &dyn super::HeadersHandler {
188        self.inner.handler()
189    }
190
191    fn contains_body(&self) -> bool {
192        self.inner.contains_body
193    }
194}
195
/// Body state of the Response state-machine.
///
/// Reached from [`ResponseState`] or [`ResponseHeadersState`] through `into_body_state`.
pub struct ResponseBodyState {
    // Exchange positioned on the response body; also serves as the body handler.
    inner: BodyExchange<ResponseBody>,
}
200
201impl BodyState for ResponseBodyState {
202    fn handler(&self) -> &dyn super::BodyHandler {
203        &self.inner
204    }
205
206    fn contains_body(&self) -> bool {
207        self.inner.contains_body()
208    }
209}
210
/// Body stream state of the Response state-machine.
///
/// Reached from [`ResponseState`] or [`ResponseHeadersState`] through
/// `into_body_stream_state`.
pub struct ResponseBodyStreamState {
    // Exchange positioned on the response body, consumed as a stream.
    inner: BodyStreamExchange<ResponseBody>,
}
214
215impl BodyStreamState for ResponseBodyStreamState {
216    type Stream<'b>
217        = BodyStream<'b>
218    where
219        Self: 'b;
220
221    fn contains_body(&self) -> bool {
222        self.inner.contains_body()
223    }
224
225    fn stream(&self) -> Self::Stream<'_> {
226        self.inner.stream()
227    }
228
229    #[cfg(feature = "experimental")]
230    fn write_chunk(&self, chunk: Chunk) -> Result<(), BodyError> {
231        self.inner.write_chunk(chunk)
232    }
233}
234
/// HeadersBody state of the Response state-machine.
///
/// Only compiled when the `experimental_enable_stop_iteration` feature is enabled.
#[cfg(feature = "experimental_enable_stop_iteration")]
pub struct ResponseHeadersBodyState {
    // Exchange exposing headers and body together; also serves as the handler.
    inner: HeadersBodyExchange<ResponseBody>,
}
240
/// Exposes the response headers and body through the generic `HeadersBodyState` interface.
#[cfg(feature = "experimental_enable_stop_iteration")]
impl HeadersBodyState for ResponseHeadersBodyState {
    /// Returns the underlying exchange viewed as a headers-body handler.
    fn handler(&self) -> &dyn super::HeadersBodyHandler {
        let handler: &dyn super::HeadersBodyHandler = &self.inner;
        handler
    }

    /// Reports whether the underlying exchange says a body is present.
    fn contains_body(&self) -> bool {
        let headers_body = &self.inner;
        headers_body.contains_body()
    }
}