use serde::{Deserialize, Serialize};
use spring_batch_rs::{
BatchError,
core::{
item::{ItemProcessor, ItemReader, PassThroughProcessor},
job::{Job, JobBuilder},
step::StepBuilder,
},
item::{
csv::csv_reader::CsvItemReaderBuilder, csv::csv_writer::CsvItemWriterBuilder,
json::json_reader::JsonItemReaderBuilder, json::json_writer::JsonItemWriterBuilder,
logger::LoggerWriterBuilder,
},
};
use std::{cell::RefCell, collections::VecDeque, env::temp_dir, fs::File};
/// A transaction row exactly as it appears in the input CSV, before any
/// validation. Field order matters: it defines the expected CSV column order.
#[derive(Debug, Clone, Deserialize, Serialize)]
struct RawTransaction {
    // Unique transaction identifier.
    id: u32,
    // Account code, e.g. "ACC001".
    account: String,
    // Transaction amount; may be non-positive in raw input (rejected later).
    amount: f64,
    // "credit" or "debit" in the sample data — TODO confirm no other values.
    transaction_type: String,
    // Lifecycle status; only "completed" rows survive validation.
    status: String,
}
/// A transaction that passed validation: `status` has been checked (and
/// dropped) and `amount` is known to be positive. Serialized to the
/// intermediate JSON file and re-read by the enrichment step.
#[derive(Debug, Clone, Deserialize, Serialize)]
struct ValidTransaction {
    // Unique transaction identifier (carried over from RawTransaction).
    id: u32,
    // Account code, e.g. "ACC001".
    account: String,
    // Positive transaction amount (guaranteed by ValidationProcessor).
    amount: f64,
    // "credit" or "debit"; drives fee calculation during enrichment.
    transaction_type: String,
}
/// Output record of the enrichment step (write-only, hence no Deserialize).
/// Field order defines the column order of the final CSV.
#[derive(Debug, Clone, Serialize)]
struct EnrichedTransaction {
    // Formatted id, e.g. "TXN-000001".
    transaction_id: String,
    // Account code copied from the valid transaction.
    account_number: String,
    // Amount before fees.
    gross_amount: f64,
    // Fee charged (0 for credits, rate-based for debits).
    fee: f64,
    // gross_amount - fee.
    net_amount: f64,
    // Size bucket: "large", "medium", or "small".
    category: String,
}
/// Per-account aggregate written by the aggregation pipeline (write-only).
/// Field order defines the column order of the summary CSV.
#[derive(Debug, Clone, Serialize)]
struct TransactionSummary {
    // Account code this summary describes.
    account: String,
    // Sum of all "credit" amounts for the account.
    total_credits: f64,
    // Sum of all non-credit (debit) amounts for the account.
    total_debits: f64,
    // total_credits - total_debits.
    net_balance: f64,
    // Number of transactions aggregated into this summary.
    transaction_count: u32,
}
/// Stateless processor that filters and validates raw transactions.
struct ValidationProcessor;

impl ItemProcessor<RawTransaction, ValidTransaction> for ValidationProcessor {
    /// Validates a raw transaction.
    ///
    /// - Non-"completed" rows are silently filtered (`Ok(None)`).
    /// - Non-positive amounts are hard errors (`BatchError::ItemProcessor`),
    ///   which count against the step's skip limit.
    /// - Everything else is mapped to a `ValidTransaction` (status dropped).
    fn process(&self, item: &RawTransaction) -> Result<Option<ValidTransaction>, BatchError> {
        match item {
            // Drop anything that never completed — not an error, just noise.
            txn if txn.status != "completed" => Ok(None),
            // A completed transaction with a non-positive amount is corrupt data.
            txn if txn.amount <= 0.0 => Err(BatchError::ItemProcessor(format!(
                "Invalid amount for transaction {}: {}",
                txn.id, txn.amount
            ))),
            txn => Ok(Some(ValidTransaction {
                id: txn.id,
                account: txn.account.clone(),
                amount: txn.amount,
                transaction_type: txn.transaction_type.clone(),
            })),
        }
    }
}
/// Processor that adds fee, net amount, and a size category to transactions.
struct EnrichmentProcessor {
    // Fractional fee applied to debit amounts (e.g. 0.02 = 2%).
    fee_rate: f64,
}

