swarm-engine-core 0.1.6

Core types and orchestration for SwarmEngine
Documentation
//! Debug - Tick単位のデバッグ出力機構
//!
//! Eval/Test実行時のデバッグ用。Tick単位でStateをDumpする。
//!
//! ## 設計思想
//!
//! - **Tick-based**: Spanログではなく、Tick単位のスナップショット
//! - **自動収集**: `Dumpable` 実装 + `register()` で完了
//! - **Subscribe**: broadcast channelで外部からSubscribe可能
//! - **条件付き**: -v / RUST_LOG=debug / Error時のみ有効
//!
//! ## 使い方
//!
//! ```ignore
//! use swarm_engine_core::debug::{TickDumper, Dumpable};
//!
//! // Dumpable を実装
//! impl Dumpable for MyState {
//!     fn name(&self) -> &'static str { "my_state" }
//!     fn snapshot(&self, tick: u64) -> Option<serde_json::Value> {
//!         Some(serde_json::json!({ "count": self.count }))
//!     }
//! }
//!
//! // TickDumper に登録
//! let mut dumper = TickDumper::new(256);
//! dumper.register(Arc::new(my_state));
//! dumper.enable();
//!
//! // Subscribe
//! let mut rx = dumper.subscribe();
//! tokio::spawn(async move {
//!     while let Ok(snap) = rx.recv().await {
//!         eprintln!("{:?}", snap);
//!     }
//! });
//!
//! // Tick終了時
//! dumper.dump(tick);
//! ```

use serde::Serialize;
use std::collections::HashMap;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use tokio::sync::broadcast;

use crate::util::epoch_millis;

// ============================================================================
// Dumpable trait
// ============================================================================

/// Tick単位でDump可能なもの
///
/// Clone/Debug未実装のObjectは `snapshot()` で除外できる。
pub trait Dumpable: Send + Sync {
    /// 識別名(一意であること)
    fn name(&self) -> &'static str;

    /// Tick時点のスナップショット
    ///
    /// - `Some(value)`: このTickでDumpすべきデータがある
    /// - `None`: このTickでは変化なし/Dump不要
    fn snapshot(&self, tick: u64) -> Option<serde_json::Value>;
}

// ============================================================================
// DebugSnapshot
// ============================================================================

/// Tick単位のスナップショット
#[derive(Debug, Clone, Serialize)]
pub struct DebugSnapshot {
    /// Tick番号
    pub tick: u64,
    /// タイムスタンプ(Unix epoch ms)
    pub timestamp_ms: u64,
    /// 各Dumpableからのデータ
    pub data: HashMap<&'static str, serde_json::Value>,
}

// ============================================================================
// TickDumper
// ============================================================================

/// Tick Dumper - Dumpable の Registry + 配信
///
/// Dumpableを登録し、Tick終了時に全てのスナップショットを収集して配信。
pub struct TickDumper {
    /// 登録されたDumpable
    observers: Vec<Arc<dyn Dumpable>>,
    /// broadcast sender
    tx: broadcast::Sender<DebugSnapshot>,
    /// 有効/無効
    enabled: AtomicBool,
    /// Error時は常にDump
    dump_on_error: AtomicBool,
}

impl TickDumper {
    /// 新規作成
    ///
    /// `capacity`: broadcast channelのバッファサイズ
    pub fn new(capacity: usize) -> Self {
        let (tx, _) = broadcast::channel(capacity);
        Self {
            observers: Vec::new(),
            tx,
            enabled: AtomicBool::new(false),
            dump_on_error: AtomicBool::new(true),
        }
    }

    /// Dumpable を登録
    pub fn register(&mut self, obj: Arc<dyn Dumpable>) {
        self.observers.push(obj);
    }

    /// 複数のDumpableを一括登録
    pub fn register_all(&mut self, objs: impl IntoIterator<Item = Arc<dyn Dumpable>>) {
        for obj in objs {
            self.observers.push(obj);
        }
    }

    /// 有効化(-v / RUST_LOG=debug で呼び出し)
    pub fn enable(&self) {
        self.enabled.store(true, Ordering::Relaxed);
    }

    /// 無効化
    pub fn disable(&self) {
        self.enabled.store(false, Ordering::Relaxed);
    }

    /// 有効かどうか
    pub fn is_enabled(&self) -> bool {
        self.enabled.load(Ordering::Relaxed)
    }

    /// Error時のDump有効/無効設定
    pub fn set_dump_on_error(&self, enabled: bool) {
        self.dump_on_error.store(enabled, Ordering::Relaxed);
    }

    /// Tick終了時に呼び出し
    ///
    /// 全ての登録済みDumpableからスナップショットを収集して配信。
    pub fn dump(&self, tick: u64) {
        if !self.enabled.load(Ordering::Relaxed) {
            return;
        }

        if let Some(snapshot) = self.collect_snapshot(tick) {
            let _ = self.tx.send(snapshot);
        }
    }

    /// Error発生時(enabledに関係なくDump)
    pub fn dump_error(&self, tick: u64, error: &str) {
        if !self.dump_on_error.load(Ordering::Relaxed) {
            return;
        }

        let mut snapshot = self
            .collect_snapshot(tick)
            .unwrap_or_else(|| DebugSnapshot {
                tick,
                timestamp_ms: epoch_millis(),
                data: HashMap::new(),
            });

        snapshot
            .data
            .insert("_error", serde_json::json!(error.to_string()));

        let _ = self.tx.send(snapshot);
    }

