1use std::any::{
13 Any,
14 TypeId,
15};
16use std::collections::HashMap;
17use std::sync::Arc;
18
19use crate::{
20 DeadLetterStrategyAnyCallback,
21 DeadLetterStrategyCallback,
22 EventBusError,
23 EventBusFactory,
24 EventBusResult,
25 LocalEventBus,
26 PublishOptions,
27 PublisherInterceptor,
28 PublisherInterceptorAny,
29 SubscribeOptions,
30 SubscriberInterceptor,
31 SubscriberInterceptorAny,
32 UnsupportedTransactionalEventBus,
33};
34
35use super::local_event_bus::{
36 create_publisher_interceptor_entry,
37 create_subscriber_interceptor_entry,
38};
39use super::local_event_bus_inner::LocalEventBusRuntimeOptions;
40use super::publisher_interceptor_entry::PublisherInterceptorEntry;
41use super::subscriber_interceptor_entry::SubscriberInterceptorEntry;
42use crate::core::subscribe_options::{
43 DeadLetterStrategyAnyFn,
44 wrap_dead_letter_strategy,
45 wrap_dead_letter_strategy_any,
46};
47
48fn default_subscription_handler_pool_size() -> usize {
53 std::thread::available_parallelism()
54 .map(usize::from)
55 .unwrap_or(1)
56}
57
58pub struct LocalEventBusFactory {
60 default_publish_options: HashMap<TypeId, Arc<dyn Any + Send + Sync>>,
61 default_subscribe_options: HashMap<TypeId, Arc<dyn Any + Send + Sync>>,
62 default_dead_letter_strategies: HashMap<TypeId, Arc<dyn Any + Send + Sync>>,
63 global_default_dead_letter_strategy: Option<Arc<DeadLetterStrategyAnyFn>>,
64 global_publisher_interceptors: Vec<Arc<dyn PublisherInterceptorAny>>,
65 global_subscriber_interceptors: Vec<Arc<dyn SubscriberInterceptorAny>>,
66 publisher_interceptors: Vec<Arc<dyn PublisherInterceptorEntry>>,
67 subscriber_interceptors: Vec<Arc<dyn SubscriberInterceptorEntry>>,
68 subscription_handler_pool_size: usize,
69 subscription_handler_queue_capacity: Option<usize>,
70}
71
72impl Default for LocalEventBusFactory {
73 fn default() -> Self {
75 Self::new()
76 }
77}
78
79impl LocalEventBusFactory {
80 pub fn new() -> Self {
85 Self {
86 default_publish_options: HashMap::new(),
87 default_subscribe_options: HashMap::new(),
88 default_dead_letter_strategies: HashMap::new(),
89 global_default_dead_letter_strategy: None,
90 global_publisher_interceptors: Vec::new(),
91 global_subscriber_interceptors: Vec::new(),
92 publisher_interceptors: Vec::new(),
93 subscriber_interceptors: Vec::new(),
94 subscription_handler_pool_size: default_subscription_handler_pool_size(),
95 subscription_handler_queue_capacity: None,
96 }
97 }
98
99 pub fn set_default_publish_options<T>(&mut self, options: PublishOptions<T>)
104 where
105 T: Send + Sync + 'static,
106 {
107 self.default_publish_options
108 .insert(TypeId::of::<T>(), Arc::new(options));
109 }
110
111 pub fn set_default_subscribe_options<T>(&mut self, options: SubscribeOptions<T>)
116 where
117 T: Send + Sync + 'static,
118 {
119 self.default_subscribe_options
120 .insert(TypeId::of::<T>(), Arc::new(options));
121 }
122
123 pub fn set_default_dead_letter_strategy<T, F>(&mut self, strategy: F)
128 where
129 T: Clone + Send + Sync + 'static,
130 F: DeadLetterStrategyCallback<T>,
131 {
132 let strategy = wrap_dead_letter_strategy(strategy);
133 self.default_dead_letter_strategies
134 .insert(TypeId::of::<T>(), Arc::new(strategy));
135 }
136
137 pub fn set_global_default_dead_letter_strategy<F>(&mut self, strategy: F)
143 where
144 F: DeadLetterStrategyAnyCallback,
145 {
146 self.global_default_dead_letter_strategy = Some(wrap_dead_letter_strategy_any(strategy));
147 }
148
149 pub fn add_publisher_interceptor<T, I>(&mut self, interceptor: I) -> EventBusResult<()>
157 where
158 T: Clone + Send + Sync + 'static,
159 I: PublisherInterceptor<T>,
160 {
161 self.publisher_interceptors
162 .push(create_publisher_interceptor_entry::<T, I>(interceptor));
163 Ok(())
164 }
165
166 pub fn add_global_publisher_interceptor<I>(&mut self, interceptor: I) -> EventBusResult<()>
174 where
175 I: PublisherInterceptorAny,
176 {
177 self.global_publisher_interceptors
178 .push(Arc::new(interceptor));
179 Ok(())
180 }
181
182 pub fn add_subscriber_interceptor<T, I>(&mut self, interceptor: I) -> EventBusResult<()>
190 where
191 T: Clone + Send + Sync + 'static,
192 I: SubscriberInterceptor<T>,
193 {
194 self.subscriber_interceptors
195 .push(create_subscriber_interceptor_entry::<T, I>(interceptor));
196 Ok(())
197 }
198
199 pub fn add_global_subscriber_interceptor<I>(&mut self, interceptor: I) -> EventBusResult<()>
207 where
208 I: SubscriberInterceptorAny,
209 {
210 self.global_subscriber_interceptors
211 .push(Arc::new(interceptor));
212 Ok(())
213 }
214
215 pub fn set_subscription_handler_pool_size(&mut self, pool_size: usize) -> EventBusResult<()> {
226 if pool_size == 0 {
227 return Err(EventBusError::invalid_argument(
228 "pool_size",
229 "subscription handler pool size must be greater than zero",
230 ));
231 }
232 self.subscription_handler_pool_size = pool_size;
233 Ok(())
234 }
235
236 pub fn set_subscription_handler_queue_capacity(
247 &mut self,
248 capacity: Option<usize>,
249 ) -> EventBusResult<()> {
250 if capacity == Some(0) {
251 return Err(EventBusError::invalid_argument(
252 "capacity",
253 "subscription handler queue capacity must be greater than zero",
254 ));
255 }
256 self.subscription_handler_queue_capacity = capacity;
257 Ok(())
258 }
259
260 pub fn create(&self) -> LocalEventBus {
265 LocalEventBus::with_runtime_options(LocalEventBusRuntimeOptions {
266 default_publish_options: self.default_publish_options.clone(),
267 default_subscribe_options: self.default_subscribe_options.clone(),
268 default_dead_letter_strategies: self.default_dead_letter_strategies.clone(),
269 global_default_dead_letter_strategy: self.global_default_dead_letter_strategy.clone(),
270 global_publisher_interceptors: self.global_publisher_interceptors.clone(),
271 global_subscriber_interceptors: self.global_subscriber_interceptors.clone(),
272 publisher_interceptors: self.publisher_interceptors.clone(),
273 subscriber_interceptors: self.subscriber_interceptors.clone(),
274 subscription_handler_pool_size: self.subscription_handler_pool_size,
275 subscription_handler_queue_capacity: self.subscription_handler_queue_capacity,
276 })
277 }
278
279 pub fn create_started(&self) -> EventBusResult<LocalEventBus> {
287 let bus = self.create();
288 bus.start()?;
289 Ok(bus)
290 }
291}
292
293impl EventBusFactory for LocalEventBusFactory {
294 type Bus = LocalEventBus;
295 type TransactionalBus = UnsupportedTransactionalEventBus;
296
297 fn is_transactional_supported(&self) -> bool {
299 false
300 }
301
302 fn create(&self) -> Self::Bus {
304 Self::create(self)
305 }
306
307 fn create_started(&self) -> EventBusResult<Self::Bus> {
309 Self::create_started(self)
310 }
311
312 fn set_default_publish_options<T>(&mut self, options: PublishOptions<T>) -> EventBusResult<()>
314 where
315 T: Send + Sync + 'static,
316 {
317 Self::set_default_publish_options(self, options);
318 Ok(())
319 }
320
321 fn set_default_subscribe_options<T>(
323 &mut self,
324 options: SubscribeOptions<T>,
325 ) -> EventBusResult<()>
326 where
327 T: Send + Sync + 'static,
328 {
329 Self::set_default_subscribe_options(self, options);
330 Ok(())
331 }
332
333 fn set_default_dead_letter_strategy<T, F>(&mut self, strategy: F) -> EventBusResult<()>
335 where
336 T: Clone + Send + Sync + 'static,
337 F: DeadLetterStrategyCallback<T>,
338 {
339 Self::set_default_dead_letter_strategy::<T, F>(self, strategy);
340 Ok(())
341 }
342
343 fn set_global_default_dead_letter_strategy<F>(&mut self, strategy: F) -> EventBusResult<()>
345 where
346 F: DeadLetterStrategyAnyCallback,
347 {
348 Self::set_global_default_dead_letter_strategy(self, strategy);
349 Ok(())
350 }
351
352 fn add_publisher_interceptor<T, I>(&mut self, interceptor: I) -> EventBusResult<()>
354 where
355 T: Clone + Send + Sync + 'static,
356 I: PublisherInterceptor<T>,
357 {
358 Self::add_publisher_interceptor::<T, I>(self, interceptor)
359 }
360
361 fn add_global_publisher_interceptor<I>(&mut self, interceptor: I) -> EventBusResult<()>
363 where
364 I: PublisherInterceptorAny,
365 {
366 Self::add_global_publisher_interceptor(self, interceptor)
367 }
368
369 fn add_subscriber_interceptor<T, I>(&mut self, interceptor: I) -> EventBusResult<()>
371 where
372 T: Clone + Send + Sync + 'static,
373 I: SubscriberInterceptor<T>,
374 {
375 Self::add_subscriber_interceptor::<T, I>(self, interceptor)
376 }
377
378 fn add_global_subscriber_interceptor<I>(&mut self, interceptor: I) -> EventBusResult<()>
380 where
381 I: SubscriberInterceptorAny,
382 {
383 Self::add_global_subscriber_interceptor(self, interceptor)
384 }
385}