// spring_batch_rs/core/item.rs
1use crate::error::BatchError;
2
/// Represents the result of reading an item from the reader.
///
/// This type is a specialized `Result` that can be:
/// - `Ok(Some(I))` when an item is successfully read
/// - `Ok(None)` when there are no more items to read (end of data)
/// - `Err(BatchError)` when an error occurs during reading
pub type ItemReaderResult<I> = Result<Option<I>, BatchError>;
10
/// Represents the result of processing an item by the processor.
///
/// This type is a specialized `Result` that can be:
/// - `Ok(O)` when an item is successfully processed
/// - `Err(BatchError)` when an error occurs during processing
pub type ItemProcessorResult<O> = Result<O, BatchError>;
17
/// Represents the result of writing items by the writer.
///
/// This type is a specialized `Result` that can be:
/// - `Ok(())` when items are successfully written
/// - `Err(BatchError)` when an error occurs during writing
///
/// Used as the return type of every [`ItemWriter`] method
/// (`write`, `flush`, `open`, and `close`).
pub type ItemWriterResult = Result<(), BatchError>;
24
/// A trait for reading items.
///
/// This trait defines the contract for components that read items from a data source.
/// It is one of the fundamental building blocks of the batch processing pipeline.
///
/// # Design Pattern
///
/// This follows the Strategy Pattern, allowing different reading strategies to be
/// interchangeable while maintaining a consistent interface.
///
/// # Implementation Note
///
/// Implementors of this trait should:
/// - Return `Ok(Some(item))` when an item is successfully read
/// - Return `Ok(None)` when there are no more items to read (end of data)
/// - Return `Err(BatchError)` when an error occurs during reading
///
/// Because `read` takes `&self` (not `&mut self`), readers that track
/// their position need interior mutability (e.g. [`std::cell::Cell`] or
/// [`std::cell::RefCell`]), as shown below.
///
/// # Example
///
/// ```
/// use spring_batch_rs::core::item::{ItemReader, ItemReaderResult};
/// use std::cell::Cell;
///
/// struct StringReader {
///     items: Vec<String>,
///     position: Cell<usize>,
/// }
///
/// impl ItemReader<String> for StringReader {
///     fn read(&self) -> ItemReaderResult<String> {
///         let pos = self.position.get();
///         if pos < self.items.len() {
///             self.position.set(pos + 1);
///             Ok(Some(self.items[pos].clone()))
///         } else {
///             Ok(None) // End of data
///         }
///     }
/// }
/// ```
pub trait ItemReader<I> {
    /// Reads an item from the reader.
    ///
    /// # Returns
    /// - `Ok(Some(item))` when an item is successfully read
    /// - `Ok(None)` when there are no more items to read (end of data)
    /// - `Err(BatchError)` when an error occurs during reading
    fn read(&self) -> ItemReaderResult<I>;
}
74
/// A trait for processing items.
///
/// This trait defines the contract for components that transform or process items
/// in a batch processing pipeline. It takes an input item of type `I` and produces
/// an output item of type `O`.
///
/// # Design Pattern
///
/// This follows the Strategy Pattern, allowing different processing strategies to be
/// interchangeable while maintaining a consistent interface.
///
/// # Type Parameters
///
/// - `I`: The input item type
/// - `O`: The output item type
///
/// # Example
///
/// ```
/// use spring_batch_rs::core::item::{ItemProcessor, ItemProcessorResult};
/// use spring_batch_rs::error::BatchError;
///
/// struct UppercaseProcessor;
///
/// impl ItemProcessor<String, String> for UppercaseProcessor {
///     fn process(&self, item: &String) -> ItemProcessorResult<String> {
///         Ok(item.to_uppercase())
///     }
/// }
/// ```
pub trait ItemProcessor<I, O> {
    /// Processes an item and returns the processed result.
    ///
    /// # Parameters
    /// - `item`: The item to process (borrowed; the processor returns a new value)
    ///
    /// # Returns
    /// - `Ok(processed_item)` when the item is successfully processed
    /// - `Err(BatchError)` when an error occurs during processing
    fn process(&self, item: &I) -> ItemProcessorResult<O>;
}
116
/// A trait for writing items.
///
/// This trait defines the contract for components that write items to a data destination.
/// It is one of the fundamental building blocks of the batch processing pipeline.
///
/// # Design Pattern
///
/// This follows the Strategy Pattern, allowing different writing strategies to be
/// interchangeable while maintaining a consistent interface.
///
/// # Lifecycle Methods
///
/// This trait includes additional lifecycle methods, each with a no-op
/// default implementation so that simple writers only need `write`:
/// - `flush()`: Flushes any buffered data
/// - `open()`: Initializes resources before writing starts
/// - `close()`: Releases resources after writing completes
///
/// # Example
///
/// ```
/// use spring_batch_rs::core::item::{ItemWriter, ItemWriterResult};
/// use spring_batch_rs::error::BatchError;
///
/// struct ConsoleWriter;
///
/// impl ItemWriter<String> for ConsoleWriter {
///     fn write(&self, items: &[String]) -> ItemWriterResult {
///         for item in items {
///             println!("{}", item);
///         }
///         Ok(())
///     }
/// }
/// ```
pub trait ItemWriter<O> {
    /// Writes the given items.
    ///
    /// # Parameters
    /// - `items`: A slice of items to write
    ///
    /// # Returns
    /// - `Ok(())` when items are successfully written
    /// - `Err(BatchError)` when an error occurs during writing
    fn write(&self, items: &[O]) -> ItemWriterResult;

