Module aggregator


Aggregator pattern: collects messages sharing a correlation header until a configured completion size is reached, then emits a concatenated outbound text message.

§Behavior

  • Messages are grouped by the value of a specified correlation header (e.g. "corr").
  • When the number of messages in a group reaches completion_size, all of the group's messages are removed from the internal store and concatenated (text-only) into Exchange.out_msg.
  • A group is aggregated only if every message in it has a UTF-8 text payload (Payload::Text). Groups with mixed payload kinds (bytes, JSON, empty) and groups with no text payloads are ignored: no outbound message is produced.
  • Once a group completes it is removed, allowing subsequent batches with the same key.
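
The behavior listed above can be illustrated with a minimal, standalone sketch. This is not the crate's implementation: `SketchAggregator`, its `push` method, and the reduced `Payload` enum below are hypothetical names introduced only for illustration, and the sketch assumes that a completed group containing a non-text payload is discarded along with its messages.

```rust
use std::collections::HashMap;
use std::sync::Mutex;

/// Hypothetical stand-in for the crate's payload type (JSON and empty
/// variants are omitted here for brevity).
#[derive(Debug, Clone, PartialEq)]
pub enum Payload {
    Text(String),
    Bytes(Vec<u8>),
}

/// Illustrative size-based aggregator; an assumption-laden sketch only.
pub struct SketchAggregator {
    completion_size: usize,
    store: Mutex<HashMap<String, Vec<Payload>>>,
}

impl SketchAggregator {
    pub fn new(completion_size: usize) -> Self {
        Self { completion_size, store: Mutex::new(HashMap::new()) }
    }

    /// Adds one payload under the correlation `key`. Returns the concatenated
    /// text once the group reaches `completion_size` and every payload is
    /// text; returns `None` in every other case.
    pub fn push(&self, key: &str, payload: Payload) -> Option<String> {
        let mut store = self.store.lock().unwrap();
        let group = store.entry(key.to_string()).or_default();
        group.push(payload);
        if group.len() < self.completion_size {
            return None; // group still incomplete
        }
        // Completed: remove the group so the same key can start a new batch.
        let group = store.remove(key)?;
        let mut out = String::new();
        for p in &group {
            match p {
                Payload::Text(s) => out.push_str(s),
                // Assumption: any non-text payload suppresses the output.
                _ => return None,
            }
        }
        Some(out)
    }
}
```

With a completion size of 2, pushing "A" and then "B" under the same key yields `None` followed by `Some("AB")`, and a third push under that key starts a fresh group.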

§Limitations

  • Non-text payloads are ignored for aggregation; bytes-only or JSON-only groups will not produce an out_msg. Future enhancements could support bytes joining or JSON array creation.
  • Concurrency relies on a single Mutex<HashMap<..>>; high-contention scenarios may warrant a sharded map or lock-free structure.
  • No time-based or predicate-based completion; only a size threshold is supported.
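
As a rough illustration of the sharded-map idea mentioned in the limitations, the sketch below spreads groups over several independently locked maps, so that messages with different correlation keys usually contend on different locks. `ShardedStore`, `shard_index`, and `push` are hypothetical names, the store holds plain text only, and none of this reflects how the crate is or will be implemented.

```rust
use std::collections::hash_map::DefaultHasher;
use std::collections::HashMap;
use std::hash::{Hash, Hasher};
use std::sync::Mutex;

/// Hypothetical sharded store: one mutex per shard instead of one global lock.
pub struct ShardedStore {
    shards: Vec<Mutex<HashMap<String, Vec<String>>>>,
}

impl ShardedStore {
    pub fn new(shard_count: usize) -> Self {
        let n = shard_count.max(1); // always keep at least one shard
        Self { shards: (0..n).map(|_| Mutex::new(HashMap::new())).collect() }
    }

    /// Maps a correlation key to a shard by hashing it.
    pub fn shard_index(&self, key: &str) -> usize {
        let mut hasher = DefaultHasher::new();
        key.hash(&mut hasher);
        (hasher.finish() as usize) % self.shards.len()
    }

    /// Adds `text` to the group for `key`, locking only that key's shard.
    /// Returns the concatenated group once it holds `completion_size` items.
    pub fn push(&self, key: &str, text: &str, completion_size: usize) -> Option<String> {
        let mut shard = self.shards[self.shard_index(key)].lock().unwrap();
        let group = shard.entry(key.to_string()).or_default();
        group.push(text.to_string());
        if group.len() < completion_size {
            return None;
        }
        // Remove the completed group and join its parts in arrival order.
        shard.remove(key).map(|parts| parts.concat())
    }
}
```

All messages for a given key hash to the same shard, so per-group ordering and the size threshold behave as they would with a single map; only unrelated keys gain parallelism.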

§Example

use allora_core::{patterns::aggregator::Aggregator, route::Route, Exchange, Message};
// Aggregate on the "corr" header; a group completes after 2 messages.
let route = Route::new().add(Aggregator::new("corr", 2)).build();
let mut ex1 = Exchange::new(Message::from_text("A"));
ex1.in_msg.set_header("corr", "grp");
let rt = tokio::runtime::Runtime::new().unwrap();
rt.block_on(async { route.run(&mut ex1).await.unwrap(); });
// First message: the group is still incomplete, so nothing is emitted.
assert!(ex1.out_msg.is_none());
let mut ex2 = Exchange::new(Message::from_text("B"));
ex2.in_msg.set_header("corr", "grp");
rt.block_on(async { route.run(&mut ex2).await.unwrap(); });
// Second message completes the group; the texts are concatenated in order.
assert_eq!(ex2.out_msg.unwrap().body_text(), Some("AB"));

Structs§

Aggregator