use crate::error::types::SupervisorError;
use crate::id::types::{ChildId, SupervisorPath};
use crate::policy::budget::RestartBudgetConfig;
use crate::policy::failure_window::FailureWindowConfig;
use crate::policy::group::GroupDependencyEdge;
use crate::policy::meltdown::MeltdownPolicy;
use crate::policy::task_role_defaults::{SeverityClass, TaskRole, semantic_conflicts_for_child};
use crate::spec::child::{BackoffPolicy, ChildSpec, HealthPolicy, RestartPolicy, ShutdownPolicy};
use confique::Config;
use schemars::JsonSchema;
use serde::{Deserialize, Serialize};
use std::collections::{HashMap, HashSet};
use std::time::Duration;
/// Strategy a supervisor applies when one of its children fails.
///
/// Configurable at supervisor level (`SupervisorSpec::strategy`), per group
/// (`GroupStrategy::strategy`) and per child (`ChildStrategyOverride::strategy`).
/// Variant names mirror the Erlang/OTP supervisor strategies; this module only declares
/// them and does not implement the restart logic.
// NOTE(review): unlike `EscalationPolicy` and `BackpressureStrategy`, this enum has no
// `#[serde(rename_all = "snake_case")]`, so it serializes as "OneForOne" etc. Adding it now
// would break already-serialized configs, so it is intentionally left as-is.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize, JsonSchema)]
pub enum SupervisionStrategy {
// Presumably restarts only the failed child (OTP `one_for_one`) — confirm with the executor.
OneForOne,
// Presumably restarts every child in scope (OTP `one_for_all`) — confirm with the executor.
OneForAll,
// Presumably restarts the failed child and those declared after it (OTP `rest_for_one`).
RestForOne,
}
/// Escalation action configurable at supervisor, group-strategy and child-override level
/// (the `escalation_policy` fields in this module).
///
/// Serialized in snake_case, e.g. `"escalate_to_parent"`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize, JsonSchema)]
#[serde(rename_all = "snake_case")]
pub enum EscalationPolicy {
// Presumably hands the failure to the parent supervisor — semantics live outside this module.
EscalateToParent,
// Presumably shuts down the whole supervision tree — confirm with the runtime.
ShutdownTree,
// Presumably isolates the failing scope rather than restarting it — confirm with the runtime.
QuarantineScope,
}
/// Cap on the number of restarts allowed within a time window.
///
/// When present in a spec, `validate_restart_limit` rejects a zero `max_restarts` and a
/// zero `window`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize, JsonSchema)]
pub struct RestartLimit {
    pub max_restarts: u32,
    pub window: Duration,
}

impl RestartLimit {
    /// Builds a limit of `max_restarts` restarts per `window`.
    pub fn new(max_restarts: u32, window: Duration) -> Self {
        RestartLimit {
            window,
            max_restarts,
        }
    }
}
/// Supervision strategy applied to the children tagged with `group`.
///
/// Validation requires a non-blank, unique `group` that appears in at least one child's
/// `tags`, and forbids a child from carrying more than one tag that names a group strategy.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct GroupStrategy {
    pub group: String,
    pub strategy: SupervisionStrategy,
    pub restart_limit: Option<RestartLimit>,
    pub escalation_policy: Option<EscalationPolicy>,
}

impl GroupStrategy {
    /// Creates a group strategy with no restart-limit or escalation-policy override.
    pub fn new(group: impl Into<String>, strategy: SupervisionStrategy) -> Self {
        let group: String = group.into();
        GroupStrategy {
            group,
            strategy,
            escalation_policy: None,
            restart_limit: None,
        }
    }
}
/// A named group of children with an optional restart budget.
///
/// Every `ChildSpec::group` in a spec must name one of the configured groups.
/// `SupervisorSpec::validate` does not cross-check `children` against the spec's children,
/// and it does not validate `budget`.
#[derive(Debug, Clone, PartialEq)]
pub struct GroupConfig {
    pub name: String,
    pub children: Vec<ChildId>,
    pub budget: Option<RestartBudgetConfig>,
}

