spring_batch_rs/core/item.rs
1use crate::error::BatchError;
2
3/// Represents the result of reading an item from the reader.
4///
5/// This type is a specialized `Result` that can be:
6/// - `Ok(Some(R))` when an item is successfully read
7/// - `Ok(None)` when there are no more items to read (end of data)
8/// - `Err(BatchError)` when an error occurs during reading
9pub type ItemReaderResult<I> = Result<Option<I>, BatchError>;
10
11/// Represents the result of processing an item by the processor.
12///
13/// This type is a specialized `Result` that can be:
14/// - `Ok(Some(O))` when an item is successfully processed and should be passed to the writer
15/// - `Ok(None)` when an item is intentionally filtered out (not an error)
16/// - `Err(BatchError)` when an error occurs during processing
17pub type ItemProcessorResult<O> = Result<Option<O>, BatchError>;
18
19/// Represents the result of writing items by the writer.
20///
21/// This type is a specialized `Result` that can be:
22/// - `Ok(())` when items are successfully written
23/// - `Err(BatchError)` when an error occurs during writing
24pub type ItemWriterResult = Result<(), BatchError>;
25
26/// A trait for reading items.
27///
28/// This trait defines the contract for components that read items from a data source.
29/// It is one of the fundamental building blocks of the batch processing pipeline.
30///
31/// # Design Pattern
32///
33/// This follows the Strategy Pattern, allowing different reading strategies to be
34/// interchangeable while maintaining a consistent interface.
35///
36/// # Implementation Note
37///
38/// Implementors of this trait should:
39/// - Return `Ok(Some(item))` when an item is successfully read
40/// - Return `Ok(None)` when there are no more items to read (end of data)
41/// - Return `Err(BatchError)` when an error occurs during reading
42///
43/// # Example
44///
45/// ```compile_fail
46/// use spring_batch_rs::core::item::{ItemReader, ItemReaderResult};
47/// use spring_batch_rs::error::BatchError;
48///
49/// struct StringReader {
50/// items: Vec<String>,
51/// position: usize,
52/// }
53///
54/// impl ItemReader<String> for StringReader {
55/// fn read(&mut self) -> ItemReaderResult<String> {
56/// if self.position < self.items.len() {
57/// let item = self.items[self.position].clone();
58/// self.position += 1;
59/// Ok(Some(item))
60/// } else {
61/// Ok(None) // End of data
62/// }
63/// }
64/// }
65/// ```
66pub trait ItemReader<I> {
67 /// Reads an item from the reader.
68 ///
69 /// # Returns
70 /// - `Ok(Some(item))` when an item is successfully read
71 /// - `Ok(None)` when there are no more items to read (end of data)
72 /// - `Err(BatchError)` when an error occurs during reading
73 fn read(&self) -> ItemReaderResult<I>;
74}
75
76/// A trait for processing items.
77///
78/// This trait defines the contract for components that transform or process items
79/// in a batch processing pipeline. It takes an input item of type `I` and produces
80/// an output item of type `O`.
81///
82/// # Filtering
83///
84/// Returning `Ok(None)` filters the item silently: it is not passed to the writer
85/// and is counted in [`crate::core::step::StepExecution::filter_count`]. This is different from returning
86/// `Err(BatchError)` which counts as a processing error and may trigger fault tolerance.
87///
88/// # Design Pattern
89///
90/// This follows the Strategy Pattern, allowing different processing strategies to be
91/// interchangeable while maintaining a consistent interface.
92///
93/// # Type Parameters
94///
95/// - `I`: The input item type
96/// - `O`: The output item type
97///
98/// # Example
99///
100/// ```
101/// use spring_batch_rs::core::item::{ItemProcessor, ItemProcessorResult};
102/// use spring_batch_rs::error::BatchError;
103///
104/// struct AdultFilter;
105///
106/// #[derive(Clone)]
107/// struct Person { name: String, age: u32 }
108///
109/// impl ItemProcessor<Person, Person> for AdultFilter {
110/// fn process(&self, item: &Person) -> ItemProcessorResult<Person> {
111/// if item.age >= 18 {
112/// Ok(Some(item.clone())) // keep adults
113/// } else {
114/// Ok(None) // filter out minors
115/// }
116/// }
117/// }
118/// ```
119pub trait ItemProcessor<I, O> {
120 /// Processes an item and returns the processed result.
121 ///
122 /// # Parameters
123 /// - `item`: The item to process
124 ///
125 /// # Returns
126 /// - `Ok(Some(processed_item))` when the item is successfully processed
127 /// - `Ok(None)` when the item is intentionally filtered out
128 /// - `Err(BatchError)` when an error occurs during processing
129 fn process(&self, item: &I) -> ItemProcessorResult<O>;
130}
131
132/// A trait for writing items.
133///
134/// This trait defines the contract for components that write items to a data destination.
135/// It is one of the fundamental building blocks of the batch processing pipeline.
136///
137/// # Design Pattern
138///
139/// This follows the Strategy Pattern, allowing different writing strategies to be
140/// interchangeable while maintaining a consistent interface.
141///
142/// # Lifecycle Methods
143///
144/// This trait includes additional lifecycle methods:
145/// - `flush()`: Flushes any buffered data
146/// - `open()`: Initializes resources before writing starts
147/// - `close()`: Releases resources after writing completes
148///
149/// # Example
150///
151/// ```
152/// use spring_batch_rs::core::item::{ItemWriter, ItemWriterResult};
153/// use spring_batch_rs::error::BatchError;
154///
155/// struct ConsoleWriter;
156///
157/// impl ItemWriter<String> for ConsoleWriter {
158/// fn write(&self, items: &[String]) -> ItemWriterResult {
159/// for item in items {
160/// println!("{}", item);
161/// }
162/// Ok(())
163/// }
164/// }
165/// ```
166pub trait ItemWriter<O> {
167 /// Writes the given items.
168 ///
169 /// # Parameters
170 /// - `items`: A slice of items to write
171 ///
172 /// # Returns
173 /// - `Ok(())` when items are successfully written
174 /// - `Err(BatchError)` when an error occurs during writing
175 fn write(&self, items: &[O]) -> ItemWriterResult;
176
177 /// Flushes any buffered data.
178 ///
179 /// This method is called after a chunk of items has been written, and
180 /// allows the writer to flush any internally buffered data to the destination.
181 ///
182 /// # Default Implementation
183 ///
184 /// The default implementation does nothing and returns `Ok(())`.
185 ///
186 /// # Returns
187 /// - `Ok(())` when the flush operation succeeds
188 /// - `Err(BatchError)` when an error occurs during flushing
189 fn flush(&self) -> ItemWriterResult {
190 Ok(())
191 }
192
193 /// Opens the writer.
194 ///
195 /// This method is called before any items are written, and allows the writer
196 /// to initialize any resources it needs.
197 ///
198 /// # Default Implementation
199 ///
200 /// The default implementation does nothing and returns `Ok(())`.
201 ///
202 /// # Returns
203 /// - `Ok(())` when the open operation succeeds
204 /// - `Err(BatchError)` when an error occurs during opening
205 fn open(&self) -> ItemWriterResult {
206 Ok(())
207 }
208
209 /// Closes the writer.
210 ///
211 /// This method is called after all items have been written, and allows the writer
212 /// to release any resources it acquired.
213 ///
214 /// # Default Implementation
215 ///
216 /// The default implementation does nothing and returns `Ok(())`.
217 ///
218 /// # Returns
219 /// - `Ok(())` when the close operation succeeds
220 /// - `Err(BatchError)` when an error occurs during closing
221 fn close(&self) -> ItemWriterResult {
222 Ok(())
223 }
224}
225
/// A pass-through processor that returns items unchanged.
///
/// This processor implements the identity function for batch processing pipelines.
/// It takes an input item and returns it unchanged, making it useful for scenarios
/// where you need a processor in the pipeline but don't want to transform the data.
///
/// # Type Parameters
///
/// - `T`: The item type that will be passed through unchanged. It must implement
///   `Clone` for the processor to be constructed with `new` or used as an
///   `ItemProcessor`; `Default::default()` places no bound on `T`.
///
/// # Use Cases
///
/// - Testing batch processing pipelines without data transformation
/// - Placeholder processor during development
/// - Pipelines where processing logic is conditional and sometimes bypassed
/// - Maintaining consistent pipeline structure when transformation is optional
///
/// # Performance
///
/// This processor performs a clone operation on each item. For large or complex
/// data structures, consider whether pass-through processing is necessary or if
/// the pipeline can be restructured to avoid unnecessary cloning.
///
/// # Examples
///
/// ```
/// use spring_batch_rs::core::item::{ItemProcessor, PassThroughProcessor};
///
/// let processor = PassThroughProcessor::<String>::new();
/// let input = "Hello, World!".to_string();
/// let result = processor.process(&input).unwrap();
/// assert_eq!(result, Some(input));
/// ```
///
/// Using with different data types:
///
/// ```
/// use spring_batch_rs::core::item::{ItemProcessor, PassThroughProcessor};
///
/// // With integers
/// let int_processor = PassThroughProcessor::<i32>::new();
/// let number = 42;
/// let result = int_processor.process(&number).unwrap();
/// assert_eq!(result, Some(number));
///
/// // With custom structs
/// #[derive(Clone, PartialEq, Debug)]
/// struct Person {
///     name: String,
///     age: u32,
/// }
///
/// let person_processor = PassThroughProcessor::<Person>::new();
/// let person = Person {
///     name: "Alice".to_string(),
///     age: 30,
/// };
/// let result = person_processor.process(&person).unwrap();
/// assert_eq!(result, Some(person));
/// ```
pub struct PassThroughProcessor<T> {
    _phantom: std::marker::PhantomData<T>,
}

