ruststream 0.5.0

Async messaging framework for Rust: broker-agnostic traits, router, codecs, and a conformance harness for broker authors.
Documentation
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
//! Integration tests for the batch subscriber pipeline: the `#[subscriber(batch(..))]` form,
//! `include_batch` mounting, per-element decode failures, and the `Buffered` adapter.
#![cfg(feature = "macros")]

mod common;

use std::{
    sync::{
        Arc, LazyLock, Mutex,
        atomic::{AtomicBool, AtomicUsize, Ordering},
    },
    time::Duration,
};

use common::handler_signal;
use ruststream::memory::MemoryBroker;
use ruststream::runtime::{AppInfo, HandlerResult, Router, RustStream, TypedPublisher};
use ruststream::testing::expect_published;
use ruststream::{Buffered, Name, OutgoingMessage, Publisher, subscriber};
use serde::{Deserialize, Serialize};
use tokio::sync::Notify;

/// Minimal order payload used throughout these tests; it derives serde's `Serialize` and
/// `Deserialize` so it can be encoded by `order_bytes` and handed to the batch handlers.
#[derive(Debug, Serialize, Deserialize)]
struct Order {
    /// Numeric identifier; the handlers record it to show which orders reached them.
    id: u32,
}

/// Encodes an [`Order`] carrying `id` as JSON bytes, ready to be used as a message payload.
///
/// # Panics
///
/// Panics if `serde_json` fails to serialize the order.
fn order_bytes(id: u32) -> Vec<u8> {
    let order = Order { id };
    serde_json::to_vec(&order).unwrap()
}

/// Every batch the `bill` handler has seen, in arrival order; each inner vector holds the order
/// ids of one batch.
static BATCHES: Mutex<Vec<Vec<u32>>> = Mutex::new(Vec::new());
/// Notified by `bill` after it has recorded a batch, so the test can wake up and inspect
/// `BATCHES`.
static BILL_NOTIFY: LazyLock<Notify> = LazyLock::new(Notify::new);

/// Settles a whole page of orders at once.
#[subscriber(batch("orders"))]
async fn bill(orders: &[Order]) -> HandlerResult {
    // Record this batch as one entry so the test can check batch boundaries and ordering.
    let ids: Vec<u32> = orders.iter().map(|order| order.id).collect();
    BATCHES.lock().unwrap().push(ids);

    // Wake the test only after the batch is visible in `BATCHES`.
    BILL_NOTIFY.notify_one();
    HandlerResult::Ack
}

/// End-to-end check that a `#[subscriber(batch(..))]` handler mounted with `include_batch`
/// receives the published orders in publish order and never sees an empty batch.
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn batch_macro_def_receives_batches() {
    let broker = MemoryBroker::new();
    let publisher = broker.publisher();

    let app = RustStream::new(AppInfo::new("billing", "0.1.0"))
        .with_broker(broker, |b| b.include_batch(bill));

    // The app runs on its own task until `stop` is notified at the end of the test.
    let stop = Arc::new(Notify::new());
    let run = {
        let stop = Arc::clone(&stop);
        tokio::spawn(app.run_until(async move { stop.notified().await }))
    };

    // The subscription only opens once run() is underway, so keep publishing the 0,1,2 cycle
    // until at least three orders have been recorded by the handler.
    let deliver_three = async {
        loop {
            for id in 0..3u32 {
                let payload = order_bytes(id);
                let _ = publisher
                    .publish(OutgoingMessage::new("orders", &payload))
                    .await;
            }
            handler_signal(&BILL_NOTIFY).await;
            let delivered = BATCHES
                .lock()
                .unwrap()
                .iter()
                .fold(0usize, |total, batch| total + batch.len());
            if delivered >= 3 {
                break;
            }
        }
    };
    let outcome = tokio::time::timeout(Duration::from_secs(5), deliver_three).await;
    assert!(outcome.is_ok(), "no batch arrived within the deadline");

    // Publishes made before the subscriber was ready may be lost, so the recorded stream need
    // not start at 0. What must hold is that every neighbouring pair steps forward through the
    // repeating 0,1,2 cycle, both inside a batch and across batch boundaries.
    let flattened: Vec<u32> = BATCHES.lock().unwrap().iter().flatten().copied().collect();
    assert!(
        flattened
            .iter()
            .zip(flattened.iter().skip(1))
            .all(|(prev, next)| *next == (*prev + 1) % 3),
        "deliveries out of publish order: {flattened:?}",
    );
    assert!(
        !BATCHES.lock().unwrap().iter().any(Vec::is_empty),
        "batches must not be empty",
    );

    stop.notify_one();
    run.await.unwrap().unwrap();
}

