eva_common/
events.rs

1use crate::acl::OIDMaskList;
2use crate::value::{Value, ValueOption, ValueOptionOwned};
3use crate::{EResult, Error};
4use crate::{ItemStatus, IEID, OID};
5use serde::{Deserialize, Deserializer, Serialize, Serializer};
6use std::hash::{Hash, Hasher};
7use std::str::FromStr;
8use std::time::Duration;
9
// Bus topic names. A constant ending with `/` appears to be a prefix to which the publisher
// appends a suffix (e.g. `RPL/NODE/<name>`); the ones without it are presumably complete
// topic names.

/// Prefix of the raw state topics (`RAW/...`)
pub const RAW_STATE_TOPIC: &str = "RAW/";
/// Topic for raw state events sent in bulk
pub const RAW_STATE_BULK_TOPIC: &str = "RAW";
/// Prefix of the local state topics (`ST/LOC/...`)
pub const LOCAL_STATE_TOPIC: &str = "ST/LOC/";
/// Prefix of the remote state topics (`ST/REM/...`)
pub const REMOTE_STATE_TOPIC: &str = "ST/REM/";
/// Prefix of the remote archive state topics (`ST/RAR/...`)
pub const REMOTE_ARCHIVE_STATE_TOPIC: &str = "ST/RAR/";
/// Prefix matching any state topic; `+` is presumably a single-level wildcard (TODO confirm)
pub const ANY_STATE_TOPIC: &str = "ST/+/";
/// Prefix of the replication state topics (`RPL/ST/...`)
pub const REPLICATION_STATE_TOPIC: &str = "RPL/ST/";
/// Prefix of the replication inventory topics (`RPL/INVENTORY/<name>`)
pub const REPLICATION_INVENTORY_TOPIC: &str = "RPL/INVENTORY/";
/// Prefix of the replication node state topics (`RPL/NODE/<name>`)
pub const REPLICATION_NODE_STATE_TOPIC: &str = "RPL/NODE/";
/// Prefix of the log input topics (`LOG/IN/...`)
pub const LOG_INPUT_TOPIC: &str = "LOG/IN/";
/// Prefix of the log event topics (`LOG/EV/...`)
pub const LOG_EVENT_TOPIC: &str = "LOG/EV/";
/// Prefix of the call trace log topics (`LOG/TR/...`)
pub const LOG_CALL_TRACE_TOPIC: &str = "LOG/TR/";
/// Topic for service status events
pub const SERVICE_STATUS_TOPIC: &str = "SVC/ST";
/// Prefix of the AAA ACL topics (`AAA/ACL/...`)
pub const AAA_ACL_TOPIC: &str = "AAA/ACL/";
/// Prefix of the AAA key topics (`AAA/KEY/...`)
pub const AAA_KEY_TOPIC: &str = "AAA/KEY/";
/// Prefix of the AAA user topics (`AAA/USER/...`)
pub const AAA_USER_TOPIC: &str = "AAA/USER/";
26
/// Status of a replication node
///
/// The enum is `#[repr(i8)]` with explicit discriminants, so every status also has a fixed
/// numeric code (`1`, `0`, `-1`).
#[derive(Debug, Copy, Clone)]
#[repr(i8)]
pub enum NodeStatus {
    Online = 1,
    Offline = 0,
    Removed = -1,
}

