1use crate::harness::ENDOWED_TEST_NAMES;
2use crate::{
3 Error,
4 harness::{TangleTestConfig, generate_env_from_node_id},
5 runner::TangleTestEnv,
6};
7use blueprint_contexts::tangle::TangleClient;
8use blueprint_contexts::tangle::TangleClientContext;
9use blueprint_core::Job;
10use blueprint_core_testing_utils::runner::TestEnv;
11use blueprint_crypto_tangle_pair_signer::TanglePairSigner;
12use blueprint_keystore::backends::Backend;
13use blueprint_keystore::crypto::sp_core::SpSr25519;
14use blueprint_qos::heartbeat::HeartbeatConsumer;
15use blueprint_qos::{QoSConfig, QoSService};
16use blueprint_runner::BackgroundService;
17use blueprint_runner::config::BlueprintEnvironment;
18use blueprint_runner::config::Multiaddr;
19use blueprint_runner::error::RunnerError;
20use blueprint_runner::tangle::config::TangleConfig;
21use blueprint_runner::tangle::error::TangleError;
22use futures::future::join_all;
23use std::fmt::{Debug, Formatter};
24use std::str::FromStr;
25use std::sync::Arc;
26use std::sync::atomic::{AtomicUsize, Ordering};
27use tangle_subxt::subxt::tx::Signer;
28use tokio::sync::{RwLock, broadcast, mpsc, oneshot};
29
30#[derive(Clone)]
31pub enum NodeSlot<Ctx, C>
32where
33 Ctx: Clone,
34 C: HeartbeatConsumer + Clone + Send + Sync + 'static,
35{
36 Occupied(Arc<NodeHandle<Ctx, C>>),
37 Empty,
38}
39
40impl<Ctx, C> Debug for NodeSlot<Ctx, C>
41where
42 Ctx: Clone,
43 C: HeartbeatConsumer + Clone + Send + Sync + 'static,
44{
45 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
46 match self {
47 NodeSlot::Occupied(_node) => f.debug_struct("Occupied").finish(),
48 NodeSlot::Empty => f.debug_struct("Empty").finish(),
49 }
50 }
51}
52
53pub struct MultiNodeTestEnv<Ctx, C>
55where
56 Ctx: Clone + Send + Sync + 'static,
57 C: HeartbeatConsumer + Clone + Send + Sync + 'static,
58{
59 pub nodes: Arc<RwLock<Vec<NodeSlot<Ctx, C>>>>,
60 pub command_tx: mpsc::Sender<EnvironmentCommand<Ctx>>,
61 pub event_tx: broadcast::Sender<TestEvent>,
62 pub config: Arc<TangleTestConfig>,
63 pub initialized_tx: Option<oneshot::Sender<()>>,
64 pub running_nodes: Arc<AtomicUsize>,
65}
66
67#[derive(Debug)]
68pub enum EnvironmentCommand<Ctx>
69where
70 Ctx: Clone + Send + Sync + 'static,
71{
72 AddNode {
73 node_id: usize,
74 result_tx: oneshot::Sender<Result<(), Error>>,
75 },
76 RemoveNode {
77 node_id: usize,
78 result_tx: oneshot::Sender<Result<(), Error>>,
79 },
80 Start {
81 result_tx: oneshot::Sender<Result<(), Error>>,
82 contexts: Vec<Ctx>,
83 },
84 Shutdown,
85}
86
/// Notifications broadcast by a [`MultiNodeTestEnv`] to its subscribers.
#[derive(Clone, Debug)]
pub enum TestEvent {
    /// The node with this id was created.
    NodeAdded(usize),
    /// The node with this id was removed.
    NodeRemoved(usize),
    /// The node with this id was shut down (not emitted by the handlers in this module).
    NodeShutdown(usize),
    /// A node failed to start or shut down; carries a human-readable description.
    Error(String),
}
94
95impl<Ctx, C> MultiNodeTestEnv<Ctx, C>
96where
97 Ctx: Clone + Send + Sync + 'static,
98 C: HeartbeatConsumer + Clone + Send + Sync + 'static,
99{
100 #[must_use]
102 pub fn new<const N: usize>(config: TangleTestConfig) -> Self {
103 const { assert!(N > 0, "Must have at least 1 initial node") };
104
105 let (command_tx, command_rx) = mpsc::channel(32);
106 let (event_tx, _) = broadcast::channel(100);
107 let (initialized_tx, _initialized_rx) = oneshot::channel();
108
109 let env = Self {
110 nodes: Arc::new(RwLock::new(vec![NodeSlot::Empty; N])),
111 command_tx,
112 event_tx: event_tx.clone(),
113 config: Arc::new(config),
114 initialized_tx: Some(initialized_tx),
115 running_nodes: Arc::new(AtomicUsize::new(0)),
116 };
117
118 Self::spawn_command_handler(
119 env.nodes.clone(),
120 env.config.clone(),
121 env.running_nodes.clone(),
122 command_rx,
123 event_tx,
124 );
125
126 env
127 }
128
129 #[allow(clippy::missing_panics_doc)]
135 pub async fn initialize(&mut self) -> Result<(), Error> {
136 if self.initialized_tx.is_none() {
137 return Ok(());
139 }
140
141 let initial_node_count = self.nodes.read().await.len();
142
143 for node_id in 0..initial_node_count {
145 self.add_node(node_id).await?;
146 }
147
148 let nodes = self.nodes.read().await;
150 for (index, node) in nodes.iter().enumerate() {
151 let NodeSlot::Occupied(node) = node else {
152 panic!("Not all nodes were initialized");
153 };
154
155 let mut bootnodes = Vec::new();
156 for node in nodes.iter().enumerate().filter(|(n, _)| *n != index) {
157 let NodeSlot::Occupied(node) = node.1 else {
158 panic!("Not all nodes were initialized");
159 };
160
161 bootnodes.push(node.addr.clone());
162 }
163
164 let mut env = node.test_env.write().await;
165 env.update_networking_config(bootnodes, node.port);
166 env.set_tangle_producer_consumer().await;
167 }
168
169 if let Some(tx) = self.initialized_tx.take() {
171 let _ = tx.send(());
172 }
173
174 Ok(())
175 }
176
177 pub async fn add_job<J: Job<T, Ctx> + Clone + Send + Sync + 'static, T: 'static>(
182 &self,
183 job: J,
184 ) {
185 let mut nodes = self.nodes.write().await;
186 for node in nodes.iter_mut() {
187 if let NodeSlot::Occupied(node) = node {
188 node.add_job(job.clone()).await;
189 }
190 }
191 }
192
193 pub async fn start(&mut self, context: Ctx) -> Result<(), Error> {
241 let nodes_len = self.nodes.read().await.len();
242 self.start_with_contexts(vec![context; nodes_len]).await
243 }
244
245 pub async fn start_with_contexts(&mut self, contexts: Vec<Ctx>) -> Result<(), Error> {
302 let nodes_len = self.nodes.read().await.len();
303 assert_eq!(
304 nodes_len,
305 contexts.len(),
306 "wrong number of contexts provided"
307 );
308
309 let (result_tx, result_rx) = oneshot::channel();
311 self.command_tx
312 .send(EnvironmentCommand::Start {
313 result_tx,
314 contexts,
315 })
316 .await
317 .map_err(|e| Error::Setup(e.to_string()))?;
318
319 result_rx.await.map_err(|e| Error::Setup(e.to_string()))??;
321
322 Ok(())
323 }
324
325 async fn add_node(&self, node_id: usize) -> Result<(), Error> {
326 let (result_tx, result_rx) = oneshot::channel();
327 self.command_tx
328 .send(EnvironmentCommand::AddNode { node_id, result_tx })
329 .await
330 .map_err(|e| Error::Setup(e.to_string()))?;
331 result_rx.await.map_err(|e| Error::Setup(e.to_string()))?
332 }
333
334 #[must_use]
336 pub fn subscribe(&self) -> broadcast::Receiver<TestEvent> {
337 self.event_tx.subscribe()
338 }
339
340 pub async fn remove_node(&self, node_id: usize) -> Result<(), Error> {
346 let (result_tx, result_rx) = oneshot::channel();
347 self.command_tx
348 .send(EnvironmentCommand::RemoveNode { node_id, result_tx })
349 .await
350 .map_err(|e| Error::Setup(e.to_string()))?;
351 result_rx.await.map_err(|e| Error::Setup(e.to_string()))?
352 }
353
354 pub async fn shutdown(&self) -> Result<(), Error> {
360 self.command_tx
361 .send(EnvironmentCommand::Shutdown)
362 .await
363 .map_err(|e| Error::Setup(e.to_string()))?;
364 Ok(())
365 }
366
367 fn spawn_command_handler(
368 nodes: Arc<RwLock<Vec<NodeSlot<Ctx, C>>>>,
369 config: Arc<TangleTestConfig>,
370 running_nodes: Arc<AtomicUsize>,
371 mut command_rx: mpsc::Receiver<EnvironmentCommand<Ctx>>,
372 event_tx: broadcast::Sender<TestEvent>,
373 ) {
374 tokio::task::spawn(async move {
375 let nodes = nodes.clone();
376 while let Some(cmd) = command_rx.recv().await {
377 match cmd {
378 EnvironmentCommand::AddNode { node_id, result_tx } => {
379 let result = Self::handle_add_node(
380 nodes.clone(),
381 node_id,
382 config.clone(),
383 &event_tx,
384 )
385 .await;
386 let _ = result_tx.send(result);
387 }
388 EnvironmentCommand::RemoveNode { node_id, result_tx } => {
389 let result =
390 Self::handle_remove_node(nodes.clone(), node_id, &event_tx).await;
391 let _ = result_tx.send(result);
392 }
393 EnvironmentCommand::Start {
394 result_tx,
395 contexts,
396 } => {
397 let result = Self::handle_start(
398 nodes.clone(),
399 &event_tx,
400 running_nodes.clone(),
401 contexts,
402 )
403 .await;
404 let _ = result_tx.send(result);
405 }
406 EnvironmentCommand::Shutdown => {
407 Self::handle_shutdown(nodes.clone(), &event_tx).await;
408 break;
409 }
410 }
411 }
412 });
413 }
414
415 pub async fn node_handles(&self) -> Vec<Arc<NodeHandle<Ctx, C>>> {
416 self.nodes
417 .read()
418 .await
419 .iter()
420 .filter_map(|n| match n {
421 NodeSlot::Occupied(node) => Some(node.clone()),
422 NodeSlot::Empty => None,
423 })
424 .collect()
425 }
426
427 async fn handle_add_node(
428 nodes: Arc<RwLock<Vec<NodeSlot<Ctx, C>>>>,
429 node_id: usize,
430 config: Arc<TangleTestConfig>,
431 event_tx: &broadcast::Sender<TestEvent>,
432 ) -> Result<(), Error> {
433 let node = NodeHandle::new(node_id, &config).await?;
434 nodes.write().await[node_id] = NodeSlot::Occupied(node);
435 let _ = event_tx.send(TestEvent::NodeAdded(node_id));
436 Ok(())
437 }
438
439 async fn handle_remove_node(
440 nodes: Arc<RwLock<Vec<NodeSlot<Ctx, C>>>>,
441 node_id: usize,
442 event_tx: &broadcast::Sender<TestEvent>,
443 ) -> Result<(), Error> {
444 let nodes = nodes.read().await;
445
446 let NodeSlot::Occupied(node) = nodes[node_id].clone() else {
447 return Err(Error::Setup(format!("Node {} not found", node_id)));
448 };
449
450 if let Err(e) = node.shutdown().await {
452 let _ = event_tx.send(TestEvent::Error(format!(
453 "Failed to shutdown node {}: {}",
454 node_id, e
455 )));
456 }
457 let _ = event_tx.send(TestEvent::NodeRemoved(node_id));
458 Ok(())
459 }
460
461 async fn handle_start(
462 nodes: Arc<RwLock<Vec<NodeSlot<Ctx, C>>>>,
463 event_tx: &broadcast::Sender<TestEvent>,
464 running_nodes: Arc<AtomicUsize>,
465 contexts: Vec<Ctx>,
466 ) -> Result<(), Error> {
467 let nodes = nodes.read().await;
468
469 assert!(
470 nodes.iter().all(|n| matches!(n, NodeSlot::Occupied(_))),
471 "Not all nodes were initialized"
472 );
473
474 let futures = nodes.iter().enumerate().map(|(node_id, node)| {
476 let running_nodes = running_nodes.clone();
477 let context = contexts[node_id].clone();
478
479 async move {
480 let NodeSlot::Occupied(node) = node else {
481 unreachable!()
482 };
483
484 if let Err(e) = node.start_runner(context).await {
485 let _ = event_tx.send(TestEvent::Error(format!(
486 "Failed to start node {}: {}",
487 node_id, e
488 )));
489 return Err(e);
490 }
491 running_nodes.fetch_add(1, Ordering::SeqCst);
492 Ok(())
493 }
494 });
495
496 let results = join_all(futures).await;
498 for result in results {
499 result?;
500 }
501
502 Ok(())
503 }
504
505 async fn handle_shutdown(
506 nodes: Arc<RwLock<Vec<NodeSlot<Ctx, C>>>>,
507 event_tx: &broadcast::Sender<TestEvent>,
508 ) {
509 let nodes = nodes.read().await;
510 for (node_id, node) in nodes.iter().enumerate() {
511 if let NodeSlot::Occupied(node) = node {
512 if let Err(e) = node.shutdown().await {
513 let _ = event_tx.send(TestEvent::Error(format!(
514 "Failed to shutdown node {}: {}",
515 node_id, e
516 )));
517 }
518 }
519 }
520 }
521}
522
/// Run state of a single node, shared between its [`NodeHandle`] and its command task.
#[derive(Debug)]
struct NodeState {
    /// Set to `true` when the node is created; cleared when the node processes a shutdown.
    is_running: bool,
}
534
/// Commands accepted by a single node's command task.
#[non_exhaustive]
#[derive(Debug)]
enum NodeCommand {
    /// Mark the node as no longer running.
    Shutdown,
}
548
549pub struct NodeHandle<Ctx, C>
551where
552 C: HeartbeatConsumer + Send + Sync + 'static,
553{
554 pub node_id: usize,
555 pub addr: Multiaddr,
556 pub port: u16,
557 pub client: TangleClient,
558 pub signer: TanglePairSigner<sp_core::sr25519::Pair>,
559 state: Arc<RwLock<NodeState>>,
560 command_tx: mpsc::Sender<NodeCommand>,
561 pub test_env: Arc<RwLock<TangleTestEnv<Ctx, C>>>,
562}
563
564impl<Ctx, C> NodeHandle<Ctx, C>
565where
566 Ctx: Clone + Send + Sync + 'static,
567 C: HeartbeatConsumer + Send + Sync + 'static,
568{
569 pub async fn add_job<J, T>(&self, job: J)
574 where
575 J: Job<T, Ctx> + Send + Sync + 'static,
576 T: 'static,
577 {
578 self.test_env.write().await.add_job(job);
579 }
580
581 pub async fn add_background_service<K: BackgroundService + Send + 'static>(&self, service: K) {
582 self.test_env.write().await.add_background_service(service);
583 }
584
585 pub async fn blueprint_config(&self) -> BlueprintEnvironment {
586 self.test_env.read().await.get_blueprint_config()
587 }
588}
589
590impl<Ctx, C> Debug for NodeHandle<Ctx, C>
591where
592 Ctx: Clone + Send + Sync + 'static,
593 C: HeartbeatConsumer + Send + Sync + 'static,
594{
595 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
596 f.debug_struct("NodeHandle")
597 .field("node_id", &self.node_id)
598 .field("signer", &self.signer.address())
599 .field("test_env", &self.test_env)
600 .finish_non_exhaustive()
601 }
602}
603
604impl<Ctx, C> NodeHandle<Ctx, C>
606where
607 Ctx: Clone + Send + Sync + 'static,
608 C: HeartbeatConsumer + Send + Sync + 'static,
609{
610 async fn new(node_id: usize, config: &TangleTestConfig) -> Result<Arc<Self>, Error> {
611 let (command_tx, command_rx) = mpsc::channel(32);
612 let state = Arc::new(RwLock::new(NodeState { is_running: true }));
613
614 let mut env = generate_env_from_node_id(
616 ENDOWED_TEST_NAMES[node_id],
617 config.http_endpoint.clone(),
618 config.ws_endpoint.clone(),
619 config.temp_dir.as_path(),
620 )
621 .await?;
622 env.bridge_socket_path = Some(config.bridge_socket_path.clone());
623
624 let client = env.tangle_client().await?;
625 let keystore = env.keystore();
626 let sr25519_public = keystore
627 .first_local::<SpSr25519>()
628 .map_err(|err| RunnerError::Tangle(TangleError::Keystore(err)))?;
629 let sr25519_pair = keystore
630 .get_secret::<SpSr25519>(&sr25519_public)
631 .map_err(|err| RunnerError::Tangle(TangleError::Keystore(err)))?;
632 let sr25519_signer = TanglePairSigner::new(sr25519_pair.0);
633
634 let test_env = TangleTestEnv::new(TangleConfig::default(), env.clone())?;
636
637 let port = find_open_tcp_bind_port();
638 blueprint_core::info!("Binding node {node_id} to port {port}");
639
640 let addr = Multiaddr::from_str(&format!("/ip4/127.0.0.1/tcp/{port}"))
641 .expect("Should parse MultiAddr");
642
643 let node = Arc::new(Self {
644 node_id,
645 addr,
646 port,
647 client,
648 signer: sr25519_signer,
649 state,
650 command_tx,
651 test_env: Arc::new(RwLock::new(test_env)),
652 });
653
654 Self::spawn_command_handler(&node, command_rx);
655 Ok(node)
656 }
657
658 pub async fn set_qos_config(&self, config: QoSConfig) {
660 self.test_env.write().await.set_qos_config(config);
661 }
662
663 pub async fn set_qos_service(&self, service: Arc<QoSService<C>>) {
664 self.test_env.write().await.set_qos_service(service);
665 }
666
667 pub async fn shutdown(&self) -> Result<(), Error> {
673 self.command_tx
674 .send(NodeCommand::Shutdown)
675 .await
676 .map_err(|e| Error::Setup(e.to_string()))?;
677
678 let mut retries = 0;
680 while retries < 10 {
681 if !self.state.read().await.is_running {
682 return Ok(());
683 }
684 tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
685 retries += 1;
686 }
687
688 Err(Error::Setup("Node failed to shutdown in time".to_string()))
689 }
690
691 pub async fn start_runner(&self, context: Ctx) -> Result<(), Error> {
697 let result = {
698 let mut test_env_guard = self.test_env.write().await;
699 test_env_guard.run_runner(context).await
700 };
701 result.map_err(|e| Error::Setup(e.to_string()))
702 }
703
704 fn spawn_command_handler(node: &Self, mut command_rx: mpsc::Receiver<NodeCommand>) {
705 let state = node.state.clone();
706 tokio::task::spawn(async move {
707 let Some(cmd) = command_rx.recv().await else {
708 return;
709 };
710 match cmd {
711 NodeCommand::Shutdown => {
712 let mut state = state.write().await;
713 state.is_running = false;
714 }
715 }
716 });
717 }
718
719 #[must_use]
721 pub fn client(&self) -> &TangleClient {
722 &self.client
723 }
724
725 #[must_use]
727 pub fn signer(&self) -> &TanglePairSigner<sp_core::sr25519::Pair> {
728 &self.signer
729 }
730}
731
/// Returns a TCP port on `127.0.0.1` that was free at the moment of the call.
///
/// Binding to port 0 lets the OS choose an unused port. The listener is released before
/// returning, so another process may claim the port before the caller binds it.
///
/// # Panics
///
/// Panics if binding to localhost fails or the bound address cannot be read.
pub(crate) fn find_open_tcp_bind_port() -> u16 {
    std::net::TcpListener::bind("127.0.0.1:0")
        .expect("Should bind to localhost")
        .local_addr()
        .expect("Should have a local address")
        .port()
}