1// Project:   hyperi-rustlib
2// File:      src/metrics/mod.rs
3// Purpose:   Prometheus metrics with process and container awareness
4// Language:  Rust
5//
6// License:   FSL-1.1-ALv2
7// Copyright: (c) 2026 HYPERI PTY LIMITED
8
9//! Metrics with Prometheus and/or OpenTelemetry backends.
10//!
11//! Provides production-ready metrics collection with support for:
12//!
13//! - **`metrics` feature only:** Prometheus scrape endpoint via `/metrics`
14//! - **`otel-metrics` feature only:** OTLP push to OTel-compatible backends
15//! - **Both features:** Fanout recorder sends to both Prometheus AND OTel
16//!
17//! ## Features
18//!
19//! - Counter, Gauge, Histogram metric types
20//! - Automatic process metrics (CPU, memory, file descriptors)
21//! - Container metrics from cgroups (memory limit, CPU limit)
22//! - Built-in HTTP server for `/metrics` endpoint (Prometheus)
23//! - OTLP push to HyperDX, Jaeger, Grafana, etc. (OTel)
24//! - Readiness callback for `/health/ready` endpoints
25//! - Optional scaling pressure endpoint (`/scaling/pressure`)
26//! - Optional memory guard endpoint (`/memory/pressure`)
27//! - Custom route support via [`start_server_with_routes`](MetricsManager::start_server_with_routes)
28//!
29//! ## Basic Example
30//!
31//! ```rust,no_run
32//! use hyperi_rustlib::metrics::{MetricsManager, MetricsConfig};
33//!
34//! #[tokio::main]
35//! async fn main() {
36//!     let mut manager = MetricsManager::new("myapp");
37//!
38//!     // Create metrics
39//!     let requests = manager.counter("requests_total", "Total requests");
40//!     let active = manager.gauge("active_connections", "Active connections");
41//!     let latency = manager.histogram("request_duration_seconds", "Request latency");
42//!
43//!     // Start metrics server (simple — built-in endpoints only)
44//!     manager.start_server("0.0.0.0:9090").await.unwrap();
45//!
46//!     // Record metrics
47//!     requests.increment(1);
48//!     active.set(42.0);
49//!     latency.record(0.123);
50//! }
51//! ```
52//!
53//! ## Advanced Example (with custom routes, scaling, memory)
54//!
55//! Requires features: `metrics`, `http-server`, `scaling`, `memory`.
56//!
57//! ```rust,ignore
58//! use std::sync::Arc;
59//! use hyperi_rustlib::metrics::MetricsManager;
60//! use hyperi_rustlib::scaling::{ScalingPressure, ScalingPressureConfig};
61//! use hyperi_rustlib::memory::{MemoryGuard, MemoryGuardConfig};
62//! use axum::{Router, routing::post};
63//!
64//! let mut mgr = MetricsManager::new("myapp");
65//!
66//! // Readiness callback
67//! mgr.set_readiness_check(|| true);
68//!
69//! // Attach scaling pressure (adds /scaling/pressure endpoint)
70//! let scaling = Arc::new(ScalingPressure::new(ScalingPressureConfig::default(), vec![]));
71//! mgr.set_scaling_pressure(scaling);
72//!
73//! // Attach memory guard (adds /memory/pressure endpoint)
74//! let guard = Arc::new(MemoryGuard::new(MemoryGuardConfig::default()));
75//! mgr.set_memory_guard(guard);
76//!
77//! // Service-specific routes
78//! let custom = Router::new()
79//!     .route("/test", post(|| async { "ok" }));
80//!
81//! // Start with everything merged into one server
82//! mgr.start_server_with_routes("0.0.0.0:9090", custom).await.unwrap();
83//! ```
84
85mod container;
86pub mod dfe;
87pub mod manifest;
88mod process;
89
90#[cfg(feature = "otel-metrics")]
91pub(crate) mod otel;
92#[cfg(feature = "otel-metrics")]
93pub mod otel_types;
94
95use std::net::SocketAddr;
96use std::sync::Arc;
97use std::time::Duration;
98
99use metrics::{Counter, Gauge, Histogram, Unit};
100use thiserror::Error;
101use tokio::net::TcpListener;
102use tokio::sync::oneshot;
103
/// Readiness check callback type.
///
/// Shared, thread-safe closure returning `true` when the service is ready
/// to accept traffic; consulted by the `/readyz` and `/health/ready`
/// handlers when set via [`MetricsManager::set_readiness_check`].
pub type ReadinessFn = Arc<dyn Fn() -> bool + Send + Sync>;
106
107#[cfg(feature = "metrics")]
108use metrics_exporter_prometheus::PrometheusHandle;
109
110pub use container::ContainerMetrics;
111pub use dfe::DfeMetrics;
112#[cfg(feature = "metrics-dfe")]
113pub mod dfe_groups;
114pub use manifest::{ManifestResponse, MetricDescriptor, MetricRegistry, MetricType};
115pub use process::ProcessMetrics;
116
117#[cfg(feature = "otel-metrics")]
118pub use otel_types::{OtelMetricsConfig, OtelProtocol};
119
/// Cloneable handle for rendering Prometheus metrics text.
///
/// Obtained via [`MetricsManager::render_handle`]. Safe to clone into
/// `axum` route handlers or share across tasks.
// Newtype over the exporter's handle so callers don't depend on
// `metrics_exporter_prometheus` directly.
#[cfg(feature = "metrics")]
#[derive(Clone)]
pub struct RenderHandle(PrometheusHandle);
127
#[cfg(feature = "metrics")]
impl RenderHandle {
    /// Render the current metrics snapshot in Prometheus text exposition format.
    #[must_use]
    pub fn render(&self) -> String {
        let RenderHandle(inner) = self;
        inner.render()
    }
}
136
/// Metrics errors.
///
/// Returned by the [`MetricsManager`] server lifecycle methods
/// (`start_server*` / `stop_server`).
#[derive(Debug, Error)]
pub enum MetricsError {
    /// Failed to build metrics exporter.
    #[error("failed to build metrics exporter: {0}")]
    BuildError(String),

