Skip to main content

allora_runtime/dsl/
runtime.rs

1//! AlloraRuntime: aggregate of built runtime components (extensible).
2//!
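//! Current contents:
//! * channels: shared `Arc<dyn Channel>` handles
//! * filters: `FilterActivation` entries (filter predicate plus channel routing metadata)
//! * services and service activator processors
//! * HTTP inbound adapters and HTTP outbound adapter activations
//!
//! Future extensions (not yet wired):
//! * endpoints
//! * routers
//! * correlation groups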
11//!
12//! Design goals:
13//! * Single return object from top-level build to avoid signature churn as components grow.
14//! * Provide accessor methods with owned + borrowed variants.
//! * Store channels as shared trait objects (`Arc<dyn Channel>`) so that more than one channel kind can be held side by side.
16//!
17//! Backward compatibility note removed: prefer `build()` which returns `AlloraRuntime`.
18//!
19//! # Overview
20//! `AlloraRuntime` is the single return object from the top-level `build()` DSL facade.
21//! It bundles all instantiated runtime components derived from a configuration spec.
//! It currently carries channels, filter activations, services, service activator
//! processors, and HTTP inbound/outbound adapters; further component kinds can be added
//! as new fields with accessor methods without changing the public `build()` signature.
25//!
26//! # Guarantees
27//! * The collection of channels preserves the order they were defined in the source spec.
28//! * Channel IDs are unique (enforced at build time); missing IDs receive deterministic
29//!   `channel:auto.N` identifiers within the same build invocation.
30//! * Lookup (`channel_by_id`) performs a linear scan; acceptable for small collections.
31//!   This can be optimized later by introducing an internal index without API changes.
32//!
33//! # Usage Example
34//! ```no_run
35//! use allora_core::Channel;
36//! use allora_runtime::build;
37//! // Requires a valid allora.yml at the given path.
38//! let rt = build("tests/fixtures/allora.yml").unwrap();
39//! assert!(rt.channel_by_id("inbound.orders").is_some());
40//! for ch in rt.channels() { println!("id={}", ch.id()); }
41//! ```
42//!
43//! # Future Extensions (Illustrative)
44//! * `endpoints()` -> &[Endpoint]
45//! * `filters()` / `routers()` -> pattern components
46//! * `adapters()` -> inbound / outbound integration points
47//! * `correlations()` -> tracking groups for aggregation patterns
48//! These will be added as additional fields with accessor methods while keeping
49//! `AlloraRuntime` construction centralized in the DSL facade.
50use crate::channel::Channel;
51use crate::dsl::component_builders::ServiceProcessor;
52use crate::service_activator_processor::ServiceActivatorProcessor;
53use crate::Filter;
54use crate::HttpInboundAdapter;
55use crate::HttpOutboundAdapter;
56use std::sync::Arc;
57
58#[derive(Debug)]
59/// Aggregated runtime container for all built components (channels today, more later).
60///
61/// Prefer borrowing via the accessor methods (`channels()`, `channel_by_id`) for read-only
62/// operations. Use `into_channels()` only when you need ownership transfer (e.g. embedding
63/// channels into another structure or performing manual lifecycle management).
64pub struct AlloraRuntime {
65    channels: Vec<Arc<dyn Channel>>,
66    filters: Vec<FilterActivation>,
67    services: Vec<ServiceProcessor>,
68    service_activator_processors: Vec<ServiceActivatorProcessor>,
69    http_inbound_adapters: Vec<HttpInboundAdapter>,
70    http_outbound_adapters: Vec<HttpOutboundAdapterActivation>,
71}
72
73/// Pairs a [`Filter`] predicate with the channel routing metadata from
74/// the [`FilterSpec`] it was built from.
75///
76/// A bare `Filter` only knows how to evaluate its predicate
77/// (`accepts(&exchange)`); it has no notion of "where to consume from" or
78/// "where to forward to." `FilterActivation` carries that wiring intent
79/// alongside the predicate so the runtime can subscribe each filter to
80/// its inbound channel and dispatch accepted exchanges to its outbound
81/// channel — the same pattern `ServiceActivatorProcessor` uses for
82/// services.
83///
84/// Constructed by [`build_filters_from_spec`](crate::dsl::component_builders::build_filters_from_spec)
85/// from a parsed [`FiltersSpec`] and stored on the runtime via
86/// [`with_filters`](AlloraRuntime::with_filters). The runtime calls
87/// [`wire_filters`](crate::wire_filters) during startup to subscribe
88/// each activation to its channels.
89///
90/// When `to` is `None` the filter is treated as **predicate-only** —
91/// kept in the runtime for callers that hold the `AlloraRuntime` and
92/// invoke `filter.accepts(...)` directly, but not auto-wired.
93pub struct FilterActivation {
94    filter: std::sync::Arc<Filter>,
95    from: String,
96    to: Option<String>,
97    id: String,
98}
99
100impl std::fmt::Debug for FilterActivation {
101    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
102        f.debug_struct("FilterActivation")
103            .field("id", &self.id)
104            .field("from", &self.from)
105            .field("to", &self.to)
106            .finish()
107    }
108}
109
110impl FilterActivation {
111    /// Build a new activation from a `Filter` (typically the output of
112    /// `Filter::from_apl_with_id`) and its routing metadata.
113    pub fn new(
114        filter: Filter,
115        from: impl Into<String>,
116        to: Option<String>,
117        id: impl Into<String>,
118    ) -> Self {
119        Self {
120            filter: std::sync::Arc::new(filter),
121            from: from.into(),
122            to,
123            id: id.into(),
124        }
125    }
126
127    /// The wrapped filter predicate (shared via `Arc` so the runtime can
128    /// hand a clone to its subscription closure).
129    pub fn filter(&self) -> &std::sync::Arc<Filter> {
130        &self.filter
131    }
132
133    /// Inbound channel ID the runtime subscribes the filter to.
134    pub fn from(&self) -> &str {
135        &self.from
136    }
137
138    /// Outbound channel ID the runtime dispatches accepted exchanges to.
139    /// `None` means the filter is predicate-only (not auto-wired by the
140    /// runtime; available via the runtime accessor for external use).
141    pub fn to(&self) -> Option<&str> {
142        self.to.as_deref()
143    }
144
145    /// Stable identifier (matches the `FilterSpec`'s `id`, or a
146    /// deterministic `filter:auto.N` if the spec omitted it).
147    pub fn id(&self) -> &str {
148        &self.id
149    }
150}
151
152/// Pairs an [`HttpOutboundAdapter`] with the channel routing metadata
153/// from the [`HttpOutboundAdapterSpec`] it was built from.
154///
155/// Mirrors [`FilterActivation`] for the http-outbound side. When the
156/// spec had a `from:` field, the runtime subscribes a closure on that
157/// inbound channel that:
158///
159/// 1. Calls `adapter.dispatch(&exchange)`.
160/// 2. If the spec also had `to:`, transforms the exchange (response
161///    body → `in_msg.payload`; status code + acknowledged → header
162///    metadata) and forwards it to the outbound channel.
163/// 3. If `to:` is `None`, the dispatch is fire-and-forget — result
164///    logged at `debug!`, message dropped.
165///
166/// When `from:` is `None` the activation is **static-only** — the
167/// adapter lives on the runtime for application code that calls
168/// `.dispatch()` directly (the legacy http-outbound pattern). The
169/// runtime does not auto-wire it.
170pub struct HttpOutboundAdapterActivation {
171    adapter: std::sync::Arc<HttpOutboundAdapter>,
172    from: Option<String>,
173    to: Option<String>,
174    id: String,
175}
176
177impl std::fmt::Debug for HttpOutboundAdapterActivation {
178    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
179        f.debug_struct("HttpOutboundAdapterActivation")
180            .field("id", &self.id)
181            .field("from", &self.from)
182            .field("to", &self.to)
183            .finish()
184    }
185}
186
187impl HttpOutboundAdapterActivation {
188    /// Build a new activation from a built `HttpOutboundAdapter` and its
189    /// routing metadata.
190    pub fn new(
191        adapter: HttpOutboundAdapter,
192        from: Option<String>,
193        to: Option<String>,
194        id: impl Into<String>,
195    ) -> Self {
196        Self {
197            adapter: std::sync::Arc::new(adapter),
198            from,
199            to,
200            id: id.into(),
201        }
202    }
203
204    /// The wrapped HTTP outbound adapter (shared via `Arc` so the
205    /// runtime can hand a clone to its subscription closure).
206    pub fn adapter(&self) -> &std::sync::Arc<HttpOutboundAdapter> {
207        &self.adapter
208    }
209
210    /// Inbound channel ID the runtime subscribes the adapter to.
211    /// `None` means the activation is static-only — accessible via
212    /// `AlloraRuntime::http_outbound_adapters()` but not auto-wired.
213    pub fn from(&self) -> Option<&str> {
214        self.from.as_deref()
215    }
216
217    /// Outbound channel ID for the post-dispatch exchange. Ignored
218    /// when `from` is `None`. When `from` is set and `to` is `None`,
219    /// dispatch is fire-and-forget.
220    pub fn to(&self) -> Option<&str> {
221        self.to.as_deref()
222    }
223
224    /// Stable identifier (matches the `HttpOutboundAdapterSpec`'s
225    /// `id`, or `http-outbound:<host>:<port>` from the builder default
226    /// when the spec omitted it).
227    pub fn id(&self) -> &str {
228        &self.id
229    }
230}
231
232impl AlloraRuntime {
233    /// Create a new runtime instance from a vector of channels.
234    ///
235    /// Typically, invoked internally by the DSL (`build_runtime_from_str`).
236    pub fn new(channels: Vec<Box<dyn Channel>>) -> Self {
237        let channels_arc = channels.into_iter().map(|c| Arc::from(c)).collect();
238        Self {
239            channels: channels_arc,
240            filters: Vec::new(),
241            services: Vec::new(),
242            service_activator_processors: Vec::new(),
243            http_inbound_adapters: Vec::new(),
244            http_outbound_adapters: Vec::new(),
245        }
246    }
247    /// Sets the filter activations for this runtime.
248    ///
249    /// Consumes the provided activations vector and assigns it to the
250    /// runtime. The runtime's `wire_filters` step then subscribes each
251    /// activation with a non-`None` `to:` to its inbound channel.
252    pub fn with_filters(mut self, filters: Vec<FilterActivation>) -> Self {
253        self.filters = filters;
254        self
255    }
256    /// Sets the services for this runtime.
257    ///
258    /// Consumes the provided services vector and assigns it to the runtime.
259    pub fn with_services(mut self, services: Vec<ServiceProcessor>) -> Self {
260        self.services = services;
261        self
262    }
263    pub fn with_service_processors(mut self, proc: Vec<ServiceActivatorProcessor>) -> Self {
264        self.service_activator_processors = proc;
265        self
266    }
267    /// Sets the HTTP inbound adapters for this runtime.
268    ///
269    /// Consumes the provided adapters vector and assigns it to the runtime.
270    pub fn with_http_inbound_adapters(mut self, adapters: Vec<HttpInboundAdapter>) -> Self {
271        self.http_inbound_adapters = adapters;
272        self
273    }
274    pub fn with_http_outbound_adapters(
275        mut self,
276        adapters: Vec<HttpOutboundAdapterActivation>,
277    ) -> Self {
278        self.http_outbound_adapters = adapters;
279        self
280    }
281    /// Borrow all channels as an iterator of &dyn Channel (zero allocation).
282    pub fn channels(&self) -> impl Iterator<Item = &dyn Channel> {
283        self.channels.iter().map(|c| c.as_ref())
284    }
285    /// Borrow underlying boxed channel slice (rarely needed).
286    pub fn channels_slice(&self) -> &[Arc<dyn Channel>] {
287        &self.channels
288    }
289    /// Borrow all filter activations (read-only slice).
290    pub fn filters(&self) -> &[FilterActivation] {
291        &self.filters
292    }
293    /// Borrow all services (read-only slice).
294    pub fn services(&self) -> &[ServiceProcessor] {
295        &self.services
296    }
297    /// Consume the runtime, yielding owned channels.
298    pub fn into_channels(self) -> Vec<Arc<dyn Channel>> {
299        self.channels
300    }
301    /// Consume the runtime, yielding owned filter activations.
302    pub fn into_filters(self) -> Vec<FilterActivation> {
303        self.filters
304    }
305    /// Consume the runtime, yielding owned services.
306    pub fn into_services(self) -> Vec<ServiceProcessor> {
307        self.services
308    }
309    /// Find a channel by its id; returns `None` if not present.
310    ///
311    /// Complexity: O(n). Optimizations (hash index) can be added later without
312    /// changing this method's signature or semantics.
313    pub fn channel_by_id(&self, id: &str) -> Option<&dyn Channel> {
314        self.channels
315            .iter()
316            .find(|c| c.id() == id)
317            .map(|c| c.as_ref())
318    }
319    /// Return a cloned Arc<dyn Channel> by id if it exists (helper for builders needing ownership).
320    pub fn channel_ref_by_id(&self, id: &str) -> Option<Arc<dyn Channel>> {
321        self.channels
322            .iter()
323            .find(|c| c.id() == id)
324            .map(|c| Arc::clone(c))
325    }
326    /// Generic typed channel lookup: returns &T if a channel with `id` exists and downcasts to T.
327    pub fn channel_typed<T: Channel + 'static>(&self, id: &str) -> Option<&T> {
328        self.channels
329            .iter()
330            .find(|c| c.id() == id)
331            .and_then(|c| c.as_any().downcast_ref::<T>())
332    }
333    /// Required typed channel lookup: panics with a clear message if missing or wrong type.
334    pub fn channel<T: Channel + 'static>(&self, id: &str) -> &T {
335        for c in &self.channels {
336            if c.id() == id {
337                if let Some(t) = c.as_any().downcast_ref::<T>() {
338                    return t;
339                } else {
340                    panic!(
341                        "channel '{}' exists with kind '{}' but does not match expected type '{}'",
342                        id,
343                        c.kind(),
344                        std::any::type_name::<T>()
345                    );
346                }
347            }
348        }
349        panic!(
350            "channel '{}' not found (expected type '{}')",
351            id,
352            std::any::type_name::<T>()
353        );
354    }
355    /// Predicate: does channel id exist and is of type T?
356    pub fn channel_is<T: Channel + 'static>(&self, id: &str) -> bool {
357        self.channel_typed::<T>(id).is_some()
358    }
359    /// Total number of channels in this runtime.
360    pub fn channel_count(&self) -> usize {
361        self.channels.len()
362    }
363    /// Total number of filters in this runtime.
364    pub fn filter_count(&self) -> usize {
365        self.filters.len()
366    }
367    /// Total number of services in this runtime.
368    pub fn service_count(&self) -> usize {
369        self.services.len()
370    }
371    /// Total number of service processors in this runtime.
372    pub fn service_processor_count(&self) -> usize {
373        self.service_activator_processors.len()
374    }
375    pub fn service_activator_processors(&self) -> &[ServiceActivatorProcessor] {
376        &self.service_activator_processors
377    }
378    pub fn service_activator_processor_by_id(
379        &self,
380        id: &str,
381    ) -> Option<&ServiceActivatorProcessor> {
382        self.service_activator_processors
383            .iter()
384            .find(|p| p.id() == id)
385    }
386    pub fn service_activator_processor_mut_by_id(
387        &mut self,
388        id: &str,
389    ) -> Option<&mut ServiceActivatorProcessor> {
390        self.service_activator_processors
391            .iter_mut()
392            .find(|p| p.id() == id)
393    }
394    pub fn http_inbound_adapters(&self) -> &[HttpInboundAdapter] {
395        &self.http_inbound_adapters
396    }
397    pub fn http_inbound_adapter_count(&self) -> usize {
398        self.http_inbound_adapters.len()
399    }
400    pub fn http_outbound_adapters(&self) -> &[HttpOutboundAdapterActivation] {
401        &self.http_outbound_adapters
402    }
403    pub fn http_outbound_adapter_count(&self) -> usize {
404        self.http_outbound_adapters.len()
405    }
406}