1use axum::{extract::State, http::StatusCode, response::IntoResponse, routing::get, Router};
17use prometheus::{Encoder, HistogramOpts, HistogramVec, IntCounterVec, IntGaugeVec, Opts};
18use std::{sync::Arc, time::Instant};
19
20pub use prometheus::Registry;
21
22use super::{DeploymentState, RouteDoc};
23
/// `request_type` label value for unary (non-streaming) requests.
pub const REQUEST_TYPE_UNARY: &str = "unary";

/// `request_type` label value for streaming requests.
pub const REQUEST_TYPE_STREAM: &str = "stream";

/// `status` label value for requests that completed successfully.
pub const REQUEST_STATUS_SUCCESS: &str = "success";

/// `status` label value for requests that failed or were never marked successful.
pub const REQUEST_STATUS_ERROR: &str = "error";
35
36pub struct Metrics {
37 request_counter: IntCounterVec,
38 inflight_gauge: IntGaugeVec,
39 request_duration: HistogramVec,
40}
41
42pub struct InflightGuard {
47 metrics: Arc<Metrics>,
48 model: String,
49 endpoint: Endpoint,
50 request_type: RequestType,
51 status: Status,
52 timer: Instant,
53}
54
/// HTTP endpoint a request was made against; used as the `endpoint` metric label.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum Endpoint {
    /// The completions endpoint (label `completions`).
    Completions,

    /// The chat completions endpoint (label `chat_completions`).
    ChatCompletions,
}
64
/// Whether a request was answered in one response or streamed; used as the
/// `request_type` metric label.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum RequestType {
    /// Non-streaming request (label `unary`).
    Unary,

    /// Streaming request (label `stream`).
    Stream,
}
73
/// Outcome of a request; used as the `status` metric label.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum Status {
    /// The request completed successfully (label `success`).
    Success,
    /// The request failed or was never marked successful (label `error`).
    Error,
}
79
80impl Default for Metrics {
81 fn default() -> Self {
82 Self::new("nv_llm")
83 }
84}
85
86impl Metrics {
87 pub fn new(prefix: &str) -> Self {
93 let request_counter = IntCounterVec::new(
94 Opts::new(
95 format!("{}_http_service_requests_total", prefix),
96 "Total number of LLM requests processed",
97 ),
98 &["model", "endpoint", "request_type", "status"],
99 )
100 .unwrap();
101
102 let inflight_gauge = IntGaugeVec::new(
103 Opts::new(
104 format!("{}_http_service_inflight_requests", prefix),
105 "Number of inflight requests",
106 ),
107 &["model"],
108 )
109 .unwrap();
110
111 let buckets = vec![0.0, 1.0, 2.0, 4.0, 8.0, 16.0, 32.0, 64.0, 128.0, 256.0];
112
113 let request_duration = HistogramVec::new(
114 HistogramOpts::new(
115 format!("{}_http_service_request_duration_seconds", prefix),
116 "Duration of LLM requests",
117 )
118 .buckets(buckets),
119 &["model"],
120 )
121 .unwrap();
122
123 Metrics {
124 request_counter,
125 inflight_gauge,
126 request_duration,
127 }
128 }
129
130 pub fn get_request_counter(
136 &self,
137 model: &str,
138 endpoint: &Endpoint,
139 request_type: &RequestType,
140 status: &Status,
141 ) -> u64 {
142 self.request_counter
143 .with_label_values(&[
144 model,
145 endpoint.as_str(),
146 request_type.as_str(),
147 status.as_str(),
148 ])
149 .get()
150 }
151
152 fn inc_request_counter(
158 &self,
159 model: &str,
160 endpoint: &Endpoint,
161 request_type: &RequestType,
162 status: &Status,
163 ) {
164 self.request_counter
165 .with_label_values(&[
166 model,
167 endpoint.as_str(),
168 request_type.as_str(),
169 status.as_str(),
170 ])
171 .inc()
172 }
173
174 pub fn get_inflight_count(&self, model: &str) -> i64 {
176 self.inflight_gauge.with_label_values(&[model]).get()
177 }
178
179 fn inc_inflight_gauge(&self, model: &str) {
180 self.inflight_gauge.with_label_values(&[model]).inc()
181 }
182
183 fn dec_inflight_gauge(&self, model: &str) {
184 self.inflight_gauge.with_label_values(&[model]).dec()
185 }
186
187 pub fn register(&self, registry: &Registry) -> Result<(), prometheus::Error> {
188 registry.register(Box::new(self.request_counter.clone()))?;
189 registry.register(Box::new(self.inflight_gauge.clone()))?;
190 registry.register(Box::new(self.request_duration.clone()))?;
191 Ok(())
192 }
193}
194
195impl DeploymentState {
196 pub fn create_inflight_guard(
202 &self,
203 model: &str,
204 endpoint: Endpoint,
205 streaming: bool,
206 ) -> InflightGuard {
207 let request_type = if streaming {
208 RequestType::Stream
209 } else {
210 RequestType::Unary
211 };
212
213 InflightGuard::new(
214 self.metrics.clone(),
215 model.to_string(),
216 endpoint,
217 request_type,
218 )
219 }
220}
221
222impl InflightGuard {
223 fn new(
224 metrics: Arc<Metrics>,
225 model: String,
226 endpoint: Endpoint,
227 request_type: RequestType,
228 ) -> Self {
229 let timer = Instant::now();
231
232 metrics.inc_inflight_gauge(&model);
234
235 InflightGuard {
237 metrics,
238 model,
239 endpoint,
240 request_type,
241 status: Status::Error,
242 timer,
243 }
244 }
245
246 pub(crate) fn mark_ok(&mut self) {
247 self.status = Status::Success;
248 }
249}
250
251impl Drop for InflightGuard {
252 fn drop(&mut self) {
253 self.metrics.dec_inflight_gauge(&self.model);
255
256 self.metrics.inc_request_counter(
260 &self.model,
261 &self.endpoint,
262 &self.request_type,
263 &self.status,
264 );
265
266 self.metrics
268 .request_duration
269 .with_label_values(&[&self.model])
270 .observe(self.timer.elapsed().as_secs_f64());
271 }
272}
273
274impl std::fmt::Display for Endpoint {
275 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
276 match self {
277 Endpoint::Completions => write!(f, "completions"),
278 Endpoint::ChatCompletions => write!(f, "chat_completions"),
279 }
280 }
281}
282
283impl Endpoint {
284 pub fn as_str(&self) -> &'static str {
285 match self {
286 Endpoint::Completions => "completions",
287 Endpoint::ChatCompletions => "chat_completions",
288 }
289 }
290}
291
292impl RequestType {
293 pub fn as_str(&self) -> &'static str {
294 match self {
295 RequestType::Unary => REQUEST_TYPE_UNARY,
296 RequestType::Stream => REQUEST_TYPE_STREAM,
297 }
298 }
299}
300
301impl Status {
302 pub fn as_str(&self) -> &'static str {
303 match self {
304 Status::Success => REQUEST_STATUS_SUCCESS,
305 Status::Error => REQUEST_STATUS_ERROR,
306 }
307 }
308}
309
310pub fn router(registry: Registry, path: Option<String>) -> (Vec<RouteDoc>, Router) {
312 let registry = Arc::new(registry);
313 let path = path.unwrap_or_else(|| "/metrics".to_string());
314 let doc = RouteDoc::new(axum::http::Method::GET, &path);
315 let route = Router::new()
316 .route(&path, get(handler_metrics))
317 .with_state(registry);
318 (vec![doc], route)
319}
320
321async fn handler_metrics(State(registry): State<Arc<Registry>>) -> impl IntoResponse {
323 let encoder = prometheus::TextEncoder::new();
324 let metric_families = registry.gather();
325 let mut buffer = vec![];
326 if encoder.encode(&metric_families, &mut buffer).is_err() {
327 return (
328 StatusCode::INTERNAL_SERVER_ERROR,
329 "Failed to encode metrics",
330 )
331 .into_response();
332 }
333
334 let metrics = match String::from_utf8(buffer) {
335 Ok(metrics) => metrics,
336 Err(_) => {
337 return (
338 StatusCode::INTERNAL_SERVER_ERROR,
339 "Failed to encode metrics",
340 )
341 .into_response()
342 }
343 };
344
345 (StatusCode::OK, metrics).into_response()
346}