use clicktype_batch::{Batcher, BatchConfig};
use clicktype_core::ClickTable;
use clicktype_macros::ClickTable;
use clicktype_transport::ClientBuilder;
use std::time::Instant;
use tokio::time::Duration;
// Row type inserted by every load test in this file.
// The `click_table` attribute names the target table `load_test_events` and
// designates `id` as its primary key. The `ClickTable` derive presumably
// supplies the table glue the tests rely on (e.g. `create_table_ddl`) — TODO confirm
// against the macro crate.
#[derive(ClickTable, Debug, Clone)]
#[click_table(name = "load_test_events", primary_key = "id")]
pub struct LoadTestEvent {
// Row identifier; the tests assign sequential values.
pub id: u64,
// The tests fill this with the same value as `id`, not a real clock reading.
pub timestamp: u64,
// Set by the tests to `id` modulo a small constant.
pub user_id: u64,
// Free-form label, e.g. "normal" or "burst_<n>" in the tests.
pub event_type: String,
// Numeric payload; the tests derive it from the row id.
pub value: f64,
}
/// Builds the client used by the load tests.
///
/// Connection settings are read from the environment so that no server
/// address or credential has to be committed alongside the tests:
///
/// * `CLICKTYPE_TEST_HOST` (default `localhost`)
/// * `CLICKTYPE_TEST_USER` (default `default`)
/// * `CLICKTYPE_TEST_PASSWORD` (default: empty)
/// * `CLICKTYPE_TEST_DATABASE` (default `default`)
///
/// The port is fixed at 8123.
///
/// NOTE(review): a password used to be hard-coded in this function; it should
/// be treated as leaked and rotated on the server it belonged to.
///
/// # Panics
///
/// Panics with "Failed to create client" if building the client does not succeed.
async fn create_test_client() -> clicktype_transport::Client {
    // A variable that is unset (or not valid Unicode) falls back to the default.
    let env_or = |key: &str, fallback: &str| -> String {
        std::env::var(key).unwrap_or_else(|_| fallback.to_owned())
    };

    let host = env_or("CLICKTYPE_TEST_HOST", "localhost");
    let user = env_or("CLICKTYPE_TEST_USER", "default");
    let password = env_or("CLICKTYPE_TEST_PASSWORD", "");
    let database = env_or("CLICKTYPE_TEST_DATABASE", "default");

    ClientBuilder::default()
        .host(host.as_str())
        .port(8123)
        .user(user.as_str())
        .password(password.as_str())
        .database(database.as_str())
        .build()
        .await
        .expect("Failed to create client")
}
/// Recreates the `load_test_events` table from scratch.
///
/// Any existing table is dropped first (the outcome of that statement is
/// ignored), then the table is created from the DDL returned by
/// `LoadTestEvent::create_table_ddl()`.
///
/// # Panics
///
/// Panics with "Failed to create table" if executing the DDL does not succeed.
async fn setup_table(client: &clicktype_transport::Client) {
    const DROP_STALE_TABLE: &str = "DROP TABLE IF EXISTS load_test_events";

    // Best effort: the result of the drop is deliberately discarded.
    let _ = client.execute(DROP_STALE_TABLE).await;

    let create_statement = LoadTestEvent::create_table_ddl();
    client
        .execute(&create_statement)
        .await
        .expect("Failed to create table");
}
/// Drops the `load_test_events` table once a test is done with it.
///
/// The outcome of the statement is ignored, so this never panics on its own.
async fn cleanup_table(client: &clicktype_transport::Client) {
    const DROP_TABLE: &str = "DROP TABLE IF EXISTS load_test_events";

    // Result intentionally discarded: cleanup is best effort.
    let _ = client.execute(DROP_TABLE).await;
}
/// Load test: pushes one million rows through a `Batcher`, prints timing
/// figures, and checks the value returned by `query_check` against the number
/// of rows sent.
///
/// Ignored by default; it needs the server configured in `create_test_client`.
///
/// # Panics
///
/// Panics if any insert, flush, close, or query does not succeed, if the
/// worker task panicked, or if the verified count differs from the rows sent.
#[tokio::test]
#[ignore]
async fn load_test_1m_rows() {
    println!("\n=== Load Test: 1M Rows ===\n");
    let client = create_test_client().await;
    setup_table(&client).await;

    let config = BatchConfig {
        max_rows: 10_000,
        max_buffer_size: 5 * 1024 * 1024,
        max_wait: Duration::from_secs(1),
        ..Default::default()
    };
    let batcher = Batcher::<LoadTestEvent>::new(client.clone(), config);
    let (handle, worker) = batcher.spawn();

    let total_rows = 1_000_000u64;
    let start = Instant::now();
    println!("Inserting {} rows...", total_rows);
    for i in 0..total_rows {
        let event = LoadTestEvent {
            id: i,
            timestamp: i,
            user_id: i % 10_000,
            event_type: format!("event_type_{}", i % 10),
            value: (i as f64) * 1.5,
        };
        handle.insert(event).await.expect("Insert failed");

        // `i` is zero-based, so `i + 1` rows have been handed over by now.
        // Reporting `i + 1` keeps the progress line accurate and also emits
        // the final milestone.
        let sent = i + 1;
        if sent % 100_000 == 0 {
            println!(" {} rows inserted...", sent);
        }
    }

    println!("All rows sent, flushing...");
    let stats = handle.flush().await.expect("Flush failed");
    println!("Final flush: {} rows, {} bytes", stats.rows_flushed, stats.bytes_sent);
    handle.close().await.expect("Close failed");
    worker.await.expect("Worker panicked");

    let elapsed = start.elapsed();
    let elapsed_secs = elapsed.as_secs_f64();
    let rows_per_sec = total_rows as f64 / elapsed_secs;
    // NOTE(review): `stats` comes from the last explicit flush only (it is
    // printed above as "Final flush"). If those counters are per-flush rather
    // than cumulative, this figure understates the real bandwidth — confirm
    // against the flush stats type.
    let mb_per_sec = (stats.bytes_sent as f64 / 1_024.0 / 1_024.0) / elapsed_secs;

    println!("\n=== Results ===");
    println!("Total time: {:.2}s", elapsed_secs);
    println!("Throughput: {:.0} rows/sec", rows_per_sec);
    println!("Bandwidth: {:.2} MB/sec", mb_per_sec);

    let count = client
        .query_check("SELECT * FROM load_test_events")
        .await
        .expect("Query failed");
    assert_eq!(count, total_rows, "Not all rows were inserted");
    println!("✓ All {} rows verified in database", count);

    cleanup_table(&client).await;
    println!("\n✓ Load test completed successfully\n");
}
/// Load test: three phases of inserts (10k rows, a 100k-row burst, then
/// another 10k rows), each followed by an explicit flush, and finally a check
/// that the value returned by `query_check` equals 120,000.
///
/// Ignored by default; it needs the server configured in `create_test_client`.
///
/// # Panics
///
/// Panics if any insert, flush, close, or query does not succeed, if the
/// worker task panicked, or if the verified count is not 120,000.
#[tokio::test]
#[ignore]
async fn load_test_burst_scenario() {
    println!("\n=== Load Test: Burst Scenario ===\n");
    let client = create_test_client().await;
    setup_table(&client).await;

    let config = BatchConfig {
        max_rows: 5_000,
        max_buffer_size: 2 * 1024 * 1024,
        max_wait: Duration::from_millis(100),
        buffer_shrink_threshold: 10 * 1024 * 1024,
        ..Default::default()
    };
    let (handle, worker) = Batcher::<LoadTestEvent>::new(client.clone(), config).spawn();

    // Every phase builds rows the same way; only the id range and label differ.
    let make_event = |row_id: u64, label: String| LoadTestEvent {
        id: row_id,
        timestamp: row_id,
        user_id: row_id % 1000,
        event_type: label,
        value: row_id as f64,
    };

    println!("Phase 1: Normal load (10k rows)...");
    for row_id in 0..10_000u64 {
        let row = make_event(row_id, "normal".to_string());
        handle.insert(row).await.expect("Insert failed");
    }
    handle.flush().await.expect("Flush failed");
    println!("✓ Phase 1 complete");

    println!("\nPhase 2: Burst (100k rows in rapid succession)...");
    let burst_timer = Instant::now();
    for row_id in 10_000..110_000u64 {
        let row = make_event(row_id, format!("burst_{}", row_id % 100));
        handle.insert(row).await.expect("Insert failed");
    }
    handle.flush().await.expect("Flush failed");
    let burst_secs = burst_timer.elapsed().as_secs_f64();
    println!("✓ Phase 2 complete in {:.2}s", burst_secs);

    println!("\nPhase 3: Return to normal (10k rows)...");
    for row_id in 110_000..120_000u64 {
        let row = make_event(row_id, "normal".to_string());
        handle.insert(row).await.expect("Insert failed");
    }
    handle.flush().await.expect("Flush failed");
    println!("✓ Phase 3 complete");

    handle.close().await.expect("Close failed");
    worker.await.expect("Worker panicked");

    let verified = client
        .query_check("SELECT * FROM load_test_events")
        .await
        .expect("Query failed");
    assert_eq!(verified, 120_000, "Not all rows were inserted");
    println!("\n✓ All {} rows verified", verified);
    println!("✓ Memory management survived burst scenario");

    cleanup_table(&client).await;
    println!("\n✓ Burst test completed successfully\n");
}
/// Load test: ten spawned tasks each insert 10,000 rows through clones of the
/// same batcher handle; afterwards the value returned by `query_check` is
/// compared with the total number of rows sent.
///
/// Ignored by default; it needs the server configured in `create_test_client`.
///
/// # Panics
///
/// Panics if any insert, flush, close, or query does not succeed, if a
/// producer task or the worker task panicked, or if the verified count
/// differs from the rows sent.
#[tokio::test]
#[ignore]
async fn load_test_concurrent_inserts() {
    println!("\n=== Load Test: Concurrent Inserts ===\n");
    let client = create_test_client().await;
    setup_table(&client).await;

    let config = BatchConfig {
        max_rows: 10_000,
        channel_capacity: 1000,
        ..Default::default()
    };
    let batcher = Batcher::<LoadTestEvent>::new(client.clone(), config);
    let (handle, worker) = batcher.spawn();

    let num_tasks = 10u64;
    let rows_per_task = 10_000u64;
    let total_rows = num_tasks * rows_per_task;
    println!("Spawning {} concurrent tasks, {} rows each...", num_tasks, rows_per_task);

    // Start the clock before any producer is spawned: a spawned task may begin
    // running as soon as the runtime schedules it, and work done before the
    // timer starts would otherwise be missing from the measurement.
    let start = Instant::now();

    let tasks: Vec<_> = (0..num_tasks)
        .map(|task_id| {
            let handle_clone = handle.clone();
            tokio::spawn(async move {
                for i in 0..rows_per_task {
                    // Ids are partitioned per task so they never collide.
                    let id = task_id * rows_per_task + i;
                    let event = LoadTestEvent {
                        id,
                        timestamp: id,
                        user_id: id % 5000,
                        event_type: format!("task_{}", task_id),
                        value: id as f64,
                    };
                    handle_clone.insert(event).await.expect("Insert failed");
                }
            })
        })
        .collect();

    for task in tasks {
        task.await.expect("Task panicked");
    }
    let elapsed = start.elapsed();
    println!("All tasks completed in {:.2}s", elapsed.as_secs_f64());

    println!("Flushing final batch...");
    handle.flush().await.expect("Flush failed");
    handle.close().await.expect("Close failed");
    worker.await.expect("Worker panicked");

    let count = client
        .query_check("SELECT * FROM load_test_events")
        .await
        .expect("Query failed");
    assert_eq!(count, total_rows, "Not all rows were inserted");
    println!("✓ All {} rows verified from {} concurrent tasks", count, num_tasks);

    // `elapsed` covers the producers only; the final flush/close is not included.
    let rows_per_sec = total_rows as f64 / elapsed.as_secs_f64();
    println!("✓ Throughput: {:.0} rows/sec", rows_per_sec);

    cleanup_table(&client).await;
    println!("\n✓ Concurrent test completed successfully\n");
}
/// Load test: fires 50,000 `try_insert` calls at a batcher configured with a
/// channel capacity of 10, tallies accepted versus rejected calls, and checks
/// that the value returned by `query_check` equals the accepted tally.
///
/// Ignored by default; it needs the server configured in `create_test_client`.
///
/// # Panics
///
/// Panics if flush, close, or the query does not succeed, if the worker task
/// panicked, or if the verified count differs from the accepted tally.
#[tokio::test]
#[ignore]
async fn load_test_try_insert_backpressure() {
    println!("\n=== Load Test: Backpressure with try_insert ===\n");
    let client = create_test_client().await;
    setup_table(&client).await;

    let config = BatchConfig {
        max_rows: 5_000,
        channel_capacity: 10,
        ..Default::default()
    };
    let (handle, worker) = Batcher::<LoadTestEvent>::new(client.clone(), config).spawn();

    let total_attempts = 50_000u64;
    let mut successful = 0u64;
    let mut dropped = 0u64;
    println!("Attempting {} rapid inserts with small channel...", total_attempts);

    for row_id in 0..total_attempts {
        let row = LoadTestEvent {
            id: row_id,
            timestamp: row_id,
            user_id: row_id % 100,
            event_type: "backpressure_test".to_string(),
            value: row_id as f64,
        };
        // Any error from `try_insert` is tallied as a dropped row.
        if handle.try_insert(row).is_ok() {
            successful += 1;
        } else {
            dropped += 1;
        }
    }

    let drop_rate = (dropped as f64 / total_attempts as f64) * 100.0;
    println!("Successful: {}", successful);
    println!("Dropped (channel full): {}", dropped);
    println!("Drop rate: {:.2}%", drop_rate);

    handle.flush().await.expect("Flush failed");
    handle.close().await.expect("Close failed");
    worker.await.expect("Worker panicked");

    let verified = client
        .query_check("SELECT * FROM load_test_events")
        .await
        .expect("Query failed");
    assert_eq!(verified, successful, "Inserted count should match successful count");
    println!("✓ Verified {} rows in database (matches successful inserts)", verified);
    println!("✓ Backpressure mechanism working correctly");

    cleanup_table(&client).await;
    println!("\n✓ Backpressure test completed successfully\n");
}