1mod container;
86pub mod dfe;
87pub mod labels;
88pub mod manifest;
89mod process;
90
91pub use labels::{AuthFailureReason, FlushTrigger, TransportKind, ValidationFailureReason};
92
93#[cfg(feature = "otel-metrics")]
94pub(crate) mod otel;
95#[cfg(feature = "otel-metrics")]
96pub mod otel_types;
97
98use std::net::SocketAddr;
99use std::sync::Arc;
100use std::time::Duration;
101
102use metrics::{Counter, Gauge, Histogram, Unit};
103use thiserror::Error;
104use tokio::net::TcpListener;
105use tokio::sync::oneshot;
106
/// Readiness callback consulted by the `/readyz` and `/health/ready` endpoints.
///
/// Returning `false` makes those endpoints answer `503 Service Unavailable`
/// with `{"status":"not_ready"}`.
pub type ReadinessFn = Arc<dyn Fn() -> bool + Send + Sync>;
109
110#[cfg(feature = "metrics")]
111use metrics_exporter_prometheus::PrometheusHandle;
112
113pub use container::ContainerMetrics;
114pub use dfe::DfeMetrics;
115#[cfg(feature = "metrics-dfe")]
116pub mod dfe_groups;
117pub use manifest::{ManifestResponse, MetricDescriptor, MetricRegistry, MetricType};
118pub use process::ProcessMetrics;
119
120#[cfg(feature = "otel-metrics")]
121pub use otel_types::{OtelMetricsConfig, OtelProtocol};
122
/// Cloneable wrapper around the Prometheus handle that renders the text exposition format.
///
/// Obtained from [`MetricsManager::render_handle`].
#[cfg(feature = "metrics")]
#[derive(Clone)]
pub struct RenderHandle(PrometheusHandle);
130
#[cfg(feature = "metrics")]
impl RenderHandle {
    /// Renders the current state of the wrapped Prometheus recorder as exposition text.
    #[must_use]
    pub fn render(&self) -> String {
        let Self(prometheus) = self;
        prometheus.render()
    }
}
139
/// Errors returned by the [`MetricsManager`] server lifecycle methods.
#[derive(Debug, Error)]
pub enum MetricsError {
    /// Building a metrics exporter failed.
    ///
    /// NOTE(review): not constructed in this module; presumably used by exporter setup
    /// elsewhere — confirm.
    #[error("failed to build metrics exporter: {0}")]
    BuildError(String),

    /// The metrics HTTP server could not be started: the address did not parse, binding
    /// failed, or the manager has no Prometheus handle.
    #[error("failed to start metrics server: {0}")]
    ServerError(String),

    /// A `start_server*` method was called while a server is already registered as running.
    #[error("metrics server already running")]
    AlreadyRunning,

