jetro 0.5.12

Jetro - transform, query, and compare JSON
Documentation
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
# NDJSON Support for Jetro — Engineering Plan

**Audience:** senior Rust engineer joining the jetro project to implement NDJSON ingestion.
**Branch:** `v2.14`
**Status:** design complete, no code written.
**Repo entry points:** `jetro-core/src/lib.rs`, `jetro-core/src/exec/pipeline.rs`, `jetro-core/src/exec/pipeline/capability.rs`.

This document is the implementation contract. Read it end-to-end before touching code. The architecture has constraints the v2.14 branch enforces — bypassing them will produce something that compiles but loses to the legacy interpreted path on every benchmark.

---

## 1. What we're building

Newline-delimited JSON ingestion as a first-class jetro input. One JSON document per line, queries evaluated either per-line (default) or as a virtual array spanning all lines (`--stream`).

**Library only.** No CLI in this repo. The downstream CLI lives in a separate project. Every NDJSON behavior must be reachable through `JetroEngine` public methods.

**Public API target (in `jetro-core/src/lib.rs`):**

```rust
impl JetroEngine {
    // Per-row mode: each line is its own $.
    pub fn run_ndjson<R: BufRead, W: Write>(&self, r: R, query: &str, w: W) -> Result<usize, JetroEngineError>;
    pub fn collect_ndjson<R: BufRead>(&self, r: R, query: &str) -> Result<Vec<Value>, JetroEngineError>;
    pub fn for_each_ndjson<R, F>(&self, r: R, query: &str, f: F) -> Result<usize, JetroEngineError>
        where R: BufRead, F: FnMut(Value);

    // Stream-as-array: $ binds to virtual Vec of all rows.
    pub fn run_ndjson_stream<R, W>(&self, r: R, query: &str, w: W) -> Result<usize, JetroEngineError>;
    pub fn collect_ndjson_stream<R>(&self, r: R, query: &str) -> Result<Value, JetroEngineError>;

    // Reverse iteration (tail-N, find-latest). File-only (needs mmap).
    pub fn run_ndjson_rev<W>(&self, path: &Path, query: &str, w: W) -> Result<usize, JetroEngineError>;
    pub fn collect_ndjson_rev(&self, path: &Path, query: &str) -> Result<Vec<Value>, JetroEngineError>;

    // Parallel.
    pub fn run_ndjson_par<R, W>(&self, r: R, query: &str, w: W) -> Result<usize, JetroEngineError>;

    // Multi-source: concat or named binding for cross-stream queries.
    pub fn run_ndjson_multi<W>(&self, sources: &[NdjsonSource], query: &str, w: W) -> Result<usize, JetroEngineError>;
    pub fn run_ndjson_named<W>(&self, sources: &[(&str, NdjsonSource)], query: &str, w: W) -> Result<usize, JetroEngineError>;
}
```

`NdjsonSource` is `enum { File(PathBuf), Mmap(PathBuf), Reader(Box<dyn BufRead + Send>) }`.

---

## 2. Architectural constraints from v2.14

Before designing anything, internalize the parts of the codebase you will be plugging into. The relevant files are listed against each constraint so you can read them now.

### 2.1 `pipeline::Source` is two variants only

`jetro-core/src/exec/pipeline.rs:193`:

```rust
pub enum Source {
    Receiver(Val),
    FieldChain { keys: Arc<[Arc<str>]> },
}
```

Both assume a materialized document root (`&Val`). `Pipeline` is `Debug + Clone` and is stored in `JetroEngine::plan_cache` keyed by `PlanningContext`. **Any new variant must preserve `Debug + Clone` and must not leak source instance identity into the cache key**, or the plan cache degenerates to one entry per NDJSON stream.

### 2.2 Row-source layer is pipeline-internal

`jetro-core/src/exec/pipeline/row_source.rs` defines `ValRowSource<'a>`, `TapeRowSource<'a>`, `Rows<'a>`. All `pub(super)`. There is **no public `RowSource` trait**. The old plan said "add NDJSON as a new `RowSource` variant"; that is wrong shape — those enums are tied to in-memory row iteration after the source has been resolved.

