Skip to main content

jetro_core/io/
ndjson_route.rs

1use super::ndjson::{ndjson_writer_path_kind, NdjsonOptions, NdjsonWriterPathKind};
2use super::ndjson_frame::NdjsonRowFrame;
3use super::ndjson_rows::{ndjson_rows_file_plan, NdjsonRowsFilePlan, NdjsonRowsPlanKind};
4use super::stream_types::RowStreamStats;
5use crate::{JetroEngine, JetroEngineError};
6
/// Kind of input an NDJSON query is planned against.
///
/// Each mode is mapped to a capability set by `NdjsonSourceCaps::for_mode`.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum NdjsonSourceMode {
    /// Reader-backed source; rendered as `reader`.
    Reader,
    /// File-backed source; rendered as `file`.
    File,
}

impl std::fmt::Display for NdjsonSourceMode {
    /// Writes the lowercase mode name (`reader` / `file`).
    ///
    /// Uses `Formatter::pad` so width, fill, alignment and precision flags
    /// (e.g. `{:>8}`) are honoured, like they are for `str`.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.pad(match self {
            Self::Reader => "reader",
            Self::File => "file",
        })
    }
}
21
22#[derive(Clone, Copy, Debug, Eq, PartialEq)]
23pub struct NdjsonSourceCaps {
24    pub mode: NdjsonSourceMode,
25    pub forward: bool,
26    pub reverse: bool,
27    pub mmap: bool,
28    pub partitionable: bool,
29    pub framed_payload: bool,
30}
31
32impl NdjsonSourceCaps {
33    pub fn for_mode(mode: NdjsonSourceMode, options: NdjsonOptions) -> Self {
34        match mode {
35            NdjsonSourceMode::Reader => Self::reader(options),
36            NdjsonSourceMode::File => Self::file(options),
37        }
38    }
39
40    pub fn reader(options: NdjsonOptions) -> Self {
41        Self {
42            mode: NdjsonSourceMode::Reader,
43            forward: true,
44            reverse: false,
45            mmap: false,
46            partitionable: false,
47            framed_payload: options.row_frame != NdjsonRowFrame::JsonLine,
48        }
49    }
50
51    pub fn file(options: NdjsonOptions) -> Self {
52        Self {
53            mode: NdjsonSourceMode::File,
54            forward: true,
55            reverse: true,
56            mmap: true,
57            partitionable: true,
58            framed_payload: options.row_frame != NdjsonRowFrame::JsonLine,
59        }
60    }
61}
62
63impl std::fmt::Display for NdjsonSourceCaps {
64    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
65        write!(f, "{}", self.mode)?;
66        if self.reverse {
67            f.write_str("+reverse")?;
68        }
69        if self.mmap {
70            f.write_str("+mmap")?;
71        }
72        if self.partitionable {
73            f.write_str("+partitionable")?;
74        }
75        if self.framed_payload {
76            f.write_str("+framed-payload")?;
77        }
78        Ok(())
79    }
80}
81
/// Execution route selected for an NDJSON query.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum NdjsonRouteKind {
    /// Query has no rows plan and is evaluated per row (`row-local`).
    RowLocal,
    /// Match-only route (`matches`).
    Matches,
    /// Rows plan of kind `Stream` (`rows-stream`).
    RowsStream,
    /// Rows plan of kind `Fanout` (`rows-fanout`).
    RowsFanout,
    /// Rows plan of kind `Subquery` (`rows-subquery`).
    RowsSubquery,
    /// Rows plan that cannot run on the given source (`unsupported-rows`).
    UnsupportedRows,
}

impl std::fmt::Display for NdjsonRouteKind {
    /// Writes the kebab-case route name.
    ///
    /// Uses `Formatter::pad` so width, fill, alignment and precision flags
    /// are honoured, like they are for `str`.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.pad(match self {
            Self::RowLocal => "row-local",
            Self::Matches => "matches",
            Self::RowsStream => "rows-stream",
            Self::RowsFanout => "rows-fanout",
            Self::RowsSubquery => "rows-subquery",
            Self::UnsupportedRows => "unsupported-rows",
        })
    }
}
104
/// Reason a planned NDJSON route cannot be executed on the given source.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum NdjsonFallbackReason {
    /// The rows plan requires a file-backed source but a reader was supplied.
    FileBackedRowsRequired,
}

