spring_batch_rs/item/rdbc/unified_reader_builder.rs
1use sqlx::{
2 mysql::MySqlRow, postgres::PgRow, sqlite::SqliteRow, FromRow, MySql, Pool, Postgres, Sqlite,
3};
4use std::marker::PhantomData;
5
6use super::database_type::DatabaseType;
7use super::mysql_reader::MySqlRdbcItemReader;
8use super::postgres_reader::PostgresRdbcItemReader;
9use super::sqlite_reader::SqliteRdbcItemReader;
10
11/// Unified builder for creating RDBC item readers for any supported database type.
12///
13/// This builder provides a consistent API for constructing database readers
14/// regardless of the underlying database (PostgreSQL, MySQL, or SQLite).
15/// Users specify the database type and connection pool, and the builder
16/// handles the creation of the appropriate reader implementation.
17///
18/// # Type Parameters
19///
20/// * `I` - The item type that implements the appropriate `FromRow` trait for the database
21///
22/// # Examples
23///
24/// ## PostgreSQL
25/// ```no_run
26/// use spring_batch_rs::item::rdbc::{RdbcItemReaderBuilder, DatabaseType};
27/// use sqlx::PgPool;
28/// # use serde::Deserialize;
29/// # #[derive(sqlx::FromRow, Clone, Deserialize)]
30/// # struct User { id: i32, name: String }
31///
32/// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
33/// let pool = PgPool::connect("postgresql://user:pass@localhost/db").await?;
34///
35/// let reader = RdbcItemReaderBuilder::<User>::new()
36/// .postgres(pool)
37/// .query("SELECT id, name FROM users")
38/// .with_page_size(100)
39/// .build_postgres();
40/// # Ok(())
41/// # }
42/// ```
43///
44/// ## MySQL
45/// ```no_run
46/// use spring_batch_rs::item::rdbc::{RdbcItemReaderBuilder, DatabaseType};
47/// use sqlx::MySqlPool;
48/// # use serde::Deserialize;
49/// # #[derive(sqlx::FromRow, Clone, Deserialize)]
50/// # struct Product { id: i32, name: String }
51///
52/// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
53/// let pool = MySqlPool::connect("mysql://user:pass@localhost/db").await?;
54///
55/// let reader = RdbcItemReaderBuilder::<Product>::new()
56/// .mysql(pool)
57/// .query("SELECT id, name FROM products")
58/// .with_page_size(100)
59/// .build_mysql();
60/// # Ok(())
61/// # }
62/// ```
63///
64/// ## SQLite
65/// ```no_run
66/// use spring_batch_rs::item::rdbc::{RdbcItemReaderBuilder, DatabaseType};
67/// use sqlx::SqlitePool;
68/// # use serde::Deserialize;
69/// # #[derive(sqlx::FromRow, Clone, Deserialize)]
70/// # struct Task { id: i32, title: String }
71///
72/// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
73/// let pool = SqlitePool::connect("sqlite::memory:").await?;
74///
75/// let reader = RdbcItemReaderBuilder::<Task>::new()
76/// .sqlite(pool)
77/// .query("SELECT id, title FROM tasks")
78/// .with_page_size(100)
79/// .build_sqlite();
80/// # Ok(())
81/// # }
82/// ```
83pub struct RdbcItemReaderBuilder<'a, I> {
84 postgres_pool: Option<Pool<Postgres>>,
85 mysql_pool: Option<Pool<MySql>>,
86 sqlite_pool: Option<Pool<Sqlite>>,
87 query: Option<&'a str>,
88 page_size: Option<i32>,
89 db_type: Option<DatabaseType>,
90 _phantom: PhantomData<I>,
91}
92
93impl<'a, I> RdbcItemReaderBuilder<'a, I> {
94 /// Creates a new unified reader builder with default configuration.
95 pub fn new() -> Self {
96 Self {
97 postgres_pool: None,
98 mysql_pool: None,
99 sqlite_pool: None,
100 query: None,
101 page_size: None,
102 db_type: None,
103 _phantom: PhantomData,
104 }
105 }
106
107 /// Sets the PostgreSQL connection pool and database type.
108 ///
109 /// # Arguments
110 /// * `pool` - The PostgreSQL connection pool
111 ///
112 /// # Returns
113 /// The updated builder instance configured for PostgreSQL
114 pub fn postgres(mut self, pool: Pool<Postgres>) -> Self {
115 self.postgres_pool = Some(pool);
116 self.db_type = Some(DatabaseType::Postgres);
117 self
118 }
119
120 /// Sets the MySQL connection pool and database type.
121 ///
122 /// # Arguments
123 /// * `pool` - The MySQL connection pool
124 ///
125 /// # Returns
126 /// The updated builder instance configured for MySQL
127 pub fn mysql(mut self, pool: Pool<MySql>) -> Self {
128 self.mysql_pool = Some(pool);
129 self.db_type = Some(DatabaseType::MySql);
130 self
131 }
132
133 /// Sets the SQLite connection pool and database type.
134 ///
135 /// # Arguments
136 /// * `pool` - The SQLite connection pool
137 ///
138 /// # Returns
139 /// The updated builder instance configured for SQLite
140 pub fn sqlite(mut self, pool: Pool<Sqlite>) -> Self {
141 self.sqlite_pool = Some(pool);
142 self.db_type = Some(DatabaseType::Sqlite);
143 self
144 }
145
146 /// Sets the SQL query for the reader.
147 ///
148 /// The query should not include LIMIT/OFFSET clauses as these are handled
149 /// automatically when page_size is configured.
150 ///
151 /// # Arguments
152 /// * `query` - The SQL query to execute
153 ///
154 /// # Returns
155 /// The updated builder instance
156 pub fn query(mut self, query: &'a str) -> Self {
157 self.query = Some(query);
158 self
159 }
160
161 /// Sets the page size for paginated reading.
162 ///
163 /// When set, the reader will fetch data in chunks of this size to manage
164 /// memory usage efficiently.
165 ///
166 /// # Arguments
167 /// * `page_size` - Number of items to read per page
168 ///
169 /// # Returns
170 /// The updated builder instance
171 pub fn with_page_size(mut self, page_size: i32) -> Self {
172 self.page_size = Some(page_size);
173 self
174 }
175}
176
177impl<'a, I> RdbcItemReaderBuilder<'a, I>
178where
179 for<'r> I: FromRow<'r, PgRow> + Send + Unpin + Clone,
180{
181 /// Builds a PostgreSQL reader.
182 ///
183 /// # Returns
184 /// A configured `PostgresRdbcItemReader` instance
185 ///
186 /// # Panics
187 /// Panics if PostgreSQL pool or query are missing
188 pub fn build_postgres(self) -> PostgresRdbcItemReader<'a, I> {
189 PostgresRdbcItemReader::new(
190 self.postgres_pool.expect("PostgreSQL pool is required"),
191 self.query.expect("Query is required"),
192 self.page_size,
193 )
194 }
195}
196
197impl<'a, I> RdbcItemReaderBuilder<'a, I>
198where
199 for<'r> I: FromRow<'r, MySqlRow> + Send + Unpin + Clone,
200{
201 /// Builds a MySQL reader.
202 ///
203 /// # Returns
204 /// A configured `MySqlRdbcItemReader` instance
205 ///
206 /// # Panics
207 /// Panics if MySQL pool or query are missing
208 pub fn build_mysql(self) -> MySqlRdbcItemReader<'a, I> {
209 MySqlRdbcItemReader::new(
210 self.mysql_pool.expect("MySQL pool is required"),
211 self.query.expect("Query is required"),
212 self.page_size,
213 )
214 }
215}
216
217impl<'a, I> RdbcItemReaderBuilder<'a, I>
218where
219 for<'r> I: FromRow<'r, SqliteRow> + Send + Unpin + Clone,
220{
221 /// Builds a SQLite reader.
222 ///
223 /// # Returns
224 /// A configured `SqliteRdbcItemReader` instance
225 ///
226 /// # Panics
227 /// Panics if SQLite pool or query are missing
228 pub fn build_sqlite(self) -> SqliteRdbcItemReader<'a, I> {
229 SqliteRdbcItemReader::new(
230 self.sqlite_pool.expect("SQLite pool is required"),
231 self.query.expect("Query is required"),
232 self.page_size,
233 )
234 }
235}
236
237impl<'a, I> Default for RdbcItemReaderBuilder<'a, I> {
238 fn default() -> Self {
239 Self::new()
240 }
241}
242
#[cfg(test)]
mod tests {
    use super::*;
    use sqlx::{FromRow, SqlitePool};

