use dbuff::*;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::time::Duration;
/// Domain state that the example pipelines read and mutate.
///
/// The derived `Default` produces `user_id == 0` and empty vectors.
#[derive(Debug, Clone, Default)]
struct AppData {
    /// Identifier stored by the user-fetching step.
    user_id: i32,
    /// Post labels stored by the post-fetching step.
    posts: Vec<String>,
    /// Comment labels stored by the comment-fetching step.
    comments: Vec<String>,
    /// Tag labels stored by the tag-fetching step.
    tags: Vec<String>,
    /// Messages appended by logging steps.
    log: Vec<String>,
    /// Formatted errors appended by the error callback.
    error_log: Vec<String>,
}
/// Unit error type used as the `Error` associated type of the example commands.
// NOTE(review): `#[error(debug)]` presumably makes the derived `Display` reuse the
// `Debug` output — confirm against the `wherror` documentation.
#[derive(Debug, Clone, wherror::Error)]
#[error(debug)]
struct CmdError;
/// Command carrying a user id; executing it yields that id unchanged.
struct FetchUser(i32);

#[async_trait::async_trait]
impl Command<()> for FetchUser {
    type Output = i32;
    type Error = CmdError;

    /// Returns the wrapped id. The `()` context is ignored and this never fails.
    async fn execute(self, _ctx: ()) -> Result<Self::Output, Self::Error> {
        let FetchUser(user_id) = self;
        Ok(user_id)
    }
}
/// Command carrying a count; executing it yields that many `post_{i}` labels.
struct FetchPosts(i32);

#[async_trait::async_trait]
impl Command<()> for FetchPosts {
    type Output = Vec<String>;
    type Error = CmdError;

    /// Builds `post_0`, `post_1`, … up to (but excluding) the wrapped count.
    ///
    /// A count of zero or less produces an empty vector. The `()` context is
    /// ignored and this never fails.
    async fn execute(self, _ctx: ()) -> Result<Self::Output, Self::Error> {
        let FetchPosts(count) = self;
        let mut posts = Vec::new();
        for index in 0..count {
            posts.push(format!("post_{index}"));
        }
        Ok(posts)
    }
}
/// Command carrying a count; executing it yields that many `comment_{i}` labels.
struct FetchComments(i32);

#[async_trait::async_trait]
impl Command<()> for FetchComments {
    type Output = Vec<String>;
    type Error = CmdError;

    /// Builds `comment_0`, `comment_1`, … up to (but excluding) the wrapped count.
    ///
    /// A count of zero or less produces an empty vector. The `()` context is
    /// ignored and this never fails.
    async fn execute(self, _ctx: ()) -> Result<Self::Output, Self::Error> {
        let FetchComments(count) = self;
        let comments: Vec<String> = (0..count)
            .map(|index| format!("comment_{index}"))
            .collect();
        Ok(comments)
    }
}
/// Command carrying a count; executing it yields that many `tag_{i}` labels.
struct FetchTags(i32);

#[async_trait::async_trait]
impl Command<()> for FetchTags {
    type Output = Vec<String>;
    type Error = CmdError;

    /// Builds `tag_0`, `tag_1`, … up to (but excluding) the wrapped count.
    ///
    /// A count of zero or less produces an empty vector. The `()` context is
    /// ignored and this never fails.
    async fn execute(self, _ctx: ()) -> Result<Self::Output, Self::Error> {
        let FetchTags(count) = self;
        let mut tags: Vec<String> = Vec::new();
        tags.extend((0..count).map(|index| format!("tag_{index}")));
        Ok(tags)
    }
}
/// Command carrying a message; executing it yields that message unchanged.
struct Log(String);

#[async_trait::async_trait]
impl Command<()> for Log {
    type Output = String;
    type Error = CmdError;

    /// Returns the wrapped message. The `()` context is ignored and this never fails.
    async fn execute(self, _ctx: ()) -> Result<Self::Output, Self::Error> {
        let Log(message) = self;
        Ok(message)
    }
}
/// Command that always fails; used to demonstrate the error path.
struct Fail;

#[async_trait::async_trait]
impl Command<()> for Fail {
    type Output = ();
    type Error = CmdError;