// Written by hand instead of `#[derive(Default)]`: the derive would add a
// `T: Default` bound, yet the struct only holds `PhantomData<T>` and never
// needs to construct a `T`.
impl<T> Default for PassThroughProcessor<T> {
    /// Creates a processor for any item type `T`, with no bound on `T`.
    fn default() -> Self {
        Self {
            _phantom: std::marker::PhantomData,
        }
    }
}
290
291impl<T: Clone> ItemProcessor<T, T> for PassThroughProcessor<T> {
292 /// Processes an item by returning it unchanged.
293 ///
294 /// # Parameters
295 /// - `item`: The item to process (will be cloned and returned unchanged)
296 ///
297 /// # Returns
298 /// - `Ok(Some(cloned_item))` - Always succeeds and returns a clone of the input item
299 ///
300 /// # Examples
301 ///
302 /// ```
303 /// use spring_batch_rs::core::item::{ItemProcessor, PassThroughProcessor};
304 ///
305 /// let processor = PassThroughProcessor::<Vec<i32>>::new();
306 /// let input = vec![1, 2, 3];
307 /// let result = processor.process(&input).unwrap();
308 /// assert_eq!(result, Some(input));
309 /// ```
310 fn process(&self, item: &T) -> ItemProcessorResult<T> {
311 Ok(Some(item.clone()))
312 }
313}
314
315impl<T: Clone> PassThroughProcessor<T> {
316 /// Creates a new `PassThroughProcessor`.
317 ///
318 /// # Returns
319 /// A new instance of `PassThroughProcessor` that will pass through items of type `T`.
320 ///
321 /// # Examples
322 ///
323 /// ```
324 /// use spring_batch_rs::core::item::PassThroughProcessor;
325 ///
326 /// let processor = PassThroughProcessor::<String>::new();
327 /// ```
328 pub fn new() -> Self {
329 Self {
330 _phantom: std::marker::PhantomData,
331 }
332 }
333}
334
/// Two processors chained one after the other, using static dispatch.
///
/// Whatever the first processor outputs is fed to the second as input. If the
/// first processor filters an item (returns `Ok(None)`), the chain stops there:
/// `Ok(None)` is returned and the second processor is never called.
///
/// Both processors are stored by value — no heap allocation occurs inside the
/// struct itself. This mirrors the pattern used by standard library iterator
/// adapters such as [`std::iter::Chain`].
///
/// Prefer building chains with [`CompositeItemProcessorBuilder`] over
/// instantiating this struct directly.
///
/// # Type Parameters
///
/// - `P1`: The first processor type. Must implement `ItemProcessor<I, M>` for
///   some input type `I` and intermediate type `M`.
/// - `P2`: The second processor type. Must implement `ItemProcessor<M, O>` where
///   `M` is the output type of `P1` and `O` is the final output type.
/// - `M`: The intermediate type — output of `P1`, input of `P2`. Tracked via
///   `PhantomData` so it participates in type inference without being stored.
///
/// # Examples
///
/// ```
/// use spring_batch_rs::core::item::{ItemProcessor, CompositeItemProcessorBuilder};
/// use spring_batch_rs::BatchError;
///
/// struct DoubleProcessor;
/// impl ItemProcessor<i32, i32> for DoubleProcessor {
///     fn process(&self, item: &i32) -> Result<Option<i32>, BatchError> {
///         Ok(Some(item * 2))
///     }
/// }
///
/// struct ToStringProcessor;
/// impl ItemProcessor<i32, String> for ToStringProcessor {
///     fn process(&self, item: &i32) -> Result<Option<String>, BatchError> {
///         Ok(Some(item.to_string()))
///     }
/// }
///
/// let composite = CompositeItemProcessorBuilder::new(DoubleProcessor)
///     .link(ToStringProcessor)
///     .build();
///
/// // 21 * 2 = 42, then converted to "42"
/// assert_eq!(composite.process(&21).unwrap(), Some("42".to_string()));
/// ```
///
/// # Errors
///
/// Returns [`BatchError`] if any processor in the chain returns an error.
pub struct CompositeItemProcessor<P1, P2, M> {
    first: P1,
    second: P2,
    /// Zero-sized marker for the intermediate type `M` (output of `P1`, input
    /// of `P2`). The `fn(M) -> M` form keeps the parameter invariant, which
    /// avoids unintended variance.
    _marker: std::marker::PhantomData<fn(M) -> M>,
}
397
398impl<I, M, O, P1, P2> ItemProcessor<I, O> for CompositeItemProcessor<P1, P2, M>
399where
400 P1: ItemProcessor<I, M>,
401 P2: ItemProcessor<M, O>,
402{
403 /// Applies the first processor, then — if the result is `Some` — applies
404 /// the second. Returns `Ok(None)` immediately if the first processor
405 /// filters the item.
406 ///
407 /// # Errors
408 ///
409 /// Returns [`BatchError`] if either processor fails.
410 fn process(&self, item: &I) -> ItemProcessorResult<O> {
411 match self.first.process(item)? {
412 Some(intermediate) => self.second.process(&intermediate),
413 None => Ok(None),
414 }
415 }
416}
417
/// Builder for creating a chain of [`ItemProcessor`]s using static dispatch.
///
/// Start the chain with [`new`](CompositeItemProcessorBuilder::new), append
/// processors with [`link`](CompositeItemProcessorBuilder::link), and finalise
/// with [`build`](CompositeItemProcessorBuilder::build). Each call to `link`
/// wraps the accumulated chain in a [`CompositeItemProcessor`], changing the
/// output type. Mismatched types are caught at compile time.
///
/// The built chain stores all processors by value — no heap allocations occur
/// inside the processor itself. The type of the built value encodes the full
/// chain structure. Because every `link` wraps what has been accumulated so far,
/// the nesting grows on the left: three processors produce
/// `CompositeItemProcessor<CompositeItemProcessor<P1, P2, M1>, P3, M2>`, where
/// `M1` and `M2` are the intermediate item types. This is similar to how
/// `Iterator` adapters compose in the standard library.
///
/// # Type Parameters
///
/// - `P`: The accumulated processor type. Starts as the first processor and
///   is wrapped in [`CompositeItemProcessor`] with each
///   [`link`](CompositeItemProcessorBuilder::link) call.
///
/// # Examples
///
/// Two processors (`i32 → i32 → String`):
///
/// ```
/// use spring_batch_rs::core::item::{ItemProcessor, CompositeItemProcessorBuilder};
/// use spring_batch_rs::BatchError;
///
/// struct DoubleProcessor;
/// impl ItemProcessor<i32, i32> for DoubleProcessor {
///     fn process(&self, item: &i32) -> Result<Option<i32>, BatchError> {
///         Ok(Some(item * 2))
///     }
/// }
///
/// struct ToStringProcessor;
/// impl ItemProcessor<i32, String> for ToStringProcessor {
///     fn process(&self, item: &i32) -> Result<Option<String>, BatchError> {
///         Ok(Some(item.to_string()))
///     }
/// }
///
/// let composite = CompositeItemProcessorBuilder::new(DoubleProcessor)
///     .link(ToStringProcessor)
///     .build();
///
/// assert_eq!(composite.process(&21).unwrap(), Some("42".to_string()));
/// ```
///
/// Three processors (`i32 → i32 → i32 → String`):
///
/// ```
/// use spring_batch_rs::core::item::{ItemProcessor, CompositeItemProcessorBuilder};
/// use spring_batch_rs::BatchError;
///
/// struct AddOneProcessor;
/// impl ItemProcessor<i32, i32> for AddOneProcessor {
///     fn process(&self, item: &i32) -> Result<Option<i32>, BatchError> {
///         Ok(Some(item + 1))
///     }
/// }
///
/// struct DoubleProcessor;
/// impl ItemProcessor<i32, i32> for DoubleProcessor {
///     fn process(&self, item: &i32) -> Result<Option<i32>, BatchError> {
///         Ok(Some(item * 2))
///     }
/// }
///
/// struct ToStringProcessor;
/// impl ItemProcessor<i32, String> for ToStringProcessor {
///     fn process(&self, item: &i32) -> Result<Option<String>, BatchError> {
///         Ok(Some(item.to_string()))
///     }
/// }
///
/// let composite = CompositeItemProcessorBuilder::new(AddOneProcessor)
///     .link(DoubleProcessor)
///     .link(ToStringProcessor)
///     .build();
///
/// // (4 + 1) * 2 = 10 → "10"
/// assert_eq!(composite.process(&4).unwrap(), Some("10".to_string()));
/// ```
pub struct CompositeItemProcessorBuilder<P> {
    processor: P,
}
503
504impl<P> CompositeItemProcessorBuilder<P> {
505 /// Creates a new builder with the given processor as the first in the chain.
506 ///
507 /// # Parameters
508 ///
509 /// - `first`: The first processor in the chain.
510 ///
511 /// # Examples
512 ///
513 /// ```
514 /// use spring_batch_rs::core::item::{ItemProcessor, CompositeItemProcessorBuilder};
515 /// use spring_batch_rs::BatchError;
516 ///
517 /// struct UppercaseProcessor;
518 /// impl ItemProcessor<String, String> for UppercaseProcessor {
519 /// fn process(&self, item: &String) -> Result<Option<String>, BatchError> {
520 /// Ok(Some(item.to_uppercase()))
521 /// }
522 /// }
523 ///
524 /// let builder = CompositeItemProcessorBuilder::new(UppercaseProcessor);
525 /// let composite = builder.build();
526 /// assert_eq!(composite.process(&"hello".to_string()).unwrap(), Some("HELLO".to_string()));
527 /// ```
528 pub fn new(first: P) -> Self {
529 Self { processor: first }
530 }
531
532 /// Appends a processor to the end of the chain.
533 ///
534 /// Returns a new builder whose accumulated type is
535 /// `CompositeItemProcessor<P, P2>`. The input/output types are verified
536 /// at compile time when the chain is used.
537 ///
538 /// # Type Parameters
539 ///
540 /// - `P2`: The processor type to append.
541 /// - `M`: The intermediate type connecting `P` and `P2`. Inferred by the
542 /// compiler from the `ItemProcessor` impls on `P` and `P2`.
543 ///
544 /// # Parameters
545 ///
546 /// - `next`: The processor to append to the chain.
547 ///
548 /// # Examples
549 ///
550 /// ```
551 /// use spring_batch_rs::core::item::{ItemProcessor, CompositeItemProcessorBuilder};
552 /// use spring_batch_rs::BatchError;
553 ///
554 /// struct AddOneProcessor;
555 /// impl ItemProcessor<i32, i32> for AddOneProcessor {
556 /// fn process(&self, item: &i32) -> Result<Option<i32>, BatchError> {
557 /// Ok(Some(item + 1))
558 /// }
559 /// }
560 ///
561 /// struct ToStringProcessor;
562 /// impl ItemProcessor<i32, String> for ToStringProcessor {
563 /// fn process(&self, item: &i32) -> Result<Option<String>, BatchError> {
564 /// Ok(Some(item.to_string()))
565 /// }
566 /// }
567 ///
568 /// let composite = CompositeItemProcessorBuilder::new(AddOneProcessor)
569 /// .link(ToStringProcessor)
570 /// .build();
571 ///
572 /// assert_eq!(composite.process(&41).unwrap(), Some("42".to_string()));
573 /// ```
574 pub fn link<P2, M>(
575 self,
576 next: P2,
577 ) -> CompositeItemProcessorBuilder<CompositeItemProcessor<P, P2, M>> {
578 CompositeItemProcessorBuilder {
579 processor: CompositeItemProcessor {
580 first: self.processor,
581 second: next,
582 _marker: std::marker::PhantomData,
583 },
584 }
585 }
586
587 /// Builds and returns the composite processor.
588 ///
589 /// Returns the accumulated processor value `P`. When chained via `link`,
590 /// `P` will be a nested `CompositeItemProcessor` such as
591 /// `CompositeItemProcessor<P1, CompositeItemProcessor<P2, P3>>`.
592 ///
593 /// Pass `&composite` to the step builder's `.processor()` method — Rust
594 /// will coerce it to `&dyn ItemProcessor<I, O>` automatically.
595 ///
596 /// # Examples
597 ///
598 /// ```
599 /// use spring_batch_rs::core::item::{ItemProcessor, CompositeItemProcessorBuilder};
600 /// use spring_batch_rs::BatchError;
601 ///
602 /// struct DoubleProcessor;
603 /// impl ItemProcessor<i32, i32> for DoubleProcessor {
604 /// fn process(&self, item: &i32) -> Result<Option<i32>, BatchError> {
605 /// Ok(Some(item * 2))
606 /// }
607 /// }
608 ///
609 /// struct AddTenProcessor;
610 /// impl ItemProcessor<i32, i32> for AddTenProcessor {
611 /// fn process(&self, item: &i32) -> Result<Option<i32>, BatchError> {
612 /// Ok(Some(item + 10))
613 /// }
614 /// }
615 ///
616 /// let composite = CompositeItemProcessorBuilder::new(DoubleProcessor)
617 /// .link(AddTenProcessor)
618 /// .build();
619 ///
620 /// // 5 * 2 = 10, then 10 + 10 = 20
621 /// assert_eq!(composite.process(&5).unwrap(), Some(20));
622 /// ```
623 pub fn build(self) -> P {
624 self.processor
625 }
626}
627
/// A composite writer that fans out the same chunk to two writers sequentially using static dispatch.
///
/// Both writers receive identical slices on every `write` call. All four lifecycle
/// methods (`write`, `flush`, `open`, `close`) are forwarded to `first` then
/// `second`, but they treat errors differently:
///
/// - `write` and `open` short-circuit on the first `Err`. If `open()` on `first`
///   fails, `second.open()` is never called — lifecycle management is the
///   step's responsibility.
/// - `flush` and `close` always run on both writers, even when `first` fails, and
///   then report the first error encountered.
///
/// Both writers are stored by value — no heap allocation occurs inside the struct.
/// The type encodes the full chain:
/// `CompositeItemWriter<CompositeItemWriter<W1, W2>, W3>` for three writers.
///
/// Prefer constructing instances via [`CompositeItemWriterBuilder`] rather than
/// direct struct literal syntax.
///
/// # Type Parameters
///
/// - `W1`: The first writer type. Must implement `ItemWriter<T>`.
/// - `W2`: The second writer type. Must implement `ItemWriter<T>`.
///
/// # Examples
///
/// ```
/// use spring_batch_rs::core::item::{ItemWriter, CompositeItemWriterBuilder};
/// use std::rc::Rc;
/// use std::cell::Cell;
///
/// struct CountingWriter { count: Rc<Cell<usize>> }
/// impl CountingWriter {
///     fn new(count: Rc<Cell<usize>>) -> Self { Self { count } }
/// }
/// impl ItemWriter<i32> for CountingWriter {
///     fn write(&self, items: &[i32]) -> Result<(), spring_batch_rs::BatchError> {
///         self.count.set(self.count.get() + items.len());
///         Ok(())
///     }
/// }
///
/// let c1 = Rc::new(Cell::new(0usize));
/// let c2 = Rc::new(Cell::new(0usize));
/// let composite = CompositeItemWriterBuilder::new(CountingWriter::new(c1.clone()))
///     .link(CountingWriter::new(c2.clone()))
///     .build();
/// composite.write(&[1, 2, 3]).unwrap();
/// assert_eq!(c1.get(), 3);
/// assert_eq!(c2.get(), 3);
/// ```
///
/// # Errors
///
/// Returns [`BatchError`] if any writer in the chain returns an error.
pub struct CompositeItemWriter<W1, W2> {
    first: W1,
    second: W2,
}
682
683impl<T, W1, W2> ItemWriter<T> for CompositeItemWriter<W1, W2>
684where
685 W1: ItemWriter<T>,
686 W2: ItemWriter<T>,
687{
688 /// Writes `items` to `first`, then to `second`. Short-circuits on the first error.
689 ///
690 /// # Errors
691 ///
692 /// Returns [`BatchError::ItemWriter`] if either writer fails.
693 fn write(&self, items: &[T]) -> ItemWriterResult {
694 self.first.write(items)?;
695 self.second.write(items)
696 }
697
698 /// Flushes both writers regardless of errors. Returns the first error encountered.
699 ///
700 /// Both `first` and `second` are always flushed, even if `first` fails,
701 /// to avoid silently skipping buffered output in the second writer.
702 ///
703 /// # Errors
704 ///
705 /// Returns [`BatchError::ItemWriter`] if either flush fails. If both fail,
706 /// the error from `first` is returned.
707 fn flush(&self) -> ItemWriterResult {
708 let r1 = self.first.flush();
709 let r2 = self.second.flush();
710 r1.and(r2)
711 }
712
713 /// Opens `first`, then `second`. Short-circuits on the first error.
714 ///
715 /// # Errors
716 ///
717 /// Returns [`BatchError::ItemWriter`] if either open fails.
718 fn open(&self) -> ItemWriterResult {
719 self.first.open()?;
720 self.second.open()
721 }
722
723 /// Closes both writers regardless of errors. Returns the first error encountered.
724 ///
725 /// Both `first` and `second` are always closed, even if `first` fails,
726 /// to avoid resource leaks.
727 ///
728 /// # Errors
729 ///
730 /// Returns [`BatchError::ItemWriter`] if either close fails. If both fail,
731 /// the error from `first` is returned.
732 fn close(&self) -> ItemWriterResult {
733 let r1 = self.first.close();
734 let r2 = self.second.close();
735 r1.and(r2)
736 }
737}
738
/// Builder for creating a fan-out chain of [`ItemWriter`]s using static dispatch.
///
/// Start the chain with [`new`](CompositeItemWriterBuilder::new), append writers
/// with [`link`](CompositeItemWriterBuilder::link), and finalise with
/// [`build`](CompositeItemWriterBuilder::build). Each call to `link` wraps the
/// accumulated chain in a [`CompositeItemWriter`]. The built chain stores all
/// writers by value — no heap allocations occur inside the chain itself.
///
/// # Type Parameters
///
/// - `W`: The accumulated writer type. Starts as the first writer and is wrapped
///   in [`CompositeItemWriter`] with each [`link`](CompositeItemWriterBuilder::link) call.
///
/// # Examples
///
/// Two writers:
///
/// ```
/// use spring_batch_rs::core::item::{ItemWriter, CompositeItemWriterBuilder};
/// use std::rc::Rc;
/// use std::cell::Cell;
///
/// struct CountingWriter { count: Rc<Cell<usize>> }
/// impl CountingWriter {
///     fn new(count: Rc<Cell<usize>>) -> Self { Self { count } }
/// }
/// impl ItemWriter<i32> for CountingWriter {
///     fn write(&self, items: &[i32]) -> Result<(), spring_batch_rs::BatchError> {
///         self.count.set(self.count.get() + items.len());
///         Ok(())
///     }
/// }
///
/// let c1 = Rc::new(Cell::new(0usize));
/// let c2 = Rc::new(Cell::new(0usize));
/// let composite = CompositeItemWriterBuilder::new(CountingWriter::new(c1.clone()))
///     .link(CountingWriter::new(c2.clone()))
///     .build();
///
/// composite.write(&[1, 2, 3]).unwrap();
/// assert_eq!(c1.get(), 3);
/// assert_eq!(c2.get(), 3);
/// ```
///
/// Three writers:
///
/// ```
/// use spring_batch_rs::core::item::{ItemWriter, CompositeItemWriterBuilder};
/// use std::rc::Rc;
/// use std::cell::Cell;
///
/// struct CountingWriter { count: Rc<Cell<usize>> }
/// impl CountingWriter {
///     fn new(count: Rc<Cell<usize>>) -> Self { Self { count } }
/// }
/// impl ItemWriter<i32> for CountingWriter {
///     fn write(&self, items: &[i32]) -> Result<(), spring_batch_rs::BatchError> {
///         self.count.set(self.count.get() + items.len());
///         Ok(())
///     }
/// }
///
/// let c1 = Rc::new(Cell::new(0usize));
/// let c2 = Rc::new(Cell::new(0usize));
/// let c3 = Rc::new(Cell::new(0usize));
/// let composite = CompositeItemWriterBuilder::new(CountingWriter::new(c1.clone()))
///     .link(CountingWriter::new(c2.clone()))
///     .link(CountingWriter::new(c3.clone()))
///     .build();
///
/// composite.write(&[1, 2]).unwrap();
/// assert_eq!(c1.get(), 2);
/// assert_eq!(c2.get(), 2);
/// assert_eq!(c3.get(), 2);
/// ```
pub struct CompositeItemWriterBuilder<W> {
    writer: W,
}
817
818impl<W> CompositeItemWriterBuilder<W> {
819 /// Creates a new builder with the given writer as the first delegate.
820 ///
821 /// # Parameters
822 ///
823 /// - `first`: The first writer in the fan-out chain.
824 ///
825 /// # Examples
826 ///
827 /// ```
828 /// use spring_batch_rs::core::item::{ItemWriter, CompositeItemWriterBuilder};
829 ///
830 /// struct CountingWriter { count: std::cell::Cell<usize> }
831 /// impl CountingWriter { fn new() -> Self { Self { count: std::cell::Cell::new(0) } } }
832 /// impl ItemWriter<i32> for CountingWriter {
833 /// fn write(&self, items: &[i32]) -> Result<(), spring_batch_rs::BatchError> {
834 /// self.count.set(self.count.get() + items.len());
835 /// Ok(())
836 /// }
837 /// }
838 ///
839 /// let writer = CompositeItemWriterBuilder::new(CountingWriter::new()).build();
840 /// writer.write(&[1, 2, 3]).unwrap();
841 /// assert_eq!(writer.count.get(), 3, "writer should receive all items");
842 /// ```
843 pub fn new(first: W) -> Self {
844 Self { writer: first }
845 }
846
847 /// Appends a writer to the fan-out chain.
848 ///
849 /// Returns a new builder whose accumulated type is `CompositeItemWriter<W, W2>`.
850 /// Both writers must implement `ItemWriter<T>` for the same `T` — verified at
851 /// compile time.
852 ///
853 /// # Parameters
854 ///
855 /// - `next`: The writer to link into the chain.
856 ///
857 /// # Examples
858 ///
859 /// ```
860 /// use spring_batch_rs::core::item::{ItemWriter, CompositeItemWriterBuilder};
861 /// use std::rc::Rc;
862 /// use std::cell::Cell;
863 ///
864 /// struct CountingWriter { count: Rc<Cell<usize>> }
865 /// impl CountingWriter {
866 /// fn new(count: Rc<Cell<usize>>) -> Self { Self { count } }
867 /// }
868 /// impl ItemWriter<i32> for CountingWriter {
869 /// fn write(&self, items: &[i32]) -> Result<(), spring_batch_rs::BatchError> {
870 /// self.count.set(self.count.get() + items.len());
871 /// Ok(())
872 /// }
873 /// }
874 ///
875 /// let c1 = Rc::new(Cell::new(0usize));
876 /// let c2 = Rc::new(Cell::new(0usize));
877 /// let composite = CompositeItemWriterBuilder::new(CountingWriter::new(c1.clone()))
878 /// .link(CountingWriter::new(c2.clone()))
879 /// .build();
880 ///
881 /// composite.write(&[1, 2, 3]).unwrap();
882 /// assert_eq!(c1.get(), 3, "first writer should receive all items");
883 /// assert_eq!(c2.get(), 3, "second writer should receive all items");
884 /// ```
885 #[allow(clippy::should_implement_trait)]
886 pub fn link<W2>(self, next: W2) -> CompositeItemWriterBuilder<CompositeItemWriter<W, W2>> {
887 CompositeItemWriterBuilder {
888 writer: CompositeItemWriter {
889 first: self.writer,
890 second: next,
891 },
892 }
893 }
894
895 /// Builds and returns the composite writer.
896 ///
897 /// Returns the accumulated writer value `W`. When chained via `link`, `W` is a
898 /// nested `CompositeItemWriter` such as
899 /// `CompositeItemWriter<CompositeItemWriter<W1, W2>, W3>`.
900 ///
901 /// Pass `&composite` to the step builder's `.writer()` method.
902 ///
903 /// # Examples
904 ///
905 /// ```
906 /// use spring_batch_rs::core::item::{ItemWriter, CompositeItemWriterBuilder};
907 /// use std::rc::Rc;
908 /// use std::cell::Cell;
909 ///
910 /// struct CountingWriter { count: Rc<Cell<usize>> }
911 /// impl CountingWriter {
912 /// fn new(count: Rc<Cell<usize>>) -> Self { Self { count } }
913 /// }
914 /// impl ItemWriter<i32> for CountingWriter {
915 /// fn write(&self, items: &[i32]) -> Result<(), spring_batch_rs::BatchError> {
916 /// self.count.set(self.count.get() + items.len());
917 /// Ok(())
918 /// }
919 /// }
920 ///
921 /// let c1 = Rc::new(Cell::new(0usize));
922 /// let c2 = Rc::new(Cell::new(0usize));
923 /// let composite = CompositeItemWriterBuilder::new(CountingWriter::new(c1.clone()))
924 /// .link(CountingWriter::new(c2.clone()))
925 /// .build();
926 ///
927 /// composite.write(&[1, 2, 3]).unwrap();
928 /// assert_eq!(c1.get(), 3);
929 /// assert_eq!(c2.get(), 3);
930 /// ```
931 pub fn build(self) -> W {
932 self.writer
933 }
934}
935
936/// Allows any `Box<P>` where `P: ItemProcessor<I, O>` to be used wherever
937/// `&dyn ItemProcessor<I, O>` is expected — including boxed concrete types
938/// (`Box<MyProcessor>`) and boxed trait objects (`Box<dyn ItemProcessor<I, O>>`).
939///
940/// The `?Sized` bound is what makes this cover trait objects: `dyn Trait` is
941/// unsized, so without `?Sized` the impl would not apply to them.
942impl<I, O, P: ItemProcessor<I, O> + ?Sized> ItemProcessor<I, O> for Box<P> {
943 fn process(&self, item: &I) -> ItemProcessorResult<O> {
944 (**self).process(item)
945 }
946}
947
948/// Allows any `Box<W>` where `W: ItemWriter<T>` to be used wherever
949/// `&dyn ItemWriter<T>` is expected — including boxed concrete types
950/// (`Box<MyWriter>`) and boxed trait objects (`Box<dyn ItemWriter<T>>`).
951///
952/// The `?Sized` bound makes this cover trait objects: `dyn Trait` is
953/// unsized, so without `?Sized` the impl would not apply to them.
954impl<T, W: ItemWriter<T> + ?Sized> ItemWriter<T> for Box<W> {
955 fn write(&self, items: &[T]) -> ItemWriterResult {
956 (**self).write(items)
957 }
958 fn flush(&self) -> ItemWriterResult {
959 (**self).flush()
960 }
961 fn open(&self) -> ItemWriterResult {
962 (**self).open()
963 }
964 fn close(&self) -> ItemWriterResult {
965 (**self).close()
966 }
967}
968
#[cfg(test)]
mod tests {
    use super::*;
    use std::cell::Cell;

