use criterion::{Criterion, criterion_group, criterion_main};
use recoco::base::schema::{BasicValueType, EnrichedValueType, FieldSchema, ValueType};
use recoco::base::value::{BasicValue, FieldValues, KeyPart, KeyValue};
use std::hint::black_box;
use std::sync::Arc;
use std::time::Duration;
use thread_flow::monitoring::performance::PerformanceMetrics;
use thread_flow::targets::d1::D1ExportContext;
/// Builds a [`FieldSchema`] called `name` whose type is the basic `value_type`.
///
/// `nullable` is forwarded unchanged and the type attributes are left at their
/// `Default` value.
fn test_field_schema(name: &str, value_type: BasicValueType, nullable: bool) -> FieldSchema {
    let enriched = EnrichedValueType {
        typ: ValueType::Basic(value_type),
        nullable,
        attrs: Default::default(),
    };
    FieldSchema::new(name, enriched)
}
fn create_benchmark_context() -> D1ExportContext {
let metrics = PerformanceMetrics::new();
let key_schema = vec![
test_field_schema("content_hash", BasicValueType::Str, false),
test_field_schema("file_path", BasicValueType::Str, false),
];
let value_schema = vec![
test_field_schema("symbol_name", BasicValueType::Str, false),
test_field_schema("symbol_type", BasicValueType::Str, false),
test_field_schema("line_number", BasicValueType::Int64, false),
];
D1ExportContext::new_with_default_client(
"benchmark-database".to_string(),
"code_symbols".to_string(),
"benchmark-account".to_string(),
"benchmark-token".to_string(),
key_schema,
value_schema,
metrics,
)
.expect("Failed to create benchmark context")
}
/// Benchmarks upsert/delete statement construction on a [`D1ExportContext`].
///
/// Registers three benchmarks in the `statement_generation` group:
/// * `build_upsert_statement` - one `build_upsert_stmt` call per iteration;
/// * `build_delete_statement` - one `build_delete_stmt` call per iteration;
/// * `build_10_upsert_statements` - ten `build_upsert_stmt` calls per
///   iteration, each with a distinct key and a clone of the same values.
fn bench_statement_generation(c: &mut Criterion) {
    let context = create_benchmark_context();
    let mut group = c.benchmark_group("statement_generation");

    let sample_key = KeyValue(Box::new([
        KeyPart::Str("abc123def456".into()),
        KeyPart::Str("src/main.rs".into()),
    ]));
    let sample_values = FieldValues {
        fields: vec![
            recoco::base::value::Value::Basic(BasicValue::Str("main".into())),
            recoco::base::value::Value::Basic(BasicValue::Str("function".into())),
            recoco::base::value::Value::Basic(BasicValue::Int64(42)),
        ],
    };

    group.bench_function("build_upsert_statement", |bencher| {
        bencher.iter(|| {
            let stmt = context.build_upsert_stmt(&sample_key, &sample_values);
            let _ = black_box(stmt);
        })
    });

    group.bench_function("build_delete_statement", |bencher| {
        bencher.iter(|| {
            let stmt = context.build_delete_stmt(&sample_key);
            let _ = black_box(stmt);
        })
    });

    group.bench_function("build_10_upsert_statements", |bencher| {
        // Inputs are prepared before the timed loop starts.
        let mut inputs = Vec::with_capacity(10);
        for i in 0..10 {
            let key = KeyValue(Box::new([
                KeyPart::Str(format!("hash{:08x}", i).into()),
                KeyPart::Str(format!("src/file{}.rs", i).into()),
            ]));
            inputs.push((key, sample_values.clone()));
        }

        bencher.iter(|| {
            inputs.iter().for_each(|(key, values)| {
                let _ = black_box(context.build_upsert_stmt(key, values));
            });
        })
    });

    group.finish();
}
/// Benchmarks the query cache attached to a [`D1ExportContext`].
///
/// Only compiled with the `caching` feature. The cache is first populated with
/// 100 entries keyed `warm00000000` through `warm00000063` (hex), then the
/// `cache_operations` group measures:
/// * `cache_hit_lookup` - `get` for a key that was pre-populated;
/// * `cache_miss_lookup` - `get` for a key that was never inserted;
/// * `cache_insert` - `insert` with a new key on every iteration;
/// * `cache_stats_retrieval` - `D1ExportContext::cache_stats`;
/// * `cache_entry_count` - `query_cache.entry_count`.
///
/// Every async call is driven through `Runtime::block_on`, so that call's
/// overhead is part of each async measurement.
///
/// # Panics
///
/// Panics if the Tokio runtime or the benchmark context cannot be created.
#[cfg(feature = "caching")]
fn bench_cache_operations(c: &mut Criterion) {
    let mut group = c.benchmark_group("cache_operations");
    let context = create_benchmark_context();
    let runtime = tokio::runtime::Runtime::new().expect("Failed to create Tokio runtime");

    // Pre-populate the cache so the hit benchmark has something to find.
    runtime.block_on(async {
        for i in 0..100 {
            let key = format!("warm{:08x}", i);
            context
                .query_cache
                .insert(key, serde_json::json!({"value": i}))
                .await;
        }
    });

    group.bench_function("cache_hit_lookup", |b| {
        // Built once: allocating the key inside the timed closure would add a
        // `String` allocation to every measured lookup.
        let hit_key = "warm00000000".to_string();
        b.iter(|| {
            runtime.block_on(async {
                let _ = black_box(context.query_cache.get(&hit_key).await);
            });
        });
    });

    group.bench_function("cache_miss_lookup", |b| {
        // Same reasoning as above: keep the key allocation out of the timed loop.
        let miss_key = "nonexistent".to_string();
        b.iter(|| {
            runtime.block_on(async {
                let _ = black_box(context.query_cache.get(&miss_key).await);
            });
        });
    });

    group.bench_function("cache_insert", |b| {
        // The counter makes every inserted key distinct; building the key is
        // deliberately part of the measured work here.
        let mut counter = 0u64;
        b.iter(|| {
            runtime.block_on(async {
                let key = format!("insert{:016x}", counter);
                counter += 1;
                context
                    .query_cache
                    .insert(key, serde_json::json!({"value": counter}))
                    .await;
            });
        });
    });

    group.bench_function("cache_stats_retrieval", |b| {
        b.iter(|| {
            runtime.block_on(async {
                let _ = black_box(context.cache_stats().await);
            });
        });
    });

    group.bench_function("cache_entry_count", |b| {
        b.iter(|| {
            let _ = black_box(context.query_cache.entry_count());
        });
    });

    group.finish();
}
/// Benchmarks the recording and read-out methods of [`PerformanceMetrics`].
///
/// All benchmarks in the `metrics_tracking` group share one metrics instance,
/// so the read-out benchmarks run on whatever the earlier recording benchmarks
/// accumulated.
fn bench_metrics_tracking(c: &mut Criterion) {
    let metrics = PerformanceMetrics::new();
    let mut group = c.benchmark_group("metrics_tracking");

    // --- recording ---------------------------------------------------------
    group.bench_function("record_cache_hit", |bencher| {
        bencher.iter(|| {
            metrics.record_cache_hit();
        })
    });
    group.bench_function("record_cache_miss", |bencher| {
        bencher.iter(|| {
            metrics.record_cache_miss();
        })
    });
    group.bench_function("record_query_10ms", |bencher| {
        bencher.iter(|| {
            metrics.record_query(Duration::from_millis(10), true);
        })
    });
    group.bench_function("record_query_50ms", |bencher| {
        bencher.iter(|| {
            metrics.record_query(Duration::from_millis(50), true);
        })
    });
    group.bench_function("record_query_error", |bencher| {
        bencher.iter(|| {
            metrics.record_query(Duration::from_millis(100), false);
        })
    });

    // --- read-out ----------------------------------------------------------
    group.bench_function("get_cache_stats", |bencher| {
        bencher.iter(|| {
            black_box(metrics.cache_stats());
        })
    });
    group.bench_function("get_query_stats", |bencher| {
        bencher.iter(|| {
            black_box(metrics.query_stats());
        })
    });
    group.bench_function("export_prometheus", |bencher| {
        bencher.iter(|| {
            black_box(metrics.export_prometheus());
        })
    });

    group.finish();
}
/// Benchmarks construction of the benchmark fixtures themselves.
///
/// The `context_creation` group contains:
/// * `create_d1_context` - a fresh [`PerformanceMetrics`], two schema clones
///   and one `D1ExportContext::new_with_default_client` call per iteration;
/// * `create_performance_metrics` - `PerformanceMetrics::new()` alone.
///
/// The value returned by `new_with_default_client` is handed to `black_box`
/// without being unwrapped.
fn bench_context_creation(c: &mut Criterion) {
    let key_schema = vec![
        test_field_schema("content_hash", BasicValueType::Str, false),
        test_field_schema("file_path", BasicValueType::Str, false),
    ];
    let value_schema = vec![
        test_field_schema("symbol_name", BasicValueType::Str, false),
        test_field_schema("symbol_type", BasicValueType::Str, false),
        test_field_schema("line_number", BasicValueType::Int64, false),
    ];

    let mut group = c.benchmark_group("context_creation");

    group.bench_function("create_d1_context", |bencher| {
        bencher.iter(|| {
            let created = D1ExportContext::new_with_default_client(
                "benchmark-database".to_string(),
                "code_symbols".to_string(),
                "benchmark-account".to_string(),
                "benchmark-token".to_string(),
                key_schema.clone(),
                value_schema.clone(),
                PerformanceMetrics::new(),
            );
            let _ = black_box(created);
        })
    });

    group.bench_function("create_performance_metrics", |bencher| {
        bencher.iter(|| {
            let fresh = PerformanceMetrics::new();
            let _ = black_box(fresh);
        })
    });

    group.finish();
}
/// Benchmarks the value-to-JSON conversion helpers exported by
/// `thread_flow::targets::d1`.
///
/// The `value_conversion` group covers `basic_value_to_json` (string, integer
/// and boolean inputs), `key_part_to_json` (string and integer inputs) and
/// `value_to_json` (a basic string value). All inputs are built once, outside
/// the timed loops.
fn bench_value_conversion(c: &mut Criterion) {
    use thread_flow::targets::d1::{basic_value_to_json, key_part_to_json, value_to_json};

    let mut group = c.benchmark_group("value_conversion");

    // Conversion inputs.
    let str_input = BasicValue::Str("test_string".into());
    let int_input = BasicValue::Int64(42);
    let bool_input = BasicValue::Bool(true);
    let str_key_part = KeyPart::Str("test_key".into());
    let int_key_part = KeyPart::Int64(123456);
    let wrapped_value = recoco::base::value::Value::Basic(BasicValue::Str("test".into()));

    group.bench_function("basic_value_to_json_str", |bencher| {
        bencher.iter(|| {
            let json = basic_value_to_json(&str_input);
            let _ = black_box(json);
        })
    });
    group.bench_function("basic_value_to_json_int", |bencher| {
        bencher.iter(|| {
            let json = basic_value_to_json(&int_input);
            let _ = black_box(json);
        })
    });
    group.bench_function("basic_value_to_json_bool", |bencher| {
        bencher.iter(|| {
            let json = basic_value_to_json(&bool_input);
            let _ = black_box(json);
        })
    });

    group.bench_function("key_part_to_json_str", |bencher| {
        bencher.iter(|| {
            let json = key_part_to_json(&str_key_part);
            let _ = black_box(json);
        })
    });
    group.bench_function("key_part_to_json_int", |bencher| {
        bencher.iter(|| {
            let json = key_part_to_json(&int_key_part);
            let _ = black_box(json);
        })
    });

    group.bench_function("value_to_json", |bencher| {
        bencher.iter(|| {
            let json = value_to_json(&wrapped_value);
            let _ = black_box(json);
        })
    });

    group.finish();
}
/// Benchmarks context construction when one `reqwest::Client` is shared
/// through an [`Arc`].
///
/// The `http_pool_performance` group contains:
/// * `create_context_with_shared_client` - one `D1ExportContext::new` call per
///   iteration using a clone of the shared client handle (result not
///   unwrapped);
/// * `arc_clone_http_client` - `Arc::clone` of the client alone;
/// * `create_10_contexts_shared_pool` - ten contexts per iteration, each with
///   its own metrics and schemas but the same client.
///
/// # Panics
///
/// Panics if the HTTP client cannot be built, or if a context in the
/// ten-context benchmark fails to construct.
fn bench_http_pool_performance(c: &mut Criterion) {
    let client_builder = reqwest::Client::builder()
        .pool_max_idle_per_host(10)
        .pool_idle_timeout(Some(Duration::from_secs(90)))
        .tcp_keepalive(Some(Duration::from_secs(60)))
        .http2_keep_alive_interval(Some(Duration::from_secs(30)))
        .timeout(Duration::from_secs(30));
    let http_client = Arc::new(
        client_builder
            .build()
            .expect("Failed to create HTTP client"),
    );

    let mut group = c.benchmark_group("http_pool_performance");

    group.bench_function("create_context_with_shared_client", |bencher| {
        let metrics = PerformanceMetrics::new();
        let key_schema = vec![test_field_schema("id", BasicValueType::Int64, false)];
        let value_schema = vec![test_field_schema("data", BasicValueType::Str, false)];

        bencher.iter(|| {
            let created = D1ExportContext::new(
                "test-db".to_string(),
                "test_table".to_string(),
                "test-account".to_string(),
                "test-token".to_string(),
                Arc::clone(&http_client),
                key_schema.clone(),
                value_schema.clone(),
                metrics.clone(),
            );
            let _ = black_box(created);
        })
    });

    group.bench_function("arc_clone_http_client", |bencher| {
        bencher.iter(|| {
            let handle = Arc::clone(&http_client);
            let _ = black_box(handle);
        })
    });

    group.bench_function("create_10_contexts_shared_pool", |bencher| {
        bencher.iter(|| {
            let mut contexts = Vec::with_capacity(10);
            for i in 0..10 {
                let metrics = PerformanceMetrics::new();
                let key_schema = vec![test_field_schema("id", BasicValueType::Int64, false)];
                let value_schema = vec![test_field_schema("data", BasicValueType::Str, false)];
                let context = D1ExportContext::new(
                    format!("db-{}", i),
                    format!("table_{}", i),
                    "account".to_string(),
                    "token".to_string(),
                    Arc::clone(&http_client),
                    key_schema,
                    value_schema,
                    metrics,
                )
                .expect("Failed to create context");
                contexts.push(context);
            }
            black_box(contexts)
        })
    });

    group.finish();
}
/// Benchmarks a simulated "check cache, otherwise build statement and store"
/// pipeline.
///
/// Only compiled with the `caching` feature. One hundred key/value entries are
/// generated and the cache is pre-populated with keys `query_00000000` through
/// `query_00000063` (hex). The `e2e_query_pipeline` group then measures:
/// * `pipeline_cache_hit_100_percent` - lookups that cycle through the
///   pre-populated keys;
/// * `pipeline_cache_miss` - a new `miss_` key each iteration; when the lookup
///   returns nothing an upsert statement is built and a placeholder result is
///   inserted;
/// * `pipeline_90_percent_cache_hit` - every tenth iteration uses a new `new_`
///   key, the others use a pre-populated `query_` key.
///
/// No request is sent from this function; the "query result" is a fixed JSON
/// placeholder.
#[cfg(feature = "caching")]
fn bench_e2e_query_pipeline(c: &mut Criterion) {
    let context = create_benchmark_context();
    let runtime = tokio::runtime::Runtime::new().unwrap();
    let mut group = c.benchmark_group("e2e_query_pipeline");

    // Fixture entries reused by the miss paths below.
    let mut test_entries = Vec::with_capacity(100);
    for i in 0..100 {
        let key = KeyValue(Box::new([
            KeyPart::Str(format!("hash{:08x}", i).into()),
            KeyPart::Str(format!("src/file{}.rs", i).into()),
        ]));
        let fields = vec![
            recoco::base::value::Value::Basic(BasicValue::Str(format!("func_{}", i).into())),
            recoco::base::value::Value::Basic(BasicValue::Str("function".into())),
            recoco::base::value::Value::Basic(BasicValue::Int64(i as i64)),
        ];
        test_entries.push((key, FieldValues { fields }));
    }

    // Warm the cache with one entry per fixture.
    runtime.block_on(async {
        for (i, entry) in test_entries.iter().enumerate() {
            let (key, values) = entry;
            let payload = serde_json::json!({
                "key": format!("{:?}", key),
                "values": format!("{:?}", values),
            });
            context
                .query_cache
                .insert(format!("query_{:08x}", i), payload)
                .await;
        }
    });

    group.bench_function("pipeline_cache_hit_100_percent", |bencher| {
        let mut iteration = 0;
        bencher.iter(|| {
            runtime.block_on(async {
                let lookup_key = format!("query_{:08x}", iteration % 100);
                black_box(context.query_cache.get(&lookup_key).await);
                iteration += 1;
            });
        });
    });

    group.bench_function("pipeline_cache_miss", |bencher| {
        let mut idx = 0;
        bencher.iter(|| {
            runtime.block_on(async {
                let current = idx;
                idx += 1;

                let (key, values) = &test_entries[current % 100];
                let query_key = format!("miss_{:08x}", current);
                if context.query_cache.get(&query_key).await.is_some() {
                    return;
                }

                // Miss path: build the statement and store a placeholder result.
                let _ = black_box(context.build_upsert_stmt(key, values));
                let placeholder = serde_json::json!({"simulated": true});
                context.query_cache.insert(query_key, placeholder).await;
            });
        });
    });

    group.bench_function("pipeline_90_percent_cache_hit", |bencher| {
        let mut idx = 0;
        bencher.iter(|| {
            runtime.block_on(async {
                let current = idx;
                idx += 1;

                let (key, values) = &test_entries[current % 100];
                // Every tenth iteration asks for a key that was never warmed.
                let query_key = if current % 10 != 0 {
                    format!("query_{:08x}", current % 100)
                } else {
                    format!("new_{:08x}", current)
                };
                if context.query_cache.get(&query_key).await.is_some() {
                    return;
                }

                let _ = black_box(context.build_upsert_stmt(key, values));
                let placeholder = serde_json::json!({"simulated": true});
                context.query_cache.insert(query_key, placeholder).await;
            });
        });
    });

    group.finish();
}
fn bench_batch_operations(c: &mut Criterion) {
let mut group = c.benchmark_group("batch_operations");
let context = create_benchmark_context();
let batch_10: Vec<_> = (0..10).map(create_test_entry).collect();
let batch_100: Vec<_> = (0..100).map(create_test_entry).collect();
let batch_1000: Vec<_> = (0..1000).map(create_test_entry).collect();
group.bench_function("batch_upsert_10_entries", |b| {
b.iter(|| {
for (key, values) in &batch_10 {
let _ = black_box(context.build_upsert_stmt(key, values));
}
});
});
group.bench_function("batch_upsert_100_entries", |b| {
b.iter(|| {
for (key, values) in &batch_100 {
let _ = black_box(context.build_upsert_stmt(key, values));
}
});
});
group.bench_function("batch_upsert_1000_entries", |b| {
b.iter(|| {
for (key, values) in &batch_1000 {
let _ = black_box(context.build_upsert_stmt(key, values));
}
});
});
group.bench_function("batch_delete_10_entries", |b| {
b.iter(|| {
for (key, _) in &batch_10 {
let _ = black_box(context.build_delete_stmt(key));
}
});
});
group.bench_function("batch_delete_100_entries", |b| {
b.iter(|| {
for (key, _) in &batch_100 {
let _ = black_box(context.build_delete_stmt(key));
}
});
});
group.finish();
}
/// Produces the key/value pair for synthetic entry number `idx`.
///
/// The key is (`hash<idx as 8-digit hex>`, `src/file<idx>.rs`); the values are
/// (`symbol_<idx>`, `"function"`, `idx` as a 64-bit integer).
fn create_test_entry(idx: usize) -> (KeyValue, FieldValues) {
    let content_hash = format!("hash{:08x}", idx);
    let file_path = format!("src/file{}.rs", idx);
    let key_parts = [
        KeyPart::Str(content_hash.into()),
        KeyPart::Str(file_path.into()),
    ];

    let symbol_name = format!("symbol_{}", idx);
    let fields = vec![
        recoco::base::value::Value::Basic(BasicValue::Str(symbol_name.into())),
        recoco::base::value::Value::Basic(BasicValue::Str("function".into())),
        recoco::base::value::Value::Basic(BasicValue::Int64(idx as i64)),
    ];

    (KeyValue(Box::new(key_parts)), FieldValues { fields })
}
/// Benchmarks a mixed hit/miss cache workload with a larger sample count.
///
/// Only compiled with the `caching` feature. The cache is pre-populated with
/// 1000 `warm` keys and the group's sample size is set to 1000. In
/// `realistic_workload_p95`, every twentieth iteration looks up a new `miss`
/// key and the rest look up a pre-populated `warm` key; when a lookup returns
/// nothing, an entry is generated, its upsert statement is built and a
/// placeholder is inserted under that key.
///
/// This function only registers the benchmark; it computes no percentile
/// itself.
#[cfg(feature = "caching")]
fn bench_p95_latency_validation(c: &mut Criterion) {
    let context = create_benchmark_context();
    let runtime = tokio::runtime::Runtime::new().unwrap();

    // Warm the cache before any measurement happens.
    runtime.block_on(async {
        for i in 0..1000 {
            let payload = serde_json::json!({"value": i});
            context
                .query_cache
                .insert(format!("warm{:08x}", i), payload)
                .await;
        }
    });

    let mut group = c.benchmark_group("p95_latency_validation");
    group.sample_size(1000);

    group.bench_function("realistic_workload_p95", |bencher| {
        let mut idx = 0;
        bencher.iter(|| {
            runtime.block_on(async {
                let current = idx;
                idx += 1;

                let query_key = if current % 20 != 0 {
                    format!("warm{:08x}", current % 1000)
                } else {
                    format!("miss{:08x}", current)
                };
                let is_miss = context.query_cache.get(&query_key).await.is_none();
                if is_miss {
                    let (key, values) = create_test_entry(current);
                    let _ = black_box(context.build_upsert_stmt(&key, &values));
                    let payload = serde_json::json!({"new": true});
                    context.query_cache.insert(query_key, payload).await;
                }
            });
        });
    });

    group.finish();
}
// Benchmark functions that are defined regardless of the `caching` feature.
criterion_group!(
benches,
bench_statement_generation,
bench_metrics_tracking,
bench_context_creation,
bench_value_conversion,
bench_http_pool_performance,
bench_batch_operations,
);
// Benchmark functions that only exist when the `caching` feature is enabled.
#[cfg(feature = "caching")]
criterion_group!(
cache_benches,
bench_cache_operations,
bench_e2e_query_pipeline,
bench_p95_latency_validation,
);
// Exactly one `criterion_main!` is compiled: with `caching` it includes both
// groups, otherwise only `benches`.
#[cfg(feature = "caching")]
criterion_main!(benches, cache_benches);
#[cfg(not(feature = "caching"))]
criterion_main!(benches);