jaeb 0.3.7

simple snapshot-driven event bus
Documentation
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
use std::any::Any;
use std::collections::HashMap;
use std::fmt;
use std::sync::Arc;
use std::time::{Duration, SystemTime};

/// Unique identifier for a listener or middleware registration.
///
/// Assigned by the bus at subscription time and remains stable for the
/// lifetime of the registration. Obtain it from
/// [`Subscription::id`](crate::Subscription::id) or
/// [`SubscriptionGuard::id`](crate::SubscriptionGuard::id).
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub struct SubscriptionId(pub(crate) u64);

impl SubscriptionId {
    /// Return the raw numeric value of the identifier.
    ///
    /// Useful for logging and tracing; do not rely on the magnitude or
    /// sequence of values.
    pub const fn as_u64(self) -> u64 {
        self.0
    }
}

impl fmt::Display for SubscriptionId {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        write!(f, "{}", self.0)
    }
}

/// Strategy for computing the delay between retry attempts.
///
/// Used by [`SubscriptionPolicy`] to control back-off behaviour when a handler
/// fails and is eligible for retry.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum RetryStrategy {
    /// Wait a fixed duration between each retry.
    Fixed(Duration),

    /// Exponential back-off: the delay doubles on each attempt, capped at
    /// `max`.
    ///
    /// The delay for attempt *n* (0-indexed) is `min(base * 2^n, max)`.
    Exponential {
        /// Initial delay (attempt 0).
        base: Duration,
        /// Upper bound on the delay.
        max: Duration,
    },

    /// Exponential back-off with random jitter added to each delay.
    ///
    /// The computed delay is `rand(0 ..= min(base * 2^n, max))`.
    ///
    /// Jitter is derived from [`SystemTime`] nanoseconds
    /// — lightweight but **not** cryptographically random.
    ExponentialWithJitter {
        /// Initial delay (attempt 0).
        base: Duration,
        /// Upper bound on the delay before jitter.
        max: Duration,
    },
}

impl RetryStrategy {
    /// Compute the delay for the given zero-based retry attempt.
    pub fn delay_for_attempt(&self, attempt: usize) -> Duration {
        match *self {
            RetryStrategy::Fixed(d) => d,
            RetryStrategy::Exponential { base, max } => {
                let factor = 1u64.checked_shl(attempt as u32).unwrap_or(u64::MAX);
                let factor = u32::try_from(factor).unwrap_or(u32::MAX);
                let delay = base.saturating_mul(factor);
                if delay > max { max } else { delay }
            }
            RetryStrategy::ExponentialWithJitter { base, max } => {
                let factor = 1u64.checked_shl(attempt as u32).unwrap_or(u64::MAX);
                let factor = u32::try_from(factor).unwrap_or(u32::MAX);
                let delay = base.saturating_mul(factor);
                let capped = if delay > max { max } else { delay };
                // Simple jitter: pick a random fraction of the capped delay.
                // We use a lightweight approach without pulling in the `rand` crate.
                let nanos = capped.as_nanos() as u64;
                if nanos == 0 {
                    Duration::ZERO
                } else {
                    // Use the current time's nanoseconds as a cheap entropy source.
                    let entropy = SystemTime::now().duration_since(std::time::UNIX_EPOCH).unwrap_or_default().subsec_nanos() as u64;
                    let jittered = entropy % (nanos + 1);
                    Duration::from_nanos(jittered)
                }
            }
        }
    }
}

