blueprint_tangle_testing_utils/
multi_node.rs

use crate::harness::ENDOWED_TEST_NAMES;
use crate::{
    Error,
    harness::{TangleTestConfig, generate_env_from_node_id},
    runner::TangleTestEnv,
};
use blueprint_contexts::tangle::TangleClient;
use blueprint_contexts::tangle::TangleClientContext;
use blueprint_core::Job;
use blueprint_core_testing_utils::runner::TestEnv;
use blueprint_crypto_tangle_pair_signer::TanglePairSigner;
use blueprint_keystore::backends::Backend;
use blueprint_keystore::crypto::sp_core::SpSr25519;
use blueprint_qos::heartbeat::HeartbeatConsumer;
use blueprint_qos::{QoSConfig, QoSService};
use blueprint_runner::BackgroundService;
use blueprint_runner::config::BlueprintEnvironment;
use blueprint_runner::config::Multiaddr;
use blueprint_runner::error::RunnerError;
use blueprint_runner::tangle::config::TangleConfig;
use blueprint_runner::tangle::error::TangleError;
use futures::future::join_all;
use std::fmt::{Debug, Formatter};
use std::str::FromStr;
use std::sync::Arc;
use std::sync::atomic::{AtomicUsize, Ordering};
use tangle_subxt::subxt::tx::Signer;
use tokio::sync::{RwLock, broadcast, mpsc, oneshot};

#[derive(Clone)]
pub enum NodeSlot<Ctx, C>
where
    Ctx: Clone,
    C: HeartbeatConsumer + Clone + Send + Sync + 'static,
{
    Occupied(Arc<NodeHandle<Ctx, C>>),
    Empty,
}

impl<Ctx, C> Debug for NodeSlot<Ctx, C>
where
    Ctx: Clone,
    C: HeartbeatConsumer + Clone + Send + Sync + 'static,
{
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        match self {
            NodeSlot::Occupied(_node) => f.debug_struct("Occupied").finish(),
            NodeSlot::Empty => f.debug_struct("Empty").finish(),
        }
    }
}

/// A multi-node test environment offering per-node control and event-based observability
pub struct MultiNodeTestEnv<Ctx, C>
where
    Ctx: Clone + Send + Sync + 'static,
    C: HeartbeatConsumer + Clone + Send + Sync + 'static,
{
    pub nodes: Arc<RwLock<Vec<NodeSlot<Ctx, C>>>>,
    pub command_tx: mpsc::Sender<EnvironmentCommand<Ctx>>,
    pub event_tx: broadcast::Sender<TestEvent>,
    pub config: Arc<TangleTestConfig>,
    pub initialized_tx: Option<oneshot::Sender<()>>,
    pub running_nodes: Arc<AtomicUsize>,
}

#[derive(Debug)]
pub enum EnvironmentCommand<Ctx>
where
    Ctx: Clone + Send + Sync + 'static,
{
    AddNode {
        node_id: usize,
        result_tx: oneshot::Sender<Result<(), Error>>,
    },
    RemoveNode {
        node_id: usize,
        result_tx: oneshot::Sender<Result<(), Error>>,
    },
    Start {
        result_tx: oneshot::Sender<Result<(), Error>>,
        contexts: Vec<Ctx>,
    },
    Shutdown,
}

#[derive(Debug, Clone)]
pub enum TestEvent {
    NodeAdded(usize),
    NodeRemoved(usize),
    NodeShutdown(usize),
    Error(String),
}

impl<Ctx, C> MultiNodeTestEnv<Ctx, C>
where
    Ctx: Clone + Send + Sync + 'static,
    C: HeartbeatConsumer + Clone + Send + Sync + 'static,
{
    /// Creates a new multi-node test environment with `N` empty node slots
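    ///
    /// # Examples
    ///
    /// A minimal sketch of direct construction. In practice the harness builds this for you
    /// via `TangleTestHarness::setup_services()`, so `config`, `MyContext`, and `MyConsumer`
    /// below are placeholders:
    ///
    /// ```rust,ignore
    /// // `config` is assumed to be a `TangleTestConfig` prepared elsewhere (e.g. by the harness).
    /// let env = MultiNodeTestEnv::<MyContext, MyConsumer>::new::<3>(config);
    /// // All three slots start out as `NodeSlot::Empty` until `initialize()` fills them.
    /// ```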
    #[must_use]
    pub fn new<const N: usize>(config: TangleTestConfig) -> Self {
        const { assert!(N > 0, "Must have at least 1 initial node") };

        let (command_tx, command_rx) = mpsc::channel(32);
        let (event_tx, _) = broadcast::channel(100);
        let (initialized_tx, _initialized_rx) = oneshot::channel();

        let env = Self {
            nodes: Arc::new(RwLock::new(vec![NodeSlot::Empty; N])),
            command_tx,
            event_tx: event_tx.clone(),
            config: Arc::new(config),
            initialized_tx: Some(initialized_tx),
            running_nodes: Arc::new(AtomicUsize::new(0)),
        };

        Self::spawn_command_handler(
            env.nodes.clone(),
            env.config.clone(),
            env.running_nodes.clone(),
            command_rx,
            event_tx,
        );

        env
    }

    /// Initializes the multi-node test environment with N nodes
    ///
    /// # Errors
    ///
    /// Returns an error if the command channel closed prematurely
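    ///
    /// # Examples
    ///
    /// A minimal sketch (assuming `test_env` came from `TangleTestHarness::setup_services()`,
    /// as in the [`Self::start()`] example):
    ///
    /// ```rust,ignore
    /// // Subscribe first so the `NodeAdded` events emitted during setup are observable.
    /// let mut events = test_env.subscribe();
    /// test_env.initialize().await?;
    /// ```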
    #[allow(clippy::missing_panics_doc)]
    pub async fn initialize(&mut self) -> Result<(), Error> {
        if self.initialized_tx.is_none() {
            // Already initialized
            return Ok(());
        }

        let initial_node_count = self.nodes.read().await.len();

        // First add N nodes
        for node_id in 0..initial_node_count {
            self.add_node(node_id).await?;
        }

        // Setup the bootnodes
        let nodes = self.nodes.read().await;
        for (index, node) in nodes.iter().enumerate() {
            let NodeSlot::Occupied(node) = node else {
                panic!("Not all nodes were initialized");
            };

            let mut bootnodes = Vec::new();
            for node in nodes.iter().enumerate().filter(|(n, _)| *n != index) {
                let NodeSlot::Occupied(node) = node.1 else {
                    panic!("Not all nodes were initialized");
                };

                bootnodes.push(node.addr.clone());
            }

            let mut env = node.test_env.write().await;
            env.update_networking_config(bootnodes, node.port);
            env.set_tangle_producer_consumer().await;
        }

        // Signal initialization is complete
        if let Some(tx) = self.initialized_tx.take() {
            let _ = tx.send(());
        }

        Ok(())
    }

    /// Adds a job to each node, to be executed when the test is run.
    ///
    /// The job is added to the end of the list of jobs and can be stopped using the `stop_job`
    /// method.
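    ///
    /// # Examples
    ///
    /// A minimal sketch (assuming a `test_env` set up as in [`Self::start()`], with `MyContext`
    /// defined as in that example):
    ///
    /// ```rust,ignore
    /// async fn some_job(Context(_ctx): Context<MyContext>) {}
    ///
    /// // Registers `some_job` on every currently occupied node slot.
    /// test_env.add_job(some_job).await;
    /// ```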
    pub async fn add_job<J: Job<T, Ctx> + Clone + Send + Sync + 'static, T: 'static>(
        &self,
        job: J,
    ) {
        let mut nodes = self.nodes.write().await;
        for node in nodes.iter_mut() {
            if let NodeSlot::Occupied(node) = node {
                node.add_job(job.clone()).await;
            }
        }
    }

    /// Send a start command to all nodes
    ///
    /// This will clone `context` to all nodes. If your context is node-specific (for example, in a
    /// networking blueprint), then see [`Self::start_with_contexts()`] for providing per-node contexts.
    ///
    /// # Panics
    ///
    /// See [`Self::start_with_contexts()`]
    ///
    /// # Errors
    ///
    /// See [`Self::start_with_contexts()`]
    ///
    /// # Examples
    ///
    /// ```rust,no_run
    /// use blueprint_core::extract::Context;
    /// use blueprint_tangle_testing_utils::TangleTestHarness;
    /// use tempfile::TempDir;
    ///
    /// // This context isn't node-specific; it can safely be cloned to all nodes.
    /// #[derive(Clone)]
    /// struct MyContext {
    ///     foo: u64,
    /// }
    ///
    /// async fn some_job(Context(_context): Context<MyContext>) {}
    ///
    /// # #[tokio::main]
    /// # async fn main() -> Result<(), Box<dyn std::error::Error>> {
    /// let tmp_dir = TempDir::new()?;
    ///
    /// // Initialize the harness with the context type; this indicates that it will need the
    /// // context later.
    /// let harness = TangleTestHarness::<MyContext>::setup(tmp_dir).await?;
    ///
    /// let (mut test_env, _service_id, _blueprint_id) = harness.setup_services::<1>(false).await?;
    ///
    /// // Setup the test environment
    /// test_env.initialize().await?;
    /// test_env.add_job(some_job).await;
    ///
    /// // Ready to start now, provide the context
    /// let context = MyContext { foo: 0 };
    /// test_env.start(context).await?;
    /// # Ok(()) }
    /// ```
    pub async fn start(&mut self, context: Ctx) -> Result<(), Error> {
        let nodes_len = self.nodes.read().await.len();
        self.start_with_contexts(vec![context; nodes_len]).await
    }

    /// Send a start command to all nodes
    ///
    /// This takes a list of contexts that will be applied in the same order as the nodes appear in
    /// [`Self::node_handles()`].
    ///
    /// # Panics
    ///
    /// This will panic if `contexts.len()` != `nodes.len()`
    ///
    /// # Errors
    ///
    /// Returns an error if the command channel closed prematurely
    ///
    /// # Examples
    ///
    /// ```rust,no_run
    /// use blueprint_core::extract::Context;
    /// use blueprint_tangle_testing_utils::TangleTestHarness;
    /// use tempfile::TempDir;
    ///
    /// // This context is node-specific. Each node needs its own copy.
    /// #[derive(Clone)]
    /// struct MyContext {
    ///     foo: u64,
    /// }
    ///
    /// async fn some_job(Context(_context): Context<MyContext>) {}
    ///
    /// // Start up a test with 2 nodes
    /// const N: usize = 2;
    ///
    /// # #[tokio::main]
    /// # async fn main() -> Result<(), Box<dyn std::error::Error>> {
    /// let tmp_dir = TempDir::new()?;
    ///
    /// // Initialize the harness with the context type; this indicates that it will need the
    /// // context later.
    /// let harness = TangleTestHarness::<MyContext>::setup(tmp_dir).await?;
    ///
    /// let (mut test_env, _service_id, _blueprint_id) = harness.setup_services::<N>(false).await?;
    ///
    /// // Setup the test environment
    /// test_env.initialize().await?;
    /// test_env.add_job(some_job).await;
    ///
    /// // Ready to start now, provide one context per node
    /// let mut contexts = Vec::new();
    /// let handles = test_env.node_handles().await;
    /// for (index, _handle) in handles.iter().enumerate() {
    ///     let context = MyContext { foo: index as u64 };
    ///     contexts.push(context);
    /// }
    ///
    /// test_env.start_with_contexts(contexts).await?;
    /// # Ok(()) }
    /// ```
    pub async fn start_with_contexts(&mut self, contexts: Vec<Ctx>) -> Result<(), Error> {
        let nodes_len = self.nodes.read().await.len();
        assert_eq!(
            nodes_len,
            contexts.len(),
            "wrong number of contexts provided"
        );

        // Start all nodes' runners
        let (result_tx, result_rx) = oneshot::channel();
        self.command_tx
            .send(EnvironmentCommand::Start {
                result_tx,
                contexts,
            })
            .await
            .map_err(|e| Error::Setup(e.to_string()))?;

        // Wait for initialization to complete
        result_rx.await.map_err(|e| Error::Setup(e.to_string()))??;

        Ok(())
    }

    async fn add_node(&self, node_id: usize) -> Result<(), Error> {
        let (result_tx, result_rx) = oneshot::channel();
        self.command_tx
            .send(EnvironmentCommand::AddNode { node_id, result_tx })
            .await
            .map_err(|e| Error::Setup(e.to_string()))?;
        result_rx.await.map_err(|e| Error::Setup(e.to_string()))?
    }

    /// Subscribes to test environment events
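    ///
    /// # Examples
    ///
    /// A minimal sketch (assuming a running `test_env`, set up as in [`Self::start()`]):
    ///
    /// ```rust,ignore
    /// let mut events = test_env.subscribe();
    /// tokio::spawn(async move {
    ///     while let Ok(event) = events.recv().await {
    ///         // `TestEvent` derives `Debug`, so it can be logged directly.
    ///         println!("test event: {event:?}");
    ///     }
    /// });
    /// ```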
    #[must_use]
    pub fn subscribe(&self) -> broadcast::Receiver<TestEvent> {
        self.event_tx.subscribe()
    }

    /// Removes a node from the test environment
    ///
    /// # Errors
    ///
    /// Returns an error if the command channel closed prematurely
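    ///
    /// # Examples
    ///
    /// A minimal sketch (assuming a running `test_env`, set up as in [`Self::start()`]):
    ///
    /// ```rust,ignore
    /// let mut events = test_env.subscribe();
    /// // Ask node 0 to shut down; a `TestEvent::NodeRemoved(0)` event follows on success.
    /// test_env.remove_node(0).await?;
    /// ```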
    pub async fn remove_node(&self, node_id: usize) -> Result<(), Error> {
        let (result_tx, result_rx) = oneshot::channel();
        self.command_tx
            .send(EnvironmentCommand::RemoveNode { node_id, result_tx })
            .await
            .map_err(|e| Error::Setup(e.to_string()))?;
        result_rx.await.map_err(|e| Error::Setup(e.to_string()))?
    }

    /// Shuts down the test environment
    ///
    /// # Errors
    ///
    /// Returns an error if the command channel closed prematurely
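    ///
    /// # Examples
    ///
    /// A minimal sketch (assuming a running `test_env`, set up as in [`Self::start()`]):
    ///
    /// ```rust,ignore
    /// // Ask every node to shut down; this also stops the command handler task.
    /// test_env.shutdown().await?;
    /// ```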
    pub async fn shutdown(&self) -> Result<(), Error> {
        self.command_tx
            .send(EnvironmentCommand::Shutdown)
            .await
            .map_err(|e| Error::Setup(e.to_string()))?;
        Ok(())
    }

    fn spawn_command_handler(
        nodes: Arc<RwLock<Vec<NodeSlot<Ctx, C>>>>,
        config: Arc<TangleTestConfig>,
        running_nodes: Arc<AtomicUsize>,
        mut command_rx: mpsc::Receiver<EnvironmentCommand<Ctx>>,
        event_tx: broadcast::Sender<TestEvent>,
    ) {
        tokio::task::spawn(async move {
            let nodes = nodes.clone();
            while let Some(cmd) = command_rx.recv().await {
                match cmd {
                    EnvironmentCommand::AddNode { node_id, result_tx } => {
                        let result = Self::handle_add_node(
                            nodes.clone(),
                            node_id,
                            config.clone(),
                            &event_tx,
                        )
                        .await;
                        let _ = result_tx.send(result);
                    }
                    EnvironmentCommand::RemoveNode { node_id, result_tx } => {
                        let result =
                            Self::handle_remove_node(nodes.clone(), node_id, &event_tx).await;
                        let _ = result_tx.send(result);
                    }
                    EnvironmentCommand::Start {
                        result_tx,
                        contexts,
                    } => {
                        let result = Self::handle_start(
                            nodes.clone(),
                            &event_tx,
                            running_nodes.clone(),
                            contexts,
                        )
                        .await;
                        let _ = result_tx.send(result);
                    }
                    EnvironmentCommand::Shutdown => {
                        Self::handle_shutdown(nodes.clone(), &event_tx).await;
                        break;
                    }
                }
            }
        });
    }

    /// Returns handles to all currently occupied node slots, in node-ID order
    pub async fn node_handles(&self) -> Vec<Arc<NodeHandle<Ctx, C>>> {
        self.nodes
            .read()
            .await
            .iter()
            .filter_map(|n| match n {
                NodeSlot::Occupied(node) => Some(node.clone()),
                NodeSlot::Empty => None,
            })
            .collect()
    }

    async fn handle_add_node(
        nodes: Arc<RwLock<Vec<NodeSlot<Ctx, C>>>>,
        node_id: usize,
        config: Arc<TangleTestConfig>,
        event_tx: &broadcast::Sender<TestEvent>,
    ) -> Result<(), Error> {
        let node = NodeHandle::new(node_id, &config).await?;
        nodes.write().await[node_id] = NodeSlot::Occupied(node);
        let _ = event_tx.send(TestEvent::NodeAdded(node_id));
        Ok(())
    }

    async fn handle_remove_node(
        nodes: Arc<RwLock<Vec<NodeSlot<Ctx, C>>>>,
        node_id: usize,
        event_tx: &broadcast::Sender<TestEvent>,
    ) -> Result<(), Error> {
        let nodes = nodes.read().await;

        let NodeSlot::Occupied(node) = nodes[node_id].clone() else {
            return Err(Error::Setup(format!("Node {} not found", node_id)));
        };

        // Send shutdown command to the node
        if let Err(e) = node.shutdown().await {
            let _ = event_tx.send(TestEvent::Error(format!(
                "Failed to shutdown node {}: {}",
                node_id, e
            )));
        }
        let _ = event_tx.send(TestEvent::NodeRemoved(node_id));
        Ok(())
    }

    async fn handle_start(
        nodes: Arc<RwLock<Vec<NodeSlot<Ctx, C>>>>,
        event_tx: &broadcast::Sender<TestEvent>,
        running_nodes: Arc<AtomicUsize>,
        contexts: Vec<Ctx>,
    ) -> Result<(), Error> {
        let nodes = nodes.read().await;

        assert!(
            nodes.iter().all(|n| matches!(n, NodeSlot::Occupied(_))),
            "Not all nodes were initialized"
        );

        // Start all node runners concurrently
        let futures = nodes.iter().enumerate().map(|(node_id, node)| {
            let running_nodes = running_nodes.clone();
            let context = contexts[node_id].clone();

            async move {
                let NodeSlot::Occupied(node) = node else {
                    unreachable!()
                };

                if let Err(e) = node.start_runner(context).await {
                    let _ = event_tx.send(TestEvent::Error(format!(
                        "Failed to start node {}: {}",
                        node_id, e
                    )));
                    return Err(e);
                }
                running_nodes.fetch_add(1, Ordering::SeqCst);
                Ok(())
            }
        });

        // Wait for all nodes to start
        let results = join_all(futures).await;
        for result in results {
            result?;
        }

        Ok(())
    }

    async fn handle_shutdown(
        nodes: Arc<RwLock<Vec<NodeSlot<Ctx, C>>>>,
        event_tx: &broadcast::Sender<TestEvent>,
    ) {
        let nodes = nodes.read().await;
        for (node_id, node) in nodes.iter().enumerate() {
            if let NodeSlot::Occupied(node) = node {
                if let Err(e) = node.shutdown().await {
                    let _ = event_tx.send(TestEvent::Error(format!(
                        "Failed to shutdown node {}: {}",
                        node_id, e
                    )));
                }
            }
        }
    }
}

struct NodeState {
    is_running: bool,
}

impl Debug for NodeState {
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("NodeState")
            .field("is_running", &self.is_running)
            .finish()
    }
}

/// Commands that can be sent to individual nodes
#[non_exhaustive]
enum NodeCommand {
    Shutdown,
}

impl Debug for NodeCommand {
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        match self {
            NodeCommand::Shutdown => f.write_str("Shutdown"),
        }
    }
}

/// Represents a single node in the multi-node test environment
pub struct NodeHandle<Ctx, C>
where
    C: HeartbeatConsumer + Send + Sync + 'static,
{
    pub node_id: usize,
    pub addr: Multiaddr,
    pub port: u16,
    pub client: TangleClient,
    pub signer: TanglePairSigner<sp_core::sr25519::Pair>,
    state: Arc<RwLock<NodeState>>,
    command_tx: mpsc::Sender<NodeCommand>,
    pub test_env: Arc<RwLock<TangleTestEnv<Ctx, C>>>,
}

impl<Ctx, C> NodeHandle<Ctx, C>
where
    Ctx: Clone + Send + Sync + 'static,
    C: HeartbeatConsumer + Send + Sync + 'static,
{
    /// Adds a job to the node, to be executed when the test is run.
    ///
    /// The job is added to the end of the list of jobs and can be stopped using the `stop_job`
    /// method.
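    ///
    /// # Examples
    ///
    /// A minimal sketch of adding a job to a single node (assuming a `test_env` set up as in
    /// [`MultiNodeTestEnv::start()`], with `MyContext` defined as in that example):
    ///
    /// ```rust,ignore
    /// async fn some_job(Context(_ctx): Context<MyContext>) {}
    ///
    /// // Unlike `MultiNodeTestEnv::add_job`, this registers the job on one node only.
    /// let handles = test_env.node_handles().await;
    /// handles[0].add_job(some_job).await;
    /// ```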
    pub async fn add_job<J, T>(&self, job: J)
    where
        J: Job<T, Ctx> + Send + Sync + 'static,
        T: 'static,
    {
        self.test_env.write().await.add_job(job);
    }

    /// Adds a background service to this node's test environment
    pub async fn add_background_service<K: BackgroundService + Send + 'static>(&self, service: K) {
        self.test_env.write().await.add_background_service(service);
    }

    /// Returns the [`BlueprintEnvironment`] for this node's test environment
    pub async fn blueprint_config(&self) -> BlueprintEnvironment {
        self.test_env.read().await.get_blueprint_config()
    }
}

impl<Ctx, C> Debug for NodeHandle<Ctx, C>
where
    Ctx: Clone + Send + Sync + 'static,
    C: HeartbeatConsumer + Send + Sync + 'static,
{
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("NodeHandle")
            .field("node_id", &self.node_id)
            .field("signer", &self.signer.address())
            .field("test_env", &self.test_env)
            .finish_non_exhaustive()
    }
}

// Implementation for NodeHandle
impl<Ctx, C> NodeHandle<Ctx, C>
where
    Ctx: Clone + Send + Sync + 'static,
    C: HeartbeatConsumer + Send + Sync + 'static,
{
    async fn new(node_id: usize, config: &TangleTestConfig) -> Result<Arc<Self>, Error> {
        let (command_tx, command_rx) = mpsc::channel(32);
        let state = Arc::new(RwLock::new(NodeState { is_running: true }));

        // Create node environment and client
        let mut env = generate_env_from_node_id(
            ENDOWED_TEST_NAMES[node_id],
            config.http_endpoint.clone(),
            config.ws_endpoint.clone(),
            config.temp_dir.as_path(),
        )
        .await?;
        env.bridge_socket_path = Some(config.bridge_socket_path.clone());

        let client = env.tangle_client().await?;
        let keystore = env.keystore();
        let sr25519_public = keystore
            .first_local::<SpSr25519>()
            .map_err(|err| RunnerError::Tangle(TangleError::Keystore(err)))?;
        let sr25519_pair = keystore
            .get_secret::<SpSr25519>(&sr25519_public)
            .map_err(|err| RunnerError::Tangle(TangleError::Keystore(err)))?;
        let sr25519_signer = TanglePairSigner::new(sr25519_pair.0);

        // Create TangleTestEnv for this node
        let test_env = TangleTestEnv::new(TangleConfig::default(), env.clone())?;

        let port = find_open_tcp_bind_port();
        blueprint_core::info!("Binding node {node_id} to port {port}");

        let addr = Multiaddr::from_str(&format!("/ip4/127.0.0.1/tcp/{port}"))
            .expect("Should parse MultiAddr");

        let node = Arc::new(Self {
            node_id,
            addr,
            port,
            client,
            signer: sr25519_signer,
            state,
            command_tx,
            test_env: Arc::new(RwLock::new(test_env)),
        });

        Self::spawn_command_handler(&node, command_rx);
        Ok(node)
    }

    /// Set the `QoS` config for this node
    pub async fn set_qos_config(&self, config: QoSConfig) {
        self.test_env.write().await.set_qos_config(config);
    }

    /// Set the `QoS` service for this node
    pub async fn set_qos_service(&self, service: Arc<QoSService<C>>) {
        self.test_env.write().await.set_qos_service(service);
    }

    /// Shuts down the node
    ///
    /// # Errors
    ///
    /// Returns an error if the node fails to shut down in time
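    ///
    /// # Examples
    ///
    /// A minimal sketch (assuming `handle` came from [`MultiNodeTestEnv::node_handles()`]):
    ///
    /// ```rust,ignore
    /// // Signals the node's command handler, then polls until it marks itself stopped.
    /// handle.shutdown().await?;
    /// ```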
    pub async fn shutdown(&self) -> Result<(), Error> {
        self.command_tx
            .send(NodeCommand::Shutdown)
            .await
            .map_err(|e| Error::Setup(e.to_string()))?;

        // Wait for the node to mark itself as not running
        let mut retries = 0;
        while retries < 10 {
            if !self.state.read().await.is_running {
                return Ok(());
            }
            tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
            retries += 1;
        }

        Err(Error::Setup("Node failed to shutdown in time".to_string()))
    }

    /// Start the runner for this node
    ///
    /// # Errors
    ///
    /// Any errors will be from the runner itself, likely caused by job failure.
    pub async fn start_runner(&self, context: Ctx) -> Result<(), Error> {
        let result = {
            let mut test_env_guard = self.test_env.write().await;
            test_env_guard.run_runner(context).await
        };
        result.map_err(|e| Error::Setup(e.to_string()))
    }

    fn spawn_command_handler(node: &Self, mut command_rx: mpsc::Receiver<NodeCommand>) {
        let state = node.state.clone();
        tokio::task::spawn(async move {
            let Some(cmd) = command_rx.recv().await else {
                return;
            };
            match cmd {
                NodeCommand::Shutdown => {
                    let mut state = state.write().await;
                    state.is_running = false;
                }
            }
        });
    }

    /// Gets a reference to the node's client
    #[must_use]
    pub fn client(&self) -> &TangleClient {
        &self.client
    }

    /// Gets a reference to the node's signer
    #[must_use]
    pub fn signer(&self) -> &TanglePairSigner<sp_core::sr25519::Pair> {
        &self.signer
    }
}

pub(crate) fn find_open_tcp_bind_port() -> u16 {
    // Bind to port 0 so the OS assigns a free port, then release it immediately.
    // Best-effort: another process could claim the port before the caller binds it.
    let listener = std::net::TcpListener::bind("127.0.0.1:0").expect("Should bind to localhost");
    let port = listener
        .local_addr()
        .expect("Should have a local address")
        .port();
    drop(listener);
    port
}