Skip to main content

hyperi_rustlib/metrics/
mod.rs

1// Project:   hyperi-rustlib
2// File:      src/metrics/mod.rs
3// Purpose:   Prometheus metrics with process and container awareness
4// Language:  Rust
5//
6// License:   BUSL-1.1
7// Copyright: (c) 2026 HYPERI PTY LIMITED
8
9//! Metrics with Prometheus and/or OpenTelemetry backends.
10//!
11//! - **`metrics` only:** Prometheus scrape endpoint via `/metrics`
12//! - **`otel-metrics` only:** OTLP push to OTel-compatible backends
13//! - **Both:** Fanout recorder sends to both Prometheus AND OTel
14//!
15//! Counter/Gauge/Histogram types, automatic process + cgroup container metrics,
16//! built-in HTTP server, readiness/startup probes, optional scaling-pressure and
17//! memory-guard endpoints, custom routes via
18//! `MetricsManager::start_server_with_routes` (`http-server` feature).
19//!
20//! ## Basic Example
21//!
22//! ```rust,no_run
23//! use hyperi_rustlib::metrics::{MetricsManager, MetricsConfig};
24//!
25//! #[tokio::main]
26//! async fn main() {
27//!     let mut manager = MetricsManager::new("myapp");
28//!
29//!     // Create metrics
30//!     let requests = manager.counter("requests_total", "Total requests");
31//!     let active = manager.gauge("active_connections", "Active connections");
32//!     let latency = manager.histogram("request_duration_seconds", "Request latency");
33//!
34//!     // Start metrics server (simple -- built-in endpoints only)
35//!     manager.start_server("0.0.0.0:9090").await.unwrap();
36//!
37//!     // Record metrics
38//!     requests.increment(1);
39//!     active.set(42.0);
40//!     latency.record(0.123);
41//! }
42//! ```
43//!
44//! ## Advanced Example (with custom routes, scaling, memory)
45//!
46//! Requires features: `metrics`, `http-server`, `scaling`, `memory`.
47//!
48//! ```rust,ignore
49//! use std::sync::Arc;
50//! use hyperi_rustlib::metrics::MetricsManager;
51//! use hyperi_rustlib::scaling::{ScalingPressure, ScalingPressureConfig};
52//! use hyperi_rustlib::memory::{MemoryGuard, MemoryGuardConfig};
53//! use axum::{Router, routing::post};
54//!
55//! let mut mgr = MetricsManager::new("myapp");
56//!
57//! // Readiness callback
58//! mgr.set_readiness_check(|| true);
59//!
60//! // Attach scaling pressure (adds /scaling/pressure endpoint)
61//! let scaling = Arc::new(ScalingPressure::new(ScalingPressureConfig::default(), vec![]));
62//! mgr.set_scaling_pressure(scaling);
63//!
64//! // Attach memory guard (adds /memory/pressure endpoint)
65//! let guard = Arc::new(MemoryGuard::new(MemoryGuardConfig::default()));
66//! mgr.set_memory_guard(guard);
67//!
68//! // Service-specific routes
69//! let custom = Router::new()
70//!     .route("/test", post(|| async { "ok" }));
71//!
72//! // Start with everything merged into one server
73//! mgr.start_server_with_routes("0.0.0.0:9090", custom).await.unwrap();
74//! ```
75
76mod container;
77pub mod dfe;
78pub mod labels;
79pub mod manifest;
80mod process;
81
82pub use labels::{AuthFailureReason, FlushTrigger, TransportKind, ValidationFailureReason};
83
84#[cfg(feature = "otel-metrics")]
85pub(crate) mod otel;
86#[cfg(feature = "otel-metrics")]
87pub mod otel_types;
88
89use std::net::SocketAddr;
90use std::sync::Arc;
91use std::time::Duration;
92
93use metrics::{Counter, Gauge, Histogram, Unit};
94use thiserror::Error;
95use tokio::net::TcpListener;
96use tokio::sync::oneshot;
97
98/// Readiness check callback type.
99pub type ReadinessFn = Arc<dyn Fn() -> bool + Send + Sync>;
100
101#[cfg(feature = "metrics")]
102use metrics_exporter_prometheus::PrometheusHandle;
103
104pub use container::ContainerMetrics;
105pub use dfe::DfeMetrics;
106#[cfg(feature = "metrics-dfe")]
107pub mod dfe_groups;
108pub use manifest::{ManifestResponse, MetricDescriptor, MetricRegistry, MetricType};
109pub use process::ProcessMetrics;
110
111#[cfg(feature = "otel-metrics")]
112pub use otel_types::{OtelMetricsConfig, OtelProtocol};
113
/// Cloneable wrapper around the Prometheus exporter handle.
///
/// Returned by [`MetricsManager::render_handle`]. Clone it into `axum` route
/// handlers or pass it to other tasks that need the scrape text.
#[cfg(feature = "metrics")]
#[derive(Clone)]
pub struct RenderHandle(PrometheusHandle);

#[cfg(feature = "metrics")]
impl RenderHandle {
    /// Produce the current metrics snapshot in Prometheus text format.
    #[must_use]
    pub fn render(&self) -> String {
        let Self(inner) = self;
        inner.render()
    }
}
130
/// Errors reported by the metrics subsystem.
///
/// The `String` payloads carry a human-readable cause that is appended to the
/// variant's fixed message when the error is displayed.
#[derive(Debug, Error)]
pub enum MetricsError {
    /// The metrics exporter could not be built; the payload is the cause.
    #[error("failed to build metrics exporter: {0}")]
    BuildError(String),

    /// The HTTP server could not be started. Used for an unparsable listen
    /// address, a failed socket bind, and a missing Prometheus handle.
    #[error("failed to start metrics server: {0}")]
    ServerError(String),

    /// A start was requested while this manager already holds a shutdown
    /// sender for a running server.
    #[error("metrics server already running")]
    AlreadyRunning,

