spring_batch_rs/item/rdbc/unified_reader_builder.rs
1use sqlx::{
2 FromRow, MySql, Pool, Postgres, Sqlite, mysql::MySqlRow, postgres::PgRow, sqlite::SqliteRow,
3};
4use std::marker::PhantomData;
5
6use super::database_type::DatabaseType;
7use super::mysql_reader::MySqlRdbcItemReader;
8use super::postgres_reader::PostgresRdbcItemReader;
9use super::sqlite_reader::SqliteRdbcItemReader;
10
11/// Unified builder for creating RDBC item readers for any supported database type.
12///
13/// This builder provides a consistent API for constructing database readers
14/// regardless of the underlying database (PostgreSQL, MySQL, or SQLite).
15/// Users specify the database type and connection pool, and the builder
16/// handles the creation of the appropriate reader implementation.
17///
18/// # Type Parameters
19///
20/// * `I` - The item type that implements the appropriate `FromRow` trait for the database
21///
22/// # Examples
23///
24/// ## PostgreSQL
25/// ```no_run
26/// use spring_batch_rs::item::rdbc::{RdbcItemReaderBuilder, DatabaseType};
27/// use sqlx::PgPool;
28/// # use serde::Deserialize;
29/// # #[derive(sqlx::FromRow, Clone, Deserialize)]
30/// # struct User { id: i32, name: String }
31///
32/// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
33/// let pool = PgPool::connect("postgresql://user:pass@localhost/db").await?;
34///
35/// let reader = RdbcItemReaderBuilder::<User>::new()
36/// .postgres(pool)
37/// .query("SELECT id, name FROM users")
38/// .with_page_size(100)
39/// .build_postgres();
40/// # Ok(())
41/// # }
42/// ```
43///
44/// ## MySQL
45/// ```no_run
46/// use spring_batch_rs::item::rdbc::{RdbcItemReaderBuilder, DatabaseType};
47/// use sqlx::MySqlPool;
48/// # use serde::Deserialize;
49/// # #[derive(sqlx::FromRow, Clone, Deserialize)]
50/// # struct Product { id: i32, name: String }
51///
52/// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
53/// let pool = MySqlPool::connect("mysql://user:pass@localhost/db").await?;
54///
55/// let reader = RdbcItemReaderBuilder::<Product>::new()
56/// .mysql(pool)
57/// .query("SELECT id, name FROM products")
58/// .with_page_size(100)
59/// .build_mysql();
60/// # Ok(())
61/// # }
62/// ```
63///
64/// ## SQLite
65/// ```no_run
66/// use spring_batch_rs::item::rdbc::{RdbcItemReaderBuilder, DatabaseType};
67/// use sqlx::SqlitePool;
68/// # use serde::Deserialize;
69/// # #[derive(sqlx::FromRow, Clone, Deserialize)]
70/// # struct Task { id: i32, title: String }
71///
72/// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
73/// let pool = SqlitePool::connect("sqlite::memory:").await?;
74///
75/// let reader = RdbcItemReaderBuilder::<Task>::new()
76/// .sqlite(pool)
77/// .query("SELECT id, title FROM tasks")
78/// .with_page_size(100)
79/// .build_sqlite();
80/// # Ok(())
81/// # }
82/// ```
83pub struct RdbcItemReaderBuilder<'a, I> {
84 postgres_pool: Option<Pool<Postgres>>,
85 mysql_pool: Option<Pool<MySql>>,
86 sqlite_pool: Option<Pool<Sqlite>>,
87 query: Option<&'a str>,
88 page_size: Option<i32>,
89 db_type: Option<DatabaseType>,
90 keyset_column: Option<String>,
91 #[allow(clippy::type_complexity)]
92 keyset_key_fn: Option<Box<dyn Fn(&I) -> String>>,
93 _phantom: PhantomData<I>,
94}
95
96impl<'a, I> RdbcItemReaderBuilder<'a, I> {
97 /// Creates a new unified reader builder with default configuration.
98 pub fn new() -> Self {
99 Self {
100 postgres_pool: None,
101 mysql_pool: None,
102 sqlite_pool: None,
103 query: None,
104 page_size: None,
105 db_type: None,
106 keyset_column: None,
107 keyset_key_fn: None,
108 _phantom: PhantomData,
109 }
110 }
111
112 /// Sets the PostgreSQL connection pool and database type.
113 ///
114 /// # Arguments
115 /// * `pool` - The PostgreSQL connection pool
116 ///
117 /// # Returns
118 /// The updated builder instance configured for PostgreSQL
119 pub fn postgres(mut self, pool: Pool<Postgres>) -> Self {
120 self.postgres_pool = Some(pool);
121 self.db_type = Some(DatabaseType::Postgres);
122 self
123 }
124
125 /// Sets the MySQL connection pool and database type.
126 ///
127 /// # Arguments
128 /// * `pool` - The MySQL connection pool
129 ///
130 /// # Returns
131 /// The updated builder instance configured for MySQL
132 pub fn mysql(mut self, pool: Pool<MySql>) -> Self {
133 self.mysql_pool = Some(pool);
134 self.db_type = Some(DatabaseType::MySql);
135 self
136 }
137
138 /// Sets the SQLite connection pool and database type.
139 ///
140 /// # Arguments
141 /// * `pool` - The SQLite connection pool
142 ///
143 /// # Returns
144 /// The updated builder instance configured for SQLite
145 pub fn sqlite(mut self, pool: Pool<Sqlite>) -> Self {
146 self.sqlite_pool = Some(pool);
147 self.db_type = Some(DatabaseType::Sqlite);
148 self
149 }
150
151 /// Sets the SQL query for the reader.
152 ///
153 /// The query should not include LIMIT/OFFSET clauses as these are handled
154 /// automatically when page_size is configured.
155 ///
156 /// # Arguments
157 /// * `query` - The SQL query to execute
158 ///
159 /// # Returns
160 /// The updated builder instance
161 pub fn query(mut self, query: &'a str) -> Self {
162 self.query = Some(query);
163 self
164 }
165
166 /// Sets the page size for paginated reading.
167 ///
168 /// When set, the reader will fetch data in chunks of this size to manage
169 /// memory usage efficiently.
170 ///
171 /// # Arguments
172 /// * `page_size` - Number of items to read per page
173 ///
174 /// # Returns
175 /// The updated builder instance
176 pub fn with_page_size(mut self, page_size: i32) -> Self {
177 self.page_size = Some(page_size);
178 self
179 }
180
181 /// Enables keyset (cursor) pagination instead of LIMIT/OFFSET.
182 ///
183 /// Keyset pagination is O(log n) per page regardless of dataset size, making it
184 /// significantly faster than LIMIT/OFFSET for large tables.
185 ///
186 /// # Requirements
187 ///
188 /// - The query must **not** include `WHERE`, `ORDER BY`, or `LIMIT` clauses — the
189 /// framework appends them automatically.
190 /// - The keyset column must be indexed and have unique, sortable values (e.g.
191 /// primary key, UUID, zero-padded string ID).
192 /// - [`with_page_size`] must also be set.
193 ///
194 /// # Arguments
195 ///
196 /// * `column` - Column name used as the cursor (appended to `WHERE` and `ORDER BY`).
197 /// * `key_fn` - Closure that extracts the cursor value from an item as a `String`.
198 ///
199 /// # Examples
200 ///
201 /// ```no_run
202 /// use spring_batch_rs::item::rdbc::RdbcItemReaderBuilder;
203 /// use sqlx::PgPool;
204 /// # use serde::Deserialize;
205 /// # #[derive(sqlx::FromRow, Clone, Deserialize)]
206 /// # struct Order { order_id: String, amount: f64 }
207 ///
208 /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
209 /// let pool = PgPool::connect("postgresql://user:pass@localhost/db").await?;
210 ///
211 /// let reader = RdbcItemReaderBuilder::<Order>::new()
212 /// .postgres(pool)
213 /// .query("SELECT order_id, amount FROM orders")
214 /// .with_page_size(1_000)
215 /// .with_keyset("order_id", |o: &Order| o.order_id.clone())
216 /// .build_postgres();
217 /// # Ok(())
218 /// # }
219 /// ```
220 pub fn with_keyset(mut self, column: &str, key_fn: impl Fn(&I) -> String + 'static) -> Self {
221 self.keyset_column = Some(column.to_string());
222 self.keyset_key_fn = Some(Box::new(key_fn));
223 self
224 }
225}
226
227impl<'a, I> RdbcItemReaderBuilder<'a, I>
228where
229 for<'r> I: FromRow<'r, PgRow> + Send + Unpin + Clone,
230{
231 /// Builds a PostgreSQL reader.
232 ///
233 /// # Returns
234 /// A configured `PostgresRdbcItemReader` instance
235 ///
236 /// # Panics
237 /// Panics if PostgreSQL pool or query are missing
238 pub fn build_postgres(self) -> PostgresRdbcItemReader<'a, I> {
239 PostgresRdbcItemReader::new(
240 self.postgres_pool.expect("PostgreSQL pool is required"),
241 self.query.expect("Query is required"),
242 self.page_size,
243 self.keyset_column,
244 self.keyset_key_fn,
245 )
246 }
247}
248
249impl<'a, I> RdbcItemReaderBuilder<'a, I>
250where
251 for<'r> I: FromRow<'r, MySqlRow> + Send + Unpin + Clone,
252{
253 /// Builds a MySQL reader.
254 ///
255 /// # Returns
256 /// A configured `MySqlRdbcItemReader` instance
257 ///
258 /// # Panics
259 /// Panics if MySQL pool or query are missing
260 pub fn build_mysql(self) -> MySqlRdbcItemReader<'a, I> {
261 MySqlRdbcItemReader::new(
262 self.mysql_pool.expect("MySQL pool is required"),
263 self.query.expect("Query is required"),
264 self.page_size,
265 self.keyset_column,
266 self.keyset_key_fn,
267 )
268 }
269}
270
271impl<'a, I> RdbcItemReaderBuilder<'a, I>
272where
273 for<'r> I: FromRow<'r, SqliteRow> + Send + Unpin + Clone,
274{
275 /// Builds a SQLite reader.
276 ///
277 /// # Returns
278 /// A configured `SqliteRdbcItemReader` instance
279 ///
280 /// # Panics
281 /// Panics if SQLite pool or query are missing
282 pub fn build_sqlite(self) -> SqliteRdbcItemReader<'a, I> {
283 SqliteRdbcItemReader::new(
284 self.sqlite_pool.expect("SQLite pool is required"),
285 self.query.expect("Query is required"),
286 self.page_size,
287 self.keyset_column,
288 self.keyset_key_fn,
289 )
290 }
291}
292
293impl<'a, I> Default for RdbcItemReaderBuilder<'a, I> {
294 fn default() -> Self {
295 Self::new()
296 }
297}
298
#[cfg(test)]
mod tests {
    use super::*;
    use sqlx::{FromRow, SqlitePool};

