rust_supervisor/runtime/
supervisor.rs1use crate::config::state::ConfigState;
7use crate::control::handle::SupervisorHandle;
8use crate::dashboard::config::validate_dashboard_ipc_config;
9use crate::dashboard::error::DashboardError;
10use crate::dashboard::runtime::start_dashboard_ipc_runtime;
11use crate::error::types::SupervisorError;
12use crate::observe::pipeline::ObservabilityPipeline;
13use crate::runtime::control_loop::{RuntimeControlState, run_control_loop};
14use crate::runtime::lifecycle::RuntimeControlPlane;
15use crate::runtime::watchdog::RuntimeWatchdog;
16use crate::shutdown::stage::ShutdownPolicy;
17use crate::spec::supervisor::SupervisorSpec;
18use std::path::Path;
19use std::sync::{Arc, Mutex};
20use tokio::sync::{broadcast, mpsc};
21
22#[derive(Debug, Clone, Copy, Default)]
24pub struct Supervisor;
25
26impl Supervisor {
27 pub async fn start(spec: SupervisorSpec) -> Result<SupervisorHandle, SupervisorError> {
37 let shutdown_policy = shutdown_policy_from_spec(&spec);
38 Self::start_with_policy(spec, shutdown_policy).await
39 }
40
41 pub async fn start_from_config_state(
52 state: ConfigState,
53 ) -> Result<SupervisorHandle, SupervisorError> {
54 let audit_config = state.audit.clone();
55 let dashboard_config = state.dashboard.clone();
56 let spec = state.to_supervisor_spec()?;
57 let mut handle = Self::start(spec.clone()).await?;
58 let dashboard_config = validate_dashboard_ipc_config(dashboard_config.as_ref())
59 .map_err(dashboard_startup_error)?;
60 if let Some(dashboard_config) = dashboard_config {
61 let dashboard_runtime =
62 start_dashboard_ipc_runtime(dashboard_config, audit_config, spec, handle.clone())
63 .map_err(dashboard_startup_error)?;
64 handle = handle.with_dashboard_runtime(dashboard_runtime);
65 }
66 Ok(handle)
67 }
68
69 pub async fn start_from_config_file(
80 path: impl AsRef<Path>,
81 ) -> Result<SupervisorHandle, SupervisorError> {
82 let state = crate::config::loader::load_config_from_yaml_file(path)?;
83 Self::start_from_config_state(state).await
84 }
85
86 pub async fn start_with_policy(
97 spec: SupervisorSpec,
98 shutdown_policy: ShutdownPolicy,
99 ) -> Result<SupervisorHandle, SupervisorError> {
100 spec.validate()?;
101 let backpressure_config = spec.backpressure_config.clone();
102 let (command_sender, command_receiver) = mpsc::channel(spec.control_channel_capacity);
103 let (event_sender, _) = broadcast::channel(spec.event_channel_capacity);
104 let control_plane = RuntimeControlPlane::new();
105 let observability = Arc::new(Mutex::new(ObservabilityPipeline::with_backpressure_config(
106 spec.event_channel_capacity,
107 spec.event_channel_capacity,
108 true,
109 true,
110 backpressure_config,
111 )));
112 let state = RuntimeControlState::new(
113 spec,
114 shutdown_policy,
115 command_sender.clone(),
116 observability.clone(),
117 )?;
118 let join_handle = tokio::spawn(run_control_loop(
119 state,
120 command_receiver,
121 event_sender.clone(),
122 ));
123 RuntimeWatchdog::publish_started(control_plane.clone(), event_sender.clone());
124 RuntimeWatchdog::spawn(control_plane.clone(), join_handle, event_sender.clone());
125 Ok(SupervisorHandle::new_with_observability(
126 command_sender,
127 event_sender,
128 control_plane,
129 observability,
130 ))
131 }
132}
133
134fn shutdown_policy_from_spec(spec: &SupervisorSpec) -> ShutdownPolicy {
144 ShutdownPolicy::new(
145 spec.default_shutdown_policy.graceful_timeout,
146 spec.default_shutdown_policy.abort_wait,
147 true,
148 )
149}
150
151fn dashboard_startup_error(error: DashboardError) -> SupervisorError {
153 SupervisorError::fatal_config(format!("dashboard IPC startup failed: {error}"))
154}