jetro-core 0.5.10

jetro-core: parser, compiler, and VM for the Jetro JSON query language
Documentation
use super::stream_plan::{lower_root_rows_expr, RowStreamPlan, RowStreamPlanError, RowStreamSourceKind};
use crate::builtins::BuiltinMethod;
use crate::parse::ast::{
    Arg, ArrayElem, Expr, FStringPart, MatchArm, ObjField, Step,
};

pub(super) const STREAM_BINDING: &str = "__jetro_rows_stream_0";

#[derive(Clone, Debug)]
pub(super) struct RowStreamSubqueryPlan {
    pub stream: RowStreamPlan,
    pub wrapper: Expr,
}

pub(super) fn lower_single_rows_subquery(
    expr: &Expr,
    source: RowStreamSourceKind,
) -> Result<Option<RowStreamSubqueryPlan>, RowStreamPlanError> {
    let mut lifted = None;
    let wrapper = replace_rows_expr(expr, source, &mut lifted)?;
    Ok(lifted.map(|stream| RowStreamSubqueryPlan { stream, wrapper }))
}

fn replace_rows_expr(
    expr: &Expr,
    source: RowStreamSourceKind,
    lifted: &mut Option<RowStreamPlan>,
) -> Result<Expr, RowStreamPlanError> {
    if let Some((stream, replacement)) = lift_root_rows_prefix(expr, source)? {
        if lifted.is_some() {
            return Err(RowStreamPlanError::new(
                "multiple $.rows() stream subqueries are not supported",
            ));
        }
        *lifted = Some(stream);
        return Ok(replacement);
    }

    if let Some(stream) = lower_root_rows_expr(expr, source)? {
        if lifted.is_some() {
            return Err(RowStreamPlanError::new(
                "multiple $.rows() stream subqueries are not supported",
            ));
        }
        *lifted = Some(stream);
        return Ok(Expr::Ident(STREAM_BINDING.to_string()));
    }

    Ok(match expr {
        Expr::Chain(base, steps) => Expr::Chain(
            Box::new(replace_rows_expr(base, source, lifted)?),
            replace_steps(steps, source, lifted)?,
        ),
        Expr::BinOp(left, op, right) => Expr::BinOp(
            Box::new(replace_rows_expr(left, source, lifted)?),
            *op,
            Box::new(replace_rows_expr(right, source, lifted)?),
        ),
        Expr::UnaryNeg(inner) => {
            Expr::UnaryNeg(Box::new(replace_rows_expr(inner, source, lifted)?))
        }
        Expr::Not(inner) => Expr::Not(Box::new(replace_rows_expr(inner, source, lifted)?)),
        Expr::Kind { expr, ty, negate } => Expr::Kind {
            expr: Box::new(replace_rows_expr(expr, source, lifted)?),
            ty: *ty,
            negate: *negate,
        },
        Expr::Coalesce(left, right) => Expr::Coalesce(
            Box::new(replace_rows_expr(left, source, lifted)?),
            Box::new(replace_rows_expr(right, source, lifted)?),
        ),
        Expr::Object(fields) => Expr::Object(
            fields
                .iter()
                .map(|field| replace_obj_field(field, source, lifted))
                .collect::<Result<_, _>>()?,
        ),
        Expr::Array(elems) => Expr::Array(
            elems
                .iter()
                .map(|elem| replace_array_elem(elem, source, lifted))
                .collect::<Result<_, _>>()?,
        ),
        Expr::Let { name, init, body } => Expr::Let {
            name: name.clone(),
            init: Box::new(replace_rows_expr(init, source, lifted)?),
            body: Box::new(replace_rows_expr(body, source, lifted)?),
        },
        Expr::IfElse { cond, then_, else_ } => Expr::IfElse {
            cond: Box::new(replace_rows_expr(cond, source, lifted)?),
            then_: Box::new(replace_rows_expr(then_, source, lifted)?),
            else_: Box::new(replace_rows_expr(else_, source, lifted)?),
        },
        Expr::Try { body, default } => Expr::Try {
            body: Box::new(replace_rows_expr(body, source, lifted)?),
            default: Box::new(replace_rows_expr(default, source, lifted)?),
        },
        Expr::GlobalCall { name, args } => Expr::GlobalCall {
            name: name.clone(),
            args: replace_args(args, source, lifted)?,
        },
        Expr::Cast { expr, ty } => Expr::Cast {
            expr: Box::new(replace_rows_expr(expr, source, lifted)?),
            ty: *ty,
        },
        Expr::Match { scrutinee, arms } => Expr::Match {
            scrutinee: Box::new(replace_rows_expr(scrutinee, source, lifted)?),
            arms: arms
                .iter()
                .map(|arm| replace_match_arm(arm, source, lifted))
                .collect::<Result<_, _>>()?,
        },
        Expr::FString(parts) => Expr::FString(
            parts
                .iter()
                .map(|part| replace_fstring_part(part, source, lifted))
                .collect::<Result<_, _>>()?,
        ),
        _ => expr.clone(),
    })
}

