use std::sync::Arc;
use std::sync::atomic::{AtomicU8, Ordering};
use crate::error::MobError;
use crate::runtime::MobState;
/// Requests that can be submitted to the mob orchestrator authority.
///
/// The per-variant notes summarise the phases and guards that
/// `MobOrchestratorAuthority::evaluate` applies to each request.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub(crate) enum MobOrchestratorInput {
    /// Accepted only in `Creating`; activates the supervisor and lands in `Running`.
    InitializeOrchestrator,
    /// Accepted in `Running`, `Stopped` or `Completed` while no coordinator is
    /// bound; lands in `Running`.
    BindCoordinator,
    /// Accepted in `Running`, `Stopped` or `Completed` while a coordinator is
    /// bound and no spawns are pending; lands in `Stopped`.
    #[cfg_attr(test, allow(dead_code))]
    #[cfg_attr(
        not(test),
        expect(
            dead_code,
            reason = "schema-aligned authority input retained even when current shell paths do not construct it"
        )
    )]
    UnbindCoordinator,
    /// Accepted in `Running` with a bound coordinator; adds one pending spawn.
    StageSpawn,
    /// Accepted in `Running` or `Stopped` while a spawn is pending; removes one
    /// pending spawn and lands in `Running`.
    CompleteSpawn,
    /// Accepted in `Running` or `Completed` with a bound coordinator; adds one
    /// active flow and lands in `Running`.
    StartFlow,
    /// Accepted in `Running` or `Completed` while a flow is active; removes one
    /// active flow and lands in `Running`.
    CompleteFlow,
    /// Accepted in `Running` or `Completed` with no active flows; deactivates
    /// the supervisor, drops the coordinator binding and lands in `Stopped`.
    StopOrchestrator,
    /// Accepted only in `Stopped`; re-activates the supervisor (binding the
    /// coordinator if it is unbound) and lands in `Running`.
    ResumeOrchestrator,
    /// Accepted in `Running` or `Stopped` with no active flows and no pending
    /// spawns; lands in `Completed`.
    MarkCompleted,
    /// Accepted in `Stopped` or `Completed` with no pending spawns and no
    /// active flows; lands in `Destroyed`.
    DestroyOrchestrator,
    /// Accepted in `Running` with a bound coordinator; the phase stays `Running`.
    #[cfg_attr(test, allow(dead_code))]
    #[cfg_attr(
        not(test),
        expect(
            dead_code,
            reason = "schema-aligned authority input retained even when current shell paths do not construct it"
        )
    )]
    ForceCancelMember,
}
/// Effects reported by an accepted orchestrator transition.
///
/// The authority only lists these in the transition it returns; acting on
/// them is left to whoever consumes the transition.
#[derive(Clone, Debug, Eq, PartialEq)]
pub(crate) enum MobOrchestratorEffect {
    /// Reported by `InitializeOrchestrator` and `ResumeOrchestrator`.
    ActivateSupervisor,
    /// Reported by `StopOrchestrator` and `DestroyOrchestrator`.
    DeactivateSupervisor,
    /// Reported by `StartFlow`.
    FlowActivated,
    /// Reported by `CompleteFlow`.
    FlowDeactivated,
    /// General notice reported by most accepted inputs (not by
    /// `InitializeOrchestrator`; by stop/resume only when they change the
    /// coordinator binding).
    EmitOrchestratorNotice,
    /// Reported by `ForceCancelMember`.
    MemberForceCancelled,
}
/// Outcome of an input that the authority accepted and applied.
#[derive(Debug)]
#[cfg_attr(test, allow(dead_code))]
#[cfg_attr(
    not(test),
    expect(
        dead_code,
        reason = "transition snapshot retained for schema-aligned authority surface and tests"
    )
)]
pub(crate) struct MobOrchestratorTransition {
    /// Phase the authority is in after the input was applied.
    pub next_phase: MobState,
    /// Copy of the authority's bookkeeping fields after the input was applied.
    pub snapshot: MobOrchestratorSnapshot,
    /// Effects reported for this input, in the order they were recorded.
    pub effects: Vec<MobOrchestratorEffect>,
}
/// Read-only copy of the orchestrator's bookkeeping fields.
///
/// The default value is "nothing bound, nothing pending, nothing active".
#[derive(Clone, Debug, Default, Eq, PartialEq)]
pub struct MobOrchestratorSnapshot {
    /// Whether a coordinator is currently bound.
    pub coordinator_bound: bool,
    /// Spawns that have been staged but not yet completed.
    pub pending_spawn_count: u32,
    /// Flows that have been started but not yet completed.
    pub active_flow_count: u32,
    /// Saturating counter bumped by bind/unbind, by spawn staging/completion,
    /// and by stop/resume when they change the coordinator binding.
    pub topology_revision: u32,
    /// Whether the supervisor is currently marked active.
    pub supervisor_active: bool,
}
/// Private working copy of the orchestrator's bookkeeping.
///
/// Holds the same five values that `MobOrchestratorSnapshot` exposes; the
/// authority clones and edits this struct while evaluating an input.
#[derive(Clone, Debug, Default)]
struct MobOrchestratorFields {
    /// Whether a coordinator is currently bound.
    coordinator_bound: bool,
    /// Spawns that have been staged but not yet completed.
    pending_spawn_count: u32,
    /// Flows that have been started but not yet completed.
    active_flow_count: u32,
    /// Saturating topology revision counter.
    topology_revision: u32,
    /// Whether the supervisor is currently marked active.
    supervisor_active: bool,
}
impl MobOrchestratorFields {
    /// Copies every bookkeeping field into a `MobOrchestratorSnapshot`.
    fn to_snapshot(&self) -> MobOrchestratorSnapshot {
        // Exhaustive destructuring: adding a field to this struct forces this
        // function to be revisited. All fields are `Copy`, so nothing is moved.
        let Self {
            coordinator_bound,
            pending_spawn_count,
            active_flow_count,
            topology_revision,
            supervisor_active,
        } = *self;

        MobOrchestratorSnapshot {
            coordinator_bound,
            pending_spawn_count,
            active_flow_count,
            topology_revision,
            supervisor_active,
        }
    }
}
/// Private home of the `Sealed` marker trait.
///
/// The module itself is not exported, so only code that can name
/// `sealed::Sealed` is able to implement traits that require it.
mod sealed {
    /// Marker supertrait with no methods.
    pub trait Sealed {}
}
/// Mutating entry point of the orchestrator state machine.
///
/// The `sealed::Sealed` supertrait lives in a private module, which restricts
/// implementations to code that can name that module.
pub(crate) trait MobOrchestratorMutator: sealed::Sealed {
    /// Applies `input` to the implementor.
    ///
    /// Returns the resulting transition when the input is accepted, or a
    /// `MobError` when it is rejected.
    fn apply(
        &mut self,
        input: MobOrchestratorInput,
    ) -> Result<MobOrchestratorTransition, MobError>;
}
/// Owner of the orchestrator's phase and bookkeeping fields.
///
/// Inputs are validated by `evaluate` and committed by
/// `MobOrchestratorMutator::apply`; every committed phase is also written to
/// the shared `observable` cell.
pub(crate) struct MobOrchestratorAuthority {
/// Current lifecycle phase.
phase: MobState,
/// Current bookkeeping values (binding, counters, supervisor flag).
fields: MobOrchestratorFields,
/// Shared cell holding the current phase as `phase as u8`, stored with
/// `Ordering::Release`.
observable: Arc<AtomicU8>,
}
// Marker impl required by the `sealed::Sealed` supertrait bound on `MobOrchestratorMutator`.
impl sealed::Sealed for MobOrchestratorAuthority {}
impl MobOrchestratorAuthority {
pub(crate) fn new(observable: Arc<AtomicU8>) -> Self {
observable.store(MobState::Creating as u8, Ordering::Release);
Self {
phase: MobState::Creating,
fields: MobOrchestratorFields::default(),
observable,
}
}
pub(crate) fn with_phase(observable: Arc<AtomicU8>, phase: MobState) -> Self {
observable.store(phase as u8, Ordering::Release);
Self {
phase,
fields: MobOrchestratorFields::default(),
observable,
}
}
#[cfg_attr(
not(test),
expect(
dead_code,
reason = "authority inspection helper retained for schema-aligned orchestrator introspection"
)
)]
#[cfg_attr(test, allow(dead_code))]
pub(crate) fn phase(&self) -> MobState {
self.phase
}
pub(crate) fn snapshot(&self) -> MobOrchestratorSnapshot {
self.fields.to_snapshot()
}
pub(crate) fn can_accept(&self, input: MobOrchestratorInput) -> bool {
self.evaluate(input).is_ok()
}
fn evaluate(
&self,
input: MobOrchestratorInput,
) -> Result<(MobState, MobOrchestratorFields, Vec<MobOrchestratorEffect>), MobError> {
use MobOrchestratorInput::{
BindCoordinator, CompleteFlow, CompleteSpawn, DestroyOrchestrator, ForceCancelMember,
InitializeOrchestrator, MarkCompleted, ResumeOrchestrator, StageSpawn, StartFlow,
StopOrchestrator, UnbindCoordinator,
};
use MobState::{Completed, Creating, Destroyed, Running, Stopped};
let phase = self.phase;
let mut fields = self.fields.clone();
let mut effects = Vec::new();
let next_phase = match (phase, input) {
(Creating, InitializeOrchestrator) => {
fields.supervisor_active = true;
effects.push(MobOrchestratorEffect::ActivateSupervisor);
Running
}
(Running | Stopped | Completed, BindCoordinator) => {
if fields.coordinator_bound {
return Err(MobError::Internal(
"guard failed: coordinator is already bound".into(),
));
}
fields.coordinator_bound = true;
fields.topology_revision = fields.topology_revision.saturating_add(1);
effects.push(MobOrchestratorEffect::EmitOrchestratorNotice);
Running
}
(Running | Stopped | Completed, UnbindCoordinator) => {
if !fields.coordinator_bound {
return Err(MobError::Internal(
"guard failed: coordinator is not bound".into(),
));
}
if fields.pending_spawn_count != 0 {
return Err(MobError::Internal(
"guard failed: pending spawns exist".into(),
));
}
fields.coordinator_bound = false;
fields.topology_revision = fields.topology_revision.saturating_add(1);
effects.push(MobOrchestratorEffect::EmitOrchestratorNotice);
Stopped
}
(Running, StageSpawn) => {
if !fields.coordinator_bound {
return Err(MobError::Internal(
"guard failed: coordinator is not bound (stage spawn requires bound coordinator)".into(),
));
}
fields.pending_spawn_count = fields.pending_spawn_count.saturating_add(1);
fields.topology_revision = fields.topology_revision.saturating_add(1);
effects.push(MobOrchestratorEffect::EmitOrchestratorNotice);
Running
}
(Running | Stopped, CompleteSpawn) => {
if fields.pending_spawn_count == 0 {
return Err(MobError::Internal(
"guard failed: no pending spawns to complete".into(),
));
}
fields.pending_spawn_count -= 1;
fields.topology_revision = fields.topology_revision.saturating_add(1);
effects.push(MobOrchestratorEffect::EmitOrchestratorNotice);
Running
}
(Running | Completed, StartFlow) => {
if !fields.coordinator_bound {
return Err(MobError::Internal(
"guard failed: coordinator is not bound (start flow requires bound coordinator)".into(),
));
}
fields.active_flow_count = fields.active_flow_count.saturating_add(1);
effects.push(MobOrchestratorEffect::FlowActivated);
effects.push(MobOrchestratorEffect::EmitOrchestratorNotice);
Running
}
(Running | Completed, CompleteFlow) => {
if fields.active_flow_count == 0 {
return Err(MobError::Internal(
"guard failed: no active flows to complete".into(),
));
}
fields.active_flow_count -= 1;
effects.push(MobOrchestratorEffect::FlowDeactivated);
effects.push(MobOrchestratorEffect::EmitOrchestratorNotice);
Running
}
(Running | Completed, StopOrchestrator) => {
if fields.active_flow_count != 0 {
return Err(MobError::Internal(
"guard failed: active flows exist (stop requires no active flows)".into(),
));
}
fields.supervisor_active = false;
if fields.coordinator_bound {
fields.coordinator_bound = false;
fields.topology_revision = fields.topology_revision.saturating_add(1);
effects.push(MobOrchestratorEffect::EmitOrchestratorNotice);
}
effects.push(MobOrchestratorEffect::DeactivateSupervisor);
Stopped
}
(Stopped, ResumeOrchestrator) => {
if !fields.coordinator_bound {
fields.coordinator_bound = true;
fields.topology_revision = fields.topology_revision.saturating_add(1);
effects.push(MobOrchestratorEffect::EmitOrchestratorNotice);
}
fields.supervisor_active = true;
effects.push(MobOrchestratorEffect::ActivateSupervisor);
Running
}
(Running | Stopped, MarkCompleted) => {
if fields.active_flow_count != 0 {
return Err(MobError::Internal(
"guard failed: active flows exist (mark completed requires no active flows)".into(),
));
}
if fields.pending_spawn_count != 0 {
return Err(MobError::Internal(
"guard failed: pending spawns exist (mark completed requires no pending spawns)".into(),
));
}
effects.push(MobOrchestratorEffect::EmitOrchestratorNotice);
Completed
}
(Stopped | Completed, DestroyOrchestrator) => {
if fields.pending_spawn_count != 0 {
return Err(MobError::Internal(
"guard failed: pending spawns exist (destroy requires no pending spawns)"
.into(),
));
}
if fields.active_flow_count != 0 {
return Err(MobError::Internal(
"guard failed: active flows exist (destroy requires no active flows)"
.into(),
));
}
fields.supervisor_active = false;
fields.coordinator_bound = false;
effects.push(MobOrchestratorEffect::DeactivateSupervisor);
effects.push(MobOrchestratorEffect::EmitOrchestratorNotice);
Destroyed
}
(Running, ForceCancelMember) => {
if !fields.coordinator_bound {
return Err(MobError::Internal(
"guard failed: coordinator is not bound (force cancel requires bound coordinator)".into(),
));
}
effects.push(MobOrchestratorEffect::MemberForceCancelled);
effects.push(MobOrchestratorEffect::EmitOrchestratorNotice);
Running
}
_ => {
let target = match input {
InitializeOrchestrator
| BindCoordinator
| ResumeOrchestrator
| StageSpawn
| CompleteSpawn
| StartFlow
| CompleteFlow
| ForceCancelMember => Running,
UnbindCoordinator | StopOrchestrator => Stopped,
MarkCompleted => Completed,
DestroyOrchestrator => Destroyed,
};
return Err(MobError::InvalidTransition {
from: phase,
to: target,
});
}
};
Ok((next_phase, fields, effects))
}
}
impl MobOrchestratorMutator for MobOrchestratorAuthority {
    /// Evaluates `input` and, if it is accepted, commits the result.
    ///
    /// On acceptance the phase and fields are replaced, the new phase is
    /// written to the shared observable cell with `Ordering::Release`, and the
    /// transition is returned. On rejection the error is returned before any
    /// state is touched.
    fn apply(
        &mut self,
        input: MobOrchestratorInput,
    ) -> Result<MobOrchestratorTransition, MobError> {
        let (next_phase, fields, effects) = self.evaluate(input)?;

        // Snapshot first so the fields can then be moved into `self`.
        let snapshot = fields.to_snapshot();
        self.phase = next_phase;
        self.fields = fields;
        self.observable.store(next_phase as u8, Ordering::Release);

        Ok(MobOrchestratorTransition {
            next_phase,
            snapshot,
            effects,
        })
    }
}
#[cfg(test)]
mod tests {
    use super::MobOrchestratorEffect as Effect;
    use super::MobOrchestratorInput as Input;
    use super::*;

