1mod container;
86pub mod dfe;
87pub mod manifest;
88mod process;
89
90#[cfg(feature = "otel-metrics")]
91pub(crate) mod otel;
92#[cfg(feature = "otel-metrics")]
93pub mod otel_types;
94
95use std::net::SocketAddr;
96use std::sync::Arc;
97use std::time::Duration;
98
99use metrics::{Counter, Gauge, Histogram, Unit};
100use thiserror::Error;
101use tokio::net::TcpListener;
102use tokio::sync::oneshot;
103
/// Shared readiness callback consulted by the `/readyz` endpoints; should
/// return `true` once the service can accept traffic.
pub type ReadinessFn = Arc<dyn Fn() -> bool + Send + Sync>;
106
107#[cfg(feature = "metrics")]
108use metrics_exporter_prometheus::PrometheusHandle;
109
110pub use container::ContainerMetrics;
111pub use dfe::DfeMetrics;
112#[cfg(feature = "metrics-dfe")]
113pub mod dfe_groups;
114pub use manifest::{ManifestResponse, MetricDescriptor, MetricRegistry, MetricType};
115pub use process::ProcessMetrics;
116
117#[cfg(feature = "otel-metrics")]
118pub use otel_types::{OtelMetricsConfig, OtelProtocol};
119
/// Cheaply cloneable handle for rendering the current Prometheus snapshot,
/// usable independently of the [`MetricsManager`] that created it.
#[cfg(feature = "metrics")]
#[derive(Clone)]
pub struct RenderHandle(PrometheusHandle);
127
#[cfg(feature = "metrics")]
impl RenderHandle {
    /// Render the current metrics snapshot in Prometheus text exposition
    /// format.
    #[must_use]
    pub fn render(&self) -> String {
        let RenderHandle(inner) = self;
        inner.render()
    }
}
136
/// Errors produced by [`MetricsManager`] exporter setup and server lifecycle.
#[derive(Debug, Error)]
pub enum MetricsError {
    /// Building the metrics exporter/recorder failed.
    #[error("failed to build metrics exporter: {0}")]
    BuildError(String),

    /// Starting the HTTP server failed (bad address, failed bind, or a
    /// missing Prometheus handle).
    #[error("failed to start metrics server: {0}")]
    ServerError(String),

    /// `start_server*` was called while a server is already running.
    #[error("metrics server already running")]
    AlreadyRunning,

