spring_batch_rs/item/orm/orm_writer.rs
1use sea_orm::{ActiveModelTrait, DatabaseConnection, DbErr, EntityTrait, InsertResult};
2use std::marker::PhantomData;
3
4use crate::{
5 BatchError,
6 core::item::{ItemWriter, ItemWriterResult},
7};
8
9/// A writer for writing ORM active models directly to a database.
10///
11/// This writer provides an implementation of the `ItemWriter` trait for ORM-based
12/// database operations. It works directly with ORM active models, eliminating the
13/// need for mapper layers and providing a simple, efficient interface for batch
14/// database operations.
15///
16/// # Design Philosophy
17///
18/// The writer follows the "direct entity" approach used throughout the Spring Batch RS
19/// ORM integration:
20/// - **No Mappers**: Works directly with ORM active models, no transformation layer
21/// - **Type Safety**: Leverages ORM's compile-time type safety
22/// - **Efficiency**: Direct operations without intermediate conversions
23/// - **Simplicity**: Clean API with minimal configuration required
24///
25/// # Trait Bounds Design
26///
27/// The writer requires `A: ActiveModelTrait + Send` where:
28/// - `ActiveModelTrait` provides the associated Entity type and database operations
29/// - `Send` enables safe transfer across async boundaries
30/// - The Entity type is automatically inferred from `<A as ActiveModelTrait>::Entity`
31///
32/// Note: `IntoActiveModel<A>` is automatically provided by SeaORM's blanket implementation
33/// for all types that implement `ActiveModelTrait`, so it's not explicitly required.
34///
35/// # Usage Pattern
36///
37/// Users should convert their business objects to ORM active models before writing,
38/// either manually or using processors in the batch pipeline. This approach provides
39/// maximum flexibility and performance.
40///
41/// # Database Operations
42///
43/// The writer uses ORM's built-in batch insert capabilities:
44/// - **Connection Management**: Uses ORM's connection management for database operations
45/// - **Batch Operations**: Performs batch inserts to minimize database round trips
46/// - **Transaction Support**: Leverages ORM's transaction handling for consistency
47/// - **Type Safety**: Leverages ORM's type-safe active model operations
48///
49/// # Thread Safety
50///
51/// This writer is **not thread-safe** as it's designed for single-threaded batch processing
52/// scenarios. If you need concurrent access, consider using multiple writer instances.
53///
54/// # Database Support
55///
56/// This writer supports all databases that SeaORM supports:
57/// - PostgreSQL
58/// - MySQL
59/// - SQLite
60/// - SQL Server (limited support)
61///
62/// # Examples
63///
64/// ```
65/// use spring_batch_rs::item::orm::{OrmItemWriter, OrmItemWriterBuilder};
66/// use spring_batch_rs::core::item::ItemWriter;
67/// use sea_orm::{Database, ActiveValue::Set};
68/// use serde::{Deserialize, Serialize};
69///
70/// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
71/// // Create database connection
72/// let db = Database::connect("sqlite::memory:").await?;
73///
74/// // Create the writer with single type parameter (just the ActiveModel)
75/// // let writer: OrmItemWriter<product::ActiveModel> = OrmItemWriterBuilder::new()
76/// // .connection(&db)
77/// // .build();
78///
79/// // Work directly with ORM active models
80/// // let active_models = vec![
81/// // product::ActiveModel {
82/// // name: Set("Laptop".to_string()),
83/// // category: Set("Electronics".to_string()),
84/// // price: Set(999.99),
85/// // in_stock: Set(true),
86/// // ..Default::default()
87/// // },
88/// // ];
89/// // writer.write(&active_models)?;
90/// # Ok(())
91/// # }
92/// ```
93pub struct OrmItemWriter<'a, O>
94where
95 O: ActiveModelTrait + Send,
96{
97 /// Database connection reference
98 /// This ensures the connection remains valid throughout the writer's lifecycle
99 connection: &'a DatabaseConnection,
100 /// Phantom data to track the active model type
101 _phantom: PhantomData<O>,
102}
103
104impl<'a, O> OrmItemWriter<'a, O>
105where
106 O: ActiveModelTrait + Send,
107{
108 /// Creates a new ORM item writer.
109 ///
110 /// # Parameters
111 /// - `connection`: Database connection reference
112 ///
113 /// # Returns
114 /// A new ORM item writer instance
115 ///
116 /// # Examples
117 ///
118 /// ```
119 /// use spring_batch_rs::item::orm::OrmItemWriter;
120 /// use sea_orm::Database;
121 ///
122 /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
123 /// let db = Database::connect("sqlite::memory:").await?;
124 ///
125 /// // Create writer for your ORM active model type (Entity is inferred)
126 /// // let writer = OrmItemWriter::<product::ActiveModel>::new(&db);
127 /// # Ok(())
128 /// # }
129 /// ```
130 pub fn new(connection: &'a DatabaseConnection) -> Self {
131 Self {
132 connection,
133 _phantom: PhantomData,
134 }
135 }
136
137 /// Performs the actual database insert operation asynchronously.
138 ///
139 /// This method converts the runtime to handle async operations within
140 /// the synchronous ItemWriter interface.
141 ///
142 /// # Parameters
143 /// - `active_models`: Vector of active models to insert
144 ///
145 /// # Returns
146 /// - `Ok(InsertResult)` if the insert operation succeeds
147 /// - `Err(DbErr)` if the database operation fails
148 async fn insert_batch_async(&self, active_models: Vec<O>) -> Result<InsertResult<O>, DbErr> {
149 <O as ActiveModelTrait>::Entity::insert_many(active_models)
150 .exec(self.connection)
151 .await
152 }
153
154 /// Performs a batch insert operation.
155 ///
156 /// This method handles the conversion between sync and async contexts
157 /// using tokio's block_in_place to avoid blocking the async runtime.
158 ///
159 /// # Parameters
160 /// - `active_models`: Vector of active models to insert
161 ///
162 /// # Returns
163 /// - `Ok(())` if the insert operation succeeds
164 /// - `Err(BatchError)` if the operation fails
165 fn insert_batch(&self, active_models: Vec<O>) -> Result<(), BatchError> {
166 // Use tokio's block_in_place to handle async operations in sync context
167 // This is the same pattern used in the ORM reader
168 let result = tokio::task::block_in_place(|| {
169 tokio::runtime::Handle::current()
170 .block_on(async { self.insert_batch_async(active_models).await })
171 });
172
173 match result {
174 Ok(_insert_result) => {
175 log::debug!("Successfully inserted batch to database");
176 Ok(())
177 }
178 Err(db_err) => {
179 let error_msg = format!("Failed to insert batch to database: {}", db_err);
180 log::error!("{}", error_msg);
181 Err(BatchError::ItemWriter(error_msg))
182 }
183 }
184 }
185}
186
187impl<O> ItemWriter<O> for OrmItemWriter<'_, O>
188where
189 O: ActiveModelTrait + Send,
190{
191 /// Writes ORM active models directly to the database.
192 ///
193 /// This method performs batch insert operations for efficiency, writing all
194 /// active models in a single database operation when possible.
195 ///
196 /// # Process Flow
197 ///
198 /// 1. **Validation**: Check if there are items to write
199 /// 2. **Batch Insert**: Perform a single batch insert operation
200 /// 3. **Error Handling**: Convert any database errors to BatchError
201 ///
202 /// # Parameters
203 /// - `items`: A slice of ORM active models to write to the database
204 ///
205 /// # Returns
206 /// - `Ok(())` if all items are successfully written
207 /// - `Err(BatchError)` if any error occurs during the process
208 ///
209 /// # Database Operations
210 ///
211 /// The method uses ORM's `insert_many()` function, which:
212 /// - Generates a single INSERT statement with multiple VALUE clauses
213 /// - Minimizes database round trips for better performance
214 /// - Maintains transactional consistency for the entire batch
215 /// - Returns the number of affected rows
216 ///
217 /// # Error Handling
218 ///
219 /// Errors can occur during database operations such as:
220 /// - Constraint violations (unique, foreign key, etc.)
221 /// - Connection failures
222 /// - Invalid data types or values
223 ///
224 /// All errors are converted to `BatchError::ItemWriter` with descriptive messages.
225 ///
226 /// # Examples
227 ///
228 /// ```
229 /// use spring_batch_rs::item::orm::{OrmItemWriter, OrmItemWriterBuilder};
230 /// use spring_batch_rs::core::item::ItemWriter;
231 /// use sea_orm::{Database, ActiveValue::Set};
232 ///
233 /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
234 /// let db = Database::connect("sqlite::memory:").await?;
235 ///
236 /// // let writer: OrmItemWriter<user::ActiveModel> = OrmItemWriterBuilder::new()
237 /// // .connection(&db)
238 /// // .build();
239 ///
240 /// // Write ORM active models directly
241 /// // let active_models = vec![
242 /// // user::ActiveModel {
243 /// // name: Set("Alice".to_string()),
244 /// // email: Set("alice@example.com".to_string()),
245 /// // ..Default::default()
246 /// // },
247 /// // user::ActiveModel {
248 /// // name: Set("Bob".to_string()),
249 /// // email: Set("bob@example.com".to_string()),
250 /// // ..Default::default()
251 /// // },
252 /// // ];
253 /// // writer.write(&active_models)?;
254 /// # Ok(())
255 /// # }
256 /// ```
257 fn write(&self, items: &[O]) -> ItemWriterResult {
258 log::debug!("Writing {} active models to database", items.len());
259
260 if items.is_empty() {
261 log::debug!("No items to write, skipping database operation");
262 return Ok(());
263 }
264
265 // Clone all active models for the batch insert
266 let active_models: Vec<O> = items.to_vec();
267
268 // Perform batch insert
269 self.insert_batch(active_models)?;
270
271 log::info!(
272 "Successfully wrote {} active models to database",
273 items.len()
274 );
275 Ok(())
276 }
277
278 /// Flushes any pending operations.
279 ///
280 /// For the ORM writer, this is a no-op since each write operation
281 /// immediately commits to the database. There are no pending operations
282 /// to flush.
283 ///
284 /// # Returns
285 /// Always returns `Ok(())`
286 fn flush(&self) -> ItemWriterResult {
287 log::debug!("Flush called on ORM writer (no-op)");
288 Ok(())
289 }
290
291 /// Opens the writer for writing.
292 ///
293 /// For the ORM writer, this is a no-op since ORM manages
294 /// database connections internally and no special initialization is required.
295 ///
296 /// # Returns
297 /// Always returns `Ok(())`
298 fn open(&self) -> ItemWriterResult {
299 log::debug!("Opened ORM writer");
300 Ok(())
301 }
302
303 /// Closes the writer and releases any resources.
304 ///
305 /// For the ORM writer, this is a no-op since ORM manages
306 /// database connections internally and no special cleanup is required.
307 ///
308 /// # Returns
309 /// Always returns `Ok(())`
310 fn close(&self) -> ItemWriterResult {
311 log::debug!("Closed ORM writer");
312 Ok(())
313 }
314}
315
316/// A builder for creating ORM item writers.
317///
318/// This builder allows you to configure an ORM item writer with the necessary
319/// database connection. Since the writer now works directly with ORM active models,
320/// no mapper configuration is required.
321///
322/// # Design Pattern
323///
324/// This struct implements the Builder pattern, which allows for fluent, chainable
325/// configuration of an `OrmItemWriter` before creation. The simplified design
326/// requires only a database connection and infers the Entity type from the
327/// ActiveModel's associated type.
328///
329/// # Required Configuration
330///
331/// The following parameter is required and must be set before calling `build()`:
332/// - **Connection**: Database connection reference
333///
334/// # Examples
335///
336/// ```
337/// use spring_batch_rs::item::orm::OrmItemWriterBuilder;
338/// use sea_orm::Database;
339///
340/// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
341/// let db = Database::connect("sqlite::memory:").await?;
342///
343/// // Only need to specify the ActiveModel type - Entity is inferred!
344/// // let builder: OrmItemWriterBuilder<product::ActiveModel> = OrmItemWriterBuilder::new()
345/// // .connection(&db);
346/// # Ok(())
347/// # }
348/// ```
349#[derive(Default)]
350pub struct OrmItemWriterBuilder<'a, O>
351where
352 O: ActiveModelTrait + Send,
353{
354 /// Database connection - None until set by the user
355 /// This will be validated as required during build()
356 connection: Option<&'a DatabaseConnection>,
357 /// Phantom data to track the active model type
358 _phantom: PhantomData<O>,
359}
360
361impl<'a, O> OrmItemWriterBuilder<'a, O>
362where
363 O: ActiveModelTrait + Send,
364{
365 /// Creates a new ORM item writer builder.
366 ///
367 /// All configuration options start as None and must be set before calling `build()`.
368 ///
369 /// # Returns
370 /// A new builder instance with default configuration
371 ///
372 /// # Examples
373 ///
374 /// ```
375 /// use spring_batch_rs::item::orm::OrmItemWriterBuilder;
376 ///
377 /// // Create a new builder (only need ActiveModel type!)
378 /// // let builder = OrmItemWriterBuilder::<MyActiveModel>::new();
379 /// ```
380 pub fn new() -> Self {
381 Self {
382 connection: None,
383 _phantom: PhantomData,
384 }
385 }
386
387 /// Sets the database connection for the item writer.
388 ///
389 /// This parameter is **required**. The builder will panic during `build()`
390 /// if this parameter is not set.
391 ///
392 /// # Parameters
393 /// - `connection`: Reference to the ORM database connection
394 ///
395 /// # Returns
396 /// The updated builder instance for method chaining
397 ///
398 /// # Connection Lifecycle
399 ///
400 /// The connection reference must remain valid for the entire lifetime of the
401 /// resulting writer. The writer does not take ownership of the connection,
402 /// allowing it to be shared across multiple components.
403 ///
404 /// # Examples
405 ///
406 /// ```
407 /// use spring_batch_rs::item::orm::OrmItemWriterBuilder;
408 /// use sea_orm::Database;
409 ///
410 /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
411 /// let db = Database::connect("sqlite::memory:").await?;
412 ///
413 /// // Much cleaner with single generic parameter!
414 /// // let builder: OrmItemWriterBuilder<product::ActiveModel> = OrmItemWriterBuilder::new()
415 /// // .connection(&db);
416 /// # Ok(())
417 /// # }
418 /// ```
419 pub fn connection(mut self, connection: &'a DatabaseConnection) -> Self {
420 self.connection = Some(connection);
421 self
422 }
423
424 /// Builds the ORM item writer with the configured parameters.
425 ///
426 /// This method validates that all required parameters have been set and creates
427 /// a new `OrmItemWriter` instance.
428 ///
429 /// # Returns
430 /// A configured `OrmItemWriter` instance
431 ///
432 /// # Panics
433 /// Panics if the required database connection parameter is missing.
434 ///
435 /// # Validation
436 ///
437 /// The builder performs the following validation:
438 /// - Ensures a database connection has been provided
439 ///
440 /// If any validation fails, the method will panic with a descriptive error message.
441 ///
442 /// # Examples
443 ///
444 /// ```
445 /// use spring_batch_rs::item::orm::OrmItemWriterBuilder;
446 /// use sea_orm::Database;
447 ///
448 /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
449 /// let db = Database::connect("sqlite::memory:").await?;
450 ///
451 /// // let writer: OrmItemWriter<product::ActiveModel> = OrmItemWriterBuilder::new()
452 /// // .connection(&db)
453 /// // .build();
454 /// # Ok(())
455 /// # }
456 /// ```
457 pub fn build(self) -> OrmItemWriter<'a, O> {
458 let connection = self
459 .connection
460 .expect("Database connection is required. Call .connection() before .build()");
461
462 OrmItemWriter::new(connection)
463 }
464}
465
#[cfg(test)]
mod tests {
    use super::*;
    use sea_orm::{
        ActiveValue::{NotSet, Set},
        entity::prelude::*,
    };

