use std::collections::HashMap;
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::{Arc, Mutex};
use crate::context::{LLMContext, Message};
use crate::error::Result;
use super::task::{TaskContext, TaskHandle, DEFAULT_READY_TIMEOUT};
/// Result of offering a finished task's message to the coordinator.
///
/// Returned by `AgenticCoordinator::stage_result`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum FenceOutcome {
    /// The task was dispatched in the coordinator's current epoch and its
    /// message was handed to the context via `stage_message`.
    Staged,
    /// The message was not staged: either the task was dispatched under a
    /// different epoch, or no in-flight record exists for the task id.
    Quarantined,
}
/// Bookkeeping record for one task that has been dispatched and not yet
/// resolved (staged, quarantined, or cancelled).
struct InFlight {
    /// Coordinator epoch that was current when the task was dispatched;
    /// compared against the live epoch in `is_current` and `stage_result`.
    epoch: u64,
    /// Target the task was dispatched to; passed back to `cancel_task`
    /// when an interruption cancels the task.
    target: String,
}
/// Dispatches tasks on behalf of one source and fences their results by
/// epoch, so that only results of tasks dispatched in the current epoch are
/// staged into the shared context.
pub struct AgenticCoordinator {
    /// Name this coordinator dispatches and cancels tasks under; also used
    /// as the identifier in log messages.
    source: String,
    /// Shared context: supplies the epoch and receives staged messages,
    /// commits and rollbacks.
    context: Arc<Mutex<LLMContext>>,
    /// Task layer used to dispatch fenced tasks and to cancel them.
    task_ctx: Arc<TaskContext>,
    /// Cached copy of the context epoch; seeded in `new` and refreshed by
    /// `open_turn`.
    current_epoch: AtomicU64,
    /// In-flight tasks keyed by task id.
    dispatched: Mutex<HashMap<String, InFlight>>,
}
impl AgenticCoordinator {
pub fn new(
source: impl Into<String>,
context: Arc<Mutex<LLMContext>>,
task_ctx: Arc<TaskContext>,
) -> Self {
let epoch = context.lock().unwrap().epoch();
Self {
source: source.into(),
context,
task_ctx,
current_epoch: AtomicU64::new(epoch),
dispatched: Mutex::new(HashMap::new()),
}
}
pub fn current_epoch(&self) -> u64 {
self.current_epoch.load(Ordering::Acquire)
}
pub fn open_turn(&self) -> u64 {
let epoch = self.context.lock().unwrap().epoch();
self.current_epoch.store(epoch, Ordering::Release);
epoch
}
pub async fn dispatch(
&self,
target: &str,
task_name: Option<String>,
payload: Option<serde_json::Value>,
) -> Result<TaskHandle> {
let epoch = self.current_epoch();
let handle = self
.task_ctx
.dispatch_fenced(
&self.source,
target,
task_name,
payload,
epoch,
Some(DEFAULT_READY_TIMEOUT),
)
.await?;
self.dispatched.lock().unwrap().insert(
handle.task_id.clone(),
InFlight {
epoch,
target: target.to_string(),
},
);
Ok(handle)
}
pub fn is_current(&self, task_id: &str) -> bool {
let current = self.current_epoch();
self.dispatched
.lock()
.unwrap()
.get(task_id)
.is_some_and(|f| f.epoch == current)
}
pub fn stage_result(&self, task_id: &str, msg: Message) -> FenceOutcome {
let current = self.current_epoch();
let entry = self.dispatched.lock().unwrap().remove(task_id);
match entry {
Some(f) if f.epoch == current => {
self.context.lock().unwrap().stage_message(msg);
FenceOutcome::Staged
}
Some(f) => {
log::info!(
"coordinator '{}': quarantined result for task {} (dispatched epoch {}, current {})",
self.source,
task_id,
f.epoch,
current
);
FenceOutcome::Quarantined
}
None => {
log::debug!(
"coordinator '{}': result for unknown/resolved task {} dropped",
self.source,
task_id
);
FenceOutcome::Quarantined
}
}
}
pub fn commit(&self) -> usize {
self.context.lock().unwrap().commit()
}
pub fn in_flight(&self) -> usize {
self.dispatched.lock().unwrap().len()
}
pub async fn on_interruption(&self, reason: Option<String>) {
let in_flight: Vec<(String, String)> = {
let mut guard = self.dispatched.lock().unwrap();
guard
.drain()
.map(|(task_id, f)| (task_id, f.target))
.collect()
};
for (task_id, target) in in_flight {
self.task_ctx
.cancel_task(&self.source, &target, task_id, reason.clone())
.await;
}
self.context.lock().unwrap().rollback();
}
}