1use std::{
2 collections::{HashMap, HashSet},
3 sync::{
4 atomic::{AtomicBool, Ordering},
5 Arc,
6 },
7 time::Duration,
8};
9
10use anyhow::anyhow;
11use fancy_regex::Regex;
12use glob_match::glob_match;
13use prom_metrics_parser::MetricMap;
14use provider::{
15 types::{ExecutionResult, RunScriptOptions},
16 DynNode,
17};
18use serde::{Deserialize, Serialize, Serializer};
19use subxt::{backend::rpc::RpcClient, OnlineClient};
20use support::net::{skip_err_while_waiting, wait_ws_ready};
21use thiserror::Error;
22use tokio::sync::RwLock;
23use tracing::{debug, trace, warn};
24
25use crate::{network_spec::node::NodeSpec, tx_helper::client::get_client_from_url};
26
/// Boxed line matcher used by the log-waiting helpers: takes one log line and
/// returns whether it matches, or an error if the matcher itself failed.
type BoxedClosure = Box<dyn Fn(&str) -> Result<bool, anyhow::Error> + Send + Sync>;
28
/// Errors raised by [`NetworkNode`] itself (as opposed to provider or transport errors).
#[derive(Error, Debug)]
pub enum NetworkNodeError {
    /// The requested metric name is not present in the metrics map.
    #[error("metric '{0}' not found!")]
    MetricNotFound(String),
}
34
/// A running (or spawned) node of the network together with the endpoints used to
/// talk to it.
///
/// Cloning is cheap for the shared state: the metrics cache and the running flag are
/// behind `Arc`s, so clones observe the same values.
#[derive(Clone, Serialize)]
pub struct NetworkNode {
    /// Provider-level node handle; serialized through `serialize_provider_node`.
    #[serde(serialize_with = "serialize_provider_node")]
    pub(crate) inner: DynNode,
    /// Spec this node was created from.
    pub(crate) spec: NodeSpec,
    /// Node name.
    pub(crate) name: String,
    /// WebSocket URI used to build RPC / subxt clients.
    pub(crate) ws_uri: String,
    /// Multiaddress of the node.
    pub(crate) multiaddr: String,
    /// URI of the Prometheus endpoint that metrics are fetched from.
    pub(crate) prometheus_uri: String,
    /// Last parsed metrics; replaced wholesale by `fetch_metrics`. Not serialized.
    #[serde(skip)]
    metrics_cache: Arc<RwLock<MetricMap>>,
    /// Running flag toggled by `pause` / `resume` / `restart`. Not serialized.
    #[serde(skip)]
    is_running: Arc<AtomicBool>,
}
51
/// Deserializable mirror of the serialized [`NetworkNode`] fields, where the provider
/// node (`inner`) is kept as untyped JSON.
// NOTE(review): presumably used to rebuild a `NetworkNode` from a serialized network —
// confirm against the caller.
#[derive(Deserialize)]
pub(crate) struct RawNetworkNode {
    pub(crate) name: String,
    pub(crate) ws_uri: String,
    pub(crate) prometheus_uri: String,
    pub(crate) multiaddr: String,
    pub(crate) spec: NodeSpec,
    /// Raw JSON of the provider node, left uninterpreted here.
    pub(crate) inner: serde_json::Value,
}
61
/// Outcome of counting matching log lines; both variants carry the observed count.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum LogLineCount {
    /// The count satisfied the caller's predicate.
    TargetReached(u32),
    /// The count did not satisfy the caller's predicate.
    TargetFailed(u32),
}

impl LogLineCount {
    /// Returns `true` for [`LogLineCount::TargetReached`], `false` otherwise.
    pub fn success(&self) -> bool {
        matches!(self, Self::TargetReached(..))
    }
}
85
/// Options controlling how matching log lines are counted and judged.
#[derive(Clone)]
pub struct LogLineCountOptions {
    /// Decides whether a given number of matching lines is acceptable.
    pub predicate: Arc<dyn Fn(u32) -> bool + Send + Sync>,
    /// Maximum time to keep looking at the logs.
    pub timeout: Duration,
    /// When `true`, the predicate is only evaluated once the timeout has elapsed;
    /// when `false`, the wait may finish as soon as the predicate holds.
    pub wait_until_timeout_elapses: bool,
}

impl LogLineCountOptions {
    /// Builds options from an arbitrary predicate.
    pub fn new(
        predicate: impl Fn(u32) -> bool + 'static + Send + Sync,
        timeout: Duration,
        wait_until_timeout_elapses: bool,
    ) -> Self {
        let predicate: Arc<dyn Fn(u32) -> bool + Send + Sync> = Arc::new(predicate);
        Self {
            predicate,
            timeout,
            wait_until_timeout_elapses,
        }
    }

    /// Succeeds when no line matched during the whole `timeout`.
    pub fn no_occurences_within_timeout(timeout: Duration) -> Self {
        Self::new(|count| count == 0, timeout, true)
    }

    /// Succeeds as soon as one or more lines matched.
    pub fn at_least_once(timeout: Duration) -> Self {
        Self::new(|count| count > 0, timeout, false)
    }

