allora_runtime/
runtime.rs

1//! High-level application facade.
2//!
3//! Minimal builder API for constructing an `AlloraRuntime` from a configuration file.
4//! Intended for embedding with only a couple of lines of code.
5//!
6//! # Quick Start
7//! ```no_run
8//! use allora_runtime::Runtime;
9//! let rt = Runtime::new().run()?; // attempts to load ./allora.yml
10//! # Ok::<_, allora_runtime::Error>(())
11//! ```
12//!
13//! # Overview
14//! The builder exposes three operations:
15//! * `new()` – create a fresh builder (auto-discovery enabled)
16//! * `with_config_file(path)` – provide an explicit configuration file path
17//! * `run()` – build and return an `AlloraRuntime`
18//!
19//! If no path is supplied, `run()` will try a sensible default (`allora.yml`).
20//! Errors surface via the returned `Result`.
21//!
22//! ## Custom Path
23//! ```no_run
24//! # use allora_runtime::Runtime;
25//! let rt = Runtime::new().with_config_file("examples/basic/helloworld/allora.yml").run()?;
26//! # Ok::<_, allora_runtime::Error>(())
27//! ```
28//!
29//! # Service Wiring & The `#[service]` Macro
//! Services are described in configuration (YAML) as entries of the `service-activators:` list, each using a `ref-name` field.
31//! Example YAML fragment:
32//! ```yaml
33//! version: 1
34//! service-activators:
35//!   - ref-name: hello_world
36//!     from: inbound.orders
37//!     to: vetted.orders
38//! ```
39//! At runtime, wiring matches each `ref-name` value against inventory descriptors submitted via
40//! the `#[service]` attribute macro. The macro registers a constructor closure that builds a
41//! single shared instance (singleton) of your type via its zero-arg `new()` and invokes its async `process` method.
42//!
43//! ## Using `#[service]`
44//! Apply the attribute to an inherent `impl` block that contains a zero-arg `new()` method.
45//! If `name` is omitted the concrete type name (with spaces removed) is used.
46//!
47//! Supported forms:
48//! * `#[service]` – implicit name = type name
49//! * `#[service(name = "custom")]` – explicit reference name matching YAML `ref-name`
50//!
51//! Example:
52//! ```ignore
53//! use allora::{service, Service, Exchange, error::Result};
54//! #[derive(Debug)]
55//! struct Uppercase;
56//! impl Uppercase { pub fn new() -> Self { Self } }
57//! #[service(name="uppercase")]
58//! impl Uppercase {}
59//! #[async_trait::async_trait]
60//! impl Service for Uppercase {
61//!     async fn process(&self, exchange: &mut Exchange) -> Result<()> {
62//!         if let Some(body) = exchange.in_msg.body_text() { exchange.out_msg = Some(Message::from_text(body.to_uppercase())); }
63//!         Ok(())
64//!     }
65//! }
66//! ```
67//!
68//! # Descriptor Wiring Criteria
69//! A service is wired only if:
70//! 1. Its `ref-name` matches a registered descriptor `name`.
71//! 2. Both `from` and `to` channel IDs exist in the runtime.
72//! 3. The inbound channel is currently a direct channel (other kinds may be supported later).
73//!
74//! # Logging Fields
75//! Structured fields emitted during build:
76//! * `config.path` / `config.canonical` – discovery details
77//! * `service_activator.ref_name`, `inbound`, `outbound` – wiring decisions
78//! * `wired.count` – number of services wired
79//! * `channel.id`, `kind` – registered channels
80//! * `descriptor.impl` (legacy field name in traces) now maps to descriptor `name`.
81//!
82//! # Testing Notes
//! * Use `#[tokio::test]` for async service wiring tests.
84//! * Macro UI tests (see `tests/macro/`) validate diagnostics & naming.
85//! * All services share a single instance; design internal mutability carefully.
86use crate::{
87    all_service_descriptors,
88    channel::Channel,
89    dsl::build,
90    dsl::runtime::AlloraRuntime,
91    error::{Error, Result},
92    service,
93};
94use std::path::{Path, PathBuf};
95use std::sync::Arc;
96use tracing::{debug, info, trace};
97
/// Builder that holds configuration inputs prior to runtime construction.
///
/// The default value has no explicit configuration path, which means the path is
/// auto-discovered when the runtime is built.
#[derive(Debug, Clone, Default)]
pub struct Runtime {
    /// Explicit configuration file path; `None` enables auto-discovery.
    config_path: Option<PathBuf>,
}
109
110impl Runtime {
111    /// Create a new builder.
112    pub fn new() -> Self {
113        Self::default()
114    }
115
116    /// Set an explicit configuration file path (overrides auto-discovery).
117    ///
118    /// Accepts any type implementing `AsRef<Path>` (e.g. `&str`, `PathBuf`).
119    /// Relative paths are resolved according to the current working directory.
120    ///
121    /// Does not validate path existence immediately; validation happens inside `run()`.
122    pub fn with_config_file<P: AsRef<Path>>(mut self, path: P) -> Self {
123        self.config_path = Some(path.as_ref().to_path_buf());
124        self
125    }
126
127    /// Build the runtime from the explicit or default configuration file.
128    ///
129    /// # Configuration Discovery
130    ///
131    /// When no explicit path is provided via `with_config_file()`:
132    /// 1. First checks for `allora.yml` in the current working directory
133    /// 2. If not found, ascends parent directories from the executable location
134    ///    up to `MAX_PARENT_SEARCH_DEPTH` (10) levels
135    /// 3. Returns an error if no config is found
136    ///
137    /// # Note on Testing
138    ///
139    /// The parent directory ascent from the executable location is difficult to
140    /// test in isolation as it depends on the test executable's location.
141    /// Tests focus on explicit path configuration which covers the majority
142    /// of production use cases.
143    pub fn run(self) -> Result<AlloraRuntime> {
144        let explicit_opt = self.config_path.clone();
145        let path = match &explicit_opt {
146            Some(p) => p.clone(),
147            None => resolve_default_config(),
148        };
149
150        if let Some(parent) = path.parent() {
151            crate::logging::init_from_dir(parent);
152        } else {
153            crate::logging::init_from_dir(Path::new("."));
154        }
155
156        let exists = path.exists();
157        let canonical_opt = if exists {
158            path.canonicalize().ok()
159        } else {
160            None
161        };
162
163        // Log discovery/resolution with clearer semantics: canonical only if file exists.
164        if explicit_opt.is_none() {
165            info!(
166                config.path=%path.display(),
167                config.canonical=?canonical_opt.as_ref().map(|p| p.display().to_string()),
168                canonical=canonical_opt.is_some(),
169                auto=true,
170                "Configuration auto-discovered"
171            );
172        } else {
173            info!(
174                config.path=%path.display(),
175                config.canonical=?canonical_opt.as_ref().map(|p| p.display().to_string()),
176                canonical=canonical_opt.is_some(),
177                auto=false,
178                "Configuration resolved"
179            );
180        }
181
182        if !exists {
183            return Err(Error::runtime(format!(
184                "config file '{}' not found",
185                path.display()
186            )));
187        }
188
189        let rt = build(&path)?;
190        wire_services(&rt)?;
191        debug!(
192            channels = rt.channel_count(),
193            filters = rt.filter_count(),
194            "Runtime constructed"
195        );
196        Ok(rt)
197    }
198}
199
200pub fn wire_services(rt: &AlloraRuntime) -> Result<()> {
201    let descriptors = all_service_descriptors();
202    debug!(
203        service_activator.processors = rt.service_processor_count(),
204        descriptors = descriptors.len(),
205        "service wiring start"
206    );
207    for d in &descriptors {
208        trace!(descriptor.impl = d.name, "service descriptor loaded");
209    }
210    let mut service_activator_wirings: Vec<(
211        Arc<dyn Channel>,
212        Arc<dyn Channel>,
213        Arc<dyn service::Service>,
214        String,
215    )> = Vec::new();
216    for sp in rt.service_activator_processors().iter() {
217        let name_key = sp.ref_name();
218        trace!(
219            service_activator.ref_name = name_key,
220            service.id = sp.id(),
221            from = sp.from(),
222            to = sp.to(),
223            "evaluating service processor"
224        );
225        for desc in descriptors.iter() {
226            if desc.name == name_key {
227                trace!(service_activator.ref_name = name_key, "descriptor matched");
228                if rt.channel_by_id(sp.from()).is_some() && rt.channel_by_id(sp.to()).is_some() {
229                    let inbound_arc_opt = rt.channels_slice().iter().find(|c| c.id() == sp.from());
230                    let outbound_arc_opt = rt.channels_slice().iter().find(|c| c.id() == sp.to());
231                    if let (Some(in_arc), Some(out_arc)) = (inbound_arc_opt, outbound_arc_opt) {
232                        debug!(
233                            service_activator.ref_name = name_key,
234                            inbound = sp.from(),
235                            outbound = sp.to(),
236                            "channels resolved – scheduling wiring"
237                        );
238                        let proc_arc = (desc.constructor)();
239                        service_activator_wirings.push((
240                            in_arc.clone(),
241                            out_arc.clone(),
242                            proc_arc,
243                            name_key.to_string(),
244                        ));
245                    } else {
246                        debug!(
247                            service_activator.ref_name = name_key,
248                            inbound_found = inbound_arc_opt.is_some(),
249                            outbound_found = outbound_arc_opt.is_some(),
250                            "channel resolution failed – wiring skipped"
251                        );
252                    }
253                } else {
254                    debug!(
255                        service_activator.ref_name = name_key,
256                        "channel ids not found – skipped"
257                    );
258                }
259            }
260        }
261    }
262    if service_activator_wirings.is_empty() {
263        info!("no services wired (none matched or channels missing)");
264    } else {
265        info!(
266            wired.count = service_activator_wirings.len(),
267            "service wiring collected"
268        );
269    }
270    for (in_arc, out_arc, proc_arc, name_key) in service_activator_wirings.into_iter() {
271        if let Some(inbound_direct) = in_arc.as_any().downcast_ref::<crate::DirectChannel>() {
272            let outbound_arc_dyn = out_arc.clone();
273            let inbound_id = inbound_direct.id().to_string();
274            let name_key_closure = name_key.clone();
275            let proc_shared = proc_arc.clone();
276            let sub_count = inbound_direct.subscribe(move |exchange| {
277                let outbound_clone = outbound_arc_dyn.clone();
278                let proc_task = proc_shared.clone();
279                let name_key_val = name_key_closure.clone();
280                tokio::spawn(async move {
281                    let mut ex_mut = exchange;
282                    if let Err(err) = proc_task.process(&mut ex_mut).await {
283                        tracing::error!(target="allora::service", service.impl=%name_key_val, error=%err, "Service async processing failed");
284                        return;
285                    }
286                    if let Err(err) = outbound_clone.send(ex_mut).await {
287                        tracing::error!(target="allora::service", service.impl=%name_key_val, error=%err, "Outbound channel send failed");
288                    }
289                });
290                Ok(())
291            });
292            debug!(
293                service_activator.ref_name = name_key,
294                inbound = inbound_id,
295                subscribers = sub_count,
296                "service wired"
297            );
298        } else {
299            debug!(
300                service_activator.ref_name = name_key,
301                inbound_id = in_arc.id(),
302                "inbound channel not direct – skipping wiring"
303            );
304        }
305    }
306    for ch in rt.channels() {
307        debug!(channel.id = ch.id(), kind = ch.kind(), "channel registered");
308    }
309    debug!(
310        services.wired = rt.service_processor_count(),
311        "runtime wiring complete"
312    );
313    Ok(())
314}
315
/// Resolve the configuration path used when no explicit path was supplied.
///
/// Candidates are tried in order and the first applicable one wins:
/// 0. CLI override `--runtime <path>` or `--runtime=<path>` (only the first occurrence is
///    considered; a trailing `--runtime` without a value is ignored).
/// 1. The `ALLORA_CONFIG` environment variable (ignored when empty).
/// 2. `allora.yml` in the current working directory, if it exists.
/// 3. `<directory_of_executable>/allora.yml`, if it exists.
/// 4. The relative path `allora.yml`, returned without an existence check.
///
/// For the two overrides, a value naming an existing directory resolves to
/// `<dir>/allora.yml`; any other value is returned verbatim, whether or not it exists.
///
/// Arguments and the environment variable are read as `OsString`s: `env::args()` panics
/// when any process argument is not valid Unicode and `env::var()` discards non-Unicode
/// values, neither of which is acceptable for file system paths. The one remaining
/// limitation is the `--runtime=<path>` form, which is only recognised when the whole
/// argument is valid Unicode.
fn resolve_default_config() -> PathBuf {
    use std::env;
    use std::ffi::OsString;

    const CONFIG_FILE_NAME: &str = "allora.yml";

    /// Interpret an override value: a directory means the config file inside it.
    fn config_in_dir_or_file(target: PathBuf) -> PathBuf {
        if target.is_dir() {
            target.join(CONFIG_FILE_NAME)
        } else {
            target
        }
    }

    // 0. CLI override: --runtime <path> or --runtime=/path
    //
    // Example:
    //   cargo run --manifest-path examples/basic/http/Cargo.toml -- \
    //     --runtime examples/basic/http/allora.yml
    //
    // or
    //
    //   cargo run -- ... --runtime=examples/basic/http/allora.yml
    let mut args = env::args_os().skip(1); // skip program name
    let mut runtime_override: Option<OsString> = None;

    // Not a `for` loop: the value of `--runtime <path>` is pulled from the same iterator.
    while let Some(arg) = args.next() {
        if arg == "--runtime" {
            runtime_override = args.next();
            break;
        }
        if let Some(rest) = arg.to_str().and_then(|s| s.strip_prefix("--runtime=")) {
            runtime_override = Some(OsString::from(rest));
            break;
        }
    }

    if let Some(raw) = runtime_override {
        return config_in_dir_or_file(PathBuf::from(raw));
    }

    // 1. Optional env override (useful for CI or scripts). An empty value counts as unset.
    if let Some(raw) = env::var_os("ALLORA_CONFIG").filter(|value| !value.is_empty()) {
        return config_in_dir_or_file(PathBuf::from(raw));
    }

    // 2. Prefer ./allora.yml in the current working directory (dev-friendly)
    let cwd_candidate = PathBuf::from(CONFIG_FILE_NAME);
    if cwd_candidate.exists() {
        return cwd_candidate;
    }

    // 3. Then try <directory_of_executable>/allora.yml (release-friendly)
    let beside_exe = env::current_exe()
        .ok()
        .and_then(|exe| exe.parent().map(|dir| dir.join(CONFIG_FILE_NAME)))
        .filter(|candidate| candidate.exists());
    if let Some(candidate) = beside_exe {
        return candidate;
    }

    // 4. Fallback (will cause a clear error in `run()` if missing)
    PathBuf::from(CONFIG_FILE_NAME)
}