1use axum::{
5 Router,
6 extract::State,
7 http::StatusCode,
8 response::{IntoResponse, sse::Event},
9 routing::get,
10};
11use dynamo_runtime::metrics::prometheus_names::{
12 frontend_service, name_prefix, sanitize_frontend_prometheus_prefix,
13};
14use prometheus::{Encoder, HistogramOpts, HistogramVec, IntCounterVec, IntGaugeVec, Opts};
15use serde::Serialize;
16use std::{
17 sync::Arc,
18 time::{Duration, Instant},
19};
20
21use crate::discovery::ModelEntry;
22use crate::local_model::runtime_config::ModelRuntimeConfig;
23use crate::model_card::{ModelDeploymentCard, ROOT_PATH as MDC_ROOT_PATH};
24use dynamo_runtime::metrics::prometheus_names::clamp_u64_to_i64;
25use dynamo_runtime::slug::Slug;
26use dynamo_runtime::storage::key_value_store::{EtcdStorage, KeyValueStore, KeyValueStoreManager};
27
28pub use prometheus::Registry;
29
30use super::RouteDoc;
31
32pub struct Metrics {
33 request_counter: IntCounterVec,
34 inflight_gauge: IntGaugeVec,
35 client_disconnect_gauge: prometheus::IntGauge,
36 http_queue_gauge: IntGaugeVec,
37 request_duration: HistogramVec,
38 input_sequence_length: HistogramVec,
39 output_sequence_length: HistogramVec,
40 time_to_first_token: HistogramVec,
41 inter_token_latency: HistogramVec,
42
43 model_total_kv_blocks: IntGaugeVec,
47 model_max_num_seqs: IntGaugeVec,
48 model_max_num_batched_tokens: IntGaugeVec,
49 model_context_length: IntGaugeVec,
50 model_kv_cache_block_size: IntGaugeVec,
51 model_migration_limit: IntGaugeVec,
52}
53
54pub struct HttpQueueGuard {
62 metrics: Arc<Metrics>,
63 model: String,
64}
65
66pub struct InflightGuard {
71 metrics: Arc<Metrics>,
72 model: String,
73 endpoint: Endpoint,
74 request_type: RequestType,
75 status: Status,
76 timer: Instant,
77}
78
/// HTTP endpoint family a request was served by; used as the `endpoint`
/// metric label (see `Endpoint::as_str`).
///
/// Derives `Debug`/`Copy`/`Eq` so the value can be logged, compared and passed
/// around freely; the enum is a plain fieldless tag.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum Endpoint {
    /// Label value `completions`.
    Completions,

    /// Label value `chat_completions`.
    ChatCompletions,

    /// Label value `embeddings`.
    Embeddings,

    /// Label value `responses`.
    Responses,

    /// Label value `tensor`.
    Tensor,
}
97
/// Whether a request returns a single response or a stream; used as the
/// `request_type` metric label.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum RequestType {
    /// Non-streaming request.
    Unary,

    /// Streaming request.
    Stream,
}
106
/// Final outcome of a request; used as the `status` metric label.
///
/// Equality was already derived; `Debug`/`Copy`/`Eq` are added so the value
/// can be logged and copied without ceremony.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum Status {
    /// The request completed successfully.
    Success,
    /// The request failed (also the default recorded by an unmarked `InflightGuard`).
    Error,
}
113
114pub struct ResponseMetricCollector {
116 metrics: Arc<Metrics>,
117 model: String,
118 start_time: Instant,
119 is_first_token: bool,
122 last_response_time: Option<Duration>,
125 osl: usize,
126}
127
128impl Default for Metrics {
129 fn default() -> Self {
130 Self::new()
131 }
132}
133
134impl Metrics {
135 pub fn new() -> Self {
167 let raw_prefix = std::env::var(frontend_service::METRICS_PREFIX_ENV)
168 .unwrap_or_else(|_| name_prefix::FRONTEND.to_string());
169 let prefix = sanitize_frontend_prometheus_prefix(&raw_prefix);
170 if prefix != raw_prefix {
171 tracing::warn!(
172 raw=%raw_prefix,
173 sanitized=%prefix,
174 env=%frontend_service::METRICS_PREFIX_ENV,
175 "Sanitized HTTP metrics prefix"
176 );
177 }
178 let frontend_metric_name = |suffix: &str| format!("{}_{}", &prefix, suffix);
179
180 let request_counter = IntCounterVec::new(
181 Opts::new(
182 frontend_metric_name(frontend_service::REQUESTS_TOTAL),
183 "Total number of LLM requests processed",
184 ),
185 &["model", "endpoint", "request_type", "status"],
186 )
187 .unwrap();
188
189 let inflight_gauge = IntGaugeVec::new(
190 Opts::new(
191 frontend_metric_name(frontend_service::INFLIGHT_REQUESTS_TOTAL),
192 "Number of inflight requests",
193 ),
194 &["model"],
195 )
196 .unwrap();
197
198 let client_disconnect_gauge = prometheus::IntGauge::new(
199 frontend_metric_name("client_disconnects"),
200 "Number of connections dropped by clients",
201 )
202 .unwrap();
203
204 let http_queue_gauge = IntGaugeVec::new(
205 Opts::new(
206 frontend_metric_name(frontend_service::QUEUED_REQUESTS_TOTAL),
207 "Number of requests in HTTP processing queue",
208 ),
209 &["model"],
210 )
211 .unwrap();
212
213 let buckets = vec![0.0, 1.0, 2.0, 4.0, 8.0, 16.0, 32.0, 64.0, 128.0, 256.0];
214
215 let request_duration = HistogramVec::new(
216 HistogramOpts::new(
217 frontend_metric_name(frontend_service::REQUEST_DURATION_SECONDS),
218 "Duration of LLM requests",
219 )
220 .buckets(buckets),
221 &["model"],
222 )
223 .unwrap();
224
225 let input_sequence_length = HistogramVec::new(
226 HistogramOpts::new(
227 frontend_metric_name(frontend_service::INPUT_SEQUENCE_TOKENS),
228 "Input sequence length in tokens",
229 )
230 .buckets(vec![
231 0.0, 50.0, 100.0, 500.0, 1000.0, 2000.0, 4000.0, 8000.0, 16000.0, 32000.0, 64000.0,
232 128000.0,
233 ]),
234 &["model"],
235 )
236 .unwrap();
237
238 let output_sequence_length = HistogramVec::new(
239 HistogramOpts::new(
240 frontend_metric_name(frontend_service::OUTPUT_SEQUENCE_TOKENS),
241 "Output sequence length in tokens",
242 )
243 .buckets(vec![
244 0.0, 50.0, 100.0, 500.0, 1000.0, 2000.0, 4000.0, 8000.0, 16000.0, 32000.0,
245 ]),
246 &["model"],
247 )
248 .unwrap();
249
250 let time_to_first_token = HistogramVec::new(
251 HistogramOpts::new(
252 frontend_metric_name(frontend_service::TIME_TO_FIRST_TOKEN_SECONDS),
253 "Time to first token in seconds",
254 )
255 .buckets(vec![
256 0.0, 0.001, 0.005, 0.01, 0.025, 0.05, 0.1, 0.25, 0.5, 1.0, 2.0, 5.0, 10.0, 30.0,
257 60.0, 120.0, 240.0, 480.0,
258 ]),
259 &["model"],
260 )
261 .unwrap();
262
263 let inter_token_latency = HistogramVec::new(
264 HistogramOpts::new(
265 frontend_metric_name(frontend_service::INTER_TOKEN_LATENCY_SECONDS),
266 "Inter-token latency in seconds",
267 )
268 .buckets(vec![
269 0.0, 0.001, 0.005, 0.01, 0.015, 0.02, 0.025, 0.05, 0.1, 0.25, 0.5, 1.0, 2.0,
270 ]),
271 &["model"],
272 )
273 .unwrap();
274
275 let model_total_kv_blocks = IntGaugeVec::new(
280 Opts::new(
281 frontend_metric_name(frontend_service::MODEL_TOTAL_KV_BLOCKS),
282 "Total KV cache blocks available for a worker serving the model",
283 ),
284 &["model"],
285 )
286 .unwrap();
287
288 let model_max_num_seqs = IntGaugeVec::new(
289 Opts::new(
290 frontend_metric_name(frontend_service::MODEL_MAX_NUM_SEQS),
291 "Maximum number of sequences for a worker serving the model",
292 ),
293 &["model"],
294 )
295 .unwrap();
296
297 let model_max_num_batched_tokens = IntGaugeVec::new(
298 Opts::new(
299 frontend_metric_name(frontend_service::MODEL_MAX_NUM_BATCHED_TOKENS),
300 "Maximum number of batched tokens for a worker serving the model",
301 ),
302 &["model"],
303 )
304 .unwrap();
305
306 let model_context_length = IntGaugeVec::new(
307 Opts::new(
308 frontend_metric_name(frontend_service::MODEL_CONTEXT_LENGTH),
309 "Maximum context length in tokens for a worker serving the model",
310 ),
311 &["model"],
312 )
313 .unwrap();
314
315 let model_kv_cache_block_size = IntGaugeVec::new(
316 Opts::new(
317 frontend_metric_name(frontend_service::MODEL_KV_CACHE_BLOCK_SIZE),
318 "KV cache block size in tokens for a worker serving the model",
319 ),
320 &["model"],
321 )
322 .unwrap();
323
324 let model_migration_limit = IntGaugeVec::new(
325 Opts::new(
326 frontend_metric_name(frontend_service::MODEL_MIGRATION_LIMIT),
327 "Maximum number of request migrations allowed for the model",
328 ),
329 &["model"],
330 )
331 .unwrap();
332
333 Metrics {
334 request_counter,
335 inflight_gauge,
336 client_disconnect_gauge,
337 http_queue_gauge,
338 request_duration,
339 input_sequence_length,
340 output_sequence_length,
341 time_to_first_token,
342 inter_token_latency,
343 model_total_kv_blocks,
344 model_max_num_seqs,
345 model_max_num_batched_tokens,
346 model_context_length,
347 model_kv_cache_block_size,
348 model_migration_limit,
349 }
350 }
351
352 pub fn get_request_counter(
358 &self,
359 model: &str,
360 endpoint: &Endpoint,
361 request_type: &RequestType,
362 status: &Status,
363 ) -> u64 {
364 self.request_counter
365 .with_label_values(&[
366 model,
367 endpoint.as_str(),
368 request_type.as_str(),
369 status.as_str(),
370 ])
371 .get()
372 }
373
374 fn inc_request_counter(
380 &self,
381 model: &str,
382 endpoint: &Endpoint,
383 request_type: &RequestType,
384 status: &Status,
385 ) {
386 self.request_counter
387 .with_label_values(&[
388 model,
389 endpoint.as_str(),
390 request_type.as_str(),
391 status.as_str(),
392 ])
393 .inc()
394 }
395
396 pub fn get_inflight_count(&self, model: &str) -> i64 {
398 self.inflight_gauge.with_label_values(&[model]).get()
399 }
400
401 fn inc_inflight_gauge(&self, model: &str) {
402 self.inflight_gauge.with_label_values(&[model]).inc()
403 }
404
405 fn dec_inflight_gauge(&self, model: &str) {
406 self.inflight_gauge.with_label_values(&[model]).dec()
407 }
408
409 pub fn inc_client_disconnect(&self) {
411 self.client_disconnect_gauge.inc();
412 }
413
414 pub fn get_client_disconnect_count(&self) -> i64 {
416 self.client_disconnect_gauge.get()
417 }
418
419 fn inc_http_queue_gauge(&self, model: &str) {
420 self.http_queue_gauge.with_label_values(&[model]).inc()
421 }
422
423 fn dec_http_queue_gauge(&self, model: &str) {
424 self.http_queue_gauge.with_label_values(&[model]).dec()
425 }
426
427 pub fn register(&self, registry: &Registry) -> Result<(), prometheus::Error> {
428 registry.register(Box::new(self.request_counter.clone()))?;
429 registry.register(Box::new(self.inflight_gauge.clone()))?;
430 registry.register(Box::new(self.client_disconnect_gauge.clone()))?;
431 registry.register(Box::new(self.http_queue_gauge.clone()))?;
432 registry.register(Box::new(self.request_duration.clone()))?;
433 registry.register(Box::new(self.input_sequence_length.clone()))?;
434 registry.register(Box::new(self.output_sequence_length.clone()))?;
435 registry.register(Box::new(self.time_to_first_token.clone()))?;
436 registry.register(Box::new(self.inter_token_latency.clone()))?;
437
438 registry.register(Box::new(self.model_total_kv_blocks.clone()))?;
440 registry.register(Box::new(self.model_max_num_seqs.clone()))?;
441 registry.register(Box::new(self.model_max_num_batched_tokens.clone()))?;
442 registry.register(Box::new(self.model_context_length.clone()))?;
443 registry.register(Box::new(self.model_kv_cache_block_size.clone()))?;
444 registry.register(Box::new(self.model_migration_limit.clone()))?;
445
446 Ok(())
447 }
448
449 pub fn update_runtime_config_metrics(
452 &self,
453 model_name: &str,
454 runtime_config: &ModelRuntimeConfig,
455 ) {
456 if let Some(total_kv_blocks) = runtime_config.total_kv_blocks {
457 self.model_total_kv_blocks
458 .with_label_values(&[model_name])
459 .set(clamp_u64_to_i64(total_kv_blocks));
460 }
461
462 if let Some(max_num_seqs) = runtime_config.max_num_seqs {
463 self.model_max_num_seqs
464 .with_label_values(&[model_name])
465 .set(clamp_u64_to_i64(max_num_seqs));
466 }
467
468 if let Some(max_batched_tokens) = runtime_config.max_num_batched_tokens {
469 self.model_max_num_batched_tokens
470 .with_label_values(&[model_name])
471 .set(clamp_u64_to_i64(max_batched_tokens));
472 }
473 }
474
475 pub fn update_mdc_metrics(
478 &self,
479 model_name: &str,
480 context_length: u32,
481 kv_cache_block_size: u32,
482 migration_limit: u32,
483 ) {
484 self.model_context_length
485 .with_label_values(&[model_name])
486 .set(context_length as i64);
487
488 self.model_kv_cache_block_size
489 .with_label_values(&[model_name])
490 .set(kv_cache_block_size as i64);
491
492 self.model_migration_limit
493 .with_label_values(&[model_name])
494 .set(migration_limit as i64);
495 }
496
497 pub fn update_metrics_from_model_entry(&self, model_entry: &ModelEntry) {
501 if let Some(runtime_config) = &model_entry.runtime_config {
502 self.update_runtime_config_metrics(&model_entry.name, runtime_config);
503 }
504 }
505
506 pub async fn update_metrics_from_model_entry_with_mdc(
509 &self,
510 model_entry: &ModelEntry,
511 etcd_client: &dynamo_runtime::transports::etcd::Client,
512 ) -> anyhow::Result<()> {
513 if let Some(runtime_config) = &model_entry.runtime_config {
515 self.update_runtime_config_metrics(&model_entry.name, runtime_config);
516 }
517
518 let model_slug = Slug::from_string(&model_entry.name);
520 let store: Box<dyn KeyValueStore> = Box::new(EtcdStorage::new(etcd_client.clone()));
521 let card_store = Arc::new(KeyValueStoreManager::new(store));
522
523 match card_store
524 .load::<ModelDeploymentCard>(MDC_ROOT_PATH, &model_slug)
525 .await
526 {
527 Ok(Some(mdc)) => {
528 self.update_mdc_metrics(
529 &model_entry.name,
530 mdc.context_length,
531 mdc.kv_cache_block_size,
532 mdc.migration_limit,
533 );
534 tracing::debug!(
535 model = %model_entry.name,
536 "Successfully updated MDC metrics"
537 );
538 }
539 Ok(None) => {
540 tracing::debug!(
541 model = %model_entry.name,
542 "No MDC found in storage, skipping MDC metrics"
543 );
544 }
545 Err(e) => {
546 tracing::debug!(
547 model = %model_entry.name,
548 error = %e,
549 "Failed to load MDC for metrics update"
550 );
551 }
552 }
553
554 Ok(())
555 }
556
557 pub fn start_runtime_config_polling_task(
582 metrics: Arc<Self>,
583 manager: Arc<crate::discovery::ModelManager>,
584 etcd_client: Option<dynamo_runtime::transports::etcd::Client>,
585 poll_interval: Duration,
586 cancel_token: tokio_util::sync::CancellationToken,
587 ) -> tokio::task::JoinHandle<()> {
588 tokio::spawn(async move {
589 let mut interval = tokio::time::interval(poll_interval);
590 let mut known_models = std::collections::HashSet::new();
591
592 tracing::info!(
593 interval_secs = poll_interval.as_secs(),
594 "Starting runtime config metrics polling task (metrics never removed)"
595 );
596
597 loop {
598 tokio::select! {
599 _ = cancel_token.cancelled() => {
600 tracing::info!("Runtime config metrics polling task cancelled");
601 break;
602 }
603 _ = interval.tick() => {
604 }
606 }
607
608 let current_entries = manager.get_model_entries();
610 let mut current_models = std::collections::HashSet::new();
611
612 for entry in current_entries {
620 if !current_models.insert(entry.name.clone()) {
622 tracing::debug!(
623 model_name = %entry.name,
624 endpoint = ?entry.endpoint_id,
625 "Skipping duplicate model instance - only first instance config metrics are recorded"
626 );
627 continue;
628 }
629
630 if let Some(runtime_config) = &entry.runtime_config {
632 metrics.update_runtime_config_metrics(&entry.name, runtime_config);
633 }
634
635 if let Some(ref etcd) = etcd_client
637 && let Err(e) = metrics
638 .update_metrics_from_model_entry_with_mdc(&entry, etcd)
639 .await
640 {
641 tracing::debug!(
642 model = %entry.name,
643 error = %e,
644 "Failed to update MDC metrics (this is normal if MDC is not available)"
645 );
646 }
647 }
648
649 known_models.extend(current_models.iter().cloned());
651
652 tracing::trace!(
653 active_models = current_models.len(),
654 total_known_models = known_models.len(),
655 "Updated runtime config metrics for active models"
656 );
657 }
658 })
659 }
660
661 pub fn create_inflight_guard(
674 self: Arc<Self>,
675 model: &str,
676 endpoint: Endpoint,
677 streaming: bool,
678 ) -> InflightGuard {
679 let request_type = if streaming {
680 RequestType::Stream
681 } else {
682 RequestType::Unary
683 };
684
685 InflightGuard::new(
686 self.clone(),
687 model.to_string().to_lowercase(),
688 endpoint,
689 request_type,
690 )
691 }
692
693 pub fn create_response_collector(self: Arc<Self>, model: &str) -> ResponseMetricCollector {
695 ResponseMetricCollector::new(self, model.to_string().to_lowercase())
696 }
697
698 pub fn create_http_queue_guard(self: Arc<Self>, model: &str) -> HttpQueueGuard {
703 HttpQueueGuard::new(self, model.to_string().to_lowercase())
704 }
705}
706
707impl HttpQueueGuard {
708 fn new(metrics: Arc<Metrics>, model: String) -> Self {
709 metrics.inc_http_queue_gauge(&model);
711
712 HttpQueueGuard { metrics, model }
713 }
714}
715
716impl Drop for HttpQueueGuard {
717 fn drop(&mut self) {
718 self.metrics.dec_http_queue_gauge(&self.model);
720 }
721}
722
723impl InflightGuard {
724 fn new(
725 metrics: Arc<Metrics>,
726 model: String,
727 endpoint: Endpoint,
728 request_type: RequestType,
729 ) -> Self {
730 let timer = Instant::now();
732
733 metrics.inc_inflight_gauge(&model);
735
736 InflightGuard {
738 metrics,
739 model,
740 endpoint,
741 request_type,
742 status: Status::Error,
743 timer,
744 }
745 }
746
747 pub(crate) fn mark_ok(&mut self) {
748 self.status = Status::Success;
749 }
750}
751
752impl Drop for InflightGuard {
753 fn drop(&mut self) {
754 let duration = self.timer.elapsed().as_secs_f64();
755
756 self.metrics.dec_inflight_gauge(&self.model);
758
759 self.metrics.inc_request_counter(
763 &self.model,
764 &self.endpoint,
765 &self.request_type,
766 &self.status,
767 );
768
769 self.metrics
771 .request_duration
772 .with_label_values(&[&self.model])
773 .observe(duration);
774 }
775}
776
777impl std::fmt::Display for Endpoint {
778 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
779 match self {
780 Endpoint::Completions => write!(f, "completions"),
781 Endpoint::ChatCompletions => write!(f, "chat_completions"),
782 Endpoint::Embeddings => write!(f, "embeddings"),
783 Endpoint::Responses => write!(f, "responses"),
784 Endpoint::Tensor => write!(f, "tensor"),
785 }
786 }
787}
788
789impl Endpoint {
790 pub fn as_str(&self) -> &'static str {
791 match self {
792 Endpoint::Completions => "completions",
793 Endpoint::ChatCompletions => "chat_completions",
794 Endpoint::Embeddings => "embeddings",
795 Endpoint::Responses => "responses",
796 Endpoint::Tensor => "tensor",
797 }
798 }
799}
800
801impl RequestType {
802 pub fn as_str(&self) -> &'static str {
803 match self {
804 RequestType::Unary => frontend_service::request_type::UNARY,
805 RequestType::Stream => frontend_service::request_type::STREAM,
806 }
807 }
808}
809
810impl Status {
811 pub fn as_str(&self) -> &'static str {
812 match self {
813 Status::Success => frontend_service::status::SUCCESS,
814 Status::Error => frontend_service::status::ERROR,
815 }
816 }
817}
818
819impl ResponseMetricCollector {
820 fn new(metrics: Arc<Metrics>, model: String) -> Self {
821 ResponseMetricCollector {
822 metrics,
823 model,
824 is_first_token: true,
825 last_response_time: None,
826 start_time: Instant::now(),
827 osl: 0,
828 }
829 }
830
831 pub fn observe_current_osl(&mut self, osl: usize) {
833 self.osl = osl;
834 }
835
836 pub fn is_first_token(&self) -> bool {
838 self.is_first_token
839 }
840
841 pub fn observe_response(&mut self, isl: usize, num_tokens: usize) {
843 if num_tokens == 0 {
844 return;
845 }
846
847 if self.is_first_token {
848 self.is_first_token = false;
851
852 let ttft = self.start_time.elapsed().as_secs_f64();
854 self.metrics
855 .time_to_first_token
856 .with_label_values(&[&self.model])
857 .observe(ttft);
858
859 self.metrics
862 .input_sequence_length
863 .with_label_values(&[&self.model])
864 .observe(isl as f64);
865 }
866
867 let current_duration = self.start_time.elapsed();
868
869 if let Some(last_response_time) = self.last_response_time {
870 let response_duration = current_duration - last_response_time;
871 let itl = response_duration.as_secs_f64() / num_tokens as f64;
872 for _ in 0..num_tokens {
873 self.metrics
874 .inter_token_latency
875 .with_label_values(&[&self.model])
876 .observe(itl);
877 }
878 }
879
880 self.last_response_time = Some(current_duration);
881 }
882}
883
884impl Drop for ResponseMetricCollector {
885 fn drop(&mut self) {
886 self.metrics
888 .output_sequence_length
889 .with_label_values(&[&self.model])
890 .observe(self.osl as f64);
891 }
892}
893
894pub fn process_response_and_observe_metrics<T>(
900 annotated: &crate::types::Annotated<T>,
901 response_collector: &mut ResponseMetricCollector,
902 http_queue_guard: &mut Option<HttpQueueGuard>,
903) {
904 use crate::preprocessor::LLMMetricAnnotation;
905
906 if let Ok(Some(metrics)) = LLMMetricAnnotation::from_annotation(annotated) {
908 response_collector.observe_current_osl(metrics.output_tokens);
909
910 if response_collector.is_first_token()
912 && metrics.chunk_tokens > 0
913 && let Some(guard) = http_queue_guard.take()
914 {
915 drop(guard);
916 }
917
918 response_collector.observe_response(metrics.input_tokens, metrics.chunk_tokens);
919 }
920}
921
922pub struct EventConverter<T>(pub crate::types::Annotated<T>);
924
925impl<T> From<crate::types::Annotated<T>> for EventConverter<T> {
926 fn from(annotated: crate::types::Annotated<T>) -> Self {
927 EventConverter(annotated)
928 }
929}
930
931pub fn process_response_using_event_converter_and_observe_metrics<T: Serialize>(
936 annotated: EventConverter<T>,
937 response_collector: &mut ResponseMetricCollector,
938 http_queue_guard: &mut Option<HttpQueueGuard>,
939) -> Result<Event, axum::Error> {
940 use crate::preprocessor::LLMMetricAnnotation;
941
942 let mut annotated = annotated.0;
943
944 if let Ok(Some(metrics)) = LLMMetricAnnotation::from_annotation(&annotated) {
946 response_collector.observe_current_osl(metrics.output_tokens);
947
948 if response_collector.is_first_token()
950 && metrics.chunk_tokens > 0
951 && let Some(guard) = http_queue_guard.take()
952 {
953 drop(guard);
954 }
955
956 response_collector.observe_response(metrics.input_tokens, metrics.chunk_tokens);
957
958 if annotated.event.as_deref() == Some(crate::preprocessor::ANNOTATION_LLM_METRICS) {
961 annotated.event = None;
962 annotated.comment = None;
963 }
964 }
965
966 let mut event = Event::default();
967
968 if let Some(data) = annotated.data {
969 event = event.json_data(data)?;
970 }
971
972 if let Some(msg) = annotated.event {
973 if msg == "error" {
974 let msgs = annotated
975 .comment
976 .unwrap_or_else(|| vec!["unspecified error".to_string()]);
977 return Err(axum::Error::new(msgs.join(" -- ")));
978 }
979 event = event.event(msg);
980 }
981
982 if let Some(comments) = annotated.comment {
983 for comment in comments {
984 event = event.comment(comment);
985 }
986 }
987
988 Ok(event)
989}
990
991pub fn router(registry: Registry, path: Option<String>) -> (Vec<RouteDoc>, Router) {
993 let registry = Arc::new(registry);
994 let path = path.unwrap_or_else(|| "/metrics".to_string());
995 let doc = RouteDoc::new(axum::http::Method::GET, &path);
996 let route = Router::new()
997 .route(&path, get(handler_metrics))
998 .with_state(registry);
999 (vec![doc], route)
1000}
1001
1002async fn handler_metrics(State(registry): State<Arc<Registry>>) -> impl IntoResponse {
1004 let encoder = prometheus::TextEncoder::new();
1005 let metric_families = registry.gather();
1006 let mut buffer = vec![];
1007 if encoder.encode(&metric_families, &mut buffer).is_err() {
1008 return (
1009 StatusCode::INTERNAL_SERVER_ERROR,
1010 "Failed to encode metrics",
1011 )
1012 .into_response();
1013 }
1014
1015 let metrics = match String::from_utf8(buffer) {
1016 Ok(metrics) => metrics,
1017 Err(_) => {
1018 return (
1019 StatusCode::INTERNAL_SERVER_ERROR,
1020 "Failed to encode metrics",
1021 )
1022 .into_response();
1023 }
1024 };
1025
1026 (StatusCode::OK, metrics).into_response()
1027}