use std::{
cell::{Cell, RefCell},
io::{BufRead, BufReader, Read},
marker::PhantomData,
};
use log::debug;
use serde::de::DeserializeOwned;
use crate::{
BatchError,
core::item::{ItemReader, ItemReaderResult},
};
/// Reasons why scanning the current buffer stopped without producing an item.
#[derive(Debug)]
enum JsonParserResult {
    /// A complete object was extracted, but `serde_json` could not deserialize
    /// it into the target type.
    ParsingError { error: serde_json::Error },
    /// The scan of the current buffer finished without yielding a complete
    /// object; the caller refills the buffer and continues.
    NotEnded,
}
pub struct JsonItemReader<I, R: Read> {
pd: PhantomData<I>,
reader: RefCell<BufReader<R>>,
capacity: usize,
level: Cell<u16>,
index: Cell<usize>,
object: RefCell<Vec<u8>>,
}
impl<I: DeserializeOwned, R: Read> JsonItemReader<I, R> {
fn new(rdr: R, capacity: usize) -> Self {
let buf_reader = BufReader::with_capacity(capacity, rdr);
Self {
pd: PhantomData,
reader: RefCell::new(buf_reader),
capacity,
level: Cell::new(0),
index: Cell::new(0),
object: RefCell::new(Vec::new()),
}
}
fn get_current_char(&self, buffer: &[u8]) -> u8 {
buffer[self.index.get()]
}
fn is_new_seq(&self, buffer: &[u8]) -> bool {
self.level == 0.into() && self.get_current_char(buffer) == b'['
}
fn is_end_seq(&self, buffer: &[u8]) -> bool {
self.level == 0.into() && self.get_current_char(buffer) == b']'
}
fn is_new_object(&self, buffer: &[u8]) -> bool {
self.level == 0.into() && self.get_current_char(buffer) == b'{'
}
fn is_end_object(&self, buffer: &[u8]) -> bool {
self.level == 1.into() && self.get_current_char(buffer) == b'}'
}
fn start_new(&self) {
self.object.borrow_mut().clear();
}
fn append_char(&self, buffer: &[u8]) {
let current_char = self.get_current_char(buffer);
if current_char != b' ' && current_char != b'\n' {
self.object.borrow_mut().push(self.get_current_char(buffer));
}
}
fn clear_buff(&self) {
self.index.set(0);
}
fn level_inc(&self) {
self.level.set(self.level.get() + 1);
}
fn level_dec(&self) {
self.level.set(self.level.get() - 1);
}
fn index_inc(&self) {
self.index.set(self.index.get() + 1);
}
fn next(&self, buffer: &[u8]) -> Result<I, JsonParserResult> {
while self.index.get() < buffer.len() - 1 && !self.is_end_seq(buffer) {
if self.is_new_object(buffer) {
self.start_new();
} else if self.is_new_seq(buffer) {
self.index_inc();
continue;
}
let current_char = self.get_current_char(buffer);
if current_char == b'{' {
self.level_inc();
} else if current_char == b'}' {
self.level_dec();
}
self.append_char(buffer);
self.index_inc();
if self.is_end_object(buffer) {
self.append_char(buffer);
let result = serde_json::from_slice(self.object.borrow_mut().as_slice());
debug!(
"object ok: {}",
std::str::from_utf8(self.object.borrow().as_slice()).unwrap()
);
return match result {
Ok(record) => Ok(record),
Err(error) => Err(JsonParserResult::ParsingError { error }),
};
}
}
self.append_char(buffer);
Err(JsonParserResult::NotEnded)
}
}
impl<I: DeserializeOwned, R: Read> ItemReader<I> for JsonItemReader<I, R> {
fn read(&self) -> ItemReaderResult<I> {
let mut buf_reader = self.reader.borrow_mut();
loop {
let buffer = &mut buf_reader.fill_buf().unwrap();
let buffer_length = buffer.len();
if buffer_length == 0 {
return Ok(None);
}
let result: Result<I, JsonParserResult> = self.next(buffer);
if let Ok(record) = result {
return Ok(Some(record));
} else if let Err(error) = result {
match error {
JsonParserResult::NotEnded => {
self.clear_buff();
buf_reader.consume(self.capacity)
}
JsonParserResult::ParsingError { error } => {
return Err(BatchError::ItemReader(error.to_string()));
}
}
}
}
}
}
/// Configures and builds [`JsonItemReader`] instances.
pub struct JsonItemReaderBuilder<I> {
    _pd: PhantomData<I>,
    /// Read-buffer size in bytes handed to the reader.
    capacity: Option<usize>,
}

/// Equivalent to `JsonItemReaderBuilder::new()`: an 8 KiB read buffer.
///
/// This impl is written by hand because `#[derive(Default)]` would leave
/// `capacity` as `None` (making `from_reader` panic). The derive would also
/// require `I: Default`.
impl<I> Default for JsonItemReaderBuilder<I> {
    fn default() -> Self {
        Self {
            _pd: PhantomData,
            capacity: Some(8 * 1024),
        }
    }
}
impl<I: DeserializeOwned> JsonItemReaderBuilder<I> {
    /// Read-buffer size used when no capacity has been configured (8 KiB).
    const DEFAULT_CAPACITY: usize = 8 * 1024;

