use crate::{Rewrite, RewritePlan};
use crate::actor::Id;
use crate::util::{HashableHashMap, HashableHashSet};
use std::collections::{BTreeMap, VecDeque};
use std::collections::btree_map::Entry;
use std::collections::{btree_map, hash_set, hash_map};
use std::hash::Hash;
use std::str::FromStr;
/// A message together with the ids of its source (`src`) and destination (`dst`).
#[derive(Clone, Copy, Debug, Eq, Hash, Ord, PartialEq, PartialOrd, serde::Serialize)]
pub struct Envelope<Msg> {
    /// Id the message is tagged as coming from.
    pub src: Id,
    /// Id the message is tagged as going to.
    pub dst: Id,
    /// The payload itself.
    pub msg: Msg,
}
impl<Msg> Envelope<&Msg> {
    /// Converts an envelope that borrows its message into one that owns a clone of it.
    ///
    /// `src` and `dst` are copied unchanged.
    pub fn to_cloned_msg(&self) -> Envelope<Msg>
    where Msg: Clone,
    {
        let owned_msg = Msg::clone(self.msg);
        Envelope { src: self.src, dst: self.dst, msg: owned_msg }
    }
}
/// The set of in-flight messages, stored in one of three representations.
#[derive(Clone, Debug, Eq, Hash, Ord, PartialEq, PartialOrd, serde::Serialize)]
pub enum Network<Msg>
where Msg: Eq + Hash
{
    /// A set of envelopes. Delivering an envelope does not remove it from the set, so it
    /// remains available afterwards; only dropping removes it.
    UnorderedDuplicating(HashableHashSet<Envelope<Msg>>),

    /// A multiset of envelopes: each envelope maps to the number of copies in flight.
    /// Delivering or dropping removes exactly one copy.
    UnorderedNonDuplicating(HashableHashMap<Envelope<Msg>, usize>),

    /// One queue of messages per `(src, dst)` pair. Sends append to the back of the queue.
    Ordered(BTreeMap<(Id, Id), VecDeque<Msg>>),
}
impl<Msg> Network<Msg>
where Msg: Eq + Hash,
{
pub fn new_ordered(envelopes: impl IntoIterator<Item = Envelope<Msg>>) -> Self {
let mut this = Self::Ordered(BTreeMap::new());
for env in envelopes {
this.send(env);
}
this
}
pub fn new_unordered_duplicating(envelopes: impl IntoIterator<Item = Envelope<Msg>>) -> Self {
let mut this = Self::UnorderedDuplicating(
HashableHashSet::with_hasher(
crate::stable::build_hasher()));
for env in envelopes {
this.send(env);
}
this
}
pub fn new_unordered_nonduplicating(envelopes: impl IntoIterator<Item = Envelope<Msg>>) -> Self {
let mut this = Self::UnorderedNonDuplicating(
HashableHashMap::with_hasher(
crate::stable::build_hasher()));
for env in envelopes {
this.send(env);
}
this
}
pub fn names() -> Vec<&'static str> {
struct IterStr<Msg: Eq + Hash>(Option<Network<Msg>>);
impl<Msg: Eq + Hash> Iterator for IterStr<Msg> {
type Item = &'static str;
fn next(&mut self) -> Option<Self::Item> {
if let Some(network) = &self.0 {
match network {
Network::Ordered(_) => {
self.0 = Some(Network::UnorderedDuplicating(Default::default()));
Some("ordered")
}
Network::UnorderedDuplicating(_) => {
self.0 = Some(Network::UnorderedNonDuplicating(Default::default()));
Some("unordered_duplicating")
}
Network::UnorderedNonDuplicating(_) => {
self.0 = None;
Some("unordered_nonduplicating")
}
}
} else {
None
}
}
}
IterStr::<Msg>(Some(Network::Ordered(Default::default()))).collect()
}
pub fn iter_all(&self) -> NetworkIter<Msg> {
match self {
Network::UnorderedDuplicating(set) => {
NetworkIter::UnorderedDuplicating(set.iter())
}
Network::UnorderedNonDuplicating(multiset) => {
NetworkIter::UnorderedNonDuplicating(None, multiset.iter())
}
Network::Ordered(map) => {
NetworkIter::Ordered(None, map.iter())
}
}
}
pub fn iter_deliverable(&self) -> NetworkDeliverableIter<Msg> {
match self {
Network::UnorderedDuplicating(set) => {
NetworkDeliverableIter::UnorderedDuplicating(set.iter())
}
Network::UnorderedNonDuplicating(multiset) => {
NetworkDeliverableIter::UnorderedNonDuplicating(multiset.keys())
}
Network::Ordered(map) => {
NetworkDeliverableIter::Ordered(map.iter())
}
}
}
#[allow(clippy::len_without_is_empty)]
pub fn len(&self) -> usize {
match self {
Network::UnorderedDuplicating(set) => set.len(),
Network::UnorderedNonDuplicating(multiset) => multiset.values().sum(),
Network::Ordered(map) => map.values().map(VecDeque::len).sum(),
}
}
pub(crate) fn send(&mut self, envelope: Envelope<Msg>) {
match self {
Network::UnorderedDuplicating(set) => {
set.insert(envelope);
}
Network::UnorderedNonDuplicating(multiset) => {
*multiset.entry(envelope).or_insert(0) += 1;
}
Network::Ordered(map) => {
map.entry((envelope.src, envelope.dst))
.or_insert_with(|| VecDeque::with_capacity(1))
.push_back(envelope.msg);
}
}
}
pub(crate) fn on_deliver(&mut self, envelope: Envelope<Msg>)
where Msg: PartialEq,
{
match self {
Network::UnorderedDuplicating(_) => {
}
Network::UnorderedNonDuplicating(multiset) => {
match multiset.entry(envelope) {
hash_map::Entry::Occupied(mut entry) => {
let value = *entry.get();
assert!(value > 0);
if value == 1 { entry.remove(); }
else { *entry.get_mut() -= 1; }
}
hash_map::Entry::Vacant(_) => {
panic!("envelope not found");
}
}
}
Network::Ordered(map) => {
let flow_entry = match map.entry((envelope.src, envelope.dst)) {
Entry::Vacant(_) => panic!("flow not found. src={:?}, dst={:?}", envelope.src, envelope.dst),
Entry::Occupied(flow) => flow,
};
let i = flow_entry.get().iter().position(|x| x == &envelope.msg).expect("message not found");
if flow_entry.get().len() > 1 {
flow_entry.into_mut().remove(i);
} else {
flow_entry.remove();
}
}
}
}
pub(crate) fn on_drop(&mut self, envelope: Envelope<Msg>)
where Msg: PartialEq,
{
match self {
Network::UnorderedDuplicating(set) => {
set.remove(&envelope);
}
Network::UnorderedNonDuplicating(multiset) => {
match multiset.entry(envelope) {
hash_map::Entry::Occupied(mut entry) => {
let value = *entry.get();
assert!(value > 0);
if value == 1 { entry.remove(); }
else { *entry.get_mut() -= 1; }
}
hash_map::Entry::Vacant(_) => {
panic!("envelope not found");
}
}
}
Network::Ordered(map) => {
let flow_entry = match map.entry((envelope.src, envelope.dst)) {
Entry::Vacant(_) => panic!("flow not found. src={:?}, dst={:?}", envelope.src, envelope.dst),
Entry::Occupied(flow) => flow,
};
let i = flow_entry.get().iter().position(|x| x == &envelope.msg).expect("message not found");
if flow_entry.get().len() > 1 {
flow_entry.into_mut().remove(i);
} else {
flow_entry.remove();
}
}
}
}
}
impl<Msg> FromStr for Network<Msg>
where Msg: Eq + Hash,
{
    type Err = String;

    /// Parses a network name into an empty network of the corresponding variant.
    ///
    /// Accepts `"ordered"`, `"unordered_duplicating"`, and `"unordered_nonduplicating"`;
    /// any other input yields an error message that includes the rejected string.
    fn from_str(s: &str) -> Result<Self, Self::Err> {
        let network = match s {
            "ordered" => Self::new_ordered([]),
            "unordered_duplicating" => Self::new_unordered_duplicating([]),
            "unordered_nonduplicating" => Self::new_unordered_nonduplicating([]),
            _ => return Err(format!("unable to parse network name: {}", s)),
        };
        Ok(network)
    }
}
impl<Msg> Rewrite<Id> for Network<Msg>
where Msg: Eq + Hash + Rewrite<Id>,
{
    /// Rewrites the network by delegating to the `rewrite` implementation of the inner
    /// collection and re-wrapping the result in the same variant.
    fn rewrite<S>(&self, plan: &RewritePlan<Id,S>) -> Self {
        match self {
            Self::UnorderedDuplicating(envelopes) => {
                let rewritten = envelopes.rewrite(plan);
                Self::UnorderedDuplicating(rewritten)
            }
            Self::UnorderedNonDuplicating(counts) => {
                let rewritten = counts.rewrite(plan);
                Self::UnorderedNonDuplicating(rewritten)
            }
            Self::Ordered(flows) => {
                let rewritten = flows.rewrite(plan);
                Self::Ordered(rewritten)
            }
        }
    }
}
pub enum NetworkIter<'a, Msg> {
UnorderedDuplicating(hash_set::Iter<'a, Envelope<Msg>>),
UnorderedNonDuplicating(
Option<(Envelope<&'a Msg>, usize)>,
std::collections::hash_map::Iter<'a, Envelope<Msg>, usize>),
Ordered(
Option<(Id, Id, &'a VecDeque<Msg>, usize)>,
btree_map::Iter<'a, (Id, Id), VecDeque<Msg>>),
}
impl<'a, Msg> Iterator for NetworkIter<'a, Msg> {
    type Item = Envelope<&'a Msg>;

    /// Yields the next envelope, borrowing its message from the network.
    ///
    /// * `UnorderedDuplicating`: each envelope in the set is yielded once.
    /// * `UnorderedNonDuplicating`: each envelope is yielded as many times as its stored
    ///   count, so the total number of items matches the sum of the counts.
    /// * `Ordered`: every queued message of every `(src, dst)` flow is yielded, front to
    ///   back within a flow.
    ///
    /// # Panics
    ///
    /// Panics if the ordered map contains an empty queue.
    fn next(&mut self) -> Option<Self::Item> {
        match self {
            NetworkIter::UnorderedDuplicating(it) => {
                it.next().map(|env| Envelope {
                    src: env.src,
                    dst: env.dst,
                    msg: &env.msg,
                })
            }
            NetworkIter::UnorderedNonDuplicating(active, it) => {
                // `active` holds an envelope plus the number of copies still to be yielded.
                if let Some((env, remaining)) = active {
                    // Copy the envelope out so `active` can be cleared below.
                    let env = *env;
                    *remaining -= 1;
                    if *remaining == 0 {
                        *active = None;
                    }
                    return Some(env);
                }
                it.next().map(|(env, count)| {
                    let env = Envelope {
                        src: env.src,
                        dst: env.dst,
                        msg: &env.msg,
                    };
                    // One copy is returned right now, so only `count - 1` copies remain.
                    // Seeding with `count` would yield the envelope one time too many.
                    if *count > 1 {
                        *active = Some((env, *count - 1));
                    }
                    env
                })
            }
            NetworkIter::Ordered(active, it) => {
                // `active` holds a flow plus the index of its next unread message.
                if let Some((src, dst, messages, index)) = active {
                    let msg = messages
                        .get(*index)
                        .expect("active flow index is within the queue");
                    let env = Envelope { src: *src, dst: *dst, msg };
                    // Advance within the flow, and move on to the next flow once this one
                    // is exhausted. Without this the iterator would never terminate.
                    *index += 1;
                    if *index == messages.len() {
                        *active = None;
                    }
                    return Some(env);
                }
                it.next().map(|(&(src, dst), messages)| {
                    let msg = messages.get(0).expect("empty channel");
                    // The head is returned now; remember the flow only if more remain.
                    if messages.len() > 1 {
                        *active = Some((src, dst, messages, 1));
                    }
                    Envelope { src, dst, msg }
                })
            }
        }
    }
}
/// Iterator state for walking the deliverable envelopes of a `Network`, with one variant
/// per network representation. `Network::iter_deliverable` constructs it.
pub enum NetworkDeliverableIter<'a, Msg> {
/// Iterates the envelope set.
UnorderedDuplicating(hash_set::Iter<'a, Envelope<Msg>>),
/// Iterates the keys of the envelope multiset, ignoring the per-envelope counts.
UnorderedNonDuplicating(hash_map::Keys<'a, Envelope<Msg>, usize>),
/// Iterates the `(src, dst)` flows of the ordered map.
Ordered(btree_map::Iter<'a, (Id, Id), VecDeque<Msg>>),
}
impl<'a, Msg> Iterator for NetworkDeliverableIter<'a, Msg> {
    type Item = Envelope<&'a Msg>;

    /// Yields the next deliverable envelope, borrowing its message from the network.
    ///
    /// The unordered variants yield each distinct envelope once. The ordered variant yields
    /// only the front message of each `(src, dst)` queue.
    ///
    /// # Panics
    ///
    /// Panics with "empty channel" if the ordered map contains an empty queue.
    fn next(&mut self) -> Option<Self::Item> {
        /// Re-wraps a borrowed envelope as an envelope of a borrowed message.
        fn by_ref<M>(env: &Envelope<M>) -> Envelope<&M> {
            Envelope { src: env.src, dst: env.dst, msg: &env.msg }
        }

        match self {
            Self::UnorderedDuplicating(envelopes) => envelopes.next().map(by_ref),
            Self::UnorderedNonDuplicating(envelopes) => envelopes.next().map(by_ref),
            Self::Ordered(flows) => {
                let (&(src, dst), queue) = flows.next()?;
                let head = queue.get(0).expect("empty channel");
                Some(Envelope { src, dst, msg: head })
            }
        }
    }
}
#[cfg(test)]
mod test {
    use super::*;
    use std::collections::BTreeSet;

    /// Every name reported by `Network::names` must parse, and the parsed networks must be
    /// exactly the three empty network variants.
    #[test]
    fn can_enumerate_and_parse_names() {
        let parsed: BTreeSet<Network<()>> = Network::<()>::names()
            .into_iter()
            .map(|name| Network::<()>::from_str(name).unwrap())
            .collect();
        let expected: BTreeSet<Network<()>> = vec![
            Network::new_ordered([]),
            Network::new_unordered_duplicating([]),
            Network::new_unordered_nonduplicating([]),
        ]
        .into_iter()
        .collect();
        assert_eq!(parsed, expected);
    }
}