pdk_classy/hl/
response.rs1use std::{
6 cell::RefCell,
7 future::{ready, Ready},
8 rc::Rc,
9};
10
11use crate::{
12 event::{EventKind, ResponseBody, ResponseHeaders},
13 BoxFuture,
14};
15
16use super::{
17 body::BodyExchange,
18 body_stream::{BodyStream, BodyStreamExchange},
19 dynamic_exchange::DynamicExchange,
20 entity::{BodyStreamState, IntoBodyState, IntoBodyStreamState},
21 headers::HeadersExchange,
22 BodyState, EntityState, HeadersState,
23};
24#[cfg(feature = "experimental_enable_stop_iteration")]
25use super::{
26 entity::{HeadersBodyState, IntoHeadersBodyState},
27 headers_body::HeadersBodyExchange,
28};
29#[cfg(feature = "experimental")]
30use super::{BodyError, Chunk};
31
32pub struct ResponseState {
35 contains_body: bool,
36 exchange: Rc<RefCell<DynamicExchange>>,
37}
38
39#[derive(thiserror::Error, Debug)]
41pub enum InvalidResponseState {
42 #[error("Invalid response state: {0:?}")]
44 Event(EventKind),
45
46 #[error("Early response")]
48 EarlyResponse,
49}
50
51impl ResponseState {
52 #[allow(clippy::await_holding_refcell_ref)]
53 pub(crate) async fn new(
54 exchange: Rc<RefCell<DynamicExchange>>,
55 ) -> Result<Self, InvalidResponseState> {
56 let contains_body;
57 {
58 let mut e = exchange.borrow_mut();
59 let Some(exchange) = e.wait_for_event::<ResponseHeaders>().await else {
60 let error = exchange
61 .borrow()
62 .current_event()
63 .map(InvalidResponseState::Event)
64 .unwrap_or(InvalidResponseState::EarlyResponse);
65 return Err(error);
66 };
67 contains_body = !exchange
68 .event_data()
69 .expect("Must contain headers")
70 .event
71 .end_of_stream;
72 }
73
74 Ok(Self {
75 contains_body,
76 exchange,
77 })
78 }
79}
80
81impl IntoBodyState for ResponseState {
82 type BodyState = ResponseBodyState;
83 type BodyStateFuture = BoxFuture<'static, Self::BodyState>;
84
85 fn into_body_state(self) -> Self::BodyStateFuture {
86 Box::pin(async {
87 ResponseBodyState {
88 inner: BodyExchange::new(self.exchange, self.contains_body).await,
89 }
90 })
91 }
92}
93
94impl IntoBodyStreamState for ResponseState {
95 type BodyStreamState = ResponseBodyStreamState;
96
97 type BodyStreamStateFuture = BoxFuture<'static, Self::BodyStreamState>;
98
99 fn into_body_stream_state(self) -> Self::BodyStreamStateFuture {
100 Box::pin(async {
101 ResponseBodyStreamState {
102 inner: BodyStreamExchange::new(self.exchange, self.contains_body).await,
103 }
104 })
105 }
106}
107
108#[cfg(feature = "experimental_enable_stop_iteration")]
109impl IntoHeadersBodyState for ResponseState {
110 type HeadersBodyState = ResponseHeadersBodyState;
111 type HeadersBodyStateFuture = BoxFuture<'static, Self::HeadersBodyState>;
112
113 fn into_headers_body_state(self) -> Self::HeadersBodyStateFuture {
114 Box::pin(async {
115 ResponseHeadersBodyState {
116 inner: HeadersBodyExchange::new(self.exchange, self.contains_body).await,
117 }
118 })
119 }
120}
121
122impl EntityState for ResponseState {
123 type HeadersState = ResponseHeadersState;
124 type HeadersStateFuture = Ready<Self::HeadersState>;
125
126 fn into_headers_state(self) -> Self::HeadersStateFuture {
127 ready(ResponseHeadersState {
128 inner: HeadersExchange::new(self.exchange, self.contains_body),
129 })
130 }
131}
132
133pub struct ResponseHeadersState {
135 inner: HeadersExchange<ResponseHeaders, ResponseBody>,
136}
137
138impl ResponseHeadersState {
139 pub fn status_code(&self) -> u32 {
140 self.inner.with_event_data(|e| e.status_code())
141 }
142}
143
144impl IntoBodyState for ResponseHeadersState {
145 type BodyState = ResponseBodyState;
146
147 type BodyStateFuture = BoxFuture<'static, Self::BodyState>;
148
149 fn into_body_state(self) -> Self::BodyStateFuture {
150 Box::pin(async {
151 ResponseBodyState {
152 inner: self.inner.into_body_state().await,
153 }
154 })
155 }
156}
157
158impl IntoBodyStreamState for ResponseHeadersState {
159 type BodyStreamState = ResponseBodyStreamState;
160
161 type BodyStreamStateFuture = BoxFuture<'static, Self::BodyStreamState>;
162
163 fn into_body_stream_state(self) -> Self::BodyStreamStateFuture {
164 Box::pin(async {
165 ResponseBodyStreamState {
166 inner: self.inner.into_body_stream_state().await,
167 }
168 })
169 }
170}
171
172#[cfg(feature = "experimental_enable_stop_iteration")]
173impl IntoHeadersBodyState for ResponseHeadersState {
174 type HeadersBodyState = ResponseHeadersBodyState;
175 type HeadersBodyStateFuture = BoxFuture<'static, Self::HeadersBodyState>;
176
177 fn into_headers_body_state(self) -> Self::HeadersBodyStateFuture {
178 Box::pin(async {
179 ResponseHeadersBodyState {
180 inner: self.inner.into_headers_body_state().await,
181 }
182 })
183 }
184}
185
186impl HeadersState for ResponseHeadersState {
187 fn handler(&self) -> &dyn super::HeadersHandler {
188 self.inner.handler()
189 }
190
191 fn contains_body(&self) -> bool {
192 self.inner.contains_body
193 }
194}
195
196pub struct ResponseBodyState {
198 inner: BodyExchange<ResponseBody>,
199}
200
201impl BodyState for ResponseBodyState {
202 fn handler(&self) -> &dyn super::BodyHandler {
203 &self.inner
204 }
205
206 fn contains_body(&self) -> bool {
207 self.inner.contains_body()
208 }
209}
210
211pub struct ResponseBodyStreamState {
212 inner: BodyStreamExchange<ResponseBody>,
213}
214
215impl BodyStreamState for ResponseBodyStreamState {
216 type Stream<'b>
217 = BodyStream<'b>
218 where
219 Self: 'b;
220
221 fn contains_body(&self) -> bool {
222 self.inner.contains_body()
223 }
224
225 fn stream(&self) -> Self::Stream<'_> {
226 self.inner.stream()
227 }
228
229 #[cfg(feature = "experimental")]
230 fn write_chunk(&self, chunk: Chunk) -> Result<(), BodyError> {
231 self.inner.write_chunk(chunk)
232 }
233}
234
235#[cfg(feature = "experimental_enable_stop_iteration")]
236pub struct ResponseHeadersBodyState {
238 inner: HeadersBodyExchange<ResponseBody>,
239}
240
241#[cfg(feature = "experimental_enable_stop_iteration")]
242impl HeadersBodyState for ResponseHeadersBodyState {
243 fn handler(&self) -> &dyn super::HeadersBodyHandler {
244 &self.inner
245 }
246
247 fn contains_body(&self) -> bool {
248 self.inner.contains_body()
249 }
250}