Skip to main content

jetro_core/io/
mod.rs

1//! Streaming input/output adapters for query execution.
2//!
3//! NDJSON adapters evaluate each non-empty row independently while reusing the
4//! caller's [`crate::JetroEngine`] plan, VM, and tape execution caches.
5
6mod document_rows;
7mod mapped_bytes;
8mod ndjson;
9mod ndjson_byte;
10mod ndjson_direct;
11mod ndjson_distinct;
12mod ndjson_driver;
13mod ndjson_frame;
14mod ndjson_hint;
15mod ndjson_parallel;
16mod ndjson_rev;
17mod ndjson_route;
18mod ndjson_row;
19mod ndjson_rows;
20mod ndjson_scan;
21mod ndjson_stream_cache;
22mod ndjson_write;
23mod source;
24mod stream_direct;
25mod stream_exec;
26mod stream_fanout;
27mod stream_numeric;
28mod stream_plan;
29mod stream_source;
30mod stream_subquery;
31mod stream_types;
32
33pub(crate) use document_rows::collect_document_rows;
34pub use ndjson::{
35    collect_ndjson, collect_ndjson_file, collect_ndjson_file_with_options, collect_ndjson_matches,
36    collect_ndjson_matches_file, collect_ndjson_matches_file_with_options,
37    collect_ndjson_matches_source, collect_ndjson_matches_source_with_options,
38    collect_ndjson_matches_with_options, collect_ndjson_source, collect_ndjson_source_with_options,
39    collect_ndjson_with_options, for_each_ndjson, for_each_ndjson_source,
40    for_each_ndjson_source_until, for_each_ndjson_source_until_with_options,
41    for_each_ndjson_source_with_options, for_each_ndjson_until, for_each_ndjson_until_with_options,
42    for_each_ndjson_with_options, ndjson_writer_path_kind, run_ndjson, run_ndjson_file,
43    run_ndjson_file_limit, run_ndjson_file_limit_with_options, run_ndjson_file_limit_with_report,
44    run_ndjson_file_limit_with_report_and_options, run_ndjson_file_with_options,
45    run_ndjson_file_with_report, run_ndjson_file_with_report_and_options, run_ndjson_limit,
46    run_ndjson_limit_with_options, run_ndjson_limit_with_report,
47    run_ndjson_limit_with_report_and_options, run_ndjson_matches, run_ndjson_matches_file,
48    run_ndjson_matches_file_with_options, run_ndjson_matches_file_with_report,
49    run_ndjson_matches_file_with_report_and_options, run_ndjson_matches_source,
50    run_ndjson_matches_source_with_options, run_ndjson_matches_source_with_report,
51    run_ndjson_matches_source_with_report_and_options, run_ndjson_matches_with_options,
52    run_ndjson_matches_with_report, run_ndjson_matches_with_report_and_options, run_ndjson_source,
53    run_ndjson_source_limit, run_ndjson_source_limit_with_options,
54    run_ndjson_source_limit_with_report, run_ndjson_source_limit_with_report_and_options,
55    run_ndjson_source_with_options, run_ndjson_source_with_report,
56    run_ndjson_source_with_report_and_options, run_ndjson_with_options, run_ndjson_with_report,
57    run_ndjson_with_report_and_options, NdjsonControl, NdjsonNullOutput, NdjsonOptions,
58    NdjsonParallelism, NdjsonWriterPathKind,
59};
60pub use ndjson_distinct::DistinctFrontFilterKind;
61pub use ndjson_driver::NdjsonPerRowDriver;
62pub use ndjson_frame::{NdjsonRowFrame, NullPayload};
63pub use ndjson_rev::{
64    collect_ndjson_rev, collect_ndjson_rev_matches, collect_ndjson_rev_matches_with_options,
65    collect_ndjson_rev_with_options, for_each_ndjson_rev, for_each_ndjson_rev_with_options,
66    run_ndjson_rev, run_ndjson_rev_distinct_by, run_ndjson_rev_distinct_by_with_options,
67    run_ndjson_rev_distinct_by_with_report, run_ndjson_rev_distinct_by_with_report_and_options,
68    run_ndjson_rev_distinct_by_with_stats, run_ndjson_rev_distinct_by_with_stats_and_options,
69    run_ndjson_rev_limit, run_ndjson_rev_limit_with_options, run_ndjson_rev_matches,
70    run_ndjson_rev_matches_with_options, run_ndjson_rev_matches_with_report,
71    run_ndjson_rev_matches_with_report_and_options, run_ndjson_rev_with_options,
72    NdjsonRevDistinctStats, NdjsonReverseFileDriver,
73};
74pub use ndjson_route::{
75    ndjson_explain, NdjsonExecutionReport, NdjsonExecutionStats, NdjsonFallbackReason,
76    NdjsonRouteExplain, NdjsonRouteKind, NdjsonSourceCaps, NdjsonSourceMode,
77};
78pub(crate) use ndjson_route::{ndjson_route_plan, NdjsonRoutePlan};
79pub(crate) use ndjson_rows::NdjsonRowsFilePlan;
80pub use ndjson_rows::{ndjson_rows_plan_kind, NdjsonRowsPlanKind};
81pub use source::NdjsonSource;
82use std::fmt;
83pub(crate) use stream_plan::{
84    RowStreamDemand, RowStreamDirection, RowStreamFileStrategy, RowStreamParallelism,
85    RowStreamPlan, RowStreamSourceKind, RowStreamStage,
86};
87
88/// Error with enough row context for users to find malformed input quickly.
89#[derive(Debug)]
90pub enum RowError {
91    Io(std::io::Error),
92    InvalidJson {
93        line_no: u64,
94        source: serde_json::Error,
95    },
96    InvalidJsonMessage {
97        line_no: u64,
98        message: String,
99    },
100    EmptyPayload {
101        line_no: u64,
102    },
103    NullPayload {
104        line_no: u64,
105    },
106    LineTooLarge {
107        line_no: u64,
108        len: usize,
109        max: usize,
110    },
111}
112
113impl RowError {
114    pub fn invalid_json(line_no: u64, source: serde_json::Error) -> Self {
115        Self::InvalidJson { line_no, source }
116    }
117}
118
119impl fmt::Display for RowError {
120    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
121        match self {
122            Self::Io(err) => write!(f, "{}", err),
123            Self::InvalidJson { line_no, source } => {
124                write!(f, "invalid JSON on NDJSON line {line_no}: {source}")
125            }
126            Self::InvalidJsonMessage { line_no, message } => {
127                write!(f, "invalid JSON on NDJSON line {line_no}: {message}")
128            }
129            Self::EmptyPayload { line_no } => {
130                write!(f, "NDJSON line {line_no} has an empty framed payload")
131            }
132            Self::NullPayload { line_no } => {
133                write!(f, "NDJSON line {line_no} has a null framed payload")
134            }
135            Self::LineTooLarge { line_no, len, max } => write!(
136                f,
137                "NDJSON line {line_no} is too large: {len} bytes exceeds {max} byte limit"
138            ),
139        }
140    }
141}
142
143impl std::error::Error for RowError {
144    fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
145        match self {
146            Self::Io(err) => Some(err),
147            Self::InvalidJson { source, .. } => Some(source),
148            Self::InvalidJsonMessage { .. } => None,
149            Self::EmptyPayload { .. } => None,
150            Self::NullPayload { .. } => None,
151            Self::LineTooLarge { .. } => None,
152        }
153    }
154}
155
156impl From<std::io::Error> for RowError {
157    fn from(err: std::io::Error) -> Self {
158        Self::Io(err)
159    }
160}