// hyperi_rustlib/metrics/mod.rs
1// Project:   hyperi-rustlib
2// File:      src/metrics/mod.rs
3// Purpose:   Prometheus metrics with process and container awareness
4// Language:  Rust
5//
6// License:   FSL-1.1-ALv2
7// Copyright: (c) 2026 HYPERI PTY LIMITED
8
9//! Metrics with Prometheus and/or OpenTelemetry backends.
10//!
11//! Provides production-ready metrics collection with support for:
12//!
13//! - **`metrics` feature only:** Prometheus scrape endpoint via `/metrics`
14//! - **`otel-metrics` feature only:** OTLP push to OTel-compatible backends
15//! - **Both features:** Fanout recorder sends to both Prometheus AND OTel
16//!
17//! ## Features
18//!
19//! - Counter, Gauge, Histogram metric types
20//! - Automatic process metrics (CPU, memory, file descriptors)
21//! - Container metrics from cgroups (memory limit, CPU limit)
22//! - Built-in HTTP server for `/metrics` endpoint (Prometheus)
23//! - OTLP push to HyperDX, Jaeger, Grafana, etc. (OTel)
24//! - Readiness callback for `/health/ready` endpoints
25//! - Optional scaling pressure endpoint (`/scaling/pressure`)
26//! - Optional memory guard endpoint (`/memory/pressure`)
27//! - Custom route support via [`start_server_with_routes`](MetricsManager::start_server_with_routes)
28//!
29//! ## Basic Example
30//!
31//! ```rust,no_run
32//! use hyperi_rustlib::metrics::{MetricsManager, MetricsConfig};
33//!
34//! #[tokio::main]
35//! async fn main() {
36//!     let mut manager = MetricsManager::new("myapp");
37//!
38//!     // Create metrics
39//!     let requests = manager.counter("requests_total", "Total requests");
40//!     let active = manager.gauge("active_connections", "Active connections");
41//!     let latency = manager.histogram("request_duration_seconds", "Request latency");
42//!
43//!     // Start metrics server (simple — built-in endpoints only)
44//!     manager.start_server("0.0.0.0:9090").await.unwrap();
45//!
46//!     // Record metrics
47//!     requests.increment(1);
48//!     active.set(42.0);
49//!     latency.record(0.123);
50//! }
51//! ```
52//!
53//! ## Advanced Example (with custom routes, scaling, memory)
54//!
55//! Requires features: `metrics`, `http-server`, `scaling`, `memory`.
56//!
57//! ```rust,ignore
58//! use std::sync::Arc;
59//! use hyperi_rustlib::metrics::MetricsManager;
60//! use hyperi_rustlib::scaling::{ScalingPressure, ScalingPressureConfig};
61//! use hyperi_rustlib::memory::{MemoryGuard, MemoryGuardConfig};
62//! use axum::{Router, routing::post};
63//!
64//! let mut mgr = MetricsManager::new("myapp");
65//!
66//! // Readiness callback
67//! mgr.set_readiness_check(|| true);
68//!
69//! // Attach scaling pressure (adds /scaling/pressure endpoint)
70//! let scaling = Arc::new(ScalingPressure::new(ScalingPressureConfig::default(), vec![]));
71//! mgr.set_scaling_pressure(scaling);
72//!
73//! // Attach memory guard (adds /memory/pressure endpoint)
74//! let guard = Arc::new(MemoryGuard::new(MemoryGuardConfig::default()));
75//! mgr.set_memory_guard(guard);
76//!
77//! // Service-specific routes
78//! let custom = Router::new()
79//!     .route("/test", post(|| async { "ok" }));
80//!
81//! // Start with everything merged into one server
82//! mgr.start_server_with_routes("0.0.0.0:9090", custom).await.unwrap();
83//! ```
84
85mod container;
86pub mod dfe;
87pub mod manifest;
88mod process;
89
90#[cfg(feature = "otel-metrics")]
91pub(crate) mod otel;
92#[cfg(feature = "otel-metrics")]
93pub mod otel_types;
94
95use std::net::SocketAddr;
96use std::sync::Arc;
97use std::time::Duration;
98
99use metrics::{Counter, Gauge, Histogram, Unit};
100use thiserror::Error;
101use tokio::net::TcpListener;
102use tokio::sync::oneshot;
103
/// Readiness check callback type.
///
/// Returns `true` when the service is ready to accept traffic. Used by the
/// `/readyz` and `/health/ready` endpoints (503 on `false`, 200 on `true`).
pub type ReadinessFn = Arc<dyn Fn() -> bool + Send + Sync>;
106
107#[cfg(feature = "metrics")]
108use metrics_exporter_prometheus::PrometheusHandle;
109
110pub use container::ContainerMetrics;
111pub use dfe::DfeMetrics;
112#[cfg(feature = "metrics-dfe")]
113pub mod dfe_groups;
114pub use manifest::{ManifestResponse, MetricDescriptor, MetricRegistry, MetricType};
115pub use process::ProcessMetrics;
116
117#[cfg(feature = "otel-metrics")]
118pub use otel_types::{OtelMetricsConfig, OtelProtocol};
119
/// Cloneable handle for rendering Prometheus metrics text.
///
/// Thin wrapper around the exporter's `PrometheusHandle`. Obtained via
/// [`MetricsManager::render_handle`]. Safe to clone into `axum` route
/// handlers or share across tasks.
#[cfg(feature = "metrics")]
#[derive(Clone)]
pub struct RenderHandle(PrometheusHandle);

