Skip to main content

swarm_engine_core/online_stats/
subscriber.rs

1//! Stats Subscriber - ActionEvent → SwarmStats 更新
2//!
3//! ActionEvent を受信して SwarmStats をリアルタイム更新する。
4
5use std::sync::{Arc, RwLock};
6
7use tokio::sync::broadcast;
8
9use super::swarm::SwarmStats;
10use crate::events::ActionEvent;
11
12/// オンライン統計更新 Subscriber
13///
14/// ActionEvent を受信して SwarmStats に記録。
15/// Arc<RwLock<SwarmStats>> を共有することで、
16/// Selection から統計を参照可能。
17pub struct StatsSubscriber {
18    stats: Arc<RwLock<SwarmStats>>,
19    rx: broadcast::Receiver<ActionEvent>,
20}
21
22impl StatsSubscriber {
23    pub fn new(rx: broadcast::Receiver<ActionEvent>, stats: Arc<RwLock<SwarmStats>>) -> Self {
24        Self { stats, rx }
25    }
26
27    /// 新しい SwarmStats を作成して返す
28    pub fn with_new_stats(rx: broadcast::Receiver<ActionEvent>) -> (Self, Arc<RwLock<SwarmStats>>) {
29        let stats = Arc::new(RwLock::new(SwarmStats::new()));
30        let subscriber = Self::new(rx, Arc::clone(&stats));
31        (subscriber, stats)
32    }
33
34    /// 受信ループを開始(async)
35    pub async fn run(mut self) {
36        while let Ok(event) = self.rx.recv().await {
37            if let Ok(mut stats) = self.stats.write() {
38                stats.record(&event);
39            }
40        }
41    }
42
43    /// Stats への参照を取得
44    pub fn stats(&self) -> Arc<RwLock<SwarmStats>> {
45        Arc::clone(&self.stats)
46    }
47}
48
49#[cfg(test)]
50mod tests {
51    use std::time::Duration;
52
53    use super::*;
54    use crate::events::{ActionEventBuilder, ActionEventResult};
55    use crate::types::WorkerId;
56
57    fn make_event(tick: u64, action: &str, success: bool) -> ActionEvent {
58        let result = if success {
59            ActionEventResult::success()
60        } else {
61            ActionEventResult::failure("error")
62        };
63
64        ActionEventBuilder::new(tick, WorkerId(0), action)
65            .result(result)
66            .duration(Duration::from_millis(50))
67            .build()
68    }
69
70    #[tokio::test]
71    async fn test_stats_subscriber() {
72        let (tx, rx) = broadcast::channel::<ActionEvent>(16);
73        let (subscriber, stats) = StatsSubscriber::with_new_stats(rx);
74
75        let handle = tokio::spawn(async move {
76            subscriber.run().await;
77        });
78
79        tx.send(make_event(1, "CheckStatus", true)).unwrap();
80        tx.send(make_event(2, "ReadLogs", true)).unwrap();
81        tx.send(make_event(3, "CheckStatus", false)).unwrap();
82
83        tokio::time::sleep(Duration::from_millis(10)).await;
84
85        {
86            let s = stats.read().unwrap();
87            assert_eq!(s.total_visits(), 3);
88            assert_eq!(s.total_successes(), 2);
89            assert_eq!(s.total_failures(), 1);
90        }
91
92        drop(tx);
93        let _ = handle.await;
94    }
95}