swarm-engine-core 0.1.6

Core types and orchestration for SwarmEngine
Documentation
//! Manager フェーズの処理
//!
//! - should_run_manager(): Manager 起動判定
//! - run_manager(): Manager 実行(Batch 対応、複数Manager並列処理)
//! - generate_exploration_guidances(): Manager なしでの Guidance 自動生成

use std::sync::Arc;

use tracing::debug;

use crate::agent::{DecisionResponse, Guidance, ManagementStrategy};
use crate::exploration::{GraphMap, ProviderContext};
use crate::types::WorkerId;

use super::Orchestrator;

impl Orchestrator {
    /// Manager 起動判定
    pub(super) fn should_run_manager(&self) -> bool {
        if self.managers.is_empty() {
            return false;
        }

        let current_tick = self.state.shared.tick;
        match &self.config.management_strategy {
            ManagementStrategy::EveryTick => true,
            ManagementStrategy::FixedInterval { interval } => {
                current_tick > 0 && current_tick.is_multiple_of(*interval)
            }
            ManagementStrategy::CompletionBased { max_wait_ticks } => {
                // Note: Worker の状態は WorkResult で Orchestrator が管理するため、
                // ここでは履歴の最新エントリから判定する(idle アクションの検出)
                let all_idle = self.state.workers.iter().all(|ctx| {
                    ctx.history
                        .latest()
                        .map(|e| e.action_name.ends_with(":idle"))
                        .unwrap_or(true) // 履歴がない場合は idle 扱い
                });
                // 全Managerの最小last_tickを使用
                let min_last_tick = self
                    .managers
                    .iter()
                    .map(|m| self.last_manager_ticks.get(&m.id()).copied().unwrap_or(0))
                    .min()
                    .unwrap_or(0);
                all_idle || (current_tick - min_last_tick) >= *max_wait_ticks
            }
            ManagementStrategy::Hybrid {
                preferred_interval,
                force_after_ticks,
            } => {
                // 全Managerの最小last_tickを使用
                let min_last_tick = self
                    .managers
                    .iter()
                    .map(|m| self.last_manager_ticks.get(&m.id()).copied().unwrap_or(0))
                    .min()
                    .unwrap_or(0);
                let ticks_since_last = current_tick - min_last_tick;
                ticks_since_last >= *preferred_interval || ticks_since_last >= *force_after_ticks
            }
            ManagementStrategy::EscalationBased {
                interval,
                immediate_on_escalation,
            } => {
                // 定期起動チェック
                let periodic = current_tick > 0 && current_tick.is_multiple_of(*interval);

                // Escalation チェック
                let has_escalation = if *immediate_on_escalation {
                    self.state
                        .workers
                        .iter()
                        .any(|ctx| ctx.escalation.is_some())
                } else {
                    false
                };

                periodic || has_escalation
            }
        }
    }