    /// Flushes any buffered data.
    ///
    /// This method is called after a chunk of items has been written, and
    /// allows the writer to flush any internally buffered data to the destination.
    ///
    /// # Default Implementation
    ///
    /// The default implementation does nothing and returns `Ok(())`.
    ///
    /// # Returns
    /// - `Ok(())` when the flush operation succeeds
    /// - `Err(BatchError)` when an error occurs during flushing
    fn flush(&self) -> ItemWriterResult {
        Ok(())
    }

    /// Opens the writer.
    ///
    /// This method is called before any items are written, and allows the writer
    /// to initialize any resources it needs.
    ///
    /// # Default Implementation
    ///
    /// The default implementation does nothing and returns `Ok(())`.
    ///
    /// # Returns
    /// - `Ok(())` when the open operation succeeds
    /// - `Err(BatchError)` when an error occurs during opening
    fn open(&self) -> ItemWriterResult {
        Ok(())
    }

    /// Closes the writer.
    ///
    /// This method is called after all items have been written, and allows the writer
    /// to release any resources it acquired.
    ///
    /// # Default Implementation
    ///
    /// The default implementation does nothing and returns `Ok(())`.
    ///
    /// # Returns
    /// - `Ok(())` when the close operation succeeds
    /// - `Err(BatchError)` when an error occurs during closing
    fn close(&self) -> ItemWriterResult {
        Ok(())
    }
}
210
/// A pass-through processor that returns items unchanged.
///
/// This processor implements the identity function for batch processing pipelines.
/// It takes an input item and returns it unchanged, making it useful for scenarios
/// where you need a processor in the pipeline but don't want to transform the data.
///
/// # Type Parameters
///
/// - `T`: The item type that will be passed through unchanged. Must implement `Clone`.
///
/// # Use Cases
///
/// - Testing batch processing pipelines without data transformation
/// - Placeholder processor during development
/// - Pipelines where processing logic is conditional and sometimes bypassed
/// - Maintaining consistent pipeline structure when transformation is optional
///
/// # Performance
///
/// This processor performs a clone operation on each item. For large or complex
/// data structures, consider whether pass-through processing is necessary or if
/// the pipeline can be restructured to avoid unnecessary cloning.
///
/// # Examples
///
/// ```
/// use spring_batch_rs::core::item::{ItemProcessor, PassThroughProcessor};
///
/// let processor = PassThroughProcessor::<String>::new();
/// let input = "Hello, World!".to_string();
/// let result = processor.process(&input).unwrap();
/// assert_eq!(result, input);
/// ```
///
/// Using with different data types:
///
/// ```
/// use spring_batch_rs::core::item::{ItemProcessor, PassThroughProcessor};
///
/// // With integers
/// let int_processor = PassThroughProcessor::<i32>::new();
/// let number = 42;
/// let result = int_processor.process(&number).unwrap();
/// assert_eq!(result, number);
///
/// // With custom structs
/// #[derive(Clone, PartialEq, Debug)]
/// struct Person {
///     name: String,
///     age: u32,
/// }
///
/// let person_processor = PassThroughProcessor::<Person>::new();
/// let person = Person {
///     name: "Alice".to_string(),
///     age: 30,
/// };
/// let result = person_processor.process(&person).unwrap();
/// assert_eq!(result, person);
/// ```
// `Debug` is derived so the processor can appear in diagnostics; the derive
// only applies when `T: Debug`, so no existing usage is restricted.
#[derive(Debug, Default)]
pub struct PassThroughProcessor<T> {
    // Zero-sized marker tying the processor to the item type `T`
    // without storing any data of that type.
    _phantom: std::marker::PhantomData<T>,
}
275
276impl<T: Clone> ItemProcessor<T, T> for PassThroughProcessor<T> {
277 /// Processes an item by returning it unchanged.
278 ///
279 /// # Parameters
280 /// - `item`: The item to process (will be cloned and returned unchanged)
281 ///
282 /// # Returns
283 /// - `Ok(cloned_item)` - Always succeeds and returns a clone of the input item
284 ///
285 /// # Examples
286 ///
287 /// ```
288 /// use spring_batch_rs::core::item::{ItemProcessor, PassThroughProcessor};
289 ///
290 /// let processor = PassThroughProcessor::<Vec<i32>>::new();
291 /// let input = vec![1, 2, 3];
292 /// let result = processor.process(&input).unwrap();
293 /// assert_eq!(result, input);
294 /// ```
295 fn process(&self, item: &T) -> ItemProcessorResult<T> {
296 Ok(item.clone())
297 }
298}
299
300impl<T: Clone> PassThroughProcessor<T> {
301 /// Creates a new `PassThroughProcessor`.
302 ///
303 /// # Returns
304 /// A new instance of `PassThroughProcessor` that will pass through items of type `T`.
305 ///
306 /// # Examples
307 ///
308 /// ```
309 /// use spring_batch_rs::core::item::PassThroughProcessor;
310 ///
311 /// let processor = PassThroughProcessor::<String>::new();
312 /// ```
313 pub fn new() -> Self {
314 Self {
315 _phantom: std::marker::PhantomData,
316 }
317 }
318}
319
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn should_create_new_pass_through_processor() {
        // Construction must succeed, and the processor must be a ZST
        // (its only field is PhantomData).
        let _p = PassThroughProcessor::<String>::new();
        assert_eq!(std::mem::size_of::<PassThroughProcessor<String>>(), 0);
    }

    #[test]
    fn should_create_pass_through_processor_with_default() {
        // The Default impl is an alternative constructor; it too yields a ZST.
        let _p = PassThroughProcessor::<i32>::default();
        assert_eq!(std::mem::size_of::<PassThroughProcessor<i32>>(), 0);
    }

    #[test]
    fn should_pass_through_string_unchanged() -> Result<(), BatchError> {
        let processor = PassThroughProcessor::new();
        let original = "Hello, World!".to_string();
        let snapshot = original.clone();

        let output = processor.process(&original)?;

        assert_eq!(output, snapshot);
        assert_eq!(output, original);
        Ok(())
    }

    #[test]
    fn should_pass_through_integer_unchanged() -> Result<(), BatchError> {
        let processor = PassThroughProcessor::new();
        let value = 42i32;
        assert_eq!(processor.process(&value)?, value);
        Ok(())
    }

    #[test]
    fn should_pass_through_vector_unchanged() -> Result<(), BatchError> {
        let processor = PassThroughProcessor::new();
        let source = vec![1, 2, 3, 4, 5];
        let snapshot = source.clone();

        let output = processor.process(&source)?;

        assert_eq!(output, snapshot);
        assert_eq!(output, source);
        Ok(())
    }

    #[test]
    fn should_pass_through_custom_struct_unchanged() -> Result<(), BatchError> {
        #[derive(Clone, PartialEq, Debug)]
        struct TestData {
            id: u32,
            name: String,
            values: Vec<f64>,
        }

        let processor = PassThroughProcessor::new();
        let source = TestData {
            id: 123,
            name: "Test Item".to_string(),
            values: vec![1.1, 2.2, 3.3],
        };
        let snapshot = source.clone();

        let output = processor.process(&source)?;

        assert_eq!(output, snapshot);
        // Field-by-field comparison guards against a partial copy.
        assert_eq!(output.id, source.id);
        assert_eq!(output.name, source.name);
        assert_eq!(output.values, source.values);
        Ok(())
    }

    #[test]
    fn should_pass_through_option_unchanged() -> Result<(), BatchError> {
        let processor = PassThroughProcessor::new();

        // Some(...) must survive unchanged.
        let some_value = Some("test".to_string());
        assert_eq!(processor.process(&some_value)?, some_value);

        // None must survive unchanged as well.
        let no_value: Option<String> = None;
        assert_eq!(processor.process(&no_value)?, no_value);

        Ok(())
    }

    #[test]
    fn should_handle_empty_collections() -> Result<(), BatchError> {
        // An empty Vec passes through as an empty Vec.
        let vec_processor = PassThroughProcessor::new();
        let no_items: Vec<i32> = vec![];
        let vec_out = vec_processor.process(&no_items)?;
        assert_eq!(vec_out, no_items);
        assert!(vec_out.is_empty());

        // An empty String passes through as an empty String.
        let string_processor = PassThroughProcessor::new();
        let blank = String::new();
        let string_out = string_processor.process(&blank)?;
        assert_eq!(string_out, blank);
        assert!(string_out.is_empty());

        Ok(())
    }

    #[test]
    fn should_clone_input_not_move() {
        let processor = PassThroughProcessor::new();
        let original = "original".to_string();
        let backup = original.clone();

        let _output = processor.process(&original).unwrap();

        // The input is only borrowed, so it remains usable afterwards.
        assert_eq!(original, backup);
        assert_eq!(original, "original");
    }

    #[test]
    fn should_work_with_multiple_processors() -> Result<(), BatchError> {
        let first = PassThroughProcessor::<String>::new();
        let second = PassThroughProcessor::<String>::new();

        let source = "test data".to_string();
        let after_first = first.process(&source)?;
        let after_second = second.process(&after_first)?;

        // Chaining two pass-throughs is still the identity.
        assert_eq!(after_second, source);
        assert_eq!(after_first, after_second);
        Ok(())
    }

    #[test]
    fn should_handle_large_data_structures() -> Result<(), BatchError> {
        let processor = PassThroughProcessor::new();

        // A 10k-element vector exercises the clone path on bulk data.
        let big: Vec<i32> = (0..10000).collect();
        let snapshot = big.clone();

        let output = processor.process(&big)?;

        assert_eq!(output.len(), snapshot.len());
        assert_eq!(output, snapshot);
        Ok(())
    }

    #[test]
    fn should_use_default_flush_open_close_implementations() {
        // A writer that only implements `write` inherits the no-op
        // flush/open/close defaults from the trait.
        struct MinimalWriter;
        impl ItemWriter<String> for MinimalWriter {
            fn write(&self, _: &[String]) -> ItemWriterResult {
                Ok(())
            }
        }
        let writer = MinimalWriter;
        assert!(writer.flush().is_ok(), "default flush should return Ok");
        assert!(writer.open().is_ok(), "default open should return Ok");
        assert!(writer.close().is_ok(), "default close should return Ok");
    }
}
495}