swarm-engine-core 0.1.6

Core types and orchestration for SwarmEngine
Documentation
//! SwarmLayer - tracing Layer による観測層
//!
//! Orchestrator が発行する tracing イベントを SwarmEvent に変換し、
//! broadcast channel で Consumer に配信する。

use tokio::sync::broadcast;
use tracing::field::{Field, Visit};
use tracing::Subscriber;
use tracing_subscriber::layer::Context;
use tracing_subscriber::Layer;

use super::event::{ManagerState, SwarmEvent, TickMetrics};
use crate::types::TaskId;

/// Swarm イベントを配信する tracing Layer
pub struct SwarmLayer {
    tx: broadcast::Sender<SwarmEvent>,
}

impl SwarmLayer {
    /// 新規作成
    ///
    /// # Arguments
    /// * `capacity` - broadcast channel のバッファサイズ
    ///
    /// # Returns
    /// * `SwarmLayer` - tracing Layer
    /// * `broadcast::Receiver<SwarmEvent>` - 最初の Receiver
    pub fn new(capacity: usize) -> (Self, broadcast::Receiver<SwarmEvent>) {
        let (tx, rx) = broadcast::channel(capacity);
        (Self { tx }, rx)
    }

    /// 追加の Receiver を取得
    pub fn subscribe(&self) -> broadcast::Receiver<SwarmEvent> {
        self.tx.subscribe()
    }

    /// Sender を Arc でラップして返す(Layer 登録後にも subscribe 可能にする)
    pub fn sender(&self) -> broadcast::Sender<SwarmEvent> {
        self.tx.clone()
    }
}

/// tracing イベントから値を抽出する Visitor
struct SwarmEventVisitor {
    // 共通フィールド
    message: Option<String>,

    // tick 関連
    tick: Option<u64>,
    duration_ns: Option<u64>,

    // metrics 関連
    total_actions: Option<u64>,
    successful_actions: Option<u64>,
    failed_actions: Option<u64>,
    active_workers: Option<usize>,

    // manager 関連
    manager_id: Option<usize>,
    manager_state: Option<String>,

    // worker 関連
    worker_id: Option<usize>,
    worker_count: Option<usize>,
    action: Option<String>,
    success: Option<bool>,

    // async task 関連
    task_id: Option<u64>,
    duration_ms: Option<u64>,

    // system 関連
    total_ticks: Option<u64>,
    total_duration_ms: Option<u64>,
}

impl SwarmEventVisitor {
    fn new() -> Self {
        Self {
            message: None,
            tick: None,
            duration_ns: None,
            total_actions: None,
            successful_actions: None,
            failed_actions: None,
            active_workers: None,
            manager_id: None,
            manager_state: None,
            worker_id: None,
            worker_count: None,
            action: None,
            success: None,
            task_id: None,
            duration_ms: None,
            total_ticks: None,
            total_duration_ms: None,
        }
    }

    /// 収集した値から SwarmEvent を生成
    fn into_event(self) -> Option<SwarmEvent> {
        let msg = self.message.as_deref()?;

        match msg {
            "tick_start" => Some(SwarmEvent::TickStart { tick: self.tick? }),

            "tick_complete" => Some(SwarmEvent::TickComplete {
                tick: self.tick?,
                duration_ns: self.duration_ns.unwrap_or(0),
                metrics: TickMetrics {
                    total_actions: self.total_actions.unwrap_or(0),
                    successful_actions: self.successful_actions.unwrap_or(0),
                    failed_actions: self.failed_actions.unwrap_or(0),
                    active_workers: self.active_workers.unwrap_or(0),
                },
            }),

            "manager_state_change" => Some(SwarmEvent::ManagerStateChange {
                manager_id: self.manager_id?,
                new_state: parse_manager_state(self.manager_state.as_deref()),
            }),

            "worker_action" => Some(SwarmEvent::WorkerAction {
                worker_id: self.worker_id?,
                action: self.action.unwrap_or_default(),
                success: self.success.unwrap_or(true),
            }),

            "async_task_complete" => Some(SwarmEvent::AsyncTaskComplete {
                task_id: TaskId(self.task_id?),
                duration_ms: self.duration_ms.unwrap_or(0),
            }),

            "system_start" => Some(SwarmEvent::SystemStart {
                worker_count: self.worker_count.unwrap_or(0),
            }),

            "system_stop" => Some(SwarmEvent::SystemStop {
                total_ticks: self.total_ticks.unwrap_or(0),
                total_duration_ms: self.total_duration_ms.unwrap_or(0),
            }),

            _ => None,
        }
    }
}

impl Visit for SwarmEventVisitor {
    fn record_debug(&mut self, field: &Field, value: &dyn std::fmt::Debug) {
        if field.name() == "message" {
            self.message = Some(format!("{:?}", value).trim_matches('"').to_string());
        }
    }

    fn record_str(&mut self, field: &Field, value: &str) {
        match field.name() {
            "message" => self.message = Some(value.to_string()),
            "manager_state" => self.manager_state = Some(value.to_string()),
            "action" => self.action = Some(value.to_string()),
            _ => {}
        }
    }

    fn record_u64(&mut self, field: &Field, value: u64) {
        match field.name() {
            "tick" => self.tick = Some(value),
            "duration_ns" => self.duration_ns = Some(value),
            "total_actions" => self.total_actions = Some(value),
            "successful_actions" => self.successful_actions = Some(value),
            "failed_actions" => self.failed_actions = Some(value),
            "active_workers" => self.active_workers = Some(value as usize),
            "manager_id" => self.manager_id = Some(value as usize),
            "worker_id" => self.worker_id = Some(value as usize),
            "worker_count" => self.worker_count = Some(value as usize),
            "task_id" => self.task_id = Some(value),
            "duration_ms" => self.duration_ms = Some(value),
            "total_ticks" => self.total_ticks = Some(value),
            "total_duration_ms" => self.total_duration_ms = Some(value),
            _ => {}
        }
    }

    fn record_i64(&mut self, field: &Field, value: i64) {
        // u64 に変換して処理
        if value >= 0 {
            self.record_u64(field, value as u64);
        }
    }

    fn record_bool(&mut self, field: &Field, value: bool) {
        if field.name() == "success" {
            self.success = Some(value);
        }
    }
}

fn parse_manager_state(s: Option<&str>) -> ManagerState {
    match s {
        Some("idle") => ManagerState::Idle,
        Some("processing") => ManagerState::Processing,
        Some("delegated") => ManagerState::Delegated,
        Some("escalated") => ManagerState::Escalated,
        Some("error") => ManagerState::Error,
        _ => ManagerState::Idle,
    }
}

impl<S> Layer<S> for SwarmLayer
where
    S: Subscriber,
{
    fn on_event(&self, event: &tracing::Event<'_>, _ctx: Context<'_, S>) {
        let mut visitor = SwarmEventVisitor::new();
        event.record(&mut visitor);

        if let Some(swarm_event) = visitor.into_event() {
            // ブロックしない。満杯なら古いイベントが drop される
            let _ = self.tx.send(swarm_event);
        }
    }
}

/// SwarmLayer を作成して Sender を返すヘルパー
///
/// Layer を tracing に登録した後でも subscribe() できるように、
/// Sender を返す。
pub fn create_swarm_layer(
    capacity: usize,
) -> (
    SwarmLayer,
    broadcast::Sender<SwarmEvent>,
    broadcast::Receiver<SwarmEvent>,
) {
    let (layer, rx) = SwarmLayer::new(capacity);
    let tx = layer.sender();
    (layer, tx, rx)
}