Skip to main content

affinidi_messaging_mediator_processors/
lib.rs

1//! Standalone background processors for the Affinidi Messaging Mediator.
2//!
3//! These processors run as separate processes from the mediator binary,
4//! intended for horizontal scaling: drop more processor instances onto
5//! additional hosts and they coordinate through Redis to share work.
6//!
7//! ## Redis-only by design
8//!
9//! Multi-process coordination relies on Redis primitives:
10//! - **Forwarding processor**: Redis Streams consumer groups
11//!   (`XREADGROUP` / `XACK` / `XAUTOCLAIM`) for at-least-once delivery
12//!   across competing consumers.
13//! - **Message expiry cleanup**: atomic `SPOP` on expiry-timeslot sets
14//!   so multiple processors can drain the same timeslot without
15//!   duplicating work.
16//!
17//! Memory and Fjall backends are single-process by definition and have
18//! no equivalent multi-host coordination, so the standalone binaries
19//! are not portable to those backends. The mediator's in-process tasks
20//! cover the same workloads on every backend via the
21//! `MediatorStore` trait — operators only need this crate when they want
22//! to scale a Redis deployment horizontally.
23//!
24//! ## Implementation
25//!
26//! Both binaries reuse the trait-based code path lifted into
27//! `mediator-common`:
28//!
29//! - `forwarding_processor` constructs an `Arc<dyn MediatorStore>`
30//!   over a `RedisStore` and feeds it to
31//!   `mediator_common::tasks::forwarding::ForwardingProcessor`. Same
32//!   processor type the mediator binary spawns in-process.
33//! - `message_expiry_cleanup` opens the same `RedisStore` and calls
34//!   `MediatorStore::sweep_expired_messages` on a one-second tick,
35//!   matching the mediator's in-process expiry sweep.
36//!
37//! No duplicated implementation lives in this crate — both binaries
38//! are thin shells that wire config to mediator-common.