any_mpsc/
buffered_receiver.rs1use dfb::*;
2
3use super::{AnySender, AnyRecvError};
4use core::any::*;
5use std::sync::mpsc::{self, Receiver};
6
7#[inline]
10pub fn buffered_channel() -> (AnySender, BufferedReceiver)
11{
12 let (tx, rx) = mpsc::channel();
13 (AnySender(tx), BufferedReceiver { rx, buf: Dfb::new() })
14}
15
/// Receiving half returned by `buffered_channel`: an `mpsc` receiver paired with a `Dfb` buffer.
///
/// Messages travel through the channel as type-erased `Box<dyn Any>` values. The buffered
/// receive methods downcast each message to the requested type and store messages of any other
/// type in `buf` so that a later request for that type can retrieve them.
#[derive(Debug)]
pub struct BufferedReceiver
{
    /// Underlying channel receiver; every message arrives as a `Box<dyn Any>`.
    pub rx: Receiver<Box<dyn Any>>,
    /// Buffer the receive methods fill with messages of a non-requested type and query by type.
    pub buf: Dfb
}
// SAFETY: not established by anything visible in this file.
// `Receiver<T>` is `Send` only when `T: Send`, and `Box<dyn Any>` carries no `Send` bound, so the
// compiler does not derive `Send` for `BufferedReceiver`; this impl asserts it manually.
// NOTE(review): this is only sound if every value that can enter the channel (and therefore the
// buffer) is itself safe to move across threads — presumably the sender side must guarantee
// that; confirm against `AnySender`, or tighten the payload type to `Box<dyn Any + Send>`.
unsafe impl Send for BufferedReceiver {}
24
25impl BufferedReceiver
26{
27 pub fn recv_until<T: 'static>(&mut self) -> Result<T, AnyRecvError>
31 {
32 loop
33 {
34 match self.recv::<T>()
35 {
36 Err(err) => match err
37 {
38 AnyRecvError::BufRecvError(_type_id) => continue,
39 err => break Err(err),
40 },
41 ok => break ok
42 }
43 }
44 }
45
46 #[inline]
50 pub fn recv<T: 'static>(&mut self) -> Result<T, AnyRecvError>
51 {
52 match self.buf.remove::<T>()
53 {
54 Some(t) => Ok(t),
55 None => self.rx
56 .recv()
57 .map_err(AnyRecvError::RecvError)
58 .and_then(|r| match r.downcast()
59 {
60 Ok(r) => Ok(*r),
61 Err(r) =>
62 {
63 let err = Err(AnyRecvError::BufRecvError(r.as_ref().type_id()));
64 self.buf.insert_dyn(r);
65 err
66 },
67 })
68 }
69 }
70
71 #[inline]
75 pub fn recv_live<T: 'static>(&mut self) -> Result<T, AnyRecvError>
76 {
77 self.rx
78 .recv()
79 .map_err(AnyRecvError::RecvError)
80 .and_then(|r| match r.downcast()
81 {
82 Ok(r) => Ok(*r),
83 Err(r) =>
84 {
85 let err = Err(AnyRecvError::BufRecvError(r.as_ref().type_id()));
86 self.buf.insert_dyn(r);
87 err
88 },
89 })
90 }
91
92 #[inline]
96 pub fn recv_timeout<T: 'static>(&mut self, timeout: std::time::Duration) -> Result<T, AnyRecvError>
97 {
98 match self.buf.remove::<T>()
99 {
100 Some(t) => Ok(t),
101 None => self.rx
102 .recv_timeout(timeout)
103 .map_err(AnyRecvError::RecvTimeoutError)
104 .and_then(|r| match r.downcast()
105 {
106 Ok(r) => Ok(*r),
107 Err(r) =>
108 {
109 let err = Err(AnyRecvError::BufRecvError(r.as_ref().type_id()));
110 self.buf.insert_dyn(r);
111 err
112 }
113 })
114 }
115 }
116
117 #[inline]
121 pub fn recv_timeout_live<T: 'static>(&mut self, timeout: std::time::Duration) -> Result<T, AnyRecvError>
122 {
123 match self.buf.remove::<T>()
124 {
125 Some(t) => Ok(t),
126 None => self.rx
127 .recv_timeout(timeout)
128 .map_err(AnyRecvError::RecvTimeoutError)
129 .and_then(|r| match r.downcast()
130 {
131 Ok(r) => Ok(*r),
132 Err(r) =>
133 {
134 let err = Err(AnyRecvError::BufRecvError(r.as_ref().type_id()));
135 self.buf.insert_dyn(r);
136 err
137 }
138 })
139 }
140 }
141
142 #[inline]
146 pub fn try_recv<T: 'static>(&mut self) -> Result<T, AnyRecvError>
147 {
148 match self.buf.remove::<T>()
149 {
150 Some(t) => Ok(t),
151 None => self.rx
152 .try_recv()
153 .map_err(AnyRecvError::TryRecvError)
154 .and_then(|r| match r.downcast()
155 {
156 Ok(r) => Ok(*r),
157 Err(r) =>
158 {
159 let err = Err(AnyRecvError::BufRecvError(r.as_ref().type_id()));
160 self.buf.insert_dyn(r);
161 err
162 }
163 })
164 }
165 }
166
167 #[inline]
171 pub fn try_recv_live<T: 'static>(&mut self) -> Result<T, AnyRecvError>
172 {
173 match self.buf.remove::<T>()
174 {
175 Some(t) => Ok(t),
176 None => self.rx
177 .try_recv()
178 .map_err(AnyRecvError::TryRecvError)
179 .and_then(|r| match r.downcast()
180 {
181 Ok(r) => Ok(*r),
182 Err(r) =>
183 {
184 let err = Err(AnyRecvError::BufRecvError(r.as_ref().type_id()));
185 self.buf.insert_dyn(r);
186 err
187 }
188 })
189 }
190 }
191
192 #[inline]
195 pub fn recv_nobuf<T: 'static>(&self) -> Result<T, AnyRecvError>
196 {
197 self.rx
198 .recv()
199 .map_err(AnyRecvError::RecvError)
200 .and_then(|r| match r.downcast()
201 {
202 Ok(r) => Ok(*r),
203 Err(r) => Err(AnyRecvError::WrongType(r)),
204 })
205 }
206
207 #[inline]
210 pub fn recv_timeout_nobuf<T: 'static>(&self, timeout: std::time::Duration) -> Result<T, AnyRecvError>
211 {
212 self.rx
213 .recv_timeout(timeout)
214 .map_err(AnyRecvError::RecvTimeoutError)
215 .and_then(|r| match r.downcast()
216 {
217 Ok(r) => Ok(*r),
218 Err(r) => Err(AnyRecvError::WrongType(r)),
219 })
220 }
221
222 #[inline]
225 pub fn try_recv_nobuf<T: 'static>(&self) -> Result<T, AnyRecvError>
226 {
227 self.rx
228 .try_recv()
229 .map_err(AnyRecvError::TryRecvError)
230 .and_then(|r| match r.downcast()
231 {
232 Ok(r) => Ok(*r),
233 Err(r) => Err(AnyRecvError::WrongType(r)),
234 })
235 }
236
237 #[inline]
240 pub fn recv_buf<T: 'static>(&mut self) -> Result<T, AnyRecvError>
241 {
242 match self.buf.remove::<T>()
243 {
244 Some(t) => Ok(t),
245 None => Err(AnyRecvError::EmptyBuffer)
246 }
247 }
248}
249