spring_batch_rs/item/orm/orm_reader.rs
1use std::cell::{Cell, RefCell};
2
3use sea_orm::{DatabaseConnection, DbErr, EntityTrait, FromQueryResult, PaginatorTrait, Select};
4
5use crate::{
6 core::item::{ItemReader, ItemReaderResult},
7 BatchError,
8};
9
10/// A reader for reading entities from a database using SeaORM.
11///
12/// This reader provides an implementation of the `ItemReader` trait for SeaORM-based
13/// database operations. It supports reading entities from any SeaORM-compatible database
14/// with optional pagination for efficient memory usage when dealing with large datasets.
15///
16/// # Generic Parameters
17///
18/// - `I`: The SeaORM entity type that implements `EntityTrait`
19///
20/// # Design Philosophy
21///
22/// This reader follows SeaORM's conventions and leverages its built-in pagination
23/// mechanisms for optimal performance. It provides a bridge between SeaORM's async
24/// API and the batch framework's synchronous `ItemReader` trait.
25///
26/// # Memory Management
27///
28/// The reader uses an internal buffer to store entity models from the current page,
29/// which helps balance memory usage with query performance. When pagination is enabled,
30/// only one page of data is kept in memory at any time.
31///
32/// # Query Flexibility
33///
34/// Accepts any SeaORM `Select` query, allowing for complex filtering, joins, and
35/// ordering. The query is executed as-is, with pagination parameters added when configured.
36///
37/// # State Management
38///
39/// - `offset`: Tracks the absolute position across all pages
40/// - `current_page`: Tracks which page is currently loaded in the buffer
41/// - `buffer`: Holds the current page's entity models
42pub struct OrmItemReader<'a, I>
43where
44 I: EntityTrait,
45{
46 /// Database connection reference - borrowed for the lifetime of the reader
47 /// This ensures the connection remains valid throughout the reader's lifecycle
48 connection: Option<&'a DatabaseConnection>,
49
50 /// SeaORM select query to execute
51 /// This query is cloned for each page fetch to maintain immutability
52 query: Option<Select<I>>,
53
54 /// Optional page size for pagination
55 /// When Some(size), data is loaded in chunks of this size
56 /// When None, all data is loaded at once
57 page_size: Option<u64>,
58
59 /// Current offset for tracking position across all pages
60 /// Uses Cell for interior mutability in single-threaded context
61 /// This tracks the absolute position, not just within the current page
62 offset: Cell<u64>,
63
64 /// Internal buffer for storing fetched items from the current page
65 /// Uses RefCell for interior mutability to allow borrowing during reads
66 /// The buffer is cleared and refilled when a new page is loaded
67 buffer: RefCell<Vec<I::Model>>,
68
69 /// Current page number (used with pagination)
70 /// Starts at 0 and increments when moving to the next page
71 /// Only relevant when page_size is Some(_)
72 current_page: Cell<u64>,
73}
74
75impl<'a, I> Default for OrmItemReader<'a, I>
76where
77 I: EntityTrait,
78 I::Model: FromQueryResult + Send + Sync + Clone,
79{
80 fn default() -> Self {
81 Self::new()
82 }
83}
84
85impl<'a, I> OrmItemReader<'a, I>
86where
87 I: EntityTrait,
88 I::Model: FromQueryResult + Send + Sync + Clone,
89{
90 /// Creates a new `OrmItemReader` with default configuration.
91 ///
92 /// All parameters must be set using the builder methods before use.
93 /// Use the builder pattern for a more convenient API.
94 ///
95 /// # Returns
96 ///
97 /// A new `OrmItemReader` instance with default settings.
98 ///
99 /// # Examples
100 ///
101 /// ```no_run
102 /// use spring_batch_rs::item::orm::OrmItemReader;
103 /// use sea_orm::{Database, EntityTrait};
104 ///
105 /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
106 /// let db = Database::connect("sqlite::memory:").await?;
107 ///
108 /// // Using direct instantiation (less recommended)
109 /// // let reader = OrmItemReader::<MyEntity>::new()
110 /// // .connection(&db)
111 /// // .query(MyEntity::find())
112 /// // .page_size(100);
113 ///
114 /// // Using builder (recommended)
115 /// // let reader = OrmItemReaderBuilder::new()
116 /// // .connection(&db)
117 /// // .query(MyEntity::find())
118 /// // .page_size(100)
119 /// // .build();
120 /// # Ok(())
121 /// # }
122 /// ```
123 pub fn new() -> Self {
124 Self {
125 connection: None,
126 query: None,
127 page_size: None,
128 offset: Cell::new(0),
129 buffer: RefCell::new(Vec::new()),
130 current_page: Cell::new(0),
131 }
132 }
133
134 /// Sets the database connection for the reader.
135 ///
136 /// This is a required parameter that must be set before using the reader.
137 ///
138 /// # Arguments
139 ///
140 /// * `connection` - Reference to the SeaORM database connection
141 ///
142 /// # Returns
143 ///
144 /// The updated `OrmItemReader` instance.
145 ///
146 /// # Examples
147 ///
148 /// ```compile_fail
149 /// use spring_batch_rs::item::orm::OrmItemReader;
150 /// use sea_orm::Database;
151 ///
152 /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
153 /// let db = Database::connect("sqlite::memory:").await?;
154 /// let reader = OrmItemReader::<String>::new()
155 /// .connection(&db);
156 /// # Ok(())
157 /// # }
158 /// ```
159 pub fn connection(mut self, connection: &'a DatabaseConnection) -> Self {
160 self.connection = Some(connection);
161 self
162 }
163
164 /// Sets the SeaORM select query for the reader.
165 ///
166 /// This is a required parameter that must be set before using the reader.
167 /// The query can include filtering, ordering, joins, and other SeaORM operations.
168 ///
169 /// # Arguments
170 ///
171 /// * `query` - The SeaORM select query to execute
172 ///
173 /// # Returns
174 ///
175 /// The updated `OrmItemReader` instance.
176 ///
177 /// # Examples
178 ///
179 /// ```no_run
180 /// use spring_batch_rs::item::orm::OrmItemReader;
181 /// use sea_orm::EntityTrait;
182 ///
183 /// // Basic query
184 /// // let reader = OrmItemReader::<MyEntity>::new()
185 /// // .query(MyEntity::find());
186 ///
187 /// // Filtered query
188 /// // let reader = OrmItemReader::<MyEntity>::new()
189 /// // .query(MyEntity::find().filter(my_entity::Column::Active.eq(true)));
190 ///
191 /// // Ordered query
192 /// // let reader = OrmItemReader::<MyEntity>::new()
193 /// // .query(MyEntity::find().order_by_asc(my_entity::Column::Id));
194 /// ```
195 pub fn query(mut self, query: Select<I>) -> Self {
196 self.query = Some(query);
197 // Pre-allocate buffer capacity based on page size if available
198 if let Some(page_size) = self.page_size {
199 self.buffer = RefCell::new(Vec::with_capacity(page_size as usize));
200 }
201 self
202 }
203
204 /// Sets the page size for the reader.
205 ///
206 /// When set, the reader will use pagination to limit memory usage.
207 /// When not set, all data will be loaded at once.
208 ///
209 /// # Arguments
210 ///
211 /// * `page_size` - The number of items to read per page
212 ///
213 /// # Returns
214 ///
215 /// The updated `OrmItemReader` instance.
216 ///
217 /// # Performance Considerations
218 ///
219 /// - Larger page sizes reduce the number of database round trips but use more memory
220 /// - Smaller page sizes use less memory but may result in more database queries
221 /// - Consider your dataset size and available memory when choosing a page size
222 /// - Recommended range: 50-1000 items per page for most use cases
223 ///
224 /// # Examples
225 ///
226 /// ```compile_fail
227 /// use spring_batch_rs::item::orm::OrmItemReader;
228 ///
229 /// // Small page size for memory-constrained environments
230 /// let reader = OrmItemReader::<String>::new()
231 /// .page_size(50);
232 ///
233 /// // Larger page size for better performance with ample memory
234 /// let reader = OrmItemReader::<String>::new()
235 /// .page_size(1000);
236 /// ```
237 pub fn page_size(mut self, page_size: u64) -> Self {
238 self.page_size = Some(page_size);
239 // Update buffer capacity if available
240 self.buffer = RefCell::new(Vec::with_capacity(page_size as usize));
241 self
242 }
243
244 /// Reads a page of items from the database using SeaORM.
245 ///
246 /// This method executes the SeaORM query with pagination parameters (if page_size is set),
247 /// and fills the internal buffer with the results. It uses SeaORM's async methods
248 /// within a blocking context to maintain compatibility with the synchronous ItemReader trait.
249 ///
250 /// # Pagination Logic
251 ///
252 /// - **With pagination**: Uses SeaORM's `paginate()` method to fetch a specific page
253 /// - **Without pagination**: Uses SeaORM's `all()` method to fetch all results
254 ///
255 /// # Buffer Management
256 ///
257 /// - Clears the existing buffer before loading new data
258 /// - Stores all entity models directly in the buffer for sequential access
259 ///
260 /// # Error Handling
261 ///
262 /// Returns a `DbErr` if the database query fails. This error will be converted
263 /// to a `BatchError` by the calling method.
264 async fn read_page_async(&self) -> Result<(), DbErr> {
265 let results = if let Some(page_size) = self.page_size {
266 // Use SeaORM's paginator for efficient pagination
267 // This creates a paginator that can fetch specific pages
268 let paginator = self
269 .query
270 .as_ref()
271 .unwrap()
272 .clone()
273 .paginate(self.connection.unwrap(), page_size);
274 let current_page = self.current_page.get();
275
276 // Fetch the specific page we're currently on
277 // Pages are 0-indexed in SeaORM's paginator
278 paginator.fetch_page(current_page).await?
279 } else {
280 // Load all results at once - suitable for smaller datasets
281 // This executes the query and returns all matching rows
282 self.query
283 .as_ref()
284 .unwrap()
285 .clone()
286 .all(self.connection.unwrap())
287 .await?
288 };
289
290 // Clear the buffer and fill it with entity models
291 let mut buffer = self.buffer.borrow_mut();
292 buffer.clear(); // Remove any existing items from the previous page
293
294 // Store entity models directly in the buffer
295 buffer.extend(results);
296
297 Ok(())
298 }
299
300 /// Reads a page of items from the database.
301 ///
302 /// This is a synchronous wrapper around the async `read_page_async` method.
303 /// It uses tokio's `block_in_place` to run the async operation in a blocking context,
304 /// which is necessary because the `ItemReader` trait is synchronous.
305 ///
306 /// # Async-to-Sync Bridge
307 ///
308 /// SeaORM is inherently async, but the batch framework uses synchronous traits
309 /// for simplicity. This method bridges that gap by:
310 /// 1. Using `block_in_place` to avoid blocking the tokio runtime
311 /// 2. Getting the current runtime handle to execute the async operation
312 /// 3. Converting SeaORM's `DbErr` to the batch framework's `BatchError`
313 ///
314 /// # Error Conversion
315 ///
316 /// Converts SeaORM's `DbErr` to `BatchError::ItemReader` with context information
317 /// to help with debugging database-related issues.
318 fn read_page(&self) -> Result<(), BatchError> {
319 tokio::task::block_in_place(|| {
320 tokio::runtime::Handle::current().block_on(async {
321 self.read_page_async()
322 .await
323 .map_err(|e| BatchError::ItemReader(format!("SeaORM query failed: {}", e)))
324 })
325 })
326 }
327}
328
329/// Implementation of ItemReader trait for OrmItemReader.
330///
331/// This implementation provides a way to read items from a database using SeaORM
332/// with support for pagination. It uses an internal buffer to store the results
333/// of database queries and keeps track of the current offset to determine when
334/// a new page of data needs to be fetched.
335///
336/// # Reading Strategy
337///
338/// The reader implements a lazy-loading strategy:
339/// 1. **First read**: Loads the first page of data
340/// 2. **Subsequent reads**: Returns items from the buffer
341/// 3. **Page boundary**: When the buffer is exhausted, loads the next page
342/// 4. **End of data**: Returns None when no more data is available
343///
344/// # State Management
345///
346/// - `offset`: Tracks the absolute position across all pages
347/// - `current_page`: Tracks which page we're currently reading from
348/// - `buffer`: Holds the current page's data
349///
350/// The reader maintains these invariants:
351/// - `offset` always represents the next item to be read
352/// - `current_page` represents the page currently loaded in the buffer
353/// - The buffer contains items for the current page only
354impl<I> ItemReader<I::Model> for OrmItemReader<'_, I>
355where
356 I: EntityTrait,
357 I::Model: FromQueryResult + Send + Sync + Clone,
358{
359 /// Reads the next item from the reader.
360 ///
361 /// This method manages pagination internally and provides a simple interface
362 /// for sequential reading. The complexity of pagination is hidden from the caller.
363 ///
364 /// # Reading Algorithm
365 ///
366 /// 1. **Calculate buffer index**: Determine where we are within the current page
367 /// 2. **Check if new page needed**: If at the start of a page, load new data
368 /// 3. **Fetch from buffer**: Get the item at the current index
369 /// 4. **Update counters**: Increment offset and page number as needed
370 /// 5. **Return result**: Clone the item or return None if exhausted
371 ///
372 /// # Pagination Logic
373 ///
374 /// - **With pagination**: `index = offset % page_size`
375 /// - **Without pagination**: `index = offset` (all data in one "page")
376 /// - **New page trigger**: When `index == 0`, we need to load a new page
377 /// - **Page advancement**: When we reach the end of a page, increment `current_page`
378 ///
379 /// # Memory Management
380 ///
381 /// Items are cloned when returned to ensure the caller owns the data.
382 /// This prevents borrowing issues and allows the buffer to be modified
383 /// for the next page load.
384 ///
385 /// # Returns
386 ///
387 /// - `Ok(Some(item))` if an item was successfully read
388 /// - `Ok(None)` if there are no more items to read
389 /// - `Err(BatchError)` if an error occurred during reading (e.g., database error)
390 ///
391 /// # Examples
392 ///
393 /// ```
394 /// use spring_batch_rs::item::orm::OrmItemReaderBuilder;
395 /// use spring_batch_rs::core::item::ItemReader;
396 /// use sea_orm::FromQueryResult;
397 /// use serde::{Deserialize, Serialize};
398 ///
399 /// #[derive(Debug, Clone, FromQueryResult, Deserialize, Serialize)]
400 /// struct Product {
401 /// id: i32,
402 /// name: String,
403 /// price: f64,
404 /// }
405 ///
406 /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
407 /// // Assuming you have a database connection and query set up
408 /// // let reader = OrmItemReaderBuilder::new()
409 /// // .connection(&db)
410 /// // .query(products::Entity::find())
411 /// // .page_size(50)
412 /// // .build();
413 ///
414 /// // Read all products
415 /// // let mut products = Vec::new();
416 /// // while let Some(product) = reader.read()? {
417 /// // products.push(product);
418 /// // }
419 /// # Ok(())
420 /// # }
421 /// ```
422 fn read(&self) -> ItemReaderResult<I::Model> {
423 // Calculate the index within the current page
424 // This determines where we are in the current buffer
425 let index = if let Some(page_size) = self.page_size {
426 // With pagination: index is position within the current page
427 self.offset.get() % page_size
428 } else {
429 // Without pagination: index is the absolute position
430 self.offset.get()
431 };
432
433 // When index is 0, we've reached the start of a new page
434 // or this is the first read operation, so we need to fetch data
435 if index == 0 {
436 self.read_page()?
437 }
438
439 // Retrieve the item at the current index from the buffer
440 let buffer = self.buffer.borrow();
441 let result = buffer.get(index as usize);
442
443 match result {
444 Some(item) => {
445 // We found an item at the current position
446
447 // Increment the offset for the next read operation
448 // This moves us to the next item in the sequence
449 self.offset.set(self.offset.get() + 1);
450
451 // If we're using pagination and have reached the end of the current page,
452 // increment the page number for the next page load
453 if let Some(page_size) = self.page_size {
454 if self.offset.get().is_multiple_of(page_size) {
455 // We've read all items in the current page
456 // Move to the next page for the next read cycle
457 self.current_page.set(self.current_page.get() + 1);
458 }
459 }
460
461 // Clone the item to give ownership to the caller
462 // This prevents borrowing conflicts with the buffer
463 Ok(Some(item.clone()))
464 }
465 None => {
466 // No more items in the current buffer
467 // This means we've reached the end of the data
468
469 // If we're using pagination, this might mean we've reached the end of all data
470 // If not using pagination, this definitely means we're done
471 Ok(None)
472 }
473 }
474 }
475}
476
477/// Builder for creating a `OrmItemReader`.
478///
479/// This builder provides a convenient way to configure and create a `OrmItemReader`
480/// with custom parameters like page size and database connection.
481/// It follows the builder pattern to ensure all required components are provided
482/// before creating the reader instance.
483///
484/// # Builder Pattern Benefits
485///
486/// - **Fluent API**: Method chaining for readable configuration
487/// - **Compile-time Safety**: Required fields are enforced at build time
488/// - **Flexibility**: Optional parameters can be omitted
489/// - **Validation**: Build method validates all required components are present
490///
491/// # Required Components
492///
493/// The following components must be set before calling `build()`:
494/// - **Connection**: Database connection for executing queries
495/// - **Query**: SeaORM select query to execute
496///
497/// # Optional Components
498///
499/// - **Page Size**: When set, enables pagination for memory-efficient reading
500///
501/// # Usage Pattern
502///
503/// ```text
504/// let reader = OrmItemReaderBuilder::new()
505/// .connection(&db) // Required
506/// .query(entity_query) // Required
507/// .page_size(100) // Optional
508/// .build(); // Creates the reader
509/// ```
510///
511/// # Examples
512///
513/// ```
514/// use spring_batch_rs::item::orm::OrmItemReaderBuilder;
515/// use sea_orm::{Database, EntityTrait, FromQueryResult};
516/// use serde::{Deserialize, Serialize};
517///
518/// #[derive(Debug, Clone, FromQueryResult, Deserialize, Serialize)]
519/// struct Order {
520/// id: i32,
521/// customer_name: String,
522/// total_amount: f64,
523/// status: String,
524/// }
525///
526/// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
527/// // Create database connection
528/// let db = Database::connect("postgresql://user:pass@localhost/db").await?;
529///
530/// // Create a query (assuming you have an orders entity)
531/// // let query = orders::Entity::find()
532/// // .filter(orders::Column::Status.eq("pending"));
533///
534/// // Create the reader with explicit type annotations
535/// // let reader: OrmItemReader<orders::Entity> = OrmItemReaderBuilder::new()
536/// // .connection(&db)
537/// // .query(query)
538/// // .page_size(100) // Process 100 orders at a time
539/// // .build();
540/// # Ok(())
541/// # }
542/// ```
543pub struct OrmItemReaderBuilder<'a, I>
544where
545 I: EntityTrait,
546{
547 /// Database connection - None until set by the user
548 /// This will be validated as required during build()
549 connection: Option<&'a DatabaseConnection>,
550
551 /// SeaORM select query - None until set by the user
552 /// This will be validated as required during build()
553 query: Option<Select<I>>,
554
555 /// Optional page size for pagination
556 /// When None, all data will be loaded at once
557 /// When Some(size), data will be loaded in chunks
558 page_size: Option<u64>,
559}
560
561impl<I> Default for OrmItemReaderBuilder<'_, I>
562where
563 I: EntityTrait,
564{
565 /// Creates a new builder with all fields set to None/default values.
566 ///
567 /// This is the starting point for the builder pattern. All required
568 /// fields must be set before calling `build()`.
569 fn default() -> Self {
570 Self {
571 connection: None,
572 query: None,
573 page_size: None,
574 }
575 }
576}
577
578impl<'a, I> OrmItemReaderBuilder<'a, I>
579where
580 I: EntityTrait,
581 I::Model: FromQueryResult + Send + Sync + Clone,
582{
583 /// Sets the page size for the reader.
584 ///
585 /// When set, the reader will use pagination to load data in chunks of the specified size.
586 /// This is useful for processing large datasets without loading everything into memory.
587 ///
588 /// # Memory Management
589 ///
590 /// - **Small page sizes** (e.g., 10-100): Lower memory usage, more database round trips
591 /// - **Large page sizes** (e.g., 1000-10000): Higher memory usage, fewer database round trips
592 /// - **No pagination** (None): All data loaded at once, highest memory usage
593 ///
594 /// # Performance Considerations
595 ///
596 /// - Choose page size based on available memory and network latency
597 /// - Consider the size of your data rows when setting page size
598 /// - Monitor memory usage and adjust as needed
599 ///
600 /// # Arguments
601 ///
602 /// * `page_size` - The number of items to read per page (must be > 0 for meaningful pagination)
603 ///
604 /// # Returns
605 ///
606 /// The updated `OrmItemReaderBuilder` instance for method chaining.
607 ///
608 /// # Examples
609 ///
610 /// ```
611 /// use spring_batch_rs::item::orm::OrmItemReaderBuilder;
612 /// use sea_orm::FromQueryResult;
613 /// use serde::{Deserialize, Serialize};
614 ///
615 /// // In practice, you would use actual SeaORM entities
616 /// // #[derive(Debug, Clone, FromQueryResult)]
617 /// // struct Record {
618 /// // id: i32,
619 /// // data: String,
620 /// // }
621 /// //
622 /// // let builder = OrmItemReaderBuilder::<record::Entity>::new()
623 /// // .page_size(50); // Process 50 records at a time
624 /// ```
625 pub fn page_size(mut self, page_size: u64) -> Self {
626 self.page_size = Some(page_size);
627 self
628 }
629
630 /// Sets the SeaORM select query for the reader.
631 ///
632 /// This query will be executed to fetch data from the database. The query can include
633 /// filters, joins, ordering, and other SeaORM query operations. The query will be
634 /// cloned for each page fetch, so it should be relatively lightweight to clone.
635 ///
636 /// # Query Design Considerations
637 ///
638 /// - **Ordering**: Include `ORDER BY` clauses for consistent pagination
639 /// - **Filtering**: Apply filters to reduce the dataset size
640 /// - **Joins**: Use joins to fetch related data in a single query
641 /// - **Indexing**: Ensure proper database indexes for query performance
642 ///
643 /// # Pagination Compatibility
644 ///
645 /// When using pagination, the query should:
646 /// - Have a deterministic order (use ORDER BY)
647 /// - Not use LIMIT/OFFSET (handled by the paginator)
648 /// - Be compatible with SeaORM's paginator
649 ///
650 /// # Arguments
651 ///
652 /// * `query` - The SeaORM select query to execute
653 ///
654 /// # Returns
655 ///
656 /// The updated `OrmItemReaderBuilder` instance for method chaining.
657 ///
658 /// # Examples
659 ///
660 /// ```
661 /// use spring_batch_rs::item::orm::OrmItemReaderBuilder;
662 /// use sea_orm::{EntityTrait, QueryFilter};
663 /// use serde::{Deserialize, Serialize};
664 ///
665 /// // In practice, you would use actual SeaORM entities
666 /// // let query = user::Entity::find()
667 /// // .filter(user::Column::Active.eq(true))
668 /// // .order_by_asc(user::Column::Id);
669 /// //
670 /// // let builder = OrmItemReaderBuilder::<user::Entity>::new()
671 /// // .query(query);
672 /// ```
673 pub fn query(mut self, query: Select<I>) -> Self {
674 self.query = Some(query);
675 self
676 }
677
678 /// Sets the database connection for the reader.
679 ///
680 /// This connection will be used to execute the SeaORM queries. The connection
681 /// must remain valid for the entire lifetime of the reader, which is enforced
682 /// by the lifetime parameter 'a.
683 ///
684 /// # Connection Management
685 ///
686 /// - The connection is borrowed, not owned by the reader
687 /// - Ensure the connection pool/manager outlives the reader
688 /// - The connection should be properly configured for your database
689 ///
690 /// # Database Compatibility
691 ///
692 /// SeaORM supports multiple databases:
693 /// - PostgreSQL
694 /// - MySQL
695 /// - SQLite
696 /// - SQL Server (via sqlx)
697 ///
698 /// # Arguments
699 ///
700 /// * `connection` - The SeaORM database connection (must outlive the reader)
701 ///
702 /// # Returns
703 ///
704 /// The updated `OrmItemReaderBuilder` instance for method chaining.
705 ///
706 /// # Examples
707 ///
708 /// ```
709 /// use spring_batch_rs::item::orm::OrmItemReaderBuilder;
710 /// use sea_orm::{Database, EntityTrait};
711 /// use serde::{Deserialize, Serialize};
712 ///
713 /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
714 /// let db = Database::connect("sqlite::memory:").await?;
715 ///
716 /// // In practice, you would use actual SeaORM entities
717 /// // let builder = OrmItemReaderBuilder::<product::Entity>::new()
718 /// // .connection(&db);
719 /// # Ok(())
720 /// # }
721 /// ```
722 pub fn connection(mut self, connection: &'a DatabaseConnection) -> Self {
723 self.connection = Some(connection);
724 self
725 }
726
727 /// Builds the ORM item reader with the configured parameters.
728 ///
729 /// This method validates that all required parameters have been set and creates
730 /// a new `OrmItemReader` instance.
731 ///
732 /// # Returns
733 /// A configured `OrmItemReader` instance
734 ///
735 /// # Panics
736 /// Panics if required parameters (connection and query) are missing.
737 ///
738 /// # Validation
739 ///
740 /// The builder performs the following validation:
741 /// - Ensures a database connection has been provided
742 /// - Ensures a SeaORM query has been provided
743 /// - Page size is optional and defaults to loading all data at once
744 ///
745 /// If any required parameter is missing, the method will panic with a descriptive error message.
746 ///
747 /// # Examples
748 ///
749 /// ```no_run
750 /// use spring_batch_rs::item::orm::OrmItemReaderBuilder;
751 /// use sea_orm::{Database, EntityTrait};
752 ///
753 /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
754 /// let db = Database::connect("sqlite::memory:").await?;
755 ///
756 /// // With pagination
757 /// // let reader = OrmItemReaderBuilder::new()
758 /// // .connection(&db)
759 /// // .query(MyEntity::find())
760 /// // .page_size(100)
761 /// // .build();
762 ///
763 /// // Without pagination (load all at once)
764 /// // let reader = OrmItemReaderBuilder::new()
765 /// // .connection(&db)
766 /// // .query(MyEntity::find())
767 /// // .build();
768 /// # Ok(())
769 /// # }
770 /// ```
771 pub fn build(self) -> OrmItemReader<'a, I> {
772 let mut reader = OrmItemReader::new()
773 .connection(self.connection.expect("Database connection is required"))
774 .query(self.query.expect("Query is required"));
775
776 if let Some(page_size) = self.page_size {
777 reader = reader.page_size(page_size);
778 }
779
780 reader
781 }
782
783 /// Creates a new `OrmItemReaderBuilder`.
784 ///
785 /// This is the entry point for the builder pattern. It creates a new builder
786 /// instance with all fields set to their default values (None for optional fields).
787 ///
788 /// # Builder Lifecycle
789 ///
790 /// 1. **Creation**: `new()` creates an empty builder
791 /// 2. **Configuration**: Use setter methods to configure the reader
792 /// 3. **Validation**: `build()` validates and creates the final reader
793 ///
794 /// # Examples
795 ///
796 /// ```
797 /// use spring_batch_rs::item::orm::OrmItemReaderBuilder;
798 /// use sea_orm::EntityTrait;
799 /// use serde::{Deserialize, Serialize};
800 ///
801 /// // In practice, you would use actual SeaORM entities
802 /// // let builder = OrmItemReaderBuilder::<record::Entity>::new()
803 /// // .page_size(50); // Process 50 records at a time
804 /// ```
805 pub fn new() -> Self {
806 Self::default()
807 }
808}
809
#[cfg(test)]
mod tests {
    use super::*;
    use crate::core::item::ItemReader;
    use sea_orm::{entity::prelude::*, DatabaseBackend, MockDatabase};

