use std::sync::{Arc, RwLock};
use tokio::sync::broadcast;
use super::swarm::SwarmStats;
use crate::events::ActionEvent;
pub struct StatsSubscriber {
stats: Arc<RwLock<SwarmStats>>,
rx: broadcast::Receiver<ActionEvent>,
}
impl StatsSubscriber {
pub fn new(rx: broadcast::Receiver<ActionEvent>, stats: Arc<RwLock<SwarmStats>>) -> Self {
Self { stats, rx }
}
pub fn with_new_stats(rx: broadcast::Receiver<ActionEvent>) -> (Self, Arc<RwLock<SwarmStats>>) {
let stats = Arc::new(RwLock::new(SwarmStats::new()));
let subscriber = Self::new(rx, Arc::clone(&stats));
(subscriber, stats)
}
pub async fn run(mut self) {
while let Ok(event) = self.rx.recv().await {
if let Ok(mut stats) = self.stats.write() {
stats.record(&event);
}
}
}
pub fn stats(&self) -> Arc<RwLock<SwarmStats>> {
Arc::clone(&self.stats)
}
}
#[cfg(test)]
mod tests {
use std::time::Duration;
use super::*;
use crate::events::{ActionEventBuilder, ActionEventResult};
use crate::types::WorkerId;
fn make_event(tick: u64, action: &str, success: bool) -> ActionEvent {
let result = if success {
ActionEventResult::success()
} else {
ActionEventResult::failure("error")
};
ActionEventBuilder::new(tick, WorkerId(0), action)
.result(result)
.duration(Duration::from_millis(50))
.build()
}
#[tokio::test]
async fn test_stats_subscriber() {
let (tx, rx) = broadcast::channel::<ActionEvent>(16);
let (subscriber, stats) = StatsSubscriber::with_new_stats(rx);
let handle = tokio::spawn(async move {
subscriber.run().await;
});
tx.send(make_event(1, "CheckStatus", true)).unwrap();
tx.send(make_event(2, "ReadLogs", true)).unwrap();
tx.send(make_event(3, "CheckStatus", false)).unwrap();
tokio::time::sleep(Duration::from_millis(10)).await;
{
let s = stats.read().unwrap();
assert_eq!(s.total_visits(), 3);
assert_eq!(s.total_successes(), 2);
assert_eq!(s.total_failures(), 1);
}
drop(tx);
let _ = handle.await;
}
}