zombienet_orchestrator/network/
node.rs

1use std::{
2    sync::{
3        atomic::{AtomicBool, Ordering},
4        Arc,
5    },
6    time::Duration,
7};
8
9use anyhow::anyhow;
10use fancy_regex::Regex;
11use glob_match::glob_match;
12use prom_metrics_parser::MetricMap;
13use provider::DynNode;
14use serde::{Deserialize, Serialize, Serializer};
15use subxt::{backend::rpc::RpcClient, OnlineClient};
16use support::net::{skip_err_while_waiting, wait_ws_ready};
17use thiserror::Error;
18use tokio::sync::RwLock;
19use tracing::{debug, trace};
20
21use crate::{network_spec::node::NodeSpec, tx_helper::client::get_client_from_url};
22
/// Heap-allocated line matcher: receives one log line and reports whether it
/// matches, or an error if evaluating the pattern failed.
type BoxedClosure = Box<dyn Fn(&str) -> Result<bool, anyhow::Error> + Send + Sync>;
24
/// Errors specific to [`NetworkNode`] operations.
#[derive(Error, Debug)]
pub enum NetworkNodeError {
    /// The metric whose name is carried in the payload is absent from the metrics map.
    #[error("metric '{0}' not found!")]
    MetricNotFound(String),
}
30
/// Handle to a single node of a network.
///
/// Bundles the provider-level node (`inner`), the spec it was built from and the
/// endpoints used to reach it. The metrics cache and the running flag are skipped
/// when serializing and, being behind `Arc`, are shared between clones.
#[derive(Clone, Serialize)]
pub struct NetworkNode {
    /// Provider-specific node handle; serialized through `serialize_provider_node`.
    #[serde(serialize_with = "serialize_provider_node")]
    pub(crate) inner: DynNode,
    // TODO: do we need the full spec here?
    // Maybe a reduced set of values.
    pub(crate) spec: NodeSpec,
    /// Name of the node.
    pub(crate) name: String,
    /// URI handed to the rpc / subxt client constructors.
    pub(crate) ws_uri: String,
    /// Multiaddr string of the node.
    pub(crate) multiaddr: String,
    /// URI fetched over HTTP to refresh the metrics cache.
    pub(crate) prometheus_uri: String,
    /// Last metrics snapshot parsed from `prometheus_uri`.
    #[serde(skip)]
    metrics_cache: Arc<RwLock<MetricMap>>,
    /// Running flag toggled by `pause`, `resume` and `restart`.
    #[serde(skip)]
    is_running: Arc<AtomicBool>,
}
47
/// Deserialization-side mirror of the fields [`NetworkNode`] serializes.
///
/// `inner` is kept as an untyped JSON value.
/// NOTE(review): presumably it is turned back into a provider node elsewhere — confirm.
#[derive(Deserialize)]
pub(crate) struct RawNetworkNode {
    pub(crate) name: String,
    pub(crate) ws_uri: String,
    pub(crate) prometheus_uri: String,
    pub(crate) multiaddr: String,
    pub(crate) spec: NodeSpec,
    pub(crate) inner: serde_json::Value,
}
57
/// Outcome of waiting for a number of matching log lines.
///
/// Tells whether the count predicate was satisfied before the timeout ran out.
///
/// # Variants
/// - `TargetReached(count)` – the predicate held; `count` is the number of matching
///   lines seen when it was satisfied.
/// - `TargetFailed(count)` – the predicate did not hold; `count` is the number of
///   matching lines seen when the timeout expired.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum LogLineCount {
    TargetReached(u32),
    TargetFailed(u32),
}

impl LogLineCount {
    /// Returns `true` only for [`LogLineCount::TargetReached`].
    pub fn success(&self) -> bool {
        matches!(self, Self::TargetReached(..))
    }
}
81
/// Options controlling how the log line count wait behaves.
///
/// Combines a predicate over the number of matching lines, a timeout, and a flag
/// telling whether the whole timeout must elapse before the count is judged.
///
/// # Fields
/// - `predicate`: receives the current number of matching lines and returns `true`
///   when the condition is satisfied.
/// - `timeout`: maximum time to wait.
/// - `wait_until_timeout_elapses`: when `true`, the full timeout is waited out even
///   if the condition already holds earlier. Useful to verify sustained absence or
///   stability (e.g., "ensure no new logs appear").
#[derive(Clone)]
pub struct LogLineCountOptions {
    pub predicate: Arc<dyn Fn(u32) -> bool + Send + Sync>,
    pub timeout: Duration,
    pub wait_until_timeout_elapses: bool,
}

impl LogLineCountOptions {
    /// Builds the options, wrapping `predicate` in an `Arc`.
    pub fn new(
        predicate: impl Fn(u32) -> bool + 'static + Send + Sync,
        timeout: Duration,
        wait_until_timeout_elapses: bool,
    ) -> Self {
        let predicate: Arc<dyn Fn(u32) -> bool + Send + Sync> = Arc::new(predicate);
        Self {
            predicate,
            timeout,
            wait_until_timeout_elapses,
        }
    }

