spring_batch_rs/item/rdbc/postgres_reader.rs
1use std::cell::{Cell, RefCell};
2
3use sqlx::{postgres::PgRow, Execute, FromRow, Pool, Postgres, QueryBuilder};
4
5use super::reader_common::{calculate_page_index, should_load_page};
6use crate::core::item::{ItemReader, ItemReaderResult};
7use crate::BatchError;
8
9/// PostgreSQL RDBC Item Reader for batch processing
10///
11/// This reader provides efficient reading of database records with optional pagination
12/// to manage memory usage. It implements the ItemReader trait for integration with
13/// Spring Batch processing patterns and is specifically optimized for PostgreSQL databases.
14///
15/// # Design
16///
17/// - Uses SQLx's PostgreSQL-specific driver for optimal performance
18/// - Supports automatic deserialization using the `FromRow` trait
19/// - Implements pagination with LIMIT/OFFSET for memory-efficient processing
20/// - Maintains an internal buffer to minimize database round trips
21/// - Uses interior mutability (Cell/RefCell) for state management in single-threaded contexts
22///
23/// # Memory Management
24///
25/// - Uses internal buffering with configurable page sizes
26/// - Automatically handles pagination with LIMIT/OFFSET SQL clauses
27/// - Clears buffer between pages to minimize memory footprint
28/// - Pre-allocates buffer capacity when page size is known
29///
30/// # Thread Safety
31///
32/// - Uses Cell and RefCell for interior mutability in single-threaded contexts
33/// - Not thread-safe - should be used within a single thread
34/// - Designed for use in Spring Batch's single-threaded step execution model
35///
36/// # How Pagination Works
37///
38/// When `page_size` is provided:
39/// - Data is loaded in batches of `page_size` items using SQL LIMIT/OFFSET
40/// - When all items in a batch have been read, a new batch is automatically loaded
41/// - The `offset` tracks both the SQL OFFSET clause and position within the buffer
42/// - Buffer is cleared and refilled for each new page to manage memory
43///
44/// When `page_size` is not provided:
45/// - All data is loaded in one query without LIMIT/OFFSET
46/// - The `offset` only tracks the current position in the buffer
47/// - Suitable for smaller datasets that fit comfortably in memory
48///
49/// # Type Parameters
50///
51/// * `I` - The item type that must implement:
52/// - `FromRow<PgRow>` for automatic deserialization from PostgreSQL rows
53/// - `Send + Unpin` for async compatibility
54/// - `Clone` for efficient item retrieval from the buffer
55///
56/// # Construction
57///
58/// This reader can only be created through `RdbcItemReaderBuilder`.
59/// Direct construction is not available to ensure proper configuration.
60pub struct PostgresRdbcItemReader<'a, I>
61where
62 for<'r> I: FromRow<'r, PgRow> + Send + Unpin + Clone,
63{
64 /// Database connection pool for executing queries
65 /// Uses PostgreSQL-specific pool for optimal performance
66 pub(crate) pool: Pool<Postgres>,
67 /// Base SQL query (without LIMIT/OFFSET clauses)
68 /// Should be a SELECT statement that returns columns matching type I
69 pub(crate) query: &'a str,
70 /// Optional page size for pagination - if None, reads all data at once
71 /// When Some(n), data is read in chunks of n items
72 pub(crate) page_size: Option<i32>,
73 /// Current offset position in the result set
74 /// Tracks both SQL OFFSET and buffer position
75 pub(crate) offset: Cell<i32>,
76 /// Internal buffer to store the current page of items
77 /// Cleared and refilled for each new page
78 pub(crate) buffer: RefCell<Vec<I>>,
79}
80
81impl<'a, I> PostgresRdbcItemReader<'a, I>
82where
83 for<'r> I: FromRow<'r, PgRow> + Send + Unpin + Clone,
84{
85 /// Creates a new PostgresRdbcItemReader with the specified parameters
86 ///
87 /// This constructor is only accessible within the crate to enforce the use
88 /// of `RdbcItemReaderBuilder` for creating reader instances.
89 ///
90 /// # Arguments
91 ///
92 /// * `pool` - PostgreSQL connection pool for database operations
93 /// * `query` - SQL query to execute (without LIMIT/OFFSET)
94 /// * `page_size` - Optional page size for pagination. None means read all at once.
95 ///
96 /// # Returns
97 ///
98 /// A new `PostgresRdbcItemReader` instance ready for use.
99 pub fn new(pool: Pool<Postgres>, query: &'a str, page_size: Option<i32>) -> Self {
100 Self {
101 pool,
102 query,
103 page_size,
104 offset: Cell::new(0),
105 buffer: RefCell::new(vec![]),
106 }
107 }
108
109 /// Reads a page of data from the database and stores it in the internal buffer
110 ///
111 /// This method constructs the paginated query by appending LIMIT and OFFSET
112 /// clauses to the base query, executes it against the PostgreSQL database,
113 /// and updates the internal buffer with the results.
114 ///
115 /// # Behavior
116 ///
117 /// - Clears the existing buffer before loading new data to manage memory
118 /// - Uses blocking async execution within the current runtime context
119 /// - Automatically calculates OFFSET based on current position and page size
120 /// - Handles both paginated and non-paginated queries appropriately
121 /// - Uses SQLx's `query_as` for automatic deserialization via `FromRow`
122 ///
123 /// # Database Query Construction
124 ///
125 /// For paginated queries (when page_size is Some):
126 /// ```sql
127 /// {base_query} LIMIT {page_size} OFFSET {current_offset}
128 /// ```
129 ///
130 /// For non-paginated queries (when page_size is None):
131 /// ```sql
132 /// {base_query}
133 /// ```
134 ///
135 /// # Errors
136 ///
137 /// Returns [`BatchError::ItemReader`] if the database query fails (e.g., connection
138 /// error, SQL syntax error, or deserialization failure).
139 ///
140 /// # Performance Considerations
141 ///
142 /// - Uses `block_in_place` to run async code in sync context
143 /// - Leverages connection pooling for efficient database access
144 /// - Minimizes memory usage by clearing buffer between pages
145 /// - Uses prepared statements through SQLx for query optimization
146 fn read_page(&self) -> Result<(), BatchError> {
147 // Build the paginated query by appending LIMIT/OFFSET to the base query
148 // QueryBuilder allows us to dynamically construct SQL with proper escaping
149 let mut query_builder = QueryBuilder::<Postgres>::new(self.query);
150
151 // Add pagination clauses only if page_size is configured
152 // This allows the same reader to work with both paginated and non-paginated queries
153 if let Some(page_size) = self.page_size {
154 query_builder.push(format!(" LIMIT {} OFFSET {}", page_size, self.offset.get()));
155 }
156
157 let query = query_builder.build();
158
159 // Execute the query synchronously within the async runtime
160 // This allows the reader to work in synchronous batch processing contexts
161 // while still leveraging async database operations under the hood
162 let items = tokio::task::block_in_place(|| {
163 tokio::runtime::Handle::current().block_on(async {
164 // Use query_as for automatic deserialization via FromRow trait
165 // This eliminates the need for manual row mapping
166 sqlx::query_as::<_, I>(query.sql())
167 .fetch_all(&self.pool)
168 .await
169 .map_err(|e| BatchError::ItemReader(e.to_string()))
170 })
171 })?;
172
173 // Clear the buffer and load the new page of data
174 // This ensures we don't accumulate items across pages, managing memory efficiently
175 self.buffer.borrow_mut().clear();
176 self.buffer.borrow_mut().extend(items);
177 Ok(())
178 }
179}
180
181/// Implementation of ItemReader trait for PostgresRdbcItemReader.
182///
183/// This implementation provides a way to read items from a PostgreSQL database
184/// with support for pagination. It uses an internal buffer to store the results
185/// of database queries and keeps track of the current offset to determine when
186/// a new page of data needs to be fetched.
187///
188/// The implementation handles both paginated and non-paginated reading modes
189/// transparently, making it suitable for various batch processing scenarios.
190impl<I> ItemReader<I> for PostgresRdbcItemReader<'_, I>
191where
192 for<'r> I: FromRow<'r, PgRow> + Send + Unpin + Clone,
193{
194 /// Reads the next item from the PostgreSQL database
195 ///
196 /// This method implements the ItemReader trait and provides the core reading logic
197 /// with automatic pagination management:
198 ///
199 /// 1. **Index Calculation**: Determines the current position within the current page
200 /// 2. **Page Loading**: Loads a new page if we're at the beginning of a page
201 /// 3. **Item Retrieval**: Returns the item at the current position from the buffer
202 /// 4. **Offset Management**: Advances the offset for the next read operation
203 ///
204 /// # Pagination Logic
205 ///
206 /// For paginated reading (when page_size is Some):
207 /// - `index = offset % page_size` gives position within current page
208 /// - When `index == 0`, we're at the start of a new page and need to load data
209 /// - Buffer contains only the current page's items
210 ///
211 /// For non-paginated reading (when page_size is None):
212 /// - `index = offset` gives absolute position in the full result set
213 /// - Data is loaded only once when `index == 0` (first read)
214 /// - Buffer contains all items from the query
215 ///
216 /// # Returns
217 ///
218 /// - `Ok(Some(item))` if an item was successfully read
219 /// - `Ok(None)` if there are no more items to read (end of result set)
220 /// - `Err(BatchError::ItemReader)` if a database error occurred
221 ///
222 /// # Examples
223 ///
224 /// ```ignore
225 /// use spring_batch_rs::core::item::ItemReader;
226 /// # use spring_batch_rs::item::rdbc::PostgresRdbcItemReader;
227 /// # use sqlx::PgPool;
228 /// # use serde::Deserialize;
229 /// # #[derive(sqlx::FromRow, Clone, Deserialize)]
230 /// # struct User { id: i32, name: String }
231 ///
232 /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
233 /// # let pool = PgPool::connect("postgresql://user:pass@localhost/db").await?;
234 /// let reader = PostgresRdbcItemReader::<User>::new(
235 /// pool,
236 /// "SELECT id, name FROM users ORDER BY id",
237 /// Some(100)
238 /// );
239 ///
240 /// // Read items one by one
241 /// let mut count = 0;
242 /// while let Some(user) = reader.read()? {
243 /// println!("User: {} - {}", user.id, user.name);
244 /// count += 1;
245 /// }
246 /// println!("Processed {} users", count);
247 /// # Ok(())
248 /// # }
249 /// ```
250 fn read(&self) -> ItemReaderResult<I> {
251 let index = calculate_page_index(self.offset.get(), self.page_size);
252
253 if should_load_page(index) {
254 self.read_page()?;
255 }
256
257 let buffer = self.buffer.borrow();
258 let result = buffer.get(index as usize);
259
260 self.offset.set(self.offset.get() + 1);
261
262 Ok(result.cloned())
263 }
264}