Skip to main content

offline_intelligence/model_runtime/
runtime_manager.rs

1//! Runtime Manager
2//!
3//! Orchestrates model runtime selection, initialization, and lifecycle management.
4//! Automatically selects the appropriate runtime based on model format.
5//! Lock-free implementation using ArcSwap for atomic pointer swapping.
6
7use super::runtime_trait::*;
8use super::format_detector::FormatDetector;
9use super::platform_detector::HardwareCapabilities;
10use crate::model_runtime::{GGUFRuntime, GGMLRuntime, ONNXRuntime, TensorRTRuntime, SafetensorsRuntime, CoreMLRuntime};
11use std::sync::Arc;
12use arc_swap::ArcSwap;
13use tracing::{info, error};
14use super::*;
15
/// Runtime holder for lock-free access
///
/// Immutable snapshot kept behind the manager's `ArcSwap`. The manager never
/// mutates a published holder in place; it builds a new one and swaps or
/// stores it as a whole.
struct RuntimeHolder {
    /// Active runtime, or `None` when nothing has been initialized, after
    /// `shutdown`, or when initialization was skipped for an empty model path.
    runtime: Option<Box<dyn ModelRuntime>>,
    /// Configuration passed to the last `initialize` call that stored a
    /// holder; `None` for a fresh manager and after `shutdown`.
    config: Option<RuntimeConfig>,
}
21
/// Runtime Manager - manages active model runtime
///
/// Owns at most one runtime at a time. All methods take `&self`; the state
/// is replaced by swapping the `RuntimeHolder` pointer rather than by taking
/// a lock.
pub struct RuntimeManager {
    /// Currently active runtime (lock-free via ArcSwap)
    holder: Arc<ArcSwap<RuntimeHolder>>,
}
27
28impl RuntimeManager {
29    pub fn new() -> Self {
30        Self {
31            holder: Arc::new(ArcSwap::new(Arc::new(RuntimeHolder {
32                runtime: None,
33                config: None,
34            }))),
35        }
36    }
37
38    /// Initialize runtime with automatic format detection and platform-appropriate binary
39    pub async fn initialize_auto(&self, mut config: RuntimeConfig) -> anyhow::Result<String> {
40        info!("Auto-detecting model format from: {}", config.model_path.display());
41        
42        // Check if model path is empty (no model selected)
43        if config.model_path.as_os_str().is_empty() {
44            info!("No model selected, skipping runtime initialization");
45            return Ok(config.host.clone() + ":" + &config.port.to_string());
46        }
47        
48        // Detect format from file extension
49        let detected_format = FormatDetector::detect_from_path(&config.model_path)
50            .ok_or_else(|| anyhow::anyhow!(
51                "Could not detect model format from file: {}. Supported formats: {:?}",
52                config.model_path.display(),
53                FormatDetector::supported_extensions()
54            ))?;
55
56        info!("Detected format: {}", detected_format.name());
57
58        // Override config format with detected format
59        config.format = detected_format;
60        
61        // Auto-detect and set appropriate runtime binary based on platform and hardware
62        if config.runtime_binary.is_none() {
63            let hw_caps = HardwareCapabilities::default();
64            if let Some(binary_path) = hw_caps.get_runtime_binary_path() {
65                if binary_path.exists() {
66                    info!("Using platform-appropriate runtime binary: {}", binary_path.display());
67                    config.runtime_binary = Some(binary_path);
68                } else {
69                    info!("Platform-specific binary not found: {}, using default", binary_path.display());
70                }
71            }
72        }
73
74        self.initialize(config).await
75    }
76
77    /// Initialize runtime with specified configuration
78    pub async fn initialize(&self, config: RuntimeConfig) -> anyhow::Result<String> {
79        info!("Initializing runtime for format: {}", config.format.name());
80
81        // Check if model path is empty (no model selected)
82        if config.model_path.as_os_str().is_empty() {
83            info!("No model selected, skipping runtime initialization");
84            // Store an empty runtime holder but return the expected base URL
85            let base_url = config.host.clone() + ":" + &config.port.to_string();
86            let new_holder = Arc::new(RuntimeHolder {
87                runtime: None,
88                config: Some(config),
89            });
90            self.holder.store(new_holder);
91            return Ok(base_url);
92        }
93        
94        // Shutdown existing runtime if any
95        self.shutdown().await?;
96
97        // Create appropriate runtime based on format
98        let mut runtime: Box<dyn ModelRuntime> = match config.format {
99            ModelFormat::GGUF => Box::new(GGUFRuntime::new()),
100            ModelFormat::GGML => Box::new(GGMLRuntime::new()),
101            ModelFormat::ONNX => Box::new(ONNXRuntime::new()),
102            ModelFormat::TensorRT => Box::new(TensorRTRuntime::new()),
103            ModelFormat::Safetensors => Box::new(SafetensorsRuntime::new()),
104            ModelFormat::CoreML => Box::new(CoreMLRuntime::new()),
105        };
106
107        // Initialize the runtime
108        runtime.initialize(config.clone()).await
109            .map_err(|e| {
110                error!("Failed to initialize {} runtime: {}", config.format.name(), e);
111                e
112            })?;
113
114        let base_url = runtime.base_url();
115        let metadata = runtime.metadata();
116
117        info!("✅ Runtime initialized successfully:");
118        info!("  Format: {}", metadata.format.name());
119        info!("  Runtime: {}", metadata.runtime_name);
120        info!("  Base URL: {}", base_url);
121        info!("  GPU Support: {}", metadata.supports_gpu);
122        info!("  Streaming: {}", metadata.supports_streaming);
123
124        // Atomically store the new runtime
125        let new_holder = Arc::new(RuntimeHolder {
126            runtime: Some(runtime),
127            config: Some(config),
128        });
129        self.holder.store(new_holder);
130
131        Ok(base_url)
132    }
133
134    /// Get the current runtime's base URL (lock-free)
135    pub async fn get_base_url(&self) -> Option<String> {
136        let holder = self.holder.load();
137        holder.runtime.as_ref().map(|r| r.base_url())
138    }
139
140    /// Check if runtime is ready (lock-free read)
141    pub async fn is_ready(&self) -> bool {
142        let holder = self.holder.load();
143        match holder.runtime.as_ref() {
144            Some(r) => r.is_ready().await,
145            None => false,
146        }
147    }
148
149    /// Perform health check (lock-free read)
150    pub async fn health_check(&self) -> anyhow::Result<String> {
151        let holder = self.holder.load();
152        match holder.runtime.as_ref() {
153            Some(r) => r.health_check().await,
154            None => Err(anyhow::anyhow!("No runtime initialized")),
155        }
156    }
157
158    /// Get runtime metadata (lock-free read)
159    pub async fn get_metadata(&self) -> Option<RuntimeMetadata> {
160        let holder = self.holder.load();
161        holder.runtime.as_ref().map(|r| r.metadata())
162    }
163
164    /// Shutdown current runtime (atomic replacement)
165    pub async fn shutdown(&self) -> anyhow::Result<()> {
166        // Atomically replace with empty holder so new load() calls see no runtime.
167        let old_holder = self.holder.swap(Arc::new(RuntimeHolder {
168            runtime: None,
169            config: None,
170        }));
171
172        // Retry Arc::try_unwrap up to 10 times (100 ms total).
173        // ArcSwap load() guards are held for nanoseconds; any concurrent caller
174        // that loaded old_holder just before the swap above will have dropped its
175        // guard by the second or third attempt at most.
176        let mut attempt = old_holder;
177        for i in 0..10u8 {
178            match Arc::try_unwrap(attempt) {
179                Ok(mut holder) => {
180                    if let Some(mut runtime) = holder.runtime.take() {
181                        info!("Shutting down runtime (attempt {})", i + 1);
182                        runtime.shutdown().await?;
183                    }
184                    return Ok(());
185                }
186                Err(arc) => {
187                    tokio::time::sleep(tokio::time::Duration::from_millis(10)).await;
188                    attempt = arc;
189                }
190            }
191        }
192        // Could not get exclusive ownership — the process is exiting anyway
193        // (std::process::exit(0) in the ExitRequested handler terminates everything).
194        tracing::warn!(
195            "RuntimeManager::shutdown: could not acquire exclusive Arc ownership after 10 retries. \
196             llama-server will be killed by the OS on process exit."
197        );
198
199        Ok(())
200    }
201
202    /// Hot-swap model (shutdown current, initialize new)
203    pub async fn hot_swap(&self, new_config: RuntimeConfig) -> anyhow::Result<String> {
204        info!("Performing hot-swap to new model: {}", new_config.model_path.display());
205        
206        self.shutdown().await?;
207        self.initialize(new_config).await
208    }
209
210    /// Get current configuration (lock-free)
211    pub async fn get_current_config(&self) -> Option<RuntimeConfig> {
212        let holder = self.holder.load();
213        holder.config.clone()
214    }
215
216    /// Perform inference (non-streaming, lock-free read)
217    pub async fn generate(&self, request: InferenceRequest) -> anyhow::Result<InferenceResponse> {
218        let holder = self.holder.load();
219        match holder.runtime.as_ref() {
220            Some(r) => r.generate(request).await,
221            None => Err(anyhow::anyhow!("No runtime initialized")),
222        }
223    }
224
225    /// Perform streaming inference (lock-free read)
226    pub async fn generate_stream(
227        &self,
228        request: InferenceRequest,
229    ) -> anyhow::Result<Box<dyn futures_util::Stream<Item = Result<String, anyhow::Error>> + Send + Unpin>> {
230        let holder = self.holder.load();
231        match holder.runtime.as_ref() {
232            Some(r) => r.generate_stream(request).await,
233            None => Err(anyhow::anyhow!("No runtime initialized")),
234        }
235    }
236}
237
238impl Default for RuntimeManager {
239    fn default() -> Self {
240        Self::new()
241    }
242}
243
244impl Drop for RuntimeManager {
245    fn drop(&mut self) {
246        // Runtime cleanup happens in shutdown()
247        // This is just a safety net
248    }
249}
250
#[cfg(test)]
mod tests {
    use super::*;
    use crate::model_runtime::platform_detector::{Platform, HardwareArchitecture};
    use std::path::PathBuf;

