dcs2 0.1.0

An extensible distributed control system framework written in Rust with `no_std` support.
Documentation
use core::fmt::{Debug, Display, Formatter};

use log::{error, trace};
use serde::de::DeserializeOwned;
use serde::{Deserialize, Serialize};

use crate::nodes::{SystemClusterId, SystemNodeId};
use crate::properties::{CLUSTER_NODE_COUNT, EncodedMetadata, SystemBufferVec};
use crate::rules::measurements::{Measurement, ClusterType, SystemState};
use crate::rules::strategy::Rule;
use crate::CodificationError;
use crate::rules::manager::SystemStatus;

/// Opcode for membership messages (no user of this constant is visible in this file).
pub const MEMBERSHIP_MESSAGE_OPCODE: u8 = 0;
// NOTE(review): value 1 is not assigned to any opcode in this list — confirm whether it is reserved.
/// Opcode returned by [`AlertMessage`]'s implementation of [`IdentificableMessage::id`].
pub const ALERT_MESSAGE_OPCODE: u8 = 2;
/// Opcode returned by [`ControlMessage`]'s implementation of [`IdentificableMessage::id`].
pub const CONTROL_MESSAGE_OPCODE: u8 = 3;
/// Opcode for measurement messages (no user of this constant is visible in this file).
pub const MEASUREMENT_MESSAGE_OPCODE: u8 = 4;
/// Opcode returned by [`NoCoordinatedMessage`]'s implementation of [`IdentificableMessage::id`].
pub const COORDINATION_MESSAGE_OPCODE: u8 = 5;
/// Opcode returned by [`EventMessage`]'s implementation of [`IdentificableMessage::id`].
pub const REPORTER_MESSAGE_OPCODE: u8 = 6;

/// The most generic package: the sender and receiver are of type [`SystemNodeId`] and the actual
/// body is an array of bytes ([`SystemBufferVec`]).
pub type GenericPackage = Package<SystemNodeId, SystemBufferVec>;

/// Common messages sent by the nodes.
///
/// `CoordMessage` is the payload type of the [`Messages::Coordination`] variant.
// Variant payloads are stored inline (no boxing), so the size lint is silenced here.
#[allow(clippy::large_enum_variant)]
#[derive(Debug, Eq, PartialEq, Serialize, Deserialize, Clone)]
pub enum Messages<CoordMessage> {
    /// Coordination messages are related to the leader election and must be provided by the end user.
    Coordination(CoordMessage),

    /// Actuator messages are sent to control the actuators, usually by the leader.
    Actuator(AlertMessage),

    /// System messages provide features like starting, exiting, healthcheck and rule changes.
    System(ControlMessage),

    /// This is the message sent to the reporter.
    Reporter(EventMessage),
}

/// Message carried by [`Messages::Reporter`]; its opcode is [`REPORTER_MESSAGE_OPCODE`].
#[derive(Debug, Eq, PartialEq, Serialize, Deserialize, Clone)]
pub struct EventMessage {
    /// Id of a node (presumably the node that produced the event — confirm against callers).
    pub node_id: SystemNodeId,
    /// Id of a cluster (presumably the one `node_id` belongs to — confirm against callers).
    pub cluster_id: SystemClusterId,
    /// Cluster type associated with the event.
    pub typed: ClusterType,
    /// Rule reported with the event (presumably the one active when it was produced — confirm).
    pub active_rule: Rule,
    /// The event payload: a registration, a measurement or an alert.
    pub metric: MetricData,
    /// Optional extra data; `None` until set, e.g. through [`EventMessage::update_extra`].
    pub extra: Option<ExtraData>,
}

impl IdentificableMessage for EventMessage {
    /// Returns [`REPORTER_MESSAGE_OPCODE`], the opcode shared by every [`EventMessage`].
    fn id() -> u8 {
        REPORTER_MESSAGE_OPCODE
    }
}

impl EventMessage {
    /// Stores a new [`ExtraData`] built from `timestamp` and `reply_to` in the `extra` field.
    ///
    /// Whatever was previously held in `extra` is overwritten.
    pub fn update_extra(&mut self, timestamp: u128, reply_to: u16) {
        let extra = ExtraData { reply_to, timestamp };
        self.extra = Some(extra);
    }
}