    // Minimal entity used only to obtain a concrete `ActiveModel` type.
    #[derive(Clone, Debug, PartialEq, Eq, DeriveEntityModel)]
    #[sea_orm(table_name = "test_entity")]
    pub struct Model {
        #[sea_orm(primary_key)]
        pub id: i32,
        pub name: String,
    }

    #[derive(Copy, Clone, Debug, EnumIter, DeriveRelation)]
    pub enum Relation {}

    impl ActiveModelBehavior for ActiveModel {}

    /// Builds an active model with an unset id and the given name.
    fn named(name: &str) -> ActiveModel {
        ActiveModel {
            id: NotSet,
            name: Set(name.to_owned()),
        }
    }

    #[test]
    fn test_simplified_trait_bounds_compilation() {
        // Compile-time check: `ActiveModelTrait + Send` on a single generic
        // parameter is all the writer and builder ask for.
        fn _verify_bounds<A>()
        where
            A: ActiveModelTrait + Send,
        {
        }

        // The test entity's ActiveModel satisfies those bounds.
        _verify_bounds::<ActiveModel>();

        // A fresh builder names only the ActiveModel type and starts without
        // a connection.
        let builder = OrmItemWriterBuilder::<ActiveModel>::new();
        assert!(builder.connection.is_none());
    }