#[cfg(feature = "metrics")]
impl RenderHandle {
    /// Render current metrics in Prometheus text format.
    #[must_use]
    pub fn render(&self) -> String {
        self.0.render()
    }
}
136
/// Metrics errors.
#[derive(Debug, Error)]
pub enum MetricsError {
    /// Failed to build metrics exporter.
    #[error("failed to build metrics exporter: {0}")]
    BuildError(String),

    /// Failed to start metrics server (bad address, bind failure, or no
    /// Prometheus recorder configured).
    #[error("failed to start metrics server: {0}")]
    ServerError(String),

    /// Server already running (a `start_server*` call already stored a
    /// shutdown channel).
    #[error("metrics server already running")]
    AlreadyRunning,

    /// Server not running (returned by `stop_server`).
    #[error("metrics server not running")]
    NotRunning,
}
156
/// Metrics configuration.
///
/// Construct via [`Default`] and override fields as needed, then pass to
/// [`MetricsManager::with_config`].
#[derive(Debug, Clone)]
pub struct MetricsConfig {
    /// Metric namespace prefix. Prepended as `<namespace>_` to every metric
    /// name; an empty string means no prefix.
    pub namespace: String,
    /// Enable process metrics collection (CPU, memory, file descriptors).
    pub enable_process_metrics: bool,
    /// Enable container metrics collection (cgroup memory/CPU limits).
    pub enable_container_metrics: bool,
    /// Update interval for auto-collected metrics (default: 15 seconds).
    pub update_interval: Duration,
    /// OTel-specific configuration (only used when `otel-metrics` feature is enabled).
    #[cfg(feature = "otel-metrics")]
    pub otel: OtelMetricsConfig,
}
172
173impl Default for MetricsConfig {
174    fn default() -> Self {
175        Self {
176            namespace: String::new(),
177            enable_process_metrics: true,
178            enable_container_metrics: true,
179            update_interval: Duration::from_secs(15),
180            #[cfg(feature = "otel-metrics")]
181            otel: OtelMetricsConfig::default(),
182        }
183    }
184}
185
/// Intermediate struct to pass recorder setup results across cfg boundaries.
struct RecorderSetup {
    /// Handle for rendering Prometheus text; `Some` when a Prometheus
    /// recorder was installed.
    #[cfg(feature = "metrics")]
    prom_handle: Option<PrometheusHandle>,
    /// OTel meter provider; `Some` when the OTel recorder was built.
    /// Retained so it can be flushed/shut down via `shutdown_otel`.
    #[cfg(feature = "otel-metrics")]
    otel_provider: Option<opentelemetry_sdk::metrics::SdkMeterProvider>,
}
193
/// Install the metrics recorder(s) based on enabled features.
///
/// Returns setup results containing handles/providers. When both
/// `metrics` and `otel-metrics` features are enabled, uses `metrics-util`
/// `FanoutBuilder` to compose both recorders into a single global recorder.
///
/// # Panics
///
/// `expect`s that no global recorder is already installed — constructing a
/// second `MetricsManager` in the same process will panic here.
#[allow(unused_variables)]
fn install_recorders(config: &MetricsConfig) -> RecorderSetup {
    // --- Prometheus only (no OTel) ---
    #[cfg(all(feature = "metrics", not(feature = "otel-metrics")))]
    {
        // build_recorder() creates the recorder without installing it, so we
        // can grab a render handle before handing ownership to `metrics`.
        let recorder = metrics_exporter_prometheus::PrometheusBuilder::new().build_recorder();
        let handle = recorder.handle();
        metrics::set_global_recorder(recorder).expect("failed to install Prometheus recorder");
        RecorderSetup {
            prom_handle: Some(handle),
        }
    }

    // --- OTel only (no Prometheus) ---
    #[cfg(all(feature = "otel-metrics", not(feature = "metrics")))]
    {
        match otel::build_otel_recorder(&config.namespace, &config.otel) {
            Ok((otel_recorder, provider)) => {
                // Register the provider globally so direct `opentelemetry`
                // API users share the same pipeline, then install the
                // `metrics`-facade recorder.
                opentelemetry::global::set_meter_provider(provider.clone());
                metrics::set_global_recorder(otel_recorder)
                    .expect("failed to set OTel metrics recorder");
                RecorderSetup {
                    otel_provider: Some(provider),
                }
            }
            Err(e) => {
                // Degrade gracefully: metric calls become no-ops instead of
                // failing application startup.
                tracing::warn!(error = %e, "Failed to build OTel metrics recorder");
                RecorderSetup {
                    otel_provider: None,
                }
            }
        }
    }

    // --- Both Prometheus + OTel (Fanout) ---
    #[cfg(all(feature = "metrics", feature = "otel-metrics"))]
    {
        // Build Prometheus recorder (without installing globally)
        let prom_recorder = metrics_exporter_prometheus::PrometheusBuilder::new().build_recorder();
        let prom_handle = prom_recorder.handle();

        // Build OTel recorder
        match otel::build_otel_recorder(&config.namespace, &config.otel) {
            Ok((otel_recorder, provider)) => {
                opentelemetry::global::set_meter_provider(provider.clone());

                // Compose via Fanout: both recorders receive every measurement
                let fanout = metrics_util::layers::FanoutBuilder::default()
                    .add_recorder(prom_recorder)
                    .add_recorder(otel_recorder)
                    .build();

                metrics::set_global_recorder(fanout).expect("failed to set Fanout recorder");

                RecorderSetup {
                    prom_handle: Some(prom_handle),
                    otel_provider: Some(provider),
                }
            }
            Err(e) => {
                // Fallback: just Prometheus if OTel fails
                tracing::warn!(error = %e, "Failed to build OTel recorder, falling back to Prometheus only");
                metrics::set_global_recorder(prom_recorder)
                    .expect("failed to set Prometheus recorder");
                RecorderSetup {
                    prom_handle: Some(prom_handle),
                    otel_provider: None,
                }
            }
        }
    }
}
271
/// Metrics manager handling Prometheus and/or OTel exposition.
pub struct MetricsManager {
    /// Prometheus render handle; `None` when no Prometheus recorder was installed.
    #[cfg(feature = "metrics")]
    handle: Option<PrometheusHandle>,
    /// Configuration captured at construction time.
    config: MetricsConfig,
    /// `Some` while the HTTP server task is considered running; used for the
    /// `AlreadyRunning`/`NotRunning` checks and to signal shutdown.
    shutdown_tx: Option<oneshot::Sender<()>>,
    /// Auto-collected process metrics (`None` when disabled in config).
    process_metrics: Option<ProcessMetrics>,
    /// Auto-collected container metrics (`None` when disabled in config).
    container_metrics: Option<ContainerMetrics>,
    /// Optional readiness callback for `/readyz` / `/health/ready`.
    readiness_fn: Option<ReadinessFn>,
    /// Manifest registry of declared metric descriptors.
    registry: MetricRegistry,
    /// Optional scaling-pressure source backing `/scaling/pressure`.
    #[cfg(all(feature = "metrics", feature = "scaling"))]
    scaling_pressure: Option<Arc<crate::scaling::ScalingPressure>>,
    /// Optional memory guard backing `/memory/pressure`.
    #[cfg(all(feature = "metrics", feature = "memory"))]
    memory_guard: Option<Arc<crate::memory::MemoryGuard>>,
    /// OTel provider retained for graceful shutdown/flush via `shutdown_otel`.
    #[cfg(feature = "otel-metrics")]
    otel_provider: Option<opentelemetry_sdk::metrics::SdkMeterProvider>,
}
289
290impl MetricsManager {
291    /// Create a new metrics manager with the given namespace.
292    #[must_use]
293    pub fn new(namespace: &str) -> Self {
294        Self::with_config(MetricsConfig {
295            namespace: namespace.to_string(),
296            ..Default::default()
297        })
298    }
299
300    /// Create a metrics manager with custom configuration.
301    ///
302    /// Installs the appropriate recorder(s) based on enabled features:
303    /// - `metrics` only: Prometheus recorder
304    /// - `otel-metrics` only: OTel recorder (OTLP push)
305    /// - Both: Fanout recorder (Prometheus scrape + OTel OTLP push)
306    #[must_use]
307    pub fn with_config(config: MetricsConfig) -> Self {
308        let setup = install_recorders(&config);
309
310        let process_metrics = if config.enable_process_metrics {
311            Some(ProcessMetrics::new(&config.namespace))
312        } else {
313            None
314        };
315
316        let container_metrics = if config.enable_container_metrics {
317            Some(ContainerMetrics::new(&config.namespace))
318        } else {
319            None
320        };
321
322        let registry = MetricRegistry::new(&config.namespace);
323
324        Self {
325            #[cfg(feature = "metrics")]
326            handle: setup.prom_handle,
327            registry,
328            config,
329            shutdown_tx: None,
330            process_metrics,
331            container_metrics,
332            readiness_fn: None,
333            #[cfg(all(feature = "metrics", feature = "scaling"))]
334            scaling_pressure: None,
335            #[cfg(all(feature = "metrics", feature = "memory"))]
336            memory_guard: None,
337            #[cfg(feature = "otel-metrics")]
338            otel_provider: setup.otel_provider,
339        }
340    }
341
342    /// Create a counter metric.
343    ///
344    /// Automatically registers a [`MetricDescriptor`] in the manifest registry
345    /// with empty labels and `group = "custom"`.
346    #[must_use]
347    pub fn counter(&self, name: &str, description: &str) -> Counter {
348        let key = self.prefixed_key(name);
349        let desc = description.to_string();
350        metrics::describe_counter!(key.clone(), desc.clone());
351        self.registry.push(MetricDescriptor {
352            name: key.clone(),
353            metric_type: MetricType::Counter,
354            description: desc,
355            unit: String::new(),
356            labels: vec![],
357            group: "custom".into(),
358            buckets: None,
359            use_cases: vec![],
360            dashboard_hint: None,
361        });
362        metrics::counter!(key)
363    }
364
365    /// Create a counter with label keys and group metadata for the manifest.
366    ///
367    /// The `labels` parameter declares label **key names** for the manifest.
368    /// The returned `Counter` is label-free — apply label values at recording
369    /// time via `metrics::counter!(key, "label" => value)`.
370    #[must_use]
371    pub fn counter_with_labels(
372        &self,
373        name: &str,
374        description: &str,
375        labels: &[&str],
376        group: &str,
377    ) -> Counter {
378        let key = self.prefixed_key(name);
379        let desc = description.to_string();
380        metrics::describe_counter!(key.clone(), desc.clone());
381        self.registry.push(MetricDescriptor {
382            name: key.clone(),
383            metric_type: MetricType::Counter,
384            description: desc,
385            unit: String::new(),
386            labels: labels.iter().map(|s| (*s).to_string()).collect(),
387            group: group.into(),
388            buckets: None,
389            use_cases: vec![],
390            dashboard_hint: None,
391        });
392        metrics::counter!(key)
393    }
394
395    /// Create a gauge metric.
396    ///
397    /// Automatically registers a [`MetricDescriptor`] in the manifest registry
398    /// with empty labels and `group = "custom"`.
399    #[must_use]
400    pub fn gauge(&self, name: &str, description: &str) -> Gauge {
401        let key = self.prefixed_key(name);
402        let desc = description.to_string();
403        metrics::describe_gauge!(key.clone(), desc.clone());
404        self.registry.push(MetricDescriptor {
405            name: key.clone(),
406            metric_type: MetricType::Gauge,
407            description: desc,
408            unit: String::new(),
409            labels: vec![],
410            group: "custom".into(),
411            buckets: None,
412            use_cases: vec![],
413            dashboard_hint: None,
414        });
415        metrics::gauge!(key)
416    }
417
418    /// Create a gauge with label keys and group metadata for the manifest.
419    #[must_use]
420    pub fn gauge_with_labels(
421        &self,
422        name: &str,
423        description: &str,
424        labels: &[&str],
425        group: &str,
426    ) -> Gauge {
427        let key = self.prefixed_key(name);
428        let desc = description.to_string();
429        metrics::describe_gauge!(key.clone(), desc.clone());
430        self.registry.push(MetricDescriptor {
431            name: key.clone(),
432            metric_type: MetricType::Gauge,
433            description: desc,
434            unit: String::new(),
435            labels: labels.iter().map(|s| (*s).to_string()).collect(),
436            group: group.into(),
437            buckets: None,
438            use_cases: vec![],
439            dashboard_hint: None,
440        });
441        metrics::gauge!(key)
442    }
443
444    /// Create a histogram metric with default buckets.
445    ///
446    /// Automatically registers a [`MetricDescriptor`] in the manifest registry
447    /// with empty labels and `group = "custom"`.
448    #[must_use]
449    pub fn histogram(&self, name: &str, description: &str) -> Histogram {
450        let key = self.prefixed_key(name);
451        let desc = description.to_string();
452        metrics::describe_histogram!(key.clone(), Unit::Seconds, desc.clone());
453        self.registry.push(MetricDescriptor {
454            name: key.clone(),
455            metric_type: MetricType::Histogram,
456            description: desc,
457            unit: "seconds".into(),
458            labels: vec![],
459            group: "custom".into(),
460            buckets: None,
461            use_cases: vec![],
462            dashboard_hint: None,
463        });
464        metrics::histogram!(key)
465    }
466
467    /// Create a histogram with label keys, group, and optional buckets for the manifest.
468    #[must_use]
469    pub fn histogram_with_labels(
470        &self,
471        name: &str,
472        description: &str,
473        labels: &[&str],
474        group: &str,
475        buckets: Option<&[f64]>,
476    ) -> Histogram {
477        let key = self.prefixed_key(name);
478        let desc = description.to_string();
479        metrics::describe_histogram!(key.clone(), Unit::Seconds, desc.clone());
480        self.registry.push(MetricDescriptor {
481            name: key.clone(),
482            metric_type: MetricType::Histogram,
483            description: desc,
484            unit: "seconds".into(),
485            labels: labels.iter().map(|s| (*s).to_string()).collect(),
486            group: group.into(),
487            buckets: buckets.map(|b| b.to_vec()),
488            use_cases: vec![],
489            dashboard_hint: None,
490        });
491        metrics::histogram!(key)
492    }
493
494    /// Create a histogram metric with custom buckets.
495    ///
496    /// **Note:** The `buckets` parameter is captured in the manifest registry
497    /// but currently ignored by the `metrics` crate at runtime (buckets are set
498    /// globally at recorder installation time).
499    #[must_use]
500    pub fn histogram_with_buckets(
501        &self,
502        name: &str,
503        description: &str,
504        buckets: &[f64],
505    ) -> Histogram {
506        let key = self.prefixed_key(name);
507        let desc = description.to_string();
508        metrics::describe_histogram!(key.clone(), Unit::Seconds, desc.clone());
509        self.registry.push(MetricDescriptor {
510            name: key.clone(),
511            metric_type: MetricType::Histogram,
512            description: desc,
513            unit: "seconds".into(),
514            labels: vec![],
515            group: "custom".into(),
516            buckets: Some(buckets.to_vec()),
517            use_cases: vec![],
518            dashboard_hint: None,
519        });
520        metrics::histogram!(key)
521    }
522
523    /// Get the Prometheus metrics output.
524    ///
525    /// Returns the rendered Prometheus text format. Only available when
526    /// the `metrics` feature is enabled.
527    #[cfg(feature = "metrics")]
528    #[must_use]
529    pub fn render(&self) -> String {
530        self.handle
531            .as_ref()
532            .map_or_else(String::new, PrometheusHandle::render)
533    }
534
    /// Get a cloneable render handle for use in route handlers.
    ///
    /// Returns a [`RenderHandle`] that renders the current Prometheus
    /// metrics text. The handle is `Send + Sync + Clone`, making it safe to
    /// move into `axum` route handlers or share across tasks.
    ///
    /// Returns `None` if no Prometheus recorder is installed.
    ///
    /// # Example
    ///
    /// ```rust,ignore
    /// let mut mgr = MetricsManager::new("myapp");
    /// let render = mgr.render_handle().expect("recorder installed");
    ///
    /// // Use in axum route
    /// let route = axum::Router::new().route("/metrics", axum::routing::get(move || {
    ///     let r = render.clone();
    ///     async move { r.render() }
    /// }));
    ///
    /// mgr.start_server_with_routes("0.0.0.0:9090", route).await?;
    /// ```
    #[cfg(feature = "metrics")]
    #[must_use]
    pub fn render_handle(&self) -> Option<RenderHandle> {
        self.handle.clone().map(RenderHandle)
    }
562
    /// Set a readiness check callback.
    ///
    /// When set, `/readyz` and `/health/ready` call this function and return
    /// 503 Service Unavailable if it returns `false`. Without a callback,
    /// these endpoints always return 200.
    ///
    /// Calling this again replaces any previously installed callback.
    pub fn set_readiness_check(&mut self, f: impl Fn() -> bool + Send + Sync + 'static) {
        self.readiness_fn = Some(Arc::new(f));
    }
571
    /// Attach a `ScalingPressure` instance.
    ///
    /// When set and using `start_server_with_routes`, a `/scaling/pressure`
    /// endpoint is automatically added that returns the current pressure value.
    ///
    /// Replaces any previously attached instance. Call before starting the
    /// server — the route set is fixed at `start_server_with_routes` time.
    #[cfg(all(feature = "metrics", feature = "scaling"))]
    pub fn set_scaling_pressure(&mut self, sp: Arc<crate::scaling::ScalingPressure>) {
        self.scaling_pressure = Some(sp);
    }
580
    /// Attach a `MemoryGuard` instance.
    ///
    /// When set and using `start_server_with_routes`, a `/memory/pressure`
    /// endpoint is automatically added that returns the current memory status.
    ///
    /// Replaces any previously attached instance. Call before starting the
    /// server — the route set is fixed at `start_server_with_routes` time.
    #[cfg(all(feature = "metrics", feature = "memory"))]
    pub fn set_memory_guard(&mut self, mg: Arc<crate::memory::MemoryGuard>) {
        self.memory_guard = Some(mg);
    }
589
590    /// Update process and container metrics.
591    pub fn update(&self) {
592        if let Some(ref pm) = self.process_metrics {
593            pm.update();
594        }
595        if let Some(ref cm) = self.container_metrics {
596            cm.update();
597        }
598    }
599
600    /// Start the metrics HTTP server.
601    ///
602    /// Serves `/metrics` (Prometheus), `/healthz`, `/health/live`,
603    /// `/readyz`, `/health/ready` endpoints.
604    ///
605    /// Only available when the `metrics` feature is enabled (for scraping).
606    ///
607    /// # Errors
608    ///
609    /// Returns an error if the server fails to start.
610    #[cfg(feature = "metrics")]
611    pub async fn start_server(&mut self, addr: &str) -> Result<(), MetricsError> {
612        if self.shutdown_tx.is_some() {
613            return Err(MetricsError::AlreadyRunning);
614        }
615
616        let addr: SocketAddr = addr
617            .parse()
618            .map_err(|e| MetricsError::ServerError(format!("invalid address: {e}")))?;
619
620        let listener = TcpListener::bind(addr)
621            .await
622            .map_err(|e| MetricsError::ServerError(e.to_string()))?;
623
624        let (shutdown_tx, shutdown_rx) = oneshot::channel();
625        self.shutdown_tx = Some(shutdown_tx);
626
627        let handle = self
628            .handle
629            .as_ref()
630            .ok_or_else(|| {
631                MetricsError::ServerError(
632                    "Prometheus handle not configured — MetricsManager was created without a recorder".into(),
633                )
634            })?
635            .clone();
636        let update_interval = self.config.update_interval;
637        let process_metrics = self.process_metrics.clone();
638        let container_metrics = self.container_metrics.clone();
639        let readiness_fn = self.readiness_fn.clone();
640
641        let registry = self.registry();
642
643        tokio::spawn(async move {
644            run_server(
645                listener,
646                handle,
647                registry,
648                shutdown_rx,
649                update_interval,
650                process_metrics,
651                container_metrics,
652                readiness_fn,
653            )
654            .await;
655        });
656
657        Ok(())
658    }
659
660    /// Start the metrics HTTP server with additional custom routes.
661    ///
662    /// Serves the same built-in endpoints as [`start_server`](Self::start_server):
663    /// `/metrics`, `/healthz`, `/health/live`, `/readyz`, `/health/ready`.
664    ///
665    /// Additionally:
666    /// - If [`set_scaling_pressure`](Self::set_scaling_pressure) was called,
667    ///   adds `/scaling/pressure` returning the current pressure value.
668    /// - If [`set_memory_guard`](Self::set_memory_guard) was called,
669    ///   adds `/memory/pressure` returning memory status JSON.
670    /// - Any routes in `extra_routes` are merged (service-specific endpoints).
671    ///
672    /// Requires both `metrics` and `http-server` features.
673    ///
674    /// # Errors
675    ///
676    /// Returns an error if the server fails to start.
677    #[cfg(all(feature = "metrics", feature = "http-server"))]
678    pub async fn start_server_with_routes(
679        &mut self,
680        addr: &str,
681        extra_routes: axum::Router,
682    ) -> Result<(), MetricsError> {
683        if self.shutdown_tx.is_some() {
684            return Err(MetricsError::AlreadyRunning);
685        }
686
687        let addr: SocketAddr = addr
688            .parse()
689            .map_err(|e| MetricsError::ServerError(format!("invalid address: {e}")))?;
690
691        let listener = TcpListener::bind(addr)
692            .await
693            .map_err(|e| MetricsError::ServerError(e.to_string()))?;
694
695        let (shutdown_tx, shutdown_rx) = oneshot::channel();
696        self.shutdown_tx = Some(shutdown_tx);
697
698        let handle = self
699            .handle
700            .as_ref()
701            .ok_or_else(|| {
702                MetricsError::ServerError(
703                    "Prometheus handle not configured — MetricsManager was created without a recorder".into(),
704                )
705            })?
706            .clone();
707        let update_interval = self.config.update_interval;
708        let process_metrics = self.process_metrics.clone();
709        let container_metrics = self.container_metrics.clone();
710        let readiness_fn = self.readiness_fn.clone();
711
712        // Build the axum router with built-in + optional + custom routes
713        let metrics_handle = handle.clone();
714        let readiness_for_live = readiness_fn.clone();
715        let registry_handle = self.registry();
716
717        let mut app = axum::Router::new()
718            .route(
719                "/metrics/manifest",
720                axum::routing::get(move || {
721                    let reg = registry_handle.clone();
722                    async move {
723                        (
724                            [(axum::http::header::CONTENT_TYPE, "application/json")],
725                            serde_json::to_string(&reg.manifest()).unwrap_or_default(),
726                        )
727                    }
728                }),
729            )
730            .route(
731                "/metrics",
732                axum::routing::get(move || {
733                    let h = metrics_handle.clone();
734                    async move { h.render() }
735                }),
736            )
737            .route(
738                "/healthz",
739                axum::routing::get(|| async {
740                    (
741                        [(axum::http::header::CONTENT_TYPE, "application/json")],
742                        r#"{"status":"alive"}"#,
743                    )
744                }),
745            )
746            .route(
747                "/health/live",
748                axum::routing::get(|| async {
749                    (
750                        [(axum::http::header::CONTENT_TYPE, "application/json")],
751                        r#"{"status":"alive"}"#,
752                    )
753                }),
754            )
755            .route(
756                "/readyz",
757                axum::routing::get(move || {
758                    let rf = readiness_fn.clone();
759                    async move { readiness_response(rf) }
760                }),
761            )
762            .route(
763                "/health/ready",
764                axum::routing::get(move || {
765                    let rf = readiness_for_live.clone();
766                    async move { readiness_response(rf) }
767                }),
768            );
769
770        // Add scaling pressure endpoint if configured
771        #[cfg(feature = "scaling")]
772        if let Some(ref sp) = self.scaling_pressure {
773            let sp = sp.clone();
774            app = app.route(
775                "/scaling/pressure",
776                axum::routing::get(move || {
777                    let s = sp.clone();
778                    async move { format!("{:.2}", s.calculate()) }
779                }),
780            );
781        }
782
783        // Add memory pressure endpoint if configured
784        #[cfg(feature = "memory")]
785        if let Some(ref mg) = self.memory_guard {
786            let mg = mg.clone();
787            app = app.route(
788                "/memory/pressure",
789                axum::routing::get(move || {
790                    let m = mg.clone();
791                    async move {
792                        (
793                            [(axum::http::header::CONTENT_TYPE, "application/json")],
794                            format!(
795                                r#"{{"under_pressure":{},"ratio":{:.3},"current_bytes":{},"limit_bytes":{}}}"#,
796                                m.under_pressure(),
797                                m.pressure_ratio(),
798                                m.current_bytes(),
799                                m.limit_bytes()
800                            ),
801                        )
802                    }
803                }),
804            );
805        }
806
807        // Merge service-specific routes
808        app = app.merge(extra_routes);
809
810        tokio::spawn(async move {
811            run_axum_server(
812                listener,
813                app,
814                shutdown_rx,
815                update_interval,
816                process_metrics,
817                container_metrics,
818            )
819            .await;
820        });
821
822        Ok(())
823    }
824
825    /// Stop the metrics server.
826    ///
827    /// # Errors
828    ///
829    /// Returns an error if the server is not running.
830    pub async fn stop_server(&mut self) -> Result<(), MetricsError> {
831        if let Some(tx) = self.shutdown_tx.take() {
832            let _ = tx.send(());
833            Ok(())
834        } else {
835            Err(MetricsError::NotRunning)
836        }
837    }
838
839    /// Gracefully shut down the OTel provider (flushes pending exports).
840    ///
841    /// Call this before application exit to ensure all metrics are exported.
842    #[cfg(feature = "otel-metrics")]
843    pub fn shutdown_otel(&mut self) {
844        if let Some(provider) = self.otel_provider.take()
845            && let Err(e) = provider.shutdown()
846        {
847            tracing::warn!(error = %e, "OTel provider shutdown error");
848        }
849    }
850
    /// Set application version and git commit for the manifest.
    ///
    /// Uses interior mutability (writes through the registry's `Arc<RwLock>`),
    /// so only `&self` is needed. Called automatically by
    /// [`dfe_groups::AppMetrics::new()`] if the `metrics-dfe` feature is enabled.
    ///
    /// Takes effect immediately for all clones of the registry.
    pub fn set_build_info(&self, version: &str, commit: &str) {
        self.registry.set_build_info(version, commit);
    }
859
    /// Set operational use cases for a metric (by full prefixed name).
    ///
    /// No-op if the metric is not found in the registry.
    pub fn set_use_cases(&self, metric_name: &str, use_cases: &[&str]) {
        // Pure delegation; a missing metric is silently ignored (documented no-op).
        self.registry.set_use_cases(metric_name, use_cases);
    }
866
    /// Set the suggested Grafana panel type for a metric (by full prefixed name).
    ///
    /// No-op if the metric is not found in the registry.
    pub fn set_dashboard_hint(&self, metric_name: &str, hint: &str) {
        // Pure delegation; a missing metric is silently ignored (documented no-op).
        self.registry.set_dashboard_hint(metric_name, hint);
    }
873
874    /// Get a cloneable handle to the metric registry.
875    ///
876    /// Use this to pass into route handlers. The handle is `Clone + Send + Sync`.
877    /// Call before starting the server, consistent with the `render_handle()` pattern.
878    #[must_use]
879    pub fn registry(&self) -> MetricRegistry {
880        self.registry.clone()
881    }
882
883    /// Get the namespace prefix (e.g. `dfe_loader`).
884    ///
885    /// Used by [`dfe_groups`] metric structs to build labelled metric keys.
886    #[must_use]
887    pub fn namespace(&self) -> &str {
888        &self.config.namespace
889    }
890
891    /// Get prefixed metric name.
892    fn prefixed_key(&self, name: &str) -> String {
893        if self.config.namespace.is_empty() {
894            name.to_string()
895        } else {
896            format!("{}_{}", self.config.namespace, name)
897        }
898    }
899}
900
/// Run the metrics HTTP server.
///
/// Event loop driving the minimal (non-axum) metrics endpoint. Each
/// `tokio::select!` iteration does exactly one of:
/// - exit when the shutdown oneshot resolves,
/// - refresh process/container gauges on the update interval, or
/// - accept a TCP connection and serve it on a spawned task.
///
/// Accept errors are silently skipped; the loop just waits for the next client.
#[cfg(feature = "metrics")]
#[allow(clippy::too_many_arguments)]
async fn run_server(
    listener: TcpListener,
    handle: PrometheusHandle,
    registry: MetricRegistry,
    mut shutdown_rx: oneshot::Receiver<()>,
    update_interval: Duration,
    process_metrics: Option<ProcessMetrics>,
    container_metrics: Option<ContainerMetrics>,
    readiness_fn: Option<ReadinessFn>,
) {
    let mut update_interval = tokio::time::interval(update_interval);

    loop {
        tokio::select! {
            _ = &mut shutdown_rx => {
                break;
            }
            _ = update_interval.tick() => {
                // Refresh OS-level gauges only for the collectors that exist.
                if let Some(ref pm) = process_metrics {
                    pm.update();
                }
                if let Some(ref cm) = container_metrics {
                    cm.update();
                }
            }
            result = listener.accept() => {
                if let Ok((stream, _)) = result {
                    // Clone the shared handles so the connection task owns its state.
                    let handle = handle.clone();
                    let registry = registry.clone();
                    let readiness_fn = readiness_fn.clone();
                    tokio::spawn(async move {
                        handle_connection(stream, handle, registry, readiness_fn).await;
                    });
                }
            }
        }
    }
}
942
/// Handle a single HTTP connection.
///
/// Reads only the request line (headers and any request body are never
/// consumed), routes by prefix match on that line, writes a single response,
/// and then drops the stream — so each connection serves exactly one request.
///
/// **Path ordering:** `/metrics/manifest` MUST be checked BEFORE `/metrics`
/// because `"GET /metrics/manifest"` also matches `starts_with("GET /metrics")`.
#[cfg(feature = "metrics")]
async fn handle_connection(
    mut stream: tokio::net::TcpStream,
    handle: PrometheusHandle,
    registry: MetricRegistry,
    readiness_fn: Option<ReadinessFn>,
) {
    use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader};

    let mut reader = BufReader::new(&mut stream);
    let mut request_line = String::new();

    // Closed or unreadable socket: bail out without answering.
    if reader.read_line(&mut request_line).await.is_err() {
        return;
    }

    // IMPORTANT: /metrics/manifest MUST come before /metrics (prefix match ordering)
    let (status, content_type, body) = if request_line.starts_with("GET /metrics/manifest") {
        (
            "200 OK",
            "application/json",
            // Serialization failure degrades to an empty body rather than a 500.
            serde_json::to_string(&registry.manifest()).unwrap_or_default(),
        )
    } else if request_line.starts_with("GET /metrics") {
        ("200 OK", "text/plain; charset=utf-8", handle.render())
    } else if request_line.starts_with("GET /healthz")
        || request_line.starts_with("GET /health/live")
    {
        (
            "200 OK",
            "application/json",
            r#"{"status":"alive"}"#.to_string(),
        )
    } else if request_line.starts_with("GET /readyz")
        || request_line.starts_with("GET /health/ready")
    {
        // No callback configured counts as ready.
        let callback_ready = readiness_fn.as_ref().is_none_or(|f| f());

        // When the `health` feature is on, the global registry must agree.
        #[cfg(feature = "health")]
        let registry_ready = crate::health::HealthRegistry::is_ready();
        #[cfg(not(feature = "health"))]
        let registry_ready = true;

        let ready = callback_ready && registry_ready;
        if ready {
            (
                "200 OK",
                "application/json",
                r#"{"status":"ready"}"#.to_string(),
            )
        } else {
            (
                "503 Service Unavailable",
                "application/json",
                r#"{"status":"not_ready"}"#.to_string(),
            )
        }
    } else {
        (
            "404 Not Found",
            "text/plain; charset=utf-8",
            "Not Found".to_string(),
        )
    };

    // Content-Length is measured in bytes, matching `body.len()`.
    let response = format!(
        "HTTP/1.1 {status}\r\nContent-Type: {content_type}\r\nContent-Length: {}\r\n\r\n{body}",
        body.len()
    );

    // The client may have disconnected already; a failed write is not actionable.
    let _ = stream.write_all(response.as_bytes()).await;
}
1019
/// Readiness response helper for axum endpoints.
///
/// Combines the caller-supplied readiness callback with (when the `health`
/// feature is enabled) the global [`HealthRegistry`](crate::health::HealthRegistry).
/// Returns 200 only when both report ready; otherwise 503.
#[cfg(all(feature = "metrics", feature = "http-server"))]
fn readiness_response(rf: Option<ReadinessFn>) -> axum::response::Response {
    use axum::response::IntoResponse;

    // A missing callback means "always ready".
    let callback_ready = match rf {
        Some(ref f) => f(),
        None => true,
    };

    #[cfg(feature = "health")]
    let registry_ready = crate::health::HealthRegistry::is_ready();
    #[cfg(not(feature = "health"))]
    let registry_ready = true;

    if callback_ready && registry_ready {
        (
            [(axum::http::header::CONTENT_TYPE, "application/json")],
            r#"{"status":"ready"}"#,
        )
            .into_response()
    } else {
        (
            axum::http::StatusCode::SERVICE_UNAVAILABLE,
            [(axum::http::header::CONTENT_TYPE, "application/json")],
            r#"{"status":"not_ready"}"#,
        )
            .into_response()
    }
}
1052
/// Run the axum-based metrics HTTP server with custom routes.
///
/// Spawns a background task that periodically refreshes the process and
/// container gauges, then serves `app` on `listener` until the shutdown
/// oneshot resolves. The updater task is signalled to stop only after the
/// server itself has finished shutting down.
#[cfg(all(feature = "metrics", feature = "http-server"))]
async fn run_axum_server(
    listener: TcpListener,
    app: axum::Router,
    shutdown_rx: oneshot::Receiver<()>,
    update_interval: Duration,
    process_metrics: Option<ProcessMetrics>,
    container_metrics: Option<ContainerMetrics>,
) {
    let mut interval = tokio::time::interval(update_interval);

    // Spawn the metrics update loop
    let (update_stop_tx, mut update_stop_rx) = oneshot::channel::<()>();
    tokio::spawn(async move {
        loop {
            tokio::select! {
                _ = &mut update_stop_rx => break,
                _ = interval.tick() => {
                    // Refresh OS-level gauges only for the collectors that exist.
                    if let Some(ref pm) = process_metrics {
                        pm.update();
                    }
                    if let Some(ref cm) = container_metrics {
                        cm.update();
                    }
                }
            }
        }
    });

    // Run axum server with graceful shutdown
    // Note: an Err from the receiver (sender dropped) also triggers shutdown.
    axum::serve(listener, app)
        .with_graceful_shutdown(async move {
            let _ = shutdown_rx.await;
        })
        .await
        .unwrap_or_else(|e| tracing::error!(error = %e, "Metrics axum server error"));

    // Stop the updater once the server has exited; the task may already be
    // gone, so the send result is ignored.
    let _ = update_stop_tx.send(());
}
1093
/// Standard latency histogram buckets.
///
/// Twelve bucket upper bounds in seconds, spanning 1 ms to 10 s, spaced
/// roughly logarithmically for request-latency style histograms.
#[must_use]
pub fn latency_buckets() -> Vec<f64> {
    [0.001, 0.005, 0.01, 0.025, 0.05, 0.1, 0.25, 0.5, 1.0, 2.5, 5.0, 10.0].to_vec()
}
1101
/// Standard size histogram buckets.
///
/// Six bucket upper bounds in bytes, one per decade from 100 B to 10 MB.
#[must_use]
pub fn size_buckets() -> Vec<f64> {
    // Powers of ten are exactly representable in f64 at this magnitude.
    [1e2, 1e3, 1e4, 1e5, 1e6, 1e7].to_vec()
}
1114
#[cfg(test)]
mod tests {
    use super::*;

    /// Defaults: no namespace, both collectors on, 15 s update interval.
    #[test]
    fn test_metrics_config_default() {
        let config = MetricsConfig::default();
        assert!(config.namespace.is_empty());
        assert!(config.enable_process_metrics);
        assert!(config.enable_container_metrics);
        assert_eq!(config.update_interval, Duration::from_secs(15));
    }

    #[test]
    fn test_latency_buckets() {
        let buckets = latency_buckets();
        assert_eq!(buckets.len(), 12);
        // Strictly increasing across every adjacent pair, not just the
        // endpoints — an out-of-order middle bucket would break histograms.
        assert!(buckets.windows(2).all(|w| w[0] < w[1]));
        assert_eq!(buckets[0], 0.001);
        assert_eq!(buckets[11], 10.0);
    }

    #[test]
    fn test_size_buckets() {
        let buckets = size_buckets();
        assert_eq!(buckets.len(), 6);
        // Strictly increasing across every adjacent pair, not just the endpoints.
        assert!(buckets.windows(2).all(|w| w[0] < w[1]));
        assert_eq!(buckets[0], 100.0);
        assert_eq!(buckets[5], 10_000_000.0);
    }
}
1141}