blueprint_tangle_testing_utils/
runner.rs1#![allow(dead_code)]
2
3use blueprint_contexts::tangle::TangleClientContext;
4use blueprint_core::Job;
5use blueprint_core_testing_utils::runner::{TestEnv, TestRunner};
6use blueprint_crypto_tangle_pair_signer::TanglePairSigner;
7use blueprint_keystore::backends::Backend;
8use blueprint_keystore::crypto::sp_core::SpSr25519;
9use blueprint_qos::heartbeat::HeartbeatConsumer;
10use blueprint_qos::heartbeat::HeartbeatStatus;
11use blueprint_qos::{QoSConfig, QoSService};
12use blueprint_runner::BackgroundService;
13use blueprint_runner::config::BlueprintEnvironment;
14use blueprint_runner::config::Multiaddr;
15use blueprint_runner::error::{JobCallError, RunnerError as Error};
16use blueprint_runner::tangle::config::TangleConfig;
17use blueprint_std::fmt::{Debug, Formatter};
18use blueprint_std::pin::Pin;
19use blueprint_std::sync::Arc;
20use blueprint_tangle_extra::consumer::TangleConsumer;
21use blueprint_tangle_extra::producer::TangleProducer;
22use tokio::sync::Mutex;
23use tokio::task::JoinHandle;
24
25pub struct TangleTestEnv<Ctx, C>
26where
27 C: HeartbeatConsumer + Send + Sync + 'static,
28{
29 pub runner: Option<TestRunner<Ctx>>,
30 pub config: TangleConfig,
31 pub env: BlueprintEnvironment,
32 pub runner_handle: Mutex<Option<JoinHandle<Result<(), Error>>>>,
33 pub qos_config: Option<QoSConfig>,
34 pub qos_service: Option<Arc<QoSService<C>>>,
35}
36
37impl<Ctx, C> TangleTestEnv<Ctx, C>
38where
39 C: HeartbeatConsumer + Send + Sync + 'static,
40{
41 pub(crate) fn update_networking_config(
42 &mut self,
43 bootnodes: Vec<Multiaddr>,
44 network_bind_port: u16,
45 ) {
46 self.env.bootnodes = bootnodes;
47 self.env.network_bind_port = network_bind_port;
48 }
49
50 pub fn set_qos_config(&mut self, config: QoSConfig) {
52 self.qos_config = Some(config);
53 }
54 pub fn set_qos_service(&mut self, service: Arc<QoSService<C>>) {
56 self.qos_service = Some(service);
57 }
58
59 pub(crate) async fn set_tangle_producer_consumer(&mut self) {
61 let runner = self.runner.as_mut().expect("Runner already running");
62 let builder = runner.builder.take().expect("Runner already running");
63 let tangle_client = self
64 .env
65 .tangle_client()
66 .await
67 .expect("Tangle node should be running");
68 let producer = TangleProducer::finalized_blocks(tangle_client.rpc_client.clone())
69 .await
70 .expect("Failed to create producer");
71
72 let sr25519_signer = self
73 .env
74 .keystore()
75 .first_local::<SpSr25519>()
76 .expect("key not found");
77 let sr25519_pair = self
78 .env
79 .keystore()
80 .get_secret::<SpSr25519>(&sr25519_signer)
81 .expect("key not found");
82 let sr25519_signer = TanglePairSigner::new(sr25519_pair.0);
83 let consumer = TangleConsumer::new(tangle_client.rpc_client.clone(), sr25519_signer);
84 runner.builder = Some(builder.producer(producer).consumer(consumer));
85 }
86}
87
88impl<Ctx, C> Debug for TangleTestEnv<Ctx, C>
89where
90 C: HeartbeatConsumer + Send + Sync + 'static,
91{
92 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
93 f.debug_struct("TangleTestEnv")
94 .field("config", &self.config)
95 .field("env", &self.env)
96 .finish_non_exhaustive()
97 }
98}
99
100impl<Ctx, C> TestEnv for TangleTestEnv<Ctx, C>
101where
102 Ctx: Clone + Send + Sync + 'static,
103 C: HeartbeatConsumer + Send + Sync + 'static,
104{
105 type Config = TangleConfig;
106 type Context = Ctx;
107
108 fn new(config: Self::Config, env: BlueprintEnvironment) -> Result<Self, Error> {
109 let runner = TestRunner::<Ctx>::new::<Self::Config>(config.clone(), env.clone());
110
111 Ok(Self {
112 runner: Some(runner),
113 config,
114 env,
115 runner_handle: Mutex::new(None),
116 qos_config: None,
117 qos_service: None,
118 })
119 }
120
121 fn add_job<J, T>(&mut self, job: J)
122 where
123 J: Job<T, Self::Context> + Send + Sync + 'static,
124 T: 'static,
125 {
126 self.runner
127 .as_mut()
128 .expect("Runner already running")
129 .add_job(job);
130 }
131
132 fn add_background_service<B>(&mut self, service: B)
133 where
134 B: BackgroundService + Send + 'static,
135 {
136 self.runner
137 .as_mut()
138 .expect("Runner already running")
139 .add_background_service(service);
140 }
141
142 fn get_blueprint_config(&self) -> BlueprintEnvironment {
143 self.env.clone()
144 }
145
146 async fn run_runner(&mut self, context: Self::Context) -> Result<(), Error> {
147 let mut runner = self.runner.take().expect("Runner already running");
149 if let Some(qos_service_arc) = &self.qos_service {
150 runner.qos_service(qos_service_arc.clone());
151 }
152
153 let handle = tokio::spawn(async move { runner.run(context).await });
154
155 let mut guard = self.runner_handle.lock().await;
156 *guard = Some(handle);
157
158 tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
160
161 let handle = guard.take().expect("was just set");
163 if !handle.is_finished() {
164 *guard = Some(handle);
166 blueprint_core::info!("Runner started successfully");
167 return Ok(());
168 }
169
170 blueprint_core::info!("Runner task finished");
171 match handle.await {
172 Ok(Ok(())) => Ok(()),
173 Ok(Err(e)) => {
174 blueprint_core::error!("Runner failed during startup: {}", e);
175 Err(e)
176 }
177 Err(e) => {
178 blueprint_core::error!("Runner task panicked: {}", e);
179 Err(JobCallError::JobDidntFinish(e).into())
180 }
181 }
182 }
183}
184
185impl<Ctx, C> Drop for TangleTestEnv<Ctx, C>
186where
187 C: HeartbeatConsumer + Send + Sync + 'static,
188{
189 fn drop(&mut self) {
190 futures::executor::block_on(async {
191 let mut guard = self.runner_handle.lock().await;
192 if let Some(handle) = guard.take() {
193 handle.abort();
194 }
195 });
196 }
197}
198
199#[derive(Clone, Default)]
201pub struct MockHeartbeatConsumer {
202 pub heartbeats: Arc<Mutex<Vec<HeartbeatStatus>>>,
203}
204
205impl MockHeartbeatConsumer {
206 #[must_use]
207 pub fn new() -> Self {
208 Self {
209 heartbeats: Arc::new(Mutex::new(Vec::new())),
210 }
211 }
212
213 #[must_use]
219 pub async fn heartbeat_count(&self) -> usize {
220 self.heartbeats.lock().await.len()
221 }
222
223 #[must_use]
229 pub async fn get_heartbeats(&self) -> Vec<HeartbeatStatus> {
230 self.heartbeats.lock().await.clone()
231 }
232}
233
234impl HeartbeatConsumer for MockHeartbeatConsumer {
235 fn send_heartbeat(
236 &self,
237 status: &HeartbeatStatus,
238 ) -> Pin<Box<dyn std::future::Future<Output = Result<(), blueprint_qos::error::Error>> + Send>>
239 {
240 let status = status.clone();
241 let heartbeats_store = self.heartbeats.clone();
242
243 Box::pin(async move {
244 heartbeats_store.lock().await.push(status.clone());
245 Ok(())
246 })
247 }
248}