/// Flat list of every order id the `sift` handler has been handed, in arrival order.
static GOOD_IDS: Mutex<Vec<u32>> = Mutex::new(Vec::new());
/// Notified by `sift` after it has appended a batch's ids to `GOOD_IDS`.
static SIFT_NOTIFY: LazyLock<Notify> = LazyLock::new(Notify::new);

/// Records the ids that survived decoding.
#[subscriber(batch("mixed"))]
async fn sift(orders: &[Order]) -> HandlerResult {
    // Whatever reaches this slice was handed over as a decoded `Order`; keep just the ids.
    let ids = orders.iter().map(|order| order.id);
    GOOD_IDS.lock().unwrap().extend(ids);

    // Signal the test after the ids are visible.
    SIFT_NOTIFY.notify_one();
    HandlerResult::Ack
}

/// Publishes a mix of valid and invalid JSON payloads and checks that only the ids of the valid
/// orders are ever recorded by the `sift` handler.
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn undecodable_elements_never_reach_the_handler() {
    let broker = MemoryBroker::new();
    let publisher = broker.publisher();

    let app = RustStream::new(AppInfo::new("billing", "0.1.0"))
        .with_broker(broker, |b| b.include_batch(sift));

    let stop = Arc::new(Notify::new());
    let run = {
        let stop = Arc::clone(&stop);
        tokio::spawn(app.run_until(async move { stop.notified().await }))
    };

    let deliver_both = async {
        loop {
            // One round: a valid order, a payload that is not JSON, then another valid order.
            let first = order_bytes(1);
            let _ = publisher
                .publish(OutgoingMessage::new("mixed", &first))
                .await;
            let _ = publisher
                .publish(OutgoingMessage::new("mixed", b"not json"))
                .await;
            let second = order_bytes(2);
            let _ = publisher
                .publish(OutgoingMessage::new("mixed", &second))
                .await;
            handler_signal(&SIFT_NOTIFY).await;

            // Early publishes can be lost because the subscription opens inside run(), and the
            // loop republishes. Checking that both ids are present (instead of counting
            // entries) avoids being satisfied by a partial first round plus a repeat.
            let both_seen = {
                let seen = GOOD_IDS.lock().unwrap();
                [1u32, 2].iter().all(|id| seen.contains(id))
            };
            if both_seen {
                break;
            }
        }
    };
    let outcome = tokio::time::timeout(Duration::from_secs(5), deliver_both).await;
    assert!(outcome.is_ok(), "decodable elements did not arrive");

    // The bad payload is discarded on its own and does not take the surrounding batch down
    // with it: nothing other than ids 1 and 2 may have been recorded (their presence was
    // already established by the loop above).
    let ids = GOOD_IDS.lock().unwrap().clone();
    assert!(
        !ids.iter().any(|id| !matches!(*id, 1 | 2)),
        "an undecodable element reached the handler: {ids:?}"
    );

    stop.notify_one();
    run.await.unwrap().unwrap();
}

/// Running total of events the `drain` handler has been handed, summed over all its batches.
static BUFFERED_SEEN: AtomicUsize = AtomicUsize::new(0);
/// Notified by `drain` after it has added a batch's length to `BUFFERED_SEEN`.
static DRAIN_NOTIFY: LazyLock<Notify> = LazyLock::new(Notify::new);

/// A handler mounted on a `Buffered`-wrapped source directly in the macro. The macro recovers
/// the source type from the constructor path, so a generic source spells its parameter
/// (turbofish).
#[subscriber(batch(Buffered::<Name>::new(Name::new("events")).max_size(2)))]
async fn drain(events: &[Order]) -> HandlerResult {
    // Only the number of delivered events matters to the test, not their contents.
    let delivered = events.len();
    BUFFERED_SEEN.fetch_add(delivered, Ordering::SeqCst);

    DRAIN_NOTIFY.notify_one();
    HandlerResult::Ack
}

