jaeb/types.rs
1use std::any::Any;
2use std::collections::HashMap;
3use std::fmt;
4use std::sync::Arc;
5use std::time::{Duration, SystemTime};
6
/// Unique identifier for a listener or middleware registration.
///
/// Assigned by the bus at subscription time and remains stable for the
/// lifetime of the registration. Obtain it from
/// [`Subscription::id`](crate::Subscription::id) or
/// [`SubscriptionGuard::id`](crate::SubscriptionGuard::id).
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub struct SubscriptionId(pub(crate) u64);

impl SubscriptionId {
    /// Return the raw numeric value of the identifier.
    ///
    /// Useful for logging and tracing; do not rely on the magnitude or
    /// sequence of values.
    pub const fn as_u64(self) -> u64 {
        self.0
    }
}

impl fmt::Display for SubscriptionId {
    /// Format the identifier exactly like its underlying `u64`.
    ///
    /// The call is forwarded to the integer's own `Display` implementation
    /// (instead of going through `write!`) so that formatter options supplied
    /// by the caller — width, fill, alignment, zero-padding — are honoured,
    /// e.g. `format!("{:>6}", id)` pads as expected.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        fmt::Display::fmt(&self.0, f)
    }
}
31
/// Strategy for computing the delay between retry attempts.
///
/// Used by [`SubscriptionPolicy`] to control back-off behaviour when a handler
/// fails and is eligible for retry.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum RetryStrategy {
    /// Wait a fixed duration between each retry.
    Fixed(Duration),

    /// Exponential back-off: the delay doubles on each attempt, capped at
    /// `max`.
    ///
    /// The delay for attempt *n* (0-indexed) is `min(base * 2^n, max)`.
    Exponential {
        /// Initial delay (attempt 0).
        base: Duration,
        /// Upper bound on the delay.
        max: Duration,
    },

    /// Exponential back-off with random jitter applied to each delay.
    ///
    /// The computed delay is `rand(0 ..= min(base * 2^n, max))`.
    ///
    /// Jitter is derived by bit-mixing the current [`SystemTime`] with the
    /// attempt number — lightweight but **not** cryptographically random.
    ExponentialWithJitter {
        /// Initial delay (attempt 0).
        base: Duration,
        /// Upper bound on the delay before jitter.
        max: Duration,
    },
}

impl RetryStrategy {
    /// Compute the delay for the given zero-based retry attempt.
    ///
    /// - [`Fixed`](Self::Fixed) always returns the configured duration.
    /// - [`Exponential`](Self::Exponential) returns `min(base * 2^attempt, max)`;
    ///   the multiplication saturates instead of overflowing.
    /// - [`ExponentialWithJitter`](Self::ExponentialWithJitter) returns a
    ///   pseudo-random duration in `0 ..= min(base * 2^attempt, max)`.
    ///
    /// Never panics, including for very large `attempt` values or durations
    /// close to [`Duration::MAX`].
    pub fn delay_for_attempt(&self, attempt: usize) -> Duration {
        match *self {
            RetryStrategy::Fixed(delay) => delay,
            RetryStrategy::Exponential { base, max } => Self::capped_backoff(base, max, attempt),
            RetryStrategy::ExponentialWithJitter { base, max } => {
                let ceiling = Self::capped_backoff(base, max, attempt);
                if ceiling.is_zero() {
                    // Nothing to jitter; also avoids reading the clock.
                    return Duration::ZERO;
                }
                // `Duration::from_nanos` takes a u64, so the jitter window is
                // clamped to what fits (about 584 years) rather than truncated.
                let span = u64::try_from(ceiling.as_nanos()).unwrap_or(u64::MAX);
                let sample = Self::jitter_sample(attempt);
                let jittered = match span.checked_add(1) {
                    // Inclusive upper bound: results lie in `0 ..= span`.
                    Some(modulus) => sample % modulus,
                    // `span` already covers the whole u64 range.
                    None => sample,
                };
                Duration::from_nanos(jittered)
            }
        }
    }

