swarm_engine_core/events/
publisher.rs1use tokio::sync::broadcast;
7
8use super::action::ActionEvent;
9
10pub struct ActionEventPublisher {
15 tx: broadcast::Sender<ActionEvent>,
16}
17
18impl ActionEventPublisher {
19 pub fn new(capacity: usize) -> (Self, broadcast::Receiver<ActionEvent>) {
28 let (tx, rx) = broadcast::channel(capacity);
29 (Self { tx }, rx)
30 }
31
32 pub fn publish(&self, event: ActionEvent) {
37 let _ = self.tx.send(event);
38 }
39
40 pub fn publish_batch(&self, events: impl IntoIterator<Item = ActionEvent>) {
42 for event in events {
43 self.publish(event);
44 }
45 }
46
47 pub fn subscribe(&self) -> broadcast::Receiver<ActionEvent> {
49 self.tx.subscribe()
50 }
51
52 pub fn sender(&self) -> broadcast::Sender<ActionEvent> {
54 self.tx.clone()
55 }
56
57 pub fn subscriber_count(&self) -> usize {
59 self.tx.receiver_count()
60 }
61
62 pub fn record(&self, event: ActionEvent) {
68 self.publish(event);
69 }
70
71 pub fn record_batch(&self, events: impl IntoIterator<Item = ActionEvent>) {
73 self.publish_batch(events);
74 }
75}
76
77pub fn create_action_event_publisher(
79 capacity: usize,
80) -> (
81 ActionEventPublisher,
82 broadcast::Sender<ActionEvent>,
83 broadcast::Receiver<ActionEvent>,
84) {
85 let (publisher, rx) = ActionEventPublisher::new(capacity);
86 let tx = publisher.sender();
87 (publisher, tx, rx)
88}
89
#[cfg(test)]
mod tests {
    use std::time::Duration;

    use super::*;
    use crate::events::action::{ActionEventBuilder, ActionEventResult};
    use crate::types::WorkerId;

    /// Builds a successful event for `WorkerId(0)` with a fixed 50 ms duration.
    fn make_event(tick: u64, action: &str) -> ActionEvent {
        let elapsed = Duration::from_millis(50);
        ActionEventBuilder::new(tick, WorkerId(0), action)
            .result(ActionEventResult::success())
            .duration(elapsed)
            .build()
    }

    /// A published event reaches the receiver returned by `new`.
    #[tokio::test]
    async fn test_publisher_broadcast() {
        let (publisher, mut receiver) = ActionEventPublisher::new(16);

        publisher.publish(make_event(1, "CheckStatus"));

        let got = receiver.recv().await.unwrap();
        assert_eq!(got.tick, 1);
        assert_eq!(got.action, "CheckStatus");
    }

    /// Both the initial receiver and a later subscriber see the same event.
    #[tokio::test]
    async fn test_publisher_multiple_subscribers() {
        let (publisher, mut first_rx) = ActionEventPublisher::new(16);
        let mut second_rx = publisher.subscribe();

        publisher.publish(make_event(1, "Action1"));

        let from_first = first_rx.recv().await.unwrap();
        let from_second = second_rx.recv().await.unwrap();
        assert_eq!(from_first.action, from_second.action);
    }

    /// Publishing after the only receiver is dropped must not panic.
    #[tokio::test]
    async fn test_publisher_no_subscriber_ok() {
        let (publisher, receiver) = ActionEventPublisher::new(16);
        drop(receiver);

        publisher.publish(make_event(1, "Action1"));
    }

    /// `subscriber_count` follows receivers being created and dropped.
    #[test]
    fn test_publisher_subscriber_count() {
        let (publisher, initial_rx) = ActionEventPublisher::new(16);
        assert_eq!(publisher.subscriber_count(), 1);

        let _extra_rx = publisher.subscribe();
        assert_eq!(publisher.subscriber_count(), 2);

        drop(initial_rx);
        assert_eq!(publisher.subscriber_count(), 1);
    }
}
150}