Skip to main content

jetro_core/io/ndjson_route.rs

1use super::ndjson::{ndjson_writer_path_kind, NdjsonOptions, NdjsonWriterPathKind};
2use super::ndjson_frame::NdjsonRowFrame;
3use super::ndjson_rows::{ndjson_rows_file_plan, NdjsonRowsFilePlan, NdjsonRowsPlanKind};
4use super::stream_types::RowStreamStats;
5use crate::{JetroEngine, JetroEngineError};
6
/// Kind of NDJSON input a query is planned against.
///
/// The capabilities each mode advertises are built by `NdjsonSourceCaps::for_mode`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum NdjsonSourceMode {
    /// A streaming reader source (rendered as `reader`).
    Reader,
    /// A file-backed source (rendered as `file`).
    File,
}

impl std::fmt::Display for NdjsonSourceMode {
    /// Writes the lowercase mode name.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let label = match *self {
            NdjsonSourceMode::Reader => "reader",
            NdjsonSourceMode::File => "file",
        };
        f.write_str(label)
    }
}
21
22#[derive(Clone, Copy, Debug, Eq, PartialEq)]
23pub struct NdjsonSourceCaps {
24    pub mode: NdjsonSourceMode,
25    pub forward: bool,
26    pub reverse: bool,
27    pub mmap: bool,
28    pub partitionable: bool,
29    pub framed_payload: bool,
30}
31
32impl NdjsonSourceCaps {
33    pub fn for_mode(mode: NdjsonSourceMode, options: NdjsonOptions) -> Self {
34        match mode {
35            NdjsonSourceMode::Reader => Self::reader(options),
36            NdjsonSourceMode::File => Self::file(options),
37        }
38    }
39
40    pub fn reader(options: NdjsonOptions) -> Self {
41        Self {
42            mode: NdjsonSourceMode::Reader,
43            forward: true,
44            reverse: false,
45            mmap: false,
46            partitionable: false,
47            framed_payload: options.row_frame != NdjsonRowFrame::JsonLine,
48        }
49    }
50
51    pub fn file(options: NdjsonOptions) -> Self {
52        Self {
53            mode: NdjsonSourceMode::File,
54            forward: true,
55            reverse: true,
56            mmap: true,
57            partitionable: true,
58            framed_payload: options.row_frame != NdjsonRowFrame::JsonLine,
59        }
60    }
61}
62
63impl std::fmt::Display for NdjsonSourceCaps {
64    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
65        write!(f, "{}", self.mode)?;
66        if self.reverse {
67            f.write_str("+reverse")?;
68        }
69        if self.mmap {
70            f.write_str("+mmap")?;
71        }
72        if self.partitionable {
73            f.write_str("+partitionable")?;
74        }
75        if self.framed_payload {
76            f.write_str("+framed-payload")?;
77        }
78        Ok(())
79    }
80}
81
82#[derive(Clone, Copy, Debug, Eq, PartialEq)]
83pub enum NdjsonRouteKind {
84    RowLocal,
85    Matches,
86    RowsStream,
87    RowsFanout,
88    RowsSubquery,
89    UnsupportedRows,
90}
91
92impl std::fmt::Display for NdjsonRouteKind {
93    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
94        f.write_str(match self {
95            Self::RowLocal => "row-local",
96            Self::Matches => "matches",
97            Self::RowsStream => "rows-stream",
98            Self::RowsFanout => "rows-fanout",
99            Self::RowsSubquery => "rows-subquery",
100            Self::UnsupportedRows => "unsupported-rows",
101        })
102    }
103}
104
105#[derive(Clone, Copy, Debug, Eq, PartialEq)]
106pub enum NdjsonFallbackReason {
107    FileBackedRowsRequired,
108}
109
110impl std::fmt::Display for NdjsonFallbackReason {
111    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
112        f.write_str(match self {
113            Self::FileBackedRowsRequired => "rows plan requires a file-backed NDJSON source",
114        })
115    }
116}
117
118#[derive(Clone, Debug, Eq, PartialEq)]
119pub struct NdjsonRouteExplain {
120    pub kind: NdjsonRouteKind,
121    pub source: NdjsonSourceCaps,
122    pub writer_path: Option<NdjsonWriterPathKind>,
123    pub rows_plan: Option<NdjsonRowsPlanKind>,
124    pub fallback_reason: Option<NdjsonFallbackReason>,
125}
126
127#[derive(Clone, Debug, Default, Eq, PartialEq)]
128pub struct NdjsonExecutionStats {
129    pub rows_scanned: usize,
130    pub rows_emitted: usize,
131    pub rows_filtered: usize,
132    pub duplicate_rows: usize,
133    pub direct_filter_rows: usize,
134    pub fallback_filter_rows: usize,
135    pub direct_key_rows: usize,
136    pub fallback_key_rows: usize,
137    pub direct_project_rows: usize,
138    pub fallback_project_rows: usize,
139    pub parallel_partitions: usize,
140    pub hint_learned_rows: usize,
141    pub hint_rejected_rows: usize,
142    pub hint_rows: usize,
143    pub hint_layout_misses: usize,
144    pub hint_disabled: bool,
145}
146
147impl From<&RowStreamStats> for NdjsonExecutionStats {
148    fn from(stats: &RowStreamStats) -> Self {
149        Self {
150            rows_scanned: stats.rows_scanned,
151            rows_emitted: stats.rows_emitted,
152            rows_filtered: stats.rows_filtered,
153            duplicate_rows: stats.duplicate_rows,
154            direct_filter_rows: stats.direct_filter_rows,
155            fallback_filter_rows: stats.fallback_filter_rows,
156            direct_key_rows: stats.direct_key_rows,
157            fallback_key_rows: stats.fallback_key_rows,
158            direct_project_rows: stats.direct_project_rows,
159            fallback_project_rows: stats.fallback_project_rows,
160            parallel_partitions: stats.parallel_partitions,
161            hint_learned_rows: 0,
162            hint_rejected_rows: 0,
163            hint_rows: 0,
164            hint_layout_misses: 0,
165            hint_disabled: false,
166        }
167    }
168}
169
170#[derive(Clone, Debug, Eq, PartialEq)]
171pub struct NdjsonExecutionReport {
172    pub route: NdjsonRouteExplain,
173    pub stats: NdjsonExecutionStats,
174}
175
176impl NdjsonExecutionReport {
177    pub fn new(route: NdjsonRouteExplain, stats: NdjsonExecutionStats) -> Self {
178        Self { route, stats }
179    }
180
181    pub fn emitted_only(route: NdjsonRouteExplain, rows_emitted: usize) -> Self {
182        Self {
183            route,
184            stats: NdjsonExecutionStats {
185                rows_emitted,
186                ..NdjsonExecutionStats::default()
187            },
188        }
189    }
190}
191
192pub(crate) enum NdjsonRoutePlan {
193    RowLocal {
194        explain: NdjsonRouteExplain,
195    },
196    Rows {
197        explain: NdjsonRouteExplain,
198        plan: NdjsonRowsFilePlan,
199    },
200    Unsupported {
201        explain: NdjsonRouteExplain,
202    },
203}
204
205impl NdjsonRoutePlan {
206    pub(crate) fn explain(&self) -> &NdjsonRouteExplain {
207        match self {
208            Self::RowLocal { explain }
209            | Self::Rows { explain, .. }
210            | Self::Unsupported { explain } => explain,
211        }
212    }
213}
214
215impl NdjsonRouteExplain {
216    pub fn matches(source: NdjsonSourceCaps) -> Self {
217        Self {
218            kind: NdjsonRouteKind::Matches,
219            source,
220            writer_path: None,
221            rows_plan: None,
222            fallback_reason: None,
223        }
224    }
225
226    pub fn is_rows_route(&self) -> bool {
227        self.rows_plan.is_some()
228    }
229
230    pub fn is_supported(&self) -> bool {
231        self.kind != NdjsonRouteKind::UnsupportedRows
232    }
233
234    pub fn unsupported_message(&self) -> Option<String> {
235        if self.is_supported() {
236            return None;
237        }
238        Some(
239            self.fallback_reason
240                .map(|reason| reason.to_string())
241                .unwrap_or_else(|| "unsupported $.rows() NDJSON route".to_string()),
242        )
243    }
244}
245
246pub(crate) fn ndjson_route_plan(
247    engine: &JetroEngine,
248    source: NdjsonSourceMode,
249    query: &str,
250    options: NdjsonOptions,
251) -> Result<NdjsonRoutePlan, JetroEngineError> {
252    let source = NdjsonSourceCaps::for_mode(source, options);
253    let Some(plan) = ndjson_rows_file_plan(query)? else {
254        return Ok(NdjsonRoutePlan::RowLocal {
255            explain: NdjsonRouteExplain {
256                kind: NdjsonRouteKind::RowLocal,
257                source,
258                writer_path: ndjson_writer_path_kind(engine, query),
259                rows_plan: None,
260                fallback_reason: None,
261            },
262        });
263    };
264
265    let rows_plan = plan.kind();
266    if plan.requires_file_backed_source() && source.mode == NdjsonSourceMode::Reader {
267        return Ok(NdjsonRoutePlan::Unsupported {
268            explain: NdjsonRouteExplain {
269                kind: NdjsonRouteKind::UnsupportedRows,
270                source,
271                writer_path: None,
272                rows_plan: Some(rows_plan),
273                fallback_reason: Some(NdjsonFallbackReason::FileBackedRowsRequired),
274            },
275        });
276    }
277
278    Ok(NdjsonRoutePlan::Rows {
279        explain: NdjsonRouteExplain {
280            kind: route_kind_for_rows_plan(rows_plan),
281            source,
282            writer_path: None,
283            rows_plan: Some(rows_plan),
284            fallback_reason: None,
285        },
286        plan,
287    })
288}
289
290pub fn ndjson_explain(
291    engine: &JetroEngine,
292    source: NdjsonSourceMode,
293    query: &str,
294    options: NdjsonOptions,
295) -> Result<NdjsonRouteExplain, JetroEngineError> {
296    Ok(ndjson_route_plan(engine, source, query, options)?
297        .explain()
298        .clone())
299}
300
301fn route_kind_for_rows_plan(plan: NdjsonRowsPlanKind) -> NdjsonRouteKind {
302    match plan {
303        NdjsonRowsPlanKind::Stream => NdjsonRouteKind::RowsStream,
304        NdjsonRowsPlanKind::Fanout => NdjsonRouteKind::RowsFanout,
305        NdjsonRowsPlanKind::Subquery => NdjsonRouteKind::RowsSubquery,
306    }
307}
308
309#[cfg(test)]
310mod tests {
311    use super::*;
312
313    #[test]
314    fn route_explain_reports_row_local_and_rows_modes() {
315        let engine = JetroEngine::new();
316        let row = ndjson_explain(
317            &engine,
318            NdjsonSourceMode::Reader,
319            "$.name",
320            NdjsonOptions::default(),
321        )
322        .unwrap();
323        assert_eq!(row.kind, NdjsonRouteKind::RowLocal);
324        assert_eq!(row.writer_path, Some(NdjsonWriterPathKind::ByteExpr));
325        assert_eq!(row.source.to_string(), "reader");
326
327        let rows = ndjson_explain(
328            &engine,
329            NdjsonSourceMode::File,
330            "$.rows().take(1)",
331            NdjsonOptions::default(),
332        )
333        .unwrap();
334        assert_eq!(rows.kind, NdjsonRouteKind::RowsStream);
335        assert_eq!(rows.rows_plan, Some(NdjsonRowsPlanKind::Stream));
336        assert_eq!(rows.source.to_string(), "file+reverse+mmap+partitionable");
337    }
338
339    #[test]
340    fn route_explain_marks_reader_rows_subquery_unsupported() {
341        let engine = JetroEngine::new();
342        let route = ndjson_explain(
343            &engine,
344            NdjsonSourceMode::Reader,
345            r#"{head: $.rows().take(1)}"#,
346            NdjsonOptions::default(),
347        )
348        .unwrap();
349        assert_eq!(route.kind, NdjsonRouteKind::UnsupportedRows);
350        assert_eq!(
351            route.fallback_reason,
352            Some(NdjsonFallbackReason::FileBackedRowsRequired)
353        );
354        assert_eq!(route.kind.to_string(), "unsupported-rows");
355        assert_eq!(
356            route.fallback_reason.unwrap().to_string(),
357            "rows plan requires a file-backed NDJSON source"
358        );
359        assert!(!route.is_supported());
360        assert_eq!(
361            route.unsupported_message().unwrap(),
362            "rows plan requires a file-backed NDJSON source"
363        );
364    }
365
366    #[test]
367    fn route_explain_marks_reader_rows_fanout_unsupported() {
368        let engine = JetroEngine::new();
369        let query = r#"let stream = $.rows(), a = stream.take(1), b = stream.count() in {a, b}"#;
370
371        let reader = ndjson_explain(
372            &engine,
373            NdjsonSourceMode::Reader,
374            query,
375            NdjsonOptions::default(),
376        )
377        .unwrap();
378        assert_eq!(reader.kind, NdjsonRouteKind::UnsupportedRows);
379        assert_eq!(reader.rows_plan, Some(NdjsonRowsPlanKind::Fanout));
380        assert_eq!(
381            reader.fallback_reason,
382            Some(NdjsonFallbackReason::FileBackedRowsRequired)
383        );
384
385        let file = ndjson_explain(
386            &engine,
387            NdjsonSourceMode::File,
388            query,
389            NdjsonOptions::default(),
390        )
391        .unwrap();
392        assert_eq!(file.kind, NdjsonRouteKind::RowsFanout);
393        assert!(file.is_supported());
394        assert!(file.fallback_reason.is_none());
395    }
396
397    #[test]
398    fn execution_stats_copy_row_stream_counters() {
399        let stream = RowStreamStats {
400            rows_scanned: 3,
401            rows_emitted: 2,
402            rows_filtered: 1,
403            direct_project_rows: 2,
404            parallel_partitions: 4,
405            ..RowStreamStats::default()
406        };
407        let stats = NdjsonExecutionStats::from(&stream);
408        assert_eq!(stats.rows_scanned, 3);
409        assert_eq!(stats.rows_emitted, 2);
410        assert_eq!(stats.rows_filtered, 1);
411        assert_eq!(stats.direct_project_rows, 2);
412        assert_eq!(stats.parallel_partitions, 4);
413    }
414}