swarm-engine-core 0.1.6

Core types and orchestration for SwarmEngine
Documentation
//! Stats subscriber: `ActionEvent` → `SwarmStats` updates.
//!
//! Receives `ActionEvent`s and updates `SwarmStats` in real time.

use std::sync::{Arc, RwLock};

use tokio::sync::broadcast;

use super::swarm::SwarmStats;
use crate::events::ActionEvent;

/// Subscriber that updates statistics online.
///
/// Receives `ActionEvent`s and records them into `SwarmStats`.
/// The stats are held as a shared `Arc<RwLock<SwarmStats>>` so that
/// they can be read from Selection.
pub struct StatsSubscriber {
    // Shared statistics; written by this subscriber, cloneable handle for readers.
    stats: Arc<RwLock<SwarmStats>>,
    // Receiving end of the broadcast channel carrying `ActionEvent`s.
    rx: broadcast::Receiver<ActionEvent>,
}

impl StatsSubscriber {
    pub fn new(rx: broadcast::Receiver<ActionEvent>, stats: Arc<RwLock<SwarmStats>>) -> Self {
        Self { stats, rx }
    }

    /// 新しい SwarmStats を作成して返す
    pub fn with_new_stats(rx: broadcast::Receiver<ActionEvent>) -> (Self, Arc<RwLock<SwarmStats>>) {
        let stats = Arc::new(RwLock::new(SwarmStats::new()));
        let subscriber = Self::new(rx, Arc::clone(&stats));
        (subscriber, stats)
    }

    /// 受信ループを開始(async)
    pub async fn run(mut self) {
        while let Ok(event) = self.rx.recv().await {
            if let Ok(mut stats) = self.stats.write() {
                stats.record(&event);
            }
        }
    }

    /// Stats への参照を取得
    pub fn stats(&self) -> Arc<RwLock<SwarmStats>> {
        Arc::clone(&self.stats)
    }
}

#[cfg(test)]
mod tests {
    use std::time::Duration;

    use super::*;
    use crate::events::{ActionEventBuilder, ActionEventResult};
    use crate::types::WorkerId;

    /// Builds an `ActionEvent` for worker 0 with a fixed 50 ms duration and a
    /// success or failure result depending on `success`.
    fn make_event(tick: u64, action: &str, success: bool) -> ActionEvent {
        let result = if success {
            ActionEventResult::success()
        } else {
            ActionEventResult::failure("error")
        };

        ActionEventBuilder::new(tick, WorkerId(0), action)
            .result(result)
            .duration(Duration::from_millis(50))
            .build()
    }

    /// Sends two successes and one failure and checks the aggregated totals.
    #[tokio::test]
    async fn test_stats_subscriber() {
        let (tx, rx) = broadcast::channel::<ActionEvent>(16);
        let (subscriber, stats) = StatsSubscriber::with_new_stats(rx);

        let handle = tokio::spawn(async move {
            subscriber.run().await;
        });

        tx.send(make_event(1, "CheckStatus", true)).unwrap();
        tx.send(make_event(2, "ReadLogs", true)).unwrap();
        tx.send(make_event(3, "CheckStatus", false)).unwrap();

        // Closing the channel lets the subscriber drain the buffered events
        // and then exit, so awaiting the task is a deterministic sync point
        // (no timing-dependent sleep needed).
        drop(tx);
        handle.await.expect("subscriber task panicked");

        let s = stats.read().unwrap();
        assert_eq!(s.total_visits(), 3);
        assert_eq!(s.total_successes(), 2);
        assert_eq!(s.total_failures(), 1);
    }
}