allora_runtime/
service_activator_processor.rs

1use crate::channel::Channel;
2use crate::dsl::runtime::AlloraRuntime;
3use crate::{error::Result, service::Service, spec::ServiceActivatorSpec, Exchange};
4use std::fmt::{Debug, Formatter, Result as FmtResult};
5use std::sync::Arc;
6
7/// Processor that binds a ServiceSpec (activator metadata) to service logic and wires channels.
8pub struct ServiceActivatorProcessor {
9    activator: ServiceActivatorSpec,
10    service: Option<Arc<dyn Service>>, // logic assigned after runtime build
11}
12
13impl Debug for ServiceActivatorProcessor {
14    fn fmt(&self, f: &mut Formatter<'_>) -> FmtResult {
15        f.debug_struct("ServiceActivatorProcessor")
16            .field("id", &self.activator.id())
17            .field("from", &self.activator.from())
18            .field("to", &self.activator.to())
19            .finish()
20    }
21}
22
23impl ServiceActivatorProcessor {
24    pub fn new(activator: ServiceActivatorSpec) -> Self {
25        Self {
26            activator,
27            service: None,
28        }
29    }
30    pub fn id(&self) -> &str {
31        self.activator.id().unwrap_or("")
32    }
33    pub fn from(&self) -> &str {
34        self.activator.from()
35    }
36    pub fn to(&self) -> &str {
37        self.activator.to()
38    }
39    pub fn ref_name(&self) -> &str {
40        self.activator.ref_name()
41    }
42    pub fn has_service(&self) -> bool {
43        self.service.is_some()
44    }
45    pub fn set_service_and_wire(
46        &mut self,
47        svc: impl Service + 'static,
48        runtime: &'static AlloraRuntime,
49    ) -> Result<()> {
50        let arc = Arc::new(svc);
51        let from_id = self.from().to_string();
52        let to_id = self.to().to_string();
53        let inbound = runtime.channel::<crate::DirectChannel>(&from_id);
54        let ref_name = self.ref_name().to_string();
55        let svc_clone = arc.clone();
56        inbound.subscribe(move |exchange: Exchange| {
57            let svc_task = svc_clone.clone();
58            let runtime_ref: &'static AlloraRuntime = runtime;
59            let to_id_clone = to_id.clone();
60            let ref_name_clone = ref_name.clone();
61            tokio::spawn(async move {
62                let mut ex_mut = exchange;
63                if let Err(err) = svc_task.process(&mut ex_mut).await {
64                    tracing::error!(target="allora::service", service.ref=%ref_name_clone, error=%err, "service processing failed");
65                    return;
66                }
67                let outbound = runtime_ref.channel::<crate::DirectChannel>(&to_id_clone);
68                if let Err(err) = outbound.send(ex_mut).await {
69                    tracing::error!(target="allora::service", service.ref=%ref_name_clone, outbound.channel=outbound.id(), error=%err, "outbound send failed");
70                }
71            });
72            Ok(())
73        });
74        self.service = Some(arc);
75        Ok(())
76    }
77}