use crate::{
channels::{FlushResult, SharedBackStage, SyncResult, MAX_RECEIVER_COUNT},
prelude::{Message, OverflowPolicy},
};
use paste::paste;
use std::ops::{Deref, DerefMut};
/// Receiving endpoint of a channel.
///
/// Every implementor must be [`Send`].
pub trait Rx: Send {
    /// Synchronizes the endpoint and reports the outcome as a [`SyncResult`].
    // NOTE(review): what "sync" transfers is defined by implementors outside this file.
    fn sync(&mut self) -> SyncResult;

    /// Returns `true` if the endpoint is connected.
    fn is_connected(&self) -> bool;

    /// Length of the endpoint; the single-endpoint `RxBundle` impl reports this value as the
    /// inbox message count.
    fn len(&self) -> usize;
}
/// An [`Rx`] endpoint which can take part in establishing a connection.
pub trait RxConnectable: Rx {
    /// Type of the items carried by the [`SharedBackStage`] of this endpoint.
    type Message;

    /// The [`OverflowPolicy`] of this endpoint.
    fn overflow_policy(&self) -> OverflowPolicy;

    /// Connection hook: returns the [`SharedBackStage`] handle for this endpoint.
    // NOTE(review): presumably the returned stage is handed to `TxConnectable::on_connect` of
    // the peer — confirm against the connecting code.
    fn on_connect(&mut self) -> SharedBackStage<Self::Message>;
}
/// Shorthand for an [`RxConnectable`] whose items are [`Message<V>`].
pub trait RxMessageEndpoint<V>: RxConnectable<Message = Message<V>> {}

/// Every sized `RxConnectable<Message = Message<V>>` is automatically an `RxMessageEndpoint<V>`.
impl<V, R> RxMessageEndpoint<V> for R where R: RxConnectable<Message = Message<V>> {}
/// A mutable reference to an [`Rx`] is itself an [`Rx`]; every call is forwarded to the referent.
impl<R: Rx> Rx for &mut R {
    fn sync(&mut self) -> SyncResult {
        <R as Rx>::sync(&mut **self)
    }

    fn is_connected(&self) -> bool {
        <R as Rx>::is_connected(&**self)
    }

    fn len(&self) -> usize {
        <R as Rx>::len(&**self)
    }
}
/// A boxed [`Rx`] (sized or not) is itself an [`Rx`]; every call is forwarded to the boxed value.
impl<R: Rx + ?Sized> Rx for Box<R> {
    fn sync(&mut self) -> SyncResult {
        let inner = DerefMut::deref_mut(self);
        inner.sync()
    }

    fn is_connected(&self) -> bool {
        let inner = Deref::deref(self);
        inner.is_connected()
    }

    fn len(&self) -> usize {
        let inner = Deref::deref(self);
        inner.len()
    }
}
/// [`Rx`] for a boxed mutable reference to an `RxConnectable` trait object.
///
/// The blanket `&mut R` impl only covers sized `R`, so the trait-object case is spelled out here.
impl<V> Rx for Box<&mut dyn RxConnectable<Message = V>> {
    fn sync(&mut self) -> SyncResult {
        let inner = DerefMut::deref_mut(self);
        inner.sync()
    }

    fn is_connected(&self) -> bool {
        let inner = Deref::deref(self);
        inner.is_connected()
    }

    fn len(&self) -> usize {
        let inner = Deref::deref(self);
        inner.len()
    }
}
/// [`Rx`] for a boxed mutable reference to an `RxMessageEndpoint<V>` trait object; every call is
/// forwarded to the trait object.
impl<V> Rx for Box<&mut dyn RxMessageEndpoint<V>> {
    fn sync(&mut self) -> SyncResult {
        let inner = DerefMut::deref_mut(self);
        inner.sync()
    }

    fn is_connected(&self) -> bool {
        let inner = Deref::deref(self);
        inner.is_connected()
    }

    fn len(&self) -> usize {
        let inner = Deref::deref(self);
        inner.len()
    }
}
/// A mutable reference to an [`RxConnectable`] forwards all connection calls to the referent.
impl<R: RxConnectable> RxConnectable for &mut R {
    type Message = R::Message;

    fn overflow_policy(&self) -> OverflowPolicy {
        <R as RxConnectable>::overflow_policy(&**self)
    }