    // Minimal entity definition for testing
    #[derive(Clone, Debug, PartialEq, Eq, DeriveEntityModel)]
    #[sea_orm(table_name = "record")]
    pub struct Model {
        #[sea_orm(primary_key)]
        pub id: i32,
        pub name: String,
    }

    #[derive(Copy, Clone, Debug, EnumIter, DeriveRelation)]
    pub enum Relation {}

    impl ActiveModelBehavior for ActiveModel {}

    /// Builds a `Model` fixture with the given id and name.
    fn record(id: i32, name: &str) -> Model {
        Model {
            id,
            name: name.to_string(),
        }
    }

    // --- OrmItemReader unit tests ---

    #[test]
    fn should_create_reader_with_default_state() {
        let reader = OrmItemReader::<Entity>::new();
        assert!(
            reader.connection.is_none(),
            "connection should start as None"
        );
        assert_eq!(reader.page_size, None);
        assert_eq!(reader.offset.get(), 0);
        assert_eq!(reader.current_page.get(), 0);
        assert!(reader.buffer.borrow().is_empty());
    }

    #[test]
    fn should_set_page_size_via_method() {
        let reader = OrmItemReader::<Entity>::new().page_size(50);
        assert_eq!(reader.page_size, Some(50));
        // The buffer should be pre-allocated; `Vec::with_capacity` only
        // guarantees *at least* the requested capacity.
        assert!(reader.buffer.borrow().capacity() >= 50);
    }

