spring-batch-rs 0.3.4

A toolkit for building enterprise-grade batch applications
Documentation
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
use sea_orm::{ActiveModelTrait, DatabaseConnection, DbErr, EntityTrait, InsertResult};
use std::marker::PhantomData;

use crate::{
    BatchError,
    core::item::{ItemWriter, ItemWriterResult},
};

/// An `ItemWriter` that inserts SeaORM active models straight into a database.
///
/// Every call to `write` hands the received active models to SeaORM's
/// `insert_many` in one go. There is no mapping layer in between: callers are
/// expected to produce active models themselves, either by hand or in a
/// processor earlier in the batch pipeline.
///
/// # Type Parameters
///
/// - `'a`: lifetime of the borrowed `DatabaseConnection`
/// - `O`: the active model type, bounded by `ActiveModelTrait + Send`; the
///   entity is taken from `<O as ActiveModelTrait>::Entity`, so it never has
///   to be named separately
///
/// # Connection Handling
///
/// The writer only borrows the connection. It neither opens nor closes it, so
/// the same connection can be shared with other components for as long as the
/// borrow is valid.
///
/// # Runtime Requirements
///
/// `ItemWriter::write` is a synchronous method, while the insert itself is
/// async. The two are bridged with `tokio::task::block_in_place` and the
/// current Tokio runtime handle, so non-empty batches must be written from
/// within a multi-threaded Tokio runtime.
///
/// # Examples
///
/// ```
/// use spring_batch_rs::item::orm::{OrmItemWriter, OrmItemWriterBuilder};
/// use spring_batch_rs::core::item::ItemWriter;
/// use sea_orm::{Database, ActiveValue::Set};
///
/// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
/// // Create database connection
/// let db = Database::connect("sqlite::memory:").await?;
///
/// // Only the active model type has to be spelled out
/// // let writer: OrmItemWriter<product::ActiveModel> = OrmItemWriterBuilder::new()
/// //     .connection(&db)
/// //     .build();
///
/// // Hand active models to the writer as-is
/// // let active_models = vec![
/// //     product::ActiveModel {
/// //         name: Set("Laptop".to_string()),
/// //         category: Set("Electronics".to_string()),
/// //         price: Set(999.99),
/// //         in_stock: Set(true),
/// //         ..Default::default()
/// //     },
/// // ];
/// // writer.write(&active_models)?;
/// # Ok(())
/// # }
/// ```
pub struct OrmItemWriter<'a, O: ActiveModelTrait + Send> {
    /// Borrowed database connection that every insert is executed against.
    connection: &'a DatabaseConnection,
    /// Zero-sized marker tying the writer to its active model type `O`.
    _phantom: PhantomData<O>,
}

