1use crate::{
87 all_service_descriptors,
88 channel::Channel,
89 dsl::build,
90 dsl::runtime::AlloraRuntime,
91 error::{Error, Result},
92 service,
93};
94use std::path::{Path, PathBuf};
95use std::sync::Arc;
96use tracing::{debug, info, trace};
97
/// Builder-style entry point for bootstrapping an [`AlloraRuntime`].
///
/// Holds the optional, explicitly chosen configuration file path. When no path
/// is set, `Runtime::run` falls back to automatic discovery of the file.
///
/// `Default` is derived: the default value has no explicit configuration path,
/// which is exactly what the previous hand-written implementation produced.
#[derive(Debug, Clone, Default)]
pub struct Runtime {
    /// Explicit configuration file path; `None` means "auto-discover at run time".
    config_path: Option<PathBuf>,
}
109
110impl Runtime {
111 pub fn new() -> Self {
113 Self::default()
114 }
115
116 pub fn with_config_file<P: AsRef<Path>>(mut self, path: P) -> Self {
123 self.config_path = Some(path.as_ref().to_path_buf());
124 self
125 }
126
127 pub fn run(self) -> Result<AlloraRuntime> {
144 let explicit_opt = self.config_path.clone();
145 let path = match &explicit_opt {
146 Some(p) => p.clone(),
147 None => resolve_default_config(),
148 };
149
150 if let Some(parent) = path.parent() {
151 crate::logging::init_from_dir(parent);
152 } else {
153 crate::logging::init_from_dir(Path::new("."));
154 }
155
156 let exists = path.exists();
157 let canonical_opt = if exists {
158 path.canonicalize().ok()
159 } else {
160 None
161 };
162
163 if explicit_opt.is_none() {
165 info!(
166 config.path=%path.display(),
167 config.canonical=?canonical_opt.as_ref().map(|p| p.display().to_string()),
168 canonical=canonical_opt.is_some(),
169 auto=true,
170 "Configuration auto-discovered"
171 );
172 } else {
173 info!(
174 config.path=%path.display(),
175 config.canonical=?canonical_opt.as_ref().map(|p| p.display().to_string()),
176 canonical=canonical_opt.is_some(),
177 auto=false,
178 "Configuration resolved"
179 );
180 }
181
182 if !exists {
183 return Err(Error::runtime(format!(
184 "config file '{}' not found",
185 path.display()
186 )));
187 }
188
189 let rt = build(&path)?;
190 wire_services(&rt)?;
191 wire_filters(&rt)?;
192 debug!(
193 channels = rt.channel_count(),
194 filters = rt.filter_count(),
195 "Runtime constructed"
196 );
197 Ok(rt)
198 }
199}
200
201pub fn wire_services(rt: &AlloraRuntime) -> Result<()> {
202 let descriptors = all_service_descriptors();
203 debug!(
204 service_activator.processors = rt.service_processor_count(),
205 descriptors = descriptors.len(),
206 "service wiring start"
207 );
208 for d in &descriptors {
209 trace!(descriptor.impl = d.name, "service descriptor loaded");
210 }
211 let mut service_activator_wirings: Vec<(
212 Arc<dyn Channel>,
213 Arc<dyn Channel>,
214 Arc<dyn service::Service>,
215 String,
216 )> = Vec::new();
217 for sp in rt.service_activator_processors().iter() {
218 let name_key = sp.ref_name();
219 trace!(
220 service_activator.ref_name = name_key,
221 service.id = sp.id(),
222 from = sp.from(),
223 to = sp.to(),
224 "evaluating service processor"
225 );
226 for desc in descriptors.iter() {
227 if desc.name == name_key {
228 trace!(service_activator.ref_name = name_key, "descriptor matched");
229 if rt.channel_by_id(sp.from()).is_some() && rt.channel_by_id(sp.to()).is_some() {
230 let inbound_arc_opt = rt.channels_slice().iter().find(|c| c.id() == sp.from());
231 let outbound_arc_opt = rt.channels_slice().iter().find(|c| c.id() == sp.to());
232 if let (Some(in_arc), Some(out_arc)) = (inbound_arc_opt, outbound_arc_opt) {
233 debug!(
234 service_activator.ref_name = name_key,
235 inbound = sp.from(),
236 outbound = sp.to(),
237 "channels resolved – scheduling wiring"
238 );
239 let proc_arc = (desc.constructor)();
240 service_activator_wirings.push((
241 in_arc.clone(),
242 out_arc.clone(),
243 proc_arc,
244 name_key.to_string(),
245 ));
246 } else {
247 debug!(
248 service_activator.ref_name = name_key,
249 inbound_found = inbound_arc_opt.is_some(),
250 outbound_found = outbound_arc_opt.is_some(),
251 "channel resolution failed – wiring skipped"
252 );
253 }
254 } else {
255 debug!(
256 service_activator.ref_name = name_key,
257 "channel ids not found – skipped"
258 );
259 }
260 }
261 }
262 }
263 if service_activator_wirings.is_empty() {
264 info!("no services wired (none matched or channels missing)");
265 } else {
266 info!(
267 wired.count = service_activator_wirings.len(),
268 "service wiring collected"
269 );
270 }
271 for (in_arc, out_arc, proc_arc, name_key) in service_activator_wirings.into_iter() {
272 if let Some(inbound_direct) = in_arc.as_any().downcast_ref::<crate::DirectChannel>() {
273 let outbound_arc_dyn = out_arc.clone();
274 let inbound_id = inbound_direct.id().to_string();
275 let name_key_closure = name_key.clone();
276 let proc_shared = proc_arc.clone();
277 let sub_count = inbound_direct.subscribe(move |exchange| {
278 let outbound_clone = outbound_arc_dyn.clone();
279 let proc_task = proc_shared.clone();
280 let name_key_val = name_key_closure.clone();
281 tokio::spawn(async move {
282 let mut ex_mut = exchange;
283 if let Err(err) = proc_task.process(&mut ex_mut).await {
284 tracing::error!(target="allora::service", service.impl=%name_key_val, error=%err, "Service async processing failed");
285 return;
286 }
287 if let Err(err) = outbound_clone.send(ex_mut).await {
288 tracing::error!(target="allora::service", service.impl=%name_key_val, error=%err, "Outbound channel send failed");
289 }
290 });
291 Ok(())
292 });
293 debug!(
294 service_activator.ref_name = name_key,
295 inbound = inbound_id,
296 subscribers = sub_count,
297 "service wired"
298 );
299 } else {
300 debug!(
301 service_activator.ref_name = name_key,
302 inbound_id = in_arc.id(),
303 "inbound channel not direct – skipping wiring"
304 );
305 }
306 }
307 for ch in rt.channels() {
308 debug!(channel.id = ch.id(), kind = ch.kind(), "channel registered");
309 }
310 debug!(
311 services.wired = rt.service_processor_count(),
312 "runtime wiring complete"
313 );
314 Ok(())
315}
316
317pub fn wire_filters(rt: &AlloraRuntime) -> Result<()> {
339 debug!(
340 filter.activations = rt.filter_count(),
341 "filter wiring start"
342 );
343 let mut filter_wirings: Vec<(
344 Arc<dyn Channel>,
345 Arc<dyn Channel>,
346 Arc<crate::Filter>,
347 String,
348 )> = Vec::new();
349 for fa in rt.filters().iter() {
350 let Some(to) = fa.to() else {
351 debug!(
352 filter.id = fa.id(),
353 from = fa.from(),
354 "filter has no `to:` — predicate-only, not auto-wired"
355 );
356 continue;
357 };
358 trace!(
359 filter.id = fa.id(),
360 from = fa.from(),
361 to = to,
362 "evaluating filter activation"
363 );
364 let inbound_arc_opt = rt.channels_slice().iter().find(|c| c.id() == fa.from());
365 let outbound_arc_opt = rt.channels_slice().iter().find(|c| c.id() == to);
366 if let (Some(in_arc), Some(out_arc)) = (inbound_arc_opt, outbound_arc_opt) {
367 debug!(
368 filter.id = fa.id(),
369 inbound = fa.from(),
370 outbound = to,
371 "channels resolved – scheduling filter wiring"
372 );
373 filter_wirings.push((
374 in_arc.clone(),
375 out_arc.clone(),
376 fa.filter().clone(),
377 fa.id().to_string(),
378 ));
379 } else {
380 debug!(
381 filter.id = fa.id(),
382 from = fa.from(),
383 to = to,
384 inbound_found = inbound_arc_opt.is_some(),
385 outbound_found = outbound_arc_opt.is_some(),
386 "filter channel resolution failed – wiring skipped"
387 );
388 }
389 }
390 if filter_wirings.is_empty() {
391 info!("no filters wired (none had `to:` channels resolvable on the runtime)");
392 } else {
393 info!(
394 wired.count = filter_wirings.len(),
395 "filter wiring collected"
396 );
397 }
398 for (in_arc, out_arc, filter_arc, id) in filter_wirings.into_iter() {
399 if let Some(inbound_direct) = in_arc.as_any().downcast_ref::<crate::DirectChannel>() {
400 let outbound_arc_dyn = out_arc.clone();
401 let inbound_id = inbound_direct.id().to_string();
402 let id_closure = id.clone();
403 let sub_count = inbound_direct.subscribe(move |exchange| {
404 let outbound_clone = outbound_arc_dyn.clone();
405 let f = filter_arc.clone();
406 let id_val = id_closure.clone();
407 tokio::spawn(async move {
408 if !f.accepts(&exchange) {
409 trace!(target="allora::filter", filter.id=%id_val, "filter rejected exchange (dropped)");
410 return;
411 }
412 if let Err(err) = outbound_clone.send(exchange).await {
413 tracing::error!(target="allora::filter", filter.id=%id_val, error=%err, "Filter outbound channel send failed");
414 }
415 });
416 Ok(())
417 });
418 debug!(
419 filter.id = id,
420 inbound = inbound_id,
421 subscribers = sub_count,
422 "filter wired"
423 );
424 } else {
425 debug!(
426 filter.id = id,
427 inbound_id = in_arc.id(),
428 "inbound channel not direct – skipping filter wiring"
429 );
430 }
431 }
432 debug!(
433 filters.wired = rt.filter_count(),
434 "filter runtime wiring complete"
435 );
436 Ok(())
437}
438
/// Resolves the configuration file path used when no explicit path was given.
///
/// Candidates are tried in this order; the first hit wins:
/// 1. `--runtime <path>` or `--runtime=<path>` on the process command line
///    (scanning stops at the first `--runtime` occurrence);
/// 2. the `ALLORA_CONFIG` environment variable;
/// 3. `allora.yml` in the current working directory, if it exists;
/// 4. `allora.yml` next to the current executable, if it exists;
/// 5. the relative path `allora.yml` as a last resort.
///
/// For (1) and (2), a value naming an existing directory is expanded to
/// `<dir>/allora.yml`; any other value is returned verbatim. The returned
/// path is not guaranteed to exist — the caller must check.
///
/// Arguments and the environment variable are read as OS strings:
/// `std::env::args()` panics when *any* argument is not valid Unicode, which
/// would abort the process during discovery. Arguments that are not valid
/// Unicode are ignored while scanning for the flag; the value following
/// `--runtime` and the value of `ALLORA_CONFIG` may be arbitrary OS paths.
fn resolve_default_config() -> PathBuf {
    use std::env;

    const DEFAULT_FILE: &str = "allora.yml";

    // An override naming a directory means "use the default file inside it".
    fn expand_override(raw: PathBuf) -> PathBuf {
        if raw.is_dir() {
            raw.join(DEFAULT_FILE)
        } else {
            raw
        }
    }

    let mut args = env::args_os().skip(1);
    let mut runtime_override: Option<PathBuf> = None;

    // Manual `next()` loop: the space-separated form consumes the following argument.
    while let Some(arg) = args.next() {
        // A non-Unicode argument cannot be decoded as the `--runtime` flag; skip it.
        let Some(text) = arg.to_str() else {
            continue;
        };
        if text == "--runtime" {
            // A trailing `--runtime` without a value leaves the override unset.
            runtime_override = args.next().map(PathBuf::from);
            break;
        }
        if let Some(rest) = text.strip_prefix("--runtime=") {
            runtime_override = Some(PathBuf::from(rest));
            break;
        }
    }

    if let Some(raw) = runtime_override {
        return expand_override(raw);
    }

    if let Some(raw) = env::var_os("ALLORA_CONFIG") {
        return expand_override(PathBuf::from(raw));
    }

    let cwd_candidate = PathBuf::from(DEFAULT_FILE);
    if cwd_candidate.exists() {
        return cwd_candidate;
    }

    let beside_exe = env::current_exe()
        .ok()
        .and_then(|exe| exe.parent().map(|dir| dir.join(DEFAULT_FILE)))
        .filter(|candidate| candidate.exists());
    if let Some(candidate) = beside_exe {
        return candidate;
    }

    // Nothing found: hand back the conventional relative path and let the
    // caller report that it is missing.
    PathBuf::from(DEFAULT_FILE)
}
505
#[cfg(test)]
mod wire_filters_tests {
    //! End-to-end checks for `wire_filters`: a runtime is built from inline
    //! YAML, filters are wired, and exchanges are pushed through real channels.

