// dynamo_llm/http/service.rs

1// SPDX-FileCopyrightText: Copyright (c) 2024-2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
2// SPDX-License-Identifier: Apache-2.0
3//
4// Licensed under the Apache License, Version 2.0 (the "License");
5// you may not use this file except in compliance with the License.
6// You may obtain a copy of the License at
7//
8// http://www.apache.org/licenses/LICENSE-2.0
9//
10// Unless required by applicable law or agreed to in writing, software
11// distributed under the License is distributed on an "AS IS" BASIS,
12// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13// See the License for the specific language governing permissions and
14// limitations under the License.
15
16//! HTTP Service for Nova LLM
17//!
18//! The primary purpose of this crate is to service the nova-llm-protocols via OpenAI compatible HTTP endpoints. This component
19//! is meant to be a gateway/ingress into the Nova LLM Distributed Runtime.
20//!
21//! In order to create a common pattern, the HttpService forwards the incoming OAI Chat Request or OAI Completion Request to the
22//! appropriate model-specific engine. The engines can be attached and detached dynamically using the [`ModelManager`].
23//!
24//! Note: All requests, whether the client requests `stream=true` or `stream=false`, are propagated downstream as `stream=true`.
25//! This enables us to handle only 1 pattern of request-response in the downstream services. Non-streaming user requests are
26//! aggregated by the HttpService and returned as a single response.
27//!
28//! TODO(): Add support for model-specific metadata and status. Status will allow us to return a 503 when the model is supposed
29//! to be ready, but there is a problem with the model.
30//!
31//! The [`service_v2::HttpService`] can be further extended to host any [`axum::Router`] using the [`service_v2::HttpServiceConfigBuilder`].
32
33mod openai;
34
35pub mod discovery;
36pub mod error;
37pub mod metrics;
38pub mod service_v2;
39
40// #[cfg(feature = "py3")]
41// pub mod py3;
42
43pub use async_trait::async_trait;
44pub use axum;
45pub use error::ServiceHttpError;
46pub use metrics::Metrics;
47
48use crate::types::openai::{
49    chat_completions::OpenAIChatCompletionsStreamingEngine,
50    completions::OpenAICompletionsStreamingEngine,
51};
52use std::{
53    collections::HashMap,
54    sync::{Arc, Mutex},
55    time::Duration,
56};
57
/// Registry of the model engines exposed by the HTTP service.
///
/// Cloning a `ModelManager` is cheap: every clone shares the same
/// [`DeploymentState`] behind an [`Arc`], so engines added through one
/// clone are visible through all others.
#[derive(Clone)]
pub struct ModelManager {
    // Shared deployment state: engine registries, metrics, SSE settings.
    state: Arc<DeploymentState>,
}
62
63impl Default for ModelManager {
64    fn default() -> Self {
65        Self::new()
66    }
67}
68
69impl ModelManager {
70    pub fn new() -> Self {
71        let state = Arc::new(DeploymentState::new());
72        Self { state }
73    }
74
75    pub fn state(&self) -> Arc<DeploymentState> {
76        self.state.clone()
77    }
78
79    pub fn has_model_any(&self, model: &str) -> bool {
80        self.state
81            .chat_completion_engines
82            .lock()
83            .unwrap()
84            .contains(model)
85            || self
86                .state
87                .completion_engines
88                .lock()
89                .unwrap()
90                .contains(model)
91    }
92
93    pub fn list_chat_completions_models(&self) -> Vec<String> {
94        self.state.chat_completion_engines.lock().unwrap().list()
95    }
96
97    pub fn list_completions_models(&self) -> Vec<String> {
98        self.state.completion_engines.lock().unwrap().list()
99    }
100
101    pub fn add_completions_model(
102        &self,
103        model: &str,
104        engine: OpenAICompletionsStreamingEngine,
105    ) -> Result<(), ServiceHttpError> {
106        let mut clients = self.state.completion_engines.lock().unwrap();
107        clients.add(model, engine)
108    }
109
110    pub fn add_chat_completions_model(
111        &self,
112        model: &str,
113        engine: OpenAIChatCompletionsStreamingEngine,
114    ) -> Result<(), ServiceHttpError> {
115        let mut clients = self.state.chat_completion_engines.lock().unwrap();
116        clients.add(model, engine)
117    }
118
119    pub fn remove_completions_model(&self, model: &str) -> Result<(), ServiceHttpError> {
120        let mut clients = self.state.completion_engines.lock().unwrap();
121        clients.remove(model)
122    }
123
124    pub fn remove_chat_completions_model(&self, model: &str) -> Result<(), ServiceHttpError> {
125        let mut clients = self.state.chat_completion_engines.lock().unwrap();
126        clients.remove(model)
127    }
128
129    /// Get the Prometheus [`Metrics`] object which tracks request counts and inflight requests
130    pub fn metrics(&self) -> Arc<Metrics> {
131        self.state.metrics.clone()
132    }
133}
134
/// Registry mapping model names to engines of type `E`.
struct ModelEngines<E> {
    /// Optional default model name
    default: Option<String>,
    /// Registered engines keyed by model name.
    engines: HashMap<String, E>,
}
140
141impl<E> Default for ModelEngines<E> {
142    fn default() -> Self {
143        Self {
144            default: None,
145            engines: HashMap::new(),
146        }
147    }
148}
149
150impl<E> ModelEngines<E> {
151    #[allow(dead_code)]
152    fn set_default(&mut self, model: &str) {
153        self.default = Some(model.to_string());
154    }
155
156    #[allow(dead_code)]
157    fn clear_default(&mut self) {
158        self.default = None;
159    }
160
161    fn add(&mut self, model: &str, engine: E) -> Result<(), ServiceHttpError> {
162        if self.engines.contains_key(model) {
163            return Err(ServiceHttpError::ModelAlreadyExists(model.to_string()));
164        }
165        self.engines.insert(model.to_string(), engine);
166        Ok(())
167    }
168
169    fn remove(&mut self, model: &str) -> Result<(), ServiceHttpError> {
170        if self.engines.remove(model).is_none() {
171            return Err(ServiceHttpError::ModelNotFound(model.to_string()));
172        }
173        Ok(())
174    }
175
176    fn get(&self, model: &str) -> Option<&E> {
177        self.engines.get(model)
178    }
179
180    fn contains(&self, model: &str) -> bool {
181        self.engines.contains_key(model)
182    }
183
184    fn list(&self) -> Vec<String> {
185        self.engines.keys().map(|k| k.to_owned()).collect()
186    }
187}
188
/// The DeploymentState is global state shared across all the workers;
/// it provides the set of known engine clients, keyed by model name.
pub struct DeploymentState {
    // NOTE(review): this struct is only reached through `Arc<DeploymentState>`
    // (see `ModelManager::state`), so the per-field `Arc` wrappers look
    // redundant — confirm nothing clones the fields individually before removing.
    /// Completions engines, keyed by model name.
    completion_engines: Arc<Mutex<ModelEngines<OpenAICompletionsStreamingEngine>>>,
    /// Chat-completions engines, keyed by model name.
    chat_completion_engines: Arc<Mutex<ModelEngines<OpenAIChatCompletionsStreamingEngine>>>,
    /// Prometheus metrics tracking request counts and inflight requests.
    metrics: Arc<Metrics>,
    /// Optional keep-alive interval for SSE streams; `None` disables keep-alive.
    sse_keep_alive: Option<Duration>,
}
197
198impl DeploymentState {
199    fn new() -> Self {
200        Self {
201            completion_engines: Arc::new(Mutex::new(ModelEngines::default())),
202            chat_completion_engines: Arc::new(Mutex::new(ModelEngines::default())),
203            metrics: Arc::new(Metrics::default()),
204            sse_keep_alive: None,
205        }
206    }
207
208    fn get_completions_engine(
209        &self,
210        model: &str,
211    ) -> Result<OpenAICompletionsStreamingEngine, ServiceHttpError> {
212        self.completion_engines
213            .lock()
214            .unwrap()
215            .get(model)
216            .cloned()
217            .ok_or(ServiceHttpError::ModelNotFound(model.to_string()))
218    }
219
220    fn get_chat_completions_engine(
221        &self,
222        model: &str,
223    ) -> Result<OpenAIChatCompletionsStreamingEngine, ServiceHttpError> {
224        self.chat_completion_engines
225            .lock()
226            .unwrap()
227            .get(model)
228            .cloned()
229            .ok_or(ServiceHttpError::ModelNotFound(model.to_string()))
230    }
231}
232
/// Documentation for a route: the HTTP method plus the path it is served on.
#[derive(Debug)]
pub struct RouteDoc {
    // HTTP verb for the route (GET, POST, ...).
    method: axum::http::Method,
    // Path the route is mounted at, e.g. "/v1/models".
    path: String,
}
239
240impl std::fmt::Display for RouteDoc {
241    fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
242        write!(f, "{} {}", self.method, self.path)
243    }
244}
245
246impl RouteDoc {
247    pub fn new<T: Into<String>>(method: axum::http::Method, path: T) -> Self {
248        RouteDoc {
249            method,
250            path: path.into(),
251        }
252    }
253}