dcs2-raft 0.1.0

An extensible distributed control system framework written in Rust with `no_std` support.
Documentation
use log::{debug, error, warn};
use serde::{Deserialize, Serialize};

use dcs::nodes::SystemNodeId;
use dcs::rules::measurements::{Measurement, NodeMeasurementsVec, ClusterType, SystemState};
use dcs::properties::CLUSTER_NODE_COUNT;

use crate::server::{Merge, NoOp};

/// State payload handled by the Raft server in this crate.
///
/// Holds at most one [`Measurement`] per [`SystemNodeId`]. The map's capacity
/// parameter is `CLUSTER_NODE_COUNT`, so the number of distinct nodes it can
/// track is bounded by that constant.
#[derive(Clone, Default, Eq, PartialEq, Debug, Serialize, Deserialize)]
pub struct RaftState {
    /// Measurement stored for each node id (one entry per node).
    measurements: dcs::heapless::LinearMap<SystemNodeId, Measurement, CLUSTER_NODE_COUNT>,
}

impl NoOp for RaftState {
    /// Returns the no-op state: a `RaftState` whose measurement map is
    /// default-constructed, i.e. the same value as `RaftState::default()`.
    fn noop() -> Self {
        Self::default()
    }
}

impl From<(SystemNodeId, Measurement)> for RaftState {
    /// Builds a state containing a single entry: `measurement` stored under
    /// node `id`.
    ///
    /// # Panics
    ///
    /// Panics if the freshly created map rejects the insertion. The map starts
    /// out default-constructed, so this is only expected when its capacity
    /// (`CLUSTER_NODE_COUNT`) is zero.
    fn from((id, measurement): (SystemNodeId, Measurement)) -> Self {
        let mut state = RaftState::default();
        state
            .measurements
            .insert(id, measurement)
            // State the invariant instead of a bare `unwrap()` so a
            // misconfigured capacity produces an actionable panic message.
            .expect("a freshly created measurement map must have room for one entry");
        state
    }
}

impl From<RaftState> for SystemState {
    /// Converts a `RaftState` into a `SystemState` by starting from
    /// `SystemState::default()` and calling `update` once for every
    /// `(node id, measurement)` pair stored in the Raft state.
    fn from(raft_state: RaftState) -> Self {
        raft_state
            .measurements
            .iter()
            .fold(SystemState::default(), |mut accumulated, (node, reading)| {
                // Entries are visited by reference; both parts are copied out.
                accumulated.update(*node, *reading);
                accumulated
            })
    }
}

impl Merge for RaftState {
    /// Merges `rhs` into `self` and returns the combined state.
    ///
    /// Every entry of `rhs` is inserted into `self`'s map. When both sides
    /// hold a measurement for the same node id, the value from `rhs` replaces
    /// the one from `self`.
    ///
    /// If an insertion is rejected (the map has run out of capacity), an error
    /// is logged and the partially merged state built so far is returned;
    /// the remaining entries of `rhs` are dropped.
    fn merge(mut self, rhs: Self) -> Self {
        // `self` is already owned, so merge into it directly instead of
        // cloning the whole map first.
        for (id, measurement) in rhs.measurements.iter() {
            if self.measurements.insert(*id, *measurement).is_err() {
                error!("Cannot merge states, exceeded max capacity. Returning partial state.");
                break;
            }
        }

        self
    }
}

/// Unit tests for `RaftState` construction and `Merge` behaviour.
#[cfg(test)]
mod raft_state_tests {
    use dcs::nodes::SystemNodeId;

    use dcs::rules::measurements::{Measurement, ClusterType};

    use crate::server::Merge;
    use crate::state::RaftState;

    /// Test helper: builds a humidity measurement carrying `value`.
    pub fn measurement(value: i32) -> Measurement {
        Measurement::new(ClusterType::HUMIDITY, value)
    }

    /// Merging with a default (empty) state must leave the base state unchanged.
    #[test]
    fn given_measurement_when_merging_with_empty_then_value_doesnt_change() {
        let (id, measurement) = (SystemNodeId::from(0), measurement(1));
        let base_state = RaftState::from((id, measurement));
        let empty_state = RaftState::default();
        // `merge` consumes its receiver, so compare against a clone taken first.
        assert_eq!(base_state.clone(), base_state.merge(empty_state));
    }

    /// Measurements from two different nodes must both survive the merge.
    #[test]
    fn given_two_measurements_from_two_sensors_when_merging_the_results_dont_collision() {
        let (id1, measurement1) = (SystemNodeId::from(1), measurement(1));
        let (id2, measurement2) = (SystemNodeId::from(2), measurement(2));

        let state1 = RaftState::from((id1, measurement1));
        let state2 = RaftState::from((id2, measurement2));

        let state = state1.merge(state2);
        assert_eq!(state.measurements.len(), 2);
        assert_eq!(state.measurements.get(&id1), Some(&measurement1));
        assert_eq!(state.measurements.get(&id2), Some(&measurement2));
    }

    /// For the same node id, the right-hand side's measurement must win.
    #[test]
    fn given_two_measurements_from_same_sensor_when_merging_then_results_are_merged() {
        let id = SystemNodeId::from(1);
        let (measurement1, measurement2) = (measurement(1), measurement(2));

        let state1 = RaftState::from((id, measurement1));
        let state2 = RaftState::from((id, measurement2));

        let state = state1.merge(state2);
        assert_eq!(state.measurements.len(), 1);
        assert_eq!(state.measurements.get(&id), Some(&measurement2));
    }
}