#[cfg(coverage)]
mod coverage;
use std::any::{
Any,
TypeId,
type_name,
};
use std::cell::RefCell;
use std::collections::HashMap;
use std::panic::{
self,
AssertUnwindSafe,
};
use std::sync::Arc;
use std::thread;
use std::time::{
Duration,
Instant,
};
use qubit_thread_pool::{
DelayedTaskScheduler,
ExecutorService,
FixedThreadPool,
};
use crate::core::SubscriptionState;
use crate::core::subscribe_options::{
DeadLetterStrategyAnyFn,
DeadLetterStrategyFn,
normalize_dead_letter_error,
};
use crate::{
AckMode,
Acknowledgement,
BatchPublishFailure,
BatchPublishResult,
DeadLetterOriginalPayload,
DeadLetterPayload,
EventBusError,
EventBusResult,
EventEnvelope,
EventEnvelopeMetadata,
IntoEventBusResult,
PublishOptions,
SubscribeOptions,
Subscription,
Topic,
};
use super::erased_subscription::ErasedSubscription;
use super::local_event_bus_inner::{
LocalEventBusInner,
LocalEventBusRuntimeOptions,
};
use super::ordering_lane_key::OrderingLaneKey;
use super::processing_task::ProcessingTask;
use super::publisher_interceptor_entry::PublisherInterceptorEntry;
use super::subscriber_interceptor_chain::{
SubscriberInterceptorAnyChain,
SubscriberInterceptorChain,
};
use super::subscriber_interceptor_entry::SubscriberInterceptorEntry;
#[cfg(coverage)]
pub use coverage::coverage_exercise_local_event_bus_defensive_paths;
/// Trait-object type of a subscriber handler: consumes a typed envelope and
/// returns an [`EventBusResult`].
type HandlerFn<T> = dyn Fn(EventEnvelope<T>) -> EventBusResult<()> + Send + Sync + 'static;
/// Trait-object form of a typed [`PublisherInterceptor`].
type PublisherInterceptorFn<T> = dyn PublisherInterceptor<T>;
/// Trait-object form of a typed [`SubscriberInterceptor`].
type SubscriberInterceptorFn<T> = dyn SubscriberInterceptor<T>;
// Per-thread list of bus ids (see `local_event_bus_id`) for which the current
// thread is presently running a subscription task. `SubscriptionWorkerContext`
// pushes an id on `enter` and removes it on drop; blocking `shutdown` consults
// the list to refuse running on one of its own workers.
thread_local! {
static SUBSCRIPTION_WORKER_BUS_IDS: RefCell<Vec<usize>> = const { RefCell::new(Vec::new()) };
}
/// One delivery attempt of an envelope to a handler: the envelope copy handed
/// to the handler together with the acknowledgement attached to that copy.
#[derive(Clone)]
struct HandlerDelivery<T: Clone + Send + Sync + 'static> {
// Envelope clone that carries `acknowledgement` (see `HandlerDelivery::new`).
delivered: EventEnvelope<T>,
// Acknowledgement handle shared with `delivered`.
acknowledgement: Acknowledgement,
}
impl<T: Clone + Send + Sync + 'static> HandlerDelivery<T> {
    /// Builds a delivery for one handler attempt.
    ///
    /// A fresh [`Acknowledgement`] is created and a clone of it is attached to
    /// a clone of `envelope`; the original envelope is left untouched.
    fn new(envelope: &EventEnvelope<T>) -> Self {
        let ack = Acknowledgement::new();
        Self {
            delivered: envelope.clone().with_acknowledgement(ack.clone()),
            acknowledgement: ack,
        }
    }
}
/// Failure result of running a handler (with retries): the error that ended
/// the run and the delivery it is reported against.
struct HandlerRunFailure<T: Clone + Send + Sync + 'static> {
// Error returned by the retry loop.
error: EventBusError,
// Delivery recorded for the most recent attempt (or a fresh one if none ran).
delivery: HandlerDelivery<T>,
}
/// Result of a successful internal publish call.
enum PublishOutcome {
// The envelope passed the publisher interceptors and was dispatched.
Accepted,
// A publisher interceptor returned `None`, so nothing was dispatched.
Dropped,
}
/// Conversion from the value returned by a typed publisher-interceptor closure
/// into the canonical `EventBusResult<Option<EventEnvelope<T>>>` form.
///
/// In the publish path of this file an `Ok(None)` result is reported as a
/// dropped event rather than an error.
pub trait IntoPublisherInterceptorResult<T: Clone + Send + Sync + 'static> {
/// Converts `self` into the canonical interceptor result.
fn into_publisher_interceptor_result(self) -> EventBusResult<Option<EventEnvelope<T>>>;
}
/// A bare envelope is always a success that keeps the event.
impl<T: Clone + Send + Sync + 'static> IntoPublisherInterceptorResult<T> for EventEnvelope<T> {
    fn into_publisher_interceptor_result(self) -> EventBusResult<Option<EventEnvelope<T>>> {
        let kept = Some(self);
        Ok(kept)
    }
}
/// An optional envelope is always a success; `None` is passed through as-is.
impl<T: Clone + Send + Sync + 'static> IntoPublisherInterceptorResult<T>
    for Option<EventEnvelope<T>>
{
    fn into_publisher_interceptor_result(self) -> EventBusResult<Option<EventEnvelope<T>>> {
        let maybe_envelope = self;
        Ok(maybe_envelope)
    }
}
/// A fallible envelope: success keeps the event, errors are forwarded unchanged.
impl<T: Clone + Send + Sync + 'static> IntoPublisherInterceptorResult<T>
    for EventBusResult<EventEnvelope<T>>
{
    fn into_publisher_interceptor_result(self) -> EventBusResult<Option<EventEnvelope<T>>> {
        match self {
            Ok(envelope) => Ok(Some(envelope)),
            Err(error) => Err(error),
        }
    }
}
/// Already in canonical form; returned unchanged.
impl<T: Clone + Send + Sync + 'static> IntoPublisherInterceptorResult<T>
    for EventBusResult<Option<EventEnvelope<T>>>
{
    fn into_publisher_interceptor_result(self) -> EventBusResult<Option<EventEnvelope<T>>> {
        let canonical = self;
        canonical
    }
}
/// Typed hook invoked on an envelope before it is dispatched.
///
/// Returning `Ok(Some(envelope))` continues with that envelope, `Ok(None)`
/// stops the pipeline without an error, and `Err(_)` fails the publish.
pub trait PublisherInterceptor<T: Clone + Send + Sync + 'static>: Send + Sync + 'static {
/// Inspects or replaces `envelope`; see the trait docs for the meaning of the result.
fn on_publish(&self, envelope: EventEnvelope<T>) -> EventBusResult<Option<EventEnvelope<T>>>;
}
/// Lets any closure `Fn(EventEnvelope<T>) -> R` act as a publisher
/// interceptor, provided `R` converts via [`IntoPublisherInterceptorResult`].
impl<T, F, R> PublisherInterceptor<T> for F
where
    T: Clone + Send + Sync + 'static,
    F: Fn(EventEnvelope<T>) -> R + Send + Sync + 'static,
    R: IntoPublisherInterceptorResult<T> + 'static,
{
    fn on_publish(&self, envelope: EventEnvelope<T>) -> EventBusResult<Option<EventEnvelope<T>>> {
        let raw: R = self(envelope);
        raw.into_publisher_interceptor_result()
    }
}
/// Conversion from the value returned by a payload-agnostic publisher
/// interceptor closure into `EventBusResult<Option<EventEnvelopeMetadata>>`.
pub trait IntoPublisherInterceptorAnyResult {
/// Converts `self` into the canonical metadata interceptor result.
fn into_publisher_interceptor_any_result(self)
-> EventBusResult<Option<EventEnvelopeMetadata>>;
}
/// Bare metadata is always a success that keeps the event.
impl IntoPublisherInterceptorAnyResult for EventEnvelopeMetadata {
    fn into_publisher_interceptor_any_result(
        self,
    ) -> EventBusResult<Option<EventEnvelopeMetadata>> {
        let kept = Some(self);
        Ok(kept)
    }
}
/// Optional metadata is always a success; `None` is passed through as-is.
impl IntoPublisherInterceptorAnyResult for Option<EventEnvelopeMetadata> {
    fn into_publisher_interceptor_any_result(
        self,
    ) -> EventBusResult<Option<EventEnvelopeMetadata>> {
        let maybe_metadata = self;
        Ok(maybe_metadata)
    }
}
/// Fallible metadata: success keeps the event, errors are forwarded unchanged.
impl IntoPublisherInterceptorAnyResult for EventBusResult<EventEnvelopeMetadata> {
    fn into_publisher_interceptor_any_result(
        self,
    ) -> EventBusResult<Option<EventEnvelopeMetadata>> {
        match self {
            Ok(metadata) => Ok(Some(metadata)),
            Err(error) => Err(error),
        }
    }
}
/// Already in canonical form; returned unchanged.
impl IntoPublisherInterceptorAnyResult for EventBusResult<Option<EventEnvelopeMetadata>> {
    fn into_publisher_interceptor_any_result(
        self,
    ) -> EventBusResult<Option<EventEnvelopeMetadata>> {
        let canonical = self;
        canonical
    }
}
/// Payload-agnostic publish hook that only sees envelope metadata.
///
/// `Ok(Some(metadata))` continues with the returned metadata, `Ok(None)` stops
/// the pipeline without an error, and `Err(_)` fails the publish.
pub trait PublisherInterceptorAny: Send + Sync + 'static {
/// Inspects or replaces the metadata of an envelope being published.
fn on_publish(
&self,
metadata: EventEnvelopeMetadata,
) -> EventBusResult<Option<EventEnvelopeMetadata>>;
}
/// Lets any closure `Fn(EventEnvelopeMetadata) -> R` act as a payload-agnostic
/// publisher interceptor, provided `R` converts via
/// [`IntoPublisherInterceptorAnyResult`].
impl<F, R> PublisherInterceptorAny for F
where
    F: Fn(EventEnvelopeMetadata) -> R + Send + Sync + 'static,
    R: IntoPublisherInterceptorAnyResult + 'static,
{
    fn on_publish(
        &self,
        metadata: EventEnvelopeMetadata,
    ) -> EventBusResult<Option<EventEnvelopeMetadata>> {
        let raw: R = self(metadata);
        raw.into_publisher_interceptor_any_result()
    }
}
/// Typed hook wrapped around a subscriber handler.
///
/// The interceptor receives the envelope and a chain object representing the
/// remaining interceptors and the handler.
pub trait SubscriberInterceptor<T: Clone + Send + Sync + 'static>: Send + Sync + 'static {
/// Handles `envelope`; `chain` gives access to the next stage of processing.
fn on_consume(
&self,
envelope: EventEnvelope<T>,
chain: SubscriberInterceptorChain<T>,
) -> EventBusResult<()>;
}
/// Lets any closure `Fn(EventEnvelope<T>, SubscriberInterceptorChain<T>) -> R`
/// act as a typed subscriber interceptor, provided `R` converts via
/// [`IntoEventBusResult`].
impl<T, F, R> SubscriberInterceptor<T> for F
where
    T: Clone + Send + Sync + 'static,
    F: Fn(EventEnvelope<T>, SubscriberInterceptorChain<T>) -> R + Send + Sync + 'static,
    R: IntoEventBusResult + 'static,
{
    fn on_consume(
        &self,
        envelope: EventEnvelope<T>,
        chain: SubscriberInterceptorChain<T>,
    ) -> EventBusResult<()> {
        let raw: R = self(envelope, chain);
        raw.into_event_bus_result()
    }
}
/// Payload-agnostic hook wrapped around every subscriber handler; it only sees
/// envelope metadata plus a chain object for the next stage.
pub trait SubscriberInterceptorAny: Send + Sync + 'static {
/// Handles a delivery described by `metadata`; `chain` gives access to the next stage.
fn on_consume(
&self,
metadata: EventEnvelopeMetadata,
chain: SubscriberInterceptorAnyChain,
) -> EventBusResult<()>;
}
/// Lets any closure `Fn(EventEnvelopeMetadata, SubscriberInterceptorAnyChain) -> R`
/// act as a payload-agnostic subscriber interceptor, provided `R` converts via
/// [`IntoEventBusResult`].
impl<F, R> SubscriberInterceptorAny for F
where
    F: Fn(EventEnvelopeMetadata, SubscriberInterceptorAnyChain) -> R + Send + Sync + 'static,
    R: IntoEventBusResult + 'static,
{
    fn on_consume(
        &self,
        metadata: EventEnvelopeMetadata,
        chain: SubscriberInterceptorAnyChain,
    ) -> EventBusResult<()> {
        let raw: R = self(metadata, chain);
        raw.into_event_bus_result()
    }
}
/// Handle to an in-process event bus.
///
/// Cloning the handle is cheap: every clone shares the same
/// [`LocalEventBusInner`] through an [`Arc`].
#[derive(Clone)]
pub struct LocalEventBus {
// Shared bus state; all clones of this handle point at the same allocation.
pub(crate) inner: Arc<LocalEventBusInner>,
}
impl LocalEventBus {
/// Creates a bus with no default options, no dead-letter strategies and no
/// interceptors.
///
/// The handler pool size comes from `default_subscription_handler_pool_size()`
/// and no handler queue capacity is configured. This constructor does not
/// call [`LocalEventBus::start`]; use [`LocalEventBus::started`] for that.
pub fn new() -> Self {
    let options = LocalEventBusRuntimeOptions {
        default_publish_options: HashMap::new(),
        default_subscribe_options: HashMap::new(),
        default_dead_letter_strategies: HashMap::new(),
        global_default_dead_letter_strategy: None,
        global_publisher_interceptors: Vec::new(),
        global_subscriber_interceptors: Vec::new(),
        publisher_interceptors: Vec::new(),
        subscriber_interceptors: Vec::new(),
        subscription_handler_pool_size: default_subscription_handler_pool_size(),
        subscription_handler_queue_capacity: None,
    };
    Self::with_runtime_options(options)
}
/// Creates a bus with [`LocalEventBus::new`] and starts it.
///
/// # Errors
///
/// Returns whatever error [`LocalEventBus::start`] reports.
pub fn started() -> EventBusResult<Self> {
    let bus = Self::new();
    match bus.start() {
        Ok(_) => Ok(bus),
        Err(error) => Err(error),
    }
}
/// Creates a bus whose shared state is built from `options`.
pub(crate) fn with_runtime_options(options: LocalEventBusRuntimeOptions) -> Self {
    let inner = Arc::new(LocalEventBusInner::new(options));
    Self { inner }
}
/// Marks the bus as started.
///
/// The returned flag and any error come directly from
/// `LocalEventBusInner::mark_started`.
pub fn start(&self) -> EventBusResult<bool> {
    let inner = &self.inner;
    inner.mark_started()
}
/// Shuts the bus down and blocks until its workers have terminated.
///
/// Returns `false` without doing anything else when `mark_stopping` reports
/// that this call did not begin the shutdown; returns `true` otherwise.
///
/// # Panics
///
/// Panics when called from a subscription worker of this same bus (the
/// current thread would be waiting for itself).
pub fn shutdown(&self) -> bool {
self.assert_not_own_subscription_worker_for_blocking_shutdown();
if !self.inner.mark_stopping() {
return false;
}
// Best effort: a failure while waiting for idle does not abort the shutdown.
let _ = self.inner.wait_for_all_idle();
if let Some(executor) = self.inner.take_executor() {
executor.shutdown();
wait_for_executor_termination(&executor);
}
if let Some(delay_scheduler) = self.inner.take_delay_scheduler() {
delay_scheduler.shutdown();
wait_for_delay_scheduler_termination(&delay_scheduler);
}
// Subscriptions are dropped only after both workers have terminated.
self.inner.clear_subscriptions();
true
}
/// Requests shutdown without waiting for in-flight work or worker threads.
///
/// Returns `false` when `mark_stopped` yields no executor; otherwise the
/// executor and the delay scheduler (if any) are told to shut down,
/// subscriptions are cleared, and `true` is returned.
pub fn shutdown_nonblocking(&self) -> bool {
    let executor = match self.inner.mark_stopped() {
        Some(executor) => executor,
        None => return false,
    };
    executor.shutdown();
    let scheduler = self.inner.take_delay_scheduler();
    if let Some(scheduler) = scheduler {
        scheduler.shutdown();
    }
    self.inner.clear_subscriptions();
    true
}
/// Shuts the bus down, waiting at most `timeout` for outstanding work and for
/// the worker threads to terminate.
///
/// Returns `Ok(false)` when `mark_stopping` reports that this call did not
/// begin the shutdown, and `Ok(true)` once everything terminated in time.
///
/// # Errors
///
/// * `EventBusError::shutdown_timed_out(timeout)` when the time budget runs
///   out at any stage.
/// * Any error reported by `wait_for_all_idle_timeout`.
///
/// On every error path the subscriptions are cleared and the executor and
/// delay scheduler (if still held by the bus) are told to shut down, so a
/// failed call does not leave the workers running.
pub fn shutdown_with_timeout(&self, timeout: Duration) -> EventBusResult<bool> {
    let started_at = Instant::now();
    if !self.inner.mark_stopping() {
        return Ok(false);
    }

    // Shared abort path: drop subscriptions, then signal (without waiting for)
    // whatever workers the bus still owns.
    let abort = || {
        self.inner.clear_subscriptions();
        if let Some(executor) = self.inner.take_executor() {
            executor.shutdown();
        }
        if let Some(delay_scheduler) = self.inner.take_delay_scheduler() {
            delay_scheduler.shutdown();
        }
    };
    let timed_out = || EventBusError::shutdown_timed_out(timeout);

    let Some(remaining) = remaining_shutdown_timeout(started_at, timeout) else {
        abort();
        return Err(timed_out());
    };

    // An error while waiting must still release the workers; previously the
    // early return skipped the cleanup entirely.
    let idle = self
        .inner
        .wait_for_all_idle_timeout(remaining)
        .inspect_err(|_| abort())?;
    if !idle {
        abort();
        return Err(timed_out());
    }

    let Some(remaining) = remaining_shutdown_timeout(started_at, timeout) else {
        abort();
        return Err(timed_out());
    };

    let Some(executor) = self.inner.take_executor() else {
        // No executor to wait for: only the delay scheduler needs a signal.
        if let Some(delay_scheduler) = self.inner.take_delay_scheduler() {
            delay_scheduler.shutdown();
        }
        self.inner.clear_subscriptions();
        return Ok(true);
    };
    executor.shutdown();

    if let Some(delay_scheduler) = self.inner.take_delay_scheduler() {
        delay_scheduler.shutdown();
        if !wait_for_delay_scheduler_termination_timeout(&delay_scheduler, remaining) {
            self.inner.clear_subscriptions();
            return Err(timed_out());
        }
    }

    // Re-derive the budget: the scheduler wait above may have consumed it.
    let executor_terminated = remaining_shutdown_timeout(started_at, timeout)
        .is_some_and(|remaining| wait_for_executor_termination_timeout(&executor, remaining));
    self.inner.clear_subscriptions();
    if executor_terminated {
        Ok(true)
    } else {
        Err(timed_out())
    }
}
/// Registers `observer` to be notified of errors the bus observes.
///
/// # Errors
///
/// Returns whatever error the inner bus reports while registering.
pub fn add_error_observer<F>(&self, observer: F) -> EventBusResult<()>
where
    F: Fn(&EventBusError) + Send + Sync + 'static,
{
    let inner = &self.inner;
    inner.add_error_observer(Arc::new(observer))
}
/// Publishes `payload` on `topic` with empty per-call options.
///
/// # Errors
///
/// See [`LocalEventBus::publish_envelope_with_options`].
pub fn publish<T>(&self, topic: &Topic<T>, payload: T) -> EventBusResult<()>
where
    T: Clone + Send + Sync + 'static,
{
    let options = PublishOptions::empty();
    self.publish_with_options(topic, payload, options)
}
/// Wraps `payload` in a new envelope for `topic` and publishes it with
/// `options`.
///
/// # Errors
///
/// See [`LocalEventBus::publish_envelope_with_options`].
pub fn publish_with_options<T>(
    &self,
    topic: &Topic<T>,
    payload: T,
    options: PublishOptions<T>,
) -> EventBusResult<()>
where
    T: Clone + Send + Sync + 'static,
{
    let envelope = EventEnvelope::create(topic.clone(), payload);
    self.publish_envelope_with_options(envelope, options)
}
/// Publishes a ready-made envelope with empty per-call options.
///
/// # Errors
///
/// See [`LocalEventBus::publish_envelope_with_options`].
pub fn publish_envelope<T>(&self, envelope: EventEnvelope<T>) -> EventBusResult<()>
where
    T: Clone + Send + Sync + 'static,
{
    let options = PublishOptions::empty();
    self.publish_envelope_with_options(envelope, options)
}
/// Publishes `envelope` after merging `options` with the bus defaults for `T`.
///
/// The bus must be started. An event dropped by a publisher interceptor is
/// reported as success.
///
/// # Errors
///
/// Returns the error produced by the internal publish path (bus not started,
/// unsupported retry options, interceptor failure, or dispatch failure).
pub fn publish_envelope_with_options<T>(
    &self,
    envelope: EventEnvelope<T>,
    options: PublishOptions<T>,
) -> EventBusResult<()>
where
    T: Clone + Send + Sync + 'static,
{
    let merged = options.merge_defaults(self.default_publish_options::<T>());
    // Accepted and dropped are both success for the caller.
    self.publish_envelope_with_options_internal(envelope, merged, false, true)?;
    Ok(())
}
/// Core publish path shared by regular, batch and dead-letter publishing.
///
/// Steps, in order: optional started check, retry-option validation,
/// publisher interceptors (run under the retry policy), dispatch.
///
/// * `allow_stopping` is forwarded to `dispatch_envelope`.
/// * `require_started` makes a not-started bus an error.
///
/// Returns `PublishOutcome::Dropped` when an interceptor yields `None`, and
/// `PublishOutcome::Accepted` after a successful dispatch. On each error path
/// the errors returned by `options.notify_publish_error` are forwarded to the
/// bus error observers before the original error is returned.
fn publish_envelope_with_options_internal<T>(
&self,
envelope: EventEnvelope<T>,
options: PublishOptions<T>,
allow_stopping: bool,
require_started: bool,
) -> EventBusResult<PublishOutcome>
where
T: Clone + Send + Sync + 'static,
{
if let Err(error) = self.ensure_started()
&& require_started
{
self.observe_errors(options.notify_publish_error(&envelope, &error));
return Err(error);
}
if let Err(error) = validate_retry_options(options.retry_options()) {
self.observe_errors(options.notify_publish_error(&envelope, &error));
return Err(error);
}
// Every retry attempt restarts the interceptors from the untouched original.
let original_envelope = envelope.clone();
let envelope = match run_with_retry(options.retry_options(), || {
self.apply_publisher_interceptors(original_envelope.clone())
}) {
Ok(Some(envelope)) => envelope,
Ok(None) => return Ok(PublishOutcome::Dropped),
Err(error) => {
// Unlike the other error paths, the interceptor error itself is also observed.
self.inner.observe_error(&error);
self.observe_errors(options.notify_publish_error(&original_envelope, &error));
return Err(error);
}
};
if let Err(error) =
self.dispatch_envelope(envelope.clone(), options.retry_options(), allow_stopping)
{
self.observe_errors(options.notify_publish_error(&envelope, &error));
return Err(error);
}
Ok(PublishOutcome::Accepted)
}
/// Publishes a dead-letter envelope using the bus defaults for
/// [`DeadLetterPayload`].
///
/// Unlike regular publishing, this path passes `allow_stopping = true` and
/// does not require the bus to be started.
fn publish_dead_letter_envelope(
    &self,
    envelope: EventEnvelope<DeadLetterPayload>,
) -> EventBusResult<()> {
    let base = PublishOptions::empty();
    let options = base.merge_defaults(self.default_publish_options::<DeadLetterPayload>());
    self.publish_envelope_with_options_internal(envelope, options, true, false)?;
    Ok(())
}
/// Publishes every envelope in `envelopes` with empty per-call options.
///
/// # Errors
///
/// See [`LocalEventBus::publish_all_with_options`].
pub fn publish_all<T>(
    &self,
    envelopes: Vec<EventEnvelope<T>>,
) -> EventBusResult<BatchPublishResult>
where
    T: Clone + Send + Sync + 'static,
{
    let options = PublishOptions::empty();
    self.publish_all_with_options(envelopes, options)
}
pub fn publish_all_with_options<T>(
&self,
envelopes: Vec<EventEnvelope<T>>,
options: PublishOptions<T>,
) -> EventBusResult<BatchPublishResult>
where
T: Clone + Send + Sync + 'static,
{
self.ensure_started()?;
validate_retry_options(options.retry_options())?;
let mut result = BatchPublishResult::new(envelopes.len());
for (index, envelope) in envelopes.into_iter().enumerate() {
let event_id = envelope.id().to_string();
match self.publish_envelope_with_options_internal(
envelope,
options
.clone()
.merge_defaults(self.default_publish_options::<T>()),
false,
true,
) {
Ok(PublishOutcome::Accepted) => result.record_accepted(),
Ok(PublishOutcome::Dropped) => result.record_dropped(),
Err(error) => {
result.record_failure(BatchPublishFailure::new(index, event_id, error));
}
}
}
Ok(result)
}
/// Subscribes `handler` to `topic` with empty per-call options.
///
/// # Errors
///
/// See [`LocalEventBus::subscribe_with_options`].
pub fn subscribe<T, S, F, R>(
    &self,
    subscriber_id: S,
    topic: &Topic<T>,
    handler: F,
) -> EventBusResult<Subscription<T>>
where
    T: Clone + Send + Sync + 'static,
    S: Into<String>,
    F: Fn(EventEnvelope<T>) -> R + Send + Sync + 'static,
    R: IntoEventBusResult + 'static,
{
    let options = SubscribeOptions::empty();
    self.subscribe_with_options(subscriber_id, topic, handler, options)
}
/// Registers `handler` for `topic` and returns a handle to the subscription.
///
/// `options` are merged with the bus defaults for `T`. The handler is wrapped
/// by the subscriber interceptors currently registered on the bus before it
/// is stored.
///
/// # Errors
///
/// * the bus is not started;
/// * `subscriber_id` is empty or whitespace only;
/// * the merged options carry unsupported retry options;
/// * building the interceptor chain or storing the subscription fails.
pub fn subscribe_with_options<T, S, F, R>(
&self,
subscriber_id: S,
topic: &Topic<T>,
handler: F,
options: SubscribeOptions<T>,
) -> EventBusResult<Subscription<T>>
where
T: Clone + Send + Sync + 'static,
S: Into<String>,
F: Fn(EventEnvelope<T>) -> R + Send + Sync + 'static,
R: IntoEventBusResult + 'static,
{
self.ensure_started()?;
let options = options.merge_defaults(self.default_subscribe_options::<T>());
let subscriber_id = subscriber_id.into();
if subscriber_id.trim().is_empty() {
return Err(EventBusError::invalid_argument(
"subscriber_id",
"subscriber ID must not be blank",
));
}
validate_retry_options(options.retry_options())?;
let id = self.inner.next_subscription_id();
// Shared activity flag: held by both the stored entry and the returned handle.
let active = Arc::new(SubscriptionState::active());
let topic_key = topic.key();
// Normalise the user's return type to `EventBusResult<()>` before wrapping.
let handler = Arc::new(move |event| handler(event).into_event_bus_result());
let handler = self.apply_subscriber_interceptors(handler)?;
let entry = TypedSubscriptionEntry {
id,
subscriber_id: subscriber_id.clone(),
topic: topic.clone(),
active: Arc::clone(&active),
handler,
options: options.clone(),
};
self.inner
.add_subscription(topic_key.clone(), Arc::new(entry))?;
Ok(Subscription {
id,
subscriber_id,
topic: topic.clone(),
topic_key,
options,
active,
// Weak reference: the handle does not keep the bus alive.
bus: Arc::downgrade(&self.inner),
})
}
/// Subscribes `handler` to a dead-letter topic.
///
/// The subscriber id is `"dead-letter:"` followed by the topic name.
///
/// # Errors
///
/// See [`LocalEventBus::subscribe_with_options`].
pub fn add_dead_letter_handler<F, R>(
    &self,
    dead_letter_topic: &Topic<DeadLetterPayload>,
    handler: F,
    options: SubscribeOptions<DeadLetterPayload>,
) -> EventBusResult<Subscription<DeadLetterPayload>>
where
    F: Fn(EventEnvelope<DeadLetterPayload>) -> R + Send + Sync + 'static,
    R: IntoEventBusResult + 'static,
{
    let subscriber_id = format!("dead-letter:{}", dead_letter_topic.name());
    self.subscribe_with_options(subscriber_id, dead_letter_topic, handler, options)
}
/// Waits until the inner bus reports `topic` as idle.
///
/// # Errors
///
/// Returns whatever error the inner bus reports while waiting.
pub fn wait_for_idle<T>(&self, topic: &Topic<T>) -> EventBusResult<()>
where
    T: 'static,
{
    let topic_key = topic.key();
    self.inner.wait_for_idle(&topic_key)
}
/// Waits up to `timeout` for the inner bus to report `topic` as idle.
///
/// The boolean result is passed through from the inner bus unchanged.
///
/// # Errors
///
/// Returns whatever error the inner bus reports while waiting.
pub fn wait_for_idle_timeout<T>(
    &self,
    topic: &Topic<T>,
    timeout: Duration,
) -> EventBusResult<bool>
where
    T: 'static,
{
    let topic_key = topic.key();
    self.inner.wait_for_idle_timeout(&topic_key, timeout)
}
/// Returns the bus-level default publish options for `T`, or empty options
/// when none are configured.
fn default_publish_options<T>(&self) -> PublishOptions<T>
where
    T: 'static,
{
    match self.inner.default_publish_options::<T>() {
        Some(configured) => configured,
        None => PublishOptions::empty(),
    }
}
/// Returns the bus-level default subscribe options for `T`, or empty options
/// when none are configured.
fn default_subscribe_options<T>(&self) -> SubscribeOptions<T>
where
    T: 'static,
{
    match self.inner.default_subscribe_options::<T>() {
        Some(configured) => configured,
        None => SubscribeOptions::empty(),
    }
}
/// Succeeds only while the inner bus reports itself as started.
///
/// # Errors
///
/// Returns `EventBusError::not_started()` otherwise.
fn ensure_started(&self) -> EventBusResult<()> {
    if !self.inner.is_started() {
        return Err(EventBusError::not_started());
    }
    Ok(())
}
/// Forwards each error in `errors`, in order, to the bus error observers.
fn observe_errors(&self, errors: Vec<EventBusError>) {
    errors.into_iter().for_each(|error| {
        self.inner.observe_error(&error);
    });
}
/// Guards blocking shutdown against being invoked from one of this bus's own
/// subscription workers.
///
/// # Panics
///
/// Panics when the current thread is registered as a subscription worker of
/// this bus.
fn assert_not_own_subscription_worker_for_blocking_shutdown(&self) {
    let on_own_worker = is_current_subscription_worker_for_bus(local_event_bus_id(&self.inner));
    assert!(
        !on_own_worker,
        "LocalEventBus::shutdown must not be called from this bus's subscriber worker; use shutdown_nonblocking or shutdown_with_timeout"
    );
}
/// Runs `envelope` through the publisher interceptors.
///
/// Global (metadata-only) interceptors run first, then the typed interceptors
/// registered for payload type `T`. Returns `Ok(None)` as soon as any
/// interceptor drops the event.
///
/// # Errors
///
/// An interceptor error or panic is reported as
/// `EventBusError::interceptor_failed("publish", ..)`; a failed downcast of
/// the type-erased envelope is reported as a type mismatch.
fn apply_publisher_interceptors<T>(
&self,
envelope: EventEnvelope<T>,
) -> EventBusResult<Option<EventEnvelope<T>>>
where
T: Clone + Send + Sync + 'static,
{
let mut envelope = envelope;
for interceptor in self.inner.global_publisher_interceptors()? {
let metadata = envelope.metadata();
let metadata =
// A panicking interceptor is converted into an error instead of unwinding.
match panic::catch_unwind(AssertUnwindSafe(|| interceptor.on_publish(metadata))) {
Ok(Ok(Some(metadata))) => metadata,
Ok(Ok(None)) => return Ok(None),
Ok(Err(error)) => {
return Err(EventBusError::interceptor_failed(
"publish",
error.to_string(),
));
}
Err(_) => {
return Err(EventBusError::interceptor_failed(
"publish",
"global publisher interceptor panicked",
));
}
};
// Write the (possibly modified) metadata back onto the typed envelope.
envelope.apply_metadata(metadata);
}
let interceptors = self.inner.publisher_interceptors()?;
// Typed interceptors are stored type-erased, so the envelope travels as `dyn Any`.
let mut current: Option<Box<dyn Any + Send>> = Some(Box::new(envelope));
for interceptor in interceptors {
// Once `current` is `None` (event dropped) the remaining interceptors are skipped.
if interceptor.payload_type_id() == TypeId::of::<T>()
&& let Some(boxed) = current.take()
{
current = interceptor.intercept(boxed)?;
}
}
current
.map(|boxed| {
boxed
.downcast::<EventEnvelope<T>>()
.map(|envelope| *envelope)
.map_err(|_| {
EventBusError::type_mismatch(type_name::<EventEnvelope<T>>(), "unknown")
})
})
.transpose()
}
fn dispatch_envelope<T>(
&self,
envelope: EventEnvelope<T>,
retry_options: Option<&crate::RetryOptions>,
allow_stopping: bool,
) -> EventBusResult<()>
where
T: Clone + Send + Sync + 'static,
{
if !allow_stopping {
self.ensure_started()?;
}
let subscriptions = self.inner.subscriptions_for(&envelope.topic().key())?;
let mut first_error = None;
for subscription in subscriptions {
let subscription = Arc::clone(&subscription);
if let Err(error) = run_with_retry(retry_options, || {
subscription.dispatch(
Box::new(envelope.clone()),
Arc::clone(&self.inner),
allow_stopping,
)
}) && first_error.is_none()
{
first_error = Some(error);
}
}
first_error.map_or(Ok(()), Err)
}
/// Wraps `handler` with the typed subscriber interceptors registered for `T`,
/// then with the global subscriber interceptors.
///
/// # Errors
///
/// Fails when the interceptor list cannot be read, when an interceptor fails
/// to wrap the handler, or when the type-erased chain cannot be downcast back
/// to a handler for `T`.
fn apply_subscriber_interceptors<T>(
&self,
handler: Arc<HandlerFn<T>>,
) -> EventBusResult<Arc<HandlerFn<T>>>
where
T: Clone + Send + Sync + 'static,
{
let interceptors = self.inner.subscriber_interceptors()?;
// Interceptors are stored type-erased, so the handler travels as `dyn Any`.
let mut chain = Box::new(handler) as Box<dyn Any + Send + Sync>;
// Wrapping in reverse makes the first interceptor in the list the outermost typed layer.
for interceptor in interceptors.into_iter().rev() {
if interceptor.payload_type_id() == TypeId::of::<T>() {
chain = interceptor.wrap_handler(chain)?;
}
}
let handler = chain
.downcast::<Arc<HandlerFn<T>>>()
.map(|handler| *handler)
.map_err(|_| {
EventBusError::type_mismatch(type_name::<Arc<HandlerFn<T>>>(), "unknown")
})?;
// Global interceptors wrap around the typed chain.
self.apply_global_subscriber_interceptors(handler)
}
/// Wraps `handler` with every global (metadata-only) subscriber interceptor.
///
/// Each layer hands the interceptor the event metadata and a chain whose
/// continuation invokes the next layer with a clone of the event. Interceptor
/// errors and panics are reported as
/// `EventBusError::interceptor_failed("subscribe", ..)`.
///
/// # Errors
///
/// Fails when the global interceptor list cannot be read.
fn apply_global_subscriber_interceptors<T>(
&self,
handler: Arc<HandlerFn<T>>,
) -> EventBusResult<Arc<HandlerFn<T>>>
where
T: Clone + Send + Sync + 'static,
{
let mut chain = handler;
// Wrapping in reverse makes the first interceptor in the list the outermost layer.
for interceptor in self
.inner
.global_subscriber_interceptors()?
.into_iter()
.rev()
{
let next = Arc::clone(&chain);
chain = Arc::new(move |event: EventEnvelope<T>| {
let metadata = event.metadata();
let next = Arc::clone(&next);
let event_for_next = event.clone();
// The continuation is an `Fn`, so it clones the event on every call.
let chain = SubscriberInterceptorAnyChain::new(Arc::new(move || {
next(event_for_next.clone())
}));
match panic::catch_unwind(AssertUnwindSafe(|| {
interceptor.on_consume(metadata, chain)
})) {
Ok(Ok(())) => Ok(()),
Ok(Err(error)) => Err(EventBusError::interceptor_failed(
"subscribe",
error.to_string(),
)),
Err(_) => Err(EventBusError::interceptor_failed(
"subscribe",
"global subscriber interceptor panicked",
)),
}
});
}
Ok(chain)
}
}
impl Default for LocalEventBus {
fn default() -> Self {
Self::new()
}
}
/// `EventBus` trait implementation for [`LocalEventBus`].
///
/// Every method forwards to the inherent method of the same name. The
/// `Self::name(self, ..)` path form is used throughout; NOTE(review): this
/// relies on inherent associated functions taking precedence over the trait
/// methods being defined here — keep the inherent methods in place, otherwise
/// these calls would resolve to themselves.
impl crate::EventBus for LocalEventBus {
// Forwards to the inherent `LocalEventBus::start`.
fn start(&self) -> EventBusResult<bool> {
Self::start(self)
}
// Forwards to the inherent, blocking `LocalEventBus::shutdown`.
fn shutdown(&self) -> bool {
Self::shutdown(self)
}
// Forwards to the inherent `LocalEventBus::publish`.
fn publish<T>(&self, topic: &Topic<T>, payload: T) -> EventBusResult<()>
where
T: Clone + Send + Sync + 'static,
{
Self::publish(self, topic, payload)
}
// Forwards to the inherent `LocalEventBus::publish_with_options`.
fn publish_with_options<T>(
&self,
topic: &Topic<T>,
payload: T,
options: PublishOptions<T>,
) -> EventBusResult<()>
where
T: Clone + Send + Sync + 'static,
{
Self::publish_with_options(self, topic, payload, options)
}
// Forwards to the inherent `LocalEventBus::publish_envelope`.
fn publish_envelope<T>(&self, envelope: EventEnvelope<T>) -> EventBusResult<()>
where
T: Clone + Send + Sync + 'static,
{
Self::publish_envelope(self, envelope)
}
// Forwards to the inherent `LocalEventBus::publish_envelope_with_options`.
fn publish_envelope_with_options<T>(
&self,
envelope: EventEnvelope<T>,
options: PublishOptions<T>,
) -> EventBusResult<()>
where
T: Clone + Send + Sync + 'static,
{
Self::publish_envelope_with_options(self, envelope, options)
}
// Forwards to the inherent `LocalEventBus::publish_all`.
fn publish_all<T>(&self, envelopes: Vec<EventEnvelope<T>>) -> EventBusResult<BatchPublishResult>
where
T: Clone + Send + Sync + 'static,
{
Self::publish_all(self, envelopes)
}
// Forwards to the inherent `LocalEventBus::publish_all_with_options`.
fn publish_all_with_options<T>(
&self,
envelopes: Vec<EventEnvelope<T>>,
options: PublishOptions<T>,
) -> EventBusResult<BatchPublishResult>
where
T: Clone + Send + Sync + 'static,
{
Self::publish_all_with_options(self, envelopes, options)
}
// Forwards to the inherent `LocalEventBus::subscribe`.
fn subscribe<T, S, F, R>(
&self,
subscriber_id: S,
topic: &Topic<T>,
handler: F,
) -> EventBusResult<Subscription<T>>
where
T: Clone + Send + Sync + 'static,
S: Into<String>,
F: Fn(EventEnvelope<T>) -> R + Send + Sync + 'static,
R: IntoEventBusResult + 'static,
{
Self::subscribe(self, subscriber_id, topic, handler)
}
// Forwards to the inherent `LocalEventBus::subscribe_with_options`.
fn subscribe_with_options<T, S, F, R>(
&self,
subscriber_id: S,
topic: &Topic<T>,
handler: F,
options: SubscribeOptions<T>,
) -> EventBusResult<Subscription<T>>
where
T: Clone + Send + Sync + 'static,
S: Into<String>,
F: Fn(EventEnvelope<T>) -> R + Send + Sync + 'static,
R: IntoEventBusResult + 'static,
{
Self::subscribe_with_options(self, subscriber_id, topic, handler, options)
}
// Forwards to the inherent `LocalEventBus::wait_for_idle`.
fn wait_for_idle<T>(&self, topic: &Topic<T>) -> EventBusResult<()>
where
T: 'static,
{
Self::wait_for_idle(self, topic)
}
// Forwards to the inherent `LocalEventBus::wait_for_idle_timeout`.
fn wait_for_idle_timeout<T>(&self, topic: &Topic<T>, timeout: Duration) -> EventBusResult<bool>
where
T: 'static,
{
Self::wait_for_idle_timeout(self, topic, timeout)
}
}
/// Adapter that stores a typed [`PublisherInterceptor`] so it can be used
/// through the type-erased `PublisherInterceptorEntry` interface.
struct TypedPublisherInterceptor<T: Clone + Send + Sync + 'static> {
// The wrapped typed interceptor.
interceptor: Arc<PublisherInterceptorFn<T>>,
}
/// Wraps a typed publisher interceptor into a type-erased registry entry for
/// payload type `T`.
pub(super) fn create_publisher_interceptor_entry<T, I>(
    interceptor: I,
) -> Arc<dyn PublisherInterceptorEntry>
where
    T: Clone + Send + Sync + 'static,
    I: PublisherInterceptor<T>,
{
    let entry = TypedPublisherInterceptor::<T> {
        interceptor: Arc::new(interceptor),
    };
    Arc::new(entry)
}
/// Type-erased view of a typed publisher interceptor.
impl<T: Clone + Send + Sync + 'static> PublisherInterceptorEntry for TypedPublisherInterceptor<T> {
    /// The payload type this interceptor applies to.
    fn payload_type_id(&self) -> TypeId {
        TypeId::of::<T>()
    }

    /// Downcasts `envelope` to `EventEnvelope<T>`, runs the typed interceptor
    /// and re-erases the result.
    ///
    /// Returns `Ok(None)` when the interceptor drops the event. A failed
    /// downcast is a type mismatch; an interceptor error or panic becomes
    /// `EventBusError::interceptor_failed("publish", ..)`.
    fn intercept(
        &self,
        envelope: Box<dyn Any + Send>,
    ) -> EventBusResult<Option<Box<dyn Any + Send>>> {
        let typed = match envelope.downcast::<EventEnvelope<T>>() {
            Ok(typed) => typed,
            Err(_) => {
                return Err(EventBusError::type_mismatch(
                    type_name::<EventEnvelope<T>>(),
                    "unknown",
                ));
            }
        };
        // A panicking interceptor is converted into an error instead of unwinding.
        let outcome =
            panic::catch_unwind(AssertUnwindSafe(|| self.interceptor.on_publish(*typed)));
        match outcome {
            Err(_) => Err(EventBusError::interceptor_failed(
                "publish",
                "publisher interceptor panicked",
            )),
            Ok(Err(error)) => Err(EventBusError::interceptor_failed(
                "publish",
                error.to_string(),
            )),
            Ok(Ok(None)) => Ok(None),
            Ok(Ok(Some(kept))) => Ok(Some(Box::new(kept) as Box<dyn Any + Send>)),
        }
    }
}
/// Adapter that stores a typed [`SubscriberInterceptor`] so it can be used
/// through the type-erased `SubscriberInterceptorEntry` interface.
struct TypedSubscriberInterceptor<T: Clone + Send + Sync + 'static> {
// The wrapped typed interceptor.
interceptor: Arc<SubscriberInterceptorFn<T>>,
}
/// Wraps a typed subscriber interceptor into a type-erased registry entry for
/// payload type `T`.
pub(super) fn create_subscriber_interceptor_entry<T, I>(
    interceptor: I,
) -> Arc<dyn SubscriberInterceptorEntry>
where
    T: Clone + Send + Sync + 'static,
    I: SubscriberInterceptor<T>,
{
    let entry = TypedSubscriberInterceptor::<T> {
        interceptor: Arc::new(interceptor),
    };
    Arc::new(entry)
}
/// Type-erased view of a typed subscriber interceptor.
impl<T> SubscriberInterceptorEntry for TypedSubscriberInterceptor<T>
where
T: Clone + Send + Sync + 'static,
{
// The payload type this interceptor applies to.
fn payload_type_id(&self) -> TypeId {
TypeId::of::<T>()
}
// Wraps a type-erased handler: downcasts it to `Arc<HandlerFn<T>>`, builds a
// new handler that calls the interceptor with a chain over the original, and
// returns the new handler type-erased again. A failed downcast is reported as
// a type mismatch.
fn wrap_handler(
&self,
handler: Box<dyn Any + Send + Sync>,
) -> EventBusResult<Box<dyn Any + Send + Sync>> {
let next = handler.downcast::<Arc<HandlerFn<T>>>().map_err(|_| {
EventBusError::type_mismatch(type_name::<Arc<HandlerFn<T>>>(), "unknown")
})?;
let next = *next;
let interceptor = Arc::clone(&self.interceptor);
let wrapped: Arc<HandlerFn<T>> = Arc::new(move |event| {
// A fresh chain per event, sharing the same downstream handler.
let next_chain = SubscriberInterceptorChain::new(Arc::clone(&next));
interceptor.on_consume(event, next_chain)
});
Ok(Box::new(wrapped))
}
}
/// Typed record of one subscription, stored in the bus behind the
/// type-erased `ErasedSubscription` interface.
struct TypedSubscriptionEntry<T: Clone + Send + Sync + 'static> {
// Subscription id issued by the bus.
id: usize,
// Caller-supplied subscriber identifier.
subscriber_id: String,
// Topic this subscription listens on.
topic: Topic<T>,
// Activity flag shared with the `Subscription` handle returned to the caller.
active: Arc<SubscriptionState>,
// Handler already wrapped by the subscriber interceptors.
handler: Arc<HandlerFn<T>>,
// Effective (default-merged) subscribe options.
options: SubscribeOptions<T>,
}
/// Type-erased dispatch interface of a typed subscription.
impl<T> ErasedSubscription for TypedSubscriptionEntry<T>
where
T: Clone + Send + Sync + 'static,
{
// Subscription id issued by the bus.
fn id(&self) -> usize {
self.id
}
// Priority taken from the subscribe options.
fn priority(&self) -> i32 {
self.options.priority()
}
// Marks the shared activity flag inactive.
fn deactivate(&self) {
self.active.deactivate();
}
// Schedules delivery of `envelope` to this subscription's handler.
//
// Inactive subscriptions and envelopes rejected by the options' filter are
// skipped with `Ok(())`. Otherwise a processing task is built and submitted
// through one of four paths, chosen by whether the envelope has an ordering
// key and whether it carries a non-zero delay.
fn dispatch(
&self,
envelope: Box<dyn Any + Send>,
bus: Arc<LocalEventBusInner>,
allow_stopping: bool,
) -> EventBusResult<()> {
if !self.active.is_active() {
return Ok(());
}
let envelope = envelope.downcast::<EventEnvelope<T>>().map_err(|_| {
EventBusError::type_mismatch(type_name::<EventEnvelope<T>>(), "unknown")
})?;
if !self.options.try_should_handle(&envelope)? {
return Ok(());
}
let topic_key = self.topic.key();
bus.start_processing(&topic_key)?;
// The ordering lane is keyed by topic, ordering key and subscription id.
let ordering_lane_key = envelope
.ordering_key()
.map(|ordering_key| OrderingLaneKey::new(topic_key.clone(), ordering_key, self.id));
let delay = envelope.delay();
let active = Arc::clone(&self.active);
let delayed_active = Arc::clone(&self.active);
let handler = Arc::clone(&self.handler);
let options = self.options.clone();
let subscriber_id = self.subscriber_id.clone();
let event_bus = LocalEventBus {
inner: Arc::clone(&bus),
};
let bus_id = local_event_bus_id(&bus);
let processing_task = ProcessingTask::new(Arc::clone(&bus), topic_key, move || {
// Registers this thread as a worker of the bus for the duration of the task.
let _worker_context = SubscriptionWorkerContext::enter(bus_id);
// Re-check: the subscription may have been deactivated after scheduling.
if !active.is_active() {
return;
}
process_subscription_event(
active,
handler,
options,
subscriber_id,
*envelope,
event_bus,
);
});
if let Some(ordering_lane_key) = ordering_lane_key {
if let Some(delay) = delay
&& !delay.is_zero()
{
bus.submit_delayed_ordered_processing_task(
ordering_lane_key,
processing_task,
delay,
delayed_active,
allow_stopping,
)
} else {
bus.submit_ordered_processing_task(
ordering_lane_key,
processing_task,
allow_stopping,
)
}
} else if let Some(delay) = delay
&& !delay.is_zero()
{
bus.submit_delayed_processing_task(
processing_task,
delay,
delayed_active,
allow_stopping,
)
} else {
bus.submit_processing_task(move || processing_task.run(), allow_stopping)
}
}
}
fn local_event_bus_id(inner: &Arc<LocalEventBusInner>) -> usize {
Arc::as_ptr(inner) as usize
}
/// Reports whether the current thread is presently registered as a
/// subscription worker of the bus identified by `bus_id`.
fn is_current_subscription_worker_for_bus(bus_id: usize) -> bool {
    SUBSCRIPTION_WORKER_BUS_IDS.with(|registered| {
        let registered = registered.borrow();
        registered.iter().any(|candidate| *candidate == bus_id)
    })
}
/// Guard marking the current thread as a subscription worker of one bus.
/// Created with `enter`; the registration is removed when the guard is dropped.
struct SubscriptionWorkerContext {
// Id of the bus this guard registered (see `local_event_bus_id`).
bus_id: usize,
}
impl SubscriptionWorkerContext {
    /// Registers `bus_id` on the current thread and returns the guard that
    /// undoes the registration when dropped.
    fn enter(bus_id: usize) -> Self {
        SUBSCRIPTION_WORKER_BUS_IDS.with(|registered| registered.borrow_mut().push(bus_id));
        Self { bus_id }
    }
}
/// Removes the most recent registration of this guard's bus id from the
/// current thread's list.
impl Drop for SubscriptionWorkerContext {
    fn drop(&mut self) {
        let own_id = self.bus_id;
        SUBSCRIPTION_WORKER_BUS_IDS.with(|registered| {
            let mut registered = registered.borrow_mut();
            // Search from the back so nested registrations unwind innermost-first.
            let latest = registered.iter().rposition(|candidate| *candidate == own_id);
            if let Some(index) = latest {
                registered.remove(index);
            }
        });
    }
}
/// Delivers one envelope to a subscription's handler and settles the outcome.
///
/// Does nothing when the subscription is no longer active. On success with
/// `AckMode::Auto`, an acknowledgement the handler left open is acked. On
/// failure the work is handed to `handle_subscription_failure`.
fn process_subscription_event<T>(
    active: Arc<SubscriptionState>,
    handler: Arc<HandlerFn<T>>,
    options: SubscribeOptions<T>,
    subscriber_id: String,
    envelope: EventEnvelope<T>,
    event_bus: LocalEventBus,
) where
    T: Clone + Send + Sync + 'static,
{
    if !active.is_active() {
        return;
    }
    let failure = match run_handler_with_retry(&handler, &options, envelope) {
        Ok(delivery) => {
            let ack = &delivery.acknowledgement;
            if options.ack_mode() == AckMode::Auto && !ack.is_completed() {
                ack.ack();
            }
            return;
        }
        Err(failure) => failure,
    };
    handle_subscription_failure(
        &options,
        &subscriber_id,
        &failure.delivery.delivered,
        &failure.error,
        &failure.delivery.acknowledgement,
        &event_bus,
    );
}
/// Settles a failed delivery.
///
/// Order of operations: notify the subscription's error callbacks (forwarding
/// any errors they return to the bus observers), nack the acknowledgement if
/// it is still open, then — only for nacked, non-dead-letter events — build a
/// dead letter and publish it. A failure to publish the dead letter is
/// observed as `EventBusError::dead_letter_failed` and not propagated.
fn handle_subscription_failure<T>(
options: &SubscribeOptions<T>,
subscriber_id: &str,
delivered: &EventEnvelope<T>,
error: &EventBusError,
acknowledgement: &Acknowledgement,
event_bus: &LocalEventBus,
) where
T: Clone + Send + Sync + 'static,
{
for error in options.notify_subscribe_error(subscriber_id, delivered, error, acknowledgement) {
event_bus.inner.observe_error(&error);
}
// The notification above runs first, so it may already have completed the acknowledgement.
if !acknowledgement.is_completed() {
acknowledgement.nack();
}
// Dead letters are not dead-lettered again.
if acknowledgement.is_nacked() && !delivered.is_dead_letter() {
let dead_letter =
create_dead_letter_for_failure(options, subscriber_id, delivered, error, event_bus);
if let Some(dead_letter) = dead_letter
&& let Err(error) = event_bus.publish_dead_letter_envelope(dead_letter.as_dead_letter())
{
let observed = EventBusError::dead_letter_failed(error.to_string());
event_bus.inner.observe_error(&observed);
}
}
}
/// Builds the dead-letter envelope for a failed delivery, if any.
///
/// A strategy configured on the subscription options takes precedence;
/// otherwise the bus-level default strategies are consulted. Strategy errors
/// are forwarded to the bus observers and yield `None`.
fn create_dead_letter_for_failure<T>(
    options: &SubscribeOptions<T>,
    subscriber_id: &str,
    delivered: &EventEnvelope<T>,
    error: &EventBusError,
    event_bus: &LocalEventBus,
) -> Option<EventEnvelope<DeadLetterPayload>>
where
    T: Clone + Send + Sync + 'static,
{
    if !options.has_dead_letter_strategy() {
        return create_default_dead_letter_for_failure(
            options,
            subscriber_id,
            delivered,
            error,
            event_bus,
        );
    }
    options
        .create_dead_letter(subscriber_id, delivered, error)
        .unwrap_or_else(|strategy_error| {
            event_bus.inner.observe_error(&strategy_error);
            None
        })
}
fn create_default_dead_letter_for_failure<T>(
options: &SubscribeOptions<T>,
subscriber_id: &str,
delivered: &EventEnvelope<T>,
error: &EventBusError,
event_bus: &LocalEventBus,
) -> Option<EventEnvelope<DeadLetterPayload>>
where
T: Clone + Send + Sync + 'static,
{
if let Some(strategy) = event_bus.inner.default_dead_letter_strategy::<T>() {
return match call_dead_letter_strategy(strategy, subscriber_id, delivered, error, options) {
Ok(dead_letter) => dead_letter,
Err(error) => {
event_bus.inner.observe_error(&error);
None
}
};
}
let strategy = event_bus.inner.global_default_dead_letter_strategy()?;
match call_global_dead_letter_strategy(
strategy,
subscriber_id,
delivered.metadata(),
Arc::new(delivered.payload().clone()),
error,
) {
Ok(dead_letter) => dead_letter,
Err(error) => {
event_bus.inner.observe_error(&error);
None
}
}
}
/// Invokes a typed default dead-letter strategy, shielding the caller from
/// panics.
///
/// # Errors
///
/// A strategy error is passed through `normalize_dead_letter_error`; a panic
/// becomes `EventBusError::dead_letter_failed`.
fn call_dead_letter_strategy<T>(
    strategy: Arc<DeadLetterStrategyFn<T>>,
    subscriber_id: &str,
    delivered: &EventEnvelope<T>,
    error: &EventBusError,
    options: &SubscribeOptions<T>,
) -> EventBusResult<Option<EventEnvelope<DeadLetterPayload>>>
where
    T: Clone + Send + Sync + 'static,
{
    let guarded = panic::catch_unwind(AssertUnwindSafe(|| {
        strategy.create_dead_letter(subscriber_id, delivered, error, options)
    }));
    match guarded {
        Err(_) => Err(EventBusError::dead_letter_failed(
            "default dead-letter strategy panicked",
        )),
        Ok(Err(strategy_error)) => Err(normalize_dead_letter_error(strategy_error)),
        Ok(Ok(dead_letter)) => Ok(dead_letter),
    }
}
/// Invokes the global, payload-agnostic dead-letter strategy, shielding the
/// caller from panics.
///
/// # Errors
///
/// A strategy error is passed through `normalize_dead_letter_error`; a panic
/// becomes `EventBusError::dead_letter_failed`.
fn call_global_dead_letter_strategy(
    strategy: Arc<DeadLetterStrategyAnyFn>,
    subscriber_id: &str,
    metadata: EventEnvelopeMetadata,
    original_payload: DeadLetterOriginalPayload,
    error: &EventBusError,
) -> EventBusResult<Option<EventEnvelope<DeadLetterPayload>>> {
    let guarded = panic::catch_unwind(AssertUnwindSafe(|| {
        strategy.create_dead_letter(subscriber_id, metadata, original_payload, error)
    }));
    match guarded {
        Err(_) => Err(EventBusError::dead_letter_failed(
            "global default dead-letter strategy panicked",
        )),
        Ok(Err(strategy_error)) => Err(normalize_dead_letter_error(strategy_error)),
        Ok(Ok(dead_letter)) => Ok(dead_letter),
    }
}
/// Runs `handler` on `envelope` under the subscription's retry policy.
///
/// Every attempt gets a fresh `HandlerDelivery` (and thus a fresh
/// acknowledgement). An attempt counts as failed when the handler returns an
/// error, panics, or nacks the event.
///
/// On failure the returned `HandlerRunFailure` carries the delivery of the
/// last attempt, or a newly created one if no attempt was recorded.
fn run_handler_with_retry<T>(
handler: &Arc<HandlerFn<T>>,
options: &SubscribeOptions<T>,
envelope: EventEnvelope<T>,
) -> Result<HandlerDelivery<T>, Box<HandlerRunFailure<T>>>
where
T: Clone + Send + Sync + 'static,
{
let mut last_delivery = None;
match run_with_retry(options.retry_options(), || {
let delivery = HandlerDelivery::new(&envelope);
// Recorded before the call so a failing attempt is still available afterwards.
last_delivery = Some(delivery.clone());
call_handler(handler, delivery.delivered.clone())?;
// A handler that returns Ok but nacked the event is treated as a failure.
if delivery.acknowledgement.is_nacked() {
Err(EventBusError::handler_failed("subscriber nacked the event"))
} else {
Ok(delivery)
}
}) {
Ok(delivery) => Ok(delivery),
Err(error) => {
let delivery = match last_delivery {
Some(delivery) => delivery,
None => HandlerDelivery::new(&envelope),
};
Err(Box::new(HandlerRunFailure { error, delivery }))
}
}
}
/// Calls `handler` with `envelope`, converting a panic into
/// `EventBusError::handler_panicked()`.
fn call_handler<T>(handler: &Arc<HandlerFn<T>>, envelope: EventEnvelope<T>) -> EventBusResult<()>
where
    T: Clone + Send + Sync + 'static,
{
    let guarded = panic::catch_unwind(AssertUnwindSafe(|| handler(envelope)));
    guarded.unwrap_or_else(|_panic_payload| Err(EventBusError::handler_panicked()))
}
/// Runs `operation` once, or under a retry policy when `retry_options` is set.
///
/// # Errors
///
/// * `invalid_argument("retry_options", ..)` when the retry policy cannot be
///   built from the options;
/// * otherwise the last error recorded by the retry run, or a `handler_failed`
///   error carrying the retry failure's message when no last error exists.
fn run_with_retry<T, F>(
    retry_options: Option<&crate::RetryOptions>,
    operation: F,
) -> EventBusResult<T>
where
    F: FnMut() -> EventBusResult<T>,
{
    let retry_options = match retry_options {
        Some(configured) => configured,
        None => {
            // No policy: a single plain attempt.
            let mut run_once = operation;
            return run_once();
        }
    };
    let retry = qubit_retry::Retry::<EventBusError>::from_options(retry_options.clone())
        .map_err(|build_error| {
            EventBusError::invalid_argument("retry_options", build_error.to_string())
        })?;
    retry
        .run(operation)
        .map_err(|retry_error| match retry_error.last_error().cloned() {
            Some(last) => last,
            None => EventBusError::handler_failed(retry_error.to_string()),
        })
}
/// Rejects retry options that set an attempt timeout.
///
/// # Errors
///
/// Returns `invalid_argument("retry_options", ..)` when `attempt_timeout` is
/// present; absent options and options without it are accepted.
fn validate_retry_options(retry_options: Option<&crate::RetryOptions>) -> EventBusResult<()> {
    let attempt_timeout = retry_options.and_then(crate::RetryOptions::attempt_timeout);
    match attempt_timeout {
        Some(_) => Err(EventBusError::invalid_argument(
            "retry_options",
            "attempt_timeout is not supported by LocalEventBus retry handling",
        )),
        None => Ok(()),
    }
}
/// Blocks until `executor` reports termination, polling once per millisecond.
fn wait_for_executor_termination(executor: &FixedThreadPool) {
    let poll_interval = Duration::from_millis(1);
    loop {
        if executor.is_terminated() {
            break;
        }
        thread::sleep(poll_interval);
    }
}
/// Polls `executor` until it terminates or `timeout` has elapsed.
///
/// Returns `true` on termination and `false` once the budget is exhausted.
/// Each sleep lasts at most one millisecond and never exceeds the remaining
/// budget.
fn wait_for_executor_termination_timeout(executor: &FixedThreadPool, timeout: Duration) -> bool {
    let started_at = Instant::now();
    loop {
        if executor.is_terminated() {
            return true;
        }
        match remaining_shutdown_timeout(started_at, timeout) {
            Some(remaining) => thread::sleep(remaining.min(Duration::from_millis(1))),
            None => return false,
        }
    }
}
/// Blocks until `scheduler` reports termination, polling once per millisecond.
fn wait_for_delay_scheduler_termination(scheduler: &DelayedTaskScheduler) {
    let poll_interval = Duration::from_millis(1);
    loop {
        if scheduler.is_terminated() {
            break;
        }
        thread::sleep(poll_interval);
    }
}
/// Polls `scheduler` until it terminates or `timeout` has elapsed.
///
/// Returns `true` on termination and `false` once the budget is exhausted.
/// Each sleep lasts at most one millisecond and never exceeds the remaining
/// budget.
fn wait_for_delay_scheduler_termination_timeout(
    scheduler: &DelayedTaskScheduler,
    timeout: Duration,
) -> bool {
    let started_at = Instant::now();
    loop {
        if scheduler.is_terminated() {
            return true;
        }
        match remaining_shutdown_timeout(started_at, timeout) {
            Some(remaining) => thread::sleep(remaining.min(Duration::from_millis(1))),
            None => return false,
        }
    }
}
/// Returns how much of `timeout` is left since `started_at`, or `None` once
/// more time than `timeout` has elapsed.
fn remaining_shutdown_timeout(started_at: Instant, timeout: Duration) -> Option<Duration> {
    let elapsed = started_at.elapsed();
    if elapsed > timeout {
        None
    } else {
        Some(timeout - elapsed)
    }
}
/// Default size of the subscription handler pool: the parallelism reported by
/// the standard library, or `1` when it cannot be determined.
fn default_subscription_handler_pool_size() -> usize {
    match thread::available_parallelism() {
        Ok(parallelism) => parallelism.get(),
        Err(_) => 1,
    }
}