use std::any::TypeId;
use std::time::Duration;
use tokio::sync::{mpsc, oneshot};
use tracing::{error, trace};
use crate::actor::{BusMessage, EventBusActor};
use crate::error::EventBusError;
use crate::handler::{IntoHandler, SyncEventHandler};
use crate::subscription::Subscription;
use crate::types::{BusConfig, DeadLetter, Event, FailurePolicy};
pub struct EventBusBuilder {
config: BusConfig,
}
impl EventBusBuilder {
fn new() -> Self {
Self {
config: BusConfig::default(),
}
}
pub fn buffer_size(mut self, size: usize) -> Self {
self.config.buffer_size = size;
self
}
pub fn handler_timeout(mut self, timeout: Duration) -> Self {
self.config.handler_timeout = Some(timeout);
self
}
pub fn max_concurrent_async(mut self, max: usize) -> Self {
self.config.max_concurrent_async = Some(max);
self
}
pub fn default_failure_policy(mut self, policy: FailurePolicy) -> Self {
self.config.default_failure_policy = policy;
self
}
pub fn shutdown_timeout(mut self, timeout: Duration) -> Self {
self.config.shutdown_timeout = Some(timeout);
self
}
pub fn build(self) -> EventBus {
EventBus::from_config(self.config)
}
}
#[derive(Clone)]
pub struct EventBus {
tx: mpsc::Sender<BusMessage>,
default_failure_policy: FailurePolicy,
}
impl EventBus {
pub fn new(buffer: usize) -> Self {
Self::from_config(BusConfig {
buffer_size: buffer,
..BusConfig::default()
})
}
pub fn builder() -> EventBusBuilder {
EventBusBuilder::new()
}
fn from_config(config: BusConfig) -> Self {
let (tx, rx) = mpsc::channel(config.buffer_size);
let default_failure_policy = config.default_failure_policy;
let actor = EventBusActor::new(tx.clone(), rx, &config);
tokio::spawn(actor.run());
Self { tx, default_failure_policy }
}
pub async fn subscribe<E, H, M>(&self, handler: H) -> Result<Subscription, EventBusError>
where
E: Event,
H: IntoHandler<E, M>,
{
self.subscribe_with_policy(handler, self.default_failure_policy).await
}
pub async fn subscribe_with_policy<E, H, M>(&self, handler: H, failure_policy: FailurePolicy) -> Result<Subscription, EventBusError>
where
E: Event,
H: IntoHandler<E, M>,
{
trace!("event_bus.subscribe");
let registered = handler.into_handler();
let (ack_tx, ack_rx) = oneshot::channel();
self.tx
.send(BusMessage::Subscribe {
event_type: TypeId::of::<E>(),
handler: registered.erased,
mode: registered.mode,
failure_policy,
ack: ack_tx,
})
.await
.map_err(|e| {
error!(operation = "subscribe", error = %e, "event_bus.send_failed");
EventBusError::ActorStopped
})?;
let subscription_id = ack_rx.await.map_err(|_| {
error!(operation = "subscribe", "event_bus.ack_wait_failed");
EventBusError::ActorStopped
})?;
Ok(Subscription::new(subscription_id, self.clone()))
}
pub async fn subscribe_dead_letters<H>(&self, handler: H) -> Result<Subscription, EventBusError>
where
H: SyncEventHandler<DeadLetter>,
{
let policy = FailurePolicy::default().with_dead_letter(false);
self.subscribe_with_policy::<DeadLetter, H, crate::handler::SyncMode>(handler, policy)
.await
}
pub async fn publish<E>(&self, event: E) -> Result<(), EventBusError>
where
E: Event + Clone,
{
trace!("event_bus.publish");
let (ack_tx, ack_rx) = oneshot::channel();
self.tx
.send(BusMessage::Publish {
event_type: TypeId::of::<E>(),
event: Box::new(event),
event_name: std::any::type_name::<E>(),
ack: Some(ack_tx),
})
.await
.map_err(|e| {
error!(operation = "publish", error = %e, "event_bus.send_failed");
EventBusError::ActorStopped
})?;
ack_rx.await.map_err(|_| {
error!(operation = "publish", "event_bus.ack_wait_failed");
EventBusError::ActorStopped
})
}
pub fn try_publish<E>(&self, event: E) -> Result<(), EventBusError>
where
E: Event + Clone,
{
trace!("event_bus.try_publish");
match self.tx.try_send(BusMessage::Publish {
event_type: TypeId::of::<E>(),
event: Box::new(event),
event_name: std::any::type_name::<E>(),
ack: None,
}) {
Ok(()) => Ok(()),
Err(mpsc::error::TrySendError::Full(_)) => Err(EventBusError::ChannelFull),
Err(mpsc::error::TrySendError::Closed(_)) => Err(EventBusError::ActorStopped),
}
}
pub async fn unsubscribe(&self, subscription_id: crate::types::SubscriptionId) -> Result<bool, EventBusError> {
trace!(subscription_id = subscription_id.as_u64(), "event_bus.unsubscribe");
let (ack_tx, ack_rx) = oneshot::channel();
self.tx
.send(BusMessage::Unsubscribe {
subscription_id,
ack: ack_tx,
})
.await
.map_err(|e| {
error!(operation = "unsubscribe", error = %e, "event_bus.send_failed");
EventBusError::ActorStopped
})?;
ack_rx.await.map_err(|_| {
error!(operation = "unsubscribe", "event_bus.ack_wait_failed");
EventBusError::ActorStopped
})
}
pub async fn shutdown(&self) -> Result<(), EventBusError> {
trace!("event_bus.shutdown");
let (ack_tx, ack_rx) = oneshot::channel();
self.tx.send(BusMessage::Shutdown { ack: ack_tx }).await.map_err(|e| {
error!(operation = "shutdown", error = %e, "event_bus.send_failed");
EventBusError::ActorStopped
})?;
ack_rx.await.map_err(|_| {
error!(operation = "shutdown", "event_bus.ack_wait_failed");
EventBusError::ActorStopped
})?
}
}