    /// Return `min(base * 2^attempt, max)` using saturating arithmetic.
    fn capped_backoff(base: Duration, max: Duration, attempt: usize) -> Duration {
        // Any attempt that cannot be expressed as a u32 shift of a u32 value
        // (attempt >= 32, including values beyond u32::MAX) saturates the
        // multiplier instead of wrapping around to a small factor.
        let factor = u32::try_from(attempt)
            .ok()
            .and_then(|shift| 1u32.checked_shl(shift))
            .unwrap_or(u32::MAX);
        base.saturating_mul(factor).min(max)
    }

    /// Produce a cheap 64-bit pseudo-random value without the `rand` crate.
    ///
    /// The wall-clock time (nanoseconds since the Unix epoch) is combined with
    /// the attempt number and passed through a SplitMix64-style finaliser so
    /// that all 64 output bits vary, even when the clock has coarse
    /// resolution. Not suitable for anything security-sensitive.
    fn jitter_sample(attempt: usize) -> u64 {
        let now = SystemTime::now()
            .duration_since(SystemTime::UNIX_EPOCH)
            .unwrap_or_default();
        let clock = now
            .as_secs()
            .wrapping_mul(1_000_000_000)
            .wrapping_add(u64::from(now.subsec_nanos()));
        let salt = u64::try_from(attempt)
            .unwrap_or(u64::MAX)
            .wrapping_mul(0x9E37_79B9_7F4A_7C15);

        let mut z = (clock ^ salt).wrapping_add(0x9E37_79B9_7F4A_7C15);
        z = (z ^ (z >> 30)).wrapping_mul(0xBF58_476D_1CE4_E5B9);
        z = (z ^ (z >> 27)).wrapping_mul(0x94D0_49BB_1331_11EB);
        z ^ (z >> 31)
    }
}
97
98/// Policy controlling how a subscription is scheduled and how failures are treated.
99///
100/// - `priority`: listener ordering hint. Higher values are dispatched first
101/// within the same dispatch lane (sync or async). Equal priorities keep
102/// FIFO registration order.
103/// - `max_retries`: how many *additional* attempts after the first failure
104/// (0 means no retries). **Only supported for async handlers.** Sync
105/// handlers must use [`SyncSubscriptionPolicy`] instead; attempting to pass a
106/// `SubscriptionPolicy` to a sync handler via
107/// [`subscribe_with_policy`](crate::EventBus::subscribe_with_policy) is a
108/// compile-time error.
109/// - `retry_strategy`: optional [`RetryStrategy`] controlling the delay
110/// between retries. When `None`, retries happen immediately. Ignored when
111/// `max_retries` is 0. Only applies to async handlers.
112/// - `dead_letter`: whether a [`DeadLetter`] event is emitted after all
113/// attempts are exhausted (or on first failure for sync handlers).
114/// Automatically forced to `false` for dead-letter listeners to prevent
115/// infinite recursion.
116///
117/// All fields are public for convenience; invalid combinations (e.g.
118/// `retry_strategy` set with `max_retries: 0`) are harmless but have no effect.
119#[derive(Debug, Clone, Copy, PartialEq, Eq)]
120pub struct SubscriptionPolicy {
121 /// Listener ordering hint (higher runs first).
122 ///
123 /// Applied independently within sync and async lanes.
124 pub priority: i32,
125 /// Number of additional attempts after the first failure (0 = no retries).
126 ///
127 /// Only honoured for async handlers. Sync handlers always behave as if
128 /// this is `0`.
129 pub max_retries: usize,
130 /// Optional back-off strategy between retry attempts.
131 ///
132 /// `None` means retry immediately. Ignored when `max_retries` is `0`.
133 pub retry_strategy: Option<RetryStrategy>,
134 /// Emit a [`DeadLetter`] event after all attempts are exhausted.
135 ///
136 /// Automatically forced to `false` for dead-letter listeners.
137 pub dead_letter: bool,
138}
139
140impl Default for SubscriptionPolicy {
141 fn default() -> Self {
142 Self {
143 priority: 0,
144 max_retries: 0,
145 retry_strategy: None,
146 dead_letter: true,
147 }
148 }
149}
150
151impl SubscriptionPolicy {
152 /// Set listener priority (builder-style).
153 ///
154 /// Higher values are dispatched first within each lane.
155 pub const fn with_priority(mut self, priority: i32) -> Self {
156 self.priority = priority;
157 self
158 }
159
160 /// Set the maximum number of retries (builder-style).
161 ///
162 /// `0` disables retries. Only applicable to async handlers.
163 pub const fn with_max_retries(mut self, max_retries: usize) -> Self {
164 self.max_retries = max_retries;
165 self
166 }
167
168 /// Set a custom [`RetryStrategy`].
169 pub fn with_retry_strategy(mut self, strategy: RetryStrategy) -> Self {
170 self.retry_strategy = Some(strategy);
171 self
172 }
173
174 /// Enable or disable dead-letter emission for this policy (builder-style).
175 pub const fn with_dead_letter(mut self, dead_letter: bool) -> Self {
176 self.dead_letter = dead_letter;
177 self
178 }
179}
180
/// Retry-free subscription policy.
///
/// [`subscribe_with_policy`](crate::EventBus::subscribe_with_policy) takes
/// this type for sync handlers, and
/// [`subscribe_once_with_policy`](crate::EventBus::subscribe_once_with_policy)
/// takes it for every handler kind. It carries just the ordering `priority`
/// and the `dead_letter` switch; the retry knobs (`max_retries`,
/// `retry_strategy`) are deliberately absent because they do not apply.
///
/// Async handlers that want retries should be registered with
/// [`SubscriptionPolicy`] instead.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub struct SyncSubscriptionPolicy {
    /// Ordering hint for the listener; larger values run earlier.
    pub priority: i32,
    /// Whether a failure produces a [`DeadLetter`] event.
    ///
    /// Dead-letter listeners always have this forced to `false`.
    pub dead_letter: bool,
}

