blueprint_tangle_testing_utils/
runner.rs

1#![allow(dead_code)]
2
3use blueprint_contexts::tangle::TangleClientContext;
4use blueprint_core::Job;
5use blueprint_core_testing_utils::runner::{TestEnv, TestRunner};
6use blueprint_crypto_tangle_pair_signer::TanglePairSigner;
7use blueprint_keystore::backends::Backend;
8use blueprint_keystore::crypto::sp_core::SpSr25519;
9use blueprint_qos::heartbeat::HeartbeatConsumer;
10use blueprint_qos::heartbeat::HeartbeatStatus;
11use blueprint_qos::{QoSConfig, QoSService};
12use blueprint_runner::BackgroundService;
13use blueprint_runner::config::BlueprintEnvironment;
14use blueprint_runner::config::Multiaddr;
15use blueprint_runner::error::{JobCallError, RunnerError as Error};
16use blueprint_runner::tangle::config::TangleConfig;
17use blueprint_std::fmt::{Debug, Formatter};
18use blueprint_std::pin::Pin;
19use blueprint_std::sync::Arc;
20use blueprint_tangle_extra::consumer::TangleConsumer;
21use blueprint_tangle_extra::producer::TangleProducer;
22use tokio::sync::Mutex;
23use tokio::task::JoinHandle;
24
25pub struct TangleTestEnv<Ctx, C>
26where
27    C: HeartbeatConsumer + Send + Sync + 'static,
28{
29    pub runner: Option<TestRunner<Ctx>>,
30    pub config: TangleConfig,
31    pub env: BlueprintEnvironment,
32    pub runner_handle: Mutex<Option<JoinHandle<Result<(), Error>>>>,
33    pub qos_config: Option<QoSConfig>,
34    pub qos_service: Option<Arc<QoSService<C>>>,
35}
36
37impl<Ctx, C> TangleTestEnv<Ctx, C>
38where
39    C: HeartbeatConsumer + Send + Sync + 'static,
40{
41    pub(crate) fn update_networking_config(
42        &mut self,
43        bootnodes: Vec<Multiaddr>,
44        network_bind_port: u16,
45    ) {
46        self.env.bootnodes = bootnodes;
47        self.env.network_bind_port = network_bind_port;
48    }
49
50    /// Set the `QoS` config for this test environment
51    pub fn set_qos_config(&mut self, config: QoSConfig) {
52        self.qos_config = Some(config);
53    }
54    /// Set the `QoS` service for this test environment
55    pub fn set_qos_service(&mut self, service: Arc<QoSService<C>>) {
56        self.qos_service = Some(service);
57    }
58
59    // TODO(serial): This needs to return errors. Too many chances to panic here. Not helpful.
60    pub(crate) async fn set_tangle_producer_consumer(&mut self) {
61        let runner = self.runner.as_mut().expect("Runner already running");
62        let builder = runner.builder.take().expect("Runner already running");
63        let tangle_client = self
64            .env
65            .tangle_client()
66            .await
67            .expect("Tangle node should be running");
68        let producer = TangleProducer::finalized_blocks(tangle_client.rpc_client.clone())
69            .await
70            .expect("Failed to create producer");
71
72        let sr25519_signer = self
73            .env
74            .keystore()
75            .first_local::<SpSr25519>()
76            .expect("key not found");
77        let sr25519_pair = self
78            .env
79            .keystore()
80            .get_secret::<SpSr25519>(&sr25519_signer)
81            .expect("key not found");
82        let sr25519_signer = TanglePairSigner::new(sr25519_pair.0);
83        let consumer = TangleConsumer::new(tangle_client.rpc_client.clone(), sr25519_signer);
84        runner.builder = Some(builder.producer(producer).consumer(consumer));
85    }
86}
87
88impl<Ctx, C> Debug for TangleTestEnv<Ctx, C>
89where
90    C: HeartbeatConsumer + Send + Sync + 'static,
91{
92    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
93        f.debug_struct("TangleTestEnv")
94            .field("config", &self.config)
95            .field("env", &self.env)
96            .finish_non_exhaustive()
97    }
98}
99
100impl<Ctx, C> TestEnv for TangleTestEnv<Ctx, C>
101where
102    Ctx: Clone + Send + Sync + 'static,
103    C: HeartbeatConsumer + Send + Sync + 'static,
104{
105    type Config = TangleConfig;
106    type Context = Ctx;
107
108    fn new(config: Self::Config, env: BlueprintEnvironment) -> Result<Self, Error> {
109        let runner = TestRunner::<Ctx>::new::<Self::Config>(config.clone(), env.clone());
110
111        Ok(Self {
112            runner: Some(runner),
113            config,
114            env,
115            runner_handle: Mutex::new(None),
116            qos_config: None,
117            qos_service: None,
118        })
119    }
120
121    fn add_job<J, T>(&mut self, job: J)
122    where
123        J: Job<T, Self::Context> + Send + Sync + 'static,
124        T: 'static,
125    {
126        self.runner
127            .as_mut()
128            .expect("Runner already running")
129            .add_job(job);
130    }
131
132    fn add_background_service<B>(&mut self, service: B)
133    where
134        B: BackgroundService + Send + 'static,
135    {
136        self.runner
137            .as_mut()
138            .expect("Runner already running")
139            .add_background_service(service);
140    }
141
142    fn get_blueprint_config(&self) -> BlueprintEnvironment {
143        self.env.clone()
144    }
145
146    async fn run_runner(&mut self, context: Self::Context) -> Result<(), Error> {
147        // Spawn the runner in a background task
148        let mut runner = self.runner.take().expect("Runner already running");
149        if let Some(qos_service_arc) = &self.qos_service {
150            runner.qos_service(qos_service_arc.clone());
151        }
152
153        let handle = tokio::spawn(async move { runner.run(context).await });
154
155        let mut guard = self.runner_handle.lock().await;
156        *guard = Some(handle);
157
158        // Brief delay to allow for startup
159        tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
160
161        // Just check if it failed immediately
162        let handle = guard.take().expect("was just set");
163        if !handle.is_finished() {
164            // Put the handle back since the runner is still running
165            *guard = Some(handle);
166            blueprint_core::info!("Runner started successfully");
167            return Ok(());
168        }
169
170        blueprint_core::info!("Runner task finished");
171        match handle.await {
172            Ok(Ok(())) => Ok(()),
173            Ok(Err(e)) => {
174                blueprint_core::error!("Runner failed during startup: {}", e);
175                Err(e)
176            }
177            Err(e) => {
178                blueprint_core::error!("Runner task panicked: {}", e);
179                Err(JobCallError::JobDidntFinish(e).into())
180            }
181        }
182    }
183}
184
185impl<Ctx, C> Drop for TangleTestEnv<Ctx, C>
186where
187    C: HeartbeatConsumer + Send + Sync + 'static,
188{
189    fn drop(&mut self) {
190        futures::executor::block_on(async {
191            let mut guard = self.runner_handle.lock().await;
192            if let Some(handle) = guard.take() {
193                handle.abort();
194            }
195        });
196    }
197}
198
199/// Mock implementation of the `HeartbeatConsumer` for testing
200#[derive(Clone, Default)]
201pub struct MockHeartbeatConsumer {
202    pub heartbeats: Arc<Mutex<Vec<HeartbeatStatus>>>,
203}
204
205impl MockHeartbeatConsumer {
206    #[must_use]
207    pub fn new() -> Self {
208        Self {
209            heartbeats: Arc::new(Mutex::new(Vec::new())),
210        }
211    }
212
213    /// Returns the number of heartbeats received
214    ///
215    /// # Panics
216    ///
217    /// Panics if the heartbeats mutex is poisoned
218    #[must_use]
219    pub async fn heartbeat_count(&self) -> usize {
220        self.heartbeats.lock().await.len()
221    }
222
223    /// Gets a copy of all received heartbeat statuses
224    ///
225    /// # Panics
226    ///
227    /// Panics if the heartbeats mutex is poisoned
228    #[must_use]
229    pub async fn get_heartbeats(&self) -> Vec<HeartbeatStatus> {
230        self.heartbeats.lock().await.clone()
231    }
232}
233
234impl HeartbeatConsumer for MockHeartbeatConsumer {
235    fn send_heartbeat(
236        &self,
237        status: &HeartbeatStatus,
238    ) -> Pin<Box<dyn std::future::Future<Output = Result<(), blueprint_qos::error::Error>> + Send>>
239    {
240        let status = status.clone();
241        let heartbeats_store = self.heartbeats.clone();
242
243        Box::pin(async move {
244            heartbeats_store.lock().await.push(status.clone());
245            Ok(())
246        })
247    }
248}