impl EnrichmentProcessor {
    /// Creates an enricher charging `fee_rate` on debit transactions.
    fn new(fee_rate: f64) -> Self {
        Self { fee_rate }
    }
}

impl ItemProcessor<ValidTransaction, EnrichedTransaction> for EnrichmentProcessor {
    /// Computes fee and category, never filters or fails.
    fn process(&self, item: &ValidTransaction) -> Result<Option<EnrichedTransaction>, BatchError> {
        // Credits are free; debits pay the configured rate.
        let fee = match item.transaction_type.as_str() {
            "credit" => 0.0,
            _ => item.amount * self.fee_rate,
        };
        // Bucket by gross amount.
        let category = if item.amount >= 10000.0 {
            "large"
        } else if item.amount >= 1000.0 {
            "medium"
        } else {
            "small"
        };
        let enriched = EnrichedTransaction {
            transaction_id: format!("TXN-{:06}", item.id),
            account_number: item.account.clone(),
            gross_amount: item.amount,
            fee,
            net_amount: item.amount - fee,
            category: category.to_string(),
        };
        Ok(Some(enriched))
    }
}
/// Reader that serves pre-loaded items from an in-memory FIFO queue.
struct InMemoryReader<T> {
    // Interior mutability lets `read(&self)` drain the queue through `&self`.
    items: RefCell<VecDeque<T>>,
}

impl<T: Clone> InMemoryReader<T> {
    /// Builds a reader that yields `items` in their original order.
    fn new(items: Vec<T>) -> Self {
        let queue: VecDeque<T> = items.into_iter().collect();
        Self {
            items: RefCell::new(queue),
        }
    }
}

