dbuff 0.1.0

Double-buffered state with async command chains, streaming, and keyed task pools for ratatui applications
Documentation
use dbuff::*;
use kanal::unbounded;
use std::time::Duration;

// --- User: a domain type returned by tasks ---
// Every task spawned in main() produces one of these, which shows that the
// pool's result type T can be a structured value, not just a primitive
// (any Send + 'static type is accepted, per the crate's example notes).
// In a real app this might come from your database or an API.
#[derive(Debug)]
struct User {
    // Display name; printed by the event loop when the task resolves.
    name: String,
    // Age value; printed alongside the name.
    age: u32,
    // Role label; the example uses "admin", "editor", "viewer" and "never".
    role: String,
}

// --- TaskEvent: a typed message sent through the channel ---
// The on_status_change callback wraps each (key, status) pair into
// this struct before sending it, so consumers receive a named type
// with named fields instead of working with raw tuples.
struct TaskEvent {
    // Clone of the key the task was spawned under (e.g. "fetch-user").
    key: String,
    // Status reported for that key. The event loop in main() matches on
    // Idle / Pending / Resolved(User) / Aborted / Error.
    status: TaskStatus<User>,
}

#[tokio::main]
async fn main() {
    // --- Tokio runtime handle: lets the pool spawn tasks on this runtime ---
    let rt = tokio::runtime::Handle::current();

    // --- Channel: callback pushes events here, main loop reads from here ---
    // kanal::unbounded() returns a sync sender and sync receiver.
    // The sync sender is perfect for use inside the TaskPool callback
    // (which is a sync Fn closure). The sync receiver can be converted
    // to an async receiver for use in an async event loop.
    let (tx, rx) = unbounded();

    // --- TaskPool<String, User> ---
    //   K = String  → task identifier (e.g. "fetch-user", "fetch-config")
    //   T = User    → task result (a structured domain type)
    //
    // Instead of writing into SharedDomainData, the callback sends a
    // typed event through the channel. This decouples the producer
    // (TaskPool) from the consumer (your event loop).
    //
    // Why use channels instead of SharedDomainData?
    //   - You want to process events in an async loop (e.g. forward to a websocket)
    //   - TaskPool lives in a different module than the consumer
    //   - You need to fan-out events to multiple listeners
    //   - You prefer push-based (channel) over pull-based (poll domain.read())
    let pool: TaskPool<String, User> = TaskPool::new(rt, move |key: &String, status| {
        //   ^^^ key is a reference to the K you passed to spawn()
        //                     ^^^^^^ status is TaskStatus<T> = TaskStatus<User>
        //
        // When the tasks complete, this closure gets called and you can send the results somewhere
        // else via this channel.
        let _ = tx.send(TaskEvent {
            key: key.clone(),
            status,
        });
        // Note: we use kanal's sync sender (tx) inside the callback.
        // The sync send is non-blocking for unbounded channels.
        // We ignore send errors — if the receiver is dropped, events are lost.
    });

    // --- Spawn tasks under different keys ---
    // Each task simulates fetching a user and returning a User struct.
    println!("=== Spawning tasks ===");

    pool.spawn("fetch-user".to_string(), || async {
        tokio::time::sleep(Duration::from_millis(50)).await;
        User {
            name: "Alice".into(),
            age: 30,
            role: "admin".into(),
        }
    });

    pool.spawn("fetch-user-2".to_string(), || async {
        tokio::time::sleep(Duration::from_millis(30)).await;
        User {
            name: "Bob".into(),
            age: 25,
            role: "editor".into(),
        }
    });

    pool.spawn("fetch-user-3".to_string(), || async {
        tokio::time::sleep(Duration::from_millis(70)).await;
        User {
            name: "Carol".into(),
            age: 35,
            role: "viewer".into(),
        }
    });

    // --- Spawn a slow task that we'll abort ---
    pool.spawn("slow-query".to_string(), || async {
        tokio::time::sleep(Duration::from_secs(10)).await;
        User {
            name: "Nobody".into(),
            age: 0,
            role: "never".into(),
        }
    });

    // --- Abort the slow task immediately ---
    // We abort BEFORE processing events so the event loop sees:
    //   4 × Pending  (all four spawns)
    //   3 × Resolved (the three fast fetches)
    //   1 × Aborted  (slow-query, cancelled by abort())
    println!("\n=== Aborting slow-query ===");
    let was_running = pool.abort(&"slow-query".to_string());
    println!("  abort returned: {was_running} (true = task was found and cancelled)");

    // --- Event loop: receive and process events from the channel ---
    // Convert the sync receiver to an async receiver for use with .await.
    // Each task produces exactly 2 events: Pending (on spawn) then
    // Resolved/Aborted/Error (on completion).
    println!("\n=== Event loop ===");
    let mut event_count = 0;
    let mut resolved_count = 0;
    let mut aborted_count = 0;

    // We expect 8 events total:
    //   4 × Pending  (one per spawn)
    //   3 × Resolved (fetch-user, fetch-user-2, fetch-user-3)
    //   1 × Aborted  (slow-query — we aborted it above)
    let async_rx = rx.to_async();
    while event_count < 8 {
        match async_rx.recv().await {
            Ok(event) => {
                event_count += 1;
                match event.status {
                    TaskStatus::Idle => {
                        println!("  [{}] {}: idle", event_count, event.key);
                    }
                    TaskStatus::Pending => {
                        println!("  [{}] {}: started...", event_count, event.key);
                    }
                    TaskStatus::Resolved(ref user) => {
                        resolved_count += 1;
                        println!(
                            "  [{}] {}: done → {} (age {}, role={})",
                            event_count, event.key, user.name, user.age, user.role
                        );
                    }
                    TaskStatus::Aborted => {
                        aborted_count += 1;
                        println!("  [{}] {}: cancelled", event_count, event.key);
                    }
                    TaskStatus::Error(ref e) => {
                        println!("  [{}] {}: failed → {}", event_count, event.key, e);
                    }
                }
            }
            Err(_) => {
                println!("  channel closed");
                break;
            }
        }
    }

    // --- Verify results ---
    println!("\n=== Summary ===");
    println!("  total events: {event_count}");
    println!("  resolved tasks: {resolved_count}");
    println!("  aborted tasks: {aborted_count}");
    assert_eq!(resolved_count, 3, "exactly 3 tasks should have resolved");
    assert_eq!(aborted_count, 1, "exactly 1 task should have been aborted");
    assert_eq!(
        event_count, 8,
        "should have received all 8 events (4 pending + 3 resolved + 1 aborted)"
    );

    // --- Graceful shutdown ---
    println!("\n=== Shutting down ===");
    pool.shutdown(Duration::from_secs(1)).await;
    println!("  shutdown complete");

    println!("\nAll examples passed!");
}