use std::any::Any;
use std::collections::HashMap;
use std::fmt;
use std::sync::Arc;
use std::time::{Duration, SystemTime};
/// Opaque numeric handle for a subscription.
///
/// The wrapped value is `pub(crate)`, so code outside this crate can read it
/// through [`SubscriptionId::as_u64`] but cannot construct an id directly.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub struct SubscriptionId(pub(crate) u64);

impl SubscriptionId {
    /// Returns the raw integer wrapped by this id.
    pub const fn as_u64(self) -> u64 {
        let Self(raw) = self;
        raw
    }
}

impl fmt::Display for SubscriptionId {
    /// Writes the raw integer in plain decimal.
    ///
    /// The number is rendered through a fresh `{}` placeholder, so width, fill
    /// and alignment flags given by the caller are not applied.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.write_fmt(format_args!("{}", self.as_u64()))
    }
}
/// Schedule that decides how long to wait before a given retry attempt.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum RetryStrategy {
    /// Always wait the same duration, whatever the attempt number.
    Fixed(Duration),
    /// Wait `base * 2^attempt`, never more than `max`.
    Exponential {
        /// Delay returned for attempt `0`; doubled for each later attempt.
        base: Duration,
        /// Upper bound on the returned delay.
        max: Duration,
    },
    /// Wait a pseudo-random duration in `0..=min(base * 2^attempt, max)`.
    ExponentialWithJitter {
        /// Un-jittered delay for attempt `0`; doubled for each later attempt.
        base: Duration,
        /// Upper bound on the un-jittered delay, and therefore on the result.
        max: Duration,
    },
}

impl RetryStrategy {
    /// Returns the delay to apply for retry number `attempt`.
    ///
    /// Attempt `0` yields `base` for the exponential variants; every following
    /// attempt doubles the delay until `max` is reached. All arithmetic
    /// saturates, so arbitrarily large `attempt` values simply return `max`.
    ///
    /// For [`RetryStrategy::ExponentialWithJitter`] the result is drawn
    /// pseudo-randomly from the whole range `0..=capped_delay`, so repeated
    /// calls with the same arguments generally return different values.
    pub fn delay_for_attempt(&self, attempt: usize) -> Duration {
        match *self {
            RetryStrategy::Fixed(delay) => delay,
            RetryStrategy::Exponential { base, max } => Self::capped_backoff(base, max, attempt),
            RetryStrategy::ExponentialWithJitter { base, max } => {
                let ceiling = Self::capped_backoff(base, max, attempt);
                if ceiling.is_zero() {
                    return Duration::ZERO;
                }
                // Work in u128 so that neither the nanosecond count nor the
                // `+ 1` for the inclusive upper bound can overflow or truncate.
                let span = ceiling.as_nanos() + 1;
                let jittered = u128::from(Self::jitter_entropy()) % span;
                // `jittered` never exceeds the 64-bit entropy value, so this
                // conversion cannot actually fail.
                Duration::from_nanos(u64::try_from(jittered).unwrap_or(u64::MAX))
            }
        }
    }

    /// Computes `min(base * 2^attempt, max)` with saturating arithmetic.
    fn capped_backoff(base: Duration, max: Duration, attempt: usize) -> Duration {
        // Shifts of 32 or more (including attempts that do not fit in a `u32`)
        // saturate to the largest multiplier instead of wrapping around.
        let factor = u32::try_from(attempt)
            .ok()
            .and_then(|shift| 1u32.checked_shl(shift))
            .unwrap_or(u32::MAX);
        base.saturating_mul(factor).min(max)
    }

    /// Produces a pseudo-random 64-bit value for jitter (not cryptographic).
    fn jitter_entropy() -> u64 {
        // Every `RandomState` carries randomly seeded keys that differ between
        // instances, which makes its hasher a dependency-free source of
        // well-mixed bits spanning the full 64-bit range. The wall clock's
        // sub-second part alone is below 1e9 and would limit jitter to < 1 s.
        let state = std::collections::hash_map::RandomState::new();
        let mut hasher = std::hash::BuildHasher::build_hasher(&state);
        let clock = SystemTime::now()
            .duration_since(std::time::UNIX_EPOCH)
            .unwrap_or_default();
        std::hash::Hasher::write_u32(&mut hasher, clock.subsec_nanos());
        std::hash::Hasher::finish(&hasher)
    }
}
/// Settings attached to a subscription.
///
/// NOTE(review): how each field is interpreted is decided by the dispatching
/// code, which is not part of this section; the field notes below only state
/// what is visible here.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct SubscriptionPolicy {
    /// Priority value; defaults to `0`.
    pub priority: i32,
    /// Retry budget; defaults to `0`.
    pub max_retries: usize,
    /// Optional back-off schedule; defaults to `None`.
    pub retry_strategy: Option<RetryStrategy>,
    /// Dead-letter flag; defaults to `true`.
    pub dead_letter: bool,
}

