spring_batch_rs/item/json/
json_reader.rs

1use std::{
2    cell::{Cell, RefCell},
3    io::{BufRead, BufReader, Read},
4    marker::PhantomData,
5};
6
7use log::debug;
8use serde::de::DeserializeOwned;
9
10use crate::{
11    core::item::{ItemReader, ItemReaderResult},
12    BatchError,
13};
14
15#[derive(Debug)]
16enum JsonParserResult {
17    NotEnded,
18    ParsingError { error: serde_json::Error },
19}
20
21pub struct JsonItemReader<R, T> {
22    pd: PhantomData<T>,
23    reader: RefCell<BufReader<R>>,
24    capacity: usize,
25    level: Cell<u16>,
26    index: Cell<usize>,
27    object: RefCell<Vec<u8>>,
28}
29
30impl<R: Read, T: DeserializeOwned> JsonItemReader<R, T> {
31    fn new(rdr: R, capacity: usize) -> Self {
32        let buf_reader = BufReader::with_capacity(capacity, rdr);
33
34        Self {
35            pd: PhantomData,
36            reader: RefCell::new(buf_reader),
37            capacity,
38            level: Cell::new(0),
39            index: Cell::new(0),
40            object: RefCell::new(Vec::new()),
41        }
42    }
43
44    fn get_current_char(&self, buffer: &[u8]) -> u8 {
45        buffer[self.index.get()]
46    }
47
48    fn is_new_seq(&self, buffer: &[u8]) -> bool {
49        self.level == 0.into() && self.get_current_char(buffer) == b'['
50    }
51
52    fn is_end_seq(&self, buffer: &[u8]) -> bool {
53        self.level == 0.into() && self.get_current_char(buffer) == b']'
54    }
55
56    fn is_new_object(&self, buffer: &[u8]) -> bool {
57        self.level == 0.into() && self.get_current_char(buffer) == b'{'
58    }
59
60    fn is_end_object(&self, buffer: &[u8]) -> bool {
61        self.level == 1.into() && self.get_current_char(buffer) == b'}'
62    }
63
64    fn start_new(&self) {
65        self.object.borrow_mut().clear();
66    }
67
68    fn append_char(&self, buffer: &[u8]) {
69        let current_char = self.get_current_char(buffer);
70        if current_char != b' ' && current_char != b'\n' {
71            self.object.borrow_mut().push(self.get_current_char(buffer));
72        }
73    }
74
75    fn clear_buff(&self) {
76        self.index.set(0);
77    }
78
79    fn level_inc(&self) {
80        self.level.set(self.level.get() + 1);
81    }
82
83    fn level_dec(&self) {
84        self.level.set(self.level.get() - 1);
85    }
86
87    fn index_inc(&self) {
88        self.index.set(self.index.get() + 1);
89    }
90
91    fn next(&self, buffer: &[u8]) -> Result<T, JsonParserResult> {
92        while self.index.get() < buffer.len() - 1 && !self.is_end_seq(buffer) {
93            if self.is_new_object(buffer) {
94                self.start_new();
95            } else if self.is_new_seq(buffer) {
96                self.index_inc();
97                continue;
98            }
99
100            let current_char = self.get_current_char(buffer);
101
102            if current_char == b'{' {
103                self.level_inc();
104            } else if current_char == b'}' {
105                self.level_dec();
106            }
107
108            self.append_char(buffer);
109
110            self.index_inc();
111
112            if self.is_end_object(buffer) {
113                self.append_char(buffer);
114
115                let result = serde_json::from_slice(self.object.borrow_mut().as_slice());
116                debug!(
117                    "object ok: {}",
118                    std::str::from_utf8(self.object.borrow().as_slice()).unwrap()
119                );
120                return match result {
121                    Ok(record) => Ok(record),
122                    Err(error) => Err(JsonParserResult::ParsingError { error }),
123                };
124            }
125        }
126
127        self.append_char(buffer);
128        Err(JsonParserResult::NotEnded)
129    }
130}
131
132impl<R: Read, T: DeserializeOwned> ItemReader<T> for JsonItemReader<R, T> {
133    fn read(&self) -> ItemReaderResult<T> {
134        let mut buf_reader = self.reader.borrow_mut();
135
136        loop {
137            let buffer = &mut buf_reader.fill_buf().unwrap();
138
139            let buffer_length = buffer.len();
140
141            if buffer_length == 0 {
142                return Ok(None);
143            }
144
145            let result: Result<T, JsonParserResult> = self.next(buffer);
146
147            if let Ok(record) = result {
148                return Ok(Some(record));
149            } else if let Err(error) = result {
150                match error {
151                    JsonParserResult::NotEnded => {
152                        self.clear_buff();
153                        buf_reader.consume(self.capacity)
154                    }
155                    JsonParserResult::ParsingError { error } => {
156                        return Err(BatchError::ItemReader(error.to_string()))
157                    }
158                }
159            }
160        }
161    }
162}
163
164#[derive(Default)]
165pub struct JsonItemReaderBuilder<T> {
166    _pd: PhantomData<T>,
167    capacity: Option<usize>,
168}
169
170impl<T: DeserializeOwned> JsonItemReaderBuilder<T> {
171    pub fn new() -> JsonItemReaderBuilder<T> {
172        Self {
173            _pd: PhantomData,
174            capacity: Some(8 * 1024),
175        }
176    }
177
178    pub fn capacity(mut self, capacity: usize) -> JsonItemReaderBuilder<T> {
179        self.capacity = Some(capacity);
180        self
181    }
182
183    pub fn from_reader<R: Read>(self, rdr: R) -> JsonItemReader<R, T> {
184        JsonItemReader::new(rdr, self.capacity.unwrap())
185    }
186}
187
#[cfg(test)]
mod tests {
    use std::{error::Error, fs::File, io::Cursor, path::Path};

    use crate::{
        core::item::{ItemReader, ItemReaderResult},
        item::{fake::person_reader::Person, json::json_reader::JsonItemReaderBuilder},
    };

    /// Reads the first four persons of the example file with a buffer small
    /// enough (320 bytes) that chunk boundaries are crossed.
    #[test]
    fn content_from_file_should_be_deserialized() -> Result<(), Box<dyn Error>> {
        // The fixture is only read, so open it read-only: no write permission
        // on the file is needed to run the test.
        let file = File::open(Path::new("examples/data/persons.json"))?;

        let reader = JsonItemReaderBuilder::new().capacity(320).from_reader(file);

        let expected = [
            "first_name:Océane, last_name:Dupond, birth_date:1963-05-16",
            "first_name:Amandine, last_name:Évrat, birth_date:1933-07-12",
            "first_name:Ugo, last_name:Niels, birth_date:1980-04-05",
            "first_name:Léo, last_name:Zola, birth_date:1914-08-13",
        ];

        for expected_person in &expected {
            let result: ItemReaderResult<Person> = reader.read();

            assert!(result.is_ok());
            assert_eq!(*expected_person, result.unwrap().unwrap().to_string());
        }

        Ok(())
    }

    /// Input that contains no JSON object at all must be reported as an
    /// exhausted source, not as an error.
    #[test]
    fn content_from_bytes_should_be_deserialized() -> Result<(), Box<dyn Error>> {
        let input = Cursor::new(String::from("foo\nbar\nbaz\n"));

        let reader = JsonItemReaderBuilder::new()
            .capacity(320)
            .from_reader(input);

        let result: ItemReaderResult<Person> = reader.read();

        assert!(matches!(result, Ok(None)));

        Ok(())
    }
}
256}