    #[test]
    fn test_active_model_creation() {
        // Accepts any value whose type meets the writer's bounds; compiling
        // the call below is the actual assertion.
        fn check_traits<A>(_: A)
        where
            A: ActiveModelTrait + Send,
        {
        }

        check_traits(named("Test"));
    }

    #[test]
    fn test_write_empty_slice_skips_database_operation() {
        use sea_orm::{DatabaseBackend, MockDatabase};

        let conn = MockDatabase::new(DatabaseBackend::Sqlite).into_connection();
        let writer = OrmItemWriter::<ActiveModel>::new(&conn);

        assert!(writer.open().is_ok());
        assert!(writer.flush().is_ok());
        assert!(writer.write(&[]).is_ok());
        assert!(writer.close().is_ok());
    }

    #[test]
    fn should_build_writer_via_builder_with_connection() {
        use sea_orm::{DatabaseBackend, MockDatabase};

        let conn = MockDatabase::new(DatabaseBackend::Sqlite).into_connection();
        let writer = OrmItemWriterBuilder::<ActiveModel>::new()
            .connection(&conn)
            .build();

        // The lifecycle hooks succeed on a freshly built writer.
        assert!(writer.open().is_ok());
        assert!(writer.flush().is_ok());
        assert!(writer.close().is_ok());
    }

