Skip to main content

hyperi_rustlib/metrics/
mod.rs

1// Project:   hyperi-rustlib
2// File:      src/metrics/mod.rs
3// Purpose:   Prometheus metrics with process and container awareness
4// Language:  Rust
5//
6// License:   BUSL-1.1
7// Copyright: (c) 2026 HYPERI PTY LIMITED
8
9//! Metrics with Prometheus and/or OpenTelemetry backends.
10//!
11//! - **`metrics` only:** Prometheus scrape endpoint via `/metrics`
12//! - **`otel-metrics` only:** OTLP push to OTel-compatible backends
13//! - **Both:** Fanout recorder sends to both Prometheus AND OTel
14//!
15//! Counter/Gauge/Histogram types, automatic process + cgroup container metrics,
16//! built-in HTTP server, readiness/startup probes, optional scaling-pressure and
17//! memory-guard endpoints, custom routes via
18//! `MetricsManager::start_server_with_routes` (`http-server` feature).
19//!
20//! ## Basic Example
21//!
22//! ```rust,no_run
23//! use hyperi_rustlib::metrics::{MetricsManager, MetricsConfig};
24//!
25//! #[tokio::main]
26//! async fn main() {
27//!     let mut manager = MetricsManager::new("myapp");
28//!
29//!     // Create metrics
30//!     let requests = manager.counter("requests_total", "Total requests");
31//!     let active = manager.gauge("active_connections", "Active connections");
32//!     let latency = manager.histogram("request_duration_seconds", "Request latency");
33//!
34//!     // Start metrics server (simple -- built-in endpoints only)
35//!     manager.start_server("0.0.0.0:9090").await.unwrap();
36//!
37//!     // Record metrics
38//!     requests.increment(1);
39//!     active.set(42.0);
40//!     latency.record(0.123);
41//! }
42//! ```
43//!
44//! ## Advanced Example (with custom routes, scaling, memory)
45//!
46//! Requires features: `metrics`, `http-server`, `scaling`, `memory`.
47//!
48//! ```rust,ignore
49//! use std::sync::Arc;
50//! use hyperi_rustlib::metrics::MetricsManager;
51//! use hyperi_rustlib::scaling::{ScalingPressure, ScalingPressureConfig};
52//! use hyperi_rustlib::memory::{MemoryGuard, MemoryGuardConfig};
53//! use axum::{Router, routing::post};
54//!
55//! let mut mgr = MetricsManager::new("myapp");
56//!
57//! // Readiness callback
58//! mgr.set_readiness_check(|| true);
59//!
60//! // Attach scaling pressure (adds /scaling/pressure endpoint)
61//! let scaling = Arc::new(ScalingPressure::new(ScalingPressureConfig::default(), vec![]));
62//! mgr.set_scaling_pressure(scaling);
63//!
64//! // Attach memory guard (adds /memory/pressure endpoint)
65//! let guard = Arc::new(MemoryGuard::new(MemoryGuardConfig::default()));
66//! mgr.set_memory_guard(guard);
67//!
68//! // Service-specific routes
69//! let custom = Router::new()
70//!     .route("/test", post(|| async { "ok" }));
71//!
72//! // Start with everything merged into one server
73//! mgr.start_server_with_routes("0.0.0.0:9090", custom).await.unwrap();
74//! ```
75
76mod container;
77pub mod dfe;
78pub mod labels;
79pub mod manifest;
80mod process;
81
82pub use labels::{AuthFailureReason, FlushTrigger, TransportKind, ValidationFailureReason};
83
84#[cfg(feature = "otel-metrics")]
85pub(crate) mod otel;
86#[cfg(feature = "otel-metrics")]
87pub mod otel_types;
88
89use std::net::SocketAddr;
90use std::sync::Arc;
91use std::time::Duration;
92
93use metrics::{Counter, Gauge, Histogram, Unit};
94use thiserror::Error;
95use tokio::net::TcpListener;
96use tokio::sync::oneshot;
97
/// Shared readiness predicate behind the readiness endpoints.
///
/// Returns `true` when the service should be reported ready. Wrapped in an
/// `Arc` so the callback can be cloned into server tasks.
pub type ReadinessFn = Arc<dyn Fn() -> bool + Sync + Send>;
100
101#[cfg(feature = "metrics")]
102use metrics_exporter_prometheus::PrometheusHandle;
103
104pub use container::ContainerMetrics;
105pub use dfe::DfeMetrics;
106#[cfg(feature = "metrics-dfe")]
107pub mod dfe_groups;
108pub use manifest::{ManifestResponse, MetricDescriptor, MetricRegistry, MetricType};
109pub use process::ProcessMetrics;
110
// Internal samplers reused by the scaling-pressure engine tick (cgroup CPU
// limit + this process's cumulative CPU seconds). Gated on both `scaling` and
// `expression`, matching the `cfg` below, so they are never dead code when the
// engine is absent.
114#[cfg(all(feature = "scaling", feature = "expression"))]
115pub(crate) use container::cpu_limit_cores;
116#[cfg(all(feature = "scaling", feature = "expression"))]
117pub(crate) use process::cumulative_cpu_seconds;
118
119#[cfg(feature = "otel-metrics")]
120pub use otel_types::{OtelMetricsConfig, OtelProtocol};
121
/// Clonable wrapper that renders the current Prometheus exposition text.
///
/// Returned by [`MetricsManager::render_handle`]. Clones may be moved into
/// `axum` route handlers or shared between tasks.
#[cfg(feature = "metrics")]
#[derive(Clone)]
pub struct RenderHandle(PrometheusHandle);

#[cfg(feature = "metrics")]
impl RenderHandle {
    /// Snapshot every recorded metric as Prometheus text.
    #[must_use]
    pub fn render(&self) -> String {
        let Self(prometheus) = self;
        prometheus.render()
    }
}
138
139/// Metrics errors.
140#[derive(Debug, Error)]
141pub enum MetricsError {
142    /// Failed to build metrics exporter.
143    #[error("failed to build metrics exporter: {0}")]
144    BuildError(String),
145
146    /// Failed to start metrics server.
147    #[error("failed to start metrics server: {0}")]
148    ServerError(String),
149
150    /// Server already running.
151    #[error("metrics server already running")]
152    AlreadyRunning,
153
154    /// Server not running.
155    #[error("metrics server not running")]
156    NotRunning,
157}
158
/// Settings consumed by [`MetricsManager::with_config`].
#[derive(Debug, Clone)]
pub struct MetricsConfig {
    /// Namespace used as the metric-name prefix (empty means no prefix).
    pub namespace: String,
    /// Whether to create the built-in process metrics collector.
    pub enable_process_metrics: bool,
    /// Whether to create the built-in container (cgroup) metrics collector.
    pub enable_container_metrics: bool,
    /// Refresh period for the automatically collected metrics.
    pub update_interval: Duration,
    /// OTel exporter settings; present only with the `otel-metrics` feature.
    #[cfg(feature = "otel-metrics")]
    pub otel: OtelMetricsConfig,
}

