pdk_classy/
bootstrap.rs

1// Copyright (c) 2025, Salesforce, Inc.,
2// All rights reserved.
3// For full license text, see the LICENSE.txt file
4
5//! The entrypoint to configure your function as a filter.
6
7use futures::{Stream, StreamExt};
8use std::{
9    convert::Infallible,
10    future::{ready, Ready},
11    task::{Poll, Waker},
12};
13
14use crate::{
15    extract::{
16        context::{ConfigureContext, FilterContext},
17        Exclusive, FromContextOnce,
18    },
19    handler::{Handler, IntoHandler},
20    reactor::{http::HttpReactor, root::RootReactor},
21};
22use std::rc::Rc;
23
24/// Launcher for asynchronous dispatch of filters through the [`Self::launch()`] method.
25/// This type is intended to be injected in configuration function.
26pub struct Launcher {
27    context: ConfigureContext,
28}
29
30/// The error type for [`Launcher::launch()`].
31#[derive(thiserror::Error, Debug)]
32#[error("Launch Error")]
33pub struct LaunchError {}
34
35impl Launcher {
36    pub(crate) fn new(context: ConfigureContext) -> Self {
37        Self { context }
38    }
39
40    /// Launches a `filter` to handle incoming concurrent requests.
41    /// This method ends when no more requests can be handled.
42    ///
43    /// * `filter` - The filter to be launched.
44    ///
45    /// # Errors
46    ///
47    /// This method will return an error in the following situation, but is not
48    /// limited to just this cases:
49    ///
50    /// * a `filter` can not be injected with extractible arguments.
51    pub async fn launch<F, I>(self, filter: F) -> Result<(), LaunchError>
52    where
53        F: IntoHandler<FilterContext, I, Output = ()>,
54    {
55        let configure_context = Rc::new(self.context);
56        let filter = filter.into_handler();
57        let contexts = ContextCreateStream::new(configure_context.root_reactor.clone());
58
59        contexts
60            .map(|http_reactor| (&filter, &configure_context, http_reactor))
61            .for_each_concurrent(
62                None,
63                |(filter, configure_context, http_reactor)| async move {
64                    let context =
65                        FilterContext::new(configure_context.clone(), http_reactor.clone());
66
67                    match filter.call(context).await {
68                        Ok(()) => {}
69                        Err(extraction_error) => {
70                            log::error!("Extraction problem in filter: {extraction_error}");
71                        }
72                    }
73                },
74            )
75            .await;
76
77        Ok(())
78    }
79}
80
81impl FromContextOnce<ConfigureContext> for Launcher {
82    type Error = Infallible;
83
84    type Future<'c> = Ready<Result<Self, Self::Error>>;
85
86    fn from_context_once(context: Exclusive<ConfigureContext>) -> Self::Future<'_> {
87        // Ideally we should pass a context borrowing instead of cloning it.
88        let context = ConfigureContext {
89            host: context.host.clone(),
90            clock: context.clock.clone(),
91            grpc_host: context.grpc_host.clone(),
92            shared_data: context.shared_data.clone(),
93            #[cfg(feature = "experimental_metrics")]
94            metrics: context.metrics.clone(),
95            root_reactor: context.root_reactor.clone(),
96            unique_extractions: Default::default(),
97        };
98        ready(Ok(Launcher::new(context)))
99    }
100}
101
102struct ContextCreateStream {
103    reactor: Rc<RootReactor>,
104    waker: Option<Waker>,
105}
106
107impl ContextCreateStream {
108    fn new(reactor: Rc<RootReactor>) -> Self {
109        Self {
110            reactor,
111            waker: None,
112        }
113    }
114}
115
116impl Unpin for ContextCreateStream {}
117
118impl Stream for ContextCreateStream {
119    type Item = Rc<HttpReactor>;
120
121    fn poll_next(
122        mut self: std::pin::Pin<&mut Self>,
123        cx: &mut std::task::Context<'_>,
124    ) -> std::task::Poll<Option<Self::Item>> {
125        if self.reactor.done() {
126            self.reactor.take_create_waker();
127            return Poll::Ready(None);
128        }
129
130        if let Some(http_reactor) = self.reactor.take_new_http_reactor() {
131            return Poll::Ready(Some(http_reactor));
132        }
133
134        match &self.waker {
135            None => {
136                // Register the waker in the reactor.
137                self.reactor.insert_create_waker(cx.waker().clone());
138                self.waker = Some(cx.waker().clone());
139            }
140            Some(waker) if !waker.will_wake(cx.waker()) => {
141                self.reactor.insert_create_waker(waker.clone());
142                self.waker = Some(cx.waker().clone());
143            }
144            Some(_) => {}
145        }
146        Poll::Pending
147    }
148}