eva_sdk/
eapi_bus.rs

1//! Helper module for EAPI micro-services
2use crate::service::{self, EventKind};
3use async_trait::async_trait;
4use busrt::QoS;
5use busrt::client::AsyncClient;
6use busrt::rpc::{Rpc, RpcClient, RpcEvent, RpcHandlers};
7use eva_common::acl::OIDMask;
8use eva_common::common_payloads::ParamsId;
9use eva_common::events::{RAW_STATE_TOPIC, RawStateEvent};
10use eva_common::payload::{pack, unpack};
11use eva_common::prelude::*;
12use eva_common::services::Initial;
13use eva_common::services::Registry;
14use log::error;
15use serde::{Deserialize, Deserializer, Serialize, Serializer, ser::SerializeSeq};
16use std::collections::BTreeMap;
17use std::future::Future;
18use std::sync::{Arc, OnceLock};
19use std::time::Duration;
20use tokio::sync::Mutex;
21use uuid::Uuid;
22
23pub const AAA_REPORT_TOPIC: &str = "AAA/REPORT";
24
25static RPC: OnceLock<Arc<RpcClient>> = OnceLock::new();
26static RPC_SECONDARY: OnceLock<Arc<RpcClient>> = OnceLock::new();
27static REGISTRY: OnceLock<Arc<Registry>> = OnceLock::new();
28static CLIENT: OnceLock<Arc<Mutex<dyn AsyncClient>>> = OnceLock::new();
29static TIMEOUT: OnceLock<Duration> = OnceLock::new();
30
31pub enum LvarCommand<'a> {
32    Set {
33        status: ItemStatus,
34        value: &'a Value,
35    },
36    Reset,
37    Clear,
38    Toggle,
39    Increment,
40    Decrement,
41}
42
43impl LvarCommand<'_> {
44    pub fn as_str(&self) -> &'static str {
45        match self {
46            LvarCommand::Set { .. } => "lvar.set",
47            LvarCommand::Reset => "lvar.reset",
48            LvarCommand::Clear => "lvar.clear",
49            LvarCommand::Toggle => "lvar.toggle",
50            LvarCommand::Increment => "lvar.incr",
51            LvarCommand::Decrement => "lvar.decr",
52        }
53    }
54    /// returns new lvar value for increment/decrement, zero for others
55    pub async fn execute(&self, oid: &OID) -> EResult<i64> {
56        #[derive(Serialize)]
57        struct Payload<'a> {
58            i: &'a OID,
59            #[serde(skip_serializing_if = "Option::is_none")]
60            status: Option<ItemStatus>,
61            #[serde(skip_serializing_if = "Option::is_none")]
62            value: Option<&'a Value>,
63        }
64        let payload = Payload {
65            i: oid,
66            status: match self {
67                LvarCommand::Set { status, .. } => Some(*status),
68                LvarCommand::Reset
69                | LvarCommand::Clear
70                | LvarCommand::Toggle
71                | LvarCommand::Increment
72                | LvarCommand::Decrement => None,
73            },
74            value: match self {
75                LvarCommand::Set { value, .. } => Some(value),
76                LvarCommand::Reset
77                | LvarCommand::Clear
78                | LvarCommand::Toggle
79                | LvarCommand::Increment
80                | LvarCommand::Decrement => None,
81            },
82        };
83        let res = call("eva.core", self.as_str(), pack(&payload)?.into()).await?;
84        match self {
85            LvarCommand::Set { .. }
86            | LvarCommand::Reset
87            | LvarCommand::Clear
88            | LvarCommand::Toggle => Ok(0),
89            LvarCommand::Increment | LvarCommand::Decrement => {
90                let value: i64 = unpack(res.payload())?;
91                Ok(value)
92            }
93        }
94    }
95}
96
97pub async fn lvar_set(oid: &OID, status: ItemStatus, value: &Value) -> EResult<()> {
98    LvarCommand::Set { status, value }.execute(oid).await?;
99    Ok(())
100}
101
102pub async fn lvar_reset(oid: &OID) -> EResult<()> {
103    LvarCommand::Reset.execute(oid).await?;
104    Ok(())
105}
106
107pub async fn lvar_clear(oid: &OID) -> EResult<()> {
108    LvarCommand::Clear.execute(oid).await?;
109    Ok(())
110}
111
112pub async fn lvar_toggle(oid: &OID) -> EResult<()> {
113    LvarCommand::Toggle.execute(oid).await?;
114    Ok(())
115}
116
117pub async fn lvar_increment(oid: &OID) -> EResult<i64> {
118    LvarCommand::Increment.execute(oid).await
119}
120
121pub async fn lvar_decrement(oid: &OID) -> EResult<i64> {
122    LvarCommand::Decrement.execute(oid).await
123}
124
125#[async_trait]
126pub trait ClientAccounting {
127    async fn report<'a, T>(&self, event: T) -> EResult<()>
128    where
129        T: TryInto<busrt::borrow::Cow<'a>> + Send;
130}
131
132#[async_trait]
133impl ClientAccounting for Arc<Mutex<dyn AsyncClient>> {
134    /// # Panics
135    ///
136    /// Will panic if RPC not set
137    async fn report<'a, T>(&self, event: T) -> EResult<()>
138    where
139        T: TryInto<busrt::borrow::Cow<'a>> + Send,
140    {
141        let payload: busrt::borrow::Cow = event
142            .try_into()
143            .map_err(|_| Error::invalid_data("Unable to serialize accounting event"))?;
144        self.lock()
145            .await
146            .publish(AAA_REPORT_TOPIC, payload, QoS::Processed)
147            .await?;
148        Ok(())
149    }
150}
151
152#[allow(clippy::ref_option)]
153fn serialize_opt_uuid_as_seq<S>(uuid: &Option<Uuid>, serializer: S) -> Result<S::Ok, S::Error>
154where
155    S: Serializer,
156{
157    if let Some(u) = uuid {
158        let bytes = u.as_bytes();
159        let mut seq = serializer.serialize_seq(Some(bytes.len()))?;
160        for &byte in bytes {
161            seq.serialize_element(&byte)?;
162        }
163        seq.end()
164    } else {
165        serializer.serialize_none()
166    }
167}
168
169fn deserialize_opt_uuid<'de, D>(deserializer: D) -> Result<Option<Uuid>, D::Error>
170where
171    D: Deserializer<'de>,
172{
173    let val: Value = Deserialize::deserialize(deserializer)?;
174    if val == Value::Unit {
175        Ok(None)
176    } else {
177        Ok(Some(
178            Uuid::deserialize(val).map_err(serde::de::Error::custom)?,
179        ))
180    }
181}
182
183#[derive(Serialize, Deserialize, Default)]
184pub struct AccountingEvent<'a> {
185    // the ID is usually assigned by the accounting service and should be None
186    #[serde(
187        default,
188        skip_serializing_if = "Option::is_none",
189        serialize_with = "serialize_opt_uuid_as_seq",
190        deserialize_with = "deserialize_opt_uuid"
191    )]
192    pub id: Option<Uuid>,
193    #[serde(skip_serializing_if = "Option::is_none")]
194    pub u: Option<&'a str>,
195    #[serde(skip_serializing_if = "Option::is_none")]
196    pub src: Option<&'a str>,
197    #[serde(skip_serializing_if = "Option::is_none")]
198    pub svc: Option<&'a str>,
199    #[serde(skip_serializing_if = "Option::is_none")]
200    pub subj: Option<&'a str>,
201    #[serde(skip_serializing_if = "Option::is_none")]
202    pub oid: Option<OID>,
203    #[serde(default, skip_serializing_if = "Value::is_unit")]
204    pub data: Value,
205    #[serde(skip_serializing_if = "Option::is_none")]
206    pub note: Option<&'a str>,
207    #[serde(default)]
208    pub code: i16,
209    #[serde(skip_serializing_if = "Option::is_none")]
210    pub err: Option<String>,
211}
212
213impl<'a> TryFrom<AccountingEvent<'a>> for busrt::borrow::Cow<'_> {
214    type Error = Error;
215    #[inline]
216    fn try_from(ev: AccountingEvent<'a>) -> EResult<Self> {
217        Ok(busrt::borrow::Cow::Owned(pack(&ev)?))
218    }
219}
220
221impl<'a> TryFrom<&AccountingEvent<'a>> for busrt::borrow::Cow<'_> {
222    type Error = Error;
223    #[inline]
224    fn try_from(ev: &AccountingEvent<'a>) -> EResult<Self> {
225        Ok(busrt::borrow::Cow::Owned(pack(&ev)?))
226    }
227}
228
229impl<'a> AccountingEvent<'a> {
230    #[inline]
231    pub fn new() -> Self {
232        Self::default()
233    }
234    #[inline]
235    pub fn user(mut self, user: &'a str) -> Self {
236        self.u.replace(user);
237        self
238    }
239    #[inline]
240    pub fn src(mut self, src: &'a str) -> Self {
241        self.u.replace(src);
242        self
243    }
244    #[inline]
245    pub fn svc(mut self, svc: &'a str) -> Self {
246        self.u.replace(svc);
247        self
248    }
249    #[inline]
250    pub fn subj(mut self, subj: &'a str) -> Self {
251        self.u.replace(subj);
252        self
253    }
254    #[inline]
255    pub fn data(mut self, data: Value) -> Self {
256        self.data = data;
257        self
258    }
259    #[inline]
260    pub fn note(mut self, note: &'a str) -> Self {
261        self.note.replace(note);
262        self
263    }
264    #[inline]
265    pub fn code(mut self, code: i16) -> Self {
266        self.code = code;
267        self
268    }
269    #[inline]
270    pub fn err(mut self, err: impl Into<String>) -> Self {
271        self.err.replace(err.into());
272        self
273    }
274    /// # Panics
275    ///
276    /// Will panic if module RPC not set
277    pub async fn report(&self) -> EResult<()> {
278        client().report(self).await
279    }
280}
281
282/// Initializes the module
283pub async fn init<H: RpcHandlers + Send + Sync + 'static>(
284    initial: &Initial,
285    handlers: H,
286) -> EResult<Arc<RpcClient>> {
287    let rpc = initial.init_rpc(handlers).await?;
288    set(rpc.clone(), initial.timeout())?;
289    let registry = initial.init_registry(&rpc);
290    set_registry(registry)?;
291    Ok(rpc)
292}
293
294/// Initializes the module in blocking mode with secondary client for calls
295pub async fn init_blocking<H: RpcHandlers + Send + Sync + 'static>(
296    initial: &Initial,
297    handlers: H,
298) -> EResult<(Arc<RpcClient>, Arc<RpcClient>)> {
299    let (rpc, rpc_secondary) = initial.init_rpc_blocking_with_secondary(handlers).await?;
300    set_blocking(rpc.clone(), rpc_secondary.clone(), initial.timeout())?;
301    let registry = initial.init_registry(&rpc);
302    set_registry(registry)?;
303    Ok((rpc, rpc_secondary))
304}
305
306/// Manually initialize the module
307pub fn set(rpc: Arc<RpcClient>, timeout: Duration) -> EResult<()> {
308    CLIENT
309        .set(rpc.client())
310        .map_err(|_| Error::core("Unable to set CLIENT"))?;
311    RPC.set(rpc).map_err(|_| Error::core("Unable to set RPC"))?;
312    TIMEOUT
313        .set(timeout)
314        .map_err(|_| Error::core("Unable to set TIMEOUT"))?;
315    Ok(())
316}
317
318/// Manually initialize the module
319pub fn set_blocking(
320    rpc: Arc<RpcClient>,
321    rpc_secondary: Arc<RpcClient>,
322    timeout: Duration,
323) -> EResult<()> {
324    set(rpc, timeout)?;
325    RPC_SECONDARY
326        .set(rpc_secondary)
327        .map_err(|_| Error::core("Unable to set RPC_SECONDARY"))?;
328    Ok(())
329}
330
331/// Manually initialize registry
332pub fn set_registry(registry: Registry) -> EResult<()> {
333    REGISTRY
334        .set(Arc::new(registry))
335        .map_err(|_| Error::core("Unable to set REGISTRY"))
336}
337
338/// # Panics
339///
340/// Will panic if RPC not set
341pub async fn call0(target: &str, method: &str) -> EResult<RpcEvent> {
342    tokio::time::timeout(
343        timeout(),
344        rpc_secondary().call(target, method, busrt::empty_payload!(), QoS::Processed),
345    )
346    .await?
347    .map_err(Into::into)
348}
349
350/// # Panics
351///
352/// Will panic if RPC not set
353pub async fn call(target: &str, method: &str, params: busrt::borrow::Cow<'_>) -> EResult<RpcEvent> {
354    tokio::time::timeout(
355        timeout(),
356        rpc_secondary().call(target, method, params, QoS::Processed),
357    )
358    .await?
359    .map_err(Into::into)
360}
361
362/// # Panics
363///
364/// Will panic if RPC not set
365pub async fn call_with_timeout(
366    target: &str,
367    method: &str,
368    params: busrt::borrow::Cow<'_>,
369    timeout: Duration,
370) -> EResult<RpcEvent> {
371    tokio::time::timeout(
372        timeout,
373        rpc_secondary().call(target, method, params, QoS::Processed),
374    )
375    .await?
376    .map_err(Into::into)
377}
378
379/// # Panics
380///
381/// Will panic if RPC not set
382#[inline]
383pub fn rpc() -> Arc<RpcClient> {
384    RPC.get().cloned().unwrap()
385}
386
387/// Returns secondary client if initialized, otherwise fallbacks to the primary
388///
389/// Must be used for RPC calls when the primary client works in blocking mode
390///
391/// # Panics
392///
393/// Will panic if RPC not set
394#[inline]
395pub fn rpc_secondary() -> Arc<RpcClient> {
396    if let Some(rpc) = RPC_SECONDARY.get() {
397        rpc.clone()
398    } else {
399        rpc()
400    }
401}
402
403/// # Panics
404///
405/// Will panic if RPC not set
406#[inline]
407pub fn client() -> Arc<Mutex<dyn AsyncClient>> {
408    CLIENT.get().cloned().unwrap()
409}
410
411/// Will return the default EVA ICS timeout (5 sec) if not set
412#[inline]
413pub fn timeout() -> Duration {
414    TIMEOUT
415        .get()
416        .copied()
417        .unwrap_or(eva_common::DEFAULT_TIMEOUT)
418}
419
420/// # Panics
421///
422/// Will panic if REGISTRY not set
423#[inline]
424pub fn registry() -> Arc<Registry> {
425    REGISTRY.get().cloned().unwrap()
426}
427
428///
429/// # Panics
430///
431/// Will panic if RPC not set
432#[inline]
433pub async fn subscribe(topic: &str) -> EResult<()> {
434    tokio::time::timeout(timeout(), subscribe_impl(topic)).await??;
435    Ok(())
436}
437#[inline]
438async fn subscribe_impl(topic: &str) -> EResult<()> {
439    let Some(op) = client()
440        .lock()
441        .await
442        .subscribe(topic, QoS::Processed)
443        .await?
444    else {
445        return Ok(());
446    };
447    op.await??;
448    Ok(())
449}
450
451///
452/// # Panics
453///
454/// Will panic if RPC not set
455#[inline]
456pub async fn subscribe_bulk(topics: &[&str]) -> EResult<()> {
457    tokio::time::timeout(timeout(), subscribe_bulk_impl(topics)).await??;
458    Ok(())
459}
460#[inline]
461async fn subscribe_bulk_impl(topics: &[&str]) -> EResult<()> {
462    let Some(op) = client()
463        .lock()
464        .await
465        .subscribe_bulk(topics, QoS::Processed)
466        .await?
467    else {
468        return Ok(());
469    };
470    op.await??;
471    Ok(())
472}
473
474#[inline]
475pub async fn publish_item_state(
476    oid: &OID,
477    status: ItemStatus,
478    value: Option<&Value>,
479) -> EResult<()> {
480    let ev = value.map_or_else(
481        || RawStateEvent::new0(status),
482        |v| RawStateEvent::new(status, v),
483    );
484    publish_item_state_event(oid, ev).await
485}
486
487#[inline]
488pub async fn publish_item_state_event(oid: &OID, ev: RawStateEvent<'_>) -> EResult<()> {
489    let topic = format!("{}{}", RAW_STATE_TOPIC, oid.as_path());
490    publish(&topic, pack(&ev)?.into()).await?;
491    Ok(())
492}
493
494/// # Panics
495///
496/// Will panic if RPC not set
497#[inline]
498pub async fn publish(topic: &str, payload: busrt::borrow::Cow<'_>) -> EResult<()> {
499    tokio::time::timeout(timeout(), publish_impl(topic, payload)).await??;
500    Ok(())
501}
502#[inline]
503async fn publish_impl(topic: &str, payload: busrt::borrow::Cow<'_>) -> EResult<()> {
504    let Some(op) = client()
505        .lock()
506        .await
507        .publish(topic, payload, QoS::No)
508        .await?
509    else {
510        return Ok(());
511    };
512    op.await??;
513    Ok(())
514}
515
516/// # Panics
517///
518/// Will panic if RPC not set
519#[inline]
520pub async fn publish_confirmed(topic: &str, payload: busrt::borrow::Cow<'_>) -> EResult<()> {
521    tokio::time::timeout(timeout(), publish_confirmed_impl(topic, payload)).await??;
522    Ok(())
523}
524#[inline]
525async fn publish_confirmed_impl(topic: &str, payload: busrt::borrow::Cow<'_>) -> EResult<()> {
526    let Some(op) = client()
527        .lock()
528        .await
529        .publish(topic, payload, QoS::Processed)
530        .await?
531    else {
532        return Ok(());
533    };
534    op.await??;
535    Ok(())
536}
537
538/// # Panics
539///
540/// Will panic if RPC not set
541#[inline]
542pub async fn subscribe_oids<'a, M>(masks: M, kind: EventKind) -> EResult<()>
543where
544    M: IntoIterator<Item = &'a OIDMask>,
545{
546    tokio::time::timeout(timeout(), subscribe_oids_impl(masks, kind)).await??;
547    Ok(())
548}
549#[inline]
550async fn subscribe_oids_impl<'a, M>(masks: M, kind: EventKind) -> EResult<()>
551where
552    M: IntoIterator<Item = &'a OIDMask>,
553{
554    service::subscribe_oids(rpc().as_ref(), masks, kind).await
555}
556
557/// # Panics
558///
559/// Will panic if RPC not set
560#[inline]
561pub async fn unsubscribe_oids<'a, M>(masks: M, kind: EventKind) -> EResult<()>
562where
563    M: IntoIterator<Item = &'a OIDMask>,
564{
565    tokio::time::timeout(timeout(), unsubscribe_oids_impl(masks, kind)).await??;
566    Ok(())
567}
568#[inline]
569async fn unsubscribe_oids_impl<'a, M>(masks: M, kind: EventKind) -> EResult<()>
570where
571    M: IntoIterator<Item = &'a OIDMask>,
572{
573    service::unsubscribe_oids(rpc().as_ref(), masks, kind).await
574}
575
576/// Request announce for the specific items, the core will send states to item state topics,
577/// exclusively for the current service. As the method locks the core inventory for updates, it
578/// should be used with caution
579///
580/// The method will return immediately if the core is inactive as all the states will be
581/// automaticaly announced when the node goes to ready state
582#[inline]
583pub async fn request_announce<'a, M>(masks: M, kind: EventKind) -> EResult<()>
584where
585    M: IntoIterator<Item = &'a OIDMask>,
586{
587    #[derive(Serialize)]
588    struct Payload<'a> {
589        i: Vec<&'a OIDMask>,
590        src: Option<&'a str>,
591        broadcast: bool,
592    }
593    if !service::svc_is_core_active(rpc().as_ref(), timeout()).await {
594        // the core is inactive, no need to announce
595        return Ok(());
596    }
597    let payload = Payload {
598        i: masks.into_iter().collect(),
599        src: match kind {
600            EventKind::Actual | EventKind::Any => None,
601            EventKind::Local => Some(".local"),
602            EventKind::Remote => Some(".remote-any"),
603        },
604        broadcast: false,
605    };
606    call("eva.core", "item.announce", pack(&payload)?.into()).await?;
607    Ok(())
608}
609
610/// # Panics
611///
612/// Will panic if RPC not set
613#[inline]
614pub async fn exclude_oids<'a, M>(masks: M, kind: EventKind) -> EResult<()>
615where
616    M: IntoIterator<Item = &'a OIDMask>,
617{
618    tokio::time::timeout(timeout(), exclude_oids_impl(masks, kind)).await??;
619    Ok(())
620}
621#[inline]
622async fn exclude_oids_impl<'a, M>(masks: M, kind: EventKind) -> EResult<()>
623where
624    M: IntoIterator<Item = &'a OIDMask>,
625{
626    service::exclude_oids(rpc().as_ref(), masks, kind).await
627}
628
629/// Returns true if the core was inactive (not ready) and the service has been waiting for it,
630/// false if the core was already active
631///
632/// # Panics
633///
634/// Will panic if RPC not set
635#[inline]
636pub async fn wait_core(wait_forever: bool) -> EResult<bool> {
637    service::svc_wait_core(rpc().as_ref(), timeout(), wait_forever).await
638}
639
640/// # Panics
641///
642/// Will panic if RPC not set
643#[inline]
644pub fn init_logs(initial: &Initial) -> EResult<()> {
645    service::svc_init_logs(initial, client())
646}
647
648/// calls mark_ready, block and mark_terminating
649///
650/// # Panics
651///
652/// Will panic if RPC not set
653pub async fn run() -> EResult<()> {
654    mark_ready().await?;
655    block().await;
656    mark_terminating().await?;
657    Ok(())
658}
659
660/// # Panics
661///
662/// Will panic if RPC not set
663#[inline]
664pub async fn mark_ready() -> EResult<()> {
665    service::svc_mark_ready(&client()).await
666}
667
668/// # Panics
669///
670/// Will panic if RPC not set
671#[inline]
672pub async fn mark_terminating() -> EResult<()> {
673    service::svc_mark_terminating(&client()).await
674}
675
676/// Must be called once
677pub fn set_bus_error_suicide_timeout(bes_timeout: Duration) -> EResult<()> {
678    service::set_bus_error_suicide_timeout(bes_timeout)
679}
680
681/// Blocks the service while active
682///
683/// In case if the local bus connection is dropped, the service is terminated immediately, as well
684/// as all its subprocesses
685///
686/// This behaviour can be changed by calling set_bus_error_suicide_timeout method and specifying a
687/// proper required shutdown timeout until the service is killed
688///
689/// # Panics
690///
691/// Will panic if RPC not set
692#[inline]
693pub async fn block() {
694    if let Some(secondary) = RPC_SECONDARY.get() {
695        service::svc_block2(rpc().as_ref(), secondary).await;
696    } else {
697        service::svc_block(rpc().as_ref()).await;
698    }
699}
700
701/// Creates items, ignores errors if an item already exists
702///
703/// Must be called after the node core is ready
704///
705/// # Panics
706///
707/// Will panic if RPC not set
708pub async fn create_items<O: AsRef<OID>>(oids: &[O]) -> EResult<()> {
709    for oid in oids {
710        let payload = ParamsId {
711            i: oid.as_ref().as_str(),
712        };
713        if let Err(e) = call("eva.core", "item.create", pack(&payload)?.into()).await
714            && e.kind() != ErrorKind::ResourceAlreadyExists
715        {
716            return Err(e);
717        }
718    }
719    Ok(())
720}
721///
722/// Deploys items
723///
724/// Must be called after the node core is ready
725///
726/// The parameter must contain a list of item deployment payloads is equal to item.deploy
727/// eva.core EAPI call
728/// See also https://info.bma.ai/en/actual/eva4/iac.html#items
729///
730/// The parameter MUST be a collection: either inside the payload, or a list of payloads in
731/// a vector/slice etc.
732///
733/// Example:
734///
735/// ```rust,ignore
736/// let me = initial.id().to_owned();
737/// tokio::spawn(async move {
738///   let _ = eapi_bus::wait_core(true).await;
739///     let x: OID = "lmacro:aaa".parse().unwrap();
740///     let payload = serde_json::json! {[
741///        {
742///          "oid": x,
743///           "action": {"svc": me }
744///        }
745///        ]};
746///        let result = eapi_bus::deploy_items(&payload).await;
747/// });
748/// ```
749///
750/// # Panics
751///
752/// Will panic if RPC not set
753pub async fn deploy_items<T: Serialize>(items: &T) -> EResult<()> {
754    #[derive(Serialize)]
755    struct Payload<'a, T: Serialize> {
756        items: &'a T,
757    }
758    call("eva.core", "item.deploy", pack(&Payload { items })?.into()).await?;
759    Ok(())
760}
761
762pub async fn undeploy_items<T: Serialize>(items: &T) -> EResult<()> {
763    #[derive(Serialize)]
764    struct Payload<'a, T: Serialize> {
765        items: &'a T,
766    }
767    call(
768        "eva.core",
769        "item.undeploy",
770        pack(&Payload { items })?.into(),
771    )
772    .await?;
773    Ok(())
774}
775
776#[derive(Serialize, Debug, Clone)]
777pub struct ParamsRunLmacro {
778    #[serde(skip_serializing_if = "Vec::is_empty")]
779    args: Vec<Value>,
780    #[serde(skip_serializing_if = "BTreeMap::is_empty")]
781    kwargs: BTreeMap<String, Value>,
782    #[serde(skip_serializing)]
783    wait: Duration,
784    #[serde(skip_serializing)]
785    timeout_if_not_finished: bool,
786}
787
788impl Default for ParamsRunLmacro {
789    fn default() -> Self {
790        Self::new()
791    }
792}
793
794impl ParamsRunLmacro {
795    pub fn new() -> Self {
796        Self {
797            args: <_>::default(),
798            kwargs: BTreeMap::new(),
799            wait: eva_common::DEFAULT_TIMEOUT,
800            timeout_if_not_finished: true,
801        }
802    }
803    pub fn arg<V: Serialize>(mut self, arg: V) -> EResult<Self> {
804        self.args.push(to_value(arg)?);
805        Ok(self)
806    }
807    pub fn args<V, I>(mut self, args: I) -> EResult<Self>
808    where
809        V: Serialize,
810        I: IntoIterator<Item = V>,
811    {
812        for arg in args {
813            self.args.push(to_value(arg)?);
814        }
815        Ok(self)
816    }
817    pub fn kwarg<V: Serialize>(mut self, key: impl Into<String>, value: V) -> EResult<Self> {
818        self.kwargs.insert(key.into(), to_value(value)?);
819        Ok(self)
820    }
821    pub fn kwargs<V, I>(mut self, kwargs: I) -> EResult<Self>
822    where
823        V: Serialize,
824        I: IntoIterator<Item = (String, V)>,
825    {
826        for (key, value) in kwargs {
827            self.kwargs.insert(key, to_value(value)?);
828        }
829        Ok(self)
830    }
831    pub fn wait(mut self, wait: Duration) -> Self {
832        self.wait = wait;
833        self
834    }
835    /// Does not return an error if the lmacro action is not finished
836    pub fn allow_unfinished(mut self) -> Self {
837        self.timeout_if_not_finished = false;
838        self
839    }
840}
841
842#[derive(Serialize, Debug, Clone)]
843pub struct ParamsUnitAction {
844    value: Value,
845    #[serde(skip_serializing)]
846    wait: Duration,
847    #[serde(skip_serializing)]
848    timeout_if_not_finished: bool,
849}
850
851impl ParamsUnitAction {
852    pub fn new(value: Value) -> Self {
853        Self {
854            value,
855            wait: eva_common::DEFAULT_TIMEOUT,
856            timeout_if_not_finished: true,
857        }
858    }
859    pub fn wait(mut self, wait: Duration) -> Self {
860        self.wait = wait;
861        self
862    }
863    /// Does not return an error if the unit action is not finished
864    pub fn allow_unfinished(mut self) -> Self {
865        self.timeout_if_not_finished = false;
866        self
867    }
868}
869
870#[derive(Deserialize, Debug, Clone)]
871struct ActionState {
872    exitcode: Option<i16>,
873    #[serde(default)]
874    finished: bool,
875    #[serde(default)]
876    out: Value,
877    #[serde(default)]
878    err: Value,
879}
880
881pub async fn unit_action(i: &OID, params: &ParamsUnitAction) -> EResult<Value> {
882    #[derive(Serialize)]
883    struct Params<'a> {
884        i: &'a OID,
885        #[allow(clippy::struct_field_names)]
886        params: &'a ParamsUnitAction,
887        #[serde(serialize_with = "eva_common::tools::serialize_duration_as_f64")]
888        wait: Duration,
889    }
890    let p = Params {
891        i,
892        params,
893        wait: params.wait,
894    };
895    let payload = pack(&p)?;
896    let recommended_timeout = params.wait + Duration::from_millis(500);
897    let timeout = timeout().max(recommended_timeout);
898    let res: ActionState = unpack(
899        call_with_timeout("eva.core", "action", payload.into(), timeout)
900            .await?
901            .payload(),
902    )?;
903    if (!res.finished || res.exitcode.is_none()) && params.timeout_if_not_finished {
904        return Err(Error::timeout());
905    }
906    if res.exitcode.is_some_and(|code| code != 0) {
907        return Err(Error::failed(res.err.to_string()));
908    }
909    Ok(res.out)
910}
911
912pub async fn run_lmacro(i: &OID, params: &ParamsRunLmacro) -> EResult<Value> {
913    #[derive(Serialize)]
914    struct Params<'a> {
915        i: &'a OID,
916        #[allow(clippy::struct_field_names)]
917        params: &'a ParamsRunLmacro,
918        #[serde(serialize_with = "eva_common::tools::serialize_duration_as_f64")]
919        wait: Duration,
920    }
921    let p = Params {
922        i,
923        params,
924        wait: params.wait,
925    };
926    let payload = pack(&p)?;
927    let recommended_timeout = params.wait + Duration::from_millis(500);
928    let timeout = timeout().max(recommended_timeout);
929    let res: ActionState = unpack(
930        call_with_timeout("eva.core", "run", payload.into(), timeout)
931            .await?
932            .payload(),
933    )?;
934    if (!res.finished || res.exitcode.is_none()) && params.timeout_if_not_finished {
935        return Err(Error::timeout());
936    }
937    if let Some(code) = res.exitcode
938        && code != 0
939    {
940        return Err(Error::failed(res.err.to_string()));
941    }
942    Ok(res.out)
943}
944
945/// Spawns a future which is executed after the node core is ready
946pub fn spawn_when_ready<F>(future: F)
947where
948    F: Future + Send + 'static,
949    F::Output: Send + 'static,
950{
951    tokio::spawn(async move {
952        if let Err(e) = wait_core(true).await {
953            error!("Failed to wait for core: {}", e);
954            return;
955        }
956        future.await;
957    });
958}