use std::sync::{Mutex, Once};
use std::time::Instant;
use axum::{body::Body, extract::Request, http::Method, middleware::Next, response::Response};
use metrics::Unit;
use metrics::{counter, describe_histogram, gauge, histogram};
use metrics_exporter_prometheus::{Matcher, PrometheusBuilder, PrometheusHandle};
/// One-time guard: `init_metrics` runs its recorder-installation closure through this `Once`.
static METRICS_INIT: Once = Once::new();
/// Handle stored by `init_metrics` after a successful `install_recorder()`; `None` until then.
static METRICS_HANDLE: Mutex<Option<PrometheusHandle>> = Mutex::new(None);
/// Bucket boundaries for the `rs3gw_request_duration_ms` histogram, in milliseconds.
const REQUEST_DURATION_BUCKETS: &[f64] = &[
0.1, 1.0, 5.0, 10.0, 50.0, 100.0, 500.0, 1000.0, 5000.0, 60000.0,
];
/// Bucket boundaries for the `rs3gw_object_size_bytes` histogram, in bytes
/// (1 KiB, 64 KiB, 1 MiB, 10 MiB, 100 MiB, 1 GiB).
const OBJECT_SIZE_BUCKETS: &[f64] = &[
1024.0,
65536.0,
1_048_576.0,
10_485_760.0,
104_857_600.0,
1_073_741_824.0,
];
/// Bucket boundaries for the `rs3gw_dedup_savings_ratio` histogram (bytes_saved / original_bytes).
const DEDUP_SAVINGS_RATIO_BUCKETS: &[f64] = &[0.0, 0.1, 0.25, 0.5, 0.75, 0.9, 1.0];
/// Bucket boundaries for the `rs3gw_compression_ratio` histogram (compressed / original).
/// The final 1.25 boundary covers outputs that are larger than their input.
const COMPRESSION_RATIO_BUCKETS: &[f64] = &[0.0, 0.1, 0.25, 0.5, 0.75, 0.9, 1.0, 1.25];
/// Creates a [`PrometheusBuilder`] with explicit bucket boundaries for every rs3gw histogram.
///
/// # Errors
/// Returns an error naming the affected metric when the builder rejects a bucket set.
pub fn build_prometheus_builder() -> Result<PrometheusBuilder, Box<dyn std::error::Error>> {
    // Full metric name paired with the bucket boundaries to register for it.
    let histogram_buckets: [(&str, &[f64]); 4] = [
        ("rs3gw_request_duration_ms", REQUEST_DURATION_BUCKETS),
        ("rs3gw_object_size_bytes", OBJECT_SIZE_BUCKETS),
        ("rs3gw_dedup_savings_ratio", DEDUP_SAVINGS_RATIO_BUCKETS),
        ("rs3gw_compression_ratio", COMPRESSION_RATIO_BUCKETS),
    ];

    let mut configured = PrometheusBuilder::new();
    for (metric, bounds) in histogram_buckets {
        // Error text refers to the metric without its `rs3gw_` prefix.
        let short_name = metric.strip_prefix("rs3gw_").unwrap_or(metric);
        configured = configured
            .set_buckets_for_metric(Matcher::Full(metric.to_string()), bounds)
            .map_err(|e| format!("Failed to set {short_name} buckets: {e}"))?;
    }
    Ok(configured)
}
/// Installs the Prometheus recorder at most once per process and returns its handle.
///
/// The first call builds the recorder via [`build_prometheus_builder`], stores the resulting
/// handle in `METRICS_HANDLE` and registers the histogram descriptions. Every later call skips
/// installation and returns a clone of the stored handle.
///
/// # Errors
/// Returns an error when:
/// - the builder cannot be configured,
/// - `install_recorder()` fails (the cause is reported on the call that attempted it),
/// - the handle mutex is poisoned,
/// - no handle was stored by the one-time initialisation (for example because an earlier
///   attempt failed).
pub fn init_metrics() -> Result<PrometheusHandle, Box<dyn std::error::Error>> {
    let mut init_error: Option<String> = None;

    METRICS_INIT.call_once(|| match build_prometheus_builder() {
        Ok(builder) => match builder.install_recorder() {
            Ok(handle) => {
                match METRICS_HANDLE.lock() {
                    Ok(mut guard) => *guard = Some(handle),
                    // Surface the poisoned lock instead of silently dropping the handle.
                    Err(e) => {
                        init_error = Some(format!("Failed to lock metrics handle: {e}"));
                    }
                }
                configure_histogram_buckets();
            }
            // Keep the real cause rather than discarding it.
            Err(e) => {
                init_error = Some(format!("Failed to install Prometheus recorder: {e}"));
            }
        },
        Err(e) => {
            init_error = Some(e.to_string());
        }
    });

    if let Some(e) = init_error {
        return Err(e.into());
    }

    let guard = METRICS_HANDLE
        .lock()
        .map_err(|e| format!("Failed to lock metrics handle: {e}"))?;
    let stored = guard.as_ref().cloned();
    stored.ok_or_else(|| {
        "Metrics not initialized: PrometheusBuilder::install_recorder() must be called first. \
         This usually means metrics were already initialized elsewhere and the handle was not saved."
            .into()
    })
}
/// Registers unit and help text for the four rs3gw histograms.
///
/// Despite its name, this function only issues `describe_histogram!` calls; the bucket
/// boundaries themselves are set in `build_prometheus_builder`. The bucket lists quoted in the
/// description strings are hard-coded text and must be kept in sync with the `*_BUCKETS`
/// constants by hand.
pub fn configure_histogram_buckets() {
describe_histogram!(
"rs3gw_request_duration_ms",
Unit::Milliseconds,
"Request latency histogram. Buckets: [0.1, 1, 5, 10, 50, 100, 500, 1000, 5000, 60000] ms"
);
describe_histogram!(
"rs3gw_object_size_bytes",
Unit::Bytes,
"Object size distribution. Buckets: [1024, 65536, 1048576, 10485760, 104857600, 1073741824] bytes"
);
// The two ratio histograms are dimensionless; they are described with `Unit::Count`.
describe_histogram!(
"rs3gw_dedup_savings_ratio",
Unit::Count,
"Dedup savings ratio (bytes_saved/original_bytes). Buckets: [0.0, 0.1, 0.25, 0.5, 0.75, 0.9, 1.0]"
);
describe_histogram!(
"rs3gw_compression_ratio",
Unit::Count,
"Compression ratio (compressed/original). Buckets: [0.0, 0.1, 0.25, 0.5, 0.75, 0.9, 1.0, 1.25]"
);
}
/// Axum middleware that counts each request and records its latency, labelled by the
/// operation name derived from the method, path and query string.
pub async fn metrics_layer(request: Request<Body>, next: Next) -> Response {
    let started_at = Instant::now();

    // Classify from borrowed request parts before the request is moved into the next layer.
    let operation = {
        let uri = request.uri();
        classify_operation(request.method(), uri.path(), uri.query())
    };

    let response = next.run(request).await;

    let elapsed_ms = started_at.elapsed().as_secs_f64() * 1000.0;
    record_request(&operation, response.status().as_u16());
    record_latency(&operation, elapsed_ms);
    response
}
/// Axum middleware that feeds the application's `metrics_tracker` with a request count and
/// transferred-byte totals.
///
/// Byte counts come from `Content-Length` headers:
/// - `GET` / `HEAD`: the *response* header is recorded as downloaded bytes.
/// - `PUT` / `POST`: the *request* header is recorded as uploaded bytes. The response header
///   describes the reply body, not the payload the client sent.
///
/// Messages without a parseable `Content-Length` contribute no bytes.
pub async fn metrics_tracker_layer(
    axum::extract::State(state): axum::extract::State<crate::AppState>,
    request: Request<Body>,
    next: Next,
) -> Response {
    // Parses a Content-Length header into a byte count, if present and well-formed.
    let content_length = |headers: &axum::http::HeaderMap| -> Option<u64> {
        headers
            .get("content-length")?
            .to_str()
            .ok()?
            .parse::<u64>()
            .ok()
    };

    let method = request.method().clone();
    // Must be read now: the request is consumed by `next.run`.
    let request_len = content_length(request.headers());

    let response = next.run(request).await;
    state.metrics_tracker.record_request();

    match method.as_str() {
        // NOTE(review): HEAD responses advertise a length without sending a body; counting
        // them as downloads is kept from the original behaviour — confirm it is intended.
        "GET" | "HEAD" => {
            if let Some(len) = content_length(response.headers()) {
                state.metrics_tracker.record_bytes_downloaded(len);
            }
        }
        "PUT" | "POST" => {
            if let Some(len) = request_len {
                state.metrics_tracker.record_bytes_uploaded(len);
            }
        }
        _ => {}
    }

    response
}
/// Returns `true` when the raw query string contains `param` as a key, either bare (`?acl`)
/// or followed by `=` (`?acl=...`). Pairs are split on `&`; no URL-decoding is performed.
fn has_query_param(query: Option<&str>, param: &str) -> bool {
    let Some(raw) = query else {
        return false;
    };
    raw.split('&').any(|pair| {
        // The key must be followed by end-of-pair or '='.
        pair.strip_prefix(param)
            .is_some_and(|rest| rest.is_empty() || rest.starts_with('='))
    })
}
/// Maps an HTTP method, path and raw query string to an operation label for metrics.
///
/// The path is trimmed of leading/trailing `/` and split on `/`. A single segment is treated
/// as a bucket-level request; any other non-root path falls through to the object-level arms.
/// Query keys select sub-operations. Anything unmatched is `"Unknown"`.
fn classify_operation(method: &Method, path: &str, query: Option<&str>) -> String {
    let segments: Vec<&str> = path.trim_matches('/').split('/').collect();
    let has = |key: &str| has_query_param(query, key);

    // Arms are ordered so that single-segment (bucket) patterns win over the `[_, ..]`
    // (object) patterns, which also match a single segment.
    let label = match (method.as_str(), segments.as_slice()) {
        (_, ["health"]) => "HealthCheck",
        (_, ["metrics"]) => "Metrics",
        ("GET", [""] | []) => "ListBuckets",

        ("HEAD", [_]) => "HeadBucket",

        ("PUT", [_]) if has("versioning") => "PutBucketVersioning",
        ("PUT", [_]) if has("acl") => "PutBucketAcl",
        ("PUT", [_]) => "CreateBucket",

        ("DELETE", [_]) => "DeleteBucket",

        // A bucket-level POST without `delete` falls through to the object-level POST arms.
        ("POST", [_]) if has("delete") => "DeleteObjects",

        ("GET", [_]) if has("location") => "GetBucketLocation",
        ("GET", [_]) if has("versioning") => "GetBucketVersioning",
        ("GET", [_]) if has("acl") => "GetBucketAcl",
        ("GET", [_]) if has("uploads") => "ListMultipartUploads",
        ("GET", [_]) => "ListObjectsV2",

        ("HEAD", [_, ..]) => "HeadObject",

        ("GET", [_, ..]) if has("tagging") => "GetObjectTagging",
        ("GET", [_, ..]) if has("acl") => "GetObjectAcl",
        ("GET", [_, ..]) if has("uploadId") => "ListParts",
        ("GET", [_, ..]) => "GetObject",

        ("PUT", [_, ..]) if has("tagging") => "PutObjectTagging",
        ("PUT", [_, ..]) if has("partNumber") => "UploadPart",
        ("PUT", [_, ..]) => "PutObject",

        ("POST", [_, ..]) if has("uploads") => "CreateMultipartUpload",
        ("POST", [_, ..]) if has("uploadId") => "CompleteMultipartUpload",
        ("POST", [_, ..]) => "PostObject",

        ("DELETE", [_, ..]) if has("tagging") => "DeleteObjectTagging",
        ("DELETE", [_, ..]) if has("uploadId") => "AbortMultipartUpload",
        ("DELETE", [_, ..]) => "DeleteObject",

        _ => "Unknown",
    };
    label.to_string()
}
/// Increments `rs3gw_requests_total` for `operation`, labelled with the status class
/// (`2xx`, `3xx`, `4xx`, `5xx`, or `other` for anything outside 200..=599).
pub fn record_request(operation: &str, status: u16) {
    // Integer division by 100 yields the leading digit of a three-digit status code.
    let class = match status / 100 {
        2 => "2xx",
        3 => "3xx",
        4 => "4xx",
        5 => "5xx",
        _ => "other",
    };
    counter!(
        "rs3gw_requests_total",
        "operation" => operation.to_string(),
        "status" => class.to_string()
    )
    .increment(1);
}
/// Records `latency_ms` in the `rs3gw_request_duration_ms` histogram under the given operation.
pub fn record_latency(operation: &str, latency_ms: f64) {
    histogram!(
        "rs3gw_request_duration_ms",
        "operation" => operation.to_string()
    )
    .record(latency_ms);
}
/// Adds `bytes` to the `rs3gw_bytes_total` counter, labelled with the caller-supplied direction.
pub fn record_bytes(direction: &str, bytes: u64) {
    counter!("rs3gw_bytes_total", "direction" => direction.to_string()).increment(bytes);
}
/// Publishes the current bucket count, object count and total stored bytes as gauges.
pub fn update_storage_stats(bucket_count: usize, object_count: usize, total_size: u64) {
    // Gauges take f64 values; the integer inputs are converted up front.
    let buckets = bucket_count as f64;
    let objects = object_count as f64;
    let stored_bytes = total_size as f64;

    gauge!("rs3gw_buckets_total").set(buckets);
    gauge!("rs3gw_objects_total").set(objects);
    gauge!("rs3gw_storage_bytes").set(stored_bytes);
}
/// Accumulates original and compressed byte totals per algorithm and, for non-empty inputs,
/// records the compressed/original ratio in `rs3gw_compression_ratio`.
pub fn record_compression(algorithm: &str, original_size: u64, compressed_size: u64) {
    let by_algorithm = [("algorithm", algorithm.to_string())];

    counter!("rs3gw_compression_original_bytes", &by_algorithm).increment(original_size);
    counter!("rs3gw_compression_compressed_bytes", &by_algorithm).increment(compressed_size);

    // A ratio is undefined for empty input, so no sample is recorded.
    if original_size == 0 {
        return;
    }
    let ratio = (compressed_size as f64) / (original_size as f64);
    histogram!("rs3gw_compression_ratio", &by_algorithm).record(ratio);
}
/// Increments `rs3gw_cache_operations_total` for `operation`, with `result` set to
/// `"hit"` or `"miss"`.
pub fn record_cache_operation(operation: &str, hit: bool) {
    let outcome = if hit { "hit" } else { "miss" };
    counter!(
        "rs3gw_cache_operations_total",
        "operation" => operation.to_string(),
        "result" => outcome.to_string()
    )
    .increment(1);
}
/// Publishes the cache's size in bytes, its object count and its hit rate as gauges.
pub fn update_cache_stats(size_bytes: u64, object_count: usize, hit_rate: f64) {
    // Gauges take f64 values; the integer inputs are converted up front.
    let cached_bytes = size_bytes as f64;
    let cached_objects = object_count as f64;

    gauge!("rs3gw_cache_size_bytes").set(cached_bytes);
    gauge!("rs3gw_cache_objects_total").set(cached_objects);
    gauge!("rs3gw_cache_hit_rate").set(hit_rate);
}
/// Increments `rs3gw_errors_total`, labelled with the error type and the operation it occurred in.
pub fn record_error(error_type: &str, operation: &str) {
    counter!(
        "rs3gw_errors_total",
        "error_type" => error_type.to_string(),
        "operation" => operation.to_string()
    )
    .increment(1);
}
/// Increments `rs3gw_cluster_operations_total`, labelled with the operation and its status.
pub fn record_cluster_operation(operation: &str, status: &str) {
    counter!(
        "rs3gw_cluster_operations_total",
        "operation" => operation.to_string(),
        "status" => status.to_string()
    )
    .increment(1);
}
/// Publishes total nodes, healthy nodes and replication lag (milliseconds) as gauges.
pub fn update_cluster_health(total_nodes: usize, healthy_nodes: usize, replication_lag_ms: f64) {
    // Gauges take f64 values; the node counts are converted up front.
    let all_nodes = total_nodes as f64;
    let healthy = healthy_nodes as f64;

    gauge!("rs3gw_cluster_nodes_total").set(all_nodes);
    gauge!("rs3gw_cluster_healthy_nodes").set(healthy);
    gauge!("rs3gw_cluster_replication_lag_ms").set(replication_lag_ms);
}
/// Counts one storage-class transition and adds its size to the transitioned-bytes total,
/// both labelled with the source and destination class.
pub fn record_storage_class_transition(from_class: &str, to_class: &str, size_bytes: u64) {
    // Both counters share the same label set.
    let transition = [
        ("from_class", from_class.to_string()),
        ("to_class", to_class.to_string()),
    ];

    let transitions = counter!("rs3gw_storage_class_transitions_total", &transition);
    transitions.increment(1);

    let moved_bytes = counter!("rs3gw_storage_class_transitioned_bytes", &transition);
    moved_bytes.increment(size_bytes);
}
/// Counts one batch job and adds `objects_processed` to the processed-objects total,
/// both labelled with the job type and status.
pub fn record_batch_job(job_type: &str, status: &str, objects_processed: u64) {
    // Both counters share the same label set.
    let job = [
        ("job_type", job_type.to_string()),
        ("status", status.to_string()),
    ];

    let jobs = counter!("rs3gw_batch_jobs_total", &job);
    jobs.increment(1);

    let processed = counter!("rs3gw_batch_objects_processed", &job);
    processed.increment(objects_processed);
}
/// Records the part count, total size in bytes and duration in milliseconds of one multipart
/// upload into three unlabelled histograms.
pub fn record_multipart_upload(parts: usize, total_size_bytes: u64, duration_ms: f64) {
    // Histograms take f64 samples; the integer inputs are converted up front.
    let part_count = parts as f64;
    let upload_bytes = total_size_bytes as f64;

    histogram!("rs3gw_multipart_parts").record(part_count);
    histogram!("rs3gw_multipart_size_bytes").record(upload_bytes);
    histogram!("rs3gw_multipart_duration_ms").record(duration_ms);
}
/// Records an object's size in the `rs3gw_object_size_bytes` histogram, labelled by bucket.
pub fn record_object_size(bucket: &str, size_bytes: u64) {
    let by_bucket = [("bucket", bucket.to_string())];
    histogram!("rs3gw_object_size_bytes", &by_bucket).record(size_bytes as f64);
}
/// Adds `bytes_saved` to the dedup savings total, counts one dedup operation and, for
/// non-empty originals, records bytes_saved/original_bytes in `rs3gw_dedup_savings_ratio`.
pub fn record_dedup_savings(bytes_saved: u64, original_bytes: u64) {
    counter!("rs3gw_dedup_total_bytes_saved").increment(bytes_saved);
    counter!("rs3gw_dedup_operations_total").increment(1);

    // A ratio is undefined for an empty original, so no sample is recorded.
    if original_bytes == 0 {
        return;
    }
    let saved_fraction = bytes_saved as f64 / original_bytes as f64;
    histogram!("rs3gw_dedup_savings_ratio").record(saved_fraction);
}