impl GroupConfig {
    /// Creates a group called `name` containing `children`, optionally with its own budget.
    pub fn new(
        name: impl Into<String>,
        children: Vec<ChildId>,
        budget: Option<RestartBudgetConfig>,
    ) -> Self {
        let name = name.into();
        GroupConfig {
            name,
            budget,
            children,
        }
    }
}
/// Per-child supervision strategy override.
///
/// Validation requires `child_id` to name a child of the spec and allows at most one
/// override per child.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ChildStrategyOverride {
    pub child_id: ChildId,
    pub strategy: SupervisionStrategy,
    pub restart_limit: Option<RestartLimit>,
    pub escalation_policy: Option<EscalationPolicy>,
}

impl ChildStrategyOverride {
    /// Creates an override for `child_id` with no restart-limit or escalation override.
    pub fn new(child_id: ChildId, strategy: SupervisionStrategy) -> Self {
        ChildStrategyOverride {
            strategy,
            child_id,
            escalation_policy: None,
            restart_limit: None,
        }
    }
}
/// Governs whether further children may be added to a supervisor.
///
/// `SupervisorSpec::validate` rejects `child_limit == Some(0)`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct DynamicSupervisorPolicy {
    /// Master switch: when `false`, [`DynamicSupervisorPolicy::allows_addition`] is always `false`.
    pub enabled: bool,
    /// Maximum number of children; `None` means no limit.
    pub child_limit: Option<usize>,
}

impl DynamicSupervisorPolicy {
    /// Enabled policy without a child limit.
    pub fn unbounded() -> Self {
        DynamicSupervisorPolicy {
            child_limit: None,
            enabled: true,
        }
    }

    /// Enabled policy capped at `child_limit` children.
    pub fn limited(child_limit: usize) -> Self {
        DynamicSupervisorPolicy {
            child_limit: Some(child_limit),
            enabled: true,
        }
    }

    /// Returns `true` when the policy is enabled and one more child fits under the limit,
    /// i.e. `current_child_count` is strictly below `child_limit` (or there is no limit).
    pub fn allows_addition(&self, current_child_count: usize) -> bool {
        if !self.enabled {
            return false;
        }
        match self.child_limit {
            Some(limit) => current_child_count < limit,
            None => true,
        }
    }
}
/// Resolved description of which strategy, scope and limits apply to one failed child.
///
/// NOTE(review): the code that builds and executes plans is not in this module; the field
/// notes below are hedged accordingly.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct StrategyExecutionPlan {
/// The child whose failure this plan addresses.
pub failed_child: ChildId,
/// Strategy selected for this failure.
pub strategy: SupervisionStrategy,
/// Children covered by the plan (presumably the ones to be restarted — confirm with the executor).
pub scope: Vec<ChildId>,
/// Group whose strategy was selected, if any (presumably `None` when no group strategy applied).
pub group: Option<String>,
/// Restart limit in effect for this plan, if any.
pub restart_limit: Option<RestartLimit>,
/// Escalation policy in effect for this plan, if any.
pub escalation_policy: Option<EscalationPolicy>,
/// Presumably mirrors `DynamicSupervisorPolicy::enabled` of the owning supervisor — confirm.
pub dynamic_supervisor_enabled: bool,
}
/// Backpressure strategy selected by `BackpressureConfig::strategy`.
///
/// Serialized in snake_case (`"alert_and_block"`, `"sample_and_audit"`), which is also the
/// form of the `#[config(default = ...)]` literal on that field. Defaults to `AlertAndBlock`.
// `Default` is derived (with `#[default]`) instead of hand-written, as `clippy::derivable_impls`
// requires; the resulting default variant is unchanged.
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq, Serialize, Deserialize, JsonSchema)]
#[serde(rename_all = "snake_case")]
pub enum BackpressureStrategy {
    #[default]
    AlertAndBlock,
    SampleAndAudit,
}
/// Backpressure settings carried by `SupervisorSpec::backpressure_config`.
///
/// Loadable through `confique` (`Config` derive) or serde; `validate_backpressure_config`
/// enforces the constraints noted on each field.
// Each default appears twice: in `#[config(default = ...)]` (confique) and in the
// `default_*` function used by serde and by `Default`. Keep both in sync when changing one.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, Config, JsonSchema)]
pub struct BackpressureConfig {
/// Strategy to apply; defaults to `alert_and_block`.
#[config(default = "alert_and_block")]
#[serde(default)]
pub strategy: BackpressureStrategy,
/// Warning threshold in percent; must be within 1..=100 and strictly below
/// `critical_threshold_pct`. Defaults to 80.
#[config(default = 80)]
#[serde(default = "default_warn_threshold")]
pub warn_threshold_pct: u8,
/// Critical threshold in percent; must be within 1..=100. Defaults to 95.
#[config(default = 95)]
#[serde(default = "default_critical_threshold")]
pub critical_threshold_pct: u8,
/// Window length (seconds, per the field name); must be greater than zero. Defaults to 30.
#[config(default = 30)]
#[serde(default = "default_window_secs")]
pub window_secs: u64,
/// Capacity of the audit channel; must be greater than zero. Defaults to 1024.
#[config(default = 1024)]
#[serde(default = "default_audit_capacity")]
pub audit_channel_capacity: usize,
}
// Serde defaults for `BackpressureConfig`. Each value must match the corresponding
// `#[config(default = ...)]` literal on the struct field.

