use std::sync::Arc;
use std::time::Duration;
use futures::stream::Stream;
use futures::StreamExt;
use tokio::sync::mpsc;
use tokio::time::timeout;
use crate::council::agent::{
Agent, AgentError, ChatMessage, ChatRequest, ChatRole,
};
use crate::council::config::{CouncilConfig, SamplingConfig};
use crate::council::convergence::{self, ConvergenceOutcome};
use crate::council::embedder::Embedder;
use crate::council::event::{
CouncilEvent, CouncilFailure, CouncilStartedSummary, ExpertId,
};
use crate::council::transcript::{Answer, Round};
/// Tunable parameters for a single council deliberation.
#[derive(Debug, Clone)]
pub struct CouncilParams {
/// Convergence is only evaluated once `round_idx + 1 >= min_rounds`.
pub min_rounds: u32,
/// Deliberation stops after the round for which `round_idx + 1 >= max_rounds`.
pub max_rounds: u32,
/// Threshold passed to the convergence check and echoed in `ConvergenceCheck` events.
pub convergence_threshold: f32,
/// Minimum number of experts that must respond in a round; fewer ends the run with `QuorumLost`.
pub min_quorum: u32,
/// Optional system message placed before the user message in expert and synthesizer requests.
pub system_prompt: Option<String>,
/// Sampling settings cloned into every expert and synthesizer `ChatRequest`.
pub sampling: SamplingConfig,
}
impl CouncilParams {
    /// Derives the runtime parameters from a [`CouncilConfig`].
    ///
    /// Every value is copied or cloned straight from the matching config
    /// field, except `min_quorum`, which comes from
    /// `CouncilConfig::effective_min_quorum` instead of a field read.
    pub fn from_config(cfg: &CouncilConfig) -> Self {
        let min_quorum = cfg.effective_min_quorum();
        let system_prompt = cfg.system_prompt.clone();
        let sampling = cfg.sampling.clone();

        Self {
            min_rounds: cfg.min_rounds,
            max_rounds: cfg.max_rounds,
            convergence_threshold: cfg.convergence_threshold,
            min_quorum,
            system_prompt,
            sampling,
        }
    }
}
/// Error returned by [`Council::deliberate`].
///
/// Each variant is produced by `failure_to_error` from a `CouncilFailure`
/// carried in a `CouncilFailed` event, except that `Internal` is also used
/// when the event stream ends without a `CouncilCompleted` event.
#[derive(Debug, thiserror::Error)]
pub enum CouncilError {
/// Fewer experts responded in a round than the configured quorum.
#[error("quorum lost in round {round}: required {required}, got {actual}")]
QuorumLost {
/// Zero-based index of the round that lost quorum.
round: u32,
/// The quorum that was required (`CouncilParams::min_quorum`).
required: u32,
/// How many experts actually responded in that round.
actual: u32,
},
/// The synthesizer stream yielded an error or timed out waiting for an item.
#[error("synthesizer failed: {message}")]
SynthesizerFailed { message: String },
/// The convergence check failed; `message` is the stringified underlying error.
#[error("embedder failed: {message}")]
EmbedderFailed { message: String },
/// Mapped from `CouncilFailure::Cancelled`.
// NOTE(review): nothing in this file emits `CouncilFailure::Cancelled`; confirm where it originates.
#[error("council was cancelled")]
Cancelled,
/// A config or internal failure, or a stream that ended without `CouncilCompleted`.
#[error("internal error: {0}")]
Internal(String),
}
/// A panel of expert agents plus a synthesizer that turns their final-round
/// answers into one response.
pub struct Council {
// Agents queried concurrently in every round.
experts: Vec<Arc<dyn Agent>>,
// Agent that produces the final answer from the last round's expert answers.
synthesizer: Arc<dyn Agent>,
// Embedder handed to the convergence check.
embedder: Arc<dyn Embedder>,
// Round limits, quorum, threshold, system prompt and sampling settings.
params: CouncilParams,
}
impl Council {
pub fn new(
experts: Vec<Arc<dyn Agent>>,
synthesizer: Arc<dyn Agent>,
embedder: Arc<dyn Embedder>,
params: CouncilParams,
) -> Self {
Self {
experts,
synthesizer,
embedder,
params,
}
}
pub async fn deliberate(&self, prompt: &str) -> Result<String, CouncilError> {
let mut stream = Box::pin(self.deliberate_stream(prompt));
let mut final_answer = String::new();
let mut completed = false;
let mut error: Option<CouncilError> = None;
while let Some(ev) = stream.next().await {
match ev {
CouncilEvent::FinalToken { delta } => final_answer.push_str(&delta),
CouncilEvent::CouncilCompleted { .. } => completed = true,
CouncilEvent::CouncilFailed { error: failure } => {
error = Some(failure_to_error(failure));
}
_ => {}
}
}
if let Some(err) = error {
return Err(err);
}
if !completed {
return Err(CouncilError::Internal(
"stream ended without council_completed".into(),
));
}
Ok(final_answer)
}
pub fn deliberate_stream(
&self,
prompt: &str,
) -> impl Stream<Item = CouncilEvent> + Send + 'static {
let prompt = prompt.to_string();
let experts = self.experts.clone();
let synthesizer = self.synthesizer.clone();
let embedder = self.embedder.clone();
let params = self.params.clone();
let (tx, rx) = mpsc::unbounded_channel::<CouncilEvent>();
tokio::spawn(async move {
run_deliberation(experts, synthesizer, embedder, params, prompt, tx).await;
});
futures::stream::unfold(rx, |mut rx| async move {
rx.recv().await.map(|item| (item, rx))
})
}
}
/// Translates a streamed [`CouncilFailure`] into the public [`CouncilError`].
///
/// Any extra fields on an embedder failure (such as the round) are dropped,
/// and both `ConfigError` and `Internal` collapse into
/// [`CouncilError::Internal`].
fn failure_to_error(failure: CouncilFailure) -> CouncilError {
    match failure {
        CouncilFailure::Cancelled => CouncilError::Cancelled,
        CouncilFailure::ConfigError { message } => CouncilError::Internal(message),
        CouncilFailure::Internal { message } => CouncilError::Internal(message),
        CouncilFailure::SynthesizerFailed { message } => {
            CouncilError::SynthesizerFailed { message }
        }
        CouncilFailure::EmbedderFailed { message, .. } => {
            CouncilError::EmbedderFailed { message }
        }
        CouncilFailure::QuorumLost {
            round,
            required,
            actual,
        } => CouncilError::QuorumLost {
            round,
            required,
            actual,
        },
    }
}
/// Drives one complete deliberation and publishes its progress on `tx`.
///
/// Sequence of events:
/// 1. `CouncilStarted` with a summary of the configuration.
/// 2. For each round: `RoundStarted`, the per-expert events emitted by
///    `run_round`, then `RoundCompleted`. If fewer than `params.min_quorum`
///    experts responded, the run ends with `CouncilFailed(QuorumLost)`.
///    Once `round_idx + 1 >= params.min_rounds`, convergence is checked and
///    reported as `ConvergenceCheck`; an error from that check ends the run
///    with `CouncilFailed(EmbedderFailed)`.
/// 3. When a round converges or `params.max_rounds` is reached, the
///    synthesizer is run on that round: `SynthesisStarted`, one `FinalToken`
///    per non-empty chunk, and finally `CouncilCompleted`. A stream error or
///    a per-item timeout ends the run with `CouncilFailed(SynthesizerFailed)`.
///
/// Send errors are deliberately ignored (`let _ =`): they only occur when the
/// receiver has been dropped, which is detected via `tx.is_closed()` at the
/// start of every round and before synthesis.
async fn run_deliberation(
    experts: Vec<Arc<dyn Agent>>,
    synthesizer: Arc<dyn Agent>,
    embedder: Arc<dyn Embedder>,
    params: CouncilParams,
    prompt: String,
    tx: mpsc::UnboundedSender<CouncilEvent>,
) {
    let _ = tx.send(CouncilEvent::CouncilStarted {
        config_summary: CouncilStartedSummary {
            experts: experts.len() as u32,
            synthesizer: synthesizer.id().clone(),
            min_rounds: params.min_rounds,
            max_rounds: params.max_rounds,
        },
    });

    let mut prev_round: Option<Round> = None;
    let mut round_idx: u32 = 0;

    // The loop evaluates to the round whose answers are handed to the synthesizer.
    let final_round = loop {
        // Nobody is listening any more; stop before starting another round.
        if tx.is_closed() {
            return;
        }

        let _ = tx.send(CouncilEvent::RoundStarted { round: round_idx });
        let messages = build_round_messages(
            round_idx,
            &prompt,
            params.system_prompt.as_deref(),
            prev_round.as_ref(),
        );
        let round = run_round(round_idx, &experts, &messages, &params.sampling, &tx).await;

        let responded = round.responded_count();
        let _ = tx.send(CouncilEvent::RoundCompleted {
            round: round_idx,
            responded: round.responded_ids(),
            failed: round.failed_ids(),
        });

        if responded < params.min_quorum {
            let _ = tx.send(CouncilEvent::CouncilFailed {
                error: CouncilFailure::QuorumLost {
                    round: round_idx,
                    required: params.min_quorum,
                    actual: responded,
                },
            });
            return;
        }

        // Convergence is not even evaluated until the minimum number of rounds has run.
        let should_check = round_idx + 1 >= params.min_rounds;
        let converged = if should_check {
            match check_convergence(&round, embedder.as_ref(), params.convergence_threshold) {
                Ok(outcome) => {
                    let _ = tx.send(CouncilEvent::ConvergenceCheck {
                        round: round_idx,
                        min_cosine: outcome.min_cosine,
                        threshold: params.convergence_threshold,
                        converged: outcome.converged,
                    });
                    outcome.converged
                }
                Err(message) => {
                    let _ = tx.send(CouncilEvent::CouncilFailed {
                        error: CouncilFailure::EmbedderFailed {
                            round: round_idx,
                            message,
                        },
                    });
                    return;
                }
            }
        } else {
            false
        };

        let at_max = round_idx + 1 >= params.max_rounds;
        if converged || at_max {
            break round;
        }

        // Feed this round's answers into the next round's prompt.
        prev_round = Some(round);
        round_idx += 1;
    };

    if tx.is_closed() {
        return;
    }

    let _ = tx.send(CouncilEvent::SynthesisStarted {
        synthesizer_id: synthesizer.id().clone(),
    });
    let synth_messages =
        build_synthesis_messages(&prompt, params.system_prompt.as_deref(), &final_round);
    let req = ChatRequest {
        model: synthesizer.model().to_string(),
        messages: synth_messages,
        sampling: params.sampling.clone(),
        request_id: None,
    };
    let mut stream = synthesizer.chat(req);
    // The timeout bounds the wait for each stream item, not the whole synthesis.
    let timeout_dur = Duration::from_millis(synthesizer.timeout_ms());
    let mut final_text = String::new();

    loop {
        match timeout(timeout_dur, stream.next()).await {
            Ok(Some(Ok(token))) => {
                if !token.text.is_empty() {
                    final_text.push_str(&token.text);
                    let _ = tx.send(CouncilEvent::FinalToken { delta: token.text });
                }
                if token.finished {
                    break;
                }
            }
            Ok(Some(Err(e))) => {
                let _ = tx.send(CouncilEvent::CouncilFailed {
                    error: CouncilFailure::SynthesizerFailed {
                        message: e.to_string(),
                    },
                });
                return;
            }
            // Stream ended without an explicit `finished` marker; treat as done.
            Ok(None) => break,
            Err(_) => {
                let _ = tx.send(CouncilEvent::CouncilFailed {
                    error: CouncilFailure::SynthesizerFailed {
                        message: format!(
                            "synthesizer {} timed out after {} ms",
                            synthesizer.id(),
                            synthesizer.timeout_ms()
                        ),
                    },
                });
                return;
            }
        }
    }

    let _ = tx.send(CouncilEvent::CouncilCompleted {
        rounds: round_idx + 1,
        final_answer_length: final_text.len() as u32,
    });
}
/// Queries every expert concurrently for one round and gathers the results.
///
/// Each expert receives its own copy of `messages` and `sampling`, its own
/// clone of `tx` for progress events, and a per-item timeout taken from
/// `Agent::timeout_ms`. The function waits for all experts to finish
/// (successfully or not) before returning the populated [`Round`].
async fn run_round(
    round_idx: u32,
    experts: &[Arc<dyn Agent>],
    messages: &[ChatMessage],
    sampling: &SamplingConfig,
    tx: &mpsc::UnboundedSender<CouncilEvent>,
) -> Round {
    let pending: Vec<_> = experts
        .iter()
        .map(|expert| {
            let agent = Arc::clone(expert);
            let request = ChatRequest {
                model: agent.model().to_string(),
                messages: messages.to_vec(),
                sampling: sampling.clone(),
                request_id: None,
            };
            let timeout_dur = Duration::from_millis(agent.timeout_ms());
            run_one_expert(round_idx, agent, request, timeout_dur, tx.clone())
        })
        .collect();

    // Outcomes come back in the same order as `experts`.
    futures::future::join_all(pending)
        .await
        .into_iter()
        .fold(Round::new(round_idx), |mut round, outcome| {
            match outcome {
                ExpertOutcome::Answer(answer) => round.record_answer(answer),
                ExpertOutcome::Failure { id, message } => round.record_failure(id, message),
            }
            round
        })
}
/// Result of running a single expert for one round.
enum ExpertOutcome {
/// The expert's stream ended normally; carries the accumulated answer.
Answer(Answer),
/// The expert's stream errored or timed out; `message` is the text also sent in `ExpertFailed`.
Failure { id: ExpertId, message: String },
}
/// Streams one expert's answer for a round, mirroring progress onto `tx`.
///
/// Emits `ExpertStarted`, then an `ExpertToken` for every non-empty chunk,
/// and finally either `ExpertCompleted` or `ExpertFailed`. `timeout_dur`
/// bounds the wait for each individual stream item rather than the whole
/// answer. The token counter is incremented for every item received,
/// including items with empty text.
async fn run_one_expert(
    round_idx: u32,
    agent: Arc<dyn Agent>,
    request: ChatRequest,
    timeout_dur: Duration,
    tx: mpsc::UnboundedSender<CouncilEvent>,
) -> ExpertOutcome {
    let expert_id = agent.id().clone();
    let _ = tx.send(CouncilEvent::ExpertStarted {
        round: round_idx,
        expert_id: expert_id.clone(),
        model: agent.model().to_string(),
    });

    let mut chunks = agent.chat(request);
    let mut answer_text = String::new();
    let mut token_count: u32 = 0;

    // The loop yields `Some(message)` on failure and `None` on a normal end of stream.
    let failure: Option<String> = loop {
        let token = match timeout(timeout_dur, chunks.next()).await {
            // Our own per-item deadline elapsed.
            Err(_) => break Some(format!("timeout after {} ms", timeout_dur.as_millis())),
            // Stream ended without an explicit `finished` marker.
            Ok(None) => break None,
            // The agent itself reported a timeout; normalise the message.
            Ok(Some(Err(AgentError::Timeout { .. }))) => break Some("timeout".to_string()),
            Ok(Some(Err(other))) => break Some(other.to_string()),
            Ok(Some(Ok(token))) => token,
        };

        token_count += 1;
        let is_last = token.finished;
        if !token.text.is_empty() {
            answer_text.push_str(&token.text);
            let _ = tx.send(CouncilEvent::ExpertToken {
                round: round_idx,
                expert_id: expert_id.clone(),
                delta: token.text,
            });
        }
        if is_last {
            break None;
        }
    };

    match failure {
        Some(message) => {
            let _ = tx.send(CouncilEvent::ExpertFailed {
                round: round_idx,
                expert_id: expert_id.clone(),
                error: message.clone(),
            });
            ExpertOutcome::Failure {
                id: expert_id,
                message,
            }
        }
        None => {
            let _ = tx.send(CouncilEvent::ExpertCompleted {
                round: round_idx,
                expert_id: expert_id.clone(),
                tokens: token_count,
            });
            ExpertOutcome::Answer(Answer {
                expert_id,
                text: answer_text,
                tokens: token_count,
            })
        }
    }
}
/// Builds the chat messages sent to every expert for the given round.
///
/// Round 0 forwards `prompt` unchanged as the user message. Later rounds wrap
/// the prompt together with each answer from `prev_round` (labelled by expert
/// id) and an instruction to refine. When `system_prompt` is set, it is
/// emitted as a system message ahead of the user message.
///
/// # Panics
///
/// Panics if `round_idx` is non-zero and `prev_round` is `None`.
pub fn build_round_messages(
    round_idx: u32,
    prompt: &str,
    system_prompt: Option<&str>,
    prev_round: Option<&Round>,
) -> Vec<ChatMessage> {
    let user_content = match round_idx {
        0 => prompt.to_string(),
        _ => {
            let prev = prev_round.expect("refinement round needs prev_round");
            let mut body = [
                "User prompt:\n",
                prompt,
                "\n\nPrevious round, the experts answered:\n",
            ]
            .concat();
            for (expert, answer) in &prev.answers {
                body.push_str(&format!("- Expert {}: {}\n", expert, answer.text));
            }
            body.push_str(
                "\nConsidering these other perspectives, provide your refined answer to the user prompt. \
                 Disagree where you think they're wrong; agree explicitly where you concur.",
            );
            body
        }
    };

    let user = ChatMessage {
        role: ChatRole::User,
        content: user_content,
    };
    match system_prompt {
        Some(sys) => vec![
            ChatMessage {
                role: ChatRole::System,
                content: sys.to_string(),
            },
            user,
        ],
        None => vec![user],
    }
}
/// Builds the chat messages sent to the synthesizer.
///
/// The user message contains the original prompt, each answer from
/// `final_round` labelled by expert id, and the instruction to produce one
/// integrated answer. When `system_prompt` is set, it is emitted as a system
/// message ahead of the user message.
fn build_synthesis_messages(
    prompt: &str,
    system_prompt: Option<&str>,
    final_round: &Round,
) -> Vec<ChatMessage> {
    let mut body = [
        "Original user prompt:\n",
        prompt,
        "\n\nFinal-round expert answers:\n",
    ]
    .concat();
    for (expert, answer) in &final_round.answers {
        body.push_str(&format!("- Expert {}: {}\n", expert, answer.text));
    }
    body.push_str(
        "\nProduce a single coherent final answer to the user prompt that integrates the experts' \
         perspectives. Where they disagreed, explain the disagreement and resolve it. \
         Do not hedge unnecessarily.",
    );

    let user = ChatMessage {
        role: ChatRole::User,
        content: body,
    };
    match system_prompt {
        Some(sys) => vec![
            ChatMessage {
                role: ChatRole::System,
                content: sys.to_string(),
            },
            user,
        ],
        None => vec![user],
    }
}
/// Runs the convergence check over the answer texts recorded in `round`.
///
/// Only successful answers take part (failures are not stored in
/// `round.answers`). Any error from `convergence::check` is flattened to its
/// string form so the caller can embed it in a failure event.
fn check_convergence(
    round: &Round,
    embedder: &dyn Embedder,
    threshold: f32,
) -> Result<ConvergenceOutcome, String> {
    let mut texts: Vec<&str> = Vec::new();
    for answer in round.answers.values() {
        texts.push(answer.text.as_str());
    }

    match convergence::check(embedder, &texts, threshold) {
        Ok(outcome) => Ok(outcome),
        Err(err) => Err(err.to_string()),
    }
}
// Unit tests for the council orchestration, driven by scripted mock agents
// and mock embedders from the crate's `testing` modules.
#[cfg(test)]
mod tests {
use super::*;
use crate::council::agent::testing::{MockAgent, Script};
use crate::council::embedder::testing::{AlwaysEmbedder, MockEmbedder};
// Builds `CouncilParams` with no system prompt and default sampling.
fn params(min: u32, max: u32, threshold: f32, quorum: u32) -> CouncilParams {
CouncilParams {
min_rounds: min,
max_rounds: max,
convergence_threshold: threshold,
min_quorum: quorum,
system_prompt: None,
sampling: SamplingConfig::default(),
}
}
// Wraps a scripted `MockAgent` (model name "mock-model") as a trait object.
// NOTE(review): presumably one `Script` is consumed per `chat` call — confirm against `MockAgent`.
fn arc_agent(id: &str, scripts: Vec<Script>) -> Arc<dyn Agent> {
Arc::new(MockAgent::new(id, "mock-model", scripts))
}
// Drains a council event stream into a vector, preserving order.
async fn collect_events(stream: impl Stream<Item = CouncilEvent>) -> Vec<CouncilEvent> {
let mut s = Box::pin(stream);
let mut out = Vec::new();
while let Some(ev) = s.next().await {
out.push(ev);
}
out
}
// Happy path: min_rounds = 2, so convergence is first checked after the
// second round; the synthesizer's single token is the final answer.
// NOTE(review): relies on `AlwaysEmbedder` yielding the same vector for every
// text so that the check converges — confirm against the mock.
#[tokio::test]
async fn deliberate_runs_min_rounds_then_synthesizes_when_converged() {
let experts = vec![
arc_agent(
"A",
vec![Script::Tokens(vec!["alpha"]), Script::Tokens(vec!["alpha"])],
),
arc_agent(
"B",
vec![Script::Tokens(vec!["beta"]), Script::Tokens(vec!["beta"])],
),
];
let synth = arc_agent("S", vec![Script::Tokens(vec!["FINAL"])]);
let embedder: Arc<dyn Embedder> = Arc::new(AlwaysEmbedder {
vector: vec![1.0, 0.0],
dim: 2,
});
let council = Council::new(experts, synth, embedder, params(2, 4, 0.99, 1));
let answer = council.deliberate("hi").await.expect("ok");
assert_eq!(answer, "FINAL");
}
// With min_rounds = 3 the gate skips convergence checks for the first two
// rounds, so exactly one `ConvergenceCheck` event is expected (after the
// third round), and that check must converge to stop further rounds.
#[tokio::test]
async fn min_rounds_gate_blocks_early_synthesis_even_when_converged() {
let experts = vec![
arc_agent("A", vec![Script::Tokens(vec!["x"]); 3]),
arc_agent("B", vec![Script::Tokens(vec!["x"]); 3]),
];
let synth = arc_agent("S", vec![Script::Tokens(vec!["F"])]);
let embedder: Arc<dyn Embedder> = Arc::new(AlwaysEmbedder {
vector: vec![1.0, 0.0],
dim: 2,
});
let council = Council::new(experts, synth, embedder, params(3, 5, 0.99, 1));
let evs = collect_events(council.deliberate_stream("hi")).await;
let convergence_checks: Vec<_> = evs
.iter()
.filter(|e| matches!(e, CouncilEvent::ConvergenceCheck { .. }))
.collect();
assert_eq!(convergence_checks.len(), 1, "events: {evs:#?}");
}
// max_rounds = 3 with an embedder scripted to alternate between two
// different unit vectors: all three rounds must run and the stream must
// still end with `CouncilCompleted`.
// NOTE(review): assumes `MockEmbedder` hands out the scripted vectors in
// order, so the two experts never look converged — confirm against the mock.
#[tokio::test]
async fn runs_to_max_rounds_when_never_converged() {
let experts = vec![
arc_agent(
"A",
vec![
Script::Tokens(vec!["x"]),
Script::Tokens(vec!["x"]),
Script::Tokens(vec!["x"]),
],
),
arc_agent(
"B",
vec![
Script::Tokens(vec!["y"]),
Script::Tokens(vec!["y"]),
Script::Tokens(vec!["y"]),
],
),
];
let synth = arc_agent("S", vec![Script::Tokens(vec!["F"])]);
// Six vectors, alternating [1, 0] and [0, 1].
let scripts: Vec<Vec<f32>> = (0..6)
.map(|i| if i % 2 == 0 { vec![1.0, 0.0] } else { vec![0.0, 1.0] })
.collect();
let embedder: Arc<dyn Embedder> = Arc::new(MockEmbedder::new(2, scripts));
let council = Council::new(experts, synth, embedder, params(2, 3, 0.99, 1));
let evs = collect_events(council.deliberate_stream("hi")).await;
let rounds_started: Vec<_> = evs
.iter()
.filter(|e| matches!(e, CouncilEvent::RoundStarted { .. }))
.collect();
assert_eq!(rounds_started.len(), 3, "should run all 3 rounds");
assert!(matches!(
evs.last().unwrap(),
CouncilEvent::CouncilCompleted { .. }
));
}
// Both experts fail in round 0, so with a quorum of 1 the run must end
// with `QuorumLost { round: 0, required: 1, actual: 0 }`.
#[tokio::test]
async fn quorum_loss_terminates_with_failure_event() {
let experts = vec![
arc_agent("A", vec![Script::Error("network down")]),
arc_agent("B", vec![Script::Error("network down")]),
];
let synth = arc_agent("S", vec![Script::Tokens(vec!["F"])]);
let embedder: Arc<dyn Embedder> = Arc::new(AlwaysEmbedder {
vector: vec![1.0, 0.0],
dim: 2,
});
let council = Council::new(experts, synth, embedder, params(2, 4, 0.99, 1));
let result = council.deliberate("hi").await.unwrap_err();
assert!(
matches!(
result,
CouncilError::QuorumLost {
round: 0,
required: 1,
actual: 0
}
),
"got {result:?}"
);
}
// Experts succeed but the synthesizer's script is an error: `deliberate`
// must return `SynthesizerFailed` carrying the underlying message.
#[tokio::test]
async fn synthesizer_failure_surfaces_as_error() {
let experts = vec![
arc_agent(
"A",
vec![Script::Tokens(vec!["alpha"]), Script::Tokens(vec!["alpha"])],
),
arc_agent(
"B",
vec![Script::Tokens(vec!["beta"]), Script::Tokens(vec!["beta"])],
),
];
let synth = arc_agent("S", vec![Script::Error("synth boom")]);
let embedder: Arc<dyn Embedder> = Arc::new(AlwaysEmbedder {
vector: vec![1.0, 0.0],
dim: 2,
});
let council = Council::new(experts, synth, embedder, params(2, 4, 0.99, 1));
let result = council.deliberate("hi").await.unwrap_err();
match result {
CouncilError::SynthesizerFailed { message, .. } => {
assert!(message.contains("synth boom"), "msg: {message}");
}
other => panic!("expected SynthesizerFailed, got {other:?}"),
}
}
// A refinement-round prompt lists each peer answer labelled by expert id.
// "Anonymized" here only means the model name is absent; the expert ids
// themselves do appear in the prompt.
#[tokio::test]
async fn refinement_round_prompt_carries_anonymized_peer_answers() {
let mut prev = Round::new(0);
prev.record_answer(Answer {
expert_id: "A".into(),
text: "alpha-answer".into(),
tokens: 1,
});
prev.record_answer(Answer {
expert_id: "B".into(),
text: "beta-answer".into(),
tokens: 1,
});
let messages = build_round_messages(1, "What is X?", None, Some(&prev));
let user_msg = &messages
.iter()
.find(|m| m.role == ChatRole::User)
.unwrap()
.content;
assert!(user_msg.contains("Expert A: alpha-answer"));
assert!(user_msg.contains("Expert B: beta-answer"));
assert!(!user_msg.contains("mock-model"));
}
// Round 0 passes the user's prompt through verbatim.
#[tokio::test]
async fn round_zero_uses_raw_prompt_only() {
let messages = build_round_messages(0, "what is the answer?", None, None);
let user_msg = &messages
.iter()
.find(|m| m.role == ChatRole::User)
.unwrap()
.content;
assert_eq!(user_msg, "what is the answer?");
}
// A configured system prompt becomes the first message, followed by the user message.
#[tokio::test]
async fn system_prompt_is_prepended_when_set() {
let messages = build_round_messages(0, "X", Some("be careful"), None);
assert_eq!(messages.len(), 2);
assert_eq!(messages[0].role, ChatRole::System);
assert_eq!(messages[0].content, "be careful");
assert_eq!(messages[1].role, ChatRole::User);
}
}