Skip to main content

spring_batch_rs/item/rdbc/
unified_reader_builder.rs

1use sqlx::{
2    FromRow, MySql, Pool, Postgres, Sqlite, mysql::MySqlRow, postgres::PgRow, sqlite::SqliteRow,
3};
4use std::marker::PhantomData;
5
6use super::database_type::DatabaseType;
7use super::mysql_reader::MySqlRdbcItemReader;
8use super::postgres_reader::PostgresRdbcItemReader;
9use super::sqlite_reader::SqliteRdbcItemReader;
10
11/// Unified builder for creating RDBC item readers for any supported database type.
12///
13/// This builder provides a consistent API for constructing database readers
14/// regardless of the underlying database (PostgreSQL, MySQL, or SQLite).
15/// Users specify the database type and connection pool, and the builder
16/// handles the creation of the appropriate reader implementation.
17///
18/// # Type Parameters
19///
20/// * `I` - The item type that implements the appropriate `FromRow` trait for the database
21///
22/// # Examples
23///
24/// ## PostgreSQL
25/// ```no_run
26/// use spring_batch_rs::item::rdbc::{RdbcItemReaderBuilder, DatabaseType};
27/// use sqlx::PgPool;
28/// # use serde::Deserialize;
29/// # #[derive(sqlx::FromRow, Clone, Deserialize)]
30/// # struct User { id: i32, name: String }
31///
32/// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
33/// let pool = PgPool::connect("postgresql://user:pass@localhost/db").await?;
34///
35/// let reader = RdbcItemReaderBuilder::<User>::new()
36///     .postgres(pool)
37///     .query("SELECT id, name FROM users")
38///     .with_page_size(100)
39///     .build_postgres();
40/// # Ok(())
41/// # }
42/// ```
43///
44/// ## MySQL
45/// ```no_run
46/// use spring_batch_rs::item::rdbc::{RdbcItemReaderBuilder, DatabaseType};
47/// use sqlx::MySqlPool;
48/// # use serde::Deserialize;
49/// # #[derive(sqlx::FromRow, Clone, Deserialize)]
50/// # struct Product { id: i32, name: String }
51///
52/// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
53/// let pool = MySqlPool::connect("mysql://user:pass@localhost/db").await?;
54///
55/// let reader = RdbcItemReaderBuilder::<Product>::new()
56///     .mysql(pool)
57///     .query("SELECT id, name FROM products")
58///     .with_page_size(100)
59///     .build_mysql();
60/// # Ok(())
61/// # }
62/// ```
63///
64/// ## SQLite
65/// ```no_run
66/// use spring_batch_rs::item::rdbc::{RdbcItemReaderBuilder, DatabaseType};
67/// use sqlx::SqlitePool;
68/// # use serde::Deserialize;
69/// # #[derive(sqlx::FromRow, Clone, Deserialize)]
70/// # struct Task { id: i32, title: String }
71///
72/// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
73/// let pool = SqlitePool::connect("sqlite::memory:").await?;
74///
75/// let reader = RdbcItemReaderBuilder::<Task>::new()
76///     .sqlite(pool)
77///     .query("SELECT id, title FROM tasks")
78///     .with_page_size(100)
79///     .build_sqlite();
80/// # Ok(())
81/// # }
82/// ```
83pub struct RdbcItemReaderBuilder<'a, I> {
84    postgres_pool: Option<Pool<Postgres>>,
85    mysql_pool: Option<Pool<MySql>>,
86    sqlite_pool: Option<Pool<Sqlite>>,
87    query: Option<&'a str>,
88    page_size: Option<i32>,
89    db_type: Option<DatabaseType>,
90    keyset_column: Option<String>,
91    #[allow(clippy::type_complexity)]
92    keyset_key_fn: Option<Box<dyn Fn(&I) -> String>>,
93    _phantom: PhantomData<I>,
94}
95
96impl<'a, I> RdbcItemReaderBuilder<'a, I> {
97    /// Creates a new unified reader builder with default configuration.
98    pub fn new() -> Self {
99        Self {
100            postgres_pool: None,
101            mysql_pool: None,
102            sqlite_pool: None,
103            query: None,
104            page_size: None,
105            db_type: None,
106            keyset_column: None,
107            keyset_key_fn: None,
108            _phantom: PhantomData,
109        }
110    }
111
112    /// Sets the PostgreSQL connection pool and database type.
113    ///
114    /// # Arguments
115    /// * `pool` - The PostgreSQL connection pool
116    ///
117    /// # Returns
118    /// The updated builder instance configured for PostgreSQL
119    pub fn postgres(mut self, pool: Pool<Postgres>) -> Self {
120        self.postgres_pool = Some(pool);
121        self.db_type = Some(DatabaseType::Postgres);
122        self
123    }
124
125    /// Sets the MySQL connection pool and database type.
126    ///
127    /// # Arguments
128    /// * `pool` - The MySQL connection pool
129    ///
130    /// # Returns
131    /// The updated builder instance configured for MySQL
132    pub fn mysql(mut self, pool: Pool<MySql>) -> Self {
133        self.mysql_pool = Some(pool);
134        self.db_type = Some(DatabaseType::MySql);
135        self
136    }
137
138    /// Sets the SQLite connection pool and database type.
139    ///
140    /// # Arguments
141    /// * `pool` - The SQLite connection pool
142    ///
143    /// # Returns
144    /// The updated builder instance configured for SQLite
145    pub fn sqlite(mut self, pool: Pool<Sqlite>) -> Self {
146        self.sqlite_pool = Some(pool);
147        self.db_type = Some(DatabaseType::Sqlite);
148        self
149    }
150
151    /// Sets the SQL query for the reader.
152    ///
153    /// The query should not include LIMIT/OFFSET clauses as these are handled
154    /// automatically when page_size is configured.
155    ///
156    /// # Arguments
157    /// * `query` - The SQL query to execute
158    ///
159    /// # Returns
160    /// The updated builder instance
161    pub fn query(mut self, query: &'a str) -> Self {
162        self.query = Some(query);
163        self
164    }
165
166    /// Sets the page size for paginated reading.
167    ///
168    /// When set, the reader will fetch data in chunks of this size to manage
169    /// memory usage efficiently.
170    ///
171    /// # Arguments
172    /// * `page_size` - Number of items to read per page
173    ///
174    /// # Returns
175    /// The updated builder instance
176    pub fn with_page_size(mut self, page_size: i32) -> Self {
177        self.page_size = Some(page_size);
178        self
179    }
180
181    /// Enables keyset (cursor) pagination instead of LIMIT/OFFSET.
182    ///
183    /// Keyset pagination is O(log n) per page regardless of dataset size, making it
184    /// significantly faster than LIMIT/OFFSET for large tables.
185    ///
186    /// # Requirements
187    ///
188    /// - The query must **not** include `WHERE`, `ORDER BY`, or `LIMIT` clauses — the
189    ///   framework appends them automatically.
190    /// - The keyset column must be indexed and have unique, sortable values (e.g.
191    ///   primary key, UUID, zero-padded string ID).
192    /// - [`with_page_size`] must also be set.
193    ///
194    /// # Arguments
195    ///
196    /// * `column` - Column name used as the cursor (appended to `WHERE` and `ORDER BY`).
197    /// * `key_fn` - Closure that extracts the cursor value from an item as a `String`.
198    ///
199    /// # Examples
200    ///
201    /// ```no_run
202    /// use spring_batch_rs::item::rdbc::RdbcItemReaderBuilder;
203    /// use sqlx::PgPool;
204    /// # use serde::Deserialize;
205    /// # #[derive(sqlx::FromRow, Clone, Deserialize)]
206    /// # struct Order { order_id: String, amount: f64 }
207    ///
208    /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
209    /// let pool = PgPool::connect("postgresql://user:pass@localhost/db").await?;
210    ///
211    /// let reader = RdbcItemReaderBuilder::<Order>::new()
212    ///     .postgres(pool)
213    ///     .query("SELECT order_id, amount FROM orders")
214    ///     .with_page_size(1_000)
215    ///     .with_keyset("order_id", |o: &Order| o.order_id.clone())
216    ///     .build_postgres();
217    /// # Ok(())
218    /// # }
219    /// ```
220    pub fn with_keyset(mut self, column: &str, key_fn: impl Fn(&I) -> String + 'static) -> Self {
221        self.keyset_column = Some(column.to_string());
222        self.keyset_key_fn = Some(Box::new(key_fn));
223        self
224    }
225}
226
227impl<'a, I> RdbcItemReaderBuilder<'a, I>
228where
229    for<'r> I: FromRow<'r, PgRow> + Send + Unpin + Clone,
230{
231    /// Builds a PostgreSQL reader.
232    ///
233    /// # Returns
234    /// A configured `PostgresRdbcItemReader` instance
235    ///
236    /// # Panics
237    /// Panics if PostgreSQL pool or query are missing
238    pub fn build_postgres(self) -> PostgresRdbcItemReader<'a, I> {
239        PostgresRdbcItemReader::new(
240            self.postgres_pool.expect("PostgreSQL pool is required"),
241            self.query.expect("Query is required"),
242            self.page_size,
243            self.keyset_column,
244            self.keyset_key_fn,
245        )
246    }
247}
248
249impl<'a, I> RdbcItemReaderBuilder<'a, I>
250where
251    for<'r> I: FromRow<'r, MySqlRow> + Send + Unpin + Clone,
252{
253    /// Builds a MySQL reader.
254    ///
255    /// # Returns
256    /// A configured `MySqlRdbcItemReader` instance
257    ///
258    /// # Panics
259    /// Panics if MySQL pool or query are missing
260    pub fn build_mysql(self) -> MySqlRdbcItemReader<'a, I> {
261        MySqlRdbcItemReader::new(
262            self.mysql_pool.expect("MySQL pool is required"),
263            self.query.expect("Query is required"),
264            self.page_size,
265            self.keyset_column,
266            self.keyset_key_fn,
267        )
268    }
269}
270
271impl<'a, I> RdbcItemReaderBuilder<'a, I>
272where
273    for<'r> I: FromRow<'r, SqliteRow> + Send + Unpin + Clone,
274{
275    /// Builds a SQLite reader.
276    ///
277    /// # Returns
278    /// A configured `SqliteRdbcItemReader` instance
279    ///
280    /// # Panics
281    /// Panics if SQLite pool or query are missing
282    pub fn build_sqlite(self) -> SqliteRdbcItemReader<'a, I> {
283        SqliteRdbcItemReader::new(
284            self.sqlite_pool.expect("SQLite pool is required"),
285            self.query.expect("Query is required"),
286            self.page_size,
287            self.keyset_column,
288            self.keyset_key_fn,
289        )
290    }
291}
292
293impl<'a, I> Default for RdbcItemReaderBuilder<'a, I> {
294    fn default() -> Self {
295        Self::new()
296    }
297}
298
#[cfg(test)]
mod tests {
    use super::*;
    use sqlx::{FromRow, SqlitePool};

