---
title: Tasklet Examples
description: Complete examples for ZIP compression, FTP transfers, and S3 file transfers with Spring Batch RS
sidebar:
  order: 6
---
import { Tabs, TabItem, Card, CardGrid, Aside } from '@astrojs/starlight/components';
<Aside type="tip">
View the complete sources: [examples/tasklet_zip.rs](https://github.com/sboussekeyt/spring-batch-rs/blob/main/examples/tasklet_zip.rs) · [examples/tasklet_ftp.rs](https://github.com/sboussekeyt/spring-batch-rs/blob/main/examples/tasklet_ftp.rs) · [examples/tasklet_s3.rs](https://github.com/sboussekeyt/spring-batch-rs/blob/main/examples/tasklet_s3.rs)
</Aside>
This page collects complete examples of tasklets in Spring Batch RS, covering file operations and other single-task steps.
## Setup
Add tasklet features to your `Cargo.toml`:
<Tabs>
<TabItem label="ZIP Tasklet">
```toml
[dependencies]
spring-batch-rs = { version = "0.1", features = ["zip"] }
```
</TabItem>
<TabItem label="FTP Tasklet">
```toml
[dependencies]
spring-batch-rs = { version = "0.1", features = ["ftp"] }
```
</TabItem>
<TabItem label="S3 Tasklet">
```toml
[dependencies]
spring-batch-rs = { version = "0.1", features = ["s3"] }
```
</TabItem>
<TabItem label="All Tasklets">
```toml
[dependencies]
spring-batch-rs = { version = "0.1", features = ["zip", "ftp", "s3"] }
```
</TabItem>
</Tabs>
---
## ZIP Compression Examples
### Basic Directory Compression
```rust
use spring_batch_rs::{
    core::{job::JobBuilder, step::StepBuilder},
    tasklet::zip::ZipTaskletBuilder,
};

fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Compress the whole /data/exports directory into a single archive.
    let zip_tasklet = ZipTaskletBuilder::new()
        .source_path("/data/exports")
        .target_path("/backups/exports.zip")
        .build()?;

    let step = StepBuilder::new("compress-exports")
        .tasklet(&zip_tasklet)
        .build();

    let job = JobBuilder::new()
        .start(&step)
        .build();

    job.run()?;
    println!("Compression completed!");
    Ok(())
}
```
### Compress with Filtering
```rust
let zip_tasklet = ZipTaskletBuilder::new()
    .source_path("/logs")
    .target_path("/archives/logs.zip")
    .include_pattern("*.log") // Only .log files
    .exclude_pattern("*.tmp") // Exclude .tmp files
    .compression_level(9)     // Maximum compression
    .build()?;
```
### Flatten Directory Structure
```rust
let zip_tasklet = ZipTaskletBuilder::new()
    .source_path("/data/nested")
    .target_path("/flat-archive.zip")
    .preserve_structure(false) // All files in ZIP root
    .compression_level(3)      // Fast compression
    .build()?;
```
### Single File Compression
```rust
let zip_tasklet = ZipTaskletBuilder::new()
    .source_path("/reports/monthly_report.pdf")
    .target_path("/compressed/monthly_report.zip")
    .compression_level(9)
    .build()?;
```
---
## FTP Transfer Examples
### Upload File to FTP
```rust
use spring_batch_rs::{
    core::{job::JobBuilder, step::StepBuilder},
    tasklet::ftp::{FtpOperation, FtpTaskletBuilder},
};

fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Upload a local CSV file to the FTP server.
    let ftp_tasklet = FtpTaskletBuilder::new()
        .host("ftp.example.com")
        .port(21)
        .username("user")
        .password("password")
        .local_path("/data/export.csv")
        .remote_path("/uploads/export.csv")
        .operation(FtpOperation::Upload)
        .build()?;

    let step = StepBuilder::new("upload-to-ftp")
        .tasklet(&ftp_tasklet)
        .build();

    let job = JobBuilder::new()
        .start(&step)
        .build();

    job.run()?;
    Ok(())
}
```
### Download File from FTP
```rust
let ftp_tasklet = FtpTaskletBuilder::new()
    .host("ftp.example.com")
    .username("user")
    .password("password")
    .remote_path("/exports/data.csv")
    .local_path("/downloads/data.csv")
    .operation(FtpOperation::Download)
    .build()?;
```
### Secure FTP (FTPS)
```rust
let ftp_tasklet = FtpTaskletBuilder::new()
    .host("ftps.example.com")
    .port(990)     // FTPS implicit port
    .use_tls(true) // Enable TLS encryption
    .username("user")
    .password("password")
    .local_path("/secure/data.xml")
    .remote_path("/secure/data.xml")
    .operation(FtpOperation::Upload)
    .build()?;
```
---
## Multi-Step Job with Tasklets
### Complete ETL Pipeline
```rust
use spring_batch_rs::{
    core::{job::JobBuilder, step::StepBuilder},
    item::{csv::CsvItemReaderBuilder, json::JsonItemWriterBuilder},
    tasklet::{
        ftp::{FtpOperation, FtpTaskletBuilder},
        zip::ZipTaskletBuilder,
    },
};

// `Record` stands for your own record type; its definition is omitted here.

fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Step 1: Download file from FTP
    let download_tasklet = FtpTaskletBuilder::new()
        .host("ftp.example.com")
        .username("user")
        .password("password")
        .remote_path("/data/input.csv")
        .local_path("/temp/input.csv")
        .operation(FtpOperation::Download)
        .build()?;

    let download_step = StepBuilder::new("download")
        .tasklet(&download_tasklet)
        .build();

    // Step 2: Process the file
    let reader = CsvItemReaderBuilder::new()
        .from_path("/temp/input.csv")?;

    let writer = JsonItemWriterBuilder::<Record>::new()
        .pretty_formatter(true)
        .from_path("/temp/output.json")?;

    let process_step = StepBuilder::new("process")
        .chunk(100)
        .reader(&reader)
        .writer(&writer)
        .build();

    // Step 3: Compress output
    let zip_tasklet = ZipTaskletBuilder::new()
        .source_path("/temp/output.json")
        .target_path("/output/data.zip")
        .compression_level(9)
        .build()?;

    let compress_step = StepBuilder::new("compress")
        .tasklet(&zip_tasklet)
        .build();

    // Step 4: Upload result
    let upload_tasklet = FtpTaskletBuilder::new()
        .host("ftp.example.com")
        .username("user")
        .password("password")
        .local_path("/output/data.zip")
        .remote_path("/results/data.zip")
        .operation(FtpOperation::Upload)
        .build()?;

    let upload_step = StepBuilder::new("upload")
        .tasklet(&upload_tasklet)
        .build();

    // Create job
    let job = JobBuilder::new()
        .start(&download_step)
        .next(&process_step)
        .next(&compress_step)
        .next(&upload_step)
        .build();

    job.run()?;
    println!("Pipeline completed successfully!");
    Ok(())
}
```
---
## S3 Transfer Examples
### Upload File to S3
```rust
use spring_batch_rs::{
    core::{job::JobBuilder, step::StepBuilder},
    tasklet::s3::put::S3PutTaskletBuilder,
};

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let put_tasklet = S3PutTaskletBuilder::new()
        .bucket("my-batch-bucket")
        .key("exports/data.csv")
        .local_file("./output/data.csv")
        .region("eu-west-1")
        .build()?;

    let step = StepBuilder::new("s3-upload")
        .tasklet(&put_tasklet)
        .build();

    let job = JobBuilder::new().start(&step).build();
    job.run()?;
    println!("Upload completed!");
    Ok(())
}
```
### Download File from S3
```rust
use spring_batch_rs::tasklet::s3::get::S3GetTaskletBuilder;

let get_tasklet = S3GetTaskletBuilder::new()
    .bucket("my-batch-bucket")
    .key("imports/data.csv")
    .local_file("./input/data.csv")
    .region("eu-west-1")
    .build()?;
```
### S3-Compatible Services (MinIO / LocalStack)
```rust
let put_tasklet = S3PutTaskletBuilder::new()
    .bucket("my-bucket")
    .key("file.csv")
    .local_file("./output/file.csv")
    .endpoint_url("http://localhost:9000") // MinIO
    .access_key_id("minioadmin")
    .secret_access_key("minioadmin")
    .build()?;
```
<Aside type="tip">
Use `endpoint_url` to connect to LocalStack (`http://localhost:4566`) or MinIO (`http://localhost:9000`) during development. Remove it in production to use the real AWS endpoint.
</Aside>
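One way to follow this advice without editing code between environments is to read the endpoint from configuration and set it only when a value is present. The sketch below covers just that decision and uses only the standard library; `resolve_endpoint` and the `S3_ENDPOINT_URL` variable name mentioned afterwards are assumptions for illustration, not part of Spring Batch RS.

```rust
/// Hypothetical helper: returns the custom endpoint to use, if any.
/// A missing or blank value means "use the default AWS endpoint".
fn resolve_endpoint(raw: Option<String>) -> Option<String> {
    raw.map(|value| value.trim().to_string())
        .filter(|value| !value.is_empty())
}
```

A caller might pass `std::env::var("S3_ENDPOINT_URL").ok()` and call `endpoint_url` only when the result is `Some`. Whether the builder can be extended conditionally like this depends on its method signatures, which this page does not show.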
### Upload Then Download Pipeline
```rust
use spring_batch_rs::{
    core::{job::JobBuilder, step::StepBuilder},
    tasklet::s3::{get::S3GetTaskletBuilder, put::S3PutTaskletBuilder},
};
use std::env::temp_dir;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let upload_path = temp_dir().join("sample.csv");
    let download_path = temp_dir().join("downloaded.csv");

    // Step 1: Upload
    let put_tasklet = S3PutTaskletBuilder::new()
        .bucket("my-batch-bucket")
        .key("exports/sample.csv")
        .local_file(&upload_path)
        .endpoint_url("http://localhost:4566") // LocalStack
        .access_key_id("test")
        .secret_access_key("test")
        .region("us-east-1")
        .build()?;
    let upload_step = StepBuilder::new("s3-upload").tasklet(&put_tasklet).build();

    // Step 2: Download
    let get_tasklet = S3GetTaskletBuilder::new()
        .bucket("my-batch-bucket")
        .key("exports/sample.csv")
        .local_file(&download_path)
        .endpoint_url("http://localhost:4566")
        .access_key_id("test")
        .secret_access_key("test")
        .region("us-east-1")
        .build()?;
    let download_step = StepBuilder::new("s3-download").tasklet(&get_tasklet).build();

    // Run job
    let job = JobBuilder::new()
        .start(&upload_step)
        .next(&download_step)
        .build();

    let result = job.run()?;
    println!("Job status: completed, duration: {:?}", result.duration);
    Ok(())
}
```
Run this example with:
```bash
# Start LocalStack first
docker run --rm -p 4566:4566 localstack/localstack
aws --endpoint-url=http://localhost:4566 s3 mb s3://my-batch-bucket
# Run example
cargo run --example tasklet_s3 --features s3
```
---
## Custom Tasklet Examples
### Database Cleanup Tasklet
```rust
use spring_batch_rs::core::step::{RepeatStatus, StepBuilder, StepExecution, Tasklet};
use spring_batch_rs::error::BatchError;
use sqlx::PgPool;

struct DatabaseCleanupTasklet {
    pool: PgPool,
    days_to_keep: i32,
}

impl Tasklet for DatabaseCleanupTasklet {
    fn execute(&self, _step_execution: &StepExecution) -> Result<RepeatStatus, BatchError> {
        // `execute` is synchronous, so drive the async query on a dedicated runtime.
        // Caveat: `block_on` panics on a thread that is already driving a Tokio
        // runtime, so run this step from a blocking context.
        let runtime = tokio::runtime::Runtime::new()
            .map_err(|e| BatchError::Tasklet(e.to_string()))?;

        runtime.block_on(async {
            // Bind the retention period instead of formatting it into the SQL text.
            sqlx::query("DELETE FROM logs WHERE created_at < NOW() - make_interval(days => $1)")
                .bind(self.days_to_keep)
                .execute(&self.pool)
                .await
                .map_err(|e| BatchError::Tasklet(format!("Cleanup failed: {}", e)))?;

            Ok(RepeatStatus::Finished)
        })
    }
}

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let pool = PgPool::connect("postgres://user:pass@localhost/db").await?;

    let cleanup_tasklet = DatabaseCleanupTasklet {
        pool,
        days_to_keep: 30,
    };

    // Add `step` to a job with `JobBuilder`, as in the examples above.
    let step = StepBuilder::new("cleanup")
        .tasklet(&cleanup_tasklet)
        .build();

    Ok(())
}
```
### File Deletion Tasklet
```rust
use std::fs;

struct FileCleanupTasklet {
    directory: String,
    pattern: String,
}

impl Tasklet for FileCleanupTasklet {
    fn execute(&self, _step_execution: &StepExecution) -> Result<RepeatStatus, BatchError> {
        let entries = fs::read_dir(&self.directory)
            .map_err(|e| BatchError::Tasklet(format!("Cannot read directory: {}", e)))?;

        let mut deleted = 0;
        for entry in entries {
            let entry = entry.map_err(|e| BatchError::Tasklet(e.to_string()))?;
            let path = entry.path();

            // Skip subdirectories: `remove_file` fails on them.
            if !path.is_file() {
                continue;
            }

            // `pattern` is a plain substring match on the file name, not a glob.
            if let Some(name) = path.file_name().and_then(|n| n.to_str()) {
                if name.contains(&self.pattern) {
                    fs::remove_file(&path)
                        .map_err(|e| BatchError::Tasklet(e.to_string()))?;
                    deleted += 1;
                }
            }
        }

        println!("Deleted {} files matching '{}'", deleted, self.pattern);
        Ok(RepeatStatus::Finished)
    }
}
```
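Because the tasklet above matches with `contains`, a pattern such as `.old` also matches a name like `report.older.csv`. If only a specific extension should be deleted, a stricter check can be swapped in. The helper below is a minimal std-only sketch; `has_extension` is a hypothetical name, not something the crate provides.

```rust
use std::path::Path;

/// Hypothetical helper: true only when the file's extension equals `ext`
/// (given without the leading dot), compared case-insensitively.
fn has_extension(path: &Path, ext: &str) -> bool {
    path.extension()
        .and_then(|e| e.to_str())
        .map_or(false, |e| e.eq_ignore_ascii_case(ext))
}
```

Inside the loop, `has_extension(&path, "old")` could replace the `name.contains(...)` test when extension matching is what you want.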
### HTTP Notification Tasklet
```rust
struct WebhookTasklet {
    url: String,
    message: String,
}

impl Tasklet for WebhookTasklet {
    fn execute(&self, step_execution: &StepExecution) -> Result<RepeatStatus, BatchError> {
        let client = reqwest::blocking::Client::new();

        let payload = serde_json::json!({
            "step": step_execution.step_name(),
            "message": self.message,
            "timestamp": chrono::Utc::now().to_rfc3339()
        });

        let response = client
            .post(&self.url)
            .json(&payload)
            .send()
            .map_err(|e| BatchError::Tasklet(format!("HTTP request failed: {}", e)))?;

        if !response.status().is_success() {
            return Err(BatchError::Tasklet(
                format!("API returned error: {}", response.status())
            ));
        }

        Ok(RepeatStatus::Finished)
    }
}
```
---
## Real-World Example: Daily Report Pipeline
```rust
fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Step 1: Cleanup old reports
    let cleanup_tasklet = FileCleanupTasklet {
        directory: "/reports/daily".to_string(),
        pattern: ".old".to_string(),
    };
    let cleanup_step = StepBuilder::new("cleanup-old-reports")
        .tasklet(&cleanup_tasklet)
        .build();

    // Step 2: Generate report (chunk-oriented step)
    let reader = /* database reader */;
    let writer = CsvItemWriterBuilder::new()
        .from_path("/reports/daily/report.csv")?;
    let generate_step = StepBuilder::new("generate-report")
        .chunk(1000)
        .reader(&reader)
        .writer(&writer)
        .build();

    // Step 3: Compress report
    let zip_tasklet = ZipTaskletBuilder::new()
        .source_path("/reports/daily/report.csv")
        .target_path("/reports/daily/report.zip")
        .compression_level(9)
        .build()?;
    let compress_step = StepBuilder::new("compress-report")
        .tasklet(&zip_tasklet)
        .build();

    // Step 4: Upload to FTP
    let upload_tasklet = FtpTaskletBuilder::new()
        .host("ftp.reports.com")
        .username("reports_user")
        .password("secret")
        .local_path("/reports/daily/report.zip")
        .remote_path("/daily_reports/report.zip")
        .operation(FtpOperation::Upload)
        .build()?;
    let upload_step = StepBuilder::new("upload-report")
        .tasklet(&upload_tasklet)
        .build();

    // Step 5: Send notification
    let notify_tasklet = WebhookTasklet {
        url: "https://api.company.com/notifications".to_string(),
        message: "Daily report generated and uploaded".to_string(),
    };
    let notify_step = StepBuilder::new("notify")
        .tasklet(&notify_tasklet)
        .build();

    // Create and run job
    let job = JobBuilder::new()
        .start(&cleanup_step)
        .next(&generate_step)
        .next(&compress_step)
        .next(&upload_step)
        .next(&notify_step)
        .build();

    job.run()?;
    Ok(())
}
```
---
## Repeatable Tasklets
### Polling Tasklet
```rust
use std::sync::Mutex;

struct PollingTasklet {
    url: String,
    attempts: Mutex<u32>,
    max_attempts: u32,
}

impl Tasklet for PollingTasklet {
    fn execute(&self, _step_execution: &StepExecution) -> Result<RepeatStatus, BatchError> {
        let mut attempts = self.attempts.lock().unwrap();
        *attempts += 1;

        println!("Polling attempt {}/{}", *attempts, self.max_attempts);

        // Check condition
        let client = reqwest::blocking::Client::new();
        let response = client.get(&self.url)
            .send()
            .map_err(|e| BatchError::Tasklet(e.to_string()))?;

        if response.status().is_success() {
            println!("Condition met!");
            return Ok(RepeatStatus::Finished);
        }

        if *attempts >= self.max_attempts {
            return Err(BatchError::Tasklet(
                "Max polling attempts reached".to_string()
            ));
        }

        // Wait before next attempt
        std::thread::sleep(std::time::Duration::from_secs(10));
        Ok(RepeatStatus::Continuable)
    }
}
```
<Aside type="caution">
Always include a maximum attempt counter when using `RepeatStatus::Continuable` to avoid infinite loops.
</Aside>
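The attempt cap can also be factored out so the bound is easy to test on its own. The following is a minimal std-only sketch: `RepeatStatus` here is a local stand-in that mirrors the two variants used on this page, and `run_bounded` is a hypothetical helper, not part of Spring Batch RS.

```rust
/// Local stand-in for the crate's `RepeatStatus`, so the sketch needs only std.
#[derive(Debug, PartialEq)]
enum RepeatStatus {
    Continuable,
    Finished,
}

/// Hypothetical helper: calls `attempt` with the attempt number (starting at 1)
/// until it reports `Finished`. Fails once `max_attempts` calls have been made
/// without finishing, and propagates any error from `attempt` immediately.
fn run_bounded<F>(max_attempts: u32, mut attempt: F) -> Result<u32, String>
where
    F: FnMut(u32) -> Result<RepeatStatus, String>,
{
    for n in 1..=max_attempts {
        match attempt(n)? {
            RepeatStatus::Finished => return Ok(n),
            RepeatStatus::Continuable => continue,
        }
    }
    Err(format!("gave up after {} attempts", max_attempts))
}
```

On success the helper returns the number of attempts it took, which is convenient for logging.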
---
## Performance Tips
<CardGrid>
<Card title="Compression Level" icon="rocket">
Use levels 3-6 for a balance between speed and size; use level 9 for archival.
</Card>
<Card title="FTP Connections" icon="star">
Reuse connections when possible. Use connection pooling for multiple transfers.
</Card>
<Card title="Error Handling" icon="warning">
Implement retry logic for network-related tasklets.
</Card>
<Card title="Idempotency" icon="setting">
Design tasklets to be safely re-runnable.
</Card>
</CardGrid>
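As an illustration of the retry tip, a network call inside a custom tasklet could be wrapped in a small helper like the one below. This is a sketch using only the standard library; `retry_with_backoff` is a hypothetical name, and the doubling delay is just one possible policy.

```rust
use std::thread::sleep;
use std::time::Duration;

/// Hypothetical helper: runs `op` up to `max_attempts` times, doubling the
/// delay after each failure. Returns the first success or the last error.
fn retry_with_backoff<T, E, F>(
    max_attempts: u32,
    initial_delay: Duration,
    mut op: F,
) -> Result<T, E>
where
    F: FnMut() -> Result<T, E>,
{
    assert!(max_attempts > 0, "max_attempts must be at least 1");
    let mut delay = initial_delay;
    let mut attempt = 1;
    loop {
        match op() {
            Ok(value) => return Ok(value),
            // Out of attempts: surface the most recent error.
            Err(err) if attempt >= max_attempts => return Err(err),
            Err(_) => {
                sleep(delay);
                delay = delay.saturating_mul(2);
                attempt += 1;
            }
        }
    }
}
```

Retrying is only safe when the wrapped operation is idempotent, for example an upload that overwrites the same remote path.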
## Best Practices
<Aside type="tip">
**Tasklet Tips:**
1. **Single Responsibility**: Each tasklet should do one thing well
2. **Idempotent Operations**: Design for safe re-execution after failures
3. **Resource Cleanup**: Always clean up resources in error paths
4. **Logging**: Add detailed logging for troubleshooting
5. **Validation**: Validate inputs before performing operations
</Aside>
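To make the idempotency tip concrete, here is a minimal std-only sketch of a delete operation that can be re-run safely after a partial failure. `remove_file_idempotent` is a hypothetical helper name, not an API of the crate.

```rust
use std::fs;
use std::io;
use std::path::Path;

/// Hypothetical helper: deletes `path`, treating "already gone" as success
/// so that a second run does not fail on files removed by the first.
/// Returns `true` if a file was removed and `false` if there was nothing to remove.
fn remove_file_idempotent(path: &Path) -> io::Result<bool> {
    match fs::remove_file(path) {
        Ok(()) => Ok(true),
        Err(e) if e.kind() == io::ErrorKind::NotFound => Ok(false),
        Err(e) => Err(e),
    }
}
```

Other errors, such as missing permissions, are still returned so that real failures remain visible.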
## Next Steps
- [API Reference - Tasklet](/spring-batch-rs/api/tasklet/) - Complete tasklet API
- [Advanced Patterns](/spring-batch-rs/examples/advanced-patterns/) - Complex workflows
- [Error Handling](/spring-batch-rs/error-handling/) - Handling tasklet failures