// dynamo_llm/http/service.rs
mod openai;

pub mod discovery;
pub mod error;
pub mod metrics;
pub mod service_v2;

pub use async_trait::async_trait;
pub use axum;
pub use error::ServiceHttpError;
pub use metrics::Metrics;

use crate::types::openai::{
    chat_completions::OpenAIChatCompletionsStreamingEngine,
    completions::OpenAICompletionsStreamingEngine,
};
use std::{
    collections::HashMap,
    sync::{Arc, Mutex},
    time::Duration,
};

/// Cheaply cloneable handle over the HTTP service's shared model registry.
///
/// All clones share one [`DeploymentState`] behind an `Arc`, so a model
/// registered or removed through any clone is visible to every other clone.
#[derive(Clone)]
pub struct ModelManager {
    state: Arc<DeploymentState>,
}

impl Default for ModelManager {
    /// Equivalent to [`ModelManager::new`]: a manager over a fresh, empty state.
    fn default() -> Self {
        Self::new()
    }
}

69impl ModelManager {
70 pub fn new() -> Self {
71 let state = Arc::new(DeploymentState::new());
72 Self { state }
73 }
74
75 pub fn state(&self) -> Arc<DeploymentState> {
76 self.state.clone()
77 }
78
79 pub fn has_model_any(&self, model: &str) -> bool {
80 self.state
81 .chat_completion_engines
82 .lock()
83 .unwrap()
84 .contains(model)
85 || self
86 .state
87 .completion_engines
88 .lock()
89 .unwrap()
90 .contains(model)
91 }
92
93 pub fn list_chat_completions_models(&self) -> Vec<String> {
94 self.state.chat_completion_engines.lock().unwrap().list()
95 }
96
97 pub fn list_completions_models(&self) -> Vec<String> {
98 self.state.completion_engines.lock().unwrap().list()
99 }
100
101 pub fn add_completions_model(
102 &self,
103 model: &str,
104 engine: OpenAICompletionsStreamingEngine,
105 ) -> Result<(), ServiceHttpError> {
106 let mut clients = self.state.completion_engines.lock().unwrap();
107 clients.add(model, engine)
108 }
109
110 pub fn add_chat_completions_model(
111 &self,
112 model: &str,
113 engine: OpenAIChatCompletionsStreamingEngine,
114 ) -> Result<(), ServiceHttpError> {
115 let mut clients = self.state.chat_completion_engines.lock().unwrap();
116 clients.add(model, engine)
117 }
118
119 pub fn remove_completions_model(&self, model: &str) -> Result<(), ServiceHttpError> {
120 let mut clients = self.state.completion_engines.lock().unwrap();
121 clients.remove(model)
122 }
123
124 pub fn remove_chat_completions_model(&self, model: &str) -> Result<(), ServiceHttpError> {
125 let mut clients = self.state.chat_completion_engines.lock().unwrap();
126 clients.remove(model)
127 }
128
129 pub fn metrics(&self) -> Arc<Metrics> {
131 self.state.metrics.clone()
132 }
133}
134
/// Registry mapping model names to engines of one flavor `E`
/// (completions or chat completions).
struct ModelEngines<E> {
    /// Optional default model name; only touched by the currently unused
    /// `set_default`/`clear_default` helpers below.
    default: Option<String>,
    engines: HashMap<String, E>,
}

// Manual impl rather than `#[derive(Default)]`: the derive would add an
// unnecessary `E: Default` bound on the engine type.
impl<E> Default for ModelEngines<E> {
    fn default() -> Self {
        Self {
            default: None,
            engines: HashMap::new(),
        }
    }
}

150impl<E> ModelEngines<E> {
151 #[allow(dead_code)]
152 fn set_default(&mut self, model: &str) {
153 self.default = Some(model.to_string());
154 }
155
156 #[allow(dead_code)]
157 fn clear_default(&mut self) {
158 self.default = None;
159 }
160
161 fn add(&mut self, model: &str, engine: E) -> Result<(), ServiceHttpError> {
162 if self.engines.contains_key(model) {
163 return Err(ServiceHttpError::ModelAlreadyExists(model.to_string()));
164 }
165 self.engines.insert(model.to_string(), engine);
166 Ok(())
167 }
168
169 fn remove(&mut self, model: &str) -> Result<(), ServiceHttpError> {
170 if self.engines.remove(model).is_none() {
171 return Err(ServiceHttpError::ModelNotFound(model.to_string()));
172 }
173 Ok(())
174 }
175
176 fn get(&self, model: &str) -> Option<&E> {
177 self.engines.get(model)
178 }
179
180 fn contains(&self, model: &str) -> bool {
181 self.engines.contains_key(model)
182 }
183
184 fn list(&self) -> Vec<String> {
185 self.engines.keys().map(|k| k.to_owned()).collect()
186 }
187}
188
/// Shared state behind [`ModelManager`]: one locked registry per engine
/// flavor, plus service metrics and SSE keep-alive configuration.
pub struct DeploymentState {
    // Each registry has its own lock; they are never held simultaneously in
    // the code visible here.
    completion_engines: Arc<Mutex<ModelEngines<OpenAICompletionsStreamingEngine>>>,
    chat_completion_engines: Arc<Mutex<ModelEngines<OpenAIChatCompletionsStreamingEngine>>>,
    metrics: Arc<Metrics>,
    // Initialized to `None` and never read in this chunk; presumably consumed
    // by the SSE response path elsewhere in the module — TODO confirm.
    sse_keep_alive: Option<Duration>,
}

198impl DeploymentState {
199 fn new() -> Self {
200 Self {
201 completion_engines: Arc::new(Mutex::new(ModelEngines::default())),
202 chat_completion_engines: Arc::new(Mutex::new(ModelEngines::default())),
203 metrics: Arc::new(Metrics::default()),
204 sse_keep_alive: None,
205 }
206 }
207
208 fn get_completions_engine(
209 &self,
210 model: &str,
211 ) -> Result<OpenAICompletionsStreamingEngine, ServiceHttpError> {
212 self.completion_engines
213 .lock()
214 .unwrap()
215 .get(model)
216 .cloned()
217 .ok_or(ServiceHttpError::ModelNotFound(model.to_string()))
218 }
219
220 fn get_chat_completions_engine(
221 &self,
222 model: &str,
223 ) -> Result<OpenAIChatCompletionsStreamingEngine, ServiceHttpError> {
224 self.chat_completion_engines
225 .lock()
226 .unwrap()
227 .get(model)
228 .cloned()
229 .ok_or(ServiceHttpError::ModelNotFound(model.to_string()))
230 }
231}
232
/// Human-readable description of one HTTP route (method + path),
/// displayed as `"<METHOD> <path>"` via the `Display` impl below.
#[derive(Debug)]
pub struct RouteDoc {
    method: axum::http::Method,
    path: String,
}

240impl std::fmt::Display for RouteDoc {
241 fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
242 write!(f, "{} {}", self.method, self.path)
243 }
244}
245
246impl RouteDoc {
247 pub fn new<T: Into<String>>(method: axum::http::Method, path: T) -> Self {
248 RouteDoc {
249 method,
250 path: path.into(),
251 }
252 }
253}