1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
use dbuff::*;
use kanal::unbounded;
use std::time::Duration;
/// Domain value produced by every task in this example.
///
/// It stands in for whatever structured record a real application would
/// load from a database or an API, and demonstrates that a task result
/// can be any `Send + 'static` type rather than just a primitive.
#[derive(Debug)]
struct User {
    /// Display name of the user.
    name: String,
    /// Age of the user.
    age: u32,
    /// Role label (the example uses "admin", "editor", "viewer").
    role: String,
}
/// Typed message that travels through the channel.
///
/// The status-change callback packs each `(key, status)` pair into one of
/// these before sending it, so consumers work with named fields instead of
/// raw tuples.
struct TaskEvent {
    /// Identifier of the task whose status changed.
    key: String,
    /// The status that was reported for that task.
    status: TaskStatus<User>,
}
#[tokio::main]
async fn main() {
// --- Tokio runtime handle: lets the pool spawn tasks on this runtime ---
let rt = tokio::runtime::Handle::current();
// --- Channel: callback pushes events here, main loop reads from here ---
// kanal::unbounded() returns a sync sender and sync receiver.
// The sync sender is perfect for use inside the TaskPool callback
// (which is a sync Fn closure). The sync receiver can be converted
// to an async receiver for use in an async event loop.
let (tx, rx) = unbounded();
// --- TaskPool<String, User> ---
// K = String → task identifier (e.g. "fetch-user", "fetch-config")
// T = User → task result (a structured domain type)
//
// Instead of writing into SharedDomainData, the callback sends a
// typed event through the channel. This decouples the producer
// (TaskPool) from the consumer (your event loop).
//
// Why use channels instead of SharedDomainData?
// - You want to process events in an async loop (e.g. forward to a websocket)
// - TaskPool lives in a different module than the consumer
// - You need to fan-out events to multiple listeners
// - You prefer push-based (channel) over pull-based (poll domain.read())
let pool: TaskPool<String, User> = TaskPool::new(rt, move |key: &String, status| {
// ^^^ key is a reference to the K you passed to spawn()
// ^^^^^^ status is TaskStatus<T> = TaskStatus<User>
//
// When the tasks complete, this closure gets called and you can send the results somewhere
// else via this channel.
let _ = tx.send(TaskEvent {
key: key.clone(),
status,
});
// Note: we use kanal's sync sender (tx) inside the callback.
// The sync send is non-blocking for unbounded channels.
// We ignore send errors — if the receiver is dropped, events are lost.
});
// --- Spawn tasks under different keys ---
// Each task simulates fetching a user and returning a User struct.
println!("=== Spawning tasks ===");
pool.spawn("fetch-user".to_string(), || async {
tokio::time::sleep(Duration::from_millis(50)).await;
User {
name: "Alice".into(),
age: 30,
role: "admin".into(),
}
});
pool.spawn("fetch-user-2".to_string(), || async {
tokio::time::sleep(Duration::from_millis(30)).await;
User {
name: "Bob".into(),
age: 25,
role: "editor".into(),
}
});
pool.spawn("fetch-user-3".to_string(), || async {
tokio::time::sleep(Duration::from_millis(70)).await;
User {
name: "Carol".into(),
age: 35,
role: "viewer".into(),
}
});
// --- Spawn a slow task that we'll abort ---
pool.spawn("slow-query".to_string(), || async {
tokio::time::sleep(Duration::from_secs(10)).await;
User {
name: "Nobody".into(),
age: 0,
role: "never".into(),
}
});
// --- Abort the slow task immediately ---
// We abort BEFORE processing events so the event loop sees:
// 4 × Pending (all four spawns)
// 3 × Resolved (the three fast fetches)
// 1 × Aborted (slow-query, cancelled by abort())
println!("\n=== Aborting slow-query ===");
let was_running = pool.abort(&"slow-query".to_string());
println!(" abort returned: {was_running} (true = task was found and cancelled)");
// --- Event loop: receive and process events from the channel ---
// Convert the sync receiver to an async receiver for use with .await.
// Each task produces exactly 2 events: Pending (on spawn) then
// Resolved/Aborted/Error (on completion).
println!("\n=== Event loop ===");
let mut event_count = 0;
let mut resolved_count = 0;
let mut aborted_count = 0;
// We expect 8 events total:
// 4 × Pending (one per spawn)
// 3 × Resolved (fetch-user, fetch-user-2, fetch-user-3)
// 1 × Aborted (slow-query — we aborted it above)
let async_rx = rx.to_async();
while event_count < 8 {
match async_rx.recv().await {
Ok(event) => {
event_count += 1;
match event.status {
TaskStatus::Idle => {
println!(" [{}] {}: idle", event_count, event.key);
}
TaskStatus::Pending => {
println!(" [{}] {}: started...", event_count, event.key);
}
TaskStatus::Resolved(ref user) => {
resolved_count += 1;
println!(
" [{}] {}: done → {} (age {}, role={})",
event_count, event.key, user.name, user.age, user.role
);
}
TaskStatus::Aborted => {
aborted_count += 1;
println!(" [{}] {}: cancelled", event_count, event.key);
}
TaskStatus::Error(ref e) => {
println!(" [{}] {}: failed → {}", event_count, event.key, e);
}
}
}
Err(_) => {
println!(" channel closed");
break;
}
}
}
// --- Verify results ---
println!("\n=== Summary ===");
println!(" total events: {event_count}");
println!(" resolved tasks: {resolved_count}");
println!(" aborted tasks: {aborted_count}");
assert_eq!(resolved_count, 3, "exactly 3 tasks should have resolved");
assert_eq!(aborted_count, 1, "exactly 1 task should have been aborted");
assert_eq!(
event_count, 8,
"should have received all 8 events (4 pending + 3 resolved + 1 aborted)"
);
// --- Graceful shutdown ---
println!("\n=== Shutting down ===");
pool.shutdown(Duration::from_secs(1)).await;
println!(" shutdown complete");
println!("\nAll examples passed!");
}