Skip to main content

spring_batch_rs/item/rdbc/
unified_writer_builder.rs

1use sqlx::{MySql, Pool, Postgres, Sqlite};
2
3use super::DatabaseItemBinder;
4use super::database_type::DatabaseType;
5use super::mysql_writer::MySqlItemWriter;
6use super::postgres_writer::PostgresItemWriter;
7use super::sqlite_writer::SqliteItemWriter;
8
9/// Unified builder for creating RDBC item writers for any supported database type.
10///
11/// This builder provides a consistent API for constructing database writers
12/// regardless of the underlying database (PostgreSQL, MySQL, or SQLite).
13/// Users specify the database type, connection pool, table, and columns,
14/// and the builder handles the creation of the appropriate writer implementation.
15///
16/// # Type Parameters
17///
18/// * `O` - The item type to write
19///
20/// # Examples
21///
22/// ## PostgreSQL
23/// ```no_run
24/// use spring_batch_rs::item::rdbc::{RdbcItemWriterBuilder, DatabaseItemBinder};
25/// use sqlx::{PgPool, query_builder::Separated, Postgres};
26/// use serde::Serialize;
27///
28/// #[derive(Clone, Serialize)]
29/// struct User {
30///     id: i32,
31///     name: String,
32/// }
33///
34/// struct UserBinder;
35/// impl DatabaseItemBinder<User, Postgres> for UserBinder {
36///     fn bind(&self, item: &User, mut query_builder: Separated<Postgres, &str>) {
37///         let _ = (item, query_builder); // Placeholder to avoid unused warnings
38///         // In real usage: query_builder.push_bind(item.id);
39///         // In real usage: query_builder.push_bind(&item.name);
40///     }
41/// }
42///
43/// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
44/// let pool = PgPool::connect("postgresql://user:pass@localhost/db").await?;
45/// let binder = UserBinder;
46///
47/// let writer = RdbcItemWriterBuilder::<User>::new()
48///     .postgres(&pool)
49///     .table("users")
50///     .add_column("id")
51///     .add_column("name")
52///     .postgres_binder(&binder)
53///     .build_postgres();
54/// # Ok(())
55/// # }
56/// ```
57///
58/// ## MySQL
59/// ```no_run
60/// use spring_batch_rs::item::rdbc::{RdbcItemWriterBuilder, DatabaseItemBinder};
61/// use sqlx::{MySqlPool, query_builder::Separated, MySql};
62/// use serde::Serialize;
63///
64/// #[derive(Clone, Serialize)]
65/// struct Product {
66///     id: i32,
67///     name: String,
68/// }
69///
70/// struct ProductBinder;
71/// impl DatabaseItemBinder<Product, MySql> for ProductBinder {
72///     fn bind(&self, item: &Product, mut query_builder: Separated<MySql, &str>) {
73///         let _ = (item, query_builder); // Placeholder to avoid unused warnings
74///         // In real usage: query_builder.push_bind(item.id);
75///         // In real usage: query_builder.push_bind(&item.name);
76///     }
77/// }
78///
79/// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
80/// let pool = MySqlPool::connect("mysql://user:pass@localhost/db").await?;
81/// let binder = ProductBinder;
82///
83/// let writer = RdbcItemWriterBuilder::<Product>::new()
84///     .mysql(&pool)
85///     .table("products")
86///     .add_column("id")
87///     .add_column("name")
88///     .mysql_binder(&binder)
89///     .build_mysql();
90/// # Ok(())
91/// # }
92/// ```
93///
94/// ## SQLite
95/// ```no_run
96/// use spring_batch_rs::item::rdbc::{RdbcItemWriterBuilder, DatabaseItemBinder};
97/// use sqlx::{SqlitePool, query_builder::Separated, Sqlite};
98/// use serde::Serialize;
99///
100/// #[derive(Clone, Serialize)]
101/// struct Task {
102///     id: i32,
103///     title: String,
104/// }
105///
106/// struct TaskBinder;
107/// impl DatabaseItemBinder<Task, Sqlite> for TaskBinder {
108///     fn bind(&self, item: &Task, mut query_builder: Separated<Sqlite, &str>) {
109///         let _ = (item, query_builder); // Placeholder to avoid unused warnings
110///         // In real usage: query_builder.push_bind(item.id);
111///         // In real usage: query_builder.push_bind(&item.title);
112///     }
113/// }
114///
115/// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
116/// let pool = SqlitePool::connect("sqlite::memory:").await?;
117/// let binder = TaskBinder;
118///
119/// let writer = RdbcItemWriterBuilder::<Task>::new()
120///     .sqlite(&pool)
121///     .table("tasks")
122///     .add_column("id")
123///     .add_column("title")
124///     .sqlite_binder(&binder)
125///     .build_sqlite();
126/// # Ok(())
127/// # }
128/// ```
129pub struct RdbcItemWriterBuilder<'a, O> {
130    postgres_pool: Option<&'a Pool<Postgres>>,
131    mysql_pool: Option<&'a Pool<MySql>>,
132    sqlite_pool: Option<&'a Pool<Sqlite>>,
133    table: Option<&'a str>,
134    columns: Vec<&'a str>,
135    postgres_binder: Option<&'a dyn DatabaseItemBinder<O, Postgres>>,
136    mysql_binder: Option<&'a dyn DatabaseItemBinder<O, MySql>>,
137    sqlite_binder: Option<&'a dyn DatabaseItemBinder<O, Sqlite>>,
138    db_type: Option<DatabaseType>,
139}
140
141impl<'a, O> RdbcItemWriterBuilder<'a, O> {
142    /// Creates a new unified writer builder with default configuration.
143    pub fn new() -> Self {
144        Self {
145            postgres_pool: None,
146            mysql_pool: None,
147            sqlite_pool: None,
148            table: None,
149            columns: Vec::new(),
150            postgres_binder: None,
151            mysql_binder: None,
152            sqlite_binder: None,
153            db_type: None,
154        }
155    }
156
157    /// Sets the PostgreSQL connection pool and database type.
158    ///
159    /// # Arguments
160    /// * `pool` - The PostgreSQL connection pool
161    ///
162    /// # Returns
163    /// The updated builder instance configured for PostgreSQL
164    pub fn postgres(mut self, pool: &'a Pool<Postgres>) -> Self {
165        self.postgres_pool = Some(pool);
166        self.db_type = Some(DatabaseType::Postgres);
167        self
168    }
169
170    /// Sets the MySQL connection pool and database type.
171    ///
172    /// # Arguments
173    /// * `pool` - The MySQL connection pool
174    ///
175    /// # Returns
176    /// The updated builder instance configured for MySQL
177    pub fn mysql(mut self, pool: &'a Pool<MySql>) -> Self {
178        self.mysql_pool = Some(pool);
179        self.db_type = Some(DatabaseType::MySql);
180        self
181    }
182
183    /// Sets the SQLite connection pool and database type.
184    ///
185    /// # Arguments
186    /// * `pool` - The SQLite connection pool
187    ///
188    /// # Returns
189    /// The updated builder instance configured for SQLite
190    pub fn sqlite(mut self, pool: &'a Pool<Sqlite>) -> Self {
191        self.sqlite_pool = Some(pool);
192        self.db_type = Some(DatabaseType::Sqlite);
193        self
194    }
195
196    /// Sets the table name for the writer.
197    ///
198    /// # Arguments
199    /// * `table` - The database table name
200    ///
201    /// # Returns
202    /// The updated builder instance
203    pub fn table(mut self, table: &'a str) -> Self {
204        self.table = Some(table);
205        self
206    }
207
208    /// Adds a column to the writer.
209    ///
210    /// # Arguments
211    /// * `column` - The column name
212    ///
213    /// # Returns
214    /// The updated builder instance
215    pub fn add_column(mut self, column: &'a str) -> Self {
216        self.columns.push(column);
217        self
218    }
219
220    /// Sets the item binder for PostgreSQL.
221    ///
222    /// # Arguments
223    /// * `binder` - The PostgreSQL-specific item binder
224    ///
225    /// # Returns
226    /// The updated builder instance
227    pub fn postgres_binder(mut self, binder: &'a dyn DatabaseItemBinder<O, Postgres>) -> Self {
228        self.postgres_binder = Some(binder);
229        self
230    }
231
232    /// Sets the item binder for MySQL.
233    ///
234    /// # Arguments
235    /// * `binder` - The MySQL-specific item binder
236    ///
237    /// # Returns
238    /// The updated builder instance
239    pub fn mysql_binder(mut self, binder: &'a dyn DatabaseItemBinder<O, MySql>) -> Self {
240        self.mysql_binder = Some(binder);
241        self
242    }
243
244    /// Sets the item binder for SQLite.
245    ///
246    /// # Arguments
247    /// * `binder` - The SQLite-specific item binder
248    ///
249    /// # Returns
250    /// The updated builder instance
251    pub fn sqlite_binder(mut self, binder: &'a dyn DatabaseItemBinder<O, Sqlite>) -> Self {
252        self.sqlite_binder = Some(binder);
253        self
254    }
255
256    /// Builds a PostgreSQL writer.
257    ///
258    /// # Returns
259    /// A configured `PostgresItemWriter` instance
260    ///
261    /// # Panics
262    /// Panics if required PostgreSQL-specific configuration is missing
263    pub fn build_postgres(self) -> PostgresItemWriter<'a, O> {
264        let mut writer = PostgresItemWriter::new();
265
266        if let Some(pool) = self.postgres_pool {
267            writer = writer.pool(pool);
268        }
269
270        if let Some(table) = self.table {
271            writer = writer.table(table);
272        }
273
274        for column in self.columns {
275            writer = writer.add_column(column);
276        }
277
278        if let Some(binder) = self.postgres_binder {
279            writer = writer.item_binder(binder);
280        }
281
282        writer
283    }
284
285    /// Builds a MySQL writer.
286    ///
287    /// # Returns
288    /// A configured `MySqlItemWriter` instance
289    ///
290    /// # Panics
291    /// Panics if required MySQL-specific configuration is missing
292    pub fn build_mysql(self) -> MySqlItemWriter<'a, O> {
293        let mut writer = MySqlItemWriter::new();
294
295        if let Some(pool) = self.mysql_pool {
296            writer = writer.pool(pool);
297        }
298
299        if let Some(table) = self.table {
300            writer = writer.table(table);
301        }
302
303        for column in self.columns {
304            writer = writer.add_column(column);
305        }
306
307        if let Some(binder) = self.mysql_binder {
308            writer = writer.item_binder(binder);
309        }
310
311        writer
312    }
313
314    /// Builds a SQLite writer.
315    ///
316    /// # Returns
317    /// A configured `SqliteItemWriter` instance
318    ///
319    /// # Panics
320    /// Panics if required SQLite-specific configuration is missing
321    pub fn build_sqlite(self) -> SqliteItemWriter<'a, O> {
322        let mut writer = SqliteItemWriter::new();
323
324        if let Some(pool) = self.sqlite_pool {
325            writer = writer.pool(pool);
326        }
327
328        if let Some(table) = self.table {
329            writer = writer.table(table);
330        }
331
332        for column in self.columns {
333            writer = writer.add_column(column);
334        }
335
336        if let Some(binder) = self.sqlite_binder {
337            writer = writer.item_binder(binder);
338        }
339
340        writer
341    }
342}
343
344impl<'a, O> Default for RdbcItemWriterBuilder<'a, O> {
345    fn default() -> Self {
346        Self::new()
347    }
348}
349
350#[cfg(test)]
351mod tests {
352    use super::*;
353    use crate::core::item::ItemWriter;
354    use sqlx::query_builder::Separated;
355
356    struct NopBinder;
357    impl DatabaseItemBinder<String, Postgres> for NopBinder {
358        fn bind(&self, _: &String, _: Separated<Postgres, &str>) {}
359    }
360    impl DatabaseItemBinder<String, MySql> for NopBinder {
361        fn bind(&self, _: &String, _: Separated<MySql, &str>) {}
362    }
363    impl DatabaseItemBinder<String, Sqlite> for NopBinder {
364        fn bind(&self, _: &String, _: Separated<Sqlite, &str>) {}
365    }
366
367    #[test]
368    fn should_set_table_name_in_postgres_writer() {
369        let writer = RdbcItemWriterBuilder::<String>::new()
370            .table("users")
371            .build_postgres();
372        assert_eq!(writer.table, Some("users"));
373    }
374
375    #[test]
376    fn should_accumulate_columns_in_postgres_writer() {
377        let writer = RdbcItemWriterBuilder::<String>::new()
378            .add_column("id")
379            .add_column("name")
380            .build_postgres();
381        assert_eq!(writer.columns, vec!["id", "name"]);
382    }
383
384    #[test]
385    fn should_transfer_table_and_columns_to_mysql_writer() {
386        let writer = RdbcItemWriterBuilder::<String>::new()
387            .table("orders")
388            .add_column("order_id")
389            .add_column("total")
390            .build_mysql();
391        assert_eq!(writer.table, Some("orders"));
392        assert_eq!(writer.columns, vec!["order_id", "total"]);
393    }
394
395    #[test]
396    fn should_transfer_table_and_columns_to_sqlite_writer() {
397        use crate::BatchError;
398        // No pool configured → validate_config will fail on "pool", not on table/columns.
399        // If table or columns were missing the error would mention those instead,
400        // so reaching the "pool" error proves both were transferred correctly.
401        let binder = NopBinder;
402        let writer = RdbcItemWriterBuilder::<String>::new()
403            .table("items")
404            .add_column("sku")
405            .sqlite_binder(&binder)
406            .build_sqlite();
407        let result = writer.write(&["x".to_string()]);
408        match result.err().unwrap() {
409            BatchError::ItemWriter(msg) => assert!(
410                msg.contains("pool"),
411                "table and columns were set so error should be about pool, got: {msg}"
412            ),
413            e => panic!("expected ItemWriter, got {e:?}"),
414        }
415    }
416
417    #[test]
418    fn should_set_postgres_binder() {
419        let binder = NopBinder;
420        let writer = RdbcItemWriterBuilder::<String>::new()
421            .postgres_binder(&binder)
422            .build_postgres();
423        assert!(
424            writer.item_binder.is_some(),
425            "postgres binder should be set"
426        );
427    }
428
429    #[test]
430    fn should_set_mysql_binder() {
431        let binder = NopBinder;
432        let writer = RdbcItemWriterBuilder::<String>::new()
433            .mysql_binder(&binder)
434            .build_mysql();
435        assert!(writer.item_binder.is_some(), "mysql binder should be set");
436    }
437
438    #[test]
439    fn should_transfer_sqlite_binder_to_writer() {
440        use crate::BatchError;
441        // With binder set but no pool, write() should fail on "pool" not on "binder"
442        let binder = NopBinder;
443        let writer = RdbcItemWriterBuilder::<String>::new()
444            .table("t")
445            .add_column("v")
446            .sqlite_binder(&binder)
447            .build_sqlite();
448        let result = writer.write(&["x".to_string()]);
449        match result.err().unwrap() {
450            BatchError::ItemWriter(msg) => assert!(
451                msg.contains("pool"),
452                "binder was set so error should be about pool, got: {msg}"
453            ),
454            e => panic!("expected ItemWriter, got {e:?}"),
455        }
456    }
457
458    #[tokio::test(flavor = "multi_thread")]
459    async fn should_transfer_sqlite_pool_to_writer() {
460        use crate::BatchError;
461        let pool = sqlx::SqlitePool::connect("sqlite::memory:").await.unwrap();
462        // Pool set but no binder → error is "binder not configured"
463        let writer = RdbcItemWriterBuilder::<String>::new()
464            .sqlite(&pool)
465            .table("t")
466            .add_column("v")
467            .build_sqlite();
468        let result = writer.write(&["x".to_string()]);
469        match result.err().unwrap() {
470            BatchError::ItemWriter(msg) => assert!(
471                msg.contains("binder"),
472                "pool was set so error should be about binder, got: {msg}"
473            ),
474            e => panic!("expected ItemWriter, got {e:?}"),
475        }
476    }
477
478    #[test]
479    fn should_have_no_table_by_default_in_mysql_writer() {
480        let writer = RdbcItemWriterBuilder::<String>::new().build_mysql();
481        assert!(writer.table.is_none());
482        assert!(writer.columns.is_empty());
483    }
484
485    #[test]
486    fn should_create_via_default() {
487        let _b = RdbcItemWriterBuilder::<String>::default();
488    }
489}