// vex_runtime/orchestrator.rs
1//! Orchestrator - manages hierarchical agent networks
2
3use std::collections::HashMap;
4use std::sync::Arc;
5use std::time::{Duration, Instant};
6use tokio::sync::RwLock;
7use uuid::Uuid;
8
9use vex_core::{
10    tournament_select, Agent, AgentConfig, Fitness, GeneticOperator, Genome, Hash, MerkleTree,
11    StandardOperator,
12};
13
14use crate::executor::{AgentExecutor, ExecutionResult, ExecutorConfig};
15use vex_llm::LlmProvider;
16
/// Configuration for the orchestrator
///
/// Controls hierarchy shape, the evolutionary loop, and agent lifetime.
/// `enable_self_correction` additionally activates the temporal memory and
/// LLM reflection path (see `Orchestrator::new`).
#[derive(Debug, Clone)]
pub struct OrchestratorConfig {
    /// Maximum depth of agent hierarchy
    pub max_depth: u8,
    /// Number of agents per level (child configs beyond this are dropped)
    pub agents_per_level: usize,
    /// Enable evolutionary selection after each orchestration run
    pub enable_evolution: bool,
    /// Mutation rate for evolution, passed to the genetic operator
    pub mutation_rate: f64,
    /// Executor configuration
    pub executor_config: ExecutorConfig,
    /// Maximum age for tracked agents before cleanup (prevents memory leaks)
    pub max_agent_age: Duration,
    /// Enable self-correcting genome evolution (memory + reflection agent)
    pub enable_self_correction: bool,
    /// Minimum fitness improvement to accept change
    pub improvement_threshold: f64,
    /// Number of tasks before reflection
    pub reflect_every_n_tasks: usize,
}
39
40impl Default for OrchestratorConfig {
41    fn default() -> Self {
42        Self {
43            max_depth: 3,
44            agents_per_level: 2,
45            enable_evolution: true,
46            mutation_rate: 0.1,
47            executor_config: ExecutorConfig::default(),
48            max_agent_age: Duration::from_secs(3600), // 1 hour default
49            enable_self_correction: false,
50            improvement_threshold: 0.02,
51            reflect_every_n_tasks: 5,
52        }
53    }
54}
55
56use vex_anchor::{AnchorBackend, AnchorMetadata, AnchorReceipt};
57
/// Result from orchestrated execution
///
/// Aggregates the synthesized response with the cryptographic commitments
/// (Merkle roots) and anchoring receipts produced during the run.
#[derive(Debug)]
pub struct OrchestrationResult {
    /// Root agent ID
    pub root_agent_id: Uuid,
    /// Final synthesized response (the root agent's output)
    pub response: String,
    /// Merkle root of all context packets
    pub merkle_root: Hash,
    /// Aggregated trace root from all agents (None if no agent reported one)
    pub trace_root: Option<Hash>,
    /// All execution results (agent_id -> result), including the root's
    pub agent_results: HashMap<Uuid, ExecutionResult>,
    /// Anchor receipts from blockchain backends
    pub anchor_receipts: Vec<AnchorReceipt>,
    /// Total levels processed
    pub levels_processed: u8,
    /// Overall confidence (mean confidence across all agent results)
    pub confidence: f64,
}
78
/// Tracked agent with creation timestamp for TTL-based cleanup
///
/// See `Orchestrator::cleanup_expired`, which evicts entries whose
/// `created_at` age exceeds `OrchestratorConfig::max_agent_age`.
#[derive(Clone)]
struct TrackedAgent {
    // The agent itself (genome, depth, children, ...)
    agent: Agent,
    // Owning tenant; currently kept only for bookkeeping (unused, hence `_`)
    _tenant_id: String,
    // Creation instant used by the TTL eviction in `cleanup_expired`
    created_at: Instant,
}
86
/// Orchestrator manages hierarchical agent execution
///
/// Generic over the LLM provider `L`, which is shared with the executor and
/// (optionally) the reflection agent. The optional fields are populated only
/// when `OrchestratorConfig::enable_self_correction` is set.
pub struct Orchestrator<L: LlmProvider> {
    /// Configuration
    pub config: OrchestratorConfig,
    /// All agents (id -> tracked agent with timestamp)
    agents: RwLock<HashMap<Uuid, TrackedAgent>>,
    /// Executor
    executor: AgentExecutor<L>,
    /// Anchoring backends (Blockchain, Cloud, etc)
    anchors: Vec<Arc<dyn AnchorBackend>>,
    /// LLM backend (stored for future use)
    #[allow(dead_code)]
    llm: Arc<L>,
    /// Evolution memory for self-correction (optional)
    evolution_memory: Option<RwLock<vex_core::EvolutionMemory>>,
    /// Reflection agent for LLM-based suggestions (optional)
    reflection_agent: Option<vex_adversarial::ReflectionAgent<L>>,
    /// Persistence layer for cross-session learning (optional)
    persistence_layer: Option<Arc<dyn vex_persist::EvolutionStore>>,
}
107
108impl<L: LlmProvider + 'static> Orchestrator<L> {
109    /// Create a new orchestrator
110    pub fn new(
111        llm: Arc<L>,
112        config: OrchestratorConfig,
113        persistence_layer: Option<Arc<dyn vex_persist::EvolutionStore>>,
114        gate: Arc<dyn crate::gate::Gate>,
115    ) -> Self {
116        let executor = AgentExecutor::new(llm.clone(), config.executor_config.clone(), gate);
117        let evolution_memory = if config.enable_self_correction {
118            Some(RwLock::new(vex_core::EvolutionMemory::new()))
119        } else {
120            None
121        };
122        let reflection_agent = if config.enable_self_correction {
123            Some(vex_adversarial::ReflectionAgent::new(llm.clone()))
124        } else {
125            None
126        };
127        Self {
128            config,
129            agents: RwLock::new(HashMap::new()),
130            executor,
131            anchors: Vec::new(),
132            llm,
133            evolution_memory,
134            reflection_agent,
135            persistence_layer,
136        }
137    }
138
    /// Add an anchoring backend
    ///
    /// Registered anchors receive the Merkle root of each orchestration run
    /// (see `process`); anchoring failures are logged, not propagated.
    pub fn add_anchor(&mut self, anchor: Arc<dyn AnchorBackend>) {
        self.anchors.push(anchor);
    }
