use std::any::Any;
use std::sync::{Arc, Mutex};
use std::time::Duration;
use tokio_util::sync::CancellationToken;
use tracing::error;
use super::handler::HandlerResult;
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
#[non_exhaustive]
pub enum FailurePolicy {
FailFast,
Drop,
Retry,
RetryAfter(Duration),
Skip,
}
impl FailurePolicy {
pub(crate) const fn settlement(self) -> Option<HandlerResult> {
match self {
Self::FailFast => None,
Self::Drop => Some(HandlerResult::drop()),
Self::Retry => Some(HandlerResult::retry()),
Self::RetryAfter(delay) => Some(HandlerResult::retry_after(delay)),
Self::Skip => Some(HandlerResult::Ack),
}
}
}
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
#[non_exhaustive]
pub struct FailurePolicies {
pub panic: FailurePolicy,
pub decode: FailurePolicy,
}
impl FailurePolicies {
#[must_use]
pub fn with_panic(mut self, policy: FailurePolicy) -> Self {
self.panic = policy;
self
}
#[must_use]
pub fn with_decode(mut self, policy: FailurePolicy) -> Self {
self.decode = policy;
self
}
}
impl Default for FailurePolicies {
fn default() -> Self {
Self {
panic: FailurePolicy::FailFast,
decode: FailurePolicy::Drop,
}
}
}
#[derive(Debug, Clone)]
pub(crate) struct ErrorShutdown {
token: CancellationToken,
failure: Arc<Mutex<Option<String>>>,
}
impl ErrorShutdown {
pub(crate) fn new(token: CancellationToken) -> Self {
Self {
token,
failure: Arc::new(Mutex::new(None)),
}
}
pub(crate) fn signal(&self, subscription: &str, reason: &str) {
error!(
target: "ruststream::dispatch",
subscription = %subscription,
reason = %reason,
"fail-fast: a dispatch failure is tearing the service down",
);
if let Ok(mut slot) = self.failure.lock() {
slot.get_or_insert_with(|| format!("{subscription}: {reason}"));
}
self.token.cancel();
}
pub(crate) fn taken_failure(&self) -> Option<String> {
self.failure.lock().ok().and_then(|mut slot| slot.take())
}
}
#[derive(Debug, Clone)]
pub(crate) struct DispatchFailure {
pub(crate) policies: FailurePolicies,
pub(crate) shutdown: ErrorShutdown,
}
impl DispatchFailure {
pub(crate) fn new(policies: FailurePolicies, shutdown: ErrorShutdown) -> Self {
Self { policies, shutdown }
}
}
pub(crate) fn panic_reason(payload: &(dyn Any + Send)) -> String {
payload.downcast_ref::<&'static str>().map_or_else(
|| {
payload
.downcast_ref::<String>()
.cloned()
.unwrap_or_else(|| "handler panicked".to_owned())
},
|s| (*s).to_owned(),
)
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn settlement_maps_every_non_fail_fast_policy() {
assert_eq!(FailurePolicy::FailFast.settlement(), None);
assert_eq!(
FailurePolicy::Drop.settlement(),
Some(HandlerResult::drop())
);
assert_eq!(
FailurePolicy::Retry.settlement(),
Some(HandlerResult::retry())
);
assert_eq!(
FailurePolicy::RetryAfter(Duration::from_secs(2)).settlement(),
Some(HandlerResult::retry_after(Duration::from_secs(2)))
);
assert_eq!(FailurePolicy::Skip.settlement(), Some(HandlerResult::Ack));
}
#[test]
fn defaults_are_fail_fast_panic_and_drop_decode() {
let policies = FailurePolicies::default();
assert_eq!(policies.panic, FailurePolicy::FailFast);
assert_eq!(policies.decode, FailurePolicy::Drop);
}
#[test]
fn builders_override_one_key_each() {
let policies = FailurePolicies::default()
.with_panic(FailurePolicy::Drop)
.with_decode(FailurePolicy::Skip);
assert_eq!(policies.panic, FailurePolicy::Drop);
assert_eq!(policies.decode, FailurePolicy::Skip);
}
#[test]
fn panic_reason_recovers_the_message() {
let as_str: &(dyn Any + Send) = &"boom";
assert_eq!(panic_reason(as_str), "boom");
let owned = String::from("owned boom");
let as_string: &(dyn Any + Send) = &owned;
assert_eq!(panic_reason(as_string), "owned boom");
let other: &(dyn Any + Send) = &42_u8;
assert_eq!(panic_reason(other), "handler panicked");
}
#[test]
fn signal_records_first_failure_and_cancels() {
let token = CancellationToken::new();
let shutdown = ErrorShutdown::new(token.clone());
assert!(!token.is_cancelled());
shutdown.signal("orders.inbound", "handler panicked");
assert!(token.is_cancelled());
shutdown.signal("other", "second");
assert_eq!(
shutdown.taken_failure().as_deref(),
Some("orders.inbound: handler panicked")
);
assert_eq!(shutdown.taken_failure(), None);
}
}