#![allow(clippy::unwrap_used)] #![allow(clippy::unreadable_literal)] #![allow(clippy::items_after_statements)]
mod common;
use common::connect_test_client;
use futures::stream::StreamExt;
use std::time::Instant;
#[tokio::test]
async fn test_load_moderate_volume() {
    // Streams every row of test.v_project with default settings and
    // reports throughput; fails if no rows arrive.
    println!("Test: Moderate data volume streaming");

    let client = connect_test_client()
        .await
        .expect("failed to connect to test database");

    let started = Instant::now();
    let mut rows = client
        .query::<serde_json::Value>("test.v_project")
        .execute()
        .await
        .expect("failed to execute query");

    let mut received = 0;
    loop {
        match rows.next().await {
            Some(item) => {
                // Deserialization failure aborts the test immediately.
                let _value = item.expect("failed to deserialize row");
                received += 1;
            }
            None => break,
        }
    }

    let took = started.elapsed();
    let throughput = f64::from(received) / took.as_secs_f64();
    println!(" Rows: {}", received);
    println!(" Time: {:?}", took);
    println!(" Throughput: {:.0} rows/sec", throughput);
    assert!(received > 0, "should have received rows");
    assert!(throughput > 0.0, "throughput should be positive");
}
#[tokio::test]
async fn test_load_large_volume_custom_chunk() {
    // Streams test.v_task with an explicit chunk size of 512 rows and
    // reports throughput; fails if the stream yields nothing.
    println!("Test: Large volume with custom chunk size");

    let client = connect_test_client()
        .await
        .expect("failed to connect to test database");

    let timer = Instant::now();
    let mut stream = client
        .query::<serde_json::Value>("test.v_task")
        .chunk_size(512)
        .execute()
        .await
        .expect("failed to execute query");

    let mut received = 0;
    while let Some(item) = stream.next().await {
        // Each row must deserialize cleanly.
        item.expect("failed to deserialize row");
        received += 1;
    }

    let took = timer.elapsed();
    let rate = f64::from(received) / took.as_secs_f64();
    println!(" Rows: {}", received);
    println!(" Chunk size: 512");
    println!(" Time: {:?}", took);
    println!(" Throughput: {:.0} rows/sec", rate);
    assert!(received > 0, "should have received rows");
}
#[tokio::test]
async fn test_load_with_sql_predicate() {
println!("Test: With SQL predicate filtering");
let client = connect_test_client()
.await
.expect("failed to connect to test database");
let start = Instant::now();
let mut stream = client
.query::<serde_json::Value>("test.v_project")
.where_sql("data->>'status' = 'active'")
.execute()
.await
.expect("failed to execute query");
let mut count = 0;
while let Some(result) = stream.next().await {
let _value = result.expect("failed to deserialize row");
count += 1;
}
let elapsed = start.elapsed();
let throughput = f64::from(count) / elapsed.as_secs_f64();
println!(" Rows: {}", count);
println!(" Time: {:?}", elapsed);
println!(" Throughput: {:.0} rows/sec", throughput);
println!(" (Predicate filtering should reduce row count)");
}
#[tokio::test]
async fn test_load_with_rust_predicate() {
    // Applies a client-side Rust predicate (rows are filtered after
    // arrival) keeping only objects that carry a "profile" key.
    println!("Test: With Rust predicate filtering");

    let client = connect_test_client()
        .await
        .expect("failed to connect to test database");

    let begun = Instant::now();
    let mut matching = client
        .query::<serde_json::Value>("test.v_user")
        .where_rust(|json| json.get("profile").is_some())
        .execute()
        .await
        .expect("failed to execute query");

    let mut seen = 0;
    while let Some(item) = matching.next().await {
        let _value = item.expect("failed to deserialize row");
        seen += 1;
    }

    let took = begun.elapsed();
    let rate = f64::from(seen) / took.as_secs_f64();
    println!(" Rows: {}", seen);
    println!(" Time: {:?}", took);
    println!(" Throughput: {:.0} rows/sec", rate);
}
#[tokio::test]
async fn test_load_large_json_objects() {
    // Streams test.v_document with a small chunk size (32) and tracks the
    // serialized byte size of each row to report an average object size.
    println!("Test: Large JSON objects");

    let client = connect_test_client()
        .await
        .expect("failed to connect to test database");

    let begun = Instant::now();
    let mut docs = client
        .query::<serde_json::Value>("test.v_document")
        .chunk_size(32)
        .execute()
        .await
        .expect("failed to execute query");

    let mut received: usize = 0;
    let mut bytes_total: usize = 0;
    while let Some(item) = docs.next().await {
        let value = item.expect("failed to deserialize row");
        // Re-serializing is only an approximation of wire size.
        bytes_total += value.to_string().len();
        received += 1;
    }

    let took = begun.elapsed();
    // checked_div avoids a divide-by-zero panic when no rows arrived.
    let mean_bytes = bytes_total.checked_div(received).unwrap_or(0);
    println!(" Rows: {}", received);
    println!(" Total size: {} bytes", bytes_total);
    println!(" Average size: {} bytes", mean_bytes);
    println!(" Time: {:?}", took);
    assert!(received > 0, "should have received at least one large object");
}
#[tokio::test]
async fn test_load_with_order_by() {
    // Streams projects ordered by name and verifies the rows arrive in
    // non-decreasing order of their "name" field.
    // NOTE(review): the check uses Rust's byte-wise string ordering; if the
    // database collation differs (locale-aware), this could flag false
    // violations — confirm against the server's collation.
    println!("Test: With ORDER BY clause");

    let client = connect_test_client()
        .await
        .expect("failed to connect to test database");

    let begun = Instant::now();
    let mut ordered = client
        .query::<serde_json::Value>("test.v_project")
        .order_by("data->>'name' ASC")
        .execute()
        .await
        .expect("failed to execute query");

    let mut seen = 0;
    let mut last_name: Option<String> = None;
    while let Some(item) = ordered.next().await {
        let value = item.expect("failed to deserialize row");
        // Rows missing a string "name" sort as the literal "unknown".
        let name = match value.get("name").and_then(serde_json::Value::as_str) {
            Some(s) => s.to_string(),
            None => "unknown".to_string(),
        };
        if let Some(earlier) = last_name.as_deref() {
            assert!(earlier <= name.as_str(), "order violation: {} > {}", earlier, name);
        }
        last_name = Some(name);
        seen += 1;
    }

    let took = begun.elapsed();
    println!(" Rows: {}", seen);
    println!(" Time: {:?}", took);
    println!(" Ordering verified: ✓");
}
#[tokio::test]
async fn test_load_multiple_sequential_connections() {
    // Opens five connections one after another and fully drains a query on
    // each, asserting every connection actually delivers rows.
    println!("Test: Multiple sequential connections");

    let num_connections = 5;
    let mut total_rows = 0;

    for i in 0..num_connections {
        let client = connect_test_client().await.expect("failed to connect");

        let begun = Instant::now();
        let mut stream = client
            .query::<serde_json::Value>("test.v_project")
            .execute()
            .await
            .expect("failed to execute");

        let mut rows_here = 0;
        while let Some(item) = stream.next().await {
            let _value = item.expect("failed to deserialize");
            rows_here += 1;
        }

        let took = begun.elapsed();
        total_rows += rows_here;
        println!(" Connection {}: {} rows in {:?}", i, rows_here, took);
        assert!(rows_here > 0, "connection {} should have received rows", i);
    }

    println!(" Total connections: {}", num_connections);
    println!(" Total rows: {}", total_rows);
    println!(" Sequential streaming: ✓");
}
#[tokio::test]
async fn test_load_sustained_streaming() {
    // Drains a full query and reports how long the stream stayed active;
    // identical in shape to the moderate-volume test but framed as a
    // duration measurement.
    println!("Test: Sustained streaming (duration test)");

    let client = connect_test_client()
        .await
        .expect("failed to connect to test database");

    let clock = Instant::now();
    let mut stream = client
        .query::<serde_json::Value>("test.v_project")
        .execute()
        .await
        .expect("failed to execute query");

    let mut drained = 0;
    while let Some(item) = stream.next().await {
        let _value = item.expect("failed to deserialize row");
        drained += 1;
    }

    let duration = clock.elapsed();
    let rate = f64::from(drained) / duration.as_secs_f64();
    println!(" Duration: {:?}", duration);
    println!(" Rows: {}", drained);
    println!(" Throughput: {:.0} rows/sec", rate);
    println!(" Sustained streaming: ✓");
}
#[tokio::test]
async fn test_load_chunk_size_comparison() {
    // Runs the same query with a range of chunk sizes, printing per-size
    // throughput so chunk-size tuning can be eyeballed from the output.
    println!("Test: Chunk size performance comparison");

    for size in [16, 32, 64, 128, 256, 512] {
        // A fresh connection per run keeps measurements independent.
        let client = connect_test_client().await.expect("failed to connect");

        let begun = Instant::now();
        let mut stream = client
            .query::<serde_json::Value>("test.v_project")
            .chunk_size(size)
            .execute()
            .await
            .expect("failed to execute");

        let mut received = 0;
        while let Some(item) = stream.next().await {
            let _value = item.expect("failed to deserialize");
            received += 1;
        }

        let took = begun.elapsed();
        let rate = f64::from(received) / took.as_secs_f64();
        println!(
            " Chunk {}: {:.0} rows/sec ({} rows in {:?})",
            size, rate, received, took
        );
    }
}
#[tokio::test]
async fn test_load_partial_stream_drop() {
    // Consumes only the first two rows of a stream, drops it early, then
    // opens a second connection to prove the server/client recover cleanly.
    println!("Test: Partial stream consumption and drop");

    let client = connect_test_client()
        .await
        .expect("failed to connect to test database");

    let mut stream = client
        .query::<serde_json::Value>("test.v_project")
        .execute()
        .await
        .expect("failed to execute query");

    // Stop after this many rows to force an early drop of the stream.
    const LIMIT: usize = 2;
    let mut taken = 0;
    while let Some(item) = stream.next().await {
        let _value = item.expect("failed to deserialize row");
        taken += 1;
        if taken >= LIMIT {
            break;
        }
    }

    println!(" Consumed: {} rows", taken);
    println!(" Stream dropped early: ✓");

    // A second, fresh connection must still be able to stream.
    let client2 = connect_test_client()
        .await
        .expect("should be able to reconnect");

    let mut stream2 = client2
        .query::<serde_json::Value>("test.v_project")
        .execute()
        .await
        .expect("failed to execute second query");

    let mut taken2 = 0;
    while let Some(item) = stream2.next().await {
        let _value = item.expect("failed to deserialize row");
        taken2 += 1;
        if taken2 >= 1 {
            break;
        }
    }

    println!(" Reconnection: ✓");
    assert!(taken2 > 0, "second connection should work");
}