use crate::{
channels::{RxConnectable, TxConnectError, TxConnectable},
prelude::OverflowPolicy,
};
pub fn connect<T, R>(tx: T, rx: R) -> Result<(), TxConnectError>
where
(T, R): Connect,
{
(tx, rx).connect()
}
pub trait Connect {
fn connect(self) -> Result<(), TxConnectError>;
}
impl<T, R, V> Connect for (T, R)
where
T: TxConnectable<Message = V>,
R: RxConnectable<Message = V>,
{
fn connect(self) -> Result<(), TxConnectError> {
connect_imp(self.0, self.1)
}
}
impl<T, R, V> Connect for (Option<T>, R)
where
T: TxConnectable<Message = V>,
R: RxConnectable<Message = V>,
{
fn connect(self) -> Result<(), TxConnectError> {
if let Some(tx) = self.0 {
connect_imp(tx, self.1)
} else {
Ok(())
}
}
}
impl<T, R, V> Connect for (T, Option<R>)
where
T: TxConnectable<Message = V>,
R: RxConnectable<Message = V>,
{
fn connect(self) -> Result<(), TxConnectError> {
if let Some(rx) = self.1 {
connect_imp(self.0, rx)
} else {
Ok(())
}
}
}
impl<T, R, V> Connect for (Option<T>, Option<R>)
where
T: TxConnectable<Message = V>,
R: RxConnectable<Message = V>,
{
fn connect(self) -> Result<(), TxConnectError> {
if let (Some(tx), Some(rx)) = (self.0, self.1) {
connect(tx, rx)
} else {
Ok(())
}
}
}
pub(crate) fn connect_imp<T, R, V>(mut tx: T, mut rx: R) -> Result<(), TxConnectError>
where
T: TxConnectable<Message = V>,
R: RxConnectable<Message = V>,
{
if rx.is_connected() {
return Err(TxConnectError::ReceiverAlreadyConnected);
}
if tx.has_max_connection_count() {
return Err(TxConnectError::MaxConnectionCountExceeded);
}
if matches!(tx.overflow_policy(), OverflowPolicy::Resize)
&& matches!(rx.overflow_policy(), OverflowPolicy::Reject(_))
{
return Err(TxConnectError::PolicyMismatch);
}
tx.on_connect(rx.on_connect());
Ok(())
}
#[cfg(test)]
mod test {
use super::*;
use crate::prelude::{
DoubleBufferRx, DoubleBufferTx, Message, MessageRx, MessageTx, RxMessageEndpoint,
TxMessageEndpoint,
};
#[test]
fn test_connect() {
test_with_l1(|| MessageTx::new_auto_size(), || MessageRx::new_auto_size());
test_with_l1(
|| MessageTx::new_auto_size(),
|| DoubleBufferRx::new_auto_size(),
);
test_with_l1(
|| DoubleBufferTx::new_auto_size(),
|| MessageRx::new_auto_size(),
);
test_with_l1(
|| DoubleBufferTx::new_auto_size(),
|| DoubleBufferRx::new_auto_size(),
);
}
fn test_with_l1<TF, RF, T, R>(txf: TF, rxf: RF)
where
TF: Fn() -> T,
RF: Fn() -> R,
T: TxConnectable<Message = Message<i32>>,
R: RxConnectable<Message = Message<i32>>,
{
test_with_l2(|| txf(), || rxf());
test_with_l2(|| Box::new(txf()), || rxf());
test_with_l2(|| txf(), || Box::new(rxf()));
test_with_l2(|| Box::new(txf()), || Box::new(rxf()));
}
fn test_with_l2<TF, RF, T, R>(txf: TF, rxf: RF)
where
TF: Fn() -> T,
RF: Fn() -> R,
T: TxConnectable<Message = Message<i32>>,
R: RxConnectable<Message = Message<i32>>,
{
connect(txf(), rxf()).unwrap();
connect(txf(), &mut rxf()).unwrap();
connect(txf(), Some(rxf())).unwrap();
connect(txf(), Some(&mut rxf())).unwrap();
connect(&mut txf(), rxf()).unwrap();
connect(&mut txf(), &mut rxf()).unwrap();
connect(&mut txf(), Some(rxf())).unwrap();
connect(&mut txf(), Some(&mut rxf())).unwrap();
connect(Some(txf()), rxf()).unwrap();
connect(Some(txf()), &mut rxf()).unwrap();
connect(Some(txf()), Some(rxf())).unwrap();
connect(Some(txf()), Some(&mut rxf())).unwrap();
connect(Some(&mut txf()), rxf()).unwrap();
connect(Some(&mut txf()), &mut rxf()).unwrap();
connect(Some(&mut txf()), Some(rxf())).unwrap();
connect(Some(&mut txf()), Some(&mut rxf())).unwrap();
}
#[test]
fn test_connect_dyn() {
fn dyn_tx() -> Box<dyn TxMessageEndpoint<i32>> {
Box::new(MessageTx::new_auto_size())
}
fn dyn_rx() -> Box<dyn RxMessageEndpoint<i32>> {
Box::new(MessageRx::new_auto_size())
}
connect(dyn_tx(), MessageRx::new_auto_size()).unwrap();
connect(MessageTx::new_auto_size(), dyn_rx()).unwrap();
connect(dyn_tx(), dyn_rx()).unwrap();
}
#[test]
fn test_connect_mut_dyn() {
struct TxHelper {
tx: MessageTx<i32>,
}
impl TxHelper {
fn new() -> Self {
Self {
tx: MessageTx::new_auto_size(),
}
}
fn dyn_tx(&mut self) -> Box<dyn TxMessageEndpoint<i32> + '_> {
Box::new(&mut self.tx)
}
}
struct RxHelper {
rx: MessageRx<i32>,
}
impl RxHelper {
fn new() -> Self {
Self {
rx: MessageRx::new_auto_size(),
}
}
fn dyn_rx(&mut self) -> Box<dyn RxMessageEndpoint<i32> + '_> {
Box::new(&mut self.rx)
}
}
connect(TxHelper::new().dyn_tx(), MessageRx::new_auto_size()).unwrap();
connect(MessageTx::new_auto_size(), RxHelper::new().dyn_rx()).unwrap();
connect(TxHelper::new().dyn_tx(), RxHelper::new().dyn_rx()).unwrap();
}
}