1mod container;
77pub mod dfe;
78pub mod labels;
79pub mod manifest;
80mod process;
81
82pub use labels::{AuthFailureReason, FlushTrigger, TransportKind, ValidationFailureReason};
83
84#[cfg(feature = "otel-metrics")]
85pub(crate) mod otel;
86#[cfg(feature = "otel-metrics")]
87pub mod otel_types;
88
89use std::net::SocketAddr;
90use std::sync::Arc;
91use std::time::Duration;
92
93use metrics::{Counter, Gauge, Histogram, Unit};
94use thiserror::Error;
95use tokio::net::TcpListener;
96use tokio::sync::oneshot;
97
/// Shared readiness predicate: a callback that returns `true` when the service
/// considers itself ready.
///
/// It is reference-counted and `Send + Sync` so the same callback can be cloned
/// into the server tasks that answer the readiness endpoints.
pub type ReadinessFn = Arc<dyn Fn() -> bool + Send + Sync>;
100
101#[cfg(feature = "metrics")]
102use metrics_exporter_prometheus::PrometheusHandle;
103
104pub use container::ContainerMetrics;
105pub use dfe::DfeMetrics;
106#[cfg(feature = "metrics-dfe")]
107pub mod dfe_groups;
108pub use manifest::{ManifestResponse, MetricDescriptor, MetricRegistry, MetricType};
109pub use process::ProcessMetrics;
110
111#[cfg(all(feature = "scaling", feature = "expression"))]
115pub(crate) use container::cpu_limit_cores;
116#[cfg(all(feature = "scaling", feature = "expression"))]
117pub(crate) use process::cumulative_cpu_seconds;
118
119#[cfg(feature = "otel-metrics")]
120pub use otel_types::{OtelMetricsConfig, OtelProtocol};
121
/// Cloneable wrapper around a [`PrometheusHandle`] whose only public operation
/// is rendering the current metrics as text.
#[cfg(feature = "metrics")]
#[derive(Clone)]
pub struct RenderHandle(PrometheusHandle);
129
#[cfg(feature = "metrics")]
impl RenderHandle {
    /// Renders the current metrics by delegating to the wrapped
    /// [`PrometheusHandle`].
    #[must_use]
    pub fn render(&self) -> String {
        let Self(prometheus) = self;
        prometheus.render()
    }
}
138
/// Errors reported by the metrics subsystem.
#[derive(Debug, Error)]
pub enum MetricsError {
    /// The metrics exporter could not be built; the payload is a description of
    /// the failure.
    #[error("failed to build metrics exporter: {0}")]
    BuildError(String),

    /// The metrics HTTP server could not be started (invalid address, bind
    /// failure, or missing Prometheus handle); the payload describes the cause.
    #[error("failed to start metrics server: {0}")]
    ServerError(String),

    /// A server start was requested while a shutdown sender is already stored.
    #[error("metrics server already running")]
    AlreadyRunning,

    /// A server stop was requested but no server had been started.
    #[error("metrics server not running")]
    NotRunning,
}
158
/// Settings used to construct a metrics manager.
#[derive(Debug, Clone)]
pub struct MetricsConfig {
    /// Prefix joined to every metric name with an underscore; an empty string
    /// disables prefixing.
    pub namespace: String,
    /// Whether the process metric set is created and refreshed.
    pub enable_process_metrics: bool,
    /// Whether the container metric set is created and refreshed.
    pub enable_container_metrics: bool,
    /// Period of the background refresh of process/container metrics while a
    /// server is running.
    pub update_interval: Duration,
    /// OpenTelemetry exporter settings.
    #[cfg(feature = "otel-metrics")]
    pub otel: OtelMetricsConfig,
}

