jetro-core 0.5.10

jetro-core: parser, compiler, and VM for the Jetro JSON query language
Documentation
use super::stream_exec::CompiledRowStream;
use super::stream_plan::{lower_root_rows_query, RowStreamPlan, RowStreamSourceKind};
use super::stream_source::{DocumentRowSource, ValueRowSource};
use super::stream_types::RowStreamRowResult;
use crate::data::value::Val;
use crate::{EvalError, Jetro, JetroEngine};
use std::sync::Arc;

/// Evaluates `query` as a row-stream pipeline over `document` and returns the
/// collected output value.
///
/// Returns `Ok(None)` when `document_rows_stream_plan` produces no plan for
/// `query`; in that case nothing is executed. Stream statistics produced by the
/// run are discarded.
///
/// # Errors
///
/// Propagates any `EvalError` raised while planning the query or while running
/// the stream.
pub(crate) fn collect_document_rows(
    engine: &JetroEngine,
    document: &Jetro,
    query: &str,
) -> Result<Option<Val>, EvalError> {
    match document_rows_stream_plan(query)? {
        // Only the collected rows are of interest here; the stats are dropped.
        Some(plan) => {
            run_document_rows(engine, document, &plan).map(|(rows, _stats)| Some(rows))
        }
        None => Ok(None),
    }
}

/// Runs a lowered row-stream `plan` against `document` and collects the output.
///
/// Rows are pulled one at a time from a `DocumentRowSource` built over the
/// document root (in the direction recorded on the plan) and fed through a
/// `CompiledRowStream`. Iteration ends when the source runs out of rows or as
/// soon as `apply_document_row` reports that the stream is finished.
///
/// Returns the emitted values wrapped in a `Val::Arr`, together with a clone of
/// the stream's statistics.
///
/// # Errors
///
/// Propagates any `EvalError` from materialising the document root or from
/// applying a row to the stream.
fn run_document_rows(
    engine: &JetroEngine,
    document: &Jetro,
    plan: &RowStreamPlan,
) -> Result<(Val, super::stream_types::RowStreamStats), EvalError> {
    // `plan` is already a reference; pass it through rather than re-borrowing
    // it into a `&&RowStreamPlan` that only works via auto-deref.
    let mut stream = CompiledRowStream::new(plan);
    // The VM handle is acquired before the root is materialised, matching the
    // established acquisition order.
    let mut vm = engine.lock_vm();
    let mut out = Vec::new();

    let root = document.root_val_with(engine.keys())?;
    let mut source = DocumentRowSource::new(root, plan.direction);
    while let Some(row) = source.next_row() {
        // `true` means the stream needs no further input, so stop pulling rows.
        if apply_document_row(&mut stream, &mut vm, row, &mut out)? {
            break;
        }
    }

    let stats = stream.stats().clone();
    Ok((Val::Arr(Arc::new(out)), stats))
}

/// Feeds a single `row` through `stream`, appending any emitted value to `out`.
///
/// Returns `Ok(true)` when the caller should stop supplying rows: either the
/// stream was already exhausted before this row, or applying the row yielded
/// `Stop`. Returns `Ok(false)` when the row was emitted or skipped.
///
/// # Errors
///
/// Propagates errors from `apply_val_row`, and reports an internal error if the
/// stream produces byte output, which this document-mode path does not accept.
fn apply_document_row(
    stream: &mut CompiledRowStream,
    vm: &mut crate::VM,
    row: Val,
    out: &mut Vec<Val>,
) -> Result<bool, EvalError> {
    // An exhausted stream must not be handed further rows.
    if stream.is_exhausted() {
        return Ok(true);
    }

    let finished = match stream.apply_val_row(vm, row)? {
        RowStreamRowResult::Emit(value) => {
            out.push(value);
            false
        }
        RowStreamRowResult::Skip => false,
        RowStreamRowResult::Stop => true,
        RowStreamRowResult::EmitBytes(bytes) => {
            return Err(EvalError(format!(
                "internal rows() stream error: byte output in document mode ({} bytes)",
                bytes.len()
            )));
        }
    };

    Ok(finished)
}