    /// Minimal row type used only to instantiate the builder's `I` parameter.
    // `id` is populated by the derived `FromRow` impl but never read back by
    // these tests, hence the lint exemption.
    #[allow(dead_code)]
    #[derive(Clone, FromRow)]
    struct Dummy {
        id: i32,
    }

    #[test]
    fn should_create_via_default() {
        // Default == new(): every setting must start out unset.
        let builder = RdbcItemReaderBuilder::<Dummy>::default();
        assert!(builder.postgres_pool.is_none());
        assert!(builder.mysql_pool.is_none());
        assert!(builder.sqlite_pool.is_none());
        assert!(builder.query.is_none());
        assert!(builder.page_size.is_none());
        assert!(builder.db_type.is_none());
    }

    #[tokio::test(flavor = "multi_thread")]
    async fn should_record_sqlite_selection() {
        let pool = SqlitePool::connect("sqlite::memory:").await.unwrap();
        let builder = RdbcItemReaderBuilder::<Dummy>::new().sqlite(pool);
        // Only the SQLite slot is filled and the backend marker follows it.
        assert!(builder.sqlite_pool.is_some());
        assert!(builder.postgres_pool.is_none());
        assert!(builder.mysql_pool.is_none());
        assert!(matches!(builder.db_type, Some(DatabaseType::Sqlite)));
    }

