rust_supervisor/runtime/
supervisor.rs1use crate::config::state::ConfigState;
7use crate::control::handle::SupervisorHandle;
8use crate::dashboard::config::validate_dashboard_ipc_config;
9use crate::dashboard::error::DashboardError;
10use crate::dashboard::runtime::start_dashboard_ipc_runtime;
11use crate::error::types::SupervisorError;
12use crate::runtime::control_loop::{RuntimeControlState, run_control_loop};
13use crate::shutdown::stage::ShutdownPolicy;
14use crate::spec::supervisor::SupervisorSpec;
15use std::path::Path;
16use tokio::sync::{broadcast, mpsc};
17
18#[derive(Debug, Clone, Copy, Default)]
20pub struct Supervisor;
21
22impl Supervisor {
23 pub async fn start(spec: SupervisorSpec) -> Result<SupervisorHandle, SupervisorError> {
33 let shutdown_policy = shutdown_policy_from_spec(&spec);
34 Self::start_with_policy(spec, shutdown_policy).await
35 }
36
37 pub async fn start_from_config_state(
48 state: ConfigState,
49 ) -> Result<SupervisorHandle, SupervisorError> {
50 let ipc_config = state.ipc.clone();
51 let spec = state.to_supervisor_spec()?;
52 let mut handle = Self::start(spec.clone()).await?;
53 let dashboard_config =
54 validate_dashboard_ipc_config(ipc_config.as_ref()).map_err(dashboard_startup_error)?;
55 if let Some(dashboard_config) = dashboard_config {
56 let dashboard_runtime =
57 start_dashboard_ipc_runtime(dashboard_config, spec, handle.clone())
58 .map_err(dashboard_startup_error)?;
59 handle = handle.with_dashboard_runtime(dashboard_runtime);
60 }
61 Ok(handle)
62 }
63
64 pub async fn start_from_config_file(
75 path: impl AsRef<Path>,
76 ) -> Result<SupervisorHandle, SupervisorError> {
77 let state = crate::config::loader::load_config_state(path)?;
78 Self::start_from_config_state(state).await
79 }
80
81 pub async fn start_with_policy(
92 spec: SupervisorSpec,
93 shutdown_policy: ShutdownPolicy,
94 ) -> Result<SupervisorHandle, SupervisorError> {
95 spec.validate()?;
96 let (command_sender, command_receiver) = mpsc::channel(spec.control_channel_capacity);
97 let (event_sender, _) = broadcast::channel(spec.event_channel_capacity);
98 let state = RuntimeControlState::new(spec, shutdown_policy, command_sender.clone())?;
99 tokio::spawn(run_control_loop(
100 state,
101 command_receiver,
102 event_sender.clone(),
103 ));
104 Ok(SupervisorHandle::new(command_sender, event_sender))
105 }
106}
107
108fn shutdown_policy_from_spec(spec: &SupervisorSpec) -> ShutdownPolicy {
118 ShutdownPolicy::new(
119 spec.default_shutdown_policy.graceful_timeout,
120 spec.default_shutdown_policy.abort_wait,
121 true,
122 )
123}
124
125fn dashboard_startup_error(error: DashboardError) -> SupervisorError {
127 SupervisorError::fatal_config(format!("dashboard IPC startup failed: {error}"))
128}