use crate::BatchError;
use log::{debug, error, info, warn};
use std::time::{Duration, Instant};
use uuid::Uuid;
use super::item::{ItemProcessor, ItemReader, ItemWriter};
pub trait Tasklet {
fn execute(&self, step_execution: &StepExecution) -> Result<RepeatStatus, BatchError>;
}
pub struct TaskletStep<'a> {
name: String,
tasklet: &'a dyn Tasklet,
}
impl Step for TaskletStep<'_> {
fn execute(&self, step_execution: &mut StepExecution) -> Result<(), BatchError> {
step_execution.status = StepStatus::Started;
let start_time = Instant::now();
info!(
"Start of step: {}, id: {}",
step_execution.name, step_execution.id
);
loop {
let result = self.tasklet.execute(step_execution);
match result {
Ok(RepeatStatus::Continuable) => {}
Ok(RepeatStatus::Finished) => {
step_execution.status = StepStatus::Success;
break;
}
Err(e) => {
error!(
"Error in step: {}, id: {}, error: {}",
step_execution.name, step_execution.id, e
);
step_execution.status = StepStatus::Failed;
step_execution.end_time = Some(Instant::now());
step_execution.duration = Some(start_time.elapsed());
return Err(e);
}
}
}
step_execution.start_time = Some(start_time);
step_execution.end_time = Some(Instant::now());
step_execution.duration = Some(start_time.elapsed());
Ok(())
}
fn get_name(&self) -> &str {
&self.name
}
}
pub struct TaskletBuilder<'a> {
name: String,
tasklet: Option<&'a dyn Tasklet>,
}
impl<'a> TaskletBuilder<'a> {
pub fn new(name: &str) -> Self {
Self {
name: name.to_string(),
tasklet: None,
}
}
pub fn tasklet(mut self, tasklet: &'a dyn Tasklet) -> Self {
self.tasklet = Some(tasklet);
self
}
pub fn build(self) -> TaskletStep<'a> {
TaskletStep {
name: self.name,
tasklet: self
.tasklet
.expect("Tasklet is required for building a step"),
}
}
}
#[derive(Clone)]
pub struct StepExecution {
pub id: Uuid,
pub name: String,
pub status: StepStatus,
pub start_time: Option<Instant>,
pub end_time: Option<Instant>,
pub duration: Option<Duration>,
pub read_count: usize,
pub write_count: usize,
pub read_error_count: usize,
pub process_count: usize,
pub process_error_count: usize,
pub filter_count: usize,
pub write_error_count: usize,
}
impl StepExecution {
pub fn new(name: &str) -> Self {
Self {
id: Uuid::new_v4(),
name: name.to_string(),
status: StepStatus::Starting,
start_time: None,
end_time: None,
duration: None,
read_count: 0,
write_count: 0,
read_error_count: 0,
process_count: 0,
process_error_count: 0,
filter_count: 0,
write_error_count: 0,
}
}
}
pub enum BatchStatus {
COMPLETED,
STARTING,
STARTED,
STOPPING,
STOPPED,
FAILED,
ABANDONED,
UNKNOWN,
}
pub trait Step {
fn execute(&self, step_execution: &mut StepExecution) -> Result<(), BatchError>;
fn get_name(&self) -> &str;
}
#[derive(Debug, PartialEq)]
pub enum RepeatStatus {
Continuable,
Finished,
}
pub struct ChunkOrientedStep<'a, I, O> {
name: String,
reader: &'a dyn ItemReader<I>,
processor: &'a dyn ItemProcessor<I, O>,
writer: &'a dyn ItemWriter<O>,
chunk_size: u16,
skip_limit: u16,
}
impl<I, O> Step for ChunkOrientedStep<'_, I, O> {
fn execute(&self, step_execution: &mut StepExecution) -> Result<(), BatchError> {
let start_time = Instant::now();
info!(
"Start of step: {}, id: {}",
step_execution.name, step_execution.id
);
Self::manage_error(self.writer.open());
loop {
let (read_items, chunk_status) = match self.read_chunk(step_execution) {
Ok(chunk_data) => chunk_data,
Err(_) => {
step_execution.status = StepStatus::ReadError;
break;
}
};
if read_items.is_empty() {
step_execution.status = StepStatus::Success;
break;
}
if self
.process_and_write_chunk(step_execution, &read_items)
.is_err()
{
break; }
if chunk_status == ChunkStatus::Finished {
step_execution.status = StepStatus::Success;
break;
}
}
Self::manage_error(self.writer.close());
info!(
"End of step: {}, id: {}",
step_execution.name, step_execution.id
);
step_execution.start_time = Some(start_time);
step_execution.end_time = Some(Instant::now());
step_execution.duration = Some(start_time.elapsed());
if StepStatus::Success == step_execution.status {
Ok(())
} else {
Err(BatchError::Step(step_execution.name.clone()))
}
}
fn get_name(&self) -> &str {
&self.name
}
}
impl<I, O> ChunkOrientedStep<'_, I, O> {
fn process_and_write_chunk(
&self,
step_execution: &mut StepExecution,
read_items: &[I],
) -> Result<(), BatchError> {
let processed_items = match self.process_chunk(step_execution, read_items) {
Ok(items) => items,
Err(error) => {
step_execution.status = StepStatus::ProcessorError;
return Err(error);
}
};
match self.write_chunk(step_execution, &processed_items) {
Ok(()) => Ok(()),
Err(error) => {
step_execution.status = StepStatus::WriteError;
Err(error)
}
}
}
fn read_chunk(
&self,
step_execution: &mut StepExecution,
) -> Result<(Vec<I>, ChunkStatus), BatchError> {
debug!("Start reading chunk");
let mut read_items = Vec::with_capacity(self.chunk_size as usize);
loop {
let read_result = self.reader.read();
match read_result {
Ok(item) => {
match item {
Some(item) => {
read_items.push(item);
step_execution.read_count += 1;
if read_items.len() >= self.chunk_size as usize {
return Ok((read_items, ChunkStatus::Full));
}
}
None => {
if read_items.is_empty() {
return Ok((read_items, ChunkStatus::Finished));
} else {
return Ok((read_items, ChunkStatus::Full));
}
}
};
}
Err(error) => {
warn!("Error reading item: {}", error);
step_execution.read_error_count += 1;
if self.is_skip_limit_reached(step_execution) {
step_execution.status = StepStatus::ReadError;
return Err(error);
}
}
}
}
}
fn process_chunk(
&self,
step_execution: &mut StepExecution,
read_items: &[I],
) -> Result<Vec<O>, BatchError> {
debug!("Processing chunk of {} items", read_items.len());
let mut result = Vec::with_capacity(read_items.len());
for item in read_items {
match self.processor.process(item) {
Ok(Some(processed_item)) => {
result.push(processed_item);
step_execution.process_count += 1;
}
Ok(None) => {
step_execution.filter_count += 1;
debug!("Item filtered by processor");
}
Err(error) => {
warn!("Error processing item: {}", error);
step_execution.process_error_count += 1;
if self.is_skip_limit_reached(step_execution) {
step_execution.status = StepStatus::ProcessorError;
return Err(error);
}
}
}
}
Ok(result)
}
fn write_chunk(
&self,
step_execution: &mut StepExecution,
processed_items: &[O],
) -> Result<(), BatchError> {
debug!("Writing chunk of {} items", processed_items.len());
if processed_items.is_empty() {
debug!("No items to write, skipping write call");
return Ok(());
}
match self.writer.write(processed_items) {
Ok(()) => {
step_execution.write_count += processed_items.len();
Self::manage_error(self.writer.flush());
Ok(())
}
Err(error) => {
warn!("Error writing items: {}", error);
step_execution.write_error_count += processed_items.len();
if self.is_skip_limit_reached(step_execution) {
step_execution.status = StepStatus::WriteError;
return Err(error);
}
Ok(())
}
}
}
fn is_skip_limit_reached(&self, step_execution: &StepExecution) -> bool {
step_execution.read_error_count
+ step_execution.write_error_count
+ step_execution.process_error_count
> self.skip_limit.into()
}
fn manage_error(result: Result<(), BatchError>) {
if let Err(error) = result {
warn!("Non-fatal error: {}", error);
}
}
}
pub struct ChunkOrientedStepBuilder<'a, I, O> {
name: String,
reader: Option<&'a dyn ItemReader<I>>,
processor: Option<&'a dyn ItemProcessor<I, O>>,
writer: Option<&'a dyn ItemWriter<O>>,
chunk_size: u16,
skip_limit: u16,
}
impl<'a, I, O> ChunkOrientedStepBuilder<'a, I, O> {
pub fn new(name: &str) -> Self {
Self {
name: name.to_string(),
reader: None,
processor: None,
writer: None,
chunk_size: 10,
skip_limit: 0,
}
}
pub fn reader(mut self, reader: &'a dyn ItemReader<I>) -> Self {
self.reader = Some(reader);
self
}
pub fn processor(mut self, processor: &'a dyn ItemProcessor<I, O>) -> Self {
self.processor = Some(processor);
self
}
pub fn writer(mut self, writer: &'a dyn ItemWriter<O>) -> Self {
self.writer = Some(writer);
self
}
pub fn chunk_size(mut self, chunk_size: u16) -> Self {
self.chunk_size = chunk_size;
self
}
pub fn skip_limit(mut self, skip_limit: u16) -> Self {
self.skip_limit = skip_limit;
self
}
pub fn build(self) -> ChunkOrientedStep<'a, I, O> {
ChunkOrientedStep {
name: self.name,
reader: self.reader.expect("Reader is required for building a step"),
processor: self
.processor
.expect("Processor is required for building a step"),
writer: self.writer.expect("Writer is required for building a step"),
chunk_size: self.chunk_size,
skip_limit: self.skip_limit,
}
}
}
pub struct StepBuilder {
name: String,
}
impl StepBuilder {
pub fn new(name: &str) -> Self {
Self {
name: name.to_string(),
}
}
pub fn tasklet(self, tasklet: &dyn Tasklet) -> TaskletBuilder<'_> {
TaskletBuilder::new(&self.name).tasklet(tasklet)
}
pub fn chunk<'a, I, O>(self, chunk_size: u16) -> ChunkOrientedStepBuilder<'a, I, O> {
ChunkOrientedStepBuilder::new(&self.name).chunk_size(chunk_size)
}
}
#[derive(Debug, PartialEq)]
pub enum ChunkStatus {
Finished,
Full,
}
#[derive(Debug, PartialEq, Clone, Copy)]
pub enum StepStatus {
Success,
ReadError,
ProcessorError,
WriteError,
Starting,
Failed,
Started,
}
#[cfg(test)]
mod tests {
use anyhow::Result;
use mockall::mock;
use serde::{Deserialize, Serialize};
use crate::{
BatchError,
core::{
item::{
ItemProcessor, ItemProcessorResult, ItemReader, ItemReaderResult, ItemWriter,
ItemWriterResult,
},
step::{StepExecution, StepStatus},
},
};
use super::{
BatchStatus, ChunkOrientedStepBuilder, ChunkStatus, RepeatStatus, Step, StepBuilder,
Tasklet, TaskletBuilder,
};
mock! {
pub TestItemReader {}
impl ItemReader<Car> for TestItemReader {
fn read(&self) -> ItemReaderResult<Car>;
}
}
mock! {
pub TestProcessor {}
impl ItemProcessor<Car, Car> for TestProcessor {
fn process(&self, item: &Car) -> ItemProcessorResult<Car>;
}
}
mock! {
pub TestItemWriter {}
impl ItemWriter<Car> for TestItemWriter {
fn write(&self, items: &[Car]) -> ItemWriterResult;
fn flush(&self) -> ItemWriterResult;
fn open(&self) -> ItemWriterResult;
fn close(&self) -> ItemWriterResult;
}
}
mock! {
pub TestTasklet {}
impl Tasklet for TestTasklet {
fn execute(&self, step_execution: &StepExecution) -> Result<RepeatStatus, BatchError>;
}
}
#[derive(Deserialize, Serialize, Debug, Clone)]
struct Car {
year: u16,
make: String,
model: String,
description: String,
}
fn mock_read(i: &mut u16, error_count: u16, end_count: u16) -> ItemReaderResult<Car> {
if end_count > 0 && *i == end_count {
return Ok(None);
} else if error_count > 0 && *i == error_count {
return Err(BatchError::ItemReader("mock read error".to_string()));
}
let car = Car {
year: 1979,
make: "make".to_owned(),
model: "model".to_owned(),
description: "description".to_owned(),
};
*i += 1;
Ok(Some(car))
}
fn mock_process(i: &mut u16, error_at: &[u16]) -> ItemProcessorResult<Car> {
*i += 1;
if error_at.contains(i) {
return Err(BatchError::ItemProcessor("mock process error".to_string()));
}
let car = Car {
year: 1979,
make: "make".to_owned(),
model: "model".to_owned(),
description: "description".to_owned(),
};
Ok(Some(car))
}
#[test]
fn step_should_succeded_with_empty_data() -> Result<()> {
let mut reader = MockTestItemReader::default();
let reader_result = Ok(None);
reader.expect_read().return_once(move || reader_result);
let mut processor = MockTestProcessor::default();
processor.expect_process().never();
let mut writer = MockTestItemWriter::default();
writer.expect_open().times(1).returning(|| Ok(()));
writer.expect_write().never();
writer.expect_close().times(1).returning(|| Ok(()));
let step = StepBuilder::new("test")
.chunk(3)
.reader(&reader)
.processor(&processor)
.writer(&writer)
.build();
let mut step_execution = StepExecution::new(&step.name);
let result = step.execute(&mut step_execution);
assert!(result.is_ok());
assert_eq!(step.get_name(), "test");
assert!(!step.get_name().is_empty());
assert!(!step_execution.id.is_nil());
assert_eq!(step_execution.status, StepStatus::Success);
Ok(())
}
#[test]
fn step_should_failed_with_processor_error() -> Result<()> {
let mut i = 0;
let mut reader = MockTestItemReader::default();
reader
.expect_read()
.returning(move || mock_read(&mut i, 0, 4));
let mut processor = MockTestProcessor::default();
let mut i = 0;
processor
.expect_process()
.returning(move |_| mock_process(&mut i, &[2]));
let mut writer = MockTestItemWriter::default();
writer.expect_open().times(1).returning(|| Ok(()));
writer.expect_write().never();
writer.expect_close().times(1).returning(|| Ok(()));
let step = StepBuilder::new("test")
.chunk(3)
.reader(&reader)
.processor(&processor)
.writer(&writer)
.build();
let mut step_execution = StepExecution::new(&step.name);
let result = step.execute(&mut step_execution);
assert!(result.is_err());
assert_eq!(step_execution.status, StepStatus::ProcessorError);
Ok(())
}
#[test]
fn step_should_failed_with_write_error() -> Result<()> {
let mut i = 0;
let mut reader = MockTestItemReader::default();
reader
.expect_read()
.returning(move || mock_read(&mut i, 0, 4));
let mut processor = MockTestProcessor::default();
let mut i = 0;
processor
.expect_process()
.returning(move |_| mock_process(&mut i, &[]));
let mut writer = MockTestItemWriter::default();
writer.expect_open().times(1).returning(|| Ok(()));
let result = Err(BatchError::ItemWriter("mock write error".to_string()));
writer.expect_write().return_once(move |_| result);
writer.expect_close().times(1).returning(|| Ok(()));
let step = StepBuilder::new("test")
.chunk(3)
.reader(&reader)
.processor(&processor)
.writer(&writer)
.build();
let mut step_execution = StepExecution::new(&step.name);
let result = step.execute(&mut step_execution);
assert!(result.is_err());
assert_eq!(step_execution.status, StepStatus::WriteError);
Ok(())
}
#[test]
fn step_should_succeed_even_with_processor_error() -> Result<()> {
let mut i = 0;
let mut reader = MockTestItemReader::default();
reader
.expect_read()
.returning(move || mock_read(&mut i, 0, 4));
let mut processor = MockTestProcessor::default();
let mut i = 0;
processor
.expect_process()
.returning(move |_| mock_process(&mut i, &[2]));
let mut writer = MockTestItemWriter::default();
writer.expect_open().times(1).returning(|| Ok(()));
writer.expect_write().times(2).returning(|_| Ok(()));
writer.expect_flush().times(2).returning(|| Ok(()));
writer.expect_close().times(1).returning(|| Ok(()));
let step = StepBuilder::new("test")
.chunk(3)
.reader(&reader)
.processor(&processor)
.writer(&writer)
.skip_limit(1)
.build();
let mut step_execution = StepExecution::new(step.get_name());
let result = step.execute(&mut step_execution);
assert!(result.is_ok());
assert_eq!(step_execution.status, StepStatus::Success);
Ok(())
}
#[test]
fn step_should_fail_with_read_error() -> Result<()> {
let mut i = 0;
let mut reader = MockTestItemReader::default();
reader
.expect_read()
.returning(move || mock_read(&mut i, 1, 4));
let mut processor = MockTestProcessor::default();
let mut i = 0;
processor
.expect_process()
.returning(move |_| mock_process(&mut i, &[]));
let mut writer = MockTestItemWriter::default();
writer.expect_open().times(1).returning(|| Ok(()));
writer.expect_write().never();
writer.expect_close().times(1).returning(|| Ok(()));
let step = StepBuilder::new("test")
.chunk(3)
.reader(&reader)
.processor(&processor)
.writer(&writer)
.build();
let mut step_execution = StepExecution::new(&step.name);
let result = step.execute(&mut step_execution);
assert!(result.is_err());
assert_eq!(step_execution.status, StepStatus::ReadError);
assert_eq!(step_execution.read_error_count, 1);
Ok(())
}
#[test]
fn step_should_respect_chunk_size() -> Result<()> {
let mut i = 0;
let mut reader = MockTestItemReader::default();
reader
.expect_read()
.returning(move || mock_read(&mut i, 0, 6));
let mut processor = MockTestProcessor::default();
let mut i = 0;
processor
.expect_process()
.returning(move |_| mock_process(&mut i, &[]));
let mut writer = MockTestItemWriter::default();
writer.expect_open().times(1).returning(|| Ok(()));
writer.expect_write().times(2).returning(|_| Ok(()));
writer.expect_flush().times(2).returning(|| Ok(()));
writer.expect_close().times(1).returning(|| Ok(()));
let step = StepBuilder::new("test")
.chunk(3)
.reader(&reader)
.processor(&processor)
.writer(&writer)
.build();
let mut step_execution = StepExecution::new(&step.name);
let result = step.execute(&mut step_execution);
assert!(result.is_ok());
assert_eq!(step_execution.status, StepStatus::Success);
assert_eq!(step_execution.read_count, 6);
assert_eq!(step_execution.write_count, 6);
Ok(())
}
#[test]
fn step_should_track_error_counts() -> Result<()> {
let mut i = 0;
let mut reader = MockTestItemReader::default();
reader
.expect_read()
.returning(move || mock_read(&mut i, 0, 4));
let mut processor = MockTestProcessor::default();
let mut i = 0;
processor
.expect_process()
.returning(move |_| mock_process(&mut i, &[1, 2]));
let mut writer = MockTestItemWriter::default();
writer.expect_open().times(1).returning(|| Ok(()));
writer.expect_write().times(2).returning(|_| Ok(()));
writer.expect_flush().times(2).returning(|| Ok(()));
writer.expect_close().times(1).returning(|| Ok(()));
let step = StepBuilder::new("test")
.chunk(3)
.reader(&reader)
.processor(&processor)
.writer(&writer)
.skip_limit(2)
.build();
let mut step_execution = StepExecution::new(&step.name);
let result = step.execute(&mut step_execution);
assert!(result.is_ok());
assert_eq!(step_execution.status, StepStatus::Success);
assert_eq!(step_execution.process_error_count, 2);
Ok(())
}
#[test]
fn step_should_measure_execution_time() -> Result<()> {
let mut i = 0;
let mut reader = MockTestItemReader::default();
reader
.expect_read()
.returning(move || mock_read(&mut i, 0, 2));
let mut processor = MockTestProcessor::default();
let mut i = 0;
processor
.expect_process()
.returning(move |_| mock_process(&mut i, &[]));
let mut writer = MockTestItemWriter::default();
writer.expect_open().times(1).returning(|| Ok(()));
writer.expect_write().times(1).returning(|_| Ok(()));
writer.expect_flush().times(1).returning(|| Ok(()));
writer.expect_close().times(1).returning(|| Ok(()));
let step = StepBuilder::new("test")
.chunk(3)
.reader(&reader)
.processor(&processor)
.writer(&writer)
.build();
let mut step_execution = StepExecution::new(&step.name);
let result = step.execute(&mut step_execution);
assert!(result.is_ok());
assert!(step_execution.duration.unwrap().as_nanos() > 0);
assert!(step_execution.start_time.unwrap() <= step_execution.end_time.unwrap());
Ok(())
}
#[test]
fn step_should_handle_empty_chunk_at_end() -> Result<()> {
let mut i = 0;
let mut reader = MockTestItemReader::default();
reader
.expect_read()
.returning(move || mock_read(&mut i, 0, 1));
let mut processor = MockTestProcessor::default();
let mut i = 0;
processor
.expect_process()
.returning(move |_| mock_process(&mut i, &[]));
let mut writer = MockTestItemWriter::default();
writer.expect_open().times(1).returning(|| Ok(()));
writer.expect_write().times(1).returning(|items| {
assert_eq!(items.len(), 1); Ok(())
});
writer.expect_flush().times(1).returning(|| Ok(()));
writer.expect_close().times(1).returning(|| Ok(()));
let step = StepBuilder::new("test")
.chunk(3)
.reader(&reader)
.processor(&processor)
.writer(&writer)
.build();
let mut step_execution = StepExecution::new(&step.name);
let result = step.execute(&mut step_execution);
assert!(result.is_ok());
assert_eq!(step_execution.status, StepStatus::Success);
assert_eq!(step_execution.read_count, 1);
assert_eq!(step_execution.write_count, 1);
Ok(())
}
#[test]
fn step_execution_should_initialize_correctly() -> Result<()> {
let step_execution = StepExecution::new("test_step");
assert_eq!(step_execution.name, "test_step");
assert_eq!(step_execution.status, StepStatus::Starting);
assert!(step_execution.start_time.is_none());
assert!(step_execution.end_time.is_none());
assert!(step_execution.duration.is_none());
assert_eq!(step_execution.read_count, 0);
assert_eq!(step_execution.write_count, 0);
assert_eq!(step_execution.read_error_count, 0);
assert_eq!(step_execution.process_count, 0);
assert_eq!(step_execution.process_error_count, 0);
assert_eq!(step_execution.write_error_count, 0);
assert!(!step_execution.id.is_nil());
Ok(())
}
#[test]
fn tasklet_step_should_execute_successfully() -> Result<()> {
let mut tasklet = MockTestTasklet::default();
tasklet
.expect_execute()
.times(1)
.returning(|_| Ok(RepeatStatus::Finished));
let step = StepBuilder::new("tasklet_test").tasklet(&tasklet).build();
let mut step_execution = StepExecution::new(&step.name);
let result = step.execute(&mut step_execution);
assert!(result.is_ok());
assert_eq!(step.get_name(), "tasklet_test");
Ok(())
}
#[test]
fn tasklet_step_should_handle_tasklet_error() -> Result<()> {
let mut tasklet = MockTestTasklet::default();
tasklet
.expect_execute()
.times(1)
.returning(|_| Err(BatchError::Step("tasklet error".to_string())));
let step = StepBuilder::new("tasklet_test").tasklet(&tasklet).build();
let mut step_execution = StepExecution::new(&step.name);
let result = step.execute(&mut step_execution);
assert!(result.is_err());
if let Err(BatchError::Step(msg)) = result {
assert_eq!(msg, "tasklet error");
} else {
panic!("Expected Step error");
}
Ok(())
}
#[test]
fn tasklet_step_should_handle_continuable_status() -> Result<()> {
use std::cell::Cell;
let call_count = Cell::new(0);
let mut tasklet = MockTestTasklet::default();
tasklet.expect_execute().times(4).returning(move |_| {
let count = call_count.get();
call_count.set(count + 1);
if count < 3 {
Ok(RepeatStatus::Continuable)
} else {
Ok(RepeatStatus::Finished)
}
});
let step = StepBuilder::new("continuable_tasklet_test")
.tasklet(&tasklet)
.build();
let mut step_execution = StepExecution::new(&step.name);
let result = step.execute(&mut step_execution);
assert!(result.is_ok());
assert_eq!(step.get_name(), "continuable_tasklet_test");
Ok(())
}
#[test]
fn tasklet_step_should_handle_multiple_continuable_cycles() -> Result<()> {
use std::cell::Cell;
let call_count = Cell::new(0);
let mut tasklet = MockTestTasklet::default();
tasklet.expect_execute().times(6).returning(move |_| {
let count = call_count.get();
call_count.set(count + 1);
if count < 5 {
Ok(RepeatStatus::Continuable)
} else {
Ok(RepeatStatus::Finished)
}
});
let step = StepBuilder::new("multi_cycle_tasklet_test")
.tasklet(&tasklet)
.build();
let mut step_execution = StepExecution::new(&step.name);
let result = step.execute(&mut step_execution);
assert!(result.is_ok());
assert_eq!(step.get_name(), "multi_cycle_tasklet_test");
Ok(())
}
#[test]
fn tasklet_step_should_handle_error_after_continuable() -> Result<()> {
use std::cell::Cell;
let call_count = Cell::new(0);
let mut tasklet = MockTestTasklet::default();
tasklet.expect_execute().times(3).returning(move |_| {
let count = call_count.get();
call_count.set(count + 1);
if count < 2 {
Ok(RepeatStatus::Continuable)
} else {
Err(BatchError::Step("error after continuable".to_string()))
}
});
let step = StepBuilder::new("error_after_continuable_test")
.tasklet(&tasklet)
.build();
let mut step_execution = StepExecution::new(&step.name);
let result = step.execute(&mut step_execution);
assert!(result.is_err());
if let Err(BatchError::Step(msg)) = result {
assert_eq!(msg, "error after continuable");
} else {
panic!("Expected Step error");
}
Ok(())
}
#[test]
fn tasklet_step_should_handle_immediate_finished_status() -> Result<()> {
let mut tasklet = MockTestTasklet::default();
tasklet
.expect_execute()
.times(1)
.returning(|_| Ok(RepeatStatus::Finished));
let step = StepBuilder::new("immediate_finished_test")
.tasklet(&tasklet)
.build();
let mut step_execution = StepExecution::new(&step.name);
let result = step.execute(&mut step_execution);
assert!(result.is_ok());
assert_eq!(step.get_name(), "immediate_finished_test");
Ok(())
}
#[test]
fn tasklet_step_should_access_step_execution_context() -> Result<()> {
let mut tasklet = MockTestTasklet::default();
tasklet
.expect_execute()
.times(1)
.withf(|step_execution| {
step_execution.name == "context_test"
&& step_execution.status == StepStatus::Started
})
.returning(|_| Ok(RepeatStatus::Finished));
let step = StepBuilder::new("context_test").tasklet(&tasklet).build();
let mut step_execution = StepExecution::new(&step.name);
let result = step.execute(&mut step_execution);
assert!(result.is_ok());
Ok(())
}
#[test]
fn tasklet_builder_should_create_valid_tasklet_step() -> Result<()> {
let mut tasklet = MockTestTasklet::default();
tasklet
.expect_execute()
.times(1)
.returning(|_| Ok(RepeatStatus::Finished));
let step = TaskletBuilder::new("builder_test")
.tasklet(&tasklet)
.build();
let mut step_execution = StepExecution::new(&step.name);
let result = step.execute(&mut step_execution);
assert!(result.is_ok());
assert_eq!(step.get_name(), "builder_test");
Ok(())
}
#[test]
fn tasklet_builder_should_panic_without_tasklet() {
let result = std::panic::catch_unwind(|| TaskletBuilder::new("test").build());
assert!(result.is_err());
}
#[test]
fn step_should_handle_writer_open_error() -> Result<()> {
let mut reader = MockTestItemReader::default();
let reader_result = Ok(None);
reader.expect_read().return_once(move || reader_result);
let mut processor = MockTestProcessor::default();
processor.expect_process().never();
let mut writer = MockTestItemWriter::default();
writer
.expect_open()
.times(1)
.returning(|| Err(BatchError::ItemWriter("open error".to_string())));
writer.expect_close().times(1).returning(|| Ok(()));
let step = StepBuilder::new("test")
.chunk(3)
.reader(&reader)
.processor(&processor)
.writer(&writer)
.build();
let mut step_execution = StepExecution::new(&step.name);
let result = step.execute(&mut step_execution);
assert!(result.is_ok());
assert_eq!(step_execution.status, StepStatus::Success);
Ok(())
}
#[test]
fn step_should_handle_writer_close_error() -> Result<()> {
let mut reader = MockTestItemReader::default();
let reader_result = Ok(None);
reader.expect_read().return_once(move || reader_result);
let mut processor = MockTestProcessor::default();
processor.expect_process().never();
let mut writer = MockTestItemWriter::default();
writer.expect_open().times(1).returning(|| Ok(()));
writer.expect_write().never();
writer
.expect_close()
.times(1)
.returning(|| Err(BatchError::ItemWriter("close error".to_string())));
let step = StepBuilder::new("test")
.chunk(3)
.reader(&reader)
.processor(&processor)
.writer(&writer)
.build();
let mut step_execution = StepExecution::new(&step.name);
let result = step.execute(&mut step_execution);
assert!(result.is_ok());
assert_eq!(step_execution.status, StepStatus::Success);
Ok(())
}
#[test]
fn step_should_handle_writer_flush_error() -> Result<()> {
let mut i = 0;
let mut reader = MockTestItemReader::default();
reader
.expect_read()
.returning(move || mock_read(&mut i, 0, 2));
let mut processor = MockTestProcessor::default();
let mut i = 0;
processor
.expect_process()
.returning(move |_| mock_process(&mut i, &[]));
let mut writer = MockTestItemWriter::default();
writer.expect_open().times(1).returning(|| Ok(()));
writer.expect_write().times(1).returning(|_| Ok(()));
writer
.expect_flush()
.times(1)
.returning(|| Err(BatchError::ItemWriter("flush error".to_string())));
writer.expect_close().times(1).returning(|| Ok(()));
let step = StepBuilder::new("test")
.chunk(3)
.reader(&reader)
.processor(&processor)
.writer(&writer)
.build();
let mut step_execution = StepExecution::new(&step.name);
let result = step.execute(&mut step_execution);
assert!(result.is_ok());
assert_eq!(step_execution.status, StepStatus::Success);
Ok(())
}
#[test]
fn step_should_handle_multiple_chunks_with_exact_chunk_size() -> Result<()> {
let mut i = 0;
let mut reader = MockTestItemReader::default();
reader
.expect_read()
.returning(move || mock_read(&mut i, 0, 6));
let mut processor = MockTestProcessor::default();
let mut i = 0;
processor
.expect_process()
.returning(move |_| mock_process(&mut i, &[]));
let mut writer = MockTestItemWriter::default();
writer.expect_open().times(1).returning(|| Ok(()));
writer.expect_write().times(2).returning(|items| {
assert_eq!(items.len(), 3); Ok(())
});
writer.expect_flush().times(2).returning(|| Ok(()));
writer.expect_close().times(1).returning(|| Ok(()));
let step = StepBuilder::new("test")
.chunk(3)
.reader(&reader)
.processor(&processor)
.writer(&writer)
.build();
let mut step_execution = StepExecution::new(&step.name);
let result = step.execute(&mut step_execution);
assert!(result.is_ok());
assert_eq!(step_execution.status, StepStatus::Success);
assert_eq!(step_execution.read_count, 6);
assert_eq!(step_execution.write_count, 6);
Ok(())
}
#[test]
fn step_should_handle_skip_limit_boundary() -> Result<()> {
let mut i = 0;
let mut reader = MockTestItemReader::default();
reader
.expect_read()
.returning(move || mock_read(&mut i, 0, 4));
let mut processor = MockTestProcessor::default();
let mut i = 0;
processor
.expect_process()
.returning(move |_| mock_process(&mut i, &[1, 2]));
let mut writer = MockTestItemWriter::default();
writer.expect_open().times(1).returning(|| Ok(()));
writer.expect_write().times(2).returning(|_| Ok(()));
writer.expect_flush().times(2).returning(|| Ok(()));
writer.expect_close().times(1).returning(|| Ok(()));
let step = StepBuilder::new("test")
.chunk(3)
.reader(&reader)
.processor(&processor)
.writer(&writer)
.skip_limit(2) .build();
let mut step_execution = StepExecution::new(&step.name);
let result = step.execute(&mut step_execution);
assert!(result.is_ok());
assert_eq!(step_execution.status, StepStatus::Success);
assert_eq!(step_execution.process_error_count, 2);
Ok(())
}
#[test]
fn step_should_fail_when_skip_limit_exceeded() -> Result<()> {
let mut i = 0;
let mut reader = MockTestItemReader::default();
reader
.expect_read()
.returning(move || mock_read(&mut i, 0, 4));
let mut processor = MockTestProcessor::default();
let mut i = 0;
processor
.expect_process()
.returning(move |_| mock_process(&mut i, &[1, 2, 3]));
let mut writer = MockTestItemWriter::default();
writer.expect_open().times(1).returning(|| Ok(()));
writer.expect_write().never(); writer.expect_close().times(1).returning(|| Ok(()));
let step = StepBuilder::new("test")
.chunk(3)
.reader(&reader)
.processor(&processor)
.writer(&writer)
.skip_limit(2) .build();
let mut step_execution = StepExecution::new(&step.name);
let result = step.execute(&mut step_execution);
assert!(result.is_err());
assert_eq!(step_execution.status, StepStatus::ProcessorError);
assert_eq!(step_execution.process_error_count, 3);
Ok(())
}
#[test]
fn step_should_handle_empty_processed_chunk() -> Result<()> {
let mut i = 0;
let mut reader = MockTestItemReader::default();
reader
.expect_read()
.returning(move || mock_read(&mut i, 0, 3));
let mut processor = MockTestProcessor::default();
let mut i = 0;
processor
.expect_process()
.returning(move |_| mock_process(&mut i, &[1, 2, 3, 4]));
let mut writer = MockTestItemWriter::default();
writer.expect_open().times(1).returning(|| Ok(()));
writer.expect_write().never(); writer.expect_close().times(1).returning(|| Ok(()));
let step = StepBuilder::new("test")
.chunk(3)
.reader(&reader)
.processor(&processor)
.writer(&writer)
.skip_limit(3) .build();
let mut step_execution = StepExecution::new(&step.name);
let result = step.execute(&mut step_execution);
assert!(result.is_ok());
assert_eq!(step_execution.status, StepStatus::Success);
assert_eq!(step_execution.process_error_count, 3);
assert_eq!(step_execution.write_count, 0);
Ok(())
}
#[test]
fn chunk_status_should_be_comparable() {
assert_eq!(ChunkStatus::Finished, ChunkStatus::Finished);
assert_eq!(ChunkStatus::Full, ChunkStatus::Full);
assert_ne!(ChunkStatus::Finished, ChunkStatus::Full);
}
#[test]
fn step_status_should_be_comparable() {
assert_eq!(StepStatus::Success, StepStatus::Success);
assert_eq!(StepStatus::ReadError, StepStatus::ReadError);
assert_eq!(StepStatus::ProcessorError, StepStatus::ProcessorError);
assert_eq!(StepStatus::WriteError, StepStatus::WriteError);
assert_eq!(StepStatus::Starting, StepStatus::Starting);
assert_ne!(StepStatus::Success, StepStatus::ReadError);
assert_ne!(StepStatus::ProcessorError, StepStatus::WriteError);
}
#[test]
fn repeat_status_should_be_comparable() {
assert_eq!(RepeatStatus::Continuable, RepeatStatus::Continuable);
assert_eq!(RepeatStatus::Finished, RepeatStatus::Finished);
assert_ne!(RepeatStatus::Continuable, RepeatStatus::Finished);
}
#[test]
fn step_builder_should_create_chunk_oriented_step() -> Result<()> {
let mut reader = MockTestItemReader::default();
reader.expect_read().return_once(|| Ok(None));
let mut processor = MockTestProcessor::default();
processor.expect_process().never();
let mut writer = MockTestItemWriter::default();
writer.expect_open().times(1).returning(|| Ok(()));
writer.expect_write().never();
writer.expect_close().times(1).returning(|| Ok(()));
let step = StepBuilder::new("builder_test")
.chunk(5)
.reader(&reader)
.processor(&processor)
.writer(&writer)
.skip_limit(10)
.build();
let mut step_execution = StepExecution::new(&step.name);
let result = step.execute(&mut step_execution);
assert!(result.is_ok());
assert_eq!(step.get_name(), "builder_test");
Ok(())
}
#[test]
fn step_should_handle_large_chunk_size() -> Result<()> {
let mut i = 0;
let mut reader = MockTestItemReader::default();
reader
.expect_read()
.returning(move || mock_read(&mut i, 0, 5));
let mut processor = MockTestProcessor::default();
let mut i = 0;
processor
.expect_process()
.returning(move |_| mock_process(&mut i, &[]));
let mut writer = MockTestItemWriter::default();
writer.expect_open().times(1).returning(|| Ok(()));
writer.expect_write().times(1).returning(|items| {
assert_eq!(items.len(), 5); Ok(())
});
writer.expect_flush().times(1).returning(|| Ok(()));
writer.expect_close().times(1).returning(|| Ok(()));
let step = StepBuilder::new("test")
.chunk(100) .reader(&reader)
.processor(&processor)
.writer(&writer)
.build();
let mut step_execution = StepExecution::new(&step.name);
let result = step.execute(&mut step_execution);
assert!(result.is_ok());
assert_eq!(step_execution.status, StepStatus::Success);
assert_eq!(step_execution.read_count, 5);
assert_eq!(step_execution.write_count, 5);
Ok(())
}
#[test]
fn step_should_handle_mixed_errors_within_skip_limit() -> Result<()> {
use std::cell::Cell;
let read_counter = Cell::new(0u16);
let mut reader = MockTestItemReader::default();
reader.expect_read().returning(move || {
let current = read_counter.get();
if current == 2 {
read_counter.set(current + 1);
Err(BatchError::ItemReader("read error".to_string()))
} else {
let mut i = current;
let result = mock_read(&mut i, 0, 6);
read_counter.set(i);
result
}
});
let mut processor = MockTestProcessor::default();
let mut i = 0;
processor
.expect_process()
.returning(move |_| mock_process(&mut i, &[2]));
let mut writer = MockTestItemWriter::default();
writer.expect_open().times(1).returning(|| Ok(()));
writer.expect_write().times(2).returning(|_| Ok(()));
writer.expect_flush().times(2).returning(|| Ok(()));
writer.expect_close().times(1).returning(|| Ok(()));
let step = StepBuilder::new("test")
.chunk(3)
.reader(&reader)
.processor(&processor)
.writer(&writer)
.skip_limit(2) .build();
let mut step_execution = StepExecution::new(&step.name);
let result = step.execute(&mut step_execution);
assert!(result.is_ok());
assert_eq!(step_execution.status, StepStatus::Success);
assert_eq!(step_execution.read_error_count, 1);
assert_eq!(step_execution.process_error_count, 1);
Ok(())
}
#[test]
fn step_execution_should_be_cloneable() -> Result<()> {
let step_execution = StepExecution::new("test_step");
let cloned_execution = step_execution.clone();
assert_eq!(step_execution.id, cloned_execution.id);
assert_eq!(step_execution.name, cloned_execution.name);
assert_eq!(step_execution.status, cloned_execution.status);
assert_eq!(step_execution.read_count, cloned_execution.read_count);
assert_eq!(step_execution.write_count, cloned_execution.write_count);
Ok(())
}
#[test]
fn step_should_handle_zero_chunk_size() -> Result<()> {
let mut reader = MockTestItemReader::default();
reader.expect_read().return_once(|| Ok(None));
let mut processor = MockTestProcessor::default();
processor.expect_process().never();
let mut writer = MockTestItemWriter::default();
writer.expect_open().times(1).returning(|| Ok(()));
writer.expect_write().never();
writer.expect_close().times(1).returning(|| Ok(()));
let step = StepBuilder::new("test")
.chunk(1)
.reader(&reader)
.processor(&processor)
.writer(&writer)
.build();
let mut step_execution = StepExecution::new(&step.name);
let result = step.execute(&mut step_execution);
assert!(result.is_ok());
assert_eq!(step_execution.status, StepStatus::Success);
Ok(())
}
#[test]
fn step_should_handle_continuous_read_errors_until_skip_limit() -> Result<()> {
use std::cell::Cell;
let counter = Cell::new(0u16);
let mut reader = MockTestItemReader::default();
reader.expect_read().returning(move || {
let current = counter.get();
counter.set(current + 1);
if current < 3 {
Err(BatchError::ItemReader("continuous read error".to_string()))
} else {
Ok(None) }
});
let mut processor = MockTestProcessor::default();
processor.expect_process().never();
let mut writer = MockTestItemWriter::default();
writer.expect_open().times(1).returning(|| Ok(()));
writer.expect_write().never();
writer.expect_close().times(1).returning(|| Ok(()));
let step = StepBuilder::new("test")
.chunk(3)
.reader(&reader)
.processor(&processor)
.writer(&writer)
.skip_limit(2) .build();
let mut step_execution = StepExecution::new(&step.name);
let result = step.execute(&mut step_execution);
assert!(result.is_err());
assert_eq!(step_execution.status, StepStatus::ReadError);
assert_eq!(step_execution.read_error_count, 3);
Ok(())
}
#[test]
fn step_should_handle_write_error_with_skip_limit() -> Result<()> {
let mut i = 0;
let mut reader = MockTestItemReader::default();
reader
.expect_read()
.returning(move || mock_read(&mut i, 0, 4));
let mut processor = MockTestProcessor::default();
let mut i = 0;
processor
.expect_process()
.returning(move |_| mock_process(&mut i, &[]));
let mut writer = MockTestItemWriter::default();
writer.expect_open().times(1).returning(|| Ok(()));
writer
.expect_write()
.times(1)
.returning(|_| Err(BatchError::ItemWriter("write error".to_string())));
writer.expect_close().times(1).returning(|| Ok(()));
let step = StepBuilder::new("test")
.chunk(3)
.reader(&reader)
.processor(&processor)
.writer(&writer)
.skip_limit(0) .build();
let mut step_execution = StepExecution::new(&step.name);
let result = step.execute(&mut step_execution);
assert!(result.is_err());
assert_eq!(step_execution.status, StepStatus::WriteError);
assert_eq!(step_execution.write_error_count, 3);
Ok(())
}
#[test]
fn step_should_succeed_when_write_error_within_skip_limit() -> Result<()> {
let mut i = 0;
let mut reader = MockTestItemReader::default();
reader
.expect_read()
.returning(move || mock_read(&mut i, 0, 3));
let mut processor = MockTestProcessor::default();
let mut i = 0;
processor
.expect_process()
.returning(move |_| mock_process(&mut i, &[]));
let mut writer = MockTestItemWriter::default();
writer.expect_open().times(1).returning(|| Ok(()));
writer
.expect_write()
.times(1)
.returning(|_| Err(BatchError::ItemWriter("write error".to_string())));
writer.expect_close().times(1).returning(|| Ok(()));
let step = StepBuilder::new("test")
.chunk(3)
.reader(&reader)
.processor(&processor)
.writer(&writer)
.skip_limit(3) .build();
let mut step_execution = StepExecution::new(&step.name);
let result = step.execute(&mut step_execution);
assert!(result.is_ok());
assert_eq!(step_execution.status, StepStatus::Success);
assert_eq!(step_execution.write_error_count, 3);
assert_eq!(step_execution.write_count, 0);
Ok(())
}
#[test]
fn step_should_handle_partial_chunk_at_end() -> Result<()> {
let mut i = 0;
let mut reader = MockTestItemReader::default();
reader
.expect_read()
.returning(move || mock_read(&mut i, 0, 2));
let mut processor = MockTestProcessor::default();
let mut i = 0;
processor
.expect_process()
.returning(move |_| mock_process(&mut i, &[]));
let mut writer = MockTestItemWriter::default();
writer.expect_open().times(1).returning(|| Ok(()));
writer.expect_write().times(1).returning(|items| {
assert_eq!(items.len(), 2); Ok(())
});
writer.expect_flush().times(1).returning(|| Ok(()));
writer.expect_close().times(1).returning(|| Ok(()));
let step = StepBuilder::new("test")
.chunk(3)
.reader(&reader)
.processor(&processor)
.writer(&writer)
.build();
let mut step_execution = StepExecution::new(&step.name);
let result = step.execute(&mut step_execution);
assert!(result.is_ok());
assert_eq!(step_execution.status, StepStatus::Success);
assert_eq!(step_execution.read_count, 2);
assert_eq!(step_execution.write_count, 2);
Ok(())
}
#[test]
fn batch_status_should_have_all_variants() {
let _completed = BatchStatus::COMPLETED;
let _starting = BatchStatus::STARTING;
let _started = BatchStatus::STARTED;
let _stopping = BatchStatus::STOPPING;
let _stopped = BatchStatus::STOPPED;
let _failed = BatchStatus::FAILED;
let _abandoned = BatchStatus::ABANDONED;
let _unknown = BatchStatus::UNKNOWN;
}
#[test]
fn tasklet_builder_should_require_tasklet() {
let mut tasklet = MockTestTasklet::default();
tasklet.expect_execute().never();
let builder = TaskletBuilder::new("test").tasklet(&tasklet);
let _step = builder.build(); }
#[test]
fn chunk_oriented_step_builder_should_require_all_components() -> Result<()> {
let mut reader = MockTestItemReader::default();
reader.expect_read().return_once(|| Ok(None));
let mut processor = MockTestProcessor::default();
processor.expect_process().never();
let mut writer = MockTestItemWriter::default();
writer.expect_open().times(1).returning(|| Ok(()));
writer.expect_write().never();
writer.expect_close().times(1).returning(|| Ok(()));
let step = ChunkOrientedStepBuilder::new("test")
.reader(&reader)
.processor(&processor)
.writer(&writer)
.chunk_size(10)
.skip_limit(5)
.build();
let mut step_execution = StepExecution::new(&step.name);
let result = step.execute(&mut step_execution);
assert!(result.is_ok());
assert_eq!(step.get_name(), "test");
Ok(())
}
#[test]
fn step_should_handle_maximum_skip_limit() -> Result<()> {
let mut i = 0;
let mut reader = MockTestItemReader::default();
reader
.expect_read()
.returning(move || mock_read(&mut i, 0, 3));
let mut processor = MockTestProcessor::default();
let mut i = 0;
processor
.expect_process()
.returning(move |_| mock_process(&mut i, &[1, 2, 3]));
let mut writer = MockTestItemWriter::default();
writer.expect_open().times(1).returning(|| Ok(()));
writer.expect_write().never(); writer.expect_close().times(1).returning(|| Ok(()));
let step = StepBuilder::new("test")
.chunk(3)
.reader(&reader)
.processor(&processor)
.writer(&writer)
.skip_limit(u16::MAX) .build();
let mut step_execution = StepExecution::new(&step.name);
let result = step.execute(&mut step_execution);
assert!(result.is_ok());
assert_eq!(step_execution.status, StepStatus::Success);
assert_eq!(step_execution.process_error_count, 3);
Ok(())
}
#[test]
fn step_should_handle_tasklet_step_timing() -> Result<()> {
let mut tasklet = MockTestTasklet::default();
tasklet
.expect_execute()
.times(1)
.returning(|_| Ok(RepeatStatus::Finished));
let step = StepBuilder::new("timing_test").tasklet(&tasklet).build();
let mut step_execution = StepExecution::new(&step.name);
let result = step.execute(&mut step_execution);
assert!(result.is_ok());
assert!(step_execution.start_time.is_some());
assert!(step_execution.end_time.is_some());
assert!(step_execution.duration.is_some());
assert!(step_execution.duration.unwrap().as_nanos() > 0);
Ok(())
}
#[test]
fn step_should_handle_tasklet_step_status_transitions() -> Result<()> {
let mut tasklet = MockTestTasklet::default();
tasklet
.expect_execute()
.times(1)
.returning(|step_execution| {
assert_eq!(step_execution.status, StepStatus::Started);
Ok(RepeatStatus::Finished)
});
let step = StepBuilder::new("status_test").tasklet(&tasklet).build();
let mut step_execution = StepExecution::new(&step.name);
assert_eq!(step_execution.status, StepStatus::Starting);
let result = step.execute(&mut step_execution);
assert!(result.is_ok());
assert_eq!(step_execution.status, StepStatus::Success);
Ok(())
}
#[test]
fn step_should_handle_tasklet_step_failed_status() -> Result<()> {
let mut tasklet = MockTestTasklet::default();
tasklet
.expect_execute()
.times(1)
.returning(|_| Err(BatchError::Step("tasklet failure".to_string())));
let step = StepBuilder::new("failed_test").tasklet(&tasklet).build();
let mut step_execution = StepExecution::new(&step.name);
let result = step.execute(&mut step_execution);
assert!(result.is_err());
assert_eq!(step_execution.status, StepStatus::Failed);
assert!(step_execution.end_time.is_some());
assert!(step_execution.duration.is_some());
Ok(())
}
#[test]
fn chunk_oriented_step_builder_should_panic_without_reader() {
let mut processor = MockTestProcessor::default();
processor.expect_process().never();
let mut writer = MockTestItemWriter::default();
writer.expect_open().never();
let result = std::panic::catch_unwind(|| {
ChunkOrientedStepBuilder::new("test")
.processor(&processor)
.writer(&writer)
.build()
});
assert!(result.is_err());
}
#[test]
fn chunk_oriented_step_builder_should_panic_without_processor() {
let mut reader = MockTestItemReader::default();
reader.expect_read().never();
let mut writer = MockTestItemWriter::default();
writer.expect_open().never();
let result = std::panic::catch_unwind(|| {
ChunkOrientedStepBuilder::new("test")
.reader(&reader)
.writer(&writer)
.build()
});
assert!(result.is_err());
}
#[test]
fn chunk_oriented_step_builder_should_panic_without_writer() {
let mut reader = MockTestItemReader::default();
reader.expect_read().never();
let mut processor = MockTestProcessor::default();
processor.expect_process().never();
let result = std::panic::catch_unwind(|| {
ChunkOrientedStepBuilder::new("test")
.reader(&reader)
.processor(&processor)
.build()
});
assert!(result.is_err());
}
#[test]
fn step_should_handle_read_chunk_with_full_chunk() -> Result<()> {
let mut i = 0;
let mut reader = MockTestItemReader::default();
reader
.expect_read()
.returning(move || mock_read(&mut i, 0, 4));
let mut processor = MockTestProcessor::default();
let mut i = 0;
processor
.expect_process()
.returning(move |_| mock_process(&mut i, &[]));
let mut writer = MockTestItemWriter::default();
writer.expect_open().times(1).returning(|| Ok(()));
writer.expect_write().times(2).returning(|items| {
assert!(items.len() <= 3);
Ok(())
});
writer.expect_flush().times(2).returning(|| Ok(()));
writer.expect_close().times(1).returning(|| Ok(()));
let step = StepBuilder::new("test")
.chunk(3)
.reader(&reader)
.processor(&processor)
.writer(&writer)
.build();
let mut step_execution = StepExecution::new(&step.name);
let result = step.execute(&mut step_execution);
assert!(result.is_ok());
assert_eq!(step_execution.status, StepStatus::Success);
assert_eq!(step_execution.read_count, 4);
assert_eq!(step_execution.write_count, 4);
Ok(())
}
#[test]
fn step_should_handle_process_chunk_with_all_errors() -> Result<()> {
let mut i = 0;
let mut reader = MockTestItemReader::default();
reader
.expect_read()
.returning(move || mock_read(&mut i, 0, 3));
let mut processor = MockTestProcessor::default();
let mut i = 0;
processor
.expect_process()
.returning(move |_| mock_process(&mut i, &[1, 2, 3]));
let mut writer = MockTestItemWriter::default();
writer.expect_open().times(1).returning(|| Ok(()));
writer.expect_write().never(); writer.expect_close().times(1).returning(|| Ok(()));
let step = StepBuilder::new("test")
.chunk(3)
.reader(&reader)
.processor(&processor)
.writer(&writer)
.skip_limit(5) .build();
let mut step_execution = StepExecution::new(&step.name);
let result = step.execute(&mut step_execution);
assert!(result.is_ok());
assert_eq!(step_execution.status, StepStatus::Success);
assert_eq!(step_execution.process_error_count, 3);
assert_eq!(step_execution.write_count, 0);
Ok(())
}
#[test]
fn step_should_handle_write_chunk_with_empty_items() -> Result<()> {
let mut reader = MockTestItemReader::default();
reader.expect_read().return_once(|| Ok(None));
let mut processor = MockTestProcessor::default();
processor.expect_process().never();
let mut writer = MockTestItemWriter::default();
writer.expect_open().times(1).returning(|| Ok(()));
writer.expect_write().never(); writer.expect_close().times(1).returning(|| Ok(()));
let step = StepBuilder::new("test")
.chunk(3)
.reader(&reader)
.processor(&processor)
.writer(&writer)
.build();
let mut step_execution = StepExecution::new(&step.name);
let result = step.execute(&mut step_execution);
assert!(result.is_ok());
assert_eq!(step_execution.status, StepStatus::Success);
assert_eq!(step_execution.read_count, 0);
assert_eq!(step_execution.write_count, 0);
Ok(())
}
#[test]
fn step_should_handle_is_skip_limit_reached_boundary_conditions() -> Result<()> {
let mut i = 0;
let mut reader = MockTestItemReader::default();
reader
.expect_read()
.returning(move || mock_read(&mut i, 0, 4));
let mut processor = MockTestProcessor::default();
let mut i = 0;
processor
.expect_process()
.returning(move |_| mock_process(&mut i, &[1, 2]));
let mut writer = MockTestItemWriter::default();
writer.expect_open().times(1).returning(|| Ok(()));
writer.expect_write().times(2).returning(|_| Ok(()));
writer.expect_flush().times(2).returning(|| Ok(()));
writer.expect_close().times(1).returning(|| Ok(()));
let step = StepBuilder::new("test")
.chunk(3)
.reader(&reader)
.processor(&processor)
.writer(&writer)
.skip_limit(2) .build();
let mut step_execution = StepExecution::new(&step.name);
let result = step.execute(&mut step_execution);
assert!(result.is_ok());
assert_eq!(step_execution.status, StepStatus::Success);
assert_eq!(step_execution.process_error_count, 2);
Ok(())
}
#[test]
fn step_should_handle_manage_error_with_various_errors() -> Result<()> {
let mut reader = MockTestItemReader::default();
reader.expect_read().return_once(|| Ok(None));
let mut processor = MockTestProcessor::default();
processor.expect_process().never();
let mut writer = MockTestItemWriter::default();
writer
.expect_open()
.times(1)
.returning(|| Err(BatchError::ItemWriter("open error".to_string())));
writer.expect_write().never();
writer
.expect_close()
.times(1)
.returning(|| Err(BatchError::ItemWriter("close error".to_string())));
let step = StepBuilder::new("test")
.chunk(3)
.reader(&reader)
.processor(&processor)
.writer(&writer)
.build();
let mut step_execution = StepExecution::new(&step.name);
let result = step.execute(&mut step_execution);
assert!(result.is_ok());
assert_eq!(step_execution.status, StepStatus::Success);
Ok(())
}
#[test]
fn step_execution_should_have_unique_ids() -> Result<()> {
let step_execution1 = StepExecution::new("test1");
let step_execution2 = StepExecution::new("test2");
assert_ne!(step_execution1.id, step_execution2.id);
Ok(())
}
#[test]
fn step_execution_should_clone_with_same_values() -> Result<()> {
let mut step_execution = StepExecution::new("test_step");
step_execution.read_count = 10;
step_execution.write_count = 8;
step_execution.status = StepStatus::Success;
let cloned_execution = step_execution.clone();
assert_eq!(step_execution.id, cloned_execution.id);
assert_eq!(step_execution.name, cloned_execution.name);
assert_eq!(step_execution.status, cloned_execution.status);
assert_eq!(step_execution.read_count, cloned_execution.read_count);
assert_eq!(step_execution.write_count, cloned_execution.write_count);
Ok(())
}
#[test]
fn step_status_should_support_copy_trait() {
let status1 = StepStatus::Success;
let status2 = status1;
assert_eq!(status1, status2);
assert_eq!(status1, StepStatus::Success); }
#[test]
fn step_status_should_support_debug_trait() {
let status = StepStatus::ProcessorError;
let debug_string = format!("{:?}", status);
assert!(debug_string.contains("ProcessorError"));
}
#[test]
fn chunk_status_should_support_debug_trait() {
let status = ChunkStatus::Full;
let debug_string = format!("{:?}", status);
assert!(debug_string.contains("Full"));
}
#[test]
fn repeat_status_should_support_debug_trait() {
let status = RepeatStatus::Continuable;
let debug_string = format!("{:?}", status);
assert!(debug_string.contains("Continuable"));
}
#[test]
fn step_should_count_filtered_items() -> Result<()> {
let mut i = 0u16;
let mut reader = MockTestItemReader::default();
reader
.expect_read()
.returning(move || mock_read(&mut i, 0, 4));
let mut j = 0u16;
let mut processor = MockTestProcessor::default();
processor.expect_process().returning(move |_| {
j += 1;
if j == 2 {
return Ok(None); }
Ok(Some(Car {
year: 1979,
make: "make".to_owned(),
model: "model".to_owned(),
description: "description".to_owned(),
}))
});
let mut writer = MockTestItemWriter::default();
writer.expect_open().times(1).returning(|| Ok(()));
writer.expect_write().times(1).returning(|items| {
assert_eq!(items.len(), 3, "expected 3 items written after filtering");
Ok(())
});
writer.expect_flush().returning(|| Ok(()));
writer.expect_close().times(1).returning(|| Ok(()));
let step = StepBuilder::new("test")
.chunk(10)
.reader(&reader)
.processor(&processor)
.writer(&writer)
.build();
let mut step_execution = StepExecution::new(&step.name);
let result = step.execute(&mut step_execution);
assert!(result.is_ok());
assert_eq!(step_execution.read_count, 4, "should have read 4 items");
assert_eq!(
step_execution.filter_count, 1,
"should have filtered 1 item"
);
assert_eq!(
step_execution.process_count, 3,
"should have processed 3 items"
);
assert_eq!(step_execution.write_count, 3, "should have written 3 items");
Ok(())
}
#[test]
fn step_should_not_call_writer_when_all_items_filtered() -> Result<()> {
let mut i = 0u16;
let mut reader = MockTestItemReader::default();
reader
.expect_read()
.returning(move || mock_read(&mut i, 0, 3));
let mut processor = MockTestProcessor::default();
processor.expect_process().returning(|_| Ok(None));
let mut writer = MockTestItemWriter::default();
writer.expect_open().times(1).returning(|| Ok(()));
writer.expect_write().never(); writer.expect_close().times(1).returning(|| Ok(()));
let step = StepBuilder::new("test")
.chunk(10)
.reader(&reader)
.processor(&processor)
.writer(&writer)
.build();
let mut step_execution = StepExecution::new(&step.name);
let result = step.execute(&mut step_execution);
assert!(result.is_ok());
assert_eq!(
step_execution.filter_count, 3,
"all 3 items should be filtered"
);
assert_eq!(
step_execution.process_count, 0,
"no items should reach process_count"
);
assert_eq!(step_execution.write_count, 0, "nothing should be written");
Ok(())
}
}