impl Default for SyncSubscriptionPolicy {
    /// Neutral priority (`0`) with dead-letter emission switched on.
    fn default() -> Self {
        SyncSubscriptionPolicy {
            priority: 0,
            dead_letter: true,
        }
    }
}

impl SyncSubscriptionPolicy {
    /// Builder-style setter returning a copy with the given `priority`.
    pub const fn with_priority(self, priority: i32) -> Self {
        Self {
            priority,
            dead_letter: self.dead_letter,
        }
    }

    /// Builder-style setter returning a copy with dead-letter emission
    /// turned on or off.
    pub const fn with_dead_letter(self, dead_letter: bool) -> Self {
        Self {
            priority: self.priority,
            dead_letter,
        }
    }
}
222
223impl From<SyncSubscriptionPolicy> for SubscriptionPolicy {
224 fn from(policy: SyncSubscriptionPolicy) -> SubscriptionPolicy {
225 SubscriptionPolicy {
226 priority: policy.priority,
227 max_retries: 0,
228 retry_strategy: None,
229 dead_letter: policy.dead_letter,
230 }
231 }
232}
233
234// ── IntoSubscriptionPolicy ───────────────────────────────────────────────────
235
236mod sealed {
237 pub trait Sealed {}
238 impl Sealed for super::SubscriptionPolicy {}
239 impl Sealed for super::SyncSubscriptionPolicy {}
240}
241
242/// Trait that converts a policy type into a [`SubscriptionPolicy`] suitable for
243/// the
244/// handler's dispatch mode.
245///
246/// This trait is **sealed** — it cannot be implemented outside this crate.
247///
248/// The marker type `M` ([`AsyncMode`](crate::handler::AsyncMode),
249/// [`SyncMode`](crate::handler::SyncMode),
250/// [`AsyncFnMode`](crate::handler::AsyncFnMode), or
251/// [`SyncFnMode`](crate::handler::SyncFnMode)) is inferred from the handler via
252/// [`IntoHandler<E, M>`](crate::handler::IntoHandler), so callers never need
253/// to specify it explicitly. The type system enforces:
254///
255/// - **Async handlers** accept both [`SubscriptionPolicy`] (full retry
256/// support) and [`SyncSubscriptionPolicy`] (dead-letter only, no retries).
257/// - **Sync handlers** accept only [`SyncSubscriptionPolicy`]. Passing a
258/// [`SubscriptionPolicy`] to a sync handler is a compile-time error.
259pub trait IntoSubscriptionPolicy<M>: sealed::Sealed {
260 /// Convert into the internal [`SubscriptionPolicy`] representation.
261 fn into_subscription_policy(self) -> SubscriptionPolicy;
262}
263
/// Deprecated name for [`SubscriptionPolicy`]; the two are the same type.
// NOTE(review): `allow(dead_code)` presumably silences the unused-item lint
// when nothing inside the crate references the alias — confirm.
#[allow(dead_code)]
#[deprecated(since = "0.3.3", note = "renamed to SubscriptionPolicy")]
pub type FailurePolicy = SubscriptionPolicy;

