use log::{debug, info, warn};
use std::sync::Arc;
use tokio::time::{Duration, timeout};
use paladin_core::platform::container::battalion::BattalionError;
use paladin_core::platform::container::battalion::council::{
Council, CouncilMessage, TerminationCondition, TurnStrategy,
};
use paladin_core::platform::container::garrison::{ConversationRole, GarrisonEntry};
use paladin_core::platform::container::paladin::Paladin;
use paladin_core::platform::container::paladin_error::PaladinError;
use paladin_ports::output::garrison_port::GarrisonPort;
use paladin_ports::output::paladin_port::PaladinPort;
use paladin_ports::output::paladin_registry::PaladinRegistry;
/// Outcome of a council session produced by `CouncilExecutionService::convene`.
#[derive(Debug, Clone)]
pub struct CouncilResult {
/// Every message that was successfully produced, in the order it was spoken.
pub transcript: Vec<CouncilMessage>,
/// Closing statement picked from the transcript; `None` when the transcript is empty.
pub conclusion: Option<String>,
/// Number of rounds the session reports as completed.
pub rounds_completed: u32,
/// Copy of the termination condition the council was configured with.
pub termination_reason: TerminationCondition,
}
/// Drives multi-participant council discussions.
///
/// Participants are looked up in the registry, executed through the paladin
/// port, and each produced message is optionally persisted through the
/// garrison port.
pub struct CouncilExecutionService {
    /// Executes a paladin against a prompt; used for every speaker turn.
    paladin_port: Arc<dyn PaladinPort>,
    /// Optional store for council messages; `None` disables persistence.
    garrison_port: Option<Arc<dyn GarrisonPort>>,
    /// Lookup of paladins by id for participants and the moderator.
    registry: Arc<dyn PaladinRegistry>,
}
impl CouncilExecutionService {
/// Builds a service from its three collaborators.
///
/// `garrison_port` may be `None`, in which case council messages are not
/// persisted anywhere.
pub fn new(
    paladin_port: Arc<dyn PaladinPort>,
    garrison_port: Option<Arc<dyn GarrisonPort>>,
    registry: Arc<dyn PaladinRegistry>,
) -> Self {
    info!("Creating CouncilExecutionService");
    let service = Self {
        paladin_port,
        garrison_port,
        registry,
    };
    service
}
/// Runs a council discussion on `topic` until the configured termination
/// condition is met.
///
/// All participants (and the moderator, if any) are resolved from the
/// registry up front. Each loop iteration then picks a speaker according to
/// the turn strategy, executes it with either the bare topic or the formatted
/// history, and appends the reply to the transcript. A speaker that fails or
/// times out is logged and skipped; its turn still counts towards the round.
///
/// A round is finished after as many turns as there are participants,
/// regardless of turn strategy and of whether the turns succeeded. This keeps
/// the round counter advancing for strategies that do not rotate through the
/// participant list and when speakers keep failing.
///
/// # Errors
///
/// * `BattalionError::PaladinNotFound` if a participant or the moderator is
///   missing from the registry.
/// * Any error returned while choosing the next speaker.
pub async fn convene(
    &self,
    council: &Council,
    topic: &str,
) -> Result<CouncilResult, BattalionError> {
    // Upper bound for a single speaker turn.
    const SPEAKER_TIMEOUT: Duration = Duration::from_secs(300);

    let participant_ids = &council.node.participant_ids;
    let config = &council.node.config;

    info!(
        "Convening Council '{}' with {} participants on topic: {}",
        council.node.name,
        participant_ids.len(),
        topic
    );

    // Resolve everyone before the first turn so a bad id fails fast.
    let mut resolved_paladins = std::collections::HashMap::new();
    for participant_id in participant_ids {
        let paladin = self.registry.get(participant_id).ok_or_else(|| {
            BattalionError::PaladinNotFound(format!(
                "Participant '{}' not found in registry",
                participant_id
            ))
        })?;
        resolved_paladins.insert(participant_id.clone(), paladin);
    }
    if let Some(ref moderator_id) = council.node.moderator_id
        && !resolved_paladins.contains_key(moderator_id)
    {
        let paladin = self.registry.get(moderator_id).ok_or_else(|| {
            BattalionError::PaladinNotFound(format!(
                "Moderator '{}' not found in registry",
                moderator_id
            ))
        })?;
        resolved_paladins.insert(moderator_id.clone(), paladin);
    }

    let mut transcript: Vec<CouncilMessage> = Vec::new();
    let mut current_round = 1u32;
    // Rotation cursor consumed by the index-based turn strategies.
    let mut speaker_index = 0usize;
    // Turns attempted in the current round, independent of strategy/outcome.
    let mut turns_in_round = 0usize;

    loop {
        if self.should_terminate(
            &config.termination_condition,
            &transcript,
            current_round,
            &config.max_rounds,
        ) {
            info!(
                "Council termination condition met: {:?}",
                config.termination_condition
            );
            break;
        }

        let next_speaker_id = self.determine_next_speaker(
            &config.turn_strategy,
            participant_ids,
            &council.node.moderator_id,
            &mut speaker_index,
            &transcript,
        )?;
        debug!(
            "Round {}: Next speaker is {}",
            current_round, next_speaker_id
        );

        let paladin = resolved_paladins.get(&next_speaker_id).ok_or_else(|| {
            BattalionError::PaladinNotFound(format!(
                "Speaker '{}' not found in resolved paladins",
                next_speaker_id
            ))
        })?;

        let context = if config.include_history {
            self.format_conversation_history(&transcript, topic)
        } else {
            topic.to_string()
        };

        let turn = timeout(SPEAKER_TIMEOUT, self.execute_speaker(paladin, &context)).await;
        // The turn is spent whether or not the speaker produced output;
        // otherwise a failing speaker would stall round progression.
        turns_in_round += 1;

        match turn {
            Ok(Ok(speaker_output)) => {
                let message =
                    CouncilMessage::new(next_speaker_id.clone(), speaker_output, current_round);
                // Persistence is best-effort: a storage failure must not end the session.
                if let Some(garrison) = &self.garrison_port
                    && let Err(e) = self
                        .store_in_garrison(garrison.as_ref(), &message, topic)
                        .await
                {
                    warn!("Failed to store message in Garrison: {}", e);
                }
                transcript.push(message);
            }
            Ok(Err(e)) => {
                warn!(
                    "Speaker {} failed in round {}: {}",
                    next_speaker_id, current_round, e
                );
            }
            Err(_) => {
                warn!(
                    "Speaker {} timed out in round {}",
                    next_speaker_id, current_round
                );
            }
        }

        if turns_in_round >= participant_ids.len() {
            current_round += 1;
            turns_in_round = 0;
            speaker_index = 0;
        }
    }

    let conclusion = self.extract_conclusion(&transcript, &council.node.moderator_id);

    // A partially played round counts; a round that was only just opened does not.
    let rounds_completed = if turns_in_round > 0 {
        current_round
    } else if current_round > 1 {
        current_round - 1
    } else {
        current_round
    };

    let result = CouncilResult {
        transcript,
        conclusion,
        rounds_completed,
        termination_reason: config.termination_condition.clone(),
    };
    info!(
        "Council '{}' concluded after {} rounds",
        council.node.name, result.rounds_completed
    );
    Ok(result)
}
/// Chooses the id of the participant who speaks next.
///
/// * `RoundRobin` walks `participants` in order, advancing `speaker_index`.
/// * `ModeratorDirected` yields the participant named in the moderator's
///   latest message when that message is the last one in the transcript and
///   names someone; otherwise the moderator speaks.
/// * `Random` picks a participant uniformly at random.
/// * `VoluntaryWithTimeout` currently behaves like `RoundRobin`.
///
/// # Errors
///
/// Returns `BattalionError::ValidationError` when `participants` is empty for
/// a strategy that selects from it, or when `ModeratorDirected` is used
/// without a moderator.
fn determine_next_speaker(
    &self,
    strategy: &TurnStrategy,
    participants: &[String],
    moderator_id: &Option<String>,
    speaker_index: &mut usize,
    transcript: &[CouncilMessage],
) -> Result<String, BattalionError> {
    // Returns the participant under the (wrapping) cursor and advances it.
    // An empty list is rejected up front: `% 0` would panic.
    fn next_in_rotation(
        participants: &[String],
        cursor: &mut usize,
    ) -> Result<String, BattalionError> {
        if participants.is_empty() {
            return Err(BattalionError::ValidationError(
                "No participants available".to_string(),
            ));
        }
        let speaker_id = participants[*cursor % participants.len()].clone();
        *cursor += 1;
        Ok(speaker_id)
    }

    match strategy {
        TurnStrategy::RoundRobin => next_in_rotation(participants, speaker_index),
        TurnStrategy::ModeratorDirected => {
            let Some(mod_id) = moderator_id else {
                return Err(BattalionError::ValidationError(
                    "ModeratorDirected strategy requires a moderator".to_string(),
                ));
            };
            // Hand the floor to whoever the moderator just called on, if anyone.
            if let Some(last_message) = transcript.last()
                && &last_message.speaker == mod_id
                && let Some(next_speaker) =
                    self.parse_next_speaker(&last_message.content, participants)
            {
                return Ok(next_speaker);
            }
            Ok(mod_id.clone())
        }
        TurnStrategy::Random => {
            use rand::seq::SliceRandom;
            let mut rng = rand::thread_rng();
            participants.choose(&mut rng).cloned().ok_or_else(|| {
                BattalionError::ValidationError("No participants available".to_string())
            })
        }
        TurnStrategy::VoluntaryWithTimeout { timeout_ms: _ } => {
            warn!("VoluntaryWithTimeout not fully implemented, using RoundRobin");
            next_in_rotation(participants, speaker_index)
        }
    }
}
/// Finds the first participant that `message` calls on to speak next.
///
/// Matching is case-insensitive and recognises three forms: `next: <id>`,
/// `@<id>` and `<id> please`. The id must stand on its own: it may not be
/// directly adjacent to another identifier character (letter, digit, `_` or
/// `-`), so `@expert_10` does not select `expert_1`. Empty ids never match.
///
/// Returns `None` when no participant is addressed.
fn parse_next_speaker(&self, message: &str, participants: &[String]) -> Option<String> {
    fn is_id_char(c: char) -> bool {
        c.is_alphanumeric() || c == '_' || c == '-'
    }

    // True when `needle` occurs in `haystack` without an identifier character
    // touching the guarded side(s) of the occurrence.
    fn mentions(haystack: &str, needle: &str, guard_start: bool, guard_end: bool) -> bool {
        haystack.match_indices(needle).any(|(start, found)| {
            let end = start + found.len();
            let start_ok = !guard_start
                || !haystack[..start]
                    .chars()
                    .next_back()
                    .is_some_and(is_id_char);
            let end_ok =
                !guard_end || !haystack[end..].chars().next().is_some_and(is_id_char);
            start_ok && end_ok
        })
    }

    let message_lower = message.to_lowercase();
    participants
        .iter()
        .find(|participant| {
            let id = participant.to_lowercase();
            if id.is_empty() {
                return false;
            }
            // The id sits at the end of the first two forms and at the start
            // of the third, so only that side needs a boundary check.
            mentions(&message_lower, &format!("next: {}", id), false, true)
                || mentions(&message_lower, &format!("@{}", id), false, true)
                || mentions(&message_lower, &format!("{} please", id), true, false)
        })
        .cloned()
}
/// Renders the prompt handed to a speaker when history is included.
///
/// The output starts with the topic, lists every transcript message (one per
/// line) between two 60-character `=` rules, and ends with a request for the
/// speaker's response.
fn format_conversation_history(&self, transcript: &[CouncilMessage], topic: &str) -> String {
    let divider = "=".repeat(60);

    let mut history = String::new();
    history.push_str(&format!("Discussion Topic: {}\n\n", topic));
    history.push_str("Conversation History:\n");
    history.push_str(&divider);
    history.push('\n');

    transcript.iter().for_each(|entry| {
        history.push_str(&entry.format());
        history.push('\n');
    });

    history.push_str(&divider);
    history.push_str("\n\nPlease provide your response:");
    history
}
/// Executes one speaker through the paladin port and returns its text output.
///
/// # Errors
///
/// Propagates whatever `PaladinError` the port reports.
async fn execute_speaker(
    &self,
    paladin: &Paladin,
    context: &str,
) -> Result<String, PaladinError> {
    debug!("Executing speaker: {:?}", paladin.node.name);
    self.paladin_port
        .execute(paladin, context)
        .await
        .map(|outcome| outcome.output)
}
/// Persists a council message through `garrison` as an assistant entry.
///
/// The stored text is prefixed with the topic and the message's round.
///
/// # Errors
///
/// A failure reported by the garrison is wrapped in
/// `BattalionError::ExecutionError`.
async fn store_in_garrison(
    &self,
    garrison: &dyn GarrisonPort,
    message: &CouncilMessage,
    topic: &str,
) -> Result<(), BattalionError> {
    let content = format!(
        "[Council: {}] Round {}: {}",
        topic, message.round, message.content
    );
    let entry = GarrisonEntry::new(ConversationRole::Assistant, content);

    match garrison.remember(entry).await {
        Ok(()) => Ok(()),
        Err(e) => Err(BattalionError::ExecutionError(format!(
            "Garrison error: {}",
            e
        ))),
    }
}
/// Decides whether the discussion should stop before the next turn.
///
/// `max_rounds` is consulted only for `TerminationCondition::MaxRounds`; the
/// other conditions depend solely on the transcript content.
fn should_terminate(
    &self,
    condition: &TerminationCondition,
    transcript: &[CouncilMessage],
    current_round: u32,
    max_rounds: &u32,
) -> bool {
    let round_limit = *max_rounds;
    match condition {
        TerminationCondition::MaxRounds => round_limit < current_round,
        TerminationCondition::Consensus => self.detect_consensus(transcript),
        TerminationCondition::ModeratorDecision => {
            self.detect_moderator_conclusion(transcript)
        }
        TerminationCondition::Keyword(stop_word) => {
            self.detect_keyword(transcript, stop_word)
        }
    }
}
/// Reports whether any of the three most recent messages contains an
/// agreement phrase.
///
/// Each message is lower-cased and padded with one space on both sides so
/// the space-delimited patterns can also match at the start or end.
fn detect_consensus(&self, transcript: &[CouncilMessage]) -> bool {
    const CONSENSUS_PATTERNS: [&str; 7] = [
        " consensus",
        " agree ",
        " agreed ",
        "we agree",
        "in agreement",
        " unanimous",
        "i agree",
    ];

    transcript.iter().rev().take(3).any(|msg| {
        let padded = format!(" {} ", msg.content.to_lowercase());
        CONSENSUS_PATTERNS
            .iter()
            .any(|pattern| padded.contains(*pattern))
    })
}
/// Reports whether the latest message contains one of the closing phrases
/// (case-insensitive). An empty transcript yields `false`.
fn detect_moderator_conclusion(&self, transcript: &[CouncilMessage]) -> bool {
    const CLOSING_PHRASES: [&str; 3] = [
        "discussion concluded",
        "meeting adjourned",
        "we have reached a conclusion",
    ];

    transcript.last().is_some_and(|latest| {
        let text = latest.content.to_lowercase();
        CLOSING_PHRASES.iter().any(|phrase| text.contains(*phrase))
    })
}
/// Reports whether the latest message contains `keyword`, ignoring case.
/// An empty transcript yields `false`.
fn detect_keyword(&self, transcript: &[CouncilMessage], keyword: &str) -> bool {
    transcript.last().is_some_and(|latest| {
        let needle = keyword.to_lowercase();
        latest.content.to_lowercase().contains(&needle)
    })
}
/// Picks the message that serves as the council's conclusion.
///
/// The moderator's most recent message wins when a moderator is configured
/// and has spoken; otherwise the last message of the transcript is used.
/// Returns `None` for an empty transcript.
fn extract_conclusion(
    &self,
    transcript: &[CouncilMessage],
    moderator_id: &Option<String>,
) -> Option<String> {
    let moderator_statement = moderator_id.as_ref().and_then(|mod_id| {
        transcript
            .iter()
            .rev()
            .find(|message| &message.speaker == mod_id)
    });

    moderator_statement
        .or_else(|| transcript.last())
        .map(|message| message.content.clone())
}
}
#[cfg(test)]
mod tests {
    use super::*;
    use paladin_core::base::entity::node::Node;
    use paladin_core::platform::container::battalion::council::{CouncilConfig, CouncilData};
    use paladin_core::platform::container::paladin::Paladin;
    use paladin_ports::output::paladin_port::{PaladinResult, PaladinStream, StopReason};
    use paladin_ports::output::paladin_registry::RegistryError;

