//! Benchmarks for `ConcurrentPipeline`: throughput, parallel scaling,
//! backpressure under small buffers, and (off by default) memory pressure.

use std::sync::{Arc, atomic::{AtomicUsize, Ordering}};
use std::thread;
use std::time::{Duration, Instant};
use taskflow_rs::pipeline::ConcurrentPipeline;

fn main() {
    println!("=== Pipeline Benchmarks ===\n");
    benchmark_throughput();
    println!();
    benchmark_parallel_scaling();
    println!();
    benchmark_backpressure();
}
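
/// 1. Throughput: single producer, single consumer.
///
/// Pushes 10,000 integers through one pipeline and reports wall-clock
/// items/second measured on the consuming side.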
fn benchmark_throughput() {
    println!("1. Throughput Benchmark");
    println!(" Measuring items/second through pipeline\n");

    let num_items = 10_000;
    let pipeline = ConcurrentPipeline::new(100, num_items);
    let producer_pipeline = pipeline.clone();
    let consumer_pipeline = pipeline.clone();
    let processed = Arc::new(AtomicUsize::new(0));
    let processed_clone = Arc::clone(&processed);

    let start = Instant::now();

    // Producer: push every item, then signal that no more will arrive.
    let producer = thread::spawn(move || {
        for i in 0..num_items {
            producer_pipeline.push(i).unwrap();
        }
        producer_pipeline.stop();
    });

    // Consumer: drain until the pipeline is stopped and empty, backing off
    // briefly whenever no token is available.
    let consumer = thread::spawn(move || {
        while !consumer_pipeline.is_stopped() || consumer_pipeline.tokens_in_flight() > 0 {
            if let Some(_token) = consumer_pipeline.try_pop() {
                processed_clone.fetch_add(1, Ordering::Relaxed);
            } else {
                thread::sleep(Duration::from_micros(1));
            }
        }
    });

    producer.join().unwrap();
    consumer.join().unwrap();

    let elapsed = start.elapsed();
    let throughput = num_items as f64 / elapsed.as_secs_f64();
    println!(" Processed: {} items", processed.load(Ordering::Relaxed));
    println!(" Time: {:?}", elapsed);
    println!(" Throughput: {:.2} items/sec", throughput);
}
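
/// 2. Parallel scaling: the same workload fanned out over 1, 2, 4, and 8 workers.
///
/// Workers pop from a shared input pipeline, simulate ~100 µs of work per
/// token, and forward results to an output pipeline drained by one consumer.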
fn benchmark_parallel_scaling() {
    println!("2. Parallel Scaling Benchmark");
    println!(" Testing throughput with different worker counts\n");

    let num_items = 5_000;
    let work_time_us = 100;

    for num_workers in [1, 2, 4, 8] {
        let input = ConcurrentPipeline::new(100, num_items);
        let output = ConcurrentPipeline::new(100, num_items);
        let start = Instant::now();

        // Workers: pop from the input stage, simulate work, forward to output.
        let mut workers = Vec::new();
        for _ in 0..num_workers {
            let i = input.clone();
            let o = output.clone();
            let worker = thread::spawn(move || {
                while !i.is_stopped() || i.tokens_in_flight() > 0 {
                    if let Some(token) = i.try_pop() {
                        thread::sleep(Duration::from_micros(work_time_us));
                        o.push(token.data).ok();
                    } else {
                        thread::sleep(Duration::from_micros(10));
                    }
                }
            });
            workers.push(worker);
        }

        let input_clone = input.clone();
        let producer = thread::spawn(move || {
            for i in 0..num_items {
                input_clone.push(i).unwrap();
            }
            input_clone.stop();
        });

        let c = output.clone();
        let processed = Arc::new(AtomicUsize::new(0));
        let p = Arc::clone(&processed);
        let consumer = thread::spawn(move || {
            while !c.is_stopped() || c.tokens_in_flight() > 0 {
                if let Some(_token) = c.try_pop() {
                    p.fetch_add(1, Ordering::Relaxed);
                } else {
                    thread::sleep(Duration::from_micros(10));
                }
            }
        });

        producer.join().unwrap();
        for worker in workers {
            worker.join().unwrap();
        }
        // All workers have drained the input, so nothing more will reach the
        // output stage; stop it so the consumer can exit its loop.
        output.stop();
        consumer.join().unwrap();

        let elapsed = start.elapsed();
        let throughput = num_items as f64 / elapsed.as_secs_f64();
        println!(
            " {} worker(s): {:.2} items/sec ({:?})",
            num_workers, throughput, elapsed
        );
    }
}
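
/// 3. Backpressure: how often a fast producer gets rejected for buffer
/// capacities of 5, 20, and 100 while a deliberately slow consumer drains.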
fn benchmark_backpressure() {
    println!("3. Backpressure Benchmark");
    println!(" Testing behavior under different buffer sizes\n");

    let num_items = 1_000;

    for buffer_size in [5, 20, 100] {
        let pipeline = ConcurrentPipeline::new(buffer_size, num_items);
        let p = pipeline.clone();
        let backpressure_hits = Arc::new(AtomicUsize::new(0));
        let bp_clone = Arc::clone(&backpressure_hits);
        let start = Instant::now();

        // Producer: retry each item until the pipeline accepts it, counting
        // every rejected push as a backpressure event (so no items are lost).
        let producer = thread::spawn(move || {
            for i in 0..num_items {
                loop {
                    match p.push(i) {
                        Ok(_) => break,
                        Err(_) => {
                            bp_clone.fetch_add(1, Ordering::Relaxed);
                            thread::sleep(Duration::from_micros(10));
                        }
                    }
                }
            }
            p.stop();
        });

        let c = pipeline.clone();
        // Give the producer a head start so small buffers actually fill up.
        thread::sleep(Duration::from_millis(10));

        // Consumer: simulate ~50 µs of work per token, which is what pushes
        // the producer into backpressure on small buffers.
        let consumer = thread::spawn(move || {
            while !c.is_stopped() || c.tokens_in_flight() > 0 {
                if let Some(_token) = c.try_pop() {
                    thread::sleep(Duration::from_micros(50));
                } else {
                    thread::sleep(Duration::from_micros(10));
                }
            }
        });

        producer.join().unwrap();
        consumer.join().unwrap();

        let elapsed = start.elapsed();
        let bp_hits = backpressure_hits.load(Ordering::Relaxed);
        println!(
            " Buffer size {}: {} backpressure events ({:?})",
            buffer_size, bp_hits, elapsed
        );
    }
}
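
/// 4. Memory pressure: 100,000 tokens, each carrying a 100-element `Vec`.
///
/// Not wired into `main`; kept available for ad-hoc runs, hence the
/// `#[allow(dead_code)]` below.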
#[allow(dead_code)]
fn benchmark_memory_pressure() {
    println!("4. Memory Pressure");
    println!(" Testing with large token counts\n");

    let num_items = 100_000;
    let pipeline = ConcurrentPipeline::new(1000, num_items);
    let p = pipeline.clone();
    let start = Instant::now();

    // Producer: each token is a 100-element vector rather than a bare integer.
    let producer = thread::spawn(move || {
        for i in 0..num_items {
            p.push(vec![i; 100]).unwrap();
        }
        p.stop();
    });

    let c = pipeline.clone();
    let processed = Arc::new(AtomicUsize::new(0));
    let proc_clone = Arc::clone(&processed);

    // Consumer: busy-polls when empty; `spin_loop` hints to the CPU that this
    // is a tight wait loop.
    let consumer = thread::spawn(move || {
        while !c.is_stopped() || c.tokens_in_flight() > 0 {
            if let Some(_token) = c.try_pop() {
                proc_clone.fetch_add(1, Ordering::Relaxed);
            } else {
                std::hint::spin_loop();
            }
        }
    });

    producer.join().unwrap();
    consumer.join().unwrap();

    let elapsed = start.elapsed();
    println!(" Processed: {} large items", processed.load(Ordering::Relaxed));
    println!(" Time: {:?}", elapsed);
    println!(
        " Avg: {:.2} µs/item",
        elapsed.as_micros() as f64 / num_items as f64
    );
}