impl NodeStatus {
    /// Returns the lowercase name of the status (`"online"`, `"offline"` or `"removed"`)
    ///
    /// The result is a string literal, so it is returned as `&'static str`: with an elided
    /// lifetime the name would be needlessly tied to the borrow of `self`.
    fn as_str(&self) -> &'static str {
        match self {
            NodeStatus::Online => "online",
            NodeStatus::Offline => "offline",
            NodeStatus::Removed => "removed",
        }
    }
}
44
45impl FromStr for NodeStatus {
46    type Err = Error;
47    fn from_str(s: &str) -> Result<Self, Self::Err> {
48        match s {
49            "online" => Ok(NodeStatus::Online),
50            "offline" => Ok(NodeStatus::Offline),
51            "removed" => Ok(NodeStatus::Removed),
52            _ => Err(Error::invalid_data(format!("Invalid node status: {}", s))),
53        }
54    }
55}
56
57/// submitted to RPL/NODE/<name>
58#[derive(Debug, Clone, Serialize, Deserialize)]
59pub struct NodeStateEvent {
60    pub status: NodeStatus,
61    #[serde(default)]
62    pub info: Option<NodeInfo>,
63    #[serde(
64        default,
65        serialize_with = "crate::tools::serialize_opt_duration_as_f64",
66        deserialize_with = "crate::tools::de_opt_float_as_duration"
67    )]
68    pub timeout: Option<Duration>,
69}
70
71#[derive(Debug, Clone, Serialize, Deserialize)]
72pub struct NodeInfo {
73    pub build: u64,
74    pub version: String,
75}
76
77impl Serialize for NodeStatus {
78    #[inline]
79    fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
80    where
81        S: Serializer,
82    {
83        serializer.serialize_str(self.as_str())
84    }
85}
86
87impl<'de> Deserialize<'de> for NodeStatus {
88    #[inline]
89    fn deserialize<D>(deserializer: D) -> Result<NodeStatus, D::Error>
90    where
91        D: Deserializer<'de>,
92    {
93        let s: String = Deserialize::deserialize(deserializer)?;
94        s.parse().map_err(serde::de::Error::custom)
95    }
96}
97
/// Force mode of a state update
///
/// The variants are declared from the weakest to the strongest, which is also the order
/// produced by the derived `Ord`.
#[derive(Debug, Copy, Clone, Eq, PartialEq, Default, Ord, PartialOrd)]
pub enum Force {
    /// No force is applied (the default)
    #[default]
    None,
    /// Update force behavior: always updates item state even if the previous is the same,
    /// updates lvar state even if its status is 0
    Update,
    /// Full force behavior: does the same as `Update`, but also updates the item state even
    /// if the item is disabled
    Full,
}