    /// An operation required a running server but none was running.
    #[error("metrics server not running")]
    NotRunning,
}
150
/// Settings consumed by [`MetricsManager::with_config`].
///
/// [`Default`] yields an empty namespace, process and container collection
/// switched on, and a 15-second update interval.
#[derive(Debug, Clone)]
pub struct MetricsConfig {
    /// Namespace prefix for metric names.
    pub namespace: String,
    /// Whether process metrics are collected.
    pub enable_process_metrics: bool,
    /// Whether container metrics are collected.
    pub enable_container_metrics: bool,
    /// How often the auto-collected metrics are refreshed.
    pub update_interval: Duration,
    /// OTel exporter settings; the field exists only with the `otel-metrics` feature.
    #[cfg(feature = "otel-metrics")]
    pub otel: OtelMetricsConfig,
}

impl Default for MetricsConfig {
    /// Empty namespace, both collectors enabled, 15-second update interval,
    /// and (with `otel-metrics`) the default OTel settings.
    fn default() -> Self {
        const DEFAULT_UPDATE_INTERVAL: Duration = Duration::from_secs(15);

        MetricsConfig {
            namespace: String::default(),
            enable_process_metrics: true,
            enable_container_metrics: true,
            update_interval: DEFAULT_UPDATE_INTERVAL,
            #[cfg(feature = "otel-metrics")]
            otel: Default::default(),
        }
    }
}
179
/// Intermediate struct to pass recorder setup results across cfg boundaries.
///
/// Produced by `install_recorders`; each field exists only when its feature
/// is compiled in, so callers read the fields under the same `cfg` gates.
struct RecorderSetup {
    /// Handle of the Prometheus recorder built during setup.
    #[cfg(feature = "metrics")]
    prom_handle: Option<PrometheusHandle>,
    /// OTel meter provider; `None` when the OTel recorder could not be built.
    #[cfg(feature = "otel-metrics")]
    otel_provider: Option<opentelemetry_sdk::metrics::SdkMeterProvider>,
}
187
188/// Install the metrics recorder(s) for the enabled features. When both
189/// `metrics` and `otel-metrics` are on, composes both into one global
190/// recorder via `metrics-util` `FanoutBuilder`.
191#[allow(unused_variables)]
192fn install_recorders(config: &MetricsConfig) -> RecorderSetup {
193    // --- Prometheus only (no OTel) ---
194    #[cfg(all(feature = "metrics", not(feature = "otel-metrics")))]
195    {
196        let recorder = metrics_exporter_prometheus::PrometheusBuilder::new().build_recorder();
197        let handle = recorder.handle();
198        if let Err(e) = metrics::set_global_recorder(recorder) {
199            tracing::warn!(error = %e, "global metrics recorder already installed; keeping existing");
200        }
201        RecorderSetup {
202            prom_handle: Some(handle),
203        }
204    }
205
206    // --- OTel only (no Prometheus) ---
207    #[cfg(all(feature = "otel-metrics", not(feature = "metrics")))]
208    {
209        match otel::build_otel_recorder(&config.namespace, &config.otel) {
210            Ok((otel_recorder, provider)) => {
211                opentelemetry::global::set_meter_provider(provider.clone());
212                if let Err(e) = metrics::set_global_recorder(otel_recorder) {
213                    tracing::warn!(error = %e, "global metrics recorder already installed; keeping existing");
214                }
215                RecorderSetup {
216                    otel_provider: Some(provider),
217                }
218            }
219            Err(e) => {
220                tracing::warn!(error = %e, "Failed to build OTel metrics recorder");
221                RecorderSetup {
222                    otel_provider: None,
223                }
224            }
225        }
226    }
227
228    // --- Both Prometheus + OTel (Fanout) ---
229    #[cfg(all(feature = "metrics", feature = "otel-metrics"))]
230    {
231        // Build Prometheus recorder (without installing globally)
232        let prom_recorder = metrics_exporter_prometheus::PrometheusBuilder::new().build_recorder();
233        let prom_handle = prom_recorder.handle();
234
235        // Build OTel recorder
236        match otel::build_otel_recorder(&config.namespace, &config.otel) {
237            Ok((otel_recorder, provider)) => {
238                opentelemetry::global::set_meter_provider(provider.clone());
239
240                // Compose via Fanout: both recorders receive every measurement
241                let fanout = metrics_util::layers::FanoutBuilder::default()
242                    .add_recorder(prom_recorder)
243                    .add_recorder(otel_recorder)
244                    .build();
245
246                if let Err(e) = metrics::set_global_recorder(fanout) {
247                    tracing::warn!(error = %e, "global metrics recorder already installed; keeping existing");
248                }
249
250                RecorderSetup {
251                    prom_handle: Some(prom_handle),
252                    otel_provider: Some(provider),
253                }
254            }
255            Err(e) => {
256                // Fallback: just Prometheus if OTel fails
257                tracing::warn!(error = %e, "Failed to build OTel recorder, falling back to Prometheus only");
258                if let Err(e) = metrics::set_global_recorder(prom_recorder) {
259                    tracing::warn!(error = %e, "global metrics recorder already installed; keeping existing");
260                }
261                RecorderSetup {
262                    prom_handle: Some(prom_handle),
263                    otel_provider: None,
264                }
265            }
266        }
267    }
268}
269
/// Metrics manager handling Prometheus and/or OTel exposition.
///
/// Owns the recorder handles, the optional process/container collectors, the
/// manifest registry, and the state shared with the metrics HTTP server.
pub struct MetricsManager {
    /// Prometheus render handle; `None` when no recorder was set up
    /// (for example via the test constructor).
    #[cfg(feature = "metrics")]
    handle: Option<PrometheusHandle>,
    /// Configuration this manager was built from.
    config: MetricsConfig,
    /// Shutdown sender for the server task; `Some` is treated as "a server
    /// is already running" by the start methods.
    shutdown_tx: Option<oneshot::Sender<()>>,
    /// Process collector, present when enabled in the configuration.
    process_metrics: Option<ProcessMetrics>,
    /// Container collector, present when enabled in the configuration.
    container_metrics: Option<ContainerMetrics>,
    /// Optional readiness callback handed to the readiness endpoints.
    readiness_fn: Option<ReadinessFn>,
    /// Startup flag set by `mark_started` and shared with the HTTP handlers.
    started: Arc<std::sync::atomic::AtomicBool>,
    /// Registry of metric descriptors served as the manifest.
    registry: MetricRegistry,
    /// Scaling-pressure source behind the `/scaling/pressure` endpoint.
    #[cfg(all(feature = "metrics", feature = "scaling"))]
    scaling_pressure: Option<Arc<crate::scaling::ScalingPressure>>,
    /// Memory guard behind the `/memory/pressure` endpoint.
    #[cfg(all(feature = "metrics", feature = "memory"))]
    memory_guard: Option<Arc<crate::memory::MemoryGuard>>,
    /// OTel meter provider returned by recorder setup.
    // NOTE(review): presumably retained for flush/shutdown -- confirm against
    // the rest of the impl, which is not visible here.
    #[cfg(feature = "otel-metrics")]
    otel_provider: Option<opentelemetry_sdk::metrics::SdkMeterProvider>,
}
288
289impl MetricsManager {
290    /// Create a new metrics manager with the given namespace.
291    #[must_use]
292    pub fn new(namespace: &str) -> Self {
293        Self::with_config(MetricsConfig {
294            namespace: namespace.to_string(),
295            ..Default::default()
296        })
297    }
298
299    /// Test constructor that skips the global recorder install.
300    ///
301    /// The global recorder installs once per process; parallel `new()` calls
302    /// race and all but the first panic with `SetRecorderError`. Skipping it
303    /// makes `metrics!` macros no-ops (documented behaviour with no recorder)
304    /// while registry tracking, descriptor push, and namespace logic -- what
305    /// tests actually verify -- still work.
306    #[cfg(test)]
307    pub(crate) fn new_for_test(namespace: &str) -> Self {
308        let config = MetricsConfig {
309            namespace: namespace.to_string(),
310            enable_process_metrics: false,
311            enable_container_metrics: false,
312            ..Default::default()
313        };
314
315        let registry = MetricRegistry::new(&config.namespace);
316
317        Self {
318            #[cfg(feature = "metrics")]
319            handle: None,
320            registry,
321            config,
322            shutdown_tx: None,
323            process_metrics: None,
324            container_metrics: None,
325            readiness_fn: None,
326            started: Arc::new(std::sync::atomic::AtomicBool::new(false)),
327            #[cfg(all(feature = "metrics", feature = "scaling"))]
328            scaling_pressure: None,
329            #[cfg(all(feature = "metrics", feature = "memory"))]
330            memory_guard: None,
331            #[cfg(feature = "otel-metrics")]
332            otel_provider: None,
333        }
334    }
335
336    /// Create a metrics manager with custom configuration.
337    ///
338    /// Installs the appropriate recorder(s) based on enabled features:
339    /// - `metrics` only: Prometheus recorder
340    /// - `otel-metrics` only: OTel recorder (OTLP push)
341    /// - Both: Fanout recorder (Prometheus scrape + OTel OTLP push)
342    #[must_use]
343    pub fn with_config(config: MetricsConfig) -> Self {
344        let setup = install_recorders(&config);
345
346        let process_metrics = if config.enable_process_metrics {
347            Some(ProcessMetrics::new(&config.namespace))
348        } else {
349            None
350        };
351
352        let container_metrics = if config.enable_container_metrics {
353            Some(ContainerMetrics::new(&config.namespace))
354        } else {
355            None
356        };
357
358        let registry = MetricRegistry::new(&config.namespace);
359
360        Self {
361            #[cfg(feature = "metrics")]
362            handle: setup.prom_handle,
363            registry,
364            config,
365            shutdown_tx: None,
366            process_metrics,
367            container_metrics,
368            readiness_fn: None,
369            started: Arc::new(std::sync::atomic::AtomicBool::new(false)),
370            #[cfg(all(feature = "metrics", feature = "scaling"))]
371            scaling_pressure: None,
372            #[cfg(all(feature = "metrics", feature = "memory"))]
373            memory_guard: None,
374            #[cfg(feature = "otel-metrics")]
375            otel_provider: setup.otel_provider,
376        }
377    }
378
379    /// Create a counter metric.
380    ///
381    /// Automatically registers a [`MetricDescriptor`] in the manifest registry
382    /// with empty labels and `group = "custom"`.
383    #[must_use]
384    pub fn counter(&self, name: &str, description: &str) -> Counter {
385        let key = self.prefixed_key(name);
386        let desc = description.to_string();
387        metrics::describe_counter!(key.clone(), desc.clone());
388        self.registry.push(MetricDescriptor {
389            name: key.clone(),
390            metric_type: MetricType::Counter,
391            description: desc,
392            unit: String::new(),
393            labels: vec![],
394            group: "custom".into(),
395            buckets: None,
396            use_cases: vec![],
397            dashboard_hint: None,
398        });
399        metrics::counter!(key)
400    }
401
402    /// Create a counter with label keys and group metadata for the manifest.
403    ///
404    /// The `labels` parameter declares label **key names** for the manifest.
405    /// The returned `Counter` is label-free -- apply label values at recording
406    /// time via `metrics::counter!(key, "label" => value)`.
407    #[must_use]
408    pub fn counter_with_labels(
409        &self,
410        name: &str,
411        description: &str,
412        labels: &[&str],
413        group: &str,
414    ) -> Counter {
415        let key = self.prefixed_key(name);
416        let desc = description.to_string();
417        metrics::describe_counter!(key.clone(), desc.clone());
418        self.registry.push(MetricDescriptor {
419            name: key.clone(),
420            metric_type: MetricType::Counter,
421            description: desc,
422            unit: String::new(),
423            labels: labels.iter().map(|s| (*s).to_string()).collect(),
424            group: group.into(),
425            buckets: None,
426            use_cases: vec![],
427            dashboard_hint: None,
428        });
429        metrics::counter!(key)
430    }
431
432    /// Create a gauge metric.
433    ///
434    /// Automatically registers a [`MetricDescriptor`] in the manifest registry
435    /// with empty labels and `group = "custom"`.
436    #[must_use]
437    pub fn gauge(&self, name: &str, description: &str) -> Gauge {
438        let key = self.prefixed_key(name);
439        let desc = description.to_string();
440        metrics::describe_gauge!(key.clone(), desc.clone());
441        self.registry.push(MetricDescriptor {
442            name: key.clone(),
443            metric_type: MetricType::Gauge,
444            description: desc,
445            unit: String::new(),
446            labels: vec![],
447            group: "custom".into(),
448            buckets: None,
449            use_cases: vec![],
450            dashboard_hint: None,
451        });
452        metrics::gauge!(key)
453    }
454
455    /// Create a gauge with label keys and group metadata for the manifest.
456    #[must_use]
457    pub fn gauge_with_labels(
458        &self,
459        name: &str,
460        description: &str,
461        labels: &[&str],
462        group: &str,
463    ) -> Gauge {
464        let key = self.prefixed_key(name);
465        let desc = description.to_string();
466        metrics::describe_gauge!(key.clone(), desc.clone());
467        self.registry.push(MetricDescriptor {
468            name: key.clone(),
469            metric_type: MetricType::Gauge,
470            description: desc,
471            unit: String::new(),
472            labels: labels.iter().map(|s| (*s).to_string()).collect(),
473            group: group.into(),
474            buckets: None,
475            use_cases: vec![],
476            dashboard_hint: None,
477        });
478        metrics::gauge!(key)
479    }
480
481    /// Create a histogram metric with default buckets.
482    ///
483    /// Automatically registers a [`MetricDescriptor`] in the manifest registry
484    /// with empty labels and `group = "custom"`.
485    #[must_use]
486    pub fn histogram(&self, name: &str, description: &str) -> Histogram {
487        let key = self.prefixed_key(name);
488        let desc = description.to_string();
489        metrics::describe_histogram!(key.clone(), Unit::Seconds, desc.clone());
490        self.registry.push(MetricDescriptor {
491            name: key.clone(),
492            metric_type: MetricType::Histogram,
493            description: desc,
494            unit: "seconds".into(),
495            labels: vec![],
496            group: "custom".into(),
497            buckets: None,
498            use_cases: vec![],
499            dashboard_hint: None,
500        });
501        metrics::histogram!(key)
502    }
503
504    /// Create a histogram with label keys, group, and optional buckets for the manifest.
505    #[must_use]
506    pub fn histogram_with_labels(
507        &self,
508        name: &str,
509        description: &str,
510        labels: &[&str],
511        group: &str,
512        buckets: Option<&[f64]>,
513    ) -> Histogram {
514        let key = self.prefixed_key(name);
515        let desc = description.to_string();
516        metrics::describe_histogram!(key.clone(), Unit::Seconds, desc.clone());
517        self.registry.push(MetricDescriptor {
518            name: key.clone(),
519            metric_type: MetricType::Histogram,
520            description: desc,
521            unit: "seconds".into(),
522            labels: labels.iter().map(|s| (*s).to_string()).collect(),
523            group: group.into(),
524            buckets: buckets.map(|b| b.to_vec()),
525            use_cases: vec![],
526            dashboard_hint: None,
527        });
528        metrics::histogram!(key)
529    }
530
531    /// Create a histogram metric with custom buckets.
532    ///
533    /// **Note:** The `buckets` parameter is captured in the manifest registry
534    /// but currently ignored by the `metrics` crate at runtime (buckets are set
535    /// globally at recorder installation time).
536    #[must_use]
537    pub fn histogram_with_buckets(
538        &self,
539        name: &str,
540        description: &str,
541        buckets: &[f64],
542    ) -> Histogram {
543        let key = self.prefixed_key(name);
544        let desc = description.to_string();
545        metrics::describe_histogram!(key.clone(), Unit::Seconds, desc.clone());
546        self.registry.push(MetricDescriptor {
547            name: key.clone(),
548            metric_type: MetricType::Histogram,
549            description: desc,
550            unit: "seconds".into(),
551            labels: vec![],
552            group: "custom".into(),
553            buckets: Some(buckets.to_vec()),
554            use_cases: vec![],
555            dashboard_hint: None,
556        });
557        metrics::histogram!(key)
558    }
559
560    /// Get the Prometheus metrics output.
561    ///
562    /// Returns the rendered Prometheus text format. Only available when
563    /// the `metrics` feature is enabled.
564    #[cfg(feature = "metrics")]
565    #[must_use]
566    pub fn render(&self) -> String {
567        self.handle
568            .as_ref()
569            .map_or_else(String::new, PrometheusHandle::render)
570    }
571
572    /// Cloneable render handle for route handlers. `Send + Sync + Clone` --
573    /// safe to move into `axum` handlers or share across tasks.
574    ///
575    /// Returns `None` if no Prometheus recorder is installed.
576    ///
577    /// # Example
578    ///
579    /// ```rust,ignore
580    /// let mut mgr = MetricsManager::new("myapp");
581    /// let render = mgr.render_handle().expect("recorder installed");
582    ///
583    /// // Use in axum route
584    /// let route = axum::Router::new().route("/metrics", axum::routing::get(move || {
585    ///     let r = render.clone();
586    ///     async move { r() }
587    /// }));
588    ///
589    /// mgr.start_server_with_routes("0.0.0.0:9090", route).await?;
590    /// ```
591    #[cfg(feature = "metrics")]
592    #[must_use]
593    pub fn render_handle(&self) -> Option<RenderHandle> {
594        self.handle.clone().map(RenderHandle)
595    }
596
597    /// Set a readiness check callback.
598    ///
599    /// When set, `/readyz` and `/health/ready` call this function and return
600    /// 503 Service Unavailable if it returns `false`. Without a callback,
601    /// these endpoints always return 200.
602    pub fn set_readiness_check(&mut self, f: impl Fn() -> bool + Send + Sync + 'static) {
603        self.readiness_fn = Some(Arc::new(f));
604    }
605
606    /// Mark the service as started (startup probe passes).
607    ///
608    /// Call this once init is complete. K8s `startupProbe` hits `/startupz`
609    /// which returns 503 until this is called, then 200 thereafter.
610    /// Separate from readiness -- startup has a longer timeout for slow starters.
611    pub fn mark_started(&self) {
612        self.started
613            .store(true, std::sync::atomic::Ordering::Release);
614    }
615
616    /// Get a clone of the started flag (for passing to HTTP handler).
617    pub(crate) fn started_flag(&self) -> Arc<std::sync::atomic::AtomicBool> {
618        Arc::clone(&self.started)
619    }
620
621    /// Attach a `ScalingPressure` instance.
622    ///
623    /// When set and using `start_server_with_routes`, a `/scaling/pressure`
624    /// endpoint is automatically added that returns the current pressure value.
625    #[cfg(all(feature = "metrics", feature = "scaling"))]
626    pub fn set_scaling_pressure(&mut self, sp: Arc<crate::scaling::ScalingPressure>) {
627        self.scaling_pressure = Some(sp);
628    }
629
630    /// Attach a `MemoryGuard` instance.
631    ///
632    /// When set and using `start_server_with_routes`, a `/memory/pressure`
633    /// endpoint is automatically added that returns the current memory status.
634    #[cfg(all(feature = "metrics", feature = "memory"))]
635    pub fn set_memory_guard(&mut self, mg: Arc<crate::memory::MemoryGuard>) {
636        self.memory_guard = Some(mg);
637    }
638
639    /// Update process and container metrics.
640    pub fn update(&self) {
641        if let Some(ref pm) = self.process_metrics {
642            pm.update();
643        }
644        if let Some(ref cm) = self.container_metrics {
645            cm.update();
646        }
647    }
648
649    /// Start the metrics HTTP server.
650    ///
651    /// Serves `/metrics` (Prometheus), `/healthz`, `/health/live`,
652    /// `/readyz`, `/health/ready` endpoints.
653    ///
654    /// Only available when the `metrics` feature is enabled (for scraping).
655    ///
656    /// # Errors
657    ///
658    /// Returns an error if the server fails to start.
659    #[cfg(feature = "metrics")]
660    pub async fn start_server(&mut self, addr: &str) -> Result<(), MetricsError> {
661        if self.shutdown_tx.is_some() {
662            return Err(MetricsError::AlreadyRunning);
663        }
664
665        let addr: SocketAddr = addr
666            .parse()
667            .map_err(|e| MetricsError::ServerError(format!("invalid address: {e}")))?;
668
669        let listener = TcpListener::bind(addr)
670            .await
671            .map_err(|e| MetricsError::ServerError(e.to_string()))?;
672
673        let (shutdown_tx, shutdown_rx) = oneshot::channel();
674        self.shutdown_tx = Some(shutdown_tx);
675
676        let handle = self
677            .handle
678            .as_ref()
679            .ok_or_else(|| {
680                MetricsError::ServerError(
681                    "Prometheus handle not configured -- MetricsManager was created without a recorder".into(),
682                )
683            })?
684            .clone();
685        let update_interval = self.config.update_interval;
686        let process_metrics = self.process_metrics.clone();
687        let container_metrics = self.container_metrics.clone();
688        let readiness_fn = self.readiness_fn.clone();
689        let started_flag = self.started_flag();
690
691        let registry = self.registry();
692
693        tokio::spawn(async move {
694            run_server(
695                listener,
696                handle,
697                registry,
698                shutdown_rx,
699                update_interval,
700                process_metrics,
701                container_metrics,
702                readiness_fn,
703                started_flag,
704            )
705            .await;
706        });
707
708        Ok(())
709    }
710
711    /// Start the metrics HTTP server with additional custom routes.
712    ///
713    /// Serves the same built-in endpoints as [`start_server`](Self::start_server):
714    /// `/metrics`, `/healthz`, `/health/live`, `/readyz`, `/health/ready`.
715    ///
716    /// Additionally:
717    /// - If [`set_scaling_pressure`](Self::set_scaling_pressure) was called,
718    ///   adds `/scaling/pressure` returning the current pressure value.
719    /// - If [`set_memory_guard`](Self::set_memory_guard) was called,
720    ///   adds `/memory/pressure` returning memory status JSON.
721    /// - Any routes in `extra_routes` are merged (service-specific endpoints).
722    ///
723    /// Requires both `metrics` and `http-server` features.
724    ///
725    /// # Errors
726    ///
727    /// Returns an error if the server fails to start.
728    #[cfg(all(feature = "metrics", feature = "http-server"))]
729    pub async fn start_server_with_routes(
730        &mut self,
731        addr: &str,
732        extra_routes: axum::Router,
733    ) -> Result<(), MetricsError> {
734        if self.shutdown_tx.is_some() {
735            return Err(MetricsError::AlreadyRunning);
736        }
737
738        let addr: SocketAddr = addr
739            .parse()
740            .map_err(|e| MetricsError::ServerError(format!("invalid address: {e}")))?;
741
742        let listener = TcpListener::bind(addr)
743            .await
744            .map_err(|e| MetricsError::ServerError(e.to_string()))?;
745
746        let (shutdown_tx, shutdown_rx) = oneshot::channel();
747        self.shutdown_tx = Some(shutdown_tx);
748
749        let handle = self
750            .handle
751            .as_ref()
752            .ok_or_else(|| {
753                MetricsError::ServerError(
754                    "Prometheus handle not configured -- MetricsManager was created without a recorder".into(),
755                )
756            })?
757            .clone();
758        let update_interval = self.config.update_interval;
759        let process_metrics = self.process_metrics.clone();
760        let container_metrics = self.container_metrics.clone();
761        let readiness_fn = self.readiness_fn.clone();
762
763        // Build the axum router with built-in + optional + custom routes
764        let metrics_handle = handle.clone();
765        let readiness_for_live = readiness_fn.clone();
766        let registry_handle = self.registry();
767
768        let mut app = axum::Router::new()
769            .route(
770                "/metrics/manifest",
771                axum::routing::get(move || {
772                    let reg = registry_handle.clone();
773                    async move {
774                        (
775                            [(axum::http::header::CONTENT_TYPE, "application/json")],
776                            serde_json::to_string(&reg.manifest()).unwrap_or_default(),
777                        )
778                    }
779                }),
780            )
781            .route(
782                "/metrics",
783                axum::routing::get(move || {
784                    let h = metrics_handle.clone();
785                    async move { h.render() }
786                }),
787            )
788            .route("/startupz", {
789                let sf = self.started_flag();
790                axum::routing::get(move || {
791                    let started = sf.load(std::sync::atomic::Ordering::Acquire);
792                    async move {
793                        if started {
794                            (
795                                axum::http::StatusCode::OK,
796                                [(axum::http::header::CONTENT_TYPE, "application/json")],
797                                r#"{"status":"started"}"#,
798                            )
799                        } else {
800                            (
801                                axum::http::StatusCode::SERVICE_UNAVAILABLE,
802                                [(axum::http::header::CONTENT_TYPE, "application/json")],
803                                r#"{"status":"starting"}"#,
804                            )
805                        }
806                    }
807                })
808            })
809            .route(
810                "/healthz",
811                axum::routing::get(|| async {
812                    (
813                        [(axum::http::header::CONTENT_TYPE, "application/json")],
814                        r#"{"status":"alive"}"#,
815                    )
816                }),
817            )
818            .route(
819                "/health/live",
820                axum::routing::get(|| async {
821                    (
822                        [(axum::http::header::CONTENT_TYPE, "application/json")],
823                        r#"{"status":"alive"}"#,
824                    )
825                }),
826            )
827            .route(
828                "/readyz",
829                axum::routing::get(move || {
830                    let rf = readiness_fn.clone();
831                    async move { readiness_response(rf) }
832                }),
833            )
834            .route(
835                "/health/ready",
836                axum::routing::get(move || {
837                    let rf = readiness_for_live.clone();
838                    async move { readiness_response(rf) }
839                }),
840            );
841
842        // Add scaling pressure endpoint if configured
843        #[cfg(feature = "scaling")]
844        if let Some(ref sp) = self.scaling_pressure {
845            let sp = sp.clone();
846            app = app.route(
847                "/scaling/pressure",
848                axum::routing::get(move || {
849                    let s = sp.clone();
850                    async move { format!("{:.2}", s.calculate()) }
851                }),
852            );
853        }
854
855        // Add memory pressure endpoint if configured
856        #[cfg(feature = "memory")]
857        if let Some(ref mg) = self.memory_guard {
858            let mg = mg.clone();
859            app = app.route(
860                "/memory/pressure",
861                axum::routing::get(move || {
862                    let m = mg.clone();
863                    async move {
864                        (
865                            [(axum::http::header::CONTENT_TYPE, "application/json")],
866                            format!(
867                                r#"{{"under_pressure":{},"ratio":{:.3},"current_bytes":{},"limit_bytes":{}}}"#,
868                                m.under_pressure(),
869                                m.pressure_ratio(),
870                                m.current_bytes(),
871                                m.limit_bytes()
872                            ),
873                        )
874                    }
875                }),
876            );
877        }
878
879        // Merge service-specific routes
880        app = app.merge(extra_routes);
881
882        tokio::spawn(async move {
883            run_axum_server(
884                listener,
885                app,
886                shutdown_rx,
887                update_interval,
888                process_metrics,
889                container_metrics,
890            )
891            .await;
892        });
893
894        Ok(())
895    }
896
897    /// Stop the metrics server.
898    ///
899    /// # Errors
900    ///
901    /// Returns an error if the server is not running.
902    pub async fn stop_server(&mut self) -> Result<(), MetricsError> {
903        if let Some(tx) = self.shutdown_tx.take() {
904            let _ = tx.send(());
905            Ok(())
906        } else {
907            Err(MetricsError::NotRunning)
908        }
909    }
910
911    /// Gracefully shut down the OTel provider (flushes pending exports).
912    ///
913    /// Call this before application exit to ensure all metrics are exported.
914    #[cfg(feature = "otel-metrics")]
915    pub fn shutdown_otel(&mut self) {
916        if let Some(provider) = self.otel_provider.take()
917            && let Err(e) = provider.shutdown()
918        {
919            tracing::warn!(error = %e, "OTel provider shutdown error");
920        }
921    }
922
923    /// Set application version and git commit for the manifest.
924    ///
925    /// Uses interior mutability (writes through the registry's `Arc<RwLock>`),
926    /// so only `&self` is needed. Called automatically by
927    /// `dfe_groups::AppMetrics::new()` if the `metrics-dfe` feature is enabled.
928    pub fn set_build_info(&self, version: &str, commit: &str) {
929        self.registry.set_build_info(version, commit);
930    }
931
932    /// Set operational use cases for a metric (by full prefixed name).
933    ///
934    /// No-op if the metric is not found in the registry.
935    pub fn set_use_cases(&self, metric_name: &str, use_cases: &[&str]) {
936        self.registry.set_use_cases(metric_name, use_cases);
937    }
938
939    /// Set the suggested Grafana panel type for a metric (by full prefixed name).
940    ///
941    /// No-op if the metric is not found in the registry.
942    pub fn set_dashboard_hint(&self, metric_name: &str, hint: &str) {
943        self.registry.set_dashboard_hint(metric_name, hint);
944    }
945
946    /// Get a cloneable handle to the metric registry.
947    ///
948    /// Use this to pass into route handlers. The handle is `Clone + Send + Sync`.
949    /// Call before starting the server, consistent with the `render_handle()` pattern.
950    #[must_use]
951    pub fn registry(&self) -> MetricRegistry {
952        self.registry.clone()
953    }
954
955    /// Get the namespace prefix (e.g. `dfe_loader`).
956    ///
957    /// Used by `dfe_groups` metric structs (`metrics-dfe` feature) to build
958    /// labelled metric keys.
959    #[must_use]
960    pub fn namespace(&self) -> &str {
961        &self.config.namespace
962    }
963
964    /// Get prefixed metric name.
965    fn prefixed_key(&self, name: &str) -> String {
966        if self.config.namespace.is_empty() {
967            name.to_string()
968        } else {
969            format!("{}_{}", self.config.namespace, name)
970        }
971    }
972}
973
/// Run the built-in metrics HTTP server until shutdown is signalled.
///
/// A single loop multiplexes three event sources:
///
/// - `shutdown_rx` resolving ends the loop and therefore the server;
/// - every tick of `update_interval` refreshes the process and container
///   metrics, when those collectors are present;
/// - every accepted connection is handed to `handle_connection` on its own
///   spawned task, together with clones of the render handle, registry,
///   readiness callback and started flag.
///
/// A failed `accept()` is followed by a short pause before the next attempt.
/// Without it, a persistent failure (for example running out of file
/// descriptors) would make this loop retry immediately and spin.
#[cfg(feature = "metrics")]
#[allow(clippy::too_many_arguments)]
async fn run_server(
    listener: TcpListener,
    handle: PrometheusHandle,
    registry: MetricRegistry,
    mut shutdown_rx: oneshot::Receiver<()>,
    update_interval: Duration,
    process_metrics: Option<ProcessMetrics>,
    container_metrics: Option<ContainerMetrics>,
    readiness_fn: Option<ReadinessFn>,
    started_flag: Arc<std::sync::atomic::AtomicBool>,
) {
    // Pause after a failed accept so a persistent error cannot busy-loop.
    // Kept short because shutdown and metric ticks wait behind it.
    const ACCEPT_ERROR_BACKOFF: Duration = Duration::from_millis(100);

    let mut update_interval = tokio::time::interval(update_interval);

    loop {
        tokio::select! {
            _ = &mut shutdown_rx => {
                break;
            }
            _ = update_interval.tick() => {
                if let Some(ref pm) = process_metrics {
                    pm.update();
                }
                if let Some(ref cm) = container_metrics {
                    cm.update();
                }
            }
            result = listener.accept() => {
                match result {
                    Ok((stream, _)) => {
                        let handle = handle.clone();
                        let registry = registry.clone();
                        let readiness_fn = readiness_fn.clone();
                        let sf = Arc::clone(&started_flag);
                        tokio::spawn(async move {
                            handle_connection(stream, handle, registry, readiness_fn, &sf).await;
                        });
                    }
                    Err(_) => {
                        tokio::time::sleep(ACCEPT_ERROR_BACKOFF).await;
                    }
                }
            }
        }
    }
}
1017
/// Handle a single HTTP connection: read the request line, answer, close.
///
/// Only the first line of the request is read, and at most
/// `MAX_REQUEST_LINE_BYTES` of it, so a peer that never sends a newline cannot
/// make the buffer grow without bound. A connection that is closed before
/// sending anything, or whose first line cannot be read, is dropped without a
/// response.
///
/// Routing is a prefix match on the request line:
///
/// - `GET /metrics/manifest` -> registry manifest as JSON
/// - `GET /metrics` -> Prometheus text rendered by `handle`
/// - `GET /startupz`, `GET /health/startup` -> 200 once `started_flag` is set, else 503
/// - `GET /healthz`, `GET /health/live` -> always 200
/// - `GET /readyz`, `GET /health/ready` -> 200 when the readiness callback
///   (if any) and, with the `health` feature, the `HealthRegistry` both report
///   ready, else 503
/// - anything else -> 404
///
/// **Path ordering:** `/metrics/manifest` MUST be checked BEFORE `/metrics`
/// because `"GET /metrics/manifest"` also matches `starts_with("GET /metrics")`.
///
/// Every response carries `Connection: close`, because the connection is
/// dropped after one response and is never reused.
#[cfg(feature = "metrics")]
async fn handle_connection(
    mut stream: tokio::net::TcpStream,
    handle: PrometheusHandle,
    registry: MetricRegistry,
    readiness_fn: Option<ReadinessFn>,
    started_flag: &std::sync::atomic::AtomicBool,
) {
    use tokio::io::{AsyncBufReadExt, AsyncReadExt, AsyncWriteExt, BufReader};

    // Upper bound on the bytes consumed while looking for the end of the
    // request line; every known route fits comfortably.
    const MAX_REQUEST_LINE_BYTES: u64 = 8 * 1024;

    let mut request_line = String::new();
    {
        // `take` caps how much an untrusted peer can make us buffer.
        let mut reader = BufReader::new((&mut stream).take(MAX_REQUEST_LINE_BYTES));
        match reader.read_line(&mut request_line).await {
            // Nothing was sent, or the line was unreadable: nothing to answer.
            Ok(0) | Err(_) => return,
            Ok(_) => {}
        }
    }

    // IMPORTANT: /metrics/manifest MUST come before /metrics (prefix match ordering)
    let (status, content_type, body) = if request_line.starts_with("GET /metrics/manifest") {
        (
            "200 OK",
            "application/json",
            serde_json::to_string(&registry.manifest()).unwrap_or_default(),
        )
    } else if request_line.starts_with("GET /metrics") {
        ("200 OK", "text/plain; charset=utf-8", handle.render())
    } else if request_line.starts_with("GET /startupz")
        || request_line.starts_with("GET /health/startup")
    {
        if started_flag.load(std::sync::atomic::Ordering::Acquire) {
            (
                "200 OK",
                "application/json",
                r#"{"status":"started"}"#.to_string(),
            )
        } else {
            (
                "503 Service Unavailable",
                "application/json",
                r#"{"status":"starting"}"#.to_string(),
            )
        }
    } else if request_line.starts_with("GET /healthz")
        || request_line.starts_with("GET /health/live")
    {
        (
            "200 OK",
            "application/json",
            r#"{"status":"alive"}"#.to_string(),
        )
    } else if request_line.starts_with("GET /readyz")
        || request_line.starts_with("GET /health/ready")
    {
        // No callback configured counts as ready.
        let callback_ready = readiness_fn.as_ref().is_none_or(|f| f());

        #[cfg(feature = "health")]
        let registry_ready = crate::health::HealthRegistry::is_ready();
        #[cfg(not(feature = "health"))]
        let registry_ready = true;

        let ready = callback_ready && registry_ready;
        if ready {
            (
                "200 OK",
                "application/json",
                r#"{"status":"ready"}"#.to_string(),
            )
        } else {
            (
                "503 Service Unavailable",
                "application/json",
                r#"{"status":"not_ready"}"#.to_string(),
            )
        }
    } else {
        (
            "404 Not Found",
            "text/plain; charset=utf-8",
            "Not Found".to_string(),
        )
    };

    // `Connection: close` tells HTTP/1.1 clients not to attempt reuse.
    let response = format!(
        "HTTP/1.1 {status}\r\nContent-Type: {content_type}\r\nContent-Length: {}\r\nConnection: close\r\n\r\n{body}",
        body.len()
    );

    // Best effort: the peer may already have gone away.
    let _ = stream.write_all(response.as_bytes()).await;
}
1111
/// Build the readiness probe response for the axum endpoints.
///
/// Two checks are combined: the caller-supplied readiness callback (a missing
/// callback counts as ready) and, when the `health` feature is enabled, the
/// global [`HealthRegistry`](crate::health::HealthRegistry). The response is
/// 200 `{"status":"ready"}` only if both pass, otherwise 503
/// `{"status":"not_ready"}`. Both bodies are served as `application/json`.
#[cfg(all(feature = "metrics", feature = "http-server"))]
fn readiness_response(rf: Option<ReadinessFn>) -> axum::response::Response {
    use axum::http::{StatusCode, header::CONTENT_TYPE};
    use axum::response::IntoResponse;

    let callback_ready = rf.as_ref().is_none_or(|check| check());

    #[cfg(feature = "health")]
    let registry_ready = crate::health::HealthRegistry::is_ready();
    #[cfg(not(feature = "health"))]
    let registry_ready = true;

    // Choose status and body first, then assemble the response in one place.
    let (status, body) = if callback_ready && registry_ready {
        (StatusCode::OK, r#"{"status":"ready"}"#)
    } else {
        (
            StatusCode::SERVICE_UNAVAILABLE,
            r#"{"status":"not_ready"}"#,
        )
    };

    (status, [(CONTENT_TYPE, "application/json")], body).into_response()
}
1144
/// Serve `app` with axum until shutdown, refreshing metrics in the background.
///
/// A separate task updates the process and container metrics (when present)
/// on every `update_interval` tick. The axum server runs on `listener` and
/// begins a graceful shutdown once `shutdown_rx` resolves. After the server
/// has returned, the background task is told to stop. A server error is
/// logged, not propagated.
#[cfg(all(feature = "metrics", feature = "http-server"))]
async fn run_axum_server(
    listener: TcpListener,
    app: axum::Router,
    shutdown_rx: oneshot::Receiver<()>,
    update_interval: Duration,
    process_metrics: Option<ProcessMetrics>,
    container_metrics: Option<ContainerMetrics>,
) {
    let mut ticker = tokio::time::interval(update_interval);

    // Periodic metrics refresh, stopped through its own oneshot channel.
    let (stop_updates_tx, mut stop_updates_rx) = oneshot::channel::<()>();
    tokio::spawn(async move {
        loop {
            tokio::select! {
                _ = &mut stop_updates_rx => break,
                _ = ticker.tick() => {
                    if let Some(pm) = process_metrics.as_ref() {
                        pm.update();
                    }
                    if let Some(cm) = container_metrics.as_ref() {
                        cm.update();
                    }
                }
            }
        }
    });

    // Resolves when shutdown is requested; the receive result is irrelevant.
    let until_shutdown = async move {
        let _ = shutdown_rx.await;
    };

    let served = axum::serve(listener, app)
        .with_graceful_shutdown(until_shutdown)
        .await;
    if let Err(e) = served {
        tracing::error!(error = %e, "Metrics axum server error");
    }

    let _ = stop_updates_tx.send(());
}
1185
/// Standard histogram bucket bounds for latency measurements.
///
/// Returns twelve ascending bounds, from `0.001` up to `10.0`.
#[must_use]
pub fn latency_buckets() -> Vec<f64> {
    const BOUNDS: [f64; 12] = [
        0.001, 0.005, 0.01, 0.025, 0.05, 0.1, 0.25, 0.5, 1.0, 2.5, 5.0, 10.0,
    ];
    BOUNDS.to_vec()
}
1193
/// Standard histogram bucket bounds for size measurements.
///
/// Returns six ascending bounds, each ten times the previous one, from `100`
/// up to `10_000_000`.
#[must_use]
pub fn size_buckets() -> Vec<f64> {
    const BOUNDS: [f64; 6] = [
        100.0,
        1_000.0,
        10_000.0,
        100_000.0,
        1_000_000.0,
        10_000_000.0,
    ];
    BOUNDS.to_vec()
}
1206
#[cfg(test)]
mod tests {
    use super::*;

    /// The default config has no namespace, enables both collectors and
    /// refreshes every 15 seconds.
    #[test]
    fn test_metrics_config_default() {
        let defaults = MetricsConfig::default();

        assert!(defaults.namespace.is_empty());
        assert!(defaults.enable_process_metrics);
        assert!(defaults.enable_container_metrics);
        assert_eq!(defaults.update_interval, Duration::from_secs(15));
    }

    /// Twelve latency bounds, with the first below the last.
    #[test]
    fn test_latency_buckets() {
        let bounds = latency_buckets();
        assert_eq!(bounds.len(), 12);

        let [first, .., last] = bounds.as_slice() else {
            panic!("latency buckets must hold at least two bounds");
        };
        assert!(first < last);
    }

    /// Six size bounds, with the first below the last.
    #[test]
    fn test_size_buckets() {
        let bounds = size_buckets();
        assert_eq!(bounds.len(), 6);

        let [first, .., last] = bounds.as_slice() else {
            panic!("size buckets must hold at least two bounds");
        };
        assert!(first < last);
    }
}