Skip to main content

streamling_e2e/resources/
prometheus.rs

1//! Prometheus resource for querying metrics in e2e tests.
2
3use crate::{E2eError, Result};
4use serde::Deserialize;
5use tracing::{debug, warn};
6
/// Handle to a Prometheus server used by e2e tests to query metrics.
///
/// All three fields hold plain URL strings. [`PrometheusResource::new`] derives the two
/// endpoint fields from the base URL.
#[derive(Debug, Clone)]
pub struct PrometheusResource {
    /// Base URL for Prometheus (e.g., http://localhost:30090)
    pub url: String,
    /// Query endpoint (e.g., http://localhost:30090/api/v1/query)
    pub query_endpoint: String,
    /// Metrics ingestion endpoint for OTLP (e.g., http://localhost:30090/api/v1/otlp/v1/metrics)
    pub ingestion_endpoint: String,
}
16
17impl PrometheusResource {
18    /// Create a new Prometheus resource from a base URL
19    pub fn new(base_url: &str) -> Self {
20        let url = base_url.trim_end_matches('/').to_string();
21        Self {
22            query_endpoint: format!("{}/api/v1/query", url),
23            ingestion_endpoint: format!("{}/api/v1/otlp/v1/metrics", url),
24            url,
25        }
26    }
27
28    /// Query Prometheus for a metric value
29    pub async fn query(&self, query: &str) -> Result<Option<f64>> {
30        let encoded = urlencoding::encode(query);
31        let full_url = format!("{}?query={}", self.query_endpoint, encoded);
32
33        debug!("Prometheus query: {} -> {}", query, full_url);
34
35        let client = reqwest::Client::builder()
36            .timeout(std::time::Duration::from_secs(10))
37            .build()
38            .map_err(|e| E2eError::Prometheus(format!("Failed to build client: {}", e)))?;
39
40        let resp =
41            client.get(&full_url).send().await.map_err(|e| {
42                E2eError::Prometheus(format!("Query failed: {} url={}", e, full_url))
43            })?;
44
45        let status = resp.status();
46        let body = resp
47            .text()
48            .await
49            .map_err(|e| E2eError::Prometheus(format!("Failed to read response: {}", e)))?;
50
51        if !status.is_success() {
52            return Err(E2eError::Prometheus(format!(
53                "Query failed with status {}: {}",
54                status, body
55            )));
56        }
57
58        // Parse the Prometheus response
59        let response: PrometheusResponse = serde_json::from_str(&body).map_err(|e| {
60            E2eError::Prometheus(format!("Failed to parse response: {} body={}", e, body))
61        })?;
62
63        if response.status != "success" {
64            return Err(E2eError::Prometheus(format!(
65                "Query returned error status: {:?}",
66                response
67            )));
68        }
69
70        // Extract the value from the result
71        if let Some(result) = response.data.result.first() {
72            if let Some(value) = result.value.get(1) {
73                if let Some(s) = value.as_str() {
74                    return Ok(s.parse::<f64>().ok());
75                }
76            }
77        }
78
79        Ok(None)
80    }
81
82    /// Query for a metric and return as u64
83    pub async fn query_count(&self, query: &str) -> Result<Option<u64>> {
84        self.query(query).await.map(|v| v.map(|f| f as u64))
85    }
86
87    /// Build a query for output rows total metric
88    pub fn output_rows_query(node_id: &str, instance_id: Option<&str>) -> String {
89        Self::build_metric_query("streamling_output_rows_total", node_id, instance_id)
90    }
91
92    /// Build a query for input rows total metric
93    pub fn input_rows_query(node_id: &str, instance_id: Option<&str>) -> String {
94        Self::build_metric_query("streamling_input_rows_total", node_id, instance_id)
95    }
96
97    /// Build a query for elapsed compute metric
98    pub fn elapsed_compute_query(node_id: &str, instance_id: Option<&str>) -> String {
99        Self::build_metric_query(
100            "streamling_elapsed_compute_milliseconds_sum",
101            node_id,
102            instance_id,
103        )
104    }
105
106    /// Build a query for a checkpoint coordinator metric (counter total).
107    /// Coordinator metrics use `id="checkpoint_coordinator"`.
108    pub fn checkpoint_coordinator_query(metric_name: &str, instance_id: Option<&str>) -> String {
109        Self::build_metric_query(metric_name, "checkpoint_coordinator", instance_id)
110    }
111
112    /// Build a query for a checkpoint histogram metric (sum) by node id.
113    pub fn checkpoint_histogram_query(
114        metric_name: &str,
115        node_id: &str,
116        instance_id: Option<&str>,
117    ) -> String {
118        Self::build_metric_query(&format!("{}_sum", metric_name), node_id, instance_id)
119    }
120
121    /// Build a metric query with labels
122    /// Note: instance_id maps to the `instance` label in Prometheus (from OTLP resource attribute)
123    fn build_metric_query(metric_name: &str, node_id: &str, instance_id: Option<&str>) -> String {
124        let mut labels = format!("id=\"{}\"", node_id);
125        if let Some(instance) = instance_id {
126            labels.push_str(&format!(",instance=\"{}\"", instance));
127        }
128        format!("{}{{{}}}", metric_name, labels)
129    }
130
131    /// Wait for a metric to reach at least a certain value
132    /// Returns the actual value when the threshold is reached
133    pub async fn wait_for_metric_at_least(
134        &self,
135        query: &str,
136        min_value: u64,
137        timeout_secs: u64,
138        poll_interval_ms: u64,
139    ) -> Result<u64> {
140        let start = std::time::Instant::now();
141        let timeout = std::time::Duration::from_secs(timeout_secs);
142        let poll_interval = std::time::Duration::from_millis(poll_interval_ms);
143
144        loop {
145            if start.elapsed() >= timeout {
146                return Err(E2eError::Prometheus(format!(
147                    "Timeout waiting for metric {} to reach {}",
148                    query, min_value
149                )));
150            }
151
152            if let Some(count) = self.query_count(query).await? {
153                if count >= min_value {
154                    return Ok(count);
155                }
156                debug!(
157                    "Metric {} = {}, waiting for at least {}",
158                    query, count, min_value
159                );
160            }
161
162            tokio::time::sleep(poll_interval).await;
163        }
164    }
165
166    /// Assert that a metric reaches at least a certain value
167    pub async fn assert_metric_at_least(
168        &self,
169        query: &str,
170        expected: u64,
171        timeout_secs: u64,
172    ) -> Result<()> {
173        match self
174            .wait_for_metric_at_least(query, expected, timeout_secs, 500)
175            .await
176        {
177            Ok(actual) => {
178                debug!(
179                    "Metric {} reached {} (expected at least {})",
180                    query, actual, expected
181                );
182                Ok(())
183            }
184            Err(e) => {
185                // Try one more query to get the actual value for the error message
186                let actual = self.query_count(query).await.ok().flatten();
187                warn!(
188                    "Metric assertion failed: {} expected at least {}, got {:?}",
189                    query, expected, actual
190                );
191                Err(e)
192            }
193        }
194    }
195}
196
197#[derive(Debug, Deserialize)]
198struct PrometheusResponse {
199    status: String,
200    data: PrometheusData,
201}
202
203#[derive(Debug, Deserialize)]
204struct PrometheusData {
205    #[serde(default)]
206    result: Vec<PrometheusResult>,
207}
208
209#[derive(Debug, Deserialize)]
210struct PrometheusResult {
211    #[serde(default)]
212    value: Vec<serde_json::Value>,
213}