/// Optional additional data attached to an [`EventMessage`].
#[derive(Debug, Eq, PartialEq, Serialize, Deserialize, Clone)]
pub struct ExtraData {
    /// NOTE(review): presumably identifies where a reply should be sent (e.g. a port) — confirm.
    pub reply_to: u16,
    /// NOTE(review): the unit and epoch of this timestamp are not visible in this file — confirm.
    pub timestamp: u128,
}

/// Payload of an [`EventMessage`] (its `metric` field).
#[derive(Debug, Eq, PartialEq, Serialize, Deserialize, Clone)]
pub enum MetricData {
    /// Registration payload carrying a [`SubscriptionData`].
    Register(SubscriptionData),
    /// Measurement payload carrying a [`ReportedMeasurement`].
    Measurement(ReportedMeasurement),
    /// Alert payload carrying a [`ReportedAlert`].
    Alert(ReportedAlert),
}

/// Data carried by [`MetricData::Register`].
#[derive(Debug, Eq, PartialEq, Serialize, Deserialize, Clone)]
pub struct SubscriptionData {
    /// NOTE(review): presumably the period of the sender's main loop; the unit is not visible
    /// in this file — confirm.
    pub loop_time: u64,
}

impl SubscriptionData {
    pub fn new(loop_time: u64) -> Self {
        SubscriptionData { loop_time }
    }
}

/// Data carried by [`MetricData::Measurement`].
#[derive(Debug, Eq, PartialEq, Serialize, Deserialize, Clone)]
pub struct ReportedMeasurement {
    /// Leader flag (presumably whether the reporting node is the current leader — confirm).
    pub is_leader: bool,
    /// The measurement being reported.
    pub measurement: Measurement,
}

impl ReportedMeasurement {
    pub fn new(is_leader: bool, measurement: Measurement) -> Self {
        ReportedMeasurement { is_leader, measurement }
    }
}

/// Data carried by [`MetricData::Alert`].
#[derive(Debug, Eq, PartialEq, Serialize, Deserialize, Clone)]
pub struct ReportedAlert {
    /// The system status being reported.
    pub status: SystemStatus
}

impl ReportedAlert {
    pub fn new(status: SystemStatus) -> Self {
        ReportedAlert { status }
    }
}

/// A message type that is identified by a unique opcode.
///
/// Implementors must also be cloneable, debuggable and (de)serializable with serde.
pub trait IdentificableMessage: Clone + Debug + Serialize + DeserializeOwned {
    /// Returns the opcode of this message type. It is an associated function, so no value of
    /// the type is needed to call it.
    fn id() -> u8;
}

/// Coordination message type with no variants: no value of this type can ever be constructed.
///
/// NOTE(review): presumably meant as the `CoordMessage` parameter for systems that do not use
/// coordination messages — confirm against callers.
#[derive(Debug, Eq, PartialEq, Serialize, Deserialize, Copy, Clone)]
pub enum NoCoordinatedMessage {}

impl IdentificableMessage for NoCoordinatedMessage {
    /// Returns [`COORDINATION_MESSAGE_OPCODE`].
    fn id() -> u8 {
        COORDINATION_MESSAGE_OPCODE
    }
}

/// Messages carried by [`Messages::Actuator`]; their opcode is [`ALERT_MESSAGE_OPCODE`].
///
/// Each variant names a requested change: lowering or raising temperature or humidity.
#[derive(Debug, Eq, PartialEq, Serialize, Deserialize, Copy, Clone)]
pub enum AlertMessage {
    DecreaseTemperature,
    IncreaseTemperature,
    DecreaseHumidity,
    IncreaseHumidity,
}

impl IdentificableMessage for AlertMessage {
    /// Returns [`ALERT_MESSAGE_OPCODE`], the opcode shared by every [`AlertMessage`].
    fn id() -> u8 {
        ALERT_MESSAGE_OPCODE
    }
}

/// Kind of change concerning a node: an addition or a deletion.
///
/// NOTE(review): no user of this type is visible in this file — confirm where it is consumed.
#[derive(Debug, Eq, PartialEq, Serialize, Deserialize, Clone)]
pub enum NodeChange {
    AddNode,
    DeleteNode,
}

