1use super::ndjson::{ndjson_writer_path_kind, NdjsonOptions, NdjsonWriterPathKind};
2use super::ndjson_frame::NdjsonRowFrame;
3use super::ndjson_rows::{ndjson_rows_file_plan, NdjsonRowsFilePlan, NdjsonRowsPlanKind};
4use super::stream_types::RowStreamStats;
5use crate::{JetroEngine, JetroEngineError};
6
7#[derive(Clone, Copy, Debug, Eq, PartialEq)]
8pub enum NdjsonSourceMode {
9 Reader,
10 File,
11}
12
13impl std::fmt::Display for NdjsonSourceMode {
14 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
15 f.write_str(match self {
16 Self::Reader => "reader",
17 Self::File => "file",
18 })
19 }
20}
21
22#[derive(Clone, Copy, Debug, Eq, PartialEq)]
23pub struct NdjsonSourceCaps {
24 pub mode: NdjsonSourceMode,
25 pub forward: bool,
26 pub reverse: bool,
27 pub mmap: bool,
28 pub partitionable: bool,
29 pub framed_payload: bool,
30}
31
32impl NdjsonSourceCaps {
33 pub fn for_mode(mode: NdjsonSourceMode, options: NdjsonOptions) -> Self {
34 match mode {
35 NdjsonSourceMode::Reader => Self::reader(options),
36 NdjsonSourceMode::File => Self::file(options),
37 }
38 }
39
40 pub fn reader(options: NdjsonOptions) -> Self {
41 Self {
42 mode: NdjsonSourceMode::Reader,
43 forward: true,
44 reverse: false,
45 mmap: false,
46 partitionable: false,
47 framed_payload: options.row_frame != NdjsonRowFrame::JsonLine,
48 }
49 }
50
51 pub fn file(options: NdjsonOptions) -> Self {
52 Self {
53 mode: NdjsonSourceMode::File,
54 forward: true,
55 reverse: true,
56 mmap: true,
57 partitionable: true,
58 framed_payload: options.row_frame != NdjsonRowFrame::JsonLine,
59 }
60 }
61}
62
63impl std::fmt::Display for NdjsonSourceCaps {
64 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
65 write!(f, "{}", self.mode)?;
66 if self.reverse {
67 f.write_str("+reverse")?;
68 }
69 if self.mmap {
70 f.write_str("+mmap")?;
71 }
72 if self.partitionable {
73 f.write_str("+partitionable")?;
74 }
75 if self.framed_payload {
76 f.write_str("+framed-payload")?;
77 }
78 Ok(())
79 }
80}
81
82#[derive(Clone, Copy, Debug, Eq, PartialEq)]
83pub enum NdjsonRouteKind {
84 RowLocal,
85 Matches,
86 RowsStream,
87 RowsFanout,
88 RowsSubquery,
89 UnsupportedRows,
90}
91
92impl std::fmt::Display for NdjsonRouteKind {
93 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
94 f.write_str(match self {
95 Self::RowLocal => "row-local",
96 Self::Matches => "matches",
97 Self::RowsStream => "rows-stream",
98 Self::RowsFanout => "rows-fanout",
99 Self::RowsSubquery => "rows-subquery",
100 Self::UnsupportedRows => "unsupported-rows",
101 })
102 }
103}
104
105#[derive(Clone, Copy, Debug, Eq, PartialEq)]
106pub enum NdjsonFallbackReason {
107 FileBackedRowsRequired,
108}
109
110impl std::fmt::Display for NdjsonFallbackReason {
111 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
112 f.write_str(match self {
113 Self::FileBackedRowsRequired => "rows plan requires a file-backed NDJSON source",
114 })
115 }
116}
117
118#[derive(Clone, Debug, Eq, PartialEq)]
119pub struct NdjsonRouteExplain {
120 pub kind: NdjsonRouteKind,
121 pub source: NdjsonSourceCaps,
122 pub writer_path: Option<NdjsonWriterPathKind>,
123 pub rows_plan: Option<NdjsonRowsPlanKind>,
124 pub fallback_reason: Option<NdjsonFallbackReason>,
125}
126
127#[derive(Clone, Debug, Default, Eq, PartialEq)]
128pub struct NdjsonExecutionStats {
129 pub rows_scanned: usize,
130 pub rows_emitted: usize,
131 pub rows_filtered: usize,
132 pub duplicate_rows: usize,
133 pub direct_filter_rows: usize,
134 pub fallback_filter_rows: usize,
135 pub direct_key_rows: usize,
136 pub fallback_key_rows: usize,
137 pub direct_project_rows: usize,
138 pub fallback_project_rows: usize,
139 pub parallel_partitions: usize,
140 pub hint_learned_rows: usize,
141 pub hint_rejected_rows: usize,
142 pub hint_rows: usize,
143 pub hint_layout_misses: usize,
144 pub hint_disabled: bool,
145}
146
147impl From<&RowStreamStats> for NdjsonExecutionStats {
148 fn from(stats: &RowStreamStats) -> Self {
149 Self {
150 rows_scanned: stats.rows_scanned,
151 rows_emitted: stats.rows_emitted,
152 rows_filtered: stats.rows_filtered,
153 duplicate_rows: stats.duplicate_rows,
154 direct_filter_rows: stats.direct_filter_rows,
155 fallback_filter_rows: stats.fallback_filter_rows,
156 direct_key_rows: stats.direct_key_rows,
157 fallback_key_rows: stats.fallback_key_rows,
158 direct_project_rows: stats.direct_project_rows,
159 fallback_project_rows: stats.fallback_project_rows,
160 parallel_partitions: stats.parallel_partitions,
161 hint_learned_rows: 0,
162 hint_rejected_rows: 0,
163 hint_rows: 0,
164 hint_layout_misses: 0,
165 hint_disabled: false,
166 }
167 }
168}
169
170#[derive(Clone, Debug, Eq, PartialEq)]
171pub struct NdjsonExecutionReport {
172 pub route: NdjsonRouteExplain,
173 pub stats: NdjsonExecutionStats,
174}
175
176impl NdjsonExecutionReport {
177 pub fn new(route: NdjsonRouteExplain, stats: NdjsonExecutionStats) -> Self {
178 Self { route, stats }
179 }
180
181 pub fn emitted_only(route: NdjsonRouteExplain, rows_emitted: usize) -> Self {
182 Self {
183 route,
184 stats: NdjsonExecutionStats {
185 rows_emitted,
186 ..NdjsonExecutionStats::default()
187 },
188 }
189 }
190}
191
192pub(super) enum NdjsonRoutePlan {
193 RowLocal {
194 explain: NdjsonRouteExplain,
195 },
196 Rows {
197 explain: NdjsonRouteExplain,
198 plan: NdjsonRowsFilePlan,
199 },
200 Unsupported {
201 explain: NdjsonRouteExplain,
202 },
203}
204
205impl NdjsonRoutePlan {
206 pub(super) fn explain(&self) -> &NdjsonRouteExplain {
207 match self {
208 Self::RowLocal { explain }
209 | Self::Rows { explain, .. }
210 | Self::Unsupported { explain } => explain,
211 }
212 }
213
214}
215
216impl NdjsonRouteExplain {
217 pub fn matches(source: NdjsonSourceCaps) -> Self {
218 Self {
219 kind: NdjsonRouteKind::Matches,
220 source,
221 writer_path: None,
222 rows_plan: None,
223 fallback_reason: None,
224 }
225 }
226
227 pub fn is_rows_route(&self) -> bool {
228 self.rows_plan.is_some()
229 }
230
231 pub fn is_supported(&self) -> bool {
232 self.kind != NdjsonRouteKind::UnsupportedRows
233 }
234
235 pub fn unsupported_message(&self) -> Option<String> {
236 if self.is_supported() {
237 return None;
238 }
239 Some(
240 self.fallback_reason
241 .map(|reason| reason.to_string())
242 .unwrap_or_else(|| "unsupported $.rows() NDJSON route".to_string()),
243 )
244 }
245}
246
247pub(super) fn ndjson_route_plan(
248 engine: &JetroEngine,
249 source: NdjsonSourceMode,
250 query: &str,
251 options: NdjsonOptions,
252) -> Result<NdjsonRoutePlan, JetroEngineError> {
253 let source = NdjsonSourceCaps::for_mode(source, options);
254 let Some(plan) = ndjson_rows_file_plan(query)? else {
255 return Ok(NdjsonRoutePlan::RowLocal {
256 explain: NdjsonRouteExplain {
257 kind: NdjsonRouteKind::RowLocal,
258 source,
259 writer_path: ndjson_writer_path_kind(engine, query),
260 rows_plan: None,
261 fallback_reason: None,
262 },
263 });
264 };
265
266 let rows_plan = plan.kind();
267 if plan.requires_file_backed_source() && source.mode == NdjsonSourceMode::Reader {
268 return Ok(NdjsonRoutePlan::Unsupported {
269 explain: NdjsonRouteExplain {
270 kind: NdjsonRouteKind::UnsupportedRows,
271 source,
272 writer_path: None,
273 rows_plan: Some(rows_plan),
274 fallback_reason: Some(NdjsonFallbackReason::FileBackedRowsRequired),
275 },
276 });
277 }
278
279 Ok(NdjsonRoutePlan::Rows {
280 explain: NdjsonRouteExplain {
281 kind: route_kind_for_rows_plan(rows_plan),
282 source,
283 writer_path: None,
284 rows_plan: Some(rows_plan),
285 fallback_reason: None,
286 },
287 plan,
288 })
289}
290
291pub fn ndjson_explain(
292 engine: &JetroEngine,
293 source: NdjsonSourceMode,
294 query: &str,
295 options: NdjsonOptions,
296) -> Result<NdjsonRouteExplain, JetroEngineError> {
297 Ok(ndjson_route_plan(engine, source, query, options)?
298 .explain()
299 .clone())
300}
301
302fn route_kind_for_rows_plan(plan: NdjsonRowsPlanKind) -> NdjsonRouteKind {
303 match plan {
304 NdjsonRowsPlanKind::Stream => NdjsonRouteKind::RowsStream,
305 NdjsonRowsPlanKind::Fanout => NdjsonRouteKind::RowsFanout,
306 NdjsonRowsPlanKind::Subquery => NdjsonRouteKind::RowsSubquery,
307 }
308}
309
310#[cfg(test)]
311mod tests {
312 use super::*;
313
314 #[test]
315 fn route_explain_reports_row_local_and_rows_modes() {
316 let engine = JetroEngine::new();
317 let row = ndjson_explain(
318 &engine,
319 NdjsonSourceMode::Reader,
320 "$.name",
321 NdjsonOptions::default(),
322 )
323 .unwrap();
324 assert_eq!(row.kind, NdjsonRouteKind::RowLocal);
325 assert_eq!(row.writer_path, Some(NdjsonWriterPathKind::ByteExpr));
326 assert_eq!(row.source.to_string(), "reader");
327
328 let rows = ndjson_explain(
329 &engine,
330 NdjsonSourceMode::File,
331 "$.rows().take(1)",
332 NdjsonOptions::default(),
333 )
334 .unwrap();
335 assert_eq!(rows.kind, NdjsonRouteKind::RowsStream);
336 assert_eq!(rows.rows_plan, Some(NdjsonRowsPlanKind::Stream));
337 assert_eq!(rows.source.to_string(), "file+reverse+mmap+partitionable");
338 }
339
340 #[test]
341 fn route_explain_marks_reader_rows_subquery_unsupported() {
342 let engine = JetroEngine::new();
343 let route = ndjson_explain(
344 &engine,
345 NdjsonSourceMode::Reader,
346 r#"{head: $.rows().take(1)}"#,
347 NdjsonOptions::default(),
348 )
349 .unwrap();
350 assert_eq!(route.kind, NdjsonRouteKind::UnsupportedRows);
351 assert_eq!(
352 route.fallback_reason,
353 Some(NdjsonFallbackReason::FileBackedRowsRequired)
354 );
355 assert_eq!(route.kind.to_string(), "unsupported-rows");
356 assert_eq!(
357 route.fallback_reason.unwrap().to_string(),
358 "rows plan requires a file-backed NDJSON source"
359 );
360 assert!(!route.is_supported());
361 assert_eq!(
362 route.unsupported_message().unwrap(),
363 "rows plan requires a file-backed NDJSON source"
364 );
365 }
366
367 #[test]
368 fn route_explain_marks_reader_rows_fanout_unsupported() {
369 let engine = JetroEngine::new();
370 let query =
371 r#"let stream = $.rows(), a = stream.take(1), b = stream.count() in {a, b}"#;
372
373 let reader = ndjson_explain(
374 &engine,
375 NdjsonSourceMode::Reader,
376 query,
377 NdjsonOptions::default(),
378 )
379 .unwrap();
380 assert_eq!(reader.kind, NdjsonRouteKind::UnsupportedRows);
381 assert_eq!(reader.rows_plan, Some(NdjsonRowsPlanKind::Fanout));
382 assert_eq!(
383 reader.fallback_reason,
384 Some(NdjsonFallbackReason::FileBackedRowsRequired)
385 );
386
387 let file = ndjson_explain(
388 &engine,
389 NdjsonSourceMode::File,
390 query,
391 NdjsonOptions::default(),
392 )
393 .unwrap();
394 assert_eq!(file.kind, NdjsonRouteKind::RowsFanout);
395 assert!(file.is_supported());
396 assert!(file.fallback_reason.is_none());
397 }
398
399 #[test]
400 fn execution_stats_copy_row_stream_counters() {
401 let stream = RowStreamStats {
402 rows_scanned: 3,
403 rows_emitted: 2,
404 rows_filtered: 1,
405 direct_project_rows: 2,
406 parallel_partitions: 4,
407 ..RowStreamStats::default()
408 };
409 let stats = NdjsonExecutionStats::from(&stream);
410 assert_eq!(stats.rows_scanned, 3);
411 assert_eq!(stats.rows_emitted, 2);
412 assert_eq!(stats.rows_filtered, 1);
413 assert_eq!(stats.direct_project_rows, 2);
414 assert_eq!(stats.parallel_partitions, 4);
415 }
416}