use metrics_exporter_prometheus::{PrometheusBuilder, PrometheusHandle};
/// Builds a default Prometheus recorder, installs it, and returns its handle.
///
/// The returned [`PrometheusHandle`] is the object the tests in this file call
/// `render()` on to obtain the exposition text.
///
/// # Panics
///
/// Panics if `install_recorder()` returns an error. As the panic message
/// states, this function is meant to be called exactly once at startup.
pub fn install() -> PrometheusHandle {
    let builder = PrometheusBuilder::new();
    let installed = builder.install_recorder();
    installed.expect("metrics recorder install (must be called once at startup)")
}
/// Test-only accessor for a recorder handle shared by all callers.
///
/// The first call runs [`install`] and caches the resulting handle in a
/// `OnceLock`; every call returns a clone of that cached handle, so this
/// helper never invokes `install` more than once.
#[cfg(test)]
pub(crate) fn test_metrics_handle() -> PrometheusHandle {
    static SHARED: std::sync::OnceLock<PrometheusHandle> = std::sync::OnceLock::new();
    let cached = SHARED.get_or_init(install);
    cached.clone()
}
/// Metric name constants shared by the recorder functions in this file.
///
/// Every name carries the `s4_` prefix. The notes on each constant describe
/// how the recorders in this file use it (metric type and label keys).
pub mod names {
    // --- Request path ---------------------------------------------------

    /// Counter; labels `op`, `codec`, `result` (see `record_put` / `record_get`).
    pub const REQUESTS_TOTAL: &'static str = "s4_requests_total";
    /// Counter; labels `op`, `codec`.
    pub const BYTES_IN_TOTAL: &'static str = "s4_bytes_in_total";
    /// Counter; labels `op`, `codec`.
    pub const BYTES_OUT_TOTAL: &'static str = "s4_bytes_out_total";
    /// Histogram; labels `op`, `codec`.
    pub const REQUEST_LATENCY_SECONDS: &'static str = "s4_request_latency_seconds";

    // --- Policy, TLS and ACME --------------------------------------------

    /// Counter; labels `action`, `bucket`.
    pub const POLICY_DENIALS_TOTAL: &'static str = "s4_policy_denials_total";
    /// Counter; label `result` (`"ok"` or `"err"`).
    pub const TLS_CERT_RELOAD_TOTAL: &'static str = "s4_tls_cert_reload_total";
    /// Counter; label `result`.
    pub const ACME_RENEWAL_TOTAL: &'static str = "s4_acme_renewal_total";
    /// Gauge without labels.
    pub const ACME_CERT_EXPIRY_SECONDS: &'static str = "s4_acme_cert_expiry_seconds";

    // --- Rate limiting, compliance, notifications, lifecycle -------------

    /// Counter; labels `principal`, `bucket`.
    pub const RATE_LIMIT_THROTTLED_TOTAL: &'static str = "s4_rate_limit_throttled_total";
    /// Gauge; label `mode`.
    pub const COMPLIANCE_MODE_ACTIVE: &'static str = "s4_compliance_mode_active";
    /// Counter; label `dest`.
    pub const NOTIFICATIONS_DROPPED_TOTAL: &'static str = "s4_notifications_dropped_total";
    /// Counter; labels `bucket`, `action`.
    pub const LIFECYCLE_ACTIONS_TOTAL: &'static str = "s4_lifecycle_actions_total";

    // --- Replication and MFA delete --------------------------------------

    /// Counter; label `bucket`.
    pub const REPLICATION_DROPPED_TOTAL: &'static str = "s4_replication_dropped_total";
    /// Counter; labels `bucket`, `dest`.
    pub const REPLICATION_REPLICATED_TOTAL: &'static str = "s4_replication_replicated_total";
    /// Counter; label `bucket`.
    pub const MFA_DELETE_DENIALS_TOTAL: &'static str = "s4_mfa_delete_denials_total";

    // --- GPU codecs --------------------------------------------------------

