Skip to main content

pdk_classy/extract/
context.rs

1// Copyright (c) 2026, Salesforce, Inc.,
2// All rights reserved.
3// For full license text, see the LICENSE.txt file
4
5//! The different contexts where data can be extracted from.
6
7use std::any::TypeId;
8use std::collections::HashSet;
9use std::{cell::RefCell, convert::Infallible, rc::Rc};
10
11use crate::host::clock::Clock;
12use crate::host::grpc::GrpcHost;
13use crate::host::shared_data::SharedData;
14use crate::middleware::EventHandlerStack;
15use crate::{
16    context,
17    event::{After, Exchange, Start},
18    host::Host,
19    reactor::{http::HttpReactor, root::RootReactor},
20    types::{HttpCid, RootCid},
21    BoxFuture,
22};
23
24use super::{extractability, AlreadyExtracted, Exclusive, FromContext, FromContextOnce};
25
26#[cfg(feature = "experimental_websocket")]
27use crate::reactor::websocket::WebSocketReactor;
28
29#[cfg(feature = "experimental_websocket")]
30pub(crate) type WebSocketHandlerFn =
31    Box<dyn Fn(Rc<WebSocketReactor>) -> BoxFuture<'static, Result<(), crate::BoxError>> + 'static>;
32
33/// Context representing the configuration of the plugin.
34/// The configuration function is the one annotated with the `#[entrypoint]` annotation.
35pub struct ConfigureContext {
36    pub(crate) host: Rc<dyn Host>,
37    pub(crate) clock: Rc<dyn Clock>,
38    pub(crate) grpc_host: Rc<dyn GrpcHost>,
39    pub(crate) shared_data: Rc<dyn SharedData>,
40    #[cfg(feature = "experimental_metrics")]
41    pub(crate) metrics: Rc<dyn crate::host::metrics::MetricsHost>,
42    pub(crate) root_reactor: Rc<RootReactor>,
43    pub(crate) event_handlers: Rc<RefCell<EventHandlerStack>>,
44    #[cfg(feature = "experimental_websocket")]
45    pub(crate) websocket_upstream_handler: Rc<RefCell<Option<WebSocketHandlerFn>>>,
46    #[cfg(feature = "experimental_websocket")]
47    pub(crate) websocket_downstream_handler: Rc<RefCell<Option<WebSocketHandlerFn>>>,
48    #[cfg(feature = "experimental_websocket")]
49    pub(crate) shared_state: Rc<RefCell<Option<Rc<dyn std::any::Any>>>>,
50    pub(crate) unique_extractions: RefCell<HashSet<TypeId>>,
51}
52
53context!(ConfigureContext);
54
55impl ConfigureContext {
56    pub(crate) fn extract_unique<T: 'static>(
57        &self,
58        op: impl FnOnce() -> T,
59    ) -> Result<T, AlreadyExtracted<T>> {
60        self.unique_extractions
61            .borrow_mut()
62            .insert(TypeId::of::<T>())
63            .then(op)
64            .ok_or_else(AlreadyExtracted::default)
65    }
66}
67
68impl FromContext<ConfigureContext, extractability::Transitive> for Rc<dyn Host> {
69    type Error = Infallible;
70
71    fn from_context(context: &ConfigureContext) -> Result<Self, Self::Error> {
72        Ok(context.host.clone())
73    }
74}
75
76impl FromContext<ConfigureContext, extractability::Transitive> for Rc<dyn Clock> {
77    type Error = Infallible;
78
79    fn from_context(context: &ConfigureContext) -> Result<Self, Self::Error> {
80        Ok(context.clock.clone())
81    }
82}
83
84impl FromContext<ConfigureContext, extractability::Transitive> for Rc<dyn SharedData> {
85    type Error = Infallible;
86
87    fn from_context(context: &ConfigureContext) -> Result<Self, Self::Error> {
88        Ok(context.shared_data.clone())
89    }
90}
91
92#[cfg(feature = "experimental_metrics")]
93impl FromContext<ConfigureContext, extractability::Transitive>
94    for Rc<dyn crate::host::metrics::MetricsHost>
95{
96    type Error = Infallible;
97
98    fn from_context(context: &ConfigureContext) -> Result<Self, Self::Error> {
99        Ok(context.metrics.clone())
100    }
101}
102
103impl FromContext<ConfigureContext, extractability::Transitive> for Rc<RootReactor> {
104    type Error = Infallible;
105
106    fn from_context(context: &ConfigureContext) -> Result<Self, Self::Error> {
107        Ok(context.root_reactor.clone())
108    }
109}
110
111impl FromContext<ConfigureContext, extractability::Transitive> for RootCid {
112    type Error = Infallible;
113
114    fn from_context(context: &ConfigureContext) -> Result<Self, Self::Error> {
115        Ok(context.root_reactor.context_id())
116    }
117}
118
119/// Context representing the request filter function of the plugin.
120/// The filter function is the one that is provided to the [`Launcher`](crate::bootstrap::Launcher).
121pub struct FilterContext {
122    parent: Rc<ConfigureContext>,
123    http_reactor: Rc<HttpReactor>,
124}
125
126impl FilterContext {
127    pub(crate) fn new(parent: Rc<ConfigureContext>, http_reactor: Rc<HttpReactor>) -> Self {
128        Self {
129            parent,
130            http_reactor,
131        }
132    }
133
134    fn parent(&self) -> &ConfigureContext {
135        &self.parent
136    }
137
138    #[cfg(feature = "experimental_websocket")]
139    pub(crate) fn parent_rc(&self) -> &Rc<ConfigureContext> {
140        &self.parent
141    }
142}
143
144context!(FilterContext => ConfigureContext {FilterContext::parent});
145
146impl<E> FromContextOnce<FilterContext> for Exchange<E>
147where
148    E: After<Start> + 'static,
149{
150    type Error = AlreadyExtracted<Exchange<E>>;
151
152    type Future<'c> = BoxFuture<'c, Result<Self, Self::Error>>;
153
154    fn from_context_once(context: Exclusive<FilterContext>) -> Self::Future<'_> {
155        Box::pin(async move {
156            let exchange = Exchange::new(
157                context.http_reactor.clone(),
158                context.parent.host.clone(),
159                Some(Start {
160                    _context_id: context.http_reactor.context_id(),
161                }),
162            );
163            Ok(exchange.wait_for_event::<E>().await)
164        })
165    }
166}
167
168impl FromContext<FilterContext> for Rc<HttpReactor> {
169    type Error = Infallible;
170
171    fn from_context(context: &FilterContext) -> Result<Self, Self::Error> {
172        Ok(context.http_reactor.clone())
173    }
174}
175
176impl FromContext<FilterContext> for HttpCid {
177    type Error = Infallible;
178
179    fn from_context(context: &FilterContext) -> Result<Self, Self::Error> {
180        Ok(context.http_reactor.context_id())
181    }
182}
183
184/// Context representing a WebSocket upstream filter (client→server).
185/// Provides access to all PDK injectables for WebSocket frame handling.
186#[cfg(feature = "experimental_websocket")]
187pub struct UpgradeUpstreamContext<S = ()> {
188    parent: Rc<ConfigureContext>,
189    state: Rc<S>,
190    reactor: Rc<WebSocketReactor>,
191}
192
193#[cfg(feature = "experimental_websocket")]
194impl<S> UpgradeUpstreamContext<S> {
195    pub(crate) fn new(
196        parent: Rc<ConfigureContext>,
197        state: Rc<S>,
198        reactor: Rc<WebSocketReactor>,
199    ) -> Self {
200        Self {
201            parent,
202            state,
203            reactor,
204        }
205    }
206
207    pub(crate) fn state(&self) -> &Rc<S> {
208        &self.state
209    }
210
211    pub(crate) fn reactor(&self) -> &Rc<WebSocketReactor> {
212        &self.reactor
213    }
214
215    fn parent(&self) -> &ConfigureContext {
216        &self.parent
217    }
218}
219
220#[cfg(feature = "experimental_websocket")]
221context!(<S> UpgradeUpstreamContext<S> => ConfigureContext {UpgradeUpstreamContext::parent});
222
223/// Context representing a WebSocket downstream filter (server→client).
224/// Provides access to all PDK injectables for WebSocket frame handling.
225#[cfg(feature = "experimental_websocket")]
226pub struct UpgradeDownstreamContext<S = ()> {
227    parent: Rc<ConfigureContext>,
228    state: Rc<S>,
229    reactor: Rc<WebSocketReactor>,
230}
231
232#[cfg(feature = "experimental_websocket")]
233impl<S> UpgradeDownstreamContext<S> {
234    pub(crate) fn new(
235        parent: Rc<ConfigureContext>,
236        state: Rc<S>,
237        reactor: Rc<WebSocketReactor>,
238    ) -> Self {
239        Self {
240            parent,
241            state,
242            reactor,
243        }
244    }
245
246    pub(crate) fn state(&self) -> &Rc<S> {
247        &self.state
248    }
249
250    pub(crate) fn reactor(&self) -> &Rc<WebSocketReactor> {
251        &self.reactor
252    }
253
254    fn parent(&self) -> &ConfigureContext {
255        &self.parent
256    }
257}
258
259#[cfg(feature = "experimental_websocket")]
260context!(<S> UpgradeDownstreamContext<S> => ConfigureContext {UpgradeDownstreamContext::parent});