any_mpsc/
buffered_receiver.rs

1use dfb::*;
2
3use super::{AnySender, AnyRecvError};
4use core::any::*;
5use std::sync::mpsc::{self, Receiver};
6
7/// An [mpsc::channel] that supports dynamic typing and contains a buffer to 
8/// prevent the need for dynamic types to be exposed. 
9#[inline]
10pub fn buffered_channel() -> (AnySender, BufferedReceiver)
11{
12    let (tx, rx) = mpsc::channel();
13    (AnySender(tx), BufferedReceiver { rx, buf: Dfb::new() })
14}
15
16/// Wraps an [mpsc::Receiver] to support dynamic typing and buffered results.
17#[derive(Debug)]
18pub struct BufferedReceiver
19{
20    pub rx: Receiver<Box<dyn Any>>,
21    pub buf: Dfb
22}
23unsafe impl Send for BufferedReceiver {}
24
25impl BufferedReceiver
26{
27    /// Wraps [mpsc::Receiver::recv]. See [BufRecvError] for details on the 
28    /// return value. Will continuously recv until the supplied type is found. 
29    /// Any unmatching types received will be placed in the buffer.
30    pub fn recv_until<T: 'static>(&mut self) -> Result<T, AnyRecvError>
31    {
32        loop
33        {
34            match self.recv::<T>()
35            {
36                Err(err) => match err
37                {
38                    AnyRecvError::BufRecvError(_type_id) => continue,
39                    err => break Err(err),
40                },
41                ok => break ok
42            }
43        }
44    }
45
46    /// Wraps [mpsc::Receiver::recv]. See [BufRecvError] for details on the 
47    /// return value. Will attempt to take from the internal buffer before
48    /// performing an actual channel recv.
49    #[inline]
50    pub fn recv<T: 'static>(&mut self) -> Result<T, AnyRecvError>
51    {
52        match self.buf.remove::<T>()
53        {
54            Some(t) => Ok(t),
55            None => self.rx
56                .recv()
57                .map_err(AnyRecvError::RecvError)
58                .and_then(|r| match r.downcast()
59                {
60                    Ok(r) => Ok(*r),
61                    Err(r) => 
62                    {
63                        let err = Err(AnyRecvError::BufRecvError(r.as_ref().type_id()));
64                        self.buf.insert_dyn(r);
65                        err
66                    },
67                })
68        }
69    }
70
71    /// Wraps [mpsc::Receiver::recv]. See [BufRecvError] for details on the 
72    /// return value. Will perform a channel recv regardless of whether or not
73    /// anything is contained in the buffer.
74    #[inline]
75    pub fn recv_live<T: 'static>(&mut self) -> Result<T, AnyRecvError>
76    {
77        self.rx
78            .recv()
79            .map_err(AnyRecvError::RecvError)
80            .and_then(|r| match r.downcast()
81            {
82                Ok(r) => Ok(*r),
83                Err(r) => 
84                {
85                    let err = Err(AnyRecvError::BufRecvError(r.as_ref().type_id()));
86                    self.buf.insert_dyn(r);
87                    err
88                },
89            })
90    }
91
92    /// Wraps [mpsc::Receiver::recv_timeout]. See [BufRecvError] for 
93    /// details on the return value. Will attempt to take from the internal 
94    /// buffer before performing an actual channel recv_timeout.
95    #[inline]
96    pub fn recv_timeout<T: 'static>(&mut self, timeout: std::time::Duration) -> Result<T, AnyRecvError>
97    {
98        match self.buf.remove::<T>()
99        {
100            Some(t) => Ok(t),
101            None => self.rx
102                .recv_timeout(timeout)
103                .map_err(AnyRecvError::RecvTimeoutError)
104                .and_then(|r| match r.downcast()
105                {
106                    Ok(r) => Ok(*r),
107                    Err(r) => 
108                    {
109                        let err = Err(AnyRecvError::BufRecvError(r.as_ref().type_id()));
110                        self.buf.insert_dyn(r);
111                        err
112                    }
113                })
114        }
115    }
116
117    /// Wraps [mpsc::Receiver::recv_timeout]. See [BufRecvError] for 
118    /// details on the return value. Will perform a channel recv_timeout 
119    /// regardless of whether or not anything is contained in the buffer.
120    #[inline]
121    pub fn recv_timeout_live<T: 'static>(&mut self, timeout: std::time::Duration) -> Result<T, AnyRecvError>
122    {
123        match self.buf.remove::<T>()
124        {
125            Some(t) => Ok(t),
126            None => self.rx
127                .recv_timeout(timeout)
128                .map_err(AnyRecvError::RecvTimeoutError)
129                .and_then(|r| match r.downcast()
130                {
131                    Ok(r) => Ok(*r),
132                    Err(r) => 
133                    {
134                        let err = Err(AnyRecvError::BufRecvError(r.as_ref().type_id()));
135                        self.buf.insert_dyn(r);
136                        err
137                    }
138                })
139        }
140    }
141
142    /// Wraps [mpsc::Receiver::try_recv]. See [BufRecvError] for 
143    /// details on the return value. Will attempt to take from the internal 
144    /// buffer before performing an actual channel recv_timeout.
145    #[inline]
146    pub fn try_recv<T: 'static>(&mut self) -> Result<T, AnyRecvError>
147    {
148        match self.buf.remove::<T>()
149        {
150            Some(t) => Ok(t),
151            None => self.rx
152                .try_recv()
153                .map_err(AnyRecvError::TryRecvError)
154                .and_then(|r| match r.downcast()
155                {
156                    Ok(r) => Ok(*r),
157                    Err(r) => 
158                    {
159                        let err = Err(AnyRecvError::BufRecvError(r.as_ref().type_id()));
160                        self.buf.insert_dyn(r);
161                        err
162                    }
163                })
164        }
165    }
166
167    /// Wraps [mpsc::Receiver::try_recv]. See [BufRecvError] for 
168    /// details on the return value. Will perform a channel recv_timeout 
169    /// regardless of whether or not anything is contained in the buffer.
170    #[inline]
171    pub fn try_recv_live<T: 'static>(&mut self) -> Result<T, AnyRecvError>
172    {
173        match self.buf.remove::<T>()
174        {
175            Some(t) => Ok(t),
176            None => self.rx
177                .try_recv()
178                .map_err(AnyRecvError::TryRecvError)
179                .and_then(|r| match r.downcast()
180                {
181                    Ok(r) => Ok(*r),
182                    Err(r) => 
183                    {
184                        let err = Err(AnyRecvError::BufRecvError(r.as_ref().type_id()));
185                        self.buf.insert_dyn(r);
186                        err
187                    }
188                })
189        }
190    }
191
192    /// Wraps [mpsc::Receiver::recv]. See [crate::AnyRecvError] for details on the 
193    /// return value. Bypasses the buffer entirely.
194    #[inline]
195    pub fn recv_nobuf<T: 'static>(&self) -> Result<T, AnyRecvError>
196    {
197        self.rx
198            .recv()
199            .map_err(AnyRecvError::RecvError)
200            .and_then(|r| match r.downcast()
201            {
202                Ok(r) => Ok(*r),
203                Err(r) => Err(AnyRecvError::WrongType(r)),
204            })
205    }
206
207    /// Wraps [mpsc::Receiver::recv_timeout]. See [crate::AnyRecvError] for 
208    /// details on the return value. Bypasses the buffer entirely.
209    #[inline]
210    pub fn recv_timeout_nobuf<T: 'static>(&self, timeout: std::time::Duration) -> Result<T, AnyRecvError>
211    {
212        self.rx
213            .recv_timeout(timeout)
214            .map_err(AnyRecvError::RecvTimeoutError)
215            .and_then(|r| match r.downcast()
216            {
217                Ok(r) => Ok(*r),
218                Err(r) => Err(AnyRecvError::WrongType(r)),
219            })
220    }
221
222    /// Wraps [mpsc::Receiver::try_recv]. See [crate::AnyRecvError] for 
223    /// details on the return value. Bypasses the buffer entirely.
224    #[inline]
225    pub fn try_recv_nobuf<T: 'static>(&self) -> Result<T, AnyRecvError>
226    {
227        self.rx
228            .try_recv()
229            .map_err(AnyRecvError::TryRecvError)
230            .and_then(|r| match r.downcast()
231            {
232                Ok(r) => Ok(*r),
233                Err(r) => Err(AnyRecvError::WrongType(r)),
234            })
235    }
236
237    /// Will attempt to read a value from the internal buffer. Will not do a
238    /// channel recv of any kind even if the buffer is empty.
239    #[inline]
240    pub fn recv_buf<T: 'static>(&mut self) -> Result<T, AnyRecvError>
241    {
242        match self.buf.remove::<T>()
243        {
244            Some(t) => Ok(t),
245            None => Err(AnyRecvError::EmptyBuffer)
246        }
247    }
248}
249