Skip to main content

spring_batch_rs/core/
item.rs

1use crate::error::BatchError;
2
/// Represents the result of reading an item from the reader.
///
/// This type is a specialized `Result` that can be:
/// - `Ok(Some(I))` when an item is successfully read
/// - `Ok(None)` when there are no more items to read (end of data)
/// - `Err(BatchError)` when an error occurs during reading
///
/// It is the return type of [`ItemReader::read`].
pub type ItemReaderResult<I> = Result<Option<I>, BatchError>;
10
/// Represents the result of processing an item by the processor.
///
/// This type is a specialized `Result` that can be:
/// - `Ok(Some(O))` when an item is successfully processed and should be passed to the writer
/// - `Ok(None)` when an item is intentionally filtered out (not an error)
/// - `Err(BatchError)` when an error occurs during processing
///
/// It is the return type of [`ItemProcessor::process`].
pub type ItemProcessorResult<O> = Result<Option<O>, BatchError>;
18
/// Represents the result of writing items by the writer.
///
/// This type is a specialized `Result` that can be:
/// - `Ok(())` when items are successfully written
/// - `Err(BatchError)` when an error occurs during writing
///
/// It is the return type of [`ItemWriter::write`] and of the lifecycle methods
/// [`ItemWriter::flush`], [`ItemWriter::open`] and [`ItemWriter::close`].
pub type ItemWriterResult = Result<(), BatchError>;
25
/// A trait for reading items.
///
/// This trait defines the contract for components that read items from a data source.
/// It is one of the fundamental building blocks of the batch processing pipeline.
///
/// # Design Pattern
///
/// This follows the Strategy Pattern, allowing different reading strategies to be
/// interchangeable while maintaining a consistent interface.
///
/// # Implementation Note
///
/// Implementors of this trait should:
/// - Return `Ok(Some(item))` when an item is successfully read
/// - Return `Ok(None)` when there are no more items to read (end of data)
/// - Return `Err(BatchError)` when an error occurs during reading
///
/// `read` takes `&self`, so a reader that advances a cursor between calls has to
/// keep that cursor behind interior mutability (for example `Cell` or `RefCell`).
///
/// # Example
///
/// ```
/// use spring_batch_rs::core::item::{ItemReader, ItemReaderResult};
/// use std::cell::Cell;
///
/// struct StringReader {
///     items: Vec<String>,
///     // `read` only gets `&self`, so the cursor lives in a `Cell`.
///     position: Cell<usize>,
/// }
///
/// impl ItemReader<String> for StringReader {
///     fn read(&self) -> ItemReaderResult<String> {
///         let index = self.position.get();
///         match self.items.get(index) {
///             Some(item) => {
///                 self.position.set(index + 1);
///                 Ok(Some(item.clone()))
///             }
///             None => Ok(None), // End of data
///         }
///     }
/// }
///
/// let reader = StringReader {
///     items: vec!["first".to_string()],
///     position: Cell::new(0),
/// };
/// assert_eq!(reader.read().unwrap(), Some("first".to_string()));
/// assert!(reader.read().unwrap().is_none());
/// ```
pub trait ItemReader<I> {
    /// Reads an item from the reader.
    ///
    /// # Returns
    /// - `Ok(Some(item))` when an item is successfully read
    /// - `Ok(None)` when there are no more items to read (end of data)
    /// - `Err(BatchError)` when an error occurs during reading
    fn read(&self) -> ItemReaderResult<I>;
}
75
/// A trait for processing items.
///
/// This trait defines the contract for components that transform or process items
/// in a batch processing pipeline. It takes an input item of type `I` and produces
/// an output item of type `O`.
///
/// # Filtering
///
/// Returning `Ok(None)` filters the item silently: it is not passed to the writer
/// and is counted in [`StepExecution::filter_count`]. This is different from returning
/// `Err(BatchError)` which counts as a processing error and may trigger fault tolerance.
///
/// # Design Pattern
///
/// This follows the Strategy Pattern, allowing different processing strategies to be
/// interchangeable while maintaining a consistent interface.
///
/// # Type Parameters
///
/// - `I`: The input item type
/// - `O`: The output item type
///
/// # Example
///
/// ```
/// use spring_batch_rs::core::item::{ItemProcessor, ItemProcessorResult};
///
/// struct AdultFilter;
///
/// #[derive(Clone)]
/// struct Person { name: String, age: u32 }
///
/// impl ItemProcessor<Person, Person> for AdultFilter {
///     fn process(&self, item: &Person) -> ItemProcessorResult<Person> {
///         if item.age >= 18 {
///             Ok(Some(item.clone())) // keep adults
///         } else {
///             Ok(None) // filter out minors
///         }
///     }
/// }
/// ```
pub trait ItemProcessor<I, O> {
    /// Processes an item and returns the processed result.
    ///
    /// The input is only borrowed while the output is returned by value, so an
    /// implementation has to build a new `O` (or clone the input when `I` and `O`
    /// are the same type).
    ///
    /// # Parameters
    /// - `item`: The item to process
    ///
    /// # Returns
    /// - `Ok(Some(processed_item))` when the item is successfully processed
    /// - `Ok(None)` when the item is intentionally filtered out
    /// - `Err(BatchError)` when an error occurs during processing
    fn process(&self, item: &I) -> ItemProcessorResult<O>;
}
131
132/// A trait for writing items.
133///
134/// This trait defines the contract for components that write items to a data destination.
135/// It is one of the fundamental building blocks of the batch processing pipeline.
136///
137/// # Design Pattern
138///
139/// This follows the Strategy Pattern, allowing different writing strategies to be
140/// interchangeable while maintaining a consistent interface.
141///
142/// # Lifecycle Methods
143///
144/// This trait includes additional lifecycle methods:
145/// - `flush()`: Flushes any buffered data
146/// - `open()`: Initializes resources before writing starts
147/// - `close()`: Releases resources after writing completes
148///
149/// # Example
150///
151/// ```
152/// use spring_batch_rs::core::item::{ItemWriter, ItemWriterResult};
153/// use spring_batch_rs::error::BatchError;
154///
155/// struct ConsoleWriter;
156///
157/// impl ItemWriter<String> for ConsoleWriter {
158///     fn write(&self, items: &[String]) -> ItemWriterResult {
159///         for item in items {
160///             println!("{}", item);
161///         }
162///         Ok(())
163///     }
164/// }
165/// ```
166pub trait ItemWriter<O> {
167    /// Writes the given items.
168    ///
169    /// # Parameters
170    /// - `items`: A slice of items to write
171    ///
172    /// # Returns
173    /// - `Ok(())` when items are successfully written
174    /// - `Err(BatchError)` when an error occurs during writing
175    fn write(&self, items: &[O]) -> ItemWriterResult;
176
177    /// Flushes any buffered data.
178    ///
179    /// This method is called after a chunk of items has been written, and
180    /// allows the writer to flush any internally buffered data to the destination.
181    ///
182    /// # Default Implementation
183    ///
184    /// The default implementation does nothing and returns `Ok(())`.
185    ///
186    /// # Returns
187    /// - `Ok(())` when the flush operation succeeds
188    /// - `Err(BatchError)` when an error occurs during flushing
189    fn flush(&self) -> ItemWriterResult {
190        Ok(())
191    }
192
193    /// Opens the writer.
194    ///
195    /// This method is called before any items are written, and allows the writer
196    /// to initialize any resources it needs.
197    ///
198    /// # Default Implementation
199    ///
200    /// The default implementation does nothing and returns `Ok(())`.
201    ///
202    /// # Returns
203    /// - `Ok(())` when the open operation succeeds
204    /// - `Err(BatchError)` when an error occurs during opening
205    fn open(&self) -> ItemWriterResult {
206        Ok(())
207    }
208
209    /// Closes the writer.
210    ///
211    /// This method is called after all items have been written, and allows the writer
212    /// to release any resources it acquired.
213    ///
214    /// # Default Implementation
215    ///
216    /// The default implementation does nothing and returns `Ok(())`.
217    ///
218    /// # Returns
219    /// - `Ok(())` when the close operation succeeds
220    /// - `Err(BatchError)` when an error occurs during closing
221    fn close(&self) -> ItemWriterResult {
222        Ok(())
223    }
224}
225
/// A pass-through processor that returns items unchanged.
///
/// This processor implements the identity function for batch processing pipelines.
/// It takes an input item and returns it unchanged, making it useful for scenarios
/// where you need a processor in the pipeline but don't want to transform the data.
///
/// # Type Parameters
///
/// - `T`: The item type that will be passed through unchanged. The [`ItemProcessor`]
///   implementation requires `T: Clone`.
///
/// # Use Cases
///
/// - Testing batch processing pipelines without data transformation
/// - Placeholder processor during development
/// - Pipelines where processing logic is conditional and sometimes bypassed
/// - Maintaining consistent pipeline structure when transformation is optional
///
/// # Performance
///
/// This processor performs a clone operation on each item. For large or complex
/// data structures, consider whether pass-through processing is necessary or if
/// the pipeline can be restructured to avoid unnecessary cloning.
///
/// # Examples
///
/// ```
/// use spring_batch_rs::core::item::{ItemProcessor, PassThroughProcessor};
///
/// let processor = PassThroughProcessor::<String>::new();
/// let input = "Hello, World!".to_string();
/// let result = processor.process(&input).unwrap();
/// assert_eq!(result, Some(input));
/// ```
///
/// Using with different data types:
///
/// ```
/// use spring_batch_rs::core::item::{ItemProcessor, PassThroughProcessor};
///
/// // With integers
/// let int_processor = PassThroughProcessor::<i32>::new();
/// let number = 42;
/// let result = int_processor.process(&number).unwrap();
/// assert_eq!(result, Some(number));
///
/// // With custom structs
/// #[derive(Clone, PartialEq, Debug)]
/// struct Person {
///     name: String,
///     age: u32,
/// }
///
/// let person_processor = PassThroughProcessor::<Person>::new();
/// let person = Person {
///     name: "Alice".to_string(),
///     age: 30,
/// };
/// let result = person_processor.process(&person).unwrap();
/// assert_eq!(result, Some(person));
/// ```
pub struct PassThroughProcessor<T> {
    // Zero-sized marker: ties the processor to `T` without storing a `T`.
    _phantom: std::marker::PhantomData<T>,
}

