use std::sync::Arc;
use tokio::sync::broadcast;
use crate::{ActflowError, Result};
#[derive(Clone)]
pub struct Queue<T> {
receiver: Arc<flume::Receiver<T>>,
sender: Arc<flume::Sender<T>>,
}
#[allow(unused)]
impl<T> Queue<T> {
pub fn new(cap: usize) -> Arc<Self> {
let (tx, rx) = flume::bounded(cap);
Arc::new(Self {
receiver: Arc::new(rx),
sender: Arc::new(tx),
})
}
pub fn next(&self) -> Option<T> {
self.receiver.recv().ok()
}
pub fn send(
&self,
msg: T,
) -> Result<()> {
self.sender.send(msg).map_err(|e| ActflowError::Queue(e.to_string()))
}
pub async fn next_async(&self) -> Option<T> {
self.receiver.recv_async().await.ok()
}
pub async fn send_async(
&self,
msg: T,
) -> Result<()> {
self.sender.send_async(msg).await.map_err(|e| ActflowError::Queue(e.to_string()))
}
}
#[derive(Clone)]
pub struct BroadcastQueue<T> {
sender: Arc<broadcast::Sender<T>>,
}
impl<T: Clone> BroadcastQueue<T> {
pub fn new(cap: usize) -> Arc<Self> {
let (tx, _) = broadcast::channel(cap);
Arc::new(Self {
sender: Arc::new(tx),
})
}
pub fn send(
&self,
msg: T,
) -> Result<()> {
self.sender.send(msg).map_err(|e| ActflowError::Queue(e.to_string()))?;
Ok(())
}
pub fn subscribe(&self) -> broadcast::Receiver<T> {
self.sender.subscribe()
}
}