    // --- PassThroughProcessor ---

    /// `new` must produce a processor without panicking, and the type must be zero-sized.
    #[test]
    fn should_create_new_pass_through_processor() {
        let _built = PassThroughProcessor::<String>::new();
        assert_eq!(std::mem::size_of::<PassThroughProcessor<String>>(), 0);
    }

    /// `Default` must produce a processor without panicking, and the type must be zero-sized.
    #[test]
    fn should_create_pass_through_processor_with_default() {
        let _built = PassThroughProcessor::<i32>::default();
        assert_eq!(std::mem::size_of::<PassThroughProcessor<i32>>(), 0);
    }

    /// A `String` comes out equal to what went in.
    #[test]
    fn should_pass_through_string_unchanged() -> Result<(), BatchError> {
        let input = String::from("Hello, World!");
        let expected = input.clone();
        let processor = PassThroughProcessor::<String>::new();

        let output = processor.process(&input)?;

        assert_eq!(output, Some(expected));
        Ok(())
    }

    /// An `i32` comes out equal to what went in.
    #[test]
    fn should_pass_through_integer_unchanged() -> Result<(), BatchError> {
        let input: i32 = 42;
        let processor = PassThroughProcessor::<i32>::new();

        let output = processor.process(&input)?;

        assert_eq!(output, Some(input));
        Ok(())
    }

