Skip to main content

agent_kernel/
discuss.rs

1use anyhow::Result;
2use futures::future::join_all;
3use tokio::sync::mpsc;
4use tokio_util::sync::CancellationToken;
5
6use crate::agent::Agent;
7use crate::event::AgentEvent;
8use crate::message::{Message, Role};
9use crate::runtime::AgentRuntime;
10
11/// The ONLY orchestration primitive in the kernel.
12///
13/// Runs a fixed-N-round multi-agent discussion, emitting [`AgentEvent`]s via
14/// the provided channel. No convergence logic — runs exactly `rounds` rounds.
15/// Caller controls policy through `rounds` and `cancel`.
16///
17/// The primary agent (`agents[0]`) provides the final summary.
18pub async fn discuss(
19    runtime: &dyn AgentRuntime,
20    agents: &[Agent],
21    topic: &str,
22    rounds: usize,
23    cancel: CancellationToken,
24    events: mpsc::Sender<AgentEvent>,
25) -> Result<String> {
26    let mut history: Vec<Message> = vec![Message {
27        role: Role::User,
28        content: topic.to_string(),
29    }];
30
31    let mut primary_last_response = String::new();
32
33    for round in 1..=rounds {
34        if cancel.is_cancelled() {
35            let _ = events.send(AgentEvent::Cancelled).await;
36            return Ok(String::new());
37        }
38
39        let _ = events
40            .send(AgentEvent::Progress {
41                current_round: round,
42                max_rounds: rounds,
43            })
44            .await;
45
46        // Concurrent agent calls — each gets a snapshot of the shared history.
47        // Snapshot is cloned per-agent so the owned Vec lives inside the async block,
48        // satisfying the single-lifetime constraint of AgentRuntime::respond().
49        let futs: Vec<_> = agents
50            .iter()
51            .map(|agent| {
52                let snap = history.clone();
53                async move {
54                    runtime
55                        .respond(agent, &snap)
56                        .await
57                        .map(|content| (agent.name.clone(), content))
58                }
59            })
60            .collect();
61
62        let results = join_all(futs).await;
63
64        for (agent_name, content) in results.into_iter().flatten() {
65            if agent_name == agents[0].name {
66                primary_last_response = content.clone();
67            }
68
69            let _ = events
70                .send(AgentEvent::Round {
71                    round,
72                    agent_name: agent_name.clone(),
73                    content: content.clone(),
74                })
75                .await;
76
77            history.push(Message {
78                role: Role::User,
79                content: format!("[{agent_name}]: {content}"),
80            });
81        }
82    }
83
84    let _ = events
85        .send(AgentEvent::Summary {
86            content: primary_last_response.clone(),
87        })
88        .await;
89    let _ = events.send(AgentEvent::Completed).await;
90
91    Ok(primary_last_response)
92}