use std::sync::Arc;
use crate::coordination::consensus::Consensus;
use crate::coordination::shared_memory::{MemoryKey, SharedMemory};
use crate::coordination::work_queue::{WorkQueue, WorkResult};
use crate::lifecycle::AgentHandle;
pub struct CoordinatedGroupBuilder {
handles: Vec<AgentHandle>,
work_queue: Option<Arc<WorkQueue>>,
shared_memory: Option<Arc<SharedMemory>>,
consensus: Option<Arc<Consensus>>,
}
impl CoordinatedGroupBuilder {
fn new() -> Self {
Self {
handles: Vec::new(),
work_queue: None,
shared_memory: None,
consensus: None,
}
}
pub fn handle(mut self, handle: AgentHandle) -> Self {
self.handles.push(handle);
self
}
pub fn work_queue(mut self, queue: Arc<WorkQueue>) -> Self {
self.work_queue = Some(queue);
self
}
pub fn shared_memory(mut self, memory: Arc<SharedMemory>) -> Self {
self.shared_memory = Some(memory);
self
}
pub fn consensus(mut self, consensus: Arc<Consensus>) -> Self {
self.consensus = Some(consensus);
self
}
pub fn build(self) -> CoordinatedGroup {
CoordinatedGroup {
handles: self.handles,
work_queue: self.work_queue,
shared_memory: self.shared_memory,
consensus: self.consensus,
}
}
}
pub struct CoordinatedGroup {
handles: Vec<AgentHandle>,
work_queue: Option<Arc<WorkQueue>>,
shared_memory: Option<Arc<SharedMemory>>,
consensus: Option<Arc<Consensus>>,
}
impl CoordinatedGroup {
pub fn builder() -> CoordinatedGroupBuilder {
CoordinatedGroupBuilder::new()
}
pub fn len(&self) -> usize {
self.handles.len()
}
pub fn is_empty(&self) -> bool {
self.handles.is_empty()
}
pub async fn fan_out(
&self,
work_type: &str,
payloads: Vec<serde_json::Value>,
) -> Vec<WorkResult> {
let queue = match &self.work_queue {
Some(q) => q,
None => return Vec::new(),
};
for payload in &payloads {
queue.enqueue(work_type, payload.clone(), 0);
}
let mut join_handles = Vec::new();
for handle in &self.handles {
let queue = Arc::clone(queue);
let handle = handle.clone();
join_handles.push(tokio::spawn(async move {
let item = queue.claim(handle.agent_id(), None);
if let Some(item) = item {
queue.start(&item.id).ok();
let prompt = format!(
"Complete this task:\n{}",
serde_json::to_string_pretty(&item.payload).unwrap_or_default()
);
let start = std::time::Instant::now();
match handle.run(prompt).await {
Ok((response, _)) => {
let result = WorkResult {
success: true,
content: response.content,
error: None,
duration_ms: start.elapsed().as_millis() as u64,
tokens_used: None,
};
queue.complete(&item.id, result.clone()).ok();
Some(result)
}
Err(e) => {
let result = WorkResult {
success: false,
content: String::new(),
error: Some(e.to_string()),
duration_ms: start.elapsed().as_millis() as u64,
tokens_used: None,
};
queue.complete(&item.id, result.clone()).ok();
Some(result)
}
}
} else {
None
}
}));
}
let mut results = Vec::new();
for jh in join_handles {
if let Ok(Some(result)) = jh.await {
results.push(result);
}
}
results
}
pub async fn vote(
&self,
question: &str,
options: &[&str],
) -> Option<crate::coordination::consensus::VoteResult> {
let consensus = self.consensus.as_ref()?;
let vote_id = format!("vote-{}", uuid::Uuid::new_v4());
let voter_ids: Vec<String> = self
.handles
.iter()
.map(|h| h.agent_id().to_string())
.collect();
consensus.start(&vote_id, voter_ids, 0.5);
for handle in &self.handles {
let options_str = options.join(", ");
let prompt = format!(
"Choose one option for this question. Reply with ONLY the option name, nothing else.\n\nQuestion: {}\nOptions: {}",
question, options_str
);
if let Ok((response, _)) = handle.run(prompt).await {
let choice = response.content.trim().to_string();
let matched = options
.iter()
.find(|o| choice.contains(**o))
.map(|o| o.to_string())
.unwrap_or(choice);
consensus.vote(&vote_id, handle.agent_id(), matched).ok();
}
}
consensus.status(&vote_id)
}
pub async fn map_reduce(
&self,
work_type: &str,
payloads: Vec<serde_json::Value>,
reduce_key: &MemoryKey,
) -> anyhow::Result<Vec<WorkResult>> {
let memory = self.shared_memory.as_ref();
let results = self.fan_out(work_type, payloads).await;
if let Some(memory) = memory {
let results_json = serde_json::to_string(&results)?;
memory.write(
reduce_key,
serde_json::json!(results_json),
"coordinator",
None,
)?;
}
Ok(results)
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn builder_creates_group() {
let group = CoordinatedGroup::builder().build();
assert!(group.is_empty());
assert_eq!(group.len(), 0);
}
}