jaeb 0.3.2

simple snapshot-driven event bus
Documentation
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
//! Tests for the middleware / interceptor pipeline.

use std::any::Any;
use std::sync::Arc;
use std::sync::atomic::{AtomicUsize, Ordering};

use jaeb::{
    EventBus, EventBusError, HandlerResult, Middleware, MiddlewareDecision, SyncEventHandler, SyncMiddleware, TypedMiddleware, TypedSyncMiddleware,
};

/// Unit event type that the tests in this file publish and subscribe to.
#[derive(Clone)]
struct Ping;

/// Second unit event type, distinct from `Ping`; used to check that
/// `Ping`-typed middleware does not run for other event types.
#[derive(Clone)]
struct Pong;

// ---------------------------------------------------------------------------
// Helpers
// ---------------------------------------------------------------------------

/// Sync middleware that unconditionally lets every event continue.
struct AllowAll;

impl SyncMiddleware for AllowAll {
    /// Ignores both arguments and always answers `Continue`.
    fn process(&self, _event_name: &'static str, _payload: &(dyn Any + Send + Sync)) -> MiddlewareDecision {
        MiddlewareDecision::Continue
    }
}

/// Sync middleware that rejects every event, reporting the wrapped string
/// as the rejection reason.
struct RejectAll(String);

impl SyncMiddleware for RejectAll {
    /// Ignores both arguments and always answers `Reject` with a copy of the stored reason.
    fn process(&self, _event_name: &'static str, _payload: &(dyn Any + Send + Sync)) -> MiddlewareDecision {
        let reason = self.0.clone();
        MiddlewareDecision::Reject(reason)
    }
}

/// Sync `Ping` handler that increments a shared counter on every call.
struct Counter(Arc<AtomicUsize>);

impl SyncEventHandler<Ping> for Counter {
    /// Adds one to the wrapped counter and reports success.
    fn handle(&self, _ping: &Ping) -> HandlerResult {
        let Counter(hits) = self;
        hits.fetch_add(1, Ordering::SeqCst);
        Ok(())
    }
}

/// Sync `Pong` handler that increments a shared counter on every call.
struct PongCounter(Arc<AtomicUsize>);

impl SyncEventHandler<Pong> for PongCounter {
    /// Adds one to the wrapped counter and reports success.
    fn handle(&self, _pong: &Pong) -> HandlerResult {
        let PongCounter(hits) = self;
        hits.fetch_add(1, Ordering::SeqCst);
        Ok(())
    }
}

/// Sync middleware that appends its `id` to a shared log each time it runs,
/// so tests can inspect execution order.
struct OrderTracker {
    id: &'static str,
    log: Arc<std::sync::Mutex<Vec<&'static str>>>,
}

impl SyncMiddleware for OrderTracker {
    /// Records `id` in the log, then lets the event continue.
    fn process(&self, _event_name: &'static str, _payload: &(dyn Any + Send + Sync)) -> MiddlewareDecision {
        let mut entries = self.log.lock().unwrap();
        entries.push(self.id);
        MiddlewareDecision::Continue
    }
}

/// Async counterpart of the order-tracking middleware: appends its `id` to a
/// shared log each time it runs.
struct AsyncOrderTracker {
    id: &'static str,
    log: Arc<std::sync::Mutex<Vec<&'static str>>>,
}

impl Middleware for AsyncOrderTracker {
    /// Records `id` in the log, then lets the event continue.
    async fn process(&self, _event_name: &'static str, _payload: &(dyn Any + Send + Sync)) -> MiddlewareDecision {
        let Self { id, log } = self;
        log.lock().unwrap().push(*id);
        MiddlewareDecision::Continue
    }
}

/// `Ping`-typed sync middleware that counts how many times it is invoked.
struct TypedPingCounter(Arc<AtomicUsize>);

impl TypedSyncMiddleware<Ping> for TypedPingCounter {
    /// Adds one to the wrapped counter, then lets the event continue.
    fn process(&self, _event_name: &'static str, _ping: &Ping) -> MiddlewareDecision {
        let TypedPingCounter(invocations) = self;
        invocations.fetch_add(1, Ordering::SeqCst);
        MiddlewareDecision::Continue
    }
}