    #[test]
    #[should_panic(expected = "SQLite pool is required")]
    fn should_panic_when_building_sqlite_without_pool() {
        let _ = RdbcItemReaderBuilder::<Dummy>::new()
            .query("SELECT id FROM t")
            .build_sqlite();
    }

    #[tokio::test(flavor = "multi_thread")]
    #[should_panic(expected = "Query is required")]
    async fn should_panic_when_building_sqlite_without_query() {
        // With a pool present, the missing query is the failure that surfaces.
        let pool = SqlitePool::connect("sqlite::memory:").await.unwrap();
        let _ = RdbcItemReaderBuilder::<Dummy>::new()
            .sqlite(pool)
            .build_sqlite();
    }

    #[tokio::test(flavor = "multi_thread")]
    async fn should_build_sqlite_reader_with_pool_and_query() {
        let pool = SqlitePool::connect("sqlite::memory:").await.unwrap();
        let reader = RdbcItemReaderBuilder::<Dummy>::new()
            .sqlite(pool)
            .query("SELECT 1 AS id")
            .build_sqlite();
        assert_eq!(reader.query, "SELECT 1 AS id");
        assert_eq!(reader.page_size, None);
        assert_eq!(reader.offset.get(), 0);
    }

    #[tokio::test(flavor = "multi_thread")]
    async fn should_propagate_page_size_to_sqlite_reader() {
        let pool = SqlitePool::connect("sqlite::memory:").await.unwrap();
        let reader = RdbcItemReaderBuilder::<Dummy>::new()
            .sqlite(pool)
            .query("SELECT 1 AS id")
            .with_page_size(25)
            .build_sqlite();
        assert_eq!(reader.page_size, Some(25));
    }

    #[test]
    #[should_panic(expected = "PostgreSQL pool is required")]
    fn should_panic_when_building_postgres_without_pool() {
        let _ = RdbcItemReaderBuilder::<Dummy>::new()
            .query("SELECT id FROM t")
            .build_postgres();
    }

    #[test]
    #[should_panic(expected = "MySQL pool is required")]
    fn should_panic_when_building_mysql_without_pool() {
        let _ = RdbcItemReaderBuilder::<Dummy>::new()
            .query("SELECT id FROM t")
            .build_mysql();
    }
}