    #[derive(Clone, FromRow)]
    struct Dummy {
        id: i32,
    }

    #[test]
    fn should_create_via_default() {
        // Default delegates to new(), so every setting must start out unset.
        let b = RdbcItemReaderBuilder::<Dummy>::default();
        assert!(b.postgres_pool.is_none(), "no PostgreSQL pool by default");
        assert!(b.mysql_pool.is_none(), "no MySQL pool by default");
        assert!(b.sqlite_pool.is_none(), "no SQLite pool by default");
        assert!(b.query.is_none(), "no query by default");
        assert!(b.page_size.is_none(), "no page size by default");
        assert!(b.db_type.is_none(), "no database type by default");
        assert!(b.keyset_column.is_none(), "no keyset column by default");
        assert!(b.keyset_key_fn.is_none(), "no keyset key fn by default");
    }

    #[test]
    #[should_panic(expected = "SQLite pool is required")]
    fn should_panic_when_building_sqlite_without_pool() {
        let _ = RdbcItemReaderBuilder::<Dummy>::new()
            .query("SELECT id FROM t")
            .build_sqlite();
    }

    #[tokio::test(flavor = "multi_thread")]
    async fn should_record_database_type_when_setting_sqlite_pool() {
        let pool = SqlitePool::connect("sqlite::memory:").await.unwrap();
        let builder = RdbcItemReaderBuilder::<Dummy>::new().sqlite(pool);
        assert!(builder.sqlite_pool.is_some(), "sqlite pool should be stored");
        assert!(
            matches!(builder.db_type, Some(DatabaseType::Sqlite)),
            "sqlite() should record the SQLite database type"
        );
    }

