1use super::config::RuntimeConfig;
10use super::{RuntimeEvent, RuntimeEventCallback, RuntimeStats, ShutdownSignal};
11use crate::error::{KernelError, Result};
12use crate::registry::KernelRegistry;
13use std::sync::Arc;
14use std::sync::atomic::{AtomicU64, Ordering};
15use std::time::{Duration, Instant};
16use tokio::sync::{RwLock, watch};
17use tracing::{debug, info, warn};
18
/// Lifecycle phases of a kernel runtime.
///
/// The type is `Copy`, so a snapshot can be read out of a lock guard and
/// compared without keeping the guard alive.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum LifecycleState {
    /// The runtime has been constructed but not started.
    Created,
    /// Startup is in progress.
    Starting,
    /// The runtime is up.
    Running,
    /// A graceful shutdown is in progress.
    Draining,
    /// The runtime has been shut down.
    Stopped,
    /// The runtime ended up in an error state.
    Failed,
}
35
36impl std::fmt::Display for LifecycleState {
37 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
38 match self {
39 Self::Created => write!(f, "Created"),
40 Self::Starting => write!(f, "Starting"),
41 Self::Running => write!(f, "Running"),
42 Self::Draining => write!(f, "Draining"),
43 Self::Stopped => write!(f, "Stopped"),
44 Self::Failed => write!(f, "Failed"),
45 }
46 }
47}
48
49#[derive(Clone)]
51pub struct RuntimeHandle {
52 state: Arc<RwLock<LifecycleState>>,
53 shutdown_tx: watch::Sender<bool>,
54 stats: Arc<RuntimeStatsInner>,
55 start_time: Instant,
56}
57
58impl RuntimeHandle {
59 pub async fn state(&self) -> LifecycleState {
61 *self.state.read().await
62 }
63
64 pub async fn is_running(&self) -> bool {
66 *self.state.read().await == LifecycleState::Running
67 }
68
69 pub fn request_shutdown(&self) {
71 let _ = self.shutdown_tx.send(true);
72 }
73
74 pub fn stats(&self) -> RuntimeStats {
76 RuntimeStats {
77 kernels_registered: self.stats.kernels_registered.load(Ordering::Relaxed) as usize,
78 kernels_active: self.stats.kernels_active.load(Ordering::Relaxed) as usize,
79 messages_processed: self.stats.messages_processed.load(Ordering::Relaxed),
80 messages_in_flight: self.stats.messages_in_flight.load(Ordering::Relaxed),
81 gpu_memory_bytes: self.stats.gpu_memory_bytes.load(Ordering::Relaxed),
82 gpu_memory_peak_bytes: self.stats.gpu_memory_peak_bytes.load(Ordering::Relaxed),
83 uptime_secs: self.start_time.elapsed().as_secs(),
84 }
85 }
86}
87
/// Atomic counters behind the public `RuntimeStats` snapshot.
///
/// All counters start at zero via `Default`.
#[derive(Default, Debug)]
struct RuntimeStatsInner {
    /// Number of kernel registrations recorded.
    kernels_registered: AtomicU64,
    /// Number of kernels currently counted as active.
    kernels_active: AtomicU64,
    /// Number of messages recorded as completed.
    messages_processed: AtomicU64,
    /// Number of messages recorded as started but not yet completed.
    messages_in_flight: AtomicU64,
    /// GPU memory figure in bytes.
    // NOTE(review): no writer for this counter is visible in this file — confirm where it is updated.
    gpu_memory_bytes: AtomicU64,
    /// Peak GPU memory figure in bytes.
    // NOTE(review): no writer for this counter is visible in this file — confirm where it is updated.
    gpu_memory_peak_bytes: AtomicU64,
}
97
98pub struct KernelRuntime {
100 config: RuntimeConfig,
101 state: Arc<RwLock<LifecycleState>>,
102 registry: Arc<KernelRegistry>,
103 shutdown_tx: watch::Sender<bool>,
104 shutdown_rx: watch::Receiver<bool>,
105 stats: Arc<RuntimeStatsInner>,
106 start_time: Option<Instant>,
107 event_callbacks: Vec<RuntimeEventCallback>,
108}
109
110impl KernelRuntime {
111 pub fn new(config: RuntimeConfig) -> Self {
113 let (shutdown_tx, shutdown_rx) = watch::channel(false);
114
115 Self {
116 config,
117 state: Arc::new(RwLock::new(LifecycleState::Created)),
118 registry: Arc::new(KernelRegistry::new()),
119 shutdown_tx,
120 shutdown_rx,
121 stats: Arc::new(RuntimeStatsInner::default()),
122 start_time: None,
123 event_callbacks: Vec::new(),
124 }
125 }
126
127 pub fn builder() -> RuntimeBuilder {
129 RuntimeBuilder::default()
130 }
131
132 pub fn registry(&self) -> &Arc<KernelRegistry> {
134 &self.registry
135 }
136
137 pub fn config(&self) -> &RuntimeConfig {
139 &self.config
140 }
141
142 pub async fn state(&self) -> LifecycleState {
144 *self.state.read().await
145 }
146
147 pub fn on_event(&mut self, callback: RuntimeEventCallback) {
149 self.event_callbacks.push(callback);
150 }
151
152 pub async fn start(&mut self) -> Result<RuntimeHandle> {
154 let current_state = *self.state.read().await;
155 if current_state != LifecycleState::Created {
156 return Err(KernelError::ConfigError(format!(
157 "Cannot start runtime in state: {}",
158 current_state
159 )));
160 }
161
162 *self.state.write().await = LifecycleState::Starting;
164 self.emit_event(RuntimeEvent::Starting);
165 info!("Runtime starting with config: {:?}", self.config);
166
167 if self.config.gpu_enabled {
169 self.initialize_gpu_backend().await?;
170 }
171
172 *self.state.write().await = LifecycleState::Running;
174 let start_time = Instant::now();
175 self.start_time = Some(start_time);
176 self.emit_event(RuntimeEvent::Started);
177 info!("Runtime started successfully");
178
179 self.start_health_check_task();
181
182 Ok(RuntimeHandle {
183 state: self.state.clone(),
184 shutdown_tx: self.shutdown_tx.clone(),
185 stats: self.stats.clone(),
186 start_time,
187 })
188 }
189
190 pub async fn shutdown(&mut self) -> Result<()> {
192 let current_state = *self.state.read().await;
193 if current_state != LifecycleState::Running {
194 return Err(KernelError::ConfigError(format!(
195 "Cannot shutdown runtime in state: {}",
196 current_state
197 )));
198 }
199
200 *self.state.write().await = LifecycleState::Draining;
202 self.emit_event(RuntimeEvent::Draining);
203 info!("Runtime draining, timeout: {:?}", self.config.drain_timeout);
204
205 let _ = self.shutdown_tx.send(true);
207
208 let drain_start = Instant::now();
210 while drain_start.elapsed() < self.config.drain_timeout {
211 let in_flight = self.stats.messages_in_flight.load(Ordering::Relaxed);
212 if in_flight == 0 {
213 debug!("All in-flight messages completed");
214 break;
215 }
216 debug!("Waiting for {} in-flight messages", in_flight);
217 tokio::time::sleep(Duration::from_millis(100)).await;
218 }
219
220 let remaining = self.stats.messages_in_flight.load(Ordering::Relaxed);
222 if remaining > 0 {
223 warn!(
224 "Drain timeout reached with {} messages still in flight",
225 remaining
226 );
227 }
228
229 if self.config.gpu_enabled {
231 self.cleanup_gpu_backend().await?;
232 }
233
234 *self.state.write().await = LifecycleState::Stopped;
236 self.emit_event(RuntimeEvent::Stopped);
237 info!("Runtime stopped");
238
239 Ok(())
240 }
241
242 pub async fn force_shutdown(&mut self) -> Result<()> {
244 warn!("Force shutdown initiated");
245
246 let _ = self.shutdown_tx.send(true);
247
248 if self.config.gpu_enabled {
249 self.cleanup_gpu_backend().await?;
250 }
251
252 *self.state.write().await = LifecycleState::Stopped;
253 self.emit_event(RuntimeEvent::Stopped);
254
255 Ok(())
256 }
257
258 pub async fn reload_config(&mut self, new_config: RuntimeConfig) -> Result<()> {
260 if !self.config.hot_reload_enabled {
261 return Err(KernelError::ConfigError(
262 "Hot reload not enabled".to_string(),
263 ));
264 }
265
266 new_config
267 .validate()
268 .map_err(|e| KernelError::ConfigError(e.to_string()))?;
269
270 self.config.log_level = new_config.log_level;
272 self.config.metrics_interval = new_config.metrics_interval;
273 self.config.health_check_interval = new_config.health_check_interval;
274 self.config.max_queue_depth = new_config.max_queue_depth;
275
276 self.emit_event(RuntimeEvent::ConfigReloaded);
277 info!("Configuration reloaded");
278
279 Ok(())
280 }
281
282 pub fn shutdown_signal(&self) -> ShutdownSignal {
284 self.shutdown_rx.clone()
285 }
286
287 pub fn stats(&self) -> RuntimeStats {
289 RuntimeStats {
290 kernels_registered: self.stats.kernels_registered.load(Ordering::Relaxed) as usize,
291 kernels_active: self.stats.kernels_active.load(Ordering::Relaxed) as usize,
292 messages_processed: self.stats.messages_processed.load(Ordering::Relaxed),
293 messages_in_flight: self.stats.messages_in_flight.load(Ordering::Relaxed),
294 gpu_memory_bytes: self.stats.gpu_memory_bytes.load(Ordering::Relaxed),
295 gpu_memory_peak_bytes: self.stats.gpu_memory_peak_bytes.load(Ordering::Relaxed),
296 uptime_secs: self.start_time.map(|t| t.elapsed().as_secs()).unwrap_or(0),
297 }
298 }
299
300 pub fn record_message_start(&self) {
302 self.stats
303 .messages_in_flight
304 .fetch_add(1, Ordering::Relaxed);
305 }
306
307 pub fn record_message_complete(&self) {
309 self.stats
310 .messages_in_flight
311 .fetch_sub(1, Ordering::Relaxed);
312 self.stats
313 .messages_processed
314 .fetch_add(1, Ordering::Relaxed);
315 }
316
317 pub fn record_kernel_registered(&self, id: &str) {
319 self.stats
320 .kernels_registered
321 .fetch_add(1, Ordering::Relaxed);
322 self.emit_event(RuntimeEvent::KernelRegistered { id: id.to_string() });
323 }
324
325 pub fn record_kernel_activated(&self) {
327 self.stats.kernels_active.fetch_add(1, Ordering::Relaxed);
328 }
329
330 pub fn record_kernel_deactivated(&self, id: &str) {
332 self.stats.kernels_active.fetch_sub(1, Ordering::Relaxed);
333 self.emit_event(RuntimeEvent::KernelUnregistered { id: id.to_string() });
334 }
335
336 async fn initialize_gpu_backend(&self) -> Result<()> {
339 info!("Initializing GPU backend: {}", self.config.primary_backend);
340 Ok(())
343 }
344
345 async fn cleanup_gpu_backend(&self) -> Result<()> {
346 info!("Cleaning up GPU backend");
347 Ok(())
349 }
350
351 fn start_health_check_task(&self) {
352 let state = self.state.clone();
353 let mut shutdown_rx = self.shutdown_rx.clone();
354 let interval = self.config.health_check_interval;
355 let callbacks = self.event_callbacks.clone();
356
357 tokio::spawn(async move {
358 let mut interval_timer = tokio::time::interval(interval);
359
360 loop {
361 tokio::select! {
362 _ = interval_timer.tick() => {
363 let current_state = *state.read().await;
364 if current_state != LifecycleState::Running {
365 break;
366 }
367
368 let healthy = true; for callback in &callbacks {
372 callback(RuntimeEvent::HealthCheckCompleted { healthy });
373 }
374 }
375 _ = shutdown_rx.changed() => {
376 if *shutdown_rx.borrow() {
377 break;
378 }
379 }
380 }
381 }
382
383 debug!("Health check task stopped");
384 });
385 }
386
387 fn emit_event(&self, event: RuntimeEvent) {
388 for callback in &self.event_callbacks {
389 callback(event.clone());
390 }
391 }
392}
393
394impl std::fmt::Debug for KernelRuntime {
395 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
396 f.debug_struct("KernelRuntime")
397 .field("config", &self.config)
398 .field("start_time", &self.start_time)
399 .finish()
400 }
401}
402
403#[derive(Default)]
405pub struct RuntimeBuilder {
406 config: Option<RuntimeConfig>,
407 event_callbacks: Vec<RuntimeEventCallback>,
408}
409
410impl RuntimeBuilder {
411 pub fn development(mut self) -> Self {
413 self.config = Some(RuntimeConfig::development());
414 self
415 }
416
417 pub fn production(mut self) -> Self {
419 self.config = Some(RuntimeConfig::production());
420 self
421 }
422
423 pub fn high_performance(mut self) -> Self {
425 self.config = Some(RuntimeConfig::high_performance());
426 self
427 }
428
429 pub fn with_config(mut self, config: RuntimeConfig) -> Self {
431 self.config = Some(config);
432 self
433 }
434
435 pub fn from_env(mut self) -> Self {
437 self.config = Some(RuntimeConfig::from_env());
438 self
439 }
440
441 pub fn from_file(mut self, path: &std::path::Path) -> Result<Self> {
443 let config =
444 RuntimeConfig::from_file(path).map_err(|e| KernelError::ConfigError(e.to_string()))?;
445 self.config = Some(config);
446 Ok(self)
447 }
448
449 pub fn with_drain_timeout(mut self, timeout: Duration) -> Self {
451 if let Some(ref mut config) = self.config {
452 config.drain_timeout = timeout;
453 }
454 self
455 }
456
457 pub fn with_max_instances(mut self, count: usize) -> Self {
459 if let Some(ref mut config) = self.config {
460 config.max_kernel_instances = count;
461 }
462 self
463 }
464
465 pub fn on_event(mut self, callback: RuntimeEventCallback) -> Self {
467 self.event_callbacks.push(callback);
468 self
469 }
470
471 pub fn build(self) -> Result<KernelRuntime> {
473 let config = self.config.unwrap_or_default();
474 config
475 .validate()
476 .map_err(|e| KernelError::ConfigError(e.to_string()))?;
477
478 let mut runtime = KernelRuntime::new(config);
479 runtime.event_callbacks = self.event_callbacks;
480
481 Ok(runtime)
482 }
483}
484
#[cfg(test)]
mod tests {
    use super::*;

