use dbuff::*;
use std::collections::HashMap;
use std::time::Duration;
#[derive(Debug, Clone, Default)]
struct AppData {
task_statuses: HashMap<String, TaskStatus<i32>>,
}
#[tokio::main]
async fn main() {
let rt = tokio::runtime::Handle::current();
let (domain, wh) =
SharedDomainData::with_coalesce(AppData::default(), Duration::from_micros(500));
tokio::spawn(wh.run());
let domain_for_pool = domain.clone();
let pool: TaskPool<String, i32> = TaskPool::new(rt, move |key: &String, status| {
let key = key.clone();
domain_for_pool.modify(move |d: &mut AppData| {
d.task_statuses.insert(key, status);
});
});
println!("=== Spawn a task ===");
pool.spawn("compute".to_string(), || async {
tokio::time::sleep(Duration::from_millis(50)).await;
42 });
loop {
let snap = domain.read();
match snap.task_statuses.get("compute") {
Some(TaskStatus::Pending) => {
println!(" compute: pending");
break;
}
None | Some(TaskStatus::Idle) => {}
other => panic!("unexpected status: {other:?}"),
}
tokio::time::sleep(Duration::from_millis(5)).await;
}
loop {
let snap = domain.read();
match snap.task_statuses.get("compute") {
Some(TaskStatus::Resolved(v)) => {
println!(" compute: resolved ({v})");
break;
}
Some(TaskStatus::Pending) => {}
other => panic!("unexpected status: {other:?}"),
}
tokio::time::sleep(Duration::from_millis(5)).await;
}
println!("\n=== Re-spawn same key ===");
pool.spawn("compute".to_string(), || async {
tokio::time::sleep(Duration::from_millis(50)).await;
99 });
loop {
let snap = domain.read();
match snap.task_statuses.get("compute") {
Some(TaskStatus::Resolved(v)) if *v == 99 => {
println!(" compute: resolved ({v})");
break;
}
Some(_) | None => {}
}
tokio::time::sleep(Duration::from_millis(5)).await;
}
println!("\n=== Abort a running task ===");
pool.spawn("slow".to_string(), || async {
tokio::time::sleep(Duration::from_secs(10)).await;
0
});
tokio::time::sleep(Duration::from_millis(20)).await;
let snap = domain.read();
assert!(
snap.task_statuses
.get("slow")
.is_some_and(TaskStatus::is_pending)
);
println!(" slow: pending");
let was_aborted = pool.abort(&"slow".to_string());
assert!(was_aborted);
tokio::time::sleep(Duration::from_millis(5)).await;
let snap = domain.read();
assert!(
snap.task_statuses
.get("slow")
.is_some_and(TaskStatus::is_aborted)
);
println!(" slow: aborted");
let was_aborted = pool.abort(&"nonexistent".to_string());
assert!(!was_aborted);
println!(" nonexistent: abort returned false (expected)");
println!("\n=== Independent keys ===");
pool.spawn("alpha".to_string(), || async { 1 }); pool.spawn("beta".to_string(), || async { 2 });
tokio::time::sleep(Duration::from_millis(50)).await;
let snapshot = domain.read();
let alpha = snapshot.task_statuses.get("alpha").unwrap();
let beta = snapshot.task_statuses.get("beta").unwrap();
assert!(alpha.is_resolved() && *alpha.resolved().unwrap() == 1);
assert!(beta.is_resolved() && *beta.resolved().unwrap() == 2);
println!(" alpha: {alpha:?}");
println!(" beta: {beta:?}");
println!("\n=== Graceful shutdown ===");
pool.shutdown(Duration::from_secs(1)).await;
println!(" shutdown complete");
println!("\nAll examples passed!");
}