Skip to main content

allora_runtime/
runtime.rs

1//! High-level application facade.
2//!
3//! Minimal builder API for constructing an `AlloraRuntime` from a configuration file.
4//! Intended for embedding with only a couple of lines of code.
5//!
6//! # Quick Start
7//! ```no_run
8//! use allora_runtime::Runtime;
9//! let rt = Runtime::new().run()?; // attempts to load ./allora.yml
10//! # Ok::<_, allora_runtime::Error>(())
11//! ```
12//!
13//! # Overview
14//! The builder exposes three operations:
15//! * `new()` – create a fresh builder (auto-discovery enabled)
16//! * `with_config_file(path)` – provide an explicit configuration file path
17//! * `run()` – build and return an `AlloraRuntime`
18//!
19//! If no path is supplied, `run()` will try a sensible default (`allora.yml`).
20//! Errors surface via the returned `Result`.
21//!
22//! ## Custom Path
23//! ```no_run
24//! # use allora_runtime::Runtime;
25//! let rt = Runtime::new().with_config_file("examples/basic/helloworld/allora.yml").run()?;
26//! # Ok::<_, allora_runtime::Error>(())
27//! ```
28//!
29//! # Service Wiring & The `#[service]` Macro
30//! Services are described in configuration (YAML) under `service-activator:` blocks using a `ref-name` field.
31//! Example YAML fragment:
32//! ```yaml
33//! version: 1
34//! service-activators:
35//!   - ref-name: hello_world
36//!     from: inbound.orders
37//!     to: vetted.orders
38//! ```
39//! At runtime, wiring matches each `ref-name` value against inventory descriptors submitted via
40//! the `#[service]` attribute macro. The macro registers a constructor closure that builds a
41//! single shared instance (singleton) of your type via its zero-arg `new()` and invokes its async `process` method.
42//!
43//! ## Using `#[service]`
44//! Apply the attribute to an inherent `impl` block that contains a zero-arg `new()` method.
45//! If `name` is omitted the concrete type name (with spaces removed) is used.
46//!
47//! Supported forms:
48//! * `#[service]` – implicit name = type name
49//! * `#[service(name = "custom")]` – explicit reference name matching YAML `ref-name`
50//!
51//! Example:
52//! ```ignore
53//! use allora::{service, Service, Exchange, error::Result};
54//! #[derive(Debug)]
55//! struct Uppercase;
56//! impl Uppercase { pub fn new() -> Self { Self } }
57//! #[service(name="uppercase")]
58//! impl Uppercase {}
59//! #[async_trait::async_trait]
60//! impl Service for Uppercase {
61//!     async fn process(&self, exchange: &mut Exchange) -> Result<()> {
62//!         if let Some(body) = exchange.in_msg.body_text() { exchange.out_msg = Some(Message::from_text(body.to_uppercase())); }
63//!         Ok(())
64//!     }
65//! }
66//! ```
67//!
68//! # Descriptor Wiring Criteria
69//! A service is wired only if:
70//! 1. Its `ref-name` matches a registered descriptor `name`.
71//! 2. Both `from` and `to` channel IDs exist in the runtime.
72//! 3. The inbound channel is currently a direct channel (other kinds may be supported later).
73//!
74//! # Logging Fields
75//! Structured fields emitted during build:
76//! * `config.path` / `config.canonical` – discovery details
77//! * `service_activator.ref_name`, `inbound`, `outbound` – wiring decisions
78//! * `wired.count` – number of services wired
79//! * `channel.id`, `kind` – registered channels
80//! * `descriptor.impl` (legacy field name in traces) now maps to descriptor `name`.
81//!
82//! # Testing Notes
83//! * Use #[tokio::test] for async service wiring tests.
84//! * Macro UI tests (see `tests/macro/`) validate diagnostics & naming.
85//! * All services share a single instance; design internal mutability carefully.
86use crate::{
87    all_service_descriptors,
88    channel::Channel,
89    dsl::build,
90    dsl::runtime::AlloraRuntime,
91    error::{Error, Result},
92    service,
93};
94use std::path::{Path, PathBuf};
95use std::sync::Arc;
96use tracing::{debug, info, trace};
97
98/// `Allora` builder holds configuration inputs prior to runtime construction.
99#[derive(Debug, Clone)]
100pub struct Runtime {
101    config_path: Option<PathBuf>,
102}
103
104impl Default for Runtime {
105    fn default() -> Self {
106        Self { config_path: None }
107    }
108}
109
110impl Runtime {
111    /// Create a new builder.
112    pub fn new() -> Self {
113        Self::default()
114    }
115
116    /// Set an explicit configuration file path (overrides auto-discovery).
117    ///
118    /// Accepts any type implementing `AsRef<Path>` (e.g. `&str`, `PathBuf`).
119    /// Relative paths are resolved according to the current working directory.
120    ///
121    /// Does not validate path existence immediately; validation happens inside `run()`.
122    pub fn with_config_file<P: AsRef<Path>>(mut self, path: P) -> Self {
123        self.config_path = Some(path.as_ref().to_path_buf());
124        self
125    }
126
127    /// Build the runtime from the explicit or default configuration file.
128    ///
129    /// # Configuration Discovery
130    ///
131    /// When no explicit path is provided via `with_config_file()`:
132    /// 1. First checks for `allora.yml` in the current working directory
133    /// 2. If not found, ascends parent directories from the executable location
134    ///    up to `MAX_PARENT_SEARCH_DEPTH` (10) levels
135    /// 3. Returns an error if no config is found
136    ///
137    /// # Note on Testing
138    ///
139    /// The parent directory ascent from the executable location is difficult to
140    /// test in isolation as it depends on the test executable's location.
141    /// Tests focus on explicit path configuration which covers the majority
142    /// of production use cases.
143    pub fn run(self) -> Result<AlloraRuntime> {
144        let explicit_opt = self.config_path.clone();
145        let path = match &explicit_opt {
146            Some(p) => p.clone(),
147            None => resolve_default_config(),
148        };
149
150        if let Some(parent) = path.parent() {
151            crate::logging::init_from_dir(parent);
152        } else {
153            crate::logging::init_from_dir(Path::new("."));
154        }
155
156        let exists = path.exists();
157        let canonical_opt = if exists {
158            path.canonicalize().ok()
159        } else {
160            None
161        };
162
163        // Log discovery/resolution with clearer semantics: canonical only if file exists.
164        if explicit_opt.is_none() {
165            info!(
166                config.path=%path.display(),
167                config.canonical=?canonical_opt.as_ref().map(|p| p.display().to_string()),
168                canonical=canonical_opt.is_some(),
169                auto=true,
170                "Configuration auto-discovered"
171            );
172        } else {
173            info!(
174                config.path=%path.display(),
175                config.canonical=?canonical_opt.as_ref().map(|p| p.display().to_string()),
176                canonical=canonical_opt.is_some(),
177                auto=false,
178                "Configuration resolved"
179            );
180        }
181
182        if !exists {
183            return Err(Error::runtime(format!(
184                "config file '{}' not found",
185                path.display()
186            )));
187        }
188
189        let rt = build(&path)?;
190        wire_services(&rt)?;
191        wire_filters(&rt)?;
192        debug!(
193            channels = rt.channel_count(),
194            filters = rt.filter_count(),
195            "Runtime constructed"
196        );
197        Ok(rt)
198    }
199}
200
201pub fn wire_services(rt: &AlloraRuntime) -> Result<()> {
202    let descriptors = all_service_descriptors();
203    debug!(
204        service_activator.processors = rt.service_processor_count(),
205        descriptors = descriptors.len(),
206        "service wiring start"
207    );
208    for d in &descriptors {
209        trace!(descriptor.impl = d.name, "service descriptor loaded");
210    }
211    let mut service_activator_wirings: Vec<(
212        Arc<dyn Channel>,
213        Arc<dyn Channel>,
214        Arc<dyn service::Service>,
215        String,
216    )> = Vec::new();
217    for sp in rt.service_activator_processors().iter() {
218        let name_key = sp.ref_name();
219        trace!(
220            service_activator.ref_name = name_key,
221            service.id = sp.id(),
222            from = sp.from(),
223            to = sp.to(),
224            "evaluating service processor"
225        );
226        for desc in descriptors.iter() {
227            if desc.name == name_key {
228                trace!(service_activator.ref_name = name_key, "descriptor matched");
229                if rt.channel_by_id(sp.from()).is_some() && rt.channel_by_id(sp.to()).is_some() {
230                    let inbound_arc_opt = rt.channels_slice().iter().find(|c| c.id() == sp.from());
231                    let outbound_arc_opt = rt.channels_slice().iter().find(|c| c.id() == sp.to());
232                    if let (Some(in_arc), Some(out_arc)) = (inbound_arc_opt, outbound_arc_opt) {
233                        debug!(
234                            service_activator.ref_name = name_key,
235                            inbound = sp.from(),
236                            outbound = sp.to(),
237                            "channels resolved – scheduling wiring"
238                        );
239                        let proc_arc = (desc.constructor)();
240                        service_activator_wirings.push((
241                            in_arc.clone(),
242                            out_arc.clone(),
243                            proc_arc,
244                            name_key.to_string(),
245                        ));
246                    } else {
247                        debug!(
248                            service_activator.ref_name = name_key,
249                            inbound_found = inbound_arc_opt.is_some(),
250                            outbound_found = outbound_arc_opt.is_some(),
251                            "channel resolution failed – wiring skipped"
252                        );
253                    }
254                } else {
255                    debug!(
256                        service_activator.ref_name = name_key,
257                        "channel ids not found – skipped"
258                    );
259                }
260            }
261        }
262    }
263    if service_activator_wirings.is_empty() {
264        info!("no services wired (none matched or channels missing)");
265    } else {
266        info!(
267            wired.count = service_activator_wirings.len(),
268            "service wiring collected"
269        );
270    }
271    for (in_arc, out_arc, proc_arc, name_key) in service_activator_wirings.into_iter() {
272        if let Some(inbound_direct) = in_arc.as_any().downcast_ref::<crate::DirectChannel>() {
273            let outbound_arc_dyn = out_arc.clone();
274            let inbound_id = inbound_direct.id().to_string();
275            let name_key_closure = name_key.clone();
276            let proc_shared = proc_arc.clone();
277            let sub_count = inbound_direct.subscribe(move |exchange| {
278                let outbound_clone = outbound_arc_dyn.clone();
279                let proc_task = proc_shared.clone();
280                let name_key_val = name_key_closure.clone();
281                tokio::spawn(async move {
282                    let mut ex_mut = exchange;
283                    if let Err(err) = proc_task.process(&mut ex_mut).await {
284                        tracing::error!(target="allora::service", service.impl=%name_key_val, error=%err, "Service async processing failed");
285                        return;
286                    }
287                    if let Err(err) = outbound_clone.send(ex_mut).await {
288                        tracing::error!(target="allora::service", service.impl=%name_key_val, error=%err, "Outbound channel send failed");
289                    }
290                });
291                Ok(())
292            });
293            debug!(
294                service_activator.ref_name = name_key,
295                inbound = inbound_id,
296                subscribers = sub_count,
297                "service wired"
298            );
299        } else {
300            debug!(
301                service_activator.ref_name = name_key,
302                inbound_id = in_arc.id(),
303                "inbound channel not direct – skipping wiring"
304            );
305        }
306    }
307    for ch in rt.channels() {
308        debug!(channel.id = ch.id(), kind = ch.kind(), "channel registered");
309    }
310    debug!(
311        services.wired = rt.service_processor_count(),
312        "runtime wiring complete"
313    );
314    Ok(())
315}
316
317/// Wire each [`FilterActivation`] in the runtime to its inbound channel.
318///
319/// For every activation with a non-`None` `to:`, this:
320///
321/// 1. Resolves `from:` and `to:` to channels registered on the runtime.
322/// 2. Subscribes a closure on the inbound `DirectChannel`. On each
323///    exchange the closure:
324///    - evaluates `filter.accepts(&exchange)` (synchronous; APL is
325///      cheap),
326///    - if accepted → forwards the exchange to the outbound channel,
327///    - if rejected → silently drops (with a `trace!`-level log). Filter
328///      rejection is a normal event in EIP semantics, not an error.
329/// 3. Logs `outbound send failed` at `error!` only for genuine send
330///    errors on the outbound side.
331///
332/// Filters with `to:` = `None` are predicate-only (kept on the runtime
333/// for callers that invoke `filter.accepts(...)` directly) and are not
334/// auto-wired.
335///
336/// Mirrors the structure of [`wire_services`] so the two paths are
337/// consistent in field naming, ordering, and failure modes.
338pub fn wire_filters(rt: &AlloraRuntime) -> Result<()> {
339    debug!(
340        filter.activations = rt.filter_count(),
341        "filter wiring start"
342    );
343    let mut filter_wirings: Vec<(
344        Arc<dyn Channel>,
345        Arc<dyn Channel>,
346        Arc<crate::Filter>,
347        String,
348    )> = Vec::new();
349    for fa in rt.filters().iter() {
350        let Some(to) = fa.to() else {
351            debug!(
352                filter.id = fa.id(),
353                from = fa.from(),
354                "filter has no `to:` — predicate-only, not auto-wired"
355            );
356            continue;
357        };
358        trace!(
359            filter.id = fa.id(),
360            from = fa.from(),
361            to = to,
362            "evaluating filter activation"
363        );
364        let inbound_arc_opt = rt.channels_slice().iter().find(|c| c.id() == fa.from());
365        let outbound_arc_opt = rt.channels_slice().iter().find(|c| c.id() == to);
366        if let (Some(in_arc), Some(out_arc)) = (inbound_arc_opt, outbound_arc_opt) {
367            debug!(
368                filter.id = fa.id(),
369                inbound = fa.from(),
370                outbound = to,
371                "channels resolved – scheduling filter wiring"
372            );
373            filter_wirings.push((
374                in_arc.clone(),
375                out_arc.clone(),
376                fa.filter().clone(),
377                fa.id().to_string(),
378            ));
379        } else {
380            debug!(
381                filter.id = fa.id(),
382                from = fa.from(),
383                to = to,
384                inbound_found = inbound_arc_opt.is_some(),
385                outbound_found = outbound_arc_opt.is_some(),
386                "filter channel resolution failed – wiring skipped"
387            );
388        }
389    }
390    if filter_wirings.is_empty() {
391        info!("no filters wired (none had `to:` channels resolvable on the runtime)");
392    } else {
393        info!(
394            wired.count = filter_wirings.len(),
395            "filter wiring collected"
396        );
397    }
398    for (in_arc, out_arc, filter_arc, id) in filter_wirings.into_iter() {
399        if let Some(inbound_direct) = in_arc.as_any().downcast_ref::<crate::DirectChannel>() {
400            let outbound_arc_dyn = out_arc.clone();
401            let inbound_id = inbound_direct.id().to_string();
402            let id_closure = id.clone();
403            let sub_count = inbound_direct.subscribe(move |exchange| {
404                let outbound_clone = outbound_arc_dyn.clone();
405                let f = filter_arc.clone();
406                let id_val = id_closure.clone();
407                tokio::spawn(async move {
408                    if !f.accepts(&exchange) {
409                        trace!(target="allora::filter", filter.id=%id_val, "filter rejected exchange (dropped)");
410                        return;
411                    }
412                    if let Err(err) = outbound_clone.send(exchange).await {
413                        tracing::error!(target="allora::filter", filter.id=%id_val, error=%err, "Filter outbound channel send failed");
414                    }
415                });
416                Ok(())
417            });
418            debug!(
419                filter.id = id,
420                inbound = inbound_id,
421                subscribers = sub_count,
422                "filter wired"
423            );
424        } else {
425            debug!(
426                filter.id = id,
427                inbound_id = in_arc.id(),
428                "inbound channel not direct – skipping filter wiring"
429            );
430        }
431    }
432    debug!(
433        filters.wired = rt.filter_count(),
434        "filter runtime wiring complete"
435    );
436    Ok(())
437}
438
439fn resolve_default_config() -> PathBuf {
440    use std::env;
441
442    // 0. CLI override: --runtime <path> or --runtime=/path
443    //
444    // Example:
445    //   cargo run --manifest-path examples/basic/http/Cargo.toml -- \
446    //     --runtime examples/basic/http/allora.yml
447    //
448    // or
449    //
450    //   cargo run -- ... --runtime=examples/basic/http/allora.yml
451    let mut args = env::args().skip(1); // skip program name
452    let mut runtime_override: Option<String> = None;
453
454    while let Some(arg) = args.next() {
455        if arg == "--runtime" {
456            if let Some(val) = args.next() {
457                runtime_override = Some(val);
458            }
459            break;
460        } else if let Some(rest) = arg.strip_prefix("--runtime=") {
461            runtime_override = Some(rest.to_string());
462            break;
463        }
464    }
465
466    if let Some(raw) = runtime_override {
467        let p = PathBuf::from(raw);
468        // If it's a directory, assume allora.yml inside it.
469        if p.is_dir() {
470            return p.join("allora.yml");
471        } else {
472            return p;
473        }
474    }
475
476    // 1. Optional env override (useful for CI or scripts)
477    if let Ok(raw) = env::var("ALLORA_CONFIG") {
478        let p = PathBuf::from(raw);
479        if p.is_dir() {
480            return p.join("allora.yml");
481        } else {
482            return p;
483        }
484    }
485
486    // 2. Prefer ./allora.yml in the current working directory (dev-friendly)
487    let cwd_candidate = PathBuf::from("allora.yml");
488    if cwd_candidate.exists() {
489        return cwd_candidate;
490    }
491
492    // 3. Then try <directory_of_executable>/allora.yml (release-friendly)
493    if let Ok(exe) = env::current_exe() {
494        if let Some(dir) = exe.parent() {
495            let candidate = dir.join("allora.yml");
496            if candidate.exists() {
497                return candidate;
498            }
499        }
500    }
501
502    // 4. Fallback (will cause a clear error in `run()` if missing)
503    PathBuf::from("allora.yml")
504}
505
506#[cfg(test)]
507mod wire_filters_tests {
508    //! Unit-level coverage for `wire_filters`. Builds a runtime from an
509    //! inline YAML spec, sends exchanges through the inbound channel, and
510    //! asserts the rejected ones are silently dropped while the accepted
511    //! ones land on the outbound channel.
512
513    use super::wire_filters;
514    use crate::dsl::build_runtime_from_str;
515    use crate::dsl::runtime::AlloraRuntime;
516    use crate::DirectChannel;
517    use allora_core::{Exchange, Message};
518    use std::sync::{Arc, Mutex};
519    use std::time::Duration;
520
521    fn build_with_filter_yaml() -> allora_core::Result<AlloraRuntime> {
522        let yaml = r#"
523version: 1
524channels:
525  - kind: direct
526    id: inbound
527  - kind: direct
528    id: high_priority
529filters:
530  - id: filt.priority
531    from: inbound
532    to: high_priority
533    when: header("Priority") == "high"
534"#;
535        build_runtime_from_str(yaml, crate::dsl::DslFormat::Yaml)
536    }
537
538    /// Subscribe a closure on the named channel that records body texts
539    /// into the returned `Arc<Mutex<Vec<String>>>`. Mirrors the pattern a
540    /// real downstream Service would use.
541    fn collect_into(rt: &AlloraRuntime, channel_id: &str) -> Arc<Mutex<Vec<String>>> {
542        let recorded = Arc::new(Mutex::new(Vec::<String>::new()));
543        let arc = rt
544            .channels_slice()
545            .iter()
546            .find(|c| c.id() == channel_id)
547            .cloned()
548            .expect("channel registered");
549        let direct = arc
550            .as_any()
551            .downcast_ref::<DirectChannel>()
552            .expect("channel is direct");
553        let cl = recorded.clone();
554        direct.subscribe(move |ex| {
555            cl.lock()
556                .unwrap()
557                .push(ex.in_msg.body_text().unwrap_or("").to_string());
558            Ok(())
559        });
560        recorded
561    }
562
563    #[tokio::test]
564    async fn filter_forwards_accepted_and_drops_rejected() -> allora_core::Result<()> {
565        let rt = build_with_filter_yaml()?;
566        wire_filters(&rt)?;
567        let high_priority = collect_into(&rt, "high_priority");
568
569        let inbound = rt
570            .channels_slice()
571            .iter()
572            .find(|c| c.id() == "inbound")
573            .cloned()
574            .expect("inbound registered");
575
576        // 1. No header → predicate false → dropped.
577        inbound
578            .send(Exchange::new(Message::from_text("no-header")))
579            .await?;
580        // 2. Priority=low → predicate false → dropped.
581        let mut low = Exchange::new(Message::from_text("low"));
582        low.in_msg.set_header("Priority", "low");
583        inbound.send(low).await?;
584        // 3. Priority=high → predicate true → forwarded.
585        let mut high = Exchange::new(Message::from_text("high"));
586        high.in_msg.set_header("Priority", "high");
587        inbound.send(high).await?;
588
589        // wire_filters spawns the forward via tokio::spawn; yield so the
590        // task fires and the subscriber records its body.
591        tokio::time::sleep(Duration::from_millis(50)).await;
592
593        let got = high_priority.lock().unwrap().clone();
594        assert_eq!(
595            got,
596            vec!["high".to_string()],
597            "only Priority=high should reach high_priority; got {got:?}"
598        );
599        Ok(())
600    }
601
602    #[tokio::test]
603    async fn yaml_without_filters_is_a_clean_noop() -> allora_core::Result<()> {
604        let yaml = r#"
605version: 1
606channels:
607  - kind: direct
608    id: inbound
609"#;
610        let rt = build_runtime_from_str(yaml, crate::dsl::DslFormat::Yaml)?;
611        assert_eq!(rt.filter_count(), 0);
612        wire_filters(&rt)?; // no-op; should not error
613        Ok(())
614    }
615}