1use crate::error::types::SupervisorError;
7use crate::id::types::{ChildId, SupervisorPath};
8use crate::policy::budget::RestartBudgetConfig;
9use crate::policy::failure_window::FailureWindowConfig;
10use crate::policy::group::GroupDependencyEdge;
11use crate::policy::meltdown::MeltdownPolicy;
12use crate::policy::task_role_defaults::{SeverityClass, TaskRole, semantic_conflicts_for_child};
13use crate::spec::child::{BackoffPolicy, ChildSpec, HealthPolicy, RestartPolicy, ShutdownPolicy};
14use confique::Config;
15use schemars::JsonSchema;
16use serde::{Deserialize, Serialize};
17use std::collections::{HashMap, HashSet};
18use std::time::Duration;
19
/// Restart strategy a supervisor applies when one of its children fails.
///
/// The variant names follow the Erlang/OTP supervisor strategies. The restart scope each one
/// produces is computed outside this file (it ends up in `StrategyExecutionPlan::scope`), so
/// the per-variant notes below are assumptions to confirm against that code.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize, JsonSchema)]
pub enum SupervisionStrategy {
    /// Presumably restarts only the failed child.
    OneForOne,
    /// Presumably restarts every child of the supervisor.
    OneForAll,
    /// Presumably restarts the failed child and the children declared after it.
    RestForOne,
}
30
/// Escalation action carried by a spec, a group strategy or a child override.
///
/// Serialized in `snake_case` (e.g. `"escalate_to_parent"`). The handling of each variant lives
/// outside this file; the notes below are inferred from the names and should be confirmed.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize, JsonSchema)]
#[serde(rename_all = "snake_case")]
pub enum EscalationPolicy {
    /// Presumably hands the failure up to the parent supervisor.
    EscalateToParent,
    /// Presumably shuts the whole supervision tree down.
    ShutdownTree,
    /// Presumably isolates the affected scope instead of escalating further.
    QuarantineScope,
}
42
/// Restart cap expressed as a count (`max_restarts`) and a time `window`.
///
/// Wherever a limit is configured (spec-wide, per group strategy, per child override),
/// `validate_restart_limit` rejects a zero count or a zero-length window.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize, JsonSchema)]
pub struct RestartLimit {
    /// Maximum number of restarts (presumably per `window`); must be greater than zero.
    pub max_restarts: u32,
    /// Time window the restart count applies to; must be non-zero.
    pub window: Duration,
}
51
52impl RestartLimit {
53 pub fn new(max_restarts: u32, window: Duration) -> Self {
64 Self {
65 max_restarts,
66 window,
67 }
68 }
69}
70
/// Strategy override for a named group of children.
///
/// During validation a child is a member of the group when its `tags` contain `group`. Every
/// group strategy must match at least one child, and no child may match more than one group
/// strategy.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct GroupStrategy {
    /// Group name matched against `ChildSpec::tags`; must be non-blank and unique.
    pub group: String,
    /// Supervision strategy for the group's members.
    pub strategy: SupervisionStrategy,
    /// Optional restart limit for the group; validated like the spec-wide limit.
    pub restart_limit: Option<RestartLimit>,
    /// Optional escalation policy for the group.
    pub escalation_policy: Option<EscalationPolicy>,
}
83
84impl GroupStrategy {
85 pub fn new(group: impl Into<String>, strategy: SupervisionStrategy) -> Self {
96 Self {
97 group: group.into(),
98 strategy,
99 restart_limit: None,
100 escalation_policy: None,
101 }
102 }
103}
104
/// A named group that children can join through `ChildSpec::group`.
///
/// Spec validation only checks that each child's `group` names an existing `GroupConfig`. The
/// `children` list and `budget` are not cross-checked by the validators in this file.
#[derive(Debug, Clone, PartialEq)]
pub struct GroupConfig {
    /// Group name that `ChildSpec::group` refers to.
    pub name: String,
    /// Ids of the children listed in this group.
    pub children: Vec<ChildId>,
    /// Optional restart budget for the group.
    pub budget: Option<RestartBudgetConfig>,
}
120
121impl GroupConfig {
122 pub fn new(
134 name: impl Into<String>,
135 children: Vec<ChildId>,
136 budget: Option<RestartBudgetConfig>,
137 ) -> Self {
138 Self {
139 name: name.into(),
140 children,
141 budget,
142 }
143 }
144}
145
/// Strategy override for a single child.
///
/// Validation requires `child_id` to name a child of the spec, with at most one override per
/// child.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ChildStrategyOverride {
    /// Child the override applies to.
    pub child_id: ChildId,
    /// Strategy used for this child.
    pub strategy: SupervisionStrategy,
    /// Optional restart limit for this child; validated like the spec-wide limit.
    pub restart_limit: Option<RestartLimit>,
    /// Optional escalation policy for this child.
    pub escalation_policy: Option<EscalationPolicy>,
}
158
159impl ChildStrategyOverride {
160 pub fn new(child_id: ChildId, strategy: SupervisionStrategy) -> Self {
171 Self {
172 child_id,
173 strategy,
174 restart_limit: None,
175 escalation_policy: None,
176 }
177 }
178}
179
/// Controls whether children may be added to a supervisor beyond its initial set.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct DynamicSupervisorPolicy {
    /// Master switch; when `false`, no addition is allowed.
    pub enabled: bool,
    /// Optional upper bound on the child count; `None` means no bound. `Some(0)` is rejected
    /// by spec validation.
    pub child_limit: Option<usize>,
}

