Skip to main content

spring_batch_rs/item/orm/
orm_reader.rs

1use std::cell::{Cell, RefCell};
2
3use sea_orm::{DatabaseConnection, DbErr, EntityTrait, FromQueryResult, PaginatorTrait, Select};
4
5use crate::{
6    BatchError,
7    core::item::{ItemReader, ItemReaderResult},
8};
9
10/// A reader for reading entities from a database using SeaORM.
11///
12/// This reader provides an implementation of the `ItemReader` trait for SeaORM-based
13/// database operations. It supports reading entities from any SeaORM-compatible database
14/// with optional pagination for efficient memory usage when dealing with large datasets.
15///
16/// # Generic Parameters
17///
18/// - `I`: The SeaORM entity type that implements `EntityTrait`
19///
20/// # Design Philosophy
21///
22/// This reader follows SeaORM's conventions and leverages its built-in pagination
23/// mechanisms for optimal performance. It provides a bridge between SeaORM's async
24/// API and the batch framework's synchronous `ItemReader` trait.
25///
26/// # Memory Management
27///
28/// The reader uses an internal buffer to store entity models from the current page,
29/// which helps balance memory usage with query performance. When pagination is enabled,
30/// only one page of data is kept in memory at any time.
31///
32/// # Query Flexibility
33///
34/// Accepts any SeaORM `Select` query, allowing for complex filtering, joins, and
35/// ordering. The query is executed as-is, with pagination parameters added when configured.
36///
37/// # State Management
38///
39/// - `offset`: Tracks the absolute position across all pages
40/// - `current_page`: Tracks which page is currently loaded in the buffer
41/// - `buffer`: Holds the current page's entity models
42pub struct OrmItemReader<'a, I>
43where
44    I: EntityTrait,
45{
46    /// Database connection reference - borrowed for the lifetime of the reader
47    /// This ensures the connection remains valid throughout the reader's lifecycle
48    connection: Option<&'a DatabaseConnection>,
49
50    /// SeaORM select query to execute
51    /// This query is cloned for each page fetch to maintain immutability
52    query: Option<Select<I>>,
53
54    /// Optional page size for pagination
55    /// When Some(size), data is loaded in chunks of this size
56    /// When None, all data is loaded at once
57    page_size: Option<u64>,
58
59    /// Current offset for tracking position across all pages
60    /// Uses Cell for interior mutability in single-threaded context
61    /// This tracks the absolute position, not just within the current page
62    offset: Cell<u64>,
63
64    /// Internal buffer for storing fetched items from the current page
65    /// Uses RefCell for interior mutability to allow borrowing during reads
66    /// The buffer is cleared and refilled when a new page is loaded
67    buffer: RefCell<Vec<I::Model>>,
68
69    /// Current page number (used with pagination)
70    /// Starts at 0 and increments when moving to the next page
71    /// Only relevant when page_size is Some(_)
72    current_page: Cell<u64>,
73}
74
75impl<'a, I> Default for OrmItemReader<'a, I>
76where
77    I: EntityTrait,
78    I::Model: FromQueryResult + Send + Sync + Clone,
79{
80    fn default() -> Self {
81        Self::new()
82    }
83}
84
85impl<'a, I> OrmItemReader<'a, I>
86where
87    I: EntityTrait,
88    I::Model: FromQueryResult + Send + Sync + Clone,
89{
90    /// Creates a new `OrmItemReader` with default configuration.
91    ///
92    /// All parameters must be set using the builder methods before use.
93    /// Use the builder pattern for a more convenient API.
94    ///
95    /// # Returns
96    ///
97    /// A new `OrmItemReader` instance with default settings.
98    ///
99    /// # Examples
100    ///
101    /// ```no_run
102    /// use spring_batch_rs::item::orm::OrmItemReader;
103    /// use sea_orm::{Database, EntityTrait};
104    ///
105    /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
106    /// let db = Database::connect("sqlite::memory:").await?;
107    ///
108    /// // Using direct instantiation (less recommended)
109    /// // let reader = OrmItemReader::<MyEntity>::new()
110    /// //     .connection(&db)
111    /// //     .query(MyEntity::find())
112    /// //     .page_size(100);
113    ///
114    /// // Using builder (recommended)
115    /// // let reader = OrmItemReaderBuilder::new()
116    /// //     .connection(&db)
117    /// //     .query(MyEntity::find())
118    /// //     .page_size(100)
119    /// //     .build();
120    /// # Ok(())
121    /// # }
122    /// ```
123    pub fn new() -> Self {
124        Self {
125            connection: None,
126            query: None,
127            page_size: None,
128            offset: Cell::new(0),
129            buffer: RefCell::new(Vec::new()),
130            current_page: Cell::new(0),
131        }
132    }
133
134    /// Sets the database connection for the reader.
135    ///
136    /// This is a required parameter that must be set before using the reader.
137    ///
138    /// # Arguments
139    ///
140    /// * `connection` - Reference to the SeaORM database connection
141    ///
142    /// # Returns
143    ///
144    /// The updated `OrmItemReader` instance.
145    ///
146    /// # Examples
147    ///
148    /// ```compile_fail
149    /// use spring_batch_rs::item::orm::OrmItemReader;
150    /// use sea_orm::Database;
151    ///
152    /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
153    /// let db = Database::connect("sqlite::memory:").await?;
154    /// let reader = OrmItemReader::<String>::new()
155    ///     .connection(&db);
156    /// # Ok(())
157    /// # }
158    /// ```
159    pub fn connection(mut self, connection: &'a DatabaseConnection) -> Self {
160        self.connection = Some(connection);
161        self
162    }
163
164    /// Sets the SeaORM select query for the reader.
165    ///
166    /// This is a required parameter that must be set before using the reader.
167    /// The query can include filtering, ordering, joins, and other SeaORM operations.
168    ///
169    /// # Arguments
170    ///
171    /// * `query` - The SeaORM select query to execute
172    ///
173    /// # Returns
174    ///
175    /// The updated `OrmItemReader` instance.
176    ///
177    /// # Examples
178    ///
179    /// ```no_run
180    /// use spring_batch_rs::item::orm::OrmItemReader;
181    /// use sea_orm::EntityTrait;
182    ///
183    /// // Basic query
184    /// // let reader = OrmItemReader::<MyEntity>::new()
185    /// //     .query(MyEntity::find());
186    ///
187    /// // Filtered query
188    /// // let reader = OrmItemReader::<MyEntity>::new()
189    /// //     .query(MyEntity::find().filter(my_entity::Column::Active.eq(true)));
190    ///
191    /// // Ordered query
192    /// // let reader = OrmItemReader::<MyEntity>::new()
193    /// //     .query(MyEntity::find().order_by_asc(my_entity::Column::Id));
194    /// ```
195    pub fn query(mut self, query: Select<I>) -> Self {
196        self.query = Some(query);
197        // Pre-allocate buffer capacity based on page size if available
198        if let Some(page_size) = self.page_size {
199            self.buffer = RefCell::new(Vec::with_capacity(page_size as usize));
200        }
201        self
202    }
203
204    /// Sets the page size for the reader.
205    ///
206    /// When set, the reader will use pagination to limit memory usage.
207    /// When not set, all data will be loaded at once.
208    ///
209    /// # Arguments
210    ///
211    /// * `page_size` - The number of items to read per page
212    ///
213    /// # Returns
214    ///
215    /// The updated `OrmItemReader` instance.
216    ///
217    /// # Performance Considerations
218    ///
219    /// - Larger page sizes reduce the number of database round trips but use more memory
220    /// - Smaller page sizes use less memory but may result in more database queries
221    /// - Consider your dataset size and available memory when choosing a page size
222    /// - Recommended range: 50-1000 items per page for most use cases
223    ///
224    /// # Examples
225    ///
226    /// ```compile_fail
227    /// use spring_batch_rs::item::orm::OrmItemReader;
228    ///
229    /// // Small page size for memory-constrained environments
230    /// let reader = OrmItemReader::<String>::new()
231    ///     .page_size(50);
232    ///
233    /// // Larger page size for better performance with ample memory
234    /// let reader = OrmItemReader::<String>::new()
235    ///     .page_size(1000);
236    /// ```
237    pub fn page_size(mut self, page_size: u64) -> Self {
238        self.page_size = Some(page_size);
239        // Update buffer capacity if available
240        self.buffer = RefCell::new(Vec::with_capacity(page_size as usize));
241        self
242    }
243
244    /// Reads a page of items from the database using SeaORM.
245    ///
246    /// This method executes the SeaORM query with pagination parameters (if page_size is set),
247    /// and fills the internal buffer with the results. It uses SeaORM's async methods
248    /// within a blocking context to maintain compatibility with the synchronous ItemReader trait.
249    ///
250    /// # Pagination Logic
251    ///
252    /// - **With pagination**: Uses SeaORM's `paginate()` method to fetch a specific page
253    /// - **Without pagination**: Uses SeaORM's `all()` method to fetch all results
254    ///
255    /// # Buffer Management
256    ///
257    /// - Clears the existing buffer before loading new data
258    /// - Stores all entity models directly in the buffer for sequential access
259    ///
260    /// # Error Handling
261    ///
262    /// Returns a `DbErr` if the database query fails. This error will be converted
263    /// to a `BatchError` by the calling method.
264    async fn read_page_async(&self) -> Result<(), DbErr> {
265        let results = if let Some(page_size) = self.page_size {
266            // Use SeaORM's paginator for efficient pagination
267            // This creates a paginator that can fetch specific pages
268            let paginator = self
269                .query
270                .as_ref()
271                .unwrap()
272                .clone()
273                .paginate(self.connection.unwrap(), page_size);
274            let current_page = self.current_page.get();
275
276            // Fetch the specific page we're currently on
277            // Pages are 0-indexed in SeaORM's paginator
278            paginator.fetch_page(current_page).await?
279        } else {
280            // Load all results at once - suitable for smaller datasets
281            // This executes the query and returns all matching rows
282            self.query
283                .as_ref()
284                .unwrap()
285                .clone()
286                .all(self.connection.unwrap())
287                .await?
288        };
289
290        // Clear the buffer and fill it with entity models
291        let mut buffer = self.buffer.borrow_mut();
292        buffer.clear(); // Remove any existing items from the previous page
293
294        // Store entity models directly in the buffer
295        buffer.extend(results);
296
297        Ok(())
298    }
299
300    /// Reads a page of items from the database.
301    ///
302    /// This is a synchronous wrapper around the async `read_page_async` method.
303    /// It uses tokio's `block_in_place` to run the async operation in a blocking context,
304    /// which is necessary because the `ItemReader` trait is synchronous.
305    ///
306    /// # Async-to-Sync Bridge
307    ///
308    /// SeaORM is inherently async, but the batch framework uses synchronous traits
309    /// for simplicity. This method bridges that gap by:
310    /// 1. Using `block_in_place` to avoid blocking the tokio runtime
311    /// 2. Getting the current runtime handle to execute the async operation
312    /// 3. Converting SeaORM's `DbErr` to the batch framework's `BatchError`
313    ///
314    /// # Error Conversion
315    ///
316    /// Converts SeaORM's `DbErr` to `BatchError::ItemReader` with context information
317    /// to help with debugging database-related issues.
318    fn read_page(&self) -> Result<(), BatchError> {
319        tokio::task::block_in_place(|| {
320            tokio::runtime::Handle::current().block_on(async {
321                self.read_page_async()
322                    .await
323                    .map_err(|e| BatchError::ItemReader(format!("SeaORM query failed: {}", e)))
324            })
325        })
326    }
327}
328
329/// Implementation of ItemReader trait for OrmItemReader.
330///
331/// This implementation provides a way to read items from a database using SeaORM
332/// with support for pagination. It uses an internal buffer to store the results
333/// of database queries and keeps track of the current offset to determine when
334/// a new page of data needs to be fetched.
335///
336/// # Reading Strategy
337///
338/// The reader implements a lazy-loading strategy:
339/// 1. **First read**: Loads the first page of data
340/// 2. **Subsequent reads**: Returns items from the buffer
341/// 3. **Page boundary**: When the buffer is exhausted, loads the next page
342/// 4. **End of data**: Returns None when no more data is available
343///
344/// # State Management
345///
346/// - `offset`: Tracks the absolute position across all pages
347/// - `current_page`: Tracks which page we're currently reading from
348/// - `buffer`: Holds the current page's data
349///
350/// The reader maintains these invariants:
351/// - `offset` always represents the next item to be read
352/// - `current_page` represents the page currently loaded in the buffer
353/// - The buffer contains items for the current page only
354impl<I> ItemReader<I::Model> for OrmItemReader<'_, I>
355where
356    I: EntityTrait,
357    I::Model: FromQueryResult + Send + Sync + Clone,
358{
359    /// Reads the next item from the reader.
360    ///
361    /// This method manages pagination internally and provides a simple interface
362    /// for sequential reading. The complexity of pagination is hidden from the caller.
363    ///
364    /// # Reading Algorithm
365    ///
366    /// 1. **Calculate buffer index**: Determine where we are within the current page
367    /// 2. **Check if new page needed**: If at the start of a page, load new data
368    /// 3. **Fetch from buffer**: Get the item at the current index
369    /// 4. **Update counters**: Increment offset and page number as needed
370    /// 5. **Return result**: Clone the item or return None if exhausted
371    ///
372    /// # Pagination Logic
373    ///
374    /// - **With pagination**: `index = offset % page_size`
375    /// - **Without pagination**: `index = offset` (all data in one "page")
376    /// - **New page trigger**: When `index == 0`, we need to load a new page
377    /// - **Page advancement**: When we reach the end of a page, increment `current_page`
378    ///
379    /// # Memory Management
380    ///
381    /// Items are cloned when returned to ensure the caller owns the data.
382    /// This prevents borrowing issues and allows the buffer to be modified
383    /// for the next page load.
384    ///
385    /// # Returns
386    ///
387    /// - `Ok(Some(item))` if an item was successfully read
388    /// - `Ok(None)` if there are no more items to read
389    /// - `Err(BatchError)` if an error occurred during reading (e.g., database error)
390    ///
391    /// # Examples
392    ///
393    /// ```
394    /// use spring_batch_rs::item::orm::OrmItemReaderBuilder;
395    /// use spring_batch_rs::core::item::ItemReader;
396    /// use sea_orm::FromQueryResult;
397    /// use serde::{Deserialize, Serialize};
398    ///
399    /// #[derive(Debug, Clone, FromQueryResult, Deserialize, Serialize)]
400    /// struct Product {
401    ///     id: i32,
402    ///     name: String,
403    ///     price: f64,
404    /// }
405    ///
406    /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
407    /// // Assuming you have a database connection and query set up
408    /// // let reader = OrmItemReaderBuilder::new()
409    /// //     .connection(&db)
410    /// //     .query(products::Entity::find())
411    /// //     .page_size(50)
412    /// //     .build();
413    ///
414    /// // Read all products
415    /// // let mut products = Vec::new();
416    /// // while let Some(product) = reader.read()? {
417    /// //     products.push(product);
418    /// // }
419    /// # Ok(())
420    /// # }
421    /// ```
422    fn read(&self) -> ItemReaderResult<I::Model> {
423        // Calculate the index within the current page
424        // This determines where we are in the current buffer
425        let index = if let Some(page_size) = self.page_size {
426            // With pagination: index is position within the current page
427            self.offset.get() % page_size
428        } else {
429            // Without pagination: index is the absolute position
430            self.offset.get()
431        };
432
433        // When index is 0, we've reached the start of a new page
434        // or this is the first read operation, so we need to fetch data
435        if index == 0 {
436            self.read_page()?
437        }
438
439        // Retrieve the item at the current index from the buffer
440        let buffer = self.buffer.borrow();
441        let result = buffer.get(index as usize);
442
443        match result {
444            Some(item) => {
445                // We found an item at the current position
446
447                // Increment the offset for the next read operation
448                // This moves us to the next item in the sequence
449                self.offset.set(self.offset.get() + 1);
450
451                // If we're using pagination and have reached the end of the current page,
452                // increment the page number for the next page load
453                if let Some(page_size) = self.page_size
454                    && self.offset.get().is_multiple_of(page_size)
455                {
456                    // We've read all items in the current page
457                    // Move to the next page for the next read cycle
458                    self.current_page.set(self.current_page.get() + 1);
459                }
460
461                // Clone the item to give ownership to the caller
462                // This prevents borrowing conflicts with the buffer
463                Ok(Some(item.clone()))
464            }
465            None => {
466                // No more items in the current buffer
467                // This means we've reached the end of the data
468
469                // If we're using pagination, this might mean we've reached the end of all data
470                // If not using pagination, this definitely means we're done
471                Ok(None)
472            }
473        }
474    }
475}
476
477/// Builder for creating a `OrmItemReader`.
478///
479/// This builder provides a convenient way to configure and create a `OrmItemReader`
480/// with custom parameters like page size and database connection.
481/// It follows the builder pattern to ensure all required components are provided
482/// before creating the reader instance.
483///
484/// # Builder Pattern Benefits
485///
486/// - **Fluent API**: Method chaining for readable configuration
487/// - **Compile-time Safety**: Required fields are enforced at build time
488/// - **Flexibility**: Optional parameters can be omitted
489/// - **Validation**: Build method validates all required components are present
490///
491/// # Required Components
492///
493/// The following components must be set before calling `build()`:
494/// - **Connection**: Database connection for executing queries
495/// - **Query**: SeaORM select query to execute
496///
497/// # Optional Components
498///
499/// - **Page Size**: When set, enables pagination for memory-efficient reading
500///
501/// # Usage Pattern
502///
503/// ```text
504/// let reader = OrmItemReaderBuilder::new()
505///     .connection(&db)           // Required
506///     .query(entity_query)       // Required
507///     .page_size(100)           // Optional
508///     .build();                 // Creates the reader
509/// ```
510///
511/// # Examples
512///
513/// ```
514/// use spring_batch_rs::item::orm::OrmItemReaderBuilder;
515/// use sea_orm::{Database, EntityTrait, FromQueryResult};
516/// use serde::{Deserialize, Serialize};
517///
518/// #[derive(Debug, Clone, FromQueryResult, Deserialize, Serialize)]
519/// struct Order {
520///     id: i32,
521///     customer_name: String,
522///     total_amount: f64,
523///     status: String,
524/// }
525///
526/// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
527/// // Create database connection
528/// let db = Database::connect("postgresql://user:pass@localhost/db").await?;
529///
530/// // Create a query (assuming you have an orders entity)
531/// // let query = orders::Entity::find()
532/// //     .filter(orders::Column::Status.eq("pending"));
533///
534/// // Create the reader with explicit type annotations
535/// // let reader: OrmItemReader<orders::Entity> = OrmItemReaderBuilder::new()
536/// //     .connection(&db)
537/// //     .query(query)
538/// //     .page_size(100)  // Process 100 orders at a time
539/// //     .build();
540/// # Ok(())
541/// # }
542/// ```
543pub struct OrmItemReaderBuilder<'a, I>
544where
545    I: EntityTrait,
546{
547    /// Database connection - None until set by the user
548    /// This will be validated as required during build()
549    connection: Option<&'a DatabaseConnection>,
550
551    /// SeaORM select query - None until set by the user
552    /// This will be validated as required during build()
553    query: Option<Select<I>>,
554
555    /// Optional page size for pagination
556    /// When None, all data will be loaded at once
557    /// When Some(size), data will be loaded in chunks
558    page_size: Option<u64>,
559}
560
561impl<I> Default for OrmItemReaderBuilder<'_, I>
562where
563    I: EntityTrait,
564{
565    /// Creates a new builder with all fields set to None/default values.
566    ///
567    /// This is the starting point for the builder pattern. All required
568    /// fields must be set before calling `build()`.
569    fn default() -> Self {
570        Self {
571            connection: None,
572            query: None,
573            page_size: None,
574        }
575    }
576}
577
578impl<'a, I> OrmItemReaderBuilder<'a, I>
579where
580    I: EntityTrait,
581    I::Model: FromQueryResult + Send + Sync + Clone,
582{
583    /// Sets the page size for the reader.
584    ///
585    /// When set, the reader will use pagination to load data in chunks of the specified size.
586    /// This is useful for processing large datasets without loading everything into memory.
587    ///
588    /// # Memory Management
589    ///
590    /// - **Small page sizes** (e.g., 10-100): Lower memory usage, more database round trips
591    /// - **Large page sizes** (e.g., 1000-10000): Higher memory usage, fewer database round trips
592    /// - **No pagination** (None): All data loaded at once, highest memory usage
593    ///
594    /// # Performance Considerations
595    ///
596    /// - Choose page size based on available memory and network latency
597    /// - Consider the size of your data rows when setting page size
598    /// - Monitor memory usage and adjust as needed
599    ///
600    /// # Arguments
601    ///
602    /// * `page_size` - The number of items to read per page (must be > 0 for meaningful pagination)
603    ///
604    /// # Returns
605    ///
606    /// The updated `OrmItemReaderBuilder` instance for method chaining.
607    ///
608    /// # Examples
609    ///
610    /// ```
611    /// use spring_batch_rs::item::orm::OrmItemReaderBuilder;
612    /// use sea_orm::FromQueryResult;
613    /// use serde::{Deserialize, Serialize};
614    ///
615    /// // In practice, you would use actual SeaORM entities
616    /// // #[derive(Debug, Clone, FromQueryResult)]
617    /// // struct Record {
618    /// //     id: i32,
619    /// //     data: String,
620    /// // }
621    /// //
622    /// // let builder = OrmItemReaderBuilder::<record::Entity>::new()
623    /// //     .page_size(50);  // Process 50 records at a time
624    /// ```
625    pub fn page_size(mut self, page_size: u64) -> Self {
626        self.page_size = Some(page_size);
627        self
628    }
629
630    /// Sets the SeaORM select query for the reader.
631    ///
632    /// This query will be executed to fetch data from the database. The query can include
633    /// filters, joins, ordering, and other SeaORM query operations. The query will be
634    /// cloned for each page fetch, so it should be relatively lightweight to clone.
635    ///
636    /// # Query Design Considerations
637    ///
638    /// - **Ordering**: Include `ORDER BY` clauses for consistent pagination
639    /// - **Filtering**: Apply filters to reduce the dataset size
640    /// - **Joins**: Use joins to fetch related data in a single query
641    /// - **Indexing**: Ensure proper database indexes for query performance
642    ///
643    /// # Pagination Compatibility
644    ///
645    /// When using pagination, the query should:
646    /// - Have a deterministic order (use ORDER BY)
647    /// - Not use LIMIT/OFFSET (handled by the paginator)
648    /// - Be compatible with SeaORM's paginator
649    ///
650    /// # Arguments
651    ///
652    /// * `query` - The SeaORM select query to execute
653    ///
654    /// # Returns
655    ///
656    /// The updated `OrmItemReaderBuilder` instance for method chaining.
657    ///
658    /// # Examples
659    ///
660    /// ```
661    /// use spring_batch_rs::item::orm::OrmItemReaderBuilder;
662    /// use sea_orm::{EntityTrait, QueryFilter};
663    /// use serde::{Deserialize, Serialize};
664    ///
665    /// // In practice, you would use actual SeaORM entities
666    /// // let query = user::Entity::find()
667    /// //     .filter(user::Column::Active.eq(true))
668    /// //     .order_by_asc(user::Column::Id);
669    /// //
670    /// // let builder = OrmItemReaderBuilder::<user::Entity>::new()
671    /// //     .query(query);
672    /// ```
673    pub fn query(mut self, query: Select<I>) -> Self {
674        self.query = Some(query);
675        self
676    }
677
678    /// Sets the database connection for the reader.
679    ///
680    /// This connection will be used to execute the SeaORM queries. The connection
681    /// must remain valid for the entire lifetime of the reader, which is enforced
682    /// by the lifetime parameter 'a.
683    ///
684    /// # Connection Management
685    ///
686    /// - The connection is borrowed, not owned by the reader
687    /// - Ensure the connection pool/manager outlives the reader
688    /// - The connection should be properly configured for your database
689    ///
690    /// # Database Compatibility
691    ///
692    /// SeaORM supports multiple databases:
693    /// - PostgreSQL
694    /// - MySQL
695    /// - SQLite
696    /// - SQL Server (via sqlx)
697    ///
698    /// # Arguments
699    ///
700    /// * `connection` - The SeaORM database connection (must outlive the reader)
701    ///
702    /// # Returns
703    ///
704    /// The updated `OrmItemReaderBuilder` instance for method chaining.
705    ///
706    /// # Examples
707    ///
708    /// ```
709    /// use spring_batch_rs::item::orm::OrmItemReaderBuilder;
710    /// use sea_orm::{Database, EntityTrait};
711    /// use serde::{Deserialize, Serialize};
712    ///
713    /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
714    /// let db = Database::connect("sqlite::memory:").await?;
715    ///
716    /// // In practice, you would use actual SeaORM entities
717    /// // let builder = OrmItemReaderBuilder::<product::Entity>::new()
718    /// //     .connection(&db);
719    /// # Ok(())
720    /// # }
721    /// ```
722    pub fn connection(mut self, connection: &'a DatabaseConnection) -> Self {
723        self.connection = Some(connection);
724        self
725    }
726
727    /// Builds the ORM item reader with the configured parameters.
728    ///
729    /// This method validates that all required parameters have been set and creates
730    /// a new `OrmItemReader` instance.
731    ///
732    /// # Returns
733    /// A configured `OrmItemReader` instance
734    ///
735    /// # Panics
736    /// Panics if required parameters (connection and query) are missing.
737    ///
738    /// # Validation
739    ///
740    /// The builder performs the following validation:
741    /// - Ensures a database connection has been provided
742    /// - Ensures a SeaORM query has been provided
743    /// - Page size is optional and defaults to loading all data at once
744    ///
745    /// If any required parameter is missing, the method will panic with a descriptive error message.
746    ///
747    /// # Examples
748    ///
749    /// ```no_run
750    /// use spring_batch_rs::item::orm::OrmItemReaderBuilder;
751    /// use sea_orm::{Database, EntityTrait};
752    ///
753    /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
754    /// let db = Database::connect("sqlite::memory:").await?;
755    ///
756    /// // With pagination
757    /// // let reader = OrmItemReaderBuilder::new()
758    /// //     .connection(&db)
759    /// //     .query(MyEntity::find())
760    /// //     .page_size(100)
761    /// //     .build();
762    ///
763    /// // Without pagination (load all at once)
764    /// // let reader = OrmItemReaderBuilder::new()
765    /// //     .connection(&db)
766    /// //     .query(MyEntity::find())
767    /// //     .build();
768    /// # Ok(())
769    /// # }
770    /// ```
771    pub fn build(self) -> OrmItemReader<'a, I> {
772        let mut reader = OrmItemReader::new()
773            .connection(self.connection.expect("Database connection is required"))
774            .query(self.query.expect("Query is required"));
775
776        if let Some(page_size) = self.page_size {
777            reader = reader.page_size(page_size);
778        }
779
780        reader
781    }
782
783    /// Creates a new `OrmItemReaderBuilder`.
784    ///
785    /// This is the entry point for the builder pattern. It creates a new builder
786    /// instance with all fields set to their default values (None for optional fields).
787    ///
788    /// # Builder Lifecycle
789    ///
790    /// 1. **Creation**: `new()` creates an empty builder
791    /// 2. **Configuration**: Use setter methods to configure the reader
792    /// 3. **Validation**: `build()` validates and creates the final reader
793    ///
794    /// # Examples
795    ///
796    /// ```
797    /// use spring_batch_rs::item::orm::OrmItemReaderBuilder;
798    /// use sea_orm::EntityTrait;
799    /// use serde::{Deserialize, Serialize};
800    ///
801    /// // In practice, you would use actual SeaORM entities
802    /// // let builder = OrmItemReaderBuilder::<record::Entity>::new()
803    /// //     .page_size(50);  // Process 50 records at a time
804    /// ```
805    pub fn new() -> Self {
806        Self::default()
807    }
808}
809
#[cfg(test)]
mod tests {
    // Unit tests for `OrmItemReader` and `OrmItemReaderBuilder`, driven by
    // SeaORM's `MockDatabase` so no real database is required.
    use super::*;
    use crate::core::item::ItemReader;
    use sea_orm::{DatabaseBackend, MockDatabase, entity::prelude::*};

