// spring_batch_rs/core/item.rs

1use crate::error::BatchError;
2
3/// Represents the result of reading an item from the reader.
4///
5/// This type is a specialized `Result` that can be:
6/// - `Ok(Some(R))` when an item is successfully read
7/// - `Ok(None)` when there are no more items to read (end of data)
8/// - `Err(BatchError)` when an error occurs during reading
9pub type ItemReaderResult<I> = Result<Option<I>, BatchError>;
10
11/// Represents the result of processing an item by the processor.
12///
13/// This type is a specialized `Result` that can be:
14/// - `Ok(W)` when an item is successfully processed
15/// - `Err(BatchError)` when an error occurs during processing
16pub type ItemProcessorResult<O> = Result<O, BatchError>;
17
18/// Represents the result of writing items by the writer.
19///
20/// This type is a specialized `Result` that can be:
21/// - `Ok(())` when items are successfully written
22/// - `Err(BatchError)` when an error occurs during writing
23pub type ItemWriterResult = Result<(), BatchError>;
24
25/// A trait for reading items.
26///
27/// This trait defines the contract for components that read items from a data source.
28/// It is one of the fundamental building blocks of the batch processing pipeline.
29///
30/// # Design Pattern
31///
32/// This follows the Strategy Pattern, allowing different reading strategies to be
33/// interchangeable while maintaining a consistent interface.
34///
35/// # Implementation Note
36///
37/// Implementors of this trait should:
38/// - Return `Ok(Some(item))` when an item is successfully read
39/// - Return `Ok(None)` when there are no more items to read (end of data)
40/// - Return `Err(BatchError)` when an error occurs during reading
41///
42/// # Example
43///
44/// ```compile_fail
45/// use spring_batch_rs::core::item::{ItemReader, ItemReaderResult};
46/// use spring_batch_rs::error::BatchError;
47///
48/// struct StringReader {
49///     items: Vec<String>,
50///     position: usize,
51/// }
52///
53/// impl ItemReader<String> for StringReader {
54///     fn read(&mut self) -> ItemReaderResult<String> {
55///         if self.position < self.items.len() {
56///             let item = self.items[self.position].clone();
57///             self.position += 1;
58///             Ok(Some(item))
59///         } else {
60///             Ok(None) // End of data
61///         }
62///     }
63/// }
64/// ```
65pub trait ItemReader<I> {
66    /// Reads an item from the reader.
67    ///
68    /// # Returns
69    /// - `Ok(Some(item))` when an item is successfully read
70    /// - `Ok(None)` when there are no more items to read (end of data)
71    /// - `Err(BatchError)` when an error occurs during reading
72    fn read(&self) -> ItemReaderResult<I>;
73}
74
75/// A trait for processing items.
76///
77/// This trait defines the contract for components that transform or process items
78/// in a batch processing pipeline. It takes an input item of type `R` and produces
79/// an output item of type `W`.
80///
81/// # Design Pattern
82///
83/// This follows the Strategy Pattern, allowing different processing strategies to be
84/// interchangeable while maintaining a consistent interface.
85///
86/// # Type Parameters
87///
88/// - `R`: The input item type
89/// - `W`: The output item type
90///
91/// # Example
92///
93/// ```
94/// use spring_batch_rs::core::item::{ItemProcessor, ItemProcessorResult};
95/// use spring_batch_rs::error::BatchError;
96///
97/// struct UppercaseProcessor;
98///
99/// impl ItemProcessor<String, String> for UppercaseProcessor {
100///     fn process(&self, item: &String) -> ItemProcessorResult<String> {
101///         Ok(item.to_uppercase())
102///     }
103/// }
104/// ```
105pub trait ItemProcessor<I, O> {
106    /// Processes an item and returns the processed result.
107    ///
108    /// # Parameters
109    /// - `item`: The item to process
110    ///
111    /// # Returns
112    /// - `Ok(processed_item)` when the item is successfully processed
113    /// - `Err(BatchError)` when an error occurs during processing
114    fn process(&self, item: &I) -> ItemProcessorResult<O>;
115}
116
117/// A trait for writing items.
118///
119/// This trait defines the contract for components that write items to a data destination.
120/// It is one of the fundamental building blocks of the batch processing pipeline.
121///
122/// # Design Pattern
123///
124/// This follows the Strategy Pattern, allowing different writing strategies to be
125/// interchangeable while maintaining a consistent interface.
126///
127/// # Lifecycle Methods
128///
129/// This trait includes additional lifecycle methods:
130/// - `flush()`: Flushes any buffered data
131/// - `open()`: Initializes resources before writing starts
132/// - `close()`: Releases resources after writing completes
133///
134/// # Example
135///
136/// ```
137/// use spring_batch_rs::core::item::{ItemWriter, ItemWriterResult};
138/// use spring_batch_rs::error::BatchError;
139///
140/// struct ConsoleWriter;
141///
142/// impl ItemWriter<String> for ConsoleWriter {
143///     fn write(&self, items: &[String]) -> ItemWriterResult {
144///         for item in items {
145///             println!("{}", item);
146///         }
147///         Ok(())
148///     }
149/// }
150/// ```
151pub trait ItemWriter<O> {
152    /// Writes the given items.
153    ///
154    /// # Parameters
155    /// - `items`: A slice of items to write
156    ///
157    /// # Returns
158    /// - `Ok(())` when items are successfully written
159    /// - `Err(BatchError)` when an error occurs during writing
160    fn write(&self, items: &[O]) -> ItemWriterResult;
161
162    /// Flushes any buffered data.
163    ///
164    /// This method is called after a chunk of items has been written, and
165    /// allows the writer to flush any internally buffered data to the destination.
166    ///
167    /// # Default Implementation
168    ///
169    /// The default implementation does nothing and returns `Ok(())`.
170    ///
171    /// # Returns
172    /// - `Ok(())` when the flush operation succeeds
173    /// - `Err(BatchError)` when an error occurs during flushing
174    fn flush(&self) -> ItemWriterResult {
175        Ok(())
176    }
177
178    /// Opens the writer.
179    ///
180    /// This method is called before any items are written, and allows the writer
181    /// to initialize any resources it needs.
182    ///
183    /// # Default Implementation
184    ///
185    /// The default implementation does nothing and returns `Ok(())`.
186    ///
187    /// # Returns
188    /// - `Ok(())` when the open operation succeeds
189    /// - `Err(BatchError)` when an error occurs during opening
190    fn open(&self) -> ItemWriterResult {
191        Ok(())
192    }
193
194    /// Closes the writer.
195    ///
196    /// This method is called after all items have been written, and allows the writer
197    /// to release any resources it acquired.
198    ///
199    /// # Default Implementation
200    ///
201    /// The default implementation does nothing and returns `Ok(())`.
202    ///
203    /// # Returns
204    /// - `Ok(())` when the close operation succeeds
205    /// - `Err(BatchError)` when an error occurs during closing
206    fn close(&self) -> ItemWriterResult {
207        Ok(())
208    }
209}
210
/// A pass-through processor that returns items unchanged.
///
/// This processor implements the identity function for batch processing pipelines.
/// It takes an input item and returns it unchanged, making it useful for scenarios
/// where you need a processor in the pipeline but don't want to transform the data.
///
/// The struct holds only a `PhantomData<T>` marker, so it is a zero-sized type.
///
/// # Type Parameters
///
/// - `T`: The item type that will be passed through unchanged. Must implement `Clone`
///   for the processor to be usable as an `ItemProcessor<T, T>`.
///
/// # Use Cases
///
/// - Testing batch processing pipelines without data transformation
/// - Placeholder processor during development
/// - Pipelines where processing logic is conditional and sometimes bypassed
/// - Maintaining consistent pipeline structure when transformation is optional
///
/// # Performance
///
/// This processor performs a clone operation on each item. For large or complex
/// data structures, consider whether pass-through processing is necessary or if
/// the pipeline can be restructured to avoid unnecessary cloning.
///
/// # Examples
///
/// ```
/// use spring_batch_rs::core::item::{ItemProcessor, PassThroughProcessor};
///
/// let processor = PassThroughProcessor::<String>::new();
/// let input = "Hello, World!".to_string();
/// let result = processor.process(&input).unwrap();
/// assert_eq!(result, input);
/// ```
///
/// Using with different data types:
///
/// ```
/// use spring_batch_rs::core::item::{ItemProcessor, PassThroughProcessor};
///
/// // With integers
/// let int_processor = PassThroughProcessor::<i32>::new();
/// let number = 42;
/// let result = int_processor.process(&number).unwrap();
/// assert_eq!(result, number);
///
/// // With custom structs
/// #[derive(Clone, PartialEq, Debug)]
/// struct Person {
///     name: String,
///     age: u32,
/// }
///
/// let person_processor = PassThroughProcessor::<Person>::new();
/// let person = Person {
///     name: "Alice".to_string(),
///     age: 30,
/// };
/// let result = person_processor.process(&person).unwrap();
/// assert_eq!(result, person);
/// ```
///
/// `Default` is available for every item type, including types that do not
/// implement `Default` themselves:
///
/// ```
/// use spring_batch_rs::core::item::{ItemProcessor, PassThroughProcessor};
///
/// #[derive(Clone, PartialEq, Debug)]
/// struct Label(String); // deliberately not `Default`
///
/// let processor = PassThroughProcessor::<Label>::default();
/// let label = Label("invoice".to_string());
/// assert_eq!(processor.process(&label).unwrap(), label);
/// ```
pub struct PassThroughProcessor<T> {
    _phantom: std::marker::PhantomData<T>,
}