impl DynamicSupervisorPolicy {
    /// Enabled policy with no limit on the number of children.
    pub fn unbounded() -> Self {
        Self {
            child_limit: None,
            enabled: true,
        }
    }

    /// Enabled policy that allows additions while fewer than `child_limit` children exist.
    pub fn limited(child_limit: usize) -> Self {
        Self {
            child_limit: Some(child_limit),
            ..Self::unbounded()
        }
    }

    /// Returns `true` when another child may be added given `current_child_count` existing
    /// children: the policy must be enabled, and the count must be strictly below the limit
    /// if one is set.
    pub fn allows_addition(&self, current_child_count: usize) -> bool {
        if !self.enabled {
            return false;
        }
        match self.child_limit {
            Some(limit) => current_child_count < limit,
            None => true,
        }
    }
}
238
/// Resolved plan for handling one child failure.
///
/// The plan is built outside this file; the field notes below are inferred from names and
/// should be confirmed against the code that constructs it.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct StrategyExecutionPlan {
    /// Child whose failure triggered the plan.
    pub failed_child: ChildId,
    /// Strategy selected for this failure.
    pub strategy: SupervisionStrategy,
    /// Children affected by the plan (presumably the ones to restart).
    pub scope: Vec<ChildId>,
    /// Group whose strategy was selected, if any (presumably).
    pub group: Option<String>,
    /// Restart limit in effect for this plan, if any.
    pub restart_limit: Option<RestartLimit>,
    /// Escalation policy in effect for this plan, if any.
    pub escalation_policy: Option<EscalationPolicy>,
    /// Presumably mirrors `DynamicSupervisorPolicy::enabled` at planning time — confirm.
    pub dynamic_supervisor_enabled: bool,
}
257
258#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize, JsonSchema)]
260#[serde(rename_all = "snake_case")]
261pub enum BackpressureStrategy {
262 AlertAndBlock,
264 SampleAndAudit,
266}
267
268impl Default for BackpressureStrategy {
269 fn default() -> Self {
271 Self::AlertAndBlock
272 }
273}
274
/// Backpressure thresholds and audit settings, loadable through both `confique` and `serde`.
///
/// Each field has two defaults that must agree: the `#[config(default = ...)]` literal
/// (confique) and the `#[serde(default ...)]` helper. Both currently yield `alert_and_block`,
/// 80, 95, 30 and 1024. `validate_backpressure_config` requires
/// `1 <= warn_threshold_pct < critical_threshold_pct <= 100` and a non-zero `window_secs` and
/// `audit_channel_capacity`.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, Config, JsonSchema)]
pub struct BackpressureConfig {
    /// Strategy applied when thresholds are crossed.
    #[config(default = "alert_and_block")]
    #[serde(default)]
    pub strategy: BackpressureStrategy,
    /// Warning threshold in percent (1-100), strictly below the critical threshold.
    /// Presumably a percentage of channel capacity — confirm.
    #[config(default = 80)]
    #[serde(default = "default_warn_threshold")]
    pub warn_threshold_pct: u8,
    /// Critical threshold in percent (1-100).
    #[config(default = 95)]
    #[serde(default = "default_critical_threshold")]
    pub critical_threshold_pct: u8,
    /// Observation window in seconds; must be greater than zero.
    #[config(default = 30)]
    #[serde(default = "default_window_secs")]
    pub window_secs: u64,
    /// Capacity of the audit channel; must be greater than zero.
    #[config(default = 1024)]
    #[serde(default = "default_audit_capacity")]
    pub audit_channel_capacity: usize,
}
299
/// Serde default for `BackpressureConfig::warn_threshold_pct`.
/// Keep in sync with that field's `#[config(default = 80)]`.
fn default_warn_threshold() -> u8 {
    80_u8
}
304
/// Serde default for `BackpressureConfig::critical_threshold_pct`.
/// Keep in sync with that field's `#[config(default = 95)]`.
fn default_critical_threshold() -> u8 {
    95_u8
}
309
/// Serde default for `BackpressureConfig::window_secs`.
/// Keep in sync with that field's `#[config(default = 30)]`.
fn default_window_secs() -> u64 {
    30_u64
}
314
/// Serde default for `BackpressureConfig::audit_channel_capacity`.
/// Keep in sync with that field's `#[config(default = 1024)]`.
fn default_audit_capacity() -> usize {
    1_024_usize
}
319
320impl Default for BackpressureConfig {
321 fn default() -> Self {
323 Self {
324 strategy: BackpressureStrategy::AlertAndBlock,
325 warn_threshold_pct: default_warn_threshold(),
326 critical_threshold_pct: default_critical_threshold(),
327 window_secs: default_window_secs(),
328 audit_channel_capacity: default_audit_capacity(),
329 }
330 }
331}
332
/// Full configuration of one supervisor: its children, the default child policies, strategy
/// overrides, and the capacities and limits it runs with.
///
/// Build one with [`SupervisorSpec::root`] and check it with [`SupervisorSpec::validate`].
#[derive(Debug, Clone)]
pub struct SupervisorSpec {
    /// Position of this supervisor in the supervision tree.
    pub path: SupervisorPath,
    /// Supervisor-level strategy (presumably used when no group strategy or child override
    /// applies; the resolution order lives outside this file).
    pub strategy: SupervisionStrategy,
    /// Supervised children; each is checked with `ChildSpec::validate`.
    pub children: Vec<ChildSpec>,
    /// Configuration version label; must not be blank.
    pub config_version: String,
    /// Default restart policy for children.
    pub default_restart_policy: RestartPolicy,
    /// Default backoff policy for children.
    pub default_backoff_policy: BackoffPolicy,
    /// Default health policy for children.
    pub default_health_policy: HealthPolicy,
    /// Default shutdown policy for children.
    pub default_shutdown_policy: ShutdownPolicy,
    /// Failure limit of the supervisor itself; must be greater than zero.
    pub supervisor_failure_limit: u32,
    /// Optional spec-wide restart limit; when set, both of its fields must be non-zero.
    pub restart_limit: Option<RestartLimit>,
    /// Optional spec-wide escalation policy.
    pub escalation_policy: Option<EscalationPolicy>,
    /// Group strategies; each group is matched against child `tags` and must be unique.
    pub group_strategies: Vec<GroupStrategy>,
    /// Named groups that `ChildSpec::group` may reference.
    pub group_configs: Vec<GroupConfig>,
    /// Dependency edges between groups (not checked by `validate`).
    pub group_dependencies: Vec<GroupDependencyEdge>,
    /// Severity class per task role.
    pub severity_defaults: HashMap<TaskRole, SeverityClass>,
    /// Per-child strategy overrides; each must name an existing child, at most once.
    pub child_strategy_overrides: Vec<ChildStrategyOverride>,
    /// Controls whether more children may be added (see
    /// `DynamicSupervisorPolicy::allows_addition`); `Some(0)` as limit is rejected.
    pub dynamic_supervisor_policy: DynamicSupervisorPolicy,
    /// Control channel capacity; must be greater than zero.
    pub control_channel_capacity: usize,
    /// Event channel capacity; must be greater than zero.
    pub event_channel_capacity: usize,
    /// Backpressure thresholds; checked by `validate_backpressure_config`.
    pub backpressure_config: BackpressureConfig,
    /// Meltdown policy.
    pub meltdown_policy: MeltdownPolicy,
    /// Failure window configuration.
    pub failure_window_config: FailureWindowConfig,
    /// Spec-wide restart budget; its `window`, `max_burst` and `recovery_rate_per_sec` must
    /// all be positive.
    pub restart_budget_config: RestartBudgetConfig,
    /// Pipeline journal capacity; must be greater than zero.
    pub pipeline_journal_capacity: usize,
    /// Pipeline subscriber capacity; must be greater than zero.
    pub pipeline_subscriber_capacity: usize,
    /// Concurrent restart limit; must be greater than zero.
    pub concurrent_restart_limit: u32,
}
389
impl SupervisorSpec {
    /// Builds a root supervisor spec (`SupervisorPath::root()`) supervising `children`.
    ///
    /// Defaults chosen here:
    /// - `OneForOne` strategy, `Transient` default restart policy, `config_version`
    ///   `"unversioned"` and a `supervisor_failure_limit` of 1;
    /// - no spec-wide restart limit or escalation policy, and no groups, dependencies,
    ///   severity defaults or child overrides;
    /// - unbounded dynamic supervision;
    /// - a control channel capacity of `children.len() + 1`, and an event channel capacity of
    ///   twice that (both saturating);
    /// - `BackpressureConfig::default()`, `RestartBudgetConfig::safe_default()`,
    ///   `FailureWindowConfig::time_sliding(60, 5)`, a pipeline journal capacity of 100, a
    ///   pipeline subscriber capacity of 10 and a `concurrent_restart_limit` of 5.
    ///
    /// The result is not validated here; call [`SupervisorSpec::validate`] before use.
    pub fn root(children: Vec<ChildSpec>) -> Self {
        // Computed before `children` is moved into the struct below.
        let channel_capacity = channel_capacity_for_children(children.len());
        Self {
            path: SupervisorPath::root(),
            strategy: SupervisionStrategy::OneForOne,
            children,
            config_version: String::from("unversioned"),
            default_restart_policy: RestartPolicy::Transient,
            // Positional arguments; their meaning is defined by `BackoffPolicy::new`.
            default_backoff_policy: BackoffPolicy::new(
                Duration::from_millis(10),
                Duration::from_secs(1),
                0.0,
            ),
            default_health_policy: HealthPolicy::new(
                Duration::from_secs(1),
                Duration::from_secs(3),
            ),
            default_shutdown_policy: ShutdownPolicy::new(
                Duration::from_secs(5),
                Duration::from_secs(1),
            ),
            supervisor_failure_limit: 1,
            restart_limit: None,
            escalation_policy: None,
            group_strategies: Vec::new(),
            group_configs: Vec::new(),
            group_dependencies: Vec::new(),
            severity_defaults: HashMap::new(),
            child_strategy_overrides: Vec::new(),
            dynamic_supervisor_policy: DynamicSupervisorPolicy::unbounded(),
            control_channel_capacity: channel_capacity,
            // Saturating so an extreme child count cannot overflow.
            event_channel_capacity: channel_capacity.saturating_mul(2),
            backpressure_config: BackpressureConfig::default(),
            // Positional arguments; their meaning is defined by `MeltdownPolicy::new`.
            meltdown_policy: MeltdownPolicy::new(
                3,
                Duration::from_secs(10),
                5,
                Duration::from_secs(30),
                10,
                Duration::from_secs(60),
                Duration::from_secs(120),
            ),
            failure_window_config: FailureWindowConfig::time_sliding(60, 5),
            restart_budget_config: RestartBudgetConfig::safe_default(),
            pipeline_journal_capacity: 100,
            pipeline_subscriber_capacity: 10,
            concurrent_restart_limit: 5,
        }
    }

