Skip to main content

qubit_event_bus/local/
local_event_bus.rs

1/*******************************************************************************
2 *
3 *    Copyright (c) 2026 Haixing Hu.
4 *
5 *    SPDX-License-Identifier: Apache-2.0
6 *
7 *    Licensed under the Apache License, Version 2.0.
8 *
9 ******************************************************************************/
10//! Thread-safe in-process event bus.
11// qubit-style: allow coverage-cfg
12// qubit-style: allow multiple-public-types
13
14#[cfg(coverage)]
15mod coverage;
16
17use std::any::{
18    Any,
19    TypeId,
20    type_name,
21};
22use std::cell::RefCell;
23use std::collections::HashMap;
24use std::panic::{
25    self,
26    AssertUnwindSafe,
27};
28use std::sync::Arc;
29use std::thread;
30use std::time::{
31    Duration,
32    Instant,
33};
34
35use qubit_executor::{
36    ExecutorService,
37    SingleThreadScheduledExecutorService,
38};
39use qubit_thread_pool::FixedThreadPool;
40
41use crate::core::SubscriptionState;
42use crate::core::subscribe_options::{
43    DeadLetterStrategyAnyFn,
44    DeadLetterStrategyFn,
45    normalize_dead_letter_error,
46};
47use crate::{
48    AckMode,
49    Acknowledgement,
50    BatchPublishFailure,
51    BatchPublishResult,
52    DeadLetterOriginalPayload,
53    DeadLetterPayload,
54    EventBusError,
55    EventBusResult,
56    EventEnvelope,
57    EventEnvelopeMetadata,
58    IntoEventBusResult,
59    PublishOptions,
60    SubscribeOptions,
61    Subscription,
62    Topic,
63};
64
65use super::erased_subscription::ErasedSubscription;
66use super::local_event_bus_inner::{
67    LocalEventBusInner,
68    LocalEventBusRuntimeOptions,
69};
70use super::ordering_lane_key::OrderingLaneKey;
71use super::processing_task::ProcessingTask;
72use super::publisher_interceptor_entry::PublisherInterceptorEntry;
73use super::subscriber_interceptor_chain::{
74    DownstreamErrorSlot,
75    SubscriberInterceptorAnyChain,
76    SubscriberInterceptorChain,
77    create_downstream_error_slot,
78    is_recorded_downstream_error,
79};
80use super::subscriber_interceptor_entry::SubscriberInterceptorEntry;
81
82#[cfg(coverage)]
83pub use coverage::coverage_exercise_local_event_bus_defensive_paths;
84
85type HandlerFn<T> = dyn Fn(EventEnvelope<T>) -> EventBusResult<()> + Send + Sync + 'static;
86type PublisherInterceptorFn<T> = dyn PublisherInterceptor<T>;
87type SubscriberInterceptorFn<T> = dyn SubscriberInterceptor<T>;
88
89thread_local! {
90    static SUBSCRIPTION_WORKER_BUS_IDS: RefCell<Vec<usize>> = const { RefCell::new(Vec::new()) };
91}
92
93/// Event delivery state for one handler attempt.
94#[derive(Clone)]
95struct HandlerDelivery<T: Clone + Send + Sync + 'static> {
96    delivered: EventEnvelope<T>,
97    acknowledgement: Acknowledgement,
98}
99
100impl<T> HandlerDelivery<T>
101where
102    T: Clone + Send + Sync + 'static,
103{
104    /// Creates a delivered envelope with a fresh acknowledgement.
105    ///
106    /// # Parameters
107    /// - `envelope`: Original event envelope for this attempt.
108    ///
109    /// # Returns
110    /// Delivery state for one handler attempt.
111    fn new(envelope: &EventEnvelope<T>) -> Self {
112        let acknowledgement = Acknowledgement::new();
113        let delivered = envelope
114            .clone()
115            .with_acknowledgement(acknowledgement.clone());
116        Self {
117            delivered,
118            acknowledgement,
119        }
120    }
121}
122
123/// Terminal handler failure paired with the final attempt delivery.
124struct HandlerRunFailure<T: Clone + Send + Sync + 'static> {
125    error: EventBusError,
126    delivery: HandlerDelivery<T>,
127}
128
129/// Admission outcome for one local publish attempt.
130enum PublishOutcome {
131    /// The envelope reached subscriber dispatch.
132    Accepted,
133    /// A publisher interceptor intentionally dropped the envelope.
134    Dropped,
135}
136
137/// Converts publisher interceptor return values into the standard result form.
138///
139/// This trait lets simple interceptors return an updated envelope directly,
140/// return `None` to drop an event, or return an [`EventBusResult`] when the
141/// interceptor can fail.
142pub trait IntoPublisherInterceptorResult<T: Clone + Send + Sync + 'static> {
143    /// Converts the value into a publisher interceptor result.
144    ///
145    /// # Returns
146    /// `Ok(Some(envelope))` to continue publishing, `Ok(None)` to drop the
147    /// event, or an error when the interceptor failed.
148    fn into_publisher_interceptor_result(self) -> EventBusResult<Option<EventEnvelope<T>>>;
149}
150
151impl<T> IntoPublisherInterceptorResult<T> for EventEnvelope<T>
152where
153    T: Clone + Send + Sync + 'static,
154{
155    fn into_publisher_interceptor_result(self) -> EventBusResult<Option<EventEnvelope<T>>> {
156        Ok(Some(self))
157    }
158}
159
160impl<T> IntoPublisherInterceptorResult<T> for Option<EventEnvelope<T>>
161where
162    T: Clone + Send + Sync + 'static,
163{
164    fn into_publisher_interceptor_result(self) -> EventBusResult<Option<EventEnvelope<T>>> {
165        Ok(self)
166    }
167}
168
169impl<T> IntoPublisherInterceptorResult<T> for EventBusResult<EventEnvelope<T>>
170where
171    T: Clone + Send + Sync + 'static,
172{
173    fn into_publisher_interceptor_result(self) -> EventBusResult<Option<EventEnvelope<T>>> {
174        self.map(Some)
175    }
176}
177
178impl<T> IntoPublisherInterceptorResult<T> for EventBusResult<Option<EventEnvelope<T>>>
179where
180    T: Clone + Send + Sync + 'static,
181{
182    fn into_publisher_interceptor_result(self) -> EventBusResult<Option<EventEnvelope<T>>> {
183        self
184    }
185}
186
187/// Intercepts events before they are published by a local event bus.
188///
189/// Implementors can mutate event metadata by returning a new envelope, drop an
190/// event by returning `Ok(None)`, or fail publication by returning an error.
191pub trait PublisherInterceptor<T: Clone + Send + Sync + 'static>: Send + Sync + 'static {
192    /// Intercepts an outgoing event.
193    ///
194    /// # Parameters
195    /// - `envelope`: Event about to be published.
196    ///
197    /// # Returns
198    /// Updated event, dropped event marker, or interceptor failure.
199    fn on_publish(&self, envelope: EventEnvelope<T>) -> EventBusResult<Option<EventEnvelope<T>>>;
200}
201
202impl<T, F, R> PublisherInterceptor<T> for F
203where
204    T: Clone + Send + Sync + 'static,
205    F: Fn(EventEnvelope<T>) -> R + Send + Sync + 'static,
206    R: IntoPublisherInterceptorResult<T> + 'static,
207{
208    fn on_publish(&self, envelope: EventEnvelope<T>) -> EventBusResult<Option<EventEnvelope<T>>> {
209        self(envelope).into_publisher_interceptor_result()
210    }
211}
212
213/// Converts global publisher interceptor return values into the standard form.
214pub trait IntoPublisherInterceptorAnyResult {
215    /// Converts the value into a global publisher interceptor result.
216    ///
217    /// # Returns
218    /// `Ok(Some(metadata))` to continue publishing, `Ok(None)` to drop the
219    /// event, or an error when the interceptor failed.
220    fn into_publisher_interceptor_any_result(self)
221    -> EventBusResult<Option<EventEnvelopeMetadata>>;
222}
223
224impl IntoPublisherInterceptorAnyResult for EventEnvelopeMetadata {
225    fn into_publisher_interceptor_any_result(
226        self,
227    ) -> EventBusResult<Option<EventEnvelopeMetadata>> {
228        Ok(Some(self))
229    }
230}
231
232impl IntoPublisherInterceptorAnyResult for Option<EventEnvelopeMetadata> {
233    fn into_publisher_interceptor_any_result(
234        self,
235    ) -> EventBusResult<Option<EventEnvelopeMetadata>> {
236        Ok(self)
237    }
238}
239
240impl IntoPublisherInterceptorAnyResult for EventBusResult<EventEnvelopeMetadata> {
241    fn into_publisher_interceptor_any_result(
242        self,
243    ) -> EventBusResult<Option<EventEnvelopeMetadata>> {
244        self.map(Some)
245    }
246}
247
248impl IntoPublisherInterceptorAnyResult for EventBusResult<Option<EventEnvelopeMetadata>> {
249    fn into_publisher_interceptor_any_result(
250        self,
251    ) -> EventBusResult<Option<EventEnvelopeMetadata>> {
252        self
253    }
254}
255
256/// Intercepts outgoing event metadata before typed publisher interceptors run.
257///
258/// Global publisher interceptors apply to every payload type. They can mutate
259/// envelope metadata such as headers, ordering keys, and delays, or drop an
260/// event by returning `Ok(None)`.
261pub trait PublisherInterceptorAny: Send + Sync + 'static {
262    /// Intercepts outgoing type-erased metadata.
263    ///
264    /// # Parameters
265    /// - `metadata`: Event metadata cloned from the outgoing envelope.
266    ///
267    /// # Returns
268    /// Updated metadata, dropped event marker, or interceptor failure.
269    fn on_publish(
270        &self,
271        metadata: EventEnvelopeMetadata,
272    ) -> EventBusResult<Option<EventEnvelopeMetadata>>;
273}
274
275impl<F, R> PublisherInterceptorAny for F
276where
277    F: Fn(EventEnvelopeMetadata) -> R + Send + Sync + 'static,
278    R: IntoPublisherInterceptorAnyResult + 'static,
279{
280    fn on_publish(
281        &self,
282        metadata: EventEnvelopeMetadata,
283    ) -> EventBusResult<Option<EventEnvelopeMetadata>> {
284        self(metadata).into_publisher_interceptor_any_result()
285    }
286}
287
288/// Intercepts subscriber processing with around-style control.
289///
290/// Implementors can run code before and after downstream handling by calling
291/// [`SubscriberInterceptorChain::proceed`]. Skipping `proceed` short-circuits
292/// subscriber processing.
293pub trait SubscriberInterceptor<T: Clone + Send + Sync + 'static>: Send + Sync + 'static {
294    /// Intercepts an incoming event before the subscriber handler runs.
295    ///
296    /// # Parameters
297    /// - `envelope`: Event delivered to the subscriber.
298    /// - `chain`: Handle for invoking the next interceptor or final handler.
299    ///
300    /// # Returns
301    /// `Ok(())` when interception and downstream processing succeed.
302    fn on_consume(
303        &self,
304        envelope: EventEnvelope<T>,
305        chain: SubscriberInterceptorChain<T>,
306    ) -> EventBusResult<()>;
307}
308
309impl<T, F, R> SubscriberInterceptor<T> for F
310where
311    T: Clone + Send + Sync + 'static,
312    F: Fn(EventEnvelope<T>, SubscriberInterceptorChain<T>) -> R + Send + Sync + 'static,
313    R: IntoEventBusResult + 'static,
314{
315    fn on_consume(
316        &self,
317        envelope: EventEnvelope<T>,
318        chain: SubscriberInterceptorChain<T>,
319    ) -> EventBusResult<()> {
320        self(envelope, chain).into_event_bus_result()
321    }
322}
323
324/// Intercepts subscriber processing for every payload type.
325///
326/// Global subscriber interceptors receive metadata only, so they are best suited
327/// for logging, metrics, tracing, and short-circuit policies that do not need
328/// access to the typed payload.
329pub trait SubscriberInterceptorAny: Send + Sync + 'static {
330    /// Intercepts incoming event metadata before the typed handler chain runs.
331    ///
332    /// # Parameters
333    /// - `metadata`: Metadata cloned from the delivered envelope.
334    /// - `chain`: Handle for invoking downstream processing.
335    ///
336    /// # Returns
337    /// `Ok(())` when interception and downstream processing succeed.
338    fn on_consume(
339        &self,
340        metadata: EventEnvelopeMetadata,
341        chain: SubscriberInterceptorAnyChain,
342    ) -> EventBusResult<()>;
343}
344
345impl<F, R> SubscriberInterceptorAny for F
346where
347    F: Fn(EventEnvelopeMetadata, SubscriberInterceptorAnyChain) -> R + Send + Sync + 'static,
348    R: IntoEventBusResult + 'static,
349{
350    fn on_consume(
351        &self,
352        metadata: EventEnvelopeMetadata,
353        chain: SubscriberInterceptorAnyChain,
354    ) -> EventBusResult<()> {
355        self(metadata, chain).into_event_bus_result()
356    }
357}
358
359/// Thread-safe in-process event bus.
360///
361/// This backend stores subscriptions in memory and dispatches subscriber
362/// handlers on background threads. Publishing schedules work and returns after
363/// dispatch, while [`wait_for_idle`](Self::wait_for_idle) can be used by tests
364/// to wait for all handler work for a topic.
365#[derive(Clone)]
366pub struct LocalEventBus {
367    pub(crate) inner: Arc<LocalEventBusInner>,
368}
369
370impl LocalEventBus {
371    /// Creates a stopped local event bus.
372    ///
373    /// # Returns
374    /// A new event bus with no subscriptions.
375    pub fn new() -> Self {
376        Self::with_runtime_options(LocalEventBusRuntimeOptions {
377            default_publish_options: HashMap::new(),
378            default_subscribe_options: HashMap::new(),
379            default_dead_letter_strategies: HashMap::new(),
380            global_default_dead_letter_strategy: None,
381            global_publisher_interceptors: Vec::new(),
382            global_subscriber_interceptors: Vec::new(),
383            publisher_interceptors: Vec::new(),
384            subscriber_interceptors: Vec::new(),
385            subscription_handler_pool_size: default_subscription_handler_pool_size(),
386            subscription_handler_queue_capacity: None,
387        })
388    }
389
390    /// Creates and starts a local event bus.
391    ///
392    /// # Returns
393    /// A started event bus.
394    ///
395    /// # Errors
396    /// Returns startup errors from the handler executor.
397    pub fn started() -> EventBusResult<Self> {
398        let bus = Self::new();
399        bus.start()?;
400        Ok(bus)
401    }
402
403    /// Creates a stopped event bus with typed defaults and runtime options.
404    ///
405    /// # Returns
406    /// A stopped event bus.
407    pub(crate) fn with_runtime_options(options: LocalEventBusRuntimeOptions) -> Self {
408        Self {
409            inner: Arc::new(LocalEventBusInner::new(options)),
410        }
411    }
412
413    /// Starts the event bus.
414    ///
415    /// # Returns
416    /// `Ok(true)` when this call changed the bus from stopped to started.
417    ///
418    /// # Errors
419    /// Returns startup errors from the handler executor.
420    pub fn start(&self) -> EventBusResult<bool> {
421        self.inner.mark_started()
422    }
423
424    /// Shuts down the event bus.
425    ///
426    /// The method waits for currently scheduled handlers to finish and then
427    /// clears all subscriptions.
428    ///
429    /// # Returns
430    /// `true` when this call changed the bus from started to stopped.
431    ///
432    /// # Panics
433    /// Panics when called from one of this bus's subscriber worker threads. A
434    /// subscriber worker cannot wait for itself to finish. Use
435    /// [`shutdown_nonblocking`](Self::shutdown_nonblocking) or
436    /// [`shutdown_with_timeout`](Self::shutdown_with_timeout) from subscriber
437    /// handlers.
438    pub fn shutdown(&self) -> bool {
439        self.assert_not_own_subscription_worker_for_blocking_shutdown();
440        if !self.inner.mark_stopping() {
441            return false;
442        }
443        let _ = self.inner.wait_for_all_idle();
444        if let Some(executor) = self.inner.take_executor() {
445            executor.shutdown();
446            wait_for_executor_termination(&executor);
447        }
448        if let Some(delay_scheduler) = self.inner.take_delay_scheduler() {
449            delay_scheduler.shutdown();
450            wait_for_delay_scheduler_termination(&delay_scheduler);
451        }
452        self.inner.clear_subscriptions();
453        true
454    }
455
456    /// Requests shutdown without waiting for subscriber work to finish.
457    ///
458    /// The bus stops accepting publish and subscribe operations, asks the handler
459    /// executor to shut down, deactivates subscriptions, and returns immediately.
460    /// Already running handler code is not interrupted.
461    ///
462    /// # Returns
463    /// `true` when this call changed the bus from started to stopped.
464    pub fn shutdown_nonblocking(&self) -> bool {
465        let Some(executor) = self.inner.mark_stopped() else {
466            return false;
467        };
468        executor.shutdown();
469        if let Some(delay_scheduler) = self.inner.take_delay_scheduler() {
470            delay_scheduler.shutdown();
471        }
472        self.inner.clear_subscriptions();
473        true
474    }
475
476    /// Shuts down the event bus with a maximum wait duration.
477    ///
478    /// The bus stops accepting new publish and subscribe operations immediately,
479    /// then waits for scheduled subscriber work and executor workers to finish.
480    /// If the timeout elapses, subscriptions are deactivated before the timeout
481    /// error is returned.
482    ///
483    /// # Parameters
484    /// - `timeout`: Maximum duration to wait for graceful shutdown.
485    ///
486    /// # Returns
487    /// `Ok(true)` when this call changed the bus from started to stopped and
488    /// shutdown completed within the timeout. `Ok(false)` means the bus was
489    /// already stopped.
490    ///
491    /// # Errors
492    /// Returns [`EventBusError::ShutdownTimedOut`] if subscriber work or executor
493    /// workers do not finish before `timeout`.
494    pub fn shutdown_with_timeout(&self, timeout: Duration) -> EventBusResult<bool> {
495        let started_at = Instant::now();
496        if !self.inner.mark_stopping() {
497            return Ok(false);
498        }
499        let Some(remaining) = remaining_shutdown_timeout(started_at, timeout) else {
500            self.inner.clear_subscriptions();
501            if let Some(executor) = self.inner.take_executor() {
502                executor.shutdown();
503            }
504            if let Some(delay_scheduler) = self.inner.take_delay_scheduler() {
505                delay_scheduler.shutdown();
506            }
507            return Err(EventBusError::shutdown_timed_out(timeout));
508        };
509        if !self.inner.wait_for_all_idle_timeout(remaining)? {
510            self.inner.clear_subscriptions();
511            if let Some(executor) = self.inner.take_executor() {
512                executor.shutdown();
513            }
514            if let Some(delay_scheduler) = self.inner.take_delay_scheduler() {
515                delay_scheduler.shutdown();
516            }
517            return Err(EventBusError::shutdown_timed_out(timeout));
518        }
519        let Some(remaining) = remaining_shutdown_timeout(started_at, timeout) else {
520            self.inner.clear_subscriptions();
521            if let Some(executor) = self.inner.take_executor() {
522                executor.shutdown();
523            }
524            if let Some(delay_scheduler) = self.inner.take_delay_scheduler() {
525                delay_scheduler.shutdown();
526            }
527            return Err(EventBusError::shutdown_timed_out(timeout));
528        };
529        let Some(executor) = self.inner.take_executor() else {
530            if let Some(delay_scheduler) = self.inner.take_delay_scheduler() {
531                delay_scheduler.shutdown();
532            }
533            self.inner.clear_subscriptions();
534            return Ok(true);
535        };
536        executor.shutdown();
537        if let Some(delay_scheduler) = self.inner.take_delay_scheduler() {
538            delay_scheduler.shutdown();
539            if !wait_for_delay_scheduler_termination_timeout(&delay_scheduler, remaining) {
540                self.inner.clear_subscriptions();
541                return Err(EventBusError::shutdown_timed_out(timeout));
542            }
543        }
544        let Some(remaining) = remaining_shutdown_timeout(started_at, timeout) else {
545            self.inner.clear_subscriptions();
546            return Err(EventBusError::shutdown_timed_out(timeout));
547        };
548        if !wait_for_executor_termination_timeout(&executor, remaining) {
549            self.inner.clear_subscriptions();
550            return Err(EventBusError::shutdown_timed_out(timeout));
551        }
552        self.inner.clear_subscriptions();
553        Ok(true)
554    }
555
556    /// Registers an observer for internal background errors.
557    ///
558    /// # Parameters
559    /// - `observer`: Callback invoked when interceptors, error handlers, or
560    ///   dead-letter routing fail.
561    ///
562    /// # Returns
563    /// `Ok(())` when the observer is stored.
564    ///
565    /// # Errors
566    /// Returns a lock-poisoning error if observer state is unavailable.
567    pub fn add_error_observer<F>(&self, observer: F) -> EventBusResult<()>
568    where
569        F: Fn(&EventBusError) + Send + Sync + 'static,
570    {
571        self.inner.add_error_observer(Arc::new(observer))
572    }
573
574    /// Publishes a payload to a topic.
575    ///
576    /// # Parameters
577    /// - `topic`: Target topic.
578    /// - `payload`: Event payload.
579    ///
580    /// # Returns
581    /// `Ok(())` after subscriber work has been scheduled.
582    ///
583    /// Local dispatch is non-transactional across matching subscribers. If
584    /// scheduling fails for a later subscriber, earlier subscriber work may
585    /// already have been accepted.
586    ///
587    /// # Errors
588    /// Returns [`EventBusError::NotStarted`] if the bus is stopped.
589    pub fn publish<T>(&self, topic: &Topic<T>, payload: T) -> EventBusResult<()>
590    where
591        T: Clone + Send + Sync + 'static,
592    {
593        self.publish_with_options(topic, payload, PublishOptions::empty())
594    }
595
596    /// Publishes a payload to a topic with explicit options.
597    ///
598    /// # Parameters
599    /// - `topic`: Target topic.
600    /// - `payload`: Event payload.
601    /// - `options`: Publish options merged with factory defaults.
602    ///
603    /// # Returns
604    /// `Ok(())` after subscriber work has been scheduled.
605    ///
606    /// # Errors
607    /// Returns [`EventBusError::NotStarted`] if the bus is stopped.
608    pub fn publish_with_options<T>(
609        &self,
610        topic: &Topic<T>,
611        payload: T,
612        options: PublishOptions<T>,
613    ) -> EventBusResult<()>
614    where
615        T: Clone + Send + Sync + 'static,
616    {
617        self.publish_envelope_with_options(EventEnvelope::create(topic.clone(), payload), options)
618    }
619
620    /// Publishes an existing envelope.
621    ///
622    /// # Parameters
623    /// - `envelope`: Event envelope to dispatch.
624    ///
625    /// # Returns
626    /// `Ok(())` after subscriber work has been scheduled.
627    ///
628    /// # Errors
629    /// Returns [`EventBusError::NotStarted`] if the bus is stopped.
630    pub fn publish_envelope<T>(&self, envelope: EventEnvelope<T>) -> EventBusResult<()>
631    where
632        T: Clone + Send + Sync + 'static,
633    {
634        self.publish_envelope_with_options(envelope, PublishOptions::empty())
635    }
636
637    /// Publishes an existing envelope with options.
638    ///
639    /// # Parameters
640    /// - `envelope`: Event envelope to dispatch.
641    /// - `options`: Publish options.
642    ///
643    /// # Returns
644    /// `Ok(())` after subscriber work has been scheduled.
645    ///
646    /// # Errors
647    /// Returns [`EventBusError::NotStarted`] if the bus is stopped.
648    pub fn publish_envelope_with_options<T>(
649        &self,
650        envelope: EventEnvelope<T>,
651        options: PublishOptions<T>,
652    ) -> EventBusResult<()>
653    where
654        T: Clone + Send + Sync + 'static,
655    {
656        let options = options.merge_defaults(self.default_publish_options::<T>());
657        self.publish_envelope_with_options_internal(envelope, options, false, true)
658            .map(|_outcome| ())
659    }
660
661    /// Publishes an envelope through the local dispatch path.
662    fn publish_envelope_with_options_internal<T>(
663        &self,
664        envelope: EventEnvelope<T>,
665        options: PublishOptions<T>,
666        allow_stopping: bool,
667        require_started: bool,
668    ) -> EventBusResult<PublishOutcome>
669    where
670        T: Clone + Send + Sync + 'static,
671    {
672        if let Err(error) = self.ensure_started()
673            && require_started
674        {
675            self.observe_errors(options.notify_publish_error(&envelope, &error));
676            return Err(error);
677        }
678        if let Err(error) = validate_retry_options(options.retry_options()) {
679            self.observe_errors(options.notify_publish_error(&envelope, &error));
680            return Err(error);
681        }
682        let original_envelope = envelope.clone();
683        let envelope = match run_with_retry(options.retry_options(), || {
684            self.apply_publisher_interceptors(original_envelope.clone())
685        }) {
686            Ok(Some(envelope)) => envelope,
687            Ok(None) => return Ok(PublishOutcome::Dropped),
688            Err(error) => {
689                self.inner.observe_error(&error);
690                self.observe_errors(options.notify_publish_error(&original_envelope, &error));
691                return Err(error);
692            }
693        };
694        if let Err(error) =
695            self.dispatch_envelope(envelope.clone(), options.retry_options(), allow_stopping)
696        {
697            self.observe_errors(options.notify_publish_error(&envelope, &error));
698            return Err(error);
699        }
700        Ok(PublishOutcome::Accepted)
701    }
702
703    /// Publishes a dead-letter envelope while graceful shutdown is draining.
704    fn publish_dead_letter_envelope(
705        &self,
706        envelope: EventEnvelope<DeadLetterPayload>,
707    ) -> EventBusResult<()> {
708        let options = PublishOptions::empty()
709            .merge_defaults(self.default_publish_options::<DeadLetterPayload>());
710        self.publish_envelope_with_options_internal(envelope, options, true, false)
711            .map(|_outcome| ())
712    }
713
714    /// Publishes multiple envelopes by submitting each envelope in input order.
715    ///
716    /// This method preserves submission order only. Handler execution order is
717    /// backend-specific because local handlers can run on multiple worker
718    /// threads.
719    ///
720    /// # Parameters
721    /// - `envelopes`: Envelopes to submit in order.
722    ///
723    /// # Returns
724    /// Summary containing per-envelope successes and failures.
725    ///
726    /// # Errors
727    /// Returns lifecycle or option validation errors before the batch starts.
728    pub fn publish_all<T>(
729        &self,
730        envelopes: Vec<EventEnvelope<T>>,
731    ) -> EventBusResult<BatchPublishResult>
732    where
733        T: Clone + Send + Sync + 'static,
734    {
735        self.publish_all_with_options(envelopes, PublishOptions::empty())
736    }
737
738    /// Publishes multiple envelopes with explicit publish options.
739    ///
740    /// # Parameters
741    /// - `envelopes`: Envelopes to submit in order.
742    /// - `options`: Publish options cloned for each envelope.
743    ///
744    /// # Returns
745    /// Summary containing per-envelope successes and failures.
746    ///
747    /// # Errors
748    /// Returns lifecycle or option validation errors before the batch starts.
749    pub fn publish_all_with_options<T>(
750        &self,
751        envelopes: Vec<EventEnvelope<T>>,
752        options: PublishOptions<T>,
753    ) -> EventBusResult<BatchPublishResult>
754    where
755        T: Clone + Send + Sync + 'static,
756    {
757        self.ensure_started()?;
758        let options = options.merge_defaults(self.default_publish_options::<T>());
759        validate_retry_options(options.retry_options())?;
760        let mut result = BatchPublishResult::new(envelopes.len());
761        for (index, envelope) in envelopes.into_iter().enumerate() {
762            let event_id = envelope.id().to_string();
763            match self.publish_envelope_with_options_internal(
764                envelope,
765                options.clone(),
766                false,
767                true,
768            ) {
769                Ok(PublishOutcome::Accepted) => result.record_accepted(),
770                Ok(PublishOutcome::Dropped) => result.record_dropped(),
771                Err(error) => {
772                    result.record_failure(BatchPublishFailure::new(index, event_id, error));
773                }
774            }
775        }
776        Ok(result)
777    }
778
779    /// Subscribes a handler using default options.
780    ///
781    /// # Parameters
782    /// - `subscriber_id`: Subscriber identifier.
783    /// - `topic`: Topic to subscribe.
784    /// - `handler`: Handler invoked for matching events.
785    ///
786    /// # Returns
787    /// Subscription handle.
788    ///
789    /// # Errors
790    /// Returns an error when the bus is stopped or shared state is unavailable.
791    pub fn subscribe<T, S, F, R>(
792        &self,
793        subscriber_id: S,
794        topic: &Topic<T>,
795        handler: F,
796    ) -> EventBusResult<Subscription<T>>
797    where
798        T: Clone + Send + Sync + 'static,
799        S: Into<String>,
800        F: Fn(EventEnvelope<T>) -> R + Send + Sync + 'static,
801        R: IntoEventBusResult + 'static,
802    {
803        self.subscribe_with_options(subscriber_id, topic, handler, SubscribeOptions::empty())
804    }
805
806    /// Subscribes a handler using explicit options.
807    ///
808    /// # Parameters
809    /// - `subscriber_id`: Subscriber identifier.
810    /// - `topic`: Topic to subscribe.
811    /// - `handler`: Handler invoked for matching events.
812    /// - `options`: Subscription processing options.
813    ///
814    /// # Returns
815    /// Subscription handle.
816    ///
817    /// # Errors
818    /// Returns an error when the bus is stopped, the subscriber ID is blank, or
819    /// shared state is unavailable.
820    pub fn subscribe_with_options<T, S, F, R>(
821        &self,
822        subscriber_id: S,
823        topic: &Topic<T>,
824        handler: F,
825        options: SubscribeOptions<T>,
826    ) -> EventBusResult<Subscription<T>>
827    where
828        T: Clone + Send + Sync + 'static,
829        S: Into<String>,
830        F: Fn(EventEnvelope<T>) -> R + Send + Sync + 'static,
831        R: IntoEventBusResult + 'static,
832    {
833        self.ensure_started()?;
834        let options = options.merge_defaults(self.default_subscribe_options::<T>());
835        let subscriber_id = subscriber_id.into();
836        if subscriber_id.trim().is_empty() {
837            return Err(EventBusError::invalid_argument(
838                "subscriber_id",
839                "subscriber ID must not be blank",
840            ));
841        }
842        validate_retry_options(options.retry_options())?;
843
844        let id = self.inner.next_subscription_id();
845        let active = Arc::new(SubscriptionState::active());
846        let topic_key = topic.key();
847        let handler = Arc::new(move |event| handler(event).into_event_bus_result());
848        let handler = self.apply_subscriber_interceptors(handler)?;
849        let entry = TypedSubscriptionEntry {
850            id,
851            subscriber_id: subscriber_id.clone(),
852            topic: topic.clone(),
853            active: Arc::clone(&active),
854            handler,
855            options: options.clone(),
856        };
857        self.inner
858            .add_subscription(topic_key.clone(), Arc::new(entry))?;
859
860        Ok(Subscription {
861            id,
862            subscriber_id,
863            topic: topic.clone(),
864            topic_key,
865            options,
866            active,
867            bus: Arc::downgrade(&self.inner),
868        })
869    }
870
871    /// Subscribes a handler to a dead-letter topic.
872    ///
873    /// Dead-letter payloads are type-erased, so callers can inspect the
874    /// original topic, error metadata, and original payload through
875    /// [`DeadLetterPayload`].
876    ///
877    /// # Parameters
878    /// - `dead_letter_topic`: Topic carrying dead-letter records.
879    /// - `handler`: Handler invoked for dead-letter events.
880    /// - `options`: Subscription options merged with factory defaults.
881    ///
882    /// # Returns
883    /// Subscription handle for the dead-letter topic.
884    ///
885    /// # Errors
886    /// Returns an error when the bus is stopped, the generated subscriber ID is
887    /// invalid, or shared state is unavailable.
888    pub fn add_dead_letter_handler<F, R>(
889        &self,
890        dead_letter_topic: &Topic<DeadLetterPayload>,
891        handler: F,
892        options: SubscribeOptions<DeadLetterPayload>,
893    ) -> EventBusResult<Subscription<DeadLetterPayload>>
894    where
895        F: Fn(EventEnvelope<DeadLetterPayload>) -> R + Send + Sync + 'static,
896        R: IntoEventBusResult + 'static,
897    {
898        self.subscribe_with_options(
899            format!("dead-letter:{}", dead_letter_topic.name()),
900            dead_letter_topic,
901            handler,
902            options,
903        )
904    }
905
906    /// Waits until all work for a topic is idle.
907    ///
908    /// # Parameters
909    /// - `topic`: Topic to wait for.
910    ///
911    /// # Returns
912    /// `Ok(())` once the topic has no active handler work.
913    ///
914    /// # Errors
915    /// Returns a lock-poisoning error if tracker state is unavailable.
916    pub fn wait_for_idle<T>(&self, topic: &Topic<T>) -> EventBusResult<()>
917    where
918        T: 'static,
919    {
920        self.inner.wait_for_idle(&topic.key())
921    }
922
923    /// Waits until all work for a topic is idle or the timeout elapses.
924    ///
925    /// # Parameters
926    /// - `topic`: Topic to wait for.
927    /// - `timeout`: Maximum duration to wait.
928    ///
929    /// # Returns
930    /// `Ok(true)` once the topic has no active handler work, or `Ok(false)` when
931    /// the timeout elapses first.
932    ///
933    /// # Errors
934    /// Returns a lock-poisoning error if tracker state is unavailable.
935    pub fn wait_for_idle_timeout<T>(
936        &self,
937        topic: &Topic<T>,
938        timeout: Duration,
939    ) -> EventBusResult<bool>
940    where
941        T: 'static,
942    {
943        self.inner.wait_for_idle_timeout(&topic.key(), timeout)
944    }
945
946    /// Returns default publish options for a payload type.
947    ///
948    /// # Returns
949    /// Type-specific default options or empty options.
950    fn default_publish_options<T>(&self) -> PublishOptions<T>
951    where
952        T: 'static,
953    {
954        self.inner
955            .default_publish_options::<T>()
956            .unwrap_or_else(PublishOptions::empty)
957    }
958
959    /// Returns default subscribe options for a payload type.
960    ///
961    /// # Returns
962    /// Type-specific default options or empty options.
963    fn default_subscribe_options<T>(&self) -> SubscribeOptions<T>
964    where
965        T: 'static,
966    {
967        self.inner
968            .default_subscribe_options::<T>()
969            .unwrap_or_else(SubscribeOptions::empty)
970    }
971
972    /// Ensures the event bus is started.
973    ///
974    /// # Returns
975    /// `Ok(())` if started.
976    ///
977    /// # Errors
978    /// Returns [`EventBusError::NotStarted`] when the bus is stopped.
979    fn ensure_started(&self) -> EventBusResult<()> {
980        if self.inner.is_started() {
981            Ok(())
982        } else {
983            Err(EventBusError::not_started())
984        }
985    }
986
987    /// Observes internal failures produced by user callbacks.
988    ///
989    /// # Parameters
990    /// - `errors`: Callback failures to publish to registered error observers.
991    fn observe_errors(&self, errors: Vec<EventBusError>) {
992        for error in errors {
993            self.inner.observe_error(&error);
994        }
995    }
996
997    /// Panics if blocking shutdown is called from this bus's subscriber worker.
998    fn assert_not_own_subscription_worker_for_blocking_shutdown(&self) {
999        let bus_id = local_event_bus_id(&self.inner);
1000        if is_current_subscription_worker_for_bus(bus_id) {
1001            panic!(
1002                "LocalEventBus::shutdown must not be called from this bus's subscriber worker; use shutdown_nonblocking or shutdown_with_timeout"
1003            );
1004        }
1005    }
1006
1007    /// Applies matching publisher interceptors.
1008    ///
1009    /// # Parameters
1010    /// - `envelope`: Original event envelope.
1011    ///
1012    /// # Returns
1013    /// Modified envelope, or `None` when an interceptor drops the event.
1014    ///
1015    /// # Errors
1016    /// Returns lock or type-erasure errors.
1017    fn apply_publisher_interceptors<T>(
1018        &self,
1019        envelope: EventEnvelope<T>,
1020    ) -> EventBusResult<Option<EventEnvelope<T>>>
1021    where
1022        T: Clone + Send + Sync + 'static,
1023    {
1024        let mut envelope = envelope;
1025        for interceptor in self.inner.global_publisher_interceptors()? {
1026            let metadata = envelope.metadata();
1027            let metadata =
1028                match panic::catch_unwind(AssertUnwindSafe(|| interceptor.on_publish(metadata))) {
1029                    Ok(Ok(Some(metadata))) => metadata,
1030                    Ok(Ok(None)) => return Ok(None),
1031                    Ok(Err(error)) => {
1032                        return Err(EventBusError::interceptor_failed(
1033                            "publish",
1034                            error.to_string(),
1035                        ));
1036                    }
1037                    Err(_) => {
1038                        return Err(EventBusError::interceptor_failed(
1039                            "publish",
1040                            "global publisher interceptor panicked",
1041                        ));
1042                    }
1043                };
1044            envelope.apply_metadata(metadata);
1045        }
1046        let interceptors = self.inner.publisher_interceptors()?;
1047        let mut current: Option<Box<dyn Any + Send>> = Some(Box::new(envelope));
1048        for interceptor in interceptors {
1049            if interceptor.payload_type_id() == TypeId::of::<T>()
1050                && let Some(boxed) = current.take()
1051            {
1052                current = interceptor.intercept(boxed)?;
1053            }
1054        }
1055        current
1056            .map(|boxed| {
1057                boxed
1058                    .downcast::<EventEnvelope<T>>()
1059                    .map(|envelope| *envelope)
1060                    .map_err(|_| {
1061                        EventBusError::type_mismatch(type_name::<EventEnvelope<T>>(), "unknown")
1062                    })
1063            })
1064            .transpose()
1065    }
1066
1067    /// Dispatches an envelope to currently registered subscribers.
1068    ///
1069    /// # Parameters
1070    /// - `envelope`: Envelope to dispatch.
1071    ///
1072    /// # Returns
1073    /// `Ok(())` once matching subscriber tasks have been accepted.
1074    ///
1075    /// This dispatch loop is best-effort across subscriptions: a later
1076    /// submission error does not roll back subscriber tasks accepted earlier in
1077    /// the same publish call.
1078    ///
1079    /// # Errors
1080    /// Returns subscription lookup, type-erasure, or executor submission errors.
1081    fn dispatch_envelope<T>(
1082        &self,
1083        envelope: EventEnvelope<T>,
1084        retry_options: Option<&crate::RetryOptions>,
1085        allow_stopping: bool,
1086    ) -> EventBusResult<()>
1087    where
1088        T: Clone + Send + Sync + 'static,
1089    {
1090        if !allow_stopping {
1091            self.ensure_started()?;
1092        }
1093        let subscriptions = self.inner.subscriptions_for(&envelope.topic().key())?;
1094        let mut first_error = None;
1095        for subscription in subscriptions {
1096            let subscription = Arc::clone(&subscription);
1097            if let Err(error) = run_with_retry(retry_options, || {
1098                subscription.dispatch(
1099                    Box::new(envelope.clone()),
1100                    Arc::clone(&self.inner),
1101                    allow_stopping,
1102                )
1103            }) && first_error.is_none()
1104            {
1105                first_error = Some(error);
1106            }
1107        }
1108        first_error.map_or(Ok(()), Err)
1109    }
1110
1111    /// Applies matching subscriber interceptors to a handler.
1112    ///
1113    /// # Parameters
1114    /// - `handler`: Original subscriber handler.
1115    ///
1116    /// # Returns
1117    /// Handler wrapped by registered subscriber interceptors.
1118    ///
1119    /// # Errors
1120    /// Returns lock or type-erasure errors.
1121    fn apply_subscriber_interceptors<T>(
1122        &self,
1123        handler: Arc<HandlerFn<T>>,
1124    ) -> EventBusResult<Arc<HandlerFn<T>>>
1125    where
1126        T: Clone + Send + Sync + 'static,
1127    {
1128        let interceptors = self.inner.subscriber_interceptors()?;
1129        let mut chain = Box::new(handler) as Box<dyn Any + Send + Sync>;
1130        for interceptor in interceptors.into_iter().rev() {
1131            if interceptor.payload_type_id() == TypeId::of::<T>() {
1132                chain = interceptor.wrap_handler(chain)?;
1133            }
1134        }
1135        let handler = chain
1136            .downcast::<Arc<HandlerFn<T>>>()
1137            .map(|handler| *handler)
1138            .map_err(|_| {
1139                EventBusError::type_mismatch(type_name::<Arc<HandlerFn<T>>>(), "unknown")
1140            })?;
1141        self.apply_global_subscriber_interceptors(handler)
1142    }
1143
1144    /// Applies global subscriber interceptors around a typed handler chain.
1145    ///
1146    /// # Parameters
1147    /// - `handler`: Typed handler chain after typed interceptors are applied.
1148    ///
1149    /// # Returns
1150    /// Handler wrapped by global subscriber interceptors.
1151    fn apply_global_subscriber_interceptors<T>(
1152        &self,
1153        handler: Arc<HandlerFn<T>>,
1154    ) -> EventBusResult<Arc<HandlerFn<T>>>
1155    where
1156        T: Clone + Send + Sync + 'static,
1157    {
1158        let mut chain = handler;
1159        for interceptor in self
1160            .inner
1161            .global_subscriber_interceptors()?
1162            .into_iter()
1163            .rev()
1164        {
1165            let next = Arc::clone(&chain);
1166            chain = Arc::new(move |event: EventEnvelope<T>| {
1167                let metadata = event.metadata();
1168                let next = Arc::clone(&next);
1169                let event_for_next = event.clone();
1170                let downstream_error = create_downstream_error_slot();
1171                let chain = SubscriberInterceptorAnyChain::with_downstream_error(
1172                    Arc::new(move || next(event_for_next.clone())),
1173                    Arc::clone(&downstream_error),
1174                );
1175                let result = panic::catch_unwind(AssertUnwindSafe(|| {
1176                    interceptor.on_consume(metadata, chain)
1177                }));
1178                normalize_subscriber_interceptor_result(
1179                    result,
1180                    &downstream_error,
1181                    "global subscriber interceptor panicked",
1182                )
1183            });
1184        }
1185        Ok(chain)
1186    }
1187}
1188
1189impl Default for LocalEventBus {
1190    /// Creates a stopped local event bus.
1191    fn default() -> Self {
1192        Self::new()
1193    }
1194}
1195
1196impl crate::EventBus for LocalEventBus {
1197    /// Starts the local event bus.
1198    fn start(&self) -> EventBusResult<bool> {
1199        Self::start(self)
1200    }
1201
1202    /// Shuts down the local event bus.
1203    fn shutdown(&self) -> bool {
1204        Self::shutdown(self)
1205    }
1206
1207    /// Publishes a payload using the local backend.
1208    fn publish<T>(&self, topic: &Topic<T>, payload: T) -> EventBusResult<()>
1209    where
1210        T: Clone + Send + Sync + 'static,
1211    {
1212        Self::publish(self, topic, payload)
1213    }
1214
1215    /// Publishes a payload with options using the local backend.
1216    fn publish_with_options<T>(
1217        &self,
1218        topic: &Topic<T>,
1219        payload: T,
1220        options: PublishOptions<T>,
1221    ) -> EventBusResult<()>
1222    where
1223        T: Clone + Send + Sync + 'static,
1224    {
1225        Self::publish_with_options(self, topic, payload, options)
1226    }
1227
1228    /// Publishes an envelope using the local backend.
1229    fn publish_envelope<T>(&self, envelope: EventEnvelope<T>) -> EventBusResult<()>
1230    where
1231        T: Clone + Send + Sync + 'static,
1232    {
1233        Self::publish_envelope(self, envelope)
1234    }
1235
1236    /// Publishes an envelope with options using the local backend.
1237    fn publish_envelope_with_options<T>(
1238        &self,
1239        envelope: EventEnvelope<T>,
1240        options: PublishOptions<T>,
1241    ) -> EventBusResult<()>
1242    where
1243        T: Clone + Send + Sync + 'static,
1244    {
1245        Self::publish_envelope_with_options(self, envelope, options)
1246    }
1247
1248    /// Publishes a batch using the local backend.
1249    fn publish_all<T>(&self, envelopes: Vec<EventEnvelope<T>>) -> EventBusResult<BatchPublishResult>
1250    where
1251        T: Clone + Send + Sync + 'static,
1252    {
1253        Self::publish_all(self, envelopes)
1254    }
1255
1256    /// Publishes a batch with options using the local backend.
1257    fn publish_all_with_options<T>(
1258        &self,
1259        envelopes: Vec<EventEnvelope<T>>,
1260        options: PublishOptions<T>,
1261    ) -> EventBusResult<BatchPublishResult>
1262    where
1263        T: Clone + Send + Sync + 'static,
1264    {
1265        Self::publish_all_with_options(self, envelopes, options)
1266    }
1267
1268    /// Subscribes a handler using local backend defaults.
1269    fn subscribe<T, S, F, R>(
1270        &self,
1271        subscriber_id: S,
1272        topic: &Topic<T>,
1273        handler: F,
1274    ) -> EventBusResult<Subscription<T>>
1275    where
1276        T: Clone + Send + Sync + 'static,
1277        S: Into<String>,
1278        F: Fn(EventEnvelope<T>) -> R + Send + Sync + 'static,
1279        R: IntoEventBusResult + 'static,
1280    {
1281        Self::subscribe(self, subscriber_id, topic, handler)
1282    }
1283
1284    /// Subscribes a handler with options using the local backend.
1285    fn subscribe_with_options<T, S, F, R>(
1286        &self,
1287        subscriber_id: S,
1288        topic: &Topic<T>,
1289        handler: F,
1290        options: SubscribeOptions<T>,
1291    ) -> EventBusResult<Subscription<T>>
1292    where
1293        T: Clone + Send + Sync + 'static,
1294        S: Into<String>,
1295        F: Fn(EventEnvelope<T>) -> R + Send + Sync + 'static,
1296        R: IntoEventBusResult + 'static,
1297    {
1298        Self::subscribe_with_options(self, subscriber_id, topic, handler, options)
1299    }
1300
1301    /// Waits until local topic work is idle.
1302    fn wait_for_idle<T>(&self, topic: &Topic<T>) -> EventBusResult<()>
1303    where
1304        T: 'static,
1305    {
1306        Self::wait_for_idle(self, topic)
1307    }
1308
1309    /// Waits until local topic work is idle or the timeout elapses.
1310    fn wait_for_idle_timeout<T>(&self, topic: &Topic<T>, timeout: Duration) -> EventBusResult<bool>
1311    where
1312        T: 'static,
1313    {
1314        Self::wait_for_idle_timeout(self, topic, timeout)
1315    }
1316}
1317
1318/// Typed publisher interceptor adapter.
1319struct TypedPublisherInterceptor<T: Clone + Send + Sync + 'static> {
1320    interceptor: Arc<PublisherInterceptorFn<T>>,
1321}
1322
1323/// Creates a type-erased publisher interceptor entry.
1324///
1325/// # Parameters
1326/// - `interceptor`: Typed publisher interceptor callback.
1327///
1328/// # Returns
1329/// Type-erased entry suitable for local bus storage.
1330pub(super) fn create_publisher_interceptor_entry<T, I>(
1331    interceptor: I,
1332) -> Arc<dyn PublisherInterceptorEntry>
1333where
1334    T: Clone + Send + Sync + 'static,
1335    I: PublisherInterceptor<T>,
1336{
1337    Arc::new(TypedPublisherInterceptor::<T> {
1338        interceptor: Arc::new(interceptor),
1339    })
1340}
1341
1342impl<T> PublisherInterceptorEntry for TypedPublisherInterceptor<T>
1343where
1344    T: Clone + Send + Sync + 'static,
1345{
1346    /// Returns the payload [`TypeId`] handled by this interceptor.
1347    fn payload_type_id(&self) -> TypeId {
1348        TypeId::of::<T>()
1349    }
1350
1351    /// Downcasts and applies the typed interceptor.
1352    fn intercept(
1353        &self,
1354        envelope: Box<dyn Any + Send>,
1355    ) -> EventBusResult<Option<Box<dyn Any + Send>>> {
1356        let envelope = envelope.downcast::<EventEnvelope<T>>().map_err(|_| {
1357            EventBusError::type_mismatch(type_name::<EventEnvelope<T>>(), "unknown")
1358        })?;
1359        match panic::catch_unwind(AssertUnwindSafe(|| self.interceptor.on_publish(*envelope))) {
1360            Ok(Ok(envelope)) => {
1361                Ok(envelope.map(|envelope| Box::new(envelope) as Box<dyn Any + Send>))
1362            }
1363            Ok(Err(error)) => Err(EventBusError::interceptor_failed(
1364                "publish",
1365                error.to_string(),
1366            )),
1367            Err(_) => Err(EventBusError::interceptor_failed(
1368                "publish",
1369                "publisher interceptor panicked",
1370            )),
1371        }
1372    }
1373}
1374
1375/// Typed subscriber interceptor adapter.
1376struct TypedSubscriberInterceptor<T: Clone + Send + Sync + 'static> {
1377    interceptor: Arc<SubscriberInterceptorFn<T>>,
1378}
1379
1380/// Creates a type-erased subscriber interceptor entry.
1381///
1382/// # Parameters
1383/// - `interceptor`: Typed subscriber interceptor callback.
1384///
1385/// # Returns
1386/// Type-erased entry suitable for local bus storage.
1387pub(super) fn create_subscriber_interceptor_entry<T, I>(
1388    interceptor: I,
1389) -> Arc<dyn SubscriberInterceptorEntry>
1390where
1391    T: Clone + Send + Sync + 'static,
1392    I: SubscriberInterceptor<T>,
1393{
1394    Arc::new(TypedSubscriberInterceptor::<T> {
1395        interceptor: Arc::new(interceptor),
1396    })
1397}
1398
1399impl<T> SubscriberInterceptorEntry for TypedSubscriberInterceptor<T>
1400where
1401    T: Clone + Send + Sync + 'static,
1402{
1403    /// Returns the payload [`TypeId`] handled by this interceptor.
1404    fn payload_type_id(&self) -> TypeId {
1405        TypeId::of::<T>()
1406    }
1407
1408    /// Downcasts and wraps the typed handler.
1409    fn wrap_handler(
1410        &self,
1411        handler: Box<dyn Any + Send + Sync>,
1412    ) -> EventBusResult<Box<dyn Any + Send + Sync>> {
1413        let next = handler.downcast::<Arc<HandlerFn<T>>>().map_err(|_| {
1414            EventBusError::type_mismatch(type_name::<Arc<HandlerFn<T>>>(), "unknown")
1415        })?;
1416        let next = *next;
1417        let interceptor = Arc::clone(&self.interceptor);
1418        let wrapped: Arc<HandlerFn<T>> = Arc::new(move |event| {
1419            let downstream_error = create_downstream_error_slot();
1420            let next_chain = SubscriberInterceptorChain::with_downstream_error(
1421                Arc::clone(&next),
1422                Arc::clone(&downstream_error),
1423            );
1424            let result = panic::catch_unwind(AssertUnwindSafe(|| {
1425                interceptor.on_consume(event, next_chain)
1426            }));
1427            normalize_subscriber_interceptor_result(
1428                result,
1429                &downstream_error,
1430                "subscriber interceptor panicked",
1431            )
1432        });
1433        Ok(Box::new(wrapped))
1434    }
1435}
1436
1437/// Typed subscription entry stored in the subscription map.
1438struct TypedSubscriptionEntry<T: Clone + Send + Sync + 'static> {
1439    id: usize,
1440    subscriber_id: String,
1441    topic: Topic<T>,
1442    active: Arc<SubscriptionState>,
1443    handler: Arc<HandlerFn<T>>,
1444    options: SubscribeOptions<T>,
1445}
1446
1447impl<T> ErasedSubscription for TypedSubscriptionEntry<T>
1448where
1449    T: Clone + Send + Sync + 'static,
1450{
1451    /// Returns subscription ID.
1452    fn id(&self) -> usize {
1453        self.id
1454    }
1455
1456    /// Returns subscription priority.
1457    fn priority(&self) -> i32 {
1458        self.options.priority()
1459    }
1460
1461    /// Marks this subscription inactive.
1462    fn deactivate(&self) {
1463        self.active.deactivate();
1464    }
1465
1466    /// Downcasts and schedules handler processing.
1467    fn dispatch(
1468        &self,
1469        envelope: Box<dyn Any + Send>,
1470        bus: Arc<LocalEventBusInner>,
1471        allow_stopping: bool,
1472    ) -> EventBusResult<()> {
1473        if !self.active.is_active() {
1474            return Ok(());
1475        }
1476        let envelope = envelope.downcast::<EventEnvelope<T>>().map_err(|_| {
1477            EventBusError::type_mismatch(type_name::<EventEnvelope<T>>(), "unknown")
1478        })?;
1479        if !self.options.try_should_handle(&envelope)? {
1480            return Ok(());
1481        }
1482        let topic_key = self.topic.key();
1483        bus.start_processing(&topic_key)?;
1484        let ordering_lane_key = envelope
1485            .ordering_key()
1486            .map(|ordering_key| OrderingLaneKey::new(topic_key.clone(), ordering_key, self.id));
1487        let delay = envelope.delay();
1488
1489        let active = Arc::clone(&self.active);
1490        let delayed_active = Arc::clone(&self.active);
1491        let handler = Arc::clone(&self.handler);
1492        let options = self.options.clone();
1493        let subscriber_id = self.subscriber_id.clone();
1494        let event_bus = LocalEventBus {
1495            inner: Arc::clone(&bus),
1496        };
1497        let bus_id = local_event_bus_id(&bus);
1498        let processing_task = ProcessingTask::new(Arc::clone(&bus), topic_key, move || {
1499            let _worker_context = SubscriptionWorkerContext::enter(bus_id);
1500            if !active.is_active() {
1501                return;
1502            }
1503            process_subscription_event(
1504                active,
1505                handler,
1506                options,
1507                subscriber_id,
1508                *envelope,
1509                event_bus,
1510            );
1511        });
1512        if let Some(ordering_lane_key) = ordering_lane_key {
1513            if let Some(delay) = delay
1514                && !delay.is_zero()
1515            {
1516                bus.submit_delayed_ordered_processing_task(
1517                    ordering_lane_key,
1518                    processing_task,
1519                    delay,
1520                    delayed_active,
1521                    allow_stopping,
1522                )
1523            } else {
1524                bus.submit_ordered_processing_task(
1525                    ordering_lane_key,
1526                    processing_task,
1527                    allow_stopping,
1528                )
1529            }
1530        } else if let Some(delay) = delay
1531            && !delay.is_zero()
1532        {
1533            bus.submit_delayed_processing_task(
1534                processing_task,
1535                delay,
1536                delayed_active,
1537                allow_stopping,
1538            )
1539        } else {
1540            bus.submit_processing_task(move || processing_task.run(), allow_stopping)
1541        }
1542    }
1543}
1544
1545/// Returns a stable in-process identifier for one local event bus inner value.
1546///
1547/// # Parameters
1548/// - `inner`: Shared event bus state.
1549///
1550/// # Returns
1551/// Pointer-sized identifier used only for thread-local worker tracking.
1552fn local_event_bus_id(inner: &Arc<LocalEventBusInner>) -> usize {
1553    Arc::as_ptr(inner) as usize
1554}
1555
1556/// Returns whether the current thread is processing work for the bus.
1557///
1558/// # Parameters
1559/// - `bus_id`: Identifier returned by [`local_event_bus_id`].
1560///
1561/// # Returns
1562/// `true` when the current thread is inside a subscriber task for the same bus.
1563fn is_current_subscription_worker_for_bus(bus_id: usize) -> bool {
1564    SUBSCRIPTION_WORKER_BUS_IDS.with(|bus_ids| bus_ids.borrow().contains(&bus_id))
1565}
1566
1567/// Thread-local marker for subscriber worker execution.
1568struct SubscriptionWorkerContext {
1569    bus_id: usize,
1570}
1571
1572impl SubscriptionWorkerContext {
1573    /// Marks the current thread as processing subscriber work for a bus.
1574    ///
1575    /// # Parameters
1576    /// - `bus_id`: Identifier returned by [`local_event_bus_id`].
1577    ///
1578    /// # Returns
1579    /// Guard that removes the marker on drop.
1580    fn enter(bus_id: usize) -> Self {
1581        SUBSCRIPTION_WORKER_BUS_IDS.with(|bus_ids| {
1582            bus_ids.borrow_mut().push(bus_id);
1583        });
1584        Self { bus_id }
1585    }
1586}
1587
1588impl Drop for SubscriptionWorkerContext {
1589    /// Removes this guard's bus marker from thread-local worker state.
1590    fn drop(&mut self) {
1591        SUBSCRIPTION_WORKER_BUS_IDS.with(|bus_ids| {
1592            let mut bus_ids = bus_ids.borrow_mut();
1593            if let Some(position) = bus_ids.iter().rposition(|bus_id| *bus_id == self.bus_id) {
1594                bus_ids.remove(position);
1595            }
1596        });
1597    }
1598}
1599
1600/// Processes a subscriber event on a background thread.
1601///
1602/// # Parameters
1603/// - `active`: Shared subscription activity flag.
1604/// - `handler`: Handler closure.
1605/// - `options`: Subscriber options.
1606/// - `subscriber_id`: Subscriber identifier.
1607/// - `envelope`: Event envelope.
1608/// - `event_bus`: Event bus used to publish dead letters.
1609fn process_subscription_event<T>(
1610    active: Arc<SubscriptionState>,
1611    handler: Arc<HandlerFn<T>>,
1612    options: SubscribeOptions<T>,
1613    subscriber_id: String,
1614    envelope: EventEnvelope<T>,
1615    event_bus: LocalEventBus,
1616) where
1617    T: Clone + Send + Sync + 'static,
1618{
1619    if !active.is_active() {
1620        return;
1621    }
1622    match run_handler_with_retry(&handler, &options, envelope) {
1623        Ok(delivery) => {
1624            if options.ack_mode() == AckMode::Auto && !delivery.acknowledgement.is_completed() {
1625                delivery.acknowledgement.ack();
1626            }
1627        }
1628        Err(failure) => {
1629            handle_subscription_failure(
1630                &options,
1631                &subscriber_id,
1632                &failure.delivery.delivered,
1633                &failure.error,
1634                &failure.delivery.acknowledgement,
1635                &event_bus,
1636            );
1637        }
1638    }
1639}
1640
1641/// Handles a terminal subscriber failure.
1642///
1643/// # Parameters
1644/// - `options`: Subscription options containing error handlers and DLQ policy.
1645/// - `subscriber_id`: Subscriber identifier.
1646/// - `delivered`: Delivered event envelope.
1647/// - `error`: Failure reason.
1648/// - `acknowledgement`: Shared acknowledgement state.
1649/// - `event_bus`: Bus used to publish dead-letter events.
1650fn handle_subscription_failure<T>(
1651    options: &SubscribeOptions<T>,
1652    subscriber_id: &str,
1653    delivered: &EventEnvelope<T>,
1654    error: &EventBusError,
1655    acknowledgement: &Acknowledgement,
1656    event_bus: &LocalEventBus,
1657) where
1658    T: Clone + Send + Sync + 'static,
1659{
1660    for error in options.notify_subscribe_error(subscriber_id, delivered, error, acknowledgement) {
1661        event_bus.inner.observe_error(&error);
1662    }
1663    if !acknowledgement.is_completed() {
1664        acknowledgement.nack();
1665    }
1666    if acknowledgement.is_nacked() && !delivered.is_dead_letter() {
1667        let dead_letter =
1668            create_dead_letter_for_failure(options, subscriber_id, delivered, error, event_bus);
1669        if let Some(dead_letter) = dead_letter
1670            && let Err(error) = event_bus.publish_dead_letter_envelope(dead_letter.as_dead_letter())
1671        {
1672            let observed = EventBusError::dead_letter_failed(error.to_string());
1673            event_bus.inner.observe_error(&observed);
1674        }
1675    }
1676}
1677
1678/// Creates a dead-letter envelope for a failed delivery.
1679///
1680/// # Parameters
1681/// - `options`: Subscription options containing the primary DLQ policy.
1682/// - `subscriber_id`: Subscriber identifier.
1683/// - `delivered`: Failed delivered event.
1684/// - `error`: Failure reason.
1685/// - `event_bus`: Bus containing optional factory defaults and observers.
1686///
1687/// # Returns
1688/// Dead-letter envelope when a strategy creates one.
1689fn create_dead_letter_for_failure<T>(
1690    options: &SubscribeOptions<T>,
1691    subscriber_id: &str,
1692    delivered: &EventEnvelope<T>,
1693    error: &EventBusError,
1694    event_bus: &LocalEventBus,
1695) -> Option<EventEnvelope<DeadLetterPayload>>
1696where
1697    T: Clone + Send + Sync + 'static,
1698{
1699    if options.has_dead_letter_strategy() {
1700        match options.create_dead_letter(subscriber_id, delivered, error) {
1701            Ok(dead_letter) => dead_letter,
1702            Err(error) => {
1703                event_bus.inner.observe_error(&error);
1704                None
1705            }
1706        }
1707    } else {
1708        create_default_dead_letter_for_failure(options, subscriber_id, delivered, error, event_bus)
1709    }
1710}
1711
1712/// Creates a dead-letter envelope with the factory default strategy.
1713///
1714/// # Parameters
1715/// - `options`: Subscription options passed to the strategy.
1716/// - `subscriber_id`: Subscriber identifier.
1717/// - `delivered`: Failed delivered event.
1718/// - `error`: Failure reason.
1719/// - `event_bus`: Bus containing optional factory defaults and observers.
1720///
1721/// # Returns
1722/// Dead-letter envelope when the default strategy exists and creates one.
1723fn create_default_dead_letter_for_failure<T>(
1724    options: &SubscribeOptions<T>,
1725    subscriber_id: &str,
1726    delivered: &EventEnvelope<T>,
1727    error: &EventBusError,
1728    event_bus: &LocalEventBus,
1729) -> Option<EventEnvelope<DeadLetterPayload>>
1730where
1731    T: Clone + Send + Sync + 'static,
1732{
1733    if let Some(strategy) = event_bus.inner.default_dead_letter_strategy::<T>() {
1734        return match call_dead_letter_strategy(strategy, subscriber_id, delivered, error, options) {
1735            Ok(dead_letter) => dead_letter,
1736            Err(error) => {
1737                event_bus.inner.observe_error(&error);
1738                None
1739            }
1740        };
1741    }
1742    let strategy = event_bus.inner.global_default_dead_letter_strategy()?;
1743    match call_global_dead_letter_strategy(
1744        strategy,
1745        subscriber_id,
1746        delivered.metadata(),
1747        Arc::new(delivered.payload().clone()),
1748        error,
1749    ) {
1750        Ok(dead_letter) => dead_letter,
1751        Err(error) => {
1752            event_bus.inner.observe_error(&error);
1753            None
1754        }
1755    }
1756}
1757
1758/// Calls a dead-letter strategy while converting failures into event-bus errors.
1759///
1760/// # Parameters
1761/// - `strategy`: Strategy to invoke.
1762/// - `subscriber_id`: Subscriber identifier.
1763/// - `delivered`: Failed delivered event.
1764/// - `error`: Failure reason.
1765/// - `options`: Subscription options.
1766///
1767/// # Returns
1768/// Dead-letter envelope produced by the strategy.
1769fn call_dead_letter_strategy<T>(
1770    strategy: Arc<DeadLetterStrategyFn<T>>,
1771    subscriber_id: &str,
1772    delivered: &EventEnvelope<T>,
1773    error: &EventBusError,
1774    options: &SubscribeOptions<T>,
1775) -> EventBusResult<Option<EventEnvelope<DeadLetterPayload>>>
1776where
1777    T: Clone + Send + Sync + 'static,
1778{
1779    match panic::catch_unwind(AssertUnwindSafe(|| {
1780        strategy.create_dead_letter(subscriber_id, delivered, error, options)
1781    })) {
1782        Ok(Ok(dead_letter)) => Ok(dead_letter),
1783        Ok(Err(error)) => Err(normalize_dead_letter_error(error)),
1784        Err(_) => Err(EventBusError::dead_letter_failed(
1785            "default dead-letter strategy panicked",
1786        )),
1787    }
1788}
1789
1790/// Calls a type-erased dead-letter strategy while normalizing failures.
1791///
1792/// # Parameters
1793/// - `strategy`: Strategy to invoke.
1794/// - `subscriber_id`: Subscriber identifier.
1795/// - `metadata`: Failed event metadata.
1796/// - `original_payload`: Type-erased cloned original payload.
1797/// - `error`: Failure reason.
1798///
1799/// # Returns
1800/// Dead-letter envelope produced by the strategy.
1801fn call_global_dead_letter_strategy(
1802    strategy: Arc<DeadLetterStrategyAnyFn>,
1803    subscriber_id: &str,
1804    metadata: EventEnvelopeMetadata,
1805    original_payload: DeadLetterOriginalPayload,
1806    error: &EventBusError,
1807) -> EventBusResult<Option<EventEnvelope<DeadLetterPayload>>> {
1808    match panic::catch_unwind(AssertUnwindSafe(|| {
1809        strategy.create_dead_letter(subscriber_id, metadata, original_payload, error)
1810    })) {
1811        Ok(Ok(dead_letter)) => Ok(dead_letter),
1812        Ok(Err(error)) => Err(normalize_dead_letter_error(error)),
1813        Err(_) => Err(EventBusError::dead_letter_failed(
1814            "global default dead-letter strategy panicked",
1815        )),
1816    }
1817}
1818
1819/// Runs a handler with retry options.
1820///
1821/// # Parameters
1822/// - `handler`: Subscriber handler.
1823/// - `options`: Subscriber options.
1824/// - `envelope`: Original event envelope.
1825///
1826/// # Returns
1827/// Successful attempt delivery, or the final handler error with its delivery.
1828fn run_handler_with_retry<T>(
1829    handler: &Arc<HandlerFn<T>>,
1830    options: &SubscribeOptions<T>,
1831    envelope: EventEnvelope<T>,
1832) -> Result<HandlerDelivery<T>, Box<HandlerRunFailure<T>>>
1833where
1834    T: Clone + Send + Sync + 'static,
1835{
1836    let mut last_delivery = None;
1837    match run_with_retry(options.retry_options(), || {
1838        let delivery = HandlerDelivery::new(&envelope);
1839        last_delivery = Some(delivery.clone());
1840        call_handler(handler, delivery.delivered.clone())?;
1841        if delivery.acknowledgement.is_nacked() {
1842            Err(EventBusError::handler_failed("subscriber nacked the event"))
1843        } else {
1844            Ok(delivery)
1845        }
1846    }) {
1847        Ok(delivery) => Ok(delivery),
1848        Err(error) => {
1849            let delivery = match last_delivery {
1850                Some(delivery) => delivery,
1851                None => HandlerDelivery::new(&envelope),
1852            };
1853            Err(Box::new(HandlerRunFailure { error, delivery }))
1854        }
1855    }
1856}
1857
1858/// Calls a subscriber handler while converting panics into handler errors.
1859///
1860/// # Parameters
1861/// - `handler`: Subscriber handler or interceptor chain.
1862/// - `envelope`: Envelope delivered to the handler.
1863///
1864/// # Returns
1865/// Handler result, with panics converted to [`EventBusError::HandlerPanicked`].
1866fn call_handler<T>(handler: &Arc<HandlerFn<T>>, envelope: EventEnvelope<T>) -> EventBusResult<()>
1867where
1868    T: Clone + Send + Sync + 'static,
1869{
1870    match panic::catch_unwind(AssertUnwindSafe(|| handler(envelope))) {
1871        Ok(result) => result,
1872        Err(_) => Err(EventBusError::handler_panicked()),
1873    }
1874}
1875
1876/// Normalizes the result returned by a subscriber interceptor.
1877///
1878/// # Parameters
1879/// - `result`: Result of invoking the interceptor callback.
1880/// - `downstream_error`: Shared slot filled when the interceptor called `proceed`.
1881/// - `panic_message`: Message used when the interceptor callback itself panics.
1882///
1883/// # Returns
1884/// Downstream failures unchanged, or interceptor failures wrapped as
1885/// [`EventBusError::InterceptorFailed`].
1886fn normalize_subscriber_interceptor_result(
1887    result: Result<EventBusResult<()>, Box<dyn Any + Send>>,
1888    downstream_error: &DownstreamErrorSlot,
1889    panic_message: &'static str,
1890) -> EventBusResult<()> {
1891    match result {
1892        Ok(Ok(())) => Ok(()),
1893        Ok(Err(error)) if is_recorded_downstream_error(downstream_error, &error) => Err(error),
1894        Ok(Err(error)) => Err(normalize_subscriber_interceptor_error(error)),
1895        Err(_) => Err(EventBusError::interceptor_failed(
1896            "subscribe",
1897            panic_message,
1898        )),
1899    }
1900}
1901
1902/// Converts an interceptor-owned error into the public interceptor failure kind.
1903///
1904/// # Parameters
1905/// - `error`: Error returned directly by an interceptor callback.
1906///
1907/// # Returns
1908/// Existing subscribe interceptor failures are preserved; other errors are
1909/// wrapped with subscribe interceptor context.
1910fn normalize_subscriber_interceptor_error(error: EventBusError) -> EventBusError {
1911    if matches!(
1912        &error,
1913        EventBusError::InterceptorFailed { phase, .. } if *phase == "subscribe"
1914    ) {
1915        error
1916    } else {
1917        EventBusError::interceptor_failed("subscribe", error.to_string())
1918    }
1919}
1920
1921/// Runs a fallible operation with the event-bus retry options.
1922///
1923/// # Parameters
1924/// - `retry_options`: Simple event-bus retry options.
1925/// - `operation`: Operation to call for each attempt.
1926///
1927/// # Returns
1928/// Successful operation value or the final event-bus error.
1929fn run_with_retry<T, F>(
1930    retry_options: Option<&crate::RetryOptions>,
1931    operation: F,
1932) -> EventBusResult<T>
1933where
1934    F: FnMut() -> EventBusResult<T>,
1935{
1936    let Some(retry_options) = retry_options else {
1937        let mut operation = operation;
1938        return operation();
1939    };
1940    let retry = match qubit_retry::Retry::<EventBusError>::from_options(retry_options.clone()) {
1941        Ok(retry) => retry,
1942        Err(error) => {
1943            return Err(EventBusError::invalid_argument(
1944                "retry_options",
1945                error.to_string(),
1946            ));
1947        }
1948    };
1949    match retry.run(operation) {
1950        Ok(value) => Ok(value),
1951        Err(error) => match error.last_error().cloned() {
1952            Some(error) => Err(error),
1953            None => Err(EventBusError::handler_failed(error.to_string())),
1954        },
1955    }
1956}
1957
1958/// Validates retry options supported by the local backend.
1959///
1960/// # Parameters
1961/// - `retry_options`: Optional retry options to validate.
1962///
1963/// # Returns
1964/// `Ok(())` when the local backend can apply the options.
1965///
1966/// # Errors
1967/// Returns [`EventBusError::InvalidArgument`] when unsupported attempt timeout
1968/// options are configured.
1969fn validate_retry_options(retry_options: Option<&crate::RetryOptions>) -> EventBusResult<()> {
1970    if retry_options
1971        .and_then(crate::RetryOptions::attempt_timeout)
1972        .is_some()
1973    {
1974        return Err(EventBusError::invalid_argument(
1975            "retry_options",
1976            "attempt_timeout is not supported by LocalEventBus retry handling",
1977        ));
1978    }
1979    Ok(())
1980}
1981
1982/// Waits for a fixed handler executor to finish after shutdown.
1983///
1984/// # Parameters
1985/// - `executor`: Executor whose graceful shutdown has already been requested.
1986fn wait_for_executor_termination(executor: &FixedThreadPool) {
1987    while !executor.is_terminated() {
1988        thread::sleep(Duration::from_millis(1));
1989    }
1990}
1991
1992/// Waits for a fixed handler executor to finish until the timeout elapses.
1993///
1994/// # Parameters
1995/// - `executor`: Executor whose graceful shutdown has already been requested.
1996/// - `timeout`: Maximum duration to wait.
1997///
1998/// # Returns
1999/// `true` when the executor terminates before the timeout.
2000fn wait_for_executor_termination_timeout(executor: &FixedThreadPool, timeout: Duration) -> bool {
2001    let started_at = Instant::now();
2002    while !executor.is_terminated() {
2003        let Some(remaining) = remaining_shutdown_timeout(started_at, timeout) else {
2004            return false;
2005        };
2006        thread::sleep(remaining.min(Duration::from_millis(1)));
2007    }
2008    true
2009}
2010
2011/// Waits for a delayed task scheduler to finish after shutdown.
2012///
2013/// # Parameters
2014/// - `scheduler`: Scheduler whose graceful shutdown has already been requested.
2015fn wait_for_delay_scheduler_termination(scheduler: &SingleThreadScheduledExecutorService) {
2016    while !scheduler.is_terminated() {
2017        thread::sleep(Duration::from_millis(1));
2018    }
2019}
2020
2021/// Waits for a delayed task scheduler to finish until the timeout elapses.
2022///
2023/// # Parameters
2024/// - `scheduler`: Scheduler whose graceful shutdown has already been requested.
2025/// - `timeout`: Maximum duration to wait.
2026///
2027/// # Returns
2028/// `true` when the scheduler terminates before the timeout.
2029fn wait_for_delay_scheduler_termination_timeout(
2030    scheduler: &SingleThreadScheduledExecutorService,
2031    timeout: Duration,
2032) -> bool {
2033    let started_at = Instant::now();
2034    while !scheduler.is_terminated() {
2035        let Some(remaining) = remaining_shutdown_timeout(started_at, timeout) else {
2036            return false;
2037        };
2038        thread::sleep(remaining.min(Duration::from_millis(1)));
2039    }
2040    true
2041}
2042
2043/// Returns the remaining shutdown timeout.
2044///
2045/// # Parameters
2046/// - `started_at`: Time when the shutdown wait began.
2047/// - `timeout`: Total timeout budget.
2048///
2049/// # Returns
2050/// Remaining duration, or `None` when the timeout has elapsed.
2051fn remaining_shutdown_timeout(started_at: Instant, timeout: Duration) -> Option<Duration> {
2052    timeout.checked_sub(started_at.elapsed())
2053}
2054
2055/// Returns the default subscription handler worker count.
2056///
2057/// # Returns
2058/// Available CPU parallelism, or `1` if it cannot be detected.
2059fn default_subscription_handler_pool_size() -> usize {
2060    thread::available_parallelism()
2061        .map(usize::from)
2062        .unwrap_or(1)
2063}