/// Policy controlling how a subscription is scheduled and how failures are treated.
///
/// - `priority`: listener ordering hint. Higher values are dispatched first
///   within the same dispatch lane (sync or async). Equal priorities keep
///   FIFO registration order.
/// - `max_retries`: how many *additional* attempts after the first failure
///   (0 means no retries). **Only supported for async handlers.** Sync
///   handlers must use [`SyncSubscriptionPolicy`] instead; attempting to pass a
///   `SubscriptionPolicy` to a sync handler via
///   [`subscribe_with_policy`](crate::EventBus::subscribe_with_policy) is a
///   compile-time error.
/// - `retry_strategy`: optional [`RetryStrategy`] controlling the delay
///   between retries. When `None`, retries happen immediately. Ignored when
///   `max_retries` is 0. Only applies to async handlers.
/// - `dead_letter`: whether a [`DeadLetter`] event is emitted after all
///   attempts are exhausted (or on first failure for sync handlers).
///   Automatically forced to `false` for dead-letter listeners to prevent
///   infinite recursion.
///
/// All fields are public for convenience; invalid combinations (e.g.
/// `retry_strategy` set with `max_retries: 0`) are harmless but have no effect.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct SubscriptionPolicy {
    /// Listener ordering hint (higher runs first).
    ///
    /// Applied independently within sync and async lanes.
    pub priority: i32,
    /// Number of additional attempts after the first failure (0 = no retries).
    ///
    /// Only honoured for async handlers. Sync handlers always behave as if
    /// this is `0`.
    pub max_retries: usize,
    /// Optional back-off strategy between retry attempts.
    ///
    /// `None` means retry immediately. Ignored when `max_retries` is `0`.
    pub retry_strategy: Option<RetryStrategy>,
    /// Emit a [`DeadLetter`] event after all attempts are exhausted.
    ///
    /// Automatically forced to `false` for dead-letter listeners.
    pub dead_letter: bool,
}

impl Default for SubscriptionPolicy {
    fn default() -> Self {
        Self {
            priority: 0,
            max_retries: 0,
            retry_strategy: None,
            dead_letter: true,
        }
    }
}

impl SubscriptionPolicy {
    /// Set listener priority (builder-style).
    ///
    /// Higher values are dispatched first within each lane.
    pub const fn with_priority(mut self, priority: i32) -> Self {
        self.priority = priority;
        self
    }

    /// Set the maximum number of retries (builder-style).
    ///
    /// `0` disables retries. Only applicable to async handlers.
    pub const fn with_max_retries(mut self, max_retries: usize) -> Self {
        self.max_retries = max_retries;
        self
    }

    /// Set a custom [`RetryStrategy`].
    pub fn with_retry_strategy(mut self, strategy: RetryStrategy) -> Self {
        self.retry_strategy = Some(strategy);
        self
    }

    /// Enable or disable dead-letter emission for this policy (builder-style).
    pub const fn with_dead_letter(mut self, dead_letter: bool) -> Self {
        self.dead_letter = dead_letter;
        self
    }
}

/// Subscription policy for handlers that do not support retries.
///
/// This type is accepted by [`subscribe_with_policy`](crate::EventBus::subscribe_with_policy)
/// for sync handlers and by [`subscribe_once_with_policy`](crate::EventBus::subscribe_once_with_policy)
/// for all handler types. It contains only the `dead_letter` flag since
/// retry-related fields (`max_retries`, `retry_strategy`) are not applicable.
///
/// Use [`SubscriptionPolicy`] instead when subscribing async handlers that need
/// retry support.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct SyncSubscriptionPolicy {
    /// Listener ordering hint (higher runs first).
    pub priority: i32,
    /// Emit a [`DeadLetter`] event on failure.
    ///
    /// Automatically forced to `false` for dead-letter listeners.
    pub dead_letter: bool,
}

impl Default for SyncSubscriptionPolicy {
    fn default() -> Self {
        Self {
            priority: 0,
            dead_letter: true,
        }
    }
}

impl SyncSubscriptionPolicy {
    /// Set listener priority (builder-style).
    pub const fn with_priority(mut self, priority: i32) -> Self {
        self.priority = priority;
        self
    }

    /// Enable or disable dead-letter emission for this policy (builder-style).
    pub const fn with_dead_letter(mut self, dead_letter: bool) -> Self {
        self.dead_letter = dead_letter;
        self
    }
}

