1use eva_common::actions::{self, ACTION_TOPIC};
3use eva_common::events::{RawStateEventOwned, RAW_STATE_TOPIC};
4use eva_common::op::Op;
5use eva_common::prelude::*;
6use parking_lot::Mutex;
7use serde::Deserialize;
8use std::collections::HashMap;
9use std::time::{Duration, Instant};
10use ttl_cache::TtlCache;
11use uuid::Uuid;
12
13const RAW_STATE_CACHE_MAX_CAPACITY: usize = 1_000_000;
14
15#[derive(Clone)]
16pub struct RawStateEventPreparedOwned {
17 state: RawStateEventOwned,
18 delta: Option<f64>,
19}
20
21impl RawStateEventPreparedOwned {
22 #[inline]
23 pub fn from_rse_owned(state: RawStateEventOwned, delta: Option<f64>) -> Self {
24 Self { state, delta }
25 }
26 #[inline]
27 pub fn delta(&self) -> Option<f64> {
28 self.delta
29 }
30 #[inline]
31 pub fn state(&self) -> &RawStateEventOwned {
32 &self.state
33 }
34 #[inline]
35 pub fn state_mut(&mut self) -> &mut RawStateEventOwned {
36 &mut self.state
37 }
38 pub fn is_modified(&self, prev: &RawStateEventOwned) -> bool {
39 if self.state.force == eva_common::events::Force::None && self.state.status == prev.status {
40 if self.state.value == prev.value {
41 return false;
42 }
43 if let Some(delta_v) = self.delta {
44 if let ValueOptionOwned::Value(ref prev_value) = prev.value {
45 if let ValueOptionOwned::Value(ref current_value) = self.state.value {
46 if let Ok(prev_value_f) = TryInto::<f64>::try_into(prev_value.clone()) {
47 if let Ok(current_value_f) =
48 TryInto::<f64>::try_into(current_value.clone())
49 {
50 if (current_value_f - prev_value_f).abs() < delta_v {
51 return false;
52 }
53 }
54 }
55 }
56 }
57 }
58 }
59 true
60 }
61}
62
63impl From<RawStateEventPreparedOwned> for RawStateEventOwned {
64 fn from(s: RawStateEventPreparedOwned) -> Self {
65 s.state
66 }
67}
68
69pub struct RawStateCache {
70 cache: Mutex<TtlCache<OID, RawStateEventPreparedOwned>>,
71 ttl: Option<Duration>,
72}
73
74impl RawStateCache {
75 pub fn new(ttl: Option<Duration>) -> Self {
76 Self {
77 cache: Mutex::new(TtlCache::new(RAW_STATE_CACHE_MAX_CAPACITY)),
78 ttl,
79 }
80 }
81 pub fn push_check(
88 &self,
89 oid: &OID,
90 raw_state: &RawStateEventOwned,
91 delta: Option<f64>,
92 ) -> bool {
93 if let Some(ttl) = self.ttl {
94 let mut cache = self.cache.lock();
95 if let Some(v) = cache.get(oid) {
96 if !v.is_modified(raw_state) {
97 return false;
98 }
99 }
100 cache.insert(
101 oid.clone(),
102 RawStateEventPreparedOwned::from_rse_owned(raw_state.clone(), delta),
103 ttl,
104 );
105 true
106 } else {
107 false
108 }
109 }
110 pub fn retain_map_modified(&self, states: &mut HashMap<&OID, RawStateEventPreparedOwned>) {
117 if let Some(ttl) = self.ttl {
118 let mut cache = self.cache.lock();
119 states.retain(|oid, raw| {
120 if let Some(cached) = cache.get(oid) {
121 cached.is_modified(raw.state())
122 } else {
123 true
124 }
125 });
126 for (oid, raw) in states {
128 cache.insert((*oid).clone(), raw.clone(), ttl);
129 }
130 }
131 }
132}
133
134#[path = "actt.rs"]
135pub mod actt;
136pub use eva_common::transform;
137
138pub const ERR_NO_PARAMS: &str = "action params not specified";
139
140#[inline]
141pub fn format_action_topic(oid: &OID) -> String {
142 format!("{}{}", ACTION_TOPIC, oid.as_path())
143}
144
145#[inline]
146pub fn format_raw_state_topic(oid: &OID) -> String {
147 format!("{}{}", RAW_STATE_TOPIC, oid.as_path())
148}
149
150#[derive(Deserialize, Debug, Copy, Clone, Eq, PartialEq)]
151#[serde(rename_all = "lowercase")]
152pub enum ItemProp {
153 Status,
154 Value,
155}
156
157#[derive(Deserialize, Debug)]
159#[serde(deny_unknown_fields)]
160pub struct Action {
161 uuid: Uuid,
162 i: OID,
163 #[serde(deserialize_with = "eva_common::tools::deserialize_duration_from_micros")]
164 timeout: Duration,
165 priority: u8,
166 params: Option<actions::Params>,
167 config: Option<Value>,
168 #[serde(skip, default = "Instant::now")]
169 received: Instant,
170}
171
172impl Action {
173 pub fn event_pending(&self) -> actions::ActionEvent {
174 actions::ActionEvent {
175 uuid: self.uuid,
176 status: actions::Status::Pending as u8,
177 out: None,
178 err: None,
179 exitcode: None,
180 }
181 }
182 pub fn event_running(&self) -> actions::ActionEvent {
183 actions::ActionEvent {
184 uuid: self.uuid,
185 status: actions::Status::Running as u8,
186 out: None,
187 err: None,
188 exitcode: None,
189 }
190 }
191 pub fn event_completed(&self, out: Option<Value>) -> actions::ActionEvent {
192 actions::ActionEvent {
193 uuid: self.uuid,
194 status: actions::Status::Completed as u8,
195 out,
196 err: None,
197 exitcode: Some(0),
198 }
199 }
200 pub fn event_failed(
201 &self,
202 exitcode: i16,
203 out: Option<Value>,
204 err: Option<Value>,
205 ) -> actions::ActionEvent {
206 actions::ActionEvent {
207 uuid: self.uuid,
208 status: actions::Status::Failed as u8,
209 out,
210 err,
211 exitcode: Some(exitcode),
212 }
213 }
214 pub fn event_canceled(&self) -> actions::ActionEvent {
215 actions::ActionEvent {
216 uuid: self.uuid,
217 status: actions::Status::Canceled as u8,
218 out: None,
219 err: None,
220 exitcode: None,
221 }
222 }
223 pub fn event_terminated(&self) -> actions::ActionEvent {
224 actions::ActionEvent {
225 uuid: self.uuid,
226 status: actions::Status::Terminated as u8,
227 out: None,
228 err: None,
229 exitcode: Some(-15),
230 }
231 }
232 #[inline]
233 pub fn uuid(&self) -> &Uuid {
234 &self.uuid
235 }
236 #[inline]
237 pub fn oid(&self) -> &OID {
238 &self.i
239 }
240 #[inline]
241 pub fn timeout(&self) -> Duration {
242 self.timeout
243 }
244 #[inline]
245 pub fn priority(&self) -> u8 {
246 self.priority
247 }
248 #[inline]
249 pub fn params(&self) -> Option<&actions::Params> {
250 self.params.as_ref()
251 }
252 #[inline]
253 pub fn take_params(&mut self) -> Option<actions::Params> {
254 self.params.take()
255 }
256 pub fn take_unit_params(&mut self) -> EResult<actions::UnitParams> {
257 if let Some(params) = self.params.take() {
258 match params {
259 eva_common::actions::Params::Unit(p) => Ok(p),
260 eva_common::actions::Params::Lmacro(_) => Err(Error::not_implemented(
261 "can not exec lmacro action with unit",
262 )),
263 }
264 } else {
265 Err(Error::invalid_data(ERR_NO_PARAMS))
266 }
267 }
268 pub fn take_lmacro_params(&mut self) -> EResult<actions::LmacroParams> {
269 if let Some(params) = self.params.take() {
270 match params {
271 eva_common::actions::Params::Lmacro(p) => Ok(p),
272 eva_common::actions::Params::Unit(_) => Err(Error::not_implemented(
273 "can not exec unit action with lmacro",
274 )),
275 }
276 } else {
277 Err(Error::invalid_data(ERR_NO_PARAMS))
278 }
279 }
280 #[inline]
281 pub fn config(&self) -> Option<&Value> {
282 self.config.as_ref()
283 }
284 #[inline]
285 pub fn take_config(&mut self) -> Option<Value> {
286 self.config.take()
287 }
288 #[inline]
289 pub fn op(&self) -> Op {
290 Op::for_instant(self.received, self.timeout)
291 }
292}