eva_sdk/
controller.rs

1/// Controller methods and structures
2use eva_common::actions::{self, ACTION_TOPIC};
3use eva_common::events::{RawStateEventOwned, RAW_STATE_TOPIC};
4use eva_common::op::Op;
5use eva_common::prelude::*;
6use parking_lot::Mutex;
7use serde::Deserialize;
8use std::collections::HashMap;
9use std::time::{Duration, Instant};
10use ttl_cache::TtlCache;
11use uuid::Uuid;
12
/// Capacity handed to `TtlCache::new` when a [`RawStateCache`] is created.
const RAW_STATE_CACHE_MAX_CAPACITY: usize = 1_000_000;
14
15#[derive(Clone)]
16pub struct RawStateEventPreparedOwned {
17    state: RawStateEventOwned,
18    delta: Option<f64>,
19}
20
21impl RawStateEventPreparedOwned {
22    #[inline]
23    pub fn from_rse_owned(state: RawStateEventOwned, delta: Option<f64>) -> Self {
24        Self { state, delta }
25    }
26    #[inline]
27    pub fn delta(&self) -> Option<f64> {
28        self.delta
29    }
30    #[inline]
31    pub fn state(&self) -> &RawStateEventOwned {
32        &self.state
33    }
34    #[inline]
35    pub fn state_mut(&mut self) -> &mut RawStateEventOwned {
36        &mut self.state
37    }
38    pub fn is_modified(&self, prev: &RawStateEventOwned) -> bool {
39        if self.state.force == eva_common::events::Force::None && self.state.status == prev.status {
40            if self.state.value == prev.value {
41                return false;
42            }
43            if let Some(delta_v) = self.delta {
44                if let ValueOptionOwned::Value(ref prev_value) = prev.value {
45                    if let ValueOptionOwned::Value(ref current_value) = self.state.value {
46                        if let Ok(prev_value_f) = TryInto::<f64>::try_into(prev_value.clone()) {
47                            if let Ok(current_value_f) =
48                                TryInto::<f64>::try_into(current_value.clone())
49                            {
50                                if (current_value_f - prev_value_f).abs() < delta_v {
51                                    return false;
52                                }
53                            }
54                        }
55                    }
56                }
57            }
58        }
59        true
60    }
61}
62
63impl From<RawStateEventPreparedOwned> for RawStateEventOwned {
64    fn from(s: RawStateEventPreparedOwned) -> Self {
65        s.state
66    }
67}
68
69pub struct RawStateCache {
70    cache: Mutex<TtlCache<OID, RawStateEventPreparedOwned>>,
71    ttl: Option<Duration>,
72}
73
74impl RawStateCache {
75    pub fn new(ttl: Option<Duration>) -> Self {
76        Self {
77            cache: Mutex::new(TtlCache::new(RAW_STATE_CACHE_MAX_CAPACITY)),
78            ttl,
79        }
80    }
81    /// returns true if no ttl set or the state is modified, false if the event is not required to
82    /// be sent
83    ///
84    /// # Panics
85    ///
86    /// Will panic if the mutex is poisoned
87    pub fn push_check(
88        &self,
89        oid: &OID,
90        raw_state: &RawStateEventOwned,
91        delta: Option<f64>,
92    ) -> bool {
93        if let Some(ttl) = self.ttl {
94            let mut cache = self.cache.lock();
95            if let Some(v) = cache.get(oid) {
96                if !v.is_modified(raw_state) {
97                    return false;
98                }
99            }
100            cache.insert(
101                oid.clone(),
102                RawStateEventPreparedOwned::from_rse_owned(raw_state.clone(), delta),
103                ttl,
104            );
105            true
106        } else {
107            false
108        }
109    }
110    /// Removes from the state hashmap, these who are not required to be sent, caches
111    /// remaining
112    ///
113    /// # Panics
114    ///
115    /// Will panic if the mutex is poisoned
116    pub fn retain_map_modified(&self, states: &mut HashMap<&OID, RawStateEventPreparedOwned>) {
117        if let Some(ttl) = self.ttl {
118            let mut cache = self.cache.lock();
119            states.retain(|oid, raw| {
120                if let Some(cached) = cache.get(oid) {
121                    cached.is_modified(raw.state())
122                } else {
123                    true
124                }
125            });
126            // cache kept ones
127            for (oid, raw) in states {
128                cache.insert((*oid).clone(), raw.clone(), ttl);
129            }
130        }
131    }
132}
133
134#[path = "actt.rs"]
135pub mod actt;
136pub use eva_common::transform;
137
/// Error message used when an action carries no params
pub const ERR_NO_PARAMS: &str = "action params not specified";
139
140#[inline]
141pub fn format_action_topic(oid: &OID) -> String {
142    format!("{}{}", ACTION_TOPIC, oid.as_path())
143}
144
145#[inline]
146pub fn format_raw_state_topic(oid: &OID) -> String {
147    format!("{}{}", RAW_STATE_TOPIC, oid.as_path())
148}
149
150#[derive(Deserialize, Debug, Copy, Clone, Eq, PartialEq)]
151#[serde(rename_all = "lowercase")]
152pub enum ItemProp {
153    Status,
154    Value,
155}
156
157/// Controller action object
158#[derive(Deserialize, Debug)]
159#[serde(deny_unknown_fields)]
160pub struct Action {
161    uuid: Uuid,
162    i: OID,
163    #[serde(deserialize_with = "eva_common::tools::deserialize_duration_from_micros")]
164    timeout: Duration,
165    priority: u8,
166    params: Option<actions::Params>,
167    config: Option<Value>,
168    #[serde(skip, default = "Instant::now")]
169    received: Instant,
170}
171
172impl Action {
173    pub fn event_pending(&self) -> actions::ActionEvent {
174        actions::ActionEvent {
175            uuid: self.uuid,
176            status: actions::Status::Pending as u8,
177            out: None,
178            err: None,
179            exitcode: None,
180        }
181    }
182    pub fn event_running(&self) -> actions::ActionEvent {
183        actions::ActionEvent {
184            uuid: self.uuid,
185            status: actions::Status::Running as u8,
186            out: None,
187            err: None,
188            exitcode: None,
189        }
190    }
191    pub fn event_completed(&self, out: Option<Value>) -> actions::ActionEvent {
192        actions::ActionEvent {
193            uuid: self.uuid,
194            status: actions::Status::Completed as u8,
195            out,
196            err: None,
197            exitcode: Some(0),
198        }
199    }
200    pub fn event_failed(
201        &self,
202        exitcode: i16,
203        out: Option<Value>,
204        err: Option<Value>,
205    ) -> actions::ActionEvent {
206        actions::ActionEvent {
207            uuid: self.uuid,
208            status: actions::Status::Failed as u8,
209            out,
210            err,
211            exitcode: Some(exitcode),
212        }
213    }
214    pub fn event_canceled(&self) -> actions::ActionEvent {
215        actions::ActionEvent {
216            uuid: self.uuid,
217            status: actions::Status::Canceled as u8,
218            out: None,
219            err: None,
220            exitcode: None,
221        }
222    }
223    pub fn event_terminated(&self) -> actions::ActionEvent {
224        actions::ActionEvent {
225            uuid: self.uuid,
226            status: actions::Status::Terminated as u8,
227            out: None,
228            err: None,
229            exitcode: Some(-15),
230        }
231    }
232    #[inline]
233    pub fn uuid(&self) -> &Uuid {
234        &self.uuid
235    }
236    #[inline]
237    pub fn oid(&self) -> &OID {
238        &self.i
239    }
240    #[inline]
241    pub fn timeout(&self) -> Duration {
242        self.timeout
243    }
244    #[inline]
245    pub fn priority(&self) -> u8 {
246        self.priority
247    }
248    #[inline]
249    pub fn params(&self) -> Option<&actions::Params> {
250        self.params.as_ref()
251    }
252    #[inline]
253    pub fn take_params(&mut self) -> Option<actions::Params> {
254        self.params.take()
255    }
256    pub fn take_unit_params(&mut self) -> EResult<actions::UnitParams> {
257        if let Some(params) = self.params.take() {
258            match params {
259                eva_common::actions::Params::Unit(p) => Ok(p),
260                eva_common::actions::Params::Lmacro(_) => Err(Error::not_implemented(
261                    "can not exec lmacro action with unit",
262                )),
263            }
264        } else {
265            Err(Error::invalid_data(ERR_NO_PARAMS))
266        }
267    }
268    pub fn take_lmacro_params(&mut self) -> EResult<actions::LmacroParams> {
269        if let Some(params) = self.params.take() {
270            match params {
271                eva_common::actions::Params::Lmacro(p) => Ok(p),
272                eva_common::actions::Params::Unit(_) => Err(Error::not_implemented(
273                    "can not exec unit action with lmacro",
274                )),
275            }
276        } else {
277            Err(Error::invalid_data(ERR_NO_PARAMS))
278        }
279    }
280    #[inline]
281    pub fn config(&self) -> Option<&Value> {
282        self.config.as_ref()
283    }
284    #[inline]
285    pub fn take_config(&mut self) -> Option<Value> {
286        self.config.take()
287    }
288    #[inline]
289    pub fn op(&self) -> Op {
290        Op::for_instant(self.received, self.timeout)
291    }
292}