nodo 0.18.5

A realtime framework for robotics
Documentation
// Copyright 2023 David Weikersdorfer

use crate::{
    channels::{RxConnectable, TxConnectError, TxConnectable},
    prelude::OverflowPolicy,
};

/// Connects a transmitter to a receiver
///
/// Receivers can be connected to at most one transmitter. A transmitter can be connected to
/// multiple receivers. There is currently a technical limit of 64 connections per transmitter.
///
/// If a transmitter is connected to no receiver, published messages are dropped immediately.
/// If a transmitter is connected to exactly one receiver, published messages are "moved" without a
/// copy. Note that in this case [Clone] still needs to be implemeted for message data types even
/// though messages are not cloned.
/// If a transmitter is connected to more than one receiver, published messages are cloned such
/// that each receiver gets its own copy. It is recommended that messages are either cheap to clone
/// or use non-locking data sharing mechanisms like [std::sync::Arc]. Note that if locking data
/// sharing mechanisms like [std::borrow::Cow] or [std::sync::RwLock] are used, codelets might block
/// each other when they access message data.
///
/// The runtime will usually print warnings if transmitters or receivers are not connected.
///
/// Certain queue policy combinations are forbidden:
/// * A transmitter with [OverflowPolicy::Resize] can not be connected to a receiver with
///   [OverflowPolicy::Reject] as this can lead to unbounded queue growth. (This might be converted
///   to a warning in the future.)
///
/// `connect` is usually called with `&mut` references for receiver and transmitter.
/// [Option] is supported and will connect channels only if both receiver and transmitter are not
/// [None], otherwise no connection is formed.
pub fn connect<T, R>(tx: T, rx: R) -> Result<(), TxConnectError>
where
    (T, R): Connect,
{
    (tx, rx).connect()
}

/// Trait used to implement the `connect` function and provide "function overloading" for different
/// varients of TX and RX channels.
pub trait Connect {
    fn connect(self) -> Result<(), TxConnectError>;
}

impl<T, R, V> Connect for (T, R)
where
    T: TxConnectable<Message = V>,
    R: RxConnectable<Message = V>,
{
    fn connect(self) -> Result<(), TxConnectError> {
        connect_imp(self.0, self.1)
    }
}

impl<T, R, V> Connect for (Option<T>, R)
where
    T: TxConnectable<Message = V>,
    R: RxConnectable<Message = V>,
{
    fn connect(self) -> Result<(), TxConnectError> {
        if let Some(tx) = self.0 {
            connect_imp(tx, self.1)
        } else {
            Ok(())
        }
    }
}

impl<T, R, V> Connect for (T, Option<R>)
where
    T: TxConnectable<Message = V>,
    R: RxConnectable<Message = V>,
{
    fn connect(self) -> Result<(), TxConnectError> {
        if let Some(rx) = self.1 {
            connect_imp(self.0, rx)
        } else {
            Ok(())
        }
    }
}

impl<T, R, V> Connect for (Option<T>, Option<R>)
where
    T: TxConnectable<Message = V>,
    R: RxConnectable<Message = V>,
{
    fn connect(self) -> Result<(), TxConnectError> {
        if let (Some(tx), Some(rx)) = (self.0, self.1) {
            connect(tx, rx)
        } else {
            Ok(())
        }
    }
}

pub(crate) fn connect_imp<T, R, V>(mut tx: T, mut rx: R) -> Result<(), TxConnectError>
where
    T: TxConnectable<Message = V>,
    R: RxConnectable<Message = V>,
{
    if rx.is_connected() {
        return Err(TxConnectError::ReceiverAlreadyConnected);
    }

    if tx.has_max_connection_count() {
        return Err(TxConnectError::MaxConnectionCountExceeded);
    }

    if matches!(tx.overflow_policy(), OverflowPolicy::Resize)
        && matches!(rx.overflow_policy(), OverflowPolicy::Reject(_))
    {
        return Err(TxConnectError::PolicyMismatch);
    }

    tx.on_connect(rx.on_connect());

    Ok(())
}

#[cfg(test)]
mod test {
    use super::*;
    use crate::prelude::{
        DoubleBufferRx, DoubleBufferTx, Message, MessageRx, MessageTx, RxMessageEndpoint,
        TxMessageEndpoint,
    };

