1use std::{
2 sync::{
3 atomic::{AtomicBool, Ordering},
4 Arc,
5 },
6 time::Duration,
7};
8
9use anyhow::anyhow;
10use fancy_regex::Regex;
11use glob_match::glob_match;
12use prom_metrics_parser::MetricMap;
13use provider::DynNode;
14use serde::{Deserialize, Serialize, Serializer};
15use subxt::{backend::rpc::RpcClient, OnlineClient};
16use support::net::{skip_err_while_waiting, wait_ws_ready};
17use thiserror::Error;
18use tokio::sync::RwLock;
19use tracing::{debug, trace};
20
21use crate::{network_spec::node::NodeSpec, tx_helper::client::get_client_from_url};
22
23type BoxedClosure = Box<dyn Fn(&str) -> Result<bool, anyhow::Error> + Send + Sync>;
24
25#[derive(Error, Debug)]
26pub enum NetworkNodeError {
27 #[error("metric '{0}' not found!")]
28 MetricNotFound(String),
29}
30
31#[derive(Clone, Serialize)]
32pub struct NetworkNode {
33 #[serde(serialize_with = "serialize_provider_node")]
34 pub(crate) inner: DynNode,
35 pub(crate) spec: NodeSpec,
38 pub(crate) name: String,
39 pub(crate) ws_uri: String,
40 pub(crate) multiaddr: String,
41 pub(crate) prometheus_uri: String,
42 #[serde(skip)]
43 metrics_cache: Arc<RwLock<MetricMap>>,
44 #[serde(skip)]
45 is_running: Arc<AtomicBool>,
46}
47
48#[derive(Deserialize)]
49pub(crate) struct RawNetworkNode {
50 pub(crate) name: String,
51 pub(crate) ws_uri: String,
52 pub(crate) prometheus_uri: String,
53 pub(crate) multiaddr: String,
54 pub(crate) spec: NodeSpec,
55 pub(crate) inner: serde_json::Value,
56}
57
58#[derive(Debug, Clone, Copy, PartialEq, Eq)]
68pub enum LogLineCount {
69 TargetReached(u32),
70 TargetFailed(u32),
71}
72
73impl LogLineCount {
74 pub fn success(&self) -> bool {
75 match self {
76 Self::TargetReached(..) => true,
77 Self::TargetFailed(..) => false,
78 }
79 }
80}
81
82#[derive(Clone)]
95pub struct LogLineCountOptions {
96 pub predicate: Arc<dyn Fn(u32) -> bool + Send + Sync>,
97 pub timeout: Duration,
98 pub wait_until_timeout_elapses: bool,
99}
100
101impl LogLineCountOptions {
102 pub fn new(
103 predicate: impl Fn(u32) -> bool + 'static + Send + Sync,
104 timeout: Duration,
105 wait_until_timeout_elapses: bool,
106 ) -> Self {
107 Self {
108 predicate: Arc::new(predicate),
109 timeout,
110 wait_until_timeout_elapses,
111 }
112 }
113
114 pub fn no_occurences_within_timeout(timeout: Duration) -> Self {
115 Self::new(|n| n == 0, timeout, true)
116 }
117}
118
119impl NetworkNode {
132 pub(crate) fn new<T: Into<String>>(
134 name: T,
135 ws_uri: T,
136 prometheus_uri: T,
137 multiaddr: T,
138 spec: NodeSpec,
139 inner: DynNode,
140 ) -> Self {
141 Self {
142 name: name.into(),
143 ws_uri: ws_uri.into(),
144 prometheus_uri: prometheus_uri.into(),
145 inner,
146 spec,
147 multiaddr: multiaddr.into(),
148 metrics_cache: Arc::new(Default::default()),
149 is_running: Arc::new(AtomicBool::new(false)),
150 }
151 }
152
153 pub(crate) fn is_running(&self) -> bool {
154 self.is_running.load(Ordering::Acquire)
155 }
156
157 pub(crate) fn set_is_running(&self, is_running: bool) {
158 self.is_running.store(is_running, Ordering::Release);
159 }
160
161 pub(crate) fn set_multiaddr(&mut self, multiaddr: impl Into<String>) {
162 self.multiaddr = multiaddr.into();
163 }
164
165 pub fn name(&self) -> &str {
166 &self.name
167 }
168
169 pub fn args(&self) -> Vec<&str> {
170 self.inner.args()
171 }
172
173 pub fn spec(&self) -> &NodeSpec {
174 &self.spec
175 }
176
177 pub fn ws_uri(&self) -> &str {
178 &self.ws_uri
179 }
180
181 pub fn multiaddr(&self) -> &str {
182 self.multiaddr.as_ref()
183 }
184
185 pub async fn rpc(&self) -> Result<RpcClient, subxt::Error> {
189 get_client_from_url(&self.ws_uri).await
190 }
191
192 #[deprecated = "Use `wait_client` instead."]
194 pub async fn client<Config: subxt::Config>(
195 &self,
196 ) -> Result<OnlineClient<Config>, subxt::Error> {
197 self.try_client().await
198 }
199
200 pub async fn try_client<Config: subxt::Config>(
209 &self,
210 ) -> Result<OnlineClient<Config>, subxt::Error> {
211 get_client_from_url(&self.ws_uri).await
212 }
213
214 pub async fn wait_client<Config: subxt::Config>(
216 &self,
217 ) -> Result<OnlineClient<Config>, anyhow::Error> {
218 debug!("wait_client ws_uri: {}", self.ws_uri());
219 wait_ws_ready(self.ws_uri())
220 .await
221 .map_err(|e| anyhow!("Error awaiting http_client to ws be ready, err: {e}"))?;
222
223 self.try_client()
224 .await
225 .map_err(|e| anyhow!("Can't create a subxt client, err: {e}"))
226 }
227
228 pub async fn wait_client_with_timeout<Config: subxt::Config>(
230 &self,
231 timeout_secs: impl Into<u64>,
232 ) -> Result<OnlineClient<Config>, anyhow::Error> {
233 debug!("waiting until subxt client is ready");
234 tokio::time::timeout(
235 Duration::from_secs(timeout_secs.into()),
236 self.wait_client::<Config>(),
237 )
238 .await?
239 }
240
241 pub async fn pause(&self) -> Result<(), anyhow::Error> {
248 self.set_is_running(false);
249 self.inner.pause().await?;
250 Ok(())
251 }
252
253 pub async fn resume(&self) -> Result<(), anyhow::Error> {
258 self.set_is_running(true);
259 self.inner.resume().await?;
260 Ok(())
261 }
262
263 pub async fn restart(&self, after: Option<Duration>) -> Result<(), anyhow::Error> {
267 self.set_is_running(false);
268 self.inner.restart(after).await?;
269 self.set_is_running(true);
270 Ok(())
271 }
272
273 pub async fn reports(&self, metric_name: impl Into<String>) -> Result<f64, anyhow::Error> {
281 let metric_name = metric_name.into();
282 self.fetch_metrics().await?;
284 self.metric(&metric_name, true).await
286 }
287
288 pub async fn assert(
297 &self,
298 metric_name: impl Into<String>,
299 value: impl Into<f64>,
300 ) -> Result<bool, anyhow::Error> {
301 let value: f64 = value.into();
302 self.assert_with(metric_name, |v| v == value).await
303 }
304
305 pub async fn assert_with(
308 &self,
309 metric_name: impl Into<String>,
310 predicate: impl Fn(f64) -> bool,
311 ) -> Result<bool, anyhow::Error> {
312 let metric_name = metric_name.into();
313 self.fetch_metrics().await?;
315 let val = self.metric(&metric_name, true).await?;
316 trace!("🔎 Current value {val} passed to the predicated?");
317 Ok(predicate(val))
318 }
319
320 pub async fn wait_metric(
324 &self,
325 metric_name: impl Into<String>,
326 predicate: impl Fn(f64) -> bool,
327 ) -> Result<(), anyhow::Error> {
328 let metric_name = metric_name.into();
329 debug!("waiting until metric {metric_name} pass the predicate");
330 loop {
331 let res = self.assert_with(&metric_name, &predicate).await;
332 match res {
333 Ok(res) => {
334 if res {
335 return Ok(());
336 }
337 },
338 Err(e) => match e.downcast::<reqwest::Error>() {
339 Ok(io_err) => {
340 if !skip_err_while_waiting(&io_err) {
341 return Err(io_err.into());
342 }
343 },
344 Err(other) => {
345 match other.downcast::<NetworkNodeError>() {
346 Ok(node_err) => {
347 if !matches!(node_err, NetworkNodeError::MetricNotFound(_)) {
348 return Err(node_err.into());
349 }
350 },
351 Err(other) => return Err(other),
352 };
353 },
354 },
355 }
356
357 tokio::time::sleep(Duration::from_secs(1)).await;
359 }
360 }
361
362 pub async fn wait_metric_with_timeout(
365 &self,
366 metric_name: impl Into<String>,
367 predicate: impl Fn(f64) -> bool,
368 timeout_secs: impl Into<u64>,
369 ) -> Result<(), anyhow::Error> {
370 let metric_name = metric_name.into();
371 let secs = timeout_secs.into();
372 debug!("waiting until metric {metric_name} pass the predicate");
373 let res = tokio::time::timeout(
374 Duration::from_secs(secs),
375 self.wait_metric(&metric_name, predicate),
376 )
377 .await;
378
379 if let Ok(inner_res) = res {
380 match inner_res {
381 Ok(_) => Ok(()),
382 Err(e) => Err(anyhow!("Error waiting for metric: {e}")),
383 }
384 } else {
385 Err(anyhow!(
387 "Timeout ({secs}), waiting for metric {metric_name} pass the predicate"
388 ))
389 }
390 }
391
392 pub async fn logs(&self) -> Result<String, anyhow::Error> {
397 Ok(self.inner.logs().await?)
398 }
399
400 pub async fn wait_log_line_count(
402 &self,
403 pattern: impl Into<String>,
404 is_glob: bool,
405 count: usize,
406 ) -> Result<(), anyhow::Error> {
407 let pattern = pattern.into();
408 let pattern_clone = pattern.clone();
409 debug!("waiting until we find pattern {pattern} {count} times");
410 let match_fn: BoxedClosure = if is_glob {
411 Box::new(move |line: &str| Ok(glob_match(&pattern, line)))
412 } else {
413 let re = Regex::new(&pattern)?;
414 Box::new(move |line: &str| re.is_match(line).map_err(|e| anyhow!(e.to_string())))
415 };
416
417 loop {
418 let mut q = 0_usize;
419 let logs = self.logs().await?;
420 for line in logs.lines() {
421 trace!("line is {line}");
422 if match_fn(line)? {
423 trace!("pattern {pattern_clone} match in line {line}");
424 q += 1;
425 if q >= count {
426 return Ok(());
427 }
428 }
429 }
430
431 tokio::time::sleep(Duration::from_secs(2)).await;
432 }
433 }
434
435 pub async fn wait_log_line_count_with_timeout(
479 &self,
480 substring: impl Into<String>,
481 is_glob: bool,
482 options: LogLineCountOptions,
483 ) -> Result<LogLineCount, anyhow::Error> {
484 let substring = substring.into();
485 debug!(
486 "waiting until match lines count within {} seconds",
487 options.timeout.as_secs_f64()
488 );
489
490 let start = tokio::time::Instant::now();
491
492 let match_fn: BoxedClosure = if is_glob {
493 Box::new(move |line: &str| Ok(glob_match(&substring, line)))
494 } else {
495 let re = Regex::new(&substring)?;
496 Box::new(move |line: &str| re.is_match(line).map_err(|e| anyhow!(e.to_string())))
497 };
498
499 if options.wait_until_timeout_elapses {
500 tokio::time::sleep(options.timeout).await;
501 }
502
503 let mut q;
504 loop {
505 q = 0_u32;
506 let logs = self.logs().await?;
507 for line in logs.lines() {
508 if match_fn(line)? {
509 q += 1;
510
511 if !options.wait_until_timeout_elapses && (options.predicate)(q) {
516 return Ok(LogLineCount::TargetReached(q));
517 }
518 }
519 }
520
521 if start.elapsed() >= options.timeout {
522 break;
523 }
524
525 tokio::time::sleep(Duration::from_secs(2)).await;
526 }
527
528 if (options.predicate)(q) {
529 Ok(LogLineCount::TargetReached(q))
530 } else {
531 Ok(LogLineCount::TargetFailed(q))
532 }
533 }
534
535 async fn fetch_metrics(&self) -> Result<(), anyhow::Error> {
536 let response = reqwest::get(&self.prometheus_uri).await?;
537 let metrics = prom_metrics_parser::parse(&response.text().await?)?;
538 let mut cache = self.metrics_cache.write().await;
539 *cache = metrics;
540 Ok(())
541 }
542
543 async fn metric(
545 &self,
546 metric_name: &str,
547 treat_not_found_as_zero: bool,
548 ) -> Result<f64, anyhow::Error> {
549 let mut metrics_map = self.metrics_cache.read().await;
550 if metrics_map.is_empty() {
551 drop(metrics_map);
553 self.fetch_metrics().await?;
554 metrics_map = self.metrics_cache.read().await;
555 }
556
557 if let Some(val) = metrics_map.get(metric_name) {
558 Ok(*val)
559 } else if treat_not_found_as_zero {
560 Ok(0_f64)
561 } else {
562 Err(NetworkNodeError::MetricNotFound(metric_name.into()).into())
563 }
564 }
565
566 pub async fn wait_until_is_up(
578 &self,
579 timeout_secs: impl Into<u64>,
580 ) -> Result<(), anyhow::Error> {
581 self.wait_metric_with_timeout("process_start_time_seconds", |b| b >= 1.0, timeout_secs)
582 .await
583 .map_err(|err| anyhow::anyhow!("{}: {:?}", self.name(), err))
584 }
585}
586
587impl std::fmt::Debug for NetworkNode {
588 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
589 f.debug_struct("NetworkNode")
590 .field("inner", &"inner_skipped")
591 .field("spec", &self.spec)
592 .field("name", &self.name)
593 .field("ws_uri", &self.ws_uri)
594 .field("prometheus_uri", &self.prometheus_uri)
595 .finish()
596 }
597}
598
599fn serialize_provider_node<S>(node: &DynNode, serializer: S) -> Result<S::Ok, S::Error>
600where
601 S: Serializer,
602{
603 erased_serde::serialize(node.as_ref(), serializer)
604}
605
606#[cfg(test)]
608mod tests {
609 use std::{
610 path::{Path, PathBuf},
611 sync::{Arc, Mutex},
612 };
613
614 use async_trait::async_trait;
615 use provider::{types::*, ProviderError, ProviderNode};
616
617 use super::*;
618
619 #[derive(Serialize)]
620 struct MockNode {
621 logs: Arc<Mutex<Vec<String>>>,
622 }
623
624 impl MockNode {
625 fn new() -> Self {
626 Self {
627 logs: Arc::new(Mutex::new(vec![])),
628 }
629 }
630
631 fn logs_push(&self, lines: Vec<impl Into<String>>) {
632 self.logs
633 .lock()
634 .unwrap()
635 .extend(lines.into_iter().map(|l| l.into()));
636 }
637 }
638
639 #[async_trait]
640 impl ProviderNode for MockNode {
641 fn name(&self) -> &str {
642 todo!()
643 }
644
645 fn args(&self) -> Vec<&str> {
646 todo!()
647 }
648
649 fn base_dir(&self) -> &PathBuf {
650 todo!()
651 }
652
653 fn config_dir(&self) -> &PathBuf {
654 todo!()
655 }
656
657 fn data_dir(&self) -> &PathBuf {
658 todo!()
659 }
660
661 fn relay_data_dir(&self) -> &PathBuf {
662 todo!()
663 }
664
665 fn scripts_dir(&self) -> &PathBuf {
666 todo!()
667 }
668
669 fn log_path(&self) -> &PathBuf {
670 todo!()
671 }
672
673 fn log_cmd(&self) -> String {
674 todo!()
675 }
676
677 fn path_in_node(&self, _file: &Path) -> PathBuf {
678 todo!()
679 }
680
681 async fn logs(&self) -> Result<String, ProviderError> {
682 Ok(self.logs.lock().unwrap().join("\n"))
683 }
684
685 async fn dump_logs(&self, _local_dest: PathBuf) -> Result<(), ProviderError> {
686 todo!()
687 }
688
689 async fn run_command(
690 &self,
691 _options: RunCommandOptions,
692 ) -> Result<ExecutionResult, ProviderError> {
693 todo!()
694 }
695
696 async fn run_script(
697 &self,
698 _options: RunScriptOptions,
699 ) -> Result<ExecutionResult, ProviderError> {
700 todo!()
701 }
702
703 async fn send_file(
704 &self,
705 _local_file_path: &Path,
706 _remote_file_path: &Path,
707 _mode: &str,
708 ) -> Result<(), ProviderError> {
709 todo!()
710 }
711
712 async fn receive_file(
713 &self,
714 _remote_file_path: &Path,
715 _local_file_path: &Path,
716 ) -> Result<(), ProviderError> {
717 todo!()
718 }
719
720 async fn pause(&self) -> Result<(), ProviderError> {
721 todo!()
722 }
723
724 async fn resume(&self) -> Result<(), ProviderError> {
725 todo!()
726 }
727
728 async fn restart(&self, _after: Option<Duration>) -> Result<(), ProviderError> {
729 todo!()
730 }
731
732 async fn destroy(&self) -> Result<(), ProviderError> {
733 todo!()
734 }
735 }
736
737 #[tokio::test(flavor = "multi_thread")]
738 async fn test_wait_log_count_target_reached_immediately() -> Result<(), anyhow::Error> {
739 let mock_provider = Arc::new(MockNode::new());
740 let mock_node = NetworkNode::new(
741 "node1",
742 "ws_uri",
743 "prometheus_uri",
744 "multiaddr",
745 NodeSpec::default(),
746 mock_provider.clone(),
747 );
748
749 mock_provider.logs_push(vec![
750 "system booting",
751 "stub line 1",
752 "stub line 2",
753 "system ready",
754 ]);
755
756 let options = LogLineCountOptions {
758 predicate: Arc::new(|n| n == 1),
759 timeout: Duration::from_secs(10),
760 wait_until_timeout_elapses: false,
761 };
762
763 let log_line_count = mock_node
764 .wait_log_line_count_with_timeout("system ready", false, options)
765 .await?;
766
767 assert!(matches!(log_line_count, LogLineCount::TargetReached(1)));
768
769 Ok(())
770 }
771
772 #[tokio::test(flavor = "multi_thread")]
773 async fn test_wait_log_count_target_reached_after_delay() -> Result<(), anyhow::Error> {
774 let mock_provider = Arc::new(MockNode::new());
775 let mock_node = NetworkNode::new(
776 "node1",
777 "ws_uri",
778 "prometheus_uri",
779 "multiaddr",
780 NodeSpec::default(),
781 mock_provider.clone(),
782 );
783
784 mock_provider.logs_push(vec![
785 "system booting",
786 "stub line 1",
787 "stub line 2",
788 "system ready",
789 ]);
790
791 let options = LogLineCountOptions {
793 predicate: Arc::new(|n| n == 2),
794 timeout: Duration::from_secs(4),
795 wait_until_timeout_elapses: false,
796 };
797
798 let task = tokio::spawn({
799 async move {
800 mock_node
801 .wait_log_line_count_with_timeout("system ready", false, options)
802 .await
803 .unwrap()
804 }
805 });
806
807 tokio::time::sleep(Duration::from_secs(2)).await;
808
809 mock_provider.logs_push(vec!["system ready"]);
810
811 let log_line_count = task.await?;
812
813 assert!(matches!(log_line_count, LogLineCount::TargetReached(2)));
814
815 Ok(())
816 }
817
818 #[tokio::test(flavor = "multi_thread")]
819 async fn test_wait_log_count_target_failed_timeout() -> Result<(), anyhow::Error> {
820 let mock_provider = Arc::new(MockNode::new());
821 let mock_node = NetworkNode::new(
822 "node1",
823 "ws_uri",
824 "prometheus_uri",
825 "multiaddr",
826 NodeSpec::default(),
827 mock_provider.clone(),
828 );
829
830 mock_provider.logs_push(vec![
831 "system booting",
832 "stub line 1",
833 "stub line 2",
834 "system ready",
835 ]);
836
837 let options = LogLineCountOptions {
839 predicate: Arc::new(|n| n == 2),
840 timeout: Duration::from_secs(2),
841 wait_until_timeout_elapses: false,
842 };
843
844 let log_line_count = mock_node
845 .wait_log_line_count_with_timeout("system ready", false, options)
846 .await?;
847
848 assert!(matches!(log_line_count, LogLineCount::TargetFailed(1)));
849
850 Ok(())
851 }
852
853 #[tokio::test(flavor = "multi_thread")]
854 async fn test_wait_log_count_target_failed_exceeded() -> Result<(), anyhow::Error> {
855 let mock_provider = Arc::new(MockNode::new());
856 let mock_node = NetworkNode::new(
857 "node1",
858 "ws_uri",
859 "prometheus_uri",
860 "multiaddr",
861 NodeSpec::default(),
862 mock_provider.clone(),
863 );
864
865 mock_provider.logs_push(vec![
866 "system booting",
867 "stub line 1",
868 "stub line 2",
869 "system ready",
870 ]);
871
872 let options = LogLineCountOptions {
874 predicate: Arc::new(|n| n == 2),
875 timeout: Duration::from_secs(2),
876 wait_until_timeout_elapses: true,
877 };
878
879 let task = tokio::spawn({
880 async move {
881 mock_node
882 .wait_log_line_count_with_timeout("system ready", false, options)
883 .await
884 .unwrap()
885 }
886 });
887
888 tokio::time::sleep(Duration::from_secs(1)).await;
889
890 mock_provider.logs_push(vec!["system ready"]);
891 mock_provider.logs_push(vec!["system ready"]);
892
893 let log_line_count = task.await?;
894
895 assert!(matches!(log_line_count, LogLineCount::TargetFailed(3)));
896
897 Ok(())
898 }
899
900 #[tokio::test(flavor = "multi_thread")]
901 async fn test_wait_log_count_target_reached_no_occurences() -> Result<(), anyhow::Error> {
902 let mock_provider = Arc::new(MockNode::new());
903 let mock_node = NetworkNode::new(
904 "node1",
905 "ws_uri",
906 "prometheus_uri",
907 "multiaddr",
908 NodeSpec::default(),
909 mock_provider.clone(),
910 );
911
912 mock_provider.logs_push(vec!["system booting", "stub line 1", "stub line 2"]);
913
914 let task = tokio::spawn({
915 async move {
916 mock_node
917 .wait_log_line_count_with_timeout(
918 "system ready",
919 false,
920 LogLineCountOptions::no_occurences_within_timeout(Duration::from_secs(2)),
922 )
923 .await
924 .unwrap()
925 }
926 });
927
928 tokio::time::sleep(Duration::from_secs(1)).await;
929
930 mock_provider.logs_push(vec!["stub line 3"]);
931
932 assert!(task.await?.success());
933
934 Ok(())
935 }
936
937 #[tokio::test(flavor = "multi_thread")]
938 async fn test_wait_log_count_target_reached_in_range() -> Result<(), anyhow::Error> {
939 let mock_provider = Arc::new(MockNode::new());
940 let mock_node = NetworkNode::new(
941 "node1",
942 "ws_uri",
943 "prometheus_uri",
944 "multiaddr",
945 NodeSpec::default(),
946 mock_provider.clone(),
947 );
948
949 mock_provider.logs_push(vec!["system booting", "stub line 1", "stub line 2"]);
950
951 let options = LogLineCountOptions {
953 predicate: Arc::new(|n| (2..=5).contains(&n)),
954 timeout: Duration::from_secs(2),
955 wait_until_timeout_elapses: true,
956 };
957
958 let task = tokio::spawn({
959 async move {
960 mock_node
961 .wait_log_line_count_with_timeout("system ready", false, options)
962 .await
963 .unwrap()
964 }
965 });
966
967 tokio::time::sleep(Duration::from_secs(1)).await;
968
969 mock_provider.logs_push(vec!["system ready", "system ready", "system ready"]);
970
971 assert!(task.await?.success());
972
973 Ok(())
974 }
975
976 #[tokio::test(flavor = "multi_thread")]
977 async fn test_wait_log_count_with_timeout_with_lookahead_regex() -> Result<(), anyhow::Error> {
978 let mock_provider = Arc::new(MockNode::new());
979 let mock_node = NetworkNode::new(
980 "node1",
981 "ws_uri",
982 "prometheus_uri",
983 "multiaddr",
984 NodeSpec::default(),
985 mock_provider.clone(),
986 );
987
988 mock_provider.logs_push(vec![
989 "system booting",
990 "stub line 1",
991 "Error importing block 0xfd66e545c446b1c01205503130b816af0ec2c0e504a8472808e6ff4a644ce1fa: block has an unknown parent",
993 "stub line 2"
994 ]);
995
996 let options = LogLineCountOptions {
997 predicate: Arc::new(|n| n == 1),
998 timeout: Duration::from_secs(3),
999 wait_until_timeout_elapses: true,
1000 };
1001
1002 let task = tokio::spawn({
1003 async move {
1004 mock_node
1005 .wait_log_line_count_with_timeout(
1006 "error(?! importing block .*: block has an unknown parent)",
1007 false,
1008 options,
1009 )
1010 .await
1011 .unwrap()
1012 }
1013 });
1014
1015 tokio::time::sleep(Duration::from_secs(1)).await;
1016
1017 mock_provider.logs_push(vec![
1018 "system ready",
1019 "system error",
1021 "system ready",
1022 ]);
1023
1024 assert!(task.await?.success());
1025
1026 Ok(())
1027 }
1028
1029 #[tokio::test(flavor = "multi_thread")]
1030 async fn test_wait_log_count_with_timeout_with_lookahead_regex_fails(
1031 ) -> Result<(), anyhow::Error> {
1032 let mock_provider = Arc::new(MockNode::new());
1033 let mock_node = NetworkNode::new(
1034 "node1",
1035 "ws_uri",
1036 "prometheus_uri",
1037 "multiaddr",
1038 NodeSpec::default(),
1039 mock_provider.clone(),
1040 );
1041
1042 mock_provider.logs_push(vec![
1043 "system booting",
1044 "stub line 1",
1045 "Error importing block 0xfd66e545c446b1c01205503130b816af0ec2c0e504a8472808e6ff4a644ce1fa: block has an unknown parent",
1047 "stub line 2"
1048 ]);
1049
1050 let options = LogLineCountOptions {
1051 predicate: Arc::new(|n| n == 1),
1052 timeout: Duration::from_secs(6),
1053 wait_until_timeout_elapses: true,
1054 };
1055
1056 let task = tokio::spawn({
1057 async move {
1058 mock_node
1059 .wait_log_line_count_with_timeout(
1060 "error(?! importing block .*: block has an unknown parent)",
1061 false,
1062 options,
1063 )
1064 .await
1065 .unwrap()
1066 }
1067 });
1068
1069 tokio::time::sleep(Duration::from_secs(1)).await;
1070
1071 mock_provider.logs_push(vec!["system ready", "system ready"]);
1072
1073 assert!(!task.await?.success());
1074
1075 Ok(())
1076 }
1077
1078 #[tokio::test(flavor = "multi_thread")]
1079 async fn test_wait_log_count_with_lockahead_regex() -> Result<(), anyhow::Error> {
1080 let mock_provider = Arc::new(MockNode::new());
1081 let mock_node = NetworkNode::new(
1082 "node1",
1083 "ws_uri",
1084 "prometheus_uri",
1085 "multiaddr",
1086 NodeSpec::default(),
1087 mock_provider.clone(),
1088 );
1089
1090 mock_provider.logs_push(vec![
1091 "system booting",
1092 "stub line 1",
1093 "Error importing block 0xfd66e545c446b1c01205503130b816af0ec2c0e504a8472808e6ff4a644ce1fa: block has an unknown parent",
1095 "stub line 2"
1096 ]);
1097
1098 let task = tokio::spawn({
1099 async move {
1100 mock_node
1101 .wait_log_line_count(
1102 "error(?! importing block .*: block has an unknown parent)",
1103 false,
1104 1,
1105 )
1106 .await
1107 .unwrap()
1108 }
1109 });
1110
1111 tokio::time::sleep(Duration::from_secs(1)).await;
1112
1113 mock_provider.logs_push(vec![
1114 "system ready",
1115 "system error",
1117 "system ready",
1118 ]);
1119
1120 assert!(task.await.is_ok());
1121
1122 Ok(())
1123 }
1124
1125 #[tokio::test(flavor = "multi_thread")]
1126 async fn test_wait_log_count_with_lookahead_regex_fails() -> Result<(), anyhow::Error> {
1127 let mock_provider = Arc::new(MockNode::new());
1128 let mock_node = NetworkNode::new(
1129 "node1",
1130 "ws_uri",
1131 "prometheus_uri",
1132 "multiaddr",
1133 NodeSpec::default(),
1134 mock_provider.clone(),
1135 );
1136
1137 mock_provider.logs_push(vec![
1138 "system booting",
1139 "stub line 1",
1140 "Error importing block 0xfd66e545c446b1c01205503130b816af0ec2c0e504a8472808e6ff4a644ce1fa: block has an unknown parent",
1142 "stub line 2"
1143 ]);
1144
1145 let options = LogLineCountOptions {
1146 predicate: Arc::new(|count| count == 1),
1147 timeout: Duration::from_secs(2),
1148 wait_until_timeout_elapses: true,
1149 };
1150
1151 let task = tokio::spawn({
1152 async move {
1153 mock_node
1155 .wait_log_line_count_with_timeout(
1156 "error(?! importing block .*: block has an unknown parent)",
1157 false,
1158 options,
1159 )
1160 .await
1161 .unwrap()
1162 }
1163 });
1164
1165 tokio::time::sleep(Duration::from_secs(1)).await;
1166
1167 mock_provider.logs_push(vec!["system ready", "system ready"]);
1168
1169 assert!(!task.await?.success());
1170
1171 Ok(())
1172 }
1173}