    /// Fresh authority in `Creating` with its own observable cell.
    fn make_authority() -> MobOrchestratorAuthority {
        let observable = Arc::new(AtomicU8::new(0));
        MobOrchestratorAuthority::new(observable)
    }

    /// Authority that has already been initialized into `Running`.
    fn make_running_authority() -> MobOrchestratorAuthority {
        let mut authority = make_authority();
        authority
            .apply(Input::InitializeOrchestrator)
            .expect("init");
        authority
    }

    /// Applies each `(input, label)` in order, panicking with `label` on rejection.
    fn drive(authority: &mut MobOrchestratorAuthority, steps: &[(Input, &str)]) {
        for &(input, label) in steps {
            authority.apply(input).expect(label);
        }
    }

    /// Running authority that has additionally gone through `setup`.
    fn running_after(setup: &[(Input, &str)]) -> MobOrchestratorAuthority {
        let mut authority = make_running_authority();
        drive(&mut authority, setup);
        authority
    }

    #[test]
    fn initialize_transitions_to_running_and_activates_supervisor() {
        let mut authority = make_authority();
        let transition = authority
            .apply(Input::InitializeOrchestrator)
            .expect("init should succeed from Creating");
        assert_eq!(transition.next_phase, MobState::Running);
        assert!(transition.snapshot.supervisor_active);
        assert!(transition.effects.contains(&Effect::ActivateSupervisor));
    }

