use std::any::Any;
use std::collections::HashMap;
use std::collections::hash_map::RandomState;
use std::fmt;
use std::hash::{BuildHasher, Hasher};
use std::sync::Arc;
use std::time::{Duration, SystemTime};
/// Identifier for a subscription, wrapping a raw `u64`.
///
/// The inner value is `pub(crate)`, so code outside the crate can read it
/// (via [`SubscriptionId::as_u64`] or `Display`) but cannot construct one.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub struct SubscriptionId(pub(crate) u64);

impl SubscriptionId {
    /// Returns the raw numeric value behind this identifier.
    pub const fn as_u64(self) -> u64 {
        let Self(raw) = self;
        raw
    }
}

impl fmt::Display for SubscriptionId {
    /// Writes the identifier as its bare decimal number.
    ///
    /// The value is emitted through a fresh `{}` format, so width/fill flags
    /// given by the caller's format string are not applied.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let Self(raw) = *self;
        f.write_fmt(format_args!("{}", raw))
    }
}
/// Strategy for computing the delay before a retry attempt.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum RetryStrategy {
    /// Always wait the same duration, whatever the attempt number.
    Fixed(Duration),
    /// Wait `base * 2^attempt`, never more than `max`.
    Exponential {
        /// Delay returned for attempt `0`.
        base: Duration,
        /// Upper bound on the returned delay.
        max: Duration,
    },
    /// Wait a pseudo-random duration in `[0, min(base * 2^attempt, max)]`.
    ExponentialWithJitter {
        /// Un-jittered delay for attempt `0`.
        base: Duration,
        /// Upper bound on the delay before jitter is applied.
        max: Duration,
    },
}

impl RetryStrategy {
    /// Returns the delay to use for the given zero-based `attempt`.
    ///
    /// * `Fixed` returns its duration unchanged.
    /// * `Exponential` returns `base * 2^attempt`, capped at `max`.
    /// * `ExponentialWithJitter` computes the same capped value and then
    ///   returns a pseudo-random duration between zero and that value
    ///   (inclusive).
    ///
    /// Arbitrarily large `attempt` values are safe: they saturate to the cap
    /// instead of wrapping around to a small delay.
    pub fn delay_for_attempt(&self, attempt: usize) -> Duration {
        match *self {
            RetryStrategy::Fixed(delay) => delay,
            RetryStrategy::Exponential { base, max } => Self::capped_backoff(base, max, attempt),
            RetryStrategy::ExponentialWithJitter { base, max } => {
                Self::jitter(Self::capped_backoff(base, max, attempt))
            }
        }
    }

    /// Computes `base * 2^attempt` with saturation, capped at `max`.
    ///
    /// The doubling multiplier itself saturates at `u32::MAX` (the widest
    /// factor `Duration::saturating_mul` accepts).
    fn capped_backoff(base: Duration, max: Duration, attempt: usize) -> Duration {
        // A checked conversion is required here: `attempt as u32` would wrap
        // on 64-bit targets, making e.g. attempt 2^32 behave like attempt 0.
        let factor = u32::try_from(attempt)
            .ok()
            .and_then(|shift| 1u64.checked_shl(shift))
            .and_then(|wide| u32::try_from(wide).ok())
            .unwrap_or(u32::MAX);
        base.saturating_mul(factor).min(max)
    }

    /// Picks a pseudo-random duration in `[0, ceiling]`.
    ///
    /// Entropy comes from hashing a process-wide counter with a freshly
    /// seeded `RandomState`; this is jitter, not cryptographic randomness.
    fn jitter(ceiling: Duration) -> Duration {
        use std::sync::atomic::{AtomicU64, Ordering};
        static COUNTER: AtomicU64 = AtomicU64::new(0);

        // Ceilings above `u64::MAX` nanoseconds (~584 years) saturate rather
        // than being truncated to an unrelated small (possibly zero) range.
        let nanos = u64::try_from(ceiling.as_nanos()).unwrap_or(u64::MAX);
        if nanos == 0 {
            return Duration::ZERO;
        }

        let tick = COUNTER.fetch_add(1, Ordering::Relaxed);
        let mut hasher = RandomState::new().build_hasher();
        hasher.write_u64(tick);
        let entropy = hasher.finish();

        // `nanos + 1` makes the range inclusive; when it would overflow the
        // full `u64` range is already the answer.
        let jittered = match nanos.checked_add(1) {
            Some(modulus) => entropy % modulus,
            None => entropy,
        };
        Duration::from_nanos(jittered)
    }
}
/// Retry and dead-letter settings for an asynchronous subscription.
///
/// This type only carries the settings; how they are interpreted is decided
/// by whatever consumes the policy.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct AsyncSubscriptionPolicy {
    /// Retry budget. NOTE(review): presumably the number of retries after the
    /// first failed attempt — confirm against the dispatcher.
    pub max_retries: usize,
    /// Strategy used to compute the delay between retries, if any.
    pub retry_strategy: Option<RetryStrategy>,
    /// Whether dead-lettering is enabled for this subscription.
    pub dead_letter: bool,
}

impl Default for AsyncSubscriptionPolicy {
    /// No retries, no retry strategy, dead-lettering enabled.
    fn default() -> Self {
        Self {
            max_retries: 0,
            retry_strategy: None,
            dead_letter: true,
        }
    }
}

