// dbuff 0.1.0
//
// Double-buffered state with async command chains, streaming, and keyed task pools for ratatui applications.
// Documentation
use dbuff::*;
use std::collections::HashMap;
use std::time::Duration;

// --- Domain data: the shared state your app reads from ---
/// Application state shared between the task-pool callback (writer) and `main` (reader).
///
/// `Default` provides the empty starting state and `Clone` is derived alongside it;
/// `Debug` lets the stored statuses be printed with `{:?}`.
#[derive(Clone, Debug, Default)]
struct AppData {
    /// Most recent status reported for each task, indexed by the task's key.
    task_statuses: HashMap<String, TaskStatus<i32>>,
}

/// How often the example re-reads the shared state while waiting for a transition.
const POLL_INTERVAL: Duration = Duration::from_millis(5);

/// Upper bound on any single wait, so a missed transition fails loudly instead of hanging.
const POLL_TIMEOUT: Duration = Duration::from_secs(5);

/// Repeatedly calls `probe` until it yields `Some(value)`, then returns that value.
///
/// `probe` is invoked immediately and then once every [`POLL_INTERVAL`]. It should take
/// any snapshot it needs *inside* the closure so nothing is held while this function
/// sleeps between attempts.
///
/// # Panics
///
/// Panics if `probe` has not produced a value within [`POLL_TIMEOUT`]; `what` is included
/// in the message to identify which wait failed. Panics raised by `probe` itself propagate.
async fn poll_until<R>(what: &str, mut probe: impl FnMut() -> Option<R>) -> R {
    let polling = async {
        loop {
            if let Some(found) = probe() {
                break found;
            }
            tokio::time::sleep(POLL_INTERVAL).await;
        }
    };

    match tokio::time::timeout(POLL_TIMEOUT, polling).await {
        Ok(found) => found,
        Err(_) => panic!("timed out after {POLL_TIMEOUT:?} waiting for {what}"),
    }
}

