// fraiseql-wire 2.2.1
//
// Streaming JSON query engine for Postgres 17
// Documentation
#![allow(clippy::unwrap_used)] // Reason: test code, panics acceptable
#![allow(clippy::unreadable_literal)] // Reason: test byte-count thresholds use large literals
#![allow(clippy::items_after_statements)] // Reason: test helper closures defined near use site

//! Load testing suite for fraiseql-wire
//!
//! Tests throughput, memory stability, and performance under sustained load.
//! Uses testcontainers to automatically spin up PostgreSQL with test data.

mod common;

use common::connect_test_client;
use futures::stream::StreamExt;
use std::time::Instant;

/// Streams a moderate-sized view end to end and checks basic throughput.
#[tokio::test]
async fn test_load_moderate_volume() {
    println!("Test: Moderate data volume streaming");

    // Connection failure is an environment problem, not a code regression.
    let db = connect_test_client()
        .await
        .expect("failed to connect to test database");

    let started = Instant::now();
    let mut rows = db
        .query::<serde_json::Value>("test.v_project")
        .execute()
        .await
        .expect("failed to execute query");

    // Drain the stream; every row must deserialize cleanly.
    let mut received = 0;
    while let Some(row) = rows.next().await {
        row.expect("failed to deserialize row");
        received += 1;
    }

    let took = started.elapsed();
    let rows_per_sec = f64::from(received) / took.as_secs_f64();

    println!("  Rows: {}", received);
    println!("  Time: {:?}", took);
    println!("  Throughput: {:.0} rows/sec", rows_per_sec);

    assert!(received > 0, "should have received rows");
    assert!(rows_per_sec > 0.0, "throughput should be positive");
}

/// Streams a larger view while overriding the default chunk size.
#[tokio::test]
async fn test_load_large_volume_custom_chunk() {
    println!("Test: Large volume with custom chunk size");

    let db = connect_test_client()
        .await
        .expect("failed to connect to test database");

    let started = Instant::now();
    let mut rows = db
        .query::<serde_json::Value>("test.v_task")
        .chunk_size(512) // Larger chunk for more rows per batch
        .execute()
        .await
        .expect("failed to execute query");

    // Count rows as they arrive; deserialization must succeed for each.
    let mut received = 0;
    while let Some(row) = rows.next().await {
        row.expect("failed to deserialize row");
        received += 1;
    }

    let took = started.elapsed();
    let rows_per_sec = f64::from(received) / took.as_secs_f64();

    println!("  Rows: {}", received);
    println!("  Chunk size: 512");
    println!("  Time: {:?}", took);
    println!("  Throughput: {:.0} rows/sec", rows_per_sec);

    assert!(received > 0, "should have received rows");
}

/// Streams with a server-side WHERE predicate; the filter may reduce the
/// row count all the way to zero, so no count assertion is made here.
#[tokio::test]
async fn test_load_with_sql_predicate() {
    println!("Test: With SQL predicate filtering");

    let db = connect_test_client()
        .await
        .expect("failed to connect to test database");

    let started = Instant::now();
    let mut rows = db
        .query::<serde_json::Value>("test.v_project")
        .where_sql("data->>'status' = 'active'")
        .execute()
        .await
        .expect("failed to execute query");

    let mut received = 0;
    while let Some(row) = rows.next().await {
        row.expect("failed to deserialize row");
        received += 1;
    }

    let took = started.elapsed();
    let rows_per_sec = f64::from(received) / took.as_secs_f64();

    println!("  Rows: {}", received);
    println!("  Time: {:?}", took);
    println!("  Throughput: {:.0} rows/sec", rows_per_sec);
    println!("  (Predicate filtering should reduce row count)");
}

