spring_batch_rs/item/json/
json_reader.rs1use std::{
2 cell::{Cell, RefCell},
3 io::{BufRead, BufReader, Read},
4 marker::PhantomData,
5};
6
7use log::debug;
8use serde::de::DeserializeOwned;
9
10use crate::{
11 core::item::{ItemReader, ItemReaderResult},
12 BatchError,
13};
14
15#[derive(Debug)]
16enum JsonParserResult {
17 NotEnded,
18 ParsingError { error: serde_json::Error },
19}
20
21pub struct JsonItemReader<R, T> {
22 pd: PhantomData<T>,
23 reader: RefCell<BufReader<R>>,
24 capacity: usize,
25 level: Cell<u16>,
26 index: Cell<usize>,
27 object: RefCell<Vec<u8>>,
28}
29
30impl<R: Read, T: DeserializeOwned> JsonItemReader<R, T> {
31 fn new(rdr: R, capacity: usize) -> Self {
32 let buf_reader = BufReader::with_capacity(capacity, rdr);
33
34 Self {
35 pd: PhantomData,
36 reader: RefCell::new(buf_reader),
37 capacity,
38 level: Cell::new(0),
39 index: Cell::new(0),
40 object: RefCell::new(Vec::new()),
41 }
42 }
43
44 fn get_current_char(&self, buffer: &[u8]) -> u8 {
45 buffer[self.index.get()]
46 }
47
48 fn is_new_seq(&self, buffer: &[u8]) -> bool {
49 self.level == 0.into() && self.get_current_char(buffer) == b'['
50 }
51
52 fn is_end_seq(&self, buffer: &[u8]) -> bool {
53 self.level == 0.into() && self.get_current_char(buffer) == b']'
54 }
55
56 fn is_new_object(&self, buffer: &[u8]) -> bool {
57 self.level == 0.into() && self.get_current_char(buffer) == b'{'
58 }
59
60 fn is_end_object(&self, buffer: &[u8]) -> bool {
61 self.level == 1.into() && self.get_current_char(buffer) == b'}'
62 }
63
64 fn start_new(&self) {
65 self.object.borrow_mut().clear();
66 }
67
68 fn append_char(&self, buffer: &[u8]) {
69 let current_char = self.get_current_char(buffer);
70 if current_char != b' ' && current_char != b'\n' {
71 self.object.borrow_mut().push(self.get_current_char(buffer));
72 }
73 }
74
75 fn clear_buff(&self) {
76 self.index.set(0);
77 }
78
79 fn level_inc(&self) {
80 self.level.set(self.level.get() + 1);
81 }
82
83 fn level_dec(&self) {
84 self.level.set(self.level.get() - 1);
85 }
86
87 fn index_inc(&self) {
88 self.index.set(self.index.get() + 1);
89 }
90
91 fn next(&self, buffer: &[u8]) -> Result<T, JsonParserResult> {
92 while self.index.get() < buffer.len() - 1 && !self.is_end_seq(buffer) {
93 if self.is_new_object(buffer) {
94 self.start_new();
95 } else if self.is_new_seq(buffer) {
96 self.index_inc();
97 continue;
98 }
99
100 let current_char = self.get_current_char(buffer);
101
102 if current_char == b'{' {
103 self.level_inc();
104 } else if current_char == b'}' {
105 self.level_dec();
106 }
107
108 self.append_char(buffer);
109
110 self.index_inc();
111
112 if self.is_end_object(buffer) {
113 self.append_char(buffer);
114
115 let result = serde_json::from_slice(self.object.borrow_mut().as_slice());
116 debug!(
117 "object ok: {}",
118 std::str::from_utf8(self.object.borrow().as_slice()).unwrap()
119 );
120 return match result {
121 Ok(record) => Ok(record),
122 Err(error) => Err(JsonParserResult::ParsingError { error }),
123 };
124 }
125 }
126
127 self.append_char(buffer);
128 Err(JsonParserResult::NotEnded)
129 }
130}
131
132impl<R: Read, T: DeserializeOwned> ItemReader<T> for JsonItemReader<R, T> {
133 fn read(&self) -> ItemReaderResult<T> {
134 let mut buf_reader = self.reader.borrow_mut();
135
136 loop {
137 let buffer = &mut buf_reader.fill_buf().unwrap();
138
139 let buffer_length = buffer.len();
140
141 if buffer_length == 0 {
142 return Ok(None);
143 }
144
145 let result: Result<T, JsonParserResult> = self.next(buffer);
146
147 if let Ok(record) = result {
148 return Ok(Some(record));
149 } else if let Err(error) = result {
150 match error {
151 JsonParserResult::NotEnded => {
152 self.clear_buff();
153 buf_reader.consume(self.capacity)
154 }
155 JsonParserResult::ParsingError { error } => {
156 return Err(BatchError::ItemReader(error.to_string()))
157 }
158 }
159 }
160 }
161 }
162}
163
/// Builder for [`JsonItemReader`], parameterised by the item type `T` the
/// resulting reader will produce.
#[derive(Default)]
pub struct JsonItemReaderBuilder<T> {
    /// Requested buffer capacity in bytes, if one has been set.
    capacity: Option<usize>,
    /// Marker carrying the item type; no `T` value is stored.
    _pd: PhantomData<T>,
}
169
170impl<T: DeserializeOwned> JsonItemReaderBuilder<T> {
171 pub fn new() -> JsonItemReaderBuilder<T> {
172 Self {
173 _pd: PhantomData,
174 capacity: Some(8 * 1024),
175 }
176 }
177
178 pub fn capacity(mut self, capacity: usize) -> JsonItemReaderBuilder<T> {
179 self.capacity = Some(capacity);
180 self
181 }
182
183 pub fn from_reader<R: Read>(self, rdr: R) -> JsonItemReader<R, T> {
184 JsonItemReader::new(rdr, self.capacity.unwrap())
185 }
186}
187
#[cfg(test)]
mod tests {
    use std::{error::Error, fs::File, io::Cursor, path::Path};

