streamling_e2e/resources/
prometheus.rs1use crate::{E2eError, Result};
4use serde::Deserialize;
5use tracing::{debug, warn};
6
/// Handle to a Prometheus server used by the end-to-end tests.
///
/// Usually built with [`PrometheusResource::new`], which derives both endpoint URLs from
/// the base URL.
#[derive(Debug, Clone)]
pub struct PrometheusResource {
    /// Base URL of the server (`new` strips any trailing `/`).
    pub url: String,
    /// Instant-query endpoint; `new` sets it to `<url>/api/v1/query`.
    pub query_endpoint: String,
    /// OTLP metrics ingestion endpoint; `new` sets it to `<url>/api/v1/otlp/v1/metrics`.
    pub ingestion_endpoint: String,
}
16
17impl PrometheusResource {
18 pub fn new(base_url: &str) -> Self {
20 let url = base_url.trim_end_matches('/').to_string();
21 Self {
22 query_endpoint: format!("{}/api/v1/query", url),
23 ingestion_endpoint: format!("{}/api/v1/otlp/v1/metrics", url),
24 url,
25 }
26 }
27
28 pub async fn query(&self, query: &str) -> Result<Option<f64>> {
30 let encoded = urlencoding::encode(query);
31 let full_url = format!("{}?query={}", self.query_endpoint, encoded);
32
33 debug!("Prometheus query: {} -> {}", query, full_url);
34
35 let client = reqwest::Client::builder()
36 .timeout(std::time::Duration::from_secs(10))
37 .build()
38 .map_err(|e| E2eError::Prometheus(format!("Failed to build client: {}", e)))?;
39
40 let resp =
41 client.get(&full_url).send().await.map_err(|e| {
42 E2eError::Prometheus(format!("Query failed: {} url={}", e, full_url))
43 })?;
44
45 let status = resp.status();
46 let body = resp
47 .text()
48 .await
49 .map_err(|e| E2eError::Prometheus(format!("Failed to read response: {}", e)))?;
50
51 if !status.is_success() {
52 return Err(E2eError::Prometheus(format!(
53 "Query failed with status {}: {}",
54 status, body
55 )));
56 }
57
58 let response: PrometheusResponse = serde_json::from_str(&body).map_err(|e| {
60 E2eError::Prometheus(format!("Failed to parse response: {} body={}", e, body))
61 })?;
62
63 if response.status != "success" {
64 return Err(E2eError::Prometheus(format!(
65 "Query returned error status: {:?}",
66 response
67 )));
68 }
69
70 if let Some(result) = response.data.result.first() {
72 if let Some(value) = result.value.get(1) {
73 if let Some(s) = value.as_str() {
74 return Ok(s.parse::<f64>().ok());
75 }
76 }
77 }
78
79 Ok(None)
80 }
81
82 pub async fn query_count(&self, query: &str) -> Result<Option<u64>> {
84 self.query(query).await.map(|v| v.map(|f| f as u64))
85 }
86
87 pub fn output_rows_query(node_id: &str, instance_id: Option<&str>) -> String {
89 Self::build_metric_query("streamling_output_rows_total", node_id, instance_id)
90 }
91
92 pub fn input_rows_query(node_id: &str, instance_id: Option<&str>) -> String {
94 Self::build_metric_query("streamling_input_rows_total", node_id, instance_id)
95 }
96
97 pub fn elapsed_compute_query(node_id: &str, instance_id: Option<&str>) -> String {
99 Self::build_metric_query(
100 "streamling_elapsed_compute_milliseconds_sum",
101 node_id,
102 instance_id,
103 )
104 }
105
106 pub fn checkpoint_coordinator_query(metric_name: &str, instance_id: Option<&str>) -> String {
109 Self::build_metric_query(metric_name, "checkpoint_coordinator", instance_id)
110 }
111
112 pub fn checkpoint_histogram_query(
114 metric_name: &str,
115 node_id: &str,
116 instance_id: Option<&str>,
117 ) -> String {
118 Self::build_metric_query(&format!("{}_sum", metric_name), node_id, instance_id)
119 }
120
121 fn build_metric_query(metric_name: &str, node_id: &str, instance_id: Option<&str>) -> String {
124 let mut labels = format!("id=\"{}\"", node_id);
125 if let Some(instance) = instance_id {
126 labels.push_str(&format!(",instance=\"{}\"", instance));
127 }
128 format!("{}{{{}}}", metric_name, labels)
129 }
130
131 pub async fn wait_for_metric_at_least(
134 &self,
135 query: &str,
136 min_value: u64,
137 timeout_secs: u64,
138 poll_interval_ms: u64,
139 ) -> Result<u64> {
140 let start = std::time::Instant::now();
141 let timeout = std::time::Duration::from_secs(timeout_secs);
142 let poll_interval = std::time::Duration::from_millis(poll_interval_ms);
143
144 loop {
145 if start.elapsed() >= timeout {
146 return Err(E2eError::Prometheus(format!(
147 "Timeout waiting for metric {} to reach {}",
148 query, min_value
149 )));
150 }
151
152 if let Some(count) = self.query_count(query).await? {
153 if count >= min_value {
154 return Ok(count);
155 }
156 debug!(
157 "Metric {} = {}, waiting for at least {}",
158 query, count, min_value
159 );
160 }
161
162 tokio::time::sleep(poll_interval).await;
163 }
164 }
165
166 pub async fn assert_metric_at_least(
168 &self,
169 query: &str,
170 expected: u64,
171 timeout_secs: u64,
172 ) -> Result<()> {
173 match self
174 .wait_for_metric_at_least(query, expected, timeout_secs, 500)
175 .await
176 {
177 Ok(actual) => {
178 debug!(
179 "Metric {} reached {} (expected at least {})",
180 query, actual, expected
181 );
182 Ok(())
183 }
184 Err(e) => {
185 let actual = self.query_count(query).await.ok().flatten();
187 warn!(
188 "Metric assertion failed: {} expected at least {}, got {:?}",
189 query, expected, actual
190 );
191 Err(e)
192 }
193 }
194 }
195}
196
197#[derive(Debug, Deserialize)]
198struct PrometheusResponse {
199 status: String,
200 data: PrometheusData,
201}
202
203#[derive(Debug, Deserialize)]
204struct PrometheusData {
205 #[serde(default)]
206 result: Vec<PrometheusResult>,
207}
208
209#[derive(Debug, Deserialize)]
210struct PrometheusResult {
211 #[serde(default)]
212 value: Vec<serde_json::Value>,
213}