    #[derive(Clone, FromRow)]
    struct Dummy {
        id: i32,
    }

    #[test]
    fn should_create_via_default() {
        // Default must be equivalent to new(): nothing is configured yet.
        let b = RdbcItemReaderBuilder::<Dummy>::default();
        assert!(b.postgres_pool.is_none());
        assert!(b.mysql_pool.is_none());
        assert!(b.sqlite_pool.is_none());
        assert!(b.query.is_none());
        assert!(b.page_size.is_none());
        assert!(b.db_type.is_none());
        assert!(b.keyset_column.is_none());
        assert!(b.keyset_key_fn.is_none());
    }

    #[test]
    fn should_store_query_page_size_and_keyset_without_pool() {
        let b = RdbcItemReaderBuilder::<Dummy>::new()
            .query("SELECT id FROM t")
            .with_page_size(10)
            .with_keyset("id", |d: &Dummy| d.id.to_string());
        assert_eq!(b.query, Some("SELECT id FROM t"));
        assert_eq!(b.page_size, Some(10));
        assert_eq!(b.keyset_column.as_deref(), Some("id"));
        let key_fn = b
            .keyset_key_fn
            .as_ref()
            .expect("keyset key fn should be stored");
        assert_eq!(key_fn(&Dummy { id: 7 }), "7");
        assert!(b.db_type.is_none(), "no database selected yet");
    }

