pdk_classy/extract/
context.rs1use std::any::TypeId;
8use std::collections::HashSet;
9use std::{cell::RefCell, convert::Infallible, rc::Rc};
10
11use crate::host::clock::Clock;
12use crate::host::grpc::GrpcHost;
13use crate::host::shared_data::SharedData;
14use crate::middleware::EventHandlerStack;
15use crate::{
16 context,
17 event::{After, Exchange, Start},
18 host::Host,
19 reactor::{http::HttpReactor, root::RootReactor},
20 types::{HttpCid, RootCid},
21 BoxFuture,
22};
23
24use super::{extractability, AlreadyExtracted, Exclusive, FromContext, FromContextOnce};
25
26#[cfg(feature = "experimental_websocket")]
27use crate::reactor::websocket::WebSocketReactor;
28
/// Boxed, type-erased WebSocket handler callback.
///
/// The callback receives a shared [`WebSocketReactor`] handle and returns a
/// `'static` boxed future that resolves to `Ok(())` or a [`crate::BoxError`].
#[cfg(feature = "experimental_websocket")]
pub(crate) type WebSocketHandlerFn =
    Box<dyn Fn(Rc<WebSocketReactor>) -> BoxFuture<'static, Result<(), crate::BoxError>> + 'static>;
32
33pub struct ConfigureContext {
36 pub(crate) host: Rc<dyn Host>,
37 pub(crate) clock: Rc<dyn Clock>,
38 pub(crate) grpc_host: Rc<dyn GrpcHost>,
39 pub(crate) shared_data: Rc<dyn SharedData>,
40 #[cfg(feature = "experimental_metrics")]
41 pub(crate) metrics: Rc<dyn crate::host::metrics::MetricsHost>,
42 pub(crate) root_reactor: Rc<RootReactor>,
43 pub(crate) event_handlers: Rc<RefCell<EventHandlerStack>>,
44 #[cfg(feature = "experimental_websocket")]
45 pub(crate) websocket_upstream_handler: Rc<RefCell<Option<WebSocketHandlerFn>>>,
46 #[cfg(feature = "experimental_websocket")]
47 pub(crate) websocket_downstream_handler: Rc<RefCell<Option<WebSocketHandlerFn>>>,
48 #[cfg(feature = "experimental_websocket")]
49 pub(crate) shared_state: Rc<RefCell<Option<Rc<dyn std::any::Any>>>>,
50 pub(crate) unique_extractions: RefCell<HashSet<TypeId>>,
51}
52
53context!(ConfigureContext);
54
55impl ConfigureContext {
56 pub(crate) fn extract_unique<T: 'static>(
57 &self,
58 op: impl FnOnce() -> T,
59 ) -> Result<T, AlreadyExtracted<T>> {
60 self.unique_extractions
61 .borrow_mut()
62 .insert(TypeId::of::<T>())
63 .then(op)
64 .ok_or_else(AlreadyExtracted::default)
65 }
66}
67
68impl FromContext<ConfigureContext, extractability::Transitive> for Rc<dyn Host> {
69 type Error = Infallible;
70
71 fn from_context(context: &ConfigureContext) -> Result<Self, Self::Error> {
72 Ok(context.host.clone())
73 }
74}
75
76impl FromContext<ConfigureContext, extractability::Transitive> for Rc<dyn Clock> {
77 type Error = Infallible;
78
79 fn from_context(context: &ConfigureContext) -> Result<Self, Self::Error> {
80 Ok(context.clock.clone())
81 }
82}
83
84impl FromContext<ConfigureContext, extractability::Transitive> for Rc<dyn SharedData> {
85 type Error = Infallible;
86
87 fn from_context(context: &ConfigureContext) -> Result<Self, Self::Error> {
88 Ok(context.shared_data.clone())
89 }
90}
91
/// Extracts a new shared handle to the context's metrics host; never fails.
///
/// Only compiled with the `experimental_metrics` feature.
#[cfg(feature = "experimental_metrics")]
impl FromContext<ConfigureContext, extractability::Transitive>
    for Rc<dyn crate::host::metrics::MetricsHost>
{
    type Error = Infallible;

    fn from_context(context: &ConfigureContext) -> Result<Self, Self::Error> {
        let metrics = Rc::clone(&context.metrics);
        Ok(metrics)
    }
}
102
103impl FromContext<ConfigureContext, extractability::Transitive> for Rc<RootReactor> {
104 type Error = Infallible;
105
106 fn from_context(context: &ConfigureContext) -> Result<Self, Self::Error> {
107 Ok(context.root_reactor.clone())
108 }
109}
110
111impl FromContext<ConfigureContext, extractability::Transitive> for RootCid {
112 type Error = Infallible;
113
114 fn from_context(context: &ConfigureContext) -> Result<Self, Self::Error> {
115 Ok(context.root_reactor.context_id())
116 }
117}
118
119pub struct FilterContext {
122 parent: Rc<ConfigureContext>,
123 http_reactor: Rc<HttpReactor>,
124}
125
126impl FilterContext {
127 pub(crate) fn new(parent: Rc<ConfigureContext>, http_reactor: Rc<HttpReactor>) -> Self {
128 Self {
129 parent,
130 http_reactor,
131 }
132 }
133
134 fn parent(&self) -> &ConfigureContext {
135 &self.parent
136 }
137
138 #[cfg(feature = "experimental_websocket")]
139 pub(crate) fn parent_rc(&self) -> &Rc<ConfigureContext> {
140 &self.parent
141 }
142}
143
144context!(FilterContext => ConfigureContext {FilterContext::parent});
145
146impl<E> FromContextOnce<FilterContext> for Exchange<E>
147where
148 E: After<Start> + 'static,
149{
150 type Error = AlreadyExtracted<Exchange<E>>;
151
152 type Future<'c> = BoxFuture<'c, Result<Self, Self::Error>>;
153
154 fn from_context_once(context: Exclusive<FilterContext>) -> Self::Future<'_> {
155 Box::pin(async move {
156 let exchange = Exchange::new(
157 context.http_reactor.clone(),
158 context.parent.host.clone(),
159 Some(Start {
160 _context_id: context.http_reactor.context_id(),
161 }),
162 );
163 Ok(exchange.wait_for_event::<E>().await)
164 })
165 }
166}
167
168impl FromContext<FilterContext> for Rc<HttpReactor> {
169 type Error = Infallible;
170
171 fn from_context(context: &FilterContext) -> Result<Self, Self::Error> {
172 Ok(context.http_reactor.clone())
173 }
174}
175
176impl FromContext<FilterContext> for HttpCid {
177 type Error = Infallible;
178
179 fn from_context(context: &FilterContext) -> Result<Self, Self::Error> {
180 Ok(context.http_reactor.context_id())
181 }
182}
183
#[cfg(feature = "experimental_websocket")]
/// Extraction context that bundles a parent [`ConfigureContext`], a shared
/// state value of type `S` (defaulting to `()`), and a [`WebSocketReactor`].
// NOTE(review): the name suggests this serves the upstream side of a WebSocket
// upgrade — confirm against the call sites.
pub struct UpgradeUpstreamContext<S = ()> {
    /// The configure-level context this context was created from.
    parent: Rc<ConfigureContext>,
    /// Shared state handed in by the creator of this context.
    state: Rc<S>,
    /// Reactor associated with this context.
    reactor: Rc<WebSocketReactor>,
}