/// Mounts the `Buffered`-backed `drain` handler through a `Router` and checks that at least one
/// published event reaches it.
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn buffered_adapter_batches_plain_subscribers_via_router() {
    let broker = MemoryBroker::new();
    let publisher = broker.publisher();

    // Going through a Router (rather than the broker builder) exercises its include_batch too.
    let router = Router::<MemoryBroker>::new().include_batch(drain);
    let app = RustStream::new(AppInfo::new("events", "0.1.0"))
        .with_broker(broker, |b| b.include_router(router));

    let stop = Arc::new(Notify::new());
    let run = {
        let stop = Arc::clone(&stop);
        tokio::spawn(app.run_until(async move { stop.notified().await }))
    };

    // Republish until the handler reports that it has seen at least one event.
    let deliver_one = async {
        loop {
            let payload = order_bytes(7);
            let _ = publisher
                .publish(OutgoingMessage::new("events", &payload))
                .await;
            handler_signal(&DRAIN_NOTIFY).await;
            if BUFFERED_SEEN.load(Ordering::SeqCst) == 0 {
                continue;
            }
            break;
        }
    };
    let outcome = tokio::time::timeout(Duration::from_secs(5), deliver_one).await;
    assert!(outcome.is_ok(), "buffered batch did not arrive");

    stop.notify_one();
    run.await.unwrap().unwrap();
}

/// Ids of the orders the `reconcile` handler acknowledged, in the order it settled them.
static SETTLED: Mutex<Vec<u32>> = Mutex::new(Vec::new());
/// Notified by `reconcile` once per handled batch, after all per-element outcomes are decided.
static RECONCILE_NOTIFY: LazyLock<Notify> = LazyLock::new(Notify::new);
/// Set the first time `reconcile` sees order 11, so that order is asked to retry only once.
static RETRIED_ONCE: AtomicBool = AtomicBool::new(false);

/// Retries order 11 on first sight; settles everything else, per element.
#[subscriber(batch("pages"))]
async fn reconcile(orders: &[Order]) -> Vec<HandlerResult> {
    // One outcome per element, in the same order as the incoming slice.
    let mut outcomes = Vec::with_capacity(orders.len());
    for order in orders {
        // `swap` returns the previous flag value, so only the very first encounter with order
        // 11 takes the retry branch; the flag is left untouched for every other id.
        let first_sight_of_11 = order.id == 11 && !RETRIED_ONCE.swap(true, Ordering::SeqCst);
        let outcome = if first_sight_of_11 {
            HandlerResult::retry()
        } else {
            SETTLED.lock().unwrap().push(order.id);
            HandlerResult::Ack
        };
        outcomes.push(outcome);
    }

    RECONCILE_NOTIFY.notify_one();
    outcomes
}

/// Checks that a handler returning one outcome per element can retry a single order while the
/// rest of the page settles, and that the retried order settles on a later delivery.
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn per_element_outcomes_retry_individually() {
    let broker = MemoryBroker::new();
    let publisher = broker.publisher();

    let app = RustStream::new(AppInfo::new("pages", "0.1.0"))
        .with_broker(broker, |b| b.include_batch(reconcile));

    let stop = Arc::new(Notify::new());
    let run = {
        let stop = Arc::clone(&stop);
        tokio::spawn(app.run_until(async move { stop.notified().await }))
    };

    // Phase 1: probe with order 0 until it is settled, which proves the subscription is live.
    // Only then is the real page sent (a single time), keeping the retry bookkeeping below
    // deterministic.
    let probe = async {
        loop {
            let payload = order_bytes(0);
            let _ = publisher
                .publish(OutgoingMessage::new("pages", &payload))
                .await;
            handler_signal(&RECONCILE_NOTIFY).await;
            let live = SETTLED.lock().unwrap().iter().any(|id| *id == 0);
            if live {
                break;
            }
        }
    };
    let warmup = tokio::time::timeout(Duration::from_secs(5), probe).await;
    assert!(warmup.is_ok(), "subscription did not come up");

    // Phase 2: the real page; these publishes must succeed.
    for id in [10u32, 11, 12] {
        let payload = order_bytes(id);
        publisher
            .publish(OutgoingMessage::new("pages", &payload))
            .await
            .unwrap();
    }

    // Phase 3: wait until all three ids of the page show up as settled.
    let all_settled = async {
        loop {
            handler_signal(&RECONCILE_NOTIFY).await;
            let settled = SETTLED.lock().unwrap().clone();
            let pending = [10, 11, 12].iter().any(|id| !settled.contains(id));
            if !pending {
                break;
            }
        }
    };
    let outcome = tokio::time::timeout(Duration::from_secs(5), all_settled).await;
    assert!(outcome.is_ok(), "retried element was not redelivered");

    // The retry flag was raised for 11, and every id of the page, including the retried one,
    // appears in the settled log exactly one time.
    assert!(RETRIED_ONCE.load(Ordering::SeqCst));
    let settled = SETTLED.lock().unwrap().clone();
    for id in [10u32, 11, 12] {
        let occurrences = settled.iter().filter(|s| **s == id).count();
        assert_eq!(
            occurrences, 1,
            "{id} must settle exactly once; settled: {settled:?}",
        );
    }

    stop.notify_one();
    run.await.unwrap().unwrap();
}