/// Streams with a client-side (Rust closure) predicate applied per row.
#[tokio::test]
async fn test_load_with_rust_predicate() {
    println!("Test: With Rust predicate filtering");

    let db = connect_test_client()
        .await
        .expect("failed to connect to test database");

    let started = Instant::now();
    let mut rows = db
        .query::<serde_json::Value>("test.v_user")
        // Only accept users with profile info
        .where_rust(|doc| doc.get("profile").is_some())
        .execute()
        .await
        .expect("failed to execute query");

    let mut received = 0;
    while let Some(row) = rows.next().await {
        row.expect("failed to deserialize row");
        received += 1;
    }

    let took = started.elapsed();
    let rows_per_sec = f64::from(received) / took.as_secs_f64();

    println!("  Rows: {}", received);
    println!("  Time: {:?}", took);
    println!("  Throughput: {:.0} rows/sec", rows_per_sec);
}

/// Streams a view of large JSON documents with a small chunk size and
/// reports aggregate/average serialized sizes.
#[tokio::test]
async fn test_load_large_json_objects() {
    println!("Test: Large JSON objects");

    let db = connect_test_client()
        .await
        .expect("failed to connect to test database");

    let started = Instant::now();
    let mut docs = db
        .query::<serde_json::Value>("test.v_document")
        .chunk_size(32) // Small chunks for large objects
        .execute()
        .await
        .expect("failed to execute query");

    let mut seen: usize = 0;
    let mut bytes_total: usize = 0;

    // Size is measured from the re-serialized JSON text of each document.
    while let Some(row) = docs.next().await {
        let doc = row.expect("failed to deserialize row");
        bytes_total += doc.to_string().len();
        seen += 1;
    }

    let took = started.elapsed();
    // checked_div guards the (theoretical) zero-row case.
    let mean_size = bytes_total.checked_div(seen).unwrap_or(0);

    println!("  Rows: {}", seen);
    println!("  Total size: {} bytes", bytes_total);
    println!("  Average size: {} bytes", mean_size);
    println!("  Time: {:?}", took);

    assert!(seen > 0, "should have received at least one large object");
}

/// Streams with a server-side ORDER BY and verifies the rows arrive in
/// non-decreasing order of the 'name' field.
///
/// NOTE(review): the check compares strings with Rust's byte-wise ordering,
/// while Postgres sorts using its collation — assumes the test data is
/// plain ASCII so the two agree; confirm if non-ASCII names are added.
#[tokio::test]
async fn test_load_with_order_by() {
    println!("Test: With ORDER BY clause");

    let db = connect_test_client()
        .await
        .expect("failed to connect to test database");

    let started = Instant::now();
    let mut rows = db
        .query::<serde_json::Value>("test.v_project")
        .order_by("data->>'name' ASC")
        .execute()
        .await
        .expect("failed to execute query");

    let mut received = 0;
    let mut last_name: Option<String> = None;

    while let Some(row) = rows.next().await {
        let doc = row.expect("failed to deserialize row");
        // Missing or non-string 'name' falls back to a sentinel.
        let current = match doc.get("name").and_then(serde_json::Value::as_str) {
            Some(s) => s.to_string(),
            None => "unknown".to_string(),
        };

        // Each name must be >= the one before it.
        if let Some(previous) = &last_name {
            assert!(previous <= &current, "order violation: {} > {}", previous, current);
        }

        last_name = Some(current);
        received += 1;
    }

    let took = started.elapsed();

    println!("  Rows: {}", received);
    println!("  Time: {:?}", took);
    println!("  Ordering verified: ✓");
}

/// Opens several connections one after another, streaming the same view on
/// each, to approximate a repeated-client workload.
#[tokio::test]
async fn test_load_multiple_sequential_connections() {
    println!("Test: Multiple sequential connections");

    let num_connections = 5;
    let mut total_rows = 0;

    for i in 0..num_connections {
        // Each iteration gets a fresh client/connection.
        let db = connect_test_client().await.expect("failed to connect");

        let started = Instant::now();
        let mut rows = db
            .query::<serde_json::Value>("test.v_project")
            .execute()
            .await
            .expect("failed to execute");

        let mut received = 0;
        while let Some(row) = rows.next().await {
            row.expect("failed to deserialize");
            received += 1;
        }

        let took = started.elapsed();

        total_rows += received;
        println!("    Connection {}: {} rows in {:?}", i, received, took);

        assert!(received > 0, "connection {} should have received rows", i);
    }

    println!("  Total connections: {}", num_connections);
    println!("  Total rows: {}", total_rows);
    println!("  Sequential streaming: ✓");
}

