Skip to main content

jetro_core/io/
mod.rs

1//! Streaming input/output adapters for query execution.
2//!
3//! NDJSON adapters evaluate each non-empty row independently while reusing the
4//! caller's [`crate::JetroEngine`] plan, VM, and tape execution caches.
5
6mod ndjson;
7#[cfg(feature = "simd-json")]
8mod ndjson_byte;
9#[cfg(feature = "simd-json")]
10mod ndjson_direct;
11mod ndjson_rev;
12mod source;
13
14pub use ndjson::{
15    collect_ndjson, collect_ndjson_file, collect_ndjson_file_with_options, collect_ndjson_matches,
16    collect_ndjson_matches_file, collect_ndjson_matches_file_with_options,
17    collect_ndjson_matches_source, collect_ndjson_matches_source_with_options,
18    collect_ndjson_matches_with_options, collect_ndjson_source, collect_ndjson_source_with_options,
19    collect_ndjson_with_options, for_each_ndjson, for_each_ndjson_source,
20    for_each_ndjson_source_until, for_each_ndjson_source_until_with_options,
21    for_each_ndjson_source_with_options, for_each_ndjson_until, for_each_ndjson_until_with_options,
22    for_each_ndjson_with_options, run_ndjson, run_ndjson_file, run_ndjson_file_limit,
23    run_ndjson_file_limit_with_options, run_ndjson_file_with_options, run_ndjson_limit,
24    run_ndjson_limit_with_options, run_ndjson_matches, run_ndjson_matches_file,
25    run_ndjson_matches_file_with_options, run_ndjson_matches_source,
26    run_ndjson_matches_source_with_options, run_ndjson_matches_with_options, run_ndjson_source,
27    run_ndjson_source_limit, run_ndjson_source_limit_with_options, run_ndjson_source_with_options,
28    run_ndjson_with_options, NdjsonControl, NdjsonOptions, NdjsonPerRowDriver,
29};
30pub use ndjson_rev::{
31    collect_ndjson_rev, collect_ndjson_rev_matches, collect_ndjson_rev_matches_with_options,
32    collect_ndjson_rev_with_options, for_each_ndjson_rev, for_each_ndjson_rev_with_options,
33    run_ndjson_rev, run_ndjson_rev_limit, run_ndjson_rev_limit_with_options,
34    run_ndjson_rev_matches, run_ndjson_rev_matches_with_options, run_ndjson_rev_with_options,
35    NdjsonReverseFileDriver,
36};
37pub use source::NdjsonSource;
38use std::fmt;
39
40/// Error with enough row context for users to find malformed input quickly.
41#[derive(Debug)]
42pub enum RowError {
43    Io(std::io::Error),
44    InvalidJson {
45        line_no: u64,
46        source: serde_json::Error,
47    },
48    InvalidJsonMessage {
49        line_no: u64,
50        message: String,
51    },
52    LineTooLarge {
53        line_no: u64,
54        len: usize,
55        max: usize,
56    },
57}
58
59impl RowError {
60    pub fn invalid_json(line_no: u64, source: serde_json::Error) -> Self {
61        Self::InvalidJson { line_no, source }
62    }
63}
64
65impl fmt::Display for RowError {
66    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
67        match self {
68            Self::Io(err) => write!(f, "{}", err),
69            Self::InvalidJson { line_no, source } => {
70                write!(f, "invalid JSON on NDJSON line {line_no}: {source}")
71            }
72            Self::InvalidJsonMessage { line_no, message } => {
73                write!(f, "invalid JSON on NDJSON line {line_no}: {message}")
74            }
75            Self::LineTooLarge { line_no, len, max } => write!(
76                f,
77                "NDJSON line {line_no} is too large: {len} bytes exceeds {max} byte limit"
78            ),
79        }
80    }
81}
82
83impl std::error::Error for RowError {
84    fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
85        match self {
86            Self::Io(err) => Some(err),
87            Self::InvalidJson { source, .. } => Some(source),
88            Self::InvalidJsonMessage { .. } => None,
89            Self::LineTooLarge { .. } => None,
90        }
91    }
92}
93
94impl From<std::io::Error> for RowError {
95    fn from(err: std::io::Error) -> Self {
96        Self::Io(err)
97    }
98}