1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
use async_channel as ac;
use async_channel::Receiver;
use async_channel::Sender;
use async_trait::async_trait;
use crate::err::Error;
use crate::err::Result;
use crate::types::channel::Channel;
#[derive(Debug)]
pub struct AcChannel<T> {
sender: Sender<T>,
receiver: Receiver<T>,
}
#[async_trait]
impl<T: Send> Channel<T> for AcChannel<T>
where T: std::fmt::Debug
{
type Sender = Sender<T>;
type Receiver = Receiver<T>;
fn new() -> Self {
let (tx, rx) = ac::unbounded();
Self {
sender: tx,
receiver: rx,
}
}
fn sender(&self) -> Self::Sender {
self.sender.clone()
}
fn receiver(&self) -> Self::Receiver {
self.receiver.clone()
}
async fn send(sender: &Self::Sender, msg: T) -> Result<()> {
tracing::debug!("channel sending message: {:?}", msg);
match sender.send(msg).await {
Ok(_) => {
tracing::debug!("channel send message success");
Ok(())
}
Err(_) => Err(Error::ChannelSendMessageFailed),
}
}
async fn recv(receiver: &Self::Receiver) -> Result<Option<T>> {
match receiver.recv().await {
Ok(v) => {
tracing::debug!("channel received message: {:?}", v);
Ok(Some(v))
}
Err(_) => Err(Error::ChannelRecvMessageFailed),
}
}
}