1mod container;
77pub mod dfe;
78pub mod labels;
79pub mod manifest;
80mod process;
81
82pub use labels::{AuthFailureReason, FlushTrigger, TransportKind, ValidationFailureReason};
83
84#[cfg(feature = "otel-metrics")]
85pub(crate) mod otel;
86#[cfg(feature = "otel-metrics")]
87pub mod otel_types;
88
89use std::net::SocketAddr;
90use std::sync::Arc;
91use std::time::Duration;
92
93use metrics::{Counter, Gauge, Histogram, Unit};
94use thiserror::Error;
95use tokio::net::TcpListener;
96use tokio::sync::oneshot;
97
/// Shared readiness predicate consulted by the `/readyz` and `/health/ready`
/// endpoints; returning `false` makes them answer `503 Service Unavailable`.
pub type ReadinessFn = Arc<dyn Fn() -> bool + Send + Sync>;
100
101#[cfg(feature = "metrics")]
102use metrics_exporter_prometheus::PrometheusHandle;
103
104pub use container::ContainerMetrics;
105pub use dfe::DfeMetrics;
106#[cfg(feature = "metrics-dfe")]
107pub mod dfe_groups;
108pub use manifest::{ManifestResponse, MetricDescriptor, MetricRegistry, MetricType};
109pub use process::ProcessMetrics;
110
111#[cfg(feature = "otel-metrics")]
112pub use otel_types::{OtelMetricsConfig, OtelProtocol};
113
/// Cheaply cloneable handle for rendering the Prometheus text exposition.
///
/// Obtained from [`MetricsManager::render_handle`].
#[cfg(feature = "metrics")]
#[derive(Clone)]
pub struct RenderHandle(PrometheusHandle);

#[cfg(feature = "metrics")]
impl RenderHandle {
    /// Renders the current state of all metrics in Prometheus text format.
    #[must_use]
    pub fn render(&self) -> String {
        let Self(inner) = self;
        inner.render()
    }
}
130
/// Errors returned by [`MetricsManager`]'s exporter and server lifecycle methods.
#[derive(Debug, Error)]
pub enum MetricsError {
    /// Building a metrics exporter failed.
    /// NOTE(review): not constructed in this module; presumably used elsewhere.
    #[error("failed to build metrics exporter: {0}")]
    BuildError(String),

    /// The metrics HTTP server could not be started: invalid address, bind
    /// failure, or no Prometheus handle available.
    #[error("failed to start metrics server: {0}")]
    ServerError(String),

    /// A server was already started and has not been stopped.
    #[error("metrics server already running")]
    AlreadyRunning,

    /// `stop_server` was called while no server was running.
    #[error("metrics server not running")]
    NotRunning,
}
150
/// Settings used by [`MetricsManager::with_config`].
#[derive(Debug, Clone)]
pub struct MetricsConfig {
    /// Prefix joined to every metric name with `_`; an empty string means no prefix.
    pub namespace: String,
    /// Whether process metrics are collected.
    pub enable_process_metrics: bool,
    /// Whether container metrics are collected.
    pub enable_container_metrics: bool,
    /// How often the built-in servers refresh process/container metrics.
    pub update_interval: Duration,
    /// OpenTelemetry exporter settings.
    #[cfg(feature = "otel-metrics")]
    pub otel: OtelMetricsConfig,
}

