Skip to main content

zombienet_orchestrator/network/
node.rs

1use std::{
2    collections::{HashMap, HashSet},
3    sync::{
4        atomic::{AtomicBool, Ordering},
5        Arc,
6    },
7    time::Duration,
8};
9
10use anyhow::anyhow;
11use fancy_regex::Regex;
12use glob_match::glob_match;
13use prom_metrics_parser::MetricMap;
14use provider::{
15    types::{ExecutionResult, RunScriptOptions},
16    DynNode,
17};
18use serde::{Deserialize, Serialize, Serializer};
19use subxt::{backend::rpc::RpcClient, OnlineClient};
20use support::net::{skip_err_while_waiting, wait_ws_ready};
21use thiserror::Error;
22use tokio::sync::RwLock;
23use tracing::{debug, trace, warn};
24
25use crate::{network_spec::node::NodeSpec, tx_helper::client::get_client_from_url};
26
27type BoxedClosure = Box<dyn Fn(&str) -> Result<bool, anyhow::Error> + Send + Sync>;
28
29#[derive(Error, Debug)]
30pub enum NetworkNodeError {
31    #[error("metric '{0}' not found!")]
32    MetricNotFound(String),
33}
34
35#[derive(Clone, Serialize)]
36pub struct NetworkNode {
37    #[serde(serialize_with = "serialize_provider_node")]
38    pub(crate) inner: DynNode,
39    // TODO: do we need the full spec here?
40    // Maybe a reduce set of values.
41    pub(crate) spec: NodeSpec,
42    pub(crate) name: String,
43    pub(crate) ws_uri: String,
44    pub(crate) multiaddr: String,
45    pub(crate) prometheus_uri: String,
46    #[serde(skip)]
47    metrics_cache: Arc<RwLock<MetricMap>>,
48    #[serde(skip)]
49    is_running: Arc<AtomicBool>,
50}
51
52#[derive(Deserialize)]
53pub(crate) struct RawNetworkNode {
54    pub(crate) name: String,
55    pub(crate) ws_uri: String,
56    pub(crate) prometheus_uri: String,
57    pub(crate) multiaddr: String,
58    pub(crate) spec: NodeSpec,
59    pub(crate) inner: serde_json::Value,
60}
61
62/// Result of waiting for a certain number of log lines to appear.
63///
64/// Indicates whether the log line count condition was met within the timeout period.
65///
66/// # Variants
67/// - `TargetReached(count)` – The predicate condition was satisfied within the timeout.
68///     * `count`: The number of matching log lines at the time of satisfaction.
69/// - `TargetFailed(count)` – The condition was not met within the timeout.
70///     * `count`: The final number of matching log lines at timeout expiration.
71#[derive(Debug, Clone, Copy, PartialEq, Eq)]
72pub enum LogLineCount {
73    TargetReached(u32),
74    TargetFailed(u32),
75}
76
77impl LogLineCount {
78    pub fn success(&self) -> bool {
79        match self {
80            Self::TargetReached(..) => true,
81            Self::TargetFailed(..) => false,
82        }
83    }
84}
85
86/// Configuration for controlling log line count waiting behavior.
87///
88/// Allows specifying a custom predicate on the number of matching log lines,
89/// a timeout in seconds, and whether the system should wait the entire timeout duration.
90///
91/// # Fields
92/// - `predicate`: A function that takes the current number of matching lines and
93///   returns `true` if the condition is satisfied.
94/// - `timeout_secs`: Maximum number of seconds to wait.
95/// - `wait_until_timeout_elapses`: If `true`, the system will continue waiting
96///   for the full timeout duration, even if the condition is already met early.
97///   Useful when you need to verify sustained absence or stability (e.g., "ensure no new logs appear").
98#[derive(Clone)]
99pub struct LogLineCountOptions {
100    pub predicate: Arc<dyn Fn(u32) -> bool + Send + Sync>,
101    pub timeout: Duration,
102    pub wait_until_timeout_elapses: bool,
103}
104
105impl LogLineCountOptions {
106    pub fn new(
107        predicate: impl Fn(u32) -> bool + 'static + Send + Sync,
108        timeout: Duration,
109        wait_until_timeout_elapses: bool,
110    ) -> Self {
111        Self {
112            predicate: Arc::new(predicate),
113            timeout,
114            wait_until_timeout_elapses,
115        }
116    }
117
118    pub fn no_occurences_within_timeout(timeout: Duration) -> Self {
119        Self::new(|n| n == 0, timeout, true)
120    }
121
122    pub fn at_least_once(timeout: Duration) -> Self {
123        Self::new(|count| count >= 1, timeout, false)
124    }
125
126    pub fn exactly_once(timeout: Duration) -> Self {
127        Self::new(|count| count == 1, timeout, false)
128    }
129}
130
131// #[derive(Clone, Debug)]
132// pub struct QueryMetricOptions {
133//     use_cache: bool,
134//     treat_not_found_as_zero: bool,
135// }
136
137// impl Default for QueryMetricOptions {
138//     fn default() -> Self {
139//         Self { use_cache: false, treat_not_found_as_zero: true }
140//     }
141// }
142
143impl NetworkNode {
144    /// Create a new NetworkNode
145    pub(crate) fn new<T: Into<String>>(
146        name: T,
147        ws_uri: T,
148        prometheus_uri: T,
149        multiaddr: T,
150        spec: NodeSpec,
151        inner: DynNode,
152    ) -> Self {
153        Self {
154            name: name.into(),
155            ws_uri: ws_uri.into(),
156            prometheus_uri: prometheus_uri.into(),
157            inner,
158            spec,
159            multiaddr: multiaddr.into(),
160            metrics_cache: Arc::new(Default::default()),
161            is_running: Arc::new(AtomicBool::new(false)),
162        }
163    }
164
165    /// Check if the node is currently running (not paused).
166    ///
167    /// This returns the internal running state.
168    pub fn is_running(&self) -> bool {
169        self.is_running.load(Ordering::Acquire)
170    }
171
172    /// Check if the node is responsive by attempting to connect to its WebSocket endpoint.
173    ///
174    /// This performs an actual connection attempt with a short timeout (2 seconds).
175    /// Returns `true` if the node is reachable and responding, `false` otherwise.
176    ///
177    /// This is more robust than `is_running()` as it verifies the node is actually alive.
178    pub async fn is_responsive(&self) -> bool {
179        tokio::time::timeout(Duration::from_secs(2), wait_ws_ready(self.ws_uri()))
180            .await
181            .is_ok()
182    }
183
184    pub(crate) fn set_is_running(&self, is_running: bool) {
185        self.is_running.store(is_running, Ordering::Release);
186    }
187
188    pub(crate) fn set_multiaddr(&mut self, multiaddr: impl Into<String>) {
189        self.multiaddr = multiaddr.into();
190    }
191
192    pub fn name(&self) -> &str {
193        &self.name
194    }
195
196    pub fn args(&self) -> Vec<&str> {
197        self.inner.args()
198    }
199
200    pub fn spec(&self) -> &NodeSpec {
201        &self.spec
202    }
203
204    pub fn ws_uri(&self) -> &str {
205        &self.ws_uri
206    }
207
208    pub fn multiaddr(&self) -> &str {
209        self.multiaddr.as_ref()
210    }
211
212    // Subxt
213
214    /// Get the rpc client for the node
215    pub async fn rpc(&self) -> Result<RpcClient, subxt::Error> {
216        get_client_from_url(&self.ws_uri).await
217    }
218
219    /// Get the [online client](subxt::client::OnlineClient) for the node
220    #[deprecated = "Use `wait_client` instead."]
221    pub async fn client<Config: subxt::Config>(
222        &self,
223    ) -> Result<OnlineClient<Config>, subxt::Error> {
224        self.try_client().await
225    }
226
227    /// Try to connect to the node.
228    ///
229    /// Most of the time you only want to use [`NetworkNode::wait_client`] that waits for
230    /// the node to appear before it connects to it. This function directly tries
231    /// to connect to the node and returns an error if the node is not yet available
232    /// at that point in time.
233    ///
234    /// Returns a [`OnlineClient`] on success.
235    pub async fn try_client<Config: subxt::Config>(
236        &self,
237    ) -> Result<OnlineClient<Config>, subxt::Error> {
238        get_client_from_url(&self.ws_uri).await
239    }
240
241    /// Wait until get the [online client](subxt::client::OnlineClient) for the node
242    pub async fn wait_client<Config: subxt::Config>(
243        &self,
244    ) -> Result<OnlineClient<Config>, anyhow::Error> {
245        debug!("wait_client ws_uri: {}", self.ws_uri());
246        wait_ws_ready(self.ws_uri())
247            .await
248            .map_err(|e| anyhow!("Error awaiting http_client to ws be ready, err: {e}"))?;
249
250        self.try_client()
251            .await
252            .map_err(|e| anyhow!("Can't create a subxt client, err: {e}"))
253    }
254
255    /// Wait until get the [online client](subxt::client::OnlineClient) for the node with a defined timeout
256    pub async fn wait_client_with_timeout<Config: subxt::Config>(
257        &self,
258        timeout_secs: impl Into<u64>,
259    ) -> Result<OnlineClient<Config>, anyhow::Error> {
260        debug!("waiting until subxt client is ready");
261        tokio::time::timeout(
262            Duration::from_secs(timeout_secs.into()),
263            self.wait_client::<Config>(),
264        )
265        .await?
266    }
267
268    // Commands
269
270    /// Pause the node, this is implemented by pausing the
271    /// actual process (e.g polkadot) with sending `SIGSTOP` signal
272    ///
273    /// Note: If you're using this method with the native provider on the attached network, the live network has to be running
274    /// with global setting `teardown_on_failure` disabled.
275    pub async fn pause(&self) -> Result<(), anyhow::Error> {
276        self.set_is_running(false);
277        self.inner.pause().await?;
278        Ok(())
279    }
280
281    /// Resume the node, this is implemented by resuming the
282    /// actual process (e.g polkadot) with sending `SIGCONT` signal
283    ///
284    /// Note: If you're using this method with the native provider on the attached network, the live network has to be running
285    /// with global setting `teardown_on_failure` disabled.
286    pub async fn resume(&self) -> Result<(), anyhow::Error> {
287        self.set_is_running(true);
288        self.inner.resume().await?;
289        Ok(())
290    }
291
292    /// Restart the node using the same `cmd`, `args` and `env` (and same isolated dir)
293    ///
294    /// Note: If you're using this method with the native provider on the attached network, the live network has to be running
295    /// with global setting `teardown_on_failure` disabled.
296    pub async fn restart(&self, after: Option<Duration>) -> Result<(), anyhow::Error> {
297        self.set_is_running(false);
298        self.inner.restart(after).await?;
299        self.set_is_running(true);
300        Ok(())
301    }
302
303    /// Run a script inside the node's container/environment
304    ///
305    /// The script will be uploaded to the node, made executable, and executed with
306    /// the provided arguments and environment variables.
307    ///
308    /// Returns `Ok(stdout)` on success, or `Err((exit_status, stderr))` on failure.
309    pub async fn run_script(
310        &self,
311        options: RunScriptOptions,
312    ) -> Result<ExecutionResult, anyhow::Error> {
313        self.inner
314            .run_script(options)
315            .await
316            .map_err(|e| anyhow!("Failed to run script: {e}"))
317    }
318
319    // Metrics assertions
320
321    /// Get metric value 'by name' from Prometheus (exposed by the node)
322    /// metric name can be:
323    /// with prefix (e.g: 'polkadot_')
324    /// with chain attribute (e.g: 'chain=rococo-local')
325    /// without prefix and/or without chain attribute
326    pub async fn reports(&self, metric_name: impl Into<String>) -> Result<f64, anyhow::Error> {
327        let metric_name = metric_name.into();
328        // force cache reload
329        self.fetch_metrics().await?;
330        // by default we treat not found as 0 (same in v1)
331        self.metric(&metric_name, true).await
332    }
333
334    /// Assert on a metric value 'by name' from Prometheus (exposed by the node)
335    /// metric name can be:
336    /// with prefix (e.g: 'polkadot_')
337    /// with chain attribute (e.g: 'chain=rococo-local')
338    /// without prefix and/or without chain attribute
339    ///
340    /// We first try to assert on the value using the cached metrics and
341    /// if not meet the criteria we reload the cache and check again
342    pub async fn assert(
343        &self,
344        metric_name: impl Into<String>,
345        value: impl Into<f64>,
346    ) -> Result<bool, anyhow::Error> {
347        let value: f64 = value.into();
348        self.assert_with(metric_name, |v| v == value).await
349    }
350
351    /// Assert on a metric value using a given predicate.
352    /// See [`NetworkNode::reports`] description for details on metric name.
353    pub async fn assert_with(
354        &self,
355        metric_name: impl Into<String>,
356        predicate: impl Fn(f64) -> bool,
357    ) -> Result<bool, anyhow::Error> {
358        let metric_name = metric_name.into();
359        // reload metrics
360        self.fetch_metrics().await?;
361        let val = self.metric(&metric_name, true).await?;
362        trace!("🔎 Current value {val} passed to the predicated?");
363        Ok(predicate(val))
364    }
365
366    // Wait methods for metrics
367
368    /// Wait until a metric value pass the `predicate`
369    pub async fn wait_metric(
370        &self,
371        metric_name: impl Into<String>,
372        predicate: impl Fn(f64) -> bool,
373    ) -> Result<(), anyhow::Error> {
374        let metric_name = metric_name.into();
375        debug!("waiting until metric {metric_name} pass the predicate");
376        loop {
377            let res = self.assert_with(&metric_name, &predicate).await;
378            match res {
379                Ok(res) => {
380                    if res {
381                        return Ok(());
382                    }
383                },
384                Err(e) => match e.downcast::<reqwest::Error>() {
385                    Ok(io_err) => {
386                        if !skip_err_while_waiting(&io_err) {
387                            return Err(io_err.into());
388                        }
389                    },
390                    Err(other) => {
391                        match other.downcast::<NetworkNodeError>() {
392                            Ok(node_err) => {
393                                if !matches!(node_err, NetworkNodeError::MetricNotFound(_)) {
394                                    return Err(node_err.into());
395                                }
396                            },
397                            Err(other) => return Err(other),
398                        };
399                    },
400                },
401            }
402
403            // sleep to not spam prometheus
404            tokio::time::sleep(Duration::from_secs(1)).await;
405        }
406    }
407
408    /// Wait until a metric value pass the `predicate`
409    /// with a timeout (secs)
410    pub async fn wait_metric_with_timeout(
411        &self,
412        metric_name: impl Into<String>,
413        predicate: impl Fn(f64) -> bool,
414        timeout_secs: impl Into<u64>,
415    ) -> Result<(), anyhow::Error> {
416        let metric_name = metric_name.into();
417        let secs = timeout_secs.into();
418        debug!("waiting until metric {metric_name} pass the predicate");
419        let res = tokio::time::timeout(
420            Duration::from_secs(secs),
421            self.wait_metric(&metric_name, predicate),
422        )
423        .await;
424
425        if let Ok(inner_res) = res {
426            match inner_res {
427                Ok(_) => Ok(()),
428                Err(e) => Err(anyhow!("Error waiting for metric: {e}")),
429            }
430        } else {
431            // timeout
432            Err(anyhow!(
433                "Timeout ({secs}), waiting for metric {metric_name} pass the predicate"
434            ))
435        }
436    }
437
438    // Logs
439
440    /// Get the logs of the node
441    /// TODO: do we need the `since` param, maybe we could be handy later for loop filtering
442    pub async fn logs(&self) -> Result<String, anyhow::Error> {
443        Ok(self.inner.logs().await?)
444    }
445
446    /// Wait until a the number of matching log lines is reach
447    pub async fn wait_log_line_count(
448        &self,
449        pattern: impl Into<String>,
450        is_glob: bool,
451        count: usize,
452    ) -> Result<(), anyhow::Error> {
453        let pattern = pattern.into();
454        let pattern_clone = pattern.clone();
455        debug!("waiting until we find pattern {pattern} {count} times");
456        let match_fn: BoxedClosure = if is_glob {
457            Box::new(move |line: &str| Ok(glob_match(&pattern, line)))
458        } else {
459            let re = Regex::new(&pattern)?;
460            Box::new(move |line: &str| re.is_match(line).map_err(|e| anyhow!(e.to_string())))
461        };
462
463        loop {
464            let mut q = 0_usize;
465            let logs = self.logs().await?;
466            for line in logs.lines() {
467                trace!("line is {line}");
468                if match_fn(line)? {
469                    trace!("pattern {pattern_clone} match in line {line}");
470                    q += 1;
471                    if q >= count {
472                        return Ok(());
473                    }
474                }
475            }
476
477            tokio::time::sleep(Duration::from_secs(2)).await;
478        }
479    }
480
481    /// Waits until the number of matching log lines satisfies a custom condition,
482    /// optionally waiting for the entire duration of the timeout.
483    ///
484    /// This method searches log lines for a given substring or glob pattern,
485    /// and evaluates the number of matching lines using a user-provided predicate function.
486    /// Optionally, it can wait for the full timeout duration to ensure the condition
487    /// holds consistently (e.g., for verifying absence of logs).
488    ///
489    /// # Arguments
490    /// * `substring` - The substring or pattern to match within log lines.
491    /// * `is_glob` - Whether to treat `substring` as a glob pattern (`true`) or a regex (`false`).
492    /// * `options` - Configuration for timeout, match count predicate, and full-duration waiting.
493    ///
494    /// # Returns
495    /// * `Ok(LogLineCount::TargetReached(n))` if the predicate was satisfied within the timeout,
496    /// * `Ok(LogLineCount::TargetFails(n))` if the predicate was not satisfied in time,
497    /// * `Err(e)` if an error occurred during log retrieval or matching.
498    ///
499    /// # Example
500    /// ```rust
501    /// # use std::{sync::Arc, time::Duration};
502    /// # use provider::NativeProvider;
503    /// # use support::{fs::local::LocalFileSystem};
504    /// # use zombienet_orchestrator::{Orchestrator, network::node::{NetworkNode, LogLineCountOptions}};
505    /// # use configuration::NetworkConfig;
506    /// # async fn example() -> Result<(), anyhow::Error> {
507    /// #   let provider = NativeProvider::new(LocalFileSystem {});
508    /// #   let orchestrator = Orchestrator::new(LocalFileSystem {}, provider);
509    /// #   let config = NetworkConfig::load_from_toml("config.toml")?;
510    /// #   let network = orchestrator.spawn(config).await?;
511    /// let node = network.get_node("alice")?;
512    /// // Wait (up to 10 seconds) until pattern occurs once
513    /// let options = LogLineCountOptions {
514    ///     predicate: Arc::new(|count| count == 1),
515    ///     timeout: Duration::from_secs(10),
516    ///     wait_until_timeout_elapses: false,
517    /// };
518    /// let result = node
519    ///     .wait_log_line_count_with_timeout("error", false, options)
520    ///     .await?;
521    /// #   Ok(())
522    /// # }
523    /// ```
524    pub async fn wait_log_line_count_with_timeout(
525        &self,
526        substring: impl Into<String>,
527        is_glob: bool,
528        options: LogLineCountOptions,
529    ) -> Result<LogLineCount, anyhow::Error> {
530        let substring = substring.into();
531        debug!(
532            "waiting until match lines count within {} seconds",
533            options.timeout.as_secs_f64()
534        );
535
536        let start = tokio::time::Instant::now();
537
538        let match_fn: BoxedClosure = if is_glob {
539            Box::new(move |line: &str| Ok(glob_match(&substring, line)))
540        } else {
541            let re = Regex::new(&substring)?;
542            Box::new(move |line: &str| re.is_match(line).map_err(|e| anyhow!(e.to_string())))
543        };
544
545        if options.wait_until_timeout_elapses {
546            tokio::time::sleep(options.timeout).await;
547        }
548
549        let mut q;
550        loop {
551            q = 0_u32;
552            let logs = self.logs().await?;
553            for line in logs.lines() {
554                if match_fn(line)? {
555                    q += 1;
556
557                    // If `wait_until_timeout_elapses` is set then check the condition just once at the
558                    // end after the whole log file is processed. This is to address the cases when the
559                    // predicate becomes true and false again.
560                    // eg. expected exactly 2 matching lines are expected but 3 are present
561                    if !options.wait_until_timeout_elapses && (options.predicate)(q) {
562                        return Ok(LogLineCount::TargetReached(q));
563                    }
564                }
565            }
566
567            if start.elapsed() >= options.timeout {
568                break;
569            }
570
571            tokio::time::sleep(Duration::from_secs(2)).await;
572        }
573
574        if (options.predicate)(q) {
575            Ok(LogLineCount::TargetReached(q))
576        } else {
577            Ok(LogLineCount::TargetFailed(q))
578        }
579    }
580
581    async fn fetch_metrics(&self) -> Result<(), anyhow::Error> {
582        let response = reqwest::get(&self.prometheus_uri).await?;
583        let metrics = prom_metrics_parser::parse(&response.text().await?)?;
584        let mut cache = self.metrics_cache.write().await;
585        *cache = metrics;
586        Ok(())
587    }
588
589    /// Query individual metric by name
590    async fn metric(
591        &self,
592        metric_name: &str,
593        treat_not_found_as_zero: bool,
594    ) -> Result<f64, anyhow::Error> {
595        let mut metrics_map = self.metrics_cache.read().await;
596        if metrics_map.is_empty() {
597            // reload metrics
598            drop(metrics_map);
599            self.fetch_metrics().await?;
600            metrics_map = self.metrics_cache.read().await;
601        }
602
603        if let Some(val) = metrics_map.get(metric_name) {
604            Ok(*val)
605        } else if treat_not_found_as_zero {
606            Ok(0_f64)
607        } else {
608            Err(NetworkNodeError::MetricNotFound(metric_name.into()).into())
609        }
610    }
611
612    /// Fetches histogram buckets for a given metric from the Prometheus endpoint.
613    ///
614    /// This function retrieves histogram bucket data by parsing the Prometheus metrics
615    /// and calculating the count of observations in each bucket. It automatically appends
616    /// `_bucket` suffix to the metric name if not already present.
617    ///
618    /// # Arguments
619    /// * `metric_name` - The name of the histogram metric (with or without `_bucket` suffix)
620    /// * `label_filters` - Optional HashMap of label key-value pairs to filter metrics by
621    ///
622    /// # Returns
623    /// A HashMap where keys are the `le` bucket boundaries as strings,
624    /// and values are the count of observations in each bucket (calculated as delta from previous bucket).
625    ///
626    /// # Example
627    /// ```ignore
628    /// let buckets = node.get_histogram_buckets("polkadot_pvf_execution_time", None).await?;
629    /// // Returns: {"0.1": 5, "0.5": 10, "1.0": 3, "+Inf": 0}
630    /// ```
631    pub async fn get_histogram_buckets(
632        &self,
633        metric_name: impl AsRef<str>,
634        label_filters: Option<HashMap<String, String>>,
635    ) -> Result<HashMap<String, u64>, anyhow::Error> {
636        let metric_name = metric_name.as_ref();
637
638        // Fetch and parse metrics using the existing parser
639        let response = reqwest::get(&self.prometheus_uri).await?;
640        let metrics = prom_metrics_parser::parse(&response.text().await?)?;
641
642        // Ensure metric name has _bucket suffix
643        let resolved_metric_name = if metric_name.contains("_bucket") {
644            metric_name.to_string()
645        } else {
646            format!("{}_bucket", metric_name)
647        };
648
649        // First pass: collect all matching metrics with their label counts
650        // to identify which ones have the most complete label sets
651        // Each entry contains: (full_metric_key, parsed_labels_map, cumulative_count)
652        let mut metric_entries: Vec<(String, HashMap<String, String>, u64)> = Vec::new();
653
654        for (key, &value) in metrics.iter() {
655            if !key.starts_with(&resolved_metric_name) {
656                continue;
657            }
658
659            let remaining = &key[resolved_metric_name.len()..];
660
661            let labels_str = &remaining[1..remaining.len() - 1];
662            let parsed_labels = Self::parse_label_string(labels_str);
663
664            // Must have "le" label
665            if !parsed_labels.contains_key("le") {
666                continue;
667            }
668
669            // Check if label filters match
670            if let Some(ref filters) = label_filters {
671                let mut all_match = true;
672                for (filter_key, filter_value) in filters {
673                    if parsed_labels.get(filter_key) != Some(filter_value) {
674                        all_match = false;
675                        break;
676                    }
677                }
678                if !all_match {
679                    continue;
680                }
681            }
682
683            metric_entries.push((key.clone(), parsed_labels, value as u64));
684        }
685
686        // Find the maximum number of labels (excluding "le") across all entries
687        // This helps us identify the "fullest" version of each metric
688        let max_label_count = metric_entries
689            .iter()
690            .map(|(_, labels, _)| labels.iter().filter(|(k, _)| k.as_str() != "le").count())
691            .max()
692            .unwrap_or(0);
693
694        // Second pass: collect buckets, deduplicating and preferring entries with more labels
695        let mut raw_buckets: Vec<(String, u64)> = Vec::new();
696        let mut seen_le_values = HashSet::new();
697        let mut active_series: Option<Vec<(String, String)>> = None;
698
699        for (_, parsed_labels, value) in metric_entries {
700            let le_value = parsed_labels.get("le").unwrap().clone();
701
702            // Get non-"le" labels
703            let mut non_le_labels: Vec<(String, String)> = parsed_labels
704                .iter()
705                .filter(|(k, _)| k.as_str() != "le")
706                .map(|(k, v)| (k.clone(), v.clone()))
707                .collect();
708            non_le_labels.sort();
709
710            // Only process entries that have the maximum number of labels
711            // (this filters out the parser's duplicate keys with fewer labels)
712            if non_le_labels.len() < max_label_count {
713                continue;
714            }
715
716            // Detect series changes
717            if let Some(ref prev_series) = active_series {
718                if prev_series != &non_le_labels {
719                    if !raw_buckets.is_empty() {
720                        break; // Stop at first series change
721                    }
722                    active_series = Some(non_le_labels.clone());
723                    seen_le_values.clear();
724                }
725            } else {
726                active_series = Some(non_le_labels.clone());
727            }
728
729            // Deduplicate by le value within this series
730            if !seen_le_values.insert(le_value.clone()) {
731                continue;
732            }
733
734            trace!("{} le:{} {}", resolved_metric_name, &le_value, value);
735            raw_buckets.push((le_value, value));
736        }
737
738        // Sort buckets by their "le" values
739        raw_buckets.sort_by(|a, b| Self::compare_le_values(&a.0, &b.0));
740
741        // Calculate deltas between cumulative buckets
742        let mut buckets = HashMap::new();
743        let mut previous_value = 0_u64;
744        for (le, cumulative_count) in raw_buckets {
745            if cumulative_count < previous_value {
746                warn!(
747                    "Warning: bucket count decreased from {} to {} at le={}",
748                    previous_value, cumulative_count, le
749                );
750            }
751            let delta = cumulative_count.saturating_sub(previous_value);
752            buckets.insert(le, delta);
753            previous_value = cumulative_count;
754        }
755
756        Ok(buckets)
757    }
758
759    /// Parse label string from parsed metric key.
760    ///
761    /// Takes a label string in the format `key1="value1",key2="value2"`
762    /// and returns a HashMap of key-value pairs.
763    /// Handles commas inside quoted values correctly.
764    fn parse_label_string(labels_str: &str) -> HashMap<String, String> {
765        let mut labels = HashMap::new();
766        let mut current_key = String::new();
767        let mut current_value = String::new();
768        let mut in_value = false;
769        let mut in_quotes = false;
770
771        for ch in labels_str.chars() {
772            match ch {
773                '=' if !in_quotes && !in_value => {
774                    in_value = true;
775                },
776                '"' if in_value => {
777                    in_quotes = !in_quotes;
778                },
779                ',' if !in_quotes => {
780                    // End of key-value pair
781                    if !current_key.is_empty() {
782                        labels.insert(
783                            current_key.trim().to_string(),
784                            current_value.trim().to_string(),
785                        );
786                        current_key.clear();
787                        current_value.clear();
788                        in_value = false;
789                    }
790                },
791                _ => {
792                    if in_value {
793                        current_value.push(ch);
794                    } else {
795                        current_key.push(ch);
796                    }
797                },
798            }
799        }
800
801        // Insert last pair
802        if !current_key.is_empty() {
803            labels.insert(
804                current_key.trim().to_string(),
805                current_value.trim().to_string(),
806            );
807        }
808
809        labels
810    }
811
812    /// Compare two histogram bucket boundary values for sorting.
813    ///
814    /// Treats "+Inf" as the maximum value, otherwise compares numerically.
815    fn compare_le_values(a: &str, b: &str) -> std::cmp::Ordering {
816        use std::cmp::Ordering;
817
818        // Handle +Inf specially
819        match (a, b) {
820            ("+Inf", "+Inf") => Ordering::Equal,
821            ("+Inf", _) => Ordering::Greater,
822            (_, "+Inf") => Ordering::Less,
823            _ => {
824                // Try to parse as f64 for numeric comparison
825                match (a.parse::<f64>(), b.parse::<f64>()) {
826                    (Ok(a_val), Ok(b_val)) => a_val.partial_cmp(&b_val).unwrap_or(Ordering::Equal),
827                    // Fallback to string comparison if parsing fails
828                    _ => a.cmp(b),
829                }
830            },
831        }
832    }
833
834    /// Waits given number of seconds until node reports that it is up and running, which
835    /// is determined by metric 'process_start_time_seconds', which should appear,
836    /// when node finished booting up.
837    ///
838    ///
839    /// # Arguments
840    /// * `timeout_secs` - The number of seconds to wait.
841    ///
842    /// # Returns
843    /// * `Ok()` if the node is up before timeout occured.
844    /// * `Err(e)` if timeout or other error occurred while waiting.
845    pub async fn wait_until_is_up(
846        &self,
847        timeout_secs: impl Into<u64>,
848    ) -> Result<(), anyhow::Error> {
849        self.wait_metric_with_timeout("process_start_time_seconds", |b| b >= 1.0, timeout_secs)
850            .await
851            .map_err(|err| anyhow::anyhow!("{}: {:?}", self.name(), err))
852    }
853}
854
855impl std::fmt::Debug for NetworkNode {
856    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
857        f.debug_struct("NetworkNode")
858            .field("inner", &"inner_skipped")
859            .field("spec", &self.spec)
860            .field("name", &self.name)
861            .field("ws_uri", &self.ws_uri)
862            .field("prometheus_uri", &self.prometheus_uri)
863            .finish()
864    }
865}
866
867fn serialize_provider_node<S>(node: &DynNode, serializer: S) -> Result<S::Ok, S::Error>
868where
869    S: Serializer,
870{
871    erased_serde::serialize(node.as_ref(), serializer)
872}
873
874// TODO: mock and impl more unit tests
875#[cfg(test)]
876mod tests {
877    use std::{
878        path::{Path, PathBuf},
879        sync::{Arc, Mutex},
880    };
881
882    use async_trait::async_trait;
883    use provider::{types::*, ProviderError, ProviderNode};
884
885    use super::*;
886
887    #[derive(Serialize)]
888    struct MockNode {
889        logs: Arc<Mutex<Vec<String>>>,
890    }
891
892    impl MockNode {
893        fn new() -> Self {
894            Self {
895                logs: Arc::new(Mutex::new(vec![])),
896            }
897        }
898
899        fn logs_push(&self, lines: Vec<impl Into<String>>) {
900            self.logs
901                .lock()
902                .unwrap()
903                .extend(lines.into_iter().map(|l| l.into()));
904        }
905    }
906
907    #[async_trait]
908    impl ProviderNode for MockNode {
909        fn name(&self) -> &str {
910            todo!()
911        }
912
913        fn args(&self) -> Vec<&str> {
914            todo!()
915        }
916
917        fn base_dir(&self) -> &PathBuf {
918            todo!()
919        }
920
921        fn config_dir(&self) -> &PathBuf {
922            todo!()
923        }
924
925        fn data_dir(&self) -> &PathBuf {
926            todo!()
927        }
928
929        fn relay_data_dir(&self) -> &PathBuf {
930            todo!()
931        }
932
933        fn scripts_dir(&self) -> &PathBuf {
934            todo!()
935        }
936
937        fn log_path(&self) -> &PathBuf {
938            todo!()
939        }
940
941        fn log_cmd(&self) -> String {
942            todo!()
943        }
944
945        fn path_in_node(&self, _file: &Path) -> PathBuf {
946            todo!()
947        }
948
949        async fn logs(&self) -> Result<String, ProviderError> {
950            Ok(self.logs.lock().unwrap().join("\n"))
951        }
952
953        async fn dump_logs(&self, _local_dest: PathBuf) -> Result<(), ProviderError> {
954            todo!()
955        }
956
957        async fn run_command(
958            &self,
959            _options: RunCommandOptions,
960        ) -> Result<ExecutionResult, ProviderError> {
961            todo!()
962        }
963
964        async fn run_script(
965            &self,
966            _options: RunScriptOptions,
967        ) -> Result<ExecutionResult, ProviderError> {
968            todo!()
969        }
970
971        async fn send_file(
972            &self,
973            _local_file_path: &Path,
974            _remote_file_path: &Path,
975            _mode: &str,
976        ) -> Result<(), ProviderError> {
977            todo!()
978        }
979
980        async fn receive_file(
981            &self,
982            _remote_file_path: &Path,
983            _local_file_path: &Path,
984        ) -> Result<(), ProviderError> {
985            todo!()
986        }
987
988        async fn pause(&self) -> Result<(), ProviderError> {
989            todo!()
990        }
991
992        async fn resume(&self) -> Result<(), ProviderError> {
993            todo!()
994        }
995
996        async fn restart(&self, _after: Option<Duration>) -> Result<(), ProviderError> {
997            todo!()
998        }
999
1000        async fn destroy(&self) -> Result<(), ProviderError> {
1001            todo!()
1002        }
1003    }
1004
1005    #[tokio::test(flavor = "multi_thread")]
1006    async fn test_wait_log_count_target_reached_immediately() -> Result<(), anyhow::Error> {
1007        let mock_provider = Arc::new(MockNode::new());
1008        let mock_node = NetworkNode::new(
1009            "node1",
1010            "ws_uri",
1011            "prometheus_uri",
1012            "multiaddr",
1013            NodeSpec::default(),
1014            mock_provider.clone(),
1015        );
1016
1017        mock_provider.logs_push(vec![
1018            "system booting",
1019            "stub line 1",
1020            "stub line 2",
1021            "system ready",
1022        ]);
1023
1024        // Wait (up to 10 seconds) until pattern occurs once
1025        let options = LogLineCountOptions {
1026            predicate: Arc::new(|n| n == 1),
1027            timeout: Duration::from_secs(10),
1028            wait_until_timeout_elapses: false,
1029        };
1030
1031        let log_line_count = mock_node
1032            .wait_log_line_count_with_timeout("system ready", false, options)
1033            .await?;
1034
1035        assert!(matches!(log_line_count, LogLineCount::TargetReached(1)));
1036
1037        Ok(())
1038    }
1039
1040    #[tokio::test(flavor = "multi_thread")]
1041    async fn test_wait_log_count_target_reached_after_delay() -> Result<(), anyhow::Error> {
1042        let mock_provider = Arc::new(MockNode::new());
1043        let mock_node = NetworkNode::new(
1044            "node1",
1045            "ws_uri",
1046            "prometheus_uri",
1047            "multiaddr",
1048            NodeSpec::default(),
1049            mock_provider.clone(),
1050        );
1051
1052        mock_provider.logs_push(vec![
1053            "system booting",
1054            "stub line 1",
1055            "stub line 2",
1056            "system ready",
1057        ]);
1058
1059        // Wait (up to 4 seconds) until pattern occurs twice
1060        let options = LogLineCountOptions {
1061            predicate: Arc::new(|n| n == 2),
1062            timeout: Duration::from_secs(4),
1063            wait_until_timeout_elapses: false,
1064        };
1065
1066        let task = tokio::spawn({
1067            async move {
1068                mock_node
1069                    .wait_log_line_count_with_timeout("system ready", false, options)
1070                    .await
1071                    .unwrap()
1072            }
1073        });
1074
1075        tokio::time::sleep(Duration::from_secs(2)).await;
1076
1077        mock_provider.logs_push(vec!["system ready"]);
1078
1079        let log_line_count = task.await?;
1080
1081        assert!(matches!(log_line_count, LogLineCount::TargetReached(2)));
1082
1083        Ok(())
1084    }
1085
1086    #[tokio::test(flavor = "multi_thread")]
1087    async fn test_wait_log_count_target_failed_timeout() -> Result<(), anyhow::Error> {
1088        let mock_provider = Arc::new(MockNode::new());
1089        let mock_node = NetworkNode::new(
1090            "node1",
1091            "ws_uri",
1092            "prometheus_uri",
1093            "multiaddr",
1094            NodeSpec::default(),
1095            mock_provider.clone(),
1096        );
1097
1098        mock_provider.logs_push(vec![
1099            "system booting",
1100            "stub line 1",
1101            "stub line 2",
1102            "system ready",
1103        ]);
1104
1105        // Wait (up to 2 seconds) until pattern occurs twice
1106        let options = LogLineCountOptions {
1107            predicate: Arc::new(|n| n == 2),
1108            timeout: Duration::from_secs(2),
1109            wait_until_timeout_elapses: false,
1110        };
1111
1112        let log_line_count = mock_node
1113            .wait_log_line_count_with_timeout("system ready", false, options)
1114            .await?;
1115
1116        assert!(matches!(log_line_count, LogLineCount::TargetFailed(1)));
1117
1118        Ok(())
1119    }
1120
1121    #[tokio::test(flavor = "multi_thread")]
1122    async fn test_wait_log_count_target_failed_exceeded() -> Result<(), anyhow::Error> {
1123        let mock_provider = Arc::new(MockNode::new());
1124        let mock_node = NetworkNode::new(
1125            "node1",
1126            "ws_uri",
1127            "prometheus_uri",
1128            "multiaddr",
1129            NodeSpec::default(),
1130            mock_provider.clone(),
1131        );
1132
1133        mock_provider.logs_push(vec![
1134            "system booting",
1135            "stub line 1",
1136            "stub line 2",
1137            "system ready",
1138        ]);
1139
1140        // Wait until timeout and check if pattern occurs exactly twice
1141        let options = LogLineCountOptions {
1142            predicate: Arc::new(|n| n == 2),
1143            timeout: Duration::from_secs(2),
1144            wait_until_timeout_elapses: true,
1145        };
1146
1147        let task = tokio::spawn({
1148            async move {
1149                mock_node
1150                    .wait_log_line_count_with_timeout("system ready", false, options)
1151                    .await
1152                    .unwrap()
1153            }
1154        });
1155
1156        tokio::time::sleep(Duration::from_secs(1)).await;
1157
1158        mock_provider.logs_push(vec!["system ready"]);
1159        mock_provider.logs_push(vec!["system ready"]);
1160
1161        let log_line_count = task.await?;
1162
1163        assert!(matches!(log_line_count, LogLineCount::TargetFailed(3)));
1164
1165        Ok(())
1166    }
1167
1168    #[tokio::test(flavor = "multi_thread")]
1169    async fn test_wait_log_count_target_reached_no_occurences() -> Result<(), anyhow::Error> {
1170        let mock_provider = Arc::new(MockNode::new());
1171        let mock_node = NetworkNode::new(
1172            "node1",
1173            "ws_uri",
1174            "prometheus_uri",
1175            "multiaddr",
1176            NodeSpec::default(),
1177            mock_provider.clone(),
1178        );
1179
1180        mock_provider.logs_push(vec!["system booting", "stub line 1", "stub line 2"]);
1181
1182        let task = tokio::spawn({
1183            async move {
1184                mock_node
1185                    .wait_log_line_count_with_timeout(
1186                        "system ready",
1187                        false,
1188                        // Wait until timeout and make sure pattern occurred zero times
1189                        LogLineCountOptions::no_occurences_within_timeout(Duration::from_secs(2)),
1190                    )
1191                    .await
1192                    .unwrap()
1193            }
1194        });
1195
1196        tokio::time::sleep(Duration::from_secs(1)).await;
1197
1198        mock_provider.logs_push(vec!["stub line 3"]);
1199
1200        assert!(task.await?.success());
1201
1202        Ok(())
1203    }
1204
1205    #[tokio::test(flavor = "multi_thread")]
1206    async fn test_wait_log_count_target_reached_in_range() -> Result<(), anyhow::Error> {
1207        let mock_provider = Arc::new(MockNode::new());
1208        let mock_node = NetworkNode::new(
1209            "node1",
1210            "ws_uri",
1211            "prometheus_uri",
1212            "multiaddr",
1213            NodeSpec::default(),
1214            mock_provider.clone(),
1215        );
1216
1217        mock_provider.logs_push(vec!["system booting", "stub line 1", "stub line 2"]);
1218
1219        // Wait until timeout and make sure pattern occurrence count is in range between 2 and 5
1220        let options = LogLineCountOptions {
1221            predicate: Arc::new(|n| (2..=5).contains(&n)),
1222            timeout: Duration::from_secs(2),
1223            wait_until_timeout_elapses: true,
1224        };
1225
1226        let task = tokio::spawn({
1227            async move {
1228                mock_node
1229                    .wait_log_line_count_with_timeout("system ready", false, options)
1230                    .await
1231                    .unwrap()
1232            }
1233        });
1234
1235        tokio::time::sleep(Duration::from_secs(1)).await;
1236
1237        mock_provider.logs_push(vec!["system ready", "system ready", "system ready"]);
1238
1239        assert!(task.await?.success());
1240
1241        Ok(())
1242    }
1243
1244    #[tokio::test(flavor = "multi_thread")]
1245    async fn test_wait_log_count_with_timeout_with_lookahead_regex() -> Result<(), anyhow::Error> {
1246        let mock_provider = Arc::new(MockNode::new());
1247        let mock_node = NetworkNode::new(
1248            "node1",
1249            "ws_uri",
1250            "prometheus_uri",
1251            "multiaddr",
1252            NodeSpec::default(),
1253            mock_provider.clone(),
1254        );
1255
1256        mock_provider.logs_push(vec![
1257            "system booting",
1258            "stub line 1",
1259            // this line should not match
1260            "Error importing block 0xfd66e545c446b1c01205503130b816af0ec2c0e504a8472808e6ff4a644ce1fa: block has an unknown parent",
1261            "stub line 2"
1262        ]);
1263
1264        let options = LogLineCountOptions {
1265            predicate: Arc::new(|n| n == 1),
1266            timeout: Duration::from_secs(3),
1267            wait_until_timeout_elapses: true,
1268        };
1269
1270        let task = tokio::spawn({
1271            async move {
1272                mock_node
1273                    .wait_log_line_count_with_timeout(
1274                        "error(?! importing block .*: block has an unknown parent)",
1275                        false,
1276                        options,
1277                    )
1278                    .await
1279                    .unwrap()
1280            }
1281        });
1282
1283        tokio::time::sleep(Duration::from_secs(1)).await;
1284
1285        mock_provider.logs_push(vec![
1286            "system ready",
1287            // this line should match
1288            "system error",
1289            "system ready",
1290        ]);
1291
1292        assert!(task.await?.success());
1293
1294        Ok(())
1295    }
1296
1297    #[tokio::test(flavor = "multi_thread")]
1298    async fn test_wait_log_count_with_timeout_with_lookahead_regex_fails(
1299    ) -> Result<(), anyhow::Error> {
1300        let mock_provider = Arc::new(MockNode::new());
1301        let mock_node = NetworkNode::new(
1302            "node1",
1303            "ws_uri",
1304            "prometheus_uri",
1305            "multiaddr",
1306            NodeSpec::default(),
1307            mock_provider.clone(),
1308        );
1309
1310        mock_provider.logs_push(vec![
1311            "system booting",
1312            "stub line 1",
1313            // this line should not match
1314            "Error importing block 0xfd66e545c446b1c01205503130b816af0ec2c0e504a8472808e6ff4a644ce1fa: block has an unknown parent",
1315            "stub line 2"
1316        ]);
1317
1318        let options = LogLineCountOptions {
1319            predicate: Arc::new(|n| n == 1),
1320            timeout: Duration::from_secs(6),
1321            wait_until_timeout_elapses: true,
1322        };
1323
1324        let task = tokio::spawn({
1325            async move {
1326                mock_node
1327                    .wait_log_line_count_with_timeout(
1328                        "error(?! importing block .*: block has an unknown parent)",
1329                        false,
1330                        options,
1331                    )
1332                    .await
1333                    .unwrap()
1334            }
1335        });
1336
1337        tokio::time::sleep(Duration::from_secs(1)).await;
1338
1339        mock_provider.logs_push(vec!["system ready", "system ready"]);
1340
1341        assert!(!task.await?.success());
1342
1343        Ok(())
1344    }
1345
1346    #[tokio::test(flavor = "multi_thread")]
1347    async fn test_wait_log_count_with_lockahead_regex() -> Result<(), anyhow::Error> {
1348        let mock_provider = Arc::new(MockNode::new());
1349        let mock_node = NetworkNode::new(
1350            "node1",
1351            "ws_uri",
1352            "prometheus_uri",
1353            "multiaddr",
1354            NodeSpec::default(),
1355            mock_provider.clone(),
1356        );
1357
1358        mock_provider.logs_push(vec![
1359            "system booting",
1360            "stub line 1",
1361            // this line should not match
1362            "Error importing block 0xfd66e545c446b1c01205503130b816af0ec2c0e504a8472808e6ff4a644ce1fa: block has an unknown parent",
1363            "stub line 2"
1364        ]);
1365
1366        let task = tokio::spawn({
1367            async move {
1368                mock_node
1369                    .wait_log_line_count(
1370                        "error(?! importing block .*: block has an unknown parent)",
1371                        false,
1372                        1,
1373                    )
1374                    .await
1375                    .unwrap()
1376            }
1377        });
1378
1379        tokio::time::sleep(Duration::from_secs(1)).await;
1380
1381        mock_provider.logs_push(vec![
1382            "system ready",
1383            // this line should match
1384            "system error",
1385            "system ready",
1386        ]);
1387
1388        assert!(task.await.is_ok());
1389
1390        Ok(())
1391    }
1392
1393    #[tokio::test(flavor = "multi_thread")]
1394    async fn test_wait_log_count_with_lookahead_regex_fails() -> Result<(), anyhow::Error> {
1395        let mock_provider = Arc::new(MockNode::new());
1396        let mock_node = NetworkNode::new(
1397            "node1",
1398            "ws_uri",
1399            "prometheus_uri",
1400            "multiaddr",
1401            NodeSpec::default(),
1402            mock_provider.clone(),
1403        );
1404
1405        mock_provider.logs_push(vec![
1406            "system booting",
1407            "stub line 1",
1408            // this line should not match
1409            "Error importing block 0xfd66e545c446b1c01205503130b816af0ec2c0e504a8472808e6ff4a644ce1fa: block has an unknown parent",
1410            "stub line 2"
1411        ]);
1412
1413        let options = LogLineCountOptions {
1414            predicate: Arc::new(|count| count == 1),
1415            timeout: Duration::from_secs(2),
1416            wait_until_timeout_elapses: true,
1417        };
1418
1419        let task = tokio::spawn({
1420            async move {
1421                // we expect no match, thus wait with timeout
1422                mock_node
1423                    .wait_log_line_count_with_timeout(
1424                        "error(?! importing block .*: block has an unknown parent)",
1425                        false,
1426                        options,
1427                    )
1428                    .await
1429                    .unwrap()
1430            }
1431        });
1432
1433        tokio::time::sleep(Duration::from_secs(1)).await;
1434
1435        mock_provider.logs_push(vec!["system ready", "system ready"]);
1436
1437        assert!(!task.await?.success());
1438
1439        Ok(())
1440    }
1441
1442    #[tokio::test]
1443    async fn test_get_histogram_buckets_parsing() -> Result<(), anyhow::Error> {
1444        // This test uses a mock HTTP server to simulate Prometheus metrics
1445        use std::sync::Arc;
1446
1447        // Create a mock metrics response with proper HELP and TYPE comments
1448        let mock_metrics = concat!(
1449            "# HELP substrate_block_verification_time Time taken to verify blocks\n",
1450            "# TYPE substrate_block_verification_time histogram\n",
1451            "substrate_block_verification_time_bucket{chain=\"rococo_local_testnet\",le=\"0.1\"} 10\n",
1452            "substrate_block_verification_time_bucket{chain=\"rococo_local_testnet\",le=\"0.5\"} 25\n",
1453            "substrate_block_verification_time_bucket{chain=\"rococo_local_testnet\",le=\"1.0\"} 35\n",
1454            "substrate_block_verification_time_bucket{chain=\"rococo_local_testnet\",le=\"2.5\"} 40\n",
1455            "substrate_block_verification_time_bucket{chain=\"rococo_local_testnet\",le=\"+Inf\"} 42\n",
1456            "substrate_block_verification_time_sum{chain=\"rococo_local_testnet\"} 45.5\n",
1457            "substrate_block_verification_time_count{chain=\"rococo_local_testnet\"} 42\n",
1458        );
1459
1460        // Start a mock HTTP server
1461        let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await?;
1462        let addr = listener.local_addr()?;
1463        let metrics = Arc::new(mock_metrics.to_string());
1464
1465        tokio::spawn({
1466            let metrics = metrics.clone();
1467            async move {
1468                loop {
1469                    if let Ok((mut socket, _)) = listener.accept().await {
1470                        let metrics = metrics.clone();
1471                        tokio::spawn(async move {
1472                            use tokio::io::{AsyncReadExt, AsyncWriteExt};
1473                            let mut buffer = [0; 1024];
1474                            let _ = socket.read(&mut buffer).await;
1475
1476                            let response = format!(
1477                                "HTTP/1.1 200 OK\r\nContent-Length: {}\r\n\r\n{}",
1478                                metrics.len(),
1479                                metrics
1480                            );
1481                            let _ = socket.write_all(response.as_bytes()).await;
1482                        });
1483                    }
1484                }
1485            }
1486        });
1487
1488        // Create a NetworkNode with the mock prometheus URI
1489        let mock_provider = Arc::new(MockNode::new());
1490        let mock_node = NetworkNode::new(
1491            "test_node",
1492            "ws://localhost:9944",
1493            &format!("http://127.0.0.1:{}/metrics", addr.port()),
1494            "/ip4/127.0.0.1/tcp/30333",
1495            NodeSpec::default(),
1496            mock_provider,
1497        );
1498
1499        // Get buckets with label filter
1500        let mut label_filters = HashMap::new();
1501        label_filters.insert("chain".to_string(), "rococo_local_testnet".to_string());
1502        let buckets = mock_node
1503            .get_histogram_buckets("substrate_block_verification_time", Some(label_filters))
1504            .await?;
1505
1506        // Should get the rococo_local_testnet chain's buckets
1507        assert_eq!(buckets.get("0.1"), Some(&10));
1508        assert_eq!(buckets.get("0.5"), Some(&15)); // 25 - 10
1509        assert_eq!(buckets.get("1.0"), Some(&10)); // 35 - 25
1510        assert_eq!(buckets.get("2.5"), Some(&5)); // 40 - 35
1511        assert_eq!(buckets.get("+Inf"), Some(&2)); // 42 - 40
1512
1513        // Get buckets with label filter for rococo
1514        let mut label_filters = std::collections::HashMap::new();
1515        label_filters.insert("chain".to_string(), "rococo_local_testnet".to_string());
1516
1517        let buckets_filtered = mock_node
1518            .get_histogram_buckets("substrate_block_verification_time", Some(label_filters))
1519            .await?;
1520
1521        assert_eq!(buckets_filtered.get("0.1"), Some(&10));
1522        assert_eq!(buckets_filtered.get("0.5"), Some(&15));
1523
1524        // Test 3: Get buckets with _bucket suffix already present
1525        let buckets_with_suffix = mock_node
1526            .get_histogram_buckets("substrate_block_verification_time_bucket", None)
1527            .await?;
1528
1529        assert_eq!(buckets_with_suffix.get("0.1"), Some(&10));
1530
1531        Ok(())
1532    }
1533
1534    #[tokio::test]
1535    async fn test_get_histogram_buckets_unordered() -> Result<(), anyhow::Error> {
1536        // Test that buckets are correctly sorted even when received out of order
1537        use std::sync::Arc;
1538
1539        let mock_metrics = concat!(
1540            "# HELP test_metric A test metric\n",
1541            "# TYPE test_metric histogram\n",
1542            "test_metric_bucket{le=\"2.5\"} 40\n",
1543            "test_metric_bucket{le=\"0.1\"} 10\n",
1544            "test_metric_bucket{le=\"+Inf\"} 42\n",
1545            "test_metric_bucket{le=\"1.0\"} 35\n",
1546            "test_metric_bucket{le=\"0.5\"} 25\n",
1547        );
1548
1549        let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await?;
1550        let addr = listener.local_addr()?;
1551        let metrics = Arc::new(mock_metrics.to_string());
1552
1553        tokio::spawn({
1554            let metrics = metrics.clone();
1555            async move {
1556                loop {
1557                    if let Ok((mut socket, _)) = listener.accept().await {
1558                        let metrics = metrics.clone();
1559                        tokio::spawn(async move {
1560                            use tokio::io::{AsyncReadExt, AsyncWriteExt};
1561                            let mut buffer = [0; 1024];
1562                            let _ = socket.read(&mut buffer).await;
1563                            let response = format!(
1564                                "HTTP/1.1 200 OK\r\nContent-Length: {}\r\n\r\n{}",
1565                                metrics.len(),
1566                                metrics
1567                            );
1568                            let _ = socket.write_all(response.as_bytes()).await;
1569                        });
1570                    }
1571                }
1572            }
1573        });
1574
1575        let mock_provider = Arc::new(MockNode::new());
1576        let mock_node = NetworkNode::new(
1577            "test_node",
1578            "ws://localhost:9944",
1579            &format!("http://127.0.0.1:{}/metrics", addr.port()),
1580            "/ip4/127.0.0.1/tcp/30333",
1581            NodeSpec::default(),
1582            mock_provider,
1583        );
1584
1585        let buckets = mock_node.get_histogram_buckets("test_metric", None).await?;
1586
1587        // Verify deltas are calculated correctly after sorting
1588        assert_eq!(buckets.get("0.1"), Some(&10)); // 10 - 0
1589        assert_eq!(buckets.get("0.5"), Some(&15)); // 25 - 10
1590        assert_eq!(buckets.get("1.0"), Some(&10)); // 35 - 25
1591        assert_eq!(buckets.get("2.5"), Some(&5)); // 40 - 35
1592        assert_eq!(buckets.get("+Inf"), Some(&2)); // 42 - 40
1593
1594        Ok(())
1595    }
1596
1597    #[tokio::test]
1598    async fn test_get_histogram_buckets_complex_labels() -> Result<(), anyhow::Error> {
1599        // Test label parsing with commas and special characters in values
1600        use std::sync::Arc;
1601
1602        let mock_metrics = concat!(
1603            "# HELP test_metric A test metric\n",
1604            "# TYPE test_metric histogram\n",
1605            "test_metric_bucket{method=\"GET,POST\",path=\"/api/test\",le=\"0.1\"} 5\n",
1606            "test_metric_bucket{method=\"GET,POST\",path=\"/api/test\",le=\"0.5\"} 15\n",
1607            "test_metric_bucket{method=\"GET,POST\",path=\"/api/test\",le=\"+Inf\"} 20\n",
1608        );
1609
1610        let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await?;
1611        let addr = listener.local_addr()?;
1612        let metrics = Arc::new(mock_metrics.to_string());
1613
1614        tokio::spawn({
1615            let metrics = metrics.clone();
1616            async move {
1617                loop {
1618                    if let Ok((mut socket, _)) = listener.accept().await {
1619                        let metrics = metrics.clone();
1620                        tokio::spawn(async move {
1621                            use tokio::io::{AsyncReadExt, AsyncWriteExt};
1622                            let mut buffer = [0; 1024];
1623                            let _ = socket.read(&mut buffer).await;
1624                            let response = format!(
1625                                "HTTP/1.1 200 OK\r\nContent-Length: {}\r\n\r\n{}",
1626                                metrics.len(),
1627                                metrics
1628                            );
1629                            let _ = socket.write_all(response.as_bytes()).await;
1630                        });
1631                    }
1632                }
1633            }
1634        });
1635
1636        let mock_provider = Arc::new(MockNode::new());
1637        let mock_node = NetworkNode::new(
1638            "test_node",
1639            "ws://localhost:9944",
1640            &format!("http://127.0.0.1:{}/metrics", addr.port()),
1641            "/ip4/127.0.0.1/tcp/30333",
1642            NodeSpec::default(),
1643            mock_provider,
1644        );
1645
1646        // Test without filter
1647        let buckets = mock_node.get_histogram_buckets("test_metric", None).await?;
1648        assert_eq!(buckets.get("0.1"), Some(&5));
1649        assert_eq!(buckets.get("0.5"), Some(&10)); // 15 - 5
1650        assert_eq!(buckets.get("+Inf"), Some(&5)); // 20 - 15
1651
1652        // Test with filter containing comma in value
1653        let mut label_filters = std::collections::HashMap::new();
1654        label_filters.insert("method".to_string(), "GET,POST".to_string());
1655
1656        let buckets_filtered = mock_node
1657            .get_histogram_buckets("test_metric", Some(label_filters))
1658            .await?;
1659
1660        assert_eq!(buckets_filtered.get("0.1"), Some(&5));
1661        assert_eq!(buckets_filtered.get("0.5"), Some(&10));
1662
1663        Ok(())
1664    }
1665
1666    #[test]
1667    fn test_compare_le_values() {
1668        use std::cmp::Ordering;
1669
1670        use crate::network::node::NetworkNode;
1671
1672        // Numeric comparison
1673        assert_eq!(NetworkNode::compare_le_values("0.1", "0.5"), Ordering::Less);
1674        assert_eq!(
1675            NetworkNode::compare_le_values("1.0", "0.5"),
1676            Ordering::Greater
1677        );
1678        assert_eq!(
1679            NetworkNode::compare_le_values("1.0", "1.0"),
1680            Ordering::Equal
1681        );
1682
1683        // +Inf handling
1684        assert_eq!(
1685            NetworkNode::compare_le_values("+Inf", "999"),
1686            Ordering::Greater
1687        );
1688        assert_eq!(
1689            NetworkNode::compare_le_values("0.1", "+Inf"),
1690            Ordering::Less
1691        );
1692        assert_eq!(
1693            NetworkNode::compare_le_values("+Inf", "+Inf"),
1694            Ordering::Equal
1695        );
1696
1697        // Large numbers
1698        assert_eq!(NetworkNode::compare_le_values("10", "100"), Ordering::Less);
1699        assert_eq!(
1700            NetworkNode::compare_le_values("1000", "999"),
1701            Ordering::Greater
1702        );
1703    }
1704}