use super::state::AgentState;
use anyhow::{Result, bail};
use std::collections::HashSet;
use std::time::Duration;
use tokio::sync::{mpsc, watch};
/// Watch receivers to monitor, each paired with the name of the agent it
/// belongs to (the name is what `wait_holding` reports in its error).
pub(crate) type Watchers = Vec<(String, watch::Receiver<AgentState>)>;
pub(crate) async fn wait_holding(duration: Duration, watchers: Watchers) -> Result<()> {
let (tx, mut rx) = mpsc::channel::<(String, String)>(8);
let mut guards = Vec::new();
for (name, watch) in watchers {
let established = watch.borrow().established_ids();
if !established.is_empty() {
guards.push(tokio::spawn(guard_calls(
name,
watch,
established,
tx.clone(),
)));
}
}
drop(tx);
let outcome = tokio::select! {
_ = tokio::time::sleep(duration) => Ok(()),
msg = rx.recv() => match msg {
Some((agent, reason)) => {
bail!("call on `{agent}` dropped during wait — reason: {reason:?}")
}
None => Ok(()), },
};
for g in guards {
g.abort();
}
outcome
}
/// Watches one agent's state and reports the first loss of an established call.
///
/// Each pass takes a snapshot of the call ids and the last call reason from
/// the watch channel. If any id in `established` is missing from that
/// snapshot, a single `(agent, reason)` message is sent on `tx` (with an empty
/// reason when none is recorded) and the task finishes. The task also
/// finishes, without sending anything, when `changed()` on the watch channel
/// returns an error.
async fn guard_calls(
    agent: String,
    mut rx: watch::Receiver<AgentState>,
    established: HashSet<String>,
    tx: mpsc::Sender<(String, String)>,
) {
    loop {
        // Scope the borrow so it is released before any await point below.
        let (live_ids, last_reason) = {
            let state = rx.borrow();
            (state.call_ids(), state.last_call_reason.clone())
        };

        let all_still_up = established.iter().all(|id| live_ids.contains(id));
        if !all_still_up {
            // The send result is deliberately ignored; this guard is done
            // whether or not anyone is still listening.
            let report = (agent, last_reason.unwrap_or_default());
            let _ = tx.send(report).await;
            return;
        }

        match rx.changed().await {
            Ok(()) => {}
            Err(_) => return,
        }
    }
}