Skip to main content

swarm_engine_eval/
runner.rs

//! EvalRunner - evaluation execution that uses the Orchestrator directly
2
3use std::time::Duration;
4
5use tokio::runtime::Handle;
6
7use std::path::PathBuf;
8
9use swarm_engine_core::actions::ActionDef;
10use swarm_engine_core::agent::{
11    BatchInvoker, DefaultBatchManagerAgent, GenericWorker, ManagementStrategy, ManagerAgent,
12    ManagerId, WorkerAgent,
13};
14use swarm_engine_core::environment::EnvironmentBox;
15use swarm_engine_core::events::{
16    ActionEventPublisher, JsonlWriter, LearningEventChannel, LearningLifecycleHook,
17};
18use swarm_engine_core::exploration::{LearnedDependencyProvider, NodeRules, OperatorProvider};
19use swarm_engine_core::extensions::Extensions;
20use swarm_engine_core::learn::{
21    CountTrigger, LearningSnapshot, LearningStore, OfflineModel, SnapshotMetadata, TrainTrigger,
22};
23use swarm_engine_core::orchestrator::{OrchestratorBuilder, SwarmConfig};
24use swarm_engine_core::types::{GroupId, SwarmTask};
25
26use crate::environments::{
27    CodeEnvironment, DeepSearchEnvironment, InternalDiagnosisEnvironment, MazeEnvironment,
28    SearchEnvironment, TroubleshootingEnvironment,
29};
30
31use crate::aggregator::Aggregator;
32use crate::error::Result;
33use crate::metrics::RunMetrics;
34use crate::reporter::{ConfigSummary, EvalReport, SeedInfo};
35use crate::run::{EvalRun, TerminationReason};
36use crate::scenario::conditions::{ConditionValue, TimeoutBehavior};
37use crate::scenario::{EvalScenario, ManagementStrategyConfig};
38
/// Seed value that makes an evaluation run reproducible.
///
/// Intended to be stored in `Extensions` so that the environment and other
/// components can read the seed of the current run.
#[derive(Clone, Copy, Debug)]
pub struct EvalSeed(pub u64);
44
45/// Factory for creating ManagerAgent
46pub type ManagerFactory = Box<dyn Fn() -> Box<dyn ManagerAgent> + Send + Sync>;
47
48/// Factory for creating BatchInvoker
49pub type BatchInvokerFactory = Box<dyn Fn() -> Box<dyn BatchInvoker> + Send + Sync>;
50
51/// Factory for creating OperatorProvider
52pub type OperatorProviderFactory =
53    Box<dyn Fn() -> Box<dyn OperatorProvider<NodeRules>> + Send + Sync>;
54
55/// Evaluation runner using Orchestrator directly
56///
57/// # Example
58///
59/// ```ignore
60/// let runner = EvalRunner::new(scenario, runtime.handle().clone())
61///     .with_runs(5)
62///     .with_seed(42)
63///     .with_task(SwarmTask::new("Find the auth handler"))
64///     .with_manager_factory(|| Box::new(MyManager::new()))
65///     .with_batch_invoker_factory(|| Box::new(MyInvoker::new()));
66///
67/// let report = runner.run()?;
68/// ```
69pub struct EvalRunner {
70    scenario: EvalScenario,
71    runtime: Handle,
72    runs: usize,
73    seed: u64,
74    /// Task to execute (optional)
75    task: Option<SwarmTask>,
76    /// Manager factory (creates new instance per run)
77    manager_factory: Option<ManagerFactory>,
78    /// BatchInvoker factory (creates new instance per run)
79    batch_invoker_factory: Option<BatchInvokerFactory>,
80    /// Extensions factory (creates new instance per run)
81    extensions_factory: Option<Box<dyn Fn() -> Extensions + Send + Sync>>,
82    /// OperatorProvider factory (creates new instance per run)
83    operator_provider_factory: Option<OperatorProviderFactory>,
84    /// Verbose output (print tick snapshots)
85    verbose: bool,
86    /// Enable ExplorationSpaceV2 tracking
87    enable_exploration: bool,
88    /// Dependency graph for action sequencing
89    dependency_graph: Option<swarm_engine_core::exploration::DependencyGraph>,
90    /// Path to save action events (JSONL format)
91    action_events_path: Option<PathBuf>,
92    /// LearningStore for cross-session learning
93    learning_store: Option<LearningStore>,
94    /// Prior learning snapshot to load before each run
95    prior_snapshot: Option<LearningSnapshot>,
96    /// Offline model for optimized parameters
97    offline_model: Option<OfflineModel>,
98    /// Shared eval count for LifecycleHook (tracks total runs across iterations)
99    eval_count: std::sync::Arc<std::sync::atomic::AtomicUsize>,
100    /// TrainTrigger for Learning (default: run after every eval)
101    train_trigger: Option<std::sync::Arc<dyn TrainTrigger>>,
102}
103
104impl EvalRunner {
105    pub fn new(scenario: EvalScenario, runtime: Handle) -> Self {
106        Self {
107            scenario,
108            runtime,
109            runs: 1,
110            seed: 42,
111            task: None,
112            manager_factory: None,
113            batch_invoker_factory: None,
114            extensions_factory: None,
115            operator_provider_factory: None,
116            verbose: false,
117            enable_exploration: false,
118            dependency_graph: None,
119            action_events_path: None,
120            learning_store: None,
121            prior_snapshot: None,
122            offline_model: None,
123            eval_count: std::sync::Arc::new(std::sync::atomic::AtomicUsize::new(0)),
124            train_trigger: None,
125        }
126    }
127
128    /// Enable verbose output (print tick snapshots)
129    pub fn with_verbose(mut self, verbose: bool) -> Self {
130        self.verbose = verbose;
131        self
132    }
133
134    /// Enable ExplorationSpace tracking
135    pub fn with_exploration(mut self, enable: bool) -> Self {
136        self.enable_exploration = enable;
137        self
138    }
139
140    /// Set dependency graph for action sequencing
141    ///
142    /// The graph defines valid action transitions and terminal conditions.
143    /// When set, actions will be filtered based on the graph structure.
144    pub fn with_dependency_graph(
145        mut self,
146        graph: swarm_engine_core::exploration::DependencyGraph,
147    ) -> Self {
148        self.dependency_graph = Some(graph);
149        self
150    }
151
152    /// Set path to save action events (JSONL format)
153    ///
154    /// When set, all action events will be saved to the specified path.
155    /// Each run will append to a file with the run index suffix.
156    ///
157    /// # Example
158    ///
159    /// ```ignore
160    /// runner.with_action_events_path("./logs/events.jsonl")
161    /// // Creates: ./logs/events_run0.jsonl, ./logs/events_run1.jsonl, ...
162    /// ```
163    pub fn with_action_events_path(mut self, path: impl Into<PathBuf>) -> Self {
164        self.action_events_path = Some(path.into());
165        self
166    }
167
168    /// Enable LearningStore for cross-session learning
169    ///
170    /// When enabled, statistics will be saved after each run and loaded as prior
171    /// for subsequent runs. This enables incremental learning across sessions.
172    ///
173    /// # Example
174    ///
175    /// ```ignore
176    /// runner.with_learning_store("~/.swarm-engine/learning")
177    /// ```
178    pub fn with_learning_store(mut self, path: impl AsRef<std::path::Path>) -> Self {
179        match LearningStore::new(path) {
180            Ok(store) => {
181                // Load prior snapshot for this scenario
182                // Use learning_key() for consistent directory naming (e.g., "troubleshooting" from "user:troubleshooting:v2")
183                let scenario_key = self.scenario.meta.id.learning_key();
184                self.prior_snapshot = store.load_scenario(&scenario_key).ok();
185                // Load offline model for optimized parameters
186                self.offline_model = store.load_offline_model(&scenario_key).ok();
187                if let Some(ref model) = self.offline_model {
188                    tracing::debug!(
189                        ucb1_c = model.parameters.ucb1_c,
190                        strategy = %model.strategy_config.initial_strategy,
191                        action_order = model.action_order.is_some(),
192                        "Offline model loaded"
193                    );
194                    println!(
195                        "Offline model loaded: ucb1_c={:.3}, strategy={}",
196                        model.parameters.ucb1_c, model.strategy_config.initial_strategy
197                    );
198                }
199                self.learning_store = Some(store);
200
201                // Enable LearningEventChannel for recording strategy advice events
202                LearningEventChannel::global().enable();
203            }
204            Err(e) => {
205                eprintln!("Warning: Failed to create LearningStore: {}", e);
206            }
207        }
208        self
209    }
210
211    /// Set TrainTrigger for Learning
212    ///
213    /// Controls when offline learning is executed after eval runs.
214    /// Default: Learn after every run (if learning_store is configured).
215    ///
216    /// # Example
217    ///
218    /// ```ignore
219    /// use swarm_engine_core::learn::CountTrigger;
220    ///
221    /// // Run learning after every 5 eval iterations
222    /// runner.with_train_trigger(Arc::new(CountTrigger::new(5)))
223    /// ```
224    pub fn with_train_trigger(mut self, trigger: std::sync::Arc<dyn TrainTrigger>) -> Self {
225        self.train_trigger = Some(trigger);
226        self
227    }
228
229    pub fn with_runs(mut self, runs: usize) -> Self {
230        self.runs = runs;
231        self
232    }
233
234    pub fn with_seed(mut self, seed: u64) -> Self {
235        self.seed = seed;
236        self
237    }
238
239    /// Set the task to execute
240    pub fn with_task(mut self, task: SwarmTask) -> Self {
241        self.task = Some(task);
242        self
243    }
244
245    /// Set manager factory (creates new Manager for each run)
246    pub fn with_manager_factory<F>(mut self, factory: F) -> Self
247    where
248        F: Fn() -> Box<dyn ManagerAgent> + Send + Sync + 'static,
249    {
250        self.manager_factory = Some(Box::new(factory));
251        self
252    }
253
254    /// Set batch invoker factory (creates new BatchInvoker for each run)
255    pub fn with_batch_invoker_factory<F>(mut self, factory: F) -> Self
256    where
257        F: Fn() -> Box<dyn BatchInvoker> + Send + Sync + 'static,
258    {
259        self.batch_invoker_factory = Some(Box::new(factory));
260        self
261    }
262
263    /// Set extensions factory (creates new Extensions for each run)
264    pub fn with_extensions_factory<F>(mut self, factory: F) -> Self
265    where
266        F: Fn() -> Extensions + Send + Sync + 'static,
267    {
268        self.extensions_factory = Some(Box::new(factory));
269        self
270    }
271
272    /// Set OperatorProvider factory (creates new provider for each run)
273    ///
274    /// Use this to configure the Selection strategy for exploration.
275    /// Default is `AdaptiveProvider` if not specified.
276    ///
277    /// # Example
278    ///
279    /// ```ignore
280    /// use swarm_engine_core::exploration::{HybridLlmProvider, ReviewPolicy};
281    /// use swarm_engine_llm::LlmStrategyAdvisor;
282    ///
283    /// runner.with_operator_provider_factory(|| {
284    ///     let advisor = LlmStrategyAdvisor::new(decider.clone(), handle.clone());
285    ///     let policy = ReviewPolicy::default();
286    ///     Box::new(HybridLlmProvider::new(advisor, policy))
287    /// })
288    /// ```
289    pub fn with_operator_provider_factory<F>(mut self, factory: F) -> Self
290    where
291        F: Fn() -> Box<dyn OperatorProvider<NodeRules>> + Send + Sync + 'static,
292    {
293        self.operator_provider_factory = Some(Box::new(factory));
294        self
295    }
296
297    pub fn run(&self) -> Result<EvalReport> {
298        let mut eval_runs = Vec::with_capacity(self.runs);
299        let mut run_seeds = Vec::with_capacity(self.runs);
300        let mut first_dependency_graph = None;
301
302        // Generate GroupId for this eval run (all iterations share the same GroupId)
303        // This enables DPO learning to compare multiple executions under the same conditions
304        let group_id = GroupId::new();
305
306        for i in 0..self.runs {
307            let run_seed = self.seed.wrapping_add(i as u64);
308            run_seeds.push(run_seed);
309
310            let (result, dep_graph) = self.run_single(i, run_seed, group_id)?;
311            eval_runs.push(result);
312
313            // Keep the first run's DependencyGraph for action_order caching
314            if i == 0 {
315                first_dependency_graph = dep_graph;
316            }
317        }
318
319        let aggregated = Aggregator::aggregate(&eval_runs);
320
321        // Offline learning is now executed via LifecycleHook (on_terminate)
322        // The TrainTrigger controls when learning runs (default: after all runs complete)
323        //
324        // TODO: action_order saving should be moved to LearningLifecycleHook's learn_callback
325        // For now, save action_order separately if DependencyGraph is available
326        if let (Some(ref store), Some(ref graph)) = (&self.learning_store, &first_dependency_graph)
327        {
328            if graph.has_action_order() {
329                let scenario_key = self.scenario.meta.id.learning_key();
330
331                // Load existing offline model and update action_order
332                if let Ok(mut model) = store.load_offline_model(&scenario_key) {
333                    use swarm_engine_core::learn::LearnedActionOrder;
334
335                    let action_order = LearnedActionOrder::new(
336                        graph.discover_order().to_vec(),
337                        graph.not_discover_order().to_vec(),
338                        graph.available_actions(),
339                    );
340                    model.action_order = Some(action_order.clone());
341
342                    if let Err(e) = store.save_offline_model(&scenario_key, &model) {
343                        tracing::warn!("Failed to save action_order: {}", e);
344                    } else {
345                        println!(
346                            "Action order saved: discover={:?}, not_discover={:?}",
347                            action_order.discover, action_order.not_discover
348                        );
349                    }
350                }
351            }
352        }
353
354        Ok(EvalReport {
355            config_summary: ConfigSummary {
356                scenario_name: self.scenario.meta.name.clone(),
357                scenario_id: self.scenario.meta.id.to_string(),
358                worker_count: self.scenario.agents.workers.iter().map(|w| w.count).sum(),
359                max_ticks: self.scenario.app_config.max_ticks,
360                run_count: self.runs,
361            },
362            seed_info: SeedInfo {
363                base_seed: self.seed,
364                run_seeds,
365            },
366            runs: eval_runs,
367            aggregated,
368            assertion_results: vec![],
369        })
370    }
371
372    /// Run a single evaluation iteration
373    ///
374    /// # Arguments
375    /// * `index` - The iteration index (0-based)
376    /// * `seed` - The random seed for this iteration
377    /// * `group_id` - The group ID shared by all iterations in this eval run.
378    ///    Used for DPO learning to compare multiple executions.
379    fn run_single(
380        &self,
381        index: usize,
382        seed: u64,
383        group_id: GroupId,
384    ) -> Result<(
385        EvalRun,
386        Option<swarm_engine_core::exploration::DependencyGraph>,
387    )> {
388        let workers = self.build_workers();
389
390        // Build ManagementStrategy from scenario config
391        let management_strategy = self.build_management_strategy();
392
393        let config = SwarmConfig {
394            tick_duration: Duration::from_millis(self.scenario.app_config.tick_duration_ms),
395            max_ticks: self.scenario.app_config.max_ticks,
396            management_strategy,
397        };
398
399        // Start building Orchestrator
400        let mut builder = OrchestratorBuilder::new().config(config);
401
402        // Add workers
403        for worker in workers {
404            builder = builder.add_worker_boxed(worker);
405        }
406
407        // Add Managers: from factory if provided, otherwise from scenario templates
408        if let Some(factory) = &self.manager_factory {
409            let manager = factory();
410            builder = builder.manager(DynManagerWrapper(manager));
411        } else {
412            // Build managers from scenario templates
413            let managers = self.build_managers();
414            for manager in managers {
415                builder = builder.manager(manager);
416            }
417        }
418
419        // Add BatchInvoker if factory provided
420        if let Some(factory) = &self.batch_invoker_factory {
421            let invoker = factory();
422            builder = builder.batch_invoker(DynBatchInvokerWrapper(invoker));
423        }
424
425        // Build Extensions from scenario (includes LlmConfig, ActionsConfig, EvalSeed)
426        let extensions = self.build_extensions_from_scenario(seed);
427        builder = builder.extensions(extensions);
428
429        // Enable ExplorationSpace if requested (from scenario or explicit setting)
430        if self.enable_exploration || self.scenario.app_config.enable_exploration {
431            builder = builder.with_exploration();
432        }
433
434        // Apply offline model for optimized parameters (if no explicit OperatorProvider factory)
435        if self.operator_provider_factory.is_none() {
436            if let Some(ref model) = self.offline_model {
437                builder = builder.with_offline_model(model.clone());
438                println!(
439                    "Offline model applied: ucb1_c={:.3}, maturity={}, strategy={}",
440                    model.parameters.ucb1_c,
441                    model.strategy_config.maturity_threshold,
442                    model.strategy_config.initial_strategy
443                );
444            }
445        }
446
447        // Apply learned action order if available (independent of OperatorProvider)
448        // This allows skipping LLM for DependencyGraph generation
449        if let Some(ref model) = self.offline_model {
450            if let Some(ref action_order) = model.action_order {
451                let provider = LearnedDependencyProvider::new(action_order.clone());
452                builder = builder.dependency_provider(provider);
453                println!(
454                    "Learned action order applied: discover={:?}, not_discover={:?}",
455                    action_order.discover, action_order.not_discover
456                );
457            }
458        }
459
460        // Set OperatorProvider if factory provided (overrides offline model)
461        if let Some(factory) = &self.operator_provider_factory {
462            let provider = factory();
463            builder = builder.operator_provider(DynOperatorProviderWrapper(provider));
464        }
465
466        // Set ActionEventPublisher if action_events_path is configured
467        let jsonl_handle = if let Some(base_path) = &self.action_events_path {
468            // Create path with run index suffix
469            let path = if self.runs > 1 {
470                let stem = base_path.file_stem().unwrap_or_default().to_string_lossy();
471                let ext = base_path.extension().unwrap_or_default().to_string_lossy();
472                let parent = base_path.parent().unwrap_or(std::path::Path::new("."));
473                parent.join(format!("{}_run{}.{}", stem, index, ext))
474            } else {
475                base_path.clone()
476            };
477
478            let (publisher, _rx) = ActionEventPublisher::new(1024);
479            let jsonl_rx = publisher.subscribe();
480            let writer = JsonlWriter::new(jsonl_rx, path);
481
482            // Spawn JsonlWriter in background
483            let handle = self.runtime.spawn(async move {
484                if let Err(e) = writer.run().await {
485                    eprintln!("JsonlWriter error: {}", e);
486                }
487            });
488
489            builder = builder.action_collector(publisher);
490            Some(handle)
491        } else {
492            None
493        };
494
495        // Set LifecycleHook for automatic learning trigger
496        if let Some(ref store) = self.learning_store {
497            let scenario_key = self.scenario.meta.id.learning_key();
498            let mut hook = LearningLifecycleHook::new(store.storage().base_dir())
499                .with_scenario(scenario_key)
500                .with_shared_eval_count(std::sync::Arc::clone(&self.eval_count));
501
502            // Apply custom trigger if configured, otherwise use CountTrigger(runs)
503            // This means learning runs after all iterations complete
504            if let Some(ref trigger) = self.train_trigger {
505                hook = hook.with_trigger(std::sync::Arc::clone(trigger));
506            } else {
507                // Default: run learning after all runs in this eval complete
508                hook = hook.with_trigger(std::sync::Arc::new(CountTrigger::new(self.runs)));
509            }
510
511            builder = builder.lifecycle_hook(Box::new(hook));
512        }
513
514        let mut orchestrator = builder.build(self.runtime.clone());
515
516        // Enable partitioning when multiple managers are configured
517        let manager_count = self
518            .scenario
519            .agents
520            .managers
521            .iter()
522            .map(|t| t.count)
523            .sum::<usize>();
524        if manager_count > 1 {
525            orchestrator.enable_partitioning();
526        }
527
528        // Determine task: prefer explicit task, fallback to scenario task
529        // Set group_id for DPO learning (all iterations in this eval share the same GroupId)
530        let task_to_run = self
531            .task
532            .clone()
533            .or_else(|| self.build_task_from_scenario())
534            .map(|task| task.with_group_id(group_id));
535
536        // Run with task if available, otherwise run without task
537        // run_task() returns Result to enforce DependencyGraph requirement
538        let result = if let Some(task) = task_to_run {
539            orchestrator.run_task(task)?
540        } else {
541            orchestrator.run()
542        };
543
544        let state = orchestrator.state();
545        let timed_out = result.total_ticks >= self.scenario.app_config.max_ticks;
546        let environment_done = state.shared.is_environment_done();
547        let total_actions = state.shared.stats.total_visits() as u64;
548        let successful_actions = state.shared.stats.total_successes() as u64;
549        let llm_invocations = state.shared.llm_invocations();
550        let llm_invoke_errors = state.shared.llm_errors();
551
552        let metrics = RunMetrics {
553            task: crate::metrics::TaskMetrics {
554                total_ticks: result.total_ticks,
555                total_tasks: 0,
556                completed_tasks: 0,
557                total_actions,
558                successful_actions,
559                success_rate: state.shared.stats.success_rate(),
560            },
561            coordination: crate::metrics::CoordinationMetrics {
562                // manager_activations は廃止。LLM 呼び出し回数で近似(1 batch = 1 activation として)
563                manager_activations: llm_invocations,
564                manager_intervention_rate: if result.total_ticks > 0 {
565                    llm_invocations as f64 / result.total_ticks as f64
566                } else {
567                    0.0
568                },
569                ..Default::default()
570            },
571            performance: {
572                let llm_error_rate = if llm_invocations > 0 {
573                    llm_invoke_errors as f64 / llm_invocations as f64
574                } else {
575                    0.0
576                };
577                crate::metrics::PerformanceMetrics {
578                    total_duration_ms: result.total_duration.as_millis() as f64,
579                    avg_tick_latency_ms: if result.total_ticks > 0 {
580                        result.total_duration.as_millis() as f64 / result.total_ticks as f64
581                    } else {
582                        0.0
583                    },
584                    raw_throughput_per_sec: if result.total_duration.as_secs_f64() > 0.0 {
585                        total_actions as f64 / result.total_duration.as_secs_f64()
586                    } else {
587                        0.0
588                    },
589                    effective_throughput_per_sec: if result.total_duration.as_secs_f64() > 0.0 {
590                        successful_actions as f64 / result.total_duration.as_secs_f64()
591                    } else {
592                        0.0
593                    },
594                    llm_invocations,
595                    llm_invoke_errors,
596                    llm_error_rate,
597                    ..Default::default()
598                }
599            },
600            robustness: Default::default(),
601        };
602
603        // Evaluate success/failure conditions
604        let (success, termination_reason) = if !result.completed {
605            (false, TerminationReason::Stopped)
606        } else {
607            self.evaluate_conditions(&metrics, environment_done, timed_out)
608        };
609
610        // Save statistics to LearningStore if configured
611        if let Some(ref store) = self.learning_store {
612            // Use learning_key for directory naming (e.g., "troubleshooting")
613            // but keep display name in metadata for human readability
614            let scenario_key = self.scenario.meta.id.learning_key();
615            let metadata = SnapshotMetadata::default()
616                .with_scenario(&scenario_key)
617                .with_task(&self.scenario.task.goal);
618
619            // Get LearnStats from LearnedProvider if available
620            let learn_stats_opt = orchestrator.learned_provider().and_then(|p| p.stats());
621
622            let snapshot = if let Some(learn_stats) = learn_stats_opt {
623                LearningSnapshot {
624                    version: swarm_engine_core::learn::SNAPSHOT_VERSION,
625                    metadata,
626                    episode_transitions: learn_stats.episode_transitions.clone(),
627                    ngram_stats: learn_stats.ngram_stats.clone(),
628                    selection_performance: learn_stats.selection_performance.clone(),
629                    contextual_stats: learn_stats
630                        .contextual_stats
631                        .iter()
632                        .map(|(k, v)| {
633                            (
634                                k.clone(),
635                                swarm_engine_core::online_stats::ActionStats {
636                                    visits: v.visits,
637                                    successes: v.successes,
638                                    failures: v.failures,
639                                    ..Default::default()
640                                },
641                            )
642                        })
643                        .collect(),
644                    action_stats: state
645                        .shared
646                        .stats
647                        .all_action_stats()
648                        .map(|(k, v)| (k.clone(), v.clone()))
649                        .collect(),
650                }
651            } else {
652                // No LearnStats available - create snapshot with only action_stats
653                LearningSnapshot {
654                    version: swarm_engine_core::learn::SNAPSHOT_VERSION,
655                    metadata,
656                    episode_transitions: Default::default(),
657                    ngram_stats: Default::default(),
658                    selection_performance: Default::default(),
659                    contextual_stats: Default::default(),
660                    action_stats: state
661                        .shared
662                        .stats
663                        .all_action_stats()
664                        .map(|(k, v)| (k.clone(), v.clone()))
665                        .collect(),
666                }
667            };
668
669            if let Err(e) = store.save_session(&scenario_key, &snapshot) {
670                eprintln!("Warning: Failed to save learning data: {}", e);
671            }
672        }
673
674        // Extract DependencyGraph and learn_record before dropping orchestrator
675        let dependency_graph = orchestrator.dependency_graph().cloned();
676        let learn_record = dependency_graph
677            .as_ref()
678            .and_then(|g| g.learn_record().cloned());
679
680        // Save Episode with DependencyGraphRecord if learn_record is available and learning is enabled
681        if let (Some(ref store), Some(record)) = (&self.learning_store, learn_record) {
682            use std::fs::OpenOptions;
683            use std::io::Write;
684            use swarm_engine_core::learn::{Episode, EpisodeContext, Outcome};
685
686            let scenario_key = self.scenario.meta.id.learning_key();
687            let ticks = result.total_ticks as u32;
688            let max_ticks = self.scenario.app_config.max_ticks as u32;
689
690            // Determine Outcome based on task completion status
691            // - Success: Environment reached its goal (normalized score by ticks)
692            // - Timeout: Hit max_ticks without completing
693            // - Failure: Environment did not complete for other reasons
694            let outcome = if environment_done {
695                // Score: higher is better, normalized by max_ticks
696                // Fast completion (low ticks) = high score
697                let score = 1.0 - (ticks as f64 / max_ticks as f64).min(1.0);
698                Outcome::success(score.max(0.01)) // Minimum score for successful completion
699            } else if timed_out {
700                // Timed out without completing - use partial score based on progress
701                let partial_score = (ticks as f64 / max_ticks as f64).min(0.99);
702                Outcome::timeout(Some(partial_score))
703            } else {
704                // Failed for other reasons (e.g., all workers idle, error)
705                Outcome::failure(format!("Task not completed at tick {}", ticks))
706            };
707            let is_success = outcome.is_success();
708
709            // Build Episode with DependencyGraphRecord in context
710            let context = EpisodeContext::new().with_record(record);
711            let episode = Episode::builder()
712                .learn_model("dependency_graph")
713                .context(context)
714                .outcome(outcome)
715                .scenario(&scenario_key)
716                .build();
717
718            // Save to JSONL file
719            let path = store.storage().base_dir().join(format!(
720                "scenarios/{}/dep_graph_episodes.jsonl",
721                scenario_key
722            ));
723
724            if let Some(parent) = path.parent() {
725                let _ = std::fs::create_dir_all(parent);
726            }
727
728            if let Ok(mut file) = OpenOptions::new().create(true).append(true).open(&path) {
729                if let Ok(json) = serde_json::to_string(&episode) {
730                    let _ = writeln!(file, "{}", json);
731                    tracing::debug!(
732                        path = %path.display(),
733                        success = is_success,
734                        "Saved Episode with DependencyGraphRecord"
735                    );
736                }
737            }
738        }
739
740        // Drop orchestrator to close ActionEventPublisher channel
741        drop(orchestrator);
742
743        // Wait for JsonlWriter to finish writing
744        if let Some(handle) = jsonl_handle {
745            // Use block_on to wait synchronously
746            let _ = self.runtime.block_on(handle);
747        }
748
749        Ok((
750            EvalRun::new(index, seed, success, termination_reason, metrics),
751            dependency_graph,
752        ))
753    }
754
755    fn build_workers(&self) -> Vec<Box<dyn WorkerAgent>> {
756        let mut workers: Vec<Box<dyn WorkerAgent>> = Vec::new();
757
758        for template in &self.scenario.agents.workers {
759            for i in 0..template.count {
760                let id = workers.len();
761                let name = template.id_pattern.replace("{i}", &i.to_string());
762
763                let worker = GenericWorker::new(id)
764                    .with_name(name)
765                    .with_require_guidance(true);
766
767                workers.push(Box::new(worker));
768            }
769        }
770
771        workers
772    }
773
774    fn build_managers(&self) -> Vec<DefaultBatchManagerAgent> {
775        let mut managers = Vec::new();
776        let mut manager_index = 0;
777
778        for template in &self.scenario.agents.managers {
779            let ids = template.generate_ids();
780            for name in ids {
781                let manager = DefaultBatchManagerAgent::new(ManagerId(manager_index))
782                    .with_name(name)
783                    .with_interval(self.scenario.manager.process_interval_ticks);
784
785                managers.push(manager);
786                manager_index += 1;
787            }
788        }
789
790        // デフォルト: Manager テンプレートがない場合は 1 つ作成
791        if managers.is_empty() {
792            managers.push(
793                DefaultBatchManagerAgent::new(ManagerId(0))
794                    .with_name("default_manager")
795                    .with_interval(self.scenario.manager.process_interval_ticks),
796            );
797        }
798
799        managers
800    }
801
802    fn build_management_strategy(&self) -> ManagementStrategy {
803        match &self.scenario.app_config.management_strategy {
804            ManagementStrategyConfig::EveryTick {} => ManagementStrategy::EveryTick,
805            ManagementStrategyConfig::IntervalBased { max_interval } => {
806                ManagementStrategy::FixedInterval {
807                    interval: *max_interval,
808                }
809            }
810            ManagementStrategyConfig::EventDriven { triggers: _ } => {
811                // Event-driven maps to completion-based
812                ManagementStrategy::CompletionBased { max_wait_ticks: 50 }
813            }
814            ManagementStrategyConfig::Hybrid {
815                max_interval,
816                triggers: _,
817            } => ManagementStrategy::Hybrid {
818                preferred_interval: *max_interval,
819                force_after_ticks: max_interval * 2,
820            },
821            ManagementStrategyConfig::Disabled {} => {
822                // Disabled = very large interval (effectively never)
823                ManagementStrategy::FixedInterval { interval: u64::MAX }
824            }
825        }
826    }
827
828    /// Build SwarmTask from scenario task config
829    ///
830    /// Returns None if task goal is empty
831    fn build_task_from_scenario(&self) -> Option<SwarmTask> {
832        let task_config = &self.scenario.task;
833
834        if task_config.goal.is_empty() {
835            return None;
836        }
837
838        // Build context JSON object
839        let mut context = serde_json::Map::new();
840
841        if let Some(target_path) = &task_config.context.target_path {
842            context.insert(
843                "target_path".to_string(),
844                serde_json::Value::String(target_path.clone()),
845            );
846        }
847        if let Some(working_dir) = &task_config.context.working_dir {
848            context.insert(
849                "working_dir".to_string(),
850                serde_json::Value::String(working_dir.clone()),
851            );
852        }
853        if let Some(max_depth) = task_config.context.max_depth {
854            context.insert(
855                "max_depth".to_string(),
856                serde_json::Value::Number(serde_json::Number::from(max_depth)),
857            );
858        }
859
860        // Add extra context (convert toml::Value to serde_json::Value)
861        for (key, value) in &task_config.context.extra {
862            if let Ok(json_value) = serde_json::to_value(value) {
863                context.insert(key.clone(), json_value);
864            }
865        }
866
867        let task =
868            SwarmTask::new(&task_config.goal).with_context(serde_json::Value::Object(context));
869
870        Some(task)
871    }
872
873    /// Build Extensions with LlmConfig, ActionsConfig, and EvalSeed from scenario
874    fn build_extensions_from_scenario(&self, seed: u64) -> Extensions {
875        let mut extensions = if let Some(factory) = &self.extensions_factory {
876            factory()
877        } else {
878            Extensions::new()
879        };
880
881        // Insert EvalSeed for reproducibility
882        extensions.insert(EvalSeed(seed));
883
884        // Insert LlmConfig for BatchInvoker/Manager to use
885        extensions.insert(self.scenario.llm.clone());
886
887        // Insert LoRA config if specified (for BatchInvoker to use)
888        if let Some(ref lora) = self.scenario.llm.lora {
889            extensions.insert(lora.clone());
890        }
891
892        // Insert ManagerConfig for Manager to use
893        extensions.insert(self.scenario.manager.clone());
894
895        // Insert BatchProcessorConfig for BatchInvoker to use
896        extensions.insert(self.scenario.batch_processor.clone());
897
898        // Convert EvalActionsConfig to Core ActionsConfig for Worker/Manager to use
899        let core_actions_config = self.scenario.actions.to_core_config();
900        extensions.insert(core_actions_config);
901
902        // Create and insert Environment based on env_type
903        let env_type = self.scenario.environment.env_type.as_str();
904        let env_params = &self.scenario.environment.params;
905
906        let env_box: Option<EnvironmentBox> = match env_type {
907            "maze" => {
908                let map = env_params.get("map").and_then(|v| v.as_str()).unwrap_or("");
909                let worker_count = env_params
910                    .get("worker_count")
911                    .and_then(|v| v.as_u64())
912                    .unwrap_or(1) as usize;
913                Some(Box::new(MazeEnvironment::from_str(map, worker_count)))
914            }
915            "code" => {
916                // Currently only "auth" scenario is supported, default to it
917                Some(Box::new(CodeEnvironment::auth_scenario()))
918            }
919            "troubleshooting" => {
920                let scenario_name = env_params
921                    .get("scenario")
922                    .and_then(|v| v.as_str())
923                    .unwrap_or("memory_leak");
924                let env = match scenario_name {
925                    "memory_leak" => TroubleshootingEnvironment::memory_leak_scenario(),
926                    "cpu_spike" => TroubleshootingEnvironment::cpu_spike_scenario(),
927                    "network_timeout" => TroubleshootingEnvironment::network_timeout_scenario(),
928                    "medium" => TroubleshootingEnvironment::complex_scenario(15, 3, 2, seed),
929                    "high" => TroubleshootingEnvironment::complex_scenario(30, 8, 3, seed),
930                    "extreme" => TroubleshootingEnvironment::complex_scenario(50, 15, 4, seed),
931                    "complex" => {
932                        let total_services = env_params
933                            .get("total_services")
934                            .and_then(|v| v.as_u64())
935                            .unwrap_or(15) as usize;
936                        let noise_services = env_params
937                            .get("noise_services")
938                            .and_then(|v| v.as_u64())
939                            .unwrap_or(3) as usize;
940                        let cascade_depth = env_params
941                            .get("cascade_depth")
942                            .and_then(|v| v.as_u64())
943                            .unwrap_or(2) as usize;
944                        TroubleshootingEnvironment::complex_scenario(
945                            total_services,
946                            noise_services,
947                            cascade_depth,
948                            seed,
949                        )
950                    }
951                    _ => TroubleshootingEnvironment::memory_leak_scenario(),
952                };
953                Some(Box::new(env))
954            }
955            "search" => {
956                let scenario_name = env_params
957                    .get("scenario")
958                    .and_then(|v| v.as_str())
959                    .unwrap_or("basic");
960                let env = match scenario_name {
961                    "basic" => SearchEnvironment::basic_scenario(),
962                    "medium" => SearchEnvironment::medium_scenario(),
963                    "large" => SearchEnvironment::large_scenario(),
964                    "custom" => {
965                        let file_count = env_params
966                            .get("file_count")
967                            .and_then(|v| v.as_u64())
968                            .unwrap_or(5) as usize;
969                        let target_index = env_params
970                            .get("target_index")
971                            .and_then(|v| v.as_u64())
972                            .unwrap_or(2) as usize;
973                        SearchEnvironment::custom_scenario(file_count, target_index, seed)
974                    }
975                    _ => SearchEnvironment::basic_scenario(),
976                };
977                Some(Box::new(env))
978            }
979            "internal_diagnosis" => {
980                let scenario_name = env_params
981                    .get("scenario")
982                    .and_then(|v| v.as_str())
983                    .unwrap_or("routing");
984                let env = match scenario_name {
985                    "routing" => InternalDiagnosisEnvironment::routing_error_scenario(),
986                    "failover" => InternalDiagnosisEnvironment::failover_error_scenario(),
987                    "worker_pool" => InternalDiagnosisEnvironment::worker_pool_scenario(),
988                    "strategy" => InternalDiagnosisEnvironment::strategy_mismatch_scenario(),
989                    "exploration" => InternalDiagnosisEnvironment::exploration_depth_scenario(),
990                    "complex" => InternalDiagnosisEnvironment::complex_scenario(seed),
991                    _ => InternalDiagnosisEnvironment::routing_error_scenario(),
992                };
993                Some(Box::new(env))
994            }
995            "deep_search" => {
996                let scenario_name = env_params
997                    .get("scenario")
998                    .and_then(|v| v.as_str())
999                    .unwrap_or("tech_question");
1000                let env = match scenario_name {
1001                    "tech_question" | _ => DeepSearchEnvironment::tech_question_scenario(),
1002                };
1003                Some(Box::new(env))
1004            }
1005            // Real filesystem environment (Bash, Read, Write, Grep, Glob)
1006            "default" | "realworld" => {
1007                use swarm_engine_core::environment::DefaultEnvironment;
1008                let working_dir = env_params
1009                    .get("working_dir")
1010                    .and_then(|v| v.as_str())
1011                    .map(std::path::PathBuf::from);
1012                let env = if let Some(dir) = working_dir {
1013                    DefaultEnvironment::with_working_dir(dir)
1014                } else {
1015                    DefaultEnvironment::new()
1016                };
1017                Some(Box::new(env))
1018            }
1019            _ => None, // Unknown env_type - no Environment inserted
1020        };
1021
1022        if let Some(env) = env_box {
1023            extensions.insert(env);
1024        }
1025
1026        // Insert DependencyGraph if specified (explicit or from scenario)
1027        let graph = self.dependency_graph.clone().or_else(|| {
1028            self.scenario.dependency_graph.as_ref().and_then(|cfg| {
1029                let action_names = self.scenario.actions.action_names();
1030                cfg.to_core_graph(&action_names)
1031            })
1032        });
1033        if let Some(g) = graph {
1034            extensions.insert(g);
1035        }
1036
1037        // Insert prior learning snapshot if available
1038        // OrchestratorBuilder.build() will call stats.load_prior()
1039        if let Some(ref prior) = self.prior_snapshot {
1040            extensions.insert(prior.clone());
1041        }
1042
1043        extensions
1044    }
1045
1046    /// Evaluate scenario conditions to determine success/failure
1047    ///
1048    /// Returns (success, termination_reason)
1049    fn evaluate_conditions(
1050        &self,
1051        metrics: &RunMetrics,
1052        environment_done: bool,
1053        timed_out: bool,
1054    ) -> (bool, TerminationReason) {
1055        let conditions = &self.scenario.conditions;
1056
1057        // 1. Check failure conditions first (any match = fail)
1058        for condition in &conditions.failure {
1059            if let Some(actual) =
1060                self.get_metric_value(&condition.metric, metrics, environment_done)
1061            {
1062                if condition.evaluate(&actual) {
1063                    return (false, TerminationReason::Failure);
1064                }
1065            }
1066        }
1067
1068        // 2. Handle timeout
1069        if timed_out {
1070            return match conditions.on_timeout {
1071                TimeoutBehavior::Fail => (false, TerminationReason::Timeout),
1072                TimeoutBehavior::PartialSuccess => {
1073                    // Check if success conditions are met
1074                    let success = self.check_success_conditions(metrics, environment_done);
1075                    (success, TerminationReason::Timeout)
1076                }
1077                TimeoutBehavior::MilestoneScore => {
1078                    // TODO: Implement milestone scoring
1079                    (false, TerminationReason::Timeout)
1080                }
1081            };
1082        }
1083
1084        // 3. Check success conditions (all must match)
1085        let success = self.check_success_conditions(metrics, environment_done);
1086        if success {
1087            (true, TerminationReason::Success)
1088        } else {
1089            // Not yet successful, but no failure conditions met either
1090            // This shouldn't happen if called after completion
1091            (false, TerminationReason::Stopped)
1092        }
1093    }
1094
1095    /// Check if all success conditions are met
1096    fn check_success_conditions(&self, metrics: &RunMetrics, environment_done: bool) -> bool {
1097        let conditions = &self.scenario.conditions;
1098
1099        // If no success conditions defined, consider successful
1100        if conditions.success.is_empty() {
1101            return true;
1102        }
1103
1104        // All conditions must pass
1105        conditions.success.iter().all(|condition| {
1106            self.get_metric_value(&condition.metric, metrics, environment_done)
1107                .map(|actual| condition.evaluate(&actual))
1108                .unwrap_or(false)
1109        })
1110    }
1111
1112    /// Get metric value by path (e.g., "environment.done", "task.success_rate")
1113    fn get_metric_value(
1114        &self,
1115        path: &str,
1116        metrics: &RunMetrics,
1117        environment_done: bool,
1118    ) -> Option<ConditionValue> {
1119        match path {
1120            // Environment metrics
1121            "environment.done" => Some(ConditionValue::Bool(environment_done)),
1122
1123            // Task metrics
1124            "task.total_ticks" | "total_ticks" => {
1125                Some(ConditionValue::Integer(metrics.task.total_ticks as i64))
1126            }
1127            "task.success_rate" | "success_rate" => {
1128                Some(ConditionValue::Float(metrics.task.success_rate))
1129            }
1130            "task.total_actions" | "total_actions" => {
1131                Some(ConditionValue::Integer(metrics.task.total_actions as i64))
1132            }
1133            "task.successful_actions" | "successful_actions" => Some(ConditionValue::Integer(
1134                metrics.task.successful_actions as i64,
1135            )),
1136
1137            // Performance metrics
1138            "performance.llm_error_rate" | "llm_error_rate" => {
1139                Some(ConditionValue::Float(metrics.performance.llm_error_rate))
1140            }
1141            "performance.llm_invocations" | "llm_invocations" => Some(ConditionValue::Integer(
1142                metrics.performance.llm_invocations as i64,
1143            )),
1144
1145            // Coordination metrics
1146            "coordination.manager_activations" | "manager_activations" => Some(
1147                ConditionValue::Integer(metrics.coordination.manager_activations as i64),
1148            ),
1149
1150            // Error metrics (from failed_actions)
1151            "errors.count" => {
1152                let failed = metrics
1153                    .task
1154                    .total_actions
1155                    .saturating_sub(metrics.task.successful_actions);
1156                Some(ConditionValue::Integer(failed as i64))
1157            }
1158
1159            // Unknown metric
1160            _ => None,
1161        }
1162    }
1163}
1164
1165// Wrapper to convert Box<dyn ManagerAgent> to impl ManagerAgent
1166struct DynManagerWrapper(Box<dyn ManagerAgent>);
1167
1168impl ManagerAgent for DynManagerWrapper {
1169    fn prepare(
1170        &self,
1171        context: &swarm_engine_core::agent::TaskContext,
1172    ) -> swarm_engine_core::agent::BatchDecisionRequest {
1173        self.0.prepare(context)
1174    }
1175
1176    fn finalize(
1177        &self,
1178        context: &swarm_engine_core::agent::TaskContext,
1179        responses: Vec<(
1180            swarm_engine_core::types::WorkerId,
1181            swarm_engine_core::agent::DecisionResponse,
1182        )>,
1183    ) -> swarm_engine_core::agent::ManagementDecision {
1184        self.0.finalize(context, responses)
1185    }
1186
1187    fn id(&self) -> swarm_engine_core::agent::ManagerId {
1188        self.0.id()
1189    }
1190
1191    fn name(&self) -> &str {
1192        self.0.name()
1193    }
1194}
1195
1196// Wrapper to convert Box<dyn BatchInvoker> to impl BatchInvoker
1197struct DynBatchInvokerWrapper(Box<dyn BatchInvoker>);
1198
1199impl BatchInvoker for DynBatchInvokerWrapper {
1200    fn invoke(
1201        &self,
1202        request: swarm_engine_core::agent::BatchDecisionRequest,
1203        extensions: &swarm_engine_core::extensions::Extensions,
1204    ) -> swarm_engine_core::agent::BatchInvokeResult {
1205        self.0.invoke(request, extensions)
1206    }
1207
1208    fn plan_dependencies(
1209        &self,
1210        task: &str,
1211        actions: &[ActionDef],
1212    ) -> Option<swarm_engine_core::exploration::DependencyGraph> {
1213        self.0.plan_dependencies(task, actions)
1214    }
1215
1216    fn name(&self) -> &str {
1217        self.0.name()
1218    }
1219}
1220
1221// Wrapper to convert Box<dyn OperatorProvider<NodeRules>> to impl OperatorProvider<NodeRules>
1222struct DynOperatorProviderWrapper(Box<dyn OperatorProvider<NodeRules>>);
1223
1224impl OperatorProvider<NodeRules> for DynOperatorProviderWrapper {
1225    fn provide(
1226        &self,
1227        rules: NodeRules,
1228        context: Option<
1229            &swarm_engine_core::exploration::ProviderContext<
1230                '_,
1231                swarm_engine_core::exploration::ActionNodeData,
1232                String,
1233                swarm_engine_core::exploration::MapNodeState,
1234            >,
1235        >,
1236    ) -> swarm_engine_core::exploration::ConfigurableOperator<NodeRules> {
1237        self.0.provide(rules, context)
1238    }
1239
1240    fn reevaluate(
1241        &self,
1242        operator: &mut swarm_engine_core::exploration::ConfigurableOperator<NodeRules>,
1243        ctx: &swarm_engine_core::exploration::ProviderContext<
1244            '_,
1245            swarm_engine_core::exploration::ActionNodeData,
1246            String,
1247            swarm_engine_core::exploration::MapNodeState,
1248        >,
1249    ) {
1250        self.0.reevaluate(operator, ctx)
1251    }
1252
1253    fn name(&self) -> &str {
1254        self.0.name()
1255    }
1256}