    /// Manager を実行(Batch 対応、複数Manager並列処理)
    ///
    /// Note: should_run_manager() で起動判定済みの前提
    /// Returns: ManagerPhaseSnapshot for recording
    ///
    /// # フロー
    ///
    /// ```text
    /// 1. Analyzer.analyze(state) → TaskContext
    /// 2. Manager.prepare(context) → BatchDecisionRequest
    /// 3. BatchInvoker.invoke(request) → Response
    /// 4. Manager.finalize(context, responses) → ManagementDecision
    /// ```
    pub(super) fn run_manager(&mut self) -> crate::state::ManagerPhaseSnapshot {
        let current_tick = self.state.shared.tick;

        // Phase 0: Analyzer で TaskContext を生成
        let mut context = self.analyzer.analyze(&self.state);

        // ========================================================================
        // ExplorationSpaceV2 でのノード選択(新アーキテクチャ)
        // ========================================================================
        let worker_count = self.state.workers.len();

        // AdaptiveProvider: Selection を動的に切り替え
        // エラー率に応じて UCB1 → Greedy/Thompson を自動選択
        if let Some(ref mut space_v2) = self.space_v2 {
            // SharedState.stats を使用(Single Source of Truth)
            let empty_map = GraphMap::new();
            let ctx = ProviderContext::new(&empty_map, &self.state.shared.stats);
            self.operator_provider
                .reevaluate(space_v2.operator_mut(), &ctx);
        }

        let v2_guidances: Vec<crate::agent::Guidance> = if let Some(ref space_v2) = self.space_v2 {
            // select_nodes() で MapNode を直接取得(SwarmStats を渡す)
            let nodes = space_v2.select_nodes(worker_count, &self.state.shared.stats);

            // MapNode → Guidance に直接変換
            let guidances: Vec<crate::agent::Guidance> = nodes
                .iter()
                .map(|node| crate::agent::Guidance::from(*node))
                .collect();

            debug!(
                selected_count = nodes.len(),
                actions = ?guidances.iter().map(|g| g.actions.first().map(|a| &a.name)).collect::<Vec<_>>(),
                frontiers = space_v2.frontiers().len(),
                operator = space_v2.operator_name(),
                "ExplorationSpaceV2: nodes selected and converted to Guidance"
            );

            // 完了判定
            if space_v2.is_complete() {
                debug!(
                    exhausted = space_v2.is_exhausted(),
                    completed = space_v2.has_completed(),
                    "ExplorationSpaceV2: exploration complete"
                );
            }

            guidances
        } else {
            vec![]
        };

        // V2 Guidance が生成されていれば、context に設定
        if !v2_guidances.is_empty() {
            context.v2_guidances = Some(v2_guidances);
        }

        // 前回の Guidance を TaskContext に注入
        // Manager.prepare() でこれを使って ResolvedContext に ManagerInstruction を埋め込む
        if !self.current_guidances.is_empty() {
            context.previous_guidances = self.current_guidances.clone();
            debug!(
                count = context.previous_guidances.len(),
                "Injected previous guidances into TaskContext"
            );
        }

        // 全 Manager から Guidance を収集してマージ
        let mut merged_guidances: std::collections::HashMap<WorkerId, Guidance> =
            std::collections::HashMap::new();

        // スナップショット用:最後のManagerのデータを記録
        let mut last_batch_request = crate::agent::BatchDecisionRequest {
            manager_id: crate::agent::ManagerId(0),
            requests: vec![],
        };
        let mut last_responses: Vec<(WorkerId, DecisionResponse)> = vec![];
        let mut llm_errors = 0u64;

        for manager in &self.managers {
            // Partitioning: この Manager に割り当てられた Worker のみの context を作成
            let manager_context =
                if let Some(assigned_workers) = self.get_assigned_workers(manager.id()) {
                    context.filter_for_workers(&assigned_workers)
                } else {
                    context.clone()
                };

            // Phase 1: TaskContext を見て Batch リクエスト生成
            let batch_request = manager.prepare(&manager_context);

            // Phase 2: LLM Batch Call
            let (responses, batch_llm_errors): (Vec<(WorkerId, DecisionResponse)>, u64) =
                if let Some(invoker) = &self.batch_invoker {
                    // BatchInvoker を使用(Extensions を渡す)
                    let mut error_count = 0u64;

                    // Worker ID -> query のマッピングを作成(エラー時のprompt保持用)
                    let query_map: std::collections::HashMap<WorkerId, String> = batch_request
                        .requests
                        .iter()
                        .map(|req| (req.worker_id, req.query.clone()))
                        .collect();

                    let responses: Vec<(WorkerId, DecisionResponse)> = invoker
                        .invoke(batch_request.clone(), &self.state.shared.extensions)
                        .into_iter()
                        .map(|(worker_id, result)| {
                            let response = result.unwrap_or_else(|e| {
                                error_count += 1;
                                eprintln!(
                                    "[LLM Error] W{}: Batch invoke error: {}",
                                    worker_id.0, e
                                );
                                // エラー時もpromptを保持(スナップショット表示用)
                                DecisionResponse {
                                    prompt: query_map.get(&worker_id).cloned(),
                                    ..Default::default()
                                }
                            });
                            (worker_id, response)
                        })
                        .collect();

                    // LLM 呼び出しを ActionEvent として記録
                    let current_tick = self.state.shared.tick;
                    for (worker_id, response) in &responses {
                        // エラー時は raw_response が None(DecisionResponse::default())
                        let success = response.raw_response.is_some();

                        let result = if success {
                            crate::events::ActionEventResult::success()
                        } else {
                            crate::events::ActionEventResult::failure("llm_error")
                        };

                        // LLM 呼び出し結果の概要を metadata に追加
                        // Note: prompt/response の全文が必要な場合は LlmDebugChannel を使用
                        let context = crate::events::ActionContext::new()
                            .with_metadata("tool", response.tool.clone())
                            .with_metadata("target", response.target.clone())
                            .with_metadata("confidence", response.confidence.to_string());

                        let event = crate::events::ActionEventBuilder::new(
                            current_tick,
                            *worker_id, // Worker ID を使用(どの Worker 向けの LLM 呼び出しか)
                            "llm_invoke",
                        )
                        .result(result)
                        .context(context)
                        .build();

                        self.state.shared.stats.record(&event);
                        if let Some(ref collector) = self.action_collector {
                            collector.record(event);
                        }
                    }

                    (responses, error_count)
                } else {
                    // BatchInvoker なし: デフォルトレスポンス
                    let responses = batch_request
                        .requests
                        .iter()
                        .map(|req| (req.worker_id, DecisionResponse::default()))
                        .collect();
                    (responses, 0)
                };
            llm_errors += batch_llm_errors;

            // スナップショット用に保存
            last_batch_request = batch_request;
            last_responses = responses.clone();

            // Phase 3: レスポンス処理(TaskContext と responses を渡す)
            let decision = manager.finalize(&manager_context, responses);

            // Guidance をマージ(後のManagerが優先、または actions を結合)
            for (worker_id, guidance) in decision.guidances {
                merged_guidances
                    .entry(worker_id)
                    .and_modify(|existing| {
                        // 既存の Guidance に新しい actions を追加
                        existing.actions.extend(guidance.actions.clone());
                        // content は後のManagerで上書き
                        if guidance.content.is_some() {
                            existing.content = guidance.content.clone();
                        }
                        // props はマージ
                        existing.props.extend(guidance.props.clone());
                    })
                    .or_insert(guidance);
            }

            // 戦略更新(最後のManagerの決定が優先)
            if let Some(new_strategy) = decision.strategy_update {
                self.config.management_strategy = new_strategy;
            }

            // Manager の last_tick を更新
            self.last_manager_ticks.insert(manager.id(), current_tick);
        }

        // マージした Guidance を Arc でラップして保存(クローン時のディープコピー回避)
        self.current_guidances = merged_guidances
            .iter()
            .map(|(id, g)| (*id, Arc::new(g.clone())))
            .collect();

        // スナップショットを返す
        crate::state::ManagerPhaseSnapshot {
            batch_request: last_batch_request,
            responses: last_responses,
            guidances: merged_guidances,
            llm_errors,
        }
    }

