1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
use dbuff::*;
use std::collections::HashMap;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::time::Duration;
// --- Domain data: the shared state your app reads from ---
// We use TaskStatus<()> because these are fire-and-forget tasks —
// we care about whether they finished or got cancelled, not about a return value.
#[derive(Debug, Clone, Default)]
struct AppData {
task_statuses: HashMap<String, TaskStatus<()>>,
}
#[tokio::main]
async fn main() {
// --- Tokio runtime handle: lets the pool spawn tasks on this runtime ---
let rt = tokio::runtime::Handle::current();
// --- SharedDomainData: dbuff's coalesced state wrapper ---
// Wraps AppData so reads/writes are batched at 500µs intervals
let (domain, wh) =
SharedDomainData::with_coalesce(AppData::default(), Duration::from_micros(500));
tokio::spawn(wh.run()); // starts the coalescing worker
// --- Shared cancellation flag ---
// Cooperative tasks check this flag in their loop and exit cleanly
// when it becomes true. The stubborn task ignores it.
let cancelled = Arc::new(AtomicBool::new(false));
// --- Clone for the callback closure (moved into TaskPool) ---
let domain_for_pool = domain.clone();
// --- TaskPool<String, ()> ---
// K = String → task identifier (e.g. "cooperative-0", "stubborn")
// T = () → no return value (fire-and-forget tasks)
//
// The callback fires on every status transition:
// TaskStatus::Pending → task started
// TaskStatus::Resolved(()) → task finished
// TaskStatus::Aborted → task was cancelled (by abort() or re-spawn)
// TaskStatus::Error(...) → task panicked
let pool: TaskPool<String, ()> = TaskPool::new(rt, move |key: &String, status| {
// ^^^ key is a reference to the K you passed to spawn()
// ^^^^^^ status is TaskStatus<T> = TaskStatus<()>
let key = key.clone();
domain_for_pool.modify(move |d: &mut AppData| {
d.task_statuses.insert(key, status);
});
});
// --- Spawn 3 cooperative tasks ---
// Each one loops and checks the `cancelled` flag.
// When the flag flips to true, they exit cleanly and the callback
// fires TaskStatus::Resolved(()).
for i in 0..3 {
let cancelled = cancelled.clone(); // each task gets its own Arc reference
pool.spawn(format!("cooperative-{i}"), move || {
async move {
loop {
// --- Cooperative cancellation check ---
// This is the pattern for long-running tasks:
// periodically check a shared flag and exit early.
if cancelled.load(Ordering::Relaxed) {
println!(" cooperative-{i}: cancellation observed, exiting cleanly");
return; // ← returns (), which becomes TaskStatus::Resolved(())
}
tokio::time::sleep(Duration::from_millis(10)).await;
}
}
});
}
// --- Spawn 1 stubborn task ---
// This task ignores the cancellation flag and sleeps for 10 seconds.
// When shutdown() times out, tokio force-aborts it via JoinHandle::abort().
pool.spawn("stubborn".to_string(), || {
async move {
tokio::time::sleep(Duration::from_secs(10)).await;
// This line should never be reached — shutdown will abort us first
println!(" stubborn: completed (unexpected!)");
}
});
// --- Wait for tasks to start ---
tokio::time::sleep(Duration::from_millis(50)).await;
println!(" 3 cooperative + 1 stubborn task running");
// --- Trigger cooperative cancellation ---
println!("\n=== Setting cancellation flag ===");
cancelled.store(true, Ordering::Relaxed);
println!(" flag set to true — cooperative tasks will exit on next loop iteration");
// --- Graceful shutdown ---
// Waits for all tasks to finish. Tasks that don't finish within the
// timeout are force-aborted via tokio's JoinHandle::abort().
println!("\n=== Calling shutdown(200ms) ===");
let shutdown_start = std::time::Instant::now();
pool.shutdown(Duration::from_millis(200)).await;
let elapsed = shutdown_start.elapsed();
println!(" shutdown completed in {}ms", elapsed.as_millis());
// --- Verify results via shared state ---
println!("\n=== Checking results ===");
let snap = domain.read();
// All 3 cooperative tasks should have resolved (they exited cleanly)
for i in 0..3 {
let key = format!("cooperative-{i}");
let status = snap.task_statuses.get(&key);
match status {
Some(TaskStatus::Resolved(())) => println!(" {key}: resolved ✓"),
other => panic!(" {key}: expected Resolved(()), got {other:?}"),
}
}
// The stubborn task should have been aborted (it was still sleeping
// when the 200ms shutdown timeout expired)
match snap.task_statuses.get("stubborn") {
Some(TaskStatus::Pending) => println!(" stubborn: still pending (force-aborted before callback) ✓"),
Some(TaskStatus::Aborted) => println!(" stubborn: aborted ✓"),
other => panic!(" stubborn: expected Pending or Aborted, got {other:?}"),
}
// Shutdown should be fast — cooperative tasks exit within ~10ms
assert!(
elapsed.as_millis() < 500,
"shutdown should be fast (cooperative tasks exit quickly)"
);
println!("\nAll examples passed!");
}