clicktype-batch 0.2.0

Async batching system for ClickType with backpressure and metrics
Documentation
//! Load tests for high-volume batch ingestion
//!
//! These tests verify that the batcher can handle production-scale loads:
//! - Insert 1M+ rows without OOM
//! - Memory stays under control during burst scenarios
//! - Throughput is measured and reasonable
//!
//! Run with: cargo test --test load_tests --release -- --nocapture --ignored

use clicktype_batch::{Batcher, BatchConfig};
use clicktype_core::ClickTable;
use clicktype_macros::ClickTable;
use clicktype_transport::ClientBuilder;
use std::time::Instant;
use tokio::time::Duration;

// Row type written by every load test in this file.
//
// The `click_table` attribute names the target table `load_test_events` and declares
// `id` as its primary key. `setup_table` calls `LoadTestEvent::create_table_ddl()` to
// create that table (presumably generated by the `ClickTable` derive — confirm in
// `clicktype_macros`).
#[derive(ClickTable, Debug, Clone)]
#[click_table(name = "load_test_events", primary_key = "id")]
pub struct LoadTestEvent {
    // Row identifier; the tests assign a loop-derived index that is unique within one test.
    pub id: u64,
    // The tests store the same value as `id` here, not a wall-clock time.
    pub timestamp: u64,
    // The tests fill this with the row index modulo a small constant.
    pub user_id: u64,
    // Free-form label such as "normal", "burst_<n>" or "task_<n>".
    pub event_type: String,
    // Numeric payload; the tests derive it from the row index.
    pub value: f64,
}

/// Builds the client used by the load tests from environment variables.
///
/// Connection settings are read at call time so that no credentials or server
/// addresses are committed to source control:
///
/// | Variable                  | Default     |
/// |---------------------------|-------------|
/// | `CLICKTYPE_TEST_HOST`     | `localhost` |
/// | `CLICKTYPE_TEST_PORT`     | `8123`      |
/// | `CLICKTYPE_TEST_USER`     | `default`   |
/// | `CLICKTYPE_TEST_PASSWORD` | *(empty)*   |
/// | `CLICKTYPE_TEST_DATABASE` | `default`   |
///
/// # Panics
///
/// Panics if `CLICKTYPE_TEST_PORT` is set but does not parse as a `u16`, or if
/// building the client fails.
async fn create_test_client() -> clicktype_transport::Client {
    // Use `default` when the variable is unset or is not valid Unicode.
    let env_or =
        |key: &str, default: &str| std::env::var(key).unwrap_or_else(|_| default.to_owned());

    let host = env_or("CLICKTYPE_TEST_HOST", "localhost");
    let port: u16 = env_or("CLICKTYPE_TEST_PORT", "8123")
        .parse()
        .expect("CLICKTYPE_TEST_PORT must be a valid port number");
    let user = env_or("CLICKTYPE_TEST_USER", "default");
    // Never hard-code the password: it must come from the environment.
    let password = env_or("CLICKTYPE_TEST_PASSWORD", "");
    let database = env_or("CLICKTYPE_TEST_DATABASE", "default");

    ClientBuilder::default()
        .host(host.as_str())
        .port(port)
        .user(user.as_str())
        .password(password.as_str())
        .database(database.as_str())
        .build()
        .await
        .expect("Failed to create client")
}

/// Recreates the `load_test_events` table before a test writes to it.
///
/// The result of the initial `DROP TABLE IF EXISTS` is deliberately discarded;
/// a failure to create the table panics.
async fn setup_table(client: &clicktype_transport::Client) {
    // Best-effort removal of a table left behind by an earlier run.
    drop(client.execute("DROP TABLE IF EXISTS load_test_events").await);

    client
        .execute(&LoadTestEvent::create_table_ddl())
        .await
        .expect("Failed to create table");
}

/// Drops the `load_test_events` table once a test is done with it.
///
/// Best-effort: whatever `execute` returns is discarded, so an error result
/// from the drop does not fail the calling test.
async fn cleanup_table(client: &clicktype_transport::Client) {
    const DROP_SQL: &str = "DROP TABLE IF EXISTS load_test_events";
    drop(client.execute(DROP_SQL).await);
}

