1use std::time::Duration;
4
5use tokio::runtime::Handle;
6
7use std::path::PathBuf;
8
9use swarm_engine_core::actions::ActionDef;
10use swarm_engine_core::agent::{
11 BatchInvoker, DefaultBatchManagerAgent, GenericWorker, ManagementStrategy, ManagerAgent,
12 ManagerId, WorkerAgent,
13};
14use swarm_engine_core::environment::EnvironmentBox;
15use swarm_engine_core::events::{
16 ActionEventPublisher, JsonlWriter, LearningEventChannel, LearningLifecycleHook,
17};
18use swarm_engine_core::exploration::{LearnedDependencyProvider, NodeRules, OperatorProvider};
19use swarm_engine_core::extensions::Extensions;
20use swarm_engine_core::learn::{
21 CountTrigger, LearningSnapshot, LearningStore, OfflineModel, SnapshotMetadata, TrainTrigger,
22};
23use swarm_engine_core::orchestrator::{OrchestratorBuilder, SwarmConfig};
24use swarm_engine_core::types::{GroupId, SwarmTask};
25
26use crate::environments::{
27 CodeEnvironment, DeepSearchEnvironment, InternalDiagnosisEnvironment, MazeEnvironment,
28 SearchEnvironment, TroubleshootingEnvironment,
29};
30
31use crate::aggregator::Aggregator;
32use crate::error::Result;
33use crate::metrics::RunMetrics;
34use crate::reporter::{ConfigSummary, EvalReport, SeedInfo};
35use crate::run::{EvalRun, TerminationReason};
36use crate::scenario::conditions::{ConditionValue, TimeoutBehavior};
37use crate::scenario::{EvalScenario, ManagementStrategyConfig};
38
/// Seed of a single evaluation run.
///
/// `EvalRunner` inserts one of these into each run's `Extensions`, so that
/// components built for the run can look up the seed in use. Equality and
/// hashing are derived so seeds can be compared or used as map/set keys.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub struct EvalSeed(pub u64);
44
45pub type ManagerFactory = Box<dyn Fn() -> Box<dyn ManagerAgent> + Send + Sync>;
47
48pub type BatchInvokerFactory = Box<dyn Fn() -> Box<dyn BatchInvoker> + Send + Sync>;
50
51pub type OperatorProviderFactory =
53 Box<dyn Fn() -> Box<dyn OperatorProvider<NodeRules>> + Send + Sync>;
54
/// Runs an `EvalScenario` one or more times and aggregates the results.
///
/// Configure it with `EvalRunner::new` plus the `with_*` builder methods, then
/// execute it with `EvalRunner::run`.
pub struct EvalRunner {
    /// Scenario every run is built from.
    scenario: EvalScenario,
    /// Tokio runtime handle. It is passed to the orchestrator builder and used to
    /// spawn and then block on the JSONL writer task.
    runtime: Handle,
    /// Number of runs to execute (default 1).
    runs: usize,
    /// Base seed; run `i` uses `seed.wrapping_add(i)` (default 42).
    seed: u64,
    /// Explicit task; when `None`, a task is built from the scenario's task config.
    task: Option<SwarmTask>,
    /// When set, replaces the scenario-defined managers with one manager per run.
    manager_factory: Option<ManagerFactory>,
    /// Optional per-run batch invoker factory.
    batch_invoker_factory: Option<BatchInvokerFactory>,
    /// Optional per-run base `Extensions`; scenario-derived entries are inserted into it.
    extensions_factory: Option<Box<dyn Fn() -> Extensions + Send + Sync>>,
    /// Optional per-run operator provider. When set, the offline model is not
    /// applied through `with_offline_model`.
    operator_provider_factory: Option<OperatorProviderFactory>,
    /// Verbose flag. NOTE(review): not read anywhere in this file; confirm whether it is used.
    verbose: bool,
    /// Enables exploration. It is OR-ed with the scenario's `app_config.enable_exploration`.
    enable_exploration: bool,
    /// Explicit dependency graph; it takes precedence over the scenario's graph config.
    dependency_graph: Option<swarm_engine_core::exploration::DependencyGraph>,
    /// Where action events are written as JSONL (per-run files when `runs > 1`).
    action_events_path: Option<PathBuf>,
    /// Learning store. Its presence enables snapshot/episode persistence and the learning hook.
    learning_store: Option<LearningStore>,
    /// Snapshot loaded from the learning store; inserted into every run's extensions.
    prior_snapshot: Option<LearningSnapshot>,
    /// Offline model loaded from the learning store for this scenario.
    offline_model: Option<OfflineModel>,
    /// Evaluation counter shared by the learning hooks of all runs.
    eval_count: std::sync::Arc<std::sync::atomic::AtomicUsize>,
    /// Custom training trigger; defaults to `CountTrigger::new(runs)` when absent.
    train_trigger: Option<std::sync::Arc<dyn TrainTrigger>>,
}
103
104impl EvalRunner {
105 pub fn new(scenario: EvalScenario, runtime: Handle) -> Self {
106 Self {
107 scenario,
108 runtime,
109 runs: 1,
110 seed: 42,
111 task: None,
112 manager_factory: None,
113 batch_invoker_factory: None,
114 extensions_factory: None,
115 operator_provider_factory: None,
116 verbose: false,
117 enable_exploration: false,
118 dependency_graph: None,
119 action_events_path: None,
120 learning_store: None,
121 prior_snapshot: None,
122 offline_model: None,
123 eval_count: std::sync::Arc::new(std::sync::atomic::AtomicUsize::new(0)),
124 train_trigger: None,
125 }
126 }
127
128 pub fn with_verbose(mut self, verbose: bool) -> Self {
130 self.verbose = verbose;
131 self
132 }
133
134 pub fn with_exploration(mut self, enable: bool) -> Self {
136 self.enable_exploration = enable;
137 self
138 }
139
140 pub fn with_dependency_graph(
145 mut self,
146 graph: swarm_engine_core::exploration::DependencyGraph,
147 ) -> Self {
148 self.dependency_graph = Some(graph);
149 self
150 }
151
152 pub fn with_action_events_path(mut self, path: impl Into<PathBuf>) -> Self {
164 self.action_events_path = Some(path.into());
165 self
166 }
167
168 pub fn with_learning_store(mut self, path: impl AsRef<std::path::Path>) -> Self {
179 match LearningStore::new(path) {
180 Ok(store) => {
181 let scenario_key = self.scenario.meta.id.learning_key();
184 self.prior_snapshot = store.load_scenario(&scenario_key).ok();
185 self.offline_model = store.load_offline_model(&scenario_key).ok();
187 if let Some(ref model) = self.offline_model {
188 tracing::debug!(
189 ucb1_c = model.parameters.ucb1_c,
190 strategy = %model.strategy_config.initial_strategy,
191 action_order = model.action_order.is_some(),
192 "Offline model loaded"
193 );
194 println!(
195 "Offline model loaded: ucb1_c={:.3}, strategy={}",
196 model.parameters.ucb1_c, model.strategy_config.initial_strategy
197 );
198 }
199 self.learning_store = Some(store);
200
201 LearningEventChannel::global().enable();
203 }
204 Err(e) => {
205 eprintln!("Warning: Failed to create LearningStore: {}", e);
206 }
207 }
208 self
209 }
210
211 pub fn with_train_trigger(mut self, trigger: std::sync::Arc<dyn TrainTrigger>) -> Self {
225 self.train_trigger = Some(trigger);
226 self
227 }
228
229 pub fn with_runs(mut self, runs: usize) -> Self {
230 self.runs = runs;
231 self
232 }
233
234 pub fn with_seed(mut self, seed: u64) -> Self {
235 self.seed = seed;
236 self
237 }
238
239 pub fn with_task(mut self, task: SwarmTask) -> Self {
241 self.task = Some(task);
242 self
243 }
244
245 pub fn with_manager_factory<F>(mut self, factory: F) -> Self
247 where
248 F: Fn() -> Box<dyn ManagerAgent> + Send + Sync + 'static,
249 {
250 self.manager_factory = Some(Box::new(factory));
251 self
252 }
253
254 pub fn with_batch_invoker_factory<F>(mut self, factory: F) -> Self
256 where
257 F: Fn() -> Box<dyn BatchInvoker> + Send + Sync + 'static,
258 {
259 self.batch_invoker_factory = Some(Box::new(factory));
260 self
261 }
262
263 pub fn with_extensions_factory<F>(mut self, factory: F) -> Self
265 where
266 F: Fn() -> Extensions + Send + Sync + 'static,
267 {
268 self.extensions_factory = Some(Box::new(factory));
269 self
270 }
271
272 pub fn with_operator_provider_factory<F>(mut self, factory: F) -> Self
290 where
291 F: Fn() -> Box<dyn OperatorProvider<NodeRules>> + Send + Sync + 'static,
292 {
293 self.operator_provider_factory = Some(Box::new(factory));
294 self
295 }
296
297 pub fn run(&self) -> Result<EvalReport> {
298 let mut eval_runs = Vec::with_capacity(self.runs);
299 let mut run_seeds = Vec::with_capacity(self.runs);
300 let mut first_dependency_graph = None;
301
302 let group_id = GroupId::new();
305
306 for i in 0..self.runs {
307 let run_seed = self.seed.wrapping_add(i as u64);
308 run_seeds.push(run_seed);
309
310 let (result, dep_graph) = self.run_single(i, run_seed, group_id)?;
311 eval_runs.push(result);
312
313 if i == 0 {
315 first_dependency_graph = dep_graph;
316 }
317 }
318
319 let aggregated = Aggregator::aggregate(&eval_runs);
320
321 if let (Some(ref store), Some(ref graph)) = (&self.learning_store, &first_dependency_graph)
327 {
328 if graph.has_action_order() {
329 let scenario_key = self.scenario.meta.id.learning_key();
330
331 if let Ok(mut model) = store.load_offline_model(&scenario_key) {
333 use swarm_engine_core::learn::LearnedActionOrder;
334
335 let action_order = LearnedActionOrder::new(
336 graph.discover_order().to_vec(),
337 graph.not_discover_order().to_vec(),
338 graph.available_actions(),
339 );
340 model.action_order = Some(action_order.clone());
341
342 if let Err(e) = store.save_offline_model(&scenario_key, &model) {
343 tracing::warn!("Failed to save action_order: {}", e);
344 } else {
345 println!(
346 "Action order saved: discover={:?}, not_discover={:?}",
347 action_order.discover, action_order.not_discover
348 );
349 }
350 }
351 }
352 }
353
354 Ok(EvalReport {
355 config_summary: ConfigSummary {
356 scenario_name: self.scenario.meta.name.clone(),
357 scenario_id: self.scenario.meta.id.to_string(),
358 worker_count: self.scenario.agents.workers.iter().map(|w| w.count).sum(),
359 max_ticks: self.scenario.app_config.max_ticks,
360 run_count: self.runs,
361 },
362 seed_info: SeedInfo {
363 base_seed: self.seed,
364 run_seeds,
365 },
366 runs: eval_runs,
367 aggregated,
368 assertion_results: vec![],
369 })
370 }
371
372 fn run_single(
380 &self,
381 index: usize,
382 seed: u64,
383 group_id: GroupId,
384 ) -> Result<(
385 EvalRun,
386 Option<swarm_engine_core::exploration::DependencyGraph>,
387 )> {
388 let workers = self.build_workers();
389
390 let management_strategy = self.build_management_strategy();
392
393 let config = SwarmConfig {
394 tick_duration: Duration::from_millis(self.scenario.app_config.tick_duration_ms),
395 max_ticks: self.scenario.app_config.max_ticks,
396 management_strategy,
397 };
398
399 let mut builder = OrchestratorBuilder::new().config(config);
401
402 for worker in workers {
404 builder = builder.add_worker_boxed(worker);
405 }
406
407 if let Some(factory) = &self.manager_factory {
409 let manager = factory();
410 builder = builder.manager(DynManagerWrapper(manager));
411 } else {
412 let managers = self.build_managers();
414 for manager in managers {
415 builder = builder.manager(manager);
416 }
417 }
418
419 if let Some(factory) = &self.batch_invoker_factory {
421 let invoker = factory();
422 builder = builder.batch_invoker(DynBatchInvokerWrapper(invoker));
423 }
424
425 let extensions = self.build_extensions_from_scenario(seed);
427 builder = builder.extensions(extensions);
428
429 if self.enable_exploration || self.scenario.app_config.enable_exploration {
431 builder = builder.with_exploration();
432 }
433
434 if self.operator_provider_factory.is_none() {
436 if let Some(ref model) = self.offline_model {
437 builder = builder.with_offline_model(model.clone());
438 println!(
439 "Offline model applied: ucb1_c={:.3}, maturity={}, strategy={}",
440 model.parameters.ucb1_c,
441 model.strategy_config.maturity_threshold,
442 model.strategy_config.initial_strategy
443 );
444 }
445 }
446
447 if let Some(ref model) = self.offline_model {
450 if let Some(ref action_order) = model.action_order {
451 let provider = LearnedDependencyProvider::new(action_order.clone());
452 builder = builder.dependency_provider(provider);
453 println!(
454 "Learned action order applied: discover={:?}, not_discover={:?}",
455 action_order.discover, action_order.not_discover
456 );
457 }
458 }
459
460 if let Some(factory) = &self.operator_provider_factory {
462 let provider = factory();
463 builder = builder.operator_provider(DynOperatorProviderWrapper(provider));
464 }
465
466 let jsonl_handle = if let Some(base_path) = &self.action_events_path {
468 let path = if self.runs > 1 {
470 let stem = base_path.file_stem().unwrap_or_default().to_string_lossy();
471 let ext = base_path.extension().unwrap_or_default().to_string_lossy();
472 let parent = base_path.parent().unwrap_or(std::path::Path::new("."));
473 parent.join(format!("{}_run{}.{}", stem, index, ext))
474 } else {
475 base_path.clone()
476 };
477
478 let (publisher, _rx) = ActionEventPublisher::new(1024);
479 let jsonl_rx = publisher.subscribe();
480 let writer = JsonlWriter::new(jsonl_rx, path);
481
482 let handle = self.runtime.spawn(async move {
484 if let Err(e) = writer.run().await {
485 eprintln!("JsonlWriter error: {}", e);
486 }
487 });
488
489 builder = builder.action_collector(publisher);
490 Some(handle)
491 } else {
492 None
493 };
494
495 if let Some(ref store) = self.learning_store {
497 let scenario_key = self.scenario.meta.id.learning_key();
498 let mut hook = LearningLifecycleHook::new(store.storage().base_dir())
499 .with_scenario(scenario_key)
500 .with_shared_eval_count(std::sync::Arc::clone(&self.eval_count));
501
502 if let Some(ref trigger) = self.train_trigger {
505 hook = hook.with_trigger(std::sync::Arc::clone(trigger));
506 } else {
507 hook = hook.with_trigger(std::sync::Arc::new(CountTrigger::new(self.runs)));
509 }
510
511 builder = builder.lifecycle_hook(Box::new(hook));
512 }
513
514 let mut orchestrator = builder.build(self.runtime.clone());
515
516 let manager_count = self
518 .scenario
519 .agents
520 .managers
521 .iter()
522 .map(|t| t.count)
523 .sum::<usize>();
524 if manager_count > 1 {
525 orchestrator.enable_partitioning();
526 }
527
528 let task_to_run = self
531 .task
532 .clone()
533 .or_else(|| self.build_task_from_scenario())
534 .map(|task| task.with_group_id(group_id));
535
536 let result = if let Some(task) = task_to_run {
539 orchestrator.run_task(task)?
540 } else {
541 orchestrator.run()
542 };
543
544 let state = orchestrator.state();
545 let timed_out = result.total_ticks >= self.scenario.app_config.max_ticks;
546 let environment_done = state.shared.is_environment_done();
547 let total_actions = state.shared.stats.total_visits() as u64;
548 let successful_actions = state.shared.stats.total_successes() as u64;
549 let llm_invocations = state.shared.llm_invocations();
550 let llm_invoke_errors = state.shared.llm_errors();
551
552 let metrics = RunMetrics {
553 task: crate::metrics::TaskMetrics {
554 total_ticks: result.total_ticks,
555 total_tasks: 0,
556 completed_tasks: 0,
557 total_actions,
558 successful_actions,
559 success_rate: state.shared.stats.success_rate(),
560 },
561 coordination: crate::metrics::CoordinationMetrics {
562 manager_activations: llm_invocations,
564 manager_intervention_rate: if result.total_ticks > 0 {
565 llm_invocations as f64 / result.total_ticks as f64
566 } else {
567 0.0
568 },
569 ..Default::default()
570 },
571 performance: {
572 let llm_error_rate = if llm_invocations > 0 {
573 llm_invoke_errors as f64 / llm_invocations as f64
574 } else {
575 0.0
576 };
577 crate::metrics::PerformanceMetrics {
578 total_duration_ms: result.total_duration.as_millis() as f64,
579 avg_tick_latency_ms: if result.total_ticks > 0 {
580 result.total_duration.as_millis() as f64 / result.total_ticks as f64
581 } else {
582 0.0
583 },
584 raw_throughput_per_sec: if result.total_duration.as_secs_f64() > 0.0 {
585 total_actions as f64 / result.total_duration.as_secs_f64()
586 } else {
587 0.0
588 },
589 effective_throughput_per_sec: if result.total_duration.as_secs_f64() > 0.0 {
590 successful_actions as f64 / result.total_duration.as_secs_f64()
591 } else {
592 0.0
593 },
594 llm_invocations,
595 llm_invoke_errors,
596 llm_error_rate,
597 ..Default::default()
598 }
599 },
600 robustness: Default::default(),
601 };
602
603 let (success, termination_reason) = if !result.completed {
605 (false, TerminationReason::Stopped)
606 } else {
607 self.evaluate_conditions(&metrics, environment_done, timed_out)
608 };
609
610 if let Some(ref store) = self.learning_store {
612 let scenario_key = self.scenario.meta.id.learning_key();
615 let metadata = SnapshotMetadata::default()
616 .with_scenario(&scenario_key)
617 .with_task(&self.scenario.task.goal);
618
619 let learn_stats_opt = orchestrator.learned_provider().and_then(|p| p.stats());
621
622 let snapshot = if let Some(learn_stats) = learn_stats_opt {
623 LearningSnapshot {
624 version: swarm_engine_core::learn::SNAPSHOT_VERSION,
625 metadata,
626 episode_transitions: learn_stats.episode_transitions.clone(),
627 ngram_stats: learn_stats.ngram_stats.clone(),
628 selection_performance: learn_stats.selection_performance.clone(),
629 contextual_stats: learn_stats
630 .contextual_stats
631 .iter()
632 .map(|(k, v)| {
633 (
634 k.clone(),
635 swarm_engine_core::online_stats::ActionStats {
636 visits: v.visits,
637 successes: v.successes,
638 failures: v.failures,
639 ..Default::default()
640 },
641 )
642 })
643 .collect(),
644 action_stats: state
645 .shared
646 .stats
647 .all_action_stats()
648 .map(|(k, v)| (k.clone(), v.clone()))
649 .collect(),
650 }
651 } else {
652 LearningSnapshot {
654 version: swarm_engine_core::learn::SNAPSHOT_VERSION,
655 metadata,
656 episode_transitions: Default::default(),
657 ngram_stats: Default::default(),
658 selection_performance: Default::default(),
659 contextual_stats: Default::default(),
660 action_stats: state
661 .shared
662 .stats
663 .all_action_stats()
664 .map(|(k, v)| (k.clone(), v.clone()))
665 .collect(),
666 }
667 };
668
669 if let Err(e) = store.save_session(&scenario_key, &snapshot) {
670 eprintln!("Warning: Failed to save learning data: {}", e);
671 }
672 }
673
674 let dependency_graph = orchestrator.dependency_graph().cloned();
676 let learn_record = dependency_graph
677 .as_ref()
678 .and_then(|g| g.learn_record().cloned());
679
680 if let (Some(ref store), Some(record)) = (&self.learning_store, learn_record) {
682 use std::fs::OpenOptions;
683 use std::io::Write;
684 use swarm_engine_core::learn::{Episode, EpisodeContext, Outcome};
685
686 let scenario_key = self.scenario.meta.id.learning_key();
687 let ticks = result.total_ticks as u32;
688 let max_ticks = self.scenario.app_config.max_ticks as u32;
689
690 let outcome = if environment_done {
695 let score = 1.0 - (ticks as f64 / max_ticks as f64).min(1.0);
698 Outcome::success(score.max(0.01)) } else if timed_out {
700 let partial_score = (ticks as f64 / max_ticks as f64).min(0.99);
702 Outcome::timeout(Some(partial_score))
703 } else {
704 Outcome::failure(format!("Task not completed at tick {}", ticks))
706 };
707 let is_success = outcome.is_success();
708
709 let context = EpisodeContext::new().with_record(record);
711 let episode = Episode::builder()
712 .learn_model("dependency_graph")
713 .context(context)
714 .outcome(outcome)
715 .scenario(&scenario_key)
716 .build();
717
718 let path = store.storage().base_dir().join(format!(
720 "scenarios/{}/dep_graph_episodes.jsonl",
721 scenario_key
722 ));
723
724 if let Some(parent) = path.parent() {
725 let _ = std::fs::create_dir_all(parent);
726 }
727
728 if let Ok(mut file) = OpenOptions::new().create(true).append(true).open(&path) {
729 if let Ok(json) = serde_json::to_string(&episode) {
730 let _ = writeln!(file, "{}", json);
731 tracing::debug!(
732 path = %path.display(),
733 success = is_success,
734 "Saved Episode with DependencyGraphRecord"
735 );
736 }
737 }
738 }
739
740 drop(orchestrator);
742
743 if let Some(handle) = jsonl_handle {
745 let _ = self.runtime.block_on(handle);
747 }
748
749 Ok((
750 EvalRun::new(index, seed, success, termination_reason, metrics),
751 dependency_graph,
752 ))
753 }
754
755 fn build_workers(&self) -> Vec<Box<dyn WorkerAgent>> {
756 let mut workers: Vec<Box<dyn WorkerAgent>> = Vec::new();
757
758 for template in &self.scenario.agents.workers {
759 for i in 0..template.count {
760 let id = workers.len();
761 let name = template.id_pattern.replace("{i}", &i.to_string());
762
763 let worker = GenericWorker::new(id)
764 .with_name(name)
765 .with_require_guidance(true);
766
767 workers.push(Box::new(worker));
768 }
769 }
770
771 workers
772 }
773
774 fn build_managers(&self) -> Vec<DefaultBatchManagerAgent> {
775 let mut managers = Vec::new();
776 let mut manager_index = 0;
777
778 for template in &self.scenario.agents.managers {
779 let ids = template.generate_ids();
780 for name in ids {
781 let manager = DefaultBatchManagerAgent::new(ManagerId(manager_index))
782 .with_name(name)
783 .with_interval(self.scenario.manager.process_interval_ticks);
784
785 managers.push(manager);
786 manager_index += 1;
787 }
788 }
789
790 if managers.is_empty() {
792 managers.push(
793 DefaultBatchManagerAgent::new(ManagerId(0))
794 .with_name("default_manager")
795 .with_interval(self.scenario.manager.process_interval_ticks),
796 );
797 }
798
799 managers
800 }
801
802 fn build_management_strategy(&self) -> ManagementStrategy {
803 match &self.scenario.app_config.management_strategy {
804 ManagementStrategyConfig::EveryTick {} => ManagementStrategy::EveryTick,
805 ManagementStrategyConfig::IntervalBased { max_interval } => {
806 ManagementStrategy::FixedInterval {
807 interval: *max_interval,
808 }
809 }
810 ManagementStrategyConfig::EventDriven { triggers: _ } => {
811 ManagementStrategy::CompletionBased { max_wait_ticks: 50 }
813 }
814 ManagementStrategyConfig::Hybrid {
815 max_interval,
816 triggers: _,
817 } => ManagementStrategy::Hybrid {
818 preferred_interval: *max_interval,
819 force_after_ticks: max_interval * 2,
820 },
821 ManagementStrategyConfig::Disabled {} => {
822 ManagementStrategy::FixedInterval { interval: u64::MAX }
824 }
825 }
826 }
827
828 fn build_task_from_scenario(&self) -> Option<SwarmTask> {
832 let task_config = &self.scenario.task;
833
834 if task_config.goal.is_empty() {
835 return None;
836 }
837
838 let mut context = serde_json::Map::new();
840
841 if let Some(target_path) = &task_config.context.target_path {
842 context.insert(
843 "target_path".to_string(),
844 serde_json::Value::String(target_path.clone()),
845 );
846 }
847 if let Some(working_dir) = &task_config.context.working_dir {
848 context.insert(
849 "working_dir".to_string(),
850 serde_json::Value::String(working_dir.clone()),
851 );
852 }
853 if let Some(max_depth) = task_config.context.max_depth {
854 context.insert(
855 "max_depth".to_string(),
856 serde_json::Value::Number(serde_json::Number::from(max_depth)),
857 );
858 }
859
860 for (key, value) in &task_config.context.extra {
862 if let Ok(json_value) = serde_json::to_value(value) {
863 context.insert(key.clone(), json_value);
864 }
865 }
866
867 let task =
868 SwarmTask::new(&task_config.goal).with_context(serde_json::Value::Object(context));
869
870 Some(task)
871 }
872
873 fn build_extensions_from_scenario(&self, seed: u64) -> Extensions {
875 let mut extensions = if let Some(factory) = &self.extensions_factory {
876 factory()
877 } else {
878 Extensions::new()
879 };
880
881 extensions.insert(EvalSeed(seed));
883
884 extensions.insert(self.scenario.llm.clone());
886
887 if let Some(ref lora) = self.scenario.llm.lora {
889 extensions.insert(lora.clone());
890 }
891
892 extensions.insert(self.scenario.manager.clone());
894
895 extensions.insert(self.scenario.batch_processor.clone());
897
898 let core_actions_config = self.scenario.actions.to_core_config();
900 extensions.insert(core_actions_config);
901
902 let env_type = self.scenario.environment.env_type.as_str();
904 let env_params = &self.scenario.environment.params;
905
906 let env_box: Option<EnvironmentBox> = match env_type {
907 "maze" => {
908 let map = env_params.get("map").and_then(|v| v.as_str()).unwrap_or("");
909 let worker_count = env_params
910 .get("worker_count")
911 .and_then(|v| v.as_u64())
912 .unwrap_or(1) as usize;
913 Some(Box::new(MazeEnvironment::from_str(map, worker_count)))
914 }
915 "code" => {
916 Some(Box::new(CodeEnvironment::auth_scenario()))
918 }
919 "troubleshooting" => {
920 let scenario_name = env_params
921 .get("scenario")
922 .and_then(|v| v.as_str())
923 .unwrap_or("memory_leak");
924 let env = match scenario_name {
925 "memory_leak" => TroubleshootingEnvironment::memory_leak_scenario(),
926 "cpu_spike" => TroubleshootingEnvironment::cpu_spike_scenario(),
927 "network_timeout" => TroubleshootingEnvironment::network_timeout_scenario(),
928 "medium" => TroubleshootingEnvironment::complex_scenario(15, 3, 2, seed),
929 "high" => TroubleshootingEnvironment::complex_scenario(30, 8, 3, seed),
930 "extreme" => TroubleshootingEnvironment::complex_scenario(50, 15, 4, seed),
931 "complex" => {
932 let total_services = env_params
933 .get("total_services")
934 .and_then(|v| v.as_u64())
935 .unwrap_or(15) as usize;
936 let noise_services = env_params
937 .get("noise_services")
938 .and_then(|v| v.as_u64())
939 .unwrap_or(3) as usize;
940 let cascade_depth = env_params
941 .get("cascade_depth")
942 .and_then(|v| v.as_u64())
943 .unwrap_or(2) as usize;
944 TroubleshootingEnvironment::complex_scenario(
945 total_services,
946 noise_services,
947 cascade_depth,
948 seed,
949 )
950 }
951 _ => TroubleshootingEnvironment::memory_leak_scenario(),
952 };
953 Some(Box::new(env))
954 }
955 "search" => {
956 let scenario_name = env_params
957 .get("scenario")
958 .and_then(|v| v.as_str())
959 .unwrap_or("basic");
960 let env = match scenario_name {
961 "basic" => SearchEnvironment::basic_scenario(),
962 "medium" => SearchEnvironment::medium_scenario(),
963 "large" => SearchEnvironment::large_scenario(),
964 "custom" => {
965 let file_count = env_params
966 .get("file_count")
967 .and_then(|v| v.as_u64())
968 .unwrap_or(5) as usize;
969 let target_index = env_params
970 .get("target_index")
971 .and_then(|v| v.as_u64())
972 .unwrap_or(2) as usize;
973 SearchEnvironment::custom_scenario(file_count, target_index, seed)
974 }
975 _ => SearchEnvironment::basic_scenario(),
976 };
977 Some(Box::new(env))
978 }
979 "internal_diagnosis" => {
980 let scenario_name = env_params
981 .get("scenario")
982 .and_then(|v| v.as_str())
983 .unwrap_or("routing");
984 let env = match scenario_name {
985 "routing" => InternalDiagnosisEnvironment::routing_error_scenario(),
986 "failover" => InternalDiagnosisEnvironment::failover_error_scenario(),
987 "worker_pool" => InternalDiagnosisEnvironment::worker_pool_scenario(),
988 "strategy" => InternalDiagnosisEnvironment::strategy_mismatch_scenario(),
989 "exploration" => InternalDiagnosisEnvironment::exploration_depth_scenario(),
990 "complex" => InternalDiagnosisEnvironment::complex_scenario(seed),
991 _ => InternalDiagnosisEnvironment::routing_error_scenario(),
992 };
993 Some(Box::new(env))
994 }
995 "deep_search" => {
996 let scenario_name = env_params
997 .get("scenario")
998 .and_then(|v| v.as_str())
999 .unwrap_or("tech_question");
1000 let env = match scenario_name {
1001 "tech_question" | _ => DeepSearchEnvironment::tech_question_scenario(),
1002 };
1003 Some(Box::new(env))
1004 }
1005 "default" | "realworld" => {
1007 use swarm_engine_core::environment::DefaultEnvironment;
1008 let working_dir = env_params
1009 .get("working_dir")
1010 .and_then(|v| v.as_str())
1011 .map(std::path::PathBuf::from);
1012 let env = if let Some(dir) = working_dir {
1013 DefaultEnvironment::with_working_dir(dir)
1014 } else {
1015 DefaultEnvironment::new()
1016 };
1017 Some(Box::new(env))
1018 }
1019 _ => None, };
1021
1022 if let Some(env) = env_box {
1023 extensions.insert(env);
1024 }
1025
1026 let graph = self.dependency_graph.clone().or_else(|| {
1028 self.scenario.dependency_graph.as_ref().and_then(|cfg| {
1029 let action_names = self.scenario.actions.action_names();
1030 cfg.to_core_graph(&action_names)
1031 })
1032 });
1033 if let Some(g) = graph {
1034 extensions.insert(g);
1035 }
1036
1037 if let Some(ref prior) = self.prior_snapshot {
1040 extensions.insert(prior.clone());
1041 }
1042
1043 extensions
1044 }
1045
1046 fn evaluate_conditions(
1050 &self,
1051 metrics: &RunMetrics,
1052 environment_done: bool,
1053 timed_out: bool,
1054 ) -> (bool, TerminationReason) {
1055 let conditions = &self.scenario.conditions;
1056
1057 for condition in &conditions.failure {
1059 if let Some(actual) =
1060 self.get_metric_value(&condition.metric, metrics, environment_done)
1061 {
1062 if condition.evaluate(&actual) {
1063 return (false, TerminationReason::Failure);
1064 }
1065 }
1066 }
1067
1068 if timed_out {
1070 return match conditions.on_timeout {
1071 TimeoutBehavior::Fail => (false, TerminationReason::Timeout),
1072 TimeoutBehavior::PartialSuccess => {
1073 let success = self.check_success_conditions(metrics, environment_done);
1075 (success, TerminationReason::Timeout)
1076 }
1077 TimeoutBehavior::MilestoneScore => {
1078 (false, TerminationReason::Timeout)
1080 }
1081 };
1082 }
1083
1084 let success = self.check_success_conditions(metrics, environment_done);
1086 if success {
1087 (true, TerminationReason::Success)
1088 } else {
1089 (false, TerminationReason::Stopped)
1092 }
1093 }
1094
1095 fn check_success_conditions(&self, metrics: &RunMetrics, environment_done: bool) -> bool {
1097 let conditions = &self.scenario.conditions;
1098
1099 if conditions.success.is_empty() {
1101 return true;
1102 }
1103
1104 conditions.success.iter().all(|condition| {
1106 self.get_metric_value(&condition.metric, metrics, environment_done)
1107 .map(|actual| condition.evaluate(&actual))
1108 .unwrap_or(false)
1109 })
1110 }
1111
1112 fn get_metric_value(
1114 &self,
1115 path: &str,
1116 metrics: &RunMetrics,
1117 environment_done: bool,
1118 ) -> Option<ConditionValue> {
1119 match path {
1120 "environment.done" => Some(ConditionValue::Bool(environment_done)),
1122
1123 "task.total_ticks" | "total_ticks" => {
1125 Some(ConditionValue::Integer(metrics.task.total_ticks as i64))
1126 }
1127 "task.success_rate" | "success_rate" => {
1128 Some(ConditionValue::Float(metrics.task.success_rate))
1129 }
1130 "task.total_actions" | "total_actions" => {
1131 Some(ConditionValue::Integer(metrics.task.total_actions as i64))
1132 }
1133 "task.successful_actions" | "successful_actions" => Some(ConditionValue::Integer(
1134 metrics.task.successful_actions as i64,
1135 )),
1136
1137 "performance.llm_error_rate" | "llm_error_rate" => {
1139 Some(ConditionValue::Float(metrics.performance.llm_error_rate))
1140 }
1141 "performance.llm_invocations" | "llm_invocations" => Some(ConditionValue::Integer(
1142 metrics.performance.llm_invocations as i64,
1143 )),
1144
1145 "coordination.manager_activations" | "manager_activations" => Some(
1147 ConditionValue::Integer(metrics.coordination.manager_activations as i64),
1148 ),
1149
1150 "errors.count" => {
1152 let failed = metrics
1153 .task
1154 .total_actions
1155 .saturating_sub(metrics.task.successful_actions);
1156 Some(ConditionValue::Integer(failed as i64))
1157 }
1158
1159 _ => None,
1161 }
1162 }
1163}
1164
1165struct DynManagerWrapper(Box<dyn ManagerAgent>);
1167
1168impl ManagerAgent for DynManagerWrapper {
1169 fn prepare(
1170 &self,
1171 context: &swarm_engine_core::agent::TaskContext,
1172 ) -> swarm_engine_core::agent::BatchDecisionRequest {
1173 self.0.prepare(context)
1174 }
1175
1176 fn finalize(
1177 &self,
1178 context: &swarm_engine_core::agent::TaskContext,
1179 responses: Vec<(
1180 swarm_engine_core::types::WorkerId,
1181 swarm_engine_core::agent::DecisionResponse,
1182 )>,
1183 ) -> swarm_engine_core::agent::ManagementDecision {
1184 self.0.finalize(context, responses)
1185 }
1186
1187 fn id(&self) -> swarm_engine_core::agent::ManagerId {
1188 self.0.id()
1189 }
1190
1191 fn name(&self) -> &str {
1192 self.0.name()
1193 }
1194}
1195
1196struct DynBatchInvokerWrapper(Box<dyn BatchInvoker>);
1198
1199impl BatchInvoker for DynBatchInvokerWrapper {
1200 fn invoke(
1201 &self,
1202 request: swarm_engine_core::agent::BatchDecisionRequest,
1203 extensions: &swarm_engine_core::extensions::Extensions,
1204 ) -> swarm_engine_core::agent::BatchInvokeResult {
1205 self.0.invoke(request, extensions)
1206 }
1207
1208 fn plan_dependencies(
1209 &self,
1210 task: &str,
1211 actions: &[ActionDef],
1212 ) -> Option<swarm_engine_core::exploration::DependencyGraph> {
1213 self.0.plan_dependencies(task, actions)
1214 }
1215
1216 fn name(&self) -> &str {
1217 self.0.name()
1218 }
1219}
1220
1221struct DynOperatorProviderWrapper(Box<dyn OperatorProvider<NodeRules>>);
1223
1224impl OperatorProvider<NodeRules> for DynOperatorProviderWrapper {
1225 fn provide(
1226 &self,
1227 rules: NodeRules,
1228 context: Option<
1229 &swarm_engine_core::exploration::ProviderContext<
1230 '_,
1231 swarm_engine_core::exploration::ActionNodeData,
1232 String,
1233 swarm_engine_core::exploration::MapNodeState,
1234 >,
1235 >,
1236 ) -> swarm_engine_core::exploration::ConfigurableOperator<NodeRules> {
1237 self.0.provide(rules, context)
1238 }
1239
1240 fn reevaluate(
1241 &self,
1242 operator: &mut swarm_engine_core::exploration::ConfigurableOperator<NodeRules>,
1243 ctx: &swarm_engine_core::exploration::ProviderContext<
1244 '_,
1245 swarm_engine_core::exploration::ActionNodeData,
1246 String,
1247 swarm_engine_core::exploration::MapNodeState,
1248 >,
1249 ) {
1250 self.0.reevaluate(operator, ctx)
1251 }
1252
1253 fn name(&self) -> &str {
1254 self.0.name()
1255 }
1256}