Skip to main content

jetro_core/io/
ndjson_frame.rs

1use super::RowError;
2use memchr::memchr;
3use std::ops::Range;
4
5#[derive(Clone, Copy, Debug, Eq, PartialEq)]
6pub enum NdjsonRowFrame {
7    JsonLine,
8    DelimitedPayload {
9        separator: u8,
10        null_payload: NullPayload,
11    },
12}
13
14impl Default for NdjsonRowFrame {
15    fn default() -> Self {
16        Self::JsonLine
17    }
18}
19
/// Policy for a delimited row whose trimmed payload is exactly the bytes `null`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum NullPayload {
    /// Treat the row as skipped.
    Skip,
    /// Hand the `null` payload back as data, like any other payload.
    Keep,
    /// Reject the row with a null-payload error.
    Error,
}
26
27#[derive(Clone, Debug, Eq, PartialEq)]
28pub(super) enum FramePayload {
29    Data(Range<usize>),
30    Skip,
31}
32
33#[inline]
34pub(super) fn frame_payload(
35    frame: NdjsonRowFrame,
36    line_no: u64,
37    row: &[u8],
38) -> Result<FramePayload, RowError> {
39    let range = match frame {
40        NdjsonRowFrame::JsonLine => 0..row.len(),
41        NdjsonRowFrame::DelimitedPayload { separator, .. } => {
42            let Some(sep) = memchr(separator, row) else {
43                return Ok(FramePayload::Skip);
44            };
45            sep + 1..row.len()
46        }
47    };
48    let range = trim_range(row, range);
49    if range.is_empty() {
50        return match frame {
51            NdjsonRowFrame::JsonLine => Err(RowError::EmptyPayload { line_no }),
52            NdjsonRowFrame::DelimitedPayload { .. } => Ok(FramePayload::Skip),
53        };
54    }
55
56    if let NdjsonRowFrame::DelimitedPayload { null_payload, .. } = frame {
57        if &row[range.clone()] == b"null" {
58            return match null_payload {
59                NullPayload::Skip => Ok(FramePayload::Skip),
60                NullPayload::Keep => Ok(FramePayload::Data(range)),
61                NullPayload::Error => Err(RowError::NullPayload { line_no }),
62            };
63        }
64        if !payload_starts_like_json(&row[range.clone()]) {
65            return Ok(FramePayload::Skip);
66        }
67    }
68
69    Ok(FramePayload::Data(range))
70}
71
/// Cheap first-byte check for whether `payload` could begin a JSON value.
///
/// Accepts the opening byte of an object (`{`), array (`[`), string (`"`),
/// `true`, `false`, or a number (`-` or an ASCII digit). A leading `n` is not
/// accepted; `frame_payload` compares against the exact literal `null` before
/// calling this helper.
///
/// Returns `false` for an empty slice rather than panicking.
#[inline]
fn payload_starts_like_json(payload: &[u8]) -> bool {
    matches!(
        payload.first().copied(),
        Some(b'{' | b'[' | b'"' | b't' | b'f' | b'-' | b'0'..=b'9')
    )
}
79
/// Shrinks `range` so that it excludes ASCII whitespace at both ends of
/// `row[range]`.
///
/// An empty (or inverted) range is returned unchanged without touching `row`.
/// If the span is entirely whitespace the result is the empty range
/// `range.end..range.end`.
#[inline]
fn trim_range(row: &[u8], range: Range<usize>) -> Range<usize> {
    let Range { start, end } = range;
    if start >= end {
        return start..end;
    }

    let window = &row[start..end];
    let leading = window
        .iter()
        .take_while(|byte| byte.is_ascii_whitespace())
        .count();
    // Count trailing whitespace only in what is left after the leading run,
    // so an all-whitespace window is not counted twice.
    let trailing = window[leading..]
        .iter()
        .rev()
        .take_while(|byte| byte.is_ascii_whitespace())
        .count();

    (start + leading)..(end - trailing)
}
92
#[cfg(test)]
mod tests {
    use super::*;

    /// Delimited framing with a `|` separator that skips `null` payloads.
    const PIPE_SKIP_NULL: NdjsonRowFrame = NdjsonRowFrame::DelimitedPayload {
        separator: b'|',
        null_payload: NullPayload::Skip,
    };

    /// Frames `row` with [`PIPE_SKIP_NULL`], panicking if framing fails.
    fn framed(line_no: u64, row: &[u8]) -> FramePayload {
        frame_payload(PIPE_SKIP_NULL, line_no, row).unwrap()
    }

    #[test]
    fn delimited_payload_skips_null() {
        // A literal `null` payload is skipped under the Skip policy.
        assert_eq!(framed(1, b"k|null"), FramePayload::Skip);
        // Surrounding whitespace is trimmed from the reported range.
        assert_eq!(framed(2, br#"k| {"id":1} "#), FramePayload::Data(3..11));
        // Nothing after the separator.
        assert_eq!(framed(3, b"k|"), FramePayload::Skip);
        // No separator at all.
        assert_eq!(framed(4, b"no-separator"), FramePayload::Skip);
        // Payload does not start like a JSON value.
        assert_eq!(framed(5, b"k|not-json"), FramePayload::Skip);
    }
}