dcs2 0.1.0

An extensible distributed control system framework made in rust with no-std support.
Documentation
use log::*;

use crate::communication::connection::{
    Address, Connection, ConnectionFactory, InMsgQueue, OutMsgQueue, Readable, Writable, Write,
};
use crate::communication::messages::{GenericPackage, GenericPackageBuilder, PackageBuilder};
use crate::communication::router::Router;
use crate::communication::ConnectionError;
use crate::nodes::SystemNodeId;
use crate::properties::{init_buffer, SystemBufferVec, MAX_QUEUED_MESSAGES};

/// A CommunicationService is the main interface to send and receive [packages](crate::communication::messages::Package).
/// It's the conjunction of an [`OutMsgQueue`] and an [`InMsgQueue`].
pub trait CommunicationService<Pkg: Writable + Readable>:
    OutMsgQueue<Pkg> + InMsgQueue<Pkg>
{
}

struct QueuedPackage {
    pub receiver_id: SystemNodeId,
    pub package: GenericPackage,
    pub ready: bool,
}

/// A generic implementation of a [`CommunicationService`]. It sends and receives [`GenericPackage`].
pub struct GenericCommunicationService<
    Conn: Connection,
    Addr: Address,
    F: ConnectionFactory<Address = Addr, Connection = Conn>,
    R: Router<Address = Addr>,
> {
    pub id: SystemNodeId,
    pub router: R,
    pub connection_factory: F,
    pub package_builder: GenericPackageBuilder,
    package_queue: heapless::Vec<QueuedPackage, MAX_QUEUED_MESSAGES>,
}

impl<
        Conn: Connection,
        Addr: Address,
        F: ConnectionFactory<Address = Addr, Connection = Conn>,
        R: Router<Address = Addr>,
    > GenericCommunicationService<Conn, Addr, F, R>
{
    pub fn new(id: SystemNodeId, router: R, connection_factory: F) -> Self {
        Self {
            id,
            router,
            connection_factory,
            package_builder: GenericPackageBuilder::default(),
            package_queue: heapless::Vec::default(),
        }
    }

    fn unqueue_available_package(&mut self) -> Option<GenericPackage> {
        let mut package_ready_idx = None;
        for (idx, package) in self.package_queue.iter().enumerate() {
            if package.ready {
                package_ready_idx = Some(idx);
            }
        }

        package_ready_idx.map(|idx| self.package_queue.remove(idx).package)
    }

    pub fn update(&mut self, new_id: SystemNodeId, previous_id: SystemNodeId, address: Addr) {
        if self.router.update(new_id, address).is_err() {
            return;
        }

        for package in self.package_queue.iter_mut() {
            if package.receiver_id == previous_id {
                package.receiver_id = new_id;
                package.ready = true
            }
        }
    }

    pub fn unpeer(&mut self, previous_address: Addr) -> Option<SystemNodeId> {
        self.router.drop(previous_address)
    }

    pub fn handshake(&mut self, address: Addr, message: SystemBufferVec) {
        match self.connection_factory.connect(&address) {
            Ok(connection) => match Write::write(connection, message.as_slice()) {
                Ok(bytes) => {
                    trace!("Wrote {} bytes to {}.", bytes, address);
                    self.connection_factory.clear();
                }
                Err(err) => {
                    warn!(
                        "Got error {:?} while trying to write message to {}.",
                        err, address
                    )
                }
            },
            Err(ConnectionError::NotAvailable) => {
                warn!("Connection to {} not available. Disconnecting.", address);
                self.connection_factory.clear();
            }
            Err(_) => {
                error!("Couldn't connect to {}. Dropping output package.", address);
            }
        }
    }

    pub fn queue(&mut self, peer_id: SystemNodeId, package: GenericPackage) {
        if self
            .package_queue
            .push(QueuedPackage {
                receiver_id: peer_id,
                package: package.clone(),
                ready: false,
            })
            .is_err()
        {
            error!("Couldn't queue package {:?} with peer {}", package, peer_id);
        }
    }
}

impl<
        Conn: Connection,
        Addr: Address,
        F: ConnectionFactory<Address = Addr, Connection = Conn>,
        R: Router<Address = Addr>,
    > OutMsgQueue<GenericPackage> for GenericCommunicationService<Conn, Addr, F, R>
where
    F: ConnectionFactory<Address = Addr, Connection = Conn>,
{
    fn push(&mut self, mut pkg: GenericPackage) {
        if let Some(destiny) = self.router.route(&pkg.get_receiver()) {
            match self.connection_factory.connect(destiny) {
                Ok(connection) => match Write::write(connection, pkg.get_mut_message()) {
                    Ok(bytes) => {
                        trace!("Wrote {} bytes to {}.", bytes, destiny);
                        self.connection_factory.clear();
                        self.router.keep(pkg.get_receiver(), true);
                        return;
                    }
                    Err(err) => {
                        warn!("Got error {:?} while trying to write message to {}.", err, destiny)
                    }
                },
                Err(ConnectionError::NotAvailable) => {
                    warn!("Connection to {} not available. Disconnecting.", destiny);
                    self.connection_factory.clear();
                }
                Err(_) => {
                    error!("Couldn't connect to {}. Dropping output package.", destiny);
                }
            }
            self.router.keep(pkg.get_receiver(), false);
        } else {
            error!("Couldn't find receiver with ID #{} to route package. Dropping output package.", pkg.get_receiver());
        }
    }
}

impl<
        Conn: Connection,
        Addr: Address,
        F: ConnectionFactory<Address = Addr, Connection = Conn>,
        R: Router<Address = Addr>,
    > InMsgQueue<GenericPackage> for GenericCommunicationService<Conn, Addr, F, R>
where
    F: ConnectionFactory<Address = Addr, Connection = Conn>,
{
    fn pop(&mut self) -> Option<GenericPackage> {
        if let Some(package) = self.unqueue_available_package() {
            return Some(package);
        }

        let (conn, from) = self.connection_factory.listen().ok()?;
        let mut buffer = init_buffer();
        let bytes = conn.read(&mut buffer).ok()?;
        let buffer = SystemBufferVec::from_slice(&buffer[..bytes]).ok()?;

        let from = self
            .router
            .inverse(&from)
            .copied()
            .unwrap_or_else(|| self.router.register(from).unwrap());

        self.router.clean();
        self.package_builder
            .clean_copy()
            .to(self.id)
            .from(from)
            .with_message(buffer)
            .build()
            .ok()
    }
}

impl<
        Conn: Connection,
        Addr: Address,
        F: ConnectionFactory<Address = Addr, Connection = Conn>,
        R: Router<Address = Addr>,
    > CommunicationService<GenericPackage> for GenericCommunicationService<Conn, Addr, F, R>
where
    F: ConnectionFactory<Address = Addr, Connection = Conn>,
{
}