use std::collections::{BTreeMap, BTreeSet};
use std::sync::Arc;
use std::time::{Duration, Instant};
use meerkat_core::{AgentExecutionSnapshot, TurnPhase};
use meerkat_mob::{AgentIdentity, MemberState};
use meerkat_mob_mcp::MobMcpState;
use crate::mob_handle_runtime::{
DELEGATE_IDLE_RETIRE_DISABLED_LABEL, DELEGATE_IDLE_RETIRE_SECS_LABEL,
DelegateIdleRetireOverride, ImplicitDelegateRetirementOverrides, MobRuntime,
};
use crate::runtime::RuntimeOptions;
use super::UnifiedRuntime;
impl UnifiedRuntime {
pub(crate) async fn configure_implicit_delegate_retirement(&self, options: &RuntimeOptions) {
let Some(state) = self.mob_runtime.agent_mob_mcp_state() else {
return;
};
let sweep_interval =
Duration::from_millis(options.implicit_delegate_idle_sweep_interval_ms.max(1_000));
let task = tokio::spawn(run_implicit_delegate_retirement(
self.mob_runtime.clone(),
state,
self.mob_runtime.implicit_delegate_retirement_overrides(),
options
.implicit_delegate_idle_retire_secs
.map(Duration::from_secs),
sweep_interval,
));
*self.implicit_delegate_retirement_task.lock().await = Some(task);
}
}
async fn run_implicit_delegate_retirement(
runtime: MobRuntime,
state: Arc<MobMcpState>,
per_delegate_overrides: Option<ImplicitDelegateRetirementOverrides>,
default_idle_after: Option<Duration>,
sweep_interval: Duration,
) {
let primary_mob_id = runtime.handle().mob_id().to_string();
let session_service = state.session_service();
let mut idle_since: BTreeMap<(String, String), Instant> = BTreeMap::new();
loop {
tokio::time::sleep(sweep_interval).await;
let mut seen = BTreeSet::new();
for (mob_id, handle) in state.mob_handles_snapshot().await {
let is_primary_mob = mob_id.as_str() == primary_mob_id;
let is_implicit_mob = state.is_implicit_mob(&mob_id).await;
if !is_primary_mob && !is_implicit_mob {
continue;
}
for member in handle.list_members_observation_snapshot().await {
let identity = member.agent_identity.to_string();
let key = (mob_id.to_string(), identity.clone());
seen.insert(key.clone());
if member.state == MemberState::Retiring {
idle_since.remove(&key);
continue;
}
let per_delegate_override = match per_delegate_overrides.as_ref() {
Some(overrides) => overrides.get(mob_id.as_str(), &identity).await,
None => None,
};
if !idle_retirement_candidate(
is_primary_mob,
is_implicit_mob,
&member.labels,
per_delegate_override,
) {
idle_since.remove(&key);
continue;
}
let Some(idle_after) = delegate_member_idle_retire_after(
&member.labels,
per_delegate_override,
default_idle_after,
) else {
idle_since.remove(&key);
continue;
};
let Some(session_id) = handle
.resolve_bridge_session_id(&member.agent_identity)
.await
else {
idle_since.remove(&key);
continue;
};
let idle = match session_service.execution_snapshot(&session_id).await {
Ok(Some(snapshot)) => delegate_execution_is_idle(&snapshot),
Ok(None) => true,
Err(error) => {
tracing::debug!(
mob_id = %mob_id,
agent_identity = %identity,
session_id = %session_id,
error = %error,
"implicit delegate idle sweep skipped member after snapshot error"
);
false
}
};
if !idle {
idle_since.remove(&key);
continue;
}
let since = idle_since.entry(key.clone()).or_insert_with(Instant::now);
if since.elapsed() < idle_after {
continue;
}
match handle.retire(AgentIdentity::from(identity.as_str())).await {
Ok(()) => {
tracing::info!(
mob_id = %mob_id,
agent_identity = %identity,
idle_after_ms = idle_after.as_millis() as u64,
"retired idle spawned member"
);
}
Err(error) => {
tracing::debug!(
mob_id = %mob_id,
agent_identity = %identity,
error = %error,
"implicit delegate idle retirement failed"
);
}
}
idle_since.remove(&key);
}
}
idle_since.retain(|key, _| seen.contains(key));
}
}
fn delegate_execution_is_idle(snapshot: &AgentExecutionSnapshot) -> bool {
turn_phase_is_idle(snapshot.turn_phase)
}
fn idle_retirement_candidate(
is_primary_mob: bool,
is_implicit_mob: bool,
labels: &std::collections::BTreeMap<String, String>,
per_delegate_override: Option<DelegateIdleRetireOverride>,
) -> bool {
if is_implicit_mob {
return true;
}
is_primary_mob
&& (per_delegate_override.is_some() || labels.contains_key(DELEGATE_IDLE_RETIRE_SECS_LABEL))
}
fn delegate_member_idle_retire_after(
labels: &std::collections::BTreeMap<String, String>,
per_delegate_override: Option<DelegateIdleRetireOverride>,
default_idle_after: Option<Duration>,
) -> Option<Duration> {
match per_delegate_override {
Some(DelegateIdleRetireOverride::Disabled) => return None,
Some(DelegateIdleRetireOverride::Seconds(seconds)) => {
return Some(Duration::from_secs(seconds));
}
None => {}
}
match labels
.get(DELEGATE_IDLE_RETIRE_SECS_LABEL)
.map(String::as_str)
{
Some(value) if value.eq_ignore_ascii_case(DELEGATE_IDLE_RETIRE_DISABLED_LABEL) => None,
Some(value) => value
.parse::<u64>()
.ok()
.map(Duration::from_secs)
.or(default_idle_after),
None => default_idle_after,
}
}
pub(crate) fn turn_phase_is_idle(phase: TurnPhase) -> bool {
matches!(phase, TurnPhase::Ready) || phase.is_terminal()
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn implicit_delegate_turn_phase_idle_classification() {
assert!(turn_phase_is_idle(TurnPhase::Ready));
assert!(turn_phase_is_idle(TurnPhase::Completed));
assert!(turn_phase_is_idle(TurnPhase::Failed));
assert!(turn_phase_is_idle(TurnPhase::Cancelled));
assert!(!turn_phase_is_idle(TurnPhase::ApplyingPrimitive));
assert!(!turn_phase_is_idle(TurnPhase::CallingLlm));
assert!(!turn_phase_is_idle(TurnPhase::WaitingForOps));
assert!(!turn_phase_is_idle(TurnPhase::DrainingBoundary));
assert!(!turn_phase_is_idle(TurnPhase::Extracting));
assert!(!turn_phase_is_idle(TurnPhase::ErrorRecovery));
assert!(!turn_phase_is_idle(TurnPhase::Cancelling));
}
#[test]
fn idle_retirement_candidates_require_primary_mob_opt_in() {
let no_labels = std::collections::BTreeMap::new();
let labeled = std::collections::BTreeMap::from([(
DELEGATE_IDLE_RETIRE_SECS_LABEL.to_string(),
"300".to_string(),
)]);
assert!(idle_retirement_candidate(false, true, &no_labels, None,));
assert!(idle_retirement_candidate(true, false, &labeled, None,));
assert!(idle_retirement_candidate(
true,
false,
&no_labels,
Some(DelegateIdleRetireOverride::Seconds(60)),
));
assert!(!idle_retirement_candidate(true, false, &no_labels, None,));
}
#[test]
fn implicit_delegate_idle_retire_label_overrides_runtime_default() {
let labels = std::collections::BTreeMap::from([(
DELEGATE_IDLE_RETIRE_SECS_LABEL.to_string(),
"12".to_string(),
)]);
assert_eq!(
delegate_member_idle_retire_after(&labels, None, Some(Duration::from_mins(5))),
Some(Duration::from_secs(12))
);
}
#[test]
fn implicit_delegate_idle_retire_label_can_disable_member_retirement() {
let labels = std::collections::BTreeMap::from([(
DELEGATE_IDLE_RETIRE_SECS_LABEL.to_string(),
DELEGATE_IDLE_RETIRE_DISABLED_LABEL.to_string(),
)]);
assert_eq!(
delegate_member_idle_retire_after(&labels, None, Some(Duration::from_mins(5))),
None
);
}
#[test]
fn implicit_delegate_idle_retire_call_override_wins_over_label() {
let labels = std::collections::BTreeMap::from([(
DELEGATE_IDLE_RETIRE_SECS_LABEL.to_string(),
"12".to_string(),
)]);
assert_eq!(
delegate_member_idle_retire_after(
&labels,
Some(DelegateIdleRetireOverride::Seconds(9)),
Some(Duration::from_mins(5))
),
Some(Duration::from_secs(9))
);
}
#[test]
fn implicit_delegate_idle_retire_call_override_can_disable_retirement() {
assert_eq!(
delegate_member_idle_retire_after(
&std::collections::BTreeMap::new(),
Some(DelegateIdleRetireOverride::Disabled),
Some(Duration::from_mins(5))
),
None
);
}
#[test]
fn implicit_delegate_idle_retire_invalid_label_uses_runtime_default() {
let labels = std::collections::BTreeMap::from([(
DELEGATE_IDLE_RETIRE_SECS_LABEL.to_string(),
"eventually".to_string(),
)]);
assert_eq!(
delegate_member_idle_retire_after(&labels, None, Some(Duration::from_mins(5))),
Some(Duration::from_mins(5))
);
}
#[test]
fn implicit_delegate_idle_retire_uses_runtime_default_when_unlabeled() {
assert_eq!(
delegate_member_idle_retire_after(
&std::collections::BTreeMap::new(),
None,
Some(Duration::from_mins(5))
),
Some(Duration::from_mins(5))
);
}
}