jetro-core 0.5.10

jetro-core: parser, compiler, and VM for the Jetro JSON query language
Documentation
use std::io::Cursor;
use std::path::{Path, PathBuf};
use std::time::Instant;

use jetro_core::io::{DistinctFrontFilterKind, NdjsonOptions, NdjsonRowFrame, NullPayload};
use jetro_core::JetroEngine;

/// Builds `rows` newline-terminated JSON records, each with `id`, `name`, `active`,
/// `score`, and a three-element `attributes` array of `{key, value, weight}` objects.
///
/// Row `i` is `active` when `i % 3 == 0`. `score` cycles from 10_000 down to 1
/// as `i` advances through each block of 10_000 rows.
fn build_ndjson(rows: usize) -> Vec<u8> {
    (0..rows).fold(Vec::with_capacity(rows * 128), |mut buf, i| {
        // `bool` displays as `true`/`false`, which is exactly the JSON literal we need.
        let active = i % 3 == 0;
        let score = 10_000usize.saturating_sub(i % 10_000);
        let record = format!(
            r#"{{"id":{i},"name":"user_{i}","active":{active},"score":{score},"attributes":[{{"key":"k1","value":"v_{i}_1","weight":1}},{{"key":"k2","value":"v_{i}_2","weight":2}},{{"key":"k3","value":"v_{i}_3","weight":3}}]}}"#
        );
        buf.extend_from_slice(record.as_bytes());
        buf.push(b'\n');
        buf
    })
}

/// Runs `query` over every row of the in-memory NDJSON `data` via
/// `JetroEngine::run_ndjson`, discarding the output.
///
/// Prints the reported row count, wall time, and throughput in MiB/s.
///
/// # Panics
/// Panics if the engine returns an error for the query.
fn bench(engine: &JetroEngine, data: &[u8], label: &str, query: &str) {
    const MIB: f64 = 1024.0 * 1024.0;
    let timer = Instant::now();
    let produced = engine
        .run_ndjson(Cursor::new(data), query, std::io::sink())
        .expect("NDJSON query should run");
    let took = timer.elapsed();
    let throughput = (data.len() as f64 / MIB) / took.as_secs_f64();
    println!("{label:<36} {produced:>8} rows {took:>10.3?} {throughput:>8.1} MiB/s");
}

/// Streams `data` through `JetroEngine::run_ndjson_matches`, emitting at most
/// `limit` rows that satisfy `predicate` into a sink.
///
/// Prints the reported row count, wall time, and MiB/s over the whole input.
///
/// # Panics
/// Panics if the engine returns an error.
fn bench_matches(engine: &JetroEngine, data: &[u8], label: &str, predicate: &str, limit: usize) {
    const MIB: f64 = 1024.0 * 1024.0;
    let input = Cursor::new(data);
    let timer = Instant::now();
    let matched = engine
        .run_ndjson_matches(input, predicate, limit, std::io::sink())
        .expect("NDJSON match query should run");
    let took = timer.elapsed();
    let secs = took.as_secs_f64();
    let throughput = (data.len() as f64 / MIB) / secs;
    println!("{label:<36} {matched:>8} rows {took:>10.3?} {throughput:>8.1} MiB/s");
}

/// Like `bench`, but drives `JetroEngine::run_ndjson_with_options` so that
/// non-default row framing (e.g. delimited payloads) can be measured.
///
/// # Panics
/// Panics if the engine returns an error for the query.
fn bench_with_options(
    engine: &JetroEngine,
    data: &[u8],
    label: &str,
    query: &str,
    options: NdjsonOptions,
) {
    const MIB: f64 = 1024.0 * 1024.0;
    let input = Cursor::new(data);
    let timer = Instant::now();
    let produced = engine
        .run_ndjson_with_options(input, query, std::io::sink(), options)
        .expect("NDJSON query with options should run");
    let took = timer.elapsed();
    let throughput = (data.len() as f64 / MIB) / took.as_secs_f64();
    println!("{label:<36} {produced:>8} rows {took:>10.3?} {throughput:>8.1} MiB/s");
}

