swarm-engine-core 0.1.6

Core types and orchestration for SwarmEngine
Documentation
//! Action Event Publisher - 行動イベントの配信
//!
//! ActionEvent を broadcast channel で配信。
//! Subscriber(オンライン統計・永続化)が受信して処理。

use tokio::sync::broadcast;

use super::action::ActionEvent;

/// Distribution hub for action events.
///
/// Wraps the sending half of a `tokio::sync::broadcast` channel that carries
/// [`ActionEvent`] values. Consumers hold receivers of that channel and
/// process events independently of the producer, which keeps concerns such as
/// online statistics and persistence separate from the code that emits events.
///
/// `Debug` is derived so the publisher can be logged and embedded in other
/// `#[derive(Debug)]` types.
#[derive(Debug)]
pub struct ActionEventPublisher {
    /// Sending half of the broadcast channel used for every publish call.
    tx: broadcast::Sender<ActionEvent>,
}

impl ActionEventPublisher {
    /// Creates a publisher together with the first receiver of its channel.
    ///
    /// # Arguments
    /// * `capacity` - buffer size passed to `broadcast::channel`
    ///
    /// # Returns
    /// A tuple of:
    /// * the new `ActionEventPublisher`
    /// * the initial `broadcast::Receiver<ActionEvent>` of the channel
    ///
    /// # Panics
    /// NOTE(review): tokio documents `broadcast::channel` as panicking when
    /// `capacity` is 0 (or excessively large) - confirm against the tokio
    /// version in use.
    pub fn new(capacity: usize) -> (Self, broadcast::Receiver<ActionEvent>) {
        let (sender, first_receiver) = broadcast::channel(capacity);
        let publisher = Self { tx: sender };
        (publisher, first_receiver)
    }

    /// Publishes a single event to the broadcast channel.
    ///
    /// The outcome of the underlying `send` is deliberately discarded, so
    /// publishing while no subscriber is listening is a silent no-op rather
    /// than an error.
    pub fn publish(&self, event: ActionEvent) {
        // Best-effort delivery: a failed send is not surfaced to the caller.
        let _ = self.tx.send(event);
    }

    /// Publishes every event of `events`, one at a time and in iteration order.
    pub fn publish_batch(&self, events: impl IntoIterator<Item = ActionEvent>) {
        events.into_iter().for_each(|event| self.publish(event));
    }

    /// Returns an additional receiver subscribed to this publisher's channel.
    pub fn subscribe(&self) -> broadcast::Receiver<ActionEvent> {
        let Self { tx } = self;
        tx.subscribe()
    }

    /// Returns a clone of the underlying broadcast sender.
    pub fn sender(&self) -> broadcast::Sender<ActionEvent> {
        let Self { tx } = self;
        tx.clone()
    }

    /// Returns the receiver count currently reported by the channel.
    pub fn subscriber_count(&self) -> usize {
        let Self { tx } = self;
        tx.receiver_count()
    }

    // ============================================
    // Aliases kept for backward compatibility
    // ============================================

    /// Backward-compatible alias of [`publish`](Self::publish).
    pub fn record(&self, event: ActionEvent) {
        self.publish(event);
    }

    /// Backward-compatible alias of [`publish_batch`](Self::publish_batch).
    pub fn record_batch(&self, events: impl IntoIterator<Item = ActionEvent>) {
        self.publish_batch(events);
    }
}

/// Convenience constructor for an [`ActionEventPublisher`].
///
/// Builds the publisher through `ActionEventPublisher::new(capacity)` and
/// additionally hands back a clone of its sender.
///
/// # Returns
/// A tuple of the publisher, a cloned `broadcast::Sender<ActionEvent>` for the
/// same channel, and the channel's first `broadcast::Receiver<ActionEvent>`.
pub fn create_action_event_publisher(
    capacity: usize,
) -> (
    ActionEventPublisher,
    broadcast::Sender<ActionEvent>,
    broadcast::Receiver<ActionEvent>,
) {
    let (publisher, initial_rx) = ActionEventPublisher::new(capacity);
    // Clone the sender before the publisher is moved into the returned tuple.
    let shared_tx = publisher.sender();
    (publisher, shared_tx, initial_rx)
}

#[cfg(test)]
mod tests {
    use std::time::Duration;

    use super::*;
    use crate::events::action::{ActionEventBuilder, ActionEventResult};
    use crate::types::WorkerId;

    /// Builds a test event for `WorkerId(0)` with a success result and a
    /// fixed 50 ms duration.
    fn make_event(tick: u64, action: &str) -> ActionEvent {
        ActionEventBuilder::new(tick, WorkerId(0), action)
            .result(ActionEventResult::success())
            .duration(Duration::from_millis(50))
            .build()
    }

    /// A published event is delivered to the receiver returned by `new`.
    #[tokio::test]
    async fn test_publisher_broadcast() {
        let (publisher, mut first_rx) = ActionEventPublisher::new(16);

        publisher.publish(make_event(1, "CheckStatus"));

        let delivered = first_rx.recv().await.unwrap();
        assert_eq!(delivered.tick, 1);
        assert_eq!(delivered.action, "CheckStatus");
    }

    /// Every subscriber receives the same published event.
    #[tokio::test]
    async fn test_publisher_multiple_subscribers() {
        let (publisher, mut primary_rx) = ActionEventPublisher::new(16);
        let mut secondary_rx = publisher.subscribe();

        publisher.publish(make_event(1, "Action1"));

        let from_primary = primary_rx.recv().await.unwrap();
        let from_secondary = secondary_rx.recv().await.unwrap();
        assert_eq!(from_primary.action, from_secondary.action);
    }

    /// Publishing after the only receiver was dropped must not panic.
    #[tokio::test]
    async fn test_publisher_no_subscriber_ok() {
        let (publisher, only_rx) = ActionEventPublisher::new(16);
        drop(only_rx);

        // Reaching the end of the test without a panic is the assertion.
        publisher.publish(make_event(1, "Action1"));
    }

    /// The subscriber count follows receivers being created and dropped.
    #[test]
    fn test_publisher_subscriber_count() {
        let (publisher, first_rx) = ActionEventPublisher::new(16);
        assert_eq!(publisher.subscriber_count(), 1);

        // Bound to a name so the receiver stays alive until the end of the test.
        let _second_rx = publisher.subscribe();
        assert_eq!(publisher.subscriber_count(), 2);

        drop(first_rx);
        assert_eq!(publisher.subscriber_count(), 1);
    }
}