The right layer for NDJSON is *above* the pipeline, not inside `row_source.rs`. Two places attach:

- **Per-row mode:** a top-level driver in `io::ndjson::PerRowDriver` that produces one `Val` (or `TapeView`) per line and calls into the existing single-document evaluation paths. No pipeline IR changes.
- **Stream-as-array mode:** a new `Source::Streaming(Arc<dyn StreamingSourceHandle>)` variant in `pipeline::Source`. `StreamingSourceHandle` is the *pipeline-internal* trait that wraps a public `RowProvider` and adapts it to the pipeline's iteration loop.

The public extensibility surface is the `RowProvider` trait in `io/mod.rs`. Future async sources, gzip/zstd-wrapped NDJSON, Parquet row groups, etc. all implement `RowProvider` without touching pipeline code.

### 2.3 `SourceCapabilities` controls access mode

`jetro-core/src/exec/pipeline/capability.rs:19`:

```rust
pub(crate) struct SourceCapabilities {
    pub forward_stream: bool,
    pub reverse_stream: bool,
    pub indexed_array_child: bool,
    pub tape_view: bool,
    pub field_key_read: bool,
    pub subtree_skip: bool,
    pub selected_row_materialization: bool,
    pub materialized_fallback: bool,
}
```

`Source::Streaming` must declare capabilities accurately. For a generic NDJSON reader:

```rust
SourceCapabilities {
    forward_stream: true,
    reverse_stream: false,        // true only for mmap reverse driver
    indexed_array_child: false,   // true only when source supports byte-offset skip
    tape_view: true,              // we hand the pipeline TapeView per line
    field_key_read: true,         // simd_json lets us pick keys per line
    subtree_skip: true,           // partial parse via payload lanes
    selected_row_materialization: true,
    materialized_fallback: true,
}
```

`SourceCapabilities::choose_access(demand)` (capability.rs:64) decides the `SourceAccessMode`. For NDJSON the interesting cases are:

| `PullDemand` | Chosen access | What NDJSON must support |
| --- | --- | --- |
| `All` | `Forward` | read line, parse, emit |
| `FirstInput(n)` | `ForwardBounded(n)` | stop after `n` rows |
| `LastInput(n)` | `Reverse { outputs: n }` (only if `reverse_stream`) | reverse driver |
| `LastInput(1)` | `IndexedFromEnd(0)` (only if `indexed_array_child`) | seek to last record |
| `NthInput(k)` | `Indexed(k)` (only if `indexed_array_child`) | byte-offset skip |

For the generic forward driver, `reverse_stream` and `indexed_array_child` are false and `choose_access` will degrade `LastInput` / `NthInput` to `Forward` plus runtime skip — correct but slow. The reverse mmap driver sets both true.

### 2.4 Payload-lane and demand plumbing

`jetro-core/src/plan/analysis.rs` and `jetro-core/src/exec/pipeline/capability.rs` compute *what fields each row contributes to the downstream pipeline*. The result is two `FieldDemand` values on `Pipeline`: `payload_demand.scan_need` (fields read during scanning, e.g. inside filter predicates) and `payload_demand.result_need` (fields needed on rows that survive the sink). When `source_payload_lanes_supported = true`, the source is allowed to skip non-required fields entirely.

For NDJSON this is the **biggest single win** in the project. A query `$.map({id, name})` over 1 GB of wide rows touches two fields per line, not the whole document. The lane plan needs to flow from the planner into `RowProvider::request_lanes(plan)` before iteration starts.

Demand also covers `has` / `missing` propagation (`jetro-core/src/plan/chain_demand.rs`, v2.14). A predicate `filter(has(error))` lets the parser skip lines without an `error` field after a partial scan.

### 2.5 Direct positional planning

`jetro-core/src/exec/pipeline/plan.rs` and `jetro-core/src/exec/pipeline/indexed_exec.rs` lower `first / last / nth / take(K)` against indexed sources to direct seeks instead of streaming. For NDJSON:

