use crate::adapter::Adapter;
use crate::channel::ChannelRef;
use crate::dsl::PatternRegistry;
use crate::spec::{AggregatorSpec, AggregatorsSpec};
use crate::spec::{HttpInboundAdapterSpec, HttpInboundAdaptersSpec};
use crate::spec::{HttpOutboundAdapterSpec, HttpOutboundAdaptersSpec};
use crate::{
spec::{ChannelKindSpec, ChannelSpec, ChannelsSpec, FilterSpec, FiltersSpec},
spec::{ServiceActivatorSpec, ServiceActivatorsSpec},
Channel, ClosureProcessor, Error, Filter, Result,
};
use allora_core::patterns::aggregator::Aggregator;
use allora_http::{HttpInboundAdapter, InboundHttpExt, Mep};
use allora_http::{HttpOutboundAdapter, OutboundHttpExt};
use hyper::Method;
fn parse_http_method(m: &str) -> Method {
match m {
"GET" => Method::GET,
"POST" => Method::POST,
"PUT" => Method::PUT,
"PATCH" => Method::PATCH,
"DELETE" => Method::DELETE,
"OPTIONS" => Method::OPTIONS,
"HEAD" => Method::HEAD,
other => Method::from_bytes(other.as_bytes()).unwrap_or(Method::POST),
}
}
pub type ServiceProcessor =
ClosureProcessor<Box<dyn Fn(&mut crate::Exchange) -> Result<()> + Send + Sync + 'static>>;
use std::collections::HashSet;
fn build_channel_spec_internal(
spec: &ChannelSpec,
used_ids: Option<&mut HashSet<String>>,
auto_ctr: Option<&mut u64>,
) -> Result<Box<dyn Channel>> {
let final_id: Option<String> = match (spec.channel_id(), used_ids) {
(Some(""), _) => return Err(Error::serialization("channel.id must not be empty")),
(Some(id), Some(used)) => {
if used.contains(id) {
return Err(Error::serialization(format!("duplicate channel.id '{id}'")));
}
used.insert(id.to_string());
Some(id.to_string())
}
(Some(id), None) => Some(id.to_string()), (None, Some(used)) => {
let ctr = auto_ctr.expect("auto counter must be provided when used_ids is set");
let mut gen = format!("channel:auto.{}", *ctr);
while used.contains(&gen) {
*ctr += 1;
gen = format!("channel:auto.{}", *ctr);
}
*ctr += 1;
used.insert(gen.clone());
Some(gen)
}
(None, None) => None, };
let channel: Box<dyn Channel> = match spec.kind() {
ChannelKindSpec::Queue => match final_id {
Some(id) => Box::new(crate::channel::QueueChannel::with_id(id)),
None => Box::new(crate::channel::QueueChannel::with_random_id()),
},
ChannelKindSpec::Direct => match final_id {
Some(id) => Box::new(crate::channel::DirectChannel::with_id(id)),
None => Box::new(crate::channel::DirectChannel::with_random_id()),
},
};
Ok(channel)
}
pub fn build_channel_from_spec(spec: ChannelSpec) -> Result<Box<dyn Channel>> {
build_channel_spec_internal(&spec, None, None)
}
pub fn build_channels_from_spec(spec: ChannelsSpec) -> Result<Vec<Box<dyn Channel>>> {
let mut result: Vec<Box<dyn Channel>> = Vec::with_capacity(spec.channels().len());
let mut used: HashSet<String> = HashSet::new();
let mut auto_ctr: u64 = 1;
for ch in spec.channels() {
let built = build_channel_spec_internal(ch, Some(&mut used), Some(&mut auto_ctr))?;
result.push(built);
}
Ok(result)
}
pub fn build_filter_from_spec(spec: FilterSpec) -> Result<Filter> {
let id_opt = spec.id().map(|s| s.to_string());
Filter::from_apl_with_id(id_opt, spec.when())
}
pub fn build_filters_from_spec(spec: FiltersSpec) -> Result<Vec<Filter>> {
let mut result = Vec::with_capacity(spec.filters().len());
const AUTO_PREFIX: &str = "filter:auto.";
let mut used = HashSet::new();
let mut max_auto_explicit = 0u64;
for f in spec.filters() {
if let Some(id) = f.id() {
if used.contains(id) {
return Err(Error::serialization(format!("duplicate filter.id '{id}'")));
}
if let Some(rest) = id.strip_prefix(AUTO_PREFIX) {
match rest.parse::<u64>() {
Ok(n) => max_auto_explicit = max_auto_explicit.max(n),
Err(_) => {
tracing::warn!(%id, "ignoring malformed reserved auto-id suffix; expected numeric after filter:auto.")
}
}
}
used.insert(id.to_string());
}
}
let mut auto_ctr = max_auto_explicit + 1;
for f in spec.filters() {
if let Some(id) = f.id() {
result.push(Filter::from_apl_with_id(Some(id.to_string()), f.when())?);
continue;
}
let gen_id = format!("{AUTO_PREFIX}{auto_ctr}");
auto_ctr += 1;
result.push(Filter::from_apl_with_id(Some(gen_id), f.when())?);
}
Ok(result)
}
fn validate_service_activator_spec(spec: &ServiceActivatorSpec) -> Result<()> {
if spec.from().is_empty() {
return Err(Error::serialization("service.from must not be empty"));
}
if spec.to().is_empty() {
return Err(Error::serialization("service.to must not be empty"));
}
if spec.ref_name().is_empty() {
return Err(Error::serialization("service.ref-name must not be empty"));
}
Ok(())
}
fn service_processor_with_headers(id_opt: Option<&str>, ref_name: &str) -> ServiceProcessor {
let ref_name_copy = ref_name.to_string();
let id_copy = id_opt.map(|s| s.to_string());
let proc_fn: Box<dyn Fn(&mut crate::Exchange) -> Result<()> + Send + Sync + 'static> =
Box::new(move |exchange: &mut crate::Exchange| {
if let Some(ref id) = id_copy {
exchange.in_msg.set_header("service-activator.id", id);
}
exchange
.in_msg
.set_header("service-activator.ref-name", ref_name_copy.as_str());
Ok(())
});
ClosureProcessor::new(proc_fn)
}
pub fn build_service_from_spec(spec: ServiceActivatorSpec) -> Result<ServiceProcessor> {
validate_service_activator_spec(&spec)?;
Ok(service_processor_with_headers(spec.id(), spec.ref_name()))
}
pub fn build_service_activators_from_spec(
spec: ServiceActivatorsSpec,
) -> Result<Vec<ServiceProcessor>> {
let mut result = Vec::with_capacity(spec.services_activators().len());
const AUTO_PREFIX: &str = "service:auto.";
let mut used = HashSet::new();
let mut max_auto_explicit = 0u64;
for s in spec.services_activators() {
validate_service_activator_spec(s)?;
if let Some(id) = s.id() {
if used.contains(id) {
return Err(Error::serialization(format!("duplicate service.id '{id}'")));
}
if let Some(rest) = id.strip_prefix(AUTO_PREFIX) {
if let Ok(n) = rest.parse::<u64>() {
max_auto_explicit = max_auto_explicit.max(n);
}
}
used.insert(id.to_string());
}
}
let mut auto_ctr = max_auto_explicit + 1;
for s in spec.services_activators() {
let id_final = match s.id() {
Some(id) => id.to_string(),
None => {
let gen = format!("{AUTO_PREFIX}{auto_ctr}");
auto_ctr += 1;
gen
}
};
let proc = service_processor_with_headers(Some(&id_final), s.ref_name());
result.push(proc);
}
Ok(result)
}
fn http_inbound_builder_base(
host: &str,
port: u16,
path: &str,
channel: ChannelRef,
) -> allora_http::HttpInboundBuilder {
Adapter::inbound()
.http()
.host(host)
.port(port)
.base_path(path)
.channel(channel)
}
pub fn build_http_inbound_adapter_from_spec(
spec: HttpInboundAdapterSpec,
channel_lookup: &dyn Fn(&str) -> Option<ChannelRef>,
) -> Result<HttpInboundAdapter> {
let req_id = spec.request_channel();
if req_id.is_empty() {
return Err(Error::serialization(
"http-inbound-adapter.request-channel must not be empty",
));
}
let ch = channel_lookup(req_id).ok_or_else(|| {
Error::serialization(format!(
"unknown channel id '{}' for http-inbound-adapter.request-channel",
req_id
))
})?;
let mut builder = http_inbound_builder_base(spec.host(), spec.port(), spec.path(), ch.clone());
if let Some(id) = spec.id() {
builder = builder.id(id);
}
if let Some(reply_id) = spec.reply_channel() {
let rc = channel_lookup(reply_id).ok_or_else(|| {
Error::serialization(format!(
"unknown channel id '{}' for http-inbound-adapter.reply-channel",
reply_id
))
})?;
builder = builder.reply_channel(rc);
}
if spec.reply_channel().is_none() {
builder = builder.mep(Mep::InOnly202);
}
Ok(builder.build())
}
pub fn build_http_inbound_adapters_from_spec(
spec: HttpInboundAdaptersSpec,
channel_lookup: &dyn Fn(&str) -> Option<ChannelRef>,
) -> Result<Vec<HttpInboundAdapter>> {
let mut result = Vec::with_capacity(spec.adapters().len());
const AUTO_PREFIX: &str = "http-inbound-adapter:auto.";
let mut used = HashSet::new();
let mut max_auto_explicit = 0u64;
for a in spec.adapters() {
let req_id = a.request_channel();
if req_id.is_empty() {
return Err(Error::serialization(
"http-inbound-adapter.request-channel must not be empty",
));
}
if let Some(id) = a.id() {
if id.is_empty() {
return Err(Error::serialization(
"http-inbound-adapter.id must not be empty",
));
}
if used.contains(id) {
return Err(Error::serialization(format!(
"duplicate http-inbound-adapter.id '{}'",
id
)));
}
if let Some(rest) = id.strip_prefix(AUTO_PREFIX) {
if let Ok(n) = rest.parse::<u64>() {
max_auto_explicit = max_auto_explicit.max(n);
}
}
used.insert(id.to_string());
}
}
let mut auto_ctr = max_auto_explicit + 1;
for a in spec.adapters() {
let id_final = match a.id() {
Some(id) => id.to_string(),
None => {
let gen = format!("{AUTO_PREFIX}{auto_ctr}");
auto_ctr += 1;
let mut candidate = gen;
while used.contains(&candidate) {
candidate = format!("{AUTO_PREFIX}{auto_ctr}");
auto_ctr += 1;
}
used.insert(candidate.clone());
candidate
}
};
let ch = channel_lookup(a.request_channel()).ok_or_else(|| {
Error::serialization(format!(
"unknown channel id '{}' for http-inbound-adapter.request-channel",
a.request_channel()
))
})?;
let mut builder = http_inbound_builder_base(a.host(), a.port(), a.path(), ch.clone())
.id(id_final.clone());
if let Some(reply_id) = a.reply_channel() {
let rc = channel_lookup(reply_id).ok_or_else(|| {
Error::serialization(format!(
"unknown channel id '{}' for http-inbound-adapter.reply-channel",
reply_id
))
})?;
builder = builder.reply_channel(rc);
} else {
builder = builder.mep(Mep::InOnly202);
}
result.push(builder.build());
}
Ok(result)
}
pub fn build_http_outbound_adapter_from_spec(
spec: HttpOutboundAdapterSpec,
) -> Result<HttpOutboundAdapter> {
if let Some(id) = spec.id() {
if id.is_empty() {
return Err(Error::serialization(
"http-outbound-adapter.id must not be empty",
));
}
}
let mut builder = Adapter::outbound()
.http()
.host(spec.host())
.port(spec.port())
.base_path(spec.base_path());
if let Some(p) = spec.path() {
builder = builder.path(p);
}
if let Some(m) = spec.method() {
builder = builder.method(parse_http_method(m));
}
if !spec.use_out_msg() {
builder = builder.use_out_msg(false);
}
if let Some(id) = spec.id() {
builder = builder.id(id);
}
builder.build()
}
pub fn build_http_outbound_adapters_from_spec(
spec: HttpOutboundAdaptersSpec,
) -> Result<Vec<HttpOutboundAdapter>> {
let mut result = Vec::with_capacity(spec.adapters().len());
const AUTO_PREFIX: &str = "http-outbound-adapter:auto.";
let mut used = HashSet::new();
let mut max_auto_explicit = 0u64;
for a in spec.adapters() {
if let Some(id) = a.id() {
if id.is_empty() {
return Err(Error::serialization(
"http-outbound-adapter.id must not be empty",
));
}
if used.contains(id) {
return Err(Error::serialization(format!(
"duplicate http-outbound-adapter.id '{}'",
id
)));
}
if let Some(rest) = id.strip_prefix(AUTO_PREFIX) {
if let Ok(n) = rest.parse::<u64>() {
max_auto_explicit = max_auto_explicit.max(n);
}
}
used.insert(id.to_string());
}
}
let mut auto_ctr = max_auto_explicit + 1;
for a in spec.adapters() {
let id_final = match a.id() {
Some(id) => id.to_string(),
None => {
let mut candidate = format!("{AUTO_PREFIX}{auto_ctr}");
auto_ctr += 1;
while used.contains(&candidate) {
candidate = format!("{AUTO_PREFIX}{auto_ctr}");
auto_ctr += 1;
}
used.insert(candidate.clone());
candidate
}
};
let mut builder = Adapter::outbound()
.http()
.host(a.host())
.port(a.port())
.base_path(a.base_path())
.id(id_final);
if let Some(p) = a.path() {
builder = builder.path(p);
}
if let Some(m) = a.method() {
builder = builder.method(parse_http_method(m));
}
if !a.use_out_msg() {
builder = builder.use_out_msg(false);
}
let built = builder.build()?;
result.push(built);
}
Ok(result)
}
pub fn build_aggregator_from_spec(
spec: AggregatorSpec,
registry: &PatternRegistry,
) -> Result<Aggregator> {
if let Some(id) = spec.id() {
if id.is_empty() {
return Err(Error::serialization("aggregator.id must not be empty"));
}
}
if spec.correlation_header().is_empty() {
return Err(Error::serialization(
"aggregator.correlation_header must not be empty",
));
}
let completion = registry.completion(spec.completion()).ok_or_else(|| {
Error::serialization(format!(
"unknown completion '{}' for aggregator (registered completions: [{}])",
spec.completion(),
registry.completion_names().join(", ")
))
})?;
let mut agg = Aggregator::with_completion(spec.correlation_header(), completion);
if let Some(strategy_name) = spec.strategy() {
let strategy = registry.strategy(strategy_name).ok_or_else(|| {
Error::serialization(format!(
"unknown strategy '{}' for aggregator (registered strategies: [{}])",
strategy_name,
registry.strategy_names().join(", ")
))
})?;
agg = agg.with_strategy(strategy);
}
if let Some(store_name) = spec.store() {
let store = registry.store(store_name).ok_or_else(|| {
Error::serialization(format!(
"unknown store '{}' for aggregator (registered stores: [{}])",
store_name,
registry.store_names().join(", ")
))
})?;
agg = agg.with_store(store);
}
Ok(agg)
}
pub fn build_aggregators_from_spec(
spec: AggregatorsSpec,
registry: &PatternRegistry,
) -> Result<Vec<(String, Aggregator)>> {
const AUTO_PREFIX: &str = "aggregator:auto.";
let mut used: HashSet<String> = HashSet::new();
let mut max_auto_explicit = 0u64;
for a in spec.aggregators() {
if let Some(id) = a.id() {
if id.is_empty() {
return Err(Error::serialization("aggregator.id must not be empty"));
}
if used.contains(id) {
return Err(Error::serialization(format!(
"duplicate aggregator.id '{id}'"
)));
}
if let Some(rest) = id.strip_prefix(AUTO_PREFIX) {
if let Ok(n) = rest.parse::<u64>() {
max_auto_explicit = max_auto_explicit.max(n);
}
}
used.insert(id.to_string());
}
}
let mut auto_ctr = max_auto_explicit + 1;
let mut result: Vec<(String, Aggregator)> = Vec::with_capacity(spec.aggregators().len());
for a in spec.aggregators() {
let id_final = match a.id() {
Some(id) => id.to_string(),
None => {
let mut candidate = format!("{AUTO_PREFIX}{auto_ctr}");
auto_ctr += 1;
while used.contains(&candidate) {
candidate = format!("{AUTO_PREFIX}{auto_ctr}");
auto_ctr += 1;
}
used.insert(candidate.clone());
candidate
}
};
let built = build_aggregator_from_spec(a.clone(), registry)
.map_err(|e| Error::serialization(format!("{e} (in aggregator '{id_final}')")))?;
result.push((id_final, built));
}
Ok(result)
}
#[cfg(test)]
mod aggregator_tests {
use super::*;
use allora_core::patterns::aggregator::CompletionCondition;
use allora_core::Message;
use std::sync::Arc;
use std::time::Instant;
struct CountAtLeast(usize);
impl CompletionCondition for CountAtLeast {
fn is_complete(&self, group: &[Message], _: Instant) -> bool {
group.len() >= self.0
}
}
fn registry_with_test_completion() -> PatternRegistry {
let mut r = PatternRegistry::with_defaults();
r.register_completion("test.count_at_least_3", Arc::new(CountAtLeast(3)));
r
}
#[test]
fn build_single_with_completion_only() {
let spec = AggregatorSpec::new("corr", "test.count_at_least_3");
let _agg = build_aggregator_from_spec(spec, ®istry_with_test_completion())
.expect("build succeeds");
}
#[test]
fn build_single_with_strategy_and_store_refs_resolved() {
let mut registry = registry_with_test_completion();
registry.register_store(
"test.in_mem",
Arc::new(allora_core::patterns::aggregator::InMemoryGroupStore::new()),
);
let spec = AggregatorSpec::new("corr", "test.count_at_least_3")
.set_strategy(crate::dsl::STRATEGY_EMIT_SIGNAL)
.set_store("test.in_mem");
let _agg = build_aggregator_from_spec(spec, ®istry).expect("build succeeds");
}
#[test]
fn build_fails_on_unknown_completion_with_helpful_message() {
let spec = AggregatorSpec::new("corr", "nope.does.not.exist");
let err = build_aggregator_from_spec(spec, &PatternRegistry::with_defaults())
.expect_err("must error");
let msg = err.to_string();
assert!(
msg.contains("unknown completion 'nope.does.not.exist'"),
"error message should name the missing key, got: {msg}"
);
}
#[test]
fn build_fails_on_unknown_strategy() {
let spec = AggregatorSpec::new("corr", "test.count_at_least_3").set_strategy("nope");
let err = build_aggregator_from_spec(spec, ®istry_with_test_completion())
.expect_err("must error");
assert!(err.to_string().contains("unknown strategy 'nope'"));
}
#[test]
fn build_fails_on_unknown_store() {
let spec = AggregatorSpec::new("corr", "test.count_at_least_3").set_store("nope");
let err = build_aggregator_from_spec(spec, ®istry_with_test_completion())
.expect_err("must error");
assert!(err.to_string().contains("unknown store 'nope'"));
}
#[test]
fn collection_preserves_order_with_auto_ids_and_uniqueness() {
let r = registry_with_test_completion();
let spec = AggregatorsSpec::new(1)
.add(AggregatorSpec::with_id(
"explicit.a",
"corr",
"test.count_at_least_3",
))
.add(AggregatorSpec::new("corr", "test.count_at_least_3"))
.add(AggregatorSpec::new("corr2", "test.count_at_least_3"));
let built = build_aggregators_from_spec(spec, &r).expect("build succeeds");
assert_eq!(built.len(), 3);
let ids: Vec<&str> = built.iter().map(|(id, _)| id.as_str()).collect();
assert_eq!(
ids,
vec!["explicit.a", "aggregator:auto.1", "aggregator:auto.2"]
);
}
#[test]
fn collection_rejects_duplicate_explicit_ids() {
let r = registry_with_test_completion();
let spec = AggregatorsSpec::new(1)
.add(AggregatorSpec::with_id(
"dup",
"corr",
"test.count_at_least_3",
))
.add(AggregatorSpec::with_id(
"dup",
"corr2",
"test.count_at_least_3",
));
let err = build_aggregators_from_spec(spec, &r).expect_err("must error");
assert!(err.to_string().contains("duplicate aggregator.id 'dup'"));
}
#[test]
fn auto_ids_skip_past_explicit_reserved_pattern() {
let r = registry_with_test_completion();
let spec = AggregatorsSpec::new(1)
.add(AggregatorSpec::with_id(
"aggregator:auto.5",
"corr",
"test.count_at_least_3",
))
.add(AggregatorSpec::new("corr", "test.count_at_least_3"));
let built = build_aggregators_from_spec(spec, &r).expect("build succeeds");
let ids: Vec<&str> = built.iter().map(|(id, _)| id.as_str()).collect();
assert_eq!(ids, vec!["aggregator:auto.5", "aggregator:auto.6"]);
}
#[test]
fn per_entry_build_errors_name_the_offending_aggregator() {
let r = registry_with_test_completion();
let spec = AggregatorsSpec::new(1)
.add(AggregatorSpec::with_id(
"first.ok",
"corr",
"test.count_at_least_3",
))
.add(AggregatorSpec::with_id(
"second.broken",
"corr2",
"nope.does.not.exist",
));
let err = build_aggregators_from_spec(spec, &r).expect_err("must error");
let msg = err.to_string();
assert!(
msg.contains("unknown completion 'nope.does.not.exist'"),
"underlying error must still be present, got: {msg}"
);
assert!(
msg.contains("in aggregator 'second.broken'"),
"error must name the failing aggregator id, got: {msg}"
);
}
#[test]
fn end_to_end_allora_yaml_with_aggregators_section() {
use crate::spec::AlloraSpecYamlParser;
let raw = r#"
version: 1
channels:
- kind: direct
id: attestations
aggregators:
- id: finality_quorum
correlation_header: block_hash
completion: test.count_at_least_3
strategy: allora.emit_signal
- correlation_header: oracle_submission_id
completion: test.count_at_least_3
"#;
let parsed = AlloraSpecYamlParser::parse_str(raw).expect("yaml parse ok");
assert_eq!(parsed.version(), 1);
assert_eq!(parsed.channels_spec().channels().len(), 1);
let aggs_spec = parsed
.aggregators_spec()
.expect("aggregators section present");
assert_eq!(aggs_spec.aggregators().len(), 2);
assert_eq!(aggs_spec.aggregators()[0].id(), Some("finality_quorum"));
assert_eq!(aggs_spec.aggregators()[1].id(), None);
let mut registry = PatternRegistry::with_defaults();
registry.register_completion("test.count_at_least_3", Arc::new(CountAtLeast(3)));
let built = build_aggregators_from_spec(aggs_spec.clone(), ®istry)
.expect("aggregators build ok");
assert_eq!(built.len(), 2);
let ids: Vec<&str> = built.iter().map(|(id, _)| id.as_str()).collect();
assert_eq!(ids, vec!["finality_quorum", "aggregator:auto.1"]);
let by_id: std::collections::HashMap<String, _> = built.into_iter().collect();
assert!(by_id.contains_key("finality_quorum"));
assert!(by_id.contains_key("aggregator:auto.1"));
}
#[test]
fn end_to_end_yaml_unknown_strategy_surfaces_clear_error() {
use crate::spec::AlloraSpecYamlParser;
let raw = r#"
version: 1
aggregators:
- id: bad
correlation_header: x
completion: test.count_at_least_3
strategy: nope.unknown
"#;
let parsed = AlloraSpecYamlParser::parse_str(raw).expect("yaml parse ok");
let mut registry = PatternRegistry::with_defaults();
registry.register_completion("test.count_at_least_3", Arc::new(CountAtLeast(3)));
let err = build_aggregators_from_spec(
parsed.into_aggregators_spec().expect("aggregators present"),
®istry,
)
.expect_err("must error on unknown strategy");
let msg = err.to_string();
assert!(
msg.contains("unknown strategy 'nope.unknown'"),
"error must name the unknown ref, got: {msg}"
);
}
}