impl Default for MetricsConfig {
    /// Empty namespace, both metric sets enabled, 15-second refresh period.
    fn default() -> Self {
        const DEFAULT_UPDATE_INTERVAL: Duration = Duration::from_secs(15);

        Self {
            namespace: String::default(),
            enable_process_metrics: true,
            enable_container_metrics: true,
            update_interval: DEFAULT_UPDATE_INTERVAL,
            #[cfg(feature = "otel-metrics")]
            otel: OtelMetricsConfig::default(),
        }
    }
}
187
/// Result of installing the global recorder(s): whichever handles the enabled
/// features produced.
struct RecorderSetup {
    /// Handle used to render Prometheus text output.
    #[cfg(feature = "metrics")]
    prom_handle: Option<PrometheusHandle>,
    /// OTel meter provider; `None` when the OTel recorder could not be built.
    #[cfg(feature = "otel-metrics")]
    otel_provider: Option<opentelemetry_sdk::metrics::SdkMeterProvider>,
}
195
/// Builds and installs the global `metrics` recorder for the enabled feature
/// combination and returns the handles that were created.
///
/// Exactly one of the three `cfg`-gated blocks below is compiled and forms the
/// function's tail expression:
/// * `metrics` only: a Prometheus recorder;
/// * `otel-metrics` only: an OTel recorder;
/// * both: a fanout over Prometheus and OTel, falling back to Prometheus alone
///   when the OTel recorder cannot be built.
///
/// A failure to install (a global recorder already exists) is logged and
/// otherwise ignored; the handles are still returned in that case.
// `config` is not read in the Prometheus-only build.
#[allow(unused_variables)]
fn install_recorders(config: &MetricsConfig) -> RecorderSetup {
    // Prometheus only.
    #[cfg(all(feature = "metrics", not(feature = "otel-metrics")))]
    {
        let recorder = metrics_exporter_prometheus::PrometheusBuilder::new().build_recorder();
        // Take the handle before the recorder is moved into the global slot.
        let handle = recorder.handle();
        if let Err(e) = metrics::set_global_recorder(recorder) {
            tracing::warn!(error = %e, "global metrics recorder already installed; keeping existing");
        }
        RecorderSetup {
            prom_handle: Some(handle),
        }
    }

    // OTel only.
    #[cfg(all(feature = "otel-metrics", not(feature = "metrics")))]
    {
        match otel::build_otel_recorder(&config.namespace, &config.otel) {
            Ok((otel_recorder, provider)) => {
                // Register the provider globally; a clone is kept for the caller.
                opentelemetry::global::set_meter_provider(provider.clone());
                if let Err(e) = metrics::set_global_recorder(otel_recorder) {
                    tracing::warn!(error = %e, "global metrics recorder already installed; keeping existing");
                }
                RecorderSetup {
                    otel_provider: Some(provider),
                }
            }
            Err(e) => {
                // No recorder is installed at all in this branch.
                tracing::warn!(error = %e, "Failed to build OTel metrics recorder");
                RecorderSetup {
                    otel_provider: None,
                }
            }
        }
    }

    // Prometheus and OTel together.
    #[cfg(all(feature = "metrics", feature = "otel-metrics"))]
    {
        let prom_recorder = metrics_exporter_prometheus::PrometheusBuilder::new().build_recorder();
        // Take the handle before the recorder is moved into the fanout.
        let prom_handle = prom_recorder.handle();

        match otel::build_otel_recorder(&config.namespace, &config.otel) {
            Ok((otel_recorder, provider)) => {
                opentelemetry::global::set_meter_provider(provider.clone());

                // Both recorders sit behind a single fanout recorder.
                let fanout = metrics_util::layers::FanoutBuilder::default()
                    .add_recorder(prom_recorder)
                    .add_recorder(otel_recorder)
                    .build();

                if let Err(e) = metrics::set_global_recorder(fanout) {
                    tracing::warn!(error = %e, "global metrics recorder already installed; keeping existing");
                }

                RecorderSetup {
                    prom_handle: Some(prom_handle),
                    otel_provider: Some(provider),
                }
            }
            Err(e) => {
                tracing::warn!(error = %e, "Failed to build OTel recorder, falling back to Prometheus only");
                if let Err(e) = metrics::set_global_recorder(prom_recorder) {
                    tracing::warn!(error = %e, "global metrics recorder already installed; keeping existing");
                }
                RecorderSetup {
                    prom_handle: Some(prom_handle),
                    otel_provider: None,
                }
            }
        }
    }
}
277
/// Central entry point for metrics: creates namespaced metric handles, keeps a
/// manifest registry of what was created, and runs the HTTP server that
/// exposes metrics and health endpoints.
pub struct MetricsManager {
    /// Prometheus render handle; `None` when no recorder was set up.
    #[cfg(feature = "metrics")]
    handle: Option<PrometheusHandle>,
    /// Configuration the manager was created with.
    config: MetricsConfig,
    /// Shutdown sender for the running server; `Some` while a server is
    /// considered running.
    shutdown_tx: Option<oneshot::Sender<()>>,
    /// Process metric set, present when enabled in the configuration.
    process_metrics: Option<ProcessMetrics>,
    /// Container metric set, present when enabled in the configuration.
    container_metrics: Option<ContainerMetrics>,
    /// Optional readiness callback consulted by the readiness endpoints.
    readiness_fn: Option<ReadinessFn>,
    /// Startup flag served by the startup endpoints; shared with server tasks.
    started: Arc<std::sync::atomic::AtomicBool>,
    /// Manifest registry describing every metric created through this manager.
    registry: MetricRegistry,
    /// Scaling pressure source exposed at `/scaling/pressure` when set.
    #[cfg(all(feature = "metrics", feature = "scaling"))]
    scaling_pressure: Option<Arc<crate::scaling::ScalingPressure>>,
    /// Memory guard exposed at `/memory/pressure` when set.
    #[cfg(all(feature = "metrics", feature = "memory"))]
    memory_guard: Option<Arc<crate::memory::MemoryGuard>>,
    /// OTel meter provider kept so it can be shut down explicitly.
    #[cfg(feature = "otel-metrics")]
    otel_provider: Option<opentelemetry_sdk::metrics::SdkMeterProvider>,
}
296
297impl MetricsManager {
298 #[must_use]
300 pub fn new(namespace: &str) -> Self {
301 Self::with_config(MetricsConfig {
302 namespace: namespace.to_string(),
303 ..Default::default()
304 })
305 }
306
307 #[cfg(test)]
315 pub(crate) fn new_for_test(namespace: &str) -> Self {
316 let config = MetricsConfig {
317 namespace: namespace.to_string(),
318 enable_process_metrics: false,
319 enable_container_metrics: false,
320 ..Default::default()
321 };
322
323 let registry = MetricRegistry::new(&config.namespace);
324
325 Self {
326 #[cfg(feature = "metrics")]
327 handle: None,
328 registry,
329 config,
330 shutdown_tx: None,
331 process_metrics: None,
332 container_metrics: None,
333 readiness_fn: None,
334 started: Arc::new(std::sync::atomic::AtomicBool::new(false)),
335 #[cfg(all(feature = "metrics", feature = "scaling"))]
336 scaling_pressure: None,
337 #[cfg(all(feature = "metrics", feature = "memory"))]
338 memory_guard: None,
339 #[cfg(feature = "otel-metrics")]
340 otel_provider: None,
341 }
342 }
343
344 #[must_use]
351 pub fn with_config(config: MetricsConfig) -> Self {
352 let setup = install_recorders(&config);
353
354 let process_metrics = if config.enable_process_metrics {
355 Some(ProcessMetrics::new(&config.namespace))
356 } else {
357 None
358 };
359
360 let container_metrics = if config.enable_container_metrics {
361 Some(ContainerMetrics::new(&config.namespace))
362 } else {
363 None
364 };
365
366 let registry = MetricRegistry::new(&config.namespace);
367
368 Self {
369 #[cfg(feature = "metrics")]
370 handle: setup.prom_handle,
371 registry,
372 config,
373 shutdown_tx: None,
374 process_metrics,
375 container_metrics,
376 readiness_fn: None,
377 started: Arc::new(std::sync::atomic::AtomicBool::new(false)),
378 #[cfg(all(feature = "metrics", feature = "scaling"))]
379 scaling_pressure: None,
380 #[cfg(all(feature = "metrics", feature = "memory"))]
381 memory_guard: None,
382 #[cfg(feature = "otel-metrics")]
383 otel_provider: setup.otel_provider,
384 }
385 }
386
387 #[must_use]
392 pub fn counter(&self, name: &str, description: &str) -> Counter {
393 let key = self.prefixed_key(name);
394 let desc = description.to_string();
395 metrics::describe_counter!(key.clone(), desc.clone());
396 self.registry.push(MetricDescriptor {
397 name: key.clone(),
398 metric_type: MetricType::Counter,
399 description: desc,
400 unit: String::new(),
401 labels: vec![],
402 group: "custom".into(),
403 buckets: None,
404 use_cases: vec![],
405 dashboard_hint: None,
406 });
407 metrics::counter!(key)
408 }
409
410 #[must_use]
416 pub fn counter_with_labels(
417 &self,
418 name: &str,
419 description: &str,
420 labels: &[&str],
421 group: &str,
422 ) -> Counter {
423 let key = self.prefixed_key(name);
424 let desc = description.to_string();
425 metrics::describe_counter!(key.clone(), desc.clone());
426 self.registry.push(MetricDescriptor {
427 name: key.clone(),
428 metric_type: MetricType::Counter,
429 description: desc,
430 unit: String::new(),
431 labels: labels.iter().map(|s| (*s).to_string()).collect(),
432 group: group.into(),
433 buckets: None,
434 use_cases: vec![],
435 dashboard_hint: None,
436 });
437 metrics::counter!(key)
438 }
439
440 #[must_use]
445 pub fn gauge(&self, name: &str, description: &str) -> Gauge {
446 let key = self.prefixed_key(name);
447 let desc = description.to_string();
448 metrics::describe_gauge!(key.clone(), desc.clone());
449 self.registry.push(MetricDescriptor {
450 name: key.clone(),
451 metric_type: MetricType::Gauge,
452 description: desc,
453 unit: String::new(),
454 labels: vec![],
455 group: "custom".into(),
456 buckets: None,
457 use_cases: vec![],
458 dashboard_hint: None,
459 });
460 metrics::gauge!(key)
461 }
462
463 #[must_use]
465 pub fn gauge_with_labels(
466 &self,
467 name: &str,
468 description: &str,
469 labels: &[&str],
470 group: &str,
471 ) -> Gauge {
472 let key = self.prefixed_key(name);
473 let desc = description.to_string();
474 metrics::describe_gauge!(key.clone(), desc.clone());
475 self.registry.push(MetricDescriptor {
476 name: key.clone(),
477 metric_type: MetricType::Gauge,
478 description: desc,
479 unit: String::new(),
480 labels: labels.iter().map(|s| (*s).to_string()).collect(),
481 group: group.into(),
482 buckets: None,
483 use_cases: vec![],
484 dashboard_hint: None,
485 });
486 metrics::gauge!(key)
487 }
488
489 #[must_use]
494 pub fn histogram(&self, name: &str, description: &str) -> Histogram {
495 let key = self.prefixed_key(name);
496 let desc = description.to_string();
497 metrics::describe_histogram!(key.clone(), Unit::Seconds, desc.clone());
498 self.registry.push(MetricDescriptor {
499 name: key.clone(),
500 metric_type: MetricType::Histogram,
501 description: desc,
502 unit: "seconds".into(),
503 labels: vec![],
504 group: "custom".into(),
505 buckets: None,
506 use_cases: vec![],
507 dashboard_hint: None,
508 });
509 metrics::histogram!(key)
510 }
511
512 #[must_use]
514 pub fn histogram_with_labels(
515 &self,
516 name: &str,
517 description: &str,
518 labels: &[&str],
519 group: &str,
520 buckets: Option<&[f64]>,
521 ) -> Histogram {
522 let key = self.prefixed_key(name);
523 let desc = description.to_string();
524 metrics::describe_histogram!(key.clone(), Unit::Seconds, desc.clone());
525 self.registry.push(MetricDescriptor {
526 name: key.clone(),
527 metric_type: MetricType::Histogram,
528 description: desc,
529 unit: "seconds".into(),
530 labels: labels.iter().map(|s| (*s).to_string()).collect(),
531 group: group.into(),
532 buckets: buckets.map(|b| b.to_vec()),
533 use_cases: vec![],
534 dashboard_hint: None,
535 });
536 metrics::histogram!(key)
537 }
538
539 #[must_use]
545 pub fn histogram_with_buckets(
546 &self,
547 name: &str,
548 description: &str,
549 buckets: &[f64],
550 ) -> Histogram {
551 let key = self.prefixed_key(name);
552 let desc = description.to_string();
553 metrics::describe_histogram!(key.clone(), Unit::Seconds, desc.clone());
554 self.registry.push(MetricDescriptor {
555 name: key.clone(),
556 metric_type: MetricType::Histogram,
557 description: desc,
558 unit: "seconds".into(),
559 labels: vec![],
560 group: "custom".into(),
561 buckets: Some(buckets.to_vec()),
562 use_cases: vec![],
563 dashboard_hint: None,
564 });
565 metrics::histogram!(key)
566 }
567
568 #[must_use]
576 pub fn histogram_with_unit(&self, name: &str, description: &str, unit: Unit) -> Histogram {
577 let key = self.prefixed_key(name);
578 let desc = description.to_string();
579 metrics::describe_histogram!(key.clone(), unit, desc.clone());
580 self.registry.push(MetricDescriptor {
581 name: key.clone(),
582 metric_type: MetricType::Histogram,
583 description: desc,
584 unit: unit_label(unit).to_string(),
585 labels: vec![],
586 group: "custom".into(),
587 buckets: None,
588 use_cases: vec![],
589 dashboard_hint: None,
590 });
591 metrics::histogram!(key)
592 }
593
594 #[must_use]
600 pub fn histogram_count(&self, name: &str, description: &str) -> Histogram {
601 self.histogram_with_unit(name, description, Unit::Count)
602 }
603
604 #[cfg(feature = "metrics")]
609 #[must_use]
610 pub fn render(&self) -> String {
611 self.handle
612 .as_ref()
613 .map_or_else(String::new, PrometheusHandle::render)
614 }
615
616 #[cfg(feature = "metrics")]
636 #[must_use]
637 pub fn render_handle(&self) -> Option<RenderHandle> {
638 self.handle.clone().map(RenderHandle)
639 }
640
641 pub fn set_readiness_check(&mut self, f: impl Fn() -> bool + Send + Sync + 'static) {
647 self.readiness_fn = Some(Arc::new(f));
648 }
649
650 pub fn mark_started(&self) {
656 self.started
657 .store(true, std::sync::atomic::Ordering::Release);
658 }
659
660 pub(crate) fn started_flag(&self) -> Arc<std::sync::atomic::AtomicBool> {
662 Arc::clone(&self.started)
663 }
664
665 #[cfg(all(feature = "metrics", feature = "scaling"))]
670 pub fn set_scaling_pressure(&mut self, sp: Arc<crate::scaling::ScalingPressure>) {
671 self.scaling_pressure = Some(sp);
672 }
673
674 #[cfg(all(feature = "metrics", feature = "memory"))]
679 pub fn set_memory_guard(&mut self, mg: Arc<crate::memory::MemoryGuard>) {
680 self.memory_guard = Some(mg);
681 }
682
683 pub fn update(&self) {
685 if let Some(ref pm) = self.process_metrics {
686 pm.update();
687 }
688 if let Some(ref cm) = self.container_metrics {
689 cm.update();
690 }
691 }
692
693 #[cfg(feature = "metrics")]
704 pub async fn start_server(&mut self, addr: &str) -> Result<(), MetricsError> {
705 if self.shutdown_tx.is_some() {
706 return Err(MetricsError::AlreadyRunning);
707 }
708
709 let addr: SocketAddr = addr
710 .parse()
711 .map_err(|e| MetricsError::ServerError(format!("invalid address: {e}")))?;
712
713 let listener = TcpListener::bind(addr)
714 .await
715 .map_err(|e| MetricsError::ServerError(e.to_string()))?;
716
717 let (shutdown_tx, shutdown_rx) = oneshot::channel();
718 self.shutdown_tx = Some(shutdown_tx);
719
720 let handle = self
721 .handle
722 .as_ref()
723 .ok_or_else(|| {
724 MetricsError::ServerError(
725 "Prometheus handle not configured -- MetricsManager was created without a recorder".into(),
726 )
727 })?
728 .clone();
729 let update_interval = self.config.update_interval;
730 let process_metrics = self.process_metrics.clone();
731 let container_metrics = self.container_metrics.clone();
732 let readiness_fn = self.readiness_fn.clone();
733 let started_flag = self.started_flag();
734
735 let registry = self.registry();
736
737 tokio::spawn(async move {
738 run_server(
739 listener,
740 handle,
741 registry,
742 shutdown_rx,
743 update_interval,
744 process_metrics,
745 container_metrics,
746 readiness_fn,
747 started_flag,
748 )
749 .await;
750 });
751
752 Ok(())
753 }
754
755 #[cfg(all(feature = "metrics", feature = "http-server"))]
773 pub async fn start_server_with_routes(
774 &mut self,
775 addr: &str,
776 extra_routes: axum::Router,
777 ) -> Result<(), MetricsError> {
778 if self.shutdown_tx.is_some() {
779 return Err(MetricsError::AlreadyRunning);
780 }
781
782 let addr: SocketAddr = addr
783 .parse()
784 .map_err(|e| MetricsError::ServerError(format!("invalid address: {e}")))?;
785
786 let listener = TcpListener::bind(addr)
787 .await
788 .map_err(|e| MetricsError::ServerError(e.to_string()))?;
789
790 let (shutdown_tx, shutdown_rx) = oneshot::channel();
791 self.shutdown_tx = Some(shutdown_tx);
792
793 let handle = self
794 .handle
795 .as_ref()
796 .ok_or_else(|| {
797 MetricsError::ServerError(
798 "Prometheus handle not configured -- MetricsManager was created without a recorder".into(),
799 )
800 })?
801 .clone();
802 let update_interval = self.config.update_interval;
803 let process_metrics = self.process_metrics.clone();
804 let container_metrics = self.container_metrics.clone();
805 let readiness_fn = self.readiness_fn.clone();
806
807 let metrics_handle = handle.clone();
809 let readiness_for_live = readiness_fn.clone();
810 let registry_handle = self.registry();
811
812 let mut app = axum::Router::new()
813 .route(
814 "/metrics/manifest",
815 axum::routing::get(move || {
816 let reg = registry_handle.clone();
817 async move {
818 (
819 [(axum::http::header::CONTENT_TYPE, "application/json")],
820 serde_json::to_string(®.manifest()).unwrap_or_default(),
821 )
822 }
823 }),
824 )
825 .route(
826 "/metrics",
827 axum::routing::get(move || {
828 let h = metrics_handle.clone();
829 async move { h.render() }
830 }),
831 )
832 .route("/startupz", {
833 let sf = self.started_flag();
834 axum::routing::get(move || {
835 let started = sf.load(std::sync::atomic::Ordering::Acquire);
836 async move {
837 if started {
838 (
839 axum::http::StatusCode::OK,
840 [(axum::http::header::CONTENT_TYPE, "application/json")],
841 r#"{"status":"started"}"#,
842 )
843 } else {
844 (
845 axum::http::StatusCode::SERVICE_UNAVAILABLE,
846 [(axum::http::header::CONTENT_TYPE, "application/json")],
847 r#"{"status":"starting"}"#,
848 )
849 }
850 }
851 })
852 })
853 .route(
854 "/healthz",
855 axum::routing::get(|| async {
856 (
857 [(axum::http::header::CONTENT_TYPE, "application/json")],
858 r#"{"status":"alive"}"#,
859 )
860 }),
861 )
862 .route(
863 "/health/live",
864 axum::routing::get(|| async {
865 (
866 [(axum::http::header::CONTENT_TYPE, "application/json")],
867 r#"{"status":"alive"}"#,
868 )
869 }),
870 )
871 .route(
872 "/readyz",
873 axum::routing::get(move || {
874 let rf = readiness_fn.clone();
875 async move { readiness_response(rf) }
876 }),
877 )
878 .route(
879 "/health/ready",
880 axum::routing::get(move || {
881 let rf = readiness_for_live.clone();
882 async move { readiness_response(rf) }
883 }),
884 );
885
886 #[cfg(feature = "scaling")]
888 if let Some(ref sp) = self.scaling_pressure {
889 let sp = sp.clone();
890 app = app.route(
891 "/scaling/pressure",
892 axum::routing::get(move || {
893 let s = sp.clone();
894 async move { format!("{:.2}", s.calculate()) }
895 }),
896 );
897 }
898
899 #[cfg(feature = "memory")]
901 if let Some(ref mg) = self.memory_guard {
902 let mg = mg.clone();
903 app = app.route(
904 "/memory/pressure",
905 axum::routing::get(move || {
906 let m = mg.clone();
907 async move {
908 (
909 [(axum::http::header::CONTENT_TYPE, "application/json")],
910 format!(
911 r#"{{"under_pressure":{},"ratio":{:.3},"current_bytes":{},"limit_bytes":{}}}"#,
912 m.under_pressure(),
913 m.pressure_ratio(),
914 m.current_bytes(),
915 m.limit_bytes()
916 ),
917 )
918 }
919 }),
920 );
921 }
922
923 app = app.merge(extra_routes);
925
926 tokio::spawn(async move {
927 run_axum_server(
928 listener,
929 app,
930 shutdown_rx,
931 update_interval,
932 process_metrics,
933 container_metrics,
934 )
935 .await;
936 });
937
938 Ok(())
939 }
940
941 pub async fn stop_server(&mut self) -> Result<(), MetricsError> {
947 if let Some(tx) = self.shutdown_tx.take() {
948 let _ = tx.send(());
949 Ok(())
950 } else {
951 Err(MetricsError::NotRunning)
952 }
953 }
954
955 #[cfg(feature = "otel-metrics")]
959 pub fn shutdown_otel(&mut self) {
960 if let Some(provider) = self.otel_provider.take()
961 && let Err(e) = provider.shutdown()
962 {
963 tracing::warn!(error = %e, "OTel provider shutdown error");
964 }
965 }
966
967 pub fn set_build_info(&self, version: &str, commit: &str) {
973 self.registry.set_build_info(version, commit);
974 }
975
976 pub fn set_use_cases(&self, metric_name: &str, use_cases: &[&str]) {
980 self.registry.set_use_cases(metric_name, use_cases);
981 }
982
983 pub fn set_dashboard_hint(&self, metric_name: &str, hint: &str) {
987 self.registry.set_dashboard_hint(metric_name, hint);
988 }
989
990 #[must_use]
995 pub fn registry(&self) -> MetricRegistry {
996 self.registry.clone()
997 }
998
999 #[must_use]
1004 pub fn namespace(&self) -> &str {
1005 &self.config.namespace
1006 }
1007
1008 fn prefixed_key(&self, name: &str) -> String {
1010 if self.config.namespace.is_empty() {
1011 name.to_string()
1012 } else {
1013 format!("{}_{}", self.config.namespace, name)
1014 }
1015 }
1016}
1017
/// Accept loop of the built-in HTTP server.
///
/// Runs until `shutdown_rx` resolves, which happens both on an explicit
/// shutdown signal and when the sender is dropped. While running it:
/// * refreshes the process/container metric sets on every tick of
///   `update_interval` (`tokio::time::interval` fires its first tick
///   immediately and panics on a zero period);
/// * spawns one task per accepted connection to serve a single request.
///
/// Accept failures are logged and followed by a short pause so that a
/// persistent failure (for example file-descriptor exhaustion) cannot turn the
/// loop into a busy spin.
#[cfg(feature = "metrics")]
#[allow(clippy::too_many_arguments)]
async fn run_server(
    listener: TcpListener,
    handle: PrometheusHandle,
    registry: MetricRegistry,
    mut shutdown_rx: oneshot::Receiver<()>,
    update_interval: Duration,
    process_metrics: Option<ProcessMetrics>,
    container_metrics: Option<ContainerMetrics>,
    readiness_fn: Option<ReadinessFn>,
    started_flag: Arc<std::sync::atomic::AtomicBool>,
) {
    /// Pause after a failed `accept` before trying again.
    const ACCEPT_ERROR_BACKOFF: Duration = Duration::from_millis(100);

    let mut update_interval = tokio::time::interval(update_interval);

    loop {
        tokio::select! {
            _ = &mut shutdown_rx => {
                break;
            }
            _ = update_interval.tick() => {
                if let Some(ref pm) = process_metrics {
                    pm.update();
                }
                if let Some(ref cm) = container_metrics {
                    cm.update();
                }
            }
            result = listener.accept() => {
                match result {
                    Ok((stream, _)) => {
                        let handle = handle.clone();
                        let registry = registry.clone();
                        let readiness_fn = readiness_fn.clone();
                        let sf = Arc::clone(&started_flag);
                        tokio::spawn(async move {
                            handle_connection(stream, handle, registry, readiness_fn, &sf).await;
                        });
                    }
                    Err(e) => {
                        tracing::warn!(error = %e, "failed to accept metrics connection");
                        tokio::time::sleep(ACCEPT_ERROR_BACKOFF).await;
                    }
                }
            }
        }
    }
}
1061
/// Serves a single HTTP request on `stream` and then closes the connection.
///
/// Only the request line is read, and routing is by prefix of that line:
///
/// | Prefix                                   | Response                                   |
/// |------------------------------------------|--------------------------------------------|
/// | `GET /metrics/manifest`                  | JSON manifest from `registry`              |
/// | `GET /metrics`                           | Prometheus text from `handle`              |
/// | `GET /startupz`, `GET /health/startup`   | 200 once `started_flag` is set, else 503   |
/// | `GET /healthz`, `GET /health/live`       | always 200                                 |
/// | `GET /readyz`, `GET /health/ready`       | 200 when ready, else 503                   |
/// | anything else                            | 404                                        |
///
/// Readiness is the readiness callback (treated as ready when absent) combined
/// with the health registry when the `health` feature is enabled.
///
/// Peers that disconnect without sending anything, or whose request line
/// cannot be read, get no response. Write errors are ignored.
#[cfg(feature = "metrics")]
async fn handle_connection(
    mut stream: tokio::net::TcpStream,
    handle: PrometheusHandle,
    registry: MetricRegistry,
    readiness_fn: Option<ReadinessFn>,
    started_flag: &std::sync::atomic::AtomicBool,
) {
    use tokio::io::{AsyncBufReadExt, AsyncReadExt, AsyncWriteExt, BufReader};

    /// Upper bound on bytes read for the request line. Routing only inspects
    /// the first few bytes, so a longer line is simply truncated; the cap keeps
    /// a peer from growing the buffer without limit.
    const MAX_REQUEST_LINE_BYTES: u64 = 8 * 1024;

    let mut request_line = String::new();
    {
        let mut reader = BufReader::new(&mut stream).take(MAX_REQUEST_LINE_BYTES);
        match reader.read_line(&mut request_line).await {
            // Nothing was sent (e.g. a bare TCP probe) or the read failed.
            Ok(0) | Err(_) => return,
            Ok(_) => {}
        }
    }

    // The manifest prefix must be tested before the shorter `/metrics` prefix.
    let (status, content_type, body) = if request_line.starts_with("GET /metrics/manifest") {
        (
            "200 OK",
            "application/json",
            serde_json::to_string(&registry.manifest()).unwrap_or_default(),
        )
    } else if request_line.starts_with("GET /metrics") {
        ("200 OK", "text/plain; charset=utf-8", handle.render())
    } else if request_line.starts_with("GET /startupz")
        || request_line.starts_with("GET /health/startup")
    {
        if started_flag.load(std::sync::atomic::Ordering::Acquire) {
            (
                "200 OK",
                "application/json",
                r#"{"status":"started"}"#.to_string(),
            )
        } else {
            (
                "503 Service Unavailable",
                "application/json",
                r#"{"status":"starting"}"#.to_string(),
            )
        }
    } else if request_line.starts_with("GET /healthz")
        || request_line.starts_with("GET /health/live")
    {
        (
            "200 OK",
            "application/json",
            r#"{"status":"alive"}"#.to_string(),
        )
    } else if request_line.starts_with("GET /readyz")
        || request_line.starts_with("GET /health/ready")
    {
        // No callback configured counts as ready.
        let callback_ready = readiness_fn.as_ref().is_none_or(|f| f());

        #[cfg(feature = "health")]
        let registry_ready = crate::health::HealthRegistry::is_ready();
        #[cfg(not(feature = "health"))]
        let registry_ready = true;

        let ready = callback_ready && registry_ready;
        if ready {
            (
                "200 OK",
                "application/json",
                r#"{"status":"ready"}"#.to_string(),
            )
        } else {
            (
                "503 Service Unavailable",
                "application/json",
                r#"{"status":"not_ready"}"#.to_string(),
            )
        }
    } else {
        (
            "404 Not Found",
            "text/plain; charset=utf-8",
            "Not Found".to_string(),
        )
    };

    // The socket is dropped after this single response, so say so explicitly;
    // otherwise HTTP/1.1 clients may assume a persistent connection.
    let response = format!(
        "HTTP/1.1 {status}\r\nContent-Type: {content_type}\r\nContent-Length: {}\r\nConnection: close\r\n\r\n{body}",
        body.len()
    );

    let _ = stream.write_all(response.as_bytes()).await;
}
1155
/// Builds the axum response for the readiness endpoints.
///
/// The service counts as ready when the optional callback returns `true` (or
/// is absent) and, with the `health` feature, the health registry is ready.
/// Ready yields `200` with `{"status":"ready"}`; otherwise `503` with
/// `{"status":"not_ready"}`. Both carry a JSON content type.
#[cfg(all(feature = "metrics", feature = "http-server"))]
fn readiness_response(rf: Option<ReadinessFn>) -> axum::response::Response {
    use axum::response::IntoResponse;

    let callback_ready = match rf.as_deref() {
        Some(check) => check(),
        None => true,
    };

    #[cfg(feature = "health")]
    let registry_ready = crate::health::HealthRegistry::is_ready();
    #[cfg(not(feature = "health"))]
    let registry_ready = true;

    let (status, body) = if callback_ready && registry_ready {
        (axum::http::StatusCode::OK, r#"{"status":"ready"}"#)
    } else {
        (
            axum::http::StatusCode::SERVICE_UNAVAILABLE,
            r#"{"status":"not_ready"}"#,
        )
    };

    (
        status,
        [(axum::http::header::CONTENT_TYPE, "application/json")],
        body,
    )
        .into_response()
}
1188
/// Runs the axum server until `shutdown_rx` resolves, refreshing the
/// process/container metric sets in a companion task meanwhile.
///
/// The companion task is told to stop once the server has returned. A server
/// error is logged and not propagated.
#[cfg(all(feature = "metrics", feature = "http-server"))]
async fn run_axum_server(
    listener: TcpListener,
    app: axum::Router,
    shutdown_rx: oneshot::Receiver<()>,
    update_interval: Duration,
    process_metrics: Option<ProcessMetrics>,
    container_metrics: Option<ContainerMetrics>,
) {
    let mut ticker = tokio::time::interval(update_interval);

    // Separate stop channel for the refresh task; it also ends if the sender
    // is dropped.
    let (update_stop_tx, mut update_stop_rx) = oneshot::channel::<()>();
    tokio::spawn(async move {
        loop {
            tokio::select! {
                _ = &mut update_stop_rx => break,
                _ = ticker.tick() => {
                    if let Some(process) = process_metrics.as_ref() {
                        process.update();
                    }
                    if let Some(container) = container_metrics.as_ref() {
                        container.update();
                    }
                }
            }
        }
    });

    let shutdown_signal = async move {
        let _ = shutdown_rx.await;
    };
    let served = axum::serve(listener, app)
        .with_graceful_shutdown(shutdown_signal)
        .await;
    if let Err(e) = served {
        tracing::error!(error = %e, "Metrics axum server error");
    }

    let _ = update_stop_tx.send(());
}
1229
/// Returns the twelve default bucket boundaries for latency histograms,
/// ascending from `0.001` to `10.0`.
///
/// Presumably expressed in seconds, matching the unit the histogram helpers in
/// this module describe by default.
#[must_use]
pub fn latency_buckets() -> Vec<f64> {
    const BOUNDS: [f64; 12] = [
        0.001, 0.005, 0.01, 0.025, 0.05, 0.1, 0.25, 0.5, 1.0, 2.5, 5.0, 10.0,
    ];
    BOUNDS.to_vec()
}
1237
/// Returns the six default bucket boundaries for size histograms: powers of
/// ten from `100` to `10_000_000`.
///
/// Presumably expressed in bytes; confirm against the callers.
#[must_use]
pub fn size_buckets() -> Vec<f64> {
    const BOUNDS: [f64; 6] = [
        100.0,
        1_000.0,
        10_000.0,
        100_000.0,
        1_000_000.0,
        10_000_000.0,
    ];
    Vec::from(BOUNDS)
}
1250
1251fn unit_label(unit: Unit) -> &'static str {
1257 match unit {
1258 Unit::Count => "count",
1259 Unit::Bytes => "bytes",
1260 Unit::Seconds => "seconds",
1261 _ => "",
1262 }
1263}
1264
#[cfg(test)]
mod tests {
    use super::*;

    /// The default configuration has an empty namespace, both metric sets
    /// enabled and a 15-second refresh period.
    #[test]
    fn test_metrics_config_default() {
        let MetricsConfig {
            namespace,
            enable_process_metrics,
            enable_container_metrics,
            update_interval,
            ..
        } = MetricsConfig::default();

        assert!(namespace.is_empty());
        assert!(enable_process_metrics);
        assert!(enable_container_metrics);
        assert_eq!(update_interval, Duration::from_secs(15));
    }

    /// Latency buckets: twelve boundaries, the first below the last.
    #[test]
    fn test_latency_buckets() {
        let bounds = latency_buckets();
        assert_eq!(bounds.len(), 12);
        let (lowest, highest) = (bounds[0], bounds[11]);
        assert!(lowest < highest);
    }

    /// Size buckets: six boundaries, the first below the last.
    #[test]
    fn test_size_buckets() {
        let bounds = size_buckets();
        assert_eq!(bounds.len(), 6);
        let (lowest, highest) = (bounds[0], bounds[5]);
        assert!(lowest < highest);
    }
}