kaspa_utils/
channel.rs

1use async_channel::{bounded, unbounded, Receiver, RecvError, SendError, Sender, TryRecvError, TrySendError, WeakReceiver};
2
3/// Multiple producers multiple consumers channel
4#[derive(Clone, Debug)]
5pub struct Channel<T> {
6    sender: Sender<T>,
7    receiver: Receiver<T>,
8}
9
10impl<T> Channel<T> {
11    pub fn new(channel: (Sender<T>, Receiver<T>)) -> Channel<T> {
12        Self { sender: channel.0, receiver: channel.1 }
13    }
14
15    pub fn bounded(capacity: usize) -> Channel<T> {
16        let channel = bounded(capacity);
17        Self { sender: channel.0, receiver: channel.1 }
18    }
19
20    pub fn sender(&self) -> Sender<T> {
21        self.sender.clone()
22    }
23
24    pub fn receiver(&self) -> Receiver<T> {
25        self.receiver.clone()
26    }
27
28    pub fn close(&self) {
29        self.receiver.close();
30    }
31
32    pub fn is_closed(&self) -> bool {
33        self.receiver.is_closed()
34    }
35
36    pub async fn recv(&self) -> Result<T, RecvError> {
37        self.receiver.recv().await
38    }
39
40    pub fn try_recv(&self) -> Result<T, TryRecvError> {
41        self.receiver.try_recv()
42    }
43
44    pub async fn send(&self, msg: T) -> Result<(), SendError<T>> {
45        self.sender.send(msg).await
46    }
47
48    pub fn try_send(&self, msg: T) -> Result<(), TrySendError<T>> {
49        self.sender.try_send(msg)
50    }
51
52    pub fn len(&self) -> usize {
53        self.receiver.len()
54    }
55
56    pub fn is_empty(&self) -> bool {
57        self.receiver.is_empty()
58    }
59
60    pub fn receiver_count(&self) -> usize {
61        self.sender.receiver_count()
62    }
63
64    pub fn sender_count(&self) -> usize {
65        self.sender.sender_count()
66    }
67}
68
69/// Default for a [`Channel<T>`] is unbounded
70impl<T> Default for Channel<T> {
71    fn default() -> Self {
72        let ch = unbounded();
73        Self { sender: ch.0, receiver: ch.1 }
74    }
75}
76
77/// Creates a special `job` channel where the sender might replace a previous pending job
78/// not consumed yet by the receiver. The internal channel has capacity of `1` but senders
79/// can attempt to replace the current `job` via `selector` logic. See [`JobSender::try_send`]
80pub fn job<T>() -> (JobSender<T>, JobReceiver<T>) {
81    let (send, recv) = bounded(1);
82    (JobSender { sender: send, receiver: recv.downgrade() }, recv)
83}
84
85pub type JobReceiver<T> = Receiver<T>;
86
87pub type JobTrySendError<T> = TrySendError<T>;
88
89pub type JobTryRecvError = TryRecvError;
90
91/// The sending side of a [`job`] channel.
92#[derive(Clone)]
93pub struct JobSender<T> {
94    sender: Sender<T>,
95    receiver: WeakReceiver<T>, // Avoid holding a strong receiver so that the channel will close when all actual receivers drop
96}
97
98impl<T> JobSender<T> {
99    /// Attempts to send a message into the job channel. If the channel already contains a message, `selector`
100    /// is applied to choose which one of them remains. Parallel senders might result in undefined message
101    /// selection, the failing sender will receive `TrySendError::Full`.
102    ///
103    /// If the channel is closed, this method returns an error.
104    pub fn try_send<F>(&self, mut msg: T, mut selector: F) -> Result<(), JobTrySendError<T>>
105    where
106        F: FnMut(T, T) -> T,
107    {
108        if let Some(receiver) = self.receiver.upgrade() {
109            while let Ok(prv) = receiver.try_recv() {
110                msg = selector(prv, msg);
111            }
112        }
113        self.sender.try_send(msg)
114    }
115}