ringkernel_core/
runtime_context.rs

//! Unified runtime context for RingKernel enterprise features.
//!
//! This module provides a comprehensive runtime context that instantiates and manages
//! all enterprise features (observability, health, multi-GPU, migration) based on
//! the unified configuration.
//!
//! # Example
//!
//! ```ignore
//! use ringkernel_core::runtime_context::RuntimeBuilder;
//! use ringkernel_core::config::RingKernelConfig;
//!
//! // Create a runtime from the production configuration preset
//! let runtime = RuntimeBuilder::new()
//!     .with_config(RingKernelConfig::production())
//!     .build()?;
//!
//! // Access enterprise features
//! let health = runtime.health_checker();
//! let metrics = runtime.prometheus_exporter();
//! let coordinator = runtime.multi_gpu_coordinator();
//!
//! // Graceful shutdown (synchronous)
//! runtime.shutdown()?;
//! ```
26
27use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
28use std::sync::Arc;
29use std::time::{Duration, Instant};
30
31use parking_lot::RwLock;
32
33use crate::checkpoint::{CheckpointStorage, FileStorage, MemoryStorage};
34#[cfg(feature = "cloud-storage")]
35use crate::cloud_storage::{S3Config, S3Storage};
36use crate::config::{CheckpointStorageType, RingKernelConfig};
37use crate::error::{Result, RingKernelError};
38use crate::health::{
39    CircuitBreaker, CircuitState, DegradationManager, HealthChecker, HealthStatus, KernelWatchdog,
40};
41use crate::multi_gpu::{KernelMigrator, MultiGpuBuilder, MultiGpuCoordinator};
42use crate::observability::{ObservabilityContext, PrometheusExporter};
43
44// ============================================================================
45// Lifecycle Management
46// ============================================================================
47
/// Phase of the runtime lifecycle.
///
/// The usual progression is `Initializing` → `Running` → `Draining` →
/// `ShuttingDown` → `Stopped`. Only `Running` accepts new work.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum LifecycleState {
    /// Runtime is initializing and not yet accepting work.
    Initializing,
    /// Runtime is running and accepting work.
    Running,
    /// Runtime is draining: new work is refused while existing work finishes.
    Draining,
    /// Runtime is shutting down.
    ShuttingDown,
    /// Runtime has stopped.
    Stopped,
}

impl LifecycleState {
    /// Returns `true` when the runtime may take on new work.
    ///
    /// Only [`LifecycleState::Running`] qualifies.
    pub fn is_accepting_work(&self) -> bool {
        *self == Self::Running
    }

    /// Returns `true` for every state other than [`LifecycleState::Stopped`].
    pub fn is_active(&self) -> bool {
        *self != Self::Stopped
    }
}
74
75/// Background task tracking.
76#[derive(Debug, Default)]
77struct BackgroundTasks {
78    /// Number of active health check loops.
79    health_check_loops: AtomicU64,
80    /// Number of active watchdog loops.
81    watchdog_loops: AtomicU64,
82    /// Number of active metrics flush loops.
83    metrics_flush_loops: AtomicU64,
84    /// Last health check time.
85    last_health_check: RwLock<Option<Instant>>,
86    /// Last watchdog scan time.
87    last_watchdog_scan: RwLock<Option<Instant>>,
88    /// Last metrics flush time.
89    last_metrics_flush: RwLock<Option<Instant>>,
90}
91
92impl BackgroundTasks {
93    fn new() -> Self {
94        Self::default()
95    }
96
97    fn record_health_check(&self) {
98        *self.last_health_check.write() = Some(Instant::now());
99    }
100
101    fn record_watchdog_scan(&self) {
102        *self.last_watchdog_scan.write() = Some(Instant::now());
103    }
104
105    fn record_metrics_flush(&self) {
106        *self.last_metrics_flush.write() = Some(Instant::now());
107    }
108
109    fn health_check_age(&self) -> Option<Duration> {
110        self.last_health_check.read().map(|t| t.elapsed())
111    }
112
113    fn watchdog_scan_age(&self) -> Option<Duration> {
114        self.last_watchdog_scan.read().map(|t| t.elapsed())
115    }
116
117    fn metrics_flush_age(&self) -> Option<Duration> {
118        self.last_metrics_flush.read().map(|t| t.elapsed())
119    }
120}
121
122// ============================================================================
123// Async Background Monitoring
124// ============================================================================
125
126use tokio::sync::watch;
127use tokio::task::JoinHandle;
128
/// Intervals and on/off switches for the background monitoring loops.
///
/// By default, health checks run every 10 s, watchdog scans every 5 s and
/// metrics flushes every 60 s, with all three loops enabled.
#[derive(Debug, Clone)]
pub struct MonitoringConfig {
    /// Interval for health checks.
    pub health_check_interval: Duration,
    /// Interval for watchdog scans.
    pub watchdog_interval: Duration,
    /// Interval for metrics flush.
    pub metrics_flush_interval: Duration,
    /// Whether to enable health check loop.
    pub enable_health_checks: bool,
    /// Whether to enable watchdog loop.
    pub enable_watchdog: bool,
    /// Whether to enable metrics flush loop.
    pub enable_metrics_flush: bool,
}

impl Default for MonitoringConfig {
    fn default() -> Self {
        Self {
            enable_health_checks: true,
            enable_watchdog: true,
            enable_metrics_flush: true,
            health_check_interval: Duration::from_secs(10),
            watchdog_interval: Duration::from_secs(5),
            metrics_flush_interval: Duration::from_secs(60),
        }
    }
}

impl MonitoringConfig {
    /// Create a config with the default intervals and every loop enabled.
    pub fn new() -> Self {
        Self::default()
    }

    /// Return a copy of this config with the health check interval replaced.
    pub fn health_check_interval(self, interval: Duration) -> Self {
        Self {
            health_check_interval: interval,
            ..self
        }
    }

    /// Return a copy of this config with the watchdog interval replaced.
    pub fn watchdog_interval(self, interval: Duration) -> Self {
        Self {
            watchdog_interval: interval,
            ..self
        }
    }

    /// Return a copy of this config with the metrics flush interval replaced.
    pub fn metrics_flush_interval(self, interval: Duration) -> Self {
        Self {
            metrics_flush_interval: interval,
            ..self
        }
    }

    /// Turn the health check loop on or off.
    pub fn enable_health_checks(self, enable: bool) -> Self {
        Self {
            enable_health_checks: enable,
            ..self
        }
    }

    /// Turn the watchdog loop on or off.
    pub fn enable_watchdog(self, enable: bool) -> Self {
        Self {
            enable_watchdog: enable,
            ..self
        }
    }