    /// Manager なしで ExplorationSpaceV2 から Guidance を自動生成
    ///
    /// Manager が起動しない Tick でも、ExplorationSpaceV2 があれば
    /// 探索ノードから Guidance を生成して Worker に割り当てる。
    /// これにより LLM 呼び出しなしで探索を継続できる。
    pub(super) fn generate_exploration_guidances(&mut self) {
        // Skip guidance generation if termination is pending
        if self.termination_judge.should_skip_guidance() {
            debug!("Skipping guidance generation: termination pending");
            return;
        }

        let worker_count = self.state.workers.len();

        // ExplorationSpaceV2 がない場合は何もしない
        let Some(ref mut space_v2) = self.space_v2 else {
            return;
        };

        // AdaptiveProvider: Selection を動的に切り替え
        // SharedState.stats を使用(Single Source of Truth)
        let empty_map = GraphMap::new();
        let ctx = ProviderContext::new(&empty_map, &self.state.shared.stats);
        self.operator_provider
            .reevaluate(space_v2.operator_mut(), &ctx);

        // select_nodes() で MapNode を取得し、Guidance に変換(SwarmStats を渡す)
        let nodes = space_v2.select_nodes(worker_count, &self.state.shared.stats);
        let guidances: Vec<Guidance> = nodes.iter().map(|node| Guidance::from(*node)).collect();

        if guidances.is_empty() {
            return;
        }

        debug!(
            selected_count = nodes.len(),
            actions = ?guidances.iter().map(|g| g.actions.first().map(|a| &a.name)).collect::<Vec<_>>(),
            "ExplorationSpaceV2: auto-generated guidances (no Manager)"
        );

        // Worker に Guidance を割り当て
        // Manager の場合と同様に、各 Worker に1つずつ割り当て
        let mut new_guidances = std::collections::HashMap::new();
        for (i, guidance) in guidances.into_iter().enumerate() {
            if i < worker_count {
                new_guidances.insert(WorkerId(i), Arc::new(guidance));
            }
        }

        // current_guidances を更新
        self.current_guidances = new_guidances;
    }
}