blueprint_core_testing_utils/
runner.rs1use blueprint_core::Job;
2use blueprint_router::Router;
3use blueprint_runner::BlueprintConfig;
4use blueprint_runner::config::BlueprintEnvironment;
5use blueprint_runner::error::RunnerError as Error;
6use blueprint_runner::{BackgroundService, BlueprintRunner, BlueprintRunnerBuilder};
7use std::future::{self, Pending};
8use std::sync::Arc;
9use tokio::sync::oneshot;
10
11pub struct TestRunner<Ctx> {
12 router: Option<Router<Ctx>>,
13 job_index: usize,
14 #[doc(hidden)]
15 pub builder: Option<BlueprintRunnerBuilder<Pending<()>>>,
16 _phantom: core::marker::PhantomData<Ctx>,
17}
18
19impl<Ctx> TestRunner<Ctx>
20where
21 Ctx: Clone + Send + Sync + 'static,
22{
23 pub fn new<C>(config: C, env: BlueprintEnvironment) -> Self
24 where
25 C: BlueprintConfig + 'static,
26 {
27 let builder =
28 BlueprintRunner::builder(config, env).with_shutdown_handler(future::pending::<()>());
29 TestRunner {
30 router: Some(Router::<Ctx>::new()),
31 job_index: 0,
32 builder: Some(builder),
33 _phantom: core::marker::PhantomData,
34 }
35 }
36
37 #[expect(clippy::missing_panics_doc)]
38 pub fn add_job<J, T>(&mut self, job: J) -> &mut Self
39 where
40 J: Job<T, Ctx> + Send + Sync + 'static,
41 T: 'static,
42 {
43 self.router = Some(
44 self.router
45 .take()
46 .expect("router should always exist")
47 .route(self.job_index, job),
48 );
49 self.job_index += 1;
50 self
51 }
52
53 #[expect(clippy::missing_panics_doc)]
54 pub fn add_background_service<B>(&mut self, service: B) -> &mut Self
55 where
56 B: BackgroundService + Send + 'static,
57 {
58 self.builder = Some(
59 self.builder
60 .take()
61 .expect("router should always exist")
62 .background_service(service),
63 );
64 self
65 }
66
67 pub fn qos_service<C>(
73 &mut self,
74 qos_service: Arc<blueprint_qos::unified_service::QoSService<C>>,
75 ) -> &mut Self
76 where
77 C: blueprint_qos::heartbeat::HeartbeatConsumer + Send + Sync + 'static,
78 {
79 struct QoSServiceAdapter<
80 C: blueprint_qos::heartbeat::HeartbeatConsumer + Send + Sync + 'static,
81 > {
82 qos_service: Arc<blueprint_qos::unified_service::QoSService<C>>,
83 }
84
85 impl<C> BackgroundService for QoSServiceAdapter<C>
86 where
87 C: blueprint_qos::heartbeat::HeartbeatConsumer + Send + Sync + 'static,
88 {
89 async fn start(
90 &self,
91 ) -> Result<tokio::sync::oneshot::Receiver<Result<(), Error>>, Error> {
92 blueprint_core::info!(
93 "QoSServiceAdapter: Starting... Will integrate with QoSService lifecycle."
94 );
95 let (runner_tx, runner_rx) = oneshot::channel();
96
97 let (qos_completion_tx, qos_completion_rx) =
98 tokio::sync::oneshot::channel::<Result<(), blueprint_qos::error::Error>>();
99
100 self.qos_service
101 .set_completion_sender(qos_completion_tx)
102 .await;
103
104 tokio::spawn(async move {
105 match qos_completion_rx.await {
106 Ok(Ok(())) => {
107 if runner_tx.send(Ok(())).is_err() {
108 blueprint_core::warn!(
109 "QoSServiceAdapter: runner_rx dropped before successful QoS completion signal."
110 );
111 }
112 }
113 Ok(Err(qos_internal_err)) => {
114 blueprint_core::error!(
115 "QoSServiceAdapter: QoSService completed with an internal error: {}. Signaling runner.",
116 qos_internal_err
117 );
118 if runner_tx
119 .send(Err(Error::BackgroundService(format!(
120 "QoS service internal error: {}",
121 qos_internal_err
122 ))))
123 .is_err()
124 {
125 blueprint_core::warn!(
126 "QoSServiceAdapter: runner_rx dropped before QoS internal error signal."
127 );
128 }
129 }
130 Err(_recv_err) => {
131 blueprint_core::error!(
132 "QoSServiceAdapter: QoSService completion sender (qos_completion_tx) was dropped. This typically means QoSService panicked or did not complete cleanly. Signaling runner."
133 );
134 if runner_tx.send(Err(Error::BackgroundService(
135 "QoS service did not signal completion cleanly (sender dropped, possibly due to panic).".to_string(),
136 ))).is_err() {
137 blueprint_core::warn!(
138 "QoSServiceAdapter: runner_rx dropped after qos_completion_tx was dropped."
139 );
140 }
141 }
142 }
143 });
144
145 Ok(runner_rx)
146 }
147 }
148
149 let adapter = QoSServiceAdapter { qos_service };
150
151 self.builder = Some(
152 self.builder
153 .take()
154 .expect("BlueprintRunnerBuilder should always exist")
155 .background_service(adapter),
156 );
157 self
158 }
159
160 #[expect(clippy::missing_panics_doc)]
166 pub async fn run(self, context: Ctx) -> Result<(), Error> {
167 let router: Router = self.router.unwrap().with_context(context);
168
169 self.builder.unwrap().router(router).run().await
170 }
171}
172
173pub trait TestEnv: Sized {
174 type Config: BlueprintConfig;
175 type Context: Clone + Send + Sync + 'static;
176
177 fn new(config: Self::Config, env: BlueprintEnvironment) -> Result<Self, Error>;
183 fn add_job<J, T>(&mut self, job: J)
184 where
185 J: Job<T, Self::Context> + Send + Sync + 'static,
186 T: 'static;
187 fn add_background_service<B>(&mut self, service: B)
188 where
189 B: BackgroundService + Send + 'static;
190 fn get_blueprint_config(&self) -> BlueprintEnvironment;
191
192 fn run_runner(
198 &mut self,
199 context: Self::Context,
200 ) -> impl std::future::Future<Output = Result<(), Error>> + Send;
201}