impl Default for SubscriptionPolicy {
    /// Priority `0`, no retries, no retry strategy, dead-lettering enabled.
    fn default() -> Self {
        Self {
            priority: 0,
            max_retries: 0,
            retry_strategy: None,
            dead_letter: true,
        }
    }
}

impl SubscriptionPolicy {
    /// Returns the policy with `priority` replaced.
    #[must_use]
    pub const fn with_priority(mut self, priority: i32) -> Self {
        self.priority = priority;
        self
    }

    /// Returns the policy with `max_retries` replaced.
    #[must_use]
    pub const fn with_max_retries(mut self, max_retries: usize) -> Self {
        self.max_retries = max_retries;
        self
    }

    /// Returns the policy with `retry_strategy` set to `Some(strategy)`.
    ///
    /// `const` like the other builders, so a complete policy can be assembled
    /// in a `const` or `static` initializer.
    #[must_use]
    pub const fn with_retry_strategy(mut self, strategy: RetryStrategy) -> Self {
        self.retry_strategy = Some(strategy);
        self
    }

    /// Returns the policy with `dead_letter` replaced.
    #[must_use]
    pub const fn with_dead_letter(mut self, dead_letter: bool) -> Self {
        self.dead_letter = dead_letter;
        self
    }
}
/// Reduced policy that carries only a priority and a dead-letter flag.
///
/// It has no retry-related fields; converting it into a `SubscriptionPolicy`
/// (see the `From` impl in this file) fills those with "no retries".
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct SyncSubscriptionPolicy {
    /// Priority value; defaults to `0`.
    pub priority: i32,
    /// Dead-letter flag; defaults to `true`.
    pub dead_letter: bool,
}

impl Default for SyncSubscriptionPolicy {
    /// Priority `0` with dead-lettering enabled.
    fn default() -> Self {
        Self {
            dead_letter: true,
            priority: 0,
        }
    }
}

impl SyncSubscriptionPolicy {
    /// Returns a copy of the policy that uses `priority`.
    pub const fn with_priority(self, priority: i32) -> Self {
        Self {
            priority,
            dead_letter: self.dead_letter,
        }
    }

    /// Returns a copy of the policy that uses `dead_letter`.
    pub const fn with_dead_letter(self, dead_letter: bool) -> Self {
        Self {
            priority: self.priority,
            dead_letter,
        }
    }
}
impl From<SyncSubscriptionPolicy> for SubscriptionPolicy {
fn from(policy: SyncSubscriptionPolicy) -> SubscriptionPolicy {
SubscriptionPolicy {
priority: policy.priority,
max_retries: 0,
retry_strategy: None,
dead_letter: policy.dead_letter,
}
}
}
/// Private module that keeps the `Sealed` marker trait out of reach of code
/// that cannot name this module.
mod sealed {
/// Method-less marker trait; `IntoSubscriptionPolicy` lists it as a supertrait.
pub trait Sealed {}
// The two policy types of the parent module are the only implementors here.
impl Sealed for super::SubscriptionPolicy {}
impl Sealed for super::SyncSubscriptionPolicy {}
}
/// Conversion into a [`SubscriptionPolicy`], parameterised by a marker type `M`.
///
/// The `sealed::Sealed` supertrait lives in a private module, so only types
/// that module implements the marker for can implement this trait.
///
/// NOTE(review): `M` looks like a handler-mode marker (the impls in this file
/// use the `crate::handler::*Mode` types) — confirm against the handler module.
pub trait IntoSubscriptionPolicy<M>: sealed::Sealed {
/// Consumes `self` and returns the corresponding [`SubscriptionPolicy`].
fn into_subscription_policy(self) -> SubscriptionPolicy;
}
impl IntoSubscriptionPolicy<crate::handler::AsyncMode> for SubscriptionPolicy {
fn into_subscription_policy(self) -> SubscriptionPolicy {
self
}
}
impl IntoSubscriptionPolicy<crate::handler::AsyncFnMode> for SubscriptionPolicy {
fn into_subscription_policy(self) -> SubscriptionPolicy {
self
}
}
impl IntoSubscriptionPolicy<crate::handler::AsyncMode> for SyncSubscriptionPolicy {
fn into_subscription_policy(self) -> SubscriptionPolicy {
self.into()
}
}
impl IntoSubscriptionPolicy<crate::handler::AsyncFnMode> for SyncSubscriptionPolicy {
fn into_subscription_policy(self) -> SubscriptionPolicy {
self.into()
}
}
impl IntoSubscriptionPolicy<crate::handler::SyncMode> for SyncSubscriptionPolicy {
fn into_subscription_policy(self) -> SubscriptionPolicy {
self.into()
}
}
impl IntoSubscriptionPolicy<crate::handler::SyncFnMode> for SyncSubscriptionPolicy {
fn into_subscription_policy(self) -> SubscriptionPolicy {
self.into()
}
}
/// Record describing an event whose handling failed.
///
/// NOTE(review): the code that creates these records is outside this section;
/// the field notes below are inferred from names and types — confirm against
/// the producer.
#[derive(Clone, Debug)]
pub struct DeadLetter {
/// Static name associated with the event (presumably its type name — verify).
pub event_name: &'static str,
/// Id of the subscription this record refers to.
pub subscription_id: SubscriptionId,
/// Attempt count recorded with the failure (presumably attempts made — verify).
pub attempts: usize,
/// Error text stored as an owned string.
pub error: String,
/// Type-erased, reference-counted handle to the event value; callers need a
/// downcast to get the concrete type back.
pub event: Arc<dyn Any + Send + Sync>,
/// Wall-clock timestamp stored with the record (presumably the failure time).
pub failed_at: SystemTime,
/// Optional static handler name; `None` when no name is available.
pub handler_name: Option<&'static str>,
}
/// Marker trait for values usable as events: anything that is `Send`, `Sync`
/// and `'static`.
pub trait Event: Send + Sync + 'static {}

