1use super::ndjson::{ndjson_writer_path_kind, NdjsonOptions, NdjsonWriterPathKind};
2use super::ndjson_frame::NdjsonRowFrame;
3use super::ndjson_rows::{ndjson_rows_file_plan, NdjsonRowsFilePlan, NdjsonRowsPlanKind};
4use super::stream_types::RowStreamStats;
5use crate::{JetroEngine, JetroEngineError};
6
/// Identifies how an NDJSON source is supplied to the router.
///
/// The mode selects which capability set [`NdjsonSourceCaps::for_mode`] builds.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum NdjsonSourceMode {
    /// Reader source; [`NdjsonSourceCaps::reader`] reports it as forward-only
    /// (no reverse, mmap, or partitioning).
    Reader,
    /// File source; [`NdjsonSourceCaps::file`] reports reverse, mmap, and
    /// partitioning as available.
    File,
}
12
13impl std::fmt::Display for NdjsonSourceMode {
14 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
15 f.write_str(match self {
16 Self::Reader => "reader",
17 Self::File => "file",
18 })
19 }
20}
21
22#[derive(Clone, Copy, Debug, Eq, PartialEq)]
23pub struct NdjsonSourceCaps {
24 pub mode: NdjsonSourceMode,
25 pub forward: bool,
26 pub reverse: bool,
27 pub mmap: bool,
28 pub partitionable: bool,
29 pub framed_payload: bool,
30}
31
32impl NdjsonSourceCaps {
33 pub fn for_mode(mode: NdjsonSourceMode, options: NdjsonOptions) -> Self {
34 match mode {
35 NdjsonSourceMode::Reader => Self::reader(options),
36 NdjsonSourceMode::File => Self::file(options),
37 }
38 }
39
40 pub fn reader(options: NdjsonOptions) -> Self {
41 Self {
42 mode: NdjsonSourceMode::Reader,
43 forward: true,
44 reverse: false,
45 mmap: false,
46 partitionable: false,
47 framed_payload: options.row_frame != NdjsonRowFrame::JsonLine,
48 }
49 }
50
51 pub fn file(options: NdjsonOptions) -> Self {
52 Self {
53 mode: NdjsonSourceMode::File,
54 forward: true,
55 reverse: true,
56 mmap: true,
57 partitionable: true,
58 framed_payload: options.row_frame != NdjsonRowFrame::JsonLine,
59 }
60 }
61}
62
63impl std::fmt::Display for NdjsonSourceCaps {
64 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
65 write!(f, "{}", self.mode)?;
66 if self.reverse {
67 f.write_str("+reverse")?;
68 }
69 if self.mmap {
70 f.write_str("+mmap")?;
71 }
72 if self.partitionable {
73 f.write_str("+partitionable")?;
74 }
75 if self.framed_payload {
76 f.write_str("+framed-payload")?;
77 }
78 Ok(())
79 }
80}
81
/// Route chosen for an NDJSON query, as reported by the explain output.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum NdjsonRouteKind {
    /// The query produced no rows plan; displayed as `row-local`.
    RowLocal,
    /// Route built by `NdjsonRouteExplain::matches`; displayed as `matches`.
    Matches,
    /// Rows plan of kind `Stream`; displayed as `rows-stream`.
    RowsStream,
    /// Rows plan of kind `Fanout`; displayed as `rows-fanout`.
    RowsFanout,
    /// Rows plan of kind `Subquery`; displayed as `rows-subquery`.
    RowsSubquery,
    /// A rows plan that needs a file-backed source was requested on a reader
    /// source; displayed as `unsupported-rows`.
    UnsupportedRows,
}
91
92impl std::fmt::Display for NdjsonRouteKind {
93 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
94 f.write_str(match self {
95 Self::RowLocal => "row-local",
96 Self::Matches => "matches",
97 Self::RowsStream => "rows-stream",
98 Self::RowsFanout => "rows-fanout",
99 Self::RowsSubquery => "rows-subquery",
100 Self::UnsupportedRows => "unsupported-rows",
101 })
102 }
103}
104
/// Reason attached to a route that could not be planned as requested.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum NdjsonFallbackReason {
    /// The rows plan requires a file-backed source but the source is a reader.
    FileBackedRowsRequired,
}
109
110impl std::fmt::Display for NdjsonFallbackReason {
111 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
112 f.write_str(match self {
113 Self::FileBackedRowsRequired => "rows plan requires a file-backed NDJSON source",
114 })
115 }
116}
117
118#[derive(Clone, Debug, Eq, PartialEq)]
119pub struct NdjsonRouteExplain {
120 pub kind: NdjsonRouteKind,
121 pub source: NdjsonSourceCaps,
122 pub writer_path: Option<NdjsonWriterPathKind>,
123 pub rows_plan: Option<NdjsonRowsPlanKind>,
124 pub fallback_reason: Option<NdjsonFallbackReason>,
125}
126
/// Counters reported after executing an NDJSON query.
///
/// The row and partition counters are copied from `RowStreamStats` by the
/// `From<&RowStreamStats>` conversion. The `hint_*` fields are not populated
/// by that conversion, which leaves them at zero / `false`.
#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub struct NdjsonExecutionStats {
    /// Copied from `RowStreamStats::rows_scanned`.
    pub rows_scanned: usize,
    /// Copied from `RowStreamStats::rows_emitted`.
    pub rows_emitted: usize,
    /// Copied from `RowStreamStats::rows_filtered`.
    pub rows_filtered: usize,
    /// Copied from `RowStreamStats::duplicate_rows`.
    pub duplicate_rows: usize,
    /// Copied from `RowStreamStats::direct_filter_rows`.
    pub direct_filter_rows: usize,
    /// Copied from `RowStreamStats::fallback_filter_rows`.
    pub fallback_filter_rows: usize,
    /// Copied from `RowStreamStats::direct_key_rows`.
    pub direct_key_rows: usize,
    /// Copied from `RowStreamStats::fallback_key_rows`.
    pub fallback_key_rows: usize,
    /// Copied from `RowStreamStats::direct_project_rows`.
    pub direct_project_rows: usize,
    /// Copied from `RowStreamStats::fallback_project_rows`.
    pub fallback_project_rows: usize,
    /// Copied from `RowStreamStats::parallel_partitions`.
    pub parallel_partitions: usize,
    /// Hint counter; zero after conversion from `RowStreamStats`.
    pub hint_learned_rows: usize,
    /// Hint counter; zero after conversion from `RowStreamStats`.
    pub hint_rejected_rows: usize,
    /// Hint counter; zero after conversion from `RowStreamStats`.
    pub hint_rows: usize,
    /// Hint counter; zero after conversion from `RowStreamStats`.
    pub hint_layout_misses: usize,
    /// Hint flag; `false` after conversion from `RowStreamStats`.
    pub hint_disabled: bool,
}
146
147impl From<&RowStreamStats> for NdjsonExecutionStats {
148 fn from(stats: &RowStreamStats) -> Self {
149 Self {
150 rows_scanned: stats.rows_scanned,
151 rows_emitted: stats.rows_emitted,
152 rows_filtered: stats.rows_filtered,
153 duplicate_rows: stats.duplicate_rows,
154 direct_filter_rows: stats.direct_filter_rows,
155 fallback_filter_rows: stats.fallback_filter_rows,
156 direct_key_rows: stats.direct_key_rows,
157 fallback_key_rows: stats.fallback_key_rows,
158 direct_project_rows: stats.direct_project_rows,
159 fallback_project_rows: stats.fallback_project_rows,
160 parallel_partitions: stats.parallel_partitions,
161 hint_learned_rows: 0,
162 hint_rejected_rows: 0,
163 hint_rows: 0,
164 hint_layout_misses: 0,
165 hint_disabled: false,
166 }
167 }
168}
169
170#[derive(Clone, Debug, Eq, PartialEq)]
171pub struct NdjsonExecutionReport {
172 pub route: NdjsonRouteExplain,
173 pub stats: NdjsonExecutionStats,
174}
175
176impl NdjsonExecutionReport {
177 pub fn new(route: NdjsonRouteExplain, stats: NdjsonExecutionStats) -> Self {
178 Self { route, stats }
179 }
180
181 pub fn emitted_only(route: NdjsonRouteExplain, rows_emitted: usize) -> Self {
182 Self {
183 route,
184 stats: NdjsonExecutionStats {
185 rows_emitted,
186 ..NdjsonExecutionStats::default()
187 },
188 }
189 }
190}
191
192pub(crate) enum NdjsonRoutePlan {
193 RowLocal {
194 explain: NdjsonRouteExplain,
195 },
196 Rows {
197 explain: NdjsonRouteExplain,
198 plan: NdjsonRowsFilePlan,
199 },
200 Unsupported {
201 explain: NdjsonRouteExplain,
202 },
203}
204
205impl NdjsonRoutePlan {
206 pub(crate) fn explain(&self) -> &NdjsonRouteExplain {
207 match self {
208 Self::RowLocal { explain }
209 | Self::Rows { explain, .. }
210 | Self::Unsupported { explain } => explain,
211 }
212 }
213}
214
215impl NdjsonRouteExplain {
216 pub fn matches(source: NdjsonSourceCaps) -> Self {
217 Self {
218 kind: NdjsonRouteKind::Matches,
219 source,
220 writer_path: None,
221 rows_plan: None,
222 fallback_reason: None,
223 }
224 }
225
226 pub fn is_rows_route(&self) -> bool {
227 self.rows_plan.is_some()
228 }
229
230 pub fn is_supported(&self) -> bool {
231 self.kind != NdjsonRouteKind::UnsupportedRows
232 }
233
234 pub fn unsupported_message(&self) -> Option<String> {
235 if self.is_supported() {
236 return None;
237 }
238 Some(
239 self.fallback_reason
240 .map(|reason| reason.to_string())
241 .unwrap_or_else(|| "unsupported $.rows() NDJSON route".to_string()),
242 )
243 }
244}
245
246pub(crate) fn ndjson_route_plan(
247 engine: &JetroEngine,
248 source: NdjsonSourceMode,
249 query: &str,
250 options: NdjsonOptions,
251) -> Result<NdjsonRoutePlan, JetroEngineError> {
252 let source = NdjsonSourceCaps::for_mode(source, options);
253 let Some(plan) = ndjson_rows_file_plan(query)? else {
254 return Ok(NdjsonRoutePlan::RowLocal {
255 explain: NdjsonRouteExplain {
256 kind: NdjsonRouteKind::RowLocal,
257 source,
258 writer_path: ndjson_writer_path_kind(engine, query),
259 rows_plan: None,
260 fallback_reason: None,
261 },
262 });
263 };
264
265 let rows_plan = plan.kind();
266 if plan.requires_file_backed_source() && source.mode == NdjsonSourceMode::Reader {
267 return Ok(NdjsonRoutePlan::Unsupported {
268 explain: NdjsonRouteExplain {
269 kind: NdjsonRouteKind::UnsupportedRows,
270 source,
271 writer_path: None,
272 rows_plan: Some(rows_plan),
273 fallback_reason: Some(NdjsonFallbackReason::FileBackedRowsRequired),
274 },
275 });
276 }
277
278 Ok(NdjsonRoutePlan::Rows {
279 explain: NdjsonRouteExplain {
280 kind: route_kind_for_rows_plan(rows_plan),
281 source,
282 writer_path: None,
283 rows_plan: Some(rows_plan),
284 fallback_reason: None,
285 },
286 plan,
287 })
288}
289
290pub fn ndjson_explain(
291 engine: &JetroEngine,
292 source: NdjsonSourceMode,
293 query: &str,
294 options: NdjsonOptions,
295) -> Result<NdjsonRouteExplain, JetroEngineError> {
296 Ok(ndjson_route_plan(engine, source, query, options)?
297 .explain()
298 .clone())
299}
300
301fn route_kind_for_rows_plan(plan: NdjsonRowsPlanKind) -> NdjsonRouteKind {
302 match plan {
303 NdjsonRowsPlanKind::Stream => NdjsonRouteKind::RowsStream,
304 NdjsonRowsPlanKind::Fanout => NdjsonRouteKind::RowsFanout,
305 NdjsonRowsPlanKind::Subquery => NdjsonRouteKind::RowsSubquery,
306 }
307}
308
#[cfg(test)]
mod tests {
    use super::*;

