use std::sync::Arc;
use async_trait::async_trait;
use futures::channel::mpsc;
use futures::lock::Mutex;
use crate::error::Error;
use crate::error::Result;
use crate::types::channel::Channel;
type Sender<T> = Arc<mpsc::UnboundedSender<T>>;
type Receiver<T> = Arc<Mutex<mpsc::UnboundedReceiver<T>>>;
#[derive(Debug)]
pub struct CbChannel<T> {
sender: Sender<T>,
receiver: Receiver<T>,
}
#[async_trait(?Send)]
impl<T: Send> Channel<T> for CbChannel<T> {
type Sender = Sender<T>;
type Receiver = Receiver<T>;
fn new() -> Self {
let (tx, rx) = mpsc::unbounded();
Self {
sender: Arc::new(tx),
receiver: Arc::new(Mutex::new(rx)),
}
}
fn sender(&self) -> Self::Sender {
self.sender.clone()
}
fn receiver(&self) -> Self::Receiver {
self.receiver.clone()
}
async fn send(sender: &Self::Sender, msg: T) -> Result<()> {
match sender.unbounded_send(msg) {
Ok(()) => Ok(()),
Err(_) => Err(Error::ChannelSendMessageFailed),
}
}
async fn recv(receiver: &Self::Receiver) -> Result<Option<T>> {
let mut receiver = receiver.lock().await;
match receiver.try_next() {
Err(_) => Ok(None),
Ok(Some(x)) => Ok(Some(x)),
Ok(None) => Ok(None),
}
}
}