    /// Subscriberを取得
    pub fn subscribe(&self) -> broadcast::Receiver<DebugSnapshot> {
        self.tx.subscribe()
    }

    /// 登録済みDumpableの数
    pub fn observer_count(&self) -> usize {
        self.observers.len()
    }

    /// スナップショットを収集
    fn collect_snapshot(&self, tick: u64) -> Option<DebugSnapshot> {
        let mut data = HashMap::new();

        for obs in &self.observers {
            if let Some(value) = obs.snapshot(tick) {
                data.insert(obs.name(), value);
            }
        }

        if data.is_empty() {
            return None;
        }

        Some(DebugSnapshot {
            tick,
            timestamp_ms: epoch_millis(),
            data,
        })
    }
}

impl Default for TickDumper {
    fn default() -> Self {
        Self::new(256)
    }
}

// ============================================================================
// Built-in Subscribers
// ============================================================================

/// stderr出力用Subscriber
pub struct StderrDumpSubscriber {
    rx: broadcast::Receiver<DebugSnapshot>,
    /// Pretty print JSON
    pretty: bool,
}

impl StderrDumpSubscriber {
    pub fn new(rx: broadcast::Receiver<DebugSnapshot>, pretty: bool) -> Self {
        Self { rx, pretty }
    }

    /// 受信ループを開始(async)
    pub async fn run(mut self) {
        while let Ok(snapshot) = self.rx.recv().await {
            self.print_snapshot(&snapshot);
        }
    }

    fn print_snapshot(&self, snapshot: &DebugSnapshot) {
        eprintln!("=== Tick {} ===", snapshot.tick);
        if self.pretty {
            if let Ok(json) = serde_json::to_string_pretty(&snapshot.data) {
                eprintln!("{}", json);
            }
        } else if let Ok(json) = serde_json::to_string(&snapshot.data) {
            eprintln!("{}", json);
        }
        eprintln!();
    }
}

// ============================================================================
// Tests
// ============================================================================

#[cfg(test)]
mod tests {
    use super::*;

    struct MockDumpable {
        name: &'static str,
        value: i32,
    }

    impl Dumpable for MockDumpable {
        fn name(&self) -> &'static str {
            self.name
        }

        fn snapshot(&self, _tick: u64) -> Option<serde_json::Value> {
            Some(serde_json::json!({ "value": self.value }))
        }
    }

    struct EmptyDumpable;

    impl Dumpable for EmptyDumpable {
        fn name(&self) -> &'static str {
            "empty"
        }

        fn snapshot(&self, _tick: u64) -> Option<serde_json::Value> {
            None // 常にNone
        }
    }

    #[test]
    fn test_tick_dumper_disabled_by_default() {
        let dumper = TickDumper::new(16);
        assert!(!dumper.is_enabled());
    }

    #[test]
    fn test_tick_dumper_enable_disable() {
        let dumper = TickDumper::new(16);
        dumper.enable();
        assert!(dumper.is_enabled());
        dumper.disable();
        assert!(!dumper.is_enabled());
    }

    #[test]
    fn test_tick_dumper_register() {
        let mut dumper = TickDumper::new(16);
        assert_eq!(dumper.observer_count(), 0);

        dumper.register(Arc::new(MockDumpable {
            name: "test",
            value: 42,
        }));
        assert_eq!(dumper.observer_count(), 1);
    }

    #[tokio::test]
    async fn test_tick_dumper_dump() {
        let mut dumper = TickDumper::new(16);
        dumper.register(Arc::new(MockDumpable {
            name: "test",
            value: 42,
        }));
        dumper.enable();

        let mut rx = dumper.subscribe();

        dumper.dump(1);

        let snapshot = rx.recv().await.unwrap();
        assert_eq!(snapshot.tick, 1);
        assert!(snapshot.data.contains_key("test"));
        assert_eq!(snapshot.data["test"]["value"], 42);
    }

    #[tokio::test]
    async fn test_tick_dumper_empty_snapshot_not_sent() {
        let mut dumper = TickDumper::new(16);
        dumper.register(Arc::new(EmptyDumpable));
        dumper.enable();

        let mut rx = dumper.subscribe();

        dumper.dump(1);

        // 空のスナップショットは送信されないので、タイムアウトする
        let result = tokio::time::timeout(std::time::Duration::from_millis(10), rx.recv()).await;
        assert!(result.is_err()); // timeout
    }

    #[tokio::test]
    async fn test_tick_dumper_dump_error() {
        let mut dumper = TickDumper::new(16);
        dumper.register(Arc::new(MockDumpable {
            name: "test",
            value: 42,
        }));
        // enabled=false でも dump_on_error=true なら送信される

        let mut rx = dumper.subscribe();

        dumper.dump_error(1, "test error");

        let snapshot = rx.recv().await.unwrap();
        assert_eq!(snapshot.tick, 1);
        assert!(snapshot.data.contains_key("_error"));
        assert_eq!(snapshot.data["_error"], "test error");
    }
}