    use super::wire_filters;
    use crate::dsl::build_runtime_from_str;
    use crate::dsl::runtime::AlloraRuntime;
    use crate::DirectChannel;
    use allora_core::{Exchange, Message};
    use std::sync::{Arc, Mutex};
    use std::time::Duration;

    /// Two direct channels joined by one filter that only lets
    /// `Priority == "high"` through.
    const FILTER_YAML: &str = r#"
version: 1
channels:
  - kind: direct
    id: inbound
  - kind: direct
    id: high_priority
filters:
  - id: filt.priority
    from: inbound
    to: high_priority
    when: header("Priority") == "high"
"#;

    /// Builds the runtime described by [`FILTER_YAML`].
    fn build_with_filter_yaml() -> allora_core::Result<AlloraRuntime> {
        build_runtime_from_str(FILTER_YAML, crate::dsl::DslFormat::Yaml)
    }

    /// Subscribes to the direct channel `channel_id` and returns a shared
    /// vector that receives the body text of every exchange delivered to it
    /// (an empty string when the message has no text body).
    ///
    /// Panics if the channel is not registered or is not a `DirectChannel`.
    fn collect_into(rt: &AlloraRuntime, channel_id: &str) -> Arc<Mutex<Vec<String>>> {
        let bodies: Arc<Mutex<Vec<String>>> = Arc::new(Mutex::new(Vec::new()));

        let channel = rt
            .channels_slice()
            .iter()
            .find(|candidate| candidate.id() == channel_id)
            .cloned()
            .expect("channel registered");
        let direct_channel = channel
            .as_any()
            .downcast_ref::<DirectChannel>()
            .expect("channel is direct");

        let sink = Arc::clone(&bodies);
        direct_channel.subscribe(move |ex| {
            let mut seen = sink.lock().unwrap();
            seen.push(ex.in_msg.body_text().unwrap_or("").to_string());
            Ok(())
        });

        bodies
    }

