offline_intelligence/model_runtime/
runtime_manager.rs1use super::runtime_trait::*;
8use super::format_detector::FormatDetector;
9use super::platform_detector::HardwareCapabilities;
10use crate::model_runtime::{GGUFRuntime, GGMLRuntime, ONNXRuntime, TensorRTRuntime, SafetensorsRuntime, CoreMLRuntime};
11use std::sync::Arc;
12use arc_swap::ArcSwap;
13use tracing::{info, error};
14use super::*;
15
16struct RuntimeHolder {
18 runtime: Option<Box<dyn ModelRuntime>>,
19 config: Option<RuntimeConfig>,
20}
21
22pub struct RuntimeManager {
24 holder: Arc<ArcSwap<RuntimeHolder>>,
26}
27
28impl RuntimeManager {
29 pub fn new() -> Self {
30 Self {
31 holder: Arc::new(ArcSwap::new(Arc::new(RuntimeHolder {
32 runtime: None,
33 config: None,
34 }))),
35 }
36 }
37
38 pub async fn initialize_auto(&self, mut config: RuntimeConfig) -> anyhow::Result<String> {
40 info!("Auto-detecting model format from: {}", config.model_path.display());
41
42 if config.model_path.as_os_str().is_empty() {
44 info!("No model selected, skipping runtime initialization");
45 return Ok(config.host.clone() + ":" + &config.port.to_string());
46 }
47
48 let detected_format = FormatDetector::detect_from_path(&config.model_path)
50 .ok_or_else(|| anyhow::anyhow!(
51 "Could not detect model format from file: {}. Supported formats: {:?}",
52 config.model_path.display(),
53 FormatDetector::supported_extensions()
54 ))?;
55
56 info!("Detected format: {}", detected_format.name());
57
58 config.format = detected_format;
60
61 if config.runtime_binary.is_none() {
63 let hw_caps = HardwareCapabilities::default();
64 if let Some(binary_path) = hw_caps.get_runtime_binary_path() {
65 if binary_path.exists() {
66 info!("Using platform-appropriate runtime binary: {}", binary_path.display());
67 config.runtime_binary = Some(binary_path);
68 } else {
69 info!("Platform-specific binary not found: {}, using default", binary_path.display());
70 }
71 }
72 }
73
74 self.initialize(config).await
75 }
76
77 pub async fn initialize(&self, config: RuntimeConfig) -> anyhow::Result<String> {
79 info!("Initializing runtime for format: {}", config.format.name());
80
81 if config.model_path.as_os_str().is_empty() {
83 info!("No model selected, skipping runtime initialization");
84 let base_url = config.host.clone() + ":" + &config.port.to_string();
86 let new_holder = Arc::new(RuntimeHolder {
87 runtime: None,
88 config: Some(config),
89 });
90 self.holder.store(new_holder);
91 return Ok(base_url);
92 }
93
94 self.shutdown().await?;
96
97 let mut runtime: Box<dyn ModelRuntime> = match config.format {
99 ModelFormat::GGUF => Box::new(GGUFRuntime::new()),
100 ModelFormat::GGML => Box::new(GGMLRuntime::new()),
101 ModelFormat::ONNX => Box::new(ONNXRuntime::new()),
102 ModelFormat::TensorRT => Box::new(TensorRTRuntime::new()),
103 ModelFormat::Safetensors => Box::new(SafetensorsRuntime::new()),
104 ModelFormat::CoreML => Box::new(CoreMLRuntime::new()),
105 };
106
107 runtime.initialize(config.clone()).await
109 .map_err(|e| {
110 error!("Failed to initialize {} runtime: {}", config.format.name(), e);
111 e
112 })?;
113
114 let base_url = runtime.base_url();
115 let metadata = runtime.metadata();
116
117 info!("✅ Runtime initialized successfully:");
118 info!(" Format: {}", metadata.format.name());
119 info!(" Runtime: {}", metadata.runtime_name);
120 info!(" Base URL: {}", base_url);
121 info!(" GPU Support: {}", metadata.supports_gpu);
122 info!(" Streaming: {}", metadata.supports_streaming);
123
124 let new_holder = Arc::new(RuntimeHolder {
126 runtime: Some(runtime),
127 config: Some(config),
128 });
129 self.holder.store(new_holder);
130
131 Ok(base_url)
132 }
133
134 pub async fn get_base_url(&self) -> Option<String> {
136 let holder = self.holder.load();
137 holder.runtime.as_ref().map(|r| r.base_url())
138 }
139
140 pub async fn is_ready(&self) -> bool {
142 let holder = self.holder.load();
143 match holder.runtime.as_ref() {
144 Some(r) => r.is_ready().await,
145 None => false,
146 }
147 }
148
149 pub async fn health_check(&self) -> anyhow::Result<String> {
151 let holder = self.holder.load();
152 match holder.runtime.as_ref() {
153 Some(r) => r.health_check().await,
154 None => Err(anyhow::anyhow!("No runtime initialized")),
155 }
156 }
157
158 pub async fn get_metadata(&self) -> Option<RuntimeMetadata> {
160 let holder = self.holder.load();
161 holder.runtime.as_ref().map(|r| r.metadata())
162 }
163
164 pub async fn shutdown(&self) -> anyhow::Result<()> {
166 let old_holder = self.holder.swap(Arc::new(RuntimeHolder {
168 runtime: None,
169 config: None,
170 }));
171
172 let mut attempt = old_holder;
177 for i in 0..10u8 {
178 match Arc::try_unwrap(attempt) {
179 Ok(mut holder) => {
180 if let Some(mut runtime) = holder.runtime.take() {
181 info!("Shutting down runtime (attempt {})", i + 1);
182 runtime.shutdown().await?;
183 }
184 return Ok(());
185 }
186 Err(arc) => {
187 tokio::time::sleep(tokio::time::Duration::from_millis(10)).await;
188 attempt = arc;
189 }
190 }
191 }
192 tracing::warn!(
195 "RuntimeManager::shutdown: could not acquire exclusive Arc ownership after 10 retries. \
196 llama-server will be killed by the OS on process exit."
197 );
198
199 Ok(())
200 }
201
202 pub async fn hot_swap(&self, new_config: RuntimeConfig) -> anyhow::Result<String> {
204 info!("Performing hot-swap to new model: {}", new_config.model_path.display());
205
206 self.shutdown().await?;
207 self.initialize(new_config).await
208 }
209
210 pub async fn get_current_config(&self) -> Option<RuntimeConfig> {
212 let holder = self.holder.load();
213 holder.config.clone()
214 }
215
216 pub async fn generate(&self, request: InferenceRequest) -> anyhow::Result<InferenceResponse> {
218 let holder = self.holder.load();
219 match holder.runtime.as_ref() {
220 Some(r) => r.generate(request).await,
221 None => Err(anyhow::anyhow!("No runtime initialized")),
222 }
223 }
224
225 pub async fn generate_stream(
227 &self,
228 request: InferenceRequest,
229 ) -> anyhow::Result<Box<dyn futures_util::Stream<Item = Result<String, anyhow::Error>> + Send + Unpin>> {
230 let holder = self.holder.load();
231 match holder.runtime.as_ref() {
232 Some(r) => r.generate_stream(request).await,
233 None => Err(anyhow::anyhow!("No runtime initialized")),
234 }
235 }
236}
237
238impl Default for RuntimeManager {
239 fn default() -> Self {
240 Self::new()
241 }
242}
243
244impl Drop for RuntimeManager {
245 fn drop(&mut self) {
246 }
249}
250
#[cfg(test)]
mod tests {
    use super::*;
    use crate::model_runtime::platform_detector::{HardwareArchitecture, Platform};
    use std::path::PathBuf;

