Skip to main content

cortexai_crew/
orchestra.rs

1//! Multi-Agent Orchestra with Parallel Execution and Perspective Synthesis
2//!
3//! Inspired by make-it-heavy's approach to deploying multiple specialized agents
4//! in parallel for comprehensive, multi-perspective analysis.
5//!
6//! # Features
7//! - **Parallel or Sequential Execution**: Choose execution mode based on needs
8//! - **Perspective Synthesis**: Merge multiple agent responses intelligently
9//! - **Dynamic Question Decomposition**: Break complex queries into sub-questions
10//! - **Specialized Agent Roles**: Each agent can have a distinct perspective/expertise
11//!
12//! # Example
13//! ```ignore
14//! use cortexai_crew::orchestra::*;
15//!
16//! let orchestra = Orchestra::builder()
17//!     .add_agent(AgentPerspective::new("analyst", "You are a data analyst..."))
18//!     .add_agent(AgentPerspective::new("critic", "You are a critical thinker..."))
19//!     .add_agent(AgentPerspective::new("creative", "You are a creative problem solver..."))
20//!     .with_synthesizer(SynthesisStrategy::Comprehensive)
21//!     .with_execution_mode(ExecutionMode::Parallel)
22//!     .build();
23//!
24//! let result = orchestra.analyze("How can we improve user engagement?").await?;
25//! ```
26
27use async_trait::async_trait;
28use serde::{Deserialize, Serialize};
29use std::collections::HashMap;
30use std::sync::Arc;
31use std::time::{Duration, Instant};
32
33use cortexai_core::errors::CrewError;
34use cortexai_providers::LLMBackend;
35
36/// Execution mode for the orchestra
37#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize, Default)]
38pub enum ExecutionMode {
39    /// Execute agents one after another (default, lower resource usage)
40    #[default]
41    Sequential,
42    /// Execute all agents simultaneously (faster, higher resource usage)
43    Parallel,
44    /// Execute in waves - groups of agents run in parallel sequentially
45    Wave { wave_size: usize },
46}
47
48/// Agent perspective/role configuration
49#[derive(Debug, Clone, Serialize, Deserialize)]
50pub struct AgentPerspective {
51    /// Unique identifier for this perspective
52    pub id: String,
53    /// Display name for the perspective
54    pub name: String,
55    /// System prompt defining the agent's role and expertise
56    pub system_prompt: String,
57    /// Optional specific model to use for this agent
58    pub model: Option<String>,
59    /// Temperature for this agent's responses (creativity level)
60    pub temperature: f32,
61    /// Priority weight for synthesis (higher = more influence)
62    pub weight: f32,
63    /// Tags for categorization
64    pub tags: Vec<String>,
65}
66
67impl AgentPerspective {
68    pub fn new(id: impl Into<String>, system_prompt: impl Into<String>) -> Self {
69        let id = id.into();
70        Self {
71            name: id.clone(),
72            id,
73            system_prompt: system_prompt.into(),
74            model: None,
75            temperature: 0.7,
76            weight: 1.0,
77            tags: Vec::new(),
78        }
79    }
80
81    pub fn with_name(mut self, name: impl Into<String>) -> Self {
82        self.name = name.into();
83        self
84    }
85
86    pub fn with_model(mut self, model: impl Into<String>) -> Self {
87        self.model = Some(model.into());
88        self
89    }
90
91    pub fn with_temperature(mut self, temperature: f32) -> Self {
92        self.temperature = temperature;
93        self
94    }
95
96    pub fn with_weight(mut self, weight: f32) -> Self {
97        self.weight = weight;
98        self
99    }
100
101    pub fn with_tags(mut self, tags: Vec<String>) -> Self {
102        self.tags = tags;
103        self
104    }
105}
106
107/// Response from a single agent perspective
108#[derive(Debug, Clone, Serialize, Deserialize)]
109pub struct PerspectiveResponse {
110    /// Which perspective generated this response
111    pub perspective_id: String,
112    /// The perspective's name
113    pub perspective_name: String,
114    /// The actual response content
115    pub content: String,
116    /// Time taken to generate response
117    pub latency_ms: u64,
118    /// Confidence score (0.0 - 1.0) if available
119    pub confidence: Option<f32>,
120    /// Key points extracted from the response
121    pub key_points: Vec<String>,
122    /// Any metadata from the response
123    pub metadata: HashMap<String, serde_json::Value>,
124}
125
126/// Synthesized result from multiple perspectives
127#[derive(Debug, Clone, Serialize, Deserialize)]
128pub struct SynthesizedResponse {
129    /// The final synthesized response
130    pub synthesis: String,
131    /// Individual perspective responses
132    pub perspectives: Vec<PerspectiveResponse>,
133    /// Sub-questions that were analyzed (if decomposition was used)
134    pub sub_questions: Vec<SubQuestion>,
135    /// Total time for all processing
136    pub total_latency_ms: u64,
137    /// Execution mode used
138    pub execution_mode: ExecutionMode,
139    /// Synthesis strategy used
140    pub synthesis_strategy: SynthesisStrategy,
141    /// Agreement score between perspectives (0.0 - 1.0)
142    pub agreement_score: Option<f32>,
143    /// Areas of consensus
144    pub consensus_points: Vec<String>,
145    /// Areas of disagreement
146    pub divergence_points: Vec<String>,
147}
148
149/// Strategy for synthesizing multiple perspectives
150#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize, Default)]
151pub enum SynthesisStrategy {
152    /// Comprehensive synthesis including all perspectives
153    #[default]
154    Comprehensive,
155    /// Consensus-based - focus on agreed points
156    Consensus,
157    /// Best-of - select the highest quality response
158    BestOf,
159    /// Weighted average based on perspective weights
160    Weighted,
161    /// Debate-style - present contrasting views
162    Debate,
163    /// No synthesis - return raw perspectives only
164    None,
165}
166
167/// A decomposed sub-question
168#[derive(Debug, Clone, Serialize, Deserialize)]
169pub struct SubQuestion {
170    /// The sub-question text
171    pub question: String,
172    /// Which perspectives should answer this
173    pub target_perspectives: Vec<String>,
174    /// Responses to this sub-question
175    pub responses: Vec<PerspectiveResponse>,
176}
177
178/// Trait for custom synthesis implementations
179#[async_trait]
180pub trait Synthesizer: Send + Sync {
181    /// Synthesize multiple perspective responses into a final response
182    async fn synthesize(
183        &self,
184        query: &str,
185        responses: Vec<PerspectiveResponse>,
186        backend: &dyn LLMBackend,
187    ) -> Result<SynthesizedResponse, CrewError>;
188}
189
190/// Trait for question decomposition
191#[async_trait]
192pub trait QuestionDecomposer: Send + Sync {
193    /// Decompose a complex query into sub-questions
194    async fn decompose(
195        &self,
196        query: &str,
197        perspectives: &[AgentPerspective],
198        backend: &dyn LLMBackend,
199    ) -> Result<Vec<SubQuestion>, CrewError>;
200}
201
202/// Default synthesizer implementation
203pub struct DefaultSynthesizer {
204    strategy: SynthesisStrategy,
205}
206
207impl DefaultSynthesizer {
208    pub fn new(strategy: SynthesisStrategy) -> Self {
209        Self { strategy }
210    }
211
212    fn extract_key_points(content: &str) -> Vec<String> {
213        // Simple extraction: split by sentences and take first few
214        content
215            .split(['.', '\n'])
216            .filter(|s| s.trim().len() > 20)
217            .take(5)
218            .map(|s| s.trim().to_string())
219            .collect()
220    }
221
222    fn find_consensus(responses: &[PerspectiveResponse]) -> Vec<String> {
223        // Simple consensus: find common themes/keywords
224        let mut word_counts: HashMap<String, usize> = HashMap::new();
225
226        for response in responses {
227            let words: std::collections::HashSet<_> = response
228                .content
229                .to_lowercase()
230                .split_whitespace()
231                .filter(|w| w.len() > 4)
232                .map(|s| s.to_string())
233                .collect();
234
235            for word in words {
236                *word_counts.entry(word).or_insert(0) += 1;
237            }
238        }
239
240        // Find words that appear in most responses
241        let threshold = (responses.len() as f32 * 0.6).ceil() as usize;
242        word_counts
243            .into_iter()
244            .filter(|(_, count)| *count >= threshold)
245            .map(|(word, _)| word)
246            .take(10)
247            .collect()
248    }
249
250    fn calculate_agreement(responses: &[PerspectiveResponse]) -> f32 {
251        if responses.len() < 2 {
252            return 1.0;
253        }
254
255        // Simple agreement: based on common key points
256        let all_points: Vec<_> = responses.iter().flat_map(|r| &r.key_points).collect();
257        let unique_points: std::collections::HashSet<_> = all_points.iter().collect();
258
259        if unique_points.is_empty() {
260            return 0.5;
261        }
262
263        // More overlap = higher agreement
264        1.0 - (unique_points.len() as f32 / all_points.len().max(1) as f32)
265    }
266}
267
268#[async_trait]
269impl Synthesizer for DefaultSynthesizer {
270    async fn synthesize(
271        &self,
272        query: &str,
273        responses: Vec<PerspectiveResponse>,
274        backend: &dyn LLMBackend,
275    ) -> Result<SynthesizedResponse, CrewError> {
276        use cortexai_core::{LLMMessage, MessageRole};
277
278        let start = Instant::now();
279
280        if responses.is_empty() {
281            return Err(CrewError::ExecutionFailed(
282                "No responses to synthesize".to_string(),
283            ));
284        }
285
286        let synthesis = match self.strategy {
287            SynthesisStrategy::None => {
288                // No synthesis, just format responses
289                responses
290                    .iter()
291                    .map(|r| format!("## {}\n{}", r.perspective_name, r.content))
292                    .collect::<Vec<_>>()
293                    .join("\n\n")
294            }
295            SynthesisStrategy::BestOf => {
296                // Select best response based on length and confidence
297                responses
298                    .iter()
299                    .max_by(|a, b| {
300                        let score_a = a.content.len() as f32 * a.confidence.unwrap_or(0.5);
301                        let score_b = b.content.len() as f32 * b.confidence.unwrap_or(0.5);
302                        score_a.partial_cmp(&score_b).unwrap()
303                    })
304                    .map(|r| r.content.clone())
305                    .unwrap_or_default()
306            }
307            _ => {
308                // Use LLM for sophisticated synthesis
309                let synthesis_prompt = self.build_synthesis_prompt(query, &responses);
310
311                let messages = vec![
312                    LLMMessage {
313                        role: MessageRole::System,
314                        content: self.get_synthesis_system_prompt(),
315                        tool_calls: None,
316                        tool_call_id: None,
317                        name: None,
318                    },
319                    LLMMessage {
320                        role: MessageRole::User,
321                        content: synthesis_prompt,
322                        tool_calls: None,
323                        tool_call_id: None,
324                        name: None,
325                    },
326                ];
327
328                let inference = backend
329                    .infer(&messages, &[], 0.3)
330                    .await
331                    .map_err(|e| CrewError::ExecutionFailed(e.to_string()))?;
332
333                inference.content
334            }
335        };
336
337        let consensus_points = Self::find_consensus(&responses);
338        let agreement_score = Self::calculate_agreement(&responses);
339
340        Ok(SynthesizedResponse {
341            synthesis,
342            perspectives: responses,
343            sub_questions: Vec::new(),
344            total_latency_ms: start.elapsed().as_millis() as u64,
345            execution_mode: ExecutionMode::Sequential, // Will be set by orchestra
346            synthesis_strategy: self.strategy,
347            agreement_score: Some(agreement_score),
348            consensus_points,
349            divergence_points: Vec::new(),
350        })
351    }
352}
353
354impl DefaultSynthesizer {
355    fn get_synthesis_system_prompt(&self) -> String {
356        match self.strategy {
357            SynthesisStrategy::Comprehensive => {
358                "You are an expert synthesizer. Your task is to combine multiple expert perspectives \
359                into a comprehensive, well-structured response. Include insights from all perspectives, \
360                highlight areas of agreement, and note any important differences in viewpoint. \
361                Provide a balanced, thorough analysis.".to_string()
362            }
363            SynthesisStrategy::Consensus => {
364                "You are a consensus builder. Your task is to identify and articulate the points \
365                where multiple experts agree. Focus on shared conclusions and common ground. \
366                Minimize discussion of disagreements unless critical.".to_string()
367            }
368            SynthesisStrategy::Weighted => {
369                "You are a weighted synthesizer. Consider the weight/importance of each perspective \
370                when combining responses. Give more emphasis to higher-weighted perspectives while \
371                still incorporating insights from all sources.".to_string()
372            }
373            SynthesisStrategy::Debate => {
374                "You are a debate moderator. Present the different perspectives as a structured debate, \
375                highlighting contrasting viewpoints and the reasoning behind each position. \
376                Help the reader understand the trade-offs and considerations from each angle.".to_string()
377            }
378            _ => "Synthesize the following perspectives into a coherent response.".to_string(),
379        }
380    }
381
382    fn build_synthesis_prompt(&self, query: &str, responses: &[PerspectiveResponse]) -> String {
383        let mut prompt = format!("Original Query: {}\n\n", query);
384        prompt.push_str("Expert Perspectives:\n\n");
385
386        for (i, response) in responses.iter().enumerate() {
387            prompt.push_str(&format!(
388                "=== Perspective {}: {} ===\n{}\n\n",
389                i + 1,
390                response.perspective_name,
391                response.content
392            ));
393        }
394
395        prompt.push_str(
396            "Please synthesize these perspectives into a comprehensive response that addresses \
397            the original query while incorporating insights from all experts.",
398        );
399
400        prompt
401    }
402}
403
404/// Default question decomposer
405pub struct DefaultDecomposer {
406    max_sub_questions: usize,
407}
408
409impl DefaultDecomposer {
410    pub fn new(max_sub_questions: usize) -> Self {
411        Self { max_sub_questions }
412    }
413}
414
415#[async_trait]
416impl QuestionDecomposer for DefaultDecomposer {
417    async fn decompose(
418        &self,
419        query: &str,
420        perspectives: &[AgentPerspective],
421        backend: &dyn LLMBackend,
422    ) -> Result<Vec<SubQuestion>, CrewError> {
423        use cortexai_core::{LLMMessage, MessageRole};
424
425        let perspective_names: Vec<_> = perspectives.iter().map(|p| p.name.as_str()).collect();
426
427        let system_prompt = format!(
428            "You are a question decomposition expert. Break down complex queries into simpler, \
429            focused sub-questions. Each sub-question should be answerable by one of these specialists: {}. \
430            Return a JSON array of objects with 'question' and 'target_perspectives' (array of specialist names) fields. \
431            Generate at most {} sub-questions. If the query is already simple enough, return an empty array.",
432            perspective_names.join(", "),
433            self.max_sub_questions
434        );
435
436        let messages = vec![
437            LLMMessage {
438                role: MessageRole::System,
439                content: system_prompt,
440                tool_calls: None,
441                tool_call_id: None,
442                name: None,
443            },
444            LLMMessage {
445                role: MessageRole::User,
446                content: format!("Decompose this query: {}", query),
447                tool_calls: None,
448                tool_call_id: None,
449                name: None,
450            },
451        ];
452
453        let inference = backend
454            .infer(&messages, &[], 0.3)
455            .await
456            .map_err(|e| CrewError::ExecutionFailed(e.to_string()))?;
457
458        // Parse JSON response
459        let sub_questions: Vec<SubQuestion> = serde_json::from_str(&inference.content)
460            .unwrap_or_else(|_| {
461                // If parsing fails, return empty (no decomposition)
462                Vec::new()
463            });
464
465        Ok(sub_questions)
466    }
467}
468
469/// Orchestra configuration
470#[derive(Debug, Clone)]
471pub struct OrchestraConfig {
472    /// Execution mode
473    pub execution_mode: ExecutionMode,
474    /// Synthesis strategy
475    pub synthesis_strategy: SynthesisStrategy,
476    /// Whether to decompose questions
477    pub enable_decomposition: bool,
478    /// Maximum sub-questions for decomposition
479    pub max_sub_questions: usize,
480    /// Timeout per agent
481    pub agent_timeout: Duration,
482    /// Whether to continue if some agents fail
483    pub continue_on_error: bool,
484    /// Minimum number of responses required for synthesis
485    pub min_responses: usize,
486}
487
488impl Default for OrchestraConfig {
489    fn default() -> Self {
490        Self {
491            execution_mode: ExecutionMode::Sequential,
492            synthesis_strategy: SynthesisStrategy::Comprehensive,
493            enable_decomposition: false,
494            max_sub_questions: 5,
495            agent_timeout: Duration::from_secs(60),
496            continue_on_error: true,
497            min_responses: 1,
498        }
499    }
500}
501
502/// The main Orchestra struct for multi-agent coordination
503pub struct Orchestra {
504    /// Agent perspectives
505    perspectives: Vec<AgentPerspective>,
506    /// Configuration
507    config: OrchestraConfig,
508    /// LLM backend for agent inference
509    backend: Arc<dyn LLMBackend>,
510    /// Custom synthesizer (optional)
511    synthesizer: Option<Arc<dyn Synthesizer>>,
512    /// Custom decomposer (optional)
513    decomposer: Option<Arc<dyn QuestionDecomposer>>,
514}
515
516impl Orchestra {
517    /// Create a new orchestra builder
518    pub fn builder(backend: Arc<dyn LLMBackend>) -> OrchestraBuilder {
519        OrchestraBuilder::new(backend)
520    }
521
522    /// Analyze a query using all perspectives
523    pub async fn analyze(&self, query: &str) -> Result<SynthesizedResponse, CrewError> {
524        let start = Instant::now();
525
526        // Optionally decompose the question
527        let sub_questions = if self.config.enable_decomposition {
528            self.decompose_query(query).await?
529        } else {
530            Vec::new()
531        };
532
533        // Get responses from all perspectives
534        let responses = self.get_all_responses(query, &sub_questions).await?;
535
536        if responses.len() < self.config.min_responses {
537            return Err(CrewError::ExecutionFailed(format!(
538                "Only {} responses received, minimum {} required",
539                responses.len(),
540                self.config.min_responses
541            )));
542        }
543
544        // Synthesize responses
545        let mut result = self.synthesize_responses(query, responses).await?;
546        result.sub_questions = sub_questions;
547        result.execution_mode = self.config.execution_mode;
548        result.total_latency_ms = start.elapsed().as_millis() as u64;
549
550        Ok(result)
551    }
552
553    /// Get a single perspective's response (useful for sequential/targeted use)
554    pub async fn get_perspective_response(
555        &self,
556        perspective_id: &str,
557        query: &str,
558    ) -> Result<PerspectiveResponse, CrewError> {
559        let perspective = self
560            .perspectives
561            .iter()
562            .find(|p| p.id == perspective_id)
563            .ok_or_else(|| {
564                CrewError::ExecutionFailed(format!("Perspective not found: {}", perspective_id))
565            })?;
566
567        self.query_perspective(perspective, query).await
568    }
569
570    /// List available perspectives
571    pub fn perspectives(&self) -> &[AgentPerspective] {
572        &self.perspectives
573    }
574
575    /// Get configuration
576    pub fn config(&self) -> &OrchestraConfig {
577        &self.config
578    }
579
580    // Internal methods
581
582    async fn decompose_query(&self, query: &str) -> Result<Vec<SubQuestion>, CrewError> {
583        let default_decomposer = DefaultDecomposer::new(self.config.max_sub_questions);
584        let decomposer: &dyn QuestionDecomposer = self
585            .decomposer
586            .as_ref()
587            .map(|d| d.as_ref())
588            .unwrap_or(&default_decomposer);
589
590        decomposer
591            .decompose(query, &self.perspectives, self.backend.as_ref())
592            .await
593    }
594
595    async fn get_all_responses(
596        &self,
597        query: &str,
598        _sub_questions: &[SubQuestion],
599    ) -> Result<Vec<PerspectiveResponse>, CrewError> {
600        match self.config.execution_mode {
601            ExecutionMode::Sequential => self.execute_sequential(query).await,
602            ExecutionMode::Parallel => self.execute_parallel(query).await,
603            ExecutionMode::Wave { wave_size } => self.execute_wave(query, wave_size).await,
604        }
605    }
606
607    async fn execute_sequential(&self, query: &str) -> Result<Vec<PerspectiveResponse>, CrewError> {
608        let mut responses = Vec::new();
609
610        for perspective in &self.perspectives {
611            match tokio::time::timeout(
612                self.config.agent_timeout,
613                self.query_perspective(perspective, query),
614            )
615            .await
616            {
617                Ok(Ok(response)) => responses.push(response),
618                Ok(Err(e)) => {
619                    tracing::warn!(
620                        perspective = %perspective.id,
621                        error = %e,
622                        "Perspective failed"
623                    );
624                    if !self.config.continue_on_error {
625                        return Err(e);
626                    }
627                }
628                Err(_) => {
629                    tracing::warn!(
630                        perspective = %perspective.id,
631                        "Perspective timed out"
632                    );
633                    if !self.config.continue_on_error {
634                        return Err(CrewError::ExecutionFailed(format!(
635                            "Perspective {} timed out",
636                            perspective.id
637                        )));
638                    }
639                }
640            }
641        }
642
643        Ok(responses)
644    }
645
646    async fn execute_parallel(&self, query: &str) -> Result<Vec<PerspectiveResponse>, CrewError> {
647        use futures::future::join_all;
648
649        let futures: Vec<_> = self
650            .perspectives
651            .iter()
652            .map(|p| {
653                let query = query.to_string();
654                let timeout = self.config.agent_timeout;
655                async move {
656                    tokio::time::timeout(timeout, self.query_perspective(p, &query)).await
657                }
658            })
659            .collect();
660
661        let results = join_all(futures).await;
662
663        let mut responses = Vec::new();
664        for (i, result) in results.into_iter().enumerate() {
665            match result {
666                Ok(Ok(response)) => responses.push(response),
667                Ok(Err(e)) => {
668                    tracing::warn!(
669                        perspective = %self.perspectives[i].id,
670                        error = %e,
671                        "Perspective failed"
672                    );
673                    if !self.config.continue_on_error {
674                        return Err(e);
675                    }
676                }
677                Err(_) => {
678                    tracing::warn!(
679                        perspective = %self.perspectives[i].id,
680                        "Perspective timed out"
681                    );
682                    if !self.config.continue_on_error {
683                        return Err(CrewError::ExecutionFailed(format!(
684                            "Perspective {} timed out",
685                            self.perspectives[i].id
686                        )));
687                    }
688                }
689            }
690        }
691
692        Ok(responses)
693    }
694
695    async fn execute_wave(
696        &self,
697        query: &str,
698        wave_size: usize,
699    ) -> Result<Vec<PerspectiveResponse>, CrewError> {
700        use futures::future::join_all;
701
702        let mut all_responses = Vec::new();
703
704        for chunk in self.perspectives.chunks(wave_size) {
705            let futures: Vec<_> =
706                chunk
707                    .iter()
708                    .map(|p| {
709                        let query = query.to_string();
710                        let timeout = self.config.agent_timeout;
711                        async move {
712                            tokio::time::timeout(timeout, self.query_perspective(p, &query)).await
713                        }
714                    })
715                    .collect();
716
717            let results = join_all(futures).await;
718
719            for (i, result) in results.into_iter().enumerate() {
720                match result {
721                    Ok(Ok(response)) => all_responses.push(response),
722                    Ok(Err(e)) => {
723                        if !self.config.continue_on_error {
724                            return Err(e);
725                        }
726                        tracing::warn!(error = %e, "Perspective in wave failed");
727                    }
728                    Err(_) => {
729                        if !self.config.continue_on_error {
730                            return Err(CrewError::ExecutionFailed(format!(
731                                "Perspective {} timed out",
732                                chunk[i].id
733                            )));
734                        }
735                        tracing::warn!("Perspective in wave timed out");
736                    }
737                }
738            }
739        }
740
741        Ok(all_responses)
742    }
743
744    async fn query_perspective(
745        &self,
746        perspective: &AgentPerspective,
747        query: &str,
748    ) -> Result<PerspectiveResponse, CrewError> {
749        use cortexai_core::{LLMMessage, MessageRole};
750
751        let start = Instant::now();
752
753        let messages = vec![
754            LLMMessage {
755                role: MessageRole::System,
756                content: perspective.system_prompt.clone(),
757                tool_calls: None,
758                tool_call_id: None,
759                name: None,
760            },
761            LLMMessage {
762                role: MessageRole::User,
763                content: query.to_string(),
764                tool_calls: None,
765                tool_call_id: None,
766                name: None,
767            },
768        ];
769
770        let inference = self
771            .backend
772            .infer(&messages, &[], perspective.temperature)
773            .await
774            .map_err(|e| CrewError::ExecutionFailed(e.to_string()))?;
775
776        let key_points = DefaultSynthesizer::extract_key_points(&inference.content);
777
778        Ok(PerspectiveResponse {
779            perspective_id: perspective.id.clone(),
780            perspective_name: perspective.name.clone(),
781            content: inference.content,
782            latency_ms: start.elapsed().as_millis() as u64,
783            confidence: None,
784            key_points,
785            metadata: HashMap::new(),
786        })
787    }
788
789    async fn synthesize_responses(
790        &self,
791        query: &str,
792        responses: Vec<PerspectiveResponse>,
793    ) -> Result<SynthesizedResponse, CrewError> {
794        let default_synthesizer = DefaultSynthesizer::new(self.config.synthesis_strategy);
795        let synthesizer: &dyn Synthesizer = self
796            .synthesizer
797            .as_ref()
798            .map(|s| s.as_ref())
799            .unwrap_or(&default_synthesizer);
800
801        synthesizer
802            .synthesize(query, responses, self.backend.as_ref())
803            .await
804    }
805}
806
807/// Builder for Orchestra
808pub struct OrchestraBuilder {
809    backend: Arc<dyn LLMBackend>,
810    perspectives: Vec<AgentPerspective>,
811    config: OrchestraConfig,
812    synthesizer: Option<Arc<dyn Synthesizer>>,
813    decomposer: Option<Arc<dyn QuestionDecomposer>>,
814}
815
816impl OrchestraBuilder {
817    pub fn new(backend: Arc<dyn LLMBackend>) -> Self {
818        Self {
819            backend,
820            perspectives: Vec::new(),
821            config: OrchestraConfig::default(),
822            synthesizer: None,
823            decomposer: None,
824        }
825    }
826
827    /// Add an agent perspective
828    pub fn add_perspective(mut self, perspective: AgentPerspective) -> Self {
829        self.perspectives.push(perspective);
830        self
831    }
832
833    /// Add multiple perspectives at once
834    pub fn with_perspectives(mut self, perspectives: Vec<AgentPerspective>) -> Self {
835        self.perspectives.extend(perspectives);
836        self
837    }
838
839    /// Set execution mode
840    pub fn with_execution_mode(mut self, mode: ExecutionMode) -> Self {
841        self.config.execution_mode = mode;
842        self
843    }
844
845    /// Set synthesis strategy
846    pub fn with_synthesis_strategy(mut self, strategy: SynthesisStrategy) -> Self {
847        self.config.synthesis_strategy = strategy;
848        self
849    }
850
851    /// Enable question decomposition
852    pub fn with_decomposition(mut self, enable: bool) -> Self {
853        self.config.enable_decomposition = enable;
854        self
855    }
856
857    /// Set maximum sub-questions
858    pub fn with_max_sub_questions(mut self, max: usize) -> Self {
859        self.config.max_sub_questions = max;
860        self
861    }
862
863    /// Set agent timeout
864    pub fn with_agent_timeout(mut self, timeout: Duration) -> Self {
865        self.config.agent_timeout = timeout;
866        self
867    }
868
869    /// Set whether to continue on agent errors
870    pub fn with_continue_on_error(mut self, continue_on_error: bool) -> Self {
871        self.config.continue_on_error = continue_on_error;
872        self
873    }
874
875    /// Set minimum responses required
876    pub fn with_min_responses(mut self, min: usize) -> Self {
877        self.config.min_responses = min;
878        self
879    }
880
881    /// Set custom synthesizer
882    pub fn with_synthesizer(mut self, synthesizer: Arc<dyn Synthesizer>) -> Self {
883        self.synthesizer = Some(synthesizer);
884        self
885    }
886
887    /// Set custom decomposer
888    pub fn with_decomposer(mut self, decomposer: Arc<dyn QuestionDecomposer>) -> Self {
889        self.decomposer = Some(decomposer);
890        self
891    }
892
893    /// Build the orchestra
894    pub fn build(self) -> Result<Orchestra, CrewError> {
895        if self.perspectives.is_empty() {
896            return Err(CrewError::InvalidConfiguration(
897                "At least one perspective is required".to_string(),
898            ));
899        }
900
901        Ok(Orchestra {
902            perspectives: self.perspectives,
903            config: self.config,
904            backend: self.backend,
905            synthesizer: self.synthesizer,
906            decomposer: self.decomposer,
907        })
908    }
909}
910
911/// Preset perspective configurations for common use cases
912pub mod presets {
913    use super::*;
914
915    /// Create a balanced analysis team
916    pub fn balanced_analysis() -> Vec<AgentPerspective> {
917        vec![
918            AgentPerspective::new(
919                "analyst",
920                "You are a data-driven analyst. Focus on facts, statistics, and empirical evidence. \
921                Provide structured, quantitative analysis where possible. Be objective and thorough."
922            )
923            .with_name("Data Analyst")
924            .with_temperature(0.3)
925            .with_weight(1.0),
926
927            AgentPerspective::new(
928                "critic",
929                "You are a critical thinker and devil's advocate. Question assumptions, identify \
930                potential flaws, risks, and weaknesses. Consider edge cases and failure modes. \
931                Be constructively skeptical."
932            )
933            .with_name("Critical Thinker")
934            .with_temperature(0.5)
935            .with_weight(0.8),
936
937            AgentPerspective::new(
938                "creative",
939                "You are a creative problem solver. Think outside the box, propose innovative \
940                solutions, and make unexpected connections. Consider unconventional approaches \
941                and possibilities others might miss."
942            )
943            .with_name("Creative Innovator")
944            .with_temperature(0.9)
945            .with_weight(0.7),
946
947            AgentPerspective::new(
948                "pragmatist",
949                "You are a pragmatic implementer. Focus on what's practical, achievable, and \
950                cost-effective. Consider real-world constraints, timelines, and resources. \
951                Provide actionable recommendations."
952            )
953            .with_name("Pragmatist")
954            .with_temperature(0.4)
955            .with_weight(1.0),
956        ]
957    }
958
959    /// Create a technical review team
960    pub fn technical_review() -> Vec<AgentPerspective> {
961        vec![
962            AgentPerspective::new(
963                "architect",
964                "You are a software architect. Evaluate system design, scalability, and \
965                architectural patterns. Consider maintainability, extensibility, and best practices."
966            )
967            .with_name("Software Architect")
968            .with_temperature(0.3),
969
970            AgentPerspective::new(
971                "security",
972                "You are a security expert. Identify vulnerabilities, security risks, and \
973                potential attack vectors. Recommend security best practices and mitigations."
974            )
975            .with_name("Security Expert")
976            .with_temperature(0.2),
977
978            AgentPerspective::new(
979                "performance",
980                "You are a performance engineer. Analyze efficiency, identify bottlenecks, \
981                and suggest optimizations. Consider resource usage, latency, and throughput."
982            )
983            .with_name("Performance Engineer")
984            .with_temperature(0.3),
985        ]
986    }
987
988    /// Create a business strategy team
989    pub fn business_strategy() -> Vec<AgentPerspective> {
990        vec![
991            AgentPerspective::new(
992                "strategist",
993                "You are a business strategist. Analyze market positioning, competitive \
994                advantage, and long-term growth opportunities. Think big picture.",
995            )
996            .with_name("Strategist")
997            .with_temperature(0.5),
998            AgentPerspective::new(
999                "financial",
1000                "You are a financial analyst. Focus on costs, ROI, revenue potential, and \
1001                financial viability. Provide quantitative financial analysis.",
1002            )
1003            .with_name("Financial Analyst")
1004            .with_temperature(0.2),
1005            AgentPerspective::new(
1006                "customer",
1007                "You are a customer experience expert. Consider user needs, pain points, \
1008                and satisfaction. Advocate for the customer perspective.",
1009            )
1010            .with_name("Customer Advocate")
1011            .with_temperature(0.6),
1012        ]
1013    }
1014}
1015
1016#[cfg(test)]
1017mod tests {
1018    use super::*;
1019    use cortexai_core::{LLMMessage, ToolSchema};
1020    use cortexai_providers::{InferenceOutput, ModelInfo, TokenUsage};
1021    use std::sync::atomic::{AtomicUsize, Ordering};
1022
1023    // Mock backend for testing
1024    struct MockBackend {
1025        call_count: AtomicUsize,
1026        responses: Vec<String>,
1027    }
1028
1029    impl MockBackend {
1030        fn new(responses: Vec<String>) -> Self {
1031            Self {
1032                call_count: AtomicUsize::new(0),
1033                responses,
1034            }
1035        }
1036    }
1037
1038    #[async_trait]
1039    impl LLMBackend for MockBackend {
1040        async fn infer(
1041            &self,
1042            _messages: &[LLMMessage],
1043            _tools: &[ToolSchema],
1044            _temperature: f32,
1045        ) -> Result<InferenceOutput, cortexai_core::errors::LLMError> {
1046            let idx = self.call_count.fetch_add(1, Ordering::SeqCst);
1047            let response = self
1048                .responses
1049                .get(idx % self.responses.len())
1050                .cloned()
1051                .unwrap_or_else(|| format!("Response {}", idx));
1052
1053            Ok(InferenceOutput {
1054                content: response,
1055                tool_calls: None,
1056                reasoning: None,
1057                confidence: 0.9,
1058                token_usage: TokenUsage::new(10, 20),
1059                metadata: HashMap::new(),
1060            })
1061        }
1062
1063        async fn embed(
1064            &self,
1065            _text: &str,
1066        ) -> Result<Vec<f32>, cortexai_core::errors::LLMError> {
1067            Ok(vec![0.1, 0.2, 0.3])
1068        }
1069
1070        fn model_info(&self) -> ModelInfo {
1071            ModelInfo {
1072                model: "mock-model".to_string(),
1073                provider: "mock".to_string(),
1074                max_tokens: 4096,
1075                input_cost_per_1m: 0.0,
1076                output_cost_per_1m: 0.0,
1077                supports_functions: true,
1078                supports_vision: false,
1079            }
1080        }
1081    }
1082
1083    #[tokio::test]
1084    async fn test_orchestra_sequential() {
1085        let backend = Arc::new(MockBackend::new(vec![
1086            "Analysis from analyst perspective.".to_string(),
1087            "Critical review of the topic.".to_string(),
1088        ]));
1089
1090        let orchestra = Orchestra::builder(backend)
1091            .add_perspective(AgentPerspective::new("analyst", "You are an analyst."))
1092            .add_perspective(AgentPerspective::new("critic", "You are a critic."))
1093            .with_execution_mode(ExecutionMode::Sequential)
1094            .with_synthesis_strategy(SynthesisStrategy::None)
1095            .build()
1096            .unwrap();
1097
1098        let result = orchestra.analyze("Test query").await.unwrap();
1099
1100        assert_eq!(result.perspectives.len(), 2);
1101        assert_eq!(result.execution_mode, ExecutionMode::Sequential);
1102    }
1103
1104    #[tokio::test]
1105    async fn test_orchestra_parallel() {
1106        let backend = Arc::new(MockBackend::new(vec![
1107            "Response 1".to_string(),
1108            "Response 2".to_string(),
1109            "Response 3".to_string(),
1110        ]));
1111
1112        let orchestra = Orchestra::builder(backend)
1113            .add_perspective(AgentPerspective::new("p1", "Perspective 1"))
1114            .add_perspective(AgentPerspective::new("p2", "Perspective 2"))
1115            .add_perspective(AgentPerspective::new("p3", "Perspective 3"))
1116            .with_execution_mode(ExecutionMode::Parallel)
1117            .with_synthesis_strategy(SynthesisStrategy::None)
1118            .build()
1119            .unwrap();
1120
1121        let result = orchestra.analyze("Test").await.unwrap();
1122
1123        assert_eq!(result.perspectives.len(), 3);
1124        assert_eq!(result.execution_mode, ExecutionMode::Parallel);
1125    }
1126
1127    #[tokio::test]
1128    async fn test_orchestra_wave() {
1129        let backend = Arc::new(MockBackend::new(vec!["Wave response".to_string()]));
1130
1131        let orchestra = Orchestra::builder(backend)
1132            .add_perspective(AgentPerspective::new("p1", "P1"))
1133            .add_perspective(AgentPerspective::new("p2", "P2"))
1134            .add_perspective(AgentPerspective::new("p3", "P3"))
1135            .add_perspective(AgentPerspective::new("p4", "P4"))
1136            .with_execution_mode(ExecutionMode::Wave { wave_size: 2 })
1137            .with_synthesis_strategy(SynthesisStrategy::None)
1138            .build()
1139            .unwrap();
1140
1141        let result = orchestra.analyze("Test").await.unwrap();
1142
1143        assert_eq!(result.perspectives.len(), 4);
1144    }
1145
1146    #[tokio::test]
1147    async fn test_single_perspective() {
1148        let backend = Arc::new(MockBackend::new(vec!["Single response".to_string()]));
1149
1150        let orchestra = Orchestra::builder(backend)
1151            .add_perspective(
1152                AgentPerspective::new("analyst", "You are an analyst.").with_name("Data Analyst"),
1153            )
1154            .build()
1155            .unwrap();
1156
1157        let response = orchestra
1158            .get_perspective_response("analyst", "Analyze this")
1159            .await
1160            .unwrap();
1161
1162        assert_eq!(response.perspective_id, "analyst");
1163        assert_eq!(response.perspective_name, "Data Analyst");
1164        assert!(!response.content.is_empty());
1165    }
1166
1167    #[tokio::test]
1168    async fn test_presets() {
1169        let perspectives = presets::balanced_analysis();
1170        assert_eq!(perspectives.len(), 4);
1171
1172        let perspectives = presets::technical_review();
1173        assert_eq!(perspectives.len(), 3);
1174
1175        let perspectives = presets::business_strategy();
1176        assert_eq!(perspectives.len(), 3);
1177    }
1178
1179    #[tokio::test]
1180    async fn test_builder_validation() {
1181        let backend = Arc::new(MockBackend::new(vec![]));
1182
1183        // Should fail without perspectives
1184        let result = Orchestra::builder(backend).build();
1185        assert!(result.is_err());
1186    }
1187
1188    #[tokio::test]
1189    async fn test_synthesis_best_of() {
1190        let backend = Arc::new(MockBackend::new(vec![
1191            "Short".to_string(),
1192            "This is a much longer and more detailed response that should be selected.".to_string(),
1193        ]));
1194
1195        let orchestra = Orchestra::builder(backend)
1196            .add_perspective(AgentPerspective::new("short", "Be brief"))
1197            .add_perspective(AgentPerspective::new("detailed", "Be detailed"))
1198            .with_synthesis_strategy(SynthesisStrategy::BestOf)
1199            .build()
1200            .unwrap();
1201
1202        let result = orchestra.analyze("Test").await.unwrap();
1203
1204        // Best-of should select the longer response
1205        assert!(result.synthesis.len() > 10);
1206    }
1207
1208    #[tokio::test]
1209    async fn test_continue_on_error() {
1210        let backend = Arc::new(MockBackend::new(vec!["Valid response".to_string()]));
1211
1212        let orchestra = Orchestra::builder(backend)
1213            .add_perspective(AgentPerspective::new("p1", "P1"))
1214            .with_continue_on_error(true)
1215            .with_min_responses(1)
1216            .build()
1217            .unwrap();
1218
1219        let result = orchestra.analyze("Test").await;
1220        assert!(result.is_ok());
1221    }
1222}