// Source file: spring_batch_rs/core/item.rs

1use crate::error::BatchError;
2
3/// Represents the result of reading an item from the reader.
4///
5/// This type is a specialized `Result` that can be:
6/// - `Ok(Some(R))` when an item is successfully read
7/// - `Ok(None)` when there are no more items to read (end of data)
8/// - `Err(BatchError)` when an error occurs during reading
9pub type ItemReaderResult<I> = Result<Option<I>, BatchError>;
10
11/// Represents the result of processing an item by the processor.
12///
13/// This type is a specialized `Result` that can be:
14/// - `Ok(Some(O))` when an item is successfully processed and should be passed to the writer
15/// - `Ok(None)` when an item is intentionally filtered out (not an error)
16/// - `Err(BatchError)` when an error occurs during processing
17pub type ItemProcessorResult<O> = Result<Option<O>, BatchError>;
18
19/// Represents the result of writing items by the writer.
20///
21/// This type is a specialized `Result` that can be:
22/// - `Ok(())` when items are successfully written
23/// - `Err(BatchError)` when an error occurs during writing
24pub type ItemWriterResult = Result<(), BatchError>;
25
26/// A trait for reading items.
27///
28/// This trait defines the contract for components that read items from a data source.
29/// It is one of the fundamental building blocks of the batch processing pipeline.
30///
31/// # Design Pattern
32///
33/// This follows the Strategy Pattern, allowing different reading strategies to be
34/// interchangeable while maintaining a consistent interface.
35///
36/// # Implementation Note
37///
38/// Implementors of this trait should:
39/// - Return `Ok(Some(item))` when an item is successfully read
40/// - Return `Ok(None)` when there are no more items to read (end of data)
41/// - Return `Err(BatchError)` when an error occurs during reading
42///
43/// # Example
44///
45/// ```compile_fail
46/// use spring_batch_rs::core::item::{ItemReader, ItemReaderResult};
47/// use spring_batch_rs::error::BatchError;
48///
49/// struct StringReader {
50///     items: Vec<String>,
51///     position: usize,
52/// }
53///
54/// impl ItemReader<String> for StringReader {
55///     fn read(&mut self) -> ItemReaderResult<String> {
56///         if self.position < self.items.len() {
57///             let item = self.items[self.position].clone();
58///             self.position += 1;
59///             Ok(Some(item))
60///         } else {
61///             Ok(None) // End of data
62///         }
63///     }
64/// }
65/// ```
66pub trait ItemReader<I> {
67    /// Reads an item from the reader.
68    ///
69    /// # Returns
70    /// - `Ok(Some(item))` when an item is successfully read
71    /// - `Ok(None)` when there are no more items to read (end of data)
72    /// - `Err(BatchError)` when an error occurs during reading
73    fn read(&self) -> ItemReaderResult<I>;
74}
75
76/// A trait for processing items.
77///
78/// This trait defines the contract for components that transform or process items
79/// in a batch processing pipeline. It takes an input item of type `I` and produces
80/// an output item of type `O`.
81///
82/// # Filtering
83///
84/// Returning `Ok(None)` filters the item silently: it is not passed to the writer
85/// and is counted in [`crate::core::step::StepExecution::filter_count`]. This is different from returning
86/// `Err(BatchError)` which counts as a processing error and may trigger fault tolerance.
87///
88/// # Design Pattern
89///
90/// This follows the Strategy Pattern, allowing different processing strategies to be
91/// interchangeable while maintaining a consistent interface.
92///
93/// # Type Parameters
94///
95/// - `I`: The input item type
96/// - `O`: The output item type
97///
98/// # Example
99///
100/// ```
101/// use spring_batch_rs::core::item::{ItemProcessor, ItemProcessorResult};
102/// use spring_batch_rs::error::BatchError;
103///
104/// struct AdultFilter;
105///
106/// #[derive(Clone)]
107/// struct Person { name: String, age: u32 }
108///
109/// impl ItemProcessor<Person, Person> for AdultFilter {
110///     fn process(&self, item: &Person) -> ItemProcessorResult<Person> {
111///         if item.age >= 18 {
112///             Ok(Some(item.clone())) // keep adults
113///         } else {
114///             Ok(None) // filter out minors
115///         }
116///     }
117/// }
118/// ```
119pub trait ItemProcessor<I, O> {
120    /// Processes an item and returns the processed result.
121    ///
122    /// # Parameters
123    /// - `item`: The item to process
124    ///
125    /// # Returns
126    /// - `Ok(Some(processed_item))` when the item is successfully processed
127    /// - `Ok(None)` when the item is intentionally filtered out
128    /// - `Err(BatchError)` when an error occurs during processing
129    fn process(&self, item: &I) -> ItemProcessorResult<O>;
130}
131
132/// A trait for writing items.
133///
134/// This trait defines the contract for components that write items to a data destination.
135/// It is one of the fundamental building blocks of the batch processing pipeline.
136///
137/// # Design Pattern
138///
139/// This follows the Strategy Pattern, allowing different writing strategies to be
140/// interchangeable while maintaining a consistent interface.
141///
142/// # Lifecycle Methods
143///
144/// This trait includes additional lifecycle methods:
145/// - `flush()`: Flushes any buffered data
146/// - `open()`: Initializes resources before writing starts
147/// - `close()`: Releases resources after writing completes
148///
149/// # Example
150///
151/// ```
152/// use spring_batch_rs::core::item::{ItemWriter, ItemWriterResult};
153/// use spring_batch_rs::error::BatchError;
154///
155/// struct ConsoleWriter;
156///
157/// impl ItemWriter<String> for ConsoleWriter {
158///     fn write(&self, items: &[String]) -> ItemWriterResult {
159///         for item in items {
160///             println!("{}", item);
161///         }
162///         Ok(())
163///     }
164/// }
165/// ```
166pub trait ItemWriter<O> {
167    /// Writes the given items.
168    ///
169    /// # Parameters
170    /// - `items`: A slice of items to write
171    ///
172    /// # Returns
173    /// - `Ok(())` when items are successfully written
174    /// - `Err(BatchError)` when an error occurs during writing
175    fn write(&self, items: &[O]) -> ItemWriterResult;
176
177    /// Flushes any buffered data.
178    ///
179    /// This method is called after a chunk of items has been written, and
180    /// allows the writer to flush any internally buffered data to the destination.
181    ///
182    /// # Default Implementation
183    ///
184    /// The default implementation does nothing and returns `Ok(())`.
185    ///
186    /// # Returns
187    /// - `Ok(())` when the flush operation succeeds
188    /// - `Err(BatchError)` when an error occurs during flushing
189    fn flush(&self) -> ItemWriterResult {
190        Ok(())
191    }
192
193    /// Opens the writer.
194    ///
195    /// This method is called before any items are written, and allows the writer
196    /// to initialize any resources it needs.
197    ///
198    /// # Default Implementation
199    ///
200    /// The default implementation does nothing and returns `Ok(())`.
201    ///
202    /// # Returns
203    /// - `Ok(())` when the open operation succeeds
204    /// - `Err(BatchError)` when an error occurs during opening
205    fn open(&self) -> ItemWriterResult {
206        Ok(())
207    }
208
209    /// Closes the writer.
210    ///
211    /// This method is called after all items have been written, and allows the writer
212    /// to release any resources it acquired.
213    ///
214    /// # Default Implementation
215    ///
216    /// The default implementation does nothing and returns `Ok(())`.
217    ///
218    /// # Returns
219    /// - `Ok(())` when the close operation succeeds
220    /// - `Err(BatchError)` when an error occurs during closing
221    fn close(&self) -> ItemWriterResult {
222        Ok(())
223    }
224}
225
/// A pass-through processor that returns items unchanged.
///
/// This processor implements the identity function for batch processing pipelines.
/// It takes an input item and returns it unchanged, making it useful for scenarios
/// where you need a processor in the pipeline but don't want to transform the data.
///
/// # Type Parameters
///
/// - `T`: The item type that will be passed through unchanged. Must implement
///   `Clone` to be used as an `ItemProcessor`.
///
/// # Use Cases
///
/// - Testing batch processing pipelines without data transformation
/// - Placeholder processor during development
/// - Pipelines where processing logic is conditional and sometimes bypassed
/// - Maintaining consistent pipeline structure when transformation is optional
///
/// # Performance
///
/// This processor performs a clone operation on each item. For large or complex
/// data structures, consider whether pass-through processing is necessary or if
/// the pipeline can be restructured to avoid unnecessary cloning.
///
/// # Examples
///
/// ```
/// use spring_batch_rs::core::item::{ItemProcessor, PassThroughProcessor};
///
/// let processor = PassThroughProcessor::<String>::new();
/// let input = "Hello, World!".to_string();
/// let result = processor.process(&input).unwrap();
/// assert_eq!(result, Some(input));
/// ```
///
/// Using with different data types:
///
/// ```
/// use spring_batch_rs::core::item::{ItemProcessor, PassThroughProcessor};
///
/// // With integers
/// let int_processor = PassThroughProcessor::<i32>::new();
/// let number = 42;
/// let result = int_processor.process(&number).unwrap();
/// assert_eq!(result, Some(number));
///
/// // With custom structs
/// #[derive(Clone, PartialEq, Debug)]
/// struct Person {
///     name: String,
///     age: u32,
/// }
///
/// let person_processor = PassThroughProcessor::<Person>::new();
/// let person = Person {
///     name: "Alice".to_string(),
///     age: 30,
/// };
/// let result = person_processor.process(&person).unwrap();
/// assert_eq!(result, Some(person));
/// ```
pub struct PassThroughProcessor<T> {
    _phantom: std::marker::PhantomData<T>,
}

