1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89
use std::thread::spawn; use std::sync::mpsc::{channel, Sender, Receiver, RecvError, TryRecvError, Iter}; use std::convert::From; pub struct MReceiver<T> { subscribe: Sender<Sender<T>>, rec: Receiver<T> } pub fn mchannel<T: Send + Clone + 'static>() -> (Sender<T>, MReceiver<T>) { let (ds, dr) = channel(); (ds, MReceiver::from_receiver(dr)) } fn listen<T: Send + Clone + 'static>(r: Receiver<T>) -> Sender<Sender<T>> { let (ls, lr) = channel(); spawn(move || { let dr = r; let lr = lr; let mut connected = vec![]; loop { match dr.recv() { Ok(m) => { let m: T = m; loop { match lr.try_recv() { Ok(l) => connected.push(l), Err(_) => break } } connected.retain(|l: &Sender<T>| { match l.send(m.clone()) { Ok(()) => true, Err(_) => false } }); } Err(_) => break } } }); ls } impl <T> From<Receiver<T>> for MReceiver<T> where T: Send + Clone + 'static { fn from(other: Receiver<T>) -> MReceiver<T> { MReceiver::from_sub(listen(other)) } } impl <T> MReceiver<T> where T: Send + Clone + 'static { fn from_receiver(other: Receiver<T>) -> MReceiver<T> { MReceiver::from_sub(listen(other)) } fn from_sub(subscriber: Sender<Sender<T>>) -> MReceiver<T> { let (sx, rx)= channel(); let _ = subscriber.send(sx); MReceiver { subscribe: subscriber, rec: rx } } pub fn unwrap(self) -> Receiver<T> { self.rec } pub fn recv(&self) -> Result<T, RecvError> { self.rec.recv() } pub fn try_recv(&self) -> Result<T, TryRecvError> { self.rec.try_recv() } pub fn iter<'a>(&'a self) -> Iter<'a, T> { self.rec.iter() } } impl <T> Clone for MReceiver<T> where T: Send + Clone + 'static { fn clone(&self) -> MReceiver<T> { MReceiver::from_sub(self.subscribe.clone()) } }