Skip to main content

spring_batch_rs/item/json/
json_reader.rs

1use std::{
2    cell::{Cell, RefCell},
3    io::{BufRead, BufReader, Read},
4    marker::PhantomData,
5};
6
7use log::debug;
8use serde::de::DeserializeOwned;
9
10use crate::{
11    core::item::{ItemReader, ItemReaderResult},
12    BatchError,
13};
14
/// Reasons why a parsing step over the current input buffer produced no item.
///
/// This is the error side of the `Result` returned by the reader's internal
/// parsing step; the public `read` method translates it into either another
/// buffer refill or a `BatchError`.
#[derive(Debug)]
enum JsonParserResult {
    /// No complete JSON object could be extracted from the buffer that was
    /// handed to the parser; more input is required before an item is available
    NotEnded,
    /// A complete object was isolated but `serde_json` failed to deserialize it;
    /// carries the original `serde_json` error
    ParsingError { error: serde_json::Error },
}
23
24/// A reader that reads items from a JSON source.
25///
26/// The reader expects JSON data in an array format, where each object in the array
27/// represents a single item to be processed. It implements a streaming approach
28/// that allows reading large JSON files without loading the entire file into memory.
29///
30/// # Examples
31///
32/// ```
33/// use spring_batch_rs::item::json::JsonItemReaderBuilder;
34/// use spring_batch_rs::core::item::ItemReader;
35/// use serde::Deserialize;
36/// use std::io::Cursor;
37///
38/// // Define a structure matching our JSON format
39/// #[derive(Debug, Deserialize, PartialEq)]
40/// struct Product {
41///     id: u32,
42///     name: String,
43///     price: f64,
44/// }
45///
46/// // Create some JSON data with products
47/// let json_data = r#"[
48///   {"id": 1, "name": "Keyboard", "price": 49.99},
49///   {"id": 2, "name": "Mouse", "price": 29.99},
50///   {"id": 3, "name": "Monitor", "price": 199.99}
51/// ]"#;
52///
53/// // Create a reader using the builder
54/// let cursor = Cursor::new(json_data);
55/// let reader = JsonItemReaderBuilder::<Product>::new()
56///     .from_reader(cursor);
57///
58/// // Read all products
59/// let product1 = reader.read().unwrap().unwrap();
60/// assert_eq!(product1.id, 1);
61/// assert_eq!(product1.name, "Keyboard");
62/// assert_eq!(product1.price, 49.99);
63///
64/// let product2 = reader.read().unwrap().unwrap();
65/// assert_eq!(product2.id, 2);
66///
67/// let product3 = reader.read().unwrap().unwrap();
68/// assert_eq!(product3.id, 3);
69///
70/// // No more products
71/// assert!(reader.read().unwrap().is_none());
72/// ```
73pub struct JsonItemReader<I, R: Read> {
74    /// Phantom data to handle the generic type parameter T (item type)
75    pd: PhantomData<I>,
76    /// Buffered reader for the input source
77    reader: RefCell<BufReader<R>>,
78    /// Buffer capacity in bytes
79    capacity: usize,
80    /// Current nesting level while parsing JSON
81    level: Cell<u16>,
82    /// Current position within the buffer
83    index: Cell<usize>,
84    /// Buffer for the current JSON object being parsed
85    object: RefCell<Vec<u8>>,
86}
87
88impl<I: DeserializeOwned, R: Read> JsonItemReader<I, R> {
89    /// Creates a new JSON item reader with the specified input source and buffer capacity.
90    fn new(rdr: R, capacity: usize) -> Self {
91        let buf_reader = BufReader::with_capacity(capacity, rdr);
92
93        Self {
94            pd: PhantomData,
95            reader: RefCell::new(buf_reader),
96            capacity,
97            level: Cell::new(0),
98            index: Cell::new(0),
99            object: RefCell::new(Vec::new()),
100        }
101    }
102
103    /// Gets the character at the current index in the buffer
104    fn get_current_char(&self, buffer: &[u8]) -> u8 {
105        buffer[self.index.get()]
106    }
107
108    /// Checks if the current character is the beginning of a new JSON array
109    fn is_new_seq(&self, buffer: &[u8]) -> bool {
110        self.level == 0.into() && self.get_current_char(buffer) == b'['
111    }
112
113    /// Checks if the current character is the end of a JSON array
114    fn is_end_seq(&self, buffer: &[u8]) -> bool {
115        self.level == 0.into() && self.get_current_char(buffer) == b']'
116    }
117
118    /// Checks if the current character is the beginning of a new JSON object
119    fn is_new_object(&self, buffer: &[u8]) -> bool {
120        self.level == 0.into() && self.get_current_char(buffer) == b'{'
121    }
122
123    /// Checks if the current character is the end of a JSON object at level 1
124    /// (an object directly inside the main array)
125    fn is_end_object(&self, buffer: &[u8]) -> bool {
126        self.level == 1.into() && self.get_current_char(buffer) == b'}'
127    }
128
129    /// Clears the object buffer to start parsing a new object
130    fn start_new(&self) {
131        self.object.borrow_mut().clear();
132    }
133
134    /// Adds the current character to the object buffer, ignoring whitespace
135    fn append_char(&self, buffer: &[u8]) {
136        let current_char = self.get_current_char(buffer);
137        if current_char != b' ' && current_char != b'\n' {
138            self.object.borrow_mut().push(self.get_current_char(buffer));
139        }
140    }
141
142    /// Resets the index to the beginning of the buffer
143    fn clear_buff(&self) {
144        self.index.set(0);
145    }
146
147    /// Increments the nesting level when entering a new object or array
148    fn level_inc(&self) {
149        self.level.set(self.level.get() + 1);
150    }
151
152    /// Decrements the nesting level when exiting an object or array
153    fn level_dec(&self) {
154        self.level.set(self.level.get() - 1);
155    }
156
157    /// Moves to the next character in the buffer
158    fn index_inc(&self) {
159        self.index.set(self.index.get() + 1);
160    }
161
162    /// Attempts to read the next item from the current buffer
163    ///
164    /// This method parses the JSON buffer character by character, keeping track of nesting levels,
165    /// and tries to extract a complete JSON object. When it finds a complete object at level 1,
166    /// it deserializes it into the target type T.
167    ///
168    /// # Returns
169    /// - `Ok(T)` - Successfully parsed an item
170    /// - `Err(JsonParserResult::NotEnded)` - Need more data from the source
171    /// - `Err(JsonParserResult::ParsingError)` - Failed to parse the JSON
172    fn next(&self, buffer: &[u8]) -> Result<I, JsonParserResult> {
173        while self.index.get() < buffer.len() - 1 && !self.is_end_seq(buffer) {
174            if self.is_new_object(buffer) {
175                self.start_new();
176            } else if self.is_new_seq(buffer) {
177                self.index_inc();
178                continue;
179            }
180
181            let current_char = self.get_current_char(buffer);
182
183            if current_char == b'{' {
184                self.level_inc();
185            } else if current_char == b'}' {
186                self.level_dec();
187            }
188
189            self.append_char(buffer);
190
191            self.index_inc();
192
193            if self.is_end_object(buffer) {
194                self.append_char(buffer);
195
196                let result = serde_json::from_slice(self.object.borrow_mut().as_slice());
197                debug!(
198                    "object ok: {}",
199                    std::str::from_utf8(self.object.borrow().as_slice()).unwrap()
200                );
201                return match result {
202                    Ok(record) => Ok(record),
203                    Err(error) => Err(JsonParserResult::ParsingError { error }),
204                };
205            }
206        }
207
208        self.append_char(buffer);
209        Err(JsonParserResult::NotEnded)
210    }
211}
212
213impl<I: DeserializeOwned, R: Read> ItemReader<I> for JsonItemReader<I, R> {
214    /// Reads the next item from the JSON stream
215    ///
216    /// This method reads data from the underlying input source in chunks,
217    /// processes the buffer to find the next complete JSON object, and
218    /// deserializes it into the target type.
219    ///
220    /// # Returns
221    /// - `Ok(Some(T))` - Successfully read and deserialized an item
222    /// - `Ok(None)` - End of input reached, no more items
223    /// - `Err(BatchError)` - Error during reading or parsing
224    fn read(&self) -> ItemReaderResult<I> {
225        let mut buf_reader = self.reader.borrow_mut();
226
227        loop {
228            let buffer = &mut buf_reader.fill_buf().unwrap();
229
230            let buffer_length = buffer.len();
231
232            if buffer_length == 0 {
233                return Ok(None);
234            }
235
236            let result: Result<I, JsonParserResult> = self.next(buffer);
237
238            if let Ok(record) = result {
239                return Ok(Some(record));
240            } else if let Err(error) = result {
241                match error {
242                    JsonParserResult::NotEnded => {
243                        self.clear_buff();
244                        buf_reader.consume(self.capacity)
245                    }
246                    JsonParserResult::ParsingError { error } => {
247                        return Err(BatchError::ItemReader(error.to_string()))
248                    }
249                }
250            }
251        }
252    }
253}
254
/// A builder for creating JSON item readers.
///
/// This builder provides a convenient way to configure and create a `JsonItemReader`
/// with custom parameters like buffer capacity.
///
/// # Examples
///
/// Reading from a string:
///
/// ```
/// use spring_batch_rs::item::json::JsonItemReaderBuilder;
/// use spring_batch_rs::core::item::ItemReader;
/// use serde::Deserialize;
/// use std::io::Cursor;
///
/// #[derive(Debug, Deserialize)]
/// struct Person {
///     name: String,
///     age: u32,
///     occupation: String,
/// }
///
/// // Sample JSON data
/// let json = r#"[
///   {"name": "JohnDoe", "age": 30, "occupation": "SoftwareEngineer"},
///   {"name": "JaneSmith", "age": 28, "occupation": "DataScientist"}
/// ]"#;
///
/// // Create a reader
/// let cursor = Cursor::new(json);
/// let reader = JsonItemReaderBuilder::<Person>::new()
///     .capacity(4096)  // Set a custom buffer capacity
///     .from_reader(cursor);
///
/// // Read the items
/// let person1 = reader.read().unwrap().unwrap();
/// assert_eq!(person1.name, "JohnDoe");
/// assert_eq!(person1.age, 30);
///
/// let person2 = reader.read().unwrap().unwrap();
/// assert_eq!(person2.name, "JaneSmith");
/// assert_eq!(person2.occupation, "DataScientist");
/// ```
///
/// The builder can also be used to read from files or any other source that implements
/// the `Read` trait.
pub struct JsonItemReaderBuilder<I> {
    /// Phantom data tying the builder to the item type `I`
    _pd: PhantomData<I>,
    /// Optional buffer capacity - defaults to 8KB if not specified
    capacity: Option<usize>,
}

