use serde::{Deserialize, Serialize};
use spring_batch_rs::{
BatchError,
core::{
item::{ItemProcessor, PassThroughProcessor},
job::{Job, JobBuilder},
step::StepBuilder,
},
item::{
csv::csv_reader::CsvItemReaderBuilder,
csv::csv_writer::CsvItemWriterBuilder,
json::json_writer::JsonItemWriterBuilder,
logger::LoggerWriterBuilder,
rdbc::{DatabaseItemBinder, RdbcItemReaderBuilder, RdbcItemWriterBuilder},
},
};
use sqlx::{FromRow, Sqlite, SqlitePool, query_builder::Separated};
use std::env::temp_dir;
#[derive(Debug, Clone, Deserialize, Serialize, FromRow)]
struct User {
id: i32,
name: String,
email: String,
active: bool,
}
#[derive(Debug, Clone, Deserialize, Serialize, FromRow)]
struct Product {
id: i32,
name: String,
price: f64,
stock: i32,
}
struct UserBinder;
impl DatabaseItemBinder<User, Sqlite> for UserBinder {
fn bind(&self, item: &User, mut query_builder: Separated<Sqlite, &str>) {
query_builder.push_bind(item.id);
query_builder.push_bind(item.name.clone());
query_builder.push_bind(item.email.clone());
query_builder.push_bind(item.active);
}
}
struct ProductBinder;
impl DatabaseItemBinder<Product, Sqlite> for ProductBinder {
fn bind(&self, item: &Product, mut query_builder: Separated<Sqlite, &str>) {
query_builder.push_bind(item.id);
query_builder.push_bind(item.name.clone());
query_builder.push_bind(item.price);
query_builder.push_bind(item.stock);
}
}
struct ActivateUserProcessor;
impl ItemProcessor<User, User> for ActivateUserProcessor {
fn process(&self, item: &User) -> Result<Option<User>, BatchError> {
Ok(Some(User {
id: item.id,
name: item.name.clone(),
email: item.email.clone(),
active: true,
}))
}
}
async fn setup_database() -> Result<SqlitePool, sqlx::Error> {
let pool = SqlitePool::connect("sqlite::memory:").await?;
sqlx::query(
r#"
CREATE TABLE users (
id INTEGER PRIMARY KEY,
name TEXT NOT NULL,
email TEXT NOT NULL,
active BOOLEAN NOT NULL DEFAULT 0
)
"#,
)
.execute(&pool)
.await?;
sqlx::query(
r#"
CREATE TABLE products (
id INTEGER PRIMARY KEY,
name TEXT NOT NULL,
price REAL NOT NULL,
stock INTEGER NOT NULL
)
"#,
)
.execute(&pool)
.await?;
let users = [
(1, "Alice Johnson", "alice@example.com", true),
(2, "Bob Smith", "bob@example.com", false),
(3, "Charlie Brown", "charlie@example.com", true),
(4, "Diana Prince", "diana@example.com", false),
(5, "Eve Wilson", "eve@example.com", true),
];
for (id, name, email, active) in users {
sqlx::query("INSERT INTO users (id, name, email, active) VALUES (?, ?, ?, ?)")
.bind(id)
.bind(name)
.bind(email)
.bind(active)
.execute(&pool)
.await?;
}
Ok(pool)
}
fn example_read_from_database(pool: &SqlitePool) -> Result<(), BatchError> {
println!("=== Example 1: Read from Database ===");
let reader = RdbcItemReaderBuilder::<User>::new()
.sqlite(pool.clone())
.query("SELECT id, name, email, active FROM users")
.with_page_size(2)
.build_sqlite();
let writer = LoggerWriterBuilder::<User>::new().build();
let processor = PassThroughProcessor::<User>::new();
let step = StepBuilder::new("read-users")
.chunk::<User, User>(2)
.reader(&reader)
.processor(&processor)
.writer(&writer)
.build();
let job = JobBuilder::new().start(&step).build();
let result = job.run()?;
let step_exec = job.get_step_execution("read-users").unwrap();
println!(" Users read: {}", step_exec.read_count);
println!(" Duration: {:?}", result.duration);
Ok(())
}
fn example_database_to_json(pool: &SqlitePool) -> Result<(), BatchError> {
println!("\n=== Example 2: Database to JSON Export ===");
let reader = RdbcItemReaderBuilder::<User>::new()
.sqlite(pool.clone())
.query("SELECT id, name, email, active FROM users WHERE active = 1")
.build_sqlite();
let output_path = temp_dir().join("active_users.json");
let writer = JsonItemWriterBuilder::<User>::new()
.pretty_formatter(true)
.from_path(&output_path);
let processor = PassThroughProcessor::<User>::new();
let step = StepBuilder::new("export-to-json")
.chunk::<User, User>(10)
.reader(&reader)
.processor(&processor)
.writer(&writer)
.build();
let job = JobBuilder::new().start(&step).build();
job.run()?;
println!(" Exported active users to JSON");
println!(" Output: {}", output_path.display());
Ok(())
}
fn example_database_to_csv(pool: &SqlitePool) -> Result<(), BatchError> {
println!("\n=== Example 3: Database to CSV Export ===");
let reader = RdbcItemReaderBuilder::<User>::new()
.sqlite(pool.clone())
.query("SELECT id, name, email, active FROM users ORDER BY name")
.build_sqlite();
let output_path = temp_dir().join("users_export.csv");
let writer = CsvItemWriterBuilder::<User>::new()
.has_headers(true)
.from_path(&output_path);
let processor = PassThroughProcessor::<User>::new();
let step = StepBuilder::new("export-to-csv")
.chunk::<User, User>(10)
.reader(&reader)
.processor(&processor)
.writer(&writer)
.build();
let job = JobBuilder::new().start(&step).build();
job.run()?;
println!(" Exported all users to CSV");
println!(" Output: {}", output_path.display());
Ok(())
}
fn example_csv_to_database(pool: &SqlitePool) -> Result<(), BatchError> {
println!("\n=== Example 4: CSV Import to Database ===");
let csv_data = "\
id,name,price,stock
1,Laptop,999.99,50
2,Mouse,29.99,200
3,Keyboard,79.99,150
4,Monitor,299.99,75
5,Headphones,149.99,100";
let reader = CsvItemReaderBuilder::<Product>::new()
.has_headers(true)
.from_reader(csv_data.as_bytes());
let binder = ProductBinder;
let writer = RdbcItemWriterBuilder::<Product>::new()
.sqlite(pool)
.table("products")
.add_column("id")
.add_column("name")
.add_column("price")
.add_column("stock")
.sqlite_binder(&binder)
.build_sqlite();
let processor = PassThroughProcessor::<Product>::new();
let step = StepBuilder::new("import-products")
.chunk::<Product, Product>(2)
.reader(&reader)
.processor(&processor)
.writer(&writer)
.build();
let job = JobBuilder::new().start(&step).build();
let result = job.run()?;
let step_exec = job.get_step_execution("import-products").unwrap();
println!(" Products imported: {}", step_exec.write_count);
println!(" Duration: {:?}", result.duration);
Ok(())
}
fn example_transform_and_write(pool: &SqlitePool) -> Result<(), BatchError> {
println!("\n=== Example 5: Read, Transform, and Write ===");
tokio::task::block_in_place(|| {
tokio::runtime::Handle::current().block_on(async {
sqlx::query(
r#"
CREATE TABLE IF NOT EXISTS activated_users (
id INTEGER PRIMARY KEY,
name TEXT NOT NULL,
email TEXT NOT NULL,
active BOOLEAN NOT NULL
)
"#,
)
.execute(pool)
.await
})
})
.map_err(|e| BatchError::ItemWriter(e.to_string()))?;
let reader = RdbcItemReaderBuilder::<User>::new()
.sqlite(pool.clone())
.query("SELECT id, name, email, active FROM users WHERE active = 0")
.build_sqlite();
let binder = UserBinder;
let writer = RdbcItemWriterBuilder::<User>::new()
.sqlite(pool)
.table("activated_users")
.add_column("id")
.add_column("name")
.add_column("email")
.add_column("active")
.sqlite_binder(&binder)
.build_sqlite();
let processor = ActivateUserProcessor;
let step = StepBuilder::new("activate-users")
.chunk::<User, User>(10)
.reader(&reader)
.processor(&processor)
.writer(&writer)
.build();
let job = JobBuilder::new().start(&step).build();
let result = job.run()?;
let step_exec = job.get_step_execution("activate-users").unwrap();
println!(" Inactive users activated: {}", step_exec.write_count);
println!(" Duration: {:?}", result.duration);
Ok(())
}
#[tokio::main]
async fn main() -> Result<(), BatchError> {
env_logger::init();
println!("Database Processing Examples (SQLite)");
println!("=====================================\n");
let pool = setup_database()
.await
.map_err(|e| BatchError::ItemReader(format!("Failed to setup database: {}", e)))?;
println!("Database initialized with sample data.\n");
example_read_from_database(&pool)?;
example_database_to_json(&pool)?;
example_database_to_csv(&pool)?;
example_csv_to_database(&pool)?;
example_transform_and_write(&pool)?;
println!("\n✓ All database examples completed successfully!");
Ok(())
}