Skip to main content

allora_runtime/
runtime.rs

1//! High-level application facade.
2//!
3//! Minimal builder API for constructing an `AlloraRuntime` from a configuration file.
4//! Intended for embedding with only a couple of lines of code.
5//!
6//! # Quick Start
7//! ```no_run
8//! use allora_runtime::Runtime;
9//! let rt = Runtime::new().run()?; // attempts to load ./allora.yml
10//! # Ok::<_, allora_runtime::Error>(())
11//! ```
12//!
13//! # Overview
14//! The builder exposes three operations:
15//! * `new()` – create a fresh builder (auto-discovery enabled)
16//! * `with_config_file(path)` – provide an explicit configuration file path
17//! * `run()` – build and return an `AlloraRuntime`
18//!
19//! If no path is supplied, `run()` will try a sensible default (`allora.yml`).
20//! Errors surface via the returned `Result`.
21//!
22//! ## Custom Path
23//! ```no_run
24//! # use allora_runtime::Runtime;
25//! let rt = Runtime::new().with_config_file("examples/basic/helloworld/allora.yml").run()?;
26//! # Ok::<_, allora_runtime::Error>(())
27//! ```
28//!
29//! # Service Wiring & The `#[service]` Macro
30//! Services are described in configuration (YAML) under `service-activator:` blocks using a `ref-name` field.
31//! Example YAML fragment:
32//! ```yaml
33//! version: 1
34//! service-activators:
35//!   - ref-name: hello_world
36//!     from: inbound.orders
37//!     to: vetted.orders
38//! ```
39//! At runtime, wiring matches each `ref-name` value against inventory descriptors submitted via
40//! the `#[service]` attribute macro. The macro registers a constructor closure that builds a
41//! single shared instance (singleton) of your type via its zero-arg `new()` and invokes its async `process` method.
42//!
43//! ## Using `#[service]`
44//! Apply the attribute to an inherent `impl` block that contains a zero-arg `new()` method.
45//! If `name` is omitted the concrete type name (with spaces removed) is used.
46//!
47//! Supported forms:
48//! * `#[service]` – implicit name = type name
49//! * `#[service(name = "custom")]` – explicit reference name matching YAML `ref-name`
50//!
51//! Example:
52//! ```ignore
53//! use allora::{service, Service, Exchange, error::Result};
54//! #[derive(Debug)]
55//! struct Uppercase;
56//! impl Uppercase { pub fn new() -> Self { Self } }
57//! #[service(name="uppercase")]
58//! impl Uppercase {}
59//! #[async_trait::async_trait]
60//! impl Service for Uppercase {
61//!     async fn process(&self, exchange: &mut Exchange) -> Result<()> {
62//!         if let Some(body) = exchange.in_msg.body_text() { exchange.out_msg = Some(Message::from_text(body.to_uppercase())); }
63//!         Ok(())
64//!     }
65//! }
66//! ```
67//!
68//! # Descriptor Wiring Criteria
69//! A service is wired only if:
70//! 1. Its `ref-name` matches a registered descriptor `name`.
71//! 2. Both `from` and `to` channel IDs exist in the runtime.
72//! 3. The inbound channel is currently a direct channel (other kinds may be supported later).
73//!
74//! # Logging Fields
75//! Structured fields emitted during build:
76//! * `config.path` / `config.canonical` – discovery details
77//! * `service_activator.ref_name`, `inbound`, `outbound` – wiring decisions
78//! * `wired.count` – number of services wired
79//! * `channel.id`, `kind` – registered channels
80//! * `descriptor.impl` (legacy field name in traces) now maps to descriptor `name`.
81//!
82//! # Testing Notes
83//! * Use #[tokio::test] for async service wiring tests.
84//! * Macro UI tests (see `tests/macro/`) validate diagnostics & naming.
85//! * All services share a single instance; design internal mutability carefully.
86use crate::{
87    all_service_descriptors,
88    channel::Channel,
89    dsl::build,
90    dsl::runtime::AlloraRuntime,
91    error::{Error, Result},
92    service,
93};
94use allora_core::adapter::OutboundAdapter;
95use std::path::{Path, PathBuf};
96use std::sync::Arc;
97use tracing::{debug, info, trace};
98
99/// `Allora` builder holds configuration inputs prior to runtime construction.
100#[derive(Debug, Clone)]
101pub struct Runtime {
102    config_path: Option<PathBuf>,
103}
104
105impl Default for Runtime {
106    fn default() -> Self {
107        Self { config_path: None }
108    }
109}
110
111impl Runtime {
112    /// Create a new builder.
113    pub fn new() -> Self {
114        Self::default()
115    }
116
117    /// Set an explicit configuration file path (overrides auto-discovery).
118    ///
119    /// Accepts any type implementing `AsRef<Path>` (e.g. `&str`, `PathBuf`).
120    /// Relative paths are resolved according to the current working directory.
121    ///
122    /// Does not validate path existence immediately; validation happens inside `run()`.
123    pub fn with_config_file<P: AsRef<Path>>(mut self, path: P) -> Self {
124        self.config_path = Some(path.as_ref().to_path_buf());
125        self
126    }
127
128    /// Build the runtime from the explicit or default configuration file.
129    ///
130    /// # Configuration Discovery
131    ///
132    /// When no explicit path is provided via `with_config_file()`:
133    /// 1. First checks for `allora.yml` in the current working directory
134    /// 2. If not found, ascends parent directories from the executable location
135    ///    up to `MAX_PARENT_SEARCH_DEPTH` (10) levels
136    /// 3. Returns an error if no config is found
137    ///
138    /// # Note on Testing
139    ///
140    /// The parent directory ascent from the executable location is difficult to
141    /// test in isolation as it depends on the test executable's location.
142    /// Tests focus on explicit path configuration which covers the majority
143    /// of production use cases.
144    pub fn run(self) -> Result<AlloraRuntime> {
145        let explicit_opt = self.config_path.clone();
146        let path = match &explicit_opt {
147            Some(p) => p.clone(),
148            None => resolve_default_config(),
149        };
150
151        if let Some(parent) = path.parent() {
152            crate::logging::init_from_dir(parent);
153        } else {
154            crate::logging::init_from_dir(Path::new("."));
155        }
156
157        let exists = path.exists();
158        let canonical_opt = if exists {
159            path.canonicalize().ok()
160        } else {
161            None
162        };
163
164        // Log discovery/resolution with clearer semantics: canonical only if file exists.
165        if explicit_opt.is_none() {
166            info!(
167                config.path=%path.display(),
168                config.canonical=?canonical_opt.as_ref().map(|p| p.display().to_string()),
169                canonical=canonical_opt.is_some(),
170                auto=true,
171                "Configuration auto-discovered"
172            );
173        } else {
174            info!(
175                config.path=%path.display(),
176                config.canonical=?canonical_opt.as_ref().map(|p| p.display().to_string()),
177                canonical=canonical_opt.is_some(),
178                auto=false,
179                "Configuration resolved"
180            );
181        }
182
183        if !exists {
184            return Err(Error::runtime(format!(
185                "config file '{}' not found",
186                path.display()
187            )));
188        }
189
190        let rt = build(&path)?;
191        wire_services(&rt)?;
192        wire_filters(&rt)?;
193        wire_http_outbound_adapters(&rt)?;
194        debug!(
195            channels = rt.channel_count(),
196            filters = rt.filter_count(),
197            "Runtime constructed"
198        );
199        Ok(rt)
200    }
201}
202
203pub fn wire_services(rt: &AlloraRuntime) -> Result<()> {
204    let descriptors = all_service_descriptors();
205    debug!(
206        service_activator.processors = rt.service_processor_count(),
207        descriptors = descriptors.len(),
208        "service wiring start"
209    );
210    for d in &descriptors {
211        trace!(descriptor.impl = d.name, "service descriptor loaded");
212    }
213    let mut service_activator_wirings: Vec<(
214        Arc<dyn Channel>,
215        Arc<dyn Channel>,
216        Arc<dyn service::Service>,
217        String,
218    )> = Vec::new();
219    for sp in rt.service_activator_processors().iter() {
220        let name_key = sp.ref_name();
221        trace!(
222            service_activator.ref_name = name_key,
223            service.id = sp.id(),
224            from = sp.from(),
225            to = sp.to(),
226            "evaluating service processor"
227        );
228        for desc in descriptors.iter() {
229            if desc.name == name_key {
230                trace!(service_activator.ref_name = name_key, "descriptor matched");
231                if rt.channel_by_id(sp.from()).is_some() && rt.channel_by_id(sp.to()).is_some() {
232                    let inbound_arc_opt = rt.channels_slice().iter().find(|c| c.id() == sp.from());
233                    let outbound_arc_opt = rt.channels_slice().iter().find(|c| c.id() == sp.to());
234                    if let (Some(in_arc), Some(out_arc)) = (inbound_arc_opt, outbound_arc_opt) {
235                        debug!(
236                            service_activator.ref_name = name_key,
237                            inbound = sp.from(),
238                            outbound = sp.to(),
239                            "channels resolved – scheduling wiring"
240                        );
241                        let proc_arc = (desc.constructor)();
242                        service_activator_wirings.push((
243                            in_arc.clone(),
244                            out_arc.clone(),
245                            proc_arc,
246                            name_key.to_string(),
247                        ));
248                    } else {
249                        debug!(
250                            service_activator.ref_name = name_key,
251                            inbound_found = inbound_arc_opt.is_some(),
252                            outbound_found = outbound_arc_opt.is_some(),
253                            "channel resolution failed – wiring skipped"
254                        );
255                    }
256                } else {
257                    debug!(
258                        service_activator.ref_name = name_key,
259                        "channel ids not found – skipped"
260                    );
261                }
262            }
263        }
264    }
265    if service_activator_wirings.is_empty() {
266        info!("no services wired (none matched or channels missing)");
267    } else {
268        info!(
269            wired.count = service_activator_wirings.len(),
270            "service wiring collected"
271        );
272    }
273    for (in_arc, out_arc, proc_arc, name_key) in service_activator_wirings.into_iter() {
274        if let Some(inbound_direct) = in_arc.as_any().downcast_ref::<crate::DirectChannel>() {
275            let outbound_arc_dyn = out_arc.clone();
276            let inbound_id = inbound_direct.id().to_string();
277            let name_key_closure = name_key.clone();
278            let proc_shared = proc_arc.clone();
279            let sub_count = inbound_direct.subscribe(move |exchange| {
280                let outbound_clone = outbound_arc_dyn.clone();
281                let proc_task = proc_shared.clone();
282                let name_key_val = name_key_closure.clone();
283                tokio::spawn(async move {
284                    let mut ex_mut = exchange;
285                    if let Err(err) = proc_task.process(&mut ex_mut).await {
286                        tracing::error!(target="allora::service", service.impl=%name_key_val, error=%err, "Service async processing failed");
287                        return;
288                    }
289                    if let Err(err) = outbound_clone.send(ex_mut).await {
290                        tracing::error!(target="allora::service", service.impl=%name_key_val, error=%err, "Outbound channel send failed");
291                    }
292                });
293                Ok(())
294            });
295            debug!(
296                service_activator.ref_name = name_key,
297                inbound = inbound_id,
298                subscribers = sub_count,
299                "service wired"
300            );
301        } else {
302            debug!(
303                service_activator.ref_name = name_key,
304                inbound_id = in_arc.id(),
305                "inbound channel not direct – skipping wiring"
306            );
307        }
308    }
309    for ch in rt.channels() {
310        debug!(channel.id = ch.id(), kind = ch.kind(), "channel registered");
311    }
312    debug!(
313        services.wired = rt.service_processor_count(),
314        "runtime wiring complete"
315    );
316    Ok(())
317}
318
319/// Wire each [`FilterActivation`] in the runtime to its inbound channel.
320///
321/// For every activation with a non-`None` `to:`, this:
322///
323/// 1. Resolves `from:` and `to:` to channels registered on the runtime.
324/// 2. Subscribes a closure on the inbound `DirectChannel`. On each
325///    exchange the closure:
326///    - evaluates `filter.accepts(&exchange)` (synchronous; APL is
327///      cheap),
328///    - if accepted → forwards the exchange to the outbound channel,
329///    - if rejected → silently drops (with a `trace!`-level log). Filter
330///      rejection is a normal event in EIP semantics, not an error.
331/// 3. Logs `outbound send failed` at `error!` only for genuine send
332///    errors on the outbound side.
333///
334/// Filters with `to:` = `None` are predicate-only (kept on the runtime
335/// for callers that invoke `filter.accepts(...)` directly) and are not
336/// auto-wired.
337///
338/// Mirrors the structure of [`wire_services`] so the two paths are
339/// consistent in field naming, ordering, and failure modes.
340pub fn wire_filters(rt: &AlloraRuntime) -> Result<()> {
341    debug!(
342        filter.activations = rt.filter_count(),
343        "filter wiring start"
344    );
345    let mut filter_wirings: Vec<(
346        Arc<dyn Channel>,
347        Arc<dyn Channel>,
348        Arc<crate::Filter>,
349        String,
350    )> = Vec::new();
351    for fa in rt.filters().iter() {
352        let Some(to) = fa.to() else {
353            debug!(
354                filter.id = fa.id(),
355                from = fa.from(),
356                "filter has no `to:` — predicate-only, not auto-wired"
357            );
358            continue;
359        };
360        trace!(
361            filter.id = fa.id(),
362            from = fa.from(),
363            to = to,
364            "evaluating filter activation"
365        );
366        let inbound_arc_opt = rt.channels_slice().iter().find(|c| c.id() == fa.from());
367        let outbound_arc_opt = rt.channels_slice().iter().find(|c| c.id() == to);
368        if let (Some(in_arc), Some(out_arc)) = (inbound_arc_opt, outbound_arc_opt) {
369            debug!(
370                filter.id = fa.id(),
371                inbound = fa.from(),
372                outbound = to,
373                "channels resolved – scheduling filter wiring"
374            );
375            filter_wirings.push((
376                in_arc.clone(),
377                out_arc.clone(),
378                fa.filter().clone(),
379                fa.id().to_string(),
380            ));
381        } else {
382            debug!(
383                filter.id = fa.id(),
384                from = fa.from(),
385                to = to,
386                inbound_found = inbound_arc_opt.is_some(),
387                outbound_found = outbound_arc_opt.is_some(),
388                "filter channel resolution failed – wiring skipped"
389            );
390        }
391    }
392    if filter_wirings.is_empty() {
393        info!("no filters wired (none had `to:` channels resolvable on the runtime)");
394    } else {
395        info!(
396            wired.count = filter_wirings.len(),
397            "filter wiring collected"
398        );
399    }
400    for (in_arc, out_arc, filter_arc, id) in filter_wirings.into_iter() {
401        if let Some(inbound_direct) = in_arc.as_any().downcast_ref::<crate::DirectChannel>() {
402            let outbound_arc_dyn = out_arc.clone();
403            let inbound_id = inbound_direct.id().to_string();
404            let id_closure = id.clone();
405            let sub_count = inbound_direct.subscribe(move |exchange| {
406                let outbound_clone = outbound_arc_dyn.clone();
407                let f = filter_arc.clone();
408                let id_val = id_closure.clone();
409                tokio::spawn(async move {
410                    if !f.accepts(&exchange) {
411                        trace!(target="allora::filter", filter.id=%id_val, "filter rejected exchange (dropped)");
412                        return;
413                    }
414                    if let Err(err) = outbound_clone.send(exchange).await {
415                        tracing::error!(target="allora::filter", filter.id=%id_val, error=%err, "Filter outbound channel send failed");
416                    }
417                });
418                Ok(())
419            });
420            debug!(
421                filter.id = id,
422                inbound = inbound_id,
423                subscribers = sub_count,
424                "filter wired"
425            );
426        } else {
427            debug!(
428                filter.id = id,
429                inbound_id = in_arc.id(),
430                "inbound channel not direct – skipping filter wiring"
431            );
432        }
433    }
434    debug!(
435        filters.wired = rt.filter_count(),
436        "filter runtime wiring complete"
437    );
438    Ok(())
439}
440
441/// Subscribe each [`HttpOutboundAdapterActivation`] with a `from:`
442/// channel name to that inbound channel; dispatch each arriving exchange
443/// through the adapter's `.dispatch(&exchange)` method.
444///
445/// Mirrors [`wire_filters`] for the http-outbound side. Activations with
446/// `from = None` are static-only: the adapter sits on the runtime for
447/// application code to invoke directly (see the http-outbound example),
448/// but the runtime does not auto-wire it.
449///
450/// ## Response shaping
451///
452/// On a successful dispatch, the post-dispatch exchange is forwarded to
453/// `to:` (when set) with:
454///
455/// - `in_msg.payload` ← the HTTP response body (as `Payload::Text`).
456/// - header `dispatch-result.status-code` ← the numeric HTTP status.
457/// - header `dispatch-result.acknowledged` ← `"true"` if the status was
458///   2xx, else `"false"`.
459///
460/// When `to:` is `None`, the dispatch is fire-and-forget: the result is
461/// logged at `debug!` and the message is dropped. (This matches the
462/// "I just need a webhook delivered" pattern that doesn't care about
463/// the response.)
464///
465/// On a failed dispatch (network error, TLS handshake failure, etc.),
466/// nothing is forwarded; the error is logged at `error!`.
467pub fn wire_http_outbound_adapters(rt: &AlloraRuntime) -> Result<()> {
468    debug!(
469        http_outbound.activations = rt.http_outbound_adapter_count(),
470        "http outbound wiring start"
471    );
472    let mut wirings: Vec<(
473        Arc<dyn Channel>,
474        Option<Arc<dyn Channel>>,
475        Arc<allora_http::HttpOutboundAdapter>,
476        String,
477    )> = Vec::new();
478    for activation in rt.http_outbound_adapters() {
479        let Some(from) = activation.from() else {
480            trace!(
481                http_outbound.id = activation.id(),
482                "adapter has no `from:` — static-only, not auto-wired"
483            );
484            continue;
485        };
486        trace!(
487            http_outbound.id = activation.id(),
488            from = from,
489            to = activation.to(),
490            "evaluating http outbound activation"
491        );
492        let inbound_arc_opt = rt.channels_slice().iter().find(|c| c.id() == from);
493        if inbound_arc_opt.is_none() {
494            debug!(
495                http_outbound.id = activation.id(),
496                from = from,
497                "inbound channel not found – wiring skipped"
498            );
499            continue;
500        }
501        // Resolve `to:` if declared. A missing channel is a config error
502        // — not silent fire-and-forget — so we skip wiring loudly rather
503        // than swallow a typo. The fire-and-forget mode is reserved for
504        // activations that explicitly omit `to:` from the spec.
505        let outbound_arc_opt = match activation.to() {
506            None => None,
507            Some(to_name) => match rt.channels_slice().iter().find(|c| c.id() == to_name) {
508                Some(arc) => Some(arc.clone()),
509                None => {
510                    tracing::warn!(
511                        target = "allora::http_outbound",
512                        http_outbound.id = activation.id(),
513                        from = from,
514                        to = to_name,
515                        "outbound channel declared but not registered – wiring skipped \
516                         (config mismatch; either declare the channel or remove `to:`)"
517                    );
518                    continue;
519                }
520            },
521        };
522        debug!(
523            http_outbound.id = activation.id(),
524            inbound = from,
525            outbound = activation.to(),
526            "channels resolved – scheduling http outbound wiring"
527        );
528        wirings.push((
529            inbound_arc_opt.unwrap().clone(),
530            outbound_arc_opt,
531            activation.adapter().clone(),
532            activation.id().to_string(),
533        ));
534    }
535    if wirings.is_empty() {
536        info!("no http outbound adapters wired");
537    } else {
538        info!(
539            wired.count = wirings.len(),
540            "http outbound wiring collected"
541        );
542    }
543    for (in_arc, out_arc_opt, adapter_arc, id) in wirings.into_iter() {
544        if let Some(inbound_direct) = in_arc.as_any().downcast_ref::<crate::DirectChannel>() {
545            let inbound_id = inbound_direct.id().to_string();
546            let id_closure = id.clone();
547            let sub_count = inbound_direct.subscribe(move |exchange| {
548                let outbound_clone = out_arc_opt.clone();
549                let adapter_clone = adapter_arc.clone();
550                let id_val = id_closure.clone();
551                tokio::spawn(async move {
552                    match adapter_clone.dispatch(&exchange).await {
553                        Ok(result) => {
554                            let Some(outbound) = outbound_clone else {
555                                // Fire-and-forget: no `to:` channel; log
556                                // the dispatch result and stop.
557                                tracing::debug!(
558                                    target = "allora::http_outbound",
559                                    http_outbound.id = %id_val,
560                                    status_code = ?result.status_code,
561                                    acknowledged = result.acknowledged,
562                                    "dispatched (fire-and-forget)"
563                                );
564                                return;
565                            };
566                            // Shape the post-dispatch exchange: response
567                            // body becomes the new payload (downstream
568                            // services read body_text() as usual); status
569                            // metadata lands in dispatch-result.* headers.
570                            let mut ex_mut = exchange;
571                            if let Some(body) = result.body {
572                                ex_mut.in_msg.payload = allora_core::Payload::Text(body);
573                            }
574                            if let Some(code) = result.status_code {
575                                ex_mut
576                                    .in_msg
577                                    .set_header("dispatch-result.status-code", &code.to_string());
578                            }
579                            ex_mut.in_msg.set_header(
580                                "dispatch-result.acknowledged",
581                                if result.acknowledged { "true" } else { "false" },
582                            );
583                            if let Err(err) = outbound.send(ex_mut).await {
584                                tracing::error!(
585                                    target = "allora::http_outbound",
586                                    http_outbound.id = %id_val,
587                                    error = %err,
588                                    "outbound channel send failed"
589                                );
590                            }
591                        }
592                        Err(err) => {
593                            tracing::error!(
594                                target = "allora::http_outbound",
595                                http_outbound.id = %id_val,
596                                error = %err,
597                                "http outbound dispatch failed"
598                            );
599                        }
600                    }
601                });
602                Ok(())
603            });
604            debug!(
605                http_outbound.id = id,
606                inbound = inbound_id,
607                subscribers = sub_count,
608                "http outbound wired"
609            );
610        } else {
611            debug!(
612                http_outbound.id = id,
613                inbound_id = in_arc.id(),
614                "inbound channel not direct – skipping http outbound wiring"
615            );
616        }
617    }
618    debug!(
619        http_outbound.activations = rt.http_outbound_adapter_count(),
620        "http outbound runtime wiring complete"
621    );
622    Ok(())
623}
624
625fn resolve_default_config() -> PathBuf {
626    use std::env;
627
628    // 0. CLI override: --runtime <path> or --runtime=/path
629    //
630    // Example:
631    //   cargo run --manifest-path examples/basic/http/Cargo.toml -- \
632    //     --runtime examples/basic/http/allora.yml
633    //
634    // or
635    //
636    //   cargo run -- ... --runtime=examples/basic/http/allora.yml
637    let mut args = env::args().skip(1); // skip program name
638    let mut runtime_override: Option<String> = None;
639
640    while let Some(arg) = args.next() {
641        if arg == "--runtime" {
642            if let Some(val) = args.next() {
643                runtime_override = Some(val);
644            }
645            break;
646        } else if let Some(rest) = arg.strip_prefix("--runtime=") {
647            runtime_override = Some(rest.to_string());
648            break;
649        }
650    }
651
652    if let Some(raw) = runtime_override {
653        let p = PathBuf::from(raw);
654        // If it's a directory, assume allora.yml inside it.
655        if p.is_dir() {
656            return p.join("allora.yml");
657        } else {
658            return p;
659        }
660    }
661
662    // 1. Optional env override (useful for CI or scripts)
663    if let Ok(raw) = env::var("ALLORA_CONFIG") {
664        let p = PathBuf::from(raw);
665        if p.is_dir() {
666            return p.join("allora.yml");
667        } else {
668            return p;
669        }
670    }
671
672    // 2. Prefer ./allora.yml in the current working directory (dev-friendly)
673    let cwd_candidate = PathBuf::from("allora.yml");
674    if cwd_candidate.exists() {
675        return cwd_candidate;
676    }
677
678    // 3. Then try <directory_of_executable>/allora.yml (release-friendly)
679    if let Ok(exe) = env::current_exe() {
680        if let Some(dir) = exe.parent() {
681            let candidate = dir.join("allora.yml");
682            if candidate.exists() {
683                return candidate;
684            }
685        }
686    }
687
688    // 4. Fallback (will cause a clear error in `run()` if missing)
689    PathBuf::from("allora.yml")
690}
691
692#[cfg(test)]
693mod wire_filters_tests {
694    //! Unit-level coverage for `wire_filters`. Builds a runtime from an
695    //! inline YAML spec, sends exchanges through the inbound channel, and
696    //! asserts the rejected ones are silently dropped while the accepted
697    //! ones land on the outbound channel.
698
699    use super::wire_filters;
700    use crate::dsl::build_runtime_from_str;
701    use crate::dsl::runtime::AlloraRuntime;
702    use crate::DirectChannel;
703    use allora_core::{Exchange, Message};
704    use std::sync::{Arc, Mutex};
705    use std::time::Duration;
706
707    fn build_with_filter_yaml() -> allora_core::Result<AlloraRuntime> {
708        let yaml = r#"
709version: 1
710channels:
711  - kind: direct
712    id: inbound
713  - kind: direct
714    id: high_priority
715filters:
716  - id: filt.priority
717    from: inbound
718    to: high_priority
719    when: header("Priority") == "high"
720"#;
721        build_runtime_from_str(yaml, crate::dsl::DslFormat::Yaml)
722    }
723
724    /// Subscribe a closure on the named channel that records body texts
725    /// into the returned `Arc<Mutex<Vec<String>>>`. Mirrors the pattern a
726    /// real downstream Service would use.
727    fn collect_into(rt: &AlloraRuntime, channel_id: &str) -> Arc<Mutex<Vec<String>>> {
728        let recorded = Arc::new(Mutex::new(Vec::<String>::new()));
729        let arc = rt
730            .channels_slice()
731            .iter()
732            .find(|c| c.id() == channel_id)
733            .cloned()
734            .expect("channel registered");
735        let direct = arc
736            .as_any()
737            .downcast_ref::<DirectChannel>()
738            .expect("channel is direct");
739        let cl = recorded.clone();
740        direct.subscribe(move |ex| {
741            cl.lock()
742                .unwrap()
743                .push(ex.in_msg.body_text().unwrap_or("").to_string());
744            Ok(())
745        });
746        recorded
747    }
748
749    #[tokio::test]
750    async fn filter_forwards_accepted_and_drops_rejected() -> allora_core::Result<()> {
751        let rt = build_with_filter_yaml()?;
752        wire_filters(&rt)?;
753        let high_priority = collect_into(&rt, "high_priority");
754
755        let inbound = rt
756            .channels_slice()
757            .iter()
758            .find(|c| c.id() == "inbound")
759            .cloned()
760            .expect("inbound registered");
761
762        // 1. No header → predicate false → dropped.
763        inbound
764            .send(Exchange::new(Message::from_text("no-header")))
765            .await?;
766        // 2. Priority=low → predicate false → dropped.
767        let mut low = Exchange::new(Message::from_text("low"));
768        low.in_msg.set_header("Priority", "low");
769        inbound.send(low).await?;
770        // 3. Priority=high → predicate true → forwarded.
771        let mut high = Exchange::new(Message::from_text("high"));
772        high.in_msg.set_header("Priority", "high");
773        inbound.send(high).await?;
774
775        // wire_filters spawns the forward via tokio::spawn; yield so the
776        // task fires and the subscriber records its body.
777        tokio::time::sleep(Duration::from_millis(50)).await;
778
779        let got = high_priority.lock().unwrap().clone();
780        assert_eq!(
781            got,
782            vec!["high".to_string()],
783            "only Priority=high should reach high_priority; got {got:?}"
784        );
785        Ok(())
786    }
787
788    #[tokio::test]
789    async fn yaml_without_filters_is_a_clean_noop() -> allora_core::Result<()> {
790        let yaml = r#"
791version: 1
792channels:
793  - kind: direct
794    id: inbound
795"#;
796        let rt = build_runtime_from_str(yaml, crate::dsl::DslFormat::Yaml)?;
797        assert_eq!(rt.filter_count(), 0);
798        wire_filters(&rt)?; // no-op; should not error
799        Ok(())
800    }
801}
802
803#[cfg(test)]
804mod wire_http_outbound_tests {
805    //! Channel-driven dispatch tests for `wire_http_outbound_adapters`.
806    //!
807    //! Each test spawns a tiny hyper server, builds a runtime whose
808    //! `http-outbound-adapter` block has a `from:` / (optionally) `to:`
809    //! pair, sends a message on the inbound channel, and asserts:
810    //! - the test server received the bytes from the message;
811    //! - the post-dispatch exchange landed on `to:` (or didn't, for
812    //!   fire-and-forget) with the documented header + payload shape.
813
814    use super::wire_http_outbound_adapters;
815    use crate::dsl::build_runtime_from_str;
816    use crate::dsl::runtime::AlloraRuntime;
817    use crate::DirectChannel;
818    use allora_core::{Exchange, Message};
819    use hyper::service::{make_service_fn, service_fn};
820    use hyper::{Body, Request, Response, Server};
821    use std::sync::{Arc, Mutex};
822    use std::time::Duration;
823
824    /// Bind a hyper server on `127.0.0.1:0` (kernel-assigned ephemeral
825    /// port — avoids the cross-test port collisions a fixed port would
826    /// invite in CI). The server records every request body it receives
827    /// and replies with the given response body + status.
828    ///
829    /// Returns the bound port (callers interpolate it into their YAML
830    /// fixture) and the shared body sink.
831    async fn spawn_capture_server(
832        reply_body: &'static str,
833        reply_status: u16,
834    ) -> (u16, Arc<Mutex<Vec<Vec<u8>>>>) {
835        let std_listener = std::net::TcpListener::bind("127.0.0.1:0").expect("bind ephemeral");
836        let port = std_listener.local_addr().expect("local_addr").port();
837        std_listener.set_nonblocking(true).expect("nonblocking");
838
839        let bodies = Arc::new(Mutex::new(Vec::<Vec<u8>>::new()));
840        let bodies_cl = bodies.clone();
841        let make = make_service_fn(move |_| {
842            let bodies = bodies_cl.clone();
843            async move {
844                Ok::<_, hyper::Error>(service_fn(move |req: Request<Body>| {
845                    let bodies = bodies.clone();
846                    async move {
847                        let bytes = hyper::body::to_bytes(req.into_body()).await.unwrap();
848                        bodies.lock().unwrap().push(bytes.to_vec());
849                        Ok::<_, hyper::Error>(
850                            Response::builder()
851                                .status(reply_status)
852                                .body(Body::from(reply_body))
853                                .unwrap(),
854                        )
855                    }
856                }))
857            }
858        });
859        // `Server::from_tcp` hands hyper the already-bound listener —
860        // no race between drop and re-bind on a guessed port.
861        let server = Server::from_tcp(std_listener)
862            .expect("hyper from_tcp")
863            .serve(make);
864        tokio::spawn(server);
865        tokio::time::sleep(Duration::from_millis(50)).await; // let accept loop spin up
866        (port, bodies)
867    }
868
869    fn collect_into(
870        rt: &AlloraRuntime,
871        channel_id: &str,
872    ) -> Arc<Mutex<Vec<(String, Option<String>, Option<String>)>>> {
873        let recorded = Arc::new(Mutex::new(Vec::new()));
874        let arc = rt
875            .channels_slice()
876            .iter()
877            .find(|c| c.id() == channel_id)
878            .cloned()
879            .expect("channel registered");
880        let direct = arc
881            .as_any()
882            .downcast_ref::<DirectChannel>()
883            .expect("channel is direct");
884        let cl = recorded.clone();
885        direct.subscribe(move |ex| {
886            let body = ex.in_msg.body_text().unwrap_or("").to_string();
887            let status = ex
888                .in_msg
889                .header("dispatch-result.status-code")
890                .map(|s| s.to_string());
891            let ack = ex
892                .in_msg
893                .header("dispatch-result.acknowledged")
894                .map(|s| s.to_string());
895            cl.lock().unwrap().push((body, status, ack));
896            Ok(())
897        });
898        recorded
899    }
900
901    #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
902    async fn dispatches_message_and_forwards_response_to_outbound_channel(
903    ) -> allora_core::Result<()> {
904        let (port, server_bodies) = spawn_capture_server("ok-from-server", 202).await;
905
906        let yaml = format!(
907            r#"
908version: 1
909channels:
910  - kind: direct
911    id: outbound_requests
912  - kind: direct
913    id: dispatch_results
914http-outbound-adapters:
915  - id: test-out
916    host: 127.0.0.1
917    port: {port}
918    base-path: /
919    method: POST
920    from: outbound_requests
921    to: dispatch_results
922"#
923        );
924        let rt = build_runtime_from_str(&yaml, crate::dsl::DslFormat::Yaml)?;
925        wire_http_outbound_adapters(&rt)?;
926
927        let results = collect_into(&rt, "dispatch_results");
928        let inbound = rt
929            .channels_slice()
930            .iter()
931            .find(|c| c.id() == "outbound_requests")
932            .cloned()
933            .expect("inbound registered");
934
935        inbound
936            .send(Exchange::new(Message::from_text("hello-server")))
937            .await?;
938        tokio::time::sleep(Duration::from_millis(150)).await;
939
940        let got_bodies = server_bodies.lock().unwrap().clone();
941        assert_eq!(
942            got_bodies,
943            vec![b"hello-server".to_vec()],
944            "server should have recorded the dispatched body once",
945        );
946
947        let got_results = results.lock().unwrap().clone();
948        assert_eq!(got_results.len(), 1, "one post-dispatch exchange expected");
949        let (body, status, ack) = &got_results[0];
950        assert_eq!(body, "ok-from-server");
951        assert_eq!(status.as_deref(), Some("202"));
952        assert_eq!(ack.as_deref(), Some("true"));
953        Ok(())
954    }
955
956    #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
957    async fn fire_and_forget_no_to_channel_drops_response() -> allora_core::Result<()> {
958        let (port, server_bodies) = spawn_capture_server("ack", 202).await;
959
960        let yaml = format!(
961            r#"
962version: 1
963channels:
964  - kind: direct
965    id: outbound_requests
966http-outbound-adapters:
967  - id: fire-forget
968    host: 127.0.0.1
969    port: {port}
970    base-path: /
971    method: POST
972    from: outbound_requests
973"#
974        );
975        let rt = build_runtime_from_str(&yaml, crate::dsl::DslFormat::Yaml)?;
976        wire_http_outbound_adapters(&rt)?;
977
978        let inbound = rt
979            .channels_slice()
980            .iter()
981            .find(|c| c.id() == "outbound_requests")
982            .cloned()
983            .expect("inbound registered");
984        inbound
985            .send(Exchange::new(Message::from_text("notify")))
986            .await?;
987        tokio::time::sleep(Duration::from_millis(150)).await;
988
989        let got_bodies = server_bodies.lock().unwrap().clone();
990        assert_eq!(got_bodies, vec![b"notify".to_vec()]);
991        // No outbound channel declared → no follow-up assertion to make
992        // beyond "the server saw the request and we didn't crash."
993        Ok(())
994    }
995
996    #[tokio::test]
997    async fn yaml_without_outbound_adapters_is_a_clean_noop() -> allora_core::Result<()> {
998        let yaml = r#"
999version: 1
1000channels:
1001  - kind: direct
1002    id: inbound
1003"#;
1004        let rt = build_runtime_from_str(yaml, crate::dsl::DslFormat::Yaml)?;
1005        assert_eq!(rt.http_outbound_adapter_count(), 0);
1006        wire_http_outbound_adapters(&rt)?; // no-op
1007        Ok(())
1008    }
1009
1010    /// A `to:` channel name that isn't registered on the runtime is a
1011    /// config error — the wiring **skips** loudly (`warn!`) rather than
1012    /// silently degrading to fire-and-forget. (Caught in PR #26 review.)
1013    #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
1014    async fn missing_to_channel_skips_wiring_instead_of_silent_fire_and_forget(
1015    ) -> allora_core::Result<()> {
1016        let (port, server_bodies) = spawn_capture_server("ack", 202).await;
1017
1018        // `to: nonexistent` — note: the channel is NOT in `channels:`.
1019        let yaml = format!(
1020            r#"
1021version: 1
1022channels:
1023  - kind: direct
1024    id: outbound_requests
1025http-outbound-adapters:
1026  - id: misconfigured
1027    host: 127.0.0.1
1028    port: {port}
1029    base-path: /
1030    method: POST
1031    from: outbound_requests
1032    to: nonexistent
1033"#
1034        );
1035        let rt = build_runtime_from_str(&yaml, crate::dsl::DslFormat::Yaml)?;
1036        wire_http_outbound_adapters(&rt)?;
1037
1038        let inbound = rt
1039            .channels_slice()
1040            .iter()
1041            .find(|c| c.id() == "outbound_requests")
1042            .cloned()
1043            .expect("inbound registered");
1044        // If wiring incorrectly degraded to fire-and-forget, the server
1045        // would see the request. We want to assert it does NOT.
1046        inbound
1047            .send(Exchange::new(Message::from_text("should-not-be-sent")))
1048            .await?;
1049        tokio::time::sleep(Duration::from_millis(150)).await;
1050
1051        let got_bodies = server_bodies.lock().unwrap().clone();
1052        assert!(
1053            got_bodies.is_empty(),
1054            "wiring skipped due to missing `to:` should mean the adapter is not subscribed; \
1055             got bodies={got_bodies:?}"
1056        );
1057        Ok(())
1058    }
1059
1060    #[tokio::test]
1061    async fn adapter_without_from_is_static_only_not_wired() -> allora_core::Result<()> {
1062        let yaml = r#"
1063version: 1
1064channels:
1065  - kind: direct
1066    id: anything
1067http-outbound-adapters:
1068  - id: static-out
1069    host: 127.0.0.1
1070    port: 9
1071    base-path: /
1072"#;
1073        let rt = build_runtime_from_str(yaml, crate::dsl::DslFormat::Yaml)?;
1074        assert_eq!(rt.http_outbound_adapter_count(), 1);
1075        let activation = &rt.http_outbound_adapters()[0];
1076        assert_eq!(activation.id(), "static-out");
1077        assert_eq!(activation.from(), None);
1078        assert_eq!(activation.to(), None);
1079        wire_http_outbound_adapters(&rt)?; // no-op (no from:)
1080        Ok(())
1081    }
1082}