use std::sync::Arc;
use tokio::sync::Mutex;
use tokio::sync::mpsc;
const DEFAULT_CAPACITY: usize = 256;
#[derive(Clone)]
pub struct AsyncQueue<T> {
inner: Arc<AsyncQueueInner<T>>,
}
struct AsyncQueueInner<T> {
tx: mpsc::Sender<T>,
rx: Mutex<mpsc::Receiver<T>>,
}
impl<T> Default for AsyncQueue<T> {
fn default() -> Self {
Self::new()
}
}
impl<T> AsyncQueue<T> {
pub fn new() -> Self {
Self::with_capacity(DEFAULT_CAPACITY)
}
pub fn with_capacity(capacity: usize) -> Self {
let (tx, rx) = mpsc::channel(capacity);
AsyncQueue {
inner: Arc::new(AsyncQueueInner {
tx,
rx: Mutex::new(rx),
}),
}
}
pub async fn push(&self, item: T) -> Result<(), T> {
self.inner.tx.send(item).await.map_err(|e| e.0)
}
pub fn try_push(&self, item: T) -> Result<(), T> {
self.inner.tx.try_send(item).map_err(|e| match e {
mpsc::error::TrySendError::Full(item) => item,
mpsc::error::TrySendError::Closed(item) => item,
})
}
pub async fn pop(&self) -> Option<T> {
let mut rx = self.inner.rx.lock().await;
rx.recv().await
}
pub fn close(&self) {
}
pub fn is_closed(&self) -> bool {
self.inner.tx.is_closed()
}
pub fn len(&self) -> usize {
self.inner.tx.max_capacity() - self.inner.tx.capacity()
}
pub fn is_empty(&self) -> bool {
self.len() == 0
}
}
#[cfg(test)]
mod tests {
use super::*;
#[tokio::test]
async fn test_push_pop() {
let queue = AsyncQueue::new();
queue.push(1).await.unwrap();
queue.push(2).await.unwrap();
queue.push(3).await.unwrap();
assert_eq!(queue.pop().await, Some(1));
assert_eq!(queue.pop().await, Some(2));
assert_eq!(queue.pop().await, Some(3));
}
#[tokio::test]
async fn test_try_push() {
let queue: AsyncQueue<i32> = AsyncQueue::with_capacity(2);
assert!(queue.try_push(1).is_ok());
assert!(queue.try_push(2).is_ok());
assert!(queue.try_push(3).is_err());
assert_eq!(queue.pop().await, Some(1));
assert!(queue.try_push(3).is_ok());
}
#[tokio::test]
async fn test_clone_and_push() {
let queue = AsyncQueue::new();
let queue2 = queue.clone();
queue.push(1).await.unwrap();
queue2.push(2).await.unwrap();
assert_eq!(queue.pop().await, Some(1));
assert_eq!(queue.pop().await, Some(2));
}
#[tokio::test]
async fn test_async_wait() {
let queue = AsyncQueue::new();
let queue_clone = queue.clone();
let handle = tokio::spawn(async move {
tokio::time::sleep(tokio::time::Duration::from_millis(50)).await;
queue_clone.push(42).await.unwrap();
});
let item = queue.pop().await;
assert_eq!(item, Some(42));
handle.await.unwrap();
}
}