Skip to main content

spring_batch_rs/core/
step.rs

1//! # Step Module
2//!
3//! This module provides the core step execution functionality for the Spring Batch framework.
4//! A step represents a single phase of a batch job that processes data in chunks or executes
5//! a single task (tasklet).
6//!
7//! ## Overview
8//!
9//! The step module supports two main execution patterns:
10//!
11//! ### Chunk-Oriented Processing
12//! Processes data in configurable chunks using the read-process-write pattern:
13//! - **Reader**: Reads items from a data source
14//! - **Processor**: Transforms items (optional)
15//! - **Writer**: Writes processed items to a destination
16//!
17//! ### Tasklet Processing
18//! Executes a single task or operation that doesn't follow the chunk pattern.
19//!
20//! ## Key Features
21//!
22//! - **Error Handling**: Configurable skip limits for fault tolerance
23//! - **Metrics Tracking**: Comprehensive execution statistics
24//! - **Lifecycle Management**: Proper resource management with open/close operations
25//! - **Builder Pattern**: Fluent API for step configuration
26//!
27//! ## Examples
28//!
29//! ### Basic Chunk-Oriented Step
30//!
31//! ```rust
32//! use spring_batch_rs::core::step::{StepBuilder, StepExecution, Step};
33//! use spring_batch_rs::core::item::{ItemReader, ItemProcessor, ItemWriter};
34//! use spring_batch_rs::BatchError;
35//!
36//! // Implement your reader, processor, and writer
37//! # struct MyReader;
38//! # impl ItemReader<String> for MyReader {
39//! #     fn read(&self) -> Result<Option<String>, BatchError> { Ok(None) }
40//! # }
41//! # struct MyProcessor;
42//! # impl ItemProcessor<String, String> for MyProcessor {
43//! #     fn process(&self, item: &String) -> Result<String, BatchError> { Ok(item.clone()) }
44//! # }
45//! # struct MyWriter;
46//! # impl ItemWriter<String> for MyWriter {
47//! #     fn write(&self, items: &[String]) -> Result<(), BatchError> { Ok(()) }
48//! #     fn flush(&self) -> Result<(), BatchError> { Ok(()) }
49//! #     fn open(&self) -> Result<(), BatchError> { Ok(()) }
50//! #     fn close(&self) -> Result<(), BatchError> { Ok(()) }
51//! # }
52//!
53//! let reader = MyReader;
54//! let processor = MyProcessor;
55//! let writer = MyWriter;
56//!
57//! let step = StepBuilder::new("my-step")
58//!     .chunk(100)                    // Process 100 items per chunk
59//!     .reader(&reader)
60//!     .processor(&processor)
61//!     .writer(&writer)
62//!     .skip_limit(10)               // Allow up to 10 errors
63//!     .build();
64//!
65//! let mut step_execution = StepExecution::new(step.get_name());
66//! let result = step.execute(&mut step_execution);
67//! ```
68//!
69//! ### Tasklet Step
70//!
71//! ```rust
72//! use spring_batch_rs::core::step::{StepBuilder, StepExecution, RepeatStatus, Step, Tasklet};
73//! use spring_batch_rs::BatchError;
74//!
75//! # struct MyTasklet;
76//! # impl Tasklet for MyTasklet {
77//! #     fn execute(&self, _step_execution: &StepExecution) -> Result<RepeatStatus, BatchError> {
78//! #         Ok(RepeatStatus::Finished)
79//! #     }
80//! # }
81//!
82//! let tasklet = MyTasklet;
83//!
84//! let step = StepBuilder::new("my-tasklet-step")
85//!     .tasklet(&tasklet)
86//!     .build();
87//!
88//! let mut step_execution = StepExecution::new(step.get_name());
89//! let result = step.execute(&mut step_execution);
90//! ```
91
92use crate::BatchError;
93use log::{debug, error, info, warn};
94use std::time::{Duration, Instant};
95use uuid::Uuid;
96
97use super::item::{ItemProcessor, ItemReader, ItemWriter};
98
99/// A tasklet represents a single task or operation that can be executed as part of a step.
100///
101/// Tasklets are useful for operations that don't fit the chunk-oriented processing model,
102/// such as file operations, database maintenance, or custom business logic.
103///
104/// # Examples
105///
106/// ```rust
107/// use spring_batch_rs::core::step::{StepExecution, RepeatStatus};
108/// use spring_batch_rs::BatchError;
109///
110/// use spring_batch_rs::core::step::Tasklet;
111///
112/// struct FileCleanupTasklet {
113///     directory: String,
114/// }
115///
116/// impl Tasklet for FileCleanupTasklet {
117///     fn execute(&self, _step_execution: &StepExecution) -> Result<RepeatStatus, BatchError> {
118///         // Perform file cleanup logic here
119///         println!("Cleaning up directory: {}", self.directory);
120///         Ok(RepeatStatus::Finished)
121///     }
122/// }
123/// ```
124pub trait Tasklet {
125    /// Executes the tasklet operation.
126    ///
127    /// # Parameters
128    /// - `step_execution`: The current step execution context for accessing metrics and state
129    ///
130    /// # Returns
131    /// - `Ok(RepeatStatus)`: The tasklet completed successfully
132    /// - `Err(BatchError)`: An error occurred during execution
133    fn execute(&self, step_execution: &StepExecution) -> Result<RepeatStatus, BatchError>;
134}
135
136/// A step implementation that executes a single tasklet.
137///
138/// TaskletStep is used for operations that don't follow the chunk-oriented processing pattern.
139/// It executes a single tasklet and manages the step lifecycle.
140///
141/// # Examples
142///
143/// ```rust
144/// use spring_batch_rs::core::step::{StepBuilder, StepExecution, RepeatStatus, Tasklet};
145/// use spring_batch_rs::BatchError;
146///
147/// # struct MyTasklet;
148/// # impl Tasklet for MyTasklet {
149/// #     fn execute(&self, _step_execution: &StepExecution) -> Result<RepeatStatus, BatchError> {
150/// #         Ok(RepeatStatus::Finished)
151/// #     }
152/// # }
153/// let tasklet = MyTasklet;
154/// let step = StepBuilder::new("tasklet-step")
155///     .tasklet(&tasklet)
156///     .build();
157/// ```
158pub struct TaskletStep<'a> {
159    name: String,
160    tasklet: &'a dyn Tasklet,
161}
162
163impl Step for TaskletStep<'_> {
164    fn execute(&self, step_execution: &mut StepExecution) -> Result<(), BatchError> {
165        step_execution.status = StepStatus::Started;
166        let start_time = Instant::now();
167
168        info!(
169            "Start of step: {}, id: {}",
170            step_execution.name, step_execution.id
171        );
172
173        loop {
174            let result = self.tasklet.execute(step_execution);
175            match result {
176                Ok(RepeatStatus::Continuable) => {}
177                Ok(RepeatStatus::Finished) => {
178                    step_execution.status = StepStatus::Success;
179                    break;
180                }
181                Err(e) => {
182                    error!(
183                        "Error in step: {}, id: {}, error: {}",
184                        step_execution.name, step_execution.id, e
185                    );
186                    step_execution.status = StepStatus::Failed;
187                    step_execution.end_time = Some(Instant::now());
188                    step_execution.duration = Some(start_time.elapsed());
189                    return Err(e);
190                }
191            }
192        }
193
194        // Calculate the step execution details
195        step_execution.start_time = Some(start_time);
196        step_execution.end_time = Some(Instant::now());
197        step_execution.duration = Some(start_time.elapsed());
198
199        Ok(())
200    }
201
202    fn get_name(&self) -> &str {
203        &self.name
204    }
205}
206
207/// Builder for creating TaskletStep instances.
208///
209/// Provides a fluent API for configuring tasklet steps with validation
210/// to ensure all required components are provided.
211///
212/// # Examples
213///
214/// ```rust
215/// use spring_batch_rs::core::step::{TaskletBuilder, Tasklet, RepeatStatus, StepExecution};
216/// use spring_batch_rs::BatchError;
217///
218/// # struct MyTasklet;
219/// # impl Tasklet for MyTasklet {
220/// #     fn execute(&self, _step_execution: &StepExecution) -> Result<RepeatStatus, BatchError> {
221/// #         Ok(RepeatStatus::Finished)
222/// #     }
223/// # }
224///
225/// let tasklet = MyTasklet;
226/// let builder = TaskletBuilder::new("my-tasklet")
227///     .tasklet(&tasklet);
228/// let step = builder.build();
229/// ```
230pub struct TaskletBuilder<'a> {
231    name: String,
232    tasklet: Option<&'a dyn Tasklet>,
233}
234
235impl<'a> TaskletBuilder<'a> {
236    /// Creates a new TaskletBuilder with the specified name.
237    ///
238    /// # Parameters
239    /// - `name`: Human-readable name for the step
240    ///
241    /// # Examples
242    ///
243    /// ```rust
244    /// use spring_batch_rs::core::step::TaskletBuilder;
245    ///
246    /// let builder = TaskletBuilder::new("file-cleanup-step");
247    /// ```
248    pub fn new(name: &str) -> Self {
249        Self {
250            name: name.to_string(),
251            tasklet: None,
252        }
253    }
254
255    /// Sets the tasklet to be executed by this step.
256    ///
257    /// # Parameters
258    /// - `tasklet`: The tasklet implementation to execute
259    ///
260    /// # Examples
261    ///
262    /// ```rust
263    /// use spring_batch_rs::core::step::{TaskletBuilder, Tasklet, RepeatStatus, StepExecution};
264    /// use spring_batch_rs::BatchError;
265    ///
266    /// # struct MyTasklet;
267    /// # impl Tasklet for MyTasklet {
268    /// #     fn execute(&self, _step_execution: &StepExecution) -> Result<RepeatStatus, BatchError> {
269    /// #         Ok(RepeatStatus::Finished)
270    /// #     }
271    /// # }
272    ///
273    /// let tasklet = MyTasklet;
274    /// let builder = TaskletBuilder::new("my-step")
275    ///     .tasklet(&tasklet);
276    /// ```
277    pub fn tasklet(mut self, tasklet: &'a dyn Tasklet) -> Self {
278        self.tasklet = Some(tasklet);
279        self
280    }
281
282    /// Builds the TaskletStep instance.
283    ///
284    /// # Panics
285    /// Panics if no tasklet has been set using the `tasklet()` method.
286    ///
287    /// # Examples
288    ///
289    /// ```rust
290    /// use spring_batch_rs::core::step::{TaskletBuilder, Tasklet, RepeatStatus, StepExecution};
291    /// use spring_batch_rs::BatchError;
292    ///
293    /// # struct MyTasklet;
294    /// # impl Tasklet for MyTasklet {
295    /// #     fn execute(&self, _step_execution: &StepExecution) -> Result<RepeatStatus, BatchError> {
296    /// #         Ok(RepeatStatus::Finished)
297    /// #     }
298    /// # }
299    ///
300    /// let tasklet = MyTasklet;
301    /// let step = TaskletBuilder::new("my-step")
302    ///     .tasklet(&tasklet)
303    ///     .build();
304    /// ```
305    pub fn build(self) -> TaskletStep<'a> {
306        TaskletStep {
307            name: self.name,
308            tasklet: self
309                .tasklet
310                .expect("Tasklet is required for building a step"),
311        }
312    }
313}
314
315/// Represents the execution context and metrics for a step.
316///
317/// StepExecution tracks all relevant information about a step's execution,
318/// including timing, item counts, error counts, and current status.
319///
320/// # Examples
321///
322/// ```rust
323/// use spring_batch_rs::core::step::{StepExecution, StepStatus};
324///
325/// let mut step_execution = StepExecution::new("data-processing-step");
326/// assert_eq!(step_execution.status, StepStatus::Starting);
327/// assert_eq!(step_execution.read_count, 0);
328/// assert_eq!(step_execution.write_count, 0);
329/// ```
330#[derive(Clone)]
331pub struct StepExecution {
332    /// Unique identifier for this step instance
333    pub id: Uuid,
334    /// Human-readable name for the step
335    pub name: String,
336    /// Current status of the step execution
337    pub status: StepStatus,
338    /// Timestamp when the step started execution
339    pub start_time: Option<Instant>,
340    /// Timestamp when the step completed execution
341    pub end_time: Option<Instant>,
342    /// Total duration of step execution
343    pub duration: Option<Duration>,
344    /// Number of items successfully read from the source
345    pub read_count: usize,
346    /// Number of items successfully written to the destination
347    pub write_count: usize,
348    /// Number of errors encountered during reading
349    pub read_error_count: usize,
350    /// Number of items successfully processed
351    pub process_count: usize,
352    /// Number of errors encountered during processing
353    pub process_error_count: usize,
354    /// Number of errors encountered during writing
355    pub write_error_count: usize,
356}
357
358impl StepExecution {
359    /// Creates a new StepExecution with the specified name.
360    ///
361    /// Initializes all counters to zero and sets the status to `Starting`.
362    /// A unique UUID is generated for this execution instance.
363    ///
364    /// # Parameters
365    /// - `name`: Human-readable name for the step
366    ///
367    /// # Examples
368    ///
369    /// ```rust
370    /// use spring_batch_rs::core::step::{StepExecution, StepStatus};
371    ///
372    /// let step_execution = StepExecution::new("my-step");
373    /// assert_eq!(step_execution.name, "my-step");
374    /// assert_eq!(step_execution.status, StepStatus::Starting);
375    /// assert!(!step_execution.id.is_nil());
376    /// ```
377    pub fn new(name: &str) -> Self {
378        Self {
379            id: Uuid::new_v4(),
380            name: name.to_string(),
381            status: StepStatus::Starting,
382            start_time: None,
383            end_time: None,
384            duration: None,
385            read_count: 0,
386            write_count: 0,
387            read_error_count: 0,
388            process_count: 0,
389            process_error_count: 0,
390            write_error_count: 0,
391        }
392    }
393}
394
395/// Represents the overall status of a batch job.
396///
397/// This enum defines all possible states that a batch job can be in
398/// during its lifecycle, from initialization to completion.
399///
400/// # Examples
401///
402/// ```rust
403/// use spring_batch_rs::core::step::BatchStatus;
404///
405/// let status = BatchStatus::COMPLETED;
406/// match status {
407///     BatchStatus::COMPLETED => println!("Job finished successfully"),
408///     BatchStatus::FAILED => println!("Job failed"),
409///     _ => println!("Job in progress or other state"),
410/// }
411/// ```
412pub enum BatchStatus {
413    /// The batch job has successfully completed its execution.
414    COMPLETED,
415    /// Status of a batch job prior to its execution.
416    STARTING,
417    /// Status of a batch job that is running.
418    STARTED,
419    /// Status of batch job waiting for a step to complete before stopping the batch job.
420    STOPPING,
421    /// Status of a batch job that has been stopped by request.
422    STOPPED,
423    /// Status of a batch job that has failed during its execution.
424    FAILED,
425    /// Status of a batch job that did not stop properly and can not be restarted.
426    ABANDONED,
427    /// Status of a batch job that is in an uncertain state.
428    UNKNOWN,
429}
430
431/// Core trait that defines the contract for step execution.
432///
433/// All step implementations must provide execution logic and a name.
434/// The step is responsible for coordinating the processing of data
435/// and managing its own lifecycle.
436///
437/// # Examples
438///
439/// ```rust
440/// use spring_batch_rs::core::step::{Step, StepExecution};
441/// use spring_batch_rs::BatchError;
442///
443/// struct CustomStep {
444///     name: String,
445/// }
446///
447/// impl Step for CustomStep {
448///     fn execute(&self, step_execution: &mut StepExecution) -> Result<(), BatchError> {
449///         // Custom step logic here
450///         Ok(())
451///     }
452///
453///     fn get_name(&self) -> &str {
454///         &self.name
455///     }
456/// }
457/// ```
458pub trait Step {
459    /// Executes the step.
460    ///
461    /// This method represents the main operation of the step. It coordinates
462    /// reading items, processing them, and writing them out for chunk-oriented
463    /// steps, or executes a single task for tasklet steps.
464    ///
465    /// # Parameters
466    /// - `step_execution`: Mutable reference to track execution state and metrics
467    ///
468    /// # Returns
469    /// - `Ok(())`: The step completed successfully
470    /// - `Err(BatchError)`: The step failed due to an error
471    ///
472    /// # Examples
473    ///
474    /// ```rust
475    /// use spring_batch_rs::core::step::{Step, StepExecution, StepStatus};
476    /// use spring_batch_rs::BatchError;
477    ///
478    /// # struct MyStep { name: String }
479    /// # impl Step for MyStep {
480    /// #     fn execute(&self, step_execution: &mut StepExecution) -> Result<(), BatchError> {
481    /// #         step_execution.status = StepStatus::Success;
482    /// #         Ok(())
483    /// #     }
484    /// #     fn get_name(&self) -> &str { &self.name }
485    /// # }
486    /// let step = MyStep { name: "test".to_string() };
487    /// let mut execution = StepExecution::new(step.get_name());
488    /// let result = step.execute(&mut execution);
489    /// assert!(result.is_ok());
490    /// ```
491    fn execute(&self, step_execution: &mut StepExecution) -> Result<(), BatchError>;
492
493    /// Returns the name of this step.
494    ///
495    /// # Examples
496    ///
497    /// ```rust
498    /// # use spring_batch_rs::core::step::{Step, StepExecution};
499    /// # use spring_batch_rs::BatchError;
500    /// # struct MyStep { name: String }
501    /// # impl Step for MyStep {
502    /// #     fn execute(&self, _step_execution: &mut StepExecution) -> Result<(), BatchError> { Ok(()) }
503    /// #     fn get_name(&self) -> &str { &self.name }
504    /// # }
505    /// let step = MyStep { name: "data-processing".to_string() };
506    /// assert_eq!(step.get_name(), "data-processing");
507    /// ```
508    fn get_name(&self) -> &str;
509}
510
511/// Indicates whether a tasklet should continue executing or has finished.
512///
513/// This enum is returned by tasklet implementations to control
514/// the execution flow and indicate completion status.
515///
516/// # Examples
517///
518/// ```rust
519/// use spring_batch_rs::core::step::RepeatStatus;
520///
521/// let status = RepeatStatus::Finished;
522/// match status {
523///     RepeatStatus::Continuable => println!("Tasklet can continue"),
524///     RepeatStatus::Finished => println!("Tasklet has completed"),
525/// }
526/// ```
527#[derive(Debug, PartialEq)]
528pub enum RepeatStatus {
529    /// The tasklet can continue to execute.
530    ///
531    /// This indicates that the tasklet has more work to do and should
532    /// be called again in the next execution cycle.
533    Continuable,
534    /// The tasklet has finished executing.
535    ///
536    /// This indicates that the tasklet has completed all its work
537    /// and should not be executed again.
538    Finished,
539}
540
541/// A step implementation that processes items in chunks.
542///
543/// ChunkOrientedStep reads items from a reader, processes them through a processor,
544/// and writes them using a writer. It handles errors gracefully with configurable
545/// skip limits and provides comprehensive metrics tracking.
546///
547/// # Examples
548///
549/// ## Chunk-Oriented Processing
550///
551/// ```rust
552/// use spring_batch_rs::core::step::{StepBuilder, StepExecution, Step};
553/// use spring_batch_rs::core::item::{ItemReader, ItemProcessor, ItemWriter, PassThroughProcessor};
554/// use spring_batch_rs::BatchError;
555///
556/// # struct MyReader;
557/// # impl ItemReader<String> for MyReader {
558/// #     fn read(&self) -> Result<Option<String>, BatchError> { Ok(None) }
559/// # }
560/// # struct MyProcessor;
561/// # impl ItemProcessor<String, String> for MyProcessor {
562/// #     fn process(&self, item: &String) -> Result<String, BatchError> { Ok(item.clone()) }
563/// # }
564/// # struct MyWriter;
565/// # impl ItemWriter<String> for MyWriter {
566/// #     fn write(&self, items: &[String]) -> Result<(), BatchError> { Ok(()) }
567/// #     fn flush(&self) -> Result<(), BatchError> { Ok(()) }
568/// #     fn open(&self) -> Result<(), BatchError> { Ok(()) }
569/// #     fn close(&self) -> Result<(), BatchError> { Ok(()) }
570/// # }
571///
572/// let reader = MyReader;
573/// let processor = PassThroughProcessor::<String>::new();
574/// let writer = MyWriter;
575///
576/// let step = StepBuilder::new("my-step")
577///     .chunk::<String, String>(100)                    // Process 100 items per chunk
578///     .reader(&reader)
579///     .processor(&processor)
580///     .writer(&writer)
581///     .skip_limit(10)               // Allow up to 10 errors
582///     .build();
583///
584/// let mut step_execution = StepExecution::new(step.get_name());
585/// let result = step.execute(&mut step_execution);
586/// ```
587pub struct ChunkOrientedStep<'a, I, O> {
588    name: String,
589    /// Component responsible for reading items from the source
590    reader: &'a dyn ItemReader<I>,
591    /// Component responsible for processing items
592    processor: &'a dyn ItemProcessor<I, O>,
593    /// Component responsible for writing items to the destination
594    writer: &'a dyn ItemWriter<O>,
595    /// Number of items to process in each chunk
596    chunk_size: u16,
597    /// Maximum number of errors allowed before failing the step
598    skip_limit: u16,
599}
600
601impl<I, O> Step for ChunkOrientedStep<'_, I, O> {
602    fn execute(&self, step_execution: &mut StepExecution) -> Result<(), BatchError> {
603        // Start the timer and logging
604        let start_time = Instant::now();
605        info!(
606            "Start of step: {}, id: {}",
607            step_execution.name, step_execution.id
608        );
609
610        // Open the writer and handle any errors
611        Self::manage_error(self.writer.open());
612
613        // Main processing loop
614        loop {
615            // Read chunk
616            let (read_items, chunk_status) = match self.read_chunk(step_execution) {
617                Ok(chunk_data) => chunk_data,
618                Err(_) => {
619                    step_execution.status = StepStatus::ReadError;
620                    break;
621                }
622            };
623
624            // If no items to process, we're done
625            if read_items.is_empty() {
626                step_execution.status = StepStatus::Success;
627                break;
628            }
629
630            // Process and write the chunk
631            if self
632                .process_and_write_chunk(step_execution, &read_items)
633                .is_err()
634            {
635                break; // Status already set in the method
636            }
637
638            // Check if we've reached the end
639            if chunk_status == ChunkStatus::Finished {
640                step_execution.status = StepStatus::Success;
641                break;
642            }
643        }
644
645        // Close the writer and handle any errors
646        Self::manage_error(self.writer.close());
647
648        // Log the end of the step
649        info!(
650            "End of step: {}, id: {}",
651            step_execution.name, step_execution.id
652        );
653
654        // Calculate the step execution details
655        step_execution.start_time = Some(start_time);
656        step_execution.end_time = Some(Instant::now());
657        step_execution.duration = Some(start_time.elapsed());
658
659        // Return the step execution details if the step is successful,
660        // or an error if the step failed
661        if StepStatus::Success == step_execution.status {
662            Ok(())
663        } else {
664            Err(BatchError::Step(step_execution.name.clone()))
665        }
666    }
667
668    fn get_name(&self) -> &str {
669        &self.name
670    }
671}
672
673impl<I, O> ChunkOrientedStep<'_, I, O> {
674    /// Processes a chunk of items and writes them.
675    ///
676    /// This method combines the processing and writing operations for a chunk,
677    /// handling errors appropriately and updating the step execution status.
678    ///
679    /// # Parameters
680    /// - `step_execution`: Mutable reference to track execution state
681    /// - `read_items`: Slice of items to process and write
682    ///
683    /// # Returns
684    /// - `Ok(())`: The chunk was processed and written successfully
685    /// - `Err(BatchError)`: An error occurred during processing or writing
686    fn process_and_write_chunk(
687        &self,
688        step_execution: &mut StepExecution,
689        read_items: &[I],
690    ) -> Result<(), BatchError> {
691        // Process the chunk
692        let processed_items = match self.process_chunk(step_execution, read_items) {
693            Ok(items) => items,
694            Err(error) => {
695                step_execution.status = StepStatus::ProcessorError;
696                return Err(error);
697            }
698        };
699
700        // Write the processed items
701        match self.write_chunk(step_execution, &processed_items) {
702            Ok(()) => Ok(()),
703            Err(error) => {
704                step_execution.status = StepStatus::WriteError;
705                Err(error)
706            }
707        }
708    }
709
710    /// Reads a chunk of items from the reader.
711    ///
712    /// This method attempts to read up to `chunk_size` items from the reader.
713    /// It stops when either:
714    /// - The chunk is full (reached `chunk_size` items)
715    /// - There are no more items to read
716    /// - The error skip limit is reached
717    ///
718    /// # Parameters
719    /// - `read_items`: Vector to store the read items
720    ///
721    /// # Returns
722    /// - `Ok(ChunkStatus::Full)`: The chunk is full with `chunk_size` items
723    /// - `Ok(ChunkStatus::Finished)`: There are no more items to read
724    /// - `Err(BatchError)`: An error occurred and skip limit was reached
725    fn read_chunk(
726        &self,
727        step_execution: &mut StepExecution,
728    ) -> Result<(Vec<I>, ChunkStatus), BatchError> {
729        debug!("Start reading chunk");
730
731        let mut read_items = Vec::with_capacity(self.chunk_size as usize);
732
733        loop {
734            let read_result = self.reader.read();
735
736            match read_result {
737                Ok(item) => {
738                    match item {
739                        Some(item) => {
740                            read_items.push(item);
741                            step_execution.read_count += 1;
742
743                            if read_items.len() >= self.chunk_size as usize {
744                                return Ok((read_items, ChunkStatus::Full));
745                            }
746                        }
747                        None => {
748                            if read_items.is_empty() {
749                                return Ok((read_items, ChunkStatus::Finished));
750                            } else {
751                                return Ok((read_items, ChunkStatus::Full));
752                            }
753                        }
754                    };
755                }
756                Err(error) => {
757                    warn!("Error reading item: {}", error);
758                    step_execution.read_error_count += 1;
759
760                    if self.is_skip_limit_reached(step_execution) {
761                        // Set the status to ReadError when we hit the limit
762                        step_execution.status = StepStatus::ReadError;
763                        return Err(error);
764                    }
765                }
766            }
767        }
768    }
769
770    /// Processes a chunk of items using the processor.
771    ///
772    /// This method applies the processor to each item in the input chunk.
773    /// It collects the successfully processed items and tracks any errors.
774    ///
775    /// # Parameters
776    /// - `read_items`: Vector of items to process
777    ///
778    /// # Returns
779    /// - `Ok(Vec<W>)`: Vector of successfully processed items
780    /// - `Err(BatchError)`: An error occurred and skip limit was reached
781    fn process_chunk(
782        &self,
783        step_execution: &mut StepExecution,
784        read_items: &[I],
785    ) -> Result<Vec<O>, BatchError> {
786        debug!("Processing chunk of {} items", read_items.len());
787        let mut result = Vec::with_capacity(read_items.len());
788
789        for item in read_items {
790            match self.processor.process(item) {
791                Ok(processed_item) => {
792                    result.push(processed_item);
793                    step_execution.process_count += 1;
794                }
795                Err(error) => {
796                    warn!("Error processing item: {}", error);
797                    step_execution.process_error_count += 1;
798
799                    if self.is_skip_limit_reached(step_execution) {
800                        // Set the status to ProcessorError when we hit the limit
801                        step_execution.status = StepStatus::ProcessorError;
802                        return Err(error);
803                    }
804                }
805            }
806        }
807
808        Ok(result)
809    }
810
811    /// Writes a chunk of processed items using the writer.
812    ///
813    /// This method writes the processed items to the destination
814    /// and handles any errors that occur.
815    ///
816    /// # Parameters
817    /// - `processed_items`: Vector of items to write
818    ///
819    /// # Returns
820    /// - `Ok(())`: All items were written successfully
821    /// - `Err(BatchError)`: An error occurred and skip limit was reached
822    fn write_chunk(
823        &self,
824        step_execution: &mut StepExecution,
825        processed_items: &[O],
826    ) -> Result<(), BatchError> {
827        debug!("Writing chunk of {} items", processed_items.len());
828
829        if processed_items.is_empty() {
830            debug!("No items to write, skipping write call");
831            return Ok(());
832        }
833
834        match self.writer.write(processed_items) {
835            Ok(()) => {
836                step_execution.write_count += processed_items.len();
837                Self::manage_error(self.writer.flush());
838                Ok(())
839            }
840            Err(error) => {
841                warn!("Error writing items: {}", error);
842                step_execution.write_error_count += processed_items.len();
843
844                if self.is_skip_limit_reached(step_execution) {
845                    // Set the status to WriteError to indicate a write failure
846                    step_execution.status = StepStatus::WriteError;
847                    return Err(error);
848                }
849                Ok(())
850            }
851        }
852    }
853
854    fn is_skip_limit_reached(&self, step_execution: &StepExecution) -> bool {
855        step_execution.read_error_count
856            + step_execution.write_error_count
857            + step_execution.process_error_count
858            > self.skip_limit.into()
859    }
860    /// Helper method to handle errors gracefully.
861    ///
862    /// This method is used to handle errors from operations where we want
863    /// to log the error but not fail the step.
864    ///
865    /// # Parameters
866    /// - `result`: Result to check for errors
867    fn manage_error(result: Result<(), BatchError>) {
868        if let Err(error) = result {
869            warn!("Non-fatal error: {}", error);
870        }
871    }
872}
873
874/// Builder for creating ChunkOrientedStep instances.
875///
876/// Provides a fluent API for configuring chunk-oriented steps with validation
877/// to ensure all required components (reader, processor, writer) are provided.
878///
879/// # Type Parameters
880/// - `I`: The input item type (what the reader produces)
881/// - `O`: The output item type (what the processor produces and writer consumes)
882///
883/// # Examples
884///
885/// ```rust
886/// use spring_batch_rs::core::step::ChunkOrientedStepBuilder;
887/// use spring_batch_rs::core::item::{ItemReader, ItemProcessor, ItemWriter};
888/// use spring_batch_rs::BatchError;
889///
890/// # struct MyReader;
891/// # impl ItemReader<i32> for MyReader {
892/// #     fn read(&self) -> Result<Option<i32>, BatchError> { Ok(None) }
893/// # }
894/// # struct MyProcessor;
895/// # impl ItemProcessor<i32, String> for MyProcessor {
896/// #     fn process(&self, item: &i32) -> Result<String, BatchError> { Ok(item.to_string()) }
897/// # }
898/// # struct MyWriter;
899/// # impl ItemWriter<String> for MyWriter {
900/// #     fn write(&self, items: &[String]) -> Result<(), BatchError> { Ok(()) }
901/// #     fn flush(&self) -> Result<(), BatchError> { Ok(()) }
902/// #     fn open(&self) -> Result<(), BatchError> { Ok(()) }
903/// #     fn close(&self) -> Result<(), BatchError> { Ok(()) }
904/// # }
905/// let reader = MyReader;
906/// let processor = MyProcessor;
907/// let writer = MyWriter;
908///
909/// let step = ChunkOrientedStepBuilder::new("number-to-string")
910///     .reader(&reader)
911///     .processor(&processor)
912///     .writer(&writer)
913///     .chunk_size(500)
914///     .skip_limit(25)
915///     .build();
916/// ```
917pub struct ChunkOrientedStepBuilder<'a, I, O> {
918    /// Name for the step
919    name: String,
920    /// Component responsible for reading items from the source
921    reader: Option<&'a dyn ItemReader<I>>,
922    /// Component responsible for processing items
923    processor: Option<&'a dyn ItemProcessor<I, O>>,
924    /// Component responsible for writing items to the destination
925    writer: Option<&'a dyn ItemWriter<O>>,
926    /// Number of items to process in each chunk
927    chunk_size: u16,
928    /// Maximum number of errors allowed before failing the step
929    skip_limit: u16,
930}
931
932impl<'a, I, O> ChunkOrientedStepBuilder<'a, I, O> {
933    /// Creates a new ChunkOrientedStepBuilder with the specified name.
934    ///
935    /// Sets default values:
936    /// - `chunk_size`: 10
937    /// - `skip_limit`: 0 (no error tolerance)
938    ///
939    /// # Parameters
940    /// - `name`: Human-readable name for the step
941    ///
942    /// # Examples
943    ///
944    /// ```rust
945    /// use spring_batch_rs::core::step::ChunkOrientedStepBuilder;
946    ///
947    /// let builder = ChunkOrientedStepBuilder::<String, String>::new("data-migration");
948    /// ```
949    pub fn new(name: &str) -> Self {
950        Self {
951            name: name.to_string(),
952            reader: None,
953            processor: None,
954            writer: None,
955            chunk_size: 10,
956            skip_limit: 0,
957        }
958    }
959
960    /// Sets the item reader for this step.
961    ///
962    /// The reader is responsible for providing items to be processed.
963    /// This is a required component for chunk-oriented steps.
964    ///
965    /// # Parameters
966    /// - `reader`: Implementation of ItemReader that produces items of type `I`
967    ///
968    /// # Examples
969    ///
970    /// ```rust
971    /// # use spring_batch_rs::core::step::ChunkOrientedStepBuilder;
972    /// # use spring_batch_rs::core::item::ItemReader;
973    /// # use spring_batch_rs::BatchError;
974    /// # struct FileReader;
975    /// # impl ItemReader<String> for FileReader {
976    /// #     fn read(&self) -> Result<Option<String>, BatchError> { Ok(None) }
977    /// # }
978    /// let reader = FileReader;
979    /// let builder = ChunkOrientedStepBuilder::<String, String>::new("file-processing")
980    ///     .reader(&reader);
981    /// ```
982    pub fn reader(mut self, reader: &'a dyn ItemReader<I>) -> Self {
983        self.reader = Some(reader);
984        self
985    }
986
987    /// Sets the item processor for this step.
988    ///
989    /// The processor transforms items from type `I` to type `O`.
990    /// This is a required component for chunk-oriented steps.
991    ///
992    /// # Parameters
993    /// - `processor`: Implementation of ItemProcessor that transforms items from `I` to `O`
994    ///
995    /// # Examples
996    ///
997    /// ```rust
998    /// # use spring_batch_rs::core::step::ChunkOrientedStepBuilder;
999    /// # use spring_batch_rs::core::item::{ItemReader, ItemProcessor};
1000    /// # use spring_batch_rs::BatchError;
1001    /// # struct FileReader;
1002    /// # impl ItemReader<String> for FileReader {
1003    /// #     fn read(&self) -> Result<Option<String>, BatchError> { Ok(None) }
1004    /// # }
1005    /// # struct UppercaseProcessor;
1006    /// # impl ItemProcessor<String, String> for UppercaseProcessor {
1007    /// #     fn process(&self, item: &String) -> Result<String, BatchError> { Ok(item.to_uppercase()) }
1008    /// # }
1009    /// let reader = FileReader;
1010    /// let processor = UppercaseProcessor;
1011    /// let builder = ChunkOrientedStepBuilder::new("text-processing")
1012    ///     .reader(&reader)
1013    ///     .processor(&processor);
1014    /// ```
1015    pub fn processor(mut self, processor: &'a dyn ItemProcessor<I, O>) -> Self {
1016        self.processor = Some(processor);
1017        self
1018    }
1019
1020    /// Sets the item writer for this step.
1021    ///
1022    /// The writer is responsible for persisting processed items.
1023    /// This is a required component for chunk-oriented steps.
1024    ///
1025    /// # Parameters
1026    /// - `writer`: Implementation of ItemWriter that consumes items of type `O`
1027    ///
1028    /// # Examples
1029    ///
1030    /// ```rust
1031    /// # use spring_batch_rs::core::step::ChunkOrientedStepBuilder;
1032    /// # use spring_batch_rs::core::item::{ItemReader, ItemProcessor, ItemWriter};
1033    /// # use spring_batch_rs::BatchError;
1034    /// # struct FileReader;
1035    /// # impl ItemReader<String> for FileReader {
1036    /// #     fn read(&self) -> Result<Option<String>, BatchError> { Ok(None) }
1037    /// # }
1038    /// # struct UppercaseProcessor;
1039    /// # impl ItemProcessor<String, String> for UppercaseProcessor {
1040    /// #     fn process(&self, item: &String) -> Result<String, BatchError> { Ok(item.to_uppercase()) }
1041    /// # }
1042    /// # struct FileWriter;
1043    /// # impl ItemWriter<String> for FileWriter {
1044    /// #     fn write(&self, items: &[String]) -> Result<(), BatchError> { Ok(()) }
1045    /// #     fn flush(&self) -> Result<(), BatchError> { Ok(()) }
1046    /// #     fn open(&self) -> Result<(), BatchError> { Ok(()) }
1047    /// #     fn close(&self) -> Result<(), BatchError> { Ok(()) }
1048    /// # }
1049    /// let reader = FileReader;
1050    /// let processor = UppercaseProcessor;
1051    /// let writer = FileWriter;
1052    /// let builder = ChunkOrientedStepBuilder::new("file-processing")
1053    ///     .reader(&reader)
1054    ///     .processor(&processor)
1055    ///     .writer(&writer);
1056    /// ```
1057    pub fn writer(mut self, writer: &'a dyn ItemWriter<O>) -> Self {
1058        self.writer = Some(writer);
1059        self
1060    }
1061
1062    /// Sets the chunk size for this step.
1063    ///
1064    /// The chunk size determines how many items are processed together
1065    /// in a single transaction. Larger chunks can improve performance
1066    /// but use more memory.
1067    ///
1068    /// # Parameters
1069    /// - `chunk_size`: Number of items to process per chunk (must be > 0)
1070    ///
1071    /// # Examples
1072    ///
1073    /// ```rust
1074    /// use spring_batch_rs::core::step::ChunkOrientedStepBuilder;
1075    ///
1076    /// let builder = ChunkOrientedStepBuilder::<String, String>::new("bulk-processing")
1077    ///     .chunk_size(1000); // Process 1000 items per chunk
1078    /// ```
1079    pub fn chunk_size(mut self, chunk_size: u16) -> Self {
1080        self.chunk_size = chunk_size;
1081        self
1082    }
1083
1084    /// Sets the skip limit for this step.
1085    ///
1086    /// The skip limit determines how many errors are tolerated before
1087    /// the step fails. A value of 0 means no errors are tolerated.
1088    ///
1089    /// # Parameters
1090    /// - `skip_limit`: Maximum number of errors allowed
1091    ///
1092    /// # Examples
1093    ///
1094    /// ```rust
1095    /// use spring_batch_rs::core::step::ChunkOrientedStepBuilder;
1096    ///
1097    /// let builder = ChunkOrientedStepBuilder::<String, String>::new("fault-tolerant-processing")
1098    ///     .skip_limit(100); // Allow up to 100 errors
1099    /// ```
1100    pub fn skip_limit(mut self, skip_limit: u16) -> Self {
1101        self.skip_limit = skip_limit;
1102        self
1103    }
1104
1105    /// Builds the ChunkOrientedStep instance.
1106    ///
1107    /// # Panics
1108    /// Panics if any required component (reader, processor, writer) has not been set.
1109    ///
1110    /// # Examples
1111    ///
1112    /// ```rust
1113    /// # use spring_batch_rs::core::step::ChunkOrientedStepBuilder;
1114    /// # use spring_batch_rs::core::item::{ItemReader, ItemProcessor, ItemWriter};
1115    /// # use spring_batch_rs::BatchError;
1116    /// # struct MyReader;
1117    /// # impl ItemReader<String> for MyReader {
1118    /// #     fn read(&self) -> Result<Option<String>, BatchError> { Ok(None) }
1119    /// # }
1120    /// # struct MyProcessor;
1121    /// # impl ItemProcessor<String, String> for MyProcessor {
1122    /// #     fn process(&self, item: &String) -> Result<String, BatchError> { Ok(item.clone()) }
1123    /// # }
1124    /// # struct MyWriter;
1125    /// # impl ItemWriter<String> for MyWriter {
1126    /// #     fn write(&self, items: &[String]) -> Result<(), BatchError> { Ok(()) }
1127    /// #     fn flush(&self) -> Result<(), BatchError> { Ok(()) }
1128    /// #     fn open(&self) -> Result<(), BatchError> { Ok(()) }
1129    /// #     fn close(&self) -> Result<(), BatchError> { Ok(()) }
1130    /// # }
1131    /// let reader = MyReader;
1132    /// let processor = MyProcessor;
1133    /// let writer = MyWriter;
1134    ///
1135    /// let step = ChunkOrientedStepBuilder::new("complete-step")
1136    ///     .reader(&reader)
1137    ///     .processor(&processor)
1138    ///     .writer(&writer)
1139    ///     .chunk_size(500)
1140    ///     .skip_limit(10)
1141    ///     .build();
1142    /// ```
1143    pub fn build(self) -> ChunkOrientedStep<'a, I, O> {
1144        ChunkOrientedStep {
1145            name: self.name,
1146            reader: self.reader.expect("Reader is required for building a step"),
1147            processor: self
1148                .processor
1149                .expect("Processor is required for building a step"),
1150            writer: self.writer.expect("Writer is required for building a step"),
1151            chunk_size: self.chunk_size,
1152            skip_limit: self.skip_limit,
1153        }
1154    }
1155}
1156
1157/// Main entry point for building steps of any type.
1158///
1159/// StepBuilder provides a unified interface for creating both chunk-oriented
1160/// and tasklet steps. It uses the builder pattern to provide a fluent API
1161/// for step configuration.
1162///
1163/// # Type Parameters
1164/// - `I`: The input item type for chunk-oriented steps
1165/// - `O`: The output item type for chunk-oriented steps
1166///
1167/// # Examples
1168///
1169/// ## Creating a Chunk-Oriented Step
1170///
1171/// ```rust
1172/// use spring_batch_rs::core::step::{StepBuilder, StepExecution, Step};
1173/// use spring_batch_rs::core::item::{ItemReader, ItemProcessor, ItemWriter};
1174/// use spring_batch_rs::BatchError;
1175///
1176/// # struct MyReader;
1177/// # impl ItemReader<String> for MyReader {
1178/// #     fn read(&self) -> Result<Option<String>, BatchError> { Ok(None) }
1179/// # }
1180/// # struct MyProcessor;
1181/// # impl ItemProcessor<String, String> for MyProcessor {
1182/// #     fn process(&self, item: &String) -> Result<String, BatchError> { Ok(item.clone()) }
1183/// # }
1184/// # struct MyWriter;
1185/// # impl ItemWriter<String> for MyWriter {
1186/// #     fn write(&self, items: &[String]) -> Result<(), BatchError> { Ok(()) }
1187/// #     fn flush(&self) -> Result<(), BatchError> { Ok(()) }
1188/// #     fn open(&self) -> Result<(), BatchError> { Ok(()) }
1189/// #     fn close(&self) -> Result<(), BatchError> { Ok(()) }
1190/// # }
1191/// let reader = MyReader;
1192/// let processor = MyProcessor;
1193/// let writer = MyWriter;
1194///
1195/// let step = StepBuilder::new("data-processing")
1196///     .chunk::<String, String>(100)
1197///     .reader(&reader)
1198///     .processor(&processor)
1199///     .writer(&writer)
1200///     .build();
1201/// ```
1202///
1203/// ## Creating a Tasklet Step
1204///
1205/// ```rust
1206/// use spring_batch_rs::core::step::{StepBuilder, StepExecution, RepeatStatus, Tasklet};
1207/// use spring_batch_rs::BatchError;
1208///
1209/// # struct MyTasklet;
1210/// # impl Tasklet for MyTasklet {
1211/// #     fn execute(&self, _step_execution: &StepExecution) -> Result<RepeatStatus, BatchError> {
1212/// #         Ok(RepeatStatus::Finished)
1213/// #     }
1214/// # }
1215/// let tasklet = MyTasklet;
1216///
1217/// let step = StepBuilder::new("cleanup-task")
1218///     .tasklet(&tasklet)
1219///     .build();
1220/// ```
1221pub struct StepBuilder {
1222    name: String,
1223}
1224
1225impl StepBuilder {
1226    /// Creates a new StepBuilder with the specified name.
1227    ///
1228    /// # Parameters
1229    /// - `name`: Human-readable name for the step
1230    ///
1231    /// # Examples
1232    ///
1233    /// ```rust
1234    /// use spring_batch_rs::core::step::StepBuilder;
1235    ///
1236    /// let builder = StepBuilder::new("my-step");
1237    /// ```
1238    pub fn new(name: &str) -> Self {
1239        Self {
1240            name: name.to_string(),
1241        }
1242    }
1243
1244    /// Configures this step to use a tasklet for execution.
1245    ///
1246    /// Returns a TaskletBuilder for further configuration of the tasklet step.
1247    ///
1248    /// # Parameters
1249    /// - `tasklet`: The tasklet implementation to execute
1250    ///
1251    /// # Examples
1252    ///
1253    /// ```rust
1254    /// use spring_batch_rs::core::step::{StepBuilder, StepExecution, RepeatStatus, Tasklet};
1255    /// use spring_batch_rs::BatchError;
1256    ///
1257    /// # struct FileCleanupTasklet;
1258    /// # impl Tasklet for FileCleanupTasklet {
1259    /// #     fn execute(&self, _step_execution: &StepExecution) -> Result<RepeatStatus, BatchError> {
1260    /// #         Ok(RepeatStatus::Finished)
1261    /// #     }
1262    /// # }
1263    /// let tasklet = FileCleanupTasklet;
1264    /// let step = StepBuilder::new("cleanup")
1265    ///     .tasklet(&tasklet)
1266    ///     .build();
1267    /// ```
1268    pub fn tasklet(self, tasklet: &dyn Tasklet) -> TaskletBuilder<'_> {
1269        TaskletBuilder::new(&self.name).tasklet(tasklet)
1270    }
1271
1272    /// Configures this step for chunk-oriented processing.
1273    ///
1274    /// Returns a ChunkOrientedStepBuilder for further configuration of the chunk step.
1275    ///
1276    /// # Parameters
1277    /// - `chunk_size`: Number of items to process per chunk
1278    ///
1279    /// # Examples
1280    ///
1281    /// ```rust
1282    /// use spring_batch_rs::core::step::{StepBuilder, Step};
1283    /// use spring_batch_rs::core::item::{ItemReader, ItemProcessor, ItemWriter};
1284    /// use spring_batch_rs::BatchError;
1285    ///
1286    /// # struct MyReader;
1287    /// # impl ItemReader<String> for MyReader {
1288    /// #     fn read(&self) -> Result<Option<String>, BatchError> { Ok(None) }
1289    /// # }
1290    /// # struct MyProcessor;
1291    /// # impl ItemProcessor<String, String> for MyProcessor {
1292    /// #     fn process(&self, item: &String) -> Result<String, BatchError> { Ok(item.clone()) }
1293    /// # }
1294    /// # struct MyWriter;
1295    /// # impl ItemWriter<String> for MyWriter {
1296    /// #     fn write(&self, items: &[String]) -> Result<(), BatchError> { Ok(()) }
1297    /// #     fn flush(&self) -> Result<(), BatchError> { Ok(()) }
1298    /// #     fn open(&self) -> Result<(), BatchError> { Ok(()) }
1299    /// #     fn close(&self) -> Result<(), BatchError> { Ok(()) }
1300    /// # }
1301    /// let reader = MyReader;
1302    /// let processor = MyProcessor;
1303    /// let writer = MyWriter;
1304    ///
1305    /// let step = StepBuilder::new("bulk-processing")
1306    ///     .chunk(1000)  // Process 1000 items per chunk
1307    ///     .reader(&reader)
1308    ///     .processor(&processor)
1309    ///     .writer(&writer)
1310    ///     .build();
1311    /// ```
1312    pub fn chunk<'a, I, O>(self, chunk_size: u16) -> ChunkOrientedStepBuilder<'a, I, O> {
1313        ChunkOrientedStepBuilder::new(&self.name).chunk_size(chunk_size)
1314    }
1315}
1316
1317/// Represents the status of a chunk during processing.
1318///
1319/// This enum indicates whether a chunk has been fully processed or if
1320/// there are more items to process. It's used internally by the step
1321/// execution logic to control the processing loop.
1322///
1323/// # Examples
1324///
1325/// ```rust
1326/// use spring_batch_rs::core::step::ChunkStatus;
1327///
1328/// let status = ChunkStatus::Full;
1329/// match status {
1330///     ChunkStatus::Full => println!("Chunk is ready for processing"),
1331///     ChunkStatus::Finished => println!("No more items to process"),
1332/// }
1333/// ```
1334#[derive(Debug, PartialEq)]
1335pub enum ChunkStatus {
1336    /// The chunk has been fully processed.
1337    ///
1338    /// This indicates that there are no more items to process in the current
1339    /// data source (typically because we've reached the end of the input).
1340    /// The step should complete after processing any remaining items.
1341    Finished,
1342
1343    /// The chunk is full and ready to be processed.
1344    ///
1345    /// This indicates that we've collected a full chunk of items (based on
1346    /// the configured chunk size) and they are ready to be processed.
1347    /// The step should continue reading more chunks after processing this one.
1348    Full,
1349}
1350
1351/// Represents the current status of a step execution.
1352///
1353/// This enum indicates the current state of a step execution, including
1354/// both success and various failure states. It helps track the step's
1355/// progress and identify the cause of any failures.
1356///
1357/// # Examples
1358///
1359/// ```rust
1360/// use spring_batch_rs::core::step::{StepExecution, StepStatus};
1361///
1362/// let mut step_execution = StepExecution::new("my-step");
1363/// assert_eq!(step_execution.status, StepStatus::Starting);
1364///
1365/// // After successful execution
1366/// step_execution.status = StepStatus::Success;
1367/// match step_execution.status {
1368///     StepStatus::Success => println!("Step completed successfully"),
1369///     StepStatus::ReadError => println!("Failed during reading"),
1370///     StepStatus::ProcessorError => println!("Failed during processing"),
1371///     StepStatus::WriteError => println!("Failed during writing"),
1372///     StepStatus::Starting => println!("Step is starting"),
1373///     StepStatus::Failed => println!("Step has failed"),
1374///     StepStatus::Started => println!("Step has started"),
1375/// }
1376/// ```
1377#[derive(Debug, PartialEq, Clone, Copy)]
1378pub enum StepStatus {
1379    /// The step executed successfully.
1380    ///
1381    /// All items were read, processed, and written without errors
1382    /// exceeding configured skip limits. This is the desired end state
1383    /// for a step execution.
1384    Success,
1385
1386    /// An error occurred during the read operation.
1387    ///
1388    /// This indicates that an error occurred while reading items from the
1389    /// source, and the error count exceeded the configured skip limit.
1390    /// The step was terminated due to too many read failures.
1391    ReadError,
1392
1393    /// An error occurred during the processing operation.
1394    ///
1395    /// This indicates that an error occurred while processing items, and
1396    /// the error count exceeded the configured skip limit.
1397    /// The step was terminated due to too many processing failures.
1398    ProcessorError,
1399
1400    /// An error occurred during the write operation.
1401    ///
1402    /// This indicates that an error occurred while writing items to the
1403    /// destination, and the error count exceeded the configured skip limit.
1404    /// The step was terminated due to too many write failures.
1405    WriteError,
1406
1407    /// The step is starting.
1408    ///
1409    /// This is the initial state of a step before execution begins.
1410    /// All steps start in this state when first created.
1411    Starting,
1412
1413    /// The step is failed.
1414    ///
1415    /// This is the final state of a step after execution has failed.
1416    Failed,
1417
1418    /// The step is started.
1419    ///
1420    /// This is the state of a step after execution has started.
1421    Started,
1422}
1423
1424#[cfg(test)]
1425mod tests {
1426    use anyhow::Result;
1427    use mockall::mock;
1428    use serde::{Deserialize, Serialize};
1429
1430    use crate::{
1431        core::{
1432            item::{
1433                ItemProcessor, ItemProcessorResult, ItemReader, ItemReaderResult, ItemWriter,
1434                ItemWriterResult,
1435            },
1436            step::{StepExecution, StepStatus},
1437        },
1438        BatchError,
1439    };
1440
1441    use super::{
1442        BatchStatus, ChunkOrientedStepBuilder, ChunkStatus, RepeatStatus, Step, StepBuilder,
1443        Tasklet, TaskletBuilder,
1444    };
1445
1446    mock! {
1447        pub TestItemReader {}
1448        impl ItemReader<Car> for TestItemReader {
1449            fn read(&self) -> ItemReaderResult<Car>;
1450        }
1451    }
1452
1453    mock! {
1454        pub TestProcessor {}
1455        impl ItemProcessor<Car, Car> for TestProcessor {
1456            fn process(&self, item: &Car) -> ItemProcessorResult<Car>;
1457        }
1458    }
1459
1460    mock! {
1461        pub TestItemWriter {}
1462        impl ItemWriter<Car> for TestItemWriter {
1463            fn write(&self, items: &[Car]) -> ItemWriterResult;
1464            fn flush(&self) -> ItemWriterResult;
1465            fn open(&self) -> ItemWriterResult;
1466            fn close(&self) -> ItemWriterResult;
1467        }
1468    }
1469
1470    mock! {
1471        pub TestTasklet {}
1472        impl Tasklet for TestTasklet {
1473            fn execute(&self, step_execution: &StepExecution) -> Result<RepeatStatus, BatchError>;
1474        }
1475    }
1476
1477    #[derive(Deserialize, Serialize, Debug, Clone)]
1478    struct Car {
1479        year: u16,
1480        make: String,
1481        model: String,
1482        description: String,
1483    }
1484
1485    fn mock_read(i: &mut u16, error_count: u16, end_count: u16) -> ItemReaderResult<Car> {
1486        if end_count > 0 && *i == end_count {
1487            return Ok(None);
1488        } else if error_count > 0 && *i == error_count {
1489            return Err(BatchError::ItemReader("mock read error".to_string()));
1490        }
1491
1492        let car = Car {
1493            year: 1979,
1494            make: "make".to_owned(),
1495            model: "model".to_owned(),
1496            description: "description".to_owned(),
1497        };
1498        *i += 1;
1499        Ok(Some(car))
1500    }
1501
1502    fn mock_process(i: &mut u16, error_at: &[u16]) -> ItemProcessorResult<Car> {
1503        *i += 1;
1504        if error_at.contains(i) {
1505            return Err(BatchError::ItemProcessor("mock process error".to_string()));
1506        }
1507
1508        let car = Car {
1509            year: 1979,
1510            make: "make".to_owned(),
1511            model: "model".to_owned(),
1512            description: "description".to_owned(),
1513        };
1514        Ok(car)
1515    }
1516
1517    #[test]
1518    fn step_should_succeded_with_empty_data() -> Result<()> {
1519        let mut reader = MockTestItemReader::default();
1520        let reader_result = Ok(None);
1521        reader.expect_read().return_once(move || reader_result);
1522
1523        let mut processor = MockTestProcessor::default();
1524        processor.expect_process().never();
1525
1526        let mut writer = MockTestItemWriter::default();
1527        writer.expect_open().times(1).returning(|| Ok(()));
1528        writer.expect_write().never();
1529        writer.expect_close().times(1).returning(|| Ok(()));
1530
1531        let step = StepBuilder::new("test")
1532            .chunk(3)
1533            .reader(&reader)
1534            .processor(&processor)
1535            .writer(&writer)
1536            .build();
1537
1538        let mut step_execution = StepExecution::new(&step.name);
1539
1540        let result = step.execute(&mut step_execution);
1541
1542        assert!(result.is_ok());
1543        assert_eq!(step.get_name(), "test");
1544        assert!(!step.get_name().is_empty());
1545        assert!(!step_execution.id.is_nil());
1546        assert_eq!(step_execution.status, StepStatus::Success);
1547
1548        Ok(())
1549    }
1550
1551    #[test]
1552    fn step_should_failed_with_processor_error() -> Result<()> {
1553        let mut i = 0;
1554        let mut reader = MockTestItemReader::default();
1555        reader
1556            .expect_read()
1557            .returning(move || mock_read(&mut i, 0, 4));
1558
1559        let mut processor = MockTestProcessor::default();
1560        let mut i = 0;
1561        processor
1562            .expect_process()
1563            .returning(move |_| mock_process(&mut i, &[2]));
1564
1565        let mut writer = MockTestItemWriter::default();
1566        writer.expect_open().times(1).returning(|| Ok(()));
1567        writer.expect_write().never();
1568        writer.expect_close().times(1).returning(|| Ok(()));
1569
1570        let step = StepBuilder::new("test")
1571            .chunk(3)
1572            .reader(&reader)
1573            .processor(&processor)
1574            .writer(&writer)
1575            .build();
1576
1577        let mut step_execution = StepExecution::new(&step.name);
1578
1579        let result = step.execute(&mut step_execution);
1580
1581        assert!(result.is_err());
1582        assert_eq!(step_execution.status, StepStatus::ProcessorError);
1583
1584        Ok(())
1585    }
1586
1587    #[test]
1588    fn step_should_failed_with_write_error() -> Result<()> {
1589        let mut i = 0;
1590        let mut reader = MockTestItemReader::default();
1591        reader
1592            .expect_read()
1593            .returning(move || mock_read(&mut i, 0, 4));
1594
1595        let mut processor = MockTestProcessor::default();
1596        let mut i = 0;
1597        processor
1598            .expect_process()
1599            .returning(move |_| mock_process(&mut i, &[]));
1600
1601        let mut writer = MockTestItemWriter::default();
1602        writer.expect_open().times(1).returning(|| Ok(()));
1603        let result = Err(BatchError::ItemWriter("mock write error".to_string()));
1604        writer.expect_write().return_once(move |_| result);
1605        writer.expect_close().times(1).returning(|| Ok(()));
1606
1607        let step = StepBuilder::new("test")
1608            .chunk(3)
1609            .reader(&reader)
1610            .processor(&processor)
1611            .writer(&writer)
1612            .build();
1613
1614        let mut step_execution = StepExecution::new(&step.name);
1615
1616        let result = step.execute(&mut step_execution);
1617
1618        assert!(result.is_err());
1619        assert_eq!(step_execution.status, StepStatus::WriteError);
1620
1621        Ok(())
1622    }
1623
1624    #[test]
1625    fn step_should_succeed_even_with_processor_error() -> Result<()> {
1626        let mut i = 0;
1627        let mut reader = MockTestItemReader::default();
1628        reader
1629            .expect_read()
1630            .returning(move || mock_read(&mut i, 0, 4));
1631
1632        let mut processor = MockTestProcessor::default();
1633        let mut i = 0;
1634        processor
1635            .expect_process()
1636            .returning(move |_| mock_process(&mut i, &[2]));
1637
1638        let mut writer = MockTestItemWriter::default();
1639        writer.expect_open().times(1).returning(|| Ok(()));
1640        writer.expect_write().times(2).returning(|_| Ok(()));
1641        writer.expect_flush().times(2).returning(|| Ok(()));
1642        writer.expect_close().times(1).returning(|| Ok(()));
1643
1644        let step = StepBuilder::new("test")
1645            .chunk(3)
1646            .reader(&reader)
1647            .processor(&processor)
1648            .writer(&writer)
1649            .skip_limit(1)
1650            .build();
1651
1652        let mut step_execution = StepExecution::new(step.get_name());
1653
1654        let result = step.execute(&mut step_execution);
1655
1656        assert!(result.is_ok());
1657        assert_eq!(step_execution.status, StepStatus::Success);
1658
1659        Ok(())
1660    }
1661
1662    #[test]
1663    fn step_should_fail_with_read_error() -> Result<()> {
1664        let mut i = 0;
1665        let mut reader = MockTestItemReader::default();
1666        reader
1667            .expect_read()
1668            .returning(move || mock_read(&mut i, 1, 4));
1669
1670        let mut processor = MockTestProcessor::default();
1671        let mut i = 0;
1672        processor
1673            .expect_process()
1674            .returning(move |_| mock_process(&mut i, &[]));
1675
1676        let mut writer = MockTestItemWriter::default();
1677        writer.expect_open().times(1).returning(|| Ok(()));
1678        writer.expect_write().never();
1679        writer.expect_close().times(1).returning(|| Ok(()));
1680
1681        let step = StepBuilder::new("test")
1682            .chunk(3)
1683            .reader(&reader)
1684            .processor(&processor)
1685            .writer(&writer)
1686            .build();
1687
1688        let mut step_execution = StepExecution::new(&step.name);
1689
1690        let result = step.execute(&mut step_execution);
1691
1692        assert!(result.is_err());
1693        assert_eq!(step_execution.status, StepStatus::ReadError);
1694        assert_eq!(step_execution.read_error_count, 1);
1695
1696        Ok(())
1697    }
1698
1699    #[test]
1700    fn step_should_respect_chunk_size() -> Result<()> {
1701        let mut i = 0;
1702        let mut reader = MockTestItemReader::default();
1703        reader
1704            .expect_read()
1705            .returning(move || mock_read(&mut i, 0, 6));
1706
1707        let mut processor = MockTestProcessor::default();
1708        let mut i = 0;
1709        processor
1710            .expect_process()
1711            .returning(move |_| mock_process(&mut i, &[]));
1712
1713        let mut writer = MockTestItemWriter::default();
1714        writer.expect_open().times(1).returning(|| Ok(()));
1715        writer.expect_write().times(2).returning(|_| Ok(()));
1716        writer.expect_flush().times(2).returning(|| Ok(()));
1717        writer.expect_close().times(1).returning(|| Ok(()));
1718
1719        let step = StepBuilder::new("test")
1720            .chunk(3)
1721            .reader(&reader)
1722            .processor(&processor)
1723            .writer(&writer)
1724            .build();
1725
1726        let mut step_execution = StepExecution::new(&step.name);
1727
1728        let result = step.execute(&mut step_execution);
1729
1730        assert!(result.is_ok());
1731        assert_eq!(step_execution.status, StepStatus::Success);
1732        assert_eq!(step_execution.read_count, 6);
1733        assert_eq!(step_execution.write_count, 6);
1734
1735        Ok(())
1736    }
1737
1738    #[test]
1739    fn step_should_track_error_counts() -> Result<()> {
1740        let mut i = 0;
1741        let mut reader = MockTestItemReader::default();
1742        reader
1743            .expect_read()
1744            .returning(move || mock_read(&mut i, 0, 4));
1745
1746        let mut processor = MockTestProcessor::default();
1747        let mut i = 0;
1748        processor
1749            .expect_process()
1750            .returning(move |_| mock_process(&mut i, &[1, 2]));
1751
1752        let mut writer = MockTestItemWriter::default();
1753        writer.expect_open().times(1).returning(|| Ok(()));
1754        writer.expect_write().times(2).returning(|_| Ok(()));
1755        writer.expect_flush().times(2).returning(|| Ok(()));
1756        writer.expect_close().times(1).returning(|| Ok(()));
1757
1758        let step = StepBuilder::new("test")
1759            .chunk(3)
1760            .reader(&reader)
1761            .processor(&processor)
1762            .writer(&writer)
1763            .skip_limit(2)
1764            .build();
1765
1766        let mut step_execution = StepExecution::new(&step.name);
1767
1768        let result = step.execute(&mut step_execution);
1769
1770        assert!(result.is_ok());
1771        assert_eq!(step_execution.status, StepStatus::Success);
1772        assert_eq!(step_execution.process_error_count, 2);
1773
1774        Ok(())
1775    }
1776
1777    #[test]
1778    fn step_should_measure_execution_time() -> Result<()> {
1779        let mut i = 0;
1780        let mut reader = MockTestItemReader::default();
1781        reader
1782            .expect_read()
1783            .returning(move || mock_read(&mut i, 0, 2));
1784
1785        let mut processor = MockTestProcessor::default();
1786        let mut i = 0;
1787        processor
1788            .expect_process()
1789            .returning(move |_| mock_process(&mut i, &[]));
1790
1791        let mut writer = MockTestItemWriter::default();
1792        writer.expect_open().times(1).returning(|| Ok(()));
1793        writer.expect_write().times(1).returning(|_| Ok(()));
1794        writer.expect_flush().times(1).returning(|| Ok(()));
1795        writer.expect_close().times(1).returning(|| Ok(()));
1796
1797        let step = StepBuilder::new("test")
1798            .chunk(3)
1799            .reader(&reader)
1800            .processor(&processor)
1801            .writer(&writer)
1802            .build();
1803
1804        let mut step_execution = StepExecution::new(&step.name);
1805
1806        let result = step.execute(&mut step_execution);
1807
1808        assert!(result.is_ok());
1809        assert!(step_execution.duration.unwrap().as_nanos() > 0);
1810        assert!(step_execution.start_time.unwrap() <= step_execution.end_time.unwrap());
1811
1812        Ok(())
1813    }
1814
1815    #[test]
1816    fn step_should_handle_empty_chunk_at_end() -> Result<()> {
1817        let mut i = 0;
1818        let mut reader = MockTestItemReader::default();
1819        reader
1820            .expect_read()
1821            .returning(move || mock_read(&mut i, 0, 1));
1822
1823        let mut processor = MockTestProcessor::default();
1824        let mut i = 0;
1825        processor
1826            .expect_process()
1827            .returning(move |_| mock_process(&mut i, &[]));
1828
1829        let mut writer = MockTestItemWriter::default();
1830        writer.expect_open().times(1).returning(|| Ok(()));
1831        writer.expect_write().times(1).returning(|items| {
1832            assert_eq!(items.len(), 1); // Partial chunk with 1 item
1833            Ok(())
1834        });
1835        writer.expect_flush().times(1).returning(|| Ok(()));
1836        writer.expect_close().times(1).returning(|| Ok(()));
1837
1838        let step = StepBuilder::new("test")
1839            .chunk(3)
1840            .reader(&reader)
1841            .processor(&processor)
1842            .writer(&writer)
1843            .build();
1844
1845        let mut step_execution = StepExecution::new(&step.name);
1846
1847        let result = step.execute(&mut step_execution);
1848
1849        assert!(result.is_ok());
1850        assert_eq!(step_execution.status, StepStatus::Success);
1851        assert_eq!(step_execution.read_count, 1);
1852        assert_eq!(step_execution.write_count, 1);
1853
1854        Ok(())
1855    }
1856
1857    #[test]
1858    fn step_execution_should_initialize_correctly() -> Result<()> {
1859        let step_execution = StepExecution::new("test_step");
1860
1861        assert_eq!(step_execution.name, "test_step");
1862        assert_eq!(step_execution.status, StepStatus::Starting);
1863        assert!(step_execution.start_time.is_none());
1864        assert!(step_execution.end_time.is_none());
1865        assert!(step_execution.duration.is_none());
1866        assert_eq!(step_execution.read_count, 0);
1867        assert_eq!(step_execution.write_count, 0);
1868        assert_eq!(step_execution.read_error_count, 0);
1869        assert_eq!(step_execution.process_count, 0);
1870        assert_eq!(step_execution.process_error_count, 0);
1871        assert_eq!(step_execution.write_error_count, 0);
1872        assert!(!step_execution.id.is_nil());
1873
1874        Ok(())
1875    }
1876
1877    #[test]
1878    fn tasklet_step_should_execute_successfully() -> Result<()> {
1879        let mut tasklet = MockTestTasklet::default();
1880        tasklet
1881            .expect_execute()
1882            .times(1)
1883            .returning(|_| Ok(RepeatStatus::Finished));
1884
1885        let step = StepBuilder::new("tasklet_test").tasklet(&tasklet).build();
1886
1887        let mut step_execution = StepExecution::new(&step.name);
1888
1889        let result = step.execute(&mut step_execution);
1890
1891        assert!(result.is_ok());
1892        assert_eq!(step.get_name(), "tasklet_test");
1893
1894        Ok(())
1895    }
1896
1897    #[test]
1898    fn tasklet_step_should_handle_tasklet_error() -> Result<()> {
1899        let mut tasklet = MockTestTasklet::default();
1900        tasklet
1901            .expect_execute()
1902            .times(1)
1903            .returning(|_| Err(BatchError::Step("tasklet error".to_string())));
1904
1905        let step = StepBuilder::new("tasklet_test").tasklet(&tasklet).build();
1906
1907        let mut step_execution = StepExecution::new(&step.name);
1908
1909        let result = step.execute(&mut step_execution);
1910
1911        // The tasklet step should now properly handle errors
1912        assert!(result.is_err());
1913        if let Err(BatchError::Step(msg)) = result {
1914            assert_eq!(msg, "tasklet error");
1915        } else {
1916            panic!("Expected Step error");
1917        }
1918
1919        Ok(())
1920    }
1921
1922    #[test]
1923    fn tasklet_step_should_handle_continuable_status() -> Result<()> {
1924        use std::cell::Cell;
1925
1926        let call_count = Cell::new(0);
1927        let mut tasklet = MockTestTasklet::default();
1928        tasklet.expect_execute().times(4).returning(move |_| {
1929            let count = call_count.get();
1930            call_count.set(count + 1);
1931            if count < 3 {
1932                Ok(RepeatStatus::Continuable)
1933            } else {
1934                Ok(RepeatStatus::Finished)
1935            }
1936        });
1937
1938        let step = StepBuilder::new("continuable_tasklet_test")
1939            .tasklet(&tasklet)
1940            .build();
1941
1942        let mut step_execution = StepExecution::new(&step.name);
1943
1944        let result = step.execute(&mut step_execution);
1945
1946        assert!(result.is_ok());
1947        assert_eq!(step.get_name(), "continuable_tasklet_test");
1948
1949        Ok(())
1950    }
1951
1952    #[test]
1953    fn tasklet_step_should_handle_multiple_continuable_cycles() -> Result<()> {
1954        use std::cell::Cell;
1955
1956        let call_count = Cell::new(0);
1957        let mut tasklet = MockTestTasklet::default();
1958
1959        // Set up a sequence: 5 Continuable calls -> 1 Finished call
1960        tasklet.expect_execute().times(6).returning(move |_| {
1961            let count = call_count.get();
1962            call_count.set(count + 1);
1963            if count < 5 {
1964                Ok(RepeatStatus::Continuable)
1965            } else {
1966                Ok(RepeatStatus::Finished)
1967            }
1968        });
1969
1970        let step = StepBuilder::new("multi_cycle_tasklet_test")
1971            .tasklet(&tasklet)
1972            .build();
1973
1974        let mut step_execution = StepExecution::new(&step.name);
1975
1976        let result = step.execute(&mut step_execution);
1977
1978        assert!(result.is_ok());
1979        assert_eq!(step.get_name(), "multi_cycle_tasklet_test");
1980
1981        Ok(())
1982    }
1983
1984    #[test]
1985    fn tasklet_step_should_handle_error_after_continuable() -> Result<()> {
1986        use std::cell::Cell;
1987
1988        let call_count = Cell::new(0);
1989        let mut tasklet = MockTestTasklet::default();
1990
1991        // Set up a sequence: 2 Continuable calls -> 1 Error
1992        tasklet.expect_execute().times(3).returning(move |_| {
1993            let count = call_count.get();
1994            call_count.set(count + 1);
1995            if count < 2 {
1996                Ok(RepeatStatus::Continuable)
1997            } else {
1998                Err(BatchError::Step("error after continuable".to_string()))
1999            }
2000        });
2001
2002        let step = StepBuilder::new("error_after_continuable_test")
2003            .tasklet(&tasklet)
2004            .build();
2005
2006        let mut step_execution = StepExecution::new(&step.name);
2007
2008        let result = step.execute(&mut step_execution);
2009
2010        assert!(result.is_err());
2011        if let Err(BatchError::Step(msg)) = result {
2012            assert_eq!(msg, "error after continuable");
2013        } else {
2014            panic!("Expected Step error");
2015        }
2016
2017        Ok(())
2018    }
2019
2020    #[test]
2021    fn tasklet_step_should_handle_immediate_finished_status() -> Result<()> {
2022        let mut tasklet = MockTestTasklet::default();
2023        tasklet
2024            .expect_execute()
2025            .times(1)
2026            .returning(|_| Ok(RepeatStatus::Finished));
2027
2028        let step = StepBuilder::new("immediate_finished_test")
2029            .tasklet(&tasklet)
2030            .build();
2031
2032        let mut step_execution = StepExecution::new(&step.name);
2033
2034        let result = step.execute(&mut step_execution);
2035
2036        assert!(result.is_ok());
2037        assert_eq!(step.get_name(), "immediate_finished_test");
2038
2039        Ok(())
2040    }
2041
2042    #[test]
2043    fn tasklet_step_should_access_step_execution_context() -> Result<()> {
2044        let mut tasklet = MockTestTasklet::default();
2045        tasklet
2046            .expect_execute()
2047            .times(1)
2048            .withf(|step_execution| {
2049                // Verify that the tasklet receives the correct step execution context
2050                step_execution.name == "context_test"
2051                    && step_execution.status == StepStatus::Started
2052            })
2053            .returning(|_| Ok(RepeatStatus::Finished));
2054
2055        let step = StepBuilder::new("context_test").tasklet(&tasklet).build();
2056
2057        let mut step_execution = StepExecution::new(&step.name);
2058
2059        let result = step.execute(&mut step_execution);
2060
2061        assert!(result.is_ok());
2062
2063        Ok(())
2064    }
2065
2066    #[test]
2067    fn tasklet_builder_should_create_valid_tasklet_step() -> Result<()> {
2068        let mut tasklet = MockTestTasklet::default();
2069        tasklet
2070            .expect_execute()
2071            .times(1)
2072            .returning(|_| Ok(RepeatStatus::Finished));
2073
2074        let step = TaskletBuilder::new("builder_test")
2075            .tasklet(&tasklet)
2076            .build();
2077
2078        let mut step_execution = StepExecution::new(&step.name);
2079
2080        let result = step.execute(&mut step_execution);
2081
2082        assert!(result.is_ok());
2083        assert_eq!(step.get_name(), "builder_test");
2084
2085        Ok(())
2086    }
2087
2088    #[test]
2089    fn tasklet_builder_should_panic_without_tasklet() {
2090        let result = std::panic::catch_unwind(|| TaskletBuilder::new("test").build());
2091
2092        assert!(result.is_err());
2093    }
2094
2095    #[test]
2096    fn step_should_handle_writer_open_error() -> Result<()> {
2097        let mut reader = MockTestItemReader::default();
2098        let reader_result = Ok(None);
2099        reader.expect_read().return_once(move || reader_result);
2100
2101        let mut processor = MockTestProcessor::default();
2102        processor.expect_process().never();
2103
2104        let mut writer = MockTestItemWriter::default();
2105        writer
2106            .expect_open()
2107            .times(1)
2108            .returning(|| Err(BatchError::ItemWriter("open error".to_string())));
2109        writer.expect_close().times(1).returning(|| Ok(()));
2110
2111        let step = StepBuilder::new("test")
2112            .chunk(3)
2113            .reader(&reader)
2114            .processor(&processor)
2115            .writer(&writer)
2116            .build();
2117
2118        let mut step_execution = StepExecution::new(&step.name);
2119
2120        let result = step.execute(&mut step_execution);
2121
2122        // The step should still succeed as open errors are managed
2123        assert!(result.is_ok());
2124        assert_eq!(step_execution.status, StepStatus::Success);
2125
2126        Ok(())
2127    }
2128
2129    #[test]
2130    fn step_should_handle_writer_close_error() -> Result<()> {
2131        let mut reader = MockTestItemReader::default();
2132        let reader_result = Ok(None);
2133        reader.expect_read().return_once(move || reader_result);
2134
2135        let mut processor = MockTestProcessor::default();
2136        processor.expect_process().never();
2137
2138        let mut writer = MockTestItemWriter::default();
2139        writer.expect_open().times(1).returning(|| Ok(()));
2140        writer.expect_write().never();
2141        writer
2142            .expect_close()
2143            .times(1)
2144            .returning(|| Err(BatchError::ItemWriter("close error".to_string())));
2145
2146        let step = StepBuilder::new("test")
2147            .chunk(3)
2148            .reader(&reader)
2149            .processor(&processor)
2150            .writer(&writer)
2151            .build();
2152
2153        let mut step_execution = StepExecution::new(&step.name);
2154
2155        let result = step.execute(&mut step_execution);
2156
2157        // The step should still succeed as close errors are managed
2158        assert!(result.is_ok());
2159        assert_eq!(step_execution.status, StepStatus::Success);
2160
2161        Ok(())
2162    }
2163
2164    #[test]
2165    fn step_should_handle_writer_flush_error() -> Result<()> {
2166        let mut i = 0;
2167        let mut reader = MockTestItemReader::default();
2168        reader
2169            .expect_read()
2170            .returning(move || mock_read(&mut i, 0, 2));
2171
2172        let mut processor = MockTestProcessor::default();
2173        let mut i = 0;
2174        processor
2175            .expect_process()
2176            .returning(move |_| mock_process(&mut i, &[]));
2177
2178        let mut writer = MockTestItemWriter::default();
2179        writer.expect_open().times(1).returning(|| Ok(()));
2180        writer.expect_write().times(1).returning(|_| Ok(()));
2181        writer
2182            .expect_flush()
2183            .times(1)
2184            .returning(|| Err(BatchError::ItemWriter("flush error".to_string())));
2185        writer.expect_close().times(1).returning(|| Ok(()));
2186
2187        let step = StepBuilder::new("test")
2188            .chunk(3)
2189            .reader(&reader)
2190            .processor(&processor)
2191            .writer(&writer)
2192            .build();
2193
2194        let mut step_execution = StepExecution::new(&step.name);
2195
2196        let result = step.execute(&mut step_execution);
2197
2198        // The step should still succeed as flush errors are managed
2199        assert!(result.is_ok());
2200        assert_eq!(step_execution.status, StepStatus::Success);
2201
2202        Ok(())
2203    }
2204
2205    #[test]
2206    fn step_should_handle_multiple_chunks_with_exact_chunk_size() -> Result<()> {
2207        let mut i = 0;
2208        let mut reader = MockTestItemReader::default();
2209        reader
2210            .expect_read()
2211            .returning(move || mock_read(&mut i, 0, 6));
2212
2213        let mut processor = MockTestProcessor::default();
2214        let mut i = 0;
2215        processor
2216            .expect_process()
2217            .returning(move |_| mock_process(&mut i, &[]));
2218
2219        let mut writer = MockTestItemWriter::default();
2220        writer.expect_open().times(1).returning(|| Ok(()));
2221        writer.expect_write().times(2).returning(|items| {
2222            assert_eq!(items.len(), 3); // Each chunk should have exactly 3 items
2223            Ok(())
2224        });
2225        writer.expect_flush().times(2).returning(|| Ok(()));
2226        writer.expect_close().times(1).returning(|| Ok(()));
2227
2228        let step = StepBuilder::new("test")
2229            .chunk(3)
2230            .reader(&reader)
2231            .processor(&processor)
2232            .writer(&writer)
2233            .build();
2234
2235        let mut step_execution = StepExecution::new(&step.name);
2236
2237        let result = step.execute(&mut step_execution);
2238
2239        assert!(result.is_ok());
2240        assert_eq!(step_execution.status, StepStatus::Success);
2241        assert_eq!(step_execution.read_count, 6);
2242        assert_eq!(step_execution.write_count, 6);
2243
2244        Ok(())
2245    }
2246
2247    #[test]
2248    fn step_should_handle_skip_limit_boundary() -> Result<()> {
2249        let mut i = 0;
2250        let mut reader = MockTestItemReader::default();
2251        reader
2252            .expect_read()
2253            .returning(move || mock_read(&mut i, 0, 4));
2254
2255        let mut processor = MockTestProcessor::default();
2256        let mut i = 0;
2257        processor
2258            .expect_process()
2259            .returning(move |_| mock_process(&mut i, &[1, 2])); // 2 errors
2260
2261        let mut writer = MockTestItemWriter::default();
2262        writer.expect_open().times(1).returning(|| Ok(()));
2263        writer.expect_write().times(2).returning(|_| Ok(()));
2264        writer.expect_flush().times(2).returning(|| Ok(()));
2265        writer.expect_close().times(1).returning(|| Ok(()));
2266
2267        let step = StepBuilder::new("test")
2268            .chunk(3)
2269            .reader(&reader)
2270            .processor(&processor)
2271            .writer(&writer)
2272            .skip_limit(2) // Exactly at the limit
2273            .build();
2274
2275        let mut step_execution = StepExecution::new(&step.name);
2276
2277        let result = step.execute(&mut step_execution);
2278
2279        assert!(result.is_ok());
2280        assert_eq!(step_execution.status, StepStatus::Success);
2281        assert_eq!(step_execution.process_error_count, 2);
2282
2283        Ok(())
2284    }
2285
2286    #[test]
2287    fn step_should_fail_when_skip_limit_exceeded() -> Result<()> {
2288        let mut i = 0;
2289        let mut reader = MockTestItemReader::default();
2290        reader
2291            .expect_read()
2292            .returning(move || mock_read(&mut i, 0, 4));
2293
2294        let mut processor = MockTestProcessor::default();
2295        let mut i = 0;
2296        processor
2297            .expect_process()
2298            .returning(move |_| mock_process(&mut i, &[1, 2, 3])); // 3 errors
2299
2300        let mut writer = MockTestItemWriter::default();
2301        writer.expect_open().times(1).returning(|| Ok(()));
2302        writer.expect_write().never(); // Should not reach write due to error
2303        writer.expect_close().times(1).returning(|| Ok(()));
2304
2305        let step = StepBuilder::new("test")
2306            .chunk(3)
2307            .reader(&reader)
2308            .processor(&processor)
2309            .writer(&writer)
2310            .skip_limit(2) // Exceeded by 1
2311            .build();
2312
2313        let mut step_execution = StepExecution::new(&step.name);
2314
2315        let result = step.execute(&mut step_execution);
2316
2317        assert!(result.is_err());
2318        assert_eq!(step_execution.status, StepStatus::ProcessorError);
2319        assert_eq!(step_execution.process_error_count, 3);
2320
2321        Ok(())
2322    }
2323
2324    #[test]
2325    fn step_should_handle_empty_processed_chunk() -> Result<()> {
2326        let mut i = 0;
2327        let mut reader = MockTestItemReader::default();
2328        reader
2329            .expect_read()
2330            .returning(move || mock_read(&mut i, 0, 3));
2331
2332        let mut processor = MockTestProcessor::default();
2333        let mut i = 0;
2334        processor
2335            .expect_process()
2336            .returning(move |_| mock_process(&mut i, &[1, 2, 3, 4])); // All items fail processing
2337
2338        let mut writer = MockTestItemWriter::default();
2339        writer.expect_open().times(1).returning(|| Ok(()));
2340        writer.expect_write().never(); // Empty chunks are not written
2341        writer.expect_close().times(1).returning(|| Ok(()));
2342
2343        let step = StepBuilder::new("test")
2344            .chunk(3)
2345            .reader(&reader)
2346            .processor(&processor)
2347            .writer(&writer)
2348            .skip_limit(3) // Allow all errors
2349            .build();
2350
2351        let mut step_execution = StepExecution::new(&step.name);
2352
2353        let result = step.execute(&mut step_execution);
2354
2355        assert!(result.is_ok());
2356        assert_eq!(step_execution.status, StepStatus::Success);
2357        assert_eq!(step_execution.process_error_count, 3);
2358        assert_eq!(step_execution.write_count, 0); // No items written
2359
2360        Ok(())
2361    }
2362
2363    #[test]
2364    fn chunk_status_should_be_comparable() {
2365        assert_eq!(ChunkStatus::Finished, ChunkStatus::Finished);
2366        assert_eq!(ChunkStatus::Full, ChunkStatus::Full);
2367        assert_ne!(ChunkStatus::Finished, ChunkStatus::Full);
2368    }
2369
2370    #[test]
2371    fn step_status_should_be_comparable() {
2372        assert_eq!(StepStatus::Success, StepStatus::Success);
2373        assert_eq!(StepStatus::ReadError, StepStatus::ReadError);
2374        assert_eq!(StepStatus::ProcessorError, StepStatus::ProcessorError);
2375        assert_eq!(StepStatus::WriteError, StepStatus::WriteError);
2376        assert_eq!(StepStatus::Starting, StepStatus::Starting);
2377
2378        assert_ne!(StepStatus::Success, StepStatus::ReadError);
2379        assert_ne!(StepStatus::ProcessorError, StepStatus::WriteError);
2380    }
2381
2382    #[test]
2383    fn repeat_status_should_be_comparable() {
2384        assert_eq!(RepeatStatus::Continuable, RepeatStatus::Continuable);
2385        assert_eq!(RepeatStatus::Finished, RepeatStatus::Finished);
2386        assert_ne!(RepeatStatus::Continuable, RepeatStatus::Finished);
2387    }
2388
2389    #[test]
2390    fn step_builder_should_create_chunk_oriented_step() -> Result<()> {
2391        let mut reader = MockTestItemReader::default();
2392        reader.expect_read().return_once(|| Ok(None));
2393
2394        let mut processor = MockTestProcessor::default();
2395        processor.expect_process().never();
2396
2397        let mut writer = MockTestItemWriter::default();
2398        writer.expect_open().times(1).returning(|| Ok(()));
2399        writer.expect_write().never();
2400        writer.expect_close().times(1).returning(|| Ok(()));
2401
2402        let step = StepBuilder::new("builder_test")
2403            .chunk(5)
2404            .reader(&reader)
2405            .processor(&processor)
2406            .writer(&writer)
2407            .skip_limit(10)
2408            .build();
2409
2410        let mut step_execution = StepExecution::new(&step.name);
2411        let result = step.execute(&mut step_execution);
2412
2413        assert!(result.is_ok());
2414        assert_eq!(step.get_name(), "builder_test");
2415
2416        Ok(())
2417    }
2418
2419    #[test]
2420    fn step_should_handle_large_chunk_size() -> Result<()> {
2421        let mut i = 0;
2422        let mut reader = MockTestItemReader::default();
2423        reader
2424            .expect_read()
2425            .returning(move || mock_read(&mut i, 0, 5));
2426
2427        let mut processor = MockTestProcessor::default();
2428        let mut i = 0;
2429        processor
2430            .expect_process()
2431            .returning(move |_| mock_process(&mut i, &[]));
2432
2433        let mut writer = MockTestItemWriter::default();
2434        writer.expect_open().times(1).returning(|| Ok(()));
2435        writer.expect_write().times(1).returning(|items| {
2436            assert_eq!(items.len(), 5); // All items in one chunk
2437            Ok(())
2438        });
2439        writer.expect_flush().times(1).returning(|| Ok(()));
2440        writer.expect_close().times(1).returning(|| Ok(()));
2441
2442        let step = StepBuilder::new("test")
2443            .chunk(100) // Chunk size larger than available items
2444            .reader(&reader)
2445            .processor(&processor)
2446            .writer(&writer)
2447            .build();
2448
2449        let mut step_execution = StepExecution::new(&step.name);
2450
2451        let result = step.execute(&mut step_execution);
2452
2453        assert!(result.is_ok());
2454        assert_eq!(step_execution.status, StepStatus::Success);
2455        assert_eq!(step_execution.read_count, 5);
2456        assert_eq!(step_execution.write_count, 5);
2457
2458        Ok(())
2459    }
2460
2461    #[test]
2462    fn step_should_handle_mixed_errors_within_skip_limit() -> Result<()> {
2463        use std::cell::Cell;
2464
2465        let read_counter = Cell::new(0u16);
2466        let mut reader = MockTestItemReader::default();
2467        reader.expect_read().returning(move || {
2468            let current = read_counter.get();
2469            if current == 2 {
2470                read_counter.set(current + 1);
2471                Err(BatchError::ItemReader("read error".to_string()))
2472            } else {
2473                let mut i = current;
2474                let result = mock_read(&mut i, 0, 6);
2475                read_counter.set(i);
2476                result
2477            }
2478        });
2479
2480        let mut processor = MockTestProcessor::default();
2481        let mut i = 0;
2482        processor
2483            .expect_process()
2484            .returning(move |_| mock_process(&mut i, &[2])); // 1 process error
2485
2486        let mut writer = MockTestItemWriter::default();
2487        writer.expect_open().times(1).returning(|| Ok(()));
2488        writer.expect_write().times(2).returning(|_| Ok(()));
2489        writer.expect_flush().times(2).returning(|| Ok(()));
2490        writer.expect_close().times(1).returning(|| Ok(()));
2491
2492        let step = StepBuilder::new("test")
2493            .chunk(3)
2494            .reader(&reader)
2495            .processor(&processor)
2496            .writer(&writer)
2497            .skip_limit(2) // Allow 1 read error + 1 process error
2498            .build();
2499
2500        let mut step_execution = StepExecution::new(&step.name);
2501
2502        let result = step.execute(&mut step_execution);
2503
2504        assert!(result.is_ok());
2505        assert_eq!(step_execution.status, StepStatus::Success);
2506        assert_eq!(step_execution.read_error_count, 1);
2507        assert_eq!(step_execution.process_error_count, 1);
2508
2509        Ok(())
2510    }
2511
2512    #[test]
2513    fn step_execution_should_be_cloneable() -> Result<()> {
2514        let step_execution = StepExecution::new("test_step");
2515        let cloned_execution = step_execution.clone();
2516
2517        assert_eq!(step_execution.id, cloned_execution.id);
2518        assert_eq!(step_execution.name, cloned_execution.name);
2519        assert_eq!(step_execution.status, cloned_execution.status);
2520        assert_eq!(step_execution.read_count, cloned_execution.read_count);
2521        assert_eq!(step_execution.write_count, cloned_execution.write_count);
2522
2523        Ok(())
2524    }
2525
2526    #[test]
2527    fn step_should_handle_zero_chunk_size() -> Result<()> {
2528        let mut reader = MockTestItemReader::default();
2529        reader.expect_read().return_once(|| Ok(None));
2530
2531        let mut processor = MockTestProcessor::default();
2532        processor.expect_process().never();
2533
2534        let mut writer = MockTestItemWriter::default();
2535        writer.expect_open().times(1).returning(|| Ok(()));
2536        writer.expect_write().never();
2537        writer.expect_close().times(1).returning(|| Ok(()));
2538
2539        // Test with chunk size of 1 (minimum practical value)
2540        let step = StepBuilder::new("test")
2541            .chunk(1)
2542            .reader(&reader)
2543            .processor(&processor)
2544            .writer(&writer)
2545            .build();
2546
2547        let mut step_execution = StepExecution::new(&step.name);
2548
2549        let result = step.execute(&mut step_execution);
2550
2551        assert!(result.is_ok());
2552        assert_eq!(step_execution.status, StepStatus::Success);
2553
2554        Ok(())
2555    }
2556
2557    #[test]
2558    fn step_should_handle_continuous_read_errors_until_skip_limit() -> Result<()> {
2559        use std::cell::Cell;
2560
2561        let counter = Cell::new(0u16);
2562        let mut reader = MockTestItemReader::default();
2563        reader.expect_read().returning(move || {
2564            let current = counter.get();
2565            counter.set(current + 1);
2566            if current < 3 {
2567                Err(BatchError::ItemReader("continuous read error".to_string()))
2568            } else {
2569                Ok(None) // End of data after errors
2570            }
2571        });
2572
2573        let mut processor = MockTestProcessor::default();
2574        processor.expect_process().never();
2575
2576        let mut writer = MockTestItemWriter::default();
2577        writer.expect_open().times(1).returning(|| Ok(()));
2578        writer.expect_write().never();
2579        writer.expect_close().times(1).returning(|| Ok(()));
2580
2581        let step = StepBuilder::new("test")
2582            .chunk(3)
2583            .reader(&reader)
2584            .processor(&processor)
2585            .writer(&writer)
2586            .skip_limit(2) // Should fail after 3 errors (exceeds limit of 2)
2587            .build();
2588
2589        let mut step_execution = StepExecution::new(&step.name);
2590
2591        let result = step.execute(&mut step_execution);
2592
2593        assert!(result.is_err());
2594        assert_eq!(step_execution.status, StepStatus::ReadError);
2595        assert_eq!(step_execution.read_error_count, 3);
2596
2597        Ok(())
2598    }
2599
2600    #[test]
2601    fn step_should_handle_write_error_with_skip_limit() -> Result<()> {
2602        let mut i = 0;
2603        let mut reader = MockTestItemReader::default();
2604        reader
2605            .expect_read()
2606            .returning(move || mock_read(&mut i, 0, 4));
2607
2608        let mut processor = MockTestProcessor::default();
2609        let mut i = 0;
2610        processor
2611            .expect_process()
2612            .returning(move |_| mock_process(&mut i, &[]));
2613
2614        let mut writer = MockTestItemWriter::default();
2615        writer.expect_open().times(1).returning(|| Ok(()));
2616        writer
2617            .expect_write()
2618            .times(1)
2619            .returning(|_| Err(BatchError::ItemWriter("write error".to_string())));
2620        writer.expect_close().times(1).returning(|| Ok(()));
2621
2622        let step = StepBuilder::new("test")
2623            .chunk(3)
2624            .reader(&reader)
2625            .processor(&processor)
2626            .writer(&writer)
2627            .skip_limit(0) // No tolerance for errors
2628            .build();
2629
2630        let mut step_execution = StepExecution::new(&step.name);
2631
2632        let result = step.execute(&mut step_execution);
2633
2634        assert!(result.is_err());
2635        assert_eq!(step_execution.status, StepStatus::WriteError);
2636        assert_eq!(step_execution.write_error_count, 3); // All items in chunk failed
2637
2638        Ok(())
2639    }
2640
2641    #[test]
2642    fn step_should_succeed_when_write_error_within_skip_limit() -> Result<()> {
2643        let mut i = 0;
2644        let mut reader = MockTestItemReader::default();
2645        reader
2646            .expect_read()
2647            .returning(move || mock_read(&mut i, 0, 3));
2648
2649        let mut processor = MockTestProcessor::default();
2650        let mut i = 0;
2651        processor
2652            .expect_process()
2653            .returning(move |_| mock_process(&mut i, &[]));
2654
2655        let mut writer = MockTestItemWriter::default();
2656        writer.expect_open().times(1).returning(|| Ok(()));
2657        writer
2658            .expect_write()
2659            .times(1)
2660            .returning(|_| Err(BatchError::ItemWriter("write error".to_string())));
2661        writer.expect_close().times(1).returning(|| Ok(()));
2662
2663        let step = StepBuilder::new("test")
2664            .chunk(3)
2665            .reader(&reader)
2666            .processor(&processor)
2667            .writer(&writer)
2668            .skip_limit(3) // Exactly at limit: 3 write errors <= skip_limit(3), step continues
2669            .build();
2670
2671        let mut step_execution = StepExecution::new(&step.name);
2672
2673        let result = step.execute(&mut step_execution);
2674
2675        assert!(result.is_ok());
2676        assert_eq!(step_execution.status, StepStatus::Success);
2677        assert_eq!(step_execution.write_error_count, 3);
2678        assert_eq!(step_execution.write_count, 0); // No successful writes
2679
2680        Ok(())
2681    }
2682
2683    #[test]
2684    fn step_should_handle_partial_chunk_at_end() -> Result<()> {
2685        let mut i = 0;
2686        let mut reader = MockTestItemReader::default();
2687        reader
2688            .expect_read()
2689            .returning(move || mock_read(&mut i, 0, 2)); // Only 2 items, chunk size is 3
2690
2691        let mut processor = MockTestProcessor::default();
2692        let mut i = 0;
2693        processor
2694            .expect_process()
2695            .returning(move |_| mock_process(&mut i, &[]));
2696
2697        let mut writer = MockTestItemWriter::default();
2698        writer.expect_open().times(1).returning(|| Ok(()));
2699        writer.expect_write().times(1).returning(|items| {
2700            assert_eq!(items.len(), 2); // Partial chunk with 2 items
2701            Ok(())
2702        });
2703        writer.expect_flush().times(1).returning(|| Ok(()));
2704        writer.expect_close().times(1).returning(|| Ok(()));
2705
2706        let step = StepBuilder::new("test")
2707            .chunk(3)
2708            .reader(&reader)
2709            .processor(&processor)
2710            .writer(&writer)
2711            .build();
2712
2713        let mut step_execution = StepExecution::new(&step.name);
2714
2715        let result = step.execute(&mut step_execution);
2716
2717        assert!(result.is_ok());
2718        assert_eq!(step_execution.status, StepStatus::Success);
2719        assert_eq!(step_execution.read_count, 2);
2720        assert_eq!(step_execution.write_count, 2);
2721
2722        Ok(())
2723    }
2724
2725    #[test]
2726    fn batch_status_should_have_all_variants() {
2727        // Test that all BatchStatus variants exist and can be created
2728        let _completed = BatchStatus::COMPLETED;
2729        let _starting = BatchStatus::STARTING;
2730        let _started = BatchStatus::STARTED;
2731        let _stopping = BatchStatus::STOPPING;
2732        let _stopped = BatchStatus::STOPPED;
2733        let _failed = BatchStatus::FAILED;
2734        let _abandoned = BatchStatus::ABANDONED;
2735        let _unknown = BatchStatus::UNKNOWN;
2736    }
2737
2738    #[test]
2739    fn tasklet_builder_should_require_tasklet() {
2740        let mut tasklet = MockTestTasklet::default();
2741        tasklet.expect_execute().never();
2742
2743        // This test documents that the builder panics if tasklet is not set
2744        // In a real scenario, this would be caught at compile time or runtime
2745        let builder = TaskletBuilder::new("test").tasklet(&tasklet);
2746        let _step = builder.build(); // Should not panic with tasklet set
2747    }
2748
2749    #[test]
2750    fn chunk_oriented_step_builder_should_require_all_components() -> Result<()> {
2751        let mut reader = MockTestItemReader::default();
2752        reader.expect_read().return_once(|| Ok(None));
2753
2754        let mut processor = MockTestProcessor::default();
2755        processor.expect_process().never();
2756
2757        let mut writer = MockTestItemWriter::default();
2758        writer.expect_open().times(1).returning(|| Ok(()));
2759        writer.expect_write().never();
2760        writer.expect_close().times(1).returning(|| Ok(()));
2761
2762        // Test that builder works with all required components
2763        let step = ChunkOrientedStepBuilder::new("test")
2764            .reader(&reader)
2765            .processor(&processor)
2766            .writer(&writer)
2767            .chunk_size(10)
2768            .skip_limit(5)
2769            .build();
2770
2771        let mut step_execution = StepExecution::new(&step.name);
2772        let result = step.execute(&mut step_execution);
2773
2774        assert!(result.is_ok());
2775        assert_eq!(step.get_name(), "test");
2776
2777        Ok(())
2778    }
2779
2780    #[test]
2781    fn step_should_handle_maximum_skip_limit() -> Result<()> {
2782        let mut i = 0;
2783        let mut reader = MockTestItemReader::default();
2784        reader
2785            .expect_read()
2786            .returning(move || mock_read(&mut i, 0, 3)); // Only 3 items to match chunk size
2787
2788        let mut processor = MockTestProcessor::default();
2789        let mut i = 0;
2790        processor
2791            .expect_process()
2792            .returning(move |_| mock_process(&mut i, &[1, 2, 3])); // All items fail
2793
2794        let mut writer = MockTestItemWriter::default();
2795        writer.expect_open().times(1).returning(|| Ok(()));
2796        writer.expect_write().never(); // No items to write since all fail processing
2797        writer.expect_close().times(1).returning(|| Ok(()));
2798
2799        let step = StepBuilder::new("test")
2800            .chunk(3)
2801            .reader(&reader)
2802            .processor(&processor)
2803            .writer(&writer)
2804            .skip_limit(u16::MAX) // Maximum skip limit
2805            .build();
2806
2807        let mut step_execution = StepExecution::new(&step.name);
2808
2809        let result = step.execute(&mut step_execution);
2810
2811        assert!(result.is_ok());
2812        assert_eq!(step_execution.status, StepStatus::Success);
2813        assert_eq!(step_execution.process_error_count, 3);
2814
2815        Ok(())
2816    }
2817
2818    #[test]
2819    fn step_should_handle_tasklet_step_timing() -> Result<()> {
2820        let mut tasklet = MockTestTasklet::default();
2821        tasklet
2822            .expect_execute()
2823            .times(1)
2824            .returning(|_| Ok(RepeatStatus::Finished));
2825
2826        let step = StepBuilder::new("timing_test").tasklet(&tasklet).build();
2827
2828        let mut step_execution = StepExecution::new(&step.name);
2829
2830        let result = step.execute(&mut step_execution);
2831
2832        assert!(result.is_ok());
2833        assert!(step_execution.start_time.is_some());
2834        assert!(step_execution.end_time.is_some());
2835        assert!(step_execution.duration.is_some());
2836        assert!(step_execution.duration.unwrap().as_nanos() > 0);
2837
2838        Ok(())
2839    }
2840
2841    #[test]
2842    fn step_should_handle_tasklet_step_status_transitions() -> Result<()> {
2843        let mut tasklet = MockTestTasklet::default();
2844        tasklet
2845            .expect_execute()
2846            .times(1)
2847            .returning(|step_execution| {
2848                // Verify the step status is Started when tasklet is called
2849                assert_eq!(step_execution.status, StepStatus::Started);
2850                Ok(RepeatStatus::Finished)
2851            });
2852
2853        let step = StepBuilder::new("status_test").tasklet(&tasklet).build();
2854
2855        let mut step_execution = StepExecution::new(&step.name);
2856        assert_eq!(step_execution.status, StepStatus::Starting);
2857
2858        let result = step.execute(&mut step_execution);
2859
2860        assert!(result.is_ok());
2861        assert_eq!(step_execution.status, StepStatus::Success);
2862
2863        Ok(())
2864    }
2865
2866    #[test]
2867    fn step_should_handle_tasklet_step_failed_status() -> Result<()> {
2868        let mut tasklet = MockTestTasklet::default();
2869        tasklet
2870            .expect_execute()
2871            .times(1)
2872            .returning(|_| Err(BatchError::Step("tasklet failure".to_string())));
2873
2874        let step = StepBuilder::new("failed_test").tasklet(&tasklet).build();
2875
2876        let mut step_execution = StepExecution::new(&step.name);
2877
2878        let result = step.execute(&mut step_execution);
2879
2880        assert!(result.is_err());
2881        assert_eq!(step_execution.status, StepStatus::Failed);
2882        assert!(step_execution.end_time.is_some());
2883        assert!(step_execution.duration.is_some());
2884
2885        Ok(())
2886    }
2887
2888    #[test]
2889    fn chunk_oriented_step_builder_should_panic_without_reader() {
2890        let mut processor = MockTestProcessor::default();
2891        processor.expect_process().never();
2892
2893        let mut writer = MockTestItemWriter::default();
2894        writer.expect_open().never();
2895
2896        let result = std::panic::catch_unwind(|| {
2897            ChunkOrientedStepBuilder::new("test")
2898                .processor(&processor)
2899                .writer(&writer)
2900                .build()
2901        });
2902
2903        assert!(result.is_err());
2904    }
2905
2906    #[test]
2907    fn chunk_oriented_step_builder_should_panic_without_processor() {
2908        let mut reader = MockTestItemReader::default();
2909        reader.expect_read().never();
2910
2911        let mut writer = MockTestItemWriter::default();
2912        writer.expect_open().never();
2913
2914        let result = std::panic::catch_unwind(|| {
2915            ChunkOrientedStepBuilder::new("test")
2916                .reader(&reader)
2917                .writer(&writer)
2918                .build()
2919        });
2920
2921        assert!(result.is_err());
2922    }
2923
2924    #[test]
2925    fn chunk_oriented_step_builder_should_panic_without_writer() {
2926        let mut reader = MockTestItemReader::default();
2927        reader.expect_read().never();
2928
2929        let mut processor = MockTestProcessor::default();
2930        processor.expect_process().never();
2931
2932        let result = std::panic::catch_unwind(|| {
2933            ChunkOrientedStepBuilder::new("test")
2934                .reader(&reader)
2935                .processor(&processor)
2936                .build()
2937        });
2938
2939        assert!(result.is_err());
2940    }
2941
2942    #[test]
2943    fn step_should_handle_read_chunk_with_full_chunk() -> Result<()> {
2944        let mut i = 0;
2945        let mut reader = MockTestItemReader::default();
2946        reader
2947            .expect_read()
2948            .returning(move || mock_read(&mut i, 0, 4)); // 4 items total
2949
2950        let mut processor = MockTestProcessor::default();
2951        let mut i = 0;
2952        processor
2953            .expect_process()
2954            .returning(move |_| mock_process(&mut i, &[]));
2955
2956        let mut writer = MockTestItemWriter::default();
2957        writer.expect_open().times(1).returning(|| Ok(()));
2958        writer.expect_write().times(2).returning(|items| {
2959            // First chunk has 3 items, second chunk has 1 item
2960            assert!(items.len() <= 3);
2961            Ok(())
2962        });
2963        writer.expect_flush().times(2).returning(|| Ok(()));
2964        writer.expect_close().times(1).returning(|| Ok(()));
2965
2966        let step = StepBuilder::new("test")
2967            .chunk(3)
2968            .reader(&reader)
2969            .processor(&processor)
2970            .writer(&writer)
2971            .build();
2972
2973        let mut step_execution = StepExecution::new(&step.name);
2974
2975        let result = step.execute(&mut step_execution);
2976
2977        assert!(result.is_ok());
2978        assert_eq!(step_execution.status, StepStatus::Success);
2979        assert_eq!(step_execution.read_count, 4);
2980        assert_eq!(step_execution.write_count, 4);
2981
2982        Ok(())
2983    }
2984
2985    #[test]
2986    fn step_should_handle_process_chunk_with_all_errors() -> Result<()> {
2987        let mut i = 0;
2988        let mut reader = MockTestItemReader::default();
2989        reader
2990            .expect_read()
2991            .returning(move || mock_read(&mut i, 0, 3));
2992
2993        let mut processor = MockTestProcessor::default();
2994        let mut i = 0;
2995        processor
2996            .expect_process()
2997            .returning(move |_| mock_process(&mut i, &[1, 2, 3])); // All items fail
2998
2999        let mut writer = MockTestItemWriter::default();
3000        writer.expect_open().times(1).returning(|| Ok(()));
3001        writer.expect_write().never(); // No items to write since all fail processing
3002        writer.expect_close().times(1).returning(|| Ok(()));
3003
3004        let step = StepBuilder::new("test")
3005            .chunk(3)
3006            .reader(&reader)
3007            .processor(&processor)
3008            .writer(&writer)
3009            .skip_limit(5) // Allow all errors
3010            .build();
3011
3012        let mut step_execution = StepExecution::new(&step.name);
3013
3014        let result = step.execute(&mut step_execution);
3015
3016        assert!(result.is_ok());
3017        assert_eq!(step_execution.status, StepStatus::Success);
3018        assert_eq!(step_execution.process_error_count, 3);
3019        assert_eq!(step_execution.write_count, 0);
3020
3021        Ok(())
3022    }
3023
3024    #[test]
3025    fn step_should_handle_write_chunk_with_empty_items() -> Result<()> {
3026        let mut reader = MockTestItemReader::default();
3027        reader.expect_read().return_once(|| Ok(None));
3028
3029        let mut processor = MockTestProcessor::default();
3030        processor.expect_process().never();
3031
3032        let mut writer = MockTestItemWriter::default();
3033        writer.expect_open().times(1).returning(|| Ok(()));
3034        writer.expect_write().never(); // No items to write
3035        writer.expect_close().times(1).returning(|| Ok(()));
3036
3037        let step = StepBuilder::new("test")
3038            .chunk(3)
3039            .reader(&reader)
3040            .processor(&processor)
3041            .writer(&writer)
3042            .build();
3043
3044        let mut step_execution = StepExecution::new(&step.name);
3045
3046        let result = step.execute(&mut step_execution);
3047
3048        assert!(result.is_ok());
3049        assert_eq!(step_execution.status, StepStatus::Success);
3050        assert_eq!(step_execution.read_count, 0);
3051        assert_eq!(step_execution.write_count, 0);
3052
3053        Ok(())
3054    }
3055
3056    #[test]
3057    fn step_should_handle_is_skip_limit_reached_boundary_conditions() -> Result<()> {
3058        let mut i = 0;
3059        let mut reader = MockTestItemReader::default();
3060        reader
3061            .expect_read()
3062            .returning(move || mock_read(&mut i, 0, 4));
3063
3064        let mut processor = MockTestProcessor::default();
3065        let mut i = 0;
3066        processor
3067            .expect_process()
3068            .returning(move |_| mock_process(&mut i, &[1, 2])); // 2 errors
3069
3070        let mut writer = MockTestItemWriter::default();
3071        writer.expect_open().times(1).returning(|| Ok(()));
3072        writer.expect_write().times(2).returning(|_| Ok(()));
3073        writer.expect_flush().times(2).returning(|| Ok(()));
3074        writer.expect_close().times(1).returning(|| Ok(()));
3075
3076        let step = StepBuilder::new("test")
3077            .chunk(3)
3078            .reader(&reader)
3079            .processor(&processor)
3080            .writer(&writer)
3081            .skip_limit(2) // Exactly at the limit
3082            .build();
3083
3084        let mut step_execution = StepExecution::new(&step.name);
3085
3086        let result = step.execute(&mut step_execution);
3087
3088        assert!(result.is_ok());
3089        assert_eq!(step_execution.status, StepStatus::Success);
3090        assert_eq!(step_execution.process_error_count, 2);
3091
3092        Ok(())
3093    }
3094
3095    #[test]
3096    fn step_should_handle_manage_error_with_various_errors() -> Result<()> {
3097        let mut reader = MockTestItemReader::default();
3098        reader.expect_read().return_once(|| Ok(None));
3099
3100        let mut processor = MockTestProcessor::default();
3101        processor.expect_process().never();
3102
3103        let mut writer = MockTestItemWriter::default();
3104        writer
3105            .expect_open()
3106            .times(1)
3107            .returning(|| Err(BatchError::ItemWriter("open error".to_string())));
3108        writer.expect_write().never();
3109        writer
3110            .expect_close()
3111            .times(1)
3112            .returning(|| Err(BatchError::ItemWriter("close error".to_string())));
3113
3114        let step = StepBuilder::new("test")
3115            .chunk(3)
3116            .reader(&reader)
3117            .processor(&processor)
3118            .writer(&writer)
3119            .build();
3120
3121        let mut step_execution = StepExecution::new(&step.name);
3122
3123        let result = step.execute(&mut step_execution);
3124
3125        // Should still succeed as open/close errors are managed
3126        assert!(result.is_ok());
3127        assert_eq!(step_execution.status, StepStatus::Success);
3128
3129        Ok(())
3130    }
3131
3132    #[test]
3133    fn step_execution_should_have_unique_ids() -> Result<()> {
3134        let step_execution1 = StepExecution::new("test1");
3135        let step_execution2 = StepExecution::new("test2");
3136
3137        assert_ne!(step_execution1.id, step_execution2.id);
3138
3139        Ok(())
3140    }
3141
3142    #[test]
3143    fn step_execution_should_clone_with_same_values() -> Result<()> {
3144        let mut step_execution = StepExecution::new("test_step");
3145        step_execution.read_count = 10;
3146        step_execution.write_count = 8;
3147        step_execution.status = StepStatus::Success;
3148
3149        let cloned_execution = step_execution.clone();
3150
3151        assert_eq!(step_execution.id, cloned_execution.id);
3152        assert_eq!(step_execution.name, cloned_execution.name);
3153        assert_eq!(step_execution.status, cloned_execution.status);
3154        assert_eq!(step_execution.read_count, cloned_execution.read_count);
3155        assert_eq!(step_execution.write_count, cloned_execution.write_count);
3156
3157        Ok(())
3158    }
3159
3160    #[test]
3161    fn step_status_should_support_copy_trait() {
3162        let status1 = StepStatus::Success;
3163        let status2 = status1; // This should work because StepStatus implements Copy
3164
3165        assert_eq!(status1, status2);
3166        assert_eq!(status1, StepStatus::Success); // Original should still be usable
3167    }
3168
3169    #[test]
3170    fn step_status_should_support_debug_trait() {
3171        let status = StepStatus::ProcessorError;
3172        let debug_string = format!("{:?}", status);
3173
3174        assert!(debug_string.contains("ProcessorError"));
3175    }
3176
3177    #[test]
3178    fn chunk_status_should_support_debug_trait() {
3179        let status = ChunkStatus::Full;
3180        let debug_string = format!("{:?}", status);
3181
3182        assert!(debug_string.contains("Full"));
3183    }
3184
3185    #[test]
3186    fn repeat_status_should_support_debug_trait() {
3187        let status = RepeatStatus::Continuable;
3188        let debug_string = format!("{:?}", status);
3189
3190        assert!(debug_string.contains("Continuable"));
3191    }
3192}