1use crate::error::ClientError;
4use crate::normalize_url;
5use crate::now_unix_secs;
6use crate::promise::promise_channel;
7use crate::request::QueryRequest;
8use crate::url_encode;
9use crate::{
10 BackendInfo, HealthCheckResult, LabelsResult, MetricLabelsResult, MetricsClient, QueryResult,
11};
12use poll_promise::Promise;
13
14use super::response::{
15 parse_buildinfo_response, parse_labels_response, parse_response, parse_series_response,
16};
17
18pub struct PrometheusClient {
37 base_url: String,
38 http_client: reqwest::Client,
39 #[cfg(not(target_arch = "wasm32"))]
40 runtime_handle: tokio::runtime::Handle,
41}
42
43impl PrometheusClient {
44 #[must_use]
56 pub fn new(base_url: impl Into<String>) -> Self {
57 Self {
58 base_url: normalize_url(base_url),
59 http_client: reqwest::Client::new(),
60 #[cfg(not(target_arch = "wasm32"))]
61 runtime_handle: tokio::runtime::Handle::current(),
62 }
63 }
64
65 #[cfg(not(target_arch = "wasm32"))]
69 #[must_use]
70 pub fn with_runtime(base_url: impl Into<String>, handle: tokio::runtime::Handle) -> Self {
71 Self {
72 base_url: normalize_url(base_url),
73 http_client: reqwest::Client::new(),
74 runtime_handle: handle,
75 }
76 }
77
78 #[cfg(not(target_arch = "wasm32"))]
80 fn spawn<F>(&self, future: F)
81 where
82 F: std::future::Future<Output = ()> + Send + 'static,
83 {
84 self.runtime_handle.spawn(future);
85 }
86
87 #[cfg(target_arch = "wasm32")]
89 fn spawn<F>(&self, future: F)
90 where
91 F: std::future::Future<Output = ()> + 'static,
92 {
93 wasm_bindgen_futures::spawn_local(future);
94 }
95
96 fn build_url(&self, promql: &str, request: &QueryRequest) -> String {
98 let now_secs = now_unix_secs();
99
100 let end_secs = request
102 .end
103 .map(|ns| (ns / 1_000_000_000) as u64)
104 .unwrap_or(now_secs);
105 let start_secs = request
106 .start
107 .map(|ns| (ns / 1_000_000_000) as u64)
108 .unwrap_or(end_secs.saturating_sub(3600));
109
110 let step = request.step_secs;
111
112 let encoded_query = url_encode(promql);
114
115 format!(
116 "{}/api/v1/query_range?query={}&start={}&end={}&step={}",
117 self.base_url, encoded_query, start_secs, end_secs, step
118 )
119 }
120}
121
122impl MetricsClient for PrometheusClient {
123 fn query(&self, request: QueryRequest, ctx: &egui::Context) -> Promise<QueryResult> {
124 let promql = if request.query.is_empty() || request.query == "*" {
127 request.metric.clone()
128 } else {
129 request.query.clone()
130 };
131
132 let url = self.build_url(&promql, &request);
133 let metric = request.metric.clone();
134 let query = request.query.clone();
135 let granularity_ns = (request.step_secs as u128) * 1_000_000_000;
136
137 log::debug!("Prometheus query: {url}");
138
139 let (sender, promise) = promise_channel();
140 let ctx = ctx.clone();
141 let client = self.http_client.clone();
142
143 self.spawn(async move {
144 let result = match client.get(&url).send().await {
145 Ok(response) => {
146 let status = response.status();
147 if status.is_success() {
148 match response.bytes().await {
149 Ok(bytes) => parse_response(&bytes, &metric, &query, granularity_ns),
150 Err(e) => Err(ClientError::NetworkError(e.to_string())),
151 }
152 } else {
153 Err(ClientError::BackendError {
154 status: status.as_u16(),
155 message: status.canonical_reason().unwrap_or("Unknown").to_string(),
156 })
157 }
158 }
159 Err(e) => Err(ClientError::NetworkError(e.to_string())),
160 };
161 sender.send(result);
162 ctx.request_repaint();
163 });
164
165 promise
166 }
167
168 fn fetch_label_names(&self, ctx: &egui::Context) -> Promise<LabelsResult> {
169 let url = format!("{}/api/v1/labels", self.base_url);
170
171 log::debug!("Prometheus fetch labels: {url}");
172
173 let (sender, promise) = promise_channel();
174 let ctx = ctx.clone();
175 let client = self.http_client.clone();
176
177 self.spawn(async move {
178 let result = match client.get(&url).send().await {
179 Ok(response) => {
180 let status = response.status();
181 if status.is_success() {
182 match response.bytes().await {
183 Ok(bytes) => parse_labels_response(&bytes),
184 Err(e) => Err(ClientError::NetworkError(e.to_string())),
185 }
186 } else {
187 Err(ClientError::BackendError {
188 status: status.as_u16(),
189 message: status.canonical_reason().unwrap_or("Unknown").to_string(),
190 })
191 }
192 }
193 Err(e) => Err(ClientError::NetworkError(e.to_string())),
194 };
195 sender.send(result);
196 ctx.request_repaint();
197 });
198
199 promise
200 }
201
202 fn fetch_label_values(&self, label: &str, ctx: &egui::Context) -> Promise<LabelsResult> {
203 let url = format!("{}/api/v1/label/{}/values", self.base_url, label);
204
205 log::debug!("Prometheus fetch label values for '{label}': {url}");
206
207 let (sender, promise) = promise_channel();
208 let ctx = ctx.clone();
209 let client = self.http_client.clone();
210
211 self.spawn(async move {
212 let result = match client.get(&url).send().await {
213 Ok(response) => {
214 let status = response.status();
215 if status.is_success() {
216 match response.bytes().await {
217 Ok(bytes) => parse_labels_response(&bytes),
218 Err(e) => Err(ClientError::NetworkError(e.to_string())),
219 }
220 } else {
221 Err(ClientError::BackendError {
222 status: status.as_u16(),
223 message: status.canonical_reason().unwrap_or("Unknown").to_string(),
224 })
225 }
226 }
227 Err(e) => Err(ClientError::NetworkError(e.to_string())),
228 };
229 sender.send(result);
230 ctx.request_repaint();
231 });
232
233 promise
234 }
235
236 fn fetch_metric_names(&self, ctx: &egui::Context) -> Promise<LabelsResult> {
237 self.fetch_label_values("__name__", ctx)
239 }
240
241 fn fetch_metric_labels(
242 &self,
243 metric: &str,
244 ctx: &egui::Context,
245 ) -> Promise<MetricLabelsResult> {
246 let selector = format!(r#"{{__name__="{metric}"}}"#);
249 let encoded_selector = url_encode(&selector);
250 let url = format!(
251 "{}/api/v1/series?match[]={}",
252 self.base_url, encoded_selector
253 );
254
255 log::debug!("Prometheus fetch metric labels for '{metric}': {url}");
256
257 let (sender, promise) = promise_channel();
258 let ctx = ctx.clone();
259 let client = self.http_client.clone();
260
261 self.spawn(async move {
262 let result = match client.get(&url).send().await {
263 Ok(response) => {
264 let status = response.status();
265 if status.is_success() {
266 match response.bytes().await {
267 Ok(bytes) => parse_series_response(&bytes),
268 Err(e) => Err(ClientError::NetworkError(e.to_string())),
269 }
270 } else {
271 Err(ClientError::BackendError {
272 status: status.as_u16(),
273 message: status.canonical_reason().unwrap_or("Unknown").to_string(),
274 })
275 }
276 }
277 Err(e) => Err(ClientError::NetworkError(e.to_string())),
278 };
279 sender.send(result);
280 ctx.request_repaint();
281 });
282
283 promise
284 }
285
286 fn backend_type(&self) -> &'static str {
287 "prometheus"
288 }
289
290 fn health_check(&self, ctx: &egui::Context) -> Promise<HealthCheckResult> {
291 let url = format!("{}/api/v1/status/buildinfo", self.base_url);
292
293 log::debug!("Prometheus health check: {url}");
294
295 let (sender, promise) = promise_channel();
296 let ctx = ctx.clone();
297 let client = self.http_client.clone();
298
299 self.spawn(async move {
300 let result = match client.get(&url).send().await {
301 Ok(response) => {
302 let status = response.status();
303 if status.is_success() {
304 match response.bytes().await {
305 Ok(bytes) => parse_buildinfo_response(&bytes).map(|info| BackendInfo {
306 backend_type: "prometheus".to_string(),
307 version: info.version,
308 }),
309 Err(e) => Err(ClientError::NetworkError(e.to_string())),
310 }
311 } else {
312 Err(ClientError::BackendError {
313 status: status.as_u16(),
314 message: status.canonical_reason().unwrap_or("Unknown").to_string(),
315 })
316 }
317 }
318 Err(e) => Err(ClientError::NetworkError(e.to_string())),
319 };
320 sender.send(result);
321 ctx.request_repaint();
322 });
323
324 promise
325 }
326}
327
#[cfg(test)]
mod tests {
    use super::*;

