use std::collections::HashMap;
use std::sync::Arc;
use std::time::Instant;
use async_trait::async_trait;
use serde_json::{Value, json};
use super::PromptTemplate;
use crate::application::services::orchestration::types::{
ContentProcessingResult, ContentProcessor, OrchestratorError,
};
use crate::core::platform::container::battalion::formation::Formation;
use crate::core::platform::container::battalion::phalanx::Phalanx;
use crate::core::platform::container::content::ContentItem;
use crate::core::platform::container::orchestration_context::OrchestrationContext;
use paladin_battalion::formation_service::FormationExecutionService;
use paladin_battalion::phalanx_service::PhalanxExecutionService;
use paladin_ports::output::paladin_port::PaladinPort;
/// Battalion pattern a `BattalionContentProcessor` executes for each content item.
///
/// Each variant owns the battalion configuration that is handed to the matching
/// execution service (`FormationExecutionService` or `PhalanxExecutionService`).
#[derive(Debug, Clone)]
pub enum BattalionPattern {
/// Execute the wrapped `Formation`; reported as `"formation"` in result metadata.
Formation(Formation),
/// Execute the wrapped `Phalanx`; reported as `"phalanx"` in result metadata.
Phalanx(Phalanx),
}
impl BattalionPattern {
fn as_str(&self) -> &'static str {
match self {
BattalionPattern::Formation(_) => "formation",
BattalionPattern::Phalanx(_) => "phalanx",
}
}
}
/// Content processor that renders a prompt from a content item and runs it
/// through a configured battalion pattern.
pub struct BattalionContentProcessor {
/// Name returned by `ContentProcessor::name` and copied into each result's `processor_name`.
name: String,
/// Shared port passed to the execution service that is created on every `process_content` call.
paladin_port: Arc<dyn PaladinPort>,
/// Battalion pattern executed for every content item.
pattern: BattalionPattern,
/// Template rendered against the content item to produce the battalion's input prompt.
prompt_template: PromptTemplate,
}
impl BattalionContentProcessor {
    /// Builds a processor that runs `pattern` through `paladin_port`.
    ///
    /// The processor starts out named `"BattalionContentProcessor"` and uses
    /// `PromptTemplate::default()`; both can be replaced with the `with_*`
    /// builder methods.
    pub fn new(paladin_port: Arc<dyn PaladinPort>, pattern: BattalionPattern) -> Self {
        let name = String::from("BattalionContentProcessor");
        let prompt_template = PromptTemplate::default();
        Self {
            name,
            paladin_port,
            pattern,
            prompt_template,
        }
    }

    /// Consumes the processor and returns it with its name replaced by `name`.
    pub fn with_name(self, name: impl Into<String>) -> Self {
        Self {
            name: name.into(),
            ..self
        }
    }

    /// Consumes the processor and returns it with its prompt template replaced
    /// by `template`.
    pub fn with_prompt_template(self, template: PromptTemplate) -> Self {
        Self {
            prompt_template: template,
            ..self
        }
    }
}
impl Clone for BattalionContentProcessor {
    /// Produces an independent processor that shares the same paladin port.
    ///
    /// The port is cloned with `Arc::clone`, so both processors point at the
    /// same underlying port; every other field is cloned by value.
    fn clone(&self) -> Self {
        // Destructure exhaustively so a newly added field cannot be forgotten here.
        let Self {
            name,
            paladin_port,
            pattern,
            prompt_template,
        } = self;

        Self {
            name: name.clone(),
            paladin_port: Arc::clone(paladin_port),
            pattern: pattern.clone(),
            prompt_template: prompt_template.clone(),
        }
    }
}
#[async_trait]
impl ContentProcessor for BattalionContentProcessor {
    /// Returns the name this processor was configured with.
    fn name(&self) -> &str {
        &self.name
    }

