1use std::time::Duration;
4
5use tokio::runtime::Handle;
6
7use std::path::PathBuf;
8
9use swarm_engine_core::actions::ActionDef;
10use swarm_engine_core::agent::{
11 BatchInvoker, DefaultBatchManagerAgent, GenericWorker, ManagementStrategy, ManagerAgent,
12 ManagerId, WorkerAgent,
13};
14use swarm_engine_core::environment::EnvironmentBox;
15use swarm_engine_core::events::{ActionEventPublisher, JsonlWriter, LearningEventChannel};
16use swarm_engine_core::exploration::{LearnedDependencyProvider, NodeRules, OperatorProvider};
17use swarm_engine_core::extensions::Extensions;
18use swarm_engine_core::learn::{LearningSnapshot, LearningStore, OfflineModel, SnapshotMetadata};
19use swarm_engine_core::orchestrator::{OrchestratorBuilder, SwarmConfig};
20use swarm_engine_core::types::SwarmTask;
21
22use crate::environments::{
23 CodeEnvironment, DeepSearchEnvironment, InternalDiagnosisEnvironment, MazeEnvironment,
24 SearchEnvironment, TroubleshootingEnvironment,
25};
26
27use crate::aggregator::Aggregator;
28use crate::error::Result;
29use crate::metrics::RunMetrics;
30use crate::reporter::{ConfigSummary, EvalReport, SeedInfo};
31use crate::run::{EvalRun, TerminationReason};
32use crate::scenario::conditions::{ConditionValue, TimeoutBehavior};
33use crate::scenario::{EvalScenario, ManagementStrategyConfig};
34
/// Seed of a single evaluation run.
///
/// `EvalRunner` inserts one of these into each run's `Extensions`; run `i`
/// uses `base_seed.wrapping_add(i)`. Equality/hashing are derived so seeds can
/// be compared, deduplicated or used as map keys.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub struct EvalSeed(pub u64);
40
41pub type ManagerFactory = Box<dyn Fn() -> Box<dyn ManagerAgent> + Send + Sync>;
43
44pub type BatchInvokerFactory = Box<dyn Fn() -> Box<dyn BatchInvoker> + Send + Sync>;
46
47pub type OperatorProviderFactory =
49 Box<dyn Fn() -> Box<dyn OperatorProvider<NodeRules>> + Send + Sync>;
50
51pub struct EvalRunner {
66 scenario: EvalScenario,
67 runtime: Handle,
68 runs: usize,
69 seed: u64,
70 task: Option<SwarmTask>,
72 manager_factory: Option<ManagerFactory>,
74 batch_invoker_factory: Option<BatchInvokerFactory>,
76 extensions_factory: Option<Box<dyn Fn() -> Extensions + Send + Sync>>,
78 operator_provider_factory: Option<OperatorProviderFactory>,
80 verbose: bool,
82 enable_exploration: bool,
84 dependency_graph: Option<swarm_engine_core::exploration::DependencyGraph>,
86 action_events_path: Option<PathBuf>,
88 learning_store: Option<LearningStore>,
90 prior_snapshot: Option<LearningSnapshot>,
92 offline_model: Option<OfflineModel>,
94}
95
96impl EvalRunner {
97 pub fn new(scenario: EvalScenario, runtime: Handle) -> Self {
98 Self {
99 scenario,
100 runtime,
101 runs: 1,
102 seed: 42,
103 task: None,
104 manager_factory: None,
105 batch_invoker_factory: None,
106 extensions_factory: None,
107 operator_provider_factory: None,
108 verbose: false,
109 enable_exploration: false,
110 dependency_graph: None,
111 action_events_path: None,
112 learning_store: None,
113 prior_snapshot: None,
114 offline_model: None,
115 }
116 }
117
118 pub fn with_verbose(mut self, verbose: bool) -> Self {
120 self.verbose = verbose;
121 self
122 }
123
124 pub fn with_exploration(mut self, enable: bool) -> Self {
126 self.enable_exploration = enable;
127 self
128 }
129
130 pub fn with_dependency_graph(
135 mut self,
136 graph: swarm_engine_core::exploration::DependencyGraph,
137 ) -> Self {
138 self.dependency_graph = Some(graph);
139 self
140 }
141
142 pub fn with_action_events_path(mut self, path: impl Into<PathBuf>) -> Self {
154 self.action_events_path = Some(path.into());
155 self
156 }
157
158 pub fn with_learning_store(mut self, path: impl AsRef<std::path::Path>) -> Self {
169 match LearningStore::new(path) {
170 Ok(store) => {
171 let scenario_key = self.scenario.meta.id.learning_key();
174 self.prior_snapshot = store.load_scenario(&scenario_key).ok();
175 self.offline_model = store.load_offline_model(&scenario_key).ok();
177 if let Some(ref model) = self.offline_model {
178 println!(
179 "Offline model loaded: ucb1_c={:.3}, strategy={}",
180 model.parameters.ucb1_c, model.strategy_config.initial_strategy
181 );
182 }
183 self.learning_store = Some(store);
184
185 LearningEventChannel::global().enable();
187 }
188 Err(e) => {
189 eprintln!("Warning: Failed to create LearningStore: {}", e);
190 }
191 }
192 self
193 }
194
195 pub fn with_runs(mut self, runs: usize) -> Self {
196 self.runs = runs;
197 self
198 }
199
200 pub fn with_seed(mut self, seed: u64) -> Self {
201 self.seed = seed;
202 self
203 }
204
205 pub fn with_task(mut self, task: SwarmTask) -> Self {
207 self.task = Some(task);
208 self
209 }
210
211 pub fn with_manager_factory<F>(mut self, factory: F) -> Self
213 where
214 F: Fn() -> Box<dyn ManagerAgent> + Send + Sync + 'static,
215 {
216 self.manager_factory = Some(Box::new(factory));
217 self
218 }
219
220 pub fn with_batch_invoker_factory<F>(mut self, factory: F) -> Self
222 where
223 F: Fn() -> Box<dyn BatchInvoker> + Send + Sync + 'static,
224 {
225 self.batch_invoker_factory = Some(Box::new(factory));
226 self
227 }
228
229 pub fn with_extensions_factory<F>(mut self, factory: F) -> Self
231 where
232 F: Fn() -> Extensions + Send + Sync + 'static,
233 {
234 self.extensions_factory = Some(Box::new(factory));
235 self
236 }
237
238 pub fn with_operator_provider_factory<F>(mut self, factory: F) -> Self
256 where
257 F: Fn() -> Box<dyn OperatorProvider<NodeRules>> + Send + Sync + 'static,
258 {
259 self.operator_provider_factory = Some(Box::new(factory));
260 self
261 }
262
263 pub fn run(&self) -> Result<EvalReport> {
264 let mut eval_runs = Vec::with_capacity(self.runs);
265 let mut run_seeds = Vec::with_capacity(self.runs);
266 let mut first_dependency_graph = None;
267
268 for i in 0..self.runs {
269 let run_seed = self.seed.wrapping_add(i as u64);
270 run_seeds.push(run_seed);
271
272 let (result, dep_graph) = self.run_single(i, run_seed)?;
273 eval_runs.push(result);
274
275 if i == 0 {
277 first_dependency_graph = dep_graph;
278 }
279 }
280
281 let aggregated = Aggregator::aggregate(&eval_runs);
282
283 if let Some(ref store) = self.learning_store {
285 let scenario_key = self.scenario.meta.id.learning_key();
286 match store.run_offline_learning(&scenario_key, 20) {
287 Ok(mut model) => {
288 tracing::info!(
289 scenario = %scenario_key,
290 sessions = model.analyzed_sessions,
291 ucb1_c = model.parameters.ucb1_c,
292 initial_strategy = %model.strategy_config.initial_strategy,
293 "Offline learning completed"
294 );
295
296 if let Some(ref graph) = first_dependency_graph {
298 if graph.has_action_order() {
299 use swarm_engine_core::learn::LearnedActionOrder;
300
301 let action_order = LearnedActionOrder::new(
302 graph.discover_order().to_vec(),
303 graph.not_discover_order().to_vec(),
304 graph.available_actions(),
305 );
306 model.action_order = Some(action_order.clone());
307
308 if let Err(e) = store.save_offline_model(&scenario_key, &model) {
309 tracing::warn!("Failed to save action_order: {}", e);
310 } else {
311 println!(
312 "Action order saved: discover={:?}, not_discover={:?}",
313 action_order.discover, action_order.not_discover
314 );
315 }
316 }
317 }
318 }
319 Err(e) => {
320 tracing::debug!("Offline learning skipped: {}", e);
321 }
322 }
323 }
324
325 Ok(EvalReport {
326 config_summary: ConfigSummary {
327 scenario_name: self.scenario.meta.name.clone(),
328 scenario_id: self.scenario.meta.id.to_string(),
329 worker_count: self.scenario.agents.workers.iter().map(|w| w.count).sum(),
330 max_ticks: self.scenario.app_config.max_ticks,
331 run_count: self.runs,
332 },
333 seed_info: SeedInfo {
334 base_seed: self.seed,
335 run_seeds,
336 },
337 runs: eval_runs,
338 aggregated,
339 assertion_results: vec![],
340 })
341 }
342
343 fn run_single(
344 &self,
345 index: usize,
346 seed: u64,
347 ) -> Result<(
348 EvalRun,
349 Option<swarm_engine_core::exploration::DependencyGraph>,
350 )> {
351 let workers = self.build_workers();
352
353 let management_strategy = self.build_management_strategy();
355
356 let config = SwarmConfig {
357 tick_duration: Duration::from_millis(self.scenario.app_config.tick_duration_ms),
358 max_ticks: self.scenario.app_config.max_ticks,
359 management_strategy,
360 };
361
362 let mut builder = OrchestratorBuilder::new().config(config);
364
365 for worker in workers {
367 builder = builder.add_worker_boxed(worker);
368 }
369
370 if let Some(factory) = &self.manager_factory {
372 let manager = factory();
373 builder = builder.manager(DynManagerWrapper(manager));
374 } else {
375 let managers = self.build_managers();
377 for manager in managers {
378 builder = builder.manager(manager);
379 }
380 }
381
382 if let Some(factory) = &self.batch_invoker_factory {
384 let invoker = factory();
385 builder = builder.batch_invoker(DynBatchInvokerWrapper(invoker));
386 }
387
388 let extensions = self.build_extensions_from_scenario(seed);
390 builder = builder.extensions(extensions);
391
392 if self.enable_exploration || self.scenario.app_config.enable_exploration {
394 builder = builder.with_exploration();
395 }
396
397 if self.operator_provider_factory.is_none() {
399 if let Some(ref model) = self.offline_model {
400 builder = builder.with_offline_model(model.clone());
401 println!(
402 "Offline model applied: ucb1_c={:.3}, maturity={}, strategy={}",
403 model.parameters.ucb1_c,
404 model.strategy_config.maturity_threshold,
405 model.strategy_config.initial_strategy
406 );
407 }
408 }
409
410 if let Some(ref model) = self.offline_model {
413 if let Some(ref action_order) = model.action_order {
414 let provider = LearnedDependencyProvider::new(action_order.clone());
415 builder = builder.dependency_provider(provider);
416 println!(
417 "Learned action order applied: discover={:?}, not_discover={:?}",
418 action_order.discover, action_order.not_discover
419 );
420 }
421 }
422
423 if let Some(factory) = &self.operator_provider_factory {
425 let provider = factory();
426 builder = builder.operator_provider(DynOperatorProviderWrapper(provider));
427 }
428
429 let jsonl_handle = if let Some(base_path) = &self.action_events_path {
431 let path = if self.runs > 1 {
433 let stem = base_path.file_stem().unwrap_or_default().to_string_lossy();
434 let ext = base_path.extension().unwrap_or_default().to_string_lossy();
435 let parent = base_path.parent().unwrap_or(std::path::Path::new("."));
436 parent.join(format!("{}_run{}.{}", stem, index, ext))
437 } else {
438 base_path.clone()
439 };
440
441 let (publisher, _rx) = ActionEventPublisher::new(1024);
442 let jsonl_rx = publisher.subscribe();
443 let writer = JsonlWriter::new(jsonl_rx, path);
444
445 let handle = self.runtime.spawn(async move {
447 if let Err(e) = writer.run().await {
448 eprintln!("JsonlWriter error: {}", e);
449 }
450 });
451
452 builder = builder.action_collector(publisher);
453 Some(handle)
454 } else {
455 None
456 };
457
458 let mut orchestrator = builder.build(self.runtime.clone());
459
460 let manager_count = self
462 .scenario
463 .agents
464 .managers
465 .iter()
466 .map(|t| t.count)
467 .sum::<usize>();
468 if manager_count > 1 {
469 orchestrator.enable_partitioning();
470 }
471
472 let task_to_run = self
474 .task
475 .clone()
476 .or_else(|| self.build_task_from_scenario());
477
478 let result = if let Some(task) = task_to_run {
481 orchestrator.run_task(task)?
482 } else {
483 orchestrator.run()
484 };
485
486 let state = orchestrator.state();
487 let timed_out = result.total_ticks >= self.scenario.app_config.max_ticks;
488 let environment_done = state.shared.is_environment_done();
489 let total_actions = state.shared.stats.total_visits() as u64;
490 let successful_actions = state.shared.stats.total_successes() as u64;
491 let llm_invocations = state.shared.llm_invocations();
492 let llm_invoke_errors = state.shared.llm_errors();
493
494 let metrics = RunMetrics {
495 task: crate::metrics::TaskMetrics {
496 total_ticks: result.total_ticks,
497 total_tasks: 0,
498 completed_tasks: 0,
499 total_actions,
500 successful_actions,
501 success_rate: state.shared.stats.success_rate(),
502 },
503 coordination: crate::metrics::CoordinationMetrics {
504 manager_activations: llm_invocations,
506 manager_intervention_rate: if result.total_ticks > 0 {
507 llm_invocations as f64 / result.total_ticks as f64
508 } else {
509 0.0
510 },
511 ..Default::default()
512 },
513 performance: {
514 let llm_error_rate = if llm_invocations > 0 {
515 llm_invoke_errors as f64 / llm_invocations as f64
516 } else {
517 0.0
518 };
519 crate::metrics::PerformanceMetrics {
520 total_duration_ms: result.total_duration.as_millis() as f64,
521 avg_tick_latency_ms: if result.total_ticks > 0 {
522 result.total_duration.as_millis() as f64 / result.total_ticks as f64
523 } else {
524 0.0
525 },
526 raw_throughput_per_sec: if result.total_duration.as_secs_f64() > 0.0 {
527 total_actions as f64 / result.total_duration.as_secs_f64()
528 } else {
529 0.0
530 },
531 effective_throughput_per_sec: if result.total_duration.as_secs_f64() > 0.0 {
532 successful_actions as f64 / result.total_duration.as_secs_f64()
533 } else {
534 0.0
535 },
536 llm_invocations,
537 llm_invoke_errors,
538 llm_error_rate,
539 ..Default::default()
540 }
541 },
542 robustness: Default::default(),
543 };
544
545 let (success, termination_reason) = if !result.completed {
547 (false, TerminationReason::Stopped)
548 } else {
549 self.evaluate_conditions(&metrics, environment_done, timed_out)
550 };
551
552 if let Some(ref store) = self.learning_store {
554 let scenario_key = self.scenario.meta.id.learning_key();
557 let metadata = SnapshotMetadata::default()
558 .with_scenario(&scenario_key)
559 .with_task(&self.scenario.task.goal);
560
561 let learn_stats_opt = orchestrator.learned_provider().and_then(|p| p.stats());
563
564 let snapshot = if let Some(learn_stats) = learn_stats_opt {
565 LearningSnapshot {
566 version: swarm_engine_core::learn::SNAPSHOT_VERSION,
567 metadata,
568 episode_transitions: learn_stats.episode_transitions.clone(),
569 ngram_stats: learn_stats.ngram_stats.clone(),
570 selection_performance: learn_stats.selection_performance.clone(),
571 contextual_stats: learn_stats
572 .contextual_stats
573 .iter()
574 .map(|(k, v)| {
575 (
576 k.clone(),
577 swarm_engine_core::online_stats::ActionStats {
578 visits: v.visits,
579 successes: v.successes,
580 failures: v.failures,
581 ..Default::default()
582 },
583 )
584 })
585 .collect(),
586 action_stats: state
587 .shared
588 .stats
589 .all_action_stats()
590 .map(|(k, v)| (k.clone(), v.clone()))
591 .collect(),
592 }
593 } else {
594 LearningSnapshot {
596 version: swarm_engine_core::learn::SNAPSHOT_VERSION,
597 metadata,
598 episode_transitions: Default::default(),
599 ngram_stats: Default::default(),
600 selection_performance: Default::default(),
601 contextual_stats: Default::default(),
602 action_stats: state
603 .shared
604 .stats
605 .all_action_stats()
606 .map(|(k, v)| (k.clone(), v.clone()))
607 .collect(),
608 }
609 };
610
611 if let Err(e) = store.save_session(&scenario_key, &snapshot) {
612 eprintln!("Warning: Failed to save learning data: {}", e);
613 }
614 }
615
616 let dependency_graph = orchestrator.dependency_graph().cloned();
618
619 drop(orchestrator);
621
622 if let Some(handle) = jsonl_handle {
624 let _ = self.runtime.block_on(handle);
626 }
627
628 Ok((
629 EvalRun::new(index, seed, success, termination_reason, metrics),
630 dependency_graph,
631 ))
632 }
633
634 fn build_workers(&self) -> Vec<Box<dyn WorkerAgent>> {
635 let mut workers: Vec<Box<dyn WorkerAgent>> = Vec::new();
636
637 for template in &self.scenario.agents.workers {
638 for i in 0..template.count {
639 let id = workers.len();
640 let name = template.id_pattern.replace("{i}", &i.to_string());
641
642 let worker = GenericWorker::new(id)
643 .with_name(name)
644 .with_require_guidance(true);
645
646 workers.push(Box::new(worker));
647 }
648 }
649
650 workers
651 }
652
653 fn build_managers(&self) -> Vec<DefaultBatchManagerAgent> {
654 let mut managers = Vec::new();
655 let mut manager_index = 0;
656
657 for template in &self.scenario.agents.managers {
658 let ids = template.generate_ids();
659 for name in ids {
660 let manager = DefaultBatchManagerAgent::new(ManagerId(manager_index))
661 .with_name(name)
662 .with_interval(self.scenario.manager.process_interval_ticks);
663
664 managers.push(manager);
665 manager_index += 1;
666 }
667 }
668
669 if managers.is_empty() {
671 managers.push(
672 DefaultBatchManagerAgent::new(ManagerId(0))
673 .with_name("default_manager")
674 .with_interval(self.scenario.manager.process_interval_ticks),
675 );
676 }
677
678 managers
679 }
680
681 fn build_management_strategy(&self) -> ManagementStrategy {
682 match &self.scenario.app_config.management_strategy {
683 ManagementStrategyConfig::EveryTick {} => ManagementStrategy::EveryTick,
684 ManagementStrategyConfig::IntervalBased { max_interval } => {
685 ManagementStrategy::FixedInterval {
686 interval: *max_interval,
687 }
688 }
689 ManagementStrategyConfig::EventDriven { triggers: _ } => {
690 ManagementStrategy::CompletionBased { max_wait_ticks: 50 }
692 }
693 ManagementStrategyConfig::Hybrid {
694 max_interval,
695 triggers: _,
696 } => ManagementStrategy::Hybrid {
697 preferred_interval: *max_interval,
698 force_after_ticks: max_interval * 2,
699 },
700 ManagementStrategyConfig::Disabled {} => {
701 ManagementStrategy::FixedInterval { interval: u64::MAX }
703 }
704 }
705 }
706
707 fn build_task_from_scenario(&self) -> Option<SwarmTask> {
711 let task_config = &self.scenario.task;
712
713 if task_config.goal.is_empty() {
714 return None;
715 }
716
717 let mut context = serde_json::Map::new();
719
720 if let Some(target_path) = &task_config.context.target_path {
721 context.insert(
722 "target_path".to_string(),
723 serde_json::Value::String(target_path.clone()),
724 );
725 }
726 if let Some(working_dir) = &task_config.context.working_dir {
727 context.insert(
728 "working_dir".to_string(),
729 serde_json::Value::String(working_dir.clone()),
730 );
731 }
732 if let Some(max_depth) = task_config.context.max_depth {
733 context.insert(
734 "max_depth".to_string(),
735 serde_json::Value::Number(serde_json::Number::from(max_depth)),
736 );
737 }
738
739 for (key, value) in &task_config.context.extra {
741 if let Ok(json_value) = serde_json::to_value(value) {
742 context.insert(key.clone(), json_value);
743 }
744 }
745
746 let task =
747 SwarmTask::new(&task_config.goal).with_context(serde_json::Value::Object(context));
748
749 Some(task)
750 }
751
752 fn build_extensions_from_scenario(&self, seed: u64) -> Extensions {
754 let mut extensions = if let Some(factory) = &self.extensions_factory {
755 factory()
756 } else {
757 Extensions::new()
758 };
759
760 extensions.insert(EvalSeed(seed));
762
763 extensions.insert(self.scenario.llm.clone());
765
766 if let Some(ref lora) = self.scenario.llm.lora {
768 extensions.insert(lora.clone());
769 }
770
771 extensions.insert(self.scenario.manager.clone());
773
774 extensions.insert(self.scenario.batch_processor.clone());
776
777 let core_actions_config = self.scenario.actions.to_core_config();
779 extensions.insert(core_actions_config);
780
781 let env_type = self.scenario.environment.env_type.as_str();
783 let env_params = &self.scenario.environment.params;
784
785 let env_box: Option<EnvironmentBox> = match env_type {
786 "maze" => {
787 let map = env_params.get("map").and_then(|v| v.as_str()).unwrap_or("");
788 let worker_count = env_params
789 .get("worker_count")
790 .and_then(|v| v.as_u64())
791 .unwrap_or(1) as usize;
792 Some(Box::new(MazeEnvironment::from_str(map, worker_count)))
793 }
794 "code" => {
795 Some(Box::new(CodeEnvironment::auth_scenario()))
797 }
798 "troubleshooting" => {
799 let scenario_name = env_params
800 .get("scenario")
801 .and_then(|v| v.as_str())
802 .unwrap_or("memory_leak");
803 let env = match scenario_name {
804 "memory_leak" => TroubleshootingEnvironment::memory_leak_scenario(),
805 "cpu_spike" => TroubleshootingEnvironment::cpu_spike_scenario(),
806 "network_timeout" => TroubleshootingEnvironment::network_timeout_scenario(),
807 "medium" => TroubleshootingEnvironment::complex_scenario(15, 3, 2, seed),
808 "high" => TroubleshootingEnvironment::complex_scenario(30, 8, 3, seed),
809 "extreme" => TroubleshootingEnvironment::complex_scenario(50, 15, 4, seed),
810 "complex" => {
811 let total_services = env_params
812 .get("total_services")
813 .and_then(|v| v.as_u64())
814 .unwrap_or(15) as usize;
815 let noise_services = env_params
816 .get("noise_services")
817 .and_then(|v| v.as_u64())
818 .unwrap_or(3) as usize;
819 let cascade_depth = env_params
820 .get("cascade_depth")
821 .and_then(|v| v.as_u64())
822 .unwrap_or(2) as usize;
823 TroubleshootingEnvironment::complex_scenario(
824 total_services,
825 noise_services,
826 cascade_depth,
827 seed,
828 )
829 }
830 _ => TroubleshootingEnvironment::memory_leak_scenario(),
831 };
832 Some(Box::new(env))
833 }
834 "search" => {
835 let scenario_name = env_params
836 .get("scenario")
837 .and_then(|v| v.as_str())
838 .unwrap_or("basic");
839 let env = match scenario_name {
840 "basic" => SearchEnvironment::basic_scenario(),
841 "medium" => SearchEnvironment::medium_scenario(),
842 "large" => SearchEnvironment::large_scenario(),
843 "custom" => {
844 let file_count = env_params
845 .get("file_count")
846 .and_then(|v| v.as_u64())
847 .unwrap_or(5) as usize;
848 let target_index = env_params
849 .get("target_index")
850 .and_then(|v| v.as_u64())
851 .unwrap_or(2) as usize;
852 SearchEnvironment::custom_scenario(file_count, target_index, seed)
853 }
854 _ => SearchEnvironment::basic_scenario(),
855 };
856 Some(Box::new(env))
857 }
858 "internal_diagnosis" => {
859 let scenario_name = env_params
860 .get("scenario")
861 .and_then(|v| v.as_str())
862 .unwrap_or("routing");
863 let env = match scenario_name {
864 "routing" => InternalDiagnosisEnvironment::routing_error_scenario(),
865 "failover" => InternalDiagnosisEnvironment::failover_error_scenario(),
866 "worker_pool" => InternalDiagnosisEnvironment::worker_pool_scenario(),
867 "strategy" => InternalDiagnosisEnvironment::strategy_mismatch_scenario(),
868 "exploration" => InternalDiagnosisEnvironment::exploration_depth_scenario(),
869 "complex" => InternalDiagnosisEnvironment::complex_scenario(seed),
870 _ => InternalDiagnosisEnvironment::routing_error_scenario(),
871 };
872 Some(Box::new(env))
873 }
874 "deep_search" => {
875 let scenario_name = env_params
876 .get("scenario")
877 .and_then(|v| v.as_str())
878 .unwrap_or("tech_question");
879 let env = match scenario_name {
880 "tech_question" | _ => DeepSearchEnvironment::tech_question_scenario(),
881 };
882 Some(Box::new(env))
883 }
884 _ => None, };
886
887 if let Some(env) = env_box {
888 extensions.insert(env);
889 }
890
891 let graph = self.dependency_graph.clone().or_else(|| {
893 self.scenario.dependency_graph.as_ref().and_then(|cfg| {
894 let action_names = self.scenario.actions.action_names();
895 cfg.to_core_graph(&action_names)
896 })
897 });
898 if let Some(g) = graph {
899 extensions.insert(g);
900 }
901
902 if let Some(ref prior) = self.prior_snapshot {
905 extensions.insert(prior.clone());
906 }
907
908 extensions
909 }
910
911 fn evaluate_conditions(
915 &self,
916 metrics: &RunMetrics,
917 environment_done: bool,
918 timed_out: bool,
919 ) -> (bool, TerminationReason) {
920 let conditions = &self.scenario.conditions;
921
922 for condition in &conditions.failure {
924 if let Some(actual) =
925 self.get_metric_value(&condition.metric, metrics, environment_done)
926 {
927 if condition.evaluate(&actual) {
928 return (false, TerminationReason::Failure);
929 }
930 }
931 }
932
933 if timed_out {
935 return match conditions.on_timeout {
936 TimeoutBehavior::Fail => (false, TerminationReason::Timeout),
937 TimeoutBehavior::PartialSuccess => {
938 let success = self.check_success_conditions(metrics, environment_done);
940 (success, TerminationReason::Timeout)
941 }
942 TimeoutBehavior::MilestoneScore => {
943 (false, TerminationReason::Timeout)
945 }
946 };
947 }
948
949 let success = self.check_success_conditions(metrics, environment_done);
951 if success {
952 (true, TerminationReason::Success)
953 } else {
954 (false, TerminationReason::Stopped)
957 }
958 }
959
960 fn check_success_conditions(&self, metrics: &RunMetrics, environment_done: bool) -> bool {
962 let conditions = &self.scenario.conditions;
963
964 if conditions.success.is_empty() {
966 return true;
967 }
968
969 conditions.success.iter().all(|condition| {
971 self.get_metric_value(&condition.metric, metrics, environment_done)
972 .map(|actual| condition.evaluate(&actual))
973 .unwrap_or(false)
974 })
975 }
976
977 fn get_metric_value(
979 &self,
980 path: &str,
981 metrics: &RunMetrics,
982 environment_done: bool,
983 ) -> Option<ConditionValue> {
984 match path {
985 "environment.done" => Some(ConditionValue::Bool(environment_done)),
987
988 "task.total_ticks" | "total_ticks" => {
990 Some(ConditionValue::Integer(metrics.task.total_ticks as i64))
991 }
992 "task.success_rate" | "success_rate" => {
993 Some(ConditionValue::Float(metrics.task.success_rate))
994 }
995 "task.total_actions" | "total_actions" => {
996 Some(ConditionValue::Integer(metrics.task.total_actions as i64))
997 }
998 "task.successful_actions" | "successful_actions" => Some(ConditionValue::Integer(
999 metrics.task.successful_actions as i64,
1000 )),
1001
1002 "performance.llm_error_rate" | "llm_error_rate" => {
1004 Some(ConditionValue::Float(metrics.performance.llm_error_rate))
1005 }
1006 "performance.llm_invocations" | "llm_invocations" => Some(ConditionValue::Integer(
1007 metrics.performance.llm_invocations as i64,
1008 )),
1009
1010 "coordination.manager_activations" | "manager_activations" => Some(
1012 ConditionValue::Integer(metrics.coordination.manager_activations as i64),
1013 ),
1014
1015 "errors.count" => {
1017 let failed = metrics
1018 .task
1019 .total_actions
1020 .saturating_sub(metrics.task.successful_actions);
1021 Some(ConditionValue::Integer(failed as i64))
1022 }
1023
1024 _ => None,
1026 }
1027 }
1028}
1029
1030struct DynManagerWrapper(Box<dyn ManagerAgent>);
1032
1033impl ManagerAgent for DynManagerWrapper {
1034 fn prepare(
1035 &self,
1036 context: &swarm_engine_core::agent::TaskContext,
1037 ) -> swarm_engine_core::agent::BatchDecisionRequest {
1038 self.0.prepare(context)
1039 }
1040
1041 fn finalize(
1042 &self,
1043 context: &swarm_engine_core::agent::TaskContext,
1044 responses: Vec<(
1045 swarm_engine_core::types::WorkerId,
1046 swarm_engine_core::agent::DecisionResponse,
1047 )>,
1048 ) -> swarm_engine_core::agent::ManagementDecision {
1049 self.0.finalize(context, responses)
1050 }
1051
1052 fn id(&self) -> swarm_engine_core::agent::ManagerId {
1053 self.0.id()
1054 }
1055
1056 fn name(&self) -> &str {
1057 self.0.name()
1058 }
1059}
1060
1061struct DynBatchInvokerWrapper(Box<dyn BatchInvoker>);
1063
1064impl BatchInvoker for DynBatchInvokerWrapper {
1065 fn invoke(
1066 &self,
1067 request: swarm_engine_core::agent::BatchDecisionRequest,
1068 extensions: &swarm_engine_core::extensions::Extensions,
1069 ) -> swarm_engine_core::agent::BatchInvokeResult {
1070 self.0.invoke(request, extensions)
1071 }
1072
1073 fn plan_dependencies(
1074 &self,
1075 task: &str,
1076 actions: &[ActionDef],
1077 ) -> Option<swarm_engine_core::exploration::DependencyGraph> {
1078 self.0.plan_dependencies(task, actions)
1079 }
1080
1081 fn name(&self) -> &str {
1082 self.0.name()
1083 }
1084}
1085
1086struct DynOperatorProviderWrapper(Box<dyn OperatorProvider<NodeRules>>);
1088
1089impl OperatorProvider<NodeRules> for DynOperatorProviderWrapper {
1090 fn provide(
1091 &self,
1092 rules: NodeRules,
1093 context: Option<
1094 &swarm_engine_core::exploration::ProviderContext<
1095 '_,
1096 swarm_engine_core::exploration::ActionNodeData,
1097 String,
1098 swarm_engine_core::exploration::MapNodeState,
1099 >,
1100 >,
1101 ) -> swarm_engine_core::exploration::ConfigurableOperator<NodeRules> {
1102 self.0.provide(rules, context)
1103 }
1104
1105 fn reevaluate(
1106 &self,
1107 operator: &mut swarm_engine_core::exploration::ConfigurableOperator<NodeRules>,
1108 ctx: &swarm_engine_core::exploration::ProviderContext<
1109 '_,
1110 swarm_engine_core::exploration::ActionNodeData,
1111 String,
1112 swarm_engine_core::exploration::MapNodeState,
1113 >,
1114 ) {
1115 self.0.reevaluate(operator, ctx)
1116 }
1117
1118 fn name(&self) -> &str {
1119 self.0.name()
1120 }
1121}