use dbuff::*;
use std::time::Duration;
type UserId = i32;
#[derive(Debug, Clone, PartialEq)]
struct User {
id: UserId,
name: String,
email: String,
}
#[derive(Debug, Clone, Default)]
struct AppData {
user_status: TaskStatus<User>,
post_count: usize,
posts_status: TaskStatus<usize>,
}
#[derive(Debug, Clone, wherror::Error)]
#[error(debug)]
struct AppError;
struct FetchUser(UserId);
#[async_trait::async_trait]
impl Command<()> for FetchUser {
type Output = User;
type Error = AppError;
async fn execute(self, _: ()) -> Result<Self::Output, Self::Error> {
tokio::time::sleep(Duration::from_millis(50)).await;
Ok(User {
id: self.0,
name: format!("user_{}", self.0),
email: format!("user_{}@example.com", self.0),
})
}
}
struct FetchPosts(UserId);
#[async_trait::async_trait]
impl Command<()> for FetchPosts {
type Output = usize;
type Error = AppError;
async fn execute(self, _: ()) -> Result<Self::Output, Self::Error> {
tokio::time::sleep(Duration::from_millis(50)).await;
Ok(self.0.unsigned_abs() as usize * 10)
}
}
#[tokio::main]
async fn main() {
let rt = tokio::runtime::Handle::current();
let (domain, write_handle) =
SharedDomainData::with_coalesce(AppData::default(), Duration::from_micros(500));
tokio::spawn(write_handle.run());
println!("user_status: {:?}", domain.read().user_status);
println!("posts_status: {:?}", domain.read().posts_status);
println!("---");
let handle = domain
.bind((), rt.clone())
.exec(FetchUser(42), |_, _| {})
.tracked(|d, status| d.user_status = status)
.then(|user| FetchPosts(user.id), |d, count| d.post_count = *count)
.tracked(|d, status| d.posts_status = status)
.go();
loop {
let state = domain.read();
match &state.user_status {
TaskStatus::Idle => {
println!("user_status: idle");
}
TaskStatus::Pending => {
println!("user_status: pending...");
}
TaskStatus::Resolved(user) => {
println!("user_status: resolved ({} — {})", user.id, user.email);
break;
}
TaskStatus::Error(e) => {
println!("user_status: error ({e})");
panic!("expected resolved, got error");
}
TaskStatus::Aborted => {
println!("user_status: aborted");
panic!("expected resolved, got aborted");
}
}
tokio::time::sleep(Duration::from_millis(20)).await;
}
loop {
let state = domain.read();
match &state.posts_status {
TaskStatus::Idle => {
println!("posts_status: idle");
}
TaskStatus::Pending => {
println!("posts_status: pending...");
}
TaskStatus::Resolved(count) => {
println!("posts_status: resolved ({count})");
break;
}
TaskStatus::Error(e) => {
println!("posts_status: error ({e})");
panic!("expected resolved, got error");
}
TaskStatus::Aborted => {
println!("posts_status: aborted");
panic!("expected resolved, got aborted");
}
}
tokio::time::sleep(Duration::from_millis(20)).await;
}
let flow = handle.await.unwrap();
assert_eq!(flow, ControlFlow::Continue);
tokio::time::sleep(Duration::from_millis(10)).await;
let state = domain.read();
println!("---");
println!("user_status = {:?}", state.user_status);
println!("post_count = {}", state.post_count);
println!("posts_status = {:?}", state.posts_status);
let user = state.user_status.resolved().unwrap();
assert_eq!(user.id, 42);
assert_eq!(user.name, "user_42");
assert_eq!(user.email, "user_42@example.com");
assert_eq!(state.post_count, 420);
assert!(state.user_status.is_resolved());
assert!(state.posts_status.is_resolved());
assert_eq!(*state.posts_status.resolved().unwrap(), 420);
}