---
title: Advanced Patterns
description: Examples of complex, multi-step jobs and advanced use cases.
sidebar:
  order: 7
---

import { Aside } from '@astrojs/starlight/components';

<Aside type="tip">
  View the complete source: [examples/advanced_patterns.rs](https://github.com/sboussekeyt/spring-batch-rs/blob/main/examples/advanced_patterns.rs)
</Aside>

This section covers more complex and realistic use cases that you might encounter when building batch applications. These examples demonstrate how to chain multiple steps together to create a complete data processing pipeline.

## Item Filtering with a Processor

A processor can silently discard items by returning `Ok(None)`. The item is not passed to the
writer and is counted in `StepExecution::filter_count` — separate from errors and skips.

### Three-State Return

```
Ok(Some(item))  →  item passed to writer  →  process_count++
Ok(None)        →  item discarded silently →  filter_count++
Err(BatchError) →  processing failure      →  process_error_count++  (skip_limit applies)
```

### Filtering vs Error Handling

| Situation | Use |
|---|---|
| Business rule: discard minors, inactive records, test data | `Ok(None)` |
| Unexpected invalid data that should be reported | `Err(BatchError)` + `skip_limit` |
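
To make the contrast concrete, here is a minimal sketch of a processor that applies both rows of the table: business-rule violations are filtered with `Ok(None)`, while malformed data is reported with `Err`. The `BatchError::ItemProcessor` variant and the step-level `skip_limit` builder method shown in the trailing comment are assumptions about the crate's API; verify them against the docs for your version.

```rust
use spring_batch_rs::core::item::{ItemProcessor, ItemProcessorResult};
use spring_batch_rs::BatchError;

struct Order {
    customer: String,
    amount: f64,
}

struct OrderProcessor;

impl ItemProcessor<Order, Order> for OrderProcessor {
    fn process(&self, item: &Order) -> ItemProcessorResult<Order> {
        if item.customer.starts_with("TEST-") {
            // Business rule: test accounts are discarded on purpose.
            return Ok(None); // filter_count++, not an error, no skip
        }
        if item.amount < 0.0 {
            // Unexpected bad data: surface it so skip handling applies.
            // (The BatchError::ItemProcessor variant is an assumption.)
            return Err(BatchError::ItemProcessor(format!(
                "negative amount for {}",
                item.customer
            ))); // process_error_count++, counts against skip_limit
        }
        Ok(Some(Order {
            customer: item.customer.clone(),
            amount: item.amount,
        }))
    }
}

// On the step, tolerate a bounded number of errors (assumed builder method):
// let processor = OrderProcessor;
// let step = StepBuilder::new("orders")
//     .chunk::<Order, Order>(10)
//     .reader(&reader)
//     .processor(&processor)
//     .writer(&writer)
//     .skip_limit(5) // abort the step once more than 5 items error out
//     .build();
```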

### Example: Keep Only Adults

```rust
use spring_batch_rs::{
    core::{
        item::{ItemProcessor, ItemProcessorResult},
        job::{Job, JobBuilder},
        step::StepBuilder,
    },
    item::{csv::csv_reader::CsvItemReaderBuilder, json::json_writer::JsonItemWriterBuilder},
    BatchError,
};
use serde::{Deserialize, Serialize};
use std::env::temp_dir;

#[derive(Debug, Deserialize, Serialize, Clone)]
struct Person {
    name: String,
    age: u32,
}

#[derive(Default)]
struct AdultFilter;

impl ItemProcessor<Person, Person> for AdultFilter {
    fn process(&self, item: &Person) -> ItemProcessorResult<Person> {
        if item.age >= 18 {
            Ok(Some(item.clone())) // keep
        } else {
            Ok(None) // discard — increments filter_count, not an error
        }
    }
}

fn main() -> Result<(), BatchError> {
    let csv_data = "name,age\nAlice,30\nBob,16\nCharlie,25\nDiana,15\nEve,42\nFrank,17\n";

    let reader = CsvItemReaderBuilder::<Person>::new()
        .has_headers(true)
        .from_reader(csv_data.as_bytes());

    let writer = JsonItemWriterBuilder::<Person>::new()
        .from_path(temp_dir().join("adults.json"));

    let processor = AdultFilter::default();

    let step = StepBuilder::new("filter-adults")
        .chunk::<Person, Person>(10)
        .reader(&reader)
        .processor(&processor)
        .writer(&writer)
        .build();

    let job = JobBuilder::new().start(&step).build();
    job.run()?;

    let exec = job.get_step_execution("filter-adults").unwrap();
    println!("Read:     {}", exec.read_count);    // 6
    println!("Filtered: {}", exec.filter_count);  // 3 (minors discarded)
    println!("Written:  {}", exec.write_count);   // 3 (adults kept)

    Ok(())
}
```

```bash
cargo run --example filter_records_from_csv_with_processor --features csv,json
```

<Aside type="tip">
  `Ok(None)` does **not** trigger `skip_limit`. Use it for intentional business filtering, not for
  error recovery.
</Aside>

---

## Chaining Item Processors

<Aside type="tip">
  View the complete source: [examples/chaining_processors.rs](https://github.com/sboussekeyt/spring-batch-rs/blob/main/examples/chaining_processors.rs)
</Aside>

`CompositeItemProcessorBuilder` lets you chain multiple processors into a single pipeline.
The output of each processor becomes the input of the next. If any processor returns
`Ok(None)`, the chain stops immediately and the item is filtered — subsequent processors
are never called.

### How It Works

```
item → p1 → intermediate₁ → p2 → intermediate₂ → ... → pN → output
         ↓ Ok(None)?
        stop — item filtered
```

### Chaining Processors with Different Types

```rust
use spring_batch_rs::core::item::{
    CompositeItemProcessorBuilder, ItemProcessor, ItemProcessorResult,
};

struct RawOrder { id: String, customer: String, amount: String }

#[derive(Clone)]
struct ParsedOrder { id: u32, customer: String, amount: f64 }

struct EnrichedOrder { id: u32, customer: String, amount: f64, tax: f64, total: f64 }

// Step 1: parse raw strings into typed values
struct ParseProcessor;
impl ItemProcessor<RawOrder, ParsedOrder> for ParseProcessor {
    fn process(&self, item: &RawOrder) -> ItemProcessorResult<ParsedOrder> {
        let id = item.id.trim().parse().ok();
        let amount = item.amount.trim().parse().ok();
        match (id, amount) {
            (Some(id), Some(amount)) => Ok(Some(ParsedOrder {
                id, amount, customer: item.customer.clone(),
            })),
            _ => Ok(None), // unparseable — filter silently
        }
    }
}

// Step 2: filter orders below threshold
struct ValidateProcessor { min_amount: f64 }
impl ItemProcessor<ParsedOrder, ParsedOrder> for ValidateProcessor {
    fn process(&self, item: &ParsedOrder) -> ItemProcessorResult<ParsedOrder> {
        if item.amount < self.min_amount { Ok(None) } else { Ok(Some(item.clone())) }
    }
}

// Step 3: add tax and total
struct EnrichProcessor;
impl ItemProcessor<ParsedOrder, EnrichedOrder> for EnrichProcessor {
    fn process(&self, item: &ParsedOrder) -> ItemProcessorResult<EnrichedOrder> {
        let tax = (item.amount * 0.20 * 100.0).round() / 100.0;
        Ok(Some(EnrichedOrder {
            id: item.id, customer: item.customer.clone(),
            amount: item.amount, tax, total: item.amount + tax,
        }))
    }
}

// Wire together: RawOrder → ParsedOrder → ParsedOrder → EnrichedOrder
let composite = CompositeItemProcessorBuilder::new(ParseProcessor)
    .link(ValidateProcessor { min_amount: 10.0 })
    .link(EnrichProcessor)
    .build();

// Use with a step builder (assumes `reader` and `writer` are defined elsewhere)
let step = StepBuilder::new("enrich-orders")
    .chunk::<RawOrder, EnrichedOrder>(100)
    .reader(&reader)
    .processor(&composite)
    .writer(&writer)
    .build();
```

```bash
cargo run --example chaining_processors --features csv,json
```

### Filter Propagation in a Chain

| Scenario | Result |
|---|---|
| Processor N returns `Ok(Some(item))` | Item forwarded to processor N+1 |
| Processor N returns `Ok(None)` | Chain stops — `filter_count++`, no write |
| Processor N returns `Err(e)` | Chain stops — `process_error_count++`, skip logic applies |

<Aside type="tip">
  Chains of any length are supported. Each `.link()` call changes the output type, so
  type mismatches are caught at compile time.
</Aside>
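
To see that guarantee in action, re-ordering the links from the example above produces a type error at compile time rather than a corrupted pipeline at runtime. This hypothetical mis-wiring is shown commented out, since it deliberately does not compile:

```rust
// EnrichProcessor outputs EnrichedOrder, but ValidateProcessor expects
// ParsedOrder, so swapping the last two links fails to type-check:
//
// let broken = CompositeItemProcessorBuilder::new(ParseProcessor)
//     .link(EnrichProcessor)                        // output: EnrichedOrder
//     .link(ValidateProcessor { min_amount: 10.0 }) // expects: ParsedOrder -> compile error
//     .build();
```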

---

## Chaining Item Writers (Fan-out)

<Aside type="tip">
  View the complete source: [examples/chaining_writers.rs](https://github.com/sboussekeyt/spring-batch-rs/blob/main/examples/chaining_writers.rs)
</Aside>

`CompositeItemWriterBuilder` lets you send the same chunk of items to multiple writers.
Writers are called sequentially, in the order they were added; if any writer fails, the
chain short-circuits and returns the error.

### How It Works

```
chunk → w1 → w2 → ... → wN   (all receive the same slice)
          ↓ Err?
         stop — error propagated
```

### Fan-out to Logger and JSON File

```rust
use spring_batch_rs::core::item::{CompositeItemWriterBuilder, PassThroughProcessor};
use spring_batch_rs::item::{
    json::json_writer::JsonItemWriterBuilder,
    logger::LoggerWriterBuilder,
};
use serde::{Deserialize, Serialize};
use std::env::temp_dir;

#[derive(Debug, Deserialize, Serialize, Clone)]
struct Product { id: u32, name: String, price: f64 }

let json_writer = JsonItemWriterBuilder::<Product>::new()
    .from_path(temp_dir().join("products.json"));
let logger_writer = LoggerWriterBuilder::<Product>::new().build();

// Both writers receive identical item slices on every chunk.
let composite = CompositeItemWriterBuilder::new(logger_writer)
    .add(json_writer)
    .build();

// Pass items through unchanged (assumes `reader` is defined elsewhere).
let processor = PassThroughProcessor::<Product>::new();

let step = StepBuilder::new("fan-out-products")
    .chunk::<Product, Product>(10)
    .reader(&reader)
    .processor(&processor)
    .writer(&composite)
    .build();
```

```bash
cargo run --example chaining_writers --features csv,json,logger
```

### Error Behaviour

| Scenario | Result |
|---|---|
| All writers succeed | `Ok(())` |
| Writer N returns `Err(e)` | Chain stops — error propagated, writers N+1…M not called |

<Aside type="tip">
  Chains of any length are supported. Each `.add()` call wraps the chain in a new
  `CompositeItemWriter`, encoding the full structure in the type at zero runtime cost.
  Use `Box<dyn ItemWriter<T>>` when you need dynamic dispatch instead.
</Aside>
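
If the set of writers is only known at runtime, a hand-rolled fan-out over boxed trait objects is one option. The sketch below assumes the `ItemWriter` trait's required method is `write(&self, items: &[T]) -> Result<(), BatchError>` and that its remaining methods have default implementations; verify both against the crate docs before relying on it.

```rust
use spring_batch_rs::{core::item::ItemWriter, BatchError};

// Fan-out over a runtime-chosen list of writers. Like the composite builder,
// it calls each writer in order and short-circuits on the first error.
struct DynFanOutWriter<T> {
    writers: Vec<Box<dyn ItemWriter<T>>>,
}

impl<T> ItemWriter<T> for DynFanOutWriter<T> {
    fn write(&self, items: &[T]) -> Result<(), BatchError> {
        for writer in &self.writers {
            writer.write(items)?; // an Err here propagates; later writers are skipped
        }
        Ok(())
    }
    // If the trait has other required methods (e.g. a flush), delegate
    // them to each inner writer the same way.
}
```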

---

## Multi-Step ETL Job

This example demonstrates a complete Extract, Transform, Load (ETL) job with two distinct steps. This is a common pattern for processing data from one source, transforming it, and loading it into another destination.

The job will perform the following actions:

1.  **Step 1: Extract and Filter**
    - Reads records from a source CSV file.
    - Filters out any records belonging to the "Test" category.
    - Writes the resulting records to an intermediate JSON file.

2.  **Step 2: Transform and Load**
    - Reads records from the intermediate JSON file.
    - Transforms the data into a final `OutputRecord` format.
    - "Loads" the data by logging it to the console.

This showcases how to use different readers, writers, and processors across multiple, sequential steps within a single `Job`.

```rust
use anyhow::Result;
use serde::{Deserialize, Serialize};
use spring_batch_rs::{
    core::{
        item::{ItemProcessor, ItemProcessorResult},
        job::{Job, JobBuilder},
        step::StepBuilder,
    },
    item::{
        csv::csv_reader::CsvItemReaderBuilder,
        json::{json_reader::JsonItemReaderBuilder, json_writer::JsonItemWriterBuilder},
        logger::LoggerItemWriter,
    },
};
use std::env::temp_dir;
use std::fs::File;
use std::io::Write;

// --- Data Structures ---
// Input record from CSV.
#[derive(Deserialize, Clone, Debug)]
struct InputRecord {
    id: u32,
    name: String,
    category: String,
    value: f64,
}

// Intermediate record, stored in JSON. `Serialize` is needed for the writer
// in step 1, `Deserialize` for the reader in step 2.
#[derive(Deserialize, Serialize, Clone, Debug)]
struct IntermediateRecord {
    id: u32,
    name: String,
    value: f64,
}

// Final record, after transformation.
#[derive(Serialize, Clone, Debug)]
struct OutputRecord {
    record_id: String,
    description: String,
    processed_value: f64,
}

// --- Step 1: CSV to JSON with Filtering ---

// A custom processor that filters out records from the "Test" category.
struct FilterProcessor;
impl ItemProcessor<InputRecord, IntermediateRecord> for FilterProcessor {
    fn process(&self, item: &InputRecord) -> ItemProcessorResult<IntermediateRecord> {
        if item.category == "Test" {
            println!("Filtering out test record: {{}}", item.name);
            return Ok(None); // Returning None filters the item.
        }

        Ok(Some(IntermediateRecord {
            id: item.id,
            name: item.name.clone(),
            value: item.value,
        }))
    }
}

// --- Step 2: JSON to Console Log with Transformation ---

// A custom processor that transforms an IntermediateRecord into an OutputRecord.
struct TransformProcessor;
impl ItemProcessor<IntermediateRecord, OutputRecord> for TransformProcessor {
    fn process(&self, item: &IntermediateRecord) -> ItemProcessorResult<OutputRecord> {
        Ok(Some(OutputRecord {
            record_id: format!("REC-{{}}", item.id),
            description: format!("{{}} (Value: {{}})", item.name, item.value),
            processed_value: item.value * 1.1, // Apply a 10% markup.
        }))
    }
}

fn main() -> Result<()> {
    println!("--- Starting Multi-Step ETL Job Example ---");

    // --- Create dummy CSV file for input ---
    let csv_path = temp_dir().join("input_data.csv");
    let mut file = File::create(&csv_path)?;
    file.write_all(b"id,name,category,value\n")?;
    file.write_all(b"1,Product A,Live,100.0\n")?;
    file.write_all(b"2,Product B,Test,200.0\n")?;
    file.write_all(b"3,Product C,Live,300.0\n")?;
    println!("Created dummy input file: {{:?}}", csv_path);

    // Define path for the intermediate JSON file.
    let intermediate_json_path = temp_dir().join("intermediate_data.json");

    // --- 1. Configure and build Step 1 ---
    println!("\n--- Configuring Step 1: CSV to JSON ---");
    let csv_reader = CsvItemReaderBuilder::<InputRecord>::new()
        .has_headers(true)
        .from_path(&csv_path);

    let json_writer =
        JsonItemWriterBuilder::<IntermediateRecord>::new().from_path(&intermediate_json_path);

    let filter_processor = FilterProcessor;

    let step1 = StepBuilder::new("csv_to_json_filter_step")
        .chunk::<InputRecord, IntermediateRecord>(10)
        .reader(&csv_reader)
        .processor(&filter_processor)
        .writer(&json_writer)
        .build();

    // --- 2. Configure and build Step 2 ---
    println!("\n--- Configuring Step 2: JSON to Console ---");
    let json_reader = JsonItemReaderBuilder::<IntermediateRecord>::new()
        .from_path(&intermediate_json_path);

    let transform_processor = TransformProcessor;

    // This writer logs the final records to the console.
    let log_writer = LoggerItemWriter::new();

    let step2 = StepBuilder::new("json_to_log_transform_step")
        .chunk::<IntermediateRecord, OutputRecord>(10)
        .reader(&json_reader)
        .processor(&transform_processor)
        .writer(&log_writer)
        .build();

    // --- 3. Configure and Run the Job ---
    println!("\n--- Configuring and Running the Job ---");
    let job = JobBuilder::new()
        .start(&step1) // Start with step 1
        .next(&step2)  // Proceed to step 2 after step 1 succeeds
        .build();

    let job_execution = job.run();

    // --- 4. Verify results ---
    assert!(job_execution.is_ok());
    let step1_execution = job.get_step_execution("csv_to_json_filter_step").unwrap();
    assert_eq!(step1_execution.read_count, 3);
    assert_eq!(step1_execution.write_count, 2); // 1 record was filtered.

    let step2_execution = job.get_step_execution("json_to_log_transform_step").unwrap();
    assert_eq!(step2_execution.read_count, 2);
    assert_eq!(step2_execution.write_count, 2);

    println!("\n--- Multi-Step ETL Job Finished Successfully! ---");
    println!("Intermediate file is at: {{:?}}", intermediate_json_path);

    Ok(())
}
```
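
Assuming this listing corresponds to the linked `examples/advanced_patterns.rs` and uses the same feature-flag pattern as the earlier snippets, it can be run with:

```bash
cargo run --example advanced_patterns --features csv,json,logger
```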