/// Test-only variant of `collect_document_rows` that also returns the stream
/// statistics alongside the collected output.
///
/// Returns `Ok(None)` when no row-stream plan is produced for `query`.
#[cfg(test)]
fn collect_document_rows_with_stats(
    engine: &JetroEngine,
    document: &Jetro,
    query: &str,
) -> Result<Option<(Val, super::stream_types::RowStreamStats)>, EvalError> {
    // Run the plan only if one exists, then flip `Option<Result<_>>` into
    // `Result<Option<_>>` so execution errors surface to the caller.
    document_rows_stream_plan(query)?
        .map(|plan| run_document_rows(engine, document, &plan))
        .transpose()
}

/// Lowers `query` into a row-stream plan whose source kind is `DocumentRows`.
///
/// Yields `Ok(None)` when lowering produces no plan (presumably because the
/// query is not a root `rows()` pipeline — confirm against
/// `lower_root_rows_query`).
///
/// # Errors
///
/// A lowering failure is converted into an `EvalError` carrying the failure's
/// string rendering.
fn document_rows_stream_plan(query: &str) -> Result<Option<RowStreamPlan>, EvalError> {
    let lowered = lower_root_rows_query(query, RowStreamSourceKind::DocumentRows);
    lowered.map_err(|cause| EvalError(cause.to_string()))
}

#[cfg(test)]
mod tests {
    use super::*;
    use serde_json::json;

    /// Parses `input` with a fresh engine, runs `query` through
    /// `collect_document_rows`, and converts the result back to JSON.
    ///
    /// Panics if evaluation fails or if the query yields no row-stream plan.
    fn eval_rows(input: serde_json::Value, query: &str) -> serde_json::Value {
        let engine = JetroEngine::new();
        let document = engine.parse_value(input);
        let rows = collect_document_rows(&engine, &document, query)
            .unwrap()
            .unwrap();
        serde_json::Value::from(rows)
    }

    /// A root array is streamed element by element.
    #[test]
    fn document_rows_array_maps_elements() {
        let input = json!([
            {"id": 1, "name": "Ada"},
            {"id": 2, "name": "Bob"},
            {"id": 3, "name": "Cid"}
        ]);

        let names = eval_rows(input, "$.rows().take(2).map($.name)");

        assert_eq!(names, json!(["Ada", "Bob"]));
    }

    /// A root object is treated as exactly one row.
    #[test]
    fn document_rows_object_is_single_row() {
        let ids = eval_rows(json!({"id": 1}), "$.rows().map($.id)");

        assert_eq!(ids, json!([1]));
    }

    /// With `reverse()`, `distinct_by` keeps the first occurrence in stream
    /// (i.e. reversed) order.
    #[test]
    fn document_rows_reverse_distinct_keeps_stream_order() {
        let input = json!([
            {"id": "a", "v": 1},
            {"id": "b", "v": 2},
            {"id": "a", "v": 3},
            {"id": "c", "v": 4}
        ]);

        let values = eval_rows(
            input,
            "$.rows().reverse().distinct_by($.id).take(2).map($.v)",
        );

        assert_eq!(values, json!([4, 3]));
    }

    /// A root scalar is treated as exactly one row.
    #[test]
    fn document_rows_scalar_is_single_row() {
        let incremented = eval_rows(json!(7), "$.rows().map(@ + 1)");

        assert_eq!(incremented, json!([8]));
    }

    /// `take(2)` must stop the scan after two rows rather than visiting all four.
    #[test]
    fn document_rows_take_stops_without_scanning_full_array() {
        let engine = JetroEngine::new();
        let document = engine.parse_value(json!([
            {"id": 1},
            {"id": 2},
            {"id": 3},
            {"id": 4}
        ]));

        let outcome =
            collect_document_rows_with_stats(&engine, &document, "$.rows().take(2).map($.id)")
                .unwrap()
                .unwrap();
        let (rows, stats) = outcome;

        assert_eq!(serde_json::Value::from(rows), json!([1, 2]));
        assert_eq!(stats.source, RowStreamSourceKind::DocumentRows);
        assert_eq!(stats.rows_scanned, 2);
        assert_eq!(stats.rows_emitted, 2);
    }
}