mod container;
pub mod dfe;
pub mod manifest;
mod process;
#[cfg(feature = "otel-metrics")]
pub(crate) mod otel;
#[cfg(feature = "otel-metrics")]
pub mod otel_types;
use std::net::SocketAddr;
use std::sync::Arc;
use std::time::Duration;
use metrics::{Counter, Gauge, Histogram, Unit};
use thiserror::Error;
use tokio::net::TcpListener;
use tokio::sync::oneshot;
/// Shared readiness callback: returns `true` once the service can accept
/// traffic; consulted by the `/readyz` and `/health/ready` endpoints.
pub type ReadinessFn = Arc<dyn Fn() -> bool + Send + Sync>;
#[cfg(feature = "metrics")]
use metrics_exporter_prometheus::PrometheusHandle;
pub use container::ContainerMetrics;
pub use dfe::DfeMetrics;
#[cfg(feature = "metrics-dfe")]
pub mod dfe_groups;
pub use manifest::{ManifestResponse, MetricDescriptor, MetricRegistry, MetricType};
pub use process::ProcessMetrics;
#[cfg(feature = "otel-metrics")]
pub use otel_types::{OtelMetricsConfig, OtelProtocol};
#[cfg(feature = "metrics")]
/// Cheaply cloneable wrapper around the Prometheus handle, allowing callers
/// to render the current metrics without holding the whole [`MetricsManager`].
#[derive(Clone)]
pub struct RenderHandle(PrometheusHandle);
#[cfg(feature = "metrics")]
impl RenderHandle {
#[must_use]
/// Renders all registered metrics in the Prometheus text exposition format.
pub fn render(&self) -> String {
self.0.render()
}
}
/// Errors produced while building metric recorders or managing the
/// metrics HTTP server.
#[derive(Debug, Error)]
pub enum MetricsError {
/// Constructing a recorder/exporter failed.
#[error("failed to build metrics exporter: {0}")]
BuildError(String),
/// Starting the HTTP server failed (invalid address, bind error,
/// or missing Prometheus recorder).
#[error("failed to start metrics server: {0}")]
ServerError(String),
/// A `start_server*` method was called while a server is already running.
#[error("metrics server already running")]
AlreadyRunning,
/// `stop_server` was called with no server running.
#[error("metrics server not running")]
NotRunning,
}
/// Configuration for [`MetricsManager`].
#[derive(Debug, Clone)]
pub struct MetricsConfig {
/// Prefix joined to every metric name with `_`; empty means no prefix.
pub namespace: String,
/// Collect per-process metrics on each update tick.
pub enable_process_metrics: bool,
/// Collect container metrics on each update tick.
pub enable_container_metrics: bool,
/// Interval between background process/container metric refreshes.
pub update_interval: Duration,
/// OpenTelemetry exporter configuration.
#[cfg(feature = "otel-metrics")]
pub otel: OtelMetricsConfig,
}
impl Default for MetricsConfig {
/// Defaults: no namespace, process and container metrics enabled,
/// 15-second update interval, default OTel settings.
fn default() -> Self {
Self {
namespace: String::new(),
enable_process_metrics: true,
enable_container_metrics: true,
update_interval: Duration::from_secs(15),
#[cfg(feature = "otel-metrics")]
otel: OtelMetricsConfig::default(),
}
}
}
/// Handles returned by `install_recorders`; which fields exist depends on
/// the enabled feature set.
struct RecorderSetup {
/// Handle for rendering Prometheus text output, when a Prometheus
/// recorder was installed.
#[cfg(feature = "metrics")]
prom_handle: Option<PrometheusHandle>,
/// OTel meter provider, kept so it can be flushed/shut down later.
#[cfg(feature = "otel-metrics")]
otel_provider: Option<opentelemetry_sdk::metrics::SdkMeterProvider>,
}
#[allow(unused_variables)]
fn install_recorders(config: &MetricsConfig) -> RecorderSetup {
#[cfg(all(feature = "metrics", not(feature = "otel-metrics")))]
{
let recorder = metrics_exporter_prometheus::PrometheusBuilder::new().build_recorder();
let handle = recorder.handle();
metrics::set_global_recorder(recorder).expect("failed to install Prometheus recorder");
RecorderSetup {
prom_handle: Some(handle),
}
}
#[cfg(all(feature = "otel-metrics", not(feature = "metrics")))]
{
match otel::build_otel_recorder(&config.namespace, &config.otel) {
Ok((otel_recorder, provider)) => {
opentelemetry::global::set_meter_provider(provider.clone());
metrics::set_global_recorder(otel_recorder)
.expect("failed to set OTel metrics recorder");
RecorderSetup {
otel_provider: Some(provider),
}
}
Err(e) => {
tracing::warn!(error = %e, "Failed to build OTel metrics recorder");
RecorderSetup {
otel_provider: None,
}
}
}
}
#[cfg(all(feature = "metrics", feature = "otel-metrics"))]
{
let prom_recorder = metrics_exporter_prometheus::PrometheusBuilder::new().build_recorder();
let prom_handle = prom_recorder.handle();
match otel::build_otel_recorder(&config.namespace, &config.otel) {
Ok((otel_recorder, provider)) => {
opentelemetry::global::set_meter_provider(provider.clone());
let fanout = metrics_util::layers::FanoutBuilder::default()
.add_recorder(prom_recorder)
.add_recorder(otel_recorder)
.build();
metrics::set_global_recorder(fanout).expect("failed to set Fanout recorder");
RecorderSetup {
prom_handle: Some(prom_handle),
otel_provider: Some(provider),
}
}
Err(e) => {
tracing::warn!(error = %e, "Failed to build OTel recorder, falling back to Prometheus only");
metrics::set_global_recorder(prom_recorder)
.expect("failed to set Prometheus recorder");
RecorderSetup {
prom_handle: Some(prom_handle),
otel_provider: None,
}
}
}
}
}
/// Central metrics facade: installs the global recorder(s), registers
/// metric descriptors in a manifest, and can serve `/metrics` plus
/// startup/liveness/readiness probes over HTTP.
pub struct MetricsManager {
/// Prometheus render handle; `None` when no Prometheus recorder exists.
#[cfg(feature = "metrics")]
handle: Option<PrometheusHandle>,
config: MetricsConfig,
/// `Some` while a server is running; used both as the running flag and
/// as the shutdown trigger.
shutdown_tx: Option<oneshot::Sender<()>>,
process_metrics: Option<ProcessMetrics>,
container_metrics: Option<ContainerMetrics>,
/// Optional user-supplied readiness callback for `/readyz`.
readiness_fn: Option<ReadinessFn>,
/// Flipped by `mark_started`; backs the `/startupz` endpoint.
started: Arc<std::sync::atomic::AtomicBool>,
registry: MetricRegistry,
#[cfg(all(feature = "metrics", feature = "scaling"))]
scaling_pressure: Option<Arc<crate::scaling::ScalingPressure>>,
#[cfg(all(feature = "metrics", feature = "memory"))]
memory_guard: Option<Arc<crate::memory::MemoryGuard>>,
/// Kept so the OTel provider can be flushed/shut down on demand.
#[cfg(feature = "otel-metrics")]
otel_provider: Option<opentelemetry_sdk::metrics::SdkMeterProvider>,
}
impl MetricsManager {
#[must_use]
pub fn new(namespace: &str) -> Self {
Self::with_config(MetricsConfig {
namespace: namespace.to_string(),
..Default::default()
})
}
#[cfg(test)]
/// Test-only constructor: skips recorder installation entirely and
/// disables process/container collectors so tests do not touch the
/// process-global recorder state.
pub(crate) fn new_for_test(namespace: &str) -> Self {
let config = MetricsConfig {
namespace: namespace.to_string(),
enable_process_metrics: false,
enable_container_metrics: false,
..Default::default()
};
let registry = MetricRegistry::new(&config.namespace);
Self {
#[cfg(feature = "metrics")]
handle: None,
registry,
config,
shutdown_tx: None,
process_metrics: None,
container_metrics: None,
readiness_fn: None,
started: Arc::new(std::sync::atomic::AtomicBool::new(false)),
#[cfg(all(feature = "metrics", feature = "scaling"))]
scaling_pressure: None,
#[cfg(all(feature = "metrics", feature = "memory"))]
memory_guard: None,
#[cfg(feature = "otel-metrics")]
otel_provider: None,
}
}
#[must_use]
/// Builds a manager from `config`, installing the process-global
/// recorder(s) as a side effect — the `metrics` global recorder can only
/// be set once per process, so construct at most one manager this way.
pub fn with_config(config: MetricsConfig) -> Self {
let setup = install_recorders(&config);
let process_metrics = if config.enable_process_metrics {
Some(ProcessMetrics::new(&config.namespace))
} else {
None
};
let container_metrics = if config.enable_container_metrics {
Some(ContainerMetrics::new(&config.namespace))
} else {
None
};
let registry = MetricRegistry::new(&config.namespace);
Self {
#[cfg(feature = "metrics")]
handle: setup.prom_handle,
registry,
config,
shutdown_tx: None,
process_metrics,
container_metrics,
readiness_fn: None,
started: Arc::new(std::sync::atomic::AtomicBool::new(false)),
#[cfg(all(feature = "metrics", feature = "scaling"))]
scaling_pressure: None,
#[cfg(all(feature = "metrics", feature = "memory"))]
memory_guard: None,
#[cfg(feature = "otel-metrics")]
otel_provider: setup.otel_provider,
}
}
#[must_use]
/// Registers and returns a counter under the namespaced key, recording a
/// descriptor (group `"custom"`) so it appears in `/metrics/manifest`.
pub fn counter(&self, name: &str, description: &str) -> Counter {
let key = self.prefixed_key(name);
let desc = description.to_string();
metrics::describe_counter!(key.clone(), desc.clone());
self.registry.push(MetricDescriptor {
name: key.clone(),
metric_type: MetricType::Counter,
description: desc,
unit: String::new(),
labels: vec![],
group: "custom".into(),
buckets: None,
use_cases: vec![],
dashboard_hint: None,
});
metrics::counter!(key)
}
#[must_use]
/// Like [`Self::counter`], but records expected label names and a manifest
/// group in the descriptor.
// NOTE(review): the returned handle carries no labels itself — the label
// names are manifest metadata only; confirm callers attach label values
// at the call site.
pub fn counter_with_labels(
&self,
name: &str,
description: &str,
labels: &[&str],
group: &str,
) -> Counter {
let key = self.prefixed_key(name);
let desc = description.to_string();
metrics::describe_counter!(key.clone(), desc.clone());
self.registry.push(MetricDescriptor {
name: key.clone(),
metric_type: MetricType::Counter,
description: desc,
unit: String::new(),
labels: labels.iter().map(|s| (*s).to_string()).collect(),
group: group.into(),
buckets: None,
use_cases: vec![],
dashboard_hint: None,
});
metrics::counter!(key)
}
#[must_use]
/// Registers and returns a gauge under the namespaced key, recording a
/// descriptor (group `"custom"`) so it appears in `/metrics/manifest`.
pub fn gauge(&self, name: &str, description: &str) -> Gauge {
let key = self.prefixed_key(name);
let desc = description.to_string();
metrics::describe_gauge!(key.clone(), desc.clone());
self.registry.push(MetricDescriptor {
name: key.clone(),
metric_type: MetricType::Gauge,
description: desc,
unit: String::new(),
labels: vec![],
group: "custom".into(),
buckets: None,
use_cases: vec![],
dashboard_hint: None,
});
metrics::gauge!(key)
}
#[must_use]
/// Like [`Self::gauge`], but records expected label names and a manifest
/// group in the descriptor (labels are manifest metadata only).
pub fn gauge_with_labels(
&self,
name: &str,
description: &str,
labels: &[&str],
group: &str,
) -> Gauge {
let key = self.prefixed_key(name);
let desc = description.to_string();
metrics::describe_gauge!(key.clone(), desc.clone());
self.registry.push(MetricDescriptor {
name: key.clone(),
metric_type: MetricType::Gauge,
description: desc,
unit: String::new(),
labels: labels.iter().map(|s| (*s).to_string()).collect(),
group: group.into(),
buckets: None,
use_cases: vec![],
dashboard_hint: None,
});
metrics::gauge!(key)
}
#[must_use]
/// Registers and returns a histogram (unit: seconds, default buckets)
/// under the namespaced key, recording a manifest descriptor.
pub fn histogram(&self, name: &str, description: &str) -> Histogram {
let key = self.prefixed_key(name);
let desc = description.to_string();
metrics::describe_histogram!(key.clone(), Unit::Seconds, desc.clone());
self.registry.push(MetricDescriptor {
name: key.clone(),
metric_type: MetricType::Histogram,
description: desc,
unit: "seconds".into(),
labels: vec![],
group: "custom".into(),
buckets: None,
use_cases: vec![],
dashboard_hint: None,
});
metrics::histogram!(key)
}
#[must_use]
/// Like [`Self::histogram`], with manifest label names, a group, and
/// optional bucket boundaries recorded in the descriptor.
// NOTE(review): `buckets` is stored in the manifest only — it does not
// configure the recorder's actual buckets here; confirm intended.
pub fn histogram_with_labels(
&self,
name: &str,
description: &str,
labels: &[&str],
group: &str,
buckets: Option<&[f64]>,
) -> Histogram {
let key = self.prefixed_key(name);
let desc = description.to_string();
metrics::describe_histogram!(key.clone(), Unit::Seconds, desc.clone());
self.registry.push(MetricDescriptor {
name: key.clone(),
metric_type: MetricType::Histogram,
description: desc,
unit: "seconds".into(),
labels: labels.iter().map(|s| (*s).to_string()).collect(),
group: group.into(),
buckets: buckets.map(|b| b.to_vec()),
use_cases: vec![],
dashboard_hint: None,
});
metrics::histogram!(key)
}
#[must_use]
/// Like [`Self::histogram`], with explicit bucket boundaries recorded in
/// the manifest descriptor.
pub fn histogram_with_buckets(
&self,
name: &str,
description: &str,
buckets: &[f64],
) -> Histogram {
let key = self.prefixed_key(name);
let desc = description.to_string();
metrics::describe_histogram!(key.clone(), Unit::Seconds, desc.clone());
self.registry.push(MetricDescriptor {
name: key.clone(),
metric_type: MetricType::Histogram,
description: desc,
unit: "seconds".into(),
labels: vec![],
group: "custom".into(),
buckets: Some(buckets.to_vec()),
use_cases: vec![],
dashboard_hint: None,
});
metrics::histogram!(key)
}
#[cfg(feature = "metrics")]
#[must_use]
/// Renders the current metrics in Prometheus text format; returns an
/// empty string when no Prometheus recorder was installed.
pub fn render(&self) -> String {
self.handle
.as_ref()
.map_or_else(String::new, PrometheusHandle::render)
}
#[cfg(feature = "metrics")]
#[must_use]
/// Returns a cloneable [`RenderHandle`], if a Prometheus recorder exists.
pub fn render_handle(&self) -> Option<RenderHandle> {
self.handle.clone().map(RenderHandle)
}
/// Registers a readiness callback consulted by `/readyz` and
/// `/health/ready`; replaces any previously set callback.
pub fn set_readiness_check(&mut self, f: impl Fn() -> bool + Send + Sync + 'static) {
self.readiness_fn = Some(Arc::new(f));
}
/// Marks startup complete; `/startupz` returns 200 from then on.
pub fn mark_started(&self) {
self.started
.store(true, std::sync::atomic::Ordering::Release);
}
/// Shared handle to the startup flag (readers use `Ordering::Acquire`,
/// pairing with the `Release` store in `mark_started`).
pub(crate) fn started_flag(&self) -> Arc<std::sync::atomic::AtomicBool> {
Arc::clone(&self.started)
}
/// Exposes scaling pressure at `/scaling/pressure` on the axum server.
#[cfg(all(feature = "metrics", feature = "scaling"))]
pub fn set_scaling_pressure(&mut self, sp: Arc<crate::scaling::ScalingPressure>) {
self.scaling_pressure = Some(sp);
}
/// Exposes memory pressure at `/memory/pressure` on the axum server.
#[cfg(all(feature = "metrics", feature = "memory"))]
pub fn set_memory_guard(&mut self, mg: Arc<crate::memory::MemoryGuard>) {
self.memory_guard = Some(mg);
}
pub fn update(&self) {
if let Some(ref pm) = self.process_metrics {
pm.update();
}
if let Some(ref cm) = self.container_metrics {
cm.update();
}
}
/// Starts a minimal hand-rolled HTTP server on `addr` serving `/metrics`,
/// `/metrics/manifest`, and startup/liveness/readiness probes, plus a
/// background tick that refreshes process/container metrics.
///
/// # Errors
///
/// Returns [`MetricsError::AlreadyRunning`] if a server was already
/// started, and [`MetricsError::ServerError`] for an invalid address, a
/// bind failure, or a manager created without a Prometheus recorder.
#[cfg(feature = "metrics")]
pub async fn start_server(&mut self, addr: &str) -> Result<(), MetricsError> {
    if self.shutdown_tx.is_some() {
        return Err(MetricsError::AlreadyRunning);
    }
    let addr: SocketAddr = addr
        .parse()
        .map_err(|e| MetricsError::ServerError(format!("invalid address: {e}")))?;
    // Resolve the Prometheus handle *before* storing the shutdown sender:
    // previously a missing handle returned early with `shutdown_tx` already
    // set, wedging the manager in the "already running" state even though
    // no server was started.
    let handle = self
        .handle
        .as_ref()
        .ok_or_else(|| {
            MetricsError::ServerError(
                "Prometheus handle not configured — MetricsManager was created without a recorder"
                    .into(),
            )
        })?
        .clone();
    let listener = TcpListener::bind(addr)
        .await
        .map_err(|e| MetricsError::ServerError(e.to_string()))?;
    let (shutdown_tx, shutdown_rx) = oneshot::channel();
    self.shutdown_tx = Some(shutdown_tx);
    let update_interval = self.config.update_interval;
    let process_metrics = self.process_metrics.clone();
    let container_metrics = self.container_metrics.clone();
    let readiness_fn = self.readiness_fn.clone();
    let started_flag = self.started_flag();
    let registry = self.registry();
    tokio::spawn(async move {
        run_server(
            listener,
            handle,
            registry,
            shutdown_rx,
            update_interval,
            process_metrics,
            container_metrics,
            readiness_fn,
            started_flag,
        )
        .await;
    });
    Ok(())
}
/// Starts an axum HTTP server on `addr` exposing `/metrics`,
/// `/metrics/manifest`, startup/liveness/readiness probes, optional
/// `/scaling/pressure` and `/memory/pressure`, merged with `extra_routes`.
/// Also spawns a periodic process/container metrics refresh.
///
/// # Errors
///
/// Returns [`MetricsError::AlreadyRunning`] if a server was already
/// started, and [`MetricsError::ServerError`] for an invalid address, a
/// bind failure, or a manager created without a Prometheus recorder.
#[cfg(all(feature = "metrics", feature = "http-server"))]
pub async fn start_server_with_routes(
    &mut self,
    addr: &str,
    extra_routes: axum::Router,
) -> Result<(), MetricsError> {
    if self.shutdown_tx.is_some() {
        return Err(MetricsError::AlreadyRunning);
    }
    let addr: SocketAddr = addr
        .parse()
        .map_err(|e| MetricsError::ServerError(format!("invalid address: {e}")))?;
    // Resolve the Prometheus handle *before* storing the shutdown sender:
    // previously a missing handle returned early with `shutdown_tx` already
    // set, wedging the manager in the "already running" state.
    let handle = self
        .handle
        .as_ref()
        .ok_or_else(|| {
            MetricsError::ServerError(
                "Prometheus handle not configured — MetricsManager was created without a recorder"
                    .into(),
            )
        })?
        .clone();
    let listener = TcpListener::bind(addr)
        .await
        .map_err(|e| MetricsError::ServerError(e.to_string()))?;
    let (shutdown_tx, shutdown_rx) = oneshot::channel();
    self.shutdown_tx = Some(shutdown_tx);
    let update_interval = self.config.update_interval;
    let process_metrics = self.process_metrics.clone();
    let container_metrics = self.container_metrics.clone();
    let readiness_fn = self.readiness_fn.clone();
    let metrics_handle = handle.clone();
    let readiness_for_live = readiness_fn.clone();
    let registry_handle = self.registry();
    let mut app = axum::Router::new()
        .route(
            "/metrics/manifest",
            axum::routing::get(move || {
                let reg = registry_handle.clone();
                async move {
                    (
                        [(axum::http::header::CONTENT_TYPE, "application/json")],
                        // Fixed: original had mojibake `®.manifest()` for
                        // `&reg.manifest()`.
                        serde_json::to_string(&reg.manifest()).unwrap_or_default(),
                    )
                }
            }),
        )
        .route(
            "/metrics",
            axum::routing::get(move || {
                let h = metrics_handle.clone();
                async move { h.render() }
            }),
        )
        .route("/startupz", {
            let sf = self.started_flag();
            axum::routing::get(move || {
                // Read per request; Acquire pairs with `mark_started`'s Release.
                let started = sf.load(std::sync::atomic::Ordering::Acquire);
                async move {
                    if started {
                        (
                            axum::http::StatusCode::OK,
                            [(axum::http::header::CONTENT_TYPE, "application/json")],
                            r#"{"status":"started"}"#,
                        )
                    } else {
                        (
                            axum::http::StatusCode::SERVICE_UNAVAILABLE,
                            [(axum::http::header::CONTENT_TYPE, "application/json")],
                            r#"{"status":"starting"}"#,
                        )
                    }
                }
            })
        })
        .route(
            "/healthz",
            axum::routing::get(|| async {
                (
                    [(axum::http::header::CONTENT_TYPE, "application/json")],
                    r#"{"status":"alive"}"#,
                )
            }),
        )
        .route(
            "/health/live",
            axum::routing::get(|| async {
                (
                    [(axum::http::header::CONTENT_TYPE, "application/json")],
                    r#"{"status":"alive"}"#,
                )
            }),
        )
        .route(
            "/readyz",
            axum::routing::get(move || {
                let rf = readiness_fn.clone();
                async move { readiness_response(rf) }
            }),
        )
        .route(
            "/health/ready",
            axum::routing::get(move || {
                let rf = readiness_for_live.clone();
                async move { readiness_response(rf) }
            }),
        );
    #[cfg(feature = "scaling")]
    if let Some(ref sp) = self.scaling_pressure {
        let sp = sp.clone();
        app = app.route(
            "/scaling/pressure",
            axum::routing::get(move || {
                let s = sp.clone();
                async move { format!("{:.2}", s.calculate()) }
            }),
        );
    }
    #[cfg(feature = "memory")]
    if let Some(ref mg) = self.memory_guard {
        let mg = mg.clone();
        app = app.route(
            "/memory/pressure",
            axum::routing::get(move || {
                let m = mg.clone();
                async move {
                    (
                        [(axum::http::header::CONTENT_TYPE, "application/json")],
                        format!(
                            r#"{{"under_pressure":{},"ratio":{:.3},"current_bytes":{},"limit_bytes":{}}}"#,
                            m.under_pressure(),
                            m.pressure_ratio(),
                            m.current_bytes(),
                            m.limit_bytes()
                        ),
                    )
                }
            }),
        );
    }
    app = app.merge(extra_routes);
    tokio::spawn(async move {
        run_axum_server(
            listener,
            app,
            shutdown_rx,
            update_interval,
            process_metrics,
            container_metrics,
        )
        .await;
    });
    Ok(())
}
/// Signals the running server (either flavor) to shut down.
///
/// # Errors
///
/// Returns [`MetricsError::NotRunning`] if no server was started.
pub async fn stop_server(&mut self) -> Result<(), MetricsError> {
if let Some(tx) = self.shutdown_tx.take() {
// The receiver may already be dropped if the server task exited early;
// that is harmless, so the send result is ignored.
let _ = tx.send(());
Ok(())
} else {
Err(MetricsError::NotRunning)
}
}
/// Flushes and shuts down the OTel meter provider, logging (not
/// returning) any shutdown error. Idempotent: the provider is `take`n.
#[cfg(feature = "otel-metrics")]
pub fn shutdown_otel(&mut self) {
if let Some(provider) = self.otel_provider.take()
&& let Err(e) = provider.shutdown()
{
tracing::warn!(error = %e, "OTel provider shutdown error");
}
}
/// Records build metadata (version/commit) in the manifest registry.
pub fn set_build_info(&self, version: &str, commit: &str) {
self.registry.set_build_info(version, commit);
}
/// Attaches use-case tags to an already-registered metric's descriptor.
pub fn set_use_cases(&self, metric_name: &str, use_cases: &[&str]) {
self.registry.set_use_cases(metric_name, use_cases);
}
/// Attaches a dashboard hint to an already-registered metric's descriptor.
pub fn set_dashboard_hint(&self, metric_name: &str, hint: &str) {
self.registry.set_dashboard_hint(metric_name, hint);
}
#[must_use]
/// Returns a clone of the shared metric-descriptor registry.
pub fn registry(&self) -> MetricRegistry {
self.registry.clone()
}
#[must_use]
/// The configured namespace prefix (may be empty).
pub fn namespace(&self) -> &str {
&self.config.namespace
}
/// Joins `name` to the namespace with `_`, or returns it unchanged when
/// the namespace is empty.
fn prefixed_key(&self, name: &str) -> String {
if self.config.namespace.is_empty() {
name.to_string()
} else {
format!("{}_{}", self.config.namespace, name)
}
}
}
#[cfg(feature = "metrics")]
#[allow(clippy::too_many_arguments)]
/// Accept loop for the hand-rolled metrics server: multiplexes the
/// shutdown signal, the periodic process/container refresh tick, and
/// incoming connections (each served on its own task by
/// `handle_connection`).
async fn run_server(
listener: TcpListener,
handle: PrometheusHandle,
registry: MetricRegistry,
mut shutdown_rx: oneshot::Receiver<()>,
update_interval: Duration,
process_metrics: Option<ProcessMetrics>,
container_metrics: Option<ContainerMetrics>,
readiness_fn: Option<ReadinessFn>,
started_flag: Arc<std::sync::atomic::AtomicBool>,
) {
let mut update_interval = tokio::time::interval(update_interval);
loop {
tokio::select! {
_ = &mut shutdown_rx => {
break;
}
_ = update_interval.tick() => {
if let Some(ref pm) = process_metrics {
pm.update();
}
if let Some(ref cm) = container_metrics {
cm.update();
}
}
result = listener.accept() => {
// Accept errors are ignored; the loop keeps serving.
if let Ok((stream, _)) = result {
let handle = handle.clone();
let registry = registry.clone();
let readiness_fn = readiness_fn.clone();
let sf = Arc::clone(&started_flag);
tokio::spawn(async move {
handle_connection(stream, handle, registry, readiness_fn, &sf).await;
});
}
}
}
}
}
/// Serves one plain-HTTP connection for the hand-rolled server: reads only
/// the request line (headers are ignored) and routes on its prefix.
///
/// Supported paths: `/metrics/manifest`, `/metrics`, `/startupz`
/// (`/health/startup`), `/healthz` (`/health/live`), `/readyz`
/// (`/health/ready`); anything else gets a 404.
#[cfg(feature = "metrics")]
async fn handle_connection(
    mut stream: tokio::net::TcpStream,
    handle: PrometheusHandle,
    registry: MetricRegistry,
    readiness_fn: Option<ReadinessFn>,
    started_flag: &std::sync::atomic::AtomicBool,
) {
    use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader};
    let mut reader = BufReader::new(&mut stream);
    let mut request_line = String::new();
    if reader.read_line(&mut request_line).await.is_err() {
        return;
    }
    // The longer `/metrics/manifest` prefix must be checked before `/metrics`.
    let (status, content_type, body) = if request_line.starts_with("GET /metrics/manifest") {
        (
            "200 OK",
            "application/json",
            // Fixed: original had mojibake `®istry.manifest()` for
            // `&registry.manifest()`.
            serde_json::to_string(&registry.manifest()).unwrap_or_default(),
        )
    } else if request_line.starts_with("GET /metrics") {
        ("200 OK", "text/plain; charset=utf-8", handle.render())
    } else if request_line.starts_with("GET /startupz")
        || request_line.starts_with("GET /health/startup")
    {
        if started_flag.load(std::sync::atomic::Ordering::Acquire) {
            (
                "200 OK",
                "application/json",
                r#"{"status":"started"}"#.to_string(),
            )
        } else {
            (
                "503 Service Unavailable",
                "application/json",
                r#"{"status":"starting"}"#.to_string(),
            )
        }
    } else if request_line.starts_with("GET /healthz")
        || request_line.starts_with("GET /health/live")
    {
        (
            "200 OK",
            "application/json",
            r#"{"status":"alive"}"#.to_string(),
        )
    } else if request_line.starts_with("GET /readyz")
        || request_line.starts_with("GET /health/ready")
    {
        // Ready only when both the optional user callback and (with the
        // `health` feature) the global health registry agree.
        let callback_ready = readiness_fn.as_ref().is_none_or(|f| f());
        #[cfg(feature = "health")]
        let registry_ready = crate::health::HealthRegistry::is_ready();
        #[cfg(not(feature = "health"))]
        let registry_ready = true;
        let ready = callback_ready && registry_ready;
        if ready {
            (
                "200 OK",
                "application/json",
                r#"{"status":"ready"}"#.to_string(),
            )
        } else {
            (
                "503 Service Unavailable",
                "application/json",
                r#"{"status":"not_ready"}"#.to_string(),
            )
        }
    } else {
        (
            "404 Not Found",
            "text/plain; charset=utf-8",
            "Not Found".to_string(),
        )
    };
    let response = format!(
        "HTTP/1.1 {status}\r\nContent-Type: {content_type}\r\nContent-Length: {}\r\n\r\n{body}",
        body.len()
    );
    // Best-effort write; the connection is closed on drop either way.
    let _ = stream.write_all(response.as_bytes()).await;
}
#[cfg(all(feature = "metrics", feature = "http-server"))]
/// Builds the `/readyz` / `/health/ready` response: ready only when both
/// the optional user callback and (with the `health` feature) the global
/// health registry report ready.
fn readiness_response(rf: Option<ReadinessFn>) -> axum::response::Response {
use axum::response::IntoResponse;
// A missing callback counts as ready.
let callback_ready = rf.as_ref().is_none_or(|f| f());
#[cfg(feature = "health")]
let registry_ready = crate::health::HealthRegistry::is_ready();
#[cfg(not(feature = "health"))]
let registry_ready = true;
let ready = callback_ready && registry_ready;
if ready {
(
[(axum::http::header::CONTENT_TYPE, "application/json")],
r#"{"status":"ready"}"#,
)
.into_response()
} else {
(
axum::http::StatusCode::SERVICE_UNAVAILABLE,
[(axum::http::header::CONTENT_TYPE, "application/json")],
r#"{"status":"not_ready"}"#,
)
.into_response()
}
}
#[cfg(all(feature = "metrics", feature = "http-server"))]
/// Runs the axum server until the shutdown signal fires, with a companion
/// task that periodically refreshes process/container metrics. The
/// companion task is stopped once the server exits.
async fn run_axum_server(
listener: TcpListener,
app: axum::Router,
shutdown_rx: oneshot::Receiver<()>,
update_interval: Duration,
process_metrics: Option<ProcessMetrics>,
container_metrics: Option<ContainerMetrics>,
) {
let mut interval = tokio::time::interval(update_interval);
let (update_stop_tx, mut update_stop_rx) = oneshot::channel::<()>();
tokio::spawn(async move {
loop {
tokio::select! {
_ = &mut update_stop_rx => break,
_ = interval.tick() => {
if let Some(ref pm) = process_metrics {
pm.update();
}
if let Some(ref cm) = container_metrics {
cm.update();
}
}
}
}
});
axum::serve(listener, app)
.with_graceful_shutdown(async move {
let _ = shutdown_rx.await;
})
.await
.unwrap_or_else(|e| tracing::error!(error = %e, "Metrics axum server error"));
// Stop the update task; if it already stopped (sender dropped), ignore.
let _ = update_stop_tx.send(());
}
#[must_use]
/// Default histogram buckets for latency measurements, in seconds
/// (1 ms up to 10 s).
pub fn latency_buckets() -> Vec<f64> {
    [
        0.001, 0.005, 0.01, 0.025, 0.05, 0.1, 0.25, 0.5, 1.0, 2.5, 5.0, 10.0,
    ]
    .to_vec()
}
#[must_use]
/// Default histogram buckets for payload sizes, in bytes: decade-spaced
/// powers of ten from 100 B up to 10 MB.
pub fn size_buckets() -> Vec<f64> {
    // 10^2 .. 10^7 — all exactly representable in f64.
    (2..=7).map(|exp| 10f64.powi(exp)).collect()
}
#[cfg(test)]
mod tests {
use super::*;
// Pins the documented defaults of `MetricsConfig::default`.
#[test]
fn test_metrics_config_default() {
let config = MetricsConfig::default();
assert!(config.namespace.is_empty());
assert!(config.enable_process_metrics);
assert!(config.enable_container_metrics);
assert_eq!(config.update_interval, Duration::from_secs(15));
}
// Buckets must be ascending; spot-check endpoints and count.
#[test]
fn test_latency_buckets() {
let buckets = latency_buckets();
assert_eq!(buckets.len(), 12);
assert!(buckets[0] < buckets[11]);
}
#[test]
fn test_size_buckets() {
let buckets = size_buckets();
assert_eq!(buckets.len(), 6);
assert!(buckets[0] < buckets[5]);
}
}