d3_core/channel/
machine_channel.rs

1use self::{connection::*, receiver::*, sender::*};
2use super::*;
3
/// This is a small wrapper around the crossbeam channel, and it exists
/// for a few reasons. We may want to use a different channel
/// implementation in the future, so we want to encapsulate it.
/// We want to take action when sending to a channel that is full;
/// otherwise we block a thread, which forces another thread to be spawned.
/// We want to enforce that the channels are limited to using
/// only types that derive from MachineImpl.
11
/// Counter that supplies the id given to each channel built by `wrap`.
///
/// It starts at 1 and is advanced by one for every channel created. The id
/// is handy for logging; otherwise it serves no purpose.
pub static CHANNEL_ID: AtomicUsize = AtomicUsize::new(1);
14
15/// Create a channel with a fixed capacity.
16pub fn channel_with_capacity<T>(capacity: usize) -> (Sender<T>, Receiver<T>)
17where
18    T: MachineImpl,
19{
20    let (s, r) = crossbeam::channel::bounded::<T>(capacity);
21    wrap(s, r)
22}
23
24/// Create a channel with an unlimited capacity. This should be
25/// used with caution, as it can cause a panic when sending.
26pub fn channel<T>() -> (Sender<T>, Receiver<T>)
27where
28    T: MachineImpl,
29{
30    let (s, r) = crossbeam::channel::unbounded::<T>();
31    wrap(s, r)
32}
33
34fn wrap<T>(sender: crossbeam::channel::Sender<T>, receiver: crossbeam::channel::Receiver<T>) -> (Sender<T>, Receiver<T>)
35where
36    T: MachineImpl,
37{
38    let channel_id = CHANNEL_ID.fetch_add(1, Ordering::SeqCst);
39    let (sc, rc) = Connection::new();
40    (
41        wrap_sender::<T>(sender, channel_id, sc),
42        wrap_receiver::<T>(receiver, channel_id, rc),
43    )
44}