// rustkernel_core/runtime/lifecycle.rs

//! Runtime Lifecycle Management
//!
//! Manages the lifecycle of the RustKernels runtime, including:
//! - State machine transitions
//! - Graceful shutdown with drain periods
//! - Health check coordination
//! - Event callbacks

9use super::config::RuntimeConfig;
10use super::{RuntimeEvent, RuntimeEventCallback, RuntimeStats, ShutdownSignal};
11use crate::error::{KernelError, Result};
12use crate::registry::KernelRegistry;
13use std::sync::Arc;
14use std::sync::atomic::{AtomicU64, Ordering};
15use std::time::{Duration, Instant};
16use tokio::sync::{RwLock, watch};
17use tracing::{debug, info, warn};
18
/// Runtime lifecycle states.
///
/// The `Display` form of each state is its variant name (e.g. `Running`).
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum LifecycleState {
    /// Runtime has been created but not started
    Created,
    /// Runtime is starting up
    Starting,
    /// Runtime is fully operational
    Running,
    /// Runtime is draining (accepting no new work, completing existing)
    Draining,
    /// Runtime has stopped
    Stopped,
    /// Runtime encountered a fatal error
    Failed,
}

impl std::fmt::Display for LifecycleState {
    /// Writes the variant name, honoring width/fill/alignment flags so states
    /// can be column-aligned in logs and tables (e.g. `{:>9}`).
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let name = match self {
            Self::Created => "Created",
            Self::Starting => "Starting",
            Self::Running => "Running",
            Self::Draining => "Draining",
            Self::Stopped => "Stopped",
            Self::Failed => "Failed",
        };
        // `pad` (unlike `write!`) applies the caller's width and alignment.
        f.pad(name)
    }
}