    /// Failed to start metrics server (bad address, bind failure,
    /// or missing Prometheus recorder).
    #[error("failed to start metrics server: {0}")]
    ServerError(String),

    /// Server already running (a shutdown channel is still held).
    #[error("metrics server already running")]
    AlreadyRunning,

    /// Server not running (nothing to stop).
    #[error("metrics server not running")]
    NotRunning,
}
156
/// Metrics configuration.
///
/// Construct via [`Default`] and override fields with struct-update syntax;
/// defaults: empty namespace, process + container metrics enabled,
/// 15-second update interval.
#[derive(Debug, Clone)]
pub struct MetricsConfig {
    /// Metric namespace prefix (empty by default).
    pub namespace: String,
    /// Enable process metrics collection (CPU, memory, file descriptors).
    pub enable_process_metrics: bool,
    /// Enable container metrics collection (cgroup limits).
    pub enable_container_metrics: bool,
    /// Update interval for auto-collected metrics.
    pub update_interval: Duration,
    /// OTel-specific configuration (only used when `otel-metrics` feature is enabled).
    #[cfg(feature = "otel-metrics")]
    pub otel: OtelMetricsConfig,
}
172
173impl Default for MetricsConfig {
174    fn default() -> Self {
175        Self {
176            namespace: String::new(),
177            enable_process_metrics: true,
178            enable_container_metrics: true,
179            update_interval: Duration::from_secs(15),
180            #[cfg(feature = "otel-metrics")]
181            otel: OtelMetricsConfig::default(),
182        }
183    }
184}
185
/// Intermediate struct to pass recorder setup results across cfg boundaries.
///
/// Which fields exist depends on the compiled feature set;
/// [`install_recorders`] fills in whichever backends it managed to install
/// (`None` means that backend failed or was skipped).
struct RecorderSetup {
    /// Prometheus render handle, present when the `metrics` feature is on.
    #[cfg(feature = "metrics")]
    prom_handle: Option<PrometheusHandle>,
    /// OTel meter provider, present when the `otel-metrics` feature is on.
    #[cfg(feature = "otel-metrics")]
    otel_provider: Option<opentelemetry_sdk::metrics::SdkMeterProvider>,
}
193
/// Install the metrics recorder(s) based on enabled features.
///
/// Returns setup results containing handles/providers. When both
/// `metrics` and `otel-metrics` features are enabled, uses `metrics-util`
/// `FanoutBuilder` to compose both recorders into a single global recorder.
///
/// # Panics
///
/// Panics if a global recorder has already been installed for this process
/// (`metrics::set_global_recorder` returns an error); call at most once.
#[allow(unused_variables)]
fn install_recorders(config: &MetricsConfig) -> RecorderSetup {
    // --- Prometheus only (no OTel) ---
    #[cfg(all(feature = "metrics", not(feature = "otel-metrics")))]
    {
        // `build_recorder` does not install globally; we grab the render
        // handle first, then install.
        let recorder = metrics_exporter_prometheus::PrometheusBuilder::new().build_recorder();
        let handle = recorder.handle();
        metrics::set_global_recorder(recorder).expect("failed to install Prometheus recorder");
        RecorderSetup {
            prom_handle: Some(handle),
        }
    }

    // --- OTel only (no Prometheus) ---
    #[cfg(all(feature = "otel-metrics", not(feature = "metrics")))]
    {
        match otel::build_otel_recorder(&config.namespace, &config.otel) {
            Ok((otel_recorder, provider)) => {
                opentelemetry::global::set_meter_provider(provider.clone());
                metrics::set_global_recorder(otel_recorder)
                    .expect("failed to set OTel metrics recorder");
                RecorderSetup {
                    otel_provider: Some(provider),
                }
            }
            Err(e) => {
                // Degrade gracefully: log and run with no recorder installed.
                tracing::warn!(error = %e, "Failed to build OTel metrics recorder");
                RecorderSetup {
                    otel_provider: None,
                }
            }
        }
    }

    // --- Both Prometheus + OTel (Fanout) ---
    #[cfg(all(feature = "metrics", feature = "otel-metrics"))]
    {
        // Build Prometheus recorder (without installing globally)
        let prom_recorder = metrics_exporter_prometheus::PrometheusBuilder::new().build_recorder();
        let prom_handle = prom_recorder.handle();

        // Build OTel recorder
        match otel::build_otel_recorder(&config.namespace, &config.otel) {
            Ok((otel_recorder, provider)) => {
                opentelemetry::global::set_meter_provider(provider.clone());

                // Compose via Fanout: both recorders receive every measurement
                let fanout = metrics_util::layers::FanoutBuilder::default()
                    .add_recorder(prom_recorder)
                    .add_recorder(otel_recorder)
                    .build();

                metrics::set_global_recorder(fanout).expect("failed to set Fanout recorder");

                RecorderSetup {
                    prom_handle: Some(prom_handle),
                    otel_provider: Some(provider),
                }
            }
            Err(e) => {
                // Fallback: just Prometheus if OTel fails
                tracing::warn!(error = %e, "Failed to build OTel recorder, falling back to Prometheus only");
                metrics::set_global_recorder(prom_recorder)
                    .expect("failed to set Prometheus recorder");
                RecorderSetup {
                    prom_handle: Some(prom_handle),
                    otel_provider: None,
                }
            }
        }
    }
}
271
/// Metrics manager handling Prometheus and/or OTel exposition.
///
/// Owns the exporter handle(s), the metric manifest registry, optional
/// auto-collected process/container metrics, and the lifecycle of the
/// built-in HTTP server.
pub struct MetricsManager {
    /// Prometheus render handle (set when a Prometheus recorder was installed).
    #[cfg(feature = "metrics")]
    handle: Option<PrometheusHandle>,
    /// Configuration captured at construction time.
    config: MetricsConfig,
    /// Present while the HTTP server task is running; used by `stop_server`.
    shutdown_tx: Option<oneshot::Sender<()>>,
    /// Auto-collected process metrics, if enabled in config.
    process_metrics: Option<ProcessMetrics>,
    /// Auto-collected container/cgroup metrics, if enabled in config.
    container_metrics: Option<ContainerMetrics>,
    /// Optional readiness callback consulted by `/readyz` and `/health/ready`.
    readiness_fn: Option<ReadinessFn>,
    /// Startup flag read by `/startupz`; set once via [`Self::mark_started`].
    started: Arc<std::sync::atomic::AtomicBool>,
    /// Manifest registry describing metrics created through this manager.
    registry: MetricRegistry,
    /// Optional scaling-pressure source backing `/scaling/pressure`.
    #[cfg(all(feature = "metrics", feature = "scaling"))]
    scaling_pressure: Option<Arc<crate::scaling::ScalingPressure>>,
    /// Optional memory guard backing `/memory/pressure`.
    #[cfg(all(feature = "metrics", feature = "memory"))]
    memory_guard: Option<Arc<crate::memory::MemoryGuard>>,
    /// OTel meter provider, kept so it can be flushed via `shutdown_otel`.
    #[cfg(feature = "otel-metrics")]
    otel_provider: Option<opentelemetry_sdk::metrics::SdkMeterProvider>,
}
290
291impl MetricsManager {
292    /// Create a new metrics manager with the given namespace.
293    #[must_use]
294    pub fn new(namespace: &str) -> Self {
295        Self::with_config(MetricsConfig {
296            namespace: namespace.to_string(),
297            ..Default::default()
298        })
299    }
300
    /// Create a metrics manager with custom configuration.
    ///
    /// Installs the appropriate recorder(s) based on enabled features:
    /// - `metrics` only: Prometheus recorder
    /// - `otel-metrics` only: OTel recorder (OTLP push)
    /// - Both: Fanout recorder (Prometheus scrape + OTel OTLP push)
    ///
    /// NOTE(review): recorder installation is process-global; constructing a
    /// second manager panics inside `install_recorders` when it tries to set
    /// the global recorder again — confirm single-instance use is intended.
    #[must_use]
    pub fn with_config(config: MetricsConfig) -> Self {
        // Installs the global recorder(s) and hands back whichever
        // handle/provider the feature set produced.
        let setup = install_recorders(&config);

        // Optional auto-collected metric groups, gated by config flags.
        let process_metrics = if config.enable_process_metrics {
            Some(ProcessMetrics::new(&config.namespace))
        } else {
            None
        };

        let container_metrics = if config.enable_container_metrics {
            Some(ContainerMetrics::new(&config.namespace))
        } else {
            None
        };

        // Built here because `config` is moved into the struct below.
        let registry = MetricRegistry::new(&config.namespace);

        Self {
            #[cfg(feature = "metrics")]
            handle: setup.prom_handle,
            registry,
            config,
            shutdown_tx: None,
            process_metrics,
            container_metrics,
            readiness_fn: None,
            // `/startupz` reports 503 until `mark_started` flips this.
            started: Arc::new(std::sync::atomic::AtomicBool::new(false)),
            #[cfg(all(feature = "metrics", feature = "scaling"))]
            scaling_pressure: None,
            #[cfg(all(feature = "metrics", feature = "memory"))]
            memory_guard: None,
            #[cfg(feature = "otel-metrics")]
            otel_provider: setup.otel_provider,
        }
    }
343
344    /// Create a counter metric.
345    ///
346    /// Automatically registers a [`MetricDescriptor`] in the manifest registry
347    /// with empty labels and `group = "custom"`.
348    #[must_use]
349    pub fn counter(&self, name: &str, description: &str) -> Counter {
350        let key = self.prefixed_key(name);
351        let desc = description.to_string();
352        metrics::describe_counter!(key.clone(), desc.clone());
353        self.registry.push(MetricDescriptor {
354            name: key.clone(),
355            metric_type: MetricType::Counter,
356            description: desc,
357            unit: String::new(),
358            labels: vec![],
359            group: "custom".into(),
360            buckets: None,
361            use_cases: vec![],
362            dashboard_hint: None,
363        });
364        metrics::counter!(key)
365    }
366
367    /// Create a counter with label keys and group metadata for the manifest.
368    ///
369    /// The `labels` parameter declares label **key names** for the manifest.
370    /// The returned `Counter` is label-free — apply label values at recording
371    /// time via `metrics::counter!(key, "label" => value)`.
372    #[must_use]
373    pub fn counter_with_labels(
374        &self,
375        name: &str,
376        description: &str,
377        labels: &[&str],
378        group: &str,
379    ) -> Counter {
380        let key = self.prefixed_key(name);
381        let desc = description.to_string();
382        metrics::describe_counter!(key.clone(), desc.clone());
383        self.registry.push(MetricDescriptor {
384            name: key.clone(),
385            metric_type: MetricType::Counter,
386            description: desc,
387            unit: String::new(),
388            labels: labels.iter().map(|s| (*s).to_string()).collect(),
389            group: group.into(),
390            buckets: None,
391            use_cases: vec![],
392            dashboard_hint: None,
393        });
394        metrics::counter!(key)
395    }
396
397    /// Create a gauge metric.
398    ///
399    /// Automatically registers a [`MetricDescriptor`] in the manifest registry
400    /// with empty labels and `group = "custom"`.
401    #[must_use]
402    pub fn gauge(&self, name: &str, description: &str) -> Gauge {
403        let key = self.prefixed_key(name);
404        let desc = description.to_string();
405        metrics::describe_gauge!(key.clone(), desc.clone());
406        self.registry.push(MetricDescriptor {
407            name: key.clone(),
408            metric_type: MetricType::Gauge,
409            description: desc,
410            unit: String::new(),
411            labels: vec![],
412            group: "custom".into(),
413            buckets: None,
414            use_cases: vec![],
415            dashboard_hint: None,
416        });
417        metrics::gauge!(key)
418    }
419
420    /// Create a gauge with label keys and group metadata for the manifest.
421    #[must_use]
422    pub fn gauge_with_labels(
423        &self,
424        name: &str,
425        description: &str,
426        labels: &[&str],
427        group: &str,
428    ) -> Gauge {
429        let key = self.prefixed_key(name);
430        let desc = description.to_string();
431        metrics::describe_gauge!(key.clone(), desc.clone());
432        self.registry.push(MetricDescriptor {
433            name: key.clone(),
434            metric_type: MetricType::Gauge,
435            description: desc,
436            unit: String::new(),
437            labels: labels.iter().map(|s| (*s).to_string()).collect(),
438            group: group.into(),
439            buckets: None,
440            use_cases: vec![],
441            dashboard_hint: None,
442        });
443        metrics::gauge!(key)
444    }
445
446    /// Create a histogram metric with default buckets.
447    ///
448    /// Automatically registers a [`MetricDescriptor`] in the manifest registry
449    /// with empty labels and `group = "custom"`.
450    #[must_use]
451    pub fn histogram(&self, name: &str, description: &str) -> Histogram {
452        let key = self.prefixed_key(name);
453        let desc = description.to_string();
454        metrics::describe_histogram!(key.clone(), Unit::Seconds, desc.clone());
455        self.registry.push(MetricDescriptor {
456            name: key.clone(),
457            metric_type: MetricType::Histogram,
458            description: desc,
459            unit: "seconds".into(),
460            labels: vec![],
461            group: "custom".into(),
462            buckets: None,
463            use_cases: vec![],
464            dashboard_hint: None,
465        });
466        metrics::histogram!(key)
467    }
468
469    /// Create a histogram with label keys, group, and optional buckets for the manifest.
470    #[must_use]
471    pub fn histogram_with_labels(
472        &self,
473        name: &str,
474        description: &str,
475        labels: &[&str],
476        group: &str,
477        buckets: Option<&[f64]>,
478    ) -> Histogram {
479        let key = self.prefixed_key(name);
480        let desc = description.to_string();
481        metrics::describe_histogram!(key.clone(), Unit::Seconds, desc.clone());
482        self.registry.push(MetricDescriptor {
483            name: key.clone(),
484            metric_type: MetricType::Histogram,
485            description: desc,
486            unit: "seconds".into(),
487            labels: labels.iter().map(|s| (*s).to_string()).collect(),
488            group: group.into(),
489            buckets: buckets.map(|b| b.to_vec()),
490            use_cases: vec![],
491            dashboard_hint: None,
492        });
493        metrics::histogram!(key)
494    }
495
496    /// Create a histogram metric with custom buckets.
497    ///
498    /// **Note:** The `buckets` parameter is captured in the manifest registry
499    /// but currently ignored by the `metrics` crate at runtime (buckets are set
500    /// globally at recorder installation time).
501    #[must_use]
502    pub fn histogram_with_buckets(
503        &self,
504        name: &str,
505        description: &str,
506        buckets: &[f64],
507    ) -> Histogram {
508        let key = self.prefixed_key(name);
509        let desc = description.to_string();
510        metrics::describe_histogram!(key.clone(), Unit::Seconds, desc.clone());
511        self.registry.push(MetricDescriptor {
512            name: key.clone(),
513            metric_type: MetricType::Histogram,
514            description: desc,
515            unit: "seconds".into(),
516            labels: vec![],
517            group: "custom".into(),
518            buckets: Some(buckets.to_vec()),
519            use_cases: vec![],
520            dashboard_hint: None,
521        });
522        metrics::histogram!(key)
523    }
524
525    /// Get the Prometheus metrics output.
526    ///
527    /// Returns the rendered Prometheus text format. Only available when
528    /// the `metrics` feature is enabled.
529    #[cfg(feature = "metrics")]
530    #[must_use]
531    pub fn render(&self) -> String {
532        self.handle
533            .as_ref()
534            .map_or_else(String::new, PrometheusHandle::render)
535    }
536
    /// Get a cloneable render handle for use in route handlers.
    ///
    /// Returns a [`RenderHandle`] whose `render()` method produces the
    /// current Prometheus metrics text. The handle is `Clone + Send + Sync`,
    /// making it safe to move into `axum` route handlers or share across
    /// tasks via `Arc`.
    ///
    /// Returns `None` if no Prometheus recorder is installed.
    ///
    /// # Example
    ///
    /// ```rust,ignore
    /// let mut mgr = MetricsManager::new("myapp");
    /// let render = mgr.render_handle().expect("recorder installed");
    ///
    /// // Use in axum route
    /// let route = axum::Router::new().route("/metrics", axum::routing::get(move || {
    ///     let r = render.clone();
    ///     async move { r.render() }
    /// }));
    ///
    /// mgr.start_server_with_routes("0.0.0.0:9090", route).await?;
    /// ```
    #[cfg(feature = "metrics")]
    #[must_use]
    pub fn render_handle(&self) -> Option<RenderHandle> {
        self.handle.clone().map(RenderHandle)
    }
564
565    /// Set a readiness check callback.
566    ///
567    /// When set, `/readyz` and `/health/ready` call this function and return
568    /// 503 Service Unavailable if it returns `false`. Without a callback,
569    /// these endpoints always return 200.
570    pub fn set_readiness_check(&mut self, f: impl Fn() -> bool + Send + Sync + 'static) {
571        self.readiness_fn = Some(Arc::new(f));
572    }
573
574    /// Mark the service as started (startup probe passes).
575    ///
576    /// Call this once init is complete. K8s `startupProbe` hits `/startupz`
577    /// which returns 503 until this is called, then 200 thereafter.
578    /// Separate from readiness — startup has a longer timeout for slow starters.
579    pub fn mark_started(&self) {
580        self.started
581            .store(true, std::sync::atomic::Ordering::Release);
582    }
583
    /// Get a clone of the started flag (for passing to HTTP handler).
    ///
    /// The returned `Arc` shares the atomic that [`Self::mark_started`] sets;
    /// handlers read it with `Ordering::Acquire`.
    pub(crate) fn started_flag(&self) -> Arc<std::sync::atomic::AtomicBool> {
        Arc::clone(&self.started)
    }
588
    /// Attach a `ScalingPressure` instance.
    ///
    /// When set and using `start_server_with_routes`, a `/scaling/pressure`
    /// endpoint is automatically added that returns the current pressure value
    /// (formatted to two decimal places). Has no effect on `start_server`.
    #[cfg(all(feature = "metrics", feature = "scaling"))]
    pub fn set_scaling_pressure(&mut self, sp: Arc<crate::scaling::ScalingPressure>) {
        self.scaling_pressure = Some(sp);
    }
597
    /// Attach a `MemoryGuard` instance.
    ///
    /// When set and using `start_server_with_routes`, a `/memory/pressure`
    /// endpoint is automatically added that returns the current memory status
    /// as JSON. Has no effect on `start_server`.
    #[cfg(all(feature = "metrics", feature = "memory"))]
    pub fn set_memory_guard(&mut self, mg: Arc<crate::memory::MemoryGuard>) {
        self.memory_guard = Some(mg);
    }
606
607    /// Update process and container metrics.
608    pub fn update(&self) {
609        if let Some(ref pm) = self.process_metrics {
610            pm.update();
611        }
612        if let Some(ref cm) = self.container_metrics {
613            cm.update();
614        }
615    }
616
617    /// Start the metrics HTTP server.
618    ///
619    /// Serves `/metrics` (Prometheus), `/healthz`, `/health/live`,
620    /// `/readyz`, `/health/ready` endpoints.
621    ///
622    /// Only available when the `metrics` feature is enabled (for scraping).
623    ///
624    /// # Errors
625    ///
626    /// Returns an error if the server fails to start.
627    #[cfg(feature = "metrics")]
628    pub async fn start_server(&mut self, addr: &str) -> Result<(), MetricsError> {
629        if self.shutdown_tx.is_some() {
630            return Err(MetricsError::AlreadyRunning);
631        }
632
633        let addr: SocketAddr = addr
634            .parse()
635            .map_err(|e| MetricsError::ServerError(format!("invalid address: {e}")))?;
636
637        let listener = TcpListener::bind(addr)
638            .await
639            .map_err(|e| MetricsError::ServerError(e.to_string()))?;
640
641        let (shutdown_tx, shutdown_rx) = oneshot::channel();
642        self.shutdown_tx = Some(shutdown_tx);
643
644        let handle = self
645            .handle
646            .as_ref()
647            .ok_or_else(|| {
648                MetricsError::ServerError(
649                    "Prometheus handle not configured — MetricsManager was created without a recorder".into(),
650                )
651            })?
652            .clone();
653        let update_interval = self.config.update_interval;
654        let process_metrics = self.process_metrics.clone();
655        let container_metrics = self.container_metrics.clone();
656        let readiness_fn = self.readiness_fn.clone();
657        let started_flag = self.started_flag();
658
659        let registry = self.registry();
660
661        tokio::spawn(async move {
662            run_server(
663                listener,
664                handle,
665                registry,
666                shutdown_rx,
667                update_interval,
668                process_metrics,
669                container_metrics,
670                readiness_fn,
671                started_flag,
672            )
673            .await;
674        });
675
676        Ok(())
677    }
678
679    /// Start the metrics HTTP server with additional custom routes.
680    ///
681    /// Serves the same built-in endpoints as [`start_server`](Self::start_server):
682    /// `/metrics`, `/healthz`, `/health/live`, `/readyz`, `/health/ready`.
683    ///
684    /// Additionally:
685    /// - If [`set_scaling_pressure`](Self::set_scaling_pressure) was called,
686    ///   adds `/scaling/pressure` returning the current pressure value.
687    /// - If [`set_memory_guard`](Self::set_memory_guard) was called,
688    ///   adds `/memory/pressure` returning memory status JSON.
689    /// - Any routes in `extra_routes` are merged (service-specific endpoints).
690    ///
691    /// Requires both `metrics` and `http-server` features.
692    ///
693    /// # Errors
694    ///
695    /// Returns an error if the server fails to start.
696    #[cfg(all(feature = "metrics", feature = "http-server"))]
697    pub async fn start_server_with_routes(
698        &mut self,
699        addr: &str,
700        extra_routes: axum::Router,
701    ) -> Result<(), MetricsError> {
702        if self.shutdown_tx.is_some() {
703            return Err(MetricsError::AlreadyRunning);
704        }
705
706        let addr: SocketAddr = addr
707            .parse()
708            .map_err(|e| MetricsError::ServerError(format!("invalid address: {e}")))?;
709
710        let listener = TcpListener::bind(addr)
711            .await
712            .map_err(|e| MetricsError::ServerError(e.to_string()))?;
713
714        let (shutdown_tx, shutdown_rx) = oneshot::channel();
715        self.shutdown_tx = Some(shutdown_tx);
716
717        let handle = self
718            .handle
719            .as_ref()
720            .ok_or_else(|| {
721                MetricsError::ServerError(
722                    "Prometheus handle not configured — MetricsManager was created without a recorder".into(),
723                )
724            })?
725            .clone();
726        let update_interval = self.config.update_interval;
727        let process_metrics = self.process_metrics.clone();
728        let container_metrics = self.container_metrics.clone();
729        let readiness_fn = self.readiness_fn.clone();
730
731        // Build the axum router with built-in + optional + custom routes
732        let metrics_handle = handle.clone();
733        let readiness_for_live = readiness_fn.clone();
734        let registry_handle = self.registry();
735
736        let mut app = axum::Router::new()
737            .route(
738                "/metrics/manifest",
739                axum::routing::get(move || {
740                    let reg = registry_handle.clone();
741                    async move {
742                        (
743                            [(axum::http::header::CONTENT_TYPE, "application/json")],
744                            serde_json::to_string(&reg.manifest()).unwrap_or_default(),
745                        )
746                    }
747                }),
748            )
749            .route(
750                "/metrics",
751                axum::routing::get(move || {
752                    let h = metrics_handle.clone();
753                    async move { h.render() }
754                }),
755            )
756            .route("/startupz", {
757                let sf = self.started_flag();
758                axum::routing::get(move || {
759                    let started = sf.load(std::sync::atomic::Ordering::Acquire);
760                    async move {
761                        if started {
762                            (
763                                axum::http::StatusCode::OK,
764                                [(axum::http::header::CONTENT_TYPE, "application/json")],
765                                r#"{"status":"started"}"#,
766                            )
767                        } else {
768                            (
769                                axum::http::StatusCode::SERVICE_UNAVAILABLE,
770                                [(axum::http::header::CONTENT_TYPE, "application/json")],
771                                r#"{"status":"starting"}"#,
772                            )
773                        }
774                    }
775                })
776            })
777            .route(
778                "/healthz",
779                axum::routing::get(|| async {
780                    (
781                        [(axum::http::header::CONTENT_TYPE, "application/json")],
782                        r#"{"status":"alive"}"#,
783                    )
784                }),
785            )
786            .route(
787                "/health/live",
788                axum::routing::get(|| async {
789                    (
790                        [(axum::http::header::CONTENT_TYPE, "application/json")],
791                        r#"{"status":"alive"}"#,
792                    )
793                }),
794            )
795            .route(
796                "/readyz",
797                axum::routing::get(move || {
798                    let rf = readiness_fn.clone();
799                    async move { readiness_response(rf) }
800                }),
801            )
802            .route(
803                "/health/ready",
804                axum::routing::get(move || {
805                    let rf = readiness_for_live.clone();
806                    async move { readiness_response(rf) }
807                }),
808            );
809
810        // Add scaling pressure endpoint if configured
811        #[cfg(feature = "scaling")]
812        if let Some(ref sp) = self.scaling_pressure {
813            let sp = sp.clone();
814            app = app.route(
815                "/scaling/pressure",
816                axum::routing::get(move || {
817                    let s = sp.clone();
818                    async move { format!("{:.2}", s.calculate()) }
819                }),
820            );
821        }
822
823        // Add memory pressure endpoint if configured
824        #[cfg(feature = "memory")]
825        if let Some(ref mg) = self.memory_guard {
826            let mg = mg.clone();
827            app = app.route(
828                "/memory/pressure",
829                axum::routing::get(move || {
830                    let m = mg.clone();
831                    async move {
832                        (
833                            [(axum::http::header::CONTENT_TYPE, "application/json")],
834                            format!(
835                                r#"{{"under_pressure":{},"ratio":{:.3},"current_bytes":{},"limit_bytes":{}}}"#,
836                                m.under_pressure(),
837                                m.pressure_ratio(),
838                                m.current_bytes(),
839                                m.limit_bytes()
840                            ),
841                        )
842                    }
843                }),
844            );
845        }
846
847        // Merge service-specific routes
848        app = app.merge(extra_routes);
849
850        tokio::spawn(async move {
851            run_axum_server(
852                listener,
853                app,
854                shutdown_rx,
855                update_interval,
856                process_metrics,
857                container_metrics,
858            )
859            .await;
860        });
861
862        Ok(())
863    }
864
865    /// Stop the metrics server.
866    ///
867    /// # Errors
868    ///
869    /// Returns an error if the server is not running.
870    pub async fn stop_server(&mut self) -> Result<(), MetricsError> {
871        if let Some(tx) = self.shutdown_tx.take() {
872            let _ = tx.send(());
873            Ok(())
874        } else {
875            Err(MetricsError::NotRunning)
876        }
877    }
878
879    /// Gracefully shut down the OTel provider (flushes pending exports).
880    ///
881    /// Call this before application exit to ensure all metrics are exported.
882    #[cfg(feature = "otel-metrics")]
883    pub fn shutdown_otel(&mut self) {
884        if let Some(provider) = self.otel_provider.take()
885            && let Err(e) = provider.shutdown()
886        {
887            tracing::warn!(error = %e, "OTel provider shutdown error");
888        }
889    }
890
    /// Set application version and git commit for the manifest.
    ///
    /// Uses interior mutability (writes through the registry's `Arc<RwLock>`),
    /// so only `&self` is needed. Called automatically by
    /// [`dfe_groups::AppMetrics::new()`] if the `metrics-dfe` feature is enabled.
    ///
    /// Delegates to the shared [`MetricRegistry`], whose manifest is served
    /// at the `/metrics/manifest` endpoint.
    pub fn set_build_info(&self, version: &str, commit: &str) {
        self.registry.set_build_info(version, commit);
    }
899
    /// Set operational use cases for a metric (by full prefixed name).
    ///
    /// `metric_name` must include the namespace prefix (see [`Self::namespace`]).
    /// No-op if the metric is not found in the registry.
    pub fn set_use_cases(&self, metric_name: &str, use_cases: &[&str]) {
        self.registry.set_use_cases(metric_name, use_cases);
    }
906
    /// Set the suggested Grafana panel type for a metric (by full prefixed name).
    ///
    /// `metric_name` must include the namespace prefix (see [`Self::namespace`]).
    /// No-op if the metric is not found in the registry.
    pub fn set_dashboard_hint(&self, metric_name: &str, hint: &str) {
        self.registry.set_dashboard_hint(metric_name, hint);
    }
913
    /// Get a cloneable handle to the metric registry.
    ///
    /// Use this to pass into route handlers. The handle is `Clone + Send + Sync`.
    /// Call before starting the server, consistent with the `render_handle()` pattern.
    ///
    /// Cloning is cheap: the registry wraps its state in an `Arc`-based handle.
    #[must_use]
    pub fn registry(&self) -> MetricRegistry {
        self.registry.clone()
    }
922
    /// Get the namespace prefix (e.g. `dfe_loader`).
    ///
    /// Used by [`dfe_groups`] metric structs to build labelled metric keys.
    /// May be empty, in which case metric names are used unprefixed
    /// (see `prefixed_key`).
    #[must_use]
    pub fn namespace(&self) -> &str {
        &self.config.namespace
    }
930
931    /// Get prefixed metric name.
932    fn prefixed_key(&self, name: &str) -> String {
933        if self.config.namespace.is_empty() {
934            name.to_string()
935        } else {
936            format!("{}_{}", self.config.namespace, name)
937        }
938    }
939}
940
/// Run the metrics HTTP server.
///
/// One task multiplexing three concerns via `select!`: the shutdown signal,
/// the periodic refresh of process/container gauges, and accepting scrape
/// connections (each connection is handled on its own spawned task).
#[cfg(feature = "metrics")]
#[allow(clippy::too_many_arguments)]
async fn run_server(
    listener: TcpListener,
    handle: PrometheusHandle,
    registry: MetricRegistry,
    mut shutdown_rx: oneshot::Receiver<()>,
    update_interval: Duration,
    process_metrics: Option<ProcessMetrics>,
    container_metrics: Option<ContainerMetrics>,
    readiness_fn: Option<ReadinessFn>,
    started_flag: Arc<std::sync::atomic::AtomicBool>,
) {
    let mut ticker = tokio::time::interval(update_interval);

    loop {
        tokio::select! {
            // Graceful stop requested via the manager's shutdown channel.
            _ = &mut shutdown_rx => break,
            // Periodic refresh of the optional process/container collectors.
            _ = ticker.tick() => {
                if let Some(ref pm) = process_metrics {
                    pm.update();
                }
                if let Some(ref cm) = container_metrics {
                    cm.update();
                }
            }
            accepted = listener.accept() => {
                // Accept errors are transient (e.g. fd exhaustion); keep serving.
                let Ok((stream, _peer)) = accepted else { continue };
                let handle = handle.clone();
                let registry = registry.clone();
                let readiness_fn = readiness_fn.clone();
                let flag = Arc::clone(&started_flag);
                tokio::spawn(async move {
                    handle_connection(stream, handle, registry, readiness_fn, &flag).await;
                });
            }
        }
    }
}
984
/// Handle a single HTTP connection.
///
/// Minimal hand-rolled HTTP/1.1: only the request line is read, one response
/// is written, and the connection is dropped.
///
/// **Path ordering:** `/metrics/manifest` MUST be checked BEFORE `/metrics`
/// because `"GET /metrics/manifest"` also matches `starts_with("GET /metrics")`.
#[cfg(feature = "metrics")]
async fn handle_connection(
    mut stream: tokio::net::TcpStream,
    handle: PrometheusHandle,
    registry: MetricRegistry,
    readiness_fn: Option<ReadinessFn>,
    started_flag: &std::sync::atomic::AtomicBool,
) {
    use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader};

    const JSON: &str = "application/json";
    const TEXT: &str = "text/plain; charset=utf-8";

    let mut req_line = String::new();
    {
        // Scope the BufReader so its borrow of `stream` ends before we write.
        let mut reader = BufReader::new(&mut stream);
        if reader.read_line(&mut req_line).await.is_err() {
            return;
        }
    }

    // IMPORTANT: /metrics/manifest MUST come before /metrics (prefix match ordering)
    let (status, content_type, body) = if req_line.starts_with("GET /metrics/manifest") {
        (
            "200 OK",
            JSON,
            serde_json::to_string(&registry.manifest()).unwrap_or_default(),
        )
    } else if req_line.starts_with("GET /metrics") {
        ("200 OK", TEXT, handle.render())
    } else if req_line.starts_with("GET /startupz")
        || req_line.starts_with("GET /health/startup")
    {
        // Startup probe: 200 once the started flag has been set.
        if started_flag.load(std::sync::atomic::Ordering::Acquire) {
            ("200 OK", JSON, r#"{"status":"started"}"#.to_string())
        } else {
            (
                "503 Service Unavailable",
                JSON,
                r#"{"status":"starting"}"#.to_string(),
            )
        }
    } else if req_line.starts_with("GET /healthz")
        || req_line.starts_with("GET /health/live")
    {
        // Liveness probe is unconditional: reaching here means the process is alive.
        ("200 OK", JSON, r#"{"status":"alive"}"#.to_string())
    } else if req_line.starts_with("GET /readyz")
        || req_line.starts_with("GET /health/ready")
    {
        // Ready only when the optional callback AND the health registry agree.
        let callback_ready = readiness_fn.as_ref().is_none_or(|f| f());

        #[cfg(feature = "health")]
        let registry_ready = crate::health::HealthRegistry::is_ready();
        #[cfg(not(feature = "health"))]
        let registry_ready = true;

        if callback_ready && registry_ready {
            ("200 OK", JSON, r#"{"status":"ready"}"#.to_string())
        } else {
            (
                "503 Service Unavailable",
                JSON,
                r#"{"status":"not_ready"}"#.to_string(),
            )
        }
    } else {
        ("404 Not Found", TEXT, "Not Found".to_string())
    };

    // Body is ASCII/UTF-8 text; byte length is the correct Content-Length.
    let response = format!(
        "HTTP/1.1 {status}\r\nContent-Type: {content_type}\r\nContent-Length: {len}\r\n\r\n{body}",
        len = body.len()
    );
    let _ = stream.write_all(response.as_bytes()).await;
}
1078
/// Readiness response helper for axum endpoints.
///
/// Checks the caller-supplied readiness callback AND (when the `health`
/// feature is enabled) the global [`HealthRegistry`](crate::health::HealthRegistry).
/// Both must be true for a 200 response.
#[cfg(all(feature = "metrics", feature = "http-server"))]
fn readiness_response(rf: Option<ReadinessFn>) -> axum::response::Response {
    use axum::response::IntoResponse;

    // An absent callback means "always ready".
    let callback_ready = match rf {
        Some(f) => f(),
        None => true,
    };

    #[cfg(feature = "health")]
    let registry_ready = crate::health::HealthRegistry::is_ready();
    #[cfg(not(feature = "health"))]
    let registry_ready = true;

    let headers = [(axum::http::header::CONTENT_TYPE, "application/json")];
    if callback_ready && registry_ready {
        // Tuple responses default to 200 OK.
        (headers, r#"{"status":"ready"}"#).into_response()
    } else {
        (
            axum::http::StatusCode::SERVICE_UNAVAILABLE,
            headers,
            r#"{"status":"not_ready"}"#,
        )
            .into_response()
    }
}
1111
/// Run the axum-based metrics HTTP server with custom routes.
///
/// Spawns a side task that periodically refreshes the optional
/// process/container collectors, then serves `app` until the shutdown
/// oneshot fires (or its sender is dropped).
#[cfg(all(feature = "metrics", feature = "http-server"))]
async fn run_axum_server(
    listener: TcpListener,
    app: axum::Router,
    shutdown_rx: oneshot::Receiver<()>,
    update_interval: Duration,
    process_metrics: Option<ProcessMetrics>,
    container_metrics: Option<ContainerMetrics>,
) {
    // Background refresh loop for the process/container gauges.
    let (stop_updates_tx, mut stop_updates_rx) = oneshot::channel::<()>();
    tokio::spawn(async move {
        let mut ticker = tokio::time::interval(update_interval);
        loop {
            tokio::select! {
                _ = &mut stop_updates_rx => break,
                _ = ticker.tick() => {
                    if let Some(ref pm) = process_metrics {
                        pm.update();
                    }
                    if let Some(ref cm) = container_metrics {
                        cm.update();
                    }
                }
            }
        }
    });

    // Serve with graceful shutdown; a serve error is logged, not propagated.
    axum::serve(listener, app)
        .with_graceful_shutdown(async move {
            let _ = shutdown_rx.await;
        })
        .await
        .unwrap_or_else(|e| tracing::error!(error = %e, "Metrics axum server error"));

    // Tear down the refresh loop once the server has exited.
    let _ = stop_updates_tx.send(());
}
1152
/// Standard latency histogram buckets.
///
/// Twelve buckets spanning 1 ms to 10 s in a roughly 1-2.5-5 progression,
/// suitable for request-duration histograms measured in seconds.
#[must_use]
pub fn latency_buckets() -> Vec<f64> {
    [
        0.001, 0.005, 0.01, 0.025, 0.05, 0.1, 0.25, 0.5, 1.0, 2.5, 5.0, 10.0,
    ]
    .to_vec()
}
1160
/// Standard size histogram buckets.
///
/// Powers of ten from 100 to 10,000,000 (1e2 through 1e7), suitable for
/// payload-size histograms in bytes.
#[must_use]
pub fn size_buckets() -> Vec<f64> {
    // 10^2..=10^7 are exactly representable in f64, so this is identical
    // to spelling the six literals out.
    (2..=7).map(|exp| 10f64.powi(exp)).collect()
}
1173
#[cfg(test)]
mod tests {
    use super::*;

    // Defaults: no namespace, both collectors on, 15 s refresh cadence.
    #[test]
    fn test_metrics_config_default() {
        let cfg = MetricsConfig::default();
        assert!(cfg.namespace.is_empty());
        assert!(cfg.enable_process_metrics);
        assert!(cfg.enable_container_metrics);
        assert_eq!(cfg.update_interval, Duration::from_secs(15));
    }

    // Latency buckets: fixed count, ascending overall.
    #[test]
    fn test_latency_buckets() {
        let b = latency_buckets();
        assert_eq!(b.len(), 12);
        assert!(b[0] < b[11]);
    }

    // Size buckets: fixed count, ascending overall.
    #[test]
    fn test_size_buckets() {
        let b = size_buckets();
        assert_eq!(b.len(), 6);
        assert!(b[0] < b[5]);
    }
}