impl Force {
    /// `true` for [`Force::None`]
    #[inline]
    pub fn is_none(&self) -> bool {
        *self == Force::None
    }
    /// `true` for [`Force::Update`] ("weak" is accepted as another name of this mode when
    /// parsing from a string)
    #[inline]
    pub fn is_weak(&self) -> bool {
        *self == Force::Update
    }
    /// `true` for [`Force::Full`]
    #[inline]
    pub fn is_full(&self) -> bool {
        *self == Force::Full
    }
    /// `true` for every mode except [`Force::None`]
    #[inline]
    pub fn is_any(&self) -> bool {
        *self != Force::None
    }
}
128
129impl Serialize for Force {
130    fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
131    where
132        S: Serializer,
133    {
134        match self {
135            Force::None => serializer.serialize_bool(false),
136            Force::Full => serializer.serialize_bool(true),
137            Force::Update => serializer.serialize_str("update"),
138        }
139    }
140}
141
142impl FromStr for Force {
143    type Err = Error;
144
145    fn from_str(input: &str) -> Result<Force, Self::Err> {
146        match input.to_lowercase().as_str() {
147            "none" => Ok(Force::None),
148            "full" => Ok(Force::Full),
149            "update" | "weak" => Ok(Force::Update),
150            _ => Err(Error::invalid_data(format!(
151                "Invalid force value: {}",
152                input
153            ))),
154        }
155    }
156}
157
158impl<'de> Deserialize<'de> for Force {
159    fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
160    where
161        D: Deserializer<'de>,
162    {
163        struct ForceVisitor;
164
165        impl serde::de::Visitor<'_> for ForceVisitor {
166            type Value = Force;
167
168            fn expecting(&self, formatter: &mut std::fmt::Formatter) -> std::fmt::Result {
169                formatter.write_str("a boolean or a string representing a force")
170            }
171
172            fn visit_bool<E>(self, value: bool) -> Result<Force, E>
173            where
174                E: serde::de::Error,
175            {
176                Ok(if value { Force::Full } else { Force::None })
177            }
178
179            fn visit_borrowed_str<E>(self, value: &str) -> Result<Force, E>
180            where
181                E: serde::de::Error,
182            {
183                value.parse().map_err(serde::de::Error::custom)
184            }
185
186            fn visit_str<E>(self, value: &str) -> Result<Force, E>
187            where
188                E: serde::de::Error,
189            {
190                value.parse().map_err(serde::de::Error::custom)
191            }
192
193            fn visit_string<E>(self, value: String) -> Result<Force, E>
194            where
195                E: serde::de::Error,
196            {
197                value.parse().map_err(serde::de::Error::custom)
198            }
199        }
200
201        deserializer.deserialize_any(ForceVisitor)
202    }
203}
204
205/// On modified rules
206#[derive(Debug, Clone, Serialize, Eq, PartialEq)]
207#[serde(rename_all = "snake_case")]
208pub enum OnModified<'a> {
209    SetOther(OnModifiedSet<'a>),
210    SetOtherValueDelta(OnModifiedValueDelta<'a>),
211}
212
213/// On modified rules (owned)
214#[derive(Debug, Clone, Deserialize, Serialize, Eq, PartialEq)]
215#[serde(rename_all = "snake_case")]
216pub enum OnModifiedOwned {
217    SetOther(OnModifiedSetOwned),
218    SetOtherValueDelta(OnModifiedValueDeltaOwned),
219}
220
221#[derive(Debug, Copy, Clone, Serialize, Deserialize, Eq, PartialEq, Default)]
222#[serde(rename_all = "lowercase")]
223pub enum OnModifiedError {
224    /// Skip the operation
225    Skip,
226    /// Reset item to status = 1, value = 0
227    Reset,
228    /// Process the operation
229    #[default]
230    Process,
231}
232
233#[derive(Debug, Clone, Serialize, Eq, PartialEq)]
234#[serde(deny_unknown_fields)]
235pub struct OnModifiedSet<'a> {
236    /// For the selected OID mask list
237    pub oid: &'a OIDMaskList,
238    /// The new status
239    pub status: ItemStatus,
240    /// The new value (optional)
241    #[serde(default, skip_serializing_if = "ValueOption::is_none")]
242    pub value: ValueOption<'a>,
243}
244
245#[derive(Debug, Copy, Clone, Serialize, Deserialize, PartialEq, Default)]
246#[serde(rename_all = "snake_case")]
247pub enum OnNegativeDelta {
248    /// Skip the operation
249    Skip,
250    /// Reset item to value = 0
251    Reset,
252    /// Process the operation
253    #[default]
254    Process,
255    /// Respect the overflow
256    Overflow { floor: f64, ceil: f64 },
257}
258
259impl Eq for OnNegativeDelta {}
260
261#[derive(Debug, Clone, Serialize, Eq, PartialEq)]
262#[serde(deny_unknown_fields)]
263pub struct OnModifiedValueDelta<'a> {
264    /// For the selected OID mask list
265    pub oid: &'a OID,
266    #[serde(default)]
267    /// On item status error
268    pub on_error: OnModifiedError,
269    /// On negative delta
270    #[serde(default)]
271    pub on_negative: OnNegativeDelta,
272}
273
274#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
275#[serde(deny_unknown_fields)]
276pub struct OnModifiedValueDeltaOwned {
277    /// For the selected OID
278    pub oid: OID,
279    /// Calculate delta per given period in seconds (e.g. 1.0 for delta per second, 3600.0 for
280    /// delta per hour etc.)
281    pub period: Option<f64>,
282    #[serde(default)]
283    pub on_error: OnModifiedError,
284    #[serde(default)]
285    pub on_negative: OnNegativeDelta,
286}
287
288impl Eq for OnModifiedValueDeltaOwned {}
289
290#[derive(Debug, Clone, Serialize, Eq, PartialEq, Deserialize)]
291pub struct OnModifiedSetOwned {
292    /// For the selected OID mask list
293    pub oid: OIDMaskList,
294    /// The new status
295    pub status: ItemStatus,
296    /// The new value (optional)
297    #[serde(default, skip_serializing_if = "ValueOptionOwned::is_none")]
298    pub value: ValueOptionOwned,
299}
300
301/// Submitted by services via the bus for local items
302#[derive(Debug, Clone, Serialize, PartialEq, Default)]
303#[serde(deny_unknown_fields)]
304pub struct RawStateEvent<'a> {
305    pub status: ItemStatus,
306    #[serde(default, skip_serializing_if = "ValueOption::is_none")]
307    pub value: ValueOption<'a>,
308    #[serde(default, skip_serializing_if = "Force::is_none")]
309    pub force: Force,
310    /// Override the time of the event
311    #[serde(default, skip_serializing_if = "Option::is_none")]
312    pub t: Option<f64>,
313    /// Compare the status with the current status (optional)
314    #[serde(skip_serializing_if = "Option::is_none")]
315    pub status_compare: Option<ItemStatus>,
316    /// Compare the value with the current value (optional)
317    #[serde(default, skip_serializing_if = "ValueOption::is_none")]
318    pub value_compare: ValueOption<'a>,
319    /// if comparison is used and unequal, set item status. In case if status is not specified,
320    /// `crate::ITEM_STATUS_ERROR` is used
321    #[serde(skip_serializing_if = "Option::is_none")]
322    pub status_else: Option<ItemStatus>,
323    /// if comparison is used and unequal, set item value (optional)
324    #[serde(default, skip_serializing_if = "ValueOption::is_none")]
325    pub value_else: ValueOption<'a>,
326    /// If the item is modified, OnModified rules are applied
327    #[serde(skip_serializing_if = "Option::is_none")]
328    pub on_modified: Option<OnModified<'a>>,
329}
330
331impl Eq for RawStateEvent<'_> {}
332
333impl<'a> RawStateEvent<'a> {
334    #[inline]
335    pub fn new(status: ItemStatus, value: &'a Value) -> Self {
336        Self {
337            status,
338            value: ValueOption::Value(value),
339            force: Force::None,
340            t: None,
341            on_modified: None,
342            status_compare: None,
343            value_compare: ValueOption::No,
344            status_else: None,
345            value_else: ValueOption::No,
346        }
347    }
348    #[inline]
349    pub fn new0(status: ItemStatus) -> Self {
350        Self {
351            status,
352            value: ValueOption::No,
353            force: Force::None,
354            t: None,
355            on_modified: None,
356            status_compare: None,
357            value_compare: ValueOption::No,
358            status_else: None,
359            value_else: ValueOption::No,
360        }
361    }
362    pub fn force(mut self) -> Self {
363        self.force = Force::Full;
364        self
365    }
366    pub fn force_update(mut self) -> Self {
367        self.force = Force::Update;
368        self
369    }
370    pub fn at(mut self, t: f64) -> Self {
371        self.t = Some(t);
372        self
373    }
374}
375
376/// Submitted by services via the bus for local items
377#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Default)]
378#[serde(deny_unknown_fields)]
379pub struct RawStateEventOwned {
380    pub status: ItemStatus,
381    #[serde(default, skip_serializing_if = "ValueOptionOwned::is_none")]
382    pub value: ValueOptionOwned,
383    #[serde(default, skip_serializing_if = "Force::is_none")]
384    pub force: Force,
385    /// Override the time of the event
386    #[serde(default, skip_serializing_if = "Option::is_none")]
387    pub t: Option<f64>,
388    /// Compare the status with the current status (optional)
389    #[serde(skip_serializing_if = "Option::is_none")]
390    pub status_compare: Option<ItemStatus>,
391    /// Compare the value with the current value (optional)
392    #[serde(default, skip_serializing_if = "ValueOptionOwned::is_none")]
393    pub value_compare: ValueOptionOwned,
394    /// if comparison is used and unequal, set item status. In case if status is not specified,
395    /// `crate::ITEM_STATUS_ERROR` is used
396    #[serde(skip_serializing_if = "Option::is_none")]
397    pub status_else: Option<ItemStatus>,
398    /// if comparison is used and unequal, set item value (optional)
399    #[serde(default, skip_serializing_if = "ValueOptionOwned::is_none")]
400    pub value_else: ValueOptionOwned,
401    /// If the item is modified, OnModified rules are applied
402    #[serde(skip_serializing_if = "Option::is_none")]
403    pub on_modified: Option<OnModifiedOwned>,
404}
405
406impl Eq for RawStateEventOwned {}
407
408impl RawStateEventOwned {
409    #[inline]
410    pub fn new(status: ItemStatus, value: Value) -> Self {
411        Self {
412            status,
413            value: ValueOptionOwned::Value(value),
414            force: Force::None,
415            t: None,
416            status_compare: None,
417            value_compare: ValueOptionOwned::No,
418            status_else: None,
419            value_else: ValueOptionOwned::No,
420            on_modified: None,
421        }
422    }
423    #[inline]
424    pub fn new0(status: ItemStatus) -> Self {
425        Self {
426            status,
427            value: ValueOptionOwned::No,
428            force: Force::None,
429            t: None,
430            status_compare: None,
431            value_compare: ValueOptionOwned::No,
432            status_else: None,
433            value_else: ValueOptionOwned::No,
434            on_modified: None,
435        }
436    }
437    pub fn force(mut self) -> Self {
438        self.force = Force::Full;
439        self
440    }
441    pub fn force_update(mut self) -> Self {
442        self.force = Force::Update;
443        self
444    }
445    pub fn at(mut self, t: f64) -> Self {
446        self.t = Some(t);
447        self
448    }
449}
450
451#[derive(Serialize)]
452pub struct RawStateBulkEvent<'a> {
453    #[serde(alias = "i")]
454    pub oid: &'a OID,
455    #[serde(flatten)]
456    pub raw: RawStateEvent<'a>,
457}
458
459impl<'a> RawStateBulkEvent<'a> {
460    #[inline]
461    pub fn new(oid: &'a OID, rse: RawStateEvent<'a>) -> Self {
462        Self { oid, raw: rse }
463    }
464    #[inline]
465    pub fn split_into_oid_and_rse(self) -> (&'a OID, RawStateEvent<'a>) {
466        (self.oid, self.raw)
467    }
468}
469
470impl<'a> From<RawStateBulkEvent<'a>> for RawStateEvent<'a> {
471    #[inline]
472    fn from(r: RawStateBulkEvent<'a>) -> Self {
473        r.raw
474    }
475}
476
477#[derive(Serialize, Deserialize)]
478pub struct RawStateBulkEventOwned {
479    #[serde(alias = "i")]
480    pub oid: OID,
481    #[serde(flatten)]
482    pub raw: RawStateEventOwned,
483}
484
485impl RawStateBulkEventOwned {
486    #[inline]
487    pub fn new(oid: OID, rseo: RawStateEventOwned) -> Self {
488        Self { oid, raw: rseo }
489    }
490    #[inline]
491    pub fn split_into_oid_and_rseo(self) -> (OID, RawStateEventOwned) {
492        (self.oid, self.raw)
493    }
494}
495
496impl From<RawStateBulkEventOwned> for RawStateEventOwned {
497    #[inline]
498    fn from(r: RawStateBulkEventOwned) -> Self {
499        r.raw
500    }
501}
502
503/// Submitted by the core via the bus for procesed local events
504#[derive(Debug, Clone, Serialize, Deserialize)]
505#[serde(deny_unknown_fields)]
506pub struct LocalStateEvent {
507    pub status: ItemStatus,
508    pub value: Value,
509    #[serde(skip_serializing_if = "Option::is_none")]
510    pub act: Option<usize>,
511    pub ieid: IEID,
512    pub t: f64,
513}
514
515/// Submitted by the core via the bus for processed remote events
516#[derive(Debug, Clone, Serialize, Deserialize)]
517#[serde(deny_unknown_fields)]
518pub struct RemoteStateEvent {
519    pub status: ItemStatus,
520    pub value: Value,
521    #[serde(skip_serializing_if = "Option::is_none")]
522    pub act: Option<usize>,
523    pub ieid: IEID,
524    pub t: f64,
525    pub node: String,
526    pub connected: bool,
527}
528
529impl RemoteStateEvent {
530    pub fn from_local_state_event(
531        event: LocalStateEvent,
532        system_name: &str,
533        connected: bool,
534    ) -> Self {
535        Self {
536            status: event.status,
537            value: event.value,
538            act: event.act,
539            ieid: event.ieid,
540            t: event.t,
541            node: system_name.to_owned(),
542            connected,
543        }
544    }
545}
546
547/// Stored by the core
548#[derive(Debug, Clone, Serialize, Deserialize)]
549#[serde(deny_unknown_fields)]
550pub struct DbState {
551    pub status: ItemStatus,
552    pub value: Value,
553    pub ieid: IEID,
554    pub t: f64,
555}
556
557/// Processed by the core and some additional services
558#[derive(Debug, Clone, Serialize, Deserialize)]
559#[serde(deny_unknown_fields)]
560pub struct ReplicationState {
561    pub status: ItemStatus,
562    pub value: Value,
563    pub act: Option<usize>,
564    pub ieid: IEID,
565    pub t: f64,
566}
567
568/// Submitted by replication services for remote items
569#[derive(Debug, Clone, Serialize, Deserialize)]
570pub struct ReplicationStateEvent {
571    pub status: ItemStatus,
572    pub value: Value,
573    #[serde(default, skip_serializing_if = "Option::is_none")]
574    pub act: Option<usize>,
575    pub ieid: IEID,
576    pub t: f64,
577    pub node: String,
578    #[serde(default, skip_serializing_if = "std::ops::Not::not")]
579    pub force_accept: bool,
580}
581
582impl From<ReplicationStateEvent> for ReplicationState {
583    fn from(d: ReplicationStateEvent) -> Self {
584        Self {
585            status: d.status,
586            value: d.value,
587            act: d.act,
588            ieid: d.ieid,
589            t: d.t,
590        }
591    }
592}
593
594impl TryFrom<ReplicationInventoryItem> for ReplicationState {
595    type Error = Error;
596    fn try_from(item: ReplicationInventoryItem) -> Result<Self, Self::Error> {
597        let v: Option<Value> = item.value.into();
598        Ok(Self {
599            status: item.status.unwrap_or_default(),
600            value: v.unwrap_or_default(),
601            act: item.act,
602            ieid: item
603                .ieid
604                .ok_or_else(|| Error::invalid_data(format!("IEID missing ({})", item.oid)))?,
605            t: item
606                .t
607                .ok_or_else(|| Error::invalid_data(format!("Set time missing ({})", item.oid)))?,
608        })
609    }
610}
611
612#[allow(clippy::similar_names)]
613impl ReplicationStateEvent {
614    #[inline]
615    pub fn new(
616        status: ItemStatus,
617        value: Value,
618        act: Option<usize>,
619        ieid: IEID,
620        t: f64,
621        node: &str,
622    ) -> Self {
623        Self {
624            status,
625            value,
626            act,
627            ieid,
628            t,
629            node: node.to_owned(),
630            force_accept: false,
631        }
632    }
633}
634
635impl From<ReplicationStateEvent> for RemoteStateEvent {
636    fn from(d: ReplicationStateEvent) -> Self {
637        Self {
638            status: d.status,
639            value: d.value,
640            act: d.act,
641            ieid: d.ieid,
642            t: d.t,
643            node: d.node,
644            connected: true,
645        }
646    }
647}
648
649/// Submitted by replication services to RPL/INVENTORY/<name> (as a list of)
650#[derive(Debug, Serialize, Deserialize, Clone)]
651#[serde(deny_unknown_fields)]
652pub struct ReplicationInventoryItem {
653    pub oid: OID,
654    #[serde(skip_serializing_if = "Option::is_none")]
655    pub status: Option<ItemStatus>,
656    #[serde(default, skip_serializing_if = "ValueOptionOwned::is_none")]
657    pub value: ValueOptionOwned,
658    #[serde(skip_serializing_if = "Option::is_none")]
659    pub act: Option<usize>,
660    pub ieid: Option<IEID>,
661    pub t: Option<f64>,
662    pub meta: Option<Value>,
663    pub enabled: bool,
664}
665
666impl Hash for ReplicationInventoryItem {
667    fn hash<H: Hasher>(&self, state: &mut H) {
668        self.oid.hash(state);
669    }
670}
671
672impl Eq for ReplicationInventoryItem {}
673
674impl PartialEq for ReplicationInventoryItem {
675    fn eq(&self, other: &Self) -> bool {
676        self.oid == other.oid
677    }
678}
679
680/// full state with info, returned by item.state RPC functions, used in HMI and other apps
681#[derive(Debug, Serialize, Clone)]
682#[serde(deny_unknown_fields)]
683pub struct FullItemStateAndInfo<'a> {
684    #[serde(flatten)]
685    pub si: ItemStateAndInfo<'a>,
686    // full
687    #[serde(skip_serializing_if = "Option::is_none")]
688    pub meta: Option<&'a Value>,
689    pub enabled: bool,
690}
691
692/// short state with info, returned by item.state RPC functions, used in HMI and other apps
693#[derive(Debug, Serialize, Clone)]
694#[serde(deny_unknown_fields)]
695pub struct ItemStateAndInfo<'a> {
696    pub oid: &'a OID,
697    #[serde(skip_serializing_if = "Option::is_none")]
698    pub status: Option<ItemStatus>,
699    // the value is always owned as states are usually hold under mutexes
700    #[serde(default, skip_serializing_if = "ValueOptionOwned::is_none")]
701    pub value: ValueOptionOwned,
702    #[serde(skip_serializing_if = "Option::is_none")]
703    pub act: Option<usize>,
704    pub ieid: Option<IEID>,
705    pub t: Option<f64>,
706    pub node: &'a str,
707    pub connected: bool,
708}
709
710/// full state with info, returned by item.state RPC functions, used in HMI and other apps
711#[derive(Debug, Serialize, Deserialize, Clone)]
712#[serde(deny_unknown_fields)]
713pub struct FullItemStateAndInfoOwned {
714    #[serde(flatten)]
715    pub si: ItemStateAndInfoOwned,
716    // full
717    #[serde(skip_serializing_if = "Option::is_none")]
718    pub meta: Option<Value>,
719    pub enabled: bool,
720}
721
722/// short state with info, returned by item.state RPC functions, used in HMI and other apps
723#[derive(Debug, Serialize, Deserialize, Clone)]
724#[serde(deny_unknown_fields)]
725pub struct ItemStateAndInfoOwned {
726    pub oid: OID,
727    #[serde(skip_serializing_if = "Option::is_none")]
728    pub status: Option<ItemStatus>,
729    #[serde(default, skip_serializing_if = "ValueOptionOwned::is_none")]
730    pub value: ValueOptionOwned,
731    #[serde(skip_serializing_if = "Option::is_none")]
732    pub act: Option<usize>,
733    pub ieid: Option<IEID>,
734    pub t: Option<f64>,
735    pub node: String,
736    pub connected: bool,
737}
738
739impl From<FullItemStateAndInfoOwned> for ReplicationInventoryItem {
740    fn from(s: FullItemStateAndInfoOwned) -> ReplicationInventoryItem {
741        ReplicationInventoryItem {
742            oid: s.si.oid,
743            status: s.si.status,
744            value: s.si.value,
745            act: s.si.act,
746            ieid: s.si.ieid,
747            t: s.si.t,
748            meta: s.meta,
749            enabled: s.enabled,
750        }
751    }
752}
753
754pub struct EventBuffer<T> {
755    data: parking_lot::Mutex<Vec<T>>,
756    size: usize,
757}
758
759#[allow(dead_code)]
760impl<T> EventBuffer<T> {
761    #[inline]
762    pub fn bounded(size: usize) -> Self {
763        Self {
764            data: <_>::default(),
765            size,
766        }
767    }
768    #[inline]
769    pub fn unbounded() -> Self {
770        Self {
771            data: <_>::default(),
772            size: 0,
773        }
774    }
775    pub fn push(&self, value: T) -> EResult<()> {
776        let mut buf = self.data.lock();
777        if self.size > 0 && buf.len() >= self.size {
778            return Err(Error::failed("buffer overflow, event dropped"));
779        }
780        buf.push(value);
781        Ok(())
782    }
783    pub fn len(&self) -> usize {
784        self.data.lock().len()
785    }
786    pub fn is_empty(&self) -> bool {
787        self.data.lock().is_empty()
788    }
789    pub fn take(&self) -> Vec<T> {
790        std::mem::take(&mut *self.data.lock())
791    }
792}
793
794#[derive(Serialize, Deserialize, Clone, Debug)]
795#[serde(untagged)]
796pub enum ReplicationStateEventExtended {
797    Inventory(ReplicationNodeInventoryItem),
798    Basic(ReplicationStateEvent),
799}
800
801impl ReplicationStateEventExtended {
802    pub fn node(&self) -> &str {
803        match self {
804            ReplicationStateEventExtended::Basic(v) => &v.node,
805            ReplicationStateEventExtended::Inventory(v) => &v.node,
806        }
807    }
808}
809
810#[derive(Serialize, Deserialize, Clone, Debug)]
811pub struct ReplicationNodeInventoryItem {
812    pub node: String,
813    #[serde(flatten)]
814    pub item: ReplicationInventoryItem,
815}