1mod container;
77pub mod dfe;
78pub mod labels;
79pub mod manifest;
80mod process;
81
82pub use labels::{AuthFailureReason, FlushTrigger, TransportKind, ValidationFailureReason};
83
84#[cfg(feature = "otel-metrics")]
85pub(crate) mod otel;
86#[cfg(feature = "otel-metrics")]
87pub mod otel_types;
88
89use std::net::SocketAddr;
90use std::sync::Arc;
91use std::time::Duration;
92
93use metrics::{Counter, Gauge, Histogram, Unit};
94use thiserror::Error;
95use tokio::net::TcpListener;
96use tokio::sync::oneshot;
97
/// Shared, thread-safe callback that reports readiness as a `bool`.
///
/// The manager stores one of these (see `MetricsManager::set_readiness_check`)
/// and the readiness endpoints call it on each request.
pub type ReadinessFn = Arc<dyn Fn() -> bool + Send + Sync>;
100
101#[cfg(feature = "metrics")]
102use metrics_exporter_prometheus::PrometheusHandle;
103
104pub use container::ContainerMetrics;
105pub use dfe::DfeMetrics;
106#[cfg(feature = "metrics-dfe")]
107pub mod dfe_groups;
108pub use manifest::{ManifestResponse, MetricDescriptor, MetricRegistry, MetricType};
109pub use process::ProcessMetrics;
110
111#[cfg(feature = "otel-metrics")]
112pub use otel_types::{OtelMetricsConfig, OtelProtocol};
113
/// Cloneable wrapper around a [`PrometheusHandle`].
///
/// Keeps the exporter type out of the public signature while still letting
/// callers render the current metrics text.
#[cfg(feature = "metrics")]
#[derive(Clone)]
pub struct RenderHandle(PrometheusHandle);
121
#[cfg(feature = "metrics")]
impl RenderHandle {
    /// Renders the current metrics by delegating to the wrapped Prometheus handle.
    #[must_use]
    pub fn render(&self) -> String {
        let Self(inner) = self;
        inner.render()
    }
}
130
131#[derive(Debug, Error)]
133pub enum MetricsError {
134 #[error("failed to build metrics exporter: {0}")]
136 BuildError(String),
137
138 #[error("failed to start metrics server: {0}")]
140 ServerError(String),
141
142 #[error("metrics server already running")]
144 AlreadyRunning,
145
146 #[error("metrics server not running")]
148 NotRunning,
149}
150
/// Settings consumed by `MetricsManager::with_config`.
#[derive(Debug, Clone)]
pub struct MetricsConfig {
    /// Prefix joined to every metric name with `_`; an empty string means no prefix.
    pub namespace: String,
    /// When `true`, a process-metrics collector is created by the manager.
    pub enable_process_metrics: bool,
    /// When `true`, a container-metrics collector is created by the manager.
    pub enable_container_metrics: bool,
    /// Period between automatic collector refreshes while a server task runs.
    pub update_interval: Duration,
    /// OpenTelemetry exporter settings (only with the `otel-metrics` feature).
    #[cfg(feature = "otel-metrics")]
    pub otel: OtelMetricsConfig,
}
166
167impl Default for MetricsConfig {
168 fn default() -> Self {
169 Self {
170 namespace: String::new(),
171 enable_process_metrics: true,
172 enable_container_metrics: true,
173 update_interval: Duration::from_secs(15),
174 #[cfg(feature = "otel-metrics")]
175 otel: OtelMetricsConfig::default(),
176 }
177 }
178}
179
/// Handles produced by `install_recorders`, one per enabled backend.
struct RecorderSetup {
    /// Prometheus render handle (only with the `metrics` feature).
    #[cfg(feature = "metrics")]
    prom_handle: Option<PrometheusHandle>,
    /// OTel meter provider; `None` when building the OTel recorder failed
    /// (only with the `otel-metrics` feature).
    #[cfg(feature = "otel-metrics")]
    otel_provider: Option<opentelemetry_sdk::metrics::SdkMeterProvider>,
}
187
188#[allow(unused_variables)]
192fn install_recorders(config: &MetricsConfig) -> RecorderSetup {
193 #[cfg(all(feature = "metrics", not(feature = "otel-metrics")))]
195 {
196 let recorder = metrics_exporter_prometheus::PrometheusBuilder::new().build_recorder();
197 let handle = recorder.handle();
198 if let Err(e) = metrics::set_global_recorder(recorder) {
199 tracing::warn!(error = %e, "global metrics recorder already installed; keeping existing");
200 }
201 RecorderSetup {
202 prom_handle: Some(handle),
203 }
204 }
205
206 #[cfg(all(feature = "otel-metrics", not(feature = "metrics")))]
208 {
209 match otel::build_otel_recorder(&config.namespace, &config.otel) {
210 Ok((otel_recorder, provider)) => {
211 opentelemetry::global::set_meter_provider(provider.clone());
212 if let Err(e) = metrics::set_global_recorder(otel_recorder) {
213 tracing::warn!(error = %e, "global metrics recorder already installed; keeping existing");
214 }
215 RecorderSetup {
216 otel_provider: Some(provider),
217 }
218 }
219 Err(e) => {
220 tracing::warn!(error = %e, "Failed to build OTel metrics recorder");
221 RecorderSetup {
222 otel_provider: None,
223 }
224 }
225 }
226 }
227
228 #[cfg(all(feature = "metrics", feature = "otel-metrics"))]
230 {
231 let prom_recorder = metrics_exporter_prometheus::PrometheusBuilder::new().build_recorder();
233 let prom_handle = prom_recorder.handle();
234
235 match otel::build_otel_recorder(&config.namespace, &config.otel) {
237 Ok((otel_recorder, provider)) => {
238 opentelemetry::global::set_meter_provider(provider.clone());
239
240 let fanout = metrics_util::layers::FanoutBuilder::default()
242 .add_recorder(prom_recorder)
243 .add_recorder(otel_recorder)
244 .build();
245
246 if let Err(e) = metrics::set_global_recorder(fanout) {
247 tracing::warn!(error = %e, "global metrics recorder already installed; keeping existing");
248 }
249
250 RecorderSetup {
251 prom_handle: Some(prom_handle),
252 otel_provider: Some(provider),
253 }
254 }
255 Err(e) => {
256 tracing::warn!(error = %e, "Failed to build OTel recorder, falling back to Prometheus only");
258 if let Err(e) = metrics::set_global_recorder(prom_recorder) {
259 tracing::warn!(error = %e, "global metrics recorder already installed; keeping existing");
260 }
261 RecorderSetup {
262 prom_handle: Some(prom_handle),
263 otel_provider: None,
264 }
265 }
266 }
267 }
268}
269
270pub struct MetricsManager {
272 #[cfg(feature = "metrics")]
273 handle: Option<PrometheusHandle>,
274 config: MetricsConfig,
275 shutdown_tx: Option<oneshot::Sender<()>>,
276 process_metrics: Option<ProcessMetrics>,
277 container_metrics: Option<ContainerMetrics>,
278 readiness_fn: Option<ReadinessFn>,
279 started: Arc<std::sync::atomic::AtomicBool>,
280 registry: MetricRegistry,
281 #[cfg(all(feature = "metrics", feature = "scaling"))]
282 scaling_pressure: Option<Arc<crate::scaling::ScalingPressure>>,
283 #[cfg(all(feature = "metrics", feature = "memory"))]
284 memory_guard: Option<Arc<crate::memory::MemoryGuard>>,
285 #[cfg(feature = "otel-metrics")]
286 otel_provider: Option<opentelemetry_sdk::metrics::SdkMeterProvider>,
287}
288
289impl MetricsManager {
290 #[must_use]
292 pub fn new(namespace: &str) -> Self {
293 Self::with_config(MetricsConfig {
294 namespace: namespace.to_string(),
295 ..Default::default()
296 })
297 }
298
299 #[cfg(test)]
307 pub(crate) fn new_for_test(namespace: &str) -> Self {
308 let config = MetricsConfig {
309 namespace: namespace.to_string(),
310 enable_process_metrics: false,
311 enable_container_metrics: false,
312 ..Default::default()
313 };
314
315 let registry = MetricRegistry::new(&config.namespace);
316
317 Self {
318 #[cfg(feature = "metrics")]
319 handle: None,
320 registry,
321 config,
322 shutdown_tx: None,
323 process_metrics: None,
324 container_metrics: None,
325 readiness_fn: None,
326 started: Arc::new(std::sync::atomic::AtomicBool::new(false)),
327 #[cfg(all(feature = "metrics", feature = "scaling"))]
328 scaling_pressure: None,
329 #[cfg(all(feature = "metrics", feature = "memory"))]
330 memory_guard: None,
331 #[cfg(feature = "otel-metrics")]
332 otel_provider: None,
333 }
334 }
335
336 #[must_use]
343 pub fn with_config(config: MetricsConfig) -> Self {
344 let setup = install_recorders(&config);
345
346 let process_metrics = if config.enable_process_metrics {
347 Some(ProcessMetrics::new(&config.namespace))
348 } else {
349 None
350 };
351
352 let container_metrics = if config.enable_container_metrics {
353 Some(ContainerMetrics::new(&config.namespace))
354 } else {
355 None
356 };
357
358 let registry = MetricRegistry::new(&config.namespace);
359
360 Self {
361 #[cfg(feature = "metrics")]
362 handle: setup.prom_handle,
363 registry,
364 config,
365 shutdown_tx: None,
366 process_metrics,
367 container_metrics,
368 readiness_fn: None,
369 started: Arc::new(std::sync::atomic::AtomicBool::new(false)),
370 #[cfg(all(feature = "metrics", feature = "scaling"))]
371 scaling_pressure: None,
372 #[cfg(all(feature = "metrics", feature = "memory"))]
373 memory_guard: None,
374 #[cfg(feature = "otel-metrics")]
375 otel_provider: setup.otel_provider,
376 }
377 }
378
379 #[must_use]
384 pub fn counter(&self, name: &str, description: &str) -> Counter {
385 let key = self.prefixed_key(name);
386 let desc = description.to_string();
387 metrics::describe_counter!(key.clone(), desc.clone());
388 self.registry.push(MetricDescriptor {
389 name: key.clone(),
390 metric_type: MetricType::Counter,
391 description: desc,
392 unit: String::new(),
393 labels: vec![],
394 group: "custom".into(),
395 buckets: None,
396 use_cases: vec![],
397 dashboard_hint: None,
398 });
399 metrics::counter!(key)
400 }
401
402 #[must_use]
408 pub fn counter_with_labels(
409 &self,
410 name: &str,
411 description: &str,
412 labels: &[&str],
413 group: &str,
414 ) -> Counter {
415 let key = self.prefixed_key(name);
416 let desc = description.to_string();
417 metrics::describe_counter!(key.clone(), desc.clone());
418 self.registry.push(MetricDescriptor {
419 name: key.clone(),
420 metric_type: MetricType::Counter,
421 description: desc,
422 unit: String::new(),
423 labels: labels.iter().map(|s| (*s).to_string()).collect(),
424 group: group.into(),
425 buckets: None,
426 use_cases: vec![],
427 dashboard_hint: None,
428 });
429 metrics::counter!(key)
430 }
431
432 #[must_use]
437 pub fn gauge(&self, name: &str, description: &str) -> Gauge {
438 let key = self.prefixed_key(name);
439 let desc = description.to_string();
440 metrics::describe_gauge!(key.clone(), desc.clone());
441 self.registry.push(MetricDescriptor {
442 name: key.clone(),
443 metric_type: MetricType::Gauge,
444 description: desc,
445 unit: String::new(),
446 labels: vec![],
447 group: "custom".into(),
448 buckets: None,
449 use_cases: vec![],
450 dashboard_hint: None,
451 });
452 metrics::gauge!(key)
453 }
454
455 #[must_use]
457 pub fn gauge_with_labels(
458 &self,
459 name: &str,
460 description: &str,
461 labels: &[&str],
462 group: &str,
463 ) -> Gauge {
464 let key = self.prefixed_key(name);
465 let desc = description.to_string();
466 metrics::describe_gauge!(key.clone(), desc.clone());
467 self.registry.push(MetricDescriptor {
468 name: key.clone(),
469 metric_type: MetricType::Gauge,
470 description: desc,
471 unit: String::new(),
472 labels: labels.iter().map(|s| (*s).to_string()).collect(),
473 group: group.into(),
474 buckets: None,
475 use_cases: vec![],
476 dashboard_hint: None,
477 });
478 metrics::gauge!(key)
479 }
480
481 #[must_use]
486 pub fn histogram(&self, name: &str, description: &str) -> Histogram {
487 let key = self.prefixed_key(name);
488 let desc = description.to_string();
489 metrics::describe_histogram!(key.clone(), Unit::Seconds, desc.clone());
490 self.registry.push(MetricDescriptor {
491 name: key.clone(),
492 metric_type: MetricType::Histogram,
493 description: desc,
494 unit: "seconds".into(),
495 labels: vec![],
496 group: "custom".into(),
497 buckets: None,
498 use_cases: vec![],
499 dashboard_hint: None,
500 });
501 metrics::histogram!(key)
502 }
503
504 #[must_use]
506 pub fn histogram_with_labels(
507 &self,
508 name: &str,
509 description: &str,
510 labels: &[&str],
511 group: &str,
512 buckets: Option<&[f64]>,
513 ) -> Histogram {
514 let key = self.prefixed_key(name);
515 let desc = description.to_string();
516 metrics::describe_histogram!(key.clone(), Unit::Seconds, desc.clone());
517 self.registry.push(MetricDescriptor {
518 name: key.clone(),
519 metric_type: MetricType::Histogram,
520 description: desc,
521 unit: "seconds".into(),
522 labels: labels.iter().map(|s| (*s).to_string()).collect(),
523 group: group.into(),
524 buckets: buckets.map(|b| b.to_vec()),
525 use_cases: vec![],
526 dashboard_hint: None,
527 });
528 metrics::histogram!(key)
529 }
530
531 #[must_use]
537 pub fn histogram_with_buckets(
538 &self,
539 name: &str,
540 description: &str,
541 buckets: &[f64],
542 ) -> Histogram {
543 let key = self.prefixed_key(name);
544 let desc = description.to_string();
545 metrics::describe_histogram!(key.clone(), Unit::Seconds, desc.clone());
546 self.registry.push(MetricDescriptor {
547 name: key.clone(),
548 metric_type: MetricType::Histogram,
549 description: desc,
550 unit: "seconds".into(),
551 labels: vec![],
552 group: "custom".into(),
553 buckets: Some(buckets.to_vec()),
554 use_cases: vec![],
555 dashboard_hint: None,
556 });
557 metrics::histogram!(key)
558 }
559
560 #[cfg(feature = "metrics")]
565 #[must_use]
566 pub fn render(&self) -> String {
567 self.handle
568 .as_ref()
569 .map_or_else(String::new, PrometheusHandle::render)
570 }
571
572 #[cfg(feature = "metrics")]
592 #[must_use]
593 pub fn render_handle(&self) -> Option<RenderHandle> {
594 self.handle.clone().map(RenderHandle)
595 }
596
597 pub fn set_readiness_check(&mut self, f: impl Fn() -> bool + Send + Sync + 'static) {
603 self.readiness_fn = Some(Arc::new(f));
604 }
605
606 pub fn mark_started(&self) {
612 self.started
613 .store(true, std::sync::atomic::Ordering::Release);
614 }
615
616 pub(crate) fn started_flag(&self) -> Arc<std::sync::atomic::AtomicBool> {
618 Arc::clone(&self.started)
619 }
620
621 #[cfg(all(feature = "metrics", feature = "scaling"))]
626 pub fn set_scaling_pressure(&mut self, sp: Arc<crate::scaling::ScalingPressure>) {
627 self.scaling_pressure = Some(sp);
628 }
629
630 #[cfg(all(feature = "metrics", feature = "memory"))]
635 pub fn set_memory_guard(&mut self, mg: Arc<crate::memory::MemoryGuard>) {
636 self.memory_guard = Some(mg);
637 }
638
639 pub fn update(&self) {
641 if let Some(ref pm) = self.process_metrics {
642 pm.update();
643 }
644 if let Some(ref cm) = self.container_metrics {
645 cm.update();
646 }
647 }
648
649 #[cfg(feature = "metrics")]
660 pub async fn start_server(&mut self, addr: &str) -> Result<(), MetricsError> {
661 if self.shutdown_tx.is_some() {
662 return Err(MetricsError::AlreadyRunning);
663 }
664
665 let addr: SocketAddr = addr
666 .parse()
667 .map_err(|e| MetricsError::ServerError(format!("invalid address: {e}")))?;
668
669 let listener = TcpListener::bind(addr)
670 .await
671 .map_err(|e| MetricsError::ServerError(e.to_string()))?;
672
673 let (shutdown_tx, shutdown_rx) = oneshot::channel();
674 self.shutdown_tx = Some(shutdown_tx);
675
676 let handle = self
677 .handle
678 .as_ref()
679 .ok_or_else(|| {
680 MetricsError::ServerError(
681 "Prometheus handle not configured -- MetricsManager was created without a recorder".into(),
682 )
683 })?
684 .clone();
685 let update_interval = self.config.update_interval;
686 let process_metrics = self.process_metrics.clone();
687 let container_metrics = self.container_metrics.clone();
688 let readiness_fn = self.readiness_fn.clone();
689 let started_flag = self.started_flag();
690
691 let registry = self.registry();
692
693 tokio::spawn(async move {
694 run_server(
695 listener,
696 handle,
697 registry,
698 shutdown_rx,
699 update_interval,
700 process_metrics,
701 container_metrics,
702 readiness_fn,
703 started_flag,
704 )
705 .await;
706 });
707
708 Ok(())
709 }
710
711 #[cfg(all(feature = "metrics", feature = "http-server"))]
729 pub async fn start_server_with_routes(
730 &mut self,
731 addr: &str,
732 extra_routes: axum::Router,
733 ) -> Result<(), MetricsError> {
734 if self.shutdown_tx.is_some() {
735 return Err(MetricsError::AlreadyRunning);
736 }
737
738 let addr: SocketAddr = addr
739 .parse()
740 .map_err(|e| MetricsError::ServerError(format!("invalid address: {e}")))?;
741
742 let listener = TcpListener::bind(addr)
743 .await
744 .map_err(|e| MetricsError::ServerError(e.to_string()))?;
745
746 let (shutdown_tx, shutdown_rx) = oneshot::channel();
747 self.shutdown_tx = Some(shutdown_tx);
748
749 let handle = self
750 .handle
751 .as_ref()
752 .ok_or_else(|| {
753 MetricsError::ServerError(
754 "Prometheus handle not configured -- MetricsManager was created without a recorder".into(),
755 )
756 })?
757 .clone();
758 let update_interval = self.config.update_interval;
759 let process_metrics = self.process_metrics.clone();
760 let container_metrics = self.container_metrics.clone();
761 let readiness_fn = self.readiness_fn.clone();
762
763 let metrics_handle = handle.clone();
765 let readiness_for_live = readiness_fn.clone();
766 let registry_handle = self.registry();
767
768 let mut app = axum::Router::new()
769 .route(
770 "/metrics/manifest",
771 axum::routing::get(move || {
772 let reg = registry_handle.clone();
773 async move {
774 (
775 [(axum::http::header::CONTENT_TYPE, "application/json")],
776 serde_json::to_string(®.manifest()).unwrap_or_default(),
777 )
778 }
779 }),
780 )
781 .route(
782 "/metrics",
783 axum::routing::get(move || {
784 let h = metrics_handle.clone();
785 async move { h.render() }
786 }),
787 )
788 .route("/startupz", {
789 let sf = self.started_flag();
790 axum::routing::get(move || {
791 let started = sf.load(std::sync::atomic::Ordering::Acquire);
792 async move {
793 if started {
794 (
795 axum::http::StatusCode::OK,
796 [(axum::http::header::CONTENT_TYPE, "application/json")],
797 r#"{"status":"started"}"#,
798 )
799 } else {
800 (
801 axum::http::StatusCode::SERVICE_UNAVAILABLE,
802 [(axum::http::header::CONTENT_TYPE, "application/json")],
803 r#"{"status":"starting"}"#,
804 )
805 }
806 }
807 })
808 })
809 .route(
810 "/healthz",
811 axum::routing::get(|| async {
812 (
813 [(axum::http::header::CONTENT_TYPE, "application/json")],
814 r#"{"status":"alive"}"#,
815 )
816 }),
817 )
818 .route(
819 "/health/live",
820 axum::routing::get(|| async {
821 (
822 [(axum::http::header::CONTENT_TYPE, "application/json")],
823 r#"{"status":"alive"}"#,
824 )
825 }),
826 )
827 .route(
828 "/readyz",
829 axum::routing::get(move || {
830 let rf = readiness_fn.clone();
831 async move { readiness_response(rf) }
832 }),
833 )
834 .route(
835 "/health/ready",
836 axum::routing::get(move || {
837 let rf = readiness_for_live.clone();
838 async move { readiness_response(rf) }
839 }),
840 );
841
842 #[cfg(feature = "scaling")]
844 if let Some(ref sp) = self.scaling_pressure {
845 let sp = sp.clone();
846 app = app.route(
847 "/scaling/pressure",
848 axum::routing::get(move || {
849 let s = sp.clone();
850 async move { format!("{:.2}", s.calculate()) }
851 }),
852 );
853 }
854
855 #[cfg(feature = "memory")]
857 if let Some(ref mg) = self.memory_guard {
858 let mg = mg.clone();
859 app = app.route(
860 "/memory/pressure",
861 axum::routing::get(move || {
862 let m = mg.clone();
863 async move {
864 (
865 [(axum::http::header::CONTENT_TYPE, "application/json")],
866 format!(
867 r#"{{"under_pressure":{},"ratio":{:.3},"current_bytes":{},"limit_bytes":{}}}"#,
868 m.under_pressure(),
869 m.pressure_ratio(),
870 m.current_bytes(),
871 m.limit_bytes()
872 ),
873 )
874 }
875 }),
876 );
877 }
878
879 app = app.merge(extra_routes);
881
882 tokio::spawn(async move {
883 run_axum_server(
884 listener,
885 app,
886 shutdown_rx,
887 update_interval,
888 process_metrics,
889 container_metrics,
890 )
891 .await;
892 });
893
894 Ok(())
895 }
896
897 pub async fn stop_server(&mut self) -> Result<(), MetricsError> {
903 if let Some(tx) = self.shutdown_tx.take() {
904 let _ = tx.send(());
905 Ok(())
906 } else {
907 Err(MetricsError::NotRunning)
908 }
909 }
910
911 #[cfg(feature = "otel-metrics")]
915 pub fn shutdown_otel(&mut self) {
916 if let Some(provider) = self.otel_provider.take()
917 && let Err(e) = provider.shutdown()
918 {
919 tracing::warn!(error = %e, "OTel provider shutdown error");
920 }
921 }
922
923 pub fn set_build_info(&self, version: &str, commit: &str) {
929 self.registry.set_build_info(version, commit);
930 }
931
932 pub fn set_use_cases(&self, metric_name: &str, use_cases: &[&str]) {
936 self.registry.set_use_cases(metric_name, use_cases);
937 }
938
939 pub fn set_dashboard_hint(&self, metric_name: &str, hint: &str) {
943 self.registry.set_dashboard_hint(metric_name, hint);
944 }
945
946 #[must_use]
951 pub fn registry(&self) -> MetricRegistry {
952 self.registry.clone()
953 }
954
955 #[must_use]
959 pub fn namespace(&self) -> &str {
960 &self.config.namespace
961 }
962
963 fn prefixed_key(&self, name: &str) -> String {
965 if self.config.namespace.is_empty() {
966 name.to_string()
967 } else {
968 format!("{}_{}", self.config.namespace, name)
969 }
970 }
971}
972
/// Accept loop of the built-in HTTP server.
///
/// Runs until the shutdown signal resolves. On every interval tick the
/// enabled collectors are refreshed; every accepted connection is handled in
/// its own spawned task. Accept errors are ignored and the loop continues.
#[cfg(feature = "metrics")]
#[allow(clippy::too_many_arguments)]
async fn run_server(
    listener: TcpListener,
    handle: PrometheusHandle,
    registry: MetricRegistry,
    mut shutdown_rx: oneshot::Receiver<()>,
    update_interval: Duration,
    process_metrics: Option<ProcessMetrics>,
    container_metrics: Option<ContainerMetrics>,
    readiness_fn: Option<ReadinessFn>,
    started_flag: Arc<std::sync::atomic::AtomicBool>,
) {
    let mut ticker = tokio::time::interval(update_interval);

    loop {
        tokio::select! {
            _ = &mut shutdown_rx => {
                break;
            }
            _ = ticker.tick() => {
                if let Some(process) = process_metrics.as_ref() {
                    process.update();
                }
                if let Some(container) = container_metrics.as_ref() {
                    container.update();
                }
            }
            accepted = listener.accept() => {
                if let Ok((socket, _peer)) = accepted {
                    // Each connection task gets its own copies of the shared state.
                    let conn_handle = handle.clone();
                    let conn_registry = registry.clone();
                    let conn_readiness = readiness_fn.clone();
                    let conn_started = Arc::clone(&started_flag);
                    tokio::spawn(async move {
                        handle_connection(
                            socket,
                            conn_handle,
                            conn_registry,
                            conn_readiness,
                            &conn_started,
                        )
                        .await;
                    });
                }
            }
        }
    }
}
1016
/// Serves a single HTTP request on `stream` and then lets the connection close.
///
/// Only the request line is read. Routing is by prefix of that line:
///
/// * `GET /metrics/manifest` — JSON manifest from the registry
/// * `GET /metrics` — Prometheus text rendering
/// * `GET /startupz`, `GET /health/startup` — 200 once `started_flag` is set, else 503
/// * `GET /healthz`, `GET /health/live` — always 200
/// * `GET /readyz`, `GET /health/ready` — 200 when the readiness callback
///   (if any) and, with the `health` feature, the health registry both report ready
/// * anything else — 404
///
/// The response carries `Connection: close` because exactly one request is
/// answered per connection. Read and write errors are ignored (best effort).
#[cfg(feature = "metrics")]
async fn handle_connection(
    mut stream: tokio::net::TcpStream,
    handle: PrometheusHandle,
    registry: MetricRegistry,
    readiness_fn: Option<ReadinessFn>,
    started_flag: &std::sync::atomic::AtomicBool,
) {
    use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader};

    let mut reader = BufReader::new(&mut stream);
    let mut request_line = String::new();

    if reader.read_line(&mut request_line).await.is_err() {
        return;
    }

    // The manifest path must be tested before the shorter `/metrics` prefix.
    let (status, content_type, body) = if request_line.starts_with("GET /metrics/manifest") {
        (
            "200 OK",
            "application/json",
            // Best effort: a serialization failure yields an empty body.
            serde_json::to_string(&registry.manifest()).unwrap_or_default(),
        )
    } else if request_line.starts_with("GET /metrics") {
        ("200 OK", "text/plain; charset=utf-8", handle.render())
    } else if request_line.starts_with("GET /startupz")
        || request_line.starts_with("GET /health/startup")
    {
        if started_flag.load(std::sync::atomic::Ordering::Acquire) {
            (
                "200 OK",
                "application/json",
                r#"{"status":"started"}"#.to_string(),
            )
        } else {
            (
                "503 Service Unavailable",
                "application/json",
                r#"{"status":"starting"}"#.to_string(),
            )
        }
    } else if request_line.starts_with("GET /healthz")
        || request_line.starts_with("GET /health/live")
    {
        (
            "200 OK",
            "application/json",
            r#"{"status":"alive"}"#.to_string(),
        )
    } else if request_line.starts_with("GET /readyz")
        || request_line.starts_with("GET /health/ready")
    {
        // No callback configured counts as ready.
        let callback_ready = readiness_fn.as_ref().is_none_or(|f| f());

        #[cfg(feature = "health")]
        let registry_ready = crate::health::HealthRegistry::is_ready();
        #[cfg(not(feature = "health"))]
        let registry_ready = true;

        let ready = callback_ready && registry_ready;
        if ready {
            (
                "200 OK",
                "application/json",
                r#"{"status":"ready"}"#.to_string(),
            )
        } else {
            (
                "503 Service Unavailable",
                "application/json",
                r#"{"status":"not_ready"}"#.to_string(),
            )
        }
    } else {
        (
            "404 Not Found",
            "text/plain; charset=utf-8",
            "Not Found".to_string(),
        )
    };

    // `Connection: close` tells HTTP/1.1 clients not to reuse this socket:
    // the stream is dropped right after the response is written.
    let response = format!(
        "HTTP/1.1 {status}\r\nContent-Type: {content_type}\r\nContent-Length: {}\r\nConnection: close\r\n\r\n{body}",
        body.len()
    );

    let _ = stream.write_all(response.as_bytes()).await;
}
1110
/// Builds the JSON readiness response for the axum routes.
///
/// Ready means the optional callback returns `true` (a missing callback
/// counts as ready) and, with the `health` feature, the health registry
/// reports ready. Ready yields 200 `{"status":"ready"}`; otherwise 503
/// `{"status":"not_ready"}`.
#[cfg(all(feature = "metrics", feature = "http-server"))]
fn readiness_response(rf: Option<ReadinessFn>) -> axum::response::Response {
    use axum::response::IntoResponse;

    let callback_ready = match rf.as_ref() {
        Some(check) => check(),
        None => true,
    };

    #[cfg(feature = "health")]
    let registry_ready = crate::health::HealthRegistry::is_ready();
    #[cfg(not(feature = "health"))]
    let registry_ready = true;

    let (status, body) = if callback_ready && registry_ready {
        (axum::http::StatusCode::OK, r#"{"status":"ready"}"#)
    } else {
        (
            axum::http::StatusCode::SERVICE_UNAVAILABLE,
            r#"{"status":"not_ready"}"#,
        )
    };

    (
        status,
        [(axum::http::header::CONTENT_TYPE, "application/json")],
        body,
    )
        .into_response()
}
1143
/// Serves `app` on `listener` until the shutdown signal resolves.
///
/// A separate task refreshes the enabled collectors every `update_interval`
/// and is told to stop once the server has finished. A server error is
/// logged, not returned.
#[cfg(all(feature = "metrics", feature = "http-server"))]
async fn run_axum_server(
    listener: TcpListener,
    app: axum::Router,
    shutdown_rx: oneshot::Receiver<()>,
    update_interval: Duration,
    process_metrics: Option<ProcessMetrics>,
    container_metrics: Option<ContainerMetrics>,
) {
    let mut ticker = tokio::time::interval(update_interval);
    let (stop_updates_tx, mut stop_updates_rx) = oneshot::channel::<()>();

    // Background refresher for the collectors.
    tokio::spawn(async move {
        loop {
            tokio::select! {
                _ = &mut stop_updates_rx => break,
                _ = ticker.tick() => {
                    if let Some(process) = process_metrics.as_ref() {
                        process.update();
                    }
                    if let Some(container) = container_metrics.as_ref() {
                        container.update();
                    }
                }
            }
        }
    });

    let served = axum::serve(listener, app)
        .with_graceful_shutdown(async move {
            // Sender dropped or signalled: either way, begin shutdown.
            let _ = shutdown_rx.await;
        })
        .await;

    if let Err(err) = served {
        tracing::error!(error = %err, "Metrics axum server error");
    }

    let _ = stop_updates_tx.send(());
}
1184
/// Returns twelve ascending histogram bucket bounds from `0.001` to `10.0`,
/// intended for latency histograms.
#[must_use]
pub fn latency_buckets() -> Vec<f64> {
    const BOUNDS: [f64; 12] = [
        0.001, 0.005, 0.01, 0.025, 0.05, 0.1, 0.25, 0.5, 1.0, 2.5, 5.0, 10.0,
    ];
    BOUNDS.to_vec()
}
1192
/// Returns six ascending histogram bucket bounds, one per power of ten from
/// `100` to `10_000_000`, intended for size histograms.
#[must_use]
pub fn size_buckets() -> Vec<f64> {
    const BOUNDS: [f64; 6] = [
        100.0,
        1_000.0,
        10_000.0,
        100_000.0,
        1_000_000.0,
        10_000_000.0,
    ];
    BOUNDS.to_vec()
}
1205
#[cfg(test)]
mod tests {
    use super::*;

    /// The default config has an empty namespace, both collectors enabled and
    /// a 15-second refresh period.
    #[test]
    fn test_metrics_config_default() {
        let MetricsConfig {
            namespace,
            enable_process_metrics,
            enable_container_metrics,
            update_interval,
            ..
        } = MetricsConfig::default();

        assert!(namespace.is_empty());
        assert!(enable_process_metrics);
        assert!(enable_container_metrics);
        assert_eq!(update_interval, Duration::from_secs(15));
    }

    /// Latency buckets: twelve entries, first smaller than last.
    #[test]
    fn test_latency_buckets() {
        let bounds = latency_buckets();
        assert_eq!(bounds.len(), 12);
        let (lowest, highest) = (bounds[0], bounds[11]);
        assert!(lowest < highest);
    }

    /// Size buckets: six entries, first smaller than last.
    #[test]
    fn test_size_buckets() {
        let bounds = size_buckets();
        assert_eq!(bounds.len(), 6);
        let (lowest, highest) = (bounds[0], bounds[5]);
        assert!(lowest < highest);
    }
}