use crate::{
all_service_descriptors,
channel::Channel,
dsl::build,
dsl::runtime::AlloraRuntime,
error::{Error, Result},
service,
};
use std::path::{Path, PathBuf};
use std::sync::Arc;
use tracing::{debug, info, trace};
#[derive(Debug, Clone)]
pub struct Runtime {
config_path: Option<PathBuf>,
}
impl Default for Runtime {
fn default() -> Self {
Self { config_path: None }
}
}
impl Runtime {
pub fn new() -> Self {
Self::default()
}
pub fn with_config_file<P: AsRef<Path>>(mut self, path: P) -> Self {
self.config_path = Some(path.as_ref().to_path_buf());
self
}
pub fn run(self) -> Result<AlloraRuntime> {
let explicit_opt = self.config_path.clone();
let path = match &explicit_opt {
Some(p) => p.clone(),
None => resolve_default_config(),
};
if let Some(parent) = path.parent() {
crate::logging::init_from_dir(parent);
} else {
crate::logging::init_from_dir(Path::new("."));
}
let exists = path.exists();
let canonical_opt = if exists {
path.canonicalize().ok()
} else {
None
};
if explicit_opt.is_none() {
info!(
config.path=%path.display(),
config.canonical=?canonical_opt.as_ref().map(|p| p.display().to_string()),
canonical=canonical_opt.is_some(),
auto=true,
"Configuration auto-discovered"
);
} else {
info!(
config.path=%path.display(),
config.canonical=?canonical_opt.as_ref().map(|p| p.display().to_string()),
canonical=canonical_opt.is_some(),
auto=false,
"Configuration resolved"
);
}
if !exists {
return Err(Error::runtime(format!(
"config file '{}' not found",
path.display()
)));
}
let rt = build(&path)?;
wire_services(&rt)?;
debug!(
channels = rt.channel_count(),
filters = rt.filter_count(),
"Runtime constructed"
);
Ok(rt)
}
}
pub fn wire_services(rt: &AlloraRuntime) -> Result<()> {
let descriptors = all_service_descriptors();
debug!(
service_activator.processors = rt.service_processor_count(),
descriptors = descriptors.len(),
"service wiring start"
);
for d in &descriptors {
trace!(descriptor.impl = d.name, "service descriptor loaded");
}
let mut service_activator_wirings: Vec<(
Arc<dyn Channel>,
Arc<dyn Channel>,
Arc<dyn service::Service>,
String,
)> = Vec::new();
for sp in rt.service_activator_processors().iter() {
let name_key = sp.ref_name();
trace!(
service_activator.ref_name = name_key,
service.id = sp.id(),
from = sp.from(),
to = sp.to(),
"evaluating service processor"
);
for desc in descriptors.iter() {
if desc.name == name_key {
trace!(service_activator.ref_name = name_key, "descriptor matched");
if rt.channel_by_id(sp.from()).is_some() && rt.channel_by_id(sp.to()).is_some() {
let inbound_arc_opt = rt.channels_slice().iter().find(|c| c.id() == sp.from());
let outbound_arc_opt = rt.channels_slice().iter().find(|c| c.id() == sp.to());
if let (Some(in_arc), Some(out_arc)) = (inbound_arc_opt, outbound_arc_opt) {
debug!(
service_activator.ref_name = name_key,
inbound = sp.from(),
outbound = sp.to(),
"channels resolved – scheduling wiring"
);
let proc_arc = (desc.constructor)();
service_activator_wirings.push((
in_arc.clone(),
out_arc.clone(),
proc_arc,
name_key.to_string(),
));
} else {
debug!(
service_activator.ref_name = name_key,
inbound_found = inbound_arc_opt.is_some(),
outbound_found = outbound_arc_opt.is_some(),
"channel resolution failed – wiring skipped"
);
}
} else {
debug!(
service_activator.ref_name = name_key,
"channel ids not found – skipped"
);
}
}
}
}
if service_activator_wirings.is_empty() {
info!("no services wired (none matched or channels missing)");
} else {
info!(
wired.count = service_activator_wirings.len(),
"service wiring collected"
);
}
for (in_arc, out_arc, proc_arc, name_key) in service_activator_wirings.into_iter() {
if let Some(inbound_direct) = in_arc.as_any().downcast_ref::<crate::DirectChannel>() {
let outbound_arc_dyn = out_arc.clone();
let inbound_id = inbound_direct.id().to_string();
let name_key_closure = name_key.clone();
let proc_shared = proc_arc.clone();
let sub_count = inbound_direct.subscribe(move |exchange| {
let outbound_clone = outbound_arc_dyn.clone();
let proc_task = proc_shared.clone();
let name_key_val = name_key_closure.clone();
tokio::spawn(async move {
let mut ex_mut = exchange;
if let Err(err) = proc_task.process(&mut ex_mut).await {
tracing::error!(target="allora::service", service.impl=%name_key_val, error=%err, "Service async processing failed");
return;
}
if let Err(err) = outbound_clone.send(ex_mut).await {
tracing::error!(target="allora::service", service.impl=%name_key_val, error=%err, "Outbound channel send failed");
}
});
Ok(())
});
debug!(
service_activator.ref_name = name_key,
inbound = inbound_id,
subscribers = sub_count,
"service wired"
);
} else {
debug!(
service_activator.ref_name = name_key,
inbound_id = in_arc.id(),
"inbound channel not direct – skipping wiring"
);
}
}
for ch in rt.channels() {
debug!(channel.id = ch.id(), kind = ch.kind(), "channel registered");
}
debug!(
services.wired = rt.service_processor_count(),
"runtime wiring complete"
);
Ok(())
}
fn resolve_default_config() -> PathBuf {
use std::env;
let mut args = env::args().skip(1); let mut runtime_override: Option<String> = None;
while let Some(arg) = args.next() {
if arg == "--runtime" {
if let Some(val) = args.next() {
runtime_override = Some(val);
}
break;
} else if let Some(rest) = arg.strip_prefix("--runtime=") {
runtime_override = Some(rest.to_string());
break;
}
}
if let Some(raw) = runtime_override {
let p = PathBuf::from(raw);
if p.is_dir() {
return p.join("allora.yml");
} else {
return p;
}
}
if let Ok(raw) = env::var("ALLORA_CONFIG") {
let p = PathBuf::from(raw);
if p.is_dir() {
return p.join("allora.yml");
} else {
return p;
}
}
let cwd_candidate = PathBuf::from("allora.yml");
if cwd_candidate.exists() {
return cwd_candidate;
}
if let Ok(exe) = env::current_exe() {
if let Some(dir) = exe.parent() {
let candidate = dir.join("allora.yml");
if candidate.exists() {
return candidate;
}
}
}
PathBuf::from("allora.yml")
}