dbuff 0.1.0

Double-buffered state with async command chains, streaming, and keyed task pools for ratatui applications
Documentation
use dbuff::*;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::time::Duration;

#[derive(Debug, Clone, Default)]
struct AppData {
    user_id: i32,
    posts: Vec<String>,
    comments: Vec<String>,
    tags: Vec<String>,
    log: Vec<String>,
    error_log: Vec<String>,
}

#[derive(Debug, Clone, wherror::Error)]
#[error(debug)]
struct CmdError;

struct FetchUser(i32);

#[async_trait::async_trait]
impl Command<()> for FetchUser {
    type Output = i32;
    type Error = CmdError;

    async fn execute(self, _: ()) -> Result<Self::Output, Self::Error> {
        Ok(self.0)
    }
}

struct FetchPosts(i32);

#[async_trait::async_trait]
impl Command<()> for FetchPosts {
    type Output = Vec<String>;
    type Error = CmdError;

    async fn execute(self, _: ()) -> Result<Self::Output, Self::Error> {
        Ok((0..self.0).map(|i| format!("post_{i}")).collect())
    }
}

struct FetchComments(i32);

#[async_trait::async_trait]
impl Command<()> for FetchComments {
    type Output = Vec<String>;
    type Error = CmdError;

    async fn execute(self, _: ()) -> Result<Self::Output, Self::Error> {
        Ok((0..self.0).map(|i| format!("comment_{i}")).collect())
    }
}

struct FetchTags(i32);

#[async_trait::async_trait]
impl Command<()> for FetchTags {
    type Output = Vec<String>;
    type Error = CmdError;

    async fn execute(self, _: ()) -> Result<Self::Output, Self::Error> {
        Ok((0..self.0).map(|i| format!("tag_{i}")).collect())
    }
}

struct Log(String);

#[async_trait::async_trait]
impl Command<()> for Log {
    type Output = String;
    type Error = CmdError;

    async fn execute(self, _: ()) -> Result<Self::Output, Self::Error> {
        Ok(self.0)
    }
}

struct Fail;

#[async_trait::async_trait]
impl Command<()> for Fail {
    type Output = ();
    type Error = CmdError;

    async fn execute(self, _: ()) -> Result<Self::Output, Self::Error> {
        Err(CmdError)
    }
}

