1use crate::{
87 all_service_descriptors,
88 channel::Channel,
89 dsl::build,
90 dsl::runtime::AlloraRuntime,
91 error::{Error, Result},
92 service,
93};
94use allora_core::adapter::OutboundAdapter;
95use std::path::{Path, PathBuf};
96use std::sync::Arc;
97use tracing::{debug, info, trace};
98
99#[derive(Debug, Clone)]
101pub struct Runtime {
102 config_path: Option<PathBuf>,
103}
104
105impl Default for Runtime {
106 fn default() -> Self {
107 Self { config_path: None }
108 }
109}
110
111impl Runtime {
112 pub fn new() -> Self {
114 Self::default()
115 }
116
117 pub fn with_config_file<P: AsRef<Path>>(mut self, path: P) -> Self {
124 self.config_path = Some(path.as_ref().to_path_buf());
125 self
126 }
127
128 pub fn run(self) -> Result<AlloraRuntime> {
145 let explicit_opt = self.config_path.clone();
146 let path = match &explicit_opt {
147 Some(p) => p.clone(),
148 None => resolve_default_config(),
149 };
150
151 if let Some(parent) = path.parent() {
152 crate::logging::init_from_dir(parent);
153 } else {
154 crate::logging::init_from_dir(Path::new("."));
155 }
156
157 let exists = path.exists();
158 let canonical_opt = if exists {
159 path.canonicalize().ok()
160 } else {
161 None
162 };
163
164 if explicit_opt.is_none() {
166 info!(
167 config.path=%path.display(),
168 config.canonical=?canonical_opt.as_ref().map(|p| p.display().to_string()),
169 canonical=canonical_opt.is_some(),
170 auto=true,
171 "Configuration auto-discovered"
172 );
173 } else {
174 info!(
175 config.path=%path.display(),
176 config.canonical=?canonical_opt.as_ref().map(|p| p.display().to_string()),
177 canonical=canonical_opt.is_some(),
178 auto=false,
179 "Configuration resolved"
180 );
181 }
182
183 if !exists {
184 return Err(Error::runtime(format!(
185 "config file '{}' not found",
186 path.display()
187 )));
188 }
189
190 let rt = build(&path)?;
191 wire_services(&rt)?;
192 wire_filters(&rt)?;
193 wire_http_outbound_adapters(&rt)?;
194 debug!(
195 channels = rt.channel_count(),
196 filters = rt.filter_count(),
197 "Runtime constructed"
198 );
199 Ok(rt)
200 }
201}
202
203pub fn wire_services(rt: &AlloraRuntime) -> Result<()> {
204 let descriptors = all_service_descriptors();
205 debug!(
206 service_activator.processors = rt.service_processor_count(),
207 descriptors = descriptors.len(),
208 "service wiring start"
209 );
210 for d in &descriptors {
211 trace!(descriptor.impl = d.name, "service descriptor loaded");
212 }
213 let mut service_activator_wirings: Vec<(
214 Arc<dyn Channel>,
215 Arc<dyn Channel>,
216 Arc<dyn service::Service>,
217 String,
218 )> = Vec::new();
219 for sp in rt.service_activator_processors().iter() {
220 let name_key = sp.ref_name();
221 trace!(
222 service_activator.ref_name = name_key,
223 service.id = sp.id(),
224 from = sp.from(),
225 to = sp.to(),
226 "evaluating service processor"
227 );
228 for desc in descriptors.iter() {
229 if desc.name == name_key {
230 trace!(service_activator.ref_name = name_key, "descriptor matched");
231 if rt.channel_by_id(sp.from()).is_some() && rt.channel_by_id(sp.to()).is_some() {
232 let inbound_arc_opt = rt.channels_slice().iter().find(|c| c.id() == sp.from());
233 let outbound_arc_opt = rt.channels_slice().iter().find(|c| c.id() == sp.to());
234 if let (Some(in_arc), Some(out_arc)) = (inbound_arc_opt, outbound_arc_opt) {
235 debug!(
236 service_activator.ref_name = name_key,
237 inbound = sp.from(),
238 outbound = sp.to(),
239 "channels resolved – scheduling wiring"
240 );
241 let proc_arc = (desc.constructor)();
242 service_activator_wirings.push((
243 in_arc.clone(),
244 out_arc.clone(),
245 proc_arc,
246 name_key.to_string(),
247 ));
248 } else {
249 debug!(
250 service_activator.ref_name = name_key,
251 inbound_found = inbound_arc_opt.is_some(),
252 outbound_found = outbound_arc_opt.is_some(),
253 "channel resolution failed – wiring skipped"
254 );
255 }
256 } else {
257 debug!(
258 service_activator.ref_name = name_key,
259 "channel ids not found – skipped"
260 );
261 }
262 }
263 }
264 }
265 if service_activator_wirings.is_empty() {
266 info!("no services wired (none matched or channels missing)");
267 } else {
268 info!(
269 wired.count = service_activator_wirings.len(),
270 "service wiring collected"
271 );
272 }
273 for (in_arc, out_arc, proc_arc, name_key) in service_activator_wirings.into_iter() {
274 if let Some(inbound_direct) = in_arc.as_any().downcast_ref::<crate::DirectChannel>() {
275 let outbound_arc_dyn = out_arc.clone();
276 let inbound_id = inbound_direct.id().to_string();
277 let name_key_closure = name_key.clone();
278 let proc_shared = proc_arc.clone();
279 let sub_count = inbound_direct.subscribe(move |exchange| {
280 let outbound_clone = outbound_arc_dyn.clone();
281 let proc_task = proc_shared.clone();
282 let name_key_val = name_key_closure.clone();
283 tokio::spawn(async move {
284 let mut ex_mut = exchange;
285 if let Err(err) = proc_task.process(&mut ex_mut).await {
286 tracing::error!(target="allora::service", service.impl=%name_key_val, error=%err, "Service async processing failed");
287 return;
288 }
289 if let Err(err) = outbound_clone.send(ex_mut).await {
290 tracing::error!(target="allora::service", service.impl=%name_key_val, error=%err, "Outbound channel send failed");
291 }
292 });
293 Ok(())
294 });
295 debug!(
296 service_activator.ref_name = name_key,
297 inbound = inbound_id,
298 subscribers = sub_count,
299 "service wired"
300 );
301 } else {
302 debug!(
303 service_activator.ref_name = name_key,
304 inbound_id = in_arc.id(),
305 "inbound channel not direct – skipping wiring"
306 );
307 }
308 }
309 for ch in rt.channels() {
310 debug!(channel.id = ch.id(), kind = ch.kind(), "channel registered");
311 }
312 debug!(
313 services.wired = rt.service_processor_count(),
314 "runtime wiring complete"
315 );
316 Ok(())
317}
318
319pub fn wire_filters(rt: &AlloraRuntime) -> Result<()> {
341 debug!(
342 filter.activations = rt.filter_count(),
343 "filter wiring start"
344 );
345 let mut filter_wirings: Vec<(
346 Arc<dyn Channel>,
347 Arc<dyn Channel>,
348 Arc<crate::Filter>,
349 String,
350 )> = Vec::new();
351 for fa in rt.filters().iter() {
352 let Some(to) = fa.to() else {
353 debug!(
354 filter.id = fa.id(),
355 from = fa.from(),
356 "filter has no `to:` — predicate-only, not auto-wired"
357 );
358 continue;
359 };
360 trace!(
361 filter.id = fa.id(),
362 from = fa.from(),
363 to = to,
364 "evaluating filter activation"
365 );
366 let inbound_arc_opt = rt.channels_slice().iter().find(|c| c.id() == fa.from());
367 let outbound_arc_opt = rt.channels_slice().iter().find(|c| c.id() == to);
368 if let (Some(in_arc), Some(out_arc)) = (inbound_arc_opt, outbound_arc_opt) {
369 debug!(
370 filter.id = fa.id(),
371 inbound = fa.from(),
372 outbound = to,
373 "channels resolved – scheduling filter wiring"
374 );
375 filter_wirings.push((
376 in_arc.clone(),
377 out_arc.clone(),
378 fa.filter().clone(),
379 fa.id().to_string(),
380 ));
381 } else {
382 debug!(
383 filter.id = fa.id(),
384 from = fa.from(),
385 to = to,
386 inbound_found = inbound_arc_opt.is_some(),
387 outbound_found = outbound_arc_opt.is_some(),
388 "filter channel resolution failed – wiring skipped"
389 );
390 }
391 }
392 if filter_wirings.is_empty() {
393 info!("no filters wired (none had `to:` channels resolvable on the runtime)");
394 } else {
395 info!(
396 wired.count = filter_wirings.len(),
397 "filter wiring collected"
398 );
399 }
400 for (in_arc, out_arc, filter_arc, id) in filter_wirings.into_iter() {
401 if let Some(inbound_direct) = in_arc.as_any().downcast_ref::<crate::DirectChannel>() {
402 let outbound_arc_dyn = out_arc.clone();
403 let inbound_id = inbound_direct.id().to_string();
404 let id_closure = id.clone();
405 let sub_count = inbound_direct.subscribe(move |exchange| {
406 let outbound_clone = outbound_arc_dyn.clone();
407 let f = filter_arc.clone();
408 let id_val = id_closure.clone();
409 tokio::spawn(async move {
410 if !f.accepts(&exchange) {
411 trace!(target="allora::filter", filter.id=%id_val, "filter rejected exchange (dropped)");
412 return;
413 }
414 if let Err(err) = outbound_clone.send(exchange).await {
415 tracing::error!(target="allora::filter", filter.id=%id_val, error=%err, "Filter outbound channel send failed");
416 }
417 });
418 Ok(())
419 });
420 debug!(
421 filter.id = id,
422 inbound = inbound_id,
423 subscribers = sub_count,
424 "filter wired"
425 );
426 } else {
427 debug!(
428 filter.id = id,
429 inbound_id = in_arc.id(),
430 "inbound channel not direct – skipping filter wiring"
431 );
432 }
433 }
434 debug!(
435 filters.wired = rt.filter_count(),
436 "filter runtime wiring complete"
437 );
438 Ok(())
439}
440
441pub fn wire_http_outbound_adapters(rt: &AlloraRuntime) -> Result<()> {
468 debug!(
469 http_outbound.activations = rt.http_outbound_adapter_count(),
470 "http outbound wiring start"
471 );
472 let mut wirings: Vec<(
473 Arc<dyn Channel>,
474 Option<Arc<dyn Channel>>,
475 Arc<allora_http::HttpOutboundAdapter>,
476 String,
477 )> = Vec::new();
478 for activation in rt.http_outbound_adapters() {
479 let Some(from) = activation.from() else {
480 trace!(
481 http_outbound.id = activation.id(),
482 "adapter has no `from:` — static-only, not auto-wired"
483 );
484 continue;
485 };
486 trace!(
487 http_outbound.id = activation.id(),
488 from = from,
489 to = activation.to(),
490 "evaluating http outbound activation"
491 );
492 let inbound_arc_opt = rt.channels_slice().iter().find(|c| c.id() == from);
493 if inbound_arc_opt.is_none() {
494 debug!(
495 http_outbound.id = activation.id(),
496 from = from,
497 "inbound channel not found – wiring skipped"
498 );
499 continue;
500 }
501 let outbound_arc_opt = match activation.to() {
506 None => None,
507 Some(to_name) => match rt.channels_slice().iter().find(|c| c.id() == to_name) {
508 Some(arc) => Some(arc.clone()),
509 None => {
510 tracing::warn!(
511 target = "allora::http_outbound",
512 http_outbound.id = activation.id(),
513 from = from,
514 to = to_name,
515 "outbound channel declared but not registered – wiring skipped \
516 (config mismatch; either declare the channel or remove `to:`)"
517 );
518 continue;
519 }
520 },
521 };
522 debug!(
523 http_outbound.id = activation.id(),
524 inbound = from,
525 outbound = activation.to(),
526 "channels resolved – scheduling http outbound wiring"
527 );
528 wirings.push((
529 inbound_arc_opt.unwrap().clone(),
530 outbound_arc_opt,
531 activation.adapter().clone(),
532 activation.id().to_string(),
533 ));
534 }
535 if wirings.is_empty() {
536 info!("no http outbound adapters wired");
537 } else {
538 info!(
539 wired.count = wirings.len(),
540 "http outbound wiring collected"
541 );
542 }
543 for (in_arc, out_arc_opt, adapter_arc, id) in wirings.into_iter() {
544 if let Some(inbound_direct) = in_arc.as_any().downcast_ref::<crate::DirectChannel>() {
545 let inbound_id = inbound_direct.id().to_string();
546 let id_closure = id.clone();
547 let sub_count = inbound_direct.subscribe(move |exchange| {
548 let outbound_clone = out_arc_opt.clone();
549 let adapter_clone = adapter_arc.clone();
550 let id_val = id_closure.clone();
551 tokio::spawn(async move {
552 match adapter_clone.dispatch(&exchange).await {
553 Ok(result) => {
554 let Some(outbound) = outbound_clone else {
555 tracing::debug!(
558 target = "allora::http_outbound",
559 http_outbound.id = %id_val,
560 status_code = ?result.status_code,
561 acknowledged = result.acknowledged,
562 "dispatched (fire-and-forget)"
563 );
564 return;
565 };
566 let mut ex_mut = exchange;
571 if let Some(body) = result.body {
572 ex_mut.in_msg.payload = allora_core::Payload::Text(body);
573 }
574 if let Some(code) = result.status_code {
575 ex_mut
576 .in_msg
577 .set_header("dispatch-result.status-code", &code.to_string());
578 }
579 ex_mut.in_msg.set_header(
580 "dispatch-result.acknowledged",
581 if result.acknowledged { "true" } else { "false" },
582 );
583 if let Err(err) = outbound.send(ex_mut).await {
584 tracing::error!(
585 target = "allora::http_outbound",
586 http_outbound.id = %id_val,
587 error = %err,
588 "outbound channel send failed"
589 );
590 }
591 }
592 Err(err) => {
593 tracing::error!(
594 target = "allora::http_outbound",
595 http_outbound.id = %id_val,
596 error = %err,
597 "http outbound dispatch failed"
598 );
599 }
600 }
601 });
602 Ok(())
603 });
604 debug!(
605 http_outbound.id = id,
606 inbound = inbound_id,
607 subscribers = sub_count,
608 "http outbound wired"
609 );
610 } else {
611 debug!(
612 http_outbound.id = id,
613 inbound_id = in_arc.id(),
614 "inbound channel not direct – skipping http outbound wiring"
615 );
616 }
617 }
618 debug!(
619 http_outbound.activations = rt.http_outbound_adapter_count(),
620 "http outbound runtime wiring complete"
621 );
622 Ok(())
623}
624
625fn resolve_default_config() -> PathBuf {
626 use std::env;
627
628 let mut args = env::args().skip(1); let mut runtime_override: Option<String> = None;
639
640 while let Some(arg) = args.next() {
641 if arg == "--runtime" {
642 if let Some(val) = args.next() {
643 runtime_override = Some(val);
644 }
645 break;
646 } else if let Some(rest) = arg.strip_prefix("--runtime=") {
647 runtime_override = Some(rest.to_string());
648 break;
649 }
650 }
651
652 if let Some(raw) = runtime_override {
653 let p = PathBuf::from(raw);
654 if p.is_dir() {
656 return p.join("allora.yml");
657 } else {
658 return p;
659 }
660 }
661
662 if let Ok(raw) = env::var("ALLORA_CONFIG") {
664 let p = PathBuf::from(raw);
665 if p.is_dir() {
666 return p.join("allora.yml");
667 } else {
668 return p;
669 }
670 }
671
672 let cwd_candidate = PathBuf::from("allora.yml");
674 if cwd_candidate.exists() {
675 return cwd_candidate;
676 }
677
678 if let Ok(exe) = env::current_exe() {
680 if let Some(dir) = exe.parent() {
681 let candidate = dir.join("allora.yml");
682 if candidate.exists() {
683 return candidate;
684 }
685 }
686 }
687
688 PathBuf::from("allora.yml")
690}
691
692#[cfg(test)]
693mod wire_filters_tests {
694 use super::wire_filters;
700 use crate::dsl::build_runtime_from_str;
701 use crate::dsl::runtime::AlloraRuntime;
702 use crate::DirectChannel;
703 use allora_core::{Exchange, Message};
704 use std::sync::{Arc, Mutex};
705 use std::time::Duration;
706
707 fn build_with_filter_yaml() -> allora_core::Result<AlloraRuntime> {
708 let yaml = r#"
709version: 1
710channels:
711 - kind: direct
712 id: inbound
713 - kind: direct
714 id: high_priority
715filters:
716 - id: filt.priority
717 from: inbound
718 to: high_priority
719 when: header("Priority") == "high"
720"#;
721 build_runtime_from_str(yaml, crate::dsl::DslFormat::Yaml)
722 }
723
724 fn collect_into(rt: &AlloraRuntime, channel_id: &str) -> Arc<Mutex<Vec<String>>> {
728 let recorded = Arc::new(Mutex::new(Vec::<String>::new()));
729 let arc = rt
730 .channels_slice()
731 .iter()
732 .find(|c| c.id() == channel_id)
733 .cloned()
734 .expect("channel registered");
735 let direct = arc
736 .as_any()
737 .downcast_ref::<DirectChannel>()
738 .expect("channel is direct");
739 let cl = recorded.clone();
740 direct.subscribe(move |ex| {
741 cl.lock()
742 .unwrap()
743 .push(ex.in_msg.body_text().unwrap_or("").to_string());
744 Ok(())
745 });
746 recorded
747 }
748
749 #[tokio::test]
750 async fn filter_forwards_accepted_and_drops_rejected() -> allora_core::Result<()> {
751 let rt = build_with_filter_yaml()?;
752 wire_filters(&rt)?;
753 let high_priority = collect_into(&rt, "high_priority");
754
755 let inbound = rt
756 .channels_slice()
757 .iter()
758 .find(|c| c.id() == "inbound")
759 .cloned()
760 .expect("inbound registered");
761
762 inbound
764 .send(Exchange::new(Message::from_text("no-header")))
765 .await?;
766 let mut low = Exchange::new(Message::from_text("low"));
768 low.in_msg.set_header("Priority", "low");
769 inbound.send(low).await?;
770 let mut high = Exchange::new(Message::from_text("high"));
772 high.in_msg.set_header("Priority", "high");
773 inbound.send(high).await?;
774
775 tokio::time::sleep(Duration::from_millis(50)).await;
778
779 let got = high_priority.lock().unwrap().clone();
780 assert_eq!(
781 got,
782 vec!["high".to_string()],
783 "only Priority=high should reach high_priority; got {got:?}"
784 );
785 Ok(())
786 }
787
788 #[tokio::test]
789 async fn yaml_without_filters_is_a_clean_noop() -> allora_core::Result<()> {
790 let yaml = r#"
791version: 1
792channels:
793 - kind: direct
794 id: inbound
795"#;
796 let rt = build_runtime_from_str(yaml, crate::dsl::DslFormat::Yaml)?;
797 assert_eq!(rt.filter_count(), 0);
798 wire_filters(&rt)?; Ok(())
800 }
801}
802
803#[cfg(test)]
804mod wire_http_outbound_tests {
805 use super::wire_http_outbound_adapters;
815 use crate::dsl::build_runtime_from_str;
816 use crate::dsl::runtime::AlloraRuntime;
817 use crate::DirectChannel;
818 use allora_core::{Exchange, Message};
819 use hyper::service::{make_service_fn, service_fn};
820 use hyper::{Body, Request, Response, Server};
821 use std::sync::{Arc, Mutex};
822 use std::time::Duration;
823
824 async fn spawn_capture_server(
832 reply_body: &'static str,
833 reply_status: u16,
834 ) -> (u16, Arc<Mutex<Vec<Vec<u8>>>>) {
835 let std_listener = std::net::TcpListener::bind("127.0.0.1:0").expect("bind ephemeral");
836 let port = std_listener.local_addr().expect("local_addr").port();
837 std_listener.set_nonblocking(true).expect("nonblocking");
838
839 let bodies = Arc::new(Mutex::new(Vec::<Vec<u8>>::new()));
840 let bodies_cl = bodies.clone();
841 let make = make_service_fn(move |_| {
842 let bodies = bodies_cl.clone();
843 async move {
844 Ok::<_, hyper::Error>(service_fn(move |req: Request<Body>| {
845 let bodies = bodies.clone();
846 async move {
847 let bytes = hyper::body::to_bytes(req.into_body()).await.unwrap();
848 bodies.lock().unwrap().push(bytes.to_vec());
849 Ok::<_, hyper::Error>(
850 Response::builder()
851 .status(reply_status)
852 .body(Body::from(reply_body))
853 .unwrap(),
854 )
855 }
856 }))
857 }
858 });
859 let server = Server::from_tcp(std_listener)
862 .expect("hyper from_tcp")
863 .serve(make);
864 tokio::spawn(server);
865 tokio::time::sleep(Duration::from_millis(50)).await; (port, bodies)
867 }
868
869 fn collect_into(
870 rt: &AlloraRuntime,
871 channel_id: &str,
872 ) -> Arc<Mutex<Vec<(String, Option<String>, Option<String>)>>> {
873 let recorded = Arc::new(Mutex::new(Vec::new()));
874 let arc = rt
875 .channels_slice()
876 .iter()
877 .find(|c| c.id() == channel_id)
878 .cloned()
879 .expect("channel registered");
880 let direct = arc
881 .as_any()
882 .downcast_ref::<DirectChannel>()
883 .expect("channel is direct");
884 let cl = recorded.clone();
885 direct.subscribe(move |ex| {
886 let body = ex.in_msg.body_text().unwrap_or("").to_string();
887 let status = ex
888 .in_msg
889 .header("dispatch-result.status-code")
890 .map(|s| s.to_string());
891 let ack = ex
892 .in_msg
893 .header("dispatch-result.acknowledged")
894 .map(|s| s.to_string());
895 cl.lock().unwrap().push((body, status, ack));
896 Ok(())
897 });
898 recorded
899 }
900
901 #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
902 async fn dispatches_message_and_forwards_response_to_outbound_channel(
903 ) -> allora_core::Result<()> {
904 let (port, server_bodies) = spawn_capture_server("ok-from-server", 202).await;
905
906 let yaml = format!(
907 r#"
908version: 1
909channels:
910 - kind: direct
911 id: outbound_requests
912 - kind: direct
913 id: dispatch_results
914http-outbound-adapters:
915 - id: test-out
916 host: 127.0.0.1
917 port: {port}
918 base-path: /
919 method: POST
920 from: outbound_requests
921 to: dispatch_results
922"#
923 );
924 let rt = build_runtime_from_str(&yaml, crate::dsl::DslFormat::Yaml)?;
925 wire_http_outbound_adapters(&rt)?;
926
927 let results = collect_into(&rt, "dispatch_results");
928 let inbound = rt
929 .channels_slice()
930 .iter()
931 .find(|c| c.id() == "outbound_requests")
932 .cloned()
933 .expect("inbound registered");
934
935 inbound
936 .send(Exchange::new(Message::from_text("hello-server")))
937 .await?;
938 tokio::time::sleep(Duration::from_millis(150)).await;
939
940 let got_bodies = server_bodies.lock().unwrap().clone();
941 assert_eq!(
942 got_bodies,
943 vec![b"hello-server".to_vec()],
944 "server should have recorded the dispatched body once",
945 );
946
947 let got_results = results.lock().unwrap().clone();
948 assert_eq!(got_results.len(), 1, "one post-dispatch exchange expected");
949 let (body, status, ack) = &got_results[0];
950 assert_eq!(body, "ok-from-server");
951 assert_eq!(status.as_deref(), Some("202"));
952 assert_eq!(ack.as_deref(), Some("true"));
953 Ok(())
954 }
955
956 #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
957 async fn fire_and_forget_no_to_channel_drops_response() -> allora_core::Result<()> {
958 let (port, server_bodies) = spawn_capture_server("ack", 202).await;
959
960 let yaml = format!(
961 r#"
962version: 1
963channels:
964 - kind: direct
965 id: outbound_requests
966http-outbound-adapters:
967 - id: fire-forget
968 host: 127.0.0.1
969 port: {port}
970 base-path: /
971 method: POST
972 from: outbound_requests
973"#
974 );
975 let rt = build_runtime_from_str(&yaml, crate::dsl::DslFormat::Yaml)?;
976 wire_http_outbound_adapters(&rt)?;
977
978 let inbound = rt
979 .channels_slice()
980 .iter()
981 .find(|c| c.id() == "outbound_requests")
982 .cloned()
983 .expect("inbound registered");
984 inbound
985 .send(Exchange::new(Message::from_text("notify")))
986 .await?;
987 tokio::time::sleep(Duration::from_millis(150)).await;
988
989 let got_bodies = server_bodies.lock().unwrap().clone();
990 assert_eq!(got_bodies, vec![b"notify".to_vec()]);
991 Ok(())
994 }
995
996 #[tokio::test]
997 async fn yaml_without_outbound_adapters_is_a_clean_noop() -> allora_core::Result<()> {
998 let yaml = r#"
999version: 1
1000channels:
1001 - kind: direct
1002 id: inbound
1003"#;
1004 let rt = build_runtime_from_str(yaml, crate::dsl::DslFormat::Yaml)?;
1005 assert_eq!(rt.http_outbound_adapter_count(), 0);
1006 wire_http_outbound_adapters(&rt)?; Ok(())
1008 }
1009
1010 #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
1014 async fn missing_to_channel_skips_wiring_instead_of_silent_fire_and_forget(
1015 ) -> allora_core::Result<()> {
1016 let (port, server_bodies) = spawn_capture_server("ack", 202).await;
1017
1018 let yaml = format!(
1020 r#"
1021version: 1
1022channels:
1023 - kind: direct
1024 id: outbound_requests
1025http-outbound-adapters:
1026 - id: misconfigured
1027 host: 127.0.0.1
1028 port: {port}
1029 base-path: /
1030 method: POST
1031 from: outbound_requests
1032 to: nonexistent
1033"#
1034 );
1035 let rt = build_runtime_from_str(&yaml, crate::dsl::DslFormat::Yaml)?;
1036 wire_http_outbound_adapters(&rt)?;
1037
1038 let inbound = rt
1039 .channels_slice()
1040 .iter()
1041 .find(|c| c.id() == "outbound_requests")
1042 .cloned()
1043 .expect("inbound registered");
1044 inbound
1047 .send(Exchange::new(Message::from_text("should-not-be-sent")))
1048 .await?;
1049 tokio::time::sleep(Duration::from_millis(150)).await;
1050
1051 let got_bodies = server_bodies.lock().unwrap().clone();
1052 assert!(
1053 got_bodies.is_empty(),
1054 "wiring skipped due to missing `to:` should mean the adapter is not subscribed; \
1055 got bodies={got_bodies:?}"
1056 );
1057 Ok(())
1058 }
1059
1060 #[tokio::test]
1061 async fn adapter_without_from_is_static_only_not_wired() -> allora_core::Result<()> {
1062 let yaml = r#"
1063version: 1
1064channels:
1065 - kind: direct
1066 id: anything
1067http-outbound-adapters:
1068 - id: static-out
1069 host: 127.0.0.1
1070 port: 9
1071 base-path: /
1072"#;
1073 let rt = build_runtime_from_str(yaml, crate::dsl::DslFormat::Yaml)?;
1074 assert_eq!(rt.http_outbound_adapter_count(), 1);
1075 let activation = &rt.http_outbound_adapters()[0];
1076 assert_eq!(activation.id(), "static-out");
1077 assert_eq!(activation.from(), None);
1078 assert_eq!(activation.to(), None);
1079 wire_http_outbound_adapters(&rt)?; Ok(())
1081 }
1082}