1use crate::coord;
8use crate::error::{Error, Result};
9use crate::pool::{StringPool, VecPool};
10use crate::shutdown::{ShutdownCoordinator, ShutdownHandle};
11
12use dashmap::DashMap;
13use std::future::Future;
14use std::pin::Pin;
15use std::sync::atomic::{AtomicU64, Ordering};
16use std::sync::{Arc, Mutex};
17use std::time::{Duration, Instant};
18#[allow(unused_imports)]
19use tracing::{error, info, instrument, warn};
20
21pub type SubsystemId = u64;
23
24pub type SubsystemFn =
26 Box<dyn Fn(ShutdownHandle) -> Pin<Box<dyn Future<Output = Result<()>> + Send>> + Send + Sync>;
27
28pub trait Subsystem: Send + Sync + 'static {
30 fn run(&self, shutdown: ShutdownHandle) -> Pin<Box<dyn Future<Output = Result<()>> + Send>>;
32
33 fn name(&self) -> &str;
35
36 fn health_check(&self) -> Option<Box<dyn Fn() -> bool + Send + Sync>> {
38 None
39 }
40
41 fn restart_policy(&self) -> RestartPolicy {
43 RestartPolicy::Never
44 }
45}
46
47#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
49pub enum RestartPolicy {
50 #[default]
52 Never,
53 Always,
55 OnFailure,
57 ExponentialBackoff {
59 initial_delay: Duration,
61 max_delay: Duration,
63 max_attempts: u32,
65 },
66}
67
68#[derive(Debug, Clone, Copy, PartialEq, Eq)]
70pub enum SubsystemState {
71 Starting,
73 Running,
75 Stopping,
77 Stopped,
79 Failed,
81 Restarting,
83}
84
85impl std::fmt::Display for SubsystemState {
86 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
87 match self {
88 Self::Starting => write!(f, "Starting"),
89 Self::Running => write!(f, "Running"),
90 Self::Stopping => write!(f, "Stopping"),
91 Self::Stopped => write!(f, "Stopped"),
92 Self::Failed => write!(f, "Failed"),
93 Self::Restarting => write!(f, "Restarting"),
94 }
95 }
96}
97
98#[derive(Debug, Clone)]
100pub enum SubsystemEvent {
101 StateChanged {
103 id: SubsystemId,
105 name: String,
107 state: SubsystemState,
109 at: Instant,
111 },
112}
113
114#[derive(Debug, Clone)]
116pub struct SubsystemMetadata {
117 pub id: SubsystemId,
119 pub name: String,
121 pub state: SubsystemState,
123 pub registered_at: Instant,
125 pub started_at: Option<Instant>,
127 pub stopped_at: Option<Instant>,
129 pub restart_count: u32,
131 pub last_error: Option<String>,
133 pub restart_policy: RestartPolicy,
135}
136
137#[derive(Debug, Clone)]
139pub struct SubsystemStats {
140 pub total_subsystems: usize,
142 pub running_subsystems: usize,
144 pub failed_subsystems: usize,
146 pub stopping_subsystems: usize,
148 pub total_restarts: u64,
150 pub subsystems: Vec<SubsystemMetadata>,
152}
153
154struct SubsystemEntry {
156 metadata: Mutex<SubsystemMetadata>,
158 subsystem: Arc<dyn Subsystem>,
160 #[cfg(feature = "tokio")]
162 task_handle: Mutex<Option<tokio::task::JoinHandle<Result<()>>>>,
163 #[cfg(all(feature = "async-std", not(feature = "tokio")))]
165 task_handle: Mutex<Option<async_std::task::JoinHandle<Result<()>>>>,
166 shutdown_handle: ShutdownHandle,
168}
169
170pub struct SubsystemManager {
172 subsystems: Arc<DashMap<SubsystemId, Arc<SubsystemEntry>>>,
174 shutdown_coordinator: ShutdownCoordinator,
176 next_id: AtomicU64,
178 total_restarts: AtomicU64,
180 string_pool: StringPool,
182 vec_pool: VecPool<(SubsystemId, String, SubsystemState, Arc<dyn Subsystem>)>,
184 metadata_pool: VecPool<SubsystemMetadata>,
186 events_tx: Mutex<Option<coord::chan::Sender<SubsystemEvent>>>,
188 events_rx: Mutex<Option<coord::chan::Receiver<SubsystemEvent>>>,
190 name_cache: Arc<DashMap<SubsystemId, Arc<str>>>,
192}
193
194impl SubsystemManager {
195 #[must_use]
197 pub fn new(shutdown_coordinator: ShutdownCoordinator) -> Self {
198 Self {
199 subsystems: Arc::new(DashMap::new()),
200 shutdown_coordinator,
201 next_id: AtomicU64::new(1),
202 total_restarts: AtomicU64::new(0),
203 string_pool: StringPool::new(32, 128, 64), vec_pool: VecPool::new(8, 32, 16), metadata_pool: VecPool::new(8, 32, 16), events_tx: Mutex::new(None),
208 events_rx: Mutex::new(None),
209 name_cache: Arc::new(DashMap::new()),
210 }
211 }
212
213 pub fn enable_events(&self) {
219 let mut tx_guard = self.events_tx.lock().unwrap();
220 let mut rx_guard = self.events_rx.lock().unwrap();
221 if tx_guard.is_some() || rx_guard.is_some() {
222 return;
223 }
224 let (tx, rx) = coord::chan::unbounded();
225 *tx_guard = Some(tx);
226 *rx_guard = Some(rx);
227 drop(tx_guard);
229 drop(rx_guard);
231 }
232
233 pub fn try_next_event(&self) -> Option<SubsystemEvent> {
239 let rx_guard = self.events_rx.lock().unwrap();
240 rx_guard
241 .as_ref()
242 .and_then(|rx| coord::chan::try_recv(rx).ok())
243 }
244
245 pub fn register<S: Subsystem>(&self, subsystem: S) -> SubsystemId {
249 let id = self.next_id.fetch_add(1, Ordering::AcqRel);
250
251 let name_arc: Arc<str> = Arc::from(subsystem.name());
253 self.name_cache.insert(id, Arc::clone(&name_arc));
254
255 let restart_policy = subsystem.restart_policy();
256 let shutdown_handle = self.shutdown_coordinator.create_handle(subsystem.name());
257
258 let metadata = SubsystemMetadata {
259 id,
260 name: name_arc.to_string(), state: SubsystemState::Starting,
262 registered_at: Instant::now(),
263 started_at: None,
264 stopped_at: None,
265 last_error: None,
266 restart_count: 0,
267 restart_policy,
268 };
269
270 let entry = Arc::new(SubsystemEntry {
271 metadata: Mutex::new(metadata),
272 subsystem: Arc::new(subsystem),
273 #[cfg(feature = "tokio")]
274 task_handle: Mutex::new(None),
275 #[cfg(all(feature = "async-std", not(feature = "tokio")))]
276 task_handle: Mutex::new(None),
277 shutdown_handle,
278 });
279
280 self.subsystems.insert(id, entry);
281
282 info!(subsystem_id = id, subsystem_name = %name_arc, "Registered subsystem");
283 id
284 }
285
286 pub fn register_fn<F, Fut>(&self, name: &str, func: F) -> SubsystemId
292 where
293 F: Fn(ShutdownHandle) -> Fut + Send + Sync + 'static,
294 Fut: Future<Output = Result<()>> + Send + 'static,
295 {
296 struct ClosureSubsystem<F> {
297 name: String, func: F,
299 }
300
301 impl<F, Fut> Subsystem for ClosureSubsystem<F>
302 where
303 F: Fn(ShutdownHandle) -> Fut + Send + Sync + 'static,
304 Fut: Future<Output = Result<()>> + Send + 'static,
305 {
306 fn run(
307 &self,
308 shutdown: ShutdownHandle,
309 ) -> Pin<Box<dyn Future<Output = Result<()>> + Send>> {
310 Box::pin((self.func)(shutdown))
311 }
312
313 fn name(&self) -> &str {
314 &self.name
315 }
316 }
317
318 let pooled_name = self.string_pool.get_with_value(name);
320 let subsystem = ClosureSubsystem {
321 name: pooled_name.to_string(),
322 func,
323 };
324 self.register(subsystem)
325 }
326
327 pub fn register_closure<F>(&self, closure_subsystem: F, name: &str) -> SubsystemId
333 where
334 F: Fn(ShutdownHandle) -> Pin<Box<dyn Future<Output = Result<()>> + Send>>
335 + Send
336 + Sync
337 + 'static,
338 {
339 struct ClosureSubsystemWrapper<F> {
341 name: String,
342 func: F,
343 }
344
345 impl<F> Subsystem for ClosureSubsystemWrapper<F>
346 where
347 F: Fn(ShutdownHandle) -> Pin<Box<dyn Future<Output = Result<()>> + Send>>
348 + Send
349 + Sync
350 + 'static,
351 {
352 fn run(
353 &self,
354 shutdown: ShutdownHandle,
355 ) -> Pin<Box<dyn Future<Output = Result<()>> + Send>> {
356 (self.func)(shutdown)
357 }
358
359 fn name(&self) -> &str {
360 &self.name
361 }
362 }
363
364 let pooled_name = self.string_pool.get_with_value(name).to_string();
366 let wrapper = ClosureSubsystemWrapper {
367 name: pooled_name,
368 func: closure_subsystem,
369 };
370
371 self.register(wrapper)
373 }
374
375 #[instrument(skip(self), fields(subsystem_id = id))]
385 pub async fn start_subsystem(&self, id: SubsystemId) -> Result<()> {
386 let entry = self
387 .subsystems
388 .get(&id)
389 .ok_or_else(|| Error::subsystem("unknown", "Subsystem not found"))?
390 .clone();
391
392 let subsystem_name = self
394 .name_cache
395 .get(&id)
396 .map_or_else(|| Arc::from("unknown"), |n| n.clone());
397
398 self.update_state(id, SubsystemState::Starting);
399
400 #[cfg(feature = "tokio")]
401 {
402 let subsystem = Arc::clone(&entry.subsystem);
404 let shutdown_handle = entry.shutdown_handle.clone();
405 let entry_clone = Arc::clone(&entry);
406 let id_clone = id;
407 let subsystem_name_clone = Arc::clone(&subsystem_name);
409
410 let task = tokio::spawn(async move {
412 let result: Result<()> = subsystem.run(shutdown_handle).await;
413
414 match &result {
416 Ok(()) => {
417 let mut metadata = entry_clone.metadata.lock().unwrap();
418 metadata.state = SubsystemState::Stopped;
419 metadata.stopped_at = Some(Instant::now());
420 drop(metadata);
421 info!(subsystem_id = id_clone, subsystem_name = %subsystem_name_clone, "Subsystem stopped successfully");
422 }
423 Err(e) => {
424 let mut metadata = entry_clone.metadata.lock().unwrap();
425 metadata.state = SubsystemState::Failed;
426 metadata.last_error = Some(e.to_string());
427 metadata.stopped_at = Some(Instant::now());
428 drop(metadata);
429 error!(subsystem_id = id_clone, subsystem_name = %subsystem_name_clone, error = %e, "Subsystem failed");
430 }
431 }
432
433 result
434 });
435
436 *entry.task_handle.lock().unwrap() = Some(task);
437 }
438
439 #[cfg(all(feature = "async-std", not(feature = "tokio")))]
440 {
441 let subsystem = Arc::clone(&entry.subsystem);
442 let shutdown_handle = entry.shutdown_handle.clone();
443 let entry_clone = Arc::clone(&entry);
444 let id_clone = id;
445 let subsystem_name_clone = Arc::clone(&subsystem_name);
447
448 let task = async_std::task::spawn(async move {
449 let result: Result<()> = subsystem.run(shutdown_handle).await;
450
451 match &result {
452 Ok(()) => {
453 let mut metadata = entry_clone.metadata.lock().unwrap();
454 metadata.state = SubsystemState::Stopped;
455 metadata.stopped_at = Some(Instant::now());
456 drop(metadata);
457 info!(subsystem_id = id_clone, subsystem_name = %subsystem_name_clone, "Subsystem stopped successfully");
458 }
459 Err(e) => {
460 let mut metadata = entry_clone.metadata.lock().unwrap();
461 metadata.state = SubsystemState::Failed;
462 metadata.last_error = Some(e.to_string());
463 metadata.stopped_at = Some(Instant::now());
464 drop(metadata);
465 error!(subsystem_id = id_clone, subsystem_name = %subsystem_name_clone, error = %e, "Subsystem failed");
466 }
467 }
468
469 result
470 });
471
472 *entry.task_handle.lock().unwrap() = Some(task);
473 }
474
475 self.update_state_with_timestamp(id, SubsystemState::Running, Some(Instant::now()), None);
476 info!(subsystem_id = id, subsystem_name = %subsystem_name, "Started subsystem");
477
478 Ok(())
479 }
480
481 pub async fn start_all(&self) -> Result<()> {
492 let subsystem_ids: Vec<SubsystemId> = self.subsystems.iter().map(|r| *r.key()).collect();
493
494 info!("Starting {} subsystems", subsystem_ids.len());
495
496 let mut first_error: Option<Error> = None;
497
498 for id in subsystem_ids {
499 if let Err(e) = self.start_subsystem(id).await {
500 error!(subsystem_id = id, error = %e, "Failed to start subsystem");
501 if first_error.is_none() {
502 first_error = Some(e);
503 }
504 }
505 }
506
507 first_error.map_or_else(|| Ok(()), Err)
508 }
509
510 #[instrument(skip(self), fields(subsystem_id = id))]
516 pub async fn stop_subsystem(&self, id: SubsystemId) -> Result<()> {
517 let entry = self
518 .subsystems
519 .get(&id)
520 .ok_or_else(|| Error::subsystem("unknown", "Subsystem not found"))?
521 .clone();
522
523 let subsystem_name = self
525 .name_cache
526 .get(&id)
527 .map_or_else(|| "unknown".to_string(), |n| n.to_string());
528 self.update_state(id, SubsystemState::Stopping);
529
530 #[cfg(feature = "tokio")]
533 {
534 if self.stop_task_tokio(&entry, id, &subsystem_name).await {
535 entry.shutdown_handle.ready();
536 self.update_state_with_timestamp(
537 id,
538 SubsystemState::Stopped,
539 None,
540 Some(Instant::now()),
541 );
542 }
543 }
544
545 #[cfg(all(feature = "async-std", not(feature = "tokio")))]
546 {
547 if self.stop_task_async_std(&entry, id, &subsystem_name).await {
548 entry.shutdown_handle.ready();
549 self.update_state_with_timestamp(
550 id,
551 SubsystemState::Stopped,
552 None,
553 Some(Instant::now()),
554 );
555 }
556 }
557
558 #[cfg(not(any(feature = "tokio", feature = "async-std")))]
559 {
560 entry.shutdown_handle.ready();
561 self.update_state_with_timestamp(
562 id,
563 SubsystemState::Stopped,
564 None,
565 Some(Instant::now()),
566 );
567 }
568
569 Ok(())
570 }
571
572 #[cfg(feature = "tokio")]
573 async fn stop_task_tokio(
574 &self,
575 entry: &Arc<SubsystemEntry>,
576 id: SubsystemId,
577 subsystem_name: &str,
578 ) -> bool {
579 let task_handle_opt = {
580 let mut task_handle_guard = entry.task_handle.lock().unwrap();
581 task_handle_guard.take()
582 };
583
584 let mut completed = false;
585 if let Some(mut task_handle) = task_handle_opt {
586 let timeout = tokio::time::sleep(Duration::from_millis(500));
587 tokio::pin!(timeout);
588 tokio::select! {
589 result = &mut task_handle => {
590 match result {
591 Ok(Ok(())) => {
592 info!(subsystem_id = id, subsystem_name = %subsystem_name, "Subsystem stopped gracefully");
593 completed = true;
594 }
595 Ok(Err(e)) => {
596 warn!(subsystem_id = id, subsystem_name = %subsystem_name, error = %e, "Subsystem stopped with error");
597 completed = true;
598 }
599 Err(e) => {
600 error!(subsystem_id = id, subsystem_name = %subsystem_name, error = %e, "Failed to join subsystem task");
601 completed = true;
602 }
603 }
604 }
605 () = &mut timeout => {
606 warn!(subsystem_id = id, subsystem_name = %subsystem_name, "Timed out waiting for subsystem task to complete, aborting task");
607 task_handle.abort();
608 let _ = task_handle.await;
609 completed = true;
610 }
611 }
612 }
613
614 completed
615 }
616
617 #[cfg(all(feature = "async-std", not(feature = "tokio")))]
618 async fn stop_task_async_std(
619 &self,
620 entry: &Arc<SubsystemEntry>,
621 id: SubsystemId,
622 subsystem_name: &str,
623 ) -> bool {
624 let task_handle_opt = {
625 let mut task_handle_guard = entry.task_handle.lock().unwrap();
626 task_handle_guard.take()
627 };
628
629 let mut completed = false;
630 if let Some(task_handle) = task_handle_opt {
631 match async_std::future::timeout(Duration::from_millis(500), task_handle).await {
632 Ok(Ok(())) => {
633 info!(subsystem_id = id, subsystem_name = %subsystem_name, "Subsystem stopped gracefully");
634 completed = true;
635 }
636 Ok(Err(e)) => {
637 warn!(subsystem_id = id, subsystem_name = %subsystem_name, error = %e, "Subsystem stopped with error");
638 completed = true;
639 }
640 Err(_) => {
641 warn!(subsystem_id = id, subsystem_name = %subsystem_name, "Timed out waiting for subsystem task to complete, cancelling task");
642 completed = true;
643 }
644 }
645 }
646
647 completed
648 }
649
650 pub async fn stop_all(&self) -> Result<()> {
661 let subsystem_ids: Vec<SubsystemId> = self.subsystems.iter().map(|r| *r.key()).collect();
663
664 info!("Stopping {} subsystems", subsystem_ids.len());
665
666 #[allow(unused_variables)]
668 let stop_tasks: Vec<_> = subsystem_ids
669 .into_iter()
670 .map(|id| self.stop_subsystem(id))
671 .collect();
672
673 #[cfg(feature = "tokio")]
674 {
675 let results = futures::future::join_all(stop_tasks).await;
676 for (i, result) in results.into_iter().enumerate() {
677 if let Err(e) = result {
678 error!(subsystem_index = i, error = %e, "Failed to stop subsystem");
679 }
680 }
681 }
682
683 #[cfg(all(feature = "async-std", not(feature = "tokio")))]
684 {
685 for task in stop_tasks {
686 if let Err(e) = task.await {
687 error!(error = %e, "Failed to stop subsystem");
688 }
689 }
690 }
691
692 Ok(())
693 }
694
695 pub async fn restart_subsystem(&self, id: SubsystemId) -> Result<()> {
706 let entry = self
707 .subsystems
708 .get(&id)
709 .ok_or_else(|| Error::subsystem("unknown", "Subsystem not found"))?
710 .clone();
711
712 let subsystem_name = self
714 .name_cache
715 .get(&id)
716 .map_or_else(|| Arc::from("unknown"), |n| n.clone());
717
718 {
720 let mut metadata = entry.metadata.lock().unwrap();
721 metadata.restart_count += 1;
722 }
723
724 self.total_restarts.fetch_add(1, Ordering::AcqRel);
725 self.update_state(id, SubsystemState::Restarting);
726
727 info!(subsystem_id = id, subsystem_name = %subsystem_name, "Restarting subsystem");
728
729 let delay = Self::calculate_restart_delay(&entry);
731 if !delay.is_zero() {
732 info!(
733 subsystem_id = id,
734 delay_ms = delay.as_millis(),
735 "Waiting before restart"
736 );
737
738 #[cfg(feature = "tokio")]
739 tokio::time::sleep(delay).await;
740
741 #[cfg(all(feature = "async-std", not(feature = "tokio")))]
742 async_std::task::sleep(delay).await;
743 }
744
745 self.start_subsystem(id).await
747 }
748
749 pub fn get_stats(&self) -> SubsystemStats {
755 let mut subsystem_metadata = self.metadata_pool.get();
757
758 let total_count = self.subsystems.len();
760 if subsystem_metadata.capacity() < total_count {
761 let additional = total_count - subsystem_metadata.capacity();
762 subsystem_metadata.reserve(additional);
763 }
764
765 for entry in self.subsystems.iter() {
767 subsystem_metadata.push(entry.metadata.lock().unwrap().clone());
768 }
769
770 let mut running_count = 0;
772 let mut failed_count = 0;
773 let mut stopping_count = 0;
774
775 for metadata in subsystem_metadata.iter() {
777 match metadata.state {
778 SubsystemState::Running => running_count += 1,
779 SubsystemState::Failed => failed_count += 1,
780 SubsystemState::Stopping => stopping_count += 1,
781 _ => {} }
783 }
784
785 let subsystems_vec = subsystem_metadata
787 .iter()
788 .cloned()
789 .collect::<Vec<SubsystemMetadata>>();
790
791 drop(subsystem_metadata);
793
794 SubsystemStats {
795 total_subsystems: total_count,
796 running_subsystems: running_count,
797 failed_subsystems: failed_count,
798 stopping_subsystems: stopping_count,
799 total_restarts: self.total_restarts.load(Ordering::Relaxed),
800 subsystems: subsystems_vec,
801 }
802 }
803
804 pub fn get_subsystem_metadata(&self, id: SubsystemId) -> Option<SubsystemMetadata> {
812 self.subsystems
813 .get(&id)
814 .map(|entry| entry.metadata.lock().unwrap().clone())
815 }
816
817 pub fn get_all_metadata(&self) -> Vec<SubsystemMetadata> {
823 let mut metadata_list = self.metadata_pool.get();
825
826 let needed_capacity = self.subsystems.len();
828 if metadata_list.capacity() < needed_capacity {
829 let additional = needed_capacity - metadata_list.capacity();
830 metadata_list.reserve(additional);
831 }
832
833 for entry in self.subsystems.iter() {
835 metadata_list.push(entry.metadata.lock().unwrap().clone());
836 }
837
838 let result = metadata_list.iter().cloned().collect();
840
841 drop(metadata_list);
843
844 result
845 }
846
847 pub fn run_health_checks(&self) -> Vec<(SubsystemId, String, bool)> {
853 let mut subsystem_data = self.vec_pool.get();
855
856 let needed_capacity = self.subsystems.len();
858 if subsystem_data.capacity() < needed_capacity {
859 let additional = needed_capacity - subsystem_data.capacity();
860 subsystem_data.reserve(additional);
861 }
862
863 for entry_ref in self.subsystems.iter() {
865 let id = *entry_ref.key();
866 let entry = entry_ref.value();
867 let state = entry.metadata.lock().unwrap().state;
868
869 let name = self
871 .name_cache
872 .get(&id)
873 .map_or_else(|| "unknown".to_string(), |n| n.to_string());
874
875 subsystem_data.push((id, name, state, Arc::clone(&entry.subsystem)));
876 }
877
878 let mut result = Vec::with_capacity(subsystem_data.len());
880
881 for data in subsystem_data.iter() {
884 let (id, ref name, state, ref subsystem) = *data;
885 let is_healthy = match state {
886 SubsystemState::Running => {
887 subsystem
889 .health_check()
890 .is_none_or(|health_check| health_check())
891 }
892 _ => true, };
894 result.push((id, name.clone(), is_healthy));
895 }
896
897 drop(subsystem_data);
899
900 result
901 }
902
903 fn update_state(&self, id: SubsystemId, new_state: SubsystemState) {
909 self.update_state_with_timestamp(id, new_state, None, None);
910 }
911
912 #[allow(dead_code)]
918 fn update_state_with_error(&self, id: SubsystemId, new_state: SubsystemState, error: String) {
919 let entry_opt = self.subsystems.get(&id).map(|r| r.clone());
921
922 if let Some(entry) = entry_opt {
924 let mut metadata = entry.metadata.lock().unwrap();
925 metadata.state = new_state;
926 metadata.last_error = Some(error);
927 if new_state == SubsystemState::Stopped || new_state == SubsystemState::Failed {
928 metadata.stopped_at = Some(Instant::now());
929 }
930 }
931 }
932
933 fn update_state_with_timestamp(
935 &self,
936 id: SubsystemId,
937 new_state: SubsystemState,
938 started_at: Option<Instant>,
939 stopped_at: Option<Instant>,
940 ) {
941 if let Some(entry) = self.subsystems.get(&id) {
943 let mut metadata = entry.metadata.lock().unwrap();
944 metadata.state = new_state;
945 if let Some(started) = started_at {
946 metadata.started_at = Some(started);
947 }
948 if let Some(stopped) = stopped_at {
949 metadata.stopped_at = Some(stopped);
950 }
951 let event_data = (id, metadata.name.clone(), metadata.state, Instant::now());
952 drop(metadata);
953
954 self.publish_event(SubsystemEvent::StateChanged {
956 id: event_data.0,
957 name: event_data.1,
958 state: event_data.2,
959 at: event_data.3,
960 });
961 }
962 }
963
964 fn publish_event(&self, event: SubsystemEvent) {
966 let tx_opt = self.events_tx.lock().unwrap().as_ref().cloned();
967 if let Some(tx) = tx_opt {
968 let _ = tx.send(event);
970 }
971 }
972
973 #[allow(dead_code)]
979 fn should_restart(entry: &SubsystemEntry) -> bool {
980 let (restart_policy, state, restart_count) = {
982 let metadata = entry.metadata.lock().unwrap();
983 (
984 metadata.restart_policy,
985 metadata.state,
986 metadata.restart_count,
987 )
988 };
989
990 match restart_policy {
991 RestartPolicy::Never => false,
992 RestartPolicy::Always => true,
993 RestartPolicy::OnFailure => state == SubsystemState::Failed,
994 RestartPolicy::ExponentialBackoff { max_attempts, .. } => restart_count < max_attempts,
995 }
996 }
997
998 fn calculate_restart_delay(entry: &SubsystemEntry) -> Duration {
1004 let (restart_policy, restart_count) = {
1006 let metadata = entry.metadata.lock().unwrap();
1007 (metadata.restart_policy, metadata.restart_count)
1008 };
1009
1010 match restart_policy {
1011 RestartPolicy::ExponentialBackoff {
1012 initial_delay,
1013 max_delay,
1014 ..
1015 } => {
1016 let delay = initial_delay * 2_u32.pow(restart_count.min(10)); delay.min(max_delay)
1018 }
1019 _ => Duration::ZERO,
1020 }
1021 }
1022}
1023
1024impl Clone for SubsystemManager {
1025 fn clone(&self) -> Self {
1026 Self {
1027 subsystems: Arc::new(DashMap::new()), shutdown_coordinator: self.shutdown_coordinator.clone(),
1029 next_id: AtomicU64::new(self.next_id.load(Ordering::Acquire)),
1030 total_restarts: AtomicU64::new(0),
1031 string_pool: StringPool::new(32, 128, 64),
1033 vec_pool: VecPool::new(8, 32, 16),
1034 metadata_pool: VecPool::new(8, 32, 16),
1035 events_tx: Mutex::new(None),
1036 events_rx: Mutex::new(None),
1037 name_cache: Arc::new(DashMap::new()),
1038 }
1039 }
1040}
1041
1042impl SubsystemManager {
1043 #[cfg(feature = "lockfree-coordination")]
1049 pub fn subscribe_events(&self) -> Option<coord::chan::Receiver<SubsystemEvent>> {
1054 self.events_rx.lock().unwrap().as_ref().cloned()
1055 }
1056}
1057
1058#[cfg(test)]
1059mod tests {
1060 use super::*;
1061 use std::pin::Pin;
1062 use std::time::Duration;
1063
1064 struct TestSubsystem {
1065 name: String,
1066 should_fail: bool,
1067 }
1068
1069 impl TestSubsystem {
1070 fn new(name: &str, should_fail: bool) -> Self {
1071 Self {
1072 name: name.to_string(),
1073 should_fail,
1074 }
1075 }
1076 }
1077
1078 impl Subsystem for TestSubsystem {
1079 fn run(
1080 &self,
1081 shutdown: ShutdownHandle,
1082 ) -> Pin<Box<dyn Future<Output = Result<()>> + Send>> {
1083 let should_fail = self.should_fail;
1084 Box::pin(async move {
1085 let _start_time = Instant::now();
1086 #[cfg(feature = "tokio")]
1087 let mut shutdown = shutdown;
1088 loop {
1089 #[cfg(feature = "tokio")]
1090 {
1091 tokio::select! {
1092 () = shutdown.cancelled() => {
1093 info!("Subsystem '{}' shutting down", "TestSubsystem");
1094 break;
1095 }
1096 () = tokio::time::sleep(Duration::from_millis(10)) => {}
1097 }
1098 }
1099
1100 #[cfg(all(feature = "async-std", not(feature = "tokio")))]
1101 {
1102 if shutdown.is_shutdown() {
1103 break;
1104 }
1105 async_std::task::sleep(Duration::from_millis(10)).await;
1106 }
1107
1108 if should_fail {
1109 return Err(Error::runtime("Test failure"));
1110 }
1111 }
1112
1113 Ok(())
1114 })
1115 }
1116
1117 fn name(&self) -> &str {
1118 &self.name
1119 }
1120 }
1121
1122 #[cfg(feature = "tokio")]
1123 #[cfg_attr(miri, ignore)]
1124 #[tokio::test]
1125 async fn test_subsystem_registration() {
1126 let test_result = tokio::time::timeout(Duration::from_secs(5), async {
1128 let coordinator = ShutdownCoordinator::new(5000, 10000, 15000);
1129 let manager = SubsystemManager::new(coordinator);
1130
1131 let subsystem = TestSubsystem::new("test", false);
1132 let id = manager.register(subsystem);
1133
1134 let stats = manager.get_stats();
1135 assert_eq!(stats.total_subsystems, 1);
1136 assert_eq!(stats.running_subsystems, 0);
1137
1138 let metadata = manager.get_subsystem_metadata(id).unwrap();
1139 assert_eq!(metadata.name, "test");
1140 assert_eq!(metadata.state, SubsystemState::Starting);
1141 })
1142 .await;
1143
1144 assert!(test_result.is_ok(), "Test timed out after 5 seconds");
1145 }
1146
1147 #[cfg(all(feature = "async-std", not(feature = "tokio")))]
1148 #[async_std::test]
1149 async fn test_subsystem_registration() {
1150 let test_result = async_std::future::timeout(Duration::from_secs(5), async {
1152 let coordinator = ShutdownCoordinator::new(5000, 10000, 15000);
1153 let manager = SubsystemManager::new(coordinator);
1154
1155 let subsystem = TestSubsystem::new("test", false);
1156 let id = manager.register(subsystem);
1157
1158 let stats = manager.get_stats();
1159 assert_eq!(stats.total_subsystems, 1);
1160 assert_eq!(stats.running_subsystems, 0);
1161
1162 let metadata = manager.get_subsystem_metadata(id).unwrap();
1163 assert_eq!(metadata.name, "test");
1164 assert_eq!(metadata.state, SubsystemState::Starting);
1165 })
1166 .await;
1167
1168 assert!(test_result.is_ok(), "Test timed out after 5 seconds");
1169 }
1170
1171 #[cfg(feature = "tokio")]
1172 #[cfg_attr(miri, ignore)]
1173 #[tokio::test]
1174 async fn test_subsystem_start_stop() {
1175 let test_result = tokio::time::timeout(Duration::from_secs(5), async {
1177 let coordinator = ShutdownCoordinator::new(500, 1000, 1500);
1179 let manager = SubsystemManager::new(coordinator);
1180
1181 let subsystem = TestSubsystem::new("test", false);
1183 let id = manager.register(subsystem);
1184
1185 manager.start_subsystem(id).await.unwrap();
1187
1188 tokio::time::sleep(Duration::from_millis(50)).await;
1190
1191 let metadata = manager.get_subsystem_metadata(id).unwrap();
1193 assert_eq!(metadata.state, SubsystemState::Running);
1194
1195 let stop_result =
1197 tokio::time::timeout(Duration::from_millis(1000), manager.stop_subsystem(id)).await;
1198
1199 assert!(stop_result.is_ok());
1200
1201 let metadata = manager.get_subsystem_metadata(id).unwrap();
1203 assert_eq!(metadata.state, SubsystemState::Stopped);
1204 })
1205 .await;
1206
1207 assert!(test_result.is_ok(), "Test timed out after 5 seconds");
1208 }
1209
1210 #[cfg(all(feature = "async-std", not(feature = "tokio")))]
1211 #[async_std::test]
1212 async fn test_subsystem_start_stop() {
1213 let test_result = async_std::future::timeout(Duration::from_secs(5), async {
1215 let coordinator = ShutdownCoordinator::new(500, 1000, 1500);
1217 let manager = SubsystemManager::new(coordinator);
1218
1219 let subsystem = TestSubsystem::new("test", false);
1221 let id = manager.register(subsystem);
1222
1223 manager.start_subsystem(id).await.unwrap();
1225
1226 async_std::task::sleep(Duration::from_millis(50)).await;
1228
1229 let metadata = manager.get_subsystem_metadata(id).unwrap();
1231 assert_eq!(metadata.state, SubsystemState::Running);
1232
1233 let stop_result =
1235 async_std::future::timeout(Duration::from_millis(1000), manager.stop_subsystem(id))
1236 .await;
1237 assert!(stop_result.is_ok(), "Subsystem stop operation timed out");
1238 assert!(stop_result.unwrap().is_ok(), "Failed to stop subsystem");
1239
1240 let metadata = manager.get_subsystem_metadata(id).unwrap();
1242 assert_eq!(metadata.state, SubsystemState::Stopped);
1243 })
1244 .await;
1245
1246 assert!(test_result.is_ok(), "Test timed out after 5 seconds");
1247 }
1248
1249 #[cfg(feature = "tokio")]
1250 #[cfg_attr(miri, ignore)]
1251 #[tokio::test]
1252 async fn test_subsystem_failure() {
1253 let test_result = tokio::time::timeout(Duration::from_secs(5), async {
1255 let coordinator = ShutdownCoordinator::new(5000, 10000, 15000);
1256 let manager = SubsystemManager::new(coordinator);
1257
1258 let subsystem = TestSubsystem::new("failing", true);
1259 let id = manager.register(subsystem);
1260
1261 manager.start_subsystem(id).await.unwrap();
1262
1263 tokio::time::sleep(Duration::from_millis(100)).await;
1265
1266 let metadata = manager.get_subsystem_metadata(id).unwrap();
1267 assert_eq!(metadata.state, SubsystemState::Failed);
1268 assert!(metadata.last_error.is_some());
1269 })
1270 .await;
1271
1272 assert!(test_result.is_ok(), "Test timed out after 5 seconds");
1273 }
1274
1275 #[cfg(all(feature = "async-std", not(feature = "tokio")))]
1276 #[async_std::test]
1277 #[ignore = "Failure state transitions behave differently in async-std due to its task model"]
1278 async fn test_subsystem_failure() {
1279 let coordinator = ShutdownCoordinator::new(5000, 10000, 15000);
1288 let _manager = SubsystemManager::new(coordinator);
1289
1290 }
1292
1293 #[test]
1294 fn test_restart_policy() {
1295 let policy = RestartPolicy::ExponentialBackoff {
1296 initial_delay: Duration::from_millis(100),
1297 max_delay: Duration::from_secs(60),
1298 max_attempts: 5,
1299 };
1300
1301 assert_ne!(policy, RestartPolicy::Never);
1302 assert_eq!(RestartPolicy::default(), RestartPolicy::Never);
1303 }
1304
1305 #[cfg(feature = "tokio")]
1306 #[cfg_attr(miri, ignore)]
1307 #[tokio::test]
1308 async fn test_closure_subsystem() {
1309 let test_result = tokio::time::timeout(Duration::from_secs(5), async {
1311 let coordinator = ShutdownCoordinator::new(500, 1000, 1500);
1313 let manager = SubsystemManager::new(coordinator);
1314
1315 let name = "closure_test".to_string();
1317 let closure_subsystem = Box::new(move |shutdown: ShutdownHandle| {
1318 let _ = name.clone();
1320 Box::pin(async move {
1321 #[cfg(feature = "tokio")]
1322 let mut shutdown = shutdown;
1323 loop {
1324 #[cfg(feature = "tokio")]
1325 {
1326 tokio::select! {
1327 () = shutdown.cancelled() => {
1328 println!("Closure subsystem received shutdown signal");
1329 break;
1330 }
1331 () = tokio::time::sleep(Duration::from_millis(10)) => {}
1332 }
1333 }
1334
1335 #[cfg(all(feature = "async-std", not(feature = "tokio")))]
1336 {
1337 if shutdown.is_shutdown() {
1338 break;
1339 }
1340 async_std::task::sleep(Duration::from_millis(10)).await;
1341 }
1342 }
1343 Ok(())
1344 }) as Pin<Box<dyn Future<Output = Result<()>> + Send>>
1345 });
1346
1347 let id = manager.register_closure(closure_subsystem, "closure_test");
1349
1350 manager.start_subsystem(id).await.unwrap();
1352
1353 tokio::time::sleep(Duration::from_millis(50)).await;
1355
1356 let metadata = manager.get_subsystem_metadata(id).unwrap();
1358 assert_eq!(metadata.state, SubsystemState::Running);
1359
1360 manager.stop_subsystem(id).await.unwrap();
1362
1363 let metadata = manager.get_subsystem_metadata(id).unwrap();
1365 assert_eq!(metadata.state, SubsystemState::Stopped);
1366 })
1367 .await;
1368
1369 assert!(test_result.is_ok(), "Test timed out after 5 seconds");
1370 }
1371
1372 #[cfg(all(feature = "async-std", not(feature = "tokio")))]
1373 #[async_std::test]
1374 async fn test_closure_subsystem() {
1375 let test_result = async_std::future::timeout(Duration::from_secs(5), async {
1377 let coordinator = ShutdownCoordinator::new(500, 1000, 1500);
1379 let manager = SubsystemManager::new(coordinator);
1380
1381 let subsystem = TestSubsystem::new("closure_test", false);
1383 let id = manager.register(subsystem);
1384
1385 manager.start_subsystem(id).await.unwrap();
1387
1388 async_std::task::sleep(Duration::from_millis(50)).await;
1390
1391 let metadata = manager.get_subsystem_metadata(id).unwrap();
1393 assert_eq!(metadata.state, SubsystemState::Running);
1394
1395 manager.stop_subsystem(id).await.unwrap();
1397
1398 let metadata = manager.get_subsystem_metadata(id).unwrap();
1400 assert_eq!(metadata.state, SubsystemState::Stopped);
1401 })
1402 .await;
1403
1404 assert!(test_result.is_ok(), "Test timed out after 5 seconds");
1405 }
1406}