impl Default for MetricsConfig {
    /// Empty namespace, process and container collection enabled, and a
    /// 15-second refresh interval.
    fn default() -> Self {
        let update_interval = Duration::from_secs(15);
        Self {
            update_interval,
            enable_container_metrics: true,
            enable_process_metrics: true,
            namespace: String::default(),
            #[cfg(feature = "otel-metrics")]
            otel: OtelMetricsConfig::default(),
        }
    }
}
187
/// What [`install_recorders`] produced, carried back to the manager.
///
/// Each field exists only when its backend feature is compiled in, which is
/// why this is a struct instead of a tuple.
struct RecorderSetup {
    /// Handle for rendering Prometheus text, if a Prometheus recorder was built.
    #[cfg(feature = "metrics")]
    prom_handle: Option<PrometheusHandle>,
    /// OTel meter provider to be stored on the manager, if OTel setup succeeded.
    #[cfg(feature = "otel-metrics")]
    otel_provider: Option<opentelemetry_sdk::metrics::SdkMeterProvider>,
}
195
196/// Install the metrics recorder(s) for the enabled features. When both
197/// `metrics` and `otel-metrics` are on, composes both into one global
198/// recorder via `metrics-util` `FanoutBuilder`.
199#[allow(unused_variables)]
200fn install_recorders(config: &MetricsConfig) -> RecorderSetup {
201    // --- Prometheus only (no OTel) ---
202    #[cfg(all(feature = "metrics", not(feature = "otel-metrics")))]
203    {
204        let recorder = metrics_exporter_prometheus::PrometheusBuilder::new().build_recorder();
205        let handle = recorder.handle();
206        if let Err(e) = metrics::set_global_recorder(recorder) {
207            tracing::warn!(error = %e, "global metrics recorder already installed; keeping existing");
208        }
209        RecorderSetup {
210            prom_handle: Some(handle),
211        }
212    }
213
214    // --- OTel only (no Prometheus) ---
215    #[cfg(all(feature = "otel-metrics", not(feature = "metrics")))]
216    {
217        match otel::build_otel_recorder(&config.namespace, &config.otel) {
218            Ok((otel_recorder, provider)) => {
219                opentelemetry::global::set_meter_provider(provider.clone());
220                if let Err(e) = metrics::set_global_recorder(otel_recorder) {
221                    tracing::warn!(error = %e, "global metrics recorder already installed; keeping existing");
222                }
223                RecorderSetup {
224                    otel_provider: Some(provider),
225                }
226            }
227            Err(e) => {
228                tracing::warn!(error = %e, "Failed to build OTel metrics recorder");
229                RecorderSetup {
230                    otel_provider: None,
231                }
232            }
233        }
234    }
235
236    // --- Both Prometheus + OTel (Fanout) ---
237    #[cfg(all(feature = "metrics", feature = "otel-metrics"))]
238    {
239        // Build Prometheus recorder (without installing globally)
240        let prom_recorder = metrics_exporter_prometheus::PrometheusBuilder::new().build_recorder();
241        let prom_handle = prom_recorder.handle();
242
243        // Build OTel recorder
244        match otel::build_otel_recorder(&config.namespace, &config.otel) {
245            Ok((otel_recorder, provider)) => {
246                opentelemetry::global::set_meter_provider(provider.clone());
247
248                // Compose via Fanout: both recorders receive every measurement
249                let fanout = metrics_util::layers::FanoutBuilder::default()
250                    .add_recorder(prom_recorder)
251                    .add_recorder(otel_recorder)
252                    .build();
253
254                if let Err(e) = metrics::set_global_recorder(fanout) {
255                    tracing::warn!(error = %e, "global metrics recorder already installed; keeping existing");
256                }
257
258                RecorderSetup {
259                    prom_handle: Some(prom_handle),
260                    otel_provider: Some(provider),
261                }
262            }
263            Err(e) => {
264                // Fallback: just Prometheus if OTel fails
265                tracing::warn!(error = %e, "Failed to build OTel recorder, falling back to Prometheus only");
266                if let Err(e) = metrics::set_global_recorder(prom_recorder) {
267                    tracing::warn!(error = %e, "global metrics recorder already installed; keeping existing");
268                }
269                RecorderSetup {
270                    prom_handle: Some(prom_handle),
271                    otel_provider: None,
272                }
273            }
274        }
275    }
276}
277
278/// Metrics manager handling Prometheus and/or OTel exposition.
279pub struct MetricsManager {
280    #[cfg(feature = "metrics")]
281    handle: Option<PrometheusHandle>,
282    config: MetricsConfig,
283    shutdown_tx: Option<oneshot::Sender<()>>,
284    process_metrics: Option<ProcessMetrics>,
285    container_metrics: Option<ContainerMetrics>,
286    readiness_fn: Option<ReadinessFn>,
287    started: Arc<std::sync::atomic::AtomicBool>,
288    registry: MetricRegistry,
289    #[cfg(all(feature = "metrics", feature = "scaling"))]
290    scaling_pressure: Option<Arc<crate::scaling::ScalingPressure>>,
291    #[cfg(all(feature = "metrics", feature = "memory"))]
292    memory_guard: Option<Arc<crate::memory::MemoryGuard>>,
293    #[cfg(feature = "otel-metrics")]
294    otel_provider: Option<opentelemetry_sdk::metrics::SdkMeterProvider>,
295}
296
297impl MetricsManager {
298    /// Create a new metrics manager with the given namespace.
299    #[must_use]
300    pub fn new(namespace: &str) -> Self {
301        Self::with_config(MetricsConfig {
302            namespace: namespace.to_string(),
303            ..Default::default()
304        })
305    }
306
307    /// Test constructor that skips the global recorder install.
308    ///
309    /// The global recorder installs once per process; parallel `new()` calls
310    /// race and all but the first panic with `SetRecorderError`. Skipping it
311    /// makes `metrics!` macros no-ops (documented behaviour with no recorder)
312    /// while registry tracking, descriptor push, and namespace logic -- what
313    /// tests actually verify -- still work.
314    #[cfg(test)]
315    pub(crate) fn new_for_test(namespace: &str) -> Self {
316        let config = MetricsConfig {
317            namespace: namespace.to_string(),
318            enable_process_metrics: false,
319            enable_container_metrics: false,
320            ..Default::default()
321        };
322
323        let registry = MetricRegistry::new(&config.namespace);
324
325        Self {
326            #[cfg(feature = "metrics")]
327            handle: None,
328            registry,
329            config,
330            shutdown_tx: None,
331            process_metrics: None,
332            container_metrics: None,
333            readiness_fn: None,
334            started: Arc::new(std::sync::atomic::AtomicBool::new(false)),
335            #[cfg(all(feature = "metrics", feature = "scaling"))]
336            scaling_pressure: None,
337            #[cfg(all(feature = "metrics", feature = "memory"))]
338            memory_guard: None,
339            #[cfg(feature = "otel-metrics")]
340            otel_provider: None,
341        }
342    }
343
344    /// Create a metrics manager with custom configuration.
345    ///
346    /// Installs the appropriate recorder(s) based on enabled features:
347    /// - `metrics` only: Prometheus recorder
348    /// - `otel-metrics` only: OTel recorder (OTLP push)
349    /// - Both: Fanout recorder (Prometheus scrape + OTel OTLP push)
350    #[must_use]
351    pub fn with_config(config: MetricsConfig) -> Self {
352        let setup = install_recorders(&config);
353
354        let process_metrics = if config.enable_process_metrics {
355            Some(ProcessMetrics::new(&config.namespace))
356        } else {
357            None
358        };
359
360        let container_metrics = if config.enable_container_metrics {
361            Some(ContainerMetrics::new(&config.namespace))
362        } else {
363            None
364        };
365
366        let registry = MetricRegistry::new(&config.namespace);
367
368        Self {
369            #[cfg(feature = "metrics")]
370            handle: setup.prom_handle,
371            registry,
372            config,
373            shutdown_tx: None,
374            process_metrics,
375            container_metrics,
376            readiness_fn: None,
377            started: Arc::new(std::sync::atomic::AtomicBool::new(false)),
378            #[cfg(all(feature = "metrics", feature = "scaling"))]
379            scaling_pressure: None,
380            #[cfg(all(feature = "metrics", feature = "memory"))]
381            memory_guard: None,
382            #[cfg(feature = "otel-metrics")]
383            otel_provider: setup.otel_provider,
384        }
385    }
386
387    /// Create a counter metric.
388    ///
389    /// Automatically registers a [`MetricDescriptor`] in the manifest registry
390    /// with empty labels and `group = "custom"`.
391    #[must_use]
392    pub fn counter(&self, name: &str, description: &str) -> Counter {
393        let key = self.prefixed_key(name);
394        let desc = description.to_string();
395        metrics::describe_counter!(key.clone(), desc.clone());
396        self.registry.push(MetricDescriptor {
397            name: key.clone(),
398            metric_type: MetricType::Counter,
399            description: desc,
400            unit: String::new(),
401            labels: vec![],
402            group: "custom".into(),
403            buckets: None,
404            use_cases: vec![],
405            dashboard_hint: None,
406        });
407        metrics::counter!(key)
408    }
409
410    /// Create a counter with label keys and group metadata for the manifest.
411    ///
412    /// The `labels` parameter declares label **key names** for the manifest.
413    /// The returned `Counter` is label-free -- apply label values at recording
414    /// time via `metrics::counter!(key, "label" => value)`.
415    #[must_use]
416    pub fn counter_with_labels(
417        &self,
418        name: &str,
419        description: &str,
420        labels: &[&str],
421        group: &str,
422    ) -> Counter {
423        let key = self.prefixed_key(name);
424        let desc = description.to_string();
425        metrics::describe_counter!(key.clone(), desc.clone());
426        self.registry.push(MetricDescriptor {
427            name: key.clone(),
428            metric_type: MetricType::Counter,
429            description: desc,
430            unit: String::new(),
431            labels: labels.iter().map(|s| (*s).to_string()).collect(),
432            group: group.into(),
433            buckets: None,
434            use_cases: vec![],
435            dashboard_hint: None,
436        });
437        metrics::counter!(key)
438    }
439
440    /// Create a gauge metric.
441    ///
442    /// Automatically registers a [`MetricDescriptor`] in the manifest registry
443    /// with empty labels and `group = "custom"`.
444    #[must_use]
445    pub fn gauge(&self, name: &str, description: &str) -> Gauge {
446        let key = self.prefixed_key(name);
447        let desc = description.to_string();
448        metrics::describe_gauge!(key.clone(), desc.clone());
449        self.registry.push(MetricDescriptor {
450            name: key.clone(),
451            metric_type: MetricType::Gauge,
452            description: desc,
453            unit: String::new(),
454            labels: vec![],
455            group: "custom".into(),
456            buckets: None,
457            use_cases: vec![],
458            dashboard_hint: None,
459        });
460        metrics::gauge!(key)
461    }
462
463    /// Create a gauge with label keys and group metadata for the manifest.
464    #[must_use]
465    pub fn gauge_with_labels(
466        &self,
467        name: &str,
468        description: &str,
469        labels: &[&str],
470        group: &str,
471    ) -> Gauge {
472        let key = self.prefixed_key(name);
473        let desc = description.to_string();
474        metrics::describe_gauge!(key.clone(), desc.clone());
475        self.registry.push(MetricDescriptor {
476            name: key.clone(),
477            metric_type: MetricType::Gauge,
478            description: desc,
479            unit: String::new(),
480            labels: labels.iter().map(|s| (*s).to_string()).collect(),
481            group: group.into(),
482            buckets: None,
483            use_cases: vec![],
484            dashboard_hint: None,
485        });
486        metrics::gauge!(key)
487    }
488
489    /// Create a histogram metric with default buckets.
490    ///
491    /// Automatically registers a [`MetricDescriptor`] in the manifest registry
492    /// with empty labels and `group = "custom"`.
493    #[must_use]
494    pub fn histogram(&self, name: &str, description: &str) -> Histogram {
495        let key = self.prefixed_key(name);
496        let desc = description.to_string();
497        metrics::describe_histogram!(key.clone(), Unit::Seconds, desc.clone());
498        self.registry.push(MetricDescriptor {
499            name: key.clone(),
500            metric_type: MetricType::Histogram,
501            description: desc,
502            unit: "seconds".into(),
503            labels: vec![],
504            group: "custom".into(),
505            buckets: None,
506            use_cases: vec![],
507            dashboard_hint: None,
508        });
509        metrics::histogram!(key)
510    }
511
512    /// Create a histogram with label keys, group, and optional buckets for the manifest.
513    #[must_use]
514    pub fn histogram_with_labels(
515        &self,
516        name: &str,
517        description: &str,
518        labels: &[&str],
519        group: &str,
520        buckets: Option<&[f64]>,
521    ) -> Histogram {
522        let key = self.prefixed_key(name);
523        let desc = description.to_string();
524        metrics::describe_histogram!(key.clone(), Unit::Seconds, desc.clone());
525        self.registry.push(MetricDescriptor {
526            name: key.clone(),
527            metric_type: MetricType::Histogram,
528            description: desc,
529            unit: "seconds".into(),
530            labels: labels.iter().map(|s| (*s).to_string()).collect(),
531            group: group.into(),
532            buckets: buckets.map(|b| b.to_vec()),
533            use_cases: vec![],
534            dashboard_hint: None,
535        });
536        metrics::histogram!(key)
537    }
538
539    /// Create a histogram metric with custom buckets.
540    ///
541    /// **Note:** The `buckets` parameter is captured in the manifest registry
542    /// but currently ignored by the `metrics` crate at runtime (buckets are set
543    /// globally at recorder installation time).
544    #[must_use]
545    pub fn histogram_with_buckets(
546        &self,
547        name: &str,
548        description: &str,
549        buckets: &[f64],
550    ) -> Histogram {
551        let key = self.prefixed_key(name);
552        let desc = description.to_string();
553        metrics::describe_histogram!(key.clone(), Unit::Seconds, desc.clone());
554        self.registry.push(MetricDescriptor {
555            name: key.clone(),
556            metric_type: MetricType::Histogram,
557            description: desc,
558            unit: "seconds".into(),
559            labels: vec![],
560            group: "custom".into(),
561            buckets: Some(buckets.to_vec()),
562            use_cases: vec![],
563            dashboard_hint: None,
564        });
565        metrics::histogram!(key)
566    }
567
568    /// Create a histogram with an explicit unit.
569    ///
570    /// Use this for non-time distributions (byte sizes, item counts, ...) so the
571    /// recorded unit matches the metric name. The plain
572    /// [`histogram`](Self::histogram) defaults to seconds (the common latency
573    /// case); reaching for that on a count/size metric is the seconds-mislabel
574    /// the metrics audit flagged.
575    #[must_use]
576    pub fn histogram_with_unit(&self, name: &str, description: &str, unit: Unit) -> Histogram {
577        let key = self.prefixed_key(name);
578        let desc = description.to_string();
579        metrics::describe_histogram!(key.clone(), unit, desc.clone());
580        self.registry.push(MetricDescriptor {
581            name: key.clone(),
582            metric_type: MetricType::Histogram,
583            description: desc,
584            unit: unit_label(unit).to_string(),
585            labels: vec![],
586            group: "custom".into(),
587            buckets: None,
588            use_cases: vec![],
589            dashboard_hint: None,
590        });
591        metrics::histogram!(key)
592    }
593
594    /// Create a dimensionless count-distribution histogram (e.g. items per chunk).
595    ///
596    /// Convenience for [`histogram_with_unit`](Self::histogram_with_unit) with
597    /// [`Unit::Count`] -- avoids the seconds mislabel the plain
598    /// [`histogram`](Self::histogram) would apply.
599    #[must_use]
600    pub fn histogram_count(&self, name: &str, description: &str) -> Histogram {
601        self.histogram_with_unit(name, description, Unit::Count)
602    }
603
604    /// Get the Prometheus metrics output.
605    ///
606    /// Returns the rendered Prometheus text format. Only available when
607    /// the `metrics` feature is enabled.
608    #[cfg(feature = "metrics")]
609    #[must_use]
610    pub fn render(&self) -> String {
611        self.handle
612            .as_ref()
613            .map_or_else(String::new, PrometheusHandle::render)
614    }
615
616    /// Cloneable render handle for route handlers. `Send + Sync + Clone` --
617    /// safe to move into `axum` handlers or share across tasks.
618    ///
619    /// Returns `None` if no Prometheus recorder is installed.
620    ///
621    /// # Example
622    ///
623    /// ```rust,ignore
624    /// let mut mgr = MetricsManager::new("myapp");
625    /// let render = mgr.render_handle().expect("recorder installed");
626    ///
627    /// // Use in axum route
628    /// let route = axum::Router::new().route("/metrics", axum::routing::get(move || {
629    ///     let r = render.clone();
630    ///     async move { r() }
631    /// }));
632    ///
633    /// mgr.start_server_with_routes("0.0.0.0:9090", route).await?;
634    /// ```
635    #[cfg(feature = "metrics")]
636    #[must_use]
637    pub fn render_handle(&self) -> Option<RenderHandle> {
638        self.handle.clone().map(RenderHandle)
639    }
640
641    /// Set a readiness check callback.
642    ///
643    /// When set, `/readyz` and `/health/ready` call this function and return
644    /// 503 Service Unavailable if it returns `false`. Without a callback,
645    /// these endpoints always return 200.
646    pub fn set_readiness_check(&mut self, f: impl Fn() -> bool + Send + Sync + 'static) {
647        self.readiness_fn = Some(Arc::new(f));
648    }
649
650    /// Mark the service as started (startup probe passes).
651    ///
652    /// Call this once init is complete. K8s `startupProbe` hits `/startupz`
653    /// which returns 503 until this is called, then 200 thereafter.
654    /// Separate from readiness -- startup has a longer timeout for slow starters.
655    pub fn mark_started(&self) {
656        self.started
657            .store(true, std::sync::atomic::Ordering::Release);
658    }
659
660    /// Get a clone of the started flag (for passing to HTTP handler).
661    pub(crate) fn started_flag(&self) -> Arc<std::sync::atomic::AtomicBool> {
662        Arc::clone(&self.started)
663    }
664
665    /// Attach a `ScalingPressure` instance.
666    ///
667    /// When set and using `start_server_with_routes`, a `/scaling/pressure`
668    /// endpoint is automatically added that returns the current pressure value.
669    #[cfg(all(feature = "metrics", feature = "scaling"))]
670    pub fn set_scaling_pressure(&mut self, sp: Arc<crate::scaling::ScalingPressure>) {
671        self.scaling_pressure = Some(sp);
672    }
673
674    /// Attach a `MemoryGuard` instance.
675    ///
676    /// When set and using `start_server_with_routes`, a `/memory/pressure`
677    /// endpoint is automatically added that returns the current memory status.
678    #[cfg(all(feature = "metrics", feature = "memory"))]
679    pub fn set_memory_guard(&mut self, mg: Arc<crate::memory::MemoryGuard>) {
680        self.memory_guard = Some(mg);
681    }
682
683    /// Update process and container metrics.
684    pub fn update(&self) {
685        if let Some(ref pm) = self.process_metrics {
686            pm.update();
687        }
688        if let Some(ref cm) = self.container_metrics {
689            cm.update();
690        }
691    }
692
693    /// Start the metrics HTTP server.
694    ///
695    /// Serves `/metrics` (Prometheus), `/healthz`, `/health/live`,
696    /// `/readyz`, `/health/ready` endpoints.
697    ///
698    /// Only available when the `metrics` feature is enabled (for scraping).
699    ///
700    /// # Errors
701    ///
702    /// Returns an error if the server fails to start.
703    #[cfg(feature = "metrics")]
704    pub async fn start_server(&mut self, addr: &str) -> Result<(), MetricsError> {
705        if self.shutdown_tx.is_some() {
706            return Err(MetricsError::AlreadyRunning);
707        }
708
709        let addr: SocketAddr = addr
710            .parse()
711            .map_err(|e| MetricsError::ServerError(format!("invalid address: {e}")))?;
712
713        let listener = TcpListener::bind(addr)
714            .await
715            .map_err(|e| MetricsError::ServerError(e.to_string()))?;
716
717        let (shutdown_tx, shutdown_rx) = oneshot::channel();
718        self.shutdown_tx = Some(shutdown_tx);
719
720        let handle = self
721            .handle
722            .as_ref()
723            .ok_or_else(|| {
724                MetricsError::ServerError(
725                    "Prometheus handle not configured -- MetricsManager was created without a recorder".into(),
726                )
727            })?
728            .clone();
729        let update_interval = self.config.update_interval;
730        let process_metrics = self.process_metrics.clone();
731        let container_metrics = self.container_metrics.clone();
732        let readiness_fn = self.readiness_fn.clone();
733        let started_flag = self.started_flag();
734
735        let registry = self.registry();
736
737        tokio::spawn(async move {
738            run_server(
739                listener,
740                handle,
741                registry,
742                shutdown_rx,
743                update_interval,
744                process_metrics,
745                container_metrics,
746                readiness_fn,
747                started_flag,
748            )
749            .await;
750        });
751
752        Ok(())
753    }
754
755    /// Start the metrics HTTP server with additional custom routes.
756    ///
757    /// Serves the same built-in endpoints as [`start_server`](Self::start_server):
758    /// `/metrics`, `/healthz`, `/health/live`, `/readyz`, `/health/ready`.
759    ///
760    /// Additionally:
761    /// - If [`set_scaling_pressure`](Self::set_scaling_pressure) was called,
762    ///   adds `/scaling/pressure` returning the current pressure value.
763    /// - If [`set_memory_guard`](Self::set_memory_guard) was called,
764    ///   adds `/memory/pressure` returning memory status JSON.
765    /// - Any routes in `extra_routes` are merged (service-specific endpoints).
766    ///
767    /// Requires both `metrics` and `http-server` features.
768    ///
769    /// # Errors
770    ///
771    /// Returns an error if the server fails to start.
772    #[cfg(all(feature = "metrics", feature = "http-server"))]
773    pub async fn start_server_with_routes(
774        &mut self,
775        addr: &str,
776        extra_routes: axum::Router,
777    ) -> Result<(), MetricsError> {
778        if self.shutdown_tx.is_some() {
779            return Err(MetricsError::AlreadyRunning);
780        }
781
782        let addr: SocketAddr = addr
783            .parse()
784            .map_err(|e| MetricsError::ServerError(format!("invalid address: {e}")))?;
785
786        let listener = TcpListener::bind(addr)
787            .await
788            .map_err(|e| MetricsError::ServerError(e.to_string()))?;
789
790        let (shutdown_tx, shutdown_rx) = oneshot::channel();
791        self.shutdown_tx = Some(shutdown_tx);
792
793        let handle = self
794            .handle
795            .as_ref()
796            .ok_or_else(|| {
797                MetricsError::ServerError(
798                    "Prometheus handle not configured -- MetricsManager was created without a recorder".into(),
799                )
800            })?
801            .clone();
802        let update_interval = self.config.update_interval;
803        let process_metrics = self.process_metrics.clone();
804        let container_metrics = self.container_metrics.clone();
805        let readiness_fn = self.readiness_fn.clone();
806
807        // Build the axum router with built-in + optional + custom routes
808        let metrics_handle = handle.clone();
809        let readiness_for_live = readiness_fn.clone();
810        let registry_handle = self.registry();
811
812        let mut app = axum::Router::new()
813            .route(
814                "/metrics/manifest",
815                axum::routing::get(move || {
816                    let reg = registry_handle.clone();
817                    async move {
818                        (
819                            [(axum::http::header::CONTENT_TYPE, "application/json")],
820                            serde_json::to_string(&reg.manifest()).unwrap_or_default(),
821                        )
822                    }
823                }),
824            )
825            .route(
826                "/metrics",
827                axum::routing::get(move || {
828                    let h = metrics_handle.clone();
829                    async move { h.render() }
830                }),
831            )
832            .route("/startupz", {
833                let sf = self.started_flag();
834                axum::routing::get(move || {
835                    let started = sf.load(std::sync::atomic::Ordering::Acquire);
836                    async move {
837                        if started {
838                            (
839                                axum::http::StatusCode::OK,
840                                [(axum::http::header::CONTENT_TYPE, "application/json")],
841                                r#"{"status":"started"}"#,
842                            )
843                        } else {
844                            (
845                                axum::http::StatusCode::SERVICE_UNAVAILABLE,
846                                [(axum::http::header::CONTENT_TYPE, "application/json")],
847                                r#"{"status":"starting"}"#,
848                            )
849                        }
850                    }
851                })
852            })
853            .route(
854                "/healthz",
855                axum::routing::get(|| async {
856                    (
857                        [(axum::http::header::CONTENT_TYPE, "application/json")],
858                        r#"{"status":"alive"}"#,
859                    )
860                }),
861            )
862            .route(
863                "/health/live",
864                axum::routing::get(|| async {
865                    (
866                        [(axum::http::header::CONTENT_TYPE, "application/json")],
867                        r#"{"status":"alive"}"#,
868                    )
869                }),
870            )
871            .route(
872                "/readyz",
873                axum::routing::get(move || {
874                    let rf = readiness_fn.clone();
875                    async move { readiness_response(rf) }
876                }),
877            )
878            .route(
879                "/health/ready",
880                axum::routing::get(move || {
881                    let rf = readiness_for_live.clone();
882                    async move { readiness_response(rf) }
883                }),
884            );
885
886        // Add scaling pressure endpoint if configured
887        #[cfg(feature = "scaling")]
888        if let Some(ref sp) = self.scaling_pressure {
889            let sp = sp.clone();
890            app = app.route(
891                "/scaling/pressure",
892                axum::routing::get(move || {
893                    let s = sp.clone();
894                    async move { format!("{:.2}", s.calculate()) }
895                }),
896            );
897        }
898
899        // Add memory pressure endpoint if configured
900        #[cfg(feature = "memory")]
901        if let Some(ref mg) = self.memory_guard {
902            let mg = mg.clone();
903            app = app.route(
904                "/memory/pressure",
905                axum::routing::get(move || {
906                    let m = mg.clone();
907                    async move {
908                        (
909                            [(axum::http::header::CONTENT_TYPE, "application/json")],
910                            format!(
911                                r#"{{"under_pressure":{},"ratio":{:.3},"current_bytes":{},"limit_bytes":{}}}"#,
912                                m.under_pressure(),
913                                m.pressure_ratio(),
914                                m.current_bytes(),
915                                m.limit_bytes()
916                            ),
917                        )
918                    }
919                }),
920            );
921        }
922
923        // Merge service-specific routes
924        app = app.merge(extra_routes);
925
926        tokio::spawn(async move {
927            run_axum_server(
928                listener,
929                app,
930                shutdown_rx,
931                update_interval,
932                process_metrics,
933                container_metrics,
934            )
935            .await;
936        });
937
938        Ok(())
939    }
940
941    /// Stop the metrics server.
942    ///
943    /// # Errors
944    ///
945    /// Returns an error if the server is not running.
946    pub async fn stop_server(&mut self) -> Result<(), MetricsError> {
947        if let Some(tx) = self.shutdown_tx.take() {
948            let _ = tx.send(());
949            Ok(())
950        } else {
951            Err(MetricsError::NotRunning)
952        }
953    }
954
955    /// Gracefully shut down the OTel provider (flushes pending exports).
956    ///
957    /// Call this before application exit to ensure all metrics are exported.
958    #[cfg(feature = "otel-metrics")]
959    pub fn shutdown_otel(&mut self) {
960        if let Some(provider) = self.otel_provider.take()
961            && let Err(e) = provider.shutdown()
962        {
963            tracing::warn!(error = %e, "OTel provider shutdown error");
964        }
965    }
966
967    /// Set application version and git commit for the manifest.
968    ///
969    /// Uses interior mutability (writes through the registry's `Arc<RwLock>`),
970    /// so only `&self` is needed. Called automatically by
971    /// `dfe_groups::AppMetrics::new()` if the `metrics-dfe` feature is enabled.
972    pub fn set_build_info(&self, version: &str, commit: &str) {
973        self.registry.set_build_info(version, commit);
974    }
975
976    /// Set operational use cases for a metric (by full prefixed name).
977    ///
978    /// No-op if the metric is not found in the registry.
979    pub fn set_use_cases(&self, metric_name: &str, use_cases: &[&str]) {
980        self.registry.set_use_cases(metric_name, use_cases);
981    }
982
983    /// Set the suggested Grafana panel type for a metric (by full prefixed name).
984    ///
985    /// No-op if the metric is not found in the registry.
986    pub fn set_dashboard_hint(&self, metric_name: &str, hint: &str) {
987        self.registry.set_dashboard_hint(metric_name, hint);
988    }
989
990    /// Get a cloneable handle to the metric registry.
991    ///
992    /// Use this to pass into route handlers. The handle is `Clone + Send + Sync`.
993    /// Call before starting the server, consistent with the `render_handle()` pattern.
994    #[must_use]
995    pub fn registry(&self) -> MetricRegistry {
996        self.registry.clone()
997    }
998
999    /// Get the namespace prefix (e.g. `dfe_loader`).
1000    ///
1001    /// Used by `dfe_groups` metric structs (`metrics-dfe` feature) to build
1002    /// labelled metric keys.
1003    #[must_use]
1004    pub fn namespace(&self) -> &str {
1005        &self.config.namespace
1006    }
1007
1008    /// Get prefixed metric name.
1009    fn prefixed_key(&self, name: &str) -> String {
1010        if self.config.namespace.is_empty() {
1011            name.to_string()
1012        } else {
1013            format!("{}_{}", self.config.namespace, name)
1014        }
1015    }
1016}
1017
/// Run the built-in (non-axum) metrics HTTP server.
///
/// One `select!` loop multiplexes three event sources:
/// - `shutdown_rx` resolving ends the loop. This happens on an explicit stop
///   signal and also when the sender is dropped.
/// - A periodic tick refreshes the optional process and container collectors.
/// - Each accepted TCP connection is served on its own task by
///   [`handle_connection`].
///
/// Connections already handed to a task are not cancelled on shutdown. Only
/// accepting and periodic refresh stop.
///
/// Two conditions are guarded:
/// - A zero `update_interval` is replaced by a 1 s period, because
///   `tokio::time::interval` panics on a zero period.
/// - Persistent `accept()` failures (for example running out of file
///   descriptors) are logged and followed by a short pause, so the loop does
///   not spin on an error that repeats immediately.
#[cfg(feature = "metrics")]
// Private plumbing: every argument is moved into the server task as-is, so a
// parameter struct would only add boilerplate.
#[allow(clippy::too_many_arguments)]
async fn run_server(
    listener: TcpListener,
    handle: PrometheusHandle,
    registry: MetricRegistry,
    mut shutdown_rx: oneshot::Receiver<()>,
    update_interval: Duration,
    process_metrics: Option<ProcessMetrics>,
    container_metrics: Option<ContainerMetrics>,
    readiness_fn: Option<ReadinessFn>,
    started_flag: Arc<std::sync::atomic::AtomicBool>,
) {
    // Pause after a non-transient accept() failure before trying again.
    const ACCEPT_ERROR_BACKOFF: Duration = Duration::from_millis(100);
    // Refresh period used when the configured interval is zero.
    const FALLBACK_UPDATE_INTERVAL: Duration = Duration::from_secs(1);

    let period = if update_interval.is_zero() {
        tracing::warn!("metrics update_interval is zero; falling back to 1s");
        FALLBACK_UPDATE_INTERVAL
    } else {
        update_interval
    };
    let mut update_interval = tokio::time::interval(period);

    loop {
        tokio::select! {
            _ = &mut shutdown_rx => {
                break;
            }
            _ = update_interval.tick() => {
                if let Some(ref pm) = process_metrics {
                    pm.update();
                }
                if let Some(ref cm) = container_metrics {
                    cm.update();
                }
            }
            result = listener.accept() => {
                match result {
                    Ok((stream, _)) => {
                        let handle = handle.clone();
                        let registry = registry.clone();
                        let readiness_fn = readiness_fn.clone();
                        let sf = Arc::clone(&started_flag);
                        tokio::spawn(async move {
                            handle_connection(stream, handle, registry, readiness_fn, &sf).await;
                        });
                    }
                    // The peer gave up before we accepted. Only that one
                    // connection is affected, so keep accepting immediately.
                    Err(e) if matches!(
                        e.kind(),
                        std::io::ErrorKind::ConnectionAborted
                            | std::io::ErrorKind::ConnectionReset
                            | std::io::ErrorKind::ConnectionRefused
                    ) => {}
                    Err(e) => {
                        tracing::warn!(error = %e, "metrics server accept error");
                        tokio::time::sleep(ACCEPT_ERROR_BACKOFF).await;
                    }
                }
            }
        }
    }
}
1061
/// Serve one request on the built-in (non-axum) metrics server.
///
/// Only the request line is read, with a cap of 8 KiB and a 10 s timeout. The
/// connection gets one response and is then closed, which is advertised with
/// `Connection: close`. Routing uses the exact request path with any query
/// string removed, so `/metricsfoo` is a 404 rather than a `/metrics` hit.
///
/// Routes:
/// - `GET /metrics/manifest`: JSON metric manifest from `registry`.
/// - `GET /metrics`: Prometheus text rendered by `handle`.
/// - `GET /startupz`, `GET /health/startup`: 200 once `started_flag` is set,
///   otherwise 503.
/// - `GET /healthz`, `GET /health/live`: always 200.
/// - `GET /readyz`, `GET /health/ready`: 200 when `readiness_fn` (if present)
///   returns true and, with the `health` feature, the global `HealthRegistry`
///   is ready. Otherwise 503.
/// - Anything else, including non-`GET` methods, returns 404.
///
/// If the request line cannot be read (timeout, I/O error, non-UTF-8 data,
/// or EOF before any bytes), the connection is dropped without a response.
#[cfg(feature = "metrics")]
async fn handle_connection(
    mut stream: tokio::net::TcpStream,
    handle: PrometheusHandle,
    registry: MetricRegistry,
    readiness_fn: Option<ReadinessFn>,
    started_flag: &std::sync::atomic::AtomicBool,
) {
    use tokio::io::{AsyncBufReadExt, AsyncReadExt, AsyncWriteExt, BufReader};

    // Caps the request line, so a peer streaming bytes without a newline
    // cannot grow `request_line` without bound.
    const MAX_REQUEST_LINE_BYTES: u64 = 8 * 1024;
    // Stops a peer that connects but never finishes the request line from
    // holding this task (and its socket) open forever.
    const READ_TIMEOUT: Duration = Duration::from_secs(10);

    let mut request_line = String::new();
    {
        let mut reader = BufReader::new(&mut stream).take(MAX_REQUEST_LINE_BYTES);
        let read = tokio::time::timeout(READ_TIMEOUT, reader.read_line(&mut request_line)).await;
        if !matches!(read, Ok(Ok(n)) if n > 0) {
            return;
        }
    }

    // The request line is "METHOD SP request-target SP HTTP-version".
    let mut parts = request_line.split_whitespace();
    let method = parts.next().unwrap_or_default();
    let target = parts.next().unwrap_or_default();
    let path = target.split_once('?').map_or(target, |(path, _query)| path);

    let (status, content_type, body) = match (method, path) {
        ("GET", "/metrics/manifest") => (
            "200 OK",
            "application/json",
            serde_json::to_string(&registry.manifest()).unwrap_or_default(),
        ),
        ("GET", "/metrics") => ("200 OK", "text/plain; charset=utf-8", handle.render()),
        ("GET", "/startupz" | "/health/startup") => {
            if started_flag.load(std::sync::atomic::Ordering::Acquire) {
                (
                    "200 OK",
                    "application/json",
                    r#"{"status":"started"}"#.to_string(),
                )
            } else {
                (
                    "503 Service Unavailable",
                    "application/json",
                    r#"{"status":"starting"}"#.to_string(),
                )
            }
        }
        ("GET", "/healthz" | "/health/live") => (
            "200 OK",
            "application/json",
            r#"{"status":"alive"}"#.to_string(),
        ),
        ("GET", "/readyz" | "/health/ready") => {
            let callback_ready = readiness_fn.as_ref().is_none_or(|f| f());

            #[cfg(feature = "health")]
            let registry_ready = crate::health::HealthRegistry::is_ready();
            #[cfg(not(feature = "health"))]
            let registry_ready = true;

            if callback_ready && registry_ready {
                (
                    "200 OK",
                    "application/json",
                    r#"{"status":"ready"}"#.to_string(),
                )
            } else {
                (
                    "503 Service Unavailable",
                    "application/json",
                    r#"{"status":"not_ready"}"#.to_string(),
                )
            }
        }
        _ => (
            "404 Not Found",
            "text/plain; charset=utf-8",
            "Not Found".to_string(),
        ),
    };

    // One request per connection: HTTP/1.1 servers without persistent
    // connections must say so with `Connection: close` (RFC 9112 §9.6).
    let response = format!(
        "HTTP/1.1 {status}\r\nContent-Type: {content_type}\r\nContent-Length: {}\r\nConnection: close\r\n\r\n{body}",
        body.len()
    );

    let _ = stream.write_all(response.as_bytes()).await;
}
1155
/// Build the readiness response (`/readyz`, `/health/ready`) for the axum
/// server.
///
/// The service is ready only when both conditions hold:
/// - The optional caller-supplied callback returns `true`. A missing callback
///   counts as ready.
/// - With the `health` feature enabled, the global
///   [`HealthRegistry`](crate::health::HealthRegistry) reports ready.
///
/// The response is `200` with `{"status":"ready"}` when ready, otherwise
/// `503` with `{"status":"not_ready"}`. Both bodies are `application/json`.
#[cfg(all(feature = "metrics", feature = "http-server"))]
fn readiness_response(rf: Option<ReadinessFn>) -> axum::response::Response {
    use axum::http::{header::CONTENT_TYPE, StatusCode};
    use axum::response::IntoResponse;

    // The callback is consulted first, then the registry (always, without
    // short-circuiting).
    let callback_ready = match rf {
        Some(ref check) => check(),
        None => true,
    };

    #[cfg(feature = "health")]
    let registry_ready = crate::health::HealthRegistry::is_ready();
    #[cfg(not(feature = "health"))]
    let registry_ready = true;

    let (status, body) = if callback_ready && registry_ready {
        (StatusCode::OK, r#"{"status":"ready"}"#)
    } else {
        (StatusCode::SERVICE_UNAVAILABLE, r#"{"status":"not_ready"}"#)
    };

    (status, [(CONTENT_TYPE, "application/json")], body).into_response()
}
1188
/// Run the axum-based metrics HTTP server with custom routes.
///
/// `app` is served on `listener` until `shutdown_rx` resolves, either on an
/// explicit stop signal or because the sender was dropped. axum then
/// performs its graceful shutdown. Meanwhile a separate task refreshes the
/// optional process and container collectors every `update_interval`. That
/// task is told to stop once serving has finished, whether cleanly or with
/// an error (which is logged).
///
/// A zero `update_interval` is replaced by a 1 s period, because
/// `tokio::time::interval` panics on a zero period. Without this guard the
/// spawned server task would die before serving anything.
#[cfg(all(feature = "metrics", feature = "http-server"))]
async fn run_axum_server(
    listener: TcpListener,
    app: axum::Router,
    shutdown_rx: oneshot::Receiver<()>,
    update_interval: Duration,
    process_metrics: Option<ProcessMetrics>,
    container_metrics: Option<ContainerMetrics>,
) {
    // Refresh period used when the configured interval is zero.
    const FALLBACK_UPDATE_INTERVAL: Duration = Duration::from_secs(1);

    let period = if update_interval.is_zero() {
        tracing::warn!("metrics update_interval is zero; falling back to 1s");
        FALLBACK_UPDATE_INTERVAL
    } else {
        update_interval
    };
    let mut interval = tokio::time::interval(period);

    // Spawn the metrics update loop. It exits when `update_stop_tx` fires or
    // is dropped, which also covers this future being cancelled.
    let (update_stop_tx, mut update_stop_rx) = oneshot::channel::<()>();
    tokio::spawn(async move {
        loop {
            tokio::select! {
                _ = &mut update_stop_rx => break,
                _ = interval.tick() => {
                    if let Some(ref pm) = process_metrics {
                        pm.update();
                    }
                    if let Some(ref cm) = container_metrics {
                        cm.update();
                    }
                }
            }
        }
    });

    // Run axum server with graceful shutdown
    axum::serve(listener, app)
        .with_graceful_shutdown(async move {
            let _ = shutdown_rx.await;
        })
        .await
        .unwrap_or_else(|e| tracing::error!(error = %e, "Metrics axum server error"));

    let _ = update_stop_tx.send(());
}
1229
/// Standard latency histogram buckets.
///
/// There are twelve strictly increasing bounds from `0.001` to `10.0`, which
/// spans 1 ms to 10 s when durations are recorded in seconds (as with the
/// `request_duration_seconds` example in the module docs).
#[must_use]
pub fn latency_buckets() -> Vec<f64> {
    const BOUNDS: [f64; 12] = [
        0.001, 0.005, 0.01, 0.025, 0.05, 0.1, 0.25, 0.5, 1.0, 2.5, 5.0, 10.0,
    ];
    BOUNDS.to_vec()
}
1237
/// Standard size histogram buckets.
///
/// The bounds are six powers of ten, `100.0` through `10_000_000.0`. Each is
/// exactly 10x the previous one, and all are exactly representable as `f64`.
#[must_use]
pub fn size_buckets() -> Vec<f64> {
    std::iter::successors(Some(100.0_f64), |bound| Some(bound * 10.0))
        .take(6)
        .collect()
}
1250
1251/// Canonical unit label for a metric descriptor's `unit` field (manifest).
1252///
1253/// Maps the `metrics` crate [`Unit`] to the Prometheus-style base-unit string
1254/// (`seconds`/`bytes`); counts are dimensionless. Only the units rustlib
1255/// actually emits are mapped; anything else is left blank.
1256fn unit_label(unit: Unit) -> &'static str {
1257    match unit {
1258        Unit::Count => "count",
1259        Unit::Bytes => "bytes",
1260        Unit::Seconds => "seconds",
1261        _ => "",
1262    }
1263}
1264
#[cfg(test)]
mod tests {
    use super::*;

    /// Histogram bounds must be positive and strictly increasing. Checking
    /// only the first and last bound would miss an out-of-order middle entry.
    fn assert_valid_buckets(buckets: &[f64]) {
        assert!(
            buckets.iter().all(|&bound| bound > 0.0),
            "bucket bounds must be positive: {buckets:?}"
        );
        assert!(
            buckets.windows(2).all(|pair| pair[0] < pair[1]),
            "bucket bounds must be strictly increasing: {buckets:?}"
        );
    }

    #[test]
    fn test_metrics_config_default() {
        let config = MetricsConfig::default();
        assert!(config.namespace.is_empty());
        assert!(config.enable_process_metrics);
        assert!(config.enable_container_metrics);
        assert_eq!(config.update_interval, Duration::from_secs(15));
    }

    #[test]
    fn test_latency_buckets() {
        let buckets = latency_buckets();
        assert_eq!(buckets.len(), 12);
        assert_valid_buckets(&buckets);
    }

    #[test]
    fn test_size_buckets() {
        let buckets = size_buckets();
        assert_eq!(buckets.len(), 6);
        assert_valid_buckets(&buckets);
    }
}