swarm_engine_core/online_stats/
subscriber.rs1use std::sync::{Arc, RwLock};
6
7use tokio::sync::broadcast;
8
9use super::swarm::SwarmStats;
10use crate::events::ActionEvent;
11
12pub struct StatsSubscriber {
18 stats: Arc<RwLock<SwarmStats>>,
19 rx: broadcast::Receiver<ActionEvent>,
20}
21
22impl StatsSubscriber {
23 pub fn new(rx: broadcast::Receiver<ActionEvent>, stats: Arc<RwLock<SwarmStats>>) -> Self {
24 Self { stats, rx }
25 }
26
27 pub fn with_new_stats(rx: broadcast::Receiver<ActionEvent>) -> (Self, Arc<RwLock<SwarmStats>>) {
29 let stats = Arc::new(RwLock::new(SwarmStats::new()));
30 let subscriber = Self::new(rx, Arc::clone(&stats));
31 (subscriber, stats)
32 }
33
34 pub async fn run(mut self) {
36 while let Ok(event) = self.rx.recv().await {
37 if let Ok(mut stats) = self.stats.write() {
38 stats.record(&event);
39 }
40 }
41 }
42
43 pub fn stats(&self) -> Arc<RwLock<SwarmStats>> {
45 Arc::clone(&self.stats)
46 }
47}
48
49#[cfg(test)]
50mod tests {
51 use std::time::Duration;
52
53 use super::*;
54 use crate::events::{ActionEventBuilder, ActionEventResult};
55 use crate::types::WorkerId;
56
57 fn make_event(tick: u64, action: &str, success: bool) -> ActionEvent {
58 let result = if success {
59 ActionEventResult::success()
60 } else {
61 ActionEventResult::failure("error")
62 };
63
64 ActionEventBuilder::new(tick, WorkerId(0), action)
65 .result(result)
66 .duration(Duration::from_millis(50))
67 .build()
68 }
69
70 #[tokio::test]
71 async fn test_stats_subscriber() {
72 let (tx, rx) = broadcast::channel::<ActionEvent>(16);
73 let (subscriber, stats) = StatsSubscriber::with_new_stats(rx);
74
75 let handle = tokio::spawn(async move {
76 subscriber.run().await;
77 });
78
79 tx.send(make_event(1, "CheckStatus", true)).unwrap();
80 tx.send(make_event(2, "ReadLogs", true)).unwrap();
81 tx.send(make_event(3, "CheckStatus", false)).unwrap();
82
83 tokio::time::sleep(Duration::from_millis(10)).await;
84
85 {
86 let s = stats.read().unwrap();
87 assert_eq!(s.total_visits(), 3);
88 assert_eq!(s.total_successes(), 2);
89 assert_eq!(s.total_failures(), 1);
90 }
91
92 drop(tx);
93 let _ = handle.await;
94 }
95}