1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
use std::thread::spawn;
use std::sync::mpsc::{channel, Sender, Receiver, RecvError, TryRecvError, Iter};
use std::convert::From;

pub struct MReceiver<T> {
    subscribe: Sender<Sender<T>>,
    rec: Receiver<T>
}

pub fn mchannel<T: Send + Clone + 'static>() -> (Sender<T>, MReceiver<T>) {
    let (ds, dr) = channel();

    (ds, MReceiver::from_receiver(dr))
}

fn listen<T: Send + Clone + 'static>(r: Receiver<T>) -> Sender<Sender<T>> {
    let (ls, lr) = channel();
    spawn(move || {
        let dr = r;
        let lr = lr;

        let mut connected = vec![];
        loop {
            match dr.recv() {
                Ok(m) => {
                    let m: T = m;
                    loop {
                        match lr.try_recv() {
                            Ok(l) => connected.push(l),
                            Err(_) => break
                        }
                    }

                    connected.retain(|l: &Sender<T>| {
                        match l.send(m.clone()) {
                            Ok(()) => true,
                            Err(_) => false
                        }
                    });
                }
                Err(_) => break
            }
        }
    });
    ls
}

impl <T> From<Receiver<T>> for MReceiver<T> where T: Send + Clone + 'static {
    fn from(other: Receiver<T>) -> MReceiver<T> {
        MReceiver::from_sub(listen(other))
    }
}

impl <T> MReceiver<T> where T: Send + Clone + 'static {
    fn from_receiver(other: Receiver<T>) -> MReceiver<T> {
        MReceiver::from_sub(listen(other))
    }

    fn from_sub(subscriber: Sender<Sender<T>>) -> MReceiver<T> {
        let (sx, rx)= channel();
        let _ = subscriber.send(sx);
        MReceiver {
            subscribe: subscriber,
            rec: rx
        }
    }

    pub fn unwrap(self) -> Receiver<T> {
        self.rec
    }

    pub fn recv(&self) -> Result<T, RecvError> {
        self.rec.recv()
    }

    pub fn try_recv(&self) -> Result<T, TryRecvError> {
        self.rec.try_recv()
    }

    pub fn iter<'a>(&'a self) -> Iter<'a, T> {
        self.rec.iter()
    }
}

impl <T> Clone for MReceiver<T> where T: Send + Clone + 'static {
    fn clone(&self) -> MReceiver<T> {
        MReceiver::from_sub(self.subscribe.clone())
    }
}