    #[tokio::test(flavor = "multi_thread")]
    async fn should_write_active_models_to_mock_database() {
        use sea_orm::{DatabaseBackend, MockDatabase, MockExecResult};

        // One queued exec result answers the single insert issued by write().
        let conn = MockDatabase::new(DatabaseBackend::Sqlite)
            .append_exec_results([MockExecResult {
                last_insert_id: 1,
                rows_affected: 1,
            }])
            .into_connection();

        let writer = OrmItemWriter::<ActiveModel>::new(&conn);
        let batch = vec![named("Alice")];

        let result = writer.write(&batch);
        assert!(
            result.is_ok(),
            "write should succeed with mock DB: {result:?}"
        );
    }

    #[tokio::test(flavor = "multi_thread")]
    async fn should_return_error_when_database_insert_fails() {
        use crate::BatchError;
        use sea_orm::{DatabaseBackend, DbErr, MockDatabase};

        // The mock is primed with an exec error, so the insert fails.
        let conn = MockDatabase::new(DatabaseBackend::Sqlite)
            .append_exec_errors([DbErr::Custom("insert failed".to_owned())])
            .into_connection();

        let writer = OrmItemWriter::<ActiveModel>::new(&conn);
        let batch = vec![named("Fail")];

        let result = writer.write(&batch);
        assert!(
            result.is_err(),
            "write should fail when database returns error"
        );
        match result {
            Err(BatchError::ItemWriter(msg)) => {
                assert!(msg.contains("Failed to insert"), "unexpected error: {msg}")
            }
            other => panic!("expected ItemWriter error, got {other:?}"),
        }
    }
}