1use crate::service::{self, EventKind};
3use async_trait::async_trait;
4use busrt::QoS;
5use busrt::client::AsyncClient;
6use busrt::rpc::{Rpc, RpcClient, RpcEvent, RpcHandlers};
7use eva_common::acl::OIDMask;
8use eva_common::common_payloads::ParamsId;
9use eva_common::events::{RAW_STATE_TOPIC, RawStateEvent};
10use eva_common::payload::{pack, unpack};
11use eva_common::prelude::*;
12use eva_common::services::Initial;
13use eva_common::services::Registry;
14use log::error;
15use serde::{Deserialize, Deserializer, Serialize, Serializer, ser::SerializeSeq};
16use std::collections::BTreeMap;
17use std::future::Future;
18use std::sync::{Arc, OnceLock};
19use std::time::Duration;
20use tokio::sync::Mutex;
21use uuid::Uuid;
22
23pub const AAA_REPORT_TOPIC: &str = "AAA/REPORT";
24
25static RPC: OnceLock<Arc<RpcClient>> = OnceLock::new();
26static RPC_SECONDARY: OnceLock<Arc<RpcClient>> = OnceLock::new();
27static REGISTRY: OnceLock<Arc<Registry>> = OnceLock::new();
28static CLIENT: OnceLock<Arc<Mutex<dyn AsyncClient>>> = OnceLock::new();
29static TIMEOUT: OnceLock<Duration> = OnceLock::new();
30
31pub enum LvarCommand<'a> {
32 Set {
33 status: ItemStatus,
34 value: &'a Value,
35 },
36 Reset,
37 Clear,
38 Toggle,
39 Increment,
40 Decrement,
41}
42
43impl LvarCommand<'_> {
44 pub fn as_str(&self) -> &'static str {
45 match self {
46 LvarCommand::Set { .. } => "lvar.set",
47 LvarCommand::Reset => "lvar.reset",
48 LvarCommand::Clear => "lvar.clear",
49 LvarCommand::Toggle => "lvar.toggle",
50 LvarCommand::Increment => "lvar.incr",
51 LvarCommand::Decrement => "lvar.decr",
52 }
53 }
54 pub async fn execute(&self, oid: &OID) -> EResult<i64> {
56 #[derive(Serialize)]
57 struct Payload<'a> {
58 i: &'a OID,
59 #[serde(skip_serializing_if = "Option::is_none")]
60 status: Option<ItemStatus>,
61 #[serde(skip_serializing_if = "Option::is_none")]
62 value: Option<&'a Value>,
63 }
64 let payload = Payload {
65 i: oid,
66 status: match self {
67 LvarCommand::Set { status, .. } => Some(*status),
68 LvarCommand::Reset
69 | LvarCommand::Clear
70 | LvarCommand::Toggle
71 | LvarCommand::Increment
72 | LvarCommand::Decrement => None,
73 },
74 value: match self {
75 LvarCommand::Set { value, .. } => Some(value),
76 LvarCommand::Reset
77 | LvarCommand::Clear
78 | LvarCommand::Toggle
79 | LvarCommand::Increment
80 | LvarCommand::Decrement => None,
81 },
82 };
83 let res = call("eva.core", self.as_str(), pack(&payload)?.into()).await?;
84 match self {
85 LvarCommand::Set { .. }
86 | LvarCommand::Reset
87 | LvarCommand::Clear
88 | LvarCommand::Toggle => Ok(0),
89 LvarCommand::Increment | LvarCommand::Decrement => {
90 let value: i64 = unpack(res.payload())?;
91 Ok(value)
92 }
93 }
94 }
95}
96
97pub async fn lvar_set(oid: &OID, status: ItemStatus, value: &Value) -> EResult<()> {
98 LvarCommand::Set { status, value }.execute(oid).await?;
99 Ok(())
100}
101
102pub async fn lvar_reset(oid: &OID) -> EResult<()> {
103 LvarCommand::Reset.execute(oid).await?;
104 Ok(())
105}
106
107pub async fn lvar_clear(oid: &OID) -> EResult<()> {
108 LvarCommand::Clear.execute(oid).await?;
109 Ok(())
110}
111
112pub async fn lvar_toggle(oid: &OID) -> EResult<()> {
113 LvarCommand::Toggle.execute(oid).await?;
114 Ok(())
115}
116
117pub async fn lvar_increment(oid: &OID) -> EResult<i64> {
118 LvarCommand::Increment.execute(oid).await
119}
120
121pub async fn lvar_decrement(oid: &OID) -> EResult<i64> {
122 LvarCommand::Decrement.execute(oid).await
123}
124
125#[async_trait]
126pub trait ClientAccounting {
127 async fn report<'a, T>(&self, event: T) -> EResult<()>
128 where
129 T: TryInto<busrt::borrow::Cow<'a>> + Send;
130}
131
132#[async_trait]
133impl ClientAccounting for Arc<Mutex<dyn AsyncClient>> {
134 async fn report<'a, T>(&self, event: T) -> EResult<()>
138 where
139 T: TryInto<busrt::borrow::Cow<'a>> + Send,
140 {
141 let payload: busrt::borrow::Cow = event
142 .try_into()
143 .map_err(|_| Error::invalid_data("Unable to serialize accounting event"))?;
144 self.lock()
145 .await
146 .publish(AAA_REPORT_TOPIC, payload, QoS::Processed)
147 .await?;
148 Ok(())
149 }
150}
151
152#[allow(clippy::ref_option)]
153fn serialize_opt_uuid_as_seq<S>(uuid: &Option<Uuid>, serializer: S) -> Result<S::Ok, S::Error>
154where
155 S: Serializer,
156{
157 if let Some(u) = uuid {
158 let bytes = u.as_bytes();
159 let mut seq = serializer.serialize_seq(Some(bytes.len()))?;
160 for &byte in bytes {
161 seq.serialize_element(&byte)?;
162 }
163 seq.end()
164 } else {
165 serializer.serialize_none()
166 }
167}
168
169fn deserialize_opt_uuid<'de, D>(deserializer: D) -> Result<Option<Uuid>, D::Error>
170where
171 D: Deserializer<'de>,
172{
173 let val: Value = Deserialize::deserialize(deserializer)?;
174 if val == Value::Unit {
175 Ok(None)
176 } else {
177 Ok(Some(
178 Uuid::deserialize(val).map_err(serde::de::Error::custom)?,
179 ))
180 }
181}
182
183#[derive(Serialize, Deserialize, Default)]
184pub struct AccountingEvent<'a> {
185 #[serde(
187 default,
188 skip_serializing_if = "Option::is_none",
189 serialize_with = "serialize_opt_uuid_as_seq",
190 deserialize_with = "deserialize_opt_uuid"
191 )]
192 pub id: Option<Uuid>,
193 #[serde(skip_serializing_if = "Option::is_none")]
194 pub u: Option<&'a str>,
195 #[serde(skip_serializing_if = "Option::is_none")]
196 pub src: Option<&'a str>,
197 #[serde(skip_serializing_if = "Option::is_none")]
198 pub svc: Option<&'a str>,
199 #[serde(skip_serializing_if = "Option::is_none")]
200 pub subj: Option<&'a str>,
201 #[serde(skip_serializing_if = "Option::is_none")]
202 pub oid: Option<OID>,
203 #[serde(default, skip_serializing_if = "Value::is_unit")]
204 pub data: Value,
205 #[serde(skip_serializing_if = "Option::is_none")]
206 pub note: Option<&'a str>,
207 #[serde(default)]
208 pub code: i16,
209 #[serde(skip_serializing_if = "Option::is_none")]
210 pub err: Option<String>,
211}
212
213impl<'a> TryFrom<AccountingEvent<'a>> for busrt::borrow::Cow<'_> {
214 type Error = Error;
215 #[inline]
216 fn try_from(ev: AccountingEvent<'a>) -> EResult<Self> {
217 Ok(busrt::borrow::Cow::Owned(pack(&ev)?))
218 }
219}
220
221impl<'a> TryFrom<&AccountingEvent<'a>> for busrt::borrow::Cow<'_> {
222 type Error = Error;
223 #[inline]
224 fn try_from(ev: &AccountingEvent<'a>) -> EResult<Self> {
225 Ok(busrt::borrow::Cow::Owned(pack(&ev)?))
226 }
227}
228
229impl<'a> AccountingEvent<'a> {
230 #[inline]
231 pub fn new() -> Self {
232 Self::default()
233 }
234 #[inline]
235 pub fn user(mut self, user: &'a str) -> Self {
236 self.u.replace(user);
237 self
238 }
239 #[inline]
240 pub fn src(mut self, src: &'a str) -> Self {
241 self.u.replace(src);
242 self
243 }
244 #[inline]
245 pub fn svc(mut self, svc: &'a str) -> Self {
246 self.u.replace(svc);
247 self
248 }
249 #[inline]
250 pub fn subj(mut self, subj: &'a str) -> Self {
251 self.u.replace(subj);
252 self
253 }
254 #[inline]
255 pub fn data(mut self, data: Value) -> Self {
256 self.data = data;
257 self
258 }
259 #[inline]
260 pub fn note(mut self, note: &'a str) -> Self {
261 self.note.replace(note);
262 self
263 }
264 #[inline]
265 pub fn code(mut self, code: i16) -> Self {
266 self.code = code;
267 self
268 }
269 #[inline]
270 pub fn err(mut self, err: impl Into<String>) -> Self {
271 self.err.replace(err.into());
272 self
273 }
274 pub async fn report(&self) -> EResult<()> {
278 client().report(self).await
279 }
280}
281
282pub async fn init<H: RpcHandlers + Send + Sync + 'static>(
284 initial: &Initial,
285 handlers: H,
286) -> EResult<Arc<RpcClient>> {
287 let rpc = initial.init_rpc(handlers).await?;
288 set(rpc.clone(), initial.timeout())?;
289 let registry = initial.init_registry(&rpc);
290 set_registry(registry)?;
291 Ok(rpc)
292}
293
294pub async fn init_blocking<H: RpcHandlers + Send + Sync + 'static>(
296 initial: &Initial,
297 handlers: H,
298) -> EResult<(Arc<RpcClient>, Arc<RpcClient>)> {
299 let (rpc, rpc_secondary) = initial.init_rpc_blocking_with_secondary(handlers).await?;
300 set_blocking(rpc.clone(), rpc_secondary.clone(), initial.timeout())?;
301 let registry = initial.init_registry(&rpc);
302 set_registry(registry)?;
303 Ok((rpc, rpc_secondary))
304}
305
306pub fn set(rpc: Arc<RpcClient>, timeout: Duration) -> EResult<()> {
308 CLIENT
309 .set(rpc.client())
310 .map_err(|_| Error::core("Unable to set CLIENT"))?;
311 RPC.set(rpc).map_err(|_| Error::core("Unable to set RPC"))?;
312 TIMEOUT
313 .set(timeout)
314 .map_err(|_| Error::core("Unable to set TIMEOUT"))?;
315 Ok(())
316}
317
318pub fn set_blocking(
320 rpc: Arc<RpcClient>,
321 rpc_secondary: Arc<RpcClient>,
322 timeout: Duration,
323) -> EResult<()> {
324 set(rpc, timeout)?;
325 RPC_SECONDARY
326 .set(rpc_secondary)
327 .map_err(|_| Error::core("Unable to set RPC_SECONDARY"))?;
328 Ok(())
329}
330
331pub fn set_registry(registry: Registry) -> EResult<()> {
333 REGISTRY
334 .set(Arc::new(registry))
335 .map_err(|_| Error::core("Unable to set REGISTRY"))
336}
337
338pub async fn call0(target: &str, method: &str) -> EResult<RpcEvent> {
342 tokio::time::timeout(
343 timeout(),
344 rpc_secondary().call(target, method, busrt::empty_payload!(), QoS::Processed),
345 )
346 .await?
347 .map_err(Into::into)
348}
349
350pub async fn call(target: &str, method: &str, params: busrt::borrow::Cow<'_>) -> EResult<RpcEvent> {
354 tokio::time::timeout(
355 timeout(),
356 rpc_secondary().call(target, method, params, QoS::Processed),
357 )
358 .await?
359 .map_err(Into::into)
360}
361
362pub async fn call_with_timeout(
366 target: &str,
367 method: &str,
368 params: busrt::borrow::Cow<'_>,
369 timeout: Duration,
370) -> EResult<RpcEvent> {
371 tokio::time::timeout(
372 timeout,
373 rpc_secondary().call(target, method, params, QoS::Processed),
374 )
375 .await?
376 .map_err(Into::into)
377}
378
379#[inline]
383pub fn rpc() -> Arc<RpcClient> {
384 RPC.get().cloned().unwrap()
385}
386
387#[inline]
395pub fn rpc_secondary() -> Arc<RpcClient> {
396 if let Some(rpc) = RPC_SECONDARY.get() {
397 rpc.clone()
398 } else {
399 rpc()
400 }
401}
402
403#[inline]
407pub fn client() -> Arc<Mutex<dyn AsyncClient>> {
408 CLIENT.get().cloned().unwrap()
409}
410
411#[inline]
413pub fn timeout() -> Duration {
414 TIMEOUT
415 .get()
416 .copied()
417 .unwrap_or(eva_common::DEFAULT_TIMEOUT)
418}
419
420#[inline]
424pub fn registry() -> Arc<Registry> {
425 REGISTRY.get().cloned().unwrap()
426}
427
428#[inline]
433pub async fn subscribe(topic: &str) -> EResult<()> {
434 tokio::time::timeout(timeout(), subscribe_impl(topic)).await??;
435 Ok(())
436}
437#[inline]
438async fn subscribe_impl(topic: &str) -> EResult<()> {
439 let Some(op) = client()
440 .lock()
441 .await
442 .subscribe(topic, QoS::Processed)
443 .await?
444 else {
445 return Ok(());
446 };
447 op.await??;
448 Ok(())
449}
450
451#[inline]
456pub async fn subscribe_bulk(topics: &[&str]) -> EResult<()> {
457 tokio::time::timeout(timeout(), subscribe_bulk_impl(topics)).await??;
458 Ok(())
459}
460#[inline]
461async fn subscribe_bulk_impl(topics: &[&str]) -> EResult<()> {
462 let Some(op) = client()
463 .lock()
464 .await
465 .subscribe_bulk(topics, QoS::Processed)
466 .await?
467 else {
468 return Ok(());
469 };
470 op.await??;
471 Ok(())
472}
473
474#[inline]
475pub async fn publish_item_state(
476 oid: &OID,
477 status: ItemStatus,
478 value: Option<&Value>,
479) -> EResult<()> {
480 let ev = value.map_or_else(
481 || RawStateEvent::new0(status),
482 |v| RawStateEvent::new(status, v),
483 );
484 publish_item_state_event(oid, ev).await
485}
486
487#[inline]
488pub async fn publish_item_state_event(oid: &OID, ev: RawStateEvent<'_>) -> EResult<()> {
489 let topic = format!("{}{}", RAW_STATE_TOPIC, oid.as_path());
490 publish(&topic, pack(&ev)?.into()).await?;
491 Ok(())
492}
493
494#[inline]
498pub async fn publish(topic: &str, payload: busrt::borrow::Cow<'_>) -> EResult<()> {
499 tokio::time::timeout(timeout(), publish_impl(topic, payload)).await??;
500 Ok(())
501}
502#[inline]
503async fn publish_impl(topic: &str, payload: busrt::borrow::Cow<'_>) -> EResult<()> {
504 let Some(op) = client()
505 .lock()
506 .await
507 .publish(topic, payload, QoS::No)
508 .await?
509 else {
510 return Ok(());
511 };
512 op.await??;
513 Ok(())
514}
515
516#[inline]
520pub async fn publish_confirmed(topic: &str, payload: busrt::borrow::Cow<'_>) -> EResult<()> {
521 tokio::time::timeout(timeout(), publish_confirmed_impl(topic, payload)).await??;
522 Ok(())
523}
524#[inline]
525async fn publish_confirmed_impl(topic: &str, payload: busrt::borrow::Cow<'_>) -> EResult<()> {
526 let Some(op) = client()
527 .lock()
528 .await
529 .publish(topic, payload, QoS::Processed)
530 .await?
531 else {
532 return Ok(());
533 };
534 op.await??;
535 Ok(())
536}
537
538#[inline]
542pub async fn subscribe_oids<'a, M>(masks: M, kind: EventKind) -> EResult<()>
543where
544 M: IntoIterator<Item = &'a OIDMask>,
545{
546 tokio::time::timeout(timeout(), subscribe_oids_impl(masks, kind)).await??;
547 Ok(())
548}
549#[inline]
550async fn subscribe_oids_impl<'a, M>(masks: M, kind: EventKind) -> EResult<()>
551where
552 M: IntoIterator<Item = &'a OIDMask>,
553{
554 service::subscribe_oids(rpc().as_ref(), masks, kind).await
555}
556
557#[inline]
561pub async fn unsubscribe_oids<'a, M>(masks: M, kind: EventKind) -> EResult<()>
562where
563 M: IntoIterator<Item = &'a OIDMask>,
564{
565 tokio::time::timeout(timeout(), unsubscribe_oids_impl(masks, kind)).await??;
566 Ok(())
567}
568#[inline]
569async fn unsubscribe_oids_impl<'a, M>(masks: M, kind: EventKind) -> EResult<()>
570where
571 M: IntoIterator<Item = &'a OIDMask>,
572{
573 service::unsubscribe_oids(rpc().as_ref(), masks, kind).await
574}
575
576#[inline]
583pub async fn request_announce<'a, M>(masks: M, kind: EventKind) -> EResult<()>
584where
585 M: IntoIterator<Item = &'a OIDMask>,
586{
587 #[derive(Serialize)]
588 struct Payload<'a> {
589 i: Vec<&'a OIDMask>,
590 src: Option<&'a str>,
591 broadcast: bool,
592 }
593 if !service::svc_is_core_active(rpc().as_ref(), timeout()).await {
594 return Ok(());
596 }
597 let payload = Payload {
598 i: masks.into_iter().collect(),
599 src: match kind {
600 EventKind::Actual | EventKind::Any => None,
601 EventKind::Local => Some(".local"),
602 EventKind::Remote => Some(".remote-any"),
603 },
604 broadcast: false,
605 };
606 call("eva.core", "item.announce", pack(&payload)?.into()).await?;
607 Ok(())
608}
609
610#[inline]
614pub async fn exclude_oids<'a, M>(masks: M, kind: EventKind) -> EResult<()>
615where
616 M: IntoIterator<Item = &'a OIDMask>,
617{
618 tokio::time::timeout(timeout(), exclude_oids_impl(masks, kind)).await??;
619 Ok(())
620}
621#[inline]
622async fn exclude_oids_impl<'a, M>(masks: M, kind: EventKind) -> EResult<()>
623where
624 M: IntoIterator<Item = &'a OIDMask>,
625{
626 service::exclude_oids(rpc().as_ref(), masks, kind).await
627}
628
629#[inline]
636pub async fn wait_core(wait_forever: bool) -> EResult<bool> {
637 service::svc_wait_core(rpc().as_ref(), timeout(), wait_forever).await
638}
639
640#[inline]
644pub fn init_logs(initial: &Initial) -> EResult<()> {
645 service::svc_init_logs(initial, client())
646}
647
648pub async fn run() -> EResult<()> {
654 mark_ready().await?;
655 block().await;
656 mark_terminating().await?;
657 Ok(())
658}
659
660#[inline]
664pub async fn mark_ready() -> EResult<()> {
665 service::svc_mark_ready(&client()).await
666}
667
668#[inline]
672pub async fn mark_terminating() -> EResult<()> {
673 service::svc_mark_terminating(&client()).await
674}
675
676pub fn set_bus_error_suicide_timeout(bes_timeout: Duration) -> EResult<()> {
678 service::set_bus_error_suicide_timeout(bes_timeout)
679}
680
681#[inline]
693pub async fn block() {
694 if let Some(secondary) = RPC_SECONDARY.get() {
695 service::svc_block2(rpc().as_ref(), secondary).await;
696 } else {
697 service::svc_block(rpc().as_ref()).await;
698 }
699}
700
701pub async fn create_items<O: AsRef<OID>>(oids: &[O]) -> EResult<()> {
709 for oid in oids {
710 let payload = ParamsId {
711 i: oid.as_ref().as_str(),
712 };
713 if let Err(e) = call("eva.core", "item.create", pack(&payload)?.into()).await
714 && e.kind() != ErrorKind::ResourceAlreadyExists
715 {
716 return Err(e);
717 }
718 }
719 Ok(())
720}
721pub async fn deploy_items<T: Serialize>(items: &T) -> EResult<()> {
754 #[derive(Serialize)]
755 struct Payload<'a, T: Serialize> {
756 items: &'a T,
757 }
758 call("eva.core", "item.deploy", pack(&Payload { items })?.into()).await?;
759 Ok(())
760}
761
762pub async fn undeploy_items<T: Serialize>(items: &T) -> EResult<()> {
763 #[derive(Serialize)]
764 struct Payload<'a, T: Serialize> {
765 items: &'a T,
766 }
767 call(
768 "eva.core",
769 "item.undeploy",
770 pack(&Payload { items })?.into(),
771 )
772 .await?;
773 Ok(())
774}
775
776#[derive(Serialize, Debug, Clone)]
777pub struct ParamsRunLmacro {
778 #[serde(skip_serializing_if = "Vec::is_empty")]
779 args: Vec<Value>,
780 #[serde(skip_serializing_if = "BTreeMap::is_empty")]
781 kwargs: BTreeMap<String, Value>,
782 #[serde(skip_serializing)]
783 wait: Duration,
784 #[serde(skip_serializing)]
785 timeout_if_not_finished: bool,
786}
787
788impl Default for ParamsRunLmacro {
789 fn default() -> Self {
790 Self::new()
791 }
792}
793
794impl ParamsRunLmacro {
795 pub fn new() -> Self {
796 Self {
797 args: <_>::default(),
798 kwargs: BTreeMap::new(),
799 wait: eva_common::DEFAULT_TIMEOUT,
800 timeout_if_not_finished: true,
801 }
802 }
803 pub fn arg<V: Serialize>(mut self, arg: V) -> EResult<Self> {
804 self.args.push(to_value(arg)?);
805 Ok(self)
806 }
807 pub fn args<V, I>(mut self, args: I) -> EResult<Self>
808 where
809 V: Serialize,
810 I: IntoIterator<Item = V>,
811 {
812 for arg in args {
813 self.args.push(to_value(arg)?);
814 }
815 Ok(self)
816 }
817 pub fn kwarg<V: Serialize>(mut self, key: impl Into<String>, value: V) -> EResult<Self> {
818 self.kwargs.insert(key.into(), to_value(value)?);
819 Ok(self)
820 }
821 pub fn kwargs<V, I>(mut self, kwargs: I) -> EResult<Self>
822 where
823 V: Serialize,
824 I: IntoIterator<Item = (String, V)>,
825 {
826 for (key, value) in kwargs {
827 self.kwargs.insert(key, to_value(value)?);
828 }
829 Ok(self)
830 }
831 pub fn wait(mut self, wait: Duration) -> Self {
832 self.wait = wait;
833 self
834 }
835 pub fn allow_unfinished(mut self) -> Self {
837 self.timeout_if_not_finished = false;
838 self
839 }
840}
841
842#[derive(Serialize, Debug, Clone)]
843pub struct ParamsUnitAction {
844 value: Value,
845 #[serde(skip_serializing)]
846 wait: Duration,
847 #[serde(skip_serializing)]
848 timeout_if_not_finished: bool,
849}
850
851impl ParamsUnitAction {
852 pub fn new(value: Value) -> Self {
853 Self {
854 value,
855 wait: eva_common::DEFAULT_TIMEOUT,
856 timeout_if_not_finished: true,
857 }
858 }
859 pub fn wait(mut self, wait: Duration) -> Self {
860 self.wait = wait;
861 self
862 }
863 pub fn allow_unfinished(mut self) -> Self {
865 self.timeout_if_not_finished = false;
866 self
867 }
868}
869
870#[derive(Deserialize, Debug, Clone)]
871struct ActionState {
872 exitcode: Option<i16>,
873 #[serde(default)]
874 finished: bool,
875 #[serde(default)]
876 out: Value,
877 #[serde(default)]
878 err: Value,
879}
880
881pub async fn unit_action(i: &OID, params: &ParamsUnitAction) -> EResult<Value> {
882 #[derive(Serialize)]
883 struct Params<'a> {
884 i: &'a OID,
885 #[allow(clippy::struct_field_names)]
886 params: &'a ParamsUnitAction,
887 #[serde(serialize_with = "eva_common::tools::serialize_duration_as_f64")]
888 wait: Duration,
889 }
890 let p = Params {
891 i,
892 params,
893 wait: params.wait,
894 };
895 let payload = pack(&p)?;
896 let recommended_timeout = params.wait + Duration::from_millis(500);
897 let timeout = timeout().max(recommended_timeout);
898 let res: ActionState = unpack(
899 call_with_timeout("eva.core", "action", payload.into(), timeout)
900 .await?
901 .payload(),
902 )?;
903 if (!res.finished || res.exitcode.is_none()) && params.timeout_if_not_finished {
904 return Err(Error::timeout());
905 }
906 if res.exitcode.is_some_and(|code| code != 0) {
907 return Err(Error::failed(res.err.to_string()));
908 }
909 Ok(res.out)
910}
911
912pub async fn run_lmacro(i: &OID, params: &ParamsRunLmacro) -> EResult<Value> {
913 #[derive(Serialize)]
914 struct Params<'a> {
915 i: &'a OID,
916 #[allow(clippy::struct_field_names)]
917 params: &'a ParamsRunLmacro,
918 #[serde(serialize_with = "eva_common::tools::serialize_duration_as_f64")]
919 wait: Duration,
920 }
921 let p = Params {
922 i,
923 params,
924 wait: params.wait,
925 };
926 let payload = pack(&p)?;
927 let recommended_timeout = params.wait + Duration::from_millis(500);
928 let timeout = timeout().max(recommended_timeout);
929 let res: ActionState = unpack(
930 call_with_timeout("eva.core", "run", payload.into(), timeout)
931 .await?
932 .payload(),
933 )?;
934 if (!res.finished || res.exitcode.is_none()) && params.timeout_if_not_finished {
935 return Err(Error::timeout());
936 }
937 if let Some(code) = res.exitcode
938 && code != 0
939 {
940 return Err(Error::failed(res.err.to_string()));
941 }
942 Ok(res.out)
943}
944
945pub fn spawn_when_ready<F>(future: F)
947where
948 F: Future + Send + 'static,
949 F::Output: Send + 'static,
950{
951 tokio::spawn(async move {
952 if let Err(e) = wait_core(true).await {
953 error!("Failed to wait for core: {}", e);
954 return;
955 }
956 future.await;
957 });
958}