//! Item-level abstractions for the batch processing pipeline: the result type
//! aliases, the `ItemReader` / `ItemProcessor` / `ItemWriter` traits, and the
//! `PassThroughProcessor` identity processor.
1use crate::error::BatchError;
2
/// Represents the result of reading an item from the reader.
///
/// This type is a specialized `Result` that can be:
/// - `Ok(Some(I))` when an item is successfully read
/// - `Ok(None)` when there are no more items to read (end of data)
/// - `Err(BatchError)` when an error occurs during reading
pub type ItemReaderResult<I> = Result<Option<I>, BatchError>;
10
/// Represents the result of processing an item by the processor.
///
/// This type is a specialized `Result` that can be:
/// - `Ok(Some(O))` when an item is successfully processed and should be passed to the writer
/// - `Ok(None)` when an item is intentionally filtered out (not an error)
/// - `Err(BatchError)` when an error occurs during processing
///
/// Note that `Ok(None)` has a different meaning here than in `ItemReaderResult`:
/// for a processor it means "skip this item", not "end of data".
pub type ItemProcessorResult<O> = Result<Option<O>, BatchError>;
18
/// Represents the result of writing items by the writer.
///
/// This type is a specialized `Result` that can be:
/// - `Ok(())` when items are successfully written
/// - `Err(BatchError)` when an error occurs during writing
///
/// It is also the return type of the writer lifecycle methods
/// (`open`, `flush`, `close`).
pub type ItemWriterResult = Result<(), BatchError>;
25
/// A trait for reading items.
///
/// This trait defines the contract for components that read items from a data source.
/// It is one of the fundamental building blocks of the batch processing pipeline.
///
/// # Design Pattern
///
/// This follows the Strategy Pattern, allowing different reading strategies to be
/// interchangeable while maintaining a consistent interface.
///
/// # Implementation Note
///
/// `read` takes `&self`, so readers that track a cursor or other progress state
/// must use interior mutability (e.g. `std::cell::Cell` or `RefCell`).
///
/// Implementors of this trait should:
/// - Return `Ok(Some(item))` when an item is successfully read
/// - Return `Ok(None)` when there are no more items to read (end of data)
/// - Return `Err(BatchError)` when an error occurs during reading
///
/// # Example
///
/// ```
/// use spring_batch_rs::core::item::{ItemReader, ItemReaderResult};
/// use std::cell::Cell;
///
/// struct StringReader {
///     items: Vec<String>,
///     // Cell provides the interior mutability needed by `read(&self)`.
///     position: Cell<usize>,
/// }
///
/// impl ItemReader<String> for StringReader {
///     fn read(&self) -> ItemReaderResult<String> {
///         let pos = self.position.get();
///         if pos < self.items.len() {
///             self.position.set(pos + 1);
///             Ok(Some(self.items[pos].clone()))
///         } else {
///             Ok(None) // End of data
///         }
///     }
/// }
/// ```
pub trait ItemReader<I> {
    /// Reads an item from the reader.
    ///
    /// # Returns
    /// - `Ok(Some(item))` when an item is successfully read
    /// - `Ok(None)` when there are no more items to read (end of data)
    /// - `Err(BatchError)` when an error occurs during reading
    fn read(&self) -> ItemReaderResult<I>;
}
75
/// A trait for processing items.
///
/// This trait defines the contract for components that transform or process items
/// in a batch processing pipeline. It takes an input item of type `I` and produces
/// an output item of type `O`.
///
/// # Filtering
///
/// Returning `Ok(None)` filters the item silently: it is not passed to the writer
/// and is counted in the step execution's `filter_count`. This is different from returning
/// `Err(BatchError)` which counts as a processing error and may trigger fault tolerance.
///
/// # Design Pattern
///
/// This follows the Strategy Pattern, allowing different processing strategies to be
/// interchangeable while maintaining a consistent interface.
///
/// # Type Parameters
///
/// - `I`: The input item type
/// - `O`: The output item type
///
/// # Example
///
/// ```
/// use spring_batch_rs::core::item::{ItemProcessor, ItemProcessorResult};
/// use spring_batch_rs::error::BatchError;
///
/// struct AdultFilter;
///
/// #[derive(Clone)]
/// struct Person { name: String, age: u32 }
///
/// impl ItemProcessor<Person, Person> for AdultFilter {
///     fn process(&self, item: &Person) -> ItemProcessorResult<Person> {
///         if item.age >= 18 {
///             Ok(Some(item.clone())) // keep adults
///         } else {
///             Ok(None) // filter out minors
///         }
///     }
/// }
/// ```
pub trait ItemProcessor<I, O> {
    /// Processes an item and returns the processed result.
    ///
    /// # Parameters
    /// - `item`: The item to process (borrowed; the processor never consumes it)
    ///
    /// # Returns
    /// - `Ok(Some(processed_item))` when the item is successfully processed
    /// - `Ok(None)` when the item is intentionally filtered out
    /// - `Err(BatchError)` when an error occurs during processing
    fn process(&self, item: &I) -> ItemProcessorResult<O>;
}
131
/// A trait for writing items.
///
/// This trait defines the contract for components that write items to a data destination.
/// It is one of the fundamental building blocks of the batch processing pipeline.
///
/// # Design Pattern
///
/// This follows the Strategy Pattern, allowing different writing strategies to be
/// interchangeable while maintaining a consistent interface.
///
/// # Lifecycle Methods
///
/// In addition to the required `write` method, this trait includes lifecycle hooks:
/// - `open()`: Initializes resources before writing starts
/// - `flush()`: Flushes any buffered data after a chunk has been written
/// - `close()`: Releases resources after writing completes
///
/// All three hooks have no-op default implementations, so a minimal writer only
/// needs to implement `write`.
///
/// # Example
///
/// ```
/// use spring_batch_rs::core::item::{ItemWriter, ItemWriterResult};
/// use spring_batch_rs::error::BatchError;
///
/// struct ConsoleWriter;
///
/// impl ItemWriter<String> for ConsoleWriter {
///     fn write(&self, items: &[String]) -> ItemWriterResult {
///         for item in items {
///             println!("{}", item);
///         }
///         Ok(())
///     }
/// }
/// ```
pub trait ItemWriter<O> {
    /// Writes the given items.
    ///
    /// # Parameters
    /// - `items`: A slice of items to write
    ///
    /// # Returns
    /// - `Ok(())` when items are successfully written
    /// - `Err(BatchError)` when an error occurs during writing
    fn write(&self, items: &[O]) -> ItemWriterResult;