    // Minimal entity definition for testing
    #[derive(Clone, Debug, PartialEq, Eq, DeriveEntityModel)]
    #[sea_orm(table_name = "record")]
    pub struct Model {
        #[sea_orm(primary_key)]
        pub id: i32,
        pub name: String,
    }

    #[derive(Copy, Clone, Debug, EnumIter, DeriveRelation)]
    pub enum Relation {}

    impl ActiveModelBehavior for ActiveModel {}

    // --- OrmItemReader unit tests ---

    #[test]
    fn should_create_reader_with_default_state() {
        let reader = OrmItemReader::<Entity>::new();
        assert!(
            reader.connection.is_none(),
            "connection should start as None"
        );
        assert_eq!(reader.page_size, None);
        assert_eq!(reader.offset.get(), 0);
        assert_eq!(reader.current_page.get(), 0);
        assert!(reader.buffer.borrow().is_empty());
    }

    #[test]
    fn should_set_page_size_via_method() {
        let reader = OrmItemReader::<Entity>::new().page_size(50);
        assert_eq!(reader.page_size, Some(50));
        // Buffer capacity should be pre-allocated for at least one page.
        // `Vec::with_capacity(n)` only guarantees a capacity of *at least* `n`
        // (the allocator may hand out more), so an exact-equality check would
        // depend on unspecified behaviour.
        assert!(
            reader.buffer.borrow().capacity() >= 50,
            "buffer should be pre-allocated for at least one page"
        );
    }

