use std;
use strum;
use vec_map;
use crate::{session, Message};
pub mod backend;
/// A channel: its definition together with the boxed sourcepoint and endpoint
/// trait objects that belong to it.
pub struct Channel <CTX : session::Context> {
/// The definition describing this channel.
pub def : Def <CTX>,
/// Sending sides of the channel, stored as boxed `Sourcepoint` trait objects.
// NOTE(review): the `VecMap` keys are presumably process ids converted to
// `usize` -- confirm against the `backend` module.
pub sourcepoints : vec_map::VecMap <Box <dyn Sourcepoint <CTX>>>,
/// Receiving sides of the channel, stored as boxed `Endpoint` trait objects.
// NOTE(review): keys presumably follow the same scheme as `sourcepoints` --
// confirm against the `backend` module.
pub endpoints : vec_map::VecMap <Box <dyn Endpoint <CTX>>>
}
/// Channel definition: identifier, kind, participating processes and the id of
/// the message type.
///
/// All fields are private; `Def::define` is the constructor in this file and
/// it runs role validation before returning a definition.
#[derive(Clone, Debug, Eq, PartialEq)]
pub struct Def <CTX : session::Context> {
/// Identifier of the channel.
id : CTX::CID,
/// Kind of the channel; determines how many producers/consumers are allowed.
kind : Kind,
/// Ids of the processes listed as producers.
producers : Vec <CTX::PID>,
/// Ids of the processes listed as consumers.
consumers : Vec <CTX::PID>,
/// Message type id, obtained from `id.message_type_id()` in `Def::define`.
message_type_id : CTX::MID
}
/// Error type returned by `Endpoint::recv`.
// NOTE(review): presumably signals that no more messages can arrive because
// the sending side is gone (as with `std::sync::mpsc::RecvError`) -- confirm
// against the backend implementations.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub struct RecvError;
/// Error type returned by `Sourcepoint::send` and `Sourcepoint::send_to`,
/// wrapping a value of type `M`.
///
/// `Debug` is not derived here; it is implemented by hand further down in this
/// file without requiring `M : Debug`.
// NOTE(review): the wrapped value is presumably the message that could not be
// sent -- confirm against the backend implementations.
#[derive(Clone, Copy, Eq, PartialEq)]
pub struct SendError <M> (pub M);
/// The kind of a channel.
///
/// The kind determines which role counts `Kind::validate_roles` rejects.
#[derive(Clone, Debug, Eq, PartialEq)]
pub enum Kind {
/// More than one producer and more than one consumer are both rejected.
Simplex,
/// More than one consumer is rejected; the number of producers is not
/// limited by the kind.
Sink,
/// More than one producer is rejected; the number of consumers is not
/// limited by the kind.
Source
}
/// Problems reported when validating the roles of a channel definition.
///
/// `Def::define` returns a `Vec` of these, so several can be reported at once.
#[derive(Clone, Debug, Eq, PartialEq)]
pub enum DefineError {
/// Some process id appears both as a producer and as a consumer.
ProducerEqConsumer,
/// The producer list contains the same process id more than once.
DuplicateProducer,
/// The consumer list contains the same process id more than once.
DuplicateConsumer,
/// More than one producer was given for a `Simplex` or `Source` channel.
MultipleProducers,
/// More than one consumer was given for a `Simplex` or `Sink` channel.
MultipleConsumers,
/// The producer list is empty.
ZeroProducers,
/// The consumer list is empty.
ZeroConsumers
}
/// Error describing a failed channel creation.
// NOTE(review): not constructed anywhere in this file; presumably the error
// type of the `TryFrom <Def>` conversions in `backend` -- confirm.
#[derive(Clone, Debug, Eq, PartialEq)]
pub enum CreateError {
// NOTE(review): presumably means the kind of the definition does not match
// the backend type being created -- confirm against `backend`.
KindMismatch
}
/// Error type returned by `Endpoint::try_recv`.
// NOTE(review): the variants presumably mirror
// `std::sync::mpsc::TryRecvError` -- confirm against the backend
// implementations.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum TryRecvError {
// presumably: no message was available at the time of the call
Empty,
// presumably: the sending side is gone and no further messages will arrive
Disconnected
}
/// Integer type that channel ids must be convertible from: the `Id` trait
/// requires `TryFrom <IdReprType>`.
pub type IdReprType = u16;
/// Trait for the channel identifier type of a session context.
///
/// The `where` clause ties the implementor to its context: `CTX::CID` must be
/// `Self`. Implementors must also be cloneable, totally ordered, convertible
/// into `usize`, fallibly constructible from `IdReprType`, debuggable, and
/// iterable through `strum::IntoEnumIterator`.
pub trait Id <CTX> : Clone + Ord + Into <usize> + TryFrom <IdReprType> +
std::fmt::Debug + strum::IntoEnumIterator
where
CTX : session::Context <CID=Self>
{
/// Returns the channel definition for this id.
fn def (&self) -> Def <CTX>;
/// Returns the id of the message type for this channel id; `Def::define`
/// stores this value in the definition it builds.
fn message_type_id (&self) -> CTX::MID;
/// Creates a `Channel` from the given definition.
fn create (_ : Def <CTX>) -> Channel <CTX>;
}
/// Sending side of a channel. Implementors must be `Send`.
pub trait Sourcepoint <CTX : session::Context> : Send {
/// Sends `message` on the channel.
///
/// On failure the returned `SendError` wraps a `CTX::GMSG` value.
// NOTE(review): the wrapped value is presumably the unsent message -- confirm.
fn send (&self, message : CTX::GMSG) -> Result <(), SendError <CTX::GMSG>>;
/// Sends `message` on the channel, naming `recipient` as the target process.
///
/// On failure the returned `SendError` wraps a `CTX::GMSG` value.
// NOTE(review): how `recipient` is interpreted for each channel kind is
// decided by the implementor -- see the `backend` module.
fn send_to (&self, message : CTX::GMSG, recipient : CTX::PID)
-> Result <(), SendError <CTX::GMSG>>;
}
/// Receiving side of a channel. Implementors must be `Send`.
pub trait Endpoint <CTX : session::Context> : Send {
/// Receives a message, or fails with `RecvError`.
// NOTE(review): presumably blocks until a message is available, in contrast
// to `try_recv` -- confirm against the backend implementations.
fn recv (&self) -> Result <CTX::GMSG, RecvError>;
/// Attempts to receive a message, or fails with a `TryRecvError`.
// NOTE(review): presumably returns immediately instead of waiting -- confirm
// against the backend implementations.
fn try_recv (&self) -> Result <CTX::GMSG, TryRecvError>;
}
/// Construction, accessors and role validation for channel definitions.
impl <CTX : session::Context> Def <CTX> {
  /// Builds a channel definition from its parts and validates the roles.
  ///
  /// The message type id is taken from `id.message_type_id()`.
  ///
  /// # Errors
  ///
  /// Returns every `DefineError` detected by role validation; the definition
  /// is only returned when no error was found.
  pub fn define (
    id        : CTX::CID,
    kind      : Kind,
    producers : Vec <CTX::PID>,
    consumers : Vec <CTX::PID>
  ) -> Result <Self, Vec <DefineError>> {
    // field expressions are evaluated in written order, so the message type id
    // is read from `id` before `id` itself is moved into the struct
    let candidate = Def {
      message_type_id: id.message_type_id(),
      id,
      kind,
      producers,
      consumers
    };
    candidate.validate_roles().map (|()| candidate)
  }