    /// Options that wait out the whole `timeout` and succeed only with zero matches.
    pub fn no_occurences_within_timeout(timeout: Duration) -> Self {
        Self {
            predicate: Arc::new(|count| count == 0),
            timeout,
            wait_until_timeout_elapses: true,
        }
    }
}
118
119// #[derive(Clone, Debug)]
120// pub struct QueryMetricOptions {
121//     use_cache: bool,
122//     treat_not_found_as_zero: bool,
123// }
124
125// impl Default for QueryMetricOptions {
126//     fn default() -> Self {
127//         Self { use_cache: false, treat_not_found_as_zero: true }
128//     }
129// }
130
131impl NetworkNode {
132    /// Create a new NetworkNode
133    pub(crate) fn new<T: Into<String>>(
134        name: T,
135        ws_uri: T,
136        prometheus_uri: T,
137        multiaddr: T,
138        spec: NodeSpec,
139        inner: DynNode,
140    ) -> Self {
141        Self {
142            name: name.into(),
143            ws_uri: ws_uri.into(),
144            prometheus_uri: prometheus_uri.into(),
145            inner,
146            spec,
147            multiaddr: multiaddr.into(),
148            metrics_cache: Arc::new(Default::default()),
149            is_running: Arc::new(AtomicBool::new(false)),
150        }
151    }
152
    /// Reads the running flag (atomic load with `Acquire` ordering).
    pub(crate) fn is_running(&self) -> bool {
        self.is_running.load(Ordering::Acquire)
    }
156
    /// Overwrites the running flag (atomic store with `Release` ordering).
    ///
    /// Takes `&self`: the flag sits behind an `Arc<AtomicBool>`, so clones of this
    /// node observe the change too.
    pub(crate) fn set_is_running(&self, is_running: bool) {
        self.is_running.store(is_running, Ordering::Release);
    }
160
161    pub(crate) fn set_multiaddr(&mut self, multiaddr: impl Into<String>) {
162        self.multiaddr = multiaddr.into();
163    }
164
165    pub fn name(&self) -> &str {
166        &self.name
167    }
168
    /// Arguments as reported by the underlying provider node.
    pub fn args(&self) -> Vec<&str> {
        self.inner.args()
    }
172
    /// Spec stored for this node.
    pub fn spec(&self) -> &NodeSpec {
        &self.spec
    }
176
177    pub fn ws_uri(&self) -> &str {
178        &self.ws_uri
179    }
180
181    pub fn multiaddr(&self) -> &str {
182        self.multiaddr.as_ref()
183    }
184
185    // Subxt
186
    /// Get the rpc client for the node, built from its `ws_uri`.
    ///
    /// # Errors
    /// Returns the `subxt::Error` produced while building the client from the URL.
    pub async fn rpc(&self) -> Result<RpcClient, subxt::Error> {
        get_client_from_url(&self.ws_uri).await
    }
191
192    /// Get the [online client](subxt::client::OnlineClient) for the node
193    #[deprecated = "Use `wait_client` instead."]
194    pub async fn client<Config: subxt::Config>(
195        &self,
196    ) -> Result<OnlineClient<Config>, subxt::Error> {
197        self.try_client().await
198    }
199
    /// Try to connect to the node.
    ///
    /// Most of the time you only want to use [`NetworkNode::wait_client`] that waits for
    /// the node to appear before it connects to it. This function directly tries
    /// to connect to the node and returns an error if the node is not yet available
    /// at that point in time.
    ///
    /// Returns a [`OnlineClient`] on success.
    ///
    /// # Errors
    /// Returns the `subxt::Error` produced while building the client from `ws_uri`.
    pub async fn try_client<Config: subxt::Config>(
        &self,
    ) -> Result<OnlineClient<Config>, subxt::Error> {
        get_client_from_url(&self.ws_uri).await
    }
213
214    /// Wait until get the [online client](subxt::client::OnlineClient) for the node
215    pub async fn wait_client<Config: subxt::Config>(
216        &self,
217    ) -> Result<OnlineClient<Config>, anyhow::Error> {
218        debug!("wait_client ws_uri: {}", self.ws_uri());
219        wait_ws_ready(self.ws_uri())
220            .await
221            .map_err(|e| anyhow!("Error awaiting http_client to ws be ready, err: {e}"))?;
222
223        self.try_client()
224            .await
225            .map_err(|e| anyhow!("Can't create a subxt client, err: {e}"))
226    }
227
228    /// Wait until get the [online client](subxt::client::OnlineClient) for the node with a defined timeout
229    pub async fn wait_client_with_timeout<Config: subxt::Config>(
230        &self,
231        timeout_secs: impl Into<u64>,
232    ) -> Result<OnlineClient<Config>, anyhow::Error> {
233        debug!("waiting until subxt client is ready");
234        tokio::time::timeout(
235            Duration::from_secs(timeout_secs.into()),
236            self.wait_client::<Config>(),
237        )
238        .await?
239    }
240
241    // Commands
242
243    /// Pause the node, this is implemented by pausing the
244    /// actual process (e.g polkadot) with sending `SIGSTOP` signal
245    ///
246    /// Note: Using this method with the native provider is currently unsupported.
247    pub async fn pause(&self) -> Result<(), anyhow::Error> {
248        self.set_is_running(false);
249        self.inner.pause().await?;
250        Ok(())
251    }
252
253    /// Resume the node, this is implemented by resuming the
254    /// actual process (e.g polkadot) with sending `SIGCONT` signal
255    ///
256    /// Note: Using this method with the native provider is currently unsupported.
257    pub async fn resume(&self) -> Result<(), anyhow::Error> {
258        self.set_is_running(true);
259        self.inner.resume().await?;
260        Ok(())
261    }
262
263    /// Restart the node using the same `cmd`, `args` and `env` (and same isolated dir)
264    ///
265    /// Note: Using this method with the native provider is currently unsupported.
266    pub async fn restart(&self, after: Option<Duration>) -> Result<(), anyhow::Error> {
267        self.set_is_running(false);
268        self.inner.restart(after).await?;
269        self.set_is_running(true);
270        Ok(())
271    }
272
273    // Metrics assertions
274
275    /// Get metric value 'by name' from Prometheus (exposed by the node)
276    /// metric name can be:
277    /// with prefix (e.g: 'polkadot_')
278    /// with chain attribute (e.g: 'chain=rococo-local')
279    /// without prefix and/or without chain attribute
280    pub async fn reports(&self, metric_name: impl Into<String>) -> Result<f64, anyhow::Error> {
281        let metric_name = metric_name.into();
282        // force cache reload
283        self.fetch_metrics().await?;
284        // by default we treat not found as 0 (same in v1)
285        self.metric(&metric_name, true).await
286    }
287
288    /// Assert on a metric value 'by name' from Prometheus (exposed by the node)
289    /// metric name can be:
290    /// with prefix (e.g: 'polkadot_')
291    /// with chain attribute (e.g: 'chain=rococo-local')
292    /// without prefix and/or without chain attribute
293    ///
294    /// We first try to assert on the value using the cached metrics and
295    /// if not meet the criteria we reload the cache and check again
296    pub async fn assert(
297        &self,
298        metric_name: impl Into<String>,
299        value: impl Into<f64>,
300    ) -> Result<bool, anyhow::Error> {
301        let value: f64 = value.into();
302        self.assert_with(metric_name, |v| v == value).await
303    }
304
305    /// Assert on a metric value using a given predicate.
306    /// See [`NetworkNode::reports`] description for details on metric name.
307    pub async fn assert_with(
308        &self,
309        metric_name: impl Into<String>,
310        predicate: impl Fn(f64) -> bool,
311    ) -> Result<bool, anyhow::Error> {
312        let metric_name = metric_name.into();
313        // reload metrics
314        self.fetch_metrics().await?;
315        let val = self.metric(&metric_name, true).await?;
316        trace!("🔎 Current value {val} passed to the predicated?");
317        Ok(predicate(val))
318    }
319
320    // Wait methods for metrics
321
322    /// Wait until a metric value pass the `predicate`
323    pub async fn wait_metric(
324        &self,
325        metric_name: impl Into<String>,
326        predicate: impl Fn(f64) -> bool,
327    ) -> Result<(), anyhow::Error> {
328        let metric_name = metric_name.into();
329        debug!("waiting until metric {metric_name} pass the predicate");
330        loop {
331            let res = self.assert_with(&metric_name, &predicate).await;
332            match res {
333                Ok(res) => {
334                    if res {
335                        return Ok(());
336                    }
337                },
338                Err(e) => match e.downcast::<reqwest::Error>() {
339                    Ok(io_err) => {
340                        if !skip_err_while_waiting(&io_err) {
341                            return Err(io_err.into());
342                        }
343                    },
344                    Err(other) => {
345                        match other.downcast::<NetworkNodeError>() {
346                            Ok(node_err) => {
347                                if !matches!(node_err, NetworkNodeError::MetricNotFound(_)) {
348                                    return Err(node_err.into());
349                                }
350                            },
351                            Err(other) => return Err(other),
352                        };
353                    },
354                },
355            }
356
357            // sleep to not spam prometheus
358            tokio::time::sleep(Duration::from_secs(1)).await;
359        }
360    }
361
362    /// Wait until a metric value pass the `predicate`
363    /// with a timeout (secs)
364    pub async fn wait_metric_with_timeout(
365        &self,
366        metric_name: impl Into<String>,
367        predicate: impl Fn(f64) -> bool,
368        timeout_secs: impl Into<u64>,
369    ) -> Result<(), anyhow::Error> {
370        let metric_name = metric_name.into();
371        let secs = timeout_secs.into();
372        debug!("waiting until metric {metric_name} pass the predicate");
373        let res = tokio::time::timeout(
374            Duration::from_secs(secs),
375            self.wait_metric(&metric_name, predicate),
376        )
377        .await;
378
379        if let Ok(inner_res) = res {
380            match inner_res {
381                Ok(_) => Ok(()),
382                Err(e) => Err(anyhow!("Error waiting for metric: {e}")),
383            }
384        } else {
385            // timeout
386            Err(anyhow!(
387                "Timeout ({secs}), waiting for metric {metric_name} pass the predicate"
388            ))
389        }
390    }
391
392    // Logs
393
394    /// Get the logs of the node
395    /// TODO: do we need the `since` param, maybe we could be handy later for loop filtering
396    pub async fn logs(&self) -> Result<String, anyhow::Error> {
397        Ok(self.inner.logs().await?)
398    }
399
400    /// Wait until a the number of matching log lines is reach
401    pub async fn wait_log_line_count(
402        &self,
403        pattern: impl Into<String>,
404        is_glob: bool,
405        count: usize,
406    ) -> Result<(), anyhow::Error> {
407        let pattern = pattern.into();
408        let pattern_clone = pattern.clone();
409        debug!("waiting until we find pattern {pattern} {count} times");
410        let match_fn: BoxedClosure = if is_glob {
411            Box::new(move |line: &str| Ok(glob_match(&pattern, line)))
412        } else {
413            let re = Regex::new(&pattern)?;
414            Box::new(move |line: &str| re.is_match(line).map_err(|e| anyhow!(e.to_string())))
415        };
416
417        loop {
418            let mut q = 0_usize;
419            let logs = self.logs().await?;
420            for line in logs.lines() {
421                trace!("line is {line}");
422                if match_fn(line)? {
423                    trace!("pattern {pattern_clone} match in line {line}");
424                    q += 1;
425                    if q >= count {
426                        return Ok(());
427                    }
428                }
429            }
430
431            tokio::time::sleep(Duration::from_secs(2)).await;
432        }
433    }
434
435    /// Waits until the number of matching log lines satisfies a custom condition,
436    /// optionally waiting for the entire duration of the timeout.
437    ///
438    /// This method searches log lines for a given substring or glob pattern,
439    /// and evaluates the number of matching lines using a user-provided predicate function.
440    /// Optionally, it can wait for the full timeout duration to ensure the condition
441    /// holds consistently (e.g., for verifying absence of logs).
442    ///
443    /// # Arguments
444    /// * `substring` - The substring or pattern to match within log lines.
445    /// * `is_glob` - Whether to treat `substring` as a glob pattern (`true`) or a regex (`false`).
446    /// * `options` - Configuration for timeout, match count predicate, and full-duration waiting.
447    ///
448    /// # Returns
449    /// * `Ok(LogLineCount::TargetReached(n))` if the predicate was satisfied within the timeout,
450    /// * `Ok(LogLineCount::TargetFails(n))` if the predicate was not satisfied in time,
451    /// * `Err(e)` if an error occurred during log retrieval or matching.
452    ///
453    /// # Example
454    /// ```rust
455    /// # use std::{sync::Arc, time::Duration};
456    /// # use provider::NativeProvider;
457    /// # use support::{fs::local::LocalFileSystem};
458    /// # use zombienet_orchestrator::{Orchestrator, network::node::{NetworkNode, LogLineCountOptions}};
459    /// # use configuration::NetworkConfig;
460    /// # async fn example() -> Result<(), anyhow::Error> {
461    /// #   let provider = NativeProvider::new(LocalFileSystem {});
462    /// #   let orchestrator = Orchestrator::new(LocalFileSystem {}, provider);
463    /// #   let config = NetworkConfig::load_from_toml("config.toml")?;
464    /// #   let network = orchestrator.spawn(config).await?;
465    /// let node = network.get_node("alice")?;
466    /// // Wait (up to 10 seconds) until pattern occurs once
467    /// let options = LogLineCountOptions {
468    ///     predicate: Arc::new(|count| count == 1),
469    ///     timeout: Duration::from_secs(10),
470    ///     wait_until_timeout_elapses: false,
471    /// };
472    /// let result = node
473    ///     .wait_log_line_count_with_timeout("error", false, options)
474    ///     .await?;
475    /// #   Ok(())
476    /// # }
477    /// ```
478    pub async fn wait_log_line_count_with_timeout(
479        &self,
480        substring: impl Into<String>,
481        is_glob: bool,
482        options: LogLineCountOptions,
483    ) -> Result<LogLineCount, anyhow::Error> {
484        let substring = substring.into();
485        debug!(
486            "waiting until match lines count within {} seconds",
487            options.timeout.as_secs_f64()
488        );
489
490        let start = tokio::time::Instant::now();
491
492        let match_fn: BoxedClosure = if is_glob {
493            Box::new(move |line: &str| Ok(glob_match(&substring, line)))
494        } else {
495            let re = Regex::new(&substring)?;
496            Box::new(move |line: &str| re.is_match(line).map_err(|e| anyhow!(e.to_string())))
497        };
498
499        if options.wait_until_timeout_elapses {
500            tokio::time::sleep(options.timeout).await;
501        }
502
503        let mut q;
504        loop {
505            q = 0_u32;
506            let logs = self.logs().await?;
507            for line in logs.lines() {
508                if match_fn(line)? {
509                    q += 1;
510
511                    // If `wait_until_timeout_elapses` is set then check the condition just once at the
512                    // end after the whole log file is processed. This is to address the cases when the
513                    // predicate becomes true and false again.
514                    // eg. expected exactly 2 matching lines are expected but 3 are present
515                    if !options.wait_until_timeout_elapses && (options.predicate)(q) {
516                        return Ok(LogLineCount::TargetReached(q));
517                    }
518                }
519            }
520
521            if start.elapsed() >= options.timeout {
522                break;
523            }
524
525            tokio::time::sleep(Duration::from_secs(2)).await;
526        }
527
528        if (options.predicate)(q) {
529            Ok(LogLineCount::TargetReached(q))
530        } else {
531            Ok(LogLineCount::TargetFailed(q))
532        }
533    }
534
535    async fn fetch_metrics(&self) -> Result<(), anyhow::Error> {
536        let response = reqwest::get(&self.prometheus_uri).await?;
537        let metrics = prom_metrics_parser::parse(&response.text().await?)?;
538        let mut cache = self.metrics_cache.write().await;
539        *cache = metrics;
540        Ok(())
541    }
542
543    /// Query individual metric by name
544    async fn metric(
545        &self,
546        metric_name: &str,
547        treat_not_found_as_zero: bool,
548    ) -> Result<f64, anyhow::Error> {
549        let mut metrics_map = self.metrics_cache.read().await;
550        if metrics_map.is_empty() {
551            // reload metrics
552            drop(metrics_map);
553            self.fetch_metrics().await?;
554            metrics_map = self.metrics_cache.read().await;
555        }
556
557        if let Some(val) = metrics_map.get(metric_name) {
558            Ok(*val)
559        } else if treat_not_found_as_zero {
560            Ok(0_f64)
561        } else {
562            Err(NetworkNodeError::MetricNotFound(metric_name.into()).into())
563        }
564    }
565
566    /// Waits given number of seconds until node reports that it is up and running, which
567    /// is determined by metric 'process_start_time_seconds', which should appear,
568    /// when node finished booting up.
569    ///
570    ///
571    /// # Arguments
572    /// * `timeout_secs` - The number of seconds to wait.
573    ///
574    /// # Returns
575    /// * `Ok()` if the node is up before timeout occured.
576    /// * `Err(e)` if timeout or other error occurred while waiting.
577    pub async fn wait_until_is_up(
578        &self,
579        timeout_secs: impl Into<u64>,
580    ) -> Result<(), anyhow::Error> {
581        self.wait_metric_with_timeout("process_start_time_seconds", |b| b >= 1.0, timeout_secs)
582            .await
583            .map_err(|err| anyhow::anyhow!("{}: {:?}", self.name(), err))
584    }
585}
586
587impl std::fmt::Debug for NetworkNode {
588    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
589        f.debug_struct("NetworkNode")
590            .field("inner", &"inner_skipped")
591            .field("spec", &self.spec)
592            .field("name", &self.name)
593            .field("ws_uri", &self.ws_uri)
594            .field("prometheus_uri", &self.prometheus_uri)
595            .finish()
596    }
597}
598
599fn serialize_provider_node<S>(node: &DynNode, serializer: S) -> Result<S::Ok, S::Error>
600where
601    S: Serializer,
602{
603    erased_serde::serialize(node.as_ref(), serializer)
604}
605
606// TODO: mock and impl more unit tests
607#[cfg(test)]
608mod tests {
609    use std::{
610        path::{Path, PathBuf},
611        sync::{Arc, Mutex},
612    };
613
614    use async_trait::async_trait;
615    use provider::{types::*, ProviderError, ProviderNode};
616
617    use super::*;
618
619    #[derive(Serialize)]
620    struct MockNode {
621        logs: Arc<Mutex<Vec<String>>>,
622    }
623
624    impl MockNode {
625        fn new() -> Self {
626            Self {
627                logs: Arc::new(Mutex::new(vec![])),
628            }
629        }
630
631        fn logs_push(&self, lines: Vec<impl Into<String>>) {
632            self.logs
633                .lock()
634                .unwrap()
635                .extend(lines.into_iter().map(|l| l.into()));
636        }
637    }
638
    // Only `logs` is backed by real behaviour; every other trait method is a
    // `todo!()` stub and panics if called.
    #[async_trait]
    impl ProviderNode for MockNode {
        fn name(&self) -> &str {
            todo!()
        }

        fn args(&self) -> Vec<&str> {
            todo!()
        }

        fn base_dir(&self) -> &PathBuf {
            todo!()
        }

        fn config_dir(&self) -> &PathBuf {
            todo!()
        }

        fn data_dir(&self) -> &PathBuf {
            todo!()
        }

        fn relay_data_dir(&self) -> &PathBuf {
            todo!()
        }

        fn scripts_dir(&self) -> &PathBuf {
            todo!()
        }

        fn log_path(&self) -> &PathBuf {
            todo!()
        }

        fn log_cmd(&self) -> String {
            todo!()
        }

        fn path_in_node(&self, _file: &Path) -> PathBuf {
            todo!()
        }

        // Returns every pushed line joined with '\n'.
        async fn logs(&self) -> Result<String, ProviderError> {
            Ok(self.logs.lock().unwrap().join("\n"))
        }

        async fn dump_logs(&self, _local_dest: PathBuf) -> Result<(), ProviderError> {
            todo!()
        }

        async fn run_command(
            &self,
            _options: RunCommandOptions,
        ) -> Result<ExecutionResult, ProviderError> {
            todo!()
        }

        async fn run_script(
            &self,
            _options: RunScriptOptions,
        ) -> Result<ExecutionResult, ProviderError> {
            todo!()
        }

        async fn send_file(
            &self,
            _local_file_path: &Path,
            _remote_file_path: &Path,
            _mode: &str,
        ) -> Result<(), ProviderError> {
            todo!()
        }

        async fn receive_file(
            &self,
            _remote_file_path: &Path,
            _local_file_path: &Path,
        ) -> Result<(), ProviderError> {
            todo!()
        }

        async fn pause(&self) -> Result<(), ProviderError> {
            todo!()
        }

        async fn resume(&self) -> Result<(), ProviderError> {
            todo!()
        }

        async fn restart(&self, _after: Option<Duration>) -> Result<(), ProviderError> {
            todo!()
        }

        async fn destroy(&self) -> Result<(), ProviderError> {
            todo!()
        }
    }
736
737    #[tokio::test(flavor = "multi_thread")]
738    async fn test_wait_log_count_target_reached_immediately() -> Result<(), anyhow::Error> {
739        let mock_provider = Arc::new(MockNode::new());
740        let mock_node = NetworkNode::new(
741            "node1",
742            "ws_uri",
743            "prometheus_uri",
744            "multiaddr",
745            NodeSpec::default(),
746            mock_provider.clone(),
747        );
748
749        mock_provider.logs_push(vec![
750            "system booting",
751            "stub line 1",
752            "stub line 2",
753            "system ready",
754        ]);
755
756        // Wait (up to 10 seconds) until pattern occurs once
757        let options = LogLineCountOptions {
758            predicate: Arc::new(|n| n == 1),
759            timeout: Duration::from_secs(10),
760            wait_until_timeout_elapses: false,
761        };
762
763        let log_line_count = mock_node
764            .wait_log_line_count_with_timeout("system ready", false, options)
765            .await?;
766
767        assert!(matches!(log_line_count, LogLineCount::TargetReached(1)));
768
769        Ok(())
770    }
771
772    #[tokio::test(flavor = "multi_thread")]
773    async fn test_wait_log_count_target_reached_after_delay() -> Result<(), anyhow::Error> {
774        let mock_provider = Arc::new(MockNode::new());
775        let mock_node = NetworkNode::new(
776            "node1",
777            "ws_uri",
778            "prometheus_uri",
779            "multiaddr",
780            NodeSpec::default(),
781            mock_provider.clone(),
782        );
783
784        mock_provider.logs_push(vec![
785            "system booting",
786            "stub line 1",
787            "stub line 2",
788            "system ready",
789        ]);
790
791        // Wait (up to 4 seconds) until pattern occurs twice
792        let options = LogLineCountOptions {
793            predicate: Arc::new(|n| n == 2),
794            timeout: Duration::from_secs(4),
795            wait_until_timeout_elapses: false,
796        };
797
798        let task = tokio::spawn({
799            async move {
800                mock_node
801                    .wait_log_line_count_with_timeout("system ready", false, options)
802                    .await
803                    .unwrap()
804            }
805        });
806
807        tokio::time::sleep(Duration::from_secs(2)).await;
808
809        mock_provider.logs_push(vec!["system ready"]);
810
811        let log_line_count = task.await?;
812
813        assert!(matches!(log_line_count, LogLineCount::TargetReached(2)));
814
815        Ok(())
816    }
817
818    #[tokio::test(flavor = "multi_thread")]
819    async fn test_wait_log_count_target_failed_timeout() -> Result<(), anyhow::Error> {
820        let mock_provider = Arc::new(MockNode::new());
821        let mock_node = NetworkNode::new(
822            "node1",
823            "ws_uri",
824            "prometheus_uri",
825            "multiaddr",
826            NodeSpec::default(),
827            mock_provider.clone(),
828        );
829
830        mock_provider.logs_push(vec![
831            "system booting",
832            "stub line 1",
833            "stub line 2",
834            "system ready",
835        ]);
836
837        // Wait (up to 2 seconds) until pattern occurs twice
838        let options = LogLineCountOptions {
839            predicate: Arc::new(|n| n == 2),
840            timeout: Duration::from_secs(2),
841            wait_until_timeout_elapses: false,
842        };
843
844        let log_line_count = mock_node
845            .wait_log_line_count_with_timeout("system ready", false, options)
846            .await?;
847
848        assert!(matches!(log_line_count, LogLineCount::TargetFailed(1)));
849
850        Ok(())
851    }
852
853    #[tokio::test(flavor = "multi_thread")]
854    async fn test_wait_log_count_target_failed_exceeded() -> Result<(), anyhow::Error> {
855        let mock_provider = Arc::new(MockNode::new());
856        let mock_node = NetworkNode::new(
857            "node1",
858            "ws_uri",
859            "prometheus_uri",
860            "multiaddr",
861            NodeSpec::default(),
862            mock_provider.clone(),
863        );
864
865        mock_provider.logs_push(vec![
866            "system booting",
867            "stub line 1",
868            "stub line 2",
869            "system ready",
870        ]);
871
872        // Wait until timeout and check if pattern occurs exactly twice
873        let options = LogLineCountOptions {
874            predicate: Arc::new(|n| n == 2),
875            timeout: Duration::from_secs(2),
876            wait_until_timeout_elapses: true,
877        };
878
879        let task = tokio::spawn({
880            async move {
881                mock_node
882                    .wait_log_line_count_with_timeout("system ready", false, options)
883                    .await
884                    .unwrap()
885            }
886        });
887
888        tokio::time::sleep(Duration::from_secs(1)).await;
889
890        mock_provider.logs_push(vec!["system ready"]);
891        mock_provider.logs_push(vec!["system ready"]);
892
893        let log_line_count = task.await?;
894
895        assert!(matches!(log_line_count, LogLineCount::TargetFailed(3)));
896
897        Ok(())
898    }
899
900    #[tokio::test(flavor = "multi_thread")]
901    async fn test_wait_log_count_target_reached_no_occurences() -> Result<(), anyhow::Error> {
902        let mock_provider = Arc::new(MockNode::new());
903        let mock_node = NetworkNode::new(
904            "node1",
905            "ws_uri",
906            "prometheus_uri",
907            "multiaddr",
908            NodeSpec::default(),
909            mock_provider.clone(),
910        );
911
912        mock_provider.logs_push(vec!["system booting", "stub line 1", "stub line 2"]);
913
914        let task = tokio::spawn({
915            async move {
916                mock_node
917                    .wait_log_line_count_with_timeout(
918                        "system ready",
919                        false,
920                        // Wait until timeout and make sure pattern occurred zero times
921                        LogLineCountOptions::no_occurences_within_timeout(Duration::from_secs(2)),
922                    )
923                    .await
924                    .unwrap()
925            }
926        });
927
928        tokio::time::sleep(Duration::from_secs(1)).await;
929
930        mock_provider.logs_push(vec!["stub line 3"]);
931
932        assert!(task.await?.success());
933
934        Ok(())
935    }
936
937    #[tokio::test(flavor = "multi_thread")]
938    async fn test_wait_log_count_target_reached_in_range() -> Result<(), anyhow::Error> {
939        let mock_provider = Arc::new(MockNode::new());
940        let mock_node = NetworkNode::new(
941            "node1",
942            "ws_uri",
943            "prometheus_uri",
944            "multiaddr",
945            NodeSpec::default(),
946            mock_provider.clone(),
947        );
948
949        mock_provider.logs_push(vec!["system booting", "stub line 1", "stub line 2"]);
950
951        // Wait until timeout and make sure pattern occurrence count is in range between 2 and 5
952        let options = LogLineCountOptions {
953            predicate: Arc::new(|n| (2..=5).contains(&n)),
954            timeout: Duration::from_secs(2),
955            wait_until_timeout_elapses: true,
956        };
957
958        let task = tokio::spawn({
959            async move {
960                mock_node
961                    .wait_log_line_count_with_timeout("system ready", false, options)
962                    .await
963                    .unwrap()
964            }
965        });
966
967        tokio::time::sleep(Duration::from_secs(1)).await;
968
969        mock_provider.logs_push(vec!["system ready", "system ready", "system ready"]);
970
971        assert!(task.await?.success());
972
973        Ok(())
974    }
975
976    #[tokio::test(flavor = "multi_thread")]
977    async fn test_wait_log_count_with_timeout_with_lookahead_regex() -> Result<(), anyhow::Error> {
978        let mock_provider = Arc::new(MockNode::new());
979        let mock_node = NetworkNode::new(
980            "node1",
981            "ws_uri",
982            "prometheus_uri",
983            "multiaddr",
984            NodeSpec::default(),
985            mock_provider.clone(),
986        );
987
988        mock_provider.logs_push(vec![
989            "system booting",
990            "stub line 1",
991            // this line should not match
992            "Error importing block 0xfd66e545c446b1c01205503130b816af0ec2c0e504a8472808e6ff4a644ce1fa: block has an unknown parent",
993            "stub line 2"
994        ]);
995
996        let options = LogLineCountOptions {
997            predicate: Arc::new(|n| n == 1),
998            timeout: Duration::from_secs(3),
999            wait_until_timeout_elapses: true,
1000        };
1001
1002        let task = tokio::spawn({
1003            async move {
1004                mock_node
1005                    .wait_log_line_count_with_timeout(
1006                        "error(?! importing block .*: block has an unknown parent)",
1007                        false,
1008                        options,
1009                    )
1010                    .await
1011                    .unwrap()
1012            }
1013        });
1014
1015        tokio::time::sleep(Duration::from_secs(1)).await;
1016
1017        mock_provider.logs_push(vec![
1018            "system ready",
1019            // this line should match
1020            "system error",
1021            "system ready",
1022        ]);
1023
1024        assert!(task.await?.success());
1025
1026        Ok(())
1027    }
1028
1029    #[tokio::test(flavor = "multi_thread")]
1030    async fn test_wait_log_count_with_timeout_with_lookahead_regex_fails(
1031    ) -> Result<(), anyhow::Error> {
1032        let mock_provider = Arc::new(MockNode::new());
1033        let mock_node = NetworkNode::new(
1034            "node1",
1035            "ws_uri",
1036            "prometheus_uri",
1037            "multiaddr",
1038            NodeSpec::default(),
1039            mock_provider.clone(),
1040        );
1041
1042        mock_provider.logs_push(vec![
1043            "system booting",
1044            "stub line 1",
1045            // this line should not match
1046            "Error importing block 0xfd66e545c446b1c01205503130b816af0ec2c0e504a8472808e6ff4a644ce1fa: block has an unknown parent",
1047            "stub line 2"
1048        ]);
1049
1050        let options = LogLineCountOptions {
1051            predicate: Arc::new(|n| n == 1),
1052            timeout: Duration::from_secs(6),
1053            wait_until_timeout_elapses: true,
1054        };
1055
1056        let task = tokio::spawn({
1057            async move {
1058                mock_node
1059                    .wait_log_line_count_with_timeout(
1060                        "error(?! importing block .*: block has an unknown parent)",
1061                        false,
1062                        options,
1063                    )
1064                    .await
1065                    .unwrap()
1066            }
1067        });
1068
1069        tokio::time::sleep(Duration::from_secs(1)).await;
1070
1071        mock_provider.logs_push(vec!["system ready", "system ready"]);
1072
1073        assert!(!task.await?.success());
1074
1075        Ok(())
1076    }
1077
1078    #[tokio::test(flavor = "multi_thread")]
1079    async fn test_wait_log_count_with_lockahead_regex() -> Result<(), anyhow::Error> {
1080        let mock_provider = Arc::new(MockNode::new());
1081        let mock_node = NetworkNode::new(
1082            "node1",
1083            "ws_uri",
1084            "prometheus_uri",
1085            "multiaddr",
1086            NodeSpec::default(),
1087            mock_provider.clone(),
1088        );
1089
1090        mock_provider.logs_push(vec![
1091            "system booting",
1092            "stub line 1",
1093            // this line should not match
1094            "Error importing block 0xfd66e545c446b1c01205503130b816af0ec2c0e504a8472808e6ff4a644ce1fa: block has an unknown parent",
1095            "stub line 2"
1096        ]);
1097
1098        let task = tokio::spawn({
1099            async move {
1100                mock_node
1101                    .wait_log_line_count(
1102                        "error(?! importing block .*: block has an unknown parent)",
1103                        false,
1104                        1,
1105                    )
1106                    .await
1107                    .unwrap()
1108            }
1109        });
1110
1111        tokio::time::sleep(Duration::from_secs(1)).await;
1112
1113        mock_provider.logs_push(vec![
1114            "system ready",
1115            // this line should match
1116            "system error",
1117            "system ready",
1118        ]);
1119
1120        assert!(task.await.is_ok());
1121
1122        Ok(())
1123    }
1124
1125    #[tokio::test(flavor = "multi_thread")]
1126    async fn test_wait_log_count_with_lookahead_regex_fails() -> Result<(), anyhow::Error> {
1127        let mock_provider = Arc::new(MockNode::new());
1128        let mock_node = NetworkNode::new(
1129            "node1",
1130            "ws_uri",
1131            "prometheus_uri",
1132            "multiaddr",
1133            NodeSpec::default(),
1134            mock_provider.clone(),
1135        );
1136
1137        mock_provider.logs_push(vec![
1138            "system booting",
1139            "stub line 1",
1140            // this line should not match
1141            "Error importing block 0xfd66e545c446b1c01205503130b816af0ec2c0e504a8472808e6ff4a644ce1fa: block has an unknown parent",
1142            "stub line 2"
1143        ]);
1144
1145        let options = LogLineCountOptions {
1146            predicate: Arc::new(|count| count == 1),
1147            timeout: Duration::from_secs(2),
1148            wait_until_timeout_elapses: true,
1149        };
1150
1151        let task = tokio::spawn({
1152            async move {
1153                // we expect no match, thus wait with timeout
1154                mock_node
1155                    .wait_log_line_count_with_timeout(
1156                        "error(?! importing block .*: block has an unknown parent)",
1157                        false,
1158                        options,
1159                    )
1160                    .await
1161                    .unwrap()
1162            }
1163        });
1164
1165        tokio::time::sleep(Duration::from_secs(1)).await;
1166
1167        mock_provider.logs_push(vec!["system ready", "system ready"]);
1168
1169        assert!(!task.await?.success());
1170
1171        Ok(())
1172    }
1173}