143
144    /// Cleanup expired agents to prevent memory leaks
145    /// Returns the number of agents removed
146    pub async fn cleanup_expired(&self) -> usize {
147        let mut agents = self.agents.write().await;
148        let before = agents.len();
149        agents.retain(|_, tracked| tracked.created_at.elapsed() < self.config.max_agent_age);
150        let removed = before - agents.len();
151        if removed > 0 {
152            tracing::info!(
153                removed = removed,
154                remaining = agents.len(),
155                "Cleaned up expired agents"
156            );
157        }
158        removed
159    }
160
    /// Get current agent count
    ///
    /// Takes a read lock on the agent map; cheap, but may wait on a writer.
    pub async fn agent_count(&self) -> usize {
        self.agents.read().await.len()
    }
165
    /// Process a query with full hierarchical agent network
    ///
    /// Pipeline (two levels, hence `levels_processed: 2`):
    /// 1. Create a root coordinator agent and spawn Researcher/Critic
    ///    children from it, executing them in parallel via `tokio::spawn`
    ///    with no lock held across the awaits.
    /// 2. Synthesize the children's answers with the root agent.
    /// 3. Build Merkle trees over context packets and trace roots, run the
    ///    (optional) evolution step, and anchor the Merkle root to every
    ///    registered backend (anchor failures are logged, not fatal).
    ///
    /// Returns an `OrchestrationResult`, or an error string if a spawned
    /// task panics or any agent execution fails.
    pub async fn process(
        &self,
        tenant_id: &str,
        query: &str,
        capabilities: Vec<vex_llm::Capability>,
    ) -> Result<OrchestrationResult, String> {
        // Create root agent
        let root_config = AgentConfig {
            name: "Root".to_string(),
            role: "You are a strategic coordinator. Synthesize information from sub-agents into a coherent response.".to_string(),
            max_depth: self.config.max_depth,
            spawn_shadow: true,
        };
        let mut root = Agent::new(root_config);
        let root_id = root.id;

        // Spawn child agents for research
        let child_configs = vec![
            AgentConfig {
                name: "Researcher".to_string(),
                role: "You are a thorough researcher. Analyze the query and provide detailed findings.".to_string(),
                max_depth: 1,
                spawn_shadow: true,
            },
            AgentConfig {
                name: "Critic".to_string(),
                role: "You are a critical analyzer. Identify potential issues, edge cases, and weaknesses.".to_string(),
                max_depth: 1,
                spawn_shadow: true,
            },
        ];

        // Execute child agents in parallel (no lock held during await)
        // Note: `agents_per_level` caps how many of the configs above run.
        let mut execution_futures = Vec::new();
        for config in child_configs.into_iter().take(self.config.agents_per_level) {
            let mut child = root.spawn_child(config);
            let executor = self.executor.clone();
            let query_str = query.to_string();
            let caps = capabilities.clone();

            execution_futures.push(tokio::spawn(async move {
                let result = executor.execute(&mut child, &query_str, caps).await;
                (child.id, child, result)
            }));
        }

        // join_all preserves spawn order: index 0 = Researcher, 1 = Critic.
        let task_results = futures::future::join_all(execution_futures).await;

        // Re-acquire lock to update agents map
        let mut agents = self.agents.write().await;

        let mut all_results: HashMap<Uuid, ExecutionResult> = HashMap::new();
        let mut child_results = Vec::new();
        for task_result in task_results {
            // Outer error = task panic/cancel; inner `?` = execution failure.
            let (child_id, child, result) = task_result.map_err(|e| e.to_string())?;
            let execution_result: ExecutionResult = result?;

            child_results.push((child_id, execution_result.clone()));
            all_results.insert(child_id, execution_result);
            agents.insert(
                child_id,
                TrackedAgent {
                    agent: child,
                    _tenant_id: tenant_id.to_string(),
                    created_at: Instant::now(),
                },
            );
        }

        // Drop lock before root synthesis
        drop(agents);

        // Synthesize child results at root level
        let synthesis_prompt = format!(
            "Based on the following research from your sub-agents, provide a comprehensive answer:\n\n\
             Original Query: \"{}\"\n\n\
             Researcher's Findings: \"{}\"\n\n\
             Critic's Analysis: \"{}\"\n\n\
             Synthesize these into a final, well-reasoned response.",
            query,
            child_results.first().map(|(_, r)| r.response.as_str()).unwrap_or("N/A"),
            child_results.get(1).map(|(_, r)| r.response.as_str()).unwrap_or("N/A"),
        );

        let root_result = self
            .executor
            .execute(&mut root, &synthesis_prompt, capabilities.clone())
            .await?;
        all_results.insert(root_id, root_result.clone());

        // Re-acquire lock to update root and run evolution
        // NOTE(review): this write lock stays held through the evolution step
        // below, which may await LLM reflection calls — long calls will block
        // `cleanup_expired`/`agent_count` for their duration; confirm this is
        // acceptable under load.
        let mut agents = self.agents.write().await;

        // Insert root after children are handled
        agents.insert(
            root_id,
            TrackedAgent {
                agent: root,
                _tenant_id: tenant_id.to_string(),
                created_at: Instant::now(),
            },
        );

        // Build Merkle tree from all context packets
        let leaves: Vec<(String, Hash)> = all_results
            .iter()
            .map(|(id, r)| (id.to_string(), r.context.hash.clone()))
            .collect();
        let merkle_tree = MerkleTree::from_leaves(leaves);

        // Calculate overall confidence
        // (all_results always contains at least the root, so len() > 0)
        let total_confidence: f64 = all_results.values().map(|r| r.confidence).sum();
        let avg_confidence = total_confidence / all_results.len() as f64;

        // Evolution step (if enabled)
        if self.config.enable_evolution {
            if self.config.enable_self_correction {
                self.evolve_agents_self_correcting(tenant_id, &mut agents, &all_results)
                    .await;
            } else {
                self.evolve_agents(tenant_id, &mut agents, &all_results);
            }
        }

        // Build trace merkle tree from agent trace roots
        let trace_leaves: Vec<(String, Hash)> = all_results
            .iter()
            .filter_map(|(id, r)| r.trace_root.clone().map(|tr| (id.to_string(), tr)))
            .collect();
        let trace_merkle = MerkleTree::from_leaves(trace_leaves);

        // Anchoring Step
        // Best-effort: each failure is logged and the loop continues.
        let mut anchor_receipts = Vec::new();
        if let Some(root_hash) = merkle_tree.root_hash() {
            let metadata = AnchorMetadata::new(tenant_id, all_results.len() as u64);
            for anchor in &self.anchors {
                match anchor.anchor(root_hash, metadata.clone()).await {
                    Ok(receipt) => anchor_receipts.push(receipt),
                    Err(e) => tracing::warn!("Anchoring to {} failed: {}", anchor.name(), e),
                }
            }
        }

        Ok(OrchestrationResult {
            root_agent_id: root_id,
            response: root_result.response,
            merkle_root: merkle_tree
                .root_hash()
                .cloned()
                .unwrap_or(Hash::digest(b"empty")),
            trace_root: trace_merkle.root_hash().cloned(),
            agent_results: all_results,
            anchor_receipts,
            levels_processed: 2,
            confidence: avg_confidence,
        })
    }