    /// A freshly created manager holds no runtime, so it must not report ready.
    #[tokio::test]
    async fn test_runtime_manager_creation() {
        let manager = RuntimeManager::new();
        assert!(!manager.is_ready().await);
    }

    /// Exercises the format-detection path of `initialize_auto`.
    #[tokio::test]
    async fn test_format_detection() {
        let manager = RuntimeManager::new();

        let config = RuntimeConfig {
            model_path: PathBuf::from("test.gguf"),
            format: ModelFormat::GGUF, // Will be overridden
            ..Default::default()
        };

        // This will fail because the file doesn't exist, but tests the detection logic
        let result = manager.initialize_auto(config).await;
        assert!(result.is_err()); // Expected to fail - file doesn't exist
    }

    /// Platform and architecture detection must yield one of the known variants.
    #[test]
    fn test_platform_detection() {
        let hw_caps = HardwareCapabilities::default();

        // Verify that platform detection returns a valid platform
        assert!(matches!(hw_caps.platform, Platform::Windows | Platform::Linux | Platform::MacOS));

        // Verify that architecture detection returns a valid architecture
        assert!(matches!(
            hw_caps.architecture,
            HardwareArchitecture::X86_64 | HardwareArchitecture::Aarch64 | HardwareArchitecture::Other(_)
        ));
    }

