spring_batch_rs/item/json/json_reader.rs
1use std::{
2 cell::{Cell, RefCell},
3 io::{BufRead, BufReader, Read},
4 marker::PhantomData,
5};
6
7use log::debug;
8use serde::de::DeserializeOwned;
9
10use crate::{
11 core::item::{ItemReader, ItemReaderResult},
12 BatchError,
13};
14
/// Internal outcome of scanning one buffered chunk for the next JSON object.
///
/// It is only ever used as the error side of the `Result` returned by
/// `JsonItemReader::next`; the success side carries the deserialized item.
#[derive(Debug)]
enum JsonParserResult {
    /// The chunk was exhausted before a complete top-level object was found.
    /// `read` reacts by consuming the chunk and fetching more input.
    NotEnded,
    /// A complete object was extracted but `serde_json` could not deserialize it
    /// into the target type; `error` is the underlying `serde_json` error.
    ParsingError { error: serde_json::Error },
}
23
24/// A reader that reads items from a JSON source.
25///
26/// The reader expects JSON data in an array format, where each object in the array
27/// represents a single item to be processed. It implements a streaming approach
28/// that allows reading large JSON files without loading the entire file into memory.
29///
30/// # Examples
31///
32/// ```
33/// use spring_batch_rs::item::json::JsonItemReaderBuilder;
34/// use spring_batch_rs::core::item::ItemReader;
35/// use serde::Deserialize;
36/// use std::io::Cursor;
37///
38/// // Define a structure matching our JSON format
39/// #[derive(Debug, Deserialize, PartialEq)]
40/// struct Product {
41/// id: u32,
42/// name: String,
43/// price: f64,
44/// }
45///
46/// // Create some JSON data with products
47/// let json_data = r#"[
48/// {"id": 1, "name": "Keyboard", "price": 49.99},
49/// {"id": 2, "name": "Mouse", "price": 29.99},
50/// {"id": 3, "name": "Monitor", "price": 199.99}
51/// ]"#;
52///
53/// // Create a reader using the builder
54/// let cursor = Cursor::new(json_data);
55/// let reader = JsonItemReaderBuilder::<Product>::new()
56/// .from_reader(cursor);
57///
58/// // Read all products
59/// let product1 = reader.read().unwrap().unwrap();
60/// assert_eq!(product1.id, 1);
61/// assert_eq!(product1.name, "Keyboard");
62/// assert_eq!(product1.price, 49.99);
63///
64/// let product2 = reader.read().unwrap().unwrap();
65/// assert_eq!(product2.id, 2);
66///
67/// let product3 = reader.read().unwrap().unwrap();
68/// assert_eq!(product3.id, 3);
69///
70/// // No more products
71/// assert!(reader.read().unwrap().is_none());
72/// ```
73pub struct JsonItemReader<I, R: Read> {
74 /// Phantom data to handle the generic type parameter T (item type)
75 pd: PhantomData<I>,
76 /// Buffered reader for the input source
77 reader: RefCell<BufReader<R>>,
78 /// Buffer capacity in bytes
79 capacity: usize,
80 /// Current nesting level while parsing JSON
81 level: Cell<u16>,
82 /// Current position within the buffer
83 index: Cell<usize>,
84 /// Buffer for the current JSON object being parsed
85 object: RefCell<Vec<u8>>,
86}
87
88impl<I: DeserializeOwned, R: Read> JsonItemReader<I, R> {
89 /// Creates a new JSON item reader with the specified input source and buffer capacity.
90 fn new(rdr: R, capacity: usize) -> Self {
91 let buf_reader = BufReader::with_capacity(capacity, rdr);
92
93 Self {
94 pd: PhantomData,
95 reader: RefCell::new(buf_reader),
96 capacity,
97 level: Cell::new(0),
98 index: Cell::new(0),
99 object: RefCell::new(Vec::new()),
100 }
101 }
102
103 /// Gets the character at the current index in the buffer
104 fn get_current_char(&self, buffer: &[u8]) -> u8 {
105 buffer[self.index.get()]
106 }
107
108 /// Checks if the current character is the beginning of a new JSON array
109 fn is_new_seq(&self, buffer: &[u8]) -> bool {
110 self.level == 0.into() && self.get_current_char(buffer) == b'['
111 }
112
113 /// Checks if the current character is the end of a JSON array
114 fn is_end_seq(&self, buffer: &[u8]) -> bool {
115 self.level == 0.into() && self.get_current_char(buffer) == b']'
116 }
117
118 /// Checks if the current character is the beginning of a new JSON object
119 fn is_new_object(&self, buffer: &[u8]) -> bool {
120 self.level == 0.into() && self.get_current_char(buffer) == b'{'
121 }
122
123 /// Checks if the current character is the end of a JSON object at level 1
124 /// (an object directly inside the main array)
125 fn is_end_object(&self, buffer: &[u8]) -> bool {
126 self.level == 1.into() && self.get_current_char(buffer) == b'}'
127 }
128
129 /// Clears the object buffer to start parsing a new object
130 fn start_new(&self) {
131 self.object.borrow_mut().clear();
132 }
133
134 /// Adds the current character to the object buffer, ignoring whitespace
135 fn append_char(&self, buffer: &[u8]) {
136 let current_char = self.get_current_char(buffer);
137 if current_char != b' ' && current_char != b'\n' {
138 self.object.borrow_mut().push(self.get_current_char(buffer));
139 }
140 }
141
142 /// Resets the index to the beginning of the buffer
143 fn clear_buff(&self) {
144 self.index.set(0);
145 }
146
147 /// Increments the nesting level when entering a new object or array
148 fn level_inc(&self) {
149 self.level.set(self.level.get() + 1);
150 }
151
152 /// Decrements the nesting level when exiting an object or array
153 fn level_dec(&self) {
154 self.level.set(self.level.get() - 1);
155 }
156
157 /// Moves to the next character in the buffer
158 fn index_inc(&self) {
159 self.index.set(self.index.get() + 1);
160 }
161
162 /// Attempts to read the next item from the current buffer
163 ///
164 /// This method parses the JSON buffer character by character, keeping track of nesting levels,
165 /// and tries to extract a complete JSON object. When it finds a complete object at level 1,
166 /// it deserializes it into the target type T.
167 ///
168 /// # Returns
169 /// - `Ok(T)` - Successfully parsed an item
170 /// - `Err(JsonParserResult::NotEnded)` - Need more data from the source
171 /// - `Err(JsonParserResult::ParsingError)` - Failed to parse the JSON
172 fn next(&self, buffer: &[u8]) -> Result<I, JsonParserResult> {
173 while self.index.get() < buffer.len() - 1 && !self.is_end_seq(buffer) {
174 if self.is_new_object(buffer) {
175 self.start_new();
176 } else if self.is_new_seq(buffer) {
177 self.index_inc();
178 continue;
179 }
180
181 let current_char = self.get_current_char(buffer);
182
183 if current_char == b'{' {
184 self.level_inc();
185 } else if current_char == b'}' {
186 self.level_dec();
187 }
188
189 self.append_char(buffer);
190
191 self.index_inc();
192
193 if self.is_end_object(buffer) {
194 self.append_char(buffer);
195
196 let result = serde_json::from_slice(self.object.borrow_mut().as_slice());
197 debug!(
198 "object ok: {}",
199 std::str::from_utf8(self.object.borrow().as_slice()).unwrap()
200 );
201 return match result {
202 Ok(record) => Ok(record),
203 Err(error) => Err(JsonParserResult::ParsingError { error }),
204 };
205 }
206 }
207
208 self.append_char(buffer);
209 Err(JsonParserResult::NotEnded)
210 }
211}
212
213impl<I: DeserializeOwned, R: Read> ItemReader<I> for JsonItemReader<I, R> {
214 /// Reads the next item from the JSON stream
215 ///
216 /// This method reads data from the underlying input source in chunks,
217 /// processes the buffer to find the next complete JSON object, and
218 /// deserializes it into the target type.
219 ///
220 /// # Returns
221 /// - `Ok(Some(T))` - Successfully read and deserialized an item
222 /// - `Ok(None)` - End of input reached, no more items
223 /// - `Err(BatchError)` - Error during reading or parsing
224 fn read(&self) -> ItemReaderResult<I> {
225 let mut buf_reader = self.reader.borrow_mut();
226
227 loop {
228 let buffer = &mut buf_reader.fill_buf().unwrap();
229
230 let buffer_length = buffer.len();
231
232 if buffer_length == 0 {
233 return Ok(None);
234 }
235
236 let result: Result<I, JsonParserResult> = self.next(buffer);
237
238 if let Ok(record) = result {
239 return Ok(Some(record));
240 } else if let Err(error) = result {
241 match error {
242 JsonParserResult::NotEnded => {
243 self.clear_buff();
244 buf_reader.consume(self.capacity)
245 }
246 JsonParserResult::ParsingError { error } => {
247 return Err(BatchError::ItemReader(error.to_string()))
248 }
249 }
250 }
251 }
252 }
253}
254
/// A builder for creating JSON item readers.
///
/// This builder provides a convenient way to configure and create a `JsonItemReader`
/// with custom parameters like buffer capacity.
///
/// # Examples
///
/// Reading from a string:
///
/// ```
/// use spring_batch_rs::item::json::JsonItemReaderBuilder;
/// use spring_batch_rs::core::item::ItemReader;
/// use serde::Deserialize;
/// use std::io::Cursor;
///
/// #[derive(Debug, Deserialize)]
/// struct Person {
///     name: String,
///     age: u32,
///     occupation: String,
/// }
///
/// // Sample JSON data
/// let json = r#"[
///     {"name": "JohnDoe", "age": 30, "occupation": "SoftwareEngineer"},
///     {"name": "JaneSmith", "age": 28, "occupation": "DataScientist"}
/// ]"#;
///
/// // Create a reader
/// let cursor = Cursor::new(json);
/// let reader = JsonItemReaderBuilder::<Person>::new()
///     .capacity(4096) // Set a custom buffer capacity
///     .from_reader(cursor);
///
/// // Read the items
/// let person1 = reader.read().unwrap().unwrap();
/// assert_eq!(person1.name, "JohnDoe");
/// assert_eq!(person1.age, 30);
///
/// let person2 = reader.read().unwrap().unwrap();
/// assert_eq!(person2.name, "JaneSmith");
/// assert_eq!(person2.occupation, "DataScientist");
/// ```
///
/// The builder can also be used to read from files or any other source that implements
/// the `Read` trait.
#[derive(Default)]
pub struct JsonItemReaderBuilder<I> {
    /// Phantom data tying the builder to the item type `I`
    _pd: PhantomData<I>,
    /// Buffer capacity in bytes. `new()` initialises it to 8 KB (8192 bytes);
    /// the derived `Default` leaves it as `None`.
    capacity: Option<usize>,
}
308
309impl<I: DeserializeOwned> JsonItemReaderBuilder<I> {
310 /// Creates a new JSON item reader builder with default settings.
311 ///
312 /// The default buffer capacity is 8 KB (8192 bytes).
313 ///
314 /// # Examples
315 ///
316 /// ```
317 /// use spring_batch_rs::item::json::JsonItemReaderBuilder;
318 /// use serde::Deserialize;
319 ///
320 /// #[derive(Deserialize)]
321 /// struct Record {
322 /// id: u32,
323 /// value: String,
324 /// }
325 ///
326 /// let builder = JsonItemReaderBuilder::<Record>::new();
327 /// ```
328 pub fn new() -> JsonItemReaderBuilder<I> {
329 Self {
330 _pd: PhantomData,
331 capacity: Some(8 * 1024),
332 }
333 }
334
335 /// Sets the buffer capacity for the JSON reader.
336 ///
337 /// A larger capacity can improve performance when reading large files,
338 /// but uses more memory.
339 ///
340 /// # Examples
341 ///
342 /// ```
343 /// use spring_batch_rs::item::json::JsonItemReaderBuilder;
344 /// use serde::Deserialize;
345 ///
346 /// #[derive(Deserialize)]
347 /// struct Record {
348 /// id: u32,
349 /// value: String,
350 /// }
351 ///
352 /// // Create a builder with a 16 KB buffer
353 /// let builder = JsonItemReaderBuilder::<Record>::new()
354 /// .capacity(16 * 1024);
355 /// ```
356 pub fn capacity(mut self, capacity: usize) -> JsonItemReaderBuilder<I> {
357 self.capacity = Some(capacity);
358 self
359 }
360
361 /// Creates a JSON item reader from any source that implements the `Read` trait.
362 ///
363 /// This allows reading from files, memory buffers, network connections, etc.
364 ///
365 /// # Examples
366 ///
367 /// ```
368 /// use spring_batch_rs::item::json::JsonItemReaderBuilder;
369 /// use spring_batch_rs::core::item::ItemReader;
370 /// use serde::Deserialize;
371 /// use std::io::Cursor;
372 ///
373 /// #[derive(Debug, Deserialize)]
374 /// struct Order {
375 /// id: String,
376 /// customer: String,
377 /// total: f64,
378 /// }
379 ///
380 /// // Sample JSON data
381 /// let json = r#"[
382 /// {"id": "ORD-001", "customer": "JohnDoe", "total": 125.50},
383 /// {"id": "ORD-002", "customer": "JaneSmith", "total": 89.99}
384 /// ]"#;
385 ///
386 /// // Create a reader from a memory buffer
387 /// let cursor = Cursor::new(json);
388 /// let reader = JsonItemReaderBuilder::<Order>::new()
389 /// .from_reader(cursor);
390 ///
391 /// // Process the orders
392 /// let first_order = reader.read().unwrap().unwrap();
393 /// assert_eq!(first_order.id, "ORD-001");
394 /// assert_eq!(first_order.total, 125.50);
395 /// ```
396 pub fn from_reader<R: Read>(self, rdr: R) -> JsonItemReader<I, R> {
397 // Create a new JsonItemReader with the configured capacity
398 JsonItemReader::new(rdr, self.capacity.unwrap())
399 }
400}
401
#[cfg(test)]
mod tests {
    use std::{error::Error, io::Cursor};

