1use crate::coord;
8use crate::error::{Error, Result};
9use crate::pool::{StringPool, VecPool};
10use crate::shutdown::{ShutdownCoordinator, ShutdownHandle};
11
12use std::collections::HashMap;
13use std::future::Future;
14use std::pin::Pin;
15use std::sync::atomic::{AtomicU64, Ordering};
16use std::sync::{Arc, Mutex};
17use std::time::{Duration, Instant};
18#[allow(unused_imports)]
19use tracing::{error, info, instrument, warn};
20
/// Numeric identifier handed out by `SubsystemManager::register` for each
/// registered subsystem.
pub type SubsystemId = u64;
23
/// Boxed, thread-safe closure that receives a [`ShutdownHandle`] and returns a
/// pinned, boxed, `Send` future resolving to `Result<()>`.
pub type SubsystemFn =
    Box<dyn Fn(ShutdownHandle) -> Pin<Box<dyn Future<Output = Result<()>> + Send>> + Send + Sync>;
27
/// A unit of work that can be registered with, started by and stopped by a
/// `SubsystemManager`.
pub trait Subsystem: Send + Sync + 'static {
    /// Builds the future that performs the subsystem's work.
    ///
    /// `shutdown` is a clone of the handle the manager created for this
    /// subsystem at registration. When the future resolves to `Err`, the
    /// manager records the subsystem as failed together with the error text.
    fn run(&self, shutdown: ShutdownHandle) -> Pin<Box<dyn Future<Output = Result<()>> + Send>>;

    /// Name used in log output and stored in the subsystem's metadata.
    fn name(&self) -> &str;

    /// Optional health probe. The default implementation provides none.
    fn health_check(&self) -> Option<Box<dyn Fn() -> bool + Send + Sync>> {
        None
    }

    /// Restart policy copied into the metadata at registration time.
    /// Defaults to [`RestartPolicy::Never`].
    fn restart_policy(&self) -> RestartPolicy {
        RestartPolicy::Never
    }
}
46
/// Describes whether, and with what delay, a subsystem may be restarted.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum RestartPolicy {
    /// The subsystem is never restarted.
    Never,
    /// The subsystem is always eligible for a restart.
    Always,
    /// The subsystem is eligible for a restart only while in the failed state.
    OnFailure,
    /// The subsystem is eligible for a restart until `max_attempts` restarts
    /// have been recorded, with a delay that doubles per recorded restart.
    ExponentialBackoff {
        /// Base delay that the doubling starts from.
        initial_delay: Duration,
        /// Upper bound applied to the computed delay.
        max_delay: Duration,
        /// Number of restarts after which no further restart is allowed.
        max_attempts: u32,
    },
}

impl Default for RestartPolicy {
    /// The default policy is [`RestartPolicy::Never`].
    fn default() -> Self {
        RestartPolicy::Never
    }
}
72
/// Lifecycle state recorded for a registered subsystem.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum SubsystemState {
    /// Registered, or about to be launched.
    Starting,
    /// Launched and considered active.
    Running,
    /// A stop has been requested and is in progress.
    Stopping,
    /// Finished without error, or stopped by the manager.
    Stopped,
    /// Finished with an error.
    Failed,
    /// A restart has been requested and is in progress.
    Restarting,
}

