1mod container;
86pub mod dfe;
87pub mod manifest;
88mod process;
89
90#[cfg(feature = "otel-metrics")]
91pub(crate) mod otel;
92#[cfg(feature = "otel-metrics")]
93pub mod otel_types;
94
95use std::net::SocketAddr;
96use std::sync::Arc;
97use std::time::Duration;
98
99use metrics::{Counter, Gauge, Histogram, Unit};
100use thiserror::Error;
101use tokio::net::TcpListener;
102use tokio::sync::oneshot;
103
/// Shared readiness callback consulted by the `/readyz` and `/health/ready`
/// endpoints; returning `true` means the service can accept traffic.
pub type ReadinessFn = Arc<dyn Fn() -> bool + Send + Sync>;
106
107#[cfg(feature = "metrics")]
108use metrics_exporter_prometheus::PrometheusHandle;
109
110pub use container::ContainerMetrics;
111pub use dfe::DfeMetrics;
112#[cfg(feature = "metrics-dfe")]
113pub mod dfe_groups;
114pub use manifest::{ManifestResponse, MetricDescriptor, MetricRegistry, MetricType};
115pub use process::ProcessMetrics;
116
117#[cfg(feature = "otel-metrics")]
118pub use otel_types::{OtelMetricsConfig, OtelProtocol};
119
/// Cloneable handle that renders the current metrics snapshot in Prometheus
/// exposition format, usable independently of the owning [`MetricsManager`].
#[cfg(feature = "metrics")]
#[derive(Clone)]
pub struct RenderHandle(PrometheusHandle);
127
#[cfg(feature = "metrics")]
impl RenderHandle {
    /// Renders the current metrics snapshot in Prometheus exposition format.
    #[must_use]
    pub fn render(&self) -> String {
        let RenderHandle(inner) = self;
        inner.render()
    }
}
136
/// Errors returned by the [`MetricsManager`] server lifecycle methods.
#[derive(Debug, Error)]
pub enum MetricsError {
    /// The metrics exporter could not be constructed.
    #[error("failed to build metrics exporter: {0}")]
    BuildError(String),

    /// Starting the HTTP endpoint failed: bad address, bind failure, or a
    /// missing Prometheus handle.
    #[error("failed to start metrics server: {0}")]
    ServerError(String),

    /// A `start_server*` method was called while a server was already running.
    #[error("metrics server already running")]
    AlreadyRunning,

