allora_runtime/dsl/runtime.rs
1//! AlloraRuntime: aggregate of built runtime components (extensible).
2//!
3//! Current contents:
4//! * channels: vector of in-memory channels (single kind)
5//!
6//! Future extensions (not yet wired):
7//! * endpoints
8//! * filters / routers
9//! * adapters
10//! * correlation groups
11//!
12//! Design goals:
13//! * Single return object from top-level build to avoid signature churn as components grow.
14//! * Provide accessor methods with owned + borrowed variants.
15//! * Keep internal storage concrete now; migrate to trait objects (`ChannelRef`) when multiple channel kinds arrive.
16//!
17//! Backward compatibility note removed: prefer `build()` which returns `AlloraRuntime`.
18//!
19//! # Overview
20//! `AlloraRuntime` is the single return object from the top-level `build()` DSL facade.
21//! It bundles all instantiated runtime components derived from a configuration spec.
22//! Currently it only contains channels (in-memory kind); future releases will extend it
23//! with endpoints, filters/routers, adapters, and correlation utilities without changing
24//! the public `build()` signature.
25//!
26//! # Guarantees
27//! * The collection of channels preserves the order they were defined in the source spec.
28//! * Channel IDs are unique (enforced at build time); missing IDs receive deterministic
29//! `channel:auto.N` identifiers within the same build invocation.
30//! * Lookup (`channel_by_id`) performs a linear scan; acceptable for small collections.
31//! This can be optimized later by introducing an internal index without API changes.
32//!
33//! # Usage Example
34//! ```no_run
35//! use allora_core::Channel;
36//! use allora_runtime::build;
37//! // Requires a valid allora.yml at the given path.
38//! let rt = build("tests/fixtures/allora.yml").unwrap();
39//! assert!(rt.channel_by_id("inbound.orders").is_some());
40//! for ch in rt.channels() { println!("id={}", ch.id()); }
41//! ```
42//!
43//! # Future Extensions (Illustrative)
44//! * `endpoints()` -> &[Endpoint]
45//! * `filters()` / `routers()` -> pattern components
46//! * `adapters()` -> inbound / outbound integration points
47//! * `correlations()` -> tracking groups for aggregation patterns
48//! These will be added as additional fields with accessor methods while keeping
49//! `AlloraRuntime` construction centralized in the DSL facade.
50use crate::channel::Channel;
51use crate::dsl::component_builders::ServiceProcessor;
52use crate::service_activator_processor::ServiceActivatorProcessor;
53use crate::Filter;
54use crate::HttpInboundAdapter;
55use crate::HttpOutboundAdapter;
56use std::sync::Arc;
57
58#[derive(Debug)]
59/// Aggregated runtime container for all built components (channels today, more later).
60///
61/// Prefer borrowing via the accessor methods (`channels()`, `channel_by_id`) for read-only
62/// operations. Use `into_channels()` only when you need ownership transfer (e.g. embedding
63/// channels into another structure or performing manual lifecycle management).
64pub struct AlloraRuntime {
65 channels: Vec<Arc<dyn Channel>>,
66 filters: Vec<FilterActivation>,
67 services: Vec<ServiceProcessor>,
68 service_activator_processors: Vec<ServiceActivatorProcessor>,
69 http_inbound_adapters: Vec<HttpInboundAdapter>,
70 http_outbound_adapters: Vec<HttpOutboundAdapter>,
71}
72
73/// Pairs a [`Filter`] predicate with the channel routing metadata from
74/// the [`FilterSpec`] it was built from.
75///
76/// A bare `Filter` only knows how to evaluate its predicate
77/// (`accepts(&exchange)`); it has no notion of "where to consume from" or
78/// "where to forward to." `FilterActivation` carries that wiring intent
79/// alongside the predicate so the runtime can subscribe each filter to
80/// its inbound channel and dispatch accepted exchanges to its outbound
81/// channel — the same pattern `ServiceActivatorProcessor` uses for
82/// services.
83///
84/// Constructed by [`build_filters_from_spec`](crate::dsl::component_builders::build_filters_from_spec)
85/// from a parsed [`FiltersSpec`] and stored on the runtime via
86/// [`with_filters`](AlloraRuntime::with_filters). The runtime calls
87/// [`wire_filters`](crate::wire_filters) during startup to subscribe
88/// each activation to its channels.
89///
90/// When `to` is `None` the filter is treated as **predicate-only** —
91/// kept in the runtime for callers that hold the `AlloraRuntime` and
92/// invoke `filter.accepts(...)` directly, but not auto-wired.
93pub struct FilterActivation {
94 filter: std::sync::Arc<Filter>,
95 from: String,
96 to: Option<String>,
97 id: String,
98}
99
100impl std::fmt::Debug for FilterActivation {
101 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
102 f.debug_struct("FilterActivation")
103 .field("id", &self.id)
104 .field("from", &self.from)
105 .field("to", &self.to)
106 .finish()
107 }
108}
109
110impl FilterActivation {
111 /// Build a new activation from a `Filter` (typically the output of
112 /// `Filter::from_apl_with_id`) and its routing metadata.
113 pub fn new(
114 filter: Filter,
115 from: impl Into<String>,
116 to: Option<String>,
117 id: impl Into<String>,
118 ) -> Self {
119 Self {
120 filter: std::sync::Arc::new(filter),
121 from: from.into(),
122 to,
123 id: id.into(),
124 }
125 }
126
127 /// The wrapped filter predicate (shared via `Arc` so the runtime can
128 /// hand a clone to its subscription closure).
129 pub fn filter(&self) -> &std::sync::Arc<Filter> {
130 &self.filter
131 }
132
133 /// Inbound channel ID the runtime subscribes the filter to.
134 pub fn from(&self) -> &str {
135 &self.from
136 }
137
138 /// Outbound channel ID the runtime dispatches accepted exchanges to.
139 /// `None` means the filter is predicate-only (not auto-wired by the
140 /// runtime; available via the runtime accessor for external use).
141 pub fn to(&self) -> Option<&str> {
142 self.to.as_deref()
143 }
144
145 /// Stable identifier (matches the `FilterSpec`'s `id`, or a
146 /// deterministic `filter:auto.N` if the spec omitted it).
147 pub fn id(&self) -> &str {
148 &self.id
149 }
150}
151
152impl AlloraRuntime {
153 /// Create a new runtime instance from a vector of channels.
154 ///
155 /// Typically, invoked internally by the DSL (`build_runtime_from_str`).
156 pub fn new(channels: Vec<Box<dyn Channel>>) -> Self {
157 let channels_arc = channels.into_iter().map(|c| Arc::from(c)).collect();
158 Self {
159 channels: channels_arc,
160 filters: Vec::new(),
161 services: Vec::new(),
162 service_activator_processors: Vec::new(),
163 http_inbound_adapters: Vec::new(),
164 http_outbound_adapters: Vec::new(),
165 }
166 }
167 /// Sets the filter activations for this runtime.
168 ///
169 /// Consumes the provided activations vector and assigns it to the
170 /// runtime. The runtime's `wire_filters` step then subscribes each
171 /// activation with a non-`None` `to:` to its inbound channel.
172 pub fn with_filters(mut self, filters: Vec<FilterActivation>) -> Self {
173 self.filters = filters;
174 self
175 }
176 /// Sets the services for this runtime.
177 ///
178 /// Consumes the provided services vector and assigns it to the runtime.
179 pub fn with_services(mut self, services: Vec<ServiceProcessor>) -> Self {
180 self.services = services;
181 self
182 }
183 pub fn with_service_processors(mut self, proc: Vec<ServiceActivatorProcessor>) -> Self {
184 self.service_activator_processors = proc;
185 self
186 }
187 /// Sets the HTTP inbound adapters for this runtime.
188 ///
189 /// Consumes the provided adapters vector and assigns it to the runtime.
190 pub fn with_http_inbound_adapters(mut self, adapters: Vec<HttpInboundAdapter>) -> Self {
191 self.http_inbound_adapters = adapters;
192 self
193 }
194 pub fn with_http_outbound_adapters(mut self, adapters: Vec<HttpOutboundAdapter>) -> Self {
195 self.http_outbound_adapters = adapters;
196 self
197 }
198 /// Borrow all channels as an iterator of &dyn Channel (zero allocation).
199 pub fn channels(&self) -> impl Iterator<Item = &dyn Channel> {
200 self.channels.iter().map(|c| c.as_ref())
201 }
202 /// Borrow underlying boxed channel slice (rarely needed).
203 pub fn channels_slice(&self) -> &[Arc<dyn Channel>] {
204 &self.channels
205 }
206 /// Borrow all filter activations (read-only slice).
207 pub fn filters(&self) -> &[FilterActivation] {
208 &self.filters
209 }
210 /// Borrow all services (read-only slice).
211 pub fn services(&self) -> &[ServiceProcessor] {
212 &self.services
213 }
214 /// Consume the runtime, yielding owned channels.
215 pub fn into_channels(self) -> Vec<Arc<dyn Channel>> {
216 self.channels
217 }
218 /// Consume the runtime, yielding owned filter activations.
219 pub fn into_filters(self) -> Vec<FilterActivation> {
220 self.filters
221 }
222 /// Consume the runtime, yielding owned services.
223 pub fn into_services(self) -> Vec<ServiceProcessor> {
224 self.services
225 }
226 /// Find a channel by its id; returns `None` if not present.
227 ///
228 /// Complexity: O(n). Optimizations (hash index) can be added later without
229 /// changing this method's signature or semantics.
230 pub fn channel_by_id(&self, id: &str) -> Option<&dyn Channel> {
231 self.channels
232 .iter()
233 .find(|c| c.id() == id)
234 .map(|c| c.as_ref())
235 }
236 /// Return a cloned Arc<dyn Channel> by id if it exists (helper for builders needing ownership).
237 pub fn channel_ref_by_id(&self, id: &str) -> Option<Arc<dyn Channel>> {
238 self.channels
239 .iter()
240 .find(|c| c.id() == id)
241 .map(|c| Arc::clone(c))
242 }
243 /// Generic typed channel lookup: returns &T if a channel with `id` exists and downcasts to T.
244 pub fn channel_typed<T: Channel + 'static>(&self, id: &str) -> Option<&T> {
245 self.channels
246 .iter()
247 .find(|c| c.id() == id)
248 .and_then(|c| c.as_any().downcast_ref::<T>())
249 }
250 /// Required typed channel lookup: panics with a clear message if missing or wrong type.
251 pub fn channel<T: Channel + 'static>(&self, id: &str) -> &T {
252 for c in &self.channels {
253 if c.id() == id {
254 if let Some(t) = c.as_any().downcast_ref::<T>() {
255 return t;
256 } else {
257 panic!(
258 "channel '{}' exists with kind '{}' but does not match expected type '{}'",
259 id,
260 c.kind(),
261 std::any::type_name::<T>()
262 );
263 }
264 }
265 }
266 panic!(
267 "channel '{}' not found (expected type '{}')",
268 id,
269 std::any::type_name::<T>()
270 );
271 }
272 /// Predicate: does channel id exist and is of type T?
273 pub fn channel_is<T: Channel + 'static>(&self, id: &str) -> bool {
274 self.channel_typed::<T>(id).is_some()
275 }
276 /// Total number of channels in this runtime.
277 pub fn channel_count(&self) -> usize {
278 self.channels.len()
279 }
280 /// Total number of filters in this runtime.
281 pub fn filter_count(&self) -> usize {
282 self.filters.len()
283 }
284 /// Total number of services in this runtime.
285 pub fn service_count(&self) -> usize {
286 self.services.len()
287 }
288 /// Total number of service processors in this runtime.
289 pub fn service_processor_count(&self) -> usize {
290 self.service_activator_processors.len()
291 }
292 pub fn service_activator_processors(&self) -> &[ServiceActivatorProcessor] {
293 &self.service_activator_processors
294 }
295 pub fn service_activator_processor_by_id(
296 &self,
297 id: &str,
298 ) -> Option<&ServiceActivatorProcessor> {
299 self.service_activator_processors
300 .iter()
301 .find(|p| p.id() == id)
302 }
303 pub fn service_activator_processor_mut_by_id(
304 &mut self,
305 id: &str,
306 ) -> Option<&mut ServiceActivatorProcessor> {
307 self.service_activator_processors
308 .iter_mut()
309 .find(|p| p.id() == id)
310 }
311 pub fn http_inbound_adapters(&self) -> &[HttpInboundAdapter] {
312 &self.http_inbound_adapters
313 }
314 pub fn http_inbound_adapter_count(&self) -> usize {
315 self.http_inbound_adapters.len()
316 }
317 pub fn http_outbound_adapters(&self) -> &[HttpOutboundAdapter] {
318 &self.http_outbound_adapters
319 }
320 pub fn http_outbound_adapter_count(&self) -> usize {
321 self.http_outbound_adapters.len()
322 }
323}