- **`take(N)`** over a forward driver: planner emits `ForwardBounded(N)` and the driver returns `None` from `next_row` after `N` calls. Cancellation token short-circuits the rest of the file.
- **`last(N)`** over an mmap reverse driver: planner emits `Reverse { outputs: N }`; the driver pages in *only the file tail*. The kernel never reads the head. This is the killer feature for log-tail queries on multi-GB files.
- **`nth(K)`** over a driver that built an offset index (Phase 7b, optional): direct byte-seek.

These must reuse the existing positional dispatch in `indexed_exec.rs::single_select_indexed_dispatch`, not introduce a parallel path.

### 2.6 `TapeData` recycling

`jetro-core/src/data/tape.rs:209`:

```rust
pub fn parse(mut bytes: Vec<u8>) -> Result<Arc<Self>, String>;
```

Internally:

```rust
let mut buffers = simd_json::Buffers::new(bytes.len());
let tape = simd_json::to_tape_with_buffers(bytes, &mut buffers)?;
```

Per-line parsing must not allocate fresh `Buffers` on every line. Add an additive API:

```rust
// New, sibling of `parse`. Does not change existing signature.
pub fn parse_into(scratch: &mut TapeScratch, line: &mut Vec<u8>) -> Result<TapeView<'_>, TapeError>;

pub struct TapeScratch {
    buffers: simd_json::Buffers,
    nodes: Vec<simd_json::Node<'static>>,
}
```

Each line is moved into the reused `line: Vec<u8>`, simd-json mutates it in place, the tape nodes go into the reused `nodes` vec, and we hand back a `TapeView` that borrows from the scratch. The view is invalidated when `parse_into` is called again — enforce with `&mut self` borrowing or a `Drop`-based generation counter. **Do not change the existing `parse` signature**; callers across the codebase depend on it.

### 2.7 `JetroEngine` plan cache

`JetroEngine` already maintains a plan cache keyed by `PlanningContext` (`plan/physical.rs`) and a thread-local VM. Per-row mode therefore *amortizes the plan for free* across millions of lines. The cache is the reason per-row throughput will be competitive without further heroics. **Do not bypass it.** If `Source::Streaming` ends up in the cache key, every NDJSON call becomes a cold plan — verify by inspecting `PlanningContext` after wiring.

---

## 3. Module layout

```
jetro-core/src/
├── data/
│   └── tape.rs                        # +parse_into + TapeScratch (additive)
├── exec/
│   └── pipeline.rs                    # +Source::Streaming variant
│   └── pipeline/
│       ├── capability.rs              # +SourceCapabilities for Streaming
│       ├── plan.rs                    # +positional dispatch for streaming
│       └── indexed_exec.rs            # +streaming positional seek
├── plan/
│   ├── analysis.rs                    # +streaming in payload-lane analysis
│   └── physical.rs                    # +PlanNode for named streaming source
├── io/                                # NEW MODULE
│   ├── mod.rs                         # RowProvider trait, NdjsonSource enum
│   ├── ndjson.rs                      # forward per-row driver, line scanner
│   ├── ndjson_rev.rs                  # reverse mmap memrchr driver
│   ├── ndjson_multi.rs                # concat / named slurp, hash-build joins
│   ├── parallel.rs                    # rayon shard runner (fwd + rev)
│   ├── prefilter.rs                   # aho-corasick byte prefilter
│   └── lane.rs                        # selective tape walk for payload lanes
├── builtins/                          # +join, left_join, lookup, diff, intersect, zip_streams
├── parse/grammar.pest                 # +@<ident> named-source ref
└── lib.rs                             # NDJSON API surface

tests/
├── ndjson_basic.rs                    # filter, take, sum
├── ndjson_stream.rs                   # cross-row aggregation
├── ndjson_lanes.rs                    # partial parse correctness + bench
├── ndjson_rev.rs                      # tail-N, find-latest
├── ndjson_multi.rs                    # concat, named binding
├── ndjson_named_join.rs               # cross-stream joins
├── ndjson_parallel.rs                 # rayon correctness + scaling
└── ndjson_prefilter.rs                # soundness

jetro-book/src/recipes/ndjson.md       # docs
```

