Skip to main content

spring_batch_rs/item/orm/
orm_writer.rs

1use sea_orm::{ActiveModelTrait, DatabaseConnection, DbErr, EntityTrait, InsertResult};
2use std::marker::PhantomData;
3
4use crate::{
5    BatchError,
6    core::item::{ItemWriter, ItemWriterResult},
7};
8
9/// A writer for writing ORM active models directly to a database.
10///
11/// This writer provides an implementation of the `ItemWriter` trait for ORM-based
12/// database operations. It works directly with ORM active models, eliminating the
13/// need for mapper layers and providing a simple, efficient interface for batch
14/// database operations.
15///
16/// # Design Philosophy
17///
18/// The writer follows the "direct entity" approach used throughout the Spring Batch RS
19/// ORM integration:
20/// - **No Mappers**: Works directly with ORM active models, no transformation layer
21/// - **Type Safety**: Leverages ORM's compile-time type safety
22/// - **Efficiency**: Direct operations without intermediate conversions
23/// - **Simplicity**: Clean API with minimal configuration required
24///
25/// # Trait Bounds Design
26///
27/// The writer requires `A: ActiveModelTrait + Send` where:
28/// - `ActiveModelTrait` provides the associated Entity type and database operations
29/// - `Send` enables safe transfer across async boundaries
30/// - The Entity type is automatically inferred from `<A as ActiveModelTrait>::Entity`
31///
32/// Note: `IntoActiveModel<A>` is automatically provided by SeaORM's blanket implementation
33/// for all types that implement `ActiveModelTrait`, so it's not explicitly required.
34///
35/// # Usage Pattern
36///
37/// Users should convert their business objects to ORM active models before writing,
38/// either manually or using processors in the batch pipeline. This approach provides
39/// maximum flexibility and performance.
40///
41/// # Database Operations
42///
43/// The writer uses ORM's built-in batch insert capabilities:
44/// - **Connection Management**: Uses ORM's connection management for database operations
45/// - **Batch Operations**: Performs batch inserts to minimize database round trips
46/// - **Transaction Support**: Leverages ORM's transaction handling for consistency
47/// - **Type Safety**: Leverages ORM's type-safe active model operations
48///
49/// # Thread Safety
50///
51/// This writer is **not thread-safe** as it's designed for single-threaded batch processing
52/// scenarios. If you need concurrent access, consider using multiple writer instances.
53///
54/// # Database Support
55///
56/// This writer supports all databases that SeaORM supports:
57/// - PostgreSQL
58/// - MySQL
59/// - SQLite
60/// - SQL Server (limited support)
61///
62/// # Examples
63///
64/// ```
65/// use spring_batch_rs::item::orm::{OrmItemWriter, OrmItemWriterBuilder};
66/// use spring_batch_rs::core::item::ItemWriter;
67/// use sea_orm::{Database, ActiveValue::Set};
68/// use serde::{Deserialize, Serialize};
69///
70/// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
71/// // Create database connection
72/// let db = Database::connect("sqlite::memory:").await?;
73///
74/// // Create the writer with single type parameter (just the ActiveModel)
75/// // let writer: OrmItemWriter<product::ActiveModel> = OrmItemWriterBuilder::new()
76/// //     .connection(&db)
77/// //     .build();
78///
79/// // Work directly with ORM active models
80/// // let active_models = vec![
81/// //     product::ActiveModel {
82/// //         name: Set("Laptop".to_string()),
83/// //         category: Set("Electronics".to_string()),
84/// //         price: Set(999.99),
85/// //         in_stock: Set(true),
86/// //         ..Default::default()
87/// //     },
88/// // ];
89/// // writer.write(&active_models)?;
90/// # Ok(())
91/// # }
92/// ```
93pub struct OrmItemWriter<'a, O>
94where
95    O: ActiveModelTrait + Send,
96{
97    /// Database connection reference
98    /// This ensures the connection remains valid throughout the writer's lifecycle
99    connection: &'a DatabaseConnection,
100    /// Phantom data to track the active model type
101    _phantom: PhantomData<O>,
102}
103
104impl<'a, O> OrmItemWriter<'a, O>
105where
106    O: ActiveModelTrait + Send,
107{
108    /// Creates a new ORM item writer.
109    ///
110    /// # Parameters
111    /// - `connection`: Database connection reference
112    ///
113    /// # Returns
114    /// A new ORM item writer instance
115    ///
116    /// # Examples
117    ///
118    /// ```
119    /// use spring_batch_rs::item::orm::OrmItemWriter;
120    /// use sea_orm::Database;
121    ///
122    /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
123    /// let db = Database::connect("sqlite::memory:").await?;
124    ///
125    /// // Create writer for your ORM active model type (Entity is inferred)
126    /// // let writer = OrmItemWriter::<product::ActiveModel>::new(&db);
127    /// # Ok(())
128    /// # }
129    /// ```
130    pub fn new(connection: &'a DatabaseConnection) -> Self {
131        Self {
132            connection,
133            _phantom: PhantomData,
134        }
135    }
136
137    /// Performs the actual database insert operation asynchronously.
138    ///
139    /// This method converts the runtime to handle async operations within
140    /// the synchronous ItemWriter interface.
141    ///
142    /// # Parameters
143    /// - `active_models`: Vector of active models to insert
144    ///
145    /// # Returns
146    /// - `Ok(InsertResult)` if the insert operation succeeds
147    /// - `Err(DbErr)` if the database operation fails
148    async fn insert_batch_async(&self, active_models: Vec<O>) -> Result<InsertResult<O>, DbErr> {
149        <O as ActiveModelTrait>::Entity::insert_many(active_models)
150            .exec(self.connection)
151            .await
152    }
153
154    /// Performs a batch insert operation.
155    ///
156    /// This method handles the conversion between sync and async contexts
157    /// using tokio's block_in_place to avoid blocking the async runtime.
158    ///
159    /// # Parameters
160    /// - `active_models`: Vector of active models to insert
161    ///
162    /// # Returns
163    /// - `Ok(())` if the insert operation succeeds
164    /// - `Err(BatchError)` if the operation fails
165    fn insert_batch(&self, active_models: Vec<O>) -> Result<(), BatchError> {
166        // Use tokio's block_in_place to handle async operations in sync context
167        // This is the same pattern used in the ORM reader
168        let result = tokio::task::block_in_place(|| {
169            tokio::runtime::Handle::current()
170                .block_on(async { self.insert_batch_async(active_models).await })
171        });
172
173        match result {
174            Ok(_insert_result) => {
175                log::debug!("Successfully inserted batch to database");
176                Ok(())
177            }
178            Err(db_err) => {
179                let error_msg = format!("Failed to insert batch to database: {}", db_err);
180                log::error!("{}", error_msg);
181                Err(BatchError::ItemWriter(error_msg))
182            }
183        }
184    }
185}
186
187impl<O> ItemWriter<O> for OrmItemWriter<'_, O>
188where
189    O: ActiveModelTrait + Send,
190{
191    /// Writes ORM active models directly to the database.
192    ///
193    /// This method performs batch insert operations for efficiency, writing all
194    /// active models in a single database operation when possible.
195    ///
196    /// # Process Flow
197    ///
198    /// 1. **Validation**: Check if there are items to write
199    /// 2. **Batch Insert**: Perform a single batch insert operation
200    /// 3. **Error Handling**: Convert any database errors to BatchError
201    ///
202    /// # Parameters
203    /// - `items`: A slice of ORM active models to write to the database
204    ///
205    /// # Returns
206    /// - `Ok(())` if all items are successfully written
207    /// - `Err(BatchError)` if any error occurs during the process
208    ///
209    /// # Database Operations
210    ///
211    /// The method uses ORM's `insert_many()` function, which:
212    /// - Generates a single INSERT statement with multiple VALUE clauses
213    /// - Minimizes database round trips for better performance
214    /// - Maintains transactional consistency for the entire batch
215    /// - Returns the number of affected rows
216    ///
217    /// # Error Handling
218    ///
219    /// Errors can occur during database operations such as:
220    /// - Constraint violations (unique, foreign key, etc.)
221    /// - Connection failures
222    /// - Invalid data types or values
223    ///
224    /// All errors are converted to `BatchError::ItemWriter` with descriptive messages.
225    ///
226    /// # Examples
227    ///
228    /// ```
229    /// use spring_batch_rs::item::orm::{OrmItemWriter, OrmItemWriterBuilder};
230    /// use spring_batch_rs::core::item::ItemWriter;
231    /// use sea_orm::{Database, ActiveValue::Set};
232    ///
233    /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
234    /// let db = Database::connect("sqlite::memory:").await?;
235    ///
236    /// // let writer: OrmItemWriter<user::ActiveModel> = OrmItemWriterBuilder::new()
237    /// //     .connection(&db)
238    /// //     .build();
239    ///
240    /// // Write ORM active models directly
241    /// // let active_models = vec![
242    /// //     user::ActiveModel {
243    /// //         name: Set("Alice".to_string()),
244    /// //         email: Set("alice@example.com".to_string()),
245    /// //         ..Default::default()
246    /// //     },
247    /// //     user::ActiveModel {
248    /// //         name: Set("Bob".to_string()),
249    /// //         email: Set("bob@example.com".to_string()),
250    /// //         ..Default::default()
251    /// //     },
252    /// // ];
253    /// // writer.write(&active_models)?;
254    /// # Ok(())
255    /// # }
256    /// ```
257    fn write(&self, items: &[O]) -> ItemWriterResult {
258        log::debug!("Writing {} active models to database", items.len());
259
260        if items.is_empty() {
261            log::debug!("No items to write, skipping database operation");
262            return Ok(());
263        }
264
265        // Clone all active models for the batch insert
266        let active_models: Vec<O> = items.to_vec();
267
268        // Perform batch insert
269        self.insert_batch(active_models)?;
270
271        log::info!(
272            "Successfully wrote {} active models to database",
273            items.len()
274        );
275        Ok(())
276    }
277
278    /// Flushes any pending operations.
279    ///
280    /// For the ORM writer, this is a no-op since each write operation
281    /// immediately commits to the database. There are no pending operations
282    /// to flush.
283    ///
284    /// # Returns
285    /// Always returns `Ok(())`
286    fn flush(&self) -> ItemWriterResult {
287        log::debug!("Flush called on ORM writer (no-op)");
288        Ok(())
289    }
290
291    /// Opens the writer for writing.
292    ///
293    /// For the ORM writer, this is a no-op since ORM manages
294    /// database connections internally and no special initialization is required.
295    ///
296    /// # Returns
297    /// Always returns `Ok(())`
298    fn open(&self) -> ItemWriterResult {
299        log::debug!("Opened ORM writer");
300        Ok(())
301    }
302
303    /// Closes the writer and releases any resources.
304    ///
305    /// For the ORM writer, this is a no-op since ORM manages
306    /// database connections internally and no special cleanup is required.
307    ///
308    /// # Returns
309    /// Always returns `Ok(())`
310    fn close(&self) -> ItemWriterResult {
311        log::debug!("Closed ORM writer");
312        Ok(())
313    }
314}
315
316/// A builder for creating ORM item writers.
317///
318/// This builder allows you to configure an ORM item writer with the necessary
319/// database connection. Since the writer now works directly with ORM active models,
320/// no mapper configuration is required.
321///
322/// # Design Pattern
323///
324/// This struct implements the Builder pattern, which allows for fluent, chainable
325/// configuration of an `OrmItemWriter` before creation. The simplified design
326/// requires only a database connection and infers the Entity type from the
327/// ActiveModel's associated type.
328///
329/// # Required Configuration
330///
331/// The following parameter is required and must be set before calling `build()`:
332/// - **Connection**: Database connection reference
333///
334/// # Examples
335///
336/// ```
337/// use spring_batch_rs::item::orm::OrmItemWriterBuilder;
338/// use sea_orm::Database;
339///
340/// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
341/// let db = Database::connect("sqlite::memory:").await?;
342///
343/// // Only need to specify the ActiveModel type - Entity is inferred!
344/// // let builder: OrmItemWriterBuilder<product::ActiveModel> = OrmItemWriterBuilder::new()
345/// //     .connection(&db);
346/// # Ok(())
347/// # }
348/// ```
349#[derive(Default)]
350pub struct OrmItemWriterBuilder<'a, O>
351where
352    O: ActiveModelTrait + Send,
353{
354    /// Database connection - None until set by the user
355    /// This will be validated as required during build()
356    connection: Option<&'a DatabaseConnection>,
357    /// Phantom data to track the active model type
358    _phantom: PhantomData<O>,
359}
360
361impl<'a, O> OrmItemWriterBuilder<'a, O>
362where
363    O: ActiveModelTrait + Send,
364{
365    /// Creates a new ORM item writer builder.
366    ///
367    /// All configuration options start as None and must be set before calling `build()`.
368    ///
369    /// # Returns
370    /// A new builder instance with default configuration
371    ///
372    /// # Examples
373    ///
374    /// ```
375    /// use spring_batch_rs::item::orm::OrmItemWriterBuilder;
376    ///
377    /// // Create a new builder (only need ActiveModel type!)
378    /// // let builder = OrmItemWriterBuilder::<MyActiveModel>::new();
379    /// ```
380    pub fn new() -> Self {
381        Self {
382            connection: None,
383            _phantom: PhantomData,
384        }
385    }
386
387    /// Sets the database connection for the item writer.
388    ///
389    /// This parameter is **required**. The builder will panic during `build()`
390    /// if this parameter is not set.
391    ///
392    /// # Parameters
393    /// - `connection`: Reference to the ORM database connection
394    ///
395    /// # Returns
396    /// The updated builder instance for method chaining
397    ///
398    /// # Connection Lifecycle
399    ///
400    /// The connection reference must remain valid for the entire lifetime of the
401    /// resulting writer. The writer does not take ownership of the connection,
402    /// allowing it to be shared across multiple components.
403    ///
404    /// # Examples
405    ///
406    /// ```
407    /// use spring_batch_rs::item::orm::OrmItemWriterBuilder;
408    /// use sea_orm::Database;
409    ///
410    /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
411    /// let db = Database::connect("sqlite::memory:").await?;
412    ///
413    /// // Much cleaner with single generic parameter!
414    /// // let builder: OrmItemWriterBuilder<product::ActiveModel> = OrmItemWriterBuilder::new()
415    /// //     .connection(&db);
416    /// # Ok(())
417    /// # }
418    /// ```
419    pub fn connection(mut self, connection: &'a DatabaseConnection) -> Self {
420        self.connection = Some(connection);
421        self
422    }
423
424    /// Builds the ORM item writer with the configured parameters.
425    ///
426    /// This method validates that all required parameters have been set and creates
427    /// a new `OrmItemWriter` instance.
428    ///
429    /// # Returns
430    /// A configured `OrmItemWriter` instance
431    ///
432    /// # Panics
433    /// Panics if the required database connection parameter is missing.
434    ///
435    /// # Validation
436    ///
437    /// The builder performs the following validation:
438    /// - Ensures a database connection has been provided
439    ///
440    /// If any validation fails, the method will panic with a descriptive error message.
441    ///
442    /// # Examples
443    ///
444    /// ```
445    /// use spring_batch_rs::item::orm::OrmItemWriterBuilder;
446    /// use sea_orm::Database;
447    ///
448    /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
449    /// let db = Database::connect("sqlite::memory:").await?;
450    ///
451    /// // let writer: OrmItemWriter<product::ActiveModel> = OrmItemWriterBuilder::new()
452    /// //     .connection(&db)
453    /// //     .build();
454    /// # Ok(())
455    /// # }
456    /// ```
457    pub fn build(self) -> OrmItemWriter<'a, O> {
458        let connection = self
459            .connection
460            .expect("Database connection is required. Call .connection() before .build()");
461
462        OrmItemWriter::new(connection)
463    }
464}
465
#[cfg(test)]
mod tests {
    use super::*;
    use sea_orm::{
        ActiveValue::{NotSet, Set},
        entity::prelude::*,
    };

