1#![allow(clippy::match_same_arms)]
10#[cfg(feature = "crossbeam-channel")]
11pub use crossbeam_channel::{SendError, TrySendError, RecvError, TryRecvError};
16#[cfg(feature = "crossbeam-channel")]
17use crossbeam_channel::{
18 Receiver as PlainReceiver,
19 Sender as PlainSender,
20 Sender as PlainSyncSender,
21 bounded as plain_bounded,
22 unbounded as plain_unbounded,
23};
24
25#[cfg(not(feature = "crossbeam-channel"))]
26pub use std::sync::mpsc::{SendError, TrySendError, RecvError, TryRecvError};
31#[cfg(not(feature = "crossbeam-channel"))]
32use std::sync::mpsc::{
33 Receiver as PlainReceiver,
34 Sender as PlainSender,
35 SyncSender as PlainSyncSender,
36 sync_channel as plain_bounded,
37 channel as plain_unbounded,
38};
39
40use std::cmp::Ordering;
41use std::collections::BinaryHeap;
42use std::iter::FusedIterator;
43
44enum SenderKind<T> {
45 Bounded(PlainSyncSender<T>),
46 Unbounded(PlainSender<T>),
47}
48
49pub struct Sender<T> {
53 sender: SenderKind<OrderByKey<T>>,
54}
55
56impl<T> Clone for Sender<T> {
57 #[inline]
58 fn clone(&self) -> Self {
59 Self { sender: match &self.sender {
60 SenderKind::Bounded(s) => SenderKind::Bounded(s.clone()),
61 SenderKind::Unbounded(s) => SenderKind::Unbounded(s.clone()),
62 } }
63 }
64}
65
66pub struct Receiver<T> {
68 receiver: PlainReceiver<OrderByKey<T>>,
69 next_index: usize,
70 receive_buffer: BinaryHeap<OrderByKey<T>>,
71}
72
73impl<T> Receiver<T> {
74 pub fn recv(&mut self) -> Result<T, RecvError> {
79 while self.receive_buffer.peek().map_or(true, |i| i.0 > self.next_index) {
80 match self.receiver.recv() {
81 Ok(OrderByKey(index, item)) if index <= self.next_index => {
82 self.next_index = self.next_index.max(index + 1);
83 return Ok(item);
84 },
85 Ok(queued) => {
86 self.receive_buffer.push(queued);
87 },
88 Err(_) => {
89 break;
91 },
92 }
93 }
94
95 let OrderByKey(index, item) = self.receive_buffer.pop()
96 .ok_or(RecvError)?;
97 self.next_index = self.next_index.max(index + 1);
98 Ok(item)
99 }
100
101 pub fn try_recv(&mut self) -> Result<T, TryRecvError> {
103 while self.receive_buffer.peek().map_or(true, |i| i.0 > self.next_index) {
104 match self.receiver.try_recv() {
105 Ok(OrderByKey(index, item)) if index <= self.next_index => {
106 self.next_index = self.next_index.max(index + 1);
107 return Ok(item);
108 },
109 Ok(queued) => {
110 self.receive_buffer.push(queued);
111 },
112 Err(e @ TryRecvError::Empty) => {
113 return Err(e);
114 },
115 Err(TryRecvError::Disconnected) => {
116 break;
118 },
119 }
120 }
121
122 let OrderByKey(index, item) = self.receive_buffer.pop()
123 .ok_or(TryRecvError::Disconnected)?;
124 self.next_index = self.next_index.max(index + 1);
125 Ok(item)
126 }
127}
128
129#[inline]
133#[must_use]
134pub fn bounded<T>(depth: usize) -> (Sender<T>, Receiver<T>) {
135 let (tx, rx) = plain_bounded(depth);
136 make(SenderKind::Bounded(tx), rx)
137}
138
139#[inline]
143#[must_use]
144pub fn unbounded<T>() -> (Sender<T>, Receiver<T>) {
145 let (tx, rx) = plain_unbounded();
146 make(SenderKind::Unbounded(tx), rx)
147}
148
149#[inline]
150fn make<T>(sender: SenderKind<OrderByKey<T>>, receiver: PlainReceiver<OrderByKey<T>>) -> (Sender<T>, Receiver<T>) {
151 (Sender {
152 sender,
153 }, Receiver {
154 receiver,
155 next_index: 0,
156 receive_buffer: BinaryHeap::new(),
157 })
158}
159
160impl<T: Send> Sender<T> {
161 #[inline]
170 pub fn send(&self, index: usize, item: T) -> Result<(), SendError<T>> {
171 match &self.sender {
172 SenderKind::Bounded(s) => s.send(OrderByKey(index, item)),
173 SenderKind::Unbounded(s) => s.send(OrderByKey(index, item)),
174 }
175 .map_err(|SendError(OrderByKey(_, e))| SendError(e))
176 }
177
178 #[inline]
182 pub fn try_send(&self, index: usize, item: T) -> Result<(), TrySendError<T>> {
183 match &self.sender {
184 SenderKind::Bounded(s) => match s.try_send(OrderByKey(index, item)) {
185 Ok(()) => Ok(()),
186 Err(TrySendError::Full(OrderByKey(_, e))) => Err(TrySendError::Full(e)),
187 Err(TrySendError::Disconnected(OrderByKey(_, e))) => Err(TrySendError::Disconnected(e)),
188 },
189 SenderKind::Unbounded(s) => match s.send(OrderByKey(index, item)) {
190 Ok(()) => Ok(()),
191 Err(SendError(OrderByKey(_, e))) => Err(TrySendError::Disconnected(e)),
192 },
193 }
194 }
195}
196
197impl<T> FusedIterator for Receiver<T> {}
198
199impl<T> Iterator for Receiver<T> {
200 type Item = T;
201
202 #[inline]
203 fn next(&mut self) -> Option<T> {
204 self.recv().ok()
205 }
206}
207
208struct OrderByKey<T>(usize, T);
209impl<T> PartialEq for OrderByKey<T> {
210 #[inline]
211 fn eq(&self, o: &Self) -> bool { o.0.eq(&self.0) }
212}
213impl<T> Eq for OrderByKey<T> {}
214impl<T> PartialOrd for OrderByKey<T> {
215 #[inline]
216 fn partial_cmp(&self, o: &Self) -> Option<Ordering> { Some(self.cmp(o)) }
217}
218impl<T> Ord for OrderByKey<T> {
219 #[inline]
220 fn cmp(&self, o: &Self) -> Ordering { o.0.cmp(&self.0) }
221}
222
223#[test]
224fn test() {
225 let (s, mut r) = bounded(10);
226 s.send(1, "B").unwrap();
227 s.send(0, "A").unwrap();
228 s.send(200, "X").unwrap();
229 s.send(0, "bad A").unwrap();
230 std::thread::spawn(move || {
231 s.send(2, "C").unwrap();
232 });
233 assert_eq!("A", r.recv().unwrap());
234 assert_eq!("B", r.recv().unwrap());
235 assert_eq!("bad A", r.recv().unwrap());
236 assert_eq!("C", r.recv().unwrap());
237 assert_eq!("X", r.recv().unwrap());
238 assert!(r.recv().is_err());
239}
240
241#[test]
242fn test_recovers_order() {
243 let (s, mut r) = unbounded();
244 s.send(1, "B").unwrap();
245 s.send(0, "A").unwrap();
246 assert_eq!("A", r.recv().unwrap());
247 assert_eq!("B", r.recv().unwrap());
248
249 s.send(3, "D").unwrap();
250 s.send(0, "bad A").unwrap();
251 s.send(2, "C").unwrap();
252 assert_eq!("bad A", r.recv().unwrap());
253
254 assert_eq!("C", r.recv().unwrap());
255 drop(s);
256 assert_eq!("D", r.recv().unwrap());
257 assert!(r.recv().is_err());
258}
259
260#[test]
261fn test_recovers_order_buffered() {
262 let (s, mut r) = unbounded();
263 s.send(3, "D").unwrap();
264 s.send(1, "B").unwrap();
265 s.send(4, "E").unwrap();
266 s.send(1, "bad B").unwrap();
267 s.send(0, "A").unwrap();
268 assert_eq!("A", r.recv().unwrap());
269 assert_eq!("B", r.recv().unwrap());
270 assert_eq!("bad B", r.recv().unwrap());
271
272 s.send(2, "C").unwrap();
273 assert_eq!("C", r.recv().unwrap());
274 drop(s);
275 assert_eq!("D", r.recv().unwrap());
276 assert_eq!("E", r.recv().unwrap());
277 assert!(r.recv().is_err());
278}
279
280#[test]
281fn test_try() {
282 let (s, mut r) = bounded(10);
283 s.try_send(1, "B").unwrap();
284 s.try_send(0, "A").unwrap();
285 s.try_send(2, "C").unwrap();
286
287 assert_eq!("A", r.try_recv().unwrap());
288 assert_eq!("B", r.try_recv().unwrap());
289 assert_eq!("C", r.try_recv().unwrap());
290 drop(s);
291 assert!(r.try_recv().is_err());
292 assert!(r.recv().is_err());
293}