/// Fixed-capacity list of node ids holding at most [`CLUSTER_NODE_COUNT`] entries; it is the
/// payload of [`ControlMessage::ClusterChange`].
pub type UpdateClusterVec = heapless::Vec<SystemNodeId, CLUSTER_NODE_COUNT>;

/// Messages carried by [`Messages::System`]; their opcode is [`CONTROL_MESSAGE_OPCODE`].
// Variant payloads are stored inline (no boxing), so the size lint is silenced here.
#[allow(clippy::large_enum_variant)]
#[derive(Debug, Eq, PartialEq, Serialize, Deserialize, Clone)]
pub enum ControlMessage {
    Start,
    /// Carries a [`Rule`] (presumably the new rule to apply — confirm against the handler).
    RuleChange(Rule),
    /// Carries a list of node ids (presumably the updated cluster membership — confirm).
    ClusterChange(UpdateClusterVec),
    /// Carries encoded metadata (presumably a request for the receiver's metadata — confirm).
    GetMetadata(EncodedMetadata),
    /// Carries encoded metadata.
    Metadata(EncodedMetadata),
    Healthcheck,
    Exit,
    Fail,
    ACK,

    // Only usable for testing purposes.
    Debug,
    // NOTE(review): the meaning of the `bool` field is not visible in this file — confirm.
    DebugData(bool, SystemState, Option<Rule>),
}

impl IdentificableMessage for ControlMessage {
    /// Returns [`CONTROL_MESSAGE_OPCODE`], the opcode shared by every [`ControlMessage`].
    fn id() -> u8 {
        CONTROL_MESSAGE_OPCODE
    }
}

/// Builder implementation of the [`PackageBuilder`] for the [`GenericPackage`].
///
/// Every field starts as `None` (via `Default`) and is filled in by the matching builder
/// method; [`PackageBuilder::build`] fails if any of them is still unset.
#[derive(Default)]
pub struct GenericPackageBuilder {
    /// Receiver of the package; becomes `header.to`.
    to: Option<SystemNodeId>,
    /// Sender of the package; becomes `header.from`.
    from: Option<SystemNodeId>,
    /// Body of the package.
    body: Option<SystemBufferVec>,
}

impl PackageBuilder for GenericPackageBuilder {
    type NodeId = SystemNodeId;
    type Message = SystemBufferVec;

    /// Returns a builder with no field set; nothing is carried over from `self`.
    fn clean_copy(&self) -> Self {
        Self {
            to: None,
            from: None,
            body: None,
        }
    }

    /// Sets the receiver of the package, keeping the other fields untouched.
    fn to(self, id: Self::NodeId) -> Self {
        Self {
            to: Some(id),
            ..self
        }
    }

    /// Sets the sender of the package, keeping the other fields untouched.
    fn from(self, id: Self::NodeId) -> Self {
        Self {
            from: Some(id),
            ..self
        }
    }

    /// Sets the body of the package, keeping the other fields untouched.
    fn with_message(self, msg: Self::Message) -> Self {
        Self {
            body: Some(msg),
            ..self
        }
    }

    /// Addresses the package to the sender of `package`. Only the receiver is changed; the
    /// sender and the body of the builder are left as they were.
    fn respond_to(self, package: Package<Self::NodeId, Self::Message>) -> Self {
        let original_sender = package.header.from;
        self.to(original_sender)
    }

    /// Builds the package, or returns `Err(())` when the sender, the receiver or the body has
    /// not been set.
    fn build(self) -> Result<Package<Self::NodeId, Self::Message>, ()> {
        match (self.from, self.to, self.body) {
            (Some(from), Some(to), Some(body)) => Ok(Package {
                header: Header { from, to },
                body,
            }),
            _ => Err(()),
        }
    }
}

/// Interface to encode and decode structs.
pub trait SimpleCodifier: Default {
    /// The type this codifier encodes and decodes.
    type Data: Clone + Debug;
    /// Given a slice of bytes, tries to decode a struct of type `Data`.
    fn decode(&self, input: &[u8]) -> Result<Self::Data, CodificationError>;
    /// Given a struct of type `Data` and a mutable slice of bytes, tries to encode the struct and
    /// write the resulting bytes into the provided buffer. Returns the number of bytes written.
    fn encode(&self, data: &Self::Data, buffer: &mut [u8]) -> Result<usize, CodificationError>;
}