    /// Runs the configured battalion against `content` and packages the outcome.
    ///
    /// The prompt template is rendered against `content`, the prompt is executed
    /// through the service matching the configured [`BattalionPattern`], and the
    /// battalion's outputs are folded into a [`ContentProcessingResult`]:
    ///
    /// * `result_data` holds `final_output` plus an `agent_outputs` object keyed
    ///   `agent_0`, `agent_1`, ... in the order of the battalion's `paladin_results`.
    /// * `metadata` holds `pattern`, `battalion_name`, `agents`,
    ///   `paladin_success_count`, `paladin_failure_count` and `total_tokens`.
    /// * `success` is `true` only when `paladin_failure_count` is zero. `error` is
    ///   always `None`, so partial failures are visible only through `success`
    ///   and the failure count in `metadata`.
    ///
    /// `_context` is not used by this processor.
    ///
    /// # Errors
    ///
    /// Returns [`OrchestratorError::ServiceError`] carrying the stringified cause
    /// when the execution service fails.
    async fn process_content(
        &self,
        content: ContentItem,
        _context: OrchestrationContext,
    ) -> Result<ContentProcessingResult, OrchestratorError> {
        let content_id = content.uuid();
        // The timer deliberately starts before rendering so that
        // `processing_time_ms` covers prompt construction as well as execution.
        let start = Instant::now();
        let prompt = self.prompt_template.render(&content);

        let battalion_result = match &self.pattern {
            BattalionPattern::Formation(formation) => {
                let service = FormationExecutionService::new(Arc::clone(&self.paladin_port));
                service
                    .execute(formation, &prompt)
                    .await
                    .map_err(|e| OrchestratorError::ServiceError(e.to_string()))?
            }
            BattalionPattern::Phalanx(phalanx) => {
                let service = PhalanxExecutionService::new(Arc::clone(&self.paladin_port));
                service
                    .execute(phalanx, &prompt)
                    .await
                    .map_err(|e| OrchestratorError::ServiceError(e.to_string()))?
            }
        };

        // `Duration::as_millis` returns a `u128`; saturate instead of letting an
        // `as` cast silently wrap an out-of-range value.
        let processing_time_ms = u64::try_from(start.elapsed().as_millis()).unwrap_or(u64::MAX);

        // Build the per-agent output object and the ordered list of its keys in
        // one pass so the two can never disagree.
        let mut agent_outputs = serde_json::Map::new();
        let mut agents: Vec<String> = Vec::new();
        for (idx, paladin_result) in battalion_result.paladin_results.iter().enumerate() {
            let key = format!("agent_{idx}");
            agent_outputs.insert(key.clone(), json!(paladin_result.output));
            agents.push(key);
        }

        let result_data = json!({
            "final_output": battalion_result.final_output,
            "agent_outputs": Value::Object(agent_outputs),
        });

        let metadata: HashMap<String, Value> = HashMap::from([
            ("pattern".to_string(), json!(self.pattern.as_str())),
            (
                "battalion_name".to_string(),
                json!(battalion_result.battalion_name),
            ),
            ("agents".to_string(), json!(agents)),
            (
                "paladin_success_count".to_string(),
                json!(battalion_result.paladin_success_count),
            ),
            (
                "paladin_failure_count".to_string(),
                json!(battalion_result.paladin_failure_count),
            ),
            (
                "total_tokens".to_string(),
                json!(battalion_result.total_tokens),
            ),
        ]);

        // A single failed paladin marks the whole run as unsuccessful.
        let success = battalion_result.paladin_failure_count == 0;

        Ok(ContentProcessingResult {
            content_id,
            processor_name: self.name.clone(),
            processing_time_ms,
            success,
            result_data: Some(result_data),
            error: None,
            metadata,
        })
    }

