use crate::error::{DomainError, DomainResult as Result};
use crate::eventing::EventBus;
use crate::persist::SerializedEvent;
use async_trait::async_trait;
use futures_core::stream::BoxStream;
use futures_util::StreamExt;
use tokio::sync::broadcast;
use tokio_stream::wrappers::BroadcastStream;
/// In-process [`EventBus`] implementation backed by a Tokio broadcast channel.
///
/// The struct stores only the sending half of the channel; receivers are
/// created on demand from it when a caller subscribes.
#[derive(Clone)]
pub struct InMemoryEventBus {
// Sending half of the broadcast channel carrying `SerializedEvent` values.
tx: broadcast::Sender<SerializedEvent>,
}
impl InMemoryEventBus {
    /// Builds a new bus around a freshly created broadcast channel.
    ///
    /// `capacity` is forwarded unchanged to `broadcast::channel`.
    ///
    /// NOTE(review): Tokio documents `broadcast::channel` as panicking for a
    /// capacity of zero — confirm callers never pass `0`.
    pub fn new(capacity: usize) -> Self {
        // Only the sender is kept; the receiver created alongside it is
        // discarded when this function returns.
        let (sender, _initial_receiver) = broadcast::channel(capacity);
        Self { tx: sender }
    }
}
#[async_trait]
impl EventBus for InMemoryEventBus {
    /// Sends a clone of `event` into the broadcast channel.
    ///
    /// The outcome of the underlying `send` is discarded, so this method
    /// always returns `Ok(())`.
    async fn publish(&self, event: &SerializedEvent) -> Result<()> {
        let payload = event.clone();
        // Best-effort delivery: a failed send is intentionally ignored.
        // NOTE(review): presumably `send` only fails when no receiver is
        // currently subscribed — confirm against the tokio docs.
        let _ = self.tx.send(payload);
        Ok(())
    }

    /// Opens a new subscription and exposes it as a boxed stream.
    ///
    /// Each item is either a received event or a `DomainError::event_bus`
    /// built from the text of the receive error.
    async fn subscribe(&self) -> BoxStream<'static, Result<SerializedEvent>> {
        let receiver = self.tx.subscribe();
        // NOTE(review): receive errors here presumably signal a subscriber
        // that fell behind and missed events — confirm against tokio_stream.
        let events = BroadcastStream::new(receiver).map(|item| match item {
            Ok(event) => Ok(event),
            Err(err) => Err(DomainError::event_bus(err.to_string())),
        });
        Box::pin(events)
    }
}