/// Builds a [`Package`] intended to be handled by the [`crate::communication::service::CommunicationService`]
pub trait PackageBuilder {
    /// Type used to identify the sender and the receiver.
    type NodeId: NodeId;
    /// Type of the package body.
    type Message: Body;

    /// Returns a new builder derived from `self` without consuming it.
    fn clean_copy(&self) -> Self;
    /// Sets the receiver of the package.
    fn to(self, id: Self::NodeId) -> Self;
    /// Sets the sender of the package.
    fn from(self, id: Self::NodeId) -> Self;
    /// Sets the body of the package.
    fn with_message(self, msg: Self::Message) -> Self;
    /// Configures the builder to answer the given `package`.
    fn respond_to(self, package: Package<Self::NodeId, Self::Message>) -> Self;
    /// Consumes the builder and produces the package; the unit error carries no detail about
    /// what went wrong.
    #[allow(clippy::result_unit_err)]
    fn build(self) -> Result<Package<Self::NodeId, Self::Message>, ()>;
}

/// Identifies the cluster's nodes.
///
/// This trait has no methods; it only groups the bounds required of a node identifier.
pub trait NodeId: Display + Default + Clone + Copy + Serialize + DeserializeOwned {}

// Blanket implementation: every type meeting the bounds is a `NodeId` (`Copy` implies `Clone`).
impl<T> NodeId for T where T: Display + Default + Copy + Serialize + DeserializeOwned {}

/// Marker for types usable as the body of a [`Package`]; it only requires `Clone` and `Debug`.
pub trait Body: Clone + Debug {}

// Blanket implementation: every `Clone + Debug` type is a `Body`.
impl<T> Body for T where T: Clone + Debug {}

/// A message body together with the [`Header`] stating who sends it and who receives it.
// NOTE(review): only `Serialize` is derived here, whereas `Header` also derives `Deserialize` —
// confirm that the asymmetry is intentional.
#[derive(Clone, Debug, Serialize)]
pub struct Package<Id: NodeId, B: Body> {
    /// Sender and receiver of the package.
    pub header: Header<Id>,
    /// The content of the package.
    pub body: B,
}

impl<Id, B> Display for Package<Id, B>
where
    Id: NodeId,
    B: Body,
{
    /// Writes the package as `[ From: <sender>, To: <receiver>, Message: <body>]`, using
    /// `Display` for both ids and `Debug` for the body.
    fn fmt(&self, formatter: &mut Formatter<'_>) -> core::fmt::Result {
        let Self { header, body } = self;
        formatter.write_fmt(format_args!(
            "[ From: {}, To: {}, Message: {:?}]",
            header.from, header.to, body
        ))
    }
}

impl<Id, B> Package<Id, B>
where
    Id: NodeId,
    B: Body,
{
    /// Borrows the body of the package.
    pub fn get_message(&self) -> &B {
        let Self { body, .. } = self;
        body
    }

    /// Mutably borrows the body of the package.
    pub fn get_mut_message(&mut self) -> &mut B {
        let Self { body, .. } = self;
        body
    }

    /// Returns a copy of the sender id (`header.from`).
    pub fn get_sender(&self) -> Id {
        let Self { header, .. } = self;
        header.from
    }

    /// Returns a copy of the receiver id (`header.to`).
    pub fn get_receiver(&self) -> Id {
        let Self { header, .. } = self;
        header.to
    }
}

/// Routing information of a [`Package`]: who sends it and who receives it.
#[derive(Clone, Debug, Default, Serialize, Deserialize)]
pub struct Header<Id> {
    /// Id of the sender.
    pub from: Id,
    /// Id of the receiver.
    pub to: Id,
}

