use std::cell::RefCell;
use std::fmt::{self, Debug};
use std::rc::Rc;
use crate::dataflow::channels::Message;
use crate::communication::Push;
use crate::Container;
use push_set::{PushSet, PushOne, PushMany};
mod push_set {
use crate::communication::Push;
pub trait PushSet<T> : Push<T> {
fn as_list(&mut self) -> Option<&mut Vec<Box<dyn Push<T>>>>;
}
pub struct PushOne<P> { inner: P }
impl<T, P: Push<T>> Push<T> for PushOne<P> {
fn push(&mut self, item: &mut Option<T>) { self.inner.push(item) }
}
impl<T: 'static, P: Push<T> + 'static> PushSet<T> for PushOne<P> {
fn as_list(&mut self) -> Option<&mut Vec<Box<dyn Push<T>>>> { None }
}
impl<P> From<P> for PushOne<P> { fn from(inner: P) -> Self { Self { inner } } }
pub struct PushMany<T> {
buffer: Option<T>,
list: Vec<Box<dyn Push<T>>>,
}
impl<T: Clone> Push<T> for PushMany<T> {
fn push(&mut self, item: &mut Option<T>) {
if item.is_some() {
for pusher in self.list.iter_mut().rev().skip(1).rev() {
self.buffer.clone_from(&item);
pusher.push(&mut self.buffer);
}
if let Some(pusher) = self.list.last_mut() {
std::mem::swap(&mut self.buffer, item);
pusher.push(&mut self.buffer);
}
}
else { for pusher in self.list.iter_mut() { pusher.done(); } }
}
}
impl<T: Clone + 'static> PushSet<T> for PushMany<T> {
fn as_list(&mut self) -> Option<&mut Vec<Box<dyn Push<T>>>> { Some(&mut self.list) }
}
impl<T> From<Vec<Box<dyn Push<T>>>> for PushMany<T> { fn from(list: Vec<Box<dyn Push<T>>>) -> Self { Self { list, buffer: None } } }
}
type PushList<T, C> = Rc<RefCell<Option<Box<dyn PushSet<Message<T, C>>>>>>;
pub struct Tee<T, C> { shared: PushList<T, C> }
impl<T: 'static, C: Container> Push<Message<T, C>> for Tee<T, C> {
#[inline]
fn push(&mut self, message: &mut Option<Message<T, C>>) {
if let Some(pushee) = self.shared.borrow_mut().as_mut() {
pushee.push(message)
}
}
}
impl<T, C> Tee<T, C> {
pub fn new() -> (Tee<T, C>, TeeHelper<T, C>) {
let shared = Rc::new(RefCell::new(None));
let port = Tee { shared: Rc::clone(&shared) };
(port, TeeHelper { shared })
}
}
impl<T, C> Debug for Tee<T, C> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { write!(f, "Tee") }
}
pub struct TeeHelper<T, C> { shared: PushList<T, C> }
impl<T: 'static, C: 'static> TeeHelper<T, C> {
pub fn upgrade(&self) where T: Clone, C: Clone {
let mut borrow = self.shared.borrow_mut();
if let Some(mut pusher) = borrow.take() {
if pusher.as_list().is_none() {
*borrow = Some(Box::new(PushMany::from(vec![pusher as Box<dyn Push<Message<T, C>>>])));
}
else {
*borrow = Some(pusher);
}
}
else {
*borrow = Some(Box::new(PushMany::from(vec![])));
}
}
pub fn add_pusher<P: Push<Message<T, C>>+'static>(self, pusher: P) {
let mut borrow = self.shared.borrow_mut();
if let Some(many) = borrow.as_mut() {
many.as_list().unwrap().push(Box::new(pusher))
}
else {
assert!(borrow.is_none());
*borrow = Some(Box::new(PushOne::from(pusher)));
}
}
}
impl<T: Clone+'static, C: Clone+'static> Clone for TeeHelper<T, C> {
fn clone(&self) -> Self { self.upgrade(); TeeHelper { shared: Rc::clone(&self.shared) } }
}
impl<T, C> Debug for TeeHelper<T, C> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { write!(f, "TeeHelper") }
}