    #[tokio::test]
    async fn test_runtime_manager_creation() {
        let manager = RuntimeManager::new();
        assert!(!manager.is_ready().await);
    }

    #[tokio::test]
    async fn test_empty_model_path_skips_initialization() {
        let manager = RuntimeManager::new();

        let config = RuntimeConfig {
            model_path: PathBuf::new(),
            ..Default::default()
        };
        let expected = config.host.clone() + ":" + &config.port.to_string();

        let result = manager.initialize_auto(config).await;

        assert_eq!(result.unwrap(), expected);
        assert!(!manager.is_ready().await);
        assert!(manager.get_base_url().await.is_none());
    }

    #[tokio::test]
    async fn test_format_detection() {
        let manager = RuntimeManager::new();

        let config = RuntimeConfig {
            model_path: PathBuf::from("test.gguf"),
            format: ModelFormat::GGUF,
            ..Default::default()
        };

        // No real "test.gguf" model is expected to exist, so this must fail.
        let result = manager.initialize_auto(config).await;
        assert!(result.is_err());
    }

    #[test]
    fn test_platform_detection() {
        let hw_caps = HardwareCapabilities::default();

        assert!(matches!(
            hw_caps.platform,
            Platform::Windows | Platform::Linux | Platform::MacOS
        ));

        assert!(matches!(
            hw_caps.architecture,
            HardwareArchitecture::X86_64 | HardwareArchitecture::Aarch64 | HardwareArchitecture::Other(_)
        ));
    }

