Skip to main content

jetro_core/io/
mod.rs

1//! Streaming input/output adapters for query execution.
2//!
3//! The initial NDJSON path evaluates each non-empty row independently while
4//! reusing the caller's [`crate::JetroEngine`] plan and VM caches.
5
6mod ndjson;
7mod ndjson_rev;
8mod source;
9
10pub use ndjson::{
11    collect_ndjson, collect_ndjson_file, collect_ndjson_file_with_options, collect_ndjson_matches,
12    collect_ndjson_matches_file, collect_ndjson_matches_file_with_options,
13    collect_ndjson_matches_source, collect_ndjson_matches_source_with_options,
14    collect_ndjson_matches_with_options, collect_ndjson_source, collect_ndjson_source_with_options,
15    collect_ndjson_with_options, for_each_ndjson, for_each_ndjson_source,
16    for_each_ndjson_source_until, for_each_ndjson_source_until_with_options,
17    for_each_ndjson_source_with_options, for_each_ndjson_until, for_each_ndjson_until_with_options,
18    for_each_ndjson_with_options, run_ndjson, run_ndjson_file, run_ndjson_file_limit,
19    run_ndjson_file_limit_with_options, run_ndjson_file_with_options, run_ndjson_limit,
20    run_ndjson_limit_with_options, run_ndjson_matches, run_ndjson_matches_file,
21    run_ndjson_matches_file_with_options, run_ndjson_matches_source,
22    run_ndjson_matches_source_with_options, run_ndjson_matches_with_options, run_ndjson_source,
23    run_ndjson_source_limit, run_ndjson_source_limit_with_options, run_ndjson_source_with_options,
24    run_ndjson_with_options, NdjsonControl, NdjsonOptions, NdjsonPerRowDriver,
25};
26pub use ndjson_rev::{
27    collect_ndjson_rev, collect_ndjson_rev_matches, collect_ndjson_rev_matches_with_options,
28    collect_ndjson_rev_with_options, for_each_ndjson_rev, for_each_ndjson_rev_with_options,
29    run_ndjson_rev, run_ndjson_rev_limit, run_ndjson_rev_limit_with_options,
30    run_ndjson_rev_matches, run_ndjson_rev_matches_with_options, run_ndjson_rev_with_options,
31    NdjsonReverseFileDriver,
32};
33pub use source::NdjsonSource;
34use std::fmt;
35
36/// Error with enough row context for users to find malformed input quickly.
37#[derive(Debug)]
38pub enum RowError {
39    Io(std::io::Error),
40    InvalidJson {
41        line_no: u64,
42        source: serde_json::Error,
43    },
44    InvalidJsonMessage {
45        line_no: u64,
46        message: String,
47    },
48    LineTooLarge {
49        line_no: u64,
50        len: usize,
51        max: usize,
52    },
53}
54
55impl RowError {
56    pub fn invalid_json(line_no: u64, source: serde_json::Error) -> Self {
57        Self::InvalidJson { line_no, source }
58    }
59}
60
61impl fmt::Display for RowError {
62    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
63        match self {
64            Self::Io(err) => write!(f, "{}", err),
65            Self::InvalidJson { line_no, source } => {
66                write!(f, "invalid JSON on NDJSON line {line_no}: {source}")
67            }
68            Self::InvalidJsonMessage { line_no, message } => {
69                write!(f, "invalid JSON on NDJSON line {line_no}: {message}")
70            }
71            Self::LineTooLarge { line_no, len, max } => write!(
72                f,
73                "NDJSON line {line_no} is too large: {len} bytes exceeds {max} byte limit"
74            ),
75        }
76    }
77}
78
79impl std::error::Error for RowError {
80    fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
81        match self {
82            Self::Io(err) => Some(err),
83            Self::InvalidJson { source, .. } => Some(source),
84            Self::InvalidJsonMessage { .. } => None,
85            Self::LineTooLarge { .. } => None,
86        }
87    }
88}
89
90impl From<std::io::Error> for RowError {
91    fn from(err: std::io::Error) -> Self {
92        Self::Io(err)
93    }
94}