/// Measures an expression-level `$.rows()` stream query over an in-memory reader.
///
/// Uses the same `run_ndjson` entry point as `bench`; only the panic message
/// differs, so failures identify the rows()-stream group.
///
/// # Panics
/// Panics if the engine returns an error for the query.
fn bench_rows_stream_reader(engine: &JetroEngine, data: &[u8], label: &str, query: &str) {
    const MIB: f64 = 1024.0 * 1024.0;
    let timer = Instant::now();
    let produced = engine
        .run_ndjson(Cursor::new(data), query, std::io::sink())
        .expect("NDJSON rows() stream query should run");
    let took = timer.elapsed();
    let megabytes = data.len() as f64 / MIB;
    let throughput = megabytes / took.as_secs_f64();
    println!("{label:<36} {produced:>8} rows {took:>10.3?} {throughput:>8.1} MiB/s");
}

/// Measures a `$.rows()` stream query that reads directly from the file at `path`
/// via `JetroEngine::run_ndjson_file`.
///
/// `bytes` is the file size supplied by the caller and is used only for the MiB/s
/// figure.
///
/// # Panics
/// Panics if the engine returns an error for the query.
fn bench_rows_stream_file(
    engine: &JetroEngine,
    path: &Path,
    bytes: usize,
    label: &str,
    query: &str,
) {
    const MIB: f64 = 1024.0 * 1024.0;
    let timer = Instant::now();
    let produced = engine
        .run_ndjson_file(path, query, std::io::sink())
        .expect("file-backed NDJSON rows() stream query should run");
    let took = timer.elapsed();
    let throughput = (bytes as f64 / MIB) / took.as_secs_f64();
    println!("{label:<36} {produced:>8} rows {took:>10.3?} {throughput:>8.1} MiB/s");
}

/// Builds `rows` NDJSON records modelling a compacted topic.
///
/// Row `i` has `id = i % keys` and `version = i / keys`, so every id reappears once
/// per `keys` rows with an increasing version. `keys` is clamped to at least 1.
/// Smaller `keys` means more duplicate ids.
fn build_compacted_ndjson(rows: usize, keys: usize) -> Vec<u8> {
    let key_space = keys.max(1);
    let mut buf = Vec::with_capacity(rows * 96);
    for row in 0..rows {
        // `id` and `version` are captured by name inside the format string below.
        let (id, version) = (row % key_space, row / key_space);
        let is_even = row % 2 == 0;
        let score = 10_000usize.saturating_sub(row % 10_000);
        let tag = row % 17;
        let record = format!(
            r#"{{"id":{id},"version":{version},"name":"user_{id}","active":{},"payload":{{"score":{},"tag":"t_{}"}}}}"#,
            is_even, score, tag
        );
        buf.extend_from_slice(record.as_bytes());
        buf.push(b'\n');
    }
    buf
}

/// Builds `rows` lines in `key-<i>|<payload>` framing.
///
/// Every 11th row (`i % 11 == 0`) carries the literal payload `null`. All other
/// rows carry a JSON object with `id`, `name`, `active` (`i % 3 == 0`) and `score`.
fn build_framed_ndjson(rows: usize) -> Vec<u8> {
    let mut buf = Vec::with_capacity(rows * 144);
    for i in 0..rows {
        let line = if i % 11 == 0 {
            format!("key-{i}|null\n")
        } else {
            let mut framed = format!(
                r#"key-{i}|{{"id":{i},"name":"user_{i}","active":{},"score":{}}}"#,
                i % 3 == 0,
                10_000usize.saturating_sub(i % 10_000)
            );
            framed.push('\n');
            framed
        };
        buf.extend_from_slice(line.as_bytes());
    }
    buf
}

