use criterion::{black_box, criterion_group, criterion_main, BenchmarkId, Criterion, Throughput};
use rust_task_queue::prelude::*;
use rust_task_queue::TaskResult;
use serde::{Deserialize, Serialize};
/// Synthetic task used by the autoscaler benchmarks in this file.
///
/// When `cpu_intensive` is true, `execute` runs a fixed CPU-bound summation;
/// otherwise it runs a busy loop whose length scales with `duration_ms`.
#[derive(Debug, Serialize, Deserialize, Clone, Default)]
struct AutoscalerTestTask {
    // Unique identifier, embedded in the non-CPU-intensive result string.
    id: u32,
    // Scales the busy-loop iteration count (duration_ms * 100 iterations).
    duration_ms: u64,
    // Selects the fixed CPU-bound workload instead of the duration-scaled one.
    cpu_intensive: bool,
}
#[async_trait::async_trait]
impl Task for AutoscalerTestTask {
    /// Performs the synthetic workload and returns a MessagePack-encoded result.
    ///
    /// CPU-intensive tasks do a fixed amount of wrapping arithmetic; others do
    /// busy work proportional to `duration_ms` and encode a completion string.
    async fn execute(&self) -> TaskResult {
        if self.cpu_intensive {
            // Fixed-size arithmetic workload, accumulated with wrapping adds.
            let total = (0..100000u64).fold(0u64, |acc, i| acc.wrapping_add(i * 2));
            Ok(rmp_serde::to_vec(&total)?)
        } else {
            // Busy work scaled by the configured duration.
            let iterations = self.duration_ms * 100;
            let accumulated = (0..iterations).fold(0u64, |acc, i| acc.wrapping_add(i));
            Ok(rmp_serde::to_vec(&format!(
                "completed_{}_{}",
                self.id, accumulated
            ))?)
        }
    }

    /// Stable task name used for registration/routing.
    fn name(&self) -> &str {
        "autoscaler_test_task"
    }