    #[test]
    #[should_panic(expected = "SQLite pool is required")]
    fn should_panic_when_building_sqlite_without_pool() {
        let _ = RdbcItemReaderBuilder::<Dummy>::new()
            .query("SELECT id FROM t")
            .build_sqlite();
    }

    #[tokio::test(flavor = "multi_thread")]
    async fn should_record_sqlite_pool_and_db_type() {
        let pool = SqlitePool::connect("sqlite::memory:").await.unwrap();
        let b = RdbcItemReaderBuilder::<Dummy>::new().sqlite(pool);
        assert!(b.sqlite_pool.is_some());
        assert!(b.postgres_pool.is_none());
        assert!(b.mysql_pool.is_none());
        assert!(matches!(b.db_type, Some(DatabaseType::Sqlite)));
    }

    #[tokio::test(flavor = "multi_thread")]
    async fn should_build_sqlite_reader_with_pool_and_query() {
        let pool = SqlitePool::connect("sqlite::memory:").await.unwrap();
        let reader = RdbcItemReaderBuilder::<Dummy>::new()
            .sqlite(pool)
            .query("SELECT 1 AS id")
            .build_sqlite();
        assert_eq!(reader.query, "SELECT 1 AS id");
        assert_eq!(reader.page_size, None);
        assert_eq!(reader.offset.get(), 0);
    }