/// `Default` is written by hand instead of derived: `#[derive(Default)]` would add a
/// `T: Default` bound even though no value of type `T` is ever created.
impl<T> Default for PassThroughProcessor<T> {
    /// Creates a processor for any item type `T`, without requiring `T: Default`.
    ///
    /// # Examples
    ///
    /// ```
    /// use spring_batch_rs::core::item::PassThroughProcessor;
    ///
    /// struct NoDefault;
    ///
    /// let _processor = PassThroughProcessor::<NoDefault>::default();
    /// ```
    fn default() -> Self {
        Self {
            _phantom: std::marker::PhantomData,
        }
    }
}
290
291impl<T: Clone> ItemProcessor<T, T> for PassThroughProcessor<T> {
292    /// Processes an item by returning it unchanged.
293    ///
294    /// # Parameters
295    /// - `item`: The item to process (will be cloned and returned unchanged)
296    ///
297    /// # Returns
298    /// - `Ok(Some(cloned_item))` - Always succeeds and returns a clone of the input item
299    ///
300    /// # Examples
301    ///
302    /// ```
303    /// use spring_batch_rs::core::item::{ItemProcessor, PassThroughProcessor};
304    ///
305    /// let processor = PassThroughProcessor::<Vec<i32>>::new();
306    /// let input = vec![1, 2, 3];
307    /// let result = processor.process(&input).unwrap();
308    /// assert_eq!(result, Some(input));
309    /// ```
310    fn process(&self, item: &T) -> ItemProcessorResult<T> {
311        Ok(Some(item.clone()))
312    }
313}
314
315impl<T: Clone> PassThroughProcessor<T> {
316    /// Creates a new `PassThroughProcessor`.
317    ///
318    /// # Returns
319    /// A new instance of `PassThroughProcessor` that will pass through items of type `T`.
320    ///
321    /// # Examples
322    ///
323    /// ```
324    /// use spring_batch_rs::core::item::PassThroughProcessor;
325    ///
326    /// let processor = PassThroughProcessor::<String>::new();
327    /// ```
328    pub fn new() -> Self {
329        Self {
330            _phantom: std::marker::PhantomData,
331        }
332    }
333}
334
#[cfg(test)]
mod tests {
    use super::*;

