---
title: Performance Tuning Guide
description: Optimize Spring Batch RS jobs for maximum throughput and efficiency
sidebar:
  order: 2
---

import { Card, CardGrid, Aside, Tabs, TabItem } from '@astrojs/starlight/components';

This guide covers performance optimization strategies for Spring Batch RS applications.

## Quick Wins

<CardGrid>
  <Card title="Chunk Size" icon="rocket">
    Increase chunk size to 500-1000 for large datasets
  </Card>
  <Card title="Connection Pooling" icon="star">
    Configure appropriate pool sizes for database operations
  </Card>
  <Card title="Buffering" icon="list">
    Use buffered I/O for file operations
  </Card>
  <Card title="Batch Operations" icon="setting">
    Leverage bulk inserts and batch APIs
  </Card>
</CardGrid>

---

## Chunk Size Optimization

### Understanding Chunk Size

Chunk size determines how many items are read and processed before the chunk is written and its transaction commits.

**Small Chunks (10-50):**
- Lower memory usage
- More frequent I/O operations
- Better for debugging
- More transaction overhead

**Large Chunks (500-1000):**
- Higher memory usage
- Fewer I/O operations
- Better throughput
- Fewer transaction commits

### Recommended Chunk Sizes by Use Case

| Use Case | Recommended Size | Reason |
|----------|-----------------|--------|
| File to File | 500-1000 | Minimize I/O overhead |
| File to Database | 500-1000 | Batch inserts are efficient |
| Database to File | 100-500 | Balance query overhead and memory |
| Database to Database | 500-1000 | Bulk operations on both sides |
| Complex Processing | 100-200 | Manage memory for transformations |
| Testing/Development | 10-50 | Easier debugging |

### Example Configuration

```rust
// Small chunks for development
let step = StepBuilder::new("dev-process")
    .chunk::<Input, Output>(10)  // Small for debugging
    .reader(&reader)
    .writer(&writer)
    .build();

// Large chunks for production
let step = StepBuilder::new("prod-process")
    .chunk::<Input, Output>(1000)  // Large for throughput
    .reader(&reader)
    .writer(&writer)
    .build();
```

---

## Database Performance

### Connection Pool Configuration

<Tabs>
  <TabItem label="PostgreSQL">
    ```rust
    use sqlx::postgres::PgPoolOptions;
    use std::time::Duration;

    let pool = PgPoolOptions::new()
        .max_connections(10)          // Pool size
        .min_connections(2)           // Minimum idle connections
        .acquire_timeout(Duration::from_secs(30))
        .idle_timeout(Duration::from_secs(600))
        .max_lifetime(Duration::from_secs(1800))
        .connect("postgres://...").await?;
    ```
  </TabItem>
  <TabItem label="MySQL">
    ```rust
    use sqlx::mysql::MySqlPoolOptions;

    let pool = MySqlPoolOptions::new()
        .max_connections(10)
        .min_connections(2)
        .connect("mysql://...").await?;
    ```
  </TabItem>
  <TabItem label="SQLite">
    ```rust
    use sqlx::sqlite::SqlitePoolOptions;

    let pool = SqlitePoolOptions::new()
        .max_connections(5)  // SQLite has lower concurrency
        .connect("sqlite://...").await?;
    ```
  </TabItem>
</Tabs>

**Pool Size Guidelines:**
- Single reader + single writer: 5-10 connections
- Multiple concurrent jobs: 10-20 connections
- High concurrency: 20-50 connections
- Don't exceed the database's max_connections limit (a sizing sketch follows below)
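
As a rough guard against exhausting the server, you can derive each job's pool size from the server limit. A minimal sketch, assuming illustrative values for the server limit, job count, and headroom:

```rust
use sqlx::postgres::PgPoolOptions;
use std::time::Duration;

// Illustrative assumptions: adjust to your server's configured limit and
// the number of batch jobs sharing this database
let server_max_connections: u32 = 100; // PostgreSQL max_connections
let concurrent_jobs: u32 = 4;          // jobs running against this database
let admin_headroom: u32 = 10;          // leave room for other clients

let per_job = (server_max_connections - admin_headroom) / concurrent_jobs;

let pool = PgPoolOptions::new()
    .max_connections(per_job.min(20)) // cap per the guidelines above
    .acquire_timeout(Duration::from_secs(30))
    .connect("postgres://...").await?;
```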

### Database Reader Optimization

```rust
// Use appropriate page size
let reader = RdbcItemReaderBuilder::<Record>::new()
    .postgres(pool)
    .query("SELECT * FROM large_table ORDER BY id")  // ORDER BY for consistency
    .with_page_size(500)  // Match or exceed chunk size
    .build_postgres();

// For very large datasets, narrow the scan with a WHERE clause
let reader = RdbcItemReaderBuilder::<Record>::new()
    .postgres(pool)
    .query("SELECT * FROM large_table WHERE created_at > '2024-01-01' ORDER BY id")
    .with_page_size(1000)
    .build_postgres();
```

### Database Writer Optimization

```rust
// Use bulk inserts with appropriate chunk size
let writer = PostgresItemWriterBuilder::new()
    .pool(pool)
    .table("target_table")
    .binder(|query, records: &[Record]| {  // the binder receives the whole chunk
        // Batch bind all records at once
        query.push_values(records, |mut b, record| {
            b.push_bind(&record.field1)
             .push_bind(&record.field2)
             .push_bind(&record.field3);
        });
    })
    .build();

let step = StepBuilder::new("bulk-insert")
    .chunk::<Record, Record>(1000)  // Large chunks for bulk inserts
    .reader(&reader)
    .writer(&writer)
    .build();
```

### Index Optimization

<Aside type="caution">
**Critical for Performance:**

1. **Add indexes** on columns in WHERE clauses
2. **Add indexes** on ORDER BY columns (for pagination)
3. **Consider composite indexes** for multi-column queries
4. **Monitor index usage** with EXPLAIN ANALYZE
5. **Drop indexes before bulk inserts**, rebuild after (a sketch follows the SQL example below)
</Aside>

```sql
-- Good: Index on filter and order columns
CREATE INDEX idx_records_created_id ON records(created_at, id);

-- Query uses index effectively
SELECT * FROM records WHERE created_at > '2024-01-01' ORDER BY id;
```
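
For bulk loads, point 5 of the aside can be scripted around the step itself. A hedged sketch using plain sqlx queries, with the index and table names from the SQL above and the `pool` from the writer example:

```rust
// Drop the index so the bulk insert doesn't pay index-maintenance costs
sqlx::query("DROP INDEX IF EXISTS idx_records_created_id")
    .execute(&pool)
    .await?;

// ... run the bulk-insert step here ...

// Rebuild the index once the load has finished
sqlx::query("CREATE INDEX idx_records_created_id ON records(created_at, id)")
    .execute(&pool)
    .await?;
```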

---

## File I/O Performance

### Buffered Reading

```rust
use std::io::BufReader;
use std::fs::File;

// Inefficient: No buffering
let file = File::open("large.csv")?;
let reader = CsvItemReaderBuilder::new()
    .from_reader(file);  // Direct file access

// Efficient: Buffered I/O
let file = File::open("large.csv")?;
let buffered = BufReader::with_capacity(64 * 1024, file);  // 64KB buffer
let reader = CsvItemReaderBuilder::new()
    .from_reader(buffered);
```

### Buffered Writing

```rust
use std::io::BufWriter;
use std::fs::File;

// Efficient: buffered writing (BufWriter flushes on drop, but an explicit
// flush surfaces I/O errors instead of silently discarding them)
let file = File::create("output.csv")?;
let buffered = BufWriter::with_capacity(64 * 1024, file);
let writer = CsvItemWriterBuilder::new()
    .from_writer(buffered);
```

### Memory-Mapped Files

For very large files, consider memory-mapped I/O:

```rust
use memmap2::Mmap;
use std::fs::File;

let file = File::open("huge.csv")?;
// Safety: the mapping is only sound if the file is not truncated or
// modified by another process while mapped
let mmap = unsafe { Mmap::map(&file)? };
let reader = CsvItemReaderBuilder::new()
    .from_reader(&mmap[..]);
```

---

## Memory Management

### Monitoring Memory Usage

```rust
// sysinfo 0.29-style API (the SystemExt trait was removed in sysinfo 0.30);
// used_memory() and total_memory() report bytes
use sysinfo::{System, SystemExt};

fn log_memory_usage() {
    let mut system = System::new_all();
    system.refresh_all();

    let used = system.used_memory();
    let total = system.total_memory();

    println!("Memory: {:.2} GB / {:.2} GB ({:.1}%)",
        used as f64 / 1_000_000_000.0,
        total as f64 / 1_000_000_000.0,
        (used as f64 / total as f64) * 100.0
    );
}
```
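
To sample memory while a job runs, call the helper from inside a processor. A sketch using a hypothetical pass-through wrapper (`MemorySampler` is illustrative, not a crate API):

```rust
use std::sync::atomic::{AtomicUsize, Ordering};

// Hypothetical pass-through processor that samples memory every 10,000 items
struct MemorySampler {
    seen: AtomicUsize,
}

impl ItemProcessor<Record, Record> for MemorySampler {
    fn process(&self, item: &Record) -> ItemProcessorResult<Record> {
        if self.seen.fetch_add(1, Ordering::Relaxed) % 10_000 == 0 {
            log_memory_usage(); // the helper defined above
        }
        Ok(item.clone())
    }
}
```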

### Memory-Efficient Patterns

**1. Clone Only When Necessary:**
```rust
// Inefficient: Unnecessary clone
impl ItemProcessor<Record, Record> for MyProcessor {
    fn process(&self, item: &Record) -> ItemProcessorResult<Record> {
        let mut cloned = item.clone();  // Expensive
        cloned.field = transform(cloned.field);
        Ok(cloned)
    }
}

// Better: transform from a borrow instead of consuming a full clone
impl ItemProcessor<Record, Record> for MyProcessor {
    fn process(&self, item: &Record) -> ItemProcessorResult<Record> {
        Ok(Record {
            field: transform(&item.field),
            ..item.clone()  // struct update still clones the remaining fields
        })
    }
}
```

**2. Use References for Large Strings:**
```rust
use std::borrow::Cow;

struct Processor;

impl ItemProcessor<String, String> for Processor {
    fn process(&self, item: &String) -> ItemProcessorResult<String> {
        // Use Cow to avoid unnecessary cloning
        let result: Cow<str> = if item.contains("special") {
            Cow::Owned(item.to_uppercase())
        } else {
            Cow::Borrowed(item)
        };

        Ok(result.into_owned())
    }
}
```

**3. Stream Processing:**
```rust
// Process items one at a time: minimal memory footprint, but each item
// commits separately, so reserve this for very large items
let step = StepBuilder::new("stream-process")
    .chunk::<Input, Output>(1)  // Process immediately
    .reader(&reader)
    .processor(&processor)
    .writer(&writer)
    .build();
```

---

## Parallel Processing Considerations

### Single-Threaded Model

Spring Batch RS uses a single-threaded model per step execution. For parallel processing, partition the work yourself:

**Option 1: Multiple Job Instances**
```rust
use std::thread;

fn main() -> Result<(), Box<dyn std::error::Error>> {
    let handles: Vec<_> = (0..4).map(|i| {
        thread::spawn(move || {
            process_partition(i)
        })
    }).collect();

    for handle in handles {
        handle.join().unwrap()?;
    }

    Ok(())
}

// The error type must be Send + Sync so results can cross the thread boundary
fn process_partition(partition_id: usize) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
    // Each thread processes its own partition of the input
    let reader = CsvItemReaderBuilder::new()
        .from_path(format!("data_part_{}.csv", partition_id))?;

    // ... process partition
    Ok(())
}
```

**Option 2: Async Steps with Tokio**
```rust
use tokio::task;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // process_partition is synchronous, so run it on Tokio's blocking pool
    let tasks = (0..4).map(|i| {
        task::spawn_blocking(move || process_partition(i))
    });

    // The outer `?` surfaces task panics (JoinError); the inner results
    // carry the per-partition job errors
    for result in futures::future::try_join_all(tasks).await? {
        result?;
    }

    Ok(())
}
```

---

## Monitoring and Profiling

### Basic Metrics

```rust
use std::time::Instant;

let start = Instant::now();

let mut execution = StepExecution::new("process");
step.execute(&mut execution)?;

let duration = start.elapsed();
let items = execution.read_count();
let throughput = items as f64 / duration.as_secs_f64();

println!("Processed {} items in {:?}", items, duration);
println!("Throughput: {:.2} items/sec", throughput);
println!("Write count: {}", execution.write_count());
println!("Skip count: {}", execution.skip_count());
```

### Profiling with Criterion

```toml
[dev-dependencies]
criterion = "0.5"
```

```rust
use criterion::{black_box, criterion_group, criterion_main, Criterion};

fn benchmark_processor(c: &mut Criterion) {
    let processor = MyProcessor::new();

    c.bench_function("process item", |b| {
        b.iter(|| {
            let item = black_box(create_test_item());
            processor.process(&item)
        })
    });
}

criterion_group!(benches, benchmark_processor);
criterion_main!(benches);
```

### Flamegraph Profiling

```bash
# Install cargo-flamegraph
cargo install flamegraph

# Run with profiling
cargo flamegraph --bin my-batch-job

# Open flamegraph.svg in browser
```

---

## Real-World Optimization Example

### Before Optimization

```rust
// Slow: Small chunks, no buffering, unnecessary clones
let reader = CsvItemReaderBuilder::new()
    .from_path("data.csv")?;  // No buffering

let processor = SlowProcessor;  // Clones everything

let writer = PostgresItemWriterBuilder::new()
    .pool(small_pool)  // 2 connections only
    .table("target")
    .binder(|query, record| { /* ... */ })
    .build();

let step = StepBuilder::new("slow")
    .chunk::<Record, Record>(10)  // Tiny chunks
    .reader(&reader)
    .processor(&processor)
    .writer(&writer)
    .build();

// Illustrative result: ~100 items/sec
```

### After Optimization

```rust
use std::fs::File;
use std::io::BufReader;
use sqlx::postgres::PgPoolOptions;

// Fast: Large chunks, buffered I/O, optimized pool
let file = File::open("data.csv")?;
let buffered = BufReader::with_capacity(64 * 1024, file);
let reader = CsvItemReaderBuilder::new()
    .from_reader(buffered);

let processor = FastProcessor;  // Minimal cloning

let pool = PgPoolOptions::new()
    .max_connections(20)  // Larger pool
    .connect("...").await?;

let writer = PostgresItemWriterBuilder::new()
    .pool(pool)
    .table("target")
    .binder(|query, records| {
        // Bulk insert all records
        query.push_values(records, |mut b, r| {
            b.push_bind(&r.field1)
             .push_bind(&r.field2);
        });
    })
    .build();

let step = StepBuilder::new("fast")
    .chunk::<Record, Record>(1000)  // Large chunks
    .reader(&reader)
    .processor(&processor)
    .writer(&writer)
    .build();

// Illustrative result: ~10,000 items/sec (a 100x improvement)
```

---

## Performance Checklist

<Aside type="tip">
**Before Production:**

- [ ] Chunk size >= 500 for large datasets
- [ ] Database queries have appropriate indexes
- [ ] Connection pool size matches workload
- [ ] File I/O uses buffering
- [ ] Processor minimizes cloning
- [ ] Writer uses bulk operations
- [ ] Memory usage monitored
- [ ] Benchmarks run on production-like data
- [ ] Profiling identifies bottlenecks
- [ ] Error handling doesn't kill performance (use skip_limit; see the sketch below)
</Aside>
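
For the checklist's last item, a fault-tolerance sketch in the builder style used above; it assumes `skip_limit` is available on the chunk builder, as in the crate's error-handling examples:

```rust
let step = StepBuilder::new("fault-tolerant")
    .chunk::<Input, Output>(500)
    .reader(&reader)
    .processor(&processor)
    .writer(&writer)
    .skip_limit(100)  // tolerate up to 100 bad items before failing the step
    .build();
```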

## Troubleshooting Slow Jobs

### Symptom: Low Throughput

**Check** (see the isolation sketch after this list):
1. Chunk size too small
2. Missing database indexes
3. Network latency (database/FTP)
4. Expensive processor operations
5. Small connection pool
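
To find which stage is slow, time each in isolation. A minimal sketch that drains the reader with no processor or writer attached, assuming `read()` returns `Ok(None)` at the end of the data as in the reader examples above:

```rust
use std::time::Instant;

let start = Instant::now();
let mut count = 0usize;

// Drain the reader alone to measure raw read throughput
while let Some(item) = reader.read()? {
    std::hint::black_box(item); // keep the compiler from eliding the read
    count += 1;
}

println!("Reader alone: {} items in {:?}", count, start.elapsed());
```

If the reader alone is fast, benchmark the processor with Criterion (see above), then test the writer with pre-built records.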

### Symptom: High Memory Usage

**Check:**
1. Chunk size too large
2. Processor accumulating state
3. Memory leaks in custom implementations
4. Large strings not using Cow
5. Unbounded collections

### Symptom: Slow Database Writes

**Check:**
1. Not using bulk inserts
2. Individual INSERT statements
3. Transaction overhead (too frequent commits)
4. Missing indexes on target table
5. Constraints/triggers slowing inserts

## See Also

- [Feature Flags](/spring-batch-rs/reference/features/) - Enable only what you need
- [Error Handling](/spring-batch-rs/error-handling/) - Fault tolerance patterns
- [Examples](/spring-batch-rs/examples/) - Optimized example code