dynamo_llm/http/service/
metrics.rs

1// SPDX-FileCopyrightText: Copyright (c) 2024-2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
2// SPDX-License-Identifier: Apache-2.0
3//
4// Licensed under the Apache License, Version 2.0 (the "License");
5// you may not use this file except in compliance with the License.
6// You may obtain a copy of the License at
7//
8// http://www.apache.org/licenses/LICENSE-2.0
9//
10// Unless required by applicable law or agreed to in writing, software
11// distributed under the License is distributed on an "AS IS" BASIS,
12// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13// See the License for the specific language governing permissions and
14// limitations under the License.
15
16use axum::{extract::State, http::StatusCode, response::IntoResponse, routing::get, Router};
17use prometheus::{Encoder, HistogramOpts, HistogramVec, IntCounterVec, IntGaugeVec, Opts};
18use std::{sync::Arc, time::Instant};
19
20pub use prometheus::Registry;
21
22use super::{DeploymentState, RouteDoc};
23
/// Value for the `status` label in the request counter for successful requests
pub const REQUEST_STATUS_SUCCESS: &str = "success";

/// Value for the `status` label in the request counter if the request failed
pub const REQUEST_STATUS_ERROR: &str = "error";

/// Value for the `request_type` label in the request counter for streaming requests
pub const REQUEST_TYPE_STREAM: &str = "stream";

