prometheus_mcp/mcp/
prometheus_client.rs

1use crate::mcp::prometheus_config::PrometheusConfig;
2use reqwest::{Client, Error as ReqwestError, RequestBuilder};
3use serde::{Deserialize, Serialize};
4use serde_json::Value;
5use std::collections::HashMap;
6use std::fmt;
7use std::time::{Duration, SystemTime, UNIX_EPOCH};
8
9/// Prometheus API client
10pub struct PrometheusClient {
11    pub(crate) config: PrometheusConfig,
12    client: Client,
13}
14
15/// Prometheus query result
16#[derive(Debug, Deserialize, Serialize)]
17pub struct PrometheusQueryResult {
18    pub status: String,
19    pub data: PrometheusData,
20}
21
22/// Prometheus data
23#[derive(Debug, Deserialize, Serialize)]
24pub struct PrometheusData {
25    #[serde(rename = "resultType")]
26    pub result_type: String,
27    pub result: Vec<PrometheusResult>,
28}
29
30/// Prometheus result
31#[derive(Debug, Deserialize, Serialize)]
32pub struct PrometheusResult {
33    pub metric: HashMap<String, String>,
34    #[serde(skip_serializing_if = "Option::is_none")]
35    pub value: Option<(f64, String)>,
36    #[serde(skip_serializing_if = "Option::is_none")]
37    pub values: Option<Vec<(f64, String)>>,
38}
39
40/// Metadata about a metric
41#[derive(Debug, Deserialize, Serialize)]
42pub struct MetricMetadata {
43    pub metric: String,
44    #[serde(rename = "type")]
45    pub type_name: String,
46    pub help: String,
47    pub unit: String,
48}
49
50/// Prometheus API error
51#[derive(Debug)]
52#[allow(clippy::enum_variant_names)]
53pub enum PrometheusError {
54    /// Error from reqwest
55    ReqwestError(ReqwestError),
56    /// Error from Prometheus API
57    ApiError(String),
58    /// Error parsing response
59    ParseError(String),
60    /// Error building HTTP client
61    BuildClientError(String),
62}
63
64impl fmt::Display for PrometheusError {
65    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
66        match self {
67            PrometheusError::ReqwestError(e) => write!(f, "HTTP error: {}", e),
68            PrometheusError::ApiError(msg) => write!(f, "Prometheus API error: {}", msg),
69            PrometheusError::ParseError(msg) => write!(f, "Parse error: {}", msg),
70            PrometheusError::BuildClientError(msg) => write!(f, "Client build error: {}", msg),
71        }
72    }
73}
74
75impl std::error::Error for PrometheusError {}
76
77impl From<ReqwestError> for PrometheusError {
78    fn from(error: ReqwestError) -> Self {
79        PrometheusError::ReqwestError(error)
80    }
81}
82
83impl PrometheusClient {
84    /// Create a new PrometheusClient with the given configuration
85    pub fn new(config: PrometheusConfig) -> Result<Self, PrometheusError> {
86        let builder = Client::builder().timeout(config.timeout);
87        let client = builder
88            .build()
89            .map_err(|e| PrometheusError::BuildClientError(e.to_string()))?;
90
91        Ok(Self { config, client })
92    }
93
94    /// Apply basic auth if configured
95    fn build_get(&self, url: &str) -> RequestBuilder {
96        let rb = self.client.get(url);
97        match (&self.config.username, &self.config.password) {
98            (Some(user), Some(pass)) => rb.basic_auth(user, Some(pass)),
99            _ => rb,
100        }
101    }
102
103    /// Internal helper: optionally rate-limit, send request, and error-check HTTP status.
104    async fn send_request_response(
105        &self,
106        rb: RequestBuilder,
107        rate_limit: bool,
108    ) -> Result<reqwest::Response, PrometheusError> {
109        if rate_limit {
110            if let Some(min_interval) = self.config.min_request_interval_ms {
111                tokio::time::sleep(Duration::from_millis(min_interval)).await;
112            }
113        }
114        let response = rb.send().await?;
115        if !response.status().is_success() {
116            let status = response.status();
117            let text = response
118                .text()
119                .await
120                .unwrap_or_else(|_| "Unknown error".to_string());
121            return Err(PrometheusError::ApiError(format!(
122                "Prometheus API error: {} - {}",
123                status, text
124            )));
125        }
126        Ok(response)
127    }
128
129    /// Execute an instant query
130    pub async fn query(
131        &self,
132        query: &str,
133        time: Option<&str>,
134    ) -> Result<PrometheusQueryResult, PrometheusError> {
135        let url = format!("{}/api/v1/query", self.config.url);
136        let mut params: Vec<(&str, &str)> = vec![("query", query)];
137
138        let time_holder;
139        if let Some(t) = time {
140            time_holder = t.to_string();
141            params.push(("time", &time_holder));
142        }
143
144        self.execute_with_retry(url, params).await
145    }
146
147    /// Execute a range query
148    pub async fn query_range(
149        &self,
150        query: &str,
151        start: &str,
152        end: &str,
153        step: &str,
154    ) -> Result<PrometheusQueryResult, PrometheusError> {
155        let url = format!("{}/api/v1/query_range", self.config.url);
156        let params = vec![
157            ("query", query),
158            ("start", start),
159            ("end", end),
160            ("step", step),
161        ];
162
163        self.execute_with_retry(url, params).await
164    }
165
166    /// Execute a query with retry
167    async fn execute_with_retry<'a>(
168        &self,
169        url: String,
170        params: Vec<(&'a str, &'a str)>,
171    ) -> Result<PrometheusQueryResult, PrometheusError> {
172        let mut last_error = None;
173
174        for _ in 0..self.config.retries {
175            match self.execute_query(&url, &params).await {
176                Ok(result) => return Ok(result),
177                Err(err) => {
178                    last_error = Some(err);
179                    // Wait a bit before retrying
180                    tokio::time::sleep(Duration::from_millis(self.config.retry_backoff_ms)).await;
181                }
182            }
183        }
184
185        Err(last_error
186            .unwrap_or_else(|| PrometheusError::ApiError("Maximum retries exceeded".to_string())))
187    }
188
189    /// Execute a query
190    async fn execute_query<'a>(
191        &self,
192        url: &str,
193        params: &[(&'a str, &'a str)],
194    ) -> Result<PrometheusQueryResult, PrometheusError> {
195        // Use rate limiting for query endpoints (preserve prior behavior)
196        let rb = self.build_get(url).query(params);
197        let response = self.send_request_response(rb, true).await?;
198
199        let result: PrometheusQueryResult = response.json().await.map_err(|e| {
200            PrometheusError::ParseError(format!("Failed to parse Prometheus response: {}", e))
201        })?;
202
203        Ok(result)
204    }
205
206    /// Convert a timestamp to a Prometheus-compatible time string
207    #[allow(dead_code)]
208    pub fn timestamp_to_prometheus_time(timestamp: SystemTime) -> String {
209        match timestamp.duration_since(UNIX_EPOCH) {
210            Ok(since_epoch) => format!("{}.{}", since_epoch.as_secs(), since_epoch.subsec_nanos()),
211            Err(_) => "0".to_string(),
212        }
213    }
214
215    /// Get the current time as a Prometheus-compatible time string
216    #[allow(dead_code)]
217    pub fn current_time() -> String {
218        Self::timestamp_to_prometheus_time(SystemTime::now())
219    }
220
221    /// List all metric names that can be queried from Prometheus
222    pub async fn list_metrics(&self) -> Result<Vec<String>, PrometheusError> {
223        let url = format!("{}/api/v1/label/__name__/values", self.config.url);
224        let rb = self.build_get(&url);
225        // No rate limiting here previously; preserve behavior
226        let response = self.send_request_response(rb, false).await?;
227
228        let value: Value = response.json().await.map_err(|e| {
229            PrometheusError::ParseError(format!("Failed to parse Prometheus response: {}", e))
230        })?;
231        let mut out = Vec::new();
232        if let Some(data) = value.get("data") {
233            if let Some(arr) = data.as_array() {
234                for item in arr {
235                    if let Some(s) = item.as_str() {
236                        out.push(s.to_string());
237                    }
238                }
239            }
240        }
241        Ok(out)
242    }
243
244    /// Get metadata about a specific metric
245    pub async fn get_metadata(&self, metric: &str) -> Result<Vec<MetricMetadata>, PrometheusError> {
246        let url = format!("{}/api/v1/metadata", self.config.url);
247        let params = vec![("metric", metric)];
248
249        let rb = self.build_get(&url).query(&params);
250        // No rate limiting here previously; preserve behavior
251        let response = self.send_request_response(rb, false).await?;
252
253        let result: Value = response.json().await.map_err(|e| {
254            PrometheusError::ParseError(format!("Failed to parse Prometheus response: {}", e))
255        })?;
256
257        let mut metadata = Vec::new();
258        if let Some(data) = result.get("data") {
259            if let Some(metric_data) = data.get(metric) {
260                if let Some(meta_array) = metric_data.as_array() {
261                    for meta in meta_array {
262                        if let (Some(type_val), Some(help), Some(unit)) = (
263                            meta.get("type").and_then(|v| v.as_str()),
264                            meta.get("help").and_then(|v| v.as_str()),
265                            meta.get("unit").and_then(|v| v.as_str()),
266                        ) {
267                            metadata.push(MetricMetadata {
268                                metric: metric.to_string(),
269                                type_name: type_val.to_string(),
270                                help: help.to_string(),
271                                unit: unit.to_string(),
272                            });
273                        }
274                    }
275                }
276            }
277        }
278
279        Ok(metadata)
280    }
281
282    /// Get time series data for a specific metric with optional label matchers
283    pub async fn get_series(
284        &self,
285        match_strings: Vec<&str>,
286    ) -> Result<Vec<HashMap<String, String>>, PrometheusError> {
287        let url = format!("{}/api/v1/series", self.config.url);
288
289        // Build the match[] parameters
290        let mut params = Vec::new();
291        for m in match_strings {
292            params.push(("match[]", m));
293        }
294
295        let rb = self.build_get(&url).query(&params);
296        // No rate limiting here previously; preserve behavior
297        let response = self.send_request_response(rb, false).await?;
298
299        let result: Value = response.json().await.map_err(|e| {
300            PrometheusError::ParseError(format!("Failed to parse Prometheus response: {}", e))
301        })?;
302
303        let mut series = Vec::new();
304        if let Some(data) = result.get("data") {
305            if let Some(data_array) = data.as_array() {
306                for item in data_array {
307                    if let Some(obj) = item.as_object() {
308                        let mut series_item = HashMap::new();
309                        for (k, v) in obj {
310                            if let Some(value_str) = v.as_str() {
311                                series_item.insert(k.clone(), value_str.to_string());
312                            }
313                        }
314                        series.push(series_item);
315                    }
316                }
317            }
318        }
319
320        Ok(series)
321    }
322
323    /// Get all label values for a specific label name
324    pub async fn get_label_values(&self, label_name: &str) -> Result<Vec<String>, PrometheusError> {
325        let url = format!("{}/api/v1/label/{}/values", self.config.url, label_name);
326
327        let rb = self.build_get(&url);
328        // No rate limiting here previously; preserve behavior
329        let response = self.send_request_response(rb, false).await?;
330
331        let result: Value = response.json().await.map_err(|e| {
332            PrometheusError::ParseError(format!("Failed to parse Prometheus response: {}", e))
333        })?;
334
335        let mut values = Vec::new();
336        if let Some(data) = result.get("data") {
337            if let Some(data_array) = data.as_array() {
338                for item in data_array {
339                    if let Some(value_str) = item.as_str() {
340                        values.push(value_str.to_string());
341                    }
342                }
343            }
344        }
345
346        Ok(values)
347    }
348}