impl<'a, O> OrmItemWriter<'a, O>
where
    O: ActiveModelTrait + Send,
{
    /// Creates a new ORM item writer.
    ///
    /// # Parameters
    /// - `connection`: Database connection reference; it must outlive the writer
    ///
    /// # Returns
    /// A new ORM item writer instance bound to `connection`
    ///
    /// # Examples
    ///
    /// ```
    /// use spring_batch_rs::item::orm::OrmItemWriter;
    /// use sea_orm::Database;
    ///
    /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
    /// let db = Database::connect("sqlite::memory:").await?;
    ///
    /// // Create writer for your ORM active model type (Entity is inferred)
    /// // let writer = OrmItemWriter::<product::ActiveModel>::new(&db);
    /// # Ok(())
    /// # }
    /// ```
    pub fn new(connection: &'a DatabaseConnection) -> Self {
        Self {
            connection,
            _phantom: PhantomData,
        }
    }

    /// Performs the actual database insert operation asynchronously.
    ///
    /// This is the plain async half of the writer: it passes the whole batch
    /// to `insert_many` on the entity associated with `O` and awaits the
    /// result. Bridging into the synchronous `ItemWriter` interface is done by
    /// `insert_batch`, not here.
    ///
    /// # Parameters
    /// - `active_models`: Vector of active models to insert
    ///
    /// # Returns
    /// - `Ok(InsertResult)` if the insert operation succeeds
    /// - `Err(DbErr)` if the database operation fails
    async fn insert_batch_async(&self, active_models: Vec<O>) -> Result<InsertResult<O>, DbErr> {
        <O as ActiveModelTrait>::Entity::insert_many(active_models)
            .exec(self.connection)
            .await
    }

    /// Performs a batch insert operation from a synchronous context.
    ///
    /// The async insert is driven to completion on the current Tokio runtime
    /// inside `tokio::task::block_in_place`, so the calling worker thread may
    /// block without stalling the rest of the runtime.
    ///
    /// # Parameters
    /// - `active_models`: Vector of active models to insert
    ///
    /// # Returns
    /// - `Ok(())` if the insert operation succeeds
    /// - `Err(BatchError)` if the operation fails
    ///
    /// # Errors
    /// Returns `BatchError::ItemWriter` when
    /// - no Tokio runtime context is available on the calling thread, or
    /// - the database reports an error for the insert.
    ///
    /// # Panics
    /// `tokio::task::block_in_place` panics when it is invoked from inside a
    /// current-thread runtime; use a multi-threaded runtime for writing.
    fn insert_batch(&self, active_models: Vec<O>) -> Result<(), BatchError> {
        // Log every failure once and wrap it in the writer-specific error variant.
        let fail = |error_msg: String| {
            log::error!("{}", error_msg);
            BatchError::ItemWriter(error_msg)
        };

        // `Handle::current()` would panic without a runtime context. A missing
        // runtime is a recoverable misconfiguration, so report it as an error.
        let handle = tokio::runtime::Handle::try_current().map_err(|runtime_err| {
            fail(format!(
                "Failed to insert batch to database: no Tokio runtime available: {}",
                runtime_err
            ))
        })?;

        // Use tokio's block_in_place to handle async operations in sync context
        // This is the same pattern used in the ORM reader
        let result =
            tokio::task::block_in_place(|| handle.block_on(self.insert_batch_async(active_models)));

        match result {
            Ok(_insert_result) => {
                log::debug!("Successfully inserted batch to database");
                Ok(())
            }
            Err(db_err) => Err(fail(format!(
                "Failed to insert batch to database: {}",
                db_err
            ))),
        }
    }
}

