// spring_batch_rs/item/orm/orm_reader.rs
1use std::cell::{Cell, RefCell};
2
3use sea_orm::{DatabaseConnection, DbErr, EntityTrait, FromQueryResult, PaginatorTrait, Select};
4
5use crate::{
6 BatchError,
7 core::item::{ItemReader, ItemReaderResult},
8};
9
10/// A reader for reading entities from a database using SeaORM.
11///
12/// This reader provides an implementation of the `ItemReader` trait for SeaORM-based
13/// database operations. It supports reading entities from any SeaORM-compatible database
14/// with optional pagination for efficient memory usage when dealing with large datasets.
15///
16/// # Generic Parameters
17///
18/// - `I`: The SeaORM entity type that implements `EntityTrait`
19///
20/// # Design Philosophy
21///
22/// This reader follows SeaORM's conventions and leverages its built-in pagination
23/// mechanisms for optimal performance. It provides a bridge between SeaORM's async
24/// API and the batch framework's synchronous `ItemReader` trait.
25///
26/// # Memory Management
27///
28/// The reader uses an internal buffer to store entity models from the current page,
29/// which helps balance memory usage with query performance. When pagination is enabled,
30/// only one page of data is kept in memory at any time.
31///
32/// # Query Flexibility
33///
34/// Accepts any SeaORM `Select` query, allowing for complex filtering, joins, and
35/// ordering. The query is executed as-is, with pagination parameters added when configured.
36///
37/// # State Management
38///
39/// - `offset`: Tracks the absolute position across all pages
40/// - `current_page`: Tracks which page is currently loaded in the buffer
41/// - `buffer`: Holds the current page's entity models
42pub struct OrmItemReader<'a, I>
43where
44 I: EntityTrait,
45{
46 /// Database connection reference - borrowed for the lifetime of the reader
47 /// This ensures the connection remains valid throughout the reader's lifecycle
48 connection: Option<&'a DatabaseConnection>,
49
50 /// SeaORM select query to execute
51 /// This query is cloned for each page fetch to maintain immutability
52 query: Option<Select<I>>,
53
54 /// Optional page size for pagination
55 /// When Some(size), data is loaded in chunks of this size
56 /// When None, all data is loaded at once
57 page_size: Option<u64>,
58
59 /// Current offset for tracking position across all pages
60 /// Uses Cell for interior mutability in single-threaded context
61 /// This tracks the absolute position, not just within the current page
62 offset: Cell<u64>,
63
64 /// Internal buffer for storing fetched items from the current page
65 /// Uses RefCell for interior mutability to allow borrowing during reads
66 /// The buffer is cleared and refilled when a new page is loaded
67 buffer: RefCell<Vec<I::Model>>,
68
69 /// Current page number (used with pagination)
70 /// Starts at 0 and increments when moving to the next page
71 /// Only relevant when page_size is Some(_)
72 current_page: Cell<u64>,
73}
74
75impl<'a, I> Default for OrmItemReader<'a, I>
76where
77 I: EntityTrait,
78 I::Model: FromQueryResult + Send + Sync + Clone,
79{
80 fn default() -> Self {
81 Self::new()
82 }
83}
84
85impl<'a, I> OrmItemReader<'a, I>
86where
87 I: EntityTrait,
88 I::Model: FromQueryResult + Send + Sync + Clone,
89{
90 /// Creates a new `OrmItemReader` with default configuration.
91 ///
92 /// All parameters must be set using the builder methods before use.
93 /// Use the builder pattern for a more convenient API.
94 ///
95 /// # Returns
96 ///
97 /// A new `OrmItemReader` instance with default settings.
98 ///
99 /// # Examples
100 ///
101 /// ```no_run
102 /// use spring_batch_rs::item::orm::OrmItemReader;
103 /// use sea_orm::{Database, EntityTrait};
104 ///
105 /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
106 /// let db = Database::connect("sqlite::memory:").await?;
107 ///
108 /// // Using direct instantiation (less recommended)
109 /// // let reader = OrmItemReader::<MyEntity>::new()
110 /// // .connection(&db)
111 /// // .query(MyEntity::find())
112 /// // .page_size(100);
113 ///
114 /// // Using builder (recommended)
115 /// // let reader = OrmItemReaderBuilder::new()
116 /// // .connection(&db)
117 /// // .query(MyEntity::find())
118 /// // .page_size(100)
119 /// // .build();
120 /// # Ok(())
121 /// # }
122 /// ```
123 pub fn new() -> Self {
124 Self {
125 connection: None,
126 query: None,
127 page_size: None,
128 offset: Cell::new(0),
129 buffer: RefCell::new(Vec::new()),
130 current_page: Cell::new(0),
131 }
132 }
133
134 /// Sets the database connection for the reader.
135 ///
136 /// This is a required parameter that must be set before using the reader.
137 ///
138 /// # Arguments
139 ///
140 /// * `connection` - Reference to the SeaORM database connection
141 ///
142 /// # Returns
143 ///
144 /// The updated `OrmItemReader` instance.
145 ///
146 /// # Examples
147 ///
148 /// ```compile_fail
149 /// use spring_batch_rs::item::orm::OrmItemReader;
150 /// use sea_orm::Database;
151 ///
152 /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
153 /// let db = Database::connect("sqlite::memory:").await?;
154 /// let reader = OrmItemReader::<String>::new()
155 /// .connection(&db);
156 /// # Ok(())
157 /// # }
158 /// ```
159 pub fn connection(mut self, connection: &'a DatabaseConnection) -> Self {
160 self.connection = Some(connection);
161 self
162 }
163
164 /// Sets the SeaORM select query for the reader.
165 ///
166 /// This is a required parameter that must be set before using the reader.
167 /// The query can include filtering, ordering, joins, and other SeaORM operations.
168 ///
169 /// # Arguments
170 ///
171 /// * `query` - The SeaORM select query to execute
172 ///
173 /// # Returns
174 ///
175 /// The updated `OrmItemReader` instance.
176 ///
177 /// # Examples
178 ///
179 /// ```no_run
180 /// use spring_batch_rs::item::orm::OrmItemReader;
181 /// use sea_orm::EntityTrait;
182 ///
183 /// // Basic query
184 /// // let reader = OrmItemReader::<MyEntity>::new()
185 /// // .query(MyEntity::find());
186 ///
187 /// // Filtered query
188 /// // let reader = OrmItemReader::<MyEntity>::new()
189 /// // .query(MyEntity::find().filter(my_entity::Column::Active.eq(true)));
190 ///
191 /// // Ordered query
192 /// // let reader = OrmItemReader::<MyEntity>::new()
193 /// // .query(MyEntity::find().order_by_asc(my_entity::Column::Id));
194 /// ```
195 pub fn query(mut self, query: Select<I>) -> Self {
196 self.query = Some(query);
197 // Pre-allocate buffer capacity based on page size if available
198 if let Some(page_size) = self.page_size {
199 self.buffer = RefCell::new(Vec::with_capacity(page_size as usize));
200 }
201 self
202 }
203
204 /// Sets the page size for the reader.
205 ///
206 /// When set, the reader will use pagination to limit memory usage.
207 /// When not set, all data will be loaded at once.
208 ///
209 /// # Arguments
210 ///
211 /// * `page_size` - The number of items to read per page
212 ///
213 /// # Returns
214 ///
215 /// The updated `OrmItemReader` instance.
216 ///
217 /// # Performance Considerations
218 ///
219 /// - Larger page sizes reduce the number of database round trips but use more memory
220 /// - Smaller page sizes use less memory but may result in more database queries
221 /// - Consider your dataset size and available memory when choosing a page size
222 /// - Recommended range: 50-1000 items per page for most use cases
223 ///
224 /// # Examples
225 ///
226 /// ```compile_fail
227 /// use spring_batch_rs::item::orm::OrmItemReader;
228 ///
229 /// // Small page size for memory-constrained environments
230 /// let reader = OrmItemReader::<String>::new()
231 /// .page_size(50);
232 ///
233 /// // Larger page size for better performance with ample memory
234 /// let reader = OrmItemReader::<String>::new()
235 /// .page_size(1000);
236 /// ```
237 pub fn page_size(mut self, page_size: u64) -> Self {
238 self.page_size = Some(page_size);
239 // Update buffer capacity if available
240 self.buffer = RefCell::new(Vec::with_capacity(page_size as usize));
241 self
242 }
243
244 /// Reads a page of items from the database using SeaORM.
245 ///
246 /// This method executes the SeaORM query with pagination parameters (if page_size is set),
247 /// and fills the internal buffer with the results. It uses SeaORM's async methods
248 /// within a blocking context to maintain compatibility with the synchronous ItemReader trait.
249 ///
250 /// # Pagination Logic
251 ///
252 /// - **With pagination**: Uses SeaORM's `paginate()` method to fetch a specific page
253 /// - **Without pagination**: Uses SeaORM's `all()` method to fetch all results
254 ///
255 /// # Buffer Management
256 ///
257 /// - Clears the existing buffer before loading new data
258 /// - Stores all entity models directly in the buffer for sequential access
259 ///
260 /// # Error Handling
261 ///
262 /// Returns a `DbErr` if the database query fails. This error will be converted
263 /// to a `BatchError` by the calling method.
264 async fn read_page_async(&self) -> Result<(), DbErr> {
265 let results = if let Some(page_size) = self.page_size {
266 // Use SeaORM's paginator for efficient pagination
267 // This creates a paginator that can fetch specific pages
268 let paginator = self
269 .query
270 .as_ref()
271 .unwrap()
272 .clone()
273 .paginate(self.connection.unwrap(), page_size);
274 let current_page = self.current_page.get();
275
276 // Fetch the specific page we're currently on
277 // Pages are 0-indexed in SeaORM's paginator
278 paginator.fetch_page(current_page).await?
279 } else {
280 // Load all results at once - suitable for smaller datasets
281 // This executes the query and returns all matching rows
282 self.query
283 .as_ref()
284 .unwrap()
285 .clone()
286 .all(self.connection.unwrap())
287 .await?
288 };
289
290 // Clear the buffer and fill it with entity models
291 let mut buffer = self.buffer.borrow_mut();
292 buffer.clear(); // Remove any existing items from the previous page
293
294 // Store entity models directly in the buffer
295 buffer.extend(results);
296
297 Ok(())
298 }
299
300 /// Reads a page of items from the database.
301 ///
302 /// This is a synchronous wrapper around the async `read_page_async` method.
303 /// It uses tokio's `block_in_place` to run the async operation in a blocking context,
304 /// which is necessary because the `ItemReader` trait is synchronous.
305 ///
306 /// # Async-to-Sync Bridge
307 ///
308 /// SeaORM is inherently async, but the batch framework uses synchronous traits
309 /// for simplicity. This method bridges that gap by:
310 /// 1. Using `block_in_place` to avoid blocking the tokio runtime
311 /// 2. Getting the current runtime handle to execute the async operation
312 /// 3. Converting SeaORM's `DbErr` to the batch framework's `BatchError`
313 ///
314 /// # Error Conversion
315 ///
316 /// Converts SeaORM's `DbErr` to `BatchError::ItemReader` with context information
317 /// to help with debugging database-related issues.
318 fn read_page(&self) -> Result<(), BatchError> {
319 tokio::task::block_in_place(|| {
320 tokio::runtime::Handle::current().block_on(async {
321 self.read_page_async()
322 .await
323 .map_err(|e| BatchError::ItemReader(format!("SeaORM query failed: {}", e)))
324 })
325 })
326 }
327}
328
329/// Implementation of ItemReader trait for OrmItemReader.
330///
331/// This implementation provides a way to read items from a database using SeaORM
332/// with support for pagination. It uses an internal buffer to store the results
333/// of database queries and keeps track of the current offset to determine when
334/// a new page of data needs to be fetched.
335///
336/// # Reading Strategy
337///
338/// The reader implements a lazy-loading strategy:
339/// 1. **First read**: Loads the first page of data
340/// 2. **Subsequent reads**: Returns items from the buffer
341/// 3. **Page boundary**: When the buffer is exhausted, loads the next page
342/// 4. **End of data**: Returns None when no more data is available
343///
344/// # State Management
345///
346/// - `offset`: Tracks the absolute position across all pages
347/// - `current_page`: Tracks which page we're currently reading from
348/// - `buffer`: Holds the current page's data
349///
350/// The reader maintains these invariants:
351/// - `offset` always represents the next item to be read
352/// - `current_page` represents the page currently loaded in the buffer
353/// - The buffer contains items for the current page only
354impl<I> ItemReader<I::Model> for OrmItemReader<'_, I>
355where
356 I: EntityTrait,
357 I::Model: FromQueryResult + Send + Sync + Clone,
358{
359 /// Reads the next item from the reader.
360 ///
361 /// This method manages pagination internally and provides a simple interface
362 /// for sequential reading. The complexity of pagination is hidden from the caller.
363 ///
364 /// # Reading Algorithm
365 ///
366 /// 1. **Calculate buffer index**: Determine where we are within the current page
367 /// 2. **Check if new page needed**: If at the start of a page, load new data
368 /// 3. **Fetch from buffer**: Get the item at the current index
369 /// 4. **Update counters**: Increment offset and page number as needed
370 /// 5. **Return result**: Clone the item or return None if exhausted
371 ///
372 /// # Pagination Logic
373 ///
374 /// - **With pagination**: `index = offset % page_size`
375 /// - **Without pagination**: `index = offset` (all data in one "page")
376 /// - **New page trigger**: When `index == 0`, we need to load a new page
377 /// - **Page advancement**: When we reach the end of a page, increment `current_page`
378 ///
379 /// # Memory Management
380 ///
381 /// Items are cloned when returned to ensure the caller owns the data.
382 /// This prevents borrowing issues and allows the buffer to be modified
383 /// for the next page load.
384 ///
385 /// # Returns
386 ///
387 /// - `Ok(Some(item))` if an item was successfully read
388 /// - `Ok(None)` if there are no more items to read
389 /// - `Err(BatchError)` if an error occurred during reading (e.g., database error)
390 ///
391 /// # Examples
392 ///
393 /// ```
394 /// use spring_batch_rs::item::orm::OrmItemReaderBuilder;
395 /// use spring_batch_rs::core::item::ItemReader;
396 /// use sea_orm::FromQueryResult;
397 /// use serde::{Deserialize, Serialize};
398 ///
399 /// #[derive(Debug, Clone, FromQueryResult, Deserialize, Serialize)]
400 /// struct Product {
401 /// id: i32,
402 /// name: String,
403 /// price: f64,
404 /// }
405 ///
406 /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
407 /// // Assuming you have a database connection and query set up
408 /// // let reader = OrmItemReaderBuilder::new()
409 /// // .connection(&db)
410 /// // .query(products::Entity::find())
411 /// // .page_size(50)
412 /// // .build();
413 ///
414 /// // Read all products
415 /// // let mut products = Vec::new();
416 /// // while let Some(product) = reader.read()? {
417 /// // products.push(product);
418 /// // }
419 /// # Ok(())
420 /// # }
421 /// ```
422 fn read(&self) -> ItemReaderResult<I::Model> {
423 // Calculate the index within the current page
424 // This determines where we are in the current buffer
425 let index = if let Some(page_size) = self.page_size {
426 // With pagination: index is position within the current page
427 self.offset.get() % page_size
428 } else {
429 // Without pagination: index is the absolute position
430 self.offset.get()
431 };
432
433 // When index is 0, we've reached the start of a new page
434 // or this is the first read operation, so we need to fetch data
435 if index == 0 {
436 self.read_page()?
437 }
438
439 // Retrieve the item at the current index from the buffer
440 let buffer = self.buffer.borrow();
441 let result = buffer.get(index as usize);
442
443 match result {
444 Some(item) => {
445 // We found an item at the current position
446
447 // Increment the offset for the next read operation
448 // This moves us to the next item in the sequence
449 self.offset.set(self.offset.get() + 1);
450
451 // If we're using pagination and have reached the end of the current page,
452 // increment the page number for the next page load
453 if let Some(page_size) = self.page_size
454 && self.offset.get().is_multiple_of(page_size)
455 {
456 // We've read all items in the current page
457 // Move to the next page for the next read cycle
458 self.current_page.set(self.current_page.get() + 1);
459 }
460
461 // Clone the item to give ownership to the caller
462 // This prevents borrowing conflicts with the buffer
463 Ok(Some(item.clone()))
464 }
465 None => {
466 // No more items in the current buffer
467 // This means we've reached the end of the data
468
469 // If we're using pagination, this might mean we've reached the end of all data
470 // If not using pagination, this definitely means we're done
471 Ok(None)
472 }
473 }
474 }
475}
476
477/// Builder for creating a `OrmItemReader`.
478///
479/// This builder provides a convenient way to configure and create a `OrmItemReader`
480/// with custom parameters like page size and database connection.
481/// It follows the builder pattern to ensure all required components are provided
482/// before creating the reader instance.
483///
484/// # Builder Pattern Benefits
485///
486/// - **Fluent API**: Method chaining for readable configuration
487/// - **Compile-time Safety**: Required fields are enforced at build time
488/// - **Flexibility**: Optional parameters can be omitted
489/// - **Validation**: Build method validates all required components are present
490///
491/// # Required Components
492///
493/// The following components must be set before calling `build()`:
494/// - **Connection**: Database connection for executing queries
495/// - **Query**: SeaORM select query to execute
496///
497/// # Optional Components
498///
499/// - **Page Size**: When set, enables pagination for memory-efficient reading
500///
501/// # Usage Pattern
502///
503/// ```text
504/// let reader = OrmItemReaderBuilder::new()
505/// .connection(&db) // Required
506/// .query(entity_query) // Required
507/// .page_size(100) // Optional
508/// .build(); // Creates the reader
509/// ```
510///
511/// # Examples
512///
513/// ```
514/// use spring_batch_rs::item::orm::OrmItemReaderBuilder;
515/// use sea_orm::{Database, EntityTrait, FromQueryResult};
516/// use serde::{Deserialize, Serialize};
517///
518/// #[derive(Debug, Clone, FromQueryResult, Deserialize, Serialize)]
519/// struct Order {
520/// id: i32,
521/// customer_name: String,
522/// total_amount: f64,
523/// status: String,
524/// }
525///
526/// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
527/// // Create database connection
528/// let db = Database::connect("postgresql://user:pass@localhost/db").await?;
529///
530/// // Create a query (assuming you have an orders entity)
531/// // let query = orders::Entity::find()
532/// // .filter(orders::Column::Status.eq("pending"));
533///
534/// // Create the reader with explicit type annotations
535/// // let reader: OrmItemReader<orders::Entity> = OrmItemReaderBuilder::new()
536/// // .connection(&db)
537/// // .query(query)
538/// // .page_size(100) // Process 100 orders at a time
539/// // .build();
540/// # Ok(())
541/// # }
542/// ```
543pub struct OrmItemReaderBuilder<'a, I>
544where
545 I: EntityTrait,
546{
547 /// Database connection - None until set by the user
548 /// This will be validated as required during build()
549 connection: Option<&'a DatabaseConnection>,
550
551 /// SeaORM select query - None until set by the user
552 /// This will be validated as required during build()
553 query: Option<Select<I>>,
554
555 /// Optional page size for pagination
556 /// When None, all data will be loaded at once
557 /// When Some(size), data will be loaded in chunks
558 page_size: Option<u64>,
559}
560
561impl<I> Default for OrmItemReaderBuilder<'_, I>
562where
563 I: EntityTrait,
564{
565 /// Creates a new builder with all fields set to None/default values.
566 ///
567 /// This is the starting point for the builder pattern. All required
568 /// fields must be set before calling `build()`.
569 fn default() -> Self {
570 Self {
571 connection: None,
572 query: None,
573 page_size: None,
574 }
575 }
576}
577
578impl<'a, I> OrmItemReaderBuilder<'a, I>
579where
580 I: EntityTrait,
581 I::Model: FromQueryResult + Send + Sync + Clone,
582{
583 /// Sets the page size for the reader.
584 ///
585 /// When set, the reader will use pagination to load data in chunks of the specified size.
586 /// This is useful for processing large datasets without loading everything into memory.
587 ///
588 /// # Memory Management
589 ///
590 /// - **Small page sizes** (e.g., 10-100): Lower memory usage, more database round trips
591 /// - **Large page sizes** (e.g., 1000-10000): Higher memory usage, fewer database round trips
592 /// - **No pagination** (None): All data loaded at once, highest memory usage
593 ///
594 /// # Performance Considerations
595 ///
596 /// - Choose page size based on available memory and network latency
597 /// - Consider the size of your data rows when setting page size
598 /// - Monitor memory usage and adjust as needed
599 ///
600 /// # Arguments
601 ///
602 /// * `page_size` - The number of items to read per page (must be > 0 for meaningful pagination)
603 ///
604 /// # Returns
605 ///
606 /// The updated `OrmItemReaderBuilder` instance for method chaining.
607 ///
608 /// # Examples
609 ///
610 /// ```
611 /// use spring_batch_rs::item::orm::OrmItemReaderBuilder;
612 /// use sea_orm::FromQueryResult;
613 /// use serde::{Deserialize, Serialize};
614 ///
615 /// // In practice, you would use actual SeaORM entities
616 /// // #[derive(Debug, Clone, FromQueryResult)]
617 /// // struct Record {
618 /// // id: i32,
619 /// // data: String,
620 /// // }
621 /// //
622 /// // let builder = OrmItemReaderBuilder::<record::Entity>::new()
623 /// // .page_size(50); // Process 50 records at a time
624 /// ```
625 pub fn page_size(mut self, page_size: u64) -> Self {
626 self.page_size = Some(page_size);
627 self
628 }
629
630 /// Sets the SeaORM select query for the reader.
631 ///
632 /// This query will be executed to fetch data from the database. The query can include
633 /// filters, joins, ordering, and other SeaORM query operations. The query will be
634 /// cloned for each page fetch, so it should be relatively lightweight to clone.
635 ///
636 /// # Query Design Considerations
637 ///
638 /// - **Ordering**: Include `ORDER BY` clauses for consistent pagination
639 /// - **Filtering**: Apply filters to reduce the dataset size
640 /// - **Joins**: Use joins to fetch related data in a single query
641 /// - **Indexing**: Ensure proper database indexes for query performance
642 ///
643 /// # Pagination Compatibility
644 ///
645 /// When using pagination, the query should:
646 /// - Have a deterministic order (use ORDER BY)
647 /// - Not use LIMIT/OFFSET (handled by the paginator)
648 /// - Be compatible with SeaORM's paginator
649 ///
650 /// # Arguments
651 ///
652 /// * `query` - The SeaORM select query to execute
653 ///
654 /// # Returns
655 ///
656 /// The updated `OrmItemReaderBuilder` instance for method chaining.
657 ///
658 /// # Examples
659 ///
660 /// ```
661 /// use spring_batch_rs::item::orm::OrmItemReaderBuilder;
662 /// use sea_orm::{EntityTrait, QueryFilter};
663 /// use serde::{Deserialize, Serialize};
664 ///
665 /// // In practice, you would use actual SeaORM entities
666 /// // let query = user::Entity::find()
667 /// // .filter(user::Column::Active.eq(true))
668 /// // .order_by_asc(user::Column::Id);
669 /// //
670 /// // let builder = OrmItemReaderBuilder::<user::Entity>::new()
671 /// // .query(query);
672 /// ```
673 pub fn query(mut self, query: Select<I>) -> Self {
674 self.query = Some(query);
675 self
676 }
677
678 /// Sets the database connection for the reader.
679 ///
680 /// This connection will be used to execute the SeaORM queries. The connection
681 /// must remain valid for the entire lifetime of the reader, which is enforced
682 /// by the lifetime parameter 'a.
683 ///
684 /// # Connection Management
685 ///
686 /// - The connection is borrowed, not owned by the reader
687 /// - Ensure the connection pool/manager outlives the reader
688 /// - The connection should be properly configured for your database
689 ///
690 /// # Database Compatibility
691 ///
692 /// SeaORM supports multiple databases:
693 /// - PostgreSQL
694 /// - MySQL
695 /// - SQLite
696 /// - SQL Server (via sqlx)
697 ///
698 /// # Arguments
699 ///
700 /// * `connection` - The SeaORM database connection (must outlive the reader)
701 ///
702 /// # Returns
703 ///
704 /// The updated `OrmItemReaderBuilder` instance for method chaining.
705 ///
706 /// # Examples
707 ///
708 /// ```
709 /// use spring_batch_rs::item::orm::OrmItemReaderBuilder;
710 /// use sea_orm::{Database, EntityTrait};
711 /// use serde::{Deserialize, Serialize};
712 ///
713 /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
714 /// let db = Database::connect("sqlite::memory:").await?;
715 ///
716 /// // In practice, you would use actual SeaORM entities
717 /// // let builder = OrmItemReaderBuilder::<product::Entity>::new()
718 /// // .connection(&db);
719 /// # Ok(())
720 /// # }
721 /// ```
722 pub fn connection(mut self, connection: &'a DatabaseConnection) -> Self {
723 self.connection = Some(connection);
724 self
725 }
726
727 /// Builds the ORM item reader with the configured parameters.
728 ///
729 /// This method validates that all required parameters have been set and creates
730 /// a new `OrmItemReader` instance.
731 ///
732 /// # Returns
733 /// A configured `OrmItemReader` instance
734 ///
735 /// # Panics
736 /// Panics if required parameters (connection and query) are missing.
737 ///
738 /// # Validation
739 ///
740 /// The builder performs the following validation:
741 /// - Ensures a database connection has been provided
742 /// - Ensures a SeaORM query has been provided
743 /// - Page size is optional and defaults to loading all data at once
744 ///
745 /// If any required parameter is missing, the method will panic with a descriptive error message.
746 ///
747 /// # Examples
748 ///
749 /// ```no_run
750 /// use spring_batch_rs::item::orm::OrmItemReaderBuilder;
751 /// use sea_orm::{Database, EntityTrait};
752 ///
753 /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
754 /// let db = Database::connect("sqlite::memory:").await?;
755 ///
756 /// // With pagination
757 /// // let reader = OrmItemReaderBuilder::new()
758 /// // .connection(&db)
759 /// // .query(MyEntity::find())
760 /// // .page_size(100)
761 /// // .build();
762 ///
763 /// // Without pagination (load all at once)
764 /// // let reader = OrmItemReaderBuilder::new()
765 /// // .connection(&db)
766 /// // .query(MyEntity::find())
767 /// // .build();
768 /// # Ok(())
769 /// # }
770 /// ```
771 pub fn build(self) -> OrmItemReader<'a, I> {
772 let mut reader = OrmItemReader::new()
773 .connection(self.connection.expect("Database connection is required"))
774 .query(self.query.expect("Query is required"));
775
776 if let Some(page_size) = self.page_size {
777 reader = reader.page_size(page_size);
778 }
779
780 reader
781 }
782
783 /// Creates a new `OrmItemReaderBuilder`.
784 ///
785 /// This is the entry point for the builder pattern. It creates a new builder
786 /// instance with all fields set to their default values (None for optional fields).
787 ///
788 /// # Builder Lifecycle
789 ///
790 /// 1. **Creation**: `new()` creates an empty builder
791 /// 2. **Configuration**: Use setter methods to configure the reader
792 /// 3. **Validation**: `build()` validates and creates the final reader
793 ///
794 /// # Examples
795 ///
796 /// ```
797 /// use spring_batch_rs::item::orm::OrmItemReaderBuilder;
798 /// use sea_orm::EntityTrait;
799 /// use serde::{Deserialize, Serialize};
800 ///
801 /// // In practice, you would use actual SeaORM entities
802 /// // let builder = OrmItemReaderBuilder::<record::Entity>::new()
803 /// // .page_size(50); // Process 50 records at a time
804 /// ```
805 pub fn new() -> Self {
806 Self::default()
807 }
808}
809
#[cfg(test)]
mod tests {
    use super::*;
    use crate::core::item::ItemReader;
    use sea_orm::{DatabaseBackend, MockDatabase, entity::prelude::*};