    use crate::{
        core::item::{ItemReader, ItemReaderResult},
        item::{fake::person_reader::Person, json::json_reader::JsonItemReaderBuilder},
    };

    /// Items stored in a JSON file are returned one per `read` call, in file
    /// order, even though the buffer (320 bytes) is smaller than the file.
    #[test]
    fn content_from_file_should_be_deserialized() -> Result<(), Box<dyn Error>> {
        let path = Path::new("examples/data/persons.json");

        // The fixture is only read, so open it read-only: this also works on
        // a read-only checkout.
        let file = File::open(path).expect("Unable to open file");

        let reader = JsonItemReaderBuilder::new().capacity(320).from_reader(file);

        let expected_persons = [
            "first_name:Océane, last_name:Dupond, birth_date:1963-05-16",
            "first_name:Amandine, last_name:Évrat, birth_date:1933-07-12",
            "first_name:Ugo, last_name:Niels, birth_date:1980-04-05",
            "first_name:Léo, last_name:Zola, birth_date:1914-08-13",
        ];

        for &expected in &expected_persons {
            let result: ItemReaderResult<Person> = reader.read();

            assert!(result.is_ok());
            assert_eq!(expected, result.unwrap().unwrap().to_string());
        }

        Ok(())
    }

    /// Input that contains no JSON object at all is not an error: the reader
    /// simply reports that there is nothing to read.
    #[test]
    fn content_from_bytes_should_be_deserialized() -> Result<(), Box<dyn Error>> {
        let input = Cursor::new(String::from("foo\nbar\nbaz\n"));

        let reader = JsonItemReaderBuilder::new()
            .capacity(320)
            .from_reader(input);

        let result: ItemReaderResult<Person> = reader.read();

        assert!(matches!(result, Ok(None)));

        Ok(())
    }
}
256}