    #[tokio::test(flavor = "multi_thread")]
    async fn should_return_none_when_database_is_empty() {
        let db = MockDatabase::new(DatabaseBackend::Sqlite)
            .append_query_results([Vec::<Model>::new()])
            .into_connection();

        let reader = OrmItemReader::<Entity>::new()
            .connection(&db)
            .query(Entity::find());

        let result = reader.read().unwrap();
        assert!(result.is_none(), "empty DB should yield None");
    }

    #[tokio::test(flavor = "multi_thread")]
    async fn should_read_single_item_then_return_none() {
        let db = MockDatabase::new(DatabaseBackend::Sqlite)
            .append_query_results([vec![record(1, "Alice")]])
            .into_connection();

        let reader = OrmItemReader::<Entity>::new()
            .connection(&db)
            .query(Entity::find());

        let first = reader.read().unwrap().expect("first item should exist");
        assert_eq!(first.name, "Alice");
        assert_eq!(reader.offset.get(), 1);

        let second = reader.read().unwrap();
        assert!(second.is_none(), "should return None after the only item");
    }

    #[tokio::test(flavor = "multi_thread")]
    async fn should_read_multiple_items_without_pagination() {
        let db = MockDatabase::new(DatabaseBackend::Sqlite)
            .append_query_results([vec![record(1, "Alice"), record(2, "Bob")]])
            .into_connection();

        let reader = OrmItemReader::<Entity>::new()
            .connection(&db)
            .query(Entity::find());

        let a = reader.read().unwrap().unwrap();
        assert_eq!(a.name, "Alice");
        let b = reader.read().unwrap().unwrap();
        assert_eq!(b.name, "Bob");
        assert!(reader.read().unwrap().is_none());
    }

