use crate::mcp::prometheus_config::PrometheusConfig;
use reqwest::{Client, Error as ReqwestError, RequestBuilder};
use serde::{Deserialize, Serialize};
use serde_json::Value;
use std::collections::HashMap;
use std::fmt;
use std::time::{Duration, SystemTime, UNIX_EPOCH};
pub struct PrometheusClient {
pub(crate) config: PrometheusConfig,
client: Client,
}
#[derive(Debug, Deserialize, Serialize)]
pub struct PrometheusQueryResult {
pub status: String,
pub data: PrometheusData,
}
#[derive(Debug, Deserialize, Serialize)]
pub struct PrometheusData {
#[serde(rename = "resultType")]
pub result_type: String,
pub result: Vec<PrometheusResult>,
}
#[derive(Debug, Deserialize, Serialize)]
pub struct PrometheusResult {
pub metric: HashMap<String, String>,
#[serde(skip_serializing_if = "Option::is_none")]
pub value: Option<(f64, String)>,
#[serde(skip_serializing_if = "Option::is_none")]
pub values: Option<Vec<(f64, String)>>,
}
#[derive(Debug, Deserialize, Serialize)]
pub struct MetricMetadata {
pub metric: String,
#[serde(rename = "type")]
pub type_name: String,
pub help: String,
pub unit: String,
}
#[derive(Debug)]
#[allow(clippy::enum_variant_names)]
pub enum PrometheusError {
ReqwestError(ReqwestError),
ApiError(String),
ParseError(String),
BuildClientError(String),
}
impl fmt::Display for PrometheusError {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match self {
PrometheusError::ReqwestError(e) => write!(f, "HTTP error: {}", e),
PrometheusError::ApiError(msg) => write!(f, "Prometheus API error: {}", msg),
PrometheusError::ParseError(msg) => write!(f, "Parse error: {}", msg),
PrometheusError::BuildClientError(msg) => write!(f, "Client build error: {}", msg),
}
}
}
impl std::error::Error for PrometheusError {}
impl From<ReqwestError> for PrometheusError {
fn from(error: ReqwestError) -> Self {
PrometheusError::ReqwestError(error)
}
}
impl PrometheusClient {
pub fn new(config: PrometheusConfig) -> Result<Self, PrometheusError> {
let builder = Client::builder().timeout(config.timeout);
let client = builder
.build()
.map_err(|e| PrometheusError::BuildClientError(e.to_string()))?;
Ok(Self { config, client })
}
fn build_get(&self, url: &str) -> RequestBuilder {
let rb = self.client.get(url);
match (&self.config.username, &self.config.password) {
(Some(user), Some(pass)) => rb.basic_auth(user, Some(pass)),
_ => rb,
}
}
async fn send_request_response(
&self,
rb: RequestBuilder,
rate_limit: bool,
) -> Result<reqwest::Response, PrometheusError> {
if rate_limit {
if let Some(min_interval) = self.config.min_request_interval_ms {
tokio::time::sleep(Duration::from_millis(min_interval)).await;
}
}
let response = rb.send().await?;
if !response.status().is_success() {
let status = response.status();
let text = response
.text()
.await
.unwrap_or_else(|_| "Unknown error".to_string());
return Err(PrometheusError::ApiError(format!(
"Prometheus API error: {} - {}",
status, text
)));
}
Ok(response)
}
pub async fn query(
&self,
query: &str,
time: Option<&str>,
) -> Result<PrometheusQueryResult, PrometheusError> {
let url = format!("{}/api/v1/query", self.config.url);
let mut params: Vec<(&str, &str)> = vec![("query", query)];
let time_holder;
if let Some(t) = time {
time_holder = t.to_string();
params.push(("time", &time_holder));
}
self.execute_with_retry(url, params).await
}
pub async fn query_range(
&self,
query: &str,
start: &str,
end: &str,
step: &str,
) -> Result<PrometheusQueryResult, PrometheusError> {
let url = format!("{}/api/v1/query_range", self.config.url);
let params = vec![
("query", query),
("start", start),
("end", end),
("step", step),
];
self.execute_with_retry(url, params).await
}
async fn execute_with_retry<'a>(
&self,
url: String,
params: Vec<(&'a str, &'a str)>,
) -> Result<PrometheusQueryResult, PrometheusError> {
let mut last_error = None;
for _ in 0..self.config.retries {
match self.execute_query(&url, ¶ms).await {
Ok(result) => return Ok(result),
Err(err) => {
last_error = Some(err);
tokio::time::sleep(Duration::from_millis(self.config.retry_backoff_ms)).await;
}
}
}
Err(last_error
.unwrap_or_else(|| PrometheusError::ApiError("Maximum retries exceeded".to_string())))
}
async fn execute_query<'a>(
&self,
url: &str,
params: &[(&'a str, &'a str)],
) -> Result<PrometheusQueryResult, PrometheusError> {
let rb = self.build_get(url).query(params);
let response = self.send_request_response(rb, true).await?;
let result: PrometheusQueryResult = response.json().await.map_err(|e| {
PrometheusError::ParseError(format!("Failed to parse Prometheus response: {}", e))
})?;
Ok(result)
}
#[allow(dead_code)]
pub fn timestamp_to_prometheus_time(timestamp: SystemTime) -> String {
match timestamp.duration_since(UNIX_EPOCH) {
Ok(since_epoch) => format!("{}.{}", since_epoch.as_secs(), since_epoch.subsec_nanos()),
Err(_) => "0".to_string(),
}
}
#[allow(dead_code)]
pub fn current_time() -> String {
Self::timestamp_to_prometheus_time(SystemTime::now())
}
pub async fn list_metrics(&self) -> Result<Vec<String>, PrometheusError> {
let url = format!("{}/api/v1/label/__name__/values", self.config.url);
let rb = self.build_get(&url);
let response = self.send_request_response(rb, false).await?;
let value: Value = response.json().await.map_err(|e| {
PrometheusError::ParseError(format!("Failed to parse Prometheus response: {}", e))
})?;
let mut out = Vec::new();
if let Some(data) = value.get("data") {
if let Some(arr) = data.as_array() {
for item in arr {
if let Some(s) = item.as_str() {
out.push(s.to_string());
}
}
}
}
Ok(out)
}
pub async fn get_metadata(&self, metric: &str) -> Result<Vec<MetricMetadata>, PrometheusError> {
let url = format!("{}/api/v1/metadata", self.config.url);
let params = vec![("metric", metric)];
let rb = self.build_get(&url).query(¶ms);
let response = self.send_request_response(rb, false).await?;
let result: Value = response.json().await.map_err(|e| {
PrometheusError::ParseError(format!("Failed to parse Prometheus response: {}", e))
})?;
let mut metadata = Vec::new();
if let Some(data) = result.get("data") {
if let Some(metric_data) = data.get(metric) {
if let Some(meta_array) = metric_data.as_array() {
for meta in meta_array {
if let (Some(type_val), Some(help), Some(unit)) = (
meta.get("type").and_then(|v| v.as_str()),
meta.get("help").and_then(|v| v.as_str()),
meta.get("unit").and_then(|v| v.as_str()),
) {
metadata.push(MetricMetadata {
metric: metric.to_string(),
type_name: type_val.to_string(),
help: help.to_string(),
unit: unit.to_string(),
});
}
}
}
}
}
Ok(metadata)
}
pub async fn get_series(
&self,
match_strings: Vec<&str>,
) -> Result<Vec<HashMap<String, String>>, PrometheusError> {
let url = format!("{}/api/v1/series", self.config.url);
let mut params = Vec::new();
for m in match_strings {
params.push(("match[]", m));
}
let rb = self.build_get(&url).query(¶ms);
let response = self.send_request_response(rb, false).await?;
let result: Value = response.json().await.map_err(|e| {
PrometheusError::ParseError(format!("Failed to parse Prometheus response: {}", e))
})?;
let mut series = Vec::new();
if let Some(data) = result.get("data") {
if let Some(data_array) = data.as_array() {
for item in data_array {
if let Some(obj) = item.as_object() {
let mut series_item = HashMap::new();
for (k, v) in obj {
if let Some(value_str) = v.as_str() {
series_item.insert(k.clone(), value_str.to_string());
}
}
series.push(series_item);
}
}
}
}
Ok(series)
}
pub async fn get_label_values(&self, label_name: &str) -> Result<Vec<String>, PrometheusError> {
let url = format!("{}/api/v1/label/{}/values", self.config.url, label_name);
let rb = self.build_get(&url);
let response = self.send_request_response(rb, false).await?;
let result: Value = response.json().await.map_err(|e| {
PrometheusError::ParseError(format!("Failed to parse Prometheus response: {}", e))
})?;
let mut values = Vec::new();
if let Some(data) = result.get("data") {
if let Some(data_array) = data.as_array() {
for item in data_array {
if let Some(value_str) = item.as_str() {
values.push(value_str.to_string());
}
}
}
}
Ok(values)
}
}