---
title: ItemProcessor API
description: Complete reference for the ItemProcessor trait and transformation patterns
sidebar:
  order: 3
---
import { Aside, Card, CardGrid, Tabs, TabItem } from '@astrojs/starlight/components';

The `ItemProcessor<I, O>` trait defines how to transform items from input type `I` to output type `O`. Processors are optional but powerful for data transformation, validation, and filtering.
## Trait Definition
```rust
pub trait ItemProcessor<I, O> {
    /// Processes an item and returns the transformed result
    ///
    /// # Returns
    /// - `Ok(Some(processed_item))` - Successfully processed, pass to writer
    /// - `Ok(None)` - Item filtered out, not passed to writer
    /// - `Err(BatchError)` - Processing failed
    fn process(&self, item: &I) -> ItemProcessorResult<O>;
}
```
<Aside type="tip">
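The processor takes a **reference** to the input item, so the input is never consumed or mutated. Because `process` returns an owned `O`, passing an item through unchanged when `I == O` still requires a clone.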
</Aside>
## Type Alias
```rust
pub type ItemProcessorResult<O> = Result<Option<O>, BatchError>;
```
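The three outcomes encoded by this alias can be sketched as follows. To keep the snippet compilable on its own, `BatchError` is redefined here as a simplified stand-in with a single variant (an assumption; the crate's real enum may differ), and `classify` is a hypothetical helper rather than part of the library.

```rust
// Local stand-ins so the sketch compiles without the crate.
// Assumption: the real `BatchError` has more variants than shown here.
#[derive(Debug, PartialEq)]
pub enum BatchError {
    ItemProcessor(String),
}

pub type ItemProcessorResult<O> = Result<Option<O>, BatchError>;

/// Hypothetical helper: maps a raw line onto one of the three outcomes.
pub fn classify(line: &str) -> ItemProcessorResult<u32> {
    let trimmed = line.trim();
    if trimmed.is_empty() {
        // Blank input is filtered: not an error, and nothing is written.
        return Ok(None);
    }
    trimmed
        .parse::<u32>()
        .map(Some) // parsed successfully: hand the value to the writer
        .map_err(|e| BatchError::ItemProcessor(format!("not a number: {e}")))
}
```

Calling `classify("42")` yields `Ok(Some(42))`, a blank line yields `Ok(None)`, and non-numeric input yields an `Err`.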
## Key Characteristics
<CardGrid>
<Card title="Type Transformation" icon="star">
Convert between different types: `I` → `O`
</Card>
<Card title="Validation" icon="approve-check">
Validate items and reject invalid data
</Card>
<Card title="Filtering" icon="seti:todo">
Return `Ok(None)` to silently discard items. Filtered items increment `filter_count`, not `skip_count`.
</Card>
<Card title="Stateless Design" icon="puzzle">
Processors should be stateless for thread safety
</Card>
</CardGrid>
---
## Built-in Processor
### PassThroughProcessor
A no-op processor that clones items without transformation.
```rust
use spring_batch_rs::core::item::PassThroughProcessor;
#[derive(Clone)]
struct Product {
id: u32,
name: String,
}
let processor = PassThroughProcessor::<Product>::new();
```
**Use when:**
- You don't need transformation
- You want to use the same type for input and output
- Testing reader → writer pipelines
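
Conceptually, a pass-through processor amounts to cloning its input. The sketch below illustrates that idea with a local stand-in trait and a hypothetical `CloneThrough` type; it is not the crate's actual source for `PassThroughProcessor`.

```rust
// Local stand-ins for the crate's types (assumed shapes, for illustration).
#[derive(Debug, PartialEq)]
pub enum BatchError {
    ItemProcessor(String),
}

pub type ItemProcessorResult<O> = Result<Option<O>, BatchError>;

pub trait ItemProcessor<I, O> {
    fn process(&self, item: &I) -> ItemProcessorResult<O>;
}

/// Hypothetical re-implementation of a no-op processor.
pub struct CloneThrough;

impl<T: Clone> ItemProcessor<T, T> for CloneThrough {
    fn process(&self, item: &T) -> ItemProcessorResult<T> {
        // The input is borrowed, so producing an owned output needs a clone.
        Ok(Some(item.clone()))
    }
}
```

This is why the item type in the example above derives `Clone`.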
---
## Common Patterns
### 1. Type Transformation
Transform data from one structure to another:
```rust
use spring_batch_rs::core::item::{ItemProcessor, ItemProcessorResult};
use serde::{Deserialize, Serialize};
#[derive(Deserialize, Clone)]
struct RawProduct {
id: u32,
name: String,
price_cents: i64,
}
#[derive(Serialize)]
struct Product {
id: u32,
name: String,
price_dollars: f64,
}
struct ProductTransformer;
impl ItemProcessor<RawProduct, Product> for ProductTransformer {
fn process(&self, item: &RawProduct) -> ItemProcessorResult<Product> {
Ok(Some(Product {
id: item.id,
name: item.name.clone(),
price_dollars: item.price_cents as f64 / 100.0,
}))
}
}
```
### 2. Data Validation
Validate items and reject invalid data:
```rust
use spring_batch_rs::core::item::{ItemProcessor, ItemProcessorResult};
use spring_batch_rs::error::BatchError;
#[derive(Clone)]
struct User {
email: String,
age: i32,
}
struct UserValidator;
impl ItemProcessor<User, User> for UserValidator {
fn process(&self, item: &User) -> ItemProcessorResult<User> {
// Validate email format
if !item.email.contains('@') {
return Err(BatchError::ItemProcessor(
format!("Invalid email: {}", item.email)
));
}
// Validate age range
if item.age < 0 || item.age > 150 {
return Err(BatchError::ItemProcessor(
format!("Invalid age: {}", item.age)
));
}
Ok(Some(item.clone()))
}
}
```
<Aside type="tip">
Use `skip_limit()` on your step to handle validation errors gracefully without stopping the entire job.
</Aside>
### 3. Item Filtering
Return `Ok(None)` to silently discard items — they are not passed to the writer and are counted in `StepExecution::filter_count`. This is intentional filtering, not an error.
```rust
use spring_batch_rs::core::item::{ItemProcessor, ItemProcessorResult};
#[derive(Clone)]
struct Person {
name: String,
age: u32,
}
struct AdultFilter;
impl ItemProcessor<Person, Person> for AdultFilter {
fn process(&self, item: &Person) -> ItemProcessorResult<Person> {
if item.age >= 18 {
Ok(Some(item.clone())) // keep adults
} else {
Ok(None) // filter out minors — counted in filter_count
}
}
}
```
After job execution, check the filter count:
```rust
for step_execution in &result.step_executions {
println!("Filtered: {}", step_execution.filter_count);
}
```
<Aside type="tip">
Filtering with `Ok(None)` does **not** count toward `skip_limit`. Use it for intentional business filtering, not for error recovery.
</Aside>
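
The difference between filtering and skipping can be modelled with a simplified loop. Everything here is hypothetical: `Counts`, `run`, and `adults_only` are illustrative names, and the rule that the run fails once errors exceed `skip_limit` is an assumption about the step's behaviour, not the crate's real implementation.

```rust
#[derive(Debug)]
pub enum BatchError {
    ItemProcessor(String),
}

pub type ItemProcessorResult<O> = Result<Option<O>, BatchError>;

/// Hypothetical counters mirroring `filter_count` and `skip_count`.
#[derive(Debug, Default, PartialEq)]
pub struct Counts {
    pub written: usize,
    pub filter_count: usize,
    pub skip_count: usize,
}

/// Simplified model of a step loop.
/// Returns `Err` once the number of failed items would exceed `skip_limit`.
pub fn run<I, O>(
    items: &[I],
    process: impl Fn(&I) -> ItemProcessorResult<O>,
    skip_limit: usize,
) -> Result<Counts, BatchError> {
    let mut counts = Counts::default();
    for item in items {
        match process(item) {
            Ok(Some(_)) => counts.written += 1,
            Ok(None) => counts.filter_count += 1, // never consumes skip_limit
            Err(e) => {
                if counts.skip_count >= skip_limit {
                    return Err(e);
                }
                counts.skip_count += 1;
            }
        }
    }
    Ok(counts)
}

/// Example processor: negative ages are errors, minors are filtered.
pub fn adults_only(age: &i32) -> ItemProcessorResult<i32> {
    match *age {
        a if a < 0 => Err(BatchError::ItemProcessor(format!("invalid age: {a}"))),
        a if a < 18 => Ok(None),
        a => Ok(Some(a)),
    }
}
```

With a skip limit of 1, `run(&[30, 12, -1, 45, 7], adults_only, 1)` records two written items, two filtered items, and one skipped item. A second invalid age would make the run fail, while any number of minors would not.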
### 4. Data Enrichment
Add additional information to items:
```rust
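use std::collections::HashMap;
use serde::Serialize;
use spring_batch_rs::core::item::{ItemProcessor, ItemProcessorResult};
use spring_batch_rs::error::BatchError;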
#[derive(Clone)]
struct Order {
id: u32,
product_id: u32,
quantity: u32,
}
#[derive(Serialize)]
struct EnrichedOrder {
id: u32,
product_id: u32,
product_name: String,
quantity: u32,
unit_price: f64,
total_price: f64,
}
struct OrderEnricher {
product_catalog: HashMap<u32, (String, f64)>, // (name, price)
}
impl ItemProcessor<Order, EnrichedOrder> for OrderEnricher {
fn process(&self, item: &Order) -> ItemProcessorResult<EnrichedOrder> {
let (product_name, unit_price) = self.product_catalog
.get(&item.product_id)
.cloned()
.ok_or_else(|| BatchError::ItemProcessor(
format!("Unknown product: {}", item.product_id)
))?;
let total_price = unit_price * item.quantity as f64;
Ok(Some(EnrichedOrder {
id: item.id,
product_id: item.product_id,
product_name,
quantity: item.quantity,
unit_price,
total_price,
}))
}
}
```
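
When several components need the same catalog, one option is to share it behind an `Arc` instead of cloning the map. The following is a minimal sketch of that idea using only the standard library; `Catalog`, `PriceLookup`, and `sample_catalog` are hypothetical names introduced for illustration.

```rust
use std::collections::HashMap;
use std::sync::Arc;

/// (name, unit price) keyed by product id, the same shape as the catalog above.
pub type Catalog = HashMap<u32, (String, f64)>;

/// Hypothetical lookup component holding a shared, read-only catalog.
pub struct PriceLookup {
    catalog: Arc<Catalog>,
}

impl PriceLookup {
    pub fn new(catalog: Arc<Catalog>) -> Self {
        Self { catalog }
    }

    /// Returns quantity times unit price, or an error message for unknown ids.
    pub fn total(&self, product_id: u32, quantity: u32) -> Result<f64, String> {
        self.catalog
            .get(&product_id)
            .map(|(_, unit_price)| unit_price * f64::from(quantity))
            .ok_or_else(|| format!("Unknown product: {product_id}"))
    }
}

/// Builds a one-entry catalog for demonstration purposes.
pub fn sample_catalog() -> Arc<Catalog> {
    let mut catalog = Catalog::new();
    catalog.insert(1, ("Widget".to_string(), 2.5));
    Arc::new(catalog)
}
```

`Arc::clone(&catalog)` only bumps a reference count, so every lookup instance reads the same underlying map.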
### 5. Data Cleansing
Clean and normalize data:
```rust
struct DataCleanser;

impl ItemProcessor<String, String> for DataCleanser {
    fn process(&self, item: &String) -> ItemProcessorResult<String> {
        let cleaned: String = item
            .trim()             // Remove leading and trailing whitespace
            .to_lowercase()     // Normalize case
            .replace("  ", " ") // Replace each double space with a single space
            .chars()
            .filter(|c| c.is_alphanumeric() || c.is_whitespace()) // Remove special chars
            .collect();
        Ok(Some(cleaned))
    }
}
```
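
Replacing double spaces in a single pass leaves longer runs only partly collapsed (four spaces become two). One alternative, sketched here with a hypothetical `cleanse` helper, is to split on whitespace and rejoin with single spaces:

```rust
/// Hypothetical helper: lowercases, drops characters that are neither
/// alphanumeric nor whitespace, and collapses every run of whitespace
/// (including leading and trailing runs) into single spaces.
pub fn cleanse(input: &str) -> String {
    input
        .to_lowercase()
        .chars()
        .filter(|c| c.is_alphanumeric() || c.is_whitespace())
        .collect::<String>()
        .split_whitespace() // yields words, skipping whitespace of any length
        .collect::<Vec<_>>()
        .join(" ")
}
```

Filtering special characters before splitting matters: removing punctuation that sits between two spaces would otherwise reintroduce a double space.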
### 6. Conditional Processing
Apply different logic based on item properties:
```rust
#[derive(Clone)]
struct Transaction {
amount: f64,
category: String,
}
#[derive(Serialize)]
struct ProcessedTransaction {
amount: f64,
category: String,
tax: f64,
final_amount: f64,
}
struct TaxCalculator {
default_tax_rate: f64,
}
impl ItemProcessor<Transaction, ProcessedTransaction> for TaxCalculator {
fn process(&self, item: &Transaction) -> ItemProcessorResult<ProcessedTransaction> {
// Different tax rates by category
let tax_rate = match item.category.as_str() {
"food" => 0.05,
"electronics" => 0.15,
"books" => 0.0, // No tax on books
_ => self.default_tax_rate,
};
let tax = item.amount * tax_rate;
let final_amount = item.amount + tax;
Ok(Some(ProcessedTransaction {
amount: item.amount,
category: item.category.clone(),
tax,
final_amount,
}))
}
}
```
### 7. String Transformations
Common string operations:
```rust
struct StringProcessor;

impl ItemProcessor<String, String> for StringProcessor {
    fn process(&self, item: &String) -> ItemProcessorResult<String> {
        // Trim first: once spaces become underscores, `trim()` has nothing left to strip.
        Ok(Some(item.trim().to_uppercase().replace(' ', "_")))
    }
}
```
---
## Advanced Patterns
### Chaining Processors
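Chain two processors by feeding the output of the first into the second. An error from the first stage short-circuits via `?`, and a filtered item (`Ok(None)`) never reaches the second stage: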
```rust
use spring_batch_rs::core::item::{ItemProcessor, ItemProcessorResult};
struct ProcessorChain<I, M, O> {
first: Box<dyn ItemProcessor<I, M>>,
second: Box<dyn ItemProcessor<M, O>>,
}
impl<I, M, O> ItemProcessor<I, O> for ProcessorChain<I, M, O> {
fn process(&self, item: &I) -> ItemProcessorResult<O> {
match self.first.process(item)? {
Some(intermediate) => self.second.process(&intermediate),
None => Ok(None), // propagate filter
}
}
}
```
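
A usage sketch of the chain, made self-contained with local stand-ins for the trait and error type. `ParseNumber`, `DoublePositive`, and `build_chain` are hypothetical names for illustration only.

```rust
// Local stand-ins for the crate's types (assumed shapes).
#[derive(Debug, PartialEq)]
pub enum BatchError {
    ItemProcessor(String),
}

pub type ItemProcessorResult<O> = Result<Option<O>, BatchError>;

pub trait ItemProcessor<I, O> {
    fn process(&self, item: &I) -> ItemProcessorResult<O>;
}

pub struct ProcessorChain<I, M, O> {
    pub first: Box<dyn ItemProcessor<I, M>>,
    pub second: Box<dyn ItemProcessor<M, O>>,
}

impl<I, M, O> ItemProcessor<I, O> for ProcessorChain<I, M, O> {
    fn process(&self, item: &I) -> ItemProcessorResult<O> {
        match self.first.process(item)? {
            Some(intermediate) => self.second.process(&intermediate),
            None => Ok(None), // propagate filter
        }
    }
}

/// Stage 1 (hypothetical): parse text into a number, filtering blank input.
pub struct ParseNumber;

impl ItemProcessor<String, i64> for ParseNumber {
    fn process(&self, item: &String) -> ItemProcessorResult<i64> {
        let text = item.trim();
        if text.is_empty() {
            return Ok(None);
        }
        text.parse::<i64>()
            .map(Some)
            .map_err(|e| BatchError::ItemProcessor(format!("bad number {text:?}: {e}")))
    }
}

/// Stage 2 (hypothetical): keep only positive numbers, doubled.
pub struct DoublePositive;

impl ItemProcessor<i64, i64> for DoublePositive {
    fn process(&self, item: &i64) -> ItemProcessorResult<i64> {
        Ok((*item > 0).then(|| item * 2))
    }
}

/// Wires the two stages together.
pub fn build_chain() -> ProcessorChain<String, i64, i64> {
    ProcessorChain {
        first: Box::new(ParseNumber),
        second: Box::new(DoublePositive),
    }
}
```

Either stage can filter: blank input is dropped by the first stage and non-positive numbers by the second, and in both cases the chain returns `Ok(None)`.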
### Stateful Processing with Interior Mutability
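When a processor must track state across items, wrap that state in a synchronization primitive, since `process()` only receives `&self` (use carefully):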
```rust
use std::sync::Mutex;
struct Counter {
count: Mutex<u64>,
}
impl ItemProcessor<String, String> for Counter {
fn process(&self, item: &String) -> ItemProcessorResult<String> {
let mut count = self.count.lock().unwrap();
*count += 1;
Ok(Some(format!("{}: {}", *count, item)))
}
}
```
<Aside type="caution">
Stateful processors must use interior mutability (`Mutex`, `RwLock`) since `process()` takes `&self`. Prefer stateless designs when possible.
</Aside>
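
For a plain counter, an atomic integer is a lighter alternative to a `Mutex` and cannot be poisoned by a panic. A minimal sketch, using local stand-ins for the crate's types and a hypothetical `AtomicCounter`:

```rust
use std::sync::atomic::{AtomicU64, Ordering};

// Local stand-ins for the crate's types (assumed shapes).
#[derive(Debug, PartialEq)]
pub enum BatchError {
    ItemProcessor(String),
}

pub type ItemProcessorResult<O> = Result<Option<O>, BatchError>;

pub trait ItemProcessor<I, O> {
    fn process(&self, item: &I) -> ItemProcessorResult<O>;
}

/// Hypothetical processor that prefixes each item with a sequence number.
pub struct AtomicCounter {
    count: AtomicU64,
}

impl AtomicCounter {
    pub fn new() -> Self {
        Self { count: AtomicU64::new(0) }
    }
}

impl ItemProcessor<String, String> for AtomicCounter {
    fn process(&self, item: &String) -> ItemProcessorResult<String> {
        // `fetch_add` returns the previous value, so add 1 for a 1-based number.
        let n = self.count.fetch_add(1, Ordering::Relaxed) + 1;
        Ok(Some(format!("{n}: {item}")))
    }
}
```

`Ordering::Relaxed` is enough here because the counter does not guard any other memory; it only needs each increment to be atomic.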
### Async Operations (via blocking)
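`process()` is synchronous, so async work has to be driven to completion with a blocking call. Note that Tokio's `Runtime::block_on` panics when called from inside another runtime, so this pattern suits jobs launched from synchronous code: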
```rust
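use spring_batch_rs::core::item::{ItemProcessor, ItemProcessorResult};
use spring_batch_rs::error::BatchError;
use tokio::runtime::Runtime;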
struct ApiEnricher {
runtime: Runtime,
}
impl ItemProcessor<String, String> for ApiEnricher {
fn process(&self, item: &String) -> ItemProcessorResult<String> {
// Block on async operation
self.runtime.block_on(async {
// Call external API
let response = fetch_api_data(item).await?;
Ok(Some(response))
})
}
}
async fn fetch_api_data(input: &str) -> Result<String, BatchError> {
// Async API call
Ok(format!("enriched: {}", input))
}
```
---
## Error Handling
### Recoverable Errors
Use with `skip_limit()` to continue processing:
```rust
impl ItemProcessor<Data, Data> for MyProcessor {
fn process(&self, item: &Data) -> ItemProcessorResult<Data> {
if item.is_invalid() {
return Err(BatchError::ItemProcessor(
"Invalid data".to_string()
));
}
Ok(Some(item.clone()))
}
}
// In step configuration:
let step = StepBuilder::new("process")
.chunk(100)
.reader(&reader)
.processor(&processor)
.writer(&writer)
.skip_limit(10) // Skip up to 10 invalid items
.build();
```
### Unrecoverable Errors
Stop the job immediately:
```rust
impl ItemProcessor<Data, Data> for MyProcessor {
fn process(&self, item: &Data) -> ItemProcessorResult<Data> {
if critical_failure() {
return Err(BatchError::Fatal(
"Critical system error".to_string()
));
}
Ok(Some(item.clone()))
}
}
```
---
## Testing Processors
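Because `process` is a plain method that takes a reference and returns a `Result`, processors can be unit tested without building a step or job: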
```rust
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_product_transformer() {
let processor = ProductTransformer;
let raw = RawProduct {
id: 1,
name: "Widget".to_string(),
price_cents: 1999,
};
let result = processor.process(&raw).unwrap().unwrap();
assert_eq!(result.id, 1);
assert_eq!(result.name, "Widget");
assert_eq!(result.price_dollars, 19.99);
}
#[test]
fn test_validation_failure() {
let processor = UserValidator;
let invalid_user = User {
email: "not-an-email".to_string(),
age: 25,
};
assert!(processor.process(&invalid_user).is_err());
}
}
```
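
The filtered outcome deserves its own test, since `Ok(None)` is neither a success value nor an error. A sketch with local stand-ins for the crate's types follows; the `person` helper and the test names are hypothetical.

```rust
// Local stand-ins for the crate's types (assumed shapes).
#[derive(Debug, PartialEq)]
pub enum BatchError {
    ItemProcessor(String),
}

pub type ItemProcessorResult<O> = Result<Option<O>, BatchError>;

pub trait ItemProcessor<I, O> {
    fn process(&self, item: &I) -> ItemProcessorResult<O>;
}

#[derive(Clone, Debug, PartialEq)]
pub struct Person {
    pub name: String,
    pub age: u32,
}

pub struct AdultFilter;

impl ItemProcessor<Person, Person> for AdultFilter {
    fn process(&self, item: &Person) -> ItemProcessorResult<Person> {
        Ok((item.age >= 18).then(|| item.clone()))
    }
}

/// Test fixture: a person with a fixed name and the given age.
pub fn person(age: u32) -> Person {
    Person { name: "Sam".to_string(), age }
}

#[test]
fn keeps_adults_at_the_boundary() {
    assert_eq!(AdultFilter.process(&person(18)), Ok(Some(person(18))));
}

#[test]
fn filters_minors_without_error() {
    // A filtered item is `Ok(None)`: not an `Err`, and nothing to write.
    assert_eq!(AdultFilter.process(&person(17)), Ok(None));
}
```

Deriving `Debug` and `PartialEq` on the item and error types is what allows `assert_eq!` to compare whole results directly.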
---
## Real-World Examples
### ETL Pipeline
```rust
use serde::{Deserialize, Serialize};
use spring_batch_rs::core::item::{ItemProcessor, ItemProcessorResult};
use spring_batch_rs::error::BatchError;

#[derive(Deserialize, Clone)]
struct SourceRecord {
    customer_id: String,
    amount: String,
    date: String,
}

#[derive(Serialize)]
struct TargetRecord {
    customer_id: i64,
    amount_cents: i64,
    date: chrono::NaiveDate,
}

struct ETLProcessor;

impl ItemProcessor<SourceRecord, TargetRecord> for ETLProcessor {
    fn process(&self, item: &SourceRecord) -> ItemProcessorResult<TargetRecord> {
        // Parse customer ID
        let customer_id = item.customer_id.parse::<i64>()
            .map_err(|e| BatchError::ItemProcessor(
                format!("Invalid customer_id: {}", e)
            ))?;

        // Parse amount; round before casting, because `as i64` truncates
        // (19.99 * 100.0 is 1998.9999999999998 in f64)
        let amount_cents = (item.amount.parse::<f64>()
            .map_err(|e| BatchError::ItemProcessor(
                format!("Invalid amount: {}", e)
            ))? * 100.0).round() as i64;

        // Parse date
        let date = chrono::NaiveDate::parse_from_str(&item.date, "%Y-%m-%d")
            .map_err(|e| BatchError::ItemProcessor(
                format!("Invalid date: {}", e)
            ))?;

        Ok(Some(TargetRecord {
            customer_id,
            amount_cents,
            date,
        }))
    }
}
```
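
Going through `f64` makes the cent value depend on floating-point rounding: `19.99 * 100.0` evaluates to `1998.9999999999998`, which a plain `as i64` cast truncates to 1998. One way to sidestep this is to parse the decimal string straight into integer cents. The `parse_cents` helper below is a hypothetical sketch that accepts non-negative amounts with at most two fractional digits.

```rust
/// Hypothetical helper: parses a non-negative decimal string such as "19.99"
/// into integer cents without going through floating point.
pub fn parse_cents(amount: &str) -> Result<i64, String> {
    let amount = amount.trim();
    // "19.99" -> ("19", "99"); "7" -> ("7", "")
    let (whole, frac) = amount.split_once('.').unwrap_or((amount, ""));
    let all_digits = |s: &str| s.bytes().all(|b| b.is_ascii_digit());
    if whole.is_empty() || frac.len() > 2 || !all_digits(whole) || !all_digits(frac) {
        return Err(format!("invalid amount: {amount:?}"));
    }
    let whole = whole
        .parse::<i64>()
        .map_err(|e| format!("invalid amount {amount:?}: {e}"))?;
    // Right-pad the fraction with zeros so "5" means 50 cents and "" means 0.
    let frac = format!("{frac:0<2}")
        .parse::<i64>()
        .map_err(|e| format!("invalid amount {amount:?}: {e}"))?;
    whole
        .checked_mul(100)
        .and_then(|cents| cents.checked_add(frac))
        .ok_or_else(|| format!("amount out of range: {amount:?}"))
}
```

Inside a processor, the `String` error would be wrapped in `BatchError::ItemProcessor` in the same way as the other parse failures above.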
### PII Redaction
```rust
use regex::Regex;

struct PIIRedactor {
    email_pattern: Regex,
    phone_pattern: Regex,
}

impl PIIRedactor {
    fn new() -> Self {
        Self {
            email_pattern: Regex::new(r"\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Za-z]{2,}\b").unwrap(),
            phone_pattern: Regex::new(r"\b\d{3}[-.]?\d{3}[-.]?\d{4}\b").unwrap(),
        }
    }
}

impl ItemProcessor<String, String> for PIIRedactor {
    fn process(&self, item: &String) -> ItemProcessorResult<String> {
        // Redact emails first, then phone numbers in the already-redacted text
        let without_emails = self.email_pattern.replace_all(item, "[EMAIL]");
        let redacted = self.phone_pattern.replace_all(&without_emails, "[PHONE]");
        Ok(Some(redacted.into_owned()))
    }
}
```
---
## Best Practices
<CardGrid>
<Card title="Keep It Simple" icon="star">
Processors should do one thing well. Chain multiple processors for complex transformations.
</Card>
<Card title="Stateless Design" icon="puzzle">
Prefer stateless processors for better parallelization and testing.
</Card>
<Card title="Immutability" icon="approve-check">
Work with references and return new instances rather than mutating input.
</Card>
<Card title="Error Context" icon="warning">
Provide detailed error messages including which field or validation failed.
</Card>
</CardGrid>
## Performance Tips
<Tabs>
<TabItem label="Memory">
- Avoid cloning large structures unnecessarily
- Use `Cow<str>` for strings that might not need copying
- Consider `Arc` for shared read-only data
</TabItem>
<TabItem label="Computation">
- Cache expensive computations (regex patterns, lookup tables)
- Use lazy initialization for resources
- Profile hot paths and optimize accordingly
</TabItem>
<TabItem label="I/O">
- Batch external API calls when possible
- Use connection pooling for database lookups
- Consider caching frequently accessed reference data
</TabItem>
</Tabs>
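
As an illustration of the `Cow<str>` tip, the hypothetical helper below allocates only when the input actually needs changing, and borrows it otherwise:

```rust
use std::borrow::Cow;

/// Hypothetical helper: replaces spaces with underscores, returning the
/// input borrowed and unchanged when it contains no spaces.
pub fn underscore_spaces(input: &str) -> Cow<'_, str> {
    if input.contains(' ') {
        Cow::Owned(input.replace(' ', "_")) // allocation only on this path
    } else {
        Cow::Borrowed(input)
    }
}
```

Since `process` returns an owned `O`, this is most useful for intermediate steps inside a processor: several such helpers can be applied in sequence, with a single `into_owned()` at the end.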
## See Also
- [ItemReader API](/spring-batch-rs/api/item-reader/) - Reading data sources
- [ItemWriter API](/spring-batch-rs/api/item-writer/) - Writing destinations
- [Error Handling Guide](/spring-batch-rs/error-handling/) - Fault tolerance patterns
- [Advanced Patterns](/spring-batch-rs/examples/advanced-patterns/) - Complex processor examples