jetro_core/io/
ndjson_driver.rs1use super::ndjson::{non_ws_range, strip_initial_bom, trim_line_ending, NdjsonOptions};
2use super::ndjson_frame::{frame_payload, FramePayload, NdjsonRowFrame};
3use super::RowError;
4use memchr::memchr;
5use std::io::BufRead;
6
7pub struct NdjsonPerRowDriver<R> {
9 pub(super) reader: R,
10 pub(super) line_no: u64,
11 pub(super) max_line_len: usize,
12 pub(super) row_frame: NdjsonRowFrame,
13}
14
15impl<R: BufRead> NdjsonPerRowDriver<R> {
16 pub fn new(reader: R) -> Self {
17 Self {
18 reader,
19 line_no: 0,
20 max_line_len: super::ndjson::DEFAULT_MAX_LINE_LEN,
21 row_frame: NdjsonRowFrame::JsonLine,
22 }
23 }
24
25 pub fn with_options(mut self, options: NdjsonOptions) -> Self {
26 self.max_line_len = options.max_line_len;
27 self.row_frame = options.row_frame;
28 self
29 }
30
31 pub fn with_max_line_len(mut self, max_line_len: usize) -> Self {
32 self.max_line_len = max_line_len;
33 self
34 }
35
36 pub fn with_row_frame(mut self, row_frame: NdjsonRowFrame) -> Self {
37 self.row_frame = row_frame;
38 self
39 }
40
41 pub fn line_no(&self) -> u64 {
42 self.line_no
43 }
44
45 pub fn read_next_nonempty<'a>(
46 &mut self,
47 buf: &'a mut Vec<u8>,
48 ) -> Result<Option<(u64, &'a [u8])>, RowError> {
49 loop {
50 buf.clear();
51 let read = self.read_physical_line(buf)?;
52 if read == 0 {
53 return Ok(None);
54 }
55 self.line_no += 1;
56
57 strip_initial_bom(self.line_no, buf);
58 trim_line_ending(buf);
59
60 let (start, end) = non_ws_range(buf);
61 if start == end {
62 continue;
63 }
64
65 let len = end - start;
66 if len > self.max_line_len {
67 return Err(RowError::LineTooLarge {
68 line_no: self.line_no,
69 len,
70 max: self.max_line_len,
71 });
72 }
73
74 match frame_payload(self.row_frame, self.line_no, &buf[start..end])? {
75 FramePayload::Data(range) => {
76 return Ok(Some((
77 self.line_no,
78 &buf[start + range.start..start + range.end],
79 )));
80 }
81 FramePayload::Skip => continue,
82 }
83 }
84 }
85
86 pub fn read_next_owned(
87 &mut self,
88 buf: &mut Vec<u8>,
89 ) -> Result<Option<(u64, Vec<u8>)>, RowError> {
90 loop {
91 buf.clear();
92 let read = self.read_physical_line(buf)?;
93 if read == 0 {
94 return Ok(None);
95 }
96 self.line_no += 1;
97
98 strip_initial_bom(self.line_no, buf);
99 trim_line_ending(buf);
100
101 let (start, end) = non_ws_range(buf);
102 if start == end {
103 continue;
104 }
105
106 let len = end - start;
107 if len > self.max_line_len {
108 return Err(RowError::LineTooLarge {
109 line_no: self.line_no,
110 len,
111 max: self.max_line_len,
112 });
113 }
114
115 let payload = match frame_payload(self.row_frame, self.line_no, &buf[start..end])? {
116 FramePayload::Data(range) => start + range.start..start + range.end,
117 FramePayload::Skip => continue,
118 };
119 if payload.start > 0 || payload.end < buf.len() {
120 buf.copy_within(payload.clone(), 0);
121 buf.truncate(payload.end - payload.start);
122 }
123
124 let capacity = buf.capacity();
125 return Ok(Some((
126 self.line_no,
127 std::mem::replace(buf, Vec::with_capacity(capacity)),
128 )));
129 }
130 }
131
132 pub(super) fn read_physical_line(&mut self, buf: &mut Vec<u8>) -> Result<usize, RowError> {
133 loop {
134 let available = self.reader.fill_buf()?;
135 if available.is_empty() {
136 return Ok(buf.len());
137 }
138
139 if let Some(pos) = memchr(b'\n', available) {
140 buf.extend_from_slice(&available[..=pos]);
141 self.reader.consume(pos + 1);
142 self.check_physical_line_len(buf.len())?;
143 return Ok(buf.len());
144 }
145
146 let len = available.len();
147 buf.extend_from_slice(available);
148 self.reader.consume(len);
149 self.check_physical_line_len(buf.len())?;
150 }
151 }
152
153 fn check_physical_line_len(&self, len: usize) -> Result<(), RowError> {
154 let hard_max = self.max_line_len.saturating_add(2);
155 if len > hard_max {
156 return Err(RowError::LineTooLarge {
157 line_no: self.line_no + 1,
158 len,
159 max: self.max_line_len,
160 });
161 }
162 Ok(())
163 }
164}