spring_batch_rs/item/rdbc/unified_writer_builder.rs
1use sqlx::{MySql, Pool, Postgres, Sqlite};
2
3use super::DatabaseItemBinder;
4use super::database_type::DatabaseType;
5use super::mysql_writer::MySqlItemWriter;
6use super::postgres_writer::PostgresItemWriter;
7use super::sqlite_writer::SqliteItemWriter;
8
9/// Unified builder for creating RDBC item writers for any supported database type.
10///
11/// This builder provides a consistent API for constructing database writers
12/// regardless of the underlying database (PostgreSQL, MySQL, or SQLite).
13/// Users specify the database type, connection pool, table, and columns,
14/// and the builder handles the creation of the appropriate writer implementation.
15///
16/// # Type Parameters
17///
18/// * `O` - The item type to write
19///
20/// # Examples
21///
22/// ## PostgreSQL
23/// ```no_run
24/// use spring_batch_rs::item::rdbc::{RdbcItemWriterBuilder, DatabaseItemBinder};
25/// use sqlx::{PgPool, query_builder::Separated, Postgres};
26/// use serde::Serialize;
27///
28/// #[derive(Clone, Serialize)]
29/// struct User {
30/// id: i32,
31/// name: String,
32/// }
33///
34/// struct UserBinder;
35/// impl DatabaseItemBinder<User, Postgres> for UserBinder {
36/// fn bind(&self, item: &User, mut query_builder: Separated<Postgres, &str>) {
37/// let _ = (item, query_builder); // Placeholder to avoid unused warnings
38/// // In real usage: query_builder.push_bind(item.id);
39/// // In real usage: query_builder.push_bind(&item.name);
40/// }
41/// }
42///
43/// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
44/// let pool = PgPool::connect("postgresql://user:pass@localhost/db").await?;
45/// let binder = UserBinder;
46///
47/// let writer = RdbcItemWriterBuilder::<User>::new()
48/// .postgres(&pool)
49/// .table("users")
50/// .add_column("id")
51/// .add_column("name")
52/// .postgres_binder(&binder)
53/// .build_postgres();
54/// # Ok(())
55/// # }
56/// ```
57///
58/// ## MySQL
59/// ```no_run
60/// use spring_batch_rs::item::rdbc::{RdbcItemWriterBuilder, DatabaseItemBinder};
61/// use sqlx::{MySqlPool, query_builder::Separated, MySql};
62/// use serde::Serialize;
63///
64/// #[derive(Clone, Serialize)]
65/// struct Product {
66/// id: i32,
67/// name: String,
68/// }
69///
70/// struct ProductBinder;
71/// impl DatabaseItemBinder<Product, MySql> for ProductBinder {
72/// fn bind(&self, item: &Product, mut query_builder: Separated<MySql, &str>) {
73/// let _ = (item, query_builder); // Placeholder to avoid unused warnings
74/// // In real usage: query_builder.push_bind(item.id);
75/// // In real usage: query_builder.push_bind(&item.name);
76/// }
77/// }
78///
79/// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
80/// let pool = MySqlPool::connect("mysql://user:pass@localhost/db").await?;
81/// let binder = ProductBinder;
82///
83/// let writer = RdbcItemWriterBuilder::<Product>::new()
84/// .mysql(&pool)
85/// .table("products")
86/// .add_column("id")
87/// .add_column("name")
88/// .mysql_binder(&binder)
89/// .build_mysql();
90/// # Ok(())
91/// # }
92/// ```
93///
94/// ## SQLite
95/// ```no_run
96/// use spring_batch_rs::item::rdbc::{RdbcItemWriterBuilder, DatabaseItemBinder};
97/// use sqlx::{SqlitePool, query_builder::Separated, Sqlite};
98/// use serde::Serialize;
99///
100/// #[derive(Clone, Serialize)]
101/// struct Task {
102/// id: i32,
103/// title: String,
104/// }
105///
106/// struct TaskBinder;
107/// impl DatabaseItemBinder<Task, Sqlite> for TaskBinder {
108/// fn bind(&self, item: &Task, mut query_builder: Separated<Sqlite, &str>) {
109/// let _ = (item, query_builder); // Placeholder to avoid unused warnings
110/// // In real usage: query_builder.push_bind(item.id);
111/// // In real usage: query_builder.push_bind(&item.title);
112/// }
113/// }
114///
115/// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
116/// let pool = SqlitePool::connect("sqlite::memory:").await?;
117/// let binder = TaskBinder;
118///
119/// let writer = RdbcItemWriterBuilder::<Task>::new()
120/// .sqlite(&pool)
121/// .table("tasks")
122/// .add_column("id")
123/// .add_column("title")
124/// .sqlite_binder(&binder)
125/// .build_sqlite();
126/// # Ok(())
127/// # }
128/// ```
129pub struct RdbcItemWriterBuilder<'a, O> {
130 postgres_pool: Option<&'a Pool<Postgres>>,
131 mysql_pool: Option<&'a Pool<MySql>>,
132 sqlite_pool: Option<&'a Pool<Sqlite>>,
133 table: Option<&'a str>,
134 columns: Vec<&'a str>,
135 postgres_binder: Option<&'a dyn DatabaseItemBinder<O, Postgres>>,
136 mysql_binder: Option<&'a dyn DatabaseItemBinder<O, MySql>>,
137 sqlite_binder: Option<&'a dyn DatabaseItemBinder<O, Sqlite>>,
138 db_type: Option<DatabaseType>,
139}
140
141impl<'a, O> RdbcItemWriterBuilder<'a, O> {
142 /// Creates a new unified writer builder with default configuration.
143 pub fn new() -> Self {
144 Self {
145 postgres_pool: None,
146 mysql_pool: None,
147 sqlite_pool: None,
148 table: None,
149 columns: Vec::new(),
150 postgres_binder: None,
151 mysql_binder: None,
152 sqlite_binder: None,
153 db_type: None,
154 }
155 }
156
157 /// Sets the PostgreSQL connection pool and database type.
158 ///
159 /// # Arguments
160 /// * `pool` - The PostgreSQL connection pool
161 ///
162 /// # Returns
163 /// The updated builder instance configured for PostgreSQL
164 pub fn postgres(mut self, pool: &'a Pool<Postgres>) -> Self {
165 self.postgres_pool = Some(pool);
166 self.db_type = Some(DatabaseType::Postgres);
167 self
168 }
169
170 /// Sets the MySQL connection pool and database type.
171 ///
172 /// # Arguments
173 /// * `pool` - The MySQL connection pool
174 ///
175 /// # Returns
176 /// The updated builder instance configured for MySQL
177 pub fn mysql(mut self, pool: &'a Pool<MySql>) -> Self {
178 self.mysql_pool = Some(pool);
179 self.db_type = Some(DatabaseType::MySql);
180 self
181 }
182
183 /// Sets the SQLite connection pool and database type.
184 ///
185 /// # Arguments
186 /// * `pool` - The SQLite connection pool
187 ///
188 /// # Returns
189 /// The updated builder instance configured for SQLite
190 pub fn sqlite(mut self, pool: &'a Pool<Sqlite>) -> Self {
191 self.sqlite_pool = Some(pool);
192 self.db_type = Some(DatabaseType::Sqlite);
193 self
194 }
195
196 /// Sets the table name for the writer.
197 ///
198 /// # Arguments
199 /// * `table` - The database table name
200 ///
201 /// # Returns
202 /// The updated builder instance
203 pub fn table(mut self, table: &'a str) -> Self {
204 self.table = Some(table);
205 self
206 }
207
208 /// Adds a column to the writer.
209 ///
210 /// # Arguments
211 /// * `column` - The column name
212 ///
213 /// # Returns
214 /// The updated builder instance
215 pub fn add_column(mut self, column: &'a str) -> Self {
216 self.columns.push(column);
217 self
218 }
219
220 /// Sets the item binder for PostgreSQL.
221 ///
222 /// # Arguments
223 /// * `binder` - The PostgreSQL-specific item binder
224 ///
225 /// # Returns
226 /// The updated builder instance
227 pub fn postgres_binder(mut self, binder: &'a dyn DatabaseItemBinder<O, Postgres>) -> Self {
228 self.postgres_binder = Some(binder);
229 self
230 }
231
232 /// Sets the item binder for MySQL.
233 ///
234 /// # Arguments
235 /// * `binder` - The MySQL-specific item binder
236 ///
237 /// # Returns
238 /// The updated builder instance
239 pub fn mysql_binder(mut self, binder: &'a dyn DatabaseItemBinder<O, MySql>) -> Self {
240 self.mysql_binder = Some(binder);
241 self
242 }
243
244 /// Sets the item binder for SQLite.
245 ///
246 /// # Arguments
247 /// * `binder` - The SQLite-specific item binder
248 ///
249 /// # Returns
250 /// The updated builder instance
251 pub fn sqlite_binder(mut self, binder: &'a dyn DatabaseItemBinder<O, Sqlite>) -> Self {
252 self.sqlite_binder = Some(binder);
253 self
254 }
255
256 /// Builds a PostgreSQL writer.
257 ///
258 /// # Returns
259 /// A configured `PostgresItemWriter` instance
260 ///
261 /// # Panics
262 /// Panics if required PostgreSQL-specific configuration is missing
263 pub fn build_postgres(self) -> PostgresItemWriter<'a, O> {
264 let mut writer = PostgresItemWriter::new();
265
266 if let Some(pool) = self.postgres_pool {
267 writer = writer.pool(pool);
268 }
269
270 if let Some(table) = self.table {
271 writer = writer.table(table);
272 }
273
274 for column in self.columns {
275 writer = writer.add_column(column);
276 }
277
278 if let Some(binder) = self.postgres_binder {
279 writer = writer.item_binder(binder);
280 }
281
282 writer
283 }
284
285 /// Builds a MySQL writer.
286 ///
287 /// # Returns
288 /// A configured `MySqlItemWriter` instance
289 ///
290 /// # Panics
291 /// Panics if required MySQL-specific configuration is missing
292 pub fn build_mysql(self) -> MySqlItemWriter<'a, O> {
293 let mut writer = MySqlItemWriter::new();
294
295 if let Some(pool) = self.mysql_pool {
296 writer = writer.pool(pool);
297 }
298
299 if let Some(table) = self.table {
300 writer = writer.table(table);
301 }
302
303 for column in self.columns {
304 writer = writer.add_column(column);
305 }
306
307 if let Some(binder) = self.mysql_binder {
308 writer = writer.item_binder(binder);
309 }
310
311 writer
312 }
313
314 /// Builds a SQLite writer.
315 ///
316 /// # Returns
317 /// A configured `SqliteItemWriter` instance
318 ///
319 /// # Panics
320 /// Panics if required SQLite-specific configuration is missing
321 pub fn build_sqlite(self) -> SqliteItemWriter<'a, O> {
322 let mut writer = SqliteItemWriter::new();
323
324 if let Some(pool) = self.sqlite_pool {
325 writer = writer.pool(pool);
326 }
327
328 if let Some(table) = self.table {
329 writer = writer.table(table);
330 }
331
332 for column in self.columns {
333 writer = writer.add_column(column);
334 }
335
336 if let Some(binder) = self.sqlite_binder {
337 writer = writer.item_binder(binder);
338 }
339
340 writer
341 }
342}
343
344impl<'a, O> Default for RdbcItemWriterBuilder<'a, O> {
345 fn default() -> Self {
346 Self::new()
347 }
348}
349
350#[cfg(test)]
351mod tests {
352 use super::*;
353 use crate::core::item::ItemWriter;
354 use sqlx::query_builder::Separated;
355
356 struct NopBinder;
357 impl DatabaseItemBinder<String, Postgres> for NopBinder {
358 fn bind(&self, _: &String, _: Separated<Postgres, &str>) {}
359 }
360 impl DatabaseItemBinder<String, MySql> for NopBinder {
361 fn bind(&self, _: &String, _: Separated<MySql, &str>) {}
362 }
363 impl DatabaseItemBinder<String, Sqlite> for NopBinder {
364 fn bind(&self, _: &String, _: Separated<Sqlite, &str>) {}
365 }
366
367 #[test]
368 fn should_set_table_name_in_postgres_writer() {
369 let writer = RdbcItemWriterBuilder::<String>::new()
370 .table("users")
371 .build_postgres();
372 assert_eq!(writer.table, Some("users"));
373 }
374
375 #[test]
376 fn should_accumulate_columns_in_postgres_writer() {
377 let writer = RdbcItemWriterBuilder::<String>::new()
378 .add_column("id")
379 .add_column("name")
380 .build_postgres();
381 assert_eq!(writer.columns, vec!["id", "name"]);
382 }
383
384 #[test]
385 fn should_transfer_table_and_columns_to_mysql_writer() {
386 let writer = RdbcItemWriterBuilder::<String>::new()
387 .table("orders")
388 .add_column("order_id")
389 .add_column("total")
390 .build_mysql();
391 assert_eq!(writer.table, Some("orders"));
392 assert_eq!(writer.columns, vec!["order_id", "total"]);
393 }
394
395 #[test]
396 fn should_transfer_table_and_columns_to_sqlite_writer() {
397 use crate::BatchError;
398 // No pool configured → validate_config will fail on "pool", not on table/columns.
399 // If table or columns were missing the error would mention those instead,
400 // so reaching the "pool" error proves both were transferred correctly.
401 let binder = NopBinder;
402 let writer = RdbcItemWriterBuilder::<String>::new()
403 .table("items")
404 .add_column("sku")
405 .sqlite_binder(&binder)
406 .build_sqlite();
407 let result = writer.write(&["x".to_string()]);
408 match result.err().unwrap() {
409 BatchError::ItemWriter(msg) => assert!(
410 msg.contains("pool"),
411 "table and columns were set so error should be about pool, got: {msg}"
412 ),
413 e => panic!("expected ItemWriter, got {e:?}"),
414 }
415 }
416
417 #[test]
418 fn should_set_postgres_binder() {
419 let binder = NopBinder;
420 let writer = RdbcItemWriterBuilder::<String>::new()
421 .postgres_binder(&binder)
422 .build_postgres();
423 assert!(
424 writer.item_binder.is_some(),
425 "postgres binder should be set"
426 );
427 }
428
429 #[test]
430 fn should_set_mysql_binder() {
431 let binder = NopBinder;
432 let writer = RdbcItemWriterBuilder::<String>::new()
433 .mysql_binder(&binder)
434 .build_mysql();
435 assert!(writer.item_binder.is_some(), "mysql binder should be set");
436 }
437
438 #[test]
439 fn should_transfer_sqlite_binder_to_writer() {
440 use crate::BatchError;
441 // With binder set but no pool, write() should fail on "pool" not on "binder"
442 let binder = NopBinder;
443 let writer = RdbcItemWriterBuilder::<String>::new()
444 .table("t")
445 .add_column("v")
446 .sqlite_binder(&binder)
447 .build_sqlite();
448 let result = writer.write(&["x".to_string()]);
449 match result.err().unwrap() {
450 BatchError::ItemWriter(msg) => assert!(
451 msg.contains("pool"),
452 "binder was set so error should be about pool, got: {msg}"
453 ),
454 e => panic!("expected ItemWriter, got {e:?}"),
455 }
456 }
457
458 #[tokio::test(flavor = "multi_thread")]
459 async fn should_transfer_sqlite_pool_to_writer() {
460 use crate::BatchError;
461 let pool = sqlx::SqlitePool::connect("sqlite::memory:").await.unwrap();
462 // Pool set but no binder → error is "binder not configured"
463 let writer = RdbcItemWriterBuilder::<String>::new()
464 .sqlite(&pool)
465 .table("t")
466 .add_column("v")
467 .build_sqlite();
468 let result = writer.write(&["x".to_string()]);
469 match result.err().unwrap() {
470 BatchError::ItemWriter(msg) => assert!(
471 msg.contains("binder"),
472 "pool was set so error should be about binder, got: {msg}"
473 ),
474 e => panic!("expected ItemWriter, got {e:?}"),
475 }
476 }
477
478 #[test]
479 fn should_have_no_table_by_default_in_mysql_writer() {
480 let writer = RdbcItemWriterBuilder::<String>::new().build_mysql();
481 assert!(writer.table.is_none());
482 assert!(writer.columns.is_empty());
483 }
484
485 #[test]
486 fn should_create_via_default() {
487 let _b = RdbcItemWriterBuilder::<String>::default();
488 }
489}