/// Reply payload produced by the `confirm` and `audit` handlers for each order in a batch.
#[derive(Debug, Serialize, Deserialize)]
struct Confirmation {
    /// Id of the order this confirmation answers.
    id: u32,
    /// Whether the order was accepted; the handlers in this file always set it to `true`.
    accepted: bool,
}

/// Notified by the `confirm` handler each time it is invoked with a batch.
static BATCH_CONFIRM_NOTIFY: LazyLock<Notify> = LazyLock::new(Notify::new);

/// Confirms a page of orders. The Result form gives explicit ack control; the whole-batch
/// rejection path is covered by the runtime unit tests.
#[subscriber(batch("requests"), publish("confirmations"))]
async fn confirm(orders: &[Order]) -> Result<Vec<Confirmation>, HandlerResult> {
    // Signal first: the test only needs to know the handler fired.
    BATCH_CONFIRM_NOTIFY.notify_one();

    // One accepted confirmation per incoming order, in the same order.
    let mut confirmations = Vec::with_capacity(orders.len());
    for order in orders {
        confirmations.push(Confirmation {
            id: order.id,
            accepted: true,
        });
    }
    Ok(confirmations)
}

/// The plain reply form: every page is confirmed (compile coverage for `-> Vec<Reply>`).
#[subscriber(batch("requests"), publish("audit"))]
async fn audit(orders: &[Order]) -> Vec<Confirmation> {
    // Every order is answered with an accepted confirmation carrying its own id.
    let accept = |order: &Order| Confirmation {
        id: order.id,
        accepted: true,
    };
    orders.iter().map(accept).collect()
}

/// Checks that replies returned by the `confirm` batch handler are published to
/// `"confirmations"` through a transactional `TypedPublisher`, and that each published reply
/// decodes to an accepted confirmation for order 7.
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn batch_replies_publish_transactionally() {
    let broker = MemoryBroker::new();
    let publisher = broker.publisher();
    let observer = broker.clone();

    let replies = TypedPublisher::new(broker.publisher()).transactional();
    let app = RustStream::new(AppInfo::new("confirmations", "0.1.0"))
        .with_broker(broker, |b| b.include_batch_publishing(confirm, replies));

    let shutdown = Arc::new(Notify::new());
    let shutdown_signal = Arc::clone(&shutdown);
    let run = tokio::spawn(app.run_until(async move { shutdown_signal.notified().await }));

    // Retry publishing until the subscription is live, waking as soon as the handler fires;
    // the published confirmations are then awaited with expect_published's own deadline.
    // The first non-empty observation is carried out of the loop, so the assertions below
    // always inspect at least one confirmation instead of depending on a second, shorter
    // query that could come back empty and leave them vacuous.
    let result = tokio::time::timeout(Duration::from_secs(5), async {
        loop {
            let _ = publisher
                .publish(OutgoingMessage::new("requests", &order_bytes(7)))
                .await;
            handler_signal(&BATCH_CONFIRM_NOTIFY).await;
            let confirmed =
                expect_published(&observer, "confirmations", 1, Duration::from_millis(200)).await;
            if !confirmed.is_empty() {
                break confirmed;
            }
        }
    })
    .await;
    let Ok(confirmed) = result else {
        panic!("no confirmation arrived");
    };

    // `confirmed` is non-empty here, so this loop body runs at least once.
    for raw in &confirmed {
        let confirmation: Confirmation = serde_json::from_slice(raw.payload()).unwrap();
        assert_eq!(confirmation.id, 7);
        assert!(confirmation.accepted);
    }

    shutdown.notify_one();
    run.await.unwrap().unwrap();
}

