use crate::error::BatchError;
/// Result of [`ItemReader::read`]: `Ok(Some(item))`, `Ok(None)`, or `Err` with a [`BatchError`].
///
/// NOTE(review): `Ok(None)` presumably signals that the reader has no more items — confirm
/// against the code that drives readers.
pub type ItemReaderResult<I> = Result<Option<I>, BatchError>;
/// Result of [`ItemProcessor::process`]: `Ok(Some(output))`, `Ok(None)`, or `Err` with a
/// [`BatchError`].
///
/// `CompositeItemProcessor` treats `Ok(None)` from its first stage as "nothing to pass on" and
/// returns `Ok(None)` without running the second stage.
pub type ItemProcessorResult<O> = Result<Option<O>, BatchError>;
/// Result of every [`ItemWriter`] method: `Ok(())` on success, otherwise a [`BatchError`].
pub type ItemWriterResult = Result<(), BatchError>;
/// A source that yields items of type `I`.
pub trait ItemReader<I> {
/// Attempts to read one item and reports the outcome as an [`ItemReaderResult`].
///
/// The receiver is `&self`, so an implementation that needs to mutate its own state while
/// reading has to use interior mutability.
fn read(&self) -> ItemReaderResult<I>;
}
/// A transformation from a borrowed input item of type `I` to an owned output of type `O`.
pub trait ItemProcessor<I, O> {
/// Processes `item` and reports the outcome as an [`ItemProcessorResult`].
///
/// The input is only borrowed; the output, if any, is returned by value.
fn process(&self, item: &I) -> ItemProcessorResult<O>;
}
/// A sink that accepts slices of items of type `O`.
///
/// Only [`write`](ItemWriter::write) is mandatory. `open`, `flush` and `close` come with
/// default bodies that do nothing and return `Ok(())`, so simple writers can ignore them.
pub trait ItemWriter<O> {
    /// Hook for preparing the writer; the default does nothing and succeeds.
    fn open(&self) -> ItemWriterResult {
        Ok(())
    }

    /// Writes the given slice of items.
    fn write(&self, items: &[O]) -> ItemWriterResult;

    /// Hook for pushing out buffered data; the default does nothing and succeeds.
    fn flush(&self) -> ItemWriterResult {
        Ok(())
    }

    /// Hook for releasing the writer; the default does nothing and succeeds.
    fn close(&self) -> ItemWriterResult {
        Ok(())
    }
}
/// An [`ItemProcessor`] that hands every item back unchanged.
///
/// The processor stores no data of its own: the only field is a zero-sized
/// `PhantomData<T>` that ties the value to the item type `T`.
pub struct PassThroughProcessor<T> {
    _phantom: std::marker::PhantomData<T>,
}

// Written by hand instead of `#[derive(Default)]`: the derive would add a `T: Default`
// bound, although no `T` is ever constructed here.
impl<T> Default for PassThroughProcessor<T> {
    /// Equivalent to [`PassThroughProcessor::new`].
    fn default() -> Self {
        Self::new()
    }
}

impl<T: Clone> ItemProcessor<T, T> for PassThroughProcessor<T> {
    /// Returns `Ok(Some(clone_of_item))`; the input is left untouched and this never fails.
    fn process(&self, item: &T) -> ItemProcessorResult<T> {
        Ok(Some(item.clone()))
    }
}

// No bound on `T`: constructing the processor does not clone anything. `Clone` is only
// required by the `ItemProcessor` impl above.
impl<T> PassThroughProcessor<T> {
    /// Creates a new pass-through processor for items of type `T`.
    pub fn new() -> Self {
        Self {
            _phantom: std::marker::PhantomData,
        }
    }
}
/// Two processors run back to back: `first` turns an `I` into an intermediate `M`, and
/// `second` turns that `M` into the final `O`.
///
/// `M` is not stored anywhere; the `PhantomData<fn(M) -> M>` marker only records it as a
/// type parameter of the struct.
pub struct CompositeItemProcessor<P1, P2, M> {
    first: P1,
    second: P2,
    _marker: std::marker::PhantomData<fn(M) -> M>,
}

