Skip to main content

jetro_core/io/
ndjson_rows.rs

1use super::stream_fanout::{lower_rows_fanout_expr, RowStreamFanoutPlan};
2use super::stream_plan::{lower_root_rows_query, RowStreamPlan, RowStreamSourceKind};
3use super::stream_subquery::{lower_single_rows_subquery, RowStreamSubqueryPlan};
4use crate::{EvalError, JetroEngineError};
5
6pub(crate) enum NdjsonRowsFilePlan {
7    Stream(RowStreamPlan),
8    Fanout(RowStreamFanoutPlan),
9    Subquery(RowStreamSubqueryPlan),
10}
11
12impl NdjsonRowsFilePlan {
13    pub(crate) fn kind(&self) -> NdjsonRowsPlanKind {
14        match self {
15            Self::Stream(_) => NdjsonRowsPlanKind::Stream,
16            Self::Fanout(_) => NdjsonRowsPlanKind::Fanout,
17            Self::Subquery(_) => NdjsonRowsPlanKind::Subquery,
18        }
19    }
20
21    pub(crate) fn requires_file_backed_source(&self) -> bool {
22        matches!(self, Self::Fanout(_) | Self::Subquery(_))
23    }
24}
25
26#[derive(Clone, Copy, Debug, Eq, PartialEq)]
27pub enum NdjsonRowsPlanKind {
28    Stream,
29    Fanout,
30    Subquery,
31}
32
33impl std::fmt::Display for NdjsonRowsPlanKind {
34    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
35        f.write_str(match self {
36            Self::Stream => "stream",
37            Self::Fanout => "fanout",
38            Self::Subquery => "subquery",
39        })
40    }
41}
42
43pub fn ndjson_rows_plan_kind(query: &str) -> Result<Option<NdjsonRowsPlanKind>, JetroEngineError> {
44    Ok(ndjson_rows_file_plan(query)?.map(|plan| plan.kind()))
45}
46
47pub(crate) fn ndjson_rows_stream_plan(
48    query: &str,
49) -> Result<Option<RowStreamPlan>, JetroEngineError> {
50    lower_root_rows_query(query, RowStreamSourceKind::NdjsonRows)
51        .map_err(|err| JetroEngineError::Eval(EvalError(err.to_string())))
52}
53
54pub(crate) fn ndjson_rows_file_plan(
55    query: &str,
56) -> Result<Option<NdjsonRowsFilePlan>, JetroEngineError> {
57    if let Some(plan) = ndjson_rows_stream_plan(query)? {
58        return Ok(Some(NdjsonRowsFilePlan::Stream(plan)));
59    }
60    if let Some(plan) = ndjson_rows_fanout_plan(query)? {
61        return Ok(Some(NdjsonRowsFilePlan::Fanout(plan)));
62    }
63    if let Some(plan) = ndjson_rows_subquery_plan(query)? {
64        return Ok(Some(NdjsonRowsFilePlan::Subquery(plan)));
65    }
66    Ok(None)
67}
68
69pub(super) fn ndjson_rows_subquery_plan(
70    query: &str,
71) -> Result<Option<RowStreamSubqueryPlan>, JetroEngineError> {
72    let Some(expr) = parse_ndjson_rows_expr(query)? else {
73        return Ok(None);
74    };
75    lower_single_rows_subquery(&expr, RowStreamSourceKind::NdjsonRows)
76        .map_err(|err| JetroEngineError::Eval(EvalError(err.to_string())))
77}
78
79fn ndjson_rows_fanout_plan(query: &str) -> Result<Option<RowStreamFanoutPlan>, JetroEngineError> {
80    let Some(expr) = parse_ndjson_rows_expr(query)? else {
81        return Ok(None);
82    };
83    lower_rows_fanout_expr(&expr, RowStreamSourceKind::NdjsonRows)
84        .map_err(|err| JetroEngineError::Eval(EvalError(err.to_string())))
85}
86
87fn parse_ndjson_rows_expr(
88    query: &str,
89) -> Result<Option<crate::parse::ast::Expr>, JetroEngineError> {
90    if !query.contains("$.rows") {
91        return Ok(None);
92    }
93    crate::parse::parser::parse(query)
94        .map(Some)
95        .map_err(|err| JetroEngineError::Eval(EvalError(err.to_string())))
96}
97
98#[cfg(test)]
99mod tests {
100    use super::*;
101
102    #[test]
103    fn rows_plan_kind_reports_routing_family() {
104        assert_eq!(
105            ndjson_rows_plan_kind("$.rows().filter($.active).take(1)").unwrap(),
106            Some(NdjsonRowsPlanKind::Stream)
107        );
108        assert_eq!(NdjsonRowsPlanKind::Stream.to_string(), "stream");
109        assert_eq!(
110            ndjson_rows_plan_kind(
111                r#"let stream = $.rows(), a = stream.filter($.active).count(), b = stream.filter($.paid).count() in {a, b}"#
112            )
113            .unwrap(),
114            Some(NdjsonRowsPlanKind::Fanout)
115        );
116        assert_eq!(
117            ndjson_rows_plan_kind(r#"{head: $.rows().take(1)}"#).unwrap(),
118            Some(NdjsonRowsPlanKind::Subquery)
119        );
120        assert_eq!(ndjson_rows_plan_kind("$.id").unwrap(), None);
121    }
122}