    #[tokio::test(flavor = "multi_thread")]
    async fn should_paginate_across_multiple_pages() {
        // page_size=1: two DB calls, each returning one item, then empty
        let db = MockDatabase::new(DatabaseBackend::Sqlite)
            .append_query_results([
                vec![record(1, "P1")],
                vec![record(2, "P2")],
                vec![], // signals end of data
            ])
            .into_connection();

        let reader = OrmItemReader::<Entity>::new()
            .connection(&db)
            .query(Entity::find())
            .page_size(1);

        let first = reader.read().unwrap().unwrap();
        assert_eq!(first.name, "P1");
        assert_eq!(
            reader.current_page.get(),
            1,
            "page should advance after full page"
        );

        let second = reader.read().unwrap().unwrap();
        assert_eq!(second.name, "P2");

        assert!(
            reader.read().unwrap().is_none(),
            "should stop after all pages"
        );
    }

    #[tokio::test(flavor = "multi_thread")]
    async fn should_stop_after_partial_last_page() {
        // page_size=2 with three rows: one full page, then a partial page.
        // Only two result sets are queued, so an unexpected third query
        // would make the mock fail the read.
        let db = MockDatabase::new(DatabaseBackend::Sqlite)
            .append_query_results([
                vec![record(1, "A"), record(2, "B")],
                vec![record(3, "C")],
            ])
            .into_connection();

        let reader = OrmItemReader::<Entity>::new()
            .connection(&db)
            .query(Entity::find())
            .page_size(2);

        assert_eq!(reader.read().unwrap().unwrap().name, "A");
        assert_eq!(reader.read().unwrap().unwrap().name, "B");
        assert_eq!(
            reader.current_page.get(),
            1,
            "page should advance after full page"
        );

        assert_eq!(reader.read().unwrap().unwrap().name, "C");
        assert_eq!(
            reader.current_page.get(),
            1,
            "page should not advance after a partial page"
        );

        assert!(
            reader.read().unwrap().is_none(),
            "should stop after the partial page"
        );
        assert_eq!(reader.offset.get(), 3);
    }