49/// Handle to a running runtime for status queries and control
50#[derive(Clone)]
51pub struct RuntimeHandle {
52    state: Arc<RwLock<LifecycleState>>,
53    shutdown_tx: watch::Sender<bool>,
54    stats: Arc<RuntimeStatsInner>,
55    start_time: Instant,
56}
57
58impl RuntimeHandle {
59    /// Get current lifecycle state
60    pub async fn state(&self) -> LifecycleState {
61        *self.state.read().await
62    }
63
64    /// Check if runtime is running
65    pub async fn is_running(&self) -> bool {
66        *self.state.read().await == LifecycleState::Running
67    }
68
69    /// Request graceful shutdown
70    pub fn request_shutdown(&self) {
71        let _ = self.shutdown_tx.send(true);
72    }
73
74    /// Get runtime statistics
75    pub fn stats(&self) -> RuntimeStats {
76        RuntimeStats {
77            kernels_registered: self.stats.kernels_registered.load(Ordering::Relaxed) as usize,
78            kernels_active: self.stats.kernels_active.load(Ordering::Relaxed) as usize,
79            messages_processed: self.stats.messages_processed.load(Ordering::Relaxed),
80            messages_in_flight: self.stats.messages_in_flight.load(Ordering::Relaxed),
81            gpu_memory_bytes: self.stats.gpu_memory_bytes.load(Ordering::Relaxed),
82            gpu_memory_peak_bytes: self.stats.gpu_memory_peak_bytes.load(Ordering::Relaxed),
83            uptime_secs: self.start_time.elapsed().as_secs(),
84        }
85    }
86}
87
88#[derive(Debug, Default)]
89struct RuntimeStatsInner {
90    kernels_registered: AtomicU64,
91    kernels_active: AtomicU64,
92    messages_processed: AtomicU64,
93    messages_in_flight: AtomicU64,
94    gpu_memory_bytes: AtomicU64,
95    gpu_memory_peak_bytes: AtomicU64,
96}
97
98/// The main RustKernels runtime
99pub struct KernelRuntime {
100    config: RuntimeConfig,
101    state: Arc<RwLock<LifecycleState>>,
102    registry: Arc<KernelRegistry>,
103    shutdown_tx: watch::Sender<bool>,
104    shutdown_rx: watch::Receiver<bool>,
105    stats: Arc<RuntimeStatsInner>,
106    start_time: Option<Instant>,
107    event_callbacks: Vec<RuntimeEventCallback>,
108}
109
110impl KernelRuntime {
111    /// Create a new runtime with the given configuration
112    pub fn new(config: RuntimeConfig) -> Self {
113        let (shutdown_tx, shutdown_rx) = watch::channel(false);
114
115        Self {
116            config,
117            state: Arc::new(RwLock::new(LifecycleState::Created)),
118            registry: Arc::new(KernelRegistry::new()),
119            shutdown_tx,
120            shutdown_rx,
121            stats: Arc::new(RuntimeStatsInner::default()),
122            start_time: None,
123            event_callbacks: Vec::new(),
124        }
125    }
126
127    /// Create a runtime builder
128    pub fn builder() -> RuntimeBuilder {
129        RuntimeBuilder::default()
130    }
131
132    /// Get the kernel registry
133    pub fn registry(&self) -> &Arc<KernelRegistry> {
134        &self.registry
135    }
136
137    /// Get the current configuration
138    pub fn config(&self) -> &RuntimeConfig {
139        &self.config
140    }
141
142    /// Get current lifecycle state
143    pub async fn state(&self) -> LifecycleState {
144        *self.state.read().await
145    }
146
147    /// Add an event callback
148    pub fn on_event(&mut self, callback: RuntimeEventCallback) {
149        self.event_callbacks.push(callback);
150    }
151
152    /// Start the runtime
153    pub async fn start(&mut self) -> Result<RuntimeHandle> {
154        let current_state = *self.state.read().await;
155        if current_state != LifecycleState::Created {
156            return Err(KernelError::ConfigError(format!(
157                "Cannot start runtime in state: {}",
158                current_state
159            )));
160        }
161
162        // Transition to Starting
163        *self.state.write().await = LifecycleState::Starting;
164        self.emit_event(RuntimeEvent::Starting);
165        info!("Runtime starting with config: {:?}", self.config);
166
167        // Initialize backend
168        if self.config.gpu_enabled {
169            self.initialize_gpu_backend().await?;
170        }
171
172        // Mark as running
173        *self.state.write().await = LifecycleState::Running;
174        let start_time = Instant::now();
175        self.start_time = Some(start_time);
176        self.emit_event(RuntimeEvent::Started);
177        info!("Runtime started successfully");
178
179        // Start background tasks
180        self.start_health_check_task();
181
182        Ok(RuntimeHandle {
183            state: self.state.clone(),
184            shutdown_tx: self.shutdown_tx.clone(),
185            stats: self.stats.clone(),
186            start_time,
187        })
188    }
189
190    /// Initiate graceful shutdown
191    pub async fn shutdown(&mut self) -> Result<()> {
192        let current_state = *self.state.read().await;
193        if current_state != LifecycleState::Running {
194            return Err(KernelError::ConfigError(format!(
195                "Cannot shutdown runtime in state: {}",
196                current_state
197            )));
198        }
199
200        // Transition to Draining
201        *self.state.write().await = LifecycleState::Draining;
202        self.emit_event(RuntimeEvent::Draining);
203        info!("Runtime draining, timeout: {:?}", self.config.drain_timeout);
204
205        // Signal shutdown to all tasks
206        let _ = self.shutdown_tx.send(true);
207
208        // Wait for drain period or completion
209        let drain_start = Instant::now();
210        while drain_start.elapsed() < self.config.drain_timeout {
211            let in_flight = self.stats.messages_in_flight.load(Ordering::Relaxed);
212            if in_flight == 0 {
213                debug!("All in-flight messages completed");
214                break;
215            }
216            debug!("Waiting for {} in-flight messages", in_flight);
217            tokio::time::sleep(Duration::from_millis(100)).await;
218        }
219
220        // Force stop remaining work
221        let remaining = self.stats.messages_in_flight.load(Ordering::Relaxed);
222        if remaining > 0 {
223            warn!(
224                "Drain timeout reached with {} messages still in flight",
225                remaining
226            );
227        }
228
229        // Cleanup GPU resources
230        if self.config.gpu_enabled {
231            self.cleanup_gpu_backend().await?;
232        }
233
234        // Transition to Stopped
235        *self.state.write().await = LifecycleState::Stopped;
236        self.emit_event(RuntimeEvent::Stopped);
237        info!("Runtime stopped");
238
239        Ok(())
240    }
241
242    /// Force immediate shutdown (not graceful)
243    pub async fn force_shutdown(&mut self) -> Result<()> {
244        warn!("Force shutdown initiated");
245
246        let _ = self.shutdown_tx.send(true);
247
248        if self.config.gpu_enabled {
249            self.cleanup_gpu_backend().await?;
250        }
251
252        *self.state.write().await = LifecycleState::Stopped;
253        self.emit_event(RuntimeEvent::Stopped);
254
255        Ok(())
256    }
257
258    /// Reload configuration (if hot reload enabled)
259    pub async fn reload_config(&mut self, new_config: RuntimeConfig) -> Result<()> {
260        if !self.config.hot_reload_enabled {
261            return Err(KernelError::ConfigError(
262                "Hot reload not enabled".to_string(),
263            ));
264        }
265
266        new_config
267            .validate()
268            .map_err(|e| KernelError::ConfigError(e.to_string()))?;
269
270        // Only update safe-to-reload fields
271        self.config.log_level = new_config.log_level;
272        self.config.metrics_interval = new_config.metrics_interval;
273        self.config.health_check_interval = new_config.health_check_interval;
274        self.config.max_queue_depth = new_config.max_queue_depth;
275
276        self.emit_event(RuntimeEvent::ConfigReloaded);
277        info!("Configuration reloaded");
278
279        Ok(())
280    }
281
282    /// Get a shutdown signal receiver
283    pub fn shutdown_signal(&self) -> ShutdownSignal {
284        self.shutdown_rx.clone()
285    }
286
287    /// Get runtime statistics
288    pub fn stats(&self) -> RuntimeStats {
289        RuntimeStats {
290            kernels_registered: self.stats.kernels_registered.load(Ordering::Relaxed) as usize,
291            kernels_active: self.stats.kernels_active.load(Ordering::Relaxed) as usize,
292            messages_processed: self.stats.messages_processed.load(Ordering::Relaxed),
293            messages_in_flight: self.stats.messages_in_flight.load(Ordering::Relaxed),
294            gpu_memory_bytes: self.stats.gpu_memory_bytes.load(Ordering::Relaxed),
295            gpu_memory_peak_bytes: self.stats.gpu_memory_peak_bytes.load(Ordering::Relaxed),
296            uptime_secs: self.start_time.map(|t| t.elapsed().as_secs()).unwrap_or(0),
297        }
298    }
299
300    /// Record a message being processed
301    pub fn record_message_start(&self) {
302        self.stats
303            .messages_in_flight
304            .fetch_add(1, Ordering::Relaxed);
305    }
306
307    /// Record a message completed
308    pub fn record_message_complete(&self) {
309        self.stats
310            .messages_in_flight
311            .fetch_sub(1, Ordering::Relaxed);
312        self.stats
313            .messages_processed
314            .fetch_add(1, Ordering::Relaxed);
315    }
316
317    /// Record kernel registration
318    pub fn record_kernel_registered(&self, id: &str) {
319        self.stats
320            .kernels_registered
321            .fetch_add(1, Ordering::Relaxed);
322        self.emit_event(RuntimeEvent::KernelRegistered { id: id.to_string() });
323    }
324
325    /// Record kernel activation
326    pub fn record_kernel_activated(&self) {
327        self.stats.kernels_active.fetch_add(1, Ordering::Relaxed);
328    }
329
330    /// Record kernel deactivation
331    pub fn record_kernel_deactivated(&self, id: &str) {
332        self.stats.kernels_active.fetch_sub(1, Ordering::Relaxed);
333        self.emit_event(RuntimeEvent::KernelUnregistered { id: id.to_string() });
334    }
335
336    // Private methods
337
338    async fn initialize_gpu_backend(&self) -> Result<()> {
339        info!("Initializing GPU backend: {}", self.config.primary_backend);
340        // GPU backend initialization will be implemented with ringkernel 0.3.1
341        // For now, this is a placeholder that logs the intent
342        Ok(())
343    }
344
345    async fn cleanup_gpu_backend(&self) -> Result<()> {
346        info!("Cleaning up GPU backend");
347        // GPU cleanup will be implemented with ringkernel 0.3.1
348        Ok(())
349    }
350
351    fn start_health_check_task(&self) {
352        let state = self.state.clone();
353        let mut shutdown_rx = self.shutdown_rx.clone();
354        let interval = self.config.health_check_interval;
355        let callbacks = self.event_callbacks.clone();
356
357        tokio::spawn(async move {
358            let mut interval_timer = tokio::time::interval(interval);
359
360            loop {
361                tokio::select! {
362                    _ = interval_timer.tick() => {
363                        let current_state = *state.read().await;
364                        if current_state != LifecycleState::Running {
365                            break;
366                        }
367
368                        // Perform health check
369                        let healthy = true; // TODO: Real health check
370
371                        for callback in &callbacks {
372                            callback(RuntimeEvent::HealthCheckCompleted { healthy });
373                        }
374                    }
375                    _ = shutdown_rx.changed() => {
376                        if *shutdown_rx.borrow() {
377                            break;
378                        }
379                    }
380                }
381            }
382
383            debug!("Health check task stopped");
384        });
385    }
386
387    fn emit_event(&self, event: RuntimeEvent) {
388        for callback in &self.event_callbacks {
389            callback(event.clone());
390        }
391    }
392}
393
394impl std::fmt::Debug for KernelRuntime {
395    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
396        f.debug_struct("KernelRuntime")
397            .field("config", &self.config)
398            .field("start_time", &self.start_time)
399            .finish()
400    }
401}
402
403/// Builder for KernelRuntime
404#[derive(Default)]
405pub struct RuntimeBuilder {
406    config: Option<RuntimeConfig>,
407    event_callbacks: Vec<RuntimeEventCallback>,
408}
409
410impl RuntimeBuilder {
411    /// Use development configuration
412    pub fn development(mut self) -> Self {
413        self.config = Some(RuntimeConfig::development());
414        self
415    }
416
417    /// Use production configuration
418    pub fn production(mut self) -> Self {
419        self.config = Some(RuntimeConfig::production());
420        self
421    }
422
423    /// Use high-performance configuration
424    pub fn high_performance(mut self) -> Self {
425        self.config = Some(RuntimeConfig::high_performance());
426        self
427    }
428
429    /// Use custom configuration
430    pub fn with_config(mut self, config: RuntimeConfig) -> Self {
431        self.config = Some(config);
432        self
433    }
434
435    /// Load configuration from environment
436    pub fn from_env(mut self) -> Self {
437        self.config = Some(RuntimeConfig::from_env());
438        self
439    }
440
441    /// Load configuration from file
442    pub fn from_file(mut self, path: &std::path::Path) -> Result<Self> {
443        let config =
444            RuntimeConfig::from_file(path).map_err(|e| KernelError::ConfigError(e.to_string()))?;
445        self.config = Some(config);
446        Ok(self)
447    }
448
449    /// Set drain timeout
450    pub fn with_drain_timeout(mut self, timeout: Duration) -> Self {
451        if let Some(ref mut config) = self.config {
452            config.drain_timeout = timeout;
453        }
454        self
455    }
456
457    /// Set max kernel instances
458    pub fn with_max_instances(mut self, count: usize) -> Self {
459        if let Some(ref mut config) = self.config {
460            config.max_kernel_instances = count;
461        }
462        self
463    }
464
465    /// Add event callback
466    pub fn on_event(mut self, callback: RuntimeEventCallback) -> Self {
467        self.event_callbacks.push(callback);
468        self
469    }
470
471    /// Build the runtime
472    pub fn build(self) -> Result<KernelRuntime> {
473        let config = self.config.unwrap_or_default();
474        config
475            .validate()
476            .map_err(|e| KernelError::ConfigError(e.to_string()))?;
477
478        let mut runtime = KernelRuntime::new(config);
479        runtime.event_callbacks = self.event_callbacks;
480
481        Ok(runtime)
482    }
483}
484
#[cfg(test)]
mod tests {
    use super::*;