#[cfg(feature = "experimental_websocket")]
impl<S> UpgradeUpstreamContext<S> {
    /// Creates a context from its parent, shared state and reactor.
    pub(crate) fn new(
        parent: Rc<ConfigureContext>,
        state: Rc<S>,
        reactor: Rc<WebSocketReactor>,
    ) -> Self {
        UpgradeUpstreamContext {
            parent,
            state,
            reactor,
        }
    }

    /// Borrows the shared pointer to the state.
    pub(crate) fn state(&self) -> &Rc<S> {
        &self.state
    }

    /// Borrows the shared pointer to the WebSocket reactor.
    pub(crate) fn reactor(&self) -> &Rc<WebSocketReactor> {
        &self.reactor
    }

    /// Borrows the parent [`ConfigureContext`]; used as the accessor in the
    /// `context!` invocation below.
    fn parent(&self) -> &ConfigureContext {
        self.parent.as_ref()
    }
}

// NOTE(review): presumably declares `ConfigureContext` as the parent context
// for every `S` — confirm against the `context!` macro definition.
#[cfg(feature = "experimental_websocket")]
context!(<S> UpgradeUpstreamContext<S> => ConfigureContext {UpgradeUpstreamContext::parent});
222
#[cfg(feature = "experimental_websocket")]
/// Extraction context that bundles a parent [`ConfigureContext`], a shared
/// state value of type `S` (defaulting to `()`), and a [`WebSocketReactor`].
// NOTE(review): the name suggests this serves the downstream side of a
// WebSocket upgrade — confirm against the call sites.
pub struct UpgradeDownstreamContext<S = ()> {
    /// The configure-level context this context was created from.
    parent: Rc<ConfigureContext>,
    /// Shared state handed in by the creator of this context.
    state: Rc<S>,
    /// Reactor associated with this context.
    reactor: Rc<WebSocketReactor>,
}

#[cfg(feature = "experimental_websocket")]
impl<S> UpgradeDownstreamContext<S> {
    /// Creates a context from its parent, shared state and reactor.
    pub(crate) fn new(
        parent: Rc<ConfigureContext>,
        state: Rc<S>,
        reactor: Rc<WebSocketReactor>,
    ) -> Self {
        UpgradeDownstreamContext {
            parent,
            state,
            reactor,
        }
    }

    /// Borrows the shared pointer to the state.
    pub(crate) fn state(&self) -> &Rc<S> {
        &self.state
    }

    /// Borrows the shared pointer to the WebSocket reactor.
    pub(crate) fn reactor(&self) -> &Rc<WebSocketReactor> {
        &self.reactor
    }

    /// Borrows the parent [`ConfigureContext`]; used as the accessor in the
    /// `context!` invocation below.
    fn parent(&self) -> &ConfigureContext {
        self.parent.as_ref()
    }
}

// NOTE(review): presumably declares `ConfigureContext` as the parent context
// for every `S` — confirm against the `context!` macro definition.
#[cfg(feature = "experimental_websocket")]
context!(<S> UpgradeDownstreamContext<S> => ConfigureContext {UpgradeDownstreamContext::parent});