Expand description
Aggregator pattern: collects messages sharing a correlation header until a configured completion size is reached, then emits a concatenated outbound text message.
§Behavior
- Messages are grouped by the value of a specified correlation header (e.g. “corr”).
- When the number of messages in a group reaches
completion_size, all messages are removed from the internal store and concatenated (text-only) intoExchange.out_msg. - Only groups where every message has a UTF-8 text payload (
Payload::Text) are aggregated. Mixed payload kinds (bytes, JSON, empty) or non-text groups are ignored (no outbound message). - Once a group completes it is removed, allowing subsequent batches with the same key.
§Limitations
- Non-text payloads are ignored for aggregation; bytes-only or JSON-only groups will not produce
an
out_msg. Future enhancements could support bytes joining or JSON array creation. - Concurrency relies on a single
Mutex<HashMap<..>>; high contention scenarios may warrant a sharded map or lock-free structure. - No time-based or predicate-based completion—only size threshold is supported.
§Example
use allora_core::{patterns::aggregator::Aggregator, route::Route, Exchange, Message};
let route = Route::new().add(Aggregator::new("corr", 2)).build();
let mut ex1 = Exchange::new(Message::from_text("A"));
ex1.in_msg.set_header("corr", "grp");
let rt = tokio::runtime::Runtime::new().unwrap();
rt.block_on(async { route.run(&mut ex1).await.unwrap(); });
assert!(ex1.out_msg.is_none());
let mut ex2 = Exchange::new(Message::from_text("B"));
ex2.in_msg.set_header("corr", "grp");
rt.block_on(async { route.run(&mut ex2).await.unwrap(); });
assert_eq!(ex2.out_msg.unwrap().body_text(), Some("AB"));