Skip to main content

offline_intelligence/model_runtime/
runtime_manager.rs

1//! Runtime Manager
2//!
3//! Orchestrates model runtime selection, initialization, and lifecycle management.
4//! Automatically selects the appropriate runtime based on model format.
5//! Lock-free implementation using ArcSwap for atomic pointer swapping.
6
7use super::runtime_trait::*;
8use super::format_detector::FormatDetector;
9use super::*;
10use std::sync::Arc;
11use arc_swap::ArcSwap;
12use tracing::{info, error};
13
14/// Runtime holder for lock-free access
15struct RuntimeHolder {
16    runtime: Option<Box<dyn ModelRuntime>>,
17    config: Option<RuntimeConfig>,
18}
19
20/// Runtime Manager - manages active model runtime
21pub struct RuntimeManager {
22    /// Currently active runtime (lock-free via ArcSwap)
23    holder: Arc<ArcSwap<RuntimeHolder>>,
24}
25
26impl RuntimeManager {
27    pub fn new() -> Self {
28        Self {
29            holder: Arc::new(ArcSwap::new(Arc::new(RuntimeHolder {
30                runtime: None,
31                config: None,
32            }))),
33        }
34    }
35
36    /// Initialize runtime with automatic format detection
37    pub async fn initialize_auto(&self, config: RuntimeConfig) -> anyhow::Result<String> {
38        info!("Auto-detecting model format from: {}", config.model_path.display());
39        
40        // Detect format from file extension
41        let detected_format = FormatDetector::detect_from_path(&config.model_path)
42            .ok_or_else(|| anyhow::anyhow!(
43                "Could not detect model format from file: {}. Supported formats: {:?}",
44                config.model_path.display(),
45                FormatDetector::supported_extensions()
46            ))?;
47
48        info!("Detected format: {}", detected_format.name());
49
50        // Override config format with detected format
51        let mut final_config = config;
52        final_config.format = detected_format;
53
54        self.initialize(final_config).await
55    }
56
57    /// Initialize runtime with specified configuration
58    pub async fn initialize(&self, config: RuntimeConfig) -> anyhow::Result<String> {
59        info!("Initializing runtime for format: {}", config.format.name());
60
61        // Shutdown existing runtime if any
62        self.shutdown().await?;
63
64        // Create appropriate runtime based on format
65        let mut runtime: Box<dyn ModelRuntime> = match config.format {
66            ModelFormat::GGUF => Box::new(GGUFRuntime::new()),
67            ModelFormat::GGML => Box::new(GGMLRuntime::new()),
68            ModelFormat::ONNX => Box::new(ONNXRuntime::new()),
69            ModelFormat::TensorRT => Box::new(TensorRTRuntime::new()),
70            ModelFormat::Safetensors => Box::new(SafetensorsRuntime::new()),
71            ModelFormat::CoreML => Box::new(CoreMLRuntime::new()),
72        };
73
74        // Initialize the runtime
75        runtime.initialize(config.clone()).await
76            .map_err(|e| {
77                error!("Failed to initialize {} runtime: {}", config.format.name(), e);
78                e
79            })?;
80
81        let base_url = runtime.base_url();
82        let metadata = runtime.metadata();
83
84        info!("✅ Runtime initialized successfully:");
85        info!("  Format: {}", metadata.format.name());
86        info!("  Runtime: {}", metadata.runtime_name);
87        info!("  Base URL: {}", base_url);
88        info!("  GPU Support: {}", metadata.supports_gpu);
89        info!("  Streaming: {}", metadata.supports_streaming);
90
91        // Atomically store the new runtime
92        let new_holder = Arc::new(RuntimeHolder {
93            runtime: Some(runtime),
94            config: Some(config),
95        });
96        self.holder.store(new_holder);
97
98        Ok(base_url)
99    }
100
101    /// Get the current runtime's base URL (lock-free)
102    pub async fn get_base_url(&self) -> Option<String> {
103        let holder = self.holder.load();
104        holder.runtime.as_ref().map(|r| r.base_url())
105    }
106
107    /// Check if runtime is ready (lock-free read)
108    pub async fn is_ready(&self) -> bool {
109        let holder = self.holder.load();
110        match holder.runtime.as_ref() {
111            Some(r) => r.is_ready().await,
112            None => false,
113        }
114    }
115
116    /// Perform health check (lock-free read)
117    pub async fn health_check(&self) -> anyhow::Result<String> {
118        let holder = self.holder.load();
119        match holder.runtime.as_ref() {
120            Some(r) => r.health_check().await,
121            None => Err(anyhow::anyhow!("No runtime initialized")),
122        }
123    }
124
125    /// Get runtime metadata (lock-free read)
126    pub async fn get_metadata(&self) -> Option<RuntimeMetadata> {
127        let holder = self.holder.load();
128        holder.runtime.as_ref().map(|r| r.metadata())
129    }
130
131    /// Shutdown current runtime (atomic replacement)
132    pub async fn shutdown(&self) -> anyhow::Result<()> {
133        // Atomically replace with empty holder
134        let old_holder = self.holder.swap(Arc::new(RuntimeHolder {
135            runtime: None,
136            config: None,
137        }));
138
139        // Shutdown the old runtime outside the critical section
140        // Try to get exclusive ownership; if not possible (arc still referenced), skip shutdown
141        if let Ok(mut holder) = Arc::try_unwrap(old_holder) {
142            if let Some(mut runtime) = holder.runtime.take() {
143                info!("Shutting down runtime");
144                runtime.shutdown().await?;
145            }
146        }
147
148        Ok(())
149    }
150
151    /// Hot-swap model (shutdown current, initialize new)
152    pub async fn hot_swap(&self, new_config: RuntimeConfig) -> anyhow::Result<String> {
153        info!("Performing hot-swap to new model: {}", new_config.model_path.display());
154        
155        self.shutdown().await?;
156        self.initialize(new_config).await
157    }
158
159    /// Get current configuration (lock-free)
160    pub async fn get_current_config(&self) -> Option<RuntimeConfig> {
161        let holder = self.holder.load();
162        holder.config.clone()
163    }
164
165    /// Perform inference (non-streaming, lock-free read)
166    pub async fn generate(&self, request: InferenceRequest) -> anyhow::Result<InferenceResponse> {
167        let holder = self.holder.load();
168        match holder.runtime.as_ref() {
169            Some(r) => r.generate(request).await,
170            None => Err(anyhow::anyhow!("No runtime initialized")),
171        }
172    }
173
174    /// Perform streaming inference (lock-free read)
175    pub async fn generate_stream(
176        &self,
177        request: InferenceRequest,
178    ) -> anyhow::Result<Box<dyn futures_util::Stream<Item = Result<String, anyhow::Error>> + Send + Unpin>> {
179        let holder = self.holder.load();
180        match holder.runtime.as_ref() {
181            Some(r) => r.generate_stream(request).await,
182            None => Err(anyhow::anyhow!("No runtime initialized")),
183        }
184    }
185}
186
187impl Default for RuntimeManager {
188    fn default() -> Self {
189        Self::new()
190    }
191}
192
193impl Drop for RuntimeManager {
194    fn drop(&mut self) {
195        // Runtime cleanup happens in shutdown()
196        // This is just a safety net
197    }
198}
199
#[cfg(test)]
mod tests {
    use super::*;
    use std::path::PathBuf;

    /// A freshly constructed manager has no runtime, so it must not be ready.
    #[tokio::test]
    async fn test_runtime_manager_creation() {
        let manager = RuntimeManager::new();
        let ready = manager.is_ready().await;
        assert!(!ready);
    }

    /// Exercises the auto-detection path end to end with a `.gguf` path.
    #[tokio::test]
    async fn test_format_detection() {
        let config = RuntimeConfig {
            model_path: PathBuf::from("test.gguf"),
            format: ModelFormat::GGUF, // replaced by the detected format
            ..Default::default()
        };
        let manager = RuntimeManager::new();

        // Initialization is expected to fail because `test.gguf` does not
        // exist, but the call still runs through the detection logic.
        let outcome = manager.initialize_auto(config).await;
        assert!(outcome.is_err());
    }
}
225}