1use async_channel::{bounded, unbounded, Receiver, RecvError, SendError, Sender, TryRecvError, TrySendError, WeakReceiver};
2
3#[derive(Clone, Debug)]
5pub struct Channel<T> {
6 sender: Sender<T>,
7 receiver: Receiver<T>,
8}
9
10impl<T> Channel<T> {
11 pub fn new(channel: (Sender<T>, Receiver<T>)) -> Channel<T> {
12 Self { sender: channel.0, receiver: channel.1 }
13 }
14
15 pub fn bounded(capacity: usize) -> Channel<T> {
16 let channel = bounded(capacity);
17 Self { sender: channel.0, receiver: channel.1 }
18 }
19
20 pub fn sender(&self) -> Sender<T> {
21 self.sender.clone()
22 }
23
24 pub fn receiver(&self) -> Receiver<T> {
25 self.receiver.clone()
26 }
27
28 pub fn close(&self) {
29 self.receiver.close();
30 }
31
32 pub fn is_closed(&self) -> bool {
33 self.receiver.is_closed()
34 }
35
36 pub async fn recv(&self) -> Result<T, RecvError> {
37 self.receiver.recv().await
38 }
39
40 pub fn try_recv(&self) -> Result<T, TryRecvError> {
41 self.receiver.try_recv()
42 }
43
44 pub async fn send(&self, msg: T) -> Result<(), SendError<T>> {
45 self.sender.send(msg).await
46 }
47
48 pub fn try_send(&self, msg: T) -> Result<(), TrySendError<T>> {
49 self.sender.try_send(msg)
50 }
51
52 pub fn len(&self) -> usize {
53 self.receiver.len()
54 }
55
56 pub fn is_empty(&self) -> bool {
57 self.receiver.is_empty()
58 }
59
60 pub fn receiver_count(&self) -> usize {
61 self.sender.receiver_count()
62 }
63
64 pub fn sender_count(&self) -> usize {
65 self.sender.sender_count()
66 }
67}
68
69impl<T> Default for Channel<T> {
71 fn default() -> Self {
72 let ch = unbounded();
73 Self { sender: ch.0, receiver: ch.1 }
74 }
75}
76
77pub fn job<T>() -> (JobSender<T>, JobReceiver<T>) {
81 let (send, recv) = bounded(1);
82 (JobSender { sender: send, receiver: recv.downgrade() }, recv)
83}
84
/// Receiving half of a job channel; an alias for [`Receiver`].
pub type JobReceiver<T> = Receiver<T>;

/// Error returned by [`JobSender::try_send`]; an alias for [`TrySendError`].
pub type JobTrySendError<T> = TrySendError<T>;

/// Alias for [`TryRecvError`], exposed under a job-channel name.
pub type JobTryRecvError = TryRecvError;
90
91#[derive(Clone)]
93pub struct JobSender<T> {
94 sender: Sender<T>,
95 receiver: WeakReceiver<T>, }
97
98impl<T> JobSender<T> {
99 pub fn try_send<F>(&self, mut msg: T, mut selector: F) -> Result<(), JobTrySendError<T>>
105 where
106 F: FnMut(T, T) -> T,
107 {
108 if let Some(receiver) = self.receiver.upgrade() {
109 while let Ok(prv) = receiver.try_recv() {
110 msg = selector(prv, msg);
111 }
112 }
113 self.sender.try_send(msg)
114 }
115}