324
325    /// Evolve agents based on fitness - persists evolved genome to fittest agent
326    fn evolve_agents(
327        &self,
328        _tenant_id: &str,
329        agents: &mut HashMap<Uuid, TrackedAgent>,
330        results: &HashMap<Uuid, ExecutionResult>,
331    ) {
332        let operator = StandardOperator;
333
334        // Build population with fitness scores from actual agent genomes
335        let population: Vec<(Genome, Fitness)> = agents
336            .values()
337            .map(|tracked| {
338                let fitness = results
339                    .get(&tracked.agent.id)
340                    .map(|r| r.confidence)
341                    .unwrap_or(0.5);
342                (tracked.agent.genome.clone(), Fitness::new(fitness))
343            })
344            .collect();
345
346        if population.len() < 2 {
347            return;
348        }
349
350        // Select parents via tournament selection and create offspring
351        let parent_a = tournament_select(&population, 2);
352        let parent_b = tournament_select(&population, 2);
353        let mut offspring = operator.crossover(parent_a, parent_b);
354        operator.mutate(&mut offspring, self.config.mutation_rate);
355
356        // Find the least fit agent and apply the evolved genome to it (Elitism)
357        // We preserve the 'best' and replace the 'worst' to ensure no regression.
358        if let Some((worst_id, _worst_fitness)) = results.iter().min_by(|a, b| {
359            a.1.confidence
360                .partial_cmp(&b.1.confidence)
361                .unwrap_or(std::cmp::Ordering::Equal)
362        }) {
363            if let Some(tracked) = agents.get_mut(worst_id) {
364                let old_traits = tracked.agent.genome.traits.clone();
365                tracked.agent.apply_evolved_genome(offspring.clone());
366
367                tracing::info!(
368                    agent_id = %worst_id,
369                    old_traits = ?old_traits,
370                    new_traits = ?offspring.traits,
371                    "Evolved genome applied to least fit agent (Elitism preserved fittest)"
372                );
373            }
374        }
375    }
376
    /// Self-correcting evolution using temporal memory and statistical learning
    ///
    /// This enhances basic evolution with:
    /// - Temporal memory of past experiments
    /// - Statistical correlation learning (Pearson)
    /// - Intelligent trait adjustment suggestions
    ///
    /// Flow: record every result as an experiment → persist experiments →
    /// pick the best performer → get trait suggestions (LLM reflection when
    /// available, otherwise statistical) → apply high-confidence suggestions
    /// to the best agent → periodically consolidate memory.
    ///
    /// # Modular Design
    /// Users can override this for custom strategies.
    async fn evolve_agents_self_correcting(
        &self,
        tenant_id: &str,
        agents: &mut HashMap<Uuid, TrackedAgent>,
        results: &HashMap<Uuid, ExecutionResult>,
    ) {
        // Require evolution memory; fall back to standard evolution if the
        // flag was enabled without the memory being initialized.
        let memory = match &self.evolution_memory {
            Some(mem) => mem,
            None => {
                tracing::warn!("Self-correction enabled but memory not initialized");
                return self.evolve_agents(tenant_id, agents, results);
            }
        };

        // Record experiments to memory and collect for persistence
        let experiments_to_save: Vec<vex_core::GenomeExperiment> = {
            let mut memory_guard = memory.write().await;
            let mut experiments = Vec::new();

            for (id, result) in results {
                if let Some(tracked) = agents.get(id) {
                    // Only "confidence" is recorded as a fitness dimension.
                    let mut fitness_scores = std::collections::HashMap::new();
                    fitness_scores.insert("confidence".to_string(), result.confidence);

                    let experiment = vex_core::GenomeExperiment::new(
                        &tracked.agent.genome,
                        fitness_scores,
                        result.confidence,
                        &format!("Depth {}", tracked.agent.depth),
                    );
                    memory_guard.record(experiment.clone());
                    experiments.push(experiment);
                }
            }
            experiments
        }; // Release lock here

        // Save to persistence (async, no lock held)
        // Best-effort: persistence failures are logged and skipped.
        if let Some(store) = &self.persistence_layer {
            for experiment in experiments_to_save {
                if let Err(e) = store.save_experiment(tenant_id, &experiment).await {
                    tracing::warn!("Failed to persist evolution experiment: {}", e);
                }
            }
        }

        // Find best performer (by confidence; NaN-safe via partial_cmp)
        let best = results.iter().max_by(|a, b| {
            a.1.confidence
                .partial_cmp(&b.1.confidence)
                .unwrap_or(std::cmp::Ordering::Equal)
        });

        if let Some((best_id, best_result)) = best {
            if let Some(tracked) = agents.get_mut(best_id) {
                // Get suggestions using dual-mode learning (statistical + LLM)
                let suggestions = if let Some(ref reflection) = self.reflection_agent {
                    // Use ReflectionAgent for LLM + statistical suggestions
                    let memory_guard = memory.read().await;

                    let reflection_result = reflection
                        .reflect(
                            &tracked.agent,
                            &format!("Orchestrated task at depth {}", tracked.agent.depth),
                            &best_result.response,
                            best_result.confidence,
                            &memory_guard,
                        )
                        .await;

                    drop(memory_guard);

                    // Convert to trait adjustments format
                    reflection_result
                        .adjustments
                        .into_iter()
                        .map(|(name, current, suggested)| {
                            vex_core::TraitAdjustment {
                                trait_name: name,
                                current_value: current,
                                suggested_value: suggested,
                                correlation: 0.5, // From LLM, not pure statistical
                                confidence: reflection_result.expected_improvement,
                            }
                        })
                        .collect()
                } else {
                    // Fallback to statistical-only if ReflectionAgent not available
                    let memory_guard = memory.read().await;
                    let suggestions = memory_guard.suggest_adjustments(&tracked.agent.genome);
                    drop(memory_guard);
                    suggestions
                };

                if !suggestions.is_empty() {
                    let old_traits = tracked.agent.genome.traits.clone();

                    // Apply suggestions with high confidence
                    // (threshold 0.3 — independent of `improvement_threshold`)
                    for (i, name) in tracked.agent.genome.trait_names.iter().enumerate() {
                        if let Some(sugg) = suggestions.iter().find(|s| &s.trait_name == name) {
                            if sugg.confidence >= 0.3 {
                                tracked.agent.genome.traits[i] = sugg.suggested_value;
                            }
                        }
                    }

                    if old_traits != tracked.agent.genome.traits {
                        let source = if self.reflection_agent.is_some() {
                            "LLM + Statistical"
                        } else {
                            "Statistical"
                        };

                        tracing::info!(
                            agent_id = %best_id,
                            old_traits = ?old_traits,
                            new_traits = ?tracked.agent.genome.traits,
                            suggestions = suggestions.len(),
                            source = source,
                            "Self-correcting genome applied"
                        );
                    }
                } else {
                    // Fallback to standard evolution
                    self.evolve_agents(tenant_id, agents, results);
                }
            }
        }

        // Periodically check for memory consolidation
        self.maybe_consolidate_memory(tenant_id).await;
    }
