1use crate::config::audit::AuditConfig;
7use crate::config::configurable::{
8 DashboardIpcConfig, ObservabilityConfig, PolicyConfig, ShutdownConfig, SupervisorConfig,
9 SupervisorRootConfig,
10};
11use crate::config::ipc_security::IpcSecurityConfig;
12use crate::config::policy::{
13 ChildStrategyOverrideConfig, GroupConfig, GroupDependencyConfig, GroupStrategyConfig,
14 SeverityDefaultConfig,
15};
16use crate::spec::child::ChildSpec;
17use crate::spec::child_declaration::{ChildDeclaration, CompensatingRecord, PendingChild, Phase};
18use crate::spec::supervisor::BackpressureConfig;
19use serde::{Deserialize, Serialize};
20use std::collections::{HashMap, HashSet};
21use std::time::Duration;
22use uuid::Uuid;
23
/// Validated, serializable supervisor configuration plus `add_child` transaction bookkeeping.
///
/// Normally produced from a [`SupervisorConfig`] through `TryFrom`, which validates every
/// section before building the state.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ConfigState {
    /// Root supervisor settings (strategy, escalation policy, dynamic supervisor policy).
    pub supervisor: SupervisorRootConfig,
    /// Restart, backoff, health, failure-window, meltdown and supervision-pipeline policy.
    pub policy: PolicyConfig,
    /// Graceful-shutdown timeout and abort wait, in milliseconds.
    pub shutdown: ShutdownConfig,
    /// Observability settings; `event_journal_capacity` is also used for the spec's channel
    /// capacities in `to_supervisor_spec`.
    pub observability: ObservabilityConfig,
    /// Audit backend (`memory` or `file`) and failure strategy.
    pub audit: AuditConfig,
    /// Backpressure thresholds, window and audit channel capacity.
    pub backpressure: BackpressureConfig,
    /// Child groups; every member must name a declared child.
    pub groups: Vec<GroupConfig>,
    /// Per-group strategy settings; each must reference a declared group.
    pub group_strategies: Vec<GroupStrategyConfig>,
    /// Dependencies between declared groups.
    pub group_dependencies: Vec<GroupDependencyConfig>,
    /// Per-child strategy overrides; each must reference a declared child.
    pub child_strategy_overrides: Vec<ChildStrategyOverrideConfig>,
    /// Default severity per task role (at most one entry per role).
    pub severity_defaults: Vec<SeverityDefaultConfig>,
    /// Optional dashboard IPC config. When enabled, `TryFrom` fills in a default security
    /// config if none was given.
    pub dashboard: Option<DashboardIpcConfig>,
    /// Child specs converted from the validated declarations; empty if absent when
    /// deserializing.
    #[serde(default)]
    pub children: Vec<ChildSpec>,
    /// Empty after `TryFrom`; set to `sha256-<transaction id>` by `commit_transaction`.
    /// NOTE(review): this is not a digest of the children — confirm that is intended.
    #[serde(default)]
    pub spec_hash: String,
    /// `add_child` transactions that have been begun but not yet committed or rolled back.
    #[serde(default)]
    pub pending_additions: Vec<PendingChild>,
    /// Records appended by `rollback_transaction` for compensated `add_child` transactions.
    #[serde(default)]
    pub compensating_records: Vec<CompensatingRecord>,
}
64
65impl PartialEq for ConfigState {
68 fn eq(&self, other: &Self) -> bool {
70 self.supervisor == other.supervisor
71 && self.policy == other.policy
72 && self.shutdown == other.shutdown
73 && self.observability == other.observability
74 && self.audit == other.audit
75 && self.backpressure == other.backpressure
76 && self.groups == other.groups
77 && self.group_strategies == other.group_strategies
78 && self.group_dependencies == other.group_dependencies
79 && self.child_strategy_overrides == other.child_strategy_overrides
80 && self.severity_defaults == other.severity_defaults
81 && self.dashboard == other.dashboard
82 && self.spec_hash == other.spec_hash
83 && self.pending_additions == other.pending_additions
84 }
85}
86
87impl TryFrom<SupervisorConfig> for ConfigState {
88 type Error = crate::error::types::SupervisorError;
89
90 fn try_from(config: SupervisorConfig) -> Result<Self, Self::Error> {
92 validate_policy(&config.policy)?;
93 validate_shutdown(&config.shutdown)?;
94 validate_observability(&config.observability)?;
95 validate_audit(&config.audit)?;
96 validate_backpressure(&config.backpressure)?;
97 validate_lower_policy(&config.policy)?;
98 validate_supervisor_root(&config.supervisor)?;
99 validate_group_inputs(
100 &config.groups,
101 &config.group_strategies,
102 &config.group_dependencies,
103 &config.child_strategy_overrides,
104 &config.severity_defaults,
105 &config.children,
106 )?;
107 let dashboard = dashboard_with_default_security(config.dashboard);
108 validate_dashboard(dashboard.as_ref())?;
109
110 use crate::spec::child_declaration::validate_child_declaration;
112 use crate::tree::order::kahn_sort;
113
114 let all_names: HashSet<String> = config.children.iter().map(|c| c.name.clone()).collect();
116
117 for child in &config.children {
119 validate_child_declaration(child, &all_names).map_err(|e| {
120 crate::error::types::SupervisorError::fatal_config(format!(
121 "Child declaration validation failed at {}: {}",
122 e.field_path, e.reason
123 ))
124 })?;
125 }
126
127 let child_specs: Vec<ChildSpec> = config
129 .children
130 .into_iter()
131 .map(ChildSpec::try_from)
132 .collect::<Result<Vec<_>, _>>()
133 .map_err(|e| {
134 crate::error::types::SupervisorError::fatal_config(format!(
135 "Child declaration conversion failed at {}: {}",
136 e.field_path, e.reason
137 ))
138 })?;
139
140 let _sorted = kahn_sort(&child_specs).map_err(|cycle_nodes| {
142 let node_names: Vec<String> = cycle_nodes.iter().map(|id| id.value.clone()).collect();
143 crate::error::types::SupervisorError::fatal_config(format!(
144 "Dependency cycle detected among children: {:?}",
145 node_names
146 ))
147 })?;
148
149 let spec_hash = String::new(); Ok(Self {
152 supervisor: config.supervisor,
153 policy: config.policy,
154 shutdown: config.shutdown,
155 observability: config.observability,
156 audit: config.audit,
157 backpressure: config.backpressure,
158 groups: config.groups,
159 group_strategies: config.group_strategies,
160 group_dependencies: config.group_dependencies,
161 child_strategy_overrides: config.child_strategy_overrides,
162 severity_defaults: config.severity_defaults,
163 dashboard,
164 children: child_specs,
165 spec_hash,
166 pending_additions: Vec::new(),
167 compensating_records: Vec::new(),
168 })
169 }
170}
171
172impl ConfigState {
173 pub fn begin_transaction(
200 &mut self,
201 declaration: ChildDeclaration,
202 ) -> Result<Uuid, crate::error::types::SupervisorError> {
203 if self.has_pending_transaction() {
204 return Err(crate::error::types::SupervisorError::fatal_config(
205 "add_child transaction already in progress",
206 ));
207 }
208 let transaction_id = Uuid::new_v4();
209 let child_spec = Box::new(ChildSpec::try_from(declaration.clone()).map_err(|e| {
210 crate::error::types::SupervisorError::fatal_config(format!(
211 "Child declaration conversion failed: {}",
212 e.reason
213 ))
214 })?);
215 let pending = PendingChild {
216 transaction_id,
217 declaration,
218 child_spec,
219 phase: Phase::Parsed,
220 created_at_unix_nanos: std::time::SystemTime::now()
221 .duration_since(std::time::UNIX_EPOCH)
222 .unwrap_or_default()
223 .as_nanos(),
224 };
225 self.pending_additions.push(pending);
226 Ok(transaction_id)
227 }
228
229 pub fn commit_transaction(
235 &mut self,
236 transaction_id: Uuid,
237 ) -> Result<(), crate::error::types::SupervisorError> {
238 let idx = self
239 .pending_additions
240 .iter()
241 .position(|p| p.transaction_id == transaction_id)
242 .ok_or_else(|| {
243 crate::error::types::SupervisorError::fatal_config(
244 "transaction not found for commit",
245 )
246 })?;
247
248 let mut pending = self.pending_additions.remove(idx);
249 pending.phase = Phase::Committed;
250
251 let spec = (*pending.child_spec).clone();
253 self.children.push(spec);
254
255 self.spec_hash = format!("sha256-{}", transaction_id);
257
258 Ok(())
259 }
260
261 pub fn rollback_transaction(
268 &mut self,
269 transaction_id: Uuid,
270 error: String,
271 ) -> Result<(), crate::error::types::SupervisorError> {
272 let idx = self
273 .pending_additions
274 .iter()
275 .position(|p| p.transaction_id == transaction_id);
276
277 let pending = if let Some(i) = idx {
278 self.pending_additions.remove(i)
279 } else {
280 return Err(crate::error::types::SupervisorError::fatal_config(
281 "transaction not found for rollback",
282 ));
283 };
284
285 let record = CompensatingRecord {
287 transaction_id,
288 operation: "add_child".to_string(),
289 state: "compensated".to_string(),
290 child_name: pending.declaration.name.clone(),
291 declaration_hash: format!("sha256-{}", transaction_id),
292 error: Some(error),
293 correlation_id: None,
294 child_id: Some(pending.child_spec.id.value.clone()),
295 created_at_unix_nanos: std::time::SystemTime::now()
296 .duration_since(std::time::UNIX_EPOCH)
297 .unwrap_or_default()
298 .as_nanos(),
299 };
300 self.compensating_records.push(record);
301
302 Ok(())
303 }
304
305 pub fn has_pending_transaction(&self) -> bool {
307 self.pending_additions
308 .iter()
309 .any(|p| p.phase != Phase::Committed && p.phase != Phase::Compensated)
310 }
311
312 pub fn hash(&self) -> &str {
314 &self.spec_hash
315 }
316
317 pub fn recover_pending_transactions(&mut self) {
323 let mut recovered = Vec::new();
324 for record in self.compensating_records.iter_mut() {
325 if record.state == "pending" {
326 record.state = "compensated".to_string();
329 recovered.push(record.transaction_id);
330 }
331 }
332 if !recovered.is_empty() {
333 #[cfg(debug_assertions)]
335 eprintln!("Recovered {} pending transactions", recovered.len());
336 }
337 }
338
339 pub fn to_supervisor_spec(
367 &self,
368 ) -> Result<crate::spec::supervisor::SupervisorSpec, crate::error::types::SupervisorError> {
369 let mut spec = crate::spec::supervisor::SupervisorSpec::root(self.children.clone());
370 spec.strategy = self.supervisor.strategy;
371 spec.config_version = self.config_version();
372 spec.supervisor_failure_limit = self.policy.supervisor_failure_limit;
373 spec.escalation_policy = self.supervisor.escalation_policy;
374 spec.control_channel_capacity = self.observability.event_journal_capacity;
375 spec.event_channel_capacity = self.observability.event_journal_capacity;
376 spec.backpressure_config = self.backpressure.clone();
377 spec.group_configs = self
378 .groups
379 .iter()
380 .map(GroupConfig::to_runtime)
381 .collect::<Vec<_>>();
382 spec.group_strategies = self
383 .group_strategies
384 .iter()
385 .map(GroupStrategyConfig::to_runtime)
386 .collect::<Vec<_>>();
387 spec.group_dependencies = self
388 .group_dependencies
389 .iter()
390 .map(GroupDependencyConfig::to_runtime)
391 .collect::<Vec<_>>();
392 spec.child_strategy_overrides = self
393 .child_strategy_overrides
394 .iter()
395 .map(ChildStrategyOverrideConfig::to_runtime)
396 .collect::<Vec<_>>();
397 spec.severity_defaults = self
398 .severity_defaults
399 .iter()
400 .map(|default| (default.task_role, default.severity))
401 .collect::<HashMap<_, _>>();
402 spec.dynamic_supervisor_policy = self.supervisor.dynamic_supervisor.to_runtime();
403 spec.meltdown_policy = self.policy.meltdown.to_runtime();
404 spec.failure_window_config = self.policy.failure_window.to_runtime();
405 spec.restart_budget_config = self.policy.restart_budget.to_runtime();
406 spec.pipeline_journal_capacity = self.policy.supervision_pipeline.journal_capacity;
407 spec.pipeline_subscriber_capacity = self.policy.supervision_pipeline.subscriber_capacity;
408 spec.concurrent_restart_limit = self.policy.supervision_pipeline.concurrent_restart_limit;
409 spec.default_backoff_policy = crate::spec::child::BackoffPolicy::new(
410 Duration::from_millis(self.policy.initial_backoff_ms),
411 Duration::from_millis(self.policy.max_backoff_ms),
412 self.policy.jitter_ratio,
413 );
414 spec.default_health_policy = crate::spec::child::HealthPolicy::new(
415 Duration::from_millis(self.policy.heartbeat_interval_ms),
416 Duration::from_millis(self.policy.stale_after_ms),
417 );
418 spec.default_shutdown_policy = crate::spec::child::ShutdownPolicy::new(
419 Duration::from_millis(self.shutdown.graceful_timeout_ms),
420 Duration::from_millis(self.shutdown.abort_wait_ms),
421 );
422 spec.restart_limit = Some(crate::spec::supervisor::RestartLimit::new(
423 self.policy.child_restart_limit,
424 Duration::from_millis(self.policy.child_restart_window_ms),
425 ));
426 spec.validate()?;
427 Ok(spec)
428 }
429
430 fn config_version(&self) -> String {
440 format!(
441 "supervisor-{:?}-policy-{}-{}-shutdown-{}-observe-{}-backpressure-{:?}-{}-{}",
442 self.supervisor.strategy,
443 self.policy.child_restart_limit,
444 self.policy.supervisor_failure_limit,
445 self.shutdown.graceful_timeout_ms,
446 self.observability.event_journal_capacity,
447 self.backpressure.strategy,
448 self.backpressure.warn_threshold_pct,
449 self.backpressure.critical_threshold_pct
450 )
451 }
452}
453
454fn validate_policy(policy: &PolicyConfig) -> Result<(), crate::error::types::SupervisorError> {
464 validate_positive(policy.child_restart_limit, "policy.child_restart_limit")?;
465 validate_positive(
466 policy.supervisor_failure_limit,
467 "policy.supervisor_failure_limit",
468 )?;
469 validate_positive(
470 policy.child_restart_window_ms,
471 "policy.child_restart_window_ms",
472 )?;
473 validate_positive(
474 policy.supervisor_failure_window_ms,
475 "policy.supervisor_failure_window_ms",
476 )?;
477 validate_positive(policy.initial_backoff_ms, "policy.initial_backoff_ms")?;
478 validate_positive(policy.max_backoff_ms, "policy.max_backoff_ms")?;
479 validate_positive(policy.heartbeat_interval_ms, "policy.heartbeat_interval_ms")?;
480 validate_positive(policy.stale_after_ms, "policy.stale_after_ms")?;
481 if policy.initial_backoff_ms > policy.max_backoff_ms {
482 return Err(crate::error::types::SupervisorError::fatal_config(
483 "policy.initial_backoff_ms must be less than or equal to policy.max_backoff_ms",
484 ));
485 }
486 if !(0.0..=1.0).contains(&policy.jitter_ratio) {
487 return Err(crate::error::types::SupervisorError::fatal_config(
488 "policy.jitter_ratio must be between 0 and 1",
489 ));
490 }
491 Ok(())
492}
493
494fn validate_lower_policy(
504 policy: &PolicyConfig,
505) -> Result<(), crate::error::types::SupervisorError> {
506 validate_positive(
507 policy.restart_budget.window_secs,
508 "policy.restart_budget.window_secs",
509 )?;
510 validate_positive(
511 policy.restart_budget.max_burst as u64,
512 "policy.restart_budget.max_burst",
513 )?;
514 if !(0.0..=1000.0).contains(&policy.restart_budget.recovery_rate_per_sec)
515 || policy.restart_budget.recovery_rate_per_sec == 0.0
516 {
517 return Err(crate::error::types::SupervisorError::fatal_config(
518 "policy.restart_budget.recovery_rate_per_sec must be greater than 0 and less than or equal to 1000",
519 ));
520 }
521
522 match policy.failure_window.mode {
523 crate::config::policy::FailureWindowMode::TimeSliding => {
524 validate_positive(
525 policy.failure_window.window_secs,
526 "policy.failure_window.window_secs",
527 )?;
528 }
529 crate::config::policy::FailureWindowMode::CountSliding => {
530 validate_positive(
531 policy.failure_window.max_count as u64,
532 "policy.failure_window.max_count",
533 )?;
534 }
535 }
536 validate_positive(
537 policy.failure_window.threshold as u64,
538 "policy.failure_window.threshold",
539 )?;
540
541 validate_positive(
542 policy.meltdown.child_max_restarts as u64,
543 "policy.meltdown.child_max_restarts",
544 )?;
545 validate_positive(
546 policy.meltdown.child_window_secs,
547 "policy.meltdown.child_window_secs",
548 )?;
549 validate_positive(
550 policy.meltdown.group_max_failures as u64,
551 "policy.meltdown.group_max_failures",
552 )?;
553 validate_positive(
554 policy.meltdown.group_window_secs,
555 "policy.meltdown.group_window_secs",
556 )?;
557 validate_positive(
558 policy.meltdown.supervisor_max_failures as u64,
559 "policy.meltdown.supervisor_max_failures",
560 )?;
561 validate_positive(
562 policy.meltdown.supervisor_window_secs,
563 "policy.meltdown.supervisor_window_secs",
564 )?;
565 validate_positive(
566 policy.meltdown.reset_after_secs,
567 "policy.meltdown.reset_after_secs",
568 )?;
569 validate_positive(
570 policy.supervision_pipeline.journal_capacity as u64,
571 "policy.supervision_pipeline.journal_capacity",
572 )?;
573 validate_positive(
574 policy.supervision_pipeline.subscriber_capacity as u64,
575 "policy.supervision_pipeline.subscriber_capacity",
576 )?;
577 validate_positive(
578 policy.supervision_pipeline.concurrent_restart_limit as u64,
579 "policy.supervision_pipeline.concurrent_restart_limit",
580 )
581}
582
583fn validate_supervisor_root(
593 supervisor: &SupervisorRootConfig,
594) -> Result<(), crate::error::types::SupervisorError> {
595 if supervisor.dynamic_supervisor.child_limit == Some(0) {
596 return Err(crate::error::types::SupervisorError::fatal_config(
597 "supervisor.dynamic_supervisor.child_limit must be greater than zero",
598 ));
599 }
600 Ok(())
601}
602
603fn validate_group_inputs(
618 groups: &[GroupConfig],
619 group_strategies: &[GroupStrategyConfig],
620 group_dependencies: &[GroupDependencyConfig],
621 child_strategy_overrides: &[ChildStrategyOverrideConfig],
622 severity_defaults: &[SeverityDefaultConfig],
623 children: &[ChildDeclaration],
624) -> Result<(), crate::error::types::SupervisorError> {
625 let child_names = children
626 .iter()
627 .map(|child| child.name.as_str())
628 .collect::<HashSet<_>>();
629 let mut group_names = HashSet::new();
630 for group in groups {
631 if group.name.trim().is_empty() {
632 return Err(crate::error::types::SupervisorError::fatal_config(
633 "groups[].name must not be empty",
634 ));
635 }
636 if !group_names.insert(group.name.as_str()) {
637 return Err(crate::error::types::SupervisorError::fatal_config(format!(
638 "duplicate group name '{}'",
639 group.name
640 )));
641 }
642 for child in &group.children {
643 if !child_names.contains(child.as_str()) {
644 return Err(crate::error::types::SupervisorError::fatal_config(format!(
645 "group '{}' references unknown child '{}'",
646 group.name, child
647 )));
648 }
649 }
650 }
651
652 for strategy in group_strategies {
653 if !group_names.contains(strategy.group.as_str()) {
654 return Err(crate::error::types::SupervisorError::fatal_config(format!(
655 "group_strategies references unknown group '{}'",
656 strategy.group
657 )));
658 }
659 validate_restart_limit_input(
660 strategy.restart_limit.as_ref(),
661 "group_strategies.restart_limit",
662 )?;
663 }
664
665 for dependency in group_dependencies {
666 if !group_names.contains(dependency.from_group.as_str()) {
667 return Err(crate::error::types::SupervisorError::fatal_config(format!(
668 "group_dependencies references unknown from_group '{}'",
669 dependency.from_group
670 )));
671 }
672 if !group_names.contains(dependency.to_group.as_str()) {
673 return Err(crate::error::types::SupervisorError::fatal_config(format!(
674 "group_dependencies references unknown to_group '{}'",
675 dependency.to_group
676 )));
677 }
678 }
679
680 for child_override in child_strategy_overrides {
681 if !child_names.contains(child_override.child_id.as_str()) {
682 return Err(crate::error::types::SupervisorError::fatal_config(format!(
683 "child_strategy_overrides references unknown child '{}'",
684 child_override.child_id
685 )));
686 }
687 validate_restart_limit_input(
688 child_override.restart_limit.as_ref(),
689 "child_strategy_overrides.restart_limit",
690 )?;
691 }
692
693 let mut roles = HashSet::new();
694 for default in severity_defaults {
695 if !roles.insert(default.task_role) {
696 return Err(crate::error::types::SupervisorError::fatal_config(format!(
697 "duplicate severity default for task role '{}'",
698 default.task_role
699 )));
700 }
701 }
702 Ok(())
703}
704
705fn validate_restart_limit_input(
716 limit: Option<&crate::config::policy::RestartLimitConfig>,
717 path: &str,
718) -> Result<(), crate::error::types::SupervisorError> {
719 let Some(limit) = limit else {
720 return Ok(());
721 };
722 validate_positive(limit.max_restarts as u64, &format!("{path}.max_restarts"))?;
723 validate_positive(limit.window_ms, &format!("{path}.window_ms"))
724}
725
726fn validate_shutdown(
736 shutdown: &ShutdownConfig,
737) -> Result<(), crate::error::types::SupervisorError> {
738 validate_positive(shutdown.graceful_timeout_ms, "shutdown.graceful_timeout_ms")?;
739 validate_positive(shutdown.abort_wait_ms, "shutdown.abort_wait_ms")
740}
741
742fn validate_observability(
752 observability: &ObservabilityConfig,
753) -> Result<(), crate::error::types::SupervisorError> {
754 validate_positive(
755 observability.event_journal_capacity as u64,
756 "observability.event_journal_capacity",
757 )
758}
759
760fn validate_audit(audit: &AuditConfig) -> Result<(), crate::error::types::SupervisorError> {
770 match audit.backend.as_str() {
771 "memory" => {}
772 "file" => {
773 let path = audit.file_path.as_deref().unwrap_or_default().trim();
774 if path.is_empty() {
775 return Err(crate::error::types::SupervisorError::fatal_config(
776 "audit.file_path is required when audit.backend is file",
777 ));
778 }
779 }
780 backend => {
781 return Err(crate::error::types::SupervisorError::fatal_config(format!(
782 "audit.backend must be memory or file, got {backend}"
783 )));
784 }
785 }
786
787 match audit.failure_strategy.as_str() {
788 "fail_closed" | "defer_bounded" => {}
789 strategy => {
790 return Err(crate::error::types::SupervisorError::fatal_config(format!(
791 "audit.failure_strategy must be fail_closed or defer_bounded, got {strategy}"
792 )));
793 }
794 }
795
796 validate_positive(audit.max_defer_queue as u64, "audit.max_defer_queue")
797}
798
799fn validate_backpressure(
809 backpressure: &BackpressureConfig,
810) -> Result<(), crate::error::types::SupervisorError> {
811 if backpressure.warn_threshold_pct == 0 || backpressure.warn_threshold_pct > 100 {
812 return Err(crate::error::types::SupervisorError::fatal_config(
813 "backpressure.warn_threshold_pct must be between 1 and 100",
814 ));
815 }
816 if backpressure.critical_threshold_pct == 0 || backpressure.critical_threshold_pct > 100 {
817 return Err(crate::error::types::SupervisorError::fatal_config(
818 "backpressure.critical_threshold_pct must be between 1 and 100",
819 ));
820 }
821 if backpressure.warn_threshold_pct >= backpressure.critical_threshold_pct {
822 return Err(crate::error::types::SupervisorError::fatal_config(
823 "backpressure.warn_threshold_pct must be less than backpressure.critical_threshold_pct",
824 ));
825 }
826 validate_positive(backpressure.window_secs, "backpressure.window_secs")?;
827 validate_positive(
828 backpressure.audit_channel_capacity as u64,
829 "backpressure.audit_channel_capacity",
830 )
831}
832
833fn dashboard_with_default_security(
844 mut dashboard: Option<DashboardIpcConfig>,
845) -> Option<DashboardIpcConfig> {
846 let Some(dashboard_config) = dashboard.as_mut() else {
847 return dashboard;
848 };
849 if !dashboard_config.enabled {
850 return dashboard;
851 }
852
853 dashboard_config
854 .security_config
855 .get_or_insert_with(IpcSecurityConfig::default);
856 dashboard
857}
858
859fn validate_dashboard(
869 dashboard: Option<&DashboardIpcConfig>,
870) -> Result<(), crate::error::types::SupervisorError> {
871 crate::dashboard::config::validate_dashboard_ipc_config(dashboard)
872 .map(|_| ())
873 .map_err(|error| crate::error::types::SupervisorError::fatal_config(error.to_string()))
874}
875
876fn validate_positive(
887 value: impl Into<u64>,
888 name: &str,
889) -> Result<(), crate::error::types::SupervisorError> {
890 if value.into() == 0 {
891 Err(crate::error::types::SupervisorError::fatal_config(format!(
892 "{name} must be greater than zero"
893 )))
894 } else {
895 Ok(())
896 }
897}