/// Walks through the keyed task pool: spawn, re-spawn on the same key, abort,
/// independent keys, and shutdown.
///
/// Every status change reported by the pool is written into the shared `AppData`, and
/// each step waits for the expected status by polling that shared state (bounded by
/// [`POLL_TIMEOUT`]) rather than by sleeping for a fixed time.
#[tokio::main]
async fn main() {
    // --- Tokio runtime handle: lets the pool spawn tasks on this runtime ---
    let rt = tokio::runtime::Handle::current();

    // --- SharedDomainData: dbuff's coalesced state wrapper ---
    //     Wraps AppData so reads/writes are batched at 500µs intervals
    let (domain, wh) =
        SharedDomainData::with_coalesce(AppData::default(), Duration::from_micros(500));
    tokio::spawn(wh.run()); // starts the coalescing worker

    // --- Clone for the callback closure (moved into TaskPool) ---
    let domain_for_pool = domain.clone();

    // --- TaskPool<String, i32> ---
    //   K = String  → task identifier (e.g. "compute", "slow", "alpha")
    //   T = i32     → what the async closure returns (e.g. 42, 99)
    //
    // The callback fires on every status transition:
    //   TaskStatus::Pending          → task started
    //   TaskStatus::Resolved(42)     → task finished, returned 42
    //   TaskStatus::Error(...)       → task panicked/errored
    //   TaskStatus::Aborted          → task was cancelled
    //
    // `key` is a reference to the K passed to spawn(); `status` is TaskStatus<i32>.
    let pool: TaskPool<String, i32> = TaskPool::new(rt, move |key: &String, status| {
        // The key is only borrowed for the duration of the callback, so take an owned
        // copy that can move into the modify closure.
        let key = key.clone();
        domain_for_pool.modify(move |d: &mut AppData| {
            // Store the status into shared state so the UI can read it
            d.task_statuses.insert(key, status);
        });
    });

    // --- 1. Spawn a task and observe Pending → Resolved ---
    println!("=== Spawn a task ===");
    pool.spawn("compute".to_string(), || async {
        tokio::time::sleep(Duration::from_millis(50)).await;
        42 // ← this becomes TaskStatus::Resolved(42) in the callback
    });

    // Wait until we see Pending (coalescing adds a small delay)
    poll_until("\"compute\" to become pending", || {
        let snap = domain.read();
        match snap.task_statuses.get("compute") {
            Some(TaskStatus::Pending) => Some(()),
            None | Some(TaskStatus::Idle) => None,
            other => panic!("unexpected status: {other:?}"),
        }
    })
    .await;
    println!("  compute: pending");

    // Wait until we see Resolved
    let value = poll_until("\"compute\" to resolve", || {
        let snap = domain.read();
        match snap.task_statuses.get("compute") {
            Some(TaskStatus::Resolved(v)) => Some(*v),
            Some(TaskStatus::Pending) => None,
            other => panic!("unexpected status: {other:?}"),
        }
    })
    .await;
    println!("  compute: resolved ({value})");

    // --- 2. Re-spawn same key ---
    // The first "compute" task has already resolved at this point (we waited for it
    // above), so there is nothing left to cancel here; the new task's statuses simply
    // replace the stored Resolved(42).
    println!("\n=== Re-spawn same key ===");
    pool.spawn("compute".to_string(), || async {
        tokio::time::sleep(Duration::from_millis(50)).await;
        99 // ← new result for the same key
    });

    // Any other status (including the stale Resolved(42)) just means "not yet".
    let value = poll_until("re-spawned \"compute\" to resolve to 99", || {
        let snap = domain.read();
        match snap.task_statuses.get("compute") {
            Some(TaskStatus::Resolved(v)) if *v == 99 => Some(*v),
            Some(_) | None => None,
        }
    })
    .await;
    println!("  compute: resolved ({value})");

    // --- 3. Abort a running task ---
    println!("\n=== Abort a running task ===");
    pool.spawn("slow".to_string(), || async {
        tokio::time::sleep(Duration::from_secs(10)).await;
        0
    });

    // Wait for it to start
    poll_until("\"slow\" to become pending", || {
        let snap = domain.read();
        match snap.task_statuses.get("slow") {
            Some(status) if status.is_pending() => Some(()),
            None | Some(TaskStatus::Idle) => None,
            other => panic!("unexpected status: {other:?}"),
        }
    })
    .await;
    println!("  slow: pending");

    let was_aborted = pool.abort(&"slow".to_string());
    assert!(was_aborted);

    // The Aborted status reaches shared state asynchronously; Pending may still be
    // visible for a short while after abort() returns.
    poll_until("\"slow\" to become aborted", || {
        let snap = domain.read();
        match snap.task_statuses.get("slow") {
            Some(status) if status.is_aborted() => Some(()),
            Some(status) if status.is_pending() => None,
            other => panic!("unexpected status: {other:?}"),
        }
    })
    .await;
    println!("  slow: aborted");

    // Abort unknown key returns false
    let was_aborted = pool.abort(&"nonexistent".to_string());
    assert!(!was_aborted);
    println!("  nonexistent: abort returned false (expected)");

    // --- 4. Different keys run independently ---
    println!("\n=== Independent keys ===");
    pool.spawn("alpha".to_string(), || async { 1 }); // key="alpha", returns 1
    pool.spawn("beta".to_string(), || async { 2 }); // key="beta",  returns 2

    poll_until("\"alpha\" and \"beta\" to resolve", || {
        let snap = domain.read();
        let both_resolved = ["alpha", "beta"].iter().all(|key| {
            snap.task_statuses
                .get(*key)
                .is_some_and(TaskStatus::is_resolved)
        });
        both_resolved.then_some(())
    })
    .await;

    // Scoped so the snapshot is released before awaiting shutdown below.
    {
        let snapshot = domain.read();
        let alpha = snapshot
            .task_statuses
            .get("alpha")
            .expect("alpha was observed as resolved above");
        let beta = snapshot
            .task_statuses
            .get("beta")
            .expect("beta was observed as resolved above");
        assert!(alpha.is_resolved() && *alpha.resolved().unwrap() == 1);
        assert!(beta.is_resolved() && *beta.resolved().unwrap() == 2);
        println!("  alpha: {alpha:?}");
        println!("  beta: {beta:?}");
    }

    // --- 5. Graceful shutdown ---
    println!("\n=== Graceful shutdown ===");
    pool.shutdown(Duration::from_secs(1)).await;
    println!("  shutdown complete");

    println!("\nAll examples passed!");
}