#![doc = include_str!(concat!(env!("CARGO_MANIFEST_DIR"), "/README.md"))]
use crate::{
rc_map::{InsertError, RcMap},
subscription::Subscription,
};
use async_broadcast::{Sender, broadcast};
use std::sync::Arc;
pub const DEFAULT_TOPIC_CAPACITY: usize = 1000;
mod rc_map;
mod subscription;
#[derive(thiserror::Error, Debug)]
pub enum PublishError {
#[error("Failed to publish message to topic, it was unexpectedly closed: '{0}'")]
ChannelClosed(Arc<str>),
#[error(
"Failed to publish message to topic, the topic is full and can't handle more messages: '{0}'"
)]
CapacityOverflow(Arc<str>),
}
#[derive(Clone)]
pub struct EventBus {
inner: RcMap<Arc<str>, Sender<Arc<[u8]>>>,
topic_capacity: usize,
}
impl Default for EventBus {
fn default() -> Self {
EventBus {
inner: RcMap::new(),
topic_capacity: DEFAULT_TOPIC_CAPACITY,
}
}
}
impl EventBus {
pub fn new() -> EventBus {
Self::default()
}
pub fn new_with_topic_capacity(topic_capacity: usize) -> EventBus {
let mut bus = EventBus::new();
bus.topic_capacity = topic_capacity;
bus
}
pub fn subscribe(&self, topic: &str) -> Subscription {
let (tx, rx) = broadcast(self.topic_capacity);
match self.inner.insert(topic.into(), tx) {
Ok(object_ref) => {
Subscription::new_with_rx(object_ref, rx)
}
Err(InsertError::AlreadyExists(_key, object_ref)) => Subscription::from(object_ref),
}
}
pub fn publish(&self, topic: &str, data: &[u8]) -> Result<(), PublishError> {
let Some(object_ref) = self.inner.get(topic.into()) else {
return Ok(());
};
let tx = object_ref.value();
let result = tx.try_broadcast(Arc::from(data));
match result {
Ok(_) => Ok(()),
Err(async_broadcast::TrySendError::Inactive(_)) => Ok(()),
Err(async_broadcast::TrySendError::Closed(_)) => {
Err(PublishError::ChannelClosed(topic.into()))
}
Err(async_broadcast::TrySendError::Full(_)) => {
Err(PublishError::CapacityOverflow(topic.into()))
}
}
}
}
#[cfg(test)]
mod tests {
use super::*;
use futures::StreamExt;
async fn get_next_message(sub: &mut Subscription) -> String {
let payload = sub.next().await.expect("Stream unexpectedly closed");
String::from_utf8_lossy(&payload).to_string()
}
#[tokio::test(flavor = "multi_thread")]
async fn test_simple_pub_sub() {
let event_bus = EventBus::new();
let topic = "test_simple";
let expected_message = "Hello EventBus";
let mut subscription = event_bus.subscribe(topic);
let task_handle = tokio::spawn(async move { get_next_message(&mut subscription).await });
event_bus
.publish(topic, expected_message.as_bytes())
.unwrap();
let received = task_handle
.await
.expect("Failed to receive result from task");
assert_eq!(received, expected_message);
}
}