    #[test]
    fn bind_coordinator_sets_bound_and_increments_revision() {
        let mut authority = make_running_authority();
        let transition = authority
            .apply(Input::BindCoordinator)
            .expect("bind should succeed");
        assert!(transition.snapshot.coordinator_bound);
        assert_eq!(transition.snapshot.topology_revision, 1);
        assert!(transition.effects.contains(&Effect::EmitOrchestratorNotice));
    }

    #[test]
    fn bind_coordinator_rejects_when_already_bound() {
        let mut authority = running_after(&[(Input::BindCoordinator, "first bind")]);
        let outcome = authority.apply(Input::BindCoordinator);
        assert!(outcome.is_err(), "second bind should fail");
    }

    #[test]
    fn unbind_coordinator_clears_bound_and_transitions_to_stopped() {
        let mut authority = running_after(&[(Input::BindCoordinator, "bind")]);
        let transition = authority
            .apply(Input::UnbindCoordinator)
            .expect("unbind should succeed");
        assert!(!transition.snapshot.coordinator_bound);
        assert_eq!(transition.next_phase, MobState::Stopped);
        assert_eq!(transition.snapshot.topology_revision, 2);
    }

    #[test]
    fn unbind_rejects_when_not_bound() {
        let mut authority = make_running_authority();
        assert!(authority.apply(Input::UnbindCoordinator).is_err());
    }