    #[test]
    fn test_connect() {
        // T: MessageTx, DoubleBufferTx
        // R: MessageRx, DoubleBufferRx

        test_with_l1(|| MessageTx::new_auto_size(), || MessageRx::new_auto_size());

        test_with_l1(
            || MessageTx::new_auto_size(),
            || DoubleBufferRx::new_auto_size(),
        );

        test_with_l1(
            || DoubleBufferTx::new_auto_size(),
            || MessageRx::new_auto_size(),
        );

        test_with_l1(
            || DoubleBufferTx::new_auto_size(),
            || DoubleBufferRx::new_auto_size(),
        );
    }

    fn test_with_l1<TF, RF, T, R>(txf: TF, rxf: RF)
    where
        TF: Fn() -> T,
        RF: Fn() -> R,
        T: TxConnectable<Message = Message<i32>>,
        R: RxConnectable<Message = Message<i32>>,
    {
        // T, Box<T>

        test_with_l2(|| txf(), || rxf());
        test_with_l2(|| Box::new(txf()), || rxf());

        test_with_l2(|| txf(), || Box::new(rxf()));
        test_with_l2(|| Box::new(txf()), || Box::new(rxf()));
    }

    fn test_with_l2<TF, RF, T, R>(txf: TF, rxf: RF)
    where
        TF: Fn() -> T,
        RF: Fn() -> R,
        T: TxConnectable<Message = Message<i32>>,
        R: RxConnectable<Message = Message<i32>>,
    {
        // T, &mut T, Option<T>, Option<&mut T>

        connect(txf(), rxf()).unwrap();
        connect(txf(), &mut rxf()).unwrap();
        connect(txf(), Some(rxf())).unwrap();
        connect(txf(), Some(&mut rxf())).unwrap();

        connect(&mut txf(), rxf()).unwrap();
        connect(&mut txf(), &mut rxf()).unwrap();
        connect(&mut txf(), Some(rxf())).unwrap();
        connect(&mut txf(), Some(&mut rxf())).unwrap();

        connect(Some(txf()), rxf()).unwrap();
        connect(Some(txf()), &mut rxf()).unwrap();
        connect(Some(txf()), Some(rxf())).unwrap();
        connect(Some(txf()), Some(&mut rxf())).unwrap();

        connect(Some(&mut txf()), rxf()).unwrap();
        connect(Some(&mut txf()), &mut rxf()).unwrap();
        connect(Some(&mut txf()), Some(rxf())).unwrap();
        connect(Some(&mut txf()), Some(&mut rxf())).unwrap();
    }

    #[test]
    fn test_connect_dyn() {
        fn dyn_tx() -> Box<dyn TxMessageEndpoint<i32>> {
            Box::new(MessageTx::new_auto_size())
        }

        fn dyn_rx() -> Box<dyn RxMessageEndpoint<i32>> {
            Box::new(MessageRx::new_auto_size())
        }

        connect(dyn_tx(), MessageRx::new_auto_size()).unwrap();
        connect(MessageTx::new_auto_size(), dyn_rx()).unwrap();
        connect(dyn_tx(), dyn_rx()).unwrap();
    }

    #[test]
    fn test_connect_mut_dyn() {
        struct TxHelper {
            tx: MessageTx<i32>,
        }

        impl TxHelper {
            fn new() -> Self {
                Self {
                    tx: MessageTx::new_auto_size(),
                }
            }

            fn dyn_tx(&mut self) -> Box<dyn TxMessageEndpoint<i32> + '_> {
                Box::new(&mut self.tx)
            }
        }

        struct RxHelper {
            rx: MessageRx<i32>,
        }

        impl RxHelper {
            fn new() -> Self {
                Self {
                    rx: MessageRx::new_auto_size(),
                }
            }

            fn dyn_rx(&mut self) -> Box<dyn RxMessageEndpoint<i32> + '_> {
                Box::new(&mut self.rx)
            }
        }

        connect(TxHelper::new().dyn_tx(), MessageRx::new_auto_size()).unwrap();
        connect(MessageTx::new_auto_size(), RxHelper::new().dyn_rx()).unwrap();
        connect(TxHelper::new().dyn_tx(), RxHelper::new().dyn_rx()).unwrap();
    }
}