impl std::fmt::Display for SubsystemState {
    /// Writes the variant name verbatim (for example `Running`).
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let label = match *self {
            SubsystemState::Starting => "Starting",
            SubsystemState::Running => "Running",
            SubsystemState::Stopping => "Stopping",
            SubsystemState::Stopped => "Stopped",
            SubsystemState::Failed => "Failed",
            SubsystemState::Restarting => "Restarting",
        };
        f.write_str(label)
    }
}
102
/// Notification sent on the manager's event channel (when one has been
/// created via `enable_events`).
#[derive(Debug, Clone)]
pub enum SubsystemEvent {
    /// The recorded state of a subsystem was updated.
    StateChanged {
        /// Identifier of the affected subsystem.
        id: SubsystemId,
        /// Name stored in the subsystem's metadata.
        name: String,
        /// State recorded by the update.
        state: SubsystemState,
        /// Instant at which the event value was created.
        at: Instant,
    },
}
118
/// Bookkeeping record the manager keeps for every registered subsystem.
#[derive(Debug, Clone)]
pub struct SubsystemMetadata {
    /// Identifier assigned at registration.
    pub id: SubsystemId,
    /// Name reported by the subsystem at registration.
    pub name: String,
    /// Most recently recorded lifecycle state.
    pub state: SubsystemState,
    /// When the subsystem was registered.
    pub registered_at: Instant,
    /// When the subsystem was last marked running, if ever.
    pub started_at: Option<Instant>,
    /// When the subsystem was last recorded as stopped or failed, if ever.
    pub stopped_at: Option<Instant>,
    /// Number of restarts requested for this subsystem.
    pub restart_count: u32,
    /// Text of the most recent error returned by the subsystem, if any.
    pub last_error: Option<String>,
    /// Restart policy reported by the subsystem at registration.
    pub restart_policy: RestartPolicy,
}
141
/// Point-in-time summary produced by `SubsystemManager::get_stats`.
#[derive(Debug, Clone)]
pub struct SubsystemStats {
    /// Number of registered subsystems.
    pub total_subsystems: usize,
    /// Number of subsystems whose state is `Running`.
    pub running_subsystems: usize,
    /// Number of subsystems whose state is `Failed`.
    pub failed_subsystems: usize,
    /// Number of subsystems whose state is `Stopping`.
    pub stopping_subsystems: usize,
    /// Value of the manager-wide restart counter.
    pub total_restarts: u64,
    /// Metadata snapshot of every registered subsystem.
    pub subsystems: Vec<SubsystemMetadata>,
}
158
/// Internal per-subsystem record stored in the manager's registry.
struct SubsystemEntry {
    /// Mutable bookkeeping for the subsystem.
    metadata: Mutex<SubsystemMetadata>,
    /// The registered implementation.
    subsystem: Arc<dyn Subsystem>,
    /// Join handle of the spawned tokio task; `None` until the subsystem is
    /// started and again after a stop has taken the handle.
    #[cfg(feature = "tokio")]
    task_handle: Mutex<Option<tokio::task::JoinHandle<Result<()>>>>,
    /// Handle created from the shutdown coordinator at registration; a clone
    /// is passed to `Subsystem::run`.
    shutdown_handle: ShutdownHandle,
}
171
/// Registry that tracks subsystems, launches and stops them, and records
/// their lifecycle state.
pub struct SubsystemManager {
    /// Registered subsystems keyed by identifier.
    subsystems: Mutex<HashMap<SubsystemId, Arc<SubsystemEntry>>>,
    /// Coordinator used to create one shutdown handle per subsystem.
    shutdown_coordinator: ShutdownCoordinator,
    /// Next identifier to hand out; starts at 1.
    next_id: AtomicU64,
    /// Number of restarts requested across all subsystems.
    total_restarts: AtomicU64,
    /// Pool used when copying subsystem names.
    string_pool: StringPool,
    /// Pool of scratch vectors used by the health-check pass.
    vec_pool: VecPool<(SubsystemId, String, SubsystemState, Arc<dyn Subsystem>)>,
    /// Pool of scratch vectors used when snapshotting metadata.
    metadata_pool: VecPool<SubsystemMetadata>,
    /// Sending half of the event channel; `None` until events are enabled.
    events_tx: Mutex<Option<coord::chan::Sender<SubsystemEvent>>>,
    /// Receiving half of the event channel; `None` until events are enabled.
    events_rx: Mutex<Option<coord::chan::Receiver<SubsystemEvent>>>,
}
193
194impl SubsystemManager {
195 #[must_use]
197 pub fn new(shutdown_coordinator: ShutdownCoordinator) -> Self {
198 Self {
199 subsystems: Mutex::new(HashMap::new()),
200 shutdown_coordinator,
201 next_id: AtomicU64::new(1),
202 total_restarts: AtomicU64::new(0),
203 string_pool: StringPool::new(32, 128, 64), vec_pool: VecPool::new(8, 32, 16), metadata_pool: VecPool::new(8, 32, 16), events_tx: Mutex::new(None),
208 events_rx: Mutex::new(None),
209 }
210 }
211
212 pub fn enable_events(&self) {
218 let mut tx_guard = self.events_tx.lock().unwrap();
219 let mut rx_guard = self.events_rx.lock().unwrap();
220 if tx_guard.is_some() || rx_guard.is_some() {
221 return;
222 }
223 let (tx, rx) = coord::chan::unbounded();
224 *tx_guard = Some(tx);
225 *rx_guard = Some(rx);
226 drop(tx_guard);
228 drop(rx_guard);
230 }
231
232 pub fn try_next_event(&self) -> Option<SubsystemEvent> {
238 let rx_guard = self.events_rx.lock().unwrap();
239 rx_guard
240 .as_ref()
241 .and_then(|rx| coord::chan::try_recv(rx).ok())
242 }
243
244 pub fn register<S: Subsystem>(&self, subsystem: S) -> SubsystemId {
252 let id = self.next_id.fetch_add(1, Ordering::AcqRel);
253 let pooled_name = self.string_pool.get_with_value(subsystem.name());
255 let restart_policy = subsystem.restart_policy();
256
257 let shutdown_handle = self.shutdown_coordinator.create_handle(subsystem.name());
258 let metadata = SubsystemMetadata {
259 id,
260 name: pooled_name.to_string(), state: SubsystemState::Starting,
263 registered_at: Instant::now(),
264 started_at: None,
265 stopped_at: None,
266 last_error: None,
267 restart_count: 0,
268 restart_policy,
269 };
270
271 let entry = Arc::new(SubsystemEntry {
272 metadata: Mutex::new(metadata),
273 subsystem: Arc::new(subsystem),
274 #[cfg(feature = "tokio")]
275 task_handle: Mutex::new(None),
276 shutdown_handle,
277 });
278
279 self.subsystems.lock().unwrap().insert(id, entry);
280
281 info!(subsystem_id = id, subsystem_name = %pooled_name, "Registered subsystem");
282 id
283 }
284
285 pub fn register_fn<F, Fut>(&self, name: &str, func: F) -> SubsystemId
291 where
292 F: Fn(ShutdownHandle) -> Fut + Send + Sync + 'static,
293 Fut: Future<Output = Result<()>> + Send + 'static,
294 {
295 struct ClosureSubsystem<F> {
296 name: String, func: F,
298 }
299
300 impl<F, Fut> Subsystem for ClosureSubsystem<F>
301 where
302 F: Fn(ShutdownHandle) -> Fut + Send + Sync + 'static,
303 Fut: Future<Output = Result<()>> + Send + 'static,
304 {
305 fn run(
306 &self,
307 shutdown: ShutdownHandle,
308 ) -> Pin<Box<dyn Future<Output = Result<()>> + Send>> {
309 Box::pin((self.func)(shutdown))
310 }
311
312 fn name(&self) -> &str {
313 &self.name
314 }
315 }
316
317 let pooled_name = self.string_pool.get_with_value(name);
319 let subsystem = ClosureSubsystem {
320 name: pooled_name.to_string(),
321 func,
322 };
323 self.register(subsystem)
324 }
325
326 pub fn register_closure<F>(&self, closure_subsystem: F, name: &str) -> SubsystemId
332 where
333 F: Fn(ShutdownHandle) -> Pin<Box<dyn Future<Output = Result<()>> + Send>>
334 + Send
335 + Sync
336 + 'static,
337 {
338 struct ClosureSubsystemWrapper<F> {
340 name: String,
341 func: F,
342 }
343
344 impl<F> Subsystem for ClosureSubsystemWrapper<F>
345 where
346 F: Fn(ShutdownHandle) -> Pin<Box<dyn Future<Output = Result<()>> + Send>>
347 + Send
348 + Sync
349 + 'static,
350 {
351 fn run(
352 &self,
353 shutdown: ShutdownHandle,
354 ) -> Pin<Box<dyn Future<Output = Result<()>> + Send>> {
355 (self.func)(shutdown)
356 }
357
358 fn name(&self) -> &str {
359 &self.name
360 }
361 }
362
363 let pooled_name = self.string_pool.get_with_value(name).to_string();
365 let wrapper = ClosureSubsystemWrapper {
366 name: pooled_name,
367 func: closure_subsystem,
368 };
369
370 self.register(wrapper)
372 }
373
374 #[instrument(skip(self), fields(subsystem_id = id))]
384 pub async fn start_subsystem(&self, id: SubsystemId) -> Result<()> {
385 let entry = {
386 let subsystems = self.subsystems.lock().unwrap();
387 subsystems
388 .get(&id)
389 .ok_or_else(|| Error::subsystem("unknown", "Subsystem not found"))?
390 .clone()
391 };
392
393 self.update_state(id, SubsystemState::Starting);
394
395 let subsystem_name = entry.subsystem.name().to_string();
397
398 #[cfg(feature = "tokio")]
399 {
400 let subsystem = Arc::clone(&entry.subsystem);
402 let shutdown_handle = entry.shutdown_handle.clone();
403 let entry_clone = Arc::clone(&entry);
404 let id_clone = id;
405 let subsystem_name_clone = entry.subsystem.name().to_string();
408
409 let task = tokio::spawn(async move {
411 let result: Result<()> = subsystem.run(shutdown_handle).await;
412
413 match &result {
415 Ok(()) => {
416 entry_clone.metadata.lock().unwrap().state = SubsystemState::Stopped;
417 entry_clone.metadata.lock().unwrap().stopped_at = Some(Instant::now());
418 info!(subsystem_id = id_clone, subsystem_name = %subsystem_name_clone, "Subsystem stopped successfully");
419 }
420 Err(e) => {
421 {
422 let mut metadata = entry_clone.metadata.lock().unwrap();
423 metadata.state = SubsystemState::Failed;
424 metadata.last_error = Some(e.to_string());
426 metadata.stopped_at = Some(Instant::now());
427 }
428 error!(subsystem_id = id_clone, subsystem_name = %subsystem_name_clone, error = %e, "Subsystem failed");
429 }
430 }
431
432 result
433 });
434
435 *entry.task_handle.lock().unwrap() = Some(task);
436 }
437
438 self.update_state_with_timestamp(id, SubsystemState::Running, Some(Instant::now()), None);
439 info!(subsystem_id = id, subsystem_name = %subsystem_name, "Started subsystem");
440
441 Ok(())
442 }
443
444 pub async fn start_all(&self) -> Result<()> {
455 let subsystem_ids: Vec<SubsystemId> =
456 { self.subsystems.lock().unwrap().keys().copied().collect() };
457
458 info!("Starting {} subsystems", subsystem_ids.len());
459
460 for id in subsystem_ids {
461 if let Err(e) = self.start_subsystem(id).await {
462 error!(subsystem_id = id, error = %e, "Failed to start subsystem");
463 }
465 }
466
467 Ok(())
468 }
469
470 #[instrument(skip(self), fields(subsystem_id = id))]
480 pub async fn stop_subsystem(&self, id: SubsystemId) -> Result<()> {
481 let entry = {
482 let subsystems = self.subsystems.lock().unwrap();
483 subsystems
484 .get(&id)
485 .ok_or_else(|| Error::subsystem("unknown", "Subsystem not found"))?
486 .clone()
487 };
488
489 #[allow(unused_variables)]
491 let subsystem_name = self.string_pool.get_with_value(entry.subsystem.name());
492 self.update_state(id, SubsystemState::Stopping);
493
494 entry.shutdown_handle.ready();
496
497 #[cfg(feature = "tokio")]
498 {
499 let task_handle_opt = {
501 let mut task_handle_guard = entry.task_handle.lock().unwrap();
502 task_handle_guard.take()
503 }; if let Some(task_handle) = task_handle_opt {
507 match tokio::time::timeout(Duration::from_millis(500), task_handle).await {
508 Ok(Ok(Ok(()))) => {
509 info!(subsystem_id = id, subsystem_name = %subsystem_name, "Subsystem stopped gracefully");
510 }
511 Ok(Ok(Err(e))) => {
512 warn!(subsystem_id = id, subsystem_name = %subsystem_name, error = %e, "Subsystem stopped with error");
513 }
514 Ok(Err(e)) => {
515 error!(subsystem_id = id, subsystem_name = %subsystem_name, error = %e, "Failed to join subsystem task");
516 }
517 Err(_) => {
518 warn!(subsystem_id = id, subsystem_name = %subsystem_name, "Timed out waiting for subsystem task to complete, marking as stopped anyway");
519 }
520 }
521 }
522 }
523
524 self.update_state_with_timestamp(id, SubsystemState::Stopped, None, Some(Instant::now()));
525 Ok(())
526 }
527
528 pub async fn stop_all(&self) -> Result<()> {
539 let subsystem_ids: Vec<SubsystemId> =
540 { self.subsystems.lock().unwrap().keys().copied().collect() };
541
542 info!("Stopping {} subsystems", subsystem_ids.len());
543
544 #[allow(unused_variables)]
546 let stop_tasks: Vec<_> = subsystem_ids
547 .into_iter()
548 .map(|id| self.stop_subsystem(id))
549 .collect();
550
551 #[cfg(feature = "tokio")]
552 {
553 let results = futures::future::join_all(stop_tasks).await;
554 for (i, result) in results.into_iter().enumerate() {
555 if let Err(e) = result {
556 error!(subsystem_index = i, error = %e, "Failed to stop subsystem");
557 }
558 }
559 }
560
561 #[cfg(all(feature = "async-std", not(feature = "tokio")))]
562 {
563 for task in stop_tasks {
564 if let Err(e) = task.await {
565 error!(error = %e, "Failed to stop subsystem");
566 }
567 }
568 }
569
570 Ok(())
571 }
572
573 pub async fn restart_subsystem(&self, id: SubsystemId) -> Result<()> {
584 let entry = {
585 let subsystems = self.subsystems.lock().unwrap();
586 subsystems
587 .get(&id)
588 .ok_or_else(|| Error::subsystem("unknown", "Subsystem not found"))?
589 .clone()
590 };
591
592 let subsystem_name = self.string_pool.get_with_value(entry.subsystem.name());
594
595 {
597 let mut metadata = entry.metadata.lock().unwrap();
598 metadata.restart_count += 1;
599 }
600
601 self.total_restarts.fetch_add(1, Ordering::AcqRel);
602 self.update_state(id, SubsystemState::Restarting);
603
604 info!(subsystem_id = id, subsystem_name = %subsystem_name, "Restarting subsystem");
605
606 let delay = Self::calculate_restart_delay(&entry);
608 if !delay.is_zero() {
609 info!(
610 subsystem_id = id,
611 delay_ms = delay.as_millis(),
612 "Waiting before restart"
613 );
614
615 #[cfg(feature = "tokio")]
616 tokio::time::sleep(delay).await;
617
618 #[cfg(all(feature = "async-std", not(feature = "tokio")))]
619 async_std::task::sleep(delay).await;
620 }
621
622 self.start_subsystem(id).await
624 }
625
626 pub fn get_stats(&self) -> SubsystemStats {
632 let mut subsystem_metadata = self.metadata_pool.get();
635 let total_count;
636
637 {
638 let subsystems = self.subsystems.lock().unwrap();
639 total_count = subsystems.len();
640
641 let current_capacity = subsystem_metadata.capacity();
644 if current_capacity < total_count {
645 subsystem_metadata.reserve(total_count - current_capacity);
646 }
647
648 for entry in subsystems.values() {
650 subsystem_metadata.push(entry.metadata.lock().unwrap().clone());
651 }
652
653 }
655
656 let mut running_count = 0;
658 let mut failed_count = 0;
659 #[allow(unused_variables)]
660 let mut stopped_count = 0; let mut stopping_count = 0;
662
663 for metadata in subsystem_metadata.iter() {
665 match metadata.state {
666 SubsystemState::Running => running_count += 1,
667 SubsystemState::Failed => failed_count += 1,
668 SubsystemState::Stopped => stopped_count += 1,
669 SubsystemState::Stopping => stopping_count += 1,
670 _ => {} }
672 }
673
674 let subsystems_vec = subsystem_metadata
676 .iter()
677 .cloned()
678 .collect::<Vec<SubsystemMetadata>>();
679
680 drop(subsystem_metadata);
682
683 SubsystemStats {
684 total_subsystems: total_count,
685 running_subsystems: running_count,
686 failed_subsystems: failed_count,
687 stopping_subsystems: stopping_count,
688 total_restarts: self.total_restarts.load(Ordering::Relaxed),
689 subsystems: subsystems_vec,
690 }
691 }
692
693 pub fn get_subsystem_metadata(&self, id: SubsystemId) -> Option<SubsystemMetadata> {
701 let subsystems = self.subsystems.lock().unwrap();
702 subsystems
703 .get(&id)
704 .map(|entry| entry.metadata.lock().unwrap().clone())
705 }
706
707 pub fn get_all_metadata(&self) -> Vec<SubsystemMetadata> {
713 let mut metadata_list = self.metadata_pool.get();
715
716 {
717 let subsystems = self.subsystems.lock().unwrap();
718
719 let needed_capacity = subsystems.len();
721 let current_capacity = metadata_list.capacity();
722 if current_capacity < needed_capacity {
723 metadata_list.reserve(needed_capacity - current_capacity);
724 }
725
726 for entry in subsystems.values() {
728 metadata_list.push(entry.metadata.lock().unwrap().clone());
729 }
730 } let result = metadata_list.iter().cloned().collect();
734
735 drop(metadata_list);
737
738 result
739 }
740
741 pub fn run_health_checks(&self) -> Vec<(SubsystemId, String, bool)> {
747 let mut subsystem_data = self.vec_pool.get();
750
751 {
752 let subsystems = self.subsystems.lock().unwrap();
753
754 let needed_capacity = subsystems.len();
756 let current_capacity = subsystem_data.capacity();
757 if current_capacity < needed_capacity {
758 subsystem_data.reserve(needed_capacity - current_capacity);
759 }
760
761 for (id, entry) in subsystems.iter() {
763 let state = entry.metadata.lock().unwrap().state;
764 subsystem_data.push((
765 *id,
766 {
769 let name = entry.subsystem.name();
770 let pooled = self.string_pool.get_with_value(name);
771 pooled.to_string()
773 },
774 state,
775 Arc::clone(&entry.subsystem),
776 ));
777 }
778 } let mut result = Vec::with_capacity(subsystem_data.len());
782
783 for data in subsystem_data.iter() {
786 let (id, ref name, state, ref subsystem) = *data;
787 let is_healthy = match state {
788 SubsystemState::Running => {
789 subsystem
791 .health_check()
792 .map_or(true, |health_check| health_check())
793 }
794 _ => true, };
796 result.push((id, name.clone(), is_healthy));
797 }
798
799 drop(subsystem_data);
801
802 result
803 }
804
805 fn update_state(&self, id: SubsystemId, new_state: SubsystemState) {
811 self.update_state_with_timestamp(id, new_state, None, None);
812 }
813
814 #[allow(dead_code)]
820 fn update_state_with_error(&self, id: SubsystemId, new_state: SubsystemState, error: String) {
821 let entry_opt = {
823 let subsystems = self.subsystems.lock().unwrap();
824 subsystems.get(&id).cloned()
825 };
826
827 if let Some(entry) = entry_opt {
829 let mut metadata = entry.metadata.lock().unwrap();
830 metadata.state = new_state;
831 metadata.last_error = Some(error);
832 if new_state == SubsystemState::Stopped || new_state == SubsystemState::Failed {
833 metadata.stopped_at = Some(Instant::now());
834 }
835 }
836 }
837
838 fn update_state_with_timestamp(
844 &self,
845 id: SubsystemId,
846 new_state: SubsystemState,
847 started_at: Option<Instant>,
848 stopped_at: Option<Instant>,
849 ) {
850 let entry_opt = {
852 let subsystems = self.subsystems.lock().unwrap();
853 subsystems.get(&id).cloned()
854 };
855
856 if let Some(entry) = entry_opt {
858 let mut metadata = entry.metadata.lock().unwrap();
859 metadata.state = new_state;
860 if let Some(started) = started_at {
861 metadata.started_at = Some(started);
862 }
863 if let Some(stopped) = stopped_at {
864 metadata.stopped_at = Some(stopped);
865 }
866 self.publish_event(SubsystemEvent::StateChanged {
868 id,
869 name: metadata.name.clone(),
870 state: metadata.state,
871 at: Instant::now(),
872 });
873 }
874 }
875
876 fn publish_event(&self, event: SubsystemEvent) {
878 let tx_opt = self.events_tx.lock().unwrap().as_ref().cloned();
879 if let Some(tx) = tx_opt {
880 let _ = tx.send(event);
882 }
883 }
884
885 #[allow(dead_code)]
891 fn should_restart(entry: &SubsystemEntry) -> bool {
892 let (restart_policy, state, restart_count) = {
894 let metadata = entry.metadata.lock().unwrap();
895 (
896 metadata.restart_policy,
897 metadata.state,
898 metadata.restart_count,
899 )
900 };
901
902 match restart_policy {
903 RestartPolicy::Never => false,
904 RestartPolicy::Always => true,
905 RestartPolicy::OnFailure => state == SubsystemState::Failed,
906 RestartPolicy::ExponentialBackoff { max_attempts, .. } => restart_count < max_attempts,
907 }
908 }
909
910 fn calculate_restart_delay(entry: &SubsystemEntry) -> Duration {
916 let (restart_policy, restart_count) = {
918 let metadata = entry.metadata.lock().unwrap();
919 (metadata.restart_policy, metadata.restart_count)
920 };
921
922 match restart_policy {
923 RestartPolicy::ExponentialBackoff {
924 initial_delay,
925 max_delay,
926 ..
927 } => {
928 let delay = initial_delay * 2_u32.pow(restart_count.min(10)); delay.min(max_delay)
930 }
931 _ => Duration::ZERO,
932 }
933 }
934}
935
936impl Clone for SubsystemManager {
937 fn clone(&self) -> Self {
938 Self {
939 subsystems: Mutex::new(HashMap::new()), shutdown_coordinator: self.shutdown_coordinator.clone(),
941 next_id: AtomicU64::new(self.next_id.load(Ordering::Acquire)),
942 total_restarts: AtomicU64::new(0),
943 string_pool: StringPool::new(32, 128, 64),
945 vec_pool: VecPool::new(8, 32, 16),
946 metadata_pool: VecPool::new(8, 32, 16),
947 events_tx: Mutex::new(None),
948 events_rx: Mutex::new(None),
949 }
950 }
951}
952
953impl SubsystemManager {
954 #[cfg(feature = "lockfree-coordination")]
960 pub fn subscribe_events(&self) -> Option<coord::chan::Receiver<SubsystemEvent>> {
965 self.events_rx.lock().unwrap().as_ref().cloned()
966 }
967}
968
969#[cfg(test)]
970mod tests {
971 use super::*;
972 use std::pin::Pin;
973 use std::time::Duration;
974
975 struct TestSubsystem {
976 name: String,
977 should_fail: bool,
978 }
979
980 impl TestSubsystem {
981 fn new(name: &str, should_fail: bool) -> Self {
982 Self {
983 name: name.to_string(),
984 should_fail,
985 }
986 }
987 }
988
    impl Subsystem for TestSubsystem {
        /// Loops in 10 ms steps until shutdown is observed, then returns `Ok`.
        ///
        /// When `should_fail` is set, the first loop iteration that does not
        /// observe shutdown returns a runtime error instead.
        fn run(
            &self,
            shutdown: ShutdownHandle,
        ) -> Pin<Box<dyn Future<Output = Result<()>> + Send>> {
            // Copy the flag so the future does not borrow `self`.
            let should_fail = self.should_fail;
            Box::pin(async move {
                let _start_time = Instant::now();
                #[cfg(feature = "tokio")]
                let mut shutdown = shutdown;
                loop {
                    // tokio: race the shutdown signal against a 10 ms sleep.
                    #[cfg(feature = "tokio")]
                    {
                        tokio::select! {
                            () = shutdown.cancelled() => {
                                info!("Subsystem '{}' shutting down", "TestSubsystem");
                                break;
                            }
                            () = tokio::time::sleep(Duration::from_millis(10)) => {}
                        }
                    }

                    // async-std: poll the shutdown flag between 10 ms sleeps.
                    #[cfg(all(feature = "async-std", not(feature = "tokio")))]
                    {
                        if shutdown.is_shutdown() {
                            break;
                        }
                        async_std::task::sleep(Duration::from_millis(10)).await;
                    }

                    if should_fail {
                        return Err(Error::runtime("Test failure"));
                    }
                }

                Ok(())
            })
        }

        /// Returns the name given at construction.
        fn name(&self) -> &str {
            &self.name
        }
    }
1032
1033 #[cfg(feature = "tokio")]
1034 #[cfg_attr(miri, ignore)]
1035 #[tokio::test]
1036 async fn test_subsystem_registration() {
1037 let test_result = tokio::time::timeout(Duration::from_secs(5), async {
1039 let coordinator = ShutdownCoordinator::new(5000, 10000);
1040 let manager = SubsystemManager::new(coordinator);
1041
1042 let subsystem = TestSubsystem::new("test", false);
1043 let id = manager.register(subsystem);
1044
1045 let stats = manager.get_stats();
1046 assert_eq!(stats.total_subsystems, 1);
1047 assert_eq!(stats.running_subsystems, 0);
1048
1049 let metadata = manager.get_subsystem_metadata(id).unwrap();
1050 assert_eq!(metadata.name, "test");
1051 assert_eq!(metadata.state, SubsystemState::Starting);
1052 })
1053 .await;
1054
1055 assert!(test_result.is_ok(), "Test timed out after 5 seconds");
1056 }
1057
1058 #[cfg(all(feature = "async-std", not(feature = "tokio")))]
1059 #[async_std::test]
1060 async fn test_subsystem_registration() {
1061 let test_result = async_std::future::timeout(Duration::from_secs(5), async {
1063 let coordinator = ShutdownCoordinator::new(5000, 10000);
1064 let manager = SubsystemManager::new(coordinator);
1065
1066 let subsystem = TestSubsystem::new("test", false);
1067 let id = manager.register(subsystem);
1068
1069 let stats = manager.get_stats();
1070 assert_eq!(stats.total_subsystems, 1);
1071 assert_eq!(stats.running_subsystems, 0);
1072
1073 let metadata = manager.get_subsystem_metadata(id).unwrap();
1074 assert_eq!(metadata.name, "test");
1075 assert_eq!(metadata.state, SubsystemState::Starting);
1076 })
1077 .await;
1078
1079 assert!(test_result.is_ok(), "Test timed out after 5 seconds");
1080 }
1081
1082 #[cfg(feature = "tokio")]
1083 #[cfg_attr(miri, ignore)]
1084 #[tokio::test]
1085 async fn test_subsystem_start_stop() {
1086 let test_result = tokio::time::timeout(Duration::from_secs(5), async {
1088 let coordinator = ShutdownCoordinator::new(500, 1000);
1090 let manager = SubsystemManager::new(coordinator);
1091
1092 let subsystem = TestSubsystem::new("test", false);
1094 let id = manager.register(subsystem);
1095
1096 manager.start_subsystem(id).await.unwrap();
1098
1099 tokio::time::sleep(Duration::from_millis(50)).await;
1101
1102 let metadata = manager.get_subsystem_metadata(id).unwrap();
1104 assert_eq!(metadata.state, SubsystemState::Running);
1105
1106 let stop_result =
1108 tokio::time::timeout(Duration::from_millis(1000), manager.stop_subsystem(id)).await;
1109
1110 assert!(stop_result.is_ok());
1111
1112 let metadata = manager.get_subsystem_metadata(id).unwrap();
1114 assert_eq!(metadata.state, SubsystemState::Stopped);
1115 })
1116 .await;
1117
1118 assert!(test_result.is_ok(), "Test timed out after 5 seconds");
1119 }
1120
    /// A started subsystem is reported as `Running` and, after a successful
    /// stop, as `Stopped` (async-std variant).
    #[cfg(all(feature = "async-std", not(feature = "tokio")))]
    #[async_std::test]
    async fn test_subsystem_start_stop() {
        let test_result = async_std::future::timeout(Duration::from_secs(5), async {
            let coordinator = ShutdownCoordinator::new(500, 1000);
            let manager = SubsystemManager::new(coordinator);

            let subsystem = TestSubsystem::new("test", false);
            let id = manager.register(subsystem);

            manager.start_subsystem(id).await.unwrap();

            async_std::task::sleep(Duration::from_millis(50)).await;

            let metadata = manager.get_subsystem_metadata(id).unwrap();
            assert_eq!(metadata.state, SubsystemState::Running);

            // Outer result: did the timeout elapse? Inner result: did the stop succeed?
            let stop_result =
                async_std::future::timeout(Duration::from_millis(1000), manager.stop_subsystem(id))
                    .await;
            assert!(stop_result.is_ok(), "Subsystem stop operation timed out");
            assert!(stop_result.unwrap().is_ok(), "Failed to stop subsystem");

            let metadata = manager.get_subsystem_metadata(id).unwrap();
            assert_eq!(metadata.state, SubsystemState::Stopped);
        })
        .await;

        assert!(test_result.is_ok(), "Test timed out after 5 seconds");
    }
1159
    /// A subsystem whose future returns an error ends up in the `Failed`
    /// state with the error text recorded (tokio variant).
    #[cfg(feature = "tokio")]
    #[cfg_attr(miri, ignore)]
    #[tokio::test]
    async fn test_subsystem_failure() {
        let test_result = tokio::time::timeout(Duration::from_secs(5), async {
            let coordinator = ShutdownCoordinator::new(5000, 10000);
            let manager = SubsystemManager::new(coordinator);

            let subsystem = TestSubsystem::new("failing", true);
            let id = manager.register(subsystem);

            manager.start_subsystem(id).await.unwrap();

            // Long enough for the task to complete its first 10 ms step and fail.
            tokio::time::sleep(Duration::from_millis(100)).await;

            let metadata = manager.get_subsystem_metadata(id).unwrap();
            assert_eq!(metadata.state, SubsystemState::Failed);
            assert!(metadata.last_error.is_some());
        })
        .await;

        assert!(test_result.is_ok(), "Test timed out after 5 seconds");
    }
1185
    /// Placeholder for the failure test under async-std; it is ignored and
    /// only constructs a manager.
    #[cfg(all(feature = "async-std", not(feature = "tokio")))]
    #[async_std::test]
    #[ignore = "Failure state transitions behave differently in async-std due to its task model"]
    async fn test_subsystem_failure() {
        let coordinator = ShutdownCoordinator::new(5000, 10000);
        let _manager = SubsystemManager::new(coordinator);

    }
1203
1204 #[test]
1205 fn test_restart_policy() {
1206 let policy = RestartPolicy::ExponentialBackoff {
1207 initial_delay: Duration::from_millis(100),
1208 max_delay: Duration::from_secs(60),
1209 max_attempts: 5,
1210 };
1211
1212 assert_ne!(policy, RestartPolicy::Never);
1213 assert_eq!(RestartPolicy::default(), RestartPolicy::Never);
1214 }
1215
    /// A subsystem registered through `register_closure` can be started and
    /// stopped like any other (tokio variant).
    #[cfg(feature = "tokio")]
    #[cfg_attr(miri, ignore)]
    #[tokio::test]
    async fn test_closure_subsystem() {
        let test_result = tokio::time::timeout(Duration::from_secs(5), async {
            let coordinator = ShutdownCoordinator::new(500, 1000);
            let manager = SubsystemManager::new(coordinator);

            let name = "closure_test".to_string();
            let closure_subsystem = Box::new(move |shutdown: ShutdownHandle| {
                // The clone is discarded; it only makes the closure capture `name`.
                let _ = name.clone();
                Box::pin(async move {
                    #[cfg(feature = "tokio")]
                    let mut shutdown = shutdown;
                    loop {
                        #[cfg(feature = "tokio")]
                        {
                            tokio::select! {
                                () = shutdown.cancelled() => {
                                    println!("Closure subsystem received shutdown signal");
                                    break;
                                }
                                () = tokio::time::sleep(Duration::from_millis(10)) => {}
                            }
                        }

                        #[cfg(all(feature = "async-std", not(feature = "tokio")))]
                        {
                            if shutdown.is_shutdown() {
                                break;
                            }
                            async_std::task::sleep(Duration::from_millis(10)).await;
                        }
                    }
                    Ok(())
                }) as Pin<Box<dyn Future<Output = Result<()>> + Send>>
            });

            let id = manager.register_closure(closure_subsystem, "closure_test");

            manager.start_subsystem(id).await.unwrap();

            // Give the spawned task a chance to run.
            tokio::time::sleep(Duration::from_millis(50)).await;

            let metadata = manager.get_subsystem_metadata(id).unwrap();
            assert_eq!(metadata.state, SubsystemState::Running);

            manager.stop_subsystem(id).await.unwrap();

            let metadata = manager.get_subsystem_metadata(id).unwrap();
            assert_eq!(metadata.state, SubsystemState::Stopped);
        })
        .await;

        assert!(test_result.is_ok(), "Test timed out after 5 seconds");
    }
1282
1283 #[cfg(all(feature = "async-std", not(feature = "tokio")))]
1284 #[async_std::test]
1285 async fn test_closure_subsystem() {
1286 let test_result = async_std::future::timeout(Duration::from_secs(5), async {
1288 let coordinator = ShutdownCoordinator::new(500, 1000);
1290 let manager = SubsystemManager::new(coordinator);
1291
1292 let subsystem = TestSubsystem::new("closure_test", false);
1294 let id = manager.register(subsystem);
1295
1296 manager.start_subsystem(id).await.unwrap();
1298
1299 async_std::task::sleep(Duration::from_millis(50)).await;
1301
1302 let metadata = manager.get_subsystem_metadata(id).unwrap();
1304 assert_eq!(metadata.state, SubsystemState::Running);
1305
1306 manager.stop_subsystem(id).await.unwrap();
1308
1309 let metadata = manager.get_subsystem_metadata(id).unwrap();
1311 assert_eq!(metadata.state, SubsystemState::Stopped);
1312 })
1313 .await;
1314
1315 assert!(test_result.is_ok(), "Test timed out after 5 seconds");
1316 }
1317}