1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
use std::thread::spawn;
use std::sync::mpsc::{channel, Sender, Receiver, RecvError, TryRecvError, Iter};
use std::convert::From;
use std::sync::Arc;

/// Optional predicate attached to a subscriber.
///
/// `None` means the subscriber accepts every message; `Some(f)` means a
/// message is only forwarded when `f(&message)` returns `true`. The predicate
/// is wrapped in an `Arc` so the same filter can be cloned cheaply and handed
/// to another thread.
type FilterFn<T> = Option<Arc<Box<dyn Fn(&T) -> bool + 'static + Sync + Send>>>;

/// Receiving half of a multi-subscriber channel.
///
/// Each `MReceiver` owns its own private queue (`rec`); a background
/// dispatcher thread copies every upstream message into the queue of each
/// registered subscriber whose filter accepts it.
pub struct MReceiver<T> {
    /// Registration channel to the dispatcher thread: sending a
    /// `(Sender<T>, FilterFn<T>)` pair here adds a new subscriber.
    subscribe: Sender<(Sender<T>, FilterFn<T>)>,
    /// Private queue this receiver reads its messages from.
    rec: Receiver<T>,
    /// Filter this receiver was registered with; kept so that derived
    /// receivers can build on it. `None` means "accept everything".
    filter: FilterFn<T>
}

/// Creates a channel whose receiving side can be subscribed to multiple times.
///
/// Returns a standard `Sender` together with an unfiltered `MReceiver` that
/// is already registered with the dispatcher started for this channel.
pub fn mchannel<T: Send + Clone + 'static>() -> (Sender<T>, MReceiver<T>) {
    let (sender, upstream) = channel();
    let receiver = MReceiver::from_receiver(upstream, None);

    (sender, receiver)
}

fn listen<T: Send + Clone + 'static>(r: Receiver<T>) -> Sender<(Sender<T>, FilterFn<T>)> {
    let (ls, lr) = channel();
    spawn(move || {
        let dr = r;
        let lr = lr;

        let mut connected = vec![];
        loop {
            match dr.recv() {
                Ok(m) => {
                    let m: T = m;
                    loop {
                        match lr.try_recv() {
                            Ok(l) => connected.push(l),
                            Err(_) => break
                        }
                    }

                    connected.retain(|out| {
                        let &(ref l, ref f): &(Sender<T>, FilterFn<T>) = out;
                        let f: &FilterFn<T> = f;
                        if f.is_none() || f.as_ref().unwrap()(&m) {
                            match l.send(m.clone()) {
                                Ok(()) => true,
                                Err(_) => false
                            }
                        } else {
                            true
                        }
                    });
                }
                Err(_) => break
            }
        }
    });
    ls
}

impl<T> From<Receiver<T>> for MReceiver<T>
where
    T: Send + Clone + 'static,
{
    /// Wraps a standard `Receiver`: starts a dispatcher for it and returns
    /// an unfiltered `MReceiver` subscribed to that dispatcher.
    fn from(other: Receiver<T>) -> MReceiver<T> {
        let registrar = listen(other);
        MReceiver::from_sub(registrar, None)
    }
}

impl <T> MReceiver<T> where T: Send + Clone + 'static {
    fn from_receiver(other: Receiver<T>, filter: FilterFn<T>) -> MReceiver<T> {
        MReceiver::from_sub(listen(other), filter)
    }

    fn from_sub(subscriber: Sender<(Sender<T>, FilterFn<T>)>,
                filter: FilterFn<T>) -> MReceiver<T> {
        let (sx, rx)= channel();
        let _ = subscriber.send((sx, filter.clone()));
        MReceiver {
            subscribe: subscriber,
            rec: rx,
            filter: filter
        }
    }

    pub fn into_inner(self) -> Receiver<T> {
        self.rec
    }

    pub fn as_inner(&self) -> &Receiver<T> {
        &self.rec
    }

    pub fn recv(&self) -> Result<T, RecvError> {
        self.rec.recv()
    }

    pub fn try_recv(&self) -> Result<T, TryRecvError> {
        self.rec.try_recv()
    }

    pub fn iter<'a>(&'a self) -> Iter<'a, T> {
        self.rec.iter()
    }

    pub fn clone_filter<F>(&self, f: F) -> MReceiver<T>
    where F: Fn(&T) -> bool + 'static + Send + Sync
    {
        if let Some(cur_fn) = self.filter.as_ref() {
            let old_f = cur_fn.clone();
            let new_f = move |a: &T| {
                old_f(a) && f(a)
            };

            MReceiver::from_sub(
                self.subscribe.clone(),
                Some(Arc::new(
                     Box::new(new_f) as Box<Fn(&T) -> bool + 'static + Send + Sync>)))
        } else {
            MReceiver::from_sub(
                self.subscribe.clone(),
                Some(Arc::new(
                     Box::new(f) as Box<Fn(&T) -> bool + 'static + Send + Sync>)))
        }
    }
}

impl<T> Clone for MReceiver<T> where T: Send + Clone + 'static {
    /// Registers a new, independent subscriber on the same stream.
    ///
    /// The clone has its own queue and carries over this receiver's filter,
    /// so it accepts exactly the messages the original accepts. It only sees
    /// messages dispatched after it was registered; anything already queued
    /// for the original is not copied.
    fn clone(&self) -> MReceiver<T> {
        MReceiver::from_sub(self.subscribe.clone(), self.filter.clone())
    }
}