impl Default for MetricsConfig {
    /// Empty namespace, process and container metrics enabled, 15-second refresh.
    fn default() -> Self {
        let update_interval = Duration::from_secs(15);
        Self {
            update_interval,
            enable_container_metrics: true,
            enable_process_metrics: true,
            namespace: String::default(),
            #[cfg(feature = "otel-metrics")]
            otel: OtelMetricsConfig::default(),
        }
    }
}
179
/// Handles produced by [`install_recorders`] that [`MetricsManager`] keeps.
struct RecorderSetup {
    /// Prometheus handle used to render `/metrics`.
    #[cfg(feature = "metrics")]
    prom_handle: Option<PrometheusHandle>,
    /// OpenTelemetry meter provider, kept so `shutdown_otel` can shut it down.
    /// `None` when the OTel recorder could not be built.
    #[cfg(feature = "otel-metrics")]
    otel_provider: Option<opentelemetry_sdk::metrics::SdkMeterProvider>,
}
187
188#[allow(unused_variables)]
192fn install_recorders(config: &MetricsConfig) -> RecorderSetup {
193 #[cfg(all(feature = "metrics", not(feature = "otel-metrics")))]
195 {
196 let recorder = metrics_exporter_prometheus::PrometheusBuilder::new().build_recorder();
197 let handle = recorder.handle();
198 if let Err(e) = metrics::set_global_recorder(recorder) {
199 tracing::warn!(error = %e, "global metrics recorder already installed; keeping existing");
200 }
201 RecorderSetup {
202 prom_handle: Some(handle),
203 }
204 }
205
206 #[cfg(all(feature = "otel-metrics", not(feature = "metrics")))]
208 {
209 match otel::build_otel_recorder(&config.namespace, &config.otel) {
210 Ok((otel_recorder, provider)) => {
211 opentelemetry::global::set_meter_provider(provider.clone());
212 if let Err(e) = metrics::set_global_recorder(otel_recorder) {
213 tracing::warn!(error = %e, "global metrics recorder already installed; keeping existing");
214 }
215 RecorderSetup {
216 otel_provider: Some(provider),
217 }
218 }
219 Err(e) => {
220 tracing::warn!(error = %e, "Failed to build OTel metrics recorder");
221 RecorderSetup {
222 otel_provider: None,
223 }
224 }
225 }
226 }
227
228 #[cfg(all(feature = "metrics", feature = "otel-metrics"))]
230 {
231 let prom_recorder = metrics_exporter_prometheus::PrometheusBuilder::new().build_recorder();
233 let prom_handle = prom_recorder.handle();
234
235 match otel::build_otel_recorder(&config.namespace, &config.otel) {
237 Ok((otel_recorder, provider)) => {
238 opentelemetry::global::set_meter_provider(provider.clone());
239
240 let fanout = metrics_util::layers::FanoutBuilder::default()
242 .add_recorder(prom_recorder)
243 .add_recorder(otel_recorder)
244 .build();
245
246 if let Err(e) = metrics::set_global_recorder(fanout) {
247 tracing::warn!(error = %e, "global metrics recorder already installed; keeping existing");
248 }
249
250 RecorderSetup {
251 prom_handle: Some(prom_handle),
252 otel_provider: Some(provider),
253 }
254 }
255 Err(e) => {
256 tracing::warn!(error = %e, "Failed to build OTel recorder, falling back to Prometheus only");
258 if let Err(e) = metrics::set_global_recorder(prom_recorder) {
259 tracing::warn!(error = %e, "global metrics recorder already installed; keeping existing");
260 }
261 RecorderSetup {
262 prom_handle: Some(prom_handle),
263 otel_provider: None,
264 }
265 }
266 }
267 }
268}
269
/// Owns the metrics recorder handles, the metric manifest registry, and the
/// optional built-in HTTP server exposing metrics and health endpoints.
pub struct MetricsManager {
    /// Prometheus handle used for rendering; `None` when no recorder was set up
    /// (e.g. `new_for_test`).
    #[cfg(feature = "metrics")]
    handle: Option<PrometheusHandle>,
    config: MetricsConfig,
    /// `Some` while a server started by this manager is running; sending on it
    /// (or dropping it) stops that server.
    shutdown_tx: Option<oneshot::Sender<()>>,
    process_metrics: Option<ProcessMetrics>,
    container_metrics: Option<ContainerMetrics>,
    /// Optional readiness callback, captured by a server when it starts.
    readiness_fn: Option<ReadinessFn>,
    /// Set by `mark_started`; drives the startup probe endpoints.
    started: Arc<std::sync::atomic::AtomicBool>,
    /// Descriptors of registered metrics, served as `/metrics/manifest`.
    registry: MetricRegistry,
    #[cfg(all(feature = "metrics", feature = "scaling"))]
    scaling_pressure: Option<Arc<crate::scaling::ScalingPressure>>,
    #[cfg(all(feature = "metrics", feature = "memory"))]
    memory_guard: Option<Arc<crate::memory::MemoryGuard>>,
    /// Kept so `shutdown_otel` can shut the provider down.
    #[cfg(feature = "otel-metrics")]
    otel_provider: Option<opentelemetry_sdk::metrics::SdkMeterProvider>,
}
288
289impl MetricsManager {
290 #[must_use]
292 pub fn new(namespace: &str) -> Self {
293 Self::with_config(MetricsConfig {
294 namespace: namespace.to_string(),
295 ..Default::default()
296 })
297 }
298
299 #[cfg(test)]
307 pub(crate) fn new_for_test(namespace: &str) -> Self {
308 let config = MetricsConfig {
309 namespace: namespace.to_string(),
310 enable_process_metrics: false,
311 enable_container_metrics: false,
312 ..Default::default()
313 };
314
315 let registry = MetricRegistry::new(&config.namespace);
316
317 Self {
318 #[cfg(feature = "metrics")]
319 handle: None,
320 registry,
321 config,
322 shutdown_tx: None,
323 process_metrics: None,
324 container_metrics: None,
325 readiness_fn: None,
326 started: Arc::new(std::sync::atomic::AtomicBool::new(false)),
327 #[cfg(all(feature = "metrics", feature = "scaling"))]
328 scaling_pressure: None,
329 #[cfg(all(feature = "metrics", feature = "memory"))]
330 memory_guard: None,
331 #[cfg(feature = "otel-metrics")]
332 otel_provider: None,
333 }
334 }
335
336 #[must_use]
343 pub fn with_config(config: MetricsConfig) -> Self {
344 let setup = install_recorders(&config);
345
346 let process_metrics = if config.enable_process_metrics {
347 Some(ProcessMetrics::new(&config.namespace))
348 } else {
349 None
350 };
351
352 let container_metrics = if config.enable_container_metrics {
353 Some(ContainerMetrics::new(&config.namespace))
354 } else {
355 None
356 };
357
358 let registry = MetricRegistry::new(&config.namespace);
359
360 Self {
361 #[cfg(feature = "metrics")]
362 handle: setup.prom_handle,
363 registry,
364 config,
365 shutdown_tx: None,
366 process_metrics,
367 container_metrics,
368 readiness_fn: None,
369 started: Arc::new(std::sync::atomic::AtomicBool::new(false)),
370 #[cfg(all(feature = "metrics", feature = "scaling"))]
371 scaling_pressure: None,
372 #[cfg(all(feature = "metrics", feature = "memory"))]
373 memory_guard: None,
374 #[cfg(feature = "otel-metrics")]
375 otel_provider: setup.otel_provider,
376 }
377 }
378
379 #[must_use]
384 pub fn counter(&self, name: &str, description: &str) -> Counter {
385 let key = self.prefixed_key(name);
386 let desc = description.to_string();
387 metrics::describe_counter!(key.clone(), desc.clone());
388 self.registry.push(MetricDescriptor {
389 name: key.clone(),
390 metric_type: MetricType::Counter,
391 description: desc,
392 unit: String::new(),
393 labels: vec![],
394 group: "custom".into(),
395 buckets: None,
396 use_cases: vec![],
397 dashboard_hint: None,
398 });
399 metrics::counter!(key)
400 }
401
402 #[must_use]
408 pub fn counter_with_labels(
409 &self,
410 name: &str,
411 description: &str,
412 labels: &[&str],
413 group: &str,
414 ) -> Counter {
415 let key = self.prefixed_key(name);
416 let desc = description.to_string();
417 metrics::describe_counter!(key.clone(), desc.clone());
418 self.registry.push(MetricDescriptor {
419 name: key.clone(),
420 metric_type: MetricType::Counter,
421 description: desc,
422 unit: String::new(),
423 labels: labels.iter().map(|s| (*s).to_string()).collect(),
424 group: group.into(),
425 buckets: None,
426 use_cases: vec![],
427 dashboard_hint: None,
428 });
429 metrics::counter!(key)
430 }
431
432 #[must_use]
437 pub fn gauge(&self, name: &str, description: &str) -> Gauge {
438 let key = self.prefixed_key(name);
439 let desc = description.to_string();
440 metrics::describe_gauge!(key.clone(), desc.clone());
441 self.registry.push(MetricDescriptor {
442 name: key.clone(),
443 metric_type: MetricType::Gauge,
444 description: desc,
445 unit: String::new(),
446 labels: vec![],
447 group: "custom".into(),
448 buckets: None,
449 use_cases: vec![],
450 dashboard_hint: None,
451 });
452 metrics::gauge!(key)
453 }
454
455 #[must_use]
457 pub fn gauge_with_labels(
458 &self,
459 name: &str,
460 description: &str,
461 labels: &[&str],
462 group: &str,
463 ) -> Gauge {
464 let key = self.prefixed_key(name);
465 let desc = description.to_string();
466 metrics::describe_gauge!(key.clone(), desc.clone());
467 self.registry.push(MetricDescriptor {
468 name: key.clone(),
469 metric_type: MetricType::Gauge,
470 description: desc,
471 unit: String::new(),
472 labels: labels.iter().map(|s| (*s).to_string()).collect(),
473 group: group.into(),
474 buckets: None,
475 use_cases: vec![],
476 dashboard_hint: None,
477 });
478 metrics::gauge!(key)
479 }
480
481 #[must_use]
486 pub fn histogram(&self, name: &str, description: &str) -> Histogram {
487 let key = self.prefixed_key(name);
488 let desc = description.to_string();
489 metrics::describe_histogram!(key.clone(), Unit::Seconds, desc.clone());
490 self.registry.push(MetricDescriptor {
491 name: key.clone(),
492 metric_type: MetricType::Histogram,
493 description: desc,
494 unit: "seconds".into(),
495 labels: vec![],
496 group: "custom".into(),
497 buckets: None,
498 use_cases: vec![],
499 dashboard_hint: None,
500 });
501 metrics::histogram!(key)
502 }
503
504 #[must_use]
506 pub fn histogram_with_labels(
507 &self,
508 name: &str,
509 description: &str,
510 labels: &[&str],
511 group: &str,
512 buckets: Option<&[f64]>,
513 ) -> Histogram {
514 let key = self.prefixed_key(name);
515 let desc = description.to_string();
516 metrics::describe_histogram!(key.clone(), Unit::Seconds, desc.clone());
517 self.registry.push(MetricDescriptor {
518 name: key.clone(),
519 metric_type: MetricType::Histogram,
520 description: desc,
521 unit: "seconds".into(),
522 labels: labels.iter().map(|s| (*s).to_string()).collect(),
523 group: group.into(),
524 buckets: buckets.map(|b| b.to_vec()),
525 use_cases: vec![],
526 dashboard_hint: None,
527 });
528 metrics::histogram!(key)
529 }
530
531 #[must_use]
537 pub fn histogram_with_buckets(
538 &self,
539 name: &str,
540 description: &str,
541 buckets: &[f64],
542 ) -> Histogram {
543 let key = self.prefixed_key(name);
544 let desc = description.to_string();
545 metrics::describe_histogram!(key.clone(), Unit::Seconds, desc.clone());
546 self.registry.push(MetricDescriptor {
547 name: key.clone(),
548 metric_type: MetricType::Histogram,
549 description: desc,
550 unit: "seconds".into(),
551 labels: vec![],
552 group: "custom".into(),
553 buckets: Some(buckets.to_vec()),
554 use_cases: vec![],
555 dashboard_hint: None,
556 });
557 metrics::histogram!(key)
558 }
559
560 #[cfg(feature = "metrics")]
565 #[must_use]
566 pub fn render(&self) -> String {
567 self.handle
568 .as_ref()
569 .map_or_else(String::new, PrometheusHandle::render)
570 }
571
572 #[cfg(feature = "metrics")]
592 #[must_use]
593 pub fn render_handle(&self) -> Option<RenderHandle> {
594 self.handle.clone().map(RenderHandle)
595 }
596
597 pub fn set_readiness_check(&mut self, f: impl Fn() -> bool + Send + Sync + 'static) {
603 self.readiness_fn = Some(Arc::new(f));
604 }
605
606 pub fn mark_started(&self) {
612 self.started
613 .store(true, std::sync::atomic::Ordering::Release);
614 }
615
616 pub(crate) fn started_flag(&self) -> Arc<std::sync::atomic::AtomicBool> {
618 Arc::clone(&self.started)
619 }
620
621 #[cfg(all(feature = "metrics", feature = "scaling"))]
626 pub fn set_scaling_pressure(&mut self, sp: Arc<crate::scaling::ScalingPressure>) {
627 self.scaling_pressure = Some(sp);
628 }
629
630 #[cfg(all(feature = "metrics", feature = "memory"))]
635 pub fn set_memory_guard(&mut self, mg: Arc<crate::memory::MemoryGuard>) {
636 self.memory_guard = Some(mg);
637 }
638
639 pub fn update(&self) {
641 if let Some(ref pm) = self.process_metrics {
642 pm.update();
643 }
644 if let Some(ref cm) = self.container_metrics {
645 cm.update();
646 }
647 }
648
649 #[cfg(feature = "metrics")]
660 pub async fn start_server(&mut self, addr: &str) -> Result<(), MetricsError> {
661 if self.shutdown_tx.is_some() {
662 return Err(MetricsError::AlreadyRunning);
663 }
664
665 let addr: SocketAddr = addr
666 .parse()
667 .map_err(|e| MetricsError::ServerError(format!("invalid address: {e}")))?;
668
669 let listener = TcpListener::bind(addr)
670 .await
671 .map_err(|e| MetricsError::ServerError(e.to_string()))?;
672
673 let (shutdown_tx, shutdown_rx) = oneshot::channel();
674 self.shutdown_tx = Some(shutdown_tx);
675
676 let handle = self
677 .handle
678 .as_ref()
679 .ok_or_else(|| {
680 MetricsError::ServerError(
681 "Prometheus handle not configured -- MetricsManager was created without a recorder".into(),
682 )
683 })?
684 .clone();
685 let update_interval = self.config.update_interval;
686 let process_metrics = self.process_metrics.clone();
687 let container_metrics = self.container_metrics.clone();
688 let readiness_fn = self.readiness_fn.clone();
689 let started_flag = self.started_flag();
690
691 let registry = self.registry();
692
693 tokio::spawn(async move {
694 run_server(
695 listener,
696 handle,
697 registry,
698 shutdown_rx,
699 update_interval,
700 process_metrics,
701 container_metrics,
702 readiness_fn,
703 started_flag,
704 )
705 .await;
706 });
707
708 Ok(())
709 }
710
711 #[cfg(all(feature = "metrics", feature = "http-server"))]
729 pub async fn start_server_with_routes(
730 &mut self,
731 addr: &str,
732 extra_routes: axum::Router,
733 ) -> Result<(), MetricsError> {
734 if self.shutdown_tx.is_some() {
735 return Err(MetricsError::AlreadyRunning);
736 }
737
738 let addr: SocketAddr = addr
739 .parse()
740 .map_err(|e| MetricsError::ServerError(format!("invalid address: {e}")))?;
741
742 let listener = TcpListener::bind(addr)
743 .await
744 .map_err(|e| MetricsError::ServerError(e.to_string()))?;
745
746 let (shutdown_tx, shutdown_rx) = oneshot::channel();
747 self.shutdown_tx = Some(shutdown_tx);
748
749 let handle = self
750 .handle
751 .as_ref()
752 .ok_or_else(|| {
753 MetricsError::ServerError(
754 "Prometheus handle not configured -- MetricsManager was created without a recorder".into(),
755 )
756 })?
757 .clone();
758 let update_interval = self.config.update_interval;
759 let process_metrics = self.process_metrics.clone();
760 let container_metrics = self.container_metrics.clone();
761 let readiness_fn = self.readiness_fn.clone();
762
763 let metrics_handle = handle.clone();
765 let readiness_for_live = readiness_fn.clone();
766 let registry_handle = self.registry();
767
768 let mut app = axum::Router::new()
769 .route(
770 "/metrics/manifest",
771 axum::routing::get(move || {
772 let reg = registry_handle.clone();
773 async move {
774 (
775 [(axum::http::header::CONTENT_TYPE, "application/json")],
776 serde_json::to_string(®.manifest()).unwrap_or_default(),
777 )
778 }
779 }),
780 )
781 .route(
782 "/metrics",
783 axum::routing::get(move || {
784 let h = metrics_handle.clone();
785 async move { h.render() }
786 }),
787 )
788 .route("/startupz", {
789 let sf = self.started_flag();
790 axum::routing::get(move || {
791 let started = sf.load(std::sync::atomic::Ordering::Acquire);
792 async move {
793 if started {
794 (
795 axum::http::StatusCode::OK,
796 [(axum::http::header::CONTENT_TYPE, "application/json")],
797 r#"{"status":"started"}"#,
798 )
799 } else {
800 (
801 axum::http::StatusCode::SERVICE_UNAVAILABLE,
802 [(axum::http::header::CONTENT_TYPE, "application/json")],
803 r#"{"status":"starting"}"#,
804 )
805 }
806 }
807 })
808 })
809 .route(
810 "/healthz",
811 axum::routing::get(|| async {
812 (
813 [(axum::http::header::CONTENT_TYPE, "application/json")],
814 r#"{"status":"alive"}"#,
815 )
816 }),
817 )
818 .route(
819 "/health/live",
820 axum::routing::get(|| async {
821 (
822 [(axum::http::header::CONTENT_TYPE, "application/json")],
823 r#"{"status":"alive"}"#,
824 )
825 }),
826 )
827 .route(
828 "/readyz",
829 axum::routing::get(move || {
830 let rf = readiness_fn.clone();
831 async move { readiness_response(rf) }
832 }),
833 )
834 .route(
835 "/health/ready",
836 axum::routing::get(move || {
837 let rf = readiness_for_live.clone();
838 async move { readiness_response(rf) }
839 }),
840 );
841
842 #[cfg(feature = "scaling")]
844 if let Some(ref sp) = self.scaling_pressure {
845 let sp = sp.clone();
846 app = app.route(
847 "/scaling/pressure",
848 axum::routing::get(move || {
849 let s = sp.clone();
850 async move { format!("{:.2}", s.calculate()) }
851 }),
852 );
853 }
854
855 #[cfg(feature = "memory")]
857 if let Some(ref mg) = self.memory_guard {
858 let mg = mg.clone();
859 app = app.route(
860 "/memory/pressure",
861 axum::routing::get(move || {
862 let m = mg.clone();
863 async move {
864 (
865 [(axum::http::header::CONTENT_TYPE, "application/json")],
866 format!(
867 r#"{{"under_pressure":{},"ratio":{:.3},"current_bytes":{},"limit_bytes":{}}}"#,
868 m.under_pressure(),
869 m.pressure_ratio(),
870 m.current_bytes(),
871 m.limit_bytes()
872 ),
873 )
874 }
875 }),
876 );
877 }
878
879 app = app.merge(extra_routes);
881
882 tokio::spawn(async move {
883 run_axum_server(
884 listener,
885 app,
886 shutdown_rx,
887 update_interval,
888 process_metrics,
889 container_metrics,
890 )
891 .await;
892 });
893
894 Ok(())
895 }
896
897 pub async fn stop_server(&mut self) -> Result<(), MetricsError> {
903 if let Some(tx) = self.shutdown_tx.take() {
904 let _ = tx.send(());
905 Ok(())
906 } else {
907 Err(MetricsError::NotRunning)
908 }
909 }
910
911 #[cfg(feature = "otel-metrics")]
915 pub fn shutdown_otel(&mut self) {
916 if let Some(provider) = self.otel_provider.take()
917 && let Err(e) = provider.shutdown()
918 {
919 tracing::warn!(error = %e, "OTel provider shutdown error");
920 }
921 }
922
923 pub fn set_build_info(&self, version: &str, commit: &str) {
929 self.registry.set_build_info(version, commit);
930 }
931
932 pub fn set_use_cases(&self, metric_name: &str, use_cases: &[&str]) {
936 self.registry.set_use_cases(metric_name, use_cases);
937 }
938
939 pub fn set_dashboard_hint(&self, metric_name: &str, hint: &str) {
943 self.registry.set_dashboard_hint(metric_name, hint);
944 }
945
946 #[must_use]
951 pub fn registry(&self) -> MetricRegistry {
952 self.registry.clone()
953 }
954
955 #[must_use]
960 pub fn namespace(&self) -> &str {
961 &self.config.namespace
962 }
963
964 fn prefixed_key(&self, name: &str) -> String {
966 if self.config.namespace.is_empty() {
967 name.to_string()
968 } else {
969 format!("{}_{}", self.config.namespace, name)
970 }
971 }
972}
973
/// Event loop of the minimal built-in HTTP server started by
/// [`MetricsManager::start_server`].
///
/// Runs until `shutdown_rx` resolves, either because a shutdown signal was sent
/// or because the sender was dropped. Process/container metrics are refreshed
/// every `update_interval`. Each accepted connection is served on its own task.
#[cfg(feature = "metrics")]
// Plain state hand-off from `start_server`; bundling it into a struct buys nothing.
#[allow(clippy::too_many_arguments)]
async fn run_server(
    listener: TcpListener,
    handle: PrometheusHandle,
    registry: MetricRegistry,
    mut shutdown_rx: oneshot::Receiver<()>,
    update_interval: Duration,
    process_metrics: Option<ProcessMetrics>,
    container_metrics: Option<ContainerMetrics>,
    readiness_fn: Option<ReadinessFn>,
    started_flag: Arc<std::sync::atomic::AtomicBool>,
) {
    /// Pause after a failed `accept` so a persistent error (e.g. file-descriptor
    /// exhaustion) does not turn this loop into a busy spin.
    const ACCEPT_ERROR_BACKOFF: Duration = Duration::from_millis(100);

    let mut ticker = tokio::time::interval(update_interval);

    loop {
        tokio::select! {
            _ = &mut shutdown_rx => {
                break;
            }
            _ = ticker.tick() => {
                if let Some(ref pm) = process_metrics {
                    pm.update();
                }
                if let Some(ref cm) = container_metrics {
                    cm.update();
                }
            }
            accepted = listener.accept() => {
                match accepted {
                    Ok((stream, _peer)) => {
                        let handle = handle.clone();
                        let registry = registry.clone();
                        let readiness_fn = readiness_fn.clone();
                        let sf = Arc::clone(&started_flag);
                        tokio::spawn(async move {
                            handle_connection(stream, handle, registry, readiness_fn, &sf).await;
                        });
                    }
                    Err(e) => {
                        // Previously swallowed silently.
                        tracing::warn!(error = %e, "metrics server failed to accept connection");
                        tokio::time::sleep(ACCEPT_ERROR_BACKOFF).await;
                    }
                }
            }
        }
    }
}
1017
/// Serves a single request on `stream` for the minimal built-in server, then
/// closes the connection.
///
/// Only the request line is read. The path is matched exactly, with any query
/// string ignored, consistent with the axum server's routing. For example,
/// `/metricsfoo` is no longer served as `/metrics`. Unknown routes get `404`.
#[cfg(feature = "metrics")]
async fn handle_connection(
    mut stream: tokio::net::TcpStream,
    handle: PrometheusHandle,
    registry: MetricRegistry,
    readiness_fn: Option<ReadinessFn>,
    started_flag: &std::sync::atomic::AtomicBool,
) {
    use tokio::io::{AsyncBufReadExt, AsyncReadExt, AsyncWriteExt, BufReader};

    /// Upper bound on the bytes buffered while reading the request line, so a
    /// client cannot make this task grow memory without limit.
    const MAX_REQUEST_LINE: u64 = 8 * 1024;
    /// How long a client may take to send its request line before being dropped.
    const READ_TIMEOUT: Duration = Duration::from_secs(10);

    let mut request_line = String::new();
    {
        let mut reader = BufReader::new(&mut stream).take(MAX_REQUEST_LINE);
        match tokio::time::timeout(READ_TIMEOUT, reader.read_line(&mut request_line)).await {
            Ok(Ok(_)) => {}
            // I/O error, non-UTF-8 input, or the client never finished the line.
            Ok(Err(_)) | Err(_) => return,
        }
    }

    // Request line: "<METHOD> <TARGET> <VERSION>".
    let mut parts = request_line.split_whitespace();
    let method = parts.next().unwrap_or("");
    let target = parts.next().unwrap_or("");
    let path = target.split_once('?').map_or(target, |(path, _query)| path);

    let (status, content_type, body) = match (method, path) {
        ("GET", "/metrics/manifest") => (
            "200 OK",
            "application/json",
            serde_json::to_string(&registry.manifest()).unwrap_or_default(),
        ),
        ("GET", "/metrics") => ("200 OK", "text/plain; charset=utf-8", handle.render()),
        ("GET", "/startupz" | "/health/startup") => {
            if started_flag.load(std::sync::atomic::Ordering::Acquire) {
                (
                    "200 OK",
                    "application/json",
                    r#"{"status":"started"}"#.to_string(),
                )
            } else {
                (
                    "503 Service Unavailable",
                    "application/json",
                    r#"{"status":"starting"}"#.to_string(),
                )
            }
        }
        ("GET", "/healthz" | "/health/live") => (
            "200 OK",
            "application/json",
            r#"{"status":"alive"}"#.to_string(),
        ),
        ("GET", "/readyz" | "/health/ready") => {
            // Ready only if the optional callback and (with `health`) the
            // global health registry both agree.
            let callback_ready = readiness_fn.as_ref().is_none_or(|f| f());

            #[cfg(feature = "health")]
            let registry_ready = crate::health::HealthRegistry::is_ready();
            #[cfg(not(feature = "health"))]
            let registry_ready = true;

            if callback_ready && registry_ready {
                (
                    "200 OK",
                    "application/json",
                    r#"{"status":"ready"}"#.to_string(),
                )
            } else {
                (
                    "503 Service Unavailable",
                    "application/json",
                    r#"{"status":"not_ready"}"#.to_string(),
                )
            }
        }
        _ => (
            "404 Not Found",
            "text/plain; charset=utf-8",
            "Not Found".to_string(),
        ),
    };

    // The connection is dropped after this response, so tell the client
    // explicitly rather than implying HTTP/1.1 keep-alive.
    let response = format!(
        "HTTP/1.1 {status}\r\nContent-Type: {content_type}\r\nContent-Length: {}\r\nConnection: close\r\n\r\n{body}",
        body.len()
    );

    // Best effort: the client may already have disconnected.
    let _ = stream.write_all(response.as_bytes()).await;
}
1111
/// Builds the `/readyz` / `/health/ready` response for the axum server.
///
/// Ready means the optional callback `rf` returns `true` and, with the
/// `health` feature, the global health registry also reports ready. Otherwise
/// the response is `503` with `{"status":"not_ready"}`.
#[cfg(all(feature = "metrics", feature = "http-server"))]
fn readiness_response(rf: Option<ReadinessFn>) -> axum::response::Response {
    use axum::response::IntoResponse;

    let json = [(axum::http::header::CONTENT_TYPE, "application/json")];

    let callback_ok = match rf {
        Some(check) => check(),
        None => true,
    };

    #[cfg(feature = "health")]
    let registry_ok = crate::health::HealthRegistry::is_ready();
    #[cfg(not(feature = "health"))]
    let registry_ok = true;

    if !(callback_ok && registry_ok) {
        return (
            axum::http::StatusCode::SERVICE_UNAVAILABLE,
            json,
            r#"{"status":"not_ready"}"#,
        )
            .into_response();
    }

    (json, r#"{"status":"ready"}"#).into_response()
}
1144
/// Runs the axum app on `listener` until `shutdown_rx` resolves, shutting down
/// gracefully.
///
/// A sibling task refreshes process/container metrics every `update_interval`.
/// That task is stopped once the server has exited.
#[cfg(all(feature = "metrics", feature = "http-server"))]
async fn run_axum_server(
    listener: TcpListener,
    app: axum::Router,
    shutdown_rx: oneshot::Receiver<()>,
    update_interval: Duration,
    process_metrics: Option<ProcessMetrics>,
    container_metrics: Option<ContainerMetrics>,
) {
    let mut ticker = tokio::time::interval(update_interval);
    let (stop_updates, mut updates_stopped) = oneshot::channel::<()>();

    tokio::spawn(async move {
        loop {
            tokio::select! {
                _ = &mut updates_stopped => break,
                _ = ticker.tick() => {
                    if let Some(pm) = &process_metrics {
                        pm.update();
                    }
                    if let Some(cm) = &container_metrics {
                        cm.update();
                    }
                }
            }
        }
    });

    let shutdown_signal = async move {
        let _ = shutdown_rx.await;
    };

    if let Err(e) = axum::serve(listener, app)
        .with_graceful_shutdown(shutdown_signal)
        .await
    {
        tracing::error!(error = %e, "Metrics axum server error");
    }

    let _ = stop_updates.send(());
}
1185
/// Default latency histogram bucket bounds in seconds, from 1 ms to 10 s.
#[must_use]
pub fn latency_buckets() -> Vec<f64> {
    const BOUNDS: [f64; 12] = [
        0.001, 0.005, 0.01, 0.025, 0.05, 0.1, 0.25, 0.5, 1.0, 2.5, 5.0, 10.0,
    ];
    BOUNDS.to_vec()
}
1193
/// Default size histogram bucket bounds, in powers of ten from 100 to 10,000,000
/// (presumably bytes; confirm with callers).
#[must_use]
pub fn size_buckets() -> Vec<f64> {
    Vec::from([
        100.0,
        1_000.0,
        10_000.0,
        100_000.0,
        1_000_000.0,
        10_000_000.0,
    ])
}
1206
#[cfg(test)]
mod tests {
    use super::*;

    /// Histogram bucket bounds must be strictly increasing to be usable.
    fn assert_strictly_increasing(buckets: &[f64]) {
        assert!(
            buckets.windows(2).all(|w| w[0] < w[1]),
            "buckets not strictly increasing: {buckets:?}"
        );
    }

    #[test]
    fn test_metrics_config_default() {
        let config = MetricsConfig::default();
        assert!(config.namespace.is_empty());
        assert!(config.enable_process_metrics);
        assert!(config.enable_container_metrics);
        assert_eq!(config.update_interval, Duration::from_secs(15));
    }

    #[test]
    fn test_latency_buckets() {
        let buckets = latency_buckets();
        assert_eq!(buckets.len(), 12);
        assert!(buckets.iter().all(|b| *b > 0.0));
        assert_strictly_increasing(&buckets);
    }

    #[test]
    fn test_size_buckets() {
        let buckets = size_buckets();
        assert_eq!(buckets.len(), 6);
        assert!(buckets.iter().all(|b| *b > 0.0));
        assert_strictly_increasing(&buckets);
    }

    #[test]
    fn test_prefixed_key() {
        let with_ns = MetricsManager::new_for_test("svc");
        assert_eq!(with_ns.prefixed_key("requests"), "svc_requests");
        assert_eq!(with_ns.namespace(), "svc");

        let without_ns = MetricsManager::new_for_test("");
        assert_eq!(without_ns.prefixed_key("requests"), "requests");
    }
}