// dbuff 0.1.0
//
// Double-buffered state with async command chains, streaming, and keyed task pools for ratatui applications.
// Documentation
use dbuff::*;
use std::time::Duration;

// # Chain Composition: exec vs then
//
// All commands in a chain execute **sequentially** — each command completes
// fully before the next one begins. The entire chain is composed into a single
// async future and spawned as one task by `.go()`.
//
// ## exec vs then
//
// - `.exec(cmd, setter)` — runs `cmd` **independently**. The previous command's
//   output is dropped. Use this when the new command doesn't need data from
//   the previous step.
//
// - `.then(factory, setter)` — runs a command **whose input depends on the
//   previous output**. The `factory` closure receives `&T` (the previous
//   command's result) and returns the next command. Use this to build
//   multi-step pipelines where each step feeds the next.
//
// Both `.exec()` and `.then()` produce a `DomainChain` that can be further
// extended with more `.exec()`, `.then()`, or `.exec_discard()` calls.
// Finish the chain with `.go()` to spawn execution.

/// Shared application state that the example's command chains write into.
///
/// `Default` yields zeroed counters and an empty log, which is the state the
/// example starts from.
#[derive(Clone, Debug, Default)]
struct AppData {
    /// Id stored by the setter attached to `FetchUser`.
    user_id: i32,
    /// Count stored by the setter attached to `FetchPosts`.
    post_count: usize,
    /// Count stored by the setter attached to `FetchComments`.
    comment_count: usize,
    /// Messages appended by the setter attached to `Log`.
    log: Vec<String>,
}

/// Error type shared by every command in this example.
///
/// It carries no data; none of the commands below ever construct it, so it only
/// exists to fill the `Command::Error` associated type.
// NOTE(review): `#[error(debug)]` presumably tells the `wherror` derive to build
// the `Display` output from the `Debug` representation — confirm against the
// wherror documentation.
#[derive(Debug, Clone, wherror::Error)]
#[error(debug)]
struct CmdError;

/// Mock "fetch user" command: wraps a user id and hands it straight back.
struct FetchUser(i32);

#[async_trait::async_trait]
impl Command<()> for FetchUser {
    type Output = i32;
    type Error = CmdError;

    /// Resolves to the wrapped user id; this implementation never fails.
    async fn execute(self, _: ()) -> Result<Self::Output, Self::Error> {
        let FetchUser(user_id) = self;
        Ok(user_id)
    }
}

struct FetchPosts(i32);

#[async_trait::async_trait]
impl Command<()> for FetchPosts {
    type Output = usize;
    type Error = CmdError;

    async fn execute(self, _: ()) -> Result<Self::Output, Self::Error> {
        #[allow(clippy::cast_sign_loss)]
        Ok(self.0 as usize * 10)
    }
}

/// Mock "fetch comments" command: wraps a post count and derives a comment count from it.
struct FetchComments(usize);

#[async_trait::async_trait]
impl Command<()> for FetchComments {
    type Output = usize;
    type Error = CmdError;

    /// Resolves to five times the wrapped post count.
    async fn execute(self, _: ()) -> Result<Self::Output, Self::Error> {
        let FetchComments(post_count) = self;
        Ok(post_count * 5)
    }
}

/// Mock "log" command: wraps a message and hands it straight back.
struct Log(String);

#[async_trait::async_trait]
impl Command<()> for Log {
    type Output = String;
    type Error = CmdError;

    /// Resolves to the wrapped message, moved out without copying; never fails.
    async fn execute(self, _: ()) -> Result<Self::Output, Self::Error> {
        let Log(message) = self;
        Ok(message)
    }
}

