1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
//! Default channel handle for `node` feature.
use async_channel as ac;
use async_channel::Receiver;
use async_channel::Sender;
use async_trait::async_trait;

use crate::err::Error;
use crate::err::Result;
use crate::types::channel::Channel;

/// Channel combine with async_channel::Sender and async_channel::Receiver.
#[derive(Debug)]
pub struct AcChannel<T> {
    /// async channel send `Message` through sender.
    sender: Sender<T>,
    /// async channel rece `Message` through receiver.
    receiver: Receiver<T>,
}

#[async_trait]
impl<T: Send> Channel<T> for AcChannel<T>
where T: std::fmt::Debug
{
    type Sender = Sender<T>;
    type Receiver = Receiver<T>;

    fn new() -> Self {
        let (tx, rx) = ac::unbounded();
        Self {
            sender: tx,
            receiver: rx,
        }
    }

    fn sender(&self) -> Self::Sender {
        self.sender.clone()
    }

    fn receiver(&self) -> Self::Receiver {
        self.receiver.clone()
    }

    async fn send(sender: &Self::Sender, msg: T) -> Result<()> {
        tracing::debug!("channel sending message: {:?}", msg);
        match sender.send(msg).await {
            Ok(_) => {
                tracing::debug!("channel send message success");
                Ok(())
            }
            Err(_) => Err(Error::ChannelSendMessageFailed),
        }
    }

    async fn recv(receiver: &Self::Receiver) -> Result<Option<T>> {
        match receiver.recv().await {
            Ok(v) => {
                tracing::debug!("channel received message: {:?}", v);
                Ok(Some(v))
            }
            Err(_) => Err(Error::ChannelRecvMessageFailed),
        }
    }
}