Skip to main content

cqlite_cli/commands/
bench.rs

1use crate::cli_types::BenchCommands;
2use anyhow::Result;
3#[cfg(feature = "state_machine")]
4use chrono;
5use cqlite_core::Database;
6#[cfg(feature = "state_machine")]
7use indicatif::{ProgressBar, ProgressStyle};
8#[cfg(feature = "state_machine")]
9use std::sync::Arc;
10#[cfg(feature = "state_machine")]
11use std::time::{Duration, Instant};
12
13#[cfg(feature = "state_machine")]
14pub async fn handle_bench_command(database: &Database, command: BenchCommands) -> Result<()> {
15    match command {
16        BenchCommands::Read {
17            operations,
18            concurrency,
19            table: _,
20        } => run_read_benchmark(database, operations as u64, concurrency as u32).await,
21        BenchCommands::Write {
22            operations,
23            concurrency,
24            table: _,
25        } => run_write_benchmark(database, operations as u64, concurrency as u32).await,
26        BenchCommands::Mixed {
27            operations,
28            read_ratio,
29            concurrency,
30        } => {
31            run_mixed_benchmark(
32                database,
33                (read_ratio * 100.0) as u8,
34                operations as u64,
35                concurrency as u32,
36            )
37            .await
38        }
39    }
40}
41
42#[cfg(not(feature = "state_machine"))]
43pub async fn handle_bench_command(_database: &Database, _command: BenchCommands) -> Result<()> {
44    Err(anyhow::anyhow!(
45        "Benchmark commands requiring query execution are not available in M1.\n\
46         Build with --features state_machine or use SSTableReader directly.\n\
47         See CLAUDE.md for M1 API examples."
48    ))
49}
50
51#[cfg(feature = "state_machine")]
52async fn run_read_benchmark(database: &Database, ops: u64, threads: u32) -> Result<()> {
53    let _database = Arc::new(database.clone());
54    println!("šŸ“š Running read benchmark");
55    println!("Operations: {}, Threads: {}", ops, threads);
56
57    // Create benchmark table if it doesn't exist
58    let setup_result = setup_benchmark_table(&database).await;
59    if let Err(e) = setup_result {
60        println!("āš ļø  Warning: Could not create benchmark table: {}", e);
61        println!("Using simple system queries instead...");
62        return run_simple_read_benchmark(database, ops, threads).await;
63    }
64
65    // Populate table with test data if empty
66    match populate_benchmark_data(database, 1000).await {
67        Ok(rows) => println!("āœ“ Benchmark table populated with {} rows", rows),
68        Err(e) => {
69            println!("āš ļø  Warning: Could not populate benchmark data: {}", e);
70            return run_simple_read_benchmark(database, ops, threads).await;
71        }
72    }
73
74    let pb = create_progress_bar(ops, "Reading");
75    let start = Instant::now();
76    let mut successful_ops = 0u64;
77    let mut failed_ops = 0u64;
78    let mut total_latency = Duration::ZERO;
79    let mut min_latency = Duration::from_secs(999);
80    let mut max_latency = Duration::ZERO;
81
82    if threads == 1 {
83        // Single-threaded benchmark
84        for i in 0..ops {
85            let op_start = Instant::now();
86
87            // Perform different types of read operations
88            let query = match i % 4 {
89                0 => "SELECT * FROM benchmark_table LIMIT 10".to_string(),
90                1 => format!(
91                    "SELECT * FROM benchmark_table WHERE id = {}",
92                    (i % 1000) + 1
93                ),
94                2 => "SELECT COUNT(*) FROM benchmark_table".to_string(),
95                _ => "SELECT id, name FROM benchmark_table ORDER BY id LIMIT 5".to_string(),
96            };
97
98            match database.execute(&query).await {
99                Ok(_) => {
100                    successful_ops += 1;
101                    let latency = op_start.elapsed();
102                    total_latency += latency;
103                    min_latency = min_latency.min(latency);
104                    max_latency = max_latency.max(latency);
105                }
106                Err(_) => failed_ops += 1,
107            }
108
109            pb.inc(1);
110            if i % 100 == 0 {
111                pb.set_message(format!(
112                    "Read operation {} (success: {}, failed: {})",
113                    i, successful_ops, failed_ops
114                ));
115            }
116        }
117    } else {
118        // Multi-threaded benchmark - simplified for now
119        println!("āš ļø  Multi-threaded benchmarks temporarily simplified");
120        return run_simple_read_benchmark(database, ops, 1).await;
121    }
122
123    // pb.finish_with_message("Read benchmark completed");
124    let duration = start.elapsed();
125
126    // Calculate statistics
127    let total_ops = successful_ops + failed_ops;
128    let success_rate = (successful_ops as f64 / total_ops as f64) * 100.0;
129    let avg_latency = if successful_ops > 0 {
130        total_latency / successful_ops as u32
131    } else {
132        Duration::ZERO
133    };
134
135    println!("\nšŸ“Š Read Benchmark Results:");
136    println!("  Total time: {:.2}s", duration.as_secs_f64());
137    println!("  Total operations: {}", total_ops);
138    println!(
139        "  Successful operations: {} ({:.1}%)",
140        successful_ops, success_rate
141    );
142    println!("  Failed operations: {}", failed_ops);
143    println!(
144        "  Operations/sec: {:.2}",
145        total_ops as f64 / duration.as_secs_f64()
146    );
147    println!("  Average latency: {:.2}ms", avg_latency.as_millis());
148    if successful_ops > 0 {
149        println!("  Min latency: {:.2}ms", min_latency.as_millis());
150        println!("  Max latency: {:.2}ms", max_latency.as_millis());
151    }
152    println!("  Concurrency: {} thread(s)", threads);
153
154    Ok(())
155}
156
157/// Simple read benchmark using system queries when benchmark table is not available
158#[cfg(feature = "state_machine")]
159async fn run_simple_read_benchmark(database: &Database, ops: u64, _threads: u32) -> Result<()> {
160    let pb = create_progress_bar(ops, "Simple reads");
161    let start = Instant::now();
162    let mut successful_ops = 0u64;
163
164    // Use simple system queries that should always work
165    let queries = vec![
166        "SELECT COUNT(*) FROM system.tables",
167        "SELECT * FROM system.tables LIMIT 1",
168        "SELECT keyspace_name FROM system.tables LIMIT 5",
169    ];
170
171    for i in 0..ops {
172        let query = queries[i as usize % queries.len()];
173
174        match database.execute(query).await {
175            Ok(_) => successful_ops += 1,
176            Err(_) => {}
177        }
178
179        pb.inc(1);
180    }
181
182    pb.finish_with_message("Simple read benchmark completed");
183    let duration = start.elapsed();
184
185    println!("\nšŸ“Š Simple Read Benchmark Results:");
186    println!("  Total time: {:.2}s", duration.as_secs_f64());
187    println!("  Successful operations: {}/{}", successful_ops, ops);
188    println!(
189        "  Operations/sec: {:.2}",
190        successful_ops as f64 / duration.as_secs_f64()
191    );
192
193    Ok(())
194}
195
196#[cfg(feature = "state_machine")]
197async fn run_write_benchmark(database: &Database, ops: u64, threads: u32) -> Result<()> {
198    let _database = Arc::new(database.clone());
199    println!("āœļø  Running write benchmark");
200    println!("Operations: {}, Threads: {}", ops, threads);
201
202    // Create benchmark table if it doesn't exist
203    let setup_result = setup_benchmark_table(&database).await;
204    if let Err(e) = setup_result {
205        println!("āš ļø  Error: Could not create benchmark table: {}", e);
206        println!("Write benchmark requires table creation capability.");
207        return Ok(());
208    }
209
210    let pb = create_progress_bar(ops, "Writing");
211    let start = Instant::now();
212    let mut successful_ops = 0u64;
213    let mut failed_ops = 0u64;
214    let mut total_latency = Duration::ZERO;
215    let mut min_latency = Duration::from_secs(999);
216    let mut max_latency = Duration::ZERO;
217
218    if threads == 1 {
219        // Single-threaded benchmark
220        for i in 0..ops {
221            let op_start = Instant::now();
222
223            // Perform different types of write operations
224            let query = match i % 3 {
225                0 => {
226                    // INSERT
227                    format!(
228                        "INSERT INTO benchmark_table (id, name, value, created_at) VALUES ({}, 'user_{}', {}, '{}')",
229                        1000000 + i, // Use high IDs to avoid conflicts
230                        i,
231                        i * 10,
232                        chrono::Utc::now().format("%Y-%m-%d %H:%M:%S")
233                    )
234                }
235                1 => {
236                    // UPDATE (only if some data exists)
237                    format!(
238                        "UPDATE benchmark_table SET value = {} WHERE id = {}",
239                        i * 20,
240                        (i % 100) + 1
241                    )
242                }
243                _ => {
244                    // DELETE and re-insert
245                    format!("DELETE FROM benchmark_table WHERE id > {}", 2000000 + i)
246                }
247            };
248
249            match database.execute(&query).await {
250                Ok(_) => {
251                    successful_ops += 1;
252                    let latency = op_start.elapsed();
253                    total_latency += latency;
254                    min_latency = min_latency.min(latency);
255                    max_latency = max_latency.max(latency);
256                }
257                Err(_) => failed_ops += 1,
258            }
259
260            pb.inc(1);
261            if i % 50 == 0 {
262                pb.set_message(format!(
263                    "Write operation {} (success: {}, failed: {})",
264                    i, successful_ops, failed_ops
265                ));
266            }
267        }
268    } else {
269        // Multi-threaded benchmark - simplified for now
270        println!("āš ļø  Multi-threaded benchmarks temporarily simplified");
271        return run_simple_read_benchmark(database, ops, 1).await;
272    }
273
274    // pb.finish_with_message("Write benchmark completed");
275    let duration = start.elapsed();
276
277    // Calculate statistics
278    let total_ops = successful_ops + failed_ops;
279    let success_rate = (successful_ops as f64 / total_ops as f64) * 100.0;
280    let avg_latency = if successful_ops > 0 {
281        total_latency / successful_ops as u32
282    } else {
283        Duration::ZERO
284    };
285
286    println!("\nāœļø  Write Benchmark Results:");
287    println!("  Total time: {:.2}s", duration.as_secs_f64());
288    println!("  Total operations: {}", total_ops);
289    println!(
290        "  Successful operations: {} ({:.1}%)",
291        successful_ops, success_rate
292    );
293    println!("  Failed operations: {}", failed_ops);
294    println!(
295        "  Operations/sec: {:.2}",
296        total_ops as f64 / duration.as_secs_f64()
297    );
298    println!("  Average latency: {:.2}ms", avg_latency.as_millis());
299    if successful_ops > 0 {
300        println!("  Min latency: {:.2}ms", min_latency.as_millis());
301        println!("  Max latency: {:.2}ms", max_latency.as_millis());
302    }
303    println!("  Concurrency: {} thread(s)", threads);
304
305    Ok(())
306}
307
308#[cfg(feature = "state_machine")]
309async fn run_mixed_benchmark(
310    database: &Database,
311    read_pct: u8,
312    ops: u64,
313    threads: u32,
314) -> Result<()> {
315    let database = Arc::new(database.clone());
316    println!("šŸ”„ Running mixed benchmark");
317    println!(
318        "Operations: {}, Threads: {}, Read%: {}",
319        ops, threads, read_pct
320    );
321
322    // Create and populate benchmark table
323    let setup_result = setup_benchmark_table(&database).await;
324    if let Err(e) = setup_result {
325        println!("āš ļø  Warning: Could not create benchmark table: {}", e);
326        println!("Using simplified mixed benchmark...");
327        return run_simple_mixed_benchmark(&database, read_pct, ops, threads).await;
328    }
329
330    match populate_benchmark_data(&database, 500).await {
331        Ok(rows) => println!("āœ“ Benchmark table populated with {} rows", rows),
332        Err(e) => println!("āš ļø  Warning: Could not populate data: {}", e),
333    }
334
335    let pb = create_progress_bar(ops, "Mixed workload");
336    let start = Instant::now();
337    let mut read_ops = 0u64;
338    let mut write_ops = 0u64;
339    let mut successful_ops = 0u64;
340    let mut failed_ops = 0u64;
341    let mut read_latency = Duration::ZERO;
342    let mut write_latency = Duration::ZERO;
343
344    if threads == 1 {
345        // Single-threaded mixed benchmark
346        for i in 0..ops {
347            let op_start = Instant::now();
348
349            // Determine operation type based on read percentage
350            let is_read = (i * 100) % 100 < read_pct as u64;
351
352            let query = if is_read {
353                read_ops += 1;
354                match i % 3 {
355                    0 => "SELECT * FROM benchmark_table LIMIT 10".to_string(),
356                    1 => format!("SELECT * FROM benchmark_table WHERE id = {}", (i % 500) + 1),
357                    _ => "SELECT COUNT(*) FROM benchmark_table".to_string(),
358                }
359            } else {
360                write_ops += 1;
361                match i % 2 {
362                    0 => format!(
363                        "INSERT INTO benchmark_table (id, name, value, created_at) VALUES ({}, 'mixed_user_{}', {}, '{}')",
364                        3000000 + i,
365                        i,
366                        i * 5,
367                        chrono::Utc::now().format("%Y-%m-%d %H:%M:%S")
368                    ),
369                    _ => format!(
370                        "UPDATE benchmark_table SET value = {} WHERE id <= {}",
371                        i * 7,
372                        (i % 100) + 1
373                    ),
374                }
375            };
376
377            match database.execute(&query).await {
378                Ok(_) => {
379                    successful_ops += 1;
380                    let latency = op_start.elapsed();
381                    if is_read {
382                        read_latency += latency;
383                    } else {
384                        write_latency += latency;
385                    }
386                }
387                Err(_) => failed_ops += 1,
388            }
389
390            pb.inc(1);
391            if i % 100 == 0 {
392                pb.set_message(format!(
393                    "Mixed operation {} (R:{} W:{} S:{} F:{})",
394                    i, read_ops, write_ops, successful_ops, failed_ops
395                ));
396            }
397        }
398    } else {
399        // Multi-threaded mixed benchmark
400        use std::sync::atomic::{AtomicU64, Ordering};
401        use std::sync::Arc;
402        use std::sync::Mutex;
403        use tokio::task::JoinSet;
404
405        let read_counter = Arc::new(AtomicU64::new(0));
406        let write_counter = Arc::new(AtomicU64::new(0));
407        let successful_counter = Arc::new(AtomicU64::new(0));
408        let failed_counter = Arc::new(AtomicU64::new(0));
409        let read_latency_total = Arc::new(Mutex::new(Duration::ZERO));
410        let write_latency_total = Arc::new(Mutex::new(Duration::ZERO));
411        let pb_shared = Arc::new(Mutex::new(pb));
412
413        let ops_per_thread = ops / threads as u64;
414        let mut tasks = JoinSet::new();
415
416        for thread_id in 0..threads {
417            let database = database.clone();
418            let read_counter = read_counter.clone();
419            let write_counter = write_counter.clone();
420            let successful_counter = successful_counter.clone();
421            let failed_counter = failed_counter.clone();
422            let read_latency_total = read_latency_total.clone();
423            let write_latency_total = write_latency_total.clone();
424            let pb = pb_shared.clone();
425
426            tasks.spawn(async move {
427                for i in 0..ops_per_thread {
428                    let op_start = Instant::now();
429                    let thread_offset = thread_id as u64 * 1000000;
430
431                    // Determine operation type
432                    let is_read = (thread_id as u64 + i) * 100 % 100 < read_pct as u64;
433
434                    let query = if is_read {
435                        read_counter.fetch_add(1, Ordering::Relaxed);
436                        format!("SELECT * FROM benchmark_table WHERE id = {} LIMIT 5", ((thread_id as u64 + i) % 500) + 1)
437                    } else {
438                        write_counter.fetch_add(1, Ordering::Relaxed);
439                        format!(
440                            "INSERT INTO benchmark_table (id, name, value, created_at) VALUES ({}, 'thread_{}_mixed_{}', {}, '{}')",
441                            thread_offset + 4000000 + i,
442                            thread_id,
443                            i,
444                            (thread_id as u64 + i) * 3,
445                            chrono::Utc::now().format("%Y-%m-%d %H:%M:%S")
446                        )
447                    };
448
449                    match database.execute(&query).await {
450                        Ok(_) => {
451                            successful_counter.fetch_add(1, Ordering::Relaxed);
452                            let latency = op_start.elapsed();
453
454                            if is_read {
455                                if let Ok(mut total) = read_latency_total.lock() {
456                                    *total += latency;
457                                }
458                            } else {
459                                if let Ok(mut total) = write_latency_total.lock() {
460                                    *total += latency;
461                                }
462                            }
463                        }
464                        Err(_) => {
465                            failed_counter.fetch_add(1, Ordering::Relaxed);
466                        }
467                    }
468
469                    if let Ok(pb) = pb.lock() {
470                        pb.inc(1);
471                        if i % 50 == 0 {
472                            let reads = read_counter.load(Ordering::Relaxed);
473                            let writes = write_counter.load(Ordering::Relaxed);
474                            pb.set_message(format!("Thread {} - R:{} W:{}", thread_id, reads, writes));
475                        }
476                    }
477                }
478            });
479        }
480
481        // Wait for all tasks to complete
482        while let Some(_) = tasks.join_next().await {}
483
484        read_ops = read_counter.load(Ordering::Relaxed);
485        write_ops = write_counter.load(Ordering::Relaxed);
486        successful_ops = successful_counter.load(Ordering::Relaxed);
487        failed_ops = failed_counter.load(Ordering::Relaxed);
488
489        if let Ok(total) = read_latency_total.lock() {
490            read_latency = *total;
491        };
492        if let Ok(total) = write_latency_total.lock() {
493            write_latency = *total;
494        };
495    }
496
497    /* if let Ok(pb_lock) = pb_shared.lock() {
498        pb_lock.finish_with_message("Mixed benchmark completed");
499    } */
500    let duration = start.elapsed();
501
502    // Calculate statistics
503    let total_ops = read_ops + write_ops;
504    let success_rate = (successful_ops as f64 / (successful_ops + failed_ops) as f64) * 100.0;
505    let avg_read_latency = if read_ops > 0 {
506        read_latency / read_ops as u32
507    } else {
508        Duration::ZERO
509    };
510    let avg_write_latency = if write_ops > 0 {
511        write_latency / write_ops as u32
512    } else {
513        Duration::ZERO
514    };
515
516    println!("\nšŸ”„ Mixed Benchmark Results:");
517    println!("  Total time: {:.2}s", duration.as_secs_f64());
518    println!("  Total operations: {} (target: {})", total_ops, ops);
519    println!(
520        "  Successful operations: {} ({:.1}%)",
521        successful_ops, success_rate
522    );
523    println!("  Failed operations: {}", failed_ops);
524    println!(
525        "  Operations/sec: {:.2}",
526        total_ops as f64 / duration.as_secs_f64()
527    );
528    println!(
529        "  Read operations: {} ({:.1}% of total, target: {}%)",
530        read_ops,
531        read_ops as f64 / total_ops as f64 * 100.0,
532        read_pct
533    );
534    println!(
535        "  Write operations: {} ({:.1}% of total)",
536        write_ops,
537        write_ops as f64 / total_ops as f64 * 100.0
538    );
539    if read_ops > 0 {
540        println!(
541            "  Average read latency: {:.2}ms",
542            avg_read_latency.as_millis()
543        );
544    }
545    if write_ops > 0 {
546        println!(
547            "  Average write latency: {:.2}ms",
548            avg_write_latency.as_millis()
549        );
550    }
551    println!("  Concurrency: {} thread(s)", threads);
552
553    Ok(())
554}
555
556/// Simple mixed benchmark using system queries when benchmark table is not available
557#[cfg(feature = "state_machine")]
558async fn run_simple_mixed_benchmark(
559    database: &Database,
560    read_pct: u8,
561    ops: u64,
562    _threads: u32,
563) -> Result<()> {
564    let pb = create_progress_bar(ops, "Simple mixed");
565    let start = Instant::now();
566    let mut read_ops = 0u64;
567    let mut write_ops = 0u64;
568
569    for i in 0..ops {
570        let is_read = (i * 100) % 100 < read_pct as u64;
571
572        if is_read {
573            let _ = database.execute("SELECT COUNT(*) FROM system.tables").await;
574            read_ops += 1;
575        } else {
576            // For writes, we can't do much without table creation capability
577            // Just simulate the timing
578            tokio::time::sleep(Duration::from_micros(200)).await;
579            write_ops += 1;
580        }
581
582        pb.inc(1);
583    }
584
585    pb.finish_with_message("Simple mixed benchmark completed");
586    let duration = start.elapsed();
587
588    println!("\nšŸ”„ Simple Mixed Benchmark Results:");
589    println!("  Total time: {:.2}s", duration.as_secs_f64());
590    println!("  Read operations: {}", read_ops);
591    println!("  Write operations: {} (simulated)", write_ops);
592    println!(
593        "  Operations/sec: {:.2}",
594        ops as f64 / duration.as_secs_f64()
595    );
596
597    Ok(())
598}
599
600#[cfg(feature = "state_machine")]
601fn create_progress_bar(total: u64, prefix: &str) -> ProgressBar {
602    let pb = ProgressBar::new(total);
603    pb.set_style(
604        ProgressStyle::default_bar()
605            .template(&format!(
606                "{} [{{elapsed_precise}}] [{{bar:40.cyan/blue}}] {{pos}}/{{len}} ({{eta}}) {{msg}}",
607                prefix
608            ))
609            .unwrap()
610            .progress_chars("=>-"),
611    );
612    pb
613}
614
615/// Setup benchmark table for performance testing
616#[cfg(feature = "state_machine")]
617async fn setup_benchmark_table(database: &Database) -> Result<()> {
618    let create_table_sql = r#"
619        CREATE TABLE IF NOT EXISTS benchmark_table (
620            id bigint PRIMARY KEY,
621            name text,
622            value bigint,
623            created_at timestamp
624        )
625    "#;
626
627    database
628        .execute(create_table_sql)
629        .await
630        .map_err(|e| anyhow::anyhow!("Failed to create benchmark table: {}", e))?;
631
632    Ok(())
633}
634
635/// Populate benchmark table with test data
636#[cfg(feature = "state_machine")]
637async fn populate_benchmark_data(database: &Database, num_rows: u64) -> Result<u64> {
638    // Check if table already has data
639    match database
640        .execute("SELECT COUNT(*) as count FROM benchmark_table")
641        .await
642    {
643        Ok(result) => {
644            if let Some(row) = result.rows.first() {
645                if let Some(count_value) = row.get("count") {
646                    let count_str = count_value.to_string();
647                    if let Ok(existing_count) = count_str.parse::<u64>() {
648                        if existing_count >= num_rows {
649                            return Ok(existing_count);
650                        }
651                    }
652                }
653            }
654        }
655        Err(_) => {} // Continue with population
656    }
657
658    println!("šŸ“¦ Populating benchmark table with {} rows...", num_rows);
659
660    let mut inserted = 0;
661    let batch_size = 50;
662
663    for batch_start in (0..num_rows).step_by(batch_size) {
664        let batch_end = (batch_start + batch_size as u64).min(num_rows);
665
666        for i in batch_start..batch_end {
667            let insert_sql = format!(
668                "INSERT INTO benchmark_table (id, name, value, created_at) VALUES ({}, 'user_{}', {}, '{}')",
669                i + 1,
670                i,
671                i * 100,
672                chrono::Utc::now().format("%Y-%m-%d %H:%M:%S")
673            );
674
675            match database.execute(&insert_sql).await {
676                Ok(_) => inserted += 1,
677                Err(_) => {} // Skip conflicts or errors
678            }
679        }
680
681        // Small delay to prevent overwhelming the database
682        if batch_start % 200 == 0 {
683            tokio::time::sleep(Duration::from_millis(10)).await;
684        }
685    }
686
687    Ok(inserted)
688}