// spring_batch_rs/item/rdbc/postgres_reader.rs

1use std::cell::{Cell, RefCell};
2
3use sqlx::{postgres::PgRow, Execute, FromRow, Pool, Postgres, QueryBuilder};
4
5use super::reader_common::{calculate_page_index, should_load_page};
6use crate::core::item::{ItemReader, ItemReaderResult};
7use crate::BatchError;
8
9/// PostgreSQL RDBC Item Reader for batch processing
10///
11/// This reader provides efficient reading of database records with optional pagination
12/// to manage memory usage. It implements the ItemReader trait for integration with
13/// Spring Batch processing patterns and is specifically optimized for PostgreSQL databases.
14///
15/// # Design
16///
17/// - Uses SQLx's PostgreSQL-specific driver for optimal performance
18/// - Supports automatic deserialization using the `FromRow` trait
19/// - Implements pagination with LIMIT/OFFSET for memory-efficient processing
20/// - Maintains an internal buffer to minimize database round trips
21/// - Uses interior mutability (Cell/RefCell) for state management in single-threaded contexts
22///
23/// # Memory Management
24///
25/// - Uses internal buffering with configurable page sizes
26/// - Automatically handles pagination with LIMIT/OFFSET SQL clauses
27/// - Clears buffer between pages to minimize memory footprint
28/// - Pre-allocates buffer capacity when page size is known
29///
30/// # Thread Safety
31///
32/// - Uses Cell and RefCell for interior mutability in single-threaded contexts
33/// - Not thread-safe - should be used within a single thread
34/// - Designed for use in Spring Batch's single-threaded step execution model
35///
36/// # How Pagination Works
37///
38/// When `page_size` is provided:
39/// - Data is loaded in batches of `page_size` items using SQL LIMIT/OFFSET
40/// - When all items in a batch have been read, a new batch is automatically loaded
41/// - The `offset` tracks both the SQL OFFSET clause and position within the buffer
42/// - Buffer is cleared and refilled for each new page to manage memory
43///
44/// When `page_size` is not provided:
45/// - All data is loaded in one query without LIMIT/OFFSET
46/// - The `offset` only tracks the current position in the buffer
47/// - Suitable for smaller datasets that fit comfortably in memory
48///
49/// # Type Parameters
50///
51/// * `I` - The item type that must implement:
52///   - `FromRow<PgRow>` for automatic deserialization from PostgreSQL rows
53///   - `Send + Unpin` for async compatibility
54///   - `Clone` for efficient item retrieval from the buffer
55///
56/// # Construction
57///
58/// This reader can only be created through `RdbcItemReaderBuilder`.
59/// Direct construction is not available to ensure proper configuration.
60pub struct PostgresRdbcItemReader<'a, I>
61where
62    for<'r> I: FromRow<'r, PgRow> + Send + Unpin + Clone,
63{
64    /// Database connection pool for executing queries
65    /// Uses PostgreSQL-specific pool for optimal performance
66    pub(crate) pool: Pool<Postgres>,
67    /// Base SQL query (without LIMIT/OFFSET clauses)
68    /// Should be a SELECT statement that returns columns matching type I
69    pub(crate) query: &'a str,
70    /// Optional page size for pagination - if None, reads all data at once
71    /// When Some(n), data is read in chunks of n items
72    pub(crate) page_size: Option<i32>,
73    /// Current offset position in the result set
74    /// Tracks both SQL OFFSET and buffer position
75    pub(crate) offset: Cell<i32>,
76    /// Internal buffer to store the current page of items
77    /// Cleared and refilled for each new page
78    pub(crate) buffer: RefCell<Vec<I>>,
79}
80
81impl<'a, I> PostgresRdbcItemReader<'a, I>
82where
83    for<'r> I: FromRow<'r, PgRow> + Send + Unpin + Clone,
84{
85    /// Creates a new PostgresRdbcItemReader with the specified parameters
86    ///
87    /// This constructor is only accessible within the crate to enforce the use
88    /// of `RdbcItemReaderBuilder` for creating reader instances.
89    ///
90    /// # Arguments
91    ///
92    /// * `pool` - PostgreSQL connection pool for database operations
93    /// * `query` - SQL query to execute (without LIMIT/OFFSET)
94    /// * `page_size` - Optional page size for pagination. None means read all at once.
95    ///
96    /// # Returns
97    ///
98    /// A new `PostgresRdbcItemReader` instance ready for use.
99    pub fn new(pool: Pool<Postgres>, query: &'a str, page_size: Option<i32>) -> Self {
100        Self {
101            pool,
102            query,
103            page_size,
104            offset: Cell::new(0),
105            buffer: RefCell::new(vec![]),
106        }
107    }
108
109    /// Reads a page of data from the database and stores it in the internal buffer
110    ///
111    /// This method constructs the paginated query by appending LIMIT and OFFSET
112    /// clauses to the base query, executes it against the PostgreSQL database,
113    /// and updates the internal buffer with the results.
114    ///
115    /// # Behavior
116    ///
117    /// - Clears the existing buffer before loading new data to manage memory
118    /// - Uses blocking async execution within the current runtime context
119    /// - Automatically calculates OFFSET based on current position and page size
120    /// - Handles both paginated and non-paginated queries appropriately
121    /// - Uses SQLx's `query_as` for automatic deserialization via `FromRow`
122    ///
123    /// # Database Query Construction
124    ///
125    /// For paginated queries (when page_size is Some):
126    /// ```sql
127    /// {base_query} LIMIT {page_size} OFFSET {current_offset}
128    /// ```
129    ///
130    /// For non-paginated queries (when page_size is None):
131    /// ```sql
132    /// {base_query}
133    /// ```
134    ///
135    /// # Errors
136    ///
137    /// Returns [`BatchError::ItemReader`] if the database query fails (e.g., connection
138    /// error, SQL syntax error, or deserialization failure).
139    ///
140    /// # Performance Considerations
141    ///
142    /// - Uses `block_in_place` to run async code in sync context
143    /// - Leverages connection pooling for efficient database access
144    /// - Minimizes memory usage by clearing buffer between pages
145    /// - Uses prepared statements through SQLx for query optimization
146    fn read_page(&self) -> Result<(), BatchError> {
147        // Build the paginated query by appending LIMIT/OFFSET to the base query
148        // QueryBuilder allows us to dynamically construct SQL with proper escaping
149        let mut query_builder = QueryBuilder::<Postgres>::new(self.query);
150
151        // Add pagination clauses only if page_size is configured
152        // This allows the same reader to work with both paginated and non-paginated queries
153        if let Some(page_size) = self.page_size {
154            query_builder.push(format!(" LIMIT {} OFFSET {}", page_size, self.offset.get()));
155        }
156
157        let query = query_builder.build();
158
159        // Execute the query synchronously within the async runtime
160        // This allows the reader to work in synchronous batch processing contexts
161        // while still leveraging async database operations under the hood
162        let items = tokio::task::block_in_place(|| {
163            tokio::runtime::Handle::current().block_on(async {
164                // Use query_as for automatic deserialization via FromRow trait
165                // This eliminates the need for manual row mapping
166                sqlx::query_as::<_, I>(query.sql())
167                    .fetch_all(&self.pool)
168                    .await
169                    .map_err(|e| BatchError::ItemReader(e.to_string()))
170            })
171        })?;
172
173        // Clear the buffer and load the new page of data
174        // This ensures we don't accumulate items across pages, managing memory efficiently
175        self.buffer.borrow_mut().clear();
176        self.buffer.borrow_mut().extend(items);
177        Ok(())
178    }
179}
180
181/// Implementation of ItemReader trait for PostgresRdbcItemReader.
182///
183/// This implementation provides a way to read items from a PostgreSQL database
184/// with support for pagination. It uses an internal buffer to store the results
185/// of database queries and keeps track of the current offset to determine when
186/// a new page of data needs to be fetched.
187///
188/// The implementation handles both paginated and non-paginated reading modes
189/// transparently, making it suitable for various batch processing scenarios.
190impl<I> ItemReader<I> for PostgresRdbcItemReader<'_, I>
191where
192    for<'r> I: FromRow<'r, PgRow> + Send + Unpin + Clone,
193{
194    /// Reads the next item from the PostgreSQL database
195    ///
196    /// This method implements the ItemReader trait and provides the core reading logic
197    /// with automatic pagination management:
198    ///
199    /// 1. **Index Calculation**: Determines the current position within the current page
200    /// 2. **Page Loading**: Loads a new page if we're at the beginning of a page
201    /// 3. **Item Retrieval**: Returns the item at the current position from the buffer
202    /// 4. **Offset Management**: Advances the offset for the next read operation
203    ///
204    /// # Pagination Logic
205    ///
206    /// For paginated reading (when page_size is Some):
207    /// - `index = offset % page_size` gives position within current page
208    /// - When `index == 0`, we're at the start of a new page and need to load data
209    /// - Buffer contains only the current page's items
210    ///
211    /// For non-paginated reading (when page_size is None):
212    /// - `index = offset` gives absolute position in the full result set
213    /// - Data is loaded only once when `index == 0` (first read)
214    /// - Buffer contains all items from the query
215    ///
216    /// # Returns
217    ///
218    /// - `Ok(Some(item))` if an item was successfully read
219    /// - `Ok(None)` if there are no more items to read (end of result set)
220    /// - `Err(BatchError::ItemReader)` if a database error occurred
221    ///
222    /// # Examples
223    ///
224    /// ```ignore
225    /// use spring_batch_rs::core::item::ItemReader;
226    /// # use spring_batch_rs::item::rdbc::PostgresRdbcItemReader;
227    /// # use sqlx::PgPool;
228    /// # use serde::Deserialize;
229    /// # #[derive(sqlx::FromRow, Clone, Deserialize)]
230    /// # struct User { id: i32, name: String }
231    ///
232    /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
233    /// # let pool = PgPool::connect("postgresql://user:pass@localhost/db").await?;
234    /// let reader = PostgresRdbcItemReader::<User>::new(
235    ///     pool,
236    ///     "SELECT id, name FROM users ORDER BY id",
237    ///     Some(100)
238    /// );
239    ///
240    /// // Read items one by one
241    /// let mut count = 0;
242    /// while let Some(user) = reader.read()? {
243    ///     println!("User: {} - {}", user.id, user.name);
244    ///     count += 1;
245    /// }
246    /// println!("Processed {} users", count);
247    /// # Ok(())
248    /// # }
249    /// ```
250    fn read(&self) -> ItemReaderResult<I> {
251        let index = calculate_page_index(self.offset.get(), self.page_size);
252
253        if should_load_page(index) {
254            self.read_page()?;
255        }
256
257        let buffer = self.buffer.borrow();
258        let result = buffer.get(index as usize);
259
260        self.offset.set(self.offset.get() + 1);
261
262        Ok(result.cloned())
263    }
264}