1use std::sync::LazyLock;
2use std::{borrow::Cow, collections::BTreeMap, str::Split, sync::Arc};
3
4use eva_common::events::{
5 LOCAL_STATE_TOPIC, LocalStateEvent, REMOTE_ARCHIVE_STATE_TOPIC, REMOTE_STATE_TOPIC,
6 RemoteStateEvent,
7};
8
9use crate::eapi_bus;
10use crate::prelude::{Frame, pack, unpack};
11use crate::types::FullItemStateConnected;
12use eva_common::value::{Value, ValueOptionOwned};
13use eva_common::{
14 EResult, Error, ItemKind, OID, OID_MASK_PREFIX_REGEX,
15 acl::{OIDMask, OIDMaskList},
16};
17use eva_common::{IEID, ItemStatus};
18use serde::{Deserialize, Serialize};
19use tokio::sync::RwLock;
20
21static DB: LazyLock<Db> = LazyLock::new(<_>::default);
22
23pub async fn process_bus_frame(frame: &Frame) -> EResult<()> {
24 DB.process_bus_frame(frame).await
25}
26
27pub async fn get(oid: &OID) -> EResult<Option<State>> {
29 DB.get(oid).await
30}
31
32pub async fn query(query: Query<'_>) -> EResult<Vec<State>> {
34 DB.query(query).await
35}
36
37pub async fn remove_local(oid: &OID) -> Option<State> {
39 DB.remove_local(oid).await
40}
41
42#[derive(Deserialize, Debug)]
43struct FullItemStateConnectedWithMeta {
44 #[serde(flatten)]
45 st: FullItemStateConnected,
46 #[serde(default)]
47 meta: Option<Value>,
48}
49
50#[derive(Serialize, Deserialize, Debug)]
51pub struct State(Arc<StateInner>);
52
53impl State {
54 pub fn oid(&self) -> &OID {
55 &self.0.st.oid
56 }
57 pub fn status(&self) -> ItemStatus {
58 self.0.st.status
59 }
60 pub fn value(&self) -> &Value {
61 &self.0.st.value
62 }
63 pub fn act(&self) -> usize {
64 self.0.st.act.unwrap_or_default()
65 }
66 pub fn ieid(&self) -> IEID {
67 self.0.st.ieid
68 }
69 pub fn set_time(&self) -> f64 {
70 self.0.st.t
71 }
72 pub fn connected(&self) -> bool {
73 self.0.st.connected
74 }
75 pub fn meta(&self) -> Option<&Value> {
76 self.0.meta.as_ref()
77 }
78}
79
80impl Clone for State {
81 fn clone(&self) -> Self {
82 State(self.0.clone())
83 }
84}
85
86impl From<FullItemStateConnected> for State {
87 fn from(st: FullItemStateConnected) -> Self {
88 State(Arc::new(StateInner {
89 meta: ValueOptionOwned::No,
90 st,
91 }))
92 }
93}
94
95#[derive(Debug, Serialize, Deserialize)]
96struct StateInner {
97 #[serde(skip_serializing_if = "ValueOptionOwned::is_none")]
98 meta: ValueOptionOwned,
99 #[serde(flatten)]
100 st: FullItemStateConnected,
101}
102
103pub struct Query<'a> {
104 mask: &'a OIDMask,
105 filter: Option<Filter<'a>>,
106 local: bool,
107}
108
109impl<'a> Query<'a> {
110 pub fn new(mask: &'a OIDMask) -> Self {
111 Self {
112 mask,
113 filter: None,
114 local: false,
115 }
116 }
117 pub fn filter(mut self, filter: Filter<'a>) -> Self {
118 self.filter = Some(filter);
119 self
120 }
121 pub fn local(mut self) -> Self {
125 self.local = true;
126 self
127 }
128}
129
130#[derive(Default)]
131pub struct Db {
132 state_db: Arc<RwLock<StateDb>>,
133}
134
135impl Clone for Db {
136 fn clone(&self) -> Self {
137 Db {
138 state_db: self.state_db.clone(),
139 }
140 }
141}
142
143impl Db {
144 pub fn new() -> Self {
145 Self::default()
146 }
147 pub async fn process_bus_frame(&self, frame: &Frame) -> EResult<()> {
148 let mut db = self.state_db.write().await;
149 db.process_bus_frame(frame)
150 }
151 pub async fn get(&self, oid: &OID) -> EResult<Option<State>> {
152 #[derive(Serialize)]
153 struct Params<'a> {
154 i: &'a OID,
155 full: bool,
156 }
157 let db = self.state_db.read().await;
158 let res = db.get_local(oid);
159 if res.is_some() {
160 return Ok(res);
161 }
162 let r = eapi_bus::call(
163 "eva.core",
164 "item.state",
165 pack(&Params { i: oid, full: true })?.into(),
166 )
167 .await?;
168 let s_st: Vec<FullItemStateConnectedWithMeta> = unpack(r.payload())?;
169 if s_st.len() > 1 {
170 return Err(Error::invalid_data("expected a single state"));
171 }
172 let Some(s) = s_st.into_iter().next() else {
173 return Ok(None);
174 };
175 drop(db);
176 Ok(Some(
177 self.state_db
178 .write()
179 .await
180 .record_state_connected_with_meta(s),
181 ))
182 }
183 pub async fn query(&self, query: Query<'_>) -> EResult<Vec<State>> {
184 #[derive(Serialize)]
185 struct Params<'a> {
186 i: &'a OIDMask,
187 full: bool,
188 #[serde(skip_serializing_if = "Option::is_none")]
189 include: Option<&'a OIDMaskList>,
190 #[serde(skip_serializing_if = "Option::is_none")]
191 exclude: Option<&'a OIDMaskList>,
192 }
193 if query.local {
194 return self.state_db.read().await.query_local(&query);
195 }
196 let payload = Params {
197 i: query.mask,
198 full: true,
199 include: query.filter.as_ref().and_then(|f| f.include),
200 exclude: query.filter.as_ref().and_then(|f| f.exclude),
201 };
202 let r = eapi_bus::call("eva.core", "item.state", pack(&payload)?.into()).await?;
203 let s_st: Vec<FullItemStateConnectedWithMeta> = unpack(r.payload())?;
204 if s_st.is_empty() {
205 return Ok(Vec::new());
206 }
207 let mut r_vec = Vec::with_capacity(s_st.len());
208 let mut db = self.state_db.write().await;
209 for s in s_st {
210 r_vec.push(db.record_state_connected_with_meta(s));
211 }
212 Ok(r_vec)
213 }
214 pub async fn remove_local(&self, oid: &OID) -> Option<State> {
215 let mut db = self.state_db.write().await;
216 db.remove(oid)
217 }
218}
219
220#[derive(Default)]
222struct StateDb {
223 db: StateMap,
224}
225
226impl StateDb {
227 fn process_bus_frame(&mut self, frame: &Frame) -> EResult<()> {
228 let Some(topic) = frame.topic() else {
229 return Ok(());
230 };
231 if let Some(oid_str) = topic.strip_prefix(LOCAL_STATE_TOPIC) {
232 let oid: OID = OID::from_path(oid_str).map_err(Error::invalid_data)?;
233 let ev: LocalStateEvent = unpack(frame.payload())?;
234 let st: FullItemStateConnected =
235 FullItemStateConnected::from_local_state_event(ev, oid);
236 self.record(State::from(st)).ok();
237 return Ok(());
238 }
239 let Some(oid_str) = topic
240 .strip_prefix(REMOTE_STATE_TOPIC)
241 .or_else(|| topic.strip_prefix(REMOTE_ARCHIVE_STATE_TOPIC))
242 else {
243 return Ok(());
244 };
245 let oid: OID = OID::from_path(oid_str).map_err(Error::invalid_data)?;
246 let ev: RemoteStateEvent = unpack(frame.payload())?;
247 let st: FullItemStateConnected = FullItemStateConnected::from_remote_state_event(ev, oid);
248 self.record(State::from(st)).ok();
249 Ok(())
250 }
251 fn record_state_connected_with_meta(&mut self, mut s: FullItemStateConnectedWithMeta) -> State {
252 if s.meta.is_none() {
253 s.meta = Some(Value::Unit);
254 }
255 let mut st = State(
256 StateInner {
257 st: s.st,
258 meta: s.meta.into(),
259 }
260 .into(),
261 );
262 if let Err(e) = self.record(st.clone()) {
263 st = e; }
265 st
266 }
267 fn record(&mut self, state: State) -> Result<(), State> {
268 self.db.append(state, false)
269 }
270 fn get_local(&self, oid: &OID) -> Option<State> {
274 self.db.get(oid)
275 }
276 fn remove(&mut self, oid: &OID) -> Option<State> {
277 self.db.remove(oid)
278 }
279 fn query_local(&self, query: &Query<'_>) -> EResult<Vec<State>> {
280 let filter: Cow<Filter> = query
281 .filter
282 .as_ref()
283 .map_or_else(|| Cow::Owned(Filter::new()), Cow::Borrowed);
284 self.db.get_by_mask(query.mask, &filter)
285 }
286}
287
288#[derive(Default, Debug)]
289struct StateTree {
290 childs: BTreeMap<String, StateTree>,
291 members: BTreeMap<Arc<OID>, State>,
292 members_wildcard: BTreeMap<Arc<OID>, State>,
293}
294
295impl StateTree {
296 fn is_empty(&self) -> bool {
297 self.childs.is_empty() && self.members.is_empty()
298 }
299}
300
301#[derive(Debug, Default)]
302struct StateMap {
303 unit: StateTree,
304 sensor: StateTree,
305 lvar: StateTree,
306 lmacro: StateTree,
307}
308
309#[derive(Default, Clone)]
310pub struct Filter<'a> {
311 include: Option<&'a OIDMaskList>,
312 exclude: Option<&'a OIDMaskList>,
313}
314
315impl<'a> Filter<'a> {
316 pub fn new() -> Self {
317 Self::default()
318 }
319 pub fn include(mut self, mask_list: &'a OIDMaskList) -> Self {
320 self.include = Some(mask_list);
321 self
322 }
323 pub fn exclude(mut self, mask_list: &'a OIDMaskList) -> Self {
324 self.exclude = Some(mask_list);
325 self
326 }
327 fn matches(&self, state: &State) -> bool {
328 if let Some(f) = self.include
329 && !f.matches(state.oid())
330 {
331 return false;
332 }
333 if let Some(f) = self.exclude {
334 !f.matches(state.oid())
335 } else {
336 true
337 }
338 }
339}
340
341impl StateMap {
342 #[inline]
343 fn get_tree(&self, tp: ItemKind) -> &StateTree {
344 match tp {
345 ItemKind::Unit => &self.unit,
346 ItemKind::Sensor => &self.sensor,
347 ItemKind::Lvar => &self.lvar,
348 ItemKind::Lmacro => &self.lmacro,
349 }
351 }
352 #[inline]
353 fn get_tree_mut(&mut self, tp: ItemKind) -> &mut StateTree {
354 match tp {
355 ItemKind::Unit => &mut self.unit,
356 ItemKind::Sensor => &mut self.sensor,
357 ItemKind::Lvar => &mut self.lvar,
358 ItemKind::Lmacro => &mut self.lmacro,
359 }
361 }
362 #[inline]
363 fn append(&mut self, state: State, force: bool) -> Result<(), State> {
366 let tree = self.get_tree_mut(state.oid().kind());
367 append_state_rec(tree, state.oid().full_id().split('/'), &state, force)
368 }
369 #[inline]
370 fn get(&self, oid: &OID) -> Option<State> {
371 let tree = self.get_tree(oid.kind());
372 get_state_rec(tree, oid.full_id().split('/'))
373 }
374 #[inline]
375 fn remove(&mut self, oid: &OID) -> Option<State> {
376 let tree = self.get_tree_mut(oid.kind());
377 remove_state_rec(tree, oid.full_id().split('/'), oid)
378 }
379 fn get_by_mask(&self, mask: &OIDMask, filter: &Filter) -> EResult<Vec<State>> {
380 if let Some(tp) = mask.kind() {
381 if tp == ItemKind::Lmacro {
382 return Ok(Vec::new());
383 }
384 let tree = self.get_tree(tp);
385 if let Some(chunks) = mask.chunks() {
386 let mut result = Vec::new();
387 get_state_by_mask_rec(tree, chunks.iter(), &mut result, filter)?;
388 Ok(result)
389 } else {
390 Ok(tree
391 .members_wildcard
392 .values()
393 .filter(|x| filter.matches(x))
394 .cloned()
395 .collect())
396 }
397 } else {
398 let mut result = Vec::new();
399 if let Some(chunks) = mask.chunks() {
400 get_state_by_mask_rec(&self.unit, chunks.iter(), &mut result, filter)?;
401 get_state_by_mask_rec(&self.sensor, chunks.iter(), &mut result, filter)?;
402 get_state_by_mask_rec(&self.lvar, chunks.iter(), &mut result, filter)?;
403 } else {
404 result.extend(
405 self.unit
406 .members_wildcard
407 .values()
408 .filter(|x| filter.matches(x))
409 .cloned()
410 .collect::<Vec<State>>(),
411 );
412 result.extend(
413 self.sensor
414 .members_wildcard
415 .values()
416 .filter(|x| filter.matches(x))
417 .cloned()
418 .collect::<Vec<State>>(),
419 );
420 result.extend(
421 self.lvar
422 .members_wildcard
423 .values()
424 .filter(|x| filter.matches(x))
425 .cloned()
426 .collect::<Vec<State>>(),
427 );
428 }
429 Ok(result)
430 }
431 }
432}
433
434fn get_state_rec(tree: &StateTree, mut sp: Split<char>) -> Option<State> {
435 if let Some(chunk) = sp.next() {
436 if let Some(child) = tree.childs.get(chunk) {
437 get_state_rec(child, sp)
438 } else {
439 None
440 }
441 } else if tree.members.is_empty() {
442 None
443 } else {
444 Some(tree.members.values().next().unwrap().clone())
445 }
446}
447fn remove_state_rec(tree: &mut StateTree, mut sp: Split<char>, oid: &OID) -> Option<State> {
448 if let Some(chunk) = sp.next() {
449 tree.members_wildcard.remove(oid)?;
450 let state = if let Some(c) = tree.childs.get_mut(chunk) {
451 let state = remove_state_rec(c, sp.clone(), oid)?;
452 if c.is_empty() {
453 tree.childs.remove(chunk);
454 }
455 state
456 } else {
457 return None;
458 };
459 Some(state)
460 } else {
461 tree.members.remove(oid)
462 }
463}
464
465fn get_state_by_mask_rec(
466 tree: &StateTree,
467 mut iter: std::slice::Iter<&str>,
468 result: &mut Vec<State>,
469 filter: &Filter,
470) -> EResult<()> {
471 if let Some(chunk) = iter.next() {
472 if *chunk == "#" {
473 result.extend(
474 tree.members_wildcard
475 .values()
476 .filter(|x| filter.matches(x))
477 .cloned()
478 .collect::<Vec<State>>(),
479 );
480 } else if *chunk == "+" {
481 for child in tree.childs.values() {
482 get_state_by_mask_rec(child, iter.clone(), result, filter)?;
483 }
484 } else if let Some(regex) = chunk.strip_prefix(OID_MASK_PREFIX_REGEX) {
485 let re = regex::Regex::new(regex).map_err(Error::invalid_params)?;
486 for (name, child) in &tree.childs {
487 if re.is_match(name) {
488 get_state_by_mask_rec(child, iter.clone(), result, filter)?;
489 }
490 }
491 } else if let Some(child) = tree.childs.get(*chunk) {
492 get_state_by_mask_rec(child, iter, result, filter)?;
493 }
494 } else {
495 result.extend(
496 tree.members
497 .values()
498 .filter(|x| filter.matches(x))
499 .cloned()
500 .collect::<Vec<State>>(),
501 );
502 }
503 Ok(())
504}
505
506fn append_state_rec(
507 tree: &mut StateTree,
508 mut sp: Split<char>,
509 state: &State,
510 force: bool,
511) -> Result<(), State> {
512 macro_rules! process_entry {
513 ($entry: expr) => {
514 match $entry {
515 std::collections::btree_map::Entry::Occupied(mut e) => {
516 let existing = e.get();
517 if existing.ieid() > state.ieid() && !force {
518 return Err(existing.clone());
519 }
520 if existing.meta().is_none() || state.meta().is_some() {
521 e.insert(state.clone());
522 } else {
523 let state_with_meta = State(Arc::new(StateInner {
525 meta: existing.meta().cloned().into(),
526 st: state.0.st.clone(),
527 }));
528 e.insert(state_with_meta);
529 }
530 }
531 std::collections::btree_map::Entry::Vacant(e) => {
532 e.insert(state.clone());
533 }
534 }
535 };
536 }
537 if let Some(chunk) = sp.next() {
538 process_entry!(tree.members_wildcard.entry(state.oid().clone().into()));
539 if let Some(c) = tree.childs.get_mut(chunk) {
540 append_state_rec(c, sp.clone(), state, force)?;
541 } else {
542 let mut child = StateTree::default();
543 append_state_rec(&mut child, sp.clone(), state, force)?;
544 tree.childs.insert(chunk.to_owned(), child);
545 }
546 return Ok(());
547 }
548 process_entry!(tree.members.entry(state.oid().clone().into()));
549 Ok(())
550}
551
#[cfg(test)]
mod test {
    use super::{Db, FullItemStateConnectedWithMeta, Query};
    use crate::types::FullItemStateConnected;
    use eva_common::{IEID, OID, acl::OIDMask, events::LocalStateEvent, value::Value};