    // Minimal entity used only to obtain a concrete `ActiveModel` type for
    // exercising the writer and its trait bounds.
    #[derive(Clone, Debug, PartialEq, Eq, DeriveEntityModel)]
    #[sea_orm(table_name = "test_entity")]
    pub struct Model {
        #[sea_orm(primary_key)]
        pub id: i32,
        pub name: String,
    }

    #[derive(Copy, Clone, Debug, EnumIter, DeriveRelation)]
    pub enum Relation {}

    impl ActiveModelBehavior for ActiveModel {}

    /// Builds an active model with an unset primary key and the given name.
    fn active_model_named(name: &str) -> ActiveModel {
        ActiveModel {
            id: NotSet,
            name: Set(name.to_owned()),
        }
    }

    #[test]
    fn test_simplified_trait_bounds_compilation() {
        // Compiles only if `ActiveModelTrait + Send` can be expressed with the
        // active model as the sole generic parameter.
        fn assert_bounds<A: ActiveModelTrait + Send>() {}

        // The derived test model has to satisfy those bounds.
        assert_bounds::<ActiveModel>();

        // A builder can be created by naming just the active model type, and
        // it starts without a connection.
        let builder = OrmItemWriterBuilder::<ActiveModel>::new();
        assert!(builder.connection.is_none());
    }