impl<O> ItemWriter<O> for OrmItemWriter<'_, O>
where
    O: ActiveModelTrait + Send,
{
    /// Inserts the given active models into the database as one batch.
    ///
    /// # Process Flow
    ///
    /// 1. Log how many items were received
    /// 2. Return early when the slice is empty, so no database call is made
    /// 3. Copy the items into an owned `Vec` and hand it to the batch insert
    /// 4. Log the number of written items on success
    ///
    /// # Parameters
    /// - `items`: A slice of ORM active models to write to the database
    ///
    /// # Returns
    /// - `Ok(())` if the slice is empty or the batch insert succeeded
    /// - `Err(BatchError)` if the batch insert failed
    ///
    /// # Error Handling
    ///
    /// Any failure reported by the batch insert is propagated unchanged; the
    /// insert helper reports its failures as `BatchError::ItemWriter` with a
    /// descriptive message.
    ///
    /// # Examples
    ///
    /// ```
    /// use spring_batch_rs::item::orm::{OrmItemWriter, OrmItemWriterBuilder};
    /// use spring_batch_rs::core::item::ItemWriter;
    /// use sea_orm::{Database, ActiveValue::Set};
    ///
    /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
    /// let db = Database::connect("sqlite::memory:").await?;
    ///
    /// // let writer: OrmItemWriter<user::ActiveModel> = OrmItemWriterBuilder::new()
    /// //     .connection(&db)
    /// //     .build();
    ///
    /// // Write ORM active models directly
    /// // let active_models = vec![
    /// //     user::ActiveModel {
    /// //         name: Set("Alice".to_string()),
    /// //         email: Set("alice@example.com".to_string()),
    /// //         ..Default::default()
    /// //     },
    /// //     user::ActiveModel {
    /// //         name: Set("Bob".to_string()),
    /// //         email: Set("bob@example.com".to_string()),
    /// //         ..Default::default()
    /// //     },
    /// // ];
    /// // writer.write(&active_models)?;
    /// # Ok(())
    /// # }
    /// ```
    fn write(&self, items: &[O]) -> ItemWriterResult {
        let item_count = items.len();
        log::debug!("Writing {} active models to database", item_count);

        // Nothing to insert: skip the database round trip entirely.
        if item_count == 0 {
            log::debug!("No items to write, skipping database operation");
            return Ok(());
        }

        // The insert consumes its input, so the borrowed slice is cloned into
        // an owned batch first.
        self.insert_batch(items.to_vec())?;

        log::info!(
            "Successfully wrote {} active models to database",
            item_count
        );
        Ok(())
    }

    /// Flushes any pending operations.
    ///
    /// The writer keeps no buffer of its own: `write` sends each batch to the
    /// database right away, so this only emits a debug log line.
    ///
    /// # Returns
    /// Always returns `Ok(())`
    fn flush(&self) -> ItemWriterResult {
        log::debug!("Flush called on ORM writer (no-op)");
        Ok(())
    }

    /// Opens the writer for writing.
    ///
    /// The connection is supplied ready-to-use at construction time, so there
    /// is nothing to initialise here; only a debug log line is emitted.
    ///
    /// # Returns
    /// Always returns `Ok(())`
    fn open(&self) -> ItemWriterResult {
        log::debug!("Opened ORM writer");
        Ok(())
    }

    /// Closes the writer.
    ///
    /// The writer merely borrows its connection and owns no other resources,
    /// so there is nothing to release; only a debug log line is emitted.
    ///
    /// # Returns
    /// Always returns `Ok(())`
    fn close(&self) -> ItemWriterResult {
        log::debug!("Closed ORM writer");
        Ok(())
    }
}

/// A builder for creating ORM item writers.
///
/// This builder allows you to configure an ORM item writer with the necessary
/// database connection. Since the writer now works directly with ORM active models,
/// no mapper configuration is required.
///
/// # Design Pattern
///
/// This struct implements the Builder pattern, which allows for fluent, chainable
/// configuration of an `OrmItemWriter` before creation. The simplified design
/// requires only a database connection and infers the Entity type from the
/// ActiveModel's associated type.
///
/// # Required Configuration
///
/// The following parameter is required and must be set before calling `build()`:
/// - **Connection**: Database connection reference
///
/// # Examples
///
/// ```
/// use spring_batch_rs::item::orm::OrmItemWriterBuilder;
/// use sea_orm::Database;
///
/// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
/// let db = Database::connect("sqlite::memory:").await?;
///
/// // Only need to specify the ActiveModel type - Entity is inferred!
/// // let builder: OrmItemWriterBuilder<product::ActiveModel> = OrmItemWriterBuilder::new()
/// //     .connection(&db);
/// # Ok(())
/// # }
/// ```
#[derive(Default)]
pub struct OrmItemWriterBuilder<'a, O>
where
    O: ActiveModelTrait + Send,
{
    /// Database connection - None until set by the user
    /// This will be validated as required during build()
    connection: Option<&'a DatabaseConnection>,
    /// Phantom data to track the active model type
    _phantom: PhantomData<O>,
}