    /// Builds a local state event with status 1, no `act` and `t` = 123.0.
    fn local_event(value: Value, ieid: IEID) -> LocalStateEvent {
        LocalStateEvent {
            status: 1,
            value,
            act: None,
            ieid,
            t: 123.0,
        }
    }

    /// Builds a bare (metadata-less) state of `oid` from a local event.
    fn bare_state(oid: &OID, value: Value, ieid: IEID) -> FullItemStateConnected {
        FullItemStateConnected::from_local_state_event(local_event(value, ieid), oid.clone())
    }

    /// Stored states are returned by `get` and by a local wildcard query.
    #[tokio::test]
    async fn test_get_query() {
        let db = Db::new();
        let nested: OID = "sensor:tests/t1".parse().unwrap();
        let flat: OID = "sensor:t5".parse().unwrap();
        let fixtures = [
            (&nested, Value::from(123u8)),
            (&flat, Value::from(456u16)),
        ];
        for (oid, value) in &fixtures {
            let st = bare_state(oid, value.clone(), IEID::new(1, 0));
            db.state_db.write().await.record(st.into()).unwrap();
        }
        for (oid, value) in &fixtures {
            let st = db.get(oid).await.unwrap().expect("state not found");
            assert_eq!(st.oid(), *oid);
            assert_eq!(st.status(), 1);
            assert_eq!(st.value(), value);
        }
        let mask: OIDMask = "sensor:#".parse().unwrap();
        let states = db.query(Query::new(&mask).local()).await.unwrap();
        assert_eq!(states.len(), 2);
        for (oid, _) in &fixtures {
            assert!(states.iter().any(|s| s.oid() == *oid));
        }
    }

