# NDJSON Support for Jetro — Engineering Plan
**Audience:** senior Rust engineer joining the jetro project to implement NDJSON ingestion.
**Branch:** `v2.14`
**Status:** design complete, no code written.
**Repo entry points:** `jetro-core/src/lib.rs`, `jetro-core/src/exec/pipeline.rs`, `jetro-core/src/exec/pipeline/capability.rs`.
This document is the implementation contract. Read it end-to-end before touching code. The architecture has constraints the v2.14 branch enforces — bypassing them will produce something that compiles but loses to the legacy interpreted path on every benchmark.
---
## 1. What we're building
Newline-delimited JSON ingestion as a first-class jetro input. One JSON document per line, queries evaluated either per-line (default) or as a virtual array spanning all lines (`--stream`).
**Library only.** No CLI in this repo. The downstream CLI lives in a separate project. Every NDJSON behavior must be reachable through `JetroEngine` public methods.
**Public API target (in `jetro-core/src/lib.rs`):**
```rust
impl JetroEngine {
// Per-row mode: each line is its own $.
pub fn run_ndjson<R: BufRead, W: Write>(&self, r: R, query: &str, w: W) -> Result<usize, JetroEngineError>;
pub fn collect_ndjson<R: BufRead>(&self, r: R, query: &str) -> Result<Vec<Value>, JetroEngineError>;
pub fn for_each_ndjson<R, F>(&self, r: R, query: &str, f: F) -> Result<usize, JetroEngineError>
where R: BufRead, F: FnMut(Value);
// Stream-as-array: $ binds to virtual Vec of all rows.
pub fn run_ndjson_stream<R, W>(&self, r: R, query: &str, w: W) -> Result<usize, JetroEngineError>;
pub fn collect_ndjson_stream<R>(&self, r: R, query: &str) -> Result<Value, JetroEngineError>;
// Reverse iteration (tail-N, find-latest). File-only (needs mmap).
pub fn run_ndjson_rev<W>(&self, path: &Path, query: &str, w: W) -> Result<usize, JetroEngineError>;
pub fn collect_ndjson_rev(&self, path: &Path, query: &str) -> Result<Vec<Value>, JetroEngineError>;
// Parallel.
pub fn run_ndjson_par<R, W>(&self, r: R, query: &str, w: W) -> Result<usize, JetroEngineError>;
// Multi-source: concat or named binding for cross-stream queries.
pub fn run_ndjson_multi<W>(&self, sources: &[NdjsonSource], query: &str, w: W) -> Result<usize, JetroEngineError>;
pub fn run_ndjson_named<W>(&self, sources: &[(&str, NdjsonSource)], query: &str, w: W) -> Result<usize, JetroEngineError>;
}
```
`NdjsonSource` is `enum { File(PathBuf), Mmap(PathBuf), Reader(Box<dyn BufRead + Send>) }`.
---
## 2. Architectural constraints from v2.14
Before designing anything, internalize the parts of the codebase you will be plugging into. The relevant files are listed against each constraint so you can read them now.
### 2.1 `pipeline::Source` is two variants only
`jetro-core/src/exec/pipeline.rs:193`:
```rust
pub enum Source {
Receiver(Val),
FieldChain { keys: Arc<[Arc<str>]> },
}
```
Both assume a materialized document root (`&Val`). `Pipeline` is `Debug + Clone` and is stored in `JetroEngine::plan_cache` keyed by `PlanningContext`. **Any new variant must preserve `Debug + Clone` and must not leak source instance identity into the cache key**, or the plan cache degenerates to one entry per NDJSON stream.
### 2.2 Row-source layer is pipeline-internal
`jetro-core/src/exec/pipeline/row_source.rs` defines `ValRowSource<'a>`, `TapeRowSource<'a>`, `Rows<'a>`. All `pub(super)`. There is **no public `RowSource` trait**. The old plan said "add NDJSON as a new `RowSource` variant"; that is wrong shape — those enums are tied to in-memory row iteration after the source has been resolved.
The right layer for NDJSON is *above* the pipeline, not inside `row_source.rs`. Two places attach:
- **Per-row mode:** a top-level driver in `io::ndjson::PerRowDriver` that produces one `Val` (or `TapeView`) per line and calls into the existing single-document evaluation paths. No pipeline IR changes.
- **Stream-as-array mode:** a new `Source::Streaming(Arc<dyn StreamingSourceHandle>)` variant in `pipeline::Source`. `StreamingSourceHandle` is the *pipeline-internal* trait that wraps a public `RowProvider` and adapts it to the pipeline's iteration loop.
The public extensibility surface is the `RowProvider` trait in `io/mod.rs`. Future async sources, gzip/zstd-wrapped NDJSON, Parquet row groups, etc. all implement `RowProvider` without touching pipeline code.
### 2.3 `SourceCapabilities` controls access mode
`jetro-core/src/exec/pipeline/capability.rs:19`:
```rust
pub(crate) struct SourceCapabilities {
pub forward_stream: bool,
pub reverse_stream: bool,
pub indexed_array_child: bool,
pub tape_view: bool,
pub field_key_read: bool,
pub subtree_skip: bool,
pub selected_row_materialization: bool,
pub materialized_fallback: bool,
}
```
`Source::Streaming` must declare capabilities accurately. For a generic NDJSON reader:
```rust
SourceCapabilities {
forward_stream: true,
reverse_stream: false, // true only for mmap reverse driver
indexed_array_child: false, // true only when source supports byte-offset skip
tape_view: true, // we hand the pipeline TapeView per line
field_key_read: true, // simd_json lets us pick keys per line
subtree_skip: true, // partial parse via payload lanes
selected_row_materialization: true,
materialized_fallback: true,
}
```
`SourceCapabilities::choose_access(demand)` (capability.rs:64) decides the `SourceAccessMode`. For NDJSON the interesting cases are:
| `All` | `Forward` | read line, parse, emit |
| `FirstInput(n)` | `ForwardBounded(n)` | stop after `n` rows |
| `LastInput(n)` | `Reverse { outputs: n }` (only if `reverse_stream`) | reverse driver |
| `LastInput(1)` | `IndexedFromEnd(0)` (only if `indexed_array_child`) | seek to last record |
| `NthInput(k)` | `Indexed(k)` (only if `indexed_array_child`) | byte-offset skip |
For the generic forward driver, `reverse_stream` and `indexed_array_child` are false and `choose_access` will degrade `LastInput` / `NthInput` to `Forward` plus runtime skip — correct but slow. The reverse mmap driver sets both true.
### 2.4 Payload-lane and demand plumbing
`jetro-core/src/plan/analysis.rs` and `jetro-core/src/exec/pipeline/capability.rs` compute *what fields each row contributes to the downstream pipeline*. The result is two `FieldDemand` values on `Pipeline`: `payload_demand.scan_need` (fields read during scanning, e.g. inside filter predicates) and `payload_demand.result_need` (fields needed on rows that survive the sink). When `source_payload_lanes_supported = true`, the source is allowed to skip non-required fields entirely.
For NDJSON this is the **biggest single win** in the project. A query `$.map({id, name})` over 1 GB of wide rows touches two fields per line, not the whole document. The lane plan needs to flow from the planner into `RowProvider::request_lanes(plan)` before iteration starts.
Demand also covers `has` / `missing` propagation (`jetro-core/src/plan/chain_demand.rs`, v2.14). A predicate `filter(has(error))` lets the parser skip lines without an `error` field after a partial scan.
### 2.5 Direct positional planning
`jetro-core/src/exec/pipeline/plan.rs` and `jetro-core/src/exec/pipeline/indexed_exec.rs` lower `first / last / nth / take(K)` against indexed sources to direct seeks instead of streaming. For NDJSON:
- **`take(N)`** over a forward driver: planner emits `ForwardBounded(N)` and the driver returns `None` from `next_row` after `N` calls. Cancellation token short-circuits the rest of the file.
- **`last(N)`** over an mmap reverse driver: planner emits `Reverse { outputs: N }`; the driver pages in *only the file tail*. The kernel never reads the head. This is the killer feature for log-tail queries on multi-GB files.
- **`nth(K)`** over a driver that built an offset index (Phase 7b, optional): direct byte-seek.
These must reuse the existing positional dispatch in `indexed_exec.rs::single_select_indexed_dispatch`, not introduce a parallel path.
### 2.6 `TapeData` recycling
`jetro-core/src/data/tape.rs:209`:
```rust
pub fn parse(mut bytes: Vec<u8>) -> Result<Arc<Self>, String>;
```
Internally:
```rust
let mut buffers = simd_json::Buffers::new(bytes.len());
let tape = simd_json::to_tape_with_buffers(bytes, &mut buffers)?;
```
Per-line parsing must not allocate fresh `Buffers` on every line. Add an additive API:
```rust
// New, sibling of `parse`. Does not change existing signature.
pub fn parse_into(scratch: &mut TapeScratch, line: &mut Vec<u8>) -> Result<TapeView<'_>, TapeError>;
pub struct TapeScratch {
buffers: simd_json::Buffers,
nodes: Vec<simd_json::Node<'static>>,
}
```
Each line is moved into the reused `line: Vec<u8>`, simd-json mutates it in place, the tape nodes go into the reused `nodes` vec, and we hand back a `TapeView` that borrows from the scratch. The view is invalidated when `parse_into` is called again — enforce with `&mut self` borrowing or a `Drop`-based generation counter. **Do not change the existing `parse` signature**; callers across the codebase depend on it.
### 2.7 `JetroEngine` plan cache
`JetroEngine` already maintains a plan cache keyed by `PlanningContext` (`plan/physical.rs`) and a thread-local VM. Per-row mode therefore *amortizes the plan for free* across millions of lines. The cache is the reason per-row throughput will be competitive without further heroics. **Do not bypass it.** If `Source::Streaming` ends up in the cache key, every NDJSON call becomes a cold plan — verify by inspecting `PlanningContext` after wiring.
---
## 3. Module layout
```
jetro-core/src/
├── data/
│ └── tape.rs # +parse_into + TapeScratch (additive)
├── exec/
│ └── pipeline.rs # +Source::Streaming variant
│ └── pipeline/
│ ├── capability.rs # +SourceCapabilities for Streaming
│ ├── plan.rs # +positional dispatch for streaming
│ └── indexed_exec.rs # +streaming positional seek
├── plan/
│ ├── analysis.rs # +streaming in payload-lane analysis
│ └── physical.rs # +PlanNode for named streaming source
├── io/ # NEW MODULE
│ ├── mod.rs # RowProvider trait, NdjsonSource enum
│ ├── ndjson.rs # forward per-row driver, line scanner
│ ├── ndjson_rev.rs # reverse mmap memrchr driver
│ ├── ndjson_multi.rs # concat / named slurp, hash-build joins
│ ├── parallel.rs # rayon shard runner (fwd + rev)
│ ├── prefilter.rs # aho-corasick byte prefilter
│ └── lane.rs # selective tape walk for payload lanes
├── builtins/ # +join, left_join, lookup, diff, intersect, zip_streams
├── parse/grammar.pest # +@<ident> named-source ref
└── lib.rs # NDJSON API surface
tests/
├── ndjson_basic.rs # filter, take, sum
├── ndjson_stream.rs # cross-row aggregation
├── ndjson_lanes.rs # partial parse correctness + bench
├── ndjson_rev.rs # tail-N, find-latest
├── ndjson_multi.rs # concat, named binding
├── ndjson_named_join.rs # cross-stream joins
├── ndjson_parallel.rs # rayon correctness + scaling
└── ndjson_prefilter.rs # soundness
jetro-book/src/recipes/ndjson.md # docs
```
Estimated total: ~3 500 LOC across the new module and the planner/pipeline hookups.
---
## 4. Core abstractions
### 4.1 `RowProvider` trait (public, in `io/mod.rs`)
```rust
pub trait RowProvider: Send {
/// Pull the next row. Returns None on EOF.
/// The returned RowView borrows from `scratch` until the next call.
fn next_row<'a>(
&'a mut self,
scratch: &'a mut RowScratch,
) -> Option<Result<RowView<'a>, RowError>>;
fn capabilities(&self) -> RowProviderCapabilities;
/// Called by the planner once before iteration. Provider may use this
/// to drive selective parsing.
fn request_lanes(&mut self, plan: &PayloadLanePlan) {
let _ = plan; // default: no-op (full parse)
}
/// Idempotent. Causes future next_row calls to return None.
fn cancel(&mut self) {}
}
pub struct RowScratch {
pub line_buf: Vec<u8>,
pub tape: TapeScratch,
}
pub enum RowView<'a> {
Tape(crate::data::view::TapeView<'a>),
Owned(Val), // fallback for non-simd-json builds
}
pub struct RowProviderCapabilities {
pub source: SourceCapabilities, // forwards into pipeline
pub line_byte_offset: bool, // can report &[u8] of current line for prefilter
pub deterministic_order: bool, // false for parallel
}
pub enum RowError {
InvalidJson { line_no: u64, source: String },
LineTooLarge { line_no: u64, len: usize },
Io(std::io::Error),
}
```
The trait is the seam the rest of the project does not need to know about. Implementing `RowProvider` is the contract for plugging a new ingestion format into jetro.
### 4.2 `StreamingSourceHandle` (pipeline-internal, in `exec/pipeline.rs`)
```rust
pub(crate) trait StreamingSourceHandle: Send + Sync + std::fmt::Debug {
fn capabilities(&self) -> SourceCapabilities;
fn request_lanes(&self, plan: &PayloadLanePlan);
/// Iterate rows, calling `f` for each. `f` returns Continue/Cancel.
/// Returns count consumed.
fn drive(&self, f: &mut dyn FnMut(RowView<'_>) -> RowDecision) -> Result<usize, RowError>;
}
#[derive(Debug, Clone)]
pub enum Source {
Receiver(Val),
FieldChain { keys: Arc<[Arc<str>]> },
Streaming(Arc<dyn StreamingSourceHandle>),
}
```
`Source: Clone` is satisfied by `Arc::clone`. `Source: Debug` requires an explicit `Debug` impl that does not include the underlying file path or reader identity (else it leaks into cache-key hashing). Print only the *capabilities*, not the source.
The `Pipeline` cache key in `JetroEngine::plan_cache` derives from `PlanningContext`, which today uses `Source` shape only. **Audit before merging**: confirm `Streaming` collapses to a single cache equivalence class regardless of which file is open.
### 4.3 `PayloadLanePlan` (in `plan/analysis.rs`)
Already mostly there for `ValueView`. Extend with explicit "selective parse" hint:
```rust
pub struct PayloadLanePlan {
pub scan_fields: FieldDemand, // fields read during scanning
pub result_fields: FieldDemand, // fields preserved into output rows
pub selective_parse: bool, // RowProvider should partial-parse if possible
}
```
`RowProvider::request_lanes` receives this once per query. Forward driver translates it to a `TapeFieldSet` consumed by the partial-parse path in `io/lane.rs`.
---
## 5. Phased implementation
Each phase compiles and tests independently. Do not skip ahead — Phase N reuses scaffolding from Phase N-1.
### Phase 0 — Wiring prep (no behavior change)
1. Create `io/mod.rs` with stubbed `RowProvider`, `RowScratch`, `RowView`, `RowProviderCapabilities`, `RowError`.
2. Add `Source::Streaming(Arc<dyn StreamingSourceHandle>)` behind a `#[cfg(feature = "ndjson")]` feature gate that is *off* by default until Phase 4.
3. Extend `SourceAccessMode` if needed (already covers all cases — verify).
4. CI green, no test changes.
**Acceptance:** `cargo build --features ndjson` succeeds. `cargo test` (default features) unchanged.
### Phase 1 — Per-row driver, correctness first
1. `io::ndjson::PerRowDriver<R: BufRead>`. Reusable `Vec<u8>` line buffer. Read via `BufRead::read_until(b'\n', ..)`. Strip trailing `\r?\n`.
2. Parse each line via `serde_json::from_slice` (no simd-json yet). Convert to `Val` via existing `Jetro::from_bytes` slow path (`lib.rs:443`).
3. Implement `JetroEngine::run_ndjson`, `collect_ndjson`. Per-row mode: for each line, build `Jetro` for the line, call `engine.collect_value(line_value, query)`. Plan cache makes this cheap.
4. Output writer: `BufWriter`, one `serde_json::to_writer` + newline per result.
5. Tests in `tests/ndjson_basic.rs`: filter, take, sum, count, projection. Cover trailing-newline-less last line, empty file, single line, blank line policy (skip).
6. Doc test in `lib.rs`.
**Bench placeholder:** measure throughput on a 100 MB file. This is the baseline for Phase 2.
### Phase 2 — simd-json fast path with tape reuse
1. Implement `TapeData::parse_into(&mut TapeScratch, &mut Vec<u8>) -> Result<TapeView<'_>, TapeError>` (additive). Recycle `simd_json::Buffers` and the nodes vec.
2. `PerRowDriver` switches to `parse_into` over a reused byte scratch.
3. Per-row mode: `TapeView` rooted at the line's tape feeds the existing `TapeView` collect path (`exec/router.rs` already handles tape-rooted queries — verify).
4. Feature-gate behind `simd-json` (already default). Fall back to `serde_json` path when the feature is off.
5. Bench cold/warm per-line cost: target **≥ 3×** over Phase 1.
6. Steady-state allocation check (use `dhat` or `cargo-llvm-lines`): line buf + tape scratch + plan must be the only live allocations across iterations.
### Phase 3 — IO performance
1. Replace `read_until` with `memchr::memchr(b'\n', BufReader::buffer())` to avoid per-line `Vec` growth.
2. Handle `\r\n`, UTF-8 BOM (strip on first line only), trailing newline-less last line.
3. Optional `ndjson-mmap` feature: `memmap2::Mmap`, yields `&[u8]` line refs (zero-copy into tape scratch).
4. Bench 1 GB file: target near-disk throughput on warm cache.
### Phase 3b — Reverse mmap (tail mode)
1. `io::ndjson_rev::MmapRevDriver` reading lines EOF → BOF via `memchr::memrchr` over an mmap slice.
2. Chunked reverse scan in 64 KiB windows. Cache the window and boundary so we do not re-scan; the worst case must be `O(file_size)`, not `O(file_size²)`.
3. Edge cases: missing trailing newline, empty file, single line, BOM at start.
4. Wire `SourceCapabilities` with `reverse_stream: true` and `indexed_array_child: true` (only for `last(1)`).
5. **Critical integration:** `take(N)` over the reverse driver must lower through `indexed_exec.rs::single_select_indexed_dispatch`. The kernel then pages in only the file tail — `tail(N)` becomes `O(N)` regardless of file size. Verify with `mincore` or by reading the page-fault count under `time -v`.
6. Public API: `run_ndjson_rev(path, query, w)`, `collect_ndjson_rev`, `for_each_ndjson_rev`. File path only — generic `R: Read` cannot reverse, do not paper over this with buffer-everything.
7. Tests: tail-N via `take`, find-latest-matching, full reverse equals `forward → collect → reverse`.
### Phase 4 — Stream-as-array mode (`Source::Streaming` activates)
1. Flesh out `RowProvider` trait. Implement for `PerRowDriver`.
2. Implement `StreamingSourceHandle` wrapping a `Box<dyn RowProvider>`. Place behind `Arc` for pipeline IR.
3. Light up `Source::Streaming` in capability negotiation (`exec/pipeline/capability.rs`). `SourceAccessMode::Forward`, `ForwardBounded(n)`, or `Reverse { outputs }` depending on capabilities + demand.
4. Public API: `run_ndjson_stream`, `collect_ndjson_stream`. `$` binds to the virtual row stream. `take` / `find` / `first` / `sum` / `count` short-circuit through the cancellation hook (`Builtin::cancellation`, already exists).
5. Hook into `plan::demand`: streaming sources advertise `PullDemand::FirstInput(n)` properly so `take(n)` lowers to `ForwardBounded(n)` and stops reading from the file.
6. Tests: cross-row aggregation, early termination via `take(N)`, `first()` over a million-line file reads at most one buffer.
### Phase 4b — Payload-lane integration
This is the v2.14 win. Treat it as a first-class phase, not a polish step.
1. In `plan/analysis.rs`, ensure existing payload-lane analysis sees `Source::Streaming` capabilities and emits a `PayloadLanePlan` with `selective_parse: true` when downstream demand is field-shaped.
2. Wire `RowProvider::request_lanes(plan)` into the planner: called once after `Pipeline::with_source` and before the first `next_row`.
3. Implement selective tape walk in `io/lane.rs`. Two implementations to bench:
- **(a)** custom partial parser that drives `simd_json` token-by-token, skipping non-required object branches via `tape.span(idx)` skip arithmetic;
- **(b)** full simd-json parse + selective `TapeView` projection.
Ship whichever wins. (a) is theoretically lower bound but (b) might win on cache locality.
4. Compose with `has` / `missing` demand: when a chain contains `filter(has(error))`, the driver does a fast key-presence pre-scan and skips parse on miss.
5. Tests in `tests/ndjson_lanes.rs`: assert that a query touching 2 of 50 fields parses ~2/50 of the bytes per line (measure via cycle counter or stub `Buffers`).
6. Bench narrow-projection query over wide-row NDJSON: target **≥ 2×** over Phase 4.
### Phase 5 — Parallel execution
1. `io/parallel.rs` rayon shard-by-line-offset. Align shard boundaries to newlines via memchr.
2. Each worker owns: line scratch, tape scratch, thread-local VM (existing).
3. Plan shared via `Arc<QueryPlan>` (existing).
4. Merge strategies:
- **Commutative aggregates** (`sum`, `count`, `min`, `max`): associative merge in the rayon reducer.
- **Ordered collect:** shard-indexed merge preserving input order. Each worker emits `(shard_idx, row_idx, value)`; the merger does a k-way merge.
- **`find` / `take(1)`:** first-match wins via `AtomicBool` cancellation. Plumb through the existing `Builtin::cancellation` hook so workers stop pulling rows once a match is found.
- **`unique` / `unique_by`:** concurrent hash set (e.g. `dashmap`) merged into a single set at the end. Document non-determinism in output order.
5. Reverse + parallel: shard from EOF → BOF with reverse-shard-index ordered merge. Workers process tail chunks but emit in EOF-first order across shards.
6. Bench 8 / 16 core scaling. Target near-linear for non-aggregate workloads.
7. **Determinism caveat:** FP `sum` is not associative across shards. Document explicitly; offer a deterministic ordered-sum option for users who need bit-equality.
### Phase 6 — Predicate byte-prefilter
1. Planner pass in `io/prefilter.rs`: detect filter shapes safe for byte-level skip. Conservative: only `@.field == "literal"` and `@.field contains "literal"` against ASCII-clean literals.
2. Build an aho-corasick automaton from the required substrings.
3. Pre-scan each line; on miss, skip simd-json parse entirely.
4. **Soundness checks (non-negotiable):**
- UTF-8 escape sequences (`"A"` vs `"A"`) must not produce false negatives. Either reject predicates whose literal would be ambiguously escaped, or normalize the input before comparison.
- Unicode normalization forms (NFC / NFD) — reject non-ASCII literals from the fast path.
5. Compose with payload lanes: prefilter prunes lines, lanes prune fields within survivors.
6. Bench 1% selectivity: target **10–50×** over Phase 4b.
### Phase 6b — Multi-source and cross-stream
**Use cases:** log correlation (`app.ndjson` ⋈ `trace.ndjson` on `trace_id`), enrichment lookup, set difference, snapshot diff.
1. `NdjsonSource` enum (`File`, `Mmap`, `Reader`).
2. Public API: `run_ndjson_multi`, `collect_ndjson_multi`, `run_ndjson_named`, `collect_ndjson_named`, `for_each_ndjson_multi`.
3. Multi-source `RowProvider` impl that concats sub-providers; preserves order.
4. Grammar extension in `parse/grammar.pest`: `@<ident>` as named-source reference, root-position only (disambiguate from `@` current-element by position).
5. Plan: bind `@name` to source slot in `plan/physical.rs`; new `PlanNode` variant for streaming source reference.
6. New builtins (live in `builtins/defs.rs`, identity entries in `for_each_builtin!`):
- `.join(@other, on: key)` — inner join
- `.left_join(@other, on: key)` — left outer
- `.lookup(@other, on: key)` — first-match enrichment
- `.diff(@other, on: key)` — set difference
- `.intersect(@other, on: key)` — intersection
- `.zip_streams(@other)` — positional pairing
7. Each follows the v2.14 trait pattern (`spec()`, `apply_stream`, `apply_barrier`, `propagate_demand`, `participates_in_demand`). Build-side fully materialized into hash index keyed by `Val` structural hash; probe side stays streaming.
8. Planner heuristic: smaller side as build (file-size stat or explicit annotation).
9. Compose with reverse, prefilter, lanes, parallel (probe side parallel, build side single-threaded).
10. Tests: cross-stream join correctness, build-side memory bounds, named-source binding, mixed source types.
**Deferred sub-phase:** spill-to-disk hash join when build side exceeds RAM. Not in scope for the initial release.
### Phase 7 — Documentation and release
1. Recipes in `jetro-book/src/recipes/ndjson.md` covering every mode: per-row, stream, mmap, parallel, prefilter, lanes, multi / named, reverse-tail.
2. Doc tests in `lib.rs` per public NDJSON entry point.
3. CHANGELOG entry.
4. README NDJSON section.
5. **Acceptance for release:** an external downstream consumer can reach every NDJSON mode through the public `JetroEngine` API alone. If a CLI needs to reach into private state, the public API is missing surface — that is a bug in this project, not the CLI.
---
## 6. Risks and policy decisions
Decisions you need to make explicit during implementation, with the option to defer to a config flag if reasonable defaults disagree with users.
| Mid-stream parse error | Abort on first error | Expose `strict: bool` option; alternative is skip + log to stderr |
| Line > N MB | Abort with `LineTooLarge` | Configurable cap; default 64 MB |
| Empty lines | Skip silently | Document; some producers emit them |
| BOM at file start | Strip on first line, error on subsequent | UTF-8 only; reject UTF-16 BOM |
| Parallel FP sum non-associative | Document, offer ordered-sum option | Users running financial / scientific workloads will want bit-equality |
| Reverse mode on non-seekable input | Compile-time error via API typing | `&Path` / `&File` only, no `R: Read` overload |
| Hash-join build side OOM | Runtime watermark warning + error | Add `@source.size_hint()` override |
| Prefilter false negative on Unicode | Reject predicate from fast path | Falls back to full parse, correct but slow |
| Grammar `@ident` vs `@` | Disambiguate by position | `@` standalone = current element, `@<ident>` only at root-binding position |
| `Source::Streaming` cache key leak | Audit `PlanningContext` | One cache entry per query shape, not per stream instance |
| `TapeData` API breakage | Additive `parse_into` only | Do not change `parse` signature |
---
## 7. Testing and benchmarking
### Correctness
- `tests/ndjson_basic.rs` — per-row filter / take / sum / count / projection.
- `tests/ndjson_stream.rs` — cross-row aggregation; `take` / `find` short-circuit verified by row count.
- `tests/ndjson_lanes.rs` — narrow query parses only requested fields. Cross-check against full-parse baseline for equality.
- `tests/ndjson_rev.rs` — tail-N, find-latest, `reverse == forward.collect().reverse()`.
- `tests/ndjson_multi.rs` — concat slurp, order preservation across sources.
- `tests/ndjson_named_join.rs` — inner, left, lookup, diff, intersect, zip; build-side memory bounds.
- `tests/ndjson_parallel.rs` — same query, serial vs parallel, results equal (modulo FP-sum tolerance).
- `tests/ndjson_prefilter.rs` — soundness for UTF-8 escapes, Unicode normalization, literal edge cases.
### Performance
Add to the existing `cargo bench -p jetro-core` suite. Track these workloads against baselines:
- **Per-row throughput** on a 1 GB synthetic file (100 B avg line). Goal: > 1 GB/s warm with simd-json + reused buffers.
- **Narrow projection** (2 of 50 fields) with payload lanes. Goal: ≥ 2× over full-parse baseline.
- **`take(100)` over 1 GB** file. Goal: linear in 100, not in file size.
- **`tail(100)` over 10 GB** mmap file. Goal: < 50 ms warm.
- **1% selectivity filter** with prefilter. Goal: 10–50× over full-parse.
- **8-core parallel** ordered collect. Goal: ≥ 6× single-thread.
---
## 8. Order of work for the implementer
1. Phase 0 (wiring prep) — half a day. Lands as one commit, no behavior change.
2. Phase 1 (correctness baseline) — one to two days. First commit users can exercise.
3. Phase 2 (simd-json + tape reuse) — two days. First commit with competitive numbers.
4. Phase 3 + 3b (IO perf + reverse) — three days. Tail-N tests pass here.
5. Phase 4 (stream mode) — two days. `Source::Streaming` first hits production paths here.
6. Phase 4b (payload lanes) — three days. Biggest perf win in the project.
7. Phase 5 (parallel) — three days.
8. Phase 6 (prefilter) — two days.
9. Phase 6b (multi / named) — four to five days. Largest single phase due to grammar, planner, and six new builtins.
10. Phase 7 (docs + release) — one to two days.
**Roughly four weeks of focused work for one senior engineer.** The hard parts are 4b (lanes — requires deep planner familiarity) and 6b (cross-stream — requires grammar / planner / builtin work).
---
## 9. Things to read before writing any code
In this order:
1. `jetro-core/src/lib.rs` — public surface and `JetroEngine` internals.
2. `jetro-core/src/exec/pipeline.rs` (lines 190–520) — `Source`, `Pipeline`, `PipelineBody`, `with_source`. Understand how a `Pipeline` is built.
3. `jetro-core/src/exec/pipeline/capability.rs` — `SourceCapabilities`, `SourceAccessMode`, `choose_access`. The capability vocabulary controls everything downstream.
4. `jetro-core/src/exec/pipeline/row_source.rs` — what NOT to extend. See why a public trait belongs above this file, not inside it.
5. `jetro-core/src/data/tape.rs` — `TapeData::parse`, `TapeView`, simd-json buffer reuse.
6. `jetro-core/src/plan/analysis.rs` and `jetro-core/src/plan/chain_demand.rs` — payload lanes, `has` / `missing` demand. Required for Phase 4b.
7. `jetro-core/src/exec/pipeline/plan.rs` and `indexed_exec.rs` — direct positional planning. Required for Phase 3b and Phase 4.
8. `jetro-core/src/builtins/builtin.rs` — `Builtin` trait surface. Required for Phase 6b.
9. `jetro-core/CLAUDE.md` — overall architecture; this document assumes it.
If anything in this plan contradicts the source after you have read it, **the source wins** — flag the contradiction and update this document.