impl std::fmt::Display for NdjsonFallbackReason {
    /// Writes a human-readable explanation of the reason.
    ///
    /// Uses `Formatter::pad` so width, fill, alignment and precision flags
    /// are honoured, like they are for `str`.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.pad(match self {
            Self::FileBackedRowsRequired => "rows plan requires a file-backed NDJSON source",
        })
    }
}
117
118#[derive(Clone, Debug, Eq, PartialEq)]
119pub struct NdjsonRouteExplain {
120    pub kind: NdjsonRouteKind,
121    pub source: NdjsonSourceCaps,
122    pub writer_path: Option<NdjsonWriterPathKind>,
123    pub rows_plan: Option<NdjsonRowsPlanKind>,
124    pub fallback_reason: Option<NdjsonFallbackReason>,
125}
126
127#[derive(Clone, Debug, Default, Eq, PartialEq)]
128pub struct NdjsonExecutionStats {
129    pub rows_scanned: usize,
130    pub rows_emitted: usize,
131    pub rows_filtered: usize,
132    pub duplicate_rows: usize,
133    pub direct_filter_rows: usize,
134    pub fallback_filter_rows: usize,
135    pub direct_key_rows: usize,
136    pub fallback_key_rows: usize,
137    pub direct_project_rows: usize,
138    pub fallback_project_rows: usize,
139    pub parallel_partitions: usize,
140    pub hint_learned_rows: usize,
141    pub hint_rejected_rows: usize,
142    pub hint_rows: usize,
143    pub hint_layout_misses: usize,
144    pub hint_disabled: bool,
145}
146
147impl From<&RowStreamStats> for NdjsonExecutionStats {
148    fn from(stats: &RowStreamStats) -> Self {
149        Self {
150            rows_scanned: stats.rows_scanned,
151            rows_emitted: stats.rows_emitted,
152            rows_filtered: stats.rows_filtered,
153            duplicate_rows: stats.duplicate_rows,
154            direct_filter_rows: stats.direct_filter_rows,
155            fallback_filter_rows: stats.fallback_filter_rows,
156            direct_key_rows: stats.direct_key_rows,
157            fallback_key_rows: stats.fallback_key_rows,
158            direct_project_rows: stats.direct_project_rows,
159            fallback_project_rows: stats.fallback_project_rows,
160            parallel_partitions: stats.parallel_partitions,
161            hint_learned_rows: 0,
162            hint_rejected_rows: 0,
163            hint_rows: 0,
164            hint_layout_misses: 0,
165            hint_disabled: false,
166        }
167    }
168}
169
170#[derive(Clone, Debug, Eq, PartialEq)]
171pub struct NdjsonExecutionReport {
172    pub route: NdjsonRouteExplain,
173    pub stats: NdjsonExecutionStats,
174}
175
176impl NdjsonExecutionReport {
177    pub fn new(route: NdjsonRouteExplain, stats: NdjsonExecutionStats) -> Self {
178        Self { route, stats }
179    }
180
181    pub fn emitted_only(route: NdjsonRouteExplain, rows_emitted: usize) -> Self {
182        Self {
183            route,
184            stats: NdjsonExecutionStats {
185                rows_emitted,
186                ..NdjsonExecutionStats::default()
187            },
188        }
189    }
190}
191
192pub(super) enum NdjsonRoutePlan {
193    RowLocal {
194        explain: NdjsonRouteExplain,
195    },
196    Rows {
197        explain: NdjsonRouteExplain,
198        plan: NdjsonRowsFilePlan,
199    },
200    Unsupported {
201        explain: NdjsonRouteExplain,
202    },
203}
204
205impl NdjsonRoutePlan {
206    pub(super) fn explain(&self) -> &NdjsonRouteExplain {
207        match self {
208            Self::RowLocal { explain }
209            | Self::Rows { explain, .. }
210            | Self::Unsupported { explain } => explain,
211        }
212    }
213
214}
215
216impl NdjsonRouteExplain {
217    pub fn matches(source: NdjsonSourceCaps) -> Self {
218        Self {
219            kind: NdjsonRouteKind::Matches,
220            source,
221            writer_path: None,
222            rows_plan: None,
223            fallback_reason: None,
224        }
225    }
226
227    pub fn is_rows_route(&self) -> bool {
228        self.rows_plan.is_some()
229    }
230
231    pub fn is_supported(&self) -> bool {
232        self.kind != NdjsonRouteKind::UnsupportedRows
233    }
234
235    pub fn unsupported_message(&self) -> Option<String> {
236        if self.is_supported() {
237            return None;
238        }
239        Some(
240            self.fallback_reason
241                .map(|reason| reason.to_string())
242                .unwrap_or_else(|| "unsupported $.rows() NDJSON route".to_string()),
243        )
244    }
245}
246
247pub(super) fn ndjson_route_plan(
248    engine: &JetroEngine,
249    source: NdjsonSourceMode,
250    query: &str,
251    options: NdjsonOptions,
252) -> Result<NdjsonRoutePlan, JetroEngineError> {
253    let source = NdjsonSourceCaps::for_mode(source, options);
254    let Some(plan) = ndjson_rows_file_plan(query)? else {
255        return Ok(NdjsonRoutePlan::RowLocal {
256            explain: NdjsonRouteExplain {
257                kind: NdjsonRouteKind::RowLocal,
258                source,
259                writer_path: ndjson_writer_path_kind(engine, query),
260                rows_plan: None,
261                fallback_reason: None,
262            },
263        });
264    };
265
266    let rows_plan = plan.kind();
267    if plan.requires_file_backed_source() && source.mode == NdjsonSourceMode::Reader {
268        return Ok(NdjsonRoutePlan::Unsupported {
269            explain: NdjsonRouteExplain {
270                kind: NdjsonRouteKind::UnsupportedRows,
271                source,
272                writer_path: None,
273                rows_plan: Some(rows_plan),
274                fallback_reason: Some(NdjsonFallbackReason::FileBackedRowsRequired),
275            },
276        });
277    }
278
279    Ok(NdjsonRoutePlan::Rows {
280        explain: NdjsonRouteExplain {
281            kind: route_kind_for_rows_plan(rows_plan),
282            source,
283            writer_path: None,
284            rows_plan: Some(rows_plan),
285            fallback_reason: None,
286        },
287        plan,
288    })
289}
290
291pub fn ndjson_explain(
292    engine: &JetroEngine,
293    source: NdjsonSourceMode,
294    query: &str,
295    options: NdjsonOptions,
296) -> Result<NdjsonRouteExplain, JetroEngineError> {
297    Ok(ndjson_route_plan(engine, source, query, options)?
298        .explain()
299        .clone())
300}
301
302fn route_kind_for_rows_plan(plan: NdjsonRowsPlanKind) -> NdjsonRouteKind {
303    match plan {
304        NdjsonRowsPlanKind::Stream => NdjsonRouteKind::RowsStream,
305        NdjsonRowsPlanKind::Fanout => NdjsonRouteKind::RowsFanout,
306        NdjsonRowsPlanKind::Subquery => NdjsonRouteKind::RowsSubquery,
307    }
308}
309
#[cfg(test)]
mod tests {
    use super::*;

