1use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
28use std::sync::Arc;
29use std::time::{Duration, Instant};
30
31use parking_lot::RwLock;
32
33use crate::checkpoint::{CheckpointStorage, FileStorage, MemoryStorage};
34#[cfg(feature = "cloud-storage")]
35use crate::cloud_storage::{S3Config, S3Storage};
36use crate::config::{CheckpointStorageType, RingKernelConfig};
37use crate::error::{Result, RingKernelError};
38use crate::health::{
39 CircuitBreaker, CircuitState, DegradationManager, HealthChecker, HealthStatus, KernelWatchdog,
40};
41use crate::multi_gpu::{KernelMigrator, MultiGpuBuilder, MultiGpuCoordinator};
42use crate::observability::{ObservabilityContext, PrometheusExporter};
43
/// Coarse lifecycle phase of a `RingKernelContext`.
///
/// Transitions performed in this file: `Initializing` -> `Running` (`start`),
/// `Running` -> `Draining` or `Initializing` -> `ShuttingDown`
/// (`request_shutdown`), and any non-stopped state -> `ShuttingDown` ->
/// `Stopped` (`complete_shutdown`).
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum LifecycleState {
    /// Built, but `start` has not been called yet.
    Initializing,
    /// Started; the only state for which `is_accepting_work` returns `true`.
    Running,
    /// Shutdown was requested while the context was running.
    Draining,
    /// Final shutdown work is in progress.
    ShuttingDown,
    /// Terminal state; the only state for which `is_active` returns `false`.
    Stopped,
}
62
63impl LifecycleState {
64 pub fn is_accepting_work(&self) -> bool {
66 matches!(self, LifecycleState::Running)
67 }
68
69 pub fn is_active(&self) -> bool {
71 !matches!(self, LifecycleState::Stopped)
72 }
73}
74
/// Bookkeeping for the periodic monitoring work: how many loops of each kind
/// are currently alive and when each kind of work last ran.
#[derive(Debug, Default)]
struct BackgroundTasks {
    /// Number of health-check loops currently alive.
    health_check_loops: AtomicU64,
    /// Number of watchdog loops currently alive.
    watchdog_loops: AtomicU64,
    /// Number of metrics-flush loops currently alive.
    metrics_flush_loops: AtomicU64,
    /// When the last health check was recorded; `None` until the first one.
    last_health_check: RwLock<Option<Instant>>,
    /// When the last watchdog scan was recorded; `None` until the first one.
    last_watchdog_scan: RwLock<Option<Instant>>,
    /// When the last metrics flush was recorded; `None` until the first one.
    last_metrics_flush: RwLock<Option<Instant>>,
}
91
92impl BackgroundTasks {
93 fn new() -> Self {
94 Self::default()
95 }
96
97 fn record_health_check(&self) {
98 *self.last_health_check.write() = Some(Instant::now());
99 }
100
101 fn record_watchdog_scan(&self) {
102 *self.last_watchdog_scan.write() = Some(Instant::now());
103 }
104
105 fn record_metrics_flush(&self) {
106 *self.last_metrics_flush.write() = Some(Instant::now());
107 }
108
109 fn health_check_age(&self) -> Option<Duration> {
110 self.last_health_check.read().map(|t| t.elapsed())
111 }
112
113 fn watchdog_scan_age(&self) -> Option<Duration> {
114 self.last_watchdog_scan.read().map(|t| t.elapsed())
115 }
116
117 fn metrics_flush_age(&self) -> Option<Duration> {
118 self.last_metrics_flush.read().map(|t| t.elapsed())
119 }
120}
121
122use tokio::sync::watch;
127use tokio::task::JoinHandle;
128
/// Settings for the background monitoring loops started by
/// `RingKernelContext::start_monitoring`.
#[derive(Debug, Clone)]
pub struct MonitoringConfig {
    /// Period between health-check cycles.
    pub health_check_interval: Duration,
    /// Period between watchdog scans.
    pub watchdog_interval: Duration,
    /// Period between metrics flushes.
    pub metrics_flush_interval: Duration,
    /// Whether the health-check loop is spawned.
    pub enable_health_checks: bool,
    /// Whether the watchdog loop is spawned.
    pub enable_watchdog: bool,
    /// Whether the metrics-flush loop is spawned.
    pub enable_metrics_flush: bool,
}
145
146impl Default for MonitoringConfig {
147 fn default() -> Self {
148 Self {
149 health_check_interval: Duration::from_secs(10),
150 watchdog_interval: Duration::from_secs(5),
151 metrics_flush_interval: Duration::from_secs(60),
152 enable_health_checks: true,
153 enable_watchdog: true,
154 enable_metrics_flush: true,
155 }
156 }
157}
158
159impl MonitoringConfig {
160 pub fn new() -> Self {
162 Self::default()
163 }
164
165 pub fn health_check_interval(mut self, interval: Duration) -> Self {
167 self.health_check_interval = interval;
168 self
169 }
170
171 pub fn watchdog_interval(mut self, interval: Duration) -> Self {
173 self.watchdog_interval = interval;
174 self
175 }
176
177 pub fn metrics_flush_interval(mut self, interval: Duration) -> Self {
179 self.metrics_flush_interval = interval;
180 self
181 }
182
183 pub fn enable_health_checks(mut self, enable: bool) -> Self {
185 self.enable_health_checks = enable;
186 self
187 }
188
189 pub fn enable_watchdog(mut self, enable: bool) -> Self {
191 self.enable_watchdog = enable;
192 self
193 }
194
195 pub fn enable_metrics_flush(mut self, enable: bool) -> Self {
197 self.enable_metrics_flush = enable;
198 self
199 }
200}
201
/// Join handles for the spawned monitoring loops plus the sender used to ask
/// them to stop.
pub struct MonitoringHandles {
    /// Handle of the health-check loop; `None` when that loop was not spawned.
    pub health_check_handle: Option<JoinHandle<()>>,
    /// Handle of the watchdog loop; `None` when that loop was not spawned.
    pub watchdog_handle: Option<JoinHandle<()>>,
    /// Handle of the metrics-flush loop; `None` when that loop was not spawned.
    pub metrics_flush_handle: Option<JoinHandle<()>>,
    /// Sending half of the shutdown channel; the loops hold the receivers.
    shutdown_tx: watch::Sender<bool>,
}
213
214impl MonitoringHandles {
215 fn new() -> (Self, watch::Receiver<bool>) {
217 let (shutdown_tx, shutdown_rx) = watch::channel(false);
218 (
219 Self {
220 health_check_handle: None,
221 watchdog_handle: None,
222 metrics_flush_handle: None,
223 shutdown_tx,
224 },
225 shutdown_rx,
226 )
227 }
228
229 pub fn signal_shutdown(&self) {
231 let _ = self.shutdown_tx.send(true);
232 }
233
234 pub async fn wait_for_shutdown(self) {
236 if let Some(handle) = self.health_check_handle {
237 let _ = handle.await;
238 }
239 if let Some(handle) = self.watchdog_handle {
240 let _ = handle.await;
241 }
242 if let Some(handle) = self.metrics_flush_handle {
243 let _ = handle.await;
244 }
245 }
246
247 pub fn is_running(&self) -> bool {
249 self.health_check_handle
250 .as_ref()
251 .map(|h| !h.is_finished())
252 .unwrap_or(false)
253 || self
254 .watchdog_handle
255 .as_ref()
256 .map(|h| !h.is_finished())
257 .unwrap_or(false)
258 || self
259 .metrics_flush_handle
260 .as_ref()
261 .map(|h| !h.is_finished())
262 .unwrap_or(false)
263 }
264}
265
/// Central runtime object bundling configuration, health/resilience
/// components, observability, multi-GPU coordination, checkpoint storage and
/// lifecycle state.
pub struct RingKernelContext {
    /// Configuration the context was built with.
    config: RingKernelConfig,
    /// Shared health checker.
    health_checker: Arc<HealthChecker>,
    /// Shared kernel watchdog, queried by `run_watchdog_cycle`.
    watchdog: Arc<KernelWatchdog>,
    /// Shared circuit breaker, queried by `run_health_check_cycle` and `CircuitGuard`.
    circuit_breaker: Arc<CircuitBreaker>,
    /// Shared degradation manager whose level drives `DegradationGuard`.
    degradation_manager: Arc<DegradationManager>,
    /// Shared Prometheus exporter rendered by `flush_metrics` / `export_metrics`.
    prometheus_exporter: Arc<PrometheusExporter>,
    /// Shared observability context.
    observability: Arc<ObservabilityContext>,
    /// Shared multi-GPU coordinator.
    multi_gpu_coordinator: Arc<MultiGpuCoordinator>,
    /// Shared kernel migrator.
    migrator: Arc<KernelMigrator>,
    /// Shared checkpoint storage backend.
    checkpoint_storage: Arc<dyn CheckpointStorage>,
    /// Monotonic counters exposed through `stats`.
    stats: RuntimeStats,
    /// Instant the context was constructed; basis for `uptime`.
    started_at: Instant,
    /// Set by `start`, cleared by `complete_shutdown`.
    running: AtomicBool,
    /// Current lifecycle phase.
    lifecycle_state: RwLock<LifecycleState>,
    /// Loop counters and last-run timestamps for the monitoring tasks.
    background_tasks: BackgroundTasks,
    /// Set once shutdown has been requested; polled by the monitoring loops.
    shutdown_requested: AtomicBool,
}
321
322impl RingKernelContext {
323 pub fn config(&self) -> &RingKernelConfig {
325 &self.config
326 }
327
328 pub fn health_checker(&self) -> Arc<HealthChecker> {
330 Arc::clone(&self.health_checker)
331 }
332
333 pub fn watchdog(&self) -> Arc<KernelWatchdog> {
335 Arc::clone(&self.watchdog)
336 }
337
338 pub fn circuit_breaker(&self) -> Arc<CircuitBreaker> {
340 Arc::clone(&self.circuit_breaker)
341 }
342
343 pub fn degradation_manager(&self) -> Arc<DegradationManager> {
345 Arc::clone(&self.degradation_manager)
346 }
347
348 pub fn prometheus_exporter(&self) -> Arc<PrometheusExporter> {
350 Arc::clone(&self.prometheus_exporter)
351 }
352
353 pub fn observability(&self) -> Arc<ObservabilityContext> {
355 Arc::clone(&self.observability)
356 }
357
358 pub fn multi_gpu_coordinator(&self) -> Arc<MultiGpuCoordinator> {
360 Arc::clone(&self.multi_gpu_coordinator)
361 }
362
363 pub fn migrator(&self) -> Arc<KernelMigrator> {
365 Arc::clone(&self.migrator)
366 }
367
368 pub fn checkpoint_storage(&self) -> Arc<dyn CheckpointStorage> {
370 Arc::clone(&self.checkpoint_storage)
371 }
372
373 pub fn is_running(&self) -> bool {
375 self.running.load(Ordering::SeqCst)
376 }
377
378 pub fn uptime(&self) -> std::time::Duration {
380 self.started_at.elapsed()
381 }
382
383 pub fn stats(&self) -> RuntimeStatsSnapshot {
385 RuntimeStatsSnapshot {
386 uptime: self.uptime(),
387 kernels_launched: self.stats.kernels_launched.load(Ordering::Relaxed),
388 messages_processed: self.stats.messages_processed.load(Ordering::Relaxed),
389 migrations_completed: self.stats.migrations_completed.load(Ordering::Relaxed),
390 checkpoints_created: self.stats.checkpoints_created.load(Ordering::Relaxed),
391 health_checks_run: self.stats.health_checks_run.load(Ordering::Relaxed),
392 circuit_breaker_trips: self.stats.circuit_breaker_trips.load(Ordering::Relaxed),
393 }
394 }
395
396 pub fn record_kernel_launch(&self) {
398 self.stats.kernels_launched.fetch_add(1, Ordering::Relaxed);
399 }
400
401 pub fn record_messages(&self, count: u64) {
403 self.stats
404 .messages_processed
405 .fetch_add(count, Ordering::Relaxed);
406 }
407
408 pub fn record_migration(&self) {
410 self.stats
411 .migrations_completed
412 .fetch_add(1, Ordering::Relaxed);
413 }
414
415 pub fn record_checkpoint(&self) {
417 self.stats
418 .checkpoints_created
419 .fetch_add(1, Ordering::Relaxed);
420 }
421
422 pub fn record_health_check(&self) {
424 self.stats.health_checks_run.fetch_add(1, Ordering::Relaxed);
425 }
426
427 pub fn record_circuit_trip(&self) {
429 self.stats
430 .circuit_breaker_trips
431 .fetch_add(1, Ordering::Relaxed);
432 }
433
434 pub fn lifecycle_state(&self) -> LifecycleState {
440 *self.lifecycle_state.read()
441 }
442
443 pub fn is_shutdown_requested(&self) -> bool {
445 self.shutdown_requested.load(Ordering::SeqCst)
446 }
447
448 pub fn is_accepting_work(&self) -> bool {
450 self.lifecycle_state().is_accepting_work()
451 }
452
453 pub fn start(&self) -> Result<()> {
457 let mut state = self.lifecycle_state.write();
458 if *state != LifecycleState::Initializing {
459 return Err(RingKernelError::InvalidState {
460 expected: "Initializing".to_string(),
461 actual: format!("{:?}", *state),
462 });
463 }
464 *state = LifecycleState::Running;
465 self.running.store(true, Ordering::SeqCst);
466 Ok(())
467 }
468
469 pub fn run_health_check_cycle(&self) -> HealthCycleResult {
477 self.background_tasks.record_health_check();
478 self.record_health_check();
479
480 let circuit_state = self.circuit_breaker.state();
482
483 let status = match circuit_state {
485 CircuitState::Closed => HealthStatus::Healthy,
486 CircuitState::HalfOpen => HealthStatus::Degraded,
487 CircuitState::Open => HealthStatus::Unhealthy,
488 };
489
490 let current_level = self.degradation_manager.level();
492 let new_level = match circuit_state {
493 CircuitState::Open => {
494 current_level.next_worse()
496 }
497 CircuitState::Closed => {
498 current_level.next_better()
500 }
501 CircuitState::HalfOpen => {
502 current_level
504 }
505 };
506
507 if new_level != current_level {
508 self.degradation_manager.set_level(new_level);
509 }
510
511 HealthCycleResult {
512 status,
513 circuit_state,
514 degradation_level: self.degradation_manager.level(),
515 timestamp: Instant::now(),
516 }
517 }
518
519 pub fn run_watchdog_cycle(&self) -> WatchdogResult {
523 self.background_tasks.record_watchdog_scan();
524
525 let kernel_health = self.watchdog.check_all();
526 let stale_count = kernel_health
527 .iter()
528 .filter(|h| h.status == HealthStatus::Unhealthy)
529 .count();
530
531 WatchdogResult {
532 stale_kernels: stale_count,
533 timestamp: Instant::now(),
534 }
535 }
536
537 pub fn flush_metrics(&self) -> String {
541 self.background_tasks.record_metrics_flush();
542 self.prometheus_exporter.render()
543 }
544
545 pub fn background_task_status(&self) -> BackgroundTaskStatus {
547 BackgroundTaskStatus {
548 health_check_age: self.background_tasks.health_check_age(),
549 watchdog_scan_age: self.background_tasks.watchdog_scan_age(),
550 metrics_flush_age: self.background_tasks.metrics_flush_age(),
551 active_health_loops: self
552 .background_tasks
553 .health_check_loops
554 .load(Ordering::Relaxed),
555 active_watchdog_loops: self.background_tasks.watchdog_loops.load(Ordering::Relaxed),
556 active_metrics_loops: self
557 .background_tasks
558 .metrics_flush_loops
559 .load(Ordering::Relaxed),
560 }
561 }
562
563 pub fn start_monitoring(self: &Arc<Self>, config: MonitoringConfig) -> MonitoringHandles {
595 let (mut handles, shutdown_rx) = MonitoringHandles::new();
596
597 if config.enable_health_checks {
599 let runtime = Arc::clone(self);
600 let interval = config.health_check_interval;
601 let mut shutdown = shutdown_rx.clone();
602
603 handles.health_check_handle = Some(tokio::spawn(async move {
604 runtime
605 .background_tasks
606 .health_check_loops
607 .fetch_add(1, Ordering::Relaxed);
608
609 let mut interval_timer = tokio::time::interval(interval);
610 interval_timer.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
611
612 loop {
613 tokio::select! {
614 _ = interval_timer.tick() => {
615 if runtime.is_shutdown_requested() {
616 break;
617 }
618 let _result = runtime.run_health_check_cycle();
619 tracing::debug!("Health check cycle completed");
620 }
621 _ = shutdown.changed() => {
622 tracing::info!("Health check loop shutting down");
623 break;
624 }
625 }
626 }
627
628 runtime
629 .background_tasks
630 .health_check_loops
631 .fetch_sub(1, Ordering::Relaxed);
632 }));
633 }
634
635 if config.enable_watchdog {
637 let runtime = Arc::clone(self);
638 let interval = config.watchdog_interval;
639 let mut shutdown = shutdown_rx.clone();
640
641 handles.watchdog_handle = Some(tokio::spawn(async move {
642 runtime
643 .background_tasks
644 .watchdog_loops
645 .fetch_add(1, Ordering::Relaxed);
646
647 let mut interval_timer = tokio::time::interval(interval);
648 interval_timer.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
649
650 loop {
651 tokio::select! {
652 _ = interval_timer.tick() => {
653 if runtime.is_shutdown_requested() {
654 break;
655 }
656 let result = runtime.run_watchdog_cycle();
657 if result.stale_kernels > 0 {
658 tracing::warn!("Watchdog detected {} stale kernels", result.stale_kernels);
659 }
660 }
661 _ = shutdown.changed() => {
662 tracing::info!("Watchdog loop shutting down");
663 break;
664 }
665 }
666 }
667
668 runtime
669 .background_tasks
670 .watchdog_loops
671 .fetch_sub(1, Ordering::Relaxed);
672 }));
673 }
674
675 if config.enable_metrics_flush {
677 let runtime = Arc::clone(self);
678 let interval = config.metrics_flush_interval;
679 let mut shutdown = shutdown_rx;
680
681 handles.metrics_flush_handle = Some(tokio::spawn(async move {
682 runtime
683 .background_tasks
684 .metrics_flush_loops
685 .fetch_add(1, Ordering::Relaxed);
686
687 let mut interval_timer = tokio::time::interval(interval);
688 interval_timer.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
689
690 loop {
691 tokio::select! {
692 _ = interval_timer.tick() => {
693 if runtime.is_shutdown_requested() {
694 break;
695 }
696 let _metrics = runtime.flush_metrics();
697 tracing::debug!("Metrics flush completed");
698 }
699 _ = shutdown.changed() => {
700 tracing::info!("Metrics flush loop shutting down");
701 break;
702 }
703 }
704 }
705
706 runtime
707 .background_tasks
708 .metrics_flush_loops
709 .fetch_sub(1, Ordering::Relaxed);
710 }));
711 }
712
713 handles
714 }
715
716 pub fn start_monitoring_default(self: &Arc<Self>) -> MonitoringHandles {
718 self.start_monitoring(MonitoringConfig::default())
719 }
720
721 pub fn request_shutdown(&self) -> Result<()> {
726 self.shutdown_requested.store(true, Ordering::SeqCst);
728
729 let mut state = self.lifecycle_state.write();
731 match *state {
732 LifecycleState::Running => {
733 *state = LifecycleState::Draining;
734 Ok(())
735 }
736 LifecycleState::Draining | LifecycleState::ShuttingDown => {
737 Ok(())
739 }
740 LifecycleState::Stopped => Err(RingKernelError::InvalidState {
741 expected: "Running or Draining".to_string(),
742 actual: "Stopped".to_string(),
743 }),
744 LifecycleState::Initializing => {
745 *state = LifecycleState::ShuttingDown;
747 Ok(())
748 }
749 }
750 }
751
752 pub fn complete_shutdown(&self) -> Result<ShutdownReport> {
756 let start = Instant::now();
757
758 {
760 let mut state = self.lifecycle_state.write();
761 if *state == LifecycleState::Stopped {
762 return Err(RingKernelError::InvalidState {
763 expected: "not Stopped".to_string(),
764 actual: "Stopped".to_string(),
765 });
766 }
767 *state = LifecycleState::ShuttingDown;
768 }
769
770 let final_stats = self.stats();
772 let final_metrics = self.flush_metrics();
773
774 {
776 let mut state = self.lifecycle_state.write();
777 *state = LifecycleState::Stopped;
778 self.running.store(false, Ordering::SeqCst);
779 }
780
781 Ok(ShutdownReport {
782 duration: start.elapsed(),
783 total_uptime: self.uptime(),
784 final_stats,
785 final_metrics,
786 })
787 }
788
789 pub fn shutdown(&self) -> Result<()> {
793 self.request_shutdown()?;
794 self.complete_shutdown()?;
795 Ok(())
796 }
797
798 pub fn app_info(&self) -> AppInfo {
800 AppInfo {
801 name: self.config.general.app_name.clone(),
802 version: self.config.general.app_version.clone(),
803 environment: self.config.general.environment.as_str().to_string(),
804 }
805 }
806}
807
/// Outcome of one `run_health_check_cycle` call.
#[derive(Debug, Clone)]
pub struct HealthCycleResult {
    /// Health status derived from the circuit-breaker state.
    pub status: HealthStatus,
    /// Circuit-breaker state observed during the cycle.
    pub circuit_state: CircuitState,
    /// Degradation level read back after the cycle's adjustment.
    pub degradation_level: crate::health::DegradationLevel,
    /// When the result was produced.
    pub timestamp: Instant,
}
820
/// Outcome of one `run_watchdog_cycle` call.
#[derive(Debug, Clone)]
pub struct WatchdogResult {
    /// Number of kernels the watchdog reported with status `Unhealthy`.
    pub stale_kernels: usize,
    /// When the result was produced.
    pub timestamp: Instant,
}
829
/// Snapshot of background-task activity returned by `background_task_status`.
#[derive(Debug, Clone)]
pub struct BackgroundTaskStatus {
    /// Time since the last health check; `None` if none has run.
    pub health_check_age: Option<Duration>,
    /// Time since the last watchdog scan; `None` if none has run.
    pub watchdog_scan_age: Option<Duration>,
    /// Time since the last metrics flush; `None` if none has run.
    pub metrics_flush_age: Option<Duration>,
    /// Number of health-check loops alive when the snapshot was taken.
    pub active_health_loops: u64,
    /// Number of watchdog loops alive when the snapshot was taken.
    pub active_watchdog_loops: u64,
    /// Number of metrics-flush loops alive when the snapshot was taken.
    pub active_metrics_loops: u64,
}
846
/// Summary returned by `complete_shutdown`.
#[derive(Debug, Clone)]
pub struct ShutdownReport {
    /// Time spent inside `complete_shutdown`.
    pub duration: Duration,
    /// Context uptime measured at the end of shutdown.
    pub total_uptime: Duration,
    /// Counter snapshot taken during shutdown.
    pub final_stats: RuntimeStatsSnapshot,
    /// Metrics text rendered during shutdown.
    pub final_metrics: String,
}
859
/// Live atomic counters behind `RingKernelContext::stats`; each one is
/// incremented by the matching `record_*` method.
#[derive(Debug, Default)]
struct RuntimeStats {
    /// Kernels launched so far.
    kernels_launched: AtomicU64,
    /// Messages processed so far.
    messages_processed: AtomicU64,
    /// Migrations completed so far.
    migrations_completed: AtomicU64,
    /// Checkpoints created so far.
    checkpoints_created: AtomicU64,
    /// Health checks run so far.
    health_checks_run: AtomicU64,
    /// Circuit-breaker trips recorded so far.
    circuit_breaker_trips: AtomicU64,
}
870
/// Plain-value copy of the runtime counters at one point in time.
#[derive(Debug, Clone)]
pub struct RuntimeStatsSnapshot {
    /// Context uptime when the snapshot was taken.
    pub uptime: std::time::Duration,
    /// Kernels launched.
    pub kernels_launched: u64,
    /// Messages processed.
    pub messages_processed: u64,
    /// Migrations completed.
    pub migrations_completed: u64,
    /// Checkpoints created.
    pub checkpoints_created: u64,
    /// Health checks run.
    pub health_checks_run: u64,
    /// Circuit-breaker trips recorded.
    pub circuit_breaker_trips: u64,
}
889
/// Application identity copied from the general configuration section.
#[derive(Debug, Clone)]
pub struct AppInfo {
    /// Application name.
    pub name: String,
    /// Application version string.
    pub version: String,
    /// Environment name as produced by the environment's `as_str`.
    pub environment: String,
}
900
/// Builder for `RingKernelContext`; every unset part is filled in by `build`.
pub struct RuntimeBuilder {
    /// Configuration override; `build` uses the default configuration when unset.
    config: Option<RingKernelConfig>,
    /// Health checker override.
    health_checker: Option<Arc<HealthChecker>>,
    /// Watchdog override.
    watchdog: Option<Arc<KernelWatchdog>>,
    /// Multi-GPU coordinator override; otherwise built from the configuration.
    multi_gpu_coordinator: Option<Arc<MultiGpuCoordinator>>,
    /// Checkpoint storage override; otherwise chosen from the configuration.
    checkpoint_storage: Option<Arc<dyn CheckpointStorage>>,
}
913
914impl RuntimeBuilder {
915 pub fn new() -> Self {
917 Self {
918 config: None,
919 health_checker: None,
920 watchdog: None,
921 multi_gpu_coordinator: None,
922 checkpoint_storage: None,
923 }
924 }
925
926 pub fn with_config(mut self, config: RingKernelConfig) -> Self {
928 self.config = Some(config);
929 self
930 }
931
932 pub fn development(mut self) -> Self {
934 self.config = Some(RingKernelConfig::development());
935 self
936 }
937
938 pub fn production(mut self) -> Self {
940 self.config = Some(RingKernelConfig::production());
941 self
942 }
943
944 pub fn high_performance(mut self) -> Self {
946 self.config = Some(RingKernelConfig::high_performance());
947 self
948 }
949
950 pub fn with_health_checker(mut self, checker: Arc<HealthChecker>) -> Self {
952 self.health_checker = Some(checker);
953 self
954 }
955
956 pub fn with_watchdog(mut self, watchdog: Arc<KernelWatchdog>) -> Self {
958 self.watchdog = Some(watchdog);
959 self
960 }
961
962 pub fn with_multi_gpu_coordinator(mut self, coordinator: Arc<MultiGpuCoordinator>) -> Self {
964 self.multi_gpu_coordinator = Some(coordinator);
965 self
966 }
967
968 pub fn with_checkpoint_storage(mut self, storage: Arc<dyn CheckpointStorage>) -> Self {
970 self.checkpoint_storage = Some(storage);
971 self
972 }
973
974 pub fn build(self) -> Result<Arc<RingKernelContext>> {
976 let config = self.config.unwrap_or_default();
977 config.validate()?;
978
979 let health_checker = self.health_checker.unwrap_or_default();
981
982 let watchdog = self.watchdog.unwrap_or_default();
984
985 let circuit_breaker = CircuitBreaker::with_config(config.health.circuit_breaker.clone());
987
988 let degradation_manager =
990 DegradationManager::with_policy(config.health.load_shedding.clone());
991
992 let prometheus_exporter = PrometheusExporter::new();
994
995 let observability = ObservabilityContext::new();
997
998 let multi_gpu_coordinator = self.multi_gpu_coordinator.unwrap_or_else(|| {
1000 MultiGpuBuilder::new()
1001 .load_balancing(config.multi_gpu.load_balancing)
1002 .auto_select_device(config.multi_gpu.auto_select_device)
1003 .max_kernels_per_device(config.multi_gpu.max_kernels_per_device)
1004 .enable_p2p(config.multi_gpu.p2p_enabled)
1005 .preferred_devices(config.multi_gpu.preferred_devices.clone())
1006 .build()
1007 });
1008
1009 let checkpoint_storage: Arc<dyn CheckpointStorage> =
1011 self.checkpoint_storage.unwrap_or_else(|| {
1012 match config.migration.storage {
1013 CheckpointStorageType::Memory => Arc::new(MemoryStorage::new()),
1014 CheckpointStorageType::File => {
1015 Arc::new(FileStorage::new(&config.migration.checkpoint_dir))
1016 }
1017 CheckpointStorageType::Cloud => {
1018 #[cfg(feature = "cloud-storage")]
1019 {
1020 let cloud_cfg = &config.migration.cloud_config;
1022 let s3_config = S3Config::new(&cloud_cfg.s3_bucket)
1023 .with_prefix(&cloud_cfg.s3_prefix);
1024 let s3_config = if let Some(ref region) = cloud_cfg.s3_region {
1025 s3_config.with_region(region)
1026 } else {
1027 s3_config
1028 };
1029 let s3_config = if let Some(ref endpoint) = cloud_cfg.s3_endpoint {
1030 s3_config.with_endpoint(endpoint)
1031 } else {
1032 s3_config
1033 };
1034 let s3_config = if cloud_cfg.s3_encryption {
1035 s3_config.with_encryption()
1036 } else {
1037 s3_config
1038 };
1039
1040 match tokio::task::block_in_place(|| {
1042 tokio::runtime::Handle::current()
1043 .block_on(S3Storage::new(s3_config))
1044 }) {
1045 Ok(storage) => Arc::new(storage) as Arc<dyn CheckpointStorage>,
1046 Err(e) => {
1047 tracing::warn!(
1048 "Failed to create S3 storage: {}, falling back to memory",
1049 e
1050 );
1051 Arc::new(MemoryStorage::new())
1052 }
1053 }
1054 }
1055 #[cfg(not(feature = "cloud-storage"))]
1056 {
1057 tracing::warn!(
1058 "Cloud storage requested but cloud-storage feature not enabled, \
1059 falling back to memory storage"
1060 );
1061 Arc::new(MemoryStorage::new())
1062 }
1063 }
1064 }
1065 });
1066
1067 let migrator = Arc::new(KernelMigrator::with_storage(
1069 Arc::clone(&multi_gpu_coordinator),
1070 Arc::clone(&checkpoint_storage),
1071 ));
1072
1073 Ok(Arc::new(RingKernelContext {
1074 config,
1075 health_checker,
1076 watchdog,
1077 circuit_breaker,
1078 degradation_manager,
1079 prometheus_exporter,
1080 observability,
1081 multi_gpu_coordinator,
1082 migrator,
1083 checkpoint_storage,
1084 stats: RuntimeStats::default(),
1085 started_at: Instant::now(),
1086 running: AtomicBool::new(false), lifecycle_state: RwLock::new(LifecycleState::Initializing),
1088 background_tasks: BackgroundTasks::new(),
1089 shutdown_requested: AtomicBool::new(false),
1090 }))
1091 }
1092}
1093
1094impl Default for RuntimeBuilder {
1095 fn default() -> Self {
1096 Self::new()
1097 }
1098}
1099
/// Runs operations under the protection of a context's circuit breaker.
pub struct CircuitGuard<'a> {
    /// Context whose circuit breaker and trip counter are used.
    context: &'a RingKernelContext,
    /// Name reported in the error when the breaker rejects the operation.
    operation_name: String,
}
1109
1110impl<'a> CircuitGuard<'a> {
1111 pub fn new(context: &'a RingKernelContext, operation_name: impl Into<String>) -> Self {
1113 Self {
1114 context,
1115 operation_name: operation_name.into(),
1116 }
1117 }
1118
1119 pub fn execute<T, F>(&self, f: F) -> Result<T>
1121 where
1122 F: FnOnce() -> Result<T>,
1123 {
1124 if self.context.circuit_breaker.state() == CircuitState::Open {
1126 self.context.record_circuit_trip();
1127 return Err(RingKernelError::CircuitBreakerOpen {
1128 name: self.operation_name.clone(),
1129 });
1130 }
1131
1132 match f() {
1134 Ok(result) => {
1135 self.context.circuit_breaker.record_success();
1136 Ok(result)
1137 }
1138 Err(e) => {
1139 self.context.circuit_breaker.record_failure();
1140 Err(e)
1141 }
1142 }
1143 }
1144}
1145
/// Gates operations on the context's current degradation level.
pub struct DegradationGuard<'a> {
    /// Context whose degradation manager is consulted.
    context: &'a RingKernelContext,
}
1150
1151impl<'a> DegradationGuard<'a> {
1152 pub fn new(context: &'a RingKernelContext) -> Self {
1154 Self { context }
1155 }
1156
1157 pub fn allow_operation(&self, priority: OperationPriority) -> bool {
1159 let level = self.context.degradation_manager.level();
1160 match level {
1161 crate::health::DegradationLevel::Normal => true,
1162 crate::health::DegradationLevel::Light => true,
1163 crate::health::DegradationLevel::Moderate => {
1164 matches!(
1165 priority,
1166 OperationPriority::Normal
1167 | OperationPriority::High
1168 | OperationPriority::Critical
1169 )
1170 }
1171 crate::health::DegradationLevel::Severe => {
1172 matches!(
1173 priority,
1174 OperationPriority::High | OperationPriority::Critical
1175 )
1176 }
1177 crate::health::DegradationLevel::Critical => {
1178 matches!(priority, OperationPriority::Critical)
1179 }
1180 }
1181 }
1182
1183 pub fn execute_if_allowed<T, F>(&self, priority: OperationPriority, f: F) -> Result<T>
1185 where
1186 F: FnOnce() -> Result<T>,
1187 {
1188 if self.allow_operation(priority) {
1189 f()
1190 } else {
1191 Err(RingKernelError::LoadSheddingRejected {
1192 level: format!("{:?}", self.context.degradation_manager.level()),
1193 })
1194 }
1195 }
1196}
1197
/// Priority of an operation for load shedding.
///
/// Variants are declared from lowest to highest, so the derived ordering
/// gives `Low < Normal < High < Critical`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
pub enum OperationPriority {
    /// Admitted only at the `Normal` and `Light` degradation levels.
    Low,
    /// Admitted up to and including the `Moderate` level.
    Normal,
    /// Admitted up to and including the `Severe` level.
    High,
    /// Admitted at every degradation level.
    Critical,
}
1210
1211impl RingKernelContext {
1216 pub fn export_metrics(&self) -> String {
1218 self.prometheus_exporter.render()
1219 }
1220
1221 pub fn metrics_snapshot(&self) -> ContextMetrics {
1223 let stats = self.stats();
1224 ContextMetrics {
1225 uptime_seconds: stats.uptime.as_secs_f64(),
1226 kernels_launched: stats.kernels_launched,
1227 messages_processed: stats.messages_processed,
1228 migrations_completed: stats.migrations_completed,
1229 checkpoints_created: stats.checkpoints_created,
1230 health_checks_run: stats.health_checks_run,
1231 circuit_breaker_trips: stats.circuit_breaker_trips,
1232 circuit_breaker_state: format!("{:?}", self.circuit_breaker.state()),
1233 degradation_level: format!("{:?}", self.degradation_manager.level()),
1234 multi_gpu_device_count: self.multi_gpu_coordinator.device_count(),
1235 }
1236 }
1237}
1238
/// Flattened metrics view returned by `RingKernelContext::metrics_snapshot`.
#[derive(Debug, Clone)]
pub struct ContextMetrics {
    /// Context uptime in seconds.
    pub uptime_seconds: f64,
    /// Kernels launched.
    pub kernels_launched: u64,
    /// Messages processed.
    pub messages_processed: u64,
    /// Migrations completed.
    pub migrations_completed: u64,
    /// Checkpoints created.
    pub checkpoints_created: u64,
    /// Health checks run.
    pub health_checks_run: u64,
    /// Circuit-breaker trips recorded.
    pub circuit_breaker_trips: u64,
    /// `Debug` rendering of the circuit-breaker state.
    pub circuit_breaker_state: String,
    /// `Debug` rendering of the degradation level.
    pub degradation_level: String,
    /// Device count reported by the multi-GPU coordinator.
    pub multi_gpu_device_count: usize,
}
1263
1264#[cfg(test)]
1269mod tests {
1270 use super::*;
1271 use crate::config::ConfigBuilder;
1272 use std::time::Duration;
1273
1274 #[test]
1275 fn test_runtime_builder_default() {
1276 let runtime = RuntimeBuilder::new().build().unwrap();
1277 assert!(!runtime.is_running());
1279 assert_eq!(runtime.lifecycle_state(), LifecycleState::Initializing);
1280
1281 runtime.start().unwrap();
1283 assert!(runtime.is_running());
1284 assert_eq!(runtime.lifecycle_state(), LifecycleState::Running);
1285 }
1286
1287 #[test]
1288 fn test_runtime_builder_with_config() {
1289 let config = ConfigBuilder::new()
1290 .with_general(|g| g.app_name("test_app"))
1291 .build()
1292 .unwrap();
1293
1294 let runtime = RuntimeBuilder::new().with_config(config).build().unwrap();
1295
1296 assert_eq!(runtime.config().general.app_name, "test_app");
1297 }
1298
1299 #[test]
1300 fn test_runtime_presets() {
1301 let dev = RuntimeBuilder::new().development().build().unwrap();
1302 assert_eq!(
1303 dev.config().general.environment,
1304 crate::config::Environment::Development
1305 );
1306
1307 let prod = RuntimeBuilder::new().production().build().unwrap();
1308 assert_eq!(
1309 prod.config().general.environment,
1310 crate::config::Environment::Production
1311 );
1312
1313 let perf = RuntimeBuilder::new().high_performance().build().unwrap();
1314 assert!(!perf.config().observability.tracing_enabled);
1315 }
1316
1317 #[test]
1318 fn test_runtime_stats() {
1319 let runtime = RuntimeBuilder::new().build().unwrap();
1320
1321 runtime.record_kernel_launch();
1322 runtime.record_kernel_launch();
1323 runtime.record_messages(100);
1324 runtime.record_migration();
1325 runtime.record_checkpoint();
1326 runtime.record_health_check();
1327
1328 let stats = runtime.stats();
1329 assert_eq!(stats.kernels_launched, 2);
1330 assert_eq!(stats.messages_processed, 100);
1331 assert_eq!(stats.migrations_completed, 1);
1332 assert_eq!(stats.checkpoints_created, 1);
1333 assert_eq!(stats.health_checks_run, 1);
1334 }
1335
1336 #[test]
1337 fn test_runtime_uptime() {
1338 let runtime = RuntimeBuilder::new().build().unwrap();
1339 std::thread::sleep(Duration::from_millis(10));
1340 assert!(runtime.uptime() >= Duration::from_millis(10));
1341 }
1342
1343 #[test]
1344 fn test_runtime_shutdown() {
1345 let runtime = RuntimeBuilder::new().build().unwrap();
1346 runtime.start().unwrap();
1347 assert!(runtime.is_running());
1348 assert_eq!(runtime.lifecycle_state(), LifecycleState::Running);
1349
1350 runtime.shutdown().unwrap();
1351 assert!(!runtime.is_running());
1352 assert_eq!(runtime.lifecycle_state(), LifecycleState::Stopped);
1353
1354 assert!(runtime.shutdown().is_err());
1356 }
1357
1358 #[test]
1359 fn test_runtime_app_info() {
1360 let config = ConfigBuilder::new()
1361 .with_general(|g| {
1362 g.app_name("my_app")
1363 .app_version("1.2.3")
1364 .environment(crate::config::Environment::Staging)
1365 })
1366 .build()
1367 .unwrap();
1368
1369 let runtime = RuntimeBuilder::new().with_config(config).build().unwrap();
1370
1371 let info = runtime.app_info();
1372 assert_eq!(info.name, "my_app");
1373 assert_eq!(info.version, "1.2.3");
1374 assert_eq!(info.environment, "staging");
1375 }
1376
1377 #[test]
1378 fn test_circuit_guard() {
1379 let runtime = RuntimeBuilder::new().build().unwrap();
1380
1381 let guard = CircuitGuard::new(&runtime, "test_op");
1382
1383 let result: Result<i32> = guard.execute(|| Ok(42));
1385 assert_eq!(result.unwrap(), 42);
1386
1387 let result: Result<i32> =
1389 guard.execute(|| Err(RingKernelError::Internal("test error".to_string())));
1390 assert!(result.is_err());
1391 }
1392
1393 #[test]
1394 fn test_degradation_guard() {
1395 let runtime = RuntimeBuilder::new().build().unwrap();
1396 let guard = DegradationGuard::new(&runtime);
1397
1398 assert!(guard.allow_operation(OperationPriority::Low));
1400 assert!(guard.allow_operation(OperationPriority::Normal));
1401 assert!(guard.allow_operation(OperationPriority::High));
1402 assert!(guard.allow_operation(OperationPriority::Critical));
1403 }
1404
1405 #[test]
1406 fn test_operation_priority_ordering() {
1407 assert!(OperationPriority::Low < OperationPriority::Normal);
1408 assert!(OperationPriority::Normal < OperationPriority::High);
1409 assert!(OperationPriority::High < OperationPriority::Critical);
1410 }
1411
1412 #[test]
1413 fn test_metrics_snapshot() {
1414 let runtime = RuntimeBuilder::new().build().unwrap();
1415
1416 runtime.record_kernel_launch();
1417 runtime.record_messages(50);
1418
1419 let metrics = runtime.metrics_snapshot();
1420 assert_eq!(metrics.kernels_launched, 1);
1421 assert_eq!(metrics.messages_processed, 50);
1422 assert!(metrics.uptime_seconds >= 0.0);
1423 }
1424
1425 #[test]
1426 fn test_custom_storage() {
1427 let storage = Arc::new(MemoryStorage::new());
1428 let runtime = RuntimeBuilder::new()
1429 .with_checkpoint_storage(storage.clone())
1430 .build()
1431 .unwrap();
1432
1433 let _migrator = runtime.migrator();
1435 }
1436
1437 #[test]
1438 fn test_export_metrics() {
1439 let runtime = RuntimeBuilder::new().build().unwrap();
1440 let metrics = runtime.export_metrics();
1441 assert!(
1443 metrics.is_empty()
1444 || metrics.contains('#')
1445 || metrics.contains('\n')
1446 || !metrics.is_empty()
1447 );
1448 }
1449
/// Walks a runtime through `Initializing -> Running -> Draining -> Stopped`
/// and checks the reported state and work acceptance after every step.
#[test]
fn test_lifecycle_state_transitions() {
    let rt = RuntimeBuilder::new().build().unwrap();

    // Freshly built: not started, so no work is accepted.
    assert_eq!(rt.lifecycle_state(), LifecycleState::Initializing);
    assert!(!rt.is_accepting_work());

    // Started: running and accepting work.
    rt.start().unwrap();
    assert_eq!(rt.lifecycle_state(), LifecycleState::Running);
    assert!(rt.is_accepting_work());

    // Shutdown requested: draining, new work is refused.
    rt.request_shutdown().unwrap();
    assert_eq!(rt.lifecycle_state(), LifecycleState::Draining);
    assert!(!rt.is_accepting_work());

    // Shutdown completed: stopped, and the report carries a non-zero duration.
    let shutdown_report = rt.complete_shutdown().unwrap();
    assert_eq!(rt.lifecycle_state(), LifecycleState::Stopped);
    assert!(shutdown_report.duration.as_nanos() > 0);
}
1477
/// Table-driven check of `is_accepting_work` and `is_active` for every
/// `LifecycleState` variant.
#[test]
fn test_lifecycle_state_helpers() {
    // (state, expected is_accepting_work, expected is_active)
    let expectations = [
        (LifecycleState::Initializing, false, true),
        (LifecycleState::Running, true, true),
        (LifecycleState::Draining, false, true),
        (LifecycleState::ShuttingDown, false, true),
        (LifecycleState::Stopped, false, false),
    ];

    // Only `Running` accepts work.
    for &(state, accepting, _) in expectations.iter() {
        assert_eq!(state.is_accepting_work(), accepting);
    }

    // Every state except `Stopped` counts as active.
    for &(state, _, active) in expectations.iter() {
        assert_eq!(state.is_active(), active);
    }
}
1492
/// Runs one health-check cycle on a started runtime: the result must be
/// healthy with a closed circuit, and the cycle must leave a health-check
/// timestamp in the background-task status.
#[test]
fn test_health_check_cycle() {
    let rt = RuntimeBuilder::new().build().unwrap();
    rt.start().unwrap();

    let outcome = rt.run_health_check_cycle();
    assert_eq!(outcome.status, crate::health::HealthStatus::Healthy);
    assert_eq!(outcome.circuit_state, CircuitState::Closed);

    // The cycle must have been recorded.
    let tasks = rt.background_task_status();
    assert!(tasks.health_check_age.is_some());
}
1506
/// Runs one watchdog cycle on a started runtime: no stale kernels are
/// expected, and the scan must be recorded in the background-task status.
#[test]
fn test_watchdog_cycle() {
    let rt = RuntimeBuilder::new().build().unwrap();
    rt.start().unwrap();

    let scan = rt.run_watchdog_cycle();
    assert_eq!(scan.stale_kernels, 0);

    // The scan must have been recorded.
    let tasks = rt.background_task_status();
    assert!(tasks.watchdog_scan_age.is_some());
}
1518
/// Flushing metrics must record a flush timestamp in the background-task
/// status.
///
/// The earlier `assert!(metrics.is_empty() || !metrics.is_empty())` was a
/// tautology that could never fail, so it has been removed. The flushed text
/// is intentionally not inspected here; only the bookkeeping side effect is
/// verified.
#[test]
fn test_metrics_flush() {
    let runtime = RuntimeBuilder::new().build().unwrap();

    // The call must succeed without panicking; its output is not checked.
    let _flushed = runtime.flush_metrics();

    let status = runtime.background_task_status();
    assert!(status.metrics_flush_age.is_some());
}
1529
/// Completing shutdown must return a report whose final stats match the
/// activity recorded beforehand and whose total uptime is non-zero.
#[test]
fn test_shutdown_report() {
    let rt = RuntimeBuilder::new().build().unwrap();
    rt.start().unwrap();

    // Activity that the final stats are expected to reflect.
    rt.record_kernel_launch();
    rt.record_messages(100);

    let shutdown_report = rt.complete_shutdown().unwrap();
    let final_stats = &shutdown_report.final_stats;

    assert_eq!(final_stats.kernels_launched, 1);
    assert_eq!(final_stats.messages_processed, 100);
    assert!(shutdown_report.total_uptime.as_nanos() > 0);
}
1545
/// A second `start()` on an already started runtime must return an error.
#[test]
fn test_cannot_start_twice() {
    let rt = RuntimeBuilder::new().build().unwrap();
    rt.start().unwrap();

    let second_attempt = rt.start();
    assert!(second_attempt.is_err());
}
1554
/// `shutdown()` on a runtime that was never started must succeed and leave
/// the runtime in the `Stopped` state.
#[test]
fn test_shutdown_from_initializing() {
    let rt = RuntimeBuilder::new().build().unwrap();

    let outcome = rt.shutdown();
    assert!(outcome.is_ok());

    assert_eq!(rt.lifecycle_state(), LifecycleState::Stopped);
}
1562
/// End-to-end scenario: build a runtime from a custom configuration, start
/// it, record work, run health checks, verify the accumulated stats, and
/// shut down in two phases (request, then complete).
#[test]
fn test_enterprise_full_lifecycle() {
    // Configuration with explicit application name and version.
    let custom_config = ConfigBuilder::new()
        .with_general(|general| general.app_name("integration-test").app_version("1.0.0"))
        .build()
        .unwrap();

    let rt = RuntimeBuilder::new()
        .with_config(custom_config)
        .build()
        .unwrap();

    // Before start: initializing and refusing work.
    assert_eq!(rt.lifecycle_state(), LifecycleState::Initializing);
    assert!(!rt.is_accepting_work());

    // After start: running and accepting work.
    rt.start().unwrap();
    assert_eq!(rt.lifecycle_state(), LifecycleState::Running);
    assert!(rt.is_accepting_work());

    // Ten launches of 100 messages each.
    for _launch in 0..10 {
        rt.record_kernel_launch();
        rt.record_messages(100);
    }

    // Three health-check cycles, each of which must report healthy.
    for _cycle in 0..3 {
        let outcome = rt.run_health_check_cycle();
        assert_eq!(outcome.status, crate::health::HealthStatus::Healthy);
    }

    // Stats must add up to what was recorded above.
    let totals = rt.stats();
    assert_eq!(totals.kernels_launched, 10);
    assert_eq!(totals.messages_processed, 1000);
    assert_eq!(totals.health_checks_run, 3);

    // Phase one of shutdown: draining.
    rt.request_shutdown().unwrap();
    assert_eq!(rt.lifecycle_state(), LifecycleState::Draining);

    // Phase two of shutdown: stopped, with the final stats preserved.
    let shutdown_report = rt.complete_shutdown().unwrap();
    assert_eq!(rt.lifecycle_state(), LifecycleState::Stopped);
    assert_eq!(shutdown_report.final_stats.kernels_launched, 10);
}
1612
/// The health-check cycle must mirror the circuit breaker: closed at first,
/// then open and unhealthy after ten failures are recorded on the breaker.
#[test]
fn test_circuit_breaker_integration() {
    let rt = RuntimeBuilder::new().build().unwrap();
    rt.start().unwrap();

    // Baseline: the circuit starts closed.
    let baseline = rt.run_health_check_cycle();
    assert_eq!(baseline.circuit_state, CircuitState::Closed);

    // Record ten failures directly on the runtime's breaker.
    let breaker = rt.circuit_breaker();
    for _failure in 0..10 {
        breaker.record_failure();
    }

    assert_eq!(breaker.state(), CircuitState::Open);

    // The next cycle must report the open circuit and an unhealthy status.
    let after_failures = rt.run_health_check_cycle();
    assert_eq!(after_failures.circuit_state, CircuitState::Open);
    assert_eq!(
        after_failures.status,
        crate::health::HealthStatus::Unhealthy
    );
}
1636
/// The degradation level reported by the health-check cycle must be `Normal`
/// at first and must differ from `Normal` once ten failures have been
/// recorded on the circuit breaker.
#[test]
fn test_degradation_integration() {
    let rt = RuntimeBuilder::new().build().unwrap();
    rt.start().unwrap();

    // Baseline: no degradation.
    let baseline = rt.run_health_check_cycle();
    assert_eq!(
        baseline.degradation_level,
        crate::health::DegradationLevel::Normal
    );

    // Record ten failures on the runtime's breaker.
    let breaker = rt.circuit_breaker();
    for _failure in 0..10 {
        breaker.record_failure();
    }

    // The following cycle must report some level other than `Normal`.
    let after_failures = rt.run_health_check_cycle();
    assert_ne!(
        after_failures.degradation_level,
        crate::health::DegradationLevel::Normal
    );
}
1663
/// Checks the settings each builder preset is expected to produce:
/// development (environment + tracing on), production (environment), and
/// high-performance (tracing off).
#[test]
fn test_configuration_presets_integration() {
    // Development preset.
    let dev_runtime = RuntimeBuilder::new().development().build().unwrap();
    assert_eq!(
        dev_runtime.config().general.environment,
        crate::config::Environment::Development
    );
    assert!(dev_runtime.config().observability.tracing_enabled);

    // Production preset.
    let prod_runtime = RuntimeBuilder::new().production().build().unwrap();
    assert_eq!(
        prod_runtime.config().general.environment,
        crate::config::Environment::Production
    );

    // High-performance preset.
    let perf_runtime = RuntimeBuilder::new().high_performance().build().unwrap();
    assert!(!perf_runtime.config().observability.tracing_enabled);
}
1685
/// The runtime's multi-GPU coordinator starts with zero devices and reports
/// one device after a single registration.
#[test]
fn test_multi_gpu_coordinator_access() {
    let rt = RuntimeBuilder::new().build().unwrap();

    let gpus = rt.multi_gpu_coordinator();
    assert_eq!(gpus.device_count(), 0);

    // Register one device (index 0, CPU backend) through the coordinator.
    let test_device = crate::multi_gpu::DeviceInfo::new(
        0,
        "Test GPU".to_string(),
        crate::runtime::Backend::Cpu,
    );
    gpus.register_device(test_device);

    assert_eq!(gpus.device_count(), 1);
}
1703
/// Background-task ages start out unset on a started runtime; each one must
/// become set once its corresponding operation has been run.
#[test]
fn test_background_task_tracking() {
    let rt = RuntimeBuilder::new().build().unwrap();
    rt.start().unwrap();

    // Nothing has run yet, so no age is available.
    let initial = rt.background_task_status();
    assert!(initial.health_check_age.is_none());
    assert!(initial.watchdog_scan_age.is_none());
    assert!(initial.metrics_flush_age.is_none());

    // A health-check cycle sets the health-check age.
    rt.run_health_check_cycle();
    let after_health = rt.background_task_status();
    assert!(after_health.health_check_age.is_some());

    // A watchdog cycle sets the watchdog-scan age.
    rt.run_watchdog_cycle();
    let after_watchdog = rt.background_task_status();
    assert!(after_watchdog.watchdog_scan_age.is_some());

    // A metrics flush sets the metrics-flush age.
    rt.flush_metrics();
    let after_flush = rt.background_task_status();
    assert!(after_flush.metrics_flush_age.is_some());
}
1730
/// Every `MonitoringConfig` builder setter must store the value it was given.
#[test]
fn test_monitoring_config_builder() {
    let health_every = Duration::from_secs(5);
    let watchdog_every = Duration::from_secs(2);
    let flush_every = Duration::from_secs(30);

    let built = MonitoringConfig::new()
        .health_check_interval(health_every)
        .watchdog_interval(watchdog_every)
        .metrics_flush_interval(flush_every)
        .enable_health_checks(true)
        .enable_watchdog(false)
        .enable_metrics_flush(true);

    // Intervals round-trip unchanged.
    assert_eq!(built.health_check_interval, health_every);
    assert_eq!(built.watchdog_interval, watchdog_every);
    assert_eq!(built.metrics_flush_interval, flush_every);

    // Enable flags round-trip unchanged.
    assert!(built.enable_health_checks);
    assert!(!built.enable_watchdog);
    assert!(built.enable_metrics_flush);
}
1752
/// `MonitoringConfig::default()` must use 10 s / 5 s / 60 s intervals for
/// health checks / watchdog / metrics flush, with all three loops enabled.
#[test]
fn test_monitoring_config_default() {
    let defaults = MonitoringConfig::default();

    // Default intervals.
    assert_eq!(defaults.health_check_interval, Duration::from_secs(10));
    assert_eq!(defaults.watchdog_interval, Duration::from_secs(5));
    assert_eq!(defaults.metrics_flush_interval, Duration::from_secs(60));

    // Every loop is enabled by default.
    assert!(defaults.enable_health_checks);
    assert!(defaults.enable_watchdog);
    assert!(defaults.enable_metrics_flush);
}
1764
1765 #[tokio::test]
1766 async fn test_async_monitoring_start_stop() {
1767 let runtime = RuntimeBuilder::new().build().unwrap();
1768 runtime.start().unwrap();
1769
1770 let config = MonitoringConfig::new()
1772 .health_check_interval(Duration::from_millis(50))
1773 .watchdog_interval(Duration::from_millis(50))
1774 .metrics_flush_interval(Duration::from_millis(50));
1775
1776 let handles = runtime.start_monitoring(config);
1777
1778 assert!(handles.is_running());
1780
1781 tokio::time::sleep(Duration::from_millis(150)).await;
1783
1784 let status = runtime.background_task_status();
1786 assert!(status.health_check_age.is_some());
1787 assert!(status.watchdog_scan_age.is_some());
1788
1789 handles.signal_shutdown();
1791
1792 handles.wait_for_shutdown().await;
1794 }
1795
1796 #[tokio::test]
1797 async fn test_async_monitoring_default_config() {
1798 let runtime = RuntimeBuilder::new().build().unwrap();
1799 runtime.start().unwrap();
1800
1801 let handles = runtime.start_monitoring_default();
1803 assert!(handles.is_running());
1804
1805 handles.signal_shutdown();
1807 handles.wait_for_shutdown().await;
1808 }
1809
1810 #[tokio::test]
1811 async fn test_async_monitoring_selective_loops() {
1812 let runtime = RuntimeBuilder::new().build().unwrap();
1813 runtime.start().unwrap();
1814
1815 let config = MonitoringConfig::new()
1817 .health_check_interval(Duration::from_millis(50))
1818 .enable_health_checks(true)
1819 .enable_watchdog(false)
1820 .enable_metrics_flush(false);
1821
1822 let handles = runtime.start_monitoring(config);
1823
1824 assert!(handles.health_check_handle.is_some());
1826 assert!(handles.watchdog_handle.is_none());
1827 assert!(handles.metrics_flush_handle.is_none());
1828
1829 handles.signal_shutdown();
1830 handles.wait_for_shutdown().await;
1831 }
1832
1833 #[tokio::test]
1834 async fn test_async_monitoring_respects_shutdown_flag() {
1835 let runtime = RuntimeBuilder::new().build().unwrap();
1836 runtime.start().unwrap();
1837
1838 let config = MonitoringConfig::new().health_check_interval(Duration::from_millis(50));
1839
1840 let handles = runtime.start_monitoring(config);
1841
1842 runtime.request_shutdown().unwrap();
1844
1845 tokio::time::sleep(Duration::from_millis(100)).await;
1847
1848 handles.wait_for_shutdown().await;
1850 }
1851
1852 #[tokio::test]
1853 async fn test_monitoring_handles_is_running() {
1854 let runtime = RuntimeBuilder::new().build().unwrap();
1855 runtime.start().unwrap();
1856
1857 let config = MonitoringConfig::new().health_check_interval(Duration::from_millis(100));
1858
1859 let handles = runtime.start_monitoring(config);
1860 assert!(handles.is_running());
1861
1862 handles.signal_shutdown();
1863 handles.wait_for_shutdown().await;
1864
1865 }
1867}