fn lift_root_rows_prefix(
    expr: &Expr,
    source: RowStreamSourceKind,
) -> Result<Option<(RowStreamPlan, Expr)>, RowStreamPlanError> {
    let Expr::Chain(base, steps) = expr else {
        return Ok(None);
    };
    if !matches!(base.as_ref(), Expr::Root) {
        return Ok(None);
    }
    let Some((Step::Method(name, args), _)) = steps.split_first() else {
        return Ok(None);
    };
    if BuiltinMethod::from_name(name) != BuiltinMethod::Rows || !args.is_empty() {
        return Ok(None);
    }

    let mut split = 1usize;
    while let Some(Step::Method(name, _)) = steps.get(split) {
        if !is_rows_stream_method(name) {
            break;
        }
        split += 1;
    }
    if split == steps.len() {
        return Ok(None);
    }

    let prefix = Expr::Chain(Box::new(Expr::Root), steps[..split].to_vec());
    let Some(stream) = lower_root_rows_expr(&prefix, source)? else {
        return Ok(None);
    };
    let suffix = steps[split..].to_vec();
    let replacement = if suffix.is_empty() {
        Expr::Ident(STREAM_BINDING.to_string())
    } else {
        Expr::Chain(Box::new(Expr::Ident(STREAM_BINDING.to_string())), suffix)
    };
    Ok(Some((stream, replacement)))
}

fn is_rows_stream_method(name: &str) -> bool {
    matches!(
        BuiltinMethod::from_name(name),
        BuiltinMethod::Reverse
            | BuiltinMethod::Filter
            | BuiltinMethod::Find
            | BuiltinMethod::FindFirst
            | BuiltinMethod::FindOne
            | BuiltinMethod::UniqueBy
            | BuiltinMethod::Take
            | BuiltinMethod::First
            | BuiltinMethod::Map
    )
}

fn replace_steps(
    steps: &[crate::parse::ast::Step],
    source: RowStreamSourceKind,
    lifted: &mut Option<RowStreamPlan>,
) -> Result<Vec<crate::parse::ast::Step>, RowStreamPlanError> {
    steps
        .iter()
        .map(|step| {
            Ok(match step {
                crate::parse::ast::Step::DynIndex(expr) => {
                    crate::parse::ast::Step::DynIndex(Box::new(replace_rows_expr(
                        expr, source, lifted,
                    )?))
                }
                crate::parse::ast::Step::InlineFilter(expr) => {
                    crate::parse::ast::Step::InlineFilter(Box::new(replace_rows_expr(
                        expr, source, lifted,
                    )?))
                }
                crate::parse::ast::Step::Method(name, args) => {
                    crate::parse::ast::Step::Method(name.clone(), replace_args(args, source, lifted)?)
                }
                crate::parse::ast::Step::OptMethod(name, args) => {
                    crate::parse::ast::Step::OptMethod(name.clone(), replace_args(args, source, lifted)?)
                }
                _ => step.clone(),
            })
        })
        .collect()
}

fn replace_args(
    args: &[Arg],
    source: RowStreamSourceKind,
    lifted: &mut Option<RowStreamPlan>,
) -> Result<Vec<Arg>, RowStreamPlanError> {
    args.iter()
        .map(|arg| match arg {
            Arg::Pos(expr) => Ok(Arg::Pos(replace_rows_expr(expr, source, lifted)?)),
            Arg::Named(name, expr) => Ok(Arg::Named(
                name.clone(),
                replace_rows_expr(expr, source, lifted)?,
            )),
        })
        .collect()
}

fn replace_obj_field(
    field: &ObjField,
    source: RowStreamSourceKind,
    lifted: &mut Option<RowStreamPlan>,
) -> Result<ObjField, RowStreamPlanError> {
    Ok(match field {
        ObjField::Kv {
            key,
            val,
            optional,
            cond,
        } => ObjField::Kv {
            key: key.clone(),
            val: replace_rows_expr(val, source, lifted)?,
            optional: *optional,
            cond: cond
                .as_ref()
                .map(|cond| replace_rows_expr(cond, source, lifted))
                .transpose()?,
        },
        ObjField::Dynamic { key, val } => ObjField::Dynamic {
            key: replace_rows_expr(key, source, lifted)?,
            val: replace_rows_expr(val, source, lifted)?,
        },
        _ => field.clone(),
    })
}

fn replace_array_elem(
    elem: &ArrayElem,
    source: RowStreamSourceKind,
    lifted: &mut Option<RowStreamPlan>,
) -> Result<ArrayElem, RowStreamPlanError> {
    Ok(match elem {
        ArrayElem::Expr(expr) => ArrayElem::Expr(replace_rows_expr(expr, source, lifted)?),
        ArrayElem::Spread(expr) => ArrayElem::Spread(replace_rows_expr(expr, source, lifted)?),
    })
}

fn replace_match_arm(
    arm: &MatchArm,
    source: RowStreamSourceKind,
    lifted: &mut Option<RowStreamPlan>,
) -> Result<MatchArm, RowStreamPlanError> {
    Ok(MatchArm {
        pat: arm.pat.clone(),
        guard: arm
            .guard
            .as_ref()
            .map(|guard| replace_rows_expr(guard, source, lifted))
            .transpose()?,
        body: replace_rows_expr(&arm.body, source, lifted)?,
    })
}

fn replace_fstring_part(
    part: &FStringPart,
    source: RowStreamSourceKind,
    lifted: &mut Option<RowStreamPlan>,
) -> Result<FStringPart, RowStreamPlanError> {
    Ok(match part {
        FStringPart::Interp { expr, fmt } => FStringPart::Interp {
            expr: replace_rows_expr(expr, source, lifted)?,
            fmt: fmt.clone(),
        },
        FStringPart::Lit(_) => part.clone(),
    })
}