impl<I, M, O, P1, P2> ItemProcessor<I, O> for CompositeItemProcessor<P1, P2, M>
where
    P1: ItemProcessor<I, M>,
    P2: ItemProcessor<M, O>,
{
    /// Runs `first`, then feeds its output to `second`.
    ///
    /// * An error from `first` is returned immediately; `second` is not called.
    /// * `Ok(None)` from `first` yields `Ok(None)`; `second` is not called.
    /// * Otherwise the result of `second` is returned as is.
    fn process(&self, item: &I) -> ItemProcessorResult<O> {
        let staged = self.first.process(item)?;
        staged.map_or(Ok(None), |mid| self.second.process(&mid))
    }
}
/// Fluent builder that grows a chain of processors one [`link`](Self::link) at a time.
///
/// The builder always holds a single value `P`: either the seed passed to
/// [`new`](Self::new) or the `CompositeItemProcessor` produced by the latest `link`.
pub struct CompositeItemProcessorBuilder<P> {
    processor: P,
}

impl<P> CompositeItemProcessorBuilder<P> {
    /// Starts a chain whose first (and so far only) element is `first`.
    pub fn new(first: P) -> Self {
        CompositeItemProcessorBuilder { processor: first }
    }

    /// Appends `next` after everything accumulated so far.
    ///
    /// The current content becomes the `first` stage and `next` the `second` stage of a new
    /// `CompositeItemProcessor`, so repeated calls nest to the left. `M` is the
    /// intermediate item type between the two stages.
    pub fn link<P2, M>(
        self,
        next: P2,
    ) -> CompositeItemProcessorBuilder<CompositeItemProcessor<P, P2, M>> {
        let Self { processor: head } = self;
        let chained = CompositeItemProcessor {
            first: head,
            second: next,
            _marker: std::marker::PhantomData,
        };
        CompositeItemProcessorBuilder { processor: chained }
    }

    /// Consumes the builder and returns whatever it currently holds.
    pub fn build(self) -> P {
        let Self { processor } = self;
        processor
    }
}
/// A writer that forwards every operation to two inner writers, `first` then `second`.
pub struct CompositeItemWriter<W1, W2> {
    first: W1,
    second: W2,
}

impl<T, W1, W2> ItemWriter<T> for CompositeItemWriter<W1, W2>
where
    W1: ItemWriter<T>,
    W2: ItemWriter<T>,
{
    /// Opens `first`, then `second`; if `first` fails, `second` is not opened.
    fn open(&self) -> ItemWriterResult {
        self.first.open().and_then(|()| self.second.open())
    }

    /// Writes `items` to `first`, then to `second`; if `first` fails, `second` is skipped.
    fn write(&self, items: &[T]) -> ItemWriterResult {
        self.first.write(items).and_then(|()| self.second.write(items))
    }

    /// Flushes both writers unconditionally.
    ///
    /// If `first` failed, its error is returned (an error from `second` is then dropped);
    /// otherwise the result of `second` is returned.
    fn flush(&self) -> ItemWriterResult {
        // Both calls are evaluated, left to right, before the outcomes are combined.
        match (self.first.flush(), self.second.flush()) {
            (Err(first_err), _) => Err(first_err),
            (Ok(()), second_outcome) => second_outcome,
        }
    }

    /// Closes both writers unconditionally.
    ///
    /// If `first` failed, its error is returned (an error from `second` is then dropped);
    /// otherwise the result of `second` is returned.
    fn close(&self) -> ItemWriterResult {
        // Both calls are evaluated, left to right, before the outcomes are combined.
        match (self.first.close(), self.second.close()) {
            (Err(first_err), _) => Err(first_err),
            (Ok(()), second_outcome) => second_outcome,
        }
    }
}
/// Fluent builder that grows a chain of writers one [`link`](Self::link) at a time.
///
/// The builder always holds a single value `W`: either the seed passed to
/// [`new`](Self::new) or the `CompositeItemWriter` produced by the latest `link`.
pub struct CompositeItemWriterBuilder<W> {
    writer: W,
}

impl<W> CompositeItemWriterBuilder<W> {
    /// Starts a chain whose first (and so far only) element is `first`.
    pub fn new(first: W) -> Self {
        Self { writer: first }
    }

    /// Appends `next` after everything accumulated so far.
    ///
    /// The current content becomes the `first` writer and `next` the `second` writer of a
    /// new `CompositeItemWriter`, so repeated calls nest to the left.
    // No `#[allow(clippy::should_implement_trait)]` here: `link` does not collide with any
    // standard-trait method name, so the suppression was dead.
    pub fn link<W2>(self, next: W2) -> CompositeItemWriterBuilder<CompositeItemWriter<W, W2>> {
        CompositeItemWriterBuilder {
            writer: CompositeItemWriter {
                first: self.writer,
                second: next,
            },
        }
    }