impl<'a, O> OrmItemWriterBuilder<'a, O>
where
    O: ActiveModelTrait + Send,
{
    /// Creates an empty ORM item writer builder.
    ///
    /// No connection is configured yet; one must be supplied through
    /// `connection()` before `build()` is called.
    ///
    /// # Returns
    /// A new builder instance without a connection
    ///
    /// # Examples
    ///
    /// ```
    /// use spring_batch_rs::item::orm::OrmItemWriterBuilder;
    ///
    /// // Create a new builder (only need ActiveModel type!)
    /// // let builder = OrmItemWriterBuilder::<MyActiveModel>::new();
    /// ```
    pub fn new() -> Self {
        let connection = None;
        Self {
            connection,
            _phantom: PhantomData,
        }
    }

    /// Sets the database connection for the item writer.
    ///
    /// This parameter is **required**: `build()` panics when no connection
    /// has been set. Calling this method again replaces the previous value.
    ///
    /// # Parameters
    /// - `connection`: Reference to the ORM database connection
    ///
    /// # Returns
    /// The updated builder instance for method chaining
    ///
    /// # Connection Lifecycle
    ///
    /// The connection is only borrowed. The reference has to stay valid for
    /// as long as the builder, and the writer created from it, are alive.
    ///
    /// # Examples
    ///
    /// ```
    /// use spring_batch_rs::item::orm::OrmItemWriterBuilder;
    /// use sea_orm::Database;
    ///
    /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
    /// let db = Database::connect("sqlite::memory:").await?;
    ///
    /// // let builder: OrmItemWriterBuilder<product::ActiveModel> = OrmItemWriterBuilder::new()
    /// //     .connection(&db);
    /// # Ok(())
    /// # }
    /// ```
    pub fn connection(self, connection: &'a DatabaseConnection) -> Self {
        Self {
            connection: Some(connection),
            ..self
        }
    }

    /// Builds the ORM item writer with the configured parameters.
    ///
    /// Consumes the builder, checks that a connection was provided and wraps
    /// it in a new `OrmItemWriter`.
    ///
    /// # Returns
    /// A configured `OrmItemWriter` instance
    ///
    /// # Panics
    /// Panics with a descriptive message if `connection()` was never called.
    ///
    /// # Examples
    ///
    /// ```
    /// use spring_batch_rs::item::orm::OrmItemWriterBuilder;
    /// use sea_orm::Database;
    ///
    /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
    /// let db = Database::connect("sqlite::memory:").await?;
    ///
    /// // let writer: OrmItemWriter<product::ActiveModel> = OrmItemWriterBuilder::new()
    /// //     .connection(&db)
    /// //     .build();
    /// # Ok(())
    /// # }
    /// ```
    pub fn build(self) -> OrmItemWriter<'a, O> {
        let Self { connection, .. } = self;

        // A missing connection is a programming error in the builder chain.
        OrmItemWriter::new(
            connection.expect("Database connection is required. Call .connection() before .build()"),
        )
    }
}

// Unit tests for `OrmItemWriter` and `OrmItemWriterBuilder`.
//
// Database access is simulated with SeaORM's `MockDatabase`, so no real
// database is needed.
#[cfg(test)]
mod tests {
    use super::*;
    use sea_orm::{
        ActiveValue::{NotSet, Set},
        entity::prelude::*,
    };

    // Minimal entity used as a fixture for the tests below.
    // NOTE(review): the `ActiveModel` type referenced throughout this module is
    // not written out here; it is expected to come from `DeriveEntityModel`.
    #[derive(Clone, Debug, PartialEq, Eq, DeriveEntityModel)]
    #[sea_orm(table_name = "test_entity")]
    pub struct Model {
        #[sea_orm(primary_key)]
        pub id: i32,
        pub name: String,
    }

    // The fixture entity has no relations.
    #[derive(Copy, Clone, Debug, EnumIter, DeriveRelation)]
    pub enum Relation {}

    // Default behavior hooks are sufficient for the fixture.
    impl ActiveModelBehavior for ActiveModel {}

    #[test]
    fn test_simplified_trait_bounds_compilation() {
        // This test verifies that our simplified trait bounds compile correctly
        // with only one generic parameter (ActiveModel)
        // If this compiles, it means:
        // 1. ActiveModelTrait + Send is sufficient
        // 2. Entity type can be inferred from <A as ActiveModelTrait>::Entity

        // Test that we can specify trait bounds with just ActiveModel
        fn _verify_bounds<A>()
        where
            A: ActiveModelTrait + Send,
        {
            // This function will only compile if our trait bounds are sufficient
            // for ORM operations
        }

        // Verify that our actual types satisfy the bounds
        _verify_bounds::<ActiveModel>();

        // Test that we can create a builder with just ActiveModel
        let _builder = OrmItemWriterBuilder::<ActiveModel>::new();

        // A freshly created builder must not have a connection configured yet
        assert!(_builder.connection.is_none());
    }

