1mod container;
86pub mod dfe;
87pub mod manifest;
88mod process;
89
90#[cfg(feature = "otel-metrics")]
91pub(crate) mod otel;
92#[cfg(feature = "otel-metrics")]
93pub mod otel_types;
94
95use std::net::SocketAddr;
96use std::sync::Arc;
97use std::time::Duration;
98
99use metrics::{Counter, Gauge, Histogram, Unit};
100use thiserror::Error;
101use tokio::net::TcpListener;
102use tokio::sync::oneshot;
103
/// Shared, thread-safe readiness callback polled by the readiness endpoints
/// (`/readyz`, `/health/ready`); returning `false` yields a 503 response.
pub type ReadinessFn = Arc<dyn Fn() -> bool + Send + Sync>;
106
107#[cfg(feature = "metrics")]
108use metrics_exporter_prometheus::PrometheusHandle;
109
110pub use container::ContainerMetrics;
111pub use dfe::DfeMetrics;
112#[cfg(feature = "metrics-dfe")]
113pub mod dfe_groups;
114pub use manifest::{ManifestResponse, MetricDescriptor, MetricRegistry, MetricType};
115pub use process::ProcessMetrics;
116
117#[cfg(feature = "otel-metrics")]
118pub use otel_types::{OtelMetricsConfig, OtelProtocol};
119
/// Cheap, cloneable handle that renders the current metrics in Prometheus
/// exposition format without holding a reference to the full `MetricsManager`.
#[cfg(feature = "metrics")]
#[derive(Clone)]
pub struct RenderHandle(PrometheusHandle);

#[cfg(feature = "metrics")]
impl RenderHandle {
    /// Renders all registered metrics in Prometheus text format.
    #[must_use]
    pub fn render(&self) -> String {
        self.0.render()
    }
}
136
/// Errors produced while installing recorders or managing the metrics server.
#[derive(Debug, Error)]
pub enum MetricsError {
    /// Building the metrics exporter/recorder failed.
    #[error("failed to build metrics exporter: {0}")]
    BuildError(String),

    /// Binding or configuring the HTTP metrics server failed.
    #[error("failed to start metrics server: {0}")]
    ServerError(String),

    /// A `start_server*` call was made while a server is already running.
    #[error("metrics server already running")]
    AlreadyRunning,