    /// Explains `query` against `mode` with default options, panicking on
    /// planner errors.
    fn explain_default(
        engine: &JetroEngine,
        mode: NdjsonSourceMode,
        query: &str,
    ) -> NdjsonRouteExplain {
        ndjson_explain(engine, mode, query, NdjsonOptions::default()).unwrap()
    }

    #[test]
    fn route_explain_reports_row_local_and_rows_modes() {
        let engine = JetroEngine::new();

        // A plain field access stays on the row-local path.
        let row = explain_default(&engine, NdjsonSourceMode::Reader, "$.name");
        assert_eq!(row.kind, NdjsonRouteKind::RowLocal);
        assert_eq!(row.writer_path, Some(NdjsonWriterPathKind::ByteExpr));
        assert_eq!(row.source.to_string(), "reader");

        // `$.rows()` on a file source becomes a streaming rows route.
        let rows = explain_default(&engine, NdjsonSourceMode::File, "$.rows().take(1)");
        assert_eq!(rows.kind, NdjsonRouteKind::RowsStream);
        assert_eq!(rows.rows_plan, Some(NdjsonRowsPlanKind::Stream));
        assert_eq!(rows.source.to_string(), "file+reverse+mmap+partitionable");
    }

    #[test]
    fn route_explain_marks_reader_rows_subquery_unsupported() {
        let engine = JetroEngine::new();
        let route = explain_default(
            &engine,
            NdjsonSourceMode::Reader,
            r#"{head: $.rows().take(1)}"#,
        );

        let expected_reason = "rows plan requires a file-backed NDJSON source";
        assert_eq!(route.kind, NdjsonRouteKind::UnsupportedRows);
        assert_eq!(
            route.fallback_reason,
            Some(NdjsonFallbackReason::FileBackedRowsRequired)
        );
        assert_eq!(route.kind.to_string(), "unsupported-rows");
        assert_eq!(route.fallback_reason.unwrap().to_string(), expected_reason);
        assert!(!route.is_supported());
        assert_eq!(route.unsupported_message().unwrap(), expected_reason);
    }

    #[test]
    fn route_explain_marks_reader_rows_fanout_unsupported() {
        let engine = JetroEngine::new();
        let query = r#"let stream = $.rows(), a = stream.take(1), b = stream.count() in {a, b}"#;

        // The same fan-out query is rejected on a reader ...
        let reader = explain_default(&engine, NdjsonSourceMode::Reader, query);
        assert_eq!(reader.kind, NdjsonRouteKind::UnsupportedRows);
        assert_eq!(reader.rows_plan, Some(NdjsonRowsPlanKind::Fanout));
        assert_eq!(
            reader.fallback_reason,
            Some(NdjsonFallbackReason::FileBackedRowsRequired)
        );

        // ... but accepted on a file source.
        let file = explain_default(&engine, NdjsonSourceMode::File, query);
        assert_eq!(file.kind, NdjsonRouteKind::RowsFanout);
        assert!(file.is_supported());
        assert!(file.fallback_reason.is_none());
    }

    #[test]
    fn execution_stats_copy_row_stream_counters() {
        let stream = RowStreamStats {
            rows_scanned: 3,
            rows_emitted: 2,
            rows_filtered: 1,
            direct_project_rows: 2,
            parallel_partitions: 4,
            ..RowStreamStats::default()
        };

        let stats = NdjsonExecutionStats::from(&stream);
        assert_eq!(stats.rows_scanned, 3);
        assert_eq!(stats.rows_emitted, 2);
        assert_eq!(stats.rows_filtered, 1);
        assert_eq!(stats.direct_project_rows, 2);
        assert_eq!(stats.parallel_partitions, 4);
    }
}