/// Walks through the four `exec`/`then` chain shapes and asserts the state each one leaves.
///
/// Every pattern follows the same steps: build a chain from `domain.bind(..)`, finish it
/// with `.go()`, await the returned handle, pause briefly, then read the domain back and
/// check the fields the chain's setters wrote. The program panics if awaiting a handle
/// yields an error or if any assertion fails.
#[tokio::main]
async fn main() {
    let runtime = tokio::runtime::Handle::current();
    // Pause taken between awaiting a chain and reading the state back.
    // NOTE(review): presumably this gives the spawned writer time to publish the
    // update (it was created with a 1 ms coalesce duration) — confirm against dbuff docs.
    let settle = Duration::from_millis(10);

    let (domain, write_handle) =
        SharedDomainData::with_coalesce(AppData::default(), Duration::from_millis(1));
    tokio::spawn(write_handle.run());

    // ── Pattern 1: exec → exec ──
    // Two unrelated commands, one after the other. `Log` is built up front, so it
    // never sees the id that `FetchUser` produced.
    println!("=== Pattern 1: exec → exec (no data flow) ===");
    let chain_task = domain
        .bind((), runtime.clone())
        .exec(FetchUser(42), |state, fetched: &i32| state.user_id = *fetched)
        .exec(
            Log(String::from("user fetched")),
            |state, line: &String| {
                state.log.push(line.clone());
            },
        )
        .go();
    chain_task.await.unwrap();
    tokio::time::sleep(settle).await;
    let view = domain.read();
    assert_eq!(view.user_id, 42);
    assert_eq!(view.log, ["user fetched"]);
    println!("  user_id = {}, log = {:?}", view.user_id, view.log);

    // ── Pattern 2: exec → then ──
    // The `then` factory borrows the id produced by `FetchUser` and uses it to
    // construct `FetchPosts`, so the first output feeds the second command.
    println!("=== Pattern 2: exec → then (data flows once) ===");
    domain.modify(|state| {
        state.post_count = 0;
        state.log.clear();
    });
    let chain_task = domain
        .bind((), runtime.clone())
        .exec(FetchUser(42), |state, fetched: &i32| state.user_id = *fetched)
        .then(
            |uid: &i32| FetchPosts(*uid),
            |state, posts: &usize| state.post_count = *posts,
        )
        .go();
    chain_task.await.unwrap();
    tokio::time::sleep(settle).await;
    let view = domain.read();
    assert_eq!(view.user_id, 42);
    assert_eq!(view.post_count, 420);
    println!(
        "  user_id = {}, post_count = {}",
        view.user_id, view.post_count
    );

    // ── Pattern 3: then → then ──
    // A three-stage pipeline: the user id builds `FetchPosts`, whose post count
    // in turn builds `FetchComments`.
    println!("=== Pattern 3: then → then (multi-step pipeline) ===");
    domain.modify(|state| {
        state.comment_count = 0;
        state.post_count = 0;
    });
    let chain_task = domain
        .bind((), runtime.clone())
        .exec(FetchUser(42), |state, fetched: &i32| state.user_id = *fetched)
        .then(
            |uid: &i32| FetchPosts(*uid),
            |state, posts: &usize| state.post_count = *posts,
        )
        .then(
            |posts: &usize| FetchComments(*posts),
            |state, comments: &usize| state.comment_count = *comments,
        )
        .go();
    chain_task.await.unwrap();
    tokio::time::sleep(settle).await;
    let view = domain.read();
    assert_eq!(view.user_id, 42);
    assert_eq!(view.post_count, 420);
    assert_eq!(view.comment_count, 2100);
    println!(
        "  user_id = {}, post_count = {}, comment_count = {}",
        view.user_id, view.post_count, view.comment_count
    );

    // ── Pattern 4: then → exec ──
    // Data flows from `FetchUser` into `FetchPosts`, but the trailing `exec(Log)`
    // is built up front and therefore does not receive the post count.
    println!("=== Pattern 4: then → exec (data flow breaks) ===");
    domain.modify(|state| {
        state.post_count = 0;
        state.log.clear();
    });
    let chain_task = domain
        .bind((), runtime.clone())
        .exec(FetchUser(42), |state, fetched: &i32| state.user_id = *fetched)
        .then(
            |uid: &i32| FetchPosts(*uid),
            |state, posts: &usize| state.post_count = *posts,
        )
        .exec(
            Log(String::from("posts loaded")),
            |state, line: &String| {
                state.log.push(line.clone());
            },
        )
        .go();
    chain_task.await.unwrap();
    tokio::time::sleep(settle).await;
    let view = domain.read();
    assert_eq!(view.user_id, 42);
    assert_eq!(view.post_count, 420);
    assert_eq!(view.log, ["posts loaded"]);
    println!(
        "  user_id = {}, post_count = {}, log = {:?}",
        view.user_id, view.post_count, view.log
    );

    // Closing recap, printed one entry per line; the empty entry is the blank separator line.
    let summary = [
        "\n=== Summary ===",
        "All chains execute sequentially: each command completes before the next.",
        "",
        "Pattern 1 (exec→exec):   both run sequentially, no data shared",
        "Pattern 2 (exec→then):   sequential, FetchUser output feeds FetchPosts",
        "Pattern 3 (then→then):   sequential pipeline: user_id → post_count → comment_count",
        "Pattern 4 (then→exec):   sequential, data flow breaks at exec(Log)",
    ];
    for line in summary {
        println!("{}", line);
    }
}