Skip to main content

jetro_core/io/
mod.rs

1//! Streaming input/output adapters for query execution.
2//!
3//! NDJSON adapters evaluate each non-empty row independently while reusing the
4//! caller's [`crate::JetroEngine`] plan, VM, and tape execution caches.
5
6mod document_rows;
7mod mapped_bytes;
8mod ndjson;
9#[cfg(feature = "simd-json")]
10mod ndjson_byte;
11#[cfg(feature = "simd-json")]
12mod ndjson_direct;
13mod ndjson_distinct;
14mod ndjson_frame;
15#[cfg(feature = "simd-json")]
16#[cfg_attr(not(test), allow(dead_code))]
17mod ndjson_hint;
18mod ndjson_parallel;
19mod ndjson_rev;
20#[cfg(feature = "simd-json")]
21mod ndjson_stream_cache;
22mod source;
23mod stream_direct;
24mod stream_exec;
25#[allow(dead_code)]
26mod stream_plan;
27mod stream_source;
28mod stream_subquery;
29mod stream_types;
30
31pub(crate) use document_rows::collect_document_rows;
32pub use ndjson::{
33    collect_ndjson, collect_ndjson_file, collect_ndjson_file_with_options, collect_ndjson_matches,
34    collect_ndjson_matches_file, collect_ndjson_matches_file_with_options,
35    collect_ndjson_matches_source, collect_ndjson_matches_source_with_options,
36    collect_ndjson_matches_with_options, collect_ndjson_source, collect_ndjson_source_with_options,
37    collect_ndjson_with_options, for_each_ndjson, for_each_ndjson_source,
38    for_each_ndjson_source_until, for_each_ndjson_source_until_with_options,
39    for_each_ndjson_source_with_options, for_each_ndjson_until, for_each_ndjson_until_with_options,
40    for_each_ndjson_with_options, run_ndjson, run_ndjson_file, run_ndjson_file_limit,
41    run_ndjson_file_limit_with_options, run_ndjson_file_with_options, run_ndjson_limit,
42    run_ndjson_limit_with_options, run_ndjson_matches, run_ndjson_matches_file,
43    run_ndjson_matches_file_with_options, run_ndjson_matches_source,
44    run_ndjson_matches_source_with_options, run_ndjson_matches_with_options, run_ndjson_source,
45    run_ndjson_source_limit, run_ndjson_source_limit_with_options, run_ndjson_source_with_options,
46    run_ndjson_with_options, NdjsonControl, NdjsonNullOutput, NdjsonOptions, NdjsonParallelism,
47    NdjsonPerRowDriver,
48};
49pub use ndjson_distinct::DistinctFrontFilterKind;
50pub use ndjson_frame::{NdjsonRowFrame, NullPayload};
51pub use ndjson_rev::{
52    collect_ndjson_rev, collect_ndjson_rev_matches, collect_ndjson_rev_matches_with_options,
53    collect_ndjson_rev_with_options, for_each_ndjson_rev, for_each_ndjson_rev_with_options,
54    run_ndjson_rev, run_ndjson_rev_distinct_by, run_ndjson_rev_distinct_by_with_options,
55    run_ndjson_rev_distinct_by_with_stats, run_ndjson_rev_distinct_by_with_stats_and_options,
56    run_ndjson_rev_limit, run_ndjson_rev_limit_with_options, run_ndjson_rev_matches,
57    run_ndjson_rev_matches_with_options, run_ndjson_rev_with_options, NdjsonRevDistinctStats,
58    NdjsonReverseFileDriver,
59};
60pub use source::NdjsonSource;
61use std::fmt;
62
63/// Error with enough row context for users to find malformed input quickly.
64#[derive(Debug)]
65pub enum RowError {
66    Io(std::io::Error),
67    InvalidJson {
68        line_no: u64,
69        source: serde_json::Error,
70    },
71    InvalidJsonMessage {
72        line_no: u64,
73        message: String,
74    },
75    EmptyPayload {
76        line_no: u64,
77    },
78    NullPayload {
79        line_no: u64,
80    },
81    LineTooLarge {
82        line_no: u64,
83        len: usize,
84        max: usize,
85    },
86}
87
88impl RowError {
89    pub fn invalid_json(line_no: u64, source: serde_json::Error) -> Self {
90        Self::InvalidJson { line_no, source }
91    }
92}
93
94impl fmt::Display for RowError {
95    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
96        match self {
97            Self::Io(err) => write!(f, "{}", err),
98            Self::InvalidJson { line_no, source } => {
99                write!(f, "invalid JSON on NDJSON line {line_no}: {source}")
100            }
101            Self::InvalidJsonMessage { line_no, message } => {
102                write!(f, "invalid JSON on NDJSON line {line_no}: {message}")
103            }
104            Self::EmptyPayload { line_no } => {
105                write!(f, "NDJSON line {line_no} has an empty framed payload")
106            }
107            Self::NullPayload { line_no } => {
108                write!(f, "NDJSON line {line_no} has a null framed payload")
109            }
110            Self::LineTooLarge { line_no, len, max } => write!(
111                f,
112                "NDJSON line {line_no} is too large: {len} bytes exceeds {max} byte limit"
113            ),
114        }
115    }
116}
117
118impl std::error::Error for RowError {
119    fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
120        match self {
121            Self::Io(err) => Some(err),
122            Self::InvalidJson { source, .. } => Some(source),
123            Self::InvalidJsonMessage { .. } => None,
124            Self::EmptyPayload { .. } => None,
125            Self::NullPayload { .. } => None,
126            Self::LineTooLarge { .. } => None,
127        }
128    }
129}
130
131impl From<std::io::Error> for RowError {
132    fn from(err: std::io::Error) -> Self {
133        Self::Io(err)
134    }
135}