Skip to main content

hyperi_rustlib/metrics/
mod.rs

1// Project:   hyperi-rustlib
2// File:      src/metrics/mod.rs
3// Purpose:   Prometheus metrics with process and container awareness
4// Language:  Rust
5//
6// License:   BUSL-1.1
7// Copyright: (c) 2026 HYPERI PTY LIMITED
8
9//! Metrics with Prometheus and/or OpenTelemetry backends.
10//!
11//! Provides production-ready metrics collection with support for:
12//!
13//! - **`metrics` feature only:** Prometheus scrape endpoint via `/metrics`
14//! - **`otel-metrics` feature only:** OTLP push to OTel-compatible backends
15//! - **Both features:** Fanout recorder sends to both Prometheus AND OTel
16//!
17//! ## Features
18//!
19//! - Counter, Gauge, Histogram metric types
20//! - Automatic process metrics (CPU, memory, file descriptors)
21//! - Container metrics from cgroups (memory limit, CPU limit)
22//! - Built-in HTTP server for `/metrics` endpoint (Prometheus)
23//! - OTLP push to HyperDX, Jaeger, Grafana, etc. (OTel)
24//! - Readiness callback for `/health/ready` endpoints
25//! - Optional scaling pressure endpoint (`/scaling/pressure`)
26//! - Optional memory guard endpoint (`/memory/pressure`)
27//! - Custom route support via [`start_server_with_routes`](MetricsManager::start_server_with_routes)
28//!
29//! ## Basic Example
30//!
31//! ```rust,no_run
32//! use hyperi_rustlib::metrics::{MetricsManager, MetricsConfig};
33//!
34//! #[tokio::main]
35//! async fn main() {
36//!     let mut manager = MetricsManager::new("myapp");
37//!
38//!     // Create metrics
39//!     let requests = manager.counter("requests_total", "Total requests");
40//!     let active = manager.gauge("active_connections", "Active connections");
41//!     let latency = manager.histogram("request_duration_seconds", "Request latency");
42//!
43//!     // Start metrics server (simple -- built-in endpoints only)
44//!     manager.start_server("0.0.0.0:9090").await.unwrap();
45//!
46//!     // Record metrics
47//!     requests.increment(1);
48//!     active.set(42.0);
49//!     latency.record(0.123);
50//! }
51//! ```
52//!
53//! ## Advanced Example (with custom routes, scaling, memory)
54//!
55//! Requires features: `metrics`, `http-server`, `scaling`, `memory`.
56//!
57//! ```rust,ignore
58//! use std::sync::Arc;
59//! use hyperi_rustlib::metrics::MetricsManager;
60//! use hyperi_rustlib::scaling::{ScalingPressure, ScalingPressureConfig};
61//! use hyperi_rustlib::memory::{MemoryGuard, MemoryGuardConfig};
62//! use axum::{Router, routing::post};
63//!
64//! let mut mgr = MetricsManager::new("myapp");
65//!
66//! // Readiness callback
67//! mgr.set_readiness_check(|| true);
68//!
69//! // Attach scaling pressure (adds /scaling/pressure endpoint)
70//! let scaling = Arc::new(ScalingPressure::new(ScalingPressureConfig::default(), vec![]));
71//! mgr.set_scaling_pressure(scaling);
72//!
73//! // Attach memory guard (adds /memory/pressure endpoint)
74//! let guard = Arc::new(MemoryGuard::new(MemoryGuardConfig::default()));
75//! mgr.set_memory_guard(guard);
76//!
77//! // Service-specific routes
78//! let custom = Router::new()
79//!     .route("/test", post(|| async { "ok" }));
80//!
81//! // Start with everything merged into one server
82//! mgr.start_server_with_routes("0.0.0.0:9090", custom).await.unwrap();
83//! ```
84
85mod container;
86pub mod dfe;
87pub mod labels;
88pub mod manifest;
89mod process;
90
91pub use labels::{AuthFailureReason, FlushTrigger, TransportKind, ValidationFailureReason};
92
93#[cfg(feature = "otel-metrics")]
94pub(crate) mod otel;
95#[cfg(feature = "otel-metrics")]
96pub mod otel_types;
97
98use std::net::SocketAddr;
99use std::sync::Arc;
100use std::time::Duration;
101
102use metrics::{Counter, Gauge, Histogram, Unit};
103use thiserror::Error;
104use tokio::net::TcpListener;
105use tokio::sync::oneshot;
106
/// Readiness check callback type.
///
/// A shared closure (`Send + Sync`, behind an [`Arc`]) that returns `true`
/// when the service considers itself ready. It is installed through
/// [`MetricsManager::set_readiness_check`] and cloned into the HTTP server
/// when the server is started.
pub type ReadinessFn = Arc<dyn Fn() -> bool + Send + Sync>;
109
110#[cfg(feature = "metrics")]
111use metrics_exporter_prometheus::PrometheusHandle;
112
113pub use container::ContainerMetrics;
114pub use dfe::DfeMetrics;
115#[cfg(feature = "metrics-dfe")]
116pub mod dfe_groups;
117pub use manifest::{ManifestResponse, MetricDescriptor, MetricRegistry, MetricType};
118pub use process::ProcessMetrics;
119
120#[cfg(feature = "otel-metrics")]
121pub use otel_types::{OtelMetricsConfig, OtelProtocol};
122
/// Cloneable wrapper around the Prometheus handle, used to render metrics text.
///
/// Instances are produced by [`MetricsManager::render_handle`]. Because the
/// type is `Clone`, a copy can be moved into each `axum` route handler or
/// spawned task that needs to render metrics.
#[cfg(feature = "metrics")]
#[derive(Clone)]
pub struct RenderHandle(PrometheusHandle);

#[cfg(feature = "metrics")]
impl RenderHandle {
    /// Return the current metrics rendered in the Prometheus text format.
    #[must_use]
    pub fn render(&self) -> String {
        let Self(prometheus) = self;
        prometheus.render()
    }
}
139
/// Metrics errors.
///
/// Returned by the server-management methods on [`MetricsManager`]. The
/// `Display` text of each variant comes from its `#[error(...)]` attribute.
#[derive(Debug, Error)]
pub enum MetricsError {
    /// Failed to build metrics exporter.
    ///
    /// The payload is a human-readable description of the failure.
    #[error("failed to build metrics exporter: {0}")]
    BuildError(String),

