use std::{
cell::RefCell,
collections::HashMap,
time::{Duration, Instant},
};
use log::info;
use uuid::Uuid;
use crate::{BatchError, core::step::StepExecution};
use super::step::Step;
type JobResult<T> = Result<T, BatchError>;
pub trait Job {
fn run(&self) -> JobResult<JobExecution>;
}
#[derive(Debug)]
pub struct JobExecution {
pub start: Instant,
pub end: Instant,
pub duration: Duration,
}
pub struct JobInstance<'a> {
id: Uuid,
name: String,
steps: Vec<&'a dyn Step>,
executions: RefCell<HashMap<String, StepExecution>>,
}
impl Job for JobInstance<'_> {
fn run(&self) -> JobResult<JobExecution> {
let start = Instant::now();
info!("Start of job: {}, id: {}", self.name, self.id);
let steps = &self.steps;
for step in steps {
let mut step_execution = StepExecution::new(step.get_name());
let result = step.execute(&mut step_execution);
self.executions
.borrow_mut()
.insert(step.get_name().to_string(), step_execution.clone());
if result.is_err() {
return Err(BatchError::Step(step_execution.name));
}
}
info!("End of job: {}, id: {}", self.name, self.id);
let job_execution = JobExecution {
start,
end: Instant::now(),
duration: start.elapsed(),
};
Ok(job_execution)
}
}
impl JobInstance<'_> {
pub fn get_step_execution(&self, name: &str) -> Option<StepExecution> {
self.executions.borrow().get(name).cloned()
}
}
#[derive(Default)]
pub struct JobBuilder<'a> {
name: Option<String>,
steps: Vec<&'a dyn Step>,
}
impl<'a> JobBuilder<'a> {
pub fn new() -> Self {
Self {
name: None,
steps: Vec::new(),
}
}
pub fn name(mut self, name: String) -> JobBuilder<'a> {
self.name = Some(name);
self
}
pub fn start(mut self, step: &'a dyn Step) -> JobBuilder<'a> {
self.steps.push(step);
self
}
pub fn next(mut self, step: &'a dyn Step) -> JobBuilder<'a> {
self.steps.push(step);
self
}
pub fn build(self) -> JobInstance<'a> {
JobInstance {
id: Uuid::new_v4(),
name: self.name.unwrap_or(Uuid::new_v4().to_string()),
steps: self.steps,
executions: RefCell::new(HashMap::new()),
}
}
}
#[cfg(test)]
mod tests {
use super::{Job, JobBuilder};
use crate::BatchError;
use crate::core::{
item::{ItemReader, ItemReaderResult, ItemWriter, ItemWriterResult, PassThroughProcessor},
step::{StepBuilder, StepStatus},
};
use mockall::mock;
mock! {
TestItemReader {}
impl ItemReader<i32> for TestItemReader {
fn read(&self) -> ItemReaderResult<i32>;
}
}
mock! {
TestItemWriter {}
impl ItemWriter<i32> for TestItemWriter {
fn open(&self) -> ItemWriterResult;
fn write(&self, items: &[i32]) -> ItemWriterResult;
fn flush(&self) -> ItemWriterResult;
fn close(&self) -> ItemWriterResult;
}
}
#[test]
fn job_should_run_steps_in_sequence() {
let mut reader = MockTestItemReader::default();
let mut call_count = 0;
reader.expect_read().returning(move || {
call_count += 1;
if call_count <= 2 {
Ok(Some(call_count))
} else {
Ok(None)
}
});
let mut writer = MockTestItemWriter::default();
writer.expect_open().times(1).returning(|| Ok(()));
writer.expect_write().returning(|_| Ok(()));
writer.expect_flush().returning(|| Ok(()));
writer.expect_close().times(1).returning(|| Ok(()));
let processor = PassThroughProcessor::<i32>::new();
let step = StepBuilder::new("test")
.chunk(2)
.reader(&reader)
.processor(&processor)
.writer(&writer)
.build();
let job = JobBuilder::new()
.name("test-job".to_string())
.start(&step)
.build();
let result = job.run();
assert!(result.is_ok());
}
#[test]
fn job_should_fail_when_step_fails() {
let mut reader = MockTestItemReader::default();
reader
.expect_read()
.returning(|| Err(BatchError::ItemReader("read error".to_string())));
let mut writer = MockTestItemWriter::default();
writer.expect_open().times(1).returning(|| Ok(()));
writer.expect_close().times(1).returning(|| Ok(()));
let processor = PassThroughProcessor::<i32>::new();
let step = StepBuilder::new("failing-step")
.chunk(2)
.reader(&reader)
.processor(&processor)
.writer(&writer)
.build();
let job = JobBuilder::new()
.name("test-job".to_string())
.start(&step)
.build();
let result = job.run();
assert!(matches!(result.unwrap_err(), BatchError::Step(name) if name == "failing-step"));
}
#[test]
fn job_should_store_step_executions() {
let mut reader = MockTestItemReader::default();
reader.expect_read().returning(|| Ok(None));
let mut writer = MockTestItemWriter::default();
writer.expect_open().times(1).returning(|| Ok(()));
writer.expect_close().times(1).returning(|| Ok(()));
let processor = PassThroughProcessor::<i32>::new();
let step = StepBuilder::new("named-step")
.chunk(2)
.reader(&reader)
.processor(&processor)
.writer(&writer)
.build();
let job = JobBuilder::new()
.name("test-job".to_string())
.start(&step)
.build();
let _ = job.run();
let execution = job.get_step_execution("named-step");
assert!(execution.is_some());
assert_eq!(execution.unwrap().status, StepStatus::Success);
}
#[test]
fn job_should_run_multiple_steps_added_with_next() {
let mut reader1 = MockTestItemReader::default();
reader1.expect_read().returning(|| Ok(None));
let mut writer1 = MockTestItemWriter::default();
writer1.expect_open().times(1).returning(|| Ok(()));
writer1.expect_close().times(1).returning(|| Ok(()));
let mut reader2 = MockTestItemReader::default();
reader2.expect_read().returning(|| Ok(None));
let mut writer2 = MockTestItemWriter::default();
writer2.expect_open().times(1).returning(|| Ok(()));
writer2.expect_close().times(1).returning(|| Ok(()));
let processor = PassThroughProcessor::<i32>::new();
let step1 = StepBuilder::new("step1")
.chunk(2)
.reader(&reader1)
.processor(&processor)
.writer(&writer1)
.build();
let step2 = StepBuilder::new("step2")
.chunk(2)
.reader(&reader2)
.processor(&processor)
.writer(&writer2)
.build();
let job = JobBuilder::new().start(&step1).next(&step2).build();
let result = job.run();
assert!(result.is_ok(), "job with two steps should succeed");
assert!(job.get_step_execution("step1").is_some());
assert!(job.get_step_execution("step2").is_some());
}
}