    fn on_connect(&mut self) -> SharedBackStage<Self::Message> {
        <R as RxConnectable>::on_connect(&mut **self)
    }
}
/// A boxed [`RxConnectable`] (sized or not) forwards all connection calls to the boxed value.
impl<R: RxConnectable + ?Sized> RxConnectable for Box<R> {
    type Message = R::Message;

    fn overflow_policy(&self) -> OverflowPolicy {
        let inner = Deref::deref(self);
        inner.overflow_policy()
    }

    fn on_connect(&mut self) -> SharedBackStage<Self::Message> {
        let inner = DerefMut::deref_mut(self);
        inner.on_connect()
    }
}
/// [`RxConnectable`] for a boxed mutable reference to an `RxConnectable` trait object; the item
/// type is the trait object's `Message` type `V`.
impl<V> RxConnectable for Box<&mut dyn RxConnectable<Message = V>> {
    type Message = V;

    fn overflow_policy(&self) -> OverflowPolicy {
        let inner = Deref::deref(self);
        inner.overflow_policy()
    }

    fn on_connect(&mut self) -> SharedBackStage<Self::Message> {
        let inner = DerefMut::deref_mut(self);
        inner.on_connect()
    }
}
/// [`RxConnectable`] for a boxed mutable reference to an `RxMessageEndpoint<V>` trait object; the
/// item type is `Message<V>`.
impl<V> RxConnectable for Box<&mut dyn RxMessageEndpoint<V>> {
    type Message = Message<V>;

    fn overflow_policy(&self) -> OverflowPolicy {
        let inner = Deref::deref(self);
        inner.overflow_policy()
    }

    fn on_connect(&mut self) -> SharedBackStage<Self::Message> {
        let inner = DerefMut::deref_mut(self);
        inner.on_connect()
    }
}
/// Transmitting endpoint of a channel.
///
/// Every implementor must be [`Send`].
pub trait Tx: Send {
    /// Flushes the endpoint and reports the outcome as a [`FlushResult`].
    // NOTE(review): what "flush" transfers is defined by implementors outside this file.
    fn flush(&mut self) -> FlushResult;

    /// Returns `true` if the endpoint is connected.
    fn is_connected(&self) -> bool;

    /// Length of the endpoint; the single-endpoint `TxBundle` impl reports this value as the
    /// outbox message count.
    fn len(&self) -> usize;
}
/// A [`Tx`] endpoint which can take part in establishing a connection.
pub trait TxConnectable: Tx {
    /// Type of the items carried by the [`SharedBackStage`] handed to this endpoint.
    type Message;

    // NOTE(review): presumably `true` once no further connection can be added — confirm against
    // implementors.
    fn has_max_connection_count(&self) -> bool;

    /// The [`OverflowPolicy`] of this endpoint.
    fn overflow_policy(&self) -> OverflowPolicy;

    /// Connection hook: receives the [`SharedBackStage`] handle to use for the new connection.
    fn on_connect(&mut self, stage: SharedBackStage<Self::Message>);
}
/// Shorthand for a [`TxConnectable`] whose items are [`Message<V>`].
pub trait TxMessageEndpoint<V>: TxConnectable<Message = Message<V>> {}

/// Every sized `TxConnectable<Message = Message<V>>` is automatically a `TxMessageEndpoint<V>`.
impl<V, T> TxMessageEndpoint<V> for T where T: TxConnectable<Message = Message<V>> {}
/// A mutable reference to a [`Tx`] is itself a [`Tx`]; every call is forwarded to the referent.
impl<T: Tx> Tx for &mut T {
    fn flush(&mut self) -> FlushResult {
        <T as Tx>::flush(&mut **self)
    }

    fn is_connected(&self) -> bool {
        <T as Tx>::is_connected(&**self)
    }

    fn len(&self) -> usize {
        <T as Tx>::len(&**self)
    }
}
/// A boxed [`Tx`] (sized or not) is itself a [`Tx`]; every call is forwarded to the boxed value.
impl<T: Tx + ?Sized> Tx for Box<T> {
    fn flush(&mut self) -> FlushResult {
        let inner = DerefMut::deref_mut(self);
        inner.flush()
    }

    fn is_connected(&self) -> bool {
        let inner = Deref::deref(self);
        inner.is_connected()
    }

