Skip to main content

rust_supervisor/runtime/
supervisor.rs

1//! Runtime supervisor entry point.
2//!
3//! This module validates supervisor declarations, derives runtime options, and
4//! returns a [`crate::control::handle::SupervisorHandle`].
5
6use crate::config::state::ConfigState;
7use crate::control::handle::SupervisorHandle;
8use crate::dashboard::config::validate_dashboard_ipc_config;
9use crate::dashboard::error::DashboardError;
10use crate::dashboard::runtime::start_dashboard_ipc_runtime;
11use crate::error::types::SupervisorError;
12use crate::observe::pipeline::ObservabilityPipeline;
13use crate::runtime::control_loop::{RuntimeControlState, run_control_loop};
14use crate::runtime::lifecycle::RuntimeControlPlane;
15use crate::runtime::watchdog::RuntimeWatchdog;
16use crate::shutdown::stage::ShutdownPolicy;
17use crate::spec::supervisor::SupervisorSpec;
18use std::path::Path;
19use std::sync::{Arc, Mutex};
20use tokio::sync::{broadcast, mpsc};
21
/// Zero-sized namespace type for the supervisor runtime entry points.
///
/// The type carries no state; every constructor-like operation is exposed as
/// an associated function in its `impl` block.
#[derive(Clone, Copy, Debug, Default)]
pub struct Supervisor;
25
26impl Supervisor {
27    /// Starts a supervisor runtime from an owned specification value.
28    ///
29    /// # Arguments
30    ///
31    /// - `spec`: Supervisor specification owned by the caller.
32    ///
33    /// # Returns
34    ///
35    /// Returns a [`SupervisorHandle`] connected to the runtime control loop.
36    pub async fn start(spec: SupervisorSpec) -> Result<SupervisorHandle, SupervisorError> {
37        let shutdown_policy = shutdown_policy_from_spec(&spec);
38        Self::start_with_policy(spec, shutdown_policy).await
39    }
40
41    /// Starts a supervisor runtime from validated configuration state.
42    ///
43    /// # Arguments
44    ///
45    /// - `state`: Validated configuration state owned by the caller.
46    ///
47    /// # Returns
48    ///
49    /// Returns a [`SupervisorHandle`] only after configuration has produced a
50    /// valid supervisor specification.
51    pub async fn start_from_config_state(
52        state: ConfigState,
53    ) -> Result<SupervisorHandle, SupervisorError> {
54        let audit_config = state.audit.clone();
55        let dashboard_config = state.dashboard.clone();
56        let spec = state.to_supervisor_spec()?;
57        let mut handle = Self::start(spec.clone()).await?;
58        let dashboard_config = validate_dashboard_ipc_config(dashboard_config.as_ref())
59            .map_err(dashboard_startup_error)?;
60        if let Some(dashboard_config) = dashboard_config {
61            let dashboard_runtime =
62                start_dashboard_ipc_runtime(dashboard_config, audit_config, spec, handle.clone())
63                    .map_err(dashboard_startup_error)?;
64            handle = handle.with_dashboard_runtime(dashboard_runtime);
65        }
66        Ok(handle)
67    }
68
69    /// Starts a supervisor runtime from a YAML configuration file.
70    ///
71    /// # Arguments
72    ///
73    /// - `path`: Path to the YAML configuration file.
74    ///
75    /// # Returns
76    ///
77    /// Returns a [`SupervisorHandle`] only after the configuration file has
78    /// loaded and validated successfully.
79    pub async fn start_from_config_file(
80        path: impl AsRef<Path>,
81    ) -> Result<SupervisorHandle, SupervisorError> {
82        let state = crate::config::loader::load_config_from_yaml_file(path)?;
83        Self::start_from_config_state(state).await
84    }
85
86    /// Starts a supervisor runtime with an explicit shutdown policy.
87    ///
88    /// # Arguments
89    ///
90    /// - `spec`: Supervisor specification owned by the caller.
91    /// - `shutdown_policy`: Policy used by the control loop.
92    ///
93    /// # Returns
94    ///
95    /// Returns a [`SupervisorHandle`] connected to the runtime control loop.
96    pub async fn start_with_policy(
97        spec: SupervisorSpec,
98        shutdown_policy: ShutdownPolicy,
99    ) -> Result<SupervisorHandle, SupervisorError> {
100        spec.validate()?;
101        let backpressure_config = spec.backpressure_config.clone();
102        let (command_sender, command_receiver) = mpsc::channel(spec.control_channel_capacity);
103        let (event_sender, _) = broadcast::channel(spec.event_channel_capacity);
104        let control_plane = RuntimeControlPlane::new();
105        let observability = Arc::new(Mutex::new(ObservabilityPipeline::with_backpressure_config(
106            spec.event_channel_capacity,
107            spec.event_channel_capacity,
108            true,
109            true,
110            backpressure_config,
111        )));
112        let state = RuntimeControlState::new(
113            spec,
114            shutdown_policy,
115            command_sender.clone(),
116            observability.clone(),
117        )?;
118        let join_handle = tokio::spawn(run_control_loop(
119            state,
120            command_receiver,
121            event_sender.clone(),
122        ));
123        RuntimeWatchdog::publish_started(control_plane.clone(), event_sender.clone());
124        RuntimeWatchdog::spawn(control_plane.clone(), join_handle, event_sender.clone());
125        Ok(SupervisorHandle::new_with_observability(
126            command_sender,
127            event_sender,
128            control_plane,
129            observability,
130        ))
131    }
132}
133
134/// Builds the shutdown policy from supervisor defaults.
135///
136/// # Arguments
137///
138/// - `spec`: Supervisor declaration that owns default shutdown values.
139///
140/// # Returns
141///
142/// Returns a [`ShutdownPolicy`] for runtime shutdown coordination.
143fn shutdown_policy_from_spec(spec: &SupervisorSpec) -> ShutdownPolicy {
144    ShutdownPolicy::new(
145        spec.default_shutdown_policy.graceful_timeout,
146        spec.default_shutdown_policy.abort_wait,
147        true,
148    )
149}
150
151/// Converts dashboard startup failures into supervisor startup errors.
152fn dashboard_startup_error(error: DashboardError) -> SupervisorError {
153    SupervisorError::fatal_config(format!("dashboard IPC startup failed: {error}"))
154}