    /// A `Vec` comes out equal to what went in.
    #[test]
    fn should_pass_through_vector_unchanged() -> Result<(), BatchError> {
        let input = vec![1, 2, 3, 4, 5];
        let expected = input.clone();
        let processor = PassThroughProcessor::<Vec<i32>>::new();

        let output = processor.process(&input)?;

        assert_eq!(output, Some(expected));
        Ok(())
    }

    /// A user-defined struct comes out equal to what went in.
    #[test]
    fn should_pass_through_custom_struct_unchanged() -> Result<(), BatchError> {
        #[derive(Clone, PartialEq, Debug)]
        struct TestData {
            id: u32,
            name: String,
            values: Vec<f64>,
        }

        let input = TestData {
            id: 123,
            name: String::from("Test Item"),
            values: vec![1.1, 2.2, 3.3],
        };
        let expected = input.clone();
        let processor = PassThroughProcessor::<TestData>::new();

        let output = processor.process(&input)?;

        assert_eq!(output, Some(expected));
        Ok(())
    }

    /// `Option` items are ordinary payloads: both `Some` and `None` come back wrapped in `Some`.
    #[test]
    fn should_pass_through_option_unchanged() -> Result<(), BatchError> {
        let processor = PassThroughProcessor::<Option<String>>::new();

        // Payload holding a value.
        let with_value = Some(String::from("test"));
        let output_with_value = processor.process(&with_value)?;
        assert_eq!(output_with_value, Some(with_value));

        // Payload holding nothing.
        let without_value: Option<String> = None;
        let output_without_value = processor.process(&without_value)?;
        assert_eq!(output_without_value, Some(without_value));

        Ok(())
    }