Estimated total: ~3 500 LOC across the new module and the planner/pipeline hookups.

---

## 4. Core abstractions

### 4.1 `RowProvider` trait (public, in `io/mod.rs`)

```rust
pub trait RowProvider: Send {
    /// Pull the next row. Returns None on EOF.
    /// The returned RowView borrows from `scratch` until the next call.
    fn next_row<'a>(
        &'a mut self,
        scratch: &'a mut RowScratch,
    ) -> Option<Result<RowView<'a>, RowError>>;

    fn capabilities(&self) -> RowProviderCapabilities;

    /// Called by the planner once before iteration. Provider may use this
    /// to drive selective parsing.
    fn request_lanes(&mut self, plan: &PayloadLanePlan) {
        let _ = plan; // default: no-op (full parse)
    }

    /// Idempotent. Causes future next_row calls to return None.
    fn cancel(&mut self) {}
}

pub struct RowScratch {
    pub line_buf: Vec<u8>,
    pub tape: TapeScratch,
}

pub enum RowView<'a> {
    Tape(crate::data::view::TapeView<'a>),
    Owned(Val), // fallback for non-simd-json builds
}

pub struct RowProviderCapabilities {
    pub source: SourceCapabilities,  // forwards into pipeline
    pub line_byte_offset: bool,      // can report &[u8] of current line for prefilter
    pub deterministic_order: bool,   // false for parallel
}

pub enum RowError {
    InvalidJson { line_no: u64, source: String },
    LineTooLarge { line_no: u64, len: usize },
    Io(std::io::Error),
}
```

The trait is the seam the rest of the project does not need to know about. Implementing `RowProvider` is the contract for plugging a new ingestion format into jetro.

### 4.2 `StreamingSourceHandle` (pipeline-internal, in `exec/pipeline.rs`)

```rust
pub(crate) trait StreamingSourceHandle: Send + Sync + std::fmt::Debug {
    fn capabilities(&self) -> SourceCapabilities;
    fn request_lanes(&self, plan: &PayloadLanePlan);
    /// Iterate rows, calling `f` for each. `f` returns Continue/Cancel.
    /// Returns count consumed.
    fn drive(&self, f: &mut dyn FnMut(RowView<'_>) -> RowDecision) -> Result<usize, RowError>;
}

#[derive(Debug, Clone)]
pub enum Source {
    Receiver(Val),
    FieldChain { keys: Arc<[Arc<str>]> },
    Streaming(Arc<dyn StreamingSourceHandle>),
}
```

`Source: Clone` is satisfied by `Arc::clone`. `Source: Debug` requires an explicit `Debug` impl that does not include the underlying file path or reader identity (else it leaks into cache-key hashing). Print only the *capabilities*, not the source.

The `Pipeline` cache key in `JetroEngine::plan_cache` derives from `PlanningContext`, which today uses `Source` shape only. **Audit before merging**: confirm `Streaming` collapses to a single cache equivalence class regardless of which file is open.

### 4.3 `PayloadLanePlan` (in `plan/analysis.rs`)

Already mostly there for `ValueView`. Extend with explicit "selective parse" hint:

```rust
pub struct PayloadLanePlan {
    pub scan_fields: FieldDemand,    // fields read during scanning
    pub result_fields: FieldDemand,  // fields preserved into output rows
    pub selective_parse: bool,       // RowProvider should partial-parse if possible
}
```

`RowProvider::request_lanes` receives this once per query. Forward driver translates it to a `TapeFieldSet` consumed by the partial-parse path in `io/lane.rs`.

---

## 5. Phased implementation

Each phase compiles and tests independently. Do not skip ahead — Phase N reuses scaffolding from Phase N-1.

### Phase 0 — Wiring prep (no behavior change)

