eva_sdk/
state.rs

1use std::sync::LazyLock;
2use std::{borrow::Cow, collections::BTreeMap, str::Split, sync::Arc};
3
4use eva_common::events::{
5    LOCAL_STATE_TOPIC, LocalStateEvent, REMOTE_ARCHIVE_STATE_TOPIC, REMOTE_STATE_TOPIC,
6    RemoteStateEvent,
7};
8
9use crate::eapi_bus;
10use crate::prelude::{Frame, pack, unpack};
11use crate::types::FullItemStateConnected;
12use eva_common::value::{Value, ValueOptionOwned};
13use eva_common::{
14    EResult, Error, ItemKind, OID, OID_MASK_PREFIX_REGEX,
15    acl::{OIDMask, OIDMaskList},
16};
17use eva_common::{IEID, ItemStatus};
18use serde::{Deserialize, Serialize};
19use tokio::sync::RwLock;
20
21static DB: LazyLock<Db> = LazyLock::new(<_>::default);
22
23pub async fn process_bus_frame(frame: &Frame) -> EResult<()> {
24    DB.process_bus_frame(frame).await
25}
26
27/// Get a state from the process-global database
28pub async fn get(oid: &OID) -> EResult<Option<State>> {
29    DB.get(oid).await
30}
31
32/// Query the process-global database for states matching the given mask and filter
33pub async fn query(query: Query<'_>) -> EResult<Vec<State>> {
34    DB.query(query).await
35}
36
37/// Remove a local state from the process-global database
38pub async fn remove_local(oid: &OID) -> Option<State> {
39    DB.remove_local(oid).await
40}
41
42#[derive(Deserialize, Debug)]
43struct FullItemStateConnectedWithMeta {
44    #[serde(flatten)]
45    st: FullItemStateConnected,
46    #[serde(default)]
47    meta: Option<Value>,
48}
49
50#[derive(Serialize, Deserialize, Debug)]
51pub struct State(Arc<StateInner>);
52
53impl State {
54    pub fn oid(&self) -> &OID {
55        &self.0.st.oid
56    }
57    pub fn status(&self) -> ItemStatus {
58        self.0.st.status
59    }
60    pub fn value(&self) -> &Value {
61        &self.0.st.value
62    }
63    pub fn act(&self) -> usize {
64        self.0.st.act.unwrap_or_default()
65    }
66    pub fn ieid(&self) -> IEID {
67        self.0.st.ieid
68    }
69    pub fn set_time(&self) -> f64 {
70        self.0.st.t
71    }
72    pub fn connected(&self) -> bool {
73        self.0.st.connected
74    }
75    pub fn meta(&self) -> Option<&Value> {
76        self.0.meta.as_ref()
77    }
78}
79
80impl Clone for State {
81    fn clone(&self) -> Self {
82        State(self.0.clone())
83    }
84}
85
86impl From<FullItemStateConnected> for State {
87    fn from(st: FullItemStateConnected) -> Self {
88        State(Arc::new(StateInner {
89            meta: ValueOptionOwned::No,
90            st,
91        }))
92    }
93}
94
95#[derive(Debug, Serialize, Deserialize)]
96struct StateInner {
97    #[serde(skip_serializing_if = "ValueOptionOwned::is_none")]
98    meta: ValueOptionOwned,
99    #[serde(flatten)]
100    st: FullItemStateConnected,
101}
102
103pub struct Query<'a> {
104    mask: &'a OIDMask,
105    filter: Option<Filter<'a>>,
106    local: bool,
107}
108
109impl<'a> Query<'a> {
110    pub fn new(mask: &'a OIDMask) -> Self {
111        Self {
112            mask,
113            filter: None,
114            local: false,
115        }
116    }
117    pub fn filter(mut self, filter: Filter<'a>) -> Self {
118        self.filter = Some(filter);
119        self
120    }
121    /// Set the query as local-only (fetch only local states, do not query the node inventory),
122    /// works faster than a non-local query but may miss certain items or their metadata. It is
123    /// recommended first (or periodically) to run a non-local query then use local ones
124    pub fn local(mut self) -> Self {
125        self.local = true;
126        self
127    }
128}
129
130#[derive(Default)]
131pub struct Db {
132    state_db: Arc<RwLock<StateDb>>,
133}
134
135impl Clone for Db {
136    fn clone(&self) -> Self {
137        Db {
138            state_db: self.state_db.clone(),
139        }
140    }
141}
142
143impl Db {
144    pub fn new() -> Self {
145        Self::default()
146    }
147    pub async fn process_bus_frame(&self, frame: &Frame) -> EResult<()> {
148        let mut db = self.state_db.write().await;
149        db.process_bus_frame(frame)
150    }
151    pub async fn get(&self, oid: &OID) -> EResult<Option<State>> {
152        #[derive(Serialize)]
153        struct Params<'a> {
154            i: &'a OID,
155            full: bool,
156        }
157        let db = self.state_db.read().await;
158        let res = db.get_local(oid);
159        if res.is_some() {
160            return Ok(res);
161        }
162        let r = eapi_bus::call(
163            "eva.core",
164            "item.state",
165            pack(&Params { i: oid, full: true })?.into(),
166        )
167        .await?;
168        let s_st: Vec<FullItemStateConnectedWithMeta> = unpack(r.payload())?;
169        if s_st.len() > 1 {
170            return Err(Error::invalid_data("expected a single state"));
171        }
172        let Some(s) = s_st.into_iter().next() else {
173            return Ok(None);
174        };
175        drop(db);
176        Ok(Some(
177            self.state_db
178                .write()
179                .await
180                .record_state_connected_with_meta(s),
181        ))
182    }
183    pub async fn query(&self, query: Query<'_>) -> EResult<Vec<State>> {
184        #[derive(Serialize)]
185        struct Params<'a> {
186            i: &'a OIDMask,
187            full: bool,
188            #[serde(skip_serializing_if = "Option::is_none")]
189            include: Option<&'a OIDMaskList>,
190            #[serde(skip_serializing_if = "Option::is_none")]
191            exclude: Option<&'a OIDMaskList>,
192        }
193        if query.local {
194            return self.state_db.read().await.query_local(&query);
195        }
196        let payload = Params {
197            i: query.mask,
198            full: true,
199            include: query.filter.as_ref().and_then(|f| f.include),
200            exclude: query.filter.as_ref().and_then(|f| f.exclude),
201        };
202        let r = eapi_bus::call("eva.core", "item.state", pack(&payload)?.into()).await?;
203        let s_st: Vec<FullItemStateConnectedWithMeta> = unpack(r.payload())?;
204        if s_st.is_empty() {
205            return Ok(Vec::new());
206        }
207        let mut r_vec = Vec::with_capacity(s_st.len());
208        let mut db = self.state_db.write().await;
209        for s in s_st {
210            r_vec.push(db.record_state_connected_with_meta(s));
211        }
212        Ok(r_vec)
213    }
214    pub async fn remove_local(&self, oid: &OID) -> Option<State> {
215        let mut db = self.state_db.write().await;
216        db.remove(oid)
217    }
218}
219
220// a low-level lock-free db component
221#[derive(Default)]
222struct StateDb {
223    db: StateMap,
224}
225
226impl StateDb {
227    fn process_bus_frame(&mut self, frame: &Frame) -> EResult<()> {
228        let Some(topic) = frame.topic() else {
229            return Ok(());
230        };
231        if let Some(oid_str) = topic.strip_prefix(LOCAL_STATE_TOPIC) {
232            let oid: OID = OID::from_path(oid_str).map_err(Error::invalid_data)?;
233            let ev: LocalStateEvent = unpack(frame.payload())?;
234            let st: FullItemStateConnected =
235                FullItemStateConnected::from_local_state_event(ev, oid);
236            self.record(State::from(st)).ok();
237            return Ok(());
238        }
239        let Some(oid_str) = topic
240            .strip_prefix(REMOTE_STATE_TOPIC)
241            .or_else(|| topic.strip_prefix(REMOTE_ARCHIVE_STATE_TOPIC))
242        else {
243            return Ok(());
244        };
245        let oid: OID = OID::from_path(oid_str).map_err(Error::invalid_data)?;
246        let ev: RemoteStateEvent = unpack(frame.payload())?;
247        let st: FullItemStateConnected = FullItemStateConnected::from_remote_state_event(ev, oid);
248        self.record(State::from(st)).ok();
249        Ok(())
250    }
251    fn record_state_connected_with_meta(&mut self, mut s: FullItemStateConnectedWithMeta) -> State {
252        if s.meta.is_none() {
253            s.meta = Some(Value::Unit);
254        }
255        let mut st = State(
256            StateInner {
257                st: s.st,
258                meta: s.meta.into(),
259            }
260            .into(),
261        );
262        if let Err(e) = self.record(st.clone()) {
263            st = e; // got newer state, use it instead
264        }
265        st
266    }
267    fn record(&mut self, state: State) -> Result<(), State> {
268        self.db.append(state, false)
269    }
270    //fn record_force(&mut self, state: State) -> EResult<()> {
271    //self.db.append(state, true)
272    //}
273    fn get_local(&self, oid: &OID) -> Option<State> {
274        self.db.get(oid)
275    }
276    fn remove(&mut self, oid: &OID) -> Option<State> {
277        self.db.remove(oid)
278    }
279    fn query_local(&self, query: &Query<'_>) -> EResult<Vec<State>> {
280        let filter: Cow<Filter> = query
281            .filter
282            .as_ref()
283            .map_or_else(|| Cow::Owned(Filter::new()), Cow::Borrowed);
284        self.db.get_by_mask(query.mask, &filter)
285    }
286}
287
288#[derive(Default, Debug)]
289struct StateTree {
290    childs: BTreeMap<String, StateTree>,
291    members: BTreeMap<Arc<OID>, State>,
292    members_wildcard: BTreeMap<Arc<OID>, State>,
293}
294
295impl StateTree {
296    fn is_empty(&self) -> bool {
297        self.childs.is_empty() && self.members.is_empty()
298    }
299}
300
301#[derive(Debug, Default)]
302struct StateMap {
303    unit: StateTree,
304    sensor: StateTree,
305    lvar: StateTree,
306    lmacro: StateTree,
307}
308
309#[derive(Default, Clone)]
310pub struct Filter<'a> {
311    include: Option<&'a OIDMaskList>,
312    exclude: Option<&'a OIDMaskList>,
313}
314
315impl<'a> Filter<'a> {
316    pub fn new() -> Self {
317        Self::default()
318    }
319    pub fn include(mut self, mask_list: &'a OIDMaskList) -> Self {
320        self.include = Some(mask_list);
321        self
322    }
323    pub fn exclude(mut self, mask_list: &'a OIDMaskList) -> Self {
324        self.exclude = Some(mask_list);
325        self
326    }
327    fn matches(&self, state: &State) -> bool {
328        if let Some(f) = self.include
329            && !f.matches(state.oid())
330        {
331            return false;
332        }
333        if let Some(f) = self.exclude {
334            !f.matches(state.oid())
335        } else {
336            true
337        }
338    }
339}
340
341impl StateMap {
342    #[inline]
343    fn get_tree(&self, tp: ItemKind) -> &StateTree {
344        match tp {
345            ItemKind::Unit => &self.unit,
346            ItemKind::Sensor => &self.sensor,
347            ItemKind::Lvar => &self.lvar,
348            ItemKind::Lmacro => &self.lmacro,
349            //_ => Err(Error::not_implemented()),
350        }
351    }
352    #[inline]
353    fn get_tree_mut(&mut self, tp: ItemKind) -> &mut StateTree {
354        match tp {
355            ItemKind::Unit => &mut self.unit,
356            ItemKind::Sensor => &mut self.sensor,
357            ItemKind::Lvar => &mut self.lvar,
358            ItemKind::Lmacro => &mut self.lmacro,
359            //_ => Err(Error::not_implemented()),
360        }
361    }
362    #[inline]
363    /// in case of if the state is already present as is newer or equal to the existing one,
364    /// returns the eixsting state as an error
365    fn append(&mut self, state: State, force: bool) -> Result<(), State> {
366        let tree = self.get_tree_mut(state.oid().kind());
367        append_state_rec(tree, state.oid().full_id().split('/'), &state, force)
368    }
369    #[inline]
370    fn get(&self, oid: &OID) -> Option<State> {
371        let tree = self.get_tree(oid.kind());
372        get_state_rec(tree, oid.full_id().split('/'))
373    }
374    #[inline]
375    fn remove(&mut self, oid: &OID) -> Option<State> {
376        let tree = self.get_tree_mut(oid.kind());
377        remove_state_rec(tree, oid.full_id().split('/'), oid)
378    }
379    fn get_by_mask(&self, mask: &OIDMask, filter: &Filter) -> EResult<Vec<State>> {
380        if let Some(tp) = mask.kind() {
381            if tp == ItemKind::Lmacro {
382                return Ok(Vec::new());
383            }
384            let tree = self.get_tree(tp);
385            if let Some(chunks) = mask.chunks() {
386                let mut result = Vec::new();
387                get_state_by_mask_rec(tree, chunks.iter(), &mut result, filter)?;
388                Ok(result)
389            } else {
390                Ok(tree
391                    .members_wildcard
392                    .values()
393                    .filter(|x| filter.matches(x))
394                    .cloned()
395                    .collect())
396            }
397        } else {
398            let mut result = Vec::new();
399            if let Some(chunks) = mask.chunks() {
400                get_state_by_mask_rec(&self.unit, chunks.iter(), &mut result, filter)?;
401                get_state_by_mask_rec(&self.sensor, chunks.iter(), &mut result, filter)?;
402                get_state_by_mask_rec(&self.lvar, chunks.iter(), &mut result, filter)?;
403            } else {
404                result.extend(
405                    self.unit
406                        .members_wildcard
407                        .values()
408                        .filter(|x| filter.matches(x))
409                        .cloned()
410                        .collect::<Vec<State>>(),
411                );
412                result.extend(
413                    self.sensor
414                        .members_wildcard
415                        .values()
416                        .filter(|x| filter.matches(x))
417                        .cloned()
418                        .collect::<Vec<State>>(),
419                );
420                result.extend(
421                    self.lvar
422                        .members_wildcard
423                        .values()
424                        .filter(|x| filter.matches(x))
425                        .cloned()
426                        .collect::<Vec<State>>(),
427                );
428            }
429            Ok(result)
430        }
431    }
432}
433
434fn get_state_rec(tree: &StateTree, mut sp: Split<char>) -> Option<State> {
435    if let Some(chunk) = sp.next() {
436        if let Some(child) = tree.childs.get(chunk) {
437            get_state_rec(child, sp)
438        } else {
439            None
440        }
441    } else if tree.members.is_empty() {
442        None
443    } else {
444        Some(tree.members.values().next().unwrap().clone())
445    }
446}
447fn remove_state_rec(tree: &mut StateTree, mut sp: Split<char>, oid: &OID) -> Option<State> {
448    if let Some(chunk) = sp.next() {
449        tree.members_wildcard.remove(oid)?;
450        let state = if let Some(c) = tree.childs.get_mut(chunk) {
451            let state = remove_state_rec(c, sp.clone(), oid)?;
452            if c.is_empty() {
453                tree.childs.remove(chunk);
454            }
455            state
456        } else {
457            return None;
458        };
459        Some(state)
460    } else {
461        tree.members.remove(oid)
462    }
463}
464
465fn get_state_by_mask_rec(
466    tree: &StateTree,
467    mut iter: std::slice::Iter<&str>,
468    result: &mut Vec<State>,
469    filter: &Filter,
470) -> EResult<()> {
471    if let Some(chunk) = iter.next() {
472        if *chunk == "#" {
473            result.extend(
474                tree.members_wildcard
475                    .values()
476                    .filter(|x| filter.matches(x))
477                    .cloned()
478                    .collect::<Vec<State>>(),
479            );
480        } else if *chunk == "+" {
481            for child in tree.childs.values() {
482                get_state_by_mask_rec(child, iter.clone(), result, filter)?;
483            }
484        } else if let Some(regex) = chunk.strip_prefix(OID_MASK_PREFIX_REGEX) {
485            let re = regex::Regex::new(regex).map_err(Error::invalid_params)?;
486            for (name, child) in &tree.childs {
487                if re.is_match(name) {
488                    get_state_by_mask_rec(child, iter.clone(), result, filter)?;
489                }
490            }
491        } else if let Some(child) = tree.childs.get(*chunk) {
492            get_state_by_mask_rec(child, iter, result, filter)?;
493        }
494    } else {
495        result.extend(
496            tree.members
497                .values()
498                .filter(|x| filter.matches(x))
499                .cloned()
500                .collect::<Vec<State>>(),
501        );
502    }
503    Ok(())
504}
505
506fn append_state_rec(
507    tree: &mut StateTree,
508    mut sp: Split<char>,
509    state: &State,
510    force: bool,
511) -> Result<(), State> {
512    macro_rules! process_entry {
513        ($entry: expr) => {
514            match $entry {
515                std::collections::btree_map::Entry::Occupied(mut e) => {
516                    let existing = e.get();
517                    if existing.ieid() > state.ieid() && !force {
518                        return Err(existing.clone());
519                    }
520                    if existing.meta().is_none() || state.meta().is_some() {
521                        e.insert(state.clone());
522                    } else {
523                        // existing meta is some but new state has no meta
524                        let state_with_meta = State(Arc::new(StateInner {
525                            meta: existing.meta().cloned().into(),
526                            st: state.0.st.clone(),
527                        }));
528                        e.insert(state_with_meta);
529                    }
530                }
531                std::collections::btree_map::Entry::Vacant(e) => {
532                    e.insert(state.clone());
533                }
534            }
535        };
536    }
537    if let Some(chunk) = sp.next() {
538        process_entry!(tree.members_wildcard.entry(state.oid().clone().into()));
539        if let Some(c) = tree.childs.get_mut(chunk) {
540            append_state_rec(c, sp.clone(), state, force)?;
541        } else {
542            let mut child = StateTree::default();
543            append_state_rec(&mut child, sp.clone(), state, force)?;
544            tree.childs.insert(chunk.to_owned(), child);
545        }
546        return Ok(());
547    }
548    process_entry!(tree.members.entry(state.oid().clone().into()));
549    Ok(())
550}
551
552#[cfg(test)]
553mod test {
554
555    use crate::types::FullItemStateConnected;
556    use eva_common::{IEID, OID, acl::OIDMask, events::LocalStateEvent, value::Value};
557
558    use super::{FullItemStateConnectedWithMeta, Query};
559
560    use super::Db;
561
562    #[tokio::test]
563    async fn test_get_query() {
564        let db = Db::new();
565        let oid: OID = "sensor:tests/t1".parse().unwrap();
566        let ev = LocalStateEvent {
567            status: 1,
568            value: 123u8.into(),
569            act: None,
570            ieid: IEID::new(1, 0),
571            t: 123.0,
572        };
573        let st = FullItemStateConnected::from_local_state_event(ev, oid);
574        db.state_db.write().await.record(st.into()).unwrap();
575        let oid: OID = "sensor:t5".parse().unwrap();
576        let ev = LocalStateEvent {
577            status: 1,
578            value: 456u16.into(),
579            act: None,
580            ieid: IEID::new(1, 0),
581            t: 123.0,
582        };
583        let st = FullItemStateConnected::from_local_state_event(ev, oid);
584        db.state_db.write().await.record(st.into()).unwrap();
585        let st = db.get(&"sensor:tests/t1".parse().unwrap()).await.unwrap();
586        let st = st.expect("state not found");
587        assert!(st.oid() == &"sensor:tests/t1".parse().unwrap());
588        assert!(st.status() == 1);
589        assert!(st.value() == &123u8.into());
590        let st = db.get(&"sensor:t5".parse().unwrap()).await.unwrap();
591        let st = st.expect("state not found");
592        assert!(st.oid() == &"sensor:t5".parse().unwrap());
593        assert!(st.status() == 1);
594        assert!(st.value() == &456u16.into());
595        let mask = "sensor:#".parse::<OIDMask>().unwrap();
596        let q = Query::new(&mask).local();
597        let states = db.query(q).await.unwrap();
598        assert_eq!(states.len(), 2);
599        assert!(
600            states
601                .iter()
602                .any(|s| s.oid() == &"sensor:tests/t1".parse().unwrap())
603        );
604        assert!(
605            states
606                .iter()
607                .any(|s| s.oid() == &"sensor:t5".parse().unwrap())
608        );
609    }
610
611    #[tokio::test]
612    async fn test_push_state_without_meta_update_meta() {
613        let db = Db::new();
614        let oid: OID = "sensor:tests/t1".parse().unwrap();
615        let mut ev = LocalStateEvent {
616            status: 1,
617            value: 123u8.into(),
618            act: None,
619            ieid: IEID::new(1, 0),
620            t: 123.0,
621        };
622        let meta = Value::String("Hello world".into());
623        let st = FullItemStateConnectedWithMeta {
624            st: FullItemStateConnected::from_local_state_event(ev.clone(), oid.clone()),
625            meta: Some(meta.clone()),
626        };
627        db.state_db
628            .write()
629            .await
630            .record_state_connected_with_meta(st);
631        ev.value = 456u16.into();
632        ev.ieid = IEID::new(1, 1);
633        let st = FullItemStateConnected::from_local_state_event(ev.clone(), oid.clone());
634        db.state_db.write().await.record(st.into()).unwrap();
635        let st = db.get(&oid).await.unwrap();
636        let st = st.expect("state not found");
637        assert!(st.oid() == &oid);
638        assert!(st.status() == 1);
639        assert!(st.value() == &456u16.into());
640        assert_eq!(st.meta().unwrap(), &meta);
641        let meta = Value::String("New meta".into());
642        ev.ieid = IEID::new(1, 2);
643        let st = FullItemStateConnectedWithMeta {
644            st: FullItemStateConnected::from_local_state_event(ev, oid.clone()),
645            meta: Some(meta.clone()),
646        };
647        db.state_db
648            .write()
649            .await
650            .record_state_connected_with_meta(st);
651        let st = db.get(&oid).await.unwrap();
652        let st = st.expect("state not found");
653        assert!(st.oid() == &oid);
654        assert!(st.status() == 1);
655        assert!(st.value() == &456u16.into());
656        assert_eq!(st.meta().unwrap(), &meta);
657    }
658
659    #[tokio::test]
660    async fn test_push_older_state() {
661        let db = Db::new();
662        let oid: OID = "sensor:tests/t1".parse().unwrap();
663        let mut ev = LocalStateEvent {
664            status: 1,
665            value: 123u8.into(),
666            act: None,
667            ieid: IEID::new(1, 1),
668            t: 123.0,
669        };
670        let st: FullItemStateConnected =
671            FullItemStateConnected::from_local_state_event(ev.clone(), oid.clone());
672        db.state_db.write().await.record(st.into()).unwrap();
673        ev.value = 456u16.into();
674        ev.ieid = IEID::new(1, 0); // older ieid
675        let st: FullItemStateConnected =
676            FullItemStateConnected::from_local_state_event(ev.clone(), oid.clone());
677        let r = db.state_db.write().await.record(st.into());
678        assert!(r.is_err());
679        let st = db.get(&oid).await.unwrap();
680        let st = st.expect("state not found");
681        assert!(st.oid() == &oid);
682        assert!(st.status() == 1);
683        assert!(st.value() == &123u8.into());
684    }
685}