    fn len(&self) -> usize {
        let inner = Deref::deref(self);
        inner.len()
    }
}
/// [`Tx`] for a boxed mutable reference to a `TxConnectable` trait object.
///
/// The blanket `&mut T` impl only covers sized `T`, so the trait-object case is spelled out here.
impl<V> Tx for Box<&mut dyn TxConnectable<Message = V>> {
    fn flush(&mut self) -> FlushResult {
        let inner = DerefMut::deref_mut(self);
        inner.flush()
    }

    fn is_connected(&self) -> bool {
        let inner = Deref::deref(self);
        inner.is_connected()
    }

    fn len(&self) -> usize {
        let inner = Deref::deref(self);
        inner.len()
    }
}
/// [`Tx`] for a boxed mutable reference to a `TxMessageEndpoint<V>` trait object; every call is
/// forwarded to the trait object.
impl<V> Tx for Box<&mut dyn TxMessageEndpoint<V>> {
    fn flush(&mut self) -> FlushResult {
        let inner = DerefMut::deref_mut(self);
        inner.flush()
    }

    fn is_connected(&self) -> bool {
        let inner = Deref::deref(self);
        inner.is_connected()
    }

    fn len(&self) -> usize {
        let inner = Deref::deref(self);
        inner.len()
    }
}
/// A mutable reference to a [`TxConnectable`] forwards all connection calls to the referent.
impl<T: TxConnectable> TxConnectable for &mut T {
    type Message = T::Message;

    fn has_max_connection_count(&self) -> bool {
        <T as TxConnectable>::has_max_connection_count(&**self)
    }

    fn overflow_policy(&self) -> OverflowPolicy {
        <T as TxConnectable>::overflow_policy(&**self)
    }

    fn on_connect(&mut self, stage: SharedBackStage<Self::Message>) {
        <T as TxConnectable>::on_connect(&mut **self, stage)
    }
}
/// A boxed [`TxConnectable`] (sized or not) forwards all connection calls to the boxed value.
impl<T: TxConnectable + ?Sized> TxConnectable for Box<T> {
    type Message = T::Message;

    fn has_max_connection_count(&self) -> bool {
        let inner = Deref::deref(self);
        inner.has_max_connection_count()
    }

    fn overflow_policy(&self) -> OverflowPolicy {
        let inner = Deref::deref(self);
        inner.overflow_policy()
    }

    fn on_connect(&mut self, stage: SharedBackStage<Self::Message>) {
        let inner = DerefMut::deref_mut(self);
        inner.on_connect(stage)
    }
}
/// [`TxConnectable`] for a boxed mutable reference to a `TxConnectable` trait object; the item
/// type is the trait object's `Message` type `V`.
impl<V> TxConnectable for Box<&mut dyn TxConnectable<Message = V>> {
    type Message = V;

    fn has_max_connection_count(&self) -> bool {
        let inner = Deref::deref(self);
        inner.has_max_connection_count()
    }

    fn overflow_policy(&self) -> OverflowPolicy {
        let inner = Deref::deref(self);
        inner.overflow_policy()
    }

    fn on_connect(&mut self, stage: SharedBackStage<Self::Message>) {
        let inner = DerefMut::deref_mut(self);
        inner.on_connect(stage)
    }
}
/// [`TxConnectable`] for a boxed mutable reference to a `TxMessageEndpoint<V>` trait object; the
/// item type is `Message<V>`.
impl<V> TxConnectable for Box<&mut dyn TxMessageEndpoint<V>> {
    type Message = Message<V>;

    fn has_max_connection_count(&self) -> bool {
        let inner = Deref::deref(self);
        inner.has_max_connection_count()
    }

    fn overflow_policy(&self) -> OverflowPolicy {
        let inner = Deref::deref(self);
        inner.overflow_policy()
    }

    fn on_connect(&mut self, stage: SharedBackStage<Self::Message>) {
        let inner = DerefMut::deref_mut(self);
        inner.on_connect(stage)
    }
}
/// A fixed collection of receiving channels addressed by index.
pub trait RxBundle: Send {
    /// Number of channels in the bundle.
    fn channel_count(&self) -> usize;

    /// Name of the channel at `index`.
    fn name(&self, index: usize) -> &str;

    /// Number of messages in the inbox of the channel at `index`.
    fn inbox_message_count(&self, index: usize) -> usize;