// Written by hand instead of `#[derive(Default)]`: the derive would add a
// `T: Default` bound even though no value of type `T` is ever constructed here.
impl<T> Default for PassThroughProcessor<T> {
    /// Creates a processor for items of type `T`.
    fn default() -> Self {
        Self {
            _phantom: std::marker::PhantomData,
        }
    }
}
275
276impl<T: Clone> ItemProcessor<T, T> for PassThroughProcessor<T> {
277    /// Processes an item by returning it unchanged.
278    ///
279    /// # Parameters
280    /// - `item`: The item to process (will be cloned and returned unchanged)
281    ///
282    /// # Returns
283    /// - `Ok(cloned_item)` - Always succeeds and returns a clone of the input item
284    ///
285    /// # Examples
286    ///
287    /// ```
288    /// use spring_batch_rs::core::item::{ItemProcessor, PassThroughProcessor};
289    ///
290    /// let processor = PassThroughProcessor::<Vec<i32>>::new();
291    /// let input = vec![1, 2, 3];
292    /// let result = processor.process(&input).unwrap();
293    /// assert_eq!(result, input);
294    /// ```
295    fn process(&self, item: &T) -> ItemProcessorResult<T> {
296        Ok(item.clone())
297    }
298}
299
300impl<T: Clone> PassThroughProcessor<T> {
301    /// Creates a new `PassThroughProcessor`.
302    ///
303    /// # Returns
304    /// A new instance of `PassThroughProcessor` that will pass through items of type `T`.
305    ///
306    /// # Examples
307    ///
308    /// ```
309    /// use spring_batch_rs::core::item::PassThroughProcessor;
310    ///
311    /// let processor = PassThroughProcessor::<String>::new();
312    /// ```
313    pub fn new() -> Self {
314        Self {
315            _phantom: std::marker::PhantomData,
316        }
317    }
318}
319
#[cfg(test)]
mod tests {
    use super::*;

