Skip to main content

spring_batch_rs/core/
item.rs

1use crate::error::BatchError;
2
3/// Represents the result of reading an item from the reader.
4///
5/// This type is a specialized `Result` that can be:
6/// - `Ok(Some(R))` when an item is successfully read
7/// - `Ok(None)` when there are no more items to read (end of data)
8/// - `Err(BatchError)` when an error occurs during reading
9pub type ItemReaderResult<I> = Result<Option<I>, BatchError>;
10
11/// Represents the result of processing an item by the processor.
12///
13/// This type is a specialized `Result` that can be:
14/// - `Ok(Some(O))` when an item is successfully processed and should be passed to the writer
15/// - `Ok(None)` when an item is intentionally filtered out (not an error)
16/// - `Err(BatchError)` when an error occurs during processing
17pub type ItemProcessorResult<O> = Result<Option<O>, BatchError>;
18
19/// Represents the result of writing items by the writer.
20///
21/// This type is a specialized `Result` that can be:
22/// - `Ok(())` when items are successfully written
23/// - `Err(BatchError)` when an error occurs during writing
24pub type ItemWriterResult = Result<(), BatchError>;
25
26/// A trait for reading items.
27///
28/// This trait defines the contract for components that read items from a data source.
29/// It is one of the fundamental building blocks of the batch processing pipeline.
30///
31/// # Design Pattern
32///
33/// This follows the Strategy Pattern, allowing different reading strategies to be
34/// interchangeable while maintaining a consistent interface.
35///
36/// # Implementation Note
37///
38/// Implementors of this trait should:
39/// - Return `Ok(Some(item))` when an item is successfully read
40/// - Return `Ok(None)` when there are no more items to read (end of data)
41/// - Return `Err(BatchError)` when an error occurs during reading
42///
43/// # Example
44///
45/// ```compile_fail
46/// use spring_batch_rs::core::item::{ItemReader, ItemReaderResult};
47/// use spring_batch_rs::error::BatchError;
48///
49/// struct StringReader {
50///     items: Vec<String>,
51///     position: usize,
52/// }
53///
54/// impl ItemReader<String> for StringReader {
55///     fn read(&mut self) -> ItemReaderResult<String> {
56///         if self.position < self.items.len() {
57///             let item = self.items[self.position].clone();
58///             self.position += 1;
59///             Ok(Some(item))
60///         } else {
61///             Ok(None) // End of data
62///         }
63///     }
64/// }
65/// ```
66pub trait ItemReader<I> {
67    /// Reads an item from the reader.
68    ///
69    /// # Returns
70    /// - `Ok(Some(item))` when an item is successfully read
71    /// - `Ok(None)` when there are no more items to read (end of data)
72    /// - `Err(BatchError)` when an error occurs during reading
73    fn read(&self) -> ItemReaderResult<I>;
74}
75
76/// A trait for processing items.
77///
78/// This trait defines the contract for components that transform or process items
79/// in a batch processing pipeline. It takes an input item of type `I` and produces
80/// an output item of type `O`.
81///
82/// # Filtering
83///
84/// Returning `Ok(None)` filters the item silently: it is not passed to the writer
85/// and is counted in [`crate::core::step::StepExecution::filter_count`]. This is different from returning
86/// `Err(BatchError)` which counts as a processing error and may trigger fault tolerance.
87///
88/// # Design Pattern
89///
90/// This follows the Strategy Pattern, allowing different processing strategies to be
91/// interchangeable while maintaining a consistent interface.
92///
93/// # Type Parameters
94///
95/// - `I`: The input item type
96/// - `O`: The output item type
97///
98/// # Example
99///
100/// ```
101/// use spring_batch_rs::core::item::{ItemProcessor, ItemProcessorResult};
102/// use spring_batch_rs::error::BatchError;
103///
104/// struct AdultFilter;
105///
106/// struct Person { name: String, age: u32 }
107///
108/// impl ItemProcessor<Person, Person> for AdultFilter {
109///     fn process(&self, item: Person) -> ItemProcessorResult<Person> {
110///         if item.age >= 18 {
111///             Ok(Some(item)) // keep adults
112///         } else {
113///             Ok(None) // filter out minors
114///         }
115///     }
116/// }
117/// ```
118pub trait ItemProcessor<I, O> {
119    /// Processes an item and returns the processed result.
120    ///
121    /// # Parameters
122    /// - `item`: The item to process (consumed by value)
123    ///
124    /// # Returns
125    /// - `Ok(Some(processed_item))` when the item is successfully processed
126    /// - `Ok(None)` when the item is intentionally filtered out
127    /// - `Err(BatchError)` when an error occurs during processing
128    fn process(&self, item: I) -> ItemProcessorResult<O>;
129}
130
131/// A trait for writing items.
132///
133/// This trait defines the contract for components that write items to a data destination.
134/// It is one of the fundamental building blocks of the batch processing pipeline.
135///
136/// # Design Pattern
137///
138/// This follows the Strategy Pattern, allowing different writing strategies to be
139/// interchangeable while maintaining a consistent interface.
140///
141/// # Lifecycle Methods
142///
143/// This trait includes additional lifecycle methods:
144/// - `flush()`: Flushes any buffered data
145/// - `open()`: Initializes resources before writing starts
146/// - `close()`: Releases resources after writing completes
147///
148/// # Example
149///
150/// ```
151/// use spring_batch_rs::core::item::{ItemWriter, ItemWriterResult};
152/// use spring_batch_rs::error::BatchError;
153///
154/// struct ConsoleWriter;
155///
156/// impl ItemWriter<String> for ConsoleWriter {
157///     fn write(&self, items: &[String]) -> ItemWriterResult {
158///         for item in items {
159///             println!("{}", item);
160///         }
161///         Ok(())
162///     }
163/// }
164/// ```
165pub trait ItemWriter<O> {
166    /// Writes the given items.
167    ///
168    /// # Parameters
169    /// - `items`: A slice of items to write
170    ///
171    /// # Returns
172    /// - `Ok(())` when items are successfully written
173    /// - `Err(BatchError)` when an error occurs during writing
174    fn write(&self, items: &[O]) -> ItemWriterResult;
175
176    /// Flushes any buffered data.
177    ///
178    /// This method is called after a chunk of items has been written, and
179    /// allows the writer to flush any internally buffered data to the destination.
180    ///
181    /// # Default Implementation
182    ///
183    /// The default implementation does nothing and returns `Ok(())`.
184    ///
185    /// # Returns
186    /// - `Ok(())` when the flush operation succeeds
187    /// - `Err(BatchError)` when an error occurs during flushing
188    fn flush(&self) -> ItemWriterResult {
189        Ok(())
190    }
191
192    /// Opens the writer.
193    ///
194    /// This method is called before any items are written, and allows the writer
195    /// to initialize any resources it needs.
196    ///
197    /// # Default Implementation
198    ///
199    /// The default implementation does nothing and returns `Ok(())`.
200    ///
201    /// # Returns
202    /// - `Ok(())` when the open operation succeeds
203    /// - `Err(BatchError)` when an error occurs during opening
204    fn open(&self) -> ItemWriterResult {
205        Ok(())
206    }
207
208    /// Closes the writer.
209    ///
210    /// This method is called after all items have been written, and allows the writer
211    /// to release any resources it acquired.
212    ///
213    /// # Default Implementation
214    ///
215    /// The default implementation does nothing and returns `Ok(())`.
216    ///
217    /// # Returns
218    /// - `Ok(())` when the close operation succeeds
219    /// - `Err(BatchError)` when an error occurs during closing
220    fn close(&self) -> ItemWriterResult {
221        Ok(())
222    }
223}
224
225/// A pass-through processor that returns items unchanged.
226///
227/// This processor implements the identity function for batch processing pipelines.
228/// It takes an input item and returns it unchanged, making it useful for scenarios
229/// where you need a processor in the pipeline but don't want to transform the data.
230///
231/// # Type Parameters
232///
233/// - `T`: The item type that will be passed through unchanged.
234///
235/// # Use Cases
236///
237/// - Testing batch processing pipelines without data transformation
238/// - Placeholder processor during development
239/// - Pipelines where processing logic is conditional and sometimes bypassed
240/// - Maintaining consistent pipeline structure when transformation is optional
241///
242/// # Performance
243///
244/// This processor takes ownership of the item and returns it directly,
245/// with no allocation or clone.
246///
247/// # Examples
248///
249/// ```
250/// use spring_batch_rs::core::item::{ItemProcessor, PassThroughProcessor};
251///
252/// let processor = PassThroughProcessor::<String>::new();
253/// let result = processor.process("Hello, World!".to_string()).unwrap();
254/// assert_eq!(result, Some("Hello, World!".to_string()));
255/// ```
256///
257/// Using with different data types:
258///
259/// ```
260/// use spring_batch_rs::core::item::{ItemProcessor, PassThroughProcessor};
261///
262/// let int_processor = PassThroughProcessor::<i32>::new();
263/// let result = int_processor.process(42).unwrap();
264/// assert_eq!(result, Some(42));
265///
266/// #[derive(PartialEq, Debug)]
267/// struct Person {
268///     name: String,
269///     age: u32,
270/// }
271///
272/// let person_processor = PassThroughProcessor::<Person>::new();
273/// let result = person_processor.process(Person { name: "Alice".to_string(), age: 30 }).unwrap();
274/// assert_eq!(result, Some(Person { name: "Alice".to_string(), age: 30 }));
275/// ```
276#[derive(Default)]
277pub struct PassThroughProcessor<T> {
278    _phantom: std::marker::PhantomData<T>,
279}
280
281impl<T> ItemProcessor<T, T> for PassThroughProcessor<T> {
282    /// Processes an item by returning it unchanged.
283    ///
284    /// # Parameters
285    /// - `item`: The item to process (consumed by value and returned unchanged)
286    ///
287    /// # Returns
288    /// - `Ok(Some(item))` - Always succeeds and returns the input item
289    ///
290    /// # Examples
291    ///
292    /// ```
293    /// use spring_batch_rs::core::item::{ItemProcessor, PassThroughProcessor};
294    ///
295    /// let processor = PassThroughProcessor::<Vec<i32>>::new();
296    /// let result = processor.process(vec![1, 2, 3]).unwrap();
297    /// assert_eq!(result, Some(vec![1, 2, 3]));
298    /// ```
299    fn process(&self, item: T) -> ItemProcessorResult<T> {
300        Ok(Some(item))
301    }
302}
303
304impl<T> PassThroughProcessor<T> {
305    /// Creates a new `PassThroughProcessor`.
306    ///
307    /// # Returns
308    /// A new instance of `PassThroughProcessor` that will pass through items of type `T`.
309    ///
310    /// # Examples
311    ///
312    /// ```
313    /// use spring_batch_rs::core::item::PassThroughProcessor;
314    ///
315    /// let processor = PassThroughProcessor::<String>::new();
316    /// ```
317    pub fn new() -> Self {
318        Self {
319            _phantom: std::marker::PhantomData,
320        }
321    }
322}
323
324/// A composite processor that chains two processors sequentially using static dispatch.
325///
326/// The output of the first processor becomes the input of the second.
327/// If the first processor filters an item (returns `Ok(None)`), the chain
328/// stops immediately and `Ok(None)` is returned — the second processor is
329/// never called.
330///
331/// Both processors are stored by value — no heap allocation occurs inside the
332/// struct itself. This mirrors the pattern used by standard library iterator
333/// adapters such as [`std::iter::Chain`].
334///
335/// Construct chains using [`CompositeItemProcessorBuilder`] rather than
336/// instantiating this struct directly.
337///
338/// # Type Parameters
339///
340/// - `P1`: The first processor type. Must implement `ItemProcessor<I, M>` for
341///   some input type `I` and intermediate type `M`.
342/// - `P2`: The second processor type. Must implement `ItemProcessor<M, O>` where
343///   `M` is the output type of `P1` and `O` is the final output type.
344/// - `M`: The intermediate type — output of `P1`, input of `P2`. Tracked via
345///   `PhantomData` so it participates in type inference without being stored.
346///
347/// # Examples
348///
349/// ```
350/// use spring_batch_rs::core::item::{ItemProcessor, CompositeItemProcessorBuilder};
351/// use spring_batch_rs::BatchError;
352///
353/// struct DoubleProcessor;
354/// impl ItemProcessor<i32, i32> for DoubleProcessor {
355///     fn process(&self, item: i32) -> Result<Option<i32>, BatchError> {
356///         Ok(Some(item * 2))
357///     }
358/// }
359///
360/// struct ToStringProcessor;
361/// impl ItemProcessor<i32, String> for ToStringProcessor {
362///     fn process(&self, item: i32) -> Result<Option<String>, BatchError> {
363///         Ok(Some(item.to_string()))
364///     }
365/// }
366///
367/// let composite = CompositeItemProcessorBuilder::new(DoubleProcessor)
368///     .link(ToStringProcessor)
369///     .build();
370///
371/// // 21 * 2 = 42, then converted to "42"
372/// assert_eq!(composite.process(21).unwrap(), Some("42".to_string()));
373/// ```
374///
375/// # Errors
376///
377/// Returns [`BatchError`] if any processor in the chain returns an error.
378pub struct CompositeItemProcessor<P1, P2, M> {
379    first: P1,
380    second: P2,
381    /// Tracks the intermediate type `M` (output of `P1`, input of `P2`).
382    /// Uses `fn(M) -> M` to keep the type parameter invariant and avoid
383    /// unintended variance.
384    _marker: std::marker::PhantomData<fn(M) -> M>,
385}
386
387impl<I, M, O, P1, P2> ItemProcessor<I, O> for CompositeItemProcessor<P1, P2, M>
388where
389    P1: ItemProcessor<I, M>,
390    P2: ItemProcessor<M, O>,
391{
392    /// Applies the first processor, then — if the result is `Some` — applies
393    /// the second. Returns `Ok(None)` immediately if the first processor
394    /// filters the item.
395    ///
396    /// # Errors
397    ///
398    /// Returns [`BatchError`] if either processor fails.
399    fn process(&self, item: I) -> ItemProcessorResult<O> {
400        match self.first.process(item)? {
401            Some(intermediate) => self.second.process(intermediate),
402            None => Ok(None),
403        }
404    }
405}
406
407/// Builder for creating a chain of [`ItemProcessor`]s using static dispatch.
408///
409/// Start the chain with [`new`](CompositeItemProcessorBuilder::new), append
410/// processors with [`link`](CompositeItemProcessorBuilder::link), and finalise
411/// with [`build`](CompositeItemProcessorBuilder::build). Each call to `link`
412/// wraps the accumulated chain in a [`CompositeItemProcessor`], changing the
413/// output type. Mismatched types are caught at compile time.
414///
415/// The built chain stores all processors by value — no heap allocations occur
416/// inside the processor itself. The type of the built value encodes the full
417/// chain structure (e.g. `CompositeItemProcessor<P1, CompositeItemProcessor<P2, P3>>`),
418/// similar to how `Iterator` adapters compose in the standard library.
419///
420/// # Type Parameters
421///
422/// - `P`: The accumulated processor type. Starts as the first processor and
423///   is wrapped in [`CompositeItemProcessor`] with each [`link`](CompositeItemProcessorBuilder::link) call.
424///
425/// # Examples
426///
427/// Two processors (`i32 → i32 → String`):
428///
429/// ```
430/// use spring_batch_rs::core::item::{ItemProcessor, CompositeItemProcessorBuilder};
431/// use spring_batch_rs::BatchError;
432///
433/// struct DoubleProcessor;
434/// impl ItemProcessor<i32, i32> for DoubleProcessor {
435///     fn process(&self, item: i32) -> Result<Option<i32>, BatchError> {
436///         Ok(Some(item * 2))
437///     }
438/// }
439///
440/// struct ToStringProcessor;
441/// impl ItemProcessor<i32, String> for ToStringProcessor {
442///     fn process(&self, item: i32) -> Result<Option<String>, BatchError> {
443///         Ok(Some(item.to_string()))
444///     }
445/// }
446///
447/// let composite = CompositeItemProcessorBuilder::new(DoubleProcessor)
448///     .link(ToStringProcessor)
449///     .build();
450///
451/// assert_eq!(composite.process(21).unwrap(), Some("42".to_string()));
452/// ```
453///
454/// Three processors (`i32 → i32 → i32 → String`):
455///
456/// ```
457/// use spring_batch_rs::core::item::{ItemProcessor, CompositeItemProcessorBuilder};
458/// use spring_batch_rs::BatchError;
459///
460/// struct AddOneProcessor;
461/// impl ItemProcessor<i32, i32> for AddOneProcessor {
462///     fn process(&self, item: i32) -> Result<Option<i32>, BatchError> {
463///         Ok(Some(item + 1))
464///     }
465/// }
466///
467/// struct DoubleProcessor;
468/// impl ItemProcessor<i32, i32> for DoubleProcessor {
469///     fn process(&self, item: i32) -> Result<Option<i32>, BatchError> {
470///         Ok(Some(item * 2))
471///     }
472/// }
473///
474/// struct ToStringProcessor;
475/// impl ItemProcessor<i32, String> for ToStringProcessor {
476///     fn process(&self, item: i32) -> Result<Option<String>, BatchError> {
477///         Ok(Some(item.to_string()))
478///     }
479/// }
480///
481/// let composite = CompositeItemProcessorBuilder::new(AddOneProcessor)
482///     .link(DoubleProcessor)
483///     .link(ToStringProcessor)
484///     .build();
485///
486/// // (4 + 1) * 2 = 10 → "10"
487/// assert_eq!(composite.process(4).unwrap(), Some("10".to_string()));
488/// ```
489pub struct CompositeItemProcessorBuilder<P> {
490    processor: P,
491}
492
493impl<P> CompositeItemProcessorBuilder<P> {
494    /// Creates a new builder with the given processor as the first in the chain.
495    ///
496    /// # Parameters
497    ///
498    /// - `first`: The first processor in the chain.
499    ///
500    /// # Examples
501    ///
502    /// ```
503    /// use spring_batch_rs::core::item::{ItemProcessor, CompositeItemProcessorBuilder};
504    /// use spring_batch_rs::BatchError;
505    ///
506    /// struct UppercaseProcessor;
507    /// impl ItemProcessor<String, String> for UppercaseProcessor {
508    ///     fn process(&self, item: String) -> Result<Option<String>, BatchError> {
509    ///         Ok(Some(item.to_uppercase()))
510    ///     }
511    /// }
512    ///
513    /// let builder = CompositeItemProcessorBuilder::new(UppercaseProcessor);
514    /// let composite = builder.build();
515    /// assert_eq!(composite.process("hello".to_string()).unwrap(), Some("HELLO".to_string()));
516    /// ```
517    pub fn new(first: P) -> Self {
518        Self { processor: first }
519    }
520
521    /// Appends a processor to the end of the chain.
522    ///
523    /// Returns a new builder whose accumulated type is
524    /// `CompositeItemProcessor<P, P2>`. The input/output types are verified
525    /// at compile time when the chain is used.
526    ///
527    /// # Type Parameters
528    ///
529    /// - `P2`: The processor type to append.
530    /// - `M`: The intermediate type connecting `P` and `P2`. Inferred by the
531    ///   compiler from the `ItemProcessor` impls on `P` and `P2`.
532    ///
533    /// # Parameters
534    ///
535    /// - `next`: The processor to append to the chain.
536    ///
537    /// # Examples
538    ///
539    /// ```
540    /// use spring_batch_rs::core::item::{ItemProcessor, CompositeItemProcessorBuilder};
541    /// use spring_batch_rs::BatchError;
542    ///
543    /// struct AddOneProcessor;
544    /// impl ItemProcessor<i32, i32> for AddOneProcessor {
545    ///     fn process(&self, item: i32) -> Result<Option<i32>, BatchError> {
546    ///         Ok(Some(item + 1))
547    ///     }
548    /// }
549    ///
550    /// struct ToStringProcessor;
551    /// impl ItemProcessor<i32, String> for ToStringProcessor {
552    ///     fn process(&self, item: i32) -> Result<Option<String>, BatchError> {
553    ///         Ok(Some(item.to_string()))
554    ///     }
555    /// }
556    ///
557    /// let composite = CompositeItemProcessorBuilder::new(AddOneProcessor)
558    ///     .link(ToStringProcessor)
559    ///     .build();
560    ///
561    /// assert_eq!(composite.process(41).unwrap(), Some("42".to_string()));
562    /// ```
563    pub fn link<P2, M>(
564        self,
565        next: P2,
566    ) -> CompositeItemProcessorBuilder<CompositeItemProcessor<P, P2, M>> {
567        CompositeItemProcessorBuilder {
568            processor: CompositeItemProcessor {
569                first: self.processor,
570                second: next,
571                _marker: std::marker::PhantomData,
572            },
573        }
574    }
575
576    /// Builds and returns the composite processor.
577    ///
578    /// Returns the accumulated processor value `P`. When chained via `link`,
579    /// `P` will be a nested `CompositeItemProcessor` such as
580    /// `CompositeItemProcessor<P1, CompositeItemProcessor<P2, P3>>`.
581    ///
582    /// Pass `&composite` to the step builder's `.processor()` method — Rust
583    /// will coerce it to `&dyn ItemProcessor<I, O>` automatically.
584    ///
585    /// # Examples
586    ///
587    /// ```
588    /// use spring_batch_rs::core::item::{ItemProcessor, CompositeItemProcessorBuilder};
589    /// use spring_batch_rs::BatchError;
590    ///
591    /// struct DoubleProcessor;
592    /// impl ItemProcessor<i32, i32> for DoubleProcessor {
593    ///     fn process(&self, item: i32) -> Result<Option<i32>, BatchError> {
594    ///         Ok(Some(item * 2))
595    ///     }
596    /// }
597    ///
598    /// struct AddTenProcessor;
599    /// impl ItemProcessor<i32, i32> for AddTenProcessor {
600    ///     fn process(&self, item: i32) -> Result<Option<i32>, BatchError> {
601    ///         Ok(Some(item + 10))
602    ///     }
603    /// }
604    ///
605    /// let composite = CompositeItemProcessorBuilder::new(DoubleProcessor)
606    ///     .link(AddTenProcessor)
607    ///     .build();
608    ///
609    /// // 5 * 2 = 10, then 10 + 10 = 20
610    /// assert_eq!(composite.process(5).unwrap(), Some(20));
611    /// ```
612    pub fn build(self) -> P {
613        self.processor
614    }
615}
616
617/// A composite writer that fans out the same chunk to two writers sequentially using static dispatch.
618///
619/// Both writers receive identical slices on every `write` call. All four lifecycle
620/// methods (`write`, `flush`, `open`, `close`) are forwarded to `first` then `second`,
621/// short-circuiting on the first `Err`. If `open()` on `first` fails, `second.open()`
622/// is never called — lifecycle management is the step's responsibility.
623///
624/// Both writers are stored by value — no heap allocation occurs inside the struct.
625/// The type encodes the full chain:
626/// `CompositeItemWriter<CompositeItemWriter<W1, W2>, W3>` for three writers.
627///
628/// Prefer constructing instances via [`CompositeItemWriterBuilder`] rather than
629/// direct struct literal syntax.
630///
631/// # Type Parameters
632///
633/// - `W1`: The first writer type. Must implement `ItemWriter<T>`.
634/// - `W2`: The second writer type. Must implement `ItemWriter<T>`.
635///
636/// # Examples
637///
638/// ```
639/// use spring_batch_rs::core::item::{ItemWriter, CompositeItemWriterBuilder};
640/// use std::rc::Rc;
641/// use std::cell::Cell;
642///
643/// struct CountingWriter { count: Rc<Cell<usize>> }
644/// impl CountingWriter {
645///     fn new(count: Rc<Cell<usize>>) -> Self { Self { count } }
646/// }
647/// impl ItemWriter<i32> for CountingWriter {
648///     fn write(&self, items: &[i32]) -> Result<(), spring_batch_rs::BatchError> {
649///         self.count.set(self.count.get() + items.len());
650///         Ok(())
651///     }
652/// }
653///
654/// let c1 = Rc::new(Cell::new(0usize));
655/// let c2 = Rc::new(Cell::new(0usize));
656/// let composite = CompositeItemWriterBuilder::new(CountingWriter::new(c1.clone()))
657///     .link(CountingWriter::new(c2.clone()))
658///     .build();
659/// composite.write(&[1, 2, 3]).unwrap();
660/// assert_eq!(c1.get(), 3);
661/// assert_eq!(c2.get(), 3);
662/// ```
663///
664/// # Errors
665///
666/// Returns [`BatchError`] if any writer in the chain returns an error.
667pub struct CompositeItemWriter<W1, W2> {
668    first: W1,
669    second: W2,
670}
671
672impl<T, W1, W2> ItemWriter<T> for CompositeItemWriter<W1, W2>
673where
674    W1: ItemWriter<T>,
675    W2: ItemWriter<T>,
676{
677    /// Writes `items` to `first`, then to `second`. Short-circuits on the first error.
678    ///
679    /// # Errors
680    ///
681    /// Returns [`BatchError::ItemWriter`] if either writer fails.
682    fn write(&self, items: &[T]) -> ItemWriterResult {
683        self.first.write(items)?;
684        self.second.write(items)
685    }
686
687    /// Flushes both writers regardless of errors. Returns the first error encountered.
688    ///
689    /// Both `first` and `second` are always flushed, even if `first` fails,
690    /// to avoid silently skipping buffered output in the second writer.
691    ///
692    /// # Errors
693    ///
694    /// Returns [`BatchError::ItemWriter`] if either flush fails. If both fail,
695    /// the error from `first` is returned.
696    fn flush(&self) -> ItemWriterResult {
697        let r1 = self.first.flush();
698        let r2 = self.second.flush();
699        r1.and(r2)
700    }
701
702    /// Opens `first`, then `second`. Short-circuits on the first error.
703    ///
704    /// # Errors
705    ///
706    /// Returns [`BatchError::ItemWriter`] if either open fails.
707    fn open(&self) -> ItemWriterResult {
708        self.first.open()?;
709        self.second.open()
710    }
711
712    /// Closes both writers regardless of errors. Returns the first error encountered.
713    ///
714    /// Both `first` and `second` are always closed, even if `first` fails,
715    /// to avoid resource leaks.
716    ///
717    /// # Errors
718    ///
719    /// Returns [`BatchError::ItemWriter`] if either close fails. If both fail,
720    /// the error from `first` is returned.
721    fn close(&self) -> ItemWriterResult {
722        let r1 = self.first.close();
723        let r2 = self.second.close();
724        r1.and(r2)
725    }
726}
727
728/// Builder for creating a fan-out chain of [`ItemWriter`]s using static dispatch.
729///
730/// Start the chain with [`new`](CompositeItemWriterBuilder::new), append writers
731/// with [`add`](CompositeItemWriterBuilder::add), and finalise with
732/// [`build`](CompositeItemWriterBuilder::build). Each call to `add` wraps the
733/// accumulated chain in a [`CompositeItemWriter`]. The built chain stores all
734/// writers by value — no heap allocations occur inside the chain itself.
735///
736/// # Type Parameters
737///
738/// - `W`: The accumulated writer type. Starts as the first writer and is wrapped
739///   in [`CompositeItemWriter`] with each [`add`](CompositeItemWriterBuilder::add) call.
740///
741/// # Examples
742///
743/// Two writers:
744///
745/// ```
746/// use spring_batch_rs::core::item::{ItemWriter, CompositeItemWriterBuilder};
747/// use std::rc::Rc;
748/// use std::cell::Cell;
749///
750/// struct CountingWriter { count: Rc<Cell<usize>> }
751/// impl CountingWriter {
752///     fn new(count: Rc<Cell<usize>>) -> Self { Self { count } }
753/// }
754/// impl ItemWriter<i32> for CountingWriter {
755///     fn write(&self, items: &[i32]) -> Result<(), spring_batch_rs::BatchError> {
756///         self.count.set(self.count.get() + items.len());
757///         Ok(())
758///     }
759/// }
760///
761/// let c1 = Rc::new(Cell::new(0usize));
762/// let c2 = Rc::new(Cell::new(0usize));
763/// let composite = CompositeItemWriterBuilder::new(CountingWriter::new(c1.clone()))
764///     .link(CountingWriter::new(c2.clone()))
765///     .build();
766///
767/// composite.write(&[1, 2, 3]).unwrap();
768/// assert_eq!(c1.get(), 3);
769/// assert_eq!(c2.get(), 3);
770/// ```
771///
772/// Three writers:
773///
774/// ```
775/// use spring_batch_rs::core::item::{ItemWriter, CompositeItemWriterBuilder};
776/// use std::rc::Rc;
777/// use std::cell::Cell;
778///
779/// struct CountingWriter { count: Rc<Cell<usize>> }
780/// impl CountingWriter {
781///     fn new(count: Rc<Cell<usize>>) -> Self { Self { count } }
782/// }
783/// impl ItemWriter<i32> for CountingWriter {
784///     fn write(&self, items: &[i32]) -> Result<(), spring_batch_rs::BatchError> {
785///         self.count.set(self.count.get() + items.len());
786///         Ok(())
787///     }
788/// }
789///
790/// let c1 = Rc::new(Cell::new(0usize));
791/// let c2 = Rc::new(Cell::new(0usize));
792/// let c3 = Rc::new(Cell::new(0usize));
793/// let composite = CompositeItemWriterBuilder::new(CountingWriter::new(c1.clone()))
794///     .link(CountingWriter::new(c2.clone()))
795///     .link(CountingWriter::new(c3.clone()))
796///     .build();
797///
798/// composite.write(&[1, 2]).unwrap();
799/// assert_eq!(c1.get(), 2);
800/// assert_eq!(c2.get(), 2);
801/// assert_eq!(c3.get(), 2);
802/// ```
803pub struct CompositeItemWriterBuilder<W> {
804    writer: W,
805}
806
807impl<W> CompositeItemWriterBuilder<W> {
808    /// Creates a new builder with the given writer as the first delegate.
809    ///
810    /// # Parameters
811    ///
812    /// - `first`: The first writer in the fan-out chain.
813    ///
814    /// # Examples
815    ///
816    /// ```
817    /// use spring_batch_rs::core::item::{ItemWriter, CompositeItemWriterBuilder};
818    ///
819    /// struct CountingWriter { count: std::cell::Cell<usize> }
820    /// impl CountingWriter { fn new() -> Self { Self { count: std::cell::Cell::new(0) } } }
821    /// impl ItemWriter<i32> for CountingWriter {
822    ///     fn write(&self, items: &[i32]) -> Result<(), spring_batch_rs::BatchError> {
823    ///         self.count.set(self.count.get() + items.len());
824    ///         Ok(())
825    ///     }
826    /// }
827    ///
828    /// let writer = CompositeItemWriterBuilder::new(CountingWriter::new()).build();
829    /// writer.write(&[1, 2, 3]).unwrap();
830    /// assert_eq!(writer.count.get(), 3, "writer should receive all items");
831    /// ```
832    pub fn new(first: W) -> Self {
833        Self { writer: first }
834    }
835
836    /// Appends a writer to the fan-out chain.
837    ///
838    /// Returns a new builder whose accumulated type is `CompositeItemWriter<W, W2>`.
839    /// Both writers must implement `ItemWriter<T>` for the same `T` — verified at
840    /// compile time.
841    ///
842    /// # Parameters
843    ///
844    /// - `next`: The writer to link into the chain.
845    ///
846    /// # Examples
847    ///
848    /// ```
849    /// use spring_batch_rs::core::item::{ItemWriter, CompositeItemWriterBuilder};
850    /// use std::rc::Rc;
851    /// use std::cell::Cell;
852    ///
853    /// struct CountingWriter { count: Rc<Cell<usize>> }
854    /// impl CountingWriter {
855    ///     fn new(count: Rc<Cell<usize>>) -> Self { Self { count } }
856    /// }
857    /// impl ItemWriter<i32> for CountingWriter {
858    ///     fn write(&self, items: &[i32]) -> Result<(), spring_batch_rs::BatchError> {
859    ///         self.count.set(self.count.get() + items.len());
860    ///         Ok(())
861    ///     }
862    /// }
863    ///
864    /// let c1 = Rc::new(Cell::new(0usize));
865    /// let c2 = Rc::new(Cell::new(0usize));
866    /// let composite = CompositeItemWriterBuilder::new(CountingWriter::new(c1.clone()))
867    ///     .link(CountingWriter::new(c2.clone()))
868    ///     .build();
869    ///
870    /// composite.write(&[1, 2, 3]).unwrap();
871    /// assert_eq!(c1.get(), 3, "first writer should receive all items");
872    /// assert_eq!(c2.get(), 3, "second writer should receive all items");
873    /// ```
874    #[allow(clippy::should_implement_trait)]
875    pub fn link<W2>(self, next: W2) -> CompositeItemWriterBuilder<CompositeItemWriter<W, W2>> {
876        CompositeItemWriterBuilder {
877            writer: CompositeItemWriter {
878                first: self.writer,
879                second: next,
880            },
881        }
882    }
883
884    /// Builds and returns the composite writer.
885    ///
886    /// Returns the accumulated writer value `W`. When chained via `link`, `W` is a
887    /// nested `CompositeItemWriter` such as
888    /// `CompositeItemWriter<CompositeItemWriter<W1, W2>, W3>`.
889    ///
890    /// Pass `&composite` to the step builder's `.writer()` method.
891    ///
892    /// # Examples
893    ///
894    /// ```
895    /// use spring_batch_rs::core::item::{ItemWriter, CompositeItemWriterBuilder};
896    /// use std::rc::Rc;
897    /// use std::cell::Cell;
898    ///
899    /// struct CountingWriter { count: Rc<Cell<usize>> }
900    /// impl CountingWriter {
901    ///     fn new(count: Rc<Cell<usize>>) -> Self { Self { count } }
902    /// }
903    /// impl ItemWriter<i32> for CountingWriter {
904    ///     fn write(&self, items: &[i32]) -> Result<(), spring_batch_rs::BatchError> {
905    ///         self.count.set(self.count.get() + items.len());
906    ///         Ok(())
907    ///     }
908    /// }
909    ///
910    /// let c1 = Rc::new(Cell::new(0usize));
911    /// let c2 = Rc::new(Cell::new(0usize));
912    /// let composite = CompositeItemWriterBuilder::new(CountingWriter::new(c1.clone()))
913    ///     .link(CountingWriter::new(c2.clone()))
914    ///     .build();
915    ///
916    /// composite.write(&[1, 2, 3]).unwrap();
917    /// assert_eq!(c1.get(), 3);
918    /// assert_eq!(c2.get(), 3);
919    /// ```
920    pub fn build(self) -> W {
921        self.writer
922    }
923}
924
925/// Allows any `Box<P>` where `P: ItemProcessor<I, O>` to be used wherever
926/// `&dyn ItemProcessor<I, O>` is expected — including boxed concrete types
927/// (`Box<MyProcessor>`) and boxed trait objects (`Box<dyn ItemProcessor<I, O>>`).
928///
929/// The `?Sized` bound is what makes this cover trait objects: `dyn Trait` is
930/// unsized, so without `?Sized` the impl would not apply to them.
931impl<I, O, P: ItemProcessor<I, O> + ?Sized> ItemProcessor<I, O> for Box<P> {
932    fn process(&self, item: I) -> ItemProcessorResult<O> {
933        (**self).process(item)
934    }
935}
936
937/// Allows any `Box<W>` where `W: ItemWriter<T>` to be used wherever
938/// `&dyn ItemWriter<T>` is expected — including boxed concrete types
939/// (`Box<MyWriter>`) and boxed trait objects (`Box<dyn ItemWriter<T>>`).
940///
941/// The `?Sized` bound makes this cover trait objects: `dyn Trait` is
942/// unsized, so without `?Sized` the impl would not apply to them.
943impl<T, W: ItemWriter<T> + ?Sized> ItemWriter<T> for Box<W> {
944    fn write(&self, items: &[T]) -> ItemWriterResult {
945        (**self).write(items)
946    }
947    fn flush(&self) -> ItemWriterResult {
948        (**self).flush()
949    }
950    fn open(&self) -> ItemWriterResult {
951        (**self).open()
952    }
953    fn close(&self) -> ItemWriterResult {
954        (**self).close()
955    }
956}
957
958#[cfg(test)]
959mod tests {
960    use super::*;
961
962    #[test]
963    fn should_create_new_pass_through_processor() {
964        let _processor = PassThroughProcessor::<String>::new();
965        // Test that we can create the processor without panicking
966        // Verify it's a zero-sized type (only contains PhantomData)
967        assert_eq!(std::mem::size_of::<PassThroughProcessor<String>>(), 0);
968    }
969
970    #[test]
971    fn should_create_pass_through_processor_with_default() {
972        let _processor = PassThroughProcessor::<i32>::default();
973        // Test that we can create the processor using Default trait
974        // Verify it's a zero-sized type (only contains PhantomData)
975        assert_eq!(std::mem::size_of::<PassThroughProcessor<i32>>(), 0);
976    }
977
978    #[test]
979    fn should_pass_through_string_unchanged() -> Result<(), BatchError> {
980        let processor = PassThroughProcessor::new();
981        let result = processor.process("Hello, World!".to_string())?;
982        assert_eq!(result, Some("Hello, World!".to_string()));
983        Ok(())
984    }
985
986    #[test]
987    fn should_pass_through_integer_unchanged() -> Result<(), BatchError> {
988        let processor = PassThroughProcessor::new();
989        let result = processor.process(42i32)?;
990        assert_eq!(result, Some(42));
991        Ok(())
992    }
993
994    #[test]
995    fn should_pass_through_vector_unchanged() -> Result<(), BatchError> {
996        let processor = PassThroughProcessor::new();
997        let result = processor.process(vec![1, 2, 3, 4, 5])?;
998        assert_eq!(result, Some(vec![1, 2, 3, 4, 5]));
999        Ok(())
1000    }
1001
1002    #[test]
1003    fn should_pass_through_custom_struct_unchanged() -> Result<(), BatchError> {
1004        #[derive(PartialEq, Debug)]
1005        struct TestData {
1006            id: u32,
1007            name: String,
1008            values: Vec<f64>,
1009        }
1010
1011        let processor = PassThroughProcessor::new();
1012        let result = processor.process(TestData {
1013            id: 123,
1014            name: "Test Item".to_string(),
1015            values: vec![1.1, 2.2, 3.3],
1016        })?;
1017        assert_eq!(
1018            result,
1019            Some(TestData {
1020                id: 123,
1021                name: "Test Item".to_string(),
1022                values: vec![1.1, 2.2, 3.3],
1023            })
1024        );
1025        Ok(())
1026    }
1027
1028    #[test]
1029    fn should_pass_through_option_unchanged() -> Result<(), BatchError> {
1030        let processor = PassThroughProcessor::new();
1031        let result_some = processor.process(Some("test".to_string()))?;
1032        assert_eq!(result_some, Some(Some("test".to_string())));
1033        let result_none = processor.process(None::<String>)?;
1034        assert_eq!(result_none, Some(None::<String>));
1035        Ok(())
1036    }
1037
1038    #[test]
1039    fn should_handle_empty_collections() -> Result<(), BatchError> {
1040        let result_vec = PassThroughProcessor::new().process(Vec::<i32>::new())?;
1041        assert_eq!(result_vec, Some(vec![]));
1042        let result_string = PassThroughProcessor::new().process(String::new())?;
1043        assert_eq!(result_string, Some(String::new()));
1044        Ok(())
1045    }
1046
1047    #[test]
1048    fn should_take_ownership_of_input() {
1049        let processor = PassThroughProcessor::new();
1050        let input = "original".to_string();
1051        let result = processor.process(input).unwrap();
1052        assert_eq!(result, Some("original".to_string()));
1053    }
1054
1055    #[test]
1056    fn should_work_with_multiple_processors() -> Result<(), BatchError> {
1057        let processor1 = PassThroughProcessor::<String>::new();
1058        let processor2 = PassThroughProcessor::<String>::new();
1059        let inner = processor1.process("test data".to_string())?.unwrap();
1060        let result = processor2.process(inner)?;
1061        assert_eq!(result, Some("test data".to_string()));
1062        Ok(())
1063    }
1064
1065    #[test]
1066    fn should_handle_large_data_structures() -> Result<(), BatchError> {
1067        let processor = PassThroughProcessor::new();
1068        let large_input: Vec<i32> = (0..10000).collect();
1069        let expected_len = large_input.len();
1070        let result = processor.process(large_input)?;
1071        // PassThroughProcessor always returns Some — unwrap is safe
1072        assert_eq!(result.unwrap().len(), expected_len);
1073        Ok(())
1074    }
1075
1076    #[test]
1077    fn should_use_default_flush_open_close_implementations() {
1078        struct MinimalWriter;
1079        impl ItemWriter<String> for MinimalWriter {
1080            fn write(&self, _: &[String]) -> ItemWriterResult {
1081                Ok(())
1082            }
1083            // flush, open, close use the trait's default implementations
1084        }
1085        let w = MinimalWriter;
1086        assert!(w.flush().is_ok(), "default flush should return Ok");
1087        assert!(w.open().is_ok(), "default open should return Ok");
1088        assert!(w.close().is_ok(), "default close should return Ok");
1089    }
1090
1091    // --- CompositeItemProcessor / CompositeItemProcessorBuilder ---
1092
1093    struct DoubleProcessor;
1094    impl ItemProcessor<i32, i32> for DoubleProcessor {
1095        fn process(&self, item: i32) -> ItemProcessorResult<i32> {
1096            Ok(Some(item * 2))
1097        }
1098    }
1099
1100    struct AddTenProcessor;
1101    impl ItemProcessor<i32, i32> for AddTenProcessor {
1102        fn process(&self, item: i32) -> ItemProcessorResult<i32> {
1103            Ok(Some(item + 10))
1104        }
1105    }
1106
1107    struct ToStringProcessor;
1108    impl ItemProcessor<i32, String> for ToStringProcessor {
1109        fn process(&self, item: i32) -> ItemProcessorResult<String> {
1110            Ok(Some(item.to_string()))
1111        }
1112    }
1113
1114    struct FilterEvenProcessor;
1115    impl ItemProcessor<i32, i32> for FilterEvenProcessor {
1116        fn process(&self, item: i32) -> ItemProcessorResult<i32> {
1117            if item % 2 == 0 {
1118                Ok(Some(item))
1119            } else {
1120                Ok(None) // filter odd numbers
1121            }
1122        }
1123    }
1124
1125    struct FailingProcessor;
1126    impl ItemProcessor<i32, i32> for FailingProcessor {
1127        fn process(&self, _item: i32) -> ItemProcessorResult<i32> {
1128            Err(BatchError::ItemProcessor("forced failure".to_string()))
1129        }
1130    }
1131
1132    #[test]
1133    fn should_chain_two_same_type_processors() -> Result<(), BatchError> {
1134        let composite = CompositeItemProcessorBuilder::new(DoubleProcessor)
1135            .link(AddTenProcessor)
1136            .build();
1137
1138        // 5 * 2 = 10, then 10 + 10 = 20
1139        assert_eq!(
1140            composite.process(5)?,
1141            Some(20),
1142            "5 * 2 + 10 should equal 20"
1143        );
1144        Ok(())
1145    }
1146
1147    #[test]
1148    fn should_chain_two_type_changing_processors() -> Result<(), BatchError> {
1149        let composite = CompositeItemProcessorBuilder::new(DoubleProcessor)
1150            .link(ToStringProcessor)
1151            .build();
1152
1153        // 21 * 2 = 42, then "42"
1154        assert_eq!(composite.process(21)?, Some("42".to_string()));
1155        Ok(())
1156    }
1157
1158    #[test]
1159    fn should_chain_three_processors() -> Result<(), BatchError> {
1160        let composite = CompositeItemProcessorBuilder::new(DoubleProcessor)
1161            .link(AddTenProcessor)
1162            .link(ToStringProcessor)
1163            .build();
1164
1165        // 5 * 2 = 10, then 10 + 10 = 20, then "20"
1166        assert_eq!(composite.process(5)?, Some("20".to_string()));
1167        Ok(())
1168    }
1169
1170    #[test]
1171    fn should_stop_chain_when_first_processor_filters_item() -> Result<(), BatchError> {
1172        let composite = CompositeItemProcessorBuilder::new(FilterEvenProcessor)
1173            .link(ToStringProcessor)
1174            .build();
1175
1176        // 3 is odd → filtered by first processor → second processor never called
1177        assert_eq!(composite.process(3)?, None, "odd number should be filtered");
1178        // 4 is even → passes through → converted to string
1179        assert_eq!(
1180            composite.process(4)?,
1181            Some("4".to_string()),
1182            "even number should pass"
1183        );
1184        Ok(())
1185    }
1186
1187    #[test]
1188    fn should_propagate_error_from_first_processor() {
1189        let composite = CompositeItemProcessorBuilder::new(FailingProcessor)
1190            .link(ToStringProcessor)
1191            .build();
1192
1193        let result = composite.process(1);
1194        assert!(
1195            result.is_err(),
1196            "error from first processor should propagate"
1197        );
1198    }
1199
1200    #[test]
1201    fn should_propagate_error_from_second_processor() {
1202        struct AlwaysFailI32;
1203        impl ItemProcessor<i32, i32> for AlwaysFailI32 {
1204            fn process(&self, _: i32) -> ItemProcessorResult<i32> {
1205                Err(BatchError::ItemProcessor("second failed".to_string()))
1206            }
1207        }
1208
1209        let composite = CompositeItemProcessorBuilder::new(DoubleProcessor)
1210            .link(AlwaysFailI32)
1211            .build();
1212
1213        let result = composite.process(5);
1214        assert!(
1215            result.is_err(),
1216            "error from second processor should propagate"
1217        );
1218    }
1219
1220    #[test]
1221    fn should_use_box_blanket_impl_as_item_processor() -> Result<(), BatchError> {
1222        // build() returns the concrete type; Box::new() it to get a trait object.
1223        // Box<dyn ItemProcessor<I, O>> implements ItemProcessor<I, O> via the ?Sized blanket impl.
1224        let composite = CompositeItemProcessorBuilder::new(DoubleProcessor)
1225            .link(ToStringProcessor)
1226            .build();
1227        let boxed: Box<dyn ItemProcessor<i32, String>> = Box::new(composite);
1228
1229        let result = boxed.process(3)?;
1230        assert_eq!(
1231            result,
1232            Some("6".to_string()),
1233            "boxed trait object should delegate to inner processor"
1234        );
1235        Ok(())
1236    }
1237
1238    #[test]
1239    fn should_use_box_concrete_type_as_item_processor() -> Result<(), BatchError> {
1240        // Box<ConcreteProcessor> also implements ItemProcessor<I, O> via the ?Sized blanket impl
1241        let boxed: Box<DoubleProcessor> = Box::new(DoubleProcessor);
1242
1243        let result = boxed.process(7)?;
1244        assert_eq!(
1245            result,
1246            Some(14),
1247            "boxed concrete processor should delegate to inner processor"
1248        );
1249        Ok(())
1250    }
1251
1252    // --- CompositeItemWriter ---
1253
1254    use std::cell::Cell;
1255
1256    struct RecordingWriter {
1257        write_calls: Cell<usize>,
1258        items_written: Cell<usize>,
1259        open_calls: Cell<usize>,
1260        close_calls: Cell<usize>,
1261        flush_calls: Cell<usize>,
1262        fail_write: bool,
1263        fail_open: bool,
1264        fail_flush: bool,
1265        fail_close: bool,
1266    }
1267
1268    impl RecordingWriter {
1269        fn new() -> Self {
1270            Self {
1271                write_calls: Cell::new(0),
1272                items_written: Cell::new(0),
1273                open_calls: Cell::new(0),
1274                close_calls: Cell::new(0),
1275                flush_calls: Cell::new(0),
1276                fail_write: false,
1277                fail_open: false,
1278                fail_flush: false,
1279                fail_close: false,
1280            }
1281        }
1282        fn failing_write() -> Self {
1283            Self {
1284                fail_write: true,
1285                ..Self::new()
1286            }
1287        }
1288        fn failing_open() -> Self {
1289            Self {
1290                fail_open: true,
1291                ..Self::new()
1292            }
1293        }
1294    }
1295
1296    impl ItemWriter<i32> for RecordingWriter {
1297        fn write(&self, items: &[i32]) -> ItemWriterResult {
1298            if self.fail_write {
1299                return Err(BatchError::ItemWriter("forced write failure".to_string()));
1300            }
1301            self.write_calls.set(self.write_calls.get() + 1);
1302            self.items_written
1303                .set(self.items_written.get() + items.len());
1304            Ok(())
1305        }
1306        fn open(&self) -> ItemWriterResult {
1307            if self.fail_open {
1308                return Err(BatchError::ItemWriter("forced open failure".to_string()));
1309            }
1310            self.open_calls.set(self.open_calls.get() + 1);
1311            Ok(())
1312        }
1313        fn close(&self) -> ItemWriterResult {
1314            if self.fail_close {
1315                return Err(BatchError::ItemWriter("forced close failure".to_string()));
1316            }
1317            self.close_calls.set(self.close_calls.get() + 1);
1318            Ok(())
1319        }
1320        fn flush(&self) -> ItemWriterResult {
1321            if self.fail_flush {
1322                return Err(BatchError::ItemWriter("forced flush failure".to_string()));
1323            }
1324            self.flush_calls.set(self.flush_calls.get() + 1);
1325            Ok(())
1326        }
1327    }
1328
1329    #[test]
1330    fn should_write_to_both_writers() -> Result<(), BatchError> {
1331        let w1 = RecordingWriter::new();
1332        let w2 = RecordingWriter::new();
1333        let composite = CompositeItemWriter {
1334            first: w1,
1335            second: w2,
1336        };
1337        composite.write(&[1, 2, 3])?;
1338        assert_eq!(
1339            composite.first.write_calls.get(),
1340            1,
1341            "first writer should be called"
1342        );
1343        assert_eq!(
1344            composite.first.items_written.get(),
1345            3,
1346            "first writer should receive 3 items"
1347        );
1348        assert_eq!(
1349            composite.second.write_calls.get(),
1350            1,
1351            "second writer should be called"
1352        );
1353        assert_eq!(
1354            composite.second.items_written.get(),
1355            3,
1356            "second writer should receive 3 items"
1357        );
1358        Ok(())
1359    }
1360
1361    #[test]
1362    fn should_open_both_writers_in_order() -> Result<(), BatchError> {
1363        let w1 = RecordingWriter::new();
1364        let w2 = RecordingWriter::new();
1365        let composite = CompositeItemWriter {
1366            first: w1,
1367            second: w2,
1368        };
1369        composite.open()?;
1370        assert_eq!(
1371            composite.first.open_calls.get(),
1372            1,
1373            "first writer should be opened"
1374        );
1375        assert_eq!(
1376            composite.second.open_calls.get(),
1377            1,
1378            "second writer should be opened"
1379        );
1380        Ok(())
1381    }
1382
1383    #[test]
1384    fn should_close_both_writers_in_order() -> Result<(), BatchError> {
1385        let w1 = RecordingWriter::new();
1386        let w2 = RecordingWriter::new();
1387        let composite = CompositeItemWriter {
1388            first: w1,
1389            second: w2,
1390        };
1391        composite.close()?;
1392        assert_eq!(
1393            composite.first.close_calls.get(),
1394            1,
1395            "first writer should be closed"
1396        );
1397        assert_eq!(
1398            composite.second.close_calls.get(),
1399            1,
1400            "second writer should be closed"
1401        );
1402        Ok(())
1403    }
1404
1405    #[test]
1406    fn should_flush_both_writers() -> Result<(), BatchError> {
1407        let w1 = RecordingWriter::new();
1408        let w2 = RecordingWriter::new();
1409        let composite = CompositeItemWriter {
1410            first: w1,
1411            second: w2,
1412        };
1413        composite.flush()?;
1414        assert_eq!(
1415            composite.first.flush_calls.get(),
1416            1,
1417            "first writer should be flushed"
1418        );
1419        assert_eq!(
1420            composite.second.flush_calls.get(),
1421            1,
1422            "second writer should be flushed"
1423        );
1424        Ok(())
1425    }
1426
1427    #[test]
1428    fn should_short_circuit_on_write_error() {
1429        let w1 = RecordingWriter::failing_write();
1430        let w2 = RecordingWriter::new();
1431        let composite = CompositeItemWriter {
1432            first: w1,
1433            second: w2,
1434        };
1435        let result = composite.write(&[1, 2, 3]);
1436        assert!(result.is_err(), "error should propagate");
1437        assert_eq!(
1438            composite.second.write_calls.get(),
1439            0,
1440            "second writer should not be called after first fails"
1441        );
1442    }
1443
1444    #[test]
1445    fn should_short_circuit_on_open_error() {
1446        let w1 = RecordingWriter::failing_open();
1447        let w2 = RecordingWriter::new();
1448        let composite = CompositeItemWriter {
1449            first: w1,
1450            second: w2,
1451        };
1452        let result = composite.open();
1453        assert!(result.is_err(), "error should propagate");
1454        assert_eq!(
1455            composite.second.open_calls.get(),
1456            0,
1457            "second writer should not be opened after first fails"
1458        );
1459    }
1460
1461    #[test]
1462    fn should_flush_both_writers_even_when_first_fails() {
1463        let w1 = RecordingWriter {
1464            fail_flush: true,
1465            ..RecordingWriter::new()
1466        };
1467        let w2 = RecordingWriter::new();
1468        let composite = CompositeItemWriter {
1469            first: w1,
1470            second: w2,
1471        };
1472        let result = composite.flush();
1473        assert!(result.is_err(), "error should propagate");
1474        assert_eq!(
1475            composite.second.flush_calls.get(),
1476            1,
1477            "second writer should still be flushed even when first fails"
1478        );
1479    }
1480
1481    #[test]
1482    fn should_close_both_writers_even_when_first_fails() {
1483        let w1 = RecordingWriter {
1484            fail_close: true,
1485            ..RecordingWriter::new()
1486        };
1487        let w2 = RecordingWriter::new();
1488        let composite = CompositeItemWriter {
1489            first: w1,
1490            second: w2,
1491        };
1492        let result = composite.close();
1493        assert!(result.is_err(), "error should propagate");
1494        assert_eq!(
1495            composite.second.close_calls.get(),
1496            1,
1497            "second writer should still be closed even when first fails"
1498        );
1499    }
1500
1501    #[test]
1502    fn should_chain_two_writers_via_builder() -> Result<(), BatchError> {
1503        let composite = CompositeItemWriterBuilder::new(RecordingWriter::new())
1504            .link(RecordingWriter::new())
1505            .build();
1506        composite.write(&[10, 20])?;
1507        assert_eq!(
1508            composite.first.items_written.get(),
1509            2,
1510            "first writer should receive 2 items"
1511        );
1512        assert_eq!(
1513            composite.second.items_written.get(),
1514            2,
1515            "second writer should receive 2 items"
1516        );
1517        Ok(())
1518    }
1519
1520    #[test]
1521    fn should_chain_three_writers() -> Result<(), BatchError> {
1522        let composite = CompositeItemWriterBuilder::new(RecordingWriter::new())
1523            .link(RecordingWriter::new())
1524            .link(RecordingWriter::new())
1525            .build();
1526        composite.write(&[1, 2, 3, 4])?;
1527        // composite type: CompositeItemWriter<CompositeItemWriter<W1, W2>, W3>
1528        // composite.first is CompositeItemWriter<W1, W2>
1529        // composite.second is W3
1530        assert_eq!(
1531            composite.first.first.items_written.get(),
1532            4,
1533            "writer 1 should receive 4 items"
1534        );
1535        assert_eq!(
1536            composite.first.second.items_written.get(),
1537            4,
1538            "writer 2 should receive 4 items"
1539        );
1540        assert_eq!(
1541            composite.second.items_written.get(),
1542            4,
1543            "writer 3 should receive 4 items"
1544        );
1545        Ok(())
1546    }
1547
1548    #[test]
1549    fn should_use_box_blanket_impl_as_item_writer() -> Result<(), BatchError> {
1550        let composite = CompositeItemWriterBuilder::new(RecordingWriter::new())
1551            .link(RecordingWriter::new())
1552            .build();
1553        let boxed: Box<dyn ItemWriter<i32>> = Box::new(composite);
1554        boxed.write(&[5, 6, 7])?;
1555        // The test verifies that Box<dyn ItemWriter<T>> can be used as an ItemWriter<T>.
1556        // We can't inspect the inner writers through Box<dyn>, so asserting Ok is sufficient.
1557        Ok(())
1558    }
1559
1560    #[test]
1561    fn should_use_box_concrete_writer_as_item_writer() -> Result<(), BatchError> {
1562        let boxed: Box<RecordingWriter> = Box::new(RecordingWriter::new());
1563        boxed.open()?;
1564        boxed.write(&[1, 2])?;
1565        boxed.flush()?;
1566        boxed.close()?;
1567        assert_eq!(
1568            boxed.items_written.get(),
1569            2,
1570            "boxed concrete writer should delegate write"
1571        );
1572        assert_eq!(
1573            boxed.open_calls.get(),
1574            1,
1575            "boxed concrete writer should delegate open"
1576        );
1577        assert_eq!(
1578            boxed.flush_calls.get(),
1579            1,
1580            "boxed concrete writer should delegate flush"
1581        );
1582        assert_eq!(
1583            boxed.close_calls.get(),
1584            1,
1585            "boxed concrete writer should delegate close"
1586        );
1587        Ok(())
1588    }
1589}