    // NOTE(review): the async tests use the multi-threaded runtime; presumably
    // the synchronous `read()` bridges into async SeaORM calls in a way that
    // requires it (e.g. block_in_place) — confirm against the reader impl.
    #[tokio::test(flavor = "multi_thread")]
    async fn should_return_none_when_database_is_empty() {
        let db = MockDatabase::new(DatabaseBackend::Sqlite)
            .append_query_results([Vec::<Model>::new()])
            .into_connection();

        let reader = OrmItemReader::<Entity>::new()
            .connection(&db)
            .query(Entity::find());

        let result = reader.read().unwrap();
        assert!(result.is_none(), "empty DB should yield None");
    }

    #[tokio::test(flavor = "multi_thread")]
    async fn should_read_single_item_then_return_none() {
        // A single mocked result set: the whole table when pagination is off.
        let db = MockDatabase::new(DatabaseBackend::Sqlite)
            .append_query_results([vec![Model {
                id: 1,
                name: "Alice".to_string(),
            }]])
            .into_connection();

        let reader = OrmItemReader::<Entity>::new()
            .connection(&db)
            .query(Entity::find());

        let first = reader.read().unwrap().expect("first item should exist");
        assert_eq!(first.name, "Alice");
        assert_eq!(reader.offset.get(), 1);

        let second = reader.read().unwrap();
        assert!(second.is_none(), "should return None after the only item");
    }

