use taskflow_rs::{
Executor, Taskflow,
PreemptiveCancellationToken, DeadlineGuard, Preempted, with_deadline,
};
use std::sync::{Arc, atomic::{AtomicUsize, Ordering}};
use std::time::Duration;
use std::thread;
fn main() {
println!("=== Preemptive Cancellation Demo ===\n");
demo_watchdog_timeout();
println!();
demo_manual_cancel_beats_watchdog();
println!();
demo_deadline_guard_raii();
println!();
demo_with_deadline_scoped();
println!();
demo_parallel_tasks_with_shared_token();
println!();
demo_reset_and_reuse();
}
fn demo_watchdog_timeout() {
println!("1. Watchdog timeout");
println!(" Task runs until watchdog fires after 100 ms\n");
let token = PreemptiveCancellationToken::new();
let iters = Arc::new(AtomicUsize::new(0));
token.cancel_after_with(Duration::from_millis(100), "budget exceeded");
let t = token.clone();
let ctr = iters.clone();
let handle = thread::spawn(move || {
for i in 0..1_000_000 {
if t.is_cancelled() {
println!(" [worker] preempted at iteration {} (reason: {:?})", i, t.reason());
break;
}
ctr.fetch_add(1, Ordering::Relaxed);
thread::sleep(Duration::from_micros(50)); }
});
handle.join().unwrap();
println!(" Completed iterations: {}", iters.load(Ordering::Relaxed));
println!(" ✓ Watchdog cancelled the task automatically");
}
fn demo_manual_cancel_beats_watchdog() {
println!("2. Manual cancel before watchdog");
println!(" Watchdog set to 5 s; manual cancel fires at 50 ms\n");
let token = PreemptiveCancellationToken::new();
token.cancel_after(Duration::from_secs(5));
let t = token.clone();
let handle = thread::spawn(move || {
let mut i = 0u64;
loop {
match t.check() {
Err(Preempted { reason }) => {
println!(" [worker] cancelled at i={} (reason: {:?})", i, reason);
break;
}
Ok(()) => {}
}
i += 1;
thread::sleep(Duration::from_millis(10));
}
});
thread::sleep(Duration::from_millis(50));
token.cancel_with("user requested stop");
handle.join().unwrap();
println!(" ✓ Manual cancel took effect before watchdog");
}
fn demo_deadline_guard_raii() {
println!("3. DeadlineGuard RAII");
println!(" Guard auto-cancels when scope exits past budget\n");
let token = PreemptiveCancellationToken::new();
{
let _guard: DeadlineGuard = token.deadline_guard(Duration::from_millis(100));
thread::sleep(Duration::from_millis(200)); }
println!(" Token cancelled after guard drop: {}", token.is_cancelled());
assert!(token.is_cancelled());
println!(" ✓ DeadlineGuard correctly cancelled the token");
}
fn demo_with_deadline_scoped() {
println!("4. with_deadline() scoped helper");
println!(" Runs a closure with a time budget; returns Err on timeout\n");
let completed = Arc::new(AtomicUsize::new(0));
let c = completed.clone();
let result = with_deadline(Duration::from_millis(120), |tok| {
for step in 0u32..100 {
tok.check()?; thread::sleep(Duration::from_millis(20));
c.fetch_add(1, Ordering::Relaxed);
println!(" [step {}] working...", step);
}
Ok(42u32)
});
match result {
Err(ref e) => println!(" Task preempted: {}", e),
Ok(v) => println!(" Task finished: result={}", v),
}
println!(" Steps completed: {}", completed.load(Ordering::Relaxed));
println!(" ✓ with_deadline correctly interrupted the closure");
}
fn demo_parallel_tasks_with_shared_token() {
println!("5. Shared token across parallel tasks");
println!(" One token cancels all tasks in a Taskflow\n");
let token = PreemptiveCancellationToken::new();
let finished = Arc::new(AtomicUsize::new(0));
let cancelled_count = Arc::new(AtomicUsize::new(0));
token.cancel_after(Duration::from_millis(80));
let mut executor = Executor::new(4);
let mut taskflow = Taskflow::new();
for task_id in 0..6 {
let t = token.clone();
let fin = finished.clone();
let canc = cancelled_count.clone();
taskflow.emplace(move || {
let delay_ms = 20 + task_id * 15;
let steps = delay_ms / 5;
for _ in 0..steps {
if t.is_cancelled() {
canc.fetch_add(1, Ordering::Relaxed);
return;
}
thread::sleep(Duration::from_millis(5));
}
fin.fetch_add(1, Ordering::Relaxed);
});
}
executor.run(&taskflow).wait();
println!(" Tasks finished (within budget): {}", finished.load(Ordering::Relaxed));
println!(" Tasks cancelled (exceeded budget): {}", cancelled_count.load(Ordering::Relaxed));
println!(" ✓ Shared token coordinated cancellation across all workers");
}
fn demo_reset_and_reuse() {
println!("6. Token reset and reuse");
println!(" Same token reused across multiple task runs\n");
let token = PreemptiveCancellationToken::new();
for run in 0..3 {
token.reset();
token.cancel_after(Duration::from_millis(50));
let t = token.clone();
let steps = Arc::new(AtomicUsize::new(0));
let s = steps.clone();
let handle = thread::spawn(move || {
while !t.is_cancelled() {
s.fetch_add(1, Ordering::Relaxed);
thread::sleep(Duration::from_millis(10));
}
});
handle.join().unwrap();
println!(" Run {}: {} steps before cancellation", run, steps.load(Ordering::Relaxed));
}
println!(" ✓ Token successfully reused 3 times");
}