1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126
use std::thread::spawn; use std::sync::mpsc::{channel, Sender, Receiver, RecvError, TryRecvError, Iter}; use std::convert::From; use std::sync::Arc; type FilterFn<T> = Option<Arc<Box<Fn(&T) -> bool + 'static + Sync + Send>>>; pub struct MReceiver<T> { subscribe: Sender<(Sender<T>, FilterFn<T>)>, rec: Receiver<T>, filter: FilterFn<T> } pub fn mchannel<T: Send + Clone + 'static>() -> (Sender<T>, MReceiver<T>) { let (ds, dr) = channel(); (ds, MReceiver::from_receiver(dr, None)) } fn listen<T: Send + Clone + 'static>(r: Receiver<T>) -> Sender<(Sender<T>, FilterFn<T>)> { let (ls, lr) = channel(); spawn(move || { let dr = r; let lr = lr; let mut connected = vec![]; loop { match dr.recv() { Ok(m) => { let m: T = m; loop { match lr.try_recv() { Ok(l) => connected.push(l), Err(_) => break } } connected.retain(|out| { let &(ref l, ref f): &(Sender<T>, FilterFn<T>) = out; let f: &FilterFn<T> = f; if f.is_none() || f.as_ref().unwrap()(&m) { match l.send(m.clone()) { Ok(()) => true, Err(_) => false } } else { true } }); } Err(_) => break } } }); ls } impl <T> From<Receiver<T>> for MReceiver<T> where T: Send + Clone + 'static { fn from(other: Receiver<T>) -> MReceiver<T> { MReceiver::from_sub(listen(other), None) } } impl <T> MReceiver<T> where T: Send + Clone + 'static { fn from_receiver(other: Receiver<T>, filter: FilterFn<T>) -> MReceiver<T> { MReceiver::from_sub(listen(other), filter) } fn from_sub(subscriber: Sender<(Sender<T>, FilterFn<T>)>, filter: FilterFn<T>) -> MReceiver<T> { let (sx, rx)= channel(); let _ = subscriber.send((sx, filter.clone())); MReceiver { subscribe: subscriber, rec: rx, filter: filter } } pub fn into_inner(self) -> Receiver<T> { self.rec } pub fn as_inner(&self) -> &Receiver<T> { &self.rec } pub fn recv(&self) -> Result<T, RecvError> { self.rec.recv() } pub fn try_recv(&self) -> Result<T, TryRecvError> { self.rec.try_recv() } pub fn iter<'a>(&'a self) -> Iter<'a, T> { self.rec.iter() } pub fn clone_filter<F>(&self, f: F) -> MReceiver<T> where F: Fn(&T) -> bool + 'static + Send + Sync { if let Some(cur_fn) = self.filter.as_ref() { let old_f = cur_fn.clone(); let new_f = move |a: &T| { old_f(a) && f(a) }; MReceiver::from_sub( self.subscribe.clone(), Some(Arc::new( Box::new(new_f) as Box<Fn(&T) -> bool + 'static + Send + Sync>))) } else { MReceiver::from_sub( self.subscribe.clone(), Some(Arc::new( Box::new(f) as Box<Fn(&T) -> bool + 'static + Send + Sync>))) } } } impl <T> Clone for MReceiver<T> where T: Send + Clone + 'static { fn clone(&self) -> MReceiver<T> { MReceiver::from_sub(self.subscribe.clone(), None) } }