1use crate::cli_types::BenchCommands;
2use anyhow::Result;
3#[cfg(feature = "state_machine")]
4use chrono;
5use cqlite_core::Database;
6#[cfg(feature = "state_machine")]
7use indicatif::{ProgressBar, ProgressStyle};
8#[cfg(feature = "state_machine")]
9use std::sync::Arc;
10#[cfg(feature = "state_machine")]
11use std::time::{Duration, Instant};
12
/// Dispatches a `bench` subcommand to the matching benchmark runner.
///
/// # Arguments
/// * `database` - database handle the benchmark queries are executed against.
/// * `command` - parsed subcommand; `operations` and `concurrency` are cast to `u64`/`u32`
///   before being handed to the runner. The `table` field of `Read`/`Write` is ignored.
///
/// # Errors
/// Returns whatever error the selected benchmark runner returns.
#[cfg(feature = "state_machine")]
pub async fn handle_bench_command(database: &Database, command: BenchCommands) -> Result<()> {
    match command {
        BenchCommands::Read {
            operations,
            concurrency,
            table: _,
        } => run_read_benchmark(database, operations as u64, concurrency as u32).await,
        BenchCommands::Write {
            operations,
            concurrency,
            table: _,
        } => run_write_benchmark(database, operations as u64, concurrency as u32).await,
        BenchCommands::Mixed {
            operations,
            read_ratio,
            concurrency,
        } => {
            // Clamp to [0, 1] and round instead of truncating: a plain `as u8` cast turns
            // 0.29 * 100.0 (= 28.999...) into 28 and lets ratios above 1.0 exceed 100%.
            let read_pct = (read_ratio.clamp(0.0, 1.0) * 100.0).round() as u8;
            run_mixed_benchmark(database, read_pct, operations as u64, concurrency as u32).await
        }
    }
}
41
42#[cfg(not(feature = "state_machine"))]
43pub async fn handle_bench_command(_database: &Database, _command: BenchCommands) -> Result<()> {
44 Err(anyhow::anyhow!(
45 "Benchmark commands requiring query execution are not available in M1.\n\
46 Build with --features state_machine or use SSTableReader directly.\n\
47 See CLAUDE.md for M1 API examples."
48 ))
49}
50
/// Runs the read benchmark against `benchmark_table` and prints a summary.
///
/// The table is created and seeded with 1000 rows first. If either step fails, or if
/// `threads` is anything other than 1 (multi-threaded reads are not implemented here),
/// the work is delegated to [`run_simple_read_benchmark`], which only queries
/// `system.tables`.
///
/// # Arguments
/// * `database` - database handle the queries are executed against.
/// * `ops` - number of read queries to issue.
/// * `threads` - requested concurrency; only `1` runs the full benchmark.
///
/// # Errors
/// Individual query failures are counted, not returned. Only an error coming from the
/// fallback benchmark is propagated.
#[cfg(feature = "state_machine")]
async fn run_read_benchmark(database: &Database, ops: u64, threads: u32) -> Result<()> {
    println!("📖 Running read benchmark");
    println!("Operations: {}, Threads: {}", ops, threads);

    if let Err(e) = setup_benchmark_table(database).await {
        println!("⚠️ Warning: Could not create benchmark table: {}", e);
        println!("Using simple system queries instead...");
        return run_simple_read_benchmark(database, ops, threads).await;
    }

    match populate_benchmark_data(database, 1000).await {
        Ok(rows) => println!("✅ Benchmark table populated with {} rows", rows),
        Err(e) => {
            println!("⚠️ Warning: Could not populate benchmark data: {}", e);
            return run_simple_read_benchmark(database, ops, threads).await;
        }
    }

    // Decide on the fallback before a progress bar exists, so no unfinished bar is left
    // behind when the simple benchmark creates its own.
    if threads != 1 {
        println!("⚠️ Multi-threaded benchmarks temporarily simplified");
        return run_simple_read_benchmark(database, ops, 1).await;
    }

    let pb = create_progress_bar(ops, "Reading");
    let start = Instant::now();
    let mut successful_ops = 0u64;
    let mut failed_ops = 0u64;
    let mut total_latency = Duration::ZERO;
    // (min, max) latency over successful operations; `None` until the first success.
    let mut latency_range: Option<(Duration, Duration)> = None;

    for i in 0..ops {
        // Rotate through four query shapes so the run is not dominated by a single plan.
        let query = match i % 4 {
            0 => "SELECT * FROM benchmark_table LIMIT 10".to_string(),
            1 => format!(
                "SELECT * FROM benchmark_table WHERE id = {}",
                (i % 1000) + 1
            ),
            2 => "SELECT COUNT(*) FROM benchmark_table".to_string(),
            _ => "SELECT id, name FROM benchmark_table ORDER BY id LIMIT 5".to_string(),
        };

        // Start the clock only once the query text is built, so the measured latency
        // covers the database call and not string formatting.
        let op_start = Instant::now();
        let outcome = database.execute(&query).await;
        let latency = op_start.elapsed();

        match outcome {
            Ok(_) => {
                successful_ops += 1;
                total_latency += latency;
                latency_range = Some(match latency_range {
                    Some((lo, hi)) => (lo.min(latency), hi.max(latency)),
                    None => (latency, latency),
                });
            }
            Err(_) => failed_ops += 1,
        }

        pb.inc(1);
        if i % 100 == 0 {
            pb.set_message(format!(
                "Read operation {} (success: {}, failed: {})",
                i, successful_ops, failed_ops
            ));
        }
    }

    pb.finish_with_message("Read benchmark completed");
    let duration = start.elapsed();

    let as_ms = |d: Duration| d.as_secs_f64() * 1000.0;
    let total_ops = successful_ops + failed_ops;
    // Guard the zero-operation case so the report never shows NaN.
    let success_rate = if total_ops > 0 {
        successful_ops as f64 / total_ops as f64 * 100.0
    } else {
        0.0
    };
    // Average in floating point: dividing a `Duration` by `count as u32` truncates the
    // count and loses sub-millisecond precision in the printed value.
    let avg_latency_ms = if successful_ops > 0 {
        as_ms(total_latency) / successful_ops as f64
    } else {
        0.0
    };

    println!("\n📊 Read Benchmark Results:");
    println!(" Total time: {:.2}s", duration.as_secs_f64());
    println!(" Total operations: {}", total_ops);
    println!(
        " Successful operations: {} ({:.1}%)",
        successful_ops, success_rate
    );
    println!(" Failed operations: {}", failed_ops);
    println!(
        " Operations/sec: {:.2}",
        total_ops as f64 / duration.as_secs_f64()
    );
    println!(" Average latency: {:.2}ms", avg_latency_ms);
    if let Some((min_latency, max_latency)) = latency_range {
        println!(" Min latency: {:.2}ms", as_ms(min_latency));
        println!(" Max latency: {:.2}ms", as_ms(max_latency));
    }
    println!(" Concurrency: {} thread(s)", threads);

    Ok(())
}
156
/// Fallback read benchmark that only issues queries against `system.tables`.
///
/// Cycles through three fixed queries `ops` times and prints how many succeeded along
/// with the achieved throughput (successful operations per second).
///
/// # Arguments
/// * `database` - database handle the queries are executed against.
/// * `ops` - number of queries to issue.
/// * `_threads` - accepted for signature parity with the other runners; unused.
///
/// # Errors
/// Query failures are not returned; they only lower the success count. The function
/// itself always returns `Ok(())`.
#[cfg(feature = "state_machine")]
async fn run_simple_read_benchmark(database: &Database, ops: u64, _threads: u32) -> Result<()> {
    const QUERIES: [&str; 3] = [
        "SELECT COUNT(*) FROM system.tables",
        "SELECT * FROM system.tables LIMIT 1",
        "SELECT keyspace_name FROM system.tables LIMIT 5",
    ];

    let pb = create_progress_bar(ops, "Simple reads");
    let start = Instant::now();
    let mut successful_ops = 0u64;

    for i in 0..ops {
        // Reduce modulo the query count before narrowing, so the index never depends on
        // how `u64 -> usize` truncates on the target platform.
        let query = QUERIES[(i % QUERIES.len() as u64) as usize];

        if database.execute(query).await.is_ok() {
            successful_ops += 1;
        }

        pb.inc(1);
    }

    pb.finish_with_message("Simple read benchmark completed");
    let duration = start.elapsed();

    println!("\n📊 Simple Read Benchmark Results:");
    println!(" Total time: {:.2}s", duration.as_secs_f64());
    println!(" Successful operations: {}/{}", successful_ops, ops);
    println!(
        " Operations/sec: {:.2}",
        successful_ops as f64 / duration.as_secs_f64()
    );

    Ok(())
}
195
/// Runs the write benchmark against `benchmark_table` and prints a summary.
///
/// Operations rotate between an `INSERT`, an `UPDATE` and a `DELETE`. The workload always
/// runs on a single thread; when a different `threads` value is requested a notice is
/// printed and the single-threaded write workload runs anyway (it never substitutes a
/// read benchmark).
///
/// # Arguments
/// * `database` - database handle the statements are executed against.
/// * `ops` - number of write statements to issue.
/// * `threads` - requested concurrency; values other than `1` are not implemented.
///
/// # Errors
/// Individual statement failures are counted, not returned. If the benchmark table cannot
/// be created, a message is printed and `Ok(())` is returned without running anything.
#[cfg(feature = "state_machine")]
async fn run_write_benchmark(database: &Database, ops: u64, threads: u32) -> Result<()> {
    println!("✍️ Running write benchmark");
    println!("Operations: {}, Threads: {}", ops, threads);

    if let Err(e) = setup_benchmark_table(database).await {
        println!("⚠️ Error: Could not create benchmark table: {}", e);
        println!("Write benchmark requires table creation capability.");
        return Ok(());
    }

    // Only a single-threaded write loop exists; say so rather than silently running a
    // different (read) benchmark.
    let effective_threads = 1u32;
    if threads != effective_threads {
        println!("⚠️ Multi-threaded benchmarks temporarily simplified");
        println!("Running the write workload on a single thread instead.");
    }

    let pb = create_progress_bar(ops, "Writing");
    let start = Instant::now();
    let mut successful_ops = 0u64;
    let mut failed_ops = 0u64;
    let mut total_latency = Duration::ZERO;
    // (min, max) latency over successful operations; `None` until the first success.
    let mut latency_range: Option<(Duration, Duration)> = None;

    for i in 0..ops {
        let query = match i % 3 {
            0 => format!(
                "INSERT INTO benchmark_table (id, name, value, created_at) VALUES ({}, 'user_{}', {}, '{}')",
                1000000 + i,
                i,
                i * 10,
                chrono::Utc::now().format("%Y-%m-%d %H:%M:%S")
            ),
            1 => format!(
                "UPDATE benchmark_table SET value = {} WHERE id = {}",
                i * 20,
                (i % 100) + 1
            ),
            _ => format!("DELETE FROM benchmark_table WHERE id > {}", 2000000 + i),
        };

        // Start the clock only once the statement text is built, so the measured latency
        // covers the database call and not string/timestamp formatting.
        let op_start = Instant::now();
        let outcome = database.execute(&query).await;
        let latency = op_start.elapsed();

        match outcome {
            Ok(_) => {
                successful_ops += 1;
                total_latency += latency;
                latency_range = Some(match latency_range {
                    Some((lo, hi)) => (lo.min(latency), hi.max(latency)),
                    None => (latency, latency),
                });
            }
            Err(_) => failed_ops += 1,
        }

        pb.inc(1);
        if i % 50 == 0 {
            pb.set_message(format!(
                "Write operation {} (success: {}, failed: {})",
                i, successful_ops, failed_ops
            ));
        }
    }

    pb.finish_with_message("Write benchmark completed");
    let duration = start.elapsed();

    let as_ms = |d: Duration| d.as_secs_f64() * 1000.0;
    let total_ops = successful_ops + failed_ops;
    // Guard the zero-operation case so the report never shows NaN.
    let success_rate = if total_ops > 0 {
        successful_ops as f64 / total_ops as f64 * 100.0
    } else {
        0.0
    };
    // Average in floating point: dividing a `Duration` by `count as u32` truncates the
    // count and loses sub-millisecond precision in the printed value.
    let avg_latency_ms = if successful_ops > 0 {
        as_ms(total_latency) / successful_ops as f64
    } else {
        0.0
    };

    println!("\n✍️ Write Benchmark Results:");
    println!(" Total time: {:.2}s", duration.as_secs_f64());
    println!(" Total operations: {}", total_ops);
    println!(
        " Successful operations: {} ({:.1}%)",
        successful_ops, success_rate
    );
    println!(" Failed operations: {}", failed_ops);
    println!(
        " Operations/sec: {:.2}",
        total_ops as f64 / duration.as_secs_f64()
    );
    println!(" Average latency: {:.2}ms", avg_latency_ms);
    if let Some((min_latency, max_latency)) = latency_range {
        println!(" Min latency: {:.2}ms", as_ms(min_latency));
        println!(" Max latency: {:.2}ms", as_ms(max_latency));
    }
    println!(" Concurrency: {} thread(s)", effective_threads);

    Ok(())
}
307
/// Runs a mixed read/write benchmark against `benchmark_table` and prints a summary.
///
/// Out of every 100 consecutive operation indices, the first `read_pct` are reads and the
/// rest are writes. With one worker the loop runs inline; with more, `ops` is split across
/// spawned tokio tasks (any remainder is spread over the first workers) and each task
/// returns its own counters, which are summed after all tasks have been joined.
///
/// If the benchmark table cannot be created, the work is delegated to
/// [`run_simple_mixed_benchmark`]. A failure to seed the table only prints a warning.
///
/// # Arguments
/// * `database` - database handle; cloned into an `Arc` so worker tasks can share it.
/// * `read_pct` - target percentage of reads (values of 100 or more make every operation a read).
/// * `ops` - total number of operations to issue.
/// * `threads` - number of concurrent workers; `0` is treated as `1`.
///
/// # Errors
/// Individual query failures and failed worker tasks are counted and reported, not
/// returned. Only an error coming from the fallback benchmark is propagated.
#[cfg(feature = "state_machine")]
async fn run_mixed_benchmark(
    database: &Database,
    read_pct: u8,
    ops: u64,
    threads: u32,
) -> Result<()> {
    use tokio::task::JoinSet;

    /// Counters collected by one worker (or by the inline single-threaded loop).
    #[derive(Default)]
    struct Tally {
        reads: u64,
        writes: u64,
        ok_reads: u64,
        ok_writes: u64,
        failed: u64,
        read_latency: Duration,
        write_latency: Duration,
    }

    impl Tally {
        /// Records one attempted operation; latency is only accumulated on success.
        fn record(&mut self, is_read: bool, succeeded: bool, latency: Duration) {
            if is_read {
                self.reads += 1;
            } else {
                self.writes += 1;
            }
            if !succeeded {
                self.failed += 1;
            } else if is_read {
                self.ok_reads += 1;
                self.read_latency += latency;
            } else {
                self.ok_writes += 1;
                self.write_latency += latency;
            }
        }

        /// Adds another worker's counters into this one.
        fn merge(&mut self, other: Tally) {
            self.reads += other.reads;
            self.writes += other.writes;
            self.ok_reads += other.ok_reads;
            self.ok_writes += other.ok_writes;
            self.failed += other.failed;
            self.read_latency += other.read_latency;
            self.write_latency += other.write_latency;
        }
    }

    let database = Arc::new(database.clone());
    println!("🔄 Running mixed benchmark");
    println!(
        "Operations: {}, Threads: {}, Read%: {}",
        ops, threads, read_pct
    );

    if let Err(e) = setup_benchmark_table(&database).await {
        println!("⚠️ Warning: Could not create benchmark table: {}", e);
        println!("Using simplified mixed benchmark...");
        return run_simple_mixed_benchmark(&database, read_pct, ops, threads).await;
    }

    match populate_benchmark_data(&database, 500).await {
        Ok(rows) => println!("✅ Benchmark table populated with {} rows", rows),
        Err(e) => println!("⚠️ Warning: Could not populate data: {}", e),
    }

    let read_pct = u64::from(read_pct);
    // Zero workers would divide by zero when splitting the work; run inline instead.
    let workers = u64::from(threads.max(1));

    let pb = create_progress_bar(ops, "Mixed workload");
    let start = Instant::now();
    let mut totals = Tally::default();
    let mut crashed_workers = 0u64;

    if workers == 1 {
        for i in 0..ops {
            // `i % 100` walks 0..100, so exactly `read_pct` of every 100 operations are
            // reads. (`(i * 100) % 100` is always 0 and would make every operation a read.)
            let is_read = i % 100 < read_pct;

            let query = if is_read {
                match i % 3 {
                    0 => "SELECT * FROM benchmark_table LIMIT 10".to_string(),
                    1 => format!("SELECT * FROM benchmark_table WHERE id = {}", (i % 500) + 1),
                    _ => "SELECT COUNT(*) FROM benchmark_table".to_string(),
                }
            } else {
                match i % 2 {
                    0 => format!(
                        "INSERT INTO benchmark_table (id, name, value, created_at) VALUES ({}, 'mixed_user_{}', {}, '{}')",
                        3000000 + i,
                        i,
                        i * 5,
                        chrono::Utc::now().format("%Y-%m-%d %H:%M:%S")
                    ),
                    _ => format!(
                        "UPDATE benchmark_table SET value = {} WHERE id <= {}",
                        i * 7,
                        (i % 100) + 1
                    ),
                }
            };

            // Time only the database call, not the query formatting above.
            let op_start = Instant::now();
            let succeeded = database.execute(&query).await.is_ok();
            totals.record(is_read, succeeded, op_start.elapsed());

            pb.inc(1);
            if i % 100 == 0 {
                pb.set_message(format!(
                    "Mixed operation {} (R:{} W:{} S:{} F:{})",
                    i,
                    totals.reads,
                    totals.writes,
                    totals.ok_reads + totals.ok_writes,
                    totals.failed
                ));
            }
        }
    } else {
        let base_share = ops / workers;
        let remainder = ops % workers;
        let mut tasks = JoinSet::new();

        for worker in 0..workers {
            // The first `remainder` workers take one extra operation so that the shares
            // add up to exactly `ops`.
            let share = base_share + u64::from(worker < remainder);
            let database = Arc::clone(&database);
            // Clone the bar for the task instead of wrapping it in a mutex.
            let pb = pb.clone();

            tasks.spawn(async move {
                let mut tally = Tally::default();
                // Give each worker its own id range for inserted rows.
                let id_base = worker * 1000000 + 4000000;

                for i in 0..share {
                    // Offset by the worker index so workers do not all switch between
                    // reads and writes in lockstep.
                    let seq = worker + i;
                    let is_read = seq % 100 < read_pct;

                    let query = if is_read {
                        format!(
                            "SELECT * FROM benchmark_table WHERE id = {} LIMIT 5",
                            (seq % 500) + 1
                        )
                    } else {
                        format!(
                            "INSERT INTO benchmark_table (id, name, value, created_at) VALUES ({}, 'thread_{}_mixed_{}', {}, '{}')",
                            id_base + i,
                            worker,
                            i,
                            seq * 3,
                            chrono::Utc::now().format("%Y-%m-%d %H:%M:%S")
                        )
                    };

                    let op_start = Instant::now();
                    let succeeded = database.execute(&query).await.is_ok();
                    tally.record(is_read, succeeded, op_start.elapsed());

                    pb.inc(1);
                    if i % 50 == 0 {
                        pb.set_message(format!(
                            "Thread {} - R:{} W:{}",
                            worker, tally.reads, tally.writes
                        ));
                    }
                }

                tally
            });
        }

        // A worker that panicked or was cancelled yields a join error; count it instead
        // of silently dropping its operations from the report.
        while let Some(joined) = tasks.join_next().await {
            match joined {
                Ok(tally) => totals.merge(tally),
                Err(_) => crashed_workers += 1,
            }
        }
    }

    pb.finish_with_message("Mixed benchmark completed");
    let duration = start.elapsed();

    // Both helpers return 0.0 for an empty denominator so the report never shows NaN.
    let percent = |part: u64, whole: u64| {
        if whole > 0 {
            part as f64 / whole as f64 * 100.0
        } else {
            0.0
        }
    };
    let avg_ms = |sum: Duration, count: u64| {
        if count > 0 {
            sum.as_secs_f64() * 1000.0 / count as f64
        } else {
            0.0
        }
    };

    let total_ops = totals.reads + totals.writes;
    let successful_ops = totals.ok_reads + totals.ok_writes;

    println!("\n📊 Mixed Benchmark Results:");
    println!(" Total time: {:.2}s", duration.as_secs_f64());
    println!(" Total operations: {} (target: {})", total_ops, ops);
    println!(
        " Successful operations: {} ({:.1}%)",
        successful_ops,
        percent(successful_ops, successful_ops + totals.failed)
    );
    println!(" Failed operations: {}", totals.failed);
    if crashed_workers > 0 {
        println!(
            "⚠️ Warning: {} worker task(s) did not complete",
            crashed_workers
        );
    }
    println!(
        " Operations/sec: {:.2}",
        total_ops as f64 / duration.as_secs_f64()
    );
    println!(
        " Read operations: {} ({:.1}% of total, target: {}%)",
        totals.reads,
        percent(totals.reads, total_ops),
        read_pct
    );
    println!(
        " Write operations: {} ({:.1}% of total)",
        totals.writes,
        percent(totals.writes, total_ops)
    );
    // Latency is only accumulated for successful operations, so average over those.
    if totals.ok_reads > 0 {
        println!(
            " Average read latency: {:.2}ms",
            avg_ms(totals.read_latency, totals.ok_reads)
        );
    }
    if totals.ok_writes > 0 {
        println!(
            " Average write latency: {:.2}ms",
            avg_ms(totals.write_latency, totals.ok_writes)
        );
    }
    println!(" Concurrency: {} thread(s)", workers);

    Ok(())
}
555
/// Fallback mixed benchmark used when the benchmark table cannot be created.
///
/// Reads are a `COUNT(*)` on `system.tables`; writes are simulated with a 200 µs sleep
/// because no writable table is available. Out of every 100 consecutive operation
/// indices, the first `read_pct` are reads.
///
/// # Arguments
/// * `database` - database handle the read queries are executed against.
/// * `read_pct` - target percentage of reads (values of 100 or more make every operation a read).
/// * `ops` - total number of operations to issue.
/// * `_threads` - accepted for signature parity with the other runners; unused.
///
/// # Errors
/// Read failures are ignored (the operation is still counted as a read); the function
/// always returns `Ok(())`.
#[cfg(feature = "state_machine")]
async fn run_simple_mixed_benchmark(
    database: &Database,
    read_pct: u8,
    ops: u64,
    _threads: u32,
) -> Result<()> {
    let read_pct = u64::from(read_pct);
    let pb = create_progress_bar(ops, "Simple mixed");
    let start = Instant::now();
    let mut read_ops = 0u64;
    let mut write_ops = 0u64;

    for i in 0..ops {
        // `i % 100` walks 0..100, so exactly `read_pct` of every 100 operations are reads.
        // (`(i * 100) % 100` is always 0 and would make every operation a read.)
        let is_read = i % 100 < read_pct;

        if is_read {
            let _ = database.execute("SELECT COUNT(*) FROM system.tables").await;
            read_ops += 1;
        } else {
            // Stand-in for a write: there is no table to write to in this fallback.
            tokio::time::sleep(Duration::from_micros(200)).await;
            write_ops += 1;
        }

        pb.inc(1);
    }

    pb.finish_with_message("Simple mixed benchmark completed");
    let duration = start.elapsed();

    println!("\n📊 Simple Mixed Benchmark Results:");
    println!(" Total time: {:.2}s", duration.as_secs_f64());
    println!(" Read operations: {}", read_ops);
    println!(" Write operations: {} (simulated)", write_ops);
    println!(
        " Operations/sec: {:.2}",
        ops as f64 / duration.as_secs_f64()
    );

    Ok(())
}
599
/// Builds a progress bar of length `total` whose line starts with `prefix`.
///
/// The line shows elapsed time, a 40-column cyan/blue bar drawn with `=>-`, the position
/// out of the length, the ETA and the current message.
///
/// # Panics
/// Panics if the template built from `prefix` is rejected by `ProgressStyle::template`.
#[cfg(feature = "state_machine")]
fn create_progress_bar(total: u64, prefix: &str) -> ProgressBar {
    let bar = ProgressBar::new(total);

    // The prefix is spliced into the template text; doubled braces are literal braces
    // for `format!` and become the placeholders the progress style expands.
    let layout = format!(
        "{} [{{elapsed_precise}}] [{{bar:40.cyan/blue}}] {{pos}}/{{len}} ({{eta}}) {{msg}}",
        prefix
    );
    let style = ProgressStyle::default_bar()
        .template(&layout)
        .unwrap()
        .progress_chars("=>-");

    bar.set_style(style);
    bar
}
614
/// Creates `benchmark_table` if it does not already exist.
///
/// The table has a `bigint` primary key `id` plus `name` (text), `value` (bigint) and
/// `created_at` (timestamp) columns.
///
/// # Errors
/// Returns an error prefixed with "Failed to create benchmark table" when the
/// `CREATE TABLE` statement fails.
#[cfg(feature = "state_machine")]
async fn setup_benchmark_table(database: &Database) -> Result<()> {
    let create_table_sql = r#"
        CREATE TABLE IF NOT EXISTS benchmark_table (
            id bigint PRIMARY KEY,
            name text,
            value bigint,
            created_at timestamp
        )
    "#;

    match database.execute(create_table_sql).await {
        Ok(_) => Ok(()),
        Err(e) => Err(anyhow::anyhow!(
            "Failed to create benchmark table: {}",
            e
        )),
    }
}
634
/// Seeds `benchmark_table` with `num_rows` rows (ids `1..=num_rows`).
///
/// If a `COUNT(*)` query shows the table already holds at least `num_rows` rows, nothing
/// is inserted and the existing count is returned. A failed or unparsable count query is
/// treated as "unknown" and the rows are inserted anyway.
///
/// # Arguments
/// * `database` - database handle the statements are executed against.
/// * `num_rows` - number of rows to insert.
///
/// # Returns
/// The existing row count when the table was already populated, otherwise the number of
/// inserts that succeeded.
///
/// # Errors
/// Insert failures are not returned; they are counted and reported in a warning that
/// includes the first error encountered.
#[cfg(feature = "state_machine")]
async fn populate_benchmark_data(database: &Database, num_rows: u64) -> Result<u64> {
    if let Ok(result) = database
        .execute("SELECT COUNT(*) as count FROM benchmark_table")
        .await
    {
        // The count is read through its string form, as the value type is opaque here.
        let existing = result
            .rows
            .first()
            .and_then(|row| row.get("count"))
            .and_then(|value| value.to_string().parse::<u64>().ok());
        if let Some(existing_count) = existing {
            if existing_count >= num_rows {
                return Ok(existing_count);
            }
        }
    }

    println!("📦 Populating benchmark table with {} rows...", num_rows);

    let mut inserted = 0u64;
    let mut failed = 0u64;
    let mut first_error: Option<String> = None;
    let batch_size = 50;

    for batch_start in (0..num_rows).step_by(batch_size) {
        let batch_end = (batch_start + batch_size as u64).min(num_rows);

        for i in batch_start..batch_end {
            let insert_sql = format!(
                "INSERT INTO benchmark_table (id, name, value, created_at) VALUES ({}, 'user_{}', {}, '{}')",
                i + 1,
                i,
                i * 100,
                chrono::Utc::now().format("%Y-%m-%d %H:%M:%S")
            );

            match database.execute(&insert_sql).await {
                Ok(_) => inserted += 1,
                Err(e) => {
                    // Keep going (seeding is best-effort) but remember what went wrong.
                    failed += 1;
                    if first_error.is_none() {
                        first_error = Some(e.to_string());
                    }
                }
            }
        }

        // Brief pause after every fourth batch (batch starts at multiples of 200).
        if batch_start % 200 == 0 {
            tokio::time::sleep(Duration::from_millis(10)).await;
        }
    }

    if failed > 0 {
        println!(
            "⚠️ Warning: {} of {} benchmark rows could not be inserted (first error: {})",
            failed,
            num_rows,
            first_error.as_deref().unwrap_or("unknown")
        );
    }

    Ok(inserted)
}