use futures_util::StreamExt;
use rs2_stream::queue::Queue;
use tokio::runtime::Runtime;
#[test]
fn test_queue_basic() {
let rt = Runtime::new().unwrap();
rt.block_on(async {
let queue = Queue::<i32>::bounded(3);
assert!(queue.enqueue(1).await.is_ok());
assert!(queue.enqueue(2).await.is_ok());
assert!(queue.enqueue(3).await.is_ok());
let mut dequeue_stream = queue.dequeue();
assert_eq!(dequeue_stream.next().await.unwrap(), 1);
assert_eq!(dequeue_stream.next().await.unwrap(), 2);
assert_eq!(dequeue_stream.next().await.unwrap(), 3);
});
}
#[test]
fn test_queue_try_enqueue() {
let rt = Runtime::new().unwrap();
rt.block_on(async {
let queue = Queue::<i32>::bounded(2);
assert!(queue.try_enqueue(1).await.is_ok());
assert!(queue.try_enqueue(2).await.is_ok());
let result = queue.try_enqueue(3).await;
assert!(result.is_err());
if let Err(e) = result {
println!("Got expected error when queue full: {:?}", e);
}
});
}
#[test]
fn test_queue_close() {
let rt = Runtime::new().unwrap();
rt.block_on(async {
let queue = Queue::<i32>::bounded(3);
assert!(queue.enqueue(1).await.is_ok());
assert!(queue.enqueue(2).await.is_ok());
queue.close().await;
let result = queue.enqueue(3).await;
assert!(result.is_err());
if let Err(e) = result {
println!("Got expected error when queue closed: {:?}", e);
}
let mut dequeue_stream = queue.dequeue();
assert_eq!(dequeue_stream.next().await.unwrap(), 1);
assert_eq!(dequeue_stream.next().await.unwrap(), 2);
});
}
#[test]
fn test_queue_unbounded() {
let rt = Runtime::new().unwrap();
rt.block_on(async {
let queue = Queue::<i32>::unbounded();
for i in 0..100 {
assert!(queue.enqueue(i).await.is_ok());
}
let mut dequeue_stream = queue.dequeue();
for i in 0..100 {
assert_eq!(dequeue_stream.next().await.unwrap(), i);
}
});
}
#[test]
fn test_queue_try_enqueue_unbounded() {
let rt = Runtime::new().unwrap();
rt.block_on(async {
let queue = Queue::<i32>::unbounded();
for i in 0..50 {
assert!(queue.try_enqueue(i).await.is_ok());
}
let mut dequeue_stream = queue.dequeue();
for i in 0..50 {
assert_eq!(dequeue_stream.next().await.unwrap(), i);
}
});
}
#[test]
fn test_queue_close_unbounded() {
let rt = Runtime::new().unwrap();
rt.block_on(async {
let queue = Queue::<i32>::unbounded();
assert!(queue.enqueue(1).await.is_ok());
assert!(queue.enqueue(2).await.is_ok());
queue.close().await;
let result = queue.enqueue(3).await;
assert!(result.is_err());
let result = queue.try_enqueue(4).await;
assert!(result.is_err());
});
}
#[test]
fn test_queue_capacity() {
let rt = Runtime::new().unwrap();
rt.block_on(async {
let queue = Queue::<i32>::bounded(2);
if let Some(cap) = queue.capacity() {
assert_eq!(cap, 2);
}
let unbounded_queue = Queue::<i32>::unbounded();
assert_eq!(unbounded_queue.capacity(), None);
});
}
#[test]
fn test_queue_is_empty() {
let rt = Runtime::new().unwrap();
rt.block_on(async {
let queue = Queue::<i32>::bounded(3);
assert!(queue.is_empty().await);
queue.enqueue(1).await.unwrap();
assert!(!queue.is_empty().await);
let mut dequeue_stream = queue.dequeue();
dequeue_stream.next().await.unwrap();
assert!(queue.is_empty().await);
});
}
#[test]
fn test_queue_len() {
let rt = Runtime::new().unwrap();
rt.block_on(async {
let queue = Queue::<i32>::bounded(5);
assert_eq!(queue.len().await, 0);
queue.enqueue(1).await.unwrap();
queue.enqueue(2).await.unwrap();
assert_eq!(queue.len().await, 2);
let mut dequeue_stream = queue.dequeue();
dequeue_stream.next().await.unwrap();
assert_eq!(queue.len().await, 1);
dequeue_stream.next().await.unwrap();
assert_eq!(queue.len().await, 0);
});
}
#[test]
fn test_queue_dequeue_empty() {
let rt = Runtime::new().unwrap();
rt.block_on(async {
let queue = Queue::<i32>::bounded(3);
let mut dequeue_stream = queue.dequeue();
let result =
tokio::time::timeout(std::time::Duration::from_millis(100), dequeue_stream.next())
.await;
assert!(
result.is_err(),
"dequeue() should block/timeout on empty queue"
);
});
}
#[test]
fn test_queue_stream_multiple_items() {
let rt = Runtime::new().unwrap();
rt.block_on(async {
let queue = Queue::<i32>::bounded(5);
for i in 1..=5 {
queue.enqueue(i).await.unwrap();
}
let items: Vec<i32> = queue.dequeue().take(5).collect().await;
assert_eq!(items, vec![1, 2, 3, 4, 5]);
});
}
#[test]
fn test_queue_concurrent_access() {
let rt = Runtime::new().unwrap();
rt.block_on(async {
let queue = Queue::<i32>::bounded(10);
let queue_clone = queue.clone();
let enqueue_task = tokio::spawn(async move {
for i in 1..=10 {
queue_clone.enqueue(i).await.unwrap();
tokio::time::sleep(std::time::Duration::from_millis(1)).await;
}
});
let mut dequeue_stream = queue.dequeue();
let mut collected = Vec::new();
for _ in 0..10 {
if let Some(item) = dequeue_stream.next().await {
collected.push(item);
}
}
enqueue_task.await.unwrap();
assert_eq!(collected.len(), 10);
assert_eq!(collected, (1..=10).collect::<Vec<i32>>());
});
}