1use anyhow::Result;
2use futures::future::join_all;
3use tokio::sync::mpsc;
4use tokio_util::sync::CancellationToken;
5
6use crate::agent::Agent;
7use crate::event::AgentEvent;
8use crate::message::{Message, Role};
9use crate::runtime::AgentRuntime;
10
11pub async fn discuss(
19 runtime: &dyn AgentRuntime,
20 agents: &[Agent],
21 topic: &str,
22 rounds: usize,
23 cancel: CancellationToken,
24 events: mpsc::Sender<AgentEvent>,
25) -> Result<String> {
26 let mut history: Vec<Message> = vec![Message {
27 role: Role::User,
28 content: topic.to_string(),
29 }];
30
31 let mut primary_last_response = String::new();
32
33 for round in 1..=rounds {
34 if cancel.is_cancelled() {
35 let _ = events.send(AgentEvent::Cancelled).await;
36 return Ok(String::new());
37 }
38
39 let _ = events
40 .send(AgentEvent::Progress {
41 current_round: round,
42 max_rounds: rounds,
43 })
44 .await;
45
46 let futs: Vec<_> = agents
50 .iter()
51 .map(|agent| {
52 let snap = history.clone();
53 async move {
54 runtime
55 .respond(agent, &snap)
56 .await
57 .map(|content| (agent.name.clone(), content))
58 }
59 })
60 .collect();
61
62 let results = join_all(futs).await;
63
64 for (agent_name, content) in results.into_iter().flatten() {
65 if agent_name == agents[0].name {
66 primary_last_response = content.clone();
67 }
68
69 let _ = events
70 .send(AgentEvent::Round {
71 round,
72 agent_name: agent_name.clone(),
73 content: content.clone(),
74 })
75 .await;
76
77 history.push(Message {
78 role: Role::User,
79 content: format!("[{agent_name}]: {content}"),
80 });
81 }
82 }
83
84 let _ = events
85 .send(AgentEvent::Summary {
86 content: primary_last_response.clone(),
87 })
88 .await;
89 let _ = events.send(AgentEvent::Completed).await;
90
91 Ok(primary_last_response)
92}