1use std::collections::HashMap;
4use std::sync::Arc;
5use std::time::{Duration, Instant};
6use tokio::sync::RwLock;
7use uuid::Uuid;
8
9use vex_core::{
10 tournament_select, Agent, AgentConfig, Fitness, GeneticOperator, Genome, Hash, MerkleTree,
11 StandardOperator,
12};
13
14use crate::executor::{AgentExecutor, ExecutionResult, ExecutorConfig};
15use vex_llm::LlmProvider;
16
17#[derive(Debug, Clone)]
19pub struct OrchestratorConfig {
20 pub max_depth: u8,
22 pub agents_per_level: usize,
24 pub enable_evolution: bool,
26 pub mutation_rate: f64,
28 pub executor_config: ExecutorConfig,
30 pub max_agent_age: Duration,
32 pub enable_self_correction: bool,
34 pub improvement_threshold: f64,
36 pub reflect_every_n_tasks: usize,
38}
39
40impl Default for OrchestratorConfig {
41 fn default() -> Self {
42 Self {
43 max_depth: 3,
44 agents_per_level: 2,
45 enable_evolution: true,
46 mutation_rate: 0.1,
47 executor_config: ExecutorConfig::default(),
48 max_agent_age: Duration::from_secs(3600), enable_self_correction: false,
50 improvement_threshold: 0.02,
51 reflect_every_n_tasks: 5,
52 }
53 }
54}
55
56use vex_anchor::{AnchorBackend, AnchorMetadata, AnchorReceipt};
57
58#[derive(Debug)]
60pub struct OrchestrationResult {
61 pub root_agent_id: Uuid,
63 pub response: String,
65 pub merkle_root: Hash,
67 pub trace_root: Option<Hash>,
69 pub agent_results: HashMap<Uuid, ExecutionResult>,
71 pub anchor_receipts: Vec<AnchorReceipt>,
73 pub levels_processed: u8,
75 pub confidence: f64,
77}
78
79#[derive(Clone)]
81struct TrackedAgent {
82 agent: Agent,
83 _tenant_id: String,
84 created_at: Instant,
85}
86
87pub struct Orchestrator<L: LlmProvider> {
89 pub config: OrchestratorConfig,
91 agents: RwLock<HashMap<Uuid, TrackedAgent>>,
93 executor: AgentExecutor<L>,
95 anchors: Vec<Arc<dyn AnchorBackend>>,
97 #[allow(dead_code)]
99 llm: Arc<L>,
100 evolution_memory: Option<RwLock<vex_core::EvolutionMemory>>,
102 reflection_agent: Option<vex_adversarial::ReflectionAgent<L>>,
104 persistence_layer: Option<Arc<dyn vex_persist::EvolutionStore>>,
106}
107
108impl<L: LlmProvider + 'static> Orchestrator<L> {
109 pub fn new(
111 llm: Arc<L>,
112 config: OrchestratorConfig,
113 persistence_layer: Option<Arc<dyn vex_persist::EvolutionStore>>,
114 gate: Arc<dyn crate::gate::Gate>,
115 ) -> Self {
116 let executor = AgentExecutor::new(llm.clone(), config.executor_config.clone(), gate);
117 let evolution_memory = if config.enable_self_correction {
118 Some(RwLock::new(vex_core::EvolutionMemory::new()))
119 } else {
120 None
121 };
122 let reflection_agent = if config.enable_self_correction {
123 Some(vex_adversarial::ReflectionAgent::new(llm.clone()))
124 } else {
125 None
126 };
127 Self {
128 config,
129 agents: RwLock::new(HashMap::new()),
130 executor,
131 anchors: Vec::new(),
132 llm,
133 evolution_memory,
134 reflection_agent,
135 persistence_layer,
136 }
137 }
138
139 pub fn add_anchor(&mut self, anchor: Arc<dyn AnchorBackend>) {
141 self.anchors.push(anchor);
142 }
143
144 pub async fn cleanup_expired(&self) -> usize {
147 let mut agents = self.agents.write().await;
148 let before = agents.len();
149 agents.retain(|_, tracked| tracked.created_at.elapsed() < self.config.max_agent_age);
150 let removed = before - agents.len();
151 if removed > 0 {
152 tracing::info!(
153 removed = removed,
154 remaining = agents.len(),
155 "Cleaned up expired agents"
156 );
157 }
158 removed
159 }
160
161 pub async fn agent_count(&self) -> usize {
163 self.agents.read().await.len()
164 }
165
166 pub async fn process(
168 &self,
169 tenant_id: &str,
170 query: &str,
171 capabilities: Vec<vex_llm::Capability>,
172 ) -> Result<OrchestrationResult, String> {
173 let root_config = AgentConfig {
175 name: "Root".to_string(),
176 role: "You are a strategic coordinator. Synthesize information from sub-agents into a coherent response.".to_string(),
177 max_depth: self.config.max_depth,
178 spawn_shadow: true,
179 };
180 let mut root = Agent::new(root_config);
181 let root_id = root.id;
182
183 let child_configs = vec![
185 AgentConfig {
186 name: "Researcher".to_string(),
187 role: "You are a thorough researcher. Analyze the query and provide detailed findings.".to_string(),
188 max_depth: 1,
189 spawn_shadow: true,
190 },
191 AgentConfig {
192 name: "Critic".to_string(),
193 role: "You are a critical analyzer. Identify potential issues, edge cases, and weaknesses.".to_string(),
194 max_depth: 1,
195 spawn_shadow: true,
196 },
197 ];
198
199 let mut execution_futures = Vec::new();
201 for config in child_configs.into_iter().take(self.config.agents_per_level) {
202 let mut child = root.spawn_child(config);
203 let executor = self.executor.clone();
204 let query_str = query.to_string();
205 let caps = capabilities.clone();
206
207 execution_futures.push(tokio::spawn(async move {
208 let result = executor.execute(&mut child, &query_str, caps).await;
209 (child.id, child, result)
210 }));
211 }
212
213 let task_results = futures::future::join_all(execution_futures).await;
214
215 let mut agents = self.agents.write().await;
217
218 let mut all_results: HashMap<Uuid, ExecutionResult> = HashMap::new();
219 let mut child_results = Vec::new();
220 for task_result in task_results {
221 let (child_id, child, result) = task_result.map_err(|e| e.to_string())?;
222 let execution_result: ExecutionResult = result?;
223
224 child_results.push((child_id, execution_result.clone()));
225 all_results.insert(child_id, execution_result);
226 agents.insert(
227 child_id,
228 TrackedAgent {
229 agent: child,
230 _tenant_id: tenant_id.to_string(),
231 created_at: Instant::now(),
232 },
233 );
234 }
235
236 drop(agents);
238
239 let synthesis_prompt = format!(
241 "Based on the following research from your sub-agents, provide a comprehensive answer:\n\n\
242 Original Query: \"{}\"\n\n\
243 Researcher's Findings: \"{}\"\n\n\
244 Critic's Analysis: \"{}\"\n\n\
245 Synthesize these into a final, well-reasoned response.",
246 query,
247 child_results.first().map(|(_, r)| r.response.as_str()).unwrap_or("N/A"),
248 child_results.get(1).map(|(_, r)| r.response.as_str()).unwrap_or("N/A"),
249 );
250
251 let root_result = self
252 .executor
253 .execute(&mut root, &synthesis_prompt, capabilities.clone())
254 .await?;
255 all_results.insert(root_id, root_result.clone());
256
257 let mut agents = self.agents.write().await;
259
260 agents.insert(
262 root_id,
263 TrackedAgent {
264 agent: root,
265 _tenant_id: tenant_id.to_string(),
266 created_at: Instant::now(),
267 },
268 );
269
270 let leaves: Vec<(String, Hash)> = all_results
272 .iter()
273 .map(|(id, r)| (id.to_string(), r.context.hash.clone()))
274 .collect();
275 let merkle_tree = MerkleTree::from_leaves(leaves);
276
277 let total_confidence: f64 = all_results.values().map(|r| r.confidence).sum();
279 let avg_confidence = total_confidence / all_results.len() as f64;
280
281 if self.config.enable_evolution {
283 if self.config.enable_self_correction {
284 self.evolve_agents_self_correcting(tenant_id, &mut agents, &all_results)
285 .await;
286 } else {
287 self.evolve_agents(tenant_id, &mut agents, &all_results);
288 }
289 }
290
291 let trace_leaves: Vec<(String, Hash)> = all_results
293 .iter()
294 .filter_map(|(id, r)| r.trace_root.clone().map(|tr| (id.to_string(), tr)))
295 .collect();
296 let trace_merkle = MerkleTree::from_leaves(trace_leaves);
297
298 let mut anchor_receipts = Vec::new();
300 if let Some(root_hash) = merkle_tree.root_hash() {
301 let metadata = AnchorMetadata::new(tenant_id, all_results.len() as u64);
302 for anchor in &self.anchors {
303 match anchor.anchor(root_hash, metadata.clone()).await {
304 Ok(receipt) => anchor_receipts.push(receipt),
305 Err(e) => tracing::warn!("Anchoring to {} failed: {}", anchor.name(), e),
306 }
307 }
308 }
309
310 Ok(OrchestrationResult {
311 root_agent_id: root_id,
312 response: root_result.response,
313 merkle_root: merkle_tree
314 .root_hash()
315 .cloned()
316 .unwrap_or(Hash::digest(b"empty")),
317 trace_root: trace_merkle.root_hash().cloned(),
318 agent_results: all_results,
319 anchor_receipts,
320 levels_processed: 2,
321 confidence: avg_confidence,
322 })
323 }
324
325 fn evolve_agents(
327 &self,
328 _tenant_id: &str,
329 agents: &mut HashMap<Uuid, TrackedAgent>,
330 results: &HashMap<Uuid, ExecutionResult>,
331 ) {
332 let operator = StandardOperator;
333
334 let population: Vec<(Genome, Fitness)> = agents
336 .values()
337 .map(|tracked| {
338 let fitness = results
339 .get(&tracked.agent.id)
340 .map(|r| r.confidence)
341 .unwrap_or(0.5);
342 (tracked.agent.genome.clone(), Fitness::new(fitness))
343 })
344 .collect();
345
346 if population.len() < 2 {
347 return;
348 }
349
350 let parent_a = tournament_select(&population, 2);
352 let parent_b = tournament_select(&population, 2);
353 let mut offspring = operator.crossover(parent_a, parent_b);
354 operator.mutate(&mut offspring, self.config.mutation_rate);
355
356 if let Some((worst_id, _worst_fitness)) = results.iter().min_by(|a, b| {
359 a.1.confidence
360 .partial_cmp(&b.1.confidence)
361 .unwrap_or(std::cmp::Ordering::Equal)
362 }) {
363 if let Some(tracked) = agents.get_mut(worst_id) {
364 let old_traits = tracked.agent.genome.traits.clone();
365 tracked.agent.apply_evolved_genome(offspring.clone());
366
367 tracing::info!(
368 agent_id = %worst_id,
369 old_traits = ?old_traits,
370 new_traits = ?offspring.traits,
371 "Evolved genome applied to least fit agent (Elitism preserved fittest)"
372 );
373 }
374 }
375 }
376
377 async fn evolve_agents_self_correcting(
387 &self,
388 tenant_id: &str,
389 agents: &mut HashMap<Uuid, TrackedAgent>,
390 results: &HashMap<Uuid, ExecutionResult>,
391 ) {
392 let memory = match &self.evolution_memory {
394 Some(mem) => mem,
395 None => {
396 tracing::warn!("Self-correction enabled but memory not initialized");
397 return self.evolve_agents(tenant_id, agents, results);
398 }
399 };
400
401 let experiments_to_save: Vec<vex_core::GenomeExperiment> = {
403 let mut memory_guard = memory.write().await;
404 let mut experiments = Vec::new();
405
406 for (id, result) in results {
407 if let Some(tracked) = agents.get(id) {
408 let mut fitness_scores = std::collections::HashMap::new();
409 fitness_scores.insert("confidence".to_string(), result.confidence);
410
411 let experiment = vex_core::GenomeExperiment::new(
412 &tracked.agent.genome,
413 fitness_scores,
414 result.confidence,
415 &format!("Depth {}", tracked.agent.depth),
416 );
417 memory_guard.record(experiment.clone());
418 experiments.push(experiment);
419 }
420 }
421 experiments
422 }; if let Some(store) = &self.persistence_layer {
426 for experiment in experiments_to_save {
427 if let Err(e) = store.save_experiment(tenant_id, &experiment).await {
428 tracing::warn!("Failed to persist evolution experiment: {}", e);
429 }
430 }
431 }
432
433 let best = results.iter().max_by(|a, b| {
435 a.1.confidence
436 .partial_cmp(&b.1.confidence)
437 .unwrap_or(std::cmp::Ordering::Equal)
438 });
439
440 if let Some((best_id, best_result)) = best {
441 if let Some(tracked) = agents.get_mut(best_id) {
442 let suggestions = if let Some(ref reflection) = self.reflection_agent {
444 let memory_guard = memory.read().await;
446
447 let reflection_result = reflection
448 .reflect(
449 &tracked.agent,
450 &format!("Orchestrated task at depth {}", tracked.agent.depth),
451 &best_result.response,
452 best_result.confidence,
453 &memory_guard,
454 )
455 .await;
456
457 drop(memory_guard);
458
459 reflection_result
461 .adjustments
462 .into_iter()
463 .map(|(name, current, suggested)| {
464 vex_core::TraitAdjustment {
465 trait_name: name,
466 current_value: current,
467 suggested_value: suggested,
468 correlation: 0.5, confidence: reflection_result.expected_improvement,
470 }
471 })
472 .collect()
473 } else {
474 let memory_guard = memory.read().await;
476 let suggestions = memory_guard.suggest_adjustments(&tracked.agent.genome);
477 drop(memory_guard);
478 suggestions
479 };
480
481 if !suggestions.is_empty() {
482 let old_traits = tracked.agent.genome.traits.clone();
483
484 for (i, name) in tracked.agent.genome.trait_names.iter().enumerate() {
486 if let Some(sugg) = suggestions.iter().find(|s| &s.trait_name == name) {
487 if sugg.confidence >= 0.3 {
488 tracked.agent.genome.traits[i] = sugg.suggested_value;
489 }
490 }
491 }
492
493 if old_traits != tracked.agent.genome.traits {
494 let source = if self.reflection_agent.is_some() {
495 "LLM + Statistical"
496 } else {
497 "Statistical"
498 };
499
500 tracing::info!(
501 agent_id = %best_id,
502 old_traits = ?old_traits,
503 new_traits = ?tracked.agent.genome.traits,
504 suggestions = suggestions.len(),
505 source = source,
506 "Self-correcting genome applied"
507 );
508 }
509 } else {
510 self.evolve_agents(tenant_id, agents, results);
512 }
513 }
514 }
515
516 self.maybe_consolidate_memory(tenant_id).await;
518 }
519
520 async fn maybe_consolidate_memory(&self, tenant_id: &str) {
522 let memory = match &self.evolution_memory {
523 Some(m) => m,
524 None => return,
525 };
526
527 let reflection = match &self.reflection_agent {
528 Some(r) => r,
529 None => return,
530 };
531
532 let (count, snapshot, batch_size) = {
535 let guard = memory.read().await;
536 if guard.len() >= 70 {
537 (guard.len(), guard.get_experiments_oldest(50), 50)
538 } else {
539 (0, Vec::new(), 0)
540 }
541 };
542
543 if count >= 70 {
544 tracing::info!(
545 "Consolidating evolution memory ({} items, batch execution)...",
546 batch_size
547 );
548
549 let consolidation_result = reflection.consolidate_memory(&snapshot).await;
551
552 let success = consolidation_result.is_ok();
553
554 match consolidation_result {
555 Ok(rules) => {
556 if !rules.is_empty() {
557 if let Some(store) = &self.persistence_layer {
559 for rule in &rules {
560 if let Err(e) = store.save_rule(tenant_id, rule).await {
561 tracing::warn!("Failed to save optimization rule: {}", e);
562 }
563 }
564 }
565
566 tracing::info!(
567 "Consolidated memory into {} rules. Draining batch.",
568 rules.len()
569 );
570 } else {
571 tracing::info!(
572 "Consolidation completed with no patterns found. Draining batch."
573 );
574 }
575 }
576 Err(e) => {
577 tracing::warn!("Consolidation failed: {}", e);
578 }
579 }
580
581 let mut guard = memory.write().await;
583
584 if success {
586 guard.drain_oldest(batch_size);
587 }
588
589 if guard.len() > 100 {
591 let excess = guard.len() - 100;
592 tracing::warn!(
593 "Memory overflow ({} > 100). Evicting {} oldest items.",
594 guard.len(),
595 excess
596 );
597 guard.drain_oldest(excess);
598 }
599 }
600 }
601}
602
603#[cfg(test)]
604mod tests {
605 use super::*;
606 use async_trait::async_trait;
607
608 #[derive(Debug)]
609 struct MockLlm;
610
611 #[async_trait]
612 impl vex_llm::LlmProvider for MockLlm {
613 fn name(&self) -> &str {
614 "MockLLM"
615 }
616
617 async fn is_available(&self) -> bool {
618 true
619 }
620
621 async fn complete(
622 &self,
623 request: vex_llm::LlmRequest,
624 ) -> Result<vex_llm::LlmResponse, vex_llm::LlmError> {
625 let content = if request.system.contains("researcher") {
626 "Research finding: This is a detailed analysis of the topic.".to_string()
627 } else if request.system.contains("critic") {
628 "Critical analysis: The main concern is validation of assumptions.".to_string()
629 } else if request.prompt.is_empty() {
630 "Mock response".to_string()
631 } else {
632 "Synthesized response combining all findings into a coherent answer.".to_string()
633 };
634
635 Ok(vex_llm::LlmResponse {
636 content,
637 model: "mock".to_string(),
638 tokens_used: Some(10),
639 latency_ms: 10,
640 trace_root: None,
641 })
642 }
643 }
644
645 #[tokio::test]
646 async fn test_orchestrator() {
647 let llm = Arc::new(MockLlm);
648 let gate = Arc::new(crate::gate::GenericGateMock);
649 let mut config = OrchestratorConfig::default();
650 config.executor_config.enable_adversarial = false;
651 let orchestrator = Orchestrator::new(llm, config, None, gate);
652
653 let result = orchestrator
654 .process("test-tenant", "What is the meaning of life?", vec![])
655 .await
656 .unwrap();
657
658 assert!(!result.response.is_empty());
659 assert!(result.confidence > 0.0);
660 assert!(!result.agent_results.is_empty());
661 }
662}