// allora_runtime/service_activator_processor.rs
use crate::channel::Channel;
use crate::dsl::runtime::AlloraRuntime;
use crate::{error::Result, service::Service, spec::ServiceActivatorSpec, Exchange};
use std::fmt::{Debug, Formatter, Result as FmtResult};
use std::sync::Arc;
6
7pub struct ServiceActivatorProcessor {
9 activator: ServiceActivatorSpec,
10 service: Option<Arc<dyn Service>>, }
12
13impl Debug for ServiceActivatorProcessor {
14 fn fmt(&self, f: &mut Formatter<'_>) -> FmtResult {
15 f.debug_struct("ServiceActivatorProcessor")
16 .field("id", &self.activator.id())
17 .field("from", &self.activator.from())
18 .field("to", &self.activator.to())
19 .finish()
20 }
21}
22
23impl ServiceActivatorProcessor {
24 pub fn new(activator: ServiceActivatorSpec) -> Self {
25 Self {
26 activator,
27 service: None,
28 }
29 }
30 pub fn id(&self) -> &str {
31 self.activator.id().unwrap_or("")
32 }
33 pub fn from(&self) -> &str {
34 self.activator.from()
35 }
36 pub fn to(&self) -> &str {
37 self.activator.to()
38 }
39 pub fn ref_name(&self) -> &str {
40 self.activator.ref_name()
41 }
42 pub fn has_service(&self) -> bool {
43 self.service.is_some()
44 }
45 pub fn set_service_and_wire(
46 &mut self,
47 svc: impl Service + 'static,
48 runtime: &'static AlloraRuntime,
49 ) -> Result<()> {
50 let arc = Arc::new(svc);
51 let from_id = self.from().to_string();
52 let to_id = self.to().to_string();
53 let inbound = runtime.channel::<crate::DirectChannel>(&from_id);
54 let ref_name = self.ref_name().to_string();
55 let svc_clone = arc.clone();
56 inbound.subscribe(move |exchange: Exchange| {
57 let svc_task = svc_clone.clone();
58 let runtime_ref: &'static AlloraRuntime = runtime;
59 let to_id_clone = to_id.clone();
60 let ref_name_clone = ref_name.clone();
61 tokio::spawn(async move {
62 let mut ex_mut = exchange;
63 if let Err(err) = svc_task.process(&mut ex_mut).await {
64 tracing::error!(target="allora::service", service.ref=%ref_name_clone, error=%err, "service processing failed");
65 return;
66 }
67 let outbound = runtime_ref.channel::<crate::DirectChannel>(&to_id_clone);
68 if let Err(err) = outbound.send(ex_mut).await {
69 tracing::error!(target="allora::service", service.ref=%ref_name_clone, outbound.channel=outbound.id(), error=%err, "outbound send failed");
70 }
71 });
72 Ok(())
73 });
74 self.service = Some(arc);
75 Ok(())
76 }
77}