use blazen::prelude::*;
use tokio_stream::StreamExt;
/// Progress notification that the steps in this file write to the workflow's
/// event stream via `Context::write_event_to_stream`.
///
/// `main` filters the stream for this type and prints each one as
/// `[<step_name>: <percent>%]`.
#[derive(Debug, Clone, Serialize, Deserialize, Event)]
struct ProgressEvent {
/// Name of the step reporting progress (e.g. `"step_one"`).
step_name: String,
/// Completion percentage; the steps here report values between 0.0 and 100.0.
percent: f32,
}
/// Intermediate event handed from `step_one` to `step_two`.
#[derive(Debug, Clone, Serialize, Deserialize, Event)]
struct ProcessEvent {
/// Payload produced by `step_one` (the upper-cased `"input"` string).
data: String,
}
/// First workflow step: upper-cases the `"input"` string of the start event.
///
/// Emits [`ProgressEvent`]s at 0 %, 50 % and 100 % on the context's event
/// stream, pausing 50 ms between the first two. A missing or non-string
/// `"input"` value is treated as the empty string.
///
/// Returns a [`ProcessEvent`] carrying the upper-cased text.
#[step]
async fn step_one(event: StartEvent, ctx: Context) -> Result<ProcessEvent, WorkflowError> {
    // Builds a progress event tagged with this step's name.
    let progress = |percent: f32| ProgressEvent {
        step_name: "step_one".into(),
        percent,
    };
    let pause = std::time::Duration::from_millis(50);

    ctx.write_event_to_stream(progress(0.0)).await;
    tokio::time::sleep(pause).await;
    ctx.write_event_to_stream(progress(50.0)).await;

    // An absent or non-string "input" yields `None`, which becomes "".
    let shouted = event.data["input"]
        .as_str()
        .map(str::to_uppercase)
        .unwrap_or_default();

    ctx.write_event_to_stream(progress(100.0)).await;

    Ok(ProcessEvent { data: shouted })
}
#[step]
async fn step_two(event: ProcessEvent, ctx: Context) -> Result<StopEvent, WorkflowError> {
ctx.write_event_to_stream(ProgressEvent {
step_name: "step_two".into(),
percent: 0.0,
})
.await;
tokio::time::sleep(std::time::Duration::from_millis(50)).await;
let result = format!("Processed: {}", event.data);
ctx.write_event_to_stream(ProgressEvent {
step_name: "step_two".into(),
percent: 100.0,
})
.await;
Ok(StopEvent {
result: serde_json::json!({ "output": result }),
})
}
/// Builds the two-step `streaming_example` workflow, runs it on a sample
/// input, prints every [`ProgressEvent`] seen on the event stream, and finally
/// prints the workflow's result.
#[tokio::main]
async fn main() -> anyhow::Result<()> {
    let workflow = WorkflowBuilder::new("streaming_example")
        .step(step_one_registration())
        .step(step_two_registration())
        .build()?;

    let input = serde_json::json!({ "input": "hello world" });
    let handler = workflow.run(input).await?;

    // Print progress on a separate task so it can run while we await the result.
    let mut events = handler.stream_events();
    let printer = tokio::spawn(async move {
        while let Some(event) = events.next().await {
            // Anything that is not a progress event is ignored.
            let Some(progress) = event.downcast_ref::<ProgressEvent>() else {
                continue;
            };
            println!("[{}: {:.0}%]", progress.step_name, progress.percent);
        }
    });

    let outcome = handler.result().await?;
    let final_event = outcome.event;

    // Best effort: give the printer task up to 100 ms to finish. Its outcome
    // (finished, panicked, or timed out) is deliberately ignored.
    let grace_period = std::time::Duration::from_millis(100);
    let _ = tokio::time::timeout(grace_period, printer).await;

    if let Some(stop) = final_event.downcast_ref::<StopEvent>() {
        println!("\nFinal result: {}", stop.result);
    }

    Ok(())
}