    #[test]
    fn unbind_rejects_with_pending_spawns() {
        let mut authority = running_after(&[
            (Input::BindCoordinator, "bind"),
            (Input::StageSpawn, "stage spawn"),
        ]);
        let outcome = authority.apply(Input::UnbindCoordinator);
        assert!(outcome.is_err(), "unbind should fail with pending spawns");
    }

    #[test]
    fn stage_spawn_increments_pending_and_revision() {
        let mut authority = running_after(&[(Input::BindCoordinator, "bind")]);
        let transition = authority.apply(Input::StageSpawn).expect("stage spawn");
        assert_eq!(transition.snapshot.pending_spawn_count, 1);
        assert_eq!(transition.snapshot.topology_revision, 2);
    }

    #[test]
    fn stage_spawn_rejects_without_coordinator() {
        let mut authority = make_running_authority();
        assert!(authority.apply(Input::StageSpawn).is_err());
    }

    #[test]
    fn complete_spawn_decrements_pending() {
        let mut authority = running_after(&[
            (Input::BindCoordinator, "bind"),
            (Input::StageSpawn, "stage"),
        ]);
        let transition = authority
            .apply(Input::CompleteSpawn)
            .expect("complete spawn");
        assert_eq!(transition.snapshot.pending_spawn_count, 0);
    }

