use std::any::Any;
use std::collections::HashMap;
use std::fmt;
use std::sync::Arc;
use std::time::{Duration, SystemTime};
/// Identifier of a single subscription.
///
/// A thin `Copy` wrapper around a `u64`. The inner value is only writable
/// from within this crate; external code reads it through [`as_u64`] or the
/// `Display` implementation.
///
/// [`as_u64`]: SubscriptionId::as_u64
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub struct SubscriptionId(pub(crate) u64);

impl SubscriptionId {
    /// Returns the raw numeric value of this identifier.
    pub const fn as_u64(self) -> u64 {
        self.0
    }
}

impl fmt::Display for SubscriptionId {
    /// Formats the identifier as its decimal `u64` value.
    ///
    /// Forwards to the inner integer's `Display` implementation so that
    /// formatter options such as width, fill and alignment (`{:>8}`,
    /// `{:08}`) are honoured rather than silently discarded.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        fmt::Display::fmt(&self.0, f)
    }
}
/// Strategy used to compute how long to wait before a retry attempt.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum RetryStrategy {
    /// Always wait the same duration, regardless of the attempt number.
    Fixed(Duration),
    /// Wait `base * 2^attempt`, never more than `max`.
    Exponential {
        /// Delay used for attempt `0`; doubled for every further attempt.
        base: Duration,
        /// Upper bound on the returned delay.
        max: Duration,
    },
    /// Wait a pseudo-random duration between zero and the capped exponential
    /// delay (`min(base * 2^attempt, max)`), inclusive.
    ExponentialWithJitter {
        /// Delay ceiling for attempt `0`; doubled for every further attempt.
        base: Duration,
        /// Upper bound on the delay ceiling (and therefore on the result).
        max: Duration,
    },
}

impl RetryStrategy {
    /// Returns the delay to wait before retry number `attempt`.
    ///
    /// `attempt` is zero-based: for the exponential strategies attempt `0`
    /// uses `base` unchanged, attempt `1` doubles it, and so on. All
    /// arithmetic saturates, so arbitrarily large attempt numbers simply
    /// yield `max` (or a jittered value no greater than `max`).
    ///
    /// For [`RetryStrategy::ExponentialWithJitter`] the result is
    /// non-deterministic. Jitter is computed in whole nanoseconds held in a
    /// `u64`, so ceilings beyond `u64::MAX` nanoseconds (roughly 584 years)
    /// are treated as `u64::MAX` nanoseconds.
    pub fn delay_for_attempt(&self, attempt: usize) -> Duration {
        match *self {
            RetryStrategy::Fixed(delay) => delay,
            RetryStrategy::Exponential { base, max } => {
                Self::capped_exponential(base, max, attempt)
            }
            RetryStrategy::ExponentialWithJitter { base, max } => {
                let ceiling = Self::capped_exponential(base, max, attempt);
                // Saturate instead of truncating: an `as u64` cast would wrap
                // very large ceilings to an unrelated small value.
                let ceiling_nanos = u64::try_from(ceiling.as_nanos()).unwrap_or(u64::MAX);
                if ceiling_nanos == 0 {
                    return Duration::ZERO;
                }
                let entropy = Self::jitter_entropy();
                // `ceiling_nanos + 1` makes the range inclusive; when that
                // would overflow, every `u64` is already within range.
                let jittered = match ceiling_nanos.checked_add(1) {
                    Some(span) => entropy % span,
                    None => entropy,
                };
                Duration::from_nanos(jittered)
            }
        }
    }

    /// Computes `min(base * 2^attempt, max)` with saturating arithmetic.
    fn capped_exponential(base: Duration, max: Duration, attempt: usize) -> Duration {
        // Convert the attempt with `try_from` rather than `as`: a truncating
        // cast would turn e.g. attempt 2^32 into a shift of 0. Any shift that
        // does not fit in a `u32` factor saturates to `u32::MAX`.
        let factor = u32::try_from(attempt)
            .ok()
            .and_then(|shift| 1u32.checked_shl(shift))
            .unwrap_or(u32::MAX);
        base.saturating_mul(factor).min(max)
    }