    /// Service wired to the mock port and mock registry, without a garrison.
    fn mock_service() -> CouncilExecutionService {
        CouncilExecutionService::new(
            Arc::new(MockPaladinPort),
            None,
            Arc::new(MockPaladinRegistry::new()),
        )
    }

    /// Council data for a one-round, round-robin council named `test_council`.
    fn single_round_council(participant_ids: &[&str]) -> CouncilData {
        CouncilData {
            name: "test_council".to_string(),
            participant_ids: participant_ids.iter().map(|id| id.to_string()).collect(),
            moderator_id: None,
            config: CouncilConfig {
                turn_strategy: TurnStrategy::RoundRobin,
                termination_condition: TerminationCondition::MaxRounds,
                max_rounds: 1,
                include_history: true,
            },
        }
    }

    #[test]
    fn test_format_conversation_history() {
        let service = mock_service();
        let messages = vec![
            CouncilMessage::new("expert_1", "I think we should proceed", 1),
            CouncilMessage::new("expert_2", "I agree with expert_1", 1),
        ];

        let formatted = service.format_conversation_history(&messages, "Test Topic");

        for expected in [
            "Test Topic",
            "expert_1",
            "I think we should proceed",
            "expert_2",
        ] {
            assert!(formatted.contains(expected));
        }
    }

