use std::time::Duration;
use memable::{Context, Engine, EngineError, MetadataStatus, WorkflowState};
/// Demo workflow: fetch a batch, go through a cooldown timer, then process the batch.
///
/// The body consists of three named checkpoints, executed in this order:
/// `fetch-batch:v1` (step), `cooldown:v1` (timer) and `process-batch:v1` (step).
///
/// # Errors
///
/// Any `EngineError` produced by either step or by the timer call is propagated
/// to the caller through `?`.
async fn pipeline_with_cooldown(ctx: Context) -> Result<(), EngineError> {
    // Length of the pause requested between fetching and processing.
    let cooldown = Duration::from_secs(2);

    // The "fetch" is simulated: the closure only logs and yields a fixed size of 50.
    let batch_size: u32 = ctx
        .step("fetch-batch:v1")
        .run(async || {
            println!(" [step] fetching batch from upstream API");
            Ok(50)
        })
        .await?;

    println!(" [timer] waiting 2 seconds before processing...");
    // NOTE(review): `timer` is not awaited here; it presumably signals suspension
    // through its `Result` until the timer has fired — confirm against the
    // `memable` documentation.
    ctx.timer("cooldown:v1", cooldown)?;

    // The step's result is not used afterwards; the binding mainly pins the
    // step's output type to `u32`.
    let _records_done: u32 = ctx
        .step("process-batch:v1")
        .run(async move || {
            println!(" [step] processing {batch_size} records");
            Ok(batch_size)
        })
        .await?;

    println!(" [workflow] done — processed {batch_size} records after cooldown");
    Ok(())
}
/// Demo driver: runs the `"pipeline"` workflow, waits for it to complete, then resumes it.
///
/// Sequence:
/// 1. Build an engine via `Engine::builder().in_memory()`, register the workflow and start it.
/// 2. Invoke the workflow and assert that the first run ends in `WorkflowState::Suspended`.
/// 3. Poll the instance metadata until its status is terminal, asserting it is `Completed`.
/// 4. Resume the same instance and assert that it reports `WorkflowState::Completed`.
///
/// # Errors
///
/// Returns any error raised by the engine calls, or a timeout error when the instance
/// has not reached a terminal status within `COMPLETION_TIMEOUT`.
///
/// # Panics
///
/// Panics if the instance metadata is missing or if one of the state assertions fails.
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Pause between two metadata polls.
    const POLL_INTERVAL: Duration = Duration::from_millis(200);
    // Upper bound on the polling phase, so a timer that never fires cannot hang
    // the program forever. Generous compared to the workflow's 2 second cooldown.
    const COMPLETION_TIMEOUT: Duration = Duration::from_secs(30);

    let mut engine = Engine::builder().in_memory().build();
    engine.register("pipeline", pipeline_with_cooldown);
    engine.start().await?;

    println!("=== Invoking workflow ===");
    let inv = engine.invoke("pipeline").await?;
    // Keep an owned copy of the id: `wait()` is called on `inv` next and the id
    // is still needed for the metadata lookups and the resume call below.
    let instance_id = inv.instance_id().to_string();
    let state = inv.wait().await;
    println!("State: {state}");
    assert!(matches!(state, WorkflowState::Suspended(_)));

    let meta = engine
        .get_metadata("pipeline", &instance_id)?
        .expect("instance exists");
    println!("Metadata: {}", meta.status());
    println!();

    println!("=== Waiting for timer to fire... ===");
    let deadline = std::time::Instant::now() + COMPLETION_TIMEOUT;
    loop {
        tokio::time::sleep(POLL_INTERVAL).await;
        let meta = engine
            .get_metadata("pipeline", &instance_id)?
            .expect("instance exists");
        if meta.status().is_terminal() {
            println!("State: {}", meta.status());
            assert_eq!(*meta.status(), MetadataStatus::Completed);
            break;
        }
        // Checked after the status test so that a poll landing just past the
        // deadline still succeeds when the instance has already finished.
        if std::time::Instant::now() >= deadline {
            return Err(format!(
                "workflow instance {instance_id} did not reach a terminal state within {:?}",
                COMPLETION_TIMEOUT
            )
            .into());
        }
    }
    println!();

    println!("=== Timer fired and workflow completed ===");
    println!("=== Resuming (all steps memoised) ===");
    let state = engine.resume("pipeline", &instance_id).await?.wait().await;
    println!("State: {state}");
    assert_eq!(state, WorkflowState::Completed);

    engine.stop().await;
    Ok(())
}