    /// Produces 64 bits of non-cryptographic entropy for jitter.
    ///
    /// The current wall-clock time is hashed with a randomly keyed std
    /// hasher so the value spans the whole `u64` range. Using only the
    /// sub-second nanoseconds of the clock (always below 10^9) would cap
    /// every jittered delay below one second.
    fn jitter_entropy() -> u64 {
        let now = SystemTime::now()
            .duration_since(std::time::UNIX_EPOCH)
            .unwrap_or_default();
        let state = std::collections::hash_map::RandomState::new();
        let mut hasher = std::hash::BuildHasher::build_hasher(&state);
        std::hash::Hasher::write_u128(&mut hasher, now.as_nanos());
        std::hash::Hasher::finish(&hasher)
    }
}
/// Settings describing how a failed handler invocation is treated.
///
/// Built with `FailurePolicy::default()` followed by the `with_*` builder
/// methods, all of which are `const fn` so a policy can also be assembled in
/// a `const`/`static` from a struct literal.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct FailurePolicy {
    /// Maximum number of retries. `0` (the default) means no retries.
    pub max_retries: usize,
    /// Strategy used to compute the delay between retries, if any.
    pub retry_strategy: Option<RetryStrategy>,
    /// Dead-letter flag; `true` by default. Presumably controls whether a
    /// dead-letter record is produced once handling has ultimately failed
    /// (confirm against the bus implementation).
    pub dead_letter: bool,
}

impl Default for FailurePolicy {
    /// Returns a policy with no retries, no retry strategy and
    /// dead-lettering enabled.
    fn default() -> Self {
        Self {
            max_retries: 0,
            retry_strategy: None,
            dead_letter: true,
        }
    }
}

impl FailurePolicy {
    /// Returns the policy with `max_retries` replaced.
    #[must_use]
    pub const fn with_max_retries(mut self, max_retries: usize) -> Self {
        self.max_retries = max_retries;
        self
    }

    /// Returns the policy with its retry strategy set to `strategy`.
    #[must_use]
    pub const fn with_retry_strategy(mut self, strategy: RetryStrategy) -> Self {
        self.retry_strategy = Some(strategy);
        self
    }

    /// Returns the policy with the dead-letter flag replaced.
    #[must_use]
    pub const fn with_dead_letter(mut self, dead_letter: bool) -> Self {
        self.dead_letter = dead_letter;
        self
    }
}
/// Failure policy that carries no retry settings at all.
///
/// The only configurable aspect is the dead-letter flag; converting into a
/// `FailurePolicy` yields zero retries and no retry strategy.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct NoRetryPolicy {
    /// Dead-letter flag; `true` by default.
    pub dead_letter: bool,
}

impl Default for NoRetryPolicy {
    /// Returns a policy with the dead-letter flag enabled.
    fn default() -> Self {
        NoRetryPolicy { dead_letter: true }
    }
}