    /// Failed to start metrics server.
    ///
    /// Used by `start_server` for an unparsable listen address, a failed
    /// socket bind, and a missing Prometheus handle. The payload carries the
    /// underlying reason as text.
    #[error("failed to start metrics server: {0}")]
    ServerError(String),

    /// Server already running.
    ///
    /// Returned when a start is requested while a shutdown sender is already
    /// stored on the manager.
    #[error("metrics server already running")]
    AlreadyRunning,

    /// Server not running.
    #[error("metrics server not running")]
    NotRunning,
}
159
/// Metrics configuration.
///
/// Use [`MetricsConfig::default`] with struct-update syntax to override only
/// the fields you care about.
#[derive(Debug, Clone)]
pub struct MetricsConfig {
    /// Metric namespace prefix.
    ///
    /// Defaults to the empty string.
    pub namespace: String,
    /// Enable process metrics collection.
    ///
    /// Defaults to `true`.
    pub enable_process_metrics: bool,
    /// Enable container metrics collection.
    ///
    /// Defaults to `true`.
    pub enable_container_metrics: bool,
    /// Update interval for auto-collected metrics.
    ///
    /// Defaults to 15 seconds.
    pub update_interval: Duration,
    /// OTel-specific configuration (only used when `otel-metrics` feature is enabled).
    #[cfg(feature = "otel-metrics")]
    pub otel: OtelMetricsConfig,
}
175
176impl Default for MetricsConfig {
177    fn default() -> Self {
178        Self {
179            namespace: String::new(),
180            enable_process_metrics: true,
181            enable_container_metrics: true,
182            update_interval: Duration::from_secs(15),
183            #[cfg(feature = "otel-metrics")]
184            otel: OtelMetricsConfig::default(),
185        }
186    }
187}
188
/// Intermediate struct to pass recorder setup results across cfg boundaries.
///
/// Each field exists only when its feature is compiled in, so the set of
/// fields matches exactly what `install_recorders` can produce for the
/// active feature combination.
struct RecorderSetup {
    /// Handle of the Prometheus recorder built during setup.
    #[cfg(feature = "metrics")]
    prom_handle: Option<PrometheusHandle>,
    /// OTel meter provider; `None` when building the OTel recorder failed.
    #[cfg(feature = "otel-metrics")]
    otel_provider: Option<opentelemetry_sdk::metrics::SdkMeterProvider>,
}
196
/// Install the metrics recorder(s) based on enabled features.
///
/// Returns setup results containing handles/providers. When both
/// `metrics` and `otel-metrics` features are enabled, uses `metrics-util`
/// `FanoutBuilder` to compose both recorders into a single global recorder.
///
/// Exactly one of the three cfg-gated blocks below is compiled for a given
/// feature combination, and that block is the function's tail expression.
///
/// Failure to install the global recorder is never fatal: it is logged at
/// `warn` level and the already-installed recorder is left in place.
///
/// NOTE(review): when installation fails, the returned Prometheus handle
/// still belongs to the recorder built here rather than to the recorder that
/// is actually installed globally -- presumably it will not reflect values
/// recorded through the `metrics` macros; confirm against the exporter docs.
// `config` is not referenced in the Prometheus-only build, hence the allow.
#[allow(unused_variables)]
fn install_recorders(config: &MetricsConfig) -> RecorderSetup {
    // --- Prometheus only (no OTel) ---
    #[cfg(all(feature = "metrics", not(feature = "otel-metrics")))]
    {
        let recorder = metrics_exporter_prometheus::PrometheusBuilder::new().build_recorder();
        // Take the handle before the recorder is moved into the global slot.
        let handle = recorder.handle();
        if let Err(e) = metrics::set_global_recorder(recorder) {
            tracing::warn!(error = %e, "global metrics recorder already installed; keeping existing");
        }
        RecorderSetup {
            prom_handle: Some(handle),
        }
    }

    // --- OTel only (no Prometheus) ---
    #[cfg(all(feature = "otel-metrics", not(feature = "metrics")))]
    {
        match otel::build_otel_recorder(&config.namespace, &config.otel) {
            Ok((otel_recorder, provider)) => {
                // The provider is registered globally and a clone is kept in
                // the returned setup.
                opentelemetry::global::set_meter_provider(provider.clone());
                if let Err(e) = metrics::set_global_recorder(otel_recorder) {
                    tracing::warn!(error = %e, "global metrics recorder already installed; keeping existing");
                }
                RecorderSetup {
                    otel_provider: Some(provider),
                }
            }
            Err(e) => {
                // No recorder is installed at all in this case.
                tracing::warn!(error = %e, "Failed to build OTel metrics recorder");
                RecorderSetup {
                    otel_provider: None,
                }
            }
        }
    }

    // --- Both Prometheus + OTel (Fanout) ---
    #[cfg(all(feature = "metrics", feature = "otel-metrics"))]
    {
        // Build Prometheus recorder (without installing globally)
        let prom_recorder = metrics_exporter_prometheus::PrometheusBuilder::new().build_recorder();
        // Take the handle now: the recorder is moved into the fanout (or the
        // global slot) below.
        let prom_handle = prom_recorder.handle();

        // Build OTel recorder
        match otel::build_otel_recorder(&config.namespace, &config.otel) {
            Ok((otel_recorder, provider)) => {
                opentelemetry::global::set_meter_provider(provider.clone());

                // Compose via Fanout: both recorders receive every measurement
                let fanout = metrics_util::layers::FanoutBuilder::default()
                    .add_recorder(prom_recorder)
                    .add_recorder(otel_recorder)
                    .build();

                if let Err(e) = metrics::set_global_recorder(fanout) {
                    tracing::warn!(error = %e, "global metrics recorder already installed; keeping existing");
                }

                RecorderSetup {
                    prom_handle: Some(prom_handle),
                    otel_provider: Some(provider),
                }
            }
            Err(e) => {
                // Fallback: just Prometheus if OTel fails
                tracing::warn!(error = %e, "Failed to build OTel recorder, falling back to Prometheus only");
                if let Err(e) = metrics::set_global_recorder(prom_recorder) {
                    tracing::warn!(error = %e, "global metrics recorder already installed; keeping existing");
                }
                RecorderSetup {
                    prom_handle: Some(prom_handle),
                    otel_provider: None,
                }
            }
        }
    }
}
280
/// Metrics manager handling Prometheus and/or OTel exposition.
///
/// Owns the recorder handles, the manifest registry, the optional
/// process/container collectors, and the state needed to run the built-in
/// HTTP server.
pub struct MetricsManager {
    /// Prometheus handle used to render `/metrics`; `None` when the manager
    /// was built without installing a recorder (e.g. the test constructor).
    #[cfg(feature = "metrics")]
    handle: Option<PrometheusHandle>,
    /// Configuration this manager was created with.
    config: MetricsConfig,
    /// Sender half of the server shutdown channel. `Some` is also what the
    /// start methods treat as "server already running".
    shutdown_tx: Option<oneshot::Sender<()>>,
    /// Process metrics collector; present when enabled in the config.
    process_metrics: Option<ProcessMetrics>,
    /// Container metrics collector; present when enabled in the config.
    container_metrics: Option<ContainerMetrics>,
    /// Optional readiness callback set via `set_readiness_check`.
    readiness_fn: Option<ReadinessFn>,
    /// Startup flag; starts `false` and is set to `true` by `mark_started`.
    started: Arc<std::sync::atomic::AtomicBool>,
    /// Manifest registry that receives a descriptor for every metric created
    /// through this manager.
    registry: MetricRegistry,
    /// Scaling pressure source attached via `set_scaling_pressure`.
    #[cfg(all(feature = "metrics", feature = "scaling"))]
    scaling_pressure: Option<Arc<crate::scaling::ScalingPressure>>,
    /// Memory guard attached via `set_memory_guard`.
    #[cfg(all(feature = "metrics", feature = "memory"))]
    memory_guard: Option<Arc<crate::memory::MemoryGuard>>,
    /// OTel meter provider returned by recorder setup, if one was built.
    #[cfg(feature = "otel-metrics")]
    otel_provider: Option<opentelemetry_sdk::metrics::SdkMeterProvider>,
}
299
300impl MetricsManager {
301    /// Create a new metrics manager with the given namespace.
302    #[must_use]
303    pub fn new(namespace: &str) -> Self {
304        Self::with_config(MetricsConfig {
305            namespace: namespace.to_string(),
306            ..Default::default()
307        })
308    }
309
310    /// Create a metrics manager for tests without installing a global recorder.
311    ///
312    /// The global Prometheus recorder can only be installed once per process.
313    /// Tests that call `MetricsManager::new()` in parallel all race to install
314    /// it and all but the first panic with `SetRecorderError`.
315    ///
316    /// This constructor skips recorder installation entirely. `metrics!` macros
317    /// become no-ops (the crate's documented behaviour when no recorder is set),
318    /// but manifest registry tracking, descriptor push, and namespace logic all
319    /// work normally -- which is what the tests actually verify.
320    #[cfg(test)]
321    pub(crate) fn new_for_test(namespace: &str) -> Self {
322        let config = MetricsConfig {
323            namespace: namespace.to_string(),
324            enable_process_metrics: false,
325            enable_container_metrics: false,
326            ..Default::default()
327        };
328
329        let registry = MetricRegistry::new(&config.namespace);
330
331        Self {
332            #[cfg(feature = "metrics")]
333            handle: None,
334            registry,
335            config,
336            shutdown_tx: None,
337            process_metrics: None,
338            container_metrics: None,
339            readiness_fn: None,
340            started: Arc::new(std::sync::atomic::AtomicBool::new(false)),
341            #[cfg(all(feature = "metrics", feature = "scaling"))]
342            scaling_pressure: None,
343            #[cfg(all(feature = "metrics", feature = "memory"))]
344            memory_guard: None,
345            #[cfg(feature = "otel-metrics")]
346            otel_provider: None,
347        }
348    }
349
350    /// Create a metrics manager with custom configuration.
351    ///
352    /// Installs the appropriate recorder(s) based on enabled features:
353    /// - `metrics` only: Prometheus recorder
354    /// - `otel-metrics` only: OTel recorder (OTLP push)
355    /// - Both: Fanout recorder (Prometheus scrape + OTel OTLP push)
356    #[must_use]
357    pub fn with_config(config: MetricsConfig) -> Self {
358        let setup = install_recorders(&config);
359
360        let process_metrics = if config.enable_process_metrics {
361            Some(ProcessMetrics::new(&config.namespace))
362        } else {
363            None
364        };
365
366        let container_metrics = if config.enable_container_metrics {
367            Some(ContainerMetrics::new(&config.namespace))
368        } else {
369            None
370        };
371
372        let registry = MetricRegistry::new(&config.namespace);
373
374        Self {
375            #[cfg(feature = "metrics")]
376            handle: setup.prom_handle,
377            registry,
378            config,
379            shutdown_tx: None,
380            process_metrics,
381            container_metrics,
382            readiness_fn: None,
383            started: Arc::new(std::sync::atomic::AtomicBool::new(false)),
384            #[cfg(all(feature = "metrics", feature = "scaling"))]
385            scaling_pressure: None,
386            #[cfg(all(feature = "metrics", feature = "memory"))]
387            memory_guard: None,
388            #[cfg(feature = "otel-metrics")]
389            otel_provider: setup.otel_provider,
390        }
391    }
392
393    /// Create a counter metric.
394    ///
395    /// Automatically registers a [`MetricDescriptor`] in the manifest registry
396    /// with empty labels and `group = "custom"`.
397    #[must_use]
398    pub fn counter(&self, name: &str, description: &str) -> Counter {
399        let key = self.prefixed_key(name);
400        let desc = description.to_string();
401        metrics::describe_counter!(key.clone(), desc.clone());
402        self.registry.push(MetricDescriptor {
403            name: key.clone(),
404            metric_type: MetricType::Counter,
405            description: desc,
406            unit: String::new(),
407            labels: vec![],
408            group: "custom".into(),
409            buckets: None,
410            use_cases: vec![],
411            dashboard_hint: None,
412        });
413        metrics::counter!(key)
414    }
415
416    /// Create a counter with label keys and group metadata for the manifest.
417    ///
418    /// The `labels` parameter declares label **key names** for the manifest.
419    /// The returned `Counter` is label-free -- apply label values at recording
420    /// time via `metrics::counter!(key, "label" => value)`.
421    #[must_use]
422    pub fn counter_with_labels(
423        &self,
424        name: &str,
425        description: &str,
426        labels: &[&str],
427        group: &str,
428    ) -> Counter {
429        let key = self.prefixed_key(name);
430        let desc = description.to_string();
431        metrics::describe_counter!(key.clone(), desc.clone());
432        self.registry.push(MetricDescriptor {
433            name: key.clone(),
434            metric_type: MetricType::Counter,
435            description: desc,
436            unit: String::new(),
437            labels: labels.iter().map(|s| (*s).to_string()).collect(),
438            group: group.into(),
439            buckets: None,
440            use_cases: vec![],
441            dashboard_hint: None,
442        });
443        metrics::counter!(key)
444    }
445
446    /// Create a gauge metric.
447    ///
448    /// Automatically registers a [`MetricDescriptor`] in the manifest registry
449    /// with empty labels and `group = "custom"`.
450    #[must_use]
451    pub fn gauge(&self, name: &str, description: &str) -> Gauge {
452        let key = self.prefixed_key(name);
453        let desc = description.to_string();
454        metrics::describe_gauge!(key.clone(), desc.clone());
455        self.registry.push(MetricDescriptor {
456            name: key.clone(),
457            metric_type: MetricType::Gauge,
458            description: desc,
459            unit: String::new(),
460            labels: vec![],
461            group: "custom".into(),
462            buckets: None,
463            use_cases: vec![],
464            dashboard_hint: None,
465        });
466        metrics::gauge!(key)
467    }
468
469    /// Create a gauge with label keys and group metadata for the manifest.
470    #[must_use]
471    pub fn gauge_with_labels(
472        &self,
473        name: &str,
474        description: &str,
475        labels: &[&str],
476        group: &str,
477    ) -> Gauge {
478        let key = self.prefixed_key(name);
479        let desc = description.to_string();
480        metrics::describe_gauge!(key.clone(), desc.clone());
481        self.registry.push(MetricDescriptor {
482            name: key.clone(),
483            metric_type: MetricType::Gauge,
484            description: desc,
485            unit: String::new(),
486            labels: labels.iter().map(|s| (*s).to_string()).collect(),
487            group: group.into(),
488            buckets: None,
489            use_cases: vec![],
490            dashboard_hint: None,
491        });
492        metrics::gauge!(key)
493    }
494
495    /// Create a histogram metric with default buckets.
496    ///
497    /// Automatically registers a [`MetricDescriptor`] in the manifest registry
498    /// with empty labels and `group = "custom"`.
499    #[must_use]
500    pub fn histogram(&self, name: &str, description: &str) -> Histogram {
501        let key = self.prefixed_key(name);
502        let desc = description.to_string();
503        metrics::describe_histogram!(key.clone(), Unit::Seconds, desc.clone());
504        self.registry.push(MetricDescriptor {
505            name: key.clone(),
506            metric_type: MetricType::Histogram,
507            description: desc,
508            unit: "seconds".into(),
509            labels: vec![],
510            group: "custom".into(),
511            buckets: None,
512            use_cases: vec![],
513            dashboard_hint: None,
514        });
515        metrics::histogram!(key)
516    }
517
518    /// Create a histogram with label keys, group, and optional buckets for the manifest.
519    #[must_use]
520    pub fn histogram_with_labels(
521        &self,
522        name: &str,
523        description: &str,
524        labels: &[&str],
525        group: &str,
526        buckets: Option<&[f64]>,
527    ) -> Histogram {
528        let key = self.prefixed_key(name);
529        let desc = description.to_string();
530        metrics::describe_histogram!(key.clone(), Unit::Seconds, desc.clone());
531        self.registry.push(MetricDescriptor {
532            name: key.clone(),
533            metric_type: MetricType::Histogram,
534            description: desc,
535            unit: "seconds".into(),
536            labels: labels.iter().map(|s| (*s).to_string()).collect(),
537            group: group.into(),
538            buckets: buckets.map(|b| b.to_vec()),
539            use_cases: vec![],
540            dashboard_hint: None,
541        });
542        metrics::histogram!(key)
543    }
544
545    /// Create a histogram metric with custom buckets.
546    ///
547    /// **Note:** The `buckets` parameter is captured in the manifest registry
548    /// but currently ignored by the `metrics` crate at runtime (buckets are set
549    /// globally at recorder installation time).
550    #[must_use]
551    pub fn histogram_with_buckets(
552        &self,
553        name: &str,
554        description: &str,
555        buckets: &[f64],
556    ) -> Histogram {
557        let key = self.prefixed_key(name);
558        let desc = description.to_string();
559        metrics::describe_histogram!(key.clone(), Unit::Seconds, desc.clone());
560        self.registry.push(MetricDescriptor {
561            name: key.clone(),
562            metric_type: MetricType::Histogram,
563            description: desc,
564            unit: "seconds".into(),
565            labels: vec![],
566            group: "custom".into(),
567            buckets: Some(buckets.to_vec()),
568            use_cases: vec![],
569            dashboard_hint: None,
570        });
571        metrics::histogram!(key)
572    }
573
574    /// Get the Prometheus metrics output.
575    ///
576    /// Returns the rendered Prometheus text format. Only available when
577    /// the `metrics` feature is enabled.
578    #[cfg(feature = "metrics")]
579    #[must_use]
580    pub fn render(&self) -> String {
581        self.handle
582            .as_ref()
583            .map_or_else(String::new, PrometheusHandle::render)
584    }
585
586    /// Get a cloneable render handle for use in route handlers.
587    ///
588    /// Returns a closure that renders the current Prometheus metrics text.
589    /// The closure is `Send + Sync + Clone`, making it safe to move into
590    /// `axum` route handlers or share across tasks via `Arc`.
591    ///
592    /// Returns `None` if no Prometheus recorder is installed.
593    ///
594    /// # Example
595    ///
596    /// ```rust,ignore
597    /// let mut mgr = MetricsManager::new("myapp");
598    /// let render = mgr.render_handle().expect("recorder installed");
599    ///
600    /// // Use in axum route
601    /// let route = axum::Router::new().route("/metrics", axum::routing::get(move || {
602    ///     let r = render.clone();
603    ///     async move { r() }
604    /// }));
605    ///
606    /// mgr.start_server_with_routes("0.0.0.0:9090", route).await?;
607    /// ```
608    #[cfg(feature = "metrics")]
609    #[must_use]
610    pub fn render_handle(&self) -> Option<RenderHandle> {
611        self.handle.clone().map(RenderHandle)
612    }
613
614    /// Set a readiness check callback.
615    ///
616    /// When set, `/readyz` and `/health/ready` call this function and return
617    /// 503 Service Unavailable if it returns `false`. Without a callback,
618    /// these endpoints always return 200.
619    pub fn set_readiness_check(&mut self, f: impl Fn() -> bool + Send + Sync + 'static) {
620        self.readiness_fn = Some(Arc::new(f));
621    }
622
623    /// Mark the service as started (startup probe passes).
624    ///
625    /// Call this once init is complete. K8s `startupProbe` hits `/startupz`
626    /// which returns 503 until this is called, then 200 thereafter.
627    /// Separate from readiness -- startup has a longer timeout for slow starters.
628    pub fn mark_started(&self) {
629        self.started
630            .store(true, std::sync::atomic::Ordering::Release);
631    }
632
633    /// Get a clone of the started flag (for passing to HTTP handler).
634    pub(crate) fn started_flag(&self) -> Arc<std::sync::atomic::AtomicBool> {
635        Arc::clone(&self.started)
636    }
637
638    /// Attach a `ScalingPressure` instance.
639    ///
640    /// When set and using `start_server_with_routes`, a `/scaling/pressure`
641    /// endpoint is automatically added that returns the current pressure value.
642    #[cfg(all(feature = "metrics", feature = "scaling"))]
643    pub fn set_scaling_pressure(&mut self, sp: Arc<crate::scaling::ScalingPressure>) {
644        self.scaling_pressure = Some(sp);
645    }
646
647    /// Attach a `MemoryGuard` instance.
648    ///
649    /// When set and using `start_server_with_routes`, a `/memory/pressure`
650    /// endpoint is automatically added that returns the current memory status.
651    #[cfg(all(feature = "metrics", feature = "memory"))]
652    pub fn set_memory_guard(&mut self, mg: Arc<crate::memory::MemoryGuard>) {
653        self.memory_guard = Some(mg);
654    }
655
656    /// Update process and container metrics.
657    pub fn update(&self) {
658        if let Some(ref pm) = self.process_metrics {
659            pm.update();
660        }
661        if let Some(ref cm) = self.container_metrics {
662            cm.update();
663        }
664    }
665
666    /// Start the metrics HTTP server.
667    ///
668    /// Serves `/metrics` (Prometheus), `/healthz`, `/health/live`,
669    /// `/readyz`, `/health/ready` endpoints.
670    ///
671    /// Only available when the `metrics` feature is enabled (for scraping).
672    ///
673    /// # Errors
674    ///
675    /// Returns an error if the server fails to start.
676    #[cfg(feature = "metrics")]
677    pub async fn start_server(&mut self, addr: &str) -> Result<(), MetricsError> {
678        if self.shutdown_tx.is_some() {
679            return Err(MetricsError::AlreadyRunning);
680        }
681
682        let addr: SocketAddr = addr
683            .parse()
684            .map_err(|e| MetricsError::ServerError(format!("invalid address: {e}")))?;
685
686        let listener = TcpListener::bind(addr)
687            .await
688            .map_err(|e| MetricsError::ServerError(e.to_string()))?;
689
690        let (shutdown_tx, shutdown_rx) = oneshot::channel();
691        self.shutdown_tx = Some(shutdown_tx);
692
693        let handle = self
694            .handle
695            .as_ref()
696            .ok_or_else(|| {
697                MetricsError::ServerError(
698                    "Prometheus handle not configured -- MetricsManager was created without a recorder".into(),
699                )
700            })?
701            .clone();
702        let update_interval = self.config.update_interval;
703        let process_metrics = self.process_metrics.clone();
704        let container_metrics = self.container_metrics.clone();
705        let readiness_fn = self.readiness_fn.clone();
706        let started_flag = self.started_flag();
707
708        let registry = self.registry();
709
710        tokio::spawn(async move {
711            run_server(
712                listener,
713                handle,
714                registry,
715                shutdown_rx,
716                update_interval,
717                process_metrics,
718                container_metrics,
719                readiness_fn,
720                started_flag,
721            )
722            .await;
723        });
724
725        Ok(())
726    }
727
728    /// Start the metrics HTTP server with additional custom routes.
729    ///
730    /// Serves the same built-in endpoints as [`start_server`](Self::start_server):
731    /// `/metrics`, `/healthz`, `/health/live`, `/readyz`, `/health/ready`.
732    ///
733    /// Additionally:
734    /// - If [`set_scaling_pressure`](Self::set_scaling_pressure) was called,
735    ///   adds `/scaling/pressure` returning the current pressure value.
736    /// - If [`set_memory_guard`](Self::set_memory_guard) was called,
737    ///   adds `/memory/pressure` returning memory status JSON.
738    /// - Any routes in `extra_routes` are merged (service-specific endpoints).
739    ///
740    /// Requires both `metrics` and `http-server` features.
741    ///
742    /// # Errors
743    ///
744    /// Returns an error if the server fails to start.
745    #[cfg(all(feature = "metrics", feature = "http-server"))]
746    pub async fn start_server_with_routes(
747        &mut self,
748        addr: &str,
749        extra_routes: axum::Router,
750    ) -> Result<(), MetricsError> {
751        if self.shutdown_tx.is_some() {
752            return Err(MetricsError::AlreadyRunning);
753        }
754
755        let addr: SocketAddr = addr
756            .parse()
757            .map_err(|e| MetricsError::ServerError(format!("invalid address: {e}")))?;
758
759        let listener = TcpListener::bind(addr)
760            .await
761            .map_err(|e| MetricsError::ServerError(e.to_string()))?;
762
763        let (shutdown_tx, shutdown_rx) = oneshot::channel();
764        self.shutdown_tx = Some(shutdown_tx);
765
766        let handle = self
767            .handle
768            .as_ref()
769            .ok_or_else(|| {
770                MetricsError::ServerError(
771                    "Prometheus handle not configured -- MetricsManager was created without a recorder".into(),
772                )
773            })?
774            .clone();
775        let update_interval = self.config.update_interval;
776        let process_metrics = self.process_metrics.clone();
777        let container_metrics = self.container_metrics.clone();
778        let readiness_fn = self.readiness_fn.clone();
779
780        // Build the axum router with built-in + optional + custom routes
781        let metrics_handle = handle.clone();
782        let readiness_for_live = readiness_fn.clone();
783        let registry_handle = self.registry();
784
785        let mut app = axum::Router::new()
786            .route(
787                "/metrics/manifest",
788                axum::routing::get(move || {
789                    let reg = registry_handle.clone();
790                    async move {
791                        (
792                            [(axum::http::header::CONTENT_TYPE, "application/json")],
793                            serde_json::to_string(&reg.manifest()).unwrap_or_default(),
794                        )
795                    }
796                }),
797            )
798            .route(
799                "/metrics",
800                axum::routing::get(move || {
801                    let h = metrics_handle.clone();
802                    async move { h.render() }
803                }),
804            )
805            .route("/startupz", {
806                let sf = self.started_flag();
807                axum::routing::get(move || {
808                    let started = sf.load(std::sync::atomic::Ordering::Acquire);
809                    async move {
810                        if started {
811                            (
812                                axum::http::StatusCode::OK,
813                                [(axum::http::header::CONTENT_TYPE, "application/json")],
814                                r#"{"status":"started"}"#,
815                            )
816                        } else {
817                            (
818                                axum::http::StatusCode::SERVICE_UNAVAILABLE,
819                                [(axum::http::header::CONTENT_TYPE, "application/json")],
820                                r#"{"status":"starting"}"#,
821                            )
822                        }
823                    }
824                })
825            })
826            .route(
827                "/healthz",
828                axum::routing::get(|| async {
829                    (
830                        [(axum::http::header::CONTENT_TYPE, "application/json")],
831                        r#"{"status":"alive"}"#,
832                    )
833                }),
834            )
835            .route(
836                "/health/live",
837                axum::routing::get(|| async {
838                    (
839                        [(axum::http::header::CONTENT_TYPE, "application/json")],
840                        r#"{"status":"alive"}"#,
841                    )
842                }),
843            )
844            .route(
845                "/readyz",
846                axum::routing::get(move || {
847                    let rf = readiness_fn.clone();
848                    async move { readiness_response(rf) }
849                }),
850            )
851            .route(
852                "/health/ready",
853                axum::routing::get(move || {
854                    let rf = readiness_for_live.clone();
855                    async move { readiness_response(rf) }
856                }),
857            );
858
859        // Add scaling pressure endpoint if configured
860        #[cfg(feature = "scaling")]
861        if let Some(ref sp) = self.scaling_pressure {
862            let sp = sp.clone();
863            app = app.route(
864                "/scaling/pressure",
865                axum::routing::get(move || {
866                    let s = sp.clone();
867                    async move { format!("{:.2}", s.calculate()) }
868                }),
869            );
870        }
871
872        // Add memory pressure endpoint if configured
873        #[cfg(feature = "memory")]
874        if let Some(ref mg) = self.memory_guard {
875            let mg = mg.clone();
876            app = app.route(
877                "/memory/pressure",
878                axum::routing::get(move || {
879                    let m = mg.clone();
880                    async move {
881                        (
882                            [(axum::http::header::CONTENT_TYPE, "application/json")],
883                            format!(
884                                r#"{{"under_pressure":{},"ratio":{:.3},"current_bytes":{},"limit_bytes":{}}}"#,
885                                m.under_pressure(),
886                                m.pressure_ratio(),
887                                m.current_bytes(),
888                                m.limit_bytes()
889                            ),
890                        )
891                    }
892                }),
893            );
894        }
895
896        // Merge service-specific routes
897        app = app.merge(extra_routes);
898
899        tokio::spawn(async move {
900            run_axum_server(
901                listener,
902                app,
903                shutdown_rx,
904                update_interval,
905                process_metrics,
906                container_metrics,
907            )
908            .await;
909        });
910
911        Ok(())
912    }
913
914    /// Stop the metrics server.
915    ///
916    /// # Errors
917    ///
918    /// Returns an error if the server is not running.
919    pub async fn stop_server(&mut self) -> Result<(), MetricsError> {
920        if let Some(tx) = self.shutdown_tx.take() {
921            let _ = tx.send(());
922            Ok(())
923        } else {
924            Err(MetricsError::NotRunning)
925        }
926    }
927
928    /// Gracefully shut down the OTel provider (flushes pending exports).
929    ///
930    /// Call this before application exit to ensure all metrics are exported.
931    #[cfg(feature = "otel-metrics")]
932    pub fn shutdown_otel(&mut self) {
933        if let Some(provider) = self.otel_provider.take()
934            && let Err(e) = provider.shutdown()
935        {
936            tracing::warn!(error = %e, "OTel provider shutdown error");
937        }
938    }
939
    /// Set application version and git commit for the manifest.
    ///
    /// Forwards `version` and `commit` unchanged to the registry's
    /// `set_build_info`.
    ///
    /// Uses interior mutability (writes through the registry's `Arc<RwLock>`),
    /// so only `&self` is needed. Called automatically by
    /// [`dfe_groups::AppMetrics::new()`] if the `metrics-dfe` feature is enabled.
    pub fn set_build_info(&self, version: &str, commit: &str) {
        self.registry.set_build_info(version, commit);
    }
948
    /// Set operational use cases for a metric (by full prefixed name).
    ///
    /// Delegates to the registry's `set_use_cases`; `metric_name` is passed
    /// through as given, so the caller supplies the already-prefixed name.
    ///
    /// No-op if the metric is not found in the registry.
    pub fn set_use_cases(&self, metric_name: &str, use_cases: &[&str]) {
        self.registry.set_use_cases(metric_name, use_cases);
    }
955
    /// Set the suggested Grafana panel type for a metric (by full prefixed name).
    ///
    /// Delegates to the registry's `set_dashboard_hint`; `metric_name` is
    /// passed through as given, so the caller supplies the already-prefixed name.
    ///
    /// No-op if the metric is not found in the registry.
    pub fn set_dashboard_hint(&self, metric_name: &str, hint: &str) {
        self.registry.set_dashboard_hint(metric_name, hint);
    }
962
    /// Get a cloneable handle to the metric registry.
    ///
    /// Returns a clone of the manager's own registry value.
    ///
    /// Use this to pass into route handlers. The handle is `Clone + Send + Sync`.
    /// Call before starting the server, consistent with the `render_handle()` pattern.
    #[must_use]
    pub fn registry(&self) -> MetricRegistry {
        self.registry.clone()
    }
971
    /// Get the namespace prefix (e.g. `dfe_loader`).
    ///
    /// Borrows the `namespace` field of the manager's configuration; the
    /// string may be empty when no namespace was configured.
    ///
    /// Used by [`dfe_groups`] metric structs to build labelled metric keys.
    #[must_use]
    pub fn namespace(&self) -> &str {
        &self.config.namespace
    }
979
980    /// Get prefixed metric name.
981    fn prefixed_key(&self, name: &str) -> String {
982        if self.config.namespace.is_empty() {
983            name.to_string()
984        } else {
985            format!("{}_{}", self.config.namespace, name)
986        }
987    }
988}
989
/// Run the built-in metrics HTTP server until shutdown is signalled.
///
/// A single loop multiplexes three event sources:
///
/// - `shutdown_rx` completing, which ends the loop;
/// - the `update_interval` ticker, which refreshes process and container
///   metrics when those collectors are present;
/// - incoming TCP connections, each served on its own spawned task by
///   [`handle_connection`].
///
/// Accept failures never terminate the server. Errors that belong to a single
/// failed connection attempt are skipped immediately; any other accept error
/// (for example file-descriptor exhaustion) is followed by a short pause so
/// that a persistent failure cannot turn this loop into a busy spin.
#[cfg(feature = "metrics")]
#[allow(clippy::too_many_arguments)]
async fn run_server(
    listener: TcpListener,
    handle: PrometheusHandle,
    registry: MetricRegistry,
    mut shutdown_rx: oneshot::Receiver<()>,
    update_interval: Duration,
    process_metrics: Option<ProcessMetrics>,
    container_metrics: Option<ContainerMetrics>,
    readiness_fn: Option<ReadinessFn>,
    started_flag: Arc<std::sync::atomic::AtomicBool>,
) {
    use std::io::ErrorKind;

    /// Pause applied after an accept error that is not tied to one connection.
    const ACCEPT_ERROR_BACKOFF: Duration = Duration::from_millis(100);

    let mut update_interval = tokio::time::interval(update_interval);

    loop {
        tokio::select! {
            _ = &mut shutdown_rx => {
                break;
            }
            _ = update_interval.tick() => {
                if let Some(ref pm) = process_metrics {
                    pm.update();
                }
                if let Some(ref cm) = container_metrics {
                    cm.update();
                }
            }
            result = listener.accept() => {
                match result {
                    Ok((stream, _)) => {
                        let handle = handle.clone();
                        let registry = registry.clone();
                        let readiness_fn = readiness_fn.clone();
                        let sf = Arc::clone(&started_flag);
                        tokio::spawn(async move {
                            handle_connection(stream, handle, registry, readiness_fn, &sf).await;
                        });
                    }
                    // The peer gave up before the connection was accepted;
                    // the listener itself is fine, so keep accepting.
                    Err(e)
                        if matches!(
                            e.kind(),
                            ErrorKind::ConnectionAborted
                                | ErrorKind::ConnectionReset
                                | ErrorKind::ConnectionRefused
                        ) => {}
                    // Anything else may repeat on every call, so back off
                    // briefly instead of retrying in a tight loop.
                    Err(_) => {
                        tokio::time::sleep(ACCEPT_ERROR_BACKOFF).await;
                    }
                }
            }
        }
    }
}
1033
/// Handle a single HTTP connection: read one request line, write one
/// response, then let the connection close.
///
/// Only `GET` is served. The request target is matched **exactly** after its
/// query string and one optional trailing slash are removed, so routing does
/// not depend on the order of the arms and look-alike paths such as
/// `/metricsfoo` get a 404.
///
/// | Path                             | Response                                   |
/// |----------------------------------|--------------------------------------------|
/// | `/metrics/manifest`              | registry manifest as JSON                  |
/// | `/metrics`                       | rendered Prometheus text                   |
/// | `/startupz`, `/health/startup`   | 200 once `started_flag` is set, else 503   |
/// | `/healthz`, `/health/live`       | always 200                                 |
/// | `/readyz`, `/health/ready`       | 200 when ready, else 503                   |
/// | anything else                    | 404                                        |
///
/// Readiness requires the optional `readiness_fn` callback to return `true`
/// (a missing callback counts as ready) and, when the `health` feature is
/// enabled, `HealthRegistry::is_ready()` to hold as well.
///
/// Read errors end the connection without a response; write errors are
/// ignored because there is nobody left to report them to.
#[cfg(feature = "metrics")]
async fn handle_connection(
    mut stream: tokio::net::TcpStream,
    handle: PrometheusHandle,
    registry: MetricRegistry,
    readiness_fn: Option<ReadinessFn>,
    started_flag: &std::sync::atomic::AtomicBool,
) {
    use tokio::io::{AsyncBufReadExt, AsyncReadExt, AsyncWriteExt, BufReader};

    /// Upper bound on the bytes read while looking for the request line, so a
    /// client that never sends a newline cannot grow the buffer without limit.
    const MAX_REQUEST_LINE_BYTES: u64 = 8 * 1024;
    const JSON: &str = "application/json";
    const TEXT: &str = "text/plain; charset=utf-8";

    let mut request_line = String::new();
    {
        // Scoped so the mutable borrow of `stream` ends before the write below.
        let mut reader = BufReader::new(&mut stream).take(MAX_REQUEST_LINE_BYTES);
        if reader.read_line(&mut request_line).await.is_err() {
            return;
        }
    }

    // Request line shape: `<method> <target> <version>`.
    let mut tokens = request_line.split_ascii_whitespace();
    let method = tokens.next().unwrap_or("");
    let target = tokens.next().unwrap_or("");

    // Routing ignores the query string and tolerates one trailing slash.
    let path = target.split('?').next().unwrap_or(target);
    let path = path.strip_suffix('/').unwrap_or(path);

    // Non-GET requests fall through to the 404 arm, as they always have.
    let route = if method == "GET" { path } else { "" };

    let (status, content_type, body) = match route {
        "/metrics/manifest" => (
            "200 OK",
            JSON,
            serde_json::to_string(&registry.manifest()).unwrap_or_default(),
        ),
        "/metrics" => ("200 OK", TEXT, handle.render()),
        "/startupz" | "/health/startup" => {
            if started_flag.load(std::sync::atomic::Ordering::Acquire) {
                ("200 OK", JSON, r#"{"status":"started"}"#.to_string())
            } else {
                (
                    "503 Service Unavailable",
                    JSON,
                    r#"{"status":"starting"}"#.to_string(),
                )
            }
        }
        "/healthz" | "/health/live" => ("200 OK", JSON, r#"{"status":"alive"}"#.to_string()),
        "/readyz" | "/health/ready" => {
            let callback_ready = readiness_fn.as_ref().is_none_or(|f| f());

            #[cfg(feature = "health")]
            let registry_ready = crate::health::HealthRegistry::is_ready();
            #[cfg(not(feature = "health"))]
            let registry_ready = true;

            if callback_ready && registry_ready {
                ("200 OK", JSON, r#"{"status":"ready"}"#.to_string())
            } else {
                (
                    "503 Service Unavailable",
                    JSON,
                    r#"{"status":"not_ready"}"#.to_string(),
                )
            }
        }
        _ => ("404 Not Found", TEXT, "Not Found".to_string()),
    };

    // This server handles exactly one request per connection, so say so:
    // HTTP/1.1 clients otherwise assume the connection stays open.
    let response = format!(
        "HTTP/1.1 {status}\r\nContent-Type: {content_type}\r\nContent-Length: {}\r\nConnection: close\r\n\r\n{body}",
        body.len()
    );

    let _ = stream.write_all(response.as_bytes()).await;
}
1127
/// Build the readiness response shared by the axum readiness routes.
///
/// Readiness is the conjunction of two checks:
///
/// - the caller-supplied callback `rf` (a missing callback counts as ready);
/// - with the `health` feature enabled, the global
///   [`HealthRegistry`](crate::health::HealthRegistry).
///
/// Returns 200 with `{"status":"ready"}` when both hold, otherwise 503 with
/// `{"status":"not_ready"}`. Both bodies are served as `application/json`.
#[cfg(all(feature = "metrics", feature = "http-server"))]
fn readiness_response(rf: Option<ReadinessFn>) -> axum::response::Response {
    use axum::http::StatusCode;
    use axum::http::header::CONTENT_TYPE;
    use axum::response::IntoResponse;

    let callback_ready = match rf.as_ref() {
        Some(check) => check(),
        None => true,
    };

    #[cfg(feature = "health")]
    let registry_ready = crate::health::HealthRegistry::is_ready();
    #[cfg(not(feature = "health"))]
    let registry_ready = true;

    let (status, body) = if callback_ready && registry_ready {
        (StatusCode::OK, r#"{"status":"ready"}"#)
    } else {
        (StatusCode::SERVICE_UNAVAILABLE, r#"{"status":"not_ready"}"#)
    };

    (status, [(CONTENT_TYPE, "application/json")], body).into_response()
}
1160
/// Serve `app` on `listener` with axum until `shutdown_rx` resolves.
///
/// A background task refreshes process and container metrics (when those
/// collectors are present) on every `update_interval` tick. Once the HTTP
/// server has returned, that task is told to stop. A server error is logged
/// rather than propagated.
#[cfg(all(feature = "metrics", feature = "http-server"))]
async fn run_axum_server(
    listener: TcpListener,
    app: axum::Router,
    shutdown_rx: oneshot::Receiver<()>,
    update_interval: Duration,
    process_metrics: Option<ProcessMetrics>,
    container_metrics: Option<ContainerMetrics>,
) {
    let mut ticker = tokio::time::interval(update_interval);

    // Periodic metrics refresh, stopped through its own oneshot channel.
    let (stop_updates, mut updates_stopped) = oneshot::channel::<()>();
    tokio::spawn(async move {
        loop {
            tokio::select! {
                _ = &mut updates_stopped => break,
                _ = ticker.tick() => {
                    if let Some(pm) = process_metrics.as_ref() {
                        pm.update();
                    }
                    if let Some(cm) = container_metrics.as_ref() {
                        cm.update();
                    }
                }
            }
        }
    });

    // Resolves on an explicit shutdown signal or when the sender is dropped.
    let shutdown_signal = async move {
        let _ = shutdown_rx.await;
    };

    let served = axum::serve(listener, app)
        .with_graceful_shutdown(shutdown_signal)
        .await;
    if let Err(e) = served {
        tracing::error!(error = %e, "Metrics axum server error");
    }

    // The updater may already be gone; a failed send is fine.
    let _ = stop_updates.send(());
}
1201
/// Default histogram bucket bounds for latency measurements.
///
/// Twelve ascending upper bounds from `0.001` to `10.0` (presumably seconds,
/// matching Prometheus convention — confirm against the metrics that use them).
#[must_use]
pub fn latency_buckets() -> Vec<f64> {
    const BOUNDS: [f64; 12] = [
        0.001, 0.005, 0.01, 0.025, 0.05, 0.1, 0.25, 0.5, 1.0, 2.5, 5.0, 10.0,
    ];
    BOUNDS.to_vec()
}
1209
/// Default histogram bucket bounds for size measurements.
///
/// Six ascending upper bounds, one per power of ten from `100` up to
/// `10_000_000` (presumably bytes — confirm against the metrics that use them).
#[must_use]
pub fn size_buckets() -> Vec<f64> {
    const BOUNDS: [f64; 6] = [
        100.0,
        1_000.0,
        10_000.0,
        100_000.0,
        1_000_000.0,
        10_000_000.0,
    ];
    BOUNDS.to_vec()
}
1222
#[cfg(test)]
mod tests {
    use super::*;

    /// True when every bound is strictly greater than the one before it.
    fn strictly_ascending(bounds: &[f64]) -> bool {
        bounds.windows(2).all(|pair| pair[0] < pair[1])
    }

    /// True when every bound is a finite, positive number.
    fn finite_and_positive(bounds: &[f64]) -> bool {
        bounds.iter().all(|b| b.is_finite() && *b > 0.0)
    }

    /// The default configuration has no namespace, enables both collectors,
    /// and refreshes every 15 seconds.
    #[test]
    fn test_metrics_config_default() {
        let config = MetricsConfig::default();
        assert!(config.namespace.is_empty());
        assert!(config.enable_process_metrics);
        assert!(config.enable_container_metrics);
        assert_eq!(config.update_interval, Duration::from_secs(15));
    }

    /// Latency buckets: expected count, and every bound is valid and ordered
    /// (not just the first and last).
    #[test]
    fn test_latency_buckets() {
        let buckets = latency_buckets();
        assert_eq!(buckets.len(), 12);
        assert!(finite_and_positive(&buckets));
        assert!(strictly_ascending(&buckets));
    }

    /// Size buckets: expected count, and every bound is valid and ordered
    /// (not just the first and last).
    #[test]
    fn test_size_buckets() {
        let buckets = size_buckets();
        assert_eq!(buckets.len(), 6);
        assert!(finite_and_positive(&buckets));
        assert!(strictly_ascending(&buckets));
    }
}