eva_sdk/
controller.rs

1use crate::eapi_bus;
2/// Controller methods and structures
3use eva_common::actions::{self, ACTION_TOPIC};
4use eva_common::events::{RAW_STATE_TOPIC, RawStateEventOwned};
5use eva_common::op::Op;
6use eva_common::payload::pack;
7use eva_common::prelude::*;
8use parking_lot::Mutex;
9use serde::Deserialize;
10use std::collections::HashMap;
11use std::future::Future;
12use std::pin::Pin;
13use std::sync::Arc;
14use std::time::{Duration, Instant};
15use ttl_cache::TtlCache;
16use uuid::Uuid;
17
18eva_common::err_logger!();
19
20const RAW_STATE_CACHE_MAX_CAPACITY: usize = 1_000_000;
21
22#[derive(Clone)]
23pub struct RawStateEventPreparedOwned {
24    state: RawStateEventOwned,
25    delta: Option<f64>,
26}
27
28impl RawStateEventPreparedOwned {
29    #[inline]
30    pub fn from_rse_owned(state: RawStateEventOwned, delta: Option<f64>) -> Self {
31        Self { state, delta }
32    }
33    #[inline]
34    pub fn delta(&self) -> Option<f64> {
35        self.delta
36    }
37    #[inline]
38    pub fn state(&self) -> &RawStateEventOwned {
39        &self.state
40    }
41    #[inline]
42    pub fn state_mut(&mut self) -> &mut RawStateEventOwned {
43        &mut self.state
44    }
45    pub fn is_modified(&self, prev: &RawStateEventOwned) -> bool {
46        if self.state.force == eva_common::events::Force::None && self.state.status == prev.status {
47            if self.state.value == prev.value {
48                return false;
49            }
50            if let Some(delta_v) = self.delta
51                && let ValueOptionOwned::Value(ref prev_value) = prev.value
52                && let ValueOptionOwned::Value(ref current_value) = self.state.value
53                && let Ok(prev_value_f) = TryInto::<f64>::try_into(prev_value.clone())
54                && let Ok(current_value_f) = TryInto::<f64>::try_into(current_value.clone())
55                && (current_value_f - prev_value_f).abs() < delta_v
56            {
57                return false;
58            }
59        }
60        true
61    }
62}
63
64impl From<RawStateEventPreparedOwned> for RawStateEventOwned {
65    fn from(s: RawStateEventPreparedOwned) -> Self {
66        s.state
67    }
68}
69
70pub struct RawStateCache {
71    cache: Mutex<TtlCache<OID, RawStateEventPreparedOwned>>,
72    ttl: Option<Duration>,
73}
74
75impl RawStateCache {
76    pub fn new(ttl: Option<Duration>) -> Self {
77        Self {
78            cache: Mutex::new(TtlCache::new(RAW_STATE_CACHE_MAX_CAPACITY)),
79            ttl,
80        }
81    }
82    /// returns true if no ttl set or the state is modified, false if the event is not required to
83    /// be sent
84    ///
85    /// # Panics
86    ///
87    /// Will panic if the mutex is poisoned
88    pub fn push_check(
89        &self,
90        oid: &OID,
91        raw_state: &RawStateEventOwned,
92        delta: Option<f64>,
93    ) -> bool {
94        if let Some(ttl) = self.ttl {
95            let mut cache = self.cache.lock();
96            if let Some(v) = cache.get(oid)
97                && !v.is_modified(raw_state)
98            {
99                return false;
100            }
101            cache.insert(
102                oid.clone(),
103                RawStateEventPreparedOwned::from_rse_owned(raw_state.clone(), delta),
104                ttl,
105            );
106            true
107        } else {
108            false
109        }
110    }
111    /// Removes from the state hashmap, these who are not required to be sent, caches
112    /// remaining
113    ///
114    /// # Panics
115    ///
116    /// Will panic if the mutex is poisoned
117    pub fn retain_map_modified(&self, states: &mut HashMap<&OID, RawStateEventPreparedOwned>) {
118        if let Some(ttl) = self.ttl {
119            let mut cache = self.cache.lock();
120            states.retain(|oid, raw| {
121                if let Some(cached) = cache.get(oid) {
122                    cached.is_modified(raw.state())
123                } else {
124                    true
125                }
126            });
127            // cache kept ones
128            for (oid, raw) in states {
129                cache.insert((*oid).clone(), raw.clone(), ttl);
130            }
131        }
132    }
133}
134
135#[path = "actt.rs"]
136pub mod actt;
137pub use eva_common::transform;
138
139pub const ERR_NO_PARAMS: &str = "action params not specified";
140
141#[inline]
142pub fn format_action_topic(oid: &OID) -> String {
143    format!("{}{}", ACTION_TOPIC, oid.as_path())
144}
145
146#[inline]
147pub fn format_raw_state_topic(oid: &OID) -> String {
148    format!("{}{}", RAW_STATE_TOPIC, oid.as_path())
149}
150
151#[derive(Deserialize, Debug, Copy, Clone, Eq, PartialEq)]
152#[serde(rename_all = "lowercase")]
153pub enum ItemProp {
154    Status,
155    Value,
156}
157
158/// Controller action object
159#[derive(Deserialize, Debug)]
160#[serde(deny_unknown_fields)]
161pub struct Action {
162    uuid: Uuid,
163    i: OID,
164    #[serde(deserialize_with = "eva_common::tools::deserialize_duration_from_micros")]
165    timeout: Duration,
166    priority: u8,
167    params: Option<actions::Params>,
168    config: Option<Value>,
169    #[serde(skip, default = "Instant::now")]
170    received: Instant,
171}
172
173impl Action {
174    pub async fn publish_event_pending(&self) -> EResult<()> {
175        let event = self.event_pending();
176        eapi_bus::publish(&format_action_topic(&self.i), pack(&event)?.into()).await?;
177        Ok(())
178    }
179    pub async fn publish_event_running(&self) -> EResult<()> {
180        let event = self.event_running();
181        eapi_bus::publish(&format_action_topic(&self.i), pack(&event)?.into()).await?;
182        Ok(())
183    }
184    pub async fn publish_event_completed(&self, out: Option<Value>) -> EResult<()> {
185        let event = self.event_completed(out);
186        eapi_bus::publish(&format_action_topic(&self.i), pack(&event)?.into()).await?;
187        Ok(())
188    }
189    pub async fn publish_event_failed(
190        &self,
191        exitcode: i16,
192        out: Option<Value>,
193        err: Option<Value>,
194    ) -> EResult<()> {
195        let event = self.event_failed(exitcode, out, err);
196        eapi_bus::publish(&format_action_topic(&self.i), pack(&event)?.into()).await?;
197        Ok(())
198    }
199    pub async fn publish_event_canceled(&self) -> EResult<()> {
200        let event = self.event_canceled();
201        eapi_bus::publish(&format_action_topic(&self.i), pack(&event)?.into()).await?;
202        Ok(())
203    }
204    pub async fn publish_event_terminated(&self) -> EResult<()> {
205        let event = self.event_terminated();
206        eapi_bus::publish(&format_action_topic(&self.i), pack(&event)?.into()).await?;
207        Ok(())
208    }
209    pub fn event_pending(&self) -> actions::ActionEvent {
210        actions::ActionEvent {
211            uuid: self.uuid,
212            status: actions::Status::Pending as u8,
213            out: None,
214            err: None,
215            exitcode: None,
216        }
217    }
218    pub fn event_running(&self) -> actions::ActionEvent {
219        actions::ActionEvent {
220            uuid: self.uuid,
221            status: actions::Status::Running as u8,
222            out: None,
223            err: None,
224            exitcode: None,
225        }
226    }
227    pub fn event_completed(&self, out: Option<Value>) -> actions::ActionEvent {
228        actions::ActionEvent {
229            uuid: self.uuid,
230            status: actions::Status::Completed as u8,
231            out,
232            err: None,
233            exitcode: Some(0),
234        }
235    }
236    pub fn event_failed(
237        &self,
238        exitcode: i16,
239        out: Option<Value>,
240        err: Option<Value>,
241    ) -> actions::ActionEvent {
242        actions::ActionEvent {
243            uuid: self.uuid,
244            status: actions::Status::Failed as u8,
245            out,
246            err,
247            exitcode: Some(exitcode),
248        }
249    }
250    pub fn event_canceled(&self) -> actions::ActionEvent {
251        actions::ActionEvent {
252            uuid: self.uuid,
253            status: actions::Status::Canceled as u8,
254            out: None,
255            err: None,
256            exitcode: None,
257        }
258    }
259    pub fn event_terminated(&self) -> actions::ActionEvent {
260        actions::ActionEvent {
261            uuid: self.uuid,
262            status: actions::Status::Terminated as u8,
263            out: None,
264            err: None,
265            exitcode: Some(-15),
266        }
267    }
268    #[inline]
269    pub fn uuid(&self) -> &Uuid {
270        &self.uuid
271    }
272    #[inline]
273    pub fn oid(&self) -> &OID {
274        &self.i
275    }
276    #[inline]
277    pub fn timeout(&self) -> Duration {
278        self.timeout
279    }
280    #[inline]
281    pub fn priority(&self) -> u8 {
282        self.priority
283    }
284    #[inline]
285    pub fn params(&self) -> Option<&actions::Params> {
286        self.params.as_ref()
287    }
288    #[inline]
289    pub fn take_params(&mut self) -> Option<actions::Params> {
290        self.params.take()
291    }
292    pub fn take_unit_params(&mut self) -> EResult<actions::UnitParams> {
293        if let Some(params) = self.params.take() {
294            match params {
295                eva_common::actions::Params::Unit(p) => Ok(p),
296                eva_common::actions::Params::Lmacro(_) => Err(Error::not_implemented(
297                    "can not exec lmacro action with unit",
298                )),
299            }
300        } else {
301            Err(Error::invalid_data(ERR_NO_PARAMS))
302        }
303    }
304    pub fn take_lmacro_params(&mut self) -> EResult<actions::LmacroParams> {
305        if let Some(params) = self.params.take() {
306            match params {
307                eva_common::actions::Params::Lmacro(p) => Ok(p),
308                eva_common::actions::Params::Unit(_) => Err(Error::not_implemented(
309                    "can not exec unit action with lmacro",
310                )),
311            }
312        } else {
313            Err(Error::invalid_data(ERR_NO_PARAMS))
314        }
315    }
316    #[inline]
317    pub fn config(&self) -> Option<&Value> {
318        self.config.as_ref()
319    }
320    #[inline]
321    pub fn take_config(&mut self) -> Option<Value> {
322        self.config.take()
323    }
324    #[inline]
325    pub fn op(&self) -> Op {
326        Op::for_instant(self.received, self.timeout)
327    }
328}
329
330type LmacroExecutorFn = Box<
331    dyn Fn(actions::LmacroParams) -> Pin<Box<dyn Future<Output = EResult<Value>> + Send>>
332        + Send
333        + Sync,
334>;
335
336pub struct LmacroProcessor {
337    entries: Arc<HashMap<OID, LmacroExecutorFn>>,
338}
339
340impl Clone for LmacroProcessor {
341    fn clone(&self) -> Self {
342        Self {
343            entries: self.entries.clone(),
344        }
345    }
346}
347
348impl LmacroProcessor {
349    pub fn builder() -> LmacroProcessorBuilder {
350        LmacroProcessorBuilder::new()
351    }
352    pub async fn execute(&self, mut action: Action) {
353        action.publish_event_pending().await.log_ef();
354        let Ok(params) = action.take_lmacro_params() else {
355            action
356                .publish_event_failed(
357                    -1,
358                    None,
359                    Some(Value::String("failed to parse lmacro params".to_owned())),
360                )
361                .await
362                .log_ef();
363            return;
364        };
365        let Some(lmx) = self.entries.get(action.oid()) else {
366            action
367                .publish_event_failed(
368                    -1,
369                    None,
370                    Some(Value::String(format!(
371                        "lmacro not found in executor: {}",
372                        action.oid()
373                    ))),
374                )
375                .await
376                .log_ef();
377            return;
378        };
379        let timeout = action.timeout();
380        action.publish_event_running().await.log_ef();
381        match tokio::time::timeout(timeout, lmx(params)).await {
382            Ok(Ok(v)) => {
383                action.publish_event_completed(Some(v)).await.log_ef();
384            }
385            Ok(Err(e)) => {
386                action
387                    .publish_event_failed(e.code(), None, Some(Value::String(e.to_string())))
388                    .await
389                    .log_ef();
390            }
391            Err(_) => {
392                action
393                    .publish_event_failed(
394                        ErrorKind::Timeout as i16,
395                        None,
396                        Some(Value::String("action timeout".to_owned())),
397                    )
398                    .await
399                    .log_ef();
400            }
401        }
402    }
403}
404
405#[derive(Default)]
406pub struct LmacroProcessorBuilder {
407    entries: HashMap<OID, LmacroExecutorFn>,
408}
409
410impl LmacroProcessorBuilder {
411    pub fn new() -> Self {
412        Self::default()
413    }
414    pub fn register<F, Fut>(&mut self, oid: OID, f: F)
415    where
416        F: Fn(actions::LmacroParams) -> Fut + Send + Sync + 'static,
417        Fut: Future<Output = EResult<Value>> + Send + 'static,
418    {
419        self.entries
420            .insert(oid, Box::new(move |params| Box::pin(f(params))));
421    }
422    pub fn build(self) -> LmacroProcessor {
423        LmacroProcessor {
424            entries: Arc::new(self.entries),
425        }
426    }
427}