    /// A newer state without metadata keeps the stored metadata; a newer
    /// state with metadata replaces it.
    #[tokio::test]
    async fn test_push_state_without_meta_update_meta() {
        let db = Db::new();
        let oid: OID = "sensor:tests/t1".parse().unwrap();

        // Initial state, with metadata.
        let first_meta = Value::String("Hello world".into());
        let with_meta = FullItemStateConnectedWithMeta {
            st: bare_state(&oid, Value::from(123u8), IEID::new(1, 0)),
            meta: Some(first_meta.clone()),
        };
        db.state_db
            .write()
            .await
            .record_state_connected_with_meta(with_meta);

        // Newer state, no metadata: the value changes, the metadata stays.
        let newer = bare_state(&oid, Value::from(456u16), IEID::new(1, 1));
        db.state_db.write().await.record(newer.into()).unwrap();
        let st = db.get(&oid).await.unwrap().expect("state not found");
        assert_eq!(st.oid(), &oid);
        assert_eq!(st.status(), 1);
        assert_eq!(st.value(), &Value::from(456u16));
        assert_eq!(st.meta().unwrap(), &first_meta);

        // Even newer state, with new metadata: the metadata is replaced.
        let second_meta = Value::String("New meta".into());
        let with_new_meta = FullItemStateConnectedWithMeta {
            st: bare_state(&oid, Value::from(456u16), IEID::new(1, 2)),
            meta: Some(second_meta.clone()),
        };
        db.state_db
            .write()
            .await
            .record_state_connected_with_meta(with_new_meta);
        let st = db.get(&oid).await.unwrap().expect("state not found");
        assert_eq!(st.oid(), &oid);
        assert_eq!(st.status(), 1);
        assert_eq!(st.value(), &Value::from(456u16));
        assert_eq!(st.meta().unwrap(), &second_meta);
    }

    /// A state with a lower IEID than the stored one is rejected.
    #[tokio::test]
    async fn test_push_older_state() {
        let db = Db::new();
        let oid: OID = "sensor:tests/t1".parse().unwrap();

        let current = bare_state(&oid, Value::from(123u8), IEID::new(1, 1));
        db.state_db.write().await.record(current.into()).unwrap();

        let older = bare_state(&oid, Value::from(456u16), IEID::new(1, 0));
        let outcome = db.state_db.write().await.record(older.into());
        assert!(outcome.is_err());

        let st = db.get(&oid).await.unwrap().expect("state not found");
        assert_eq!(st.oid(), &oid);
        assert_eq!(st.status(), 1);
        assert_eq!(st.value(), &Value::from(123u8));
    }
}