# TaskFlow-RS
A Rust implementation of TaskFlow — a general-purpose task-parallel programming library with heterogeneous CPU/GPU support.
## Features

- ✅ Task Graph Construction — DAGs with flexible dependency management
- ✅ Lock-Free Work-Stealing Executor — per-worker queues, near-linear scalability
- ✅ Subflows — nested task graphs for recursive parallelism
- ✅ Condition Tasks — conditional branching and loop constructs
- ✅ Cycle Detection — catches cycles at graph construction time
- ✅ Parallel Algorithms — `for_each`, `reduce`, `transform`, `sort`, `scan`
- ✅ Async Task Support — full `async`/`await` integration with Tokio
- ✅ Pipeline Support — stream processing with parallel/serial stages and backpressure
- ✅ Composition — reusable, parameterized task graph components
- ✅ GPU Support — CUDA, OpenCL, and ROCm/HIP with async transfers and multiple streams
- ✅ Task Priorities — static `Low` / `Normal` / `High` / `Critical` priority levels
- ✅ Preemptive Cancellation — watchdog timeouts, RAII deadline guards, signal preemption
- ✅ Dynamic Priority Adjustment — O(log n) live reprioritization of queued tasks
- ✅ Hardware Topology Integration — hwloc2 / sysfs cache hierarchy, NUMA binding, CPU affinity
- ✅ Tooling — profiler, DOT/SVG/HTML visualization, performance monitoring, debug logging
## Quick Start

```toml
[dependencies]
taskflow-rs = "0.2"
```

### Basic task graph

```rust
use taskflow_rs::{Executor, TaskFlow};

// Minimal two-task graph (builder method names illustrative; see examples/).
let mut flow = TaskFlow::new();
let a = flow.add_task(|| println!("A"));
let b = flow.add_task(|| println!("B"));
a.precede(&b); // A runs before B

Executor::new().run(&flow).wait();
```
## Preemptive Cancellation

TaskFlow-RS provides three escalating levels of cancellation, all sharing the same
`PreemptiveCancellationToken` type.
### Level 1 — Cooperative (polling)

```rust
use std::thread;
use taskflow_rs::PreemptiveCancellationToken;

let token = PreemptiveCancellationToken::new();
let t = token.clone();
thread::spawn(move || {
    // check() is a single Acquire load (return type assumed here):
    while t.check().is_ok() {
        // ... one unit of work ...
    }
});
token.cancel_with("operator requested shutdown");
```
### Level 2 — Watchdog timeout

```rust
use std::thread;
use std::time::Duration;
use taskflow_rs::PreemptiveCancellationToken;

let token = PreemptiveCancellationToken::new();

// Automatically cancel after 500 ms — no manual polling needed.
token.cancel_after_with(Duration::from_millis(500), "watchdog timeout");

// The task checks at its own pace:
let t = token.clone();
thread::spawn(move || {
    while t.check().is_ok() {
        // ... one unit of work ...
    }
});
```
### Level 3 — RAII deadline guard

```rust
use std::time::Duration;
use taskflow_rs::PreemptiveCancellationToken;

let token = PreemptiveCancellationToken::new();
{
    // Guard constructor name illustrative:
    let _guard = token.deadline_guard(Duration::from_millis(100));
    // ... do work ...
} // ← token.cancel() fires here if elapsed > 100 ms

// Or use the scoped helper for a closure:
let result = with_deadline(&token, Duration::from_millis(100), || {
    // ... do work ...
});
```
### Token reuse

```rust
// Timeout values illustrative:
token.cancel_after(Duration::from_millis(200));
// ... run task ...
token.reset(); // clear the cancelled flag and reason
token.cancel_after(Duration::from_millis(200));
// ... run again ...
```
### Signal-based preemption (Linux)

```rust
// Call once at program startup (installer name illustrative):
unsafe { taskflow_rs::install_preempt_handler() };

// In the task, check the per-thread SIGUSR2 flag:
check_signal()?;

// From another thread, preempt a specific OS thread:
signal_preempt_thread(thread_id);
```
Run the demo:
Dynamic Priority Adjustment
SharedDynamicScheduler allows the priority of any queued task to be changed in O(log n)
at any time, including from other threads.
### Basic usage

```rust
use taskflow_rs::{Priority, SharedDynamicScheduler};

let sched = SharedDynamicScheduler::new();
// task1..task3 stand in for whatever the scheduler queues:
sched.push(task1, Priority::Low);
sched.push(task2, Priority::Normal);
let handle = sched.push(task3, Priority::Low);

// Task 3 became urgent — escalate it before the executor picks it up.
handle.reprioritize(Priority::Critical);

// Pop order: 3 (Critical), 2 (Normal), 1 (Low)
while let Some(task) = sched.pop() {
    // ... execute ...
}
```
### FIFO ordering within equal priority

Sequence numbers assigned at push time are preserved through reprioritization, so tasks at the same priority level always execute in insertion order:

```rust
let h_old = sched.push(task10, Priority::Normal); // seq=0
let _h_new = sched.push(task20, Priority::Normal); // seq=1

// Escalate task 10 to High — seq=0 stays with task 10.
h_old.reprioritize(Priority::High);
sched.push(task30, Priority::High); // seq=2

// Pop order: 10 (High, seq=0), 30 (High, seq=2), 20 (Normal)
```
### Cancelling a queued task

```rust
let handle = sched.push(task, Priority::Normal);

// Changed our mind — remove it before execution.
handle.cancel();
assert!(sched.pop().is_none());
```
### Anti-starvation escalation policy

```rust
use std::time::Duration;
use taskflow_rs::EscalationPolicy;

// Age threshold illustrative:
let mut policy = EscalationPolicy::new(Duration::from_millis(100));

// Call inside the scheduler loop:
loop {
    policy.tick(&sched); // escalates long-waiting Low / Normal tasks
    // ... pop and run the next task ...
}
```
Run the demo:
## Hardware Topology Integration

`TopologyProvider` exposes the full hardware topology (NUMA nodes, CPU packages, L1/L2/L3
cache hierarchy) and can pin threads to specific CPU sets.
### Topology discovery

```rust
use taskflow_rs::TopologyProvider;

let topo = TopologyProvider::detect();
// Uses hwloc2 when compiled with --features hwloc,
// falls back to /sys parsing otherwise.

// Accessor names below are illustrative:
println!("backend: {}", topo.backend_name()); // "hwloc2 2.2.0" or "sysfs-fallback"
println!("logical CPUs: {}", topo.num_cpus());
println!("NUMA nodes: {}", topo.numa_nodes().len());

for node in topo.numa_nodes() { /* ... */ }
for pkg in topo.packages() { /* ... */ }
for cache in topo.cache_info() { /* ... */ }
```
### Worker CPU affinity

```rust
use taskflow_rs::{AffinityStrategy, HwlocWorkerAffinity, TopologyProvider};

let topo = TopologyProvider::detect();
let num_workers = 8;
let affinity = HwlocWorkerAffinity::new(&topo, num_workers, AffinityStrategy::NUMARoundRobin);

// Inside each worker thread at startup:
affinity.pin_current_thread(worker_id)?;

// Or just query the CPU set without binding:
let cpus = affinity.cpus_for_worker(worker_id);
println!("worker {worker_id} → CPUs {cpus:?}");
```
Available strategies:

| Strategy | Behaviour |
|---|---|
| `None` | No binding; OS decides |
| `NUMARoundRobin` | Distribute workers evenly across NUMA nodes |
| `NUMADense` | Fill each NUMA node before moving to the next |
| `PhysicalCores` | Pin to physical cores; skip hyperthreading siblings |
| `L3CacheDomain` | Assign workers to L3 cache sharing groups |
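To make the `NUMARoundRobin` row concrete, here is a toy version of the worker-to-node mapping (a hypothetical helper, not the library's code; assumes at least one NUMA node):

```rust
/// Evenly distribute `num_workers` across `numa_nodes` in round-robin order.
/// Returns, for each worker index, the NUMA node it is assigned to.
fn numa_round_robin(num_workers: usize, numa_nodes: usize) -> Vec<usize> {
    assert!(numa_nodes > 0, "need at least one NUMA node");
    (0..num_workers).map(|w| w % numa_nodes).collect()
}

fn main() {
    // 8 workers over 2 NUMA nodes alternate between the nodes.
    let assignment = numa_round_robin(8, 2);
    println!("{assignment:?}"); // [0, 1, 0, 1, 0, 1, 0, 1]
}
```

`NUMADense` would instead fill node 0 with its share of workers before touching node 1, trading even load for tighter memory locality within a node.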
### Enabling hwloc

```sh
# Ubuntu / Debian
sudo apt install libhwloc-dev

# Fedora / RHEL
sudo dnf install hwloc-devel

# macOS
brew install hwloc

# Build with full hwloc support
cargo build --features hwloc
```

Without `--features hwloc` the sysfs fallback is used automatically — no code changes
required. `topo.is_hwloc_backed()` returns `false` so callers can log the difference.
Run the demo:
## Static Task Priorities

For simpler use cases where priorities are fixed at enqueue time, use the built-in
`PriorityScheduler`:

```rust
use taskflow_rs::{Priority, PriorityScheduler};

let mut scheduler = PriorityScheduler::new();
scheduler.push(task1, Priority::Low);
scheduler.push(task2, Priority::High);
scheduler.push(task3, Priority::Critical);

// Pop order: 3 (Critical), 2 (High), 1 (Low)
```

For live reprioritization use `SharedDynamicScheduler` instead — it implements the same
`Scheduler` trait and is a drop-in replacement.
## GPU Support
TaskFlow-RS provides a backend-agnostic GPU API supporting CUDA (NVIDIA), OpenCL (NVIDIA/AMD/Intel), and ROCm/HIP (AMD).
### Building

```sh
# CUDA (NVIDIA) — default CUDA 12.0
cargo build --features gpu

# OpenCL (NVIDIA / AMD / Intel)
# Ubuntu: sudo apt install ocl-icd-opencl-dev
cargo build --features opencl

# ROCm / HIP (AMD) — requires ROCm ≥ 5.0
ROCM_PATH=/opt/rocm cargo build --features rocm

# All backends
cargo build --features gpu,opencl,rocm

# Stub backend (always available, no flags needed)
cargo build
```
### Usage

```rust
use taskflow_rs::gpu::GpuDevice;

// Auto-select: CUDA → ROCm → OpenCL → Stub
let device = GpuDevice::new()?;

// Force a specific backend (variant name illustrative):
let device = GpuDevice::with_backend(Backend::Cuda)?;

// Allocate and transfer (buffer type and sizes illustrative):
let mut buf: GpuBuffer<f32> = device.allocate(1024)?;
buf.copy_from_host(&host_data)?;

// Async transfer via stream
let stream = device.create_stream()?;
unsafe { buf.copy_from_host_async(&host_data, &stream)? };
stream.synchronize()?;
```
## Async Support

```toml
[dependencies]
taskflow-rs = { version = "0.2", features = ["async"] }
tokio = { version = "1", features = ["full"] }
```

```rust
use taskflow_rs::AsyncExecutor; // executor name illustrative

#[tokio::main]
async fn main() {
    // Build a flow with async tasks and await its completion;
    // see ASYNC_TASKS.md for the full API.
}
```
## Tooling

```rust
use taskflow_rs::tooling::{PerformanceMonitor, Profiler};

// Profiling
let profiler = Profiler::new();
profiler.enable();
executor.run(&flow).wait();
let profile = profiler.get_profile().unwrap();
println!("{profile:?}");

// Visualization
generate_dot_graph(&flow, "graph.dot");
// dot -Tsvg graph.dot -o graph.svg

// Real-time metrics (accessor names illustrative):
let metrics = PerformanceMonitor::new();
println!("throughput: {}", metrics.tasks_per_second());
println!("queue depth: {}", metrics.queue_depth());
```
## Building

```sh
# Core (no GPU, no hwloc)
cargo build

# With hwloc topology
cargo build --features hwloc

# With CUDA GPU
cargo build --features gpu

# Everything
cargo build --all-features

# Release
cargo build --release

# Tests
cargo test
```
## Examples

```sh
# New scheduling features
# Advanced features (priorities, cooperative cancellation, NUMA)
# Async
# GPU (stub — no hardware required)
# GPU with hardware
```
## Architecture

### Work-stealing executor

```text
Worker 0: [Task] [Task] [Task]  ← push/pop own queue (LIFO, cache-warm)
                    ↓ steal (FIFO)
Worker 1: [Task] [Task]         ← idle workers steal from busy ones
```
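The LIFO-pop / FIFO-steal discipline can be sketched with a plain `Mutex<VecDeque>` (the real executor uses lock-free deques; this toy only shows the access pattern):

```rust
use std::collections::VecDeque;
use std::sync::{Arc, Mutex};

/// Toy work queue: the owner pushes/pops at the back (LIFO, cache-warm),
/// thieves steal from the front (FIFO, oldest task first).
#[derive(Clone)]
struct WorkQueue(Arc<Mutex<VecDeque<u32>>>);

impl WorkQueue {
    fn new() -> Self { WorkQueue(Arc::new(Mutex::new(VecDeque::new()))) }
    fn push(&self, t: u32) { self.0.lock().unwrap().push_back(t); }
    fn pop(&self) -> Option<u32> { self.0.lock().unwrap().pop_back() }    // owner
    fn steal(&self) -> Option<u32> { self.0.lock().unwrap().pop_front() } // thief
}

fn main() {
    let q = WorkQueue::new();
    for t in 1..=3 { q.push(t); }
    assert_eq!(q.pop(), Some(3));   // owner gets the newest, cache-warm task
    assert_eq!(q.steal(), Some(1)); // a thief gets the oldest task
    println!("remaining: {:?}", q.pop());
}
```

Owners popping newest-first keeps their cache warm; thieves stealing oldest-first grab the coarsest-grained work, which minimizes steal traffic.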
### Scheduling layer

```text
SharedDynamicScheduler
├── index:   BTreeMap<(RevPriority, SeqNum), TaskId>   O(log n) pop
└── reverse: HashMap<TaskId, (RevPriority, SeqNum)>    O(1) lookup

PriorityHandle ──weak──► SharedDynamicScheduler
    reprioritize() / cancel() without owning the queue

EscalationPolicy ──tick──► sched.reprioritize(id, bumped_priority)
    anti-starvation for Low / Normal tasks
```
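A minimal sketch of the two-map scheme: a `BTreeMap` keyed by `(reversed priority, sequence)` gives ordered O(log n) pops, while a `HashMap` locates any task in O(1) so reprioritization is a remove-and-reinsert. Types here are simplified stand-ins, not the library's internals:

```rust
use std::collections::{BTreeMap, HashMap};

type Seq = u64;
type TaskId = u32;

struct DynSched {
    // Lower key pops first, so store priority reversed: 0 = Critical … 3 = Low.
    index: BTreeMap<(u8, Seq), TaskId>,
    reverse: HashMap<TaskId, (u8, Seq)>,
    next_seq: Seq,
}

impl DynSched {
    fn new() -> Self {
        DynSched { index: BTreeMap::new(), reverse: HashMap::new(), next_seq: 0 }
    }

    fn push(&mut self, id: TaskId, rev_prio: u8) {
        let key = (rev_prio, self.next_seq);
        self.next_seq += 1;
        self.index.insert(key, id);
        self.reverse.insert(id, key);
    }

    /// O(log n): remove the old entry, reinsert under the new priority,
    /// keeping the original sequence number so FIFO order is preserved.
    fn reprioritize(&mut self, id: TaskId, rev_prio: u8) {
        if let Some(&(_, seq)) = self.reverse.get(&id) {
            let old = self.reverse.insert(id, (rev_prio, seq)).unwrap();
            self.index.remove(&old);
            self.index.insert((rev_prio, seq), id);
        }
    }

    fn pop(&mut self) -> Option<TaskId> {
        let (_key, id) = self.index.pop_first()?;
        self.reverse.remove(&id);
        Some(id)
    }
}

fn main() {
    let mut s = DynSched::new();
    s.push(1, 3); // Low
    s.push(2, 2); // Normal
    s.push(3, 3); // Low
    s.reprioritize(3, 0); // escalate task 3 to Critical
    assert_eq!(s.pop(), Some(3));
    assert_eq!(s.pop(), Some(2));
    assert_eq!(s.pop(), Some(1));
}
```

Reusing the original sequence number on reinsert is what makes reprioritization order-stable within a priority level.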
### Preemptive cancellation

```text
PreemptiveCancellationToken
├── Arc<AtomicBool>  ← check() fast path: one Acquire load
├── Condvar          ← watchdog sleep / early wake on manual cancel
└── cancel_after() ── spawns watchdog thread
        wait_timeout → drop(guard) → cancel_with_reason()
                       ↑ guard dropped BEFORE notify
                         (prevents self-deadlock)
```
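The watchdog pattern, sleeping on a `Condvar` with a timeout, waking early on a manual cancel, and dropping the mutex guard before touching the flag, can be sketched in plain std (simplified: no reason string, predicate checked once rather than in a spurious-wakeup loop):

```rust
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::{Arc, Condvar, Mutex};
use std::thread;
use std::time::Duration;

#[derive(Clone)]
struct Token {
    cancelled: Arc<AtomicBool>,
    wake: Arc<(Mutex<bool>, Condvar)>,
}

impl Token {
    fn new() -> Self {
        Token {
            cancelled: Arc::new(AtomicBool::new(false)),
            wake: Arc::new((Mutex::new(false), Condvar::new())),
        }
    }

    /// Fast path used by tasks: a single Acquire load.
    fn is_cancelled(&self) -> bool {
        self.cancelled.load(Ordering::Acquire)
    }

    /// Manual cancel: set the flag under the lock, then wake the watchdog early.
    fn cancel(&self) {
        self.cancelled.store(true, Ordering::Release);
        *self.wake.0.lock().unwrap() = true;
        self.wake.1.notify_all();
    }

    /// Watchdog: sleep up to `timeout`, then cancel. The mutex guard is
    /// dropped BEFORE the flag is set, so nothing downstream can
    /// self-deadlock trying to take the same lock.
    fn cancel_after(&self, timeout: Duration) -> thread::JoinHandle<()> {
        let tok = self.clone();
        thread::spawn(move || {
            let (lock, cv) = (&tok.wake.0, &tok.wake.1);
            let guard = lock.lock().unwrap();
            // Check the predicate first so a cancel() racing ahead of us
            // is never a lost wakeup.
            let guard = if *guard {
                guard
            } else {
                cv.wait_timeout(guard, timeout).unwrap().0
            };
            drop(guard); // release BEFORE acting on the flag
            tok.cancelled.store(true, Ordering::Release);
        })
    }
}

fn main() {
    let token = Token::new();
    let watchdog = token.cancel_after(Duration::from_secs(5));
    token.cancel(); // manual cancel wakes the watchdog long before 5 s
    watchdog.join().unwrap();
    assert!(token.is_cancelled());
}
```

Checking the boolean under the lock before sleeping is the standard defense against the lost-wakeup race between `cancel()` and the watchdog's `wait_timeout`.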
### Hardware topology

```text
TopologyProvider::detect()
├── --features hwloc → HwlocBackend (hwloc2 = "2.2.0")
└── default          → SysfsBackend (/sys/devices/system/cpu/*/cache/)

HwlocWorkerAffinity
├── cpus_for_worker(id)    → Vec<usize>
└── pin_current_thread(id) → pthread_setaffinity_np (Linux)
                             hwloc set_cpubind THREAD (hwloc backend)
```
### GPU backend

```text
GpuDevice ──Arc<dyn ComputeBackend>──► CudaBackend   (--features gpu)
                                       OpenCLBackend (--features opencl)
                                       RocmBackend   (--features rocm)
                                       StubBackend   (always)
```
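The `Arc<dyn ComputeBackend>` dispatch amounts to a trait object chosen once at device creation. A toy version of the fallback chain, with simplified trait and probe functions that stand in for driver detection:

```rust
use std::sync::Arc;

trait ComputeBackend {
    fn name(&self) -> &'static str;
}

struct StubBackend;
impl ComputeBackend for StubBackend {
    fn name(&self) -> &'static str { "stub" }
}

struct GpuDevice {
    backend: Arc<dyn ComputeBackend>,
}

// Each probe returns Some(backend) if its runtime is available.
// Here every probe "fails", standing in for missing drivers.
fn try_cuda() -> Option<Arc<dyn ComputeBackend>> { None }   // no CUDA driver
fn try_rocm() -> Option<Arc<dyn ComputeBackend>> { None }   // no ROCm runtime
fn try_opencl() -> Option<Arc<dyn ComputeBackend>> { None } // no OpenCL ICD

impl GpuDevice {
    /// Auto-select: try each real backend in order, fall back to the stub.
    fn new() -> Self {
        let backend = try_cuda()
            .or_else(try_rocm)
            .or_else(try_opencl)
            .unwrap_or_else(|| Arc::new(StubBackend));
        GpuDevice { backend }
    }
}

fn main() {
    let device = GpuDevice::new();
    println!("selected backend: {}", device.backend.name()); // "stub"
}
```

Because callers only ever hold the trait object, code written against the stub runs unchanged once a real backend's probe succeeds.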
## Comparison with C++ TaskFlow
| Feature | C++ TaskFlow | TaskFlow-RS |
|---|---|---|
| Task Graphs | ✅ | ✅ |
| Work-Stealing | ✅ | ✅ |
| Subflows | ✅ | ✅ |
| Condition Tasks | ✅ | ✅ |
| Parallel Algorithms | ✅ | ✅ |
| Async Tasks | ✅ | ✅ |
| Pipeline | ✅ | ✅ |
| GPU — CUDA / OpenCL / ROCm | ✅ | ✅ |
| Async GPU Transfers | ✅ | ✅ |
| Multiple GPU Streams | ✅ | ✅ |
| Task Priorities | ✅ | ✅ |
| Cooperative Cancellation | ✅ | ✅ |
| Preemptive Cancellation | ✅ | ✅ |
| Dynamic Priority Adjustment | ✅ | ✅ |
| Hardware Topology (hwloc) | ✅ | ✅ |
| NUMA-Aware Scheduling | ✅ | ✅ |
## Documentation
| Document | Contents |
|---|---|
| DESIGN.md | Architecture, implementation status, design decisions |
| ADVANCED_FEATURES.md | Priorities, cancellation, schedulers, NUMA |
| GPU.md | Full GPU API: backends, streams, async transfers |
| GPU_SETUP.md | CUDA version config, ROCm install, troubleshooting |
| ASYNC_TASKS.md | Async executor and task documentation |
| PIPELINE.md | Concurrent pipeline documentation |
| TOOLING.md | Profiler, visualization, monitoring |
## License
MIT