use async_trait::async_trait;
use deltaflow::{
HasEntityId, NoopRecorder, Pipeline, RunnerBuilder, SqliteTaskStore, Step, StepError,
};
use deltaflow_harness::RunnerHarnessExt;
use serde::{Deserialize, Serialize};
#[derive(Clone, Serialize, Deserialize)]
struct Order {
id: String,
customer: String,
items: Vec<String>,
}
impl HasEntityId for Order {
fn entity_id(&self) -> String {
self.id.clone()
}
}
struct Receive;
struct Validate;
#[async_trait]
impl Step for Receive {
type Input = Order;
type Output = Order;
fn name(&self) -> &'static str { "Receive" }
async fn execute(&self, input: Self::Input) -> Result<Self::Output, StepError> {
Ok(input)
}
}
#[async_trait]
impl Step for Validate {
type Input = Order;
type Output = Order;
fn name(&self) -> &'static str { "Validate" }
async fn execute(&self, input: Self::Input) -> Result<Self::Output, StepError> {
Ok(input)
}
}
struct CheckStock;
struct Reserve;
struct Pack;
#[async_trait]
impl Step for CheckStock {
type Input = Order;
type Output = Order;
fn name(&self) -> &'static str { "CheckStock" }
async fn execute(&self, input: Self::Input) -> Result<Self::Output, StepError> {
Ok(input)
}
}
#[async_trait]
impl Step for Reserve {
type Input = Order;
type Output = Order;
fn name(&self) -> &'static str { "Reserve" }
async fn execute(&self, input: Self::Input) -> Result<Self::Output, StepError> {
Ok(input)
}
}
#[async_trait]
impl Step for Pack {
type Input = Order;
type Output = Order;
fn name(&self) -> &'static str { "Pack" }
async fn execute(&self, input: Self::Input) -> Result<Self::Output, StepError> {
Ok(input)
}
}
struct Label;
struct Dispatch;
#[async_trait]
impl Step for Label {
type Input = Order;
type Output = Order;
fn name(&self) -> &'static str { "Label" }
async fn execute(&self, input: Self::Input) -> Result<Self::Output, StepError> {
Ok(input)
}
}
#[async_trait]
impl Step for Dispatch {
type Input = Order;
type Output = Order;
fn name(&self) -> &'static str { "Dispatch" }
async fn execute(&self, input: Self::Input) -> Result<Self::Output, StepError> {
Ok(input)
}
}
struct Queue;
struct Source;
#[async_trait]
impl Step for Queue {
type Input = Order;
type Output = Order;
fn name(&self) -> &'static str { "Queue" }
async fn execute(&self, input: Self::Input) -> Result<Self::Output, StepError> {
Ok(input)
}
}
#[async_trait]
impl Step for Source {
type Input = Order;
type Output = Order;
fn name(&self) -> &'static str { "Source" }
async fn execute(&self, input: Self::Input) -> Result<Self::Output, StepError> {
Ok(input)
}
}
struct Process;
struct Refund;
#[async_trait]
impl Step for Process {
type Input = Order;
type Output = Order;
fn name(&self) -> &'static str { "Process" }
async fn execute(&self, input: Self::Input) -> Result<Self::Output, StepError> {
Ok(input)
}
}
#[async_trait]
impl Step for Refund {
type Input = Order;
type Output = Order;
fn name(&self) -> &'static str { "Refund" }
async fn execute(&self, input: Self::Input) -> Result<Self::Output, StepError> {
Ok(input)
}
}
struct Format;
struct Send;
#[async_trait]
impl Step for Format {
type Input = Order;
type Output = Order;
fn name(&self) -> &'static str { "Format" }
async fn execute(&self, input: Self::Input) -> Result<Self::Output, StepError> {
Ok(input)
}
}
#[async_trait]
impl Step for Send {
type Input = Order;
type Output = Order;
fn name(&self) -> &'static str { "Send" }
async fn execute(&self, input: Self::Input) -> Result<Self::Output, StepError> {
Ok(input)
}
}
#[tokio::main]
async fn main() {
println!("==============================================");
println!(" Order Processing Pipeline - Visualizer Demo");
println!("==============================================");
println!();
println!("Open http://localhost:3000 in your browser");
println!("Press Ctrl+C to stop");
println!();
println!("Pipeline flow:");
println!(" orders -----> fulfillment ---> shipping ---> notify");
println!(" | | ^");
println!(" | +---> backorder --+ |");
println!(" | (retry) ----+ |");
println!(" +---> returns --------------------------> notify");
println!();
let pool = sqlx::SqlitePool::connect(":memory:").await.unwrap();
let store = SqliteTaskStore::new(pool);
store.run_migrations().await.unwrap();
let orders = Pipeline::new("orders")
.start_with(Receive)
.then(Validate)
.fork_when(|_: &Order| true, "fulfillment").desc("valid")
.fork_when(|_: &Order| false, "returns").desc("invalid") .with_recorder(NoopRecorder)
.build();
let fulfillment = Pipeline::new("fulfillment")
.start_with(CheckStock)
.then(Reserve)
.then(Pack)
.fork_when(|_: &Order| true, "shipping").desc("in_stock")
.fork_when(|_: &Order| false, "backorder").desc("out_of_stock") .with_recorder(NoopRecorder)
.build();
let shipping = Pipeline::new("shipping")
.start_with(Label)
.then(Dispatch)
.fork_when(|_: &Order| true, "notify").desc("shipped")
.with_recorder(NoopRecorder)
.build();
let backorder = Pipeline::new("backorder")
.start_with(Queue)
.then(Source)
.fork_when(|_: &Order| true, "fulfillment").desc("restocked") .with_recorder(NoopRecorder)
.build();
let returns = Pipeline::new("returns")
.start_with(Process)
.then(Refund)
.fork_when(|_: &Order| true, "notify").desc("refunded")
.with_recorder(NoopRecorder)
.build();
let notify = Pipeline::new("notify")
.start_with(Format)
.then(Send)
.with_recorder(NoopRecorder)
.build();
let _runner = RunnerBuilder::new(store)
.pipeline(orders)
.pipeline(fulfillment)
.pipeline(shipping)
.pipeline(backorder)
.pipeline(returns)
.pipeline(notify)
.with_visualizer(3000)
.build();
tokio::signal::ctrl_c().await.unwrap();
println!("\nShutting down...");
}