    #[tokio::test(flavor = "multi_thread")]
    async fn should_build_sqlite_reader_with_pool_and_query() {
        let pool = SqlitePool::connect("sqlite::memory:").await.unwrap();
        let reader = RdbcItemReaderBuilder::<Dummy>::new()
            .sqlite(pool)
            .query("SELECT 1 AS id")
            .build_sqlite();
        assert_eq!(reader.query, "SELECT 1 AS id");
        assert_eq!(reader.page_size, None);
        assert_eq!(reader.offset.get(), 0);
        assert!(
            reader.keyset_column.is_none(),
            "keyset pagination is off unless with_keyset is called"
        );
    }

    #[tokio::test(flavor = "multi_thread")]
    async fn should_propagate_page_size_to_sqlite_reader() {
        let pool = SqlitePool::connect("sqlite::memory:").await.unwrap();
        let reader = RdbcItemReaderBuilder::<Dummy>::new()
            .sqlite(pool)
            .query("SELECT 1 AS id")
            .with_page_size(25)
            .build_sqlite();
        assert_eq!(reader.page_size, Some(25));
    }

    #[test]
    #[should_panic(expected = "PostgreSQL pool is required")]
    fn should_panic_when_building_postgres_without_pool() {
        let _ = RdbcItemReaderBuilder::<Dummy>::new()
            .query("SELECT id FROM t")
            .build_postgres();
    }

    #[test]
    #[should_panic(expected = "MySQL pool is required")]
    fn should_panic_when_building_mysql_without_pool() {
        let _ = RdbcItemReaderBuilder::<Dummy>::new()
            .query("SELECT id FROM t")
            .build_mysql();
    }

    #[tokio::test(flavor = "multi_thread")]
    async fn should_propagate_keyset_to_sqlite_reader() {
        let pool = SqlitePool::connect("sqlite::memory:").await.unwrap();
        let reader = RdbcItemReaderBuilder::<Dummy>::new()
            .sqlite(pool)
            .query("SELECT 1 AS id")
            .with_page_size(5)
            .with_keyset("id", |d: &Dummy| d.id.to_string())
            .build_sqlite();
        assert_eq!(
            reader.keyset_column.as_deref(),
            Some("id"),
            "keyset column should be propagated to reader"
        );
        assert!(
            reader.keyset_key.is_some(),
            "keyset key fn should be propagated to reader"
        );
        assert!(
            reader.last_cursor.borrow().is_none(),
            "cursor starts as None"
        );
    }
}