    #[test]
    fn test_detect_consensus() {
        let service = mock_service();
        let messages = vec![
            CouncilMessage::new("expert_1", "I think option A is best", 1),
            CouncilMessage::new("expert_2", "I agree with expert_1", 1),
            CouncilMessage::new("expert_3", "We have reached consensus on option A", 1),
        ];

        assert!(service.detect_consensus(&messages));
    }

    #[test]
    fn test_detect_consensus_negative() {
        let service = mock_service();
        let messages = vec![
            CouncilMessage::new("expert_1", "I think option A is best", 1),
            CouncilMessage::new("expert_2", "I disagree strongly", 1),
            CouncilMessage::new("expert_3", "I have concerns about this approach", 1),
        ];

        assert!(!service.detect_consensus(&messages));
    }

    #[test]
    fn test_detect_moderator_conclusion() {
        let service = mock_service();
        let messages = vec![
            CouncilMessage::new("expert_1", "My opinion is X", 1),
            CouncilMessage::new("moderator", "Thank you all. Discussion concluded.", 1),
        ];

        assert!(service.detect_moderator_conclusion(&messages));
    }

    #[test]
    fn test_detect_keyword() {
        let service = mock_service();
        let messages = vec![
            CouncilMessage::new("expert_1", "Let's discuss more", 1),
            CouncilMessage::new("expert_2", "DONE with discussion", 1),
        ];

        assert!(service.detect_keyword(&messages, "DONE"));
        assert!(!service.detect_keyword(&messages, "STOP"));
    }

