1use crate::eapi_bus;
2use eva_common::actions::{self, ACTION_TOPIC};
4use eva_common::events::{RAW_STATE_TOPIC, RawStateEventOwned};
5use eva_common::op::Op;
6use eva_common::payload::pack;
7use eva_common::prelude::*;
8use parking_lot::Mutex;
9use serde::Deserialize;
10use std::collections::HashMap;
11use std::future::Future;
12use std::pin::Pin;
13use std::sync::Arc;
14use std::time::{Duration, Instant};
15use ttl_cache::TtlCache;
16use uuid::Uuid;
17
18eva_common::err_logger!();
19
20const RAW_STATE_CACHE_MAX_CAPACITY: usize = 1_000_000;
21
22#[derive(Clone)]
23pub struct RawStateEventPreparedOwned {
24 state: RawStateEventOwned,
25 delta: Option<f64>,
26}
27
28impl RawStateEventPreparedOwned {
29 #[inline]
30 pub fn from_rse_owned(state: RawStateEventOwned, delta: Option<f64>) -> Self {
31 Self { state, delta }
32 }
33 #[inline]
34 pub fn delta(&self) -> Option<f64> {
35 self.delta
36 }
37 #[inline]
38 pub fn state(&self) -> &RawStateEventOwned {
39 &self.state
40 }
41 #[inline]
42 pub fn state_mut(&mut self) -> &mut RawStateEventOwned {
43 &mut self.state
44 }
45 pub fn is_modified(&self, prev: &RawStateEventOwned) -> bool {
46 if self.state.force == eva_common::events::Force::None && self.state.status == prev.status {
47 if self.state.value == prev.value {
48 return false;
49 }
50 if let Some(delta_v) = self.delta
51 && let ValueOptionOwned::Value(ref prev_value) = prev.value
52 && let ValueOptionOwned::Value(ref current_value) = self.state.value
53 && let Ok(prev_value_f) = TryInto::<f64>::try_into(prev_value.clone())
54 && let Ok(current_value_f) = TryInto::<f64>::try_into(current_value.clone())
55 && (current_value_f - prev_value_f).abs() < delta_v
56 {
57 return false;
58 }
59 }
60 true
61 }
62}
63
64impl From<RawStateEventPreparedOwned> for RawStateEventOwned {
65 fn from(s: RawStateEventPreparedOwned) -> Self {
66 s.state
67 }
68}
69
70pub struct RawStateCache {
71 cache: Mutex<TtlCache<OID, RawStateEventPreparedOwned>>,
72 ttl: Option<Duration>,
73}
74
75impl RawStateCache {
76 pub fn new(ttl: Option<Duration>) -> Self {
77 Self {
78 cache: Mutex::new(TtlCache::new(RAW_STATE_CACHE_MAX_CAPACITY)),
79 ttl,
80 }
81 }
82 pub fn push_check(
89 &self,
90 oid: &OID,
91 raw_state: &RawStateEventOwned,
92 delta: Option<f64>,
93 ) -> bool {
94 if let Some(ttl) = self.ttl {
95 let mut cache = self.cache.lock();
96 if let Some(v) = cache.get(oid)
97 && !v.is_modified(raw_state)
98 {
99 return false;
100 }
101 cache.insert(
102 oid.clone(),
103 RawStateEventPreparedOwned::from_rse_owned(raw_state.clone(), delta),
104 ttl,
105 );
106 true
107 } else {
108 false
109 }
110 }
111 pub fn retain_map_modified(&self, states: &mut HashMap<&OID, RawStateEventPreparedOwned>) {
118 if let Some(ttl) = self.ttl {
119 let mut cache = self.cache.lock();
120 states.retain(|oid, raw| {
121 if let Some(cached) = cache.get(oid) {
122 cached.is_modified(raw.state())
123 } else {
124 true
125 }
126 });
127 for (oid, raw) in states {
129 cache.insert((*oid).clone(), raw.clone(), ttl);
130 }
131 }
132 }
133}
134
135#[path = "actt.rs"]
136pub mod actt;
137pub use eva_common::transform;
138
139pub const ERR_NO_PARAMS: &str = "action params not specified";
140
141#[inline]
142pub fn format_action_topic(oid: &OID) -> String {
143 format!("{}{}", ACTION_TOPIC, oid.as_path())
144}
145
146#[inline]
147pub fn format_raw_state_topic(oid: &OID) -> String {
148 format!("{}{}", RAW_STATE_TOPIC, oid.as_path())
149}
150
151#[derive(Deserialize, Debug, Copy, Clone, Eq, PartialEq)]
152#[serde(rename_all = "lowercase")]
153pub enum ItemProp {
154 Status,
155 Value,
156}
157
158#[derive(Deserialize, Debug)]
160#[serde(deny_unknown_fields)]
161pub struct Action {
162 uuid: Uuid,
163 i: OID,
164 #[serde(deserialize_with = "eva_common::tools::deserialize_duration_from_micros")]
165 timeout: Duration,
166 priority: u8,
167 params: Option<actions::Params>,
168 config: Option<Value>,
169 #[serde(skip, default = "Instant::now")]
170 received: Instant,
171}
172
173impl Action {
174 pub async fn publish_event_pending(&self) -> EResult<()> {
175 let event = self.event_pending();
176 eapi_bus::publish(&format_action_topic(&self.i), pack(&event)?.into()).await?;
177 Ok(())
178 }
179 pub async fn publish_event_running(&self) -> EResult<()> {
180 let event = self.event_running();
181 eapi_bus::publish(&format_action_topic(&self.i), pack(&event)?.into()).await?;
182 Ok(())
183 }
184 pub async fn publish_event_completed(&self, out: Option<Value>) -> EResult<()> {
185 let event = self.event_completed(out);
186 eapi_bus::publish(&format_action_topic(&self.i), pack(&event)?.into()).await?;
187 Ok(())
188 }
189 pub async fn publish_event_failed(
190 &self,
191 exitcode: i16,
192 out: Option<Value>,
193 err: Option<Value>,
194 ) -> EResult<()> {
195 let event = self.event_failed(exitcode, out, err);
196 eapi_bus::publish(&format_action_topic(&self.i), pack(&event)?.into()).await?;
197 Ok(())
198 }
199 pub async fn publish_event_canceled(&self) -> EResult<()> {
200 let event = self.event_canceled();
201 eapi_bus::publish(&format_action_topic(&self.i), pack(&event)?.into()).await?;
202 Ok(())
203 }
204 pub async fn publish_event_terminated(&self) -> EResult<()> {
205 let event = self.event_terminated();
206 eapi_bus::publish(&format_action_topic(&self.i), pack(&event)?.into()).await?;
207 Ok(())
208 }
209 pub fn event_pending(&self) -> actions::ActionEvent {
210 actions::ActionEvent {
211 uuid: self.uuid,
212 status: actions::Status::Pending as u8,
213 out: None,
214 err: None,
215 exitcode: None,
216 }
217 }
218 pub fn event_running(&self) -> actions::ActionEvent {
219 actions::ActionEvent {
220 uuid: self.uuid,
221 status: actions::Status::Running as u8,
222 out: None,
223 err: None,
224 exitcode: None,
225 }
226 }
227 pub fn event_completed(&self, out: Option<Value>) -> actions::ActionEvent {
228 actions::ActionEvent {
229 uuid: self.uuid,
230 status: actions::Status::Completed as u8,
231 out,
232 err: None,
233 exitcode: Some(0),
234 }
235 }
236 pub fn event_failed(
237 &self,
238 exitcode: i16,
239 out: Option<Value>,
240 err: Option<Value>,
241 ) -> actions::ActionEvent {
242 actions::ActionEvent {
243 uuid: self.uuid,
244 status: actions::Status::Failed as u8,
245 out,
246 err,
247 exitcode: Some(exitcode),
248 }
249 }
250 pub fn event_canceled(&self) -> actions::ActionEvent {
251 actions::ActionEvent {
252 uuid: self.uuid,
253 status: actions::Status::Canceled as u8,
254 out: None,
255 err: None,
256 exitcode: None,
257 }
258 }
259 pub fn event_terminated(&self) -> actions::ActionEvent {
260 actions::ActionEvent {
261 uuid: self.uuid,
262 status: actions::Status::Terminated as u8,
263 out: None,
264 err: None,
265 exitcode: Some(-15),
266 }
267 }
268 #[inline]
269 pub fn uuid(&self) -> &Uuid {
270 &self.uuid
271 }
272 #[inline]
273 pub fn oid(&self) -> &OID {
274 &self.i
275 }
276 #[inline]
277 pub fn timeout(&self) -> Duration {
278 self.timeout
279 }
280 #[inline]
281 pub fn priority(&self) -> u8 {
282 self.priority
283 }
284 #[inline]
285 pub fn params(&self) -> Option<&actions::Params> {
286 self.params.as_ref()
287 }
288 #[inline]
289 pub fn take_params(&mut self) -> Option<actions::Params> {
290 self.params.take()
291 }
292 pub fn take_unit_params(&mut self) -> EResult<actions::UnitParams> {
293 if let Some(params) = self.params.take() {
294 match params {
295 eva_common::actions::Params::Unit(p) => Ok(p),
296 eva_common::actions::Params::Lmacro(_) => Err(Error::not_implemented(
297 "can not exec lmacro action with unit",
298 )),
299 }
300 } else {
301 Err(Error::invalid_data(ERR_NO_PARAMS))
302 }
303 }
304 pub fn take_lmacro_params(&mut self) -> EResult<actions::LmacroParams> {
305 if let Some(params) = self.params.take() {
306 match params {
307 eva_common::actions::Params::Lmacro(p) => Ok(p),
308 eva_common::actions::Params::Unit(_) => Err(Error::not_implemented(
309 "can not exec unit action with lmacro",
310 )),
311 }
312 } else {
313 Err(Error::invalid_data(ERR_NO_PARAMS))
314 }
315 }
316 #[inline]
317 pub fn config(&self) -> Option<&Value> {
318 self.config.as_ref()
319 }
320 #[inline]
321 pub fn take_config(&mut self) -> Option<Value> {
322 self.config.take()
323 }
324 #[inline]
325 pub fn op(&self) -> Op {
326 Op::for_instant(self.received, self.timeout)
327 }
328}
329
330type LmacroExecutorFn = Box<
331 dyn Fn(actions::LmacroParams) -> Pin<Box<dyn Future<Output = EResult<Value>> + Send>>
332 + Send
333 + Sync,
334>;
335
336pub struct LmacroProcessor {
337 entries: Arc<HashMap<OID, LmacroExecutorFn>>,
338}
339
340impl Clone for LmacroProcessor {
341 fn clone(&self) -> Self {
342 Self {
343 entries: self.entries.clone(),
344 }
345 }
346}
347
348impl LmacroProcessor {
349 pub fn builder() -> LmacroProcessorBuilder {
350 LmacroProcessorBuilder::new()
351 }
352 pub async fn execute(&self, mut action: Action) {
353 action.publish_event_pending().await.log_ef();
354 let Ok(params) = action.take_lmacro_params() else {
355 action
356 .publish_event_failed(
357 -1,
358 None,
359 Some(Value::String("failed to parse lmacro params".to_owned())),
360 )
361 .await
362 .log_ef();
363 return;
364 };
365 let Some(lmx) = self.entries.get(action.oid()) else {
366 action
367 .publish_event_failed(
368 -1,
369 None,
370 Some(Value::String(format!(
371 "lmacro not found in executor: {}",
372 action.oid()
373 ))),
374 )
375 .await
376 .log_ef();
377 return;
378 };
379 let timeout = action.timeout();
380 action.publish_event_running().await.log_ef();
381 match tokio::time::timeout(timeout, lmx(params)).await {
382 Ok(Ok(v)) => {
383 action.publish_event_completed(Some(v)).await.log_ef();
384 }
385 Ok(Err(e)) => {
386 action
387 .publish_event_failed(e.code(), None, Some(Value::String(e.to_string())))
388 .await
389 .log_ef();
390 }
391 Err(_) => {
392 action
393 .publish_event_failed(
394 ErrorKind::Timeout as i16,
395 None,
396 Some(Value::String("action timeout".to_owned())),
397 )
398 .await
399 .log_ef();
400 }
401 }
402 }
403}
404
405#[derive(Default)]
406pub struct LmacroProcessorBuilder {
407 entries: HashMap<OID, LmacroExecutorFn>,
408}
409
410impl LmacroProcessorBuilder {
411 pub fn new() -> Self {
412 Self::default()
413 }
414 pub fn register<F, Fut>(&mut self, oid: OID, f: F)
415 where
416 F: Fn(actions::LmacroParams) -> Fut + Send + Sync + 'static,
417 Fut: Future<Output = EResult<Value>> + Send + 'static,
418 {
419 self.entries
420 .insert(oid, Box::new(move |params| Box::pin(f(params))));
421 }
422 pub fn build(self) -> LmacroProcessor {
423 LmacroProcessor {
424 entries: Arc::new(self.entries),
425 }
426 }
427}