/// `Default` is written by hand rather than derived: a derived implementation
/// would leave `capacity` unset and would only exist for item types that are
/// themselves `Default`.
impl<I> Default for JsonItemReaderBuilder<I> {
    /// Creates a builder configured with the default 8 KB (8192 bytes) buffer capacity.
    fn default() -> Self {
        Self {
            _pd: PhantomData,
            capacity: Some(8 * 1024),
        }
    }
}
308
309impl<I: DeserializeOwned> JsonItemReaderBuilder<I> {
310    /// Creates a new JSON item reader builder with default settings.
311    ///
312    /// The default buffer capacity is 8 KB (8192 bytes).
313    ///
314    /// # Examples
315    ///
316    /// ```
317    /// use spring_batch_rs::item::json::JsonItemReaderBuilder;
318    /// use serde::Deserialize;
319    ///
320    /// #[derive(Deserialize)]
321    /// struct Record {
322    ///     id: u32,
323    ///     value: String,
324    /// }
325    ///
326    /// let builder = JsonItemReaderBuilder::<Record>::new();
327    /// ```
328    pub fn new() -> JsonItemReaderBuilder<I> {
329        Self {
330            _pd: PhantomData,
331            capacity: Some(8 * 1024),
332        }
333    }
334
335    /// Sets the buffer capacity for the JSON reader.
336    ///
337    /// A larger capacity can improve performance when reading large files,
338    /// but uses more memory.
339    ///
340    /// # Examples
341    ///
342    /// ```
343    /// use spring_batch_rs::item::json::JsonItemReaderBuilder;
344    /// use serde::Deserialize;
345    ///
346    /// #[derive(Deserialize)]
347    /// struct Record {
348    ///     id: u32,
349    ///     value: String,
350    /// }
351    ///
352    /// // Create a builder with a 16 KB buffer
353    /// let builder = JsonItemReaderBuilder::<Record>::new()
354    ///     .capacity(16 * 1024);
355    /// ```
356    pub fn capacity(mut self, capacity: usize) -> JsonItemReaderBuilder<I> {
357        self.capacity = Some(capacity);
358        self
359    }
360
361    /// Creates a JSON item reader from any source that implements the `Read` trait.
362    ///
363    /// This allows reading from files, memory buffers, network connections, etc.
364    ///
365    /// # Examples
366    ///
367    /// ```
368    /// use spring_batch_rs::item::json::JsonItemReaderBuilder;
369    /// use spring_batch_rs::core::item::ItemReader;
370    /// use serde::Deserialize;
371    /// use std::io::Cursor;
372    ///
373    /// #[derive(Debug, Deserialize)]
374    /// struct Order {
375    ///     id: String,
376    ///     customer: String,
377    ///     total: f64,
378    /// }
379    ///
380    /// // Sample JSON data
381    /// let json = r#"[
382    ///   {"id": "ORD-001", "customer": "JohnDoe", "total": 125.50},
383    ///   {"id": "ORD-002", "customer": "JaneSmith", "total": 89.99}
384    /// ]"#;
385    ///
386    /// // Create a reader from a memory buffer
387    /// let cursor = Cursor::new(json);
388    /// let reader = JsonItemReaderBuilder::<Order>::new()
389    ///     .from_reader(cursor);
390    ///
391    /// // Process the orders
392    /// let first_order = reader.read().unwrap().unwrap();
393    /// assert_eq!(first_order.id, "ORD-001");
394    /// assert_eq!(first_order.total, 125.50);
395    /// ```
396    pub fn from_reader<R: Read>(self, rdr: R) -> JsonItemReader<I, R> {
397        // Create a new JsonItemReader with the configured capacity
398        JsonItemReader::new(rdr, self.capacity.unwrap())
399    }
400}
401
#[cfg(test)]
mod tests {
    use std::{error::Error, io::Cursor};