#[tokio::main]
#[allow(clippy::too_many_lines)]
async fn main() {
    let rt = tokio::runtime::Handle::current();

    let (domain, write_handle) =
        SharedDomainData::with_coalesce(AppData::default(), Duration::from_millis(1));
    tokio::spawn(write_handle.run());

    // -- Pattern 1: Basic parallel (2 commands) --
    println!("=== Pattern 1: Basic parallel (2 commands) ===");
    let handle = domain
        .bind((), rt.clone())
        .exec_parallel_2(
            (FetchPosts(3), |d, p: &Vec<String>| d.posts.clone_from(p)),
            (FetchComments(5), |d, c: &Vec<String>| d.comments.clone_from(c)),
        )
        .go();

    let flow = handle.await.unwrap();
    assert_eq!(flow, ControlFlow::Continue);
    tokio::time::sleep(Duration::from_millis(10)).await;
    let g = domain.read();
    assert_eq!(g.posts, vec!["post_0", "post_1", "post_2"]);
    assert_eq!(g.comments.len(), 5);
    println!("  posts = {:?}, comments.len() = {}", g.posts, g.comments.len());

    // -- Pattern 2: Parallel (3 commands) --
    println!("=== Pattern 2: Parallel (3 commands) ===");
    domain.modify(|d| {
        d.posts.clear();
        d.comments.clear();
        d.tags.clear();
    });
    let handle = domain
        .bind((), rt.clone())
        .exec_parallel_3(
            (FetchPosts(2), |d, p: &Vec<String>| d.posts.clone_from(p)),
            (FetchComments(3), |d, c: &Vec<String>| d.comments.clone_from(c)),
            (FetchTags(4), |d, t: &Vec<String>| d.tags.clone_from(t)),
        )
        .go();

    let flow = handle.await.unwrap();
    assert_eq!(flow, ControlFlow::Continue);
    tokio::time::sleep(Duration::from_millis(10)).await;
    let g = domain.read();
    assert_eq!(g.posts.len(), 2);
    assert_eq!(g.comments.len(), 3);
    assert_eq!(g.tags.len(), 4);
    println!("  posts={}, comments={}, tags={}", g.posts.len(), g.comments.len(), g.tags.len());

    // -- Pattern 3: Sequential -> Parallel -> Sequential --
    println!("=== Pattern 3: Sequential -> Parallel -> Sequential ===");
    domain.modify(|d| {
        d.user_id = 0;
        d.posts.clear();
        d.comments.clear();
        d.log.clear();
    });
    let handle = domain
        .bind((), rt.clone())
        .exec(FetchUser(42), |d, id: &i32| d.user_id = *id)
        .exec_parallel_2(
            (FetchPosts(2), |d, p: &Vec<String>| d.posts.clone_from(p)),
            (FetchComments(3), |d, c: &Vec<String>| d.comments.clone_from(c)),
        )
        .exec(Log("all done".into()), |d, m: &String| d.log.push(m.clone()))
        .go();

    let flow = handle.await.unwrap();
    assert_eq!(flow, ControlFlow::Continue);
    tokio::time::sleep(Duration::from_millis(10)).await;
    let g = domain.read();
    assert_eq!(g.user_id, 42);
    assert_eq!(g.posts.len(), 2);
    assert_eq!(g.comments.len(), 3);
    assert_eq!(g.log, vec!["all done"]);
    println!(
        "  user_id={}, posts={}, comments={}, log={:?}",
        g.user_id, g.posts.len(), g.comments.len(), g.log
    );

    // -- Pattern 4: Parallel with error (fail-fast) --
    println!("=== Pattern 4: Parallel with error (fail-fast) ===");
    domain.modify(|d| {
        d.posts.clear();
        d.comments.clear();
        d.error_log.clear();
        d.log.clear();
    });
    let error_called = Arc::new(AtomicBool::new(false));
    let flag = error_called.clone();
    let handle = domain
        .bind((), rt.clone())
        .on_error(move |err, d| {
            flag.store(true, Ordering::SeqCst);
            d.error_log.push(format!("{err}"));
        })
        .exec_parallel_2(
            (FetchPosts(2), |d, p: &Vec<String>| d.posts.clone_from(p)),
            (Fail, |_d, (): &()| {}),
        )
        .exec(Log("should not run".into()), |d, m: &String| d.log.push(m.clone()))
        .go();

    let flow = handle.await.unwrap();
    assert_eq!(flow, ControlFlow::Break);
    tokio::time::sleep(Duration::from_millis(10)).await;
    assert!(error_called.load(Ordering::SeqCst));
    let g = domain.read();
    assert_eq!(g.log.len(), 0);
    println!("  flow=Break, error_called=true, log={:?} (empty = correct)", g.log);

    // -- Pattern 5: exec_parallel_discard --
    println!("=== Pattern 5: exec_parallel_discard ===");
    domain.modify(|d| d.log.clear());
    let handle = domain
        .bind((), rt.clone())
        .exec_parallel_discard_2(
            Log("side effect 1".into()),
            Log("side effect 2".into()),
        )
        .exec(Log("after discard".into()), |d, m: &String| d.log.push(m.clone()))
        .go();

    let flow = handle.await.unwrap();
    assert_eq!(flow, ControlFlow::Continue);
    tokio::time::sleep(Duration::from_millis(10)).await;
    let g = domain.read();
    assert_eq!(g.log, vec!["after discard"]);
    println!("  flow=Continue, log={:?} (discard commands have no setter)", g.log);

    println!("\n=== Summary ===");
    println!("Pattern 1: Two commands run in parallel, both update domain");
    println!("Pattern 2: Three commands run in parallel");
    println!("Pattern 3: Sequential -> Parallel -> Sequential chain");
    println!("Pattern 4: Parallel with error short-circuits remaining chain");
    println!("Pattern 5: exec_parallel_discard for fire-and-forget parallel work");
}