    /// Syncs every channel, storing the outcome of channel `i` in slot `i` of the given slice.
    fn sync_all(&mut self, result: &mut [SyncResult]);

    /// Collects the connection state of every channel into a [`ConnectionCheck`].
    fn check_connection(&self) -> ConnectionCheck;

    /// Iterates over the channel names in index order.
    fn iter_names(&self) -> impl Iterator<Item = &str> {
        let count = self.channel_count();
        (0..count).map(move |index| self.name(index))
    }
}
/// A fixed collection of transmitting channels addressed by index.
pub trait TxBundle: Send {
    /// Number of channels in the bundle.
    fn channel_count(&self) -> usize;

    /// Name of the channel at `index`.
    fn name(&self, index: usize) -> &str;

    /// Number of messages in the outbox of the channel at `index`.
    fn outbox_message_count(&self, index: usize) -> usize;

    /// Flushes every channel, storing the outcome of channel `i` in `results[i]`.
    fn flush_all(&mut self, results: &mut [FlushResult]);

    /// Collects the connection state of every channel into a [`ConnectionCheck`].
    fn check_connection(&self) -> ConnectionCheck;

    /// Iterates over the channel names in index order.
    fn iter_names(&self) -> impl Iterator<Item = &str> {
        let count = self.channel_count();
        (0..count).map(move |index| self.name(index))
    }
}
/// An optional receiver: `Some` forwards to the wrapped endpoint, `None` behaves like an
/// unconnected, empty endpoint whose sync yields `SyncResult::ZERO`.
impl<R: Rx> Rx for Option<R> {
    fn is_connected(&self) -> bool {
        match self {
            Some(rx) => rx.is_connected(),
            None => false,
        }
    }

    fn sync(&mut self) -> SyncResult {
        match self {
            Some(rx) => rx.sync(),
            None => SyncResult::ZERO,
        }
    }

    fn len(&self) -> usize {
        match self {
            Some(rx) => rx.len(),
            None => 0,
        }
    }
}
/// The unit type is the empty receiver bundle: it has no channels, so the per-channel accessors
/// panic for every index.
impl RxBundle for () {
    fn channel_count(&self) -> usize {
        0
    }

    fn name(&self, _index: usize) -> &str {
        panic!("empty bundle")
    }

    fn inbox_message_count(&self, _index: usize) -> usize {
        panic!("empty bundle")
    }

    // Nothing to sync; the output slice is left untouched.
    fn sync_all(&mut self, _results: &mut [SyncResult]) {}