    /// Empty collections and empty strings pass through like any other value.
    #[test]
    fn should_handle_empty_collections() -> Result<(), BatchError> {
        let no_numbers: Vec<i32> = Vec::new();
        let vec_output = PassThroughProcessor::<Vec<i32>>::new().process(&no_numbers)?;
        assert_eq!(vec_output, Some(no_numbers));

        let no_text = String::new();
        let string_output = PassThroughProcessor::<String>::new().process(&no_text)?;
        assert_eq!(string_output, Some(no_text));

        Ok(())
    }

    /// Processing only borrows the input, so it is still usable and unchanged afterwards.
    #[test]
    fn should_clone_input_not_move() {
        let processor = PassThroughProcessor::<String>::new();
        let input = String::from("original");
        let snapshot = input.clone();

        let _output = processor.process(&input).unwrap();

        assert_eq!(input, snapshot);
        assert_eq!(input, "original");
    }

    /// The output of one pass-through processor can be fed into another.
    #[test]
    fn should_work_with_multiple_processors() -> Result<(), BatchError> {
        let first = PassThroughProcessor::<String>::new();
        let second = PassThroughProcessor::<String>::new();
        let input = String::from("test data");

        let after_first = first.process(&input)?.unwrap();
        let after_second = second.process(&after_first)?;

        assert_eq!(after_second, Some(input));
        Ok(())
    }