    /// `stop_server` was called but no server was running.
    #[error("metrics server not running")]
    NotRunning,
}
156
/// Configuration for [`MetricsManager`].
#[derive(Debug, Clone)]
pub struct MetricsConfig {
    /// Prefix joined to every metric name with `_`; empty means no prefix.
    pub namespace: String,
    /// Whether to create and periodically refresh process-level metrics.
    pub enable_process_metrics: bool,
    /// Whether to create and periodically refresh container-level metrics.
    pub enable_container_metrics: bool,
    /// Interval at which the background server task refreshes
    /// process/container gauges.
    pub update_interval: Duration,
    /// OpenTelemetry exporter settings.
    #[cfg(feature = "otel-metrics")]
    pub otel: OtelMetricsConfig,
}
172
173impl Default for MetricsConfig {
174 fn default() -> Self {
175 Self {
176 namespace: String::new(),
177 enable_process_metrics: true,
178 enable_container_metrics: true,
179 update_interval: Duration::from_secs(15),
180 #[cfg(feature = "otel-metrics")]
181 otel: OtelMetricsConfig::default(),
182 }
183 }
184}
185
/// Handles produced by [`install_recorders`], kept so the manager can render
/// Prometheus output and shut the OTel pipeline down later.
struct RecorderSetup {
    /// Render handle for the Prometheus recorder, when one was installed.
    #[cfg(feature = "metrics")]
    prom_handle: Option<PrometheusHandle>,
    /// OTel meter provider, retained for later flush/shutdown.
    #[cfg(feature = "otel-metrics")]
    otel_provider: Option<opentelemetry_sdk::metrics::SdkMeterProvider>,
}
193
/// Installs the process-global `metrics` recorder for the active feature
/// combination and returns the handles needed afterwards:
///
/// * `metrics` only — Prometheus recorder; returns its render handle.
/// * `otel-metrics` only — OTel recorder; on build failure a warning is
///   logged and nothing is installed.
/// * both — a fanout recorder feeding Prometheus and OTel, falling back to
///   Prometheus alone if the OTel exporter cannot be built.
///
/// NOTE(review): `metrics::set_global_recorder` succeeds at most once per
/// process; the `expect`s below panic if a recorder is already installed, so
/// constructing a second `MetricsManager` aborts — confirm single-instance
/// usage.
// NOTE(review): if neither `metrics` nor `otel-metrics` is enabled, every
// branch is compiled out and this body has no `RecorderSetup` tail
// expression — presumably the crate requires at least one of these
// features; confirm in Cargo.toml.
#[allow(unused_variables)] // `config` is unused in the Prometheus-only build
fn install_recorders(config: &MetricsConfig) -> RecorderSetup {
    // Prometheus-only build.
    #[cfg(all(feature = "metrics", not(feature = "otel-metrics")))]
    {
        let recorder = metrics_exporter_prometheus::PrometheusBuilder::new().build_recorder();
        let handle = recorder.handle();
        metrics::set_global_recorder(recorder).expect("failed to install Prometheus recorder");
        RecorderSetup {
            prom_handle: Some(handle),
        }
    }

    // OTel-only build: best-effort install, degrade to no recorder on failure.
    #[cfg(all(feature = "otel-metrics", not(feature = "metrics")))]
    {
        match otel::build_otel_recorder(&config.namespace, &config.otel) {
            Ok((otel_recorder, provider)) => {
                opentelemetry::global::set_meter_provider(provider.clone());
                metrics::set_global_recorder(otel_recorder)
                    .expect("failed to set OTel metrics recorder");
                RecorderSetup {
                    otel_provider: Some(provider),
                }
            }
            Err(e) => {
                tracing::warn!(error = %e, "Failed to build OTel metrics recorder");
                RecorderSetup {
                    otel_provider: None,
                }
            }
        }
    }

    // Dual build: fan out to both sinks; fall back to Prometheus alone if
    // the OTel exporter cannot be built.
    #[cfg(all(feature = "metrics", feature = "otel-metrics"))]
    {
        let prom_recorder = metrics_exporter_prometheus::PrometheusBuilder::new().build_recorder();
        let prom_handle = prom_recorder.handle();

        match otel::build_otel_recorder(&config.namespace, &config.otel) {
            Ok((otel_recorder, provider)) => {
                opentelemetry::global::set_meter_provider(provider.clone());

                let fanout = metrics_util::layers::FanoutBuilder::default()
                    .add_recorder(prom_recorder)
                    .add_recorder(otel_recorder)
                    .build();

                metrics::set_global_recorder(fanout).expect("failed to set Fanout recorder");

                RecorderSetup {
                    prom_handle: Some(prom_handle),
                    otel_provider: Some(provider),
                }
            }
            Err(e) => {
                tracing::warn!(error = %e, "Failed to build OTel recorder, falling back to Prometheus only");
                metrics::set_global_recorder(prom_recorder)
                    .expect("failed to set Prometheus recorder");
                RecorderSetup {
                    prom_handle: Some(prom_handle),
                    otel_provider: None,
                }
            }
        }
    }
}
271
/// Central metrics facade: installs the global recorder(s) at construction,
/// registers custom metrics together with a manifest of descriptors, and
/// optionally serves HTTP endpoints for scraping and health probes.
pub struct MetricsManager {
    /// Prometheus render handle, when a Prometheus recorder was installed.
    #[cfg(feature = "metrics")]
    handle: Option<PrometheusHandle>,
    /// Configuration captured at construction time.
    config: MetricsConfig,
    /// Present while a metrics server is running; sending stops it.
    shutdown_tx: Option<oneshot::Sender<()>>,
    /// Process-level gauges, when enabled in the config.
    process_metrics: Option<ProcessMetrics>,
    /// Container-level gauges, when enabled in the config.
    container_metrics: Option<ContainerMetrics>,
    /// Optional readiness callback consulted by the `/readyz` endpoints.
    readiness_fn: Option<ReadinessFn>,
    /// Set by `mark_started`; reported by the `/startupz` endpoints.
    started: Arc<std::sync::atomic::AtomicBool>,
    /// Descriptors for every metric created through this manager.
    registry: MetricRegistry,
    /// Source for the `/scaling/pressure` endpoint, when configured.
    #[cfg(all(feature = "metrics", feature = "scaling"))]
    scaling_pressure: Option<Arc<crate::scaling::ScalingPressure>>,
    /// Source for the `/memory/pressure` endpoint, when configured.
    #[cfg(all(feature = "metrics", feature = "memory"))]
    memory_guard: Option<Arc<crate::memory::MemoryGuard>>,
    /// OTel meter provider kept for `shutdown_otel`.
    #[cfg(feature = "otel-metrics")]
    otel_provider: Option<opentelemetry_sdk::metrics::SdkMeterProvider>,
}
290
291impl MetricsManager {
292 #[must_use]
294 pub fn new(namespace: &str) -> Self {
295 Self::with_config(MetricsConfig {
296 namespace: namespace.to_string(),
297 ..Default::default()
298 })
299 }
300
301 #[must_use]
308 pub fn with_config(config: MetricsConfig) -> Self {
309 let setup = install_recorders(&config);
310
311 let process_metrics = if config.enable_process_metrics {
312 Some(ProcessMetrics::new(&config.namespace))
313 } else {
314 None
315 };
316
317 let container_metrics = if config.enable_container_metrics {
318 Some(ContainerMetrics::new(&config.namespace))
319 } else {
320 None
321 };
322
323 let registry = MetricRegistry::new(&config.namespace);
324
325 Self {
326 #[cfg(feature = "metrics")]
327 handle: setup.prom_handle,
328 registry,
329 config,
330 shutdown_tx: None,
331 process_metrics,
332 container_metrics,
333 readiness_fn: None,
334 started: Arc::new(std::sync::atomic::AtomicBool::new(false)),
335 #[cfg(all(feature = "metrics", feature = "scaling"))]
336 scaling_pressure: None,
337 #[cfg(all(feature = "metrics", feature = "memory"))]
338 memory_guard: None,
339 #[cfg(feature = "otel-metrics")]
340 otel_provider: setup.otel_provider,
341 }
342 }
343
344 #[must_use]
349 pub fn counter(&self, name: &str, description: &str) -> Counter {
350 let key = self.prefixed_key(name);
351 let desc = description.to_string();
352 metrics::describe_counter!(key.clone(), desc.clone());
353 self.registry.push(MetricDescriptor {
354 name: key.clone(),
355 metric_type: MetricType::Counter,
356 description: desc,
357 unit: String::new(),
358 labels: vec![],
359 group: "custom".into(),
360 buckets: None,
361 use_cases: vec![],
362 dashboard_hint: None,
363 });
364 metrics::counter!(key)
365 }
366
367 #[must_use]
373 pub fn counter_with_labels(
374 &self,
375 name: &str,
376 description: &str,
377 labels: &[&str],
378 group: &str,
379 ) -> Counter {
380 let key = self.prefixed_key(name);
381 let desc = description.to_string();
382 metrics::describe_counter!(key.clone(), desc.clone());
383 self.registry.push(MetricDescriptor {
384 name: key.clone(),
385 metric_type: MetricType::Counter,
386 description: desc,
387 unit: String::new(),
388 labels: labels.iter().map(|s| (*s).to_string()).collect(),
389 group: group.into(),
390 buckets: None,
391 use_cases: vec![],
392 dashboard_hint: None,
393 });
394 metrics::counter!(key)
395 }
396
397 #[must_use]
402 pub fn gauge(&self, name: &str, description: &str) -> Gauge {
403 let key = self.prefixed_key(name);
404 let desc = description.to_string();
405 metrics::describe_gauge!(key.clone(), desc.clone());
406 self.registry.push(MetricDescriptor {
407 name: key.clone(),
408 metric_type: MetricType::Gauge,
409 description: desc,
410 unit: String::new(),
411 labels: vec![],
412 group: "custom".into(),
413 buckets: None,
414 use_cases: vec![],
415 dashboard_hint: None,
416 });
417 metrics::gauge!(key)
418 }
419
420 #[must_use]
422 pub fn gauge_with_labels(
423 &self,
424 name: &str,
425 description: &str,
426 labels: &[&str],
427 group: &str,
428 ) -> Gauge {
429 let key = self.prefixed_key(name);
430 let desc = description.to_string();
431 metrics::describe_gauge!(key.clone(), desc.clone());
432 self.registry.push(MetricDescriptor {
433 name: key.clone(),
434 metric_type: MetricType::Gauge,
435 description: desc,
436 unit: String::new(),
437 labels: labels.iter().map(|s| (*s).to_string()).collect(),
438 group: group.into(),
439 buckets: None,
440 use_cases: vec![],
441 dashboard_hint: None,
442 });
443 metrics::gauge!(key)
444 }
445
446 #[must_use]
451 pub fn histogram(&self, name: &str, description: &str) -> Histogram {
452 let key = self.prefixed_key(name);
453 let desc = description.to_string();
454 metrics::describe_histogram!(key.clone(), Unit::Seconds, desc.clone());
455 self.registry.push(MetricDescriptor {
456 name: key.clone(),
457 metric_type: MetricType::Histogram,
458 description: desc,
459 unit: "seconds".into(),
460 labels: vec![],
461 group: "custom".into(),
462 buckets: None,
463 use_cases: vec![],
464 dashboard_hint: None,
465 });
466 metrics::histogram!(key)
467 }
468
469 #[must_use]
471 pub fn histogram_with_labels(
472 &self,
473 name: &str,
474 description: &str,
475 labels: &[&str],
476 group: &str,
477 buckets: Option<&[f64]>,
478 ) -> Histogram {
479 let key = self.prefixed_key(name);
480 let desc = description.to_string();
481 metrics::describe_histogram!(key.clone(), Unit::Seconds, desc.clone());
482 self.registry.push(MetricDescriptor {
483 name: key.clone(),
484 metric_type: MetricType::Histogram,
485 description: desc,
486 unit: "seconds".into(),
487 labels: labels.iter().map(|s| (*s).to_string()).collect(),
488 group: group.into(),
489 buckets: buckets.map(|b| b.to_vec()),
490 use_cases: vec![],
491 dashboard_hint: None,
492 });
493 metrics::histogram!(key)
494 }
495
496 #[must_use]
502 pub fn histogram_with_buckets(
503 &self,
504 name: &str,
505 description: &str,
506 buckets: &[f64],
507 ) -> Histogram {
508 let key = self.prefixed_key(name);
509 let desc = description.to_string();
510 metrics::describe_histogram!(key.clone(), Unit::Seconds, desc.clone());
511 self.registry.push(MetricDescriptor {
512 name: key.clone(),
513 metric_type: MetricType::Histogram,
514 description: desc,
515 unit: "seconds".into(),
516 labels: vec![],
517 group: "custom".into(),
518 buckets: Some(buckets.to_vec()),
519 use_cases: vec![],
520 dashboard_hint: None,
521 });
522 metrics::histogram!(key)
523 }
524
525 #[cfg(feature = "metrics")]
530 #[must_use]
531 pub fn render(&self) -> String {
532 self.handle
533 .as_ref()
534 .map_or_else(String::new, PrometheusHandle::render)
535 }
536
537 #[cfg(feature = "metrics")]
560 #[must_use]
561 pub fn render_handle(&self) -> Option<RenderHandle> {
562 self.handle.clone().map(RenderHandle)
563 }
564
565 pub fn set_readiness_check(&mut self, f: impl Fn() -> bool + Send + Sync + 'static) {
571 self.readiness_fn = Some(Arc::new(f));
572 }
573
574 pub fn mark_started(&self) {
580 self.started
581 .store(true, std::sync::atomic::Ordering::Release);
582 }
583
584 pub(crate) fn started_flag(&self) -> Arc<std::sync::atomic::AtomicBool> {
586 Arc::clone(&self.started)
587 }
588
589 #[cfg(all(feature = "metrics", feature = "scaling"))]
594 pub fn set_scaling_pressure(&mut self, sp: Arc<crate::scaling::ScalingPressure>) {
595 self.scaling_pressure = Some(sp);
596 }
597
598 #[cfg(all(feature = "metrics", feature = "memory"))]
603 pub fn set_memory_guard(&mut self, mg: Arc<crate::memory::MemoryGuard>) {
604 self.memory_guard = Some(mg);
605 }
606
607 pub fn update(&self) {
609 if let Some(ref pm) = self.process_metrics {
610 pm.update();
611 }
612 if let Some(ref cm) = self.container_metrics {
613 cm.update();
614 }
615 }
616
617 #[cfg(feature = "metrics")]
628 pub async fn start_server(&mut self, addr: &str) -> Result<(), MetricsError> {
629 if self.shutdown_tx.is_some() {
630 return Err(MetricsError::AlreadyRunning);
631 }
632
633 let addr: SocketAddr = addr
634 .parse()
635 .map_err(|e| MetricsError::ServerError(format!("invalid address: {e}")))?;
636
637 let listener = TcpListener::bind(addr)
638 .await
639 .map_err(|e| MetricsError::ServerError(e.to_string()))?;
640
641 let (shutdown_tx, shutdown_rx) = oneshot::channel();
642 self.shutdown_tx = Some(shutdown_tx);
643
644 let handle = self
645 .handle
646 .as_ref()
647 .ok_or_else(|| {
648 MetricsError::ServerError(
649 "Prometheus handle not configured — MetricsManager was created without a recorder".into(),
650 )
651 })?
652 .clone();
653 let update_interval = self.config.update_interval;
654 let process_metrics = self.process_metrics.clone();
655 let container_metrics = self.container_metrics.clone();
656 let readiness_fn = self.readiness_fn.clone();
657 let started_flag = self.started_flag();
658
659 let registry = self.registry();
660
661 tokio::spawn(async move {
662 run_server(
663 listener,
664 handle,
665 registry,
666 shutdown_rx,
667 update_interval,
668 process_metrics,
669 container_metrics,
670 readiness_fn,
671 started_flag,
672 )
673 .await;
674 });
675
676 Ok(())
677 }
678
679 #[cfg(all(feature = "metrics", feature = "http-server"))]
697 pub async fn start_server_with_routes(
698 &mut self,
699 addr: &str,
700 extra_routes: axum::Router,
701 ) -> Result<(), MetricsError> {
702 if self.shutdown_tx.is_some() {
703 return Err(MetricsError::AlreadyRunning);
704 }
705
706 let addr: SocketAddr = addr
707 .parse()
708 .map_err(|e| MetricsError::ServerError(format!("invalid address: {e}")))?;
709
710 let listener = TcpListener::bind(addr)
711 .await
712 .map_err(|e| MetricsError::ServerError(e.to_string()))?;
713
714 let (shutdown_tx, shutdown_rx) = oneshot::channel();
715 self.shutdown_tx = Some(shutdown_tx);
716
717 let handle = self
718 .handle
719 .as_ref()
720 .ok_or_else(|| {
721 MetricsError::ServerError(
722 "Prometheus handle not configured — MetricsManager was created without a recorder".into(),
723 )
724 })?
725 .clone();
726 let update_interval = self.config.update_interval;
727 let process_metrics = self.process_metrics.clone();
728 let container_metrics = self.container_metrics.clone();
729 let readiness_fn = self.readiness_fn.clone();
730
731 let metrics_handle = handle.clone();
733 let readiness_for_live = readiness_fn.clone();
734 let registry_handle = self.registry();
735
736 let mut app = axum::Router::new()
737 .route(
738 "/metrics/manifest",
739 axum::routing::get(move || {
740 let reg = registry_handle.clone();
741 async move {
742 (
743 [(axum::http::header::CONTENT_TYPE, "application/json")],
744 serde_json::to_string(®.manifest()).unwrap_or_default(),
745 )
746 }
747 }),
748 )
749 .route(
750 "/metrics",
751 axum::routing::get(move || {
752 let h = metrics_handle.clone();
753 async move { h.render() }
754 }),
755 )
756 .route("/startupz", {
757 let sf = self.started_flag();
758 axum::routing::get(move || {
759 let started = sf.load(std::sync::atomic::Ordering::Acquire);
760 async move {
761 if started {
762 (
763 axum::http::StatusCode::OK,
764 [(axum::http::header::CONTENT_TYPE, "application/json")],
765 r#"{"status":"started"}"#,
766 )
767 } else {
768 (
769 axum::http::StatusCode::SERVICE_UNAVAILABLE,
770 [(axum::http::header::CONTENT_TYPE, "application/json")],
771 r#"{"status":"starting"}"#,
772 )
773 }
774 }
775 })
776 })
777 .route(
778 "/healthz",
779 axum::routing::get(|| async {
780 (
781 [(axum::http::header::CONTENT_TYPE, "application/json")],
782 r#"{"status":"alive"}"#,
783 )
784 }),
785 )
786 .route(
787 "/health/live",
788 axum::routing::get(|| async {
789 (
790 [(axum::http::header::CONTENT_TYPE, "application/json")],
791 r#"{"status":"alive"}"#,
792 )
793 }),
794 )
795 .route(
796 "/readyz",
797 axum::routing::get(move || {
798 let rf = readiness_fn.clone();
799 async move { readiness_response(rf) }
800 }),
801 )
802 .route(
803 "/health/ready",
804 axum::routing::get(move || {
805 let rf = readiness_for_live.clone();
806 async move { readiness_response(rf) }
807 }),
808 );
809
810 #[cfg(feature = "scaling")]
812 if let Some(ref sp) = self.scaling_pressure {
813 let sp = sp.clone();
814 app = app.route(
815 "/scaling/pressure",
816 axum::routing::get(move || {
817 let s = sp.clone();
818 async move { format!("{:.2}", s.calculate()) }
819 }),
820 );
821 }
822
823 #[cfg(feature = "memory")]
825 if let Some(ref mg) = self.memory_guard {
826 let mg = mg.clone();
827 app = app.route(
828 "/memory/pressure",
829 axum::routing::get(move || {
830 let m = mg.clone();
831 async move {
832 (
833 [(axum::http::header::CONTENT_TYPE, "application/json")],
834 format!(
835 r#"{{"under_pressure":{},"ratio":{:.3},"current_bytes":{},"limit_bytes":{}}}"#,
836 m.under_pressure(),
837 m.pressure_ratio(),
838 m.current_bytes(),
839 m.limit_bytes()
840 ),
841 )
842 }
843 }),
844 );
845 }
846
847 app = app.merge(extra_routes);
849
850 tokio::spawn(async move {
851 run_axum_server(
852 listener,
853 app,
854 shutdown_rx,
855 update_interval,
856 process_metrics,
857 container_metrics,
858 )
859 .await;
860 });
861
862 Ok(())
863 }
864
865 pub async fn stop_server(&mut self) -> Result<(), MetricsError> {
871 if let Some(tx) = self.shutdown_tx.take() {
872 let _ = tx.send(());
873 Ok(())
874 } else {
875 Err(MetricsError::NotRunning)
876 }
877 }
878
879 #[cfg(feature = "otel-metrics")]
883 pub fn shutdown_otel(&mut self) {
884 if let Some(provider) = self.otel_provider.take()
885 && let Err(e) = provider.shutdown()
886 {
887 tracing::warn!(error = %e, "OTel provider shutdown error");
888 }
889 }
890
891 pub fn set_build_info(&self, version: &str, commit: &str) {
897 self.registry.set_build_info(version, commit);
898 }
899
900 pub fn set_use_cases(&self, metric_name: &str, use_cases: &[&str]) {
904 self.registry.set_use_cases(metric_name, use_cases);
905 }
906
907 pub fn set_dashboard_hint(&self, metric_name: &str, hint: &str) {
911 self.registry.set_dashboard_hint(metric_name, hint);
912 }
913
914 #[must_use]
919 pub fn registry(&self) -> MetricRegistry {
920 self.registry.clone()
921 }
922
923 #[must_use]
927 pub fn namespace(&self) -> &str {
928 &self.config.namespace
929 }
930
931 fn prefixed_key(&self, name: &str) -> String {
933 if self.config.namespace.is_empty() {
934 name.to_string()
935 } else {
936 format!("{}_{}", self.config.namespace, name)
937 }
938 }
939}
940
/// Accept/update loop backing [`MetricsManager::start_server`].
///
/// Serves a minimal hand-rolled HTTP endpoint until `shutdown_rx` fires: each
/// `update_interval` tick refreshes process/container gauges, and each
/// accepted connection is answered on its own task by `handle_connection`.
#[cfg(feature = "metrics")]
#[allow(clippy::too_many_arguments)]
async fn run_server(
    listener: TcpListener,
    handle: PrometheusHandle,
    registry: MetricRegistry,
    mut shutdown_rx: oneshot::Receiver<()>,
    update_interval: Duration,
    process_metrics: Option<ProcessMetrics>,
    container_metrics: Option<ContainerMetrics>,
    readiness_fn: Option<ReadinessFn>,
    started_flag: Arc<std::sync::atomic::AtomicBool>,
) {
    let mut update_interval = tokio::time::interval(update_interval);

    loop {
        tokio::select! {
            // Graceful shutdown requested via `stop_server`.
            _ = &mut shutdown_rx => {
                break;
            }
            // Periodic refresh of process/container gauges.
            _ = update_interval.tick() => {
                if let Some(ref pm) = process_metrics {
                    pm.update();
                }
                if let Some(ref cm) = container_metrics {
                    cm.update();
                }
            }
            // One spawned task per connection; accept errors are dropped.
            result = listener.accept() => {
                if let Ok((stream, _)) = result {
                    let handle = handle.clone();
                    let registry = registry.clone();
                    let readiness_fn = readiness_fn.clone();
                    let sf = Arc::clone(&started_flag);
                    tokio::spawn(async move {
                        handle_connection(stream, handle, registry, readiness_fn, &sf).await;
                    });
                }
            }
        }
    }
}
984
/// Answers a single plain HTTP/1.1 request on `stream`.
///
/// Only the request line is read — request headers and body are never
/// consumed — and exactly one response is written before the connection is
/// dropped (no keep-alive header is sent). Routes: `/metrics/manifest`,
/// `/metrics`, `/startupz` (+ `/health/startup`), `/healthz`
/// (+ `/health/live`), `/readyz` (+ `/health/ready`); anything else is 404.
///
/// NOTE(review): fixed mojibake in the manifest branch — `®istry` was a
/// corrupted `&registry` (U+00AE cannot appear in a Rust identifier).
#[cfg(feature = "metrics")]
async fn handle_connection(
    mut stream: tokio::net::TcpStream,
    handle: PrometheusHandle,
    registry: MetricRegistry,
    readiness_fn: Option<ReadinessFn>,
    started_flag: &std::sync::atomic::AtomicBool,
) {
    use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader};

    let mut reader = BufReader::new(&mut stream);
    let mut request_line = String::new();

    // A read error (client vanished) simply abandons the connection.
    if reader.read_line(&mut request_line).await.is_err() {
        return;
    }

    let (status, content_type, body) = if request_line.starts_with("GET /metrics/manifest") {
        (
            "200 OK",
            "application/json",
            serde_json::to_string(&registry.manifest()).unwrap_or_default(),
        )
    } else if request_line.starts_with("GET /metrics") {
        ("200 OK", "text/plain; charset=utf-8", handle.render())
    } else if request_line.starts_with("GET /startupz")
        || request_line.starts_with("GET /health/startup")
    {
        if started_flag.load(std::sync::atomic::Ordering::Acquire) {
            (
                "200 OK",
                "application/json",
                r#"{"status":"started"}"#.to_string(),
            )
        } else {
            (
                "503 Service Unavailable",
                "application/json",
                r#"{"status":"starting"}"#.to_string(),
            )
        }
    } else if request_line.starts_with("GET /healthz")
        || request_line.starts_with("GET /health/live")
    {
        (
            "200 OK",
            "application/json",
            r#"{"status":"alive"}"#.to_string(),
        )
    } else if request_line.starts_with("GET /readyz")
        || request_line.starts_with("GET /health/ready")
    {
        // Ready iff the optional callback agrees (absent counts as ready)
        // AND, with the `health` feature, the crate-wide registry is ready.
        let callback_ready = readiness_fn.as_ref().is_none_or(|f| f());

        #[cfg(feature = "health")]
        let registry_ready = crate::health::HealthRegistry::is_ready();
        #[cfg(not(feature = "health"))]
        let registry_ready = true;

        let ready = callback_ready && registry_ready;
        if ready {
            (
                "200 OK",
                "application/json",
                r#"{"status":"ready"}"#.to_string(),
            )
        } else {
            (
                "503 Service Unavailable",
                "application/json",
                r#"{"status":"not_ready"}"#.to_string(),
            )
        }
    } else {
        (
            "404 Not Found",
            "text/plain; charset=utf-8",
            "Not Found".to_string(),
        )
    };

    let response = format!(
        "HTTP/1.1 {status}\r\nContent-Type: {content_type}\r\nContent-Length: {}\r\n\r\n{body}",
        body.len()
    );

    // Write errors are ignored — the client may already be gone.
    let _ = stream.write_all(response.as_bytes()).await;
}
1078
/// Builds the JSON readiness response shared by `/readyz` and `/health/ready`.
///
/// Ready iff the optional caller-supplied callback returns `true` (an absent
/// callback counts as ready) AND — when the `health` feature is enabled —
/// the crate-wide `HealthRegistry` reports ready; otherwise responds 503.
#[cfg(all(feature = "metrics", feature = "http-server"))]
fn readiness_response(rf: Option<ReadinessFn>) -> axum::response::Response {
    use axum::response::IntoResponse;

    let callback_ready = rf.as_ref().is_none_or(|f| f());

    #[cfg(feature = "health")]
    let registry_ready = crate::health::HealthRegistry::is_ready();
    #[cfg(not(feature = "health"))]
    let registry_ready = true;

    let ready = callback_ready && registry_ready;
    if ready {
        (
            [(axum::http::header::CONTENT_TYPE, "application/json")],
            r#"{"status":"ready"}"#,
        )
            .into_response()
    } else {
        (
            axum::http::StatusCode::SERVICE_UNAVAILABLE,
            [(axum::http::header::CONTENT_TYPE, "application/json")],
            r#"{"status":"not_ready"}"#,
        )
            .into_response()
    }
}
1111
/// Serves the router built by [`MetricsManager::start_server_with_routes`].
///
/// Spawns a side task that refreshes process/container gauges every
/// `update_interval`, then runs axum with graceful shutdown driven by
/// `shutdown_rx`; the refresher is told to stop only after the server
/// future resolves.
#[cfg(all(feature = "metrics", feature = "http-server"))]
async fn run_axum_server(
    listener: TcpListener,
    app: axum::Router,
    shutdown_rx: oneshot::Receiver<()>,
    update_interval: Duration,
    process_metrics: Option<ProcessMetrics>,
    container_metrics: Option<ContainerMetrics>,
) {
    let mut interval = tokio::time::interval(update_interval);

    // Background refresher for process/container gauges.
    let (update_stop_tx, mut update_stop_rx) = oneshot::channel::<()>();
    tokio::spawn(async move {
        loop {
            tokio::select! {
                _ = &mut update_stop_rx => break,
                _ = interval.tick() => {
                    if let Some(ref pm) = process_metrics {
                        pm.update();
                    }
                    if let Some(ref cm) = container_metrics {
                        cm.update();
                    }
                }
            }
        }
    });

    axum::serve(listener, app)
        .with_graceful_shutdown(async move {
            let _ = shutdown_rx.await;
        })
        .await
        .unwrap_or_else(|e| tracing::error!(error = %e, "Metrics axum server error"));

    // Stop the refresher once the server has exited.
    let _ = update_stop_tx.send(());
}
1152
/// Default histogram bucket bounds (in seconds) for latency metrics,
/// spanning 1 ms to 10 s.
#[must_use]
pub fn latency_buckets() -> Vec<f64> {
    const SECONDS: [f64; 12] = [
        0.001, 0.005, 0.01, 0.025, 0.05, 0.1, 0.25, 0.5, 1.0, 2.5, 5.0, 10.0,
    ];
    SECONDS.to_vec()
}
1160
/// Default histogram bucket bounds for size metrics: powers of ten from
/// 100 to 10,000,000 (all exactly representable as `f64`).
#[must_use]
pub fn size_buckets() -> Vec<f64> {
    (2..=7).map(|exp| 10f64.powi(exp)).collect()
}
1173
#[cfg(test)]
mod tests {
    use super::*;

    /// Defaults: empty namespace, both collectors on, 15 s interval.
    #[test]
    fn test_metrics_config_default() {
        let cfg = MetricsConfig::default();
        assert_eq!(cfg.namespace, "");
        assert!(cfg.enable_process_metrics);
        assert!(cfg.enable_container_metrics);
        assert_eq!(cfg.update_interval, Duration::from_secs(15));
    }

    /// Twelve latency bounds, smallest first.
    #[test]
    fn test_latency_buckets() {
        let bounds = latency_buckets();
        assert_eq!(12, bounds.len());
        assert!(bounds.first().unwrap() < bounds.last().unwrap());
    }

    /// Six size bounds, smallest first.
    #[test]
    fn test_size_buckets() {
        let bounds = size_buckets();
        assert_eq!(6, bounds.len());
        assert!(bounds.first().unwrap() < bounds.last().unwrap());
    }
}