/// Blanket impl: every type meeting the bounds is an [`Event`] automatically,
/// so the trait never has to be implemented by hand.
impl<T> Event for T where T: Send + Sync + 'static {}
/// Pairs a subscription id with an optional static name.
///
/// Used as the element type of `BusStats::subscriptions_by_event`.
#[derive(Debug, Clone)]
pub struct HandlerInfo {
/// Id of the subscription being described.
pub subscription_id: SubscriptionId,
/// Optional static name (presumably the handler's name — verify against the
/// code that fills it in); `None` when no name is available.
pub name: Option<&'static str>,
}
/// Plain-data snapshot of bus counters and registrations.
///
/// NOTE(review): the code that populates this struct is outside this section;
/// the field notes are inferred from names and types — confirm against it.
#[derive(Debug, Clone)]
pub struct BusStats {
/// Subscription count (presumably across all event types).
pub total_subscriptions: usize,
/// Handler descriptions grouped under a static string key (presumably the
/// event name).
pub subscriptions_by_event: HashMap<&'static str, Vec<HandlerInfo>>,
/// Static names of the registered event types.
pub registered_event_types: Vec<&'static str>,
/// Queue capacity (presumably mirrors the configured buffer size — verify).
pub queue_capacity: usize,
/// Number of publish permits currently available.
pub publish_permits_available: usize,
/// Number of publish operations in flight.
pub publish_in_flight: usize,
/// Number of async operations in flight (presumably async handlers — verify).
pub in_flight_async: usize,
/// Whether shutdown has been called.
pub shutdown_called: bool,
}
/// Crate-internal configuration values for the bus.
///
/// NOTE(review): the consumers of these settings are outside this section; the
/// field notes state the defaults set below and hedge everything else.
#[derive(Debug, Clone)]
pub(crate) struct BusConfig {
    /// Buffer size; defaults to `256`.
    pub buffer_size: usize,
    /// Optional handler timeout; `None` by default (presumably "no timeout").
    pub handler_timeout: Option<Duration>,
    /// Optional cap on concurrent async work; `None` by default (presumably
    /// "unlimited").
    pub max_concurrent_async: Option<usize>,
    /// Default subscription policy; defaults to `SubscriptionPolicy::default()`.
    pub default_subscription_policy: SubscriptionPolicy,
    /// Optional shutdown timeout; `None` by default.
    pub shutdown_timeout: Option<Duration>,
}

impl Default for BusConfig {
    /// A 256-slot buffer, the default subscription policy, and every optional
    /// limit left unset.
    fn default() -> Self {
        let default_subscription_policy = SubscriptionPolicy::default();
        Self {
            default_subscription_policy,
            buffer_size: 256,
            max_concurrent_async: None,
            handler_timeout: None,
            shutdown_timeout: None,
        }
    }
}