    #[tokio::test]
    async fn test_auto_binary_selection() {
        let config = RuntimeConfig {
            model_path: PathBuf::from("test.gguf"),
            format: ModelFormat::GGUF,
            runtime_binary: None,
            ..Default::default()
        };

        let manager = RuntimeManager::new();

        let result = manager.initialize_auto(config).await;

        assert!(result.is_err());
    }

    #[tokio::test]
    async fn test_hot_swap_functionality() {
        let manager = RuntimeManager::new();

        let config1 = RuntimeConfig {
            model_path: PathBuf::from("model1.gguf"),
            format: ModelFormat::GGUF,
            runtime_binary: None,
            port: 8081,
            ..Default::default()
        };

        let config2 = RuntimeConfig {
            model_path: PathBuf::from("model2.gguf"),
            format: ModelFormat::GGUF,
            runtime_binary: None,
            port: 8082,
            ..Default::default()
        };

        // The first load may or may not succeed depending on the environment;
        // only the state after the swap is checked.
        let _ = manager.initialize_auto(config1).await;

        let result2 = manager.hot_swap(config2).await;

        // Whatever the outcome, the manager's state must agree with it.
        match &result2 {
            Ok(url) => assert_eq!(manager.get_base_url().await.as_deref(), Some(url.as_str())),
            Err(_) => assert!(manager.get_base_url().await.is_none()),
        }

        manager.shutdown().await.unwrap();
        assert!(manager.get_base_url().await.is_none());
        assert!(manager.get_current_config().await.is_none());
    }

    #[tokio::test]
    async fn test_multiple_format_support() {
        let manager = RuntimeManager::new();

        let formats = [
            ModelFormat::GGUF,
            ModelFormat::GGML,
            ModelFormat::ONNX,
            ModelFormat::TensorRT,
            ModelFormat::Safetensors,
            ModelFormat::CoreML,
        ];

        for format in &formats {
            let config = RuntimeConfig {
                model_path: PathBuf::from("test"),
                format: format.clone(),
                runtime_binary: None,
                port: 8080,
                ..Default::default()
            };

            // A success must leave a recorded config; a failure must leave
            // no runtime installed.
            match manager.initialize(config).await {
                Ok(_) => assert!(manager.get_current_config().await.is_some()),
                Err(_) => assert!(manager.get_base_url().await.is_none()),
            }
        }

        manager.shutdown().await.unwrap();
        assert!(!manager.is_ready().await);
    }
}