    /// Constructs an unstarted runtime from the testing preset.
    fn fresh_testing_runtime() -> KernelRuntime {
        KernelRuntime::new(RuntimeConfig::testing())
    }

    /// Drives a runtime through Created -> Running -> Stopped.
    #[tokio::test]
    async fn test_runtime_lifecycle() {
        let builder = KernelRuntime::builder()
            .development()
            .with_drain_timeout(Duration::from_millis(100));
        let mut runtime = builder.build().unwrap();
        assert_eq!(runtime.state().await, LifecycleState::Created);

        let runtime_handle = runtime.start().await.unwrap();
        assert_eq!(runtime_handle.state().await, LifecycleState::Running);

        runtime.shutdown().await.unwrap();
        assert_eq!(runtime.state().await, LifecycleState::Stopped);
    }

    /// A freshly constructed runtime reports zeroed counters.
    #[tokio::test]
    async fn test_runtime_stats() {
        let snapshot = fresh_testing_runtime().stats();

        assert_eq!(snapshot.kernels_registered, 0);
        assert_eq!(snapshot.messages_processed, 0);
    }

    /// The shutdown signal starts `false` and observes a `true` send.
    #[tokio::test]
    async fn test_shutdown_signal() {
        let runtime = fresh_testing_runtime();
        let mut receiver = runtime.shutdown_signal();
        let initially_requested = *receiver.borrow();
        assert!(!initially_requested);

        runtime.shutdown_tx.send(true).unwrap();
        receiver.changed().await.unwrap();
        assert!(*receiver.borrow());
    }
}