use std::panic::{
self,
AssertUnwindSafe,
};
use std::sync::Arc;
use crate::{
AckMode,
Acknowledgement,
DeadLetterOriginalPayload,
DeadLetterRecord,
EventBusError,
EventBusResult,
EventEnvelope,
EventEnvelopeMetadata,
RetryOptions,
SubscribeOptionsBuilder,
Topic,
};
use super::dead_letter_record::DeadLetterPayload;
pub(crate) type EventFilterFn<T> = dyn Fn(&EventEnvelope<T>) -> bool + Send + Sync + 'static;
pub(crate) type SubscribeErrorHandlerFn<T> = dyn Fn(&str, &EventEnvelope<T>, &EventBusError, &Acknowledgement) -> EventBusResult<()>
+ Send
+ Sync
+ 'static;
pub trait DeadLetterStrategy<T: 'static>: Send + Sync + 'static {
fn create_dead_letter(
&self,
subscriber_id: &str,
failed: &EventEnvelope<T>,
error: &EventBusError,
options: &SubscribeOptions<T>,
) -> EventBusResult<Option<EventEnvelope<DeadLetterPayload>>>;
}
pub trait DeadLetterStrategyCallback<T: 'static>:
Fn(
&str,
&EventEnvelope<T>,
&EventBusError,
&SubscribeOptions<T>,
) -> EventBusResult<Option<EventEnvelope<DeadLetterPayload>>>
+ Send
+ Sync
+ 'static
{
}
impl<T, F> DeadLetterStrategyCallback<T> for F
where
T: 'static,
F: Fn(
&str,
&EventEnvelope<T>,
&EventBusError,
&SubscribeOptions<T>,
) -> EventBusResult<Option<EventEnvelope<DeadLetterPayload>>>
+ Send
+ Sync
+ 'static,
{
}
struct ClosureDeadLetterStrategy<F> {
callback: F,
}
impl<F> ClosureDeadLetterStrategy<F> {
fn new(callback: F) -> Self {
Self { callback }
}
}
impl<T, F> DeadLetterStrategy<T> for ClosureDeadLetterStrategy<F>
where
T: 'static,
F: DeadLetterStrategyCallback<T>,
{
fn create_dead_letter(
&self,
subscriber_id: &str,
failed: &EventEnvelope<T>,
error: &EventBusError,
options: &SubscribeOptions<T>,
) -> EventBusResult<Option<EventEnvelope<DeadLetterPayload>>> {
(self.callback)(subscriber_id, failed, error, options)
}
}
pub(crate) type DeadLetterStrategyFn<T> = dyn DeadLetterStrategy<T>;
pub(crate) fn wrap_dead_letter_strategy<T, F>(strategy: F) -> Arc<DeadLetterStrategyFn<T>>
where
T: 'static,
F: DeadLetterStrategyCallback<T>,
{
Arc::new(ClosureDeadLetterStrategy::new(strategy))
}
pub trait DeadLetterStrategyAny: Send + Sync + 'static {
fn create_dead_letter(
&self,
subscriber_id: &str,
failed: EventEnvelopeMetadata,
original_payload: DeadLetterOriginalPayload,
error: &EventBusError,
) -> EventBusResult<Option<EventEnvelope<DeadLetterPayload>>>;
}
pub trait DeadLetterStrategyAnyCallback:
Fn(
&str,
EventEnvelopeMetadata,
DeadLetterOriginalPayload,
&EventBusError,
) -> EventBusResult<Option<EventEnvelope<DeadLetterPayload>>>
+ Send
+ Sync
+ 'static
{
}
impl<F> DeadLetterStrategyAnyCallback for F where
F: Fn(
&str,
EventEnvelopeMetadata,
DeadLetterOriginalPayload,
&EventBusError,
) -> EventBusResult<Option<EventEnvelope<DeadLetterPayload>>>
+ Send
+ Sync
+ 'static
{
}
struct ClosureDeadLetterStrategyAny<F> {
callback: F,
}
impl<F> ClosureDeadLetterStrategyAny<F> {
fn new(callback: F) -> Self {
Self { callback }
}
}
impl<F> DeadLetterStrategyAny for ClosureDeadLetterStrategyAny<F>
where
F: DeadLetterStrategyAnyCallback,
{
fn create_dead_letter(
&self,
subscriber_id: &str,
failed: EventEnvelopeMetadata,
original_payload: DeadLetterOriginalPayload,
error: &EventBusError,
) -> EventBusResult<Option<EventEnvelope<DeadLetterPayload>>> {
(self.callback)(subscriber_id, failed, original_payload, error)
}
}
pub(crate) type DeadLetterStrategyAnyFn = dyn DeadLetterStrategyAny;
pub(crate) fn wrap_dead_letter_strategy_any<F>(strategy: F) -> Arc<DeadLetterStrategyAnyFn>
where
F: DeadLetterStrategyAnyCallback,
{
Arc::new(ClosureDeadLetterStrategyAny::new(strategy))
}
pub fn discard_dead_letters<T>() -> impl DeadLetterStrategyCallback<T>
where
T: 'static,
{
|_subscriber_id: &str,
_failed: &EventEnvelope<T>,
_error: &EventBusError,
_options: &SubscribeOptions<T>| { Ok(None) }
}
pub fn standard_dead_letters_to<T>(
dead_letter_topic: Topic<DeadLetterPayload>,
) -> impl DeadLetterStrategyCallback<T>
where
T: Clone + Send + Sync + 'static,
{
move |subscriber_id: &str,
failed: &EventEnvelope<T>,
error: &EventBusError,
_options: &SubscribeOptions<T>| {
Ok(Some(
EventEnvelope::create(
dead_letter_topic.clone(),
DeadLetterRecord::from_failure(subscriber_id, failed, error),
)
.as_dead_letter(),
))
}
}
pub fn prefixed_dead_letters<T>(prefix: &str) -> impl DeadLetterStrategyCallback<T>
where
T: Clone + Send + Sync + 'static,
{
let prefix = prefix.to_string();
move |subscriber_id: &str,
failed: &EventEnvelope<T>,
error: &EventBusError,
_options: &SubscribeOptions<T>| {
let topic =
Topic::<DeadLetterPayload>::try_new(format!("{}{}", prefix, failed.topic().name()))?;
Ok(Some(
EventEnvelope::create(
topic,
DeadLetterRecord::from_failure(subscriber_id, failed, error),
)
.as_dead_letter(),
))
}
}
pub struct SubscribeOptions<T: 'static> {
pub(crate) ack_mode: AckMode,
pub(crate) ack_mode_configured: bool,
pub(crate) retry_options: Option<RetryOptions>,
pub(crate) filter: Option<Arc<EventFilterFn<T>>>,
pub(crate) error_handlers: Vec<Arc<SubscribeErrorHandlerFn<T>>>,
pub(crate) dead_letter_strategy: Option<Arc<DeadLetterStrategyFn<T>>>,
pub(crate) priority: i32,
pub(crate) priority_configured: bool,
}
impl<T: 'static> SubscribeOptions<T> {
pub fn builder() -> SubscribeOptionsBuilder<T> {
SubscribeOptionsBuilder::new()
}
pub fn empty() -> Self {
Self {
ack_mode: AckMode::Auto,
ack_mode_configured: false,
retry_options: None,
filter: None,
error_handlers: Vec::new(),
dead_letter_strategy: None,
priority: 0,
priority_configured: false,
}
}
pub const fn ack_mode(&self) -> AckMode {
self.ack_mode
}
pub fn retry_options(&self) -> Option<&RetryOptions> {
self.retry_options.as_ref()
}
pub const fn priority(&self) -> i32 {
self.priority
}
pub fn error_handler_count(&self) -> usize {
self.error_handlers.len()
}
pub(crate) fn merge_defaults(self, defaults: Self) -> Self {
let mut error_handlers = defaults.error_handlers;
error_handlers.extend(self.error_handlers);
Self {
ack_mode: if self.ack_mode_configured {
self.ack_mode
} else {
defaults.ack_mode
},
ack_mode_configured: self.ack_mode_configured || defaults.ack_mode_configured,
retry_options: self.retry_options.or(defaults.retry_options),
filter: self.filter.or(defaults.filter),
error_handlers,
dead_letter_strategy: self.dead_letter_strategy.or(defaults.dead_letter_strategy),
priority: if self.priority_configured {
self.priority
} else {
defaults.priority
},
priority_configured: self.priority_configured || defaults.priority_configured,
}
}
pub(crate) fn has_dead_letter_strategy(&self) -> bool {
self.dead_letter_strategy.is_some()
}
pub fn should_handle(&self, envelope: &EventEnvelope<T>) -> bool {
self.try_should_handle(envelope).unwrap_or(false)
}
pub(crate) fn try_should_handle(&self, envelope: &EventEnvelope<T>) -> EventBusResult<bool> {
let Some(filter) = &self.filter else {
return Ok(true);
};
panic::catch_unwind(AssertUnwindSafe(|| filter(envelope)))
.map_err(|_| EventBusError::handler_failed("subscriber filter panicked"))
}
pub(crate) fn notify_subscribe_error(
&self,
subscriber_id: &str,
envelope: &EventEnvelope<T>,
error: &EventBusError,
acknowledgement: &Acknowledgement,
) -> Vec<EventBusError> {
let mut failures = Vec::new();
for handler in &self.error_handlers {
let was_completed = acknowledgement.is_completed();
let was_acked = acknowledgement.is_acked();
match panic::catch_unwind(AssertUnwindSafe(|| {
handler(subscriber_id, envelope, error, acknowledgement)
})) {
Ok(Ok(())) => {}
Ok(Err(error)) => failures.push(EventBusError::error_handler_failed(
"subscribe",
error.to_string(),
)),
Err(_) => failures.push(EventBusError::error_handler_failed(
"subscribe",
"subscribe error handler panicked",
)),
}
if (!was_completed && acknowledgement.is_completed())
|| (!was_acked && acknowledgement.is_acked())
{
break;
}
}
failures
}
pub(crate) fn create_dead_letter(
&self,
subscriber_id: &str,
envelope: &EventEnvelope<T>,
error: &EventBusError,
) -> EventBusResult<Option<EventEnvelope<DeadLetterPayload>>> {
let Some(strategy) = &self.dead_letter_strategy else {
return Ok(None);
};
match panic::catch_unwind(AssertUnwindSafe(|| {
strategy.create_dead_letter(subscriber_id, envelope, error, self)
})) {
Ok(Ok(dead_letter)) => Ok(dead_letter),
Ok(Err(error)) => Err(normalize_dead_letter_error(error)),
Err(_) => Err(EventBusError::dead_letter_failed(
"dead-letter strategy panicked",
)),
}
}
}
pub(crate) fn normalize_dead_letter_error(error: EventBusError) -> EventBusError {
if matches!(error, EventBusError::DeadLetterFailed { .. }) {
error
} else {
EventBusError::dead_letter_failed(error.to_string())
}
}
impl<T: 'static> Clone for SubscribeOptions<T> {
fn clone(&self) -> Self {
Self {
ack_mode: self.ack_mode,
ack_mode_configured: self.ack_mode_configured,
retry_options: self.retry_options.clone(),
filter: self.filter.clone(),
error_handlers: self.error_handlers.clone(),
dead_letter_strategy: self.dead_letter_strategy.clone(),
priority: self.priority,
priority_configured: self.priority_configured,
}
}
}
impl<T: 'static> Default for SubscribeOptions<T> {
fn default() -> Self {
Self::empty()
}
}