    fn check_connection(&self) -> ConnectionCheck {
        Default::default()
    }
}
impl<R: Rx> RxBundle for R {
fn channel_count(&self) -> usize {
1
}
fn name(&self, index: usize) -> &str {
assert_eq!(index, 0);
"in"
}
fn inbox_message_count(&self, index: usize) -> usize {
assert_eq!(index, 0);
<Self as Rx>::len(self)
}
fn sync_all(&mut self, results: &mut [SyncResult]) {
results[0] = self.sync();
}
fn check_connection(&self) -> ConnectionCheck {
let mut cc = ConnectionCheck::new(1);
cc.mark(0, self.is_connected());
cc
}
}
// Counts the token trees passed to it and expands to a `usize` sum expression
// (`1usize + 1usize + ... + 0usize`).
macro_rules! count {
    () => { 0usize };
    ($head:tt $($tail:tt)*) => { 1usize + count!($($tail)*) };
}
// Implements `RxBundle` for a tuple whose elements all implement `Rx`.
//
// Invoked with comma-separated `Type, index` pairs, where `index` is the literal position of
// the element in the tuple. Channel names are the stringified indices ("0", "1", ...).
// NOTE(review): `paste!` is presumably needed so that the `literal` fragment can be used as a
// match pattern and as a tuple field (`self.$i`) — confirm before removing it.
macro_rules! impl_rx_bundle_tuple {
    ( $( $ty: ident, $i: literal ),* ) => {
        impl<$($ty),*> RxBundle for ($($ty,)*) where $($ty: Rx,)* {
            fn channel_count(&self) -> usize {
                count!($($ty)*)
            }
            // Panics with "invalid index" for any index outside the tuple.
            fn name(&self, index: usize) -> &str {
                match index {
                    $(
                        paste!{$i} => stringify!{$i},
                    )*
                    _ => panic!("invalid index"),
                }
            }
            // Panics with "invalid index" for any index outside the tuple.
            fn inbox_message_count(&self, index: usize) -> usize {
                match index {
                    $(
                        paste!{$i} => paste!{self.$i}.len(),
                    )*
                    _ => panic!("invalid index"),
                }
            }
            // Syncs the elements in tuple order, writing each outcome to the slot with the
            // element's index. Slice indexing panics if `results` is shorter than the tuple.
            fn sync_all(&mut self, results: &mut [SyncResult]) {
                $(results[$i] = paste!{self.$i}.sync();)*
            }
            // Records the connection state of every element under its index.
            fn check_connection(&self) -> ConnectionCheck {
                let len = count!($($ty)*);
                let mut cc = ConnectionCheck::new(len);
                $(cc.mark($i, paste!{self.$i}.is_connected());)*
                cc
            }
        }
    };
}
// `RxBundle` for tuples of one through eight receivers.
impl_rx_bundle_tuple!(A, 0);
impl_rx_bundle_tuple!(A, 0, B, 1);
impl_rx_bundle_tuple!(A, 0, B, 1, C, 2);
impl_rx_bundle_tuple!(A, 0, B, 1, C, 2, D, 3);
impl_rx_bundle_tuple!(A, 0, B, 1, C, 2, D, 3, E, 4);
impl_rx_bundle_tuple!(A, 0, B, 1, C, 2, D, 3, E, 4, F, 5);
impl_rx_bundle_tuple!(A, 0, B, 1, C, 2, D, 3, E, 4, F, 5, G, 6);
impl_rx_bundle_tuple!(A, 0, B, 1, C, 2, D, 3, E, 4, F, 5, G, 6, H, 7);
/// An optional transmitter: `Some` forwards to the wrapped endpoint, `None` behaves like an
/// unconnected, empty endpoint whose flush yields `FlushResult::ZERO`.
impl<T: Tx> Tx for Option<T> {
    fn flush(&mut self) -> FlushResult {
        match self {
            Some(tx) => tx.flush(),
            None => FlushResult::ZERO,
        }
    }

    fn is_connected(&self) -> bool {
        match self {
            Some(tx) => tx.is_connected(),
            None => false,
        }
    }

    fn len(&self) -> usize {
        match self {
            Some(tx) => tx.len(),
            None => 0,
        }
    }
}
/// The unit type is the empty transmitter bundle: it has no channels, so the per-channel
/// accessors panic for every index.
impl TxBundle for () {
    fn channel_count(&self) -> usize {
        0
    }

    fn name(&self, _index: usize) -> &str {
        panic!("empty bundle")
    }

    fn outbox_message_count(&self, _index: usize) -> usize {
        panic!("empty bundle")
    }

    // Nothing to flush; the output slice is left untouched.
    fn flush_all(&mut self, _: &mut [FlushResult]) {}

