use crate::agent::swarm::knowledge::SharedKnowledge;
use crate::agent::swarm::knowledge::types::WorkerInstruction;
impl SharedKnowledge {
    /// Registers the control handles for the worker identified by `agent_id`.
    ///
    /// A fresh unbounded instruction channel is created; its sending half is
    /// stored in the registry together with `cancel`, `budget`, and a pause
    /// flag initialised to `false`. The receiving half is returned so the
    /// worker can consume instructions sent via [`Self::send_instruction`].
    pub async fn register_worker_handles(
        &self,
        agent_id: &str,
        cancel: tokio_util::sync::CancellationToken,
        budget: crate::agent::guard::IterationBudget,
    ) -> tokio::sync::mpsc::UnboundedReceiver<WorkerInstruction> {
        let (sender, receiver) = tokio::sync::mpsc::unbounded_channel();
        let entry = crate::agent::swarm::knowledge::types::WorkerHandles {
            cancel,
            budget,
            instruction_tx: sender,
            paused: std::sync::Arc::new(std::sync::atomic::AtomicBool::new(false)),
        };

        let mut registry = self.worker_handles.write().await;
        registry.insert(agent_id.to_string(), entry);
        receiver
    }

    /// Triggers the cancellation token of the given worker.
    ///
    /// Returns `true` when handles for `agent_id` were found, `false`
    /// otherwise.
    pub async fn cancel_worker(&self, agent_id: &str) -> bool {
        let registry = self.worker_handles.read().await;
        match registry.get(agent_id) {
            Some(entry) => {
                entry.cancel.cancel();
                true
            }
            None => false,
        }
    }

    /// Extends the iteration budget of the given worker by `extra_iterations`.
    ///
    /// Returns `true` when handles for `agent_id` were found, `false`
    /// otherwise.
    pub async fn extend_worker(&self, agent_id: &str, extra_iterations: u32) -> bool {
        let registry = self.worker_handles.read().await;
        match registry.get(agent_id) {
            Some(entry) => {
                entry.budget.extend(extra_iterations);
                true
            }
            None => false,
        }
    }

    /// Pushes `instruction` onto the given worker's instruction channel.
    ///
    /// Returns `true` only if the worker is registered and the send
    /// succeeded; an unknown `agent_id` or a failed send both yield `false`.
    pub async fn send_instruction(&self, agent_id: &str, instruction: WorkerInstruction) -> bool {
        let registry = self.worker_handles.read().await;
        match registry.get(agent_id) {
            Some(entry) => {
                let outcome = entry.instruction_tx.send(instruction);
                outcome.is_ok()
            }
            None => false,
        }
    }

    /// Sets the pause flag of the given worker.
    ///
    /// When the flag is cleared (`paused == false`), tasks waiting on the
    /// resume notifier are woken. Nothing happens for an unknown `agent_id`.
    pub async fn set_worker_paused(&self, agent_id: &str, paused: bool) {
        let registry = self.worker_handles.read().await;
        let entry = match registry.get(agent_id) {
            Some(entry) => entry,
            None => return,
        };

        entry
            .paused
            .store(paused, std::sync::atomic::Ordering::Relaxed);

        // Only a transition to "running" needs to wake anybody up.
        if paused {
            return;
        }
        self.resume_notify.notify_waiters();
    }

    /// Reports whether every worker listed in `agent_ids` is currently paused.
    ///
    /// An empty slice yields `false`, and an id without registered handles
    /// counts as "not paused".
    pub async fn all_workers_paused(&self, agent_ids: &[String]) -> bool {
        if agent_ids.is_empty() {
            return false;
        }

        let registry = self.worker_handles.read().await;
        for id in agent_ids {
            let is_paused = match registry.get(id) {
                Some(entry) => entry.paused.load(std::sync::atomic::Ordering::Relaxed),
                None => false,
            };
            if !is_paused {
                return false;
            }
        }
        true
    }

    /// Suspends the caller until the resume notifier fires.
    ///
    /// NOTE(review): `resume_notify` is presumably a `tokio::sync::Notify`;
    /// if so, `notify_waiters` only wakes tasks that are already waiting, so
    /// callers may need to re-check their pause flag around this call —
    /// TODO confirm.
    pub async fn wait_for_resume(&self) {
        let resumed = self.resume_notify.notified();
        resumed.await;
    }

    /// Drops the handles registered for `agent_id`, if any.
    pub async fn remove_worker_handles(&self, agent_id: &str) {
        let mut registry = self.worker_handles.write().await;
        registry.remove(agent_id);
    }

    /// Returns the ids of all workers that currently have registered handles.
    pub async fn active_worker_ids(&self) -> Vec<String> {
        let registry = self.worker_handles.read().await;
        registry.keys().cloned().collect()
    }
}