/// Sends one million rows through a single batcher handle, one `insert` at a time,
/// then checks that the table reports the same number of rows.
///
/// Prints total time, rows/sec and MB/sec. The clock covers inserting, the final
/// flush, closing the handle and waiting for the worker; it stops before the
/// verification query.
#[tokio::test]
#[ignore] // opt-in: run with the `--ignored` flag
async fn load_test_1m_rows() {
    const TOTAL_ROWS: u64 = 1_000_000;
    const PROGRESS_EVERY: u64 = 100_000;

    println!("\n=== Load Test: 1M Rows ===\n");

    let client = create_test_client().await;
    setup_table(&client).await;

    let batcher = Batcher::<LoadTestEvent>::new(
        client.clone(),
        BatchConfig {
            max_rows: 10_000,
            max_buffer_size: 5 * 1024 * 1024, // 5MB
            max_wait: Duration::from_secs(1),
            ..Default::default()
        },
    );
    let (handle, worker) = batcher.spawn();

    let started = Instant::now();

    println!("Inserting {} rows...", TOTAL_ROWS);

    for row in 0..TOTAL_ROWS {
        handle
            .insert(LoadTestEvent {
                id: row,
                timestamp: row,
                user_id: row % 10_000,
                event_type: format!("event_type_{}", row % 10),
                value: (row as f64) * 1.5,
            })
            .await
            .expect("Insert failed");

        // Progress line every 100k rows, skipping row 0.
        if row > 0 && row % PROGRESS_EVERY == 0 {
            println!("  {} rows inserted...", row);
        }
    }

    println!("All rows sent, flushing...");
    let final_stats = handle.flush().await.expect("Flush failed");
    println!(
        "Final flush: {} rows, {} bytes",
        final_stats.rows_flushed, final_stats.bytes_sent
    );

    handle.close().await.expect("Close failed");
    worker.await.expect("Worker panicked");

    let secs = started.elapsed().as_secs_f64();
    let rows_per_sec = TOTAL_ROWS as f64 / secs;
    // NOTE(review): `bytes_sent` comes from the last `flush()` call only. If those stats
    // are per-flush rather than cumulative, this figure under-reports bandwidth — confirm.
    let mb_per_sec = (final_stats.bytes_sent as f64 / 1_024.0 / 1_024.0) / secs;

    println!("\n=== Results ===");
    println!("Total time: {:.2}s", secs);
    println!("Throughput: {:.0} rows/sec", rows_per_sec);
    println!("Bandwidth: {:.2} MB/sec", mb_per_sec);

    // The value returned by `query_check` must equal the number of rows sent.
    let count = client
        .query_check("SELECT * FROM load_test_events")
        .await
        .expect("Query failed");

    assert_eq!(count, TOTAL_ROWS, "Not all rows were inserted");
    println!("✓ All {} rows verified in database", count);

    cleanup_table(&client).await;
    println!("\n✓ Load test completed successfully\n");
}

/// Drives the batcher through three phases — 10k rows, a 100k-row burst, then
/// another 10k rows — with an explicit flush after each phase, and checks that
/// all 120k rows are reported by the table afterwards.
#[tokio::test]
#[ignore]
async fn load_test_burst_scenario() {
    println!("\n=== Load Test: Burst Scenario ===\n");

    let client = create_test_client().await;
    setup_table(&client).await;

    // Small limits, intended to force frequent flushes.
    let batcher = Batcher::<LoadTestEvent>::new(
        client.clone(),
        BatchConfig {
            max_rows: 5_000,
            max_buffer_size: 2 * 1024 * 1024, // 2MB
            max_wait: Duration::from_millis(100),
            buffer_shrink_threshold: 10 * 1024 * 1024, // 10MB
            ..Default::default()
        },
    );
    let (handle, worker) = batcher.spawn();

    // Every phase builds rows the same way; only the label differs.
    let event_at = |i: u64, event_type: String| LoadTestEvent {
        id: i,
        timestamp: i,
        user_id: i % 1000,
        event_type,
        value: i as f64,
    };

    println!("Phase 1: Normal load (10k rows)...");
    for i in 0..10_000u64 {
        handle
            .insert(event_at(i, String::from("normal")))
            .await
            .expect("Insert failed");
    }
    handle.flush().await.expect("Flush failed");
    println!("✓ Phase 1 complete");

    println!("\nPhase 2: Burst (100k rows in rapid succession)...");
    let burst_started = Instant::now();
    for i in 10_000..110_000u64 {
        handle
            .insert(event_at(i, format!("burst_{}", i % 100)))
            .await
            .expect("Insert failed");
    }
    handle.flush().await.expect("Flush failed");
    // Burst timing includes the flush that follows the inserts.
    let burst_secs = burst_started.elapsed().as_secs_f64();
    println!("✓ Phase 2 complete in {:.2}s", burst_secs);

    println!("\nPhase 3: Return to normal (10k rows)...");
    for i in 110_000..120_000u64 {
        handle
            .insert(event_at(i, String::from("normal")))
            .await
            .expect("Insert failed");
    }
    handle.flush().await.expect("Flush failed");
    println!("✓ Phase 3 complete");

    handle.close().await.expect("Close failed");
    worker.await.expect("Worker panicked");

    // 10k + 100k + 10k rows were sent across the three phases.
    let count = client
        .query_check("SELECT * FROM load_test_events")
        .await
        .expect("Query failed");

    assert_eq!(count, 120_000, "Not all rows were inserted");
    println!("\n✓ All {} rows verified", count);
    println!("✓ Memory management survived burst scenario");

    cleanup_table(&client).await;
    println!("\n✓ Burst test completed successfully\n");
}

