mod common;
use common::MockFile;
use ::serde::ser::Error;
use rand::distr::{Alphanumeric, SampleString};
use serde::{Deserialize, Serialize, Serializer};
use std::{
env::temp_dir,
fs::{self},
io::{self, ErrorKind},
};
use spring_batch_rs::{
core::{
item::{ItemProcessor, ItemProcessorResult},
job::{Job, JobBuilder},
step::StepBuilder,
},
item::{csv::csv_reader::CsvItemReaderBuilder, json::json_writer::JsonItemWriterBuilder},
};
use time::{Date, format_description};
fn date_serializer<S>(date: &Date, serializer: S) -> Result<S::Ok, S::Error>
where
S: Serializer,
{
let result = format_description::parse("[year]-[month]-[day]");
match result {
Ok(format) => {
let s = date.format(&format).unwrap();
serializer.serialize_str(&s)
}
Err(error) => Err(Error::custom(error.to_string())),
}
}
#[derive(Serialize, Deserialize, Clone)]
pub struct Person {
first_name: String,
last_name: String,
title: String,
email: String,
#[serde(serialize_with = "date_serializer")]
birth_date: Date,
}
#[derive(Deserialize, Serialize, Debug, Clone)]
struct Car {
year: u16,
make: String,
model: String,
description: String,
}
#[derive(Default)]
struct CarProcessor;
impl ItemProcessor<Car, Car> for CarProcessor {
fn process(&self, item: &Car) -> ItemProcessorResult<Car> {
Ok(Some(item.clone()))
}
}
#[test]
fn transform_csv_stream_to_json_file_with_error_at_first() {
let csv = "year,make,model,description
1948d,Porsche,356,Luxury sports car
2011,Peugeot,206+,City car
2012,Citroën,C4 Picasso,SUV
2021,Mazda,CX-30,SUV Compact
1967,Ford,Mustang fastback 1967,American car";
let reader = CsvItemReaderBuilder::<Car>::new()
.has_headers(true)
.from_reader(csv.as_bytes());
let file_name = Alphanumeric.sample_string(&mut rand::rng(), 16);
let writer = JsonItemWriterBuilder::new().from_path(temp_dir().join(file_name.clone()));
let step = StepBuilder::new("test")
.chunk::<Car, Car>(3)
.reader(&reader)
.processor(&CarProcessor)
.writer(&writer)
.build();
let job = JobBuilder::new().start(&step).build();
let result = job.run();
assert!(result.is_err());
let file_content = fs::read_to_string(temp_dir().join(file_name))
.expect("Should have been able to read the file");
assert_eq!(
file_content,
r#"[]
"#
);
}
#[test]
fn transform_csv_stream_to_json_file_with_error_at_end() {
let csv = "year,make,model,description
1948,Porsche,356,Luxury sports car
2011,Peugeot,206+,City car
2012,Citroën,C4 Picasso,SUV
2021,Mazda,CX-30,SUV Compact
1967d,Ford,Mustang fastback 1967,American car";
let reader = CsvItemReaderBuilder::<Car>::new()
.has_headers(true)
.from_reader(csv.as_bytes());
let file_name = Alphanumeric.sample_string(&mut rand::rng(), 16);
let writer = JsonItemWriterBuilder::new().from_path(temp_dir().join(file_name.clone()));
let step = StepBuilder::new("test")
.chunk::<Car, Car>(3)
.reader(&reader)
.processor(&CarProcessor)
.writer(&writer)
.build();
let job = JobBuilder::new().start(&step).build();
let result = job.run();
assert!(result.is_err());
let file_content = fs::read_to_string(temp_dir().join(file_name))
.expect("Should have been able to read the file");
assert_eq!(
file_content,
r#"[{"year":1948,"make":"Porsche","model":"356","description":"Luxury sports car"},{"year":2011,"make":"Peugeot","model":"206+","description":"City car"},{"year":2012,"make":"Citroën","model":"C4 Picasso","description":"SUV"}]
"#
);
}
#[test]
fn transform_csv_stream_to_writer_with_error() {
let _ = env_logger::builder().is_test(true).try_init();
use std::println as log;
log!("Starting transform_csv_stream_to_writer_with_error test");
let csv = "year,make,model,description
1948,Porsche,356,Luxury sports car";
let reader = CsvItemReaderBuilder::<Car>::new()
.has_headers(true)
.from_reader(csv.as_bytes());
let mut file = MockFile::default();
let write_count = std::sync::Arc::new(std::sync::atomic::AtomicUsize::new(0));
let write_count_clone = write_count.clone();
file.expect_write().returning(move |_buf| {
let count = write_count_clone.fetch_add(1, std::sync::atomic::Ordering::SeqCst) + 1;
log!("Write called #{}, returning error", count);
let err = io::Error::from(ErrorKind::PermissionDenied);
Result::Err(err)
});
let writer = JsonItemWriterBuilder::new().from_writer(file);
let step = StepBuilder::new("test")
.chunk(1)
.reader(&reader)
.processor(&CarProcessor)
.writer(&writer)
.build();
let job = JobBuilder::new().start(&step).build();
let result = job.run();
assert!(result.is_ok());
assert!(
write_count.load(std::sync::atomic::Ordering::SeqCst) > 0,
"Write should have been called at least once"
);
}