1use log::{debug, error, warn};
2use serde::{Deserialize, Serialize};
3
4use dcs::nodes::SystemNodeId;
5use dcs::rules::measurements::{Measurement, NodeMeasurementsVec, ClusterType, SystemState};
6use dcs::properties::CLUSTER_NODE_COUNT;
7
8use crate::server::{Merge, NoOp};
9
10#[derive(Clone, Default, Eq, PartialEq, Debug, Serialize, Deserialize)]
11pub struct RaftState {
12 measurements: dcs::heapless::LinearMap<SystemNodeId, Measurement, CLUSTER_NODE_COUNT>,
13}
14
15impl NoOp for RaftState {
16 fn noop() -> Self {
17 RaftState {
18 measurements: Default::default(),
19 }
20 }
21}
22
23impl From<(SystemNodeId, Measurement)> for RaftState {
24 fn from((id, measurement): (SystemNodeId, Measurement)) -> Self {
25 let mut state = RaftState::default();
26 state.measurements.insert(id, measurement).unwrap();
27 state
28 }
29}
30
31impl From<RaftState> for SystemState {
32 fn from(raft_state: RaftState) -> Self {
33 let mut system_state = SystemState::default();
34 for (id, measurement) in raft_state.measurements.into_iter() {
35 system_state.update(*id, *measurement);
36 }
37 system_state
38 }
39}
40
41impl Merge for RaftState {
42 fn merge(self, rhs: Self) -> Self {
43 let mut merged = self.clone();
44 for (id, measurement) in rhs.measurements.iter() {
45 if merged.measurements.insert(*id, *measurement).is_err() {
46 error!("Cannot merge states, exceeded max capacity. Returning partial state.");
47 return merged;
48 }
49 }
50
51 merged
52 }
53}
54
/// Unit tests for the `Merge` behaviour of `RaftState`.
#[cfg(test)]
mod raft_state_tests {
    use dcs::nodes::SystemNodeId;
    use dcs::rules::measurements::{ClusterType, Measurement};

    use crate::server::Merge;
    use crate::state::RaftState;

    /// Builds a humidity measurement carrying `value`.
    pub fn measurement(value: i32) -> Measurement {
        Measurement::new(ClusterType::HUMIDITY, value)
    }

    /// Merging with a default (empty) state must leave the state unchanged.
    #[test]
    fn given_measurement_when_merging_with_empty_then_value_doesnt_change() {
        let base_state = RaftState::from((SystemNodeId::from(0), measurement(1)));
        let empty_state = RaftState::default();
        assert_eq!(base_state.clone(), base_state.merge(empty_state));
    }

    /// Measurements from different nodes must both survive a merge.
    #[test]
    fn given_two_measurements_from_two_sensors_when_merging_the_results_dont_collision() {
        let (id1, measurement1) = (SystemNodeId::from(1), measurement(1));
        let (id2, measurement2) = (SystemNodeId::from(2), measurement(2));

        let state1 = RaftState::from((id1, measurement1));
        let state2 = RaftState::from((id2, measurement2));

        let state = state1.merge(state2);
        assert_eq!(state.measurements.len(), 2);
        assert_eq!(state.measurements.get(&id1), Some(&measurement1));
        assert_eq!(state.measurements.get(&id2), Some(&measurement2));
    }

    /// For the same node, the measurement from the right-hand state wins.
    #[test]
    fn given_two_measurements_from_same_sensor_when_merging_then_results_are_merged() {
        let id = SystemNodeId::from(1);
        let (measurement1, measurement2) = (measurement(1), measurement(2));

        let state1 = RaftState::from((id, measurement1));
        let state2 = RaftState::from((id, measurement2));

        let state = state1.merge(state2);
        assert_eq!(state.measurements.len(), 1);
        assert_eq!(state.measurements.get(&id), Some(&measurement2));
    }
}