    use crate::{
        core::item::{ItemReader, ItemReaderResult},
        item::{fake::person_reader::Person, json::json_reader::JsonItemReaderBuilder},
    };

    const PERSONS_JSON: &str = r#"[
  {"first_name": "Océane", "last_name": "Dupond", "title": "Mr.", "email": "leopold_enim@orange.fr", "birth_date": "1963-05-16"},
  {"first_name": "Amandine", "last_name": "Évrat", "title": "Mrs.", "email": "amandine_iure@outlook.fr", "birth_date": "1933-07-12"},
  {"first_name": "Ugo", "last_name": "Niels", "title": "Sir.", "email": "xavier_voluptatem@sfr.fr", "birth_date": "1980-04-05"},
  {"first_name": "Léo", "last_name": "Zola", "title": "Dr.", "email": "ugo_praesentium@orange.fr", "birth_date": "1914-08-13"}
]"#;

    /// Every person of the fixture is returned in document order, followed by `None`.
    #[test]
    fn content_from_reader_should_be_deserialized() -> Result<(), Box<dyn Error>> {
        let reader = JsonItemReaderBuilder::new()
            .capacity(320)
            .from_reader(Cursor::new(PERSONS_JSON));

        // Display form of each fixture person, in the order they appear in the array.
        let expected_people = [
            "first_name:Océane, last_name:Dupond, birth_date:1963-05-16",
            "first_name:Amandine, last_name:Évrat, birth_date:1933-07-12",
            "first_name:Ugo, last_name:Niels, birth_date:1980-04-05",
            "first_name:Léo, last_name:Zola, birth_date:1914-08-13",
        ];

        for expected in expected_people.iter() {
            let outcome: ItemReaderResult<Person> = reader.read();
            assert!(outcome.is_ok());
            assert_eq!(*expected, outcome.unwrap().unwrap().to_string());
        }

        // The array is exhausted: one more read reports end of input.
        let outcome: ItemReaderResult<Person> = reader.read();
        assert!(outcome.is_ok());
        assert!(outcome.unwrap().is_none());

        Ok(())
    }

    /// A well-formed object that does not fit the target type is reported as an
    /// `ItemReader` error.
    #[test]
    fn should_return_error_when_json_object_fails_to_deserialize() {
        use crate::BatchError;
        use serde::Deserialize;

        #[derive(Debug, Deserialize)]
        struct StrictItem {
            #[allow(dead_code)]
            id: u32,
        }

        // Object is syntactically valid JSON but missing required field `id`
        let source = Cursor::new(r#"[{"wrong_field": 42}]"#);
        let reader = JsonItemReaderBuilder::<StrictItem>::new().from_reader(source);

        let outcome = reader.read();
        assert!(
            outcome.is_err(),
            "should fail when JSON doesn't match target type"
        );
        if let Err(BatchError::ItemReader(_)) = outcome {
            return;
        }
        let other = outcome;
        panic!("expected ItemReader error, got {other:?}");
    }

    /// Objects larger than the buffer are reassembled across several chunk reads.
    #[test]
    fn should_handle_object_spanning_multiple_buffer_reads() {
        use serde::Deserialize;

        #[derive(Deserialize, PartialEq, Debug)]
        struct Item {
            id: u32,
            name: String,
        }

        // Each object is ~25 bytes; capacity=10 forces NotEnded on every read
        let json = r#"[{"id":1,"name":"Alice"},{"id":2,"name":"Bob"}]"#;
        let reader = JsonItemReaderBuilder::<Item>::new()
            .capacity(10)
            .from_reader(Cursor::new(json));

        for (expected_id, expected_name) in [(1, "Alice"), (2, "Bob")].iter() {
            let item = reader.read().unwrap().unwrap();
            assert_eq!(item.id, *expected_id);
            assert_eq!(item.name, *expected_name);
        }

        assert!(reader.read().unwrap().is_none());
    }

    /// Tests reading from non-JSON input
    ///
    /// This test verifies that the reader gracefully handles input data
    /// that isn't valid JSON without crashing.
    #[test]
    fn content_from_bytes_should_be_deserialized() -> Result<(), Box<dyn Error>> {
        let reader = JsonItemReaderBuilder::new()
            .capacity(320)
            .from_reader(Cursor::new(String::from("foo\nbar\nbaz\n")));

        let outcome: ItemReaderResult<Person> = reader.read();

        assert!(outcome.is_ok());
        // Non-JSON input yields no items
        assert!(outcome.unwrap().is_none());

        Ok(())
    }
}