any_mpsc/
lib.rs

1#[cfg(feature = "buf_recv")]
2mod buffered_receiver;
3#[cfg(feature = "buf_recv")]
4pub use buffered_receiver::*;
5
6use core::any::*;
7use std::{error::Error, fmt::Display, sync::mpsc::{self, Sender, Receiver}};
8
9/// An [mpsc::channel] that supports dynamic typing.
10#[inline]
11pub fn channel() -> (AnySender, AnyReceiver)
12{
13    let (tx, rx) = mpsc::channel();
14    (AnySender(tx), AnyReceiver(rx))
15}
16
17/// Wraps an [mpsc::Sender] to support dynamic typing.
18#[derive(Debug)]
19pub struct AnySender(pub Sender<Box<dyn Any>>);
20unsafe impl Send for AnySender {}
21
22impl AnySender
23{
24    /// Wraps [mpsc::Sender::send].
25    #[inline]
26    pub fn send<T: Any>(&self, t: T) -> Result<(), mpsc::SendError<Box<dyn Any>>>
27    {
28        self.0.send(Box::new(t))
29    }
30}
31
32/// Wraps an [mpsc::Receiver] to support dynamic typing.
33#[derive(Debug)]
34pub struct AnyReceiver(pub Receiver<Box<dyn Any>>);
35unsafe impl Send for AnyReceiver {}
36
37impl AnyReceiver
38{
39    /// Wraps [mpsc::Receiver::recv]. See [crate::AnyRecvError] for details on the 
40    /// return value.
41    #[inline]
42    pub fn recv<T: 'static>(&self) -> Result<T, AnyRecvError>
43    {
44        self.0
45            .recv()
46            .map_err(AnyRecvError::RecvError)
47            .and_then(|r| match r.downcast()
48            {
49                Ok(r) => Ok(*r),
50                Err(r) => Err(AnyRecvError::WrongType(r)),
51            })
52    }
53
54    /// Wraps [mpsc::Receiver::recv_timeout]. See [crate::AnyRecvError] for 
55    /// details on the return value.
56    #[inline]
57    pub fn recv_timeout<T: 'static>(&self, timeout: std::time::Duration) -> Result<T, AnyRecvError>
58    {
59        self.0
60            .recv_timeout(timeout)
61            .map_err(AnyRecvError::RecvTimeoutError)
62            .and_then(|r| match r.downcast()
63            {
64                Ok(r) => Ok(*r),
65                Err(r) => Err(AnyRecvError::WrongType(r)),
66            })
67    }
68
69    /// Wraps [mpsc::Receiver::try_recv]. See [crate::AnyRecvError] for 
70    /// details on the return value.
71    #[inline]
72    pub fn try_recv<T: 'static>(&self) -> Result<T, AnyRecvError>
73    {
74        self.0
75            .try_recv()
76            .map_err(AnyRecvError::TryRecvError)
77            .and_then(|r| match r.downcast()
78            {
79                Ok(r) => Ok(*r),
80                Err(r) => Err(AnyRecvError::WrongType(r)),
81            })
82    }
83}
84
85/// Error type for receievers. If an [mpsc] error occurs, it will be wrapped
86/// by an appropriate wrapper variant. If receiver is supplied an incorrect type, 
87/// a [AnyRecvError::WrongType(Box<dyn Any>)] will be returned containing the result 
88/// that did not successfully downcast. If buffered receiver is supplied an 
89/// incorrect type, a [BufRecvError::WrongType(TypeId)] will be returned and the 
90/// result will be stored in a buffer. If [BufferedReceiver::recv_buf] is called
91/// with an empty buffer, EmptyBuffer will be returned
92#[derive(Debug)]
93pub enum AnyRecvError
94{
95    RecvError(mpsc::RecvError),
96    RecvTimeoutError(mpsc::RecvTimeoutError),
97    TryRecvError(mpsc::TryRecvError),
98    WrongType(Box<dyn Any>),
99    #[cfg(feature = "buf_recv")]
100    BufRecvError(TypeId),
101    #[cfg(feature = "buf_recv")]
102    EmptyBuffer
103}
104
105impl Display for AnyRecvError
106{
107    #[inline]
108    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result 
109    {
110        match self
111        {
112            AnyRecvError::RecvError(err) => err.fmt(f), 
113            AnyRecvError::RecvTimeoutError(err) => err.fmt(f),
114            AnyRecvError::TryRecvError(err) => err.fmt(f),
115            AnyRecvError::WrongType(_) => write!(f, "Received wrong type"),
116            AnyRecvError::BufRecvError(type_id) => write!(f, "Received wrong type: {:?}", type_id),
117            AnyRecvError::EmptyBuffer => write!(f, "Buffer is empty"),
118        }
119    }
120}
121
122impl Error for AnyRecvError 
123{
124    #[inline]
125    fn source(&self) -> Option<&(dyn Error + 'static)>
126    {
127        match self
128        {
129            AnyRecvError::RecvError(err) => Some(err),
130            AnyRecvError::RecvTimeoutError(err) => Some(err),
131            AnyRecvError::TryRecvError(err) => Some(err),
132            _ => None
133        }
134    }
135}
136
137#[cfg(test)]
138mod tests 
139{
140    use crate::*;
141
142    #[test]
143    pub fn any_channel_test()
144    {
145        let (atx, arx) = channel();
146        atx.send(67 as i32).unwrap();
147        println!("{:?}", arx.recv::<u32>());
148    }
149    
150    #[test]
151    pub fn any_channel_test_1()
152    {
153        let (atx, arx) = channel();
154        atx.send(88 as i32).unwrap();
155        println!("{:?}", arx.recv::<i32>());
156    }
157    
158    #[test]
159    pub fn any_channel_test_2()
160    {
161        let to_send = 55.7f32;
162        let (atx, arx) = channel();
163        atx.send(to_send).unwrap();
164        println!("{:?}", arx.recv::<f32>());
165    }
166
167    #[test]
168    #[cfg(feature = "buf_recv")]
169    pub fn readme_test()
170    {
171        fn receive_handler<T: std::fmt::Debug + 'static>(rx: &mut BufferedReceiver)
172        {
173            match rx.recv::<T>()
174            {
175                Ok(result) => println!("{:?}", result),
176                Err(AnyRecvError::BufRecvError(type_id)) => println!("Type with id {:?} added to buffer", type_id),
177                Err(e) => eprintln!("{:?}", e)
178            }
179        }
180        
181        let (tx, mut rx) = crate::buffered_channel();
182        
183        tx.send(55.7f32).unwrap();
184        tx.send(String::from("example")).unwrap();
185        
186        receive_handler::<f32>(&mut rx);
187        receive_handler::<f32>(&mut rx);
188        receive_handler::<String>(&mut rx);
189    }
190
191    #[test]
192    #[cfg(feature = "buf_recv")]
193    pub fn recv_until_test()
194    {
195        use std::thread;
196        fn receive_handler<T: std::fmt::Debug + 'static>(rx: &mut BufferedReceiver)
197        {
198            match rx.recv_until::<T>()
199            {
200                Ok(result) => println!("{:?}", result),
201                Err(e) => eprintln!("{:?}", e)
202            }
203        }
204        
205        let (tx, mut rx) = crate::buffered_channel();
206
207        thread::spawn(move || 
208        {
209            receive_handler::<String>(&mut rx);
210            receive_handler::<f32>(&mut rx);
211        });
212        
213        tx.send(55.7f32).unwrap();
214        tx.send(String::from("example")).unwrap();
215    
216    }
217}