    // Minimal entity definition for testing
    #[derive(Clone, Debug, PartialEq, Eq, DeriveEntityModel)]
    #[sea_orm(table_name = "record")]
    pub struct Model {
        #[sea_orm(primary_key)]
        pub id: i32,
        pub name: String,
    }

    #[derive(Copy, Clone, Debug, EnumIter, DeriveRelation)]
    pub enum Relation {}

    impl ActiveModelBehavior for ActiveModel {}

    /// Shorthand for building a fixture row.
    fn record(id: i32, name: &str) -> Model {
        Model {
            id,
            name: name.to_string(),
        }
    }

    /// Mock SQLite connection that answers successive queries with `pages`, in order.
    fn mock_connection(pages: Vec<Vec<Model>>) -> DatabaseConnection {
        MockDatabase::new(DatabaseBackend::Sqlite)
            .append_query_results(pages)
            .into_connection()
    }

    // --- OrmItemReader unit tests ---

    #[test]
    fn should_create_reader_with_default_state() {
        let reader = OrmItemReader::<Entity>::new();

        assert!(
            reader.connection.is_none(),
            "connection should start as None"
        );
        assert_eq!(reader.page_size, None);
        assert_eq!(reader.offset.get(), 0);
        assert_eq!(reader.current_page.get(), 0);
        assert!(reader.buffer.borrow().is_empty());
    }

