// hyperi_rustlib/metrics/mod.rs
1// Project:   hyperi-rustlib
2// File:      src/metrics/mod.rs
3// Purpose:   Prometheus metrics with process and container awareness
4// Language:  Rust
5//
6// License:   FSL-1.1-ALv2
7// Copyright: (c) 2026 HYPERI PTY LIMITED
8
9//! Metrics with Prometheus and/or OpenTelemetry backends.
10//!
11//! Provides production-ready metrics collection with support for:
12//!
13//! - **`metrics` feature only:** Prometheus scrape endpoint via `/metrics`
14//! - **`otel-metrics` feature only:** OTLP push to OTel-compatible backends
15//! - **Both features:** Fanout recorder sends to both Prometheus AND OTel
16//!
17//! ## Features
18//!
19//! - Counter, Gauge, Histogram metric types
20//! - Automatic process metrics (CPU, memory, file descriptors)
21//! - Container metrics from cgroups (memory limit, CPU limit)
22//! - Built-in HTTP server for `/metrics` endpoint (Prometheus)
23//! - OTLP push to HyperDX, Jaeger, Grafana, etc. (OTel)
24//! - Readiness callback for `/health/ready` endpoints
25//! - Optional scaling pressure endpoint (`/scaling/pressure`)
26//! - Optional memory guard endpoint (`/memory/pressure`)
27//! - Custom route support via [`start_server_with_routes`](MetricsManager::start_server_with_routes)
28//!
29//! ## Basic Example
30//!
31//! ```rust,no_run
32//! use hyperi_rustlib::metrics::{MetricsManager, MetricsConfig};
33//!
34//! #[tokio::main]
35//! async fn main() {
36//!     let mut manager = MetricsManager::new("myapp");
37//!
38//!     // Create metrics
39//!     let requests = manager.counter("requests_total", "Total requests");
40//!     let active = manager.gauge("active_connections", "Active connections");
41//!     let latency = manager.histogram("request_duration_seconds", "Request latency");
42//!
43//!     // Start metrics server (simple — built-in endpoints only)
44//!     manager.start_server("0.0.0.0:9090").await.unwrap();
45//!
46//!     // Record metrics
47//!     requests.increment(1);
48//!     active.set(42.0);
49//!     latency.record(0.123);
50//! }
51//! ```
52//!
53//! ## Advanced Example (with custom routes, scaling, memory)
54//!
55//! Requires features: `metrics`, `http-server`, `scaling`, `memory`.
56//!
57//! ```rust,ignore
58//! use std::sync::Arc;
59//! use hyperi_rustlib::metrics::MetricsManager;
60//! use hyperi_rustlib::scaling::{ScalingPressure, ScalingPressureConfig};
61//! use hyperi_rustlib::memory::{MemoryGuard, MemoryGuardConfig};
62//! use axum::{Router, routing::post};
63//!
64//! let mut mgr = MetricsManager::new("myapp");
65//!
66//! // Readiness callback
67//! mgr.set_readiness_check(|| true);
68//!
69//! // Attach scaling pressure (adds /scaling/pressure endpoint)
70//! let scaling = Arc::new(ScalingPressure::new(ScalingPressureConfig::default(), vec![]));
71//! mgr.set_scaling_pressure(scaling);
72//!
73//! // Attach memory guard (adds /memory/pressure endpoint)
74//! let guard = Arc::new(MemoryGuard::new(MemoryGuardConfig::default()));
75//! mgr.set_memory_guard(guard);
76//!
77//! // Service-specific routes
78//! let custom = Router::new()
79//!     .route("/test", post(|| async { "ok" }));
80//!
81//! // Start with everything merged into one server
82//! mgr.start_server_with_routes("0.0.0.0:9090", custom).await.unwrap();
83//! ```
84
85mod container;
86pub mod dfe;
87pub mod manifest;
88mod process;
89
90#[cfg(feature = "otel-metrics")]
91pub(crate) mod otel;
92#[cfg(feature = "otel-metrics")]
93pub mod otel_types;
94
95use std::net::SocketAddr;
96use std::sync::Arc;
97use std::time::Duration;
98
99use metrics::{Counter, Gauge, Histogram, Unit};
100use thiserror::Error;
101use tokio::net::TcpListener;
102use tokio::sync::oneshot;
103
104/// Readiness check callback type.
105pub type ReadinessFn = Arc<dyn Fn() -> bool + Send + Sync>;
106
107#[cfg(feature = "metrics")]
108use metrics_exporter_prometheus::PrometheusHandle;
109
110pub use container::ContainerMetrics;
111pub use dfe::DfeMetrics;
112#[cfg(feature = "metrics-dfe")]
113pub mod dfe_groups;
114pub use manifest::{ManifestResponse, MetricDescriptor, MetricRegistry, MetricType};
115pub use process::ProcessMetrics;
116
117#[cfg(feature = "otel-metrics")]
118pub use otel_types::{OtelMetricsConfig, OtelProtocol};
119
/// Cloneable handle for rendering Prometheus metrics text.
///
/// Obtained via [`MetricsManager::render_handle`]. Safe to clone into
/// `axum` route handlers or share across tasks.
#[cfg(feature = "metrics")]
#[derive(Clone)]
pub struct RenderHandle(PrometheusHandle);

#[cfg(feature = "metrics")]
impl RenderHandle {
    /// Render the current metrics in Prometheus text exposition format.
    #[must_use]
    pub fn render(&self) -> String {
        let RenderHandle(inner) = self;
        inner.render()
    }
}
136
/// Errors returned by metrics setup and server operations.
#[derive(Debug, Error)]
pub enum MetricsError {
    /// Failed to build metrics exporter.
    #[error("failed to build metrics exporter: {0}")]
    BuildError(String),

    /// Failed to start metrics server (invalid address, bind failure,
    /// or missing Prometheus handle).
    #[error("failed to start metrics server: {0}")]
    ServerError(String),