/// Default `warn_threshold_pct`.
const DEFAULT_WARN_THRESHOLD_PCT: u8 = 80;
/// Default `critical_threshold_pct`.
const DEFAULT_CRITICAL_THRESHOLD_PCT: u8 = 95;
/// Default `window_secs`.
const DEFAULT_WINDOW_SECS: u64 = 30;
/// Default `audit_channel_capacity`.
const DEFAULT_AUDIT_CHANNEL_CAPACITY: usize = 1024;

fn default_warn_threshold() -> u8 {
    DEFAULT_WARN_THRESHOLD_PCT
}

fn default_critical_threshold() -> u8 {
    DEFAULT_CRITICAL_THRESHOLD_PCT
}

fn default_window_secs() -> u64 {
    DEFAULT_WINDOW_SECS
}

fn default_audit_capacity() -> usize {
    DEFAULT_AUDIT_CHANNEL_CAPACITY
}
impl Default for BackpressureConfig {
    /// Returns the same values serde fills in for missing fields: the default
    /// [`BackpressureStrategy`] plus the `default_*` helper values.
    fn default() -> Self {
        Self {
            // Same source as `#[serde(default)]` on `strategy`, so `Default` and serde
            // cannot drift apart if the enum's default variant ever changes.
            strategy: BackpressureStrategy::default(),
            warn_threshold_pct: default_warn_threshold(),
            critical_threshold_pct: default_critical_threshold(),
            window_secs: default_window_secs(),
            audit_channel_capacity: default_audit_capacity(),
        }
    }
}
/// Declarative configuration of one supervisor: children, default child policies,
/// strategy/escalation settings, channel sizing and restart-throttling policies.
///
/// Build one with [`SupervisorSpec::root`] and check it with [`SupervisorSpec::validate`].
#[derive(Debug, Clone)]
pub struct SupervisorSpec {
/// Path identifying this supervisor; `root` uses `SupervisorPath::root()`.
pub path: SupervisorPath,
/// Supervisor-wide strategy. `group_strategies` and `child_strategy_overrides` carry more
/// specific strategies; how they are resolved is not visible in this module.
pub strategy: SupervisionStrategy,
/// Child specifications; each one is checked with `ChildSpec::validate`.
pub children: Vec<ChildSpec>,
/// Configuration version label; must not be empty or whitespace-only.
pub config_version: String,
/// Default restart policy (presumably for children that do not set their own — confirm).
pub default_restart_policy: RestartPolicy,
/// Default backoff policy (presumably for children that do not set their own — confirm).
pub default_backoff_policy: BackoffPolicy,
/// Default health policy (presumably for children that do not set their own — confirm).
pub default_health_policy: HealthPolicy,
/// Default shutdown policy (presumably for children that do not set their own — confirm).
pub default_shutdown_policy: ShutdownPolicy,
/// Must be greater than zero; `root` uses 1.
pub supervisor_failure_limit: u32,
/// Optional supervisor-level restart limit; when set, both of its fields must be non-zero.
pub restart_limit: Option<RestartLimit>,
/// Optional supervisor-level escalation policy.
pub escalation_policy: Option<EscalationPolicy>,
/// Strategies keyed by child tag: names must be non-blank and unique, each must match a tag
/// on at least one child, and no child may carry two such tags.
pub group_strategies: Vec<GroupStrategy>,
/// Named groups; every `ChildSpec::group` must name one of these.
pub group_configs: Vec<GroupConfig>,
/// Dependency edges between groups (not checked by `validate`).
pub group_dependencies: Vec<GroupDependencyEdge>,
/// Per-role severity defaults (not checked by `validate`).
pub severity_defaults: HashMap<TaskRole, SeverityClass>,
/// Per-child strategy overrides; at most one per child, each naming an existing child.
pub child_strategy_overrides: Vec<ChildStrategyOverride>,
/// Dynamic child-addition policy; a `child_limit` of `Some(0)` is rejected.
pub dynamic_supervisor_policy: DynamicSupervisorPolicy,
/// Must be greater than zero; `root` uses the child count plus one (saturating).
pub control_channel_capacity: usize,
/// Must be greater than zero; `root` uses twice the control capacity (saturating).
pub event_channel_capacity: usize,
/// Backpressure thresholds and strategy; checked by `validate_backpressure_config`.
pub backpressure_config: BackpressureConfig,
/// Meltdown policy (not checked by `validate`).
pub meltdown_policy: MeltdownPolicy,
/// Failure window configuration (not checked by `validate`).
pub failure_window_config: FailureWindowConfig,
/// Restart budget; `window`, `max_burst` and `recovery_rate_per_sec` must all be positive.
pub restart_budget_config: RestartBudgetConfig,
/// Must be greater than zero; `root` uses 100.
pub pipeline_journal_capacity: usize,
/// Must be greater than zero; `root` uses 10.
pub pipeline_subscriber_capacity: usize,
/// Must be greater than zero; `root` uses 5.
pub concurrent_restart_limit: u32,
}
impl SupervisorSpec {
pub fn root(children: Vec<ChildSpec>) -> Self {
let channel_capacity = channel_capacity_for_children(children.len());
Self {
path: SupervisorPath::root(),
strategy: SupervisionStrategy::OneForOne,
children,
config_version: String::from("unversioned"),
default_restart_policy: RestartPolicy::Transient,
default_backoff_policy: BackoffPolicy::new(
Duration::from_millis(10),
Duration::from_secs(1),
0.0,
),
default_health_policy: HealthPolicy::new(
Duration::from_secs(1),
Duration::from_secs(3),
),
default_shutdown_policy: ShutdownPolicy::new(
Duration::from_secs(5),
Duration::from_secs(1),
),
supervisor_failure_limit: 1,
restart_limit: None,
escalation_policy: None,
group_strategies: Vec::new(),
group_configs: Vec::new(),
group_dependencies: Vec::new(),
severity_defaults: HashMap::new(),
child_strategy_overrides: Vec::new(),
dynamic_supervisor_policy: DynamicSupervisorPolicy::unbounded(),
control_channel_capacity: channel_capacity,
event_channel_capacity: channel_capacity.saturating_mul(2),
backpressure_config: BackpressureConfig::default(),
meltdown_policy: MeltdownPolicy::new(
3,
Duration::from_secs(10),
5,
Duration::from_secs(30),
10,
Duration::from_secs(60),
Duration::from_secs(120),
),
failure_window_config: FailureWindowConfig::time_sliding(60, 5),
restart_budget_config: RestartBudgetConfig::safe_default(),
pipeline_journal_capacity: 100,
pipeline_subscriber_capacity: 10,
concurrent_restart_limit: 5,
}
}
pub fn validate(&self) -> Result<(), SupervisorError> {
if self.config_version.trim().is_empty() {
return Err(SupervisorError::fatal_config(
"config version must not be empty",
));
}
if self.supervisor_failure_limit == 0 {
return Err(SupervisorError::fatal_config(
"supervisor failure limit must be greater than zero",
));
}
if self.control_channel_capacity == 0 {
return Err(SupervisorError::fatal_config(
"control channel capacity must be greater than zero",
));
}
if self.event_channel_capacity == 0 {
return Err(SupervisorError::fatal_config(
"event channel capacity must be greater than zero",
));
}
validate_backpressure_config(&self.backpressure_config)?;
for child in &self.children {
child.validate()?;
}
validate_restart_limit(self.restart_limit)?;
validate_group_strategies(&self.group_strategies, &self.children)?;
validate_child_strategy_overrides(self)?;
validate_task_roles(&self.children)?;
validate_dynamic_policy(self.dynamic_supervisor_policy)?;
validate_child_group_names(&self.children, &self.group_configs)?;
validate_pipeline_policy(self)?;
Ok(())
}
}
/// Ensures that every child's `group` (when set) names one of the configured groups.
///
/// # Errors
///
/// Returns a fatal configuration error for the first child, in declaration order, whose
/// `group` does not match any `GroupConfig::name`. The message lists the available group
/// names in sorted order.
fn validate_child_group_names(
    children: &[ChildSpec],
    group_configs: &[GroupConfig],
) -> Result<(), SupervisorError> {
    // A BTreeSet instead of a HashSet keeps the "available groups" list in the error message
    // sorted, so the same misconfiguration always yields the same message.
    let group_names: std::collections::BTreeSet<&str> =
        group_configs.iter().map(|g| g.name.as_str()).collect();
    for child in children {
        if let Some(ref group_name) = child.group
            && !group_names.contains(group_name.as_str())
        {
            return Err(SupervisorError::fatal_config(format!(
                "child '{}' references unknown group '{}'; available groups: {:?}",
                child.id,
                group_name,
                group_names.iter().copied().collect::<Vec<_>>(),
            )));
        }
    }
    Ok(())
}
fn validate_restart_limit(limit: Option<RestartLimit>) -> Result<(), SupervisorError> {
let Some(limit) = limit else {
return Ok(());
};
if limit.max_restarts == 0 {
return Err(SupervisorError::fatal_config(
"restart limit max_restarts must be greater than zero",
));
}
if limit.window.is_zero() {
return Err(SupervisorError::fatal_config(
"restart limit window must be greater than zero",
));
}
Ok(())
}
/// Validates group strategies: non-blank unique names and valid restart limits, then the
/// membership rules enforced by `validate_group_membership`.
///
/// # Errors
///
/// Returns a fatal configuration error for the first blank or duplicate group name, for
/// the first invalid restart limit, or from the membership check.
fn validate_group_strategies(
    strategies: &[GroupStrategy],
    children: &[ChildSpec],
) -> Result<(), SupervisorError> {
    let mut seen: HashSet<&str> = HashSet::with_capacity(strategies.len());
    for entry in strategies {
        let name = entry.group.as_str();
        if name.trim().is_empty() {
            return Err(SupervisorError::fatal_config(
                "group strategy group must not be empty",
            ));
        }
        let first_occurrence = seen.insert(name);
        if !first_occurrence {
            return Err(SupervisorError::fatal_config(format!(
                "duplicate group strategy: {}",
                entry.group
            )));
        }
        validate_restart_limit(entry.restart_limit)?;
    }
    validate_group_membership(strategies, children)
}
/// Cross-checks group strategies against child tags.
///
/// Every strategy's `group` must appear in at least one child's `tags`, and no child may
/// carry more than one tag that names a group strategy, because its group would then be
/// ambiguous.
///
/// # Errors
///
/// Returns a fatal configuration error for the first unused group strategy, or else for the
/// first child whose group is ambiguous.
fn validate_group_membership(
    strategies: &[GroupStrategy],
    children: &[ChildSpec],
) -> Result<(), SupervisorError> {
    let unused = strategies
        .iter()
        .find(|entry| children.iter().all(|child| !child.tags.contains(&entry.group)));
    if let Some(entry) = unused {
        return Err(SupervisorError::fatal_config(format!(
            "group strategy references unused group: {}",
            entry.group
        )));
    }

    let strategy_groups: HashSet<&String> = strategies.iter().map(|entry| &entry.group).collect();
    let ambiguous = children.iter().find(|child| {
        child
            .tags
            .iter()
            .filter(|tag| strategy_groups.contains(tag))
            .count()
            > 1
    });
    match ambiguous {
        Some(child) => Err(SupervisorError::fatal_config(format!(
            "child strategy groups are ambiguous for child: {}",
            child.id
        ))),
        None => Ok(()),
    }
}
fn validate_child_strategy_overrides(spec: &SupervisorSpec) -> Result<(), SupervisorError> {
let child_ids = spec
.children
.iter()
.map(|child| child.id.clone())
.collect::<HashSet<_>>();
let mut overrides = HashSet::new();
for strategy in &spec.child_strategy_overrides {
if !child_ids.contains(&strategy.child_id) {
return Err(SupervisorError::fatal_config(format!(
"child strategy override references unknown child: {}",
strategy.child_id
)));
}
if !overrides.insert(strategy.child_id.clone()) {
return Err(SupervisorError::fatal_config(format!(
"duplicate child strategy override: {}",
strategy.child_id
)));
}
validate_restart_limit(strategy.restart_limit)?;
}
Ok(())
}
/// Validates task-role constraints and logs semantic role conflicts.
///
/// For every child, role conflicts are logged as warnings via
/// `emit_role_conflict_warnings` and never fail validation. Children whose `task_role` is
/// `Sidecar` must also meet all of the following:
/// - they carry a `sidecar_config`;
/// - its `primary_child_id` names a child in `children`;
/// - that primary is not itself a sidecar.
///
/// # Errors
///
/// Returns a fatal configuration error for the first sidecar violating these rules.
fn validate_task_roles(children: &[ChildSpec]) -> Result<(), SupervisorError> {
    for child in children {
        emit_role_conflict_warnings(child);
        if child.task_role != Some(TaskRole::Sidecar) {
            continue;
        }
        let sidecar_config = child.sidecar_config.as_ref().ok_or_else(|| {
            SupervisorError::fatal_config(format!(
                "sidecar child {} requires sidecar_config",
                child.id
            ))
        })?;
        // One lookup both proves the primary exists and yields it, so a separate set of all
        // child ids is unnecessary. With duplicate ids, the first matching child is used.
        let primary_child = children
            .iter()
            .find(|candidate| candidate.id == sidecar_config.primary_child_id)
            .ok_or_else(|| {
                SupervisorError::fatal_config(format!(
                    "sidecar child {} references unknown primary_child_id {}",
                    child.id, sidecar_config.primary_child_id
                ))
            })?;
        if primary_child.task_role == Some(TaskRole::Sidecar) {
            return Err(SupervisorError::fatal_config(format!(
                "sidecar child {} must not use another sidecar {} as primary_child_id",
                child.id, sidecar_config.primary_child_id
            )));
        }
    }
    Ok(())
}
/// Logs one `tracing` warning per semantic conflict reported by
/// `semantic_conflicts_for_child` for `child`; it never fails.
fn emit_role_conflict_warnings(child: &ChildSpec) {
    let conflicts = semantic_conflicts_for_child(child);
    conflicts.into_iter().for_each(|conflict| {
        tracing::warn!(
            child_id = %conflict.child_id,
            task_role = %conflict.task_role,
            conflicting_field = %conflict.conflicting_field,
            user_value = %conflict.user_value,
            expected_semantic = %conflict.expected_semantic,
            reason = %conflict.reason,
            "task role semantic conflict"
        );
    });
}
/// Rejects a dynamic supervisor policy whose `child_limit` is `Some(0)`, since no child
/// could ever be added under it.
///
/// # Errors
///
/// Returns a fatal configuration error for `child_limit == Some(0)`.
fn validate_dynamic_policy(policy: DynamicSupervisorPolicy) -> Result<(), SupervisorError> {
    match policy.child_limit {
        Some(0) => Err(SupervisorError::fatal_config(
            "dynamic supervisor child_limit must be greater than zero",
        )),
        Some(_) | None => Ok(()),
    }
}
/// Validates pipeline capacities, the concurrent restart limit and the restart budget.
///
/// # Errors
///
/// Returns a fatal configuration error when any of the following holds:
/// - a pipeline capacity or `concurrent_restart_limit` is zero;
/// - the budget `window` or `max_burst` is zero;
/// - `recovery_rate_per_sec` is not a finite number greater than zero.
fn validate_pipeline_policy(spec: &SupervisorSpec) -> Result<(), SupervisorError> {
    if spec.pipeline_journal_capacity == 0 {
        return Err(SupervisorError::fatal_config(
            "pipeline journal capacity must be greater than zero",
        ));
    }
    if spec.pipeline_subscriber_capacity == 0 {
        return Err(SupervisorError::fatal_config(
            "pipeline subscriber capacity must be greater than zero",
        ));
    }
    if spec.concurrent_restart_limit == 0 {
        return Err(SupervisorError::fatal_config(
            "concurrent restart limit must be greater than zero",
        ));
    }
    let budget = &spec.restart_budget_config;
    if budget.window.is_zero() {
        return Err(SupervisorError::fatal_config(
            "restart budget window must be greater than zero",
        ));
    }
    if budget.max_burst == 0 {
        return Err(SupervisorError::fatal_config(
            "restart budget max_burst must be greater than zero",
        ));
    }
    let rate = budget.recovery_rate_per_sec;
    if rate <= 0.0 {
        return Err(SupervisorError::fatal_config(
            "restart budget recovery_rate_per_sec must be greater than zero",
        ));
    }
    // `NaN <= 0.0` is false, so NaN (and +inf) get past the check above. Either value would
    // poison any refill arithmetic, so reject non-finite rates explicitly.
    if !rate.is_finite() {
        return Err(SupervisorError::fatal_config(
            "restart budget recovery_rate_per_sec must be a finite number",
        ));
    }
    Ok(())
}
/// Validates backpressure settings.
///
/// Rules, checked in this order:
/// - both thresholds lie in 1..=100;
/// - warn is strictly below critical;
/// - `window_secs` is non-zero;
/// - `audit_channel_capacity` is non-zero.
///
/// # Errors
///
/// Returns a fatal configuration error for the first violated rule.
fn validate_backpressure_config(config: &BackpressureConfig) -> Result<(), SupervisorError> {
    let percent = 1_u8..=100_u8;
    if !percent.contains(&config.warn_threshold_pct) {
        return Err(SupervisorError::fatal_config(
            "backpressure warn_threshold_pct must be between 1 and 100",
        ));
    }
    if !percent.contains(&config.critical_threshold_pct) {
        return Err(SupervisorError::fatal_config(
            "backpressure critical_threshold_pct must be between 1 and 100",
        ));
    }
    if config.critical_threshold_pct <= config.warn_threshold_pct {
        return Err(SupervisorError::fatal_config(
            "backpressure warn_threshold_pct must be less than critical_threshold_pct",
        ));
    }
    if config.window_secs == 0 {
        return Err(SupervisorError::fatal_config(
            "backpressure window_secs must be greater than zero",
        ));
    }
    if config.audit_channel_capacity == 0 {
        return Err(SupervisorError::fatal_config(
            "backpressure audit_channel_capacity must be greater than zero",
        ));
    }
    Ok(())
}
/// Control-channel capacity used by `SupervisorSpec::root`: one slot per child plus one,
/// saturating at `usize::MAX`.
fn channel_capacity_for_children(child_count: usize) -> usize {
    child_count.checked_add(1).unwrap_or(usize::MAX)
}