impl From<SyncSubscriptionPolicy> for SubscriptionPolicy {
    fn from(policy: SyncSubscriptionPolicy) -> SubscriptionPolicy {
        SubscriptionPolicy {
            priority: policy.priority,
            max_retries: 0,
            retry_strategy: None,
            dead_letter: policy.dead_letter,
        }
    }
}

// ── IntoSubscriptionPolicy ───────────────────────────────────────────────────

mod sealed {
    pub trait Sealed {}
    impl Sealed for super::SubscriptionPolicy {}
    impl Sealed for super::SyncSubscriptionPolicy {}
}

/// Trait that converts a policy type into a [`SubscriptionPolicy`] suitable for
/// the
/// handler's dispatch mode.
///
/// This trait is **sealed** — it cannot be implemented outside this crate.
///
/// The marker type `M` ([`AsyncMode`](crate::handler::AsyncMode),
/// [`SyncMode`](crate::handler::SyncMode),
/// [`AsyncFnMode`](crate::handler::AsyncFnMode), or
/// [`SyncFnMode`](crate::handler::SyncFnMode)) is inferred from the handler via
/// [`IntoHandler<E, M>`](crate::handler::IntoHandler), so callers never need
/// to specify it explicitly. The type system enforces:
///
/// - **Async handlers** accept both [`SubscriptionPolicy`] (full retry
///   support) and [`SyncSubscriptionPolicy`] (dead-letter only, no retries).
/// - **Sync handlers** accept only [`SyncSubscriptionPolicy`]. Passing a
///   [`SubscriptionPolicy`] to a sync handler is a compile-time error.
pub trait IntoSubscriptionPolicy<M>: sealed::Sealed {
    /// Convert into the internal [`SubscriptionPolicy`] representation.
    fn into_subscription_policy(self) -> SubscriptionPolicy;
}

#[allow(dead_code)]
#[deprecated(since = "0.3.3", note = "renamed to SubscriptionPolicy")]
pub type FailurePolicy = SubscriptionPolicy;

#[allow(dead_code)]
#[deprecated(since = "0.3.3", note = "renamed to SyncSubscriptionPolicy")]
pub type NoRetryPolicy = SyncSubscriptionPolicy;

#[allow(unused_imports)]
#[deprecated(since = "0.3.3", note = "renamed to IntoSubscriptionPolicy")]
pub use IntoSubscriptionPolicy as IntoFailurePolicy;

impl IntoSubscriptionPolicy<crate::handler::AsyncMode> for SubscriptionPolicy {
    fn into_subscription_policy(self) -> SubscriptionPolicy {
        self
    }
}

impl IntoSubscriptionPolicy<crate::handler::AsyncFnMode> for SubscriptionPolicy {
    fn into_subscription_policy(self) -> SubscriptionPolicy {
        self
    }
}

impl IntoSubscriptionPolicy<crate::handler::AsyncMode> for SyncSubscriptionPolicy {
    fn into_subscription_policy(self) -> SubscriptionPolicy {
        self.into()
    }
}

impl IntoSubscriptionPolicy<crate::handler::AsyncFnMode> for SyncSubscriptionPolicy {
    fn into_subscription_policy(self) -> SubscriptionPolicy {
        self.into()
    }
}

impl IntoSubscriptionPolicy<crate::handler::SyncMode> for SyncSubscriptionPolicy {
    fn into_subscription_policy(self) -> SubscriptionPolicy {
        self.into()
    }
}

impl IntoSubscriptionPolicy<crate::handler::SyncFnMode> for SyncSubscriptionPolicy {
    fn into_subscription_policy(self) -> SubscriptionPolicy {
        self.into()
    }
}

