use super::events::MobEventEmitter;
use super::handle::MobHandle;
use crate::error::MobError;
use crate::ids::{RunId, StepId};
use crate::run::FlowRunConfig;
#[cfg(target_arch = "wasm32")]
use crate::tokio;
#[cfg(test)]
use std::sync::atomic::{AtomicU64, Ordering};
use std::time::Duration;
const ESCALATION_TURN_TIMEOUT: Duration = Duration::from_secs(2);
#[cfg(test)]
static TEST_ESCALATION_TURN_TIMEOUT_MS: AtomicU64 = AtomicU64::new(0);
fn escalation_turn_timeout() -> Duration {
#[cfg(test)]
{
let override_ms = TEST_ESCALATION_TURN_TIMEOUT_MS.load(Ordering::Relaxed);
if override_ms > 0 {
return Duration::from_millis(override_ms);
}
}
ESCALATION_TURN_TIMEOUT
}
#[cfg(test)]
pub(crate) fn set_escalation_turn_timeout_for_tests(timeout: Duration) {
TEST_ESCALATION_TURN_TIMEOUT_MS.store(timeout.as_millis() as u64, Ordering::Relaxed);
}
#[cfg(test)]
pub(crate) fn reset_escalation_turn_timeout_for_tests() {
TEST_ESCALATION_TURN_TIMEOUT_MS.store(0, Ordering::Relaxed);
}
pub struct Supervisor {
handle: MobHandle,
emitter: MobEventEmitter,
}
impl Supervisor {
pub fn new(handle: MobHandle, emitter: MobEventEmitter) -> Self {
Self { handle, emitter }
}
pub async fn escalate(
&self,
config: &FlowRunConfig,
run_id: &RunId,
step_id: &StepId,
reason: &str,
) -> Result<(), MobError> {
let supervisor_role = config
.supervisor
.as_ref()
.ok_or_else(|| MobError::SupervisorEscalation("supervisor not configured".into()))?
.role
.clone();
let escalation_target = self
.handle
.list_members()
.await
.into_iter()
.find(|entry| entry.profile == supervisor_role)
.ok_or_else(|| {
MobError::SupervisorEscalation(format!(
"no active supervisor member for role '{supervisor_role}'"
))
})?;
let escalation_turn_timeout = escalation_turn_timeout();
tokio::time::timeout(
escalation_turn_timeout,
self.handle.internal_turn(
escalation_target.meerkat_id.clone(),
format!("Supervisor escalation for run '{run_id}', step '{step_id}': {reason}"),
),
)
.await
.map_err(|_| {
MobError::SupervisorEscalation(format!(
"supervisor escalation timed out after {}ms",
escalation_turn_timeout.as_millis()
))
})??;
self.emitter
.supervisor_escalation(
run_id.clone(),
step_id.clone(),
escalation_target.meerkat_id,
)
.await?;
Ok(())
}
pub async fn force_reset(&self) -> Result<(), MobError> {
let ids = self
.handle
.list_members()
.await
.into_iter()
.map(|entry| entry.meerkat_id)
.collect::<Vec<_>>();
let mut failures = Vec::new();
for id in ids {
if let Err(error) = self.handle.retire(id.clone()).await {
failures.push(format!("{id}: {error}"));
}
}
if !failures.is_empty() {
return Err(MobError::SupervisorEscalation(format!(
"force_reset encountered {} retirement error(s): {}",
failures.len(),
failures.join("; ")
)));
}
Ok(())
}
}