dcs2_raft/
state.rs

1use log::{debug, error, warn};
2use serde::{Deserialize, Serialize};
3
4use dcs::nodes::SystemNodeId;
5use dcs::rules::measurements::{Measurement, NodeMeasurementsVec, ClusterType, SystemState};
6use dcs::properties::CLUSTER_NODE_COUNT;
7
8use crate::server::{Merge, NoOp};
9
10#[derive(Clone, Default, Eq, PartialEq, Debug, Serialize, Deserialize)]
11pub struct RaftState {
12    measurements: dcs::heapless::LinearMap<SystemNodeId, Measurement, CLUSTER_NODE_COUNT>,
13}
14
15impl NoOp for RaftState {
16    fn noop() -> Self {
17        RaftState {
18            measurements: Default::default(),
19        }
20    }
21}
22
23impl From<(SystemNodeId, Measurement)> for RaftState {
24    fn from((id, measurement): (SystemNodeId, Measurement)) -> Self {
25        let mut state = RaftState::default();
26        state.measurements.insert(id, measurement).unwrap();
27        state
28    }
29}
30
31impl From<RaftState> for SystemState {
32    fn from(raft_state: RaftState) -> Self {
33        let mut system_state = SystemState::default();
34        for (id, measurement) in raft_state.measurements.into_iter() {
35            system_state.update(*id, *measurement);
36        }
37        system_state
38    }
39}
40
41impl Merge for RaftState {
42    fn merge(self, rhs: Self) -> Self {
43        let mut merged = self.clone();
44        for (id, measurement) in rhs.measurements.iter() {
45            if merged.measurements.insert(*id, *measurement).is_err() {
46                error!("Cannot merge states, exceeded max capacity. Returning partial state.");
47                return merged;
48            }
49        }
50
51        merged
52    }
53}
54
55#[cfg(test)]
56mod raft_state_tests {
57    use dcs::nodes::SystemNodeId;
58    use std::fmt::Debug;
59
60    use dcs::rules::measurements::{Measurement, ClusterType};
61
62    use crate::server::Merge;
63    use crate::state::RaftState;
64
65    pub fn measurement(value: i32) -> Measurement {
66        Measurement::new(ClusterType::HUMIDITY, value)
67    }
68
69    #[test]
70    fn given_measurement_when_merging_with_empty_then_value_doesnt_change() {
71        let (id, measurement) = (SystemNodeId::from(0), measurement(1));
72        let base_state = RaftState::from((id, measurement));
73        let empty_state = RaftState::default();
74        assert_eq!(base_state.clone(), base_state.merge(empty_state));
75    }
76
77    #[test]
78    fn given_two_measurements_from_two_sensors_when_merging_the_results_dont_collision() {
79        let (id1, measurement1) = (SystemNodeId::from(1), measurement(1));
80        let (id2, measurement2) = (SystemNodeId::from(2), measurement(2));
81
82        let state1 = RaftState::from((id1, measurement1));
83        let state2 = RaftState::from((id2, measurement2));
84
85        let state = state1.merge(state2);
86        assert_eq!(state.measurements.len(), 2);
87        assert_eq!(state.measurements.get(&id1), Some(&measurement1));
88        assert_eq!(state.measurements.get(&id2), Some(&measurement2));
89    }
90
91    #[test]
92    fn given_two_measurements_from_same_sensor_when_merging_then_results_are_merged() {
93        let id = SystemNodeId::from(1);
94        let (measurement1, measurement2) = (measurement(1), measurement(2));
95
96        let state1 = RaftState::from((id, measurement1));
97        let state2 = RaftState::from((id, measurement2));
98
99        let state = state1.merge(state2);
100        assert_eq!(state.measurements.len(), 1);
101        assert_eq!(state.measurements.get(&id), Some(&measurement2));
102    }
103}