1use std::time::Duration;
4
5use tokio::runtime::Handle;
6
7use std::path::PathBuf;
8
9use swarm_engine_core::actions::ActionDef;
10use swarm_engine_core::agent::{
11 BatchInvoker, DefaultBatchManagerAgent, GenericWorker, ManagementStrategy, ManagerAgent,
12 ManagerId, WorkerAgent,
13};
14use swarm_engine_core::environment::EnvironmentBox;
15use swarm_engine_core::events::{ActionEventPublisher, JsonlWriter};
16use swarm_engine_core::exploration::{NodeRules, OperatorProvider};
17use swarm_engine_core::extensions::Extensions;
18use swarm_engine_core::learn::{LearningSnapshot, LearningStore, OfflineModel, SnapshotMetadata};
19use swarm_engine_core::orchestrator::{OrchestratorBuilder, SwarmConfig};
20use swarm_engine_core::types::SwarmTask;
21
22use crate::environments::{
23 CodeEnvironment, InternalDiagnosisEnvironment, MazeEnvironment, SearchEnvironment,
24 TroubleshootingEnvironment,
25};
26
27use crate::aggregator::Aggregator;
28use crate::error::Result;
29use crate::metrics::RunMetrics;
30use crate::reporter::{ConfigSummary, EvalReport, SeedInfo};
31use crate::run::{EvalRun, TerminationReason};
32use crate::scenario::conditions::{ConditionValue, TimeoutBehavior};
33use crate::scenario::{EvalScenario, ManagementStrategyConfig};
34
/// Seed value of a single evaluation run.
///
/// The runner inserts one `EvalSeed` into the extensions of every run so that
/// components reading the extensions can retrieve the seed of the current run.
///
/// Besides `Debug`/`Clone`/`Copy`, the type is comparable and hashable so seeds
/// can be asserted on, deduplicated, or used as map keys.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub struct EvalSeed(pub u64);
40
/// Thread-safe factory closure that produces a boxed [`ManagerAgent`].
///
/// The runner calls the factory once per run, so every run gets its own manager.
pub type ManagerFactory = Box<dyn Fn() -> Box<dyn ManagerAgent> + Send + Sync>;
43
/// Thread-safe factory closure that produces a boxed [`BatchInvoker`].
///
/// The runner calls the factory once per run, so every run gets its own invoker.
pub type BatchInvokerFactory = Box<dyn Fn() -> Box<dyn BatchInvoker> + Send + Sync>;
46
/// Thread-safe factory closure that produces a boxed [`OperatorProvider`] over
/// [`NodeRules`].
///
/// The runner calls the factory once per run, so every run gets its own provider.
pub type OperatorProviderFactory =
    Box<dyn Fn() -> Box<dyn OperatorProvider<NodeRules>> + Send + Sync>;