    /// Flushes any buffered data.
    ///
    /// This method is called after a chunk of items has been written, and
    /// allows the writer to flush any internally buffered data to the destination.
    ///
    /// # Default Implementation
    ///
    /// The default implementation does nothing and returns `Ok(())`.
    ///
    /// # Returns
    /// - `Ok(())` when the flush operation succeeds
    /// - `Err(BatchError)` when an error occurs during flushing
    fn flush(&self) -> ItemWriterResult {
        Ok(())
    }

    /// Opens the writer.
    ///
    /// This method is called before any items are written, and allows the writer
    /// to initialize any resources it needs.
    ///
    /// # Default Implementation
    ///
    /// The default implementation does nothing and returns `Ok(())`.
    ///
    /// # Returns
    /// - `Ok(())` when the open operation succeeds
    /// - `Err(BatchError)` when an error occurs during opening
    fn open(&self) -> ItemWriterResult {
        Ok(())
    }

    /// Closes the writer.
    ///
    /// This method is called after all items have been written, and allows the writer
    /// to release any resources it acquired.
    ///
    /// # Default Implementation
    ///
    /// The default implementation does nothing and returns `Ok(())`.
    ///
    /// # Returns
    /// - `Ok(())` when the close operation succeeds
    /// - `Err(BatchError)` when an error occurs during closing
    fn close(&self) -> ItemWriterResult {
        Ok(())
    }
}
225
/// A pass-through processor that returns items unchanged.
///
/// This processor implements the identity function for batch processing pipelines.
/// It takes an input item and returns it unchanged, making it useful for scenarios
/// where you need a processor in the pipeline but don't want to transform the data.
///
/// # Type Parameters
///
/// - `T`: The item type that will be passed through unchanged. Processing requires
///   `T: Clone`, but constructing the processor does not.
///
/// # Use Cases
///
/// - Testing batch processing pipelines without data transformation
/// - Placeholder processor during development
/// - Pipelines where processing logic is conditional and sometimes bypassed
/// - Maintaining consistent pipeline structure when transformation is optional
///
/// # Performance
///
/// This processor performs a clone operation on each item. For large or complex
/// data structures, consider whether pass-through processing is necessary or if
/// the pipeline can be restructured to avoid unnecessary cloning.
///
/// # Examples
///
/// ```
/// use spring_batch_rs::core::item::{ItemProcessor, PassThroughProcessor};
///
/// let processor = PassThroughProcessor::<String>::new();
/// let input = "Hello, World!".to_string();
/// let result = processor.process(&input).unwrap();
/// assert_eq!(result, Some(input));
/// ```
///
/// Using with different data types:
///
/// ```
/// use spring_batch_rs::core::item::{ItemProcessor, PassThroughProcessor};
///
/// // With integers
/// let int_processor = PassThroughProcessor::<i32>::new();
/// let number = 42;
/// let result = int_processor.process(&number).unwrap();
/// assert_eq!(result, Some(number));
///
/// // With custom structs
/// #[derive(Clone, PartialEq, Debug)]
/// struct Person {
///     name: String,
///     age: u32,
/// }
///
/// let person_processor = PassThroughProcessor::<Person>::new();
/// let person = Person {
///     name: "Alice".to_string(),
///     age: 30,
/// };
/// let result = person_processor.process(&person).unwrap();
/// assert_eq!(result, Some(person));
/// ```
pub struct PassThroughProcessor<T> {
    // Zero-sized marker tying the processor to its item type `T`.
    _phantom: std::marker::PhantomData<T>,
}