    #[test]
    fn should_set_page_size_via_method() {
        let reader = OrmItemReader::<Entity>::new().page_size(50);

        assert_eq!(reader.page_size, Some(50));
        // buffer capacity should be pre-allocated
        assert_eq!(reader.buffer.borrow().capacity(), 50);
    }

    #[tokio::test(flavor = "multi_thread")]
    async fn should_return_none_when_database_is_empty() {
        let db = mock_connection(vec![Vec::new()]);
        let reader = OrmItemReader::<Entity>::new()
            .connection(&db)
            .query(Entity::find());

        let outcome = reader.read().unwrap();

        assert!(outcome.is_none(), "empty DB should yield None");
    }

    #[tokio::test(flavor = "multi_thread")]
    async fn should_read_single_item_then_return_none() {
        let db = mock_connection(vec![vec![record(1, "Alice")]]);
        let reader = OrmItemReader::<Entity>::new()
            .connection(&db)
            .query(Entity::find());

        let only = reader.read().unwrap().expect("first item should exist");
        assert_eq!(only.name, "Alice");
        assert_eq!(reader.offset.get(), 1);

        let after = reader.read().unwrap();
        assert!(after.is_none(), "should return None after the only item");
    }

    #[tokio::test(flavor = "multi_thread")]
    async fn should_read_multiple_items_without_pagination() {
        let db = mock_connection(vec![vec![record(1, "Alice"), record(2, "Bob")]]);
        let reader = OrmItemReader::<Entity>::new()
            .connection(&db)
            .query(Entity::find());

        let alice = reader.read().unwrap().unwrap();
        assert_eq!(alice.name, "Alice");

        let bob = reader.read().unwrap().unwrap();
        assert_eq!(bob.name, "Bob");

        assert!(reader.read().unwrap().is_none());
    }