/// Streams a full result set and reports duration/throughput; intended as a
/// smoke check for stability over a sustained stream.
#[tokio::test]
async fn test_load_sustained_streaming() {
    println!("Test: Sustained streaming (duration test)");

    let db = connect_test_client()
        .await
        .expect("failed to connect to test database");

    // For test data with only a few rows, we just verify we can stream all of them
    let started = Instant::now();

    let mut rows = db
        .query::<serde_json::Value>("test.v_project")
        .execute()
        .await
        .expect("failed to execute query");

    let mut received = 0;

    while let Some(row) = rows.next().await {
        row.expect("failed to deserialize row");
        received += 1;
    }

    let took = started.elapsed();
    let rows_per_sec = f64::from(received) / took.as_secs_f64();

    println!("  Duration: {:?}", took);
    println!("  Rows: {}", received);
    println!("  Throughput: {:.0} rows/sec", rows_per_sec);
    println!("  Sustained streaming: ✓");
}

/// Runs the same streaming query with several chunk sizes and prints the
/// observed throughput for each, to help pick an optimal batch size.
///
/// Fix: the candidate sizes were held in a `vec![…]` that was immediately
/// consumed — a needless heap allocation (clippy `useless_vec`); a plain
/// array is sufficient and iterates by value the same way.
#[tokio::test]
async fn test_load_chunk_size_comparison() {
    println!("Test: Chunk size performance comparison");

    // Array instead of vec!: no allocation, identical iteration.
    let chunk_sizes = [16, 32, 64, 128, 256, 512];

    for chunk_size in chunk_sizes {
        // Fresh connection per configuration so runs don't interfere.
        let client = connect_test_client().await.expect("failed to connect");

        let start = Instant::now();
        let mut stream = client
            .query::<serde_json::Value>("test.v_project")
            .chunk_size(chunk_size)
            .execute()
            .await
            .expect("failed to execute");

        let mut count = 0;
        while let Some(result) = stream.next().await {
            let _value = result.expect("failed to deserialize");
            count += 1;
        }

        let elapsed = start.elapsed();
        let throughput = f64::from(count) / elapsed.as_secs_f64();

        println!(
            "  Chunk {}: {:.0} rows/sec ({} rows in {:?})",
            chunk_size, throughput, count, elapsed
        );
    }
}

/// Consumes only part of a stream, drops it early, and then verifies that a
/// brand-new connection still works — i.e. early cancellation does not wedge
/// anything.
#[tokio::test]
async fn test_load_partial_stream_drop() {
    println!("Test: Partial stream consumption and drop");

    let db = connect_test_client()
        .await
        .expect("failed to connect to test database");

    let mut rows = db
        .query::<serde_json::Value>("test.v_project")
        .execute()
        .await
        .expect("failed to execute query");

    const LIMIT: usize = 2; // Only consume first 2 rows
    let mut consumed = 0;

    // Pull rows until the cap is hit, then abandon the stream.
    while consumed < LIMIT {
        match rows.next().await {
            Some(row) => {
                row.expect("failed to deserialize row");
                consumed += 1;
            }
            None => break,
        }
    }

    println!("  Consumed: {} rows", consumed);
    println!("  Stream dropped early: ✓");
    // If we get here without panicking, cancellation worked

    // Now verify we can make another connection
    let db_again = connect_test_client()
        .await
        .expect("should be able to reconnect");

    let mut rows_again = db_again
        .query::<serde_json::Value>("test.v_project")
        .execute()
        .await
        .expect("failed to execute second query");

    // One successful row from the second connection is enough.
    let mut fetched = 0;
    if let Some(first) = rows_again.next().await {
        first.expect("failed to deserialize row");
        fetched += 1;
    }

    println!("  Reconnection: ✓");
    assert!(fetched > 0, "second connection should work");
}