    /// With no binary configured, `initialize_auto` runs the platform-based
    /// binary selection before handing off to `initialize`.
    #[tokio::test]
    async fn test_auto_binary_selection() {
        // Create a config without specifying a binary path
        let config = RuntimeConfig {
            model_path: PathBuf::from("test.gguf"),
            format: ModelFormat::GGUF,
            runtime_binary: None, // Intentionally set to None
            ..Default::default()
        };

        let manager = RuntimeManager::new();

        // The result will be an error because the file doesn't exist, but the
        // platform detection part should work
        let result = manager.initialize_auto(config).await;
        assert!(result.is_err());
    }

    /// A hot-swap must leave the manager consistent with its own result:
    /// a runtime is held exactly when the swap succeeded.
    #[tokio::test]
    async fn test_hot_swap_functionality() {
        let manager = RuntimeManager::new();

        let config1 = RuntimeConfig {
            model_path: PathBuf::from("model1.gguf"),
            format: ModelFormat::GGUF,
            runtime_binary: None,
            port: 8081,
            ..Default::default()
        };

        let config2 = RuntimeConfig {
            model_path: PathBuf::from("model2.gguf"),
            format: ModelFormat::GGUF,
            runtime_binary: None,
            port: 8082,
            ..Default::default()
        };

        // The first initialization only sets the stage; its outcome is not
        // what this test is about.
        let _ = manager.initialize_auto(config1).await;

        // Hot-swap to second config
        let swap_result = manager.hot_swap(config2).await;
        assert_eq!(swap_result.is_ok(), manager.get_base_url().await.is_some());

        manager.shutdown().await.expect("shutdown should succeed");
        assert!(!manager.is_ready().await);
    }

    /// Every supported format can be attempted on the same manager, and the
    /// manager state always matches the outcome of the last attempt.
    #[tokio::test]
    async fn test_multiple_format_support() {
        let manager = RuntimeManager::new();

        // Test different model formats
        let formats = [
            ModelFormat::GGUF,
            ModelFormat::GGML,
            ModelFormat::ONNX,
            ModelFormat::TensorRT,
            ModelFormat::Safetensors,
            ModelFormat::CoreML,
        ];

        for format in &formats {
            let config = RuntimeConfig {
                model_path: PathBuf::from("test"),
                format: format.clone(),
                runtime_binary: None,
                port: 8080,
                ..Default::default()
            };

            let result = manager.initialize(config).await;

            // A runtime is held exactly when initialization succeeded.
            assert_eq!(result.is_ok(), manager.get_base_url().await.is_some());
        }

        manager.shutdown().await.expect("shutdown should succeed");
        assert!(!manager.is_ready().await);
    }
}