    /// Server already running (a start call already stored a shutdown channel).
    #[error("metrics server already running")]
    AlreadyRunning,

    /// Server not running.
    #[error("metrics server not running")]
    NotRunning,
}
156
/// Metrics configuration.
#[derive(Debug, Clone)]
pub struct MetricsConfig {
    /// Metric namespace prefix.
    pub namespace: String,
    /// Enable process metrics collection.
    pub enable_process_metrics: bool,
    /// Enable container metrics collection.
    pub enable_container_metrics: bool,
    /// Update interval for auto-collected metrics.
    pub update_interval: Duration,
    /// OTel-specific configuration (only used when `otel-metrics` feature is enabled).
    #[cfg(feature = "otel-metrics")]
    pub otel: OtelMetricsConfig,
}

impl Default for MetricsConfig {
    /// Defaults: no namespace prefix, process/container collection on,
    /// 15-second auto-update interval.
    fn default() -> Self {
        MetricsConfig {
            namespace: String::default(),
            enable_process_metrics: true,
            enable_container_metrics: true,
            update_interval: Duration::from_secs(15),
            #[cfg(feature = "otel-metrics")]
            otel: OtelMetricsConfig::default(),
        }
    }
}
185
/// Intermediate struct to pass recorder setup results across cfg boundaries.
///
/// Produced by [`install_recorders`]. Each field is compiled in only when the
/// matching feature is enabled; the value is `None` when the corresponding
/// recorder could not be built.
struct RecorderSetup {
    /// Handle for rendering Prometheus text output.
    #[cfg(feature = "metrics")]
    prom_handle: Option<PrometheusHandle>,
    /// OTel meter provider, kept alive so the OTLP export pipeline keeps running.
    #[cfg(feature = "otel-metrics")]
    otel_provider: Option<opentelemetry_sdk::metrics::SdkMeterProvider>,
}
193
/// Install the metrics recorder(s) based on enabled features.
///
/// Returns setup results containing handles/providers. When both
/// `metrics` and `otel-metrics` features are enabled, uses `metrics-util`
/// `FanoutBuilder` to compose both recorders into a single global recorder.
///
/// Exactly one of the three mutually-exclusive `cfg` blocks below survives
/// compilation; that block's tail expression is the function's return value.
///
/// NOTE(review): `metrics::set_global_recorder` succeeds only once per
/// process, so the `expect` calls here panic if a recorder is already
/// installed — confirm callers construct at most one `MetricsManager`
/// outside of `new_for_test`.
#[allow(unused_variables)]
fn install_recorders(config: &MetricsConfig) -> RecorderSetup {
    // --- Prometheus only (no OTel) ---
    #[cfg(all(feature = "metrics", not(feature = "otel-metrics")))]
    {
        let recorder = metrics_exporter_prometheus::PrometheusBuilder::new().build_recorder();
        // Grab the render handle before the recorder is moved into the global slot.
        let handle = recorder.handle();
        metrics::set_global_recorder(recorder).expect("failed to install Prometheus recorder");
        RecorderSetup {
            prom_handle: Some(handle),
        }
    }

    // --- OTel only (no Prometheus) ---
    #[cfg(all(feature = "otel-metrics", not(feature = "metrics")))]
    {
        match otel::build_otel_recorder(&config.namespace, &config.otel) {
            Ok((otel_recorder, provider)) => {
                // Register the provider globally so direct `opentelemetry` API
                // users share the same export pipeline.
                opentelemetry::global::set_meter_provider(provider.clone());
                metrics::set_global_recorder(otel_recorder)
                    .expect("failed to set OTel metrics recorder");
                RecorderSetup {
                    otel_provider: Some(provider),
                }
            }
            Err(e) => {
                // Degrade gracefully: with no recorder installed, `metrics!`
                // macros become no-ops instead of failing construction.
                tracing::warn!(error = %e, "Failed to build OTel metrics recorder");
                RecorderSetup {
                    otel_provider: None,
                }
            }
        }
    }

    // --- Both Prometheus + OTel (Fanout) ---
    #[cfg(all(feature = "metrics", feature = "otel-metrics"))]
    {
        // Build Prometheus recorder (without installing globally)
        let prom_recorder = metrics_exporter_prometheus::PrometheusBuilder::new().build_recorder();
        let prom_handle = prom_recorder.handle();

        // Build OTel recorder
        match otel::build_otel_recorder(&config.namespace, &config.otel) {
            Ok((otel_recorder, provider)) => {
                opentelemetry::global::set_meter_provider(provider.clone());

                // Compose via Fanout: both recorders receive every measurement
                let fanout = metrics_util::layers::FanoutBuilder::default()
                    .add_recorder(prom_recorder)
                    .add_recorder(otel_recorder)
                    .build();

                metrics::set_global_recorder(fanout).expect("failed to set Fanout recorder");

                RecorderSetup {
                    prom_handle: Some(prom_handle),
                    otel_provider: Some(provider),
                }
            }
            Err(e) => {
                // Fallback: just Prometheus if OTel fails
                tracing::warn!(error = %e, "Failed to build OTel recorder, falling back to Prometheus only");
                metrics::set_global_recorder(prom_recorder)
                    .expect("failed to set Prometheus recorder");
                RecorderSetup {
                    prom_handle: Some(prom_handle),
                    otel_provider: None,
                }
            }
        }
    }
}
271
/// Metrics manager handling Prometheus and/or OTel exposition.
pub struct MetricsManager {
    /// Prometheus render handle; `None` when no Prometheus recorder was
    /// installed (e.g. test constructor or OTel-only builds).
    #[cfg(feature = "metrics")]
    handle: Option<PrometheusHandle>,
    /// Configuration captured at construction time.
    config: MetricsConfig,
    /// Shutdown signal sender for the spawned HTTP server; `Some` while a
    /// server is considered running.
    shutdown_tx: Option<oneshot::Sender<()>>,
    /// Process metrics collector, when enabled in config.
    process_metrics: Option<ProcessMetrics>,
    /// Container (cgroup) metrics collector, when enabled in config.
    container_metrics: Option<ContainerMetrics>,
    /// Optional readiness callback used by `/readyz` and `/health/ready`.
    readiness_fn: Option<ReadinessFn>,
    /// Startup flag for `/startupz`; set once via [`Self::mark_started`].
    started: Arc<std::sync::atomic::AtomicBool>,
    /// Manifest registry of declared metric descriptors.
    registry: MetricRegistry,
    /// Optional scaling-pressure source backing `/scaling/pressure`.
    #[cfg(all(feature = "metrics", feature = "scaling"))]
    scaling_pressure: Option<Arc<crate::scaling::ScalingPressure>>,
    /// Optional memory guard backing `/memory/pressure`.
    #[cfg(all(feature = "metrics", feature = "memory"))]
    memory_guard: Option<Arc<crate::memory::MemoryGuard>>,
    /// OTel provider kept alive for the lifetime of the manager.
    #[cfg(feature = "otel-metrics")]
    otel_provider: Option<opentelemetry_sdk::metrics::SdkMeterProvider>,
}
290
291impl MetricsManager {
292    /// Create a new metrics manager with the given namespace.
293    #[must_use]
294    pub fn new(namespace: &str) -> Self {
295        Self::with_config(MetricsConfig {
296            namespace: namespace.to_string(),
297            ..Default::default()
298        })
299    }
300
    /// Create a metrics manager for tests without installing a global recorder.
    ///
    /// The global Prometheus recorder can only be installed once per process.
    /// Tests that call `MetricsManager::new()` in parallel all race to install
    /// it and all but the first panic with `SetRecorderError`.
    ///
    /// This constructor skips recorder installation entirely. `metrics!` macros
    /// become no-ops (the crate's documented behaviour when no recorder is set),
    /// but manifest registry tracking, descriptor push, and namespace logic all
    /// work normally — which is what the tests actually verify.
    #[cfg(test)]
    pub(crate) fn new_for_test(namespace: &str) -> Self {
        // Auto-collection is disabled so tests spawn no background work.
        let config = MetricsConfig {
            namespace: namespace.to_string(),
            enable_process_metrics: false,
            enable_container_metrics: false,
            ..Default::default()
        };

        let registry = MetricRegistry::new(&config.namespace);

        Self {
            // No recorder installed, hence no render handle.
            #[cfg(feature = "metrics")]
            handle: None,
            registry,
            config,
            shutdown_tx: None,
            process_metrics: None,
            container_metrics: None,
            readiness_fn: None,
            started: Arc::new(std::sync::atomic::AtomicBool::new(false)),
            #[cfg(all(feature = "metrics", feature = "scaling"))]
            scaling_pressure: None,
            #[cfg(all(feature = "metrics", feature = "memory"))]
            memory_guard: None,
            #[cfg(feature = "otel-metrics")]
            otel_provider: None,
        }
    }
340
341    /// Create a metrics manager with custom configuration.
342    ///
343    /// Installs the appropriate recorder(s) based on enabled features:
344    /// - `metrics` only: Prometheus recorder
345    /// - `otel-metrics` only: OTel recorder (OTLP push)
346    /// - Both: Fanout recorder (Prometheus scrape + OTel OTLP push)
347    #[must_use]
348    pub fn with_config(config: MetricsConfig) -> Self {
349        let setup = install_recorders(&config);
350
351        let process_metrics = if config.enable_process_metrics {
352            Some(ProcessMetrics::new(&config.namespace))
353        } else {
354            None
355        };
356
357        let container_metrics = if config.enable_container_metrics {
358            Some(ContainerMetrics::new(&config.namespace))
359        } else {
360            None
361        };
362
363        let registry = MetricRegistry::new(&config.namespace);
364
365        Self {
366            #[cfg(feature = "metrics")]
367            handle: setup.prom_handle,
368            registry,
369            config,
370            shutdown_tx: None,
371            process_metrics,
372            container_metrics,
373            readiness_fn: None,
374            started: Arc::new(std::sync::atomic::AtomicBool::new(false)),
375            #[cfg(all(feature = "metrics", feature = "scaling"))]
376            scaling_pressure: None,
377            #[cfg(all(feature = "metrics", feature = "memory"))]
378            memory_guard: None,
379            #[cfg(feature = "otel-metrics")]
380            otel_provider: setup.otel_provider,
381        }
382    }
383
384    /// Create a counter metric.
385    ///
386    /// Automatically registers a [`MetricDescriptor`] in the manifest registry
387    /// with empty labels and `group = "custom"`.
388    #[must_use]
389    pub fn counter(&self, name: &str, description: &str) -> Counter {
390        let key = self.prefixed_key(name);
391        let desc = description.to_string();
392        metrics::describe_counter!(key.clone(), desc.clone());
393        self.registry.push(MetricDescriptor {
394            name: key.clone(),
395            metric_type: MetricType::Counter,
396            description: desc,
397            unit: String::new(),
398            labels: vec![],
399            group: "custom".into(),
400            buckets: None,
401            use_cases: vec![],
402            dashboard_hint: None,
403        });
404        metrics::counter!(key)
405    }
406
407    /// Create a counter with label keys and group metadata for the manifest.
408    ///
409    /// The `labels` parameter declares label **key names** for the manifest.
410    /// The returned `Counter` is label-free — apply label values at recording
411    /// time via `metrics::counter!(key, "label" => value)`.
412    #[must_use]
413    pub fn counter_with_labels(
414        &self,
415        name: &str,
416        description: &str,
417        labels: &[&str],
418        group: &str,
419    ) -> Counter {
420        let key = self.prefixed_key(name);
421        let desc = description.to_string();
422        metrics::describe_counter!(key.clone(), desc.clone());
423        self.registry.push(MetricDescriptor {
424            name: key.clone(),
425            metric_type: MetricType::Counter,
426            description: desc,
427            unit: String::new(),
428            labels: labels.iter().map(|s| (*s).to_string()).collect(),
429            group: group.into(),
430            buckets: None,
431            use_cases: vec![],
432            dashboard_hint: None,
433        });
434        metrics::counter!(key)
435    }
436
437    /// Create a gauge metric.
438    ///
439    /// Automatically registers a [`MetricDescriptor`] in the manifest registry
440    /// with empty labels and `group = "custom"`.
441    #[must_use]
442    pub fn gauge(&self, name: &str, description: &str) -> Gauge {
443        let key = self.prefixed_key(name);
444        let desc = description.to_string();
445        metrics::describe_gauge!(key.clone(), desc.clone());
446        self.registry.push(MetricDescriptor {
447            name: key.clone(),
448            metric_type: MetricType::Gauge,
449            description: desc,
450            unit: String::new(),
451            labels: vec![],
452            group: "custom".into(),
453            buckets: None,
454            use_cases: vec![],
455            dashboard_hint: None,
456        });
457        metrics::gauge!(key)
458    }
459
460    /// Create a gauge with label keys and group metadata for the manifest.
461    #[must_use]
462    pub fn gauge_with_labels(
463        &self,
464        name: &str,
465        description: &str,
466        labels: &[&str],
467        group: &str,
468    ) -> Gauge {
469        let key = self.prefixed_key(name);
470        let desc = description.to_string();
471        metrics::describe_gauge!(key.clone(), desc.clone());
472        self.registry.push(MetricDescriptor {
473            name: key.clone(),
474            metric_type: MetricType::Gauge,
475            description: desc,
476            unit: String::new(),
477            labels: labels.iter().map(|s| (*s).to_string()).collect(),
478            group: group.into(),
479            buckets: None,
480            use_cases: vec![],
481            dashboard_hint: None,
482        });
483        metrics::gauge!(key)
484    }
485
486    /// Create a histogram metric with default buckets.
487    ///
488    /// Automatically registers a [`MetricDescriptor`] in the manifest registry
489    /// with empty labels and `group = "custom"`.
490    #[must_use]
491    pub fn histogram(&self, name: &str, description: &str) -> Histogram {
492        let key = self.prefixed_key(name);
493        let desc = description.to_string();
494        metrics::describe_histogram!(key.clone(), Unit::Seconds, desc.clone());
495        self.registry.push(MetricDescriptor {
496            name: key.clone(),
497            metric_type: MetricType::Histogram,
498            description: desc,
499            unit: "seconds".into(),
500            labels: vec![],
501            group: "custom".into(),
502            buckets: None,
503            use_cases: vec![],
504            dashboard_hint: None,
505        });
506        metrics::histogram!(key)
507    }
508
509    /// Create a histogram with label keys, group, and optional buckets for the manifest.
510    #[must_use]
511    pub fn histogram_with_labels(
512        &self,
513        name: &str,
514        description: &str,
515        labels: &[&str],
516        group: &str,
517        buckets: Option<&[f64]>,
518    ) -> Histogram {
519        let key = self.prefixed_key(name);
520        let desc = description.to_string();
521        metrics::describe_histogram!(key.clone(), Unit::Seconds, desc.clone());
522        self.registry.push(MetricDescriptor {
523            name: key.clone(),
524            metric_type: MetricType::Histogram,
525            description: desc,
526            unit: "seconds".into(),
527            labels: labels.iter().map(|s| (*s).to_string()).collect(),
528            group: group.into(),
529            buckets: buckets.map(|b| b.to_vec()),
530            use_cases: vec![],
531            dashboard_hint: None,
532        });
533        metrics::histogram!(key)
534    }
535
536    /// Create a histogram metric with custom buckets.
537    ///
538    /// **Note:** The `buckets` parameter is captured in the manifest registry
539    /// but currently ignored by the `metrics` crate at runtime (buckets are set
540    /// globally at recorder installation time).
541    #[must_use]
542    pub fn histogram_with_buckets(
543        &self,
544        name: &str,
545        description: &str,
546        buckets: &[f64],
547    ) -> Histogram {
548        let key = self.prefixed_key(name);
549        let desc = description.to_string();
550        metrics::describe_histogram!(key.clone(), Unit::Seconds, desc.clone());
551        self.registry.push(MetricDescriptor {
552            name: key.clone(),
553            metric_type: MetricType::Histogram,
554            description: desc,
555            unit: "seconds".into(),
556            labels: vec![],
557            group: "custom".into(),
558            buckets: Some(buckets.to_vec()),
559            use_cases: vec![],
560            dashboard_hint: None,
561        });
562        metrics::histogram!(key)
563    }
564
565    /// Get the Prometheus metrics output.
566    ///
567    /// Returns the rendered Prometheus text format. Only available when
568    /// the `metrics` feature is enabled.
569    #[cfg(feature = "metrics")]
570    #[must_use]
571    pub fn render(&self) -> String {
572        self.handle
573            .as_ref()
574            .map_or_else(String::new, PrometheusHandle::render)
575    }
576
    /// Get a cloneable [`RenderHandle`] for use in route handlers.
    ///
    /// The handle renders the current Prometheus metrics text via
    /// [`RenderHandle::render`]. It is `Clone`, making it safe to move into
    /// `axum` route handlers or share across tasks.
    ///
    /// Returns `None` if no Prometheus recorder is installed.
    ///
    /// # Example
    ///
    /// ```rust,ignore
    /// let mut mgr = MetricsManager::new("myapp");
    /// let render = mgr.render_handle().expect("recorder installed");
    ///
    /// // Use in axum route
    /// let route = axum::Router::new().route("/metrics", axum::routing::get(move || {
    ///     let r = render.clone();
    ///     async move { r.render() }
    /// }));
    ///
    /// mgr.start_server_with_routes("0.0.0.0:9090", route).await?;
    /// ```
    #[cfg(feature = "metrics")]
    #[must_use]
    pub fn render_handle(&self) -> Option<RenderHandle> {
        self.handle.clone().map(RenderHandle)
    }
604
605    /// Set a readiness check callback.
606    ///
607    /// When set, `/readyz` and `/health/ready` call this function and return
608    /// 503 Service Unavailable if it returns `false`. Without a callback,
609    /// these endpoints always return 200.
610    pub fn set_readiness_check(&mut self, f: impl Fn() -> bool + Send + Sync + 'static) {
611        self.readiness_fn = Some(Arc::new(f));
612    }
613
614    /// Mark the service as started (startup probe passes).
615    ///
616    /// Call this once init is complete. K8s `startupProbe` hits `/startupz`
617    /// which returns 503 until this is called, then 200 thereafter.
618    /// Separate from readiness — startup has a longer timeout for slow starters.
619    pub fn mark_started(&self) {
620        self.started
621            .store(true, std::sync::atomic::Ordering::Release);
622    }
623
624    /// Get a clone of the started flag (for passing to HTTP handler).
625    pub(crate) fn started_flag(&self) -> Arc<std::sync::atomic::AtomicBool> {
626        Arc::clone(&self.started)
627    }
628
629    /// Attach a `ScalingPressure` instance.
630    ///
631    /// When set and using `start_server_with_routes`, a `/scaling/pressure`
632    /// endpoint is automatically added that returns the current pressure value.
633    #[cfg(all(feature = "metrics", feature = "scaling"))]
634    pub fn set_scaling_pressure(&mut self, sp: Arc<crate::scaling::ScalingPressure>) {
635        self.scaling_pressure = Some(sp);
636    }
637
638    /// Attach a `MemoryGuard` instance.
639    ///
640    /// When set and using `start_server_with_routes`, a `/memory/pressure`
641    /// endpoint is automatically added that returns the current memory status.
642    #[cfg(all(feature = "metrics", feature = "memory"))]
643    pub fn set_memory_guard(&mut self, mg: Arc<crate::memory::MemoryGuard>) {
644        self.memory_guard = Some(mg);
645    }
646
647    /// Update process and container metrics.
648    pub fn update(&self) {
649        if let Some(ref pm) = self.process_metrics {
650            pm.update();
651        }
652        if let Some(ref cm) = self.container_metrics {
653            cm.update();
654        }
655    }
656
657    /// Start the metrics HTTP server.
658    ///
659    /// Serves `/metrics` (Prometheus), `/healthz`, `/health/live`,
660    /// `/readyz`, `/health/ready` endpoints.
661    ///
662    /// Only available when the `metrics` feature is enabled (for scraping).
663    ///
664    /// # Errors
665    ///
666    /// Returns an error if the server fails to start.
667    #[cfg(feature = "metrics")]
668    pub async fn start_server(&mut self, addr: &str) -> Result<(), MetricsError> {
669        if self.shutdown_tx.is_some() {
670            return Err(MetricsError::AlreadyRunning);
671        }
672
673        let addr: SocketAddr = addr
674            .parse()
675            .map_err(|e| MetricsError::ServerError(format!("invalid address: {e}")))?;
676
677        let listener = TcpListener::bind(addr)
678            .await
679            .map_err(|e| MetricsError::ServerError(e.to_string()))?;
680
681        let (shutdown_tx, shutdown_rx) = oneshot::channel();
682        self.shutdown_tx = Some(shutdown_tx);
683
684        let handle = self
685            .handle
686            .as_ref()
687            .ok_or_else(|| {
688                MetricsError::ServerError(
689                    "Prometheus handle not configured — MetricsManager was created without a recorder".into(),
690                )
691            })?
692            .clone();
693        let update_interval = self.config.update_interval;
694        let process_metrics = self.process_metrics.clone();
695        let container_metrics = self.container_metrics.clone();
696        let readiness_fn = self.readiness_fn.clone();
697        let started_flag = self.started_flag();
698
699        let registry = self.registry();
700
701        tokio::spawn(async move {
702            run_server(
703                listener,
704                handle,
705                registry,
706                shutdown_rx,
707                update_interval,
708                process_metrics,
709                container_metrics,
710                readiness_fn,
711                started_flag,
712            )
713            .await;
714        });
715
716        Ok(())
717    }
718
719    /// Start the metrics HTTP server with additional custom routes.
720    ///
721    /// Serves the same built-in endpoints as [`start_server`](Self::start_server):
722    /// `/metrics`, `/healthz`, `/health/live`, `/readyz`, `/health/ready`.
723    ///
724    /// Additionally:
725    /// - If [`set_scaling_pressure`](Self::set_scaling_pressure) was called,
726    ///   adds `/scaling/pressure` returning the current pressure value.
727    /// - If [`set_memory_guard`](Self::set_memory_guard) was called,
728    ///   adds `/memory/pressure` returning memory status JSON.
729    /// - Any routes in `extra_routes` are merged (service-specific endpoints).
730    ///
731    /// Requires both `metrics` and `http-server` features.
732    ///
733    /// # Errors
734    ///
735    /// Returns an error if the server fails to start.
736    #[cfg(all(feature = "metrics", feature = "http-server"))]
737    pub async fn start_server_with_routes(
738        &mut self,
739        addr: &str,
740        extra_routes: axum::Router,
741    ) -> Result<(), MetricsError> {
742        if self.shutdown_tx.is_some() {
743            return Err(MetricsError::AlreadyRunning);
744        }
745
746        let addr: SocketAddr = addr
747            .parse()
748            .map_err(|e| MetricsError::ServerError(format!("invalid address: {e}")))?;
749
750        let listener = TcpListener::bind(addr)
751            .await
752            .map_err(|e| MetricsError::ServerError(e.to_string()))?;
753
754        let (shutdown_tx, shutdown_rx) = oneshot::channel();
755        self.shutdown_tx = Some(shutdown_tx);
756
757        let handle = self
758            .handle
759            .as_ref()
760            .ok_or_else(|| {
761                MetricsError::ServerError(
762                    "Prometheus handle not configured — MetricsManager was created without a recorder".into(),
763                )
764            })?
765            .clone();
766        let update_interval = self.config.update_interval;
767        let process_metrics = self.process_metrics.clone();
768        let container_metrics = self.container_metrics.clone();
769        let readiness_fn = self.readiness_fn.clone();
770
771        // Build the axum router with built-in + optional + custom routes
772        let metrics_handle = handle.clone();
773        let readiness_for_live = readiness_fn.clone();
774        let registry_handle = self.registry();
775
776        let mut app = axum::Router::new()
777            .route(
778                "/metrics/manifest",
779                axum::routing::get(move || {
780                    let reg = registry_handle.clone();
781                    async move {
782                        (
783                            [(axum::http::header::CONTENT_TYPE, "application/json")],
784                            serde_json::to_string(&reg.manifest()).unwrap_or_default(),
785                        )
786                    }
787                }),
788            )
789            .route(
790                "/metrics",
791                axum::routing::get(move || {
792                    let h = metrics_handle.clone();
793                    async move { h.render() }
794                }),
795            )
796            .route("/startupz", {
797                let sf = self.started_flag();
798                axum::routing::get(move || {
799                    let started = sf.load(std::sync::atomic::Ordering::Acquire);
800                    async move {
801                        if started {
802                            (
803                                axum::http::StatusCode::OK,
804                                [(axum::http::header::CONTENT_TYPE, "application/json")],
805                                r#"{"status":"started"}"#,
806                            )
807                        } else {
808                            (
809                                axum::http::StatusCode::SERVICE_UNAVAILABLE,
810                                [(axum::http::header::CONTENT_TYPE, "application/json")],
811                                r#"{"status":"starting"}"#,
812                            )
813                        }
814                    }
815                })
816            })
817            .route(
818                "/healthz",
819                axum::routing::get(|| async {
820                    (
821                        [(axum::http::header::CONTENT_TYPE, "application/json")],
822                        r#"{"status":"alive"}"#,
823                    )
824                }),
825            )
826            .route(
827                "/health/live",
828                axum::routing::get(|| async {
829                    (
830                        [(axum::http::header::CONTENT_TYPE, "application/json")],
831                        r#"{"status":"alive"}"#,
832                    )
833                }),
834            )
835            .route(
836                "/readyz",
837                axum::routing::get(move || {
838                    let rf = readiness_fn.clone();
839                    async move { readiness_response(rf) }
840                }),
841            )
842            .route(
843                "/health/ready",
844                axum::routing::get(move || {
845                    let rf = readiness_for_live.clone();
846                    async move { readiness_response(rf) }
847                }),
848            );
849
850        // Add scaling pressure endpoint if configured
851        #[cfg(feature = "scaling")]
852        if let Some(ref sp) = self.scaling_pressure {
853            let sp = sp.clone();
854            app = app.route(
855                "/scaling/pressure",
856                axum::routing::get(move || {
857                    let s = sp.clone();
858                    async move { format!("{:.2}", s.calculate()) }
859                }),
860            );
861        }
862
863        // Add memory pressure endpoint if configured
864        #[cfg(feature = "memory")]
865        if let Some(ref mg) = self.memory_guard {
866            let mg = mg.clone();
867            app = app.route(
868                "/memory/pressure",
869                axum::routing::get(move || {
870                    let m = mg.clone();
871                    async move {
872                        (
873                            [(axum::http::header::CONTENT_TYPE, "application/json")],
874                            format!(
875                                r#"{{"under_pressure":{},"ratio":{:.3},"current_bytes":{},"limit_bytes":{}}}"#,
876                                m.under_pressure(),
877                                m.pressure_ratio(),
878                                m.current_bytes(),
879                                m.limit_bytes()
880                            ),
881                        )
882                    }
883                }),
884            );
885        }
886
887        // Merge service-specific routes
888        app = app.merge(extra_routes);
889
890        tokio::spawn(async move {
891            run_axum_server(
892                listener,
893                app,
894                shutdown_rx,
895                update_interval,
896                process_metrics,
897                container_metrics,
898            )
899            .await;
900        });
901
902        Ok(())
903    }
904
905    /// Stop the metrics server.
906    ///
907    /// # Errors
908    ///
909    /// Returns an error if the server is not running.
910    pub async fn stop_server(&mut self) -> Result<(), MetricsError> {
911        if let Some(tx) = self.shutdown_tx.take() {
912            let _ = tx.send(());
913            Ok(())
914        } else {
915            Err(MetricsError::NotRunning)
916        }
917    }
918
919    /// Gracefully shut down the OTel provider (flushes pending exports).
920    ///
921    /// Call this before application exit to ensure all metrics are exported.
922    #[cfg(feature = "otel-metrics")]
923    pub fn shutdown_otel(&mut self) {
924        if let Some(provider) = self.otel_provider.take()
925            && let Err(e) = provider.shutdown()
926        {
927            tracing::warn!(error = %e, "OTel provider shutdown error");
928        }
929    }
930
931    /// Set application version and git commit for the manifest.
932    ///
933    /// Uses interior mutability (writes through the registry's `Arc<RwLock>`),
934    /// so only `&self` is needed. Called automatically by
935    /// [`dfe_groups::AppMetrics::new()`] if the `metrics-dfe` feature is enabled.
936    pub fn set_build_info(&self, version: &str, commit: &str) {
937        self.registry.set_build_info(version, commit);
938    }
939
940    /// Set operational use cases for a metric (by full prefixed name).
941    ///
942    /// No-op if the metric is not found in the registry.
943    pub fn set_use_cases(&self, metric_name: &str, use_cases: &[&str]) {
944        self.registry.set_use_cases(metric_name, use_cases);
945    }
946
947    /// Set the suggested Grafana panel type for a metric (by full prefixed name).
948    ///
949    /// No-op if the metric is not found in the registry.
950    pub fn set_dashboard_hint(&self, metric_name: &str, hint: &str) {
951        self.registry.set_dashboard_hint(metric_name, hint);
952    }
953
954    /// Get a cloneable handle to the metric registry.
955    ///
956    /// Use this to pass into route handlers. The handle is `Clone + Send + Sync`.
957    /// Call before starting the server, consistent with the `render_handle()` pattern.
958    #[must_use]
959    pub fn registry(&self) -> MetricRegistry {
960        self.registry.clone()
961    }
962
963    /// Get the namespace prefix (e.g. `dfe_loader`).
964    ///
965    /// Used by [`dfe_groups`] metric structs to build labelled metric keys.
966    #[must_use]
967    pub fn namespace(&self) -> &str {
968        &self.config.namespace
969    }
970
971    /// Get prefixed metric name.
972    fn prefixed_key(&self, name: &str) -> String {
973        if self.config.namespace.is_empty() {
974            name.to_string()
975        } else {
976            format!("{}_{}", self.config.namespace, name)
977        }
978    }
979}
980
/// Run the metrics HTTP server.
///
/// Accept/update loop for the minimal hand-rolled HTTP listener (the
/// non-axum path, gated on `metrics` alone). Runs until `shutdown_rx`
/// resolves — i.e. until the manager's stop oneshot fires or its sender is
/// dropped.
///
/// Each iteration races three events via `select!`:
/// - shutdown signal → break (dropping the listener closes the port)
/// - periodic tick → refresh process/container gauges, when configured
/// - inbound TCP connection → serve it on its own spawned task, with clones
///   of the shared handles so the accept loop keeps running
#[cfg(feature = "metrics")]
#[allow(clippy::too_many_arguments)]
async fn run_server(
    listener: TcpListener,
    handle: PrometheusHandle,
    registry: MetricRegistry,
    mut shutdown_rx: oneshot::Receiver<()>,
    update_interval: Duration,
    process_metrics: Option<ProcessMetrics>,
    container_metrics: Option<ContainerMetrics>,
    readiness_fn: Option<ReadinessFn>,
    started_flag: Arc<std::sync::atomic::AtomicBool>,
) {
    // tokio intervals fire immediately on the first tick, so gauges are
    // populated right after startup rather than one interval later.
    let mut update_interval = tokio::time::interval(update_interval);

    loop {
        tokio::select! {
            _ = &mut shutdown_rx => {
                break;
            }
            _ = update_interval.tick() => {
                // Refresh OS-level gauges so the next scrape sees fresh data.
                if let Some(ref pm) = process_metrics {
                    pm.update();
                }
                if let Some(ref cm) = container_metrics {
                    cm.update();
                }
            }
            result = listener.accept() => {
                // Accept errors are deliberately ignored: a transient failure
                // (e.g. fd exhaustion) must not kill the metrics endpoint.
                if let Ok((stream, _)) = result {
                    let handle = handle.clone();
                    let registry = registry.clone();
                    let readiness_fn = readiness_fn.clone();
                    let sf = Arc::clone(&started_flag);
                    tokio::spawn(async move {
                        handle_connection(stream, handle, registry, readiness_fn, &sf).await;
                    });
                }
            }
        }
    }
}
1024
/// Handle a single HTTP connection.
///
/// Reads the request line, drains the remaining request headers, writes
/// exactly one response, and lets the connection close on drop.
///
/// **Path ordering:** `/metrics/manifest` MUST be checked BEFORE `/metrics`
/// because `"GET /metrics/manifest"` also matches `starts_with("GET /metrics")`.
#[cfg(feature = "metrics")]
async fn handle_connection(
    mut stream: tokio::net::TcpStream,
    handle: PrometheusHandle,
    registry: MetricRegistry,
    readiness_fn: Option<ReadinessFn>,
    started_flag: &std::sync::atomic::AtomicBool,
) {
    use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader};

    let mut reader = BufReader::new(&mut stream);
    let mut request_line = String::new();

    if reader.read_line(&mut request_line).await.is_err() {
        return;
    }

    // Drain the request headers up to the terminating blank line. Responding
    // and closing while unread request bytes sit in the socket buffer can make
    // the kernel send an RST on close, which may discard our response before
    // the client reads it.
    let mut header_line = String::new();
    loop {
        header_line.clear();
        match reader.read_line(&mut header_line).await {
            Ok(0) => break, // client half-closed; respond anyway
            Ok(_) if header_line == "\r\n" || header_line == "\n" => break,
            Ok(_) => {} // ordinary header line — skip
            Err(_) => return,
        }
    }

    // IMPORTANT: /metrics/manifest MUST come before /metrics (prefix match ordering)
    let (status, content_type, body) = if request_line.starts_with("GET /metrics/manifest") {
        (
            "200 OK",
            "application/json",
            serde_json::to_string(&registry.manifest()).unwrap_or_default(),
        )
    } else if request_line.starts_with("GET /metrics") {
        ("200 OK", "text/plain; charset=utf-8", handle.render())
    } else if request_line.starts_with("GET /startupz")
        || request_line.starts_with("GET /health/startup")
    {
        if started_flag.load(std::sync::atomic::Ordering::Acquire) {
            (
                "200 OK",
                "application/json",
                r#"{"status":"started"}"#.to_string(),
            )
        } else {
            (
                "503 Service Unavailable",
                "application/json",
                r#"{"status":"starting"}"#.to_string(),
            )
        }
    } else if request_line.starts_with("GET /healthz")
        || request_line.starts_with("GET /health/live")
    {
        (
            "200 OK",
            "application/json",
            r#"{"status":"alive"}"#.to_string(),
        )
    } else if request_line.starts_with("GET /readyz")
        || request_line.starts_with("GET /health/ready")
    {
        // Ready only if BOTH the caller-supplied callback (absent counts as
        // ready) AND the global health registry (when compiled in) agree.
        let callback_ready = readiness_fn.as_ref().is_none_or(|f| f());

        #[cfg(feature = "health")]
        let registry_ready = crate::health::HealthRegistry::is_ready();
        #[cfg(not(feature = "health"))]
        let registry_ready = true;

        let ready = callback_ready && registry_ready;
        if ready {
            (
                "200 OK",
                "application/json",
                r#"{"status":"ready"}"#.to_string(),
            )
        } else {
            (
                "503 Service Unavailable",
                "application/json",
                r#"{"status":"not_ready"}"#.to_string(),
            )
        }
    } else {
        (
            "404 Not Found",
            "text/plain; charset=utf-8",
            "Not Found".to_string(),
        )
    };

    // `Connection: close` is required: we serve exactly one response per
    // connection, and without it HTTP/1.1 clients assume keep-alive and may
    // wait for further responses on the same socket.
    let response = format!(
        "HTTP/1.1 {status}\r\nContent-Type: {content_type}\r\nConnection: close\r\nContent-Length: {}\r\n\r\n{body}",
        body.len()
    );

    let _ = stream.write_all(response.as_bytes()).await;
}
1118
/// Readiness response helper for axum endpoints.
///
/// Checks the caller-supplied readiness callback AND (when the `health`
/// feature is enabled) the global [`HealthRegistry`](crate::health::HealthRegistry).
/// Both must be true for a 200 response.
#[cfg(all(feature = "metrics", feature = "http-server"))]
fn readiness_response(rf: Option<ReadinessFn>) -> axum::response::Response {
    use axum::response::IntoResponse;

    // An absent callback counts as ready.
    let callback_ready = match rf.as_ref() {
        Some(check) => check(),
        None => true,
    };

    #[cfg(feature = "health")]
    let registry_ready = crate::health::HealthRegistry::is_ready();
    #[cfg(not(feature = "health"))]
    let registry_ready = true;

    // Guard clause: any failing check yields 503 immediately.
    if !(callback_ready && registry_ready) {
        return (
            axum::http::StatusCode::SERVICE_UNAVAILABLE,
            [(axum::http::header::CONTENT_TYPE, "application/json")],
            r#"{"status":"not_ready"}"#,
        )
            .into_response();
    }

    (
        [(axum::http::header::CONTENT_TYPE, "application/json")],
        r#"{"status":"ready"}"#,
    )
        .into_response()
}
1151
/// Run the axum-based metrics HTTP server with custom routes.
///
/// Serves `app` on `listener` until `shutdown_rx` resolves, while a
/// companion task refreshes process/container gauges every `update_interval`.
///
/// Shutdown ordering: the axum server finishes its graceful shutdown first;
/// only afterwards is the update task signalled to stop. If this future is
/// dropped instead, `update_stop_tx` is dropped with it, which also ends the
/// update loop (the oneshot receiver resolves with an error and breaks).
#[cfg(all(feature = "metrics", feature = "http-server"))]
async fn run_axum_server(
    listener: TcpListener,
    app: axum::Router,
    shutdown_rx: oneshot::Receiver<()>,
    update_interval: Duration,
    process_metrics: Option<ProcessMetrics>,
    container_metrics: Option<ContainerMetrics>,
) {
    let mut interval = tokio::time::interval(update_interval);

    // Spawn the metrics update loop
    let (update_stop_tx, mut update_stop_rx) = oneshot::channel::<()>();
    tokio::spawn(async move {
        loop {
            tokio::select! {
                // Stop when either the sender fires or is dropped.
                _ = &mut update_stop_rx => break,
                _ = interval.tick() => {
                    // Refresh OS-level gauges, when configured.
                    if let Some(ref pm) = process_metrics {
                        pm.update();
                    }
                    if let Some(ref cm) = container_metrics {
                        cm.update();
                    }
                }
            }
        }
    });

    // Run axum server with graceful shutdown
    axum::serve(listener, app)
        .with_graceful_shutdown(async move {
            let _ = shutdown_rx.await;
        })
        .await
        .unwrap_or_else(|e| tracing::error!(error = %e, "Metrics axum server error"));

    // Server has fully drained; now tear down the gauge update loop.
    let _ = update_stop_tx.send(());
}
1192
/// Standard latency histogram buckets.
///
/// Twelve strictly increasing bucket boundaries, in seconds, spanning 1 ms
/// to 10 s on a roughly logarithmic scale.
#[must_use]
pub fn latency_buckets() -> Vec<f64> {
    [0.001, 0.005, 0.01, 0.025, 0.05, 0.1, 0.25, 0.5, 1.0, 2.5, 5.0, 10.0].to_vec()
}
1200
/// Standard size histogram buckets.
///
/// Six strictly increasing bucket boundaries: powers of ten from 1e2 (100)
/// up to 1e7 (10 million).
#[must_use]
pub fn size_buckets() -> Vec<f64> {
    vec![1e2, 1e3, 1e4, 1e5, 1e6, 1e7]
}
1213
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_metrics_config_default() {
        // Defaults: no namespace, both metric sources on, 15 s refresh.
        let cfg = MetricsConfig::default();
        assert!(cfg.namespace.is_empty());
        assert!(cfg.enable_process_metrics);
        assert!(cfg.enable_container_metrics);
        assert_eq!(cfg.update_interval, Duration::from_secs(15));
    }

    #[test]
    fn test_latency_buckets() {
        let buckets = latency_buckets();
        assert_eq!(12, buckets.len());
        // First boundary must be below the last one.
        assert!(buckets[0] < buckets[11]);
    }

    #[test]
    fn test_size_buckets() {
        let buckets = size_buckets();
        assert_eq!(6, buckets.len());
        // First boundary must be below the last one.
        assert!(buckets[0] < buckets[5]);
    }
}