// enact-core 0.0.2
//
// Core agent runtime for Enact - Graph-Native AI agents
// Documentation
//! In-memory SignalBus implementation for local/desktop/testing
//!
//! This is the default, in-memory-only implementation suitable for:
//! - Local development
//! - Desktop applications
//! - Testing environments
//! - Single-process deployments
//!
//! ## Important
//!
//! This implementation is **not distributed** and does **not** provide
//! cross-process signaling. For distributed signaling, use the control plane
//! which implements authoritative messaging (Redis, Kafka, etc.).

use super::{SignalBus, SignalReceiver};
use async_trait::async_trait;
use std::collections::HashMap;
use std::sync::Arc;
use tokio::sync::{broadcast, RwLock};

/// In-memory signal bus using tokio broadcast channels
///
/// Suitable for desktop apps and testing (no external dependencies).
/// Signals are best-effort and may be lost if no receivers are subscribed.
///
/// Each channel name maps to its own broadcast sender; payloads are opaque
/// byte vectors.
#[derive(Debug)]
pub struct InMemorySignalBus {
    /// Broadcast senders keyed by channel name, shared behind an async `RwLock`.
    channels: Arc<RwLock<HashMap<String, broadcast::Sender<Vec<u8>>>>>,
    /// Buffer capacity handed to `broadcast::channel` when a named channel is
    /// first created.
    capacity: usize,
}

impl InMemorySignalBus {
    /// Capacity used by [`Default::default`].
    const DEFAULT_CAPACITY: usize = 1024;

    /// Create a new in-memory signal bus with the specified channel capacity.
    ///
    /// `capacity` is the buffer size given to every broadcast channel the bus
    /// creates. A value of `0` is raised to `1`: tokio's `broadcast::channel`
    /// panics on a zero capacity, and because channels are created lazily on
    /// `subscribe`, that panic would otherwise surface far away from the
    /// construction site.
    pub fn new(capacity: usize) -> Self {
        Self {
            channels: Arc::new(RwLock::new(HashMap::new())),
            capacity: capacity.max(1),
        }
    }
}

impl Default for InMemorySignalBus {
    /// Create a new in-memory signal bus with default capacity (1024)
    fn default() -> Self {
        Self::new(Self::DEFAULT_CAPACITY)
    }
}

#[async_trait]
impl SignalBus for InMemorySignalBus {
    /// Send `signal` to every current subscriber of `channel`.
    ///
    /// The payload is copied into an owned `Vec<u8>` for the broadcast. If the
    /// channel has never been subscribed to, or has no live receivers, the
    /// signal is silently dropped. Always returns `Ok(())`.
    async fn emit(&self, channel: &str, signal: &[u8]) -> anyhow::Result<()> {
        let channels = self.channels.read().await;
        if let Some(sender) = channels.get(channel) {
            // Ignore send errors (no receivers) - this is best-effort
            let _ = sender.send(signal.to_vec());
        }
        Ok(())
    }

    /// Subscribe to `channel`, creating its broadcast channel on first use.
    ///
    /// The returned receiver is obtained from `Sender::subscribe`, so it only
    /// observes signals emitted after this call.
    async fn subscribe(&self, channel: &str) -> anyhow::Result<SignalReceiver<Vec<u8>>> {
        let mut channels = self.channels.write().await;
        let sender = channels
            .entry(channel.to_string())
            // Only the sender half is stored; the initial receiver is dropped
            // and each subscriber gets a fresh one below.
            .or_insert_with(|| broadcast::channel(self.capacity).0);
        Ok(sender.subscribe())
    }

    /// Release bookkeeping for `channel` once nobody is listening.
    ///
    /// Broadcast receivers detach themselves when dropped, but the sender
    /// stored in the channel map would otherwise stay there forever. If the
    /// channel has no live receivers left, its sender is removed so that
    /// short-lived, dynamically named channels do not accumulate. Channels
    /// that still have receivers are left untouched, and a later `subscribe`
    /// simply recreates a removed channel. Always returns `Ok(())`.
    async fn unsubscribe(&self, channel: &str) -> anyhow::Result<()> {
        let mut channels = self.channels.write().await;
        // The write lock excludes concurrent `subscribe` calls, so a receiver
        // count of zero cannot change between the check and the removal.
        let has_no_receivers = matches!(
            channels.get(channel),
            Some(sender) if sender.receiver_count() == 0
        );
        if has_no_receivers {
            channels.remove(channel);
        }
        Ok(())
    }
}

#[cfg(test)]
mod tests {
    use super::*;

    /// Capacity that `InMemorySignalBus::default()` is expected to use.
    const EXPECTED_DEFAULT_CAPACITY: usize = 1024;

    /// `new` stores the requested capacity.
    #[tokio::test]
    async fn test_inmemory_signal_bus_new() {
        let requested = 100;
        let bus = InMemorySignalBus::new(requested);
        assert_eq!(bus.capacity, requested);
    }

    /// `default` uses the documented default capacity.
    #[tokio::test]
    async fn test_inmemory_signal_bus_default() {
        let bus = InMemorySignalBus::default();
        assert_eq!(bus.capacity, EXPECTED_DEFAULT_CAPACITY);
    }