    #[test]
    fn test_parse_next_speaker() {
        let service = mock_service();
        let participants: Vec<String> = ["alice", "bob", "charlie"]
            .iter()
            .map(|name| name.to_string())
            .collect();

        assert_eq!(
            service.parse_next_speaker("Next: alice", &participants),
            Some("alice".to_string())
        );
        assert_eq!(
            service.parse_next_speaker("@bob please respond", &participants),
            Some("bob".to_string())
        );
        assert_eq!(
            service.parse_next_speaker("charlie please provide your input", &participants),
            Some("charlie".to_string())
        );
        assert_eq!(
            service.parse_next_speaker("no match here", &participants),
            None
        );
    }

    #[test]
    fn test_extract_conclusion_with_moderator() {
        let service = mock_service();
        let messages = vec![
            CouncilMessage::new("expert_1", "I think A", 1),
            CouncilMessage::new("moderator", "Final decision: We go with A", 1),
            CouncilMessage::new("expert_2", "Sounds good", 1),
        ];

        let conclusion = service.extract_conclusion(&messages, &Some("moderator".to_string()));

        assert_eq!(conclusion, Some("Final decision: We go with A".to_string()));
    }

    #[test]
    fn test_extract_conclusion_without_moderator() {
        let service = mock_service();
        let messages = vec![
            CouncilMessage::new("expert_1", "First message", 1),
            CouncilMessage::new("expert_2", "Last message", 1),
        ];

        let conclusion = service.extract_conclusion(&messages, &None);

        assert_eq!(conclusion, Some("Last message".to_string()));
    }

