use std::sync::{Arc, Mutex};
use std::time::Instant;
use taskflow_rs::{Executor, Taskflow};
use taskflow_rs::gpu::{
GpuDevice, GpuBuffer, BackendKind,
};
/// Demo 1: acquire streams round-robin from a pool and enqueue one async
/// H2D transfer per batch, overlapping transfers across streams.
///
/// Fix over the original: the host source vector and device buffer for each
/// batch are now kept alive until `synchronize_all()` returns. Previously
/// both were dropped at the end of each loop iteration while the async copy
/// enqueued on the stream could still be in flight — freeing host memory the
/// GPU may still be reading (a use-after-free hazard the `unsafe` contract
/// of `copy_from_host_async` exists to guard against).
fn demo_stream_pool() {
    println!("\n╔══════════════════════════════════════════════════════╗");
    println!("║ Demo 1: Stream Pool — Round-Robin Assignment ║");
    println!("╚══════════════════════════════════════════════════════╝\n");
    let device = match GpuDevice::new(0) {
        Ok(d) => {
            println!(" ✓ Device: {} ({:?})", d.name(), d.backend_kind());
            d
        }
        Err(e) => {
            println!(" ✗ No GPU: {} (using stub)", e);
            return;
        }
    };
    let pool = device.stream_pool(4).expect("stream pool");
    println!(" ✓ StreamPool created with {} streams\n", pool.len());
    let num_batches = 8;
    let batch_size = 256 * 1024;
    // Keep every host source and device buffer alive until all pending
    // transfers have been synchronized below.
    let mut host_batches: Vec<Vec<f32>> = Vec::with_capacity(num_batches);
    let mut dev_bufs: Vec<GpuBuffer<f32>> = Vec::with_capacity(num_batches);
    let start = Instant::now();
    for batch_idx in 0..num_batches {
        let guard = pool.acquire().expect("acquire stream");
        let stream = guard.stream();
        let src: Vec<f32> = (0..batch_size)
            .map(|i| (batch_idx * batch_size + i) as f32)
            .collect();
        let mut buf: GpuBuffer<f32> = GpuBuffer::allocate(&device, batch_size)
            .expect("allocate");
        // SAFETY: `src` and `buf` are moved into `host_batches` / `dev_bufs`
        // below and outlive `pool.synchronize_all()`, so both the host and
        // device memory stay valid for the duration of the async copy.
        unsafe {
            buf.copy_from_host_async(&src, stream).expect("h2d async");
        }
        println!(
            " Batch {:2}: enqueued H2D on stream {} (pending: {})",
            batch_idx,
            stream.id(),
            guard.pending_ops()
        );
        host_batches.push(src);
        dev_bufs.push(buf);
    }
    pool.synchronize_all().expect("sync all");
    println!("\n ✓ All batches transferred in {:.2?}", start.elapsed());
}
/// Demo 2: classic double-buffered transfer pipeline.
///
/// With DEPTH = 2 slots, batch N's host-fill and enqueue overlap with batch
/// N-1's in-flight transfers: each slot owns one host source vector, one host
/// destination vector, and one device buffer, and is reused every DEPTH
/// batches after its stream has been synchronized.
fn demo_double_buffer_pipeline() {
    println!("\n╔══════════════════════════════════════════════════════╗");
    println!("║ Demo 2: Double-Buffer Pipeline ║");
    println!("╚══════════════════════════════════════════════════════╝\n");
    let device = match GpuDevice::new(0) {
        Ok(d) => d,
        Err(e) => { println!(" ✗ No GPU: {}", e); return; }
    };
    // DEPTH: number of in-flight slots; BATCH_SIZE: elements per batch;
    // N_BATCHES: total batches pushed through the pipeline.
    const DEPTH: usize = 2; const BATCH_SIZE: usize = 512 * 1024; const N_BATCHES: usize = 6;
    let stream_set = device.stream_set(DEPTH, "pipeline").expect("stream_set");
    // One host source buffer per slot, reused across batches.
    let mut host_src = [
        vec![0.0f32; BATCH_SIZE],
        vec![0.0f32; BATCH_SIZE],
    ];
    // One host destination buffer per slot for the read-back.
    let mut host_dst = [
        vec![0.0f32; BATCH_SIZE],
        vec![0.0f32; BATCH_SIZE],
    ];
    // One device buffer per slot.
    let mut dev_bufs: Vec<GpuBuffer<f32>> = (0..DEPTH)
        .map(|_| GpuBuffer::allocate(&device, BATCH_SIZE).expect("allocate"))
        .collect();
    let start = Instant::now();
    for batch in 0..N_BATCHES {
        // Slot index cycles 0,1,0,1,... — each slot's resources are reused
        // every DEPTH batches.
        let slot = batch % DEPTH;
        // NOTE(review): this presumably maps `batch` onto one of the DEPTH
        // streams (i.e. wraps modulo DEPTH) so that a slot is always paired
        // with the same stream — verify against StreamSet::get's contract,
        // since the slot-reuse sync below depends on it.
        let stream = stream_set.get(batch);
        if batch >= DEPTH {
            // Before reusing this slot, wait for the transfers enqueued for
            // it DEPTH batches ago; otherwise we would overwrite host_src /
            // host_dst while the GPU may still be using them.
            stream.synchronize().expect("sync slot");
        }
        // Fill the slot's host source with this batch's data (CPU work that
        // overlaps the other slot's in-flight transfers).
        for (i, v) in host_src[slot].iter_mut().enumerate() {
            *v = (batch * BATCH_SIZE + i) as f32 * 0.001;
        }
        // SAFETY: the slot's buffers stay valid until the slot-reuse sync
        // above (or the final synchronize_all) completes the copy.
        unsafe {
            dev_bufs[slot]
                .copy_from_host_async(&host_src[slot], stream)
                .expect("h2d async");
        }
        // D2H is enqueued on the same stream, so it is ordered after the H2D
        // (streams execute their work in FIFO order).
        unsafe {
            dev_bufs[slot]
                .copy_to_host_async(&mut host_dst[slot], stream)
                .expect("d2h async");
        }
        println!(
            " Batch {:2}: slot={} stream={} — H2D+D2H enqueued",
            batch, slot, stream.id()
        );
    }
    // Drain whatever is still in flight on every stream before the host
    // buffers go out of scope.
    stream_set.synchronize_all().expect("final sync");
    println!("\n ✓ Double-buffer pipeline complete in {:.2?}", start.elapsed());
}
/// Demo 3: run GPU transfers on multiple streams from inside TaskFlow tasks.
///
/// DAG shape: generate -> {N_STREAMS gpu tasks} -> validate.
///
/// Fix over the original: each GPU task previously held the `shared_data`
/// mutex guard across the entire H2D/D2H transfer and stream sync, which
/// serialized the supposedly-concurrent tasks on the lock and defeated the
/// multi-stream overlap this demo exists to show. The batch is now cloned
/// out of the mutex and the guard released before any GPU work begins.
/// Printed output is unchanged.
fn demo_taskflow_multi_stream() {
    println!("\n╔══════════════════════════════════════════════════════╗");
    println!("║ Demo 3: Multi-Stream inside a TaskFlow DAG ║");
    println!("╚══════════════════════════════════════════════════════╝\n");
    let device = match GpuDevice::new(0) {
        Ok(d) => d,
        Err(e) => { println!(" ✗ No GPU: {}", e); return; }
    };
    const N_STREAMS: usize = 3;
    const BATCH_SIZE: usize = 128 * 1024;
    // Batches produced by the generate task, consumed by the GPU tasks.
    let shared_data: Arc<Mutex<Vec<Vec<f32>>>> = Arc::new(Mutex::new(vec![]));
    let pool = Arc::new(device.stream_pool(N_STREAMS).expect("pool"));
    let device = Arc::new(device);
    let mut executor = Executor::new(4);
    let mut taskflow = Taskflow::new();
    let data_ref = Arc::clone(&shared_data);
    // CPU task: fill one batch per stream.
    let generate = taskflow.emplace(move || {
        let mut data = data_ref.lock().unwrap();
        for s in 0..N_STREAMS {
            let batch: Vec<f32> = (0..BATCH_SIZE).map(|i| (s * BATCH_SIZE + i) as f32).collect();
            data.push(batch);
        }
        println!(" [CPU] Generated {} batches of {} elements", N_STREAMS, BATCH_SIZE);
    });
    let mut gpu_tasks = Vec::new();
    for stream_idx in 0..N_STREAMS {
        let data_ref = Arc::clone(&shared_data);
        let pool_ref = Arc::clone(&pool);
        let dev_ref = Arc::clone(&device);
        let gpu_task = taskflow.emplace(move || {
            let guard = pool_ref.acquire().expect("acquire");
            let stream = guard.stream();
            // Copy the batch out of the mutex so the lock is held only for
            // the clone, not across the transfers — this lets the N_STREAMS
            // tasks actually overlap on their streams.
            let src: Vec<f32> = {
                let data = data_ref.lock().unwrap();
                data[stream_idx].clone()
            };
            let mut buf = GpuBuffer::allocate(&dev_ref, src.len())
                .expect("alloc");
            // SAFETY: `src`, `result`, and `buf` all outlive the explicit
            // stream.synchronize() below, so the async copies' memory stays
            // valid until they complete.
            unsafe {
                buf.copy_from_host_async(&src, stream).expect("h2d");
            }
            let mut result = vec![0.0f32; src.len()];
            unsafe {
                buf.copy_to_host_async(&mut result, stream).expect("d2h");
            }
            stream.synchronize().expect("sync stream");
            println!(
                " [GPU stream {}] Processed {} elements on {}",
                stream.id(), src.len(), dev_ref.name()
            );
        });
        gpu_tasks.push(gpu_task);
    }
    // CPU task: runs after every GPU task has finished.
    let validate = taskflow.emplace(|| {
        println!(" [CPU] Validation complete ✓");
    });
    for gpu_task in &gpu_tasks {
        generate.precede(gpu_task);
        gpu_task.precede(&validate);
    }
    let start = Instant::now();
    executor.run(&taskflow).wait();
    println!("\n ✓ TaskFlow multi-stream DAG complete in {:.2?}", start.elapsed());
}
/// Demo 4: probe each GPU backend at runtime and report which ones can
/// produce a working device, along with the device's total memory.
fn demo_backend_selection() {
    println!("\n╔══════════════════════════════════════════════════════╗");
    println!("║ Demo 4: Runtime Backend Selection ║");
    println!("╚══════════════════════════════════════════════════════╝\n");
    // Try each backend in priority order; every attempt is reported, so a
    // failure just moves on to the next candidate.
    for kind in [
        BackendKind::Cuda,
        BackendKind::Rocm,
        BackendKind::OpenCL,
        BackendKind::Stub,
    ] {
        let label = format!("{:?}", kind);
        match GpuDevice::with_backend(0, kind) {
            Ok(dev) => {
                // Total device memory in GB; falls back to 0.0 when the
                // backend cannot report memory info.
                let total_gb = dev
                    .memory_info()
                    .map(|(_, total)| total as f64 / 1e9)
                    .unwrap_or(0.0);
                println!(" ✓ {:<10} — {} | memory: {:.1} GB", label, dev.name(), total_gb);
            }
            Err(e) => println!(" ✗ {:<10} — {}", label, e),
        }
    }
}
/// Entry point: initializes logging (best-effort), prints the banner, and
/// runs all four demos in sequence.
fn main() {
    // Ignore the error if a logger is already installed.
    let _ = env_logger::try_init();
    let banner = [
        "┌─────────────────────────────────────────────────────┐",
        "│ TaskFlow-RS — Async Transfers & Multi-Stream Demo │",
        "└─────────────────────────────────────────────────────┘",
    ];
    for line in banner {
        println!("{}", line);
    }
    demo_backend_selection();
    demo_stream_pool();
    demo_double_buffer_pipeline();
    demo_taskflow_multi_stream();
    println!("\n═══════════════════════════════════════════════════════");
    println!("All demos complete.");
}