use std::panic::{
self,
AssertUnwindSafe,
};
use std::sync::{
Arc,
Mutex,
};
use crate::{
EventBusError,
EventBusResult,
EventEnvelope,
};
type HandlerFn<T> = dyn Fn(EventEnvelope<T>) -> EventBusResult<()> + Send + Sync + 'static;
pub(crate) type DownstreamErrorSlot = Arc<Mutex<Vec<EventBusError>>>;
pub struct SubscriberInterceptorChain<T: Clone + Send + Sync + 'static> {
next: Arc<HandlerFn<T>>,
downstream_error: DownstreamErrorSlot,
}
pub struct SubscriberInterceptorAnyChain {
next: Arc<dyn Fn() -> EventBusResult<()> + Send + Sync + 'static>,
downstream_error: DownstreamErrorSlot,
}
impl SubscriberInterceptorAnyChain {
pub(crate) fn with_downstream_error(
next: Arc<dyn Fn() -> EventBusResult<()> + Send + Sync + 'static>,
downstream_error: DownstreamErrorSlot,
) -> Self {
Self {
next,
downstream_error,
}
}
pub fn proceed(&self) -> EventBusResult<()> {
match panic::catch_unwind(AssertUnwindSafe(|| (self.next)())) {
Ok(Ok(())) => Ok(()),
Ok(Err(error)) => {
record_downstream_error(&self.downstream_error, &error);
Err(error)
}
Err(_) => {
let error = EventBusError::handler_panicked();
record_downstream_error(&self.downstream_error, &error);
Err(error)
}
}
}
}
impl<T> SubscriberInterceptorChain<T>
where
T: Clone + Send + Sync + 'static,
{
pub(crate) fn with_downstream_error(
next: Arc<dyn Fn(EventEnvelope<T>) -> EventBusResult<()> + Send + Sync + 'static>,
downstream_error: DownstreamErrorSlot,
) -> Self {
Self {
next,
downstream_error,
}
}
pub fn proceed(&self, envelope: EventEnvelope<T>) -> EventBusResult<()> {
match panic::catch_unwind(AssertUnwindSafe(|| (self.next)(envelope))) {
Ok(Ok(())) => Ok(()),
Ok(Err(error)) => {
record_downstream_error(&self.downstream_error, &error);
Err(error)
}
Err(_) => {
let error = EventBusError::handler_panicked();
record_downstream_error(&self.downstream_error, &error);
Err(error)
}
}
}
}
pub(crate) fn create_downstream_error_slot() -> DownstreamErrorSlot {
Arc::new(Mutex::new(Vec::new()))
}
pub(crate) fn is_recorded_downstream_error(
downstream_error: &DownstreamErrorSlot,
error: &EventBusError,
) -> bool {
downstream_error
.lock()
.map(|recorded| recorded.contains(error))
.unwrap_or(false)
}
fn record_downstream_error(downstream_error: &DownstreamErrorSlot, error: &EventBusError) {
if let Ok(mut recorded) = downstream_error.lock()
&& !recorded.contains(error)
{
recorded.push(error.clone());
}
}