Skip to main content

swarm_engine_core/orchestrator/
mod.rs

1//! SwarmEngine Orchestrator
2//!
3//! メインループを管理し、Tick駆動でAgent Swarmを実行する。
4//!
5//! # DependencyGraph 自動判別
6//!
7//! DependencyGraph(タスク依存関係グラフ)は以下の優先順位で**自動的に**取得・生成される:
8//!
9//! 1. **DependencyGraphProvider** - 明示的に設定されたプロバイダーを使用
10//! 2. **BatchInvoker.plan_dependencies()** - LLM がタスクとアクション一覧から依存関係を推論
11//! 3. **Extensions に登録された DependencyGraph** - 静的な依存関係グラフ
12//!
13//! シナリオファイル(TOML)で `dependency_graph` を設定しなくても、
14//! `batch_invoker` が設定されていれば LLM が自動生成を試みる。
15//! 生成に失敗した場合(LLM が適切なグラフを出力できない等)はエラーとなる。
16//!
17//! ## 例:自動生成が成功するケース
18//!
19//! ```text
20//! Task: "Diagnose and restart the failing service"
21//! Actions: ["CheckStatus", "ReadLogs", "Diagnose", "Restart"]
22//! → LLM が CheckStatus → ReadLogs → Diagnose → Restart の依存関係を推論
23//! ```
24//!
25//! ## 例:自動生成が失敗するケース
26//!
27//! ```text
28//! Task: "Navigate through the maze"
29//! Actions: ["Move", "Look", "Wait"]
30//! → LLM が適切な依存関係を推論できない場合がある
31//! → シナリオファイルで明示的に dependency_graph を設定する必要がある
32//! ```
33
34mod adapter;
35mod builder;
36mod config;
37mod execution;
38mod lifecycle;
39mod manager;
40mod merge;
41pub mod termination;
42
43pub use adapter::WorkResultAdapter;
44pub use builder::OrchestratorBuilder;
45pub use config::{SwarmConfig, SwarmResult};
46pub use termination::{TerminationConfig, TerminationJudge, TerminationVerdict};
47
48use std::sync::Arc;
49use std::time::Instant;
50
51use tracing::{debug, info};
52
53use crate::actions::ActionDef;
54use crate::agent::{Analyzer, BatchInvoker, DefaultAnalyzer, ManagerAgent, WorkerAgent};
55use crate::async_task::AsyncTaskSystem;
56use crate::error::SwarmError;
57use crate::events::{ActionEventPublisher, LearningEventChannel, LifecycleHook};
58use crate::exploration::{
59    ActionNodeData, AdaptiveOperatorProvider, ConfigurableSpace, DependencyGraphProvider,
60    MapNodeState, NodeRules, OperatorProvider,
61};
62use crate::state::SwarmState;
63use crate::types::{SwarmTask, WorkerId};
64
65/// Orchestrator - メインループ管理
66pub struct Orchestrator {
67    /// 共有状態
68    pub(crate) state: SwarmState,
69    /// Worker Agents
70    pub(crate) workers: Vec<Box<dyn WorkerAgent>>,
71    /// Manager Agents(複数対応)
72    pub(crate) managers: Vec<Box<dyn ManagerAgent>>,
73    /// Analyzer(SwarmState → TaskContext)
74    pub(crate) analyzer: Box<dyn Analyzer>,
75    /// Batch Invoker(LLM 推論、オプション)
76    pub(crate) batch_invoker: Option<Box<dyn BatchInvoker>>,
77    /// DependencyGraph Provider(ExplorationSpace 自動生成用)
78    pub(crate) dependency_provider: Option<Box<dyn DependencyGraphProvider>>,
79    /// 非同期タスクシステム
80    pub(crate) async_system: AsyncTaskSystem,
81    /// 設定
82    pub(crate) config: SwarmConfig,
83    /// Termination Judge - Single Source of Truth for termination decisions
84    pub(crate) termination_judge: TerminationJudge,
85    /// Manager 最終起動Tick(Manager ID -> Tick)
86    pub(crate) last_manager_ticks: std::collections::HashMap<crate::agent::ManagerId, u64>,
87    /// 現在の Guidance(Manager から配布、次 Tick でクリア)
88    /// Arc で共有することでクローン時のディープコピーを回避
89    pub(crate) current_guidances: std::collections::HashMap<WorkerId, Arc<crate::agent::Guidance>>,
90    /// Worker のパーティション割り当て(Manager ID -> Worker IDs)
91    /// None の場合は全 Manager が全 Worker を担当
92    pub(crate) worker_assignments:
93        Option<std::collections::HashMap<crate::agent::ManagerId, Vec<WorkerId>>>,
94
95    // ========================================================================
96    // Exploration V2 (新アーキテクチャ)
97    // ========================================================================
98    /// ExplorationSpace V2 - 新しい探索空間
99    /// GraphMap + Operator ベースの統合レイヤー
100    pub(crate) space_v2: Option<ConfigurableSpace<NodeRules>>,
101
102    /// OperatorProvider(Selection 動的切り替え用)
103    ///
104    /// AdaptiveProvider: エラー率に応じて UCB1 → Greedy/Thompson を自動切替
105    /// ConfigBasedProvider: 固定の Selection を使用
106    pub(crate) operator_provider: Box<dyn OperatorProvider<NodeRules>>,
107
108    /// ActionEventPublisher(行動イベント配信、オプション)
109    pub(crate) action_collector: Option<ActionEventPublisher>,
110
111    /// LearnedProvider(学習済みデータへのアクセス、オプション)
112    pub(crate) learned_provider: Option<crate::learn::SharedLearnedProvider>,
113
114    /// LifecycleHook(開始・終了時のフック)
115    pub(crate) lifecycle_hook: Option<Box<dyn LifecycleHook>>,
116}
117
118impl Orchestrator {
119    /// 新規作成
120    pub fn new(
121        workers: Vec<Box<dyn WorkerAgent>>,
122        config: SwarmConfig,
123        runtime: tokio::runtime::Handle,
124    ) -> Self {
125        let agent_count = workers.len();
126        let termination_config = TerminationConfig::with_max_ticks(config.max_ticks);
127        Self {
128            state: SwarmState::new(agent_count),
129            workers,
130            managers: Vec::new(),
131            analyzer: Box::new(DefaultAnalyzer::new()),
132            batch_invoker: None,
133            dependency_provider: None,
134            async_system: AsyncTaskSystem::new(runtime),
135            config,
136            termination_judge: TerminationJudge::new(termination_config, agent_count),
137            last_manager_ticks: std::collections::HashMap::new(),
138            current_guidances: std::collections::HashMap::new(),
139            worker_assignments: None,
140            space_v2: None,
141            operator_provider: Box::new(AdaptiveOperatorProvider::default()),
142            action_collector: None,
143            learned_provider: None,
144            lifecycle_hook: None,
145        }
146    }
147
148    /// Analyzer を設定
149    pub fn with_analyzer(mut self, analyzer: Box<dyn Analyzer>) -> Self {
150        self.analyzer = analyzer;
151        self
152    }
153
154    /// Manager を追加
155    pub fn add_manager(mut self, manager: Box<dyn ManagerAgent>) -> Self {
156        self.managers.push(manager);
157        self
158    }
159
160    /// BatchInvoker を設定
161    pub fn with_batch_invoker(mut self, invoker: Box<dyn BatchInvoker>) -> Self {
162        self.batch_invoker = Some(invoker);
163        self
164    }
165
166    /// DependencyGraphProvider を設定
167    ///
168    /// run_task() 時に ExplorationSpace がない場合、
169    /// このプロバイダーを使って DependencyGraph を自動生成する。
170    pub fn with_dependency_provider(mut self, provider: Box<dyn DependencyGraphProvider>) -> Self {
171        self.dependency_provider = Some(provider);
172        self
173    }
174
175    /// Worker パーティショニングを有効化
176    ///
177    /// 各 Manager に Worker を均等に割り当てる。
178    /// Manager ごとに担当 Worker のみの TaskContext を渡すため、
179    /// 大規模システムでのスケーラビリティが向上する。
180    ///
181    /// # 呼び出しタイミング
182    ///
183    /// 全ての Manager を追加した後に呼び出すこと。
184    pub fn enable_partitioning(&mut self) {
185        if self.managers.is_empty() {
186            return;
187        }
188
189        let worker_count = self.workers.len();
190        let manager_count = self.managers.len();
191        let workers_per_manager = worker_count.div_ceil(manager_count);
192
193        let mut assignments = std::collections::HashMap::new();
194        let all_worker_ids: Vec<WorkerId> = (0..worker_count).map(WorkerId).collect();
195
196        for (i, manager) in self.managers.iter().enumerate() {
197            let start = i * workers_per_manager;
198            let end = ((i + 1) * workers_per_manager).min(worker_count);
199            let assigned: Vec<WorkerId> = all_worker_ids[start..end].to_vec();
200            assignments.insert(manager.id(), assigned);
201        }
202
203        self.worker_assignments = Some(assignments);
204    }
205
206    /// 特定の Manager に割り当てられた Worker IDs を取得
207    pub(crate) fn get_assigned_workers(
208        &self,
209        manager_id: crate::agent::ManagerId,
210    ) -> Option<Vec<WorkerId>> {
211        self.worker_assignments
212            .as_ref()
213            .and_then(|assignments| assignments.get(&manager_id).cloned())
214    }
215
216    /// DependencyGraph への参照を取得
217    ///
218    /// ExplorationSpaceV2 に設定されている DependencyGraph を返す。
219    /// 学習データ保存時に action_order を抽出するために使用。
220    pub fn dependency_graph(&self) -> Option<&crate::exploration::DependencyGraph> {
221        self.space_v2
222            .as_ref()
223            .and_then(|space| space.dependency_graph())
224    }
225
226    /// タスクを設定してメインループを実行
227    ///
228    /// SwarmTask を Extensions に登録してから run() を実行する。
229    /// Manager は `state.shared.extensions.get::<SwarmTask>()` でタスクを読み取れる。
230    ///
231    /// # Errors
232    ///
233    /// - `SwarmError::MissingDependencyGraph`: DependencyGraph を生成できなかった
234    ///
235    /// # Example
236    ///
237    /// ```ignore
238    /// let task = SwarmTask::new("Find the auth handler")
239    ///     .with_target_path("/path/to/repo");
240    ///
241    /// let result = orchestrator.run_task(task)?;
242    /// ```
243    pub fn run_task(&mut self, task: SwarmTask) -> Result<SwarmResult, SwarmError> {
244        // タスクを Extensions に登録
245        self.state.shared.extensions.insert(task.clone());
246
247        // ActionsConfig からアクション定義一覧を取得
248        let actions: Vec<ActionDef> = self
249            .state
250            .shared
251            .extensions
252            .get::<crate::actions::ActionsConfig>()
253            .map(|cfg| cfg.all_actions().cloned().collect())
254            .unwrap_or_default();
255
256        // ExplorationSpace がない場合、DependencyGraphProvider で自動生成
257        // DependencyGraph は必須。生成できない場合はエラー。
258        self.ensure_exploration_space(&task.goal, &actions)?;
259
260        // initial_context を取得(V2 の initialize() 用)
261        // task.context["initial_context"] が配列なら使用、なければ target_service を使用
262        let initial_contexts: Vec<String> = task
263            .context
264            .get("initial_context")
265            .and_then(|v| v.as_array())
266            .map(|arr| {
267                arr.iter()
268                    .filter_map(|v| v.as_str().map(|s| s.to_string()))
269                    .collect()
270            })
271            .unwrap_or_else(|| {
272                // フォールバック: target_service を単一の初期コンテキストとして使用
273                task.context
274                    .get("target_service")
275                    .and_then(|v| v.as_str())
276                    .map(|s| vec![s.to_string()])
277                    .unwrap_or_default()
278            });
279
280        // ========================================================================
281        // ExplorationSpaceV2 の初期化
282        // ========================================================================
283        if let Some(ref mut space_v2) = self.space_v2 {
284            // ルートノード作成
285            let root_id = space_v2.create_root(ActionNodeData::new("root"));
286            debug!(root_id = ?root_id, "ExplorationSpaceV2 root node created");
287
288            // initial_context に基づいて初期ノードを展開
289            if !initial_contexts.is_empty() {
290                let ctx_refs: Vec<&str> = initial_contexts.iter().map(|s| s.as_str()).collect();
291                let results = space_v2.initialize(&ctx_refs);
292                info!(
293                    initial_contexts = ?initial_contexts,
294                    expanded_nodes = results.len(),
295                    "ExplorationSpaceV2: initial nodes expanded via Rules"
296                );
297
298                // 初期ノード展開後、ルートノードを Close
299                // これによりルートがフロンティアから除外され、初期アクションが選択対象になる
300                if !results.is_empty() {
301                    space_v2.map_mut().set_state(root_id, MapNodeState::Closed);
302                    debug!(root_id = ?root_id, "ExplorationSpaceV2 root node closed after initialization");
303                }
304            }
305        }
306
307        Ok(self.run())
308    }
309
310    /// メインループを実行
311    pub fn run(&mut self) -> SwarmResult {
312        let start = Instant::now();
313        let worker_count = self.workers.len();
314        info!(worker_count = worker_count, "system_start");
315
316        // LifecycleHook: on_start
317        if let Some(ref mut hook) = self.lifecycle_hook {
318            hook.on_start(worker_count);
319        }
320
321        loop {
322            let tick_start = Instant::now();
323            let current_tick = self.state.shared.tick;
324
325            // Update termination judge's tick
326            self.termination_judge.set_tick(current_tick);
327
328            // Tick 開始イベント(telemetry 用)
329            info!(tick = current_tick, "tick_start");
330
331            // LearningEventChannel の current_tick を更新
332            LearningEventChannel::global().set_tick(current_tick);
333
334            // TickStart ActionEvent を発行
335            {
336                let event = crate::events::ActionEventBuilder::new(
337                    current_tick,
338                    crate::types::WorkerId::MANAGER,
339                    "tick_start",
340                )
341                .result(crate::events::ActionEventResult::success())
342                .build();
343                self.state.shared.stats.record(&event);
344                if let Some(ref collector) = self.action_collector {
345                    collector.record(event);
346                }
347            }
348
349            // 1. 非同期結果があれば取り込み
350            self.collect_async_results();
351
352            // 2. Manager 起動判定 & Guidance 更新
353            // Note: manager_activation と llm_errors は ActionEvent で記録される
354            // (manager.rs で llm_invoke イベントを発行)
355            if self.should_run_manager() {
356                let _ = self.run_manager();
357            } else {
358                // Manager が起動しない Tick でも、ExplorationSpaceV2 があれば Guidance を自動生成
359                // これにより、LLM 呼び出しなしで探索を継続できる
360                self.generate_exploration_guidances();
361            }
362
363            // 3. Execute Phase: 各 Worker が並列実行
364            let results = self.execute_workers();
365
366            // 4. Merge Phase: 結果を State に反映
367            self.merge_results(&results);
368
369            // 4.5. SharedData クリーンアップ(メモリリーク防止)
370            self.state.shared.shared_data.cleanup_env_entries();
371
372            // 5. Tick更新
373            self.state.advance_tick();
374            // Update termination judge's tick for correct termination check
375            self.termination_judge.set_tick(self.state.shared.tick);
376
377            // 6. 次Tickまで待機
378            let elapsed = tick_start.elapsed();
379            if elapsed < self.config.tick_duration {
380                std::thread::sleep(self.config.tick_duration - elapsed);
381            }
382
383            // 平均Tick時間を更新(EMA with α=0.1)
384            // EMA = α × current + (1-α) × EMA_old = (current + 9 × EMA_old) / 10
385            let current_ns = elapsed.as_nanos() as u64;
386            let prev_avg = self.state.shared.avg_tick_duration_ns;
387            self.state.shared.avg_tick_duration_ns = if prev_avg == 0 {
388                current_ns // 初回は現在値をそのまま使用
389            } else {
390                (current_ns + 9 * prev_avg) / 10
391            };
392
393            // LearningEventChannel の sync_buffer をクリア
394            // (LearningEvent は LearningEventSubscriber が処理するため、ここでは drain のみ)
395            let _ = LearningEventChannel::global().drain_sync();
396
397            // TickEnd ActionEvent を発行(duration を metadata に含める)
398            {
399                let event = crate::events::ActionEventBuilder::new(
400                    current_tick,
401                    crate::types::WorkerId::MANAGER,
402                    "tick_end",
403                )
404                .duration(elapsed)
405                .result(crate::events::ActionEventResult::success())
406                .context(
407                    crate::events::ActionContext::new()
408                        .with_metadata("duration_ns", elapsed.as_nanos().to_string()),
409                )
410                .build();
411                self.state.shared.stats.record(&event);
412                if let Some(ref collector) = self.action_collector {
413                    collector.record(event);
414                }
415            }
416
417            // Tick 完了イベント(telemetry 用)
418            info!(
419                tick = current_tick,
420                duration_ns = elapsed.as_nanos() as u64,
421                total_actions = self.state.shared.stats.total_visits(),
422                successful_actions = self.state.shared.stats.total_successes(),
423                failed_actions = self.state.shared.stats.total_failures(),
424                active_workers = self.workers.len() as u64,
425                "tick_complete"
426            );
427
428            // 終了判定
429            if self.should_terminate() {
430                break;
431            }
432        }
433
434        let total_duration = start.elapsed();
435
436        // システム停止イベント(telemetry 用)
437        info!(
438            total_ticks = self.state.shared.tick,
439            total_duration_ms = total_duration.as_millis() as u64,
440            "system_stop"
441        );
442
443        let result = SwarmResult {
444            total_ticks: self.state.shared.tick,
445            total_duration,
446            completed: true,
447        };
448
449        // LifecycleHook: on_terminate
450        if let Some(ref mut hook) = self.lifecycle_hook {
451            hook.on_terminate(&self.state, &result);
452        }
453
454        result
455    }
456
457    /// 外部から終了を要求
458    pub fn request_terminate(&mut self) {
459        self.termination_judge.request_terminate("External request");
460    }
461
462    /// TerminationJudge への参照を取得
463    pub fn termination_judge(&self) -> &TerminationJudge {
464        &self.termination_judge
465    }
466
467    /// TerminationJudge への可変参照を取得
468    pub fn termination_judge_mut(&mut self) -> &mut TerminationJudge {
469        &mut self.termination_judge
470    }
471
472    /// State への参照を取得
473    pub fn state(&self) -> &SwarmState {
474        &self.state
475    }
476
477    /// AsyncTaskSystem への参照を取得
478    pub fn async_system(&self) -> &AsyncTaskSystem {
479        &self.async_system
480    }
481
482    /// LearnedProvider への参照を取得
483    pub fn learned_provider(&self) -> Option<&crate::learn::SharedLearnedProvider> {
484        self.learned_provider.as_ref()
485    }
486}