    /// A 10000-element vector keeps its length when passed through.
    #[test]
    fn should_handle_large_data_structures() -> Result<(), BatchError> {
        let large_input = (0..10000).collect::<Vec<i32>>();
        let expected_len = large_input.len();
        let processor = PassThroughProcessor::<Vec<i32>>::new();

        let output = processor.process(&large_input)?;

        // The pass-through result is expected to be `Some`, so unwrapping is fine here.
        assert_eq!(output.unwrap().len(), expected_len);
        Ok(())
    }

    // --- ItemWriter default methods ---

    /// A writer that only implements `write` gets working `flush`/`open`/`close` defaults.
    #[test]
    fn should_use_default_flush_open_close_implementations() {
        struct MinimalWriter;
        impl ItemWriter<String> for MinimalWriter {
            // Only `write` is provided; the other three come from the trait.
            fn write(&self, _: &[String]) -> ItemWriterResult {
                Ok(())
            }
        }

        let writer = MinimalWriter;
        assert!(writer.flush().is_ok(), "default flush should return Ok");
        assert!(writer.open().is_ok(), "default open should return Ok");
        assert!(writer.close().is_ok(), "default close should return Ok");
    }

    // --- CompositeItemProcessor / CompositeItemProcessorBuilder ---

    /// Multiplies every item by two.
    struct DoubleProcessor;
    impl ItemProcessor<i32, i32> for DoubleProcessor {
        fn process(&self, item: &i32) -> ItemProcessorResult<i32> {
            let doubled = *item * 2;
            Ok(Some(doubled))
        }
    }

