swarm-engine-core 0.1.6

Core types and orchestration for SwarmEngine
Documentation
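//! SwarmEngine AsyncTaskSystem
//!
//! Asynchronous task execution infrastructure. Runs operations that take anywhere from
//! one second to tens of seconds (web search, LLM calls, external APIs, file I/O, ...)
//! without blocking the tick loop.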

use std::any::Any;
use std::collections::HashMap;
use std::future::Future;
use std::pin::Pin;
use std::time::Duration;

use crossbeam_channel::{Receiver, Sender};
use tokio::runtime::Handle;

use crate::types::TaskId;

/// Task status.
///
/// Stored in `TaskMetadata::status` to describe the state a task result is in.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum TaskStatus {
    /// Not assigned by the code visible here; presumably "queued, not yet started" (confirm with callers).
    Pending,
    /// Not assigned by the code visible here; presumably "currently executing" (confirm with callers).
    Running,
    /// Status set by `AsyncTaskResult::success`.
    Completed,
    /// Status set by `AsyncTaskResult::failure`.
    Failed,
    /// Status set by `AsyncTaskResult::timeout`.
    Timeout,
}

/// Abstraction over an asynchronous task; users are free to provide their own implementations.
pub trait AsyncTask: Send + Sync + 'static {
    /// Kind of task (used for logging and debugging).
    fn task_type(&self) -> &str;

    /// Runs the task asynchronously.
    ///
    /// Returns a boxed `Send` future resolving to the task's `AsyncTaskResult`. The boxed
    /// trait object carries no lifetime tied to `&self`, so implementations must move or
    /// copy whatever state they need into the future rather than borrow from `self`.
    fn execute(&self) -> Pin<Box<dyn Future<Output = AsyncTaskResult> + Send>>;

    /// Timeout for the task (default: 30 seconds).
    fn timeout(&self) -> Duration {
        Duration::from_secs(30)
    }
}

/// Result of executing an asynchronous task.
pub struct AsyncTaskResult {
    /// Task ID (lets the issuer match the result to the task it spawned).
    pub task_id: TaskId,
    /// Task kind (e.g. "web_search", "llm_call").
    pub task_type: String,
    /// Result data (type-erased). `None` for results built by `failure` / `timeout`.
    pub payload: Option<Box<dyn Any + Send>>,
    /// Metadata.
    pub metadata: TaskMetadata,
}

impl AsyncTaskResult {
    pub fn success<T: Any + Send + 'static>(
        task_id: TaskId,
        task_type: impl Into<String>,
        payload: T,
        duration: Duration,
    ) -> Self {
        Self {
            task_id,
            task_type: task_type.into(),
            payload: Some(Box::new(payload)),
            metadata: TaskMetadata {
                duration,
                status: TaskStatus::Completed,
                error: None,
            },
        }
    }

    pub fn failure(
        task_id: TaskId,
        task_type: impl Into<String>,
        error: String,
        duration: Duration,
    ) -> Self {
        Self {
            task_id,
            task_type: task_type.into(),
            payload: None,
            metadata: TaskMetadata {
                duration,
                status: TaskStatus::Failed,
                error: Some(AsyncTaskError { message: error }),
            },
        }
    }

    pub fn timeout(task_id: TaskId, task_type: impl Into<String>, duration: Duration) -> Self {
        Self {
            task_id,
            task_type: task_type.into(),
            payload: None,
            metadata: TaskMetadata {
                duration,
                status: TaskStatus::Timeout,
                error: Some(AsyncTaskError {
                    message: "Task timed out".to_string(),
                }),
            },
        }
    }
}

/// Task metadata.
pub struct TaskMetadata {
    /// Execution time.
    pub duration: Duration,
    /// Success / failure status.
    pub status: TaskStatus,
    /// Error information (on failure).
    pub error: Option<AsyncTaskError>,
}

/// Error produced by an asynchronous task.
///
/// Its `Display` output is `Async task error: {message}`.
#[derive(Debug, Clone, thiserror::Error)]
#[error("Async task error: {message}")]
pub struct AsyncTaskError {
    /// Description of what went wrong.
    pub message: String,
}

impl From<crate::error::SwarmError> for AsyncTaskError {
    fn from(err: crate::error::SwarmError) -> Self {
        Self {
            message: err.message().to_string(),
        }
    }
}

impl From<AsyncTaskError> for crate::error::SwarmError {
    fn from(err: AsyncTaskError) -> Self {
        crate::error::SwarmError::AsyncTask {
            message: err.message,
        }
    }
}

/// Asynchronous task system.
pub struct AsyncTaskSystem {
    /// Sending half of the result channel.
    result_tx: Sender<AsyncTaskResult>,
    /// Receiving half of the result channel.
    result_rx: Receiver<AsyncTaskResult>,
    /// Tokio runtime handle.
    runtime: Handle,
    /// Registered task factories, keyed by the name given at registration.
    factories: HashMap<String, Box<dyn AsyncTaskFactory>>,
}

impl AsyncTaskSystem {
    /// Creates a system that spawns onto `runtime` and reports results through an
    /// unbounded channel. No factories are registered initially.
    pub fn new(runtime: Handle) -> Self {
        let (result_tx, result_rx) = crossbeam_channel::unbounded();
        Self {
            result_tx,
            result_rx,
            runtime,
            factories: HashMap::new(),
        }
    }

    /// Issues a task (from a Worker/Manager) and returns the ID assigned to it.
    pub fn spawn<T: AsyncTask>(&self, task: T) -> TaskId {
        let boxed: Box<dyn AsyncTask> = Box::new(task);
        self.spawn_boxed(boxed)
    }

    /// Issues an already boxed task (e.g. one produced by a factory).
    ///
    /// The task's future runs on the runtime under the task's own `timeout()`. Whatever
    /// the outcome, a result is sent to the result channel; its `task_id`, `task_type` and
    /// `metadata.duration` are always the values determined here, not the ones the task
    /// filled in. A failed send is ignored.
    pub fn spawn_boxed(&self, task: Box<dyn AsyncTask>) -> TaskId {
        let task_id = TaskId::new();
        let sender = self.result_tx.clone();
        let limit = task.timeout();
        let kind = task.task_type().to_string();

        self.runtime.spawn(async move {
            let started = std::time::Instant::now();
            let outcome = tokio::time::timeout(limit, task.execute()).await;
            let elapsed = started.elapsed();

            let report = if let Ok(mut finished) = outcome {
                // Stamp the result with the identity and timing tracked by the system.
                finished.task_id = task_id;
                finished.task_type = kind;
                finished.metadata.duration = elapsed;
                finished
            } else {
                AsyncTaskResult::timeout(task_id, kind, elapsed)
            };

            let _ = sender.send(report);
        });

        task_id
    }

    /// Collects the results of all tasks that have finished so far (non-blocking).
    pub fn collect_results(&self) -> Vec<AsyncTaskResult> {
        self.result_rx.try_iter().collect()
    }

    /// Registers a task factory under `name` (user extension point).
    pub fn register_factory<F: AsyncTaskFactory + 'static>(&mut self, name: &str, factory: F) {
        let boxed: Box<dyn AsyncTaskFactory> = Box::new(factory);
        self.factories.insert(name.to_owned(), boxed);
    }

    /// Creates a task from the factory registered as `name`, or `None` if there is none.
    pub fn create_task(&self, name: &str, params: TaskParams) -> Option<Box<dyn AsyncTask>> {
        let factory = self.factories.get(name)?;
        Some(factory.create(params))
    }
}

/// Factory for creating tasks (user extension point).
pub trait AsyncTaskFactory: Send + Sync {
    /// Builds a new boxed task from `params`.
    fn create(&self, params: TaskParams) -> Box<dyn AsyncTask>;
}

/// Task parameters.
#[derive(Debug, Clone, Default)]
pub struct TaskParams {
    /// String key/value pairs passed to a factory; which keys are expected is up to each factory.
    pub data: HashMap<String, String>,
}

/// Asynchronous execution settings.
#[derive(Debug, Clone)]
pub struct AsyncConfig {
    /// Maximum number of concurrently running tasks.
    pub max_concurrent: usize,
    /// Default timeout, in seconds.
    pub default_timeout_secs: u64,
}

impl Default for AsyncConfig {
    /// Defaults: 100 concurrent tasks and a 30-second timeout.
    fn default() -> Self {
        const MAX_CONCURRENT: usize = 100;
        const TIMEOUT_SECS: u64 = 30;

        AsyncConfig {
            max_concurrent: MAX_CONCURRENT,
            default_timeout_secs: TIMEOUT_SECS,
        }
    }
}

// ============================================================================
// Built-in AsyncTask examples
// ============================================================================

/// Simple delay task (for testing).
///
/// Sleeps for `delay`, then succeeds with a copy of `result` as its payload.
pub struct DelayTask {
    /// How long to sleep before completing.
    pub delay: Duration,
    /// String returned as the payload of the successful result.
    pub result: String,
}

impl AsyncTask for DelayTask {
    fn task_type(&self) -> &str {
        "delay"
    }

    fn execute(&self) -> Pin<Box<dyn Future<Output = AsyncTaskResult> + Send>> {
        // Copy the state out of `self`: the returned future cannot borrow from it.
        let delay = self.delay;
        let result = self.result.clone();

        Box::pin(async move {
            tokio::time::sleep(delay).await;
            // The ID and duration given here are placeholders when the task runs through
            // `AsyncTaskSystem::spawn_boxed`, which overwrites both on the finished result.
            AsyncTaskResult::success(TaskId::new(), "delay", result, delay)
        })
    }

    /// The configured delay plus one second of headroom.
    ///
    /// Uses saturating arithmetic so that a very large `delay` (up to `Duration::MAX`)
    /// yields `Duration::MAX` instead of panicking on overflow.
    fn timeout(&self) -> Duration {
        self.delay.saturating_add(Duration::from_secs(1))
    }
}