    /// `stop_server` was called while no server is running.
    #[error("metrics server not running")]
    NotRunning,
}
156
/// Configuration for [`MetricsManager`].
#[derive(Debug, Clone)]
pub struct MetricsConfig {
    /// Prefix joined to every metric name with `_`; empty means no prefix.
    pub namespace: String,
    /// Create a [`ProcessMetrics`] collector and refresh it periodically.
    pub enable_process_metrics: bool,
    /// Create a [`ContainerMetrics`] collector and refresh it periodically.
    pub enable_container_metrics: bool,
    /// Interval at which the background collectors are refreshed.
    pub update_interval: Duration,
    /// OpenTelemetry exporter configuration.
    #[cfg(feature = "otel-metrics")]
    pub otel: OtelMetricsConfig,
}
172
173impl Default for MetricsConfig {
174 fn default() -> Self {
175 Self {
176 namespace: String::new(),
177 enable_process_metrics: true,
178 enable_container_metrics: true,
179 update_interval: Duration::from_secs(15),
180 #[cfg(feature = "otel-metrics")]
181 otel: OtelMetricsConfig::default(),
182 }
183 }
184}
185
/// Handles produced by [`install_recorders`]: whichever backends the enabled
/// cargo features actually built.
struct RecorderSetup {
    /// Prometheus render handle, if a Prometheus recorder was installed.
    #[cfg(feature = "metrics")]
    prom_handle: Option<PrometheusHandle>,
    /// OTel meter provider, retained so it can be shut down / flushed later.
    #[cfg(feature = "otel-metrics")]
    otel_provider: Option<opentelemetry_sdk::metrics::SdkMeterProvider>,
}
193
/// Install the process-global metrics recorder(s) implied by the enabled
/// features and return the handles needed afterwards.
///
/// Exactly one of the three feature arms below is compiled in; each arm ends
/// in a `RecorderSetup` tail expression.
///
/// NOTE(review): with neither `metrics` nor `otel-metrics` enabled this
/// function has no tail expression and would not compile — presumably at
/// least one of those features is always enabled; confirm crate defaults.
///
/// # Panics
///
/// Panics (via `expect`) if a global recorder has already been installed in
/// this process.
#[allow(unused_variables)]
fn install_recorders(config: &MetricsConfig) -> RecorderSetup {
    // Prometheus only.
    #[cfg(all(feature = "metrics", not(feature = "otel-metrics")))]
    {
        let recorder = metrics_exporter_prometheus::PrometheusBuilder::new().build_recorder();
        let handle = recorder.handle();
        metrics::set_global_recorder(recorder).expect("failed to install Prometheus recorder");
        RecorderSetup {
            prom_handle: Some(handle),
        }
    }

    // OTel only; degrades to no recorder at all if the exporter cannot be built.
    #[cfg(all(feature = "otel-metrics", not(feature = "metrics")))]
    {
        match otel::build_otel_recorder(&config.namespace, &config.otel) {
            Ok((otel_recorder, provider)) => {
                opentelemetry::global::set_meter_provider(provider.clone());
                metrics::set_global_recorder(otel_recorder)
                    .expect("failed to set OTel metrics recorder");
                RecorderSetup {
                    otel_provider: Some(provider),
                }
            }
            Err(e) => {
                tracing::warn!(error = %e, "Failed to build OTel metrics recorder");
                RecorderSetup {
                    otel_provider: None,
                }
            }
        }
    }

    // Both: fan every metric out to Prometheus and OTel; degrades to
    // Prometheus alone if the OTel exporter cannot be built.
    #[cfg(all(feature = "metrics", feature = "otel-metrics"))]
    {
        let prom_recorder = metrics_exporter_prometheus::PrometheusBuilder::new().build_recorder();
        let prom_handle = prom_recorder.handle();

        match otel::build_otel_recorder(&config.namespace, &config.otel) {
            Ok((otel_recorder, provider)) => {
                opentelemetry::global::set_meter_provider(provider.clone());

                let fanout = metrics_util::layers::FanoutBuilder::default()
                    .add_recorder(prom_recorder)
                    .add_recorder(otel_recorder)
                    .build();

                metrics::set_global_recorder(fanout).expect("failed to set Fanout recorder");

                RecorderSetup {
                    prom_handle: Some(prom_handle),
                    otel_provider: Some(provider),
                }
            }
            Err(e) => {
                tracing::warn!(error = %e, "Failed to build OTel recorder, falling back to Prometheus only");
                metrics::set_global_recorder(prom_recorder)
                    .expect("failed to set Prometheus recorder");
                RecorderSetup {
                    prom_handle: Some(prom_handle),
                    otel_provider: None,
                }
            }
        }
    }
}
271
/// Central owner of metric registration, the metric manifest, the background
/// collectors, and the optional metrics/health HTTP server.
pub struct MetricsManager {
    /// Prometheus render handle; `None` when no Prometheus recorder exists.
    #[cfg(feature = "metrics")]
    handle: Option<PrometheusHandle>,
    /// Configuration the manager was built with.
    config: MetricsConfig,
    /// Shutdown signal for the running server; `Some` while a server runs.
    shutdown_tx: Option<oneshot::Sender<()>>,
    /// Per-process collector, refreshed on `config.update_interval`.
    process_metrics: Option<ProcessMetrics>,
    /// Container-level collector, refreshed on `config.update_interval`.
    container_metrics: Option<ContainerMetrics>,
    /// Optional readiness callback consulted by `/readyz`.
    readiness_fn: Option<ReadinessFn>,
    /// Startup flag reported by `/startupz`; set via [`Self::mark_started`].
    started: Arc<std::sync::atomic::AtomicBool>,
    /// Manifest of every metric registered through this manager.
    registry: MetricRegistry,
    /// Optional scaling-pressure source exposed at `/scaling/pressure`.
    #[cfg(all(feature = "metrics", feature = "scaling"))]
    scaling_pressure: Option<Arc<crate::scaling::ScalingPressure>>,
    /// Optional memory guard exposed at `/memory/pressure`.
    #[cfg(all(feature = "metrics", feature = "memory"))]
    memory_guard: Option<Arc<crate::memory::MemoryGuard>>,
    /// OTel meter provider, retained for shutdown/flush.
    #[cfg(feature = "otel-metrics")]
    otel_provider: Option<opentelemetry_sdk::metrics::SdkMeterProvider>,
}
290
291impl MetricsManager {
292 #[must_use]
294 pub fn new(namespace: &str) -> Self {
295 Self::with_config(MetricsConfig {
296 namespace: namespace.to_string(),
297 ..Default::default()
298 })
299 }
300
    /// Test-only constructor: builds a manager without installing any global
    /// recorder and with both background collectors disabled.
    #[cfg(test)]
    pub(crate) fn new_for_test(namespace: &str) -> Self {
        let config = MetricsConfig {
            namespace: namespace.to_string(),
            enable_process_metrics: false,
            enable_container_metrics: false,
            ..Default::default()
        };

        let registry = MetricRegistry::new(&config.namespace);

        Self {
            // No recorder installed, so no Prometheus handle either.
            #[cfg(feature = "metrics")]
            handle: None,
            registry,
            config,
            shutdown_tx: None,
            process_metrics: None,
            container_metrics: None,
            readiness_fn: None,
            started: Arc::new(std::sync::atomic::AtomicBool::new(false)),
            #[cfg(all(feature = "metrics", feature = "scaling"))]
            scaling_pressure: None,
            #[cfg(all(feature = "metrics", feature = "memory"))]
            memory_guard: None,
            #[cfg(feature = "otel-metrics")]
            otel_provider: None,
        }
    }
340
    /// Create a manager from `config`, installing the process-global metrics
    /// recorder(s) selected by the enabled cargo features.
    ///
    /// NOTE(review): `install_recorders` panics if a global recorder is
    /// already installed, so only one manager should be built this way per
    /// process.
    #[must_use]
    pub fn with_config(config: MetricsConfig) -> Self {
        // Install recorders first so the collectors below register their
        // metrics against the new global recorder.
        let setup = install_recorders(&config);

        let process_metrics = if config.enable_process_metrics {
            Some(ProcessMetrics::new(&config.namespace))
        } else {
            None
        };

        let container_metrics = if config.enable_container_metrics {
            Some(ContainerMetrics::new(&config.namespace))
        } else {
            None
        };

        let registry = MetricRegistry::new(&config.namespace);

        Self {
            #[cfg(feature = "metrics")]
            handle: setup.prom_handle,
            registry,
            config,
            shutdown_tx: None,
            process_metrics,
            container_metrics,
            readiness_fn: None,
            started: Arc::new(std::sync::atomic::AtomicBool::new(false)),
            #[cfg(all(feature = "metrics", feature = "scaling"))]
            scaling_pressure: None,
            #[cfg(all(feature = "metrics", feature = "memory"))]
            memory_guard: None,
            #[cfg(feature = "otel-metrics")]
            otel_provider: setup.otel_provider,
        }
    }
383
384 #[must_use]
389 pub fn counter(&self, name: &str, description: &str) -> Counter {
390 let key = self.prefixed_key(name);
391 let desc = description.to_string();
392 metrics::describe_counter!(key.clone(), desc.clone());
393 self.registry.push(MetricDescriptor {
394 name: key.clone(),
395 metric_type: MetricType::Counter,
396 description: desc,
397 unit: String::new(),
398 labels: vec![],
399 group: "custom".into(),
400 buckets: None,
401 use_cases: vec![],
402 dashboard_hint: None,
403 });
404 metrics::counter!(key)
405 }
406
407 #[must_use]
413 pub fn counter_with_labels(
414 &self,
415 name: &str,
416 description: &str,
417 labels: &[&str],
418 group: &str,
419 ) -> Counter {
420 let key = self.prefixed_key(name);
421 let desc = description.to_string();
422 metrics::describe_counter!(key.clone(), desc.clone());
423 self.registry.push(MetricDescriptor {
424 name: key.clone(),
425 metric_type: MetricType::Counter,
426 description: desc,
427 unit: String::new(),
428 labels: labels.iter().map(|s| (*s).to_string()).collect(),
429 group: group.into(),
430 buckets: None,
431 use_cases: vec![],
432 dashboard_hint: None,
433 });
434 metrics::counter!(key)
435 }
436
437 #[must_use]
442 pub fn gauge(&self, name: &str, description: &str) -> Gauge {
443 let key = self.prefixed_key(name);
444 let desc = description.to_string();
445 metrics::describe_gauge!(key.clone(), desc.clone());
446 self.registry.push(MetricDescriptor {
447 name: key.clone(),
448 metric_type: MetricType::Gauge,
449 description: desc,
450 unit: String::new(),
451 labels: vec![],
452 group: "custom".into(),
453 buckets: None,
454 use_cases: vec![],
455 dashboard_hint: None,
456 });
457 metrics::gauge!(key)
458 }
459
460 #[must_use]
462 pub fn gauge_with_labels(
463 &self,
464 name: &str,
465 description: &str,
466 labels: &[&str],
467 group: &str,
468 ) -> Gauge {
469 let key = self.prefixed_key(name);
470 let desc = description.to_string();
471 metrics::describe_gauge!(key.clone(), desc.clone());
472 self.registry.push(MetricDescriptor {
473 name: key.clone(),
474 metric_type: MetricType::Gauge,
475 description: desc,
476 unit: String::new(),
477 labels: labels.iter().map(|s| (*s).to_string()).collect(),
478 group: group.into(),
479 buckets: None,
480 use_cases: vec![],
481 dashboard_hint: None,
482 });
483 metrics::gauge!(key)
484 }
485
486 #[must_use]
491 pub fn histogram(&self, name: &str, description: &str) -> Histogram {
492 let key = self.prefixed_key(name);
493 let desc = description.to_string();
494 metrics::describe_histogram!(key.clone(), Unit::Seconds, desc.clone());
495 self.registry.push(MetricDescriptor {
496 name: key.clone(),
497 metric_type: MetricType::Histogram,
498 description: desc,
499 unit: "seconds".into(),
500 labels: vec![],
501 group: "custom".into(),
502 buckets: None,
503 use_cases: vec![],
504 dashboard_hint: None,
505 });
506 metrics::histogram!(key)
507 }
508
509 #[must_use]
511 pub fn histogram_with_labels(
512 &self,
513 name: &str,
514 description: &str,
515 labels: &[&str],
516 group: &str,
517 buckets: Option<&[f64]>,
518 ) -> Histogram {
519 let key = self.prefixed_key(name);
520 let desc = description.to_string();
521 metrics::describe_histogram!(key.clone(), Unit::Seconds, desc.clone());
522 self.registry.push(MetricDescriptor {
523 name: key.clone(),
524 metric_type: MetricType::Histogram,
525 description: desc,
526 unit: "seconds".into(),
527 labels: labels.iter().map(|s| (*s).to_string()).collect(),
528 group: group.into(),
529 buckets: buckets.map(|b| b.to_vec()),
530 use_cases: vec![],
531 dashboard_hint: None,
532 });
533 metrics::histogram!(key)
534 }
535
536 #[must_use]
542 pub fn histogram_with_buckets(
543 &self,
544 name: &str,
545 description: &str,
546 buckets: &[f64],
547 ) -> Histogram {
548 let key = self.prefixed_key(name);
549 let desc = description.to_string();
550 metrics::describe_histogram!(key.clone(), Unit::Seconds, desc.clone());
551 self.registry.push(MetricDescriptor {
552 name: key.clone(),
553 metric_type: MetricType::Histogram,
554 description: desc,
555 unit: "seconds".into(),
556 labels: vec![],
557 group: "custom".into(),
558 buckets: Some(buckets.to_vec()),
559 use_cases: vec![],
560 dashboard_hint: None,
561 });
562 metrics::histogram!(key)
563 }
564
565 #[cfg(feature = "metrics")]
570 #[must_use]
571 pub fn render(&self) -> String {
572 self.handle
573 .as_ref()
574 .map_or_else(String::new, PrometheusHandle::render)
575 }
576
577 #[cfg(feature = "metrics")]
600 #[must_use]
601 pub fn render_handle(&self) -> Option<RenderHandle> {
602 self.handle.clone().map(RenderHandle)
603 }
604
605 pub fn set_readiness_check(&mut self, f: impl Fn() -> bool + Send + Sync + 'static) {
611 self.readiness_fn = Some(Arc::new(f));
612 }
613
614 pub fn mark_started(&self) {
620 self.started
621 .store(true, std::sync::atomic::Ordering::Release);
622 }
623
624 pub(crate) fn started_flag(&self) -> Arc<std::sync::atomic::AtomicBool> {
626 Arc::clone(&self.started)
627 }
628
    /// Register a scaling-pressure source; `start_server_with_routes` will
    /// then expose it at `/scaling/pressure`.
    #[cfg(all(feature = "metrics", feature = "scaling"))]
    pub fn set_scaling_pressure(&mut self, sp: Arc<crate::scaling::ScalingPressure>) {
        self.scaling_pressure = Some(sp);
    }
637
    /// Register a memory guard; `start_server_with_routes` will then expose
    /// it at `/memory/pressure`.
    #[cfg(all(feature = "metrics", feature = "memory"))]
    pub fn set_memory_guard(&mut self, mg: Arc<crate::memory::MemoryGuard>) {
        self.memory_guard = Some(mg);
    }
646
647 pub fn update(&self) {
649 if let Some(ref pm) = self.process_metrics {
650 pm.update();
651 }
652 if let Some(ref cm) = self.container_metrics {
653 cm.update();
654 }
655 }
656
657 #[cfg(feature = "metrics")]
668 pub async fn start_server(&mut self, addr: &str) -> Result<(), MetricsError> {
669 if self.shutdown_tx.is_some() {
670 return Err(MetricsError::AlreadyRunning);
671 }
672
673 let addr: SocketAddr = addr
674 .parse()
675 .map_err(|e| MetricsError::ServerError(format!("invalid address: {e}")))?;
676
677 let listener = TcpListener::bind(addr)
678 .await
679 .map_err(|e| MetricsError::ServerError(e.to_string()))?;
680
681 let (shutdown_tx, shutdown_rx) = oneshot::channel();
682 self.shutdown_tx = Some(shutdown_tx);
683
684 let handle = self
685 .handle
686 .as_ref()
687 .ok_or_else(|| {
688 MetricsError::ServerError(
689 "Prometheus handle not configured — MetricsManager was created without a recorder".into(),
690 )
691 })?
692 .clone();
693 let update_interval = self.config.update_interval;
694 let process_metrics = self.process_metrics.clone();
695 let container_metrics = self.container_metrics.clone();
696 let readiness_fn = self.readiness_fn.clone();
697 let started_flag = self.started_flag();
698
699 let registry = self.registry();
700
701 tokio::spawn(async move {
702 run_server(
703 listener,
704 handle,
705 registry,
706 shutdown_rx,
707 update_interval,
708 process_metrics,
709 container_metrics,
710 readiness_fn,
711 started_flag,
712 )
713 .await;
714 });
715
716 Ok(())
717 }
718
719 #[cfg(all(feature = "metrics", feature = "http-server"))]
737 pub async fn start_server_with_routes(
738 &mut self,
739 addr: &str,
740 extra_routes: axum::Router,
741 ) -> Result<(), MetricsError> {
742 if self.shutdown_tx.is_some() {
743 return Err(MetricsError::AlreadyRunning);
744 }
745
746 let addr: SocketAddr = addr
747 .parse()
748 .map_err(|e| MetricsError::ServerError(format!("invalid address: {e}")))?;
749
750 let listener = TcpListener::bind(addr)
751 .await
752 .map_err(|e| MetricsError::ServerError(e.to_string()))?;
753
754 let (shutdown_tx, shutdown_rx) = oneshot::channel();
755 self.shutdown_tx = Some(shutdown_tx);
756
757 let handle = self
758 .handle
759 .as_ref()
760 .ok_or_else(|| {
761 MetricsError::ServerError(
762 "Prometheus handle not configured — MetricsManager was created without a recorder".into(),
763 )
764 })?
765 .clone();
766 let update_interval = self.config.update_interval;
767 let process_metrics = self.process_metrics.clone();
768 let container_metrics = self.container_metrics.clone();
769 let readiness_fn = self.readiness_fn.clone();
770
771 let metrics_handle = handle.clone();
773 let readiness_for_live = readiness_fn.clone();
774 let registry_handle = self.registry();
775
776 let mut app = axum::Router::new()
777 .route(
778 "/metrics/manifest",
779 axum::routing::get(move || {
780 let reg = registry_handle.clone();
781 async move {
782 (
783 [(axum::http::header::CONTENT_TYPE, "application/json")],
784 serde_json::to_string(®.manifest()).unwrap_or_default(),
785 )
786 }
787 }),
788 )
789 .route(
790 "/metrics",
791 axum::routing::get(move || {
792 let h = metrics_handle.clone();
793 async move { h.render() }
794 }),
795 )
796 .route("/startupz", {
797 let sf = self.started_flag();
798 axum::routing::get(move || {
799 let started = sf.load(std::sync::atomic::Ordering::Acquire);
800 async move {
801 if started {
802 (
803 axum::http::StatusCode::OK,
804 [(axum::http::header::CONTENT_TYPE, "application/json")],
805 r#"{"status":"started"}"#,
806 )
807 } else {
808 (
809 axum::http::StatusCode::SERVICE_UNAVAILABLE,
810 [(axum::http::header::CONTENT_TYPE, "application/json")],
811 r#"{"status":"starting"}"#,
812 )
813 }
814 }
815 })
816 })
817 .route(
818 "/healthz",
819 axum::routing::get(|| async {
820 (
821 [(axum::http::header::CONTENT_TYPE, "application/json")],
822 r#"{"status":"alive"}"#,
823 )
824 }),
825 )
826 .route(
827 "/health/live",
828 axum::routing::get(|| async {
829 (
830 [(axum::http::header::CONTENT_TYPE, "application/json")],
831 r#"{"status":"alive"}"#,
832 )
833 }),
834 )
835 .route(
836 "/readyz",
837 axum::routing::get(move || {
838 let rf = readiness_fn.clone();
839 async move { readiness_response(rf) }
840 }),
841 )
842 .route(
843 "/health/ready",
844 axum::routing::get(move || {
845 let rf = readiness_for_live.clone();
846 async move { readiness_response(rf) }
847 }),
848 );
849
850 #[cfg(feature = "scaling")]
852 if let Some(ref sp) = self.scaling_pressure {
853 let sp = sp.clone();
854 app = app.route(
855 "/scaling/pressure",
856 axum::routing::get(move || {
857 let s = sp.clone();
858 async move { format!("{:.2}", s.calculate()) }
859 }),
860 );
861 }
862
863 #[cfg(feature = "memory")]
865 if let Some(ref mg) = self.memory_guard {
866 let mg = mg.clone();
867 app = app.route(
868 "/memory/pressure",
869 axum::routing::get(move || {
870 let m = mg.clone();
871 async move {
872 (
873 [(axum::http::header::CONTENT_TYPE, "application/json")],
874 format!(
875 r#"{{"under_pressure":{},"ratio":{:.3},"current_bytes":{},"limit_bytes":{}}}"#,
876 m.under_pressure(),
877 m.pressure_ratio(),
878 m.current_bytes(),
879 m.limit_bytes()
880 ),
881 )
882 }
883 }),
884 );
885 }
886
887 app = app.merge(extra_routes);
889
890 tokio::spawn(async move {
891 run_axum_server(
892 listener,
893 app,
894 shutdown_rx,
895 update_interval,
896 process_metrics,
897 container_metrics,
898 )
899 .await;
900 });
901
902 Ok(())
903 }
904
905 pub async fn stop_server(&mut self) -> Result<(), MetricsError> {
911 if let Some(tx) = self.shutdown_tx.take() {
912 let _ = tx.send(());
913 Ok(())
914 } else {
915 Err(MetricsError::NotRunning)
916 }
917 }
918
    /// Shut down the OTel meter provider, if one was created.
    ///
    /// Safe to call when OTel was never initialized; shutdown errors are
    /// logged rather than propagated.
    #[cfg(feature = "otel-metrics")]
    pub fn shutdown_otel(&mut self) {
        if let Some(provider) = self.otel_provider.take()
            && let Err(e) = provider.shutdown()
        {
            tracing::warn!(error = %e, "OTel provider shutdown error");
        }
    }
930
    /// Record the build's version and commit in the metric manifest.
    pub fn set_build_info(&self, version: &str, commit: &str) {
        self.registry.set_build_info(version, commit);
    }
939
    /// Attach use-case tags to an already-registered metric in the manifest.
    pub fn set_use_cases(&self, metric_name: &str, use_cases: &[&str]) {
        self.registry.set_use_cases(metric_name, use_cases);
    }
946
    /// Attach a dashboard hint to an already-registered metric in the manifest.
    pub fn set_dashboard_hint(&self, metric_name: &str, hint: &str) {
        self.registry.set_dashboard_hint(metric_name, hint);
    }
953
954 #[must_use]
959 pub fn registry(&self) -> MetricRegistry {
960 self.registry.clone()
961 }
962
963 #[must_use]
967 pub fn namespace(&self) -> &str {
968 &self.config.namespace
969 }
970
971 fn prefixed_key(&self, name: &str) -> String {
973 if self.config.namespace.is_empty() {
974 name.to_string()
975 } else {
976 format!("{}_{}", self.config.namespace, name)
977 }
978 }
979}
980
/// Accept loop for the hand-rolled (non-axum) metrics server: multiplexes
/// the shutdown signal, the periodic collector refresh, and incoming
/// connections, spawning one task per accepted connection.
#[cfg(feature = "metrics")]
#[allow(clippy::too_many_arguments)]
async fn run_server(
    listener: TcpListener,
    handle: PrometheusHandle,
    registry: MetricRegistry,
    mut shutdown_rx: oneshot::Receiver<()>,
    update_interval: Duration,
    process_metrics: Option<ProcessMetrics>,
    container_metrics: Option<ContainerMetrics>,
    readiness_fn: Option<ReadinessFn>,
    started_flag: Arc<std::sync::atomic::AtomicBool>,
) {
    let mut update_interval = tokio::time::interval(update_interval);

    loop {
        tokio::select! {
            // Stop as soon as the shutdown channel fires (or its sender drops).
            _ = &mut shutdown_rx => {
                break;
            }
            // Periodic refresh of the background collectors.
            _ = update_interval.tick() => {
                if let Some(ref pm) = process_metrics {
                    pm.update();
                }
                if let Some(ref cm) = container_metrics {
                    cm.update();
                }
            }
            // One spawned task per connection; accept errors are ignored and
            // the loop simply continues.
            result = listener.accept() => {
                if let Ok((stream, _)) = result {
                    let handle = handle.clone();
                    let registry = registry.clone();
                    let readiness_fn = readiness_fn.clone();
                    let sf = Arc::clone(&started_flag);
                    tokio::spawn(async move {
                        handle_connection(stream, handle, registry, readiness_fn, &sf).await;
                    });
                }
            }
        }
    }
}
1024
/// Serve one connection of the hand-rolled HTTP server: route on the request
/// line and write a single HTTP/1.1 response.
///
/// NOTE(review): only the request line is read — headers and body are never
/// consumed, and no `Connection: close` header is sent; assumes simple
/// scrape/probe clients. Confirm this is acceptable for the intended callers.
#[cfg(feature = "metrics")]
async fn handle_connection(
    mut stream: tokio::net::TcpStream,
    handle: PrometheusHandle,
    registry: MetricRegistry,
    readiness_fn: Option<ReadinessFn>,
    started_flag: &std::sync::atomic::AtomicBool,
) {
    use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader};

    let mut reader = BufReader::new(&mut stream);
    let mut request_line = String::new();

    if reader.read_line(&mut request_line).await.is_err() {
        return;
    }

    // Prefix routing; the manifest check must precede the "/metrics" check
    // because "GET /metrics" is a prefix of "GET /metrics/manifest".
    let (status, content_type, body) = if request_line.starts_with("GET /metrics/manifest") {
        (
            "200 OK",
            "application/json",
            serde_json::to_string(&registry.manifest()).unwrap_or_default(),
        )
    } else if request_line.starts_with("GET /metrics") {
        ("200 OK", "text/plain; charset=utf-8", handle.render())
    } else if request_line.starts_with("GET /startupz")
        || request_line.starts_with("GET /health/startup")
    {
        // Startup probe: driven solely by the mark_started() flag.
        if started_flag.load(std::sync::atomic::Ordering::Acquire) {
            (
                "200 OK",
                "application/json",
                r#"{"status":"started"}"#.to_string(),
            )
        } else {
            (
                "503 Service Unavailable",
                "application/json",
                r#"{"status":"starting"}"#.to_string(),
            )
        }
    } else if request_line.starts_with("GET /healthz")
        || request_line.starts_with("GET /health/live")
    {
        // Liveness: unconditionally alive while the server can respond.
        (
            "200 OK",
            "application/json",
            r#"{"status":"alive"}"#.to_string(),
        )
    } else if request_line.starts_with("GET /readyz")
        || request_line.starts_with("GET /health/ready")
    {
        // Readiness: a missing callback counts as ready.
        let callback_ready = readiness_fn.as_ref().is_none_or(|f| f());

        #[cfg(feature = "health")]
        let registry_ready = crate::health::HealthRegistry::is_ready();
        #[cfg(not(feature = "health"))]
        let registry_ready = true;

        let ready = callback_ready && registry_ready;
        if ready {
            (
                "200 OK",
                "application/json",
                r#"{"status":"ready"}"#.to_string(),
            )
        } else {
            (
                "503 Service Unavailable",
                "application/json",
                r#"{"status":"not_ready"}"#.to_string(),
            )
        }
    } else {
        (
            "404 Not Found",
            "text/plain; charset=utf-8",
            "Not Found".to_string(),
        )
    };

    let response = format!(
        "HTTP/1.1 {status}\r\nContent-Type: {content_type}\r\nContent-Length: {}\r\n\r\n{body}",
        body.len()
    );

    let _ = stream.write_all(response.as_bytes()).await;
}
1118
/// Build the `/readyz` / `/health/ready` response: ready only when both the
/// optional callback (missing counts as ready) and, with the `health`
/// feature, the health registry report ready.
#[cfg(all(feature = "metrics", feature = "http-server"))]
fn readiness_response(rf: Option<ReadinessFn>) -> axum::response::Response {
    use axum::response::IntoResponse;

    let callback_ready = rf.as_ref().is_none_or(|f| f());

    #[cfg(feature = "health")]
    let registry_ready = crate::health::HealthRegistry::is_ready();
    #[cfg(not(feature = "health"))]
    let registry_ready = true;

    if callback_ready && registry_ready {
        (
            [(axum::http::header::CONTENT_TYPE, "application/json")],
            r#"{"status":"ready"}"#,
        )
            .into_response()
    } else {
        (
            axum::http::StatusCode::SERVICE_UNAVAILABLE,
            [(axum::http::header::CONTENT_TYPE, "application/json")],
            r#"{"status":"not_ready"}"#,
        )
            .into_response()
    }
}
1151
/// Drive the axum metrics server until shutdown, alongside a background task
/// that periodically refreshes the process/container collectors.
#[cfg(all(feature = "metrics", feature = "http-server"))]
async fn run_axum_server(
    listener: TcpListener,
    app: axum::Router,
    shutdown_rx: oneshot::Receiver<()>,
    update_interval: Duration,
    process_metrics: Option<ProcessMetrics>,
    container_metrics: Option<ContainerMetrics>,
) {
    let mut interval = tokio::time::interval(update_interval);

    // Separate channel so the updater task can be stopped once serving ends.
    let (update_stop_tx, mut update_stop_rx) = oneshot::channel::<()>();
    tokio::spawn(async move {
        loop {
            tokio::select! {
                _ = &mut update_stop_rx => break,
                _ = interval.tick() => {
                    if let Some(ref pm) = process_metrics {
                        pm.update();
                    }
                    if let Some(ref cm) = container_metrics {
                        cm.update();
                    }
                }
            }
        }
    });

    // Blocks until shutdown_rx fires (or its sender is dropped).
    axum::serve(listener, app)
        .with_graceful_shutdown(async move {
            let _ = shutdown_rx.await;
        })
        .await
        .unwrap_or_else(|e| tracing::error!(error = %e, "Metrics axum server error"));

    // Stop the updater after the server has fully drained.
    let _ = update_stop_tx.send(());
}
1192
/// Default latency histogram bucket boundaries in seconds (1 ms to 10 s).
#[must_use]
pub fn latency_buckets() -> Vec<f64> {
    const BUCKETS: [f64; 12] = [
        0.001, 0.005, 0.01, 0.025, 0.05, 0.1, 0.25, 0.5, 1.0, 2.5, 5.0, 10.0,
    ];
    BUCKETS.to_vec()
}
1200
/// Default size histogram bucket boundaries, decade-spaced from 100 to
/// 10,000,000 (presumably bytes — the unit is up to the caller).
#[must_use]
pub fn size_buckets() -> Vec<f64> {
    const BUCKETS: [f64; 6] = [
        100.0,
        1_000.0,
        10_000.0,
        100_000.0,
        1_000_000.0,
        10_000_000.0,
    ];
    BUCKETS.to_vec()
}
1213
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_metrics_config_default() {
        let cfg = MetricsConfig::default();
        assert_eq!(cfg.namespace, "");
        assert!(cfg.enable_process_metrics);
        assert!(cfg.enable_container_metrics);
        assert_eq!(cfg.update_interval, Duration::from_secs(15));
    }

    #[test]
    fn test_latency_buckets() {
        let buckets = latency_buckets();
        assert_eq!(buckets.len(), 12);
        // Endpoints must be ordered smallest-to-largest.
        assert!(buckets.first() < buckets.last());
    }

    #[test]
    fn test_size_buckets() {
        let buckets = size_buckets();
        assert_eq!(buckets.len(), 6);
        assert!(buckets.first() < buckets.last());
    }
}