    use crate::{
        core::item::{ItemReader, ItemReaderResult},
        item::{fake::person_reader::Person, json::json_reader::JsonItemReaderBuilder},
    };

    /// Four persons laid out one per line inside a JSON array.
    const PERSONS_JSON: &str = r#"[
  {"first_name": "Océane", "last_name": "Dupond", "title": "Mr.", "email": "leopold_enim@orange.fr", "birth_date": "1963-05-16"},
  {"first_name": "Amandine", "last_name": "Évrat", "title": "Mrs.", "email": "amandine_iure@outlook.fr", "birth_date": "1933-07-12"},
  {"first_name": "Ugo", "last_name": "Niels", "title": "Sir.", "email": "xavier_voluptatem@sfr.fr", "birth_date": "1980-04-05"},
  {"first_name": "Léo", "last_name": "Zola", "title": "Dr.", "email": "ugo_praesentium@orange.fr", "birth_date": "1914-08-13"}
]"#;

    /// Every person of the fixture is returned in order, followed by `None`.
    #[test]
    fn content_from_reader_should_be_deserialized() -> Result<(), Box<dyn Error>> {
        let source = Cursor::new(PERSONS_JSON);
        let reader = JsonItemReaderBuilder::new()
            .capacity(320)
            .from_reader(source);

        let expected_persons = [
            "first_name:Océane, last_name:Dupond, birth_date:1963-05-16",
            "first_name:Amandine, last_name:Évrat, birth_date:1933-07-12",
            "first_name:Ugo, last_name:Niels, birth_date:1980-04-05",
            "first_name:Léo, last_name:Zola, birth_date:1914-08-13",
        ];

        for expected in expected_persons {
            let result: ItemReaderResult<Person> = reader.read();
            assert!(result.is_ok());
            assert_eq!(expected, result.unwrap().unwrap().to_string());
        }

        // The array is exhausted: the reader signals the end of the input.
        let last: ItemReaderResult<Person> = reader.read();
        assert!(last.is_ok());
        assert!(last.unwrap().is_none());

        Ok(())
    }

    /// A well-formed object that does not fit the target type is an `ItemReader` error.
    #[test]
    fn should_return_error_when_json_object_fails_to_deserialize() {
        use crate::BatchError;
        use serde::Deserialize;

        #[derive(Debug, Deserialize)]
        struct StrictItem {
            #[allow(dead_code)]
            id: u32,
        }

        // Object is syntactically valid JSON but missing required field `id`
        let source = Cursor::new(r#"[{"wrong_field": 42}]"#);
        let reader = JsonItemReaderBuilder::<StrictItem>::new().from_reader(source);

        let outcome = reader.read();
        assert!(
            outcome.is_err(),
            "should fail when JSON doesn't match target type"
        );
        if !matches!(outcome, Err(BatchError::ItemReader(_))) {
            panic!("expected ItemReader error, got {outcome:?}");
        }
    }

    /// Objects larger than the buffer are reassembled across several refills.
    #[test]
    fn should_handle_object_spanning_multiple_buffer_reads() {
        use serde::Deserialize;

        #[derive(Deserialize, PartialEq, Debug)]
        struct Item {
            id: u32,
            name: String,
        }

        // Each object is ~25 bytes; capacity=10 forces NotEnded on every read
        let source = Cursor::new(r#"[{"id":1,"name":"Alice"},{"id":2,"name":"Bob"}]"#);
        let reader = JsonItemReaderBuilder::<Item>::new()
            .capacity(10)
            .from_reader(source);

        for (id, name) in [(1, "Alice"), (2, "Bob")] {
            let item = reader.read().unwrap().unwrap();
            assert_eq!(item.id, id);
            assert_eq!(item.name, name);
        }

        assert!(reader.read().unwrap().is_none());
    }

    /// Tests reading from non-JSON input
    ///
    /// The input contains no JSON object at all; the reader must report the end
    /// of the input instead of failing.
    #[test]
    fn content_from_bytes_should_be_deserialized() -> Result<(), Box<dyn Error>> {
        let source = Cursor::new(String::from("foo\nbar\nbaz\n"));
        let reader = JsonItemReaderBuilder::new()
            .capacity(320)
            .from_reader(source);

        let outcome: ItemReaderResult<Person> = reader.read();

        assert!(outcome.is_ok());
        assert!(outcome.unwrap().is_none()); // Non-JSON input yields no items

        Ok(())
    }
}