use tokio::sync::broadcast;
use super::action::ActionEvent;
pub struct ActionEventPublisher {
tx: broadcast::Sender<ActionEvent>,
}
impl ActionEventPublisher {
pub fn new(capacity: usize) -> (Self, broadcast::Receiver<ActionEvent>) {
let (tx, rx) = broadcast::channel(capacity);
(Self { tx }, rx)
}
pub fn publish(&self, event: ActionEvent) {
let _ = self.tx.send(event);
}
pub fn publish_batch(&self, events: impl IntoIterator<Item = ActionEvent>) {
for event in events {
self.publish(event);
}
}
pub fn subscribe(&self) -> broadcast::Receiver<ActionEvent> {
self.tx.subscribe()
}
pub fn sender(&self) -> broadcast::Sender<ActionEvent> {
self.tx.clone()
}
pub fn subscriber_count(&self) -> usize {
self.tx.receiver_count()
}
pub fn record(&self, event: ActionEvent) {
self.publish(event);
}
pub fn record_batch(&self, events: impl IntoIterator<Item = ActionEvent>) {
self.publish_batch(events);
}
}
pub fn create_action_event_publisher(
capacity: usize,
) -> (
ActionEventPublisher,
broadcast::Sender<ActionEvent>,
broadcast::Receiver<ActionEvent>,
) {
let (publisher, rx) = ActionEventPublisher::new(capacity);
let tx = publisher.sender();
(publisher, tx, rx)
}
#[cfg(test)]
mod tests {
use std::time::Duration;
use super::*;
use crate::events::action::{ActionEventBuilder, ActionEventResult};
use crate::types::WorkerId;
fn make_event(tick: u64, action: &str) -> ActionEvent {
ActionEventBuilder::new(tick, WorkerId(0), action)
.result(ActionEventResult::success())
.duration(Duration::from_millis(50))
.build()
}
#[tokio::test]
async fn test_publisher_broadcast() {
let (publisher, mut rx) = ActionEventPublisher::new(16);
let event = make_event(1, "CheckStatus");
publisher.publish(event);
let received = rx.recv().await.unwrap();
assert_eq!(received.tick, 1);
assert_eq!(received.action, "CheckStatus");
}
#[tokio::test]
async fn test_publisher_multiple_subscribers() {
let (publisher, mut rx1) = ActionEventPublisher::new(16);
let mut rx2 = publisher.subscribe();
let event = make_event(1, "Action1");
publisher.publish(event);
let e1 = rx1.recv().await.unwrap();
let e2 = rx2.recv().await.unwrap();
assert_eq!(e1.action, e2.action);
}
#[tokio::test]
async fn test_publisher_no_subscriber_ok() {
let (publisher, rx) = ActionEventPublisher::new(16);
drop(rx);
let event = make_event(1, "Action1");
publisher.publish(event); }
#[test]
fn test_publisher_subscriber_count() {
let (publisher, rx1) = ActionEventPublisher::new(16);
assert_eq!(publisher.subscriber_count(), 1);
let _rx2 = publisher.subscribe();
assert_eq!(publisher.subscriber_count(), 2);
drop(rx1);
assert_eq!(publisher.subscriber_count(), 1);
}
}