    #[tokio::test]
    async fn test_council_resolves_participants() {
        let service = mock_service();
        let council = Node::new(
            single_round_council(&["paladin_1", "paladin_2"]),
            Some("test_council".to_string()),
        );

        let result = service.convene(&council, "Test topic").await;

        assert!(result.is_ok());
        let result = result.unwrap();
        assert!(result.rounds_completed >= 1);
    }

    #[tokio::test]
    async fn test_council_paladin_not_found_error() {
        let service = mock_service();
        let council = Node::new(
            single_round_council(&["nonexistent_paladin"]),
            Some("test_council".to_string()),
        );

        let result = service.convene(&council, "Test topic").await;

        assert!(result.is_err());
        let BattalionError::PaladinNotFound(msg) = result.unwrap_err() else {
            panic!("Expected PaladinNotFound error");
        };
        assert!(msg.contains("nonexistent_paladin"));
    }

    /// Paladin port that answers every execution with a fixed response.
    struct MockPaladinPort;

    #[async_trait::async_trait]
    impl PaladinPort for MockPaladinPort {
        async fn execute(
            &self,
            _paladin: &Paladin,
            _input: &str,
        ) -> Result<PaladinResult, PaladinError> {
            let canned = PaladinResult {
                output: "Mock response".to_string(),
                token_count: 100,
                execution_time_ms: 1000,
                loop_count: 1,
                stop_reason: StopReason::Completed,
                ..Default::default()
            };
            Ok(canned)
        }