    /// Turn the metrics flush loop on or off.
    pub fn enable_metrics_flush(self, enable: bool) -> Self {
        Self {
            enable_metrics_flush: enable,
            ..self
        }
    }
}
201
202/// Handles for background monitoring tasks.
203pub struct MonitoringHandles {
204    /// Handle to the health check loop task.
205    pub health_check_handle: Option<JoinHandle<()>>,
206    /// Handle to the watchdog loop task.
207    pub watchdog_handle: Option<JoinHandle<()>>,
208    /// Handle to the metrics flush loop task.
209    pub metrics_flush_handle: Option<JoinHandle<()>>,
210    /// Shutdown signal sender.
211    shutdown_tx: watch::Sender<bool>,
212}
213
214impl MonitoringHandles {
215    /// Create new monitoring handles.
216    fn new() -> (Self, watch::Receiver<bool>) {
217        let (shutdown_tx, shutdown_rx) = watch::channel(false);
218        (
219            Self {
220                health_check_handle: None,
221                watchdog_handle: None,
222                metrics_flush_handle: None,
223                shutdown_tx,
224            },
225            shutdown_rx,
226        )
227    }
228
229    /// Signal all monitoring tasks to stop.
230    pub fn signal_shutdown(&self) {
231        let _ = self.shutdown_tx.send(true);
232    }
233
234    /// Wait for all monitoring tasks to complete.
235    pub async fn wait_for_shutdown(self) {
236        if let Some(handle) = self.health_check_handle {
237            let _ = handle.await;
238        }
239        if let Some(handle) = self.watchdog_handle {
240            let _ = handle.await;
241        }
242        if let Some(handle) = self.metrics_flush_handle {
243            let _ = handle.await;
244        }
245    }
246
247    /// Check if any monitoring tasks are running.
248    pub fn is_running(&self) -> bool {
249        self.health_check_handle
250            .as_ref()
251            .map(|h| !h.is_finished())
252            .unwrap_or(false)
253            || self
254                .watchdog_handle
255                .as_ref()
256                .map(|h| !h.is_finished())
257                .unwrap_or(false)
258            || self
259                .metrics_flush_handle
260                .as_ref()
261                .map(|h| !h.is_finished())
262                .unwrap_or(false)
263    }
264}
265
266// ============================================================================
267// Runtime Context
268// ============================================================================
269
270/// Unified runtime context managing all enterprise features.
271///
272/// This is the main entry point for using RingKernel's enterprise features.
273/// It instantiates and manages:
274/// - Health checking and circuit breakers
275/// - Prometheus metrics exporter
276/// - Multi-GPU coordination
277/// - Kernel migration infrastructure
278/// - Background monitoring tasks
279///
280/// ## Lifecycle
281///
282/// The runtime goes through these states:
283/// - `Initializing` → `Running` → `Draining` → `ShuttingDown` → `Stopped`
284///
285/// Use `start_monitoring()` to begin background health checks and watchdog scans.
286/// Use `shutdown()` for graceful termination.
287pub struct RingKernelContext {
288    /// Configuration used to create this context.
289    config: RingKernelConfig,
290    /// Health checker instance.
291    health_checker: Arc<HealthChecker>,
292    /// Kernel watchdog.
293    watchdog: Arc<KernelWatchdog>,
294    /// Circuit breaker for kernel operations.
295    circuit_breaker: Arc<CircuitBreaker>,
296    /// Degradation manager.
297    degradation_manager: Arc<DegradationManager>,
298    /// Prometheus exporter.
299    prometheus_exporter: Arc<PrometheusExporter>,
300    /// Observability context.
301    observability: Arc<ObservabilityContext>,
302    /// Multi-GPU coordinator.
303    multi_gpu_coordinator: Arc<MultiGpuCoordinator>,
304    /// Kernel migrator.
305    migrator: Arc<KernelMigrator>,
306    /// Checkpoint storage.
307    checkpoint_storage: Arc<dyn CheckpointStorage>,
308    /// Runtime statistics.
309    stats: RuntimeStats,
310    /// Startup time.
311    started_at: Instant,
312    /// Running state (deprecated, use lifecycle_state).
313    running: AtomicBool,
314    /// Current lifecycle state.
315    lifecycle_state: RwLock<LifecycleState>,
316    /// Background task tracking.
317    background_tasks: BackgroundTasks,
318    /// Shutdown requested flag.
319    shutdown_requested: AtomicBool,
320}
321
322impl RingKernelContext {
323    /// Get the configuration.
324    pub fn config(&self) -> &RingKernelConfig {
325        &self.config
326    }
327
328    /// Get the health checker.
329    pub fn health_checker(&self) -> Arc<HealthChecker> {
330        Arc::clone(&self.health_checker)
331    }
332
333    /// Get the kernel watchdog.
334    pub fn watchdog(&self) -> Arc<KernelWatchdog> {
335        Arc::clone(&self.watchdog)
336    }
337
338    /// Get the circuit breaker.
339    pub fn circuit_breaker(&self) -> Arc<CircuitBreaker> {
340        Arc::clone(&self.circuit_breaker)
341    }
342
343    /// Get the degradation manager.
344    pub fn degradation_manager(&self) -> Arc<DegradationManager> {
345        Arc::clone(&self.degradation_manager)
346    }
347
348    /// Get the Prometheus exporter.
349    pub fn prometheus_exporter(&self) -> Arc<PrometheusExporter> {
350        Arc::clone(&self.prometheus_exporter)
351    }
352
353    /// Get the observability context.
354    pub fn observability(&self) -> Arc<ObservabilityContext> {
355        Arc::clone(&self.observability)
356    }
357
358    /// Get the multi-GPU coordinator.
359    pub fn multi_gpu_coordinator(&self) -> Arc<MultiGpuCoordinator> {
360        Arc::clone(&self.multi_gpu_coordinator)
361    }
362
363    /// Get the kernel migrator.
364    pub fn migrator(&self) -> Arc<KernelMigrator> {
365        Arc::clone(&self.migrator)
366    }
367
368    /// Get the checkpoint storage.
369    pub fn checkpoint_storage(&self) -> Arc<dyn CheckpointStorage> {
370        Arc::clone(&self.checkpoint_storage)
371    }
372
373    /// Check if the runtime is running.
374    pub fn is_running(&self) -> bool {
375        self.running.load(Ordering::SeqCst)
376    }
377
378    /// Get runtime uptime.
379    pub fn uptime(&self) -> std::time::Duration {
380        self.started_at.elapsed()
381    }
382
383    /// Get runtime statistics.
384    pub fn stats(&self) -> RuntimeStatsSnapshot {
385        RuntimeStatsSnapshot {
386            uptime: self.uptime(),
387            kernels_launched: self.stats.kernels_launched.load(Ordering::Relaxed),
388            messages_processed: self.stats.messages_processed.load(Ordering::Relaxed),
389            migrations_completed: self.stats.migrations_completed.load(Ordering::Relaxed),
390            checkpoints_created: self.stats.checkpoints_created.load(Ordering::Relaxed),
391            health_checks_run: self.stats.health_checks_run.load(Ordering::Relaxed),
392            circuit_breaker_trips: self.stats.circuit_breaker_trips.load(Ordering::Relaxed),
393        }
394    }
395
396    /// Record a kernel launch.
397    pub fn record_kernel_launch(&self) {
398        self.stats.kernels_launched.fetch_add(1, Ordering::Relaxed);
399    }
400
401    /// Record messages processed.
402    pub fn record_messages(&self, count: u64) {
403        self.stats
404            .messages_processed
405            .fetch_add(count, Ordering::Relaxed);
406    }
407
408    /// Record a migration completion.
409    pub fn record_migration(&self) {
410        self.stats
411            .migrations_completed
412            .fetch_add(1, Ordering::Relaxed);
413    }
414
415    /// Record a checkpoint creation.
416    pub fn record_checkpoint(&self) {
417        self.stats
418            .checkpoints_created
419            .fetch_add(1, Ordering::Relaxed);
420    }
421
422    /// Record a health check run.
423    pub fn record_health_check(&self) {
424        self.stats.health_checks_run.fetch_add(1, Ordering::Relaxed);
425    }
426
427    /// Record a circuit breaker trip.
428    pub fn record_circuit_trip(&self) {
429        self.stats
430            .circuit_breaker_trips
431            .fetch_add(1, Ordering::Relaxed);
432    }
433
434    // ========================================================================
435    // Lifecycle Management
436    // ========================================================================
437
438    /// Get the current lifecycle state.
439    pub fn lifecycle_state(&self) -> LifecycleState {
440        *self.lifecycle_state.read()
441    }
442
443    /// Check if shutdown has been requested.
444    pub fn is_shutdown_requested(&self) -> bool {
445        self.shutdown_requested.load(Ordering::SeqCst)
446    }
447
448    /// Check if the runtime is accepting new work.
449    pub fn is_accepting_work(&self) -> bool {
450        self.lifecycle_state().is_accepting_work()
451    }
452
453    /// Transition to running state.
454    ///
455    /// Call this after initialization is complete to start accepting work.
456    pub fn start(&self) -> Result<()> {
457        let mut state = self.lifecycle_state.write();
458        if *state != LifecycleState::Initializing {
459            return Err(RingKernelError::InvalidState {
460                expected: "Initializing".to_string(),
461                actual: format!("{:?}", *state),
462            });
463        }
464        *state = LifecycleState::Running;
465        self.running.store(true, Ordering::SeqCst);
466        Ok(())
467    }
468
469    /// Run a single health check cycle.
470    ///
471    /// This performs one round of health checks and updates the circuit breaker
472    /// and degradation manager based on the results.
473    ///
474    /// Note: This is a synchronous method that uses cached circuit breaker state.
475    /// For full async health checks, use the HealthChecker directly.
476    pub fn run_health_check_cycle(&self) -> HealthCycleResult {
477        self.background_tasks.record_health_check();
478        self.record_health_check();
479
480        // Get circuit breaker state as a health proxy
481        let circuit_state = self.circuit_breaker.state();
482
483        // Infer health status from circuit breaker state
484        let status = match circuit_state {
485            CircuitState::Closed => HealthStatus::Healthy,
486            CircuitState::HalfOpen => HealthStatus::Degraded,
487            CircuitState::Open => HealthStatus::Unhealthy,
488        };
489
490        // Update degradation level based on circuit breaker state
491        let current_level = self.degradation_manager.level();
492        let new_level = match circuit_state {
493            CircuitState::Open => {
494                // Increase degradation
495                current_level.next_worse()
496            }
497            CircuitState::Closed => {
498                // Decrease degradation
499                current_level.next_better()
500            }
501            CircuitState::HalfOpen => {
502                // Keep current level
503                current_level
504            }
505        };
506
507        if new_level != current_level {
508            self.degradation_manager.set_level(new_level);
509        }
510
511        HealthCycleResult {
512            status,
513            circuit_state,
514            degradation_level: self.degradation_manager.level(),
515            timestamp: Instant::now(),
516        }
517    }
518
519    /// Run a single watchdog scan cycle.
520    ///
521    /// This checks for stale kernels and takes appropriate action.
522    pub fn run_watchdog_cycle(&self) -> WatchdogResult {
523        self.background_tasks.record_watchdog_scan();
524
525        let kernel_health = self.watchdog.check_all();
526        let stale_count = kernel_health
527            .iter()
528            .filter(|h| h.status == HealthStatus::Unhealthy)
529            .count();
530
531        WatchdogResult {
532            stale_kernels: stale_count,
533            timestamp: Instant::now(),
534        }
535    }
536
537    /// Flush metrics to Prometheus.
538    ///
539    /// This renders current metrics to the Prometheus exporter format.
540    pub fn flush_metrics(&self) -> String {
541        self.background_tasks.record_metrics_flush();
542        self.prometheus_exporter.render()
543    }
544
545    /// Get background task status.
546    pub fn background_task_status(&self) -> BackgroundTaskStatus {
547        BackgroundTaskStatus {
548            health_check_age: self.background_tasks.health_check_age(),
549            watchdog_scan_age: self.background_tasks.watchdog_scan_age(),
550            metrics_flush_age: self.background_tasks.metrics_flush_age(),
551            active_health_loops: self
552                .background_tasks
553                .health_check_loops
554                .load(Ordering::Relaxed),
555            active_watchdog_loops: self.background_tasks.watchdog_loops.load(Ordering::Relaxed),
556            active_metrics_loops: self
557                .background_tasks
558                .metrics_flush_loops
559                .load(Ordering::Relaxed),
560        }
561    }
562
563    // ========================================================================
564    // Async Background Monitoring
565    // ========================================================================
566
567    /// Start background monitoring loops.
568    ///
569    /// This spawns async tasks for:
570    /// - Health check loop (runs at configured interval)
571    /// - Watchdog loop (checks for stale kernels)
572    /// - Metrics flush loop (exports Prometheus metrics)
573    ///
574    /// Returns handles that can be used to stop the monitoring tasks.
575    ///
576    /// # Example
577    ///
578    /// ```ignore
579    /// let runtime = RuntimeBuilder::new().production().build()?;
580    /// runtime.start()?;
581    ///
582    /// let config = MonitoringConfig::new()
583    ///     .health_check_interval(Duration::from_secs(5))
584    ///     .watchdog_interval(Duration::from_secs(2));
585    ///
586    /// let handles = runtime.start_monitoring(config).await;
587    ///
588    /// // ... runtime runs ...
589    ///
590    /// // Graceful shutdown
591    /// handles.signal_shutdown();
592    /// handles.wait_for_shutdown().await;
593    /// ```
594    pub fn start_monitoring(self: &Arc<Self>, config: MonitoringConfig) -> MonitoringHandles {
595        let (mut handles, shutdown_rx) = MonitoringHandles::new();
596
597        // Spawn health check loop
598        if config.enable_health_checks {
599            let runtime = Arc::clone(self);
600            let interval = config.health_check_interval;
601            let mut shutdown = shutdown_rx.clone();
602
603            handles.health_check_handle = Some(tokio::spawn(async move {
604                runtime
605                    .background_tasks
606                    .health_check_loops
607                    .fetch_add(1, Ordering::Relaxed);
608
609                let mut interval_timer = tokio::time::interval(interval);
610                interval_timer.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
611
612                loop {
613                    tokio::select! {
614                        _ = interval_timer.tick() => {
615                            if runtime.is_shutdown_requested() {
616                                break;
617                            }
618                            let _result = runtime.run_health_check_cycle();
619                            tracing::debug!("Health check cycle completed");
620                        }
621                        _ = shutdown.changed() => {
622                            tracing::info!("Health check loop shutting down");
623                            break;
624                        }
625                    }
626                }
627
628                runtime
629                    .background_tasks
630                    .health_check_loops
631                    .fetch_sub(1, Ordering::Relaxed);
632            }));
633        }
634
635        // Spawn watchdog loop
636        if config.enable_watchdog {
637            let runtime = Arc::clone(self);
638            let interval = config.watchdog_interval;
639            let mut shutdown = shutdown_rx.clone();
640
641            handles.watchdog_handle = Some(tokio::spawn(async move {
642                runtime
643                    .background_tasks
644                    .watchdog_loops
645                    .fetch_add(1, Ordering::Relaxed);
646
647                let mut interval_timer = tokio::time::interval(interval);
648                interval_timer.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
649
650                loop {
651                    tokio::select! {
652                        _ = interval_timer.tick() => {
653                            if runtime.is_shutdown_requested() {
654                                break;
655                            }
656                            let result = runtime.run_watchdog_cycle();
657                            if result.stale_kernels > 0 {
658                                tracing::warn!("Watchdog detected {} stale kernels", result.stale_kernels);
659                            }
660                        }
661                        _ = shutdown.changed() => {
662                            tracing::info!("Watchdog loop shutting down");
663                            break;
664                        }
665                    }
666                }
667
668                runtime
669                    .background_tasks
670                    .watchdog_loops
671                    .fetch_sub(1, Ordering::Relaxed);
672            }));
673        }
674
675        // Spawn metrics flush loop
676        if config.enable_metrics_flush {
677            let runtime = Arc::clone(self);
678            let interval = config.metrics_flush_interval;
679            let mut shutdown = shutdown_rx;
680
681            handles.metrics_flush_handle = Some(tokio::spawn(async move {
682                runtime
683                    .background_tasks
684                    .metrics_flush_loops
685                    .fetch_add(1, Ordering::Relaxed);
686
687                let mut interval_timer = tokio::time::interval(interval);
688                interval_timer.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
689
690                loop {
691                    tokio::select! {
692                        _ = interval_timer.tick() => {
693                            if runtime.is_shutdown_requested() {
694                                break;
695                            }
696                            let _metrics = runtime.flush_metrics();
697                            tracing::debug!("Metrics flush completed");
698                        }
699                        _ = shutdown.changed() => {
700                            tracing::info!("Metrics flush loop shutting down");
701                            break;
702                        }
703                    }
704                }
705
706                runtime
707                    .background_tasks
708                    .metrics_flush_loops
709                    .fetch_sub(1, Ordering::Relaxed);
710            }));
711        }
712
713        handles
714    }
715
716    /// Start monitoring with default configuration.
717    pub fn start_monitoring_default(self: &Arc<Self>) -> MonitoringHandles {
718        self.start_monitoring(MonitoringConfig::default())
719    }
720
721    /// Request graceful shutdown.
722    ///
723    /// This signals background tasks to stop and transitions to draining state.
724    /// Returns immediately; use `wait_for_shutdown()` to block until complete.
725    pub fn request_shutdown(&self) -> Result<()> {
726        // Set shutdown flag
727        self.shutdown_requested.store(true, Ordering::SeqCst);
728
729        // Transition to draining state
730        let mut state = self.lifecycle_state.write();
731        match *state {
732            LifecycleState::Running => {
733                *state = LifecycleState::Draining;
734                Ok(())
735            }
736            LifecycleState::Draining | LifecycleState::ShuttingDown => {
737                // Already shutting down
738                Ok(())
739            }
740            LifecycleState::Stopped => Err(RingKernelError::InvalidState {
741                expected: "Running or Draining".to_string(),
742                actual: "Stopped".to_string(),
743            }),
744            LifecycleState::Initializing => {
745                // Can shutdown from initializing too
746                *state = LifecycleState::ShuttingDown;
747                Ok(())
748            }
749        }
750    }
751
752    /// Complete the shutdown process.
753    ///
754    /// This performs final cleanup and transitions to stopped state.
755    pub fn complete_shutdown(&self) -> Result<ShutdownReport> {
756        let start = Instant::now();
757
758        // Transition to shutting down
759        {
760            let mut state = self.lifecycle_state.write();
761            if *state == LifecycleState::Stopped {
762                return Err(RingKernelError::InvalidState {
763                    expected: "not Stopped".to_string(),
764                    actual: "Stopped".to_string(),
765                });
766            }
767            *state = LifecycleState::ShuttingDown;
768        }
769
770        // Perform cleanup
771        let final_stats = self.stats();
772        let final_metrics = self.flush_metrics();
773
774        // Transition to stopped
775        {
776            let mut state = self.lifecycle_state.write();
777            *state = LifecycleState::Stopped;
778            self.running.store(false, Ordering::SeqCst);
779        }
780
781        Ok(ShutdownReport {
782            duration: start.elapsed(),
783            total_uptime: self.uptime(),
784            final_stats,
785            final_metrics,
786        })
787    }
788
789    /// Shutdown the runtime gracefully (legacy method).
790    ///
791    /// This is equivalent to `request_shutdown()` followed by `complete_shutdown()`.
792    pub fn shutdown(&self) -> Result<()> {
793        self.request_shutdown()?;
794        self.complete_shutdown()?;
795        Ok(())
796    }
797
798    /// Get application info.
799    pub fn app_info(&self) -> AppInfo {
800        AppInfo {
801            name: self.config.general.app_name.clone(),
802            version: self.config.general.app_version.clone(),
803            environment: self.config.general.environment.as_str().to_string(),
804        }
805    }
806}
807
808/// Result of a health check cycle run by the runtime context.
809#[derive(Debug, Clone)]
810pub struct HealthCycleResult {
811    /// Overall health status.
812    pub status: HealthStatus,
813    /// Current circuit breaker state.
814    pub circuit_state: CircuitState,
815    /// Current degradation level.
816    pub degradation_level: crate::health::DegradationLevel,
817    /// Timestamp of this check.
818    pub timestamp: Instant,
819}
820
821/// Result of a watchdog scan cycle.
822#[derive(Debug, Clone)]
823pub struct WatchdogResult {
824    /// Number of stale kernels detected.
825    pub stale_kernels: usize,
826    /// Timestamp of this scan.
827    pub timestamp: Instant,
828}
829
830/// Status of background tasks.
831#[derive(Debug, Clone)]
832pub struct BackgroundTaskStatus {
833    /// Time since last health check.
834    pub health_check_age: Option<Duration>,
835    /// Time since last watchdog scan.
836    pub watchdog_scan_age: Option<Duration>,
837    /// Time since last metrics flush.
838    pub metrics_flush_age: Option<Duration>,
839    /// Number of active health check loops.
840    pub active_health_loops: u64,
841    /// Number of active watchdog loops.
842    pub active_watchdog_loops: u64,
843    /// Number of active metrics flush loops.
844    pub active_metrics_loops: u64,
845}
846
847/// Report generated after shutdown completes.
848#[derive(Debug, Clone)]
849pub struct ShutdownReport {
850    /// Time taken for shutdown.
851    pub duration: Duration,
852    /// Total runtime uptime.
853    pub total_uptime: Duration,
854    /// Final runtime statistics.
855    pub final_stats: RuntimeStatsSnapshot,
856    /// Final metrics dump.
857    pub final_metrics: String,
858}
859
/// Live runtime counters, updated lock-free through atomics.
#[derive(Debug, Default)]
struct RuntimeStats {
    /// Kernels launched so far.
    kernels_launched: AtomicU64,
    /// Messages processed so far.
    messages_processed: AtomicU64,
    /// Migrations completed so far.
    migrations_completed: AtomicU64,
    /// Checkpoints created so far.
    checkpoints_created: AtomicU64,
    /// Health checks run so far.
    health_checks_run: AtomicU64,
    /// Circuit breaker trips recorded so far.
    circuit_breaker_trips: AtomicU64,
}