    /// Display text expected for `NdjsonFallbackReason::FileBackedRowsRequired`.
    const FILE_BACKED_MESSAGE: &str = "rows plan requires a file-backed NDJSON source";

    /// Explains `query` for `mode` with default options, panicking if planning fails.
    fn explain_default(
        engine: &JetroEngine,
        mode: NdjsonSourceMode,
        query: &str,
    ) -> NdjsonRouteExplain {
        ndjson_explain(engine, mode, query, NdjsonOptions::default()).unwrap()
    }

    #[test]
    fn route_explain_reports_row_local_and_rows_modes() {
        let engine = JetroEngine::new();

        let row = explain_default(&engine, NdjsonSourceMode::Reader, "$.name");
        assert_eq!(row.kind, NdjsonRouteKind::RowLocal);
        assert_eq!(row.writer_path, Some(NdjsonWriterPathKind::ByteExpr));
        assert_eq!(row.source.to_string(), "reader");

        let rows = explain_default(&engine, NdjsonSourceMode::File, "$.rows().take(1)");
        assert_eq!(rows.kind, NdjsonRouteKind::RowsStream);
        assert_eq!(rows.rows_plan, Some(NdjsonRowsPlanKind::Stream));
        assert_eq!(rows.source.to_string(), "file+reverse+mmap+partitionable");
    }

