eva_sdk/
controller.rs

1use crate::eapi_bus;
2/// Controller methods and structures
3use eva_common::actions::{self, ACTION_TOPIC};
4use eva_common::events::{RAW_STATE_TOPIC, RawStateEventOwned};
5use eva_common::op::Op;
6use eva_common::payload::pack;
7use eva_common::prelude::*;
8use parking_lot::Mutex;
9use serde::Deserialize;
10use std::collections::HashMap;
11use std::time::{Duration, Instant};
12use ttl_cache::TtlCache;
13use uuid::Uuid;
14
15const RAW_STATE_CACHE_MAX_CAPACITY: usize = 1_000_000;
16
17#[derive(Clone)]
18pub struct RawStateEventPreparedOwned {
19    state: RawStateEventOwned,
20    delta: Option<f64>,
21}
22
23impl RawStateEventPreparedOwned {
24    #[inline]
25    pub fn from_rse_owned(state: RawStateEventOwned, delta: Option<f64>) -> Self {
26        Self { state, delta }
27    }
28    #[inline]
29    pub fn delta(&self) -> Option<f64> {
30        self.delta
31    }
32    #[inline]
33    pub fn state(&self) -> &RawStateEventOwned {
34        &self.state
35    }
36    #[inline]
37    pub fn state_mut(&mut self) -> &mut RawStateEventOwned {
38        &mut self.state
39    }
40    pub fn is_modified(&self, prev: &RawStateEventOwned) -> bool {
41        if self.state.force == eva_common::events::Force::None && self.state.status == prev.status {
42            if self.state.value == prev.value {
43                return false;
44            }
45            if let Some(delta_v) = self.delta {
46                if let ValueOptionOwned::Value(ref prev_value) = prev.value {
47                    if let ValueOptionOwned::Value(ref current_value) = self.state.value {
48                        if let Ok(prev_value_f) = TryInto::<f64>::try_into(prev_value.clone()) {
49                            if let Ok(current_value_f) =
50                                TryInto::<f64>::try_into(current_value.clone())
51                            {
52                                if (current_value_f - prev_value_f).abs() < delta_v {
53                                    return false;
54                                }
55                            }
56                        }
57                    }
58                }
59            }
60        }
61        true
62    }
63}
64
65impl From<RawStateEventPreparedOwned> for RawStateEventOwned {
66    fn from(s: RawStateEventPreparedOwned) -> Self {
67        s.state
68    }
69}
70
71pub struct RawStateCache {
72    cache: Mutex<TtlCache<OID, RawStateEventPreparedOwned>>,
73    ttl: Option<Duration>,
74}
75
76impl RawStateCache {
77    pub fn new(ttl: Option<Duration>) -> Self {
78        Self {
79            cache: Mutex::new(TtlCache::new(RAW_STATE_CACHE_MAX_CAPACITY)),
80            ttl,
81        }
82    }
83    /// returns true if no ttl set or the state is modified, false if the event is not required to
84    /// be sent
85    ///
86    /// # Panics
87    ///
88    /// Will panic if the mutex is poisoned
89    pub fn push_check(
90        &self,
91        oid: &OID,
92        raw_state: &RawStateEventOwned,
93        delta: Option<f64>,
94    ) -> bool {
95        if let Some(ttl) = self.ttl {
96            let mut cache = self.cache.lock();
97            if let Some(v) = cache.get(oid) {
98                if !v.is_modified(raw_state) {
99                    return false;
100                }
101            }
102            cache.insert(
103                oid.clone(),
104                RawStateEventPreparedOwned::from_rse_owned(raw_state.clone(), delta),
105                ttl,
106            );
107            true
108        } else {
109            false
110        }
111    }
112    /// Removes from the state hashmap, these who are not required to be sent, caches
113    /// remaining
114    ///
115    /// # Panics
116    ///
117    /// Will panic if the mutex is poisoned
118    pub fn retain_map_modified(&self, states: &mut HashMap<&OID, RawStateEventPreparedOwned>) {
119        if let Some(ttl) = self.ttl {
120            let mut cache = self.cache.lock();
121            states.retain(|oid, raw| {
122                if let Some(cached) = cache.get(oid) {
123                    cached.is_modified(raw.state())
124                } else {
125                    true
126                }
127            });
128            // cache kept ones
129            for (oid, raw) in states {
130                cache.insert((*oid).clone(), raw.clone(), ttl);
131            }
132        }
133    }
134}
135
136#[path = "actt.rs"]
137pub mod actt;
138pub use eva_common::transform;
139
140pub const ERR_NO_PARAMS: &str = "action params not specified";
141
142#[inline]
143pub fn format_action_topic(oid: &OID) -> String {
144    format!("{}{}", ACTION_TOPIC, oid.as_path())
145}
146
147#[inline]
148pub fn format_raw_state_topic(oid: &OID) -> String {
149    format!("{}{}", RAW_STATE_TOPIC, oid.as_path())
150}
151
152#[derive(Deserialize, Debug, Copy, Clone, Eq, PartialEq)]
153#[serde(rename_all = "lowercase")]
154pub enum ItemProp {
155    Status,
156    Value,
157}
158
159/// Controller action object
160#[derive(Deserialize, Debug)]
161#[serde(deny_unknown_fields)]
162pub struct Action {
163    uuid: Uuid,
164    i: OID,
165    #[serde(deserialize_with = "eva_common::tools::deserialize_duration_from_micros")]
166    timeout: Duration,
167    priority: u8,
168    params: Option<actions::Params>,
169    config: Option<Value>,
170    #[serde(skip, default = "Instant::now")]
171    received: Instant,
172}
173
174impl Action {
175    pub async fn publish_event_pending(&self) -> EResult<()> {
176        let event = self.event_pending();
177        eapi_bus::publish(&format_action_topic(&self.i), pack(&event)?.into()).await?;
178        Ok(())
179    }
180    pub async fn publish_event_running(&self) -> EResult<()> {
181        let event = self.event_running();
182        eapi_bus::publish(&format_action_topic(&self.i), pack(&event)?.into()).await?;
183        Ok(())
184    }
185    pub async fn publish_event_completed(&self, out: Option<Value>) -> EResult<()> {
186        let event = self.event_completed(out);
187        eapi_bus::publish(&format_action_topic(&self.i), pack(&event)?.into()).await?;
188        Ok(())
189    }
190    pub async fn publish_event_failed(
191        &self,
192        exitcode: i16,
193        out: Option<Value>,
194        err: Option<Value>,
195    ) -> EResult<()> {
196        let event = self.event_failed(exitcode, out, err);
197        eapi_bus::publish(&format_action_topic(&self.i), pack(&event)?.into()).await?;
198        Ok(())
199    }
200    pub async fn publish_event_canceled(&self) -> EResult<()> {
201        let event = self.event_canceled();
202        eapi_bus::publish(&format_action_topic(&self.i), pack(&event)?.into()).await?;
203        Ok(())
204    }
205    pub async fn publish_event_terminated(&self) -> EResult<()> {
206        let event = self.event_terminated();
207        eapi_bus::publish(&format_action_topic(&self.i), pack(&event)?.into()).await?;
208        Ok(())
209    }
210    pub fn event_pending(&self) -> actions::ActionEvent {
211        actions::ActionEvent {
212            uuid: self.uuid,
213            status: actions::Status::Pending as u8,
214            out: None,
215            err: None,
216            exitcode: None,
217        }
218    }
219    pub fn event_running(&self) -> actions::ActionEvent {
220        actions::ActionEvent {
221            uuid: self.uuid,
222            status: actions::Status::Running as u8,
223            out: None,
224            err: None,
225            exitcode: None,
226        }
227    }
228    pub fn event_completed(&self, out: Option<Value>) -> actions::ActionEvent {
229        actions::ActionEvent {
230            uuid: self.uuid,
231            status: actions::Status::Completed as u8,
232            out,
233            err: None,
234            exitcode: Some(0),
235        }
236    }
237    pub fn event_failed(
238        &self,
239        exitcode: i16,
240        out: Option<Value>,
241        err: Option<Value>,
242    ) -> actions::ActionEvent {
243        actions::ActionEvent {
244            uuid: self.uuid,
245            status: actions::Status::Failed as u8,
246            out,
247            err,
248            exitcode: Some(exitcode),
249        }
250    }
251    pub fn event_canceled(&self) -> actions::ActionEvent {
252        actions::ActionEvent {
253            uuid: self.uuid,
254            status: actions::Status::Canceled as u8,
255            out: None,
256            err: None,
257            exitcode: None,
258        }
259    }
260    pub fn event_terminated(&self) -> actions::ActionEvent {
261        actions::ActionEvent {
262            uuid: self.uuid,
263            status: actions::Status::Terminated as u8,
264            out: None,
265            err: None,
266            exitcode: Some(-15),
267        }
268    }
269    #[inline]
270    pub fn uuid(&self) -> &Uuid {
271        &self.uuid
272    }
273    #[inline]
274    pub fn oid(&self) -> &OID {
275        &self.i
276    }
277    #[inline]
278    pub fn timeout(&self) -> Duration {
279        self.timeout
280    }
281    #[inline]
282    pub fn priority(&self) -> u8 {
283        self.priority
284    }
285    #[inline]
286    pub fn params(&self) -> Option<&actions::Params> {
287        self.params.as_ref()
288    }
289    #[inline]
290    pub fn take_params(&mut self) -> Option<actions::Params> {
291        self.params.take()
292    }
293    pub fn take_unit_params(&mut self) -> EResult<actions::UnitParams> {
294        if let Some(params) = self.params.take() {
295            match params {
296                eva_common::actions::Params::Unit(p) => Ok(p),
297                eva_common::actions::Params::Lmacro(_) => Err(Error::not_implemented(
298                    "can not exec lmacro action with unit",
299                )),
300            }
301        } else {
302            Err(Error::invalid_data(ERR_NO_PARAMS))
303        }
304    }
305    pub fn take_lmacro_params(&mut self) -> EResult<actions::LmacroParams> {
306        if let Some(params) = self.params.take() {
307            match params {
308                eva_common::actions::Params::Lmacro(p) => Ok(p),
309                eva_common::actions::Params::Unit(_) => Err(Error::not_implemented(
310                    "can not exec unit action with lmacro",
311                )),
312            }
313        } else {
314            Err(Error::invalid_data(ERR_NO_PARAMS))
315        }
316    }
317    #[inline]
318    pub fn config(&self) -> Option<&Value> {
319        self.config.as_ref()
320    }
321    #[inline]
322    pub fn take_config(&mut self) -> Option<Value> {
323        self.config.take()
324    }
325    #[inline]
326    pub fn op(&self) -> Op {
327        Op::for_instant(self.received, self.timeout)
328    }
329}