    /// Generous timeout; the synthetic workloads finish well within it.
    fn timeout_seconds(&self) -> u64 {
        30
    }
}
/// Measures the cost of constructing and validating autoscaler configurations.
fn bench_autoscaler_config_creation(c: &mut Criterion) {
    let mut group = c.benchmark_group("autoscaler_config_creation");

    // Default configuration construction.
    group.bench_function("default_autoscaler_config", |b| {
        b.iter(|| {
            black_box(AutoScalerConfig::default());
        })
    });

    // Fully-specified custom configuration construction.
    group.bench_function("custom_autoscaler_config", |b| {
        b.iter(|| {
            let scaling_triggers = rust_task_queue::autoscaler::ScalingTriggers {
                queue_pressure_threshold: 1.0,
                worker_utilization_threshold: 0.85,
                task_complexity_threshold: 1.5,
                error_rate_threshold: 0.05,
                memory_pressure_threshold: 512.0,
            };
            let target_sla = rust_task_queue::autoscaler::SLATargets {
                max_p95_latency_ms: 5000.0,
                min_success_rate: 0.95,
                max_queue_wait_time_ms: 10000.0,
                target_worker_utilization: 0.70,
            };
            let config = AutoScalerConfig {
                min_workers: 1,
                max_workers: 20,
                scale_up_count: 3,
                scale_down_count: 1,
                scaling_triggers,
                enable_adaptive_thresholds: true,
                learning_rate: 0.1,
                adaptation_window_minutes: 30,
                scale_up_cooldown_seconds: 60,
                scale_down_cooldown_seconds: 300,
                consecutive_signals_required: 2,
                target_sla,
            };
            black_box(config);
        })
    });

    // Validation of the default configuration.
    group.bench_function("config_validation", |b| {
        b.iter(|| {
            let config = AutoScalerConfig::default();
            let is_valid = config.validate().is_ok();
            black_box((config, is_valid));
        })
    });

    group.finish();
}
/// Benchmarks the up/down scaling decision against several load levels.
fn bench_scaling_decision_logic(c: &mut Criterion) {
    let mut group = c.benchmark_group("scaling_decision_logic");

    // (label, queued tasks per worker) pairs from light to extreme load.
    let load_scenarios = vec![
        ("low_load", 2.0),
        ("medium_load", 7.0),
        ("high_load", 15.0),
        ("extreme_load", 50.0),
    ];

    for (scenario_name, tasks_per_worker) in load_scenarios {
        group.bench_function(format!("scaling_decision_{}", scenario_name), |b| {
            b.iter(|| {
                let config = AutoScalerConfig::default();
                let current_workers = 2;
                let pressure = config.scaling_triggers.queue_pressure_threshold;

                // Mirror the autoscaler's thresholds: scale up above the
                // pressure threshold, down below 30% of it.
                let wants_up = tasks_per_worker > pressure
                    && current_workers < config.max_workers;
                let wants_down = tasks_per_worker < pressure * 0.3
                    && current_workers > config.min_workers;

                let decision = match (wants_up, wants_down) {
                    (true, _) => "scale_up",
                    (false, true) => "scale_down",
                    (false, false) => "no_change",
                };
                black_box(decision);
            })
        });
    }

    group.finish();
}
/// Benchmarks batch task construction for three synthetic load shapes.
fn bench_load_pattern_analysis(c: &mut Criterion) {
    let mut group = c.benchmark_group("load_pattern_analysis");

    // Builds `count` non-CPU-intensive tasks, each with the given duration.
    fn build_tasks(count: u32, duration_ms: u64) -> Vec<AutoscalerTestTask> {
        (0..count)
            .map(|i| AutoscalerTestTask {
                id: i,
                duration_ms,
                cpu_intensive: false,
            })
            .collect()
    }

    // Many short tasks arriving at once.
    group.bench_function("burst_load_pattern", |b| {
        b.iter(|| {
            black_box(build_tasks(20, 50));
        })
    });

    // Fewer, longer tasks at a steady rate.
    group.bench_function("sustained_load_pattern", |b| {
        b.iter(|| {
            black_box(build_tasks(10, 100));
        })
    });

    // Mid-sized batch of quick tasks.
    group.bench_function("gradual_load_pattern", |b| {
        b.iter(|| {
            black_box(build_tasks(15, 30));
        })
    });

    group.finish();
}
/// Benchmarks construction of CPU-bound, IO-style, and mixed task batches.
fn bench_cpu_vs_io_task_analysis(c: &mut Criterion) {
    let mut group = c.benchmark_group("cpu_vs_io_task_analysis");

    // Ten CPU-bound tasks (duration ignored by the CPU path).
    group.bench_function("cpu_intensive_task_creation", |b| {
        b.iter(|| {
            let mut tasks = Vec::with_capacity(10);
            for i in 0..10 {
                tasks.push(AutoscalerTestTask {
                    id: i,
                    duration_ms: 0,
                    cpu_intensive: true,
                });
            }
            black_box(tasks);
        })
    });

    // Ten duration-driven (IO-style) tasks.
    group.bench_function("io_intensive_task_creation", |b| {
        b.iter(|| {
            let mut tasks = Vec::with_capacity(10);
            for i in 0..10 {
                tasks.push(AutoscalerTestTask {
                    id: i,
                    duration_ms: 100,
                    cpu_intensive: false,
                });
            }
            black_box(tasks);
        })
    });

    // Alternating workload: even ids are duration-driven, odd ids CPU-bound.
    group.bench_function("mixed_task_workload", |b| {
        b.iter(|| {
            let mut tasks = Vec::with_capacity(10);
            for i in 0..10 {
                tasks.push(AutoscalerTestTask {
                    id: i,
                    duration_ms: if i % 2 == 0 { 50 } else { 0 },
                    cpu_intensive: i % 2 != 0,
                });
            }
            black_box(tasks);
        })
    });

    group.finish();
}
/// Benchmarks decision evaluation under different threshold tunings.
fn bench_scaling_thresholds_tuning(c: &mut Criterion) {
    let mut group = c.benchmark_group("scaling_thresholds_tuning");

    // (label, queue-pressure threshold, utilization threshold in percent).
    let threshold_configs = vec![
        ("conservative", 20.0, 5.0),
        ("aggressive", 5.0, 1.0),
        ("balanced", 10.0, 3.0),
    ];

    for (config_name, queue_pressure_threshold, utilization_threshold) in threshold_configs {
        group.bench_function(format!("threshold_config_{}", config_name), |b| {
            b.iter(|| {
                let config = AutoScalerConfig {
                    min_workers: 1,
                    max_workers: 10,
                    scale_up_count: 2,
                    scale_down_count: 1,
                    scaling_triggers: rust_task_queue::autoscaler::ScalingTriggers {
                        queue_pressure_threshold,
                        // Scenario value is in percent; the trigger expects a fraction.
                        worker_utilization_threshold: utilization_threshold / 100.0,
                        task_complexity_threshold: 1.5,
                        error_rate_threshold: 0.05,
                        memory_pressure_threshold: 512.0,
                    },
                    enable_adaptive_thresholds: false,
                    learning_rate: 0.1,
                    adaptation_window_minutes: 30,
                    scale_up_cooldown_seconds: 60,
                    scale_down_cooldown_seconds: 300,
                    consecutive_signals_required: 2,
                    target_sla: rust_task_queue::autoscaler::SLATargets {
                        max_p95_latency_ms: 5000.0,
                        min_success_rate: 0.95,
                        max_queue_wait_time_ms: 10000.0,
                        target_worker_utilization: 0.70,
                    },
                };

                // Evaluate the decision across a spread of load levels.
                let current_workers = 3;
                let mut decisions = Vec::new();
                for &tasks_per_worker in &[2.0, 4.0, 8.0, 12.0, 25.0] {
                    let wants_up = tasks_per_worker
                        > config.scaling_triggers.queue_pressure_threshold
                        && current_workers < config.max_workers;
                    let wants_down = tasks_per_worker
                        < config.scaling_triggers.queue_pressure_threshold * 0.3
                        && current_workers > config.min_workers;
                    decisions.push(if wants_up {
                        "scale_up"
                    } else if wants_down {
                        "scale_down"
                    } else {
                        "no_change"
                    });
                }
                black_box((config, decisions));
            })
        });
    }

    group.finish();
}
/// Benchmarks clamping of scale-up/down step sizes against worker bounds.
fn bench_scale_count_calculation(c: &mut Criterion) {
    let mut group = c.benchmark_group("scale_count_calculation");

    // Scale-up step is limited by remaining headroom to max_workers.
    group.bench_function("scale_up_count_limiting", |b| {
        b.iter(|| {
            let config = AutoScalerConfig {
                min_workers: 1,
                max_workers: 10,
                scale_up_count: 5,
                scale_down_count: 1,
                scaling_triggers: rust_task_queue::autoscaler::ScalingTriggers {
                    queue_pressure_threshold: 0.75,
                    worker_utilization_threshold: 0.80,
                    task_complexity_threshold: 1.5,
                    error_rate_threshold: 0.05,
                    memory_pressure_threshold: 512.0,
                },
                enable_adaptive_thresholds: false,
                learning_rate: 0.1,
                adaptation_window_minutes: 30,
                scale_up_cooldown_seconds: 60,
                scale_down_cooldown_seconds: 300,
                consecutive_signals_required: 2,
                target_sla: rust_task_queue::autoscaler::SLATargets {
                    max_p95_latency_ms: 5000.0,
                    min_success_rate: 0.95,
                    max_queue_wait_time_ms: 10000.0,
                    target_worker_utilization: 0.70,
                },
            };
            let current_workers = 8;
            // Requested 5, but only 2 slots remain below max_workers.
            let actual_scale_count = config
                .scale_up_count
                .min(config.max_workers - current_workers);
            black_box((config, actual_scale_count));
        })
    });

    // Scale-down step is limited by how far we are above min_workers.
    group.bench_function("scale_down_count_limiting", |b| {
        b.iter(|| {
            let config = AutoScalerConfig {
                min_workers: 2,
                max_workers: 10,
                scale_up_count: 2,
                scale_down_count: 3,
                scaling_triggers: rust_task_queue::autoscaler::ScalingTriggers {
                    queue_pressure_threshold: 0.75,
                    worker_utilization_threshold: 0.80,
                    task_complexity_threshold: 1.5,
                    error_rate_threshold: 0.05,
                    memory_pressure_threshold: 512.0,
                },
                enable_adaptive_thresholds: false,
                learning_rate: 0.1,
                adaptation_window_minutes: 30,
                scale_up_cooldown_seconds: 60,
                scale_down_cooldown_seconds: 300,
                consecutive_signals_required: 2,
                target_sla: rust_task_queue::autoscaler::SLATargets {
                    max_p95_latency_ms: 5000.0,
                    min_success_rate: 0.95,
                    max_queue_wait_time_ms: 10000.0,
                    target_worker_utilization: 0.70,
                },
            };
            let current_workers = 4;
            // Requested 3, but only 2 workers can go before hitting min_workers.
            let actual_scale_count = config
                .scale_down_count
                .min(current_workers - config.min_workers);
            black_box((config, actual_scale_count));
        })
    });

    group.finish();
}
/// Benchmarks task-batch construction for several simulated client counts.
fn bench_concurrent_scaling_simulation(c: &mut Criterion) {
    let mut group = c.benchmark_group("concurrent_scaling_simulation");

    for &concurrency in &[2, 4, 8] {
        // Throughput is measured per simulated client.
        group.throughput(Throughput::Elements(concurrency as u64));
        group.bench_with_input(
            BenchmarkId::new("concurrent_task_submission", concurrency),
            &concurrency,
            |b, &concurrency| {
                b.iter(|| {
                    // One batch of five tasks per simulated client, with
                    // globally unique ids across clients.
                    let all_tasks: Vec<Vec<AutoscalerTestTask>> = (0..concurrency)
                        .map(|client_id| {
                            let mut batch = Vec::with_capacity(5);
                            for i in 0..5 {
                                batch.push(AutoscalerTestTask {
                                    id: (client_id * 5 + i) as u32,
                                    duration_ms: 50,
                                    cpu_intensive: false,
                                });
                            }
                            batch
                        })
                        .collect();
                    black_box(all_tasks);
                })
            },
        );
    }

    group.finish();
}
/// Benchmarks building and aggregating simulated per-worker state.
fn bench_worker_management_simulation(c: &mut Criterion) {
    let mut group = c.benchmark_group("worker_management_simulation");

    // Minimal stand-in for a worker's runtime state.
    #[derive(Debug, Clone)]
    #[allow(dead_code)]
    struct WorkerState {
        id: String,
        status: String,
        current_task: Option<String>,
        started_at: std::time::Instant,
    }

    // Ten workers: every third one is idle, the rest are busy with a task.
    fn simulate_workers() -> Vec<WorkerState> {
        (0..10)
            .map(|i| {
                let is_idle = i % 3 == 0;
                WorkerState {
                    id: format!("worker_{}", i),
                    status: if is_idle { "idle" } else { "busy" }.to_string(),
                    current_task: if is_idle {
                        None
                    } else {
                        Some(format!("task_{}", i))
                    },
                    started_at: std::time::Instant::now(),
                }
            })
            .collect()
    }

    // Cost of materializing the worker state vector.
    group.bench_function("worker_state_tracking", |b| {
        b.iter(|| {
            black_box(simulate_workers());
        })
    });

    // Cost of counting workers by status.
    group.bench_function("worker_count_calculation", |b| {
        b.iter(|| {
            let workers = simulate_workers();
            let active_workers = workers.iter().filter(|w| w.status == "busy").count();
            let idle_workers = workers.iter().filter(|w| w.status == "idle").count();
            let total_workers = workers.len();
            black_box((active_workers, idle_workers, total_workers));
        })
    });

    group.finish();
}
/// Benchmarks the tasks-per-worker ratio used as a scaling input signal.
fn bench_tasks_per_worker_calculation(c: &mut Criterion) {
    let mut group = c.benchmark_group("tasks_per_worker_calculation");

    group.bench_function("tasks_per_worker_ratio", |b| {
        b.iter(|| {
            // (worker count, queued tasks); the last entry has zero workers.
            let scenarios = [(5, 10), (3, 15), (8, 4), (0, 10)];
            let mut ratios = Vec::with_capacity(scenarios.len());
            for &(workers, tasks) in &scenarios {
                // Avoid division by zero when no workers are running: the raw
                // task count stands in for the ratio.
                let ratio = if workers > 0 {
                    tasks as f64 / workers as f64
                } else {
                    tasks as f64
                };
                ratios.push(ratio);
            }
            black_box(ratios);
        })
    });

    group.finish();
}
/// Benchmarks the allocation overhead of autoscaler bookkeeping structures.
fn bench_memory_efficiency(c: &mut Criterion) {
    let mut group = c.benchmark_group("memory_efficiency");

    group.bench_function("autoscaler_metadata_overhead", |b| {
        b.iter(|| {
            // Per-worker bookkeeping: worker id -> status string.
            let mut worker_states: std::collections::HashMap<String, String> =
                std::collections::HashMap::new();
            for i in 0..10 {
                worker_states.insert(format!("worker_{}", i), "active".to_string());
            }

            // Per-queue stats: (name, pending, processed, failed).
            let queue_stats = [
                ("high_priority", 5, 100, 2),
                ("normal_priority", 15, 300, 5),
                ("low_priority", 3, 150, 1),
            ];
            let queue_data: Vec<(String, usize, usize, usize)> = queue_stats
                .iter()
                .map(|&(name, pending, processed, failed)| {
                    (name.to_string(), pending, processed, failed)
                })
                .collect();

            black_box((worker_states, queue_data));
        })
    });

    group.finish();
}
// Register every autoscaler benchmark above under one Criterion group and
// generate the benchmark binary's entry point.
criterion_group!(
    benches,
    bench_autoscaler_config_creation,
    bench_scaling_decision_logic,
    bench_load_pattern_analysis,
    bench_cpu_vs_io_task_analysis,
    bench_scaling_thresholds_tuning,
    bench_scale_count_calculation,
    bench_concurrent_scaling_simulation,
    bench_worker_management_simulation,
    bench_tasks_per_worker_calculation,
    bench_memory_efficiency
);
criterion_main!(benches);