    #[test]
    fn complete_spawn_rejects_zero_pending() {
        let mut authority = make_running_authority();
        assert!(authority.apply(Input::CompleteSpawn).is_err());
    }

    #[test]
    fn start_flow_increments_active_and_emits_activated() {
        let mut authority = running_after(&[(Input::BindCoordinator, "bind")]);
        let transition = authority.apply(Input::StartFlow).expect("start flow");
        assert_eq!(transition.snapshot.active_flow_count, 1);
        assert!(transition.effects.contains(&Effect::FlowActivated));
    }

    #[test]
    fn complete_flow_decrements_active_and_emits_deactivated() {
        let mut authority = running_after(&[
            (Input::BindCoordinator, "bind"),
            (Input::StartFlow, "start"),
        ]);
        let transition = authority.apply(Input::CompleteFlow).expect("complete flow");
        assert_eq!(transition.snapshot.active_flow_count, 0);
        assert!(transition.effects.contains(&Effect::FlowDeactivated));
    }

    #[test]
    fn complete_flow_rejects_zero_active() {
        let mut authority = make_running_authority();
        assert!(authority.apply(Input::CompleteFlow).is_err());
    }

    #[test]
    fn stop_deactivates_supervisor() {
        let mut authority = make_running_authority();
        let transition = authority.apply(Input::StopOrchestrator).expect("stop");
        assert_eq!(transition.next_phase, MobState::Stopped);
        assert!(!transition.snapshot.supervisor_active);
        assert!(!transition.snapshot.coordinator_bound);
        assert!(transition.effects.contains(&Effect::DeactivateSupervisor));
    }