1. Create `io/mod.rs` with stubbed `RowProvider`, `RowScratch`, `RowView`, `RowProviderCapabilities`, `RowError`.
2. Add `Source::Streaming(Arc<dyn StreamingSourceHandle>)` behind a `#[cfg(feature = "ndjson")]` feature gate that is *off* by default until Phase 4.
3. Extend `SourceAccessMode` if needed (already covers all cases — verify).
4. CI green, no test changes.

**Acceptance:** `cargo build --features ndjson` succeeds. `cargo test` (default features) unchanged.

### Phase 1 — Per-row driver, correctness first

1. `io::ndjson::PerRowDriver<R: BufRead>`. Reusable `Vec<u8>` line buffer. Read via `BufRead::read_until(b'\n', ..)`. Strip trailing `\r?\n`.
2. Parse each line via `serde_json::from_slice` (no simd-json yet). Convert to `Val` via existing `Jetro::from_bytes` slow path (`lib.rs:443`).
3. Implement `JetroEngine::run_ndjson`, `collect_ndjson`. Per-row mode: for each line, build `Jetro` for the line, call `engine.collect_value(line_value, query)`. Plan cache makes this cheap.
4. Output writer: `BufWriter`, one `serde_json::to_writer` + newline per result.
5. Tests in `tests/ndjson_basic.rs`: filter, take, sum, count, projection. Cover trailing-newline-less last line, empty file, single line, blank line policy (skip).
6. Doc test in `lib.rs`.

**Bench placeholder:** measure throughput on a 100 MB file. This is the baseline for Phase 2.

### Phase 2 — simd-json fast path with tape reuse

1. Implement `TapeData::parse_into(&mut TapeScratch, &mut Vec<u8>) -> Result<TapeView<'_>, TapeError>` (additive). Recycle `simd_json::Buffers` and the nodes vec.
2. `PerRowDriver` switches to `parse_into` over a reused byte scratch.
3. Per-row mode: `TapeView` rooted at the line's tape feeds the existing `TapeView` collect path (`exec/router.rs` already handles tape-rooted queries — verify).
4. Feature-gate behind `simd-json` (already default). Fall back to `serde_json` path when the feature is off.
5. Bench cold/warm per-line cost: target **≥ 3×** over Phase 1.
6. Steady-state allocation check (use `dhat` or `cargo-llvm-lines`): line buf + tape scratch + plan must be the only live allocations across iterations.

### Phase 3 — IO performance

1. Replace `read_until` with `memchr::memchr(b'\n', BufReader::buffer())` to avoid per-line `Vec` growth.
2. Handle `\r\n`, UTF-8 BOM (strip on first line only), trailing newline-less last line.
3. Optional `ndjson-mmap` feature: `memmap2::Mmap`, yields `&[u8]` line refs (zero-copy into tape scratch).
4. Bench 1 GB file: target near-disk throughput on warm cache.

### Phase 3b — Reverse mmap (tail mode)

1. `io::ndjson_rev::MmapRevDriver` reading lines EOF → BOF via `memchr::memrchr` over an mmap slice.
2. Chunked reverse scan in 64 KiB windows. Cache the window and boundary so we do not re-scan; the worst case must be `O(file_size)`, not `O(file_size²)`.
3. Edge cases: missing trailing newline, empty file, single line, BOM at start.
4. Wire `SourceCapabilities` with `reverse_stream: true` and `indexed_array_child: true` (only for `last(1)`).
5. **Critical integration:** `take(N)` over the reverse driver must lower through `indexed_exec.rs::single_select_indexed_dispatch`. The kernel then pages in only the file tail — `tail(N)` becomes `O(N)` regardless of file size. Verify with `mincore` or by reading the page-fault count under `time -v`.
6. Public API: `run_ndjson_rev(path, query, w)`, `collect_ndjson_rev`, `for_each_ndjson_rev`. File path only — generic `R: Read` cannot reverse, do not paper over this with buffer-everything.
7. Tests: tail-N via `take`, find-latest-matching, full reverse equals `forward → collect → reverse`.

### Phase 4 — Stream-as-array mode (`Source::Streaming` activates)

