jetro-core 0.5.0

jetro-core: parser, compiler, and VM for the Jetro JSON query language
Documentation
//! Barrier-stage execution for the composed path.
//! Handles stages that must see all input before producing output (reverse, sort, unique_by,
//! group_by) by collecting into `BarrierOutput` and continuing the chain over the resulting rows.

use crate::composed as cmp;
use crate::value::Val;

use super::composed_stage::key_from_kernel;
use super::{BodyKernel, Sink, Stage, StageStrategy};

/// Result of a barrier stage: either a new row list for downstream stages, or a finished value.
///
/// Produced by `run`, which wraps the output of the `cmp::barrier_*` helpers in one of the two
/// variants below.
pub(super) enum BarrierOutput {
    /// Transformed row set to continue through the pipeline.
    /// `run` returns this for reverse, sort and unique_by stages.
    Rows(Vec<Val>),
    /// Final result produced by a consuming barrier (e.g. `group_by`).
    /// `run` only returns this when the stage is terminal and the sink is `Sink::Collect`.
    Done(Val),
}

/// Runs a single barrier stage against the fully collected input `buf`.
///
/// * `Reverse`, `Sort` and `UniqueBy` stages yield `BarrierOutput::Rows`.
/// * A `GroupBy` builtin yields `BarrierOutput::Done`, but only when `is_terminal` is set and
///   `sink` is `Sink::Collect`.
///
/// Returns `None` when the stage is not one of the barriers above, when a `GroupBy` does not
/// satisfy the terminal/collect condition, or when a keyed stage gets no key from
/// `key_from_kernel(kernel)`.
pub(super) fn run(
    stage: &Stage,
    kernel: &BodyKernel,
    strategy: StageStrategy,
    sink: &Sink,
    is_terminal: bool,
    buf: Vec<Val>,
) -> Option<BarrierOutput> {
    match stage {
        Stage::Reverse(_) => Some(BarrierOutput::Rows(cmp::barrier_reverse(buf))),

        Stage::Sort(spec) => {
            // A keyed sort takes its key from the kernel; an unkeyed sort uses no key source.
            let key = if spec.key.is_some() {
                key_from_kernel(kernel)?
            } else {
                cmp::KeySource::None
            };

            let descending = spec.descending;
            // The output is reversed below for descending sorts, so the top-k / bottom-k helper
            // is swapped whenever `descending` is set.
            // NOTE(review): presumably the helpers return rows in ascending key order — confirm.
            let mut sorted = match strategy {
                StageStrategy::SortTopK(k) if !descending => cmp::barrier_top_k(buf, &key, k),
                StageStrategy::SortBottomK(k) if descending => cmp::barrier_top_k(buf, &key, k),
                StageStrategy::SortTopK(k) | StageStrategy::SortBottomK(k) => {
                    cmp::barrier_bottom_k(buf, &key, k)
                }
                _ => cmp::barrier_sort(buf, &key),
            };
            if descending {
                sorted.reverse();
            }
            Some(BarrierOutput::Rows(sorted))
        }

        Stage::UniqueBy(key_expr) => {
            let key = match key_expr {
                Some(_) => key_from_kernel(kernel)?,
                None => cmp::KeySource::None,
            };
            Some(BarrierOutput::Rows(cmp::barrier_unique_by(buf, &key)))
        }

        Stage::ExprBuiltin {
            method: crate::builtins::BuiltinMethod::GroupBy,
            ..
        } => {
            // Grouping produces a finished value, so it is only handled here when it is the
            // last stage and the sink collects.
            let collects_at_end = is_terminal && matches!(sink, Sink::Collect);
            if !collects_at_end {
                return None;
            }
            let key = key_from_kernel(kernel)?;
            Some(BarrierOutput::Done(cmp::barrier_group_by(buf, &key)))
        }

        _ => None,
    }
}