dbuff 0.1.0

Double-buffered state with async command chains, streaming, and keyed task pools for ratatui applications
Documentation
use dbuff::*;
use std::collections::HashMap;
use std::time::{Duration, Instant};

// --- Domain data: the shared state your app reads from ---
// We use TaskStatus<usize> so Resolved(i) tells us which mutation "won".
// In a real app this might be TaskStatus<SaveResult> or similar.
#[derive(Debug, Clone, Default)]
struct AppData {
    task_statuses: HashMap<String, TaskStatus<usize>>,
}

#[tokio::main]
async fn main() {
    // --- Tokio runtime handle: lets the pool spawn tasks on this runtime ---
    let rt = tokio::runtime::Handle::current();

    // --- SharedDomainData: dbuff's coalesced state wrapper ---
    //     Wraps AppData so reads/writes are batched at 500µs intervals
    let (domain, wh) =
        SharedDomainData::with_coalesce(AppData::default(), Duration::from_micros(500));
    tokio::spawn(wh.run()); // starts the coalescing worker

    // --- Clone for the callback closure (moved into TaskPool) ---
    let domain_for_pool = domain.clone();

    // --- TaskPool<String, usize> ---
    //   K = String  → task identifier ("save" in this case)
    //   T = usize   → which mutation index completed (0, 1, 2, 3, or 4)
    //
    // The key insight: we always spawn under the SAME key "save".
    // Each new spawn auto-aborts the previous task, so only the last
    // one gets to finish. This is the debounce pattern.
    let pool: TaskPool<String, usize> = TaskPool::new(rt, move |key: &String, status| {
        //   ^^^ key is a reference to the K you passed to spawn()
        //                     ^^^^^^ status is TaskStatus<T> = TaskStatus<usize>
        let key = key.clone();
        domain_for_pool.modify(move |d: &mut AppData| {
            d.task_statuses.insert(key, status);
        });
    });

    // --- Simulate 5 rapid user edits, 50ms apart ---
    // Each "save" takes 200ms to complete. Since we fire every 50ms,
    // each new spawn aborts the previous one before it can finish.
    // Only the LAST spawn (mutation 4) has enough time to complete.
    println!("=== Firing 5 rapid mutations ===");
    let start = Instant::now();

    for i in 0..5 {
        pool.spawn("save".to_string(), move || {
            let mutation_id = i;
            async move {
                tokio::time::sleep(Duration::from_millis(200)).await;
                mutation_id // ← this becomes TaskStatus::Resolved(mutation_id)
            }
        });
        println!("  mutation {i} fired at {}ms", start.elapsed().as_millis());
        tokio::time::sleep(Duration::from_millis(50)).await;
    }

    // --- Wait for the last mutation to resolve ---
    // Mutation 4 fires at ~200ms, finishes at ~400ms.
    // Mutations 0-3 were all aborted before they could complete.
    println!("\n=== Waiting for result ===");
    loop {
        let snap = domain.read();
        match snap.task_statuses.get("save") {
            Some(TaskStatus::Resolved(mutation_id)) => {
                let elapsed = start.elapsed();
                println!("  resolved: mutation {mutation_id} at {}ms", elapsed.as_millis());
                assert_eq!(*mutation_id, 4, "only the last mutation should resolve");
                break;
            }
            Some(TaskStatus::Pending) => {
                // Still running — the last spawn hasn't finished yet
                tokio::time::sleep(Duration::from_millis(10)).await;
            }
            other => {
                panic!("unexpected status: {other:?}");
            }
        }
    }

    println!("\nAll examples passed!");
}