// Implemented by hand rather than derived: `#[derive(Default)]` would add a
// `T: Default` bound even though no `T` value is ever constructed here.
impl<T> Default for PassThroughProcessor<T> {
    /// Creates a `PassThroughProcessor` for any item type `T`.
    fn default() -> Self {
        Self {
            _phantom: std::marker::PhantomData,
        }
    }
}
290
291impl<T: Clone> ItemProcessor<T, T> for PassThroughProcessor<T> {
292    /// Processes an item by returning it unchanged.
293    ///
294    /// # Parameters
295    /// - `item`: The item to process (will be cloned and returned unchanged)
296    ///
297    /// # Returns
298    /// - `Ok(Some(cloned_item))` - Always succeeds and returns a clone of the input item
299    ///
300    /// # Examples
301    ///
302    /// ```
303    /// use spring_batch_rs::core::item::{ItemProcessor, PassThroughProcessor};
304    ///
305    /// let processor = PassThroughProcessor::<Vec<i32>>::new();
306    /// let input = vec![1, 2, 3];
307    /// let result = processor.process(&input).unwrap();
308    /// assert_eq!(result, Some(input));
309    /// ```
310    fn process(&self, item: &T) -> ItemProcessorResult<T> {
311        Ok(Some(item.clone()))
312    }
313}
314
315impl<T: Clone> PassThroughProcessor<T> {
316    /// Creates a new `PassThroughProcessor`.
317    ///
318    /// # Returns
319    /// A new instance of `PassThroughProcessor` that will pass through items of type `T`.
320    ///
321    /// # Examples
322    ///
323    /// ```
324    /// use spring_batch_rs::core::item::PassThroughProcessor;
325    ///
326    /// let processor = PassThroughProcessor::<String>::new();
327    /// ```
328    pub fn new() -> Self {
329        Self {
330            _phantom: std::marker::PhantomData,
331        }
332    }
333}
334
/// A composite processor that chains two processors sequentially using static dispatch.
///
/// The output of the first processor becomes the input of the second.
/// If the first processor filters an item (returns `Ok(None)`), the chain
/// stops immediately and `Ok(None)` is returned — the second processor is
/// never called.
///
/// Both processors are stored by value — no heap allocation occurs inside the
/// struct itself. This mirrors the pattern used by standard library iterator
/// adapters such as [`std::iter::Chain`].
///
/// Construct chains using [`CompositeItemProcessorBuilder`] rather than
/// instantiating this struct directly.
///
/// # Type Parameters
///
/// - `P1`: The first processor type. Must implement `ItemProcessor<I, M>` for
///   some input type `I` and intermediate type `M`.
/// - `P2`: The second processor type. Must implement `ItemProcessor<M, O>` where
///   `M` is the output type of `P1` and `O` is the final output type.
/// - `M`: The intermediate type — output of `P1`, input of `P2`. Tracked via
///   `PhantomData` so it participates in type inference without being stored.
///
/// # Examples
///
/// ```
/// use spring_batch_rs::core::item::{ItemProcessor, CompositeItemProcessorBuilder};
/// use spring_batch_rs::BatchError;
///
/// struct DoubleProcessor;
/// impl ItemProcessor<i32, i32> for DoubleProcessor {
///     fn process(&self, item: &i32) -> Result<Option<i32>, BatchError> {
///         Ok(Some(item * 2))
///     }
/// }
///
/// struct ToStringProcessor;
/// impl ItemProcessor<i32, String> for ToStringProcessor {
///     fn process(&self, item: &i32) -> Result<Option<String>, BatchError> {
///         Ok(Some(item.to_string()))
///     }
/// }
///
/// let composite = CompositeItemProcessorBuilder::new(DoubleProcessor)
///     .link(ToStringProcessor)
///     .build();
///
/// // 21 * 2 = 42, then converted to "42"
/// assert_eq!(composite.process(&21).unwrap(), Some("42".to_string()));
/// ```
///
/// # Errors
///
/// Returns [`BatchError`] if any processor in the chain returns an error.
pub struct CompositeItemProcessor<P1, P2, M> {
    first: P1,
    second: P2,
    /// Tracks the intermediate type `M` (output of `P1`, input of `P2`).
    /// Uses `fn(M) -> M` to keep the type parameter invariant and avoid
    /// unintended variance. Because function pointers are always
    /// `Send + Sync`, the marker also keeps the composite's auto traits
    /// independent of `M`.
    _marker: std::marker::PhantomData<fn(M) -> M>,
}
397
398impl<I, M, O, P1, P2> ItemProcessor<I, O> for CompositeItemProcessor<P1, P2, M>
399where
400    P1: ItemProcessor<I, M>,
401    P2: ItemProcessor<M, O>,
402{
403    /// Applies the first processor, then — if the result is `Some` — applies
404    /// the second. Returns `Ok(None)` immediately if the first processor
405    /// filters the item.
406    ///
407    /// # Errors
408    ///
409    /// Returns [`BatchError`] if either processor fails.
410    fn process(&self, item: &I) -> ItemProcessorResult<O> {
411        match self.first.process(item)? {
412            Some(intermediate) => self.second.process(&intermediate),
413            None => Ok(None),
414        }
415    }
416}
417
/// Builder for creating a chain of [`ItemProcessor`]s using static dispatch.
///
/// Start the chain with [`new`](CompositeItemProcessorBuilder::new), append
/// processors with [`link`](CompositeItemProcessorBuilder::link), and finalise
/// with [`build`](CompositeItemProcessorBuilder::build). Each call to `link`
/// wraps the accumulated chain in a [`CompositeItemProcessor`], changing the
/// output type. Mismatched types are caught at compile time.
///
/// The built chain stores all processors by value — no heap allocations occur
/// inside the processor itself. The type of the built value encodes the full
/// chain structure. Because each `link` wraps the accumulated chain as the
/// *first* element, three processors yield
/// `CompositeItemProcessor<CompositeItemProcessor<P1, P2, M1>, P3, M2>`,
/// similar to how `Iterator` adapters compose in the standard library.
///
/// # Type Parameters
///
/// - `P`: The accumulated processor type. Starts as the first processor and
///   is wrapped in [`CompositeItemProcessor`] with each [`link`](CompositeItemProcessorBuilder::link) call.
///
/// # Examples
///
/// Two processors (`i32 → i32 → String`):
///
/// ```
/// use spring_batch_rs::core::item::{ItemProcessor, CompositeItemProcessorBuilder};
/// use spring_batch_rs::BatchError;
///
/// struct DoubleProcessor;
/// impl ItemProcessor<i32, i32> for DoubleProcessor {
///     fn process(&self, item: &i32) -> Result<Option<i32>, BatchError> {
///         Ok(Some(item * 2))
///     }
/// }
///
/// struct ToStringProcessor;
/// impl ItemProcessor<i32, String> for ToStringProcessor {
///     fn process(&self, item: &i32) -> Result<Option<String>, BatchError> {
///         Ok(Some(item.to_string()))
///     }
/// }
///
/// let composite = CompositeItemProcessorBuilder::new(DoubleProcessor)
///     .link(ToStringProcessor)
///     .build();
///
/// assert_eq!(composite.process(&21).unwrap(), Some("42".to_string()));
/// ```
///
/// Three processors (`i32 → i32 → i32 → String`):
///
/// ```
/// use spring_batch_rs::core::item::{ItemProcessor, CompositeItemProcessorBuilder};
/// use spring_batch_rs::BatchError;
///
/// struct AddOneProcessor;
/// impl ItemProcessor<i32, i32> for AddOneProcessor {
///     fn process(&self, item: &i32) -> Result<Option<i32>, BatchError> {
///         Ok(Some(item + 1))
///     }
/// }
///
/// struct DoubleProcessor;
/// impl ItemProcessor<i32, i32> for DoubleProcessor {
///     fn process(&self, item: &i32) -> Result<Option<i32>, BatchError> {
///         Ok(Some(item * 2))
///     }
/// }
///
/// struct ToStringProcessor;
/// impl ItemProcessor<i32, String> for ToStringProcessor {
///     fn process(&self, item: &i32) -> Result<Option<String>, BatchError> {
///         Ok(Some(item.to_string()))
///     }
/// }
///
/// let composite = CompositeItemProcessorBuilder::new(AddOneProcessor)
///     .link(DoubleProcessor)
///     .link(ToStringProcessor)
///     .build();
///
/// // (4 + 1) * 2 = 10 → "10"
/// assert_eq!(composite.process(&4).unwrap(), Some("10".to_string()));
/// ```
pub struct CompositeItemProcessorBuilder<P> {
    processor: P,
}
503
504impl<P> CompositeItemProcessorBuilder<P> {
505    /// Creates a new builder with the given processor as the first in the chain.
506    ///
507    /// # Parameters
508    ///
509    /// - `first`: The first processor in the chain.
510    ///
511    /// # Examples
512    ///
513    /// ```
514    /// use spring_batch_rs::core::item::{ItemProcessor, CompositeItemProcessorBuilder};
515    /// use spring_batch_rs::BatchError;
516    ///
517    /// struct UppercaseProcessor;
518    /// impl ItemProcessor<String, String> for UppercaseProcessor {
519    ///     fn process(&self, item: &String) -> Result<Option<String>, BatchError> {
520    ///         Ok(Some(item.to_uppercase()))
521    ///     }
522    /// }
523    ///
524    /// let builder = CompositeItemProcessorBuilder::new(UppercaseProcessor);
525    /// let composite = builder.build();
526    /// assert_eq!(composite.process(&"hello".to_string()).unwrap(), Some("HELLO".to_string()));
527    /// ```
528    pub fn new(first: P) -> Self {
529        Self { processor: first }
530    }
531
532    /// Appends a processor to the end of the chain.
533    ///
534    /// Returns a new builder whose accumulated type is
535    /// `CompositeItemProcessor<P, P2>`. The input/output types are verified
536    /// at compile time when the chain is used.
537    ///
538    /// # Type Parameters
539    ///
540    /// - `P2`: The processor type to append.
541    /// - `M`: The intermediate type connecting `P` and `P2`. Inferred by the
542    ///   compiler from the `ItemProcessor` impls on `P` and `P2`.
543    ///
544    /// # Parameters
545    ///
546    /// - `next`: The processor to append to the chain.
547    ///
548    /// # Examples
549    ///
550    /// ```
551    /// use spring_batch_rs::core::item::{ItemProcessor, CompositeItemProcessorBuilder};
552    /// use spring_batch_rs::BatchError;
553    ///
554    /// struct AddOneProcessor;
555    /// impl ItemProcessor<i32, i32> for AddOneProcessor {
556    ///     fn process(&self, item: &i32) -> Result<Option<i32>, BatchError> {
557    ///         Ok(Some(item + 1))
558    ///     }
559    /// }
560    ///
561    /// struct ToStringProcessor;
562    /// impl ItemProcessor<i32, String> for ToStringProcessor {
563    ///     fn process(&self, item: &i32) -> Result<Option<String>, BatchError> {
564    ///         Ok(Some(item.to_string()))
565    ///     }
566    /// }
567    ///
568    /// let composite = CompositeItemProcessorBuilder::new(AddOneProcessor)
569    ///     .link(ToStringProcessor)
570    ///     .build();
571    ///
572    /// assert_eq!(composite.process(&41).unwrap(), Some("42".to_string()));
573    /// ```
574    pub fn link<P2, M>(
575        self,
576        next: P2,
577    ) -> CompositeItemProcessorBuilder<CompositeItemProcessor<P, P2, M>> {
578        CompositeItemProcessorBuilder {
579            processor: CompositeItemProcessor {
580                first: self.processor,
581                second: next,
582                _marker: std::marker::PhantomData,
583            },
584        }
585    }
586
587    /// Builds and returns the composite processor.
588    ///
589    /// Returns the accumulated processor value `P`. When chained via `link`,
590    /// `P` will be a nested `CompositeItemProcessor` such as
591    /// `CompositeItemProcessor<P1, CompositeItemProcessor<P2, P3>>`.
592    ///
593    /// Pass `&composite` to the step builder's `.processor()` method — Rust
594    /// will coerce it to `&dyn ItemProcessor<I, O>` automatically.
595    ///
596    /// # Examples
597    ///
598    /// ```
599    /// use spring_batch_rs::core::item::{ItemProcessor, CompositeItemProcessorBuilder};
600    /// use spring_batch_rs::BatchError;
601    ///
602    /// struct DoubleProcessor;
603    /// impl ItemProcessor<i32, i32> for DoubleProcessor {
604    ///     fn process(&self, item: &i32) -> Result<Option<i32>, BatchError> {
605    ///         Ok(Some(item * 2))
606    ///     }
607    /// }
608    ///
609    /// struct AddTenProcessor;
610    /// impl ItemProcessor<i32, i32> for AddTenProcessor {
611    ///     fn process(&self, item: &i32) -> Result<Option<i32>, BatchError> {
612    ///         Ok(Some(item + 10))
613    ///     }
614    /// }
615    ///
616    /// let composite = CompositeItemProcessorBuilder::new(DoubleProcessor)
617    ///     .link(AddTenProcessor)
618    ///     .build();
619    ///
620    /// // 5 * 2 = 10, then 10 + 10 = 20
621    /// assert_eq!(composite.process(&5).unwrap(), Some(20));
622    /// ```
623    pub fn build(self) -> P {
624        self.processor
625    }
626}
627
/// A composite writer that fans out the same chunk to two writers sequentially using static dispatch.
///
/// Both writers receive identical slices on every `write` call. All four lifecycle
/// methods (`write`, `flush`, `open`, `close`) are forwarded to `first` then `second`,
/// but they differ in how they treat errors:
///
/// - `write` and `open` short-circuit on the first `Err`. If `open()` on `first`
///   fails, `second.open()` is never called — lifecycle management is the step's
///   responsibility.
/// - `flush` and `close` always invoke both writers, even if `first` fails, and
///   return the error from `first` when both fail.
///
/// Both writers are stored by value — no heap allocation occurs inside the struct.
/// The type encodes the full chain:
/// `CompositeItemWriter<CompositeItemWriter<W1, W2>, W3>` for three writers.
///
/// Prefer constructing instances via [`CompositeItemWriterBuilder`] rather than
/// direct struct literal syntax.
///
/// # Type Parameters
///
/// - `W1`: The first writer type. Must implement `ItemWriter<T>`.
/// - `W2`: The second writer type. Must implement `ItemWriter<T>`.
///
/// # Examples
///
/// ```
/// use spring_batch_rs::core::item::{ItemWriter, CompositeItemWriterBuilder};
/// use std::rc::Rc;
/// use std::cell::Cell;
///
/// struct CountingWriter { count: Rc<Cell<usize>> }
/// impl CountingWriter {
///     fn new(count: Rc<Cell<usize>>) -> Self { Self { count } }
/// }
/// impl ItemWriter<i32> for CountingWriter {
///     fn write(&self, items: &[i32]) -> Result<(), spring_batch_rs::BatchError> {
///         self.count.set(self.count.get() + items.len());
///         Ok(())
///     }
/// }
///
/// let c1 = Rc::new(Cell::new(0usize));
/// let c2 = Rc::new(Cell::new(0usize));
/// let composite = CompositeItemWriterBuilder::new(CountingWriter::new(c1.clone()))
///     .link(CountingWriter::new(c2.clone()))
///     .build();
/// composite.write(&[1, 2, 3]).unwrap();
/// assert_eq!(c1.get(), 3);
/// assert_eq!(c2.get(), 3);
/// ```
///
/// # Errors
///
/// Returns [`BatchError`] if any writer in the chain returns an error.
pub struct CompositeItemWriter<W1, W2> {
    first: W1,
    second: W2,
}
682
683impl<T, W1, W2> ItemWriter<T> for CompositeItemWriter<W1, W2>
684where
685    W1: ItemWriter<T>,
686    W2: ItemWriter<T>,
687{
688    /// Writes `items` to `first`, then to `second`. Short-circuits on the first error.
689    ///
690    /// # Errors
691    ///
692    /// Returns [`BatchError::ItemWriter`] if either writer fails.
693    fn write(&self, items: &[T]) -> ItemWriterResult {
694        self.first.write(items)?;
695        self.second.write(items)
696    }
697
698    /// Flushes both writers regardless of errors. Returns the first error encountered.
699    ///
700    /// Both `first` and `second` are always flushed, even if `first` fails,
701    /// to avoid silently skipping buffered output in the second writer.
702    ///
703    /// # Errors
704    ///
705    /// Returns [`BatchError::ItemWriter`] if either flush fails. If both fail,
706    /// the error from `first` is returned.
707    fn flush(&self) -> ItemWriterResult {
708        let r1 = self.first.flush();
709        let r2 = self.second.flush();
710        r1.and(r2)
711    }
712
713    /// Opens `first`, then `second`. Short-circuits on the first error.
714    ///
715    /// # Errors
716    ///
717    /// Returns [`BatchError::ItemWriter`] if either open fails.
718    fn open(&self) -> ItemWriterResult {
719        self.first.open()?;
720        self.second.open()
721    }
722
723    /// Closes both writers regardless of errors. Returns the first error encountered.
724    ///
725    /// Both `first` and `second` are always closed, even if `first` fails,
726    /// to avoid resource leaks.
727    ///
728    /// # Errors
729    ///
730    /// Returns [`BatchError::ItemWriter`] if either close fails. If both fail,
731    /// the error from `first` is returned.
732    fn close(&self) -> ItemWriterResult {
733        let r1 = self.first.close();
734        let r2 = self.second.close();
735        r1.and(r2)
736    }
737}
738
/// Builder for creating a fan-out chain of [`ItemWriter`]s using static dispatch.
///
/// Start the chain with [`new`](CompositeItemWriterBuilder::new), append writers
/// with [`link`](CompositeItemWriterBuilder::link), and finalise with
/// [`build`](CompositeItemWriterBuilder::build). Each call to `link` wraps the
/// accumulated chain in a [`CompositeItemWriter`]. The built chain stores all
/// writers by value — no heap allocations occur inside the chain itself.
///
/// # Type Parameters
///
/// - `W`: The accumulated writer type. Starts as the first writer and is wrapped
///   in [`CompositeItemWriter`] with each [`link`](CompositeItemWriterBuilder::link) call.
///
/// # Examples
///
/// Two writers:
///
/// ```
/// use spring_batch_rs::core::item::{ItemWriter, CompositeItemWriterBuilder};
/// use std::rc::Rc;
/// use std::cell::Cell;
///
/// struct CountingWriter { count: Rc<Cell<usize>> }
/// impl CountingWriter {
///     fn new(count: Rc<Cell<usize>>) -> Self { Self { count } }
/// }
/// impl ItemWriter<i32> for CountingWriter {
///     fn write(&self, items: &[i32]) -> Result<(), spring_batch_rs::BatchError> {
///         self.count.set(self.count.get() + items.len());
///         Ok(())
///     }
/// }
///
/// let c1 = Rc::new(Cell::new(0usize));
/// let c2 = Rc::new(Cell::new(0usize));
/// let composite = CompositeItemWriterBuilder::new(CountingWriter::new(c1.clone()))
///     .link(CountingWriter::new(c2.clone()))
///     .build();
///
/// composite.write(&[1, 2, 3]).unwrap();
/// assert_eq!(c1.get(), 3);
/// assert_eq!(c2.get(), 3);
/// ```
///
/// Three writers:
///
/// ```
/// use spring_batch_rs::core::item::{ItemWriter, CompositeItemWriterBuilder};
/// use std::rc::Rc;
/// use std::cell::Cell;
///
/// struct CountingWriter { count: Rc<Cell<usize>> }
/// impl CountingWriter {
///     fn new(count: Rc<Cell<usize>>) -> Self { Self { count } }
/// }
/// impl ItemWriter<i32> for CountingWriter {
///     fn write(&self, items: &[i32]) -> Result<(), spring_batch_rs::BatchError> {
///         self.count.set(self.count.get() + items.len());
///         Ok(())
///     }
/// }
///
/// let c1 = Rc::new(Cell::new(0usize));
/// let c2 = Rc::new(Cell::new(0usize));
/// let c3 = Rc::new(Cell::new(0usize));
/// let composite = CompositeItemWriterBuilder::new(CountingWriter::new(c1.clone()))
///     .link(CountingWriter::new(c2.clone()))
///     .link(CountingWriter::new(c3.clone()))
///     .build();
///
/// composite.write(&[1, 2]).unwrap();
/// assert_eq!(c1.get(), 2);
/// assert_eq!(c2.get(), 2);
/// assert_eq!(c3.get(), 2);
/// ```
pub struct CompositeItemWriterBuilder<W> {
    writer: W,
}
817
818impl<W> CompositeItemWriterBuilder<W> {
819    /// Creates a new builder with the given writer as the first delegate.
820    ///
821    /// # Parameters
822    ///
823    /// - `first`: The first writer in the fan-out chain.
824    ///
825    /// # Examples
826    ///
827    /// ```
828    /// use spring_batch_rs::core::item::{ItemWriter, CompositeItemWriterBuilder};
829    ///
830    /// struct CountingWriter { count: std::cell::Cell<usize> }
831    /// impl CountingWriter { fn new() -> Self { Self { count: std::cell::Cell::new(0) } } }
832    /// impl ItemWriter<i32> for CountingWriter {
833    ///     fn write(&self, items: &[i32]) -> Result<(), spring_batch_rs::BatchError> {
834    ///         self.count.set(self.count.get() + items.len());
835    ///         Ok(())
836    ///     }
837    /// }
838    ///
839    /// let writer = CompositeItemWriterBuilder::new(CountingWriter::new()).build();
840    /// writer.write(&[1, 2, 3]).unwrap();
841    /// assert_eq!(writer.count.get(), 3, "writer should receive all items");
842    /// ```
843    pub fn new(first: W) -> Self {
844        Self { writer: first }
845    }
846
847    /// Appends a writer to the fan-out chain.
848    ///
849    /// Returns a new builder whose accumulated type is `CompositeItemWriter<W, W2>`.
850    /// Both writers must implement `ItemWriter<T>` for the same `T` — verified at
851    /// compile time.
852    ///
853    /// # Parameters
854    ///
855    /// - `next`: The writer to link into the chain.
856    ///
857    /// # Examples
858    ///
859    /// ```
860    /// use spring_batch_rs::core::item::{ItemWriter, CompositeItemWriterBuilder};
861    /// use std::rc::Rc;
862    /// use std::cell::Cell;
863    ///
864    /// struct CountingWriter { count: Rc<Cell<usize>> }
865    /// impl CountingWriter {
866    ///     fn new(count: Rc<Cell<usize>>) -> Self { Self { count } }
867    /// }
868    /// impl ItemWriter<i32> for CountingWriter {
869    ///     fn write(&self, items: &[i32]) -> Result<(), spring_batch_rs::BatchError> {
870    ///         self.count.set(self.count.get() + items.len());
871    ///         Ok(())
872    ///     }
873    /// }
874    ///
875    /// let c1 = Rc::new(Cell::new(0usize));
876    /// let c2 = Rc::new(Cell::new(0usize));
877    /// let composite = CompositeItemWriterBuilder::new(CountingWriter::new(c1.clone()))
878    ///     .link(CountingWriter::new(c2.clone()))
879    ///     .build();
880    ///
881    /// composite.write(&[1, 2, 3]).unwrap();
882    /// assert_eq!(c1.get(), 3, "first writer should receive all items");
883    /// assert_eq!(c2.get(), 3, "second writer should receive all items");
884    /// ```
885    #[allow(clippy::should_implement_trait)]
886    pub fn link<W2>(self, next: W2) -> CompositeItemWriterBuilder<CompositeItemWriter<W, W2>> {
887        CompositeItemWriterBuilder {
888            writer: CompositeItemWriter {
889                first: self.writer,
890                second: next,
891            },
892        }
893    }
894
895    /// Builds and returns the composite writer.
896    ///
897    /// Returns the accumulated writer value `W`. When chained via `link`, `W` is a
898    /// nested `CompositeItemWriter` such as
899    /// `CompositeItemWriter<CompositeItemWriter<W1, W2>, W3>`.
900    ///
901    /// Pass `&composite` to the step builder's `.writer()` method.
902    ///
903    /// # Examples
904    ///
905    /// ```
906    /// use spring_batch_rs::core::item::{ItemWriter, CompositeItemWriterBuilder};
907    /// use std::rc::Rc;
908    /// use std::cell::Cell;
909    ///
910    /// struct CountingWriter { count: Rc<Cell<usize>> }
911    /// impl CountingWriter {
912    ///     fn new(count: Rc<Cell<usize>>) -> Self { Self { count } }
913    /// }
914    /// impl ItemWriter<i32> for CountingWriter {
915    ///     fn write(&self, items: &[i32]) -> Result<(), spring_batch_rs::BatchError> {
916    ///         self.count.set(self.count.get() + items.len());
917    ///         Ok(())
918    ///     }
919    /// }
920    ///
921    /// let c1 = Rc::new(Cell::new(0usize));
922    /// let c2 = Rc::new(Cell::new(0usize));
923    /// let composite = CompositeItemWriterBuilder::new(CountingWriter::new(c1.clone()))
924    ///     .link(CountingWriter::new(c2.clone()))
925    ///     .build();
926    ///
927    /// composite.write(&[1, 2, 3]).unwrap();
928    /// assert_eq!(c1.get(), 3);
929    /// assert_eq!(c2.get(), 3);
930    /// ```
931    pub fn build(self) -> W {
932        self.writer
933    }
934}
935
936/// Allows any `Box<P>` where `P: ItemProcessor<I, O>` to be used wherever
937/// `&dyn ItemProcessor<I, O>` is expected — including boxed concrete types
938/// (`Box<MyProcessor>`) and boxed trait objects (`Box<dyn ItemProcessor<I, O>>`).
939///
940/// The `?Sized` bound is what makes this cover trait objects: `dyn Trait` is
941/// unsized, so without `?Sized` the impl would not apply to them.
942impl<I, O, P: ItemProcessor<I, O> + ?Sized> ItemProcessor<I, O> for Box<P> {
943    fn process(&self, item: &I) -> ItemProcessorResult<O> {
944        (**self).process(item)
945    }
946}
947
948/// Allows any `Box<W>` where `W: ItemWriter<T>` to be used wherever
949/// `&dyn ItemWriter<T>` is expected — including boxed concrete types
950/// (`Box<MyWriter>`) and boxed trait objects (`Box<dyn ItemWriter<T>>`).
951///
952/// The `?Sized` bound makes this cover trait objects: `dyn Trait` is
953/// unsized, so without `?Sized` the impl would not apply to them.
954impl<T, W: ItemWriter<T> + ?Sized> ItemWriter<T> for Box<W> {
955    fn write(&self, items: &[T]) -> ItemWriterResult {
956        (**self).write(items)
957    }
958    fn flush(&self) -> ItemWriterResult {
959        (**self).flush()
960    }
961    fn open(&self) -> ItemWriterResult {
962        (**self).open()
963    }
964    fn close(&self) -> ItemWriterResult {
965        (**self).close()
966    }
967}
968
969#[cfg(test)]
970mod tests {
971    use super::*;
972
973    #[test]
974    fn should_create_new_pass_through_processor() {
975        let _processor = PassThroughProcessor::<String>::new();
976        // Test that we can create the processor without panicking
977        // Verify it's a zero-sized type (only contains PhantomData)
978        assert_eq!(std::mem::size_of::<PassThroughProcessor<String>>(), 0);
979    }
980
981    #[test]
982    fn should_create_pass_through_processor_with_default() {
983        let _processor = PassThroughProcessor::<i32>::default();
984        // Test that we can create the processor using Default trait
985        // Verify it's a zero-sized type (only contains PhantomData)
986        assert_eq!(std::mem::size_of::<PassThroughProcessor<i32>>(), 0);
987    }
988
989    #[test]
990    fn should_pass_through_string_unchanged() -> Result<(), BatchError> {
991        let processor = PassThroughProcessor::new();
992        let input = "Hello, World!".to_string();
993        let expected = input.clone();
994
995        let result = processor.process(&input)?;
996
997        assert_eq!(result, Some(expected));
998        Ok(())
999    }
1000
1001    #[test]
1002    fn should_pass_through_integer_unchanged() -> Result<(), BatchError> {
1003        let processor = PassThroughProcessor::new();
1004        let input = 42i32;
1005
1006        let result = processor.process(&input)?;
1007
1008        assert_eq!(result, Some(input));
1009        Ok(())
1010    }
1011
1012    #[test]
1013    fn should_pass_through_vector_unchanged() -> Result<(), BatchError> {
1014        let processor = PassThroughProcessor::new();
1015        let input = vec![1, 2, 3, 4, 5];
1016        let expected = input.clone();
1017
1018        let result = processor.process(&input)?;
1019
1020        assert_eq!(result, Some(expected));
1021        Ok(())
1022    }
1023
1024    #[test]
1025    fn should_pass_through_custom_struct_unchanged() -> Result<(), BatchError> {
1026        #[derive(Clone, PartialEq, Debug)]
1027        struct TestData {
1028            id: u32,
1029            name: String,
1030            values: Vec<f64>,
1031        }
1032
1033        let processor = PassThroughProcessor::new();
1034        let input = TestData {
1035            id: 123,
1036            name: "Test Item".to_string(),
1037            values: vec![1.1, 2.2, 3.3],
1038        };
1039        let expected = input.clone();
1040
1041        let result = processor.process(&input)?;
1042
1043        assert_eq!(result, Some(expected));
1044        Ok(())
1045    }
1046
1047    #[test]
1048    fn should_pass_through_option_unchanged() -> Result<(), BatchError> {
1049        let processor = PassThroughProcessor::new();
1050
1051        // Test with Some value
1052        let input_some = Some("test".to_string());
1053        let result_some = processor.process(&input_some)?;
1054        assert_eq!(result_some, Some(input_some));
1055
1056        // Test with None value
1057        let input_none: Option<String> = None;
1058        let result_none = processor.process(&input_none)?;
1059        assert_eq!(result_none, Some(input_none));
1060
1061        Ok(())
1062    }
1063
1064    #[test]
1065    fn should_handle_empty_collections() -> Result<(), BatchError> {
1066        let vec_processor = PassThroughProcessor::new();
1067        let empty_vec: Vec<i32> = vec![];
1068        let result_vec = vec_processor.process(&empty_vec)?;
1069        assert_eq!(result_vec, Some(empty_vec));
1070
1071        let string_processor = PassThroughProcessor::new();
1072        let empty_string = String::new();
1073        let result_string = string_processor.process(&empty_string)?;
1074        assert_eq!(result_string, Some(empty_string));
1075
1076        Ok(())
1077    }
1078
1079    #[test]
1080    fn should_clone_input_not_move() {
1081        let processor = PassThroughProcessor::new();
1082        let input = "original".to_string();
1083        let input_copy = input.clone();
1084
1085        let _result = processor.process(&input).unwrap();
1086
1087        assert_eq!(input, input_copy);
1088        assert_eq!(input, "original");
1089    }
1090
1091    #[test]
1092    fn should_work_with_multiple_processors() -> Result<(), BatchError> {
1093        let processor1 = PassThroughProcessor::<String>::new();
1094        let processor2 = PassThroughProcessor::<String>::new();
1095
1096        let input = "test data".to_string();
1097        let result1 = processor1.process(&input)?;
1098        let inner = result1.unwrap();
1099        let result2 = processor2.process(&inner)?;
1100
1101        assert_eq!(result2, Some(input));
1102        Ok(())
1103    }
1104
1105    #[test]
1106    fn should_handle_large_data_structures() -> Result<(), BatchError> {
1107        let processor = PassThroughProcessor::new();
1108
1109        let large_input: Vec<i32> = (0..10000).collect();
1110        let expected_len = large_input.len();
1111
1112        let result = processor.process(&large_input)?;
1113
1114        // PassThroughProcessor always returns Some — unwrap is safe
1115        assert_eq!(result.unwrap().len(), expected_len);
1116        Ok(())
1117    }
1118
1119    #[test]
1120    fn should_use_default_flush_open_close_implementations() {
1121        struct MinimalWriter;
1122        impl ItemWriter<String> for MinimalWriter {
1123            fn write(&self, _: &[String]) -> ItemWriterResult {
1124                Ok(())
1125            }
1126            // flush, open, close use the trait's default implementations
1127        }
1128        let w = MinimalWriter;
1129        assert!(w.flush().is_ok(), "default flush should return Ok");
1130        assert!(w.open().is_ok(), "default open should return Ok");
1131        assert!(w.close().is_ok(), "default close should return Ok");
1132    }
1133
1134    // --- CompositeItemProcessor / CompositeItemProcessorBuilder ---
1135
1136    struct DoubleProcessor;
1137    impl ItemProcessor<i32, i32> for DoubleProcessor {
1138        fn process(&self, item: &i32) -> ItemProcessorResult<i32> {
1139            Ok(Some(item * 2))
1140        }
1141    }
1142
1143    struct AddTenProcessor;
1144    impl ItemProcessor<i32, i32> for AddTenProcessor {
1145        fn process(&self, item: &i32) -> ItemProcessorResult<i32> {
1146            Ok(Some(item + 10))
1147        }
1148    }
1149
1150    struct ToStringProcessor;
1151    impl ItemProcessor<i32, String> for ToStringProcessor {
1152        fn process(&self, item: &i32) -> ItemProcessorResult<String> {
1153            Ok(Some(item.to_string()))
1154        }
1155    }
1156
1157    struct FilterEvenProcessor;
1158    impl ItemProcessor<i32, i32> for FilterEvenProcessor {
1159        fn process(&self, item: &i32) -> ItemProcessorResult<i32> {
1160            if item % 2 == 0 {
1161                Ok(Some(*item))
1162            } else {
1163                Ok(None) // filter odd numbers
1164            }
1165        }
1166    }
1167
1168    struct FailingProcessor;
1169    impl ItemProcessor<i32, i32> for FailingProcessor {
1170        fn process(&self, _item: &i32) -> ItemProcessorResult<i32> {
1171            Err(BatchError::ItemProcessor("forced failure".to_string()))
1172        }
1173    }
1174
1175    #[test]
1176    fn should_chain_two_same_type_processors() -> Result<(), BatchError> {
1177        let composite = CompositeItemProcessorBuilder::new(DoubleProcessor)
1178            .link(AddTenProcessor)
1179            .build();
1180
1181        // 5 * 2 = 10, then 10 + 10 = 20
1182        assert_eq!(
1183            composite.process(&5)?,
1184            Some(20),
1185            "5 * 2 + 10 should equal 20"
1186        );
1187        Ok(())
1188    }
1189
1190    #[test]
1191    fn should_chain_two_type_changing_processors() -> Result<(), BatchError> {
1192        let composite = CompositeItemProcessorBuilder::new(DoubleProcessor)
1193            .link(ToStringProcessor)
1194            .build();
1195
1196        // 21 * 2 = 42, then "42"
1197        assert_eq!(composite.process(&21)?, Some("42".to_string()));
1198        Ok(())
1199    }
1200
1201    #[test]
1202    fn should_chain_three_processors() -> Result<(), BatchError> {
1203        let composite = CompositeItemProcessorBuilder::new(DoubleProcessor)
1204            .link(AddTenProcessor)
1205            .link(ToStringProcessor)
1206            .build();
1207
1208        // 5 * 2 = 10, then 10 + 10 = 20, then "20"
1209        assert_eq!(composite.process(&5)?, Some("20".to_string()));
1210        Ok(())
1211    }
1212
1213    #[test]
1214    fn should_stop_chain_when_first_processor_filters_item() -> Result<(), BatchError> {
1215        let composite = CompositeItemProcessorBuilder::new(FilterEvenProcessor)
1216            .link(ToStringProcessor)
1217            .build();
1218
1219        // 3 is odd → filtered by first processor → second processor never called
1220        assert_eq!(
1221            composite.process(&3)?,
1222            None,
1223            "odd number should be filtered"
1224        );
1225        // 4 is even → passes through → converted to string
1226        assert_eq!(
1227            composite.process(&4)?,
1228            Some("4".to_string()),
1229            "even number should pass"
1230        );
1231        Ok(())
1232    }
1233
1234    #[test]
1235    fn should_propagate_error_from_first_processor() {
1236        let composite = CompositeItemProcessorBuilder::new(FailingProcessor)
1237            .link(ToStringProcessor)
1238            .build();
1239
1240        let result = composite.process(&1);
1241        assert!(
1242            result.is_err(),
1243            "error from first processor should propagate"
1244        );
1245    }
1246
1247    #[test]
1248    fn should_propagate_error_from_second_processor() {
1249        struct AlwaysFailI32;
1250        impl ItemProcessor<i32, i32> for AlwaysFailI32 {
1251            fn process(&self, _: &i32) -> ItemProcessorResult<i32> {
1252                Err(BatchError::ItemProcessor("second failed".to_string()))
1253            }
1254        }
1255
1256        let composite = CompositeItemProcessorBuilder::new(DoubleProcessor)
1257            .link(AlwaysFailI32)
1258            .build();
1259
1260        let result = composite.process(&5);
1261        assert!(
1262            result.is_err(),
1263            "error from second processor should propagate"
1264        );
1265    }
1266
1267    #[test]
1268    fn should_use_box_blanket_impl_as_item_processor() -> Result<(), BatchError> {
1269        // build() returns the concrete type; Box::new() it to get a trait object.
1270        // Box<dyn ItemProcessor<I, O>> implements ItemProcessor<I, O> via the ?Sized blanket impl.
1271        let composite = CompositeItemProcessorBuilder::new(DoubleProcessor)
1272            .link(ToStringProcessor)
1273            .build();
1274        let boxed: Box<dyn ItemProcessor<i32, String>> = Box::new(composite);
1275
1276        let result = boxed.process(&3)?;
1277        assert_eq!(
1278            result,
1279            Some("6".to_string()),
1280            "boxed trait object should delegate to inner processor"
1281        );
1282        Ok(())
1283    }
1284
1285    #[test]
1286    fn should_use_box_concrete_type_as_item_processor() -> Result<(), BatchError> {
1287        // Box<ConcreteProcessor> also implements ItemProcessor<I, O> via the ?Sized blanket impl
1288        let boxed: Box<DoubleProcessor> = Box::new(DoubleProcessor);
1289
1290        let result = boxed.process(&7)?;
1291        assert_eq!(
1292            result,
1293            Some(14),
1294            "boxed concrete processor should delegate to inner processor"
1295        );
1296        Ok(())
1297    }
1298
1299    // --- CompositeItemWriter ---
1300
1301    use std::cell::Cell;
1302
1303    struct RecordingWriter {
1304        write_calls: Cell<usize>,
1305        items_written: Cell<usize>,
1306        open_calls: Cell<usize>,
1307        close_calls: Cell<usize>,
1308        flush_calls: Cell<usize>,
1309        fail_write: bool,
1310        fail_open: bool,
1311        fail_flush: bool,
1312        fail_close: bool,
1313    }
1314
1315    impl RecordingWriter {
1316        fn new() -> Self {
1317            Self {
1318                write_calls: Cell::new(0),
1319                items_written: Cell::new(0),
1320                open_calls: Cell::new(0),
1321                close_calls: Cell::new(0),
1322                flush_calls: Cell::new(0),
1323                fail_write: false,
1324                fail_open: false,
1325                fail_flush: false,
1326                fail_close: false,
1327            }
1328        }
1329        fn failing_write() -> Self {
1330            Self {
1331                fail_write: true,
1332                ..Self::new()
1333            }
1334        }
1335        fn failing_open() -> Self {
1336            Self {
1337                fail_open: true,
1338                ..Self::new()
1339            }
1340        }
1341    }
1342
1343    impl ItemWriter<i32> for RecordingWriter {
1344        fn write(&self, items: &[i32]) -> ItemWriterResult {
1345            if self.fail_write {
1346                return Err(BatchError::ItemWriter("forced write failure".to_string()));
1347            }
1348            self.write_calls.set(self.write_calls.get() + 1);
1349            self.items_written
1350                .set(self.items_written.get() + items.len());
1351            Ok(())
1352        }
1353        fn open(&self) -> ItemWriterResult {
1354            if self.fail_open {
1355                return Err(BatchError::ItemWriter("forced open failure".to_string()));
1356            }
1357            self.open_calls.set(self.open_calls.get() + 1);
1358            Ok(())
1359        }
1360        fn close(&self) -> ItemWriterResult {
1361            if self.fail_close {
1362                return Err(BatchError::ItemWriter("forced close failure".to_string()));
1363            }
1364            self.close_calls.set(self.close_calls.get() + 1);
1365            Ok(())
1366        }
1367        fn flush(&self) -> ItemWriterResult {
1368            if self.fail_flush {
1369                return Err(BatchError::ItemWriter("forced flush failure".to_string()));
1370            }
1371            self.flush_calls.set(self.flush_calls.get() + 1);
1372            Ok(())
1373        }
1374    }
1375
1376    #[test]
1377    fn should_write_to_both_writers() -> Result<(), BatchError> {
1378        let w1 = RecordingWriter::new();
1379        let w2 = RecordingWriter::new();
1380        let composite = CompositeItemWriter {
1381            first: w1,
1382            second: w2,
1383        };
1384        composite.write(&[1, 2, 3])?;
1385        assert_eq!(
1386            composite.first.write_calls.get(),
1387            1,
1388            "first writer should be called"
1389        );
1390        assert_eq!(
1391            composite.first.items_written.get(),
1392            3,
1393            "first writer should receive 3 items"
1394        );
1395        assert_eq!(
1396            composite.second.write_calls.get(),
1397            1,
1398            "second writer should be called"
1399        );
1400        assert_eq!(
1401            composite.second.items_written.get(),
1402            3,
1403            "second writer should receive 3 items"
1404        );
1405        Ok(())
1406    }
1407
1408    #[test]
1409    fn should_open_both_writers_in_order() -> Result<(), BatchError> {
1410        let w1 = RecordingWriter::new();
1411        let w2 = RecordingWriter::new();
1412        let composite = CompositeItemWriter {
1413            first: w1,
1414            second: w2,
1415        };
1416        composite.open()?;
1417        assert_eq!(
1418            composite.first.open_calls.get(),
1419            1,
1420            "first writer should be opened"
1421        );
1422        assert_eq!(
1423            composite.second.open_calls.get(),
1424            1,
1425            "second writer should be opened"
1426        );
1427        Ok(())
1428    }
1429
1430    #[test]
1431    fn should_close_both_writers_in_order() -> Result<(), BatchError> {
1432        let w1 = RecordingWriter::new();
1433        let w2 = RecordingWriter::new();
1434        let composite = CompositeItemWriter {
1435            first: w1,
1436            second: w2,
1437        };
1438        composite.close()?;
1439        assert_eq!(
1440            composite.first.close_calls.get(),
1441            1,
1442            "first writer should be closed"
1443        );
1444        assert_eq!(
1445            composite.second.close_calls.get(),
1446            1,
1447            "second writer should be closed"
1448        );
1449        Ok(())
1450    }
1451
1452    #[test]
1453    fn should_flush_both_writers() -> Result<(), BatchError> {
1454        let w1 = RecordingWriter::new();
1455        let w2 = RecordingWriter::new();
1456        let composite = CompositeItemWriter {
1457            first: w1,
1458            second: w2,
1459        };
1460        composite.flush()?;
1461        assert_eq!(
1462            composite.first.flush_calls.get(),
1463            1,
1464            "first writer should be flushed"
1465        );
1466        assert_eq!(
1467            composite.second.flush_calls.get(),
1468            1,
1469            "second writer should be flushed"
1470        );
1471        Ok(())
1472    }
1473
1474    #[test]
1475    fn should_short_circuit_on_write_error() {
1476        let w1 = RecordingWriter::failing_write();
1477        let w2 = RecordingWriter::new();
1478        let composite = CompositeItemWriter {
1479            first: w1,
1480            second: w2,
1481        };
1482        let result = composite.write(&[1, 2, 3]);
1483        assert!(result.is_err(), "error should propagate");
1484        assert_eq!(
1485            composite.second.write_calls.get(),
1486            0,
1487            "second writer should not be called after first fails"
1488        );
1489    }
1490
1491    #[test]
1492    fn should_short_circuit_on_open_error() {
1493        let w1 = RecordingWriter::failing_open();
1494        let w2 = RecordingWriter::new();
1495        let composite = CompositeItemWriter {
1496            first: w1,
1497            second: w2,
1498        };
1499        let result = composite.open();
1500        assert!(result.is_err(), "error should propagate");
1501        assert_eq!(
1502            composite.second.open_calls.get(),
1503            0,
1504            "second writer should not be opened after first fails"
1505        );
1506    }
1507
1508    #[test]
1509    fn should_flush_both_writers_even_when_first_fails() {
1510        let w1 = RecordingWriter {
1511            fail_flush: true,
1512            ..RecordingWriter::new()
1513        };
1514        let w2 = RecordingWriter::new();
1515        let composite = CompositeItemWriter {
1516            first: w1,
1517            second: w2,
1518        };
1519        let result = composite.flush();
1520        assert!(result.is_err(), "error should propagate");
1521        assert_eq!(
1522            composite.second.flush_calls.get(),
1523            1,
1524            "second writer should still be flushed even when first fails"
1525        );
1526    }
1527
1528    #[test]
1529    fn should_close_both_writers_even_when_first_fails() {
1530        let w1 = RecordingWriter {
1531            fail_close: true,
1532            ..RecordingWriter::new()
1533        };
1534        let w2 = RecordingWriter::new();
1535        let composite = CompositeItemWriter {
1536            first: w1,
1537            second: w2,
1538        };
1539        let result = composite.close();
1540        assert!(result.is_err(), "error should propagate");
1541        assert_eq!(
1542            composite.second.close_calls.get(),
1543            1,
1544            "second writer should still be closed even when first fails"
1545        );
1546    }
1547
1548    #[test]
1549    fn should_chain_two_writers_via_builder() -> Result<(), BatchError> {
1550        let composite = CompositeItemWriterBuilder::new(RecordingWriter::new())
1551            .link(RecordingWriter::new())
1552            .build();
1553        composite.write(&[10, 20])?;
1554        assert_eq!(
1555            composite.first.items_written.get(),
1556            2,
1557            "first writer should receive 2 items"
1558        );
1559        assert_eq!(
1560            composite.second.items_written.get(),
1561            2,
1562            "second writer should receive 2 items"
1563        );
1564        Ok(())
1565    }
1566
1567    #[test]
1568    fn should_chain_three_writers() -> Result<(), BatchError> {
1569        let composite = CompositeItemWriterBuilder::new(RecordingWriter::new())
1570            .link(RecordingWriter::new())
1571            .link(RecordingWriter::new())
1572            .build();
1573        composite.write(&[1, 2, 3, 4])?;
1574        // composite type: CompositeItemWriter<CompositeItemWriter<W1, W2>, W3>
1575        // composite.first is CompositeItemWriter<W1, W2>
1576        // composite.second is W3
1577        assert_eq!(
1578            composite.first.first.items_written.get(),
1579            4,
1580            "writer 1 should receive 4 items"
1581        );
1582        assert_eq!(
1583            composite.first.second.items_written.get(),
1584            4,
1585            "writer 2 should receive 4 items"
1586        );
1587        assert_eq!(
1588            composite.second.items_written.get(),
1589            4,
1590            "writer 3 should receive 4 items"
1591        );
1592        Ok(())
1593    }
1594
1595    #[test]
1596    fn should_use_box_blanket_impl_as_item_writer() -> Result<(), BatchError> {
1597        let composite = CompositeItemWriterBuilder::new(RecordingWriter::new())
1598            .link(RecordingWriter::new())
1599            .build();
1600        let boxed: Box<dyn ItemWriter<i32>> = Box::new(composite);
1601        boxed.write(&[5, 6, 7])?;
1602        // The test verifies that Box<dyn ItemWriter<T>> can be used as an ItemWriter<T>.
1603        // We can't inspect the inner writers through Box<dyn>, so asserting Ok is sufficient.
1604        Ok(())
1605    }
1606
1607    #[test]
1608    fn should_use_box_concrete_writer_as_item_writer() -> Result<(), BatchError> {
1609        let boxed: Box<RecordingWriter> = Box::new(RecordingWriter::new());
1610        boxed.open()?;
1611        boxed.write(&[1, 2])?;
1612        boxed.flush()?;
1613        boxed.close()?;
1614        assert_eq!(
1615            boxed.items_written.get(),
1616            2,
1617            "boxed concrete writer should delegate write"
1618        );
1619        assert_eq!(
1620            boxed.open_calls.get(),
1621            1,
1622            "boxed concrete writer should delegate open"
1623        );
1624        assert_eq!(
1625            boxed.flush_calls.get(),
1626            1,
1627            "boxed concrete writer should delegate flush"
1628        );
1629        assert_eq!(
1630            boxed.close_calls.get(),
1631            1,
1632            "boxed concrete writer should delegate close"
1633        );
1634        Ok(())
1635    }
1636}