blueprint_core_testing_utils/
runner.rs

1use blueprint_core::Job;
2use blueprint_router::Router;
3use blueprint_runner::BlueprintConfig;
4use blueprint_runner::config::BlueprintEnvironment;
5use blueprint_runner::error::RunnerError as Error;
6use blueprint_runner::{BackgroundService, BlueprintRunner, BlueprintRunnerBuilder};
7use std::future::{self, Pending};
8use std::sync::Arc;
9use tokio::sync::oneshot;
10
11pub struct TestRunner<Ctx> {
12    router: Option<Router<Ctx>>,
13    job_index: usize,
14    #[doc(hidden)]
15    pub builder: Option<BlueprintRunnerBuilder<Pending<()>>>,
16    _phantom: core::marker::PhantomData<Ctx>,
17}
18
19impl<Ctx> TestRunner<Ctx>
20where
21    Ctx: Clone + Send + Sync + 'static,
22{
23    pub fn new<C>(config: C, env: BlueprintEnvironment) -> Self
24    where
25        C: BlueprintConfig + 'static,
26    {
27        let builder =
28            BlueprintRunner::builder(config, env).with_shutdown_handler(future::pending::<()>());
29        TestRunner {
30            router: Some(Router::<Ctx>::new()),
31            job_index: 0,
32            builder: Some(builder),
33            _phantom: core::marker::PhantomData,
34        }
35    }
36
37    #[expect(clippy::missing_panics_doc)]
38    pub fn add_job<J, T>(&mut self, job: J) -> &mut Self
39    where
40        J: Job<T, Ctx> + Send + Sync + 'static,
41        T: 'static,
42    {
43        self.router = Some(
44            self.router
45                .take()
46                .expect("router should always exist")
47                .route(self.job_index, job),
48        );
49        self.job_index += 1;
50        self
51    }
52
53    #[expect(clippy::missing_panics_doc)]
54    pub fn add_background_service<B>(&mut self, service: B) -> &mut Self
55    where
56        B: BackgroundService + Send + 'static,
57    {
58        self.builder = Some(
59            self.builder
60                .take()
61                .expect("router should always exist")
62                .background_service(service),
63        );
64        self
65    }
66
67    /// Integrate the unified `QoS` service (heartbeat, metrics, logging, dashboards) as an always-on background service.
68    ///
69    /// # Panics
70    ///
71    /// Panics if the builder is not initialized.
72    pub fn qos_service<C>(
73        &mut self,
74        qos_service: Arc<blueprint_qos::unified_service::QoSService<C>>,
75    ) -> &mut Self
76    where
77        C: blueprint_qos::heartbeat::HeartbeatConsumer + Send + Sync + 'static,
78    {
79        struct QoSServiceAdapter<
80            C: blueprint_qos::heartbeat::HeartbeatConsumer + Send + Sync + 'static,
81        > {
82            qos_service: Arc<blueprint_qos::unified_service::QoSService<C>>,
83        }
84
85        impl<C> BackgroundService for QoSServiceAdapter<C>
86        where
87            C: blueprint_qos::heartbeat::HeartbeatConsumer + Send + Sync + 'static,
88        {
89            async fn start(
90                &self,
91            ) -> Result<tokio::sync::oneshot::Receiver<Result<(), Error>>, Error> {
92                blueprint_core::info!(
93                    "QoSServiceAdapter: Starting... Will integrate with QoSService lifecycle."
94                );
95                let (runner_tx, runner_rx) = oneshot::channel();
96
97                let (qos_completion_tx, qos_completion_rx) =
98                    tokio::sync::oneshot::channel::<Result<(), blueprint_qos::error::Error>>();
99
100                self.qos_service
101                    .set_completion_sender(qos_completion_tx)
102                    .await;
103
104                tokio::spawn(async move {
105                    match qos_completion_rx.await {
106                        Ok(Ok(())) => {
107                            if runner_tx.send(Ok(())).is_err() {
108                                blueprint_core::warn!(
109                                    "QoSServiceAdapter: runner_rx dropped before successful QoS completion signal."
110                                );
111                            }
112                        }
113                        Ok(Err(qos_internal_err)) => {
114                            blueprint_core::error!(
115                                "QoSServiceAdapter: QoSService completed with an internal error: {}. Signaling runner.",
116                                qos_internal_err
117                            );
118                            if runner_tx
119                                .send(Err(Error::BackgroundService(format!(
120                                    "QoS service internal error: {}",
121                                    qos_internal_err
122                                ))))
123                                .is_err()
124                            {
125                                blueprint_core::warn!(
126                                    "QoSServiceAdapter: runner_rx dropped before QoS internal error signal."
127                                );
128                            }
129                        }
130                        Err(_recv_err) => {
131                            blueprint_core::error!(
132                                "QoSServiceAdapter: QoSService completion sender (qos_completion_tx) was dropped. This typically means QoSService panicked or did not complete cleanly. Signaling runner."
133                            );
134                            if runner_tx.send(Err(Error::BackgroundService(
135                                "QoS service did not signal completion cleanly (sender dropped, possibly due to panic).".to_string(),
136                            ))).is_err() {
137                                blueprint_core::warn!(
138                                    "QoSServiceAdapter: runner_rx dropped after qos_completion_tx was dropped."
139                                );
140                            }
141                        }
142                    }
143                });
144
145                Ok(runner_rx)
146            }
147        }
148
149        let adapter = QoSServiceAdapter { qos_service };
150
151        self.builder = Some(
152            self.builder
153                .take()
154                .expect("BlueprintRunnerBuilder should always exist")
155                .background_service(adapter),
156        );
157        self
158    }
159
160    /// Start the runner
161    ///
162    /// # Errors
163    ///
164    /// See [`BlueprintRunnerBuilder::run()`]
165    #[expect(clippy::missing_panics_doc)]
166    pub async fn run(self, context: Ctx) -> Result<(), Error> {
167        let router: Router = self.router.unwrap().with_context(context);
168
169        self.builder.unwrap().router(router).run().await
170    }
171}
172
173pub trait TestEnv: Sized {
174    type Config: BlueprintConfig;
175    type Context: Clone + Send + Sync + 'static;
176
177    /// Create a new test environment
178    ///
179    /// # Errors
180    ///
181    /// Errors depend on the implementation.
182    fn new(config: Self::Config, env: BlueprintEnvironment) -> Result<Self, Error>;
183    fn add_job<J, T>(&mut self, job: J)
184    where
185        J: Job<T, Self::Context> + Send + Sync + 'static,
186        T: 'static;
187    fn add_background_service<B>(&mut self, service: B)
188    where
189        B: BackgroundService + Send + 'static;
190    fn get_blueprint_config(&self) -> BlueprintEnvironment;
191
192    /// Start the runner
193    ///
194    /// # Panics
195    ///
196    /// Will panic if the runner is already started
197    fn run_runner(
198        &mut self,
199        context: Self::Context,
200    ) -> impl std::future::Future<Output = Result<(), Error>> + Send;
201}