  /// The channel id.
  pub const fn id (&self) -> &CTX::CID {
    &self.id
  }

  /// The channel kind.
  pub const fn kind (&self) -> &Kind {
    &self.kind
  }

  /// The process ids listed as producers.
  pub const fn producers (&self) -> &Vec <CTX::PID> {
    &self.producers
  }

  /// The process ids listed as consumers.
  pub const fn consumers (&self) -> &Vec <CTX::PID> {
    &self.consumers
  }

  /// Consumes the definition and builds a `Channel` using the backend type
  /// that corresponds to the definition's kind, for message type `M`.
  ///
  /// # Panics
  ///
  /// Panics if the `try_from` conversion into the backend type fails.
  pub fn to_channel <M> (self) -> Channel <CTX> where
    CTX : 'static,
    M   : Message <CTX> + 'static
  {
    match self.kind {
      Kind::Simplex => {
        backend::Simplex::<CTX, M>::try_from (self).unwrap().into()
      }
      Kind::Sink => {
        backend::Sink::<CTX, M>::try_from (self).unwrap().into()
      }
      Kind::Source => {
        backend::Source::<CTX, M>::try_from (self).unwrap().into()
      }
    }
  }

  /// Checks the producer and consumer lists and collects every problem found.
  ///
  /// Errors are reported in this order: empty lists, duplicates within a list,
  /// ids shared between the two lists, then the kind-specific count limits.
  fn validate_roles (&self) -> Result <(), Vec <DefineError>> {
    // sorted copy of a role list with repeated ids collapsed
    let sorted_unique = |ids : &[CTX::PID]| -> Vec <CTX::PID> {
      let mut unique = ids.to_vec();
      unique.sort();
      unique.dedup();
      unique
    };
    let unique_producers = sorted_unique (&self.producers);
    let unique_consumers = sorted_unique (&self.consumers);

    // union of both de-duplicated lists; it is shorter than the sum of their
    // lengths exactly when some id occurs in both
    let mut union : Vec <CTX::PID> = unique_producers.iter()
      .chain (unique_consumers.iter())
      .cloned()
      .collect();
    union.sort();
    union.dedup();

    let mut errors = Vec::new();
    if self.producers.is_empty() {
      errors.push (DefineError::ZeroProducers);
    }
    if self.consumers.is_empty() {
      errors.push (DefineError::ZeroConsumers);
    }
    if unique_producers.len() < self.producers.len() {
      errors.push (DefineError::DuplicateProducer);
    }
    if unique_consumers.len() < self.consumers.len() {
      errors.push (DefineError::DuplicateConsumer);
    }
    if union.len() < unique_producers.len() + unique_consumers.len() {
      errors.push (DefineError::ProducerEqConsumer);
    }
    if let Err (kind_errors)
      = self.kind.validate_roles::<CTX> (&self.producers, &self.consumers)
    {
      errors.extend (kind_errors);
    }

    if errors.is_empty() {
      Ok (())
    } else {
      Err (errors)
    }
  }
}
impl Kind {
  /// Checks the producer and consumer counts against the limits of this kind.
  ///
  /// * `Simplex`: at most one producer and at most one consumer
  /// * `Sink`: at most one consumer
  /// * `Source`: at most one producer
  ///
  /// # Errors
  ///
  /// Returns `MultipleProducers` and/or `MultipleConsumers` (in that order)
  /// for every limit that is exceeded.
  fn validate_roles <CTX : session::Context> (&self,
    producers : &[CTX::PID], consumers : &[CTX::PID]
  ) -> Result <(), Vec <DefineError>> {
    // which of the two roles is limited to a single process for this kind
    let (single_producer, single_consumer) = match self {
      Kind::Simplex => (true,  true),
      Kind::Sink    => (false, true),
      Kind::Source  => (true,  false)
    };

    let mut errors = Vec::new();
    if single_producer && producers.len() > 1 {
      errors.push (DefineError::MultipleProducers);
    }
    if single_consumer && consumers.len() > 1 {
      errors.push (DefineError::MultipleConsumers);
    }

    if errors.is_empty() {
      Ok (())
    } else {
      Err (errors)
    }
  }
}
/// Manual `Debug` implementation that does not require `T : Debug`; the
/// wrapped value is never printed.
impl <T> std::fmt::Debug for SendError <T> {
  /// Writes the fixed text `SendError(..)`.
  fn fmt (&self, f : &mut std::fmt::Formatter) -> std::fmt::Result {
    // Write the text directly: calling `.fmt (f)` on the string literal from
    // inside this impl resolves to `<str as Debug>::fmt`, which would wrap the
    // output in double quotes.
    f.write_str ("SendError(..)")
  }
}
/// User-facing message for a failed send; the wrapped value is not printed.
impl <T> std::fmt::Display for SendError <T> {
  /// Writes `sending on a closed channel`, honoring width, alignment and
  /// precision flags of the formatter.
  fn fmt (&self, f : &mut std::fmt::Formatter) -> std::fmt::Result {
    f.pad ("sending on a closed channel")
  }
}
/// Makes `SendError` usable as a standard error type for any payload type `T`.
// NOTE(review): `description` and `cause` are deprecated in the standard
// library in favor of `Display` and `source`; they are kept here so that
// callers of these methods keep getting the same values.
impl <T> std::error::Error for SendError <T> {
/// Returns the same fixed text that the `Display` implementation writes.
fn description (&self) -> &'static str {
"sending on a closed channel"
}
/// Always `None`: this error does not wrap another error.
fn cause (&self) -> Option <&dyn std::error::Error> {
None
}
}
/// Prints the in-memory sizes (in bytes) of `Def <CTX>` and `Channel <CTX>` to
/// standard output, framed by a header and a footer line.
pub fn report_sizes <CTX : session::Context> () {
  let def_size     = size_of::<Def <CTX>>();
  let channel_size = size_of::<Channel <CTX>>();
  println!("channel report sizes...");
  println!(" size of channel::Def: {}", def_size);
  println!(" size of Channel: {}", channel_size);
  println!("...channel report sizes");
}