    /// Histogram; label `codec`.
    pub const GPU_COMPRESS_SECONDS: &'static str = "s4_gpu_compress_seconds";
    /// Histogram; label `codec`.
    pub const GPU_DECOMPRESS_SECONDS: &'static str = "s4_gpu_decompress_seconds";
    /// Gauge; labels `codec`, `op` (`"compress"` or `"decompress"`).
    pub const GPU_THROUGHPUT_BYTES_PER_SEC: &'static str = "s4_gpu_throughput_bytes_per_sec";
    /// Gauge; label `codec`.
    pub const GPU_IN_FLIGHT: &'static str = "s4_gpu_in_flight";
    /// Counter; label `codec`.
    pub const GPU_OOM_TOTAL: &'static str = "s4_gpu_oom_total";

    // --- Server-side encryption -------------------------------------------

    /// Gauge; label `kind`.
    pub const SSE_AES_BACKEND: &'static str = "s4_sse_aes_backend";
    /// Counter; label `op`.
    pub const SSE_STREAMING_CHUNKS_TOTAL: &'static str = "s4_sse_streaming_chunks_total";

    // --- Background sweeps and recovery -----------------------------------

    /// Counter without labels; incremented by a caller-supplied count.
    pub const MULTIPART_ABANDONED_UPLOADS_TOTAL: &'static str =
        "s4_multipart_abandoned_uploads_total";
    /// Counter without labels; incremented by a caller-supplied count.
    pub const REPLICATION_STATUS_SWEPT_TOTAL: &'static str = "s4_replication_status_swept_total";
    /// Counter without labels.
    pub const REPLICATION_LOCK_PROPAGATION_SKIPPED_TOTAL: &'static str =
        "s4_replication_lock_propagation_skipped_total";
    /// Counter; labels `manager`, `reason`.
    pub const STATE_FILE_LOAD_FAILURES_TOTAL: &'static str = "s4_state_file_load_failures_total";
    /// Counter; labels `lock`, `kind`.
    pub const LOCK_POISON_RECOVERY_TOTAL: &'static str = "s4_lock_poison_recovery_total";
}
/// Adds one to the `s4_lock_poison_recovery_total` counter.
///
/// `lock` and `kind` are attached unchanged as the `lock` and `kind` labels.
pub fn record_lock_poison_recovery(lock: &'static str, kind: &'static str) {
    let recoveries = metrics::counter!(
        names::LOCK_POISON_RECOVERY_TOTAL,
        "lock" => lock,
        "kind" => kind,
    );
    recoveries.increment(1);
}
/// Adds one to the `s4_state_file_load_failures_total` counter.
///
/// `manager` and `reason` are attached unchanged as the `manager` and
/// `reason` labels.
pub fn record_state_file_load_failure(manager: &'static str, reason: &'static str) {
    let failures = metrics::counter!(
        names::STATE_FILE_LOAD_FAILURES_TOTAL,
        "manager" => manager,
        "reason" => reason,
    );
    failures.increment(1);
}
/// Module-level alias for [`names::SSE_AES_BACKEND`] (`"s4_sse_aes_backend"`),
/// the gauge name used by `record_sse_aes_backend`.
pub const SSE_AES_BACKEND: &str = names::SSE_AES_BACKEND;
/// Sets the `s4_sse_aes_backend` gauge to `1.0` for the given `kind` label.
///
/// Only the series for `kind` is touched; series recorded earlier under a
/// different `kind` are not reset by this call.
pub fn record_sse_aes_backend(kind: &'static str) {
    let backend = metrics::gauge!(names::SSE_AES_BACKEND, "kind" => kind);
    backend.set(1.0);
}
/// Adds one to the `s4_sse_streaming_chunks_total` counter, labelled with `op`.
pub fn record_sse_streaming_chunk(op: &'static str) {
    let chunks = metrics::counter!(names::SSE_STREAMING_CHUNKS_TOTAL, "op" => op);
    chunks.increment(1);
}
/// Adds `count` to the unlabelled `s4_multipart_abandoned_uploads_total` counter.
pub fn record_multipart_abandoned(count: u64) {
    let abandoned = metrics::counter!(names::MULTIPART_ABANDONED_UPLOADS_TOTAL);
    abandoned.increment(count);
}
/// Adds `count` to the unlabelled `s4_replication_status_swept_total` counter.
pub fn record_replication_status_swept(count: u64) {
    let swept = metrics::counter!(names::REPLICATION_STATUS_SWEPT_TOTAL);
    swept.increment(count);
}
/// Records one GPU compression run for `codec`.
///
/// Two metrics are updated:
/// - the `s4_gpu_compress_seconds` histogram receives `secs`;
/// - the `s4_gpu_throughput_bytes_per_sec` gauge (labels `codec`,
///   `op="compress"`) is set to `bytes_in / secs`.
///
/// `bytes_out` is accepted but does not contribute to either metric.
pub fn record_gpu_compress(codec: &'static str, secs: f64, bytes_in: u64, bytes_out: u64) {
    // Compression throughput is measured against the input size only.
    let _ = bytes_out;

    metrics::histogram!(names::GPU_COMPRESS_SECONDS, "codec" => codec).record(secs);

    // Clamp the divisor to at least 1e-9 so a zero (or negative) duration
    // cannot cause a division by zero.
    let elapsed = secs.max(1e-9);
    let bytes_per_sec = (bytes_in as f64) / elapsed;

    let gauge = metrics::gauge!(
        names::GPU_THROUGHPUT_BYTES_PER_SEC,
        "codec" => codec,
        "op" => "compress",
    );
    gauge.set(bytes_per_sec);
}
/// Records one GPU decompression run for `codec`.
///
/// Two metrics are updated:
/// - the `s4_gpu_decompress_seconds` histogram receives `secs`;
/// - the `s4_gpu_throughput_bytes_per_sec` gauge (labels `codec`,
///   `op="decompress"`) is set to `bytes_out / secs`.
///
/// `bytes_in` is accepted but does not contribute to either metric.
pub fn record_gpu_decompress(codec: &'static str, secs: f64, bytes_in: u64, bytes_out: u64) {
    // Decompression throughput is measured against the output size only.
    let _ = bytes_in;

    metrics::histogram!(names::GPU_DECOMPRESS_SECONDS, "codec" => codec).record(secs);

    // Clamp the divisor to at least 1e-9 so a zero (or negative) duration
    // cannot cause a division by zero.
    let elapsed = secs.max(1e-9);
    let bytes_per_sec = (bytes_out as f64) / elapsed;

    let gauge = metrics::gauge!(
        names::GPU_THROUGHPUT_BYTES_PER_SEC,
        "codec" => codec,
        "op" => "decompress",
    );
    gauge.set(bytes_per_sec);
}
/// Raises the `s4_gpu_in_flight` gauge for `codec` by `1.0`.
pub fn record_gpu_in_flight_inc(codec: &'static str) {
    let in_flight = metrics::gauge!(names::GPU_IN_FLIGHT, "codec" => codec);
    in_flight.increment(1.0);
}
/// Lowers the `s4_gpu_in_flight` gauge for `codec` by `1.0`.
pub fn record_gpu_in_flight_dec(codec: &'static str) {
    let in_flight = metrics::gauge!(names::GPU_IN_FLIGHT, "codec" => codec);
    in_flight.decrement(1.0);
}
/// Adds one to the `s4_gpu_oom_total` counter, labelled with `codec`.
pub fn record_gpu_oom(codec: &'static str) {
    let ooms = metrics::counter!(names::GPU_OOM_TOTAL, "codec" => codec);
    ooms.increment(1);
}
/// Adds one to the `s4_mfa_delete_denials_total` counter for `bucket`.
///
/// The bucket name is copied into an owned `String` for the `bucket` label.
pub fn record_mfa_delete_denial(bucket: &str) {
    let bucket_label = String::from(bucket);
    let denials = metrics::counter!(
        names::MFA_DELETE_DENIALS_TOTAL,
        "bucket" => bucket_label,
    );
    denials.increment(1);
}
/// Adds one to the `s4_replication_dropped_total` counter for `bucket`.
///
/// The bucket name is copied into an owned `String` for the `bucket` label.
pub fn record_replication_drop(bucket: &str) {
    let bucket_label = String::from(bucket);
    let dropped = metrics::counter!(
        names::REPLICATION_DROPPED_TOTAL,
        "bucket" => bucket_label,
    );
    dropped.increment(1);
}
/// Adds one to the `s4_replication_replicated_total` counter.
///
/// `bucket` and `dest` are copied into owned `String`s and attached as the
/// `bucket` and `dest` labels.
pub fn record_replication_replicated(bucket: &str, dest: &str) {
    let bucket_label = String::from(bucket);
    let dest_label = String::from(dest);
    let replicated = metrics::counter!(
        names::REPLICATION_REPLICATED_TOTAL,
        "bucket" => bucket_label,
        "dest" => dest_label,
    );
    replicated.increment(1);
}
/// Adds one to the unlabelled
/// `s4_replication_lock_propagation_skipped_total` counter.
pub fn record_replication_lock_propagation_skipped() {
    let skipped = metrics::counter!(names::REPLICATION_LOCK_PROPAGATION_SKIPPED_TOTAL);
    skipped.increment(1);
}
/// Adds one to the `s4_lifecycle_actions_total` counter.
///
/// `bucket` is copied into an owned `String` for the `bucket` label; `action`
/// is attached unchanged as the `action` label.
pub fn record_lifecycle_action(bucket: &str, action: &'static str) {
    let bucket_label = String::from(bucket);
    let actions = metrics::counter!(
        names::LIFECYCLE_ACTIONS_TOTAL,
        "bucket" => bucket_label,
        "action" => action,
    );
    actions.increment(1);
}
/// Adds one to the `s4_notifications_dropped_total` counter.
///
/// `dest_type` is attached under the label key `dest`.
pub fn record_notification_drop(dest_type: &'static str) {
    let dropped = metrics::counter!(
        names::NOTIFICATIONS_DROPPED_TOTAL,
        "dest" => dest_type,
    );
    dropped.increment(1);
}
/// Sets the `s4_compliance_mode_active` gauge to `1.0` for the given `mode`.
///
/// Only the series for `mode` is touched; series recorded earlier under a
/// different `mode` are not reset by this call.
pub fn record_compliance_mode_active(mode: &'static str) {
    let active = metrics::gauge!(names::COMPLIANCE_MODE_ACTIVE, "mode" => mode);
    active.set(1.0);
}
/// Adds one to the `s4_rate_limit_throttled_total` counter.
///
/// `principal` and `bucket` are copied into owned `String`s and attached as
/// the `principal` and `bucket` labels.
pub fn record_rate_limit_throttle(principal: &str, bucket: &str) {
    let principal_label = String::from(principal);
    let bucket_label = String::from(bucket);
    let throttled = metrics::counter!(
        names::RATE_LIMIT_THROTTLED_TOTAL,
        "principal" => principal_label,
        "bucket" => bucket_label,
    );
    throttled.increment(1);
}
/// Adds one to the `s4_tls_cert_reload_total` counter.
///
/// The `result` label is `"ok"` when `ok` is true and `"err"` otherwise.
pub fn record_tls_cert_reload(ok: bool) {
    let outcome = match ok {
        true => "ok",
        false => "err",
    };
    let reloads = metrics::counter!(names::TLS_CERT_RELOAD_TOTAL, "result" => outcome);
    reloads.increment(1);
}
/// Adds one to the `s4_acme_renewal_total` counter, labelled with `result`.
///
/// Callers in this file pass `"ok"`, `"err"` and `"timeout"`.
pub fn record_acme_renewal(result: &'static str) {
    let renewals = metrics::counter!(names::ACME_RENEWAL_TOTAL, "result" => result);
    renewals.increment(1);
}
/// Records an ACME renewal whose `result` label is `"timeout"`.
///
/// Thin wrapper over [`record_acme_renewal`].
pub fn record_acme_renewal_timeout() {
    const TIMEOUT: &str = "timeout";
    record_acme_renewal(TIMEOUT);
}
/// Sets the unlabelled `s4_acme_cert_expiry_seconds` gauge to
/// `seconds_until_expiry`.
pub fn record_acme_cert_expiry(seconds_until_expiry: f64) {
    let expiry = metrics::gauge!(names::ACME_CERT_EXPIRY_SECONDS);
    expiry.set(seconds_until_expiry);
}
/// Adds one to the `s4_policy_denials_total` counter.
///
/// `action` is attached unchanged as the `action` label; `bucket` is copied
/// into an owned `String` for the `bucket` label.
pub fn record_policy_denial(action: &'static str, bucket: &str) {
    let bucket_label = String::from(bucket);
    let denials = metrics::counter!(
        names::POLICY_DENIALS_TOTAL,
        "action" => action,
        "bucket" => bucket_label,
    );
    denials.increment(1);
}
/// Records one PUT request for `codec`.
///
/// Updates, in order:
/// - `s4_requests_total` (+1) with labels `op="put"`, `codec`, and `result`
///   (`"ok"` when `ok` is true, `"err"` otherwise);
/// - `s4_bytes_in_total` (+`bytes_in`) with labels `op="put"`, `codec`;
/// - `s4_bytes_out_total` (+`bytes_out`) with labels `op="put"`, `codec`;
/// - `s4_request_latency_seconds` histogram (`latency_secs`) with labels
///   `op="put"`, `codec`.
pub fn record_put(
    codec: &'static str,
    bytes_in: u64,
    bytes_out: u64,
    latency_secs: f64,
    ok: bool,
) {
    let outcome = match ok {
        true => "ok",
        false => "err",
    };

    let requests = metrics::counter!(
        names::REQUESTS_TOTAL,
        "op" => "put",
        "codec" => codec,
        "result" => outcome
    );
    requests.increment(1);

    let received = metrics::counter!(names::BYTES_IN_TOTAL, "op" => "put", "codec" => codec);
    received.increment(bytes_in);

    let sent = metrics::counter!(names::BYTES_OUT_TOTAL, "op" => "put", "codec" => codec);
    sent.increment(bytes_out);

    let latency =
        metrics::histogram!(names::REQUEST_LATENCY_SECONDS, "op" => "put", "codec" => codec);
    latency.record(latency_secs);
}
/// Records one GET request for `codec`.
///
/// Updates, in order:
/// - `s4_requests_total` (+1) with labels `op="get"`, `codec`, and `result`
///   (`"ok"` when `ok` is true, `"err"` otherwise);
/// - `s4_bytes_in_total` (+`bytes_in`) with labels `op="get"`, `codec`;
/// - `s4_bytes_out_total` (+`bytes_out`) with labels `op="get"`, `codec`;
/// - `s4_request_latency_seconds` histogram (`latency_secs`) with labels
///   `op="get"`, `codec`.
pub fn record_get(
    codec: &'static str,
    bytes_in: u64,
    bytes_out: u64,
    latency_secs: f64,
    ok: bool,
) {
    let outcome = match ok {
        true => "ok",
        false => "err",
    };

    let requests = metrics::counter!(
        names::REQUESTS_TOTAL,
        "op" => "get",
        "codec" => codec,
        "result" => outcome
    );
    requests.increment(1);

    let received = metrics::counter!(names::BYTES_IN_TOTAL, "op" => "get", "codec" => codec);
    received.increment(bytes_in);

    let sent = metrics::counter!(names::BYTES_OUT_TOTAL, "op" => "get", "codec" => codec);
    sent.increment(bytes_out);

    let latency =
        metrics::histogram!(names::REQUEST_LATENCY_SECONDS, "op" => "get", "codec" => codec);
    latency.record(latency_secs);
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Exercises a sample of the recorders against the shared test recorder
    /// and checks that the expected metric names and label pairs appear in
    /// the rendered output.
    #[test]
    fn install_and_render_basic_counters() {
        let prometheus = test_metrics_handle();

        // Request-path metrics.
        record_put("cpu-zstd", 1000, 100, 0.05, true);
        record_get("cpu-zstd", 100, 1000, 0.02, true);

        // GPU codec metrics.
        record_gpu_compress("nvcomp-zstd", 0.012, 10_000_000, 800_000);
        record_gpu_decompress("nvcomp-zstd", 0.008, 800_000, 10_000_000);
        record_gpu_in_flight_inc("nvcomp-bitcomp");
        record_gpu_in_flight_inc("nvcomp-bitcomp");
        record_gpu_in_flight_dec("nvcomp-bitcomp");
        record_gpu_oom("nvcomp-gdeflate");

        // SSE backend gauge, once per backend kind.
        record_sse_aes_backend("aes-ni");
        record_sse_aes_backend("software");

        // ACME renewal counter, one call per result value.
        record_acme_renewal("ok");
        record_acme_renewal("err");
        record_acme_renewal_timeout();

        let rendered = prometheus.render();

        // Request-path names and labels.
        assert!(rendered.contains("s4_requests_total"));
        assert!(rendered.contains("s4_bytes_in_total"));
        assert!(rendered.contains("s4_bytes_out_total"));
        assert!(rendered.contains("s4_request_latency_seconds"));
        assert!(rendered.contains("op=\"put\""));
        assert!(rendered.contains("op=\"get\""));
        assert!(rendered.contains("codec=\"cpu-zstd\""));

        // GPU names and labels.
        assert!(
            rendered.contains("s4_gpu_compress_seconds"),
            "missing GPU compress histogram in: {rendered}"
        );
        assert!(
            rendered.contains("s4_gpu_decompress_seconds"),
            "missing GPU decompress histogram in: {rendered}"
        );
        assert!(
            rendered.contains("s4_gpu_throughput_bytes_per_sec"),
            "missing throughput gauge in: {rendered}"
        );
        assert!(
            rendered.contains("s4_gpu_in_flight"),
            "missing in_flight gauge in: {rendered}"
        );
        assert!(
            rendered.contains("s4_gpu_oom_total"),
            "missing OOM counter in: {rendered}"
        );
        assert!(rendered.contains("codec=\"nvcomp-zstd\""));
        assert!(rendered.contains("codec=\"nvcomp-bitcomp\""));
        assert!(rendered.contains("codec=\"nvcomp-gdeflate\""));
        assert!(rendered.contains("op=\"compress\""));
        assert!(rendered.contains("op=\"decompress\""));

        // SSE backend gauge.
        assert!(
            rendered.contains("s4_sse_aes_backend"),
            "missing SSE AES backend gauge in: {rendered}"
        );
        assert!(rendered.contains("kind=\"aes-ni\""));
        assert!(rendered.contains("kind=\"software\""));

        // ACME renewal counter and its result labels.
        assert!(
            rendered.contains("s4_acme_renewal_total"),
            "missing ACME renewal counter in: {rendered}"
        );
        assert!(
            rendered.contains("result=\"ok\""),
            "missing result=ok label (ACME) in: {rendered}"
        );
        assert!(
            rendered.contains("result=\"err\""),
            "missing result=err label (ACME) in: {rendered}"
        );
        assert!(
            rendered.contains("result=\"timeout\""),
            "missing result=timeout label (ACME, v0.8.4 #80) in: {rendered}"
        );
    }

    /// Checks the compress throughput formula (input bytes over seconds) on a
    /// 10 MiB / 10 ms sample, then calls the recorder with the same inputs.
    #[test]
    fn gpu_compress_throughput_math() {
        let secs = 0.010_f64;
        let bytes_in: u64 = 10 * 1024 * 1024;
        let bytes_out: u64 = 1024 * 1024;

        let computed = (bytes_in as f64) / secs.max(1e-9);
        let want_bytes_per_sec: f64 = 10.0 * 1024.0 * 1024.0 / 0.010;

        assert!((computed - want_bytes_per_sec).abs() < 1.0);
        assert!((computed - 1_048_576_000.0).abs() < 1.0);

        record_gpu_compress("nvcomp-zstd", secs, bytes_in, bytes_out);
    }

    /// Checks the decompress throughput formula (output bytes over seconds)
    /// on a 10 MiB / 5 ms sample, then calls the recorder with the same inputs.
    #[test]
    fn gpu_decompress_throughput_math() {
        let secs = 0.005_f64;
        let bytes_in: u64 = 1024 * 1024;
        let bytes_out: u64 = 10 * 1024 * 1024;

        let computed = (bytes_out as f64) / secs.max(1e-9);

        assert!((computed - 2_097_152_000.0).abs() < 1.0);

        record_gpu_decompress("nvcomp-zstd", secs, bytes_in, bytes_out);
    }

    /// Calls the OOM recorder once for each GPU codec label used in these tests.
    #[test]
    fn gpu_oom_counter_accepts_all_gpu_codecs() {
        const GPU_CODECS: [&str; 3] = ["nvcomp-zstd", "nvcomp-bitcomp", "nvcomp-gdeflate"];
        for codec in GPU_CODECS {
            record_gpu_oom(codec);
        }
    }
}