allora_runtime/runtime.rs
1//! High-level application facade.
2//!
3//! Minimal builder API for constructing an `AlloraRuntime` from a configuration file.
4//! Intended for embedding with only a couple of lines of code.
5//!
6//! # Quick Start
7//! ```no_run
8//! use allora_runtime::Runtime;
9//! let rt = Runtime::new().run()?; // attempts to load ./allora.yml
10//! # Ok::<_, allora_runtime::Error>(())
11//! ```
12//!
13//! # Overview
14//! The builder exposes three operations:
15//! * `new()` – create a fresh builder (auto-discovery enabled)
16//! * `with_config_file(path)` – provide an explicit configuration file path
17//! * `run()` – build and return an `AlloraRuntime`
18//!
19//! If no path is supplied, `run()` will try a sensible default (`allora.yml`).
20//! Errors surface via the returned `Result`.
21//!
22//! ## Custom Path
23//! ```no_run
24//! # use allora_runtime::Runtime;
25//! let rt = Runtime::new().with_config_file("examples/basic/helloworld/allora.yml").run()?;
26//! # Ok::<_, allora_runtime::Error>(())
27//! ```
28//!
29//! # Service Wiring & The `#[service]` Macro
//! Services are described in configuration (YAML) as entries of the `service-activators:` list; each entry names its implementation with a `ref-name` field.
31//! Example YAML fragment:
32//! ```yaml
33//! version: 1
34//! service-activators:
35//! - ref-name: hello_world
36//! from: inbound.orders
37//! to: vetted.orders
38//! ```
39//! At runtime, wiring matches each `ref-name` value against inventory descriptors submitted via
40//! the `#[service]` attribute macro. The macro registers a constructor closure that builds a
41//! single shared instance (singleton) of your type via its zero-arg `new()` and invokes its async `process` method.
42//!
43//! ## Using `#[service]`
44//! Apply the attribute to an inherent `impl` block that contains a zero-arg `new()` method.
45//! If `name` is omitted the concrete type name (with spaces removed) is used.
46//!
47//! Supported forms:
48//! * `#[service]` – implicit name = type name
49//! * `#[service(name = "custom")]` – explicit reference name matching YAML `ref-name`
50//!
51//! Example:
52//! ```ignore
53//! use allora::{service, Service, Exchange, error::Result};
54//! #[derive(Debug)]
55//! struct Uppercase;
56//! impl Uppercase { pub fn new() -> Self { Self } }
57//! #[service(name="uppercase")]
58//! impl Uppercase {}
59//! #[async_trait::async_trait]
60//! impl Service for Uppercase {
61//! async fn process(&self, exchange: &mut Exchange) -> Result<()> {
62//! if let Some(body) = exchange.in_msg.body_text() { exchange.out_msg = Some(Message::from_text(body.to_uppercase())); }
63//! Ok(())
64//! }
65//! }
66//! ```
67//!
68//! # Descriptor Wiring Criteria
69//! A service is wired only if:
70//! 1. Its `ref-name` matches a registered descriptor `name`.
71//! 2. Both `from` and `to` channel IDs exist in the runtime.
72//! 3. The inbound channel is currently a direct channel (other kinds may be supported later).
73//!
74//! # Logging Fields
75//! Structured fields emitted during build:
76//! * `config.path` / `config.canonical` – discovery details
77//! * `service_activator.ref_name`, `inbound`, `outbound` – wiring decisions
78//! * `wired.count` – number of services wired
79//! * `channel.id`, `kind` – registered channels
//! * `descriptor.impl` – legacy field name kept in traces; its value is the descriptor's `name`
81//!
82//! # Testing Notes
83//! * Use #[tokio::test] for async service wiring tests.
84//! * Macro UI tests (see `tests/macro/`) validate diagnostics & naming.
85//! * All services share a single instance; design internal mutability carefully.
86use crate::{
87 all_service_descriptors,
88 channel::Channel,
89 dsl::build,
90 dsl::runtime::AlloraRuntime,
91 error::{Error, Result},
92 service,
93};
94use std::path::{Path, PathBuf};
95use std::sync::Arc;
96use tracing::{debug, info, trace};
97
/// Builder that holds configuration inputs prior to [`AlloraRuntime`] construction.
///
/// Create one with [`Runtime::new`] (or `Runtime::default()`), optionally point it at a
/// configuration file with [`Runtime::with_config_file`], then consume it with
/// [`Runtime::run`].
#[derive(Debug, Clone, Default)]
pub struct Runtime {
    /// Explicit configuration file path; `None` means `run()` resolves a default path.
    config_path: Option<PathBuf>,
}
109
110impl Runtime {
111 /// Create a new builder.
112 pub fn new() -> Self {
113 Self::default()
114 }
115
116 /// Set an explicit configuration file path (overrides auto-discovery).
117 ///
118 /// Accepts any type implementing `AsRef<Path>` (e.g. `&str`, `PathBuf`).
119 /// Relative paths are resolved according to the current working directory.
120 ///
121 /// Does not validate path existence immediately; validation happens inside `run()`.
122 pub fn with_config_file<P: AsRef<Path>>(mut self, path: P) -> Self {
123 self.config_path = Some(path.as_ref().to_path_buf());
124 self
125 }
126
127 /// Build the runtime from the explicit or default configuration file.
128 ///
129 /// # Configuration Discovery
130 ///
131 /// When no explicit path is provided via `with_config_file()`:
132 /// 1. First checks for `allora.yml` in the current working directory
133 /// 2. If not found, ascends parent directories from the executable location
134 /// up to `MAX_PARENT_SEARCH_DEPTH` (10) levels
135 /// 3. Returns an error if no config is found
136 ///
137 /// # Note on Testing
138 ///
139 /// The parent directory ascent from the executable location is difficult to
140 /// test in isolation as it depends on the test executable's location.
141 /// Tests focus on explicit path configuration which covers the majority
142 /// of production use cases.
143 pub fn run(self) -> Result<AlloraRuntime> {
144 let explicit_opt = self.config_path.clone();
145 let path = match &explicit_opt {
146 Some(p) => p.clone(),
147 None => resolve_default_config(),
148 };
149
150 if let Some(parent) = path.parent() {
151 crate::logging::init_from_dir(parent);
152 } else {
153 crate::logging::init_from_dir(Path::new("."));
154 }
155
156 let exists = path.exists();
157 let canonical_opt = if exists {
158 path.canonicalize().ok()
159 } else {
160 None
161 };
162
163 // Log discovery/resolution with clearer semantics: canonical only if file exists.
164 if explicit_opt.is_none() {
165 info!(
166 config.path=%path.display(),
167 config.canonical=?canonical_opt.as_ref().map(|p| p.display().to_string()),
168 canonical=canonical_opt.is_some(),
169 auto=true,
170 "Configuration auto-discovered"
171 );
172 } else {
173 info!(
174 config.path=%path.display(),
175 config.canonical=?canonical_opt.as_ref().map(|p| p.display().to_string()),
176 canonical=canonical_opt.is_some(),
177 auto=false,
178 "Configuration resolved"
179 );
180 }
181
182 if !exists {
183 return Err(Error::runtime(format!(
184 "config file '{}' not found",
185 path.display()
186 )));
187 }
188
189 let rt = build(&path)?;
190 wire_services(&rt)?;
191 debug!(
192 channels = rt.channel_count(),
193 filters = rt.filter_count(),
194 "Runtime constructed"
195 );
196 Ok(rt)
197 }
198}
199
200pub fn wire_services(rt: &AlloraRuntime) -> Result<()> {
201 let descriptors = all_service_descriptors();
202 debug!(
203 service_activator.processors = rt.service_processor_count(),
204 descriptors = descriptors.len(),
205 "service wiring start"
206 );
207 for d in &descriptors {
208 trace!(descriptor.impl = d.name, "service descriptor loaded");
209 }
210 let mut service_activator_wirings: Vec<(
211 Arc<dyn Channel>,
212 Arc<dyn Channel>,
213 Arc<dyn service::Service>,
214 String,
215 )> = Vec::new();
216 for sp in rt.service_activator_processors().iter() {
217 let name_key = sp.ref_name();
218 trace!(
219 service_activator.ref_name = name_key,
220 service.id = sp.id(),
221 from = sp.from(),
222 to = sp.to(),
223 "evaluating service processor"
224 );
225 for desc in descriptors.iter() {
226 if desc.name == name_key {
227 trace!(service_activator.ref_name = name_key, "descriptor matched");
228 if rt.channel_by_id(sp.from()).is_some() && rt.channel_by_id(sp.to()).is_some() {
229 let inbound_arc_opt = rt.channels_slice().iter().find(|c| c.id() == sp.from());
230 let outbound_arc_opt = rt.channels_slice().iter().find(|c| c.id() == sp.to());
231 if let (Some(in_arc), Some(out_arc)) = (inbound_arc_opt, outbound_arc_opt) {
232 debug!(
233 service_activator.ref_name = name_key,
234 inbound = sp.from(),
235 outbound = sp.to(),
236 "channels resolved – scheduling wiring"
237 );
238 let proc_arc = (desc.constructor)();
239 service_activator_wirings.push((
240 in_arc.clone(),
241 out_arc.clone(),
242 proc_arc,
243 name_key.to_string(),
244 ));
245 } else {
246 debug!(
247 service_activator.ref_name = name_key,
248 inbound_found = inbound_arc_opt.is_some(),
249 outbound_found = outbound_arc_opt.is_some(),
250 "channel resolution failed – wiring skipped"
251 );
252 }
253 } else {
254 debug!(
255 service_activator.ref_name = name_key,
256 "channel ids not found – skipped"
257 );
258 }
259 }
260 }
261 }
262 if service_activator_wirings.is_empty() {
263 info!("no services wired (none matched or channels missing)");
264 } else {
265 info!(
266 wired.count = service_activator_wirings.len(),
267 "service wiring collected"
268 );
269 }
270 for (in_arc, out_arc, proc_arc, name_key) in service_activator_wirings.into_iter() {
271 if let Some(inbound_direct) = in_arc.as_any().downcast_ref::<crate::DirectChannel>() {
272 let outbound_arc_dyn = out_arc.clone();
273 let inbound_id = inbound_direct.id().to_string();
274 let name_key_closure = name_key.clone();
275 let proc_shared = proc_arc.clone();
276 let sub_count = inbound_direct.subscribe(move |exchange| {
277 let outbound_clone = outbound_arc_dyn.clone();
278 let proc_task = proc_shared.clone();
279 let name_key_val = name_key_closure.clone();
280 tokio::spawn(async move {
281 let mut ex_mut = exchange;
282 if let Err(err) = proc_task.process(&mut ex_mut).await {
283 tracing::error!(target="allora::service", service.impl=%name_key_val, error=%err, "Service async processing failed");
284 return;
285 }
286 if let Err(err) = outbound_clone.send(ex_mut).await {
287 tracing::error!(target="allora::service", service.impl=%name_key_val, error=%err, "Outbound channel send failed");
288 }
289 });
290 Ok(())
291 });
292 debug!(
293 service_activator.ref_name = name_key,
294 inbound = inbound_id,
295 subscribers = sub_count,
296 "service wired"
297 );
298 } else {
299 debug!(
300 service_activator.ref_name = name_key,
301 inbound_id = in_arc.id(),
302 "inbound channel not direct – skipping wiring"
303 );
304 }
305 }
306 for ch in rt.channels() {
307 debug!(channel.id = ch.id(), kind = ch.kind(), "channel registered");
308 }
309 debug!(
310 services.wired = rt.service_processor_count(),
311 "runtime wiring complete"
312 );
313 Ok(())
314}
315
/// Resolve the configuration path used when no explicit path was supplied.
///
/// Candidates are tried in order and the first that applies is returned:
/// 0. CLI override `--runtime <path>` or `--runtime=<path>` (only the first occurrence
///    is considered; a trailing `--runtime` without a value yields no override)
/// 1. The `ALLORA_CONFIG` environment variable
/// 2. `allora.yml` in the current working directory, if it exists
/// 3. `allora.yml` in the directory of the current executable, if it exists
/// 4. The relative path `allora.yml`, returned without an existence check
///
/// For overrides 0 and 1, a value naming an existing directory resolves to
/// `<dir>/allora.yml`; any other value is returned as given, without checking that it
/// exists.
///
/// Arguments and the environment variable are read as OS strings, so arguments that are
/// not valid Unicode do not cause a panic. In the `--runtime=<path>` form, non-Unicode
/// bytes are replaced lossily; use the two-argument form for such paths.
fn resolve_default_config() -> PathBuf {
    use std::env;

    /// File name assumed when an override points at a directory, and used for discovery.
    const DEFAULT_FILE: &str = "allora.yml";

    /// An override may name the config file itself or the directory that contains it.
    fn file_or_dir_default(p: PathBuf) -> PathBuf {
        if p.is_dir() {
            p.join(DEFAULT_FILE)
        } else {
            p
        }
    }

    // 0. CLI override: --runtime <path> or --runtime=/path
    //
    // Example:
    //   cargo run --manifest-path examples/basic/http/Cargo.toml -- \
    //     --runtime examples/basic/http/allora.yml
    //
    // or
    //
    //   cargo run -- ... --runtime=examples/basic/http/allora.yml
    let mut args = env::args_os().skip(1); // skip program name
    let mut runtime_override: Option<PathBuf> = None;

    while let Some(arg) = args.next() {
        if arg == "--runtime" {
            // The value is the next argument; stop scanning whether or not it is present.
            runtime_override = args.next().map(PathBuf::from);
            break;
        }
        if let Some(rest) = arg.to_string_lossy().strip_prefix("--runtime=") {
            runtime_override = Some(PathBuf::from(rest));
            break;
        }
    }

    if let Some(p) = runtime_override {
        return file_or_dir_default(p);
    }

    // 1. Optional env override (useful for CI or scripts)
    if let Some(raw) = env::var_os("ALLORA_CONFIG") {
        return file_or_dir_default(PathBuf::from(raw));
    }

    // 2. Prefer ./allora.yml in the current working directory (dev-friendly)
    let cwd_candidate = PathBuf::from(DEFAULT_FILE);
    if cwd_candidate.exists() {
        return cwd_candidate;
    }

    // 3. Then try <directory_of_executable>/allora.yml (release-friendly)
    if let Ok(exe) = env::current_exe() {
        if let Some(dir) = exe.parent() {
            let candidate = dir.join(DEFAULT_FILE);
            if candidate.exists() {
                return candidate;
            }
        }
    }

    // 4. Fallback (will cause a clear error in `run()` if missing)
    PathBuf::from(DEFAULT_FILE)
}