    #[test]
    fn test_active_model_creation() {
        // Accepts any value whose type satisfies the writer's bounds.
        fn accept<A: ActiveModelTrait + Send>(_: A) {}

        accept(active_model_named("Test"));
    }

    #[test]
    fn test_write_empty_slice_skips_database_operation() {
        use sea_orm::{DatabaseBackend, MockDatabase};

        // No results are queued on the mock and no Tokio runtime is running;
        // an empty write must succeed anyway because it returns early.
        let connection = MockDatabase::new(DatabaseBackend::Sqlite).into_connection();
        let writer = OrmItemWriter::<ActiveModel>::new(&connection);

        assert!(writer.open().is_ok());
        assert!(writer.flush().is_ok());
        assert!(writer.write(&[]).is_ok());
        assert!(writer.close().is_ok());
    }

    #[test]
    fn should_build_writer_via_builder_with_connection() {
        use sea_orm::{DatabaseBackend, MockDatabase};

        let connection = MockDatabase::new(DatabaseBackend::Sqlite).into_connection();
        let writer = OrmItemWriterBuilder::<ActiveModel>::new()
            .connection(&connection)
            .build();

        // The lifecycle hooks are no-ops and must all report success.
        assert!(writer.open().is_ok());
        assert!(writer.flush().is_ok());
        assert!(writer.close().is_ok());
    }