    /// Always returns `Err(CmdError)`. The `()` context is ignored.
    async fn execute(self, _ctx: ()) -> Result<Self::Output, Self::Error> {
        let failure = CmdError;
        Err(failure)
    }
}
/// Walks through five parallel-execution patterns against one shared `AppData`
/// domain, asserting the resulting state after each pattern and printing it.
///
/// # Panics
///
/// Panics if awaiting any pipeline handle yields an error or if any assertion
/// about the resulting domain state does not hold.
#[tokio::main]
// The five patterns are one linear walkthrough, so they stay in a single function.
#[allow(clippy::too_many_lines)]
async fn main() {
    // Pause inserted after every pipeline before the domain is read back.
    // NOTE(review): presumably gives the spawned writer task time to apply
    // pending updates — confirm against the `dbuff` documentation.
    const SETTLE: Duration = Duration::from_millis(10);

    let runtime = tokio::runtime::Handle::current();
    let (domain, writer) =
        SharedDomainData::with_coalesce(AppData::default(), Duration::from_millis(1));
    tokio::spawn(writer.run());

    // Pattern 1: two commands, each with its own setter.
    println!("=== Pattern 1: Basic parallel (2 commands) ===");
    let pipeline = domain
        .bind((), runtime.clone())
        .exec_parallel_2(
            (FetchPosts(3), |data, posts: &Vec<String>| {
                data.posts.clone_from(posts)
            }),
            (FetchComments(5), |data, comments: &Vec<String>| {
                data.comments.clone_from(comments)
            }),
        )
        .go();
    let outcome = pipeline.await.unwrap();
    assert_eq!(outcome, ControlFlow::Continue);
    tokio::time::sleep(SETTLE).await;
    let snapshot = domain.read();
    assert_eq!(snapshot.posts, vec!["post_0", "post_1", "post_2"]);
    assert_eq!(snapshot.comments.len(), 5);
    println!(
        " posts = {:?}, comments.len() = {}",
        snapshot.posts,
        snapshot.comments.len()
    );

    // Pattern 2: three commands; the affected fields are cleared first.
    println!("=== Pattern 2: Parallel (3 commands) ===");
    domain.modify(|data| {
        data.posts.clear();
        data.comments.clear();
        data.tags.clear();
    });
    let pipeline = domain
        .bind((), runtime.clone())
        .exec_parallel_3(
            (FetchPosts(2), |data, posts: &Vec<String>| {
                data.posts.clone_from(posts)
            }),
            (FetchComments(3), |data, comments: &Vec<String>| {
                data.comments.clone_from(comments)
            }),
            (FetchTags(4), |data, tags: &Vec<String>| {
                data.tags.clone_from(tags)
            }),
        )
        .go();
    let outcome = pipeline.await.unwrap();
    assert_eq!(outcome, ControlFlow::Continue);
    tokio::time::sleep(SETTLE).await;
    let snapshot = domain.read();
    assert_eq!(snapshot.posts.len(), 2);
    assert_eq!(snapshot.comments.len(), 3);
    assert_eq!(snapshot.tags.len(), 4);
    println!(
        " posts={}, comments={}, tags={}",
        snapshot.posts.len(),
        snapshot.comments.len(),
        snapshot.tags.len()
    );

    // Pattern 3: a single step, then a parallel pair, then a single step.
    println!("=== Pattern 3: Sequential -> Parallel -> Sequential ===");
    domain.modify(|data| {
        data.user_id = 0;
        data.posts.clear();
        data.comments.clear();
        data.log.clear();
    });
    let pipeline = domain
        .bind((), runtime.clone())
        .exec(FetchUser(42), |data, id: &i32| data.user_id = *id)
        .exec_parallel_2(
            (FetchPosts(2), |data, posts: &Vec<String>| {
                data.posts.clone_from(posts)
            }),
            (FetchComments(3), |data, comments: &Vec<String>| {
                data.comments.clone_from(comments)
            }),
        )
        .exec(Log(String::from("all done")), |data, message: &String| {
            data.log.push(message.clone())
        })
        .go();
    let outcome = pipeline.await.unwrap();
    assert_eq!(outcome, ControlFlow::Continue);
    tokio::time::sleep(SETTLE).await;
    let snapshot = domain.read();
    assert_eq!(snapshot.user_id, 42);
    assert_eq!(snapshot.posts.len(), 2);
    assert_eq!(snapshot.comments.len(), 3);
    assert_eq!(snapshot.log, vec!["all done"]);
    println!(
        " user_id={}, posts={}, comments={}, log={:?}",
        snapshot.user_id,
        snapshot.posts.len(),
        snapshot.comments.len(),
        snapshot.log
    );

    // Pattern 4: one command of the pair fails; the trailing step must not run.
    println!("=== Pattern 4: Parallel with error (fail-fast) ===");
    domain.modify(|data| {
        data.posts.clear();
        data.comments.clear();
        data.error_log.clear();
        data.log.clear();
    });
    // Shared flag so the assertion below can observe that the callback fired.
    let error_called = Arc::new(AtomicBool::new(false));
    let flag = Arc::clone(&error_called);
    let pipeline = domain
        .bind((), runtime.clone())
        .on_error(move |err, data| {
            flag.store(true, Ordering::SeqCst);
            data.error_log.push(format!("{err}"));
        })
        .exec_parallel_2(
            (FetchPosts(2), |data, posts: &Vec<String>| {
                data.posts.clone_from(posts)
            }),
            (Fail, |_data, (): &()| {}),
        )
        .exec(
            Log(String::from("should not run")),
            |data, message: &String| data.log.push(message.clone()),
        )
        .go();
    let outcome = pipeline.await.unwrap();
    assert_eq!(outcome, ControlFlow::Break);
    tokio::time::sleep(SETTLE).await;
    assert!(error_called.load(Ordering::SeqCst));
    let snapshot = domain.read();
    assert_eq!(snapshot.log.len(), 0);
    println!(
        " flow=Break, error_called=true, log={:?} (empty = correct)",
        snapshot.log
    );

    // Pattern 5: a parallel pair without setters, followed by a step with one.
    println!("=== Pattern 5: exec_parallel_discard ===");
    domain.modify(|data| data.log.clear());
    let pipeline = domain
        .bind((), runtime.clone())
        .exec_parallel_discard_2(
            Log(String::from("side effect 1")),
            Log(String::from("side effect 2")),
        )
        .exec(
            Log(String::from("after discard")),
            |data, message: &String| data.log.push(message.clone()),
        )
        .go();
    let outcome = pipeline.await.unwrap();
    assert_eq!(outcome, ControlFlow::Continue);
    tokio::time::sleep(SETTLE).await;
    let snapshot = domain.read();
    assert_eq!(snapshot.log, vec!["after discard"]);
    println!(
        " flow=Continue, log={:?} (discard commands have no setter)",
        snapshot.log
    );

    println!("\n=== Summary ===");
    println!("Pattern 1: Two commands run in parallel, both update domain");
    println!("Pattern 2: Three commands run in parallel");
    println!("Pattern 3: Sequential -> Parallel -> Sequential chain");
    println!("Pattern 4: Parallel with error short-circuits remaining chain");
    println!("Pattern 5: exec_parallel_discard for fire-and-forget parallel work");
}