    #[test]
    fn stop_rejects_with_active_flows() {
        let mut authority = running_after(&[
            (Input::BindCoordinator, "bind"),
            (Input::StartFlow, "start flow"),
        ]);
        assert!(authority.apply(Input::StopOrchestrator).is_err());
    }

    #[test]
    fn resume_reactivates_supervisor_and_rebinds_coordinator() {
        let mut authority = running_after(&[
            (Input::BindCoordinator, "bind"),
            (Input::StopOrchestrator, "stop"),
        ]);
        let transition = authority.apply(Input::ResumeOrchestrator).expect("resume");
        assert_eq!(transition.next_phase, MobState::Running);
        assert!(transition.snapshot.supervisor_active);
        assert!(transition.snapshot.coordinator_bound);
        assert!(transition.effects.contains(&Effect::ActivateSupervisor));
    }

    #[test]
    fn resume_binds_when_coordinator_is_absent() {
        let mut authority = running_after(&[(Input::StopOrchestrator, "stop")]);
        let transition = authority
            .apply(Input::ResumeOrchestrator)
            .expect("resume should rebind coordinator");
        assert!(transition.snapshot.coordinator_bound);
        assert!(transition.snapshot.supervisor_active);
    }

    #[test]
    fn mark_completed_transitions_to_completed() {
        let mut authority = make_running_authority();
        let transition = authority
            .apply(Input::MarkCompleted)
            .expect("mark completed");
        assert_eq!(transition.next_phase, MobState::Completed);
    }

    #[test]
    fn mark_completed_rejects_with_active_flows() {
        let mut authority = running_after(&[
            (Input::BindCoordinator, "bind"),
            (Input::StartFlow, "start"),
        ]);
        assert!(authority.apply(Input::MarkCompleted).is_err());
    }

    #[test]
    fn mark_completed_rejects_with_pending_spawns() {
        let mut authority = running_after(&[
            (Input::BindCoordinator, "bind"),
            (Input::StageSpawn, "stage"),
        ]);
        assert!(authority.apply(Input::MarkCompleted).is_err());
    }

    #[test]
    fn destroy_from_stopped() {
        let mut authority = running_after(&[(Input::StopOrchestrator, "stop")]);
        let transition = authority
            .apply(Input::DestroyOrchestrator)
            .expect("destroy");
        assert_eq!(transition.next_phase, MobState::Destroyed);
        assert!(!transition.snapshot.supervisor_active);
        assert!(!transition.snapshot.coordinator_bound);
        assert!(transition.effects.contains(&Effect::DeactivateSupervisor));
        assert!(transition.effects.contains(&Effect::EmitOrchestratorNotice));
    }

