Skip to main content

spring_batch_rs/item/rdbc/
unified_reader_builder.rs

1use sqlx::{
2    mysql::MySqlRow, postgres::PgRow, sqlite::SqliteRow, FromRow, MySql, Pool, Postgres, Sqlite,
3};
4use std::marker::PhantomData;
5
6use super::database_type::DatabaseType;
7use super::mysql_reader::MySqlRdbcItemReader;
8use super::postgres_reader::PostgresRdbcItemReader;
9use super::sqlite_reader::SqliteRdbcItemReader;
10
11/// Unified builder for creating RDBC item readers for any supported database type.
12///
13/// This builder provides a consistent API for constructing database readers
14/// regardless of the underlying database (PostgreSQL, MySQL, or SQLite).
15/// Users specify the database type and connection pool, and the builder
16/// handles the creation of the appropriate reader implementation.
17///
18/// # Type Parameters
19///
20/// * `I` - The item type that implements the appropriate `FromRow` trait for the database
21///
22/// # Examples
23///
24/// ## PostgreSQL
25/// ```no_run
26/// use spring_batch_rs::item::rdbc::{RdbcItemReaderBuilder, DatabaseType};
27/// use sqlx::PgPool;
28/// # use serde::Deserialize;
29/// # #[derive(sqlx::FromRow, Clone, Deserialize)]
30/// # struct User { id: i32, name: String }
31///
32/// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
33/// let pool = PgPool::connect("postgresql://user:pass@localhost/db").await?;
34///
35/// let reader = RdbcItemReaderBuilder::<User>::new()
36///     .postgres(pool)
37///     .query("SELECT id, name FROM users")
38///     .with_page_size(100)
39///     .build_postgres();
40/// # Ok(())
41/// # }
42/// ```
43///
44/// ## MySQL
45/// ```no_run
46/// use spring_batch_rs::item::rdbc::{RdbcItemReaderBuilder, DatabaseType};
47/// use sqlx::MySqlPool;
48/// # use serde::Deserialize;
49/// # #[derive(sqlx::FromRow, Clone, Deserialize)]
50/// # struct Product { id: i32, name: String }
51///
52/// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
53/// let pool = MySqlPool::connect("mysql://user:pass@localhost/db").await?;
54///
55/// let reader = RdbcItemReaderBuilder::<Product>::new()
56///     .mysql(pool)
57///     .query("SELECT id, name FROM products")
58///     .with_page_size(100)
59///     .build_mysql();
60/// # Ok(())
61/// # }
62/// ```
63///
64/// ## SQLite
65/// ```no_run
66/// use spring_batch_rs::item::rdbc::{RdbcItemReaderBuilder, DatabaseType};
67/// use sqlx::SqlitePool;
68/// # use serde::Deserialize;
69/// # #[derive(sqlx::FromRow, Clone, Deserialize)]
70/// # struct Task { id: i32, title: String }
71///
72/// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
73/// let pool = SqlitePool::connect("sqlite::memory:").await?;
74///
75/// let reader = RdbcItemReaderBuilder::<Task>::new()
76///     .sqlite(pool)
77///     .query("SELECT id, title FROM tasks")
78///     .with_page_size(100)
79///     .build_sqlite();
80/// # Ok(())
81/// # }
82/// ```
83pub struct RdbcItemReaderBuilder<'a, I> {
84    postgres_pool: Option<Pool<Postgres>>,
85    mysql_pool: Option<Pool<MySql>>,
86    sqlite_pool: Option<Pool<Sqlite>>,
87    query: Option<&'a str>,
88    page_size: Option<i32>,
89    db_type: Option<DatabaseType>,
90    _phantom: PhantomData<I>,
91}
92
93impl<'a, I> RdbcItemReaderBuilder<'a, I> {
94    /// Creates a new unified reader builder with default configuration.
95    pub fn new() -> Self {
96        Self {
97            postgres_pool: None,
98            mysql_pool: None,
99            sqlite_pool: None,
100            query: None,
101            page_size: None,
102            db_type: None,
103            _phantom: PhantomData,
104        }
105    }
106
107    /// Sets the PostgreSQL connection pool and database type.
108    ///
109    /// # Arguments
110    /// * `pool` - The PostgreSQL connection pool
111    ///
112    /// # Returns
113    /// The updated builder instance configured for PostgreSQL
114    pub fn postgres(mut self, pool: Pool<Postgres>) -> Self {
115        self.postgres_pool = Some(pool);
116        self.db_type = Some(DatabaseType::Postgres);
117        self
118    }
119
120    /// Sets the MySQL connection pool and database type.
121    ///
122    /// # Arguments
123    /// * `pool` - The MySQL connection pool
124    ///
125    /// # Returns
126    /// The updated builder instance configured for MySQL
127    pub fn mysql(mut self, pool: Pool<MySql>) -> Self {
128        self.mysql_pool = Some(pool);
129        self.db_type = Some(DatabaseType::MySql);
130        self
131    }
132
133    /// Sets the SQLite connection pool and database type.
134    ///
135    /// # Arguments
136    /// * `pool` - The SQLite connection pool
137    ///
138    /// # Returns
139    /// The updated builder instance configured for SQLite
140    pub fn sqlite(mut self, pool: Pool<Sqlite>) -> Self {
141        self.sqlite_pool = Some(pool);
142        self.db_type = Some(DatabaseType::Sqlite);
143        self
144    }
145
146    /// Sets the SQL query for the reader.
147    ///
148    /// The query should not include LIMIT/OFFSET clauses as these are handled
149    /// automatically when page_size is configured.
150    ///
151    /// # Arguments
152    /// * `query` - The SQL query to execute
153    ///
154    /// # Returns
155    /// The updated builder instance
156    pub fn query(mut self, query: &'a str) -> Self {
157        self.query = Some(query);
158        self
159    }
160
161    /// Sets the page size for paginated reading.
162    ///
163    /// When set, the reader will fetch data in chunks of this size to manage
164    /// memory usage efficiently.
165    ///
166    /// # Arguments
167    /// * `page_size` - Number of items to read per page
168    ///
169    /// # Returns
170    /// The updated builder instance
171    pub fn with_page_size(mut self, page_size: i32) -> Self {
172        self.page_size = Some(page_size);
173        self
174    }
175}
176
177impl<'a, I> RdbcItemReaderBuilder<'a, I>
178where
179    for<'r> I: FromRow<'r, PgRow> + Send + Unpin + Clone,
180{
181    /// Builds a PostgreSQL reader.
182    ///
183    /// # Returns
184    /// A configured `PostgresRdbcItemReader` instance
185    ///
186    /// # Panics
187    /// Panics if PostgreSQL pool or query are missing
188    pub fn build_postgres(self) -> PostgresRdbcItemReader<'a, I> {
189        PostgresRdbcItemReader::new(
190            self.postgres_pool.expect("PostgreSQL pool is required"),
191            self.query.expect("Query is required"),
192            self.page_size,
193        )
194    }
195}
196
197impl<'a, I> RdbcItemReaderBuilder<'a, I>
198where
199    for<'r> I: FromRow<'r, MySqlRow> + Send + Unpin + Clone,
200{
201    /// Builds a MySQL reader.
202    ///
203    /// # Returns
204    /// A configured `MySqlRdbcItemReader` instance
205    ///
206    /// # Panics
207    /// Panics if MySQL pool or query are missing
208    pub fn build_mysql(self) -> MySqlRdbcItemReader<'a, I> {
209        MySqlRdbcItemReader::new(
210            self.mysql_pool.expect("MySQL pool is required"),
211            self.query.expect("Query is required"),
212            self.page_size,
213        )
214    }
215}
216
217impl<'a, I> RdbcItemReaderBuilder<'a, I>
218where
219    for<'r> I: FromRow<'r, SqliteRow> + Send + Unpin + Clone,
220{
221    /// Builds a SQLite reader.
222    ///
223    /// # Returns
224    /// A configured `SqliteRdbcItemReader` instance
225    ///
226    /// # Panics
227    /// Panics if SQLite pool or query are missing
228    pub fn build_sqlite(self) -> SqliteRdbcItemReader<'a, I> {
229        SqliteRdbcItemReader::new(
230            self.sqlite_pool.expect("SQLite pool is required"),
231            self.query.expect("Query is required"),
232            self.page_size,
233        )
234    }
235}
236
237impl<'a, I> Default for RdbcItemReaderBuilder<'a, I> {
238    fn default() -> Self {
239        Self::new()
240    }
241}
242
243#[cfg(test)]
244mod tests {
245    use super::*;
246    use sqlx::{FromRow, SqlitePool};
247
248    #[derive(Clone, FromRow)]
249    struct Dummy {
250        id: i32,
251    }
252
253    #[test]
254    fn should_create_via_default() {
255        // Default == new(), both should produce identical builders
256        let _b = RdbcItemReaderBuilder::<Dummy>::default();
257    }
258
259    #[test]
260    #[should_panic(expected = "SQLite pool is required")]
261    fn should_panic_when_building_sqlite_without_pool() {
262        let _ = RdbcItemReaderBuilder::<Dummy>::new()
263            .query("SELECT id FROM t")
264            .build_sqlite();
265    }
266
267    #[tokio::test(flavor = "multi_thread")]
268    async fn should_build_sqlite_reader_with_pool_and_query() {
269        let pool = SqlitePool::connect("sqlite::memory:").await.unwrap();
270        let reader = RdbcItemReaderBuilder::<Dummy>::new()
271            .sqlite(pool)
272            .query("SELECT 1 AS id")
273            .build_sqlite();
274        assert_eq!(reader.query, "SELECT 1 AS id");
275        assert_eq!(reader.page_size, None);
276        assert_eq!(reader.offset.get(), 0);
277    }
278
279    #[tokio::test(flavor = "multi_thread")]
280    async fn should_propagate_page_size_to_sqlite_reader() {
281        let pool = SqlitePool::connect("sqlite::memory:").await.unwrap();
282        let reader = RdbcItemReaderBuilder::<Dummy>::new()
283            .sqlite(pool)
284            .query("SELECT 1 AS id")
285            .with_page_size(25)
286            .build_sqlite();
287        assert_eq!(reader.page_size, Some(25));
288    }
289
290    #[test]
291    #[should_panic(expected = "PostgreSQL pool is required")]
292    fn should_panic_when_building_postgres_without_pool() {
293        let _ = RdbcItemReaderBuilder::<Dummy>::new()
294            .query("SELECT id FROM t")
295            .build_postgres();
296    }
297
298    #[test]
299    #[should_panic(expected = "MySQL pool is required")]
300    fn should_panic_when_building_mysql_without_pool() {
301        let _ = RdbcItemReaderBuilder::<Dummy>::new()
302            .query("SELECT id FROM t")
303            .build_mysql();
304    }
305}