    /// Creates a builder whose readers use an 8 KiB read buffer.
    pub fn new() -> JsonItemReaderBuilder<I> {
        Self {
            _pd: PhantomData,
            capacity: Some(Self::DEFAULT_CAPACITY),
        }
    }

    /// Sets the size, in bytes, of the buffer used to read the underlying input.
    #[must_use]
    pub fn capacity(mut self, capacity: usize) -> JsonItemReaderBuilder<I> {
        self.capacity = Some(capacity);
        self
    }

    /// Builds a [`JsonItemReader`] that reads JSON from `rdr`.
    ///
    /// If no capacity is set (`capacity` is `None`), the 8 KiB default is used
    /// instead of panicking.
    pub fn from_reader<R: Read>(self, rdr: R) -> JsonItemReader<I, R> {
        JsonItemReader::new(rdr, self.capacity.unwrap_or(Self::DEFAULT_CAPACITY))
    }
}
#[cfg(test)]
mod tests {
    use std::{error::Error, io::Cursor};

    use crate::{
        core::item::{ItemReader, ItemReaderResult},
        item::{fake::person_reader::Person, json::json_reader::JsonItemReaderBuilder},
    };

    // Four person records laid out as a multi-line JSON array.
    const PERSONS_JSON: &str = r#"[
{"first_name": "Océane", "last_name": "Dupond", "title": "Mr.", "email": "leopold_enim@orange.fr", "birth_date": "1963-05-16"},
{"first_name": "Amandine", "last_name": "Évrat", "title": "Mrs.", "email": "amandine_iure@outlook.fr", "birth_date": "1933-07-12"},
{"first_name": "Ugo", "last_name": "Niels", "title": "Sir.", "email": "xavier_voluptatem@sfr.fr", "birth_date": "1980-04-05"},
{"first_name": "Léo", "last_name": "Zola", "title": "Dr.", "email": "ugo_praesentium@orange.fr", "birth_date": "1914-08-13"}
]"#;

    /// Each array element comes back in order, followed by an end-of-input `None`.
    #[test]
    fn content_from_reader_should_be_deserialized() -> Result<(), Box<dyn Error>> {
        let reader = JsonItemReaderBuilder::new()
            .capacity(320)
            .from_reader(Cursor::new(PERSONS_JSON));

        let expected_persons = [
            "first_name:Océane, last_name:Dupond, birth_date:1963-05-16",
            "first_name:Amandine, last_name:Évrat, birth_date:1933-07-12",
            "first_name:Ugo, last_name:Niels, birth_date:1980-04-05",
            "first_name:Léo, last_name:Zola, birth_date:1914-08-13",
        ];
        for expected in expected_persons {
            let outcome: ItemReaderResult<Person> = reader.read();
            assert!(outcome.is_ok());
            let person = outcome.unwrap().unwrap();
            assert_eq!(expected, person.to_string());
        }

        let outcome: ItemReaderResult<Person> = reader.read();
        assert!(outcome.is_ok());
        assert!(outcome.unwrap().is_none());
        Ok(())
    }

    /// An object whose fields do not match the target type yields an `ItemReader` error.
    #[test]
    fn should_return_error_when_json_object_fails_to_deserialize() {
        use crate::BatchError;
        use serde::Deserialize;

        #[derive(Debug, Deserialize)]
        struct StrictItem {
            // Only ever populated by serde; the test never reads it.
            #[allow(dead_code)]
            id: u32,
        }

        let reader = JsonItemReaderBuilder::<StrictItem>::new()
            .from_reader(Cursor::new(r#"[{"wrong_field": 42}]"#));
        let outcome = reader.read();

        assert!(
            outcome.is_err(),
            "should fail when JSON doesn't match target type"
        );
        match outcome {
            Err(BatchError::ItemReader(_)) => {}
            other => panic!("expected ItemReader error, got {other:?}"),
        }
    }

    /// A tiny buffer forces objects to be assembled across several refills.
    #[test]
    fn should_handle_object_spanning_multiple_buffer_reads() {
        use serde::Deserialize;

        #[derive(Deserialize, PartialEq, Debug)]
        struct Item {
            id: u32,
            name: String,
        }

        let reader = JsonItemReaderBuilder::<Item>::new()
            .capacity(10)
            .from_reader(Cursor::new(
                r#"[{"id":1,"name":"Alice"},{"id":2,"name":"Bob"}]"#,
            ));

        for (expected_id, expected_name) in [(1, "Alice"), (2, "Bob")] {
            let item = reader.read().unwrap().unwrap();
            assert_eq!(item.id, expected_id);
            assert_eq!(item.name, expected_name);
        }
        assert!(reader.read().unwrap().is_none());
    }

    /// Input without any JSON object produces no item.
    #[test]
    fn content_from_bytes_should_be_deserialized() -> Result<(), Box<dyn Error>> {
        let reader = JsonItemReaderBuilder::new()
            .capacity(320)
            .from_reader(Cursor::new(String::from("foo\nbar\nbaz\n")));

        let outcome: ItemReaderResult<Person> = reader.read();
        assert!(outcome.is_ok());
        assert!(outcome.unwrap().is_none());
        Ok(())
    }
}