use std::time::Duration;
use crate::deps::Deps;
use crate::error::{ConfigError, EventBusError};
use crate::handler::{DeadLetterDescriptor, HandlerDescriptor};
use crate::types::{BusConfig, SubscriptionPolicy};
use super::EventBus;
/// Accumulates everything needed to construct an [`EventBus`]: the bus
/// configuration, the handler and dead-letter descriptors to register, and
/// the dependency container passed to each registration.
///
/// Every setter takes the builder by value and returns it, so a builder that
/// is dropped without calling `build()` has no effect. The `must_use`
/// attribute turns such an accidental discard into a compiler warning.
#[must_use = "an `EventBusBuilder` does nothing until `build()` is called"]
pub struct EventBusBuilder {
    /// Bus-wide settings; starts from `BusConfig::default()`.
    config: BusConfig,
    /// Handler descriptors, registered in insertion order by `build()`.
    handlers: Vec<Box<dyn HandlerDescriptor>>,
    /// Dead-letter descriptors, registered after all regular handlers.
    dead_letter_handlers: Vec<Box<dyn DeadLetterDescriptor>>,
    /// Dependencies handed to every descriptor during registration.
    deps: Deps,
}
/// Diagnostic formatting for the builder.
///
/// The boxed descriptors are summarized by how many of each kind have been
/// added; `config` and `deps` are printed through their own `Debug` impls.
impl std::fmt::Debug for EventBusBuilder {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let handler_count = self.handlers.len();
        let dead_letter_count = self.dead_letter_handlers.len();

        let mut out = f.debug_struct("EventBusBuilder");
        out.field("config", &self.config);
        out.field("handlers", &handler_count);
        out.field("dead_letter_handlers", &dead_letter_count);
        out.field("deps", &self.deps);
        out.finish()
    }
}
impl EventBusBuilder {
pub(super) fn new() -> Self {
Self {
config: BusConfig::default(),
handlers: Vec::new(),
dead_letter_handlers: Vec::new(),
deps: Deps::new(),
}
}
pub fn buffer_size(mut self, size: usize) -> Self {
self.config.buffer_size = size;
self
}
pub fn handler_timeout(mut self, timeout: Duration) -> Self {
self.config.handler_timeout = Some(timeout);
self
}
pub fn max_concurrent_async(mut self, max: usize) -> Self {
self.config.max_concurrent_async = Some(max);
self
}
pub fn default_subscription_policy(mut self, policy: SubscriptionPolicy) -> Self {
self.config.default_subscription_policy = policy;
self
}
pub fn shutdown_timeout(mut self, timeout: Duration) -> Self {
self.config.shutdown_timeout = Some(timeout);
self
}
pub fn handler(mut self, handler: impl HandlerDescriptor) -> Self {
self.handlers.push(Box::new(handler));
self
}
pub fn dead_letter(mut self, handler: impl DeadLetterDescriptor) -> Self {
self.dead_letter_handlers.push(Box::new(handler));
self
}
pub fn deps(mut self, deps: Deps) -> Self {
self.deps = deps;
self
}
pub async fn build(self) -> Result<EventBus, EventBusError> {
if self.config.buffer_size == 0 {
return Err(ConfigError::ZeroBufferSize.into());
}
if self.config.max_concurrent_async == Some(0) {
return Err(ConfigError::ZeroConcurrency.into());
}
let bus = EventBus::from_config(self.config);
for descriptor in &self.handlers {
let _ = descriptor.register(&bus, &self.deps).await?;
}
for descriptor in &self.dead_letter_handlers {
let _ = descriptor.register_dead_letter(&bus, &self.deps).await?;
}
Ok(bus)
}
}