50
/// Runs an [`EvalScenario`] one or more times and collects the results into an
/// [`EvalReport`].
///
/// Configure it with the `with_*` builder methods, then call `run`.
pub struct EvalRunner {
    /// Scenario that drives agent construction, configuration, environment
    /// selection and success/failure conditions.
    scenario: EvalScenario,
    /// Tokio runtime handle used to build the orchestrator, spawn the JSONL
    /// writer task and wait for it.
    runtime: Handle,
    /// Number of runs performed by `run` (initially 1).
    runs: usize,
    /// Base seed (initially 42); run `i` uses `seed.wrapping_add(i)`.
    seed: u64,
    /// Explicit task to run; when `None`, a task is derived from the scenario.
    task: Option<SwarmTask>,
    /// Optional factory for a custom manager; when `None`, managers are built
    /// from the scenario's manager templates.
    manager_factory: Option<ManagerFactory>,
    /// Optional factory for the batch invoker handed to the orchestrator.
    batch_invoker_factory: Option<BatchInvokerFactory>,
    /// Optional factory for the base `Extensions` of each run; when `None`,
    /// an empty `Extensions` is used as the base.
    extensions_factory: Option<Box<dyn Fn() -> Extensions + Send + Sync>>,
    /// Optional factory for a custom operator provider. When set, the loaded
    /// offline model is not applied to the orchestrator builder.
    operator_provider_factory: Option<OperatorProviderFactory>,
    /// Verbosity flag set via `with_verbose`.
    /// NOTE(review): not read anywhere in the visible part of this file.
    verbose: bool,
    /// Enables exploration in addition to the scenario's own
    /// `app_config.enable_exploration` flag (either one is sufficient).
    enable_exploration: bool,
    /// Explicit dependency graph; takes precedence over the graph configured
    /// in the scenario.
    dependency_graph: Option<swarm_engine_core::exploration::DependencyGraph>,
    /// Base path for the JSONL action-event log; `None` disables the log.
    action_events_path: Option<PathBuf>,
    /// Store used to save per-run learning snapshots and to run offline
    /// learning after all runs.
    learning_store: Option<LearningStore>,
    /// Snapshot loaded from the learning store for this scenario, inserted
    /// into each run's extensions when present.
    prior_snapshot: Option<LearningSnapshot>,
    /// Offline model loaded from the learning store for this scenario.
    offline_model: Option<OfflineModel>,
}
95
96impl EvalRunner {
97 pub fn new(scenario: EvalScenario, runtime: Handle) -> Self {
98 Self {
99 scenario,
100 runtime,
101 runs: 1,
102 seed: 42,
103 task: None,
104 manager_factory: None,
105 batch_invoker_factory: None,
106 extensions_factory: None,
107 operator_provider_factory: None,
108 verbose: false,
109 enable_exploration: false,
110 dependency_graph: None,
111 action_events_path: None,
112 learning_store: None,
113 prior_snapshot: None,
114 offline_model: None,
115 }
116 }
117
118 pub fn with_verbose(mut self, verbose: bool) -> Self {
120 self.verbose = verbose;
121 self
122 }
123
124 pub fn with_exploration(mut self, enable: bool) -> Self {
126 self.enable_exploration = enable;
127 self
128 }
129
130 pub fn with_dependency_graph(
135 mut self,
136 graph: swarm_engine_core::exploration::DependencyGraph,
137 ) -> Self {
138 self.dependency_graph = Some(graph);
139 self
140 }
141
142 pub fn with_action_events_path(mut self, path: impl Into<PathBuf>) -> Self {
154 self.action_events_path = Some(path.into());
155 self
156 }
157
158 pub fn with_learning_store(mut self, path: impl AsRef<std::path::Path>) -> Self {
169 match LearningStore::new(path) {
170 Ok(store) => {
171 let scenario_key = self.scenario.meta.id.learning_key();
174 self.prior_snapshot = store.load_scenario(&scenario_key).ok();
175 self.offline_model = store.load_offline_model(&scenario_key).ok();
177 if let Some(ref model) = self.offline_model {
178 println!(
179 "Offline model loaded: ucb1_c={:.3}, strategy={}",
180 model.parameters.ucb1_c, model.strategy_config.initial_strategy
181 );
182 }
183 self.learning_store = Some(store);
184 }
185 Err(e) => {
186 eprintln!("Warning: Failed to create LearningStore: {}", e);
187 }
188 }
189 self
190 }
191
192 pub fn with_runs(mut self, runs: usize) -> Self {
193 self.runs = runs;
194 self
195 }
196
197 pub fn with_seed(mut self, seed: u64) -> Self {
198 self.seed = seed;
199 self
200 }
201
202 pub fn with_task(mut self, task: SwarmTask) -> Self {
204 self.task = Some(task);
205 self
206 }
207
208 pub fn with_manager_factory<F>(mut self, factory: F) -> Self
210 where
211 F: Fn() -> Box<dyn ManagerAgent> + Send + Sync + 'static,
212 {
213 self.manager_factory = Some(Box::new(factory));
214 self
215 }
216
217 pub fn with_batch_invoker_factory<F>(mut self, factory: F) -> Self
219 where
220 F: Fn() -> Box<dyn BatchInvoker> + Send + Sync + 'static,
221 {
222 self.batch_invoker_factory = Some(Box::new(factory));
223 self
224 }
225
226 pub fn with_extensions_factory<F>(mut self, factory: F) -> Self
228 where
229 F: Fn() -> Extensions + Send + Sync + 'static,
230 {
231 self.extensions_factory = Some(Box::new(factory));
232 self
233 }
234
235 pub fn with_operator_provider_factory<F>(mut self, factory: F) -> Self
253 where
254 F: Fn() -> Box<dyn OperatorProvider<NodeRules>> + Send + Sync + 'static,
255 {
256 self.operator_provider_factory = Some(Box::new(factory));
257 self
258 }
259
260 pub fn run(&self) -> Result<EvalReport> {
261 let mut eval_runs = Vec::with_capacity(self.runs);
262 let mut run_seeds = Vec::with_capacity(self.runs);
263
264 for i in 0..self.runs {
265 let run_seed = self.seed.wrapping_add(i as u64);
266 run_seeds.push(run_seed);
267
268 let result = self.run_single(i, run_seed)?;
269 eval_runs.push(result);
270 }
271
272 let aggregated = Aggregator::aggregate(&eval_runs);
273
274 if let Some(ref store) = self.learning_store {
276 let scenario_key = self.scenario.meta.id.learning_key();
277 match store.run_offline_learning(&scenario_key, 20) {
278 Ok(model) => {
279 tracing::info!(
280 scenario = %scenario_key,
281 sessions = model.analyzed_sessions,
282 ucb1_c = model.parameters.ucb1_c,
283 initial_strategy = %model.strategy_config.initial_strategy,
284 "Offline learning completed"
285 );
286 }
287 Err(e) => {
288 tracing::debug!("Offline learning skipped: {}", e);
289 }
290 }
291 }
292
293 Ok(EvalReport {
294 config_summary: ConfigSummary {
295 scenario_name: self.scenario.meta.name.clone(),
296 scenario_id: self.scenario.meta.id.to_string(),
297 worker_count: self.scenario.agents.workers.iter().map(|w| w.count).sum(),
298 max_ticks: self.scenario.app_config.max_ticks,
299 run_count: self.runs,
300 },
301 seed_info: SeedInfo {
302 base_seed: self.seed,
303 run_seeds,
304 },
305 runs: eval_runs,
306 aggregated,
307 assertion_results: vec![],
308 })
309 }
310
311 fn run_single(&self, index: usize, seed: u64) -> Result<EvalRun> {
312 let workers = self.build_workers();
313
314 let management_strategy = self.build_management_strategy();
316
317 let config = SwarmConfig {
318 tick_duration: Duration::from_millis(self.scenario.app_config.tick_duration_ms),
319 max_ticks: self.scenario.app_config.max_ticks,
320 management_strategy,
321 };
322
323 let mut builder = OrchestratorBuilder::new().config(config);
325
326 for worker in workers {
328 builder = builder.add_worker_boxed(worker);
329 }
330
331 if let Some(factory) = &self.manager_factory {
333 let manager = factory();
334 builder = builder.manager(DynManagerWrapper(manager));
335 } else {
336 let managers = self.build_managers();
338 for manager in managers {
339 builder = builder.manager(manager);
340 }
341 }
342
343 if let Some(factory) = &self.batch_invoker_factory {
345 let invoker = factory();
346 builder = builder.batch_invoker(DynBatchInvokerWrapper(invoker));
347 }
348
349 let extensions = self.build_extensions_from_scenario(seed);
351 builder = builder.extensions(extensions);
352
353 if self.enable_exploration || self.scenario.app_config.enable_exploration {
355 builder = builder.with_exploration();
356 }
357
358 if self.operator_provider_factory.is_none() {
360 if let Some(ref model) = self.offline_model {
361 builder = builder.with_offline_model(model.clone());
362 println!(
363 "Offline model applied: ucb1_c={:.3}, maturity={}, strategy={}",
364 model.parameters.ucb1_c,
365 model.strategy_config.maturity_threshold,
366 model.strategy_config.initial_strategy
367 );
368 }
369 }
370
371 if let Some(factory) = &self.operator_provider_factory {
373 let provider = factory();
374 builder = builder.operator_provider(DynOperatorProviderWrapper(provider));
375 }
376
377 let jsonl_handle = if let Some(base_path) = &self.action_events_path {
379 let path = if self.runs > 1 {
381 let stem = base_path.file_stem().unwrap_or_default().to_string_lossy();
382 let ext = base_path.extension().unwrap_or_default().to_string_lossy();
383 let parent = base_path.parent().unwrap_or(std::path::Path::new("."));
384 parent.join(format!("{}_run{}.{}", stem, index, ext))
385 } else {
386 base_path.clone()
387 };
388
389 let (publisher, _rx) = ActionEventPublisher::new(1024);
390 let jsonl_rx = publisher.subscribe();
391 let writer = JsonlWriter::new(jsonl_rx, path);
392
393 let handle = self.runtime.spawn(async move {
395 if let Err(e) = writer.run().await {
396 eprintln!("JsonlWriter error: {}", e);
397 }
398 });
399
400 builder = builder.action_collector(publisher);
401 Some(handle)
402 } else {
403 None
404 };
405
406 let mut orchestrator = builder.build(self.runtime.clone());
407
408 let manager_count = self
410 .scenario
411 .agents
412 .managers
413 .iter()
414 .map(|t| t.count)
415 .sum::<usize>();
416 if manager_count > 1 {
417 orchestrator.enable_partitioning();
418 }
419
420 let task_to_run = self
422 .task
423 .clone()
424 .or_else(|| self.build_task_from_scenario());
425
426 let result = if let Some(task) = task_to_run {
429 orchestrator.run_task(task)?
430 } else {
431 orchestrator.run()
432 };
433
434 let state = orchestrator.state();
435 let timed_out = result.total_ticks >= self.scenario.app_config.max_ticks;
436 let environment_done = state.shared.is_environment_done();
437 let total_actions = state.shared.stats.total_visits() as u64;
438 let successful_actions = state.shared.stats.total_successes() as u64;
439 let llm_invocations = state.shared.llm_invocations();
440 let llm_invoke_errors = state.shared.llm_errors();
441
442 let metrics = RunMetrics {
443 task: crate::metrics::TaskMetrics {
444 total_ticks: result.total_ticks,
445 total_tasks: 0,
446 completed_tasks: 0,
447 total_actions,
448 successful_actions,
449 success_rate: state.shared.stats.success_rate(),
450 },
451 coordination: crate::metrics::CoordinationMetrics {
452 manager_activations: llm_invocations,
454 manager_intervention_rate: if result.total_ticks > 0 {
455 llm_invocations as f64 / result.total_ticks as f64
456 } else {
457 0.0
458 },
459 ..Default::default()
460 },
461 performance: {
462 let llm_error_rate = if llm_invocations > 0 {
463 llm_invoke_errors as f64 / llm_invocations as f64
464 } else {
465 0.0
466 };
467 crate::metrics::PerformanceMetrics {
468 total_duration_ms: result.total_duration.as_millis() as f64,
469 avg_tick_latency_ms: if result.total_ticks > 0 {
470 result.total_duration.as_millis() as f64 / result.total_ticks as f64
471 } else {
472 0.0
473 },
474 raw_throughput_per_sec: if result.total_duration.as_secs_f64() > 0.0 {
475 total_actions as f64 / result.total_duration.as_secs_f64()
476 } else {
477 0.0
478 },
479 effective_throughput_per_sec: if result.total_duration.as_secs_f64() > 0.0 {
480 successful_actions as f64 / result.total_duration.as_secs_f64()
481 } else {
482 0.0
483 },
484 llm_invocations,
485 llm_invoke_errors,
486 llm_error_rate,
487 ..Default::default()
488 }
489 },
490 robustness: Default::default(),
491 };
492
493 let (success, termination_reason) = if !result.completed {
495 (false, TerminationReason::Stopped)
496 } else {
497 self.evaluate_conditions(&metrics, environment_done, timed_out)
498 };
499
500 if let Some(ref store) = self.learning_store {
502 let scenario_key = self.scenario.meta.id.learning_key();
505 let metadata = SnapshotMetadata::default()
506 .with_scenario(&scenario_key)
507 .with_task(&self.scenario.task.goal);
508
509 let learn_stats_opt = orchestrator.learned_provider().and_then(|p| p.stats());
511
512 let snapshot = if let Some(learn_stats) = learn_stats_opt {
513 LearningSnapshot {
514 version: swarm_engine_core::learn::SNAPSHOT_VERSION,
515 metadata,
516 episode_transitions: learn_stats.episode_transitions.clone(),
517 ngram_stats: learn_stats.ngram_stats.clone(),
518 selection_performance: learn_stats.selection_performance.clone(),
519 contextual_stats: learn_stats
520 .contextual_stats
521 .iter()
522 .map(|(k, v)| {
523 (
524 k.clone(),
525 swarm_engine_core::online_stats::ActionStats {
526 visits: v.visits,
527 successes: v.successes,
528 failures: v.failures,
529 ..Default::default()
530 },
531 )
532 })
533 .collect(),
534 action_stats: state
535 .shared
536 .stats
537 .all_action_stats()
538 .map(|(k, v)| (k.clone(), v.clone()))
539 .collect(),
540 }
541 } else {
542 LearningSnapshot {
544 version: swarm_engine_core::learn::SNAPSHOT_VERSION,
545 metadata,
546 episode_transitions: Default::default(),
547 ngram_stats: Default::default(),
548 selection_performance: Default::default(),
549 contextual_stats: Default::default(),
550 action_stats: state
551 .shared
552 .stats
553 .all_action_stats()
554 .map(|(k, v)| (k.clone(), v.clone()))
555 .collect(),
556 }
557 };
558
559 if let Err(e) = store.save_session(&scenario_key, &snapshot) {
560 eprintln!("Warning: Failed to save learning data: {}", e);
561 }
562 }
563
564 drop(orchestrator);
566
567 if let Some(handle) = jsonl_handle {
569 let _ = self.runtime.block_on(handle);
571 }
572
573 Ok(EvalRun::new(
574 index,
575 seed,
576 success,
577 termination_reason,
578 metrics,
579 ))
580 }
581
582 fn build_workers(&self) -> Vec<Box<dyn WorkerAgent>> {
583 let mut workers: Vec<Box<dyn WorkerAgent>> = Vec::new();
584
585 for template in &self.scenario.agents.workers {
586 for i in 0..template.count {
587 let id = workers.len();
588 let name = template.id_pattern.replace("{i}", &i.to_string());
589
590 let worker = GenericWorker::new(id)
591 .with_name(name)
592 .with_require_guidance(true);
593
594 workers.push(Box::new(worker));
595 }
596 }
597
598 workers
599 }
600
601 fn build_managers(&self) -> Vec<DefaultBatchManagerAgent> {
602 let mut managers = Vec::new();
603 let mut manager_index = 0;
604
605 for template in &self.scenario.agents.managers {
606 let ids = template.generate_ids();
607 for name in ids {
608 let manager = DefaultBatchManagerAgent::new(ManagerId(manager_index))
609 .with_name(name)
610 .with_interval(self.scenario.manager.process_interval_ticks);
611
612 managers.push(manager);
613 manager_index += 1;
614 }
615 }
616
617 if managers.is_empty() {
619 managers.push(
620 DefaultBatchManagerAgent::new(ManagerId(0))
621 .with_name("default_manager")
622 .with_interval(self.scenario.manager.process_interval_ticks),
623 );
624 }
625
626 managers
627 }
628
629 fn build_management_strategy(&self) -> ManagementStrategy {
630 match &self.scenario.app_config.management_strategy {
631 ManagementStrategyConfig::EveryTick {} => ManagementStrategy::EveryTick,
632 ManagementStrategyConfig::IntervalBased { max_interval } => {
633 ManagementStrategy::FixedInterval {
634 interval: *max_interval,
635 }
636 }
637 ManagementStrategyConfig::EventDriven { triggers: _ } => {
638 ManagementStrategy::CompletionBased { max_wait_ticks: 50 }
640 }
641 ManagementStrategyConfig::Hybrid {
642 max_interval,
643 triggers: _,
644 } => ManagementStrategy::Hybrid {
645 preferred_interval: *max_interval,
646 force_after_ticks: max_interval * 2,
647 },
648 ManagementStrategyConfig::Disabled {} => {
649 ManagementStrategy::FixedInterval { interval: u64::MAX }
651 }
652 }
653 }
654
655 fn build_task_from_scenario(&self) -> Option<SwarmTask> {
659 let task_config = &self.scenario.task;
660
661 if task_config.goal.is_empty() {
662 return None;
663 }
664
665 let mut context = serde_json::Map::new();
667
668 if let Some(target_path) = &task_config.context.target_path {
669 context.insert(
670 "target_path".to_string(),
671 serde_json::Value::String(target_path.clone()),
672 );
673 }
674 if let Some(working_dir) = &task_config.context.working_dir {
675 context.insert(
676 "working_dir".to_string(),
677 serde_json::Value::String(working_dir.clone()),
678 );
679 }
680 if let Some(max_depth) = task_config.context.max_depth {
681 context.insert(
682 "max_depth".to_string(),
683 serde_json::Value::Number(serde_json::Number::from(max_depth)),
684 );
685 }
686
687 for (key, value) in &task_config.context.extra {
689 if let Ok(json_value) = serde_json::to_value(value) {
690 context.insert(key.clone(), json_value);
691 }
692 }
693
694 let task =
695 SwarmTask::new(&task_config.goal).with_context(serde_json::Value::Object(context));
696
697 Some(task)
698 }
699
700 fn build_extensions_from_scenario(&self, seed: u64) -> Extensions {
702 let mut extensions = if let Some(factory) = &self.extensions_factory {
703 factory()
704 } else {
705 Extensions::new()
706 };
707
708 extensions.insert(EvalSeed(seed));
710
711 extensions.insert(self.scenario.llm.clone());
713
714 if let Some(ref lora) = self.scenario.llm.lora {
716 extensions.insert(lora.clone());
717 }
718
719 extensions.insert(self.scenario.manager.clone());
721
722 extensions.insert(self.scenario.batch_processor.clone());
724
725 let core_actions_config = self.scenario.actions.to_core_config();
727 extensions.insert(core_actions_config);
728
729 let env_type = self.scenario.environment.env_type.as_str();
731 let env_params = &self.scenario.environment.params;
732
733 let env_box: Option<EnvironmentBox> = match env_type {
734 "maze" => {
735 let map = env_params.get("map").and_then(|v| v.as_str()).unwrap_or("");
736 let worker_count = env_params
737 .get("worker_count")
738 .and_then(|v| v.as_u64())
739 .unwrap_or(1) as usize;
740 Some(Box::new(MazeEnvironment::from_str(map, worker_count)))
741 }
742 "code" => {
743 Some(Box::new(CodeEnvironment::auth_scenario()))
745 }
746 "troubleshooting" => {
747 let scenario_name = env_params
748 .get("scenario")
749 .and_then(|v| v.as_str())
750 .unwrap_or("memory_leak");
751 let env = match scenario_name {
752 "memory_leak" => TroubleshootingEnvironment::memory_leak_scenario(),
753 "cpu_spike" => TroubleshootingEnvironment::cpu_spike_scenario(),
754 "network_timeout" => TroubleshootingEnvironment::network_timeout_scenario(),
755 "medium" => TroubleshootingEnvironment::complex_scenario(15, 3, 2, seed),
756 "high" => TroubleshootingEnvironment::complex_scenario(30, 8, 3, seed),
757 "extreme" => TroubleshootingEnvironment::complex_scenario(50, 15, 4, seed),
758 "complex" => {
759 let total_services = env_params
760 .get("total_services")
761 .and_then(|v| v.as_u64())
762 .unwrap_or(15) as usize;
763 let noise_services = env_params
764 .get("noise_services")
765 .and_then(|v| v.as_u64())
766 .unwrap_or(3) as usize;
767 let cascade_depth = env_params
768 .get("cascade_depth")
769 .and_then(|v| v.as_u64())
770 .unwrap_or(2) as usize;
771 TroubleshootingEnvironment::complex_scenario(
772 total_services,
773 noise_services,
774 cascade_depth,
775 seed,
776 )
777 }
778 _ => TroubleshootingEnvironment::memory_leak_scenario(),
779 };
780 Some(Box::new(env))
781 }
782 "search" => {
783 let scenario_name = env_params
784 .get("scenario")
785 .and_then(|v| v.as_str())
786 .unwrap_or("basic");
787 let env = match scenario_name {
788 "basic" => SearchEnvironment::basic_scenario(),
789 "medium" => SearchEnvironment::medium_scenario(),
790 "large" => SearchEnvironment::large_scenario(),
791 "custom" => {
792 let file_count = env_params
793 .get("file_count")
794 .and_then(|v| v.as_u64())
795 .unwrap_or(5) as usize;
796 let target_index = env_params
797 .get("target_index")
798 .and_then(|v| v.as_u64())
799 .unwrap_or(2) as usize;
800 SearchEnvironment::custom_scenario(file_count, target_index, seed)
801 }
802 _ => SearchEnvironment::basic_scenario(),
803 };
804 Some(Box::new(env))
805 }
806 "internal_diagnosis" => {
807 let scenario_name = env_params
808 .get("scenario")
809 .and_then(|v| v.as_str())
810 .unwrap_or("routing");
811 let env = match scenario_name {
812 "routing" => InternalDiagnosisEnvironment::routing_error_scenario(),
813 "failover" => InternalDiagnosisEnvironment::failover_error_scenario(),
814 "worker_pool" => InternalDiagnosisEnvironment::worker_pool_scenario(),
815 "strategy" => InternalDiagnosisEnvironment::strategy_mismatch_scenario(),
816 "exploration" => InternalDiagnosisEnvironment::exploration_depth_scenario(),
817 "complex" => InternalDiagnosisEnvironment::complex_scenario(seed),
818 _ => InternalDiagnosisEnvironment::routing_error_scenario(),
819 };
820 Some(Box::new(env))
821 }
822 _ => None, };
824
825 if let Some(env) = env_box {
826 extensions.insert(env);
827 }
828
829 let graph = self.dependency_graph.clone().or_else(|| {
831 self.scenario.dependency_graph.as_ref().and_then(|cfg| {
832 let action_names = self.scenario.actions.action_names();
833 cfg.to_core_graph(&action_names)
834 })
835 });
836 if let Some(g) = graph {
837 extensions.insert(g);
838 }
839
840 if let Some(ref prior) = self.prior_snapshot {
843 extensions.insert(prior.clone());
844 }
845
846 extensions
847 }
848
849 fn evaluate_conditions(
853 &self,
854 metrics: &RunMetrics,
855 environment_done: bool,
856 timed_out: bool,
857 ) -> (bool, TerminationReason) {
858 let conditions = &self.scenario.conditions;
859
860 for condition in &conditions.failure {
862 if let Some(actual) =
863 self.get_metric_value(&condition.metric, metrics, environment_done)
864 {
865 if condition.evaluate(&actual) {
866 return (false, TerminationReason::Failure);
867 }
868 }
869 }
870
871 if timed_out {
873 return match conditions.on_timeout {
874 TimeoutBehavior::Fail => (false, TerminationReason::Timeout),
875 TimeoutBehavior::PartialSuccess => {
876 let success = self.check_success_conditions(metrics, environment_done);
878 (success, TerminationReason::Timeout)
879 }
880 TimeoutBehavior::MilestoneScore => {
881 (false, TerminationReason::Timeout)
883 }
884 };
885 }
886
887 let success = self.check_success_conditions(metrics, environment_done);
889 if success {
890 (true, TerminationReason::Success)
891 } else {
892 (false, TerminationReason::Stopped)
895 }
896 }
897
898 fn check_success_conditions(&self, metrics: &RunMetrics, environment_done: bool) -> bool {
900 let conditions = &self.scenario.conditions;
901
902 if conditions.success.is_empty() {
904 return true;
905 }
906
907 conditions.success.iter().all(|condition| {
909 self.get_metric_value(&condition.metric, metrics, environment_done)
910 .map(|actual| condition.evaluate(&actual))
911 .unwrap_or(false)
912 })
913 }
914
915 fn get_metric_value(
917 &self,
918 path: &str,
919 metrics: &RunMetrics,
920 environment_done: bool,
921 ) -> Option<ConditionValue> {
922 match path {
923 "environment.done" => Some(ConditionValue::Bool(environment_done)),
925
926 "task.total_ticks" | "total_ticks" => {
928 Some(ConditionValue::Integer(metrics.task.total_ticks as i64))
929 }
930 "task.success_rate" | "success_rate" => {
931 Some(ConditionValue::Float(metrics.task.success_rate))
932 }
933 "task.total_actions" | "total_actions" => {
934 Some(ConditionValue::Integer(metrics.task.total_actions as i64))
935 }
936 "task.successful_actions" | "successful_actions" => Some(ConditionValue::Integer(
937 metrics.task.successful_actions as i64,
938 )),
939
940 "performance.llm_error_rate" | "llm_error_rate" => {
942 Some(ConditionValue::Float(metrics.performance.llm_error_rate))
943 }
944 "performance.llm_invocations" | "llm_invocations" => Some(ConditionValue::Integer(
945 metrics.performance.llm_invocations as i64,
946 )),
947
948 "coordination.manager_activations" | "manager_activations" => Some(
950 ConditionValue::Integer(metrics.coordination.manager_activations as i64),
951 ),
952
953 "errors.count" => {
955 let failed = metrics
956 .task
957 .total_actions
958 .saturating_sub(metrics.task.successful_actions);
959 Some(ConditionValue::Integer(failed as i64))
960 }
961
962 _ => None,
964 }
965 }
966}
967
968struct DynManagerWrapper(Box<dyn ManagerAgent>);
970
971impl ManagerAgent for DynManagerWrapper {
972 fn prepare(
973 &self,
974 context: &swarm_engine_core::agent::TaskContext,
975 ) -> swarm_engine_core::agent::BatchDecisionRequest {
976 self.0.prepare(context)
977 }
978
979 fn finalize(
980 &self,
981 context: &swarm_engine_core::agent::TaskContext,
982 responses: Vec<(
983 swarm_engine_core::types::WorkerId,
984 swarm_engine_core::agent::DecisionResponse,
985 )>,
986 ) -> swarm_engine_core::agent::ManagementDecision {
987 self.0.finalize(context, responses)
988 }
989
990 fn id(&self) -> swarm_engine_core::agent::ManagerId {
991 self.0.id()
992 }
993
994 fn name(&self) -> &str {
995 self.0.name()
996 }
997}
998
999struct DynBatchInvokerWrapper(Box<dyn BatchInvoker>);
1001
1002impl BatchInvoker for DynBatchInvokerWrapper {
1003 fn invoke(
1004 &self,
1005 request: swarm_engine_core::agent::BatchDecisionRequest,
1006 extensions: &swarm_engine_core::extensions::Extensions,
1007 ) -> swarm_engine_core::agent::BatchInvokeResult {
1008 self.0.invoke(request, extensions)
1009 }
1010
1011 fn plan_dependencies(
1012 &self,
1013 task: &str,
1014 actions: &[ActionDef],
1015 ) -> Option<swarm_engine_core::exploration::DependencyGraph> {
1016 self.0.plan_dependencies(task, actions)
1017 }
1018
1019 fn name(&self) -> &str {
1020 self.0.name()
1021 }
1022}
1023
1024struct DynOperatorProviderWrapper(Box<dyn OperatorProvider<NodeRules>>);
1026
1027impl OperatorProvider<NodeRules> for DynOperatorProviderWrapper {
1028 fn provide(
1029 &self,
1030 rules: NodeRules,
1031 context: Option<
1032 &swarm_engine_core::exploration::ProviderContext<
1033 '_,
1034 swarm_engine_core::exploration::ActionNodeData,
1035 String,
1036 swarm_engine_core::exploration::MapNodeState,
1037 >,
1038 >,
1039 ) -> swarm_engine_core::exploration::ConfigurableOperator<NodeRules> {
1040 self.0.provide(rules, context)
1041 }
1042
1043 fn reevaluate(
1044 &self,
1045 operator: &mut swarm_engine_core::exploration::ConfigurableOperator<NodeRules>,
1046 ctx: &swarm_engine_core::exploration::ProviderContext<
1047 '_,
1048 swarm_engine_core::exploration::ActionNodeData,
1049 String,
1050 swarm_engine_core::exploration::MapNodeState,
1051 >,
1052 ) {
1053 self.0.reevaluate(operator, ctx)
1054 }
1055
1056 fn name(&self) -> &str {
1057 self.0.name()
1058 }
1059}