    #[test]
    fn route_explain_marks_reader_rows_subquery_unsupported() {
        let engine = JetroEngine::new();
        let route = explain_default(
            &engine,
            NdjsonSourceMode::Reader,
            r#"{head: $.rows().take(1)}"#,
        );

        assert_eq!(route.kind, NdjsonRouteKind::UnsupportedRows);
        assert_eq!(
            route.fallback_reason,
            Some(NdjsonFallbackReason::FileBackedRowsRequired)
        );
        assert_eq!(route.kind.to_string(), "unsupported-rows");
        assert_eq!(
            route.fallback_reason.unwrap().to_string(),
            FILE_BACKED_MESSAGE
        );
        assert!(!route.is_supported());
        assert_eq!(route.unsupported_message().unwrap(), FILE_BACKED_MESSAGE);
    }

    #[test]
    fn route_explain_marks_reader_rows_fanout_unsupported() {
        let engine = JetroEngine::new();
        let query = r#"let stream = $.rows(), a = stream.take(1), b = stream.count() in {a, b}"#;

        // The same fanout query is rejected on a reader source...
        let reader = explain_default(&engine, NdjsonSourceMode::Reader, query);
        assert_eq!(reader.kind, NdjsonRouteKind::UnsupportedRows);
        assert_eq!(reader.rows_plan, Some(NdjsonRowsPlanKind::Fanout));
        assert_eq!(
            reader.fallback_reason,
            Some(NdjsonFallbackReason::FileBackedRowsRequired)
        );

        // ...and accepted on a file source.
        let file = explain_default(&engine, NdjsonSourceMode::File, query);
        assert_eq!(file.kind, NdjsonRouteKind::RowsFanout);
        assert!(file.is_supported());
        assert!(file.fallback_reason.is_none());
    }

    #[test]
    fn execution_stats_copy_row_stream_counters() {
        let stream = RowStreamStats {
            rows_scanned: 3,
            rows_emitted: 2,
            rows_filtered: 1,
            direct_project_rows: 2,
            parallel_partitions: 4,
            ..RowStreamStats::default()
        };

        let stats: NdjsonExecutionStats = (&stream).into();
        assert_eq!(stats.rows_scanned, 3);
        assert_eq!(stats.rows_emitted, 2);
        assert_eq!(stats.rows_filtered, 1);
        assert_eq!(stats.direct_project_rows, 2);
        assert_eq!(stats.parallel_partitions, 4);
    }
}