Skip to main content

enya_client/prometheus/
client.rs

1//! Prometheus HTTP client implementation.
2
3use crate::error::ClientError;
4use crate::normalize_url;
5use crate::now_unix_secs;
6use crate::promise::promise_channel;
7use crate::request::QueryRequest;
8use crate::url_encode;
9use crate::{
10    BackendInfo, HealthCheckResult, LabelsResult, MetricLabelsResult, MetricsClient, QueryResult,
11};
12use poll_promise::Promise;
13
14use super::response::{
15    parse_buildinfo_response, parse_labels_response, parse_response, parse_series_response,
16};
17
18/// Client for querying Prometheus via its HTTP API.
19///
20/// Executes PromQL queries directly against the `/api/v1/query_range` endpoint.
21/// Uses `reqwest` for HTTP requests on both native (with tokio) and WASM (with
22/// wasm-bindgen-futures).
23///
24/// # Example
25///
26/// ```ignore
27/// use enya_client::{QueryManager, QueryRequest};
28/// use enya_client::prometheus::PrometheusClient;
29///
30/// let client = PrometheusClient::new("http://localhost:9090");
31/// let mut manager = QueryManager::new();
32///
33/// let request = QueryRequest::new("cpu_usage", "sum(env:prod) by (host)");
34/// manager.execute(&client, request, &ctx);
35/// ```
36pub struct PrometheusClient {
37    base_url: String,
38    http_client: reqwest::Client,
39    #[cfg(not(target_arch = "wasm32"))]
40    runtime_handle: tokio::runtime::Handle,
41}
42
43impl PrometheusClient {
44    /// Create a new Prometheus client.
45    ///
46    /// # Arguments
47    ///
48    /// * `base_url` - The base URL of the Prometheus server (e.g., "http://localhost:9090")
49    ///
50    /// If no protocol is specified, `http://` is assumed (Prometheus default).
51    ///
52    /// # Panics
53    ///
54    /// On native, panics if called outside a tokio runtime context.
55    #[must_use]
56    pub fn new(base_url: impl Into<String>) -> Self {
57        Self {
58            base_url: normalize_url(base_url),
59            http_client: reqwest::Client::new(),
60            #[cfg(not(target_arch = "wasm32"))]
61            runtime_handle: tokio::runtime::Handle::current(),
62        }
63    }
64
65    /// Create a new Prometheus client with an explicit runtime handle.
66    ///
67    /// Use this when creating the client outside a tokio runtime context.
68    #[cfg(not(target_arch = "wasm32"))]
69    #[must_use]
70    pub fn with_runtime(base_url: impl Into<String>, handle: tokio::runtime::Handle) -> Self {
71        Self {
72            base_url: normalize_url(base_url),
73            http_client: reqwest::Client::new(),
74            runtime_handle: handle,
75        }
76    }
77
78    /// Spawn an async task on the runtime.
79    #[cfg(not(target_arch = "wasm32"))]
80    fn spawn<F>(&self, future: F)
81    where
82        F: std::future::Future<Output = ()> + Send + 'static,
83    {
84        self.runtime_handle.spawn(future);
85    }
86
87    /// Spawn an async task using wasm-bindgen-futures.
88    #[cfg(target_arch = "wasm32")]
89    fn spawn<F>(&self, future: F)
90    where
91        F: std::future::Future<Output = ()> + 'static,
92    {
93        wasm_bindgen_futures::spawn_local(future);
94    }
95
96    /// Build the query_range URL for a request.
97    fn build_url(&self, promql: &str, request: &QueryRequest) -> String {
98        let now_secs = now_unix_secs();
99
100        // Default time range: 1 hour ago to now
101        let end_secs = request
102            .end
103            .map(|ns| (ns / 1_000_000_000) as u64)
104            .unwrap_or(now_secs);
105        let start_secs = request
106            .start
107            .map(|ns| (ns / 1_000_000_000) as u64)
108            .unwrap_or(end_secs.saturating_sub(3600));
109
110        let step = request.step_secs;
111
112        // URL-encode the query
113        let encoded_query = url_encode(promql);
114
115        format!(
116            "{}/api/v1/query_range?query={}&start={}&end={}&step={}",
117            self.base_url, encoded_query, start_secs, end_secs, step
118        )
119    }
120}
121
122impl MetricsClient for PrometheusClient {
123    fn query(&self, request: QueryRequest, ctx: &egui::Context) -> Promise<QueryResult> {
124        // Use query directly as PromQL (no translation)
125        // If query is empty or "*", use the metric name as the query
126        let promql = if request.query.is_empty() || request.query == "*" {
127            request.metric.clone()
128        } else {
129            request.query.clone()
130        };
131
132        let url = self.build_url(&promql, &request);
133        let metric = request.metric.clone();
134        let query = request.query.clone();
135        let granularity_ns = (request.step_secs as u128) * 1_000_000_000;
136
137        log::debug!("Prometheus query: {url}");
138
139        let (sender, promise) = promise_channel();
140        let ctx = ctx.clone();
141        let client = self.http_client.clone();
142
143        self.spawn(async move {
144            let result = match client.get(&url).send().await {
145                Ok(response) => {
146                    let status = response.status();
147                    if status.is_success() {
148                        match response.bytes().await {
149                            Ok(bytes) => parse_response(&bytes, &metric, &query, granularity_ns),
150                            Err(e) => Err(ClientError::NetworkError(e.to_string())),
151                        }
152                    } else {
153                        Err(ClientError::BackendError {
154                            status: status.as_u16(),
155                            message: status.canonical_reason().unwrap_or("Unknown").to_string(),
156                        })
157                    }
158                }
159                Err(e) => Err(ClientError::NetworkError(e.to_string())),
160            };
161            sender.send(result);
162            ctx.request_repaint();
163        });
164
165        promise
166    }
167
168    fn fetch_label_names(&self, ctx: &egui::Context) -> Promise<LabelsResult> {
169        let url = format!("{}/api/v1/labels", self.base_url);
170
171        log::debug!("Prometheus fetch labels: {url}");
172
173        let (sender, promise) = promise_channel();
174        let ctx = ctx.clone();
175        let client = self.http_client.clone();
176
177        self.spawn(async move {
178            let result = match client.get(&url).send().await {
179                Ok(response) => {
180                    let status = response.status();
181                    if status.is_success() {
182                        match response.bytes().await {
183                            Ok(bytes) => parse_labels_response(&bytes),
184                            Err(e) => Err(ClientError::NetworkError(e.to_string())),
185                        }
186                    } else {
187                        Err(ClientError::BackendError {
188                            status: status.as_u16(),
189                            message: status.canonical_reason().unwrap_or("Unknown").to_string(),
190                        })
191                    }
192                }
193                Err(e) => Err(ClientError::NetworkError(e.to_string())),
194            };
195            sender.send(result);
196            ctx.request_repaint();
197        });
198
199        promise
200    }
201
202    fn fetch_label_values(&self, label: &str, ctx: &egui::Context) -> Promise<LabelsResult> {
203        let url = format!("{}/api/v1/label/{}/values", self.base_url, label);
204
205        log::debug!("Prometheus fetch label values for '{label}': {url}");
206
207        let (sender, promise) = promise_channel();
208        let ctx = ctx.clone();
209        let client = self.http_client.clone();
210
211        self.spawn(async move {
212            let result = match client.get(&url).send().await {
213                Ok(response) => {
214                    let status = response.status();
215                    if status.is_success() {
216                        match response.bytes().await {
217                            Ok(bytes) => parse_labels_response(&bytes),
218                            Err(e) => Err(ClientError::NetworkError(e.to_string())),
219                        }
220                    } else {
221                        Err(ClientError::BackendError {
222                            status: status.as_u16(),
223                            message: status.canonical_reason().unwrap_or("Unknown").to_string(),
224                        })
225                    }
226                }
227                Err(e) => Err(ClientError::NetworkError(e.to_string())),
228            };
229            sender.send(result);
230            ctx.request_repaint();
231        });
232
233        promise
234    }
235
236    fn fetch_metric_names(&self, ctx: &egui::Context) -> Promise<LabelsResult> {
237        // In Prometheus, metric names are stored in the special __name__ label
238        self.fetch_label_values("__name__", ctx)
239    }
240
241    fn fetch_metric_labels(
242        &self,
243        metric: &str,
244        ctx: &egui::Context,
245    ) -> Promise<MetricLabelsResult> {
246        // Build the series query URL
247        // match[]={__name__="metric_name"}
248        let selector = format!(r#"{{__name__="{metric}"}}"#);
249        let encoded_selector = url_encode(&selector);
250        let url = format!(
251            "{}/api/v1/series?match[]={}",
252            self.base_url, encoded_selector
253        );
254
255        log::debug!("Prometheus fetch metric labels for '{metric}': {url}");
256
257        let (sender, promise) = promise_channel();
258        let ctx = ctx.clone();
259        let client = self.http_client.clone();
260
261        self.spawn(async move {
262            let result = match client.get(&url).send().await {
263                Ok(response) => {
264                    let status = response.status();
265                    if status.is_success() {
266                        match response.bytes().await {
267                            Ok(bytes) => parse_series_response(&bytes),
268                            Err(e) => Err(ClientError::NetworkError(e.to_string())),
269                        }
270                    } else {
271                        Err(ClientError::BackendError {
272                            status: status.as_u16(),
273                            message: status.canonical_reason().unwrap_or("Unknown").to_string(),
274                        })
275                    }
276                }
277                Err(e) => Err(ClientError::NetworkError(e.to_string())),
278            };
279            sender.send(result);
280            ctx.request_repaint();
281        });
282
283        promise
284    }
285
286    fn backend_type(&self) -> &'static str {
287        "prometheus"
288    }
289
290    fn health_check(&self, ctx: &egui::Context) -> Promise<HealthCheckResult> {
291        let url = format!("{}/api/v1/status/buildinfo", self.base_url);
292
293        log::debug!("Prometheus health check: {url}");
294
295        let (sender, promise) = promise_channel();
296        let ctx = ctx.clone();
297        let client = self.http_client.clone();
298
299        self.spawn(async move {
300            let result = match client.get(&url).send().await {
301                Ok(response) => {
302                    let status = response.status();
303                    if status.is_success() {
304                        match response.bytes().await {
305                            Ok(bytes) => parse_buildinfo_response(&bytes).map(|info| BackendInfo {
306                                backend_type: "prometheus".to_string(),
307                                version: info.version,
308                            }),
309                            Err(e) => Err(ClientError::NetworkError(e.to_string())),
310                        }
311                    } else {
312                        Err(ClientError::BackendError {
313                            status: status.as_u16(),
314                            message: status.canonical_reason().unwrap_or("Unknown").to_string(),
315                        })
316                    }
317                }
318                Err(e) => Err(ClientError::NetworkError(e.to_string())),
319            };
320            sender.send(result);
321            ctx.request_repaint();
322        });
323
324        promise
325    }
326}
327
#[cfg(test)]
mod tests {
    use super::*;