    /// `stop_server` was called while no server is running.
    #[error("metrics server not running")]
    NotRunning,
}
156
/// Configuration for [`MetricsManager`].
#[derive(Debug, Clone)]
pub struct MetricsConfig {
    /// Prefix prepended (with `_`) to every metric name; empty means no prefix.
    pub namespace: String,
    /// Whether to create and periodically refresh [`ProcessMetrics`].
    pub enable_process_metrics: bool,
    /// Whether to create and periodically refresh [`ContainerMetrics`].
    pub enable_container_metrics: bool,
    /// Interval at which the server task refreshes process/container metrics.
    pub update_interval: Duration,
    /// OpenTelemetry exporter configuration (only with `otel-metrics`).
    #[cfg(feature = "otel-metrics")]
    pub otel: OtelMetricsConfig,
}
172
impl Default for MetricsConfig {
    /// Defaults: no namespace prefix, process and container metrics enabled,
    /// 15-second refresh interval.
    fn default() -> Self {
        Self {
            namespace: String::new(),
            enable_process_metrics: true,
            enable_container_metrics: true,
            update_interval: Duration::from_secs(15),
            #[cfg(feature = "otel-metrics")]
            otel: OtelMetricsConfig::default(),
        }
    }
}
185
/// Handles returned by [`install_recorders`]: whichever backends were
/// successfully installed for the enabled feature combination.
struct RecorderSetup {
    /// Prometheus render handle, if the Prometheus recorder was installed.
    #[cfg(feature = "metrics")]
    prom_handle: Option<PrometheusHandle>,
    /// OTel meter provider, if the OTel recorder was installed.
    #[cfg(feature = "otel-metrics")]
    otel_provider: Option<opentelemetry_sdk::metrics::SdkMeterProvider>,
}
193
/// Installs the process-global metrics recorder(s) for the enabled features:
/// Prometheus only, OTel only, or a fanout of both.
///
/// Panics (via `expect`) if a global recorder is already installed, so this
/// must run at most once per process.
///
/// NOTE(review): with neither `metrics` nor `otel-metrics` enabled there is no
/// tail expression here, so the crate presumably requires at least one of
/// those features (likely a default feature) — confirm in Cargo.toml.
#[allow(unused_variables)]
fn install_recorders(config: &MetricsConfig) -> RecorderSetup {
    // Prometheus only: install the Prometheus recorder and keep its handle.
    #[cfg(all(feature = "metrics", not(feature = "otel-metrics")))]
    {
        let recorder = metrics_exporter_prometheus::PrometheusBuilder::new().build_recorder();
        let handle = recorder.handle();
        metrics::set_global_recorder(recorder).expect("failed to install Prometheus recorder");
        RecorderSetup {
            prom_handle: Some(handle),
        }
    }

    // OTel only: failure to build the OTel recorder is non-fatal — metrics
    // are simply not exported (provider stays `None`).
    #[cfg(all(feature = "otel-metrics", not(feature = "metrics")))]
    {
        match otel::build_otel_recorder(&config.namespace, &config.otel) {
            Ok((otel_recorder, provider)) => {
                opentelemetry::global::set_meter_provider(provider.clone());
                metrics::set_global_recorder(otel_recorder)
                    .expect("failed to set OTel metrics recorder");
                RecorderSetup {
                    otel_provider: Some(provider),
                }
            }
            Err(e) => {
                tracing::warn!(error = %e, "Failed to build OTel metrics recorder");
                RecorderSetup {
                    otel_provider: None,
                }
            }
        }
    }

    // Both features: fan out every metric to Prometheus and OTel; if the OTel
    // recorder cannot be built, degrade gracefully to Prometheus only.
    #[cfg(all(feature = "metrics", feature = "otel-metrics"))]
    {
        let prom_recorder = metrics_exporter_prometheus::PrometheusBuilder::new().build_recorder();
        let prom_handle = prom_recorder.handle();

        match otel::build_otel_recorder(&config.namespace, &config.otel) {
            Ok((otel_recorder, provider)) => {
                opentelemetry::global::set_meter_provider(provider.clone());

                let fanout = metrics_util::layers::FanoutBuilder::default()
                    .add_recorder(prom_recorder)
                    .add_recorder(otel_recorder)
                    .build();

                metrics::set_global_recorder(fanout).expect("failed to set Fanout recorder");

                RecorderSetup {
                    prom_handle: Some(prom_handle),
                    otel_provider: Some(provider),
                }
            }
            Err(e) => {
                tracing::warn!(error = %e, "Failed to build OTel recorder, falling back to Prometheus only");
                metrics::set_global_recorder(prom_recorder)
                    .expect("failed to set Prometheus recorder");
                RecorderSetup {
                    prom_handle: Some(prom_handle),
                    otel_provider: None,
                }
            }
        }
    }
}
271
/// Central entry point: installs recorders on construction, registers custom
/// metrics into a manifest registry, and optionally serves them over HTTP.
pub struct MetricsManager {
    /// Prometheus render handle (None if recorder installation was skipped).
    #[cfg(feature = "metrics")]
    handle: Option<PrometheusHandle>,
    /// Configuration captured at construction time.
    config: MetricsConfig,
    /// Present iff an HTTP server task is running; used to signal shutdown.
    shutdown_tx: Option<oneshot::Sender<()>>,
    /// Process-level collectors (enabled via config).
    process_metrics: Option<ProcessMetrics>,
    /// Container-level collectors (enabled via config).
    container_metrics: Option<ContainerMetrics>,
    /// Optional readiness callback for `/readyz` / `/health/ready`.
    readiness_fn: Option<ReadinessFn>,
    /// Manifest registry backing `/metrics/manifest`.
    registry: MetricRegistry,
    /// Scaling-pressure source exposed at `/scaling/pressure` (axum server).
    #[cfg(all(feature = "metrics", feature = "scaling"))]
    scaling_pressure: Option<Arc<crate::scaling::ScalingPressure>>,
    /// Memory guard exposed at `/memory/pressure` (axum server).
    #[cfg(all(feature = "metrics", feature = "memory"))]
    memory_guard: Option<Arc<crate::memory::MemoryGuard>>,
    /// OTel provider kept alive for explicit shutdown.
    #[cfg(feature = "otel-metrics")]
    otel_provider: Option<opentelemetry_sdk::metrics::SdkMeterProvider>,
}
289
290impl MetricsManager {
291 #[must_use]
293 pub fn new(namespace: &str) -> Self {
294 Self::with_config(MetricsConfig {
295 namespace: namespace.to_string(),
296 ..Default::default()
297 })
298 }
299
    /// Builds a manager from `config`, installing the process-global metrics
    /// recorder(s) as a side effect.
    ///
    /// NOTE(review): `install_recorders` calls `set_global_recorder(..).expect(..)`,
    /// so constructing a second manager in the same process will panic — confirm
    /// single-manager usage is intended.
    #[must_use]
    pub fn with_config(config: MetricsConfig) -> Self {
        let setup = install_recorders(&config);

        // Optional collectors are created up front so the server task can
        // refresh them on its update interval.
        let process_metrics = if config.enable_process_metrics {
            Some(ProcessMetrics::new(&config.namespace))
        } else {
            None
        };

        let container_metrics = if config.enable_container_metrics {
            Some(ContainerMetrics::new(&config.namespace))
        } else {
            None
        };

        let registry = MetricRegistry::new(&config.namespace);

        Self {
            #[cfg(feature = "metrics")]
            handle: setup.prom_handle,
            registry,
            config,
            shutdown_tx: None,
            process_metrics,
            container_metrics,
            readiness_fn: None,
            #[cfg(all(feature = "metrics", feature = "scaling"))]
            scaling_pressure: None,
            #[cfg(all(feature = "metrics", feature = "memory"))]
            memory_guard: None,
            #[cfg(feature = "otel-metrics")]
            otel_provider: setup.otel_provider,
        }
    }
341
342 #[must_use]
347 pub fn counter(&self, name: &str, description: &str) -> Counter {
348 let key = self.prefixed_key(name);
349 let desc = description.to_string();
350 metrics::describe_counter!(key.clone(), desc.clone());
351 self.registry.push(MetricDescriptor {
352 name: key.clone(),
353 metric_type: MetricType::Counter,
354 description: desc,
355 unit: String::new(),
356 labels: vec![],
357 group: "custom".into(),
358 buckets: None,
359 use_cases: vec![],
360 dashboard_hint: None,
361 });
362 metrics::counter!(key)
363 }
364
365 #[must_use]
371 pub fn counter_with_labels(
372 &self,
373 name: &str,
374 description: &str,
375 labels: &[&str],
376 group: &str,
377 ) -> Counter {
378 let key = self.prefixed_key(name);
379 let desc = description.to_string();
380 metrics::describe_counter!(key.clone(), desc.clone());
381 self.registry.push(MetricDescriptor {
382 name: key.clone(),
383 metric_type: MetricType::Counter,
384 description: desc,
385 unit: String::new(),
386 labels: labels.iter().map(|s| (*s).to_string()).collect(),
387 group: group.into(),
388 buckets: None,
389 use_cases: vec![],
390 dashboard_hint: None,
391 });
392 metrics::counter!(key)
393 }
394
395 #[must_use]
400 pub fn gauge(&self, name: &str, description: &str) -> Gauge {
401 let key = self.prefixed_key(name);
402 let desc = description.to_string();
403 metrics::describe_gauge!(key.clone(), desc.clone());
404 self.registry.push(MetricDescriptor {
405 name: key.clone(),
406 metric_type: MetricType::Gauge,
407 description: desc,
408 unit: String::new(),
409 labels: vec![],
410 group: "custom".into(),
411 buckets: None,
412 use_cases: vec![],
413 dashboard_hint: None,
414 });
415 metrics::gauge!(key)
416 }
417
418 #[must_use]
420 pub fn gauge_with_labels(
421 &self,
422 name: &str,
423 description: &str,
424 labels: &[&str],
425 group: &str,
426 ) -> Gauge {
427 let key = self.prefixed_key(name);
428 let desc = description.to_string();
429 metrics::describe_gauge!(key.clone(), desc.clone());
430 self.registry.push(MetricDescriptor {
431 name: key.clone(),
432 metric_type: MetricType::Gauge,
433 description: desc,
434 unit: String::new(),
435 labels: labels.iter().map(|s| (*s).to_string()).collect(),
436 group: group.into(),
437 buckets: None,
438 use_cases: vec![],
439 dashboard_hint: None,
440 });
441 metrics::gauge!(key)
442 }
443
444 #[must_use]
449 pub fn histogram(&self, name: &str, description: &str) -> Histogram {
450 let key = self.prefixed_key(name);
451 let desc = description.to_string();
452 metrics::describe_histogram!(key.clone(), Unit::Seconds, desc.clone());
453 self.registry.push(MetricDescriptor {
454 name: key.clone(),
455 metric_type: MetricType::Histogram,
456 description: desc,
457 unit: "seconds".into(),
458 labels: vec![],
459 group: "custom".into(),
460 buckets: None,
461 use_cases: vec![],
462 dashboard_hint: None,
463 });
464 metrics::histogram!(key)
465 }
466
467 #[must_use]
469 pub fn histogram_with_labels(
470 &self,
471 name: &str,
472 description: &str,
473 labels: &[&str],
474 group: &str,
475 buckets: Option<&[f64]>,
476 ) -> Histogram {
477 let key = self.prefixed_key(name);
478 let desc = description.to_string();
479 metrics::describe_histogram!(key.clone(), Unit::Seconds, desc.clone());
480 self.registry.push(MetricDescriptor {
481 name: key.clone(),
482 metric_type: MetricType::Histogram,
483 description: desc,
484 unit: "seconds".into(),
485 labels: labels.iter().map(|s| (*s).to_string()).collect(),
486 group: group.into(),
487 buckets: buckets.map(|b| b.to_vec()),
488 use_cases: vec![],
489 dashboard_hint: None,
490 });
491 metrics::histogram!(key)
492 }
493
494 #[must_use]
500 pub fn histogram_with_buckets(
501 &self,
502 name: &str,
503 description: &str,
504 buckets: &[f64],
505 ) -> Histogram {
506 let key = self.prefixed_key(name);
507 let desc = description.to_string();
508 metrics::describe_histogram!(key.clone(), Unit::Seconds, desc.clone());
509 self.registry.push(MetricDescriptor {
510 name: key.clone(),
511 metric_type: MetricType::Histogram,
512 description: desc,
513 unit: "seconds".into(),
514 labels: vec![],
515 group: "custom".into(),
516 buckets: Some(buckets.to_vec()),
517 use_cases: vec![],
518 dashboard_hint: None,
519 });
520 metrics::histogram!(key)
521 }
522
523 #[cfg(feature = "metrics")]
528 #[must_use]
529 pub fn render(&self) -> String {
530 self.handle
531 .as_ref()
532 .map_or_else(String::new, PrometheusHandle::render)
533 }
534
535 #[cfg(feature = "metrics")]
558 #[must_use]
559 pub fn render_handle(&self) -> Option<RenderHandle> {
560 self.handle.clone().map(RenderHandle)
561 }
562
    /// Registers the callback polled by `/readyz` and `/health/ready`; when
    /// it returns `false` those endpoints respond 503.
    pub fn set_readiness_check(&mut self, f: impl Fn() -> bool + Send + Sync + 'static) {
        self.readiness_fn = Some(Arc::new(f));
    }
571
    /// Exposes `sp` at `GET /scaling/pressure` on the axum server started by
    /// [`Self::start_server_with_routes`].
    #[cfg(all(feature = "metrics", feature = "scaling"))]
    pub fn set_scaling_pressure(&mut self, sp: Arc<crate::scaling::ScalingPressure>) {
        self.scaling_pressure = Some(sp);
    }
580
    /// Exposes `mg` at `GET /memory/pressure` on the axum server started by
    /// [`Self::start_server_with_routes`].
    #[cfg(all(feature = "metrics", feature = "memory"))]
    pub fn set_memory_guard(&mut self, mg: Arc<crate::memory::MemoryGuard>) {
        self.memory_guard = Some(mg);
    }
589
590 pub fn update(&self) {
592 if let Some(ref pm) = self.process_metrics {
593 pm.update();
594 }
595 if let Some(ref cm) = self.container_metrics {
596 cm.update();
597 }
598 }
599
600 #[cfg(feature = "metrics")]
611 pub async fn start_server(&mut self, addr: &str) -> Result<(), MetricsError> {
612 if self.shutdown_tx.is_some() {
613 return Err(MetricsError::AlreadyRunning);
614 }
615
616 let addr: SocketAddr = addr
617 .parse()
618 .map_err(|e| MetricsError::ServerError(format!("invalid address: {e}")))?;
619
620 let listener = TcpListener::bind(addr)
621 .await
622 .map_err(|e| MetricsError::ServerError(e.to_string()))?;
623
624 let (shutdown_tx, shutdown_rx) = oneshot::channel();
625 self.shutdown_tx = Some(shutdown_tx);
626
627 let handle = self
628 .handle
629 .as_ref()
630 .ok_or_else(|| {
631 MetricsError::ServerError(
632 "Prometheus handle not configured — MetricsManager was created without a recorder".into(),
633 )
634 })?
635 .clone();
636 let update_interval = self.config.update_interval;
637 let process_metrics = self.process_metrics.clone();
638 let container_metrics = self.container_metrics.clone();
639 let readiness_fn = self.readiness_fn.clone();
640
641 let registry = self.registry();
642
643 tokio::spawn(async move {
644 run_server(
645 listener,
646 handle,
647 registry,
648 shutdown_rx,
649 update_interval,
650 process_metrics,
651 container_metrics,
652 readiness_fn,
653 )
654 .await;
655 });
656
657 Ok(())
658 }
659
660 #[cfg(all(feature = "metrics", feature = "http-server"))]
678 pub async fn start_server_with_routes(
679 &mut self,
680 addr: &str,
681 extra_routes: axum::Router,
682 ) -> Result<(), MetricsError> {
683 if self.shutdown_tx.is_some() {
684 return Err(MetricsError::AlreadyRunning);
685 }
686
687 let addr: SocketAddr = addr
688 .parse()
689 .map_err(|e| MetricsError::ServerError(format!("invalid address: {e}")))?;
690
691 let listener = TcpListener::bind(addr)
692 .await
693 .map_err(|e| MetricsError::ServerError(e.to_string()))?;
694
695 let (shutdown_tx, shutdown_rx) = oneshot::channel();
696 self.shutdown_tx = Some(shutdown_tx);
697
698 let handle = self
699 .handle
700 .as_ref()
701 .ok_or_else(|| {
702 MetricsError::ServerError(
703 "Prometheus handle not configured — MetricsManager was created without a recorder".into(),
704 )
705 })?
706 .clone();
707 let update_interval = self.config.update_interval;
708 let process_metrics = self.process_metrics.clone();
709 let container_metrics = self.container_metrics.clone();
710 let readiness_fn = self.readiness_fn.clone();
711
712 let metrics_handle = handle.clone();
714 let readiness_for_live = readiness_fn.clone();
715 let registry_handle = self.registry();
716
717 let mut app = axum::Router::new()
718 .route(
719 "/metrics/manifest",
720 axum::routing::get(move || {
721 let reg = registry_handle.clone();
722 async move {
723 (
724 [(axum::http::header::CONTENT_TYPE, "application/json")],
725 serde_json::to_string(®.manifest()).unwrap_or_default(),
726 )
727 }
728 }),
729 )
730 .route(
731 "/metrics",
732 axum::routing::get(move || {
733 let h = metrics_handle.clone();
734 async move { h.render() }
735 }),
736 )
737 .route(
738 "/healthz",
739 axum::routing::get(|| async {
740 (
741 [(axum::http::header::CONTENT_TYPE, "application/json")],
742 r#"{"status":"alive"}"#,
743 )
744 }),
745 )
746 .route(
747 "/health/live",
748 axum::routing::get(|| async {
749 (
750 [(axum::http::header::CONTENT_TYPE, "application/json")],
751 r#"{"status":"alive"}"#,
752 )
753 }),
754 )
755 .route(
756 "/readyz",
757 axum::routing::get(move || {
758 let rf = readiness_fn.clone();
759 async move { readiness_response(rf) }
760 }),
761 )
762 .route(
763 "/health/ready",
764 axum::routing::get(move || {
765 let rf = readiness_for_live.clone();
766 async move { readiness_response(rf) }
767 }),
768 );
769
770 #[cfg(feature = "scaling")]
772 if let Some(ref sp) = self.scaling_pressure {
773 let sp = sp.clone();
774 app = app.route(
775 "/scaling/pressure",
776 axum::routing::get(move || {
777 let s = sp.clone();
778 async move { format!("{:.2}", s.calculate()) }
779 }),
780 );
781 }
782
783 #[cfg(feature = "memory")]
785 if let Some(ref mg) = self.memory_guard {
786 let mg = mg.clone();
787 app = app.route(
788 "/memory/pressure",
789 axum::routing::get(move || {
790 let m = mg.clone();
791 async move {
792 (
793 [(axum::http::header::CONTENT_TYPE, "application/json")],
794 format!(
795 r#"{{"under_pressure":{},"ratio":{:.3},"current_bytes":{},"limit_bytes":{}}}"#,
796 m.under_pressure(),
797 m.pressure_ratio(),
798 m.current_bytes(),
799 m.limit_bytes()
800 ),
801 )
802 }
803 }),
804 );
805 }
806
807 app = app.merge(extra_routes);
809
810 tokio::spawn(async move {
811 run_axum_server(
812 listener,
813 app,
814 shutdown_rx,
815 update_interval,
816 process_metrics,
817 container_metrics,
818 )
819 .await;
820 });
821
822 Ok(())
823 }
824
825 pub async fn stop_server(&mut self) -> Result<(), MetricsError> {
831 if let Some(tx) = self.shutdown_tx.take() {
832 let _ = tx.send(());
833 Ok(())
834 } else {
835 Err(MetricsError::NotRunning)
836 }
837 }
838
839 #[cfg(feature = "otel-metrics")]
843 pub fn shutdown_otel(&mut self) {
844 if let Some(provider) = self.otel_provider.take()
845 && let Err(e) = provider.shutdown()
846 {
847 tracing::warn!(error = %e, "OTel provider shutdown error");
848 }
849 }
850
    /// Records build metadata (version and commit) in the manifest registry.
    pub fn set_build_info(&self, version: &str, commit: &str) {
        self.registry.set_build_info(version, commit);
    }
859
    /// Attaches use-case annotations to a registered metric in the manifest.
    pub fn set_use_cases(&self, metric_name: &str, use_cases: &[&str]) {
        self.registry.set_use_cases(metric_name, use_cases);
    }
866
    /// Attaches a dashboard hint to a registered metric in the manifest.
    pub fn set_dashboard_hint(&self, metric_name: &str, hint: &str) {
        self.registry.set_dashboard_hint(metric_name, hint);
    }
873
    /// Returns a clone of the manifest registry backing `/metrics/manifest`.
    #[must_use]
    pub fn registry(&self) -> MetricRegistry {
        self.registry.clone()
    }
882
    /// Returns the configured namespace prefix (may be empty).
    #[must_use]
    pub fn namespace(&self) -> &str {
        &self.config.namespace
    }
890
891 fn prefixed_key(&self, name: &str) -> String {
893 if self.config.namespace.is_empty() {
894 name.to_string()
895 } else {
896 format!("{}_{}", self.config.namespace, name)
897 }
898 }
899}
900
/// Accept loop for the minimal (non-axum) metrics server: serves incoming
/// connections, refreshes collectors on `update_interval`, and exits when the
/// shutdown channel fires (or its sender is dropped).
#[cfg(feature = "metrics")]
#[allow(clippy::too_many_arguments)]
async fn run_server(
    listener: TcpListener,
    handle: PrometheusHandle,
    registry: MetricRegistry,
    mut shutdown_rx: oneshot::Receiver<()>,
    update_interval: Duration,
    process_metrics: Option<ProcessMetrics>,
    container_metrics: Option<ContainerMetrics>,
    readiness_fn: Option<ReadinessFn>,
) {
    let mut update_interval = tokio::time::interval(update_interval);

    loop {
        tokio::select! {
            // Shutdown requested (or sender dropped): stop accepting.
            _ = &mut shutdown_rx => {
                break;
            }
            // Periodic collector refresh.
            _ = update_interval.tick() => {
                if let Some(ref pm) = process_metrics {
                    pm.update();
                }
                if let Some(ref cm) = container_metrics {
                    cm.update();
                }
            }
            // Each accepted connection is handled on its own task so a slow
            // client cannot stall the accept loop. Accept errors are ignored.
            result = listener.accept() => {
                if let Ok((stream, _)) = result {
                    let handle = handle.clone();
                    let registry = registry.clone();
                    let readiness_fn = readiness_fn.clone();
                    tokio::spawn(async move {
                        handle_connection(stream, handle, registry, readiness_fn).await;
                    });
                }
            }
        }
    }
}
942
/// Serves one HTTP/1.1 request on a raw TCP stream using hand-rolled request
/// parsing: only the request line is read, and routes are matched by string
/// prefix (so e.g. `GET /metricsX` also matches `/metrics`). Good enough for
/// scrape traffic; not a general HTTP server.
#[cfg(feature = "metrics")]
async fn handle_connection(
    mut stream: tokio::net::TcpStream,
    handle: PrometheusHandle,
    registry: MetricRegistry,
    readiness_fn: Option<ReadinessFn>,
) {
    use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader};

    let mut reader = BufReader::new(&mut stream);
    let mut request_line = String::new();

    if reader.read_line(&mut request_line).await.is_err() {
        return;
    }

    // Order matters: the more specific `/metrics/manifest` prefix must be
    // checked before `/metrics`.
    let (status, content_type, body) = if request_line.starts_with("GET /metrics/manifest") {
        (
            "200 OK",
            "application/json",
            serde_json::to_string(&registry.manifest()).unwrap_or_default(),
        )
    } else if request_line.starts_with("GET /metrics") {
        ("200 OK", "text/plain; charset=utf-8", handle.render())
    } else if request_line.starts_with("GET /healthz")
        || request_line.starts_with("GET /health/live")
    {
        (
            "200 OK",
            "application/json",
            r#"{"status":"alive"}"#.to_string(),
        )
    } else if request_line.starts_with("GET /readyz")
        || request_line.starts_with("GET /health/ready")
    {
        // Ready only when both the optional user callback and (if enabled)
        // the health registry agree; a missing callback counts as ready.
        let callback_ready = readiness_fn.as_ref().is_none_or(|f| f());

        #[cfg(feature = "health")]
        let registry_ready = crate::health::HealthRegistry::is_ready();
        #[cfg(not(feature = "health"))]
        let registry_ready = true;

        let ready = callback_ready && registry_ready;
        if ready {
            (
                "200 OK",
                "application/json",
                r#"{"status":"ready"}"#.to_string(),
            )
        } else {
            (
                "503 Service Unavailable",
                "application/json",
                r#"{"status":"not_ready"}"#.to_string(),
            )
        }
    } else {
        (
            "404 Not Found",
            "text/plain; charset=utf-8",
            "Not Found".to_string(),
        )
    };

    // NOTE(review): Content-Length uses byte length of the body, which is
    // correct; write errors are deliberately ignored (client gone).
    let response = format!(
        "HTTP/1.1 {status}\r\nContent-Type: {content_type}\r\nContent-Length: {}\r\n\r\n{body}",
        body.len()
    );

    let _ = stream.write_all(response.as_bytes()).await;
}
1019
/// Builds the JSON readiness response used by the axum routes: 200 with
/// `{"status":"ready"}` when both the optional callback and (if enabled) the
/// health registry report ready, otherwise 503 with `{"status":"not_ready"}`.
#[cfg(all(feature = "metrics", feature = "http-server"))]
fn readiness_response(rf: Option<ReadinessFn>) -> axum::response::Response {
    use axum::response::IntoResponse;

    // A missing callback counts as ready.
    let callback_ready = match rf {
        Some(ref f) => f(),
        None => true,
    };

    #[cfg(feature = "health")]
    let registry_ready = crate::health::HealthRegistry::is_ready();
    #[cfg(not(feature = "health"))]
    let registry_ready = true;

    let (status, body) = if callback_ready && registry_ready {
        (axum::http::StatusCode::OK, r#"{"status":"ready"}"#)
    } else {
        (
            axum::http::StatusCode::SERVICE_UNAVAILABLE,
            r#"{"status":"not_ready"}"#,
        )
    };

    (
        status,
        [(axum::http::header::CONTENT_TYPE, "application/json")],
        body,
    )
        .into_response()
}
1052
/// Runs the axum metrics server until the shutdown channel fires, with a
/// side task that refreshes process/container collectors on `update_interval`.
#[cfg(all(feature = "metrics", feature = "http-server"))]
async fn run_axum_server(
    listener: TcpListener,
    app: axum::Router,
    shutdown_rx: oneshot::Receiver<()>,
    update_interval: Duration,
    process_metrics: Option<ProcessMetrics>,
    container_metrics: Option<ContainerMetrics>,
) {
    let mut interval = tokio::time::interval(update_interval);

    // Collector refresh runs on its own task so it keeps ticking while the
    // server is busy; it is stopped after the server exits (send below).
    let (update_stop_tx, mut update_stop_rx) = oneshot::channel::<()>();
    tokio::spawn(async move {
        loop {
            tokio::select! {
                _ = &mut update_stop_rx => break,
                _ = interval.tick() => {
                    if let Some(ref pm) = process_metrics {
                        pm.update();
                    }
                    if let Some(ref cm) = container_metrics {
                        cm.update();
                    }
                }
            }
        }
    });

    // Server errors are logged, not propagated — this runs on a detached task.
    axum::serve(listener, app)
        .with_graceful_shutdown(async move {
            let _ = shutdown_rx.await;
        })
        .await
        .unwrap_or_else(|e| tracing::error!(error = %e, "Metrics axum server error"));

    let _ = update_stop_tx.send(());
}
1093
/// Default histogram bucket boundaries (in seconds) for latency metrics,
/// spanning 1 ms to 10 s.
#[must_use]
pub fn latency_buckets() -> Vec<f64> {
    const BUCKETS: [f64; 12] = [
        0.001, 0.005, 0.01, 0.025, 0.05, 0.1, 0.25, 0.5, 1.0, 2.5, 5.0, 10.0,
    ];
    BUCKETS.to_vec()
}
1101
/// Default histogram bucket boundaries for size-like metrics (e.g. bytes),
/// covering 100 to 10,000,000 in powers of ten.
#[must_use]
pub fn size_buckets() -> Vec<f64> {
    const BUCKETS: [f64; 6] = [1e2, 1e3, 1e4, 1e5, 1e6, 1e7];
    BUCKETS.to_vec()
}
1114
#[cfg(test)]
mod tests {
    use super::*;

    // Default config: no namespace, both collectors on, 15 s refresh.
    #[test]
    fn test_metrics_config_default() {
        let config = MetricsConfig::default();
        assert!(config.namespace.is_empty());
        assert!(config.enable_process_metrics);
        assert!(config.enable_container_metrics);
        assert_eq!(config.update_interval, Duration::from_secs(15));
    }

    // Latency buckets: 12 ascending boundaries.
    #[test]
    fn test_latency_buckets() {
        let buckets = latency_buckets();
        assert_eq!(buckets.len(), 12);
        assert!(buckets[0] < buckets[11]);
    }

    // Size buckets: 6 ascending boundaries.
    #[test]
    fn test_size_buckets() {
        let buckets = size_buckets();
        assert_eq!(buckets.len(), 6);
        assert!(buckets[0] < buckets[5]);
    }
}