Skip to main content

qubit_event_bus/local/
local_event_bus_factory.rs

1/*******************************************************************************
2 *
3 *    Copyright (c) 2026 Haixing Hu.
4 *
5 *    SPDX-License-Identifier: Apache-2.0
6 *
7 *    Licensed under the Apache License, Version 2.0.
8 *
9 ******************************************************************************/
10//! Factory for local event bus instances.
11
12use std::any::{
13    Any,
14    TypeId,
15};
16use std::collections::HashMap;
17use std::sync::Arc;
18
19use crate::{
20    DeadLetterStrategyAnyCallback,
21    DeadLetterStrategyCallback,
22    EventBusError,
23    EventBusFactory,
24    EventBusResult,
25    LocalEventBus,
26    PublishOptions,
27    PublisherInterceptor,
28    PublisherInterceptorAny,
29    SubscribeOptions,
30    SubscriberInterceptor,
31    SubscriberInterceptorAny,
32    UnsupportedTransactionalEventBus,
33};
34
35use super::local_event_bus::{
36    create_publisher_interceptor_entry,
37    create_subscriber_interceptor_entry,
38};
39use super::local_event_bus_inner::LocalEventBusRuntimeOptions;
40use super::publisher_interceptor_entry::PublisherInterceptorEntry;
41use super::subscriber_interceptor_entry::SubscriberInterceptorEntry;
42use crate::core::subscribe_options::{
43    DeadLetterStrategyAnyFn,
44    wrap_dead_letter_strategy,
45    wrap_dead_letter_strategy_any,
46};
47
48/// Returns the default subscription handler worker count.
49///
50/// # Returns
51/// Available CPU parallelism, or `1` if it cannot be detected.
52fn default_subscription_handler_pool_size() -> usize {
53    std::thread::available_parallelism()
54        .map(usize::from)
55        .unwrap_or(1)
56}
57
58/// Factory used to create [`LocalEventBus`] instances with default options.
59pub struct LocalEventBusFactory {
60    default_publish_options: HashMap<TypeId, Arc<dyn Any + Send + Sync>>,
61    default_subscribe_options: HashMap<TypeId, Arc<dyn Any + Send + Sync>>,
62    default_dead_letter_strategies: HashMap<TypeId, Arc<dyn Any + Send + Sync>>,
63    global_default_dead_letter_strategy: Option<Arc<DeadLetterStrategyAnyFn>>,
64    global_publisher_interceptors: Vec<Arc<dyn PublisherInterceptorAny>>,
65    global_subscriber_interceptors: Vec<Arc<dyn SubscriberInterceptorAny>>,
66    publisher_interceptors: Vec<Arc<dyn PublisherInterceptorEntry>>,
67    subscriber_interceptors: Vec<Arc<dyn SubscriberInterceptorEntry>>,
68    subscription_handler_pool_size: usize,
69    subscription_handler_queue_capacity: Option<usize>,
70}
71
72impl Default for LocalEventBusFactory {
73    /// Creates an empty local event bus factory with default runtime options.
74    fn default() -> Self {
75        Self::new()
76    }
77}
78
79impl LocalEventBusFactory {
80    /// Creates an empty local event bus factory.
81    ///
82    /// # Returns
83    /// Factory with no typed defaults.
84    pub fn new() -> Self {
85        Self {
86            default_publish_options: HashMap::new(),
87            default_subscribe_options: HashMap::new(),
88            default_dead_letter_strategies: HashMap::new(),
89            global_default_dead_letter_strategy: None,
90            global_publisher_interceptors: Vec::new(),
91            global_subscriber_interceptors: Vec::new(),
92            publisher_interceptors: Vec::new(),
93            subscriber_interceptors: Vec::new(),
94            subscription_handler_pool_size: default_subscription_handler_pool_size(),
95            subscription_handler_queue_capacity: None,
96        }
97    }
98
99    /// Sets default publish options for a payload type.
100    ///
101    /// # Parameters
102    /// - `options`: Options used by default publish methods for payload `T`.
103    pub fn set_default_publish_options<T>(&mut self, options: PublishOptions<T>)
104    where
105        T: Send + Sync + 'static,
106    {
107        self.default_publish_options
108            .insert(TypeId::of::<T>(), Arc::new(options));
109    }
110
111    /// Sets default subscribe options for a payload type.
112    ///
113    /// # Parameters
114    /// - `options`: Options used by [`LocalEventBus::subscribe`] for payload `T`.
115    pub fn set_default_subscribe_options<T>(&mut self, options: SubscribeOptions<T>)
116    where
117        T: Send + Sync + 'static,
118    {
119        self.default_subscribe_options
120            .insert(TypeId::of::<T>(), Arc::new(options));
121    }
122
123    /// Sets the default dead-letter strategy for a payload type.
124    ///
125    /// # Parameters
126    /// - `strategy`: Strategy used when subscription options do not provide one.
127    pub fn set_default_dead_letter_strategy<T, F>(&mut self, strategy: F)
128    where
129        T: Clone + Send + Sync + 'static,
130        F: DeadLetterStrategyCallback<T>,
131    {
132        let strategy = wrap_dead_letter_strategy(strategy);
133        self.default_dead_letter_strategies
134            .insert(TypeId::of::<T>(), Arc::new(strategy));
135    }
136
137    /// Sets the global default dead-letter strategy.
138    ///
139    /// # Parameters
140    /// - `strategy`: Strategy used when no subscription or typed factory
141    ///   dead-letter strategy is configured.
142    pub fn set_global_default_dead_letter_strategy<F>(&mut self, strategy: F)
143    where
144        F: DeadLetterStrategyAnyCallback,
145    {
146        self.global_default_dead_letter_strategy = Some(wrap_dead_letter_strategy_any(strategy));
147    }
148
149    /// Adds a publisher interceptor to buses created by this factory.
150    ///
151    /// # Parameters
152    /// - `interceptor`: Callback that can modify or drop outgoing envelopes.
153    ///
154    /// # Returns
155    /// `Ok(())` when the interceptor is stored.
156    pub fn add_publisher_interceptor<T, I>(&mut self, interceptor: I) -> EventBusResult<()>
157    where
158        T: Clone + Send + Sync + 'static,
159        I: PublisherInterceptor<T>,
160    {
161        self.publisher_interceptors
162            .push(create_publisher_interceptor_entry::<T, I>(interceptor));
163        Ok(())
164    }
165
166    /// Adds a global publisher interceptor to buses created by this factory.
167    ///
168    /// # Parameters
169    /// - `interceptor`: Callback that can mutate metadata or drop any event.
170    ///
171    /// # Returns
172    /// `Ok(())` when the interceptor is stored.
173    pub fn add_global_publisher_interceptor<I>(&mut self, interceptor: I) -> EventBusResult<()>
174    where
175        I: PublisherInterceptorAny,
176    {
177        self.global_publisher_interceptors
178            .push(Arc::new(interceptor));
179        Ok(())
180    }
181
182    /// Adds a subscriber interceptor to buses created by this factory.
183    ///
184    /// # Parameters
185    /// - `interceptor`: Callback wrapping subscriber handler execution.
186    ///
187    /// # Returns
188    /// `Ok(())` when the interceptor is stored.
189    pub fn add_subscriber_interceptor<T, I>(&mut self, interceptor: I) -> EventBusResult<()>
190    where
191        T: Clone + Send + Sync + 'static,
192        I: SubscriberInterceptor<T>,
193    {
194        self.subscriber_interceptors
195            .push(create_subscriber_interceptor_entry::<T, I>(interceptor));
196        Ok(())
197    }
198
199    /// Adds a global subscriber interceptor to buses created by this factory.
200    ///
201    /// # Parameters
202    /// - `interceptor`: Callback wrapping subscriber handling for any payload type.
203    ///
204    /// # Returns
205    /// `Ok(())` when the interceptor is stored.
206    pub fn add_global_subscriber_interceptor<I>(&mut self, interceptor: I) -> EventBusResult<()>
207    where
208        I: SubscriberInterceptorAny,
209    {
210        self.global_subscriber_interceptors
211            .push(Arc::new(interceptor));
212        Ok(())
213    }
214
215    /// Sets the subscription handler worker count for created buses.
216    ///
217    /// # Parameters
218    /// - `pool_size`: Number of worker threads used for subscriber handlers.
219    ///
220    /// # Returns
221    /// `Ok(())` when the value is stored.
222    ///
223    /// # Errors
224    /// Returns [`EventBusError::InvalidArgument`] when `pool_size` is zero.
225    pub fn set_subscription_handler_pool_size(&mut self, pool_size: usize) -> EventBusResult<()> {
226        if pool_size == 0 {
227            return Err(EventBusError::invalid_argument(
228                "pool_size",
229                "subscription handler pool size must be greater than zero",
230            ));
231        }
232        self.subscription_handler_pool_size = pool_size;
233        Ok(())
234    }
235
236    /// Sets the optional subscription handler queue capacity.
237    ///
238    /// # Parameters
239    /// - `capacity`: Maximum queued subscriber tasks, or `None` for unbounded.
240    ///
241    /// # Returns
242    /// `Ok(())` when the value is stored.
243    ///
244    /// # Errors
245    /// Returns [`EventBusError::InvalidArgument`] when a configured capacity is zero.
246    pub fn set_subscription_handler_queue_capacity(
247        &mut self,
248        capacity: Option<usize>,
249    ) -> EventBusResult<()> {
250        if capacity == Some(0) {
251            return Err(EventBusError::invalid_argument(
252                "capacity",
253                "subscription handler queue capacity must be greater than zero",
254            ));
255        }
256        self.subscription_handler_queue_capacity = capacity;
257        Ok(())
258    }
259
260    /// Creates a stopped event bus.
261    ///
262    /// # Returns
263    /// Local event bus initialized with factory defaults.
264    pub fn create(&self) -> LocalEventBus {
265        LocalEventBus::with_runtime_options(LocalEventBusRuntimeOptions {
266            default_publish_options: self.default_publish_options.clone(),
267            default_subscribe_options: self.default_subscribe_options.clone(),
268            default_dead_letter_strategies: self.default_dead_letter_strategies.clone(),
269            global_default_dead_letter_strategy: self.global_default_dead_letter_strategy.clone(),
270            global_publisher_interceptors: self.global_publisher_interceptors.clone(),
271            global_subscriber_interceptors: self.global_subscriber_interceptors.clone(),
272            publisher_interceptors: self.publisher_interceptors.clone(),
273            subscriber_interceptors: self.subscriber_interceptors.clone(),
274            subscription_handler_pool_size: self.subscription_handler_pool_size,
275            subscription_handler_queue_capacity: self.subscription_handler_queue_capacity,
276        })
277    }
278
279    /// Creates and starts an event bus.
280    ///
281    /// # Returns
282    /// Started local event bus initialized with factory defaults.
283    ///
284    /// # Errors
285    /// Returns startup errors from the handler executor.
286    pub fn create_started(&self) -> EventBusResult<LocalEventBus> {
287        let bus = self.create();
288        bus.start()?;
289        Ok(bus)
290    }
291}
292
293impl EventBusFactory for LocalEventBusFactory {
294    type Bus = LocalEventBus;
295    type TransactionalBus = UnsupportedTransactionalEventBus;
296
297    /// Local event bus does not support transactional operations.
298    fn is_transactional_supported(&self) -> bool {
299        false
300    }
301
302    /// Creates a stopped local event bus.
303    fn create(&self) -> Self::Bus {
304        Self::create(self)
305    }
306
307    /// Creates and starts a local event bus.
308    fn create_started(&self) -> EventBusResult<Self::Bus> {
309        Self::create_started(self)
310    }
311
312    /// Sets typed default publish options for local buses.
313    fn set_default_publish_options<T>(&mut self, options: PublishOptions<T>) -> EventBusResult<()>
314    where
315        T: Send + Sync + 'static,
316    {
317        Self::set_default_publish_options(self, options);
318        Ok(())
319    }
320
321    /// Sets typed default subscribe options for local buses.
322    fn set_default_subscribe_options<T>(
323        &mut self,
324        options: SubscribeOptions<T>,
325    ) -> EventBusResult<()>
326    where
327        T: Send + Sync + 'static,
328    {
329        Self::set_default_subscribe_options(self, options);
330        Ok(())
331    }
332
333    /// Sets a typed default dead-letter strategy for local buses.
334    fn set_default_dead_letter_strategy<T, F>(&mut self, strategy: F) -> EventBusResult<()>
335    where
336        T: Clone + Send + Sync + 'static,
337        F: DeadLetterStrategyCallback<T>,
338    {
339        Self::set_default_dead_letter_strategy::<T, F>(self, strategy);
340        Ok(())
341    }
342
343    /// Sets the global default dead-letter strategy for local buses.
344    fn set_global_default_dead_letter_strategy<F>(&mut self, strategy: F) -> EventBusResult<()>
345    where
346        F: DeadLetterStrategyAnyCallback,
347    {
348        Self::set_global_default_dead_letter_strategy(self, strategy);
349        Ok(())
350    }
351
352    /// Adds a typed publisher interceptor for local buses.
353    fn add_publisher_interceptor<T, I>(&mut self, interceptor: I) -> EventBusResult<()>
354    where
355        T: Clone + Send + Sync + 'static,
356        I: PublisherInterceptor<T>,
357    {
358        Self::add_publisher_interceptor::<T, I>(self, interceptor)
359    }
360
361    /// Adds a global publisher interceptor for local buses.
362    fn add_global_publisher_interceptor<I>(&mut self, interceptor: I) -> EventBusResult<()>
363    where
364        I: PublisherInterceptorAny,
365    {
366        Self::add_global_publisher_interceptor(self, interceptor)
367    }
368
369    /// Adds a typed subscriber interceptor for local buses.
370    fn add_subscriber_interceptor<T, I>(&mut self, interceptor: I) -> EventBusResult<()>
371    where
372        T: Clone + Send + Sync + 'static,
373        I: SubscriberInterceptor<T>,
374    {
375        Self::add_subscriber_interceptor::<T, I>(self, interceptor)
376    }
377
378    /// Adds a global subscriber interceptor for local buses.
379    fn add_global_subscriber_interceptor<I>(&mut self, interceptor: I) -> EventBusResult<()>
380    where
381        I: SubscriberInterceptorAny,
382    {
383        Self::add_global_subscriber_interceptor(self, interceptor)
384    }
385}