1. Flesh out `RowProvider` trait. Implement for `PerRowDriver`.
2. Implement `StreamingSourceHandle` wrapping a `Box<dyn RowProvider>`. Place behind `Arc` for pipeline IR.
3. Light up `Source::Streaming` in capability negotiation (`exec/pipeline/capability.rs`). `SourceAccessMode::Forward`, `ForwardBounded(n)`, or `Reverse { outputs }` depending on capabilities + demand.
4. Public API: `run_ndjson_stream`, `collect_ndjson_stream`. `$` binds to the virtual row stream. `take` / `find` / `first` / `sum` / `count` short-circuit through the cancellation hook (`Builtin::cancellation`, already exists).
5. Hook into `plan::demand`: streaming sources advertise `PullDemand::FirstInput(n)` properly so `take(n)` lowers to `ForwardBounded(n)` and stops reading from the file.
6. Tests: cross-row aggregation, early termination via `take(N)`, `first()` over a million-line file reads at most one buffer.

### Phase 4b — Payload-lane integration

This is the v2.14 win. Treat it as a first-class phase, not a polish step.

1. In `plan/analysis.rs`, ensure existing payload-lane analysis sees `Source::Streaming` capabilities and emits a `PayloadLanePlan` with `selective_parse: true` when downstream demand is field-shaped.
2. Wire `RowProvider::request_lanes(plan)` into the planner: called once after `Pipeline::with_source` and before the first `next_row`.
3. Implement selective tape walk in `io/lane.rs`. Two implementations to bench:
   - **(a)** custom partial parser that drives `simd_json` token-by-token, skipping non-required object branches via `tape.span(idx)` skip arithmetic;
   - **(b)** full simd-json parse + selective `TapeView` projection.
   Ship whichever wins. (a) is theoretically lower bound but (b) might win on cache locality.
4. Compose with `has` / `missing` demand: when a chain contains `filter(has(error))`, the driver does a fast key-presence pre-scan and skips parse on miss.
5. Tests in `tests/ndjson_lanes.rs`: assert that a query touching 2 of 50 fields parses ~2/50 of the bytes per line (measure via cycle counter or stub `Buffers`).
6. Bench narrow-projection query over wide-row NDJSON: target **≥ 2×** over Phase 4.

### Phase 5 — Parallel execution

1. `io/parallel.rs` rayon shard-by-line-offset. Align shard boundaries to newlines via memchr.
2. Each worker owns: line scratch, tape scratch, thread-local VM (existing).
3. Plan shared via `Arc<QueryPlan>` (existing).
4. Merge strategies:
   - **Commutative aggregates** (`sum`, `count`, `min`, `max`): associative merge in the rayon reducer.
   - **Ordered collect:** shard-indexed merge preserving input order. Each worker emits `(shard_idx, row_idx, value)`; the merger does a k-way merge.
   - **`find` / `take(1)`:** first-match wins via `AtomicBool` cancellation. Plumb through the existing `Builtin::cancellation` hook so workers stop pulling rows once a match is found.
   - **`unique` / `unique_by`:** concurrent hash set (e.g. `dashmap`) merged into a single set at the end. Document non-determinism in output order.
5. Reverse + parallel: shard from EOF → BOF with reverse-shard-index ordered merge. Workers process tail chunks but emit in EOF-first order across shards.
6. Bench 8 / 16 core scaling. Target near-linear for non-aggregate workloads.
7. **Determinism caveat:** FP `sum` is not associative across shards. Document explicitly; offer a deterministic ordered-sum option for users who need bit-equality.

### Phase 6 — Predicate byte-prefilter

1. Planner pass in `io/prefilter.rs`: detect filter shapes safe for byte-level skip. Conservative: only `@.field == "literal"` and `@.field contains "literal"` against ASCII-clean literals.
2. Build an aho-corasick automaton from the required substrings.
3. Pre-scan each line; on miss, skip simd-json parse entirely.
4. **Soundness checks (non-negotiable):**
   - UTF-8 escape sequences (`"A"` vs `"A"`) must not produce false negatives. Either reject predicates whose literal would be ambiguously escaped, or normalize the input before comparison.
   - Unicode normalization forms (NFC / NFD) — reject non-ASCII literals from the fast path.
