Skip to main content

spring_batch_rs/core/
step.rs

1//! # Step Module
2//!
3//! This module provides the core step execution functionality for the Spring Batch framework.
4//! A step represents a single phase of a batch job that processes data in chunks or executes
5//! a single task (tasklet).
6//!
7//! ## Overview
8//!
9//! The step module supports two main execution patterns:
10//!
11//! ### Chunk-Oriented Processing
12//! Processes data in configurable chunks using the read-process-write pattern:
13//! - **Reader**: Reads items from a data source
14//! - **Processor**: Transforms items (optional)
15//! - **Writer**: Writes processed items to a destination
16//!
17//! ### Tasklet Processing
18//! Executes a single task or operation that doesn't follow the chunk pattern.
19//!
20//! ## Key Features
21//!
22//! - **Error Handling**: Configurable skip limits for fault tolerance
23//! - **Metrics Tracking**: Comprehensive execution statistics
24//! - **Lifecycle Management**: Proper resource management with open/close operations
25//! - **Builder Pattern**: Fluent API for step configuration
26//!
27//! ## Examples
28//!
29//! ### Basic Chunk-Oriented Step
30//!
31//! ```rust
32//! use spring_batch_rs::core::step::{StepBuilder, StepExecution, Step};
33//! use spring_batch_rs::core::item::{ItemReader, ItemProcessor, ItemWriter};
34//! use spring_batch_rs::BatchError;
35//!
36//! // Implement your reader, processor, and writer
37//! # struct MyReader;
38//! # impl ItemReader<String> for MyReader {
39//! #     fn read(&self) -> Result<Option<String>, BatchError> { Ok(None) }
40//! # }
41//! # struct MyProcessor;
42//! # impl ItemProcessor<String, String> for MyProcessor {
43//! #     fn process(&self, item: &String) -> Result<Option<String>, BatchError> { Ok(Some(item.clone())) }
44//! # }
45//! # struct MyWriter;
46//! # impl ItemWriter<String> for MyWriter {
47//! #     fn write(&self, items: &[String]) -> Result<(), BatchError> { Ok(()) }
48//! #     fn flush(&self) -> Result<(), BatchError> { Ok(()) }
49//! #     fn open(&self) -> Result<(), BatchError> { Ok(()) }
50//! #     fn close(&self) -> Result<(), BatchError> { Ok(()) }
51//! # }
52//!
53//! let reader = MyReader;
54//! let processor = MyProcessor;
55//! let writer = MyWriter;
56//!
57//! let step = StepBuilder::new("my-step")
58//!     .chunk(100)                    // Process 100 items per chunk
59//!     .reader(&reader)
60//!     .processor(&processor)
61//!     .writer(&writer)
62//!     .skip_limit(10)               // Allow up to 10 errors
63//!     .build();
64//!
65//! let mut step_execution = StepExecution::new(step.get_name());
66//! let result = step.execute(&mut step_execution);
67//! ```
68//!
69//! ### Tasklet Step
70//!
71//! ```rust
72//! use spring_batch_rs::core::step::{StepBuilder, StepExecution, RepeatStatus, Step, Tasklet};
73//! use spring_batch_rs::BatchError;
74//!
75//! # struct MyTasklet;
76//! # impl Tasklet for MyTasklet {
77//! #     fn execute(&self, _step_execution: &StepExecution) -> Result<RepeatStatus, BatchError> {
78//! #         Ok(RepeatStatus::Finished)
79//! #     }
80//! # }
81//!
82//! let tasklet = MyTasklet;
83//!
84//! let step = StepBuilder::new("my-tasklet-step")
85//!     .tasklet(&tasklet)
86//!     .build();
87//!
88//! let mut step_execution = StepExecution::new(step.get_name());
89//! let result = step.execute(&mut step_execution);
90//! ```
91
92use crate::BatchError;
93use log::{debug, error, info, warn};
94use std::time::{Duration, Instant};
95use uuid::Uuid;
96
97use super::item::{ItemProcessor, ItemReader, ItemWriter};
98
99/// A tasklet represents a single task or operation that can be executed as part of a step.
100///
101/// Tasklets are useful for operations that don't fit the chunk-oriented processing model,
102/// such as file operations, database maintenance, or custom business logic.
103///
104/// # Examples
105///
106/// ```rust
107/// use spring_batch_rs::core::step::{StepExecution, RepeatStatus};
108/// use spring_batch_rs::BatchError;
109///
110/// use spring_batch_rs::core::step::Tasklet;
111///
112/// struct FileCleanupTasklet {
113///     directory: String,
114/// }
115///
116/// impl Tasklet for FileCleanupTasklet {
117///     fn execute(&self, _step_execution: &StepExecution) -> Result<RepeatStatus, BatchError> {
118///         // Perform file cleanup logic here
119///         println!("Cleaning up directory: {}", self.directory);
120///         Ok(RepeatStatus::Finished)
121///     }
122/// }
123/// ```
124pub trait Tasklet {
125    /// Executes the tasklet operation.
126    ///
127    /// # Parameters
128    /// - `step_execution`: The current step execution context for accessing metrics and state
129    ///
130    /// # Returns
131    /// - `Ok(RepeatStatus)`: The tasklet completed successfully
132    /// - `Err(BatchError)`: An error occurred during execution
133    fn execute(&self, step_execution: &StepExecution) -> Result<RepeatStatus, BatchError>;
134}
135
136/// A step implementation that executes a single tasklet.
137///
138/// TaskletStep is used for operations that don't follow the chunk-oriented processing pattern.
139/// It executes a single tasklet and manages the step lifecycle.
140///
141/// # Examples
142///
143/// ```rust
144/// use spring_batch_rs::core::step::{StepBuilder, StepExecution, RepeatStatus, Tasklet};
145/// use spring_batch_rs::BatchError;
146///
147/// # struct MyTasklet;
148/// # impl Tasklet for MyTasklet {
149/// #     fn execute(&self, _step_execution: &StepExecution) -> Result<RepeatStatus, BatchError> {
150/// #         Ok(RepeatStatus::Finished)
151/// #     }
152/// # }
153/// let tasklet = MyTasklet;
154/// let step = StepBuilder::new("tasklet-step")
155///     .tasklet(&tasklet)
156///     .build();
157/// ```
158pub struct TaskletStep<'a> {
159    name: String,
160    tasklet: &'a dyn Tasklet,
161}
162
163impl Step for TaskletStep<'_> {
164    fn execute(&self, step_execution: &mut StepExecution) -> Result<(), BatchError> {
165        step_execution.status = StepStatus::Started;
166        let start_time = Instant::now();
167
168        info!(
169            "Start of step: {}, id: {}",
170            step_execution.name, step_execution.id
171        );
172
173        loop {
174            let result = self.tasklet.execute(step_execution);
175            match result {
176                Ok(RepeatStatus::Continuable) => {}
177                Ok(RepeatStatus::Finished) => {
178                    step_execution.status = StepStatus::Success;
179                    break;
180                }
181                Err(e) => {
182                    error!(
183                        "Error in step: {}, id: {}, error: {}",
184                        step_execution.name, step_execution.id, e
185                    );
186                    step_execution.status = StepStatus::Failed;
187                    step_execution.end_time = Some(Instant::now());
188                    step_execution.duration = Some(start_time.elapsed());
189                    return Err(e);
190                }
191            }
192        }
193
194        // Calculate the step execution details
195        step_execution.start_time = Some(start_time);
196        step_execution.end_time = Some(Instant::now());
197        step_execution.duration = Some(start_time.elapsed());
198
199        Ok(())
200    }
201
202    fn get_name(&self) -> &str {
203        &self.name
204    }
205}
206
207/// Builder for creating TaskletStep instances.
208///
209/// Provides a fluent API for configuring tasklet steps with validation
210/// to ensure all required components are provided.
211///
212/// # Examples
213///
214/// ```rust
215/// use spring_batch_rs::core::step::{TaskletBuilder, Tasklet, RepeatStatus, StepExecution};
216/// use spring_batch_rs::BatchError;
217///
218/// # struct MyTasklet;
219/// # impl Tasklet for MyTasklet {
220/// #     fn execute(&self, _step_execution: &StepExecution) -> Result<RepeatStatus, BatchError> {
221/// #         Ok(RepeatStatus::Finished)
222/// #     }
223/// # }
224///
225/// let tasklet = MyTasklet;
226/// let builder = TaskletBuilder::new("my-tasklet")
227///     .tasklet(&tasklet);
228/// let step = builder.build();
229/// ```
230pub struct TaskletBuilder<'a> {
231    name: String,
232    tasklet: Option<&'a dyn Tasklet>,
233}
234
235impl<'a> TaskletBuilder<'a> {
236    /// Creates a new TaskletBuilder with the specified name.
237    ///
238    /// # Parameters
239    /// - `name`: Human-readable name for the step
240    ///
241    /// # Examples
242    ///
243    /// ```rust
244    /// use spring_batch_rs::core::step::TaskletBuilder;
245    ///
246    /// let builder = TaskletBuilder::new("file-cleanup-step");
247    /// ```
248    pub fn new(name: &str) -> Self {
249        Self {
250            name: name.to_string(),
251            tasklet: None,
252        }
253    }
254
255    /// Sets the tasklet to be executed by this step.
256    ///
257    /// # Parameters
258    /// - `tasklet`: The tasklet implementation to execute
259    ///
260    /// # Examples
261    ///
262    /// ```rust
263    /// use spring_batch_rs::core::step::{TaskletBuilder, Tasklet, RepeatStatus, StepExecution};
264    /// use spring_batch_rs::BatchError;
265    ///
266    /// # struct MyTasklet;
267    /// # impl Tasklet for MyTasklet {
268    /// #     fn execute(&self, _step_execution: &StepExecution) -> Result<RepeatStatus, BatchError> {
269    /// #         Ok(RepeatStatus::Finished)
270    /// #     }
271    /// # }
272    ///
273    /// let tasklet = MyTasklet;
274    /// let builder = TaskletBuilder::new("my-step")
275    ///     .tasklet(&tasklet);
276    /// ```
277    pub fn tasklet(mut self, tasklet: &'a dyn Tasklet) -> Self {
278        self.tasklet = Some(tasklet);
279        self
280    }
281
282    /// Builds the TaskletStep instance.
283    ///
284    /// # Panics
285    /// Panics if no tasklet has been set using the `tasklet()` method.
286    ///
287    /// # Examples
288    ///
289    /// ```rust
290    /// use spring_batch_rs::core::step::{TaskletBuilder, Tasklet, RepeatStatus, StepExecution};
291    /// use spring_batch_rs::BatchError;
292    ///
293    /// # struct MyTasklet;
294    /// # impl Tasklet for MyTasklet {
295    /// #     fn execute(&self, _step_execution: &StepExecution) -> Result<RepeatStatus, BatchError> {
296    /// #         Ok(RepeatStatus::Finished)
297    /// #     }
298    /// # }
299    ///
300    /// let tasklet = MyTasklet;
301    /// let step = TaskletBuilder::new("my-step")
302    ///     .tasklet(&tasklet)
303    ///     .build();
304    /// ```
305    pub fn build(self) -> TaskletStep<'a> {
306        TaskletStep {
307            name: self.name,
308            tasklet: self
309                .tasklet
310                .expect("Tasklet is required for building a step"),
311        }
312    }
313}
314
315/// Represents the execution context and metrics for a step.
316///
317/// StepExecution tracks all relevant information about a step's execution,
318/// including timing, item counts, error counts, and current status.
319///
320/// # Examples
321///
322/// ```rust
323/// use spring_batch_rs::core::step::{StepExecution, StepStatus};
324///
325/// let mut step_execution = StepExecution::new("data-processing-step");
326/// assert_eq!(step_execution.status, StepStatus::Starting);
327/// assert_eq!(step_execution.read_count, 0);
328/// assert_eq!(step_execution.write_count, 0);
329/// assert_eq!(step_execution.filter_count, 0);
330/// ```
331#[derive(Clone)]
332pub struct StepExecution {
333    /// Unique identifier for this step instance
334    pub id: Uuid,
335    /// Human-readable name for the step
336    pub name: String,
337    /// Current status of the step execution
338    pub status: StepStatus,
339    /// Timestamp when the step started execution
340    pub start_time: Option<Instant>,
341    /// Timestamp when the step completed execution
342    pub end_time: Option<Instant>,
343    /// Total duration of step execution
344    pub duration: Option<Duration>,
345    /// Number of items successfully read from the source
346    pub read_count: usize,
347    /// Number of items successfully written to the destination
348    pub write_count: usize,
349    /// Number of errors encountered during reading
350    pub read_error_count: usize,
351    /// Number of items successfully processed and passed to the writer (excludes filtered items)
352    pub process_count: usize,
353    /// Number of errors encountered during processing
354    pub process_error_count: usize,
355    /// Number of items filtered by the processor (processor returned Ok(None))
356    pub filter_count: usize,
357    /// Number of errors encountered during writing
358    pub write_error_count: usize,
359}
360
361impl StepExecution {
362    /// Creates a new StepExecution with the specified name.
363    ///
364    /// Initializes all counters to zero and sets the status to `Starting`.
365    /// A unique UUID is generated for this execution instance.
366    ///
367    /// # Parameters
368    /// - `name`: Human-readable name for the step
369    ///
370    /// # Examples
371    ///
372    /// ```rust
373    /// use spring_batch_rs::core::step::{StepExecution, StepStatus};
374    ///
375    /// let step_execution = StepExecution::new("my-step");
376    /// assert_eq!(step_execution.name, "my-step");
377    /// assert_eq!(step_execution.status, StepStatus::Starting);
378    /// assert!(!step_execution.id.is_nil());
379    /// ```
380    pub fn new(name: &str) -> Self {
381        Self {
382            id: Uuid::new_v4(),
383            name: name.to_string(),
384            status: StepStatus::Starting,
385            start_time: None,
386            end_time: None,
387            duration: None,
388            read_count: 0,
389            write_count: 0,
390            read_error_count: 0,
391            process_count: 0,
392            process_error_count: 0,
393            filter_count: 0,
394            write_error_count: 0,
395        }
396    }
397}
398
399/// Represents the overall status of a batch job.
400///
401/// This enum defines all possible states that a batch job can be in
402/// during its lifecycle, from initialization to completion.
403///
404/// # Examples
405///
406/// ```rust
407/// use spring_batch_rs::core::step::BatchStatus;
408///
409/// let status = BatchStatus::COMPLETED;
410/// match status {
411///     BatchStatus::COMPLETED => println!("Job finished successfully"),
412///     BatchStatus::FAILED => println!("Job failed"),
413///     _ => println!("Job in progress or other state"),
414/// }
415/// ```
416pub enum BatchStatus {
417    /// The batch job has successfully completed its execution.
418    COMPLETED,
419    /// Status of a batch job prior to its execution.
420    STARTING,
421    /// Status of a batch job that is running.
422    STARTED,
423    /// Status of batch job waiting for a step to complete before stopping the batch job.
424    STOPPING,
425    /// Status of a batch job that has been stopped by request.
426    STOPPED,
427    /// Status of a batch job that has failed during its execution.
428    FAILED,
429    /// Status of a batch job that did not stop properly and can not be restarted.
430    ABANDONED,
431    /// Status of a batch job that is in an uncertain state.
432    UNKNOWN,
433}
434
435/// Core trait that defines the contract for step execution.
436///
437/// All step implementations must provide execution logic and a name.
438/// The step is responsible for coordinating the processing of data
439/// and managing its own lifecycle.
440///
441/// # Examples
442///
443/// ```rust
444/// use spring_batch_rs::core::step::{Step, StepExecution};
445/// use spring_batch_rs::BatchError;
446///
447/// struct CustomStep {
448///     name: String,
449/// }
450///
451/// impl Step for CustomStep {
452///     fn execute(&self, step_execution: &mut StepExecution) -> Result<(), BatchError> {
453///         // Custom step logic here
454///         Ok(())
455///     }
456///
457///     fn get_name(&self) -> &str {
458///         &self.name
459///     }
460/// }
461/// ```
462pub trait Step {
463    /// Executes the step.
464    ///
465    /// This method represents the main operation of the step. It coordinates
466    /// reading items, processing them, and writing them out for chunk-oriented
467    /// steps, or executes a single task for tasklet steps.
468    ///
469    /// # Parameters
470    /// - `step_execution`: Mutable reference to track execution state and metrics
471    ///
472    /// # Returns
473    /// - `Ok(())`: The step completed successfully
474    /// - `Err(BatchError)`: The step failed due to an error
475    ///
476    /// # Examples
477    ///
478    /// ```rust
479    /// use spring_batch_rs::core::step::{Step, StepExecution, StepStatus};
480    /// use spring_batch_rs::BatchError;
481    ///
482    /// # struct MyStep { name: String }
483    /// # impl Step for MyStep {
484    /// #     fn execute(&self, step_execution: &mut StepExecution) -> Result<(), BatchError> {
485    /// #         step_execution.status = StepStatus::Success;
486    /// #         Ok(())
487    /// #     }
488    /// #     fn get_name(&self) -> &str { &self.name }
489    /// # }
490    /// let step = MyStep { name: "test".to_string() };
491    /// let mut execution = StepExecution::new(step.get_name());
492    /// let result = step.execute(&mut execution);
493    /// assert!(result.is_ok());
494    /// ```
495    fn execute(&self, step_execution: &mut StepExecution) -> Result<(), BatchError>;
496
497    /// Returns the name of this step.
498    ///
499    /// # Examples
500    ///
501    /// ```rust
502    /// # use spring_batch_rs::core::step::{Step, StepExecution};
503    /// # use spring_batch_rs::BatchError;
504    /// # struct MyStep { name: String }
505    /// # impl Step for MyStep {
506    /// #     fn execute(&self, _step_execution: &mut StepExecution) -> Result<(), BatchError> { Ok(()) }
507    /// #     fn get_name(&self) -> &str { &self.name }
508    /// # }
509    /// let step = MyStep { name: "data-processing".to_string() };
510    /// assert_eq!(step.get_name(), "data-processing");
511    /// ```
512    fn get_name(&self) -> &str;
513}
514
515/// Indicates whether a tasklet should continue executing or has finished.
516///
517/// This enum is returned by tasklet implementations to control
518/// the execution flow and indicate completion status.
519///
520/// # Examples
521///
522/// ```rust
523/// use spring_batch_rs::core::step::RepeatStatus;
524///
525/// let status = RepeatStatus::Finished;
526/// match status {
527///     RepeatStatus::Continuable => println!("Tasklet can continue"),
528///     RepeatStatus::Finished => println!("Tasklet has completed"),
529/// }
530/// ```
531#[derive(Debug, PartialEq)]
532pub enum RepeatStatus {
533    /// The tasklet can continue to execute.
534    ///
535    /// This indicates that the tasklet has more work to do and should
536    /// be called again in the next execution cycle.
537    Continuable,
538    /// The tasklet has finished executing.
539    ///
540    /// This indicates that the tasklet has completed all its work
541    /// and should not be executed again.
542    Finished,
543}
544
545/// A step implementation that processes items in chunks.
546///
547/// ChunkOrientedStep reads items from a reader, processes them through a processor,
548/// and writes them using a writer. It handles errors gracefully with configurable
549/// skip limits and provides comprehensive metrics tracking.
550///
551/// # Examples
552///
553/// ## Chunk-Oriented Processing
554///
555/// ```rust
556/// use spring_batch_rs::core::step::{StepBuilder, StepExecution, Step};
557/// use spring_batch_rs::core::item::{ItemReader, ItemProcessor, ItemWriter, PassThroughProcessor};
558/// use spring_batch_rs::BatchError;
559///
560/// # struct MyReader;
561/// # impl ItemReader<String> for MyReader {
562/// #     fn read(&self) -> Result<Option<String>, BatchError> { Ok(None) }
563/// # }
564/// # struct MyProcessor;
565/// # impl ItemProcessor<String, String> for MyProcessor {
566/// #     fn process(&self, item: &String) -> Result<Option<String>, BatchError> { Ok(Some(item.clone())) }
567/// # }
568/// # struct MyWriter;
569/// # impl ItemWriter<String> for MyWriter {
570/// #     fn write(&self, items: &[String]) -> Result<(), BatchError> { Ok(()) }
571/// #     fn flush(&self) -> Result<(), BatchError> { Ok(()) }
572/// #     fn open(&self) -> Result<(), BatchError> { Ok(()) }
573/// #     fn close(&self) -> Result<(), BatchError> { Ok(()) }
574/// # }
575///
576/// let reader = MyReader;
577/// let processor = PassThroughProcessor::<String>::new();
578/// let writer = MyWriter;
579///
580/// let step = StepBuilder::new("my-step")
581///     .chunk::<String, String>(100)                    // Process 100 items per chunk
582///     .reader(&reader)
583///     .processor(&processor)
584///     .writer(&writer)
585///     .skip_limit(10)               // Allow up to 10 errors
586///     .build();
587///
588/// let mut step_execution = StepExecution::new(step.get_name());
589/// let result = step.execute(&mut step_execution);
590/// ```
591pub struct ChunkOrientedStep<'a, I, O> {
592    name: String,
593    /// Component responsible for reading items from the source
594    reader: &'a dyn ItemReader<I>,
595    /// Component responsible for processing items
596    processor: &'a dyn ItemProcessor<I, O>,
597    /// Component responsible for writing items to the destination
598    writer: &'a dyn ItemWriter<O>,
599    /// Number of items to process in each chunk
600    chunk_size: u16,
601    /// Maximum number of errors allowed before failing the step
602    skip_limit: u16,
603}
604
605impl<I, O> Step for ChunkOrientedStep<'_, I, O> {
606    fn execute(&self, step_execution: &mut StepExecution) -> Result<(), BatchError> {
607        // Start the timer and logging
608        let start_time = Instant::now();
609        info!(
610            "Start of step: {}, id: {}",
611            step_execution.name, step_execution.id
612        );
613
614        // Open the writer and handle any errors
615        Self::manage_error(self.writer.open());
616
617        // Main processing loop
618        loop {
619            // Read chunk
620            let (read_items, chunk_status) = match self.read_chunk(step_execution) {
621                Ok(chunk_data) => chunk_data,
622                Err(_) => {
623                    step_execution.status = StepStatus::ReadError;
624                    break;
625                }
626            };
627
628            // If no items to process, we're done
629            if read_items.is_empty() {
630                step_execution.status = StepStatus::Success;
631                break;
632            }
633
634            // Process and write the chunk
635            if self
636                .process_and_write_chunk(step_execution, &read_items)
637                .is_err()
638            {
639                break; // Status already set in the method
640            }
641
642            // Check if we've reached the end
643            if chunk_status == ChunkStatus::Finished {
644                step_execution.status = StepStatus::Success;
645                break;
646            }
647        }
648
649        // Close the writer and handle any errors
650        Self::manage_error(self.writer.close());
651
652        // Log the end of the step
653        info!(
654            "End of step: {}, id: {}",
655            step_execution.name, step_execution.id
656        );
657
658        // Calculate the step execution details
659        step_execution.start_time = Some(start_time);
660        step_execution.end_time = Some(Instant::now());
661        step_execution.duration = Some(start_time.elapsed());
662
663        // Return the step execution details if the step is successful,
664        // or an error if the step failed
665        if StepStatus::Success == step_execution.status {
666            Ok(())
667        } else {
668            Err(BatchError::Step(step_execution.name.clone()))
669        }
670    }
671
672    fn get_name(&self) -> &str {
673        &self.name
674    }
675}
676
677impl<I, O> ChunkOrientedStep<'_, I, O> {
678    /// Processes a chunk of items and writes them.
679    ///
680    /// This method combines the processing and writing operations for a chunk,
681    /// handling errors appropriately and updating the step execution status.
682    ///
683    /// # Parameters
684    /// - `step_execution`: Mutable reference to track execution state
685    /// - `read_items`: Slice of items to process and write
686    ///
687    /// # Returns
688    /// - `Ok(())`: The chunk was processed and written successfully
689    /// - `Err(BatchError)`: An error occurred during processing or writing
690    fn process_and_write_chunk(
691        &self,
692        step_execution: &mut StepExecution,
693        read_items: &[I],
694    ) -> Result<(), BatchError> {
695        // Process the chunk
696        let processed_items = match self.process_chunk(step_execution, read_items) {
697            Ok(items) => items,
698            Err(error) => {
699                step_execution.status = StepStatus::ProcessorError;
700                return Err(error);
701            }
702        };
703
704        // Write the processed items
705        match self.write_chunk(step_execution, &processed_items) {
706            Ok(()) => Ok(()),
707            Err(error) => {
708                step_execution.status = StepStatus::WriteError;
709                Err(error)
710            }
711        }
712    }
713
714    /// Reads a chunk of items from the reader.
715    ///
716    /// This method attempts to read up to `chunk_size` items from the reader.
717    /// It stops when either:
718    /// - The chunk is full (reached `chunk_size` items)
719    /// - There are no more items to read
720    /// - The error skip limit is reached
721    ///
722    /// # Parameters
723    /// - `read_items`: Vector to store the read items
724    ///
725    /// # Returns
726    /// - `Ok(ChunkStatus::Full)`: The chunk is full with `chunk_size` items
727    /// - `Ok(ChunkStatus::Finished)`: There are no more items to read
728    /// - `Err(BatchError)`: An error occurred and skip limit was reached
729    fn read_chunk(
730        &self,
731        step_execution: &mut StepExecution,
732    ) -> Result<(Vec<I>, ChunkStatus), BatchError> {
733        debug!("Start reading chunk");
734
735        let mut read_items = Vec::with_capacity(self.chunk_size as usize);
736
737        loop {
738            let read_result = self.reader.read();
739
740            match read_result {
741                Ok(item) => {
742                    match item {
743                        Some(item) => {
744                            read_items.push(item);
745                            step_execution.read_count += 1;
746
747                            if read_items.len() >= self.chunk_size as usize {
748                                return Ok((read_items, ChunkStatus::Full));
749                            }
750                        }
751                        None => {
752                            if read_items.is_empty() {
753                                return Ok((read_items, ChunkStatus::Finished));
754                            } else {
755                                return Ok((read_items, ChunkStatus::Full));
756                            }
757                        }
758                    };
759                }
760                Err(error) => {
761                    warn!("Error reading item: {}", error);
762                    step_execution.read_error_count += 1;
763
764                    if self.is_skip_limit_reached(step_execution) {
765                        // Set the status to ReadError when we hit the limit
766                        step_execution.status = StepStatus::ReadError;
767                        return Err(error);
768                    }
769                }
770            }
771        }
772    }
773
774    /// Processes a chunk of items using the processor.
775    ///
776    /// This method applies the processor to each item in the input chunk.
777    /// It collects the successfully processed items and tracks any errors.
778    ///
779    /// # Parameters
780    /// - `read_items`: Vector of items to process
781    ///
782    /// # Returns
783    /// - `Ok(Vec<W>)`: Vector of successfully processed items
784    /// - `Err(BatchError)`: An error occurred and skip limit was reached
785    fn process_chunk(
786        &self,
787        step_execution: &mut StepExecution,
788        read_items: &[I],
789    ) -> Result<Vec<O>, BatchError> {
790        debug!("Processing chunk of {} items", read_items.len());
791        let mut result = Vec::with_capacity(read_items.len());
792
793        for item in read_items {
794            match self.processor.process(item) {
795                Ok(Some(processed_item)) => {
796                    result.push(processed_item);
797                    step_execution.process_count += 1;
798                }
799                Ok(None) => {
800                    step_execution.filter_count += 1;
801                    debug!("Item filtered by processor");
802                }
803                Err(error) => {
804                    warn!("Error processing item: {}", error);
805                    step_execution.process_error_count += 1;
806
807                    if self.is_skip_limit_reached(step_execution) {
808                        // Set the status to ProcessorError when we hit the limit
809                        step_execution.status = StepStatus::ProcessorError;
810                        return Err(error);
811                    }
812                }
813            }
814        }
815
816        Ok(result)
817    }
818
819    /// Writes a chunk of processed items using the writer.
820    ///
821    /// This method writes the processed items to the destination
822    /// and handles any errors that occur.
823    ///
824    /// # Parameters
825    /// - `processed_items`: Vector of items to write
826    ///
827    /// # Returns
828    /// - `Ok(())`: All items were written successfully
829    /// - `Err(BatchError)`: An error occurred and skip limit was reached
830    fn write_chunk(
831        &self,
832        step_execution: &mut StepExecution,
833        processed_items: &[O],
834    ) -> Result<(), BatchError> {
835        debug!("Writing chunk of {} items", processed_items.len());
836
837        if processed_items.is_empty() {
838            debug!("No items to write, skipping write call");
839            return Ok(());
840        }
841
842        match self.writer.write(processed_items) {
843            Ok(()) => {
844                step_execution.write_count += processed_items.len();
845                Self::manage_error(self.writer.flush());
846                Ok(())
847            }
848            Err(error) => {
849                warn!("Error writing items: {}", error);
850                step_execution.write_error_count += processed_items.len();
851
852                if self.is_skip_limit_reached(step_execution) {
853                    // Set the status to WriteError to indicate a write failure
854                    step_execution.status = StepStatus::WriteError;
855                    return Err(error);
856                }
857                Ok(())
858            }
859        }
860    }
861
862    fn is_skip_limit_reached(&self, step_execution: &StepExecution) -> bool {
863        step_execution.read_error_count
864            + step_execution.write_error_count
865            + step_execution.process_error_count
866            > self.skip_limit.into()
867    }
868    /// Helper method to handle errors gracefully.
869    ///
870    /// This method is used to handle errors from operations where we want
871    /// to log the error but not fail the step.
872    ///
873    /// # Parameters
874    /// - `result`: Result to check for errors
875    fn manage_error(result: Result<(), BatchError>) {
876        if let Err(error) = result {
877            warn!("Non-fatal error: {}", error);
878        }
879    }
880}
881
882/// Builder for creating ChunkOrientedStep instances.
883///
884/// Provides a fluent API for configuring chunk-oriented steps with validation
885/// to ensure all required components (reader, processor, writer) are provided.
886///
887/// # Type Parameters
888/// - `I`: The input item type (what the reader produces)
889/// - `O`: The output item type (what the processor produces and writer consumes)
890///
891/// # Examples
892///
893/// ```rust
894/// use spring_batch_rs::core::step::ChunkOrientedStepBuilder;
895/// use spring_batch_rs::core::item::{ItemReader, ItemProcessor, ItemWriter};
896/// use spring_batch_rs::BatchError;
897///
898/// # struct MyReader;
899/// # impl ItemReader<i32> for MyReader {
900/// #     fn read(&self) -> Result<Option<i32>, BatchError> { Ok(None) }
901/// # }
902/// # struct MyProcessor;
903/// # impl ItemProcessor<i32, String> for MyProcessor {
904/// #     fn process(&self, item: &i32) -> Result<Option<String>, BatchError> { Ok(Some(item.to_string())) }
905/// # }
906/// # struct MyWriter;
907/// # impl ItemWriter<String> for MyWriter {
908/// #     fn write(&self, items: &[String]) -> Result<(), BatchError> { Ok(()) }
909/// #     fn flush(&self) -> Result<(), BatchError> { Ok(()) }
910/// #     fn open(&self) -> Result<(), BatchError> { Ok(()) }
911/// #     fn close(&self) -> Result<(), BatchError> { Ok(()) }
912/// # }
913/// let reader = MyReader;
914/// let processor = MyProcessor;
915/// let writer = MyWriter;
916///
917/// let step = ChunkOrientedStepBuilder::new("number-to-string")
918///     .reader(&reader)
919///     .processor(&processor)
920///     .writer(&writer)
921///     .chunk_size(500)
922///     .skip_limit(25)
923///     .build();
924/// ```
925pub struct ChunkOrientedStepBuilder<'a, I, O> {
926    /// Name for the step
927    name: String,
928    /// Component responsible for reading items from the source
929    reader: Option<&'a dyn ItemReader<I>>,
930    /// Component responsible for processing items
931    processor: Option<&'a dyn ItemProcessor<I, O>>,
932    /// Component responsible for writing items to the destination
933    writer: Option<&'a dyn ItemWriter<O>>,
934    /// Number of items to process in each chunk
935    chunk_size: u16,
936    /// Maximum number of errors allowed before failing the step
937    skip_limit: u16,
938}
939
940impl<'a, I, O> ChunkOrientedStepBuilder<'a, I, O> {
941    /// Creates a new ChunkOrientedStepBuilder with the specified name.
942    ///
943    /// Sets default values:
944    /// - `chunk_size`: 10
945    /// - `skip_limit`: 0 (no error tolerance)
946    ///
947    /// # Parameters
948    /// - `name`: Human-readable name for the step
949    ///
950    /// # Examples
951    ///
952    /// ```rust
953    /// use spring_batch_rs::core::step::ChunkOrientedStepBuilder;
954    ///
955    /// let builder = ChunkOrientedStepBuilder::<String, String>::new("data-migration");
956    /// ```
957    pub fn new(name: &str) -> Self {
958        Self {
959            name: name.to_string(),
960            reader: None,
961            processor: None,
962            writer: None,
963            chunk_size: 10,
964            skip_limit: 0,
965        }
966    }
967
968    /// Sets the item reader for this step.
969    ///
970    /// The reader is responsible for providing items to be processed.
971    /// This is a required component for chunk-oriented steps.
972    ///
973    /// # Parameters
974    /// - `reader`: Implementation of ItemReader that produces items of type `I`
975    ///
976    /// # Examples
977    ///
978    /// ```rust
979    /// # use spring_batch_rs::core::step::ChunkOrientedStepBuilder;
980    /// # use spring_batch_rs::core::item::ItemReader;
981    /// # use spring_batch_rs::BatchError;
982    /// # struct FileReader;
983    /// # impl ItemReader<String> for FileReader {
984    /// #     fn read(&self) -> Result<Option<String>, BatchError> { Ok(None) }
985    /// # }
986    /// let reader = FileReader;
987    /// let builder = ChunkOrientedStepBuilder::<String, String>::new("file-processing")
988    ///     .reader(&reader);
989    /// ```
990    pub fn reader(mut self, reader: &'a dyn ItemReader<I>) -> Self {
991        self.reader = Some(reader);
992        self
993    }
994
995    /// Sets the item processor for this step.
996    ///
997    /// The processor transforms items from type `I` to type `O`.
998    /// This is a required component for chunk-oriented steps.
999    ///
1000    /// # Parameters
1001    /// - `processor`: Implementation of ItemProcessor that transforms items from `I` to `O`
1002    ///
1003    /// # Examples
1004    ///
1005    /// ```rust
1006    /// # use spring_batch_rs::core::step::ChunkOrientedStepBuilder;
1007    /// # use spring_batch_rs::core::item::{ItemReader, ItemProcessor};
1008    /// # use spring_batch_rs::BatchError;
1009    /// # struct FileReader;
1010    /// # impl ItemReader<String> for FileReader {
1011    /// #     fn read(&self) -> Result<Option<String>, BatchError> { Ok(None) }
1012    /// # }
1013    /// # struct UppercaseProcessor;
1014    /// # impl ItemProcessor<String, String> for UppercaseProcessor {
1015    /// #     fn process(&self, item: &String) -> Result<Option<String>, BatchError> { Ok(Some(item.to_uppercase())) }
1016    /// # }
1017    /// let reader = FileReader;
1018    /// let processor = UppercaseProcessor;
1019    /// let builder = ChunkOrientedStepBuilder::new("text-processing")
1020    ///     .reader(&reader)
1021    ///     .processor(&processor);
1022    /// ```
1023    pub fn processor(mut self, processor: &'a dyn ItemProcessor<I, O>) -> Self {
1024        self.processor = Some(processor);
1025        self
1026    }
1027
1028    /// Sets the item writer for this step.
1029    ///
1030    /// The writer is responsible for persisting processed items.
1031    /// This is a required component for chunk-oriented steps.
1032    ///
1033    /// # Parameters
1034    /// - `writer`: Implementation of ItemWriter that consumes items of type `O`
1035    ///
1036    /// # Examples
1037    ///
1038    /// ```rust
1039    /// # use spring_batch_rs::core::step::ChunkOrientedStepBuilder;
1040    /// # use spring_batch_rs::core::item::{ItemReader, ItemProcessor, ItemWriter};
1041    /// # use spring_batch_rs::BatchError;
1042    /// # struct FileReader;
1043    /// # impl ItemReader<String> for FileReader {
1044    /// #     fn read(&self) -> Result<Option<String>, BatchError> { Ok(None) }
1045    /// # }
1046    /// # struct UppercaseProcessor;
1047    /// # impl ItemProcessor<String, String> for UppercaseProcessor {
1048    /// #     fn process(&self, item: &String) -> Result<Option<String>, BatchError> { Ok(Some(item.to_uppercase())) }
1049    /// # }
1050    /// # struct FileWriter;
1051    /// # impl ItemWriter<String> for FileWriter {
1052    /// #     fn write(&self, items: &[String]) -> Result<(), BatchError> { Ok(()) }
1053    /// #     fn flush(&self) -> Result<(), BatchError> { Ok(()) }
1054    /// #     fn open(&self) -> Result<(), BatchError> { Ok(()) }
1055    /// #     fn close(&self) -> Result<(), BatchError> { Ok(()) }
1056    /// # }
1057    /// let reader = FileReader;
1058    /// let processor = UppercaseProcessor;
1059    /// let writer = FileWriter;
1060    /// let builder = ChunkOrientedStepBuilder::new("file-processing")
1061    ///     .reader(&reader)
1062    ///     .processor(&processor)
1063    ///     .writer(&writer);
1064    /// ```
1065    pub fn writer(mut self, writer: &'a dyn ItemWriter<O>) -> Self {
1066        self.writer = Some(writer);
1067        self
1068    }
1069
1070    /// Sets the chunk size for this step.
1071    ///
1072    /// The chunk size determines how many items are processed together
1073    /// in a single transaction. Larger chunks can improve performance
1074    /// but use more memory.
1075    ///
1076    /// # Parameters
1077    /// - `chunk_size`: Number of items to process per chunk (must be > 0)
1078    ///
1079    /// # Examples
1080    ///
1081    /// ```rust
1082    /// use spring_batch_rs::core::step::ChunkOrientedStepBuilder;
1083    ///
1084    /// let builder = ChunkOrientedStepBuilder::<String, String>::new("bulk-processing")
1085    ///     .chunk_size(1000); // Process 1000 items per chunk
1086    /// ```
1087    pub fn chunk_size(mut self, chunk_size: u16) -> Self {
1088        self.chunk_size = chunk_size;
1089        self
1090    }
1091
1092    /// Sets the skip limit for this step.
1093    ///
1094    /// The skip limit determines how many errors are tolerated before
1095    /// the step fails. A value of 0 means no errors are tolerated.
1096    ///
1097    /// # Parameters
1098    /// - `skip_limit`: Maximum number of errors allowed
1099    ///
1100    /// # Examples
1101    ///
1102    /// ```rust
1103    /// use spring_batch_rs::core::step::ChunkOrientedStepBuilder;
1104    ///
1105    /// let builder = ChunkOrientedStepBuilder::<String, String>::new("fault-tolerant-processing")
1106    ///     .skip_limit(100); // Allow up to 100 errors
1107    /// ```
1108    pub fn skip_limit(mut self, skip_limit: u16) -> Self {
1109        self.skip_limit = skip_limit;
1110        self
1111    }
1112
1113    /// Builds the ChunkOrientedStep instance.
1114    ///
1115    /// # Panics
1116    /// Panics if any required component (reader, processor, writer) has not been set.
1117    ///
1118    /// # Examples
1119    ///
1120    /// ```rust
1121    /// # use spring_batch_rs::core::step::ChunkOrientedStepBuilder;
1122    /// # use spring_batch_rs::core::item::{ItemReader, ItemProcessor, ItemWriter};
1123    /// # use spring_batch_rs::BatchError;
1124    /// # struct MyReader;
1125    /// # impl ItemReader<String> for MyReader {
1126    /// #     fn read(&self) -> Result<Option<String>, BatchError> { Ok(None) }
1127    /// # }
1128    /// # struct MyProcessor;
1129    /// # impl ItemProcessor<String, String> for MyProcessor {
1130    /// #     fn process(&self, item: &String) -> Result<Option<String>, BatchError> { Ok(Some(item.clone())) }
1131    /// # }
1132    /// # struct MyWriter;
1133    /// # impl ItemWriter<String> for MyWriter {
1134    /// #     fn write(&self, items: &[String]) -> Result<(), BatchError> { Ok(()) }
1135    /// #     fn flush(&self) -> Result<(), BatchError> { Ok(()) }
1136    /// #     fn open(&self) -> Result<(), BatchError> { Ok(()) }
1137    /// #     fn close(&self) -> Result<(), BatchError> { Ok(()) }
1138    /// # }
1139    /// let reader = MyReader;
1140    /// let processor = MyProcessor;
1141    /// let writer = MyWriter;
1142    ///
1143    /// let step = ChunkOrientedStepBuilder::new("complete-step")
1144    ///     .reader(&reader)
1145    ///     .processor(&processor)
1146    ///     .writer(&writer)
1147    ///     .chunk_size(500)
1148    ///     .skip_limit(10)
1149    ///     .build();
1150    /// ```
1151    pub fn build(self) -> ChunkOrientedStep<'a, I, O> {
1152        ChunkOrientedStep {
1153            name: self.name,
1154            reader: self.reader.expect("Reader is required for building a step"),
1155            processor: self
1156                .processor
1157                .expect("Processor is required for building a step"),
1158            writer: self.writer.expect("Writer is required for building a step"),
1159            chunk_size: self.chunk_size,
1160            skip_limit: self.skip_limit,
1161        }
1162    }
1163}
1164
1165/// Main entry point for building steps of any type.
1166///
1167/// StepBuilder provides a unified interface for creating both chunk-oriented
1168/// and tasklet steps. It uses the builder pattern to provide a fluent API
1169/// for step configuration.
1170///
1171/// # Type Parameters
1172/// - `I`: The input item type for chunk-oriented steps
1173/// - `O`: The output item type for chunk-oriented steps
1174///
1175/// # Examples
1176///
1177/// ## Creating a Chunk-Oriented Step
1178///
1179/// ```rust
1180/// use spring_batch_rs::core::step::{StepBuilder, StepExecution, Step};
1181/// use spring_batch_rs::core::item::{ItemReader, ItemProcessor, ItemWriter};
1182/// use spring_batch_rs::BatchError;
1183///
1184/// # struct MyReader;
1185/// # impl ItemReader<String> for MyReader {
1186/// #     fn read(&self) -> Result<Option<String>, BatchError> { Ok(None) }
1187/// # }
1188/// # struct MyProcessor;
1189/// # impl ItemProcessor<String, String> for MyProcessor {
1190/// #     fn process(&self, item: &String) -> Result<Option<String>, BatchError> { Ok(Some(item.clone())) }
1191/// # }
1192/// # struct MyWriter;
1193/// # impl ItemWriter<String> for MyWriter {
1194/// #     fn write(&self, items: &[String]) -> Result<(), BatchError> { Ok(()) }
1195/// #     fn flush(&self) -> Result<(), BatchError> { Ok(()) }
1196/// #     fn open(&self) -> Result<(), BatchError> { Ok(()) }
1197/// #     fn close(&self) -> Result<(), BatchError> { Ok(()) }
1198/// # }
1199/// let reader = MyReader;
1200/// let processor = MyProcessor;
1201/// let writer = MyWriter;
1202///
1203/// let step = StepBuilder::new("data-processing")
1204///     .chunk::<String, String>(100)
1205///     .reader(&reader)
1206///     .processor(&processor)
1207///     .writer(&writer)
1208///     .build();
1209/// ```
1210///
1211/// ## Creating a Tasklet Step
1212///
1213/// ```rust
1214/// use spring_batch_rs::core::step::{StepBuilder, StepExecution, RepeatStatus, Tasklet};
1215/// use spring_batch_rs::BatchError;
1216///
1217/// # struct MyTasklet;
1218/// # impl Tasklet for MyTasklet {
1219/// #     fn execute(&self, _step_execution: &StepExecution) -> Result<RepeatStatus, BatchError> {
1220/// #         Ok(RepeatStatus::Finished)
1221/// #     }
1222/// # }
1223/// let tasklet = MyTasklet;
1224///
1225/// let step = StepBuilder::new("cleanup-task")
1226///     .tasklet(&tasklet)
1227///     .build();
1228/// ```
1229pub struct StepBuilder {
1230    name: String,
1231}
1232
1233impl StepBuilder {
1234    /// Creates a new StepBuilder with the specified name.
1235    ///
1236    /// # Parameters
1237    /// - `name`: Human-readable name for the step
1238    ///
1239    /// # Examples
1240    ///
1241    /// ```rust
1242    /// use spring_batch_rs::core::step::StepBuilder;
1243    ///
1244    /// let builder = StepBuilder::new("my-step");
1245    /// ```
1246    pub fn new(name: &str) -> Self {
1247        Self {
1248            name: name.to_string(),
1249        }
1250    }
1251
1252    /// Configures this step to use a tasklet for execution.
1253    ///
1254    /// Returns a TaskletBuilder for further configuration of the tasklet step.
1255    ///
1256    /// # Parameters
1257    /// - `tasklet`: The tasklet implementation to execute
1258    ///
1259    /// # Examples
1260    ///
1261    /// ```rust
1262    /// use spring_batch_rs::core::step::{StepBuilder, StepExecution, RepeatStatus, Tasklet};
1263    /// use spring_batch_rs::BatchError;
1264    ///
1265    /// # struct FileCleanupTasklet;
1266    /// # impl Tasklet for FileCleanupTasklet {
1267    /// #     fn execute(&self, _step_execution: &StepExecution) -> Result<RepeatStatus, BatchError> {
1268    /// #         Ok(RepeatStatus::Finished)
1269    /// #     }
1270    /// # }
1271    /// let tasklet = FileCleanupTasklet;
1272    /// let step = StepBuilder::new("cleanup")
1273    ///     .tasklet(&tasklet)
1274    ///     .build();
1275    /// ```
1276    pub fn tasklet(self, tasklet: &dyn Tasklet) -> TaskletBuilder<'_> {
1277        TaskletBuilder::new(&self.name).tasklet(tasklet)
1278    }
1279
1280    /// Configures this step for chunk-oriented processing.
1281    ///
1282    /// Returns a ChunkOrientedStepBuilder for further configuration of the chunk step.
1283    ///
1284    /// # Parameters
1285    /// - `chunk_size`: Number of items to process per chunk
1286    ///
1287    /// # Examples
1288    ///
1289    /// ```rust
1290    /// use spring_batch_rs::core::step::{StepBuilder, Step};
1291    /// use spring_batch_rs::core::item::{ItemReader, ItemProcessor, ItemWriter};
1292    /// use spring_batch_rs::BatchError;
1293    ///
1294    /// # struct MyReader;
1295    /// # impl ItemReader<String> for MyReader {
1296    /// #     fn read(&self) -> Result<Option<String>, BatchError> { Ok(None) }
1297    /// # }
1298    /// # struct MyProcessor;
1299    /// # impl ItemProcessor<String, String> for MyProcessor {
1300    /// #     fn process(&self, item: &String) -> Result<Option<String>, BatchError> { Ok(Some(item.clone())) }
1301    /// # }
1302    /// # struct MyWriter;
1303    /// # impl ItemWriter<String> for MyWriter {
1304    /// #     fn write(&self, items: &[String]) -> Result<(), BatchError> { Ok(()) }
1305    /// #     fn flush(&self) -> Result<(), BatchError> { Ok(()) }
1306    /// #     fn open(&self) -> Result<(), BatchError> { Ok(()) }
1307    /// #     fn close(&self) -> Result<(), BatchError> { Ok(()) }
1308    /// # }
1309    /// let reader = MyReader;
1310    /// let processor = MyProcessor;
1311    /// let writer = MyWriter;
1312    ///
1313    /// let step = StepBuilder::new("bulk-processing")
1314    ///     .chunk(1000)  // Process 1000 items per chunk
1315    ///     .reader(&reader)
1316    ///     .processor(&processor)
1317    ///     .writer(&writer)
1318    ///     .build();
1319    /// ```
1320    pub fn chunk<'a, I, O>(self, chunk_size: u16) -> ChunkOrientedStepBuilder<'a, I, O> {
1321        ChunkOrientedStepBuilder::new(&self.name).chunk_size(chunk_size)
1322    }
1323}
1324
1325/// Represents the status of a chunk during processing.
1326///
1327/// This enum indicates whether a chunk has been fully processed or if
1328/// there are more items to process. It's used internally by the step
1329/// execution logic to control the processing loop.
1330///
1331/// # Examples
1332///
1333/// ```rust
1334/// use spring_batch_rs::core::step::ChunkStatus;
1335///
1336/// let status = ChunkStatus::Full;
1337/// match status {
1338///     ChunkStatus::Full => println!("Chunk is ready for processing"),
1339///     ChunkStatus::Finished => println!("No more items to process"),
1340/// }
1341/// ```
1342#[derive(Debug, PartialEq)]
1343pub enum ChunkStatus {
1344    /// The chunk has been fully processed.
1345    ///
1346    /// This indicates that there are no more items to process in the current
1347    /// data source (typically because we've reached the end of the input).
1348    /// The step should complete after processing any remaining items.
1349    Finished,
1350
1351    /// The chunk is full and ready to be processed.
1352    ///
1353    /// This indicates that we've collected a full chunk of items (based on
1354    /// the configured chunk size) and they are ready to be processed.
1355    /// The step should continue reading more chunks after processing this one.
1356    Full,
1357}
1358
1359/// Represents the current status of a step execution.
1360///
1361/// This enum indicates the current state of a step execution, including
1362/// both success and various failure states. It helps track the step's
1363/// progress and identify the cause of any failures.
1364///
1365/// # Examples
1366///
1367/// ```rust
1368/// use spring_batch_rs::core::step::{StepExecution, StepStatus};
1369///
1370/// let mut step_execution = StepExecution::new("my-step");
1371/// assert_eq!(step_execution.status, StepStatus::Starting);
1372///
1373/// // After successful execution
1374/// step_execution.status = StepStatus::Success;
1375/// match step_execution.status {
1376///     StepStatus::Success => println!("Step completed successfully"),
1377///     StepStatus::ReadError => println!("Failed during reading"),
1378///     StepStatus::ProcessorError => println!("Failed during processing"),
1379///     StepStatus::WriteError => println!("Failed during writing"),
1380///     StepStatus::Starting => println!("Step is starting"),
1381///     StepStatus::Failed => println!("Step has failed"),
1382///     StepStatus::Started => println!("Step has started"),
1383/// }
1384/// ```
1385#[derive(Debug, PartialEq, Clone, Copy)]
1386pub enum StepStatus {
1387    /// The step executed successfully.
1388    ///
1389    /// All items were read, processed, and written without errors
1390    /// exceeding configured skip limits. This is the desired end state
1391    /// for a step execution.
1392    Success,
1393
1394    /// An error occurred during the read operation.
1395    ///
1396    /// This indicates that an error occurred while reading items from the
1397    /// source, and the error count exceeded the configured skip limit.
1398    /// The step was terminated due to too many read failures.
1399    ReadError,
1400
1401    /// An error occurred during the processing operation.
1402    ///
1403    /// This indicates that an error occurred while processing items, and
1404    /// the error count exceeded the configured skip limit.
1405    /// The step was terminated due to too many processing failures.
1406    ProcessorError,
1407
1408    /// An error occurred during the write operation.
1409    ///
1410    /// This indicates that an error occurred while writing items to the
1411    /// destination, and the error count exceeded the configured skip limit.
1412    /// The step was terminated due to too many write failures.
1413    WriteError,
1414
1415    /// The step is starting.
1416    ///
1417    /// This is the initial state of a step before execution begins.
1418    /// All steps start in this state when first created.
1419    Starting,
1420
1421    /// The step is failed.
1422    ///
1423    /// This is the final state of a step after execution has failed.
1424    Failed,
1425
1426    /// The step is started.
1427    ///
1428    /// This is the state of a step after execution has started.
1429    Started,
1430}
1431
1432#[cfg(test)]
1433mod tests {
1434    use anyhow::Result;
1435    use mockall::mock;
1436    use serde::{Deserialize, Serialize};
1437
1438    use crate::{
1439        core::{
1440            item::{
1441                ItemProcessor, ItemProcessorResult, ItemReader, ItemReaderResult, ItemWriter,
1442                ItemWriterResult,
1443            },
1444            step::{StepExecution, StepStatus},
1445        },
1446        BatchError,
1447    };
1448
1449    use super::{
1450        BatchStatus, ChunkOrientedStepBuilder, ChunkStatus, RepeatStatus, Step, StepBuilder,
1451        Tasklet, TaskletBuilder,
1452    };
1453
1454    mock! {
1455        pub TestItemReader {}
1456        impl ItemReader<Car> for TestItemReader {
1457            fn read(&self) -> ItemReaderResult<Car>;
1458        }
1459    }
1460
1461    mock! {
1462        pub TestProcessor {}
1463        impl ItemProcessor<Car, Car> for TestProcessor {
1464            fn process(&self, item: &Car) -> ItemProcessorResult<Car>;
1465        }
1466    }
1467
1468    mock! {
1469        pub TestItemWriter {}
1470        impl ItemWriter<Car> for TestItemWriter {
1471            fn write(&self, items: &[Car]) -> ItemWriterResult;
1472            fn flush(&self) -> ItemWriterResult;
1473            fn open(&self) -> ItemWriterResult;
1474            fn close(&self) -> ItemWriterResult;
1475        }
1476    }
1477
1478    mock! {
1479        pub TestTasklet {}
1480        impl Tasklet for TestTasklet {
1481            fn execute(&self, step_execution: &StepExecution) -> Result<RepeatStatus, BatchError>;
1482        }
1483    }
1484
1485    #[derive(Deserialize, Serialize, Debug, Clone)]
1486    struct Car {
1487        year: u16,
1488        make: String,
1489        model: String,
1490        description: String,
1491    }
1492
1493    fn mock_read(i: &mut u16, error_count: u16, end_count: u16) -> ItemReaderResult<Car> {
1494        if end_count > 0 && *i == end_count {
1495            return Ok(None);
1496        } else if error_count > 0 && *i == error_count {
1497            return Err(BatchError::ItemReader("mock read error".to_string()));
1498        }
1499
1500        let car = Car {
1501            year: 1979,
1502            make: "make".to_owned(),
1503            model: "model".to_owned(),
1504            description: "description".to_owned(),
1505        };
1506        *i += 1;
1507        Ok(Some(car))
1508    }
1509
1510    fn mock_process(i: &mut u16, error_at: &[u16]) -> ItemProcessorResult<Car> {
1511        *i += 1;
1512        if error_at.contains(i) {
1513            return Err(BatchError::ItemProcessor("mock process error".to_string()));
1514        }
1515
1516        let car = Car {
1517            year: 1979,
1518            make: "make".to_owned(),
1519            model: "model".to_owned(),
1520            description: "description".to_owned(),
1521        };
1522        Ok(Some(car))
1523    }
1524
1525    #[test]
1526    fn step_should_succeded_with_empty_data() -> Result<()> {
1527        let mut reader = MockTestItemReader::default();
1528        let reader_result = Ok(None);
1529        reader.expect_read().return_once(move || reader_result);
1530
1531        let mut processor = MockTestProcessor::default();
1532        processor.expect_process().never();
1533
1534        let mut writer = MockTestItemWriter::default();
1535        writer.expect_open().times(1).returning(|| Ok(()));
1536        writer.expect_write().never();
1537        writer.expect_close().times(1).returning(|| Ok(()));
1538
1539        let step = StepBuilder::new("test")
1540            .chunk(3)
1541            .reader(&reader)
1542            .processor(&processor)
1543            .writer(&writer)
1544            .build();
1545
1546        let mut step_execution = StepExecution::new(&step.name);
1547
1548        let result = step.execute(&mut step_execution);
1549
1550        assert!(result.is_ok());
1551        assert_eq!(step.get_name(), "test");
1552        assert!(!step.get_name().is_empty());
1553        assert!(!step_execution.id.is_nil());
1554        assert_eq!(step_execution.status, StepStatus::Success);
1555
1556        Ok(())
1557    }
1558
1559    #[test]
1560    fn step_should_failed_with_processor_error() -> Result<()> {
1561        let mut i = 0;
1562        let mut reader = MockTestItemReader::default();
1563        reader
1564            .expect_read()
1565            .returning(move || mock_read(&mut i, 0, 4));
1566
1567        let mut processor = MockTestProcessor::default();
1568        let mut i = 0;
1569        processor
1570            .expect_process()
1571            .returning(move |_| mock_process(&mut i, &[2]));
1572
1573        let mut writer = MockTestItemWriter::default();
1574        writer.expect_open().times(1).returning(|| Ok(()));
1575        writer.expect_write().never();
1576        writer.expect_close().times(1).returning(|| Ok(()));
1577
1578        let step = StepBuilder::new("test")
1579            .chunk(3)
1580            .reader(&reader)
1581            .processor(&processor)
1582            .writer(&writer)
1583            .build();
1584
1585        let mut step_execution = StepExecution::new(&step.name);
1586
1587        let result = step.execute(&mut step_execution);
1588
1589        assert!(result.is_err());
1590        assert_eq!(step_execution.status, StepStatus::ProcessorError);
1591
1592        Ok(())
1593    }
1594
1595    #[test]
1596    fn step_should_failed_with_write_error() -> Result<()> {
1597        let mut i = 0;
1598        let mut reader = MockTestItemReader::default();
1599        reader
1600            .expect_read()
1601            .returning(move || mock_read(&mut i, 0, 4));
1602
1603        let mut processor = MockTestProcessor::default();
1604        let mut i = 0;
1605        processor
1606            .expect_process()
1607            .returning(move |_| mock_process(&mut i, &[]));
1608
1609        let mut writer = MockTestItemWriter::default();
1610        writer.expect_open().times(1).returning(|| Ok(()));
1611        let result = Err(BatchError::ItemWriter("mock write error".to_string()));
1612        writer.expect_write().return_once(move |_| result);
1613        writer.expect_close().times(1).returning(|| Ok(()));
1614
1615        let step = StepBuilder::new("test")
1616            .chunk(3)
1617            .reader(&reader)
1618            .processor(&processor)
1619            .writer(&writer)
1620            .build();
1621
1622        let mut step_execution = StepExecution::new(&step.name);
1623
1624        let result = step.execute(&mut step_execution);
1625
1626        assert!(result.is_err());
1627        assert_eq!(step_execution.status, StepStatus::WriteError);
1628
1629        Ok(())
1630    }
1631
1632    #[test]
1633    fn step_should_succeed_even_with_processor_error() -> Result<()> {
1634        let mut i = 0;
1635        let mut reader = MockTestItemReader::default();
1636        reader
1637            .expect_read()
1638            .returning(move || mock_read(&mut i, 0, 4));
1639
1640        let mut processor = MockTestProcessor::default();
1641        let mut i = 0;
1642        processor
1643            .expect_process()
1644            .returning(move |_| mock_process(&mut i, &[2]));
1645
1646        let mut writer = MockTestItemWriter::default();
1647        writer.expect_open().times(1).returning(|| Ok(()));
1648        writer.expect_write().times(2).returning(|_| Ok(()));
1649        writer.expect_flush().times(2).returning(|| Ok(()));
1650        writer.expect_close().times(1).returning(|| Ok(()));
1651
1652        let step = StepBuilder::new("test")
1653            .chunk(3)
1654            .reader(&reader)
1655            .processor(&processor)
1656            .writer(&writer)
1657            .skip_limit(1)
1658            .build();
1659
1660        let mut step_execution = StepExecution::new(step.get_name());
1661
1662        let result = step.execute(&mut step_execution);
1663
1664        assert!(result.is_ok());
1665        assert_eq!(step_execution.status, StepStatus::Success);
1666
1667        Ok(())
1668    }
1669
1670    #[test]
1671    fn step_should_fail_with_read_error() -> Result<()> {
1672        let mut i = 0;
1673        let mut reader = MockTestItemReader::default();
1674        reader
1675            .expect_read()
1676            .returning(move || mock_read(&mut i, 1, 4));
1677
1678        let mut processor = MockTestProcessor::default();
1679        let mut i = 0;
1680        processor
1681            .expect_process()
1682            .returning(move |_| mock_process(&mut i, &[]));
1683
1684        let mut writer = MockTestItemWriter::default();
1685        writer.expect_open().times(1).returning(|| Ok(()));
1686        writer.expect_write().never();
1687        writer.expect_close().times(1).returning(|| Ok(()));
1688
1689        let step = StepBuilder::new("test")
1690            .chunk(3)
1691            .reader(&reader)
1692            .processor(&processor)
1693            .writer(&writer)
1694            .build();
1695
1696        let mut step_execution = StepExecution::new(&step.name);
1697
1698        let result = step.execute(&mut step_execution);
1699
1700        assert!(result.is_err());
1701        assert_eq!(step_execution.status, StepStatus::ReadError);
1702        assert_eq!(step_execution.read_error_count, 1);
1703
1704        Ok(())
1705    }
1706
1707    #[test]
1708    fn step_should_respect_chunk_size() -> Result<()> {
1709        let mut i = 0;
1710        let mut reader = MockTestItemReader::default();
1711        reader
1712            .expect_read()
1713            .returning(move || mock_read(&mut i, 0, 6));
1714
1715        let mut processor = MockTestProcessor::default();
1716        let mut i = 0;
1717        processor
1718            .expect_process()
1719            .returning(move |_| mock_process(&mut i, &[]));
1720
1721        let mut writer = MockTestItemWriter::default();
1722        writer.expect_open().times(1).returning(|| Ok(()));
1723        writer.expect_write().times(2).returning(|_| Ok(()));
1724        writer.expect_flush().times(2).returning(|| Ok(()));
1725        writer.expect_close().times(1).returning(|| Ok(()));
1726
1727        let step = StepBuilder::new("test")
1728            .chunk(3)
1729            .reader(&reader)
1730            .processor(&processor)
1731            .writer(&writer)
1732            .build();
1733
1734        let mut step_execution = StepExecution::new(&step.name);
1735
1736        let result = step.execute(&mut step_execution);
1737
1738        assert!(result.is_ok());
1739        assert_eq!(step_execution.status, StepStatus::Success);
1740        assert_eq!(step_execution.read_count, 6);
1741        assert_eq!(step_execution.write_count, 6);
1742
1743        Ok(())
1744    }
1745
1746    #[test]
1747    fn step_should_track_error_counts() -> Result<()> {
1748        let mut i = 0;
1749        let mut reader = MockTestItemReader::default();
1750        reader
1751            .expect_read()
1752            .returning(move || mock_read(&mut i, 0, 4));
1753
1754        let mut processor = MockTestProcessor::default();
1755        let mut i = 0;
1756        processor
1757            .expect_process()
1758            .returning(move |_| mock_process(&mut i, &[1, 2]));
1759
1760        let mut writer = MockTestItemWriter::default();
1761        writer.expect_open().times(1).returning(|| Ok(()));
1762        writer.expect_write().times(2).returning(|_| Ok(()));
1763        writer.expect_flush().times(2).returning(|| Ok(()));
1764        writer.expect_close().times(1).returning(|| Ok(()));
1765
1766        let step = StepBuilder::new("test")
1767            .chunk(3)
1768            .reader(&reader)
1769            .processor(&processor)
1770            .writer(&writer)
1771            .skip_limit(2)
1772            .build();
1773
1774        let mut step_execution = StepExecution::new(&step.name);
1775
1776        let result = step.execute(&mut step_execution);
1777
1778        assert!(result.is_ok());
1779        assert_eq!(step_execution.status, StepStatus::Success);
1780        assert_eq!(step_execution.process_error_count, 2);
1781
1782        Ok(())
1783    }
1784
1785    #[test]
1786    fn step_should_measure_execution_time() -> Result<()> {
1787        let mut i = 0;
1788        let mut reader = MockTestItemReader::default();
1789        reader
1790            .expect_read()
1791            .returning(move || mock_read(&mut i, 0, 2));
1792
1793        let mut processor = MockTestProcessor::default();
1794        let mut i = 0;
1795        processor
1796            .expect_process()
1797            .returning(move |_| mock_process(&mut i, &[]));
1798
1799        let mut writer = MockTestItemWriter::default();
1800        writer.expect_open().times(1).returning(|| Ok(()));
1801        writer.expect_write().times(1).returning(|_| Ok(()));
1802        writer.expect_flush().times(1).returning(|| Ok(()));
1803        writer.expect_close().times(1).returning(|| Ok(()));
1804
1805        let step = StepBuilder::new("test")
1806            .chunk(3)
1807            .reader(&reader)
1808            .processor(&processor)
1809            .writer(&writer)
1810            .build();
1811
1812        let mut step_execution = StepExecution::new(&step.name);
1813
1814        let result = step.execute(&mut step_execution);
1815
1816        assert!(result.is_ok());
1817        assert!(step_execution.duration.unwrap().as_nanos() > 0);
1818        assert!(step_execution.start_time.unwrap() <= step_execution.end_time.unwrap());
1819
1820        Ok(())
1821    }
1822
1823    #[test]
1824    fn step_should_handle_empty_chunk_at_end() -> Result<()> {
1825        let mut i = 0;
1826        let mut reader = MockTestItemReader::default();
1827        reader
1828            .expect_read()
1829            .returning(move || mock_read(&mut i, 0, 1));
1830
1831        let mut processor = MockTestProcessor::default();
1832        let mut i = 0;
1833        processor
1834            .expect_process()
1835            .returning(move |_| mock_process(&mut i, &[]));
1836
1837        let mut writer = MockTestItemWriter::default();
1838        writer.expect_open().times(1).returning(|| Ok(()));
1839        writer.expect_write().times(1).returning(|items| {
1840            assert_eq!(items.len(), 1); // Partial chunk with 1 item
1841            Ok(())
1842        });
1843        writer.expect_flush().times(1).returning(|| Ok(()));
1844        writer.expect_close().times(1).returning(|| Ok(()));
1845
1846        let step = StepBuilder::new("test")
1847            .chunk(3)
1848            .reader(&reader)
1849            .processor(&processor)
1850            .writer(&writer)
1851            .build();
1852
1853        let mut step_execution = StepExecution::new(&step.name);
1854
1855        let result = step.execute(&mut step_execution);
1856
1857        assert!(result.is_ok());
1858        assert_eq!(step_execution.status, StepStatus::Success);
1859        assert_eq!(step_execution.read_count, 1);
1860        assert_eq!(step_execution.write_count, 1);
1861
1862        Ok(())
1863    }
1864
1865    #[test]
1866    fn step_execution_should_initialize_correctly() -> Result<()> {
1867        let step_execution = StepExecution::new("test_step");
1868
1869        assert_eq!(step_execution.name, "test_step");
1870        assert_eq!(step_execution.status, StepStatus::Starting);
1871        assert!(step_execution.start_time.is_none());
1872        assert!(step_execution.end_time.is_none());
1873        assert!(step_execution.duration.is_none());
1874        assert_eq!(step_execution.read_count, 0);
1875        assert_eq!(step_execution.write_count, 0);
1876        assert_eq!(step_execution.read_error_count, 0);
1877        assert_eq!(step_execution.process_count, 0);
1878        assert_eq!(step_execution.process_error_count, 0);
1879        assert_eq!(step_execution.write_error_count, 0);
1880        assert!(!step_execution.id.is_nil());
1881
1882        Ok(())
1883    }
1884
1885    #[test]
1886    fn tasklet_step_should_execute_successfully() -> Result<()> {
1887        let mut tasklet = MockTestTasklet::default();
1888        tasklet
1889            .expect_execute()
1890            .times(1)
1891            .returning(|_| Ok(RepeatStatus::Finished));
1892
1893        let step = StepBuilder::new("tasklet_test").tasklet(&tasklet).build();
1894
1895        let mut step_execution = StepExecution::new(&step.name);
1896
1897        let result = step.execute(&mut step_execution);
1898
1899        assert!(result.is_ok());
1900        assert_eq!(step.get_name(), "tasklet_test");
1901
1902        Ok(())
1903    }
1904
1905    #[test]
1906    fn tasklet_step_should_handle_tasklet_error() -> Result<()> {
1907        let mut tasklet = MockTestTasklet::default();
1908        tasklet
1909            .expect_execute()
1910            .times(1)
1911            .returning(|_| Err(BatchError::Step("tasklet error".to_string())));
1912
1913        let step = StepBuilder::new("tasklet_test").tasklet(&tasklet).build();
1914
1915        let mut step_execution = StepExecution::new(&step.name);
1916
1917        let result = step.execute(&mut step_execution);
1918
1919        // The tasklet step should now properly handle errors
1920        assert!(result.is_err());
1921        if let Err(BatchError::Step(msg)) = result {
1922            assert_eq!(msg, "tasklet error");
1923        } else {
1924            panic!("Expected Step error");
1925        }
1926
1927        Ok(())
1928    }
1929
1930    #[test]
1931    fn tasklet_step_should_handle_continuable_status() -> Result<()> {
1932        use std::cell::Cell;
1933
1934        let call_count = Cell::new(0);
1935        let mut tasklet = MockTestTasklet::default();
1936        tasklet.expect_execute().times(4).returning(move |_| {
1937            let count = call_count.get();
1938            call_count.set(count + 1);
1939            if count < 3 {
1940                Ok(RepeatStatus::Continuable)
1941            } else {
1942                Ok(RepeatStatus::Finished)
1943            }
1944        });
1945
1946        let step = StepBuilder::new("continuable_tasklet_test")
1947            .tasklet(&tasklet)
1948            .build();
1949
1950        let mut step_execution = StepExecution::new(&step.name);
1951
1952        let result = step.execute(&mut step_execution);
1953
1954        assert!(result.is_ok());
1955        assert_eq!(step.get_name(), "continuable_tasklet_test");
1956
1957        Ok(())
1958    }
1959
1960    #[test]
1961    fn tasklet_step_should_handle_multiple_continuable_cycles() -> Result<()> {
1962        use std::cell::Cell;
1963
1964        let call_count = Cell::new(0);
1965        let mut tasklet = MockTestTasklet::default();
1966
1967        // Set up a sequence: 5 Continuable calls -> 1 Finished call
1968        tasklet.expect_execute().times(6).returning(move |_| {
1969            let count = call_count.get();
1970            call_count.set(count + 1);
1971            if count < 5 {
1972                Ok(RepeatStatus::Continuable)
1973            } else {
1974                Ok(RepeatStatus::Finished)
1975            }
1976        });
1977
1978        let step = StepBuilder::new("multi_cycle_tasklet_test")
1979            .tasklet(&tasklet)
1980            .build();
1981
1982        let mut step_execution = StepExecution::new(&step.name);
1983
1984        let result = step.execute(&mut step_execution);
1985
1986        assert!(result.is_ok());
1987        assert_eq!(step.get_name(), "multi_cycle_tasklet_test");
1988
1989        Ok(())
1990    }
1991
1992    #[test]
1993    fn tasklet_step_should_handle_error_after_continuable() -> Result<()> {
1994        use std::cell::Cell;
1995
1996        let call_count = Cell::new(0);
1997        let mut tasklet = MockTestTasklet::default();
1998
1999        // Set up a sequence: 2 Continuable calls -> 1 Error
2000        tasklet.expect_execute().times(3).returning(move |_| {
2001            let count = call_count.get();
2002            call_count.set(count + 1);
2003            if count < 2 {
2004                Ok(RepeatStatus::Continuable)
2005            } else {
2006                Err(BatchError::Step("error after continuable".to_string()))
2007            }
2008        });
2009
2010        let step = StepBuilder::new("error_after_continuable_test")
2011            .tasklet(&tasklet)
2012            .build();
2013
2014        let mut step_execution = StepExecution::new(&step.name);
2015
2016        let result = step.execute(&mut step_execution);
2017
2018        assert!(result.is_err());
2019        if let Err(BatchError::Step(msg)) = result {
2020            assert_eq!(msg, "error after continuable");
2021        } else {
2022            panic!("Expected Step error");
2023        }
2024
2025        Ok(())
2026    }
2027
2028    #[test]
2029    fn tasklet_step_should_handle_immediate_finished_status() -> Result<()> {
2030        let mut tasklet = MockTestTasklet::default();
2031        tasklet
2032            .expect_execute()
2033            .times(1)
2034            .returning(|_| Ok(RepeatStatus::Finished));
2035
2036        let step = StepBuilder::new("immediate_finished_test")
2037            .tasklet(&tasklet)
2038            .build();
2039
2040        let mut step_execution = StepExecution::new(&step.name);
2041
2042        let result = step.execute(&mut step_execution);
2043
2044        assert!(result.is_ok());
2045        assert_eq!(step.get_name(), "immediate_finished_test");
2046
2047        Ok(())
2048    }
2049
2050    #[test]
2051    fn tasklet_step_should_access_step_execution_context() -> Result<()> {
2052        let mut tasklet = MockTestTasklet::default();
2053        tasklet
2054            .expect_execute()
2055            .times(1)
2056            .withf(|step_execution| {
2057                // Verify that the tasklet receives the correct step execution context
2058                step_execution.name == "context_test"
2059                    && step_execution.status == StepStatus::Started
2060            })
2061            .returning(|_| Ok(RepeatStatus::Finished));
2062
2063        let step = StepBuilder::new("context_test").tasklet(&tasklet).build();
2064
2065        let mut step_execution = StepExecution::new(&step.name);
2066
2067        let result = step.execute(&mut step_execution);
2068
2069        assert!(result.is_ok());
2070
2071        Ok(())
2072    }
2073
2074    #[test]
2075    fn tasklet_builder_should_create_valid_tasklet_step() -> Result<()> {
2076        let mut tasklet = MockTestTasklet::default();
2077        tasklet
2078            .expect_execute()
2079            .times(1)
2080            .returning(|_| Ok(RepeatStatus::Finished));
2081
2082        let step = TaskletBuilder::new("builder_test")
2083            .tasklet(&tasklet)
2084            .build();
2085
2086        let mut step_execution = StepExecution::new(&step.name);
2087
2088        let result = step.execute(&mut step_execution);
2089
2090        assert!(result.is_ok());
2091        assert_eq!(step.get_name(), "builder_test");
2092
2093        Ok(())
2094    }
2095
2096    #[test]
2097    fn tasklet_builder_should_panic_without_tasklet() {
2098        let result = std::panic::catch_unwind(|| TaskletBuilder::new("test").build());
2099
2100        assert!(result.is_err());
2101    }
2102
2103    #[test]
2104    fn step_should_handle_writer_open_error() -> Result<()> {
2105        let mut reader = MockTestItemReader::default();
2106        let reader_result = Ok(None);
2107        reader.expect_read().return_once(move || reader_result);
2108
2109        let mut processor = MockTestProcessor::default();
2110        processor.expect_process().never();
2111
2112        let mut writer = MockTestItemWriter::default();
2113        writer
2114            .expect_open()
2115            .times(1)
2116            .returning(|| Err(BatchError::ItemWriter("open error".to_string())));
2117        writer.expect_close().times(1).returning(|| Ok(()));
2118
2119        let step = StepBuilder::new("test")
2120            .chunk(3)
2121            .reader(&reader)
2122            .processor(&processor)
2123            .writer(&writer)
2124            .build();
2125
2126        let mut step_execution = StepExecution::new(&step.name);
2127
2128        let result = step.execute(&mut step_execution);
2129
2130        // The step should still succeed as open errors are managed
2131        assert!(result.is_ok());
2132        assert_eq!(step_execution.status, StepStatus::Success);
2133
2134        Ok(())
2135    }
2136
2137    #[test]
2138    fn step_should_handle_writer_close_error() -> Result<()> {
2139        let mut reader = MockTestItemReader::default();
2140        let reader_result = Ok(None);
2141        reader.expect_read().return_once(move || reader_result);
2142
2143        let mut processor = MockTestProcessor::default();
2144        processor.expect_process().never();
2145
2146        let mut writer = MockTestItemWriter::default();
2147        writer.expect_open().times(1).returning(|| Ok(()));
2148        writer.expect_write().never();
2149        writer
2150            .expect_close()
2151            .times(1)
2152            .returning(|| Err(BatchError::ItemWriter("close error".to_string())));
2153
2154        let step = StepBuilder::new("test")
2155            .chunk(3)
2156            .reader(&reader)
2157            .processor(&processor)
2158            .writer(&writer)
2159            .build();
2160
2161        let mut step_execution = StepExecution::new(&step.name);
2162
2163        let result = step.execute(&mut step_execution);
2164
2165        // The step should still succeed as close errors are managed
2166        assert!(result.is_ok());
2167        assert_eq!(step_execution.status, StepStatus::Success);
2168
2169        Ok(())
2170    }
2171
2172    #[test]
2173    fn step_should_handle_writer_flush_error() -> Result<()> {
2174        let mut i = 0;
2175        let mut reader = MockTestItemReader::default();
2176        reader
2177            .expect_read()
2178            .returning(move || mock_read(&mut i, 0, 2));
2179
2180        let mut processor = MockTestProcessor::default();
2181        let mut i = 0;
2182        processor
2183            .expect_process()
2184            .returning(move |_| mock_process(&mut i, &[]));
2185
2186        let mut writer = MockTestItemWriter::default();
2187        writer.expect_open().times(1).returning(|| Ok(()));
2188        writer.expect_write().times(1).returning(|_| Ok(()));
2189        writer
2190            .expect_flush()
2191            .times(1)
2192            .returning(|| Err(BatchError::ItemWriter("flush error".to_string())));
2193        writer.expect_close().times(1).returning(|| Ok(()));
2194
2195        let step = StepBuilder::new("test")
2196            .chunk(3)
2197            .reader(&reader)
2198            .processor(&processor)
2199            .writer(&writer)
2200            .build();
2201
2202        let mut step_execution = StepExecution::new(&step.name);
2203
2204        let result = step.execute(&mut step_execution);
2205
2206        // The step should still succeed as flush errors are managed
2207        assert!(result.is_ok());
2208        assert_eq!(step_execution.status, StepStatus::Success);
2209
2210        Ok(())
2211    }
2212
2213    #[test]
2214    fn step_should_handle_multiple_chunks_with_exact_chunk_size() -> Result<()> {
2215        let mut i = 0;
2216        let mut reader = MockTestItemReader::default();
2217        reader
2218            .expect_read()
2219            .returning(move || mock_read(&mut i, 0, 6));
2220
2221        let mut processor = MockTestProcessor::default();
2222        let mut i = 0;
2223        processor
2224            .expect_process()
2225            .returning(move |_| mock_process(&mut i, &[]));
2226
2227        let mut writer = MockTestItemWriter::default();
2228        writer.expect_open().times(1).returning(|| Ok(()));
2229        writer.expect_write().times(2).returning(|items| {
2230            assert_eq!(items.len(), 3); // Each chunk should have exactly 3 items
2231            Ok(())
2232        });
2233        writer.expect_flush().times(2).returning(|| Ok(()));
2234        writer.expect_close().times(1).returning(|| Ok(()));
2235
2236        let step = StepBuilder::new("test")
2237            .chunk(3)
2238            .reader(&reader)
2239            .processor(&processor)
2240            .writer(&writer)
2241            .build();
2242
2243        let mut step_execution = StepExecution::new(&step.name);
2244
2245        let result = step.execute(&mut step_execution);
2246
2247        assert!(result.is_ok());
2248        assert_eq!(step_execution.status, StepStatus::Success);
2249        assert_eq!(step_execution.read_count, 6);
2250        assert_eq!(step_execution.write_count, 6);
2251
2252        Ok(())
2253    }
2254
2255    #[test]
2256    fn step_should_handle_skip_limit_boundary() -> Result<()> {
2257        let mut i = 0;
2258        let mut reader = MockTestItemReader::default();
2259        reader
2260            .expect_read()
2261            .returning(move || mock_read(&mut i, 0, 4));
2262
2263        let mut processor = MockTestProcessor::default();
2264        let mut i = 0;
2265        processor
2266            .expect_process()
2267            .returning(move |_| mock_process(&mut i, &[1, 2])); // 2 errors
2268
2269        let mut writer = MockTestItemWriter::default();
2270        writer.expect_open().times(1).returning(|| Ok(()));
2271        writer.expect_write().times(2).returning(|_| Ok(()));
2272        writer.expect_flush().times(2).returning(|| Ok(()));
2273        writer.expect_close().times(1).returning(|| Ok(()));
2274
2275        let step = StepBuilder::new("test")
2276            .chunk(3)
2277            .reader(&reader)
2278            .processor(&processor)
2279            .writer(&writer)
2280            .skip_limit(2) // Exactly at the limit
2281            .build();
2282
2283        let mut step_execution = StepExecution::new(&step.name);
2284
2285        let result = step.execute(&mut step_execution);
2286
2287        assert!(result.is_ok());
2288        assert_eq!(step_execution.status, StepStatus::Success);
2289        assert_eq!(step_execution.process_error_count, 2);
2290
2291        Ok(())
2292    }
2293
2294    #[test]
2295    fn step_should_fail_when_skip_limit_exceeded() -> Result<()> {
2296        let mut i = 0;
2297        let mut reader = MockTestItemReader::default();
2298        reader
2299            .expect_read()
2300            .returning(move || mock_read(&mut i, 0, 4));
2301
2302        let mut processor = MockTestProcessor::default();
2303        let mut i = 0;
2304        processor
2305            .expect_process()
2306            .returning(move |_| mock_process(&mut i, &[1, 2, 3])); // 3 errors
2307
2308        let mut writer = MockTestItemWriter::default();
2309        writer.expect_open().times(1).returning(|| Ok(()));
2310        writer.expect_write().never(); // Should not reach write due to error
2311        writer.expect_close().times(1).returning(|| Ok(()));
2312
2313        let step = StepBuilder::new("test")
2314            .chunk(3)
2315            .reader(&reader)
2316            .processor(&processor)
2317            .writer(&writer)
2318            .skip_limit(2) // Exceeded by 1
2319            .build();
2320
2321        let mut step_execution = StepExecution::new(&step.name);
2322
2323        let result = step.execute(&mut step_execution);
2324
2325        assert!(result.is_err());
2326        assert_eq!(step_execution.status, StepStatus::ProcessorError);
2327        assert_eq!(step_execution.process_error_count, 3);
2328
2329        Ok(())
2330    }
2331
2332    #[test]
2333    fn step_should_handle_empty_processed_chunk() -> Result<()> {
2334        let mut i = 0;
2335        let mut reader = MockTestItemReader::default();
2336        reader
2337            .expect_read()
2338            .returning(move || mock_read(&mut i, 0, 3));
2339
2340        let mut processor = MockTestProcessor::default();
2341        let mut i = 0;
2342        processor
2343            .expect_process()
2344            .returning(move |_| mock_process(&mut i, &[1, 2, 3, 4])); // All items fail processing
2345
2346        let mut writer = MockTestItemWriter::default();
2347        writer.expect_open().times(1).returning(|| Ok(()));
2348        writer.expect_write().never(); // Empty chunks are not written
2349        writer.expect_close().times(1).returning(|| Ok(()));
2350
2351        let step = StepBuilder::new("test")
2352            .chunk(3)
2353            .reader(&reader)
2354            .processor(&processor)
2355            .writer(&writer)
2356            .skip_limit(3) // Allow all errors
2357            .build();
2358
2359        let mut step_execution = StepExecution::new(&step.name);
2360
2361        let result = step.execute(&mut step_execution);
2362
2363        assert!(result.is_ok());
2364        assert_eq!(step_execution.status, StepStatus::Success);
2365        assert_eq!(step_execution.process_error_count, 3);
2366        assert_eq!(step_execution.write_count, 0); // No items written
2367
2368        Ok(())
2369    }
2370
2371    #[test]
2372    fn chunk_status_should_be_comparable() {
2373        assert_eq!(ChunkStatus::Finished, ChunkStatus::Finished);
2374        assert_eq!(ChunkStatus::Full, ChunkStatus::Full);
2375        assert_ne!(ChunkStatus::Finished, ChunkStatus::Full);
2376    }
2377
2378    #[test]
2379    fn step_status_should_be_comparable() {
2380        assert_eq!(StepStatus::Success, StepStatus::Success);
2381        assert_eq!(StepStatus::ReadError, StepStatus::ReadError);
2382        assert_eq!(StepStatus::ProcessorError, StepStatus::ProcessorError);
2383        assert_eq!(StepStatus::WriteError, StepStatus::WriteError);
2384        assert_eq!(StepStatus::Starting, StepStatus::Starting);
2385
2386        assert_ne!(StepStatus::Success, StepStatus::ReadError);
2387        assert_ne!(StepStatus::ProcessorError, StepStatus::WriteError);
2388    }
2389
2390    #[test]
2391    fn repeat_status_should_be_comparable() {
2392        assert_eq!(RepeatStatus::Continuable, RepeatStatus::Continuable);
2393        assert_eq!(RepeatStatus::Finished, RepeatStatus::Finished);
2394        assert_ne!(RepeatStatus::Continuable, RepeatStatus::Finished);
2395    }
2396
2397    #[test]
2398    fn step_builder_should_create_chunk_oriented_step() -> Result<()> {
2399        let mut reader = MockTestItemReader::default();
2400        reader.expect_read().return_once(|| Ok(None));
2401
2402        let mut processor = MockTestProcessor::default();
2403        processor.expect_process().never();
2404
2405        let mut writer = MockTestItemWriter::default();
2406        writer.expect_open().times(1).returning(|| Ok(()));
2407        writer.expect_write().never();
2408        writer.expect_close().times(1).returning(|| Ok(()));
2409
2410        let step = StepBuilder::new("builder_test")
2411            .chunk(5)
2412            .reader(&reader)
2413            .processor(&processor)
2414            .writer(&writer)
2415            .skip_limit(10)
2416            .build();
2417
2418        let mut step_execution = StepExecution::new(&step.name);
2419        let result = step.execute(&mut step_execution);
2420
2421        assert!(result.is_ok());
2422        assert_eq!(step.get_name(), "builder_test");
2423
2424        Ok(())
2425    }
2426
2427    #[test]
2428    fn step_should_handle_large_chunk_size() -> Result<()> {
2429        let mut i = 0;
2430        let mut reader = MockTestItemReader::default();
2431        reader
2432            .expect_read()
2433            .returning(move || mock_read(&mut i, 0, 5));
2434
2435        let mut processor = MockTestProcessor::default();
2436        let mut i = 0;
2437        processor
2438            .expect_process()
2439            .returning(move |_| mock_process(&mut i, &[]));
2440
2441        let mut writer = MockTestItemWriter::default();
2442        writer.expect_open().times(1).returning(|| Ok(()));
2443        writer.expect_write().times(1).returning(|items| {
2444            assert_eq!(items.len(), 5); // All items in one chunk
2445            Ok(())
2446        });
2447        writer.expect_flush().times(1).returning(|| Ok(()));
2448        writer.expect_close().times(1).returning(|| Ok(()));
2449
2450        let step = StepBuilder::new("test")
2451            .chunk(100) // Chunk size larger than available items
2452            .reader(&reader)
2453            .processor(&processor)
2454            .writer(&writer)
2455            .build();
2456
2457        let mut step_execution = StepExecution::new(&step.name);
2458
2459        let result = step.execute(&mut step_execution);
2460
2461        assert!(result.is_ok());
2462        assert_eq!(step_execution.status, StepStatus::Success);
2463        assert_eq!(step_execution.read_count, 5);
2464        assert_eq!(step_execution.write_count, 5);
2465
2466        Ok(())
2467    }
2468
2469    #[test]
2470    fn step_should_handle_mixed_errors_within_skip_limit() -> Result<()> {
2471        use std::cell::Cell;
2472
2473        let read_counter = Cell::new(0u16);
2474        let mut reader = MockTestItemReader::default();
2475        reader.expect_read().returning(move || {
2476            let current = read_counter.get();
2477            if current == 2 {
2478                read_counter.set(current + 1);
2479                Err(BatchError::ItemReader("read error".to_string()))
2480            } else {
2481                let mut i = current;
2482                let result = mock_read(&mut i, 0, 6);
2483                read_counter.set(i);
2484                result
2485            }
2486        });
2487
2488        let mut processor = MockTestProcessor::default();
2489        let mut i = 0;
2490        processor
2491            .expect_process()
2492            .returning(move |_| mock_process(&mut i, &[2])); // 1 process error
2493
2494        let mut writer = MockTestItemWriter::default();
2495        writer.expect_open().times(1).returning(|| Ok(()));
2496        writer.expect_write().times(2).returning(|_| Ok(()));
2497        writer.expect_flush().times(2).returning(|| Ok(()));
2498        writer.expect_close().times(1).returning(|| Ok(()));
2499
2500        let step = StepBuilder::new("test")
2501            .chunk(3)
2502            .reader(&reader)
2503            .processor(&processor)
2504            .writer(&writer)
2505            .skip_limit(2) // Allow 1 read error + 1 process error
2506            .build();
2507
2508        let mut step_execution = StepExecution::new(&step.name);
2509
2510        let result = step.execute(&mut step_execution);
2511
2512        assert!(result.is_ok());
2513        assert_eq!(step_execution.status, StepStatus::Success);
2514        assert_eq!(step_execution.read_error_count, 1);
2515        assert_eq!(step_execution.process_error_count, 1);
2516
2517        Ok(())
2518    }
2519
2520    #[test]
2521    fn step_execution_should_be_cloneable() -> Result<()> {
2522        let step_execution = StepExecution::new("test_step");
2523        let cloned_execution = step_execution.clone();
2524
2525        assert_eq!(step_execution.id, cloned_execution.id);
2526        assert_eq!(step_execution.name, cloned_execution.name);
2527        assert_eq!(step_execution.status, cloned_execution.status);
2528        assert_eq!(step_execution.read_count, cloned_execution.read_count);
2529        assert_eq!(step_execution.write_count, cloned_execution.write_count);
2530
2531        Ok(())
2532    }
2533
2534    #[test]
2535    fn step_should_handle_zero_chunk_size() -> Result<()> {
2536        let mut reader = MockTestItemReader::default();
2537        reader.expect_read().return_once(|| Ok(None));
2538
2539        let mut processor = MockTestProcessor::default();
2540        processor.expect_process().never();
2541
2542        let mut writer = MockTestItemWriter::default();
2543        writer.expect_open().times(1).returning(|| Ok(()));
2544        writer.expect_write().never();
2545        writer.expect_close().times(1).returning(|| Ok(()));
2546
2547        // Test with chunk size of 1 (minimum practical value)
2548        let step = StepBuilder::new("test")
2549            .chunk(1)
2550            .reader(&reader)
2551            .processor(&processor)
2552            .writer(&writer)
2553            .build();
2554
2555        let mut step_execution = StepExecution::new(&step.name);
2556
2557        let result = step.execute(&mut step_execution);
2558
2559        assert!(result.is_ok());
2560        assert_eq!(step_execution.status, StepStatus::Success);
2561
2562        Ok(())
2563    }
2564
2565    #[test]
2566    fn step_should_handle_continuous_read_errors_until_skip_limit() -> Result<()> {
2567        use std::cell::Cell;
2568
2569        let counter = Cell::new(0u16);
2570        let mut reader = MockTestItemReader::default();
2571        reader.expect_read().returning(move || {
2572            let current = counter.get();
2573            counter.set(current + 1);
2574            if current < 3 {
2575                Err(BatchError::ItemReader("continuous read error".to_string()))
2576            } else {
2577                Ok(None) // End of data after errors
2578            }
2579        });
2580
2581        let mut processor = MockTestProcessor::default();
2582        processor.expect_process().never();
2583
2584        let mut writer = MockTestItemWriter::default();
2585        writer.expect_open().times(1).returning(|| Ok(()));
2586        writer.expect_write().never();
2587        writer.expect_close().times(1).returning(|| Ok(()));
2588
2589        let step = StepBuilder::new("test")
2590            .chunk(3)
2591            .reader(&reader)
2592            .processor(&processor)
2593            .writer(&writer)
2594            .skip_limit(2) // Should fail after 3 errors (exceeds limit of 2)
2595            .build();
2596
2597        let mut step_execution = StepExecution::new(&step.name);
2598
2599        let result = step.execute(&mut step_execution);
2600
2601        assert!(result.is_err());
2602        assert_eq!(step_execution.status, StepStatus::ReadError);
2603        assert_eq!(step_execution.read_error_count, 3);
2604
2605        Ok(())
2606    }
2607
2608    #[test]
2609    fn step_should_handle_write_error_with_skip_limit() -> Result<()> {
2610        let mut i = 0;
2611        let mut reader = MockTestItemReader::default();
2612        reader
2613            .expect_read()
2614            .returning(move || mock_read(&mut i, 0, 4));
2615
2616        let mut processor = MockTestProcessor::default();
2617        let mut i = 0;
2618        processor
2619            .expect_process()
2620            .returning(move |_| mock_process(&mut i, &[]));
2621
2622        let mut writer = MockTestItemWriter::default();
2623        writer.expect_open().times(1).returning(|| Ok(()));
2624        writer
2625            .expect_write()
2626            .times(1)
2627            .returning(|_| Err(BatchError::ItemWriter("write error".to_string())));
2628        writer.expect_close().times(1).returning(|| Ok(()));
2629
2630        let step = StepBuilder::new("test")
2631            .chunk(3)
2632            .reader(&reader)
2633            .processor(&processor)
2634            .writer(&writer)
2635            .skip_limit(0) // No tolerance for errors
2636            .build();
2637
2638        let mut step_execution = StepExecution::new(&step.name);
2639
2640        let result = step.execute(&mut step_execution);
2641
2642        assert!(result.is_err());
2643        assert_eq!(step_execution.status, StepStatus::WriteError);
2644        assert_eq!(step_execution.write_error_count, 3); // All items in chunk failed
2645
2646        Ok(())
2647    }
2648
2649    #[test]
2650    fn step_should_succeed_when_write_error_within_skip_limit() -> Result<()> {
2651        let mut i = 0;
2652        let mut reader = MockTestItemReader::default();
2653        reader
2654            .expect_read()
2655            .returning(move || mock_read(&mut i, 0, 3));
2656
2657        let mut processor = MockTestProcessor::default();
2658        let mut i = 0;
2659        processor
2660            .expect_process()
2661            .returning(move |_| mock_process(&mut i, &[]));
2662
2663        let mut writer = MockTestItemWriter::default();
2664        writer.expect_open().times(1).returning(|| Ok(()));
2665        writer
2666            .expect_write()
2667            .times(1)
2668            .returning(|_| Err(BatchError::ItemWriter("write error".to_string())));
2669        writer.expect_close().times(1).returning(|| Ok(()));
2670
2671        let step = StepBuilder::new("test")
2672            .chunk(3)
2673            .reader(&reader)
2674            .processor(&processor)
2675            .writer(&writer)
2676            .skip_limit(3) // Exactly at limit: 3 write errors <= skip_limit(3), step continues
2677            .build();
2678
2679        let mut step_execution = StepExecution::new(&step.name);
2680
2681        let result = step.execute(&mut step_execution);
2682
2683        assert!(result.is_ok());
2684        assert_eq!(step_execution.status, StepStatus::Success);
2685        assert_eq!(step_execution.write_error_count, 3);
2686        assert_eq!(step_execution.write_count, 0); // No successful writes
2687
2688        Ok(())
2689    }
2690
2691    #[test]
2692    fn step_should_handle_partial_chunk_at_end() -> Result<()> {
2693        let mut i = 0;
2694        let mut reader = MockTestItemReader::default();
2695        reader
2696            .expect_read()
2697            .returning(move || mock_read(&mut i, 0, 2)); // Only 2 items, chunk size is 3
2698
2699        let mut processor = MockTestProcessor::default();
2700        let mut i = 0;
2701        processor
2702            .expect_process()
2703            .returning(move |_| mock_process(&mut i, &[]));
2704
2705        let mut writer = MockTestItemWriter::default();
2706        writer.expect_open().times(1).returning(|| Ok(()));
2707        writer.expect_write().times(1).returning(|items| {
2708            assert_eq!(items.len(), 2); // Partial chunk with 2 items
2709            Ok(())
2710        });
2711        writer.expect_flush().times(1).returning(|| Ok(()));
2712        writer.expect_close().times(1).returning(|| Ok(()));
2713
2714        let step = StepBuilder::new("test")
2715            .chunk(3)
2716            .reader(&reader)
2717            .processor(&processor)
2718            .writer(&writer)
2719            .build();
2720
2721        let mut step_execution = StepExecution::new(&step.name);
2722
2723        let result = step.execute(&mut step_execution);
2724
2725        assert!(result.is_ok());
2726        assert_eq!(step_execution.status, StepStatus::Success);
2727        assert_eq!(step_execution.read_count, 2);
2728        assert_eq!(step_execution.write_count, 2);
2729
2730        Ok(())
2731    }
2732
2733    #[test]
2734    fn batch_status_should_have_all_variants() {
2735        // Test that all BatchStatus variants exist and can be created
2736        let _completed = BatchStatus::COMPLETED;
2737        let _starting = BatchStatus::STARTING;
2738        let _started = BatchStatus::STARTED;
2739        let _stopping = BatchStatus::STOPPING;
2740        let _stopped = BatchStatus::STOPPED;
2741        let _failed = BatchStatus::FAILED;
2742        let _abandoned = BatchStatus::ABANDONED;
2743        let _unknown = BatchStatus::UNKNOWN;
2744    }
2745
2746    #[test]
2747    fn tasklet_builder_should_require_tasklet() {
2748        let mut tasklet = MockTestTasklet::default();
2749        tasklet.expect_execute().never();
2750
2751        // This test documents that the builder panics if tasklet is not set
2752        // In a real scenario, this would be caught at compile time or runtime
2753        let builder = TaskletBuilder::new("test").tasklet(&tasklet);
2754        let _step = builder.build(); // Should not panic with tasklet set
2755    }
2756
2757    #[test]
2758    fn chunk_oriented_step_builder_should_require_all_components() -> Result<()> {
2759        let mut reader = MockTestItemReader::default();
2760        reader.expect_read().return_once(|| Ok(None));
2761
2762        let mut processor = MockTestProcessor::default();
2763        processor.expect_process().never();
2764
2765        let mut writer = MockTestItemWriter::default();
2766        writer.expect_open().times(1).returning(|| Ok(()));
2767        writer.expect_write().never();
2768        writer.expect_close().times(1).returning(|| Ok(()));
2769
2770        // Test that builder works with all required components
2771        let step = ChunkOrientedStepBuilder::new("test")
2772            .reader(&reader)
2773            .processor(&processor)
2774            .writer(&writer)
2775            .chunk_size(10)
2776            .skip_limit(5)
2777            .build();
2778
2779        let mut step_execution = StepExecution::new(&step.name);
2780        let result = step.execute(&mut step_execution);
2781
2782        assert!(result.is_ok());
2783        assert_eq!(step.get_name(), "test");
2784
2785        Ok(())
2786    }
2787
2788    #[test]
2789    fn step_should_handle_maximum_skip_limit() -> Result<()> {
2790        let mut i = 0;
2791        let mut reader = MockTestItemReader::default();
2792        reader
2793            .expect_read()
2794            .returning(move || mock_read(&mut i, 0, 3)); // Only 3 items to match chunk size
2795
2796        let mut processor = MockTestProcessor::default();
2797        let mut i = 0;
2798        processor
2799            .expect_process()
2800            .returning(move |_| mock_process(&mut i, &[1, 2, 3])); // All items fail
2801
2802        let mut writer = MockTestItemWriter::default();
2803        writer.expect_open().times(1).returning(|| Ok(()));
2804        writer.expect_write().never(); // No items to write since all fail processing
2805        writer.expect_close().times(1).returning(|| Ok(()));
2806
2807        let step = StepBuilder::new("test")
2808            .chunk(3)
2809            .reader(&reader)
2810            .processor(&processor)
2811            .writer(&writer)
2812            .skip_limit(u16::MAX) // Maximum skip limit
2813            .build();
2814
2815        let mut step_execution = StepExecution::new(&step.name);
2816
2817        let result = step.execute(&mut step_execution);
2818
2819        assert!(result.is_ok());
2820        assert_eq!(step_execution.status, StepStatus::Success);
2821        assert_eq!(step_execution.process_error_count, 3);
2822
2823        Ok(())
2824    }
2825
2826    #[test]
2827    fn step_should_handle_tasklet_step_timing() -> Result<()> {
2828        let mut tasklet = MockTestTasklet::default();
2829        tasklet
2830            .expect_execute()
2831            .times(1)
2832            .returning(|_| Ok(RepeatStatus::Finished));
2833
2834        let step = StepBuilder::new("timing_test").tasklet(&tasklet).build();
2835
2836        let mut step_execution = StepExecution::new(&step.name);
2837
2838        let result = step.execute(&mut step_execution);
2839
2840        assert!(result.is_ok());
2841        assert!(step_execution.start_time.is_some());
2842        assert!(step_execution.end_time.is_some());
2843        assert!(step_execution.duration.is_some());
2844        assert!(step_execution.duration.unwrap().as_nanos() > 0);
2845
2846        Ok(())
2847    }
2848
2849    #[test]
2850    fn step_should_handle_tasklet_step_status_transitions() -> Result<()> {
2851        let mut tasklet = MockTestTasklet::default();
2852        tasklet
2853            .expect_execute()
2854            .times(1)
2855            .returning(|step_execution| {
2856                // Verify the step status is Started when tasklet is called
2857                assert_eq!(step_execution.status, StepStatus::Started);
2858                Ok(RepeatStatus::Finished)
2859            });
2860
2861        let step = StepBuilder::new("status_test").tasklet(&tasklet).build();
2862
2863        let mut step_execution = StepExecution::new(&step.name);
2864        assert_eq!(step_execution.status, StepStatus::Starting);
2865
2866        let result = step.execute(&mut step_execution);
2867
2868        assert!(result.is_ok());
2869        assert_eq!(step_execution.status, StepStatus::Success);
2870
2871        Ok(())
2872    }
2873
2874    #[test]
2875    fn step_should_handle_tasklet_step_failed_status() -> Result<()> {
2876        let mut tasklet = MockTestTasklet::default();
2877        tasklet
2878            .expect_execute()
2879            .times(1)
2880            .returning(|_| Err(BatchError::Step("tasklet failure".to_string())));
2881
2882        let step = StepBuilder::new("failed_test").tasklet(&tasklet).build();
2883
2884        let mut step_execution = StepExecution::new(&step.name);
2885
2886        let result = step.execute(&mut step_execution);
2887
2888        assert!(result.is_err());
2889        assert_eq!(step_execution.status, StepStatus::Failed);
2890        assert!(step_execution.end_time.is_some());
2891        assert!(step_execution.duration.is_some());
2892
2893        Ok(())
2894    }
2895
2896    #[test]
2897    fn chunk_oriented_step_builder_should_panic_without_reader() {
2898        let mut processor = MockTestProcessor::default();
2899        processor.expect_process().never();
2900
2901        let mut writer = MockTestItemWriter::default();
2902        writer.expect_open().never();
2903
2904        let result = std::panic::catch_unwind(|| {
2905            ChunkOrientedStepBuilder::new("test")
2906                .processor(&processor)
2907                .writer(&writer)
2908                .build()
2909        });
2910
2911        assert!(result.is_err());
2912    }
2913
2914    #[test]
2915    fn chunk_oriented_step_builder_should_panic_without_processor() {
2916        let mut reader = MockTestItemReader::default();
2917        reader.expect_read().never();
2918
2919        let mut writer = MockTestItemWriter::default();
2920        writer.expect_open().never();
2921
2922        let result = std::panic::catch_unwind(|| {
2923            ChunkOrientedStepBuilder::new("test")
2924                .reader(&reader)
2925                .writer(&writer)
2926                .build()
2927        });
2928
2929        assert!(result.is_err());
2930    }
2931
2932    #[test]
2933    fn chunk_oriented_step_builder_should_panic_without_writer() {
2934        let mut reader = MockTestItemReader::default();
2935        reader.expect_read().never();
2936
2937        let mut processor = MockTestProcessor::default();
2938        processor.expect_process().never();
2939
2940        let result = std::panic::catch_unwind(|| {
2941            ChunkOrientedStepBuilder::new("test")
2942                .reader(&reader)
2943                .processor(&processor)
2944                .build()
2945        });
2946
2947        assert!(result.is_err());
2948    }
2949
2950    #[test]
2951    fn step_should_handle_read_chunk_with_full_chunk() -> Result<()> {
2952        let mut i = 0;
2953        let mut reader = MockTestItemReader::default();
2954        reader
2955            .expect_read()
2956            .returning(move || mock_read(&mut i, 0, 4)); // 4 items total
2957
2958        let mut processor = MockTestProcessor::default();
2959        let mut i = 0;
2960        processor
2961            .expect_process()
2962            .returning(move |_| mock_process(&mut i, &[]));
2963
2964        let mut writer = MockTestItemWriter::default();
2965        writer.expect_open().times(1).returning(|| Ok(()));
2966        writer.expect_write().times(2).returning(|items| {
2967            // First chunk has 3 items, second chunk has 1 item
2968            assert!(items.len() <= 3);
2969            Ok(())
2970        });
2971        writer.expect_flush().times(2).returning(|| Ok(()));
2972        writer.expect_close().times(1).returning(|| Ok(()));
2973
2974        let step = StepBuilder::new("test")
2975            .chunk(3)
2976            .reader(&reader)
2977            .processor(&processor)
2978            .writer(&writer)
2979            .build();
2980
2981        let mut step_execution = StepExecution::new(&step.name);
2982
2983        let result = step.execute(&mut step_execution);
2984
2985        assert!(result.is_ok());
2986        assert_eq!(step_execution.status, StepStatus::Success);
2987        assert_eq!(step_execution.read_count, 4);
2988        assert_eq!(step_execution.write_count, 4);
2989
2990        Ok(())
2991    }
2992
2993    #[test]
2994    fn step_should_handle_process_chunk_with_all_errors() -> Result<()> {
2995        let mut i = 0;
2996        let mut reader = MockTestItemReader::default();
2997        reader
2998            .expect_read()
2999            .returning(move || mock_read(&mut i, 0, 3));
3000
3001        let mut processor = MockTestProcessor::default();
3002        let mut i = 0;
3003        processor
3004            .expect_process()
3005            .returning(move |_| mock_process(&mut i, &[1, 2, 3])); // All items fail
3006
3007        let mut writer = MockTestItemWriter::default();
3008        writer.expect_open().times(1).returning(|| Ok(()));
3009        writer.expect_write().never(); // No items to write since all fail processing
3010        writer.expect_close().times(1).returning(|| Ok(()));
3011
3012        let step = StepBuilder::new("test")
3013            .chunk(3)
3014            .reader(&reader)
3015            .processor(&processor)
3016            .writer(&writer)
3017            .skip_limit(5) // Allow all errors
3018            .build();
3019
3020        let mut step_execution = StepExecution::new(&step.name);
3021
3022        let result = step.execute(&mut step_execution);
3023
3024        assert!(result.is_ok());
3025        assert_eq!(step_execution.status, StepStatus::Success);
3026        assert_eq!(step_execution.process_error_count, 3);
3027        assert_eq!(step_execution.write_count, 0);
3028
3029        Ok(())
3030    }
3031
3032    #[test]
3033    fn step_should_handle_write_chunk_with_empty_items() -> Result<()> {
3034        let mut reader = MockTestItemReader::default();
3035        reader.expect_read().return_once(|| Ok(None));
3036
3037        let mut processor = MockTestProcessor::default();
3038        processor.expect_process().never();
3039
3040        let mut writer = MockTestItemWriter::default();
3041        writer.expect_open().times(1).returning(|| Ok(()));
3042        writer.expect_write().never(); // No items to write
3043        writer.expect_close().times(1).returning(|| Ok(()));
3044
3045        let step = StepBuilder::new("test")
3046            .chunk(3)
3047            .reader(&reader)
3048            .processor(&processor)
3049            .writer(&writer)
3050            .build();
3051
3052        let mut step_execution = StepExecution::new(&step.name);
3053
3054        let result = step.execute(&mut step_execution);
3055
3056        assert!(result.is_ok());
3057        assert_eq!(step_execution.status, StepStatus::Success);
3058        assert_eq!(step_execution.read_count, 0);
3059        assert_eq!(step_execution.write_count, 0);
3060
3061        Ok(())
3062    }
3063
3064    #[test]
3065    fn step_should_handle_is_skip_limit_reached_boundary_conditions() -> Result<()> {
3066        let mut i = 0;
3067        let mut reader = MockTestItemReader::default();
3068        reader
3069            .expect_read()
3070            .returning(move || mock_read(&mut i, 0, 4));
3071
3072        let mut processor = MockTestProcessor::default();
3073        let mut i = 0;
3074        processor
3075            .expect_process()
3076            .returning(move |_| mock_process(&mut i, &[1, 2])); // 2 errors
3077
3078        let mut writer = MockTestItemWriter::default();
3079        writer.expect_open().times(1).returning(|| Ok(()));
3080        writer.expect_write().times(2).returning(|_| Ok(()));
3081        writer.expect_flush().times(2).returning(|| Ok(()));
3082        writer.expect_close().times(1).returning(|| Ok(()));
3083
3084        let step = StepBuilder::new("test")
3085            .chunk(3)
3086            .reader(&reader)
3087            .processor(&processor)
3088            .writer(&writer)
3089            .skip_limit(2) // Exactly at the limit
3090            .build();
3091
3092        let mut step_execution = StepExecution::new(&step.name);
3093
3094        let result = step.execute(&mut step_execution);
3095
3096        assert!(result.is_ok());
3097        assert_eq!(step_execution.status, StepStatus::Success);
3098        assert_eq!(step_execution.process_error_count, 2);
3099
3100        Ok(())
3101    }
3102
3103    #[test]
3104    fn step_should_handle_manage_error_with_various_errors() -> Result<()> {
3105        let mut reader = MockTestItemReader::default();
3106        reader.expect_read().return_once(|| Ok(None));
3107
3108        let mut processor = MockTestProcessor::default();
3109        processor.expect_process().never();
3110
3111        let mut writer = MockTestItemWriter::default();
3112        writer
3113            .expect_open()
3114            .times(1)
3115            .returning(|| Err(BatchError::ItemWriter("open error".to_string())));
3116        writer.expect_write().never();
3117        writer
3118            .expect_close()
3119            .times(1)
3120            .returning(|| Err(BatchError::ItemWriter("close error".to_string())));
3121
3122        let step = StepBuilder::new("test")
3123            .chunk(3)
3124            .reader(&reader)
3125            .processor(&processor)
3126            .writer(&writer)
3127            .build();
3128
3129        let mut step_execution = StepExecution::new(&step.name);
3130
3131        let result = step.execute(&mut step_execution);
3132
3133        // Should still succeed as open/close errors are managed
3134        assert!(result.is_ok());
3135        assert_eq!(step_execution.status, StepStatus::Success);
3136
3137        Ok(())
3138    }
3139
3140    #[test]
3141    fn step_execution_should_have_unique_ids() -> Result<()> {
3142        let step_execution1 = StepExecution::new("test1");
3143        let step_execution2 = StepExecution::new("test2");
3144
3145        assert_ne!(step_execution1.id, step_execution2.id);
3146
3147        Ok(())
3148    }
3149
3150    #[test]
3151    fn step_execution_should_clone_with_same_values() -> Result<()> {
3152        let mut step_execution = StepExecution::new("test_step");
3153        step_execution.read_count = 10;
3154        step_execution.write_count = 8;
3155        step_execution.status = StepStatus::Success;
3156
3157        let cloned_execution = step_execution.clone();
3158
3159        assert_eq!(step_execution.id, cloned_execution.id);
3160        assert_eq!(step_execution.name, cloned_execution.name);
3161        assert_eq!(step_execution.status, cloned_execution.status);
3162        assert_eq!(step_execution.read_count, cloned_execution.read_count);
3163        assert_eq!(step_execution.write_count, cloned_execution.write_count);
3164
3165        Ok(())
3166    }
3167
3168    #[test]
3169    fn step_status_should_support_copy_trait() {
3170        let status1 = StepStatus::Success;
3171        let status2 = status1; // This should work because StepStatus implements Copy
3172
3173        assert_eq!(status1, status2);
3174        assert_eq!(status1, StepStatus::Success); // Original should still be usable
3175    }
3176
3177    #[test]
3178    fn step_status_should_support_debug_trait() {
3179        let status = StepStatus::ProcessorError;
3180        let debug_string = format!("{:?}", status);
3181
3182        assert!(debug_string.contains("ProcessorError"));
3183    }
3184
3185    #[test]
3186    fn chunk_status_should_support_debug_trait() {
3187        let status = ChunkStatus::Full;
3188        let debug_string = format!("{:?}", status);
3189
3190        assert!(debug_string.contains("Full"));
3191    }
3192
3193    #[test]
3194    fn repeat_status_should_support_debug_trait() {
3195        let status = RepeatStatus::Continuable;
3196        let debug_string = format!("{:?}", status);
3197
3198        assert!(debug_string.contains("Continuable"));
3199    }
3200
3201    #[test]
3202    fn step_should_count_filtered_items() -> Result<()> {
3203        // Reader returns 4 items (items 0,1,2,3), ends at 4
3204        let mut i = 0u16;
3205        let mut reader = MockTestItemReader::default();
3206        reader
3207            .expect_read()
3208            .returning(move || mock_read(&mut i, 0, 4));
3209
3210        // Processor filters item at position 2 (returns Ok(None))
3211        let mut j = 0u16;
3212        let mut processor = MockTestProcessor::default();
3213        processor.expect_process().returning(move |_| {
3214            j += 1;
3215            if j == 2 {
3216                return Ok(None); // filter this item
3217            }
3218            Ok(Some(Car {
3219                year: 1979,
3220                make: "make".to_owned(),
3221                model: "model".to_owned(),
3222                description: "description".to_owned(),
3223            }))
3224        });
3225
3226        let mut writer = MockTestItemWriter::default();
3227        writer.expect_open().times(1).returning(|| Ok(()));
3228        // 3 items pass through (4 read - 1 filtered), written in one chunk
3229        writer.expect_write().times(1).returning(|items| {
3230            assert_eq!(items.len(), 3, "expected 3 items written after filtering");
3231            Ok(())
3232        });
3233        writer.expect_flush().returning(|| Ok(()));
3234        writer.expect_close().times(1).returning(|| Ok(()));
3235
3236        let step = StepBuilder::new("test")
3237            .chunk(10)
3238            .reader(&reader)
3239            .processor(&processor)
3240            .writer(&writer)
3241            .build();
3242
3243        let mut step_execution = StepExecution::new(&step.name);
3244        let result = step.execute(&mut step_execution);
3245
3246        assert!(result.is_ok());
3247        assert_eq!(step_execution.read_count, 4, "should have read 4 items");
3248        assert_eq!(
3249            step_execution.filter_count, 1,
3250            "should have filtered 1 item"
3251        );
3252        assert_eq!(
3253            step_execution.process_count, 3,
3254            "should have processed 3 items"
3255        );
3256        assert_eq!(step_execution.write_count, 3, "should have written 3 items");
3257
3258        Ok(())
3259    }
3260
3261    #[test]
3262    fn step_should_not_call_writer_when_all_items_filtered() -> Result<()> {
3263        let mut i = 0u16;
3264        let mut reader = MockTestItemReader::default();
3265        reader
3266            .expect_read()
3267            .returning(move || mock_read(&mut i, 0, 3));
3268
3269        let mut processor = MockTestProcessor::default();
3270        processor.expect_process().returning(|_| Ok(None)); // filter every item
3271
3272        let mut writer = MockTestItemWriter::default();
3273        writer.expect_open().times(1).returning(|| Ok(()));
3274        writer.expect_write().never(); // must NOT be called
3275        writer.expect_close().times(1).returning(|| Ok(()));
3276
3277        let step = StepBuilder::new("test")
3278            .chunk(10)
3279            .reader(&reader)
3280            .processor(&processor)
3281            .writer(&writer)
3282            .build();
3283
3284        let mut step_execution = StepExecution::new(&step.name);
3285        let result = step.execute(&mut step_execution);
3286
3287        assert!(result.is_ok());
3288        assert_eq!(
3289            step_execution.filter_count, 3,
3290            "all 3 items should be filtered"
3291        );
3292        assert_eq!(
3293            step_execution.process_count, 0,
3294            "no items should reach process_count"
3295        );
3296        assert_eq!(step_execution.write_count, 0, "nothing should be written");
3297
3298        Ok(())
3299    }
3300}