    /// Return type shared by the tests that propagate errors with `?`.
    type TestResult = Result<(), BatchError>;

    #[test]
    fn should_create_new_pass_through_processor() {
        // Construction must not panic, and the processor carries no data:
        // its only field is a `PhantomData` marker.
        let _created: PassThroughProcessor<String> = PassThroughProcessor::new();
        let footprint = std::mem::size_of::<PassThroughProcessor<String>>();
        assert_eq!(footprint, 0);
    }

    #[test]
    fn should_create_pass_through_processor_with_default() {
        // Same checks as above, but going through the `Default` trait.
        let _created: PassThroughProcessor<i32> = Default::default();
        let footprint = std::mem::size_of::<PassThroughProcessor<i32>>();
        assert_eq!(footprint, 0);
    }

    #[test]
    fn should_pass_through_string_unchanged() -> TestResult {
        let original = String::from("Hello, World!");
        let snapshot = original.clone();

        let identity = PassThroughProcessor::new();
        let output = identity.process(&original)?;

        assert_eq!(output, Some(snapshot));
        Ok(())
    }

    #[test]
    fn should_pass_through_integer_unchanged() -> TestResult {
        let value: i32 = 42;

        let identity = PassThroughProcessor::new();
        let output = identity.process(&value)?;

        assert_eq!(output, Some(value));
        Ok(())
    }