5. Compose with payload lanes: prefilter prunes lines, lanes prune fields within survivors.
6. Bench 1% selectivity: target **10–50×** over Phase 4b.

### Phase 6b — Multi-source and cross-stream

**Use cases:** log correlation (`app.ndjson` ⋈ `trace.ndjson` on `trace_id`), enrichment lookup, set difference, snapshot diff.

1. `NdjsonSource` enum (`File`, `Mmap`, `Reader`).
2. Public API: `run_ndjson_multi`, `collect_ndjson_multi`, `run_ndjson_named`, `collect_ndjson_named`, `for_each_ndjson_multi`.
3. Multi-source `RowProvider` impl that concats sub-providers; preserves order.
4. Grammar extension in `parse/grammar.pest`: `@<ident>` as named-source reference, root-position only (disambiguate from `@` current-element by position).
5. Plan: bind `@name` to source slot in `plan/physical.rs`; new `PlanNode` variant for streaming source reference.
6. New builtins (live in `builtins/defs.rs`, identity entries in `for_each_builtin!`):
   - `.join(@other, on: key)` — inner join
   - `.left_join(@other, on: key)` — left outer
   - `.lookup(@other, on: key)` — first-match enrichment
   - `.diff(@other, on: key)` — set difference
   - `.intersect(@other, on: key)` — intersection
   - `.zip_streams(@other)` — positional pairing
7. Each follows the v2.14 trait pattern (`spec()`, `apply_stream`, `apply_barrier`, `propagate_demand`, `participates_in_demand`). Build-side fully materialized into hash index keyed by `Val` structural hash; probe side stays streaming.
8. Planner heuristic: smaller side as build (file-size stat or explicit annotation).
9. Compose with reverse, prefilter, lanes, parallel (probe side parallel, build side single-threaded).
10. Tests: cross-stream join correctness, build-side memory bounds, named-source binding, mixed source types.

**Deferred sub-phase:** spill-to-disk hash join when build side exceeds RAM. Not in scope for the initial release.

### Phase 7 — Documentation and release

1. Recipes in `jetro-book/src/recipes/ndjson.md` covering every mode: per-row, stream, mmap, parallel, prefilter, lanes, multi / named, reverse-tail.
2. Doc tests in `lib.rs` per public NDJSON entry point.
3. CHANGELOG entry.
4. README NDJSON section.
5. **Acceptance for release:** an external downstream consumer can reach every NDJSON mode through the public `JetroEngine` API alone. If a CLI needs to reach into private state, the public API is missing surface — that is a bug in this project, not the CLI.

---

## 6. Risks and policy decisions

Decisions you need to make explicit during implementation, with the option to defer to a config flag if reasonable defaults disagree with users.

| Risk | Recommended default | Notes |
| --- | --- | --- |
| Mid-stream parse error | Abort on first error | Expose `strict: bool` option; alternative is skip + log to stderr |
| Line > N MB | Abort with `LineTooLarge` | Configurable cap; default 64 MB |
| Empty lines | Skip silently | Document; some producers emit them |
| BOM at file start | Strip on first line, error on subsequent | UTF-8 only; reject UTF-16 BOM |
| Parallel FP sum non-associative | Document, offer ordered-sum option | Users running financial / scientific workloads will want bit-equality |
| Reverse mode on non-seekable input | Compile-time error via API typing | `&Path` / `&File` only, no `R: Read` overload |
| Hash-join build side OOM | Runtime watermark warning + error | Add `@source.size_hint()` override |
| Prefilter false negative on Unicode | Reject predicate from fast path | Falls back to full parse, correct but slow |
| Grammar `@ident` vs `@` | Disambiguate by position | `@` standalone = current element, `@<ident>` only at root-binding position |
| `Source::Streaming` cache key leak | Audit `PlanningContext` | One cache entry per query shape, not per stream instance |
| `TapeData` API breakage | Additive `parse_into` only | Do not change `parse` signature |

