use amaters_sdk_rust::{QueryStream, Row, StreamConfig, streaming::spawn_stub_producer};
use futures::StreamExt;
use std::sync::Arc;
use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
use tokio::time::{Duration, sleep, timeout};
#[tokio::test]
async fn test_stream_results() {
    const COLLECTION: &str = "users";

    // Build a stream fed by the stub producer: 128 rows for `users`, no timeout.
    let config = StreamConfig::default();
    let (stream, sender) = QueryStream::new(&config);
    let _handle = spawn_stub_producer(COLLECTION.to_string(), 128, sender, None);

    // Pull only the first five rows off the stream.
    let items: Vec<_> = stream.take(5).collect().await;
    assert_eq!(items.len(), 5, "expected exactly 5 items");

    // Each row's value encodes its sequence number as little-endian u64 bytes
    // (see the byte extraction below); verify the ordering survived transit.
    for (i, item) in items.iter().enumerate() {
        assert!(item.is_ok(), "item {i} should be Ok");
        let row = item.as_ref().expect("row should be Ok");
        let bytes: [u8; 8] = row.value[..8]
            .try_into()
            .expect("value should be 8 bytes");
        let idx = u64::from_le_bytes(bytes);
        assert_eq!(idx, i as u64, "row {i}: unexpected index in value");
    }
}
#[tokio::test]
async fn test_stream_backpressure() {
    // Channel capacity vs. total rows: TOTAL > BUF so the producer is forced
    // to block on a full buffer at least once during the run.
    const BUF: usize = 8;
    const TOTAL: usize = 32;
    let config = StreamConfig::new(BUF);
    let (stream, sender) = QueryStream::new(&config);
    // Shared counter of rows the producer has pushed so far.
    // NOTE(review): assumes `sender.sent` is incremented around each send;
    // the exact increment point (before vs. after enqueue) is what justifies
    // the "+1" slack in the invariant below — confirm against the SDK.
    let producer_sent = Arc::clone(&sender.sent);
    let produce_handle = tokio::spawn(async move {
        for i in 0..TOTAL {
            // Value carries the row index as little-endian u64 bytes.
            let row = Row::new(
                format!("key:{i}").into_bytes(),
                (i as u64).to_le_bytes().to_vec(),
            );
            // send_row returning false signals the consumer side is gone;
            // stop producing instead of erroring out.
            if !sender.send_row(row).await {
                break;
            }
        }
    });
    let mut received = 0usize;
    let mut s = stream;
    while let Some(item) = s.next().await {
        assert!(item.is_ok(), "row should be Ok");
        received += 1;
        // Backpressure invariant: at any observation point the producer may be
        // at most one full buffer (plus one possibly in-flight row) ahead of
        // what the consumer has taken off the stream.
        let sent = producer_sent.load(Ordering::Relaxed);
        assert!(
            sent <= received + BUF + 1,
            "backpressure violated: sent={sent}, received={received}, buffer={BUF}"
        );
        // Slow the consumer slightly so the producer actually runs into the
        // buffer limit rather than finishing before we can observe it.
        sleep(Duration::from_millis(1)).await;
    }
    produce_handle.await.expect("producer task should finish");
    assert_eq!(received, TOTAL, "consumer should receive all {TOTAL} rows");
}
#[tokio::test]
async fn test_stream_cancellation() {
    const MANY: usize = 100_000;

    // Tiny buffer so the producer blocks on backpressure almost immediately.
    let config = StreamConfig::new(4);
    let (stream, sender) = QueryStream::new(&config);

    // Flag flipped once the producer task has fully wound down.
    let finished = Arc::new(AtomicBool::new(false));
    let done_flag = Arc::clone(&finished);
    tokio::spawn(async move {
        spawn_stub_producer("cancel_test".to_string(), MANY, sender, None)
            .await
            .ok();
        done_flag.store(true, Ordering::Release);
    });

    // Consume a couple of rows, then drop the stream mid-flight to cancel.
    let mut consumer = stream;
    let _ = consumer.next().await;
    let _ = consumer.next().await;
    drop(consumer);

    // Poll until the producer notices the closed channel, bounded by 1 second.
    let wait = timeout(Duration::from_secs(1), async {
        while !finished.load(Ordering::Acquire) {
            sleep(Duration::from_millis(5)).await;
        }
    })
    .await;
    assert!(
        wait.is_ok(),
        "producer task did not terminate within 1 second after stream was dropped"
    );
}
#[tokio::test]
async fn test_stream_config_timeout() {
    // Number of rows the stub producer is asked to emit.
    const TOTAL: usize = 128;

    // Configure a 2-second producer-side timeout; the stub should finish
    // well inside it, so every row must arrive as Ok.
    let config = StreamConfig::new(8).with_timeout(2);
    let timeout_secs = config.timeout_secs;
    let (stream, sender) = QueryStream::new(&config);
    let _handle = spawn_stub_producer("data".to_string(), TOTAL, sender, timeout_secs);

    let rows: Vec<_> = stream.collect().await;

    // Fix: the previous version only checked `is_ok()` inside a loop, which
    // passes vacuously if the stream yields nothing — pin the count as well.
    assert_eq!(
        rows.len(),
        TOTAL,
        "expected all {TOTAL} rows before the timeout elapsed"
    );
    for (i, row) in rows.iter().enumerate() {
        assert!(row.is_ok(), "row {i} should be Ok within the timeout");
    }
}
#[tokio::test]
async fn test_stream_query_row_key_prefix() {
    // How many of the 16 produced rows we sample for the prefix check.
    const TAKE: usize = 3;
    let collection = "inventory";

    let config = StreamConfig::new(16);
    let (stream, sender) = QueryStream::new(&config);
    let _handle = spawn_stub_producer(collection.to_string(), 16, sender, None);

    let rows: Vec<_> = stream.take(TAKE).collect().await;

    // Fix: the previous version iterated over whatever arrived, so an empty
    // stream passed the test without checking anything — require the sample.
    assert_eq!(rows.len(), TAKE, "expected exactly {TAKE} rows");

    for item in &rows {
        let row = item.as_ref().expect("row should be Ok");
        // Keys are raw bytes; lossily decode for the textual prefix check.
        let key_str = String::from_utf8_lossy(&row.key);
        assert!(
            key_str.starts_with(collection),
            "key '{key_str}' should start with collection '{collection}'"
        );
    }
}