impl NoRetryPolicy {
    /// Returns a copy of the policy with the dead-letter flag replaced.
    pub const fn with_dead_letter(self, dead_letter: bool) -> Self {
        let mut updated = self;
        updated.dead_letter = dead_letter;
        updated
    }
}
impl From<NoRetryPolicy> for FailurePolicy {
fn from(policy: NoRetryPolicy) -> FailurePolicy {
FailurePolicy {
max_retries: 0,
retry_strategy: None,
dead_letter: policy.dead_letter,
}
}
}
// Private module holding the marker trait used as a supertrait of
// `IntoFailurePolicy`. Because the module is not `pub`, code outside this
// module tree cannot name `Sealed` and therefore cannot add implementations.
mod sealed {
/// Marker trait; implemented only for the policy types listed below.
pub trait Sealed {}
impl Sealed for super::FailurePolicy {}
impl Sealed for super::NoRetryPolicy {}
}
/// Conversion of a policy value into a [`FailurePolicy`], parameterised by a
/// marker type `M`.
///
/// The trait requires `sealed::Sealed`, so it can only be implemented for
/// types that implement that private marker trait. In this file `M` is
/// instantiated with marker types from `crate::handler`; which policy types
/// implement the trait for which marker determines the combinations that
/// type-check.
pub trait IntoFailurePolicy<M>: sealed::Sealed {
/// Consumes `self` and returns the equivalent [`FailurePolicy`].
fn into_failure_policy(self) -> FailurePolicy;
}
// `FailurePolicy` implements `IntoFailurePolicy` only for the `AsyncMode`
// and `AsyncFnMode` markers in this file; there is no implementation for the
// sync markers. NOTE(review): presumably retry-capable policies are
// restricted to async handlers by design - confirm.
/// Identity conversion for the `AsyncMode` marker.
impl IntoFailurePolicy<crate::handler::AsyncMode> for FailurePolicy {
fn into_failure_policy(self) -> FailurePolicy {
self
}
}
/// Identity conversion for the `AsyncFnMode` marker.
impl IntoFailurePolicy<crate::handler::AsyncFnMode> for FailurePolicy {
fn into_failure_policy(self) -> FailurePolicy {
self
}
}
impl IntoFailurePolicy<crate::handler::AsyncMode> for NoRetryPolicy {
fn into_failure_policy(self) -> FailurePolicy {
self.into()
}
}
impl IntoFailurePolicy<crate::handler::AsyncFnMode> for NoRetryPolicy {
fn into_failure_policy(self) -> FailurePolicy {
self.into()
}
}
impl IntoFailurePolicy<crate::handler::SyncMode> for NoRetryPolicy {
fn into_failure_policy(self) -> FailurePolicy {
self.into()
}
}
impl IntoFailurePolicy<crate::handler::SyncFnMode> for NoRetryPolicy {
fn into_failure_policy(self) -> FailurePolicy {
self.into()
}
}
/// Record describing a failed event delivery.
///
/// NOTE(review): the field descriptions below are inferred from names and
/// types only; the code that constructs this struct is not in this file.
#[derive(Clone, Debug)]
pub struct DeadLetter {
/// Name of the event (presumably its type name - confirm).
pub event_name: &'static str,
/// Subscription the failure is associated with.
pub subscription_id: SubscriptionId,
/// Attempt count (presumably the number of deliveries tried - confirm).
pub attempts: usize,
/// Error text.
pub error: String,
/// The event value, type-erased behind a shared pointer; recover the
/// concrete type with `downcast_ref` / `Arc::downcast`.
pub event: Arc<dyn Any + Send + Sync>,
/// Wall-clock timestamp of the failure.
pub failed_at: SystemTime,
/// Optional listener name, when one is available.
pub listener_name: Option<&'static str>,
}
/// Marker trait for event types: anything that is `Send + Sync + 'static`.
pub trait Event: Send + Sync + 'static {}
// Blanket implementation: every `Send + Sync + 'static` type is an `Event`,
// so the trait never needs to be implemented by hand.
impl<T: Send + Sync + 'static> Event for T {}
/// Identifies a listener by subscription id and optional name.
#[derive(Debug, Clone)]
pub struct ListenerInfo {
/// Identifier of the listener's subscription.
pub subscription_id: SubscriptionId,
/// Optional listener name.
pub name: Option<&'static str>,
}
/// Statistics about a bus.
///
/// NOTE(review): the field descriptions are inferred from names and types;
/// the code that fills this struct is not in this file.
#[derive(Debug, Clone)]
pub struct BusStats {
/// Total number of subscriptions.
pub total_subscriptions: usize,
/// Listeners grouped by event name.
pub subscriptions_by_event: HashMap<&'static str, Vec<ListenerInfo>>,
/// Names of the registered event types.
pub registered_event_types: Vec<&'static str>,
/// Queue capacity (presumably mirrors `BusConfig::buffer_size` - confirm).
pub queue_capacity: usize,
/// Number of async operations in flight (presumably handler invocations - confirm).
pub in_flight_async: usize,
/// Whether shutdown has been called.
pub shutdown_called: bool,
}
/// Crate-internal bus configuration.
///
/// NOTE(review): units and exact semantics of the fields are not visible in
/// this file; the descriptions below are inferred from names and types.
#[derive(Debug, Clone)]
pub(crate) struct BusConfig {
/// Buffer size; defaults to 256.
pub buffer_size: usize,
/// Optional timeout applied to handlers; `None` by default.
pub handler_timeout: Option<Duration>,
/// Optional limit on concurrent async work; `None` by default.
pub max_concurrent_async: Option<usize>,
/// Failure policy used by default; defaults to `FailurePolicy::default()`.
pub default_failure_policy: FailurePolicy,
/// Optional timeout applied to shutdown; `None` by default.
pub shutdown_timeout: Option<Duration>,
}
impl Default for BusConfig {
fn default() -> Self {
Self {
buffer_size: 256,
handler_timeout: None,
max_concurrent_async: None,
default_failure_policy: FailurePolicy::default(),
shutdown_timeout: None,
}
}
}