use std::collections::BTreeMap;
use std::time::{Duration, Instant};
use reqwest::Client;
use serde::{Deserialize, Serialize};
use tokio::time::timeout;
use crate::collector::{Collector, CollectorError, Schedule};
use crate::storage::{MetricCategory, MetricSeries, MetricValue, StaticTags, StorageWriter};
/// Probe interval used by `HttpConfig::new` and as the fallback in
/// `HttpConfig::schedule` when neither `cron` nor `interval` is set.
const DEFAULT_INTERVAL: Duration = Duration::from_secs(30);
/// Request timeout used when a config does not specify `timeout`.
const DEFAULT_TIMEOUT: Duration = Duration::from_secs(10);
/// HTTP status a response is compared against when a config does not specify `expected_status`.
const DEFAULT_EXPECTED_STATUS: u16 = 200;
/// Sentinel latency recorded when the request returns an error or times out,
/// i.e. when no response latency was measured.
const FAILURE_LATENCY_MS: f64 = -1.0;
/// Serde default for `HttpConfig::enabled`: a probe is enabled unless configured otherwise.
fn default_enabled() -> bool {
    true
}
/// Serde default for `HttpConfig::group`: the group named `"default"`.
fn default_group() -> String {
    String::from("default")
}
/// Serde default for `HttpConfig::method`: the enum's own default variant (`Get`).
fn default_method() -> HttpMethod {
    HttpMethod::default()
}
/// Serde default for `HttpConfig::expected_status`; yields `DEFAULT_EXPECTED_STATUS`.
fn default_expected_status() -> u16 {
    DEFAULT_EXPECTED_STATUS
}
/// Serde default for `HttpConfig::timeout`; yields `DEFAULT_TIMEOUT`.
fn default_timeout() -> Duration {
    DEFAULT_TIMEOUT
}
/// HTTP request method a probe can be configured to use.
///
/// With `rename_all = "UPPERCASE"` serde reads and writes the variants as
/// upper-case names (e.g. `"GET"`). The default variant is `Get`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default, Serialize, Deserialize)]
#[serde(rename_all = "UPPERCASE")]
pub enum HttpMethod {
/// `GET` — the default method.
#[default]
Get,
/// `POST`.
Post,
/// `HEAD`.
Head,
/// `PUT`.
Put,
/// `DELETE`.
Delete,
/// `OPTIONS`.
Options,
/// `PATCH`.
Patch,
}
impl std::str::FromStr for HttpMethod {
type Err = ();
fn from_str(s: &str) -> Result<Self, Self::Err> {
match s.to_uppercase().as_str() {
"GET" => Ok(Self::Get),
"POST" => Ok(Self::Post),
"HEAD" => Ok(Self::Head),
"PUT" => Ok(Self::Put),
"DELETE" => Ok(Self::Delete),
"OPTIONS" => Ok(Self::Options),
"PATCH" => Ok(Self::Patch),
_ => Err(()),
}
}
}
impl HttpMethod {
pub fn as_str(&self) -> &'static str {
match self {
Self::Get => "GET",
Self::Post => "POST",
Self::Head => "HEAD",
Self::Put => "PUT",
Self::Delete => "DELETE",
Self::Options => "OPTIONS",
Self::Patch => "PATCH",
}
}
}
/// Configuration of a single HTTP probe.
///
/// Only `name` and `url` are required when deserializing; every other field
/// carries a serde default.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct HttpConfig {
/// Probe name; returned as the collector name and used when computing the series id.
pub name: String,
/// URL the probe request is sent to.
pub url: String,
/// Whether the probe is enabled; defaults to `true`.
/// NOTE(review): not read by the code visible here — presumably consumed by the caller; confirm.
#[serde(default = "default_enabled")]
pub enabled: bool,
/// Group label; defaults to `"default"`.
/// NOTE(review): not read by the code visible here — confirm where it is used.
#[serde(default = "default_group")]
pub group: String,
/// HTTP method of the probe request; defaults to `GET`.
#[serde(default = "default_method")]
pub method: HttpMethod,
/// Status the response must have to count as success; defaults to 200.
/// Ignored when `success_conditions.status_codes` is set.
#[serde(default = "default_expected_status")]
pub expected_status: u16,
/// Fixed probe interval, (de)serialized through `humantime_serde`.
/// Only used when `cron` is `None`; a missing value falls back to `DEFAULT_INTERVAL`.
#[serde(default, with = "humantime_serde")]
pub interval: Option<Duration>,
/// Cron expression; when present it takes precedence over `interval`.
#[serde(default)]
pub cron: Option<String>,
/// Request timeout, (de)serialized through `humantime_serde`; defaults to `DEFAULT_TIMEOUT`.
#[serde(default = "default_timeout", with = "humantime_serde")]
pub timeout: Duration,
/// Static tags attached to the metric series.
#[serde(default)]
pub tags: BTreeMap<String, String>,
/// Optional description passed along when the metric series is upserted.
#[serde(default)]
pub description: Option<String>,
/// Extra request headers added to every probe request.
#[serde(default)]
pub headers: BTreeMap<String, String>,
/// Additional success criteria; when `None`, only `expected_status` is checked.
#[serde(default)]
pub success_conditions: Option<SuccessConditions>,
/// JSONPath extractions evaluated against the response body.
#[serde(default)]
pub extract_metrics: Vec<MetricExtraction>,
}
/// Optional criteria a response must satisfy to count as a successful probe.
///
/// Every criterion that is set must hold; unset criteria are skipped.
#[derive(Debug, Clone, Default, Serialize, Deserialize)]
pub struct SuccessConditions {
/// Accepted status codes. When set, this replaces the `expected_status` check.
#[serde(default)]
pub status_codes: Option<Vec<u16>>,
/// Response headers that must be present with exactly these values.
#[serde(default)]
pub headers: Option<BTreeMap<String, String>>,
/// Substring that must occur in the response body.
#[serde(default)]
pub body_contains: Option<String>,
/// JSONPath expression that must select at least one "truthy" node in the
/// JSON body: `true`, any number, or a non-empty string, array or object.
#[serde(default)]
pub body_jsonpath: Option<String>,
}
/// Describes one numeric value to pull out of a JSON response body.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct MetricExtraction {
/// Name of the extracted metric (used in the debug log entry).
pub name: String,
/// JSONPath expression; the first selected node is read as an `f64`.
pub jsonpath: String,
/// Optional unit label.
/// NOTE(review): not read by the code visible here — confirm intended use.
#[serde(default)]
pub unit: Option<String>,
}
impl HttpConfig {
pub fn new(name: impl Into<String>, url: impl Into<String>) -> Self {
Self {
name: name.into(),
url: url.into(),
enabled: true,
group: "default".to_string(),
method: HttpMethod::default(),
expected_status: DEFAULT_EXPECTED_STATUS,
interval: Some(DEFAULT_INTERVAL),
cron: None,
timeout: DEFAULT_TIMEOUT,
tags: BTreeMap::new(),
description: None,
headers: BTreeMap::new(),
success_conditions: None,
extract_metrics: Vec::new(),
}
}
pub fn static_tags(&self) -> &StaticTags {
&self.tags
}
pub fn schedule(&self) -> Schedule {
if let Some(ref cron_expr) = self.cron {
Schedule::Cron(cron_expr.clone())
} else {
Schedule::Interval(self.interval.unwrap_or(DEFAULT_INTERVAL))
}
}
pub fn with_method(mut self, method: HttpMethod) -> Self {
self.method = method;
self
}
pub fn with_expected_status(mut self, status: u16) -> Self {
self.expected_status = status;
self
}
pub fn with_interval(mut self, interval: Duration) -> Self {
self.interval = Some(interval);
self.cron = None;
self
}
pub fn with_cron(mut self, cron: impl Into<String>) -> Self {
self.cron = Some(cron.into());
self.interval = None;
self
}
pub fn with_schedule(mut self, schedule: Schedule) -> Self {
match schedule {
Schedule::Interval(d) => {
self.interval = Some(d);
self.cron = None;
}
Schedule::Cron(expr) => {
self.cron = Some(expr);
self.interval = None;
}
}
self
}
pub fn with_timeout(mut self, timeout: Duration) -> Self {
self.timeout = timeout;
self
}
pub fn with_static_tags(mut self, tags: StaticTags) -> Self {
self.tags = tags;
self
}
pub fn with_description(mut self, description: impl Into<String>) -> Self {
self.description = Some(description.into());
self
}
pub fn with_headers(mut self, headers: BTreeMap<String, String>) -> Self {
self.headers = headers;
self
}
pub fn with_header(mut self, key: impl Into<String>, value: impl Into<String>) -> Self {
self.headers.insert(key.into(), value.into());
self
}
pub fn with_success_conditions(mut self, conditions: SuccessConditions) -> Self {
self.success_conditions = Some(conditions);
self
}
pub fn with_extract_metrics(mut self, metrics: Vec<MetricExtraction>) -> Self {
self.extract_metrics = metrics;
self
}
pub fn with_enabled(mut self, enabled: bool) -> Self {
self.enabled = enabled;
self
}
pub fn with_group(mut self, group: impl Into<String>) -> Self {
self.group = group.into();
self
}
}
/// Collector that runs the HTTP probe described by an `HttpConfig`.
pub struct HttpCollector {
/// Probe settings.
config: HttpConfig,
/// Storage handle used to upsert the metric series and insert samples.
writer: StorageWriter,
/// Series id computed once in `new` from the category, name, URL and static tags.
series_id: u64,
/// HTTP client built in `new` with the configured timeout.
client: Client,
}
impl HttpCollector {
    /// Builds a collector for `config` that writes its samples through `writer`.
    ///
    /// The series id is derived from the category, probe name, URL and static tags,
    /// and the HTTP client is created with the configured timeout.
    ///
    /// # Errors
    ///
    /// Returns `CollectorError::Config` when the HTTP client cannot be built.
    pub fn new(config: HttpConfig, writer: StorageWriter) -> Result<Self, CollectorError> {
        let series_id = MetricSeries::compute_series_id(
            MetricCategory::NetworkHttp,
            &config.name,
            &config.url,
            config.static_tags(),
        );

        let client = match Client::builder().timeout(config.timeout).build() {
            Ok(built) => built,
            Err(err) => {
                return Err(CollectorError::Config(format!(
                    "Failed to build HTTP client: {}",
                    err
                )));
            }
        };

        Ok(Self {
            config,
            writer,
            series_id,
            client,
        })
    }
}
/// Debug output shows the config and series id; the remaining fields are
/// elided and the struct is rendered as non-exhaustive.
impl std::fmt::Debug for HttpCollector {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let mut out = f.debug_struct("HttpCollector");
        out.field("config", &self.config);
        out.field("series_id", &self.series_id);
        out.finish_non_exhaustive()
    }
}
// `Collector` implementation: each `collect` run sends one request to the
// configured URL and records a latency/success sample for the probe's series.
#[async_trait::async_trait]
impl Collector for HttpCollector {
/// Returns the configured probe name.
fn name(&self) -> &str {
&self.config.name
}
/// Always `MetricCategory::NetworkHttp`.
fn category(&self) -> MetricCategory {
MetricCategory::NetworkHttp
}
/// Returns the schedule resolved from the config (cron takes precedence over interval).
fn schedule(&self) -> Schedule {
self.config.schedule()
}
/// Upserts the metric series built from the config through the storage writer
/// and returns the series id that was computed when the collector was created.
///
/// Fails if the writer's upsert fails.
fn upsert_metric_series(&self) -> Result<u64, CollectorError> {
let series = MetricSeries::new(
MetricCategory::NetworkHttp,
self.config.name.clone(),
self.config.url.clone(),
self.config.static_tags().clone(),
self.config.description.clone(),
);
self.writer.upsert_metric_series(series)?;
Ok(self.series_id)
}
/// Runs one probe: sends the request, evaluates the success conditions,
/// stores one sample and finally runs the configured metric extractions.
///
/// A request error or timeout is not an `Err`: it is stored as a failed sample
/// with `FAILURE_LATENCY_MS` as its value. `Err` is returned only when inserting
/// the sample fails or when metric extraction fails.
async fn collect(&self) -> Result<(), CollectorError> {
let probe_timeout = self.config.timeout;
// Pick the request builder matching the configured method.
let mut request = match self.config.method {
HttpMethod::Get => self.client.get(&self.config.url),
HttpMethod::Post => self.client.post(&self.config.url),
HttpMethod::Head => self.client.head(&self.config.url),
HttpMethod::Put => self.client.put(&self.config.url),
HttpMethod::Delete => self.client.delete(&self.config.url),
HttpMethod::Options => self
.client
.request(reqwest::Method::OPTIONS, &self.config.url),
HttpMethod::Patch => self.client.patch(&self.config.url),
};
// Attach the configured request headers.
for (key, value) in &self.config.headers {
request = request.header(key.as_str(), value.as_str());
}
// Time only `send()`: `elapsed` is captured before any body is read below.
let start = Instant::now();
let result = timeout(probe_timeout, request.send()).await;
let elapsed = start.elapsed();
// Whole milliseconds, clamped to `u32::MAX` so the cast cannot wrap.
let duration_ms = elapsed.as_millis().min(u32::MAX as u128) as u32;
// Outer `Result` is the tokio timeout, inner `Result` is the request itself.
let (latency_ms, success, status_code, body_opt) = match result {
Ok(Ok(response)) => {
let status = response.status().as_u16();
// Headers are cloned because `response.text()` below consumes the response.
let response_headers = response.headers().clone();
let ms = elapsed.as_secs_f64() * 1000.0;
// The body is only read when a body condition or a metric extraction needs it.
let needs_body = self.config.success_conditions.as_ref().map_or(false, |sc| {
sc.body_contains.is_some() || sc.body_jsonpath.is_some()
}) || !self.config.extract_metrics.is_empty();
// A failed body read becomes `None`, which makes any body condition fail.
let body = if needs_body {
response.text().await.ok()
} else {
None
};
let success = self.evaluate_success(status, &response_headers, body.as_deref());
if success {
tracing::debug!(
name = %self.config.name,
url = %self.config.url,
latency_ms = ms,
status = status,
"HTTP probe successful"
);
} else {
tracing::warn!(
name = %self.config.name,
url = %self.config.url,
latency_ms = ms,
status = status,
expected = self.config.expected_status,
"HTTP probe failed conditions"
);
}
(ms, success, Some(status), body)
}
// The request itself failed: record the sentinel latency.
Ok(Err(e)) => {
tracing::warn!(
name = %self.config.name,
url = %self.config.url,
error = %e,
"HTTP probe failed"
);
(FAILURE_LATENCY_MS, false, None, None)
}
// The tokio timeout fired before `send()` completed.
Err(_) => {
tracing::warn!(
name = %self.config.name,
url = %self.config.url,
timeout_ms = probe_timeout.as_millis(),
"HTTP probe timed out"
);
(FAILURE_LATENCY_MS, false, None, None)
}
};
// One sample per run, tagged with the method and, when a response arrived, its status.
let mut value = MetricValue::new(self.series_id, latency_ms, success)
.with_unit("ms")
.with_duration_ms(duration_ms)
.with_tag("method", self.config.method.as_str());
if let Some(status) = status_code {
value = value.with_tag("status_code", status.to_string());
}
self.writer.insert_metric_value(value)?;
// Extraction runs after the sample is stored and regardless of `success`;
// an extraction error is propagated to the caller.
if let Some(body) = body_opt {
self.extract_metrics_from_body(&body)?;
}
Ok(())
}
}
impl HttpCollector {
fn evaluate_success(
&self,
status: u16,
response_headers: &reqwest::header::HeaderMap,
body: Option<&str>,
) -> bool {
let Some(ref conditions) = self.config.success_conditions else {
return status == self.config.expected_status;
};
if let Some(ref allowed_statuses) = conditions.status_codes {
if !allowed_statuses.contains(&status) {
return false;
}
} else if status != self.config.expected_status {
return false;
}
if let Some(ref required_headers) = conditions.headers {
for (key, expected_value) in required_headers {
match response_headers.get(key) {
Some(actual) => {
if actual.to_str().unwrap_or("") != expected_value {
return false;
}
}
None => return false,
}
}
}
if let Some(ref substring) = conditions.body_contains {
if let Some(body_text) = body {
if !body_text.contains(substring) {
return false;
}
} else {
return false;
}
}
if let Some(ref jsonpath_expr) = conditions.body_jsonpath {
if let Some(body_text) = body {
if !self.evaluate_jsonpath_condition(body_text, jsonpath_expr) {
return false;
}
} else {
return false;
}
}
true
}
fn evaluate_jsonpath_condition(&self, body: &str, jsonpath_expr: &str) -> bool {
use serde_json_path::JsonPath;
let Ok(json) = serde_json::from_str::<serde_json::Value>(body) else {
return false;
};
let Ok(path) = jsonpath_expr.parse::<JsonPath>() else {
tracing::warn!(
name = %self.config.name,
jsonpath = %jsonpath_expr,
"Invalid JSONPath expression"
);
return false;
};
let nodes = path.query(&json);
if nodes.is_empty() {
return false;
}
nodes.iter().any(|node| match node {
serde_json::Value::Null => false,
serde_json::Value::Bool(b) => *b,
serde_json::Value::String(s) => !s.is_empty(),
serde_json::Value::Array(a) => !a.is_empty(),
serde_json::Value::Object(o) => !o.is_empty(),
serde_json::Value::Number(_) => true,
})
}
fn extract_metrics_from_body(&self, body: &str) -> Result<(), CollectorError> {
use serde_json_path::JsonPath;
if self.config.extract_metrics.is_empty() {
return Ok(());
}
let json: serde_json::Value = serde_json::from_str(body).map_err(|e| {
CollectorError::Config(format!("Failed to parse response body as JSON: {}", e))
})?;
for metric in &self.config.extract_metrics {
let path = metric.jsonpath.parse::<JsonPath>().map_err(|e| {
CollectorError::Config(format!("Invalid JSONPath '{}': {}", metric.jsonpath, e))
})?;
let nodes = path.query(&json);
if let Some(node) = nodes.first() {
let node_ref: &serde_json::Value = node;
if let Some(value) = node_ref.as_f64() {
tracing::debug!(
name = %self.config.name,
metric_name = %metric.name,
value = value,
"Extracted metric from response"
);
}
}
}
Ok(())
}
}
#[cfg(test)]
mod tests {
    use super::*;