    #[tokio::test(flavor = "multi_thread")]
    async fn should_read_multiple_items_without_pagination() {
        let db = MockDatabase::new(DatabaseBackend::Sqlite)
            .append_query_results([vec![
                Model {
                    id: 1,
                    name: "Alice".to_string(),
                },
                Model {
                    id: 2,
                    name: "Bob".to_string(),
                },
            ]])
            .into_connection();

        let reader = OrmItemReader::<Entity>::new()
            .connection(&db)
            .query(Entity::find());

        let a = reader.read().unwrap().unwrap();
        assert_eq!(a.name, "Alice");
        let b = reader.read().unwrap().unwrap();
        assert_eq!(b.name, "Bob");
        assert!(reader.read().unwrap().is_none());
    }

    #[tokio::test(flavor = "multi_thread")]
    async fn should_paginate_across_multiple_pages() {
        // page_size=1: two DB calls, each returning one item, then empty
        let db = MockDatabase::new(DatabaseBackend::Sqlite)
            .append_query_results([
                vec![Model {
                    id: 1,
                    name: "P1".to_string(),
                }],
                vec![Model {
                    id: 2,
                    name: "P2".to_string(),
                }],
                vec![], // signals end of data
            ])
            .into_connection();

        let reader = OrmItemReader::<Entity>::new()
            .connection(&db)
            .query(Entity::find())
            .page_size(1);

        let first = reader.read().unwrap().unwrap();
        assert_eq!(first.name, "P1");
        assert_eq!(
            reader.current_page.get(),
            1,
            "page should advance after full page"
        );

        let second = reader.read().unwrap().unwrap();
        assert_eq!(second.name, "P2");

        assert!(
            reader.read().unwrap().is_none(),
            "should stop after all pages"
        );
    }

