use taskflow_rs::{
Executor, Taskflow, Priority, CancellationToken,
PriorityScheduler, Scheduler, NumaTopology
};
use std::sync::{Arc, atomic::{AtomicUsize, Ordering}};
use std::time::Duration;
use std::thread;
fn main() {
println!("=== Advanced Features Demo ===\n");
demo_task_priorities();
println!();
demo_task_cancellation();
println!();
demo_custom_scheduler();
println!();
demo_numa_awareness();
}
/// Demo 1: registers four independent tasks (named `low_priority`,
/// `normal_priority`, `high_priority`, `critical_priority`), runs them on an
/// executor built with `Executor::new(2)`, and prints the order in which the
/// tasks actually started.
///
/// Every task appends its label to a shared, mutex-protected vector, prints a
/// progress line, and sleeps for 10 ms.
///
/// NOTE(review): no priority value is attached to any task in this function;
/// the tasks differ only by name and label, so the printed order reflects
/// whatever the executor happens to do. Confirm whether a priority should be
/// set on each task handle here.
///
/// # Panics
///
/// Panics if the shared mutex is poisoned (i.e. a task panicked while holding it).
fn demo_task_priorities() {
    println!("1. Task Priorities");
    println!(" Demonstrating priority-based scheduling\n");

    let mut executor = Executor::new(2);
    let mut taskflow = Taskflow::new();
    let execution_order = Arc::new(std::sync::Mutex::new(Vec::new()));

    // (label recorded and printed, task name) in registration order.
    let levels = [
        ("Low", "low_priority"),
        ("Normal", "normal_priority"),
        ("High", "high_priority"),
        ("Critical", "critical_priority"),
    ];

    for &(label, task_name) in levels.iter() {
        let recorder = Arc::clone(&execution_order);
        let _handle = taskflow
            .emplace(move || {
                // Record first, so the vector reflects start order rather than finish order.
                recorder.lock().unwrap().push(label);
                println!(" [{} Priority] Executing...", label);
                thread::sleep(Duration::from_millis(10));
            })
            .name(task_name);
    }

    executor.run(&taskflow).wait();

    println!("\n Execution order: {:?}", *execution_order.lock().unwrap());
    println!(" ✓ Priority levels: Low < Normal < High < Critical");
}
/// Demo 2: runs five single-task taskflows one after another, all sharing one
/// `CancellationToken`, and reports how many tasks ran to completion versus
/// how many saw the cancellation flag.
///
/// Each task performs up to ten 20 ms steps, polling `is_cancelled()` before
/// every step. The token is cancelled once, right before the taskflow for
/// iteration 2 is run, and is never reset afterwards.
///
/// NOTE(review): each run is waited on before the next one is built, so
/// (assuming `wait()` blocks until the task has finished and token clones share
/// state) cancellation is only ever observed at step 0, never mid-task.
fn demo_task_cancellation() {
    println!("2. Task Cancellation");
    println!(" Cancelling tasks before and during execution\n");

    let mut executor = Executor::new(4);
    let token = CancellationToken::new();
    let completed = Arc::new(AtomicUsize::new(0));
    let cancelled = Arc::new(AtomicUsize::new(0));

    for iteration in 0..5 {
        // Per-task handles moved into the closure below.
        let task_token = token.clone();
        let finished_counter = Arc::clone(&completed);
        let aborted_counter = Arc::clone(&cancelled);

        let mut taskflow = Taskflow::new();
        taskflow.emplace(move || {
            println!(" [Iteration {}] Starting work...", iteration);

            // Yields the first step at which cancellation was seen; every
            // step that is not cancelled costs one 20 ms sleep.
            let interrupted_at = (0..10).find(|_| {
                if task_token.is_cancelled() {
                    true
                } else {
                    thread::sleep(Duration::from_millis(20));
                    false
                }
            });

            match interrupted_at {
                Some(step) => {
                    println!(" [Iteration {}] Cancelled at step {}", iteration, step);
                    aborted_counter.fetch_add(1, Ordering::Relaxed);
                }
                None => {
                    finished_counter.fetch_add(1, Ordering::Relaxed);
                    println!(" [Iteration {}] Completed", iteration);
                }
            }
        });

        // Cancel after the task is registered but before it is run.
        if iteration == 2 {
            println!("\n >>> Cancelling remaining tasks...\n");
            token.cancel();
        }

        executor.run(&taskflow).wait();
    }

    println!("\n Tasks completed: {}", completed.load(Ordering::Relaxed));
    println!(" Tasks cancelled: {}", cancelled.load(Ordering::Relaxed));
    println!(" ✓ Cancellation working correctly");
}
/// Demo 3: pushes five task ids with mixed priorities into a
/// `PriorityScheduler`, pops until it is empty, and prints the pop order.
///
/// # Panics
///
/// Panics if the pop order is not `[4, 2, 3, 5, 1]`, i.e. Critical, High,
/// the two Normal tasks in the order they were pushed, then Low.
fn demo_custom_scheduler() {
    println!("3. Custom Scheduler");
    println!(" Using priority-based scheduler\n");

    let mut scheduler = PriorityScheduler::new();

    println!(" Adding tasks:");
    // (task id, priority handed to the scheduler, label shown to the user)
    let submissions = [
        (1, Priority::Low, "Low"),
        (2, Priority::High, "High"),
        (3, Priority::Normal, "Normal"),
        (4, Priority::Critical, "Critical"),
        (5, Priority::Normal, "Normal"),
    ];
    for (task_id, priority, label) in submissions {
        scheduler.push(task_id, priority);
        println!(" Task {}: {} priority", task_id, label);
    }

    println!("\n Execution order (by priority):");
    // Drain the scheduler, echoing each id as it is popped.
    let order: Vec<_> = std::iter::from_fn(|| scheduler.pop())
        .inspect(|task_id| println!(" Task {}", task_id))
        .collect();

    assert_eq!(order, vec![4, 2, 3, 5, 1]);
    println!("\n ✓ Custom scheduler working correctly");
}
/// Demo 4: detects the NUMA topology, prints a system summary, and lists the
/// CPU assignment `get_worker_cpus` produces for four workers under the
/// `RoundRobin` and `Dense` pinning strategies.
///
/// Per-node details are printed only when `has_numa()` returns true; workers
/// whose CPU assignment is empty are skipped in the listing.
fn demo_numa_awareness() {
    use taskflow_rs::numa::{get_worker_cpus, NumaPinning};

    println!("4. NUMA Awareness");
    println!(" Detecting and utilizing NUMA topology\n");

    let topology = NumaTopology::detect();

    println!(" System Information:");
    println!(" CPU cores: {}", num_cpus::get());
    println!(" NUMA nodes: {}", topology.num_nodes);
    println!(" NUMA available: {}", topology.has_numa());

    if !topology.has_numa() {
        println!(" (Uniform Memory Access system detected)");
    } else {
        println!("\n NUMA Node Details:");
        for node in &topology.nodes {
            println!(" Node {}: {} CPUs", node.id, node.cpus.len());
            println!(" CPUs: {:?}", node.cpus);
        }
    }

    println!("\n Worker Pinning Strategies:");
    let num_workers = 4;

    println!(" Round-Robin:");
    (0..num_workers)
        .map(|worker_id| {
            let cpus =
                get_worker_cpus(worker_id, num_workers, &topology, NumaPinning::RoundRobin);
            (worker_id, cpus)
        })
        // Workers without any assigned CPU are not listed.
        .filter(|(_, cpus)| !cpus.is_empty())
        .for_each(|(worker_id, cpus)| println!(" Worker {}: CPU {:?}", worker_id, cpus));

    println!(" Dense:");
    (0..num_workers)
        .map(|worker_id| {
            let cpus = get_worker_cpus(worker_id, num_workers, &topology, NumaPinning::Dense);
            (worker_id, cpus)
        })
        // Workers without any assigned CPU are not listed.
        .filter(|(_, cpus)| !cpus.is_empty())
        .for_each(|(worker_id, cpus)| println!(" Worker {}: CPU {:?}", worker_id, cpus));

    println!("\n ✓ NUMA awareness configured");
}