    #[tokio::test(flavor = "multi_thread")]
    async fn should_propagate_page_size_to_sqlite_reader() {
        let pool = SqlitePool::connect("sqlite::memory:").await.unwrap();
        let reader = RdbcItemReaderBuilder::<Dummy>::new()
            .sqlite(pool)
            .query("SELECT 1 AS id")
            .with_page_size(25)
            .build_sqlite();
        assert_eq!(reader.page_size, Some(25));
    }

    #[test]
    #[should_panic(expected = "PostgreSQL pool is required")]
    fn should_panic_when_building_postgres_without_pool() {
        let _ = RdbcItemReaderBuilder::<Dummy>::new()
            .query("SELECT id FROM t")
            .build_postgres();
    }

    #[test]
    #[should_panic(expected = "MySQL pool is required")]
    fn should_panic_when_building_mysql_without_pool() {
        let _ = RdbcItemReaderBuilder::<Dummy>::new()
            .query("SELECT id FROM t")
            .build_mysql();
    }

    #[tokio::test(flavor = "multi_thread")]
    async fn should_propagate_keyset_to_sqlite_reader() {
        let pool = SqlitePool::connect("sqlite::memory:").await.unwrap();
        let reader = RdbcItemReaderBuilder::<Dummy>::new()
            .sqlite(pool)
            .query("SELECT 1 AS id")
            .with_page_size(5)
            .with_keyset("id", |d: &Dummy| d.id.to_string())
            .build_sqlite();
        assert_eq!(
            reader.keyset_column.as_deref(),
            Some("id"),
            "keyset column should be propagated to reader"
        );
        assert!(
            reader.keyset_key.is_some(),
            "keyset key fn should be propagated to reader"
        );
        assert!(
            reader.last_cursor.borrow().is_none(),
            "cursor starts as None"
        );
    }
}