    // --- OrmItemReaderBuilder unit tests ---

    #[test]
    fn should_create_builder_with_default_state() {
        let builder = OrmItemReaderBuilder::<Entity>::new();
        assert!(builder.connection.is_none());
        assert!(builder.query.is_none());
        assert_eq!(builder.page_size, None);
    }

    #[test]
    fn should_set_page_size_on_builder() {
        let builder = OrmItemReaderBuilder::<Entity>::new().page_size(100);
        assert_eq!(builder.page_size, Some(100));
    }

    #[test]
    #[should_panic(expected = "Database connection is required")]
    fn should_panic_when_building_without_connection() {
        OrmItemReaderBuilder::<Entity>::new()
            .query(Entity::find())
            .build();
    }

    #[test]
    #[should_panic(expected = "Query is required")]
    fn should_panic_when_building_without_query() {
        let db = MockDatabase::new(DatabaseBackend::Sqlite).into_connection();
        OrmItemReaderBuilder::<Entity>::new()
            .connection(&db)
            .build();
    }

    #[test]
    fn should_build_reader_with_connection_and_query() {
        let db = MockDatabase::new(DatabaseBackend::Sqlite).into_connection();
        let reader = OrmItemReaderBuilder::<Entity>::new()
            .connection(&db)
            .query(Entity::find())
            .build();
        assert!(reader.connection.is_some());
        assert_eq!(reader.page_size, None);
    }

    #[test]
    fn should_build_reader_with_page_size() {
        let db = MockDatabase::new(DatabaseBackend::Sqlite).into_connection();
        let reader = OrmItemReaderBuilder::<Entity>::new()
            .connection(&db)
            .query(Entity::find())
            .page_size(50)
            .build();
        assert_eq!(reader.page_size, Some(50));
    }
}