    // --- OrmItemReaderBuilder unit tests ---

    #[test]
    fn should_create_builder_with_default_state() {
        let builder = OrmItemReaderBuilder::<Entity>::new();
        assert!(builder.connection.is_none());
        assert!(builder.query.is_none());
        assert_eq!(builder.page_size, None);
    }

    #[test]
    fn should_set_page_size_on_builder() {
        let builder = OrmItemReaderBuilder::<Entity>::new().page_size(100);
        assert_eq!(builder.page_size, Some(100));
    }

    #[test]
    #[should_panic(expected = "Database connection is required")]
    fn should_panic_when_building_without_connection() {
        OrmItemReaderBuilder::<Entity>::new()
            .query(Entity::find())
            .build();
    }

    #[test]
    #[should_panic(expected = "Query is required")]
    fn should_panic_when_building_without_query() {
        let db = MockDatabase::new(DatabaseBackend::Sqlite).into_connection();
        OrmItemReaderBuilder::<Entity>::new()
            .connection(&db)
            .build();
    }

    #[test]
    fn should_build_reader_with_connection_and_query() {
        let db = MockDatabase::new(DatabaseBackend::Sqlite).into_connection();
        let reader = OrmItemReaderBuilder::<Entity>::new()
            .connection(&db)
            .query(Entity::find())
            .build();
        assert!(reader.connection.is_some());
        assert_eq!(reader.page_size, None);
    }

    #[test]
    fn should_build_reader_with_page_size() {
        let db = MockDatabase::new(DatabaseBackend::Sqlite).into_connection();
        let reader = OrmItemReaderBuilder::<Entity>::new()
            .connection(&db)
            .query(Entity::find())
            .page_size(50)
            .build();
        assert_eq!(reader.page_size, Some(50));
    }
}