jetro_core/io/
ndjson_rows.rs1use super::stream_fanout::{lower_rows_fanout_expr, RowStreamFanoutPlan};
2use super::stream_plan::{lower_root_rows_query, RowStreamPlan, RowStreamSourceKind};
3use super::stream_subquery::{lower_single_rows_subquery, RowStreamSubqueryPlan};
4use crate::{EvalError, JetroEngineError};
5
6pub(crate) enum NdjsonRowsFilePlan {
7 Stream(RowStreamPlan),
8 Fanout(RowStreamFanoutPlan),
9 Subquery(RowStreamSubqueryPlan),
10}
11
12impl NdjsonRowsFilePlan {
13 pub(crate) fn kind(&self) -> NdjsonRowsPlanKind {
14 match self {
15 Self::Stream(_) => NdjsonRowsPlanKind::Stream,
16 Self::Fanout(_) => NdjsonRowsPlanKind::Fanout,
17 Self::Subquery(_) => NdjsonRowsPlanKind::Subquery,
18 }
19 }
20
21 pub(crate) fn requires_file_backed_source(&self) -> bool {
22 matches!(self, Self::Fanout(_) | Self::Subquery(_))
23 }
24}
25
26#[derive(Clone, Copy, Debug, Eq, PartialEq)]
27pub enum NdjsonRowsPlanKind {
28 Stream,
29 Fanout,
30 Subquery,
31}
32
33impl std::fmt::Display for NdjsonRowsPlanKind {
34 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
35 f.write_str(match self {
36 Self::Stream => "stream",
37 Self::Fanout => "fanout",
38 Self::Subquery => "subquery",
39 })
40 }
41}
42
43pub fn ndjson_rows_plan_kind(query: &str) -> Result<Option<NdjsonRowsPlanKind>, JetroEngineError> {
44 Ok(ndjson_rows_file_plan(query)?.map(|plan| plan.kind()))
45}
46
47pub(crate) fn ndjson_rows_stream_plan(
48 query: &str,
49) -> Result<Option<RowStreamPlan>, JetroEngineError> {
50 lower_root_rows_query(query, RowStreamSourceKind::NdjsonRows)
51 .map_err(|err| JetroEngineError::Eval(EvalError(err.to_string())))
52}
53
54pub(crate) fn ndjson_rows_file_plan(
55 query: &str,
56) -> Result<Option<NdjsonRowsFilePlan>, JetroEngineError> {
57 if let Some(plan) = ndjson_rows_stream_plan(query)? {
58 return Ok(Some(NdjsonRowsFilePlan::Stream(plan)));
59 }
60 if let Some(plan) = ndjson_rows_fanout_plan(query)? {
61 return Ok(Some(NdjsonRowsFilePlan::Fanout(plan)));
62 }
63 if let Some(plan) = ndjson_rows_subquery_plan(query)? {
64 return Ok(Some(NdjsonRowsFilePlan::Subquery(plan)));
65 }
66 Ok(None)
67}
68
69pub(super) fn ndjson_rows_subquery_plan(
70 query: &str,
71) -> Result<Option<RowStreamSubqueryPlan>, JetroEngineError> {
72 let Some(expr) = parse_ndjson_rows_expr(query)? else {
73 return Ok(None);
74 };
75 lower_single_rows_subquery(&expr, RowStreamSourceKind::NdjsonRows)
76 .map_err(|err| JetroEngineError::Eval(EvalError(err.to_string())))
77}
78
79fn ndjson_rows_fanout_plan(query: &str) -> Result<Option<RowStreamFanoutPlan>, JetroEngineError> {
80 let Some(expr) = parse_ndjson_rows_expr(query)? else {
81 return Ok(None);
82 };
83 lower_rows_fanout_expr(&expr, RowStreamSourceKind::NdjsonRows)
84 .map_err(|err| JetroEngineError::Eval(EvalError(err.to_string())))
85}
86
87fn parse_ndjson_rows_expr(
88 query: &str,
89) -> Result<Option<crate::parse::ast::Expr>, JetroEngineError> {
90 if !query.contains("$.rows") {
91 return Ok(None);
92 }
93 crate::parse::parser::parse(query)
94 .map(Some)
95 .map_err(|err| JetroEngineError::Eval(EvalError(err.to_string())))
96}
97
98#[cfg(test)]
99mod tests {
100 use super::*;
101
102 #[test]
103 fn rows_plan_kind_reports_routing_family() {
104 assert_eq!(
105 ndjson_rows_plan_kind("$.rows().filter($.active).take(1)").unwrap(),
106 Some(NdjsonRowsPlanKind::Stream)
107 );
108 assert_eq!(NdjsonRowsPlanKind::Stream.to_string(), "stream");
109 assert_eq!(
110 ndjson_rows_plan_kind(
111 r#"let stream = $.rows(), a = stream.filter($.active).count(), b = stream.filter($.paid).count() in {a, b}"#
112 )
113 .unwrap(),
114 Some(NdjsonRowsPlanKind::Fanout)
115 );
116 assert_eq!(
117 ndjson_rows_plan_kind(r#"{head: $.rows().take(1)}"#).unwrap(),
118 Some(NdjsonRowsPlanKind::Subquery)
119 );
120 assert_eq!(ndjson_rows_plan_kind("$.id").unwrap(), None);
121 }
122}