    /// Adds ten to every item.
    struct AddTenProcessor;
    impl ItemProcessor<i32, i32> for AddTenProcessor {
        fn process(&self, item: &i32) -> ItemProcessorResult<i32> {
            let shifted = *item + 10;
            Ok(Some(shifted))
        }
    }

    /// Renders every item as its decimal string.
    struct ToStringProcessor;
    impl ItemProcessor<i32, String> for ToStringProcessor {
        fn process(&self, item: &i32) -> ItemProcessorResult<String> {
            let rendered = item.to_string();
            Ok(Some(rendered))
        }
    }

    /// Keeps even items and filters out odd ones.
    struct FilterEvenProcessor;
    impl ItemProcessor<i32, i32> for FilterEvenProcessor {
        fn process(&self, item: &i32) -> ItemProcessorResult<i32> {
            // Odd values become `None`, i.e. "filtered", not an error.
            Ok(Some(*item).filter(|value| value % 2 == 0))
        }
    }

    /// Fails for every item.
    struct FailingProcessor;
    impl ItemProcessor<i32, i32> for FailingProcessor {
        fn process(&self, _item: &i32) -> ItemProcessorResult<i32> {
            let reason = "forced failure".to_string();
            Err(BatchError::ItemProcessor(reason))
        }
    }

    #[test]
    fn should_chain_two_same_type_processors() -> Result<(), BatchError> {
        let chain = CompositeItemProcessorBuilder::new(DoubleProcessor)
            .link(AddTenProcessor)
            .build();

        // Doubling 5 gives 10; adding ten gives 20.
        let output = chain.process(&5)?;
        assert_eq!(output, Some(20), "5 * 2 + 10 should equal 20");
        Ok(())
    }

    #[test]
    fn should_chain_two_type_changing_processors() -> Result<(), BatchError> {
        let chain = CompositeItemProcessorBuilder::new(DoubleProcessor)
            .link(ToStringProcessor)
            .build();

        // Doubling 21 gives 42, rendered as "42".
        let output = chain.process(&21)?;
        assert_eq!(output, Some("42".to_string()));
        Ok(())
    }

    #[test]
    fn should_chain_three_processors() -> Result<(), BatchError> {
        let chain = CompositeItemProcessorBuilder::new(DoubleProcessor)
            .link(AddTenProcessor)
            .link(ToStringProcessor)
            .build();

        // Doubling 5 gives 10; adding ten gives 20; rendered as "20".
        let output = chain.process(&5)?;
        assert_eq!(output, Some("20".to_string()));
        Ok(())
    }

    #[test]
    fn should_stop_chain_when_first_processor_filters_item() -> Result<(), BatchError> {
        let chain = CompositeItemProcessorBuilder::new(FilterEvenProcessor)
            .link(ToStringProcessor)
            .build();

        // Odd input: dropped by the filter, so the chain yields nothing.
        let odd_output = chain.process(&3)?;
        assert_eq!(odd_output, None, "odd number should be filtered");

        // Even input: survives the filter and is rendered as a string.
        let even_output = chain.process(&4)?;
        assert_eq!(even_output, Some("4".to_string()), "even number should pass");
        Ok(())
    }

    #[test]
    fn should_propagate_error_from_first_processor() {
        let chain = CompositeItemProcessorBuilder::new(FailingProcessor)
            .link(ToStringProcessor)
            .build();

        let outcome = chain.process(&1);

        assert!(
            outcome.is_err(),
            "error from first processor should propagate"
        );
    }

    #[test]
    fn should_propagate_error_from_second_processor() {
        struct AlwaysFailI32;
        impl ItemProcessor<i32, i32> for AlwaysFailI32 {
            fn process(&self, _: &i32) -> ItemProcessorResult<i32> {
                let reason = "second failed".to_string();
                Err(BatchError::ItemProcessor(reason))
            }
        }

        let chain = CompositeItemProcessorBuilder::new(DoubleProcessor)
            .link(AlwaysFailI32)
            .build();

        let outcome = chain.process(&5);

        assert!(
            outcome.is_err(),
            "error from second processor should propagate"
        );
    }

    #[test]
    fn should_use_box_blanket_impl_as_item_processor() -> Result<(), BatchError> {
        // `build()` yields the concrete composite type; boxing it produces a trait
        // object, which is itself a processor through the `?Sized` impl for `Box`.
        let chain = CompositeItemProcessorBuilder::new(DoubleProcessor)
            .link(ToStringProcessor)
            .build();
        let as_trait_object: Box<dyn ItemProcessor<i32, String>> = Box::new(chain);

        let output = as_trait_object.process(&3)?;

        assert_eq!(
            output,
            Some("6".to_string()),
            "boxed trait object should delegate to inner processor"
        );
        Ok(())
    }

    #[test]
    fn should_use_box_concrete_type_as_item_processor() -> Result<(), BatchError> {
        // A box around a concrete processor is covered by the same impl for `Box`.
        let as_boxed_concrete: Box<DoubleProcessor> = Box::new(DoubleProcessor);

        let output = as_boxed_concrete.process(&7)?;

        assert_eq!(
            output,
            Some(14),
            "boxed concrete processor should delegate to inner processor"
        );
        Ok(())
    }

    // --- CompositeItemWriter ---

    /// Test double that counts successful lifecycle calls and can be told to fail any of them.
    ///
    /// A failing call returns its error before touching the matching counter.
    #[derive(Default)]
    struct RecordingWriter {
        write_calls: Cell<usize>,
        items_written: Cell<usize>,
        open_calls: Cell<usize>,
        close_calls: Cell<usize>,
        flush_calls: Cell<usize>,
        fail_write: bool,
        fail_open: bool,
        fail_flush: bool,
        fail_close: bool,
    }

    impl RecordingWriter {
        /// A writer with zeroed counters that never fails.
        fn new() -> Self {
            Self::default()
        }

        /// A writer whose `write` always fails.
        fn failing_write() -> Self {
            Self {
                fail_write: true,
                ..Self::new()
            }
        }

        /// A writer whose `open` always fails.
        fn failing_open() -> Self {
            Self {
                fail_open: true,
                ..Self::new()
            }
        }

        /// A writer whose `flush` always fails.
        fn failing_flush() -> Self {
            Self {
                fail_flush: true,
                ..Self::new()
            }
        }

        /// A writer whose `close` always fails.
        fn failing_close() -> Self {
            Self {
                fail_close: true,
                ..Self::new()
            }
        }

        /// Returns an `ItemWriter` error carrying `failure` when `should_fail` is set;
        /// otherwise bumps `calls` by one.
        fn track(should_fail: bool, calls: &Cell<usize>, failure: &str) -> ItemWriterResult {
            if should_fail {
                return Err(BatchError::ItemWriter(failure.to_string()));
            }
            calls.set(calls.get() + 1);
            Ok(())
        }
    }

    impl ItemWriter<i32> for RecordingWriter {
        fn write(&self, items: &[i32]) -> ItemWriterResult {
            Self::track(self.fail_write, &self.write_calls, "forced write failure")?;
            let total = self.items_written.get() + items.len();
            self.items_written.set(total);
            Ok(())
        }

