ordered_channel/
lib.rs

//! Equivalent of mpsc or crossbeam-channel, but every message is sent with an associated index,
//! and messages are delivered in index order.
//!
//! This enables performing multi-threaded work without accidentally reordering results.
//!
//! Results are delivered as soon as possible, with minimal buffering.
//! Items received out of order are temporarily kept in a binary heap.
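//!
//! # Example
//!
//! A minimal usage sketch (the crate name is assumed to be `ordered_channel`, per the directory above):
//!
//! ```
//! use std::thread;
//!
//! let (sender, receiver) = ordered_channel::unbounded();
//! thread::scope(|scope| {
//!     for (index, word) in ["zero", "one", "two"].into_iter().enumerate() {
//!         let sender = sender.clone();
//!         scope.spawn(move || sender.send(index, word).unwrap());
//!     }
//! });
//! drop(sender);
//! // Delivered in index order, regardless of which thread finished first
//! assert_eq!(receiver.collect::<Vec<_>>(), ["zero", "one", "two"]);
//! ```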

#![allow(clippy::match_same_arms)]
#[cfg(feature = "crossbeam-channel")]
/// Error types
///
/// Use these re-exported aliases, since they change depending on the configured back-end.
pub use crossbeam_channel::{SendError, TrySendError, RecvError, TryRecvError};
#[cfg(feature = "crossbeam-channel")]
use crossbeam_channel::{
    Receiver as PlainReceiver,
    Sender as PlainSender,
    Sender as PlainSyncSender,
    bounded as plain_bounded,
    unbounded as plain_unbounded,
};

#[cfg(not(feature = "crossbeam-channel"))]
/// Error types
///
/// Use these re-exported aliases, since they change depending on the configured back-end.
pub use std::sync::mpsc::{SendError, TrySendError, RecvError, TryRecvError};
#[cfg(not(feature = "crossbeam-channel"))]
use std::sync::mpsc::{
    Receiver as PlainReceiver,
    Sender as PlainSender,
    SyncSender as PlainSyncSender,
    sync_channel as plain_bounded,
    channel as plain_unbounded,
};

use std::cmp::Ordering;
use std::collections::BinaryHeap;
use std::iter::FusedIterator;

enum SenderKind<T> {
    Bounded(PlainSyncSender<T>),
    Unbounded(PlainSender<T>),
}

/// A channel sender that orders messages by an index
///
/// It's cheap to clone
pub struct Sender<T> {
    sender: SenderKind<OrderByKey<T>>,
}

impl<T> Clone for Sender<T> {
    #[inline]
    fn clone(&self) -> Self {
        Self {
            sender: match &self.sender {
                SenderKind::Bounded(s) => SenderKind::Bounded(s.clone()),
                SenderKind::Unbounded(s) => SenderKind::Unbounded(s.clone()),
            },
        }
    }
}

/// Receiver that orders messages by an index
pub struct Receiver<T> {
    receiver: PlainReceiver<OrderByKey<T>>,
    /// Index of the next message to deliver
    next_index: usize,
    /// Out-of-order messages parked until their index is next
    receive_buffer: BinaryHeap<OrderByKey<T>>,
}

impl<T> Receiver<T> {
    /// Gets a message with the next *consecutive* index
    ///
    /// Blocks until the next message in order is received, or until all senders are dropped.
    /// Messages are never lost.
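    ///
    /// A minimal sketch of in-order delivery:
    ///
    /// ```
    /// let (s, mut r) = ordered_channel::unbounded();
    /// s.send(1, "world").unwrap();
    /// s.send(0, "hello").unwrap();
    /// // Index 0 is delivered first, even though it was sent second
    /// assert_eq!("hello", r.recv().unwrap());
    /// assert_eq!("world", r.recv().unwrap());
    /// ```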
    pub fn recv(&mut self) -> Result<T, RecvError> {
        // Pull from the channel until the message with `next_index` arrives,
        // parking any out-of-order messages in the heap.
        while self.receive_buffer.peek().map_or(true, |i| i.0 > self.next_index) {
            match self.receiver.recv() {
                Ok(OrderByKey(index, item)) if index <= self.next_index => {
                    self.next_index = self.next_index.max(index + 1);
                    return Ok(item);
                },
                Ok(queued) => {
                    self.receive_buffer.push(queued);
                },
                Err(_) => {
                    // Sender dropped (but continue to dump receive_buffer)
                    break;
                },
            }
        }

        let OrderByKey(index, item) = self.receive_buffer.pop()
            .ok_or(RecvError)?;
        self.next_index = self.next_index.max(index + 1);
        Ok(item)
    }

    /// Does not block, and returns immediately if there are no messages ready to take
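    ///
    /// A minimal sketch: nothing is ready until the next consecutive index arrives
    ///
    /// ```
    /// let (s, mut r) = ordered_channel::unbounded();
    /// s.send(1, "later").unwrap();
    /// // Index 0 hasn't arrived yet, so nothing can be delivered in order
    /// assert!(r.try_recv().is_err());
    /// s.send(0, "now").unwrap();
    /// assert_eq!("now", r.try_recv().unwrap());
    /// ```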
    pub fn try_recv(&mut self) -> Result<T, TryRecvError> {
        while self.receive_buffer.peek().map_or(true, |i| i.0 > self.next_index) {
            match self.receiver.try_recv() {
                Ok(OrderByKey(index, item)) if index <= self.next_index => {
                    self.next_index = self.next_index.max(index + 1);
                    return Ok(item);
                },
                Ok(queued) => {
                    self.receive_buffer.push(queued);
                },
                Err(e @ TryRecvError::Empty) => {
                    return Err(e);
                },
                Err(TryRecvError::Disconnected) => {
                    // Sender dropped (but continue to dump receive_buffer)
                    break;
                },
            }
        }

        let OrderByKey(index, item) = self.receive_buffer.pop()
            .ok_or(TryRecvError::Disconnected)?;
        self.next_index = self.next_index.max(index + 1);
        Ok(item)
    }
}

/// Make a blocking channel with a finite size
///
/// `send` blocks when the channel is full. Returns `(sender, receiver)`.
#[inline]
#[must_use]
pub fn bounded<T>(depth: usize) -> (Sender<T>, Receiver<T>) {
    let (tx, rx) = plain_bounded(depth);
    make(SenderKind::Bounded(tx), rx)
}

/// Make a channel that can grow until the program runs out of memory
///
/// Returns `(sender, receiver)`
#[inline]
#[must_use]
pub fn unbounded<T>() -> (Sender<T>, Receiver<T>) {
    let (tx, rx) = plain_unbounded();
    make(SenderKind::Unbounded(tx), rx)
}

#[inline]
fn make<T>(sender: SenderKind<OrderByKey<T>>, receiver: PlainReceiver<OrderByKey<T>>) -> (Sender<T>, Receiver<T>) {
    (Sender {
        sender,
    }, Receiver {
        receiver,
        next_index: 0,
        receive_buffer: BinaryHeap::new(),
    })
}

impl<T: Send> Sender<T> {
    /// Sends a message to be delivered in the position given by its index
    ///
    /// It's important that indexes are consecutive and have no holes.
    /// Count starts at 0. Indexes from `iter().enumerate()` will work great.
    ///
    /// If any integer is missing, the receiver will wait for it until
    /// all senders are dropped.
    ///
    /// If any integer is sent more than once, only the first one received
    /// is going to be ordered, and the order of the remaining ones is undefined.
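    ///
    /// A minimal sketch, with indexes from `enumerate()`:
    ///
    /// ```
    /// let (s, r) = ordered_channel::unbounded();
    /// for (index, item) in ["a", "b", "c"].into_iter().enumerate() {
    ///     s.send(index, item).unwrap();
    /// }
    /// drop(s);
    /// assert_eq!(r.collect::<Vec<_>>(), ["a", "b", "c"]);
    /// ```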
    #[inline]
    pub fn send(&self, index: usize, item: T) -> Result<(), SendError<T>> {
        match &self.sender {
            SenderKind::Bounded(s) => s.send(OrderByKey(index, item)),
            SenderKind::Unbounded(s) => s.send(OrderByKey(index, item)),
        }
        .map_err(|SendError(OrderByKey(_, e))| SendError(e))
    }

    /// Does not send if the channel is bounded and full.
    ///
    /// For unbounded channels it's the same as `send()`.
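    ///
    /// A minimal sketch of the full-channel case:
    ///
    /// ```
    /// let (s, mut r) = ordered_channel::bounded(1);
    /// s.try_send(0, "a").unwrap();
    /// // The channel holds at most one message, so this returns `TrySendError::Full`
    /// assert!(s.try_send(1, "b").is_err());
    /// assert_eq!("a", r.try_recv().unwrap());
    /// ```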
    #[inline]
    pub fn try_send(&self, index: usize, item: T) -> Result<(), TrySendError<T>> {
        match &self.sender {
            SenderKind::Bounded(s) => match s.try_send(OrderByKey(index, item)) {
                Ok(()) => Ok(()),
                Err(TrySendError::Full(OrderByKey(_, e))) => Err(TrySendError::Full(e)),
                Err(TrySendError::Disconnected(OrderByKey(_, e))) => Err(TrySendError::Disconnected(e)),
            },
            SenderKind::Unbounded(s) => match s.send(OrderByKey(index, item)) {
                Ok(()) => Ok(()),
                Err(SendError(OrderByKey(_, e))) => Err(TrySendError::Disconnected(e)),
            },
        }
    }
}

// `recv()` keeps failing once disconnected and drained, so the iterator is fused
impl<T> FusedIterator for Receiver<T> {}

impl<T> Iterator for Receiver<T> {
    type Item = T;

    #[inline]
    fn next(&mut self) -> Option<T> {
        self.recv().ok()
    }
}

// Ordered by the index only. The comparison is reversed so that
// std's max-heap `BinaryHeap` yields the smallest index first.
struct OrderByKey<T>(usize, T);
impl<T> PartialEq for OrderByKey<T> {
    #[inline]
    fn eq(&self, o: &Self) -> bool { o.0.eq(&self.0) }
}
impl<T> Eq for OrderByKey<T> {}
impl<T> PartialOrd for OrderByKey<T> {
    #[inline]
    fn partial_cmp(&self, o: &Self) -> Option<Ordering> { Some(self.cmp(o)) }
}
impl<T> Ord for OrderByKey<T> {
    #[inline]
    fn cmp(&self, o: &Self) -> Ordering { o.0.cmp(&self.0) }
}

#[test]
fn test() {
    let (s, mut r) = bounded(10);
    s.send(1, "B").unwrap();
    s.send(0, "A").unwrap();
    s.send(200, "X").unwrap();
    s.send(0, "bad A").unwrap();
    std::thread::spawn(move || {
        s.send(2, "C").unwrap();
    });
    assert_eq!("A", r.recv().unwrap());
    assert_eq!("B", r.recv().unwrap());
    assert_eq!("bad A", r.recv().unwrap());
    assert_eq!("C", r.recv().unwrap());
    assert_eq!("X", r.recv().unwrap());
    assert!(r.recv().is_err());
}

#[test]
fn test_recovers_order() {
    let (s, mut r) = unbounded();
    s.send(1, "B").unwrap();
    s.send(0, "A").unwrap();
    assert_eq!("A", r.recv().unwrap());
    assert_eq!("B", r.recv().unwrap());

    s.send(3, "D").unwrap();
    s.send(0, "bad A").unwrap();
    s.send(2, "C").unwrap();
    assert_eq!("bad A", r.recv().unwrap());

    assert_eq!("C", r.recv().unwrap());
    drop(s);
    assert_eq!("D", r.recv().unwrap());
    assert!(r.recv().is_err());
}

#[test]
fn test_recovers_order_buffered() {
    let (s, mut r) = unbounded();
    s.send(3, "D").unwrap();
    s.send(1, "B").unwrap();
    s.send(4, "E").unwrap();
    s.send(1, "bad B").unwrap();
    s.send(0, "A").unwrap();
    assert_eq!("A", r.recv().unwrap());
    assert_eq!("B", r.recv().unwrap());
    assert_eq!("bad B", r.recv().unwrap());

    s.send(2, "C").unwrap();
    assert_eq!("C", r.recv().unwrap());
    drop(s);
    assert_eq!("D", r.recv().unwrap());
    assert_eq!("E", r.recv().unwrap());
    assert!(r.recv().is_err());
}

#[test]
fn test_try() {
    let (s, mut r) = bounded(10);
    s.try_send(1, "B").unwrap();
    s.try_send(0, "A").unwrap();
    s.try_send(2, "C").unwrap();

    assert_eq!("A", r.try_recv().unwrap());
    assert_eq!("B", r.try_recv().unwrap());
    assert_eq!("C", r.try_recv().unwrap());
    drop(s);
    assert!(r.try_recv().is_err());
    assert!(r.recv().is_err());
}