    #[test]
    fn destroy_from_completed() {
        let mut authority = running_after(&[(Input::MarkCompleted, "complete")]);
        let transition = authority
            .apply(Input::DestroyOrchestrator)
            .expect("destroy");
        assert_eq!(transition.next_phase, MobState::Destroyed);
    }

    #[test]
    fn destroy_rejects_from_running() {
        let mut authority = make_running_authority();
        let outcome = authority.apply(Input::DestroyOrchestrator);
        assert!(outcome.is_err(), "destroy from Running should fail");
    }

    #[test]
    fn destroy_rejects_from_creating() {
        let mut authority = make_authority();
        let outcome = authority.apply(Input::DestroyOrchestrator);
        assert!(outcome.is_err(), "destroy from Creating should fail");
    }

    #[test]
    fn force_cancel_member_emits_effect() {
        let mut authority = running_after(&[(Input::BindCoordinator, "bind")]);
        let transition = authority
            .apply(Input::ForceCancelMember)
            .expect("force cancel");
        assert_eq!(transition.next_phase, MobState::Running);
        assert!(transition.effects.contains(&Effect::MemberForceCancelled));
    }

    #[test]
    fn observable_cache_updated_on_transition() {
        let observable = Arc::new(AtomicU8::new(0));
        let mut authority = MobOrchestratorAuthority::new(Arc::clone(&observable));
        assert_eq!(observable.load(Ordering::Acquire), MobState::Creating as u8);

        authority
            .apply(Input::InitializeOrchestrator)
            .expect("init");
        assert_eq!(observable.load(Ordering::Acquire), MobState::Running as u8);
    }

    #[test]
    fn phase_unchanged_on_rejected_transition() {
        let mut authority = make_authority();
        assert!(authority.apply(Input::StopOrchestrator).is_err());
        assert_eq!(authority.phase(), MobState::Creating);
    }

    #[test]
    fn can_accept_probes_without_mutation() {
        let authority = make_running_authority();
        assert!(authority.can_accept(Input::StopOrchestrator));
        assert!(!authority.can_accept(Input::InitializeOrchestrator));
        assert_eq!(authority.phase(), MobState::Running);
    }

    #[test]
    fn full_lifecycle_creating_to_destroyed() {
        let mut authority = make_authority();
        drive(
            &mut authority,
            &[
                (Input::InitializeOrchestrator, "init"),
                (Input::BindCoordinator, "bind"),
                (Input::StageSpawn, "stage"),
                (Input::CompleteSpawn, "complete spawn"),
                (Input::StartFlow, "start flow"),
                (Input::CompleteFlow, "complete flow"),
                (Input::StopOrchestrator, "stop"),
            ],
        );
        assert_eq!(authority.phase(), MobState::Stopped);

        drive(&mut authority, &[(Input::DestroyOrchestrator, "destroy")]);
        assert_eq!(authority.phase(), MobState::Destroyed);
    }

    #[test]
    fn with_phase_initializes_to_given_phase() {
        let observable = Arc::new(AtomicU8::new(0));
        let authority =
            MobOrchestratorAuthority::with_phase(Arc::clone(&observable), MobState::Completed);
        assert_eq!(authority.phase(), MobState::Completed);
        assert_eq!(
            observable.load(Ordering::Acquire),
            MobState::Completed as u8
        );
    }

    #[test]
    fn with_phase_stopped_accepts_resume() {
        let observable = Arc::new(AtomicU8::new(0));
        let mut authority = MobOrchestratorAuthority::with_phase(observable, MobState::Stopped);
        assert_eq!(authority.phase(), MobState::Stopped);

        let transition = authority
            .apply(Input::ResumeOrchestrator)
            .expect("resume from stopped should succeed");
        assert_eq!(transition.next_phase, MobState::Running);
    }
}