#![allow(clippy::doc_markdown, clippy::print_stdout, clippy::print_stderr)]
mod common;
use common::connect_test_client;
use fraiseql_wire::stream::StreamState;
use futures::StreamExt;
use std::time::Duration;
use tokio::time::sleep;
#[tokio::test]
async fn test_pause_idempotent() {
let client = connect_test_client().await.expect("connect");
let mut stream = client
.query::<serde_json::Value>("test.v_project")
.execute()
.await
.expect("execute");
stream.pause().await.expect("first pause");
stream.pause().await.expect("second pause (idempotent)");
}
#[tokio::test]
async fn test_resume_idempotent() {
let client = connect_test_client().await.expect("connect");
let mut stream = client
.query::<serde_json::Value>("test.v_project")
.execute()
.await
.expect("execute");
stream.resume().await.expect("resume before pause");
stream.resume().await.expect("second resume");
}
#[tokio::test]
async fn test_pause_stops_reading() {
let client = connect_test_client().await.expect("connect");
let mut stream = client
.query::<serde_json::Value>("test.v_task")
.execute()
.await
.expect("execute");
let mut count = 0;
while let Some(result) = stream.next().await {
let _ = result.expect("parse row");
count += 1;
if count >= 5 {
break;
}
}
let stats_before = stream.stats();
let buffered_before = stats_before.items_buffered;
stream.pause().await.expect("pause");
sleep(Duration::from_millis(100)).await;
let stats_after = stream.stats();
let buffered_after = stats_after.items_buffered;
assert!(
buffered_after <= buffered_before + 1,
"Buffered count should not increase after pause: before={}, after={}",
buffered_before,
buffered_after
);
}
#[tokio::test]
async fn test_resume_continues() {
let client = connect_test_client().await.expect("connect");
let mut stream = client
.query::<serde_json::Value>("test.v_task")
.execute()
.await
.expect("execute");
let mut count_before_pause = 0;
while let Some(result) = stream.next().await {
let _ = result.expect("parse row");
count_before_pause += 1;
if count_before_pause >= 3 {
break;
}
}
stream.pause().await.expect("pause");
sleep(Duration::from_millis(50)).await;
stream.resume().await.expect("resume");
let mut count_after_resume = 0;
while let Some(result) = stream.next().await {
let _ = result.expect("parse row");
count_after_resume += 1;
if count_after_resume >= 3 {
break;
}
}
println!("Collected {} items after resume", count_after_resume);
}
#[tokio::test]
async fn test_pause_on_completed_is_idempotent() {
let client = connect_test_client().await.expect("connect");
let mut stream = client
.query::<serde_json::Value>("test.v_project")
.where_sql("FALSE") .execute()
.await
.expect("execute");
while let Some(result) = stream.next().await {
let _ = result.expect("parse row");
}
let result = stream.pause().await;
assert!(
result.is_ok(),
"Pause is idempotent in current implementation"
);
}
#[tokio::test]
async fn test_resume_on_completed_is_idempotent() {
let client = connect_test_client().await.expect("connect");
let mut stream = client
.query::<serde_json::Value>("test.v_project")
.where_sql("FALSE") .execute()
.await
.expect("execute");
while let Some(result) = stream.next().await {
let _ = result.expect("parse row");
}
let result = stream.resume().await;
assert!(
result.is_ok(),
"Resume is idempotent in current implementation"
);
}
#[tokio::test]
async fn test_drop_while_paused_cleanup() {
let client = connect_test_client().await.expect("connect");
let mut stream = client
.query::<serde_json::Value>("test.v_project")
.execute()
.await
.expect("execute");
let mut count = 0;
while let Some(result) = stream.next().await {
let _ = result.expect("parse row");
count += 1;
if count >= 2 {
break;
}
}
stream.pause().await.expect("pause");
drop(stream);
}
#[tokio::test]
async fn test_pause_with_adaptive_chunking() {
let client = connect_test_client().await.expect("connect");
let mut stream = client
.query::<serde_json::Value>("test.v_task")
.adaptive_chunking(true)
.execute()
.await
.expect("execute");
let mut count = 0;
while let Some(result) = stream.next().await {
let _ = result.expect("parse row");
count += 1;
if count >= 5 {
break;
}
}
stream.pause().await.expect("pause");
stream.resume().await.expect("resume");
while let Some(result) = stream.next().await {
let _ = result.expect("parse row");
count += 1;
if count >= 10 {
break;
}
}
println!(
"Collected {} items across pause/resume with adaptive chunking",
count
);
}
#[tokio::test]
async fn test_state_snapshot() {
let client = connect_test_client().await.expect("connect");
let mut stream = client
.query::<serde_json::Value>("test.v_project")
.execute()
.await
.expect("execute");
let initial_state = stream.state_snapshot();
assert_eq!(
initial_state,
StreamState::Running,
"Initial state should be Running"
);
stream.pause().await.expect("pause");
let paused_state = stream.state_snapshot();
assert_eq!(
paused_state,
StreamState::Paused,
"State should be Paused after pause()"
);
stream.resume().await.expect("resume");
let resumed_state = stream.state_snapshot();
assert_eq!(
resumed_state,
StreamState::Running,
"State should be Running after resume()"
);
}
#[tokio::test]
async fn test_state_snapshot_completed() {
let client = connect_test_client().await.expect("connect");
let mut stream = client
.query::<serde_json::Value>("test.v_project")
.where_sql("FALSE") .execute()
.await
.expect("execute");
assert_eq!(
stream.state_snapshot(),
StreamState::Running,
"Initial state should be Running"
);
while let Some(result) = stream.next().await {
let _ = result.expect("parse row");
}
assert_eq!(
stream.state_snapshot(),
StreamState::Completed,
"State should be Completed after stream is exhausted"
);
}