dbuff 0.1.0

Double-buffered state with async command chains, streaming, and keyed task pools for ratatui applications
Documentation
use dbuff::*;
use std::time::Duration;

/// Application state stored in the shared domain for this example.
///
/// `main` seeds the domain with `AppData::default()` and the tracked task
/// writes its progress into the single field below.
#[derive(Debug, Clone, Default)]
struct AppData {
    /// Status of the tracked `SlowFetch` task. `main` matches it against
    /// `Idle`, `Pending`, `Resolved(String)`, `Error(_)` and `Aborted`;
    /// presumably the default is `Idle` (see "Show initial Idle state" in
    /// `main`) — confirm against the `TaskStatus` definition.
    task_status: TaskStatus<String>,
}

// Error type associated with the `SlowFetch` command. It is a unit struct:
// `SlowFetch::execute` only ever returns `Ok`, so no value of this type is
// constructed in this example.
// NOTE(review): `#[error(debug)]` presumably makes the derived `Display`
// reuse the `Debug` output — confirm against the `wherror` documentation.
#[derive(Debug, Clone, wherror::Error)]
#[error(debug)]
struct AppError;

/// Command that waits for a fixed delay and then yields the wrapped string
/// as its output (see its `Command<()>` implementation).
struct SlowFetch(String);

#[async_trait::async_trait]
impl Command<()> for SlowFetch {
    type Output = String;
    type Error = AppError;

    async fn execute(self, _: ()) -> Result<Self::Output, Self::Error> {
        // Long sleep so we can abort mid-execution
        tokio::time::sleep(Duration::from_millis(500)).await;
        Ok(self.0)
    }
}

/// Spawns a tracked `SlowFetch`, aborts it as soon as its status reads
/// `Pending`, and then checks that the status ends up as `Aborted`.
///
/// # Panics
///
/// Panics if the task resolves, errors, or shows `Aborted` before this
/// function triggers the abort, if either polling phase exceeds `WAIT_LIMIT`,
/// or if the final status is not `Aborted`.
#[tokio::main]
async fn main() {
    // How often the status is re-read while waiting for a transition.
    const POLL_INTERVAL: Duration = Duration::from_millis(5);
    // Upper bound for each polling phase, so that a missed transition fails
    // with a clear message instead of hanging the example forever.
    const WAIT_LIMIT: Duration = Duration::from_secs(2);

    let rt = tokio::runtime::Handle::current();
    let (domain, write_handle) =
        SharedDomainData::with_coalesce(AppData::default(), Duration::from_micros(500));
    tokio::spawn(write_handle.run());

    // Show initial Idle state
    println!("task_status: {:?}", domain.read().task_status);
    println!("---");

    // Spawn a long-running task with tracking.
    // No separate result field — TaskStatus<String> holds the value.
    let handle = domain
        .bind((), rt.clone())
        .exec(SlowFetch("hello from slow task".into()), |_, _: &String| {})
        .tracked(|d: &mut AppData, status: TaskStatus<String>| d.task_status = status)
        .go();

    // Poll until we see Pending, then abort
    let pending_deadline = tokio::time::Instant::now() + WAIT_LIMIT;
    loop {
        // Clone the status so that the value returned by `read()` is dropped
        // at the end of this statement; nothing obtained from the domain is
        // then alive during the `modify` call or the `.await` below.
        let status = domain.read().task_status.clone();
        match &status {
            TaskStatus::Idle => {
                println!("task_status: idle");
            }
            TaskStatus::Pending => {
                println!("task_status: pending — aborting now");
                handle.abort();
                // Manually set status to Aborted since .go() returns JoinHandle
                domain.modify(|d| d.task_status = TaskStatus::Aborted);
                break;
            }
            TaskStatus::Resolved(v) => {
                println!("task_status: resolved ({v:?}) — too fast, couldn't abort");
                panic!("task resolved before we could abort");
            }
            TaskStatus::Error(e) => {
                println!("task_status: error ({e})");
                panic!("unexpected error");
            }
            TaskStatus::Aborted => {
                println!("task_status: aborted (unexpectedly early)");
                panic!("aborted before we triggered it");
            }
        }
        assert!(
            tokio::time::Instant::now() < pending_deadline,
            "task_status never became Pending within {WAIT_LIMIT:?}"
        );
        tokio::time::sleep(POLL_INTERVAL).await;
    }

    // Poll to confirm Aborted sticks
    let confirm_deadline = tokio::time::Instant::now() + WAIT_LIMIT;
    loop {
        let status = domain.read().task_status.clone();
        match &status {
            TaskStatus::Aborted => {
                println!("task_status: aborted ✓");
                break;
            }
            other => {
                // The task might resolve between abort and our modify call
                println!("task_status: {other:?} (waiting for abort)");
            }
        }
        assert!(
            tokio::time::Instant::now() < confirm_deadline,
            "task_status never became Aborted within {WAIT_LIMIT:?}"
        );
        tokio::time::sleep(POLL_INTERVAL).await;
    }

    // Give any late status write from the aborted task a chance to land
    // before the final check.
    tokio::time::sleep(Duration::from_millis(10)).await;

    // Verify final state
    let state = domain.read();
    println!("---");
    println!("task_status = {:?}", state.task_status);

    assert!(state.task_status.is_aborted());
}