    /// Validates the whole spec and returns the first problem found.
    ///
    /// Checks run in this order:
    /// - a non-blank `config_version`;
    /// - a non-zero `supervisor_failure_limit`, control channel capacity and event channel
    ///   capacity;
    /// - backpressure settings;
    /// - each child's `ChildSpec::validate`;
    /// - the spec-wide restart limit;
    /// - group strategies;
    /// - child strategy overrides;
    /// - task roles (sidecars);
    /// - the dynamic supervisor policy;
    /// - child group names;
    /// - pipeline and restart-budget settings.
    ///
    /// Task-role semantic conflicts only emit `tracing` warnings; they do not fail validation.
    ///
    /// # Errors
    ///
    /// Returns an error built with `SupervisorError::fatal_config` for violations detected by
    /// this module's checks. Errors from `ChildSpec::validate` are propagated unchanged.
    pub fn validate(&self) -> Result<(), SupervisorError> {
        if self.config_version.trim().is_empty() {
            return Err(SupervisorError::fatal_config(
                "config version must not be empty",
            ));
        }
        if self.supervisor_failure_limit == 0 {
            return Err(SupervisorError::fatal_config(
                "supervisor failure limit must be greater than zero",
            ));
        }
        if self.control_channel_capacity == 0 {
            return Err(SupervisorError::fatal_config(
                "control channel capacity must be greater than zero",
            ));
        }
        if self.event_channel_capacity == 0 {
            return Err(SupervisorError::fatal_config(
                "event channel capacity must be greater than zero",
            ));
        }
        validate_backpressure_config(&self.backpressure_config)?;
        for child in &self.children {
            child.validate()?;
        }
        validate_restart_limit(self.restart_limit)?;
        validate_group_strategies(&self.group_strategies, &self.children)?;
        validate_child_strategy_overrides(self)?;
        validate_task_roles(&self.children)?;
        validate_dynamic_policy(self.dynamic_supervisor_policy)?;
        validate_child_group_names(&self.children, &self.group_configs)?;
        validate_pipeline_policy(self)?;
        Ok(())
    }
}
501
502fn validate_child_group_names(
506 children: &[ChildSpec],
507 group_configs: &[GroupConfig],
508) -> Result<(), SupervisorError> {
509 let group_names: std::collections::HashSet<&str> =
510 group_configs.iter().map(|g| g.name.as_str()).collect();
511
512 for child in children {
513 if let Some(ref group_name) = child.group
514 && !group_names.contains(group_name.as_str())
515 {
516 return Err(SupervisorError::fatal_config(format!(
517 "child '{}' references unknown group '{}'; available groups: {:?}",
518 child.id,
519 group_name,
520 group_names.iter().copied().collect::<Vec<_>>(),
521 )));
522 }
523 }
524 Ok(())
525}
526
527fn validate_restart_limit(limit: Option<RestartLimit>) -> Result<(), SupervisorError> {
537 let Some(limit) = limit else {
538 return Ok(());
539 };
540 if limit.max_restarts == 0 {
541 return Err(SupervisorError::fatal_config(
542 "restart limit max_restarts must be greater than zero",
543 ));
544 }
545 if limit.window.is_zero() {
546 return Err(SupervisorError::fatal_config(
547 "restart limit window must be greater than zero",
548 ));
549 }
550 Ok(())
551}
552
553fn validate_group_strategies(
563 strategies: &[GroupStrategy],
564 children: &[ChildSpec],
565) -> Result<(), SupervisorError> {
566 let mut groups = HashSet::new();
567 for strategy in strategies {
568 if strategy.group.trim().is_empty() {
569 return Err(SupervisorError::fatal_config(
570 "group strategy group must not be empty",
571 ));
572 }
573 if !groups.insert(strategy.group.clone()) {
574 return Err(SupervisorError::fatal_config(format!(
575 "duplicate group strategy: {}",
576 strategy.group
577 )));
578 }
579 validate_restart_limit(strategy.restart_limit)?;
580 }
581 validate_group_membership(strategies, children)?;
582 Ok(())
583}
584
585fn validate_group_membership(
596 strategies: &[GroupStrategy],
597 children: &[ChildSpec],
598) -> Result<(), SupervisorError> {
599 let groups = strategies
600 .iter()
601 .map(|strategy| strategy.group.clone())
602 .collect::<HashSet<_>>();
603 for strategy in strategies {
604 if !children
605 .iter()
606 .any(|child| child.tags.contains(&strategy.group))
607 {
608 return Err(SupervisorError::fatal_config(format!(
609 "group strategy references unused group: {}",
610 strategy.group
611 )));
612 }
613 }
614 for child in children {
615 let configured_group_count = child
616 .tags
617 .iter()
618 .filter(|tag| groups.contains(*tag))
619 .count();
620 if configured_group_count > 1 {
621 return Err(SupervisorError::fatal_config(format!(
622 "child strategy groups are ambiguous for child: {}",
623 child.id
624 )));
625 }
626 }
627 Ok(())
628}
629
630fn validate_child_strategy_overrides(spec: &SupervisorSpec) -> Result<(), SupervisorError> {
640 let child_ids = spec
641 .children
642 .iter()
643 .map(|child| child.id.clone())
644 .collect::<HashSet<_>>();
645 let mut overrides = HashSet::new();
646 for strategy in &spec.child_strategy_overrides {
647 if !child_ids.contains(&strategy.child_id) {
648 return Err(SupervisorError::fatal_config(format!(
649 "child strategy override references unknown child: {}",
650 strategy.child_id
651 )));
652 }
653 if !overrides.insert(strategy.child_id.clone()) {
654 return Err(SupervisorError::fatal_config(format!(
655 "duplicate child strategy override: {}",
656 strategy.child_id
657 )));
658 }
659 validate_restart_limit(strategy.restart_limit)?;
660 }
661 Ok(())
662}
663
664fn validate_task_roles(children: &[ChildSpec]) -> Result<(), SupervisorError> {
674 let child_ids = children
675 .iter()
676 .map(|child| child.id.clone())
677 .collect::<HashSet<_>>();
678
679 for child in children {
680 emit_role_conflict_warnings(child);
681 if child.task_role != Some(TaskRole::Sidecar) {
682 continue;
683 }
684
685 let sidecar_config = child.sidecar_config.as_ref().ok_or_else(|| {
686 SupervisorError::fatal_config(format!(
687 "sidecar child {} requires sidecar_config",
688 child.id
689 ))
690 })?;
691
692 if !child_ids.contains(&sidecar_config.primary_child_id) {
693 return Err(SupervisorError::fatal_config(format!(
694 "sidecar child {} references unknown primary_child_id {}",
695 child.id, sidecar_config.primary_child_id
696 )));
697 }
698
699 let primary_child = children
700 .iter()
701 .find(|candidate| candidate.id == sidecar_config.primary_child_id)
702 .ok_or_else(|| {
703 SupervisorError::fatal_config(format!(
704 "sidecar child {} references unknown primary_child_id {}",
705 child.id, sidecar_config.primary_child_id
706 ))
707 })?;
708
709 if primary_child.task_role == Some(TaskRole::Sidecar) {
710 return Err(SupervisorError::fatal_config(format!(
711 "sidecar child {} must not use another sidecar {} as primary_child_id",
712 child.id, sidecar_config.primary_child_id
713 )));
714 }
715 }
716
717 Ok(())
718}
719
720fn emit_role_conflict_warnings(child: &ChildSpec) {
730 for conflict in semantic_conflicts_for_child(child) {
731 tracing::warn!(
732 child_id = %conflict.child_id,
733 task_role = %conflict.task_role,
734 conflicting_field = %conflict.conflicting_field,
735 user_value = %conflict.user_value,
736 expected_semantic = %conflict.expected_semantic,
737 reason = %conflict.reason,
738 "task role semantic conflict"
739 );
740 }
741}
742
743fn validate_dynamic_policy(policy: DynamicSupervisorPolicy) -> Result<(), SupervisorError> {
753 if policy.child_limit == Some(0) {
754 return Err(SupervisorError::fatal_config(
755 "dynamic supervisor child_limit must be greater than zero",
756 ));
757 }
758 Ok(())
759}
760
761fn validate_pipeline_policy(spec: &SupervisorSpec) -> Result<(), SupervisorError> {
771 if spec.pipeline_journal_capacity == 0 {
772 return Err(SupervisorError::fatal_config(
773 "pipeline journal capacity must be greater than zero",
774 ));
775 }
776 if spec.pipeline_subscriber_capacity == 0 {
777 return Err(SupervisorError::fatal_config(
778 "pipeline subscriber capacity must be greater than zero",
779 ));
780 }
781 if spec.concurrent_restart_limit == 0 {
782 return Err(SupervisorError::fatal_config(
783 "concurrent restart limit must be greater than zero",
784 ));
785 }
786 if spec.restart_budget_config.window.is_zero() {
787 return Err(SupervisorError::fatal_config(
788 "restart budget window must be greater than zero",
789 ));
790 }
791 if spec.restart_budget_config.max_burst == 0 {
792 return Err(SupervisorError::fatal_config(
793 "restart budget max_burst must be greater than zero",
794 ));
795 }
796 if spec.restart_budget_config.recovery_rate_per_sec <= 0.0 {
797 return Err(SupervisorError::fatal_config(
798 "restart budget recovery_rate_per_sec must be greater than zero",
799 ));
800 }
801 Ok(())
802}
803
804fn validate_backpressure_config(config: &BackpressureConfig) -> Result<(), SupervisorError> {
814 if config.warn_threshold_pct == 0 || config.warn_threshold_pct > 100 {
815 return Err(SupervisorError::fatal_config(
816 "backpressure warn_threshold_pct must be between 1 and 100",
817 ));
818 }
819 if config.critical_threshold_pct == 0 || config.critical_threshold_pct > 100 {
820 return Err(SupervisorError::fatal_config(
821 "backpressure critical_threshold_pct must be between 1 and 100",
822 ));
823 }
824 if config.warn_threshold_pct >= config.critical_threshold_pct {
825 return Err(SupervisorError::fatal_config(
826 "backpressure warn_threshold_pct must be less than critical_threshold_pct",
827 ));
828 }
829 if config.window_secs == 0 {
830 return Err(SupervisorError::fatal_config(
831 "backpressure window_secs must be greater than zero",
832 ));
833 }
834 if config.audit_channel_capacity == 0 {
835 return Err(SupervisorError::fatal_config(
836 "backpressure audit_channel_capacity must be greater than zero",
837 ));
838 }
839 Ok(())
840}
841
/// Control-channel capacity for a supervisor with `child_count` children: one slot per child
/// plus one extra, saturating at `usize::MAX`.
fn channel_capacity_for_children(child_count: usize) -> usize {
    1_usize.saturating_add(child_count)
}