---
title: Advanced Patterns
description: Examples of complex, multi-step jobs and advanced use cases.
sidebar:
  order: 7
---
import { Tabs, TabItem, Card, CardGrid, Aside } from '@astrojs/starlight/components';
<Aside type="tip">
View the complete source: [examples/advanced_patterns.rs](https://github.com/sboussekeyt/spring-batch-rs/blob/main/examples/advanced_patterns.rs)
</Aside>
This section covers more complex and realistic use cases that you might encounter when building batch applications. These examples demonstrate how to chain multiple steps together to create a complete data processing pipeline.
## Item Filtering with a Processor
A processor can silently discard items by returning `Ok(None)`. The item is not passed to the
writer and is counted in `StepExecution::filter_count` — separate from errors and skips.
### Three-State Return
```
Ok(Some(item))  → item passed to writer   → process_count++
Ok(None)        → item discarded silently → filter_count++
Err(BatchError) → processing failure      → process_error_count++ (skip_limit applies)
```
### Filtering vs Error Handling
| Situation | Use |
|---|---|
| Business rule: discard minors, inactive records, test data | `Ok(None)` |
| Unexpected invalid data that should be reported | `Err(BatchError)` + `skip_limit` |
### Example: Keep Only Adults
```rust
use spring_batch_rs::{
    core::{
        item::{ItemProcessor, ItemProcessorResult},
        job::{Job, JobBuilder},
        step::StepBuilder,
    },
    item::{csv::CsvItemReaderBuilder, json::JsonItemWriterBuilder},
    BatchError,
};
use serde::{Deserialize, Serialize};
use std::env::temp_dir;

#[derive(Debug, Deserialize, Serialize, Clone)]
struct Person {
    name: String,
    age: u32,
}

#[derive(Default)]
struct AdultFilter;

impl ItemProcessor<Person, Person> for AdultFilter {
    fn process(&self, item: &Person) -> ItemProcessorResult<Person> {
        if item.age >= 18 {
            Ok(Some(item.clone())) // keep
        } else {
            Ok(None) // discard — increments filter_count, not an error
        }
    }
}

fn main() -> Result<(), BatchError> {
    let csv_data = "name,age\nAlice,30\nBob,16\nCharlie,25\nDiana,15\nEve,42\nFrank,17\n";

    let reader = CsvItemReaderBuilder::<Person>::new()
        .has_headers(true)
        .from_reader(csv_data.as_bytes());

    let writer = JsonItemWriterBuilder::<Person>::new()
        .from_path(temp_dir().join("adults.json"));

    // Bind the processor to a variable so the step can borrow it for its full lifetime.
    let processor = AdultFilter::default();

    let step = StepBuilder::new("filter-adults")
        .chunk::<Person, Person>(10)
        .reader(&reader)
        .processor(&processor)
        .writer(&writer)
        .build();

    let job = JobBuilder::new().start(&step).build();
    job.run()?;

    let exec = job.get_step_execution("filter-adults").unwrap();
    println!("Read: {}", exec.read_count);       // 6
    println!("Filtered: {}", exec.filter_count); // 3 (minors discarded)
    println!("Written: {}", exec.write_count);   // 3 (adults kept)

    Ok(())
}
```
```bash
cargo run --example filter_records_from_csv_with_processor --features csv,json
```
<Aside type="tip">
`Ok(None)` does **not** trigger `skip_limit`. Use it for intentional business filtering, not for
error recovery.
</Aside>
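
When invalid data should be treated as an error rather than silently filtered, return `Err` from the processor instead. Below is a minimal sketch of that error path, reusing the `Person` type from the example above; it assumes a `BatchError::ItemProcessor(String)` variant and a `skip_limit` setting on the chunk builder (check the crate's API for the exact names):

```rust
struct StrictAgeValidator;

impl ItemProcessor<Person, Person> for StrictAgeValidator {
    fn process(&self, item: &Person) -> ItemProcessorResult<Person> {
        if item.age > 130 {
            // Unexpected invalid data: surface an error so it is counted in
            // process_error_count and bounded by the step's skip limit.
            Err(BatchError::ItemProcessor(format!(
                "implausible age {} for {}",
                item.age, item.name
            )))
        } else {
            Ok(Some(item.clone())) // valid: pass through unchanged
        }
    }
}
```

Paired with, say, `.skip_limit(5)` on the step, up to five such errors would be skipped before the step fails.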
---
## Chaining Item Processors
<Aside type="tip">
View the complete source: [examples/chaining_processors.rs](https://github.com/sboussekeyt/spring-batch-rs/blob/main/examples/chaining_processors.rs)
</Aside>
`CompositeItemProcessorBuilder` lets you chain multiple processors into a single pipeline.
The output of each processor becomes the input of the next. If any processor returns
`Ok(None)`, the chain stops immediately and the item is filtered — subsequent processors
are never called.
### How It Works
```
item → p1 → intermediate₁ → p2 → intermediate₂ → ... → pN → output
        │
        ↓ Ok(None)?
   stop — item filtered
```
### Chaining Processors with Different Types
```rust
use spring_batch_rs::core::item::{
    CompositeItemProcessorBuilder, ItemProcessor, ItemProcessorResult,
};
use spring_batch_rs::core::step::StepBuilder;

struct RawOrder { id: String, customer: String, amount: String }

#[derive(Clone)]
struct ParsedOrder { id: u32, customer: String, amount: f64 }

struct EnrichedOrder { id: u32, customer: String, amount: f64, tax: f64, total: f64 }

// Step 1: parse raw strings into typed values
struct ParseProcessor;

impl ItemProcessor<RawOrder, ParsedOrder> for ParseProcessor {
    fn process(&self, item: &RawOrder) -> ItemProcessorResult<ParsedOrder> {
        let id = item.id.trim().parse().ok();
        let amount = item.amount.trim().parse().ok();
        match (id, amount) {
            (Some(id), Some(amount)) => Ok(Some(ParsedOrder {
                id, amount, customer: item.customer.clone(),
            })),
            _ => Ok(None), // unparseable — filter silently
        }
    }
}

// Step 2: filter orders below threshold
struct ValidateProcessor {
    min_amount: f64,
}

impl ItemProcessor<ParsedOrder, ParsedOrder> for ValidateProcessor {
    fn process(&self, item: &ParsedOrder) -> ItemProcessorResult<ParsedOrder> {
        if item.amount < self.min_amount {
            Ok(None)
        } else {
            Ok(Some(item.clone()))
        }
    }
}

// Step 3: add tax and total
struct EnrichProcessor;

impl ItemProcessor<ParsedOrder, EnrichedOrder> for EnrichProcessor {
    fn process(&self, item: &ParsedOrder) -> ItemProcessorResult<EnrichedOrder> {
        let tax = (item.amount * 0.20 * 100.0).round() / 100.0;
        Ok(Some(EnrichedOrder {
            id: item.id,
            customer: item.customer.clone(),
            amount: item.amount,
            tax,
            total: item.amount + tax,
        }))
    }
}

// Wire together: RawOrder → ParsedOrder → ParsedOrder → EnrichedOrder
let composite = CompositeItemProcessorBuilder::new(ParseProcessor)
    .link(ValidateProcessor { min_amount: 10.0 })
    .link(EnrichProcessor)
    .build();

// Use with a step builder (`reader` and `writer` are set up as in the previous example).
let step = StepBuilder::new("enrich-orders")
    .chunk::<RawOrder, EnrichedOrder>(100)
    .reader(&reader)
    .processor(&composite)
    .writer(&writer)
    .build();
```
```bash
cargo run --example chaining_processors --features csv,json
```
### Filter Propagation in a Chain
| Scenario | Result |
|---|---|
| Processor N returns `Ok(Some(item))` | Item forwarded to processor N+1 |
| Processor N returns `Ok(None)` | Chain stops — `filter_count++`, no write |
| Processor N returns `Err(e)` | Chain stops — `process_error_count++`, skip logic applies |
<Aside type="tip">
Chains of any length are supported. Each `.link()` call changes the output type, so
type mismatches are caught at compile time.
</Aside>
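
For instance, reordering the links above so that a `ParsedOrder` processor follows the enrichment step is rejected by the compiler:

```rust
// Does not compile: after EnrichProcessor the chain produces EnrichedOrder,
// but ValidateProcessor expects ParsedOrder as input.
let broken = CompositeItemProcessorBuilder::new(ParseProcessor)
    .link(EnrichProcessor)
    .link(ValidateProcessor { min_amount: 10.0 }) // mismatched input type
    .build();
```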
---
## Chaining Item Writers (Fan-out)
<Aside type="tip">
View the complete source: [examples/chaining_writers.rs](https://github.com/sboussekeyt/spring-batch-rs/blob/main/examples/chaining_writers.rs)
</Aside>
`CompositeItemWriterBuilder` lets you send the same chunk of items to multiple writers.
The writers are called in order; if any writer fails, the chain short-circuits and the
error is propagated.
### How It Works
```
chunk → w1 → w2 → ... → wN (all receive the same slice)
         │
         ↓ Err?
    stop — error propagated
```
### Fan-out to Logger and JSON File
```rust
use spring_batch_rs::core::item::{CompositeItemWriterBuilder, PassThroughProcessor};
use spring_batch_rs::core::step::StepBuilder;
use spring_batch_rs::item::{
    json::json_writer::JsonItemWriterBuilder,
    logger::LoggerWriterBuilder,
};
use serde::{Deserialize, Serialize};
use std::env::temp_dir;

#[derive(Debug, Deserialize, Serialize, Clone)]
struct Product { id: u32, name: String, price: f64 }

let json_writer = JsonItemWriterBuilder::<Product>::new()
    .from_path(temp_dir().join("products.json"));

let logger_writer = LoggerWriterBuilder::<Product>::new().build();

// Both writers receive identical item slices on every chunk.
let composite = CompositeItemWriterBuilder::new(logger_writer)
    .add(json_writer)
    .build();

// `reader` is set up as in the earlier examples; bind the processor
// so the step can borrow it for its full lifetime.
let processor = PassThroughProcessor::<Product>::new();

let step = StepBuilder::new("fan-out-products")
    .chunk::<Product, Product>(10)
    .reader(&reader)
    .processor(&processor)
    .writer(&composite)
    .build();
```
```bash
cargo run --example chaining_writers --features csv,json,logger
```
### Error Behaviour
| Scenario | Result |
|---|---|
| All writers succeed | `Ok(())` |
| Writer N returns `Err(e)` | Chain stops — error propagated, writers N+1…M not called |
<Aside type="tip">
Chains of any length are supported. Each `.add()` call wraps the chain in a new
`CompositeItemWriter`, encoding the full structure in the type at zero runtime cost.
Use `Box<dyn ItemWriter<T>>` when you need dynamic dispatch instead.
</Aside>
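
When the set of writers is only known at runtime, one option is a small wrapper over boxed writers. Below is a minimal sketch, assuming `ItemWriter`'s required method is `write(&self, items: &[T]) -> Result<(), BatchError>`; a real implementation should also forward any other trait methods (such as `flush`) to every inner writer:

```rust
use spring_batch_rs::core::item::ItemWriter;
use spring_batch_rs::BatchError;

// Hypothetical runtime fan-out over a dynamically sized list of writers.
struct DynFanOutWriter<T> {
    writers: Vec<Box<dyn ItemWriter<T>>>,
}

impl<T> ItemWriter<T> for DynFanOutWriter<T> {
    fn write(&self, items: &[T]) -> Result<(), BatchError> {
        for writer in &self.writers {
            // The first failing writer short-circuits, matching the composite's behaviour.
            writer.write(items)?;
        }
        Ok(())
    }
}
```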
---
## Multi-Step ETL Job
This example demonstrates a complete Extract, Transform, Load (ETL) job with two distinct steps. This is a common pattern for processing data from one source, transforming it, and loading it into another destination.
The job will perform the following actions:
1. **Step 1: Extract and Filter**
- Reads records from a source CSV file.
- Filters out any records belonging to the "Test" category.
- Writes the resulting records to an intermediate JSON file.
2. **Step 2: Transform and Load**
- Reads records from the intermediate JSON file.
- Transforms the data into a final `OutputRecord` format.
- "Loads" the data by logging it to the console.
This showcases how to use different readers, writers, and processors across multiple, sequential steps within a single `Job`.
```rust
use anyhow::Result;
use serde::{Deserialize, Serialize};
use spring_batch_rs::{
    core::{
        item::{ItemProcessor, ItemProcessorResult},
        job::{Job, JobBuilder},
        step::StepBuilder,
    },
    item::{
        csv::csv_reader::CsvItemReaderBuilder,
        json::json_reader::JsonItemReaderBuilder,
        json::json_writer::JsonItemWriterBuilder,
        logger::LoggerItemWriter,
    },
};
use std::env::temp_dir;
use std::fs::File;
use std::io::Write;

// --- Data Structures ---

// Input record from CSV.
#[derive(Deserialize, Clone, Debug)]
struct InputRecord {
    id: u32,
    name: String,
    category: String,
    value: f64,
}

// Intermediate record, stored in JSON. `Serialize` is needed for the writer.
#[derive(Deserialize, Serialize, Clone, Debug)]
struct IntermediateRecord {
    id: u32,
    name: String,
    value: f64,
}

// Final record, after transformation.
#[derive(Serialize, Clone, Debug)]
struct OutputRecord {
    record_id: String,
    description: String,
    processed_value: f64,
}

// --- Step 1: CSV to JSON with Filtering ---

// A custom processor that filters out records from the "Test" category.
struct FilterProcessor;

impl ItemProcessor<InputRecord, IntermediateRecord> for FilterProcessor {
    fn process(&self, item: &InputRecord) -> ItemProcessorResult<IntermediateRecord> {
        if item.category == "Test" {
            println!("Filtering out test record: {}", item.name);
            return Ok(None); // Returning None filters the item.
        }
        Ok(Some(IntermediateRecord {
            id: item.id,
            name: item.name.clone(),
            value: item.value,
        }))
    }
}

// --- Step 2: JSON to Console Log with Transformation ---

// A custom processor that transforms an IntermediateRecord into an OutputRecord.
struct TransformProcessor;

impl ItemProcessor<IntermediateRecord, OutputRecord> for TransformProcessor {
    fn process(&self, item: &IntermediateRecord) -> ItemProcessorResult<OutputRecord> {
        Ok(Some(OutputRecord {
            record_id: format!("REC-{}", item.id),
            description: format!("{} (Value: {})", item.name, item.value),
            processed_value: item.value * 1.1, // Apply a 10% markup.
        }))
    }
}

fn main() -> Result<()> {
    println!("--- Starting Multi-Step ETL Job Example ---");

    // --- Create dummy CSV file for input ---
    let csv_path = temp_dir().join("input_data.csv");
    let mut file = File::create(&csv_path)?;
    file.write_all(b"id,name,category,value\n")?;
    file.write_all(b"1,Product A,Live,100.0\n")?;
    file.write_all(b"2,Product B,Test,200.0\n")?;
    file.write_all(b"3,Product C,Live,300.0\n")?;
    println!("Created dummy input file: {:?}", csv_path);

    // Define path for the intermediate JSON file.
    let intermediate_json_path = temp_dir().join("intermediate_data.json");

    // --- 1. Configure and build Step 1 ---
    println!("\n--- Configuring Step 1: CSV to JSON ---");
    let csv_reader = CsvItemReaderBuilder::<InputRecord>::new()
        .has_headers(true)
        .from_path(&csv_path);
    let json_writer =
        JsonItemWriterBuilder::<IntermediateRecord>::new().from_path(&intermediate_json_path);
    let filter_processor = FilterProcessor;

    let step1 = StepBuilder::new("csv_to_json_filter_step")
        .chunk::<InputRecord, IntermediateRecord>(10)
        .reader(&csv_reader)
        .processor(&filter_processor)
        .writer(&json_writer)
        .build();

    // --- 2. Configure and build Step 2 ---
    println!("\n--- Configuring Step 2: JSON to Console ---");
    let json_reader =
        JsonItemReaderBuilder::<IntermediateRecord>::new().from_path(&intermediate_json_path);
    let transform_processor = TransformProcessor;

    // This writer logs the final records to the console.
    let log_writer = LoggerItemWriter::new();

    let step2 = StepBuilder::new("json_to_log_transform_step")
        .chunk::<IntermediateRecord, OutputRecord>(10)
        .reader(&json_reader)
        .processor(&transform_processor)
        .writer(&log_writer)
        .build();

    // --- 3. Configure and Run the Job ---
    println!("\n--- Configuring and Running the Job ---");
    let job = JobBuilder::new()
        .start(&step1) // Start with step 1
        .next(&step2) // Proceed to step 2 after step 1 succeeds
        .build();

    let job_execution = job.run();

    // --- 4. Verify results ---
    assert!(job_execution.is_ok());

    let step1_execution = job.get_step_execution("csv_to_json_filter_step").unwrap();
    assert_eq!(step1_execution.read_count, 3);
    assert_eq!(step1_execution.write_count, 2); // 1 record was filtered.

    let step2_execution = job.get_step_execution("json_to_log_transform_step").unwrap();
    assert_eq!(step2_execution.read_count, 2);
    assert_eq!(step2_execution.write_count, 2);

    println!("\n--- Multi-Step ETL Job Finished Successfully! ---");
    println!("Intermediate file is at: {:?}", intermediate_json_path);

    Ok(())
}
```