    /// Runs `body` while a freshly created tokio runtime is entered, so that
    /// `PrometheusClient::new` can capture the current runtime handle.
    fn with_runtime<F: FnOnce()>(body: F) {
        let runtime = tokio::runtime::Runtime::new().unwrap();
        let _entered = runtime.enter();
        body();
    }

    /// A trailing slash on the base URL is dropped.
    #[test]
    fn test_new_removes_trailing_slash() {
        with_runtime(|| {
            let base_url = PrometheusClient::new("http://localhost:9090/").base_url;
            assert_eq!(base_url, "http://localhost:9090");
        });
    }

    /// A base URL without a protocol gets `http://` prepended.
    #[test]
    fn test_new_adds_http_protocol() {
        with_runtime(|| {
            let base_url = PrometheusClient::new("localhost:9090").base_url;
            assert_eq!(base_url, "http://localhost:9090");
        });
    }

    /// An explicit `https://` protocol is kept as-is.
    #[test]
    fn test_new_preserves_https() {
        with_runtime(|| {
            let base_url = PrometheusClient::new("https://prometheus.example.com").base_url;
            assert_eq!(base_url, "https://prometheus.example.com");
        });
    }

    /// The range-query URL targets the right endpoint and carries query and step.
    #[test]
    fn test_build_url() {
        with_runtime(|| {
            let request = QueryRequest::new("cpu", "*").with_step(60);
            let client = PrometheusClient::new("http://localhost:9090");

            let url = client.build_url("cpu", &request);

            assert!(url.starts_with("http://localhost:9090/api/v1/query_range?"));
            assert!(url.contains("query=cpu"));
            assert!(url.contains("step=60"));
        });
    }

    /// `url_encode` leaves plain text alone and escapes PromQL punctuation.
    #[test]
    fn test_url_encode() {
        let cases = [
            ("simple", "simple"),
            (r#"cpu{env="prod"}"#, "cpu%7Benv%3D%22prod%22%7D"),
            ("rate(m[5m])", "rate(m%5B5m%5D)"),
        ];

        for (input, expected) in cases {
            assert_eq!(url_encode(input), expected);
        }
    }

    /// The backend identifier is the fixed string `"prometheus"`.
    #[test]
    fn test_backend_type() {
        with_runtime(|| {
            let backend = PrometheusClient::new("http://localhost:9090").backend_type();
            assert_eq!(backend, "prometheus");
        });
    }
}