1use crate::eapi_bus;
2use eva_common::actions::{self, ACTION_TOPIC};
4use eva_common::events::{RAW_STATE_TOPIC, RawStateEventOwned};
5use eva_common::op::Op;
6use eva_common::payload::pack;
7use eva_common::prelude::*;
8use parking_lot::Mutex;
9use serde::Deserialize;
10use std::collections::HashMap;
11use std::time::{Duration, Instant};
12use ttl_cache::TtlCache;
13use uuid::Uuid;
14
15const RAW_STATE_CACHE_MAX_CAPACITY: usize = 1_000_000;
16
17#[derive(Clone)]
18pub struct RawStateEventPreparedOwned {
19 state: RawStateEventOwned,
20 delta: Option<f64>,
21}
22
23impl RawStateEventPreparedOwned {
24 #[inline]
25 pub fn from_rse_owned(state: RawStateEventOwned, delta: Option<f64>) -> Self {
26 Self { state, delta }
27 }
28 #[inline]
29 pub fn delta(&self) -> Option<f64> {
30 self.delta
31 }
32 #[inline]
33 pub fn state(&self) -> &RawStateEventOwned {
34 &self.state
35 }
36 #[inline]
37 pub fn state_mut(&mut self) -> &mut RawStateEventOwned {
38 &mut self.state
39 }
40 pub fn is_modified(&self, prev: &RawStateEventOwned) -> bool {
41 if self.state.force == eva_common::events::Force::None && self.state.status == prev.status {
42 if self.state.value == prev.value {
43 return false;
44 }
45 if let Some(delta_v) = self.delta {
46 if let ValueOptionOwned::Value(ref prev_value) = prev.value {
47 if let ValueOptionOwned::Value(ref current_value) = self.state.value {
48 if let Ok(prev_value_f) = TryInto::<f64>::try_into(prev_value.clone()) {
49 if let Ok(current_value_f) =
50 TryInto::<f64>::try_into(current_value.clone())
51 {
52 if (current_value_f - prev_value_f).abs() < delta_v {
53 return false;
54 }
55 }
56 }
57 }
58 }
59 }
60 }
61 true
62 }
63}
64
65impl From<RawStateEventPreparedOwned> for RawStateEventOwned {
66 fn from(s: RawStateEventPreparedOwned) -> Self {
67 s.state
68 }
69}
70
71pub struct RawStateCache {
72 cache: Mutex<TtlCache<OID, RawStateEventPreparedOwned>>,
73 ttl: Option<Duration>,
74}
75
76impl RawStateCache {
77 pub fn new(ttl: Option<Duration>) -> Self {
78 Self {
79 cache: Mutex::new(TtlCache::new(RAW_STATE_CACHE_MAX_CAPACITY)),
80 ttl,
81 }
82 }
83 pub fn push_check(
90 &self,
91 oid: &OID,
92 raw_state: &RawStateEventOwned,
93 delta: Option<f64>,
94 ) -> bool {
95 if let Some(ttl) = self.ttl {
96 let mut cache = self.cache.lock();
97 if let Some(v) = cache.get(oid) {
98 if !v.is_modified(raw_state) {
99 return false;
100 }
101 }
102 cache.insert(
103 oid.clone(),
104 RawStateEventPreparedOwned::from_rse_owned(raw_state.clone(), delta),
105 ttl,
106 );
107 true
108 } else {
109 false
110 }
111 }
112 pub fn retain_map_modified(&self, states: &mut HashMap<&OID, RawStateEventPreparedOwned>) {
119 if let Some(ttl) = self.ttl {
120 let mut cache = self.cache.lock();
121 states.retain(|oid, raw| {
122 if let Some(cached) = cache.get(oid) {
123 cached.is_modified(raw.state())
124 } else {
125 true
126 }
127 });
128 for (oid, raw) in states {
130 cache.insert((*oid).clone(), raw.clone(), ttl);
131 }
132 }
133 }
134}
135
136#[path = "actt.rs"]
137pub mod actt;
138pub use eva_common::transform;
139
140pub const ERR_NO_PARAMS: &str = "action params not specified";
141
142#[inline]
143pub fn format_action_topic(oid: &OID) -> String {
144 format!("{}{}", ACTION_TOPIC, oid.as_path())
145}
146
147#[inline]
148pub fn format_raw_state_topic(oid: &OID) -> String {
149 format!("{}{}", RAW_STATE_TOPIC, oid.as_path())
150}
151
152#[derive(Deserialize, Debug, Copy, Clone, Eq, PartialEq)]
153#[serde(rename_all = "lowercase")]
154pub enum ItemProp {
155 Status,
156 Value,
157}
158
159#[derive(Deserialize, Debug)]
161#[serde(deny_unknown_fields)]
162pub struct Action {
163 uuid: Uuid,
164 i: OID,
165 #[serde(deserialize_with = "eva_common::tools::deserialize_duration_from_micros")]
166 timeout: Duration,
167 priority: u8,
168 params: Option<actions::Params>,
169 config: Option<Value>,
170 #[serde(skip, default = "Instant::now")]
171 received: Instant,
172}
173
174impl Action {
175 pub async fn publish_event_pending(&self) -> EResult<()> {
176 let event = self.event_pending();
177 eapi_bus::publish(&format_action_topic(&self.i), pack(&event)?.into()).await?;
178 Ok(())
179 }
180 pub async fn publish_event_running(&self) -> EResult<()> {
181 let event = self.event_running();
182 eapi_bus::publish(&format_action_topic(&self.i), pack(&event)?.into()).await?;
183 Ok(())
184 }
185 pub async fn publish_event_completed(&self, out: Option<Value>) -> EResult<()> {
186 let event = self.event_completed(out);
187 eapi_bus::publish(&format_action_topic(&self.i), pack(&event)?.into()).await?;
188 Ok(())
189 }
190 pub async fn publish_event_failed(
191 &self,
192 exitcode: i16,
193 out: Option<Value>,
194 err: Option<Value>,
195 ) -> EResult<()> {
196 let event = self.event_failed(exitcode, out, err);
197 eapi_bus::publish(&format_action_topic(&self.i), pack(&event)?.into()).await?;
198 Ok(())
199 }
200 pub async fn publish_event_canceled(&self) -> EResult<()> {
201 let event = self.event_canceled();
202 eapi_bus::publish(&format_action_topic(&self.i), pack(&event)?.into()).await?;
203 Ok(())
204 }
205 pub async fn publish_event_terminated(&self) -> EResult<()> {
206 let event = self.event_terminated();
207 eapi_bus::publish(&format_action_topic(&self.i), pack(&event)?.into()).await?;
208 Ok(())
209 }
210 pub fn event_pending(&self) -> actions::ActionEvent {
211 actions::ActionEvent {
212 uuid: self.uuid,
213 status: actions::Status::Pending as u8,
214 out: None,
215 err: None,
216 exitcode: None,
217 }
218 }
219 pub fn event_running(&self) -> actions::ActionEvent {
220 actions::ActionEvent {
221 uuid: self.uuid,
222 status: actions::Status::Running as u8,
223 out: None,
224 err: None,
225 exitcode: None,
226 }
227 }
228 pub fn event_completed(&self, out: Option<Value>) -> actions::ActionEvent {
229 actions::ActionEvent {
230 uuid: self.uuid,
231 status: actions::Status::Completed as u8,
232 out,
233 err: None,
234 exitcode: Some(0),
235 }
236 }
237 pub fn event_failed(
238 &self,
239 exitcode: i16,
240 out: Option<Value>,
241 err: Option<Value>,
242 ) -> actions::ActionEvent {
243 actions::ActionEvent {
244 uuid: self.uuid,
245 status: actions::Status::Failed as u8,
246 out,
247 err,
248 exitcode: Some(exitcode),
249 }
250 }
251 pub fn event_canceled(&self) -> actions::ActionEvent {
252 actions::ActionEvent {
253 uuid: self.uuid,
254 status: actions::Status::Canceled as u8,
255 out: None,
256 err: None,
257 exitcode: None,
258 }
259 }
260 pub fn event_terminated(&self) -> actions::ActionEvent {
261 actions::ActionEvent {
262 uuid: self.uuid,
263 status: actions::Status::Terminated as u8,
264 out: None,
265 err: None,
266 exitcode: Some(-15),
267 }
268 }
269 #[inline]
270 pub fn uuid(&self) -> &Uuid {
271 &self.uuid
272 }
273 #[inline]
274 pub fn oid(&self) -> &OID {
275 &self.i
276 }
277 #[inline]
278 pub fn timeout(&self) -> Duration {
279 self.timeout
280 }
281 #[inline]
282 pub fn priority(&self) -> u8 {
283 self.priority
284 }
285 #[inline]
286 pub fn params(&self) -> Option<&actions::Params> {
287 self.params.as_ref()
288 }
289 #[inline]
290 pub fn take_params(&mut self) -> Option<actions::Params> {
291 self.params.take()
292 }
293 pub fn take_unit_params(&mut self) -> EResult<actions::UnitParams> {
294 if let Some(params) = self.params.take() {
295 match params {
296 eva_common::actions::Params::Unit(p) => Ok(p),
297 eva_common::actions::Params::Lmacro(_) => Err(Error::not_implemented(
298 "can not exec lmacro action with unit",
299 )),
300 }
301 } else {
302 Err(Error::invalid_data(ERR_NO_PARAMS))
303 }
304 }
305 pub fn take_lmacro_params(&mut self) -> EResult<actions::LmacroParams> {
306 if let Some(params) = self.params.take() {
307 match params {
308 eva_common::actions::Params::Lmacro(p) => Ok(p),
309 eva_common::actions::Params::Unit(_) => Err(Error::not_implemented(
310 "can not exec unit action with lmacro",
311 )),
312 }
313 } else {
314 Err(Error::invalid_data(ERR_NO_PARAMS))
315 }
316 }
317 #[inline]
318 pub fn config(&self) -> Option<&Value> {
319 self.config.as_ref()
320 }
321 #[inline]
322 pub fn take_config(&mut self) -> Option<Value> {
323 self.config.take()
324 }
325 #[inline]
326 pub fn op(&self) -> Op {
327 Op::for_instant(self.received, self.timeout)
328 }
329}