    /// A subscriber receives a signal emitted after it subscribed.
    #[tokio::test]
    async fn test_subscribe_and_receive() {
        let topic = "test-channel";
        let bus = InMemorySignalBus::default();

        let mut receiver = bus.subscribe(topic).await.unwrap();
        bus.emit(topic, b"hello world").await.unwrap();

        let payload = receiver.recv().await.unwrap();
        assert_eq!(payload, b"hello world");
    }

    /// Every subscriber of a channel gets its own copy of the signal.
    #[tokio::test]
    async fn test_multiple_subscribers() {
        let topic = "multi-channel";
        let bus = InMemorySignalBus::default();

        let mut first = bus.subscribe(topic).await.unwrap();
        let mut second = bus.subscribe(topic).await.unwrap();

        bus.emit(topic, b"broadcast").await.unwrap();

        let from_first = first.recv().await.unwrap();
        let from_second = second.recv().await.unwrap();
        assert_eq!(from_first, b"broadcast");
        assert_eq!(from_second, b"broadcast");
    }

    /// Emitting on a channel nobody subscribed to is not an error.
    #[tokio::test]
    async fn test_emit_without_subscribers() {
        let bus = InMemorySignalBus::default();

        let outcome = bus.emit("no-subscribers", b"data").await;
        assert!(outcome.is_ok());
    }

    /// Signals are routed by channel name and do not leak across channels.
    #[tokio::test]
    async fn test_emit_to_different_channels() {
        let bus = InMemorySignalBus::default();

        let mut rx_a = bus.subscribe("channel-a").await.unwrap();
        let mut rx_b = bus.subscribe("channel-b").await.unwrap();

        bus.emit("channel-a", b"msg-a").await.unwrap();
        bus.emit("channel-b", b"msg-b").await.unwrap();

        let got_a = rx_a.recv().await.unwrap();
        let got_b = rx_b.recv().await.unwrap();
        assert_eq!(got_a, b"msg-a");
        assert_eq!(got_b, b"msg-b");
    }

    /// `unsubscribe` succeeds while a receiver for the channel is still held.
    #[tokio::test]
    async fn test_unsubscribe() {
        let topic = "unsub-channel";
        let bus = InMemorySignalBus::default();

        let _receiver = bus.subscribe(topic).await.unwrap();

        let outcome = bus.unsubscribe(topic).await;
        assert!(outcome.is_ok());
    }

    /// Signals arrive in the order they were emitted.
    #[tokio::test]
    async fn test_multiple_messages() {
        let topic = "multi-msg";
        let bus = InMemorySignalBus::default();

        let mut receiver = bus.subscribe(topic).await.unwrap();

        bus.emit(topic, b"first").await.unwrap();
        bus.emit(topic, b"second").await.unwrap();
        bus.emit(topic, b"third").await.unwrap();

        assert_eq!(receiver.recv().await.unwrap(), b"first");
        assert_eq!(receiver.recv().await.unwrap(), b"second");
        assert_eq!(receiver.recv().await.unwrap(), b"third");
    }

    /// A receiver only sees signals emitted after it subscribed.
    #[tokio::test]
    async fn test_late_subscriber_misses_messages() {
        let topic = "late-sub";
        let bus = InMemorySignalBus::default();

        // The early subscriber is present for both emissions.
        let mut early_rx = bus.subscribe(topic).await.unwrap();
        bus.emit(topic, b"early").await.unwrap();

        // The late subscriber joins between the two emissions.
        let mut late_rx = bus.subscribe(topic).await.unwrap();
        bus.emit(topic, b"late").await.unwrap();

        assert_eq!(early_rx.recv().await.unwrap(), b"early");
        assert_eq!(early_rx.recv().await.unwrap(), b"late");

        assert_eq!(late_rx.recv().await.unwrap(), b"late");
    }

    /// Signals emitted from concurrently running tasks are all delivered.
    #[tokio::test]
    async fn test_concurrent_emit() {
        let topic = "concurrent";
        let bus = Arc::new(InMemorySignalBus::default());

        let mut receiver = bus.subscribe(topic).await.unwrap();

        let emitter_a = Arc::clone(&bus);
        let task_a = tokio::spawn(async move {
            for i in 0..5 {
                let payload = format!("msg-a-{}", i);
                emitter_a.emit(topic, payload.as_bytes()).await.unwrap();
            }
        });

        let emitter_b = Arc::clone(&bus);
        let task_b = tokio::spawn(async move {
            for i in 0..5 {
                let payload = format!("msg-b-{}", i);
                emitter_b.emit(topic, payload.as_bytes()).await.unwrap();
            }
        });

        // Both tasks are already running; wait for each to finish.
        for task in vec![task_a, task_b] {
            task.await.unwrap();
        }

        // Drain everything that is buffered without waiting for more.
        let delivered = std::iter::from_fn(|| receiver.try_recv().ok()).count();

        // Two tasks, five signals each.
        assert_eq!(delivered, 10);
    }
}