    #[tokio::test(flavor = "multi_thread")]
    async fn should_paginate_across_multiple_pages() {
        // page_size=1: two DB calls, each returning one item, then empty
        let db = mock_connection(vec![
            vec![record(1, "P1")],
            vec![record(2, "P2")],
            Vec::new(), // signals end of data
        ]);
        let reader = OrmItemReader::<Entity>::new()
            .connection(&db)
            .query(Entity::find())
            .page_size(1);

        let page_one = reader.read().unwrap().unwrap();
        assert_eq!(page_one.name, "P1");
        assert_eq!(
            reader.current_page.get(),
            1,
            "page should advance after full page"
        );

        let page_two = reader.read().unwrap().unwrap();
        assert_eq!(page_two.name, "P2");

        assert!(
            reader.read().unwrap().is_none(),
            "should stop after all pages"
        );
    }

    // --- OrmItemReaderBuilder unit tests ---

    #[test]
    fn should_create_builder_with_default_state() {
        let builder = OrmItemReaderBuilder::<Entity>::new();

        assert!(builder.connection.is_none());
        assert!(builder.query.is_none());
        assert_eq!(builder.page_size, None);
    }

    #[test]
    fn should_set_page_size_on_builder() {
        let builder = OrmItemReaderBuilder::<Entity>::new().page_size(100);

        assert_eq!(builder.page_size, Some(100));
    }

    #[test]
    #[should_panic(expected = "Database connection is required")]
    fn should_panic_when_building_without_connection() {
        OrmItemReaderBuilder::<Entity>::new()
            .query(Entity::find())
            .build();
    }

    #[test]
    #[should_panic(expected = "Query is required")]
    fn should_panic_when_building_without_query() {
        let db = mock_connection(Vec::new());

        OrmItemReaderBuilder::<Entity>::new()
            .connection(&db)
            .build();
    }

    #[test]
    fn should_build_reader_with_connection_and_query() {
        let db = mock_connection(Vec::new());

        let reader = OrmItemReaderBuilder::<Entity>::new()
            .connection(&db)
            .query(Entity::find())
            .build();

        assert!(reader.connection.is_some());
        assert_eq!(reader.page_size, None);
    }

    #[test]
    fn should_build_reader_with_page_size() {
        let db = mock_connection(Vec::new());

        let reader = OrmItemReaderBuilder::<Entity>::new()
            .connection(&db)
            .query(Entity::find())
            .page_size(50)
            .build();

        assert_eq!(reader.page_size, Some(50));
    }
}
1007}