1use std::sync::mpsc::{Sender, Receiver, channel, TryRecvError};
2use std::sync::{Arc, Mutex};
3
/// Errors reported by the channel wrapper in this file.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum ChannelError {
    /// A lock guarding one of the channel endpoints could not be acquired
    /// (in this file: the mutex was poisoned). The payload is a
    /// human-readable description of the failure.
    SyncError(String),
    /// The underlying sender rejected the message.
    SendError,
    /// The underlying receiver failed for a reason other than "no message
    /// available right now".
    RecvError,
}

impl std::fmt::Display for ChannelError {
    /// Writes a short, user-facing description of the error.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            ChannelError::SyncError(reason) => {
                write!(f, "channel synchronization error: {}", reason)
            }
            ChannelError::SendError => f.write_str("failed to send on channel"),
            ChannelError::RecvError => f.write_str("failed to receive from channel"),
        }
    }
}

// Lets callers box the error or propagate it with `?` into `Box<dyn Error>`.
impl std::error::Error for ChannelError {}
11
/// A handle that bundles both ends of a `std::sync::mpsc` channel.
///
/// Each endpoint lives behind its own `Arc<Mutex<_>>`, so cloned handles
/// refer to the same underlying channel and every access to an endpoint is
/// serialized by that endpoint's mutex.
pub struct SyncChannel<T> {
    /// Sending endpoint, shared between all clones of this handle.
    _s: Arc<Mutex<Sender<T>>>,
    /// Receiving endpoint, shared between all clones of this handle.
    _r: Arc<Mutex<Receiver<T>>>,
}
16
17impl<T> SyncChannel<T> {
18 pub fn new() -> Self {
19 let (s, r) = channel::<T>();
20 SyncChannel { _s: Arc::new(Mutex::new(s)), _r: Arc::new(Mutex::new(r)) }
21 }
22
23 pub fn send(&self, data: T) -> Result<(), ChannelError> {
24 self._s.lock()
25 .map_err(|e| { ChannelError::SyncError(format!("Mutex poisoned: {:?}", e)) })?
26 .send(data)
27 .map_err(|_| { ChannelError::SendError })
28 }
29
30 pub fn recv(&self) -> Result<T, ChannelError> {
31 self._r.lock()
32 .map_err(|e| { ChannelError::SyncError(format!("Mutex poisoned: {:?}", e)) })?
33 .recv()
34 .map_err(|_| { ChannelError::RecvError })
35 }
36
37 pub fn try_recv(&self) -> Result<Option<T>, ChannelError> {
38 match self._r.lock()
39 .map_err(|e| { ChannelError::SyncError(format!("Mutex poisoned: {:?}", e)) })?
40 .try_recv() {
41
42 Err(TryRecvError::Empty) => Ok(None),
43 Err(_) => Err(ChannelError::RecvError),
44 Ok(d) => Ok(Some(d))
45 }
46
47 }
48}
49
50impl<T> Clone for SyncChannel<T> {
51 fn clone(&self) -> Self {
52 SyncChannel {
53 _r: self._r.clone(),
54 _s: self._s.clone(),
55 }
56 }
57}
58unsafe impl<T> Send for SyncChannel<T> {}
59unsafe impl<T> Sync for SyncChannel<T> {}
60
#[cfg(test)]
mod tests {
    use super::*;
    use std::thread;

    /// A value sent from a cloned handle on another thread is delivered to a
    /// blocking `recv` on the original handle.
    #[test]
    fn test_channels() {
        let ch: SyncChannel<i32> = SyncChannel::new();
        let ch_clone = ch.clone();
        let producer = thread::spawn(move || ch_clone.send(3));

        assert_eq!(ch.recv().unwrap(), 3);

        // Surface both a panic in the producer and a failed send instead of
        // silently discarding the thread's result.
        producer
            .join()
            .expect("producer thread panicked")
            .expect("send failed");
    }

    /// `try_recv` reports `None` on an empty channel and `Some` once a value
    /// has been sent from another thread.
    #[test]
    fn test_channels_try_recv() {
        let ch: SyncChannel<i32> = SyncChannel::new();
        let ch_clone = ch.clone();
        assert_eq!(ch.try_recv().unwrap(), None);

        let producer = thread::spawn(move || ch_clone.send(3).unwrap());

        // Joining replaces a busy-wait on a flag: it waits without spinning,
        // guarantees the send is visible to this thread, and turns a producer
        // panic into a test failure rather than an endless loop.
        producer.join().expect("producer thread panicked");

        assert_eq!(ch.try_recv().unwrap(), Some(3));
    }
}