    #[tokio::test(flavor = "multi_thread")]
    async fn should_write_active_models_to_mock_database() {
        use sea_orm::{DatabaseBackend, MockDatabase, MockExecResult};

        // Queue exactly one successful exec result for the single insert.
        let insert_ok = MockExecResult {
            last_insert_id: 1,
            rows_affected: 1,
        };
        let connection = MockDatabase::new(DatabaseBackend::Sqlite)
            .append_exec_results([insert_ok])
            .into_connection();

        let writer = OrmItemWriter::<ActiveModel>::new(&connection);
        let batch = vec![active_model_named("Alice")];

        let result = writer.write(&batch);
        assert!(
            result.is_ok(),
            "write should succeed with mock DB: {result:?}"
        );
    }

    #[tokio::test(flavor = "multi_thread")]
    async fn should_return_error_when_database_insert_fails() {
        use crate::BatchError;
        use sea_orm::{DatabaseBackend, DbErr, MockDatabase};

        // The mock is primed with an exec error, so the insert fails.
        let connection = MockDatabase::new(DatabaseBackend::Sqlite)
            .append_exec_errors([DbErr::Custom("insert failed".to_owned())])
            .into_connection();

        let writer = OrmItemWriter::<ActiveModel>::new(&connection);
        let batch = vec![active_model_named("Fail")];

        let outcome = writer.write(&batch);
        assert!(
            outcome.is_err(),
            "write should fail when database returns error"
        );

        // The database error must surface as an ItemWriter error carrying the
        // writer's own message prefix.
        match outcome {
            Err(BatchError::ItemWriter(message)) => {
                assert!(
                    message.contains("Failed to insert"),
                    "unexpected error: {message}"
                )
            }
            other => panic!("expected ItemWriter error, got {other:?}"),
        }
    }
}