use clockworker::ExecutorBuilder;
use futures::future;
use std::io::Write;
use std::sync::Arc;
use std::time::{Duration, Instant};
use tokio::task::LocalSet;
use crate::utils::{Step, Work};
#[allow(dead_code)]
mod utils;
/// Number of unrecorded iterations run before each measured series.
const WARMUP_ITERS: usize = 3;
/// Number of recorded iterations per benchmark configuration.
const BENCH_ITERS: usize = 10;
/// Task counts swept by `run_1a`.
const SPAWN_TASK_COUNTS: &[usize] = &[1_000, 10_000, 100_000];
/// Number of tasks spawned per iteration in `run_1b`.
const YIELD_TASK_COUNT: usize = 1_000;
/// Values swept by `run_1b` for the number of `Step::Yield` steps given to each task.
const YIELDS_PER_TASK: [usize; 3] = [10, 100, 1_000];
/// Number of sleeping tasks spawned per iteration in `run_1c`.
const IO_TASK_COUNT: usize = 100;
/// Number of consecutive sleeps each `run_1c` task performs.
const SLEEPS_PER_TASK: usize = 10;
/// Requested duration of every individual sleep in `run_1c`.
const SLEEP_DURATION: Duration = Duration::from_micros(500);
/// Spawns `n` futures produced by `factory` through `spawn`, awaits every
/// resulting handle in spawn order, and returns the wall-clock time taken.
///
/// The clock is started before the first future is created, so the returned
/// duration covers future construction, spawning, and waiting for all handles.
/// The value produced by awaiting each handle is ignored.
async fn drive<F>(
    spawn: impl Fn(F) -> utils::Handle<()>,
    n: usize,
    factory: impl Fn() -> F,
) -> Duration
where
    F: std::future::Future<Output = ()> + 'static,
{
    let started = Instant::now();
    // Spawn everything up front; nothing is awaited until all `n` tasks exist.
    let pending: Vec<_> = (0..n).map(|_| spawn(factory())).collect();
    for task in pending {
        let _ = task.await;
    }
    started.elapsed()
}
/// Benchmark 1A: spawn throughput.
///
/// For every task count in `SPAWN_TASK_COUNTS`, spawns that many tasks whose
/// `Work` is built from an empty step list, first on a Tokio `LocalSet` and
/// then on a single-queue Clockworker executor driven inside a `LocalSet`.
/// Each variant runs `WARMUP_ITERS` unrecorded iterations followed by
/// `BENCH_ITERS` iterations recorded under the `"spawn"` label, after which
/// throughput and the relative overhead of Clockworker over Tokio are printed.
///
/// Returns, for each task count in order, the Clockworker metrics followed by
/// the Tokio metrics.
fn run_1a() -> Vec<utils::Metrics> {
    let mut results = Vec::new();
    for &n in SPAWN_TASK_COUNTS {
        // Tagged "[1A]" so the header lines up with the "[1B]" / "[1C]" headers
        // printed by the other benchmarks in this file.
        println!("\n [1A] Spawn throughput: n={}", n);
        let rt = tokio::runtime::Builder::new_current_thread()
            .enable_all()
            .build()
            .unwrap();
        // Each task runs a `Work` built from an empty step list.
        let factory = || async move {
            let fut = Work::new(vec![]);
            fut.run().await
        };
        let mut tokio_result = utils::Metrics::new();
        let mut cw_result = utils::Metrics::new();

        println!("Warming up Tokio: ");
        for _ in 0..WARMUP_ITERS {
            rt.block_on(async move {
                let local = LocalSet::new();
                let spawn = |fut| {
                    let handle = local.spawn_local(fut);
                    utils::Handle::Tokio(handle)
                };
                local.run_until(drive(spawn, n, factory)).await;
            });
        }
        print!("Running Tokio: ");
        // Flush so the label is visible before the first measured iteration
        // starts, as is done for every other progress label in this file.
        std::io::stdout().flush().unwrap();
        for _ in 0..BENCH_ITERS {
            let dur = rt.block_on(async move {
                let local = LocalSet::new();
                let spawn = |fut| {
                    let handle = local.spawn_local(fut);
                    utils::Handle::Tokio(handle)
                };
                local.run_until(drive(spawn, n, factory)).await
            });
            tokio_result.record(dur, &["spawn"]);
            print!(".");
            std::io::stdout().flush().unwrap();
        }
        println!();

        let executor = ExecutorBuilder::new().with_queue(0u8, 1).build().unwrap();
        // Borrows the executor declared above; the per-iteration clones below
        // are only used to drive it.
        let spawn = |fut| {
            let handle = executor.queue(0).unwrap().spawn(fut);
            utils::Handle::Clockworker(handle)
        };
        println!("warming up clockworker");
        std::io::stdout().flush().unwrap();
        for _ in 0..WARMUP_ITERS {
            let executor = executor.clone();
            rt.block_on(async move {
                let local = LocalSet::new();
                local
                    .run_until(executor.run_until(drive(spawn, n, factory)))
                    .await;
            })
        }
        print!("Running Clockworker (FIFO): ");
        std::io::stdout().flush().unwrap();
        for _ in 0..BENCH_ITERS {
            let executor = executor.clone();
            let dur = rt.block_on(async move {
                let local = LocalSet::new();
                local
                    .run_until(executor.run_until(drive(spawn, n, factory)))
                    .await
            });
            cw_result.record(dur, &["spawn"]);
            print!(".");
            std::io::stdout().flush().unwrap();
        }
        println!();

        let cw_mean_secs = cw_result.mean("spawn").as_secs_f64();
        let tokio_mean_secs = tokio_result.mean("spawn").as_secs_f64();
        let cw_tput = n as f64 / cw_mean_secs;
        let tokio_tput = n as f64 / tokio_mean_secs;
        // Relative slowdown of Clockworker versus Tokio, in percent.
        let overhead = (cw_mean_secs / tokio_mean_secs - 1.0) * 100.0;
        println!(
            " Clockworker: {:.2} tasks/sec (mean: {:?}, stddev: {:?})",
            cw_tput,
            cw_result.mean("spawn"),
            cw_result.stddev("spawn")
        );
        println!(
            " Tokio: {:.2} tasks/sec (mean: {:?}, stddev: {:?})",
            tokio_tput,
            tokio_result.mean("spawn"),
            tokio_result.stddev("spawn")
        );
        println!(" Overhead: {:.1}%", overhead);
        results.push(cw_result);
        results.push(tokio_result);
    }
    results
}
/// Benchmark 1B: yield/poll overhead.
///
/// For every `k` in `YIELDS_PER_TASK`, spawns `YIELD_TASK_COUNT` tasks whose
/// `Work` consists of `k` `Step::Yield` steps. The Clockworker single-queue
/// executor is measured first, then a plain Tokio `LocalSet`; each gets
/// `WARMUP_ITERS` unrecorded runs and `BENCH_ITERS` runs recorded under the
/// `"yield"` label. Polls per second and Clockworker's relative overhead are
/// printed.
///
/// Returns, for each `k` in order, the Clockworker metrics followed by the
/// Tokio metrics.
fn run_1b() -> Vec<utils::Metrics> {
    let runtime = tokio::runtime::Builder::new_current_thread()
        .enable_all()
        .build()
        .unwrap();
    let n = YIELD_TASK_COUNT;
    let mut collected = Vec::new();

    for k in YIELDS_PER_TASK {
        // The reported poll total counts k + 1 polls per task.
        let total_polls = n * (k + 1);
        println!(
            "\n [1B] Yield overhead: n={}, k={} ({} total polls)",
            n, k, total_polls
        );
        let factory = move || async move {
            let steps: Vec<_> = (0..k).map(|_| Step::Yield).collect();
            let work = Work::new(steps);
            work.run().await
        };

        // --- Clockworker (FIFO) ---
        let mut fifo_metrics = utils::Metrics::new();
        let executor = ExecutorBuilder::new().with_queue(0u8, 1).build().unwrap();
        let spawn = |fut| utils::Handle::Clockworker(executor.queue(0).unwrap().spawn(fut));
        println!("Warming up Clockworker (FIFO): ");
        std::io::stdout().flush().unwrap();
        for _ in 0..WARMUP_ITERS {
            let driver = executor.clone();
            runtime.block_on(async move {
                let local = LocalSet::new();
                local
                    .run_until(driver.run_until(drive(spawn, n, factory)))
                    .await;
            })
        }
        print!("Running Clockworker (FIFO): ");
        std::io::stdout().flush().unwrap();
        for _ in 0..BENCH_ITERS {
            let driver = executor.clone();
            let elapsed = runtime.block_on(async move {
                let local = LocalSet::new();
                local
                    .run_until(driver.run_until(drive(spawn, n, factory)))
                    .await
            });
            fifo_metrics.record(elapsed, &["yield"]);
            print!(".");
            std::io::stdout().flush().unwrap();
        }
        println!();

        // --- Tokio ---
        let mut tokio_metrics = utils::Metrics::new();
        println!("Warming up Tokio: ");
        std::io::stdout().flush().unwrap();
        for _ in 0..WARMUP_ITERS {
            runtime.block_on(async move {
                let local = LocalSet::new();
                let spawn = |fut| utils::Handle::Tokio(local.spawn_local(fut));
                local.run_until(drive(spawn, n, factory)).await;
            })
        }
        print!("Running Tokio: ");
        std::io::stdout().flush().unwrap();
        for _ in 0..BENCH_ITERS {
            let elapsed = runtime.block_on(async move {
                let local = LocalSet::new();
                let spawn = |fut| utils::Handle::Tokio(local.spawn_local(fut));
                local.run_until(drive(spawn, n, factory)).await
            });
            tokio_metrics.record(elapsed, &["yield"]);
            print!(".");
            std::io::stdout().flush().unwrap();
        }
        println!();

        // --- Report ---
        let tokio_mean = tokio_metrics.mean("yield");
        let fifo_mean = fifo_metrics.mean("yield");
        let tokio_polls_per_sec = total_polls as f64 / tokio_mean.as_secs_f64();
        let fifo_polls_per_sec = total_polls as f64 / fifo_mean.as_secs_f64();
        let fifo_overhead =
            (fifo_mean.as_nanos() as f64 / tokio_mean.as_nanos() as f64 - 1.0) * 100.0;
        println!(
            " Tokio: {:.2e} polls/sec (mean: {:?})",
            tokio_polls_per_sec, tokio_mean
        );
        println!(
            " Clockworker/FIFO: {:.2e} polls/sec (mean: {:?}) [overhead: {:.1}%]",
            fifo_polls_per_sec, fifo_mean, fifo_overhead
        );
        collected.push(fifo_metrics);
        collected.push(tokio_metrics);
    }
    collected
}
/// Runs one iteration of the sleep benchmark on a single-queue Clockworker
/// executor.
///
/// Spawns `n` tasks on queue `0`; each task performs `sleeps` consecutive
/// `tokio::time::sleep(sleep_dur)` calls and records how long every sleep
/// actually took. The executor is then driven until all tasks have finished.
///
/// Returns the total elapsed time (measured from just before the first spawn)
/// together with every observed per-sleep duration.
async fn bench_1c_clockworker(
    n: usize,
    sleeps: usize,
    sleep_dur: Duration,
) -> (Duration, Vec<Duration>) {
    let executor = ExecutorBuilder::new().with_queue(0u8, 1).build().unwrap();
    let queue = executor.queue(0).unwrap();
    // Shared sink the tasks append their observed sleep durations to.
    let samples = Arc::new(std::sync::Mutex::new(Vec::<Duration>::new()));

    let started = Instant::now();
    let handles: Vec<_> = (0..n)
        .map(|_| {
            let sink = samples.clone();
            queue.spawn(async move {
                for _ in 0..sleeps {
                    let before = Instant::now();
                    tokio::time::sleep(sleep_dur).await;
                    // Take the reading before touching the lock.
                    let observed = before.elapsed();
                    sink.lock().unwrap().push(observed);
                }
            })
        })
        .collect();

    let runner = executor.clone();
    runner
        .run_until(async {
            future::join_all(handles).await;
        })
        .await;
    let total = started.elapsed();

    // Unwrapping panics if any other clone of the sink is still alive here.
    let observed = Arc::try_unwrap(samples).unwrap().into_inner().unwrap();
    (total, observed)
}
/// Runs one iteration of the sleep benchmark using `tokio::task::spawn_local`.
///
/// Spawns `n` local tasks; each performs `sleeps` consecutive
/// `tokio::time::sleep(sleep_dur)` calls and records how long every sleep
/// actually took. The handles are then awaited one by one in spawn order.
///
/// Returns the total elapsed time (measured from just before the first spawn)
/// together with every observed per-sleep duration.
async fn bench_1c_tokio(n: usize, sleeps: usize, sleep_dur: Duration) -> (Duration, Vec<Duration>) {
    // Shared sink the tasks append their observed sleep durations to.
    let samples = Arc::new(std::sync::Mutex::new(Vec::<Duration>::new()));

    let started = Instant::now();
    let handles: Vec<_> = (0..n)
        .map(|_| {
            let sink = samples.clone();
            tokio::task::spawn_local(async move {
                for _ in 0..sleeps {
                    let before = Instant::now();
                    tokio::time::sleep(sleep_dur).await;
                    // Take the reading before touching the lock.
                    let observed = before.elapsed();
                    sink.lock().unwrap().push(observed);
                }
            })
        })
        .collect();

    for task in handles {
        let _ = task.await;
    }
    let total = started.elapsed();

    // Unwrapping panics if any other clone of the sink is still alive here.
    let observed = Arc::try_unwrap(samples).unwrap().into_inner().unwrap();
    (total, observed)
}
/// Summarises observed sleep durations against the requested duration.
///
/// Returns `(mean, max, ratio)` where `mean` and `max` are taken over
/// `accuracies` and `ratio` is `mean / expected` (1.0 means the average sleep
/// lasted exactly as long as requested).
///
/// An empty `accuracies` slice yields `(Duration::ZERO, Duration::ZERO, 0.0)`
/// instead of panicking. If `expected` is zero the ratio is whatever `f64`
/// division produces (infinity or NaN).
///
/// # Panics
///
/// Panics if there are more than `u32::MAX` samples, or if the sum of all
/// samples overflows `Duration`.
fn analyze_sleep_accuracy(
    accuracies: &[Duration],
    expected: Duration,
) -> (Duration, Duration, f64) {
    // `max()` is `None` only for an empty slice, which would otherwise also
    // trigger a division by zero when computing the mean below.
    let max = match accuracies.iter().max() {
        Some(&longest) => longest,
        None => return (Duration::ZERO, Duration::ZERO, 0.0),
    };

    let sample_count = accuracies.len();
    // `Duration` only divides by `u32`; refuse to silently truncate the count.
    assert!(
        sample_count <= u32::MAX as usize,
        "too many samples ({}) to average",
        sample_count
    );
    let mean = accuracies.iter().sum::<Duration>() / sample_count as u32;

    let overslept_ratio = mean.as_nanos() as f64 / expected.as_nanos() as f64;
    (mean, max, overslept_ratio)
}
/// Benchmark 1C: timer/IO reactor integration.
///
/// Runs `bench_1c_clockworker` and then `bench_1c_tokio` inside a fresh
/// `LocalSet` per iteration, each with `WARMUP_ITERS` discarded runs followed
/// by `BENCH_ITERS` runs recorded under the `"sleep"` label. Per-sleep
/// durations from all recorded runs are pooled and summarised with
/// `analyze_sleep_accuracy`. A warning is printed when Clockworker's mean
/// sleep ratio exceeds 1.5x Tokio's.
///
/// Returns the Clockworker metrics followed by the Tokio metrics.
fn run_1c() -> Vec<utils::Metrics> {
    let runtime = tokio::runtime::Builder::new_current_thread()
        .enable_all()
        .build()
        .unwrap();

    println!(
        "\n [1C] IO reactor integration: n={}, sleeps={}, sleep_dur={:?}",
        IO_TASK_COUNT, SLEEPS_PER_TASK, SLEEP_DURATION
    );
    let expected_total = SLEEP_DURATION * (SLEEPS_PER_TASK as u32);
    println!(
        " Expected minimum time: {:?} (if fully parallel)",
        expected_total
    );

    // --- Clockworker ---
    let mut cw_metrics = utils::Metrics::new();
    let mut cw_samples = Vec::new();
    for _ in 0..WARMUP_ITERS {
        let local = LocalSet::new();
        let warmup = bench_1c_clockworker(IO_TASK_COUNT, SLEEPS_PER_TASK, SLEEP_DURATION);
        runtime.block_on(local.run_until(warmup));
    }
    for _ in 0..BENCH_ITERS {
        let local = LocalSet::new();
        let measured = bench_1c_clockworker(IO_TASK_COUNT, SLEEPS_PER_TASK, SLEEP_DURATION);
        let (elapsed, sleeps) = runtime.block_on(local.run_until(measured));
        cw_metrics.record(elapsed, &["sleep"]);
        cw_samples.extend(sleeps);
        print!(".");
        std::io::stdout().flush().unwrap();
    }
    println!();

    // --- Tokio ---
    let mut tokio_metrics = utils::Metrics::new();
    let mut tokio_samples = Vec::new();
    for _ in 0..WARMUP_ITERS {
        let local = LocalSet::new();
        let warmup = bench_1c_tokio(IO_TASK_COUNT, SLEEPS_PER_TASK, SLEEP_DURATION);
        runtime.block_on(local.run_until(warmup));
    }
    for _ in 0..BENCH_ITERS {
        let local = LocalSet::new();
        let measured = bench_1c_tokio(IO_TASK_COUNT, SLEEPS_PER_TASK, SLEEP_DURATION);
        let (elapsed, sleeps) = runtime.block_on(local.run_until(measured));
        tokio_metrics.record(elapsed, &["sleep"]);
        tokio_samples.extend(sleeps);
        print!(".");
        std::io::stdout().flush().unwrap();
    }
    println!();

    // --- Report ---
    let (cw_mean_sleep, cw_max_sleep, cw_ratio) =
        analyze_sleep_accuracy(&cw_samples, SLEEP_DURATION);
    let (tokio_mean_sleep, tokio_max_sleep, tokio_ratio) =
        analyze_sleep_accuracy(&tokio_samples, SLEEP_DURATION);
    let tokio_total_mean = tokio_metrics.mean("sleep");
    let cw_total_mean = cw_metrics.mean("sleep");

    println!(" Tokio:");
    println!(" Total time: {:?} (mean)", tokio_total_mean);
    println!(
        " Sleep accuracy: mean={:?} max={:?} ratio={:.2}x",
        tokio_mean_sleep, tokio_max_sleep, tokio_ratio
    );
    println!(" Clockworker:");
    println!(" Total time: {:?} (mean)", cw_total_mean);
    println!(
        " Sleep accuracy: mean={:?} max={:?} ratio={:.2}x",
        cw_mean_sleep, cw_max_sleep, cw_ratio
    );

    let overhead =
        (cw_total_mean.as_nanos() as f64 / tokio_total_mean.as_nanos() as f64 - 1.0) * 100.0;
    println!(" Overhead: {:.1}%", overhead);

    let relative_oversleep = cw_ratio / tokio_ratio;
    if cw_ratio > tokio_ratio * 1.5 {
        println!(
            " ⚠️ WARNING: Clockworker sleeps are {:.1}x longer than Tokio's - possible reactor starvation",
            relative_oversleep
        );
    }

    vec![cw_metrics, tokio_metrics]
}
fn main() {
println!("╔══════════════════════════════════════════════════════════════╗");
println!("║ Clockworker Overhead Benchmarks ║");
println!("╚══════════════════════════════════════════════════════════════╝");
println!();
println!("Configuration:");
println!(" Warmup iterations: {}", WARMUP_ITERS);
println!(" Bench iterations: {}", BENCH_ITERS);
println!();
println!("━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━");
println!("Benchmark 1A: Spawn Throughput");
println!("━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━");
run_1a();
println!("\n━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━");
println!("Benchmark 1B: Yield/Poll Overhead");
println!("━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━");
run_1b();
println!("\n━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━");
println!("Benchmark 1C: IO Reactor Integration");
println!("━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━");
run_1c();
}