    /// Succeeds as soon as the count of matching lines is exactly one.
    pub fn exactly_once(timeout: Duration) -> Self {
        Self::new(|count| 1 == count, timeout, false)
    }
}
130
131impl NetworkNode {
144 pub(crate) fn new<T: Into<String>>(
146 name: T,
147 ws_uri: T,
148 prometheus_uri: T,
149 multiaddr: T,
150 spec: NodeSpec,
151 inner: DynNode,
152 ) -> Self {
153 Self {
154 name: name.into(),
155 ws_uri: ws_uri.into(),
156 prometheus_uri: prometheus_uri.into(),
157 inner,
158 spec,
159 multiaddr: multiaddr.into(),
160 metrics_cache: Arc::new(Default::default()),
161 is_running: Arc::new(AtomicBool::new(false)),
162 }
163 }
164
    /// Returns the current value of the running flag.
    ///
    /// This only reflects what `pause` / `resume` / `restart` / `set_is_running`
    /// recorded; it does not probe the node.
    pub fn is_running(&self) -> bool {
        // `Acquire` pairs with the `Release` store in `set_is_running`.
        self.is_running.load(Ordering::Acquire)
    }
171
172 pub async fn is_responsive(&self) -> bool {
179 tokio::time::timeout(Duration::from_secs(2), wait_ws_ready(self.ws_uri()))
180 .await
181 .is_ok()
182 }
183
    /// Records whether the node is considered running.
    pub(crate) fn set_is_running(&self, is_running: bool) {
        // `Release` pairs with the `Acquire` load in `is_running`.
        self.is_running.store(is_running, Ordering::Release);
    }
187
188 pub(crate) fn set_multiaddr(&mut self, multiaddr: impl Into<String>) {
189 self.multiaddr = multiaddr.into();
190 }
191
192 pub fn name(&self) -> &str {
193 &self.name
194 }
195
    /// Arguments reported by the underlying provider node.
    pub fn args(&self) -> Vec<&str> {
        self.inner.args()
    }
199
    /// Spec this node was created from.
    pub fn spec(&self) -> &NodeSpec {
        &self.spec
    }
203
204 pub fn ws_uri(&self) -> &str {
205 &self.ws_uri
206 }
207
208 pub fn multiaddr(&self) -> &str {
209 self.multiaddr.as_ref()
210 }
211
    /// Builds an [`RpcClient`] for the node's WebSocket URI.
    ///
    /// # Errors
    /// Returns the `subxt::Error` produced by `get_client_from_url`.
    pub async fn rpc(&self) -> Result<RpcClient, subxt::Error> {
        get_client_from_url(&self.ws_uri).await
    }
218
    /// Builds a subxt [`OnlineClient`] for this node; forwards to [`Self::try_client`].
    #[deprecated = "Use `wait_client` instead."]
    pub async fn client<Config: subxt::Config>(
        &self,
    ) -> Result<OnlineClient<Config>, subxt::Error> {
        self.try_client().await
    }
226
    /// Tries once to build a subxt [`OnlineClient`] for the node's WebSocket URI,
    /// without waiting for the endpoint to become ready first.
    ///
    /// # Errors
    /// Returns the `subxt::Error` produced by `get_client_from_url`.
    pub async fn try_client<Config: subxt::Config>(
        &self,
    ) -> Result<OnlineClient<Config>, subxt::Error> {
        get_client_from_url(&self.ws_uri).await
    }
240
241 pub async fn wait_client<Config: subxt::Config>(
243 &self,
244 ) -> Result<OnlineClient<Config>, anyhow::Error> {
245 debug!("wait_client ws_uri: {}", self.ws_uri());
246 wait_ws_ready(self.ws_uri())
247 .await
248 .map_err(|e| anyhow!("Error awaiting http_client to ws be ready, err: {e}"))?;
249
250 self.try_client()
251 .await
252 .map_err(|e| anyhow!("Can't create a subxt client, err: {e}"))
253 }
254
255 pub async fn wait_client_with_timeout<Config: subxt::Config>(
257 &self,
258 timeout_secs: impl Into<u64>,
259 ) -> Result<OnlineClient<Config>, anyhow::Error> {
260 debug!("waiting until subxt client is ready");
261 tokio::time::timeout(
262 Duration::from_secs(timeout_secs.into()),
263 self.wait_client::<Config>(),
264 )
265 .await?
266 }
267
268 pub async fn pause(&self) -> Result<(), anyhow::Error> {
276 self.set_is_running(false);
277 self.inner.pause().await?;
278 Ok(())
279 }
280
281 pub async fn resume(&self) -> Result<(), anyhow::Error> {
287 self.set_is_running(true);
288 self.inner.resume().await?;
289 Ok(())
290 }
291
292 pub async fn restart(&self, after: Option<Duration>) -> Result<(), anyhow::Error> {
297 self.set_is_running(false);
298 self.inner.restart(after).await?;
299 self.set_is_running(true);
300 Ok(())
301 }
302
303 pub async fn run_script(
310 &self,
311 options: RunScriptOptions,
312 ) -> Result<ExecutionResult, anyhow::Error> {
313 self.inner
314 .run_script(options)
315 .await
316 .map_err(|e| anyhow!("Failed to run script: {e}"))
317 }
318
319 pub async fn reports(&self, metric_name: impl Into<String>) -> Result<f64, anyhow::Error> {
327 let metric_name = metric_name.into();
328 self.fetch_metrics().await?;
330 self.metric(&metric_name, true).await
332 }
333
334 pub async fn assert(
343 &self,
344 metric_name: impl Into<String>,
345 value: impl Into<f64>,
346 ) -> Result<bool, anyhow::Error> {
347 let value: f64 = value.into();
348 self.assert_with(metric_name, |v| v == value).await
349 }
350
351 pub async fn assert_with(
354 &self,
355 metric_name: impl Into<String>,
356 predicate: impl Fn(f64) -> bool,
357 ) -> Result<bool, anyhow::Error> {
358 let metric_name = metric_name.into();
359 self.fetch_metrics().await?;
361 let val = self.metric(&metric_name, true).await?;
362 trace!("🔎 Current value {val} passed to the predicated?");
363 Ok(predicate(val))
364 }
365
366 pub async fn wait_metric(
370 &self,
371 metric_name: impl Into<String>,
372 predicate: impl Fn(f64) -> bool,
373 ) -> Result<(), anyhow::Error> {
374 let metric_name = metric_name.into();
375 debug!("waiting until metric {metric_name} pass the predicate");
376 loop {
377 let res = self.assert_with(&metric_name, &predicate).await;
378 match res {
379 Ok(res) => {
380 if res {
381 return Ok(());
382 }
383 },
384 Err(e) => match e.downcast::<reqwest::Error>() {
385 Ok(io_err) => {
386 if !skip_err_while_waiting(&io_err) {
387 return Err(io_err.into());
388 }
389 },
390 Err(other) => {
391 match other.downcast::<NetworkNodeError>() {
392 Ok(node_err) => {
393 if !matches!(node_err, NetworkNodeError::MetricNotFound(_)) {
394 return Err(node_err.into());
395 }
396 },
397 Err(other) => return Err(other),
398 };
399 },
400 },
401 }
402
403 tokio::time::sleep(Duration::from_secs(1)).await;
405 }
406 }
407
408 pub async fn wait_metric_with_timeout(
411 &self,
412 metric_name: impl Into<String>,
413 predicate: impl Fn(f64) -> bool,
414 timeout_secs: impl Into<u64>,
415 ) -> Result<(), anyhow::Error> {
416 let metric_name = metric_name.into();
417 let secs = timeout_secs.into();
418 debug!("waiting until metric {metric_name} pass the predicate");
419 let res = tokio::time::timeout(
420 Duration::from_secs(secs),
421 self.wait_metric(&metric_name, predicate),
422 )
423 .await;
424
425 if let Ok(inner_res) = res {
426 match inner_res {
427 Ok(_) => Ok(()),
428 Err(e) => Err(anyhow!("Error waiting for metric: {e}")),
429 }
430 } else {
431 Err(anyhow!(
433 "Timeout ({secs}), waiting for metric {metric_name} pass the predicate"
434 ))
435 }
436 }
437
438 pub async fn logs(&self) -> Result<String, anyhow::Error> {
443 Ok(self.inner.logs().await?)
444 }
445
446 pub async fn wait_log_line_count(
448 &self,
449 pattern: impl Into<String>,
450 is_glob: bool,
451 count: usize,
452 ) -> Result<(), anyhow::Error> {
453 let pattern = pattern.into();
454 let pattern_clone = pattern.clone();
455 debug!("waiting until we find pattern {pattern} {count} times");
456 let match_fn: BoxedClosure = if is_glob {
457 Box::new(move |line: &str| Ok(glob_match(&pattern, line)))
458 } else {
459 let re = Regex::new(&pattern)?;
460 Box::new(move |line: &str| re.is_match(line).map_err(|e| anyhow!(e.to_string())))
461 };
462
463 loop {
464 let mut q = 0_usize;
465 let logs = self.logs().await?;
466 for line in logs.lines() {
467 trace!("line is {line}");
468 if match_fn(line)? {
469 trace!("pattern {pattern_clone} match in line {line}");
470 q += 1;
471 if q >= count {
472 return Ok(());
473 }
474 }
475 }
476
477 tokio::time::sleep(Duration::from_secs(2)).await;
478 }
479 }
480
481 pub async fn wait_log_line_count_with_timeout(
525 &self,
526 substring: impl Into<String>,
527 is_glob: bool,
528 options: LogLineCountOptions,
529 ) -> Result<LogLineCount, anyhow::Error> {
530 let substring = substring.into();
531 debug!(
532 "waiting until match lines count within {} seconds",
533 options.timeout.as_secs_f64()
534 );
535
536 let start = tokio::time::Instant::now();
537
538 let match_fn: BoxedClosure = if is_glob {
539 Box::new(move |line: &str| Ok(glob_match(&substring, line)))
540 } else {
541 let re = Regex::new(&substring)?;
542 Box::new(move |line: &str| re.is_match(line).map_err(|e| anyhow!(e.to_string())))
543 };
544
545 if options.wait_until_timeout_elapses {
546 tokio::time::sleep(options.timeout).await;
547 }
548
549 let mut q;
550 loop {
551 q = 0_u32;
552 let logs = self.logs().await?;
553 for line in logs.lines() {
554 if match_fn(line)? {
555 q += 1;
556
557 if !options.wait_until_timeout_elapses && (options.predicate)(q) {
562 return Ok(LogLineCount::TargetReached(q));
563 }
564 }
565 }
566
567 if start.elapsed() >= options.timeout {
568 break;
569 }
570
571 tokio::time::sleep(Duration::from_secs(2)).await;
572 }
573
574 if (options.predicate)(q) {
575 Ok(LogLineCount::TargetReached(q))
576 } else {
577 Ok(LogLineCount::TargetFailed(q))
578 }
579 }
580
581 async fn fetch_metrics(&self) -> Result<(), anyhow::Error> {
582 let response = reqwest::get(&self.prometheus_uri).await?;
583 let metrics = prom_metrics_parser::parse(&response.text().await?)?;
584 let mut cache = self.metrics_cache.write().await;
585 *cache = metrics;
586 Ok(())
587 }
588
589 async fn metric(
591 &self,
592 metric_name: &str,
593 treat_not_found_as_zero: bool,
594 ) -> Result<f64, anyhow::Error> {
595 let mut metrics_map = self.metrics_cache.read().await;
596 if metrics_map.is_empty() {
597 drop(metrics_map);
599 self.fetch_metrics().await?;
600 metrics_map = self.metrics_cache.read().await;
601 }
602
603 if let Some(val) = metrics_map.get(metric_name) {
604 Ok(*val)
605 } else if treat_not_found_as_zero {
606 Ok(0_f64)
607 } else {
608 Err(NetworkNodeError::MetricNotFound(metric_name.into()).into())
609 }
610 }
611
612 pub async fn get_histogram_buckets(
632 &self,
633 metric_name: impl AsRef<str>,
634 label_filters: Option<HashMap<String, String>>,
635 ) -> Result<HashMap<String, u64>, anyhow::Error> {
636 let metric_name = metric_name.as_ref();
637
638 let response = reqwest::get(&self.prometheus_uri).await?;
640 let metrics = prom_metrics_parser::parse(&response.text().await?)?;
641
642 let resolved_metric_name = if metric_name.contains("_bucket") {
644 metric_name.to_string()
645 } else {
646 format!("{}_bucket", metric_name)
647 };
648
649 let mut metric_entries: Vec<(String, HashMap<String, String>, u64)> = Vec::new();
653
654 for (key, &value) in metrics.iter() {
655 if !key.starts_with(&resolved_metric_name) {
656 continue;
657 }
658
659 let remaining = &key[resolved_metric_name.len()..];
660
661 let labels_str = &remaining[1..remaining.len() - 1];
662 let parsed_labels = Self::parse_label_string(labels_str);
663
664 if !parsed_labels.contains_key("le") {
666 continue;
667 }
668
669 if let Some(ref filters) = label_filters {
671 let mut all_match = true;
672 for (filter_key, filter_value) in filters {
673 if parsed_labels.get(filter_key) != Some(filter_value) {
674 all_match = false;
675 break;
676 }
677 }
678 if !all_match {
679 continue;
680 }
681 }
682
683 metric_entries.push((key.clone(), parsed_labels, value as u64));
684 }
685
686 let max_label_count = metric_entries
689 .iter()
690 .map(|(_, labels, _)| labels.iter().filter(|(k, _)| k.as_str() != "le").count())
691 .max()
692 .unwrap_or(0);
693
694 let mut raw_buckets: Vec<(String, u64)> = Vec::new();
696 let mut seen_le_values = HashSet::new();
697 let mut active_series: Option<Vec<(String, String)>> = None;
698
699 for (_, parsed_labels, value) in metric_entries {
700 let le_value = parsed_labels.get("le").unwrap().clone();
701
702 let mut non_le_labels: Vec<(String, String)> = parsed_labels
704 .iter()
705 .filter(|(k, _)| k.as_str() != "le")
706 .map(|(k, v)| (k.clone(), v.clone()))
707 .collect();
708 non_le_labels.sort();
709
710 if non_le_labels.len() < max_label_count {
713 continue;
714 }
715
716 if let Some(ref prev_series) = active_series {
718 if prev_series != &non_le_labels {
719 if !raw_buckets.is_empty() {
720 break; }
722 active_series = Some(non_le_labels.clone());
723 seen_le_values.clear();
724 }
725 } else {
726 active_series = Some(non_le_labels.clone());
727 }
728
729 if !seen_le_values.insert(le_value.clone()) {
731 continue;
732 }
733
734 trace!("{} le:{} {}", resolved_metric_name, &le_value, value);
735 raw_buckets.push((le_value, value));
736 }
737
738 raw_buckets.sort_by(|a, b| Self::compare_le_values(&a.0, &b.0));
740
741 let mut buckets = HashMap::new();
743 let mut previous_value = 0_u64;
744 for (le, cumulative_count) in raw_buckets {
745 if cumulative_count < previous_value {
746 warn!(
747 "Warning: bucket count decreased from {} to {} at le={}",
748 previous_value, cumulative_count, le
749 );
750 }
751 let delta = cumulative_count.saturating_sub(previous_value);
752 buckets.insert(le, delta);
753 previous_value = cumulative_count;
754 }
755
756 Ok(buckets)
757 }
758
759 fn parse_label_string(labels_str: &str) -> HashMap<String, String> {
765 let mut labels = HashMap::new();
766 let mut current_key = String::new();
767 let mut current_value = String::new();
768 let mut in_value = false;
769 let mut in_quotes = false;
770
771 for ch in labels_str.chars() {
772 match ch {
773 '=' if !in_quotes && !in_value => {
774 in_value = true;
775 },
776 '"' if in_value => {
777 in_quotes = !in_quotes;
778 },
779 ',' if !in_quotes => {
780 if !current_key.is_empty() {
782 labels.insert(
783 current_key.trim().to_string(),
784 current_value.trim().to_string(),
785 );
786 current_key.clear();
787 current_value.clear();
788 in_value = false;
789 }
790 },
791 _ => {
792 if in_value {
793 current_value.push(ch);
794 } else {
795 current_key.push(ch);
796 }
797 },
798 }
799 }
800
801 if !current_key.is_empty() {
803 labels.insert(
804 current_key.trim().to_string(),
805 current_value.trim().to_string(),
806 );
807 }
808
809 labels
810 }
811
812 fn compare_le_values(a: &str, b: &str) -> std::cmp::Ordering {
816 use std::cmp::Ordering;
817
818 match (a, b) {
820 ("+Inf", "+Inf") => Ordering::Equal,
821 ("+Inf", _) => Ordering::Greater,
822 (_, "+Inf") => Ordering::Less,
823 _ => {
824 match (a.parse::<f64>(), b.parse::<f64>()) {
826 (Ok(a_val), Ok(b_val)) => a_val.partial_cmp(&b_val).unwrap_or(Ordering::Equal),
827 _ => a.cmp(b),
829 }
830 },
831 }
832 }
833
834 pub async fn wait_until_is_up(
846 &self,
847 timeout_secs: impl Into<u64>,
848 ) -> Result<(), anyhow::Error> {
849 self.wait_metric_with_timeout("process_start_time_seconds", |b| b >= 1.0, timeout_secs)
850 .await
851 .map_err(|err| anyhow::anyhow!("{}: {:?}", self.name(), err))
852 }
853}
854
855impl std::fmt::Debug for NetworkNode {
856 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
857 f.debug_struct("NetworkNode")
858 .field("inner", &"inner_skipped")
859 .field("spec", &self.spec)
860 .field("name", &self.name)
861 .field("ws_uri", &self.ws_uri)
862 .field("prometheus_uri", &self.prometheus_uri)
863 .finish()
864 }
865}
866
867fn serialize_provider_node<S>(node: &DynNode, serializer: S) -> Result<S::Ok, S::Error>
868where
869 S: Serializer,
870{
871 erased_serde::serialize(node.as_ref(), serializer)
872}
873
874#[cfg(test)]
876mod tests {
877 use std::{
878 path::{Path, PathBuf},
879 sync::{Arc, Mutex},
880 };
881
882 use async_trait::async_trait;
883 use provider::{types::*, ProviderError, ProviderNode};
884
885 use super::*;
886
    /// Test double for a provider node: only its in-memory log buffer is functional.
    #[derive(Serialize)]
    struct MockNode {
        /// Log lines returned (joined by newlines) from `ProviderNode::logs`.
        logs: Arc<Mutex<Vec<String>>>,
    }
891
892 impl MockNode {
893 fn new() -> Self {
894 Self {
895 logs: Arc::new(Mutex::new(vec![])),
896 }
897 }
898
899 fn logs_push(&self, lines: Vec<impl Into<String>>) {
900 self.logs
901 .lock()
902 .unwrap()
903 .extend(lines.into_iter().map(|l| l.into()));
904 }
905 }
906
    /// `ProviderNode` implementation for the mock: only `logs` is implemented, every
    /// other method is a `todo!()` stub and panics if called.
    #[async_trait]
    impl ProviderNode for MockNode {
        fn name(&self) -> &str {
            todo!()
        }

        fn args(&self) -> Vec<&str> {
            todo!()
        }

        fn base_dir(&self) -> &PathBuf {
            todo!()
        }

        fn config_dir(&self) -> &PathBuf {
            todo!()
        }

        fn data_dir(&self) -> &PathBuf {
            todo!()
        }

        fn relay_data_dir(&self) -> &PathBuf {
            todo!()
        }

        fn scripts_dir(&self) -> &PathBuf {
            todo!()
        }

        fn log_path(&self) -> &PathBuf {
            todo!()
        }

        fn log_cmd(&self) -> String {
            todo!()
        }

        fn path_in_node(&self, _file: &Path) -> PathBuf {
            todo!()
        }

        /// Returns the buffered lines joined with `\n`.
        async fn logs(&self) -> Result<String, ProviderError> {
            Ok(self.logs.lock().unwrap().join("\n"))
        }

        async fn dump_logs(&self, _local_dest: PathBuf) -> Result<(), ProviderError> {
            todo!()
        }

        async fn run_command(
            &self,
            _options: RunCommandOptions,
        ) -> Result<ExecutionResult, ProviderError> {
            todo!()
        }

        async fn run_script(
            &self,
            _options: RunScriptOptions,
        ) -> Result<ExecutionResult, ProviderError> {
            todo!()
        }

        async fn send_file(
            &self,
            _local_file_path: &Path,
            _remote_file_path: &Path,
            _mode: &str,
        ) -> Result<(), ProviderError> {
            todo!()
        }

        async fn receive_file(
            &self,
            _remote_file_path: &Path,
            _local_file_path: &Path,
        ) -> Result<(), ProviderError> {
            todo!()
        }

        async fn pause(&self) -> Result<(), ProviderError> {
            todo!()
        }

        async fn resume(&self) -> Result<(), ProviderError> {
            todo!()
        }

        async fn restart(&self, _after: Option<Duration>) -> Result<(), ProviderError> {
            todo!()
        }

        async fn destroy(&self) -> Result<(), ProviderError> {
            todo!()
        }
    }
1004
1005 #[tokio::test(flavor = "multi_thread")]
1006 async fn test_wait_log_count_target_reached_immediately() -> Result<(), anyhow::Error> {
1007 let mock_provider = Arc::new(MockNode::new());
1008 let mock_node = NetworkNode::new(
1009 "node1",
1010 "ws_uri",
1011 "prometheus_uri",
1012 "multiaddr",
1013 NodeSpec::default(),
1014 mock_provider.clone(),
1015 );
1016
1017 mock_provider.logs_push(vec![
1018 "system booting",
1019 "stub line 1",
1020 "stub line 2",
1021 "system ready",
1022 ]);
1023
1024 let options = LogLineCountOptions {
1026 predicate: Arc::new(|n| n == 1),
1027 timeout: Duration::from_secs(10),
1028 wait_until_timeout_elapses: false,
1029 };
1030
1031 let log_line_count = mock_node
1032 .wait_log_line_count_with_timeout("system ready", false, options)
1033 .await?;
1034
1035 assert!(matches!(log_line_count, LogLineCount::TargetReached(1)));
1036
1037 Ok(())
1038 }
1039
    /// The second matching line is pushed while the waiter is already polling; the
    /// target of two matches is reached on a later pass, before the 4s timeout.
    #[tokio::test(flavor = "multi_thread")]
    async fn test_wait_log_count_target_reached_after_delay() -> Result<(), anyhow::Error> {
        let mock_provider = Arc::new(MockNode::new());
        let mock_node = NetworkNode::new(
            "node1",
            "ws_uri",
            "prometheus_uri",
            "multiaddr",
            NodeSpec::default(),
            mock_provider.clone(),
        );

        mock_provider.logs_push(vec![
            "system booting",
            "stub line 1",
            "stub line 2",
            "system ready",
        ]);

        // Wait (up to 4 seconds) until the pattern has matched exactly twice.
        let options = LogLineCountOptions {
            predicate: Arc::new(|n| n == 2),
            timeout: Duration::from_secs(4),
            wait_until_timeout_elapses: false,
        };

        // Run the waiter concurrently so the logs can change underneath it.
        let task = tokio::spawn({
            async move {
                mock_node
                    .wait_log_line_count_with_timeout("system ready", false, options)
                    .await
                    .unwrap()
            }
        });

        tokio::time::sleep(Duration::from_secs(2)).await;

        // Second occurrence arrives after the delay.
        mock_provider.logs_push(vec!["system ready"]);

        let log_line_count = task.await?;

        assert!(matches!(log_line_count, LogLineCount::TargetReached(2)));

        Ok(())
    }
1085
1086 #[tokio::test(flavor = "multi_thread")]
1087 async fn test_wait_log_count_target_failed_timeout() -> Result<(), anyhow::Error> {
1088 let mock_provider = Arc::new(MockNode::new());
1089 let mock_node = NetworkNode::new(
1090 "node1",
1091 "ws_uri",
1092 "prometheus_uri",
1093 "multiaddr",
1094 NodeSpec::default(),
1095 mock_provider.clone(),
1096 );
1097
1098 mock_provider.logs_push(vec![
1099 "system booting",
1100 "stub line 1",
1101 "stub line 2",
1102 "system ready",
1103 ]);
1104
1105 let options = LogLineCountOptions {
1107 predicate: Arc::new(|n| n == 2),
1108 timeout: Duration::from_secs(2),
1109 wait_until_timeout_elapses: false,
1110 };
1111
1112 let log_line_count = mock_node
1113 .wait_log_line_count_with_timeout("system ready", false, options)
1114 .await?;
1115
1116 assert!(matches!(log_line_count, LogLineCount::TargetFailed(1)));
1117
1118 Ok(())
1119 }
1120
    /// With `wait_until_timeout_elapses` set, the count is judged only after the
    /// timeout; by then three lines match, which exceeds the target of exactly two.
    #[tokio::test(flavor = "multi_thread")]
    async fn test_wait_log_count_target_failed_exceeded() -> Result<(), anyhow::Error> {
        let mock_provider = Arc::new(MockNode::new());
        let mock_node = NetworkNode::new(
            "node1",
            "ws_uri",
            "prometheus_uri",
            "multiaddr",
            NodeSpec::default(),
            mock_provider.clone(),
        );

        mock_provider.logs_push(vec![
            "system booting",
            "stub line 1",
            "stub line 2",
            "system ready",
        ]);

        // Wait the whole 2 seconds, then require exactly two matches.
        let options = LogLineCountOptions {
            predicate: Arc::new(|n| n == 2),
            timeout: Duration::from_secs(2),
            wait_until_timeout_elapses: true,
        };

        let task = tokio::spawn({
            async move {
                mock_node
                    .wait_log_line_count_with_timeout("system ready", false, options)
                    .await
                    .unwrap()
            }
        });

        tokio::time::sleep(Duration::from_secs(1)).await;

        // Two more occurrences arrive before the timeout: three in total.
        mock_provider.logs_push(vec!["system ready"]);
        mock_provider.logs_push(vec!["system ready"]);

        let log_line_count = task.await?;

        assert!(matches!(log_line_count, LogLineCount::TargetFailed(3)));

        Ok(())
    }
1167
    /// `no_occurences_within_timeout` succeeds when the pattern never shows up during
    /// the whole timeout, even though unrelated lines keep arriving.
    #[tokio::test(flavor = "multi_thread")]
    async fn test_wait_log_count_target_reached_no_occurences() -> Result<(), anyhow::Error> {
        let mock_provider = Arc::new(MockNode::new());
        let mock_node = NetworkNode::new(
            "node1",
            "ws_uri",
            "prometheus_uri",
            "multiaddr",
            NodeSpec::default(),
            mock_provider.clone(),
        );

        mock_provider.logs_push(vec!["system booting", "stub line 1", "stub line 2"]);

        let task = tokio::spawn({
            async move {
                mock_node
                    .wait_log_line_count_with_timeout(
                        "system ready",
                        false,
                        // Expect zero matches over the full 2 seconds.
                        LogLineCountOptions::no_occurences_within_timeout(Duration::from_secs(2)),
                    )
                    .await
                    .unwrap()
            }
        });

        tokio::time::sleep(Duration::from_secs(1)).await;

        // A non-matching line must not affect the outcome.
        mock_provider.logs_push(vec!["stub line 3"]);

        assert!(task.await?.success());

        Ok(())
    }
1204
    /// A range predicate (2 to 5 matches inclusive) judged after the timeout accepts
    /// the three matching lines pushed meanwhile.
    #[tokio::test(flavor = "multi_thread")]
    async fn test_wait_log_count_target_reached_in_range() -> Result<(), anyhow::Error> {
        let mock_provider = Arc::new(MockNode::new());
        let mock_node = NetworkNode::new(
            "node1",
            "ws_uri",
            "prometheus_uri",
            "multiaddr",
            NodeSpec::default(),
            mock_provider.clone(),
        );

        mock_provider.logs_push(vec!["system booting", "stub line 1", "stub line 2"]);

        // Wait the whole 2 seconds, then require between 2 and 5 matches.
        let options = LogLineCountOptions {
            predicate: Arc::new(|n| (2..=5).contains(&n)),
            timeout: Duration::from_secs(2),
            wait_until_timeout_elapses: true,
        };

        let task = tokio::spawn({
            async move {
                mock_node
                    .wait_log_line_count_with_timeout("system ready", false, options)
                    .await
                    .unwrap()
            }
        });

        tokio::time::sleep(Duration::from_secs(1)).await;

        // Three occurrences arrive before the timeout.
        mock_provider.logs_push(vec!["system ready", "system ready", "system ready"]);

        assert!(task.await?.success());

        Ok(())
    }
1243
    /// A regex containing a negative lookahead is accepted by the matcher; exactly one
    /// line ("system error") is expected to match once the timeout has elapsed.
    #[tokio::test(flavor = "multi_thread")]
    async fn test_wait_log_count_with_timeout_with_lookahead_regex() -> Result<(), anyhow::Error> {
        let mock_provider = Arc::new(MockNode::new());
        let mock_node = NetworkNode::new(
            "node1",
            "ws_uri",
            "prometheus_uri",
            "multiaddr",
            NodeSpec::default(),
            mock_provider.clone(),
        );

        mock_provider.logs_push(vec![
            "system booting",
            "stub line 1",
            // This line must not be counted as a match.
            "Error importing block 0xfd66e545c446b1c01205503130b816af0ec2c0e504a8472808e6ff4a644ce1fa: block has an unknown parent",
            "stub line 2"
        ]);

        // Wait the whole 3 seconds, then require exactly one match.
        let options = LogLineCountOptions {
            predicate: Arc::new(|n| n == 1),
            timeout: Duration::from_secs(3),
            wait_until_timeout_elapses: true,
        };

        let task = tokio::spawn({
            async move {
                mock_node
                    .wait_log_line_count_with_timeout(
                        "error(?! importing block .*: block has an unknown parent)",
                        false,
                        options,
                    )
                    .await
                    .unwrap()
            }
        });

        tokio::time::sleep(Duration::from_secs(1)).await;

        mock_provider.logs_push(vec![
            "system ready",
            // The single line expected to match.
            "system error",
            "system ready",
        ]);

        assert!(task.await?.success());

        Ok(())
    }
1296
    /// Same lookahead pattern, but no matching line is ever logged, so the
    /// "exactly one" target fails after the 6s timeout.
    #[tokio::test(flavor = "multi_thread")]
    async fn test_wait_log_count_with_timeout_with_lookahead_regex_fails(
    ) -> Result<(), anyhow::Error> {
        let mock_provider = Arc::new(MockNode::new());
        let mock_node = NetworkNode::new(
            "node1",
            "ws_uri",
            "prometheus_uri",
            "multiaddr",
            NodeSpec::default(),
            mock_provider.clone(),
        );

        mock_provider.logs_push(vec![
            "system booting",
            "stub line 1",
            // This line must not be counted as a match.
            "Error importing block 0xfd66e545c446b1c01205503130b816af0ec2c0e504a8472808e6ff4a644ce1fa: block has an unknown parent",
            "stub line 2"
        ]);

        // Wait the whole 6 seconds, then require exactly one match.
        let options = LogLineCountOptions {
            predicate: Arc::new(|n| n == 1),
            timeout: Duration::from_secs(6),
            wait_until_timeout_elapses: true,
        };

        let task = tokio::spawn({
            async move {
                mock_node
                    .wait_log_line_count_with_timeout(
                        "error(?! importing block .*: block has an unknown parent)",
                        false,
                        options,
                    )
                    .await
                    .unwrap()
            }
        });

        tokio::time::sleep(Duration::from_secs(1)).await;

        // Only non-matching lines arrive.
        mock_provider.logs_push(vec!["system ready", "system ready"]);

        assert!(!task.await?.success());

        Ok(())
    }
1345
    /// The unbounded `wait_log_line_count` also accepts a lookahead regex and returns
    /// once the single expected match ("system error") has been logged.
    #[tokio::test(flavor = "multi_thread")]
    async fn test_wait_log_count_with_lockahead_regex() -> Result<(), anyhow::Error> {
        let mock_provider = Arc::new(MockNode::new());
        let mock_node = NetworkNode::new(
            "node1",
            "ws_uri",
            "prometheus_uri",
            "multiaddr",
            NodeSpec::default(),
            mock_provider.clone(),
        );

        mock_provider.logs_push(vec![
            "system booting",
            "stub line 1",
            // This line must not be counted as a match.
            "Error importing block 0xfd66e545c446b1c01205503130b816af0ec2c0e504a8472808e6ff4a644ce1fa: block has an unknown parent",
            "stub line 2"
        ]);

        let task = tokio::spawn({
            async move {
                mock_node
                    .wait_log_line_count(
                        "error(?! importing block .*: block has an unknown parent)",
                        false,
                        1,
                    )
                    .await
                    .unwrap()
            }
        });

        tokio::time::sleep(Duration::from_secs(1)).await;

        mock_provider.logs_push(vec![
            "system ready",
            // The line that lets the waiter finish.
            "system error",
            "system ready",
        ]);

        // The spawned task finishing without a panic means the wait returned `Ok`.
        assert!(task.await.is_ok());

        Ok(())
    }
1392
    /// Lookahead pattern with no matching line: after the 2s timeout the
    /// "exactly one" target is reported as failed.
    #[tokio::test(flavor = "multi_thread")]
    async fn test_wait_log_count_with_lookahead_regex_fails() -> Result<(), anyhow::Error> {
        let mock_provider = Arc::new(MockNode::new());
        let mock_node = NetworkNode::new(
            "node1",
            "ws_uri",
            "prometheus_uri",
            "multiaddr",
            NodeSpec::default(),
            mock_provider.clone(),
        );

        mock_provider.logs_push(vec![
            "system booting",
            "stub line 1",
            // This line must not be counted as a match.
            "Error importing block 0xfd66e545c446b1c01205503130b816af0ec2c0e504a8472808e6ff4a644ce1fa: block has an unknown parent",
            "stub line 2"
        ]);

        // Wait the whole 2 seconds, then require exactly one match.
        let options = LogLineCountOptions {
            predicate: Arc::new(|count| count == 1),
            timeout: Duration::from_secs(2),
            wait_until_timeout_elapses: true,
        };

        let task = tokio::spawn({
            async move {
                mock_node
                    .wait_log_line_count_with_timeout(
                        "error(?! importing block .*: block has an unknown parent)",
                        false,
                        options,
                    )
                    .await
                    .unwrap()
            }
        });

        tokio::time::sleep(Duration::from_secs(1)).await;

        // Only non-matching lines arrive.
        mock_provider.logs_push(vec!["system ready", "system ready"]);

        assert!(!task.await?.success());

        Ok(())
    }
1441
    /// Serves a fixed Prometheus payload from a local TCP listener and checks that
    /// `get_histogram_buckets` turns the cumulative bucket counts into per-bucket
    /// deltas, with and without label filters and with an explicit `_bucket` suffix.
    #[tokio::test]
    async fn test_get_histogram_buckets_parsing() -> Result<(), anyhow::Error> {
        use std::sync::Arc;

        // One histogram series with cumulative counts 10, 25, 35, 40, 42.
        let mock_metrics = concat!(
            "# HELP substrate_block_verification_time Time taken to verify blocks\n",
            "# TYPE substrate_block_verification_time histogram\n",
            "substrate_block_verification_time_bucket{chain=\"rococo_local_testnet\",le=\"0.1\"} 10\n",
            "substrate_block_verification_time_bucket{chain=\"rococo_local_testnet\",le=\"0.5\"} 25\n",
            "substrate_block_verification_time_bucket{chain=\"rococo_local_testnet\",le=\"1.0\"} 35\n",
            "substrate_block_verification_time_bucket{chain=\"rococo_local_testnet\",le=\"2.5\"} 40\n",
            "substrate_block_verification_time_bucket{chain=\"rococo_local_testnet\",le=\"+Inf\"} 42\n",
            "substrate_block_verification_time_sum{chain=\"rococo_local_testnet\"} 45.5\n",
            "substrate_block_verification_time_count{chain=\"rococo_local_testnet\"} 42\n",
        );

        // Bind to port 0 so the OS picks a free port.
        let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await?;
        let addr = listener.local_addr()?;
        let metrics = Arc::new(mock_metrics.to_string());

        // Minimal HTTP server: answers every connection with the payload above.
        tokio::spawn({
            let metrics = metrics.clone();
            async move {
                loop {
                    if let Ok((mut socket, _)) = listener.accept().await {
                        let metrics = metrics.clone();
                        tokio::spawn(async move {
                            use tokio::io::{AsyncReadExt, AsyncWriteExt};
                            let mut buffer = [0; 1024];
                            // The request content is irrelevant; read it and discard.
                            let _ = socket.read(&mut buffer).await;

                            let response = format!(
                                "HTTP/1.1 200 OK\r\nContent-Length: {}\r\n\r\n{}",
                                metrics.len(),
                                metrics
                            );
                            let _ = socket.write_all(response.as_bytes()).await;
                        });
                    }
                }
            }
        });

        let mock_provider = Arc::new(MockNode::new());
        let mock_node = NetworkNode::new(
            "test_node",
            "ws://localhost:9944",
            &format!("http://127.0.0.1:{}/metrics", addr.port()),
            "/ip4/127.0.0.1/tcp/30333",
            NodeSpec::default(),
            mock_provider,
        );

        // Query with a label filter on `chain`.
        let mut label_filters = HashMap::new();
        label_filters.insert("chain".to_string(), "rococo_local_testnet".to_string());
        let buckets = mock_node
            .get_histogram_buckets("substrate_block_verification_time", Some(label_filters))
            .await?;

        // Expected values are deltas between consecutive cumulative counts.
        assert_eq!(buckets.get("0.1"), Some(&10));
        assert_eq!(buckets.get("0.5"), Some(&15)); // 25 - 10
        assert_eq!(buckets.get("1.0"), Some(&10)); // 35 - 25
        assert_eq!(buckets.get("2.5"), Some(&5)); // 40 - 35
        assert_eq!(buckets.get("+Inf"), Some(&2)); // 42 - 40

        // Same query again with a freshly built filter map.
        let mut label_filters = std::collections::HashMap::new();
        label_filters.insert("chain".to_string(), "rococo_local_testnet".to_string());

        let buckets_filtered = mock_node
            .get_histogram_buckets("substrate_block_verification_time", Some(label_filters))
            .await?;

        assert_eq!(buckets_filtered.get("0.1"), Some(&10));
        assert_eq!(buckets_filtered.get("0.5"), Some(&15));

        // The metric name may already carry the `_bucket` suffix.
        let buckets_with_suffix = mock_node
            .get_histogram_buckets("substrate_block_verification_time_bucket", None)
            .await?;

        assert_eq!(buckets_with_suffix.get("0.1"), Some(&10));

        Ok(())
    }
1533
1534 #[tokio::test]
1535 async fn test_get_histogram_buckets_unordered() -> Result<(), anyhow::Error> {
1536 use std::sync::Arc;
1538
1539 let mock_metrics = concat!(
1540 "# HELP test_metric A test metric\n",
1541 "# TYPE test_metric histogram\n",
1542 "test_metric_bucket{le=\"2.5\"} 40\n",
1543 "test_metric_bucket{le=\"0.1\"} 10\n",
1544 "test_metric_bucket{le=\"+Inf\"} 42\n",
1545 "test_metric_bucket{le=\"1.0\"} 35\n",
1546 "test_metric_bucket{le=\"0.5\"} 25\n",
1547 );
1548
1549 let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await?;
1550 let addr = listener.local_addr()?;
1551 let metrics = Arc::new(mock_metrics.to_string());
1552
1553 tokio::spawn({
1554 let metrics = metrics.clone();
1555 async move {
1556 loop {
1557 if let Ok((mut socket, _)) = listener.accept().await {
1558 let metrics = metrics.clone();
1559 tokio::spawn(async move {
1560 use tokio::io::{AsyncReadExt, AsyncWriteExt};
1561 let mut buffer = [0; 1024];
1562 let _ = socket.read(&mut buffer).await;
1563 let response = format!(
1564 "HTTP/1.1 200 OK\r\nContent-Length: {}\r\n\r\n{}",
1565 metrics.len(),
1566 metrics
1567 );
1568 let _ = socket.write_all(response.as_bytes()).await;
1569 });
1570 }
1571 }
1572 }
1573 });
1574
1575 let mock_provider = Arc::new(MockNode::new());
1576 let mock_node = NetworkNode::new(
1577 "test_node",
1578 "ws://localhost:9944",
1579 &format!("http://127.0.0.1:{}/metrics", addr.port()),
1580 "/ip4/127.0.0.1/tcp/30333",
1581 NodeSpec::default(),
1582 mock_provider,
1583 );
1584
1585 let buckets = mock_node.get_histogram_buckets("test_metric", None).await?;
1586
1587 assert_eq!(buckets.get("0.1"), Some(&10)); assert_eq!(buckets.get("0.5"), Some(&15)); assert_eq!(buckets.get("1.0"), Some(&10)); assert_eq!(buckets.get("2.5"), Some(&5)); assert_eq!(buckets.get("+Inf"), Some(&2)); Ok(())
1595 }
1596
1597 #[tokio::test]
1598 async fn test_get_histogram_buckets_complex_labels() -> Result<(), anyhow::Error> {
1599 use std::sync::Arc;
1601
1602 let mock_metrics = concat!(
1603 "# HELP test_metric A test metric\n",
1604 "# TYPE test_metric histogram\n",
1605 "test_metric_bucket{method=\"GET,POST\",path=\"/api/test\",le=\"0.1\"} 5\n",
1606 "test_metric_bucket{method=\"GET,POST\",path=\"/api/test\",le=\"0.5\"} 15\n",
1607 "test_metric_bucket{method=\"GET,POST\",path=\"/api/test\",le=\"+Inf\"} 20\n",
1608 );
1609
1610 let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await?;
1611 let addr = listener.local_addr()?;
1612 let metrics = Arc::new(mock_metrics.to_string());
1613
1614 tokio::spawn({
1615 let metrics = metrics.clone();
1616 async move {
1617 loop {
1618 if let Ok((mut socket, _)) = listener.accept().await {
1619 let metrics = metrics.clone();
1620 tokio::spawn(async move {
1621 use tokio::io::{AsyncReadExt, AsyncWriteExt};
1622 let mut buffer = [0; 1024];
1623 let _ = socket.read(&mut buffer).await;
1624 let response = format!(
1625 "HTTP/1.1 200 OK\r\nContent-Length: {}\r\n\r\n{}",
1626 metrics.len(),
1627 metrics
1628 );
1629 let _ = socket.write_all(response.as_bytes()).await;
1630 });
1631 }
1632 }
1633 }
1634 });
1635
1636 let mock_provider = Arc::new(MockNode::new());
1637 let mock_node = NetworkNode::new(
1638 "test_node",
1639 "ws://localhost:9944",
1640 &format!("http://127.0.0.1:{}/metrics", addr.port()),
1641 "/ip4/127.0.0.1/tcp/30333",
1642 NodeSpec::default(),
1643 mock_provider,
1644 );
1645
1646 let buckets = mock_node.get_histogram_buckets("test_metric", None).await?;
1648 assert_eq!(buckets.get("0.1"), Some(&5));
1649 assert_eq!(buckets.get("0.5"), Some(&10)); assert_eq!(buckets.get("+Inf"), Some(&5)); let mut label_filters = std::collections::HashMap::new();
1654 label_filters.insert("method".to_string(), "GET,POST".to_string());
1655
1656 let buckets_filtered = mock_node
1657 .get_histogram_buckets("test_metric", Some(label_filters))
1658 .await?;
1659
1660 assert_eq!(buckets_filtered.get("0.1"), Some(&5));
1661 assert_eq!(buckets_filtered.get("0.5"), Some(&10));
1662
1663 Ok(())
1664 }
1665
/// Table-driven check of `NetworkNode::compare_le_values`, the comparison used
/// for histogram `le` bucket bounds.
///
/// Covers decimal bounds, the `+Inf` bound against finite and infinite
/// operands, and integer-looking bounds whose numeric order differs from their
/// string order ("10" vs "100", "1000" vs "999").
#[test]
fn test_compare_le_values() {
    use std::cmp::Ordering;

    use crate::network::node::NetworkNode;

    let cases = [
        // Plain decimal bounds.
        ("0.1", "0.5", Ordering::Less),
        ("1.0", "0.5", Ordering::Greater),
        ("1.0", "1.0", Ordering::Equal),
        // "+Inf" against finite values and against itself.
        ("+Inf", "999", Ordering::Greater),
        ("0.1", "+Inf", Ordering::Less),
        ("+Inf", "+Inf", Ordering::Equal),
        // Numeric order differs from string order for these.
        ("10", "100", Ordering::Less),
        ("1000", "999", Ordering::Greater),
    ];

    for &(lhs, rhs, expected) in cases.iter() {
        assert_eq!(
            NetworkNode::compare_le_values(lhs, rhs),
            expected,
            "compare_le_values({:?}, {:?})",
            lhs,
            rhs
        );
    }
}
1704}