use std::any::{
Any,
TypeId,
};
use std::collections::HashMap;
use std::sync::Arc;
use crate::{
DeadLetterStrategyAnyCallback,
DeadLetterStrategyCallback,
EventBusError,
EventBusFactory,
EventBusResult,
LocalEventBus,
PublishOptions,
PublisherInterceptor,
PublisherInterceptorAny,
SubscribeOptions,
SubscriberInterceptor,
SubscriberInterceptorAny,
UnsupportedTransactionalEventBus,
};
use super::local_event_bus::{
create_publisher_interceptor_entry,
create_subscriber_interceptor_entry,
};
use super::local_event_bus_inner::LocalEventBusRuntimeOptions;
use super::publisher_interceptor_entry::PublisherInterceptorEntry;
use super::subscriber_interceptor_entry::SubscriberInterceptorEntry;
use crate::core::subscribe_options::{
DeadLetterStrategyAnyFn,
wrap_dead_letter_strategy,
wrap_dead_letter_strategy_any,
};
fn default_subscription_handler_pool_size() -> usize {
std::thread::available_parallelism()
.map(usize::from)
.unwrap_or(1)
}
pub struct LocalEventBusFactory {
default_publish_options: HashMap<TypeId, Arc<dyn Any + Send + Sync>>,
default_subscribe_options: HashMap<TypeId, Arc<dyn Any + Send + Sync>>,
default_dead_letter_strategies: HashMap<TypeId, Arc<dyn Any + Send + Sync>>,
global_default_dead_letter_strategy: Option<Arc<DeadLetterStrategyAnyFn>>,
global_publisher_interceptors: Vec<Arc<dyn PublisherInterceptorAny>>,
global_subscriber_interceptors: Vec<Arc<dyn SubscriberInterceptorAny>>,
publisher_interceptors: Vec<Arc<dyn PublisherInterceptorEntry>>,
subscriber_interceptors: Vec<Arc<dyn SubscriberInterceptorEntry>>,
subscription_handler_pool_size: usize,
subscription_handler_queue_capacity: Option<usize>,
}
impl Default for LocalEventBusFactory {
fn default() -> Self {
Self::new()
}
}
impl LocalEventBusFactory {
pub fn new() -> Self {
Self {
default_publish_options: HashMap::new(),
default_subscribe_options: HashMap::new(),
default_dead_letter_strategies: HashMap::new(),
global_default_dead_letter_strategy: None,
global_publisher_interceptors: Vec::new(),
global_subscriber_interceptors: Vec::new(),
publisher_interceptors: Vec::new(),
subscriber_interceptors: Vec::new(),
subscription_handler_pool_size: default_subscription_handler_pool_size(),
subscription_handler_queue_capacity: None,
}
}
pub fn set_default_publish_options<T>(&mut self, options: PublishOptions<T>)
where
T: Send + Sync + 'static,
{
self.default_publish_options
.insert(TypeId::of::<T>(), Arc::new(options));
}
pub fn set_default_subscribe_options<T>(&mut self, options: SubscribeOptions<T>)
where
T: Send + Sync + 'static,
{
self.default_subscribe_options
.insert(TypeId::of::<T>(), Arc::new(options));
}
pub fn set_default_dead_letter_strategy<T, F>(&mut self, strategy: F)
where
T: Clone + Send + Sync + 'static,
F: DeadLetterStrategyCallback<T>,
{
let strategy = wrap_dead_letter_strategy(strategy);
self.default_dead_letter_strategies
.insert(TypeId::of::<T>(), Arc::new(strategy));
}
pub fn set_global_default_dead_letter_strategy<F>(&mut self, strategy: F)
where
F: DeadLetterStrategyAnyCallback,
{
self.global_default_dead_letter_strategy = Some(wrap_dead_letter_strategy_any(strategy));
}
pub fn add_publisher_interceptor<T, I>(&mut self, interceptor: I) -> EventBusResult<()>
where
T: Clone + Send + Sync + 'static,
I: PublisherInterceptor<T>,
{
self.publisher_interceptors
.push(create_publisher_interceptor_entry::<T, I>(interceptor));
Ok(())
}
pub fn add_global_publisher_interceptor<I>(&mut self, interceptor: I) -> EventBusResult<()>
where
I: PublisherInterceptorAny,
{
self.global_publisher_interceptors
.push(Arc::new(interceptor));
Ok(())
}
pub fn add_subscriber_interceptor<T, I>(&mut self, interceptor: I) -> EventBusResult<()>
where
T: Clone + Send + Sync + 'static,
I: SubscriberInterceptor<T>,
{
self.subscriber_interceptors
.push(create_subscriber_interceptor_entry::<T, I>(interceptor));
Ok(())
}
pub fn add_global_subscriber_interceptor<I>(&mut self, interceptor: I) -> EventBusResult<()>
where
I: SubscriberInterceptorAny,
{
self.global_subscriber_interceptors
.push(Arc::new(interceptor));
Ok(())
}
pub fn set_subscription_handler_pool_size(&mut self, pool_size: usize) -> EventBusResult<()> {
if pool_size == 0 {
return Err(EventBusError::invalid_argument(
"pool_size",
"subscription handler pool size must be greater than zero",
));
}
self.subscription_handler_pool_size = pool_size;
Ok(())
}
pub fn set_subscription_handler_queue_capacity(
&mut self,
capacity: Option<usize>,
) -> EventBusResult<()> {
if capacity == Some(0) {
return Err(EventBusError::invalid_argument(
"capacity",
"subscription handler queue capacity must be greater than zero",
));
}
self.subscription_handler_queue_capacity = capacity;
Ok(())
}
pub fn create(&self) -> LocalEventBus {
LocalEventBus::with_runtime_options(LocalEventBusRuntimeOptions {
default_publish_options: self.default_publish_options.clone(),
default_subscribe_options: self.default_subscribe_options.clone(),
default_dead_letter_strategies: self.default_dead_letter_strategies.clone(),
global_default_dead_letter_strategy: self.global_default_dead_letter_strategy.clone(),
global_publisher_interceptors: self.global_publisher_interceptors.clone(),
global_subscriber_interceptors: self.global_subscriber_interceptors.clone(),
publisher_interceptors: self.publisher_interceptors.clone(),
subscriber_interceptors: self.subscriber_interceptors.clone(),
subscription_handler_pool_size: self.subscription_handler_pool_size,
subscription_handler_queue_capacity: self.subscription_handler_queue_capacity,
})
}
pub fn create_started(&self) -> EventBusResult<LocalEventBus> {
let bus = self.create();
bus.start()?;
Ok(bus)
}
}
impl EventBusFactory for LocalEventBusFactory {
type Bus = LocalEventBus;
type TransactionalBus = UnsupportedTransactionalEventBus;
fn is_transactional_supported(&self) -> bool {
false
}
fn create(&self) -> Self::Bus {
Self::create(self)
}
fn create_started(&self) -> EventBusResult<Self::Bus> {
Self::create_started(self)
}
fn set_default_publish_options<T>(&mut self, options: PublishOptions<T>) -> EventBusResult<()>
where
T: Send + Sync + 'static,
{
Self::set_default_publish_options(self, options);
Ok(())
}
fn set_default_subscribe_options<T>(
&mut self,
options: SubscribeOptions<T>,
) -> EventBusResult<()>
where
T: Send + Sync + 'static,
{
Self::set_default_subscribe_options(self, options);
Ok(())
}
fn set_default_dead_letter_strategy<T, F>(&mut self, strategy: F) -> EventBusResult<()>
where
T: Clone + Send + Sync + 'static,
F: DeadLetterStrategyCallback<T>,
{
Self::set_default_dead_letter_strategy::<T, F>(self, strategy);
Ok(())
}
fn set_global_default_dead_letter_strategy<F>(&mut self, strategy: F) -> EventBusResult<()>
where
F: DeadLetterStrategyAnyCallback,
{
Self::set_global_default_dead_letter_strategy(self, strategy);
Ok(())
}
fn add_publisher_interceptor<T, I>(&mut self, interceptor: I) -> EventBusResult<()>
where
T: Clone + Send + Sync + 'static,
I: PublisherInterceptor<T>,
{
Self::add_publisher_interceptor::<T, I>(self, interceptor)
}
fn add_global_publisher_interceptor<I>(&mut self, interceptor: I) -> EventBusResult<()>
where
I: PublisherInterceptorAny,
{
Self::add_global_publisher_interceptor(self, interceptor)
}
fn add_subscriber_interceptor<T, I>(&mut self, interceptor: I) -> EventBusResult<()>
where
T: Clone + Send + Sync + 'static,
I: SubscriberInterceptor<T>,
{
Self::add_subscriber_interceptor::<T, I>(self, interceptor)
}
fn add_global_subscriber_interceptor<I>(&mut self, interceptor: I) -> EventBusResult<()>
where
I: SubscriberInterceptorAny,
{
Self::add_global_subscriber_interceptor(self, interceptor)
}
}