use serde::{Deserialize, Serialize};
use spring_batch_rs::{
core::{
item::{CompositeItemProcessorBuilder, ItemProcessor, ItemProcessorResult},
job::{Job, JobBuilder},
step::StepBuilder,
},
item::{csv::csv_reader::CsvItemReaderBuilder, json::json_writer::JsonItemWriterBuilder},
};
use std::env::temp_dir;
#[derive(Debug, Deserialize)]
struct RawOrder {
id: String,
customer: String,
amount: String,
}
#[derive(Debug, Clone)]
struct ParsedOrder {
id: u32,
customer: String,
amount: f64,
}
#[derive(Debug, Serialize)]
struct EnrichedOrder {
id: u32,
customer: String,
amount: f64,
tax: f64,
total: f64,
}
struct ParseProcessor;
impl ItemProcessor<RawOrder, ParsedOrder> for ParseProcessor {
fn process(&self, item: &RawOrder) -> ItemProcessorResult<ParsedOrder> {
let id = match item.id.trim().parse::<u32>() {
Ok(v) => v,
Err(_) => return Ok(None), };
let amount = match item.amount.trim().parse::<f64>() {
Ok(v) => v,
Err(_) => return Ok(None), };
Ok(Some(ParsedOrder {
id,
customer: item.customer.trim().to_string(),
amount,
}))
}
}
struct ValidateProcessor {
min_amount: f64,
}
impl ItemProcessor<ParsedOrder, ParsedOrder> for ValidateProcessor {
fn process(&self, item: &ParsedOrder) -> ItemProcessorResult<ParsedOrder> {
if item.amount < self.min_amount {
Ok(None) } else {
Ok(Some(item.clone()))
}
}
}
struct EnrichProcessor;
impl ItemProcessor<ParsedOrder, EnrichedOrder> for EnrichProcessor {
fn process(&self, item: &ParsedOrder) -> ItemProcessorResult<EnrichedOrder> {
let tax = (item.amount * 0.20 * 100.0).round() / 100.0;
let total = ((item.amount + tax) * 100.0).round() / 100.0;
Ok(Some(EnrichedOrder {
id: item.id,
customer: item.customer.clone(),
amount: item.amount,
tax,
total,
}))
}
}
fn main() {
let csv = "\
id,customer,amount
1,Alice,50.00
2,Bob,8.00
3,Charlie,120.50
4,Diana,bad_amount
5,Eve,200.00";
let reader = CsvItemReaderBuilder::<RawOrder>::new()
.has_headers(true)
.from_reader(csv.as_bytes());
let output = temp_dir().join("enriched_orders.json");
let writer = JsonItemWriterBuilder::<EnrichedOrder>::new().from_path(&output);
let composite = CompositeItemProcessorBuilder::new(ParseProcessor)
.link(ValidateProcessor { min_amount: 10.0 })
.link(EnrichProcessor)
.build();
let step = StepBuilder::new("enrich-orders")
.chunk::<RawOrder, EnrichedOrder>(10)
.reader(&reader)
.processor(&composite)
.writer(&writer)
.build();
let job = JobBuilder::new().start(&step).build();
job.run().expect("job failed");
let exec = job.get_step_execution("enrich-orders").unwrap();
println!("Output: {}", output.display());
println!("Read: {}", exec.read_count); println!("Processed: {}", exec.process_count); println!("Filtered: {}", exec.filter_count); println!("Written: {}", exec.write_count); }