    #[test]
    fn should_pass_through_vector_unchanged() -> TestResult {
        let numbers = vec![1, 2, 3, 4, 5];
        let snapshot = numbers.clone();

        let identity = PassThroughProcessor::new();
        let output = identity.process(&numbers)?;

        assert_eq!(output, Some(snapshot));
        Ok(())
    }

    #[test]
    fn should_pass_through_custom_struct_unchanged() -> TestResult {
        #[derive(Clone, PartialEq, Debug)]
        struct TestData {
            id: u32,
            name: String,
            values: Vec<f64>,
        }

        let original = TestData {
            id: 123,
            name: "Test Item".to_string(),
            values: vec![1.1, 2.2, 3.3],
        };
        let snapshot = original.clone();

        let identity = PassThroughProcessor::new();
        let output = identity.process(&original)?;

        assert_eq!(output, Some(snapshot));
        Ok(())
    }

    #[test]
    fn should_pass_through_option_unchanged() -> TestResult {
        let identity = PassThroughProcessor::<Option<String>>::new();

        // An inner `Some` comes back wrapped in the outer `Some`.
        let present = Some("test".to_string());
        let present_output = identity.process(&present)?;
        assert_eq!(present_output, Some(present));

        // An inner `None` is an ordinary item too: it is not treated as "filtered".
        let absent: Option<String> = None;
        let absent_output = identity.process(&absent)?;
        assert_eq!(absent_output, Some(absent));

        Ok(())
    }

    #[test]
    fn should_handle_empty_collections() -> TestResult {
        let no_numbers = Vec::<i32>::new();
        let vec_identity = PassThroughProcessor::new();
        let vec_output = vec_identity.process(&no_numbers)?;
        assert_eq!(vec_output, Some(no_numbers));

        let no_text = String::new();
        let string_identity = PassThroughProcessor::new();
        let string_output = string_identity.process(&no_text)?;
        assert_eq!(string_output, Some(no_text));

        Ok(())
    }

    #[test]
    fn should_clone_input_not_move() {
        let original = String::from("original");
        let snapshot = original.clone();

        let identity = PassThroughProcessor::new();
        let _output = identity.process(&original).unwrap();

        // The input was only borrowed, so it is still usable and unmodified.
        assert_eq!(original, snapshot);
        assert_eq!(original, "original");
    }

    #[test]
    fn should_work_with_multiple_processors() -> TestResult {
        let first = PassThroughProcessor::<String>::new();
        let second = PassThroughProcessor::<String>::new();
        let original = "test data".to_string();

        // Feed the output of the first processor into the second one.
        let intermediate = first.process(&original)?.unwrap();
        let output = second.process(&intermediate)?;

        assert_eq!(output, Some(original));
        Ok(())
    }

    #[test]
    fn should_handle_large_data_structures() -> TestResult {
        let bulk: Vec<i32> = (0..10000).collect();
        let bulk_len = bulk.len();

        let identity = PassThroughProcessor::new();
        let output = identity.process(&bulk)?;

        // The pass-through processor never filters, so `unwrap` cannot panic here.
        assert_eq!(output.unwrap().len(), bulk_len);
        Ok(())
    }

    #[test]
    fn should_use_default_flush_open_close_implementations() {
        // Implements only the required method; the lifecycle hooks fall back to
        // the trait's defaults.
        struct MinimalWriter;

        impl ItemWriter<String> for MinimalWriter {
            fn write(&self, _items: &[String]) -> ItemWriterResult {
                Ok(())
            }
        }

        let writer = MinimalWriter;
        assert!(writer.flush().is_ok(), "default flush should return Ok");
        assert!(writer.open().is_ok(), "default open should return Ok");
        assert!(writer.close().is_ok(), "default close should return Ok");
    }
}
499}