impl<T: Clone> ItemReader<T> for InMemoryReader<T> {
    /// Pops the next item; `Ok(None)` signals end of input. Never errors.
    fn read(&self) -> Result<Option<T>, BatchError> {
        let next = self.items.borrow_mut().pop_front();
        Ok(next)
    }
}
/// Two-step ETL job: CSV input is validated into an intermediate JSON file
/// in the temp directory, which is then re-read, enriched with fees and
/// categories, and written out as a final CSV.
fn example_multi_step_etl() -> Result<(), BatchError> {
    println!("=== Example 1: Multi-Step ETL Pipeline ===");
    println!(" Pipeline: CSV -> Validate -> JSON -> Enrich -> CSV\n");
    // Inline fixture: rows 3 ("cancelled") and 5 ("failed") should be
    // filtered by ValidationProcessor; the other four pass through.
    let raw_csv = "\
id,account,amount,transaction_type,status
1,ACC001,5000.00,debit,completed
2,ACC001,1500.00,credit,completed
3,ACC002,250.00,debit,cancelled
4,ACC002,10000.00,debit,completed
5,ACC003,750.00,credit,failed
6,ACC003,3000.00,debit,completed";
    println!(" Step 1: Validating transactions...");
    let csv_reader = CsvItemReaderBuilder::<RawTransaction>::new()
        .has_headers(true)
        .from_reader(raw_csv.as_bytes());
    // Step 1 writes validated rows here; step 2 reads them back.
    let intermediate_path = temp_dir().join("validated_transactions.json");
    let json_writer =
        JsonItemWriterBuilder::<ValidTransaction>::new().from_path(&intermediate_path);
    let validation_processor = ValidationProcessor;
    let validate_step = StepBuilder::new("validate-transactions")
        .chunk::<RawTransaction, ValidTransaction>(10)
        .reader(&csv_reader)
        .processor(&validation_processor)
        .writer(&json_writer)
        // Tolerate up to 10 bad rows before the step fails.
        .skip_limit(10)
        .build();
    println!(" Step 2: Enriching transactions...");
    // NOTE(review): the intermediate file is opened *before* the job runs,
    // so this relies on `from_path` above having already created the file,
    // and on step 1 flushing its output before step 2 reads through this
    // handle — TODO confirm against the writer implementation.
    let json_file = File::open(&intermediate_path)
        .map_err(|e| BatchError::ItemReader(format!("Cannot open intermediate file: {}", e)))?;
    let json_reader = JsonItemReaderBuilder::<ValidTransaction>::new().from_reader(json_file);
    let output_path = temp_dir().join("enriched_transactions.csv");
    let csv_writer = CsvItemWriterBuilder::<EnrichedTransaction>::new()
        .has_headers(true)
        .from_path(&output_path);
    // 2% fee is charged on debit transactions during enrichment.
    let enrichment_processor = EnrichmentProcessor::new(0.02);
    let enrich_step = StepBuilder::new("enrich-transactions")
        .chunk::<ValidTransaction, EnrichedTransaction>(10)
        .reader(&json_reader)
        .processor(&enrichment_processor)
        .writer(&csv_writer)
        .build();
    // Steps run in sequence: enrichment starts only after validation ends.
    let job = JobBuilder::new()
        .start(&validate_step)
        .next(&enrich_step)
        .build();
    let result = job.run()?;
    let step1_exec = job.get_step_execution("validate-transactions").unwrap();
    let step2_exec = job.get_step_execution("enrich-transactions").unwrap();
    println!("\n Results:");
    // NOTE(review): the "skipped" label prints `read_error_count`; rows the
    // processor filtered with Ok(None) are not read errors — verify this is
    // the metric the message intends.
    println!(
        " Validation: {} read, {} written, {} skipped",
        step1_exec.read_count, step1_exec.write_count, step1_exec.read_error_count
    );
    println!(
        " Enrichment: {} read, {} written",
        step2_exec.read_count, step2_exec.write_count
    );
    println!(" Total duration: {:?}", result.duration);
    println!(" Output: {}", output_path.display());
    Ok(())
}
/// Exports the same in-memory dataset twice — once as pretty-printed JSON
/// and once as headered CSV — via two sequential steps in one job.
fn example_parallel_conversion() -> Result<(), BatchError> {
    println!("\n=== Example 2: Multi-Format Export ===");
    // Build the shared dataset from compact tuples.
    let transactions: Vec<ValidTransaction> = [
        (1u32, "ACC001", 1000.0, "debit"),
        (2, "ACC002", 2500.0, "credit"),
        (3, "ACC003", 500.0, "debit"),
    ]
    .into_iter()
    .map(|(id, account, amount, kind)| ValidTransaction {
        id,
        account: account.to_string(),
        amount,
        transaction_type: kind.to_string(),
    })
    .collect();
    // JSON export leg (clones the data so the CSV leg can consume it).
    let json_source = InMemoryReader::new(transactions.clone());
    let json_path = temp_dir().join("transactions.json");
    let json_sink = JsonItemWriterBuilder::<ValidTransaction>::new()
        .pretty_formatter(true)
        .from_path(&json_path);
    let json_pass = PassThroughProcessor::<ValidTransaction>::new();
    let json_step = StepBuilder::new("export-json")
        .chunk::<ValidTransaction, ValidTransaction>(10)
        .reader(&json_source)
        .processor(&json_pass)
        .writer(&json_sink)
        .build();
    // CSV export leg (takes ownership of the original Vec).
    let csv_source = InMemoryReader::new(transactions);
    let csv_path = temp_dir().join("transactions.csv");
    let csv_sink = CsvItemWriterBuilder::<ValidTransaction>::new()
        .has_headers(true)
        .from_path(&csv_path);
    let csv_pass = PassThroughProcessor::<ValidTransaction>::new();
    let csv_step = StepBuilder::new("export-csv")
        .chunk::<ValidTransaction, ValidTransaction>(10)
        .reader(&csv_source)
        .processor(&csv_pass)
        .writer(&csv_sink)
        .build();
    let job = JobBuilder::new().start(&json_step).next(&csv_step).build();
    job.run()?;
    println!(" Exported to:");
    println!(" - {}", json_path.display());
    println!(" - {}", csv_path.display());
    Ok(())
}
/// Aggregates transactions per account in memory, then writes one
/// `TransactionSummary` row per account to a CSV file.
///
/// Fixes: the summary line previously printed a hard-coded account count
/// of `2`; it now reports the actual number of aggregated accounts. The
/// summaries are also sorted by account so the CSV row order no longer
/// depends on `HashMap` iteration order.
fn example_aggregation_pipeline() -> Result<(), BatchError> {
    println!("\n=== Example 3: Aggregation Pipeline ===");
    let transactions = vec![
        ValidTransaction {
            id: 1,
            account: "ACC001".to_string(),
            amount: 1000.0,
            transaction_type: "credit".to_string(),
        },
        ValidTransaction {
            id: 2,
            account: "ACC001".to_string(),
            amount: 500.0,
            transaction_type: "debit".to_string(),
        },
        ValidTransaction {
            id: 3,
            account: "ACC001".to_string(),
            amount: 200.0,
            transaction_type: "credit".to_string(),
        },
        ValidTransaction {
            id: 4,
            account: "ACC002".to_string(),
            amount: 3000.0,
            transaction_type: "credit".to_string(),
        },
        ValidTransaction {
            id: 5,
            account: "ACC002".to_string(),
            amount: 1500.0,
            transaction_type: "debit".to_string(),
        },
    ];
    use std::collections::HashMap;
    // Per-account accumulator: (total credits, total debits, txn count).
    let mut accounts: HashMap<String, (f64, f64, u32)> = HashMap::new();
    for txn in &transactions {
        let entry = accounts.entry(txn.account.clone()).or_default();
        if txn.transaction_type == "credit" {
            entry.0 += txn.amount;
        } else {
            entry.1 += txn.amount;
        }
        entry.2 += 1;
    }
    let mut summaries: Vec<TransactionSummary> = accounts
        .into_iter()
        .map(|(account, (credits, debits, count))| TransactionSummary {
            account,
            total_credits: credits,
            total_debits: debits,
            net_balance: credits - debits,
            transaction_count: count,
        })
        .collect();
    // HashMap iteration order is unspecified; sort for deterministic output.
    summaries.sort_by(|a, b| a.account.cmp(&b.account));
    // Capture the count *before* `summaries` is moved into the reader
    // (the previous version printed a hard-coded `2` below).
    let account_count = summaries.len();
    let reader = InMemoryReader::new(summaries);
    let output_path = temp_dir().join("account_summaries.csv");
    let writer = CsvItemWriterBuilder::<TransactionSummary>::new()
        .has_headers(true)
        .from_path(&output_path);
    let processor = PassThroughProcessor::<TransactionSummary>::new();
    let step = StepBuilder::new("write-summaries")
        .chunk::<TransactionSummary, TransactionSummary>(10)
        .reader(&reader)
        .processor(&processor)
        .writer(&writer)
        .build();
    let job = JobBuilder::new().start(&step).build();
    job.run()?;
    println!(" Aggregated {} accounts", account_count);
    println!(" Output: {}", output_path.display());
    Ok(())
}
/// Demonstrates skip-limit error handling: input with a malformed amount
/// (row 2, a read/parse error) and a negative amount (row 4, a processor
/// error) is run with a skip limit, then execution metrics are printed.
fn example_error_handling() -> Result<(), BatchError> {
    println!("\n=== Example 4: Error Handling and Monitoring ===");
    let csv_data = "\
id,account,amount,transaction_type,status
1,ACC001,1000.00,debit,completed
2,ACC002,invalid_amount,credit,completed
3,ACC003,2000.00,debit,completed
4,ACC004,-500.00,debit,completed
5,ACC005,3000.00,credit,completed";
    let source = CsvItemReaderBuilder::<RawTransaction>::new()
        .has_headers(true)
        .from_reader(csv_data.as_bytes());
    // Log items instead of persisting them — this example is about metrics.
    let sink = LoggerWriterBuilder::<ValidTransaction>::new().build();
    let validator = ValidationProcessor;
    let step = StepBuilder::new("error-handling-step")
        // Small chunk size so errors surface across multiple chunks.
        .chunk::<RawTransaction, ValidTransaction>(2)
        .reader(&source)
        .processor(&validator)
        .writer(&sink)
        // Allow up to 5 skipped items before failing the step.
        .skip_limit(5)
        .build();
    let job = JobBuilder::new().start(&step).build();
    let outcome = job.run()?;
    let metrics = job.get_step_execution("error-handling-step").unwrap();
    println!("\n Execution Summary:");
    println!(" Status: {:?}", metrics.status);
    println!(" Read count: {}", metrics.read_count);
    println!(" Write count: {}", metrics.write_count);
    println!(" Read errors: {}", metrics.read_error_count);
    println!(" Process errors: {}", metrics.process_error_count);
    println!(" Duration: {:?}", outcome.duration);
    Ok(())
}
fn main() -> Result<(), BatchError> {
env_logger::init();
println!("Advanced Batch Processing Patterns");
println!("==================================\n");
example_multi_step_etl()?;
example_parallel_conversion()?;
example_aggregation_pipeline()?;
example_error_handling()?;
println!("\n✓ All advanced pattern examples completed successfully!");
Ok(())
}