/// Writes `data` to `<temp_dir>/jetro-bench-<label>-<pid>.ndjson` and returns the path.
///
/// The process id keeps concurrent bench runs from clobbering each other's files.
/// The caller is responsible for deleting the file.
///
/// # Panics
/// Panics if the file cannot be written.
fn write_temp_ndjson(label: &str, data: &[u8]) -> PathBuf {
    let file_name = format!("jetro-bench-{label}-{}.ndjson", std::process::id());
    let path = std::env::temp_dir().join(file_name);
    std::fs::write(&path, data).expect("bench temp file should be writable");
    path
}

/// Benchmarks `JetroEngine::run_ndjson_rev_distinct_by_with_stats` over the file
/// at `path`, keyed by `key_query` and projecting `query`, stopping after `limit` rows.
///
/// Besides time and MiB/s (computed from the caller-supplied `bytes`), prints the
/// engine's statistics:
/// - emitted rows
/// - duplicate rows
/// - direct/fallback key and value evaluation counts
/// - which front filter was chosen
///
/// # Panics
/// Panics if the engine returns an error.
fn bench_rev_distinct(
    engine: &JetroEngine,
    path: &Path,
    bytes: usize,
    label: &str,
    key_query: &str,
    query: &str,
    limit: usize,
) {
    const MIB: f64 = 1024.0 * 1024.0;
    let timer = Instant::now();
    let stats = engine
        .run_ndjson_rev_distinct_by_with_stats(path, key_query, query, limit, std::io::sink())
        .expect("reverse distinct_by bench should run");
    let took = timer.elapsed();
    let throughput = (bytes as f64 / MIB) / took.as_secs_f64();
    let front = match stats.front_filter {
        DistinctFrontFilterKind::None => "none",
        DistinctFrontFilterKind::Bloom => "bloom",
        DistinctFrontFilterKind::Cuckoo => "cuckoo",
    };
    println!(
        "{label:<36} {emitted:>8} rows {took:>10.3?} {throughput:>8.1} MiB/s dup={dup:<8} key={key_direct}/{key_fallback} val={val_direct}/{val_fallback} front={front}",
        emitted = stats.emitted,
        dup = stats.duplicate_rows,
        key_direct = stats.direct_key_rows,
        key_fallback = stats.fallback_key_rows,
        val_direct = stats.direct_value_rows,
        val_fallback = stats.fallback_value_rows,
    );
}