    /// `stop_server` was called when no server is registered as running.
    #[error("metrics server not running")]
    NotRunning,
}
159
/// Configuration for [`MetricsManager`].
#[derive(Debug, Clone)]
pub struct MetricsConfig {
    /// Prefix joined with `_` to every metric name registered through the manager;
    /// an empty string means no prefix.
    pub namespace: String,
    /// Whether to create process-level metrics, refreshed by `update` and by the server loop.
    pub enable_process_metrics: bool,
    /// Whether to create container-level metrics, refreshed by `update` and by the server loop.
    pub enable_container_metrics: bool,
    /// Period at which a running metrics server refreshes process/container metrics.
    pub update_interval: Duration,
    /// OpenTelemetry exporter settings.
    #[cfg(feature = "otel-metrics")]
    pub otel: OtelMetricsConfig,
}
175
176impl Default for MetricsConfig {
177 fn default() -> Self {
178 Self {
179 namespace: String::new(),
180 enable_process_metrics: true,
181 enable_container_metrics: true,
182 update_interval: Duration::from_secs(15),
183 #[cfg(feature = "otel-metrics")]
184 otel: OtelMetricsConfig::default(),
185 }
186 }
187}
188
/// Handles produced by `install_recorders` and moved into the [`MetricsManager`].
struct RecorderSetup {
    /// Prometheus render handle; `install_recorders` always sets it when `metrics` is enabled.
    #[cfg(feature = "metrics")]
    prom_handle: Option<PrometheusHandle>,
    /// OTel meter provider, retained so it can be shut down later; `None` if building the
    /// OTel recorder failed.
    #[cfg(feature = "otel-metrics")]
    otel_provider: Option<opentelemetry_sdk::metrics::SdkMeterProvider>,
}
196
197#[allow(unused_variables)]
203fn install_recorders(config: &MetricsConfig) -> RecorderSetup {
204 #[cfg(all(feature = "metrics", not(feature = "otel-metrics")))]
206 {
207 let recorder = metrics_exporter_prometheus::PrometheusBuilder::new().build_recorder();
208 let handle = recorder.handle();
209 if let Err(e) = metrics::set_global_recorder(recorder) {
210 tracing::warn!(error = %e, "global metrics recorder already installed; keeping existing");
211 }
212 RecorderSetup {
213 prom_handle: Some(handle),
214 }
215 }
216
217 #[cfg(all(feature = "otel-metrics", not(feature = "metrics")))]
219 {
220 match otel::build_otel_recorder(&config.namespace, &config.otel) {
221 Ok((otel_recorder, provider)) => {
222 opentelemetry::global::set_meter_provider(provider.clone());
223 if let Err(e) = metrics::set_global_recorder(otel_recorder) {
224 tracing::warn!(error = %e, "global metrics recorder already installed; keeping existing");
225 }
226 RecorderSetup {
227 otel_provider: Some(provider),
228 }
229 }
230 Err(e) => {
231 tracing::warn!(error = %e, "Failed to build OTel metrics recorder");
232 RecorderSetup {
233 otel_provider: None,
234 }
235 }
236 }
237 }
238
239 #[cfg(all(feature = "metrics", feature = "otel-metrics"))]
241 {
242 let prom_recorder = metrics_exporter_prometheus::PrometheusBuilder::new().build_recorder();
244 let prom_handle = prom_recorder.handle();
245
246 match otel::build_otel_recorder(&config.namespace, &config.otel) {
248 Ok((otel_recorder, provider)) => {
249 opentelemetry::global::set_meter_provider(provider.clone());
250
251 let fanout = metrics_util::layers::FanoutBuilder::default()
253 .add_recorder(prom_recorder)
254 .add_recorder(otel_recorder)
255 .build();
256
257 if let Err(e) = metrics::set_global_recorder(fanout) {
258 tracing::warn!(error = %e, "global metrics recorder already installed; keeping existing");
259 }
260
261 RecorderSetup {
262 prom_handle: Some(prom_handle),
263 otel_provider: Some(provider),
264 }
265 }
266 Err(e) => {
267 tracing::warn!(error = %e, "Failed to build OTel recorder, falling back to Prometheus only");
269 if let Err(e) = metrics::set_global_recorder(prom_recorder) {
270 tracing::warn!(error = %e, "global metrics recorder already installed; keeping existing");
271 }
272 RecorderSetup {
273 prom_handle: Some(prom_handle),
274 otel_provider: None,
275 }
276 }
277 }
278 }
279}
280
/// Registers namespaced metrics, records them in a manifest registry, and optionally serves
/// metrics and health/readiness endpoints over HTTP.
pub struct MetricsManager {
    /// Prometheus render handle; `None` for managers built with the test-only constructor.
    #[cfg(feature = "metrics")]
    handle: Option<PrometheusHandle>,
    /// Configuration the manager was created with.
    config: MetricsConfig,
    /// Set by `start_server*` and taken by `stop_server`. Sending on it, or dropping it,
    /// makes the server task stop.
    shutdown_tx: Option<oneshot::Sender<()>>,
    /// Process-level collector, present when `enable_process_metrics` was set.
    process_metrics: Option<ProcessMetrics>,
    /// Container-level collector, present when `enable_container_metrics` was set.
    container_metrics: Option<ContainerMetrics>,
    /// Optional readiness callback; see `set_readiness_check`.
    readiness_fn: Option<ReadinessFn>,
    /// Startup flag reported by `/startupz`; set by `mark_started`.
    started: Arc<std::sync::atomic::AtomicBool>,
    /// Descriptors of registered metrics, served at `/metrics/manifest`.
    registry: MetricRegistry,
    /// When set, exposed at `/scaling/pressure` by `start_server_with_routes`.
    #[cfg(all(feature = "metrics", feature = "scaling"))]
    scaling_pressure: Option<Arc<crate::scaling::ScalingPressure>>,
    /// When set, exposed at `/memory/pressure` by `start_server_with_routes`.
    #[cfg(all(feature = "metrics", feature = "memory"))]
    memory_guard: Option<Arc<crate::memory::MemoryGuard>>,
    /// OTel meter provider, retained so `shutdown_otel` can shut it down.
    #[cfg(feature = "otel-metrics")]
    otel_provider: Option<opentelemetry_sdk::metrics::SdkMeterProvider>,
}
299
300impl MetricsManager {
301 #[must_use]
303 pub fn new(namespace: &str) -> Self {
304 Self::with_config(MetricsConfig {
305 namespace: namespace.to_string(),
306 ..Default::default()
307 })
308 }
309
310 #[cfg(test)]
321 pub(crate) fn new_for_test(namespace: &str) -> Self {
322 let config = MetricsConfig {
323 namespace: namespace.to_string(),
324 enable_process_metrics: false,
325 enable_container_metrics: false,
326 ..Default::default()
327 };
328
329 let registry = MetricRegistry::new(&config.namespace);
330
331 Self {
332 #[cfg(feature = "metrics")]
333 handle: None,
334 registry,
335 config,
336 shutdown_tx: None,
337 process_metrics: None,
338 container_metrics: None,
339 readiness_fn: None,
340 started: Arc::new(std::sync::atomic::AtomicBool::new(false)),
341 #[cfg(all(feature = "metrics", feature = "scaling"))]
342 scaling_pressure: None,
343 #[cfg(all(feature = "metrics", feature = "memory"))]
344 memory_guard: None,
345 #[cfg(feature = "otel-metrics")]
346 otel_provider: None,
347 }
348 }
349
350 #[must_use]
357 pub fn with_config(config: MetricsConfig) -> Self {
358 let setup = install_recorders(&config);
359
360 let process_metrics = if config.enable_process_metrics {
361 Some(ProcessMetrics::new(&config.namespace))
362 } else {
363 None
364 };
365
366 let container_metrics = if config.enable_container_metrics {
367 Some(ContainerMetrics::new(&config.namespace))
368 } else {
369 None
370 };
371
372 let registry = MetricRegistry::new(&config.namespace);
373
374 Self {
375 #[cfg(feature = "metrics")]
376 handle: setup.prom_handle,
377 registry,
378 config,
379 shutdown_tx: None,
380 process_metrics,
381 container_metrics,
382 readiness_fn: None,
383 started: Arc::new(std::sync::atomic::AtomicBool::new(false)),
384 #[cfg(all(feature = "metrics", feature = "scaling"))]
385 scaling_pressure: None,
386 #[cfg(all(feature = "metrics", feature = "memory"))]
387 memory_guard: None,
388 #[cfg(feature = "otel-metrics")]
389 otel_provider: setup.otel_provider,
390 }
391 }
392
393 #[must_use]
398 pub fn counter(&self, name: &str, description: &str) -> Counter {
399 let key = self.prefixed_key(name);
400 let desc = description.to_string();
401 metrics::describe_counter!(key.clone(), desc.clone());
402 self.registry.push(MetricDescriptor {
403 name: key.clone(),
404 metric_type: MetricType::Counter,
405 description: desc,
406 unit: String::new(),
407 labels: vec![],
408 group: "custom".into(),
409 buckets: None,
410 use_cases: vec![],
411 dashboard_hint: None,
412 });
413 metrics::counter!(key)
414 }
415
416 #[must_use]
422 pub fn counter_with_labels(
423 &self,
424 name: &str,
425 description: &str,
426 labels: &[&str],
427 group: &str,
428 ) -> Counter {
429 let key = self.prefixed_key(name);
430 let desc = description.to_string();
431 metrics::describe_counter!(key.clone(), desc.clone());
432 self.registry.push(MetricDescriptor {
433 name: key.clone(),
434 metric_type: MetricType::Counter,
435 description: desc,
436 unit: String::new(),
437 labels: labels.iter().map(|s| (*s).to_string()).collect(),
438 group: group.into(),
439 buckets: None,
440 use_cases: vec![],
441 dashboard_hint: None,
442 });
443 metrics::counter!(key)
444 }
445
446 #[must_use]
451 pub fn gauge(&self, name: &str, description: &str) -> Gauge {
452 let key = self.prefixed_key(name);
453 let desc = description.to_string();
454 metrics::describe_gauge!(key.clone(), desc.clone());
455 self.registry.push(MetricDescriptor {
456 name: key.clone(),
457 metric_type: MetricType::Gauge,
458 description: desc,
459 unit: String::new(),
460 labels: vec![],
461 group: "custom".into(),
462 buckets: None,
463 use_cases: vec![],
464 dashboard_hint: None,
465 });
466 metrics::gauge!(key)
467 }
468
469 #[must_use]
471 pub fn gauge_with_labels(
472 &self,
473 name: &str,
474 description: &str,
475 labels: &[&str],
476 group: &str,
477 ) -> Gauge {
478 let key = self.prefixed_key(name);
479 let desc = description.to_string();
480 metrics::describe_gauge!(key.clone(), desc.clone());
481 self.registry.push(MetricDescriptor {
482 name: key.clone(),
483 metric_type: MetricType::Gauge,
484 description: desc,
485 unit: String::new(),
486 labels: labels.iter().map(|s| (*s).to_string()).collect(),
487 group: group.into(),
488 buckets: None,
489 use_cases: vec![],
490 dashboard_hint: None,
491 });
492 metrics::gauge!(key)
493 }
494
495 #[must_use]
500 pub fn histogram(&self, name: &str, description: &str) -> Histogram {
501 let key = self.prefixed_key(name);
502 let desc = description.to_string();
503 metrics::describe_histogram!(key.clone(), Unit::Seconds, desc.clone());
504 self.registry.push(MetricDescriptor {
505 name: key.clone(),
506 metric_type: MetricType::Histogram,
507 description: desc,
508 unit: "seconds".into(),
509 labels: vec![],
510 group: "custom".into(),
511 buckets: None,
512 use_cases: vec![],
513 dashboard_hint: None,
514 });
515 metrics::histogram!(key)
516 }
517
518 #[must_use]
520 pub fn histogram_with_labels(
521 &self,
522 name: &str,
523 description: &str,
524 labels: &[&str],
525 group: &str,
526 buckets: Option<&[f64]>,
527 ) -> Histogram {
528 let key = self.prefixed_key(name);
529 let desc = description.to_string();
530 metrics::describe_histogram!(key.clone(), Unit::Seconds, desc.clone());
531 self.registry.push(MetricDescriptor {
532 name: key.clone(),
533 metric_type: MetricType::Histogram,
534 description: desc,
535 unit: "seconds".into(),
536 labels: labels.iter().map(|s| (*s).to_string()).collect(),
537 group: group.into(),
538 buckets: buckets.map(|b| b.to_vec()),
539 use_cases: vec![],
540 dashboard_hint: None,
541 });
542 metrics::histogram!(key)
543 }
544
545 #[must_use]
551 pub fn histogram_with_buckets(
552 &self,
553 name: &str,
554 description: &str,
555 buckets: &[f64],
556 ) -> Histogram {
557 let key = self.prefixed_key(name);
558 let desc = description.to_string();
559 metrics::describe_histogram!(key.clone(), Unit::Seconds, desc.clone());
560 self.registry.push(MetricDescriptor {
561 name: key.clone(),
562 metric_type: MetricType::Histogram,
563 description: desc,
564 unit: "seconds".into(),
565 labels: vec![],
566 group: "custom".into(),
567 buckets: Some(buckets.to_vec()),
568 use_cases: vec![],
569 dashboard_hint: None,
570 });
571 metrics::histogram!(key)
572 }
573
574 #[cfg(feature = "metrics")]
579 #[must_use]
580 pub fn render(&self) -> String {
581 self.handle
582 .as_ref()
583 .map_or_else(String::new, PrometheusHandle::render)
584 }
585
586 #[cfg(feature = "metrics")]
609 #[must_use]
610 pub fn render_handle(&self) -> Option<RenderHandle> {
611 self.handle.clone().map(RenderHandle)
612 }
613
614 pub fn set_readiness_check(&mut self, f: impl Fn() -> bool + Send + Sync + 'static) {
620 self.readiness_fn = Some(Arc::new(f));
621 }
622
623 pub fn mark_started(&self) {
629 self.started
630 .store(true, std::sync::atomic::Ordering::Release);
631 }
632
633 pub(crate) fn started_flag(&self) -> Arc<std::sync::atomic::AtomicBool> {
635 Arc::clone(&self.started)
636 }
637
638 #[cfg(all(feature = "metrics", feature = "scaling"))]
643 pub fn set_scaling_pressure(&mut self, sp: Arc<crate::scaling::ScalingPressure>) {
644 self.scaling_pressure = Some(sp);
645 }
646
647 #[cfg(all(feature = "metrics", feature = "memory"))]
652 pub fn set_memory_guard(&mut self, mg: Arc<crate::memory::MemoryGuard>) {
653 self.memory_guard = Some(mg);
654 }
655
656 pub fn update(&self) {
658 if let Some(ref pm) = self.process_metrics {
659 pm.update();
660 }
661 if let Some(ref cm) = self.container_metrics {
662 cm.update();
663 }
664 }
665
666 #[cfg(feature = "metrics")]
677 pub async fn start_server(&mut self, addr: &str) -> Result<(), MetricsError> {
678 if self.shutdown_tx.is_some() {
679 return Err(MetricsError::AlreadyRunning);
680 }
681
682 let addr: SocketAddr = addr
683 .parse()
684 .map_err(|e| MetricsError::ServerError(format!("invalid address: {e}")))?;
685
686 let listener = TcpListener::bind(addr)
687 .await
688 .map_err(|e| MetricsError::ServerError(e.to_string()))?;
689
690 let (shutdown_tx, shutdown_rx) = oneshot::channel();
691 self.shutdown_tx = Some(shutdown_tx);
692
693 let handle = self
694 .handle
695 .as_ref()
696 .ok_or_else(|| {
697 MetricsError::ServerError(
698 "Prometheus handle not configured -- MetricsManager was created without a recorder".into(),
699 )
700 })?
701 .clone();
702 let update_interval = self.config.update_interval;
703 let process_metrics = self.process_metrics.clone();
704 let container_metrics = self.container_metrics.clone();
705 let readiness_fn = self.readiness_fn.clone();
706 let started_flag = self.started_flag();
707
708 let registry = self.registry();
709
710 tokio::spawn(async move {
711 run_server(
712 listener,
713 handle,
714 registry,
715 shutdown_rx,
716 update_interval,
717 process_metrics,
718 container_metrics,
719 readiness_fn,
720 started_flag,
721 )
722 .await;
723 });
724
725 Ok(())
726 }
727
728 #[cfg(all(feature = "metrics", feature = "http-server"))]
746 pub async fn start_server_with_routes(
747 &mut self,
748 addr: &str,
749 extra_routes: axum::Router,
750 ) -> Result<(), MetricsError> {
751 if self.shutdown_tx.is_some() {
752 return Err(MetricsError::AlreadyRunning);
753 }
754
755 let addr: SocketAddr = addr
756 .parse()
757 .map_err(|e| MetricsError::ServerError(format!("invalid address: {e}")))?;
758
759 let listener = TcpListener::bind(addr)
760 .await
761 .map_err(|e| MetricsError::ServerError(e.to_string()))?;
762
763 let (shutdown_tx, shutdown_rx) = oneshot::channel();
764 self.shutdown_tx = Some(shutdown_tx);
765
766 let handle = self
767 .handle
768 .as_ref()
769 .ok_or_else(|| {
770 MetricsError::ServerError(
771 "Prometheus handle not configured -- MetricsManager was created without a recorder".into(),
772 )
773 })?
774 .clone();
775 let update_interval = self.config.update_interval;
776 let process_metrics = self.process_metrics.clone();
777 let container_metrics = self.container_metrics.clone();
778 let readiness_fn = self.readiness_fn.clone();
779
780 let metrics_handle = handle.clone();
782 let readiness_for_live = readiness_fn.clone();
783 let registry_handle = self.registry();
784
785 let mut app = axum::Router::new()
786 .route(
787 "/metrics/manifest",
788 axum::routing::get(move || {
789 let reg = registry_handle.clone();
790 async move {
791 (
792 [(axum::http::header::CONTENT_TYPE, "application/json")],
793 serde_json::to_string(®.manifest()).unwrap_or_default(),
794 )
795 }
796 }),
797 )
798 .route(
799 "/metrics",
800 axum::routing::get(move || {
801 let h = metrics_handle.clone();
802 async move { h.render() }
803 }),
804 )
805 .route("/startupz", {
806 let sf = self.started_flag();
807 axum::routing::get(move || {
808 let started = sf.load(std::sync::atomic::Ordering::Acquire);
809 async move {
810 if started {
811 (
812 axum::http::StatusCode::OK,
813 [(axum::http::header::CONTENT_TYPE, "application/json")],
814 r#"{"status":"started"}"#,
815 )
816 } else {
817 (
818 axum::http::StatusCode::SERVICE_UNAVAILABLE,
819 [(axum::http::header::CONTENT_TYPE, "application/json")],
820 r#"{"status":"starting"}"#,
821 )
822 }
823 }
824 })
825 })
826 .route(
827 "/healthz",
828 axum::routing::get(|| async {
829 (
830 [(axum::http::header::CONTENT_TYPE, "application/json")],
831 r#"{"status":"alive"}"#,
832 )
833 }),
834 )
835 .route(
836 "/health/live",
837 axum::routing::get(|| async {
838 (
839 [(axum::http::header::CONTENT_TYPE, "application/json")],
840 r#"{"status":"alive"}"#,
841 )
842 }),
843 )
844 .route(
845 "/readyz",
846 axum::routing::get(move || {
847 let rf = readiness_fn.clone();
848 async move { readiness_response(rf) }
849 }),
850 )
851 .route(
852 "/health/ready",
853 axum::routing::get(move || {
854 let rf = readiness_for_live.clone();
855 async move { readiness_response(rf) }
856 }),
857 );
858
859 #[cfg(feature = "scaling")]
861 if let Some(ref sp) = self.scaling_pressure {
862 let sp = sp.clone();
863 app = app.route(
864 "/scaling/pressure",
865 axum::routing::get(move || {
866 let s = sp.clone();
867 async move { format!("{:.2}", s.calculate()) }
868 }),
869 );
870 }
871
872 #[cfg(feature = "memory")]
874 if let Some(ref mg) = self.memory_guard {
875 let mg = mg.clone();
876 app = app.route(
877 "/memory/pressure",
878 axum::routing::get(move || {
879 let m = mg.clone();
880 async move {
881 (
882 [(axum::http::header::CONTENT_TYPE, "application/json")],
883 format!(
884 r#"{{"under_pressure":{},"ratio":{:.3},"current_bytes":{},"limit_bytes":{}}}"#,
885 m.under_pressure(),
886 m.pressure_ratio(),
887 m.current_bytes(),
888 m.limit_bytes()
889 ),
890 )
891 }
892 }),
893 );
894 }
895
896 app = app.merge(extra_routes);
898
899 tokio::spawn(async move {
900 run_axum_server(
901 listener,
902 app,
903 shutdown_rx,
904 update_interval,
905 process_metrics,
906 container_metrics,
907 )
908 .await;
909 });
910
911 Ok(())
912 }
913
914 pub async fn stop_server(&mut self) -> Result<(), MetricsError> {
920 if let Some(tx) = self.shutdown_tx.take() {
921 let _ = tx.send(());
922 Ok(())
923 } else {
924 Err(MetricsError::NotRunning)
925 }
926 }
927
928 #[cfg(feature = "otel-metrics")]
932 pub fn shutdown_otel(&mut self) {
933 if let Some(provider) = self.otel_provider.take()
934 && let Err(e) = provider.shutdown()
935 {
936 tracing::warn!(error = %e, "OTel provider shutdown error");
937 }
938 }
939
940 pub fn set_build_info(&self, version: &str, commit: &str) {
946 self.registry.set_build_info(version, commit);
947 }
948
949 pub fn set_use_cases(&self, metric_name: &str, use_cases: &[&str]) {
953 self.registry.set_use_cases(metric_name, use_cases);
954 }
955
956 pub fn set_dashboard_hint(&self, metric_name: &str, hint: &str) {
960 self.registry.set_dashboard_hint(metric_name, hint);
961 }
962
963 #[must_use]
968 pub fn registry(&self) -> MetricRegistry {
969 self.registry.clone()
970 }
971
972 #[must_use]
976 pub fn namespace(&self) -> &str {
977 &self.config.namespace
978 }
979
980 fn prefixed_key(&self, name: &str) -> String {
982 if self.config.namespace.is_empty() {
983 name.to_string()
984 } else {
985 format!("{}_{}", self.config.namespace, name)
986 }
987 }
988}
989
/// Accept loop behind `MetricsManager::start_server`.
///
/// Runs until `shutdown_rx` fires or its sender is dropped. On each tick of `update_interval`
/// it refreshes the process/container collectors. Each accepted connection is handled in its
/// own task by `handle_connection`.
#[cfg(feature = "metrics")]
// All server state is moved into this task; a parameter struct would only add noise.
#[allow(clippy::too_many_arguments)]
async fn run_server(
    listener: TcpListener,
    handle: PrometheusHandle,
    registry: MetricRegistry,
    mut shutdown_rx: oneshot::Receiver<()>,
    update_interval: Duration,
    process_metrics: Option<ProcessMetrics>,
    container_metrics: Option<ContainerMetrics>,
    readiness_fn: Option<ReadinessFn>,
    started_flag: Arc<std::sync::atomic::AtomicBool>,
) {
    // Pause after a failed `accept`. Persistent errors (e.g. running out of file descriptors)
    // would otherwise make this loop spin at full CPU.
    const ACCEPT_ERROR_BACKOFF: Duration = Duration::from_millis(100);

    // `tokio::time::interval` panics on a zero period, which would silently kill this task.
    // Fall back to 15s, the `MetricsConfig` default.
    let period = if update_interval.is_zero() {
        tracing::warn!("metrics update_interval is zero; falling back to 15s");
        Duration::from_secs(15)
    } else {
        update_interval
    };
    let mut ticker = tokio::time::interval(period);

    loop {
        tokio::select! {
            _ = &mut shutdown_rx => {
                break;
            }
            _ = ticker.tick() => {
                if let Some(ref pm) = process_metrics {
                    pm.update();
                }
                if let Some(ref cm) = container_metrics {
                    cm.update();
                }
            }
            accepted = listener.accept() => {
                match accepted {
                    Ok((stream, _peer)) => {
                        let handle = handle.clone();
                        let registry = registry.clone();
                        let readiness_fn = readiness_fn.clone();
                        let sf = Arc::clone(&started_flag);
                        tokio::spawn(async move {
                            handle_connection(stream, handle, registry, readiness_fn, &sf).await;
                        });
                    }
                    Err(e) => {
                        tracing::warn!(error = %e, "metrics server failed to accept connection");
                        tokio::time::sleep(ACCEPT_ERROR_BACKOFF).await;
                    }
                }
            }
        }
    }
}
1033
/// Answers a single HTTP request on `stream` and then closes it.
///
/// Only the request line is parsed. The path must match exactly; a query string and
/// trailing slashes are ignored. Supported `GET` paths:
/// - `/metrics`
/// - `/metrics/manifest`
/// - `/startupz` and `/health/startup`
/// - `/healthz` and `/health/live`
/// - `/readyz` and `/health/ready`
///
/// Anything else gets `404 Not Found`.
#[cfg(feature = "metrics")]
async fn handle_connection(
    mut stream: tokio::net::TcpStream,
    handle: PrometheusHandle,
    registry: MetricRegistry,
    readiness_fn: Option<ReadinessFn>,
    started_flag: &std::sync::atomic::AtomicBool,
) {
    use tokio::io::{AsyncBufReadExt, AsyncReadExt, AsyncWriteExt, BufReader};

    // Upper bound on the bytes read for the request line; longer lines are truncated
    // (and then fail to route).
    const MAX_REQUEST_LINE: u64 = 8 * 1024;
    // How long a client may take to send its request line. This keeps idle or slow
    // connections from pinning a task forever.
    const REQUEST_LINE_TIMEOUT: Duration = Duration::from_secs(10);

    let mut request_line = String::new();
    {
        let mut reader = BufReader::new(&mut stream).take(MAX_REQUEST_LINE);
        let read =
            tokio::time::timeout(REQUEST_LINE_TIMEOUT, reader.read_line(&mut request_line)).await;
        // Timed out, I/O error, or the peer closed without sending anything: nobody to answer.
        if !matches!(read, Ok(Ok(n)) if n > 0) {
            return;
        }
    }

    // Request line shape: `METHOD TARGET VERSION`.
    let mut tokens = request_line.split_whitespace();
    let method = tokens.next().unwrap_or_default();
    let target = tokens.next().unwrap_or_default();
    // Match the exact path rather than a prefix, so e.g. `/metricsfoo` is not served as
    // `/metrics`.
    let path = target
        .split_once('?')
        .map_or(target, |(p, _)| p)
        .trim_end_matches('/');

    let json = "application/json";
    let (status, content_type, body): (&str, &str, String) = match (method, path) {
        ("GET", "/metrics/manifest") => {
            let manifest = registry.manifest();
            (
                "200 OK",
                json,
                serde_json::to_string(&manifest).unwrap_or_default(),
            )
        }
        ("GET", "/metrics") => ("200 OK", "text/plain; charset=utf-8", handle.render()),
        ("GET", "/startupz" | "/health/startup") => {
            if started_flag.load(std::sync::atomic::Ordering::Acquire) {
                ("200 OK", json, r#"{"status":"started"}"#.to_string())
            } else {
                (
                    "503 Service Unavailable",
                    json,
                    r#"{"status":"starting"}"#.to_string(),
                )
            }
        }
        ("GET", "/healthz" | "/health/live") => {
            ("200 OK", json, r#"{"status":"alive"}"#.to_string())
        }
        ("GET", "/readyz" | "/health/ready") => {
            let callback_ready = readiness_fn.as_ref().is_none_or(|f| f());

            #[cfg(feature = "health")]
            let registry_ready = crate::health::HealthRegistry::is_ready();
            #[cfg(not(feature = "health"))]
            let registry_ready = true;

            if callback_ready && registry_ready {
                ("200 OK", json, r#"{"status":"ready"}"#.to_string())
            } else {
                (
                    "503 Service Unavailable",
                    json,
                    r#"{"status":"not_ready"}"#.to_string(),
                )
            }
        }
        _ => (
            "404 Not Found",
            "text/plain; charset=utf-8",
            "Not Found".to_string(),
        ),
    };

    // The connection is closed after one response; say so, so HTTP/1.1 clients
    // don't try to reuse it.
    let response = format!(
        "HTTP/1.1 {status}\r\nContent-Type: {content_type}\r\nContent-Length: {}\r\nConnection: close\r\n\r\n{body}",
        body.len()
    );

    // Best effort: the client may already have gone away.
    if stream.write_all(response.as_bytes()).await.is_ok() {
        let _ = stream.shutdown().await;
    }
}
1127
/// Builds the `/readyz` / `/health/ready` response for the axum server.
///
/// The service is ready when the optional callback `rf` returns `true` (or none is set) and,
/// with the `health` feature, the health registry also reports ready. Ready responses are
/// `200` with `{"status":"ready"}`; otherwise `503` with `{"status":"not_ready"}`.
#[cfg(all(feature = "metrics", feature = "http-server"))]
fn readiness_response(rf: Option<ReadinessFn>) -> axum::response::Response {
    use axum::http::{StatusCode, header::CONTENT_TYPE};
    use axum::response::IntoResponse;

    let callback_ok = match rf {
        Some(ref check) => check(),
        None => true,
    };

    #[cfg(feature = "health")]
    let health_ok = crate::health::HealthRegistry::is_ready();
    #[cfg(not(feature = "health"))]
    let health_ok = true;

    let json_header = [(CONTENT_TYPE, "application/json")];
    if !(callback_ok && health_ok) {
        return (
            StatusCode::SERVICE_UNAVAILABLE,
            json_header,
            r#"{"status":"not_ready"}"#,
        )
            .into_response();
    }
    (json_header, r#"{"status":"ready"}"#).into_response()
}
1160
/// Serves `app` on `listener` until `shutdown_rx` fires or its sender is dropped.
///
/// A companion task refreshes the process/container collectors every `update_interval`.
/// Server errors are logged rather than returned. The refresh task is stopped once the
/// server has shut down.
#[cfg(all(feature = "metrics", feature = "http-server"))]
async fn run_axum_server(
    listener: TcpListener,
    app: axum::Router,
    shutdown_rx: oneshot::Receiver<()>,
    update_interval: Duration,
    process_metrics: Option<ProcessMetrics>,
    container_metrics: Option<ContainerMetrics>,
) {
    // `tokio::time::interval` panics on a zero period, which would kill this task before the
    // server even starts. Fall back to 15s, the `MetricsConfig` default.
    let period = if update_interval.is_zero() {
        tracing::warn!("metrics update_interval is zero; falling back to 15s");
        Duration::from_secs(15)
    } else {
        update_interval
    };
    let mut interval = tokio::time::interval(period);

    // Stops the refresh task once the server below returns.
    let (update_stop_tx, mut update_stop_rx) = oneshot::channel::<()>();
    tokio::spawn(async move {
        loop {
            tokio::select! {
                _ = &mut update_stop_rx => break,
                _ = interval.tick() => {
                    if let Some(ref pm) = process_metrics {
                        pm.update();
                    }
                    if let Some(ref cm) = container_metrics {
                        cm.update();
                    }
                }
            }
        }
    });

    axum::serve(listener, app)
        .with_graceful_shutdown(async move {
            // Either an explicit stop or a dropped sender ends the server.
            let _ = shutdown_rx.await;
        })
        .await
        .unwrap_or_else(|e| tracing::error!(error = %e, "Metrics axum server error"));

    let _ = update_stop_tx.send(());
}
1201
/// Histogram bucket upper bounds (in seconds) for request latencies, from 1 ms to 10 s.
#[must_use]
pub fn latency_buckets() -> Vec<f64> {
    const LATENCY_BOUNDS_SECS: [f64; 12] = [
        0.001, 0.005, 0.01, 0.025, 0.05, 0.1, 0.25, 0.5, 1.0, 2.5, 5.0, 10.0,
    ];
    LATENCY_BOUNDS_SECS.to_vec()
}
1209
/// Histogram bucket upper bounds for sizes, in powers of ten from 100 to 10,000,000.
#[must_use]
pub fn size_buckets() -> Vec<f64> {
    [1e2, 1e3, 1e4, 1e5, 1e6, 1e7].into_iter().collect()
}
1222
#[cfg(test)]
mod tests {
    use super::*;

    /// Histogram bounds must be non-empty, positive and strictly increasing.
    fn assert_valid_buckets(values: &[f64]) {
        assert!(!values.is_empty(), "bucket list is empty");
        assert!(
            values.iter().all(|&v| v > 0.0),
            "non-positive bucket bound in {values:?}"
        );
        assert!(
            values.windows(2).all(|w| w[0] < w[1]),
            "buckets not strictly increasing: {values:?}"
        );
    }

    #[test]
    fn test_metrics_config_default() {
        let config = MetricsConfig::default();
        assert!(config.namespace.is_empty());
        assert!(config.enable_process_metrics);
        assert!(config.enable_container_metrics);
        assert_eq!(config.update_interval, Duration::from_secs(15));
    }

    #[test]
    fn test_latency_buckets() {
        let buckets = latency_buckets();
        assert_eq!(buckets.len(), 12);
        assert_valid_buckets(&buckets);
    }

    #[test]
    fn test_size_buckets() {
        let buckets = size_buckets();
        assert_eq!(buckets.len(), 6);
        assert_valid_buckets(&buckets);
    }

    #[test]
    fn test_prefixed_key() {
        let namespaced = MetricsManager::new_for_test("svc");
        assert_eq!(namespaced.namespace(), "svc");
        assert_eq!(namespaced.prefixed_key("requests_total"), "svc_requests_total");

        let bare = MetricsManager::new_for_test("");
        assert_eq!(bare.prefixed_key("requests_total"), "requests_total");
    }
}