Skip to main content

jetro_core/io/
ndjson_driver.rs

1use super::ndjson::{non_ws_range, strip_initial_bom, trim_line_ending, NdjsonOptions};
2use super::ndjson_frame::{frame_payload, FramePayload, NdjsonRowFrame};
3use super::RowError;
4use memchr::memchr;
5use std::io::BufRead;
6
7/// Forward-only per-row NDJSON reader.
8pub struct NdjsonPerRowDriver<R> {
9    pub(super) reader: R,
10    pub(super) line_no: u64,
11    pub(super) max_line_len: usize,
12    pub(super) row_frame: NdjsonRowFrame,
13}
14
15impl<R: BufRead> NdjsonPerRowDriver<R> {
16    pub fn new(reader: R) -> Self {
17        Self {
18            reader,
19            line_no: 0,
20            max_line_len: super::ndjson::DEFAULT_MAX_LINE_LEN,
21            row_frame: NdjsonRowFrame::JsonLine,
22        }
23    }
24
25    pub fn with_options(mut self, options: NdjsonOptions) -> Self {
26        self.max_line_len = options.max_line_len;
27        self.row_frame = options.row_frame;
28        self
29    }
30
31    pub fn with_max_line_len(mut self, max_line_len: usize) -> Self {
32        self.max_line_len = max_line_len;
33        self
34    }
35
36    pub fn with_row_frame(mut self, row_frame: NdjsonRowFrame) -> Self {
37        self.row_frame = row_frame;
38        self
39    }
40
41    pub fn line_no(&self) -> u64 {
42        self.line_no
43    }
44
45    pub fn read_next_nonempty<'a>(
46        &mut self,
47        buf: &'a mut Vec<u8>,
48    ) -> Result<Option<(u64, &'a [u8])>, RowError> {
49        loop {
50            buf.clear();
51            let read = self.read_physical_line(buf)?;
52            if read == 0 {
53                return Ok(None);
54            }
55            self.line_no += 1;
56
57            strip_initial_bom(self.line_no, buf);
58            trim_line_ending(buf);
59
60            let (start, end) = non_ws_range(buf);
61            if start == end {
62                continue;
63            }
64
65            let len = end - start;
66            if len > self.max_line_len {
67                return Err(RowError::LineTooLarge {
68                    line_no: self.line_no,
69                    len,
70                    max: self.max_line_len,
71                });
72            }
73
74            match frame_payload(self.row_frame, self.line_no, &buf[start..end])? {
75                FramePayload::Data(range) => {
76                    return Ok(Some((
77                        self.line_no,
78                        &buf[start + range.start..start + range.end],
79                    )));
80                }
81                FramePayload::Skip => continue,
82            }
83        }
84    }
85
86    pub fn read_next_owned(
87        &mut self,
88        buf: &mut Vec<u8>,
89    ) -> Result<Option<(u64, Vec<u8>)>, RowError> {
90        loop {
91            buf.clear();
92            let read = self.read_physical_line(buf)?;
93            if read == 0 {
94                return Ok(None);
95            }
96            self.line_no += 1;
97
98            strip_initial_bom(self.line_no, buf);
99            trim_line_ending(buf);
100
101            let (start, end) = non_ws_range(buf);
102            if start == end {
103                continue;
104            }
105
106            let len = end - start;
107            if len > self.max_line_len {
108                return Err(RowError::LineTooLarge {
109                    line_no: self.line_no,
110                    len,
111                    max: self.max_line_len,
112                });
113            }
114
115            let payload = match frame_payload(self.row_frame, self.line_no, &buf[start..end])? {
116                FramePayload::Data(range) => start + range.start..start + range.end,
117                FramePayload::Skip => continue,
118            };
119            if payload.start > 0 || payload.end < buf.len() {
120                buf.copy_within(payload.clone(), 0);
121                buf.truncate(payload.end - payload.start);
122            }
123
124            let capacity = buf.capacity();
125            return Ok(Some((
126                self.line_no,
127                std::mem::replace(buf, Vec::with_capacity(capacity)),
128            )));
129        }
130    }
131
132    pub(super) fn read_physical_line(&mut self, buf: &mut Vec<u8>) -> Result<usize, RowError> {
133        loop {
134            let available = self.reader.fill_buf()?;
135            if available.is_empty() {
136                return Ok(buf.len());
137            }
138
139            if let Some(pos) = memchr(b'\n', available) {
140                buf.extend_from_slice(&available[..=pos]);
141                self.reader.consume(pos + 1);
142                self.check_physical_line_len(buf.len())?;
143                return Ok(buf.len());
144            }
145
146            let len = available.len();
147            buf.extend_from_slice(available);
148            self.reader.consume(len);
149            self.check_physical_line_len(buf.len())?;
150        }
151    }
152
153    fn check_physical_line_len(&self, len: usize) -> Result<(), RowError> {
154        let hard_max = self.max_line_len.saturating_add(2);
155        if len > hard_max {
156            return Err(RowError::LineTooLarge {
157                line_no: self.line_no + 1,
158                len,
159                max: self.max_line_len,
160            });
161        }
162        Ok(())
163    }
164}