use async_trait::async_trait;
use deltaflow::{HasEntityId, NoopRecorder, Pipeline, RetryPolicy, Step, StepError};
use serde::Serialize;
/// Raw pipeline input: an entity identifier plus the text to be parsed.
#[derive(Debug, Clone, Serialize)]
struct Input {
    /// Identifier of the entity this record belongs to.
    id: String,
    /// Unparsed text; `ParseStep` trims it and parses it as an `i32`.
    value: String,
}
impl HasEntityId for Input {
    /// Returns an owned copy of this input's `id`.
    fn entity_id(&self) -> String {
        let Self { id, .. } = self;
        id.clone()
    }
}
/// Intermediate record: the entity identifier together with its parsed integer.
///
/// Produced by `ParseStep`, transformed by `DoubleStep`, consumed by `FormatStep`.
#[derive(Debug, Clone, Serialize)]
struct Number {
    /// Identifier of the entity this record belongs to.
    id: String,
    /// Integer payload carried between steps.
    value: i32,
}
impl HasEntityId for Number {
    /// Returns an owned copy of this number's `id`.
    fn entity_id(&self) -> String {
        let Self { id, .. } = self;
        id.clone()
    }
}
/// Final pipeline result: the entity identifier plus a human-readable message.
#[derive(Debug, Clone, Serialize)]
struct Output {
    /// Identifier of the entity this record belongs to.
    id: String,
    /// Formatted text built by `FormatStep`.
    message: String,
}
impl HasEntityId for Output {
    /// Returns an owned copy of this output's `id`.
    fn entity_id(&self) -> String {
        let Self { id, .. } = self;
        id.clone()
    }
}
/// Pipeline step that turns the textual `Input::value` into an `i32`.
struct ParseStep;

#[async_trait]
impl Step for ParseStep {
    type Input = Input;
    type Output = Number;

    /// Step name; also used as the prefix of this step's log lines.
    fn name(&self) -> &'static str {
        "parse"
    }

    /// Trims `input.value` and parses it as an `i32`.
    ///
    /// # Errors
    ///
    /// Returns a `StepError::permanent` carrying the offending text and the
    /// parser's message when the trimmed value is not a valid `i32`.
    async fn execute(&self, input: Self::Input) -> Result<Self::Output, StepError> {
        let step = self.name();
        println!("[{}] Parsing input: {:?}", step, input.value);

        // The error message reports the original (untrimmed) text.
        let parsed = input.value.trim().parse::<i32>().map_err(|err| {
            StepError::permanent(anyhow::anyhow!(
                "Failed to parse '{}': {}",
                input.value,
                err
            ))
        })?;

        println!("[{}] Successfully parsed: {}", step, parsed);
        Ok(Number {
            id: input.id,
            value: parsed,
        })
    }
}
struct DoubleStep;
#[async_trait]
impl Step for DoubleStep {
type Input = Number;
type Output = Number;
fn name(&self) -> &'static str {
"double"
}
async fn execute(&self, mut input: Self::Input) -> Result<Self::Output, StepError> {
println!(
"[{}] Doubling {} -> {}",
self.name(),
input.value,
input.value * 2
);
input.value *= 2;
Ok(input)
}
}
struct FormatStep;
#[async_trait]
impl Step for FormatStep {
type Input = Number;
type Output = Output;
fn name(&self) -> &'static str {
"format"
}
async fn execute(&self, input: Self::Input) -> Result<Self::Output, StepError> {
let message = format!("The final result is: {}", input.value);
println!("[{}] {}", self.name(), message);
Ok(Output {
id: input.id,
message,
})
}
}
/// Demonstrates the pipeline API with three runs: a valid input, an input
/// that `ParseStep` rejects, and a second pipeline built with a fixed retry
/// policy. Failures are printed to stderr; `main` itself always returns `Ok`.
#[tokio::main]
async fn main() -> anyhow::Result<()> {
    println!("=== Delta Pipeline Engine - Basic Example ===\n");

    // parse -> double -> format, built with the exponential retry policy.
    let exponential_pipeline = Pipeline::new("basic_example")
        .start_with(ParseStep)
        .then(DoubleStep)
        .then(FormatStep)
        .with_retry(RetryPolicy::exponential(3))
        .with_recorder(NoopRecorder)
        .build();

    println!("--- Example 1: Successful execution ---");
    let valid_input = Input {
        id: "example-1".to_string(),
        value: "42".to_string(),
    };
    match exponential_pipeline.run(valid_input).await {
        Ok(result) => {
            println!("\nSuccess! {}", result.message);
            println!("Entity ID: {}\n", result.entity_id());
        }
        Err(err) => {
            eprintln!("\nPipeline failed: {}\n", err);
        }
    }

    println!("--- Example 2: Invalid input (permanent failure) ---");
    // Non-numeric text: `ParseStep` reports this as a permanent error.
    let invalid_input = Input {
        id: "example-2".to_string(),
        value: "not a number".to_string(),
    };
    match exponential_pipeline.run(invalid_input).await {
        Ok(result) => {
            println!("\nSuccess! {}", result.message);
        }
        Err(err) => {
            eprintln!("\nExpected failure: {}\n", err);
        }
    }

    println!("--- Example 3: Using fixed retry policy ---");
    // Same steps as above; only the retry policy and pipeline name differ.
    let retry_delay = std::time::Duration::from_millis(100);
    let pipeline_fixed = Pipeline::new("fixed_retry_example")
        .start_with(ParseStep)
        .then(DoubleStep)
        .then(FormatStep)
        .with_retry(RetryPolicy::fixed(5, retry_delay))
        .with_recorder(NoopRecorder)
        .build();
    let fixed_input = Input {
        id: "example-3".to_string(),
        value: "100".to_string(),
    };
    match pipeline_fixed.run(fixed_input).await {
        Ok(result) => {
            println!("\nSuccess! {}", result.message);
            println!("Pipeline name: {}\n", pipeline_fixed.name());
        }
        Err(err) => {
            eprintln!("\nPipeline failed: {}\n", err);
        }
    }

    println!("=== Examples complete ===");
    Ok(())
}