    /// `new()` builds a processor, and the type occupies no memory.
    #[test]
    fn should_create_new_pass_through_processor() {
        let _built = PassThroughProcessor::<String>::new();
        // Only a PhantomData marker is stored, so the type must be zero-sized.
        let footprint = std::mem::size_of::<PassThroughProcessor<String>>();
        assert_eq!(footprint, 0);
    }

    /// `Default::default()` builds a processor, and the type occupies no memory.
    #[test]
    fn should_create_pass_through_processor_with_default() {
        let _built = PassThroughProcessor::<i32>::default();
        // Only a PhantomData marker is stored, so the type must be zero-sized.
        let footprint = std::mem::size_of::<PassThroughProcessor<i32>>();
        assert_eq!(footprint, 0);
    }

    /// A `String` comes back equal to what went in.
    #[test]
    fn should_pass_through_string_unchanged() -> Result<(), BatchError> {
        let identity = PassThroughProcessor::<String>::new();
        let original = String::from("Hello, World!");
        let snapshot = original.clone();

        let output = identity.process(&original)?;

        assert_eq!(output, snapshot);
        assert_eq!(output, original);
        Ok(())
    }

    /// An `i32` comes back equal to what went in.
    #[test]
    fn should_pass_through_integer_unchanged() -> Result<(), BatchError> {
        let identity = PassThroughProcessor::<i32>::new();
        let original: i32 = 42;

        let output = identity.process(&original)?;

        assert_eq!(output, original);
        Ok(())
    }

