dynamo_llm/http/service/metrics.rs

// SPDX-FileCopyrightText: Copyright (c) 2024-2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
// SPDX-License-Identifier: Apache-2.0

use axum::{
    Router,
    extract::State,
    http::StatusCode,
    response::{IntoResponse, sse::Event},
    routing::get,
};
use dynamo_runtime::metrics::prometheus_names::{
    frontend_service, name_prefix, sanitize_frontend_prometheus_prefix,
};
use prometheus::{Encoder, HistogramOpts, HistogramVec, IntCounterVec, IntGaugeVec, Opts};
use serde::Serialize;
use std::{
    sync::Arc,
    time::{Duration, Instant},
};

use crate::discovery::ModelEntry;
use crate::local_model::runtime_config::ModelRuntimeConfig;
use crate::model_card::{ModelDeploymentCard, ROOT_PATH as MDC_ROOT_PATH};
use dynamo_runtime::metrics::prometheus_names::clamp_u64_to_i64;
use dynamo_runtime::slug::Slug;
use dynamo_runtime::storage::key_value_store::{EtcdStorage, KeyValueStore, KeyValueStoreManager};

pub use prometheus::Registry;

use super::RouteDoc;

pub struct Metrics {
    request_counter: IntCounterVec,
    inflight_gauge: IntGaugeVec,
    client_disconnect_gauge: prometheus::IntGauge,
    http_queue_gauge: IntGaugeVec,
    request_duration: HistogramVec,
    input_sequence_length: HistogramVec,
    output_sequence_length: HistogramVec,
    time_to_first_token: HistogramVec,
    inter_token_latency: HistogramVec,

    // Runtime configuration metrics. Note: Some of these metrics represent counter-like values from
    // source systems, but are implemented as gauges because they are copied/synchronized from upstream
    // counter values rather than being directly incremented.
    model_total_kv_blocks: IntGaugeVec,
    model_max_num_seqs: IntGaugeVec,
    model_max_num_batched_tokens: IntGaugeVec,
    model_context_length: IntGaugeVec,
    model_kv_cache_block_size: IntGaugeVec,
    model_migration_limit: IntGaugeVec,
}

// Inflight tracks requests from HTTP handler start until complete response is finished.
// HTTP queue tracks requests from HTTP handler start until first token generation begins (including prefill time).
// HTTP queue time is a subset of inflight time. For detailed explanation, see:
// deploy/metrics/README.md - "Request Processing Flow" section

/// RAII object for HTTP queue gauge
/// Tracks requests from HTTP handler start until metrics processing begins
pub struct HttpQueueGuard {
    metrics: Arc<Metrics>,
    model: String,
}

/// RAII object for the inflight gauge and request counters
/// If this object is dropped without calling `mark_ok`, the request counter is incremented
/// with the `status` label set to [`frontend_service::status::ERROR`]; otherwise, it is incremented
/// with the `status` label set to [`frontend_service::status::SUCCESS`]
pub struct InflightGuard {
    metrics: Arc<Metrics>,
    model: String,
    endpoint: Endpoint,
    request_type: RequestType,
    status: Status,
    timer: Instant,
}

/// Requests will be logged by the type of endpoint hit
/// This will include llamastack in the future
pub enum Endpoint {
    /// OAI Completions
    Completions,

    /// OAI Chat Completions
    ChatCompletions,

    /// OAI Embeddings
    Embeddings,

    /// OAI Responses
    Responses,

    /// Tensor
    Tensor,
}

/// The type of request handled by the HTTP service
pub enum RequestType {
    /// SingleIn / SingleOut
    Unary,

    /// SingleIn / ManyOut
    Stream,
}

/// Status
#[derive(PartialEq)]
pub enum Status {
    Success,
    Error,
}

/// Track response-specific metrics
pub struct ResponseMetricCollector {
    metrics: Arc<Metrics>,
    model: String,
    start_time: Instant,
    // we use is_first_token to distinguish TTFT from ITL. It is true by default and
    // flipped to false when the first token is returned and TTFT is published.
    is_first_token: bool,
    // we track the last response time so that ITL for the newly returned tokens can
    // be computed.
    last_response_time: Option<Duration>,
    osl: usize,
}

impl Default for Metrics {
    fn default() -> Self {
        Self::new()
    }
}

impl Metrics {
    /// Create Metrics with the standard prefix defined by [`name_prefix::FRONTEND`] or specify a custom prefix via the following environment variable:
    /// - `DYN_METRICS_PREFIX`: Override the default metrics prefix
    ///
    /// The following metrics will be created with the configured prefix:
    /// - `{prefix}_requests_total` - IntCounterVec for the total number of requests processed
    /// - `{prefix}_inflight_requests` - IntGaugeVec for the number of inflight requests
    /// - `{prefix}_request_duration_seconds` - HistogramVec for the duration of requests
    /// - `{prefix}_input_sequence_tokens` - HistogramVec for input sequence length in tokens
    /// - `{prefix}_output_sequence_tokens` - HistogramVec for output sequence length in tokens
    /// - `{prefix}_time_to_first_token_seconds` - HistogramVec for time to first token in seconds
    /// - `{prefix}_inter_token_latency_seconds` - HistogramVec for inter-token latency in seconds
    ///
    /// ## Model Configuration Metrics
    ///
    /// Runtime config metrics (from ModelRuntimeConfig):
    /// - `{prefix}_model_total_kv_blocks` - IntGaugeVec for total KV cache blocks available for a worker serving the model
    /// - `{prefix}_model_max_num_seqs` - IntGaugeVec for maximum sequences for a worker serving the model
    /// - `{prefix}_model_max_num_batched_tokens` - IntGaugeVec for maximum batched tokens for a worker serving the model
    ///
    /// MDC metrics (from ModelDeploymentCard):
    /// - `{prefix}_model_context_length` - IntGaugeVec for maximum context length for a worker serving the model
    /// - `{prefix}_model_kv_cache_block_size` - IntGaugeVec for KV cache block size for a worker serving the model
    /// - `{prefix}_model_migration_limit` - IntGaugeVec for request migration limit for a worker serving the model
    ///
    /// ## Runtime Config Polling Configuration
    ///
    /// The polling behavior can be configured via environment variables:
    /// - `DYN_HTTP_SVC_CONFIG_METRICS_POLL_INTERVAL_SECS`: Poll interval in seconds (must be > 0, supports fractional seconds, defaults to 8)
    ///
    /// Metrics are never removed to preserve historical data. Runtime config and MDC
    /// metrics are updated when models are discovered and their configurations are available.
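    ///
    /// # Example
    ///
    /// An illustrative sketch (not taken from this crate's tests): create the metrics,
    /// register them on a Prometheus registry, and serve them via [`router`].
    ///
    /// ```ignore
    /// let metrics = std::sync::Arc::new(Metrics::new());
    /// let registry = Registry::new();
    /// metrics.register(&registry).expect("collectors registered once per registry");
    /// let (_docs, metrics_route) = router(registry, None); // serves GET /metrics
    /// ```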
    pub fn new() -> Self {
        let raw_prefix = std::env::var(frontend_service::METRICS_PREFIX_ENV)
            .unwrap_or_else(|_| name_prefix::FRONTEND.to_string());
        let prefix = sanitize_frontend_prometheus_prefix(&raw_prefix);
        if prefix != raw_prefix {
            tracing::warn!(
                raw=%raw_prefix,
                sanitized=%prefix,
                env=%frontend_service::METRICS_PREFIX_ENV,
                "Sanitized HTTP metrics prefix"
            );
        }
        let frontend_metric_name = |suffix: &str| format!("{}_{}", &prefix, suffix);

        let request_counter = IntCounterVec::new(
            Opts::new(
                frontend_metric_name(frontend_service::REQUESTS_TOTAL),
                "Total number of LLM requests processed",
            ),
            &["model", "endpoint", "request_type", "status"],
        )
        .unwrap();

        let inflight_gauge = IntGaugeVec::new(
            Opts::new(
                frontend_metric_name(frontend_service::INFLIGHT_REQUESTS_TOTAL),
                "Number of inflight requests",
            ),
            &["model"],
        )
        .unwrap();

        let client_disconnect_gauge = prometheus::IntGauge::new(
            frontend_metric_name("client_disconnects"),
            "Number of connections dropped by clients",
        )
        .unwrap();

        let http_queue_gauge = IntGaugeVec::new(
            Opts::new(
                frontend_metric_name(frontend_service::QUEUED_REQUESTS_TOTAL),
                "Number of requests in HTTP processing queue",
            ),
            &["model"],
        )
        .unwrap();

        let buckets = vec![0.0, 1.0, 2.0, 4.0, 8.0, 16.0, 32.0, 64.0, 128.0, 256.0];

        let request_duration = HistogramVec::new(
            HistogramOpts::new(
                frontend_metric_name(frontend_service::REQUEST_DURATION_SECONDS),
                "Duration of LLM requests",
            )
            .buckets(buckets),
            &["model"],
        )
        .unwrap();

        let input_sequence_length = HistogramVec::new(
            HistogramOpts::new(
                frontend_metric_name(frontend_service::INPUT_SEQUENCE_TOKENS),
                "Input sequence length in tokens",
            )
            .buckets(vec![
                0.0, 50.0, 100.0, 500.0, 1000.0, 2000.0, 4000.0, 8000.0, 16000.0, 32000.0, 64000.0,
                128000.0,
            ]),
            &["model"],
        )
        .unwrap();

        let output_sequence_length = HistogramVec::new(
            HistogramOpts::new(
                frontend_metric_name(frontend_service::OUTPUT_SEQUENCE_TOKENS),
                "Output sequence length in tokens",
            )
            .buckets(vec![
                0.0, 50.0, 100.0, 500.0, 1000.0, 2000.0, 4000.0, 8000.0, 16000.0, 32000.0,
            ]),
            &["model"],
        )
        .unwrap();

        let time_to_first_token = HistogramVec::new(
            HistogramOpts::new(
                frontend_metric_name(frontend_service::TIME_TO_FIRST_TOKEN_SECONDS),
                "Time to first token in seconds",
            )
            .buckets(vec![
                0.0, 0.001, 0.005, 0.01, 0.025, 0.05, 0.1, 0.25, 0.5, 1.0, 2.0, 5.0, 10.0, 30.0,
                60.0, 120.0, 240.0, 480.0,
            ]),
            &["model"],
        )
        .unwrap();

        let inter_token_latency = HistogramVec::new(
            HistogramOpts::new(
                frontend_metric_name(frontend_service::INTER_TOKEN_LATENCY_SECONDS),
                "Inter-token latency in seconds",
            )
            .buckets(vec![
                0.0, 0.001, 0.005, 0.01, 0.015, 0.02, 0.025, 0.05, 0.1, 0.25, 0.5, 1.0, 2.0,
            ]),
            &["model"],
        )
        .unwrap();

        // Runtime configuration metrics
        // Note: Some of these metrics represent counter-like values from source systems,
        // but are implemented as gauges because they are copied/synchronized from upstream
        // counter values rather than being directly incremented.
        let model_total_kv_blocks = IntGaugeVec::new(
            Opts::new(
                frontend_metric_name(frontend_service::MODEL_TOTAL_KV_BLOCKS),
                "Total KV cache blocks available for a worker serving the model",
            ),
            &["model"],
        )
        .unwrap();

        let model_max_num_seqs = IntGaugeVec::new(
            Opts::new(
                frontend_metric_name(frontend_service::MODEL_MAX_NUM_SEQS),
                "Maximum number of sequences for a worker serving the model",
            ),
            &["model"],
        )
        .unwrap();

        let model_max_num_batched_tokens = IntGaugeVec::new(
            Opts::new(
                frontend_metric_name(frontend_service::MODEL_MAX_NUM_BATCHED_TOKENS),
                "Maximum number of batched tokens for a worker serving the model",
            ),
            &["model"],
        )
        .unwrap();

        let model_context_length = IntGaugeVec::new(
            Opts::new(
                frontend_metric_name(frontend_service::MODEL_CONTEXT_LENGTH),
                "Maximum context length in tokens for a worker serving the model",
            ),
            &["model"],
        )
        .unwrap();

        let model_kv_cache_block_size = IntGaugeVec::new(
            Opts::new(
                frontend_metric_name(frontend_service::MODEL_KV_CACHE_BLOCK_SIZE),
                "KV cache block size in tokens for a worker serving the model",
            ),
            &["model"],
        )
        .unwrap();

        let model_migration_limit = IntGaugeVec::new(
            Opts::new(
                frontend_metric_name(frontend_service::MODEL_MIGRATION_LIMIT),
                "Maximum number of request migrations allowed for the model",
            ),
            &["model"],
        )
        .unwrap();

        Metrics {
            request_counter,
            inflight_gauge,
            client_disconnect_gauge,
            http_queue_gauge,
            request_duration,
            input_sequence_length,
            output_sequence_length,
            time_to_first_token,
            inter_token_latency,
            model_total_kv_blocks,
            model_max_num_seqs,
            model_max_num_batched_tokens,
            model_context_length,
            model_kv_cache_block_size,
            model_migration_limit,
        }
    }

    /// Get the number of requests recorded for the given dimensions:
    /// - model
    /// - endpoint (completions/chat_completions)
    /// - request type (unary/stream)
    /// - status (success/error)
    pub fn get_request_counter(
        &self,
        model: &str,
        endpoint: &Endpoint,
        request_type: &RequestType,
        status: &Status,
    ) -> u64 {
        self.request_counter
            .with_label_values(&[
                model,
                endpoint.as_str(),
                request_type.as_str(),
                status.as_str(),
            ])
            .get()
    }

    /// Increment the counter for requests for the given dimensions:
    /// - model
    /// - endpoint (completions/chat_completions)
    /// - request type (unary/stream)
    /// - status (success/error)
    fn inc_request_counter(
        &self,
        model: &str,
        endpoint: &Endpoint,
        request_type: &RequestType,
        status: &Status,
    ) {
        self.request_counter
            .with_label_values(&[
                model,
                endpoint.as_str(),
                request_type.as_str(),
                status.as_str(),
            ])
            .inc()
    }

    /// Get the number of inflight requests for the given model
    pub fn get_inflight_count(&self, model: &str) -> i64 {
        self.inflight_gauge.with_label_values(&[model]).get()
    }

    fn inc_inflight_gauge(&self, model: &str) {
        self.inflight_gauge.with_label_values(&[model]).inc()
    }

    fn dec_inflight_gauge(&self, model: &str) {
        self.inflight_gauge.with_label_values(&[model]).dec()
    }

    /// Increment the gauge for client disconnections
    pub fn inc_client_disconnect(&self) {
        self.client_disconnect_gauge.inc();
    }

    /// Get the count of client disconnections
    pub fn get_client_disconnect_count(&self) -> i64 {
        self.client_disconnect_gauge.get()
    }

    fn inc_http_queue_gauge(&self, model: &str) {
        self.http_queue_gauge.with_label_values(&[model]).inc()
    }

    fn dec_http_queue_gauge(&self, model: &str) {
        self.http_queue_gauge.with_label_values(&[model]).dec()
    }

    pub fn register(&self, registry: &Registry) -> Result<(), prometheus::Error> {
        registry.register(Box::new(self.request_counter.clone()))?;
        registry.register(Box::new(self.inflight_gauge.clone()))?;
        registry.register(Box::new(self.client_disconnect_gauge.clone()))?;
        registry.register(Box::new(self.http_queue_gauge.clone()))?;
        registry.register(Box::new(self.request_duration.clone()))?;
        registry.register(Box::new(self.input_sequence_length.clone()))?;
        registry.register(Box::new(self.output_sequence_length.clone()))?;
        registry.register(Box::new(self.time_to_first_token.clone()))?;
        registry.register(Box::new(self.inter_token_latency.clone()))?;

        // Register runtime configuration metrics
        registry.register(Box::new(self.model_total_kv_blocks.clone()))?;
        registry.register(Box::new(self.model_max_num_seqs.clone()))?;
        registry.register(Box::new(self.model_max_num_batched_tokens.clone()))?;
        registry.register(Box::new(self.model_context_length.clone()))?;
        registry.register(Box::new(self.model_kv_cache_block_size.clone()))?;
        registry.register(Box::new(self.model_migration_limit.clone()))?;

        Ok(())
    }

    /// Update runtime configuration metrics for a model
    /// This should be called when model runtime configuration is available or updated
    pub fn update_runtime_config_metrics(
        &self,
        model_name: &str,
        runtime_config: &ModelRuntimeConfig,
    ) {
        if let Some(total_kv_blocks) = runtime_config.total_kv_blocks {
            self.model_total_kv_blocks
                .with_label_values(&[model_name])
                .set(clamp_u64_to_i64(total_kv_blocks));
        }

        if let Some(max_num_seqs) = runtime_config.max_num_seqs {
            self.model_max_num_seqs
                .with_label_values(&[model_name])
                .set(clamp_u64_to_i64(max_num_seqs));
        }

        if let Some(max_batched_tokens) = runtime_config.max_num_batched_tokens {
            self.model_max_num_batched_tokens
                .with_label_values(&[model_name])
                .set(clamp_u64_to_i64(max_batched_tokens));
        }
    }

    /// Update model deployment card metrics for a model
    /// This should be called when model deployment card information is available
    pub fn update_mdc_metrics(
        &self,
        model_name: &str,
        context_length: u32,
        kv_cache_block_size: u32,
        migration_limit: u32,
    ) {
        self.model_context_length
            .with_label_values(&[model_name])
            .set(context_length as i64);

        self.model_kv_cache_block_size
            .with_label_values(&[model_name])
            .set(kv_cache_block_size as i64);

        self.model_migration_limit
            .with_label_values(&[model_name])
            .set(migration_limit as i64);
    }

    /// Update metrics from a ModelEntry
    /// This is a convenience method that extracts runtime config from a ModelEntry
    /// and updates the appropriate metrics
    pub fn update_metrics_from_model_entry(&self, model_entry: &ModelEntry) {
        if let Some(runtime_config) = &model_entry.runtime_config {
            self.update_runtime_config_metrics(&model_entry.name, runtime_config);
        }
    }

    /// Update metrics from a ModelEntry and its ModelDeploymentCard
    /// This updates both runtime config metrics and MDC-specific metrics
    pub async fn update_metrics_from_model_entry_with_mdc(
        &self,
        model_entry: &ModelEntry,
        etcd_client: &dynamo_runtime::transports::etcd::Client,
    ) -> anyhow::Result<()> {
        // Update runtime config metrics
        if let Some(runtime_config) = &model_entry.runtime_config {
            self.update_runtime_config_metrics(&model_entry.name, runtime_config);
        }

        // Load and update MDC metrics
        let model_slug = Slug::from_string(&model_entry.name);
        let store: Box<dyn KeyValueStore> = Box::new(EtcdStorage::new(etcd_client.clone()));
        let card_store = Arc::new(KeyValueStoreManager::new(store));

        match card_store
            .load::<ModelDeploymentCard>(MDC_ROOT_PATH, &model_slug)
            .await
        {
            Ok(Some(mdc)) => {
                self.update_mdc_metrics(
                    &model_entry.name,
                    mdc.context_length,
                    mdc.kv_cache_block_size,
                    mdc.migration_limit,
                );
                tracing::debug!(
                    model = %model_entry.name,
                    "Successfully updated MDC metrics"
                );
            }
            Ok(None) => {
                tracing::debug!(
                    model = %model_entry.name,
                    "No MDC found in storage, skipping MDC metrics"
                );
            }
            Err(e) => {
                tracing::debug!(
                    model = %model_entry.name,
                    error = %e,
                    "Failed to load MDC for metrics update"
                );
            }
        }

        Ok(())
    }

    /// Start a background task that periodically updates runtime config metrics
    ///
    /// ## Why Polling is Required
    ///
    /// Polling is necessary because new models may come online at any time through the distributed
    /// discovery system. The ModelManager is continuously updated as workers register/deregister
    /// with etcd, and we need to periodically check for these changes to expose their metrics.
    ///
    /// ## Behavior
    ///
    /// - Polls the ModelManager for current models and updates metrics accordingly
    /// - Models are never removed from metrics to preserve historical data
    /// - If multiple model instances have the same name, only the first instance's metrics are used
    /// - Subsequent instances with duplicate names will be skipped
    ///
    /// ## MDC (Model Deployment Card) Behavior
    ///
    /// Currently, we don't overwrite an MDC. The first worker to start wins, and we assume
    /// that all other workers claiming to serve that model really are using the same configuration.
    /// Later, every worker will have its own MDC, and the frontend will validate that they
    /// checksum the same. For right now, you can assume they have the same MDC, because
    /// they aren't allowed to change it.
    ///
    /// The task will run until the provided cancellation token is cancelled.
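    ///
    /// # Example
    ///
    /// Illustrative sketch; assumes an `Arc<Metrics>`, an `Arc<ModelManager>`, and a
    /// `CancellationToken` are already available in the caller's context.
    ///
    /// ```ignore
    /// let handle = Metrics::start_runtime_config_polling_task(
    ///     metrics.clone(),
    ///     model_manager.clone(),
    ///     None,                   // without an etcd client, MDC metrics are skipped
    ///     Duration::from_secs(8), // see DYN_HTTP_SVC_CONFIG_METRICS_POLL_INTERVAL_SECS
    ///     cancel_token.clone(),
    /// );
    /// // ... later, shut the task down ...
    /// cancel_token.cancel();
    /// handle.await.ok();
    /// ```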
    pub fn start_runtime_config_polling_task(
        metrics: Arc<Self>,
        manager: Arc<crate::discovery::ModelManager>,
        etcd_client: Option<dynamo_runtime::transports::etcd::Client>,
        poll_interval: Duration,
        cancel_token: tokio_util::sync::CancellationToken,
    ) -> tokio::task::JoinHandle<()> {
        tokio::spawn(async move {
            let mut interval = tokio::time::interval(poll_interval);
            let mut known_models = std::collections::HashSet::new();

            tracing::info!(
                interval_secs = poll_interval.as_secs(),
                "Starting runtime config metrics polling task (metrics never removed)"
            );

            loop {
                tokio::select! {
                    _ = cancel_token.cancelled() => {
                        tracing::info!("Runtime config metrics polling task cancelled");
                        break;
                    }
                    _ = interval.tick() => {
                        // Continue with polling logic
                    }
                }

                // Get current model entries from the manager
                let current_entries = manager.get_model_entries();
                let mut current_models = std::collections::HashSet::new();

                // Note: If multiple model instances have the same name, only the first instance's config metrics are recorded.
                // Subsequent instances with duplicate names will be skipped for config updates.
                // This is based on the assumption that all workers serving the same model have identical
                // configuration values (MDC content, runtime config, etc.). This assumption holds because
                // workers are not allowed to change their configuration after registration.

                // Update configuration metrics for current models
                for entry in current_entries {
                    // Skip config processing if we've already seen this model name
                    if !current_models.insert(entry.name.clone()) {
                        tracing::debug!(
                            model_name = %entry.name,
                            endpoint = ?entry.endpoint_id,
                            "Skipping duplicate model instance - only first instance config metrics are recorded"
                        );
                        continue;
                    }

                    // Update runtime config metrics if available
                    if let Some(runtime_config) = &entry.runtime_config {
                        metrics.update_runtime_config_metrics(&entry.name, runtime_config);
                    }

                    // Optionally load MDC for additional metrics if etcd is available
                    if let Some(ref etcd) = etcd_client
                        && let Err(e) = metrics
                            .update_metrics_from_model_entry_with_mdc(&entry, etcd)
                            .await
                    {
                        tracing::debug!(
                            model = %entry.name,
                            error = %e,
                            "Failed to update MDC metrics (this is normal if MDC is not available)"
                        );
                    }
                }

                // Update our known models set
                known_models.extend(current_models.iter().cloned());

                tracing::trace!(
                    active_models = current_models.len(),
                    total_known_models = known_models.len(),
                    "Updated runtime config metrics for active models"
                );
            }
        })
    }

    /// Create a new [`InflightGuard`] for the given model, annotated with whether the request is
    /// streaming and the kind of endpoint that was hit
    ///
    /// The [`InflightGuard`] is an RAII object that handles incrementing the inflight gauge and
    /// request counters.
    ///
    /// # Metrics Distinction
    ///
    /// This method creates an inflight guard that tracks requests actively being processed by the LLM engine.
    /// This is distinct from [`HttpQueueGuard`], which tracks requests from HTTP handler start until
    /// first token generation (including prefill time). The separation allows monitoring both HTTP processing
    /// queue time and actual LLM processing time.
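    ///
    /// # Example
    ///
    /// Illustrative sketch; `metrics` is assumed to be an `Arc<Metrics>` held by the handler.
    ///
    /// ```ignore
    /// let mut inflight_guard = metrics
    ///     .clone()
    ///     .create_inflight_guard("my-model", Endpoint::ChatCompletions, /* streaming */ true);
    /// // ... drive the request to completion ...
    /// inflight_guard.mark_ok(); // without this, dropping the guard records status=error
    /// ```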
    pub fn create_inflight_guard(
        self: Arc<Self>,
        model: &str,
        endpoint: Endpoint,
        streaming: bool,
    ) -> InflightGuard {
        let request_type = if streaming {
            RequestType::Stream
        } else {
            RequestType::Unary
        };

        InflightGuard::new(
            self.clone(),
            model.to_string().to_lowercase(),
            endpoint,
            request_type,
        )
    }

    /// Create a new [`ResponseMetricCollector`] for collecting per-response metrics (e.g., TTFT, ITL)
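    ///
    /// # Example
    ///
    /// Illustrative sketch; the token counts are assumed to come from the engine's responses.
    ///
    /// ```ignore
    /// let mut collector = metrics.clone().create_response_collector("my-model");
    /// collector.observe_response(isl, new_tokens); // first call records TTFT and ISL, later calls record ITL
    /// collector.observe_current_osl(total_output_tokens); // final OSL is published when the collector drops
    /// ```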
    pub fn create_response_collector(self: Arc<Self>, model: &str) -> ResponseMetricCollector {
        ResponseMetricCollector::new(self, model.to_string().to_lowercase())
    }

    /// Create a new [`HttpQueueGuard`] for tracking HTTP processing queue
    ///
    /// This guard tracks requests from HTTP handler start until first token generation,
    /// providing visibility into HTTP processing queue time before actual LLM processing begins.
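    ///
    /// # Example
    ///
    /// Illustrative sketch; `metrics` is assumed to be an `Arc<Metrics>`.
    ///
    /// ```ignore
    /// let http_queue_guard = Some(metrics.clone().create_http_queue_guard("my-model"));
    /// // The gauge is decremented when the guard is dropped, typically on the first generated token.
    /// ```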
    pub fn create_http_queue_guard(self: Arc<Self>, model: &str) -> HttpQueueGuard {
        HttpQueueGuard::new(self, model.to_string().to_lowercase())
    }
}

impl HttpQueueGuard {
    fn new(metrics: Arc<Metrics>, model: String) -> Self {
        // Increment the HTTP queue gauge when the guard is created
        metrics.inc_http_queue_gauge(&model);

        HttpQueueGuard { metrics, model }
    }
}

impl Drop for HttpQueueGuard {
    fn drop(&mut self) {
        // Decrement the HTTP queue gauge when the guard is dropped
        self.metrics.dec_http_queue_gauge(&self.model);
    }
}

impl InflightGuard {
    fn new(
        metrics: Arc<Metrics>,
        model: String,
        endpoint: Endpoint,
        request_type: RequestType,
    ) -> Self {
        // Start the timer
        let timer = Instant::now();

        // Increment the inflight gauge when the guard is created
        metrics.inc_inflight_gauge(&model);

        // Return the RAII Guard
        InflightGuard {
            metrics,
            model,
            endpoint,
            request_type,
            status: Status::Error,
            timer,
        }
    }

    pub(crate) fn mark_ok(&mut self) {
        self.status = Status::Success;
    }
}

impl Drop for InflightGuard {
    fn drop(&mut self) {
        let duration = self.timer.elapsed().as_secs_f64();

        // Decrement the gauge when the guard is dropped
        self.metrics.dec_inflight_gauge(&self.model);

        // The frequency of incrementing the full request counter is relatively low.
        // If we were incrementing the counter on every forward pass, we'd use a static CounterVec or
        // a discrete counter object to avoid the more costly label lookup required for the calls below.
        self.metrics.inc_request_counter(
            &self.model,
            &self.endpoint,
            &self.request_type,
            &self.status,
        );

        // Record the duration of the request
        self.metrics
            .request_duration
            .with_label_values(&[&self.model])
            .observe(duration);
    }
}

impl std::fmt::Display for Endpoint {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            Endpoint::Completions => write!(f, "completions"),
            Endpoint::ChatCompletions => write!(f, "chat_completions"),
            Endpoint::Embeddings => write!(f, "embeddings"),
            Endpoint::Responses => write!(f, "responses"),
            Endpoint::Tensor => write!(f, "tensor"),
        }
    }
}

impl Endpoint {
    pub fn as_str(&self) -> &'static str {
        match self {
            Endpoint::Completions => "completions",
            Endpoint::ChatCompletions => "chat_completions",
            Endpoint::Embeddings => "embeddings",
            Endpoint::Responses => "responses",
            Endpoint::Tensor => "tensor",
        }
    }
}

impl RequestType {
    pub fn as_str(&self) -> &'static str {
        match self {
            RequestType::Unary => frontend_service::request_type::UNARY,
            RequestType::Stream => frontend_service::request_type::STREAM,
        }
    }
}

impl Status {
    pub fn as_str(&self) -> &'static str {
        match self {
            Status::Success => frontend_service::status::SUCCESS,
            Status::Error => frontend_service::status::ERROR,
        }
    }
}

impl ResponseMetricCollector {
    fn new(metrics: Arc<Metrics>, model: String) -> Self {
        ResponseMetricCollector {
            metrics,
            model,
            is_first_token: true,
            last_response_time: None,
            start_time: Instant::now(),
            osl: 0,
        }
    }

    /// Observe the current output sequence length
    pub fn observe_current_osl(&mut self, osl: usize) {
        self.osl = osl;
    }

    /// Check if this will be the first token (before calling observe_response)
    pub fn is_first_token(&self) -> bool {
        self.is_first_token
    }

    /// Observe a response with input sequence length and number of new tokens
    pub fn observe_response(&mut self, isl: usize, num_tokens: usize) {
        if num_tokens == 0 {
            return;
        }

        if self.is_first_token {
            // NOTE: when there are multiple tokens in the first response,
            // we use the full response time as TTFT and ignore the ITL
            self.is_first_token = false;

            // Publish TTFT
            let ttft = self.start_time.elapsed().as_secs_f64();
            self.metrics
                .time_to_first_token
                .with_label_values(&[&self.model])
                .observe(ttft);

            // Publish ISL
            // TODO: publish ISL as soon as the tokenization process completes
            self.metrics
                .input_sequence_length
                .with_label_values(&[&self.model])
                .observe(isl as f64);
        }

        let current_duration = self.start_time.elapsed();

        if let Some(last_response_time) = self.last_response_time {
            let response_duration = current_duration - last_response_time;
            let itl = response_duration.as_secs_f64() / num_tokens as f64;
            for _ in 0..num_tokens {
                self.metrics
                    .inter_token_latency
                    .with_label_values(&[&self.model])
                    .observe(itl);
            }
        }

        self.last_response_time = Some(current_duration);
    }
}

impl Drop for ResponseMetricCollector {
    fn drop(&mut self) {
        // Publish final OSL when the collector is dropped
        self.metrics
            .output_sequence_length
            .with_label_values(&[&self.model])
            .observe(self.osl as f64);
    }
}

/// Process streaming metrics for annotated responses
///
/// This function handles metrics collection and http_queue_guard management for streaming responses.
/// It observes the current output sequence length, drops the http_queue_guard on the first token,
/// and records response metrics.
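///
/// # Example
///
/// Illustrative sketch; assumes a stream of `Annotated<T>` items and the guards created above.
///
/// ```ignore
/// while let Some(annotated) = stream.next().await {
///     process_response_and_observe_metrics(&annotated, &mut response_collector, &mut http_queue_guard);
///     // ... forward `annotated` to the client ...
/// }
/// ```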
pub fn process_response_and_observe_metrics<T>(
    annotated: &crate::types::Annotated<T>,
    response_collector: &mut ResponseMetricCollector,
    http_queue_guard: &mut Option<HttpQueueGuard>,
) {
    use crate::preprocessor::LLMMetricAnnotation;

    // update metrics
    if let Ok(Some(metrics)) = LLMMetricAnnotation::from_annotation(annotated) {
        response_collector.observe_current_osl(metrics.output_tokens);

        // Drop http_queue_guard on first token for non-streaming (same as streaming)
        if response_collector.is_first_token()
            && metrics.chunk_tokens > 0
            && let Some(guard) = http_queue_guard.take()
        {
            drop(guard);
        }

        response_collector.observe_response(metrics.input_tokens, metrics.chunk_tokens);
    }
}

/// Event converter wrapper for streaming responses
pub struct EventConverter<T>(pub crate::types::Annotated<T>);

impl<T> From<crate::types::Annotated<T>> for EventConverter<T> {
    fn from(annotated: crate::types::Annotated<T>) -> Self {
        EventConverter(annotated)
    }
}

/// Process streaming response with event conversion for SSE
///
/// This function handles metrics collection, http_queue_guard management, and converts
/// annotated responses to SSE events for streaming responses.
pub fn process_response_using_event_converter_and_observe_metrics<T: Serialize>(
    annotated: EventConverter<T>,
    response_collector: &mut ResponseMetricCollector,
    http_queue_guard: &mut Option<HttpQueueGuard>,
) -> Result<Event, axum::Error> {
    use crate::preprocessor::LLMMetricAnnotation;

    let mut annotated = annotated.0;

    // update metrics
    if let Ok(Some(metrics)) = LLMMetricAnnotation::from_annotation(&annotated) {
        response_collector.observe_current_osl(metrics.output_tokens);

        // Drop http_queue_guard on first token for streaming
        if response_collector.is_first_token()
            && metrics.chunk_tokens > 0
            && let Some(guard) = http_queue_guard.take()
        {
            drop(guard);
        }

        response_collector.observe_response(metrics.input_tokens, metrics.chunk_tokens);

        // Chomp the LLMMetricAnnotation so it's not returned in the response stream
        // TODO: add a flag to control what is returned in the SSE stream
        if annotated.event.as_deref() == Some(crate::preprocessor::ANNOTATION_LLM_METRICS) {
            annotated.event = None;
            annotated.comment = None;
        }
    }

    let mut event = Event::default();

    if let Some(data) = annotated.data {
        event = event.json_data(data)?;
    }

    if let Some(msg) = annotated.event {
        if msg == "error" {
            let msgs = annotated
                .comment
                .unwrap_or_else(|| vec!["unspecified error".to_string()]);
            return Err(axum::Error::new(msgs.join(" -- ")));
        }
        event = event.event(msg);
    }

    if let Some(comments) = annotated.comment {
        for comment in comments {
            event = event.comment(comment);
        }
    }

    Ok(event)
}

/// Create a new router with the given path
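///
/// # Example
///
/// Illustrative sketch; the registry is assumed to be the one the service's [`Metrics`] were registered on.
///
/// ```ignore
/// let (route_docs, metrics_router) = router(registry, Some("/metrics".to_string()));
/// let app = axum::Router::new().merge(metrics_router);
/// ```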
pub fn router(registry: Registry, path: Option<String>) -> (Vec<RouteDoc>, Router) {
    let registry = Arc::new(registry);
    let path = path.unwrap_or_else(|| "/metrics".to_string());
    let doc = RouteDoc::new(axum::http::Method::GET, &path);
    let route = Router::new()
        .route(&path, get(handler_metrics))
        .with_state(registry);
    (vec![doc], route)
}

/// Metrics Handler
async fn handler_metrics(State(registry): State<Arc<Registry>>) -> impl IntoResponse {
    let encoder = prometheus::TextEncoder::new();
    let metric_families = registry.gather();
    let mut buffer = vec![];
    if encoder.encode(&metric_families, &mut buffer).is_err() {
        return (
            StatusCode::INTERNAL_SERVER_ERROR,
            "Failed to encode metrics",
        )
            .into_response();
    }

    let metrics = match String::from_utf8(buffer) {
        Ok(metrics) => metrics,
        Err(_) => {
            return (
                StatusCode::INTERNAL_SERVER_ERROR,
                "Failed to encode metrics",
            )
                .into_response();
        }
    };

    (StatusCode::OK, metrics).into_response()
}