1#[cfg(coverage)]
15mod coverage;
16
17use std::any::{
18 Any,
19 TypeId,
20 type_name,
21};
22use std::cell::RefCell;
23use std::collections::HashMap;
24use std::panic::{
25 self,
26 AssertUnwindSafe,
27};
28use std::sync::Arc;
29use std::thread;
30use std::time::{
31 Duration,
32 Instant,
33};
34
35use qubit_executor::{
36 ExecutorService,
37 SingleThreadScheduledExecutorService,
38};
39use qubit_thread_pool::FixedThreadPool;
40
41use crate::core::SubscriptionState;
42use crate::core::subscribe_options::{
43 DeadLetterStrategyAnyFn,
44 DeadLetterStrategyFn,
45 normalize_dead_letter_error,
46};
47use crate::{
48 AckMode,
49 Acknowledgement,
50 BatchPublishFailure,
51 BatchPublishResult,
52 DeadLetterOriginalPayload,
53 DeadLetterPayload,
54 EventBusError,
55 EventBusResult,
56 EventEnvelope,
57 EventEnvelopeMetadata,
58 IntoEventBusResult,
59 PublishOptions,
60 SubscribeOptions,
61 Subscription,
62 Topic,
63};
64
65use super::erased_subscription::ErasedSubscription;
66use super::local_event_bus_inner::{
67 LocalEventBusInner,
68 LocalEventBusRuntimeOptions,
69};
70use super::ordering_lane_key::OrderingLaneKey;
71use super::processing_task::ProcessingTask;
72use super::publisher_interceptor_entry::PublisherInterceptorEntry;
73use super::subscriber_interceptor_chain::{
74 DownstreamErrorSlot,
75 SubscriberInterceptorAnyChain,
76 SubscriberInterceptorChain,
77 create_downstream_error_slot,
78 is_recorded_downstream_error,
79};
80use super::subscriber_interceptor_entry::SubscriberInterceptorEntry;
81
82#[cfg(coverage)]
83pub use coverage::coverage_exercise_local_event_bus_defensive_paths;
84
85type HandlerFn<T> = dyn Fn(EventEnvelope<T>) -> EventBusResult<()> + Send + Sync + 'static;
86type PublisherInterceptorFn<T> = dyn PublisherInterceptor<T>;
87type SubscriberInterceptorFn<T> = dyn SubscriberInterceptor<T>;
88
thread_local! {
    /// Identifiers of the buses whose subscription workers are currently
    /// executing on this thread. Entries are pushed by
    /// `SubscriptionWorkerContext::enter` and removed when that guard drops.
    static SUBSCRIPTION_WORKER_BUS_IDS: RefCell<Vec<usize>> = const { RefCell::new(Vec::new()) };
}
92
93#[derive(Clone)]
95struct HandlerDelivery<T: Clone + Send + Sync + 'static> {
96 delivered: EventEnvelope<T>,
97 acknowledgement: Acknowledgement,
98}
99
100impl<T> HandlerDelivery<T>
101where
102 T: Clone + Send + Sync + 'static,
103{
104 fn new(envelope: &EventEnvelope<T>) -> Self {
112 let acknowledgement = Acknowledgement::new();
113 let delivered = envelope
114 .clone()
115 .with_acknowledgement(acknowledgement.clone());
116 Self {
117 delivered,
118 acknowledgement,
119 }
120 }
121}
122
123struct HandlerRunFailure<T: Clone + Send + Sync + 'static> {
125 error: EventBusError,
126 delivery: HandlerDelivery<T>,
127}
128
/// Result of a single successful internal publish call.
enum PublishOutcome {
    /// The envelope passed the publisher interceptors and was dispatched.
    Accepted,
    /// A publisher interceptor returned `None`, so nothing was dispatched.
    Dropped,
}
136
137pub trait IntoPublisherInterceptorResult<T: Clone + Send + Sync + 'static> {
143 fn into_publisher_interceptor_result(self) -> EventBusResult<Option<EventEnvelope<T>>>;
149}
150
151impl<T> IntoPublisherInterceptorResult<T> for EventEnvelope<T>
152where
153 T: Clone + Send + Sync + 'static,
154{
155 fn into_publisher_interceptor_result(self) -> EventBusResult<Option<EventEnvelope<T>>> {
156 Ok(Some(self))
157 }
158}
159
160impl<T> IntoPublisherInterceptorResult<T> for Option<EventEnvelope<T>>
161where
162 T: Clone + Send + Sync + 'static,
163{
164 fn into_publisher_interceptor_result(self) -> EventBusResult<Option<EventEnvelope<T>>> {
165 Ok(self)
166 }
167}
168
169impl<T> IntoPublisherInterceptorResult<T> for EventBusResult<EventEnvelope<T>>
170where
171 T: Clone + Send + Sync + 'static,
172{
173 fn into_publisher_interceptor_result(self) -> EventBusResult<Option<EventEnvelope<T>>> {
174 self.map(Some)
175 }
176}
177
178impl<T> IntoPublisherInterceptorResult<T> for EventBusResult<Option<EventEnvelope<T>>>
179where
180 T: Clone + Send + Sync + 'static,
181{
182 fn into_publisher_interceptor_result(self) -> EventBusResult<Option<EventEnvelope<T>>> {
183 self
184 }
185}
186
187pub trait PublisherInterceptor<T: Clone + Send + Sync + 'static>: Send + Sync + 'static {
192 fn on_publish(&self, envelope: EventEnvelope<T>) -> EventBusResult<Option<EventEnvelope<T>>>;
200}
201
202impl<T, F, R> PublisherInterceptor<T> for F
203where
204 T: Clone + Send + Sync + 'static,
205 F: Fn(EventEnvelope<T>) -> R + Send + Sync + 'static,
206 R: IntoPublisherInterceptorResult<T> + 'static,
207{
208 fn on_publish(&self, envelope: EventEnvelope<T>) -> EventBusResult<Option<EventEnvelope<T>>> {
209 self(envelope).into_publisher_interceptor_result()
210 }
211}
212
213pub trait IntoPublisherInterceptorAnyResult {
215 fn into_publisher_interceptor_any_result(self)
221 -> EventBusResult<Option<EventEnvelopeMetadata>>;
222}
223
224impl IntoPublisherInterceptorAnyResult for EventEnvelopeMetadata {
225 fn into_publisher_interceptor_any_result(
226 self,
227 ) -> EventBusResult<Option<EventEnvelopeMetadata>> {
228 Ok(Some(self))
229 }
230}
231
232impl IntoPublisherInterceptorAnyResult for Option<EventEnvelopeMetadata> {
233 fn into_publisher_interceptor_any_result(
234 self,
235 ) -> EventBusResult<Option<EventEnvelopeMetadata>> {
236 Ok(self)
237 }
238}
239
240impl IntoPublisherInterceptorAnyResult for EventBusResult<EventEnvelopeMetadata> {
241 fn into_publisher_interceptor_any_result(
242 self,
243 ) -> EventBusResult<Option<EventEnvelopeMetadata>> {
244 self.map(Some)
245 }
246}
247
248impl IntoPublisherInterceptorAnyResult for EventBusResult<Option<EventEnvelopeMetadata>> {
249 fn into_publisher_interceptor_any_result(
250 self,
251 ) -> EventBusResult<Option<EventEnvelopeMetadata>> {
252 self
253 }
254}
255
256pub trait PublisherInterceptorAny: Send + Sync + 'static {
262 fn on_publish(
270 &self,
271 metadata: EventEnvelopeMetadata,
272 ) -> EventBusResult<Option<EventEnvelopeMetadata>>;
273}
274
275impl<F, R> PublisherInterceptorAny for F
276where
277 F: Fn(EventEnvelopeMetadata) -> R + Send + Sync + 'static,
278 R: IntoPublisherInterceptorAnyResult + 'static,
279{
280 fn on_publish(
281 &self,
282 metadata: EventEnvelopeMetadata,
283 ) -> EventBusResult<Option<EventEnvelopeMetadata>> {
284 self(metadata).into_publisher_interceptor_any_result()
285 }
286}
287
288pub trait SubscriberInterceptor<T: Clone + Send + Sync + 'static>: Send + Sync + 'static {
294 fn on_consume(
303 &self,
304 envelope: EventEnvelope<T>,
305 chain: SubscriberInterceptorChain<T>,
306 ) -> EventBusResult<()>;
307}
308
309impl<T, F, R> SubscriberInterceptor<T> for F
310where
311 T: Clone + Send + Sync + 'static,
312 F: Fn(EventEnvelope<T>, SubscriberInterceptorChain<T>) -> R + Send + Sync + 'static,
313 R: IntoEventBusResult + 'static,
314{
315 fn on_consume(
316 &self,
317 envelope: EventEnvelope<T>,
318 chain: SubscriberInterceptorChain<T>,
319 ) -> EventBusResult<()> {
320 self(envelope, chain).into_event_bus_result()
321 }
322}
323
324pub trait SubscriberInterceptorAny: Send + Sync + 'static {
330 fn on_consume(
339 &self,
340 metadata: EventEnvelopeMetadata,
341 chain: SubscriberInterceptorAnyChain,
342 ) -> EventBusResult<()>;
343}
344
345impl<F, R> SubscriberInterceptorAny for F
346where
347 F: Fn(EventEnvelopeMetadata, SubscriberInterceptorAnyChain) -> R + Send + Sync + 'static,
348 R: IntoEventBusResult + 'static,
349{
350 fn on_consume(
351 &self,
352 metadata: EventEnvelopeMetadata,
353 chain: SubscriberInterceptorAnyChain,
354 ) -> EventBusResult<()> {
355 self(metadata, chain).into_event_bus_result()
356 }
357}
358
359#[derive(Clone)]
366pub struct LocalEventBus {
367 pub(crate) inner: Arc<LocalEventBusInner>,
368}
369
370impl LocalEventBus {
371 pub fn new() -> Self {
376 Self::with_runtime_options(LocalEventBusRuntimeOptions {
377 default_publish_options: HashMap::new(),
378 default_subscribe_options: HashMap::new(),
379 default_dead_letter_strategies: HashMap::new(),
380 global_default_dead_letter_strategy: None,
381 global_publisher_interceptors: Vec::new(),
382 global_subscriber_interceptors: Vec::new(),
383 publisher_interceptors: Vec::new(),
384 subscriber_interceptors: Vec::new(),
385 subscription_handler_pool_size: default_subscription_handler_pool_size(),
386 subscription_handler_queue_capacity: None,
387 })
388 }
389
390 pub fn started() -> EventBusResult<Self> {
398 let bus = Self::new();
399 bus.start()?;
400 Ok(bus)
401 }
402
403 pub(crate) fn with_runtime_options(options: LocalEventBusRuntimeOptions) -> Self {
408 Self {
409 inner: Arc::new(LocalEventBusInner::new(options)),
410 }
411 }
412
413 pub fn start(&self) -> EventBusResult<bool> {
421 self.inner.mark_started()
422 }
423
424 pub fn shutdown(&self) -> bool {
439 self.assert_not_own_subscription_worker_for_blocking_shutdown();
440 if !self.inner.mark_stopping() {
441 return false;
442 }
443 let _ = self.inner.wait_for_all_idle();
444 if let Some(executor) = self.inner.take_executor() {
445 executor.shutdown();
446 wait_for_executor_termination(&executor);
447 }
448 if let Some(delay_scheduler) = self.inner.take_delay_scheduler() {
449 delay_scheduler.shutdown();
450 wait_for_delay_scheduler_termination(&delay_scheduler);
451 }
452 self.inner.clear_subscriptions();
453 true
454 }
455
456 pub fn shutdown_nonblocking(&self) -> bool {
465 let Some(executor) = self.inner.mark_stopped() else {
466 return false;
467 };
468 executor.shutdown();
469 if let Some(delay_scheduler) = self.inner.take_delay_scheduler() {
470 delay_scheduler.shutdown();
471 }
472 self.inner.clear_subscriptions();
473 true
474 }
475
476 pub fn shutdown_with_timeout(&self, timeout: Duration) -> EventBusResult<bool> {
495 let started_at = Instant::now();
496 if !self.inner.mark_stopping() {
497 return Ok(false);
498 }
499 let Some(remaining) = remaining_shutdown_timeout(started_at, timeout) else {
500 self.inner.clear_subscriptions();
501 if let Some(executor) = self.inner.take_executor() {
502 executor.shutdown();
503 }
504 if let Some(delay_scheduler) = self.inner.take_delay_scheduler() {
505 delay_scheduler.shutdown();
506 }
507 return Err(EventBusError::shutdown_timed_out(timeout));
508 };
509 if !self.inner.wait_for_all_idle_timeout(remaining)? {
510 self.inner.clear_subscriptions();
511 if let Some(executor) = self.inner.take_executor() {
512 executor.shutdown();
513 }
514 if let Some(delay_scheduler) = self.inner.take_delay_scheduler() {
515 delay_scheduler.shutdown();
516 }
517 return Err(EventBusError::shutdown_timed_out(timeout));
518 }
519 let Some(remaining) = remaining_shutdown_timeout(started_at, timeout) else {
520 self.inner.clear_subscriptions();
521 if let Some(executor) = self.inner.take_executor() {
522 executor.shutdown();
523 }
524 if let Some(delay_scheduler) = self.inner.take_delay_scheduler() {
525 delay_scheduler.shutdown();
526 }
527 return Err(EventBusError::shutdown_timed_out(timeout));
528 };
529 let Some(executor) = self.inner.take_executor() else {
530 if let Some(delay_scheduler) = self.inner.take_delay_scheduler() {
531 delay_scheduler.shutdown();
532 }
533 self.inner.clear_subscriptions();
534 return Ok(true);
535 };
536 executor.shutdown();
537 if let Some(delay_scheduler) = self.inner.take_delay_scheduler() {
538 delay_scheduler.shutdown();
539 if !wait_for_delay_scheduler_termination_timeout(&delay_scheduler, remaining) {
540 self.inner.clear_subscriptions();
541 return Err(EventBusError::shutdown_timed_out(timeout));
542 }
543 }
544 let Some(remaining) = remaining_shutdown_timeout(started_at, timeout) else {
545 self.inner.clear_subscriptions();
546 return Err(EventBusError::shutdown_timed_out(timeout));
547 };
548 if !wait_for_executor_termination_timeout(&executor, remaining) {
549 self.inner.clear_subscriptions();
550 return Err(EventBusError::shutdown_timed_out(timeout));
551 }
552 self.inner.clear_subscriptions();
553 Ok(true)
554 }
555
556 pub fn add_error_observer<F>(&self, observer: F) -> EventBusResult<()>
568 where
569 F: Fn(&EventBusError) + Send + Sync + 'static,
570 {
571 self.inner.add_error_observer(Arc::new(observer))
572 }
573
574 pub fn publish<T>(&self, topic: &Topic<T>, payload: T) -> EventBusResult<()>
590 where
591 T: Clone + Send + Sync + 'static,
592 {
593 self.publish_with_options(topic, payload, PublishOptions::empty())
594 }
595
596 pub fn publish_with_options<T>(
609 &self,
610 topic: &Topic<T>,
611 payload: T,
612 options: PublishOptions<T>,
613 ) -> EventBusResult<()>
614 where
615 T: Clone + Send + Sync + 'static,
616 {
617 self.publish_envelope_with_options(EventEnvelope::create(topic.clone(), payload), options)
618 }
619
620 pub fn publish_envelope<T>(&self, envelope: EventEnvelope<T>) -> EventBusResult<()>
631 where
632 T: Clone + Send + Sync + 'static,
633 {
634 self.publish_envelope_with_options(envelope, PublishOptions::empty())
635 }
636
637 pub fn publish_envelope_with_options<T>(
649 &self,
650 envelope: EventEnvelope<T>,
651 options: PublishOptions<T>,
652 ) -> EventBusResult<()>
653 where
654 T: Clone + Send + Sync + 'static,
655 {
656 let options = options.merge_defaults(self.default_publish_options::<T>());
657 self.publish_envelope_with_options_internal(envelope, options, false, true)
658 .map(|_outcome| ())
659 }
660
661 fn publish_envelope_with_options_internal<T>(
663 &self,
664 envelope: EventEnvelope<T>,
665 options: PublishOptions<T>,
666 allow_stopping: bool,
667 require_started: bool,
668 ) -> EventBusResult<PublishOutcome>
669 where
670 T: Clone + Send + Sync + 'static,
671 {
672 if let Err(error) = self.ensure_started()
673 && require_started
674 {
675 self.observe_errors(options.notify_publish_error(&envelope, &error));
676 return Err(error);
677 }
678 if let Err(error) = validate_retry_options(options.retry_options()) {
679 self.observe_errors(options.notify_publish_error(&envelope, &error));
680 return Err(error);
681 }
682 let original_envelope = envelope.clone();
683 let envelope = match run_with_retry(options.retry_options(), || {
684 self.apply_publisher_interceptors(original_envelope.clone())
685 }) {
686 Ok(Some(envelope)) => envelope,
687 Ok(None) => return Ok(PublishOutcome::Dropped),
688 Err(error) => {
689 self.inner.observe_error(&error);
690 self.observe_errors(options.notify_publish_error(&original_envelope, &error));
691 return Err(error);
692 }
693 };
694 if let Err(error) =
695 self.dispatch_envelope(envelope.clone(), options.retry_options(), allow_stopping)
696 {
697 self.observe_errors(options.notify_publish_error(&envelope, &error));
698 return Err(error);
699 }
700 Ok(PublishOutcome::Accepted)
701 }
702
703 fn publish_dead_letter_envelope(
705 &self,
706 envelope: EventEnvelope<DeadLetterPayload>,
707 ) -> EventBusResult<()> {
708 let options = PublishOptions::empty()
709 .merge_defaults(self.default_publish_options::<DeadLetterPayload>());
710 self.publish_envelope_with_options_internal(envelope, options, true, false)
711 .map(|_outcome| ())
712 }
713
714 pub fn publish_all<T>(
729 &self,
730 envelopes: Vec<EventEnvelope<T>>,
731 ) -> EventBusResult<BatchPublishResult>
732 where
733 T: Clone + Send + Sync + 'static,
734 {
735 self.publish_all_with_options(envelopes, PublishOptions::empty())
736 }
737
738 pub fn publish_all_with_options<T>(
750 &self,
751 envelopes: Vec<EventEnvelope<T>>,
752 options: PublishOptions<T>,
753 ) -> EventBusResult<BatchPublishResult>
754 where
755 T: Clone + Send + Sync + 'static,
756 {
757 self.ensure_started()?;
758 let options = options.merge_defaults(self.default_publish_options::<T>());
759 validate_retry_options(options.retry_options())?;
760 let mut result = BatchPublishResult::new(envelopes.len());
761 for (index, envelope) in envelopes.into_iter().enumerate() {
762 let event_id = envelope.id().to_string();
763 match self.publish_envelope_with_options_internal(
764 envelope,
765 options.clone(),
766 false,
767 true,
768 ) {
769 Ok(PublishOutcome::Accepted) => result.record_accepted(),
770 Ok(PublishOutcome::Dropped) => result.record_dropped(),
771 Err(error) => {
772 result.record_failure(BatchPublishFailure::new(index, event_id, error));
773 }
774 }
775 }
776 Ok(result)
777 }
778
779 pub fn subscribe<T, S, F, R>(
792 &self,
793 subscriber_id: S,
794 topic: &Topic<T>,
795 handler: F,
796 ) -> EventBusResult<Subscription<T>>
797 where
798 T: Clone + Send + Sync + 'static,
799 S: Into<String>,
800 F: Fn(EventEnvelope<T>) -> R + Send + Sync + 'static,
801 R: IntoEventBusResult + 'static,
802 {
803 self.subscribe_with_options(subscriber_id, topic, handler, SubscribeOptions::empty())
804 }
805
806 pub fn subscribe_with_options<T, S, F, R>(
821 &self,
822 subscriber_id: S,
823 topic: &Topic<T>,
824 handler: F,
825 options: SubscribeOptions<T>,
826 ) -> EventBusResult<Subscription<T>>
827 where
828 T: Clone + Send + Sync + 'static,
829 S: Into<String>,
830 F: Fn(EventEnvelope<T>) -> R + Send + Sync + 'static,
831 R: IntoEventBusResult + 'static,
832 {
833 self.ensure_started()?;
834 let options = options.merge_defaults(self.default_subscribe_options::<T>());
835 let subscriber_id = subscriber_id.into();
836 if subscriber_id.trim().is_empty() {
837 return Err(EventBusError::invalid_argument(
838 "subscriber_id",
839 "subscriber ID must not be blank",
840 ));
841 }
842 validate_retry_options(options.retry_options())?;
843
844 let id = self.inner.next_subscription_id();
845 let active = Arc::new(SubscriptionState::active());
846 let topic_key = topic.key();
847 let handler = Arc::new(move |event| handler(event).into_event_bus_result());
848 let handler = self.apply_subscriber_interceptors(handler)?;
849 let entry = TypedSubscriptionEntry {
850 id,
851 subscriber_id: subscriber_id.clone(),
852 topic: topic.clone(),
853 active: Arc::clone(&active),
854 handler,
855 options: options.clone(),
856 };
857 self.inner
858 .add_subscription(topic_key.clone(), Arc::new(entry))?;
859
860 Ok(Subscription {
861 id,
862 subscriber_id,
863 topic: topic.clone(),
864 topic_key,
865 options,
866 active,
867 bus: Arc::downgrade(&self.inner),
868 })
869 }
870
871 pub fn add_dead_letter_handler<F, R>(
889 &self,
890 dead_letter_topic: &Topic<DeadLetterPayload>,
891 handler: F,
892 options: SubscribeOptions<DeadLetterPayload>,
893 ) -> EventBusResult<Subscription<DeadLetterPayload>>
894 where
895 F: Fn(EventEnvelope<DeadLetterPayload>) -> R + Send + Sync + 'static,
896 R: IntoEventBusResult + 'static,
897 {
898 self.subscribe_with_options(
899 format!("dead-letter:{}", dead_letter_topic.name()),
900 dead_letter_topic,
901 handler,
902 options,
903 )
904 }
905
906 pub fn wait_for_idle<T>(&self, topic: &Topic<T>) -> EventBusResult<()>
917 where
918 T: 'static,
919 {
920 self.inner.wait_for_idle(&topic.key())
921 }
922
923 pub fn wait_for_idle_timeout<T>(
936 &self,
937 topic: &Topic<T>,
938 timeout: Duration,
939 ) -> EventBusResult<bool>
940 where
941 T: 'static,
942 {
943 self.inner.wait_for_idle_timeout(&topic.key(), timeout)
944 }
945
946 fn default_publish_options<T>(&self) -> PublishOptions<T>
951 where
952 T: 'static,
953 {
954 self.inner
955 .default_publish_options::<T>()
956 .unwrap_or_else(PublishOptions::empty)
957 }
958
959 fn default_subscribe_options<T>(&self) -> SubscribeOptions<T>
964 where
965 T: 'static,
966 {
967 self.inner
968 .default_subscribe_options::<T>()
969 .unwrap_or_else(SubscribeOptions::empty)
970 }
971
972 fn ensure_started(&self) -> EventBusResult<()> {
980 if self.inner.is_started() {
981 Ok(())
982 } else {
983 Err(EventBusError::not_started())
984 }
985 }
986
987 fn observe_errors(&self, errors: Vec<EventBusError>) {
992 for error in errors {
993 self.inner.observe_error(&error);
994 }
995 }
996
997 fn assert_not_own_subscription_worker_for_blocking_shutdown(&self) {
999 let bus_id = local_event_bus_id(&self.inner);
1000 if is_current_subscription_worker_for_bus(bus_id) {
1001 panic!(
1002 "LocalEventBus::shutdown must not be called from this bus's subscriber worker; use shutdown_nonblocking or shutdown_with_timeout"
1003 );
1004 }
1005 }
1006
1007 fn apply_publisher_interceptors<T>(
1018 &self,
1019 envelope: EventEnvelope<T>,
1020 ) -> EventBusResult<Option<EventEnvelope<T>>>
1021 where
1022 T: Clone + Send + Sync + 'static,
1023 {
1024 let mut envelope = envelope;
1025 for interceptor in self.inner.global_publisher_interceptors()? {
1026 let metadata = envelope.metadata();
1027 let metadata =
1028 match panic::catch_unwind(AssertUnwindSafe(|| interceptor.on_publish(metadata))) {
1029 Ok(Ok(Some(metadata))) => metadata,
1030 Ok(Ok(None)) => return Ok(None),
1031 Ok(Err(error)) => {
1032 return Err(EventBusError::interceptor_failed(
1033 "publish",
1034 error.to_string(),
1035 ));
1036 }
1037 Err(_) => {
1038 return Err(EventBusError::interceptor_failed(
1039 "publish",
1040 "global publisher interceptor panicked",
1041 ));
1042 }
1043 };
1044 envelope.apply_metadata(metadata);
1045 }
1046 let interceptors = self.inner.publisher_interceptors()?;
1047 let mut current: Option<Box<dyn Any + Send>> = Some(Box::new(envelope));
1048 for interceptor in interceptors {
1049 if interceptor.payload_type_id() == TypeId::of::<T>()
1050 && let Some(boxed) = current.take()
1051 {
1052 current = interceptor.intercept(boxed)?;
1053 }
1054 }
1055 current
1056 .map(|boxed| {
1057 boxed
1058 .downcast::<EventEnvelope<T>>()
1059 .map(|envelope| *envelope)
1060 .map_err(|_| {
1061 EventBusError::type_mismatch(type_name::<EventEnvelope<T>>(), "unknown")
1062 })
1063 })
1064 .transpose()
1065 }
1066
1067 fn dispatch_envelope<T>(
1082 &self,
1083 envelope: EventEnvelope<T>,
1084 retry_options: Option<&crate::RetryOptions>,
1085 allow_stopping: bool,
1086 ) -> EventBusResult<()>
1087 where
1088 T: Clone + Send + Sync + 'static,
1089 {
1090 if !allow_stopping {
1091 self.ensure_started()?;
1092 }
1093 let subscriptions = self.inner.subscriptions_for(&envelope.topic().key())?;
1094 let mut first_error = None;
1095 for subscription in subscriptions {
1096 let subscription = Arc::clone(&subscription);
1097 if let Err(error) = run_with_retry(retry_options, || {
1098 subscription.dispatch(
1099 Box::new(envelope.clone()),
1100 Arc::clone(&self.inner),
1101 allow_stopping,
1102 )
1103 }) && first_error.is_none()
1104 {
1105 first_error = Some(error);
1106 }
1107 }
1108 first_error.map_or(Ok(()), Err)
1109 }
1110
1111 fn apply_subscriber_interceptors<T>(
1122 &self,
1123 handler: Arc<HandlerFn<T>>,
1124 ) -> EventBusResult<Arc<HandlerFn<T>>>
1125 where
1126 T: Clone + Send + Sync + 'static,
1127 {
1128 let interceptors = self.inner.subscriber_interceptors()?;
1129 let mut chain = Box::new(handler) as Box<dyn Any + Send + Sync>;
1130 for interceptor in interceptors.into_iter().rev() {
1131 if interceptor.payload_type_id() == TypeId::of::<T>() {
1132 chain = interceptor.wrap_handler(chain)?;
1133 }
1134 }
1135 let handler = chain
1136 .downcast::<Arc<HandlerFn<T>>>()
1137 .map(|handler| *handler)
1138 .map_err(|_| {
1139 EventBusError::type_mismatch(type_name::<Arc<HandlerFn<T>>>(), "unknown")
1140 })?;
1141 self.apply_global_subscriber_interceptors(handler)
1142 }
1143
1144 fn apply_global_subscriber_interceptors<T>(
1152 &self,
1153 handler: Arc<HandlerFn<T>>,
1154 ) -> EventBusResult<Arc<HandlerFn<T>>>
1155 where
1156 T: Clone + Send + Sync + 'static,
1157 {
1158 let mut chain = handler;
1159 for interceptor in self
1160 .inner
1161 .global_subscriber_interceptors()?
1162 .into_iter()
1163 .rev()
1164 {
1165 let next = Arc::clone(&chain);
1166 chain = Arc::new(move |event: EventEnvelope<T>| {
1167 let metadata = event.metadata();
1168 let next = Arc::clone(&next);
1169 let event_for_next = event.clone();
1170 let downstream_error = create_downstream_error_slot();
1171 let chain = SubscriberInterceptorAnyChain::with_downstream_error(
1172 Arc::new(move || next(event_for_next.clone())),
1173 Arc::clone(&downstream_error),
1174 );
1175 let result = panic::catch_unwind(AssertUnwindSafe(|| {
1176 interceptor.on_consume(metadata, chain)
1177 }));
1178 normalize_subscriber_interceptor_result(
1179 result,
1180 &downstream_error,
1181 "global subscriber interceptor panicked",
1182 )
1183 });
1184 }
1185 Ok(chain)
1186 }
1187}
1188
1189impl Default for LocalEventBus {
1190 fn default() -> Self {
1192 Self::new()
1193 }
1194}
1195
1196impl crate::EventBus for LocalEventBus {
1197 fn start(&self) -> EventBusResult<bool> {
1199 Self::start(self)
1200 }
1201
1202 fn shutdown(&self) -> bool {
1204 Self::shutdown(self)
1205 }
1206
1207 fn publish<T>(&self, topic: &Topic<T>, payload: T) -> EventBusResult<()>
1209 where
1210 T: Clone + Send + Sync + 'static,
1211 {
1212 Self::publish(self, topic, payload)
1213 }
1214
1215 fn publish_with_options<T>(
1217 &self,
1218 topic: &Topic<T>,
1219 payload: T,
1220 options: PublishOptions<T>,
1221 ) -> EventBusResult<()>
1222 where
1223 T: Clone + Send + Sync + 'static,
1224 {
1225 Self::publish_with_options(self, topic, payload, options)
1226 }
1227
1228 fn publish_envelope<T>(&self, envelope: EventEnvelope<T>) -> EventBusResult<()>
1230 where
1231 T: Clone + Send + Sync + 'static,
1232 {
1233 Self::publish_envelope(self, envelope)
1234 }
1235
1236 fn publish_envelope_with_options<T>(
1238 &self,
1239 envelope: EventEnvelope<T>,
1240 options: PublishOptions<T>,
1241 ) -> EventBusResult<()>
1242 where
1243 T: Clone + Send + Sync + 'static,
1244 {
1245 Self::publish_envelope_with_options(self, envelope, options)
1246 }
1247
1248 fn publish_all<T>(&self, envelopes: Vec<EventEnvelope<T>>) -> EventBusResult<BatchPublishResult>
1250 where
1251 T: Clone + Send + Sync + 'static,
1252 {
1253 Self::publish_all(self, envelopes)
1254 }
1255
1256 fn publish_all_with_options<T>(
1258 &self,
1259 envelopes: Vec<EventEnvelope<T>>,
1260 options: PublishOptions<T>,
1261 ) -> EventBusResult<BatchPublishResult>
1262 where
1263 T: Clone + Send + Sync + 'static,
1264 {
1265 Self::publish_all_with_options(self, envelopes, options)
1266 }
1267
1268 fn subscribe<T, S, F, R>(
1270 &self,
1271 subscriber_id: S,
1272 topic: &Topic<T>,
1273 handler: F,
1274 ) -> EventBusResult<Subscription<T>>
1275 where
1276 T: Clone + Send + Sync + 'static,
1277 S: Into<String>,
1278 F: Fn(EventEnvelope<T>) -> R + Send + Sync + 'static,
1279 R: IntoEventBusResult + 'static,
1280 {
1281 Self::subscribe(self, subscriber_id, topic, handler)
1282 }
1283
1284 fn subscribe_with_options<T, S, F, R>(
1286 &self,
1287 subscriber_id: S,
1288 topic: &Topic<T>,
1289 handler: F,
1290 options: SubscribeOptions<T>,
1291 ) -> EventBusResult<Subscription<T>>
1292 where
1293 T: Clone + Send + Sync + 'static,
1294 S: Into<String>,
1295 F: Fn(EventEnvelope<T>) -> R + Send + Sync + 'static,
1296 R: IntoEventBusResult + 'static,
1297 {
1298 Self::subscribe_with_options(self, subscriber_id, topic, handler, options)
1299 }
1300
1301 fn wait_for_idle<T>(&self, topic: &Topic<T>) -> EventBusResult<()>
1303 where
1304 T: 'static,
1305 {
1306 Self::wait_for_idle(self, topic)
1307 }
1308
1309 fn wait_for_idle_timeout<T>(&self, topic: &Topic<T>, timeout: Duration) -> EventBusResult<bool>
1311 where
1312 T: 'static,
1313 {
1314 Self::wait_for_idle_timeout(self, topic, timeout)
1315 }
1316}
1317
1318struct TypedPublisherInterceptor<T: Clone + Send + Sync + 'static> {
1320 interceptor: Arc<PublisherInterceptorFn<T>>,
1321}
1322
1323pub(super) fn create_publisher_interceptor_entry<T, I>(
1331 interceptor: I,
1332) -> Arc<dyn PublisherInterceptorEntry>
1333where
1334 T: Clone + Send + Sync + 'static,
1335 I: PublisherInterceptor<T>,
1336{
1337 Arc::new(TypedPublisherInterceptor::<T> {
1338 interceptor: Arc::new(interceptor),
1339 })
1340}
1341
1342impl<T> PublisherInterceptorEntry for TypedPublisherInterceptor<T>
1343where
1344 T: Clone + Send + Sync + 'static,
1345{
1346 fn payload_type_id(&self) -> TypeId {
1348 TypeId::of::<T>()
1349 }
1350
1351 fn intercept(
1353 &self,
1354 envelope: Box<dyn Any + Send>,
1355 ) -> EventBusResult<Option<Box<dyn Any + Send>>> {
1356 let envelope = envelope.downcast::<EventEnvelope<T>>().map_err(|_| {
1357 EventBusError::type_mismatch(type_name::<EventEnvelope<T>>(), "unknown")
1358 })?;
1359 match panic::catch_unwind(AssertUnwindSafe(|| self.interceptor.on_publish(*envelope))) {
1360 Ok(Ok(envelope)) => {
1361 Ok(envelope.map(|envelope| Box::new(envelope) as Box<dyn Any + Send>))
1362 }
1363 Ok(Err(error)) => Err(EventBusError::interceptor_failed(
1364 "publish",
1365 error.to_string(),
1366 )),
1367 Err(_) => Err(EventBusError::interceptor_failed(
1368 "publish",
1369 "publisher interceptor panicked",
1370 )),
1371 }
1372 }
1373}
1374
1375struct TypedSubscriberInterceptor<T: Clone + Send + Sync + 'static> {
1377 interceptor: Arc<SubscriberInterceptorFn<T>>,
1378}
1379
1380pub(super) fn create_subscriber_interceptor_entry<T, I>(
1388 interceptor: I,
1389) -> Arc<dyn SubscriberInterceptorEntry>
1390where
1391 T: Clone + Send + Sync + 'static,
1392 I: SubscriberInterceptor<T>,
1393{
1394 Arc::new(TypedSubscriberInterceptor::<T> {
1395 interceptor: Arc::new(interceptor),
1396 })
1397}
1398
1399impl<T> SubscriberInterceptorEntry for TypedSubscriberInterceptor<T>
1400where
1401 T: Clone + Send + Sync + 'static,
1402{
1403 fn payload_type_id(&self) -> TypeId {
1405 TypeId::of::<T>()
1406 }
1407
1408 fn wrap_handler(
1410 &self,
1411 handler: Box<dyn Any + Send + Sync>,
1412 ) -> EventBusResult<Box<dyn Any + Send + Sync>> {
1413 let next = handler.downcast::<Arc<HandlerFn<T>>>().map_err(|_| {
1414 EventBusError::type_mismatch(type_name::<Arc<HandlerFn<T>>>(), "unknown")
1415 })?;
1416 let next = *next;
1417 let interceptor = Arc::clone(&self.interceptor);
1418 let wrapped: Arc<HandlerFn<T>> = Arc::new(move |event| {
1419 let downstream_error = create_downstream_error_slot();
1420 let next_chain = SubscriberInterceptorChain::with_downstream_error(
1421 Arc::clone(&next),
1422 Arc::clone(&downstream_error),
1423 );
1424 let result = panic::catch_unwind(AssertUnwindSafe(|| {
1425 interceptor.on_consume(event, next_chain)
1426 }));
1427 normalize_subscriber_interceptor_result(
1428 result,
1429 &downstream_error,
1430 "subscriber interceptor panicked",
1431 )
1432 });
1433 Ok(Box::new(wrapped))
1434 }
1435}
1436
1437struct TypedSubscriptionEntry<T: Clone + Send + Sync + 'static> {
1439 id: usize,
1440 subscriber_id: String,
1441 topic: Topic<T>,
1442 active: Arc<SubscriptionState>,
1443 handler: Arc<HandlerFn<T>>,
1444 options: SubscribeOptions<T>,
1445}
1446
1447impl<T> ErasedSubscription for TypedSubscriptionEntry<T>
1448where
1449 T: Clone + Send + Sync + 'static,
1450{
1451 fn id(&self) -> usize {
1453 self.id
1454 }
1455
1456 fn priority(&self) -> i32 {
1458 self.options.priority()
1459 }
1460
1461 fn deactivate(&self) {
1463 self.active.deactivate();
1464 }
1465
1466 fn dispatch(
1468 &self,
1469 envelope: Box<dyn Any + Send>,
1470 bus: Arc<LocalEventBusInner>,
1471 allow_stopping: bool,
1472 ) -> EventBusResult<()> {
1473 if !self.active.is_active() {
1474 return Ok(());
1475 }
1476 let envelope = envelope.downcast::<EventEnvelope<T>>().map_err(|_| {
1477 EventBusError::type_mismatch(type_name::<EventEnvelope<T>>(), "unknown")
1478 })?;
1479 if !self.options.try_should_handle(&envelope)? {
1480 return Ok(());
1481 }
1482 let topic_key = self.topic.key();
1483 bus.start_processing(&topic_key)?;
1484 let ordering_lane_key = envelope
1485 .ordering_key()
1486 .map(|ordering_key| OrderingLaneKey::new(topic_key.clone(), ordering_key, self.id));
1487 let delay = envelope.delay();
1488
1489 let active = Arc::clone(&self.active);
1490 let delayed_active = Arc::clone(&self.active);
1491 let handler = Arc::clone(&self.handler);
1492 let options = self.options.clone();
1493 let subscriber_id = self.subscriber_id.clone();
1494 let event_bus = LocalEventBus {
1495 inner: Arc::clone(&bus),
1496 };
1497 let bus_id = local_event_bus_id(&bus);
1498 let processing_task = ProcessingTask::new(Arc::clone(&bus), topic_key, move || {
1499 let _worker_context = SubscriptionWorkerContext::enter(bus_id);
1500 if !active.is_active() {
1501 return;
1502 }
1503 process_subscription_event(
1504 active,
1505 handler,
1506 options,
1507 subscriber_id,
1508 *envelope,
1509 event_bus,
1510 );
1511 });
1512 if let Some(ordering_lane_key) = ordering_lane_key {
1513 if let Some(delay) = delay
1514 && !delay.is_zero()
1515 {
1516 bus.submit_delayed_ordered_processing_task(
1517 ordering_lane_key,
1518 processing_task,
1519 delay,
1520 delayed_active,
1521 allow_stopping,
1522 )
1523 } else {
1524 bus.submit_ordered_processing_task(
1525 ordering_lane_key,
1526 processing_task,
1527 allow_stopping,
1528 )
1529 }
1530 } else if let Some(delay) = delay
1531 && !delay.is_zero()
1532 {
1533 bus.submit_delayed_processing_task(
1534 processing_task,
1535 delay,
1536 delayed_active,
1537 allow_stopping,
1538 )
1539 } else {
1540 bus.submit_processing_task(move || processing_task.run(), allow_stopping)
1541 }
1542 }
1543}
1544
1545fn local_event_bus_id(inner: &Arc<LocalEventBusInner>) -> usize {
1553 Arc::as_ptr(inner) as usize
1554}
1555
1556fn is_current_subscription_worker_for_bus(bus_id: usize) -> bool {
1564 SUBSCRIPTION_WORKER_BUS_IDS.with(|bus_ids| bus_ids.borrow().contains(&bus_id))
1565}
1566
1567struct SubscriptionWorkerContext {
1569 bus_id: usize,
1570}
1571
1572impl SubscriptionWorkerContext {
1573 fn enter(bus_id: usize) -> Self {
1581 SUBSCRIPTION_WORKER_BUS_IDS.with(|bus_ids| {
1582 bus_ids.borrow_mut().push(bus_id);
1583 });
1584 Self { bus_id }
1585 }
1586}
1587
1588impl Drop for SubscriptionWorkerContext {
1589 fn drop(&mut self) {
1591 SUBSCRIPTION_WORKER_BUS_IDS.with(|bus_ids| {
1592 let mut bus_ids = bus_ids.borrow_mut();
1593 if let Some(position) = bus_ids.iter().rposition(|bus_id| *bus_id == self.bus_id) {
1594 bus_ids.remove(position);
1595 }
1596 });
1597 }
1598}
1599
1600fn process_subscription_event<T>(
1610 active: Arc<SubscriptionState>,
1611 handler: Arc<HandlerFn<T>>,
1612 options: SubscribeOptions<T>,
1613 subscriber_id: String,
1614 envelope: EventEnvelope<T>,
1615 event_bus: LocalEventBus,
1616) where
1617 T: Clone + Send + Sync + 'static,
1618{
1619 if !active.is_active() {
1620 return;
1621 }
1622 match run_handler_with_retry(&handler, &options, envelope) {
1623 Ok(delivery) => {
1624 if options.ack_mode() == AckMode::Auto && !delivery.acknowledgement.is_completed() {
1625 delivery.acknowledgement.ack();
1626 }
1627 }
1628 Err(failure) => {
1629 handle_subscription_failure(
1630 &options,
1631 &subscriber_id,
1632 &failure.delivery.delivered,
1633 &failure.error,
1634 &failure.delivery.acknowledgement,
1635 &event_bus,
1636 );
1637 }
1638 }
1639}
1640
1641fn handle_subscription_failure<T>(
1651 options: &SubscribeOptions<T>,
1652 subscriber_id: &str,
1653 delivered: &EventEnvelope<T>,
1654 error: &EventBusError,
1655 acknowledgement: &Acknowledgement,
1656 event_bus: &LocalEventBus,
1657) where
1658 T: Clone + Send + Sync + 'static,
1659{
1660 for error in options.notify_subscribe_error(subscriber_id, delivered, error, acknowledgement) {
1661 event_bus.inner.observe_error(&error);
1662 }
1663 if !acknowledgement.is_completed() {
1664 acknowledgement.nack();
1665 }
1666 if acknowledgement.is_nacked() && !delivered.is_dead_letter() {
1667 let dead_letter =
1668 create_dead_letter_for_failure(options, subscriber_id, delivered, error, event_bus);
1669 if let Some(dead_letter) = dead_letter
1670 && let Err(error) = event_bus.publish_dead_letter_envelope(dead_letter.as_dead_letter())
1671 {
1672 let observed = EventBusError::dead_letter_failed(error.to_string());
1673 event_bus.inner.observe_error(&observed);
1674 }
1675 }
1676}
1677
1678fn create_dead_letter_for_failure<T>(
1690 options: &SubscribeOptions<T>,
1691 subscriber_id: &str,
1692 delivered: &EventEnvelope<T>,
1693 error: &EventBusError,
1694 event_bus: &LocalEventBus,
1695) -> Option<EventEnvelope<DeadLetterPayload>>
1696where
1697 T: Clone + Send + Sync + 'static,
1698{
1699 if options.has_dead_letter_strategy() {
1700 match options.create_dead_letter(subscriber_id, delivered, error) {
1701 Ok(dead_letter) => dead_letter,
1702 Err(error) => {
1703 event_bus.inner.observe_error(&error);
1704 None
1705 }
1706 }
1707 } else {
1708 create_default_dead_letter_for_failure(options, subscriber_id, delivered, error, event_bus)
1709 }
1710}
1711
1712fn create_default_dead_letter_for_failure<T>(
1724 options: &SubscribeOptions<T>,
1725 subscriber_id: &str,
1726 delivered: &EventEnvelope<T>,
1727 error: &EventBusError,
1728 event_bus: &LocalEventBus,
1729) -> Option<EventEnvelope<DeadLetterPayload>>
1730where
1731 T: Clone + Send + Sync + 'static,
1732{
1733 if let Some(strategy) = event_bus.inner.default_dead_letter_strategy::<T>() {
1734 return match call_dead_letter_strategy(strategy, subscriber_id, delivered, error, options) {
1735 Ok(dead_letter) => dead_letter,
1736 Err(error) => {
1737 event_bus.inner.observe_error(&error);
1738 None
1739 }
1740 };
1741 }
1742 let strategy = event_bus.inner.global_default_dead_letter_strategy()?;
1743 match call_global_dead_letter_strategy(
1744 strategy,
1745 subscriber_id,
1746 delivered.metadata(),
1747 Arc::new(delivered.payload().clone()),
1748 error,
1749 ) {
1750 Ok(dead_letter) => dead_letter,
1751 Err(error) => {
1752 event_bus.inner.observe_error(&error);
1753 None
1754 }
1755 }
1756}
1757
1758fn call_dead_letter_strategy<T>(
1770 strategy: Arc<DeadLetterStrategyFn<T>>,
1771 subscriber_id: &str,
1772 delivered: &EventEnvelope<T>,
1773 error: &EventBusError,
1774 options: &SubscribeOptions<T>,
1775) -> EventBusResult<Option<EventEnvelope<DeadLetterPayload>>>
1776where
1777 T: Clone + Send + Sync + 'static,
1778{
1779 match panic::catch_unwind(AssertUnwindSafe(|| {
1780 strategy.create_dead_letter(subscriber_id, delivered, error, options)
1781 })) {
1782 Ok(Ok(dead_letter)) => Ok(dead_letter),
1783 Ok(Err(error)) => Err(normalize_dead_letter_error(error)),
1784 Err(_) => Err(EventBusError::dead_letter_failed(
1785 "default dead-letter strategy panicked",
1786 )),
1787 }
1788}
1789
1790fn call_global_dead_letter_strategy(
1802 strategy: Arc<DeadLetterStrategyAnyFn>,
1803 subscriber_id: &str,
1804 metadata: EventEnvelopeMetadata,
1805 original_payload: DeadLetterOriginalPayload,
1806 error: &EventBusError,
1807) -> EventBusResult<Option<EventEnvelope<DeadLetterPayload>>> {
1808 match panic::catch_unwind(AssertUnwindSafe(|| {
1809 strategy.create_dead_letter(subscriber_id, metadata, original_payload, error)
1810 })) {
1811 Ok(Ok(dead_letter)) => Ok(dead_letter),
1812 Ok(Err(error)) => Err(normalize_dead_letter_error(error)),
1813 Err(_) => Err(EventBusError::dead_letter_failed(
1814 "global default dead-letter strategy panicked",
1815 )),
1816 }
1817}
1818
1819fn run_handler_with_retry<T>(
1829 handler: &Arc<HandlerFn<T>>,
1830 options: &SubscribeOptions<T>,
1831 envelope: EventEnvelope<T>,
1832) -> Result<HandlerDelivery<T>, Box<HandlerRunFailure<T>>>
1833where
1834 T: Clone + Send + Sync + 'static,
1835{
1836 let mut last_delivery = None;
1837 match run_with_retry(options.retry_options(), || {
1838 let delivery = HandlerDelivery::new(&envelope);
1839 last_delivery = Some(delivery.clone());
1840 call_handler(handler, delivery.delivered.clone())?;
1841 if delivery.acknowledgement.is_nacked() {
1842 Err(EventBusError::handler_failed("subscriber nacked the event"))
1843 } else {
1844 Ok(delivery)
1845 }
1846 }) {
1847 Ok(delivery) => Ok(delivery),
1848 Err(error) => {
1849 let delivery = match last_delivery {
1850 Some(delivery) => delivery,
1851 None => HandlerDelivery::new(&envelope),
1852 };
1853 Err(Box::new(HandlerRunFailure { error, delivery }))
1854 }
1855 }
1856}
1857
1858fn call_handler<T>(handler: &Arc<HandlerFn<T>>, envelope: EventEnvelope<T>) -> EventBusResult<()>
1867where
1868 T: Clone + Send + Sync + 'static,
1869{
1870 match panic::catch_unwind(AssertUnwindSafe(|| handler(envelope))) {
1871 Ok(result) => result,
1872 Err(_) => Err(EventBusError::handler_panicked()),
1873 }
1874}
1875
1876fn normalize_subscriber_interceptor_result(
1887 result: Result<EventBusResult<()>, Box<dyn Any + Send>>,
1888 downstream_error: &DownstreamErrorSlot,
1889 panic_message: &'static str,
1890) -> EventBusResult<()> {
1891 match result {
1892 Ok(Ok(())) => Ok(()),
1893 Ok(Err(error)) if is_recorded_downstream_error(downstream_error, &error) => Err(error),
1894 Ok(Err(error)) => Err(normalize_subscriber_interceptor_error(error)),
1895 Err(_) => Err(EventBusError::interceptor_failed(
1896 "subscribe",
1897 panic_message,
1898 )),
1899 }
1900}
1901
1902fn normalize_subscriber_interceptor_error(error: EventBusError) -> EventBusError {
1911 if matches!(
1912 &error,
1913 EventBusError::InterceptorFailed { phase, .. } if *phase == "subscribe"
1914 ) {
1915 error
1916 } else {
1917 EventBusError::interceptor_failed("subscribe", error.to_string())
1918 }
1919}
1920
1921fn run_with_retry<T, F>(
1930 retry_options: Option<&crate::RetryOptions>,
1931 operation: F,
1932) -> EventBusResult<T>
1933where
1934 F: FnMut() -> EventBusResult<T>,
1935{
1936 let Some(retry_options) = retry_options else {
1937 let mut operation = operation;
1938 return operation();
1939 };
1940 let retry = match qubit_retry::Retry::<EventBusError>::from_options(retry_options.clone()) {
1941 Ok(retry) => retry,
1942 Err(error) => {
1943 return Err(EventBusError::invalid_argument(
1944 "retry_options",
1945 error.to_string(),
1946 ));
1947 }
1948 };
1949 match retry.run(operation) {
1950 Ok(value) => Ok(value),
1951 Err(error) => match error.last_error().cloned() {
1952 Some(error) => Err(error),
1953 None => Err(EventBusError::handler_failed(error.to_string())),
1954 },
1955 }
1956}
1957
1958fn validate_retry_options(retry_options: Option<&crate::RetryOptions>) -> EventBusResult<()> {
1970 if retry_options
1971 .and_then(crate::RetryOptions::attempt_timeout)
1972 .is_some()
1973 {
1974 return Err(EventBusError::invalid_argument(
1975 "retry_options",
1976 "attempt_timeout is not supported by LocalEventBus retry handling",
1977 ));
1978 }
1979 Ok(())
1980}
1981
1982fn wait_for_executor_termination(executor: &FixedThreadPool) {
1987 while !executor.is_terminated() {
1988 thread::sleep(Duration::from_millis(1));
1989 }
1990}
1991
1992fn wait_for_executor_termination_timeout(executor: &FixedThreadPool, timeout: Duration) -> bool {
2001 let started_at = Instant::now();
2002 while !executor.is_terminated() {
2003 let Some(remaining) = remaining_shutdown_timeout(started_at, timeout) else {
2004 return false;
2005 };
2006 thread::sleep(remaining.min(Duration::from_millis(1)));
2007 }
2008 true
2009}
2010
2011fn wait_for_delay_scheduler_termination(scheduler: &SingleThreadScheduledExecutorService) {
2016 while !scheduler.is_terminated() {
2017 thread::sleep(Duration::from_millis(1));
2018 }
2019}
2020
2021fn wait_for_delay_scheduler_termination_timeout(
2030 scheduler: &SingleThreadScheduledExecutorService,
2031 timeout: Duration,
2032) -> bool {
2033 let started_at = Instant::now();
2034 while !scheduler.is_terminated() {
2035 let Some(remaining) = remaining_shutdown_timeout(started_at, timeout) else {
2036 return false;
2037 };
2038 thread::sleep(remaining.min(Duration::from_millis(1)));
2039 }
2040 true
2041}
2042
/// Computes how much of a shutdown timeout is still left.
///
/// # Parameters
/// - `started_at`: Instant at which the wait began.
/// - `timeout`: Total time budget for the wait.
///
/// # Returns
/// `Some(timeout - elapsed)` while the elapsed time does not exceed `timeout`,
/// otherwise `None`.
fn remaining_shutdown_timeout(started_at: Instant, timeout: Duration) -> Option<Duration> {
    let elapsed = started_at.elapsed();
    if elapsed > timeout {
        None
    } else {
        Some(timeout - elapsed)
    }
}
2054
/// Chooses the default number of threads for the subscription handler pool.
///
/// # Returns
/// The value reported by `thread::available_parallelism()`, or `1` when that
/// query fails.
fn default_subscription_handler_pool_size() -> usize {
    match thread::available_parallelism() {
        Ok(parallelism) => parallelism.get(),
        Err(_) => 1,
    }
}