use memable::{Context, Engine, EngineError};
/// Example workflow: fetch, validate, transform, then write a batch of records.
///
/// Each stage publishes a human-readable status via `ctx.set_status` and then
/// executes one named step through `ctx.step(..).run(..)`. The step bodies are
/// placeholders: each one sleeps for a fixed delay and returns a hard-coded
/// value, so no real I/O is performed here.
///
/// # Errors
///
/// Returns the `EngineError` of the first step whose `run(..)` future resolves
/// to an error; later stages are not executed in that case.
// NOTE(review): the `:v1` suffix on the step names presumably versions the
// step key — confirm against the memable documentation.
async fn data_pipeline(ctx: Context) -> Result<(), EngineError> {
    use std::time::Duration;
    use tokio::time::sleep;

    // Simulated latency of each stage.
    const FETCH_DELAY: Duration = Duration::from_millis(300);
    const VALIDATE_DELAY: Duration = Duration::from_millis(200);
    const TRANSFORM_DELAY: Duration = Duration::from_millis(400);
    const WRITE_DELAY: Duration = Duration::from_millis(250);

    // Stage 1: fetch.
    ctx.set_status("fetching records");
    let count: u32 = ctx
        .step("fetch-records:v1")
        .run(async || {
            sleep(FETCH_DELAY).await;
            Ok(42)
        })
        .await?;

    // Stage 2: validate; the status message reports how many records were fetched.
    ctx.set_status(format!("validating {count} records"));
    let valid: u32 = ctx
        .step("validate:v1")
        .run(async || {
            sleep(VALIDATE_DELAY).await;
            Ok(38)
        })
        .await?;

    // Stage 3: transform; the status message reports how many records passed validation.
    ctx.set_status(format!("transforming {valid} records"));
    let _rows: Vec<String> = ctx
        .step("transform:v1")
        .run(async || {
            sleep(TRANSFORM_DELAY).await;
            Ok(vec!["row-a".into(), "row-b".into()])
        })
        .await?;

    // Stage 4: write. The result is bound only to pin the step's output type.
    ctx.set_status("writing to destination");
    let _written: u32 = ctx
        .step("write:v1")
        .run(async || {
            sleep(WRITE_DELAY).await;
            Ok(38)
        })
        .await?;

    Ok(())
}
/// Entry point: builds an in-memory engine, registers the `data-pipeline`
/// workflow, invokes it once, and prints every status change until the
/// instance reports a terminal state.
///
/// # Errors
///
/// Propagates failures from `engine.start()`, `engine.invoke(..)`, and from
/// joining the monitoring task.
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let mut engine = Engine::builder().in_memory().build();
    engine.register("data-pipeline", data_pipeline);
    engine.start().await?;

    // Split the invocation handle into the instance id and its status receiver.
    let handle = engine.invoke("data-pipeline").await?;
    let (instance_id, mut status_rx) = handle.into_parts();

    // Watch status updates on a separate task so that printing is decoupled
    // from the rest of `main`.
    let watcher = tokio::spawn(async move {
        loop {
            // Stop watching once waiting for a change reports an error.
            if status_rx.changed().await.is_err() {
                break;
            }

            // Clone the current value so the borrow is released immediately.
            let state = status_rx.borrow_and_update().clone();

            // Prefer the status message when present; otherwise print the state itself.
            if let Some(msg) = state.message() {
                println!("[{instance_id}] {msg}");
            } else {
                println!("[{instance_id}] {state}");
            }

            if state.is_terminal() {
                break;
            }
        }
    });

    // Wait for the watcher to finish before shutting the engine down.
    watcher.await?;
    engine.stop().await;
    Ok(())
}