/// A dead-letter record emitted when a handler exhausts all retry attempts.
///
/// The original error is stored as a [`String`] rather than a typed error
/// because `DeadLetter` must be `Clone` (it is published as an event) and
/// `Box<dyn Error>` does not implement `Clone`. Use [`error`](Self::error)
/// for diagnostics or pattern-match on the stringified message.
///
/// The original event payload is available via [`event`](Self::event) as a
/// type-erased `Arc`. Consumers can call
/// `dead_letter.event.downcast_ref::<OriginalEvent>()` to inspect the
/// payload that caused the failure.
#[derive(Clone, Debug)]
pub struct DeadLetter {
    /// The `type_name` of the event that failed.
    pub event_name: &'static str,
    /// The subscription that failed to handle the event.
    pub subscription_id: SubscriptionId,
    /// Total number of attempts (initial + retries).
    pub attempts: usize,
    /// Stringified error from the last failed attempt.
    pub error: String,
    /// The original event payload, type-erased.
    ///
    /// Use `downcast_ref::<E>()` to recover the concrete event type.
    pub event: Arc<dyn Any + Send + Sync>,
    /// When the terminal failure occurred.
    pub failed_at: SystemTime,
    /// Human-readable name of the handler that failed, if provided.
    pub handler_name: Option<&'static str>,
}

impl DeadLetter {
    /// Deprecated accessor for `handler_name`.
    #[deprecated(since = "0.3.6", note = "renamed to handler_name")]
    pub fn listener_name(&self) -> Option<&'static str> {
        self.handler_name
    }
}

/// Marker trait for all publishable event types.
///
/// Any type that is `Send + Sync + 'static` automatically implements `Event`
/// via a blanket implementation, so no manual implementation is required.
/// Async handlers additionally require `E: Clone`.
///
/// # Examples
///
/// ```
/// // No derive or impl needed — the blanket impl does it automatically.
/// #[derive(Clone)]
/// struct OrderPlaced { pub order_id: u64 }
/// // OrderPlaced: Event is satisfied automatically.
/// ```
pub trait Event: Send + Sync + 'static {}
impl<T: Send + Sync + 'static> Event for T {}

/// Information about a single registered handler, as reported by
/// [`BusStats`].
#[derive(Debug, Clone)]
pub struct HandlerInfo {
    /// The unique subscription identifier.
    pub subscription_id: SubscriptionId,
    /// Human-readable name, if the handler provides one.
    pub name: Option<&'static str>,
}

#[allow(dead_code)]
#[deprecated(since = "0.3.6", note = "renamed to HandlerInfo")]
pub type ListenerInfo = HandlerInfo;

/// A point-in-time snapshot of the event bus internal state.
///
/// Obtained via [`EventBus::stats()`](crate::EventBus::stats).
#[derive(Debug, Clone)]
pub struct BusStats {
    /// Total number of active subscriptions across all event types.
    pub total_subscriptions: usize,
    /// Per-event-type listener details, keyed by the event type name.
    pub subscriptions_by_event: HashMap<&'static str, Vec<HandlerInfo>>,
    /// The type names of all event types that currently have at least one
    /// registered listener.
    pub registered_event_types: Vec<&'static str>,
    /// The configured channel buffer capacity.
    pub queue_capacity: usize,
    /// Number of currently available publish permits in the internal semaphore.
    pub publish_permits_available: usize,
    /// Number of currently occupied publish permits.
    pub publish_in_flight: usize,
    /// Number of async handler tasks currently in flight.
    pub in_flight_async: usize,
    /// Whether [`EventBus::shutdown`](crate::EventBus::shutdown) has been called.
    pub shutdown_called: bool,
}

/// Internal configuration for the event bus runtime.
#[derive(Debug, Clone)]
pub(crate) struct BusConfig {
    pub buffer_size: usize,
    pub handler_timeout: Option<Duration>,
    pub max_concurrent_async: Option<usize>,
    pub default_subscription_policy: SubscriptionPolicy,
    pub shutdown_timeout: Option<Duration>,
}

impl Default for BusConfig {
    fn default() -> Self {
        Self {
            buffer_size: 256,
            handler_timeout: None,
            max_concurrent_async: None,
            default_subscription_policy: SubscriptionPolicy::default(),
            shutdown_timeout: None,
        }
    }
}