jetro_core/io/
ndjson_frame.rs1use super::RowError;
2use memchr::memchr;
3use std::ops::Range;
4
5#[derive(Clone, Copy, Debug, Eq, PartialEq)]
6pub enum NdjsonRowFrame {
7 JsonLine,
8 DelimitedPayload {
9 separator: u8,
10 null_payload: NullPayload,
11 },
12}
13
14impl Default for NdjsonRowFrame {
15 fn default() -> Self {
16 Self::JsonLine
17 }
18}
19
/// Policy applied by a delimited frame when its trimmed payload is the literal `null`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum NullPayload {
    /// Treat the row as having nothing to parse and skip it.
    Skip,
    /// Pass the `null` payload through like any other payload.
    Keep,
    /// Reject the row with a `RowError::NullPayload` error.
    Error,
}
26
27#[derive(Clone, Debug, Eq, PartialEq)]
28pub(super) enum FramePayload {
29 Data(Range<usize>),
30 Skip,
31}
32
33#[inline]
34pub(super) fn frame_payload(
35 frame: NdjsonRowFrame,
36 line_no: u64,
37 row: &[u8],
38) -> Result<FramePayload, RowError> {
39 let range = match frame {
40 NdjsonRowFrame::JsonLine => 0..row.len(),
41 NdjsonRowFrame::DelimitedPayload { separator, .. } => {
42 let Some(sep) = memchr(separator, row) else {
43 return Ok(FramePayload::Skip);
44 };
45 sep + 1..row.len()
46 }
47 };
48 let range = trim_range(row, range);
49 if range.is_empty() {
50 return match frame {
51 NdjsonRowFrame::JsonLine => Err(RowError::EmptyPayload { line_no }),
52 NdjsonRowFrame::DelimitedPayload { .. } => Ok(FramePayload::Skip),
53 };
54 }
55
56 if let NdjsonRowFrame::DelimitedPayload { null_payload, .. } = frame {
57 if &row[range.clone()] == b"null" {
58 return match null_payload {
59 NullPayload::Skip => Ok(FramePayload::Skip),
60 NullPayload::Keep => Ok(FramePayload::Data(range)),
61 NullPayload::Error => Err(RowError::NullPayload { line_no }),
62 };
63 }
64 if !payload_starts_like_json(&row[range.clone()]) {
65 return Ok(FramePayload::Skip);
66 }
67 }
68
69 Ok(FramePayload::Data(range))
70}
71
/// Cheap first-byte heuristic: returns `true` when `payload` begins like a JSON
/// object, array, string, `true`/`false`, or number.
///
/// An empty `payload` returns `false` instead of panicking. `n` is deliberately
/// not accepted: `frame_payload` checks for an exact `null` payload before
/// calling this, so anything else starting with `n` is not treated as JSON here.
#[inline]
fn payload_starts_like_json(payload: &[u8]) -> bool {
    matches!(
        payload.first(),
        Some(b'{' | b'[' | b'"' | b't' | b'f' | b'-' | b'0'..=b'9')
    )
}
79
/// Narrows `range` (an index range into `row`) so that it excludes leading and
/// trailing ASCII whitespace.
///
/// An already-empty `range` is returned unchanged. A range containing only
/// whitespace collapses to the empty range `range.end..range.end`.
#[inline]
fn trim_range(row: &[u8], range: Range<usize>) -> Range<usize> {
    if range.is_empty() {
        return range;
    }
    let window = &row[range.clone()];
    let is_content = |byte: &u8| !byte.is_ascii_whitespace();
    match (
        window.iter().position(is_content),
        window.iter().rposition(is_content),
    ) {
        (Some(first), Some(last)) => range.start + first..range.start + last + 1,
        // No non-whitespace byte anywhere in the window.
        _ => range.end..range.end,
    }
}
92
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a `|`-separated delimited frame with the given `null` policy.
    fn delimited(null_payload: NullPayload) -> NdjsonRowFrame {
        NdjsonRowFrame::DelimitedPayload {
            separator: b'|',
            null_payload,
        }
    }

    #[test]
    fn delimited_payload_skips_null() {
        let frame = delimited(NullPayload::Skip);

        assert_eq!(
            frame_payload(frame, 1, b"k|null").unwrap(),
            FramePayload::Skip
        );
        assert_eq!(
            frame_payload(frame, 2, br#"k| {"id":1} "#).unwrap(),
            FramePayload::Data(3..11)
        );
        assert_eq!(frame_payload(frame, 3, b"k|").unwrap(), FramePayload::Skip);
        assert_eq!(
            frame_payload(frame, 4, b"no-separator").unwrap(),
            FramePayload::Skip
        );
        assert_eq!(
            frame_payload(frame, 5, b"k|not-json").unwrap(),
            FramePayload::Skip
        );
    }

    #[test]
    fn delimited_payload_splits_on_first_separator() {
        let frame = delimited(NullPayload::Skip);
        // The payload starts after the first '|' and keeps any later separators.
        assert_eq!(
            frame_payload(frame, 1, b"a|[1]|x").unwrap(),
            FramePayload::Data(2..7)
        );
    }

    #[test]
    fn delimited_payload_keeps_or_rejects_null_per_policy() {
        assert_eq!(
            frame_payload(delimited(NullPayload::Keep), 1, b"k| null ").unwrap(),
            FramePayload::Data(3..7)
        );
        assert!(matches!(
            frame_payload(delimited(NullPayload::Error), 9, b"k|null"),
            Err(RowError::NullPayload { line_no: 9 })
        ));
    }

    #[test]
    fn json_line_is_default_and_takes_whole_trimmed_row() {
        let frame = NdjsonRowFrame::default();
        assert_eq!(frame, NdjsonRowFrame::JsonLine);
        assert_eq!(
            frame_payload(frame, 1, b"  {\"a\":1}\r").unwrap(),
            FramePayload::Data(2..9)
        );
        // For JsonLine, frame_payload applies no null or first-byte filtering.
        assert_eq!(
            frame_payload(frame, 2, b"null").unwrap(),
            FramePayload::Data(0..4)
        );
        assert_eq!(
            frame_payload(frame, 3, b"not-json").unwrap(),
            FramePayload::Data(0..8)
        );
    }

    #[test]
    fn json_line_rejects_blank_rows() {
        let frame = NdjsonRowFrame::JsonLine;
        assert!(matches!(
            frame_payload(frame, 7, b" \t\r"),
            Err(RowError::EmptyPayload { line_no: 7 })
        ));
        assert!(matches!(
            frame_payload(frame, 8, b""),
            Err(RowError::EmptyPayload { line_no: 8 })
        ));
    }
}