mockforge_ui/
prometheus_client.rs

1//! Prometheus client for querying metrics
2//!
3//! Provides a client for querying Prometheus metrics API with caching support.
4
5use anyhow::{Context, Result};
6use serde::{Deserialize, Serialize};
7use std::sync::Arc;
8use std::time::{Duration, Instant};
9use tokio::sync::RwLock;
10use tracing::{debug, error, warn};
11
12/// Prometheus query result
13#[derive(Debug, Clone, Serialize, Deserialize)]
14pub struct PrometheusResponse {
15    pub status: String,
16    pub data: PrometheusData,
17}
18
19#[derive(Debug, Clone, Serialize, Deserialize)]
20pub struct PrometheusData {
21    #[serde(rename = "resultType")]
22    pub result_type: String,
23    pub result: Vec<PrometheusResult>,
24}
25
26#[derive(Debug, Clone, Serialize, Deserialize)]
27pub struct PrometheusResult {
28    pub metric: serde_json::Value,
29    pub value: Option<(f64, String)>,
30    pub values: Option<Vec<(f64, String)>>,
31}
32
33/// Cached query result
34#[derive(Clone)]
35struct CachedResult {
36    response: PrometheusResponse,
37    cached_at: Instant,
38}
39
40/// Prometheus client with caching
41#[derive(Clone)]
42pub struct PrometheusClient {
43    base_url: String,
44    client: reqwest::Client,
45    cache: Arc<RwLock<std::collections::HashMap<String, CachedResult>>>,
46    cache_ttl: Duration,
47}
48
49impl PrometheusClient {
50    /// Create a new Prometheus client
51    pub fn new(prometheus_url: String) -> Self {
52        Self {
53            base_url: prometheus_url,
54            client: reqwest::Client::builder().timeout(Duration::from_secs(10)).build().unwrap(),
55            cache: Arc::new(RwLock::new(std::collections::HashMap::new())),
56            cache_ttl: Duration::from_secs(10), // Cache for 10 seconds
57        }
58    }
59
60    /// Create a client with custom cache TTL
61    pub fn with_cache_ttl(prometheus_url: String, cache_ttl: Duration) -> Self {
62        Self {
63            base_url: prometheus_url,
64            client: reqwest::Client::builder().timeout(Duration::from_secs(10)).build().unwrap(),
65            cache: Arc::new(RwLock::new(std::collections::HashMap::new())),
66            cache_ttl,
67        }
68    }
69
70    /// Execute an instant query
71    pub async fn query(&self, query: &str) -> Result<PrometheusResponse> {
72        let cache_key = format!("instant:{}", query);
73
74        // Check cache
75        {
76            let cache = self.cache.read().await;
77            if let Some(cached) = cache.get(&cache_key) {
78                if cached.cached_at.elapsed() < self.cache_ttl {
79                    debug!("Returning cached result for query: {}", query);
80                    return Ok(cached.response.clone());
81                }
82            }
83        }
84
85        // Query Prometheus
86        let url = format!("{}/api/v1/query", self.base_url);
87        debug!("Querying Prometheus: {} with query: {}", url, query);
88
89        let response = self
90            .client
91            .get(&url)
92            .query(&[("query", query)])
93            .send()
94            .await
95            .context("Failed to send request to Prometheus")?;
96
97        if !response.status().is_success() {
98            let status = response.status();
99            let body = response.text().await.unwrap_or_default();
100            error!("Prometheus query failed ({}): {}", status, body);
101            anyhow::bail!("Prometheus query failed: {} - {}", status, body);
102        }
103
104        let result: PrometheusResponse =
105            response.json().await.context("Failed to parse Prometheus response")?;
106
107        // Cache the result
108        {
109            let mut cache = self.cache.write().await;
110            cache.insert(
111                cache_key,
112                CachedResult {
113                    response: result.clone(),
114                    cached_at: Instant::now(),
115                },
116            );
117        }
118
119        Ok(result)
120    }
121
122    /// Execute a range query
123    pub async fn query_range(
124        &self,
125        query: &str,
126        start: i64,
127        end: i64,
128        step: &str,
129    ) -> Result<PrometheusResponse> {
130        let cache_key = format!("range:{}:{}:{}:{}", query, start, end, step);
131
132        // Check cache
133        {
134            let cache = self.cache.read().await;
135            if let Some(cached) = cache.get(&cache_key) {
136                if cached.cached_at.elapsed() < self.cache_ttl {
137                    debug!("Returning cached result for range query: {}", query);
138                    return Ok(cached.response.clone());
139                }
140            }
141        }
142
143        // Query Prometheus
144        let url = format!("{}/api/v1/query_range", self.base_url);
145        debug!("Querying Prometheus range: {} with query: {}", url, query);
146
147        let response = self
148            .client
149            .get(&url)
150            .query(&[
151                ("query", query),
152                ("start", &start.to_string()),
153                ("end", &end.to_string()),
154                ("step", step),
155            ])
156            .send()
157            .await
158            .context("Failed to send request to Prometheus")?;
159
160        if !response.status().is_success() {
161            let status = response.status();
162            let body = response.text().await.unwrap_or_default();
163            error!("Prometheus range query failed ({}): {}", status, body);
164            anyhow::bail!("Prometheus range query failed: {} - {}", status, body);
165        }
166
167        let result: PrometheusResponse =
168            response.json().await.context("Failed to parse Prometheus response")?;
169
170        // Cache the result
171        {
172            let mut cache = self.cache.write().await;
173            cache.insert(
174                cache_key,
175                CachedResult {
176                    response: result.clone(),
177                    cached_at: Instant::now(),
178                },
179            );
180        }
181
182        Ok(result)
183    }
184
185    /// Clear the cache
186    pub async fn clear_cache(&self) {
187        let mut cache = self.cache.write().await;
188        cache.clear();
189        debug!("Prometheus client cache cleared");
190    }
191
192    /// Extract single value from query result
193    pub fn extract_single_value(response: &PrometheusResponse) -> Option<f64> {
194        response
195            .data
196            .result
197            .first()
198            .and_then(|r| r.value.as_ref())
199            .and_then(|(_, v)| v.parse().ok())
200    }
201
202    /// Extract multiple values from query result
203    pub fn extract_values(response: &PrometheusResponse) -> Vec<(String, f64)> {
204        response
205            .data
206            .result
207            .iter()
208            .filter_map(|r| {
209                let label = r.metric.as_object()?.values().next()?.as_str()?.to_string();
210                let value: f64 = r.value.as_ref()?.1.parse().ok()?;
211                Some((label, value))
212            })
213            .collect()
214    }
215
216    /// Extract time series data from range query
217    pub fn extract_time_series(response: &PrometheusResponse) -> Vec<(String, Vec<(i64, f64)>)> {
218        response
219            .data
220            .result
221            .iter()
222            .filter_map(|r| {
223                let label = r
224                    .metric
225                    .as_object()?
226                    .values()
227                    .next()
228                    .and_then(|v| v.as_str())
229                    .unwrap_or("value")
230                    .to_string();
231
232                let values: Vec<(i64, f64)> = r
233                    .values
234                    .as_ref()?
235                    .iter()
236                    .filter_map(|(ts, v)| {
237                        let timestamp = *ts as i64;
238                        let value: f64 = v.parse().ok()?;
239                        Some((timestamp, value))
240                    })
241                    .collect();
242
243                Some((label, values))
244            })
245            .collect()
246    }
247
248    /// Check if Prometheus is reachable
249    pub async fn health_check(&self) -> bool {
250        let url = format!("{}/api/v1/query", self.base_url);
251        match self.client.get(&url).query(&[("query", "up")]).send().await {
252            Ok(response) => response.status().is_success(),
253            Err(e) => {
254                warn!("Prometheus health check failed: {}", e);
255                false
256            }
257        }
258    }
259}
260
#[cfg(test)]
mod tests {
    use super::*;

    /// URL handed to the constructors; these tests never send a request to it.
    const PROMETHEUS_URL: &str = "http://localhost:9090";

    /// `new` keeps the URL it was given.
    #[test]
    fn test_client_creation() {
        let client = PrometheusClient::new(PROMETHEUS_URL.to_string());
        assert_eq!(client.base_url, PROMETHEUS_URL);
    }

    /// `with_cache_ttl` keeps the TTL it was given.
    #[test]
    fn test_client_with_custom_ttl() {
        let ttl = Duration::from_secs(30);
        let client = PrometheusClient::with_cache_ttl(PROMETHEUS_URL.to_string(), ttl);
        assert_eq!(client.cache_ttl, ttl);
    }

    /// The sample string of the first result entry is parsed into an `f64`.
    #[test]
    fn test_extract_single_value() {
        let sample = PrometheusResult {
            metric: serde_json::json!({}),
            value: Some((1234567890.0, "125.5".to_string())),
            values: None,
        };
        let response = PrometheusResponse {
            status: "success".to_string(),
            data: PrometheusData {
                result_type: "vector".to_string(),
                result: vec![sample],
            },
        };

        assert_eq!(PrometheusClient::extract_single_value(&response), Some(125.5));
    }
}