    /// A runtime built from the development preset goes
    /// `Created -> Running -> Stopped` through `start` and `shutdown`.
    #[tokio::test]
    async fn test_runtime_lifecycle() {
        let builder = KernelRuntime::builder()
            .development()
            .with_drain_timeout(Duration::from_millis(100));
        let mut rt = builder.build().unwrap();

        assert_eq!(rt.state().await, LifecycleState::Created);

        let handle = rt.start().await.unwrap();
        assert_eq!(handle.state().await, LifecycleState::Running);

        rt.shutdown().await.unwrap();
        assert_eq!(rt.state().await, LifecycleState::Stopped);
    }

    /// A freshly created runtime reports zeroed counters.
    #[tokio::test]
    async fn test_runtime_stats() {
        let rt = KernelRuntime::new(RuntimeConfig::testing());
        let snapshot = rt.stats();

        assert_eq!(snapshot.kernels_registered, 0);
        assert_eq!(snapshot.messages_processed, 0);
    }

    /// A shutdown signal starts out `false` and observes a `true` sent later.
    #[tokio::test]
    async fn test_shutdown_signal() {
        let rt = KernelRuntime::new(RuntimeConfig::testing());
        let mut rx = rt.shutdown_signal();

        assert!(!*rx.borrow());

        rt.shutdown_tx.send(true).unwrap();
        rx.changed().await.unwrap();
        assert!(*rx.borrow());
    }
}