spring_batch_rs/core/item.rs
1use crate::error::BatchError;
2
3/// Represents the result of reading an item from the reader.
4///
5/// This type is a specialized `Result` that can be:
6/// - `Ok(Some(R))` when an item is successfully read
7/// - `Ok(None)` when there are no more items to read (end of data)
8/// - `Err(BatchError)` when an error occurs during reading
9pub type ItemReaderResult<I> = Result<Option<I>, BatchError>;
10
11/// Represents the result of processing an item by the processor.
12///
13/// This type is a specialized `Result` that can be:
14/// - `Ok(Some(O))` when an item is successfully processed and should be passed to the writer
15/// - `Ok(None)` when an item is intentionally filtered out (not an error)
16/// - `Err(BatchError)` when an error occurs during processing
17pub type ItemProcessorResult<O> = Result<Option<O>, BatchError>;
18
19/// Represents the result of writing items by the writer.
20///
21/// This type is a specialized `Result` that can be:
22/// - `Ok(())` when items are successfully written
23/// - `Err(BatchError)` when an error occurs during writing
24pub type ItemWriterResult = Result<(), BatchError>;
25
26/// A trait for reading items.
27///
28/// This trait defines the contract for components that read items from a data source.
29/// It is one of the fundamental building blocks of the batch processing pipeline.
30///
31/// # Design Pattern
32///
33/// This follows the Strategy Pattern, allowing different reading strategies to be
34/// interchangeable while maintaining a consistent interface.
35///
36/// # Implementation Note
37///
38/// Implementors of this trait should:
39/// - Return `Ok(Some(item))` when an item is successfully read
40/// - Return `Ok(None)` when there are no more items to read (end of data)
41/// - Return `Err(BatchError)` when an error occurs during reading
42///
43/// # Example
44///
45/// ```compile_fail
46/// use spring_batch_rs::core::item::{ItemReader, ItemReaderResult};
47/// use spring_batch_rs::error::BatchError;
48///
49/// struct StringReader {
50/// items: Vec<String>,
51/// position: usize,
52/// }
53///
54/// impl ItemReader<String> for StringReader {
55/// fn read(&mut self) -> ItemReaderResult<String> {
56/// if self.position < self.items.len() {
57/// let item = self.items[self.position].clone();
58/// self.position += 1;
59/// Ok(Some(item))
60/// } else {
61/// Ok(None) // End of data
62/// }
63/// }
64/// }
65/// ```
66pub trait ItemReader<I> {
67 /// Reads an item from the reader.
68 ///
69 /// # Returns
70 /// - `Ok(Some(item))` when an item is successfully read
71 /// - `Ok(None)` when there are no more items to read (end of data)
72 /// - `Err(BatchError)` when an error occurs during reading
73 fn read(&self) -> ItemReaderResult<I>;
74}
75
76/// A trait for processing items.
77///
78/// This trait defines the contract for components that transform or process items
79/// in a batch processing pipeline. It takes an input item of type `I` and produces
80/// an output item of type `O`.
81///
82/// # Filtering
83///
84/// Returning `Ok(None)` filters the item silently: it is not passed to the writer
85/// and is counted in [`crate::core::step::StepExecution::filter_count`]. This is different from returning
86/// `Err(BatchError)` which counts as a processing error and may trigger fault tolerance.
87///
88/// # Design Pattern
89///
90/// This follows the Strategy Pattern, allowing different processing strategies to be
91/// interchangeable while maintaining a consistent interface.
92///
93/// # Type Parameters
94///
95/// - `I`: The input item type
96/// - `O`: The output item type
97///
98/// # Example
99///
100/// ```
101/// use spring_batch_rs::core::item::{ItemProcessor, ItemProcessorResult};
102/// use spring_batch_rs::error::BatchError;
103///
104/// struct AdultFilter;
105///
106/// struct Person { name: String, age: u32 }
107///
108/// impl ItemProcessor<Person, Person> for AdultFilter {
109/// fn process(&self, item: Person) -> ItemProcessorResult<Person> {
110/// if item.age >= 18 {
111/// Ok(Some(item)) // keep adults
112/// } else {
113/// Ok(None) // filter out minors
114/// }
115/// }
116/// }
117/// ```
118pub trait ItemProcessor<I, O> {
119 /// Processes an item and returns the processed result.
120 ///
121 /// # Parameters
122 /// - `item`: The item to process (consumed by value)
123 ///
124 /// # Returns
125 /// - `Ok(Some(processed_item))` when the item is successfully processed
126 /// - `Ok(None)` when the item is intentionally filtered out
127 /// - `Err(BatchError)` when an error occurs during processing
128 fn process(&self, item: I) -> ItemProcessorResult<O>;
129}
130
131/// A trait for writing items.
132///
133/// This trait defines the contract for components that write items to a data destination.
134/// It is one of the fundamental building blocks of the batch processing pipeline.
135///
136/// # Design Pattern
137///
138/// This follows the Strategy Pattern, allowing different writing strategies to be
139/// interchangeable while maintaining a consistent interface.
140///
141/// # Lifecycle Methods
142///
143/// This trait includes additional lifecycle methods:
144/// - `flush()`: Flushes any buffered data
145/// - `open()`: Initializes resources before writing starts
146/// - `close()`: Releases resources after writing completes
147///
148/// # Example
149///
150/// ```
151/// use spring_batch_rs::core::item::{ItemWriter, ItemWriterResult};
152/// use spring_batch_rs::error::BatchError;
153///
154/// struct ConsoleWriter;
155///
156/// impl ItemWriter<String> for ConsoleWriter {
157/// fn write(&self, items: &[String]) -> ItemWriterResult {
158/// for item in items {
159/// println!("{}", item);
160/// }
161/// Ok(())
162/// }
163/// }
164/// ```
165pub trait ItemWriter<O> {
166 /// Writes the given items.
167 ///
168 /// # Parameters
169 /// - `items`: A slice of items to write
170 ///
171 /// # Returns
172 /// - `Ok(())` when items are successfully written
173 /// - `Err(BatchError)` when an error occurs during writing
174 fn write(&self, items: &[O]) -> ItemWriterResult;
175
176 /// Flushes any buffered data.
177 ///
178 /// This method is called after a chunk of items has been written, and
179 /// allows the writer to flush any internally buffered data to the destination.
180 ///
181 /// # Default Implementation
182 ///
183 /// The default implementation does nothing and returns `Ok(())`.
184 ///
185 /// # Returns
186 /// - `Ok(())` when the flush operation succeeds
187 /// - `Err(BatchError)` when an error occurs during flushing
188 fn flush(&self) -> ItemWriterResult {
189 Ok(())
190 }
191
192 /// Opens the writer.
193 ///
194 /// This method is called before any items are written, and allows the writer
195 /// to initialize any resources it needs.
196 ///
197 /// # Default Implementation
198 ///
199 /// The default implementation does nothing and returns `Ok(())`.
200 ///
201 /// # Returns
202 /// - `Ok(())` when the open operation succeeds
203 /// - `Err(BatchError)` when an error occurs during opening
204 fn open(&self) -> ItemWriterResult {
205 Ok(())
206 }
207
208 /// Closes the writer.
209 ///
210 /// This method is called after all items have been written, and allows the writer
211 /// to release any resources it acquired.
212 ///
213 /// # Default Implementation
214 ///
215 /// The default implementation does nothing and returns `Ok(())`.
216 ///
217 /// # Returns
218 /// - `Ok(())` when the close operation succeeds
219 /// - `Err(BatchError)` when an error occurs during closing
220 fn close(&self) -> ItemWriterResult {
221 Ok(())
222 }
223}
224
225/// A pass-through processor that returns items unchanged.
226///
227/// This processor implements the identity function for batch processing pipelines.
228/// It takes an input item and returns it unchanged, making it useful for scenarios
229/// where you need a processor in the pipeline but don't want to transform the data.
230///
231/// # Type Parameters
232///
233/// - `T`: The item type that will be passed through unchanged.
234///
235/// # Use Cases
236///
237/// - Testing batch processing pipelines without data transformation
238/// - Placeholder processor during development
239/// - Pipelines where processing logic is conditional and sometimes bypassed
240/// - Maintaining consistent pipeline structure when transformation is optional
241///
242/// # Performance
243///
244/// This processor takes ownership of the item and returns it directly,
245/// with no allocation or clone.
246///
247/// # Examples
248///
249/// ```
250/// use spring_batch_rs::core::item::{ItemProcessor, PassThroughProcessor};
251///
252/// let processor = PassThroughProcessor::<String>::new();
253/// let result = processor.process("Hello, World!".to_string()).unwrap();
254/// assert_eq!(result, Some("Hello, World!".to_string()));
255/// ```
256///
257/// Using with different data types:
258///
259/// ```
260/// use spring_batch_rs::core::item::{ItemProcessor, PassThroughProcessor};
261///
262/// let int_processor = PassThroughProcessor::<i32>::new();
263/// let result = int_processor.process(42).unwrap();
264/// assert_eq!(result, Some(42));
265///
266/// #[derive(PartialEq, Debug)]
267/// struct Person {
268/// name: String,
269/// age: u32,
270/// }
271///
272/// let person_processor = PassThroughProcessor::<Person>::new();
273/// let result = person_processor.process(Person { name: "Alice".to_string(), age: 30 }).unwrap();
274/// assert_eq!(result, Some(Person { name: "Alice".to_string(), age: 30 }));
275/// ```
276#[derive(Default)]
277pub struct PassThroughProcessor<T> {
278 _phantom: std::marker::PhantomData<T>,
279}
280
281impl<T> ItemProcessor<T, T> for PassThroughProcessor<T> {
282 /// Processes an item by returning it unchanged.
283 ///
284 /// # Parameters
285 /// - `item`: The item to process (consumed by value and returned unchanged)
286 ///
287 /// # Returns
288 /// - `Ok(Some(item))` - Always succeeds and returns the input item
289 ///
290 /// # Examples
291 ///
292 /// ```
293 /// use spring_batch_rs::core::item::{ItemProcessor, PassThroughProcessor};
294 ///
295 /// let processor = PassThroughProcessor::<Vec<i32>>::new();
296 /// let result = processor.process(vec![1, 2, 3]).unwrap();
297 /// assert_eq!(result, Some(vec![1, 2, 3]));
298 /// ```
299 fn process(&self, item: T) -> ItemProcessorResult<T> {
300 Ok(Some(item))
301 }
302}
303
304impl<T> PassThroughProcessor<T> {
305 /// Creates a new `PassThroughProcessor`.
306 ///
307 /// # Returns
308 /// A new instance of `PassThroughProcessor` that will pass through items of type `T`.
309 ///
310 /// # Examples
311 ///
312 /// ```
313 /// use spring_batch_rs::core::item::PassThroughProcessor;
314 ///
315 /// let processor = PassThroughProcessor::<String>::new();
316 /// ```
317 pub fn new() -> Self {
318 Self {
319 _phantom: std::marker::PhantomData,
320 }
321 }
322}
323
324/// A composite processor that chains two processors sequentially using static dispatch.
325///
326/// The output of the first processor becomes the input of the second.
327/// If the first processor filters an item (returns `Ok(None)`), the chain
328/// stops immediately and `Ok(None)` is returned — the second processor is
329/// never called.
330///
331/// Both processors are stored by value — no heap allocation occurs inside the
332/// struct itself. This mirrors the pattern used by standard library iterator
333/// adapters such as [`std::iter::Chain`].
334///
335/// Construct chains using [`CompositeItemProcessorBuilder`] rather than
336/// instantiating this struct directly.
337///
338/// # Type Parameters
339///
340/// - `P1`: The first processor type. Must implement `ItemProcessor<I, M>` for
341/// some input type `I` and intermediate type `M`.
342/// - `P2`: The second processor type. Must implement `ItemProcessor<M, O>` where
343/// `M` is the output type of `P1` and `O` is the final output type.
344/// - `M`: The intermediate type — output of `P1`, input of `P2`. Tracked via
345/// `PhantomData` so it participates in type inference without being stored.
346///
347/// # Examples
348///
349/// ```
350/// use spring_batch_rs::core::item::{ItemProcessor, CompositeItemProcessorBuilder};
351/// use spring_batch_rs::BatchError;
352///
353/// struct DoubleProcessor;
354/// impl ItemProcessor<i32, i32> for DoubleProcessor {
355/// fn process(&self, item: i32) -> Result<Option<i32>, BatchError> {
356/// Ok(Some(item * 2))
357/// }
358/// }
359///
360/// struct ToStringProcessor;
361/// impl ItemProcessor<i32, String> for ToStringProcessor {
362/// fn process(&self, item: i32) -> Result<Option<String>, BatchError> {
363/// Ok(Some(item.to_string()))
364/// }
365/// }
366///
367/// let composite = CompositeItemProcessorBuilder::new(DoubleProcessor)
368/// .link(ToStringProcessor)
369/// .build();
370///
371/// // 21 * 2 = 42, then converted to "42"
372/// assert_eq!(composite.process(21).unwrap(), Some("42".to_string()));
373/// ```
374///
375/// # Errors
376///
377/// Returns [`BatchError`] if any processor in the chain returns an error.
378pub struct CompositeItemProcessor<P1, P2, M> {
379 first: P1,
380 second: P2,
381 /// Tracks the intermediate type `M` (output of `P1`, input of `P2`).
382 /// Uses `fn(M) -> M` to keep the type parameter invariant and avoid
383 /// unintended variance.
384 _marker: std::marker::PhantomData<fn(M) -> M>,
385}
386
387impl<I, M, O, P1, P2> ItemProcessor<I, O> for CompositeItemProcessor<P1, P2, M>
388where
389 P1: ItemProcessor<I, M>,
390 P2: ItemProcessor<M, O>,
391{
392 /// Applies the first processor, then — if the result is `Some` — applies
393 /// the second. Returns `Ok(None)` immediately if the first processor
394 /// filters the item.
395 ///
396 /// # Errors
397 ///
398 /// Returns [`BatchError`] if either processor fails.
399 fn process(&self, item: I) -> ItemProcessorResult<O> {
400 match self.first.process(item)? {
401 Some(intermediate) => self.second.process(intermediate),
402 None => Ok(None),
403 }
404 }
405}
406
407/// Builder for creating a chain of [`ItemProcessor`]s using static dispatch.
408///
409/// Start the chain with [`new`](CompositeItemProcessorBuilder::new), append
410/// processors with [`link`](CompositeItemProcessorBuilder::link), and finalise
411/// with [`build`](CompositeItemProcessorBuilder::build). Each call to `link`
412/// wraps the accumulated chain in a [`CompositeItemProcessor`], changing the
413/// output type. Mismatched types are caught at compile time.
414///
415/// The built chain stores all processors by value — no heap allocations occur
416/// inside the processor itself. The type of the built value encodes the full
417/// chain structure (e.g. `CompositeItemProcessor<P1, CompositeItemProcessor<P2, P3>>`),
418/// similar to how `Iterator` adapters compose in the standard library.
419///
420/// # Type Parameters
421///
422/// - `P`: The accumulated processor type. Starts as the first processor and
423/// is wrapped in [`CompositeItemProcessor`] with each [`link`](CompositeItemProcessorBuilder::link) call.
424///
425/// # Examples
426///
427/// Two processors (`i32 → i32 → String`):
428///
429/// ```
430/// use spring_batch_rs::core::item::{ItemProcessor, CompositeItemProcessorBuilder};
431/// use spring_batch_rs::BatchError;
432///
433/// struct DoubleProcessor;
434/// impl ItemProcessor<i32, i32> for DoubleProcessor {
435/// fn process(&self, item: i32) -> Result<Option<i32>, BatchError> {
436/// Ok(Some(item * 2))
437/// }
438/// }
439///
440/// struct ToStringProcessor;
441/// impl ItemProcessor<i32, String> for ToStringProcessor {
442/// fn process(&self, item: i32) -> Result<Option<String>, BatchError> {
443/// Ok(Some(item.to_string()))
444/// }
445/// }
446///
447/// let composite = CompositeItemProcessorBuilder::new(DoubleProcessor)
448/// .link(ToStringProcessor)
449/// .build();
450///
451/// assert_eq!(composite.process(21).unwrap(), Some("42".to_string()));
452/// ```
453///
454/// Three processors (`i32 → i32 → i32 → String`):
455///
456/// ```
457/// use spring_batch_rs::core::item::{ItemProcessor, CompositeItemProcessorBuilder};
458/// use spring_batch_rs::BatchError;
459///
460/// struct AddOneProcessor;
461/// impl ItemProcessor<i32, i32> for AddOneProcessor {
462/// fn process(&self, item: i32) -> Result<Option<i32>, BatchError> {
463/// Ok(Some(item + 1))
464/// }
465/// }
466///
467/// struct DoubleProcessor;
468/// impl ItemProcessor<i32, i32> for DoubleProcessor {
469/// fn process(&self, item: i32) -> Result<Option<i32>, BatchError> {
470/// Ok(Some(item * 2))
471/// }
472/// }
473///
474/// struct ToStringProcessor;
475/// impl ItemProcessor<i32, String> for ToStringProcessor {
476/// fn process(&self, item: i32) -> Result<Option<String>, BatchError> {
477/// Ok(Some(item.to_string()))
478/// }
479/// }
480///
481/// let composite = CompositeItemProcessorBuilder::new(AddOneProcessor)
482/// .link(DoubleProcessor)
483/// .link(ToStringProcessor)
484/// .build();
485///
486/// // (4 + 1) * 2 = 10 → "10"
487/// assert_eq!(composite.process(4).unwrap(), Some("10".to_string()));
488/// ```
489pub struct CompositeItemProcessorBuilder<P> {
490 processor: P,
491}
492
493impl<P> CompositeItemProcessorBuilder<P> {
494 /// Creates a new builder with the given processor as the first in the chain.
495 ///
496 /// # Parameters
497 ///
498 /// - `first`: The first processor in the chain.
499 ///
500 /// # Examples
501 ///
502 /// ```
503 /// use spring_batch_rs::core::item::{ItemProcessor, CompositeItemProcessorBuilder};
504 /// use spring_batch_rs::BatchError;
505 ///
506 /// struct UppercaseProcessor;
507 /// impl ItemProcessor<String, String> for UppercaseProcessor {
508 /// fn process(&self, item: String) -> Result<Option<String>, BatchError> {
509 /// Ok(Some(item.to_uppercase()))
510 /// }
511 /// }
512 ///
513 /// let builder = CompositeItemProcessorBuilder::new(UppercaseProcessor);
514 /// let composite = builder.build();
515 /// assert_eq!(composite.process("hello".to_string()).unwrap(), Some("HELLO".to_string()));
516 /// ```
517 pub fn new(first: P) -> Self {
518 Self { processor: first }
519 }
520
521 /// Appends a processor to the end of the chain.
522 ///
523 /// Returns a new builder whose accumulated type is
524 /// `CompositeItemProcessor<P, P2>`. The input/output types are verified
525 /// at compile time when the chain is used.
526 ///
527 /// # Type Parameters
528 ///
529 /// - `P2`: The processor type to append.
530 /// - `M`: The intermediate type connecting `P` and `P2`. Inferred by the
531 /// compiler from the `ItemProcessor` impls on `P` and `P2`.
532 ///
533 /// # Parameters
534 ///
535 /// - `next`: The processor to append to the chain.
536 ///
537 /// # Examples
538 ///
539 /// ```
540 /// use spring_batch_rs::core::item::{ItemProcessor, CompositeItemProcessorBuilder};
541 /// use spring_batch_rs::BatchError;
542 ///
543 /// struct AddOneProcessor;
544 /// impl ItemProcessor<i32, i32> for AddOneProcessor {
545 /// fn process(&self, item: i32) -> Result<Option<i32>, BatchError> {
546 /// Ok(Some(item + 1))
547 /// }
548 /// }
549 ///
550 /// struct ToStringProcessor;
551 /// impl ItemProcessor<i32, String> for ToStringProcessor {
552 /// fn process(&self, item: i32) -> Result<Option<String>, BatchError> {
553 /// Ok(Some(item.to_string()))
554 /// }
555 /// }
556 ///
557 /// let composite = CompositeItemProcessorBuilder::new(AddOneProcessor)
558 /// .link(ToStringProcessor)
559 /// .build();
560 ///
561 /// assert_eq!(composite.process(41).unwrap(), Some("42".to_string()));
562 /// ```
563 pub fn link<P2, M>(
564 self,
565 next: P2,
566 ) -> CompositeItemProcessorBuilder<CompositeItemProcessor<P, P2, M>> {
567 CompositeItemProcessorBuilder {
568 processor: CompositeItemProcessor {
569 first: self.processor,
570 second: next,
571 _marker: std::marker::PhantomData,
572 },
573 }
574 }
575
576 /// Builds and returns the composite processor.
577 ///
578 /// Returns the accumulated processor value `P`. When chained via `link`,
579 /// `P` will be a nested `CompositeItemProcessor` such as
580 /// `CompositeItemProcessor<P1, CompositeItemProcessor<P2, P3>>`.
581 ///
582 /// Pass `&composite` to the step builder's `.processor()` method — Rust
583 /// will coerce it to `&dyn ItemProcessor<I, O>` automatically.
584 ///
585 /// # Examples
586 ///
587 /// ```
588 /// use spring_batch_rs::core::item::{ItemProcessor, CompositeItemProcessorBuilder};
589 /// use spring_batch_rs::BatchError;
590 ///
591 /// struct DoubleProcessor;
592 /// impl ItemProcessor<i32, i32> for DoubleProcessor {
593 /// fn process(&self, item: i32) -> Result<Option<i32>, BatchError> {
594 /// Ok(Some(item * 2))
595 /// }
596 /// }
597 ///
598 /// struct AddTenProcessor;
599 /// impl ItemProcessor<i32, i32> for AddTenProcessor {
600 /// fn process(&self, item: i32) -> Result<Option<i32>, BatchError> {
601 /// Ok(Some(item + 10))
602 /// }
603 /// }
604 ///
605 /// let composite = CompositeItemProcessorBuilder::new(DoubleProcessor)
606 /// .link(AddTenProcessor)
607 /// .build();
608 ///
609 /// // 5 * 2 = 10, then 10 + 10 = 20
610 /// assert_eq!(composite.process(5).unwrap(), Some(20));
611 /// ```
612 pub fn build(self) -> P {
613 self.processor
614 }
615}
616
617/// A composite writer that fans out the same chunk to two writers sequentially using static dispatch.
618///
619/// Both writers receive identical slices on every `write` call. All four lifecycle
620/// methods (`write`, `flush`, `open`, `close`) are forwarded to `first` then `second`,
621/// short-circuiting on the first `Err`. If `open()` on `first` fails, `second.open()`
622/// is never called — lifecycle management is the step's responsibility.
623///
624/// Both writers are stored by value — no heap allocation occurs inside the struct.
625/// The type encodes the full chain:
626/// `CompositeItemWriter<CompositeItemWriter<W1, W2>, W3>` for three writers.
627///
628/// Prefer constructing instances via [`CompositeItemWriterBuilder`] rather than
629/// direct struct literal syntax.
630///
631/// # Type Parameters
632///
633/// - `W1`: The first writer type. Must implement `ItemWriter<T>`.
634/// - `W2`: The second writer type. Must implement `ItemWriter<T>`.
635///
636/// # Examples
637///
638/// ```
639/// use spring_batch_rs::core::item::{ItemWriter, CompositeItemWriterBuilder};
640/// use std::rc::Rc;
641/// use std::cell::Cell;
642///
643/// struct CountingWriter { count: Rc<Cell<usize>> }
644/// impl CountingWriter {
645/// fn new(count: Rc<Cell<usize>>) -> Self { Self { count } }
646/// }
647/// impl ItemWriter<i32> for CountingWriter {
648/// fn write(&self, items: &[i32]) -> Result<(), spring_batch_rs::BatchError> {
649/// self.count.set(self.count.get() + items.len());
650/// Ok(())
651/// }
652/// }
653///
654/// let c1 = Rc::new(Cell::new(0usize));
655/// let c2 = Rc::new(Cell::new(0usize));
656/// let composite = CompositeItemWriterBuilder::new(CountingWriter::new(c1.clone()))
657/// .link(CountingWriter::new(c2.clone()))
658/// .build();
659/// composite.write(&[1, 2, 3]).unwrap();
660/// assert_eq!(c1.get(), 3);
661/// assert_eq!(c2.get(), 3);
662/// ```
663///
664/// # Errors
665///
666/// Returns [`BatchError`] if any writer in the chain returns an error.
667pub struct CompositeItemWriter<W1, W2> {
668 first: W1,
669 second: W2,
670}
671
672impl<T, W1, W2> ItemWriter<T> for CompositeItemWriter<W1, W2>
673where
674 W1: ItemWriter<T>,
675 W2: ItemWriter<T>,
676{
677 /// Writes `items` to `first`, then to `second`. Short-circuits on the first error.
678 ///
679 /// # Errors
680 ///
681 /// Returns [`BatchError::ItemWriter`] if either writer fails.
682 fn write(&self, items: &[T]) -> ItemWriterResult {
683 self.first.write(items)?;
684 self.second.write(items)
685 }
686
687 /// Flushes both writers regardless of errors. Returns the first error encountered.
688 ///
689 /// Both `first` and `second` are always flushed, even if `first` fails,
690 /// to avoid silently skipping buffered output in the second writer.
691 ///
692 /// # Errors
693 ///
694 /// Returns [`BatchError::ItemWriter`] if either flush fails. If both fail,
695 /// the error from `first` is returned.
696 fn flush(&self) -> ItemWriterResult {
697 let r1 = self.first.flush();
698 let r2 = self.second.flush();
699 r1.and(r2)
700 }
701
702 /// Opens `first`, then `second`. Short-circuits on the first error.
703 ///
704 /// # Errors
705 ///
706 /// Returns [`BatchError::ItemWriter`] if either open fails.
707 fn open(&self) -> ItemWriterResult {
708 self.first.open()?;
709 self.second.open()
710 }
711
712 /// Closes both writers regardless of errors. Returns the first error encountered.
713 ///
714 /// Both `first` and `second` are always closed, even if `first` fails,
715 /// to avoid resource leaks.
716 ///
717 /// # Errors
718 ///
719 /// Returns [`BatchError::ItemWriter`] if either close fails. If both fail,
720 /// the error from `first` is returned.
721 fn close(&self) -> ItemWriterResult {
722 let r1 = self.first.close();
723 let r2 = self.second.close();
724 r1.and(r2)
725 }
726}
727
728/// Builder for creating a fan-out chain of [`ItemWriter`]s using static dispatch.
729///
730/// Start the chain with [`new`](CompositeItemWriterBuilder::new), append writers
731/// with [`add`](CompositeItemWriterBuilder::add), and finalise with
732/// [`build`](CompositeItemWriterBuilder::build). Each call to `add` wraps the
733/// accumulated chain in a [`CompositeItemWriter`]. The built chain stores all
734/// writers by value — no heap allocations occur inside the chain itself.
735///
736/// # Type Parameters
737///
738/// - `W`: The accumulated writer type. Starts as the first writer and is wrapped
739/// in [`CompositeItemWriter`] with each [`add`](CompositeItemWriterBuilder::add) call.
740///
741/// # Examples
742///
743/// Two writers:
744///
745/// ```
746/// use spring_batch_rs::core::item::{ItemWriter, CompositeItemWriterBuilder};
747/// use std::rc::Rc;
748/// use std::cell::Cell;
749///
750/// struct CountingWriter { count: Rc<Cell<usize>> }
751/// impl CountingWriter {
752/// fn new(count: Rc<Cell<usize>>) -> Self { Self { count } }
753/// }
754/// impl ItemWriter<i32> for CountingWriter {
755/// fn write(&self, items: &[i32]) -> Result<(), spring_batch_rs::BatchError> {
756/// self.count.set(self.count.get() + items.len());
757/// Ok(())
758/// }
759/// }
760///
761/// let c1 = Rc::new(Cell::new(0usize));
762/// let c2 = Rc::new(Cell::new(0usize));
763/// let composite = CompositeItemWriterBuilder::new(CountingWriter::new(c1.clone()))
764/// .link(CountingWriter::new(c2.clone()))
765/// .build();
766///
767/// composite.write(&[1, 2, 3]).unwrap();
768/// assert_eq!(c1.get(), 3);
769/// assert_eq!(c2.get(), 3);
770/// ```
771///
772/// Three writers:
773///
774/// ```
775/// use spring_batch_rs::core::item::{ItemWriter, CompositeItemWriterBuilder};
776/// use std::rc::Rc;
777/// use std::cell::Cell;
778///
779/// struct CountingWriter { count: Rc<Cell<usize>> }
780/// impl CountingWriter {
781/// fn new(count: Rc<Cell<usize>>) -> Self { Self { count } }
782/// }
783/// impl ItemWriter<i32> for CountingWriter {
784/// fn write(&self, items: &[i32]) -> Result<(), spring_batch_rs::BatchError> {
785/// self.count.set(self.count.get() + items.len());
786/// Ok(())
787/// }
788/// }
789///
790/// let c1 = Rc::new(Cell::new(0usize));
791/// let c2 = Rc::new(Cell::new(0usize));
792/// let c3 = Rc::new(Cell::new(0usize));
793/// let composite = CompositeItemWriterBuilder::new(CountingWriter::new(c1.clone()))
794/// .link(CountingWriter::new(c2.clone()))
795/// .link(CountingWriter::new(c3.clone()))
796/// .build();
797///
798/// composite.write(&[1, 2]).unwrap();
799/// assert_eq!(c1.get(), 2);
800/// assert_eq!(c2.get(), 2);
801/// assert_eq!(c3.get(), 2);
802/// ```
803pub struct CompositeItemWriterBuilder<W> {
804 writer: W,
805}
806
807impl<W> CompositeItemWriterBuilder<W> {
808 /// Creates a new builder with the given writer as the first delegate.
809 ///
810 /// # Parameters
811 ///
812 /// - `first`: The first writer in the fan-out chain.
813 ///
814 /// # Examples
815 ///
816 /// ```
817 /// use spring_batch_rs::core::item::{ItemWriter, CompositeItemWriterBuilder};
818 ///
819 /// struct CountingWriter { count: std::cell::Cell<usize> }
820 /// impl CountingWriter { fn new() -> Self { Self { count: std::cell::Cell::new(0) } } }
821 /// impl ItemWriter<i32> for CountingWriter {
822 /// fn write(&self, items: &[i32]) -> Result<(), spring_batch_rs::BatchError> {
823 /// self.count.set(self.count.get() + items.len());
824 /// Ok(())
825 /// }
826 /// }
827 ///
828 /// let writer = CompositeItemWriterBuilder::new(CountingWriter::new()).build();
829 /// writer.write(&[1, 2, 3]).unwrap();
830 /// assert_eq!(writer.count.get(), 3, "writer should receive all items");
831 /// ```
832 pub fn new(first: W) -> Self {
833 Self { writer: first }
834 }
835
836 /// Appends a writer to the fan-out chain.
837 ///
838 /// Returns a new builder whose accumulated type is `CompositeItemWriter<W, W2>`.
839 /// Both writers must implement `ItemWriter<T>` for the same `T` — verified at
840 /// compile time.
841 ///
842 /// # Parameters
843 ///
844 /// - `next`: The writer to link into the chain.
845 ///
846 /// # Examples
847 ///
848 /// ```
849 /// use spring_batch_rs::core::item::{ItemWriter, CompositeItemWriterBuilder};
850 /// use std::rc::Rc;
851 /// use std::cell::Cell;
852 ///
853 /// struct CountingWriter { count: Rc<Cell<usize>> }
854 /// impl CountingWriter {
855 /// fn new(count: Rc<Cell<usize>>) -> Self { Self { count } }
856 /// }
857 /// impl ItemWriter<i32> for CountingWriter {
858 /// fn write(&self, items: &[i32]) -> Result<(), spring_batch_rs::BatchError> {
859 /// self.count.set(self.count.get() + items.len());
860 /// Ok(())
861 /// }
862 /// }
863 ///
864 /// let c1 = Rc::new(Cell::new(0usize));
865 /// let c2 = Rc::new(Cell::new(0usize));
866 /// let composite = CompositeItemWriterBuilder::new(CountingWriter::new(c1.clone()))
867 /// .link(CountingWriter::new(c2.clone()))
868 /// .build();
869 ///
870 /// composite.write(&[1, 2, 3]).unwrap();
871 /// assert_eq!(c1.get(), 3, "first writer should receive all items");
872 /// assert_eq!(c2.get(), 3, "second writer should receive all items");
873 /// ```
874 #[allow(clippy::should_implement_trait)]
875 pub fn link<W2>(self, next: W2) -> CompositeItemWriterBuilder<CompositeItemWriter<W, W2>> {
876 CompositeItemWriterBuilder {
877 writer: CompositeItemWriter {
878 first: self.writer,
879 second: next,
880 },
881 }
882 }
883
884 /// Builds and returns the composite writer.
885 ///
886 /// Returns the accumulated writer value `W`. When chained via `link`, `W` is a
887 /// nested `CompositeItemWriter` such as
888 /// `CompositeItemWriter<CompositeItemWriter<W1, W2>, W3>`.
889 ///
890 /// Pass `&composite` to the step builder's `.writer()` method.
891 ///
892 /// # Examples
893 ///
894 /// ```
895 /// use spring_batch_rs::core::item::{ItemWriter, CompositeItemWriterBuilder};
896 /// use std::rc::Rc;
897 /// use std::cell::Cell;
898 ///
899 /// struct CountingWriter { count: Rc<Cell<usize>> }
900 /// impl CountingWriter {
901 /// fn new(count: Rc<Cell<usize>>) -> Self { Self { count } }
902 /// }
903 /// impl ItemWriter<i32> for CountingWriter {
904 /// fn write(&self, items: &[i32]) -> Result<(), spring_batch_rs::BatchError> {
905 /// self.count.set(self.count.get() + items.len());
906 /// Ok(())
907 /// }
908 /// }
909 ///
910 /// let c1 = Rc::new(Cell::new(0usize));
911 /// let c2 = Rc::new(Cell::new(0usize));
912 /// let composite = CompositeItemWriterBuilder::new(CountingWriter::new(c1.clone()))
913 /// .link(CountingWriter::new(c2.clone()))
914 /// .build();
915 ///
916 /// composite.write(&[1, 2, 3]).unwrap();
917 /// assert_eq!(c1.get(), 3);
918 /// assert_eq!(c2.get(), 3);
919 /// ```
920 pub fn build(self) -> W {
921 self.writer
922 }
923}
924
925/// Allows any `Box<P>` where `P: ItemProcessor<I, O>` to be used wherever
926/// `&dyn ItemProcessor<I, O>` is expected — including boxed concrete types
927/// (`Box<MyProcessor>`) and boxed trait objects (`Box<dyn ItemProcessor<I, O>>`).
928///
929/// The `?Sized` bound is what makes this cover trait objects: `dyn Trait` is
930/// unsized, so without `?Sized` the impl would not apply to them.
931impl<I, O, P: ItemProcessor<I, O> + ?Sized> ItemProcessor<I, O> for Box<P> {
932 fn process(&self, item: I) -> ItemProcessorResult<O> {
933 (**self).process(item)
934 }
935}
936
937/// Allows any `Box<W>` where `W: ItemWriter<T>` to be used wherever
938/// `&dyn ItemWriter<T>` is expected — including boxed concrete types
939/// (`Box<MyWriter>`) and boxed trait objects (`Box<dyn ItemWriter<T>>`).
940///
941/// The `?Sized` bound makes this cover trait objects: `dyn Trait` is
942/// unsized, so without `?Sized` the impl would not apply to them.
943impl<T, W: ItemWriter<T> + ?Sized> ItemWriter<T> for Box<W> {
944 fn write(&self, items: &[T]) -> ItemWriterResult {
945 (**self).write(items)
946 }
947 fn flush(&self) -> ItemWriterResult {
948 (**self).flush()
949 }
950 fn open(&self) -> ItemWriterResult {
951 (**self).open()
952 }
953 fn close(&self) -> ItemWriterResult {
954 (**self).close()
955 }
956}
957
958#[cfg(test)]
959mod tests {
960 use super::*;
961
962 #[test]
963 fn should_create_new_pass_through_processor() {
964 let _processor = PassThroughProcessor::<String>::new();
965 // Test that we can create the processor without panicking
966 // Verify it's a zero-sized type (only contains PhantomData)
967 assert_eq!(std::mem::size_of::<PassThroughProcessor<String>>(), 0);
968 }
969
970 #[test]
971 fn should_create_pass_through_processor_with_default() {
972 let _processor = PassThroughProcessor::<i32>::default();
973 // Test that we can create the processor using Default trait
974 // Verify it's a zero-sized type (only contains PhantomData)
975 assert_eq!(std::mem::size_of::<PassThroughProcessor<i32>>(), 0);
976 }
977
978 #[test]
979 fn should_pass_through_string_unchanged() -> Result<(), BatchError> {
980 let processor = PassThroughProcessor::new();
981 let result = processor.process("Hello, World!".to_string())?;
982 assert_eq!(result, Some("Hello, World!".to_string()));
983 Ok(())
984 }
985
986 #[test]
987 fn should_pass_through_integer_unchanged() -> Result<(), BatchError> {
988 let processor = PassThroughProcessor::new();
989 let result = processor.process(42i32)?;
990 assert_eq!(result, Some(42));
991 Ok(())
992 }
993
994 #[test]
995 fn should_pass_through_vector_unchanged() -> Result<(), BatchError> {
996 let processor = PassThroughProcessor::new();
997 let result = processor.process(vec![1, 2, 3, 4, 5])?;
998 assert_eq!(result, Some(vec![1, 2, 3, 4, 5]));
999 Ok(())
1000 }
1001
1002 #[test]
1003 fn should_pass_through_custom_struct_unchanged() -> Result<(), BatchError> {
1004 #[derive(PartialEq, Debug)]
1005 struct TestData {
1006 id: u32,
1007 name: String,
1008 values: Vec<f64>,
1009 }
1010
1011 let processor = PassThroughProcessor::new();
1012 let result = processor.process(TestData {
1013 id: 123,
1014 name: "Test Item".to_string(),
1015 values: vec![1.1, 2.2, 3.3],
1016 })?;
1017 assert_eq!(
1018 result,
1019 Some(TestData {
1020 id: 123,
1021 name: "Test Item".to_string(),
1022 values: vec![1.1, 2.2, 3.3],
1023 })
1024 );
1025 Ok(())
1026 }
1027
1028 #[test]
1029 fn should_pass_through_option_unchanged() -> Result<(), BatchError> {
1030 let processor = PassThroughProcessor::new();
1031 let result_some = processor.process(Some("test".to_string()))?;
1032 assert_eq!(result_some, Some(Some("test".to_string())));
1033 let result_none = processor.process(None::<String>)?;
1034 assert_eq!(result_none, Some(None::<String>));
1035 Ok(())
1036 }
1037
1038 #[test]
1039 fn should_handle_empty_collections() -> Result<(), BatchError> {
1040 let result_vec = PassThroughProcessor::new().process(Vec::<i32>::new())?;
1041 assert_eq!(result_vec, Some(vec![]));
1042 let result_string = PassThroughProcessor::new().process(String::new())?;
1043 assert_eq!(result_string, Some(String::new()));
1044 Ok(())
1045 }
1046
1047 #[test]
1048 fn should_take_ownership_of_input() {
1049 let processor = PassThroughProcessor::new();
1050 let input = "original".to_string();
1051 let result = processor.process(input).unwrap();
1052 assert_eq!(result, Some("original".to_string()));
1053 }
1054
1055 #[test]
1056 fn should_work_with_multiple_processors() -> Result<(), BatchError> {
1057 let processor1 = PassThroughProcessor::<String>::new();
1058 let processor2 = PassThroughProcessor::<String>::new();
1059 let inner = processor1.process("test data".to_string())?.unwrap();
1060 let result = processor2.process(inner)?;
1061 assert_eq!(result, Some("test data".to_string()));
1062 Ok(())
1063 }
1064
1065 #[test]
1066 fn should_handle_large_data_structures() -> Result<(), BatchError> {
1067 let processor = PassThroughProcessor::new();
1068 let large_input: Vec<i32> = (0..10000).collect();
1069 let expected_len = large_input.len();
1070 let result = processor.process(large_input)?;
1071 // PassThroughProcessor always returns Some — unwrap is safe
1072 assert_eq!(result.unwrap().len(), expected_len);
1073 Ok(())
1074 }
1075
1076 #[test]
1077 fn should_use_default_flush_open_close_implementations() {
1078 struct MinimalWriter;
1079 impl ItemWriter<String> for MinimalWriter {
1080 fn write(&self, _: &[String]) -> ItemWriterResult {
1081 Ok(())
1082 }
1083 // flush, open, close use the trait's default implementations
1084 }
1085 let w = MinimalWriter;
1086 assert!(w.flush().is_ok(), "default flush should return Ok");
1087 assert!(w.open().is_ok(), "default open should return Ok");
1088 assert!(w.close().is_ok(), "default close should return Ok");
1089 }
1090
1091 // --- CompositeItemProcessor / CompositeItemProcessorBuilder ---
1092
1093 struct DoubleProcessor;
1094 impl ItemProcessor<i32, i32> for DoubleProcessor {
1095 fn process(&self, item: i32) -> ItemProcessorResult<i32> {
1096 Ok(Some(item * 2))
1097 }
1098 }
1099
1100 struct AddTenProcessor;
1101 impl ItemProcessor<i32, i32> for AddTenProcessor {
1102 fn process(&self, item: i32) -> ItemProcessorResult<i32> {
1103 Ok(Some(item + 10))
1104 }
1105 }
1106
1107 struct ToStringProcessor;
1108 impl ItemProcessor<i32, String> for ToStringProcessor {
1109 fn process(&self, item: i32) -> ItemProcessorResult<String> {
1110 Ok(Some(item.to_string()))
1111 }
1112 }
1113
1114 struct FilterEvenProcessor;
1115 impl ItemProcessor<i32, i32> for FilterEvenProcessor {
1116 fn process(&self, item: i32) -> ItemProcessorResult<i32> {
1117 if item % 2 == 0 {
1118 Ok(Some(item))
1119 } else {
1120 Ok(None) // filter odd numbers
1121 }
1122 }
1123 }
1124
1125 struct FailingProcessor;
1126 impl ItemProcessor<i32, i32> for FailingProcessor {
1127 fn process(&self, _item: i32) -> ItemProcessorResult<i32> {
1128 Err(BatchError::ItemProcessor("forced failure".to_string()))
1129 }
1130 }
1131
1132 #[test]
1133 fn should_chain_two_same_type_processors() -> Result<(), BatchError> {
1134 let composite = CompositeItemProcessorBuilder::new(DoubleProcessor)
1135 .link(AddTenProcessor)
1136 .build();
1137
1138 // 5 * 2 = 10, then 10 + 10 = 20
1139 assert_eq!(
1140 composite.process(5)?,
1141 Some(20),
1142 "5 * 2 + 10 should equal 20"
1143 );
1144 Ok(())
1145 }
1146
1147 #[test]
1148 fn should_chain_two_type_changing_processors() -> Result<(), BatchError> {
1149 let composite = CompositeItemProcessorBuilder::new(DoubleProcessor)
1150 .link(ToStringProcessor)
1151 .build();
1152
1153 // 21 * 2 = 42, then "42"
1154 assert_eq!(composite.process(21)?, Some("42".to_string()));
1155 Ok(())
1156 }
1157
1158 #[test]
1159 fn should_chain_three_processors() -> Result<(), BatchError> {
1160 let composite = CompositeItemProcessorBuilder::new(DoubleProcessor)
1161 .link(AddTenProcessor)
1162 .link(ToStringProcessor)
1163 .build();
1164
1165 // 5 * 2 = 10, then 10 + 10 = 20, then "20"
1166 assert_eq!(composite.process(5)?, Some("20".to_string()));
1167 Ok(())
1168 }
1169
1170 #[test]
1171 fn should_stop_chain_when_first_processor_filters_item() -> Result<(), BatchError> {
1172 let composite = CompositeItemProcessorBuilder::new(FilterEvenProcessor)
1173 .link(ToStringProcessor)
1174 .build();
1175
1176 // 3 is odd → filtered by first processor → second processor never called
1177 assert_eq!(composite.process(3)?, None, "odd number should be filtered");
1178 // 4 is even → passes through → converted to string
1179 assert_eq!(
1180 composite.process(4)?,
1181 Some("4".to_string()),
1182 "even number should pass"
1183 );
1184 Ok(())
1185 }
1186
1187 #[test]
1188 fn should_propagate_error_from_first_processor() {
1189 let composite = CompositeItemProcessorBuilder::new(FailingProcessor)
1190 .link(ToStringProcessor)
1191 .build();
1192
1193 let result = composite.process(1);
1194 assert!(
1195 result.is_err(),
1196 "error from first processor should propagate"
1197 );
1198 }
1199
1200 #[test]
1201 fn should_propagate_error_from_second_processor() {
1202 struct AlwaysFailI32;
1203 impl ItemProcessor<i32, i32> for AlwaysFailI32 {
1204 fn process(&self, _: i32) -> ItemProcessorResult<i32> {
1205 Err(BatchError::ItemProcessor("second failed".to_string()))
1206 }
1207 }
1208
1209 let composite = CompositeItemProcessorBuilder::new(DoubleProcessor)
1210 .link(AlwaysFailI32)
1211 .build();
1212
1213 let result = composite.process(5);
1214 assert!(
1215 result.is_err(),
1216 "error from second processor should propagate"
1217 );
1218 }
1219
1220 #[test]
1221 fn should_use_box_blanket_impl_as_item_processor() -> Result<(), BatchError> {
1222 // build() returns the concrete type; Box::new() it to get a trait object.
1223 // Box<dyn ItemProcessor<I, O>> implements ItemProcessor<I, O> via the ?Sized blanket impl.
1224 let composite = CompositeItemProcessorBuilder::new(DoubleProcessor)
1225 .link(ToStringProcessor)
1226 .build();
1227 let boxed: Box<dyn ItemProcessor<i32, String>> = Box::new(composite);
1228
1229 let result = boxed.process(3)?;
1230 assert_eq!(
1231 result,
1232 Some("6".to_string()),
1233 "boxed trait object should delegate to inner processor"
1234 );
1235 Ok(())
1236 }
1237
1238 #[test]
1239 fn should_use_box_concrete_type_as_item_processor() -> Result<(), BatchError> {
1240 // Box<ConcreteProcessor> also implements ItemProcessor<I, O> via the ?Sized blanket impl
1241 let boxed: Box<DoubleProcessor> = Box::new(DoubleProcessor);
1242
1243 let result = boxed.process(7)?;
1244 assert_eq!(
1245 result,
1246 Some(14),
1247 "boxed concrete processor should delegate to inner processor"
1248 );
1249 Ok(())
1250 }
1251
1252 // --- CompositeItemWriter ---
1253
1254 use std::cell::Cell;
1255
1256 struct RecordingWriter {
1257 write_calls: Cell<usize>,
1258 items_written: Cell<usize>,
1259 open_calls: Cell<usize>,
1260 close_calls: Cell<usize>,
1261 flush_calls: Cell<usize>,
1262 fail_write: bool,
1263 fail_open: bool,
1264 fail_flush: bool,
1265 fail_close: bool,
1266 }
1267
1268 impl RecordingWriter {
1269 fn new() -> Self {
1270 Self {
1271 write_calls: Cell::new(0),
1272 items_written: Cell::new(0),
1273 open_calls: Cell::new(0),
1274 close_calls: Cell::new(0),
1275 flush_calls: Cell::new(0),
1276 fail_write: false,
1277 fail_open: false,
1278 fail_flush: false,
1279 fail_close: false,
1280 }
1281 }
1282 fn failing_write() -> Self {
1283 Self {
1284 fail_write: true,
1285 ..Self::new()
1286 }
1287 }
1288 fn failing_open() -> Self {
1289 Self {
1290 fail_open: true,
1291 ..Self::new()
1292 }
1293 }
1294 }
1295
1296 impl ItemWriter<i32> for RecordingWriter {
1297 fn write(&self, items: &[i32]) -> ItemWriterResult {
1298 if self.fail_write {
1299 return Err(BatchError::ItemWriter("forced write failure".to_string()));
1300 }
1301 self.write_calls.set(self.write_calls.get() + 1);
1302 self.items_written
1303 .set(self.items_written.get() + items.len());
1304 Ok(())
1305 }
1306 fn open(&self) -> ItemWriterResult {
1307 if self.fail_open {
1308 return Err(BatchError::ItemWriter("forced open failure".to_string()));
1309 }
1310 self.open_calls.set(self.open_calls.get() + 1);
1311 Ok(())
1312 }
1313 fn close(&self) -> ItemWriterResult {
1314 if self.fail_close {
1315 return Err(BatchError::ItemWriter("forced close failure".to_string()));
1316 }
1317 self.close_calls.set(self.close_calls.get() + 1);
1318 Ok(())
1319 }
1320 fn flush(&self) -> ItemWriterResult {
1321 if self.fail_flush {
1322 return Err(BatchError::ItemWriter("forced flush failure".to_string()));
1323 }
1324 self.flush_calls.set(self.flush_calls.get() + 1);
1325 Ok(())
1326 }
1327 }
1328
1329 #[test]
1330 fn should_write_to_both_writers() -> Result<(), BatchError> {
1331 let w1 = RecordingWriter::new();
1332 let w2 = RecordingWriter::new();
1333 let composite = CompositeItemWriter {
1334 first: w1,
1335 second: w2,
1336 };
1337 composite.write(&[1, 2, 3])?;
1338 assert_eq!(
1339 composite.first.write_calls.get(),
1340 1,
1341 "first writer should be called"
1342 );
1343 assert_eq!(
1344 composite.first.items_written.get(),
1345 3,
1346 "first writer should receive 3 items"
1347 );
1348 assert_eq!(
1349 composite.second.write_calls.get(),
1350 1,
1351 "second writer should be called"
1352 );
1353 assert_eq!(
1354 composite.second.items_written.get(),
1355 3,
1356 "second writer should receive 3 items"
1357 );
1358 Ok(())
1359 }
1360
1361 #[test]
1362 fn should_open_both_writers_in_order() -> Result<(), BatchError> {
1363 let w1 = RecordingWriter::new();
1364 let w2 = RecordingWriter::new();
1365 let composite = CompositeItemWriter {
1366 first: w1,
1367 second: w2,
1368 };
1369 composite.open()?;
1370 assert_eq!(
1371 composite.first.open_calls.get(),
1372 1,
1373 "first writer should be opened"
1374 );
1375 assert_eq!(
1376 composite.second.open_calls.get(),
1377 1,
1378 "second writer should be opened"
1379 );
1380 Ok(())
1381 }
1382
1383 #[test]
1384 fn should_close_both_writers_in_order() -> Result<(), BatchError> {
1385 let w1 = RecordingWriter::new();
1386 let w2 = RecordingWriter::new();
1387 let composite = CompositeItemWriter {
1388 first: w1,
1389 second: w2,
1390 };
1391 composite.close()?;
1392 assert_eq!(
1393 composite.first.close_calls.get(),
1394 1,
1395 "first writer should be closed"
1396 );
1397 assert_eq!(
1398 composite.second.close_calls.get(),
1399 1,
1400 "second writer should be closed"
1401 );
1402 Ok(())
1403 }
1404
1405 #[test]
1406 fn should_flush_both_writers() -> Result<(), BatchError> {
1407 let w1 = RecordingWriter::new();
1408 let w2 = RecordingWriter::new();
1409 let composite = CompositeItemWriter {
1410 first: w1,
1411 second: w2,
1412 };
1413 composite.flush()?;
1414 assert_eq!(
1415 composite.first.flush_calls.get(),
1416 1,
1417 "first writer should be flushed"
1418 );
1419 assert_eq!(
1420 composite.second.flush_calls.get(),
1421 1,
1422 "second writer should be flushed"
1423 );
1424 Ok(())
1425 }
1426
1427 #[test]
1428 fn should_short_circuit_on_write_error() {
1429 let w1 = RecordingWriter::failing_write();
1430 let w2 = RecordingWriter::new();
1431 let composite = CompositeItemWriter {
1432 first: w1,
1433 second: w2,
1434 };
1435 let result = composite.write(&[1, 2, 3]);
1436 assert!(result.is_err(), "error should propagate");
1437 assert_eq!(
1438 composite.second.write_calls.get(),
1439 0,
1440 "second writer should not be called after first fails"
1441 );
1442 }
1443
1444 #[test]
1445 fn should_short_circuit_on_open_error() {
1446 let w1 = RecordingWriter::failing_open();
1447 let w2 = RecordingWriter::new();
1448 let composite = CompositeItemWriter {
1449 first: w1,
1450 second: w2,
1451 };
1452 let result = composite.open();
1453 assert!(result.is_err(), "error should propagate");
1454 assert_eq!(
1455 composite.second.open_calls.get(),
1456 0,
1457 "second writer should not be opened after first fails"
1458 );
1459 }
1460
1461 #[test]
1462 fn should_flush_both_writers_even_when_first_fails() {
1463 let w1 = RecordingWriter {
1464 fail_flush: true,
1465 ..RecordingWriter::new()
1466 };
1467 let w2 = RecordingWriter::new();
1468 let composite = CompositeItemWriter {
1469 first: w1,
1470 second: w2,
1471 };
1472 let result = composite.flush();
1473 assert!(result.is_err(), "error should propagate");
1474 assert_eq!(
1475 composite.second.flush_calls.get(),
1476 1,
1477 "second writer should still be flushed even when first fails"
1478 );
1479 }
1480
1481 #[test]
1482 fn should_close_both_writers_even_when_first_fails() {
1483 let w1 = RecordingWriter {
1484 fail_close: true,
1485 ..RecordingWriter::new()
1486 };
1487 let w2 = RecordingWriter::new();
1488 let composite = CompositeItemWriter {
1489 first: w1,
1490 second: w2,
1491 };
1492 let result = composite.close();
1493 assert!(result.is_err(), "error should propagate");
1494 assert_eq!(
1495 composite.second.close_calls.get(),
1496 1,
1497 "second writer should still be closed even when first fails"
1498 );
1499 }
1500
1501 #[test]
1502 fn should_chain_two_writers_via_builder() -> Result<(), BatchError> {
1503 let composite = CompositeItemWriterBuilder::new(RecordingWriter::new())
1504 .link(RecordingWriter::new())
1505 .build();
1506 composite.write(&[10, 20])?;
1507 assert_eq!(
1508 composite.first.items_written.get(),
1509 2,
1510 "first writer should receive 2 items"
1511 );
1512 assert_eq!(
1513 composite.second.items_written.get(),
1514 2,
1515 "second writer should receive 2 items"
1516 );
1517 Ok(())
1518 }
1519
1520 #[test]
1521 fn should_chain_three_writers() -> Result<(), BatchError> {
1522 let composite = CompositeItemWriterBuilder::new(RecordingWriter::new())
1523 .link(RecordingWriter::new())
1524 .link(RecordingWriter::new())
1525 .build();
1526 composite.write(&[1, 2, 3, 4])?;
1527 // composite type: CompositeItemWriter<CompositeItemWriter<W1, W2>, W3>
1528 // composite.first is CompositeItemWriter<W1, W2>
1529 // composite.second is W3
1530 assert_eq!(
1531 composite.first.first.items_written.get(),
1532 4,
1533 "writer 1 should receive 4 items"
1534 );
1535 assert_eq!(
1536 composite.first.second.items_written.get(),
1537 4,
1538 "writer 2 should receive 4 items"
1539 );
1540 assert_eq!(
1541 composite.second.items_written.get(),
1542 4,
1543 "writer 3 should receive 4 items"
1544 );
1545 Ok(())
1546 }
1547
1548 #[test]
1549 fn should_use_box_blanket_impl_as_item_writer() -> Result<(), BatchError> {
1550 let composite = CompositeItemWriterBuilder::new(RecordingWriter::new())
1551 .link(RecordingWriter::new())
1552 .build();
1553 let boxed: Box<dyn ItemWriter<i32>> = Box::new(composite);
1554 boxed.write(&[5, 6, 7])?;
1555 // The test verifies that Box<dyn ItemWriter<T>> can be used as an ItemWriter<T>.
1556 // We can't inspect the inner writers through Box<dyn>, so asserting Ok is sufficient.
1557 Ok(())
1558 }
1559
1560 #[test]
1561 fn should_use_box_concrete_writer_as_item_writer() -> Result<(), BatchError> {
1562 let boxed: Box<RecordingWriter> = Box::new(RecordingWriter::new());
1563 boxed.open()?;
1564 boxed.write(&[1, 2])?;
1565 boxed.flush()?;
1566 boxed.close()?;
1567 assert_eq!(
1568 boxed.items_written.get(),
1569 2,
1570 "boxed concrete writer should delegate write"
1571 );
1572 assert_eq!(
1573 boxed.open_calls.get(),
1574 1,
1575 "boxed concrete writer should delegate open"
1576 );
1577 assert_eq!(
1578 boxed.flush_calls.get(),
1579 1,
1580 "boxed concrete writer should delegate flush"
1581 );
1582 assert_eq!(
1583 boxed.close_calls.get(),
1584 1,
1585 "boxed concrete writer should delegate close"
1586 );
1587 Ok(())
1588 }
1589}