Skip to main content

swarm_engine_core/events/
publisher.rs

1//! Action Event Publisher - 行動イベントの配信
2//!
3//! ActionEvent を broadcast channel で配信。
4//! Subscriber(オンライン統計・永続化)が受信して処理。
5
6use tokio::sync::broadcast;
7
8use super::action::ActionEvent;
9
10/// 行動イベント配信システム
11///
12/// broadcast channel で ActionEvent を配信。
13/// Subscriber パターンでオンライン統計・永続化を分離。
14pub struct ActionEventPublisher {
15    tx: broadcast::Sender<ActionEvent>,
16}
17
18impl ActionEventPublisher {
19    /// 新規作成
20    ///
21    /// # Arguments
22    /// * `capacity` - broadcast channel のバッファサイズ
23    ///
24    /// # Returns
25    /// * `ActionEventPublisher` - 配信システム
26    /// * `broadcast::Receiver<ActionEvent>` - 最初の Receiver
27    pub fn new(capacity: usize) -> (Self, broadcast::Receiver<ActionEvent>) {
28        let (tx, rx) = broadcast::channel(capacity);
29        (Self { tx }, rx)
30    }
31
32    /// イベントを配信
33    ///
34    /// 全ての Subscriber に配信される。
35    /// Subscriber がいない場合は何もしない(エラーにならない)。
36    pub fn publish(&self, event: ActionEvent) {
37        let _ = self.tx.send(event);
38    }
39
40    /// 複数イベントを一括配信
41    pub fn publish_batch(&self, events: impl IntoIterator<Item = ActionEvent>) {
42        for event in events {
43            self.publish(event);
44        }
45    }
46
47    /// 追加の Receiver を取得
48    pub fn subscribe(&self) -> broadcast::Receiver<ActionEvent> {
49        self.tx.subscribe()
50    }
51
52    /// Sender を取得
53    pub fn sender(&self) -> broadcast::Sender<ActionEvent> {
54        self.tx.clone()
55    }
56
57    /// 現在の Subscriber 数
58    pub fn subscriber_count(&self) -> usize {
59        self.tx.receiver_count()
60    }
61
62    // ============================================
63    // 後方互換性のためのエイリアス
64    // ============================================
65
66    /// record() の別名(後方互換)
67    pub fn record(&self, event: ActionEvent) {
68        self.publish(event);
69    }
70
71    /// record_batch() の別名(後方互換)
72    pub fn record_batch(&self, events: impl IntoIterator<Item = ActionEvent>) {
73        self.publish_batch(events);
74    }
75}
76
77/// ActionEventPublisher を作成するヘルパー
78pub fn create_action_event_publisher(
79    capacity: usize,
80) -> (
81    ActionEventPublisher,
82    broadcast::Sender<ActionEvent>,
83    broadcast::Receiver<ActionEvent>,
84) {
85    let (publisher, rx) = ActionEventPublisher::new(capacity);
86    let tx = publisher.sender();
87    (publisher, tx, rx)
88}
89
90#[cfg(test)]
91mod tests {
92    use std::time::Duration;
93
94    use super::*;
95    use crate::events::action::{ActionEventBuilder, ActionEventResult};
96    use crate::types::WorkerId;
97
98    fn make_event(tick: u64, action: &str) -> ActionEvent {
99        ActionEventBuilder::new(tick, WorkerId(0), action)
100            .result(ActionEventResult::success())
101            .duration(Duration::from_millis(50))
102            .build()
103    }
104
105    #[tokio::test]
106    async fn test_publisher_broadcast() {
107        let (publisher, mut rx) = ActionEventPublisher::new(16);
108
109        let event = make_event(1, "CheckStatus");
110        publisher.publish(event);
111
112        let received = rx.recv().await.unwrap();
113        assert_eq!(received.tick, 1);
114        assert_eq!(received.action, "CheckStatus");
115    }
116
117    #[tokio::test]
118    async fn test_publisher_multiple_subscribers() {
119        let (publisher, mut rx1) = ActionEventPublisher::new(16);
120        let mut rx2 = publisher.subscribe();
121
122        let event = make_event(1, "Action1");
123        publisher.publish(event);
124
125        let e1 = rx1.recv().await.unwrap();
126        let e2 = rx2.recv().await.unwrap();
127        assert_eq!(e1.action, e2.action);
128    }
129
130    #[tokio::test]
131    async fn test_publisher_no_subscriber_ok() {
132        let (publisher, rx) = ActionEventPublisher::new(16);
133        drop(rx);
134
135        let event = make_event(1, "Action1");
136        publisher.publish(event); // パニックしない
137    }
138
139    #[test]
140    fn test_publisher_subscriber_count() {
141        let (publisher, rx1) = ActionEventPublisher::new(16);
142        assert_eq!(publisher.subscriber_count(), 1);
143
144        let _rx2 = publisher.subscribe();
145        assert_eq!(publisher.subscriber_count(), 2);
146
147        drop(rx1);
148        assert_eq!(publisher.subscriber_count(), 1);
149    }
150}