fn main() {
    let rows = std::env::args()
        .nth(1)
        .and_then(|arg| arg.parse::<usize>().ok())
        .unwrap_or(200_000);
    let data = build_ndjson(rows);
    let engine = JetroEngine::new();

    println!(
        "NDJSON cold-path core bench: {} rows, {:.1} MiB",
        rows,
        data.len() as f64 / (1024.0 * 1024.0)
    );
    bench(&engine, &data, "root int field", "id");
    bench(&engine, &data, "root string field", "name");
    bench(&engine, &data, "root string upper", "name.upper()");
    bench(&engine, &data, "array len", "attributes.len()");
    bench(
        &engine,
        &data,
        "first nested value",
        "attributes.first().value",
    );
    bench(
        &engine,
        &data,
        "first nested upper",
        "attributes.first().key.upper()",
    );
    bench(&engine, &data, "map nested keys", "attributes.map(@.key)");
    bench(
        &engine,
        &data,
        "map nested pairs",
        "attributes.map([@.key, @.value])",
    );
    bench(
        &engine,
        &data,
        "map nested objects",
        "attributes.map({key: @.key, value: @.value})",
    );
    bench(
        &engine,
        &data,
        "filter nested count",
        r#"attributes.filter(@.value.contains("_3")).len()"#,
    );
    bench(
        &engine,
        &data,
        "filter numeric count",
        "attributes.filter(@.value > \"v_100_1\").len()",
    );
    bench(
        &engine,
        &data,
        "filter nested map keys",
        r#"attributes.filter(@.value.contains("_3")).map(@.key)"#,
    );
    bench(
        &engine,
        &data,
        "filter nested map pairs",
        r#"attributes.filter(@.value.contains("_3")).map([@.key, @.value])"#,
    );
    bench(
        &engine,
        &data,
        "map numeric sum",
        "attributes.map(@.weight).sum()",
    );
    bench(
        &engine,
        &data,
        "filter numeric sum",
        r#"attributes.filter(@.value.contains("_3")).map(@.weight).sum()"#,
    );
    bench(
        &engine,
        &data,
        "object projection",
        r#"{id: id, name: name, score: score, active: active}"#,
    );
    bench(
        &engine,
        &data,
        "object scalar projection",
        r#"{id: id, name: name.upper(), score: score, active: active}"#,
    );
    bench(
        &engine,
        &data,
        "array projection",
        r#"[id, name, score, active]"#,
    );
    bench(&engine, &data, "object keys", "$.keys()");
    bench_matches(&engine, &data, "match active rows", "active", rows);
    bench_matches(&engine, &data, "match score > 9900", "score > 9900", rows);
    bench_matches(
        &engine,
        &data,
        "match nested contains",
        r#"attributes.first().value.contains("_1")"#,
        rows,
    );

    let framed = build_framed_ndjson(rows);
    let framed_options =
        NdjsonOptions::default().with_row_frame(NdjsonRowFrame::DelimitedPayload {
            separator: b'|',
            null_payload: NullPayload::Skip,
        });
    println!("\nDelimited payload framing:");
    bench_with_options(
        &engine,
        &framed,
        "framed root projection",
        "$.name",
        framed_options,
    );
    bench_with_options(
        &engine,
        &framed,
        "framed rows take",
        "$.rows().take(1000).map($.id)",
        framed_options,
    );

    let data_path = write_temp_ndjson("rows", &data);
    let high_dup = build_compacted_ndjson(rows, (rows / 100).max(1));
    let low_dup = build_compacted_ndjson(rows, rows);
    let high_dup_path = write_temp_ndjson("high-dup", &high_dup);
    let low_dup_path = write_temp_ndjson("low-dup", &low_dup);
    println!("\nReverse compacted-topic distinct_by:");
    bench_rev_distinct(
        &engine,
        &high_dup_path,
        high_dup.len(),
        "distinct high-dup direct",
        "id",
        r#"{id: id, version: version}"#,
        rows,
    );
    println!("\nExpression-level rows() streams:");
    bench_rows_stream_reader(
        &engine,
        &data,
        "rows take+project",
        "$.rows().take(1000).map($.name)",
    );
    bench_rows_stream_reader(
        &engine,
        &data,
        "rows filter+distinct+take",
        "$.rows().filter($.active == true).distinct_by($.id).take(1000).map({id: $.id, name: $.name})",
    );
    bench_rows_stream_file(
        &engine,
        &high_dup_path,
        high_dup.len(),
        "rows reverse+take",
        "$.rows().reverse().take(1000).map($.id)",
    );
    bench_rows_stream_file(
        &engine,
        &data_path,
        data.len(),
        "rows filter+take file",
        "$.rows().filter($.score > 9900).take(1000).map({id: $.id, score: $.score})",
    );
    bench_rows_stream_file(
        &engine,
        &high_dup_path,
        high_dup.len(),
        "rows reverse+distinct+take",
        "$.rows().reverse().distinct_by($.id).take(1000).map({id: $.id, version: $.version})",
    );
    bench_rows_stream_file(
        &engine,
        &high_dup_path,
        high_dup.len(),
        "rows fallback key",
        "$.rows().reverse().distinct_by($.name.upper()).take(1000).map($.payload.score)",
    );
    bench_rev_distinct(
        &engine,
        &high_dup_path,
        high_dup.len(),
        "distinct high-dup limit",
        "id",
        "payload.score",
        100,
    );
    bench_rev_distinct(
        &engine,
        &low_dup_path,
        low_dup.len(),
        "distinct low-dup direct",
        "id",
        "name",
        rows,
    );
    bench_rev_distinct(
        &engine,
        &high_dup_path,
        high_dup.len(),
        "distinct fallback key",
        "name.upper()",
        "payload.score",
        rows,
    );
    let _ = std::fs::remove_file(data_path);
    let _ = std::fs::remove_file(high_dup_path);
    let _ = std::fs::remove_file(low_dup_path);
}