    /// A `Vec<i32>` comes back equal to what went in.
    #[test]
    fn should_pass_through_vector_unchanged() -> Result<(), BatchError> {
        let identity = PassThroughProcessor::<Vec<i32>>::new();
        let original: Vec<i32> = vec![1, 2, 3, 4, 5];
        let snapshot = original.clone();

        let output = identity.process(&original)?;

        assert_eq!(output, snapshot);
        assert_eq!(output, original);
        Ok(())
    }

    /// A user-defined struct comes back equal, field by field.
    #[test]
    fn should_pass_through_custom_struct_unchanged() -> Result<(), BatchError> {
        #[derive(Clone, PartialEq, Debug)]
        struct TestData {
            id: u32,
            name: String,
            values: Vec<f64>,
        }

        let identity = PassThroughProcessor::<TestData>::new();
        let original = TestData {
            id: 123,
            name: String::from("Test Item"),
            values: vec![1.1, 2.2, 3.3],
        };
        let snapshot = original.clone();

        let output = identity.process(&original)?;

        assert_eq!(output, snapshot);
        assert_eq!(output.id, original.id);
        assert_eq!(output.name, original.name);
        assert_eq!(output.values, original.values);
        Ok(())
    }

    /// Both `Some` and `None` survive the round trip.
    #[test]
    fn should_pass_through_option_unchanged() -> Result<(), BatchError> {
        let identity = PassThroughProcessor::<Option<String>>::new();

        // Some(..) case
        let present = Some(String::from("test"));
        let present_out = identity.process(&present)?;
        assert_eq!(present_out, present);

        // None case
        let absent: Option<String> = None;
        let absent_out = identity.process(&absent)?;
        assert_eq!(absent_out, absent);

        Ok(())
    }

    /// Empty containers stay empty.
    #[test]
    fn should_handle_empty_collections() -> Result<(), BatchError> {
        // Empty vector
        let vec_identity = PassThroughProcessor::<Vec<i32>>::new();
        let no_numbers: Vec<i32> = Vec::new();
        let numbers_out = vec_identity.process(&no_numbers)?;
        assert_eq!(numbers_out, no_numbers);
        assert!(numbers_out.is_empty());

        // Empty string
        let string_identity = PassThroughProcessor::<String>::new();
        let no_text = String::new();
        let text_out = string_identity.process(&no_text)?;
        assert_eq!(text_out, no_text);
        assert!(text_out.is_empty());

        Ok(())
    }

    /// Processing borrows the input; the caller can keep using it afterwards.
    #[test]
    fn should_clone_input_not_move() {
        let identity = PassThroughProcessor::<String>::new();
        let original = String::from("original");
        let snapshot = original.clone();

        let _output = identity.process(&original).unwrap();

        // The input was only borrowed, so it is still accessible here.
        assert_eq!(original, snapshot);
        assert_eq!(original, "original");
    }

    /// Chaining two pass-through processors is still the identity.
    #[test]
    fn should_work_with_multiple_processors() -> Result<(), BatchError> {
        let first_stage = PassThroughProcessor::<String>::new();
        let second_stage = PassThroughProcessor::<String>::new();

        let original = String::from("test data");
        let after_first = first_stage.process(&original)?;
        let after_second = second_stage.process(&after_first)?;

        assert_eq!(after_second, original);
        assert_eq!(after_first, after_second);
        Ok(())
    }

    /// A ten-thousand-element vector is passed through intact.
    #[test]
    fn should_handle_large_data_structures() -> Result<(), BatchError> {
        let identity = PassThroughProcessor::<Vec<i32>>::new();

        let big_input = (0..10000).collect::<Vec<i32>>();
        let snapshot = big_input.clone();

        let output = identity.process(&big_input)?;

        assert_eq!(output.len(), snapshot.len());
        assert_eq!(output, snapshot);
        Ok(())
    }

    /// A writer that only implements `write` inherits working lifecycle hooks.
    #[test]
    fn should_use_default_flush_open_close_implementations() {
        struct MinimalWriter;

        // Only `write` is provided; flush, open and close come from the trait defaults.
        impl ItemWriter<String> for MinimalWriter {
            fn write(&self, _: &[String]) -> ItemWriterResult {
                Ok(())
            }
        }

        let writer = MinimalWriter;
        assert!(writer.flush().is_ok(), "default flush should return Ok");
        assert!(writer.open().is_ok(), "default open should return Ok");
        assert!(writer.close().is_ok(), "default close should return Ok");
    }
}