        async fn execute_stream(
            &self,
            _paladin: &Paladin,
            _input: &str,
        ) -> Result<PaladinStream, PaladinError> {
            unimplemented!("Streaming not needed for Council tests")
        }

        fn validate(&self, _paladin: &Paladin) -> Result<(), PaladinError> {
            Ok(())
        }
    }

    /// In-memory registry pre-populated with `paladin_1` and `paladin_2`.
    struct MockPaladinRegistry {
        paladins: std::sync::RwLock<std::collections::HashMap<String, Arc<Paladin>>>,
    }

    impl MockPaladinRegistry {
        fn new() -> Self {
            use paladin_core::platform::container::paladin::{
                MaxLoops, PaladinData, PaladinStatus,
            };

            let paladins: std::collections::HashMap<String, Arc<Paladin>> = (1..=2)
                .map(|i| {
                    let id = format!("paladin_{}", i);
                    let data = PaladinData {
                        system_prompt: format!("Test Paladin {}", i),
                        name: id.clone(),
                        user_name: "test_user".to_string(),
                        model: "gpt-4".to_string(),
                        temperature: 0.7,
                        max_loops: MaxLoops::Fixed(3),
                        stop_words: vec![],
                        status: PaladinStatus::Idle,
                        vision_enabled: false,
                        autonomous_planning: false,
                        autonomous_prompts: false,
                        agent_description: String::new(),
                        dynamic_temperature: false,
                    };
                    let paladin = Node::new(data, Some(id.clone()));
                    (id, Arc::new(paladin))
                })
                .collect();

            Self {
                paladins: std::sync::RwLock::new(paladins),
            }
        }
    }

    impl PaladinRegistry for MockPaladinRegistry {
        fn register(&self, id: String, paladin: Arc<Paladin>) -> Result<(), RegistryError> {
            use std::collections::hash_map::Entry;

            let mut guard = self.paladins.write().unwrap();
            match guard.entry(id) {
                Entry::Occupied(taken) => Err(RegistryError::DuplicateId(taken.key().clone())),
                Entry::Vacant(free) => {
                    free.insert(paladin);
                    Ok(())
                }
            }
        }

        fn get(&self, id: &str) -> Option<Arc<Paladin>> {
            self.paladins.read().unwrap().get(id).cloned()
        }

        fn contains(&self, id: &str) -> bool {
            self.paladins.read().unwrap().contains_key(id)
        }

        fn list_ids(&self) -> Vec<String> {
            self.paladins.read().unwrap().keys().cloned().collect()
        }
    }
}