use crate::adapter::Adapter;
use crate::channel::ChannelRef;
use crate::spec::{HttpInboundAdapterSpec, HttpInboundAdaptersSpec};
use crate::spec::{HttpOutboundAdapterSpec, HttpOutboundAdaptersSpec};
use crate::{
spec::{ChannelKindSpec, ChannelSpec, ChannelsSpec, FilterSpec, FiltersSpec},
spec::{ServiceActivatorSpec, ServiceActivatorsSpec},
Channel, ClosureProcessor, Error, Filter, Result,
};
use allora_http::{HttpInboundAdapter, InboundHttpExt, Mep};
use allora_http::{HttpOutboundAdapter, OutboundHttpExt};
use hyper::Method;
fn parse_http_method(m: &str) -> Method {
match m {
"GET" => Method::GET,
"POST" => Method::POST,
"PUT" => Method::PUT,
"PATCH" => Method::PATCH,
"DELETE" => Method::DELETE,
"OPTIONS" => Method::OPTIONS,
"HEAD" => Method::HEAD,
other => Method::from_bytes(other.as_bytes()).unwrap_or(Method::POST),
}
}
pub type ServiceProcessor =
ClosureProcessor<Box<dyn Fn(&mut crate::Exchange) -> Result<()> + Send + Sync + 'static>>;
use std::collections::HashSet;
fn build_channel_spec_internal(
spec: &ChannelSpec,
used_ids: Option<&mut HashSet<String>>,
auto_ctr: Option<&mut u64>,
) -> Result<Box<dyn Channel>> {
let final_id: Option<String> = match (spec.channel_id(), used_ids) {
(Some(""), _) => return Err(Error::serialization("channel.id must not be empty")),
(Some(id), Some(used)) => {
if used.contains(id) {
return Err(Error::serialization(format!("duplicate channel.id '{id}'")));
}
used.insert(id.to_string());
Some(id.to_string())
}
(Some(id), None) => Some(id.to_string()), (None, Some(used)) => {
let ctr = auto_ctr.expect("auto counter must be provided when used_ids is set");
let mut gen = format!("channel:auto.{}", *ctr);
while used.contains(&gen) {
*ctr += 1;
gen = format!("channel:auto.{}", *ctr);
}
*ctr += 1;
used.insert(gen.clone());
Some(gen)
}
(None, None) => None, };
let channel: Box<dyn Channel> = match spec.kind() {
ChannelKindSpec::Queue => match final_id {
Some(id) => Box::new(crate::channel::QueueChannel::with_id(id)),
None => Box::new(crate::channel::QueueChannel::with_random_id()),
},
ChannelKindSpec::Direct => match final_id {
Some(id) => Box::new(crate::channel::DirectChannel::with_id(id)),
None => Box::new(crate::channel::DirectChannel::with_random_id()),
},
};
Ok(channel)
}
pub fn build_channel_from_spec(spec: ChannelSpec) -> Result<Box<dyn Channel>> {
build_channel_spec_internal(&spec, None, None)
}
pub fn build_channels_from_spec(spec: ChannelsSpec) -> Result<Vec<Box<dyn Channel>>> {
let mut result: Vec<Box<dyn Channel>> = Vec::with_capacity(spec.channels().len());
let mut used: HashSet<String> = HashSet::new();
let mut auto_ctr: u64 = 1;
for ch in spec.channels() {
let built = build_channel_spec_internal(ch, Some(&mut used), Some(&mut auto_ctr))?;
result.push(built);
}
Ok(result)
}
pub fn build_filter_from_spec(spec: FilterSpec) -> Result<Filter> {
let id_opt = spec.id().map(|s| s.to_string());
Filter::from_apl_with_id(id_opt, spec.when())
}
pub fn build_filters_from_spec(spec: FiltersSpec) -> Result<Vec<Filter>> {
let mut result = Vec::with_capacity(spec.filters().len());
const AUTO_PREFIX: &str = "filter:auto.";
let mut used = HashSet::new();
let mut max_auto_explicit = 0u64;
for f in spec.filters() {
if let Some(id) = f.id() {
if used.contains(id) {
return Err(Error::serialization(format!("duplicate filter.id '{id}'")));
}
if let Some(rest) = id.strip_prefix(AUTO_PREFIX) {
match rest.parse::<u64>() {
Ok(n) => max_auto_explicit = max_auto_explicit.max(n),
Err(_) => {
tracing::warn!(%id, "ignoring malformed reserved auto-id suffix; expected numeric after filter:auto.")
}
}
}
used.insert(id.to_string());
}
}
let mut auto_ctr = max_auto_explicit + 1;
for f in spec.filters() {
if let Some(id) = f.id() {
result.push(Filter::from_apl_with_id(Some(id.to_string()), f.when())?);
continue;
}
let gen_id = format!("{AUTO_PREFIX}{auto_ctr}");
auto_ctr += 1;
result.push(Filter::from_apl_with_id(Some(gen_id), f.when())?);
}
Ok(result)
}
fn validate_service_activator_spec(spec: &ServiceActivatorSpec) -> Result<()> {
if spec.from().is_empty() {
return Err(Error::serialization("service.from must not be empty"));
}
if spec.to().is_empty() {
return Err(Error::serialization("service.to must not be empty"));
}
if spec.ref_name().is_empty() {
return Err(Error::serialization("service.ref-name must not be empty"));
}
Ok(())
}
fn service_processor_with_headers(id_opt: Option<&str>, ref_name: &str) -> ServiceProcessor {
let ref_name_copy = ref_name.to_string();
let id_copy = id_opt.map(|s| s.to_string());
let proc_fn: Box<dyn Fn(&mut crate::Exchange) -> Result<()> + Send + Sync + 'static> =
Box::new(move |exchange: &mut crate::Exchange| {
if let Some(ref id) = id_copy {
exchange.in_msg.set_header("service-activator.id", id);
}
exchange
.in_msg
.set_header("service-activator.ref-name", ref_name_copy.as_str());
Ok(())
});
ClosureProcessor::new(proc_fn)
}
pub fn build_service_from_spec(spec: ServiceActivatorSpec) -> Result<ServiceProcessor> {
validate_service_activator_spec(&spec)?;
Ok(service_processor_with_headers(spec.id(), spec.ref_name()))
}
pub fn build_service_activators_from_spec(
spec: ServiceActivatorsSpec,
) -> Result<Vec<ServiceProcessor>> {
let mut result = Vec::with_capacity(spec.services_activators().len());
const AUTO_PREFIX: &str = "service:auto.";
let mut used = HashSet::new();
let mut max_auto_explicit = 0u64;
for s in spec.services_activators() {
validate_service_activator_spec(s)?;
if let Some(id) = s.id() {
if used.contains(id) {
return Err(Error::serialization(format!("duplicate service.id '{id}'")));
}
if let Some(rest) = id.strip_prefix(AUTO_PREFIX) {
if let Ok(n) = rest.parse::<u64>() {
max_auto_explicit = max_auto_explicit.max(n);
}
}
used.insert(id.to_string());
}
}
let mut auto_ctr = max_auto_explicit + 1;
for s in spec.services_activators() {
let id_final = match s.id() {
Some(id) => id.to_string(),
None => {
let gen = format!("{AUTO_PREFIX}{auto_ctr}");
auto_ctr += 1;
gen
}
};
let proc = service_processor_with_headers(Some(&id_final), s.ref_name());
result.push(proc);
}
Ok(result)
}
fn http_inbound_builder_base(
host: &str,
port: u16,
path: &str,
channel: ChannelRef,
) -> allora_http::HttpInboundBuilder {
Adapter::inbound()
.http()
.host(host)
.port(port)
.base_path(path)
.channel(channel)
}
pub fn build_http_inbound_adapter_from_spec(
spec: HttpInboundAdapterSpec,
channel_lookup: &dyn Fn(&str) -> Option<ChannelRef>,
) -> Result<HttpInboundAdapter> {
let req_id = spec.request_channel();
if req_id.is_empty() {
return Err(Error::serialization(
"http-inbound-adapter.request-channel must not be empty",
));
}
let ch = channel_lookup(req_id).ok_or_else(|| {
Error::serialization(format!(
"unknown channel id '{}' for http-inbound-adapter.request-channel",
req_id
))
})?;
let mut builder = http_inbound_builder_base(spec.host(), spec.port(), spec.path(), ch.clone());
if let Some(id) = spec.id() {
builder = builder.id(id);
}
if let Some(reply_id) = spec.reply_channel() {
let rc = channel_lookup(reply_id).ok_or_else(|| {
Error::serialization(format!(
"unknown channel id '{}' for http-inbound-adapter.reply-channel",
reply_id
))
})?;
builder = builder.reply_channel(rc);
}
if spec.reply_channel().is_none() {
builder = builder.mep(Mep::InOnly202);
}
Ok(builder.build())
}
pub fn build_http_inbound_adapters_from_spec(
spec: HttpInboundAdaptersSpec,
channel_lookup: &dyn Fn(&str) -> Option<ChannelRef>,
) -> Result<Vec<HttpInboundAdapter>> {
let mut result = Vec::with_capacity(spec.adapters().len());
const AUTO_PREFIX: &str = "http-inbound-adapter:auto.";
let mut used = HashSet::new();
let mut max_auto_explicit = 0u64;
for a in spec.adapters() {
let req_id = a.request_channel();
if req_id.is_empty() {
return Err(Error::serialization(
"http-inbound-adapter.request-channel must not be empty",
));
}
if let Some(id) = a.id() {
if id.is_empty() {
return Err(Error::serialization(
"http-inbound-adapter.id must not be empty",
));
}
if used.contains(id) {
return Err(Error::serialization(format!(
"duplicate http-inbound-adapter.id '{}'",
id
)));
}
if let Some(rest) = id.strip_prefix(AUTO_PREFIX) {
if let Ok(n) = rest.parse::<u64>() {
max_auto_explicit = max_auto_explicit.max(n);
}
}
used.insert(id.to_string());
}
}
let mut auto_ctr = max_auto_explicit + 1;
for a in spec.adapters() {
let id_final = match a.id() {
Some(id) => id.to_string(),
None => {
let gen = format!("{AUTO_PREFIX}{auto_ctr}");
auto_ctr += 1;
let mut candidate = gen;
while used.contains(&candidate) {
candidate = format!("{AUTO_PREFIX}{auto_ctr}");
auto_ctr += 1;
}
used.insert(candidate.clone());
candidate
}
};
let ch = channel_lookup(a.request_channel()).ok_or_else(|| {
Error::serialization(format!(
"unknown channel id '{}' for http-inbound-adapter.request-channel",
a.request_channel()
))
})?;
let mut builder = http_inbound_builder_base(a.host(), a.port(), a.path(), ch.clone())
.id(id_final.clone());
if let Some(reply_id) = a.reply_channel() {
let rc = channel_lookup(reply_id).ok_or_else(|| {
Error::serialization(format!(
"unknown channel id '{}' for http-inbound-adapter.reply-channel",
reply_id
))
})?;
builder = builder.reply_channel(rc);
} else {
builder = builder.mep(Mep::InOnly202);
}
result.push(builder.build());
}
Ok(result)
}
pub fn build_http_outbound_adapter_from_spec(
spec: HttpOutboundAdapterSpec,
) -> Result<HttpOutboundAdapter> {
if let Some(id) = spec.id() {
if id.is_empty() {
return Err(Error::serialization(
"http-outbound-adapter.id must not be empty",
));
}
}
let mut builder = Adapter::outbound()
.http()
.host(spec.host())
.port(spec.port())
.base_path(spec.base_path());
if let Some(p) = spec.path() {
builder = builder.path(p);
}
if let Some(m) = spec.method() {
builder = builder.method(parse_http_method(m));
}
if !spec.use_out_msg() {
builder = builder.use_out_msg(false);
}
if let Some(id) = spec.id() {
builder = builder.id(id);
}
builder.build()
}
pub fn build_http_outbound_adapters_from_spec(
spec: HttpOutboundAdaptersSpec,
) -> Result<Vec<HttpOutboundAdapter>> {
let mut result = Vec::with_capacity(spec.adapters().len());
const AUTO_PREFIX: &str = "http-outbound-adapter:auto.";
let mut used = HashSet::new();
let mut max_auto_explicit = 0u64;
for a in spec.adapters() {
if let Some(id) = a.id() {
if id.is_empty() {
return Err(Error::serialization(
"http-outbound-adapter.id must not be empty",
));
}
if used.contains(id) {
return Err(Error::serialization(format!(
"duplicate http-outbound-adapter.id '{}'",
id
)));
}
if let Some(rest) = id.strip_prefix(AUTO_PREFIX) {
if let Ok(n) = rest.parse::<u64>() {
max_auto_explicit = max_auto_explicit.max(n);
}
}
used.insert(id.to_string());
}
}
let mut auto_ctr = max_auto_explicit + 1;
for a in spec.adapters() {
let id_final = match a.id() {
Some(id) => id.to_string(),
None => {
let mut candidate = format!("{AUTO_PREFIX}{auto_ctr}");
auto_ctr += 1;
while used.contains(&candidate) {
candidate = format!("{AUTO_PREFIX}{auto_ctr}");
auto_ctr += 1;
}
used.insert(candidate.clone());
candidate
}
};
let mut builder = Adapter::outbound()
.http()
.host(a.host())
.port(a.port())
.base_path(a.base_path())
.id(id_final);
if let Some(p) = a.path() {
builder = builder.path(p);
}
if let Some(m) = a.method() {
builder = builder.method(parse_http_method(m));
}
if !a.use_out_msg() {
builder = builder.use_out_msg(false);
}
let built = builder.build()?;
result.push(built);
}
Ok(result)
}