    /// Runs `body` while a freshly created Tokio runtime is entered, so that
    /// `PrometheusClient::new` can look up the current runtime handle.
    fn in_runtime<F>(body: F)
    where
        F: FnOnce(),
    {
        let runtime = tokio::runtime::Runtime::new().unwrap();
        let _entered = runtime.enter();
        body();
    }

    /// Returns the base URL stored by `PrometheusClient::new(input)`.
    fn stored_base_url(input: &str) -> String {
        let mut stored = String::new();
        in_runtime(|| {
            stored = PrometheusClient::new(input).base_url.clone();
        });
        stored
    }

    #[test]
    fn test_new_removes_trailing_slash() {
        assert_eq!(
            stored_base_url("http://localhost:9090/"),
            "http://localhost:9090"
        );
    }

    #[test]
    fn test_new_adds_http_protocol() {
        assert_eq!(stored_base_url("localhost:9090"), "http://localhost:9090");
    }

    #[test]
    fn test_new_preserves_https() {
        assert_eq!(
            stored_base_url("https://prometheus.example.com"),
            "https://prometheus.example.com"
        );
    }

    #[test]
    fn test_build_url() {
        in_runtime(|| {
            let client = PrometheusClient::new("http://localhost:9090");
            let request = QueryRequest::new("cpu", "*").with_step(60);

            let url = client.build_url("cpu", &request);
            assert!(url.starts_with("http://localhost:9090/api/v1/query_range?"));
            for fragment in ["query=cpu", "step=60"] {
                assert!(url.contains(fragment));
            }
        });
    }

    #[test]
    fn test_url_encode() {
        // (input, expected encoding)
        let cases = [
            ("simple", "simple"),
            (r#"cpu{env="prod"}"#, "cpu%7Benv%3D%22prod%22%7D"),
            ("rate(m[5m])", "rate(m%5B5m%5D)"),
        ];
        for (input, expected) in cases {
            assert_eq!(url_encode(input), expected);
        }
    }

    #[test]
    fn test_backend_type() {
        in_runtime(|| {
            let backend = PrometheusClient::new("http://localhost:9090").backend_type();
            assert_eq!(backend, "prometheus");
        });
    }
}
393}