swarm-engine-core 0.1.6

Core types and orchestration for SwarmEngine
Documentation
//! Worker 実行フェーズの処理
//!
//! - collect_async_results(): 非同期タスク結果の収集
//! - execute_workers(): Worker の並列実行

use tracing::debug;

use crate::agent::WorkResult;
use crate::state::CompletedAsyncTask;
use crate::types::WorkerId;

use super::Orchestrator;

impl Orchestrator {
    /// Collects finished async task results and folds them into the orchestrator state.
    ///
    /// For every result returned by `async_system.collect_results()`:
    /// 1. The first worker whose `pending_tasks` contains the task id is treated as the
    ///    issuer, and `complete_task` is called on its context.
    /// 2. A [`CompletedAsyncTask`] record stamped with the current tick is pushed onto
    ///    `shared_data.completed_async_tasks`. Its `worker_id` is `None` when no worker
    ///    was waiting on the task.
    /// 3. If the payload downcasts to `String`, its bytes are stored in `shared_data.kv`
    ///    under the key `async_payload:<task id>`. Payloads of any other type are not
    ///    stored; that case is reported at debug level instead of being dropped silently.
    pub(super) fn collect_async_results(&mut self) {
        let results = self.async_system.collect_results();
        let current_tick = self.state.shared.tick;

        for result in results {
            debug!(
                "Async task {} ({}) completed: {:?}",
                result.task_id.0, result.task_type, result.metadata.status
            );

            // The task id is reused several times below; bind it once.
            let task_id = result.task_id;

            // Locate the issuing worker (first match only) and clear the task from its
            // pending set. The worker's index in `state.workers` doubles as its id.
            let worker_id = self
                .state
                .workers
                .iter_mut()
                .enumerate()
                .find(|(_, ctx)| ctx.pending_tasks.contains(&task_id))
                .map(|(idx, ctx)| {
                    ctx.complete_task(task_id);
                    WorkerId(idx)
                });

            // Record the task's metadata in the shared data.
            let completed = CompletedAsyncTask {
                task_id,
                worker_id,
                task_type: result.task_type,
                completed_at_tick: current_tick,
                status: result.metadata.status,
                error: result.metadata.error.map(|e| e.message),
            };
            self.state
                .shared
                .shared_data
                .completed_async_tasks
                .push(completed);

            // Only `String` payloads can be stored in the byte-oriented kv store.
            if let Some(payload) = result.payload {
                match payload.downcast_ref::<String>() {
                    Some(s) => {
                        let key = format!("async_payload:{}", task_id.0);
                        self.state
                            .shared
                            .shared_data
                            .kv
                            .insert(key, s.as_bytes().to_vec());
                    }
                    None => debug!(
                        "Async task {} payload is not a String; not stored in kv",
                        task_id.0
                    ),
                }
            }
        }
    }

    /// Runs `think_and_act` for every worker in parallel using rayon's `par_iter`.
    ///
    /// Each worker receives a shared reference to the current state and, if one is
    /// registered for its id in `current_guidances`, a reference to its guidance.
    /// The method only takes `&self`; no locking is performed here.
    ///
    /// Returns one `(index, result)` pair per worker, where `index` is the worker's
    /// position in `self.workers`.
    pub(super) fn execute_workers(&self) -> Vec<(usize, WorkResult)> {
        use rayon::prelude::*;

        self.workers
            .par_iter()
            .enumerate()
            .map(|(index, worker)| {
                // The map stores shared guidance handles; borrow the inner value for the call.
                let guidance = self
                    .current_guidances
                    .get(&worker.id())
                    .map(|shared| shared.as_ref());
                (index, worker.think_and_act(&self.state, guidance))
            })
            .collect()
    }
}