signal_simple/
channel.rs

1use std::sync::mpsc::{Sender, Receiver, channel, TryRecvError};
2use std::sync::{Arc, Mutex};
3
/// Errors returned by the [`SyncChannel`] operations in this module.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum ChannelError {
    /// Locking one of the channel's mutexes failed (the mutex was poisoned); the payload is a
    /// human-readable diagnostic.
    SyncError(String),
    /// The underlying `Sender::send` call failed.
    SendError,
    /// The underlying `Receiver::recv` / `Receiver::try_recv` call failed for a reason other
    /// than the queue being empty.
    RecvError,
}

impl std::fmt::Display for ChannelError {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            ChannelError::SyncError(detail) => write!(f, "channel lock error: {}", detail),
            ChannelError::SendError => f.write_str("failed to send on channel"),
            ChannelError::RecvError => f.write_str("failed to receive from channel"),
        }
    }
}

// Implementing `Error` lets callers propagate this type through `Box<dyn Error>` with `?`.
impl std::error::Error for ChannelError {}
11
12pub struct SyncChannel<T> {
13    _s: Arc<Mutex<Sender<T>>>,
14    _r: Arc<Mutex<Receiver<T>>>
15}
16
17impl<T> SyncChannel<T> {
18    pub fn new() -> Self {
19        let (s, r) = channel::<T>();
20        SyncChannel { _s: Arc::new(Mutex::new(s)), _r: Arc::new(Mutex::new(r)) }
21    }
22    
23    pub fn send(&self, data: T) -> Result<(), ChannelError> {
24        self._s.lock()
25            .map_err(|e| { ChannelError::SyncError(format!("Mutex poisoned: {:?}", e)) })?
26            .send(data)
27            .map_err(|_| { ChannelError::SendError })
28    }
29    
30    pub fn recv(&self) -> Result<T, ChannelError> {
31        self._r.lock()
32            .map_err(|e| { ChannelError::SyncError(format!("Mutex poisoned: {:?}", e)) })?
33            .recv()
34            .map_err(|_| { ChannelError::RecvError })
35    }
36    
37    pub fn try_recv(&self) -> Result<Option<T>, ChannelError> {
38        match self._r.lock()
39            .map_err(|e| { ChannelError::SyncError(format!("Mutex poisoned: {:?}", e)) })?
40            .try_recv() {
41            
42            Err(TryRecvError::Empty) => Ok(None),
43            Err(_) => Err(ChannelError::RecvError),
44            Ok(d) => Ok(Some(d))
45        }
46        
47    }
48}
49
50impl<T> Clone for SyncChannel<T> {
51    fn clone(&self) -> Self {
52        SyncChannel {
53            _r: self._r.clone(),
54            _s: self._s.clone(),
55        }
56    }
57}
58unsafe impl<T> Send for SyncChannel<T> {}
59unsafe impl<T> Sync for SyncChannel<T> {}
60
#[cfg(test)]
mod tests {
    use super::*;
    use std::thread;

    /// A value sent through a clone on another thread is received on the original handle.
    #[test]
    fn test_channels() {
        let ch: SyncChannel<i32> = SyncChannel::new();
        let ch_clone = ch.clone();
        let producer = thread::spawn(move || ch_clone.send(3));
        assert_eq!(ch.recv().unwrap(), 3);
        // Join so that a failed send or a panicking producer fails the test instead of being
        // silently dropped.
        producer
            .join()
            .expect("producer thread panicked")
            .expect("send failed");
    }

    /// `try_recv` yields `None` on an empty channel and the value once it has been sent.
    #[test]
    fn test_channels_try_recv() {
        let ch: SyncChannel<i32> = SyncChannel::new();
        let ch_clone = ch.clone();
        assert_eq!(ch.try_recv().unwrap(), None);

        // Joining waits for the send without spinning and synchronizes with the producer, so
        // the value is guaranteed to be visible to the `try_recv` below.
        thread::spawn(move || ch_clone.send(3).unwrap())
            .join()
            .expect("producer thread panicked");
        assert_eq!(ch.try_recv().unwrap(), Some(3));
    }
}