        fn open(&self) -> ItemWriterResult {
            Self::track(self.fail_open, &self.open_calls, "forced open failure")
        }

        fn close(&self) -> ItemWriterResult {
            Self::track(self.fail_close, &self.close_calls, "forced close failure")
        }

        fn flush(&self) -> ItemWriterResult {
            Self::track(self.fail_flush, &self.flush_calls, "forced flush failure")
        }
    }

    /// Builds a two-writer composite directly, bypassing the builder.
    fn pair(
        first: RecordingWriter,
        second: RecordingWriter,
    ) -> CompositeItemWriter<RecordingWriter, RecordingWriter> {
        CompositeItemWriter { first, second }
    }

    #[test]
    fn should_write_to_both_writers() -> Result<(), BatchError> {
        let fan_out = pair(RecordingWriter::new(), RecordingWriter::new());

        fan_out.write(&[1, 2, 3])?;

        let (first, second) = (&fan_out.first, &fan_out.second);
        assert_eq!(first.write_calls.get(), 1, "first writer should be called");
        assert_eq!(
            first.items_written.get(),
            3,
            "first writer should receive 3 items"
        );
        assert_eq!(
            second.write_calls.get(),
            1,
            "second writer should be called"
        );
        assert_eq!(
            second.items_written.get(),
            3,
            "second writer should receive 3 items"
        );
        Ok(())
    }

    #[test]
    fn should_open_both_writers_in_order() -> Result<(), BatchError> {
        let fan_out = pair(RecordingWriter::new(), RecordingWriter::new());

        fan_out.open()?;

        let (first, second) = (&fan_out.first, &fan_out.second);
        assert_eq!(first.open_calls.get(), 1, "first writer should be opened");
        assert_eq!(second.open_calls.get(), 1, "second writer should be opened");
        Ok(())
    }

    #[test]
    fn should_close_both_writers_in_order() -> Result<(), BatchError> {
        let fan_out = pair(RecordingWriter::new(), RecordingWriter::new());

        fan_out.close()?;

        let (first, second) = (&fan_out.first, &fan_out.second);
        assert_eq!(first.close_calls.get(), 1, "first writer should be closed");
        assert_eq!(
            second.close_calls.get(),
            1,
            "second writer should be closed"
        );
        Ok(())
    }

    #[test]
    fn should_flush_both_writers() -> Result<(), BatchError> {
        let fan_out = pair(RecordingWriter::new(), RecordingWriter::new());

        fan_out.flush()?;

        let (first, second) = (&fan_out.first, &fan_out.second);
        assert_eq!(first.flush_calls.get(), 1, "first writer should be flushed");
        assert_eq!(
            second.flush_calls.get(),
            1,
            "second writer should be flushed"
        );
        Ok(())
    }

    #[test]
    fn should_short_circuit_on_write_error() {
        let fan_out = pair(RecordingWriter::failing_write(), RecordingWriter::new());

        let outcome = fan_out.write(&[1, 2, 3]);

        assert!(outcome.is_err(), "error should propagate");
        assert_eq!(
            fan_out.second.write_calls.get(),
            0,
            "second writer should not be called after first fails"
        );
    }

    #[test]
    fn should_short_circuit_on_open_error() {
        let fan_out = pair(RecordingWriter::failing_open(), RecordingWriter::new());

        let outcome = fan_out.open();

        assert!(outcome.is_err(), "error should propagate");
        assert_eq!(
            fan_out.second.open_calls.get(),
            0,
            "second writer should not be opened after first fails"
        );
    }

    #[test]
    fn should_flush_both_writers_even_when_first_fails() {
        let fan_out = pair(RecordingWriter::failing_flush(), RecordingWriter::new());

        let outcome = fan_out.flush();

        assert!(outcome.is_err(), "error should propagate");
        assert_eq!(
            fan_out.second.flush_calls.get(),
            1,
            "second writer should still be flushed even when first fails"
        );
    }

    #[test]
    fn should_close_both_writers_even_when_first_fails() {
        let fan_out = pair(RecordingWriter::failing_close(), RecordingWriter::new());

        let outcome = fan_out.close();

        assert!(outcome.is_err(), "error should propagate");
        assert_eq!(
            fan_out.second.close_calls.get(),
            1,
            "second writer should still be closed even when first fails"
        );
    }

    // --- CompositeItemWriterBuilder ---

    #[test]
    fn should_chain_two_writers_via_builder() -> Result<(), BatchError> {
        let fan_out = CompositeItemWriterBuilder::new(RecordingWriter::new())
            .link(RecordingWriter::new())
            .build();

        fan_out.write(&[10, 20])?;

        let first_total = fan_out.first.items_written.get();
        let second_total = fan_out.second.items_written.get();
        assert_eq!(first_total, 2, "first writer should receive 2 items");
        assert_eq!(second_total, 2, "second writer should receive 2 items");
        Ok(())
    }

    #[test]
    fn should_chain_three_writers() -> Result<(), BatchError> {
        let fan_out = CompositeItemWriterBuilder::new(RecordingWriter::new())
            .link(RecordingWriter::new())
            .link(RecordingWriter::new())
            .build();

        fan_out.write(&[1, 2, 3, 4])?;

        // Nesting is CompositeItemWriter<CompositeItemWriter<W1, W2>, W3>:
        // `first` holds the inner pair (W1, W2) and `second` is W3.
        let inner_pair = &fan_out.first;
        assert_eq!(
            inner_pair.first.items_written.get(),
            4,
            "writer 1 should receive 4 items"
        );
        assert_eq!(
            inner_pair.second.items_written.get(),
            4,
            "writer 2 should receive 4 items"
        );
        assert_eq!(
            fan_out.second.items_written.get(),
            4,
            "writer 3 should receive 4 items"
        );
        Ok(())
    }

    // --- Box<W> as ItemWriter ---

    #[test]
    fn should_use_box_blanket_impl_as_item_writer() -> Result<(), BatchError> {
        let fan_out = CompositeItemWriterBuilder::new(RecordingWriter::new())
            .link(RecordingWriter::new())
            .build();
        let as_trait_object: Box<dyn ItemWriter<i32>> = Box::new(fan_out);

        // The inner writers are not reachable through the trait object, so a
        // successful `write` is all this test can check.
        as_trait_object.write(&[5, 6, 7])?;
        Ok(())
    }

    #[test]
    fn should_use_box_concrete_writer_as_item_writer() -> Result<(), BatchError> {
        let as_boxed_concrete: Box<RecordingWriter> = Box::new(RecordingWriter::new());

        as_boxed_concrete.open()?;
        as_boxed_concrete.write(&[1, 2])?;
        as_boxed_concrete.flush()?;
        as_boxed_concrete.close()?;

        assert_eq!(
            as_boxed_concrete.items_written.get(),
            2,
            "boxed concrete writer should delegate write"
        );
        assert_eq!(
            as_boxed_concrete.open_calls.get(),
            1,
            "boxed concrete writer should delegate open"
        );
        assert_eq!(
            as_boxed_concrete.flush_calls.get(),
            1,
            "boxed concrete writer should delegate flush"
        );
        assert_eq!(
            as_boxed_concrete.close_calls.get(),
            1,
            "boxed concrete writer should delegate close"
        );
        Ok(())
    }
}