1use anyhow::{Context, Result};
6use serde::{Deserialize, Serialize};
7use std::sync::Arc;
8use std::time::{Duration, Instant};
9use tokio::sync::RwLock;
10use tracing::{debug, error, warn};
11
12#[derive(Debug, Clone, Serialize, Deserialize)]
14pub struct PrometheusResponse {
15 pub status: String,
16 pub data: PrometheusData,
17}
18
19#[derive(Debug, Clone, Serialize, Deserialize)]
20pub struct PrometheusData {
21 #[serde(rename = "resultType")]
22 pub result_type: String,
23 pub result: Vec<PrometheusResult>,
24}
25
26#[derive(Debug, Clone, Serialize, Deserialize)]
27pub struct PrometheusResult {
28 pub metric: serde_json::Value,
29 pub value: Option<(f64, String)>,
30 pub values: Option<Vec<(f64, String)>>,
31}
32
33#[derive(Clone)]
35struct CachedResult {
36 response: PrometheusResponse,
37 cached_at: Instant,
38}
39
40#[derive(Clone)]
42pub struct PrometheusClient {
43 base_url: String,
44 client: reqwest::Client,
45 cache: Arc<RwLock<std::collections::HashMap<String, CachedResult>>>,
46 cache_ttl: Duration,
47}
48
49impl PrometheusClient {
50 pub fn new(prometheus_url: String) -> Self {
52 Self {
53 base_url: prometheus_url,
54 client: reqwest::Client::builder().timeout(Duration::from_secs(10)).build().unwrap(),
55 cache: Arc::new(RwLock::new(std::collections::HashMap::new())),
56 cache_ttl: Duration::from_secs(10), }
58 }
59
60 pub fn with_cache_ttl(prometheus_url: String, cache_ttl: Duration) -> Self {
62 Self {
63 base_url: prometheus_url,
64 client: reqwest::Client::builder().timeout(Duration::from_secs(10)).build().unwrap(),
65 cache: Arc::new(RwLock::new(std::collections::HashMap::new())),
66 cache_ttl,
67 }
68 }
69
70 pub async fn query(&self, query: &str) -> Result<PrometheusResponse> {
72 let cache_key = format!("instant:{}", query);
73
74 {
76 let cache = self.cache.read().await;
77 if let Some(cached) = cache.get(&cache_key) {
78 if cached.cached_at.elapsed() < self.cache_ttl {
79 debug!("Returning cached result for query: {}", query);
80 return Ok(cached.response.clone());
81 }
82 }
83 }
84
85 let url = format!("{}/api/v1/query", self.base_url);
87 debug!("Querying Prometheus: {} with query: {}", url, query);
88
89 let response = self
90 .client
91 .get(&url)
92 .query(&[("query", query)])
93 .send()
94 .await
95 .context("Failed to send request to Prometheus")?;
96
97 if !response.status().is_success() {
98 let status = response.status();
99 let body = response.text().await.unwrap_or_default();
100 error!("Prometheus query failed ({}): {}", status, body);
101 anyhow::bail!("Prometheus query failed: {} - {}", status, body);
102 }
103
104 let result: PrometheusResponse =
105 response.json().await.context("Failed to parse Prometheus response")?;
106
107 {
109 let mut cache = self.cache.write().await;
110 cache.insert(
111 cache_key,
112 CachedResult {
113 response: result.clone(),
114 cached_at: Instant::now(),
115 },
116 );
117 }
118
119 Ok(result)
120 }
121
122 pub async fn query_range(
124 &self,
125 query: &str,
126 start: i64,
127 end: i64,
128 step: &str,
129 ) -> Result<PrometheusResponse> {
130 let cache_key = format!("range:{}:{}:{}:{}", query, start, end, step);
131
132 {
134 let cache = self.cache.read().await;
135 if let Some(cached) = cache.get(&cache_key) {
136 if cached.cached_at.elapsed() < self.cache_ttl {
137 debug!("Returning cached result for range query: {}", query);
138 return Ok(cached.response.clone());
139 }
140 }
141 }
142
143 let url = format!("{}/api/v1/query_range", self.base_url);
145 debug!("Querying Prometheus range: {} with query: {}", url, query);
146
147 let response = self
148 .client
149 .get(&url)
150 .query(&[
151 ("query", query),
152 ("start", &start.to_string()),
153 ("end", &end.to_string()),
154 ("step", step),
155 ])
156 .send()
157 .await
158 .context("Failed to send request to Prometheus")?;
159
160 if !response.status().is_success() {
161 let status = response.status();
162 let body = response.text().await.unwrap_or_default();
163 error!("Prometheus range query failed ({}): {}", status, body);
164 anyhow::bail!("Prometheus range query failed: {} - {}", status, body);
165 }
166
167 let result: PrometheusResponse =
168 response.json().await.context("Failed to parse Prometheus response")?;
169
170 {
172 let mut cache = self.cache.write().await;
173 cache.insert(
174 cache_key,
175 CachedResult {
176 response: result.clone(),
177 cached_at: Instant::now(),
178 },
179 );
180 }
181
182 Ok(result)
183 }
184
185 pub async fn clear_cache(&self) {
187 let mut cache = self.cache.write().await;
188 cache.clear();
189 debug!("Prometheus client cache cleared");
190 }
191
192 pub fn extract_single_value(response: &PrometheusResponse) -> Option<f64> {
194 response
195 .data
196 .result
197 .first()
198 .and_then(|r| r.value.as_ref())
199 .and_then(|(_, v)| v.parse().ok())
200 }
201
202 pub fn extract_values(response: &PrometheusResponse) -> Vec<(String, f64)> {
204 response
205 .data
206 .result
207 .iter()
208 .filter_map(|r| {
209 let label = r.metric.as_object()?.values().next()?.as_str()?.to_string();
210 let value: f64 = r.value.as_ref()?.1.parse().ok()?;
211 Some((label, value))
212 })
213 .collect()
214 }
215
216 pub fn extract_time_series(response: &PrometheusResponse) -> Vec<(String, Vec<(i64, f64)>)> {
218 response
219 .data
220 .result
221 .iter()
222 .filter_map(|r| {
223 let label = r
224 .metric
225 .as_object()?
226 .values()
227 .next()
228 .and_then(|v| v.as_str())
229 .unwrap_or("value")
230 .to_string();
231
232 let values: Vec<(i64, f64)> = r
233 .values
234 .as_ref()?
235 .iter()
236 .filter_map(|(ts, v)| {
237 let timestamp = *ts as i64;
238 let value: f64 = v.parse().ok()?;
239 Some((timestamp, value))
240 })
241 .collect();
242
243 Some((label, values))
244 })
245 .collect()
246 }
247
248 pub async fn health_check(&self) -> bool {
250 let url = format!("{}/api/v1/query", self.base_url);
251 match self.client.get(&url).query(&[("query", "up")]).send().await {
252 Ok(response) => response.status().is_success(),
253 Err(e) => {
254 warn!("Prometheus health check failed: {}", e);
255 false
256 }
257 }
258 }
259}
260
#[cfg(test)]
mod tests {
    use super::*;