/// Deprecated name for [`SyncSubscriptionPolicy`]; the two are the same type.
#[allow(dead_code)]
#[deprecated(since = "0.3.3", note = "renamed to SyncSubscriptionPolicy")]
pub type NoRetryPolicy = SyncSubscriptionPolicy;

// Deprecated re-export of the `IntoSubscriptionPolicy` trait under its old
// name `IntoFailurePolicy`.
// NOTE(review): verify that the targeted compiler versions actually emit a
// deprecation warning for `#[deprecated]` placed on a `use` re-export.
#[allow(unused_imports)]
#[deprecated(since = "0.3.3", note = "renamed to IntoSubscriptionPolicy")]
pub use IntoSubscriptionPolicy as IntoFailurePolicy;
275
276impl IntoSubscriptionPolicy<crate::handler::AsyncMode> for SubscriptionPolicy {
277 fn into_subscription_policy(self) -> SubscriptionPolicy {
278 self
279 }
280}
281
282impl IntoSubscriptionPolicy<crate::handler::AsyncFnMode> for SubscriptionPolicy {
283 fn into_subscription_policy(self) -> SubscriptionPolicy {
284 self
285 }
286}
287
288impl IntoSubscriptionPolicy<crate::handler::AsyncMode> for SyncSubscriptionPolicy {
289 fn into_subscription_policy(self) -> SubscriptionPolicy {
290 self.into()
291 }
292}
293
294impl IntoSubscriptionPolicy<crate::handler::AsyncFnMode> for SyncSubscriptionPolicy {
295 fn into_subscription_policy(self) -> SubscriptionPolicy {
296 self.into()
297 }
298}
299
300impl IntoSubscriptionPolicy<crate::handler::SyncMode> for SyncSubscriptionPolicy {
301 fn into_subscription_policy(self) -> SubscriptionPolicy {
302 self.into()
303 }
304}
305
306impl IntoSubscriptionPolicy<crate::handler::SyncFnMode> for SyncSubscriptionPolicy {
307 fn into_subscription_policy(self) -> SubscriptionPolicy {
308 self.into()
309 }
310}
311
312/// A dead-letter record emitted when a handler exhausts all retry attempts.
313///
314/// The original error is stored as a [`String`] rather than a typed error
315/// because `DeadLetter` must be `Clone` (it is published as an event) and
316/// `Box<dyn Error>` does not implement `Clone`. Use [`error`](Self::error)
317/// for diagnostics or pattern-match on the stringified message.
318///
319/// The original event payload is available via [`event`](Self::event) as a
320/// type-erased `Arc`. Consumers can call
321/// `dead_letter.event.downcast_ref::<OriginalEvent>()` to inspect the
322/// payload that caused the failure.
323#[derive(Clone, Debug)]
324pub struct DeadLetter {
325 /// The `type_name` of the event that failed.
326 pub event_name: &'static str,
327 /// The subscription that failed to handle the event.
328 pub subscription_id: SubscriptionId,
329 /// Total number of attempts (initial + retries).
330 pub attempts: usize,
331 /// Stringified error from the last failed attempt.
332 pub error: String,
333 /// The original event payload, type-erased.
334 ///
335 /// Use `downcast_ref::<E>()` to recover the concrete event type.
336 pub event: Arc<dyn Any + Send + Sync>,
337 /// When the terminal failure occurred.
338 pub failed_at: SystemTime,
339 /// Human-readable name of the handler that failed, if provided.
340 pub handler_name: Option<&'static str>,
341}
342
343impl DeadLetter {
344 /// Deprecated accessor for `handler_name`.
345 #[deprecated(since = "0.3.6", note = "renamed to handler_name")]
346 pub fn listener_name(&self) -> Option<&'static str> {
347 self.handler_name
348 }
349}
350
/// Marker for every type that can be published on the bus.
///
/// A blanket implementation covers all `Send + Sync + 'static` types, so
/// there is never anything to implement by hand. Note that async handlers
/// place an extra `E: Clone` requirement on the event type.
///
/// # Examples
///
/// ```
/// // Nothing to derive or implement for `Event` itself.
/// #[derive(Clone)]
/// struct OrderPlaced { pub order_id: u64 }
/// // `OrderPlaced: Event` already holds thanks to the blanket impl.
/// ```
pub trait Event: Send + Sync + 'static {}