// Manual impl instead of `#[derive(Default)]`: the derive would add an implicit
// `T: Default` bound even though `PhantomData<T>` is always `Default`. This
// keeps `default()` available for every `T`.
impl<T> Default for PassThroughProcessor<T> {
    fn default() -> Self {
        Self {
            _phantom: std::marker::PhantomData,
        }
    }
}
290
291impl<T: Clone> ItemProcessor<T, T> for PassThroughProcessor<T> {
292 /// Processes an item by returning it unchanged.
293 ///
294 /// # Parameters
295 /// - `item`: The item to process (will be cloned and returned unchanged)
296 ///
297 /// # Returns
298 /// - `Ok(Some(cloned_item))` - Always succeeds and returns a clone of the input item
299 ///
300 /// # Examples
301 ///
302 /// ```
303 /// use spring_batch_rs::core::item::{ItemProcessor, PassThroughProcessor};
304 ///
305 /// let processor = PassThroughProcessor::<Vec<i32>>::new();
306 /// let input = vec![1, 2, 3];
307 /// let result = processor.process(&input).unwrap();
308 /// assert_eq!(result, Some(input));
309 /// ```
310 fn process(&self, item: &T) -> ItemProcessorResult<T> {
311 Ok(Some(item.clone()))
312 }
313}
314
315impl<T: Clone> PassThroughProcessor<T> {
316 /// Creates a new `PassThroughProcessor`.
317 ///
318 /// # Returns
319 /// A new instance of `PassThroughProcessor` that will pass through items of type `T`.
320 ///
321 /// # Examples
322 ///
323 /// ```
324 /// use spring_batch_rs::core::item::PassThroughProcessor;
325 ///
326 /// let processor = PassThroughProcessor::<String>::new();
327 /// ```
328 pub fn new() -> Self {
329 Self {
330 _phantom: std::marker::PhantomData,
331 }
332 }
333}
334
#[cfg(test)]
mod tests {
    use super::*;

    // Unit tests covering `PassThroughProcessor` construction and identity
    // behavior across several item types, plus the `ItemWriter` default
    // lifecycle implementations.

    #[test]
    fn should_create_new_pass_through_processor() {
        let _processor = PassThroughProcessor::<String>::new();
        // Test that we can create the processor without panicking
        // Verify it's a zero-sized type (only contains PhantomData)
        assert_eq!(std::mem::size_of::<PassThroughProcessor<String>>(), 0);
    }

    #[test]
    fn should_create_pass_through_processor_with_default() {
        let _processor = PassThroughProcessor::<i32>::default();
        // Test that we can create the processor using Default trait
        // Verify it's a zero-sized type (only contains PhantomData)
        assert_eq!(std::mem::size_of::<PassThroughProcessor<i32>>(), 0);
    }