    /// Server address shared by the constructor tests.
    const LOCAL_URL: &str = "http://localhost:9090";

    /// Builds a successful "vector" response holding a single sample.
    fn single_sample_response(timestamp: f64, sample: &str) -> PrometheusResponse {
        let series = PrometheusResult {
            metric: serde_json::json!({}),
            value: Some((timestamp, sample.to_string())),
            values: None,
        };

        PrometheusResponse {
            status: "success".to_string(),
            data: PrometheusData {
                result_type: "vector".to_string(),
                result: vec![series],
            },
        }
    }

    /// `new` keeps the URL it was given.
    #[test]
    fn test_client_creation() {
        let client = PrometheusClient::new(LOCAL_URL.to_string());
        assert_eq!(client.base_url, "http://localhost:9090");
    }

    /// `with_cache_ttl` stores the requested TTL.
    #[test]
    fn test_client_with_custom_ttl() {
        let ttl = Duration::from_secs(30);
        let client = PrometheusClient::with_cache_ttl(LOCAL_URL.to_string(), ttl);
        assert_eq!(client.cache_ttl, Duration::from_secs(30));
    }

    /// The sample string of the first series is parsed into an `f64`.
    #[test]
    fn test_extract_single_value() {
        let response = single_sample_response(1234567890.0, "125.5");
        let value = PrometheusClient::extract_single_value(&response);
        assert_eq!(value, Some(125.5));
    }
}