impl<T> Event for T where T: Send + Sync + 'static {}
367
368/// Information about a single registered handler, as reported by
369/// [`BusStats`].
370#[derive(Debug, Clone)]
371pub struct HandlerInfo {
372 /// The unique subscription identifier.
373 pub subscription_id: SubscriptionId,
374 /// Human-readable name, if the handler provides one.
375 pub name: Option<&'static str>,
376}
377
378#[allow(dead_code)]
379#[deprecated(since = "0.3.6", note = "renamed to HandlerInfo")]
380pub type ListenerInfo = HandlerInfo;
381
382/// A point-in-time snapshot of the event bus internal state.
383///
384/// Obtained via [`EventBus::stats()`](crate::EventBus::stats).
385#[derive(Debug, Clone)]
386pub struct BusStats {
387 /// Total number of active subscriptions across all event types.
388 pub total_subscriptions: usize,
389 /// Per-event-type listener details, keyed by the event type name.
390 pub subscriptions_by_event: HashMap<&'static str, Vec<HandlerInfo>>,
391 /// The type names of all event types that currently have at least one
392 /// registered listener.
393 pub registered_event_types: Vec<&'static str>,
394 /// The configured channel buffer capacity.
395 pub queue_capacity: usize,
396 /// Number of async handler tasks currently in flight.
397 pub in_flight_async: usize,
398 /// Whether [`EventBus::shutdown`](crate::EventBus::shutdown) has been called.
399 pub shutdown_called: bool,
400}
401
402/// Internal configuration for the event bus runtime.
403#[derive(Debug, Clone)]
404pub(crate) struct BusConfig {
405 pub buffer_size: usize,
406 pub handler_timeout: Option<Duration>,
407 pub max_concurrent_async: Option<usize>,
408 pub default_subscription_policy: SubscriptionPolicy,
409 pub shutdown_timeout: Option<Duration>,
410}
411
412impl Default for BusConfig {
413 fn default() -> Self {
414 Self {
415 buffer_size: 256,
416 handler_timeout: None,
417 max_concurrent_async: None,
418 default_subscription_policy: SubscriptionPolicy::default(),
419 shutdown_timeout: None,
420 }
421 }
422}