519
520    /// Check if memory needs consolidation and perform it if necessary
521    async fn maybe_consolidate_memory(&self, tenant_id: &str) {
522        let memory = match &self.evolution_memory {
523            Some(m) => m,
524            None => return,
525        };
526
527        let reflection = match &self.reflection_agent {
528            Some(r) => r,
529            None => return,
530        };
531
532        // Check buffer size (Read lock)
533        // Maintain a safety buffer of 20 recent experiments for statistical continuity
534        let (count, snapshot, batch_size) = {
535            let guard = memory.read().await;
536            if guard.len() >= 70 {
537                (guard.len(), guard.get_experiments_oldest(50), 50)
538            } else {
539                (0, Vec::new(), 0)
540            }
541        };
542
543        if count >= 70 {
544            tracing::info!(
545                "Consolidating evolution memory ({} items, batch execution)...",
546                batch_size
547            );
548
549            // 1. Extract rules using LLM
550            let consolidation_result = reflection.consolidate_memory(&snapshot).await;
551
552            let success = consolidation_result.is_ok();
553
554            match consolidation_result {
555                Ok(rules) => {
556                    if !rules.is_empty() {
557                        // 2. Save rules to persistence
558                        if let Some(store) = &self.persistence_layer {
559                            for rule in &rules {
560                                if let Err(e) = store.save_rule(tenant_id, rule).await {
561                                    tracing::warn!("Failed to save optimization rule: {}", e);
562                                }
563                            }
564                        }
565
566                        tracing::info!(
567                            "Consolidated memory into {} rules. Draining batch.",
568                            rules.len()
569                        );
570                    } else {
571                        tracing::info!(
572                            "Consolidation completed with no patterns found. Draining batch."
573                        );
574                    }
575                }
576                Err(e) => {
577                    tracing::warn!("Consolidation failed: {}", e);
578                }
579            }
580
581            // 3. Manage Memory (Write lock)
582            let mut guard = memory.write().await;
583
584            // If success (even if no rules), remove the processed batch
585            if success {
586                guard.drain_oldest(batch_size);
587            }
588
589            // Overflow Protection: Hard cap at 100 to prevent DoS
590            if guard.len() > 100 {
591                let excess = guard.len() - 100;
592                tracing::warn!(
593                    "Memory overflow ({} > 100). Evicting {} oldest items.",
594                    guard.len(),
595                    excess
596                );
597                guard.drain_oldest(excess);
598            }
599        }
600    }
601}
602
#[cfg(test)]
mod tests {
    use super::*;
    use async_trait::async_trait;

