use crate::error::{Error, Result};
use crate::pool::{StringPool, VecPool};
use crate::shutdown::{ShutdownCoordinator, ShutdownHandle};

use std::collections::HashMap;
use std::future::Future;
use std::pin::Pin;
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::{Arc, Mutex};
use std::time::{Duration, Instant};
#[allow(unused_imports)]
use tracing::{error, info, instrument, warn};

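/// Unique identifier assigned to a subsystem when it is registered.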
pub type SubsystemId = u64;

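/// Boxed subsystem entry point: a closure that takes a [`ShutdownHandle`] and returns a boxed future.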
pub type SubsystemFn =
    Box<dyn Fn(ShutdownHandle) -> Pin<Box<dyn Future<Output = Result<()>> + Send>> + Send + Sync>;

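/// A long-running component managed by a [`SubsystemManager`].
///
/// `run` is the async entry point; implementations should exit promptly once the
/// provided [`ShutdownHandle`] signals shutdown. `health_check` and `restart_policy`
/// have conservative defaults (no health check, never restart).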
pub trait Subsystem: Send + Sync + 'static {
    fn run(&self, shutdown: ShutdownHandle) -> Pin<Box<dyn Future<Output = Result<()>> + Send>>;

    fn name(&self) -> &str;

    fn health_check(&self) -> Option<Box<dyn Fn() -> bool + Send + Sync>> {
        None
    }

    fn restart_policy(&self) -> RestartPolicy {
        RestartPolicy::Never
    }
}

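/// Policy controlling whether a subsystem is restarted after it stops or fails.
///
/// With `ExponentialBackoff`, the delay starts at `initial_delay`, doubles with each
/// recorded restart (exponent capped at 10), is clamped to `max_delay`, and restarts
/// stop once `max_attempts` is reached.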
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum RestartPolicy {
    Never,
    Always,
    OnFailure,
    ExponentialBackoff {
        initial_delay: Duration,
        max_delay: Duration,
        max_attempts: u32,
    },
}

impl Default for RestartPolicy {
    fn default() -> Self {
        Self::Never
    }
}

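/// Lifecycle state of a registered subsystem, as reported in [`SubsystemMetadata`].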
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum SubsystemState {
    Starting,
    Running,
    Stopping,
    Stopped,
    Failed,
    Restarting,
}

impl std::fmt::Display for SubsystemState {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            Self::Starting => write!(f, "Starting"),
            Self::Running => write!(f, "Running"),
            Self::Stopping => write!(f, "Stopping"),
            Self::Stopped => write!(f, "Stopped"),
            Self::Failed => write!(f, "Failed"),
            Self::Restarting => write!(f, "Restarting"),
        }
    }
}

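/// Snapshot of a subsystem's identity, lifecycle timestamps, restart history, and last error.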
#[derive(Debug, Clone)]
pub struct SubsystemMetadata {
    pub id: SubsystemId,
    pub name: String,
    pub state: SubsystemState,
    pub registered_at: Instant,
    pub started_at: Option<Instant>,
    pub stopped_at: Option<Instant>,
    pub restart_count: u32,
    pub last_error: Option<String>,
    pub restart_policy: RestartPolicy,
}

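/// Aggregate counters plus per-subsystem metadata returned by [`SubsystemManager::get_stats`].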
#[derive(Debug, Clone)]
pub struct SubsystemStats {
    pub total_subsystems: usize,
    pub running_subsystems: usize,
    pub failed_subsystems: usize,
    pub stopping_subsystems: usize,
    pub total_restarts: u64,
    pub subsystems: Vec<SubsystemMetadata>,
}

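/// Internal bookkeeping for one registered subsystem.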
struct SubsystemEntry {
    metadata: Mutex<SubsystemMetadata>,
    subsystem: Arc<dyn Subsystem>,
    #[cfg(feature = "tokio")]
    task_handle: Mutex<Option<tokio::task::JoinHandle<Result<()>>>>,
    shutdown_handle: ShutdownHandle,
}

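/// Registers, starts, stops, and restarts [`Subsystem`]s, tracking their state and restart counts.
///
/// Illustrative sketch (argument values mirror those used in this module's tests;
/// requires the `tokio` feature):
///
/// ```ignore
/// let coordinator = ShutdownCoordinator::new(5000, 10000);
/// let manager = SubsystemManager::new(coordinator);
/// let id = manager.register_fn("worker", |_shutdown| async move {
///     // ... run until shutdown is signalled ...
///     Ok(())
/// });
/// manager.start_subsystem(id).await?;
/// ```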
pub struct SubsystemManager {
    subsystems: Mutex<HashMap<SubsystemId, Arc<SubsystemEntry>>>,
    shutdown_coordinator: ShutdownCoordinator,
    next_id: AtomicU64,
    total_restarts: AtomicU64,
    string_pool: StringPool,
    vec_pool: VecPool<(SubsystemId, String, SubsystemState, Arc<dyn Subsystem>)>,
    metadata_pool: VecPool<SubsystemMetadata>,
}

impl SubsystemManager {
    #[must_use]
    pub fn new(shutdown_coordinator: ShutdownCoordinator) -> Self {
        Self {
            subsystems: Mutex::new(HashMap::new()),
            shutdown_coordinator,
            next_id: AtomicU64::new(1),
            total_restarts: AtomicU64::new(0),
            string_pool: StringPool::new(32, 128, 64),
            vec_pool: VecPool::new(8, 32, 16),
            metadata_pool: VecPool::new(8, 32, 16),
        }
    }

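    /// Registers a subsystem and returns its id. The subsystem starts in the
    /// `Starting` state and does not run until [`start_subsystem`](Self::start_subsystem)
    /// or [`start_all`](Self::start_all) is called.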
    pub fn register<S: Subsystem>(&self, subsystem: S) -> SubsystemId {
        let id = self.next_id.fetch_add(1, Ordering::AcqRel);
        let pooled_name = self.string_pool.get_with_value(subsystem.name());
        let restart_policy = subsystem.restart_policy();

        let shutdown_handle = self.shutdown_coordinator.create_handle(subsystem.name());
        let metadata = SubsystemMetadata {
            id,
            name: pooled_name.to_string(),
            state: SubsystemState::Starting,
            registered_at: Instant::now(),
            started_at: None,
            stopped_at: None,
            last_error: None,
            restart_count: 0,
            restart_policy,
        };

        let entry = Arc::new(SubsystemEntry {
            metadata: Mutex::new(metadata),
            subsystem: Arc::new(subsystem),
            #[cfg(feature = "tokio")]
            task_handle: Mutex::new(None),
            shutdown_handle,
        });

        self.subsystems.lock().unwrap().insert(id, entry);

        info!(subsystem_id = id, subsystem_name = %pooled_name, "Registered subsystem");
        id
    }

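    /// Registers an async closure as a subsystem under the given name.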
    pub fn register_fn<F, Fut>(&self, name: &str, func: F) -> SubsystemId
    where
        F: Fn(ShutdownHandle) -> Fut + Send + Sync + 'static,
        Fut: Future<Output = Result<()>> + Send + 'static,
    {
        struct ClosureSubsystem<F> {
            name: String,
            func: F,
        }

        impl<F, Fut> Subsystem for ClosureSubsystem<F>
        where
            F: Fn(ShutdownHandle) -> Fut + Send + Sync + 'static,
            Fut: Future<Output = Result<()>> + Send + 'static,
        {
            fn run(
                &self,
                shutdown: ShutdownHandle,
            ) -> Pin<Box<dyn Future<Output = Result<()>> + Send>> {
                Box::pin((self.func)(shutdown))
            }

            fn name(&self) -> &str {
                &self.name
            }
        }

        let pooled_name = self.string_pool.get_with_value(name);
        let subsystem = ClosureSubsystem {
            name: pooled_name.to_string(),
            func,
        };
        self.register(subsystem)
    }

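    /// Registers a closure that already returns a pinned, boxed future as a subsystem.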
    pub fn register_closure<F>(&self, closure_subsystem: F, name: &str) -> SubsystemId
    where
        F: Fn(ShutdownHandle) -> Pin<Box<dyn Future<Output = Result<()>> + Send>>
            + Send
            + Sync
            + 'static,
    {
        struct ClosureSubsystemWrapper<F> {
            name: String,
            func: F,
        }

        impl<F> Subsystem for ClosureSubsystemWrapper<F>
        where
            F: Fn(ShutdownHandle) -> Pin<Box<dyn Future<Output = Result<()>> + Send>>
                + Send
                + Sync
                + 'static,
        {
            fn run(
                &self,
                shutdown: ShutdownHandle,
            ) -> Pin<Box<dyn Future<Output = Result<()>> + Send>> {
                (self.func)(shutdown)
            }

            fn name(&self) -> &str {
                &self.name
            }
        }

        let pooled_name = self.string_pool.get_with_value(name).to_string();
        let wrapper = ClosureSubsystemWrapper {
            name: pooled_name,
            func: closure_subsystem,
        };

        self.register(wrapper)
    }

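    /// Spawns the subsystem's `run` future (on the `tokio` runtime when that feature is
    /// enabled) and marks it `Running`; the spawned task records `Stopped` or `Failed`
    /// once the future completes.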
    #[instrument(skip(self), fields(subsystem_id = id))]
    pub async fn start_subsystem(&self, id: SubsystemId) -> Result<()> {
        let entry = {
            let subsystems = self.subsystems.lock().unwrap();
            subsystems
                .get(&id)
                .ok_or_else(|| Error::subsystem("unknown", "Subsystem not found"))?
                .clone()
        };

        self.update_state(id, SubsystemState::Starting);

        let subsystem_name = entry.subsystem.name().to_string();

        #[cfg(feature = "tokio")]
        {
            let subsystem = Arc::clone(&entry.subsystem);
            let shutdown_handle = entry.shutdown_handle.clone();
            let entry_clone = Arc::clone(&entry);
            let id_clone = id;
            let subsystem_name_clone = entry.subsystem.name().to_string();

            let task = tokio::spawn(async move {
                let result: Result<()> = subsystem.run(shutdown_handle).await;

                match &result {
                    Ok(()) => {
                        {
                            let mut metadata = entry_clone.metadata.lock().unwrap();
                            metadata.state = SubsystemState::Stopped;
                            metadata.stopped_at = Some(Instant::now());
                        }
                        info!(subsystem_id = id_clone, subsystem_name = %subsystem_name_clone, "Subsystem stopped successfully");
                    }
                    Err(e) => {
                        {
                            let mut metadata = entry_clone.metadata.lock().unwrap();
                            metadata.state = SubsystemState::Failed;
                            metadata.last_error = Some(e.to_string());
                            metadata.stopped_at = Some(Instant::now());
                        }
                        error!(subsystem_id = id_clone, subsystem_name = %subsystem_name_clone, error = %e, "Subsystem failed");
                    }
                }

                result
            });

            *entry.task_handle.lock().unwrap() = Some(task);
        }

        self.update_state_with_timestamp(id, SubsystemState::Running, Some(Instant::now()), None);
        info!(subsystem_id = id, subsystem_name = %subsystem_name, "Started subsystem");

        Ok(())
    }

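    /// Starts every registered subsystem, logging (rather than propagating) individual failures.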
    pub async fn start_all(&self) -> Result<()> {
        let subsystem_ids: Vec<SubsystemId> =
            { self.subsystems.lock().unwrap().keys().copied().collect() };

        info!("Starting {} subsystems", subsystem_ids.len());

        for id in subsystem_ids {
            if let Err(e) = self.start_subsystem(id).await {
                error!(subsystem_id = id, error = %e, "Failed to start subsystem");
            }
        }

        Ok(())
    }

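    /// Marks the subsystem `Stopping`, signals its shutdown handle, waits up to 500 ms for
    /// its task to finish (under `tokio`), then records it as `Stopped`.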
    #[instrument(skip(self), fields(subsystem_id = id))]
    pub async fn stop_subsystem(&self, id: SubsystemId) -> Result<()> {
        let entry = {
            let subsystems = self.subsystems.lock().unwrap();
            subsystems
                .get(&id)
                .ok_or_else(|| Error::subsystem("unknown", "Subsystem not found"))?
                .clone()
        };

        #[allow(unused_variables)]
        let subsystem_name = self.string_pool.get_with_value(entry.subsystem.name());
        self.update_state(id, SubsystemState::Stopping);

        entry.shutdown_handle.ready();

        #[cfg(feature = "tokio")]
        {
            let task_handle_opt = {
                let mut task_handle_guard = entry.task_handle.lock().unwrap();
                task_handle_guard.take()
            };

            if let Some(task_handle) = task_handle_opt {
                match tokio::time::timeout(Duration::from_millis(500), task_handle).await {
                    Ok(Ok(Ok(()))) => {
                        info!(subsystem_id = id, subsystem_name = %subsystem_name, "Subsystem stopped gracefully");
                    }
                    Ok(Ok(Err(e))) => {
                        warn!(subsystem_id = id, subsystem_name = %subsystem_name, error = %e, "Subsystem stopped with error");
                    }
                    Ok(Err(e)) => {
                        error!(subsystem_id = id, subsystem_name = %subsystem_name, error = %e, "Failed to join subsystem task");
                    }
                    Err(_) => {
                        warn!(subsystem_id = id, subsystem_name = %subsystem_name, "Timed out waiting for subsystem task to complete, marking as stopped anyway");
                    }
                }
            }
        }

        self.update_state_with_timestamp(id, SubsystemState::Stopped, None, Some(Instant::now()));
        Ok(())
    }

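    /// Stops all registered subsystems: concurrently under `tokio`, sequentially under `async-std`.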
    pub async fn stop_all(&self) -> Result<()> {
        let subsystem_ids: Vec<SubsystemId> =
            { self.subsystems.lock().unwrap().keys().copied().collect() };

        info!("Stopping {} subsystems", subsystem_ids.len());

        #[allow(unused_variables)]
        let stop_tasks: Vec<_> = subsystem_ids
            .into_iter()
            .map(|id| self.stop_subsystem(id))
            .collect();

        #[cfg(feature = "tokio")]
        {
            let results = futures::future::join_all(stop_tasks).await;
            for (i, result) in results.into_iter().enumerate() {
                if let Err(e) = result {
                    error!(subsystem_index = i, error = %e, "Failed to stop subsystem");
                }
            }
        }

        #[cfg(all(feature = "async-std", not(feature = "tokio")))]
        {
            for task in stop_tasks {
                if let Err(e) = task.await {
                    error!(error = %e, "Failed to stop subsystem");
                }
            }
        }

        Ok(())
    }

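    /// Increments the restart counters, waits out any backoff delay, and starts the subsystem again.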
    pub async fn restart_subsystem(&self, id: SubsystemId) -> Result<()> {
        let entry = {
            let subsystems = self.subsystems.lock().unwrap();
            subsystems
                .get(&id)
                .ok_or_else(|| Error::subsystem("unknown", "Subsystem not found"))?
                .clone()
        };

        let subsystem_name = self.string_pool.get_with_value(entry.subsystem.name());

        {
            let mut metadata = entry.metadata.lock().unwrap();
            metadata.restart_count += 1;
        }

        self.total_restarts.fetch_add(1, Ordering::AcqRel);
        self.update_state(id, SubsystemState::Restarting);

        info!(subsystem_id = id, subsystem_name = %subsystem_name, "Restarting subsystem");

        let delay = Self::calculate_restart_delay(&entry);
        if !delay.is_zero() {
            info!(
                subsystem_id = id,
                delay_ms = delay.as_millis(),
                "Waiting before restart"
            );

            #[cfg(feature = "tokio")]
            tokio::time::sleep(delay).await;

            #[cfg(all(feature = "async-std", not(feature = "tokio")))]
            async_std::task::sleep(delay).await;
        }

        self.start_subsystem(id).await
    }

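    /// Returns a point-in-time snapshot of subsystem counts and per-subsystem metadata.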
    pub fn get_stats(&self) -> SubsystemStats {
        let mut subsystem_metadata = self.metadata_pool.get();
        let total_count;

        {
            let subsystems = self.subsystems.lock().unwrap();
            total_count = subsystems.len();

            let current_capacity = subsystem_metadata.capacity();
            if current_capacity < total_count {
                subsystem_metadata.reserve(total_count - current_capacity);
            }

            for entry in subsystems.values() {
                subsystem_metadata.push(entry.metadata.lock().unwrap().clone());
            }
        }

        let mut running_count = 0;
        let mut failed_count = 0;
        #[allow(unused_variables)]
        let mut stopped_count = 0;
        let mut stopping_count = 0;

        for metadata in subsystem_metadata.iter() {
            match metadata.state {
                SubsystemState::Running => running_count += 1,
                SubsystemState::Failed => failed_count += 1,
                SubsystemState::Stopped => stopped_count += 1,
                SubsystemState::Stopping => stopping_count += 1,
                _ => {}
            }
        }

        let subsystems_vec = subsystem_metadata
            .iter()
            .cloned()
            .collect::<Vec<SubsystemMetadata>>();

        drop(subsystem_metadata);

        SubsystemStats {
            total_subsystems: total_count,
            running_subsystems: running_count,
            failed_subsystems: failed_count,
            stopping_subsystems: stopping_count,
            total_restarts: self.total_restarts.load(Ordering::Relaxed),
            subsystems: subsystems_vec,
        }
    }

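    /// Returns a copy of the metadata for a single subsystem, if it exists.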
    pub fn get_subsystem_metadata(&self, id: SubsystemId) -> Option<SubsystemMetadata> {
        let subsystems = self.subsystems.lock().unwrap();
        subsystems
            .get(&id)
            .map(|entry| entry.metadata.lock().unwrap().clone())
    }

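    /// Returns copies of the metadata for every registered subsystem.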
    pub fn get_all_metadata(&self) -> Vec<SubsystemMetadata> {
        let mut metadata_list = self.metadata_pool.get();

        {
            let subsystems = self.subsystems.lock().unwrap();

            let needed_capacity = subsystems.len();
            let current_capacity = metadata_list.capacity();
            if current_capacity < needed_capacity {
                metadata_list.reserve(needed_capacity - current_capacity);
            }

            for entry in subsystems.values() {
                metadata_list.push(entry.metadata.lock().unwrap().clone());
            }
        }

        let result = metadata_list.iter().cloned().collect();

        drop(metadata_list);

        result
    }

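    /// Runs each `Running` subsystem's health check (if it provides one) and reports
    /// `(id, name, is_healthy)`; subsystems in other states are reported as healthy.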
    pub fn run_health_checks(&self) -> Vec<(SubsystemId, String, bool)> {
        let mut subsystem_data = self.vec_pool.get();

        {
            let subsystems = self.subsystems.lock().unwrap();

            let needed_capacity = subsystems.len();
            let current_capacity = subsystem_data.capacity();
            if current_capacity < needed_capacity {
                subsystem_data.reserve(needed_capacity - current_capacity);
            }

            for (id, entry) in subsystems.iter() {
                let state = entry.metadata.lock().unwrap().state;
                subsystem_data.push((
                    *id,
                    {
                        let name = entry.subsystem.name();
                        let pooled = self.string_pool.get_with_value(name);
                        pooled.to_string()
                    },
                    state,
                    Arc::clone(&entry.subsystem),
                ));
            }
        }

        let mut result = Vec::with_capacity(subsystem_data.len());

        for data in subsystem_data.iter() {
            let (id, ref name, state, ref subsystem) = *data;
            let is_healthy = match state {
                SubsystemState::Running => {
                    subsystem
                        .health_check()
                        .map_or(true, |health_check| health_check())
                }
                _ => true,
            };
            result.push((id, name.clone(), is_healthy));
        }

        drop(subsystem_data);

        result
    }

    fn update_state(&self, id: SubsystemId, new_state: SubsystemState) {
        self.update_state_with_timestamp(id, new_state, None, None);
    }

    #[allow(dead_code)]
    fn update_state_with_error(&self, id: SubsystemId, new_state: SubsystemState, error: String) {
        let entry_opt = {
            let subsystems = self.subsystems.lock().unwrap();
            subsystems.get(&id).cloned()
        };

        if let Some(entry) = entry_opt {
            let mut metadata = entry.metadata.lock().unwrap();
            metadata.state = new_state;
            metadata.last_error = Some(error);
            if new_state == SubsystemState::Stopped || new_state == SubsystemState::Failed {
                metadata.stopped_at = Some(Instant::now());
            }
        }
    }

    fn update_state_with_timestamp(
        &self,
        id: SubsystemId,
        new_state: SubsystemState,
        started_at: Option<Instant>,
        stopped_at: Option<Instant>,
    ) {
        let entry_opt = {
            let subsystems = self.subsystems.lock().unwrap();
            subsystems.get(&id).cloned()
        };

        if let Some(entry) = entry_opt {
            let mut metadata = entry.metadata.lock().unwrap();
            metadata.state = new_state;
            if let Some(started) = started_at {
                metadata.started_at = Some(started);
            }
            if let Some(stopped) = stopped_at {
                metadata.stopped_at = Some(stopped);
            }
        }
    }

    #[allow(dead_code)]
    fn should_restart(entry: &SubsystemEntry) -> bool {
        let (restart_policy, state, restart_count) = {
            let metadata = entry.metadata.lock().unwrap();
            (
                metadata.restart_policy,
                metadata.state,
                metadata.restart_count,
            )
        };

        match restart_policy {
            RestartPolicy::Never => false,
            RestartPolicy::Always => true,
            RestartPolicy::OnFailure => state == SubsystemState::Failed,
            RestartPolicy::ExponentialBackoff { max_attempts, .. } => restart_count < max_attempts,
        }
    }

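    /// Backoff delay before the next restart: `initial_delay * 2^restart_count` with the
    /// exponent capped at 10, clamped to `max_delay`; zero for non-backoff policies.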
    fn calculate_restart_delay(entry: &SubsystemEntry) -> Duration {
        let (restart_policy, restart_count) = {
            let metadata = entry.metadata.lock().unwrap();
            (metadata.restart_policy, metadata.restart_count)
        };

        match restart_policy {
            RestartPolicy::ExponentialBackoff {
                initial_delay,
                max_delay,
                ..
            } => {
                let delay = initial_delay * 2_u32.pow(restart_count.min(10));
                delay.min(max_delay)
            }
            _ => Duration::ZERO,
        }
    }
}

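// Cloning shares the shutdown coordinator but starts with an empty subsystem registry,
// fresh pools, and a zeroed restart counter.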
impl Clone for SubsystemManager {
    fn clone(&self) -> Self {
        Self {
            subsystems: Mutex::new(HashMap::new()),
            shutdown_coordinator: self.shutdown_coordinator.clone(),
            next_id: AtomicU64::new(self.next_id.load(Ordering::Acquire)),
            total_restarts: AtomicU64::new(0),
            string_pool: StringPool::new(32, 128, 64),
            vec_pool: VecPool::new(8, 32, 16),
            metadata_pool: VecPool::new(8, 32, 16),
        }
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use std::pin::Pin;
    use std::time::Duration;

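    /// Test helper: runs until shutdown is signalled, or returns an error after one
    /// poll interval when `should_fail` is set.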
    struct TestSubsystem {
        name: String,
        should_fail: bool,
    }

    impl TestSubsystem {
        fn new(name: &str, should_fail: bool) -> Self {
            Self {
                name: name.to_string(),
                should_fail,
            }
        }
    }

    impl Subsystem for TestSubsystem {
        fn run(
            &self,
            shutdown: ShutdownHandle,
        ) -> Pin<Box<dyn Future<Output = Result<()>> + Send>> {
            let should_fail = self.should_fail;
            Box::pin(async move {
                let _start_time = Instant::now();
                #[cfg(feature = "tokio")]
                let mut shutdown = shutdown;
                loop {
                    #[cfg(feature = "tokio")]
                    {
                        tokio::select! {
                            () = shutdown.cancelled() => {
                                info!("Subsystem '{}' shutting down", "TestSubsystem");
                                break;
                            }
                            () = tokio::time::sleep(Duration::from_millis(10)) => {}
                        }
                    }

                    #[cfg(all(feature = "async-std", not(feature = "tokio")))]
                    {
                        if shutdown.is_shutdown() {
                            break;
                        }
                        async_std::task::sleep(Duration::from_millis(10)).await;
                    }

                    if should_fail {
                        return Err(Error::runtime("Test failure"));
                    }
                }

                Ok(())
            })
        }

        fn name(&self) -> &str {
            &self.name
        }
    }

    #[cfg(feature = "tokio")]
    #[cfg_attr(miri, ignore)]
    #[tokio::test]
    async fn test_subsystem_registration() {
        let test_result = tokio::time::timeout(Duration::from_secs(5), async {
            let coordinator = ShutdownCoordinator::new(5000, 10000);
            let manager = SubsystemManager::new(coordinator);

            let subsystem = TestSubsystem::new("test", false);
            let id = manager.register(subsystem);

            let stats = manager.get_stats();
            assert_eq!(stats.total_subsystems, 1);
            assert_eq!(stats.running_subsystems, 0);

            let metadata = manager.get_subsystem_metadata(id).unwrap();
            assert_eq!(metadata.name, "test");
            assert_eq!(metadata.state, SubsystemState::Starting);
        })
        .await;

        assert!(test_result.is_ok(), "Test timed out after 5 seconds");
    }

    #[cfg(all(feature = "async-std", not(feature = "tokio")))]
    #[async_std::test]
    async fn test_subsystem_registration() {
        let test_result = async_std::future::timeout(Duration::from_secs(5), async {
            let coordinator = ShutdownCoordinator::new(5000, 10000);
            let manager = SubsystemManager::new(coordinator);

            let subsystem = TestSubsystem::new("test", false);
            let id = manager.register(subsystem);

            let stats = manager.get_stats();
            assert_eq!(stats.total_subsystems, 1);
            assert_eq!(stats.running_subsystems, 0);

            let metadata = manager.get_subsystem_metadata(id).unwrap();
            assert_eq!(metadata.name, "test");
            assert_eq!(metadata.state, SubsystemState::Starting);
        })
        .await;

        assert!(test_result.is_ok(), "Test timed out after 5 seconds");
    }

    #[cfg(feature = "tokio")]
    #[cfg_attr(miri, ignore)]
    #[tokio::test]
    async fn test_subsystem_start_stop() {
        let test_result = tokio::time::timeout(Duration::from_secs(5), async {
            let coordinator = ShutdownCoordinator::new(500, 1000);
            let manager = SubsystemManager::new(coordinator);

            let subsystem = TestSubsystem::new("test", false);
            let id = manager.register(subsystem);

            manager.start_subsystem(id).await.unwrap();

            tokio::time::sleep(Duration::from_millis(50)).await;

            let metadata = manager.get_subsystem_metadata(id).unwrap();
            assert_eq!(metadata.state, SubsystemState::Running);

            let stop_result =
                tokio::time::timeout(Duration::from_millis(1000), manager.stop_subsystem(id)).await;
            assert!(stop_result.is_ok(), "Subsystem stop operation timed out");
            assert!(stop_result.unwrap().is_ok(), "Failed to stop subsystem");

            let metadata = manager.get_subsystem_metadata(id).unwrap();
            assert_eq!(metadata.state, SubsystemState::Stopped);
        })
        .await;

        assert!(test_result.is_ok(), "Test timed out after 5 seconds");
    }

    #[cfg(all(feature = "async-std", not(feature = "tokio")))]
    #[async_std::test]
    async fn test_subsystem_start_stop() {
        let test_result = async_std::future::timeout(Duration::from_secs(5), async {
            let coordinator = ShutdownCoordinator::new(500, 1000);
            let manager = SubsystemManager::new(coordinator);

            let subsystem = TestSubsystem::new("test", false);
            let id = manager.register(subsystem);

            manager.start_subsystem(id).await.unwrap();

            async_std::task::sleep(Duration::from_millis(50)).await;

            let metadata = manager.get_subsystem_metadata(id).unwrap();
            assert_eq!(metadata.state, SubsystemState::Running);

            let stop_result =
                async_std::future::timeout(Duration::from_millis(1000), manager.stop_subsystem(id))
                    .await;
            assert!(stop_result.is_ok(), "Subsystem stop operation timed out");
            assert!(stop_result.unwrap().is_ok(), "Failed to stop subsystem");

            let metadata = manager.get_subsystem_metadata(id).unwrap();
            assert_eq!(metadata.state, SubsystemState::Stopped);
        })
        .await;

        assert!(test_result.is_ok(), "Test timed out after 5 seconds");
    }

    #[cfg(feature = "tokio")]
    #[cfg_attr(miri, ignore)]
    #[tokio::test]
    async fn test_subsystem_failure() {
        let test_result = tokio::time::timeout(Duration::from_secs(5), async {
            let coordinator = ShutdownCoordinator::new(5000, 10000);
            let manager = SubsystemManager::new(coordinator);

            let subsystem = TestSubsystem::new("failing", true);
            let id = manager.register(subsystem);

            manager.start_subsystem(id).await.unwrap();

            tokio::time::sleep(Duration::from_millis(100)).await;

            let metadata = manager.get_subsystem_metadata(id).unwrap();
            assert_eq!(metadata.state, SubsystemState::Failed);
            assert!(metadata.last_error.is_some());
        })
        .await;

        assert!(test_result.is_ok(), "Test timed out after 5 seconds");
    }

    #[cfg(all(feature = "async-std", not(feature = "tokio")))]
    #[async_std::test]
    #[ignore = "Failure state transitions behave differently in async-std due to its task model"]
    async fn test_subsystem_failure() {
        let coordinator = ShutdownCoordinator::new(5000, 10000);
        let _manager = SubsystemManager::new(coordinator);
    }

    #[test]
    fn test_restart_policy() {
        let policy = RestartPolicy::ExponentialBackoff {
            initial_delay: Duration::from_millis(100),
            max_delay: Duration::from_secs(60),
            max_attempts: 5,
        };

        assert_ne!(policy, RestartPolicy::Never);
        assert_eq!(RestartPolicy::default(), RestartPolicy::Never);
    }

    #[cfg(feature = "tokio")]
    #[cfg_attr(miri, ignore)]
    #[tokio::test]
    async fn test_closure_subsystem() {
        let test_result = tokio::time::timeout(Duration::from_secs(5), async {
            let coordinator = ShutdownCoordinator::new(500, 1000);
            let manager = SubsystemManager::new(coordinator);

            let name = "closure_test".to_string();
            let closure_subsystem = Box::new(move |shutdown: ShutdownHandle| {
                let _ = name.clone();
                Box::pin(async move {
                    #[cfg(feature = "tokio")]
                    let mut shutdown = shutdown;
                    loop {
                        #[cfg(feature = "tokio")]
                        {
                            tokio::select! {
                                () = shutdown.cancelled() => {
                                    println!("Closure subsystem received shutdown signal");
                                    break;
                                }
                                () = tokio::time::sleep(Duration::from_millis(10)) => {}
                            }
                        }

                        #[cfg(all(feature = "async-std", not(feature = "tokio")))]
                        {
                            if shutdown.is_shutdown() {
                                break;
                            }
                            async_std::task::sleep(Duration::from_millis(10)).await;
                        }
                    }
                    Ok(())
                }) as Pin<Box<dyn Future<Output = Result<()>> + Send>>
            });

            let id = manager.register_closure(closure_subsystem, "closure_test");

            manager.start_subsystem(id).await.unwrap();

            tokio::time::sleep(Duration::from_millis(50)).await;

            let metadata = manager.get_subsystem_metadata(id).unwrap();
            assert_eq!(metadata.state, SubsystemState::Running);

            manager.stop_subsystem(id).await.unwrap();

            let metadata = manager.get_subsystem_metadata(id).unwrap();
            assert_eq!(metadata.state, SubsystemState::Stopped);
        })
        .await;

        assert!(test_result.is_ok(), "Test timed out after 5 seconds");
    }

    #[cfg(all(feature = "async-std", not(feature = "tokio")))]
    #[async_std::test]
    async fn test_closure_subsystem() {
        let test_result = async_std::future::timeout(Duration::from_secs(5), async {
            let coordinator = ShutdownCoordinator::new(500, 1000);
            let manager = SubsystemManager::new(coordinator);

            let subsystem = TestSubsystem::new("closure_test", false);
            let id = manager.register(subsystem);

            manager.start_subsystem(id).await.unwrap();

            async_std::task::sleep(Duration::from_millis(50)).await;

            let metadata = manager.get_subsystem_metadata(id).unwrap();
            assert_eq!(metadata.state, SubsystemState::Running);

            manager.stop_subsystem(id).await.unwrap();

            let metadata = manager.get_subsystem_metadata(id).unwrap();
            assert_eq!(metadata.state, SubsystemState::Stopped);
        })
        .await;

        assert!(test_result.is_ok(), "Test timed out after 5 seconds");
    }
}