dbuff 0.1.0

Double-buffered state with async command chains, streaming, and keyed task pools for ratatui applications
Documentation
use dbuff::*;
use std::collections::HashMap;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::time::Duration;

// --- Domain data: the shared state your app reads from ---
// We use TaskStatus<()> because these are fire-and-forget tasks —
// we care about whether they finished or got cancelled, not about a return value.
#[derive(Debug, Clone, Default)]
struct AppData {
    task_statuses: HashMap<String, TaskStatus<()>>,
}

#[tokio::main]
async fn main() {
    // --- Tokio runtime handle: lets the pool spawn tasks on this runtime ---
    let rt = tokio::runtime::Handle::current();

    // --- SharedDomainData: dbuff's coalesced state wrapper ---
    //     Wraps AppData so reads/writes are batched at 500µs intervals
    let (domain, wh) =
        SharedDomainData::with_coalesce(AppData::default(), Duration::from_micros(500));
    tokio::spawn(wh.run()); // starts the coalescing worker

    // --- Shared cancellation flag ---
    // Cooperative tasks check this flag in their loop and exit cleanly
    // when it becomes true. The stubborn task ignores it.
    let cancelled = Arc::new(AtomicBool::new(false));

    // --- Clone for the callback closure (moved into TaskPool) ---
    let domain_for_pool = domain.clone();

    // --- TaskPool<String, ()> ---
    //   K = String  → task identifier (e.g. "cooperative-0", "stubborn")
    //   T = ()      → no return value (fire-and-forget tasks)
    //
    // The callback fires on every status transition:
    //   TaskStatus::Pending          → task started
    //   TaskStatus::Resolved(())     → task finished
    //   TaskStatus::Aborted          → task was cancelled (by abort() or re-spawn)
    //   TaskStatus::Error(...)       → task panicked
    let pool: TaskPool<String, ()> = TaskPool::new(rt, move |key: &String, status| {
        //   ^^^ key is a reference to the K you passed to spawn()
        //                     ^^^^^^ status is TaskStatus<T> = TaskStatus<()>
        let key = key.clone();
        domain_for_pool.modify(move |d: &mut AppData| {
            d.task_statuses.insert(key, status);
        });
    });

    // --- Spawn 3 cooperative tasks ---
    // Each one loops and checks the `cancelled` flag.
    // When the flag flips to true, they exit cleanly and the callback
    // fires TaskStatus::Resolved(()).
    for i in 0..3 {
        let cancelled = cancelled.clone(); // each task gets its own Arc reference
        pool.spawn(format!("cooperative-{i}"), move || {
            async move {
                loop {
                    // --- Cooperative cancellation check ---
                    // This is the pattern for long-running tasks:
                    // periodically check a shared flag and exit early.
                    if cancelled.load(Ordering::Relaxed) {
                        println!("  cooperative-{i}: cancellation observed, exiting cleanly");
                        return; // ← returns (), which becomes TaskStatus::Resolved(())
                    }
                    tokio::time::sleep(Duration::from_millis(10)).await;
                }
            }
        });
    }

    // --- Spawn 1 stubborn task ---
    // This task ignores the cancellation flag and sleeps for 10 seconds.
    // When shutdown() times out, tokio force-aborts it via JoinHandle::abort().
    pool.spawn("stubborn".to_string(), || {
        async move {
            tokio::time::sleep(Duration::from_secs(10)).await;
            // This line should never be reached — shutdown will abort us first
            println!("  stubborn: completed (unexpected!)");
        }
    });

    // --- Wait for tasks to start ---
    tokio::time::sleep(Duration::from_millis(50)).await;
    println!("  3 cooperative + 1 stubborn task running");

    // --- Trigger cooperative cancellation ---
    println!("\n=== Setting cancellation flag ===");
    cancelled.store(true, Ordering::Relaxed);
    println!("  flag set to true — cooperative tasks will exit on next loop iteration");

    // --- Graceful shutdown ---
    // Waits for all tasks to finish. Tasks that don't finish within the
    // timeout are force-aborted via tokio's JoinHandle::abort().
    println!("\n=== Calling shutdown(200ms) ===");
    let shutdown_start = std::time::Instant::now();
    pool.shutdown(Duration::from_millis(200)).await;
    let elapsed = shutdown_start.elapsed();
    println!("  shutdown completed in {}ms", elapsed.as_millis());

    // --- Verify results via shared state ---
    println!("\n=== Checking results ===");
    let snap = domain.read();

    // All 3 cooperative tasks should have resolved (they exited cleanly)
    for i in 0..3 {
        let key = format!("cooperative-{i}");
        let status = snap.task_statuses.get(&key);
        match status {
            Some(TaskStatus::Resolved(())) => println!("  {key}: resolved ✓"),
            other => panic!("  {key}: expected Resolved(()), got {other:?}"),
        }
    }

    // The stubborn task should have been aborted (it was still sleeping
    // when the 200ms shutdown timeout expired)
    match snap.task_statuses.get("stubborn") {
        Some(TaskStatus::Pending) => println!("  stubborn: still pending (force-aborted before callback) ✓"),
        Some(TaskStatus::Aborted) => println!("  stubborn: aborted ✓"),
        other => panic!("  stubborn: expected Pending or Aborted, got {other:?}"),
    }

    // Shutdown should be fast — cooperative tasks exit within ~10ms
    assert!(
        elapsed.as_millis() < 500,
        "shutdown should be fast (cooperative tasks exit quickly)"
    );

    println!("\nAll examples passed!");
}