#![allow(
clippy::unwrap_used,
clippy::expect_used,
clippy::unit_arg,
clippy::redundant_closure
)]
use std::future::Future;
use std::hint::black_box;
use std::pin::Pin;
use std::sync::{Arc, LazyLock, OnceLock};
use std::time::Instant;
use criterion::{Criterion, criterion_group, criterion_main};
use tracing::Level;
use tracing_cache::{LevelPredicate, SpanCache};
use tracing_futures::Instrument;
static CACHE: LazyLock<Arc<SpanCache>> = LazyLock::new(|| {
let (cache, driver) = SpanCache::with_predicate(16384, LevelPredicate::new(Level::INFO));
let cache = Arc::new(cache);
std::thread::spawn(|| {
let _ = tokio::runtime::Builder::new_current_thread()
.enable_time()
.build()
.unwrap()
.block_on(async move { tokio::spawn(driver.run()).await });
std::process::exit(333);
});
cache
});
static SUBSCRIBER_INIT: OnceLock<()> = OnceLock::new();
fn init_subscriber() {
SUBSCRIBER_INIT.get_or_init(|| {
tracing::subscriber::set_global_default(Arc::clone(&*CACHE))
.expect("global tracing subscriber already set");
});
}
#[inline]
fn two_level_spans() {
let root = tracing::span!(parent: None, Level::INFO, "bench_root");
let _root = root.enter();
let child = tracing::span!(Level::INFO, "bench_child");
let _child = child.enter();
black_box(());
}
async fn two_level_async_spans() {
black_box(
async { black_box(()) }
.instrument(tracing::span!(Level::INFO, "bench_child"))
.await,
);
}
fn recursive_level(n: u32) -> Pin<Box<dyn Future<Output = ()> + Send + 'static>> {
Box::pin(
async move {
tokio::task::yield_now().await;
if n == 0 {
return;
}
recursive_level(n - 1).await;
}
.instrument(tracing::span!(Level::INFO, "recursive_level")),
)
}
async fn recursive_task(n: u32) {
async move {
recursive_level(n).await;
}
.instrument(tracing::span!(parent: None, Level::INFO, "recursive_root"))
.await;
}
const RECURSIVE_DEPTH: u32 = 10;
const RECURSIVE_TASKS: usize = 1000;
fn nested_info(depth: u32) -> Pin<Box<dyn Future<Output = ()> + Send + 'static>> {
Box::pin(
async move {
tracing::event!(
Level::INFO,
kind = "before_yield",
level = depth,
retry = false,
);
tokio::task::yield_now().await;
tracing::event!(
Level::INFO,
kind = "after_yield",
level = depth,
retry = false,
);
if depth == 0 {
return;
}
nested_info(depth - 1).await;
}
.instrument(tracing::span!(
Level::INFO,
"nested",
depth_label = depth,
kind = "work",
phase = "active",
)),
)
}
fn nested_debug(depth: u32) -> Pin<Box<dyn Future<Output = ()> + Send + 'static>> {
Box::pin(
async move {
tracing::event!(
Level::DEBUG,
kind = "before_yield",
level = depth,
retry = false,
);
tokio::task::yield_now().await;
tracing::event!(
Level::DEBUG,
kind = "after_yield",
level = depth,
retry = false,
);
if depth == 0 {
return;
}
nested_debug(depth - 1).await;
}
.instrument(tracing::span!(
Level::DEBUG,
"nested",
depth_label = depth,
kind = "work",
phase = "active",
)),
)
}
async fn nested_task_info(depth: u32) {
async move {
nested_info(depth).await;
}
.instrument(tracing::span!(parent: None, Level::INFO, "nested_root"))
.await;
}
async fn nested_task_debug(depth: u32) {
async move {
nested_debug(depth).await;
}
.instrument(tracing::span!(parent: None, Level::DEBUG, "nested_root"))
.await;
}
const NESTED_DEPTH: u32 = 3;
const NESTED_TASKS: usize = 4;
fn bench_single_thread(c: &mut Criterion) {
init_subscriber();
c.bench_function("single_thread/two_level_spans", |b| {
b.iter(|| two_level_spans());
});
}
fn bench_four_threads(c: &mut Criterion) {
use std::sync::Barrier;
init_subscriber();
c.bench_function("four_threads/two_level_spans", |b| {
b.iter_custom(|iters| {
let barrier = Arc::new(Barrier::new(5));
let handles: Vec<_> = (0..4usize)
.map(|_| {
let barrier = Arc::clone(&barrier);
std::thread::spawn(move || {
barrier.wait();
for _ in 0..iters {
black_box(two_level_spans());
}
})
})
.collect();
barrier.wait(); let start = Instant::now();
for h in handles {
h.join().unwrap();
}
start.elapsed()
});
});
}
fn bench_four_async_tasks(c: &mut Criterion) {
init_subscriber();
let rt = tokio::runtime::Builder::new_multi_thread()
.worker_threads(4)
.build()
.unwrap();
c.bench_function("four_async_tasks/instrumented_two_level_spans", |b| {
b.to_async(&rt).iter_custom(|iters| async move {
let start = Instant::now();
let handles: Vec<_> = (0..4usize)
.map(|_| {
tokio::spawn(async move {
for _ in 0..iters {
black_box(
two_level_async_spans()
.instrument(tracing::span!(
parent: None,
Level::INFO,
"bench_root"
))
.await,
);
}
})
})
.collect();
for h in handles {
h.await.unwrap();
}
start.elapsed()
});
});
}
fn bench_recursive_async_tasks(c: &mut Criterion) {
init_subscriber();
let rt = tokio::runtime::Builder::new_multi_thread()
.worker_threads(4)
.build()
.unwrap();
let name = format!(
"recursive_async/per_task_in_batch_of_{}_depth_{}",
RECURSIVE_TASKS, RECURSIVE_DEPTH
);
c.bench_function(&name, |b| {
b.to_async(&rt).iter_custom(|iters| async move {
let start = Instant::now();
for _ in 0..iters {
let mut handles = Vec::with_capacity(RECURSIVE_TASKS);
for _ in 0..RECURSIVE_TASKS {
handles.push(tokio::spawn(recursive_task(RECURSIVE_DEPTH)));
}
for h in handles {
h.await.unwrap();
}
}
start.elapsed() / RECURSIVE_TASKS as u32
});
});
}
criterion_group!(
span_throughput,
bench_single_thread,
bench_four_threads,
bench_four_async_tasks,
bench_recursive_async_tasks,
bench_nested_async_enabled,
bench_nested_async_disabled,
);
criterion_main!(span_throughput);
fn bench_nested_async_enabled(c: &mut Criterion) {
init_subscriber();
let rt = tokio::runtime::Builder::new_multi_thread()
.worker_threads(4)
.build()
.unwrap();
let name = format!(
"nested_async_4t/four_levels_enabled_per_task_in_batch_of_{}",
NESTED_TASKS
);
c.bench_function(&name, |b| {
b.to_async(&rt).iter_custom(|iters| async move {
let start = Instant::now();
for _ in 0..iters {
let mut handles = Vec::with_capacity(NESTED_TASKS);
for _ in 0..NESTED_TASKS {
handles.push(tokio::spawn(nested_task_info(NESTED_DEPTH)));
}
for h in handles {
h.await.unwrap();
}
}
start.elapsed() / NESTED_TASKS as u32
});
});
}
fn bench_nested_async_disabled(c: &mut Criterion) {
init_subscriber();
let rt = tokio::runtime::Builder::new_multi_thread()
.worker_threads(4)
.build()
.unwrap();
let name = format!(
"nested_async_4t/four_levels_disabled_per_task_in_batch_of_{}",
NESTED_TASKS
);
c.bench_function(&name, |b| {
b.to_async(&rt).iter_custom(|iters| async move {
let start = Instant::now();
for _ in 0..iters {
let mut handles = Vec::with_capacity(NESTED_TASKS);
for _ in 0..NESTED_TASKS {
handles.push(tokio::spawn(nested_task_debug(NESTED_DEPTH)));
}
for h in handles {
h.await.unwrap();
}
}
start.elapsed() / NESTED_TASKS as u32
});
});
}