    #[test]
    fn test_active_model_creation() {
        // Test that we can create active models that satisfy our trait bounds
        let active_model = ActiveModel {
            id: NotSet,
            name: Set("Test".to_owned()),
        };

        // Verify that ActiveModel implements the required traits
        fn check_traits<A>(_: A)
        where
            A: ActiveModelTrait + Send,
        {
            // This function will only compile if A satisfies our trait bounds
        }

        check_traits(active_model);
    }

    // Runs as a plain (non-async) test: the empty-slice path of `write` returns
    // before any async bridging takes place, so no Tokio runtime is required.
    #[test]
    fn test_write_empty_slice_skips_database_operation() {
        use sea_orm::{DatabaseBackend, MockDatabase};

        // No mock results are queued: the writer must not touch the database
        let db = MockDatabase::new(DatabaseBackend::Sqlite).into_connection();
        let writer = OrmItemWriter::<ActiveModel>::new(&db);

        assert!(writer.open().is_ok());
        assert!(writer.flush().is_ok());
        assert!(writer.write(&[]).is_ok());
        assert!(writer.close().is_ok());
    }

    #[test]
    fn should_build_writer_via_builder_with_connection() {
        use sea_orm::{DatabaseBackend, MockDatabase};

        let db = MockDatabase::new(DatabaseBackend::Sqlite).into_connection();
        let writer = OrmItemWriterBuilder::<ActiveModel>::new()
            .connection(&db)
            .build();

        // writer is created successfully — verify open/flush/close are no-ops
        assert!(writer.open().is_ok());
        assert!(writer.flush().is_ok());
        assert!(writer.close().is_ok());
    }

    // The multi-threaded flavor is needed because `write` bridges into async
    // code through `tokio::task::block_in_place`.
    #[tokio::test(flavor = "multi_thread")]
    async fn should_write_active_models_to_mock_database() {
        use sea_orm::{DatabaseBackend, MockDatabase, MockExecResult};

        // Queue one successful exec result for the single insert statement
        let db = MockDatabase::new(DatabaseBackend::Sqlite)
            .append_exec_results([MockExecResult {
                last_insert_id: 1,
                rows_affected: 1,
            }])
            .into_connection();

        let writer = OrmItemWriter::<ActiveModel>::new(&db);
        let items = vec![ActiveModel {
            id: NotSet,
            name: Set("Alice".to_owned()),
        }];

        let result = writer.write(&items);
        assert!(
            result.is_ok(),
            "write should succeed with mock DB: {result:?}"
        );
    }

    // The multi-threaded flavor is needed because `write` bridges into async
    // code through `tokio::task::block_in_place`.
    #[tokio::test(flavor = "multi_thread")]
    async fn should_return_error_when_database_insert_fails() {
        use crate::BatchError;
        use sea_orm::{DatabaseBackend, DbErr, MockDatabase};

        // Queue an explicit exec error so the insert issued by `write` fails
        let db = MockDatabase::new(DatabaseBackend::Sqlite)
            .append_exec_errors([DbErr::Custom("insert failed".to_owned())])
            .into_connection();

        let writer = OrmItemWriter::<ActiveModel>::new(&db);
        let items = vec![ActiveModel {
            id: NotSet,
            name: Set("Fail".to_owned()),
        }];

        let result = writer.write(&items);
        assert!(
            result.is_err(),
            "write should fail when database returns error"
        );
        // The database error must surface as the writer-specific error variant
        match result {
            Err(BatchError::ItemWriter(msg)) => {
                assert!(msg.contains("Failed to insert"), "unexpected error: {msg}")
            }
            other => panic!("expected ItemWriter error, got {other:?}"),
        }
    }
}