    fn check_connection(&self) -> ConnectionCheck {
        Default::default()
    }
}
impl<T: Tx> TxBundle for T {
fn channel_count(&self) -> usize {
1
}
fn name(&self, index: usize) -> &str {
assert_eq!(index, 0);
"out"
}
fn outbox_message_count(&self, index: usize) -> usize {
assert_eq!(index, 0);
<Self as Tx>::len(self)
}
fn flush_all(&mut self, result: &mut [FlushResult]) {
result[0] = self.flush();
}
fn check_connection(&self) -> ConnectionCheck {
let mut cc = ConnectionCheck::new(1);
cc.mark(0, self.is_connected());
cc
}
}
// Implements `TxBundle` for a tuple whose elements all implement `Tx`.
//
// Invoked with comma-separated `Type, index` pairs, where `index` is the literal position of
// the element in the tuple. Channel names are the stringified indices ("0", "1", ...).
// NOTE(review): `paste!` is presumably needed so that the `literal` fragment can be used as a
// match pattern and as a tuple field (`self.$i`) — confirm before removing it.
macro_rules! impl_tx_bundle_tuple {
    ( $( $ty: ident, $i: literal ),* ) => {
        impl<$($ty),*> TxBundle for ($($ty,)*) where $($ty: Tx,)* {
            fn channel_count(&self) -> usize {
                count!($($ty)*)
            }
            // Panics with "invalid index" for any index outside the tuple.
            fn name(&self, index: usize) -> &str {
                match index {
                    $(
                        paste!{$i} => stringify!{$i},
                    )*
                    _ => panic!("invalid index"),
                }
            }
            // Panics with "invalid index" for any index outside the tuple.
            fn outbox_message_count(&self, index: usize) -> usize {
                match index {
                    $(
                        paste!{$i} => paste!{self.$i}.len(),
                    )*
                    _ => panic!("invalid index"),
                }
            }
            // Flushes the elements in tuple order, writing each outcome to the slot with the
            // element's index. Slice indexing panics if `results` is shorter than the tuple.
            fn flush_all(&mut self, results: &mut [FlushResult]) {
                $(results[$i] = paste!{self.$i}.flush();)*
            }
            // Records the connection state of every element under its index.
            fn check_connection(&self) -> ConnectionCheck {
                let len = count!($($ty)*);
                let mut cc = ConnectionCheck::new(len);
                $(cc.mark($i, paste!{self.$i}.is_connected());)*
                cc
            }
        }
    };
}
// `TxBundle` for tuples of one through eight transmitters.
impl_tx_bundle_tuple!(A, 0);
impl_tx_bundle_tuple!(A, 0, B, 1);
impl_tx_bundle_tuple!(A, 0, B, 1, C, 2);
impl_tx_bundle_tuple!(A, 0, B, 1, C, 2, D, 3);
impl_tx_bundle_tuple!(A, 0, B, 1, C, 2, D, 3, E, 4);
impl_tx_bundle_tuple!(A, 0, B, 1, C, 2, D, 3, E, 4, F, 5);
impl_tx_bundle_tuple!(A, 0, B, 1, C, 2, D, 3, E, 4, F, 5, G, 6);
impl_tx_bundle_tuple!(A, 0, B, 1, C, 2, D, 3, E, 4, F, 5, G, 6, H, 7);
/// Connection state of every channel in a bundle.
///
/// Field `0` is the number of channels; field `1` is a bitmask in which bit `i` is set when
/// channel `i` has been marked as connected. The default value describes an empty bundle.
#[derive(Debug, Default)]
pub struct ConnectionCheck(u8, u64);

impl ConnectionCheck {
    /// Number of channels the `u64` bitmask can represent.
    const MASK_BITS: usize = u64::BITS as usize;

    /// Creates a check for `len` channels, all initially unconnected.
    ///
    /// # Panics
    ///
    /// Panics if `len` exceeds `MAX_RECEIVER_COUNT` or the 64 channels the bitmask can hold.
    pub fn new(len: usize) -> Self {
        // The flags are stored in a `u64`, so the length must be bounded by the mask width as
        // well as by `MAX_RECEIVER_COUNT`. Otherwise the cast below would truncate silently and
        // the shifts in `mark`/`is_connected` would overflow.
        assert!(
            len <= MAX_RECEIVER_COUNT.min(Self::MASK_BITS),
            "too many connections: len={len}"
        );
        // Lossless: `len <= 64` was checked above.
        Self(len as u8, 0)
    }

    /// Records whether the channel at `index` is connected.
    ///
    /// # Panics
    ///
    /// Panics if `index` is not smaller than the channel count.
    pub fn mark(&mut self, index: usize, is_connected: bool) {
        let bit = self.bit(index);
        if is_connected {
            self.1 |= bit;
        } else {
            self.1 &= !bit;
        }
    }

    /// Returns whether the channel at `index` was marked as connected.
    ///
    /// # Panics
    ///
    /// Panics if `index` is not smaller than the channel count.
    pub fn is_connected(&self, index: usize) -> bool {
        self.1 & self.bit(index) != 0
    }

    /// Returns `true` if every channel is marked as connected (vacuously true when empty).
    pub fn is_fully_connected(&self) -> bool {
        (0..usize::from(self.0)).all(|index| self.is_connected(index))
    }

    /// Indices of all channels which are not marked as connected, in ascending order.
    pub fn list_unconnected(&self) -> Vec<usize> {
        (0..usize::from(self.0))
            .filter(|&index| !self.is_connected(index))
            .collect()
    }

    /// Bounds-checks `index` and returns the mask with only that channel's bit set.
    fn bit(&self, index: usize) -> u64 {
        assert!(
            index < usize::from(self.0),
            "invalid channel index: len={}, index={}",
            self.0,
            index
        );
        1u64 << index
    }
}