    /// Consumes the builder and returns whatever it currently holds.
    pub fn build(self) -> W {
        self.writer
    }
}
/// Lets a boxed processor, including `Box<dyn ItemProcessor<I, O>>`, be used wherever an
/// [`ItemProcessor`] is expected by forwarding to the boxed value.
impl<I, O, P: ItemProcessor<I, O> + ?Sized> ItemProcessor<I, O> for Box<P> {
    fn process(&self, item: &I) -> ItemProcessorResult<O> {
        // Name the inner type explicitly so the call can never resolve back to this impl.
        <P as ItemProcessor<I, O>>::process(&**self, item)
    }
}
/// Lets a boxed writer, including `Box<dyn ItemWriter<T>>`, be used wherever an
/// [`ItemWriter`] is expected.
///
/// All four methods are forwarded, so overrides of `open`, `flush` and `close` on the inner
/// writer are honoured instead of the trait defaults.
impl<T, W: ItemWriter<T> + ?Sized> ItemWriter<T> for Box<W> {
    fn open(&self) -> ItemWriterResult {
        <W as ItemWriter<T>>::open(&**self)
    }

    fn write(&self, items: &[T]) -> ItemWriterResult {
        <W as ItemWriter<T>>::write(&**self, items)
    }

    fn flush(&self) -> ItemWriterResult {
        <W as ItemWriter<T>>::flush(&**self)
    }

