use std::panic::{
self,
AssertUnwindSafe,
};
use std::sync::Arc;
use crate::{
EventBusError,
EventBusResult,
EventEnvelope,
PublishOptionsBuilder,
RetryOptions,
};
/// Unsized callback type for publish-error handlers.
///
/// A handler receives the envelope that was being published together with the
/// error that occurred, and reports its own success or failure through
/// `EventBusResult<()>`. The `Send + Sync + 'static` bounds let handlers be
/// stored behind an `Arc` and shared between threads.
pub(crate) type PublishErrorHandlerFn<T> =
dyn Fn(&EventEnvelope<T>, &EventBusError) -> EventBusResult<()> + Send + Sync + 'static;
/// Options that apply to a publish operation for events with payload type `T`.
///
/// Holds an optional retry configuration and an ordered list of shared
/// publish-error handlers.
pub struct PublishOptions<T: 'static> {
/// Retry configuration, or `None` when none has been set on these options.
pub(crate) retry_options: Option<RetryOptions>,
/// Publish-error handlers, invoked in the order they are stored.
pub(crate) error_handlers: Vec<Arc<PublishErrorHandlerFn<T>>>,
}
impl<T: 'static> PublishOptions<T> {
    /// Returns a fresh [`PublishOptionsBuilder`] for assembling options.
    pub fn builder() -> PublishOptionsBuilder<T> {
        PublishOptionsBuilder::new()
    }

    /// Creates options with no retry configuration and no error handlers.
    pub fn empty() -> Self {
        let retry_options = None;
        let error_handlers = Vec::new();
        Self {
            retry_options,
            error_handlers,
        }
    }

    /// Borrows the retry configuration, if one is set.
    pub fn retry_options(&self) -> Option<&RetryOptions> {
        self.retry_options.as_ref()
    }

    /// Number of publish-error handlers currently registered.
    pub fn error_handler_count(&self) -> usize {
        self.error_handlers.len()
    }

    /// Combines these options with `defaults`.
    ///
    /// The retry configuration of `self` wins when present; otherwise the one
    /// from `defaults` is used. Handlers from `defaults` are placed first,
    /// followed by the handlers of `self`.
    pub(crate) fn merge_defaults(self, defaults: Self) -> Self {
        let Self {
            retry_options: own_retry,
            error_handlers: own_handlers,
        } = self;
        let Self {
            retry_options: fallback_retry,
            error_handlers: fallback_handlers,
        } = defaults;

        Self {
            retry_options: own_retry.or(fallback_retry),
            error_handlers: fallback_handlers
                .into_iter()
                .chain(own_handlers)
                .collect(),
        }
    }

    /// Invokes every registered handler with `envelope` and `error`.
    ///
    /// All handlers are run even if an earlier one fails. A handler that
    /// returns `Err` or panics contributes one entry to the returned list;
    /// handlers that succeed contribute nothing. Panics are caught so that a
    /// misbehaving handler cannot unwind through the caller.
    pub(crate) fn notify_publish_error(
        &self,
        envelope: &EventEnvelope<T>,
        error: &EventBusError,
    ) -> Vec<EventBusError> {
        self.error_handlers
            .iter()
            .filter_map(|handler| {
                match panic::catch_unwind(AssertUnwindSafe(|| handler(envelope, error))) {
                    // Handler completed normally: nothing to report.
                    Ok(Ok(())) => None,
                    // Handler reported its own failure.
                    Ok(Err(handler_error)) => Some(EventBusError::error_handler_failed(
                        "publish",
                        handler_error.to_string(),
                    )),
                    // Handler panicked; the payload is intentionally discarded.
                    Err(_) => Some(EventBusError::error_handler_failed(
                        "publish",
                        "publish error handler panicked",
                    )),
                }
            })
            .collect()
    }
}
impl<T: 'static> Clone for PublishOptions<T> {
fn clone(&self) -> Self {
Self {
retry_options: self.retry_options.clone(),
error_handlers: self.error_handlers.clone(),
}
}
}
/// The default options carry no retry configuration and no error handlers.
impl<T: 'static> Default for PublishOptions<T> {
    fn default() -> Self {
        Self {
            retry_options: None,
            error_handlers: Vec::new(),
        }
    }
}