Skip to main content

jaeb/
types.rs

1use std::any::Any;
2use std::collections::HashMap;
3use std::fmt;
4use std::sync::Arc;
5use std::time::{Duration, SystemTime};
6
7/// Unique identifier for a listener or middleware registration.
8///
9/// Assigned by the bus at subscription time and remains stable for the
10/// lifetime of the registration. Obtain it from
11/// [`Subscription::id`](crate::Subscription::id) or
12/// [`SubscriptionGuard::id`](crate::SubscriptionGuard::id).
13#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
14pub struct SubscriptionId(pub(crate) u64);
15
16impl SubscriptionId {
17    /// Return the raw numeric value of the identifier.
18    ///
19    /// Useful for logging and tracing; do not rely on the magnitude or
20    /// sequence of values.
21    pub const fn as_u64(self) -> u64 {
22        self.0
23    }
24}
25
26impl fmt::Display for SubscriptionId {
27    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
28        write!(f, "{}", self.0)
29    }
30}
31
32/// Strategy for computing the delay between retry attempts.
33///
34/// Used by [`SubscriptionPolicy`] to control back-off behaviour when a handler
35/// fails and is eligible for retry.
36#[derive(Debug, Clone, Copy, PartialEq, Eq)]
37pub enum RetryStrategy {
38    /// Wait a fixed duration between each retry.
39    Fixed(Duration),
40
41    /// Exponential back-off: the delay doubles on each attempt, capped at
42    /// `max`.
43    ///
44    /// The delay for attempt *n* (0-indexed) is `min(base * 2^n, max)`.
45    Exponential {
46        /// Initial delay (attempt 0).
47        base: Duration,
48        /// Upper bound on the delay.
49        max: Duration,
50    },
51
52    /// Exponential back-off with random jitter added to each delay.
53    ///
54    /// The computed delay is `rand(0 ..= min(base * 2^n, max))`.
55    ///
56    /// Jitter is derived from [`SystemTime`] nanoseconds
57    /// — lightweight but **not** cryptographically random.
58    ExponentialWithJitter {
59        /// Initial delay (attempt 0).
60        base: Duration,
61        /// Upper bound on the delay before jitter.
62        max: Duration,
63    },
64}
65
66impl RetryStrategy {
67    /// Compute the delay for the given zero-based retry attempt.
68    pub fn delay_for_attempt(&self, attempt: usize) -> Duration {
69        match *self {
70            RetryStrategy::Fixed(d) => d,
71            RetryStrategy::Exponential { base, max } => {
72                let factor = 1u64.checked_shl(attempt as u32).unwrap_or(u64::MAX);
73                let factor = u32::try_from(factor).unwrap_or(u32::MAX);
74                let delay = base.saturating_mul(factor);
75                if delay > max { max } else { delay }
76            }
77            RetryStrategy::ExponentialWithJitter { base, max } => {
78                let factor = 1u64.checked_shl(attempt as u32).unwrap_or(u64::MAX);
79                let factor = u32::try_from(factor).unwrap_or(u32::MAX);
80                let delay = base.saturating_mul(factor);
81                let capped = if delay > max { max } else { delay };
82                // Simple jitter: pick a random fraction of the capped delay.
83                // We use a lightweight approach without pulling in the `rand` crate.
84                let nanos = capped.as_nanos() as u64;
85                if nanos == 0 {
86                    Duration::ZERO
87                } else {
88                    // Use the current time's nanoseconds as a cheap entropy source.
89                    let entropy = SystemTime::now().duration_since(std::time::UNIX_EPOCH).unwrap_or_default().subsec_nanos() as u64;
90                    let jittered = entropy % (nanos + 1);
91                    Duration::from_nanos(jittered)
92                }
93            }
94        }
95    }
96}
97
98/// Policy controlling how a subscription is scheduled and how failures are treated.
99///
100/// - `priority`: listener ordering hint. Higher values are dispatched first
101///   within the same dispatch lane (sync or async). Equal priorities keep
102///   FIFO registration order.
103/// - `max_retries`: how many *additional* attempts after the first failure
104///   (0 means no retries). **Only supported for async handlers.** Sync
105///   handlers must use [`SyncSubscriptionPolicy`] instead; attempting to pass a
106///   `SubscriptionPolicy` to a sync handler via
107///   [`subscribe_with_policy`](crate::EventBus::subscribe_with_policy) is a
108///   compile-time error.
109/// - `retry_strategy`: optional [`RetryStrategy`] controlling the delay
110///   between retries. When `None`, retries happen immediately. Ignored when
111///   `max_retries` is 0. Only applies to async handlers.
112/// - `dead_letter`: whether a [`DeadLetter`] event is emitted after all
113///   attempts are exhausted (or on first failure for sync handlers).
114///   Automatically forced to `false` for dead-letter listeners to prevent
115///   infinite recursion.
116///
117/// All fields are public for convenience; invalid combinations (e.g.
118/// `retry_strategy` set with `max_retries: 0`) are harmless but have no effect.
119#[derive(Debug, Clone, Copy, PartialEq, Eq)]
120pub struct SubscriptionPolicy {
121    /// Listener ordering hint (higher runs first).
122    ///
123    /// Applied independently within sync and async lanes.
124    pub priority: i32,
125    /// Number of additional attempts after the first failure (0 = no retries).
126    ///
127    /// Only honoured for async handlers. Sync handlers always behave as if
128    /// this is `0`.
129    pub max_retries: usize,
130    /// Optional back-off strategy between retry attempts.
131    ///
132    /// `None` means retry immediately. Ignored when `max_retries` is `0`.
133    pub retry_strategy: Option<RetryStrategy>,
134    /// Emit a [`DeadLetter`] event after all attempts are exhausted.
135    ///
136    /// Automatically forced to `false` for dead-letter listeners.
137    pub dead_letter: bool,
138}
139
140impl Default for SubscriptionPolicy {
141    fn default() -> Self {
142        Self {
143            priority: 0,
144            max_retries: 0,
145            retry_strategy: None,
146            dead_letter: true,
147        }
148    }
149}
150
151impl SubscriptionPolicy {
152    /// Set listener priority (builder-style).
153    ///
154    /// Higher values are dispatched first within each lane.
155    pub const fn with_priority(mut self, priority: i32) -> Self {
156        self.priority = priority;
157        self
158    }
159
160    /// Set the maximum number of retries (builder-style).
161    ///
162    /// `0` disables retries. Only applicable to async handlers.
163    pub const fn with_max_retries(mut self, max_retries: usize) -> Self {
164        self.max_retries = max_retries;
165        self
166    }
167
168    /// Set a custom [`RetryStrategy`].
169    pub fn with_retry_strategy(mut self, strategy: RetryStrategy) -> Self {
170        self.retry_strategy = Some(strategy);
171        self
172    }
173
174    /// Enable or disable dead-letter emission for this policy (builder-style).
175    pub const fn with_dead_letter(mut self, dead_letter: bool) -> Self {
176        self.dead_letter = dead_letter;
177        self
178    }
179}
180
181/// Subscription policy for handlers that do not support retries.
182///
183/// This type is accepted by [`subscribe_with_policy`](crate::EventBus::subscribe_with_policy)
184/// for sync handlers and by [`subscribe_once_with_policy`](crate::EventBus::subscribe_once_with_policy)
185/// for all handler types. It contains only the `dead_letter` flag since
186/// retry-related fields (`max_retries`, `retry_strategy`) are not applicable.
187///
188/// Use [`SubscriptionPolicy`] instead when subscribing async handlers that need
189/// retry support.
190#[derive(Debug, Clone, Copy, PartialEq, Eq)]
191pub struct SyncSubscriptionPolicy {
192    /// Listener ordering hint (higher runs first).
193    pub priority: i32,
194    /// Emit a [`DeadLetter`] event on failure.
195    ///
196    /// Automatically forced to `false` for dead-letter listeners.
197    pub dead_letter: bool,
198}
199
200impl Default for SyncSubscriptionPolicy {
201    fn default() -> Self {
202        Self {
203            priority: 0,
204            dead_letter: true,
205        }
206    }
207}
208
209impl SyncSubscriptionPolicy {
210    /// Set listener priority (builder-style).
211    pub const fn with_priority(mut self, priority: i32) -> Self {
212        self.priority = priority;
213        self
214    }
215
216    /// Enable or disable dead-letter emission for this policy (builder-style).
217    pub const fn with_dead_letter(mut self, dead_letter: bool) -> Self {
218        self.dead_letter = dead_letter;
219        self
220    }
221}
222
223impl From<SyncSubscriptionPolicy> for SubscriptionPolicy {
224    fn from(policy: SyncSubscriptionPolicy) -> SubscriptionPolicy {
225        SubscriptionPolicy {
226            priority: policy.priority,
227            max_retries: 0,
228            retry_strategy: None,
229            dead_letter: policy.dead_letter,
230        }
231    }
232}
233
234// ── IntoSubscriptionPolicy ───────────────────────────────────────────────────
235
236mod sealed {
237    pub trait Sealed {}
238    impl Sealed for super::SubscriptionPolicy {}
239    impl Sealed for super::SyncSubscriptionPolicy {}
240}
241
242/// Trait that converts a policy type into a [`SubscriptionPolicy`] suitable for
243/// the
244/// handler's dispatch mode.
245///
246/// This trait is **sealed** — it cannot be implemented outside this crate.
247///
248/// The marker type `M` ([`AsyncMode`](crate::handler::AsyncMode),
249/// [`SyncMode`](crate::handler::SyncMode),
250/// [`AsyncFnMode`](crate::handler::AsyncFnMode), or
251/// [`SyncFnMode`](crate::handler::SyncFnMode)) is inferred from the handler via
252/// [`IntoHandler<E, M>`](crate::handler::IntoHandler), so callers never need
253/// to specify it explicitly. The type system enforces:
254///
255/// - **Async handlers** accept both [`SubscriptionPolicy`] (full retry
256///   support) and [`SyncSubscriptionPolicy`] (dead-letter only, no retries).
257/// - **Sync handlers** accept only [`SyncSubscriptionPolicy`]. Passing a
258///   [`SubscriptionPolicy`] to a sync handler is a compile-time error.
259pub trait IntoSubscriptionPolicy<M>: sealed::Sealed {
260    /// Convert into the internal [`SubscriptionPolicy`] representation.
261    fn into_subscription_policy(self) -> SubscriptionPolicy;
262}
263
264#[allow(dead_code)]
265#[deprecated(since = "0.3.3", note = "renamed to SubscriptionPolicy")]
266pub type FailurePolicy = SubscriptionPolicy;
267
268#[allow(dead_code)]
269#[deprecated(since = "0.3.3", note = "renamed to SyncSubscriptionPolicy")]
270pub type NoRetryPolicy = SyncSubscriptionPolicy;
271
272#[allow(unused_imports)]
273#[deprecated(since = "0.3.3", note = "renamed to IntoSubscriptionPolicy")]
274pub use IntoSubscriptionPolicy as IntoFailurePolicy;
275
276impl IntoSubscriptionPolicy<crate::handler::AsyncMode> for SubscriptionPolicy {
277    fn into_subscription_policy(self) -> SubscriptionPolicy {
278        self
279    }
280}
281
282impl IntoSubscriptionPolicy<crate::handler::AsyncFnMode> for SubscriptionPolicy {
283    fn into_subscription_policy(self) -> SubscriptionPolicy {
284        self
285    }
286}
287
288impl IntoSubscriptionPolicy<crate::handler::AsyncMode> for SyncSubscriptionPolicy {
289    fn into_subscription_policy(self) -> SubscriptionPolicy {
290        self.into()
291    }
292}
293
294impl IntoSubscriptionPolicy<crate::handler::AsyncFnMode> for SyncSubscriptionPolicy {
295    fn into_subscription_policy(self) -> SubscriptionPolicy {
296        self.into()
297    }
298}
299
300impl IntoSubscriptionPolicy<crate::handler::SyncMode> for SyncSubscriptionPolicy {
301    fn into_subscription_policy(self) -> SubscriptionPolicy {
302        self.into()
303    }
304}
305
306impl IntoSubscriptionPolicy<crate::handler::SyncFnMode> for SyncSubscriptionPolicy {
307    fn into_subscription_policy(self) -> SubscriptionPolicy {
308        self.into()
309    }
310}
311
312/// A dead-letter record emitted when a handler exhausts all retry attempts.
313///
314/// The original error is stored as a [`String`] rather than a typed error
315/// because `DeadLetter` must be `Clone` (it is published as an event) and
316/// `Box<dyn Error>` does not implement `Clone`. Use [`error`](Self::error)
317/// for diagnostics or pattern-match on the stringified message.
318///
319/// The original event payload is available via [`event`](Self::event) as a
320/// type-erased `Arc`. Consumers can call
321/// `dead_letter.event.downcast_ref::<OriginalEvent>()` to inspect the
322/// payload that caused the failure.
323#[derive(Clone, Debug)]
324pub struct DeadLetter {
325    /// The `type_name` of the event that failed.
326    pub event_name: &'static str,
327    /// The subscription that failed to handle the event.
328    pub subscription_id: SubscriptionId,
329    /// Total number of attempts (initial + retries).
330    pub attempts: usize,
331    /// Stringified error from the last failed attempt.
332    pub error: String,
333    /// The original event payload, type-erased.
334    ///
335    /// Use `downcast_ref::<E>()` to recover the concrete event type.
336    pub event: Arc<dyn Any + Send + Sync>,
337    /// When the terminal failure occurred.
338    pub failed_at: SystemTime,
339    /// Human-readable name of the handler that failed, if provided.
340    pub handler_name: Option<&'static str>,
341}
342
343impl DeadLetter {
344    /// Deprecated accessor for `handler_name`.
345    #[deprecated(since = "0.3.6", note = "renamed to handler_name")]
346    pub fn listener_name(&self) -> Option<&'static str> {
347        self.handler_name
348    }
349}
350
351/// Marker trait for all publishable event types.
352///
353/// Any type that is `Send + Sync + 'static` automatically implements `Event`
354/// via a blanket implementation, so no manual implementation is required.
355/// Async handlers additionally require `E: Clone`.
356///
357/// # Examples
358///
359/// ```
360/// // No derive or impl needed — the blanket impl does it automatically.
361/// #[derive(Clone)]
362/// struct OrderPlaced { pub order_id: u64 }
363/// // OrderPlaced: Event is satisfied automatically.
364/// ```
365pub trait Event: Send + Sync + 'static {}
366impl<T: Send + Sync + 'static> Event for T {}
367
368/// Information about a single registered handler, as reported by
369/// [`BusStats`].
370#[derive(Debug, Clone)]
371pub struct HandlerInfo {
372    /// The unique subscription identifier.
373    pub subscription_id: SubscriptionId,
374    /// Human-readable name, if the handler provides one.
375    pub name: Option<&'static str>,
376}
377
378#[allow(dead_code)]
379#[deprecated(since = "0.3.6", note = "renamed to HandlerInfo")]
380pub type ListenerInfo = HandlerInfo;
381
382/// A point-in-time snapshot of the event bus internal state.
383///
384/// Obtained via [`EventBus::stats()`](crate::EventBus::stats).
385#[derive(Debug, Clone)]
386pub struct BusStats {
387    /// Total number of active subscriptions across all event types.
388    pub total_subscriptions: usize,
389    /// Per-event-type listener details, keyed by the event type name.
390    pub subscriptions_by_event: HashMap<&'static str, Vec<HandlerInfo>>,
391    /// The type names of all event types that currently have at least one
392    /// registered listener.
393    pub registered_event_types: Vec<&'static str>,
394    /// The configured channel buffer capacity.
395    pub queue_capacity: usize,
396    /// Number of async handler tasks currently in flight.
397    pub in_flight_async: usize,
398    /// Whether [`EventBus::shutdown`](crate::EventBus::shutdown) has been called.
399    pub shutdown_called: bool,
400}
401
402/// Internal configuration for the event bus runtime.
403#[derive(Debug, Clone)]
404pub(crate) struct BusConfig {
405    pub buffer_size: usize,
406    pub handler_timeout: Option<Duration>,
407    pub max_concurrent_async: Option<usize>,
408    pub default_subscription_policy: SubscriptionPolicy,
409    pub shutdown_timeout: Option<Duration>,
410}
411
412impl Default for BusConfig {
413    fn default() -> Self {
414        Self {
415            buffer_size: 256,
416            handler_timeout: None,
417            max_concurrent_async: None,
418            default_subscription_policy: SubscriptionPolicy::default(),
419            shutdown_timeout: None,
420        }
421    }
422}