/// Feeds one batcher from ten concurrently spawned tasks (10k rows each) through
/// cloned handles, then checks that the table reports all 100k rows.
///
/// The reported time and throughput cover spawning the producers until the last
/// one finishes; the final flush, close and worker shutdown are not included.
///
/// # Panics
///
/// Panics if any insert, flush, close or query fails, if a producer task or the
/// worker panics, or if the row count does not match.
#[tokio::test]
#[ignore]
async fn load_test_concurrent_inserts() {
    const NUM_TASKS: u64 = 10;
    const ROWS_PER_TASK: u64 = 10_000;

    println!("\n=== Load Test: Concurrent Inserts ===\n");

    let client = create_test_client().await;
    setup_table(&client).await;

    let config = BatchConfig {
        max_rows: 10_000,
        channel_capacity: 1000,
        ..Default::default()
    };

    let batcher = Batcher::<LoadTestEvent>::new(client.clone(), config);
    let (handle, worker) = batcher.spawn();

    let total_rows = NUM_TASKS * ROWS_PER_TASK;

    println!("Spawning {} concurrent tasks, {} rows each...", NUM_TASKS, ROWS_PER_TASK);

    // Start the clock before spawning: a spawned task may begin running as soon as
    // it is created, so timing from after the spawn loop could miss part of the work.
    let start = Instant::now();

    let tasks: Vec<_> = (0..NUM_TASKS)
        .map(|task_id| {
            let producer = handle.clone();
            tokio::spawn(async move {
                for i in 0..ROWS_PER_TASK {
                    // Each task owns a disjoint id range, so ids never collide.
                    let id = task_id * ROWS_PER_TASK + i;
                    let event = LoadTestEvent {
                        id,
                        timestamp: id,
                        user_id: id % 5000,
                        event_type: format!("task_{}", task_id),
                        value: id as f64,
                    };
                    producer.insert(event).await.expect("Insert failed");
                }
            })
        })
        .collect();

    for task in tasks {
        task.await.expect("Task panicked");
    }
    let elapsed = start.elapsed();

    println!("All tasks completed in {:.2}s", elapsed.as_secs_f64());
    println!("Flushing final batch...");

    handle.flush().await.expect("Flush failed");
    handle.close().await.expect("Close failed");
    worker.await.expect("Worker panicked");

    // The value returned by `query_check` must equal the number of rows sent.
    let count = client
        .query_check("SELECT * FROM load_test_events")
        .await
        .expect("Query failed");

    assert_eq!(count, total_rows, "Not all rows were inserted");
    println!("✓ All {} rows verified from {} concurrent tasks", count, NUM_TASKS);

    let rows_per_sec = total_rows as f64 / elapsed.as_secs_f64();
    println!("✓ Throughput: {:.0} rows/sec", rows_per_sec);

    cleanup_table(&client).await;
    println!("\n✓ Concurrent test completed successfully\n");
}

/// Makes 50k `try_insert` calls against a batcher with a 10-slot channel without
/// awaiting between them, then checks that the table holds exactly as many rows
/// as there were successful `try_insert` calls.
///
/// Any `Err` from `try_insert` is counted as a dropped row, whatever its cause.
#[tokio::test]
#[ignore]
async fn load_test_try_insert_backpressure() {
    const TOTAL_ATTEMPTS: u64 = 50_000;

    println!("\n=== Load Test: Backpressure with try_insert ===\n");

    let client = create_test_client().await;
    setup_table(&client).await;

    // Tiny channel so that `try_insert` is likely to be rejected.
    let batcher = Batcher::<LoadTestEvent>::new(
        client.clone(),
        BatchConfig {
            max_rows: 5_000,
            channel_capacity: 10,
            ..Default::default()
        },
    );
    let (handle, worker) = batcher.spawn();

    let mut successful = 0u64;
    let mut dropped = 0u64;

    println!("Attempting {} rapid inserts with small channel...", TOTAL_ATTEMPTS);

    let events = (0..TOTAL_ATTEMPTS).map(|i| LoadTestEvent {
        id: i,
        timestamp: i,
        user_id: i % 100,
        event_type: "backpressure_test".to_string(),
        value: i as f64,
    });

    for event in events {
        if handle.try_insert(event).is_ok() {
            successful += 1;
        } else {
            dropped += 1;
        }
    }

    let drop_rate = (dropped as f64 / TOTAL_ATTEMPTS as f64) * 100.0;
    println!("Successful: {}", successful);
    println!("Dropped (channel full): {}", dropped);
    println!("Drop rate: {:.2}%", drop_rate);

    handle.flush().await.expect("Flush failed");
    handle.close().await.expect("Close failed");
    worker.await.expect("Worker panicked");

    // Only rows whose `try_insert` succeeded are expected in the table.
    let count = client
        .query_check("SELECT * FROM load_test_events")
        .await
        .expect("Query failed");

    assert_eq!(count, successful, "Inserted count should match successful count");
    println!("✓ Verified {} rows in database (matches successful inserts)", count);
    println!("✓ Backpressure mechanism working correctly");

    cleanup_table(&client).await;
    println!("\n✓ Backpressure test completed successfully\n");
}