    fn close(&self) -> ItemWriterResult {
        <W as ItemWriter<T>>::close(&**self)
    }
}
// Unit tests for `PassThroughProcessor`, the composite processor/writer, their builders,
// the default `ItemWriter` hooks, and the `Box` forwarding impls.
#[cfg(test)]
mod tests {
use super::*;
// ---------------------------------------------------------------------------------------
// PassThroughProcessor
// ---------------------------------------------------------------------------------------
// The processor only holds `PhantomData`, so it must be zero-sized.
#[test]
fn should_create_new_pass_through_processor() {
let _processor = PassThroughProcessor::<String>::new();
assert_eq!(std::mem::size_of::<PassThroughProcessor<String>>(), 0);
}
// Same check through the `Default` impl.
#[test]
fn should_create_pass_through_processor_with_default() {
let _processor = PassThroughProcessor::<i32>::default();
assert_eq!(std::mem::size_of::<PassThroughProcessor<i32>>(), 0);
}
// The following tests feed different item types through `process` and expect
// `Some(<equal value>)` back.
#[test]
fn should_pass_through_string_unchanged() -> Result<(), BatchError> {
let processor = PassThroughProcessor::new();
let input = "Hello, World!".to_string();
let expected = input.clone();
let result = processor.process(&input)?;
assert_eq!(result, Some(expected));
Ok(())
}
#[test]
fn should_pass_through_integer_unchanged() -> Result<(), BatchError> {
let processor = PassThroughProcessor::new();
let input = 42i32;
let result = processor.process(&input)?;
assert_eq!(result, Some(input));
Ok(())
}
#[test]
fn should_pass_through_vector_unchanged() -> Result<(), BatchError> {
let processor = PassThroughProcessor::new();
let input = vec![1, 2, 3, 4, 5];
let expected = input.clone();
let result = processor.process(&input)?;
assert_eq!(result, Some(expected));
Ok(())
}
#[test]
fn should_pass_through_custom_struct_unchanged() -> Result<(), BatchError> {
// Local item type; only `Clone` is needed by the processor, `PartialEq`/`Debug` by the assert.
#[derive(Clone, PartialEq, Debug)]
struct TestData {
id: u32,
name: String,
values: Vec<f64>,
}
let processor = PassThroughProcessor::new();
let input = TestData {
id: 123,
name: "Test Item".to_string(),
values: vec![1.1, 2.2, 3.3],
};
let expected = input.clone();
let result = processor.process(&input)?;
assert_eq!(result, Some(expected));
Ok(())
}
// An `Option` item is passed through as a value: a `None` *item* comes back as
// `Some(None)`, not as a filtered `None` result.
#[test]
fn should_pass_through_option_unchanged() -> Result<(), BatchError> {
let processor = PassThroughProcessor::new();
let input_some = Some("test".to_string());
let result_some = processor.process(&input_some)?;
assert_eq!(result_some, Some(input_some));
let input_none: Option<String> = None;
let result_none = processor.process(&input_none)?;
assert_eq!(result_none, Some(input_none));
Ok(())
}
// Empty values are still items and must come back wrapped in `Some`.
#[test]
fn should_handle_empty_collections() -> Result<(), BatchError> {
let vec_processor = PassThroughProcessor::new();
let empty_vec: Vec<i32> = vec![];
let result_vec = vec_processor.process(&empty_vec)?;
assert_eq!(result_vec, Some(empty_vec));
let string_processor = PassThroughProcessor::new();
let empty_string = String::new();
let result_string = string_processor.process(&empty_string)?;
assert_eq!(result_string, Some(empty_string));
Ok(())
}
// `process` borrows its input, so the caller's value stays usable and unchanged.
#[test]
fn should_clone_input_not_move() {
let processor = PassThroughProcessor::new();
let input = "original".to_string();
let input_copy = input.clone();
let _result = processor.process(&input).unwrap();
assert_eq!(input, input_copy);
assert_eq!(input, "original");
}
// Manually feeds the output of one pass-through processor into another.
#[test]
fn should_work_with_multiple_processors() -> Result<(), BatchError> {
let processor1 = PassThroughProcessor::<String>::new();
let processor2 = PassThroughProcessor::<String>::new();
let input = "test data".to_string();
let result1 = processor1.process(&input)?;
let inner = result1.unwrap();
let result2 = processor2.process(&inner)?;
assert_eq!(result2, Some(input));
Ok(())
}
// NOTE(review): only the length of the result is asserted, not its contents.
#[test]
fn should_handle_large_data_structures() -> Result<(), BatchError> {
let processor = PassThroughProcessor::new();
let large_input: Vec<i32> = (0..10000).collect();
let expected_len = large_input.len();
let result = processor.process(&large_input)?;
assert_eq!(result.unwrap().len(), expected_len);
Ok(())
}
// ---------------------------------------------------------------------------------------
// ItemWriter default methods
// ---------------------------------------------------------------------------------------
// A writer that implements only `write` must inherit `Ok(())` for flush/open/close.
#[test]
fn should_use_default_flush_open_close_implementations() {
struct MinimalWriter;
impl ItemWriter<String> for MinimalWriter {
fn write(&self, _: &[String]) -> ItemWriterResult {
Ok(())
}
}
let w = MinimalWriter;
assert!(w.flush().is_ok(), "default flush should return Ok");
assert!(w.open().is_ok(), "default open should return Ok");
assert!(w.close().is_ok(), "default close should return Ok");
}
// ---------------------------------------------------------------------------------------
// Processor fixtures used by the CompositeItemProcessor tests
// ---------------------------------------------------------------------------------------
// i32 -> i32: multiplies by two.
struct DoubleProcessor;
impl ItemProcessor<i32, i32> for DoubleProcessor {
fn process(&self, item: &i32) -> ItemProcessorResult<i32> {
Ok(Some(item * 2))
}
}
// i32 -> i32: adds ten.
struct AddTenProcessor;
impl ItemProcessor<i32, i32> for AddTenProcessor {
fn process(&self, item: &i32) -> ItemProcessorResult<i32> {
Ok(Some(item + 10))
}
}
// i32 -> String: decimal rendering; used to exercise type-changing chains.
struct ToStringProcessor;
impl ItemProcessor<i32, String> for ToStringProcessor {
fn process(&self, item: &i32) -> ItemProcessorResult<String> {
Ok(Some(item.to_string()))
}
}
// i32 -> i32: keeps even numbers, returns `Ok(None)` for odd ones.
struct FilterEvenProcessor;
impl ItemProcessor<i32, i32> for FilterEvenProcessor {
fn process(&self, item: &i32) -> ItemProcessorResult<i32> {
if item % 2 == 0 {
Ok(Some(*item))
} else {
Ok(None) }
}
}
// i32 -> i32: always fails with `BatchError::ItemProcessor`.
struct FailingProcessor;
impl ItemProcessor<i32, i32> for FailingProcessor {
fn process(&self, _item: &i32) -> ItemProcessorResult<i32> {
Err(BatchError::ItemProcessor("forced failure".to_string()))
}
}
// ---------------------------------------------------------------------------------------
// CompositeItemProcessor / CompositeItemProcessorBuilder
// ---------------------------------------------------------------------------------------
#[test]
fn should_chain_two_same_type_processors() -> Result<(), BatchError> {
let composite = CompositeItemProcessorBuilder::new(DoubleProcessor)
.link(AddTenProcessor)
.build();
assert_eq!(
composite.process(&5)?,
Some(20),
"5 * 2 + 10 should equal 20"
);
Ok(())
}
#[test]
fn should_chain_two_type_changing_processors() -> Result<(), BatchError> {
let composite = CompositeItemProcessorBuilder::new(DoubleProcessor)
.link(ToStringProcessor)
.build();
assert_eq!(composite.process(&21)?, Some("42".to_string()));
Ok(())
}
// Three links: (5 * 2) + 10 = 20, then rendered as "20".
#[test]
fn should_chain_three_processors() -> Result<(), BatchError> {
let composite = CompositeItemProcessorBuilder::new(DoubleProcessor)
.link(AddTenProcessor)
.link(ToStringProcessor)
.build();
assert_eq!(composite.process(&5)?, Some("20".to_string()));
Ok(())
}
// `Ok(None)` from the first stage must surface as `Ok(None)` from the composite.
#[test]
fn should_stop_chain_when_first_processor_filters_item() -> Result<(), BatchError> {
let composite = CompositeItemProcessorBuilder::new(FilterEvenProcessor)
.link(ToStringProcessor)
.build();
assert_eq!(
composite.process(&3)?,
None,
"odd number should be filtered"
);
assert_eq!(
composite.process(&4)?,
Some("4".to_string()),
"even number should pass"
);
Ok(())
}
#[test]
fn should_propagate_error_from_first_processor() {
let composite = CompositeItemProcessorBuilder::new(FailingProcessor)
.link(ToStringProcessor)
.build();
let result = composite.process(&1);
assert!(
result.is_err(),
"error from first processor should propagate"
);
}
#[test]
fn should_propagate_error_from_second_processor() {
// Local failing processor so the failure happens in the second position.
struct AlwaysFailI32;
impl ItemProcessor<i32, i32> for AlwaysFailI32 {
fn process(&self, _: &i32) -> ItemProcessorResult<i32> {
Err(BatchError::ItemProcessor("second failed".to_string()))
}
}
let composite = CompositeItemProcessorBuilder::new(DoubleProcessor)
.link(AlwaysFailI32)
.build();
let result = composite.process(&5);
assert!(
result.is_err(),
"error from second processor should propagate"
);
}
// ---------------------------------------------------------------------------------------
// Box<P>: ItemProcessor
// ---------------------------------------------------------------------------------------
// Through a trait object.
#[test]
fn should_use_box_blanket_impl_as_item_processor() -> Result<(), BatchError> {
let composite = CompositeItemProcessorBuilder::new(DoubleProcessor)
.link(ToStringProcessor)
.build();
let boxed: Box<dyn ItemProcessor<i32, String>> = Box::new(composite);
let result = boxed.process(&3)?;
assert_eq!(
result,
Some("6".to_string()),
"boxed trait object should delegate to inner processor"
);
Ok(())
}
// Through a boxed concrete type.
#[test]
fn should_use_box_concrete_type_as_item_processor() -> Result<(), BatchError> {
let boxed: Box<DoubleProcessor> = Box::new(DoubleProcessor);
let result = boxed.process(&7)?;
assert_eq!(
result,
Some(14),
"boxed concrete processor should delegate to inner processor"
);
Ok(())
}
// ---------------------------------------------------------------------------------------
// Writer fixture used by the CompositeItemWriter tests
// ---------------------------------------------------------------------------------------
use std::cell::Cell;
// Counts successful calls per method and can be told to fail any one of them.
// `Cell` is used because the `ItemWriter` methods take `&self`.
struct RecordingWriter {
write_calls: Cell<usize>,
items_written: Cell<usize>,
open_calls: Cell<usize>,
close_calls: Cell<usize>,
flush_calls: Cell<usize>,
fail_write: bool,
fail_open: bool,
fail_flush: bool,
fail_close: bool,
}
impl RecordingWriter {
// All counters at zero, nothing fails.
fn new() -> Self {
Self {
write_calls: Cell::new(0),
items_written: Cell::new(0),
open_calls: Cell::new(0),
close_calls: Cell::new(0),
flush_calls: Cell::new(0),
fail_write: false,
fail_open: false,
fail_flush: false,
fail_close: false,
}
}
// Like `new`, but `write` fails.
fn failing_write() -> Self {
Self {
fail_write: true,
..Self::new()
}
}
// Like `new`, but `open` fails.
fn failing_open() -> Self {
Self {
fail_open: true,
..Self::new()
}
}
}
// Each method returns the forced error *before* touching its counter, so a counter
// only reflects successful calls.
impl ItemWriter<i32> for RecordingWriter {
fn write(&self, items: &[i32]) -> ItemWriterResult {
if self.fail_write {
return Err(BatchError::ItemWriter("forced write failure".to_string()));
}
self.write_calls.set(self.write_calls.get() + 1);
self.items_written
.set(self.items_written.get() + items.len());
Ok(())
}
fn open(&self) -> ItemWriterResult {
if self.fail_open {
return Err(BatchError::ItemWriter("forced open failure".to_string()));
}
self.open_calls.set(self.open_calls.get() + 1);
Ok(())
}
fn close(&self) -> ItemWriterResult {
if self.fail_close {
return Err(BatchError::ItemWriter("forced close failure".to_string()));
}
self.close_calls.set(self.close_calls.get() + 1);
Ok(())
}
fn flush(&self) -> ItemWriterResult {
if self.fail_flush {
return Err(BatchError::ItemWriter("forced flush failure".to_string()));
}
self.flush_calls.set(self.flush_calls.get() + 1);
Ok(())
}
}
// ---------------------------------------------------------------------------------------
// CompositeItemWriter
// ---------------------------------------------------------------------------------------
#[test]
fn should_write_to_both_writers() -> Result<(), BatchError> {
let w1 = RecordingWriter::new();
let w2 = RecordingWriter::new();
let composite = CompositeItemWriter {
first: w1,
second: w2,
};
composite.write(&[1, 2, 3])?;
assert_eq!(
composite.first.write_calls.get(),
1,
"first writer should be called"
);
assert_eq!(
composite.first.items_written.get(),
3,
"first writer should receive 3 items"
);
assert_eq!(
composite.second.write_calls.get(),
1,
"second writer should be called"
);
assert_eq!(
composite.second.items_written.get(),
3,
"second writer should receive 3 items"
);
Ok(())
}
// NOTE(review): despite the name, only call counts are asserted; the relative order of
// the two `open` calls is not verified.
#[test]
fn should_open_both_writers_in_order() -> Result<(), BatchError> {
let w1 = RecordingWriter::new();
let w2 = RecordingWriter::new();
let composite = CompositeItemWriter {
first: w1,
second: w2,
};
composite.open()?;
assert_eq!(
composite.first.open_calls.get(),
1,
"first writer should be opened"
);
assert_eq!(
composite.second.open_calls.get(),
1,
"second writer should be opened"
);
Ok(())
}
// NOTE(review): despite the name, only call counts are asserted; the relative order of
// the two `close` calls is not verified.
#[test]
fn should_close_both_writers_in_order() -> Result<(), BatchError> {
let w1 = RecordingWriter::new();
let w2 = RecordingWriter::new();
let composite = CompositeItemWriter {
first: w1,
second: w2,
};
composite.close()?;
assert_eq!(
composite.first.close_calls.get(),
1,
"first writer should be closed"
);
assert_eq!(
composite.second.close_calls.get(),
1,
"second writer should be closed"
);
Ok(())
}
#[test]
fn should_flush_both_writers() -> Result<(), BatchError> {
let w1 = RecordingWriter::new();
let w2 = RecordingWriter::new();
let composite = CompositeItemWriter {
first: w1,
second: w2,
};
composite.flush()?;
assert_eq!(
composite.first.flush_calls.get(),
1,
"first writer should be flushed"
);
assert_eq!(
composite.second.flush_calls.get(),
1,
"second writer should be flushed"
);
Ok(())
}
// `write` and `open` stop at the first failure: the second writer must stay untouched.
#[test]
fn should_short_circuit_on_write_error() {
let w1 = RecordingWriter::failing_write();
let w2 = RecordingWriter::new();
let composite = CompositeItemWriter {
first: w1,
second: w2,
};
let result = composite.write(&[1, 2, 3]);
assert!(result.is_err(), "error should propagate");
assert_eq!(
composite.second.write_calls.get(),
0,
"second writer should not be called after first fails"
);
}
#[test]
fn should_short_circuit_on_open_error() {
let w1 = RecordingWriter::failing_open();
let w2 = RecordingWriter::new();
let composite = CompositeItemWriter {
first: w1,
second: w2,
};
let result = composite.open();
assert!(result.is_err(), "error should propagate");
assert_eq!(
composite.second.open_calls.get(),
0,
"second writer should not be opened after first fails"
);
}
// `flush` and `close` do NOT short-circuit: the second writer is still invoked and the
// first writer's error is what the caller sees.
#[test]
fn should_flush_both_writers_even_when_first_fails() {
let w1 = RecordingWriter {
fail_flush: true,
..RecordingWriter::new()
};
let w2 = RecordingWriter::new();
let composite = CompositeItemWriter {
first: w1,
second: w2,
};
let result = composite.flush();
assert!(result.is_err(), "error should propagate");
assert_eq!(
composite.second.flush_calls.get(),
1,
"second writer should still be flushed even when first fails"
);
}
#[test]
fn should_close_both_writers_even_when_first_fails() {
let w1 = RecordingWriter {
fail_close: true,
..RecordingWriter::new()
};
let w2 = RecordingWriter::new();
let composite = CompositeItemWriter {
first: w1,
second: w2,
};
let result = composite.close();
assert!(result.is_err(), "error should propagate");
assert_eq!(
composite.second.close_calls.get(),
1,
"second writer should still be closed even when first fails"
);
}
// ---------------------------------------------------------------------------------------
// CompositeItemWriterBuilder
// ---------------------------------------------------------------------------------------
#[test]
fn should_chain_two_writers_via_builder() -> Result<(), BatchError> {
let composite = CompositeItemWriterBuilder::new(RecordingWriter::new())
.link(RecordingWriter::new())
.build();
composite.write(&[10, 20])?;
assert_eq!(
composite.first.items_written.get(),
2,
"first writer should receive 2 items"
);
assert_eq!(
composite.second.items_written.get(),
2,
"second writer should receive 2 items"
);
Ok(())
}
// Links nest to the left: writers 1 and 2 live in `composite.first`, writer 3 is
// `composite.second`.
#[test]
fn should_chain_three_writers() -> Result<(), BatchError> {
let composite = CompositeItemWriterBuilder::new(RecordingWriter::new())
.link(RecordingWriter::new())
.link(RecordingWriter::new())
.build();
composite.write(&[1, 2, 3, 4])?;
assert_eq!(
composite.first.first.items_written.get(),
4,
"writer 1 should receive 4 items"
);
assert_eq!(
composite.first.second.items_written.get(),
4,
"writer 2 should receive 4 items"
);
assert_eq!(
composite.second.items_written.get(),
4,
"writer 3 should receive 4 items"
);
Ok(())
}
// ---------------------------------------------------------------------------------------
// Box<W>: ItemWriter
// ---------------------------------------------------------------------------------------
// NOTE(review): this only checks that `write` through the trait object returns `Ok`;
// the inner writers' counters are not reachable through `dyn ItemWriter` and are not asserted.
#[test]
fn should_use_box_blanket_impl_as_item_writer() -> Result<(), BatchError> {
let composite = CompositeItemWriterBuilder::new(RecordingWriter::new())
.link(RecordingWriter::new())
.build();
let boxed: Box<dyn ItemWriter<i32>> = Box::new(composite);
boxed.write(&[5, 6, 7])?;
Ok(())
}
// With a boxed concrete type the counters are reachable, so all four delegations are checked.
#[test]
fn should_use_box_concrete_writer_as_item_writer() -> Result<(), BatchError> {
let boxed: Box<RecordingWriter> = Box::new(RecordingWriter::new());
boxed.open()?;
boxed.write(&[1, 2])?;
boxed.flush()?;
boxed.close()?;
assert_eq!(
boxed.items_written.get(),
2,
"boxed concrete writer should delegate write"
);
assert_eq!(
boxed.open_calls.get(),
1,
"boxed concrete writer should delegate open"
);
assert_eq!(
boxed.flush_calls.get(),
1,
"boxed concrete writer should delegate flush"
);
assert_eq!(
boxed.close_calls.get(),
1,
"boxed concrete writer should delegate close"
);
Ok(())
}
}