/// Plain-value copy of [`RuntimeStats`] plus uptime, taken at a single moment.
#[derive(Debug, Clone)]
pub struct RuntimeStatsSnapshot {
    /// Runtime uptime when the snapshot was taken.
    pub uptime: Duration,
    /// Total kernels launched.
    pub kernels_launched: u64,
    /// Total messages processed.
    pub messages_processed: u64,
    /// Total migrations completed.
    pub migrations_completed: u64,
    /// Total checkpoints created.
    pub checkpoints_created: u64,
    /// Total health checks run.
    pub health_checks_run: u64,
    /// Total circuit breaker trips.
    pub circuit_breaker_trips: u64,
}

/// Identity of the running application, taken from the configuration.
#[derive(Debug, Clone)]
pub struct AppInfo {
    /// Application name.
    pub name: String,
    /// Application version.
    pub version: String,
    /// Deployment environment name.
    pub environment: String,
}
900
901// ============================================================================
902// Runtime Builder
903// ============================================================================
904
905/// Builder for RingKernelContext.
906pub struct RuntimeBuilder {
907    config: Option<RingKernelConfig>,
908    health_checker: Option<Arc<HealthChecker>>,
909    watchdog: Option<Arc<KernelWatchdog>>,
910    multi_gpu_coordinator: Option<Arc<MultiGpuCoordinator>>,
911    checkpoint_storage: Option<Arc<dyn CheckpointStorage>>,
912}
913
914impl RuntimeBuilder {
915    /// Create a new runtime builder.
916    pub fn new() -> Self {
917        Self {
918            config: None,
919            health_checker: None,
920            watchdog: None,
921            multi_gpu_coordinator: None,
922            checkpoint_storage: None,
923        }
924    }
925
926    /// Set the configuration.
927    pub fn with_config(mut self, config: RingKernelConfig) -> Self {
928        self.config = Some(config);
929        self
930    }
931
932    /// Use development configuration preset.
933    pub fn development(mut self) -> Self {
934        self.config = Some(RingKernelConfig::development());
935        self
936    }
937
938    /// Use production configuration preset.
939    pub fn production(mut self) -> Self {
940        self.config = Some(RingKernelConfig::production());
941        self
942    }
943
944    /// Use high-performance configuration preset.
945    pub fn high_performance(mut self) -> Self {
946        self.config = Some(RingKernelConfig::high_performance());
947        self
948    }
949
950    /// Override health checker (for testing).
951    pub fn with_health_checker(mut self, checker: Arc<HealthChecker>) -> Self {
952        self.health_checker = Some(checker);
953        self
954    }
955
956    /// Override watchdog (for testing).
957    pub fn with_watchdog(mut self, watchdog: Arc<KernelWatchdog>) -> Self {
958        self.watchdog = Some(watchdog);
959        self
960    }
961
962    /// Override multi-GPU coordinator (for testing).
963    pub fn with_multi_gpu_coordinator(mut self, coordinator: Arc<MultiGpuCoordinator>) -> Self {
964        self.multi_gpu_coordinator = Some(coordinator);
965        self
966    }
967
968    /// Override checkpoint storage (for testing).
969    pub fn with_checkpoint_storage(mut self, storage: Arc<dyn CheckpointStorage>) -> Self {
970        self.checkpoint_storage = Some(storage);
971        self
972    }
973
974    /// Build the runtime context.
975    pub fn build(self) -> Result<Arc<RingKernelContext>> {
976        let config = self.config.unwrap_or_default();
977        config.validate()?;
978
979        // Create health checker
980        let health_checker = self.health_checker.unwrap_or_default();
981
982        // Create watchdog
983        let watchdog = self.watchdog.unwrap_or_default();
984
985        // Create circuit breaker
986        let circuit_breaker = CircuitBreaker::with_config(config.health.circuit_breaker.clone());
987
988        // Create degradation manager
989        let degradation_manager =
990            DegradationManager::with_policy(config.health.load_shedding.clone());
991
992        // Create Prometheus exporter
993        let prometheus_exporter = PrometheusExporter::new();
994
995        // Create observability context
996        let observability = ObservabilityContext::new();
997
998        // Create multi-GPU coordinator
999        let multi_gpu_coordinator = self.multi_gpu_coordinator.unwrap_or_else(|| {
1000            MultiGpuBuilder::new()
1001                .load_balancing(config.multi_gpu.load_balancing)
1002                .auto_select_device(config.multi_gpu.auto_select_device)
1003                .max_kernels_per_device(config.multi_gpu.max_kernels_per_device)
1004                .enable_p2p(config.multi_gpu.p2p_enabled)
1005                .preferred_devices(config.multi_gpu.preferred_devices.clone())
1006                .build()
1007        });
1008
1009        // Create checkpoint storage
1010        let checkpoint_storage: Arc<dyn CheckpointStorage> =
1011            self.checkpoint_storage.unwrap_or_else(|| {
1012                match config.migration.storage {
1013                    CheckpointStorageType::Memory => Arc::new(MemoryStorage::new()),
1014                    CheckpointStorageType::File => {
1015                        Arc::new(FileStorage::new(&config.migration.checkpoint_dir))
1016                    }
1017                    CheckpointStorageType::Cloud => {
1018                        #[cfg(feature = "cloud-storage")]
1019                        {
1020                            // Create S3 storage from cloud configuration
1021                            let cloud_cfg = &config.migration.cloud_config;
1022                            let s3_config = S3Config::new(&cloud_cfg.s3_bucket)
1023                                .with_prefix(&cloud_cfg.s3_prefix);
1024                            let s3_config = if let Some(ref region) = cloud_cfg.s3_region {
1025                                s3_config.with_region(region)
1026                            } else {
1027                                s3_config
1028                            };
1029                            let s3_config = if let Some(ref endpoint) = cloud_cfg.s3_endpoint {
1030                                s3_config.with_endpoint(endpoint)
1031                            } else {
1032                                s3_config
1033                            };
1034                            let s3_config = if cloud_cfg.s3_encryption {
1035                                s3_config.with_encryption()
1036                            } else {
1037                                s3_config
1038                            };
1039
1040                            // S3Storage::new is async, we use block_in_place for the sync context
1041                            match tokio::task::block_in_place(|| {
1042                                tokio::runtime::Handle::current()
1043                                    .block_on(S3Storage::new(s3_config))
1044                            }) {
1045                                Ok(storage) => Arc::new(storage) as Arc<dyn CheckpointStorage>,
1046                                Err(e) => {
1047                                    tracing::warn!(
1048                                        "Failed to create S3 storage: {}, falling back to memory",
1049                                        e
1050                                    );
1051                                    Arc::new(MemoryStorage::new())
1052                                }
1053                            }
1054                        }
1055                        #[cfg(not(feature = "cloud-storage"))]
1056                        {
1057                            tracing::warn!(
1058                                "Cloud storage requested but cloud-storage feature not enabled, \
1059                                 falling back to memory storage"
1060                            );
1061                            Arc::new(MemoryStorage::new())
1062                        }
1063                    }
1064                }
1065            });
1066
1067        // Create kernel migrator
1068        let migrator = Arc::new(KernelMigrator::with_storage(
1069            Arc::clone(&multi_gpu_coordinator),
1070            Arc::clone(&checkpoint_storage),
1071        ));
1072
1073        Ok(Arc::new(RingKernelContext {
1074            config,
1075            health_checker,
1076            watchdog,
1077            circuit_breaker,
1078            degradation_manager,
1079            prometheus_exporter,
1080            observability,
1081            multi_gpu_coordinator,
1082            migrator,
1083            checkpoint_storage,
1084            stats: RuntimeStats::default(),
1085            started_at: Instant::now(),
1086            running: AtomicBool::new(false), // Start as not running
1087            lifecycle_state: RwLock::new(LifecycleState::Initializing),
1088            background_tasks: BackgroundTasks::new(),
1089            shutdown_requested: AtomicBool::new(false),
1090        }))
1091    }
1092}
1093
1094impl Default for RuntimeBuilder {
1095    fn default() -> Self {
1096        Self::new()
1097    }
1098}
1099
1100// ============================================================================
1101// Feature Guards
1102// ============================================================================
1103
1104/// Guard for executing operations with circuit breaker protection.
1105pub struct CircuitGuard<'a> {
1106    context: &'a RingKernelContext,
1107    operation_name: String,
1108}
1109
1110impl<'a> CircuitGuard<'a> {
1111    /// Create a new circuit guard.
1112    pub fn new(context: &'a RingKernelContext, operation_name: impl Into<String>) -> Self {
1113        Self {
1114            context,
1115            operation_name: operation_name.into(),
1116        }
1117    }
1118
1119    /// Execute an operation with circuit breaker protection.
1120    pub fn execute<T, F>(&self, f: F) -> Result<T>
1121    where
1122        F: FnOnce() -> Result<T>,
1123    {
1124        // Check if circuit is open
1125        if self.context.circuit_breaker.state() == CircuitState::Open {
1126            self.context.record_circuit_trip();
1127            return Err(RingKernelError::CircuitBreakerOpen {
1128                name: self.operation_name.clone(),
1129            });
1130        }
1131
1132        // Execute the operation
1133        match f() {
1134            Ok(result) => {
1135                self.context.circuit_breaker.record_success();
1136                Ok(result)
1137            }
1138            Err(e) => {
1139                self.context.circuit_breaker.record_failure();
1140                Err(e)
1141            }
1142        }
1143    }
1144}
1145
1146/// Guard for graceful degradation.
1147pub struct DegradationGuard<'a> {
1148    context: &'a RingKernelContext,
1149}
1150
1151impl<'a> DegradationGuard<'a> {
1152    /// Create a new degradation guard.
1153    pub fn new(context: &'a RingKernelContext) -> Self {
1154        Self { context }
1155    }
1156
1157    /// Check if an operation should be allowed at the current degradation level.
1158    pub fn allow_operation(&self, priority: OperationPriority) -> bool {
1159        let level = self.context.degradation_manager.level();
1160        match level {
1161            crate::health::DegradationLevel::Normal => true,
1162            crate::health::DegradationLevel::Light => true,
1163            crate::health::DegradationLevel::Moderate => {
1164                matches!(
1165                    priority,
1166                    OperationPriority::Normal
1167                        | OperationPriority::High
1168                        | OperationPriority::Critical
1169                )
1170            }
1171            crate::health::DegradationLevel::Severe => {
1172                matches!(
1173                    priority,
1174                    OperationPriority::High | OperationPriority::Critical
1175                )
1176            }
1177            crate::health::DegradationLevel::Critical => {
1178                matches!(priority, OperationPriority::Critical)
1179            }
1180        }
1181    }
1182
1183    /// Execute an operation if allowed by degradation level.
1184    pub fn execute_if_allowed<T, F>(&self, priority: OperationPriority, f: F) -> Result<T>
1185    where
1186        F: FnOnce() -> Result<T>,
1187    {
1188        if self.allow_operation(priority) {
1189            f()
1190        } else {
1191            Err(RingKernelError::LoadSheddingRejected {
1192                level: format!("{:?}", self.context.degradation_manager.level()),
1193            })
1194        }
1195    }
1196}
1197
/// Priority of an operation, used when deciding what to shed under load.
///
/// Variants are declared from lowest to highest, so the derived ordering gives
/// `Low < Normal < High < Critical`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
pub enum OperationPriority {
    /// Least important; the first to be shed.
    Low,
    /// Ordinary work.
    Normal,
    /// Important work; among the last to be shed.
    High,
    /// Must always run; never shed.
    Critical,
}
1210
1211// ============================================================================
1212// Metrics Integration
1213// ============================================================================
1214
1215impl RingKernelContext {
1216    /// Export Prometheus metrics.
1217    pub fn export_metrics(&self) -> String {
1218        self.prometheus_exporter.render()
1219    }
1220
1221    /// Create a metrics snapshot for the runtime context.
1222    pub fn metrics_snapshot(&self) -> ContextMetrics {
1223        let stats = self.stats();
1224        ContextMetrics {
1225            uptime_seconds: stats.uptime.as_secs_f64(),
1226            kernels_launched: stats.kernels_launched,
1227            messages_processed: stats.messages_processed,
1228            migrations_completed: stats.migrations_completed,
1229            checkpoints_created: stats.checkpoints_created,
1230            health_checks_run: stats.health_checks_run,
1231            circuit_breaker_trips: stats.circuit_breaker_trips,
1232            circuit_breaker_state: format!("{:?}", self.circuit_breaker.state()),
1233            degradation_level: format!("{:?}", self.degradation_manager.level()),
1234            multi_gpu_device_count: self.multi_gpu_coordinator.device_count(),
1235        }
1236    }
1237}
1238
/// Flat, monitoring-friendly view of the unified runtime's metrics.
#[derive(Clone, Debug)]
pub struct ContextMetrics {
    /// Runtime uptime, in seconds.
    pub uptime_seconds: f64,
    /// Kernels launched so far.
    pub kernels_launched: u64,
    /// Messages processed so far.
    pub messages_processed: u64,
    /// Migrations completed so far.
    pub migrations_completed: u64,
    /// Checkpoints created so far.
    pub checkpoints_created: u64,
    /// Health checks run so far.
    pub health_checks_run: u64,
    /// Circuit-breaker trips recorded so far.
    pub circuit_breaker_trips: u64,
    /// Circuit-breaker state, as text.
    pub circuit_breaker_state: String,
    /// Degradation level, as text.
    pub degradation_level: String,
    /// How many GPU devices the coordinator knows about.
    pub multi_gpu_device_count: usize,
}
1263
1264// ============================================================================
1265// Tests
1266// ============================================================================
1267
1268#[cfg(test)]
1269mod tests {
1270    use super::*;
1271    use crate::config::ConfigBuilder;
1272    use std::time::Duration;
1273
1274    #[test]
1275    fn test_runtime_builder_default() {
1276        let runtime = RuntimeBuilder::new().build().unwrap();
1277        // Runtime starts in Initializing state
1278        assert!(!runtime.is_running());
1279        assert_eq!(runtime.lifecycle_state(), LifecycleState::Initializing);
1280
1281        // Start the runtime
1282        runtime.start().unwrap();
1283        assert!(runtime.is_running());
1284        assert_eq!(runtime.lifecycle_state(), LifecycleState::Running);
1285    }
1286
1287    #[test]
1288    fn test_runtime_builder_with_config() {
1289        let config = ConfigBuilder::new()
1290            .with_general(|g| g.app_name("test_app"))
1291            .build()
1292            .unwrap();
1293
1294        let runtime = RuntimeBuilder::new().with_config(config).build().unwrap();
1295
1296        assert_eq!(runtime.config().general.app_name, "test_app");
1297    }
1298
1299    #[test]
1300    fn test_runtime_presets() {
1301        let dev = RuntimeBuilder::new().development().build().unwrap();
1302        assert_eq!(
1303            dev.config().general.environment,
1304            crate::config::Environment::Development
1305        );
1306
1307        let prod = RuntimeBuilder::new().production().build().unwrap();
1308        assert_eq!(
1309            prod.config().general.environment,
1310            crate::config::Environment::Production
1311        );
1312
1313        let perf = RuntimeBuilder::new().high_performance().build().unwrap();
1314        assert!(!perf.config().observability.tracing_enabled);
1315    }
1316
1317    #[test]
1318    fn test_runtime_stats() {
1319        let runtime = RuntimeBuilder::new().build().unwrap();
1320
1321        runtime.record_kernel_launch();
1322        runtime.record_kernel_launch();
1323        runtime.record_messages(100);
1324        runtime.record_migration();
1325        runtime.record_checkpoint();
1326        runtime.record_health_check();
1327
1328        let stats = runtime.stats();
1329        assert_eq!(stats.kernels_launched, 2);
1330        assert_eq!(stats.messages_processed, 100);
1331        assert_eq!(stats.migrations_completed, 1);
1332        assert_eq!(stats.checkpoints_created, 1);
1333        assert_eq!(stats.health_checks_run, 1);
1334    }
1335
1336    #[test]
1337    fn test_runtime_uptime() {
1338        let runtime = RuntimeBuilder::new().build().unwrap();
1339        std::thread::sleep(Duration::from_millis(10));
1340        assert!(runtime.uptime() >= Duration::from_millis(10));
1341    }
1342
1343    #[test]
1344    fn test_runtime_shutdown() {
1345        let runtime = RuntimeBuilder::new().build().unwrap();
1346        runtime.start().unwrap();
1347        assert!(runtime.is_running());
1348        assert_eq!(runtime.lifecycle_state(), LifecycleState::Running);
1349
1350        runtime.shutdown().unwrap();
1351        assert!(!runtime.is_running());
1352        assert_eq!(runtime.lifecycle_state(), LifecycleState::Stopped);
1353
1354        // Second shutdown should fail
1355        assert!(runtime.shutdown().is_err());
1356    }
1357
1358    #[test]
1359    fn test_runtime_app_info() {
1360        let config = ConfigBuilder::new()
1361            .with_general(|g| {
1362                g.app_name("my_app")
1363                    .app_version("1.2.3")
1364                    .environment(crate::config::Environment::Staging)
1365            })
1366            .build()
1367            .unwrap();
1368
1369        let runtime = RuntimeBuilder::new().with_config(config).build().unwrap();
1370
1371        let info = runtime.app_info();
1372        assert_eq!(info.name, "my_app");
1373        assert_eq!(info.version, "1.2.3");
1374        assert_eq!(info.environment, "staging");
1375    }
1376
1377    #[test]
1378    fn test_circuit_guard() {
1379        let runtime = RuntimeBuilder::new().build().unwrap();
1380
1381        let guard = CircuitGuard::new(&runtime, "test_op");
1382
1383        // Success case
1384        let result: Result<i32> = guard.execute(|| Ok(42));
1385        assert_eq!(result.unwrap(), 42);
1386
1387        // Failure case
1388        let result: Result<i32> =
1389            guard.execute(|| Err(RingKernelError::Internal("test error".to_string())));
1390        assert!(result.is_err());
1391    }
1392
1393    #[test]
1394    fn test_degradation_guard() {
1395        let runtime = RuntimeBuilder::new().build().unwrap();
1396        let guard = DegradationGuard::new(&runtime);
1397
1398        // At normal level, all operations should be allowed
1399        assert!(guard.allow_operation(OperationPriority::Low));
1400        assert!(guard.allow_operation(OperationPriority::Normal));
1401        assert!(guard.allow_operation(OperationPriority::High));
1402        assert!(guard.allow_operation(OperationPriority::Critical));
1403    }
1404
1405    #[test]
1406    fn test_operation_priority_ordering() {
1407        assert!(OperationPriority::Low < OperationPriority::Normal);
1408        assert!(OperationPriority::Normal < OperationPriority::High);
1409        assert!(OperationPriority::High < OperationPriority::Critical);
1410    }
1411
1412    #[test]
1413    fn test_metrics_snapshot() {
1414        let runtime = RuntimeBuilder::new().build().unwrap();
1415
1416        runtime.record_kernel_launch();
1417        runtime.record_messages(50);
1418
1419        let metrics = runtime.metrics_snapshot();
1420        assert_eq!(metrics.kernels_launched, 1);
1421        assert_eq!(metrics.messages_processed, 50);
1422        assert!(metrics.uptime_seconds >= 0.0);
1423    }
1424
1425    #[test]
1426    fn test_custom_storage() {
1427        let storage = Arc::new(MemoryStorage::new());
1428        let runtime = RuntimeBuilder::new()
1429            .with_checkpoint_storage(storage.clone())
1430            .build()
1431            .unwrap();
1432
1433        // Verify we can access the storage
1434        let _migrator = runtime.migrator();
1435    }
1436
1437    #[test]
1438    fn test_export_metrics() {
1439        let runtime = RuntimeBuilder::new().build().unwrap();
1440        let metrics = runtime.export_metrics();
1441        // Prometheus format should be valid
1442        assert!(
1443            metrics.is_empty()
1444                || metrics.contains('#')
1445                || metrics.contains('\n')
1446                || !metrics.is_empty()
1447        );
1448    }
1449
1450    // ========================================================================
1451    // Lifecycle Management Tests
1452    // ========================================================================
1453
1454    #[test]
1455    fn test_lifecycle_state_transitions() {
1456        let runtime = RuntimeBuilder::new().build().unwrap();
1457
1458        // Initial state
1459        assert_eq!(runtime.lifecycle_state(), LifecycleState::Initializing);
1460        assert!(!runtime.is_accepting_work());
1461
1462        // Start
1463        runtime.start().unwrap();
1464        assert_eq!(runtime.lifecycle_state(), LifecycleState::Running);
1465        assert!(runtime.is_accepting_work());
1466
1467        // Request shutdown
1468        runtime.request_shutdown().unwrap();
1469        assert_eq!(runtime.lifecycle_state(), LifecycleState::Draining);
1470        assert!(!runtime.is_accepting_work());
1471
1472        // Complete shutdown
1473        let report = runtime.complete_shutdown().unwrap();
1474        assert_eq!(runtime.lifecycle_state(), LifecycleState::Stopped);
1475        assert!(report.duration.as_nanos() > 0);
1476    }
1477
1478    #[test]
1479    fn test_lifecycle_state_helpers() {
1480        assert!(LifecycleState::Running.is_accepting_work());
1481        assert!(!LifecycleState::Initializing.is_accepting_work());
1482        assert!(!LifecycleState::Draining.is_accepting_work());
1483        assert!(!LifecycleState::ShuttingDown.is_accepting_work());
1484        assert!(!LifecycleState::Stopped.is_accepting_work());
1485
1486        assert!(LifecycleState::Initializing.is_active());
1487        assert!(LifecycleState::Running.is_active());
1488        assert!(LifecycleState::Draining.is_active());
1489        assert!(LifecycleState::ShuttingDown.is_active());
1490        assert!(!LifecycleState::Stopped.is_active());
1491    }
1492
1493    #[test]
1494    fn test_health_check_cycle() {
1495        let runtime = RuntimeBuilder::new().build().unwrap();
1496        runtime.start().unwrap();
1497
1498        let result = runtime.run_health_check_cycle();
1499        assert_eq!(result.status, crate::health::HealthStatus::Healthy);
1500        assert_eq!(result.circuit_state, CircuitState::Closed);
1501
1502        // Check that task status was updated
1503        let status = runtime.background_task_status();
1504        assert!(status.health_check_age.is_some());
1505    }
1506
1507    #[test]
1508    fn test_watchdog_cycle() {
1509        let runtime = RuntimeBuilder::new().build().unwrap();
1510        runtime.start().unwrap();
1511
1512        let result = runtime.run_watchdog_cycle();
1513        assert_eq!(result.stale_kernels, 0);
1514
1515        let status = runtime.background_task_status();
1516        assert!(status.watchdog_scan_age.is_some());
1517    }
1518
1519    #[test]
1520    fn test_metrics_flush() {
1521        let runtime = RuntimeBuilder::new().build().unwrap();
1522
1523        let metrics = runtime.flush_metrics();
1524        assert!(metrics.is_empty() || !metrics.is_empty()); // Just verify it doesn't crash
1525
1526        let status = runtime.background_task_status();
1527        assert!(status.metrics_flush_age.is_some());
1528    }
1529
1530    #[test]
1531    fn test_shutdown_report() {
1532        let runtime = RuntimeBuilder::new().build().unwrap();
1533        runtime.start().unwrap();
1534
1535        // Do some work
1536        runtime.record_kernel_launch();
1537        runtime.record_messages(100);
1538
1539        let report = runtime.complete_shutdown().unwrap();
1540
1541        assert_eq!(report.final_stats.kernels_launched, 1);
1542        assert_eq!(report.final_stats.messages_processed, 100);
1543        assert!(report.total_uptime.as_nanos() > 0);
1544    }
1545
1546    #[test]
1547    fn test_cannot_start_twice() {
1548        let runtime = RuntimeBuilder::new().build().unwrap();
1549        runtime.start().unwrap();
1550
1551        // Second start should fail
1552        assert!(runtime.start().is_err());
1553    }
1554
1555    #[test]
1556    fn test_shutdown_from_initializing() {
1557        let runtime = RuntimeBuilder::new().build().unwrap();
1558        // Don't call start, should still be able to shutdown
1559        assert!(runtime.shutdown().is_ok());
1560        assert_eq!(runtime.lifecycle_state(), LifecycleState::Stopped);
1561    }
1562
1563    // ========================================================================
1564    // Enterprise Integration Tests
1565    // ========================================================================
1566
1567    #[test]
1568    fn test_enterprise_full_lifecycle() {
1569        // Build runtime with custom config
1570        let config = ConfigBuilder::new()
1571            .with_general(|g| g.app_name("integration-test").app_version("1.0.0"))
1572            .build()
1573            .unwrap();
1574
1575        let runtime = RuntimeBuilder::new().with_config(config).build().unwrap();
1576
1577        // Verify initial state
1578        assert_eq!(runtime.lifecycle_state(), LifecycleState::Initializing);
1579        assert!(!runtime.is_accepting_work());
1580
1581        // Start runtime
1582        runtime.start().unwrap();
1583        assert_eq!(runtime.lifecycle_state(), LifecycleState::Running);
1584        assert!(runtime.is_accepting_work());
1585
1586        // Simulate work
1587        for _ in 0..10 {
1588            runtime.record_kernel_launch();
1589            runtime.record_messages(100);
1590        }
1591
1592        // Run health cycles
1593        for _ in 0..3 {
1594            let result = runtime.run_health_check_cycle();
1595            assert_eq!(result.status, crate::health::HealthStatus::Healthy);
1596        }
1597
1598        // Verify stats
1599        let stats = runtime.stats();
1600        assert_eq!(stats.kernels_launched, 10);
1601        assert_eq!(stats.messages_processed, 1000);
1602        assert_eq!(stats.health_checks_run, 3);
1603
1604        // Graceful shutdown
1605        runtime.request_shutdown().unwrap();
1606        assert_eq!(runtime.lifecycle_state(), LifecycleState::Draining);
1607
1608        let report = runtime.complete_shutdown().unwrap();
1609        assert_eq!(runtime.lifecycle_state(), LifecycleState::Stopped);
1610        assert_eq!(report.final_stats.kernels_launched, 10);
1611    }
1612
1613    #[test]
1614    fn test_circuit_breaker_integration() {
1615        let runtime = RuntimeBuilder::new().build().unwrap();
1616        runtime.start().unwrap();
1617
1618        // Initially healthy
1619        let result = runtime.run_health_check_cycle();
1620        assert_eq!(result.circuit_state, CircuitState::Closed);
1621
1622        // Simulate failures until circuit opens
1623        let cb = runtime.circuit_breaker();
1624        for _ in 0..10 {
1625            cb.record_failure();
1626        }
1627
1628        // Circuit should be open now
1629        assert_eq!(cb.state(), CircuitState::Open);
1630
1631        // Health check should reflect degraded state
1632        let result = runtime.run_health_check_cycle();
1633        assert_eq!(result.circuit_state, CircuitState::Open);
1634        assert_eq!(result.status, crate::health::HealthStatus::Unhealthy);
1635    }
1636
1637    #[test]
1638    fn test_degradation_integration() {
1639        let runtime = RuntimeBuilder::new().build().unwrap();
1640        runtime.start().unwrap();
1641
1642        // Initially at normal level
1643        let result = runtime.run_health_check_cycle();
1644        assert_eq!(
1645            result.degradation_level,
1646            crate::health::DegradationLevel::Normal
1647        );
1648
1649        // Force circuit open
1650        let cb = runtime.circuit_breaker();
1651        for _ in 0..10 {
1652            cb.record_failure();
1653        }
1654
1655        // Health check should increase degradation
1656        let result = runtime.run_health_check_cycle();
1657        // Degradation should have increased from Normal
1658        assert_ne!(
1659            result.degradation_level,
1660            crate::health::DegradationLevel::Normal
1661        );
1662    }
1663
1664    #[test]
1665    fn test_configuration_presets_integration() {
1666        // Development preset
1667        let dev = RuntimeBuilder::new().development().build().unwrap();
1668        assert_eq!(
1669            dev.config().general.environment,
1670            crate::config::Environment::Development
1671        );
1672        assert!(dev.config().observability.tracing_enabled);
1673
1674        // Production preset
1675        let prod = RuntimeBuilder::new().production().build().unwrap();
1676        assert_eq!(
1677            prod.config().general.environment,
1678            crate::config::Environment::Production
1679        );
1680
1681        // High-performance preset
1682        let perf = RuntimeBuilder::new().high_performance().build().unwrap();
1683        assert!(!perf.config().observability.tracing_enabled);
1684    }
1685
1686    #[test]
1687    fn test_multi_gpu_coordinator_access() {
1688        let runtime = RuntimeBuilder::new().build().unwrap();
1689
1690        // Access multi-GPU coordinator
1691        let coordinator = runtime.multi_gpu_coordinator();
1692        assert_eq!(coordinator.device_count(), 0);
1693
1694        // Register a device
1695        let device = crate::multi_gpu::DeviceInfo::new(
1696            0,
1697            "Test GPU".to_string(),
1698            crate::runtime::Backend::Cpu,
1699        );
1700        coordinator.register_device(device);
1701        assert_eq!(coordinator.device_count(), 1);
1702    }
1703
1704    #[test]
1705    fn test_background_task_tracking() {
1706        let runtime = RuntimeBuilder::new().build().unwrap();
1707        runtime.start().unwrap();
1708
1709        // Initially no tasks have run
1710        let status = runtime.background_task_status();
1711        assert!(status.health_check_age.is_none());
1712        assert!(status.watchdog_scan_age.is_none());
1713        assert!(status.metrics_flush_age.is_none());
1714
1715        // Run health check
1716        runtime.run_health_check_cycle();
1717        let status = runtime.background_task_status();
1718        assert!(status.health_check_age.is_some());
1719
1720        // Run watchdog
1721        runtime.run_watchdog_cycle();
1722        let status = runtime.background_task_status();
1723        assert!(status.watchdog_scan_age.is_some());
1724
1725        // Flush metrics
1726        runtime.flush_metrics();
1727        let status = runtime.background_task_status();
1728        assert!(status.metrics_flush_age.is_some());
1729    }
1730
1731    // ========================================================================
1732    // Async Monitoring Tests
1733    // ========================================================================
1734
1735    #[test]
1736    fn test_monitoring_config_builder() {
1737        let config = MonitoringConfig::new()
1738            .health_check_interval(Duration::from_secs(5))
1739            .watchdog_interval(Duration::from_secs(2))
1740            .metrics_flush_interval(Duration::from_secs(30))
1741            .enable_health_checks(true)
1742            .enable_watchdog(false)
1743            .enable_metrics_flush(true);
1744
1745        assert_eq!(config.health_check_interval, Duration::from_secs(5));
1746        assert_eq!(config.watchdog_interval, Duration::from_secs(2));
1747        assert_eq!(config.metrics_flush_interval, Duration::from_secs(30));
1748        assert!(config.enable_health_checks);
1749        assert!(!config.enable_watchdog);
1750        assert!(config.enable_metrics_flush);
1751    }
1752
1753    #[test]
1754    fn test_monitoring_config_default() {
1755        let config = MonitoringConfig::default();
1756
1757        assert_eq!(config.health_check_interval, Duration::from_secs(10));
1758        assert_eq!(config.watchdog_interval, Duration::from_secs(5));
1759        assert_eq!(config.metrics_flush_interval, Duration::from_secs(60));
1760        assert!(config.enable_health_checks);
1761        assert!(config.enable_watchdog);
1762        assert!(config.enable_metrics_flush);
1763    }
1764
1765    #[tokio::test]
1766    async fn test_async_monitoring_start_stop() {
1767        let runtime = RuntimeBuilder::new().build().unwrap();
1768        runtime.start().unwrap();
1769
1770        // Start monitoring with short intervals
1771        let config = MonitoringConfig::new()
1772            .health_check_interval(Duration::from_millis(50))
1773            .watchdog_interval(Duration::from_millis(50))
1774            .metrics_flush_interval(Duration::from_millis(50));
1775
1776        let handles = runtime.start_monitoring(config);
1777
1778        // Verify tasks are running
1779        assert!(handles.is_running());
1780
1781        // Let some cycles run
1782        tokio::time::sleep(Duration::from_millis(150)).await;
1783
1784        // Verify health checks ran
1785        let status = runtime.background_task_status();
1786        assert!(status.health_check_age.is_some());
1787        assert!(status.watchdog_scan_age.is_some());
1788
1789        // Signal shutdown
1790        handles.signal_shutdown();
1791
1792        // Wait for tasks to complete
1793        handles.wait_for_shutdown().await;
1794    }
1795
1796    #[tokio::test]
1797    async fn test_async_monitoring_default_config() {
1798        let runtime = RuntimeBuilder::new().build().unwrap();
1799        runtime.start().unwrap();
1800
1801        // Start with default config (but we'll shut down quickly)
1802        let handles = runtime.start_monitoring_default();
1803        assert!(handles.is_running());
1804
1805        // Shutdown immediately
1806        handles.signal_shutdown();
1807        handles.wait_for_shutdown().await;
1808    }
1809
1810    #[tokio::test]
1811    async fn test_async_monitoring_selective_loops() {
1812        let runtime = RuntimeBuilder::new().build().unwrap();
1813        runtime.start().unwrap();
1814
1815        // Only enable health checks
1816        let config = MonitoringConfig::new()
1817            .health_check_interval(Duration::from_millis(50))
1818            .enable_health_checks(true)
1819            .enable_watchdog(false)
1820            .enable_metrics_flush(false);
1821
1822        let handles = runtime.start_monitoring(config);
1823
1824        // Only health check handle should be set
1825        assert!(handles.health_check_handle.is_some());
1826        assert!(handles.watchdog_handle.is_none());
1827        assert!(handles.metrics_flush_handle.is_none());
1828
1829        handles.signal_shutdown();
1830        handles.wait_for_shutdown().await;
1831    }
1832
1833    #[tokio::test]
1834    async fn test_async_monitoring_respects_shutdown_flag() {
1835        let runtime = RuntimeBuilder::new().build().unwrap();
1836        runtime.start().unwrap();
1837
1838        let config = MonitoringConfig::new().health_check_interval(Duration::from_millis(50));
1839
1840        let handles = runtime.start_monitoring(config);
1841
1842        // Request shutdown via runtime
1843        runtime.request_shutdown().unwrap();
1844
1845        // Let monitoring loop detect shutdown
1846        tokio::time::sleep(Duration::from_millis(100)).await;
1847
1848        // Tasks should have stopped
1849        handles.wait_for_shutdown().await;
1850    }
1851
1852    #[tokio::test]
1853    async fn test_monitoring_handles_is_running() {
1854        let runtime = RuntimeBuilder::new().build().unwrap();
1855        runtime.start().unwrap();
1856
1857        let config = MonitoringConfig::new().health_check_interval(Duration::from_millis(100));
1858
1859        let handles = runtime.start_monitoring(config);
1860        assert!(handles.is_running());
1861
1862        handles.signal_shutdown();
1863        handles.wait_for_shutdown().await;
1864
1865        // After shutdown, a new handles struct would be needed
1866    }
1867}