Skip to main content

relon_eval_api/
buffer.rs

1//! Typesafe writer + reader for the host <-> wasm binary handshake.
2//!
3//! Spec: `docs/internal/adr/wasm-binary-layout-v1-2026-05-16.md`.
4//!
5//! [`BufferBuilder`] writes scalar fields by name into a pre-allocated
6//! byte buffer sized by a [`crate::layout::OffsetTable`]; [`BufferReader`]
7//! reads the same fields back. The intermediate `Vec<u8>` is exactly
8//! what the host hands the wasm module (`run_main(in_ptr, in_len,
9//! out_ptr)`) once Phase 2.b flips the wasm signature; in Phase 2.a
10//! the binary path is dormant but the writer/reader infrastructure is
11//! already in place so host code can be wired ahead of the codegen
12//! flip.
13//!
14//! Type safety is enforced at runtime by checking the schema's
15//! declared type for each field against the writer call (e.g.
16//! `write_int` on a `Bool` slot is [`BufferError::TypeMismatch`]).
17//! Static enforcement via codegen-generated wrappers is a Phase 3
18//! follow-up.
19
20use crate::layout::{FieldKind, ListElementKind, OffsetTable, SchemaLayout};
21use crate::schema_canonical::{Field, Schema, TypeRepr};
22use std::sync::Arc;
23use thiserror::Error;
24
/// One row in the per-builder / per-reader field index. Carries the
/// declared schema type alongside the offset-table data so writers
/// and readers can dispatch on declared shape without re-walking the
/// schema. Phase 10-c added `list_element` for `List<T>` dispatch.
///
/// Rows are produced by [`BufferBuilder::new`], which pairs every
/// offset-table entry with the schema field of the same name.
#[derive(Debug, Clone)]
struct FieldEntry {
    /// Field name, copied from the offset-table entry.
    name: String,
    /// Declared type, cloned from the schema field with the same name.
    ty: TypeRepr,
    /// Offset copied from the offset-table entry; the scalar writers use
    /// it as the index of the slot's first byte in the builder's buffer.
    offset: usize,
    /// Size the offset table records for this field.
    size: usize,
    /// Storage kind the offset table records for this field; the tail
    /// writers require `FieldKind::PointerIndirect`.
    kind: FieldKind,
    /// Per-element layout the offset table records for `List<T>` fields.
    list_element: Option<ListElementKind>,
}
38
/// One slot in a [`RelocLayout`] — every entry maps 1:1 to a
/// `PointerIndirect` field in the parent [`OffsetTable`]. Inline fields
/// don't need relocation and are filtered out at build time so the
/// relocation walker can skip them without a `matches!` check per slot.
#[derive(Debug)]
pub(crate) struct RelocSlot {
    /// Byte offset of the pointer slot within the record's fixed area.
    offset: usize,
    /// Per-element layout for `List<T>` fields, copied from the
    /// offset-table entry's `list_element`. Drives the dispatch between
    /// the pointer-array relocator and the inline schema recursion.
    list_element: Option<ListElementKind>,
    /// Sub-layout for nested `Schema` / `List<Schema>` fields. Pre-computed
    /// at builder construction so relocation never has to call
    /// [`SchemaLayout::offsets_for`] on the hot path. `None` for every
    /// other declared type (e.g. `String` / `List<scalar>` slots, where the
    /// tail record carries no further pointers) and also when
    /// `offsets_for` fails for the nested schema.
    nested: Option<Arc<RelocLayout>>,
    /// For a `PointerArray` list slot, the recursive descriptor of what
    /// each entry points at — so the relocation walker can rebase the
    /// entries' own internal pointers (`List<Schema>` sub-records,
    /// `List<List<String|Schema>>` inner pointer-array lists). `None` for
    /// inline-fixed / scalar element lists whose entries carry no further
    /// pointers. This is the F5 piece: it lets a doubly-nested
    /// pointer-array list (`List<List<String>>`) be relocated to one level
    /// deeper than the `nested` schema cache alone could reach.
    list_elem: Option<PtrArrayElem>,
    /// For a direct `Option<T>` / `Result<T, E>` / enum pointer slot, the
    /// declared variant type whose selected payload may itself contain
    /// pointer slots that must be relocated after a paste / arena rebase.
    /// `None` for every other declared type.
    variant: Option<TypeRepr>,
}
71
/// Recursive descriptor of one `PointerArray` list level's element, used
/// by the relocation walker to rebase the pointers an entry introduces.
/// Built once from the field's declared [`TypeRepr`] at builder
/// construction; mirrors the depth the verifier / reader recurse to.
#[derive(Debug)]
pub(crate) enum PtrArrayElem {
    /// `List<String>` elements: each entry points at a `[len][utf8]`
    /// String record with no further pointers — entry-pointer rebase only.
    String,
    /// `List<Schema>` elements: each entry points at a sub-record whose
    /// own pointer slots are rebased via the cached [`RelocLayout`].
    Schema(Arc<RelocLayout>),
    /// `List<List<…>>` elements: each entry points at an **inner list
    /// header** that is itself a pointer array. Recurse one level: rebase
    /// the inner header's entries (and, for the inner element, whatever
    /// `inner` describes) too. This is what lifts the `List<List<String>>`
    /// / `List<List<Schema>>` cap — the inner pointer array's entries would
    /// otherwise keep their child-buffer-relative offsets and dereference
    /// `paste_base` bytes off.
    InnerList {
        /// Element descriptor of the inner list (`String` / `Schema` /
        /// `Variant` / a still-deeper `InnerList`).
        inner: Box<PtrArrayElem>,
    },
    /// `List<Option<T>>` / `List<Result<T, E>>` / `List<Enum>` elements:
    /// each entry points at a variant record whose selected payload may
    /// recursively contain pointers. Carries a clone of the declared
    /// element type so the walker can decode the selected variant.
    Variant(TypeRepr),
}
101
102/// Build the recursive [`PtrArrayElem`] descriptor for a pointer-array
103/// list whose declared element type is `element`. Returns `None` for an
104/// inline-fixed element (`Int` / `Float` / `Bool`) where no per-entry
105/// recursion is needed — the entry-pointer rebase the walker always does
106/// suffices.
107fn ptr_array_elem_for(element: &TypeRepr) -> Option<PtrArrayElem> {
108    match element {
109        TypeRepr::String => Some(PtrArrayElem::String),
110        TypeRepr::Schema { schema } => SchemaLayout::offsets_for(schema)
111            .ok()
112            .map(|sub| PtrArrayElem::Schema(RelocLayout::build(&sub, &schema.fields))),
113        // A `List<inner>` element. Only recurse when the inner list is
114        // itself a **pointer array** carrying internal pointers
115        // (`List<List<String|Schema|List>>`): then each entry points at an
116        // inner pointer-array header whose own entries must be rebased. An
117        // inner *inline-fixed* scalar list (`List<List<Int>>`) is a
118        // self-contained `[len][payload]` record with no internal pointers,
119        // so the outer entry-pointer rebase is all that is needed —
120        // recursing into it would mis-treat the i64 payload as entry
121        // pointers and corrupt the buffer.
122        TypeRepr::List { element: inner } => {
123            ptr_array_elem_for(inner).map(|inner_desc| PtrArrayElem::InnerList {
124                inner: Box::new(inner_desc),
125            })
126        }
127        TypeRepr::Option { .. } | TypeRepr::Result { .. } | TypeRepr::Enum { .. } => {
128            Some(PtrArrayElem::Variant(element.clone()))
129        }
130        // Inline-fixed scalar element: entry-pointer rebase only.
131        _ => None,
132    }
133}
134
135/// The strongest byte alignment any value of declared type `ty` requires
136/// **anywhere in its serialised graph** — fixed area *and* tail records.
137///
138/// The tail-area scalar-list writers
139/// ([`BufferBuilder::append_tail_record_with_inner_alignment`]) place an
140/// `Int` / `Float` payload on an 8-byte boundary, computed in the
141/// authoring buffer's coordinates. When a sub-record / list entry is later
142/// pasted into a parent and its pointers are relocated by adding the paste
143/// base, the payload bytes are **not** moved — so the reader's absolute
144/// `(rec_start + 4).next_multiple_of(8)` recovery only lands on the bytes
145/// the writer wrote when the paste base is itself a multiple of 8. A
146/// record's fixed-area `root_align` does not capture this (the 8-aligned
147/// content lives in the tail), so paste sites must additionally honour the
148/// **deep tail alignment** computed here. Returns 1 when nothing inside
149/// `ty` needs more than byte alignment.
150fn type_graph_align(ty: &TypeRepr) -> usize {
151    match ty {
152        // Inline-fixed scalar / String tail (`[len][utf8]`) — 4 suffices;
153        // the record-prefix `pad_to(4)` already covers them.
154        TypeRepr::Unit | TypeRepr::Bool | TypeRepr::Int | TypeRepr::Float | TypeRepr::String => 1,
155        TypeRepr::List { element } => match element.as_ref() {
156            // `List<Int>` / `List<Float>` tail payload sits on an 8-byte
157            // boundary; `List<Bool>` packs at 1.
158            TypeRepr::Int | TypeRepr::Float => 8,
159            TypeRepr::Bool => 1,
160            // String element pointer array — 4-aligned offsets only.
161            TypeRepr::String => 1,
162            // Recurse into the element graph (`List<List<Int>>`,
163            // `List<Schema>`, deeper nests).
164            other => type_graph_align(other),
165        },
166        TypeRepr::Schema { schema } => schema
167            .fields
168            .iter()
169            .map(|f| type_graph_align(&f.ty))
170            .max()
171            .unwrap_or(1),
172        TypeRepr::Option { .. } | TypeRepr::Result { .. } | TypeRepr::Enum { .. } => {
173            variant_record_align_runtime(ty)
174        }
175        // Closure values never reach the host-visible buffer protocol.
176        TypeRepr::Closure { .. } => 1,
177    }
178}
179
180/// The deepest paste alignment a schema's serialised graph requires:
181/// `max(root_align, deep tail alignment of every field)`. Used at every
182/// sub-record / list-entry paste so an 8-aligned `List<Int|Float>` (or a
183/// nested-list / nested-schema field that transitively carries one) in the
184/// tail lands at an absolute offset where the reader's alignment recovery
185/// is correct after relocation.
186fn schema_paste_align(layout: &OffsetTable, fields: &[Field]) -> usize {
187    let tail = fields
188        .iter()
189        .map(|f| type_graph_align(&f.ty))
190        .max()
191        .unwrap_or(1);
192    layout.root_align.max(tail).max(1)
193}
194
195fn payload_slot_layout_runtime(ty: &TypeRepr) -> (usize, usize) {
196    match ty {
197        TypeRepr::Unit | TypeRepr::Bool => (1, 1),
198        TypeRepr::Int | TypeRepr::Float => (8, 8),
199        TypeRepr::String
200        | TypeRepr::List { .. }
201        | TypeRepr::Schema { .. }
202        | TypeRepr::Option { .. }
203        | TypeRepr::Result { .. }
204        | TypeRepr::Enum { .. }
205        | TypeRepr::Closure { .. } => (4, 4),
206    }
207}
208
/// The variant a tag byte selects within an `Option` / `Result` / enum
/// type, as resolved by `variant_selected_payload`.
#[derive(Debug, Clone)]
struct SelectedVariant {
    /// Variant name: `"None"` / `"Some"` for `Option`, `"Ok"` / `"Err"` for
    /// `Result`, or the declared variant name for a custom enum.
    name: String,
    /// Payload description, or `None` when the variant carries no payload
    /// (`Option`'s `None`, or an enum variant without a payload schema).
    payload: Option<SelectedVariantPayload>,
}
214
/// Describes the payload carried by a [`SelectedVariant`].
#[derive(Debug, Clone)]
struct SelectedVariantPayload {
    /// Declared type of the payload. For custom enum variants this is a
    /// `TypeRepr::Schema` wrapping the variant's payload schema.
    ty: TypeRepr,
    /// `Some(key)` for scalar-like single-field wrappers (`Option.Some`,
    /// `Result.Ok`, `Result.Err`). `None` means the payload type is a schema
    /// whose decoded field map is used directly for a custom enum variant.
    /// The keys produced in this file are `"value"` (`Some` / `Ok`) and
    /// `"error"` (`Err`).
    key: Option<&'static str>,
}
223
224fn variant_payload_types(ty: &TypeRepr) -> Vec<TypeRepr> {
225    match ty {
226        TypeRepr::Option { inner } => vec![inner.as_ref().clone()],
227        TypeRepr::Result { ok, err } => vec![ok.as_ref().clone(), err.as_ref().clone()],
228        TypeRepr::Enum { name, variants } => variants
229            .iter()
230            .filter_map(|variant| {
231                variant.payload_schema(name).map(|schema| TypeRepr::Schema {
232                    schema: Box::new(schema),
233                })
234            })
235            .collect(),
236        _ => Vec::new(),
237    }
238}
239
240fn variant_record_align_runtime(ty: &TypeRepr) -> usize {
241    let mut align = 4usize;
242    for payload in variant_payload_types(ty) {
243        let (_, slot_align) = payload_slot_layout_runtime(&payload);
244        align = align.max(slot_align).max(type_graph_align(&payload));
245    }
246    align
247}
248
249fn variant_payload_slot_offset(record_start: usize, payload_ty: &TypeRepr) -> Option<usize> {
250    let (_, align) = payload_slot_layout_runtime(payload_ty);
251    let raw = record_start.checked_add(1)?;
252    if align <= 1 {
253        Some(raw)
254    } else {
255        raw.checked_next_multiple_of(align)
256    }
257}
258
259fn variant_selected_payload(ty: &TypeRepr, tag: u8) -> Result<SelectedVariant, &'static str> {
260    match ty {
261        TypeRepr::Option { inner } => match tag {
262            0 => Ok(SelectedVariant {
263                name: "None".to_string(),
264                payload: None,
265            }),
266            1 => Ok(SelectedVariant {
267                name: "Some".to_string(),
268                payload: Some(SelectedVariantPayload {
269                    ty: inner.as_ref().clone(),
270                    key: Some("value"),
271                }),
272            }),
273            _ => Err("invalid Option tag"),
274        },
275        TypeRepr::Result { ok, err } => match tag {
276            0 => Ok(SelectedVariant {
277                name: "Ok".to_string(),
278                payload: Some(SelectedVariantPayload {
279                    ty: ok.as_ref().clone(),
280                    key: Some("value"),
281                }),
282            }),
283            1 => Ok(SelectedVariant {
284                name: "Err".to_string(),
285                payload: Some(SelectedVariantPayload {
286                    ty: err.as_ref().clone(),
287                    key: Some("error"),
288                }),
289            }),
290            _ => Err("invalid Result tag"),
291        },
292        TypeRepr::Enum { name, variants } => {
293            let variant = variants
294                .iter()
295                .find(|variant| variant.tag == tag)
296                .ok_or("invalid enum tag")?;
297            Ok(SelectedVariant {
298                name: variant.name.clone(),
299                payload: variant
300                    .payload_schema(name)
301                    .map(|schema| SelectedVariantPayload {
302                        ty: TypeRepr::Schema {
303                            schema: Box::new(schema),
304                        },
305                        key: None,
306                    }),
307            })
308        }
309        _ => Err("expected variant record"),
310    }
311}
312
313fn list_payload_is_pointer_array(element: &TypeRepr) -> bool {
314    matches!(
315        element,
316        TypeRepr::String
317            | TypeRepr::Schema { .. }
318            | TypeRepr::List { .. }
319            | TypeRepr::Option { .. }
320            | TypeRepr::Result { .. }
321            | TypeRepr::Enum { .. }
322    )
323}
324
/// Pre-computed relocation table for a [`BufferBuilder`]'s schema.
///
/// `relocate_pointers` walks pointer-indirect slots when a child buffer
/// is pasted into a parent (see [`BufferBuilder::finish_sub_record`] and
/// [`ListRecordWriter::finish_entry`]). Each nested `Schema` / `List<Schema>`
/// slot needs its own [`OffsetTable`] to recurse through; computing it
/// via [`SchemaLayout::offsets_for`] on every relocation made the
/// `List<Schema>` / `Dict<_, Schema>` paths quadratic in nesting depth
/// and quadratic in entry count. The cache shifts that work to a single
/// up-front walk at [`BufferBuilder::new`] / [`ListRecordWriter`]
/// construction.
///
/// Layout: `slots` holds one entry per `PointerIndirect` field in
/// [`OffsetTable::fields`], in offset-table order. A slot's
/// [`RelocSlot::nested`] is `Some` only when the field's declared type is
/// `Schema { .. }` or `List { element: Schema { .. } }` *and*
/// [`SchemaLayout::offsets_for`] succeeds for that schema; it then points
/// at the `RelocLayout` of the inner record.
///
/// The layouts are handed around as `Arc` so holders can share one
/// allocation by cloning the handle.
/// NOTE(review): the builder constructs a fresh layout for each slot it
/// visits; it does not de-duplicate identical schemas that appear at
/// different sites — confirm before relying on cross-field sharing.
#[derive(Debug)]
pub(crate) struct RelocLayout {
    /// One entry per pointer-indirect field of the schema layer.
    slots: Vec<RelocSlot>,
}
349
350impl RelocLayout {
351    /// Build the relocation cache for one schema layer.
352    ///
353    /// Walks `layout.fields` and for every `PointerIndirect` slot whose
354    /// declared type is `Schema` / `List<Schema>`, recursively computes
355    /// the nested layout via [`SchemaLayout::offsets_for`] **once**.
356    /// Subsequent relocations reuse the cached layouts via the returned
357    /// `Arc`. Inline slots (`Int`, `Bool`, ...) are skipped — they're
358    /// never touched by `relocate_pointers`.
359    fn build(layout: &OffsetTable, fields: &[Field]) -> Arc<Self> {
360        let mut slots: Vec<RelocSlot> = Vec::new();
361        for fo in &layout.fields {
362            if !matches!(fo.kind, FieldKind::PointerIndirect { .. }) {
363                continue;
364            }
365            let declared = fields.iter().find(|f| f.name == fo.name);
366            let nested = declared.and_then(|f| match &f.ty {
367                TypeRepr::Schema { schema } => SchemaLayout::offsets_for(schema)
368                    .ok()
369                    .map(|sub| RelocLayout::build(&sub, &schema.fields)),
370                TypeRepr::List { element } => match element.as_ref() {
371                    TypeRepr::Schema { schema } => SchemaLayout::offsets_for(schema)
372                        .ok()
373                        .map(|sub| RelocLayout::build(&sub, &schema.fields)),
374                    _ => None,
375                },
376                _ => None,
377            });
378            // For a list field, pre-compute the recursive element
379            // descriptor so the relocation walker can rebase pointers one
380            // (or more) level deeper than `nested` alone reaches — the
381            // `List<List<String|Schema>>` inner pointer arrays.
382            let list_elem = declared.and_then(|f| match &f.ty {
383                TypeRepr::List { element } => ptr_array_elem_for(element),
384                _ => None,
385            });
386            let variant = declared.and_then(|f| match &f.ty {
387                TypeRepr::Option { .. } | TypeRepr::Result { .. } | TypeRepr::Enum { .. } => {
388                    Some(f.ty.clone())
389                }
390                _ => None,
391            });
392            slots.push(RelocSlot {
393                offset: fo.offset,
394                list_element: fo.list_element,
395                nested,
396                list_elem,
397                variant,
398            });
399        }
400        Arc::new(Self { slots })
401    }
402}
403
/// Failure modes when writing / reading typed fields against a buffer.
#[derive(Debug, Error, Clone, PartialEq, Eq)]
pub enum BufferError {
    /// The requested field name does not appear in the offset table.
    /// Indicates a schema-drift bug or a typo at the call site.
    #[error("unknown field `{name}`")]
    UnknownField {
        /// Field name passed to the writer / reader.
        name: String,
    },
    /// The requested write / read type does not match the field's
    /// declared type. Writing an `i64` to a `Bool` slot would corrupt
    /// the layout for adjacent fields, so this surfaces as a hard
    /// error rather than a silent reinterpret.
    ///
    /// The tail-area writers (`write_string`, `write_list_*`) also return
    /// this variant — with `declared` equal to `requested` — when the
    /// field's type matches but its offset-table kind is not
    /// `PointerIndirect`, i.e. the layout itself is inconsistent.
    #[error(
        "type mismatch for field `{name}`: schema declares {declared}, caller used {requested}"
    )]
    TypeMismatch {
        /// Field name being accessed.
        name: String,
        /// Type the schema declared for this field.
        declared: &'static str,
        /// Type the caller's writer / reader assumed.
        requested: &'static str,
    },
    /// The buffer is shorter than the offset table's `root_size`.
    /// Always indicates the buffer was truncated mid-flight (e.g. an
    /// `out_buf` the wasm module didn't fully fill).
    #[error("buffer too small: have {have} bytes, layout requires {need}")]
    BufferTooSmall {
        /// Actual byte length of the buffer.
        have: usize,
        /// Required length per the offset table.
        need: usize,
    },
    /// A pointer-indirect payload (String / `List<Int>`) is larger than
    /// the `u32` length prefix can describe. Phase 2.c caps each
    /// payload at `u32::MAX` bytes / elements; longer values surface
    /// here rather than overflow silently.
    ///
    /// The tail-area writers also return this variant when the byte
    /// offset of a freshly appended tail record does not fit the `u32`
    /// pointer slot; `len` then carries that offset instead of a length.
    #[error("payload for field `{name}` is too large: {len} exceeds u32::MAX")]
    ValueTooLarge {
        /// Field name carrying the oversized payload.
        name: String,
        /// Requested length (bytes for String, elements for `List<Int>`),
        /// or the tail-record offset in the pointer-overflow case.
        len: usize,
    },
    /// A pointer-indirect read tripped over a malformed tail-area
    /// payload — the length prefix points beyond the buffer end, the
    /// pointer is null when the schema expects a value, or the
    /// utf-8 bytes inside a `String` payload are invalid.
    #[error("malformed payload for field `{name}`: {reason}")]
    MalformedPayload {
        /// Field name being read.
        name: String,
        /// Why the payload couldn't be decoded.
        reason: &'static str,
    },
}
462
/// Type-checked writer over a record buffer with an optional tail
/// area.
///
/// Phase 2.c shape:
///
/// * The fixed area is pre-allocated to `layout.root_size` so every
///   inline scalar slot is well-defined zero bytes per the spec.
/// * String / `List<Int>` writes append a `[len: u32 LE][payload]`
///   record after the fixed area and back-patch the pointer slot in
///   the fixed area with the tail-record's byte offset (relative to
///   the buffer start — the wasm side adds `in_ptr` to it).
///
/// Lifetime tie-in: the builder borrows the offset table so the same
/// layout description can be reused for the matching reader without
/// reparsing.
#[derive(Debug)]
pub struct BufferBuilder<'a> {
    /// Borrowed offset table describing the record's fixed area.
    layout: &'a OffsetTable,
    /// One [`FieldEntry`] per offset-table field that has a schema field
    /// of the same name; used by the writers to look up slots and types.
    field_index: Vec<FieldEntry>,
    /// The buffer under construction: starts as `layout.root_size` zero
    /// bytes, and the tail-area writers append records to it.
    bytes: Vec<u8>,
    /// Pre-computed relocation cache for `relocate_pointers`. Built once
    /// at `new` time so subsequent pastes into a parent buffer (via
    /// `finish_sub_record` / `finish_entry`) never re-walk the nested
    /// schemas. Shared via `Arc` so a `ListRecordWriter`'s entry builders
    /// hand the same cache back to the parent without re-deriving it.
    reloc_layout: Arc<RelocLayout>,
}
490
491impl<'a> BufferBuilder<'a> {
492    /// Build a writer for `layout` with the byte buffer pre-zeroed to
493    /// `layout.root_size`.
494    ///
495    /// `fields` carries the schema-level type info the layout pass
496    /// already validated; we keep a side index so the writer / reader
497    /// can detect a type mismatch without re-walking the schema.
498    pub fn new(layout: &'a OffsetTable, fields: &[Field]) -> Self {
499        let bytes = vec![0u8; layout.root_size];
500        let field_index = layout
501            .fields
502            .iter()
503            .filter_map(|fo| {
504                fields
505                    .iter()
506                    .find(|f| f.name == fo.name)
507                    .map(|f| FieldEntry {
508                        name: fo.name.clone(),
509                        ty: f.ty.clone(),
510                        offset: fo.offset,
511                        size: fo.size,
512                        kind: fo.kind,
513                        list_element: fo.list_element,
514                    })
515            })
516            .collect();
517        let reloc_layout = RelocLayout::build(layout, fields);
518        Self {
519            layout,
520            field_index,
521            bytes,
522            reloc_layout,
523        }
524    }
525
526    /// Write a 64-bit signed integer to `field_name`.
527    pub fn write_int(&mut self, field_name: &str, value: i64) -> Result<(), BufferError> {
528        let (offset, _, _) = self.locate(field_name, &TypeRepr::Int, "Int")?;
529        self.bytes[offset..offset + 8].copy_from_slice(&value.to_le_bytes());
530        Ok(())
531    }
532
533    /// Write a 64-bit float to `field_name`.
534    pub fn write_float(&mut self, field_name: &str, value: f64) -> Result<(), BufferError> {
535        let (offset, _, _) = self.locate(field_name, &TypeRepr::Float, "Float")?;
536        self.bytes[offset..offset + 8].copy_from_slice(&value.to_le_bytes());
537        Ok(())
538    }
539
540    /// Write a boolean to `field_name`. Encoded as `0u8` / `1u8`.
541    pub fn write_bool(&mut self, field_name: &str, value: bool) -> Result<(), BufferError> {
542        let (offset, _, _) = self.locate(field_name, &TypeRepr::Bool, "Bool")?;
543        self.bytes[offset] = u8::from(value);
544        Ok(())
545    }
546
547    /// Mark `field_name` as an internal unit slot. The slot is already zeroed by
548    /// `new`, so this is a no-op beyond the type check — useful to
549    /// surface a `TypeMismatch` early when the host accidentally
550    /// writes a unit marker to a non-unit slot.
551    pub fn write_unit(&mut self, field_name: &str) -> Result<(), BufferError> {
552        let (_, _, _) = self.locate(field_name, &TypeRepr::Unit, "Unit")?;
553        Ok(())
554    }
555
556    /// Write any supported value shape into `field_name` using the declared
557    /// canonical type. This is the generic marshalling entry point used by
558    /// nested schemas, tuples, and the compiled backends for types that do not
559    /// have a dedicated `write_*` convenience method.
560    pub fn write_value(
561        &mut self,
562        field_name: &str,
563        ty: &TypeRepr,
564        value: &crate::value::Value,
565    ) -> Result<(), BufferError> {
566        let entry = self.find_entry(field_name)?.clone();
567        if !type_matches(&entry.ty, ty) {
568            return Err(BufferError::TypeMismatch {
569                name: field_name.to_string(),
570                declared: type_label(&entry.ty),
571                requested: type_label(ty),
572            });
573        }
574        self.write_value_slot(field_name, entry.offset, ty, value)
575    }
576
577    /// Write a UTF-8 string into `field_name`'s tail-area record.
578    ///
579    /// Appends `[len: u32 LE][bytes]` after the current buffer end,
580    /// padding the cursor up to 4 bytes first so the length prefix is
581    /// naturally aligned. The pointer slot in the fixed area is
582    /// back-patched with the byte offset of the length prefix
583    /// (relative to the buffer base — the wasm side adds `in_ptr` to
584    /// reach absolute memory).
585    pub fn write_string(&mut self, field_name: &str, value: &str) -> Result<(), BufferError> {
586        let (slot_offset, _, kind) = self.locate(field_name, &TypeRepr::String, "String")?;
587        if !matches!(kind, FieldKind::PointerIndirect { .. }) {
588            // Defensive: the layout pass guarantees this, but we keep
589            // the check so a hand-built `OffsetTable` can't sneak an
590            // inline String through and corrupt adjacent slots.
591            return Err(BufferError::TypeMismatch {
592                name: field_name.to_string(),
593                declared: "String",
594                requested: "String",
595            });
596        }
597        let len = value.len();
598        if len > u32::MAX as usize {
599            return Err(BufferError::ValueTooLarge {
600                name: field_name.to_string(),
601                len,
602            });
603        }
604        let payload_offset = self.append_tail_record(4, len, value.as_bytes());
605        let ptr = u32::try_from(payload_offset).map_err(|_| BufferError::ValueTooLarge {
606            name: field_name.to_string(),
607            len: payload_offset,
608        })?;
609        self.bytes[slot_offset..slot_offset + 4].copy_from_slice(&ptr.to_le_bytes());
610        Ok(())
611    }
612
613    /// Write a `List<Int>` into `field_name`'s tail-area record.
614    ///
615    /// Tail layout: `[len: u32 LE][i64 LE x len]`. The length prefix
616    /// is padded up to 8 bytes after itself so the i64 elements sit
617    /// on an 8-byte boundary the way the wasm side will eventually
618    /// expect (Phase 2.c keeps the elements untouched, but later
619    /// phases reading them need the alignment to be honest).
620    pub fn write_list_int(&mut self, field_name: &str, values: &[i64]) -> Result<(), BufferError> {
621        let (slot_offset, _, kind) = self.locate(
622            field_name,
623            &TypeRepr::List {
624                element: Box::new(TypeRepr::Int),
625            },
626            "List<Int>",
627        )?;
628        let FieldKind::PointerIndirect { tail_alignment } = kind else {
629            return Err(BufferError::TypeMismatch {
630                name: field_name.to_string(),
631                declared: "List<Int>",
632                requested: "List<Int>",
633            });
634        };
635        if values.len() > u32::MAX as usize {
636            return Err(BufferError::ValueTooLarge {
637                name: field_name.to_string(),
638                len: values.len(),
639            });
640        }
641        // Materialise elements into a `Vec<u8>` so `append_tail_record`
642        // can copy them in a single slice — avoids reborrowing the
643        // builder mid-write.
644        let mut payload = Vec::with_capacity(values.len() * 8);
645        for v in values {
646            payload.extend_from_slice(&v.to_le_bytes());
647        }
648        let payload_offset =
649            self.append_tail_record_with_inner_alignment(4, tail_alignment, values.len(), &payload);
650        let ptr = u32::try_from(payload_offset).map_err(|_| BufferError::ValueTooLarge {
651            name: field_name.to_string(),
652            len: payload_offset,
653        })?;
654        self.bytes[slot_offset..slot_offset + 4].copy_from_slice(&ptr.to_le_bytes());
655        Ok(())
656    }
657
658    /// Write a `List<Float>` into `field_name`'s tail-area record.
659    ///
660    /// Phase 10-c: tail layout mirrors `List<Int>` — `[len: u32 LE]
661    /// [pad to 8][f64 LE x len]`. The post-len pad keeps the f64
662    /// payload on an 8-byte boundary so the wasm side can issue
663    /// `f64.load align=3` against the element stream.
664    pub fn write_list_float(
665        &mut self,
666        field_name: &str,
667        values: &[f64],
668    ) -> Result<(), BufferError> {
669        let (slot_offset, kind, _elem) =
670            self.locate_list(field_name, &TypeRepr::Float, "List<Float>")?;
671        let FieldKind::PointerIndirect { tail_alignment } = kind else {
672            return Err(BufferError::TypeMismatch {
673                name: field_name.to_string(),
674                declared: "List<Float>",
675                requested: "List<Float>",
676            });
677        };
678        if values.len() > u32::MAX as usize {
679            return Err(BufferError::ValueTooLarge {
680                name: field_name.to_string(),
681                len: values.len(),
682            });
683        }
684        let mut payload = Vec::with_capacity(values.len() * 8);
685        for v in values {
686            payload.extend_from_slice(&v.to_le_bytes());
687        }
688        let payload_offset =
689            self.append_tail_record_with_inner_alignment(4, tail_alignment, values.len(), &payload);
690        let ptr = u32::try_from(payload_offset).map_err(|_| BufferError::ValueTooLarge {
691            name: field_name.to_string(),
692            len: payload_offset,
693        })?;
694        self.bytes[slot_offset..slot_offset + 4].copy_from_slice(&ptr.to_le_bytes());
695        Ok(())
696    }
697
698    /// Write a `List<Bool>` into `field_name`'s tail-area record.
699    ///
700    /// Phase 10-c: tail layout `[len: u32 LE][u8 x len]` — booleans
701    /// pack tightly with no inter-element padding per spec. The
702    /// record start is 4-byte aligned so the len prefix loads cleanly;
703    /// each element is one byte (`0` for false, `1` for true).
704    pub fn write_list_bool(
705        &mut self,
706        field_name: &str,
707        values: &[bool],
708    ) -> Result<(), BufferError> {
709        let (slot_offset, kind, _elem) =
710            self.locate_list(field_name, &TypeRepr::Bool, "List<Bool>")?;
711        if !matches!(kind, FieldKind::PointerIndirect { .. }) {
712            return Err(BufferError::TypeMismatch {
713                name: field_name.to_string(),
714                declared: "List<Bool>",
715                requested: "List<Bool>",
716            });
717        }
718        if values.len() > u32::MAX as usize {
719            return Err(BufferError::ValueTooLarge {
720                name: field_name.to_string(),
721                len: values.len(),
722            });
723        }
724        let payload: Vec<u8> = values.iter().map(|&b| u8::from(b)).collect();
725        let payload_offset = self.append_tail_record(4, values.len(), &payload);
726        let ptr = u32::try_from(payload_offset).map_err(|_| BufferError::ValueTooLarge {
727            name: field_name.to_string(),
728            len: payload_offset,
729        })?;
730        self.bytes[slot_offset..slot_offset + 4].copy_from_slice(&ptr.to_le_bytes());
731        Ok(())
732    }
733
734    /// Write a `List<String>` into `field_name`'s tail-area record.
735    ///
736    /// Phase 10-c: header `[len: u32 LE][off_0: u32 LE]...[off_(n-1)]`
737    /// followed by per-string `[len: u32 LE][utf8 bytes]` tail records.
738    /// Each `off_i` is the buffer-relative byte offset of the matching
739    /// String's len prefix; the writer pads each String header to a
740    /// 4-byte boundary so the reader can dereference without an
741    /// unaligned load.
742    pub fn write_list_string<S: AsRef<str>>(
743        &mut self,
744        field_name: &str,
745        values: &[S],
746    ) -> Result<(), BufferError> {
747        let (slot_offset, kind, _elem) =
748            self.locate_list(field_name, &TypeRepr::String, "List<String>")?;
749        if !matches!(kind, FieldKind::PointerIndirect { .. }) {
750            return Err(BufferError::TypeMismatch {
751                name: field_name.to_string(),
752                declared: "List<String>",
753                requested: "List<String>",
754            });
755        }
756        let count = values.len();
757        if count > u32::MAX as usize {
758            return Err(BufferError::ValueTooLarge {
759                name: field_name.to_string(),
760                len: count,
761            });
762        }
763        // Allocate the header `[len][off_0]...[off_(n-1)]` first, with
764        // zeroed entries we'll back-patch as each String tail record
765        // lands. The header itself is 4-byte aligned.
766        self.pad_to(4);
767        let header_offset = self.bytes.len();
768        self.bytes.extend_from_slice(
769            &u32::try_from(count)
770                .expect("count already checked <= u32::MAX")
771                .to_le_bytes(),
772        );
773        let entries_start = self.bytes.len();
774        self.bytes.resize(entries_start + count * 4, 0);
775        // Append each String tail record, capturing its offset.
776        let mut offsets: Vec<u32> = Vec::with_capacity(count);
777        for (i, s) in values.iter().enumerate() {
778            let bytes = s.as_ref().as_bytes();
779            if bytes.len() > u32::MAX as usize {
780                return Err(BufferError::ValueTooLarge {
781                    name: format!("{field_name}[{i}]"),
782                    len: bytes.len(),
783                });
784            }
785            let entry_offset = self.append_tail_record(4, bytes.len(), bytes);
786            let entry_u32 =
787                u32::try_from(entry_offset).map_err(|_| BufferError::ValueTooLarge {
788                    name: field_name.to_string(),
789                    len: entry_offset,
790                })?;
791            offsets.push(entry_u32);
792        }
793        // Back-patch the pointer-array entries with the actual offsets.
794        for (i, off) in offsets.iter().enumerate() {
795            let dst = entries_start + i * 4;
796            self.bytes[dst..dst + 4].copy_from_slice(&off.to_le_bytes());
797        }
798        // Back-patch the field's pointer slot to the list-header
799        // offset (the `len` prefix).
800        let ptr = u32::try_from(header_offset).map_err(|_| BufferError::ValueTooLarge {
801            name: field_name.to_string(),
802            len: header_offset,
803        })?;
804        self.bytes[slot_offset..slot_offset + 4].copy_from_slice(&ptr.to_le_bytes());
805        Ok(())
806    }
807
808    /// Write a nested `List<List<inner>>` into `field_name`'s tail
809    /// area, where `inner` is an inline-fixed scalar element
810    /// (`Int` / `Float` / `Bool`).
811    ///
812    /// Layout mirrors `List<String>` / `List<Schema>`: a header
813    /// `[len: u32 LE][off_0]...[off_(n-1)]` whose `off_i` are
814    /// buffer-relative offsets to per-element inner list records. Each
815    /// inner record is the same `[len: u32 LE][payload]` shape
816    /// [`Self::write_list_int`] / `write_list_float` / `write_list_bool`
817    /// produce, so an inner-record reader decodes them bit-identically.
818    /// The inner records carry no pointer slots of their own, so no
819    /// per-element relocation beyond the header's `off_i` rebase is
820    /// needed when the buffer is later pasted into a parent.
821    ///
822    /// `encode_inner` serialises one element's payload bytes and returns
823    /// `(element_count, inner_alignment)`; the caller drives it once per
824    /// inner list. Returns the count actually written.
825    pub fn write_list_list_with<F>(
826        &mut self,
827        field_name: &str,
828        inner_count: usize,
829        mut encode_inner: F,
830    ) -> Result<(), BufferError>
831    where
832        F: FnMut(usize, &mut Vec<u8>) -> Result<(usize, usize), BufferError>,
833    {
834        let entry = self
835            .field_index
836            .iter()
837            .find(|e| e.name == field_name)
838            .ok_or_else(|| BufferError::UnknownField {
839                name: field_name.to_string(),
840            })?;
841        let is_nested_list = matches!(&entry.ty, TypeRepr::List { element }
842            if matches!(element.as_ref(), TypeRepr::List { .. }));
843        if !is_nested_list || !matches!(entry.kind, FieldKind::PointerIndirect { .. }) {
844            return Err(BufferError::TypeMismatch {
845                name: field_name.to_string(),
846                declared: type_label(&entry.ty),
847                requested: "List<List<…>>",
848            });
849        }
850        let slot_offset = entry.offset;
851        if inner_count > u32::MAX as usize {
852            return Err(BufferError::ValueTooLarge {
853                name: field_name.to_string(),
854                len: inner_count,
855            });
856        }
857        // Reserve the header `[len][off_0]...` first; back-patch each
858        // `off_i` after the matching inner record lands.
859        self.pad_to(4);
860        let header_offset = self.bytes.len();
861        self.bytes
862            .extend_from_slice(&u32::try_from(inner_count).unwrap().to_le_bytes());
863        let entries_start = self.bytes.len();
864        self.bytes.resize(entries_start + inner_count * 4, 0);
865        let mut offsets: Vec<u32> = Vec::with_capacity(inner_count);
866        for i in 0..inner_count {
867            let mut payload = Vec::new();
868            let (elem_count, inner_alignment) = encode_inner(i, &mut payload)?;
869            let rec_offset = self.append_tail_record_with_inner_alignment(
870                4,
871                inner_alignment.max(1),
872                elem_count,
873                &payload,
874            );
875            let rec_u32 = u32::try_from(rec_offset).map_err(|_| BufferError::ValueTooLarge {
876                name: format!("{field_name}[{i}]"),
877                len: rec_offset,
878            })?;
879            offsets.push(rec_u32);
880        }
881        for (i, off) in offsets.iter().enumerate() {
882            let dst = entries_start + i * 4;
883            self.bytes[dst..dst + 4].copy_from_slice(&off.to_le_bytes());
884        }
885        let ptr = u32::try_from(header_offset).map_err(|_| BufferError::ValueTooLarge {
886            name: field_name.to_string(),
887            len: header_offset,
888        })?;
889        self.bytes[slot_offset..slot_offset + 4].copy_from_slice(&ptr.to_le_bytes());
890        Ok(())
891    }
892
893    /// Start writing a `List<Schema>` element. Returns a list-record
894    /// writer the caller drives one entry at a time; the actual list
895    /// header pointer is patched into the parent slot when
896    /// [`Self::finish_list_record`] is called.
897    ///
898    /// Phase 10-c: each element is a branded sub-record (the inner
899    /// `TypeRepr::Schema { schema }`) whose fixed area lives in the
900    /// parent buffer's tail area, addressed by a `u32` entry in the
901    /// pointer array. The parent's pointer slot in turn holds the
902    /// buffer-relative offset of the list header (`[len: u32][off_0]
903    /// ...`).
904    ///
905    /// Workflow:
906    ///
907    /// ```ignore
908    /// let mut lw = parent.list_record(&field_name, &elem_layout, &elem_schema.fields)?;
909    /// for entry in entries {
910    ///     let mut child = lw.start_entry(&parent_builder)?;
911    ///     // ... write_int / write_string into `child` ...
912    ///     lw.finish_entry(&mut parent_builder, child)?;
913    /// }
914    /// parent.finish_list_record(&field_name, lw)?;
915    /// ```
916    ///
917    /// The split workflow keeps the parent buffer mutable for the
918    /// per-entry tail copy without aliasing the child borrow against
919    /// the parent's `field_index`. Hosts that don't need the full
920    /// step-by-step control can use the [`Self::write_list_record`]
921    /// convenience wrapper which takes a slice of pre-built dicts.
922    pub fn list_record_writer<'b>(
923        &self,
924        field_name: &str,
925        elem_layout: &'b OffsetTable,
926        elem_schema: &'b Schema,
927    ) -> Result<ListRecordWriter<'b>, BufferError> {
928        let entry = self
929            .field_index
930            .iter()
931            .find(|e| e.name == field_name)
932            .ok_or_else(|| BufferError::UnknownField {
933                name: field_name.to_string(),
934            })?;
935        match &entry.ty {
936            TypeRepr::List { element } => match element.as_ref() {
937                TypeRepr::Schema { schema } => {
938                    if schema.as_ref() != elem_schema {
939                        return Err(BufferError::TypeMismatch {
940                            name: field_name.to_string(),
941                            declared: type_label(&entry.ty),
942                            requested: "List<Schema>",
943                        });
944                    }
945                }
946                _ => {
947                    return Err(BufferError::TypeMismatch {
948                        name: field_name.to_string(),
949                        declared: type_label(&entry.ty),
950                        requested: "List<Schema>",
951                    });
952                }
953            },
954            other => {
955                return Err(BufferError::TypeMismatch {
956                    name: field_name.to_string(),
957                    declared: type_label(other),
958                    requested: "List<Schema>",
959                });
960            }
961        };
962        if !matches!(entry.kind, FieldKind::PointerIndirect { .. }) {
963            return Err(BufferError::TypeMismatch {
964                name: field_name.to_string(),
965                declared: "List<Schema>",
966                requested: "List<Schema>",
967            });
968        }
969        Ok(ListRecordWriter {
970            field_name: field_name.to_string(),
971            slot_offset: entry.offset,
972            elem_layout,
973            elem_schema,
974            entry_offsets: Vec::new(),
975            // Paste each entry at the element schema's deep paste alignment
976            // (not just its fixed-area `root_align`) so an 8-aligned
977            // `List<Int|Float>` payload in the entry's tail lands where the
978            // reader's absolute alignment recovery expects it post-paste.
979            elem_align: schema_paste_align(elem_layout, &elem_schema.fields),
980        })
981    }
982
983    /// Convenience writer for `List<Schema>` that builds each entry
984    /// from a pre-prepared `Vec<(field_name, write_callback)>` shape.
985    /// Phase 10-c: tests use the longer-form [`Self::list_record_writer`]
986    /// for full control; this wrapper accepts a list of buffer-builder
987    /// "actions" so simple cases don't need to spell out the start /
988    /// finish dance.
989    pub fn write_list_record<'b, F>(
990        &mut self,
991        field_name: &str,
992        elem_layout: &'b OffsetTable,
993        elem_schema: &'b Schema,
994        entries: &[F],
995    ) -> Result<(), BufferError>
996    where
997        F: Fn(&mut BufferBuilder<'b>) -> Result<(), BufferError>,
998    {
999        let mut writer = self.list_record_writer(field_name, elem_layout, elem_schema)?;
1000        for action in entries {
1001            let mut child = writer.start_entry();
1002            action(&mut child)?;
1003            writer.finish_entry(self, child)?;
1004        }
1005        self.finish_list_record(writer)
1006    }
1007
1008    /// Commit a [`ListRecordWriter`] — emit the list header into the
1009    /// tail area (aligned to 4) and patch the field's pointer slot
1010    /// with the header offset.
1011    pub fn finish_list_record(&mut self, writer: ListRecordWriter<'_>) -> Result<(), BufferError> {
1012        let count = writer.entry_offsets.len();
1013        if count > u32::MAX as usize {
1014            return Err(BufferError::ValueTooLarge {
1015                name: writer.field_name,
1016                len: count,
1017            });
1018        }
1019        self.pad_to(4);
1020        let header_offset = self.bytes.len();
1021        self.bytes
1022            .extend_from_slice(&u32::try_from(count).unwrap().to_le_bytes());
1023        for off in &writer.entry_offsets {
1024            self.bytes.extend_from_slice(&off.to_le_bytes());
1025        }
1026        let ptr = u32::try_from(header_offset).map_err(|_| BufferError::ValueTooLarge {
1027            name: writer.field_name.clone(),
1028            len: header_offset,
1029        })?;
1030        let slot_offset = writer.slot_offset;
1031        self.bytes[slot_offset..slot_offset + 4].copy_from_slice(&ptr.to_le_bytes());
1032        Ok(())
1033    }
1034
1035    /// Consume the builder and return the underlying byte buffer.
1036    pub fn finish(self) -> Vec<u8> {
1037        self.bytes
1038    }
1039
1040    /// Consume the builder and return the byte buffer with every
1041    /// pointer slot rebased from **buffer-relative** to
1042    /// **arena-absolute** by adding `arena_base` (the absolute arena
1043    /// offset the buffer is about to be copied to — i.e. `in_ptr`).
1044    ///
1045    /// F1 unifies the in-buffer pointer convention on a single
1046    /// arena-absolute basis: the input marshaller knows `in_ptr` at this
1047    /// point, so it bakes it into every slot here once, and the machine
1048    /// code's param-read drops its old `+ in_ptr` rebase (the slot is
1049    /// already arena-absolute). The same recursive walk
1050    /// [`finish_sub_record`] uses to paste a child into a parent applies
1051    /// — a rebase by `arena_base` is structurally identical to a paste at
1052    /// `arena_base`, relocating every nested-schema and pointer-array
1053    /// entry too. `arena_base == 0` is a no-op (the slots are already
1054    /// correct), so a zero-const-data layout stays byte-identical.
1055    pub fn finish_arena_absolute(self, arena_base: u32) -> Result<Vec<u8>, BufferError> {
1056        let (mut bytes, reloc) = self.into_parts();
1057        if arena_base != 0 {
1058            relocate_pointers(&mut bytes, &reloc, 0, arena_base).map_err(|reason| {
1059                BufferError::MalformedPayload {
1060                    name: "<input marshal arena rebase>".to_string(),
1061                    reason,
1062                }
1063            })?;
1064        }
1065        Ok(bytes)
1066    }
1067
1068    /// Internal sibling of [`Self::finish`] that surrenders the byte
1069    /// buffer together with the pre-computed relocation cache.
1070    /// `finish_sub_record` and `ListRecordWriter::finish_entry` use this
1071    /// so the relocation walker can skip a fresh `OffsetTable` derivation
1072    /// per paste — the cache was already built at `new` time.
1073    pub(crate) fn into_parts(self) -> (Vec<u8>, Arc<RelocLayout>) {
1074        (self.bytes, self.reloc_layout)
1075    }
1076
1077    /// Allocate a nested branded sub-record under `field_name` and
1078    /// return a detached child [`BufferBuilder`] sized to the sub
1079    /// schema's fixed area.
1080    ///
1081    /// Phase 9.b-1: mirrors [`BufferReader::sub_record`] on the writer
1082    /// side so a host can pack Schema-typed `#main` args without
1083    /// reaching for hand-rolled byte arithmetic. The returned builder
1084    /// is *detached* — it owns its own `Vec<u8>` pre-sized to
1085    /// `sub_layout.root_size`. The parent's pointer slot stays zero
1086    /// until the caller hands the child back via
1087    /// [`Self::finish_sub_record`], which appends the child's bytes to
1088    /// the parent's tail area (aligning to `sub_layout.root_align`) and
1089    /// back-patches the slot.
1090    ///
1091    /// Detached children keep the writer simple: they don't borrow the
1092    /// parent (so multiple sibling sub-records can be authored
1093    /// independently), and the parent has a single commit step that
1094    /// also enforces the field-name → pointer-slot binding the layout
1095    /// pass guarantees.
1096    pub fn sub_record<'b>(
1097        &mut self,
1098        field_name: &str,
1099        sub_layout: &'b OffsetTable,
1100        sub_fields: &[Field],
1101    ) -> Result<BufferBuilder<'b>, BufferError> {
1102        // Validate the parent slot is a Schema-typed pointer-indirect
1103        // entry. Anything else would corrupt adjacent fields once we
1104        // back-patched the (wrong) slot.
1105        let entry = self
1106            .field_index
1107            .iter()
1108            .find(|e| e.name == field_name)
1109            .ok_or_else(|| BufferError::UnknownField {
1110                name: field_name.to_string(),
1111            })?;
1112        if !matches!(entry.ty, TypeRepr::Schema { .. }) {
1113            return Err(BufferError::TypeMismatch {
1114                name: field_name.to_string(),
1115                declared: type_label(&entry.ty),
1116                requested: "Schema",
1117            });
1118        }
1119        if !matches!(entry.kind, FieldKind::PointerIndirect { .. }) {
1120            return Err(BufferError::TypeMismatch {
1121                name: field_name.to_string(),
1122                declared: "Schema",
1123                requested: "Schema",
1124            });
1125        }
1126        Ok(BufferBuilder::new(sub_layout, sub_fields))
1127    }
1128
1129    /// Commit a detached sub-record produced by [`Self::sub_record`].
1130    ///
1131    /// Appends the child's byte buffer to the parent's tail area
1132    /// (padded up to the sub schema's root alignment) and writes the
1133    /// resulting buffer-relative offset into the parent's pointer slot
1134    /// for `field_name`. The child is consumed.
1135    ///
1136    /// Pointer relocation: the child built its `String` / `List<Int>` /
1137    /// nested-`Schema` slots with offsets relative to the **child's**
1138    /// buffer base (0). Once the child is pasted into the parent at
1139    /// `sub_base`, every such pointer slot needs `+ sub_base` to become
1140    /// parent-relative again — otherwise the wasm side / reader walks
1141    /// the wrong bytes. We walk the child's field layout recursively
1142    /// and rewrite each u32 pointer in place before appending.
1143    ///
1144    /// Errors mirror the parent's other writers: an unknown field name
1145    /// or a type-shape mismatch surfaces before any bytes are moved.
1146    /// An oversized child (offset doesn't fit in `u32`) surfaces as
1147    /// [`BufferError::ValueTooLarge`].
1148    pub fn finish_sub_record(
1149        &mut self,
1150        field_name: &str,
1151        child: BufferBuilder<'_>,
1152    ) -> Result<(), BufferError> {
1153        let entry = self
1154            .field_index
1155            .iter()
1156            .find(|e| e.name == field_name)
1157            .ok_or_else(|| BufferError::UnknownField {
1158                name: field_name.to_string(),
1159            })?;
1160        if !matches!(entry.ty, TypeRepr::Schema { .. }) {
1161            return Err(BufferError::TypeMismatch {
1162                name: field_name.to_string(),
1163                declared: type_label(&entry.ty),
1164                requested: "Schema",
1165            });
1166        }
1167        let FieldKind::PointerIndirect { tail_alignment } = entry.kind else {
1168            return Err(BufferError::TypeMismatch {
1169                name: field_name.to_string(),
1170                declared: "Schema",
1171                requested: "Schema",
1172            });
1173        };
1174        let slot_offset = entry.offset;
1175        // Paste at the child's deep paste alignment so an 8-aligned
1176        // `List<Int|Float>` (or a nested field transitively carrying one)
1177        // in the child's tail lands at an absolute offset where the
1178        // reader's alignment recovery is correct after relocation — not
1179        // just the child's fixed-area `root_align`.
1180        let child_tail_align = child
1181            .field_index
1182            .iter()
1183            .map(|fe| type_graph_align(&fe.ty))
1184            .max()
1185            .unwrap_or(1);
1186        let child_align = child
1187            .layout
1188            .root_align
1189            .max(tail_alignment)
1190            .max(child_tail_align)
1191            .max(1);
1192        let (child_bytes, child_reloc) = child.into_parts();
1193        let mut child_bytes = child_bytes;
1194        self.pad_to(child_align);
1195        let sub_base = self.bytes.len();
1196        let ptr = u32::try_from(sub_base).map_err(|_| BufferError::ValueTooLarge {
1197            name: field_name.to_string(),
1198            len: sub_base,
1199        })?;
1200        // Rebase every pointer slot inside the child so it's
1201        // parent-relative once we paste the child bytes.
1202        relocate_pointers(&mut child_bytes, &child_reloc, 0, ptr).map_err(|reason| {
1203            BufferError::MalformedPayload {
1204                name: field_name.to_string(),
1205                reason,
1206            }
1207        })?;
1208        self.bytes.extend_from_slice(&child_bytes);
1209        self.bytes[slot_offset..slot_offset + 4].copy_from_slice(&ptr.to_le_bytes());
1210        Ok(())
1211    }
1212
1213    fn find_entry(&self, field_name: &str) -> Result<&FieldEntry, BufferError> {
1214        self.field_index
1215            .iter()
1216            .find(|e| e.name == field_name)
1217            .ok_or_else(|| BufferError::UnknownField {
1218                name: field_name.to_string(),
1219            })
1220    }
1221
1222    fn locate(
1223        &self,
1224        field_name: &str,
1225        expected: &TypeRepr,
1226        requested_label: &'static str,
1227    ) -> Result<(usize, usize, FieldKind), BufferError> {
1228        let entry = self
1229            .field_index
1230            .iter()
1231            .find(|e| e.name == field_name)
1232            .ok_or_else(|| BufferError::UnknownField {
1233                name: field_name.to_string(),
1234            })?;
1235        if !type_matches(&entry.ty, expected) {
1236            return Err(BufferError::TypeMismatch {
1237                name: field_name.to_string(),
1238                declared: type_label(&entry.ty),
1239                requested: requested_label,
1240            });
1241        }
1242        Ok((entry.offset, entry.size, entry.kind))
1243    }
1244
1245    /// Locate a list-typed field by name, validating that the
1246    /// declared element type matches `expected_element`. Returns the
1247    /// pointer slot offset, the `FieldKind` for sanity-checking the
1248    /// pointer-indirect shape, and the [`ListElementKind`] sidecar.
1249    fn locate_list(
1250        &self,
1251        field_name: &str,
1252        expected_element: &TypeRepr,
1253        requested_label: &'static str,
1254    ) -> Result<(usize, FieldKind, ListElementKind), BufferError> {
1255        let entry = self
1256            .field_index
1257            .iter()
1258            .find(|e| e.name == field_name)
1259            .ok_or_else(|| BufferError::UnknownField {
1260                name: field_name.to_string(),
1261            })?;
1262        let declared_elem = match &entry.ty {
1263            TypeRepr::List { element } => element.as_ref(),
1264            other => {
1265                return Err(BufferError::TypeMismatch {
1266                    name: field_name.to_string(),
1267                    declared: type_label(other),
1268                    requested: requested_label,
1269                });
1270            }
1271        };
1272        if !type_matches(declared_elem, expected_element) {
1273            return Err(BufferError::TypeMismatch {
1274                name: field_name.to_string(),
1275                declared: type_label(&entry.ty),
1276                requested: requested_label,
1277            });
1278        }
1279        let list_elem = entry
1280            .list_element
1281            .ok_or_else(|| BufferError::MalformedPayload {
1282                name: field_name.to_string(),
1283                reason: "list field missing element layout",
1284            })?;
1285        Ok((entry.offset, entry.kind, list_elem))
1286    }
1287
1288    /// Append a `[len: u32 LE][payload]` record. Pads the buffer up
1289    /// to `prefix_alignment` before the length prefix so a future
1290    /// reader can dereference the pointer without an unaligned load.
1291    ///
1292    /// Returns the byte offset of the **length prefix** — the value
1293    /// that gets back-patched into the fixed-area pointer slot.
1294    fn append_tail_record(&mut self, prefix_alignment: usize, len: usize, payload: &[u8]) -> usize {
1295        self.pad_to(prefix_alignment);
1296        let record_offset = self.bytes.len();
1297        self.bytes.extend_from_slice(&(len as u32).to_le_bytes());
1298        self.bytes.extend_from_slice(payload);
1299        record_offset
1300    }
1301
1302    /// Variant of [`Self::append_tail_record`] that pads between the
1303    /// length prefix and the payload so the payload starts at
1304    /// `inner_alignment`. `List<Int>` uses this so the i64 elements
1305    /// sit on an 8-byte boundary the wasm side can load aligned.
1306    fn append_tail_record_with_inner_alignment(
1307        &mut self,
1308        prefix_alignment: usize,
1309        inner_alignment: usize,
1310        len: usize,
1311        payload: &[u8],
1312    ) -> usize {
1313        self.pad_to(prefix_alignment);
1314        let record_offset = self.bytes.len();
1315        self.bytes.extend_from_slice(&(len as u32).to_le_bytes());
1316        if inner_alignment > 1 {
1317            self.pad_to(inner_alignment);
1318        }
1319        self.bytes.extend_from_slice(payload);
1320        record_offset
1321    }
1322
1323    fn write_value_slot(
1324        &mut self,
1325        name: &str,
1326        slot_offset: usize,
1327        ty: &TypeRepr,
1328        value: &crate::value::Value,
1329    ) -> Result<(), BufferError> {
1330        use crate::value::Value;
1331        match (ty, value) {
1332            (TypeRepr::Int, Value::Int(v)) => {
1333                self.bytes[slot_offset..slot_offset + 8].copy_from_slice(&v.to_le_bytes());
1334                Ok(())
1335            }
1336            (TypeRepr::Float, Value::Float(v)) => {
1337                self.bytes[slot_offset..slot_offset + 8]
1338                    .copy_from_slice(&v.into_inner().to_le_bytes());
1339                Ok(())
1340            }
1341            (TypeRepr::Float, Value::Int(v)) => {
1342                self.bytes[slot_offset..slot_offset + 8]
1343                    .copy_from_slice(&(*v as f64).to_le_bytes());
1344                Ok(())
1345            }
1346            (TypeRepr::Bool, Value::Bool(v)) => {
1347                self.bytes[slot_offset] = u8::from(*v);
1348                Ok(())
1349            }
1350            (TypeRepr::Unit, v) if v.is_option_none() => Ok(()),
1351            (TypeRepr::String, Value::String(s)) => {
1352                let off = self.append_tail_record(4, s.len(), s.as_str().as_bytes());
1353                self.write_pointer_slot(name, slot_offset, off)
1354            }
1355            (TypeRepr::List { element }, Value::List(items)) => {
1356                let off = self.append_list_payload(element, items)?;
1357                self.write_pointer_slot(name, slot_offset, off)
1358            }
1359            (TypeRepr::Schema { schema }, v) => {
1360                let off = self.append_schema_value_payload(name, schema, v)?;
1361                self.write_pointer_slot(name, slot_offset, off)
1362            }
1363            (TypeRepr::Option { .. } | TypeRepr::Result { .. } | TypeRepr::Enum { .. }, v) => {
1364                let off = self.append_variant_record(name, ty, v)?;
1365                self.write_pointer_slot(name, slot_offset, off)
1366            }
1367            (_, other) => Err(BufferError::TypeMismatch {
1368                name: name.to_string(),
1369                declared: type_label(ty),
1370                requested: other.type_name(),
1371            }),
1372        }
1373    }
1374
1375    fn write_pointer_slot(
1376        &mut self,
1377        name: &str,
1378        slot_offset: usize,
1379        target_offset: usize,
1380    ) -> Result<(), BufferError> {
1381        let ptr = u32::try_from(target_offset).map_err(|_| BufferError::ValueTooLarge {
1382            name: name.to_string(),
1383            len: target_offset,
1384        })?;
1385        self.bytes[slot_offset..slot_offset + 4].copy_from_slice(&ptr.to_le_bytes());
1386        Ok(())
1387    }
1388
1389    fn append_schema_value_payload(
1390        &mut self,
1391        name: &str,
1392        schema: &Schema,
1393        value: &crate::value::Value,
1394    ) -> Result<usize, BufferError> {
1395        use crate::value::Value;
1396        let sub_layout =
1397            SchemaLayout::offsets_for(schema).map_err(|_| BufferError::MalformedPayload {
1398                name: name.to_string(),
1399                reason: "nested schema field is not layoutable",
1400            })?;
1401        let mut child = BufferBuilder::new(&sub_layout, &schema.fields);
1402        match value {
1403            Value::Dict(dict) if !schema.is_tuple => {
1404                write_schema_record_into_builder(&mut child, schema, dict)?;
1405            }
1406            Value::Tuple(items) if schema.is_tuple => {
1407                write_tuple_record_into_builder(&mut child, schema, items)?;
1408            }
1409            other => {
1410                return Err(BufferError::TypeMismatch {
1411                    name: name.to_string(),
1412                    declared: if schema.is_tuple { "Tuple" } else { "Schema" },
1413                    requested: other.type_name(),
1414                })
1415            }
1416        }
1417        let align = schema_paste_align(&sub_layout, &schema.fields);
1418        self.append_child_record_payload(name, child, align)
1419    }
1420
1421    fn append_child_record_payload(
1422        &mut self,
1423        name: &str,
1424        child: BufferBuilder<'_>,
1425        requested_align: usize,
1426    ) -> Result<usize, BufferError> {
1427        let child_tail_align = child
1428            .field_index
1429            .iter()
1430            .map(|fe| type_graph_align(&fe.ty))
1431            .max()
1432            .unwrap_or(1);
1433        let child_align = child
1434            .layout
1435            .root_align
1436            .max(requested_align)
1437            .max(child_tail_align)
1438            .max(1);
1439        let (mut child_bytes, child_reloc) = child.into_parts();
1440        self.pad_to(child_align);
1441        let entry_offset = self.bytes.len();
1442        let ptr = u32::try_from(entry_offset).map_err(|_| BufferError::ValueTooLarge {
1443            name: name.to_string(),
1444            len: entry_offset,
1445        })?;
1446        relocate_pointers(&mut child_bytes, &child_reloc, 0, ptr).map_err(|reason| {
1447            BufferError::MalformedPayload {
1448                name: name.to_string(),
1449                reason,
1450            }
1451        })?;
1452        self.bytes.extend_from_slice(&child_bytes);
1453        Ok(entry_offset)
1454    }
1455
1456    fn append_variant_record(
1457        &mut self,
1458        name: &str,
1459        ty: &TypeRepr,
1460        value: &crate::value::Value,
1461    ) -> Result<usize, BufferError> {
1462        let (tag, payload) = variant_payload_for_value(name, ty, value)?;
1463        self.pad_to(variant_record_align_runtime(ty));
1464        let record_offset = self.bytes.len();
1465        self.bytes.push(tag);
1466        if let Some((payload_ty, payload_value)) = payload {
1467            let (slot_size, _) = payload_slot_layout_runtime(&payload_ty);
1468            let slot_offset =
1469                variant_payload_slot_offset(record_offset, &payload_ty).ok_or_else(|| {
1470                    BufferError::MalformedPayload {
1471                        name: name.to_string(),
1472                        reason: "variant payload slot offset overflows usize",
1473                    }
1474                })?;
1475            let slot_end = slot_offset.checked_add(slot_size).ok_or_else(|| {
1476                BufferError::MalformedPayload {
1477                    name: name.to_string(),
1478                    reason: "variant payload slot end overflows usize",
1479                }
1480            })?;
1481            if self.bytes.len() < slot_end {
1482                self.bytes.resize(slot_end, 0);
1483            }
1484            self.write_value_slot(name, slot_offset, &payload_ty, payload_value)?;
1485        }
1486        Ok(record_offset)
1487    }
1488
1489    /// Append a `List<element>` payload (a pointer-array `[len][off_i]…`
1490    /// header plus the per-element records it points at, or an inline
1491    /// `[len][payload]` record for inline-fixed scalar elements) at the
1492    /// current tail end, **without** wiring any fixed-area field slot, and
1493    /// return the buffer-relative offset of the header. The returned
1494    /// offset is the value a field / entry slot should hold to reach this
1495    /// list. Recurses for `List<List<…>>` so a doubly-nested pointer array
1496    /// is laid out bit-identically to the field-slot writers (the reader
1497    /// and verifier walk the same shape). Every emitted offset is
1498    /// child-buffer-relative, so the existing relocation walk rebases the
1499    /// whole graph when the buffer is later pasted / arena-rebased.
1500    ///
1501    /// This is the recursive input marshaller behind `List<List<String>>`
1502    /// / `List<List<Schema>>` params (F5): the layout pass admits those
1503    /// shapes and the relocation walker's `PtrArrayElem::InnerList`
1504    /// descriptor rebases the inner pointer arrays.
1505    fn append_list_payload(
1506        &mut self,
1507        element: &TypeRepr,
1508        items: &[crate::value::Value],
1509    ) -> Result<usize, BufferError> {
1510        use crate::value::Value;
1511        let count = items.len();
1512        if count > u32::MAX as usize {
1513            return Err(BufferError::ValueTooLarge {
1514                name: "<nested list>".to_string(),
1515                len: count,
1516            });
1517        }
1518        match element {
1519            // Inline-fixed scalar element: one self-contained
1520            // `[len][pad][payload]` record, byte-identical to
1521            // `write_list_int` / `write_list_float` / `write_list_bool`.
1522            TypeRepr::Int | TypeRepr::Float | TypeRepr::Bool => {
1523                let (inner_align, mut payload) = (
1524                    match element {
1525                        TypeRepr::Int | TypeRepr::Float => 8usize,
1526                        _ => 4usize,
1527                    },
1528                    Vec::<u8>::new(),
1529                );
1530                for (i, it) in items.iter().enumerate() {
1531                    match (element, it) {
1532                        (TypeRepr::Int, Value::Int(v)) => {
1533                            payload.extend_from_slice(&v.to_le_bytes())
1534                        }
1535                        (TypeRepr::Float, Value::Float(v)) => {
1536                            payload.extend_from_slice(&v.into_inner().to_le_bytes())
1537                        }
1538                        (TypeRepr::Float, Value::Int(v)) => {
1539                            payload.extend_from_slice(&(*v as f64).to_le_bytes())
1540                        }
1541                        (TypeRepr::Bool, Value::Bool(v)) => payload.push(u8::from(*v)),
1542                        (_, other) => {
1543                            return Err(BufferError::TypeMismatch {
1544                                name: format!("<nested list>[{i}]"),
1545                                declared: "List<scalar>",
1546                                requested: other.type_name(),
1547                            })
1548                        }
1549                    }
1550                }
1551                Ok(self.append_tail_record_with_inner_alignment(4, inner_align, count, &payload))
1552            }
1553            // `List<String>`: pointer-array header whose entries point at
1554            // `[len][utf8]` String records — identical to
1555            // `write_list_string`.
1556            TypeRepr::String => {
1557                self.pad_to(4);
1558                let header_offset = self.bytes.len();
1559                self.bytes.extend_from_slice(&(count as u32).to_le_bytes());
1560                let entries_start = self.bytes.len();
1561                self.bytes.resize(entries_start + count * 4, 0);
1562                let mut offsets: Vec<u32> = Vec::with_capacity(count);
1563                for (i, it) in items.iter().enumerate() {
1564                    let Value::String(s) = it else {
1565                        return Err(BufferError::TypeMismatch {
1566                            name: format!("<nested list>[{i}]"),
1567                            declared: "List<String>",
1568                            requested: it.type_name(),
1569                        });
1570                    };
1571                    let bytes = s.as_str().as_bytes();
1572                    if bytes.len() > u32::MAX as usize {
1573                        return Err(BufferError::ValueTooLarge {
1574                            name: format!("<nested list>[{i}]"),
1575                            len: bytes.len(),
1576                        });
1577                    }
1578                    let off = self.append_tail_record(4, bytes.len(), bytes);
1579                    offsets.push(u32::try_from(off).map_err(|_| BufferError::ValueTooLarge {
1580                        name: "<nested list>".to_string(),
1581                        len: off,
1582                    })?);
1583                }
1584                for (i, off) in offsets.iter().enumerate() {
1585                    let dst = entries_start + i * 4;
1586                    self.bytes[dst..dst + 4].copy_from_slice(&off.to_le_bytes());
1587                }
1588                Ok(header_offset)
1589            }
1590            // `List<List<…>>`: pointer-array header whose entries point at
1591            // inner list headers — recurse.
1592            TypeRepr::List { element: inner } => {
1593                self.pad_to(4);
1594                let header_offset = self.bytes.len();
1595                self.bytes.extend_from_slice(&(count as u32).to_le_bytes());
1596                let entries_start = self.bytes.len();
1597                self.bytes.resize(entries_start + count * 4, 0);
1598                let mut offsets: Vec<u32> = Vec::with_capacity(count);
1599                for (i, it) in items.iter().enumerate() {
1600                    let Value::List(inner_items) = it else {
1601                        return Err(BufferError::TypeMismatch {
1602                            name: format!("<nested list>[{i}]"),
1603                            declared: "List<List<…>>",
1604                            requested: it.type_name(),
1605                        });
1606                    };
1607                    let off = self.append_list_payload(inner, inner_items)?;
1608                    offsets.push(u32::try_from(off).map_err(|_| BufferError::ValueTooLarge {
1609                        name: "<nested list>".to_string(),
1610                        len: off,
1611                    })?);
1612                }
1613                for (i, off) in offsets.iter().enumerate() {
1614                    let dst = entries_start + i * 4;
1615                    self.bytes[dst..dst + 4].copy_from_slice(&off.to_le_bytes());
1616                }
1617                Ok(header_offset)
1618            }
1619            // `List<Schema>`: pointer-array header whose entries point at
1620            // schema sub-records. Each sub-record is built in a detached
1621            // child builder, relocated to its entry offset (so its own
1622            // pointer slots become buffer-relative), and pasted — exactly
1623            // the bytes `ListRecordWriter` produces.
1624            TypeRepr::Schema { schema } => {
1625                let sub_layout = SchemaLayout::offsets_for(schema).map_err(|_| {
1626                    BufferError::MalformedPayload {
1627                        name: "<nested list>".to_string(),
1628                        reason: "inner List<Schema> element schema is not layoutable",
1629                    }
1630                })?;
1631                // Paste each inner schema sub-record at its deep paste
1632                // alignment (not just `root_align`) so an 8-aligned
1633                // `List<Int|Float>` field in the sub-record's tail lands
1634                // where the reader's absolute alignment recovery expects it.
1635                let elem_align = schema_paste_align(&sub_layout, &schema.fields);
1636                self.pad_to(4);
1637                let header_offset = self.bytes.len();
1638                self.bytes.extend_from_slice(&(count as u32).to_le_bytes());
1639                let entries_start = self.bytes.len();
1640                self.bytes.resize(entries_start + count * 4, 0);
1641                let mut offsets: Vec<u32> = Vec::with_capacity(count);
1642                for (i, it) in items.iter().enumerate() {
1643                    let mut child = BufferBuilder::new(&sub_layout, &schema.fields);
1644                    match it {
1645                        Value::Dict(dict) if !schema.is_tuple => {
1646                            write_schema_record_into_builder(&mut child, schema, dict)?;
1647                        }
1648                        Value::Tuple(tuple_items) if schema.is_tuple => {
1649                            write_tuple_record_into_builder(&mut child, schema, tuple_items)?;
1650                        }
1651                        other => {
1652                            return Err(BufferError::TypeMismatch {
1653                                name: format!("<nested list>[{i}]"),
1654                                declared: if schema.is_tuple {
1655                                    "List<Tuple>"
1656                                } else {
1657                                    "List<Schema>"
1658                                },
1659                                requested: other.type_name(),
1660                            });
1661                        }
1662                    }
1663                    let (mut child_bytes, child_reloc) = child.into_parts();
1664                    self.pad_to(elem_align);
1665                    let entry_offset = self.bytes.len();
1666                    let ptr =
1667                        u32::try_from(entry_offset).map_err(|_| BufferError::ValueTooLarge {
1668                            name: "<nested list>".to_string(),
1669                            len: entry_offset,
1670                        })?;
1671                    relocate_pointers(&mut child_bytes, &child_reloc, 0, ptr).map_err(
1672                        |reason| BufferError::MalformedPayload {
1673                            name: format!("<nested list>[{i}]"),
1674                            reason,
1675                        },
1676                    )?;
1677                    self.bytes.extend_from_slice(&child_bytes);
1678                    offsets.push(ptr);
1679                }
1680                for (i, off) in offsets.iter().enumerate() {
1681                    let dst = entries_start + i * 4;
1682                    self.bytes[dst..dst + 4].copy_from_slice(&off.to_le_bytes());
1683                }
1684                Ok(header_offset)
1685            }
1686            TypeRepr::Option { .. } | TypeRepr::Result { .. } | TypeRepr::Enum { .. } => {
1687                self.pad_to(4);
1688                let header_offset = self.bytes.len();
1689                self.bytes.extend_from_slice(&(count as u32).to_le_bytes());
1690                let entries_start = self.bytes.len();
1691                self.bytes.resize(entries_start + count * 4, 0);
1692                let mut offsets: Vec<u32> = Vec::with_capacity(count);
1693                for (i, it) in items.iter().enumerate() {
1694                    let entry_name = format!("<nested list>[{i}]");
1695                    let off = self.append_variant_record(&entry_name, element, it)?;
1696                    offsets.push(u32::try_from(off).map_err(|_| BufferError::ValueTooLarge {
1697                        name: "<nested list>".to_string(),
1698                        len: off,
1699                    })?);
1700                }
1701                for (i, off) in offsets.iter().enumerate() {
1702                    let dst = entries_start + i * 4;
1703                    self.bytes[dst..dst + 4].copy_from_slice(&off.to_le_bytes());
1704                }
1705                Ok(header_offset)
1706            }
1707            other => Err(BufferError::TypeMismatch {
1708                name: "<nested list>".to_string(),
1709                declared: type_label(other),
1710                requested: "List<scalar/String/Schema/List/Option/Result>",
1711            }),
1712        }
1713    }
1714
1715    /// Grow the buffer with zero bytes until its length is a multiple
1716    /// of `align`. No-op when already aligned.
1717    fn pad_to(&mut self, align: usize) {
1718        if align <= 1 {
1719            return;
1720        }
1721        if let Some(target) = self.bytes.len().checked_next_multiple_of(align) {
1722            self.bytes.resize(target, 0);
1723        }
1724    }
1725
1726    /// Read-only accessor used by tests to peek at the layout the
1727    /// writer is filling — keeps `layout: &OffsetTable` from looking
1728    /// unused while staying out of the public surface in cases where
1729    /// callers already have the table.
1730    #[allow(dead_code)]
1731    pub(crate) fn layout(&self) -> &OffsetTable {
1732        self.layout
1733    }
1734}
1735
1736/// Phase 10-c: in-flight `List<Schema>` element writer.
1737///
1738/// Buffered between [`BufferBuilder::list_record_writer`] and
1739/// [`BufferBuilder::finish_list_record`]. Each `start_entry` allocates
1740/// a detached child builder; `finish_entry` rebases the child's
1741/// internal pointers, appends the bytes to the parent's tail area,
1742/// and records the per-entry offset. The list header is written
1743/// only when the writer is finished, so the entry pointer array can
1744/// be filled with the final per-entry offsets without back-patches.
1745pub struct ListRecordWriter<'b> {
1746    field_name: String,
1747    slot_offset: usize,
1748    elem_layout: &'b OffsetTable,
1749    elem_schema: &'b Schema,
1750    entry_offsets: Vec<u32>,
1751    elem_align: usize,
1752}
1753
1754impl<'b> ListRecordWriter<'b> {
1755    /// Allocate a fresh entry builder. The caller drives it with
1756    /// `write_int` / `write_string` / nested `sub_record` and hands
1757    /// it back via [`Self::finish_entry`].
1758    pub fn start_entry(&self) -> BufferBuilder<'b> {
1759        BufferBuilder::new(self.elem_layout, &self.elem_schema.fields)
1760    }
1761
1762    /// Commit a previously-started entry into the parent buffer.
1763    ///
1764    /// Pads the parent tail area up to the schema's `root_align`,
1765    /// rebases the child's pointer-indirect slots through
1766    /// `relocate_pointers`, appends the bytes, and records the
1767    /// entry's offset for the eventual list header.
1768    pub fn finish_entry(
1769        &mut self,
1770        parent: &mut BufferBuilder<'_>,
1771        child: BufferBuilder<'_>,
1772    ) -> Result<(), BufferError> {
1773        if self.entry_offsets.len() >= u32::MAX as usize {
1774            return Err(BufferError::ValueTooLarge {
1775                name: self.field_name.clone(),
1776                len: self.entry_offsets.len() + 1,
1777            });
1778        }
1779        let (child_bytes, child_reloc) = child.into_parts();
1780        let mut child_bytes = child_bytes;
1781        parent.pad_to(self.elem_align);
1782        let entry_offset = parent.bytes.len();
1783        let ptr = u32::try_from(entry_offset).map_err(|_| BufferError::ValueTooLarge {
1784            name: self.field_name.clone(),
1785            len: entry_offset,
1786        })?;
1787        relocate_pointers(&mut child_bytes, &child_reloc, 0, ptr).map_err(|reason| {
1788            BufferError::MalformedPayload {
1789                name: self.field_name.clone(),
1790                reason,
1791            }
1792        })?;
1793        parent.bytes.extend_from_slice(&child_bytes);
1794        self.entry_offsets.push(ptr);
1795        Ok(())
1796    }
1797}
1798
1799/// Rebase every pointer-indirect slot inside `bytes` so the offsets
1800/// are valid once the whole buffer is pasted at `paste_base` of a
1801/// parent buffer. `record_base` is the byte offset of the record's
1802/// fixed area inside `bytes` (0 for the root call); recursion walks
1803/// nested `Schema` sub-records by following the slot's pre-relocation
1804/// value to find the inner fixed area.
1805///
1806/// All pointer slots in the input are expected to carry offsets
1807/// relative to `bytes`'s own base — i.e. the values a freshly built
1808/// child [`BufferBuilder`] would have written. After this routine each
1809/// slot is updated to `original_value + paste_base`, which matches the
1810/// parent buffer's coordinate system.
1811fn relocate_pointers(
1812    bytes: &mut [u8],
1813    reloc: &RelocLayout,
1814    record_base: usize,
1815    paste_base: u32,
1816) -> Result<(), &'static str> {
1817    for slot in &reloc.slots {
1818        let slot_abs = record_base
1819            .checked_add(slot.offset)
1820            .ok_or("pointer slot offset overflows usize")?;
1821        if slot_abs
1822            .checked_add(4)
1823            .map(|end| end > bytes.len())
1824            .unwrap_or(true)
1825        {
1826            return Err("pointer slot exceeds buffer end");
1827        }
1828        let mut ptr_buf = [0u8; 4];
1829        ptr_buf.copy_from_slice(&bytes[slot_abs..slot_abs + 4]);
1830        let original = u32::from_le_bytes(ptr_buf);
1831        let relocated = original
1832            .checked_add(paste_base)
1833            .ok_or("relocated pointer overflows u32")?;
1834        bytes[slot_abs..slot_abs + 4].copy_from_slice(&relocated.to_le_bytes());
1835        // For nested Schema fields, the pre-relocation pointer named
1836        // the inner record's fixed-area base inside `bytes`. Recurse
1837        // there so the inner record's own pointer-indirect slots get
1838        // rebased too — without recursion the wasm reader walking the
1839        // grand-child's String slot would fall off the parent buffer
1840        // by `paste_base` bytes.
1841        if let Some(variant_ty) = slot.variant.as_ref() {
1842            relocate_variant_record(bytes, original as usize, variant_ty, paste_base)?;
1843            continue;
1844        }
1845        match slot.list_element {
1846            // Phase 10-c: `List<String>` / `List<Schema>` payloads are
1847            // pointer arrays whose entries also reference tail-area
1848            // records. Each entry needs `+ paste_base` so the reader can
1849            // still resolve through them. `List<Int>` / `List<Float>` /
1850            // `List<Bool>` are inline-fixed and need no per-element
1851            // rebase.
1852            Some(ListElementKind::PointerArray { .. }) => {
1853                relocate_list_pointer_array(
1854                    bytes,
1855                    original as usize,
1856                    slot.list_elem.as_ref(),
1857                    paste_base,
1858                )?;
1859            }
1860            Some(ListElementKind::InlineFixed { .. }) | None => {
1861                if let Some(nested) = slot.nested.as_deref() {
1862                    relocate_pointers(bytes, nested, original as usize, paste_base)?;
1863                }
1864            }
1865        }
1866    }
1867    Ok(())
1868}
1869
1870/// Rebase every entry of a `List<String>` / `List<Schema>` /
1871/// `List<List<…>>` pointer array. `record_start` is the byte offset of
1872/// the list's tail record (the `[len: u32][off_0: u32]...` header). Walks
1873/// the `len` entries, adds `paste_base` to each, and recurses per the
1874/// element descriptor `elem`:
1875///
1876/// * `Schema` — into the per-element sub-record's [`RelocLayout`] so its
1877///   own pointer slots are rebased too.
1878/// * `InnerList` — the entry points at an **inner list header** that is
1879///   itself a pointer array; recurse into it one level deeper (this is
1880///   the `List<List<String|Schema>>` relocation the v1 walker lacked).
1881/// * `String` / `None` — the entry's target carries no internal pointer
1882///   (an inline-fixed inner scalar list, or a String record), so the
1883///   entry-pointer rebase above is all that is needed.
1884fn relocate_list_pointer_array(
1885    bytes: &mut [u8],
1886    record_start: usize,
1887    elem: Option<&PtrArrayElem>,
1888    paste_base: u32,
1889) -> Result<(), &'static str> {
1890    if record_start
1891        .checked_add(4)
1892        .map(|end| end > bytes.len())
1893        .unwrap_or(true)
1894    {
1895        return Err("list length prefix exceeds buffer end");
1896    }
1897    let mut len_buf = [0u8; 4];
1898    len_buf.copy_from_slice(&bytes[record_start..record_start + 4]);
1899    let count = u32::from_le_bytes(len_buf) as usize;
1900    let mut cursor = record_start + 4;
1901    for _ in 0..count {
1902        if cursor
1903            .checked_add(4)
1904            .map(|end| end > bytes.len())
1905            .unwrap_or(true)
1906        {
1907            return Err("list pointer array entry exceeds buffer end");
1908        }
1909        let mut entry_buf = [0u8; 4];
1910        entry_buf.copy_from_slice(&bytes[cursor..cursor + 4]);
1911        let original = u32::from_le_bytes(entry_buf);
1912        let relocated = original
1913            .checked_add(paste_base)
1914            .ok_or("relocated list-entry pointer overflows u32")?;
1915        bytes[cursor..cursor + 4].copy_from_slice(&relocated.to_le_bytes());
1916        // Recurse per element descriptor so each entry's own internal
1917        // pointers are rebased. `original` is the entry's pre-relocation
1918        // (child-buffer-relative) offset — the coordinate the inner
1919        // pointers were written against — so the inner walk continues to
1920        // add `paste_base` exactly once.
1921        match elem {
1922            Some(PtrArrayElem::Schema(element_reloc)) => {
1923                relocate_pointers(bytes, element_reloc, original as usize, paste_base)?;
1924            }
1925            Some(PtrArrayElem::InnerList { inner }) => {
1926                relocate_list_pointer_array(
1927                    bytes,
1928                    original as usize,
1929                    Some(inner.as_ref()),
1930                    paste_base,
1931                )?;
1932            }
1933            Some(PtrArrayElem::Variant(ty)) => {
1934                relocate_variant_record(bytes, original as usize, ty, paste_base)?;
1935            }
1936            Some(PtrArrayElem::String) | None => {}
1937        }
1938        cursor += 4;
1939    }
1940    Ok(())
1941}
1942
1943fn relocate_variant_record(
1944    bytes: &mut [u8],
1945    record_start: usize,
1946    ty: &TypeRepr,
1947    paste_base: u32,
1948) -> Result<(), &'static str> {
1949    if record_start
1950        .checked_add(1)
1951        .map(|end| end > bytes.len())
1952        .unwrap_or(true)
1953    {
1954        return Err("variant tag exceeds buffer end");
1955    }
1956    let tag = bytes[record_start];
1957    let selected = variant_selected_payload(ty, tag)?;
1958    let Some(payload) = selected.payload else {
1959        return Ok(());
1960    };
1961    let payload_ty = payload.ty;
1962    if !matches!(
1963        payload_ty,
1964        TypeRepr::String
1965            | TypeRepr::List { .. }
1966            | TypeRepr::Schema { .. }
1967            | TypeRepr::Option { .. }
1968            | TypeRepr::Result { .. }
1969            | TypeRepr::Enum { .. }
1970    ) {
1971        return Ok(());
1972    }
1973    let slot_abs = variant_payload_slot_offset(record_start, &payload_ty)
1974        .ok_or("variant payload slot offset overflows usize")?;
1975    if slot_abs
1976        .checked_add(4)
1977        .map(|end| end > bytes.len())
1978        .unwrap_or(true)
1979    {
1980        return Err("variant payload pointer slot exceeds buffer end");
1981    }
1982    let mut ptr_buf = [0u8; 4];
1983    ptr_buf.copy_from_slice(&bytes[slot_abs..slot_abs + 4]);
1984    let original = u32::from_le_bytes(ptr_buf);
1985    let relocated = original
1986        .checked_add(paste_base)
1987        .ok_or("relocated variant payload pointer overflows u32")?;
1988    bytes[slot_abs..slot_abs + 4].copy_from_slice(&relocated.to_le_bytes());
1989    match &payload_ty {
1990        TypeRepr::String => Ok(()),
1991        TypeRepr::Schema { schema } => {
1992            let layout = SchemaLayout::offsets_for(schema)
1993                .map_err(|_| "variant schema payload is not layoutable")?;
1994            let reloc = RelocLayout::build(&layout, &schema.fields);
1995            relocate_pointers(bytes, &reloc, original as usize, paste_base)
1996        }
1997        TypeRepr::List { element } => {
1998            if list_payload_is_pointer_array(element) {
1999                let elem = ptr_array_elem_for(element);
2000                relocate_list_pointer_array(bytes, original as usize, elem.as_ref(), paste_base)?;
2001            }
2002            Ok(())
2003        }
2004        TypeRepr::Option { .. } | TypeRepr::Result { .. } | TypeRepr::Enum { .. } => {
2005            relocate_variant_record(bytes, original as usize, &payload_ty, paste_base)
2006        }
2007        _ => Ok(()),
2008    }
2009}
2010
2011/// Marshal a nested `List<List<scalar>>` arg / schema field into
2012/// `field_name`'s tail area, given the inner element [`TypeRepr`] and
2013/// the outer `Value::List` items (each element itself a `Value::List`
2014/// of inline-fixed scalars). Shared by both compiled backends so they
2015/// emit byte-identical input buffers. Inner pointer-array element lists
2016/// (`List<List<String>>` / `List<List<Schema>>`) are rejected by the
2017/// layout pass before reaching here; this routine only handles the
2018/// inline-fixed innermost elements (`Int` / `Float` / `Bool`).
2019pub fn write_nested_scalar_list(
2020    builder: &mut BufferBuilder<'_>,
2021    field_name: &str,
2022    inner: &TypeRepr,
2023    items: &[crate::value::Value],
2024) -> Result<(), BufferError> {
2025    use crate::value::Value;
2026    builder.write_list_list_with(field_name, items.len(), |i, payload| {
2027        let Value::List(inner_items) = &items[i] else {
2028            return Err(BufferError::TypeMismatch {
2029                name: format!("{field_name}[{i}]"),
2030                declared: "List<List<…>>",
2031                requested: "List",
2032            });
2033        };
2034        match inner {
2035            TypeRepr::Int => {
2036                for it in inner_items.iter() {
2037                    let Value::Int(v) = it else {
2038                        return Err(BufferError::TypeMismatch {
2039                            name: format!("{field_name}[{i}]"),
2040                            declared: "List<Int>",
2041                            requested: "List<Int>",
2042                        });
2043                    };
2044                    payload.extend_from_slice(&v.to_le_bytes());
2045                }
2046                Ok((inner_items.len(), 8))
2047            }
2048            TypeRepr::Float => {
2049                for it in inner_items.iter() {
2050                    let f = match it {
2051                        Value::Float(v) => v.into_inner(),
2052                        // Int → Float promotion, matching the scalar arm.
2053                        Value::Int(v) => *v as f64,
2054                        _ => {
2055                            return Err(BufferError::TypeMismatch {
2056                                name: format!("{field_name}[{i}]"),
2057                                declared: "List<Float>",
2058                                requested: "List<Float>",
2059                            })
2060                        }
2061                    };
2062                    payload.extend_from_slice(&f.to_le_bytes());
2063                }
2064                Ok((inner_items.len(), 8))
2065            }
2066            TypeRepr::Bool => {
2067                for it in inner_items.iter() {
2068                    let Value::Bool(v) = it else {
2069                        return Err(BufferError::TypeMismatch {
2070                            name: format!("{field_name}[{i}]"),
2071                            declared: "List<Bool>",
2072                            requested: "List<Bool>",
2073                        });
2074                    };
2075                    payload.push(u8::from(*v));
2076                }
2077                Ok((inner_items.len(), 4))
2078            }
2079            _ => Err(BufferError::TypeMismatch {
2080                name: field_name.to_string(),
2081                declared: "List<List<scalar>>",
2082                requested: "List<List<pointer-array element>>",
2083            }),
2084        }
2085    })
2086}
2087
2088/// Marshal a `List<List<String>>` / `List<List<Schema>>` (F5: a doubly-
2089/// nested pointer-array list, where each outer element is itself a
2090/// pointer-array list) into `field_name`'s tail area. `inner_element` is
2091/// the **innermost** element type (`String` / `Schema` / a deeper
2092/// `List<…>`), matching the `marshal_list_list_in` dispatch contract;
2093/// `items` are the outer `Value::List` rows (each itself a
2094/// `List<inner_element>`). The whole nested structure is written at
2095/// child-buffer-relative offsets and the field's pointer slot patched to
2096/// the outer header — the relocation walker's `PtrArrayElem::InnerList`
2097/// descriptor rebases the inner pointer arrays on the later arena rebase.
2098///
2099/// Shared by both compiled backends so they emit byte-identical input
2100/// buffers. The layout pass admits these shapes (`inner_list_record_
2101/// alignment`); the inline-fixed `List<List<scalar>>` case keeps using
2102/// [`write_nested_scalar_list`].
2103pub fn write_nested_pointer_array_list(
2104    builder: &mut BufferBuilder<'_>,
2105    field_name: &str,
2106    inner_element: &TypeRepr,
2107    items: &[crate::value::Value],
2108) -> Result<(), BufferError> {
2109    // The field slot must be a pointer-indirect `List<List<…>>` slot.
2110    let slot_offset = {
2111        let entry = builder
2112            .field_index
2113            .iter()
2114            .find(|e| e.name == field_name)
2115            .ok_or_else(|| BufferError::UnknownField {
2116                name: field_name.to_string(),
2117            })?;
2118        let is_nested = matches!(&entry.ty, TypeRepr::List { element: outer }
2119            if matches!(outer.as_ref(), TypeRepr::List { .. }));
2120        if !is_nested || !matches!(entry.kind, FieldKind::PointerIndirect { .. }) {
2121            return Err(BufferError::TypeMismatch {
2122                name: field_name.to_string(),
2123                declared: type_label(&entry.ty),
2124                requested: "List<List<String|Schema>>",
2125            });
2126        }
2127        entry.offset
2128    };
2129    // `inner_element` is the innermost element (`String` / `Schema`).
2130    // Build the outer pointer array whose entries each point at an inner
2131    // `List<inner_element>` written by recursion.
2132    let header = append_outer_pointer_array(builder, inner_element, items)?;
2133    let ptr = u32::try_from(header).map_err(|_| BufferError::ValueTooLarge {
2134        name: field_name.to_string(),
2135        len: header,
2136    })?;
2137    builder.bytes[slot_offset..slot_offset + 4].copy_from_slice(&ptr.to_le_bytes());
2138    Ok(())
2139}
2140
2141/// Write the outer pointer-array header for a `List<List<inner_element>>`
2142/// whose **innermost** element type is `inner_element` (`String` /
2143/// `Schema` / a deeper `List<…>`), returning the header's buffer-relative
2144/// offset. Each outer entry is a `List<inner_element>` value
2145/// (`Value::List`), serialised via [`BufferBuilder::append_list_payload`].
2146///
2147/// Mirrors the marshaller dispatch contract (`marshal_list_list_in`):
2148/// `inner_element` is the inner-list element, **not** the outer list
2149/// element — so for `List<List<String>>` the callers pass `String`.
2150fn append_outer_pointer_array(
2151    builder: &mut BufferBuilder<'_>,
2152    inner_element: &TypeRepr,
2153    items: &[crate::value::Value],
2154) -> Result<usize, BufferError> {
2155    use crate::value::Value;
2156    let count = items.len();
2157    if count > u32::MAX as usize {
2158        return Err(BufferError::ValueTooLarge {
2159            name: "<nested list>".to_string(),
2160            len: count,
2161        });
2162    }
2163    builder.pad_to(4);
2164    let header_offset = builder.bytes.len();
2165    builder
2166        .bytes
2167        .extend_from_slice(&(count as u32).to_le_bytes());
2168    let entries_start = builder.bytes.len();
2169    builder.bytes.resize(entries_start + count * 4, 0);
2170    let mut offsets: Vec<u32> = Vec::with_capacity(count);
2171    for (i, it) in items.iter().enumerate() {
2172        let Value::List(inner_items) = it else {
2173            return Err(BufferError::TypeMismatch {
2174                name: format!("<nested list>[{i}]"),
2175                declared: "List<List<…>>",
2176                requested: it.type_name(),
2177            });
2178        };
2179        // Each outer entry is a `List<inner_element>`; serialise it.
2180        let off = builder.append_list_payload(inner_element, inner_items)?;
2181        offsets.push(u32::try_from(off).map_err(|_| BufferError::ValueTooLarge {
2182            name: "<nested list>".to_string(),
2183            len: off,
2184        })?);
2185    }
2186    for (i, off) in offsets.iter().enumerate() {
2187        let dst = entries_start + i * 4;
2188        builder.bytes[dst..dst + 4].copy_from_slice(&off.to_le_bytes());
2189    }
2190    Ok(header_offset)
2191}
2192
2193/// Write every field of a `#schema` record (`schema`) from a branded
2194/// `Value::Dict` into `child`. Generic over field type — scalars,
2195/// `String`, `List<scalar/String/Schema/List>`, and nested `Schema`
2196/// sub-records — so the recursive nested-list marshaller has one
2197/// self-contained schema writer that produces bytes identical to the
2198/// per-backend `write_value_into_builder`. A missing / mistyped field is
2199/// a loud error.
2200fn write_schema_record_into_builder(
2201    child: &mut BufferBuilder<'_>,
2202    schema: &Schema,
2203    dict: &crate::value::ValueDict,
2204) -> Result<(), BufferError> {
2205    for field in &schema.fields {
2206        let value =
2207            dict.map
2208                .get(field.name.as_str())
2209                .ok_or_else(|| BufferError::MalformedPayload {
2210                    name: field.name.clone(),
2211                    reason: "schema record is missing a declared field",
2212                })?;
2213        write_schema_field_into_builder(child, field, value)?;
2214    }
2215    Ok(())
2216}
2217
2218/// Write every field of a tuple schema from a positional `Value::Tuple`.
2219/// The schema is still a record at the binary layer; only the source
2220/// container shape is positional.
2221fn write_tuple_record_into_builder(
2222    child: &mut BufferBuilder<'_>,
2223    schema: &Schema,
2224    items: &[crate::value::Value],
2225) -> Result<(), BufferError> {
2226    if items.len() != schema.fields.len() {
2227        return Err(BufferError::MalformedPayload {
2228            name: schema.name.clone(),
2229            reason: "tuple arity does not match schema",
2230        });
2231    }
2232    for (field, value) in schema.fields.iter().zip(items.iter()) {
2233        write_schema_field_into_builder(child, field, value)?;
2234    }
2235    Ok(())
2236}
2237
2238/// Write one schema field (`field`) carrying `value` into `child`.
2239fn write_schema_field_into_builder(
2240    child: &mut BufferBuilder<'_>,
2241    field: &Field,
2242    value: &crate::value::Value,
2243) -> Result<(), BufferError> {
2244    child.write_value(field.name.as_str(), &field.ty, value)
2245}
2246
2247/// `(discriminant, optional (payload type, payload value))` produced when
2248/// matching a value against a variant slot.
2249type VariantPayloadSlot<'a> = (u8, Option<(TypeRepr, &'a crate::value::Value)>);
2250
2251fn variant_payload_for_value<'a>(
2252    name: &str,
2253    ty: &'a TypeRepr,
2254    value: &'a crate::value::Value,
2255) -> Result<VariantPayloadSlot<'a>, BufferError> {
2256    let crate::value::Value::Dict(dict) = value else {
2257        return Err(BufferError::TypeMismatch {
2258            name: name.to_string(),
2259            declared: type_label(ty),
2260            requested: value.type_name(),
2261        });
2262    };
2263    match ty {
2264        TypeRepr::Option { inner } => match (dict.variant_of.as_deref(), dict.brand.as_deref()) {
2265            (Some("Option"), Some("None")) => Ok((0, None)),
2266            (Some("Option"), Some("Some")) => {
2267                let payload =
2268                    dict.map
2269                        .get("value")
2270                        .ok_or_else(|| BufferError::MalformedPayload {
2271                            name: name.to_string(),
2272                            reason: "Option.Some is missing `value` payload",
2273                        })?;
2274                Ok((1, Some((inner.as_ref().clone(), payload))))
2275            }
2276            _ => Err(BufferError::TypeMismatch {
2277                name: name.to_string(),
2278                declared: "Option",
2279                requested: value.type_name(),
2280            }),
2281        },
2282        TypeRepr::Result { ok, err } => match (dict.variant_of.as_deref(), dict.brand.as_deref()) {
2283            (Some("Result"), Some("Ok")) => {
2284                let payload =
2285                    dict.map
2286                        .get("value")
2287                        .ok_or_else(|| BufferError::MalformedPayload {
2288                            name: name.to_string(),
2289                            reason: "Result.Ok is missing `value` payload",
2290                        })?;
2291                Ok((0, Some((ok.as_ref().clone(), payload))))
2292            }
2293            (Some("Result"), Some("Err")) => {
2294                let payload =
2295                    dict.map
2296                        .get("error")
2297                        .ok_or_else(|| BufferError::MalformedPayload {
2298                            name: name.to_string(),
2299                            reason: "Result.Err is missing `error` payload",
2300                        })?;
2301                Ok((1, Some((err.as_ref().clone(), payload))))
2302            }
2303            _ => Err(BufferError::TypeMismatch {
2304                name: name.to_string(),
2305                declared: "Result",
2306                requested: value.type_name(),
2307            }),
2308        },
2309        TypeRepr::Enum {
2310            name: enum_name,
2311            variants,
2312        } => {
2313            let (Some(value_enum), Some(value_variant)) =
2314                (dict.variant_of.as_deref(), dict.brand.as_deref())
2315            else {
2316                return Err(BufferError::TypeMismatch {
2317                    name: name.to_string(),
2318                    declared: "Enum",
2319                    requested: value.type_name(),
2320                });
2321            };
2322            if value_enum != enum_name {
2323                return Err(BufferError::TypeMismatch {
2324                    name: name.to_string(),
2325                    declared: "Enum",
2326                    requested: value.type_name(),
2327                });
2328            }
2329            let variant = variants
2330                .iter()
2331                .find(|variant| variant.name == value_variant)
2332                .ok_or_else(|| BufferError::MalformedPayload {
2333                    name: name.to_string(),
2334                    reason: "enum value carries an unknown variant",
2335                })?;
2336            if variant.fields.is_empty() {
2337                if !dict.map.is_empty() {
2338                    return Err(BufferError::MalformedPayload {
2339                        name: name.to_string(),
2340                        reason: "unit enum variant carries payload fields",
2341                    });
2342                }
2343                return Ok((variant.tag, None));
2344            }
2345            let payload_ty = TypeRepr::Schema {
2346                schema: Box::new(variant.payload_schema(enum_name).ok_or_else(|| {
2347                    BufferError::MalformedPayload {
2348                        name: name.to_string(),
2349                        reason: "enum payload schema is missing",
2350                    }
2351                })?),
2352            };
2353            Ok((variant.tag, Some((payload_ty, value))))
2354        }
2355        _ => Err(BufferError::TypeMismatch {
2356            name: name.to_string(),
2357            declared: type_label(ty),
2358            requested: value.type_name(),
2359        }),
2360    }
2361}
2362
/// Type-checked reader over a record buffer plus optional tail area.
///
/// The buffer is borrowed (no copy), so inline reads cost a bounds
/// check plus a `from_le_bytes`. Pointer-indirect reads follow the
/// `u32` slot through to the tail-area `[len: u32 LE][payload]`
/// record, validating the bounds and (for `String`) the utf-8 bytes
/// against the borrowed buffer.
#[derive(Debug)]
pub struct BufferReader<'a> {
    /// Offset table the reader was constructed with.
    layout: &'a OffsetTable,
    /// One entry per layout field that has a same-named schema field,
    /// pairing the declared type with the slot's offset, size and kind.
    field_index: Vec<FieldEntry>,
    /// Borrowed bytes every read indexes into.
    bytes: &'a [u8],
}
2376
2377impl<'a> BufferReader<'a> {
2378    /// Build a reader over `bytes` interpreting it under `layout`.
2379    /// Returns [`BufferError::BufferTooSmall`] when `bytes` is shorter
2380    /// than `layout.root_size` — every leaf read otherwise would have
2381    /// to repeat the same bounds check, so we do it once at
2382    /// construction.
2383    pub fn new(
2384        layout: &'a OffsetTable,
2385        fields: &[Field],
2386        bytes: &'a [u8],
2387    ) -> Result<Self, BufferError> {
2388        if bytes.len() < layout.root_size {
2389            return Err(BufferError::BufferTooSmall {
2390                have: bytes.len(),
2391                need: layout.root_size,
2392            });
2393        }
2394        let field_index = layout
2395            .fields
2396            .iter()
2397            .filter_map(|fo| {
2398                fields
2399                    .iter()
2400                    .find(|f| f.name == fo.name)
2401                    .map(|f| FieldEntry {
2402                        name: fo.name.clone(),
2403                        ty: f.ty.clone(),
2404                        offset: fo.offset,
2405                        size: fo.size,
2406                        kind: fo.kind,
2407                        list_element: fo.list_element,
2408                    })
2409            })
2410            .collect();
2411        Ok(Self {
2412            layout,
2413            field_index,
2414            bytes,
2415        })
2416    }
2417
2418    /// Build a reader whose root record's fixed area is anchored at the
2419    /// arena-absolute offset `record_base` inside the whole-arena slice
2420    /// `bytes`, rather than at offset `0`.
2421    ///
2422    /// This is the F1 object-return decode entry point: under the
2423    /// arena-absolute slot convention the object head lives at `out_ptr`
2424    /// and every pointer slot it carries holds an arena-absolute offset,
2425    /// so the reader walks the **whole arena** (`bytes`) and each field
2426    /// slot is read at `record_base + fo.offset`. Mirrors the per-entry
2427    /// field-index rebase [`Self::read_list_record_at`] /
2428    /// [`Self::sub_record`] already perform, so a subsequent
2429    /// pointer-indirect read resolves its arena-absolute slot value
2430    /// directly against `bytes`.
2431    pub fn new_at_base(
2432        layout: &'a OffsetTable,
2433        fields: &[Field],
2434        bytes: &'a [u8],
2435        record_base: usize,
2436    ) -> Result<Self, BufferError> {
2437        let record_end =
2438            record_base
2439                .checked_add(layout.root_size)
2440                .ok_or(BufferError::BufferTooSmall {
2441                    have: bytes.len(),
2442                    need: usize::MAX,
2443                })?;
2444        if record_end > bytes.len() {
2445            return Err(BufferError::BufferTooSmall {
2446                have: bytes.len(),
2447                need: record_end,
2448            });
2449        }
2450        let field_index = layout
2451            .fields
2452            .iter()
2453            .filter_map(|fo| {
2454                fields
2455                    .iter()
2456                    .find(|f| f.name == fo.name)
2457                    .map(|f| FieldEntry {
2458                        name: fo.name.clone(),
2459                        ty: f.ty.clone(),
2460                        offset: record_base + fo.offset,
2461                        size: fo.size,
2462                        kind: fo.kind,
2463                        list_element: fo.list_element,
2464                    })
2465            })
2466            .collect();
2467        Ok(Self {
2468            layout,
2469            field_index,
2470            bytes,
2471        })
2472    }
2473
2474    /// Read a 64-bit signed integer from `field_name`.
2475    pub fn read_int(&self, field_name: &str) -> Result<i64, BufferError> {
2476        let (offset, _, _) = self.locate(field_name, &TypeRepr::Int, "Int")?;
2477        let mut buf = [0u8; 8];
2478        buf.copy_from_slice(&self.bytes[offset..offset + 8]);
2479        Ok(i64::from_le_bytes(buf))
2480    }
2481
2482    /// Read a 64-bit float from `field_name`.
2483    pub fn read_float(&self, field_name: &str) -> Result<f64, BufferError> {
2484        let (offset, _, _) = self.locate(field_name, &TypeRepr::Float, "Float")?;
2485        let mut buf = [0u8; 8];
2486        buf.copy_from_slice(&self.bytes[offset..offset + 8]);
2487        Ok(f64::from_le_bytes(buf))
2488    }
2489
2490    /// Read a boolean from `field_name`. Any non-zero byte decodes
2491    /// as `true` (the layout only writes 0 or 1, but defensive
2492    /// decoding makes the reader robust against buffer corruption).
2493    pub fn read_bool(&self, field_name: &str) -> Result<bool, BufferError> {
2494        let (offset, _, _) = self.locate(field_name, &TypeRepr::Bool, "Bool")?;
2495        Ok(self.bytes[offset] != 0)
2496    }
2497
2498    /// Confirm `field_name` is declared as an internal unit slot and that the slot
2499    /// is reachable. The byte value is unused (Unit slots are
2500    /// tag-only), so this only validates the type label.
2501    pub fn read_unit(&self, field_name: &str) -> Result<(), BufferError> {
2502        let (_, _, _) = self.locate(field_name, &TypeRepr::Unit, "Unit")?;
2503        Ok(())
2504    }
2505
2506    /// Read any supported value shape from `field_name` using its canonical
2507    /// type. Mirrors [`BufferBuilder::write_value`].
2508    pub fn read_value(
2509        &self,
2510        field_name: &str,
2511        ty: &TypeRepr,
2512    ) -> Result<crate::value::Value, BufferError> {
2513        let entry = self.find_entry(field_name)?;
2514        if !type_matches(&entry.ty, ty) {
2515            return Err(BufferError::TypeMismatch {
2516                name: field_name.to_string(),
2517                declared: type_label(&entry.ty),
2518                requested: type_label(ty),
2519            });
2520        }
2521        self.read_field_at(entry.offset, ty)
2522    }
2523
2524    /// Read a UTF-8 string from `field_name`. Follows the fixed-area
2525    /// `u32` pointer into the tail area, validates the length prefix
2526    /// + payload bounds, and decodes the bytes as utf-8.
2527    pub fn read_string(&self, field_name: &str) -> Result<&'a str, BufferError> {
2528        let (ptr_offset, _, kind) = self.locate(field_name, &TypeRepr::String, "String")?;
2529        if !matches!(kind, FieldKind::PointerIndirect { .. }) {
2530            return Err(BufferError::MalformedPayload {
2531                name: field_name.to_string(),
2532                reason: "expected pointer-indirect kind",
2533            });
2534        }
2535        let (len, payload_start) = self.decode_pointer_header(field_name, ptr_offset, 0)?;
2536        let payload_end =
2537            payload_start
2538                .checked_add(len)
2539                .ok_or_else(|| BufferError::MalformedPayload {
2540                    name: field_name.to_string(),
2541                    reason: "payload end overflows usize",
2542                })?;
2543        if payload_end > self.bytes.len() {
2544            return Err(BufferError::MalformedPayload {
2545                name: field_name.to_string(),
2546                reason: "payload exceeds buffer end",
2547            });
2548        }
2549        std::str::from_utf8(&self.bytes[payload_start..payload_end]).map_err(|_| {
2550            BufferError::MalformedPayload {
2551                name: field_name.to_string(),
2552                reason: "payload is not valid utf-8",
2553            }
2554        })
2555    }
2556
2557    /// Read a `List<Int>` from `field_name`. Follows the fixed-area
2558    /// `u32` pointer into the tail area and copies the i64 elements
2559    /// into a fresh `Vec<i64>` so callers don't have to wrestle with
2560    /// alignment of the borrowed slice.
2561    pub fn read_list_int(&self, field_name: &str) -> Result<Vec<i64>, BufferError> {
2562        let (ptr_offset, _, kind) = self.locate(
2563            field_name,
2564            &TypeRepr::List {
2565                element: Box::new(TypeRepr::Int),
2566            },
2567            "List<Int>",
2568        )?;
2569        let FieldKind::PointerIndirect { tail_alignment } = kind else {
2570            return Err(BufferError::MalformedPayload {
2571                name: field_name.to_string(),
2572                reason: "expected pointer-indirect kind",
2573            });
2574        };
2575        let (count, payload_start) =
2576            self.decode_pointer_header(field_name, ptr_offset, tail_alignment)?;
2577        let byte_len = count
2578            .checked_mul(8)
2579            .ok_or_else(|| BufferError::MalformedPayload {
2580                name: field_name.to_string(),
2581                reason: "byte length overflows usize",
2582            })?;
2583        let payload_end =
2584            payload_start
2585                .checked_add(byte_len)
2586                .ok_or_else(|| BufferError::MalformedPayload {
2587                    name: field_name.to_string(),
2588                    reason: "payload end overflows usize",
2589                })?;
2590        if payload_end > self.bytes.len() {
2591            return Err(BufferError::MalformedPayload {
2592                name: field_name.to_string(),
2593                reason: "payload exceeds buffer end",
2594            });
2595        }
2596        let mut out = Vec::with_capacity(count);
2597        let mut cursor = payload_start;
2598        for _ in 0..count {
2599            let mut buf = [0u8; 8];
2600            buf.copy_from_slice(&self.bytes[cursor..cursor + 8]);
2601            out.push(i64::from_le_bytes(buf));
2602            cursor += 8;
2603        }
2604        Ok(out)
2605    }
2606
2607    /// Read a `List<Float>` from `field_name`. Tail layout mirrors
2608    /// `List<Int>` — `[len: u32][pad to 8][f64 elements]`.
2609    pub fn read_list_float(&self, field_name: &str) -> Result<Vec<f64>, BufferError> {
2610        let entry = self.find_entry(field_name)?;
2611        let elem = match &entry.ty {
2612            TypeRepr::List { element } if matches!(element.as_ref(), TypeRepr::Float) => {
2613                element.as_ref()
2614            }
2615            _ => {
2616                return Err(BufferError::TypeMismatch {
2617                    name: field_name.to_string(),
2618                    declared: type_label(&entry.ty),
2619                    requested: "List<Float>",
2620                });
2621            }
2622        };
2623        let _ = elem;
2624        let FieldKind::PointerIndirect { tail_alignment } = entry.kind else {
2625            return Err(BufferError::MalformedPayload {
2626                name: field_name.to_string(),
2627                reason: "expected pointer-indirect kind",
2628            });
2629        };
2630        let (count, payload_start) =
2631            self.decode_pointer_header(field_name, entry.offset, tail_alignment)?;
2632        let byte_len = count
2633            .checked_mul(8)
2634            .ok_or_else(|| BufferError::MalformedPayload {
2635                name: field_name.to_string(),
2636                reason: "byte length overflows usize",
2637            })?;
2638        let payload_end =
2639            payload_start
2640                .checked_add(byte_len)
2641                .ok_or_else(|| BufferError::MalformedPayload {
2642                    name: field_name.to_string(),
2643                    reason: "payload end overflows usize",
2644                })?;
2645        if payload_end > self.bytes.len() {
2646            return Err(BufferError::MalformedPayload {
2647                name: field_name.to_string(),
2648                reason: "payload exceeds buffer end",
2649            });
2650        }
2651        let mut out = Vec::with_capacity(count);
2652        let mut cursor = payload_start;
2653        for _ in 0..count {
2654            let mut buf = [0u8; 8];
2655            buf.copy_from_slice(&self.bytes[cursor..cursor + 8]);
2656            out.push(f64::from_le_bytes(buf));
2657            cursor += 8;
2658        }
2659        Ok(out)
2660    }
2661
2662    /// Read a `List<Bool>` from `field_name`. Tail layout `[len: u32]
2663    /// [u8 booleans]`. Non-zero bytes decode as `true` — defensive,
2664    /// the writer always emits `0` / `1`.
2665    pub fn read_list_bool(&self, field_name: &str) -> Result<Vec<bool>, BufferError> {
2666        let entry = self.find_entry(field_name)?;
2667        match &entry.ty {
2668            TypeRepr::List { element } if matches!(element.as_ref(), TypeRepr::Bool) => {}
2669            _ => {
2670                return Err(BufferError::TypeMismatch {
2671                    name: field_name.to_string(),
2672                    declared: type_label(&entry.ty),
2673                    requested: "List<Bool>",
2674                });
2675            }
2676        }
2677        if !matches!(entry.kind, FieldKind::PointerIndirect { .. }) {
2678            return Err(BufferError::MalformedPayload {
2679                name: field_name.to_string(),
2680                reason: "expected pointer-indirect kind",
2681            });
2682        }
2683        let (count, payload_start) = self.decode_pointer_header(field_name, entry.offset, 0)?;
2684        let payload_end =
2685            payload_start
2686                .checked_add(count)
2687                .ok_or_else(|| BufferError::MalformedPayload {
2688                    name: field_name.to_string(),
2689                    reason: "payload end overflows usize",
2690                })?;
2691        if payload_end > self.bytes.len() {
2692            return Err(BufferError::MalformedPayload {
2693                name: field_name.to_string(),
2694                reason: "payload exceeds buffer end",
2695            });
2696        }
2697        let mut out = Vec::with_capacity(count);
2698        for i in 0..count {
2699            out.push(self.bytes[payload_start + i] != 0);
2700        }
2701        Ok(out)
2702    }
2703
2704    /// Read a `List<String>` from `field_name`. Walks the pointer
2705    /// array, then decodes each per-entry `[len: u32][bytes]` record
2706    /// as UTF-8 borrowed from the underlying buffer.
2707    pub fn read_list_string(&self, field_name: &str) -> Result<Vec<&'a str>, BufferError> {
2708        let entry = self.find_entry(field_name)?;
2709        match &entry.ty {
2710            TypeRepr::List { element } if matches!(element.as_ref(), TypeRepr::String) => {}
2711            _ => {
2712                return Err(BufferError::TypeMismatch {
2713                    name: field_name.to_string(),
2714                    declared: type_label(&entry.ty),
2715                    requested: "List<String>",
2716                });
2717            }
2718        }
2719        if !matches!(entry.kind, FieldKind::PointerIndirect { .. }) {
2720            return Err(BufferError::MalformedPayload {
2721                name: field_name.to_string(),
2722                reason: "expected pointer-indirect kind",
2723            });
2724        }
2725        // `decode_pointer_header` with `inner_alignment = 0` keeps the
2726        // payload start at `header + 4` (no extra pad), which is the
2727        // pointer-array start. Each entry is a u32 buffer-relative
2728        // offset pointing at a String `[len: u32][bytes]` record.
2729        let (count, entries_start) = self.decode_pointer_header(field_name, entry.offset, 0)?;
2730        let mut out = Vec::with_capacity(count);
2731        for i in 0..count {
2732            let cursor = entries_start + i * 4;
2733            if cursor + 4 > self.bytes.len() {
2734                return Err(BufferError::MalformedPayload {
2735                    name: field_name.to_string(),
2736                    reason: "list entry pointer exceeds buffer end",
2737                });
2738            }
2739            let mut entry_buf = [0u8; 4];
2740            entry_buf.copy_from_slice(&self.bytes[cursor..cursor + 4]);
2741            let record_start = u32::from_le_bytes(entry_buf) as usize;
2742            if record_start + 4 > self.bytes.len() {
2743                return Err(BufferError::MalformedPayload {
2744                    name: field_name.to_string(),
2745                    reason: "list string len prefix exceeds buffer end",
2746                });
2747            }
2748            let mut len_buf = [0u8; 4];
2749            len_buf.copy_from_slice(&self.bytes[record_start..record_start + 4]);
2750            let str_len = u32::from_le_bytes(len_buf) as usize;
2751            let payload_start = record_start + 4;
2752            let payload_end = payload_start.checked_add(str_len).ok_or_else(|| {
2753                BufferError::MalformedPayload {
2754                    name: field_name.to_string(),
2755                    reason: "list string payload end overflows usize",
2756                }
2757            })?;
2758            if payload_end > self.bytes.len() {
2759                return Err(BufferError::MalformedPayload {
2760                    name: field_name.to_string(),
2761                    reason: "list string payload exceeds buffer end",
2762                });
2763            }
2764            let s = std::str::from_utf8(&self.bytes[payload_start..payload_end]).map_err(|_| {
2765                BufferError::MalformedPayload {
2766                    name: field_name.to_string(),
2767                    reason: "list string payload is not valid utf-8",
2768                }
2769            })?;
2770            out.push(s);
2771        }
2772        Ok(out)
2773    }
2774
2775    /// Read a `List<String>` whose pointer-array header sits **directly**
2776    /// at `header_off` (rather than behind a named fixed-area slot). This
2777    /// is the in-place region-walk return entry point (S3): the machine
2778    /// code reports the arena-relative offset of the outer
2779    /// `[len][off_0]…[off_{N-1}]` header, the host rebases it to
2780    /// `header_off`, and — after the verifier certifies the whole graph
2781    /// stays in-region — this walks the same bytes [`read_list_string`]
2782    /// would, so a top-level (field-slot) and an in-place decode of the
2783    /// same buffer are byte-identical, including each string's content.
2784    ///
2785    /// Every offset / length is bounds-checked against the buffer end
2786    /// before any read (the verifier already proved the tighter
2787    /// single-region bound); a non-UTF-8 payload is a loud error, matching
2788    /// [`read_list_string`] and the `Value::String` invariant (Relon
2789    /// strings are always valid UTF-8).
2790    pub fn read_list_string_at(&self, header_off: usize) -> Result<Vec<&'a str>, BufferError> {
2791        // Header: `[len: u32][off_0]…`. `entries_start = header_off + 4`.
2792        if header_off
2793            .checked_add(4)
2794            .map(|end| end > self.bytes.len())
2795            .unwrap_or(true)
2796        {
2797            return Err(BufferError::MalformedPayload {
2798                name: "<in-place root>".to_string(),
2799                reason: "in-place list-string header length prefix exceeds buffer end",
2800            });
2801        }
2802        let mut len_buf = [0u8; 4];
2803        len_buf.copy_from_slice(&self.bytes[header_off..header_off + 4]);
2804        let count = u32::from_le_bytes(len_buf) as usize;
2805        let entries_start = header_off + 4;
2806        let mut out = Vec::with_capacity(count);
2807        for i in 0..count {
2808            let cursor =
2809                entries_start
2810                    .checked_add(i * 4)
2811                    .ok_or_else(|| BufferError::MalformedPayload {
2812                        name: "<in-place root>".to_string(),
2813                        reason: "in-place list-string entry cursor overflows usize",
2814                    })?;
2815            if cursor + 4 > self.bytes.len() {
2816                return Err(BufferError::MalformedPayload {
2817                    name: "<in-place root>".to_string(),
2818                    reason: "in-place list-string entry pointer exceeds buffer end",
2819                });
2820            }
2821            let mut entry_buf = [0u8; 4];
2822            entry_buf.copy_from_slice(&self.bytes[cursor..cursor + 4]);
2823            let record_start = u32::from_le_bytes(entry_buf) as usize;
2824            if record_start + 4 > self.bytes.len() {
2825                return Err(BufferError::MalformedPayload {
2826                    name: "<in-place root>".to_string(),
2827                    reason: "in-place list-string len prefix exceeds buffer end",
2828                });
2829            }
2830            let mut sl_buf = [0u8; 4];
2831            sl_buf.copy_from_slice(&self.bytes[record_start..record_start + 4]);
2832            let str_len = u32::from_le_bytes(sl_buf) as usize;
2833            let payload_start = record_start + 4;
2834            let payload_end = payload_start.checked_add(str_len).ok_or_else(|| {
2835                BufferError::MalformedPayload {
2836                    name: "<in-place root>".to_string(),
2837                    reason: "in-place list-string payload end overflows usize",
2838                }
2839            })?;
2840            if payload_end > self.bytes.len() {
2841                return Err(BufferError::MalformedPayload {
2842                    name: "<in-place root>".to_string(),
2843                    reason: "in-place list-string payload exceeds buffer end",
2844                });
2845            }
2846            let s = std::str::from_utf8(&self.bytes[payload_start..payload_end]).map_err(|_| {
2847                BufferError::MalformedPayload {
2848                    name: "<in-place root>".to_string(),
2849                    reason: "in-place list-string payload is not valid utf-8",
2850                }
2851            })?;
2852            out.push(s);
2853        }
2854        Ok(out)
2855    }
2856
2857    /// Read a `List<Schema>` from `field_name`. Returns a vector of
2858    /// [`BufferReader`]s, one per entry, each anchored at the
2859    /// matching sub-record's fixed area. The readers share the parent
2860    /// buffer slice so subsequent String / Int / Schema reads through
2861    /// them resolve back into the same tail area.
2862    pub fn read_list_record<'b>(
2863        &self,
2864        field_name: &str,
2865        elem_layout: &'b OffsetTable,
2866        elem_schema: &'b Schema,
2867    ) -> Result<Vec<BufferReader<'a>>, BufferError>
2868    where
2869        'b: 'a,
2870    {
2871        let entry = self.find_entry(field_name)?;
2872        match &entry.ty {
2873            TypeRepr::List { element } => match element.as_ref() {
2874                TypeRepr::Schema { schema } => {
2875                    if schema.as_ref() != elem_schema {
2876                        return Err(BufferError::TypeMismatch {
2877                            name: field_name.to_string(),
2878                            declared: type_label(&entry.ty),
2879                            requested: "List<Schema>",
2880                        });
2881                    }
2882                }
2883                _ => {
2884                    return Err(BufferError::TypeMismatch {
2885                        name: field_name.to_string(),
2886                        declared: type_label(&entry.ty),
2887                        requested: "List<Schema>",
2888                    });
2889                }
2890            },
2891            _ => {
2892                return Err(BufferError::TypeMismatch {
2893                    name: field_name.to_string(),
2894                    declared: type_label(&entry.ty),
2895                    requested: "List<Schema>",
2896                });
2897            }
2898        }
2899        if !matches!(entry.kind, FieldKind::PointerIndirect { .. }) {
2900            return Err(BufferError::MalformedPayload {
2901                name: field_name.to_string(),
2902                reason: "expected pointer-indirect kind",
2903            });
2904        }
2905        let (count, entries_start) = self.decode_pointer_header(field_name, entry.offset, 0)?;
2906        let mut out: Vec<BufferReader<'a>> = Vec::with_capacity(count);
2907        for i in 0..count {
2908            let cursor = entries_start + i * 4;
2909            if cursor + 4 > self.bytes.len() {
2910                return Err(BufferError::MalformedPayload {
2911                    name: field_name.to_string(),
2912                    reason: "list entry pointer exceeds buffer end",
2913                });
2914            }
2915            let mut entry_buf = [0u8; 4];
2916            entry_buf.copy_from_slice(&self.bytes[cursor..cursor + 4]);
2917            let sub_base = u32::from_le_bytes(entry_buf) as usize;
2918            let sub_end = sub_base.checked_add(elem_layout.root_size).ok_or_else(|| {
2919                BufferError::MalformedPayload {
2920                    name: field_name.to_string(),
2921                    reason: "list sub-record end overflows usize",
2922                }
2923            })?;
2924            if sub_end > self.bytes.len() {
2925                return Err(BufferError::MalformedPayload {
2926                    name: field_name.to_string(),
2927                    reason: "list sub-record exceeds buffer end",
2928                });
2929            }
2930            let field_index = elem_layout
2931                .fields
2932                .iter()
2933                .filter_map(|fo| {
2934                    elem_schema
2935                        .fields
2936                        .iter()
2937                        .find(|f| f.name == fo.name)
2938                        .map(|f| FieldEntry {
2939                            name: fo.name.clone(),
2940                            ty: f.ty.clone(),
2941                            offset: sub_base + fo.offset,
2942                            size: fo.size,
2943                            kind: fo.kind,
2944                            list_element: fo.list_element,
2945                        })
2946                })
2947                .collect();
2948            out.push(BufferReader {
2949                layout: elem_layout,
2950                field_index,
2951                bytes: self.bytes,
2952            });
2953        }
2954        Ok(out)
2955    }
2956
2957    /// Read a `List<Schema>` whose **outer pointer-array header** sits
2958    /// directly at `header_off` (a region-relative offset into
2959    /// `self.bytes`), rather than being reached through a record's
2960    /// fixed-area slot.
2961    ///
2962    /// This is the reader half of the in-place region-walk return ABI
2963    /// (S4). The machine code returns the arena-absolute offset of the
2964    /// root list header `[len][off_0]…[off_{N-1}]`; the host rebases it to
2965    /// a region-relative offset, runs [`crate::verifier::verify_value_at`]
2966    /// over the whole reachable graph (outer entries → each sub-record's
2967    /// fixed area → every String / List field pointer the sub-record
2968    /// carries), and only then calls this to decode in place. Each entry
2969    /// pointer names a sub-record anchored at `elem_layout`; a
2970    /// [`BufferReader`] is produced per entry sharing the same buffer
2971    /// slice, so subsequent field reads (`read_string`, `read_list_int`,
2972    /// …) resolve back into the same tail area — bit-identical to the
2973    /// field-slot [`Self::read_list_record`] path the tree-walk oracle's
2974    /// writer feeds.
2975    pub fn read_list_record_at<'b>(
2976        &self,
2977        header_off: usize,
2978        elem_layout: &'b OffsetTable,
2979        elem_schema: &'b Schema,
2980    ) -> Result<Vec<BufferReader<'a>>, BufferError>
2981    where
2982        'b: 'a,
2983    {
2984        // Header sits directly at `header_off`: `[len][off_0]…`.
2985        if header_off
2986            .checked_add(4)
2987            .map(|end| end > self.bytes.len())
2988            .unwrap_or(true)
2989        {
2990            return Err(BufferError::MalformedPayload {
2991                name: "<in-place root>".to_string(),
2992                reason: "in-place list-record header length prefix exceeds buffer end",
2993            });
2994        }
2995        let mut len_buf = [0u8; 4];
2996        len_buf.copy_from_slice(&self.bytes[header_off..header_off + 4]);
2997        let count = u32::from_le_bytes(len_buf) as usize;
2998        let entries_start = header_off + 4;
2999        let mut out: Vec<BufferReader<'a>> = Vec::with_capacity(count);
3000        for i in 0..count {
3001            let cursor =
3002                entries_start
3003                    .checked_add(i * 4)
3004                    .ok_or_else(|| BufferError::MalformedPayload {
3005                        name: "<in-place root>".to_string(),
3006                        reason: "in-place list-record entry cursor overflows usize",
3007                    })?;
3008            if cursor + 4 > self.bytes.len() {
3009                return Err(BufferError::MalformedPayload {
3010                    name: "<in-place root>".to_string(),
3011                    reason: "in-place list-record entry pointer exceeds buffer end",
3012                });
3013            }
3014            let mut entry_buf = [0u8; 4];
3015            entry_buf.copy_from_slice(&self.bytes[cursor..cursor + 4]);
3016            let sub_base = u32::from_le_bytes(entry_buf) as usize;
3017            let sub_end = sub_base.checked_add(elem_layout.root_size).ok_or_else(|| {
3018                BufferError::MalformedPayload {
3019                    name: "<in-place root>".to_string(),
3020                    reason: "in-place list-record sub-record end overflows usize",
3021                }
3022            })?;
3023            if sub_end > self.bytes.len() {
3024                return Err(BufferError::MalformedPayload {
3025                    name: "<in-place root>".to_string(),
3026                    reason: "in-place list-record sub-record exceeds buffer end",
3027                });
3028            }
3029            let field_index = elem_layout
3030                .fields
3031                .iter()
3032                .filter_map(|fo| {
3033                    elem_schema
3034                        .fields
3035                        .iter()
3036                        .find(|f| f.name == fo.name)
3037                        .map(|f| FieldEntry {
3038                            name: fo.name.clone(),
3039                            ty: f.ty.clone(),
3040                            offset: sub_base + fo.offset,
3041                            size: fo.size,
3042                            kind: fo.kind,
3043                            list_element: fo.list_element,
3044                        })
3045                })
3046                .collect();
3047            out.push(BufferReader {
3048                layout: elem_layout,
3049                field_index,
3050                bytes: self.bytes,
3051            });
3052        }
3053        Ok(out)
3054    }
3055
3056    /// Recursively decode a `List<element>` whose **header sits directly
3057    /// at `header_off`** (a region-relative offset into `self.bytes`) into
3058    /// a `Vec<Value>`, dispatching on the declared `element` type. This is
3059    /// the unified in-place list reader behind the F5 doubly-nested
3060    /// pointer-array shapes (`List<List<String>>` / `List<List<Schema>>`):
3061    /// the outer call reads the outer pointer array, and each entry —
3062    /// being itself a `List<inner>` — recurses one level deeper, exactly
3063    /// the depth the verifier already certified. Every offset / length is
3064    /// bounds-checked against `self.bytes`; a non-UTF-8 String payload is a
3065    /// loud error. The produced values are bit-identical to the field-slot
3066    /// readers the tree-walk oracle's writer feeds.
3067    pub fn read_list_value_at(
3068        &self,
3069        header_off: usize,
3070        element: &TypeRepr,
3071    ) -> Result<Vec<crate::value::Value>, BufferError> {
3072        use crate::value::Value;
3073        match element {
3074            // Inline-fixed scalar inner list / pointer-array String /
3075            // nested-list element: dispatch through the existing readers.
3076            TypeRepr::Int | TypeRepr::Float | TypeRepr::Bool => {
3077                // A `List<scalar>` whose header is at `header_off`:
3078                // `[len][pad][payload]`. Decode directly.
3079                self.read_inline_scalar_list_at(header_off, element)
3080            }
3081            TypeRepr::String => Ok(self
3082                .read_list_string_at(header_off)?
3083                .into_iter()
3084                .map(|s| Value::String(s.into()))
3085                .collect()),
3086            TypeRepr::List { element: inner } => {
3087                // Outer pointer array; recurse per entry into the inner
3088                // list it points at.
3089                let (count, entries_start) = self.read_pointer_array_header(header_off)?;
3090                let mut out = Vec::with_capacity(count);
3091                for i in 0..count {
3092                    let entry = self.read_entry_pointer(entries_start, i)?;
3093                    let inner_vals = self.read_list_value_at(entry, inner)?;
3094                    out.push(Value::List(std::sync::Arc::new(inner_vals)));
3095                }
3096                Ok(out)
3097            }
3098            TypeRepr::Schema { schema } => {
3099                // Outer pointer array; each entry points at a sub-record's
3100                // fixed area. Decode each at its absolute base recursively.
3101                let elem_layout = SchemaLayout::offsets_for(schema).map_err(|_| {
3102                    BufferError::MalformedPayload {
3103                        name: "<in-place root>".to_string(),
3104                        reason: "inner List<Schema> element schema is not layoutable",
3105                    }
3106                })?;
3107                let (count, entries_start) = self.read_pointer_array_header(header_off)?;
3108                let mut out = Vec::with_capacity(count);
3109                for i in 0..count {
3110                    let sub_base = self.read_entry_pointer(entries_start, i)?;
3111                    let sub_end = sub_base.checked_add(elem_layout.root_size).ok_or_else(|| {
3112                        BufferError::MalformedPayload {
3113                            name: "<in-place root>".to_string(),
3114                            reason: "in-place sub-record end overflows usize",
3115                        }
3116                    })?;
3117                    if sub_end > self.bytes.len() {
3118                        return Err(BufferError::MalformedPayload {
3119                            name: "<in-place root>".to_string(),
3120                            reason: "in-place sub-record exceeds buffer end",
3121                        });
3122                    }
3123                    out.push(self.read_record_at(sub_base, &elem_layout, schema)?);
3124                }
3125                Ok(out)
3126            }
3127            TypeRepr::Option { .. } | TypeRepr::Result { .. } | TypeRepr::Enum { .. } => {
3128                let (count, entries_start) = self.read_pointer_array_header(header_off)?;
3129                let mut out = Vec::with_capacity(count);
3130                for i in 0..count {
3131                    let variant_base = self.read_entry_pointer(entries_start, i)?;
3132                    out.push(self.read_variant_record_at(variant_base, element)?);
3133                }
3134                Ok(out)
3135            }
3136            other => Err(BufferError::TypeMismatch {
3137                name: "<in-place root>".to_string(),
3138                declared: type_label(other),
3139                requested: "List<scalar/String/Schema/List/Option/Result>",
3140            }),
3141        }
3142    }
3143
3144    /// Field-slot entry point for the recursive list reader: resolve the
3145    /// pointer-indirect `field_name` slot to its list header offset, then
3146    /// decode through [`Self::read_list_value_at`]. `element` is the
3147    /// outer list's element type. Used by the object / sub-record decode
3148    /// path so a `List<List<String|Schema>>` field decodes to the same
3149    /// `Vec<Value>` the in-place return path produces.
3150    pub fn read_list_value(
3151        &self,
3152        field_name: &str,
3153        element: &TypeRepr,
3154    ) -> Result<Vec<crate::value::Value>, BufferError> {
3155        let entry = self.find_entry(field_name)?;
3156        if !matches!(entry.kind, FieldKind::PointerIndirect { .. }) {
3157            return Err(BufferError::MalformedPayload {
3158                name: field_name.to_string(),
3159                reason: "expected pointer-indirect list slot",
3160            });
3161        }
3162        let header_off = self.read_slot_pointer(entry.offset)?;
3163        self.read_list_value_at(header_off, element)
3164    }
3165
3166    /// Read the `[len][off_i]…` pointer-array header at `header_off`,
3167    /// returning `(count, entries_start)` after bounds-checking the length
3168    /// prefix.
3169    fn read_pointer_array_header(&self, header_off: usize) -> Result<(usize, usize), BufferError> {
3170        if header_off
3171            .checked_add(4)
3172            .map(|end| end > self.bytes.len())
3173            .unwrap_or(true)
3174        {
3175            return Err(BufferError::MalformedPayload {
3176                name: "<in-place root>".to_string(),
3177                reason: "in-place list header length prefix exceeds buffer end",
3178            });
3179        }
3180        let mut len_buf = [0u8; 4];
3181        len_buf.copy_from_slice(&self.bytes[header_off..header_off + 4]);
3182        let count = u32::from_le_bytes(len_buf) as usize;
3183        Ok((count, header_off + 4))
3184    }
3185
3186    /// Read the `i`-th entry pointer (a `u32`) of a pointer array whose
3187    /// entries start at `entries_start`, bounds-checked.
3188    fn read_entry_pointer(&self, entries_start: usize, i: usize) -> Result<usize, BufferError> {
3189        let cursor =
3190            entries_start
3191                .checked_add(i * 4)
3192                .ok_or_else(|| BufferError::MalformedPayload {
3193                    name: "<in-place root>".to_string(),
3194                    reason: "in-place list entry cursor overflows usize",
3195                })?;
3196        if cursor + 4 > self.bytes.len() {
3197            return Err(BufferError::MalformedPayload {
3198                name: "<in-place root>".to_string(),
3199                reason: "in-place list entry pointer exceeds buffer end",
3200            });
3201        }
3202        let mut entry_buf = [0u8; 4];
3203        entry_buf.copy_from_slice(&self.bytes[cursor..cursor + 4]);
3204        Ok(u32::from_le_bytes(entry_buf) as usize)
3205    }
3206
3207    /// Decode a `List<scalar>` whose `[len][pad][payload]` record sits at
3208    /// `header_off`, dispatching on the scalar element type. Mirrors the
3209    /// field-slot `read_list_int` / `read_list_float` / `read_list_bool`
3210    /// element decode.
3211    fn read_inline_scalar_list_at(
3212        &self,
3213        header_off: usize,
3214        element: &TypeRepr,
3215    ) -> Result<Vec<crate::value::Value>, BufferError> {
3216        use crate::value::Value;
3217        if header_off
3218            .checked_add(4)
3219            .map(|end| end > self.bytes.len())
3220            .unwrap_or(true)
3221        {
3222            return Err(BufferError::MalformedPayload {
3223                name: "<in-place root>".to_string(),
3224                reason: "inline scalar list header length prefix exceeds buffer end",
3225            });
3226        }
3227        let mut len_buf = [0u8; 4];
3228        len_buf.copy_from_slice(&self.bytes[header_off..header_off + 4]);
3229        let count = u32::from_le_bytes(len_buf) as usize;
3230        let (elem_size, align): (usize, usize) = match element {
3231            TypeRepr::Int | TypeRepr::Float => (8, 8),
3232            TypeRepr::Bool => (1, 1),
3233            other => {
3234                return Err(BufferError::TypeMismatch {
3235                    name: "<in-place root>".to_string(),
3236                    declared: type_label(other),
3237                    requested: "List<scalar>",
3238                })
3239            }
3240        };
3241        let payload_start = if align > 1 {
3242            (header_off + 4).next_multiple_of(align)
3243        } else {
3244            header_off + 4
3245        };
3246        let byte_len =
3247            count
3248                .checked_mul(elem_size)
3249                .ok_or_else(|| BufferError::MalformedPayload {
3250                    name: "<in-place root>".to_string(),
3251                    reason: "inline scalar list byte length overflows usize",
3252                })?;
3253        let end =
3254            payload_start
3255                .checked_add(byte_len)
3256                .ok_or_else(|| BufferError::MalformedPayload {
3257                    name: "<in-place root>".to_string(),
3258                    reason: "inline scalar list payload end overflows usize",
3259                })?;
3260        if end > self.bytes.len() {
3261            return Err(BufferError::MalformedPayload {
3262                name: "<in-place root>".to_string(),
3263                reason: "inline scalar list payload exceeds buffer end",
3264            });
3265        }
3266        let mut out = Vec::with_capacity(count);
3267        for k in 0..count {
3268            let off = payload_start + k * elem_size;
3269            match element {
3270                TypeRepr::Int => {
3271                    let mut b = [0u8; 8];
3272                    b.copy_from_slice(&self.bytes[off..off + 8]);
3273                    out.push(Value::Int(i64::from_le_bytes(b)));
3274                }
3275                TypeRepr::Float => {
3276                    let mut b = [0u8; 8];
3277                    b.copy_from_slice(&self.bytes[off..off + 8]);
3278                    out.push(Value::Float(ordered_float::OrderedFloat(
3279                        f64::from_le_bytes(b),
3280                    )));
3281                }
3282                TypeRepr::Bool => out.push(Value::Bool(self.bytes[off] != 0)),
3283                _ => unreachable!("scalar element validated above"),
3284            }
3285        }
3286        Ok(out)
3287    }
3288
3289    /// Decode a `#schema` sub-record whose fixed area sits at the
3290    /// arena/region offset `record_base` into a branded `Value::Dict`,
3291    /// reading each field at `record_base + fo.offset`. Generic over field
3292    /// type — scalars, `String`, `List<…>` (via the recursive
3293    /// [`Self::read_list_value_at`]), and nested `Schema` — so a
3294    /// `List<List<Schema>>` element or a sub-record carrying a
3295    /// `List<List<String>>` field decodes to the same `Value` the
3296    /// tree-walk oracle produces. Bounds are checked at each pointer
3297    /// dereference; the verifier has already certified the whole graph.
3298    fn read_record_at(
3299        &self,
3300        record_base: usize,
3301        layout: &OffsetTable,
3302        schema: &Schema,
3303    ) -> Result<crate::value::Value, BufferError> {
3304        use crate::smol_str::SmolStr;
3305        use crate::value::Value;
3306        if schema.is_tuple {
3307            let mut items = Vec::with_capacity(schema.fields.len());
3308            for field in &schema.fields {
3309                let fo = layout
3310                    .fields
3311                    .iter()
3312                    .find(|fo| fo.name == field.name)
3313                    .ok_or_else(|| BufferError::MalformedPayload {
3314                        name: field.name.clone(),
3315                        reason: "tuple field missing from layout",
3316                    })?;
3317                let slot_abs = record_base.checked_add(fo.offset).ok_or_else(|| {
3318                    BufferError::MalformedPayload {
3319                        name: field.name.clone(),
3320                        reason: "tuple field slot offset overflows usize",
3321                    }
3322                })?;
3323                items.push(self.read_field_at(slot_abs, &field.ty)?);
3324            }
3325            return Ok(Value::Tuple(std::sync::Arc::new(items)));
3326        }
3327        let mut map = std::collections::BTreeMap::new();
3328        for field in &schema.fields {
3329            let fo = layout
3330                .fields
3331                .iter()
3332                .find(|fo| fo.name == field.name)
3333                .ok_or_else(|| BufferError::MalformedPayload {
3334                    name: field.name.clone(),
3335                    reason: "schema field missing from layout",
3336                })?;
3337            let slot_abs = record_base.checked_add(fo.offset).ok_or_else(|| {
3338                BufferError::MalformedPayload {
3339                    name: field.name.clone(),
3340                    reason: "field slot offset overflows usize",
3341                }
3342            })?;
3343            let v = self.read_field_at(slot_abs, &field.ty)?;
3344            map.insert(SmolStr::from(field.name.as_str()), v);
3345        }
3346        Ok(Value::branded_dict(map, Some(schema.name.clone())))
3347    }
3348
3349    /// Decode one field of declared type `ty` whose fixed-area slot sits
3350    /// at the absolute offset `slot_abs`. Scalars read inline; pointer-
3351    /// indirect fields (`String` / `List<…>` / `Schema`) read the `u32`
3352    /// slot and recurse.
3353    fn read_field_at(
3354        &self,
3355        slot_abs: usize,
3356        ty: &TypeRepr,
3357    ) -> Result<crate::value::Value, BufferError> {
3358        use crate::value::Value;
3359        let read_inline = |abs: usize, len: usize| -> Result<&[u8], BufferError> {
3360            let end = abs
3361                .checked_add(len)
3362                .ok_or_else(|| BufferError::MalformedPayload {
3363                    name: "<sub-record field>".to_string(),
3364                    reason: "inline field span overflows usize",
3365                })?;
3366            if end > self.bytes.len() {
3367                return Err(BufferError::MalformedPayload {
3368                    name: "<sub-record field>".to_string(),
3369                    reason: "inline field span exceeds buffer end",
3370                });
3371            }
3372            Ok(&self.bytes[abs..end])
3373        };
3374        match ty {
3375            TypeRepr::Int => {
3376                let b = read_inline(slot_abs, 8)?;
3377                Ok(Value::Int(i64::from_le_bytes(b.try_into().unwrap())))
3378            }
3379            TypeRepr::Float => {
3380                let b = read_inline(slot_abs, 8)?;
3381                Ok(Value::Float(ordered_float::OrderedFloat(
3382                    f64::from_le_bytes(b.try_into().unwrap()),
3383                )))
3384            }
3385            TypeRepr::Bool => {
3386                let b = read_inline(slot_abs, 1)?;
3387                Ok(Value::Bool(b[0] != 0))
3388            }
3389            TypeRepr::Unit => Ok(Value::option_none()),
3390            TypeRepr::String => {
3391                let ptr = self.read_slot_pointer(slot_abs)?;
3392                Ok(Value::String(self.read_string_record_at(ptr)?.into()))
3393            }
3394            TypeRepr::List { element } => {
3395                let header = self.read_slot_pointer(slot_abs)?;
3396                let vals = self.read_list_value_at(header, element)?;
3397                Ok(Value::List(std::sync::Arc::new(vals)))
3398            }
3399            TypeRepr::Schema { schema } => {
3400                let sub_layout = SchemaLayout::offsets_for(schema).map_err(|_| {
3401                    BufferError::MalformedPayload {
3402                        name: "<sub-record field>".to_string(),
3403                        reason: "nested schema field is not layoutable",
3404                    }
3405                })?;
3406                let sub_base = self.read_slot_pointer(slot_abs)?;
3407                self.read_record_at(sub_base, &sub_layout, schema)
3408            }
3409            TypeRepr::Option { .. } | TypeRepr::Result { .. } | TypeRepr::Enum { .. } => {
3410                let variant_base = self.read_slot_pointer(slot_abs)?;
3411                self.read_variant_record_at(variant_base, ty)
3412            }
3413            other => Err(BufferError::TypeMismatch {
3414                name: "<sub-record field>".to_string(),
3415                declared: type_label(other),
3416                requested: "scalar/String/List/Schema/Option/Result",
3417            }),
3418        }
3419    }
3420
3421    fn read_variant_record_at(
3422        &self,
3423        record_off: usize,
3424        ty: &TypeRepr,
3425    ) -> Result<crate::value::Value, BufferError> {
3426        use crate::smol_str::SmolStr;
3427        use crate::value::Value;
3428        if record_off
3429            .checked_add(1)
3430            .map(|end| end > self.bytes.len())
3431            .unwrap_or(true)
3432        {
3433            return Err(BufferError::MalformedPayload {
3434                name: "<variant>".to_string(),
3435                reason: "variant tag exceeds buffer end",
3436            });
3437        }
3438        let tag = self.bytes[record_off];
3439        let selected =
3440            variant_selected_payload(ty, tag).map_err(|reason| BufferError::MalformedPayload {
3441                name: "<variant>".to_string(),
3442                reason,
3443            })?;
3444        match ty {
3445            TypeRepr::Option { .. } => {
3446                match selected.payload {
3447                    None => Ok(Value::option_none()),
3448                    Some(payload) => {
3449                        let slot = variant_payload_slot_offset(record_off, &payload.ty)
3450                            .ok_or_else(|| BufferError::MalformedPayload {
3451                                name: "<variant>".to_string(),
3452                                reason: "variant payload slot offset overflows usize",
3453                            })?;
3454                        let value = self.read_field_at(slot, &payload.ty)?;
3455                        Ok(Value::option_some(value))
3456                    }
3457                }
3458            }
3459            TypeRepr::Result { .. } => {
3460                let Some(payload) = selected.payload else {
3461                    return Err(BufferError::MalformedPayload {
3462                        name: "<variant>".to_string(),
3463                        reason: "Result variant has no payload",
3464                    });
3465                };
3466                let slot =
3467                    variant_payload_slot_offset(record_off, &payload.ty).ok_or_else(|| {
3468                        BufferError::MalformedPayload {
3469                            name: "<variant>".to_string(),
3470                            reason: "variant payload slot offset overflows usize",
3471                        }
3472                    })?;
3473                let value = self.read_field_at(slot, &payload.ty)?;
3474                let mut map = std::collections::BTreeMap::new();
3475                map.insert(SmolStr::from(payload.key.unwrap_or("value")), value);
3476                Ok(Value::variant_dict(
3477                    map,
3478                    selected.name,
3479                    "Result".to_string(),
3480                ))
3481            }
3482            TypeRepr::Enum { name, .. } => {
3483                let mut map = std::collections::BTreeMap::new();
3484                if let Some(payload) = selected.payload {
3485                    let slot =
3486                        variant_payload_slot_offset(record_off, &payload.ty).ok_or_else(|| {
3487                            BufferError::MalformedPayload {
3488                                name: "<variant>".to_string(),
3489                                reason: "variant payload slot offset overflows usize",
3490                            }
3491                        })?;
3492                    let payload_value = self.read_field_at(slot, &payload.ty)?;
3493                    let Value::Dict(dict) = payload_value else {
3494                        return Err(BufferError::MalformedPayload {
3495                            name: "<variant>".to_string(),
3496                            reason: "enum payload did not decode to a record",
3497                        });
3498                    };
3499                    map = dict.map.clone();
3500                }
3501                Ok(Value::variant_dict(map, selected.name, name.clone()))
3502            }
3503            other => Err(BufferError::TypeMismatch {
3504                name: "<variant>".to_string(),
3505                declared: type_label(other),
3506                requested: "variant record",
3507            }),
3508        }
3509    }
3510
3511    /// Read a `u32` pointer slot at the absolute offset `slot_abs`,
3512    /// bounds-checked, returning the pointed-at offset.
3513    fn read_slot_pointer(&self, slot_abs: usize) -> Result<usize, BufferError> {
3514        if slot_abs
3515            .checked_add(4)
3516            .map(|end| end > self.bytes.len())
3517            .unwrap_or(true)
3518        {
3519            return Err(BufferError::MalformedPayload {
3520                name: "<sub-record field>".to_string(),
3521                reason: "pointer slot exceeds buffer end",
3522            });
3523        }
3524        let mut b = [0u8; 4];
3525        b.copy_from_slice(&self.bytes[slot_abs..slot_abs + 4]);
3526        Ok(u32::from_le_bytes(b) as usize)
3527    }
3528
3529    /// Read a `[len: u32][utf8]` String record at `record_off`,
3530    /// bounds-checked, validating UTF-8.
3531    fn read_string_record_at(&self, record_off: usize) -> Result<&'a str, BufferError> {
3532        if record_off
3533            .checked_add(4)
3534            .map(|end| end > self.bytes.len())
3535            .unwrap_or(true)
3536        {
3537            return Err(BufferError::MalformedPayload {
3538                name: "<sub-record field>".to_string(),
3539                reason: "string len prefix exceeds buffer end",
3540            });
3541        }
3542        let mut b = [0u8; 4];
3543        b.copy_from_slice(&self.bytes[record_off..record_off + 4]);
3544        let len = u32::from_le_bytes(b) as usize;
3545        let start = record_off + 4;
3546        let end = start
3547            .checked_add(len)
3548            .ok_or_else(|| BufferError::MalformedPayload {
3549                name: "<sub-record field>".to_string(),
3550                reason: "string payload end overflows usize",
3551            })?;
3552        if end > self.bytes.len() {
3553            return Err(BufferError::MalformedPayload {
3554                name: "<sub-record field>".to_string(),
3555                reason: "string payload exceeds buffer end",
3556            });
3557        }
3558        std::str::from_utf8(&self.bytes[start..end]).map_err(|_| BufferError::MalformedPayload {
3559            name: "<sub-record field>".to_string(),
3560            reason: "string payload is not valid utf-8",
3561        })
3562    }
3563
3564    /// Read a `List<List<scalar>>` from `field_name` as a vector of
3565    /// inner `Vec<Value>`s. The outer slot points at a pointer-array
3566    /// header `[len][off_0]…[off_{N-1}]` whose entries name per-element
3567    /// inner records `[len: u32][pad to inner_align][payload]` — exactly
3568    /// what [`write_nested_scalar_list`] / `write_list_list_with` emit.
3569    /// Each inner record is decoded per the innermost scalar type
3570    /// (`Int` / `Float` / `Bool`); inner pointer-array elements
3571    /// (`List<List<String>>` / `List<List<Schema>>`) are rejected by the
3572    /// layout pass before any buffer is produced, so they never reach
3573    /// here and are surfaced as a hard error if they somehow do.
3574    ///
3575    /// The walk is the reader-side mirror of the writer and shares the
3576    /// same single buffer base: every `off_i` and inner `[len]` prefix is
3577    /// resolved against `self.bytes`, so a sub-reader produced by
3578    /// [`Self::read_list_record`] / [`Self::sub_record`] (which re-bases
3579    /// the field index but keeps the same `bytes` slice) decodes a nested
3580    /// list field bit-identically to a top-level one.
3581    pub fn read_list_list(
3582        &self,
3583        field_name: &str,
3584    ) -> Result<Vec<Vec<crate::value::Value>>, BufferError> {
3585        let entry = self.find_entry(field_name)?;
3586        let inner = match &entry.ty {
3587            TypeRepr::List { element } => match element.as_ref() {
3588                TypeRepr::List { element: inner } => inner.as_ref().clone(),
3589                _ => {
3590                    return Err(BufferError::TypeMismatch {
3591                        name: field_name.to_string(),
3592                        declared: type_label(&entry.ty),
3593                        requested: "List<List<…>>",
3594                    });
3595                }
3596            },
3597            other => {
3598                return Err(BufferError::TypeMismatch {
3599                    name: field_name.to_string(),
3600                    declared: type_label(other),
3601                    requested: "List<List<…>>",
3602                });
3603            }
3604        };
3605        if !matches!(entry.kind, FieldKind::PointerIndirect { .. }) {
3606            return Err(BufferError::MalformedPayload {
3607                name: field_name.to_string(),
3608                reason: "expected pointer-indirect kind",
3609            });
3610        }
3611        // Per-inner-record alignment between the `[len]` prefix and the
3612        // payload, matching `write_nested_scalar_list`: Int / Float pad
3613        // the payload to 8, Bool packs at 1 (no pad past the 4-byte len).
3614        let inner_align: usize = match &inner {
3615            TypeRepr::Int | TypeRepr::Float => 8,
3616            TypeRepr::Bool => 1,
3617            other => {
3618                return Err(BufferError::MalformedPayload {
3619                    name: field_name.to_string(),
3620                    reason: match other {
3621                        TypeRepr::String | TypeRepr::Schema { .. } | TypeRepr::List { .. } => {
3622                            "nested list inner element is a pointer-array type (unsupported)"
3623                        }
3624                        _ => "nested list inner element is not an inline-fixed scalar",
3625                    },
3626                });
3627            }
3628        };
3629        // Outer pointer-array header: `[len][off_0]…`. `inner_alignment
3630        // = 0` keeps the payload start at `header + 4` (the off array).
3631        let (count, entries_start) = self.decode_pointer_header(field_name, entry.offset, 0)?;
3632        self.decode_list_list_rows(field_name, &inner, inner_align, count, entries_start)
3633    }
3634
3635    /// Decode a `List<List<scalar>>` value whose **outer pointer-array
3636    /// header** sits at `header_off` (a direct offset into `self.bytes`),
3637    /// rather than being reached through a record's fixed-area slot.
3638    ///
3639    /// This is the reader half of the in-place region-walk return ABI
3640    /// (S1). The machine code returns the arena-absolute offset of the
3641    /// root list header; the host rebases it to a region-relative offset,
3642    /// runs [`crate::verifier::verify_value_at`] over the whole reachable
3643    /// graph, and only then calls this to decode in place. `inner` is the
3644    /// innermost scalar element type (`Int` / `Float` / `Bool`) drained
3645    /// from the return layout. The decode shares
3646    /// [`Self::decode_list_list_rows`] with the field-slot
3647    /// [`Self::read_list_list`] path, so a top-level and an in-place
3648    /// decode of the same bytes are bit-identical.
3649    pub fn read_list_list_at(
3650        &self,
3651        header_off: usize,
3652        inner: &TypeRepr,
3653    ) -> Result<Vec<Vec<crate::value::Value>>, BufferError> {
3654        let inner_align: usize = match inner {
3655            TypeRepr::Int | TypeRepr::Float => 8,
3656            TypeRepr::Bool => 1,
3657            other => {
3658                return Err(BufferError::MalformedPayload {
3659                    name: "<in-place root>".to_string(),
3660                    reason: match other {
3661                        TypeRepr::String | TypeRepr::Schema { .. } | TypeRepr::List { .. } => {
3662                            "nested list inner element is a pointer-array type (unsupported)"
3663                        }
3664                        _ => "nested list inner element is not an inline-fixed scalar",
3665                    },
3666                });
3667            }
3668        };
3669        // The header sits directly at `header_off`: `[len][off_0]…`.
3670        // `entries_start = header_off + 4`; bounds-check the length
3671        // prefix against the buffer end first.
3672        if header_off
3673            .checked_add(4)
3674            .map(|end| end > self.bytes.len())
3675            .unwrap_or(true)
3676        {
3677            return Err(BufferError::MalformedPayload {
3678                name: "<in-place root>".to_string(),
3679                reason: "in-place list header length prefix exceeds buffer end",
3680            });
3681        }
3682        let mut len_buf = [0u8; 4];
3683        len_buf.copy_from_slice(&self.bytes[header_off..header_off + 4]);
3684        let count = u32::from_le_bytes(len_buf) as usize;
3685        let entries_start = header_off + 4;
3686        self.decode_list_list_rows("<in-place root>", inner, inner_align, count, entries_start)
3687    }
3688
3689    /// Shared row-decode for both the field-slot
3690    /// ([`Self::read_list_list`]) and direct-offset
3691    /// ([`Self::read_list_list_at`]) nested-list paths. Walks `count`
3692    /// pointer-array entries from `entries_start`, following each to its
3693    /// inner `[len][pad to inner_align][payload]` record and decoding the
3694    /// payload per the innermost scalar `inner` type. Every offset and
3695    /// length is bounds-checked against `self.bytes` before any read.
3696    fn decode_list_list_rows(
3697        &self,
3698        field_name: &str,
3699        inner: &TypeRepr,
3700        inner_align: usize,
3701        count: usize,
3702        entries_start: usize,
3703    ) -> Result<Vec<Vec<crate::value::Value>>, BufferError> {
3704        use crate::value::Value;
3705        let mut out: Vec<Vec<Value>> = Vec::with_capacity(count);
3706        for i in 0..count {
3707            let cursor =
3708                entries_start
3709                    .checked_add(i * 4)
3710                    .ok_or_else(|| BufferError::MalformedPayload {
3711                        name: field_name.to_string(),
3712                        reason: "nested list entry cursor overflows usize",
3713                    })?;
3714            if cursor + 4 > self.bytes.len() {
3715                return Err(BufferError::MalformedPayload {
3716                    name: field_name.to_string(),
3717                    reason: "nested list entry pointer exceeds buffer end",
3718                });
3719            }
3720            let mut entry_buf = [0u8; 4];
3721            entry_buf.copy_from_slice(&self.bytes[cursor..cursor + 4]);
3722            let rec_start = u32::from_le_bytes(entry_buf) as usize;
3723            if rec_start + 4 > self.bytes.len() {
3724                return Err(BufferError::MalformedPayload {
3725                    name: field_name.to_string(),
3726                    reason: "nested list inner len prefix exceeds buffer end",
3727                });
3728            }
3729            let mut len_buf = [0u8; 4];
3730            len_buf.copy_from_slice(&self.bytes[rec_start..rec_start + 4]);
3731            let inner_count = u32::from_le_bytes(len_buf) as usize;
3732            let payload_start = if inner_align > 1 {
3733                (rec_start + 4).next_multiple_of(inner_align)
3734            } else {
3735                rec_start + 4
3736            };
3737            let mut inner_vec: Vec<Value> = Vec::with_capacity(inner_count);
3738            match inner {
3739                TypeRepr::Int => {
3740                    let end = payload_start
3741                        .checked_add(inner_count.checked_mul(8).ok_or_else(|| {
3742                            BufferError::MalformedPayload {
3743                                name: field_name.to_string(),
3744                                reason: "nested Int payload byte length overflows usize",
3745                            }
3746                        })?)
3747                        .ok_or_else(|| BufferError::MalformedPayload {
3748                            name: field_name.to_string(),
3749                            reason: "nested Int payload end overflows usize",
3750                        })?;
3751                    if end > self.bytes.len() {
3752                        return Err(BufferError::MalformedPayload {
3753                            name: field_name.to_string(),
3754                            reason: "nested Int payload exceeds buffer end",
3755                        });
3756                    }
3757                    let mut c = payload_start;
3758                    for _ in 0..inner_count {
3759                        let mut b = [0u8; 8];
3760                        b.copy_from_slice(&self.bytes[c..c + 8]);
3761                        inner_vec.push(Value::Int(i64::from_le_bytes(b)));
3762                        c += 8;
3763                    }
3764                }
3765                TypeRepr::Float => {
3766                    let end = payload_start
3767                        .checked_add(inner_count.checked_mul(8).ok_or_else(|| {
3768                            BufferError::MalformedPayload {
3769                                name: field_name.to_string(),
3770                                reason: "nested Float payload byte length overflows usize",
3771                            }
3772                        })?)
3773                        .ok_or_else(|| BufferError::MalformedPayload {
3774                            name: field_name.to_string(),
3775                            reason: "nested Float payload end overflows usize",
3776                        })?;
3777                    if end > self.bytes.len() {
3778                        return Err(BufferError::MalformedPayload {
3779                            name: field_name.to_string(),
3780                            reason: "nested Float payload exceeds buffer end",
3781                        });
3782                    }
3783                    let mut c = payload_start;
3784                    for _ in 0..inner_count {
3785                        let mut b = [0u8; 8];
3786                        b.copy_from_slice(&self.bytes[c..c + 8]);
3787                        inner_vec.push(Value::Float(ordered_float::OrderedFloat(
3788                            f64::from_le_bytes(b),
3789                        )));
3790                        c += 8;
3791                    }
3792                }
3793                TypeRepr::Bool => {
3794                    let end = payload_start.checked_add(inner_count).ok_or_else(|| {
3795                        BufferError::MalformedPayload {
3796                            name: field_name.to_string(),
3797                            reason: "nested Bool payload end overflows usize",
3798                        }
3799                    })?;
3800                    if end > self.bytes.len() {
3801                        return Err(BufferError::MalformedPayload {
3802                            name: field_name.to_string(),
3803                            reason: "nested Bool payload exceeds buffer end",
3804                        });
3805                    }
3806                    for k in 0..inner_count {
3807                        inner_vec.push(Value::Bool(self.bytes[payload_start + k] != 0));
3808                    }
3809                }
3810                _ => unreachable!("inner_align guard already rejected non-scalar inner"),
3811            }
3812            out.push(inner_vec);
3813        }
3814        Ok(out)
3815    }
3816
3817    /// Decode the buffer-relative pointer slot for a nested branded
3818    /// dict field. Returns a fresh [`BufferReader`] anchored at the
3819    /// sub-record's fixed-area base, sharing the parent's underlying
3820    /// byte buffer.
3821    ///
3822    /// Phase 3.b: branded dict fields lay their fixed area in the
3823    /// parent's tail area, addressed through a 4-byte pointer slot in
3824    /// the parent's fixed area. The sub-record's own pointer-indirect
3825    /// children (its String / `List<Int>` / nested Dict slots) keep
3826    /// pointing into the same shared buffer — `sub_record` borrows the
3827    /// parent bytes verbatim, so a subsequent `read_string` on the
3828    /// sub-reader resolves through the same tail area without copying.
3829    ///
3830    /// `sub_layout` is the [`OffsetTable`] for the sub-schema (e.g.
3831    /// computed via [`crate::layout::SchemaLayout::offsets_for`]).
3832    /// `sub_fields` is the schema-declared field list — pass
3833    /// `sub_schema.fields.as_slice()` so the returned reader can
3834    /// type-check its own field accesses.
3835    pub fn sub_record(
3836        &self,
3837        field_name: &str,
3838        sub_layout: &'a OffsetTable,
3839        sub_fields: &[Field],
3840    ) -> Result<BufferReader<'a>, BufferError> {
3841        // Locate the parent's pointer slot. We don't constrain the
3842        // declared `TypeRepr` here because the caller already knows
3843        // the sub-schema — re-matching it would duplicate the schema
3844        // walker. Use a wildcard type check via an opt-in helper to
3845        // get the offset + kind back.
3846        let entry = self
3847            .field_index
3848            .iter()
3849            .find(|e| e.name == field_name)
3850            .ok_or_else(|| BufferError::UnknownField {
3851                name: field_name.to_string(),
3852            })?;
3853        if !matches!(entry.ty, TypeRepr::Schema { .. }) {
3854            return Err(BufferError::TypeMismatch {
3855                name: field_name.to_string(),
3856                declared: type_label(&entry.ty),
3857                requested: "Schema",
3858            });
3859        }
3860        if !matches!(entry.kind, FieldKind::PointerIndirect { .. }) {
3861            return Err(BufferError::MalformedPayload {
3862                name: field_name.to_string(),
3863                reason: "expected pointer-indirect kind",
3864            });
3865        }
3866        let ptr_offset = entry.offset;
3867        if ptr_offset
3868            .checked_add(4)
3869            .map(|end| end > self.bytes.len())
3870            .unwrap_or(true)
3871        {
3872            return Err(BufferError::MalformedPayload {
3873                name: field_name.to_string(),
3874                reason: "pointer slot exceeds buffer end",
3875            });
3876        }
3877        let mut ptr_buf = [0u8; 4];
3878        ptr_buf.copy_from_slice(&self.bytes[ptr_offset..ptr_offset + 4]);
3879        let sub_base = u32::from_le_bytes(ptr_buf) as usize;
3880        let sub_end = sub_base.checked_add(sub_layout.root_size).ok_or_else(|| {
3881            BufferError::MalformedPayload {
3882                name: field_name.to_string(),
3883                reason: "sub-record end overflows usize",
3884            }
3885        })?;
3886        if sub_end > self.bytes.len() {
3887            return Err(BufferError::MalformedPayload {
3888                name: field_name.to_string(),
3889                reason: "sub-record exceeds buffer end",
3890            });
3891        }
3892        // The sub-reader walks the **same** underlying buffer slice —
3893        // not a slice starting at `sub_base`. Its `bytes` covers the
3894        // whole parent buffer because the sub-record's own pointer-
3895        // indirect slots store offsets relative to the buffer base.
3896        // We instead re-base each field by adding `sub_base` to the
3897        // declared offset, which `BufferReader::new` doesn't do for
3898        // us — so we build the field-index manually.
3899        let field_index = sub_layout
3900            .fields
3901            .iter()
3902            .filter_map(|fo| {
3903                sub_fields
3904                    .iter()
3905                    .find(|f| f.name == fo.name)
3906                    .map(|f| FieldEntry {
3907                        name: fo.name.clone(),
3908                        ty: f.ty.clone(),
3909                        offset: sub_base + fo.offset,
3910                        size: fo.size,
3911                        kind: fo.kind,
3912                        list_element: fo.list_element,
3913                    })
3914            })
3915            .collect();
3916        Ok(BufferReader {
3917            layout: sub_layout,
3918            field_index,
3919            bytes: self.bytes,
3920        })
3921    }
3922
3923    /// Resolve a fixed-area `u32` pointer slot into `(payload_count,
3924    /// payload_byte_offset)`. `inner_alignment` controls the padding
3925    /// between the length prefix and the payload bytes — `String`
3926    /// passes `0` (no padding); `List<Int>` passes `8`.
3927    fn decode_pointer_header(
3928        &self,
3929        field_name: &str,
3930        ptr_offset: usize,
3931        inner_alignment: usize,
3932    ) -> Result<(usize, usize), BufferError> {
3933        if ptr_offset
3934            .checked_add(4)
3935            .map(|end| end > self.bytes.len())
3936            .unwrap_or(true)
3937        {
3938            return Err(BufferError::MalformedPayload {
3939                name: field_name.to_string(),
3940                reason: "pointer slot exceeds buffer end",
3941            });
3942        }
3943        let mut ptr_buf = [0u8; 4];
3944        ptr_buf.copy_from_slice(&self.bytes[ptr_offset..ptr_offset + 4]);
3945        let record_start = u32::from_le_bytes(ptr_buf) as usize;
3946        if record_start
3947            .checked_add(4)
3948            .map(|end| end > self.bytes.len())
3949            .unwrap_or(true)
3950        {
3951            return Err(BufferError::MalformedPayload {
3952                name: field_name.to_string(),
3953                reason: "length prefix exceeds buffer end",
3954            });
3955        }
3956        let mut len_buf = [0u8; 4];
3957        len_buf.copy_from_slice(&self.bytes[record_start..record_start + 4]);
3958        let count = u32::from_le_bytes(len_buf) as usize;
3959        let payload_start_raw = record_start + 4;
3960        let payload_start = if inner_alignment > 1 {
3961            let rem = payload_start_raw % inner_alignment;
3962            if rem == 0 {
3963                payload_start_raw
3964            } else {
3965                payload_start_raw
3966                    .checked_add(inner_alignment - rem)
3967                    .ok_or_else(|| BufferError::MalformedPayload {
3968                        name: field_name.to_string(),
3969                        reason: "payload start overflows usize",
3970                    })?
3971            }
3972        } else {
3973            payload_start_raw
3974        };
3975        Ok((count, payload_start))
3976    }
3977
3978    fn locate(
3979        &self,
3980        field_name: &str,
3981        expected: &TypeRepr,
3982        requested_label: &'static str,
3983    ) -> Result<(usize, usize, FieldKind), BufferError> {
3984        let entry = self
3985            .field_index
3986            .iter()
3987            .find(|e| e.name == field_name)
3988            .ok_or_else(|| BufferError::UnknownField {
3989                name: field_name.to_string(),
3990            })?;
3991        if !type_matches(&entry.ty, expected) {
3992            return Err(BufferError::TypeMismatch {
3993                name: field_name.to_string(),
3994                declared: type_label(&entry.ty),
3995                requested: requested_label,
3996            });
3997        }
3998        Ok((entry.offset, entry.size, entry.kind))
3999    }
4000
4001    /// Find an entry by name. Used by the list readers when they need
4002    /// the carried `list_element` sidecar alongside the offset.
4003    fn find_entry(&self, field_name: &str) -> Result<&FieldEntry, BufferError> {
4004        self.field_index
4005            .iter()
4006            .find(|e| e.name == field_name)
4007            .ok_or_else(|| BufferError::UnknownField {
4008                name: field_name.to_string(),
4009            })
4010    }
4011
4012    /// Read-only access to the layout this reader walks.
4013    #[allow(dead_code)]
4014    pub(crate) fn layout(&self) -> &OffsetTable {
4015        self.layout
4016    }
4017}
4018
4019/// Compare a schema-declared [`TypeRepr`] against the one a writer /
4020/// reader assumed. Phase 3.b extends the match set to include nested
4021/// branded `Schema { ... }` slots — the sub-record reader compares
4022/// schemas by structural equality of the canonical form.
4023fn type_matches(declared: &TypeRepr, requested: &TypeRepr) -> bool {
4024    match (declared, requested) {
4025        (TypeRepr::Int, TypeRepr::Int)
4026        | (TypeRepr::Float, TypeRepr::Float)
4027        | (TypeRepr::Bool, TypeRepr::Bool)
4028        | (TypeRepr::Unit, TypeRepr::Unit)
4029        | (TypeRepr::String, TypeRepr::String) => true,
4030        (TypeRepr::List { element: d }, TypeRepr::List { element: r }) => {
4031            type_matches(d.as_ref(), r.as_ref())
4032        }
4033        (TypeRepr::Option { inner: d }, TypeRepr::Option { inner: r }) => {
4034            type_matches(d.as_ref(), r.as_ref())
4035        }
4036        (TypeRepr::Result { ok: dok, err: derr }, TypeRepr::Result { ok: rok, err: rerr }) => {
4037            type_matches(dok.as_ref(), rok.as_ref()) && type_matches(derr.as_ref(), rerr.as_ref())
4038        }
4039        (TypeRepr::Schema { schema: d }, TypeRepr::Schema { schema: r }) => d == r,
4040        (
4041            TypeRepr::Enum {
4042                name: dn,
4043                variants: dv,
4044            },
4045            TypeRepr::Enum {
4046                name: rn,
4047                variants: rv,
4048            },
4049        ) => dn == rn && dv == rv,
4050        _ => false,
4051    }
4052}
4053
4054/// Human-readable label for the schema's declared type. Used in the
4055/// `TypeMismatch` error path so users see "Int" rather than `Debug`-
4056/// formatted `TypeRepr::Int`.
4057fn type_label(ty: &TypeRepr) -> &'static str {
4058    match ty {
4059        TypeRepr::Unit => "Unit",
4060        TypeRepr::Bool => "Bool",
4061        TypeRepr::Int => "Int",
4062        TypeRepr::Float => "Float",
4063        TypeRepr::String => "String",
4064        TypeRepr::List { .. } => "List",
4065        TypeRepr::Option { .. } => "Option",
4066        TypeRepr::Result { .. } => "Result",
4067        TypeRepr::Enum { .. } => "Enum",
4068        TypeRepr::Schema { .. } => "Schema",
4069        TypeRepr::Closure { .. } => "Closure",
4070    }
4071}
4072
4073#[cfg(test)]
4074mod tests {
4075    use super::*;
4076    use crate::layout::SchemaLayout;
4077    use crate::schema_canonical::{Field, Schema};
4078    use crate::value::Value;
4079
4080    fn field(name: &str, ty: TypeRepr) -> Field {
4081        Field {
4082            name: name.into(),
4083            ty,
4084            default: None,
4085        }
4086    }
4087
4088    fn result_ok(value: Value) -> Value {
4089        let mut map = std::collections::BTreeMap::new();
4090        map.insert(crate::smol_str::SmolStr::from("value"), value);
4091        Value::variant_dict(map, "Ok".to_string(), "Result".to_string())
4092    }
4093
4094    fn result_err(value: Value) -> Value {
4095        let mut map = std::collections::BTreeMap::new();
4096        map.insert(crate::smol_str::SmolStr::from("error"), value);
4097        Value::variant_dict(map, "Err".to_string(), "Result".to_string())
4098    }
4099
4100    #[test]
4101    fn option_and_result_fields_roundtrip_through_buffer_and_verifier() {
4102        let schema = Schema {
4103            name: "Variants".into(),
4104            generics: vec![],
4105            is_tuple: false,
4106            fields: vec![
4107                field(
4108                    "maybe",
4109                    TypeRepr::Option {
4110                        inner: Box::new(TypeRepr::Int),
4111                    },
4112                ),
4113                field(
4114                    "res",
4115                    TypeRepr::Result {
4116                        ok: Box::new(TypeRepr::Int),
4117                        err: Box::new(TypeRepr::String),
4118                    },
4119                ),
4120                field(
4121                    "ok",
4122                    TypeRepr::Result {
4123                        ok: Box::new(TypeRepr::Int),
4124                        err: Box::new(TypeRepr::String),
4125                    },
4126                ),
4127            ],
4128        };
4129        let layout = SchemaLayout::offsets_for(&schema).expect("layout");
4130        let mut builder = BufferBuilder::new(&layout, &schema.fields);
4131        let maybe = Value::option_some(Value::Int(42));
4132        let res = result_err(Value::String("bad".into()));
4133        let ok = result_ok(Value::Int(7));
4134        builder
4135            .write_value("maybe", &schema.fields[0].ty, &maybe)
4136            .expect("write option");
4137        builder
4138            .write_value("res", &schema.fields[1].ty, &res)
4139            .expect("write result err");
4140        builder
4141            .write_value("ok", &schema.fields[2].ty, &ok)
4142            .expect("write result ok");
4143        let bytes = builder.finish();
4144        crate::verifier::verify_record(
4145            &bytes,
4146            &layout,
4147            &schema.fields,
4148            0,
4149            crate::verifier::Region::new(0, bytes.len()).unwrap(),
4150        )
4151        .expect("verify");
4152        let reader = BufferReader::new(&layout, &schema.fields, &bytes).expect("reader");
4153        assert_eq!(
4154            reader.read_value("maybe", &schema.fields[0].ty).unwrap(),
4155            maybe
4156        );
4157        assert_eq!(reader.read_value("res", &schema.fields[1].ty).unwrap(), res);
4158    }
4159
4160    #[test]
4161    fn option_string_inside_nested_schema_relocates_when_pasted() {
4162        let inner = Schema {
4163            name: "Inner".into(),
4164            generics: vec![],
4165            is_tuple: false,
4166            fields: vec![field(
4167                "maybe",
4168                TypeRepr::Option {
4169                    inner: Box::new(TypeRepr::String),
4170                },
4171            )],
4172        };
4173        let outer = Schema {
4174            name: "Outer".into(),
4175            generics: vec![],
4176            is_tuple: false,
4177            fields: vec![field(
4178                "inner",
4179                TypeRepr::Schema {
4180                    schema: Box::new(inner.clone()),
4181                },
4182            )],
4183        };
4184        let layout = SchemaLayout::offsets_for(&outer).expect("layout");
4185        let mut map = std::collections::BTreeMap::new();
4186        let payload = Value::option_some(Value::String("hello".into()));
4187        map.insert(crate::smol_str::SmolStr::from("maybe"), payload.clone());
4188        let value = Value::branded_dict(map, Some("Inner".to_string()));
4189        let mut builder = BufferBuilder::new(&layout, &outer.fields);
4190        builder
4191            .write_value("inner", &outer.fields[0].ty, &value)
4192            .expect("write nested schema");
4193        let bytes = builder.finish_arena_absolute(64).expect("arena rebase");
4194        let mut arena = vec![0u8; 64];
4195        arena.extend_from_slice(&bytes);
4196        let reader = BufferReader::new_at_base(&layout, &outer.fields, &arena, 64).expect("reader");
4197        let decoded = reader.read_value("inner", &outer.fields[0].ty).unwrap();
4198        assert_eq!(decoded, value);
4199    }
4200
4201    #[test]
4202    fn list_option_string_relocates_variant_entry_payloads() {
4203        let schema = Schema {
4204            name: "Rows".into(),
4205            generics: vec![],
4206            is_tuple: false,
4207            fields: vec![field(
4208                "xs",
4209                TypeRepr::List {
4210                    element: Box::new(TypeRepr::Option {
4211                        inner: Box::new(TypeRepr::String),
4212                    }),
4213                },
4214            )],
4215        };
4216        let layout = SchemaLayout::offsets_for(&schema).expect("layout");
4217        let value = Value::list(vec![
4218            Value::option_some(Value::String("a".into())),
4219            Value::option_none(),
4220            Value::option_some(Value::String("bc".into())),
4221        ]);
4222        let mut builder = BufferBuilder::new(&layout, &schema.fields);
4223        builder
4224            .write_value("xs", &schema.fields[0].ty, &value)
4225            .expect("write list option");
4226        let bytes = builder.finish_arena_absolute(128).expect("arena rebase");
4227        let mut arena = vec![0u8; 128];
4228        arena.extend_from_slice(&bytes);
4229        crate::verifier::verify_record_multi(
4230            &arena,
4231            &layout,
4232            &schema.fields,
4233            128,
4234            crate::verifier::MultiRegion::new(
4235                (0, 128),
4236                (128, arena.len()),
4237                (arena.len(), arena.len()),
4238                (arena.len(), arena.len()),
4239            )
4240            .unwrap(),
4241        )
4242        .expect("verify arena absolute");
4243        let reader =
4244            BufferReader::new_at_base(&layout, &schema.fields, &arena, 128).expect("reader");
4245        assert_eq!(
4246            reader.read_value("xs", &schema.fields[0].ty).unwrap(),
4247            value
4248        );
4249    }
4250
4251    fn int_schema() -> Schema {
4252        Schema {
4253            name: "Pair".into(),
4254            generics: vec![],
4255            is_tuple: false,
4256            fields: vec![field("x", TypeRepr::Int), field("y", TypeRepr::Int)],
4257        }
4258    }
4259
4260    fn mixed_schema() -> Schema {
4261        Schema {
4262            name: "Mix".into(),
4263            generics: vec![],
4264            is_tuple: false,
4265            fields: vec![
4266                field("count", TypeRepr::Int),
4267                field("active", TypeRepr::Bool),
4268            ],
4269        }
4270    }
4271
4272    #[test]
4273    fn write_int_then_read_back_roundtrips() {
4274        let schema = int_schema();
4275        let layout = SchemaLayout::offsets_for(&schema).expect("layout");
4276        let mut builder = BufferBuilder::new(&layout, &schema.fields);
4277        builder.write_int("x", 42).expect("write x");
4278        builder.write_int("y", -7).expect("write y");
4279        let bytes = builder.finish();
4280        let reader = BufferReader::new(&layout, &schema.fields, &bytes).expect("reader");
4281        assert_eq!(reader.read_int("x").expect("read x"), 42);
4282        assert_eq!(reader.read_int("y").expect("read y"), -7);
4283    }
4284
4285    #[test]
4286    fn mixed_int_bool_roundtrip_respects_padding() {
4287        let schema = mixed_schema();
4288        let layout = SchemaLayout::offsets_for(&schema).expect("layout");
4289        let mut builder = BufferBuilder::new(&layout, &schema.fields);
4290        builder.write_int("count", 100).expect("write count");
4291        builder.write_bool("active", true).expect("write active");
4292        let bytes = builder.finish();
4293
4294        // Buffer is exactly root_size wide; padding lives at offsets
4295        // 9..16. The reader doesn't care about padding contents.
4296        assert_eq!(bytes.len(), layout.root_size);
4297
4298        let reader = BufferReader::new(&layout, &schema.fields, &bytes).expect("reader");
4299        assert_eq!(reader.read_int("count").expect("read count"), 100);
4300        assert!(reader.read_bool("active").expect("read active"));
4301    }
4302
4303    #[test]
4304    fn unknown_field_is_rejected() {
4305        let schema = int_schema();
4306        let layout = SchemaLayout::offsets_for(&schema).expect("layout");
4307        let mut builder = BufferBuilder::new(&layout, &schema.fields);
4308        let err = builder
4309            .write_int("missing", 1)
4310            .expect_err("unknown field must reject");
4311        assert!(matches!(err, BufferError::UnknownField { ref name } if name == "missing"));
4312    }
4313
4314    #[test]
4315    fn type_mismatch_is_rejected() {
4316        // Bool slot accessed via write_int — would corrupt adjacent
4317        // fields if the writer silently accepted it.
4318        let schema = mixed_schema();
4319        let layout = SchemaLayout::offsets_for(&schema).expect("layout");
4320        let mut builder = BufferBuilder::new(&layout, &schema.fields);
4321        let err = builder
4322            .write_int("active", 1)
4323            .expect_err("type mismatch must reject");
4324        assert!(matches!(
4325            err,
4326            BufferError::TypeMismatch {
4327                declared: "Bool",
4328                requested: "Int",
4329                ..
4330            }
4331        ));
4332    }
4333
4334    #[test]
4335    fn reader_rejects_short_buffer() {
4336        let schema = mixed_schema();
4337        let layout = SchemaLayout::offsets_for(&schema).expect("layout");
4338        let short = vec![0u8; layout.root_size - 1];
4339        let err = BufferReader::new(&layout, &schema.fields, &short)
4340            .expect_err("short buffer must reject");
4341        assert!(matches!(
4342            err,
4343            BufferError::BufferTooSmall { have, need }
4344            if have == layout.root_size - 1 && need == layout.root_size
4345        ));
4346    }
4347
4348    #[test]
4349    fn float_and_unit_roundtrip() {
4350        let schema = Schema {
4351            name: "Phys".into(),
4352            generics: vec![],
4353            is_tuple: false,
4354            fields: vec![field("mass", TypeRepr::Float), field("nil", TypeRepr::Unit)],
4355        };
4356        let layout = SchemaLayout::offsets_for(&schema).expect("layout");
4357        let mut builder = BufferBuilder::new(&layout, &schema.fields);
4358        builder.write_float("mass", 1.5_f64).expect("write mass");
4359        builder.write_unit("nil").expect("write nil");
4360        let bytes = builder.finish();
4361        let reader = BufferReader::new(&layout, &schema.fields, &bytes).expect("reader");
4362        assert_eq!(reader.read_float("mass").expect("read mass"), 1.5);
4363        reader.read_unit("nil").expect("read nil");
4364    }
4365
4366    #[test]
4367    fn write_string_then_read_back_roundtrips() {
4368        let schema = Schema {
4369            name: "Greet".into(),
4370            generics: vec![],
4371            is_tuple: false,
4372            fields: vec![field("name", TypeRepr::String)],
4373        };
4374        let layout = SchemaLayout::offsets_for(&schema).expect("layout");
4375        let mut builder = BufferBuilder::new(&layout, &schema.fields);
4376        builder.write_string("name", "hello").expect("write name");
4377        let bytes = builder.finish();
4378        let reader = BufferReader::new(&layout, &schema.fields, &bytes).expect("reader");
4379        assert_eq!(reader.read_string("name").expect("read name"), "hello");
4380    }
4381
4382    /// F-5 wire-format smoke gate: pin the exact tail-record bytes a
4383    /// `write_string` call emits. The buffer-protocol producer (this
4384    /// fn) and every consumer (cranelift `emit_read_string_len`,
4385    /// stdlib `string_*` bodies indexing payload at `s + 4`,
4386    /// `decode_pointer_header` in `read_string`) all hard-code the
4387    /// `[len:u32 LE][payload]` shape. Flipping to the 12-byte
4388    /// `[len_with_ascii_flag:u32 LE][hash:u64 LE][payload]` planned
4389    /// by `docs/internal/archive/review-improvement-169-conststring-wire-full-2026-05-22.md`
4390    /// must update producer + every consumer atomically; this test
4391    /// fires the moment the producer side drifts so the migrant cannot
4392    /// silently land a partial revision.
4393    ///
4394    /// See also `relon-codegen-cranelift::codegen::const_pool::
4395    /// opvisitor_emits_const_string_record_in_declaration_order` for
4396    /// the matching pin on the cranelift const-pool producer.
4397    #[test]
4398    fn write_string_wire_format_smoke_gate() {
4399        let schema = Schema {
4400            name: "Greet".into(),
4401            generics: vec![],
4402            is_tuple: false,
4403            fields: vec![field("name", TypeRepr::String)],
4404        };
4405        let layout = SchemaLayout::offsets_for(&schema).expect("layout");
4406        let mut builder = BufferBuilder::new(&layout, &schema.fields);
4407        builder.write_string("name", "hello").expect("write name");
4408        let bytes = builder.finish();
4409
4410        // Fixed-area: a 4-byte pointer slot at offset 0 carrying the
4411        // tail-record absolute offset. The schema has no other fields,
4412        // so root_size = 4, root_align = 4 -> tail starts at offset 4.
4413        assert_eq!(bytes.len(), 4 + 4 + 5, "fixed-area + header + payload");
4414        let ptr = u32::from_le_bytes(bytes[0..4].try_into().unwrap());
4415        assert_eq!(ptr, 4, "pointer slot must reference offset 4");
4416
4417        // Tail record at offset 4: `[len: u32 LE][payload]`. Migration
4418        // to the 12-byte header changes this to
4419        // `[len_with_ascii_flag: u32 LE][hash: u64 LE][payload]`.
4420        assert_eq!(
4421            &bytes[4..8],
4422            &5u32.to_le_bytes(),
4423            "tail record len prefix must be u32 LE of payload length"
4424        );
4425        assert_eq!(&bytes[8..13], b"hello", "payload follows the 4-byte header");
4426    }
4427
4428    #[test]
4429    fn empty_string_roundtrips() {
4430        // Zero-byte payload still gets a length prefix, so the reader
4431        // must walk through `[len=0][]` without spuriously claiming
4432        // out-of-bounds.
4433        let schema = Schema {
4434            name: "Greet".into(),
4435            generics: vec![],
4436            is_tuple: false,
4437            fields: vec![field("name", TypeRepr::String)],
4438        };
4439        let layout = SchemaLayout::offsets_for(&schema).expect("layout");
4440        let mut builder = BufferBuilder::new(&layout, &schema.fields);
4441        builder.write_string("name", "").expect("write empty");
4442        let bytes = builder.finish();
4443        let reader = BufferReader::new(&layout, &schema.fields, &bytes).expect("reader");
4444        assert_eq!(reader.read_string("name").expect("read name"), "");
4445    }
4446
4447    #[test]
4448    fn write_string_then_int_fixed_area_lays_out_correctly() {
4449        // Pointer slot at 0..4, padding 4..8, Int slot at 8..16. The
4450        // tail-area record sits at offset 16 (or later if padded).
4451        // We verify the layout via direct byte inspection so a
4452        // regression in the slot order surfaces immediately.
4453        let schema = Schema {
4454            name: "User".into(),
4455            generics: vec![],
4456            is_tuple: false,
4457            fields: vec![field("name", TypeRepr::String), field("age", TypeRepr::Int)],
4458        };
4459        let layout = SchemaLayout::offsets_for(&schema).expect("layout");
4460        let mut builder = BufferBuilder::new(&layout, &schema.fields);
4461        builder.write_string("name", "ada").expect("write name");
4462        builder.write_int("age", 36).expect("write age");
4463        let bytes = builder.finish();
4464
4465        // First 4 bytes hold the tail-area pointer; bytes 8..16 hold
4466        // the Int slot regardless of how big the tail record is.
4467        let ptr = u32::from_le_bytes(bytes[0..4].try_into().unwrap()) as usize;
4468        assert!(ptr >= layout.root_size);
4469        let len_prefix = u32::from_le_bytes(bytes[ptr..ptr + 4].try_into().unwrap()) as usize;
4470        assert_eq!(len_prefix, "ada".len());
4471        let age = i64::from_le_bytes(bytes[8..16].try_into().unwrap());
4472        assert_eq!(age, 36);
4473
4474        let reader = BufferReader::new(&layout, &schema.fields, &bytes).expect("reader");
4475        assert_eq!(reader.read_string("name").expect("read name"), "ada");
4476        assert_eq!(reader.read_int("age").expect("read age"), 36);
4477    }
4478
4479    #[test]
4480    fn unknown_field_on_write_string_is_rejected() {
4481        let schema = Schema {
4482            name: "Greet".into(),
4483            generics: vec![],
4484            is_tuple: false,
4485            fields: vec![field("name", TypeRepr::String)],
4486        };
4487        let layout = SchemaLayout::offsets_for(&schema).expect("layout");
4488        let mut builder = BufferBuilder::new(&layout, &schema.fields);
4489        let err = builder
4490            .write_string("missing", "x")
4491            .expect_err("unknown field must reject");
4492        assert!(matches!(err, BufferError::UnknownField { ref name } if name == "missing"));
4493    }
4494
4495    #[test]
4496    fn write_list_int_then_read_back_roundtrips() {
4497        let schema = Schema {
4498            name: "Nums".into(),
4499            generics: vec![],
4500            is_tuple: false,
4501            fields: vec![field(
4502                "nums",
4503                TypeRepr::List {
4504                    element: Box::new(TypeRepr::Int),
4505                },
4506            )],
4507        };
4508        let layout = SchemaLayout::offsets_for(&schema).expect("layout");
4509        let mut builder = BufferBuilder::new(&layout, &schema.fields);
4510        builder
4511            .write_list_int("nums", &[1, -2, 3])
4512            .expect("write nums");
4513        let bytes = builder.finish();
4514        let reader = BufferReader::new(&layout, &schema.fields, &bytes).expect("reader");
4515        assert_eq!(
4516            reader.read_list_int("nums").expect("read nums"),
4517            vec![1, -2, 3]
4518        );
4519    }
4520
4521    #[test]
4522    fn buffer_too_small_on_string_schema_rejected() {
4523        let schema = Schema {
4524            name: "Greet".into(),
4525            generics: vec![],
4526            is_tuple: false,
4527            fields: vec![field("name", TypeRepr::String)],
4528        };
4529        let layout = SchemaLayout::offsets_for(&schema).expect("layout");
4530        let short = vec![0u8; layout.root_size - 1];
4531        let err = BufferReader::new(&layout, &schema.fields, &short)
4532            .expect_err("short buffer must reject");
4533        assert!(matches!(err, BufferError::BufferTooSmall { .. }));
4534    }
4535
    /// Hand-build a buffer matching the host->wasm wire shape for an
    /// outer `Usr { Addr addr, String name }` record so the
    /// `sub_record` reader tests exercise both a nested record and a
    /// trailing String alongside it. Keeps the layout details in one
    /// place — the asserts in the callers focus on the reader returning
    /// the right values rather than the exact tail-area byte arithmetic.
    ///
    /// Returns `(usr_schema, addr_schema, bytes)`. The encoded values are
    /// `addr.city = "BJ"`, `addr.zip = 100000`, and `name = "Bob"`.
    ///
    /// Resulting byte map (given the asserted root sizes of 8 and 16):
    /// `0..8` Usr fixed area, `8..24` Addr fixed area, `24..30` "BJ"
    /// record, `30..32` padding, `32..39` "Bob" record.
    fn build_usr_buffer() -> (Schema, Schema, Vec<u8>) {
        let addr_schema = Schema {
            name: "Addr".into(),
            generics: vec![],
            is_tuple: false,
            fields: vec![field("city", TypeRepr::String), field("zip", TypeRepr::Int)],
        };
        let usr_schema = Schema {
            name: "Usr".into(),
            generics: vec![],
            is_tuple: false,
            fields: vec![
                field(
                    "addr",
                    TypeRepr::Schema {
                        schema: Box::new(addr_schema.clone()),
                    },
                ),
                field("name", TypeRepr::String),
            ],
        };
        let usr_layout = SchemaLayout::offsets_for(&usr_schema).expect("usr layout");
        let addr_layout = SchemaLayout::offsets_for(&addr_schema).expect("addr layout");

        // Fixed area sizes: Usr.root_size = 8 (two 4-byte pointers);
        // Addr.root_size = 16 (4-byte ptr + 4 pad + 8 int). Bytes are
        // assembled by hand so the layout invariants stay visible at
        // the call site of the test. The asserts below fail fast if the
        // layout computation ever changes, because every hard-coded
        // offset further down depends on these two sizes.
        let usr_root = usr_layout.root_size;
        assert_eq!(usr_root, 8);
        assert_eq!(addr_layout.root_size, 16);

        // Zero-filled Usr fixed area; its two pointer slots are patched
        // at the very end once the tail offsets are known.
        let mut bytes = vec![0u8; usr_root];

        // Sub-record Addr fixed area lives in the tail at offset =
        // usr_root, padded up to addr_layout.root_align (=8). 8 is
        // already aligned.
        let addr_base = bytes.len();
        bytes.resize(addr_base + addr_layout.root_size, 0);

        // String "BJ" tail record (`[len: u32 LE][payload]`) at the
        // offset following Addr.
        let bj_offset = bytes.len();
        bytes.extend_from_slice(&(2u32).to_le_bytes());
        bytes.extend_from_slice(b"BJ");
        // Patch Addr.city pointer (offset 0 inside Addr).
        let bj_ptr = bj_offset as u32;
        bytes[addr_base..addr_base + 4].copy_from_slice(&bj_ptr.to_le_bytes());
        // Patch Addr.zip = 100000 at offset 8 inside Addr.
        bytes[addr_base + 8..addr_base + 16].copy_from_slice(&100000i64.to_le_bytes());

        // String "Bob" tail record. Pad up to a 4-byte boundary first:
        // the "BJ" record (4-byte header + 2-byte payload) ends at
        // offset 30, which is not a multiple of 4.
        while !bytes.len().is_multiple_of(4) {
            bytes.push(0);
        }
        let bob_offset = bytes.len();
        bytes.extend_from_slice(&(3u32).to_le_bytes());
        bytes.extend_from_slice(b"Bob");

        // Patch the Usr fixed area: addr pointer slot at offset 0,
        // name pointer slot at offset 4.
        let addr_ptr = addr_base as u32;
        bytes[0..4].copy_from_slice(&addr_ptr.to_le_bytes());
        bytes[4..8].copy_from_slice(&(bob_offset as u32).to_le_bytes());

        (usr_schema, addr_schema, bytes)
    }
4609
4610    #[test]
4611    fn sub_record_reads_nested_dict_fields() {
4612        let (usr_schema, addr_schema, bytes) = build_usr_buffer();
4613        let usr_layout = SchemaLayout::offsets_for(&usr_schema).expect("usr layout");
4614        let addr_layout = SchemaLayout::offsets_for(&addr_schema).expect("addr layout");
4615        let reader = BufferReader::new(&usr_layout, &usr_schema.fields, &bytes).expect("usr");
4616
4617        let sub = reader
4618            .sub_record("addr", &addr_layout, &addr_schema.fields)
4619            .expect("sub");
4620        assert_eq!(sub.read_string("city").expect("city"), "BJ");
4621        assert_eq!(sub.read_int("zip").expect("zip"), 100000);
4622        // Parent reader still reaches the trailing top-level String.
4623        assert_eq!(reader.read_string("name").expect("name"), "Bob");
4624    }
4625
4626    #[test]
4627    fn sub_record_rejects_non_schema_field() {
4628        // `name` is a String, not a sub-schema — sub_record must
4629        // refuse rather than read random bytes out of the buffer.
4630        let (usr_schema, addr_schema, bytes) = build_usr_buffer();
4631        let usr_layout = SchemaLayout::offsets_for(&usr_schema).expect("usr layout");
4632        let addr_layout = SchemaLayout::offsets_for(&addr_schema).expect("addr layout");
4633        let reader = BufferReader::new(&usr_layout, &usr_schema.fields, &bytes).expect("usr");
4634        let err = reader
4635            .sub_record("name", &addr_layout, &addr_schema.fields)
4636            .expect_err("non-schema slot");
4637        assert!(matches!(
4638            err,
4639            BufferError::TypeMismatch {
4640                requested: "Schema",
4641                ..
4642            }
4643        ));
4644    }
4645
4646    #[test]
4647    fn sub_record_unknown_field_rejected() {
4648        let (usr_schema, addr_schema, bytes) = build_usr_buffer();
4649        let usr_layout = SchemaLayout::offsets_for(&usr_schema).expect("usr layout");
4650        let addr_layout = SchemaLayout::offsets_for(&addr_schema).expect("addr layout");
4651        let reader = BufferReader::new(&usr_layout, &usr_schema.fields, &bytes).expect("usr");
4652        let err = reader
4653            .sub_record("missing", &addr_layout, &addr_schema.fields)
4654            .expect_err("missing");
4655        assert!(matches!(err, BufferError::UnknownField { .. }));
4656    }
4657
4658    #[test]
4659    fn nested_schema_layout_picks_inner_alignment() {
4660        // Schema { String s, Int i } pulls root_align up to 8, so a
4661        // parent slot referencing it lays out as a pointer-indirect
4662        // field with `tail_alignment: 8`.
4663        let inner = Schema {
4664            name: "Inner".into(),
4665            generics: vec![],
4666            is_tuple: false,
4667            fields: vec![field("s", TypeRepr::String), field("i", TypeRepr::Int)],
4668        };
4669        let outer = Schema {
4670            name: "Outer".into(),
4671            generics: vec![],
4672            is_tuple: false,
4673            fields: vec![field(
4674                "child",
4675                TypeRepr::Schema {
4676                    schema: Box::new(inner),
4677                },
4678            )],
4679        };
4680        let table = SchemaLayout::offsets_for(&outer).expect("layout");
4681        let kind = table.fields[0].kind;
4682        assert!(matches!(
4683            kind,
4684            FieldKind::PointerIndirect { tail_alignment: 8 }
4685        ));
4686    }
4687
4688    #[test]
4689    fn write_sub_record_simple_schema_arg_roundtrips() {
4690        // `Wrap { User u }` where User { Int age } — writer fills the
4691        // parent's pointer slot via `sub_record`, reader walks back via
4692        // `BufferReader::sub_record` and reads the inner `age`.
4693        let user_schema = Schema {
4694            name: "User".into(),
4695            generics: vec![],
4696            is_tuple: false,
4697            fields: vec![field("age", TypeRepr::Int)],
4698        };
4699        let wrap_schema = Schema {
4700            name: "Wrap".into(),
4701            generics: vec![],
4702            is_tuple: false,
4703            fields: vec![field(
4704                "u",
4705                TypeRepr::Schema {
4706                    schema: Box::new(user_schema.clone()),
4707                },
4708            )],
4709        };
4710        let wrap_layout = SchemaLayout::offsets_for(&wrap_schema).expect("wrap layout");
4711        let user_layout = SchemaLayout::offsets_for(&user_schema).expect("user layout");
4712
4713        let mut wrap_builder = BufferBuilder::new(&wrap_layout, &wrap_schema.fields);
4714        let mut user_builder = wrap_builder
4715            .sub_record("u", &user_layout, &user_schema.fields)
4716            .expect("sub_record");
4717        user_builder.write_int("age", 42).expect("write age");
4718        wrap_builder
4719            .finish_sub_record("u", user_builder)
4720            .expect("finish_sub_record");
4721        let bytes = wrap_builder.finish();
4722
4723        let reader = BufferReader::new(&wrap_layout, &wrap_schema.fields, &bytes).expect("reader");
4724        let sub = reader
4725            .sub_record("u", &user_layout, &user_schema.fields)
4726            .expect("sub");
4727        assert_eq!(sub.read_int("age").expect("read age"), 42);
4728    }
4729
4730    #[test]
4731    fn write_sub_record_nested_with_string_field() {
4732        // `Usr { Addr addr, String name }` — exercises both the
4733        // sub_record writer and the surrounding parent-area String slot
4734        // sharing the same tail area.
4735        let addr_schema = Schema {
4736            name: "Addr".into(),
4737            generics: vec![],
4738            is_tuple: false,
4739            fields: vec![field("city", TypeRepr::String), field("zip", TypeRepr::Int)],
4740        };
4741        let usr_schema = Schema {
4742            name: "Usr".into(),
4743            generics: vec![],
4744            is_tuple: false,
4745            fields: vec![
4746                field(
4747                    "addr",
4748                    TypeRepr::Schema {
4749                        schema: Box::new(addr_schema.clone()),
4750                    },
4751                ),
4752                field("name", TypeRepr::String),
4753            ],
4754        };
4755        let usr_layout = SchemaLayout::offsets_for(&usr_schema).expect("usr layout");
4756        let addr_layout = SchemaLayout::offsets_for(&addr_schema).expect("addr layout");
4757
4758        let mut usr_builder = BufferBuilder::new(&usr_layout, &usr_schema.fields);
4759        let mut addr_builder = usr_builder
4760            .sub_record("addr", &addr_layout, &addr_schema.fields)
4761            .expect("sub_record");
4762        addr_builder.write_string("city", "BJ").expect("write city");
4763        addr_builder.write_int("zip", 100000).expect("write zip");
4764        usr_builder
4765            .finish_sub_record("addr", addr_builder)
4766            .expect("finish_sub_record");
4767        usr_builder.write_string("name", "Bob").expect("write name");
4768        let bytes = usr_builder.finish();
4769
4770        let reader = BufferReader::new(&usr_layout, &usr_schema.fields, &bytes).expect("reader");
4771        let sub = reader
4772            .sub_record("addr", &addr_layout, &addr_schema.fields)
4773            .expect("sub");
4774        assert_eq!(sub.read_string("city").expect("city"), "BJ");
4775        assert_eq!(sub.read_int("zip").expect("zip"), 100000);
4776        assert_eq!(reader.read_string("name").expect("name"), "Bob");
4777    }
4778
4779    #[test]
4780    fn write_sub_record_inner_list_int_roundtrips() {
4781        // Mixed Schema arg: parent has the sub-record + a top-level
4782        // List<Int>; the sub-record itself has a String. Confirms the
4783        // tail-area cursor advances correctly across multiple
4784        // heterogenous appends.
4785        let inner_schema = Schema {
4786            name: "Inner".into(),
4787            generics: vec![],
4788            is_tuple: false,
4789            fields: vec![field("tag", TypeRepr::String)],
4790        };
4791        let outer_schema = Schema {
4792            name: "Outer".into(),
4793            generics: vec![],
4794            is_tuple: false,
4795            fields: vec![
4796                field(
4797                    "child",
4798                    TypeRepr::Schema {
4799                        schema: Box::new(inner_schema.clone()),
4800                    },
4801                ),
4802                field(
4803                    "nums",
4804                    TypeRepr::List {
4805                        element: Box::new(TypeRepr::Int),
4806                    },
4807                ),
4808            ],
4809        };
4810        let outer_layout = SchemaLayout::offsets_for(&outer_schema).expect("outer layout");
4811        let inner_layout = SchemaLayout::offsets_for(&inner_schema).expect("inner layout");
4812
4813        let mut outer = BufferBuilder::new(&outer_layout, &outer_schema.fields);
4814        let mut inner = outer
4815            .sub_record("child", &inner_layout, &inner_schema.fields)
4816            .expect("sub_record");
4817        inner.write_string("tag", "hello").expect("write tag");
4818        outer
4819            .finish_sub_record("child", inner)
4820            .expect("finish_sub_record");
4821        outer
4822            .write_list_int("nums", &[10, 20, 30])
4823            .expect("write nums");
4824        let bytes = outer.finish();
4825
4826        let reader =
4827            BufferReader::new(&outer_layout, &outer_schema.fields, &bytes).expect("reader");
4828        let sub = reader
4829            .sub_record("child", &inner_layout, &inner_schema.fields)
4830            .expect("sub");
4831        assert_eq!(sub.read_string("tag").expect("tag"), "hello");
4832        assert_eq!(
4833            reader.read_list_int("nums").expect("nums"),
4834            vec![10, 20, 30]
4835        );
4836    }
4837
4838    #[test]
4839    fn write_sub_record_unknown_field_rejected() {
4840        let inner_schema = Schema {
4841            name: "Inner".into(),
4842            generics: vec![],
4843            is_tuple: false,
4844            fields: vec![field("x", TypeRepr::Int)],
4845        };
4846        let outer_schema = Schema {
4847            name: "Outer".into(),
4848            generics: vec![],
4849            is_tuple: false,
4850            fields: vec![field(
4851                "child",
4852                TypeRepr::Schema {
4853                    schema: Box::new(inner_schema.clone()),
4854                },
4855            )],
4856        };
4857        let outer_layout = SchemaLayout::offsets_for(&outer_schema).expect("outer layout");
4858        let inner_layout = SchemaLayout::offsets_for(&inner_schema).expect("inner layout");
4859        let mut outer = BufferBuilder::new(&outer_layout, &outer_schema.fields);
4860        let err = outer
4861            .sub_record("missing", &inner_layout, &inner_schema.fields)
4862            .expect_err("missing field must reject");
4863        assert!(matches!(err, BufferError::UnknownField { ref name } if name == "missing"));
4864    }
4865
4866    #[test]
4867    fn write_sub_record_on_non_schema_slot_rejected() {
4868        // `name` is a String — sub_record must refuse rather than
4869        // patch a 4-byte pointer into a slot the layout reserved for
4870        // a String tail-record offset.
4871        let inner_schema = Schema {
4872            name: "Inner".into(),
4873            generics: vec![],
4874            is_tuple: false,
4875            fields: vec![field("x", TypeRepr::Int)],
4876        };
4877        let mixed = Schema {
4878            name: "Mixed".into(),
4879            generics: vec![],
4880            is_tuple: false,
4881            fields: vec![field("name", TypeRepr::String)],
4882        };
4883        let mixed_layout = SchemaLayout::offsets_for(&mixed).expect("mixed layout");
4884        let inner_layout = SchemaLayout::offsets_for(&inner_schema).expect("inner layout");
4885        let mut builder = BufferBuilder::new(&mixed_layout, &mixed.fields);
4886        let err = builder
4887            .sub_record("name", &inner_layout, &inner_schema.fields)
4888            .expect_err("non-schema slot must reject");
4889        assert!(matches!(
4890            err,
4891            BufferError::TypeMismatch {
4892                requested: "Schema",
4893                ..
4894            }
4895        ));
4896    }
4897
4898    #[test]
4899    fn type_mismatch_on_read_is_rejected() {
4900        // Symmetric to the writer test — guards against a host doing
4901        // `read_bool` on an Int slot and getting back random bytes.
4902        let schema = mixed_schema();
4903        let layout = SchemaLayout::offsets_for(&schema).expect("layout");
4904        let bytes = vec![0u8; layout.root_size];
4905        let reader = BufferReader::new(&layout, &schema.fields, &bytes).expect("reader");
4906        let err = reader
4907            .read_bool("count")
4908            .expect_err("type mismatch on read must reject");
4909        assert!(matches!(
4910            err,
4911            BufferError::TypeMismatch {
4912                declared: "Int",
4913                requested: "Bool",
4914                ..
4915            }
4916        ));
4917    }
4918
4919    // =================================================================
4920    // Phase 10-c: List<Float / Bool / String / Schema> roundtrips.
4921    // =================================================================
4922
4923    fn list_schema(name: &str, elem: TypeRepr) -> Schema {
4924        Schema {
4925            name: "Wrap".into(),
4926            generics: vec![],
4927            is_tuple: false,
4928            fields: vec![field(
4929                name,
4930                TypeRepr::List {
4931                    element: Box::new(elem),
4932                },
4933            )],
4934        }
4935    }
4936
4937    #[test]
4938    fn write_list_float_then_read_back_roundtrips() {
4939        let schema = list_schema("xs", TypeRepr::Float);
4940        let layout = SchemaLayout::offsets_for(&schema).expect("layout");
4941        let mut b = BufferBuilder::new(&layout, &schema.fields);
4942        b.write_list_float("xs", &[1.5, -2.25, 3.125])
4943            .expect("write");
4944        let bytes = b.finish();
4945        let r = BufferReader::new(&layout, &schema.fields, &bytes).expect("reader");
4946        assert_eq!(
4947            r.read_list_float("xs").expect("read"),
4948            vec![1.5, -2.25, 3.125]
4949        );
4950    }
4951
4952    #[test]
4953    fn write_list_bool_then_read_back_roundtrips() {
4954        let schema = list_schema("xs", TypeRepr::Bool);
4955        let layout = SchemaLayout::offsets_for(&schema).expect("layout");
4956        let mut b = BufferBuilder::new(&layout, &schema.fields);
4957        b.write_list_bool("xs", &[true, false, true, true])
4958            .expect("write");
4959        let bytes = b.finish();
4960        let r = BufferReader::new(&layout, &schema.fields, &bytes).expect("reader");
4961        assert_eq!(
4962            r.read_list_bool("xs").expect("read"),
4963            vec![true, false, true, true]
4964        );
4965    }
4966
4967    #[test]
4968    fn empty_list_bool_roundtrips() {
4969        // Zero-element payload still needs a valid `[len=0]` prefix
4970        // the reader can walk through without OOB checks tripping.
4971        let schema = list_schema("xs", TypeRepr::Bool);
4972        let layout = SchemaLayout::offsets_for(&schema).expect("layout");
4973        let mut b = BufferBuilder::new(&layout, &schema.fields);
4974        b.write_list_bool("xs", &[]).expect("write");
4975        let bytes = b.finish();
4976        let r = BufferReader::new(&layout, &schema.fields, &bytes).expect("reader");
4977        assert!(r.read_list_bool("xs").expect("read").is_empty());
4978    }
4979
4980    #[test]
4981    fn write_list_string_then_read_back_roundtrips() {
4982        let schema = list_schema("xs", TypeRepr::String);
4983        let layout = SchemaLayout::offsets_for(&schema).expect("layout");
4984        let mut b = BufferBuilder::new(&layout, &schema.fields);
4985        b.write_list_string("xs", &["alpha", "beta", "", "gamma"])
4986            .expect("write");
4987        let bytes = b.finish();
4988        let r = BufferReader::new(&layout, &schema.fields, &bytes).expect("reader");
4989        assert_eq!(
4990            r.read_list_string("xs").expect("read"),
4991            vec!["alpha", "beta", "", "gamma"]
4992        );
4993    }
4994
4995    #[test]
4996    fn empty_list_string_roundtrips() {
4997        let schema = list_schema("xs", TypeRepr::String);
4998        let layout = SchemaLayout::offsets_for(&schema).expect("layout");
4999        let mut b = BufferBuilder::new(&layout, &schema.fields);
5000        b.write_list_string::<&str>("xs", &[]).expect("write");
5001        let bytes = b.finish();
5002        let r = BufferReader::new(&layout, &schema.fields, &bytes).expect("reader");
5003        assert!(r.read_list_string("xs").expect("read").is_empty());
5004    }
5005
5006    #[test]
5007    fn mixed_record_and_list_string_roundtrip() {
5008        // Phase 10-c: a top-level Int + a List<String> share the tail
5009        // area. Verifies the cursor advances correctly across heterogeneous
5010        // tail appends.
5011        let schema = Schema {
5012            name: "Mixed".into(),
5013            generics: vec![],
5014            is_tuple: false,
5015            fields: vec![
5016                field("n", TypeRepr::Int),
5017                field(
5018                    "xs",
5019                    TypeRepr::List {
5020                        element: Box::new(TypeRepr::String),
5021                    },
5022                ),
5023                field("name", TypeRepr::String),
5024            ],
5025        };
5026        let layout = SchemaLayout::offsets_for(&schema).expect("layout");
5027        let mut b = BufferBuilder::new(&layout, &schema.fields);
5028        b.write_int("n", 7).expect("write n");
5029        b.write_list_string("xs", &["ada", "bob"])
5030            .expect("write xs");
5031        b.write_string("name", "ada").expect("write name");
5032        let bytes = b.finish();
5033        let r = BufferReader::new(&layout, &schema.fields, &bytes).expect("reader");
5034        assert_eq!(r.read_int("n").expect("n"), 7);
5035        assert_eq!(r.read_list_string("xs").expect("xs"), vec!["ada", "bob"]);
5036        assert_eq!(r.read_string("name").expect("name"), "ada");
5037    }
5038
5039    #[test]
5040    fn write_list_record_with_nested_string_roundtrip() {
5041        // `List<User>` where User { String name, Int age }. Verifies
5042        // both the per-entry pointer array and the inner string
5043        // payload's relocation through the parent buffer.
5044        let user_schema = Schema {
5045            name: "User".into(),
5046            generics: vec![],
5047            is_tuple: false,
5048            fields: vec![field("name", TypeRepr::String), field("age", TypeRepr::Int)],
5049        };
5050        let outer = Schema {
5051            name: "Group".into(),
5052            generics: vec![],
5053            is_tuple: false,
5054            fields: vec![field(
5055                "users",
5056                TypeRepr::List {
5057                    element: Box::new(TypeRepr::Schema {
5058                        schema: Box::new(user_schema.clone()),
5059                    }),
5060                },
5061            )],
5062        };
5063        let outer_layout = SchemaLayout::offsets_for(&outer).expect("outer layout");
5064        let user_layout = SchemaLayout::offsets_for(&user_schema).expect("user layout");
5065
5066        let mut b = BufferBuilder::new(&outer_layout, &outer.fields);
5067        let users_data: Vec<(&str, i64)> = vec![("ada", 36), ("bob", 41), ("zoe", 19)];
5068        let mut writer = b
5069            .list_record_writer("users", &user_layout, &user_schema)
5070            .expect("list_record_writer");
5071        for (name, age) in &users_data {
5072            let mut child = writer.start_entry();
5073            child.write_string("name", name).expect("write name");
5074            child.write_int("age", *age).expect("write age");
5075            writer.finish_entry(&mut b, child).expect("finish entry");
5076        }
5077        b.finish_list_record(writer).expect("finish list");
5078        let bytes = b.finish();
5079
5080        let r = BufferReader::new(&outer_layout, &outer.fields, &bytes).expect("reader");
5081        let entries = r
5082            .read_list_record("users", &user_layout, &user_schema)
5083            .expect("read list");
5084        assert_eq!(entries.len(), 3);
5085        for (sub, (name, age)) in entries.iter().zip(users_data.iter()) {
5086            assert_eq!(sub.read_string("name").expect("name"), *name);
5087            assert_eq!(sub.read_int("age").expect("age"), *age);
5088        }
5089    }
5090
5091    #[test]
5092    fn list_string_type_mismatch_rejected() {
5093        let schema = list_schema("xs", TypeRepr::Int);
5094        let layout = SchemaLayout::offsets_for(&schema).expect("layout");
5095        let mut b = BufferBuilder::new(&layout, &schema.fields);
5096        let err = b
5097            .write_list_string("xs", &["nope"])
5098            .expect_err("must reject");
5099        assert!(matches!(
5100            err,
5101            BufferError::TypeMismatch {
5102                requested: "List<String>",
5103                ..
5104            }
5105        ));
5106    }
5107
5108    /// Nested `List<List<Int>>`: the header's `[len][off_i]` pointer
5109    /// array names per-element inner `[len][pad][i64...]` records.
5110    /// Decodes the buffer by hand (no reader API for nested lists yet)
5111    /// to pin the exact byte layout the compiled backends consume.
5112    #[test]
5113    fn write_nested_scalar_list_int_layout() {
5114        use crate::value::Value;
5115        let schema = Schema {
5116            name: "Grid".into(),
5117            generics: vec![],
5118            is_tuple: false,
5119            fields: vec![field(
5120                "xss",
5121                TypeRepr::List {
5122                    element: Box::new(TypeRepr::List {
5123                        element: Box::new(TypeRepr::Int),
5124                    }),
5125                },
5126            )],
5127        };
5128        let layout = SchemaLayout::offsets_for(&schema).expect("layout");
5129        let mut b = BufferBuilder::new(&layout, &schema.fields);
5130        let items = vec![
5131            Value::List(vec![Value::Int(1), Value::Int(2)].into()),
5132            Value::List(vec![Value::Int(3)].into()),
5133            Value::List(vec![].into()),
5134        ];
5135        write_nested_scalar_list(&mut b, "xss", &TypeRepr::Int, &items).expect("write");
5136        let bytes = b.finish();
5137
5138        // Field slot at offset 0 → header offset.
5139        let read_u32 =
5140            |at: usize| u32::from_le_bytes(bytes[at..at + 4].try_into().unwrap()) as usize;
5141        let header = read_u32(0);
5142        assert_eq!(read_u32(header), 3, "outer len");
5143        // Each inner record: [len:u32][pad to 8][i64 x len].
5144        let decode_inner = |rec: usize| -> Vec<i64> {
5145            let n = read_u32(rec);
5146            let payload = (rec + 4).next_multiple_of(8);
5147            (0..n)
5148                .map(|i| {
5149                    let at = payload + i * 8;
5150                    i64::from_le_bytes(bytes[at..at + 8].try_into().unwrap())
5151                })
5152                .collect()
5153        };
5154        let off0 = read_u32(header + 4);
5155        let off1 = read_u32(header + 8);
5156        let off2 = read_u32(header + 12);
5157        assert_eq!(decode_inner(off0), vec![1, 2]);
5158        assert_eq!(decode_inner(off1), vec![3]);
5159        assert_eq!(decode_inner(off2), Vec::<i64>::new());
5160    }
5161
5162    /// `read_list_list` is the reader-side mirror of
5163    /// `write_nested_scalar_list`: round-tripping any nested scalar list
5164    /// through the writer and back yields the exact input rows. The
5165    /// written `Value`s are the oracle, so this pins the host walk
5166    /// bit-for-bit independent of any compiled backend.
5167    #[test]
5168    fn read_list_list_roundtrips_nested_scalars() {
5169        use crate::value::Value;
5170        let cases: &[(TypeRepr, Vec<Value>)] = &[
5171            (
5172                TypeRepr::Int,
5173                vec![
5174                    Value::List(vec![Value::Int(1), Value::Int(2)].into()),
5175                    Value::List(vec![Value::Int(-7)].into()),
5176                    Value::List(vec![].into()),
5177                    Value::List(vec![Value::Int(i64::MAX), Value::Int(i64::MIN)].into()),
5178                ],
5179            ),
5180            (
5181                TypeRepr::Float,
5182                vec![
5183                    Value::List(
5184                        vec![
5185                            Value::Float(ordered_float::OrderedFloat(1.5)),
5186                            Value::Float(ordered_float::OrderedFloat(-0.25)),
5187                        ]
5188                        .into(),
5189                    ),
5190                    Value::List(vec![].into()),
5191                ],
5192            ),
5193            (
5194                TypeRepr::Bool,
5195                vec![
5196                    Value::List(
5197                        vec![Value::Bool(true), Value::Bool(false), Value::Bool(true)].into(),
5198                    ),
5199                    Value::List(vec![Value::Bool(false)].into()),
5200                ],
5201            ),
5202        ];
5203        for (inner, items) in cases {
5204            let schema = Schema {
5205                name: "Grid".into(),
5206                generics: vec![],
5207                is_tuple: false,
5208                fields: vec![field(
5209                    "xss",
5210                    TypeRepr::List {
5211                        element: Box::new(TypeRepr::List {
5212                            element: Box::new(inner.clone()),
5213                        }),
5214                    },
5215                )],
5216            };
5217            let layout = SchemaLayout::offsets_for(&schema).expect("layout");
5218            let mut b = BufferBuilder::new(&layout, &schema.fields);
5219            write_nested_scalar_list(&mut b, "xss", inner, items).expect("write");
5220            let bytes = b.finish();
5221            let reader = BufferReader::new(&layout, &schema.fields, &bytes).expect("reader");
5222            let rows = reader.read_list_list("xss").expect("read_list_list");
5223            let got: Vec<Value> = rows.into_iter().map(|r| Value::List(r.into())).collect();
5224            assert_eq!(&got, items, "nested {inner:?} list roundtrip mismatch");
5225        }
5226    }
5227
5228    /// F5: `write_nested_pointer_array_list` marshals a `List<List<String>>`
5229    /// and `read_list_value` decodes it back bit-identically, including
5230    /// empty inner / outer lists and an empty / multibyte string. The
5231    /// doubly-nested pointer array round-trips through one buffer.
5232    #[test]
5233    fn nested_list_inner_string_roundtrips() {
5234        use crate::value::Value;
5235        let inner_str = TypeRepr::List {
5236            element: Box::new(TypeRepr::String),
5237        };
5238        let schema = Schema {
5239            name: "Grid".into(),
5240            generics: vec![],
5241            is_tuple: false,
5242            fields: vec![field(
5243                "xss",
5244                TypeRepr::List {
5245                    element: Box::new(inner_str.clone()),
5246                },
5247            )],
5248        };
5249        let layout = SchemaLayout::offsets_for(&schema).expect("List<List<String>> layout");
5250        let multibyte: String = [0x4E2Du32, 0x6587]
5251            .iter()
5252            .map(|c| char::from_u32(*c).unwrap())
5253            .collect();
5254        let rows: Vec<Value> = vec![
5255            Value::List(std::sync::Arc::new(vec![
5256                Value::String("a".into()),
5257                Value::String("".into()),
5258                Value::String(multibyte.as_str().into()),
5259            ])),
5260            Value::List(std::sync::Arc::new(vec![])),
5261            Value::List(std::sync::Arc::new(vec![Value::String("zz".into())])),
5262        ];
5263        let mut b = BufferBuilder::new(&layout, &schema.fields);
5264        // The marshaller's `element` is the *innermost* element (String),
5265        // mirroring the `marshal_list_list_in` dispatch contract.
5266        write_nested_pointer_array_list(&mut b, "xss", &TypeRepr::String, &rows)
5267            .expect("write nested");
5268        let bytes = b.finish();
5269        // Read it back through the field slot (the reader's `element` is
5270        // the *outer* list element, `List<String>`), then via the in-place
5271        // root.
5272        let reader = BufferReader::new(&layout, &schema.fields, &bytes).expect("reader");
5273        let got = reader
5274            .read_list_value("xss", &inner_str)
5275            .expect("read field");
5276        assert_eq!(
5277            Value::List(std::sync::Arc::new(got)),
5278            Value::List(std::sync::Arc::new(rows.clone()))
5279        );
5280        // And via the direct header offset (the in-place return path).
5281        let fo = &layout.fields[0];
5282        let mut slot = [0u8; 4];
5283        slot.copy_from_slice(&bytes[fo.offset..fo.offset + 4]);
5284        let header = u32::from_le_bytes(slot) as usize;
5285        let got2 = reader
5286            .read_list_value_at(header, &inner_str)
5287            .expect("read at header");
5288        assert_eq!(
5289            Value::List(std::sync::Arc::new(got2)),
5290            Value::List(std::sync::Arc::new(rows))
5291        );
5292    }
5293}