    /// Sends three exchanges (no header, `Priority: low`, `Priority: high`)
    /// and expects only the last one to reach `high_priority`.
    #[tokio::test]
    async fn filter_forwards_accepted_and_drops_rejected() -> allora_core::Result<()> {
        let rt = build_with_filter_yaml()?;
        wire_filters(&rt)?;
        let delivered = collect_into(&rt, "high_priority");

        let inbound = rt
            .channels_slice()
            .iter()
            .find(|candidate| candidate.id() == "inbound")
            .cloned()
            .expect("inbound registered");

        // No `Priority` header at all.
        let headerless = Exchange::new(Message::from_text("no-header"));
        inbound.send(headerless).await?;

        // One exchange per priority level; the body doubles as the level name.
        for level in ["low", "high"] {
            let mut exchange = Exchange::new(Message::from_text(level));
            exchange.in_msg.set_header("Priority", level);
            inbound.send(exchange).await?;
        }

        // Filtering runs on spawned tasks; give them a moment to finish.
        tokio::time::sleep(Duration::from_millis(50)).await;

        let got = delivered.lock().unwrap().clone();
        assert_eq!(
            got,
            vec!["high".to_string()],
            "only Priority=high should reach high_priority; got {got:?}"
        );
        Ok(())
    }

    /// A configuration without a `filters:` section must wire nothing and succeed.
    #[tokio::test]
    async fn yaml_without_filters_is_a_clean_noop() -> allora_core::Result<()> {
        let channels_only = r#"
version: 1
channels:
  - kind: direct
    id: inbound
"#;
        let rt = build_runtime_from_str(channels_only, crate::dsl::DslFormat::Yaml)?;
        assert_eq!(rt.filter_count(), 0);
        wire_filters(&rt)?;
        Ok(())
    }
}