/// Value for the `request_type` label in the request counter for unary requests
pub const REQUEST_TYPE_UNARY: &str = "unary";
35
/// Prometheus collectors recorded by the HTTP service.
///
/// Built by [`Metrics::new`] and added to a [`Registry`] with [`Metrics::register`].
pub struct Metrics {
    /// Requests processed, labelled by `model`, `endpoint`, `request_type` and `status`
    request_counter: IntCounterVec,
    /// Requests currently in flight, labelled by `model`
    inflight_gauge: IntGaugeVec,
    /// Request duration in seconds, labelled by `model`
    request_duration: HistogramVec,
}
41
42/// RAII object for inflight gauge and request counters
43/// If this object is dropped without calling `mark_ok`, then the request will increment
44/// the request counter with the `status` label with [`REQUEST_STATUS_ERROR`]; otherwise, it will increment
45/// the counter with `status` label [`REQUEST_STATUS_SUCCESS`]
46pub struct InflightGuard {
47    metrics: Arc<Metrics>,
48    model: String,
49    endpoint: Endpoint,
50    request_type: RequestType,
51    status: Status,
52    timer: Instant,
53}
54
/// Requests will be logged by the type of endpoint hit
/// This will include llamastack in the future
///
/// The variant is reported through the `endpoint` label of the request counter.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum Endpoint {
    /// OAI Completions
    Completions,

    /// OAI Chat Completions
    ChatCompletions,
}
64
/// Response shape of a request, reported through the `request_type` label
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum RequestType {
    /// SingleIn / SingleOut
    Unary,

    /// SingleIn / ManyOut
    Stream,
}
73
/// Outcome of a request, reported through the `status` label
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum Status {
    /// The request completed successfully
    Success,
    /// The request failed
    Error,
}
79
80impl Default for Metrics {
81    fn default() -> Self {
82        Self::new("nv_llm")
83    }
84}
85
86impl Metrics {
87    /// Create Metrics with the given prefix
88    /// The following metrics will be created:
89    /// - `{prefix}_http_service_requests_total` - IntCounterVec for the total number of requests processed
90    /// - `{prefix}_http_service_inflight_requests` - IntGaugeVec for the number of inflight requests
91    /// - `{prefix}_http_service_request_duration_seconds` - HistogramVec for the duration of requests
92    pub fn new(prefix: &str) -> Self {
93        let request_counter = IntCounterVec::new(
94            Opts::new(
95                format!("{}_http_service_requests_total", prefix),
96                "Total number of LLM requests processed",
97            ),
98            &["model", "endpoint", "request_type", "status"],
99        )
100        .unwrap();
101
102        let inflight_gauge = IntGaugeVec::new(
103            Opts::new(
104                format!("{}_http_service_inflight_requests", prefix),
105                "Number of inflight requests",
106            ),
107            &["model"],
108        )
109        .unwrap();
110
111        let buckets = vec![0.0, 1.0, 2.0, 4.0, 8.0, 16.0, 32.0, 64.0, 128.0, 256.0];
112
113        let request_duration = HistogramVec::new(
114            HistogramOpts::new(
115                format!("{}_http_service_request_duration_seconds", prefix),
116                "Duration of LLM requests",
117            )
118            .buckets(buckets),
119            &["model"],
120        )
121        .unwrap();
122
123        Metrics {
124            request_counter,
125            inflight_gauge,
126            request_duration,
127        }
128    }
129
130    /// Get the number of successful requests for the given dimensions:
131    /// - model
132    /// - endpoint (completions/chat_completions)
133    /// - request type (unary/stream)
134    /// - status (success/error)
135    pub fn get_request_counter(
136        &self,
137        model: &str,
138        endpoint: &Endpoint,
139        request_type: &RequestType,
140        status: &Status,
141    ) -> u64 {
142        self.request_counter
143            .with_label_values(&[
144                model,
145                endpoint.as_str(),
146                request_type.as_str(),
147                status.as_str(),
148            ])
149            .get()
150    }
151
152    /// Increment the counter for requests for the given dimensions:
153    /// - model
154    /// - endpoint (completions/chat_completions)
155    /// - request type (unary/stream)
156    /// - status (success/error)
157    fn inc_request_counter(
158        &self,
159        model: &str,
160        endpoint: &Endpoint,
161        request_type: &RequestType,
162        status: &Status,
163    ) {
164        self.request_counter
165            .with_label_values(&[
166                model,
167                endpoint.as_str(),
168                request_type.as_str(),
169                status.as_str(),
170            ])
171            .inc()
172    }
173
174    /// Get the number if inflight requests for the given model
175    pub fn get_inflight_count(&self, model: &str) -> i64 {
176        self.inflight_gauge.with_label_values(&[model]).get()
177    }
178
179    fn inc_inflight_gauge(&self, model: &str) {
180        self.inflight_gauge.with_label_values(&[model]).inc()
181    }
182
183    fn dec_inflight_gauge(&self, model: &str) {
184        self.inflight_gauge.with_label_values(&[model]).dec()
185    }
186
187    pub fn register(&self, registry: &Registry) -> Result<(), prometheus::Error> {
188        registry.register(Box::new(self.request_counter.clone()))?;
189        registry.register(Box::new(self.inflight_gauge.clone()))?;
190        registry.register(Box::new(self.request_duration.clone()))?;
191        Ok(())
192    }
193}
194
195impl DeploymentState {
196    /// Create a new [`InflightGuard`] for the given model and annotate if its a streaming request,
197    /// and the kind of endpoint that was hit
198    ///
199    /// The [`InflightGuard`] is an RAII object will handle incrementing the inflight gauge and
200    /// request counters.
201    pub fn create_inflight_guard(
202        &self,
203        model: &str,
204        endpoint: Endpoint,
205        streaming: bool,
206    ) -> InflightGuard {
207        let request_type = if streaming {
208            RequestType::Stream
209        } else {
210            RequestType::Unary
211        };
212
213        InflightGuard::new(
214            self.metrics.clone(),
215            model.to_string(),
216            endpoint,
217            request_type,
218        )
219    }
220}
221
222impl InflightGuard {
223    fn new(
224        metrics: Arc<Metrics>,
225        model: String,
226        endpoint: Endpoint,
227        request_type: RequestType,
228    ) -> Self {
229        // Start the timer
230        let timer = Instant::now();
231
232        // Increment the inflight gauge when the guard is created
233        metrics.inc_inflight_gauge(&model);
234
235        // Return the RAII Guard
236        InflightGuard {
237            metrics,
238            model,
239            endpoint,
240            request_type,
241            status: Status::Error,
242            timer,
243        }
244    }
245
246    pub(crate) fn mark_ok(&mut self) {
247        self.status = Status::Success;
248    }
249}
250
251impl Drop for InflightGuard {
252    fn drop(&mut self) {
253        // Decrement the gauge when the guard is dropped
254        self.metrics.dec_inflight_gauge(&self.model);
255
256        // the frequency on incrementing the full request counter is relatively low
257        // if we were incrementing the counter on every forward pass, we'd use static CounterVec or
258        // discrete counter object without the more costly lookup required for the following calls
259        self.metrics.inc_request_counter(
260            &self.model,
261            &self.endpoint,
262            &self.request_type,
263            &self.status,
264        );
265
266        // Record the duration of the request
267        self.metrics
268            .request_duration
269            .with_label_values(&[&self.model])
270            .observe(self.timer.elapsed().as_secs_f64());
271    }
272}
273
274impl std::fmt::Display for Endpoint {
275    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
276        match self {
277            Endpoint::Completions => write!(f, "completions"),
278            Endpoint::ChatCompletions => write!(f, "chat_completions"),
279        }
280    }
281}
282
283impl Endpoint {
284    pub fn as_str(&self) -> &'static str {
285        match self {
286            Endpoint::Completions => "completions",
287            Endpoint::ChatCompletions => "chat_completions",
288        }
289    }
290}
291
292impl RequestType {
293    pub fn as_str(&self) -> &'static str {
294        match self {
295            RequestType::Unary => REQUEST_TYPE_UNARY,
296            RequestType::Stream => REQUEST_TYPE_STREAM,
297        }
298    }
299}
300
301impl Status {
302    pub fn as_str(&self) -> &'static str {
303        match self {
304            Status::Success => REQUEST_STATUS_SUCCESS,
305            Status::Error => REQUEST_STATUS_ERROR,
306        }
307    }
308}
309
310/// Create a new router with the given path
311pub fn router(registry: Registry, path: Option<String>) -> (Vec<RouteDoc>, Router) {
312    let registry = Arc::new(registry);
313    let path = path.unwrap_or_else(|| "/metrics".to_string());
314    let doc = RouteDoc::new(axum::http::Method::GET, &path);
315    let route = Router::new()
316        .route(&path, get(handler_metrics))
317        .with_state(registry);
318    (vec![doc], route)
319}
320
321/// Metrics Handler
322async fn handler_metrics(State(registry): State<Arc<Registry>>) -> impl IntoResponse {
323    let encoder = prometheus::TextEncoder::new();
324    let metric_families = registry.gather();
325    let mut buffer = vec![];
326    if encoder.encode(&metric_families, &mut buffer).is_err() {
327        return (
328            StatusCode::INTERNAL_SERVER_ERROR,
329            "Failed to encode metrics",
330        )
331            .into_response();
332    }
333
334    let metrics = match String::from_utf8(buffer) {
335        Ok(metrics) => metrics,
336        Err(_) => {
337            return (
338                StatusCode::INTERNAL_SERVER_ERROR,
339                "Failed to encode metrics",
340            )
341                .into_response()
342        }
343    };
344
345    (StatusCode::OK, metrics).into_response()
346}