Skip to main content

Module aggregator

Module aggregator 

Source
Expand description

Aggregator pattern: collect messages sharing a correlation header into a group, decide when the group is complete via a pluggable CompletionCondition, and fold the completed group into an outbound Message via a pluggable AggregationStrategy. The group storage itself is also pluggable via GroupStore (default: InMemoryGroupStore).

§Composition

An aggregator is correlation key + completion + strategy + store. Built-in completions:

  • BySize — completes when the group reaches n messages (the original behavior).
  • ByWeight — completes when Σ weight(msg) ≥ threshold; the canonical weighted-quorum condition (e.g. validator voting power, oracle stake).
  • ByPredicate — caller-supplied Fn(&[Message]) -> bool.
  • ByTimeout — completes when the first message in the group is older than a Duration.

Built-in strategies:

  • ConcatText — UTF-8 concatenation of every payload (legacy behavior); emits nothing if any payload in the group is non-text.
  • JsonArray — emits a single Payload::Json message containing each payload mapped into a JSON array element.

§Back-compat

Aggregator::new("corr", n) keeps its original semantics — equivalent to Aggregator::with_completion("corr", Arc::new(BySize(n))) (with ConcatText + in-memory store).

§Timeout semantics

ByTimeout is evaluated lazily: completion is only checked when a new message arrives. Timer-driven completion (firing without a triggering message) requires an external tick / poll and is out of scope for this version.

§Persistence

GroupStore is the seam for durable storage. The default InMemoryGroupStore keeps everything in a Mutex<HashMap>; downstream crates (e.g. the Fialucci chain) can provide a disk-backed impl by implementing the trait.

§Example — size completion (back-compat)

use allora_core::{patterns::aggregator::Aggregator, route::Route, Exchange, Message};
let route = Route::new().add(Aggregator::new("corr", 2)).build();
let mut ex1 = Exchange::new(Message::from_text("A"));
ex1.in_msg.set_header("corr", "grp");
let rt = tokio::runtime::Runtime::new().unwrap();
rt.block_on(async { route.run(&mut ex1).await.unwrap(); });
assert!(ex1.out_msg.is_none());
let mut ex2 = Exchange::new(Message::from_text("B"));
ex2.in_msg.set_header("corr", "grp");
rt.block_on(async { route.run(&mut ex2).await.unwrap(); });
assert_eq!(ex2.out_msg.unwrap().body_text(), Some("AB"));

§Example — weighted-quorum completion (≥ 2/3 voting power)

use allora_core::{patterns::aggregator::{Aggregator, JsonArray}, route::Route, Exchange, Message};
use std::sync::Arc;
// Three validators with weights 3, 3, 4 (total = 10, ⌈2/3·10⌉ = 7).
// First two votes sum to 6 (< 7); the third pushes total to 10 (≥ 7) and fires.
let threshold: u64 = 7;
let agg = Aggregator::weighted(
    "block",
    |m: &Message| m.header("voting_power").and_then(|s| s.parse().ok()).unwrap_or(0),
    threshold,
)
.with_strategy(Arc::new(JsonArray));
let route = Route::new().add(agg).build();
let rt = tokio::runtime::Runtime::new().unwrap();
for (vp, complete_expected) in [(3u64, false), (3, false), (4, true)] {
    let mut ex = Exchange::new(Message::from_text("vote"));
    ex.in_msg.set_header("block", "h=42");
    ex.in_msg.set_header("voting_power", vp.to_string());
    rt.block_on(async { route.run(&mut ex).await.unwrap(); });
    assert_eq!(ex.out_msg.is_some(), complete_expected);
}

Structs§

Aggregator
EIP Aggregator: groups messages by a correlation header, completes via a CompletionCondition, and emits the folded result via an AggregationStrategy. Storage is pluggable via GroupStore.
ByPredicate
Caller-supplied predicate completion.
BySize
Size-threshold completion: complete once the group contains n messages.
ByTimeout
Time-based completion: complete when the elapsed time since the first message ≥ Duration.
ByWeight
Weighted-quorum completion: sum of weight(msg) over the group must reach threshold.
ConcatText
UTF-8 text concatenation. Emits None if any payload in the group is not Payload::Text — preserves the legacy aggregator behavior.
EmitSignal
Always emits an empty Message when the completion condition fires. Use when the aggregator is being treated as a pure “quorum reached” signal — the caller doesn’t care about the group’s contents (typically it re-reads authoritative state from elsewhere on the signal).
InMemoryGroupStore
Default in-process group store (one Mutex<HashMap>).
JsonArray
JSON-array carry: emit a single Payload::Json message whose body is a JSON array, with each element mapped from the corresponding payload:

Traits§

AggregationStrategy
Folds a completed group into an outbound Message. Returning None means “the group is complete but emit nothing” (e.g. ConcatText on non-text payloads).
CompletionCondition
Decides when an in-progress correlation group is complete.
GroupStore
Pluggable storage for in-progress correlation groups. The default InMemoryGroupStore keeps groups in a Mutex<HashMap>; persistent implementations can plug in via Aggregator::with_store.