use crate::cli_types::BenchCommands;
use anyhow::Result;
#[cfg(feature = "state_machine")]
use chrono;
use cqlite_core::Database;
#[cfg(feature = "state_machine")]
use indicatif::{ProgressBar, ProgressStyle};
#[cfg(feature = "state_machine")]
use std::sync::Arc;
#[cfg(feature = "state_machine")]
use std::time::{Duration, Instant};
#[cfg(feature = "state_machine")]
pub async fn handle_bench_command(database: &Database, command: BenchCommands) -> Result<()> {
match command {
BenchCommands::Read {
operations,
concurrency,
table: _,
} => run_read_benchmark(database, operations as u64, concurrency as u32).await,
BenchCommands::Write {
operations,
concurrency,
table: _,
} => run_write_benchmark(database, operations as u64, concurrency as u32).await,
BenchCommands::Mixed {
operations,
read_ratio,
concurrency,
} => {
run_mixed_benchmark(
database,
(read_ratio * 100.0) as u8,
operations as u64,
concurrency as u32,
)
.await
}
}
}
#[cfg(not(feature = "state_machine"))]
pub async fn handle_bench_command(_database: &Database, _command: BenchCommands) -> Result<()> {
Err(anyhow::anyhow!(
"Benchmark commands requiring query execution are not available in M1.\n\
Build with --features state_machine or use SSTableReader directly.\n\
See CLAUDE.md for M1 API examples."
))
}
#[cfg(feature = "state_machine")]
async fn run_read_benchmark(database: &Database, ops: u64, threads: u32) -> Result<()> {
let _database = Arc::new(database.clone());
println!("📚 Running read benchmark");
println!("Operations: {}, Threads: {}", ops, threads);
let setup_result = setup_benchmark_table(&database).await;
if let Err(e) = setup_result {
println!("⚠️ Warning: Could not create benchmark table: {}", e);
println!("Using simple system queries instead...");
return run_simple_read_benchmark(database, ops, threads).await;
}
match populate_benchmark_data(database, 1000).await {
Ok(rows) => println!("✓ Benchmark table populated with {} rows", rows),
Err(e) => {
println!("⚠️ Warning: Could not populate benchmark data: {}", e);
return run_simple_read_benchmark(database, ops, threads).await;
}
}
let pb = create_progress_bar(ops, "Reading");
let start = Instant::now();
let mut successful_ops = 0u64;
let mut failed_ops = 0u64;
let mut total_latency = Duration::ZERO;
let mut min_latency = Duration::from_secs(999);
let mut max_latency = Duration::ZERO;
if threads == 1 {
for i in 0..ops {
let op_start = Instant::now();
let query = match i % 4 {
0 => "SELECT * FROM benchmark_table LIMIT 10".to_string(),
1 => format!(
"SELECT * FROM benchmark_table WHERE id = {}",
(i % 1000) + 1
),
2 => "SELECT COUNT(*) FROM benchmark_table".to_string(),
_ => "SELECT id, name FROM benchmark_table ORDER BY id LIMIT 5".to_string(),
};
match database.execute(&query).await {
Ok(_) => {
successful_ops += 1;
let latency = op_start.elapsed();
total_latency += latency;
min_latency = min_latency.min(latency);
max_latency = max_latency.max(latency);
}
Err(_) => failed_ops += 1,
}
pb.inc(1);
if i % 100 == 0 {
pb.set_message(format!(
"Read operation {} (success: {}, failed: {})",
i, successful_ops, failed_ops
));
}
}
} else {
println!("⚠️ Multi-threaded benchmarks temporarily simplified");
return run_simple_read_benchmark(database, ops, 1).await;
}
let duration = start.elapsed();
let total_ops = successful_ops + failed_ops;
let success_rate = (successful_ops as f64 / total_ops as f64) * 100.0;
let avg_latency = if successful_ops > 0 {
total_latency / successful_ops as u32
} else {
Duration::ZERO
};
println!("\n📊 Read Benchmark Results:");
println!(" Total time: {:.2}s", duration.as_secs_f64());
println!(" Total operations: {}", total_ops);
println!(
" Successful operations: {} ({:.1}%)",
successful_ops, success_rate
);
println!(" Failed operations: {}", failed_ops);
println!(
" Operations/sec: {:.2}",
total_ops as f64 / duration.as_secs_f64()
);
println!(" Average latency: {:.2}ms", avg_latency.as_millis());
if successful_ops > 0 {
println!(" Min latency: {:.2}ms", min_latency.as_millis());
println!(" Max latency: {:.2}ms", max_latency.as_millis());
}
println!(" Concurrency: {} thread(s)", threads);
Ok(())
}
#[cfg(feature = "state_machine")]
async fn run_simple_read_benchmark(database: &Database, ops: u64, _threads: u32) -> Result<()> {
let pb = create_progress_bar(ops, "Simple reads");
let start = Instant::now();
let mut successful_ops = 0u64;
let queries = vec![
"SELECT COUNT(*) FROM system.tables",
"SELECT * FROM system.tables LIMIT 1",
"SELECT keyspace_name FROM system.tables LIMIT 5",
];
for i in 0..ops {
let query = queries[i as usize % queries.len()];
match database.execute(query).await {
Ok(_) => successful_ops += 1,
Err(_) => {}
}
pb.inc(1);
}
pb.finish_with_message("Simple read benchmark completed");
let duration = start.elapsed();
println!("\n📊 Simple Read Benchmark Results:");
println!(" Total time: {:.2}s", duration.as_secs_f64());
println!(" Successful operations: {}/{}", successful_ops, ops);
println!(
" Operations/sec: {:.2}",
successful_ops as f64 / duration.as_secs_f64()
);
Ok(())
}
#[cfg(feature = "state_machine")]
async fn run_write_benchmark(database: &Database, ops: u64, threads: u32) -> Result<()> {
let _database = Arc::new(database.clone());
println!("✏️ Running write benchmark");
println!("Operations: {}, Threads: {}", ops, threads);
let setup_result = setup_benchmark_table(&database).await;
if let Err(e) = setup_result {
println!("⚠️ Error: Could not create benchmark table: {}", e);
println!("Write benchmark requires table creation capability.");
return Ok(());
}
let pb = create_progress_bar(ops, "Writing");
let start = Instant::now();
let mut successful_ops = 0u64;
let mut failed_ops = 0u64;
let mut total_latency = Duration::ZERO;
let mut min_latency = Duration::from_secs(999);
let mut max_latency = Duration::ZERO;
if threads == 1 {
for i in 0..ops {
let op_start = Instant::now();
let query = match i % 3 {
0 => {
format!(
"INSERT INTO benchmark_table (id, name, value, created_at) VALUES ({}, 'user_{}', {}, '{}')",
1000000 + i, i,
i * 10,
chrono::Utc::now().format("%Y-%m-%d %H:%M:%S")
)
}
1 => {
format!(
"UPDATE benchmark_table SET value = {} WHERE id = {}",
i * 20,
(i % 100) + 1
)
}
_ => {
format!("DELETE FROM benchmark_table WHERE id > {}", 2000000 + i)
}
};
match database.execute(&query).await {
Ok(_) => {
successful_ops += 1;
let latency = op_start.elapsed();
total_latency += latency;
min_latency = min_latency.min(latency);
max_latency = max_latency.max(latency);
}
Err(_) => failed_ops += 1,
}
pb.inc(1);
if i % 50 == 0 {
pb.set_message(format!(
"Write operation {} (success: {}, failed: {})",
i, successful_ops, failed_ops
));
}
}
} else {
println!("⚠️ Multi-threaded benchmarks temporarily simplified");
return run_simple_read_benchmark(database, ops, 1).await;
}
let duration = start.elapsed();
let total_ops = successful_ops + failed_ops;
let success_rate = (successful_ops as f64 / total_ops as f64) * 100.0;
let avg_latency = if successful_ops > 0 {
total_latency / successful_ops as u32
} else {
Duration::ZERO
};
println!("\n✏️ Write Benchmark Results:");
println!(" Total time: {:.2}s", duration.as_secs_f64());
println!(" Total operations: {}", total_ops);
println!(
" Successful operations: {} ({:.1}%)",
successful_ops, success_rate
);
println!(" Failed operations: {}", failed_ops);
println!(
" Operations/sec: {:.2}",
total_ops as f64 / duration.as_secs_f64()
);
println!(" Average latency: {:.2}ms", avg_latency.as_millis());
if successful_ops > 0 {
println!(" Min latency: {:.2}ms", min_latency.as_millis());
println!(" Max latency: {:.2}ms", max_latency.as_millis());
}
println!(" Concurrency: {} thread(s)", threads);
Ok(())
}
#[cfg(feature = "state_machine")]
async fn run_mixed_benchmark(
database: &Database,
read_pct: u8,
ops: u64,
threads: u32,
) -> Result<()> {
let database = Arc::new(database.clone());
println!("🔄 Running mixed benchmark");
println!(
"Operations: {}, Threads: {}, Read%: {}",
ops, threads, read_pct
);
let setup_result = setup_benchmark_table(&database).await;
if let Err(e) = setup_result {
println!("⚠️ Warning: Could not create benchmark table: {}", e);
println!("Using simplified mixed benchmark...");
return run_simple_mixed_benchmark(&database, read_pct, ops, threads).await;
}
match populate_benchmark_data(&database, 500).await {
Ok(rows) => println!("✓ Benchmark table populated with {} rows", rows),
Err(e) => println!("⚠️ Warning: Could not populate data: {}", e),
}
let pb = create_progress_bar(ops, "Mixed workload");
let start = Instant::now();
let mut read_ops = 0u64;
let mut write_ops = 0u64;
let mut successful_ops = 0u64;
let mut failed_ops = 0u64;
let mut read_latency = Duration::ZERO;
let mut write_latency = Duration::ZERO;
if threads == 1 {
for i in 0..ops {
let op_start = Instant::now();
let is_read = (i * 100) % 100 < read_pct as u64;
let query = if is_read {
read_ops += 1;
match i % 3 {
0 => "SELECT * FROM benchmark_table LIMIT 10".to_string(),
1 => format!("SELECT * FROM benchmark_table WHERE id = {}", (i % 500) + 1),
_ => "SELECT COUNT(*) FROM benchmark_table".to_string(),
}
} else {
write_ops += 1;
match i % 2 {
0 => format!(
"INSERT INTO benchmark_table (id, name, value, created_at) VALUES ({}, 'mixed_user_{}', {}, '{}')",
3000000 + i,
i,
i * 5,
chrono::Utc::now().format("%Y-%m-%d %H:%M:%S")
),
_ => format!(
"UPDATE benchmark_table SET value = {} WHERE id <= {}",
i * 7,
(i % 100) + 1
),
}
};
match database.execute(&query).await {
Ok(_) => {
successful_ops += 1;
let latency = op_start.elapsed();
if is_read {
read_latency += latency;
} else {
write_latency += latency;
}
}
Err(_) => failed_ops += 1,
}
pb.inc(1);
if i % 100 == 0 {
pb.set_message(format!(
"Mixed operation {} (R:{} W:{} S:{} F:{})",
i, read_ops, write_ops, successful_ops, failed_ops
));
}
}
} else {
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::Arc;
use std::sync::Mutex;
use tokio::task::JoinSet;
let read_counter = Arc::new(AtomicU64::new(0));
let write_counter = Arc::new(AtomicU64::new(0));
let successful_counter = Arc::new(AtomicU64::new(0));
let failed_counter = Arc::new(AtomicU64::new(0));
let read_latency_total = Arc::new(Mutex::new(Duration::ZERO));
let write_latency_total = Arc::new(Mutex::new(Duration::ZERO));
let pb_shared = Arc::new(Mutex::new(pb));
let ops_per_thread = ops / threads as u64;
let mut tasks = JoinSet::new();
for thread_id in 0..threads {
let database = database.clone();
let read_counter = read_counter.clone();
let write_counter = write_counter.clone();
let successful_counter = successful_counter.clone();
let failed_counter = failed_counter.clone();
let read_latency_total = read_latency_total.clone();
let write_latency_total = write_latency_total.clone();
let pb = pb_shared.clone();
tasks.spawn(async move {
for i in 0..ops_per_thread {
let op_start = Instant::now();
let thread_offset = thread_id as u64 * 1000000;
let is_read = (thread_id as u64 + i) * 100 % 100 < read_pct as u64;
let query = if is_read {
read_counter.fetch_add(1, Ordering::Relaxed);
format!("SELECT * FROM benchmark_table WHERE id = {} LIMIT 5", ((thread_id as u64 + i) % 500) + 1)
} else {
write_counter.fetch_add(1, Ordering::Relaxed);
format!(
"INSERT INTO benchmark_table (id, name, value, created_at) VALUES ({}, 'thread_{}_mixed_{}', {}, '{}')",
thread_offset + 4000000 + i,
thread_id,
i,
(thread_id as u64 + i) * 3,
chrono::Utc::now().format("%Y-%m-%d %H:%M:%S")
)
};
match database.execute(&query).await {
Ok(_) => {
successful_counter.fetch_add(1, Ordering::Relaxed);
let latency = op_start.elapsed();
if is_read {
if let Ok(mut total) = read_latency_total.lock() {
*total += latency;
}
} else {
if let Ok(mut total) = write_latency_total.lock() {
*total += latency;
}
}
}
Err(_) => {
failed_counter.fetch_add(1, Ordering::Relaxed);
}
}
if let Ok(pb) = pb.lock() {
pb.inc(1);
if i % 50 == 0 {
let reads = read_counter.load(Ordering::Relaxed);
let writes = write_counter.load(Ordering::Relaxed);
pb.set_message(format!("Thread {} - R:{} W:{}", thread_id, reads, writes));
}
}
}
});
}
while let Some(_) = tasks.join_next().await {}
read_ops = read_counter.load(Ordering::Relaxed);
write_ops = write_counter.load(Ordering::Relaxed);
successful_ops = successful_counter.load(Ordering::Relaxed);
failed_ops = failed_counter.load(Ordering::Relaxed);
if let Ok(total) = read_latency_total.lock() {
read_latency = *total;
};
if let Ok(total) = write_latency_total.lock() {
write_latency = *total;
};
}
let duration = start.elapsed();
let total_ops = read_ops + write_ops;
let success_rate = (successful_ops as f64 / (successful_ops + failed_ops) as f64) * 100.0;
let avg_read_latency = if read_ops > 0 {
read_latency / read_ops as u32
} else {
Duration::ZERO
};
let avg_write_latency = if write_ops > 0 {
write_latency / write_ops as u32
} else {
Duration::ZERO
};
println!("\n🔄 Mixed Benchmark Results:");
println!(" Total time: {:.2}s", duration.as_secs_f64());
println!(" Total operations: {} (target: {})", total_ops, ops);
println!(
" Successful operations: {} ({:.1}%)",
successful_ops, success_rate
);
println!(" Failed operations: {}", failed_ops);
println!(
" Operations/sec: {:.2}",
total_ops as f64 / duration.as_secs_f64()
);
println!(
" Read operations: {} ({:.1}% of total, target: {}%)",
read_ops,
read_ops as f64 / total_ops as f64 * 100.0,
read_pct
);
println!(
" Write operations: {} ({:.1}% of total)",
write_ops,
write_ops as f64 / total_ops as f64 * 100.0
);
if read_ops > 0 {
println!(
" Average read latency: {:.2}ms",
avg_read_latency.as_millis()
);
}
if write_ops > 0 {
println!(
" Average write latency: {:.2}ms",
avg_write_latency.as_millis()
);
}
println!(" Concurrency: {} thread(s)", threads);
Ok(())
}
#[cfg(feature = "state_machine")]
async fn run_simple_mixed_benchmark(
database: &Database,
read_pct: u8,
ops: u64,
_threads: u32,
) -> Result<()> {
let pb = create_progress_bar(ops, "Simple mixed");
let start = Instant::now();
let mut read_ops = 0u64;
let mut write_ops = 0u64;
for i in 0..ops {
let is_read = (i * 100) % 100 < read_pct as u64;
if is_read {
let _ = database.execute("SELECT COUNT(*) FROM system.tables").await;
read_ops += 1;
} else {
tokio::time::sleep(Duration::from_micros(200)).await;
write_ops += 1;
}
pb.inc(1);
}
pb.finish_with_message("Simple mixed benchmark completed");
let duration = start.elapsed();
println!("\n🔄 Simple Mixed Benchmark Results:");
println!(" Total time: {:.2}s", duration.as_secs_f64());
println!(" Read operations: {}", read_ops);
println!(" Write operations: {} (simulated)", write_ops);
println!(
" Operations/sec: {:.2}",
ops as f64 / duration.as_secs_f64()
);
Ok(())
}
#[cfg(feature = "state_machine")]
fn create_progress_bar(total: u64, prefix: &str) -> ProgressBar {
let pb = ProgressBar::new(total);
pb.set_style(
ProgressStyle::default_bar()
.template(&format!(
"{} [{{elapsed_precise}}] [{{bar:40.cyan/blue}}] {{pos}}/{{len}} ({{eta}}) {{msg}}",
prefix
))
.unwrap()
.progress_chars("=>-"),
);
pb
}
#[cfg(feature = "state_machine")]
async fn setup_benchmark_table(database: &Database) -> Result<()> {
let create_table_sql = r#"
CREATE TABLE IF NOT EXISTS benchmark_table (
id bigint PRIMARY KEY,
name text,
value bigint,
created_at timestamp
)
"#;
database
.execute(create_table_sql)
.await
.map_err(|e| anyhow::anyhow!("Failed to create benchmark table: {}", e))?;
Ok(())
}
#[cfg(feature = "state_machine")]
async fn populate_benchmark_data(database: &Database, num_rows: u64) -> Result<u64> {
match database
.execute("SELECT COUNT(*) as count FROM benchmark_table")
.await
{
Ok(result) => {
if let Some(row) = result.rows.first() {
if let Some(count_value) = row.get("count") {
let count_str = count_value.to_string();
if let Ok(existing_count) = count_str.parse::<u64>() {
if existing_count >= num_rows {
return Ok(existing_count);
}
}
}
}
}
Err(_) => {} }
println!("📦 Populating benchmark table with {} rows...", num_rows);
let mut inserted = 0;
let batch_size = 50;
for batch_start in (0..num_rows).step_by(batch_size) {
let batch_end = (batch_start + batch_size as u64).min(num_rows);
for i in batch_start..batch_end {
let insert_sql = format!(
"INSERT INTO benchmark_table (id, name, value, created_at) VALUES ({}, 'user_{}', {}, '{}')",
i + 1,
i,
i * 100,
chrono::Utc::now().format("%Y-%m-%d %H:%M:%S")
);
match database.execute(&insert_sql).await {
Ok(_) => inserted += 1,
Err(_) => {} }
}
if batch_start % 200 == 0 {
tokio::time::sleep(Duration::from_millis(10)).await;
}
}
Ok(inserted)
}