---
title: Database Examples
description: Complete examples for PostgreSQL, MySQL, and SQLite with Spring Batch RS
sidebar:
order: 4
---
import { Tabs, TabItem, Card, CardGrid, Aside } from '@astrojs/starlight/components';
<Aside type="tip">
View the complete source: [examples/database_processing.rs](https://github.com/sboussekeyt/spring-batch-rs/blob/main/examples/database_processing.rs)
</Aside>
This page provides comprehensive examples for working with relational databases (PostgreSQL, MySQL, SQLite) using Spring Batch RS.
## Setup
Add the database features you need to your `Cargo.toml`. Some examples below also read from CSV or JSON files, which additionally require the crate's `csv` and `json` features:
<Tabs>
<TabItem label="PostgreSQL">
```toml
[dependencies]
spring-batch-rs = { version = "0.1", features = ["rdbc-postgres"] }
sqlx = { version = "0.7", features = ["runtime-tokio-native-tls", "postgres"] }
tokio = { version = "1", features = ["full"] }
serde = { version = "1.0", features = ["derive"] }
```
</TabItem>
<TabItem label="MySQL">
```toml
[dependencies]
spring-batch-rs = { version = "0.1", features = ["rdbc-mysql"] }
sqlx = { version = "0.7", features = ["runtime-tokio-native-tls", "mysql"] }
tokio = { version = "1", features = ["full"] }
serde = { version = "1.0", features = ["derive"] }
```
</TabItem>
<TabItem label="SQLite">
```toml
[dependencies]
spring-batch-rs = { version = "0.1", features = ["rdbc-sqlite"] }
sqlx = { version = "0.7", features = ["runtime-tokio-native-tls", "sqlite"] }
tokio = { version = "1", features = ["full"] }
serde = { version = "1.0", features = ["derive"] }
```
</TabItem>
</Tabs>
---
## PostgreSQL Examples
### Basic Reading from PostgreSQL
```rust
use spring_batch_rs::{
core::{step::{StepBuilder, StepExecution}, item::PassThroughProcessor},
item::{
rdbc::RdbcItemReaderBuilder,
logger::LoggerWriterBuilder,
},
};
use sqlx::{PgPool, FromRow};
use serde::{Deserialize, Serialize};
#[derive(Debug, Clone, Deserialize, Serialize, FromRow)]
struct Person {
id: i64,
first_name: String,
last_name: String,
email: String,
birth_date: String,
}
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
// Connect to database
let pool = PgPool::connect("postgres://user:password@localhost:5432/mydb").await?;
// Create reader
let reader = RdbcItemReaderBuilder::<Person>::new()
.postgres(pool)
.query("SELECT id, first_name, last_name, email, birth_date FROM persons")
.with_page_size(50)
.build_postgres();
// Create writer and processor
let writer = LoggerWriterBuilder::<Person>::new().build();
let processor = PassThroughProcessor::<Person>::new();
// Build and execute step
let step = StepBuilder::new("read-postgres")
.chunk::<Person, Person>(10)
.reader(&reader)
.processor(&processor)
.writer(&writer)
.build();
let mut execution = StepExecution::new("read-postgres");
step.execute(&mut execution)?;
Ok(())
}
```
<Aside type="tip">
The reader automatically handles pagination using LIMIT/OFFSET. Set `with_page_size()` based on your memory constraints.
</Aside>
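Concretely, a page size of 50 means the reader pulls the result set in windows of 50 rows. The sketch below only illustrates the offset arithmetic; the actual SQL the reader emits is an internal detail:
```rust
// Illustrative only. With a page size of 50, page `n` (zero-based)
// covers rows [n * 50, n * 50 + 50).
let page_size: i64 = 50;
for page in 0..3 {
    let offset = page * page_size;
    println!("SELECT ... FROM persons LIMIT {page_size} OFFSET {offset}");
}
```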
### Writing to PostgreSQL
```rust
use spring_batch_rs::core::{
    item::PassThroughProcessor,
    step::{StepBuilder, StepExecution},
};
use spring_batch_rs::item::csv::CsvItemReaderBuilder;
use spring_batch_rs::item::rdbc::postgres::PostgresItemWriterBuilder;
use sqlx::PgPool;

// `Person` is the same struct as in the reading example above.
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
let pool = PgPool::connect("postgres://user:password@localhost:5432/mydb").await?;
// Read from CSV
let reader = CsvItemReaderBuilder::<Person>::new()
.has_headers(true)
.from_path("persons.csv")?;
// Write to PostgreSQL
let writer = PostgresItemWriterBuilder::new()
.pool(pool)
.table("persons")
.binder(|query, person: &Person| {
query.push_values([person], |mut b, p| {
b.push_bind(&p.first_name)
.push_bind(&p.last_name)
.push_bind(&p.email)
.push_bind(&p.birth_date);
});
})
.build();
let processor = PassThroughProcessor::<Person>::new();
let step = StepBuilder::new("csv-to-postgres")
.chunk::<Person, Person>(100)
.reader(&reader)
.processor(&processor)
.writer(&writer)
.build();
let mut execution = StepExecution::new("csv-to-postgres");
step.execute(&mut execution)?;
Ok(())
}
```
### Filtered Reading with WHERE Clause
```rust
let reader = RdbcItemReaderBuilder::<Person>::new()
.postgres(pool)
.query(
"SELECT id, first_name, last_name, email, birth_date
FROM persons
WHERE active = true AND created_at > '2024-01-01'"
)
.with_page_size(100)
.build_postgres();
```
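When the cutoff comes from configuration instead of being hardcoded, the query string can be assembled first. A minimal sketch; the query is plain SQL text, so only interpolate trusted values, never raw user input:
```rust
// Hypothetical, config-driven variant of the filter above.
let cutoff = "2024-01-01"; // e.g. derived from the job's run date
let sql = format!(
    "SELECT id, first_name, last_name, email, birth_date \
     FROM persons \
     WHERE active = true AND created_at > '{cutoff}'"
);

let reader = RdbcItemReaderBuilder::<Person>::new()
    .postgres(pool)
    .query(&sql)
    .with_page_size(100)
    .build_postgres();
```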
---
## MySQL Examples
### Basic Reading from MySQL
```rust
use spring_batch_rs::core::{
    item::PassThroughProcessor,
    step::{StepBuilder, StepExecution},
};
use spring_batch_rs::item::{logger::LoggerWriterBuilder, rdbc::RdbcItemReaderBuilder};
use sqlx::MySqlPool;
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
let pool = MySqlPool::connect("mysql://user:password@localhost:3306/mydb").await?;
let reader = RdbcItemReaderBuilder::<Person>::new()
.mysql(pool)
.query("SELECT id, first_name, last_name, email, birth_date FROM persons")
.with_page_size(50)
.build_mysql();
let writer = LoggerWriterBuilder::<Person>::new().build();
let processor = PassThroughProcessor::<Person>::new();
let step = StepBuilder::new("read-mysql")
.chunk::<Person, Person>(10)
.reader(&reader)
.processor(&processor)
.writer(&writer)
.build();
let mut execution = StepExecution::new("read-mysql");
step.execute(&mut execution)?;
Ok(())
}
```
### Writing to MySQL
```rust
use spring_batch_rs::core::{
    item::PassThroughProcessor,
    step::{StepBuilder, StepExecution},
};
use spring_batch_rs::item::{csv::CsvItemReaderBuilder, rdbc::mysql::MysqlItemWriterBuilder};
use sqlx::MySqlPool;
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
let pool = MySqlPool::connect("mysql://user:password@localhost:3306/mydb").await?;
let reader = CsvItemReaderBuilder::<Person>::new()
.has_headers(true)
.from_path("persons.csv")?;
let writer = MysqlItemWriterBuilder::new()
.pool(pool)
.table("persons")
.binder(|query, person: &Person| {
query.push_values([person], |mut b, p| {
b.push_bind(&p.first_name)
.push_bind(&p.last_name)
.push_bind(&p.email)
.push_bind(&p.birth_date);
});
})
.build();
let processor = PassThroughProcessor::<Person>::new();
let step = StepBuilder::new("csv-to-mysql")
.chunk::<Person, Person>(100)
.reader(&reader)
.processor(&processor)
.writer(&writer)
.build();
    let mut execution = StepExecution::new("csv-to-mysql");
    step.execute(&mut execution)?;
    Ok(())
}
```
---
## SQLite Examples
### Basic Reading from SQLite
```rust
use spring_batch_rs::core::{
    item::PassThroughProcessor,
    step::{StepBuilder, StepExecution},
};
use spring_batch_rs::item::{logger::LoggerWriterBuilder, rdbc::RdbcItemReaderBuilder};
use sqlx::SqlitePool;
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
let pool = SqlitePool::connect("sqlite:./mydb.db").await?;
let reader = RdbcItemReaderBuilder::<Person>::new()
.sqlite(pool)
.query("SELECT id, first_name, last_name, email, birth_date FROM persons")
.with_page_size(50)
.build_sqlite();
let writer = LoggerWriterBuilder::<Person>::new().build();
let processor = PassThroughProcessor::<Person>::new();
let step = StepBuilder::new("read-sqlite")
.chunk::<Person, Person>(10)
.reader(&reader)
.processor(&processor)
.writer(&writer)
.build();
    let mut execution = StepExecution::new("read-sqlite");
    step.execute(&mut execution)?;
    Ok(())
}
```
### Writing to SQLite
```rust
use spring_batch_rs::core::{
    item::PassThroughProcessor,
    step::{StepBuilder, StepExecution},
};
use spring_batch_rs::item::{json::JsonItemReaderBuilder, rdbc::sqlite::SqliteItemWriterBuilder};
use sqlx::SqlitePool;
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
let pool = SqlitePool::connect("sqlite:./mydb.db").await?;
let reader = JsonItemReaderBuilder::<Person>::new()
.from_path("persons.json")?;
let writer = SqliteItemWriterBuilder::new()
.pool(pool)
.table("persons")
.binder(|query, person: &Person| {
query.push_values([person], |mut b, p| {
b.push_bind(&p.first_name)
.push_bind(&p.last_name)
.push_bind(&p.email)
.push_bind(&p.birth_date);
});
})
.build();
let processor = PassThroughProcessor::<Person>::new();
let step = StepBuilder::new("json-to-sqlite")
.chunk::<Person, Person>(100)
.reader(&reader)
.processor(&processor)
.writer(&writer)
.build();
    let mut execution = StepExecution::new("json-to-sqlite");
    step.execute(&mut execution)?;
    Ok(())
}
```
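By default, `SqlitePool::connect` fails if the database file does not exist yet. If the job may run against a fresh file, sqlx's connect options can create it on first use:
```rust
use std::str::FromStr;

use sqlx::sqlite::{SqliteConnectOptions, SqlitePoolOptions};

let options = SqliteConnectOptions::from_str("sqlite:./mydb.db")?
    .create_if_missing(true);
let pool = SqlitePoolOptions::new().connect_with(options).await?;
```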
---
## Database to Database ETL
### PostgreSQL to MySQL Migration
```rust
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
// Source: PostgreSQL
let pg_pool = PgPool::connect("postgres://user:pass@localhost:5432/source_db").await?;
let reader = RdbcItemReaderBuilder::<Person>::new()
.postgres(pg_pool)
.query("SELECT id, first_name, last_name, email, birth_date FROM persons")
.with_page_size(100)
.build_postgres();
// Target: MySQL
let mysql_pool = MySqlPool::connect("mysql://user:pass@localhost:3306/target_db").await?;
let writer = MysqlItemWriterBuilder::new()
.pool(mysql_pool)
.table("persons")
.binder(|query, person: &Person| {
query.push_values([person], |mut b, p| {
b.push_bind(&p.first_name)
.push_bind(&p.last_name)
.push_bind(&p.email)
.push_bind(&p.birth_date);
});
})
.build();
let processor = PassThroughProcessor::<Person>::new();
let step = StepBuilder::new("pg-to-mysql")
.chunk::<Person, Person>(500)
.reader(&reader)
.processor(&processor)
.writer(&writer)
.build();
let mut execution = StepExecution::new("pg-to-mysql");
step.execute(&mut execution)?;
println!("Migration completed!");
Ok(())
}
```
---
## Data Transformation
### Transform and Validate
```rust
use serde::Serialize;
use spring_batch_rs::core::{
    item::{ItemProcessor, ItemProcessorResult},
    step::{StepBuilder, StepExecution},
};
use spring_batch_rs::error::BatchError;
use spring_batch_rs::item::rdbc::{postgres::PostgresItemWriterBuilder, RdbcItemReaderBuilder};
use sqlx::{FromRow, PgPool};
#[derive(Debug, Clone, FromRow)]
struct RawPerson {
id: i64,
name: String, // Full name
email: String,
}
#[derive(Debug, Clone, Serialize)]
struct ProcessedPerson {
id: i64,
first_name: String,
last_name: String,
email: String,
email_domain: String,
}
struct PersonProcessor;
impl ItemProcessor<RawPerson, ProcessedPerson> for PersonProcessor {
fn process(&self, item: &RawPerson) -> ItemProcessorResult<ProcessedPerson> {
// Split name
let parts: Vec<&str> = item.name.split_whitespace().collect();
if parts.len() < 2 {
return Err(BatchError::ItemProcessor(
format!("Invalid name format: {}", item.name)
));
}
let first_name = parts[0].to_string();
let last_name = parts[1..].join(" ");
// Extract domain
let email_domain = item.email
.split('@')
.nth(1)
.ok_or_else(|| BatchError::ItemProcessor(
format!("Invalid email: {}", item.email)
))?
.to_string();
Ok(ProcessedPerson {
id: item.id,
first_name,
last_name,
email: item.email.clone(),
email_domain,
})
}
}
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
let pool = PgPool::connect("postgres://user:pass@localhost/db").await?;
let reader = RdbcItemReaderBuilder::<RawPerson>::new()
.postgres(pool.clone())
.query("SELECT id, name, email FROM raw_persons")
.with_page_size(100)
.build_postgres();
let processor = PersonProcessor;
let writer = PostgresItemWriterBuilder::new()
.pool(pool)
.table("processed_persons")
.binder(|query, person: &ProcessedPerson| {
query.push_values([person], |mut b, p| {
b.push_bind(&p.id)
.push_bind(&p.first_name)
.push_bind(&p.last_name)
.push_bind(&p.email)
.push_bind(&p.email_domain);
});
})
.build();
let step = StepBuilder::new("transform-persons")
.chunk::<RawPerson, ProcessedPerson>(100)
.reader(&reader)
.processor(&processor)
.writer(&writer)
.skip_limit(10) // Skip up to 10 invalid records
.build();
    let mut execution = StepExecution::new("transform-persons");
    step.execute(&mut execution)?;
    Ok(())
}
```
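To check the transformation in isolation, the processor can be driven directly, outside a step (a quick sanity check using the types defined above):
```rust
let raw = RawPerson {
    id: 1,
    name: "Ada King Lovelace".to_string(),
    email: "ada@example.com".to_string(),
};

let processed = PersonProcessor.process(&raw).unwrap();
assert_eq!(processed.first_name, "Ada");
assert_eq!(processed.last_name, "King Lovelace"); // everything after the first token
assert_eq!(processed.email_domain, "example.com");
```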
---
## Complex Queries
### Joining Tables
```rust
#[derive(Debug, Clone, FromRow, Serialize)]
struct OrderWithCustomer {
order_id: i64,
order_date: String,
customer_name: String,
customer_email: String,
total_amount: f64,
}
let reader = RdbcItemReaderBuilder::<OrderWithCustomer>::new()
.postgres(pool)
.query(
"SELECT
o.id as order_id,
o.order_date,
c.name as customer_name,
c.email as customer_email,
o.total_amount
FROM orders o
JOIN customers c ON o.customer_id = c.id
WHERE o.order_date >= '2024-01-01'
ORDER BY o.order_date DESC"
)
.with_page_size(100)
.build_postgres();
```
### Aggregation Queries
```rust
#[derive(Debug, Clone, FromRow, Serialize)]
struct SalesSummary {
product_id: i64,
product_name: String,
total_quantity: i64,
total_revenue: f64,
order_count: i64,
}
let reader = RdbcItemReaderBuilder::<SalesSummary>::new()
.postgres(pool)
.query(
"SELECT
p.id as product_id,
p.name as product_name,
SUM(oi.quantity) as total_quantity,
SUM(oi.quantity * oi.unit_price) as total_revenue,
COUNT(DISTINCT oi.order_id) as order_count
FROM products p
JOIN order_items oi ON p.id = oi.product_id
GROUP BY p.id, p.name
HAVING SUM(oi.quantity * oi.unit_price) > 1000
ORDER BY total_revenue DESC"
)
.with_page_size(50)
.build_postgres();
```
---
## Database Export to Files
### Export to JSON
```rust
use spring_batch_rs::core::{
    item::PassThroughProcessor,
    step::{StepBuilder, StepExecution},
};
use spring_batch_rs::item::{json::JsonItemWriterBuilder, rdbc::RdbcItemReaderBuilder};
use sqlx::PgPool;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
let pool = PgPool::connect("postgres://user:pass@localhost/db").await?;
let reader = RdbcItemReaderBuilder::<Person>::new()
.postgres(pool)
.query("SELECT * FROM persons WHERE active = true")
.with_page_size(100)
.build_postgres();
let writer = JsonItemWriterBuilder::<Person>::new()
.pretty_formatter(true)
.from_path("persons_export.json")?;
let processor = PassThroughProcessor::<Person>::new();
let step = StepBuilder::new("export-to-json")
.chunk::<Person, Person>(500)
.reader(&reader)
.processor(&processor)
.writer(&writer)
.build();
    let mut execution = StepExecution::new("export-to-json");
    step.execute(&mut execution)?;
    Ok(())
}
```
### Export to CSV
```rust
// Reuses the `reader` and the `Person` type from the JSON export above.
let writer = CsvItemWriterBuilder::new()
.has_headers(true)
.from_path("persons_export.csv")?;
let processor = PassThroughProcessor::<Person>::new();
let step = StepBuilder::new("export-to-csv")
.chunk::<Person, Person>(500)
.reader(&reader)
.processor(&processor)
.writer(&writer)
    .build();

let mut execution = StepExecution::new("export-to-csv");
step.execute(&mut execution)?;
```
---
## Real-World Example: Data Warehouse ETL
```rust
use std::collections::HashMap;

use chrono::NaiveDate;
use serde::Serialize;
use spring_batch_rs::core::{
    item::{ItemProcessor, ItemProcessorResult},
    step::{StepBuilder, StepExecution},
};
use spring_batch_rs::error::BatchError;
use spring_batch_rs::item::rdbc::{postgres::PostgresItemWriterBuilder, RdbcItemReaderBuilder};
use sqlx::{FromRow, PgPool};
#[derive(Debug, Clone, FromRow)]
struct TransactionSource {
transaction_id: i64,
customer_id: i64,
product_id: i64,
quantity: i32,
unit_price: f64,
transaction_date: String,
}
#[derive(Debug, Clone, Serialize)]
struct TransactionFact {
transaction_key: i64,
customer_key: i64,
product_key: i64,
date_key: i32,
quantity: i32,
unit_price: f64,
total_amount: f64,
gross_profit: f64,
}
struct FactProcessor {
product_costs: HashMap<i64, f64>,
}
impl ItemProcessor<TransactionSource, TransactionFact> for FactProcessor {
fn process(&self, item: &TransactionSource) -> ItemProcessorResult<TransactionFact> {
let total_amount = item.quantity as f64 * item.unit_price;
let cost = self.product_costs
.get(&item.product_id)
.copied()
.unwrap_or(0.0);
let gross_profit = total_amount - (cost * item.quantity as f64);
// Convert date to date key (YYYYMMDD format)
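        // e.g. "2024-03-15" becomes date_key 20240315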
let date = NaiveDate::parse_from_str(&item.transaction_date, "%Y-%m-%d")
.map_err(|e| BatchError::ItemProcessor(e.to_string()))?;
        let date_key = date
            .format("%Y%m%d")
            .to_string()
            .parse::<i32>()
            .expect("YYYYMMDD with a four-digit year always fits in i32");
Ok(TransactionFact {
transaction_key: item.transaction_id,
customer_key: item.customer_id,
product_key: item.product_id,
date_key,
quantity: item.quantity,
unit_price: item.unit_price,
total_amount,
gross_profit,
})
}
}
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
// Load product costs
let mut product_costs = HashMap::new();
product_costs.insert(1, 50.0);
product_costs.insert(2, 25.0);
let processor = FactProcessor { product_costs };
// Source: OLTP database
let source_pool = PgPool::connect("postgres://user:pass@localhost/oltp_db").await?;
let reader = RdbcItemReaderBuilder::<TransactionSource>::new()
.postgres(source_pool)
.query(
"SELECT transaction_id, customer_id, product_id, quantity,
unit_price, transaction_date
FROM transactions
WHERE transaction_date >= CURRENT_DATE - INTERVAL '1 day'"
)
.with_page_size(1000)
.build_postgres();
// Target: Data warehouse
let dw_pool = PgPool::connect("postgres://user:pass@localhost/dw_db").await?;
let writer = PostgresItemWriterBuilder::new()
.pool(dw_pool)
.table("fact_transactions")
.binder(|query, fact: &TransactionFact| {
query.push_values([fact], |mut b, f| {
b.push_bind(&f.transaction_key)
.push_bind(&f.customer_key)
.push_bind(&f.product_key)
.push_bind(&f.date_key)
.push_bind(&f.quantity)
.push_bind(&f.unit_price)
.push_bind(&f.total_amount)
.push_bind(&f.gross_profit);
});
})
.build();
let step = StepBuilder::new("load-facts")
.chunk::<TransactionSource, TransactionFact>(1000)
.reader(&reader)
.processor(&processor)
.writer(&writer)
.build();
    let mut execution = StepExecution::new("load-facts");
    step.execute(&mut execution)?;
    Ok(())
}
```
---
## Performance Tips
<CardGrid>
<Card title="Page Size" icon="rocket">
Use larger page sizes (500-1000) for better database performance
</Card>
<Card title="Chunk Size" icon="list">
Match chunk size to page size for optimal memory usage
</Card>
<Card title="Connection Pooling" icon="star">
Use appropriate pool size based on concurrent operations
</Card>
<Card title="Indexes" icon="setting">
Ensure source tables have indexes on query columns
</Card>
</CardGrid>
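Combining the first two cards: a sketch that keys the chunk size to the reader's page size and orders by an indexed column for stable pagination. The values are starting points, not universal recommendations; `processor` and `writer` are wired up as in the examples above.
```rust
// 500 rows per database round-trip, and one chunk per fetched page.
let reader = RdbcItemReaderBuilder::<Person>::new()
    .postgres(pool)
    .query("SELECT id, first_name, last_name, email, birth_date FROM persons ORDER BY id")
    .with_page_size(500)
    .build_postgres();

let step = StepBuilder::new("tuned-step")
    .chunk::<Person, Person>(500) // matches the page size
    .reader(&reader)
    .processor(&processor)
    .writer(&writer)
    .build();
```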
## Best Practices
<Aside type="tip">
**Optimization Tips:**
1. **Use ORDER BY**: Add ORDER BY with indexed columns for consistent pagination
2. **Batch Writes**: Use larger chunk sizes for bulk inserts (500-1000)
3. **Connection Pools**: Configure pool size based on workload (typically 5-20)
4. **Error Handling**: Use `skip_limit()` to handle data quality issues
5. **Transactions**: Writers automatically use transactions per chunk
</Aside>
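For tip 3, sqlx lets you size the pool explicitly instead of relying on its defaults. A minimal sketch for PostgreSQL (sqlx 0.7; the numbers are illustrative):
```rust
use std::time::Duration;

use sqlx::postgres::PgPoolOptions;

let pool = PgPoolOptions::new()
    .max_connections(10) // size to your concurrent readers/writers
    .acquire_timeout(Duration::from_secs(30))
    .connect("postgres://user:password@localhost:5432/mydb")
    .await?;
```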
## Next Steps
- [MongoDB Examples](/spring-batch-rs/examples/mongodb/) - NoSQL database examples
- [API Reference](/spring-batch-rs/api/item-reader/) - Complete API documentation
- [Performance Guide](/spring-batch-rs/reference/performance/) - Tuning tips