/// `Ping`-typed sync middleware that rejects every `Ping` with the wrapped reason.
struct TypedSyncReject(&'static str);

impl TypedSyncMiddleware<Ping> for TypedSyncReject {
    /// Always answers `Reject`, carrying an owned copy of the stored reason.
    fn process(&self, _event_name: &'static str, _ping: &Ping) -> MiddlewareDecision {
        let reason = String::from(self.0);
        MiddlewareDecision::Reject(reason)
    }
}

/// `Ping`-typed async middleware that rejects every `Ping` with the wrapped reason.
struct TypedAsyncReject(&'static str);

impl TypedMiddleware<Ping> for TypedAsyncReject {
    /// Always answers `Reject`, carrying an owned copy of the stored reason.
    async fn process(&self, _event_name: &'static str, _ping: &Ping) -> MiddlewareDecision {
        let reason = String::from(self.0);
        MiddlewareDecision::Reject(reason)
    }
}

/// `Ping`-typed sync middleware that appends its `id` to a shared log each
/// time it runs, so tests can inspect execution order.
struct TypedSyncOrderTracker {
    id: &'static str,
    log: Arc<std::sync::Mutex<Vec<&'static str>>>,
}

impl TypedSyncMiddleware<Ping> for TypedSyncOrderTracker {
    /// Records `id` in the log, then lets the event continue.
    fn process(&self, _event_name: &'static str, _ping: &Ping) -> MiddlewareDecision {
        let mut entries = self.log.lock().unwrap();
        entries.push(self.id);
        MiddlewareDecision::Continue
    }
}

/// `Ping`-typed async middleware that appends its `id` to a shared log each
/// time it runs, so tests can inspect execution order.
struct TypedAsyncOrderTracker {
    id: &'static str,
    log: Arc<std::sync::Mutex<Vec<&'static str>>>,
}

impl TypedMiddleware<Ping> for TypedAsyncOrderTracker {
    /// Records `id` in the log, then lets the event continue.
    async fn process(&self, _event_name: &'static str, _ping: &Ping) -> MiddlewareDecision {
        let Self { id, log } = self;
        log.lock().unwrap().push(*id);
        MiddlewareDecision::Continue
    }
}

// ---------------------------------------------------------------------------
// Tests
// ---------------------------------------------------------------------------

/// With an always-`Continue` sync middleware installed, a published `Ping`
/// must still reach its handler exactly once.
#[tokio::test]
async fn middleware_continues_handlers_fire() {
    let bus = EventBus::new(64).expect("valid config");
    let hits = Arc::new(AtomicUsize::new(0));

    let _middleware = bus.add_sync_middleware(AllowAll).await.expect("add middleware");
    let handler = Counter(Arc::clone(&hits));
    let _subscription = bus.subscribe::<Ping, _, _>(handler).await.expect("subscribe");

    let outcome = bus.publish(Ping).await;
    outcome.expect("publish should succeed");
    bus.shutdown().await.expect("shutdown");

    let fired = hits.load(Ordering::SeqCst);
    assert_eq!(fired, 1, "handler should have fired");
}

/// A rejecting sync middleware must surface `MiddlewareRejected` with its
/// reason from `publish`, and the handler must never run.
#[tokio::test]
async fn middleware_rejects_handlers_do_not_fire() {
    let bus = EventBus::new(64).expect("valid config");
    let hits = Arc::new(AtomicUsize::new(0));

    let blocker = RejectAll(String::from("blocked"));
    let _middleware = bus.add_sync_middleware(blocker).await.expect("add middleware");
    let handler = Counter(Arc::clone(&hits));
    let _subscription = bus.subscribe::<Ping, _, _>(handler).await.expect("subscribe");

    let err = bus.publish(Ping).await.unwrap_err();
    let rejected_with_reason = matches!(&err, EventBusError::MiddlewareRejected(reason) if reason == "blocked");
    assert!(rejected_with_reason, "expected MiddlewareRejected, got: {err:?}");

    bus.shutdown().await.expect("shutdown");
    let fired = hits.load(Ordering::SeqCst);
    assert_eq!(fired, 0, "handler should NOT have fired");
}

/// Two sync middlewares registered one after the other must run in
/// registration order, and the handler must still fire.
#[tokio::test]
async fn middleware_ordering_fifo() {
    let bus = EventBus::new(64).expect("valid config");
    let log = Arc::new(std::sync::Mutex::new(Vec::<&str>::new()));
    let hits = Arc::new(AtomicUsize::new(0));

    let first = OrderTracker {
        id: "first",
        log: Arc::clone(&log),
    };
    let _mw1 = bus.add_sync_middleware(first).await.expect("add middleware 1");

    let second = OrderTracker {
        id: "second",
        log: Arc::clone(&log),
    };
    let _mw2 = bus.add_sync_middleware(second).await.expect("add middleware 2");

    let handler = Counter(Arc::clone(&hits));
    let _subscription = bus.subscribe::<Ping, _, _>(handler).await.expect("subscribe");

    bus.publish(Ping).await.expect("publish");
    bus.shutdown().await.expect("shutdown");

    // Snapshot the log so the lock is released before asserting.
    let observed = log.lock().unwrap().clone();
    let expected = vec!["first", "second"];
    assert_eq!(observed, expected, "middlewares should execute in FIFO order");

    let fired = hits.load(Ordering::SeqCst);
    assert_eq!(fired, 1, "handler should have fired");
}

/// Unsubscribing a rejecting middleware must restore normal delivery:
/// the first publish is rejected, the second reaches the handler.
#[tokio::test]
async fn middleware_removal() {
    let bus = EventBus::new(64).expect("valid config");
    let hits = Arc::new(AtomicUsize::new(0));

    let blocker = RejectAll(String::from("blocked"));
    let middleware_handle = bus.add_sync_middleware(blocker).await.expect("add middleware");
    let handler = Counter(Arc::clone(&hits));
    let _subscription = bus.subscribe::<Ping, _, _>(handler).await.expect("subscribe");

    // While the middleware is installed, publishing fails.
    let err = bus.publish(Ping).await.unwrap_err();
    assert!(matches!(err, EventBusError::MiddlewareRejected(_)));

    // Take the middleware out of the pipeline.
    let was_removed = middleware_handle.unsubscribe().await.expect("unsubscribe middleware");
    assert!(was_removed, "middleware should have been found and removed");

    // With nothing left to reject it, the event goes through.
    bus.publish(Ping).await.expect("publish after removal");
    bus.shutdown().await.expect("shutdown");

    let fired = hits.load(Ordering::SeqCst);
    assert_eq!(fired, 1, "handler should fire after middleware removed");
}

/// An untyped middleware can downcast the type-erased event to inspect it:
/// only an `ImportantEvent` whose value exceeds 10 is rejected, everything
/// else continues.
#[tokio::test]
async fn middleware_with_downcast() {
    #[derive(Clone)]
    struct ImportantEvent(u32);

    #[derive(Clone)]
    struct IgnoredEvent;

    struct OnlyAllowImportant;
    impl SyncMiddleware for OnlyAllowImportant {
        fn process(&self, _event_name: &'static str, event: &(dyn Any + Send + Sync)) -> MiddlewareDecision {
            // Events of any other type fail the downcast and are let through.
            let too_high = event
                .downcast_ref::<ImportantEvent>()
                .is_some_and(|important| important.0 > 10);

            if too_high {
                MiddlewareDecision::Reject("value too high".into())
            } else {
                MiddlewareDecision::Continue
            }
        }
    }

    let bus = EventBus::new(64).expect("valid config");
    let _middleware = bus.add_sync_middleware(OnlyAllowImportant).await.expect("add middleware");

    // A value at or below the threshold passes.
    let low = ImportantEvent(5);
    bus.publish(low).await.expect("low value should pass");

    // A value above the threshold is rejected with the middleware's reason.
    let high = ImportantEvent(20);
    let err = bus.publish(high).await.unwrap_err();
    assert!(matches!(err, EventBusError::MiddlewareRejected(ref r) if r == "value too high"));

    // A different event type is not an ImportantEvent, so the middleware continues.
    bus.publish(IgnoredEvent).await.expect("ignored event should pass");

    bus.shutdown().await.expect("shutdown");
}

/// An async middleware that yields before answering `Continue` must still
/// let the event reach its handler.
#[tokio::test]
async fn async_middleware_works() {
    struct AsyncAllow;
    impl Middleware for AsyncAllow {
        async fn process(&self, _event_name: &'static str, _payload: &(dyn Any + Send + Sync)) -> MiddlewareDecision {
            // Yield once to stand in for real asynchronous work.
            tokio::task::yield_now().await;
            MiddlewareDecision::Continue
        }
    }

    let bus = EventBus::new(64).expect("valid config");
    let hits = Arc::new(AtomicUsize::new(0));

    let _middleware = bus.add_middleware(AsyncAllow).await.expect("add async middleware");
    let handler = Counter(Arc::clone(&hits));
    let _subscription = bus.subscribe::<Ping, _, _>(handler).await.expect("subscribe");

    bus.publish(Ping).await.expect("publish");
    bus.shutdown().await.expect("shutdown");

    let fired = hits.load(Ordering::SeqCst);
    assert_eq!(fired, 1, "handler should have fired through async middleware");
}

/// A rejecting async middleware must make `publish` fail with
/// `MiddlewareRejected` carrying the middleware's reason.
#[tokio::test]
async fn async_middleware_rejects() {
    struct AsyncReject;
    impl Middleware for AsyncReject {
        async fn process(&self, _event_name: &'static str, _payload: &(dyn Any + Send + Sync)) -> MiddlewareDecision {
            let reason = "async rejection".into();
            MiddlewareDecision::Reject(reason)
        }
    }

    let bus = EventBus::new(64).expect("valid config");
    let _middleware = bus.add_middleware(AsyncReject).await.expect("add async middleware");

    let err = bus.publish(Ping).await.unwrap_err();
    assert!(matches!(err, EventBusError::MiddlewareRejected(ref r) if r == "async rejection"));

    bus.shutdown().await.expect("shutdown");
}

/// A `Ping`-typed middleware must run for `Ping` but not for `Pong`, while
/// handlers for both event types still fire.
#[tokio::test]
async fn typed_middleware_runs_for_matching_event_only() {
    let bus = EventBus::new(64).expect("valid config");
    let middleware_hits = Arc::new(AtomicUsize::new(0));
    let pings_handled = Arc::new(AtomicUsize::new(0));
    let pongs_handled = Arc::new(AtomicUsize::new(0));

    let ping_middleware = TypedPingCounter(Arc::clone(&middleware_hits));
    let _typed = bus
        .add_typed_sync_middleware::<Ping, _>(ping_middleware)
        .await
        .expect("add typed middleware");

    let ping_handler = Counter(Arc::clone(&pings_handled));
    let _ping = bus.subscribe::<Ping, _, _>(ping_handler).await.expect("subscribe ping");

    let pong_handler = PongCounter(Arc::clone(&pongs_handled));
    let _pong = bus.subscribe::<Pong, _, _>(pong_handler).await.expect("subscribe pong");

    bus.publish(Ping).await.expect("publish ping");
    bus.publish(Pong).await.expect("publish pong");
    bus.shutdown().await.expect("shutdown");

    let typed_runs = middleware_hits.load(Ordering::SeqCst);
    assert_eq!(typed_runs, 1, "typed middleware should only run for Ping");

    let ping_runs = pings_handled.load(Ordering::SeqCst);
    assert_eq!(ping_runs, 1, "Ping handler should fire");

    let pong_runs = pongs_handled.load(Ordering::SeqCst);
    assert_eq!(pong_runs, 1, "Pong handler should fire");
}

/// A rejecting `Ping`-typed sync middleware must fail the publish with its
/// reason and keep the handler from running.
#[tokio::test]
async fn typed_sync_middleware_rejects_before_handlers() {
    let bus = EventBus::new(64).expect("valid config");
    let hits = Arc::new(AtomicUsize::new(0));

    let blocker = TypedSyncReject("typed sync reject");
    let _typed = bus
        .add_typed_sync_middleware::<Ping, _>(blocker)
        .await
        .expect("add typed middleware");

    let handler = Counter(Arc::clone(&hits));
    let _subscription = bus.subscribe::<Ping, _, _>(handler).await.expect("subscribe");

    let err = bus.publish(Ping).await.unwrap_err();
    assert!(matches!(err, EventBusError::MiddlewareRejected(ref r) if r == "typed sync reject"));

    bus.shutdown().await.expect("shutdown");
    let fired = hits.load(Ordering::SeqCst);
    assert_eq!(fired, 0, "handler should not fire after typed sync rejection");
}

/// A rejecting `Ping`-typed async middleware must fail the publish with its
/// reason and keep the handler from running.
#[tokio::test]
async fn typed_async_middleware_rejects_before_handlers() {
    let bus = EventBus::new(64).expect("valid config");
    let hits = Arc::new(AtomicUsize::new(0));

    let blocker = TypedAsyncReject("typed async reject");
    let _typed = bus
        .add_typed_middleware::<Ping, _>(blocker)
        .await
        .expect("add typed middleware");

    let handler = Counter(Arc::clone(&hits));
    let _subscription = bus.subscribe::<Ping, _, _>(handler).await.expect("subscribe");

    let err = bus.publish(Ping).await.unwrap_err();
    assert!(matches!(err, EventBusError::MiddlewareRejected(ref r) if r == "typed async reject"));

    bus.shutdown().await.expect("shutdown");
    let fired = hits.load(Ordering::SeqCst);
    assert_eq!(fired, 0, "handler should not fire after typed async rejection");
}

/// Registers a global sync, a global async, a typed sync, and a typed async
/// middleware (in that order) plus a handler, all writing to one shared log,
/// and checks the recorded sequence: global middlewares, then typed ones,
/// then the handler.
#[tokio::test]
async fn typed_middleware_runs_after_global_middleware_in_order() {
    let bus = EventBus::new(64).expect("valid config");
    let log = Arc::new(std::sync::Mutex::new(Vec::<&str>::new()));

    let global_sync = OrderTracker {
        id: "global-sync",
        log: Arc::clone(&log),
    };
    let _global_sync = bus
        .add_sync_middleware(global_sync)
        .await
        .expect("add global sync middleware");

    let global_async = AsyncOrderTracker {
        id: "global-async",
        log: Arc::clone(&log),
    };
    let _global_async = bus
        .add_middleware(global_async)
        .await
        .expect("add global async middleware");

    let typed_sync = TypedSyncOrderTracker {
        id: "typed-sync",
        log: Arc::clone(&log),
    };
    let _typed_sync = bus
        .add_typed_sync_middleware::<Ping, _>(typed_sync)
        .await
        .expect("add typed sync middleware");

    let typed_async = TypedAsyncOrderTracker {
        id: "typed-async",
        log: Arc::clone(&log),
    };
    let _typed_async = bus
        .add_typed_middleware::<Ping, _>(typed_async)
        .await
        .expect("add typed async middleware");

    // The handler writes to the same log so its position relative to the
    // middlewares is visible.
    let sink = Arc::clone(&log);
    let _subscription = bus
        .subscribe::<Ping, _, _>(move |_ping: &Ping| {
            sink.lock().unwrap().push("handler");
            Ok(())
        })
        .await
        .expect("subscribe handler");

    bus.publish(Ping).await.expect("publish");
    bus.shutdown().await.expect("shutdown");

    let observed = log.lock().unwrap().clone();
    let expected = vec!["global-sync", "global-async", "typed-sync", "typed-async", "handler"];
    assert_eq!(observed, expected);
}

/// Two `Ping`-typed sync middlewares must run in registration order, both
/// ahead of the handler.
#[tokio::test]
async fn typed_middleware_ordering_fifo() {
    let bus = EventBus::new(64).expect("valid config");
    let log = Arc::new(std::sync::Mutex::new(Vec::<&str>::new()));

    let tracker_a = TypedSyncOrderTracker {
        id: "typed-a",
        log: Arc::clone(&log),
    };
    let _typed_a = bus
        .add_typed_sync_middleware::<Ping, _>(tracker_a)
        .await
        .expect("add typed middleware a");

    let tracker_b = TypedSyncOrderTracker {
        id: "typed-b",
        log: Arc::clone(&log),
    };
    let _typed_b = bus
        .add_typed_sync_middleware::<Ping, _>(tracker_b)
        .await
        .expect("add typed middleware b");

    // The handler shares the log so its position after the middlewares is visible.
    let sink = Arc::clone(&log);
    let _subscription = bus
        .subscribe::<Ping, _, _>(move |_ping: &Ping| {
            sink.lock().unwrap().push("handler");
            Ok(())
        })
        .await
        .expect("subscribe handler");

    bus.publish(Ping).await.expect("publish");
    bus.shutdown().await.expect("shutdown");

    let observed = log.lock().unwrap().clone();
    let expected = vec!["typed-a", "typed-b", "handler"];
    assert_eq!(observed, expected, "typed middlewares should run FIFO");
}

/// Unsubscribing a rejecting `Ping`-typed middleware must restore delivery:
/// the first publish is rejected, the second reaches the handler.
#[tokio::test]
async fn typed_middleware_removal() {
    let bus = EventBus::new(64).expect("valid config");
    let hits = Arc::new(AtomicUsize::new(0));

    let blocker = TypedSyncReject("typed blocked");
    let typed_handle = bus
        .add_typed_sync_middleware::<Ping, _>(blocker)
        .await
        .expect("add typed middleware");

    let handler = Counter(Arc::clone(&hits));
    let _subscription = bus.subscribe::<Ping, _, _>(handler).await.expect("subscribe");

    // While the typed middleware is installed, publishing fails.
    let err = bus.publish(Ping).await.unwrap_err();
    assert!(matches!(err, EventBusError::MiddlewareRejected(ref r) if r == "typed blocked"));

    // Take the typed middleware out of the pipeline.
    let was_removed = typed_handle.unsubscribe().await.expect("unsubscribe typed middleware");
    assert!(was_removed, "typed middleware should be removed");

    bus.publish(Ping).await.expect("publish after typed middleware removal");
    bus.shutdown().await.expect("shutdown");

    let fired = hits.load(Ordering::SeqCst);
    assert_eq!(fired, 1, "handler should fire once middleware is removed");
}