    /// Returns a boxed clone of this processor; this implementation never fails.
    fn clone_box(&self) -> Result<Box<dyn ContentProcessor>, OrchestratorError> {
        Ok(Box::new(self.clone()))
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::application::services::paladin::error::PaladinError;
    use crate::core::base::entity::node::Node;
    use crate::core::platform::container::battalion::BattalionConfig;
    use crate::core::platform::container::content::{ContentItem, ContentType, TextContent};
    use crate::core::platform::container::paladin::{Paladin, PaladinData};
    use paladin_ports::output::paladin_port::{PaladinResult, PaladinStream};
    use std::sync::Mutex;

    /// Scripted `PaladinPort` double: replays canned outputs in order (falling
    /// back to `"default"` once they run out) and records every input it sees.
    struct MockPaladinPort {
        responses: Mutex<Vec<String>>,
        inputs: Arc<Mutex<Vec<String>>>,
    }

    impl MockPaladinPort {
        /// Creates a mock that answers with `responses` and logs inputs into `inputs`.
        fn new(responses: Vec<&str>, inputs: Arc<Mutex<Vec<String>>>) -> Self {
            let scripted: Vec<String> = responses.iter().map(|text| (*text).to_string()).collect();
            Self {
                responses: Mutex::new(scripted),
                inputs,
            }
        }
    }

    #[async_trait]
    impl PaladinPort for MockPaladinPort {
        async fn execute(
            &self,
            _paladin: &Paladin,
            input: &str,
        ) -> Result<PaladinResult, PaladinError> {
            self.inputs.lock().unwrap().push(input.to_owned());

            let mut queue = self.responses.lock().unwrap();
            let output = if queue.is_empty() {
                String::from("default")
            } else {
                queue.remove(0)
            };

            Ok(PaladinResult {
                output,
                ..Default::default()
            })
        }

        async fn execute_stream(
            &self,
            _paladin: &Paladin,
            _input: &str,
        ) -> Result<PaladinStream, PaladinError> {
            unimplemented!()
        }

        fn validate(&self, _paladin: &Paladin) -> Result<(), PaladinError> {
            Ok(())
        }
    }

    /// Builds a paladin node called `name` that uses the `"mock"` model.
    fn test_paladin(name: &str) -> Paladin {
        Node::new(
            PaladinData {
                system_prompt: "test".to_string(),
                name: name.to_string(),
                model: "mock".to_string(),
                ..Default::default()
            },
            Some(name.to_string()),
        )
    }

    /// Builds a text content item titled `"Doc"` whose body is `body`.
    fn text_item(body: &str) -> ContentItem {
        let text = TextContent::new(None, Some(body.to_string())).expect("text content");
        let kind = ContentType::Text(text);
        ContentItem::new_with_title(kind, "Doc".to_string()).expect("content item")
    }

    /// Builds a throwaway orchestration context.
    fn context() -> OrchestrationContext {
        OrchestrationContext::new("test".to_string(), "test".to_string())
    }

    #[tokio::test]
    async fn formation_runs_agents_sequentially_into_one_result() {
        let seen_inputs = Arc::new(Mutex::new(Vec::new()));
        let port = Arc::new(MockPaladinPort::new(
            vec!["summary", "classification"],
            Arc::clone(&seen_inputs),
        ));
        let members = vec![test_paladin("summarizer"), test_paladin("classifier")];
        let formation =
            Formation::new(members, BattalionConfig::new("pipeline")).expect("formation");
        let processor =
            BattalionContentProcessor::new(port, BattalionPattern::Formation(formation));

        let item = text_item("Original article text.");
        let expected_id = item.uuid();
        let outcome = processor
            .process_content(item, context())
            .await
            .expect("processing succeeds");

        assert!(outcome.success);
        assert_eq!(outcome.content_id, expected_id);
        assert_eq!(outcome.metadata["pattern"], json!("formation"));

        // The second agent must have been fed the first agent's output.
        let recorded = seen_inputs.lock().unwrap();
        assert_eq!(recorded.len(), 2);
        assert!(recorded[1].contains("summary"));

        let payload = outcome.result_data.unwrap();
        assert_eq!(payload["final_output"], json!("classification"));
    }

    #[tokio::test]
    async fn phalanx_runs_agents_in_parallel_and_merges_outputs() {
        let seen_inputs = Arc::new(Mutex::new(Vec::new()));
        let port = Arc::new(MockPaladinPort::new(
            vec!["analyst_a", "analyst_b"],
            Arc::clone(&seen_inputs),
        ));
        let members = vec![test_paladin("analyst_a"), test_paladin("analyst_b")];
        let phalanx = Phalanx::new(members, BattalionConfig::new("analysts")).expect("phalanx");
        let processor = BattalionContentProcessor::new(port, BattalionPattern::Phalanx(phalanx));

        let outcome = processor
            .process_content(text_item("Some content to analyze."), context())
            .await
            .expect("processing succeeds");

        assert!(outcome.success);
        assert_eq!(outcome.metadata["pattern"], json!("phalanx"));

        // Every agent must have received the very same input.
        let recorded = seen_inputs.lock().unwrap();
        assert_eq!(recorded.len(), 2);
        assert_eq!(recorded[0], recorded[1]);

        let payload = outcome.result_data.unwrap();
        let per_agent = payload["agent_outputs"].as_object().unwrap();
        assert_eq!(per_agent.len(), 2);
    }
}