1use std::time::Duration;
4
5use tokio::runtime::Handle;
6
7use swarm_engine_core::actions::ActionDef;
8use swarm_engine_core::agent::{
9 BatchInvoker, DefaultBatchManagerAgent, GenericWorker, ManagementStrategy, ManagerAgent,
10 ManagerId, WorkerAgent,
11};
12use swarm_engine_core::environment::EnvironmentBox;
13use swarm_engine_core::events::TraceSubscriber;
14use swarm_engine_core::exploration::{
15 LearnedDependencyProvider, NodeRules, OperatorProvider, SelectResult,
16};
17use swarm_engine_core::extensions::Extensions;
18use swarm_engine_core::learn::{
19 profile_to_offline_model, CountTrigger, LearnableSwarmBuilder, LearningStore, OfflineModel,
20 ScenarioProfile, TrainTrigger,
21};
22use swarm_engine_core::orchestrator::SwarmConfig;
23use swarm_engine_core::types::{GroupId, SwarmTask};
24
25use crate::environments::{
26 CodeEnvironment, DeepSearchEnvironment, InternalDiagnosisEnvironment, MazeEnvironment,
27 SearchEnvironment, TroubleshootingEnvironment,
28};
29
30use crate::aggregator::Aggregator;
31use crate::config::DependencyProviderKind;
32use crate::error::Result;
33use crate::metrics::RunMetrics;
34use crate::reporter::{ConfigSummary, EvalReport, SeedInfo};
35use crate::run::{EvalRun, TerminationReason};
36use crate::scenario::conditions::{ConditionValue, TimeoutBehavior};
37use crate::scenario::{EvalScenario, ManagementStrategyConfig};
38use crate::validation::{ScenarioValidator, WarningSeverity};
39
/// Seed of a single evaluation run.
///
/// `EvalRunner` inserts one of these into every run's `Extensions`
/// (see `build_extensions_from_scenario`). Equality and hashing are derived so
/// seeds can be compared directly or used as map/set keys.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub struct EvalSeed(pub u64);
45
46pub type ManagerFactory = Box<dyn Fn() -> Box<dyn ManagerAgent> + Send + Sync>;
48
49pub type BatchInvokerFactory = Box<dyn Fn() -> Box<dyn BatchInvoker> + Send + Sync>;
51
52pub type OperatorProviderFactory =
54 Box<dyn Fn() -> Box<dyn OperatorProvider<NodeRules>> + Send + Sync>;
55
56pub struct EvalRunner {
71 scenario: EvalScenario,
72 runtime: Handle,
73 runs: usize,
74 seed: u64,
75 task: Option<SwarmTask>,
77 manager_factory: Option<ManagerFactory>,
79 batch_invoker_factory: Option<BatchInvokerFactory>,
81 extensions_factory: Option<Box<dyn Fn() -> Extensions + Send + Sync>>,
83 operator_provider_factory: Option<OperatorProviderFactory>,
85 verbose: bool,
87 enable_exploration: bool,
89 dependency_graph: Option<swarm_engine_core::exploration::DependencyGraph>,
91 learning_store: Option<LearningStore>,
93 train_trigger: Option<std::sync::Arc<dyn TrainTrigger>>,
95 skip_learned_action_order: bool,
97 trace_subscriber: Option<std::sync::Arc<dyn TraceSubscriber>>,
99 scenario_profile: Option<ScenarioProfile>,
101 offline_model_from_profile: Option<OfflineModel>,
103 dependency_provider_kind: DependencyProviderKind,
105}
106
107impl EvalRunner {
108 pub fn new(scenario: EvalScenario, runtime: Handle) -> Self {
109 Self {
110 scenario,
111 runtime,
112 runs: 1,
113 seed: 42,
114 task: None,
115 manager_factory: None,
116 batch_invoker_factory: None,
117 extensions_factory: None,
118 operator_provider_factory: None,
119 verbose: false,
120 enable_exploration: false,
121 dependency_graph: None,
122 learning_store: None,
123 train_trigger: None,
124 skip_learned_action_order: false,
125 trace_subscriber: None,
126 scenario_profile: None,
127 offline_model_from_profile: None,
128 dependency_provider_kind: DependencyProviderKind::default(),
129 }
130 }
131
132 pub fn with_dependency_provider_kind(mut self, kind: DependencyProviderKind) -> Self {
134 self.dependency_provider_kind = kind;
135 self
136 }
137
138 fn create_dependency_provider(
140 &self,
141 action_order: &swarm_engine_core::learn::LearnedActionOrder,
142 ) -> Box<dyn swarm_engine_core::exploration::DependencyGraphProvider> {
143 match self.dependency_provider_kind {
146 DependencyProviderKind::Learned | DependencyProviderKind::Smart => {
147 Box::new(LearnedDependencyProvider::new(action_order.clone()))
148 }
149 }
150 }
151
152 pub fn with_verbose(mut self, verbose: bool) -> Self {
154 self.verbose = verbose;
155 self
156 }
157
158 pub fn with_exploration(mut self, enable: bool) -> Self {
160 self.enable_exploration = enable;
161 self
162 }
163
164 pub fn with_dependency_graph(
169 mut self,
170 graph: swarm_engine_core::exploration::DependencyGraph,
171 ) -> Self {
172 self.dependency_graph = Some(graph);
173 self
174 }
175
176 pub fn with_learning_store(mut self, path: impl AsRef<std::path::Path>) -> Self {
187 match LearningStore::new(path) {
188 Ok(store) => {
189 self.learning_store = Some(store);
191 }
192 Err(e) => {
193 eprintln!("Warning: Failed to create LearningStore: {}", e);
194 }
195 }
196 self
197 }
198
199 pub fn with_train_trigger(mut self, trigger: std::sync::Arc<dyn TrainTrigger>) -> Self {
213 self.train_trigger = Some(trigger);
214 self
215 }
216
217 pub fn skip_learned_action_order(mut self, skip: bool) -> Self {
222 self.skip_learned_action_order = skip;
223 self
224 }
225
226 pub fn with_trace_subscriber(
244 mut self,
245 subscriber: std::sync::Arc<dyn TraceSubscriber>,
246 ) -> Self {
247 self.trace_subscriber = Some(subscriber);
248 self
249 }
250
251 pub fn with_scenario_profile(mut self, profile: ScenarioProfile) -> Self {
265 let offline_model = profile_to_offline_model(&profile);
266 self.offline_model_from_profile = Some(offline_model);
267 self.scenario_profile = Some(profile);
268 self
269 }
270
271 pub fn with_runs(mut self, runs: usize) -> Self {
272 self.runs = runs;
273 self
274 }
275
276 pub fn with_seed(mut self, seed: u64) -> Self {
277 self.seed = seed;
278 self
279 }
280
281 pub fn with_task(mut self, task: SwarmTask) -> Self {
283 self.task = Some(task);
284 self
285 }
286
287 pub fn with_manager_factory<F>(mut self, factory: F) -> Self
289 where
290 F: Fn() -> Box<dyn ManagerAgent> + Send + Sync + 'static,
291 {
292 self.manager_factory = Some(Box::new(factory));
293 self
294 }
295
296 pub fn with_batch_invoker_factory<F>(mut self, factory: F) -> Self
298 where
299 F: Fn() -> Box<dyn BatchInvoker> + Send + Sync + 'static,
300 {
301 self.batch_invoker_factory = Some(Box::new(factory));
302 self
303 }
304
305 pub fn with_extensions_factory<F>(mut self, factory: F) -> Self
307 where
308 F: Fn() -> Extensions + Send + Sync + 'static,
309 {
310 self.extensions_factory = Some(Box::new(factory));
311 self
312 }
313
314 pub fn with_operator_provider_factory<F>(mut self, factory: F) -> Self
332 where
333 F: Fn() -> Box<dyn OperatorProvider<NodeRules>> + Send + Sync + 'static,
334 {
335 self.operator_provider_factory = Some(Box::new(factory));
336 self
337 }
338
339 pub fn run(&self) -> Result<EvalReport> {
340 let warnings = ScenarioValidator::validate_scenario(&self.scenario);
342 for warning in &warnings {
343 match warning.severity() {
344 WarningSeverity::High => {
345 tracing::warn!(
346 severity = %warning.severity(),
347 "Scenario validation: {}",
348 warning
349 );
350 }
351 WarningSeverity::Medium => {
352 tracing::info!(
353 severity = %warning.severity(),
354 "Scenario validation: {}",
355 warning
356 );
357 }
358 _ => {
359 tracing::debug!(
360 severity = %warning.severity(),
361 "Scenario validation: {}",
362 warning
363 );
364 }
365 }
366 }
367
368 let mut eval_runs = Vec::with_capacity(self.runs);
369 let mut run_seeds = Vec::with_capacity(self.runs);
370
371 let group_id = GroupId::new();
374
375 for i in 0..self.runs {
376 let run_seed = self.seed.wrapping_add(i as u64);
377 run_seeds.push(run_seed);
378
379 let result = self.run_single(i, run_seed, group_id)?;
380 eval_runs.push(result);
381 }
382
383 let aggregated = Aggregator::aggregate(&eval_runs);
384
385 Ok(EvalReport {
389 config_summary: ConfigSummary {
390 scenario_name: self.scenario.meta.name.clone(),
391 scenario_id: self.scenario.meta.id.to_string(),
392 worker_count: self.scenario.agents.workers.iter().map(|w| w.count).sum(),
393 max_ticks: self.scenario.app_config.max_ticks,
394 run_count: self.runs,
395 },
396 seed_info: SeedInfo {
397 base_seed: self.seed,
398 run_seeds,
399 },
400 runs: eval_runs,
401 aggregated,
402 assertion_results: vec![],
403 })
404 }
405
406 fn run_single(&self, index: usize, seed: u64, group_id: GroupId) -> Result<EvalRun> {
414 let workers = self.build_workers();
418 let management_strategy = self.build_management_strategy();
419
420 let swarm_config = SwarmConfig {
421 tick_duration: Duration::from_millis(self.scenario.app_config.tick_duration_ms),
422 max_ticks: self.scenario.app_config.max_ticks,
423 management_strategy,
424 };
425
426 let extensions = self.build_extensions_from_scenario(seed);
428
429 let scenario_key = self.scenario.meta.id.learning_key();
431 let mut builder = LearnableSwarmBuilder::new(self.runtime.clone())
432 .scenario(&scenario_key)
433 .swarm_config(swarm_config)
434 .workers(workers)
435 .extensions(extensions)
436 .enable_exploration(
437 self.enable_exploration || self.scenario.app_config.enable_exploration,
438 );
439
440 if let Some(factory) = &self.manager_factory {
442 let manager = factory();
443 builder = builder.add_manager(Box::new(DynManagerWrapper(manager)));
444 } else {
445 let managers = self.build_managers();
446 for manager in managers {
447 builder = builder.add_manager(Box::new(manager));
448 }
449 }
450
451 if let Some(factory) = &self.batch_invoker_factory {
453 let invoker = factory();
454 builder = builder.batch_invoker(Box::new(DynBatchInvokerWrapper(invoker)));
455 }
456
457 if let Some(factory) = &self.operator_provider_factory {
459 let provider = factory();
460 builder = builder.operator_provider(Box::new(DynOperatorProviderWrapper(provider)));
461 }
462
463 if let Some(ref model) = self.offline_model_from_profile {
465 builder = builder.offline_model(model.clone());
466
467 if self.operator_provider_factory.is_none() {
469 println!(
470 "Profile offline model applied: ucb1_c={:.3}, maturity={}, strategy={}",
471 model.parameters.ucb1_c,
472 model.strategy_config.maturity_threshold,
473 model.strategy_config.initial_strategy
474 );
475 }
476
477 if !self.skip_learned_action_order {
479 if let Some(ref action_order) = model.action_order {
480 let provider = self.create_dependency_provider(action_order);
481 builder = builder.dependency_provider(provider);
482 println!(
483 "Learned action order applied ({:?}): discover={:?}, not_discover={:?}",
484 self.dependency_provider_kind,
485 action_order.discover,
486 action_order.not_discover
487 );
488 }
489 } else if model.action_order.is_some() {
490 println!("Learned action order skipped (--no-dep-graph)");
491 }
492 }
493
494 if let Some(ref store) = self.learning_store {
497 builder = builder.with_learning_store(store.clone());
498
499 if self.offline_model_from_profile.is_none() {
501 let offline_model_opt = builder.offline_model_ref().cloned();
503 if let Some(ref model) = offline_model_opt {
504 if self.operator_provider_factory.is_none() {
506 println!(
507 "Offline model applied: ucb1_c={:.3}, maturity={}, strategy={}",
508 model.parameters.ucb1_c,
509 model.strategy_config.maturity_threshold,
510 model.strategy_config.initial_strategy
511 );
512 }
513
514 if !self.skip_learned_action_order {
516 if let Some(ref action_order) = model.action_order {
517 let provider = self.create_dependency_provider(action_order);
518 builder = builder.dependency_provider(provider);
519 println!(
520 "Learned action order applied ({:?}): discover={:?}, not_discover={:?}",
521 self.dependency_provider_kind,
522 action_order.discover,
523 action_order.not_discover
524 );
525 }
526 } else if model.action_order.is_some() {
527 println!("Learned action order skipped (--no-dep-graph)");
528 }
529 }
530 }
531
532 if let Some(ref trigger) = self.train_trigger {
534 builder = builder.train_trigger(std::sync::Arc::clone(trigger));
535 } else {
536 builder = builder.train_trigger(std::sync::Arc::new(CountTrigger::new(self.runs)));
537 }
538 }
539
540 if let Some(ref subscriber) = self.trace_subscriber {
542 builder = builder.with_trace_subscriber(std::sync::Arc::clone(subscriber));
543 }
544
545 let mut swarm = builder.build()?;
549
550 let manager_count = self
552 .scenario
553 .agents
554 .managers
555 .iter()
556 .map(|t| t.count)
557 .sum::<usize>();
558 if manager_count > 1 {
559 swarm.orchestrator_mut().enable_partitioning();
560 }
561
562 let task_to_run = self
564 .task
565 .clone()
566 .or_else(|| self.build_task_from_scenario())
567 .map(|task| task.with_group_id(group_id));
568
569 let result = if let Some(task) = task_to_run {
571 swarm.run_task(task)?
572 } else {
573 swarm.run()
574 };
575
576 let state = swarm.orchestrator().state();
580 let timed_out = result.total_ticks >= self.scenario.app_config.max_ticks;
581 let environment_done = state.shared.is_environment_done();
582 let total_actions = state.shared.stats.total_visits() as u64;
583 let successful_actions = state.shared.stats.total_successes() as u64;
584 let llm_invocations = state.shared.llm_invocations();
585 let llm_invoke_errors = state.shared.llm_errors();
586
587 let metrics = RunMetrics {
588 task: crate::metrics::TaskMetrics {
589 total_ticks: result.total_ticks,
590 total_tasks: 0,
591 completed_tasks: 0,
592 total_actions,
593 successful_actions,
594 success_rate: state.shared.stats.success_rate(),
595 },
596 coordination: crate::metrics::CoordinationMetrics {
597 manager_activations: llm_invocations,
598 manager_intervention_rate: if result.total_ticks > 0 {
599 llm_invocations as f64 / result.total_ticks as f64
600 } else {
601 0.0
602 },
603 ..Default::default()
604 },
605 performance: {
606 let llm_error_rate = if llm_invocations > 0 {
607 llm_invoke_errors as f64 / llm_invocations as f64
608 } else {
609 0.0
610 };
611 crate::metrics::PerformanceMetrics {
612 total_duration_ms: result.total_duration.as_millis() as f64,
613 avg_tick_latency_ms: if result.total_ticks > 0 {
614 result.total_duration.as_millis() as f64 / result.total_ticks as f64
615 } else {
616 0.0
617 },
618 raw_throughput_per_sec: if result.total_duration.as_secs_f64() > 0.0 {
619 total_actions as f64 / result.total_duration.as_secs_f64()
620 } else {
621 0.0
622 },
623 effective_throughput_per_sec: if result.total_duration.as_secs_f64() > 0.0 {
624 successful_actions as f64 / result.total_duration.as_secs_f64()
625 } else {
626 0.0
627 },
628 llm_invocations,
629 llm_invoke_errors,
630 llm_error_rate,
631 ..Default::default()
632 }
633 },
634 robustness: Default::default(),
635 };
636
637 let (success, termination_reason) = if !result.completed {
639 (false, TerminationReason::Stopped)
640 } else {
641 self.evaluate_conditions(&metrics, environment_done, timed_out)
642 };
643
644 if swarm.is_learning_enabled() {
648 swarm.emit_stats_snapshot();
650
651 std::thread::sleep(std::time::Duration::from_millis(150));
654
655 if let Some(tx) = swarm.take_shutdown_tx() {
656 let _ = tx.try_send(());
658 }
659 }
660
661 Ok(EvalRun::new(
662 index,
663 seed,
664 success,
665 termination_reason,
666 metrics,
667 ))
668 }
669
670 fn build_workers(&self) -> Vec<Box<dyn WorkerAgent>> {
671 let mut workers: Vec<Box<dyn WorkerAgent>> = Vec::new();
672
673 for template in &self.scenario.agents.workers {
674 for i in 0..template.count {
675 let id = workers.len();
676 let name = template.id_pattern.replace("{i}", &i.to_string());
677
678 let worker = GenericWorker::new(id)
679 .with_name(name)
680 .with_require_guidance(true);
681
682 workers.push(Box::new(worker));
683 }
684 }
685
686 workers
687 }
688
689 fn build_managers(&self) -> Vec<DefaultBatchManagerAgent> {
690 let mut managers = Vec::new();
691 let mut manager_index = 0;
692
693 for template in &self.scenario.agents.managers {
694 let ids = template.generate_ids();
695 for name in ids {
696 let manager = DefaultBatchManagerAgent::new(ManagerId(manager_index))
697 .with_name(name)
698 .with_interval(self.scenario.manager.process_interval_ticks);
699
700 managers.push(manager);
701 manager_index += 1;
702 }
703 }
704
705 if managers.is_empty() {
707 managers.push(
708 DefaultBatchManagerAgent::new(ManagerId(0))
709 .with_name("default_manager")
710 .with_interval(self.scenario.manager.process_interval_ticks),
711 );
712 }
713
714 managers
715 }
716
717 fn build_management_strategy(&self) -> ManagementStrategy {
718 match &self.scenario.app_config.management_strategy {
719 ManagementStrategyConfig::EveryTick {} => ManagementStrategy::EveryTick,
720 ManagementStrategyConfig::IntervalBased { max_interval } => {
721 ManagementStrategy::FixedInterval {
722 interval: *max_interval,
723 }
724 }
725 ManagementStrategyConfig::EventDriven { triggers: _ } => {
726 ManagementStrategy::CompletionBased { max_wait_ticks: 50 }
728 }
729 ManagementStrategyConfig::Hybrid {
730 max_interval,
731 triggers: _,
732 } => ManagementStrategy::Hybrid {
733 preferred_interval: *max_interval,
734 force_after_ticks: max_interval * 2,
735 },
736 ManagementStrategyConfig::Disabled {} => {
737 ManagementStrategy::FixedInterval { interval: u64::MAX }
739 }
740 }
741 }
742
743 fn build_task_from_scenario(&self) -> Option<SwarmTask> {
747 let task_config = &self.scenario.task;
748
749 if task_config.goal.is_empty() {
750 return None;
751 }
752
753 let mut context = serde_json::Map::new();
755
756 if let Some(target_path) = &task_config.context.target_path {
757 context.insert(
758 "target_path".to_string(),
759 serde_json::Value::String(target_path.clone()),
760 );
761 }
762 if let Some(working_dir) = &task_config.context.working_dir {
763 context.insert(
764 "working_dir".to_string(),
765 serde_json::Value::String(working_dir.clone()),
766 );
767 }
768 if let Some(max_depth) = task_config.context.max_depth {
769 context.insert(
770 "max_depth".to_string(),
771 serde_json::Value::Number(serde_json::Number::from(max_depth)),
772 );
773 }
774
775 for (key, value) in &task_config.context.extra {
777 if let Ok(json_value) = serde_json::to_value(value) {
778 context.insert(key.clone(), json_value);
779 }
780 }
781
782 let task =
783 SwarmTask::new(&task_config.goal).with_context(serde_json::Value::Object(context));
784
785 Some(task)
786 }
787
788 fn build_extensions_from_scenario(&self, seed: u64) -> Extensions {
790 let mut extensions = if let Some(factory) = &self.extensions_factory {
791 factory()
792 } else {
793 Extensions::new()
794 };
795
796 extensions.insert(EvalSeed(seed));
798
799 extensions.insert(self.scenario.llm.clone());
801
802 if let Some(ref lora) = self.scenario.llm.lora {
804 extensions.insert(lora.clone());
805 }
806
807 extensions.insert(self.scenario.manager.clone());
809
810 extensions.insert(self.scenario.batch_processor.clone());
812
813 let core_actions_config = self.scenario.actions.to_core_config();
815 extensions.insert(core_actions_config);
816
817 let env_type = self.scenario.environment.env_type.as_str();
819 let env_params = &self.scenario.environment.params;
820
821 let env_box: Option<EnvironmentBox> = match env_type {
822 "maze" => {
823 let map = env_params.get("map").and_then(|v| v.as_str()).unwrap_or("");
824 let worker_count = env_params
825 .get("worker_count")
826 .and_then(|v| v.as_u64())
827 .unwrap_or(1) as usize;
828 Some(Box::new(MazeEnvironment::from_str(map, worker_count)))
829 }
830 "code" => {
831 Some(Box::new(CodeEnvironment::auth_scenario()))
833 }
834 "troubleshooting" => {
835 let scenario_name = env_params
836 .get("scenario")
837 .and_then(|v| v.as_str())
838 .unwrap_or("memory_leak");
839 let env = match scenario_name {
840 "memory_leak" => TroubleshootingEnvironment::memory_leak_scenario(),
841 "cpu_spike" => TroubleshootingEnvironment::cpu_spike_scenario(),
842 "network_timeout" => TroubleshootingEnvironment::network_timeout_scenario(),
843 "medium" => TroubleshootingEnvironment::complex_scenario(15, 3, 2, seed),
844 "high" => TroubleshootingEnvironment::complex_scenario(30, 8, 3, seed),
845 "extreme" => TroubleshootingEnvironment::complex_scenario(50, 15, 4, seed),
846 "complex" => {
847 let total_services = env_params
848 .get("total_services")
849 .and_then(|v| v.as_u64())
850 .unwrap_or(15) as usize;
851 let noise_services = env_params
852 .get("noise_services")
853 .and_then(|v| v.as_u64())
854 .unwrap_or(3) as usize;
855 let cascade_depth = env_params
856 .get("cascade_depth")
857 .and_then(|v| v.as_u64())
858 .unwrap_or(2) as usize;
859 TroubleshootingEnvironment::complex_scenario(
860 total_services,
861 noise_services,
862 cascade_depth,
863 seed,
864 )
865 }
866 _ => TroubleshootingEnvironment::memory_leak_scenario(),
867 };
868 Some(Box::new(env))
869 }
870 "search" => {
871 let scenario_name = env_params
872 .get("scenario")
873 .and_then(|v| v.as_str())
874 .unwrap_or("basic");
875 let env = match scenario_name {
876 "basic" => SearchEnvironment::basic_scenario(),
877 "medium" => SearchEnvironment::medium_scenario(),
878 "large" => SearchEnvironment::large_scenario(),
879 "custom" => {
880 let file_count = env_params
881 .get("file_count")
882 .and_then(|v| v.as_u64())
883 .unwrap_or(5) as usize;
884 let target_index = env_params
885 .get("target_index")
886 .and_then(|v| v.as_u64())
887 .unwrap_or(2) as usize;
888 SearchEnvironment::custom_scenario(file_count, target_index, seed)
889 }
890 _ => SearchEnvironment::basic_scenario(),
891 };
892 Some(Box::new(env))
893 }
894 "internal_diagnosis" => {
895 let scenario_name = env_params
896 .get("scenario")
897 .and_then(|v| v.as_str())
898 .unwrap_or("routing");
899 let env = match scenario_name {
900 "routing" => InternalDiagnosisEnvironment::routing_error_scenario(),
901 "failover" => InternalDiagnosisEnvironment::failover_error_scenario(),
902 "worker_pool" => InternalDiagnosisEnvironment::worker_pool_scenario(),
903 "strategy" => InternalDiagnosisEnvironment::strategy_mismatch_scenario(),
904 "exploration" => InternalDiagnosisEnvironment::exploration_depth_scenario(),
905 "complex" => InternalDiagnosisEnvironment::complex_scenario(seed),
906 _ => InternalDiagnosisEnvironment::routing_error_scenario(),
907 };
908 Some(Box::new(env))
909 }
910 "deep_search" => {
911 let _scenario_name = env_params
913 .get("scenario")
914 .and_then(|v| v.as_str())
915 .unwrap_or("tech_question");
916 let env = DeepSearchEnvironment::tech_question_scenario();
917 Some(Box::new(env))
918 }
919 "default" | "realworld" => {
921 use swarm_engine_core::environment::DefaultEnvironment;
922 let working_dir = env_params
923 .get("working_dir")
924 .and_then(|v| v.as_str())
925 .map(std::path::PathBuf::from);
926 let env = if let Some(dir) = working_dir {
927 DefaultEnvironment::with_working_dir(dir)
928 } else {
929 DefaultEnvironment::new()
930 };
931 Some(Box::new(env))
932 }
933 _ => None, };
935
936 if let Some(env) = env_box {
937 extensions.insert(env);
938 }
939
940 let graph = self.dependency_graph.clone().or_else(|| {
942 self.scenario.dependency_graph.as_ref().and_then(|cfg| {
943 let action_names = self.scenario.actions.action_names();
944 cfg.to_core_graph(&action_names)
945 })
946 });
947 if let Some(g) = graph {
948 extensions.insert(g);
949 }
950
951 extensions
954 }
955
956 fn evaluate_conditions(
960 &self,
961 metrics: &RunMetrics,
962 environment_done: bool,
963 timed_out: bool,
964 ) -> (bool, TerminationReason) {
965 let conditions = &self.scenario.conditions;
966
967 for condition in &conditions.failure {
969 if let Some(actual) =
970 self.get_metric_value(&condition.metric, metrics, environment_done)
971 {
972 if condition.evaluate(&actual) {
973 return (false, TerminationReason::Failure);
974 }
975 }
976 }
977
978 if timed_out {
980 return match conditions.on_timeout {
981 TimeoutBehavior::Fail => (false, TerminationReason::Timeout),
982 TimeoutBehavior::PartialSuccess => {
983 let success = self.check_success_conditions(metrics, environment_done);
985 (success, TerminationReason::Timeout)
986 }
987 TimeoutBehavior::MilestoneScore => {
988 (false, TerminationReason::Timeout)
990 }
991 };
992 }
993
994 let success = self.check_success_conditions(metrics, environment_done);
996 if success {
997 (true, TerminationReason::Success)
998 } else {
999 (false, TerminationReason::Stopped)
1002 }
1003 }
1004
1005 fn check_success_conditions(&self, metrics: &RunMetrics, environment_done: bool) -> bool {
1007 let conditions = &self.scenario.conditions;
1008
1009 if conditions.success.is_empty() {
1011 return true;
1012 }
1013
1014 conditions.success.iter().all(|condition| {
1016 self.get_metric_value(&condition.metric, metrics, environment_done)
1017 .map(|actual| condition.evaluate(&actual))
1018 .unwrap_or(false)
1019 })
1020 }
1021
1022 fn get_metric_value(
1024 &self,
1025 path: &str,
1026 metrics: &RunMetrics,
1027 environment_done: bool,
1028 ) -> Option<ConditionValue> {
1029 match path {
1030 "environment.done" => Some(ConditionValue::Bool(environment_done)),
1032
1033 "task.total_ticks" | "total_ticks" => {
1035 Some(ConditionValue::Integer(metrics.task.total_ticks as i64))
1036 }
1037 "task.success_rate" | "success_rate" => {
1038 Some(ConditionValue::Float(metrics.task.success_rate))
1039 }
1040 "task.total_actions" | "total_actions" => {
1041 Some(ConditionValue::Integer(metrics.task.total_actions as i64))
1042 }
1043 "task.successful_actions" | "successful_actions" => Some(ConditionValue::Integer(
1044 metrics.task.successful_actions as i64,
1045 )),
1046
1047 "performance.llm_error_rate" | "llm_error_rate" => {
1049 Some(ConditionValue::Float(metrics.performance.llm_error_rate))
1050 }
1051 "performance.llm_invocations" | "llm_invocations" => Some(ConditionValue::Integer(
1052 metrics.performance.llm_invocations as i64,
1053 )),
1054
1055 "coordination.manager_activations" | "manager_activations" => Some(
1057 ConditionValue::Integer(metrics.coordination.manager_activations as i64),
1058 ),
1059
1060 "errors.count" => {
1062 let failed = metrics
1063 .task
1064 .total_actions
1065 .saturating_sub(metrics.task.successful_actions);
1066 Some(ConditionValue::Integer(failed as i64))
1067 }
1068
1069 _ => None,
1071 }
1072 }
1073}
1074
1075struct DynManagerWrapper(Box<dyn ManagerAgent>);
1077
1078impl ManagerAgent for DynManagerWrapper {
1079 fn prepare(
1080 &self,
1081 context: &swarm_engine_core::agent::TaskContext,
1082 ) -> swarm_engine_core::agent::BatchDecisionRequest {
1083 self.0.prepare(context)
1084 }
1085
1086 fn finalize(
1087 &self,
1088 context: &swarm_engine_core::agent::TaskContext,
1089 responses: Vec<(
1090 swarm_engine_core::types::WorkerId,
1091 swarm_engine_core::agent::DecisionResponse,
1092 )>,
1093 ) -> swarm_engine_core::agent::ManagementDecision {
1094 self.0.finalize(context, responses)
1095 }
1096
1097 fn id(&self) -> swarm_engine_core::agent::ManagerId {
1098 self.0.id()
1099 }
1100
1101 fn name(&self) -> &str {
1102 self.0.name()
1103 }
1104}
1105
1106struct DynBatchInvokerWrapper(Box<dyn BatchInvoker>);
1108
1109impl BatchInvoker for DynBatchInvokerWrapper {
1110 fn invoke(
1111 &self,
1112 request: swarm_engine_core::agent::BatchDecisionRequest,
1113 extensions: &swarm_engine_core::extensions::Extensions,
1114 ) -> swarm_engine_core::agent::BatchInvokeResult {
1115 self.0.invoke(request, extensions)
1116 }
1117
1118 fn plan_dependencies(
1119 &self,
1120 task: &str,
1121 actions: &[ActionDef],
1122 hint: Option<&SelectResult>,
1123 ) -> Option<swarm_engine_core::exploration::DependencyGraph> {
1124 self.0.plan_dependencies(task, actions, hint)
1125 }
1126
1127 fn name(&self) -> &str {
1128 self.0.name()
1129 }
1130}
1131
1132struct DynOperatorProviderWrapper(Box<dyn OperatorProvider<NodeRules>>);
1134
1135impl OperatorProvider<NodeRules> for DynOperatorProviderWrapper {
1136 fn provide(
1137 &self,
1138 rules: NodeRules,
1139 context: Option<
1140 &swarm_engine_core::exploration::ProviderContext<
1141 '_,
1142 swarm_engine_core::exploration::ActionNodeData,
1143 String,
1144 swarm_engine_core::exploration::MapNodeState,
1145 >,
1146 >,
1147 ) -> swarm_engine_core::exploration::ConfigurableOperator<NodeRules> {
1148 self.0.provide(rules, context)
1149 }
1150
1151 fn reevaluate(
1152 &self,
1153 operator: &mut swarm_engine_core::exploration::ConfigurableOperator<NodeRules>,
1154 ctx: &swarm_engine_core::exploration::ProviderContext<
1155 '_,
1156 swarm_engine_core::exploration::ActionNodeData,
1157 String,
1158 swarm_engine_core::exploration::MapNodeState,
1159 >,
1160 ) {
1161 self.0.reevaluate(operator, ctx)
1162 }
1163
1164 fn name(&self) -> &str {
1165 self.0.name()
1166 }
1167}