    /// In-process LLM stub: returns canned text chosen from the system
    /// prompt, so the full orchestration pipeline can run offline.
    #[derive(Debug)]
    struct MockLlm;

    #[async_trait]
    impl vex_llm::LlmProvider for MockLlm {
        fn name(&self) -> &str {
            "MockLLM"
        }

        async fn is_available(&self) -> bool {
            true
        }

        async fn complete(
            &self,
            request: vex_llm::LlmRequest,
        ) -> Result<vex_llm::LlmResponse, vex_llm::LlmError> {
            // Branch on the agent role embedded in the system prompt; the
            // final branch answers the root's synthesis request.
            let system = &request.system;
            let content = if system.contains("researcher") {
                "Research finding: This is a detailed analysis of the topic."
            } else if system.contains("critic") {
                "Critical analysis: The main concern is validation of assumptions."
            } else if request.prompt.is_empty() {
                "Mock response"
            } else {
                "Synthesized response combining all findings into a coherent answer."
            };

            Ok(vex_llm::LlmResponse {
                content: content.to_string(),
                model: "mock".to_string(),
                tokens_used: Some(10),
                latency_ms: 10,
                trace_root: None,
            })
        }
    }

    #[tokio::test]
    async fn test_orchestrator() {
        let provider = Arc::new(MockLlm);
        let gate = Arc::new(crate::gate::GenericGateMock);
        // Disable the adversarial pass so the mock's canned replies suffice.
        let config = OrchestratorConfig {
            executor_config: ExecutorConfig {
                enable_adversarial: false,
                ..Default::default()
            },
            ..Default::default()
        };
        let orchestrator = Orchestrator::new(provider, config, None, gate);

        let result = orchestrator
            .process("test-tenant", "What is the meaning of life?", vec![])
            .await
            .unwrap();

        assert!(!result.response.is_empty());
        assert!(result.confidence > 0.0);
        assert!(!result.agent_results.is_empty());
    }
}