/// Generic implementation of a [`SimpleCodifier`] for [`Messages`]. It holds one codifier per
/// message kind: coordination (`CoordMessage`), actuator ([`AlertMessage`]), system
/// ([`ControlMessage`]) and reporter ([`EventMessage`]). The coordination message type must be
/// provided by the user, together with a codifier for each kind.
pub struct GeneralMessageCodifier<
    CoordMessage: IdentificableMessage,
    CoordinationCod: SimpleCodifier<Data=CoordMessage>,
    ActuatorCod: SimpleCodifier<Data=AlertMessage>,
    SystemCod: SimpleCodifier<Data=ControlMessage>,
    ReporterCod: SimpleCodifier<Data=EventMessage>,
> {
    /// Codifier for [`Messages::Coordination`] payloads.
    coordination: CoordinationCod,
    /// Codifier for [`Messages::Actuator`] payloads.
    actuator: ActuatorCod,
    /// Codifier for [`Messages::System`] payloads.
    system: SystemCod,
    /// Codifier for [`Messages::Reporter`] payloads.
    reporter: ReporterCod,
}

impl<CoordMessage, CoordinationCod, ActuatorCod, SystemCod, ReporterCod> Default
    for GeneralMessageCodifier<CoordMessage, CoordinationCod, ActuatorCod, SystemCod, ReporterCod>
where
    CoordMessage: IdentificableMessage,
    CoordinationCod: SimpleCodifier<Data = CoordMessage>,
    ActuatorCod: SimpleCodifier<Data = AlertMessage>,
    SystemCod: SimpleCodifier<Data = ControlMessage>,
    ReporterCod: SimpleCodifier<Data = EventMessage>,
{
    /// Builds the codifier from the `Default` value of each of its four inner codifiers.
    fn default() -> Self {
        // Created in field order: coordination, actuator, system, reporter.
        let coordination = CoordinationCod::default();
        let actuator = ActuatorCod::default();
        let system = SystemCod::default();
        let reporter = ReporterCod::default();

        Self {
            coordination,
            actuator,
            system,
            reporter,
        }
    }
}

impl<CoordMessage, CoordinationCod, ActuatorCod, SystemCod, ReporterCod> SimpleCodifier
    for GeneralMessageCodifier<CoordMessage, CoordinationCod, ActuatorCod, SystemCod, ReporterCod>
where
    CoordMessage: IdentificableMessage,
    CoordinationCod: SimpleCodifier<Data = CoordMessage>,
    ActuatorCod: SimpleCodifier<Data = AlertMessage>,
    SystemCod: SimpleCodifier<Data = ControlMessage>,
    ReporterCod: SimpleCodifier<Data = EventMessage>,
{
    type Data = Messages<CoordMessage>;

    /// Decodes `input` by trying each inner codifier in turn (coordination, actuator, system,
    /// reporter) and wrapping the first successful result in the matching [`Messages`] variant.
    ///
    /// The errors of the individual codifiers are discarded; when all four fail,
    /// [`CodificationError::DeserializationError`] is returned.
    fn decode(&self, input: &[u8]) -> Result<Self::Data, CodificationError> {
        trace!("About to decode bytes: {:?}", input);

        // The order matters: the first codifier that accepts the bytes wins.
        let decoded = if let Ok(coordination) = self.coordination.decode(input) {
            Messages::Coordination(coordination)
        } else if let Ok(alert) = self.actuator.decode(input) {
            Messages::Actuator(alert)
        } else if let Ok(control) = self.system.decode(input) {
            Messages::System(control)
        } else if let Ok(event) = self.reporter.decode(input) {
            Messages::Reporter(event)
        } else {
            error!("Couldn't find any valid decoding for the message.");
            return Err(CodificationError::DeserializationError);
        };

        trace!("System message decoded: {:?}", decoded);
        Ok(decoded)
    }

    /// Encodes `msg` into `buffer` with the codifier matching its variant and returns whatever
    /// that codifier returns (the number of bytes written, or its error).
    fn encode(&self, msg: &Self::Data, buffer: &mut [u8]) -> Result<usize, CodificationError> {
        match msg {
            Messages::Coordination(payload) => self.coordination.encode(payload, buffer),
            Messages::Actuator(payload) => self.actuator.encode(payload, buffer),
            Messages::System(payload) => self.system.encode(payload, buffer),
            Messages::Reporter(payload) => self.reporter.encode(payload, buffer),
        }
    }
}