---

## 7. Testing and benchmarking

### Correctness

- `tests/ndjson_basic.rs` — per-row filter / take / sum / count / projection.
- `tests/ndjson_stream.rs` — cross-row aggregation; `take` / `find` short-circuit verified by row count.
- `tests/ndjson_lanes.rs` — narrow query parses only requested fields. Cross-check against full-parse baseline for equality.
- `tests/ndjson_rev.rs` — tail-N, find-latest, `reverse == forward.collect().reverse()`.
- `tests/ndjson_multi.rs` — concat slurp, order preservation across sources.
- `tests/ndjson_named_join.rs` — inner, left, lookup, diff, intersect, zip; build-side memory bounds.
- `tests/ndjson_parallel.rs` — same query, serial vs parallel, results equal (modulo FP-sum tolerance).
- `tests/ndjson_prefilter.rs` — soundness for UTF-8 escapes, Unicode normalization, literal edge cases.

### Performance

Add to the existing `cargo bench -p jetro-core` suite. Track these workloads against baselines:

- **Per-row throughput** on a 1 GB synthetic file (100 B avg line). Goal: > 1 GB/s warm with simd-json + reused buffers.
- **Narrow projection** (2 of 50 fields) with payload lanes. Goal: ≥ 2× over full-parse baseline.
- **`take(100)` over 1 GB** file. Goal: linear in 100, not in file size.
- **`tail(100)` over 10 GB** mmap file. Goal: < 50 ms warm.
- **1% selectivity filter** with prefilter. Goal: 10–50× over full-parse.
- **8-core parallel** ordered collect. Goal: ≥ 6× single-thread.

---

## 8. Order of work for the implementer

1. Phase 0 (wiring prep) — half a day. Lands as one commit, no behavior change.
2. Phase 1 (correctness baseline) — one to two days. First commit users can exercise.
3. Phase 2 (simd-json + tape reuse) — two days. First commit with competitive numbers.
4. Phase 3 + 3b (IO perf + reverse) — three days. Tail-N tests pass here.
5. Phase 4 (stream mode) — two days. `Source::Streaming` first hits production paths here.
6. Phase 4b (payload lanes) — three days. Biggest perf win in the project.
7. Phase 5 (parallel) — three days.
8. Phase 6 (prefilter) — two days.
9. Phase 6b (multi / named) — four to five days. Largest single phase due to grammar, planner, and six new builtins.
10. Phase 7 (docs + release) — one to two days.

**Roughly four weeks of focused work for one senior engineer.** The hard parts are 4b (lanes — requires deep planner familiarity) and 6b (cross-stream — requires grammar / planner / builtin work).

---

## 9. Things to read before writing any code

In this order:

1. `jetro-core/src/lib.rs` — public surface and `JetroEngine` internals.
2. `jetro-core/src/exec/pipeline.rs` (lines 190–520) — `Source`, `Pipeline`, `PipelineBody`, `with_source`. Understand how a `Pipeline` is built.
3. `jetro-core/src/exec/pipeline/capability.rs``SourceCapabilities`, `SourceAccessMode`, `choose_access`. The capability vocabulary controls everything downstream.
4. `jetro-core/src/exec/pipeline/row_source.rs` — what NOT to extend. See why a public trait belongs above this file, not inside it.
5. `jetro-core/src/data/tape.rs``TapeData::parse`, `TapeView`, simd-json buffer reuse.
6. `jetro-core/src/plan/analysis.rs` and `jetro-core/src/plan/chain_demand.rs` — payload lanes, `has` / `missing` demand. Required for Phase 4b.
7. `jetro-core/src/exec/pipeline/plan.rs` and `indexed_exec.rs` — direct positional planning. Required for Phase 3b and Phase 4.
8. `jetro-core/src/builtins/builtin.rs``Builtin` trait surface. Required for Phase 6b.
9. `jetro-core/CLAUDE.md` — overall architecture; this document assumes it.

If anything in this plan contradicts the source after you have read it, **the source wins** — flag the contradiction and update this document.