impl AsyncSubscriptionPolicy {
    /// Returns the policy with `max_retries` replaced.
    pub const fn with_max_retries(mut self, max_retries: usize) -> Self {
        self.max_retries = max_retries;
        self
    }

    /// Returns the policy with `retry_strategy` set to `Some(strategy)`.
    ///
    /// `const` like the other builders, so a complete policy can be assembled
    /// in a `const`/`static` initializer.
    pub const fn with_retry_strategy(mut self, strategy: RetryStrategy) -> Self {
        self.retry_strategy = Some(strategy);
        self
    }

    /// Returns the policy with the `dead_letter` flag replaced.
    pub const fn with_dead_letter(mut self, dead_letter: bool) -> Self {
        self.dead_letter = dead_letter;
        self
    }
}
/// Priority and dead-letter settings for a synchronous subscription.
///
/// This type only carries the settings; their interpretation belongs to the
/// code that consumes the policy.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct SyncSubscriptionPolicy {
    /// Priority value. NOTE(review): ordering direction (higher-first vs.
    /// lower-first) is not visible here — confirm against the dispatcher.
    pub priority: i32,
    /// Whether dead-lettering is enabled for this subscription.
    pub dead_letter: bool,
}

impl Default for SyncSubscriptionPolicy {
    /// Priority `0`, dead-lettering enabled.
    fn default() -> Self {
        Self {
            dead_letter: true,
            priority: 0,
        }
    }
}

impl SyncSubscriptionPolicy {
    /// Returns a copy of the policy with `priority` replaced.
    pub const fn with_priority(self, priority: i32) -> Self {
        Self { priority, ..self }
    }

    /// Returns a copy of the policy with the `dead_letter` flag replaced.
    pub const fn with_dead_letter(self, dead_letter: bool) -> Self {
        Self {
            dead_letter,
            ..self
        }
    }
}
/// Pair of subscription policies, one asynchronous and one synchronous.
///
/// `Default` is derived, so each field starts from its own type's `Default`.
/// NOTE(review): presumably these are the fallbacks used when a subscriber
/// does not supply a policy — confirm at the call site.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub struct SubscriptionDefaults {
/// Policy for asynchronous subscriptions.
pub policy: AsyncSubscriptionPolicy,
/// Policy for synchronous subscriptions.
pub sync_policy: SyncSubscriptionPolicy,
}
/// Record describing an event that ended up as a dead letter.
///
/// Cloning is cheap with respect to the payload: `event` is an `Arc`, so
/// clones share the same underlying value.
#[derive(Clone, Debug)]
pub struct DeadLetter {
/// Name of the event. NOTE(review): presumably the event's type name — confirm at the producer.
pub event_name: &'static str,
/// Subscription this record refers to.
pub subscription_id: SubscriptionId,
/// Attempt count recorded for this event (NOTE(review): confirm whether the first attempt is included).
pub attempts: usize,
/// Error description, as text.
pub error: String,
/// Type-erased event payload; recover the concrete type with `downcast_ref`.
pub event: Arc<dyn Any + Send + Sync>,
/// Wall-clock timestamp recorded for the failure.
pub failed_at: SystemTime,
/// Handler name, when one is available.
pub handler_name: Option<&'static str>,
}
/// Marker trait for values usable as events.
///
/// It has no methods; it only bundles the `Send + Sync + 'static` bounds
/// under a single name.
pub trait Event: Send + Sync + 'static {}

/// Blanket implementation: every sized `Send + Sync + 'static` type is an
/// [`Event`], so no manual impls are needed.
impl<T> Event for T where T: Send + Sync + 'static {}
/// Identifying information about one handler: its subscription id and an
/// optional name.
#[derive(Debug, Clone)]
pub struct HandlerInfo {
/// Subscription the handler belongs to.
pub subscription_id: SubscriptionId,
/// Handler name, when one is available.
pub name: Option<&'static str>,
}
/// Statistics about a bus: subscription counts, known event types and
/// in-flight work.
///
/// NOTE(review): presumably a point-in-time snapshot — how and when the
/// values are gathered is not visible in this file section.
#[derive(Debug, Clone)]
pub struct BusStats {
/// Total number of subscriptions.
pub total_subscriptions: usize,
/// Handlers grouped by event name.
pub subscriptions_by_event: HashMap<&'static str, Vec<HandlerInfo>>,
/// Names of the registered event types.
pub registered_event_types: Vec<&'static str>,
/// Number of dispatches in flight.
pub dispatches_in_flight: usize,
/// Number of asynchronous operations in flight (NOTE(review): presumably async handler invocations — confirm).
pub in_flight_async: usize,
/// Whether shutdown has been called.
pub shutdown_called: bool,
}
/// Crate-internal bus configuration.
///
/// `Default` is derived: every `Option` field starts as `None` and
/// `subscription_defaults` uses `SubscriptionDefaults::default()`.
/// NOTE(review): what `None` means for each limit (presumably "no limit") is
/// decided by the consumer of this config — confirm there.
#[derive(Debug, Clone, Default)]
pub(crate) struct BusConfig {
/// Optional timeout for a handler.
pub handler_timeout: Option<Duration>,
/// Optional cap on concurrent asynchronous work.
pub max_concurrent_async: Option<usize>,
/// Default subscription policies.
pub subscription_defaults: SubscriptionDefaults,
/// Optional timeout for shutdown.
pub shutdown_timeout: Option<Duration>,
}