    /// `HttpConfig::new` fills every optional setting with its documented default.
    #[test]
    fn test_http_config_defaults() {
        let cfg = HttpConfig::new("api-health", "https://api.example.com/health");

        assert_eq!(cfg.name, "api-health");
        assert_eq!(cfg.url, "https://api.example.com/health");
        assert_eq!(cfg.method, HttpMethod::Get);
        assert_eq!(cfg.expected_status, DEFAULT_EXPECTED_STATUS);
        assert!(matches!(cfg.schedule(), Schedule::Interval(d) if d == DEFAULT_INTERVAL));
        assert_eq!(cfg.timeout, DEFAULT_TIMEOUT);
    }

    /// Builder methods override the corresponding defaults.
    #[test]
    fn test_http_config_builder() {
        let every_minute = Duration::from_secs(60);
        let five_seconds = Duration::from_secs(5);

        let cfg = HttpConfig::new("api-post", "https://api.example.com/submit")
            .with_method(HttpMethod::Post)
            .with_expected_status(201)
            .with_interval(every_minute)
            .with_timeout(five_seconds)
            .with_description("API submission endpoint");

        assert_eq!(cfg.method, HttpMethod::Post);
        assert_eq!(cfg.expected_status, 201);
        assert!(matches!(cfg.schedule(), Schedule::Interval(d) if d == every_minute));
        assert_eq!(cfg.timeout, five_seconds);
        assert_eq!(
            cfg.description,
            Some("API submission endpoint".to_string())
        );
    }

    /// Method names parse case-insensitively; unknown names are rejected.
    #[test]
    fn test_http_method_from_str() {
        let cases: [(&str, Option<HttpMethod>); 9] = [
            ("GET", Some(HttpMethod::Get)),
            ("get", Some(HttpMethod::Get)),
            ("POST", Some(HttpMethod::Post)),
            ("head", Some(HttpMethod::Head)),
            ("put", Some(HttpMethod::Put)),
            ("DELETE", Some(HttpMethod::Delete)),
            ("options", Some(HttpMethod::Options)),
            ("PATCH", Some(HttpMethod::Patch)),
            ("INVALID", None),
        ];

        for &(input, expected) in &cases {
            assert_eq!(input.parse::<HttpMethod>().ok(), expected);
        }
    }

    /// `as_str` yields the upper-case method name.
    #[test]
    fn test_http_method_as_str() {
        let cases = [
            (HttpMethod::Get, "GET"),
            (HttpMethod::Post, "POST"),
            (HttpMethod::Head, "HEAD"),
        ];

        for &(method, expected) in &cases {
            assert_eq!(method.as_str(), expected);
        }
    }
}