/// Mounting the publishing batch handler `audit` registers exactly one handler whose metadata
/// names the subscribed source and mentions the reply type.
#[test]
fn batch_publishing_def_records_metadata() {
    let broker = MemoryBroker::new();
    let replies = TypedPublisher::new(broker.publisher());
    let app = RustStream::new(AppInfo::new("audit", "0.1.0"))
        .with_broker(broker, |b| b.include_batch_publishing(audit, replies));

    let handlers = app.handlers();
    assert_eq!(handlers.len(), 1);

    let registered = &handlers[0];
    assert_eq!(registered.name, "requests");
    // The recorded output type must be present and refer to `Confirmation`.
    let mentions_reply = registered
        .output_type
        .is_some_and(|t| t.contains("Confirmation"));
    assert!(mentions_reply);
}

/// Mounting the batch handler `bill` registers exactly one handler, named after its source,
/// whose description is the handler's doc comment.
#[test]
fn batch_def_records_metadata() {
    let broker = MemoryBroker::new();
    let app = RustStream::new(AppInfo::new("billing", "0.1.0"))
        .with_broker(broker, |b| b.include_batch(bill));

    let handlers = app.handlers();
    assert_eq!(handlers.len(), 1);

    let registered = &handlers[0];
    assert_eq!(registered.name, "orders");
    // The description must match the `///` line written above `bill` word for word.
    let description = registered.description.as_deref();
    assert_eq!(description, Some("Settles a whole page of orders at once."));
}

/// Typed application state read from a batch handler: the multiplier is produced at startup and
/// reaches the whole-batch handler through `ctx.state()`, the same as a single-message handler.
#[derive(Clone, Copy)]
struct Tally {
    /// Factor the `scale` handler multiplies every order id by.
    multiplier: u32,
}

/// Order ids after the `scale` handler multiplied them by the state's multiplier, in arrival
/// order.
static SCALED: Mutex<Vec<u32>> = Mutex::new(Vec::new());
/// Notified by `scale` after it has appended a batch's scaled ids to `SCALED`.
static SCALE_NOTIFY: LazyLock<Notify> = LazyLock::new(Notify::new);

// Batch handler that reads the typed `Tally` state and records each order id multiplied by it.
// Plain `//` comments are used here on purpose: `batch_def_records_metadata` shows that a `///`
// comment on a subscriber becomes the handler's recorded description.
// NOTE(review): `Context` is not named in the `use` items visible in this file; presumably the
// `subscriber` macro or another import brings it into scope — confirm.
#[subscriber(batch("scale"))]
async fn scale(orders: &[Order], ctx: &mut Context<'_, (), Tally>) -> HandlerResult {
    // Copy the factor out of the shared state before touching the results.
    let factor = ctx.state().multiplier;
    let scaled = orders.iter().map(|order| order.id * factor);
    SCALED.lock().unwrap().extend(scaled);

    SCALE_NOTIFY.notify_one();
    HandlerResult::Ack
}

/// Checks that a batch handler can read typed application state: the startup hook produces a
/// `Tally` with multiplier 10 and the `scale` handler must record ids scaled by it.
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn batch_handler_reads_typed_state() {
    let broker = MemoryBroker::new();
    let publisher = broker.publisher();

    let app = RustStream::new(AppInfo::new("billing", "0.1.0"))
        .on_startup(|()| async { Ok::<_, std::convert::Infallible>(Tally { multiplier: 10 }) })
        .with_broker(broker, |b| b.include_batch(scale));

    let stop = Arc::new(Notify::new());
    let run = {
        let stop = Arc::clone(&stop);
        tokio::spawn(app.run_until(async move { stop.notified().await }))
    };

    // Keep publishing ids 1, 2, 3 until the handler has recorded at least three values.
    let deliver_three = async {
        loop {
            for id in 1..4u32 {
                let payload = order_bytes(id);
                let _ = publisher
                    .publish(OutgoingMessage::new("scale", &payload))
                    .await;
            }
            handler_signal(&SCALE_NOTIFY).await;
            let recorded = SCALED.lock().unwrap().len();
            if recorded >= 3 {
                break;
            }
        }
    };
    let outcome = tokio::time::timeout(Duration::from_secs(5), deliver_three).await;
    assert!(
        outcome.is_ok(),
        "no scaled batch arrived within the deadline"
    );

    // A multiplier of 10 came from the startup state, so every recorded value must be a
    // multiple of 10 and the scaled forms of ids 1, 2 and 3 must all be present.
    let scaled = SCALED.lock().unwrap().clone();
    assert!(
        !scaled.iter().any(|n| n % 10 != 0),
        "every value must be a multiple of the state multiplier; got {scaled:?}",
    );
    assert!([10, 20, 30].iter().all(|expected| scaled.contains(expected)));

    stop.notify_one();
    run.await.unwrap().unwrap();
}