use dbuff::*;
use error_stack::Report;
use std::time::Duration;
#[derive(Debug, Clone, Default)]
struct AppData {
step1_status: TaskStatus<i32>,
step2_status: TaskStatus<()>,
error_log: Vec<String>,
}
#[derive(Debug, Clone, wherror::Error)]
#[error("step failed: {reason}")]
struct FailError {
reason: String,
}
struct Add(i32);
#[derive(Debug, Clone, wherror::Error)]
#[error(debug)]
struct AddError;
#[async_trait::async_trait]
impl Command<()> for Add {
type Output = i32;
type Error = Report<AddError>;
async fn execute(self, _: ()) -> Result<Self::Output, Self::Error> {
tokio::time::sleep(Duration::from_millis(50)).await;
Ok(self.0)
}
}
struct Fail;
#[async_trait::async_trait]
impl Command<()> for Fail {
type Output = ();
type Error = Report<FailError>;
async fn execute(self, _: ()) -> Result<Self::Output, Self::Error> {
tokio::time::sleep(Duration::from_millis(50)).await;
Err(Report::from(FailError {
reason: "something went wrong".into(),
}))
}
}
#[tokio::main]
async fn main() {
let rt = tokio::runtime::Handle::current();
let (domain, write_handle) =
SharedDomainData::with_coalesce(AppData::default(), Duration::from_micros(500));
tokio::spawn(write_handle.run());
println!("step1_status: {:?}", domain.read().step1_status);
println!("step2_status: {:?}", domain.read().step2_status);
println!("---");
let handle = domain
.bind((), rt.clone())
.on_error(|report, d| {
d.error_log.push(format!("{report}"));
d.error_log.push(format!("{report:?}"));
})
.exec(Add(10), |_, _: &i32| {})
.tracked(|d: &mut AppData, status: TaskStatus<i32>| d.step1_status = status)
.exec_discard(Fail)
.tracked(|d: &mut AppData, status: TaskStatus<()>| d.step2_status = status)
.go();
loop {
let state = domain.read();
match &state.step1_status {
TaskStatus::Idle => {
println!("step1_status: idle");
}
TaskStatus::Pending => {
println!("step1_status: pending...");
}
TaskStatus::Resolved(v) => {
println!("step1_status: resolved ({v})");
break;
}
TaskStatus::Error(e) => {
println!("step1_status: error ({e})");
panic!("step1 should succeed");
}
TaskStatus::Aborted => {
println!("step1_status: aborted");
panic!("step1 should succeed");
}
}
tokio::time::sleep(Duration::from_millis(5)).await;
}
loop {
let state = domain.read();
match &state.step2_status {
TaskStatus::Idle => {
println!("step2_status: idle");
}
TaskStatus::Pending => {
println!("step2_status: pending...");
}
TaskStatus::Resolved(()) => {
println!("step2_status: resolved");
panic!("step2 should fail");
}
TaskStatus::Error(e) => {
println!("step2_status: error ({e})");
break;
}
TaskStatus::Aborted => {
println!("step2_status: aborted");
panic!("step2 should fail with error, not abort");
}
}
tokio::time::sleep(Duration::from_millis(5)).await;
}
let flow = handle.await.unwrap();
assert_eq!(flow, ControlFlow::Break);
tokio::time::sleep(Duration::from_millis(10)).await;
let state = domain.read();
println!("---");
println!("step1_status = {:?}", state.step1_status);
println!("step2_status = {:?}", state.step2_status);
println!("error_log = {:?}", state.error_log);
assert!(state.step1_status.is_resolved());
assert_eq!(*state.step1_status.resolved().unwrap(), 10);
assert!(state.step2_status.is_error());
let err_msg = state.step2_status.error().unwrap().to_string();
assert!(
err_msg.contains("step failed"),
"expected actual error, got: {err_msg}"
);
assert!(
!err_msg.contains("chain error"),
"should not be generic 'chain error'"
);
assert!(!state.error_log.is_empty());
}