    #[test]
    fn should_pass_through_string_unchanged() -> Result<(), BatchError> {
        let processor = PassThroughProcessor::new();
        let input = "Hello, World!".to_string();
        let expected = input.clone();

        let result = processor.process(&input)?;

        assert_eq!(result, Some(expected));
        Ok(())
    }

    #[test]
    fn should_pass_through_integer_unchanged() -> Result<(), BatchError> {
        let processor = PassThroughProcessor::new();
        let input = 42i32;

        let result = processor.process(&input)?;

        assert_eq!(result, Some(input));
        Ok(())
    }

    #[test]
    fn should_pass_through_vector_unchanged() -> Result<(), BatchError> {
        let processor = PassThroughProcessor::new();
        let input = vec![1, 2, 3, 4, 5];
        let expected = input.clone();

        let result = processor.process(&input)?;

        assert_eq!(result, Some(expected));
        Ok(())
    }

    #[test]
    fn should_pass_through_custom_struct_unchanged() -> Result<(), BatchError> {
        #[derive(Clone, PartialEq, Debug)]
        struct TestData {
            id: u32,
            name: String,
            values: Vec<f64>,
        }

        let processor = PassThroughProcessor::new();
        let input = TestData {
            id: 123,
            name: "Test Item".to_string(),
            values: vec![1.1, 2.2, 3.3],
        };
        let expected = input.clone();

        let result = processor.process(&input)?;

        assert_eq!(result, Some(expected));
        Ok(())
    }

    #[test]
    fn should_pass_through_option_unchanged() -> Result<(), BatchError> {
        let processor = PassThroughProcessor::new();

        // Test with Some value
        // Note: the processor wraps its output in its own `Some(...)`, so a
        // nested Option comes back as `Some(Some(...))` / `Some(None)`.
        let input_some = Some("test".to_string());
        let result_some = processor.process(&input_some)?;
        assert_eq!(result_some, Some(input_some));

        // Test with None value
        let input_none: Option<String> = None;
        let result_none = processor.process(&input_none)?;
        assert_eq!(result_none, Some(input_none));

        Ok(())
    }

    #[test]
    fn should_handle_empty_collections() -> Result<(), BatchError> {
        let vec_processor = PassThroughProcessor::new();
        let empty_vec: Vec<i32> = vec![];
        let result_vec = vec_processor.process(&empty_vec)?;
        assert_eq!(result_vec, Some(empty_vec));

        let string_processor = PassThroughProcessor::new();
        let empty_string = String::new();
        let result_string = string_processor.process(&empty_string)?;
        assert_eq!(result_string, Some(empty_string));

        Ok(())
    }

    #[test]
    fn should_clone_input_not_move() {
        let processor = PassThroughProcessor::new();
        let input = "original".to_string();
        let input_copy = input.clone();

        let _result = processor.process(&input).unwrap();

        // The original is still usable after processing: `process` only borrows.
        assert_eq!(input, input_copy);
        assert_eq!(input, "original");
    }

    #[test]
    fn should_work_with_multiple_processors() -> Result<(), BatchError> {
        // Chaining two pass-through processors is still the identity function.
        let processor1 = PassThroughProcessor::<String>::new();
        let processor2 = PassThroughProcessor::<String>::new();

        let input = "test data".to_string();
        let result1 = processor1.process(&input)?;
        let inner = result1.unwrap();
        let result2 = processor2.process(&inner)?;

        assert_eq!(result2, Some(input));
        Ok(())
    }

    #[test]
    fn should_handle_large_data_structures() -> Result<(), BatchError> {
        let processor = PassThroughProcessor::new();

        let large_input: Vec<i32> = (0..10000).collect();
        let expected_len = large_input.len();

        let result = processor.process(&large_input)?;

        // PassThroughProcessor always returns Some — unwrap is safe
        assert_eq!(result.unwrap().len(), expected_len);
        Ok(())
    }

    #[test]
    fn should_use_default_flush_open_close_implementations() {
        struct MinimalWriter;
        impl ItemWriter<String> for MinimalWriter {
            fn write(&self, _: &[String]) -> ItemWriterResult {
                Ok(())
            }
            // flush, open, close use the trait's default implementations
        }
        let w = MinimalWriter;
        assert!(w.flush().is_ok(), "default flush should return Ok");
        assert!(w.open().is_ok(), "default open should return Ok");
        assert!(w.close().is_ok(), "default close should return Ok");
    }
}
499}