Aggregator pattern: collect messages sharing a correlation header into a group, decide when the
group is complete via a pluggable CompletionCondition, and fold the completed group into an
outbound Message via a pluggable AggregationStrategy. The group storage itself is also
pluggable via GroupStore (default: InMemoryGroupStore).
§Composition
An aggregator is correlation key + completion + strategy + store. Built-in completions:
- BySize: completes when the group reaches n messages (the original behavior).
- ByWeight: completes when Σ weight(msg) ≥ threshold; the canonical weighted-quorum condition (e.g. validator voting power, oracle stake).
- ByPredicate: caller-supplied Fn(&[Message]) -> bool.
- ByTimeout: completes when the first message in the group is older than a Duration.
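As a rough illustration of how size and weight conditions can sit behind one trait, here is a minimal, self-contained sketch. It does not use the crate: `SketchMsg`, `SketchCompletion`, `SizeAtLeast`, `WeightAtLeast`, and `two_thirds_quorum` are hypothetical names, and the real `CompletionCondition` signature may differ.

```rust
/// Hypothetical stand-in for the crate's `Message`: it only carries a weight.
pub struct SketchMsg {
    pub weight: u64,
}

/// Assumed trait shape; the real `CompletionCondition` may look different.
pub trait SketchCompletion {
    fn is_complete(&self, group: &[SketchMsg]) -> bool;
}

/// Size-style condition: complete once the group holds at least `n` messages.
pub struct SizeAtLeast(pub usize);

impl SketchCompletion for SizeAtLeast {
    fn is_complete(&self, group: &[SketchMsg]) -> bool {
        group.len() >= self.0
    }
}

/// Weight-style condition: complete once the summed weight reaches the threshold.
pub struct WeightAtLeast(pub u64);

impl SketchCompletion for WeightAtLeast {
    fn is_complete(&self, group: &[SketchMsg]) -> bool {
        group.iter().map(|m| m.weight).sum::<u64>() >= self.0
    }
}

/// Smallest integer that is at least 2/3 of `total` (integer ceiling division).
pub fn two_thirds_quorum(total: u64) -> u64 {
    (2 * total + 2) / 3
}
```

With a total weight of 10, `two_thirds_quorum` yields 7, so votes weighted 3 and 3 leave the group open and a further vote weighted 4 completes it.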
Built-in strategies:
- ConcatText: UTF-8 concatenation of every payload (legacy behavior); emits nothing if any payload in the group is non-text.
- JsonArray: emits a single Payload::Json message containing each payload mapped into a JSON array element.
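The "emit nothing on any non-text payload" rule can be sketched as a fold that bails out early. This is an illustration only: `SketchPayload` and `concat_text` are hypothetical and much simpler than the crate's `Payload` and `AggregationStrategy`.

```rust
/// Hypothetical payload type for this sketch; the crate's `Payload` is richer.
pub enum SketchPayload {
    Text(String),
    Bytes(Vec<u8>),
}

/// Concatenates text payloads in arrival order.
/// Returns `None` as soon as a non-text payload is seen, so nothing is emitted.
/// Assumption: an empty group yields `Some("")`.
pub fn concat_text(group: &[SketchPayload]) -> Option<String> {
    let mut out = String::new();
    for payload in group {
        match payload {
            SketchPayload::Text(s) => out.push_str(s),
            _ => return None,
        }
    }
    Some(out)
}
```

Returning `Option` rather than an error keeps "complete but silent" distinct from a failure, which matches the description of strategies that may emit nothing.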
§Back-compat
Aggregator::new("corr", n) keeps its original semantics — equivalent to
Aggregator::with_completion("corr", Arc::new(BySize(n))) (with ConcatText + in-memory store).
§Timeout semantics
ByTimeout is evaluated lazily: completion is only checked when a new message arrives.
Timer-driven completion (firing without a triggering message) requires an external tick / poll
and is out of scope for this version.
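Lazy evaluation can be pictured with the following standalone sketch, in which the timeout is compared only inside `push`. The type `LazyTimeoutGroup` is hypothetical, the clock is passed in explicitly to keep the example deterministic, and including the triggering message in the emitted group is an assumption rather than documented behavior.

```rust
use std::time::{Duration, Instant};

/// Hypothetical single-group buffer that checks its timeout only on arrival.
pub struct LazyTimeoutGroup {
    first_seen: Option<Instant>,
    items: Vec<String>,
    timeout: Duration,
}

impl LazyTimeoutGroup {
    pub fn new(timeout: Duration) -> Self {
        Self { first_seen: None, items: Vec::new(), timeout }
    }

    /// Adds `item` at time `now`. If the first message of the group is at least
    /// `timeout` old, drains and returns the whole group; otherwise returns `None`.
    /// No background timer exists: without a new message nothing ever fires.
    pub fn push(&mut self, item: String, now: Instant) -> Option<Vec<String>> {
        let first = *self.first_seen.get_or_insert(now);
        self.items.push(item);
        if now.duration_since(first) >= self.timeout {
            self.first_seen = None;
            Some(std::mem::take(&mut self.items))
        } else {
            None
        }
    }
}
```

A group whose last message arrives before the deadline stays buffered indefinitely under this scheme, which is the gap an external tick or poll would close.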
§Persistence
GroupStore is the seam for durable storage. The default InMemoryGroupStore keeps
everything in a Mutex<HashMap>; downstream crates (e.g. the Fialucci chain) can provide a
disk-backed impl by implementing the trait.
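To make the storage seam concrete, here is a minimal sketch of an in-memory store behind a trait. The names `SketchStore` and `MemStore` and the two method signatures are assumptions for illustration; the actual `GroupStore` trait may expose different operations.

```rust
use std::collections::HashMap;
use std::sync::Mutex;

/// Assumed trait shape for this sketch; not the crate's `GroupStore`.
pub trait SketchStore {
    /// Appends `item` to the group under `key` and returns the new group size.
    fn append(&self, key: &str, item: String) -> usize;
    /// Removes and returns the group under `key`, if one exists.
    fn take(&self, key: &str) -> Option<Vec<String>>;
}

/// In-process store: one mutex guarding a map from correlation key to group.
#[derive(Default)]
pub struct MemStore {
    groups: Mutex<HashMap<String, Vec<String>>>,
}

impl SketchStore for MemStore {
    fn append(&self, key: &str, item: String) -> usize {
        let mut groups = self.groups.lock().unwrap();
        let group = groups.entry(key.to_string()).or_default();
        group.push(item);
        group.len()
    }

    fn take(&self, key: &str) -> Option<Vec<String>> {
        self.groups.lock().unwrap().remove(key)
    }
}
```

A disk-backed variant would implement the same two methods against a file or database, leaving the aggregator logic unchanged.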
§Example — size completion (back-compat)
use allora_core::{patterns::aggregator::Aggregator, route::Route, Exchange, Message};
let route = Route::new().add(Aggregator::new("corr", 2)).build();
let mut ex1 = Exchange::new(Message::from_text("A"));
ex1.in_msg.set_header("corr", "grp");
let rt = tokio::runtime::Runtime::new().unwrap();
rt.block_on(async { route.run(&mut ex1).await.unwrap(); });
assert!(ex1.out_msg.is_none());
let mut ex2 = Exchange::new(Message::from_text("B"));
ex2.in_msg.set_header("corr", "grp");
rt.block_on(async { route.run(&mut ex2).await.unwrap(); });
assert_eq!(ex2.out_msg.unwrap().body_text(), Some("AB"));
§Example — weighted-quorum completion (≥ 2/3 voting power)
use allora_core::{patterns::aggregator::{Aggregator, JsonArray}, route::Route, Exchange, Message};
use std::sync::Arc;
// Three validators with weights 3, 3, 4 (total = 10, ⌈2/3·10⌉ = 7).
// First two votes sum to 6 (< 7); the third pushes total to 10 (≥ 7) and fires.
let threshold: u64 = 7;
let agg = Aggregator::weighted(
    "block",
    |m: &Message| m.header("voting_power").and_then(|s| s.parse().ok()).unwrap_or(0),
    threshold,
)
.with_strategy(Arc::new(JsonArray));
let route = Route::new().add(agg).build();
let rt = tokio::runtime::Runtime::new().unwrap();
for (vp, complete_expected) in [(3u64, false), (3, false), (4, true)] {
    let mut ex = Exchange::new(Message::from_text("vote"));
    ex.in_msg.set_header("block", "h=42");
    ex.in_msg.set_header("voting_power", vp.to_string());
    rt.block_on(async { route.run(&mut ex).await.unwrap(); });
    assert_eq!(ex.out_msg.is_some(), complete_expected);
}
Structs§
- Aggregator
  EIP Aggregator: groups messages by a correlation header, completes via a CompletionCondition, and emits the folded result via an AggregationStrategy. Storage is pluggable via GroupStore.
- ByPredicate
  Caller-supplied predicate completion.
- BySize
  Size-threshold completion: complete once the group contains n messages.
- ByTimeout
  Time-based completion: complete when the elapsed time since the first message ≥ Duration.
- ByWeight
  Weighted-quorum completion: sum of weight(msg) over the group must reach threshold.
- ConcatText
  UTF-8 text concatenation. Emits None if any payload in the group is not Payload::Text, preserving the legacy aggregator behavior.
- InMemoryGroupStore
  Default in-process group store (one Mutex<HashMap>).
- JsonArray
  JSON-array carry: emit a single Payload::Json message whose body is a JSON array, with each element mapped from the corresponding payload:
Traits§
- AggregationStrategy
  Folds a completed group into an outbound Message. Returning None means “the group is complete but emit nothing” (e.g. ConcatText on non-text payloads).
- CompletionCondition
  Decides when an in-progress correlation group is complete.
- GroupStore
  Pluggable storage for in-progress correlation groups. The default InMemoryGroupStore keeps groups in a Mutex<HashMap>; persistent implementations can plug in via Aggregator::with_store.