Skip to main content

phon_engine/
typed.rs

1//! The typed path: lower a [`Descriptor`] (which carries its schema) into a flat
2//! [`MemProgram`], then run it to encode or decode a value living in this
3//! process's memory.
4//!
5//! This is the memory counterpart to the dynamic [`Value`]
6//! path. The split is phon's schema+descriptor pairing: the **schema** (resolved
7//! through the registry) decides the wire bytes and their order; the
8//! **descriptor** says where each field lives in memory. Because the wire is
9//! schema-driven, a typed value produces byte-identical output to the dynamic
10//! codec for the same logical value — that equivalence is the oracle the tests
11//! check (in the `phon` front door, over real facet-derived descriptors), and
12//! it's what lets a typed peer and a dynamic peer interoperate.
13//!
14//! Lowering walks the descriptor once and folds every field offset to be
15//! relative to the value's base pointer. A nested fixed struct therefore
16//! dissolves into a single straight run of scalar copies (`r[ir.inlining]`,
17//! `r[ir.memory]`) — no per-decode descriptor walk, no branches. Owned
18//! sequences, options, enums (allocation and run-time branching) come next.
19//!
20//! First cut: fixed-width scalars and in-place records (struct/tuple). Anything
21//! else lowers to [`CompactError::Unsupported`].
22//!
23//! Spec: "The descriptor model", "Compact mode", `r[ir.memory]`.
24
25use std::alloc;
26use std::collections::{BTreeMap, HashMap};
27
28use phon_ir::ir::{
29    BorrowOp, BytesOp, DefaultOp, EnumOp, EnumVariantOp, Lowered, MapOp, MemOp, MemProgram,
30    OpaqueOp, OptionOp, PointerOp, ResultOp, SeqOp, SetOp, SkipOp, fuse,
31};
32use phon_ir::{
33    Access, Construct, Descriptor, EnumAccess, MapStorage, Presence, RecordAccess, ResultAccess,
34    SequenceAccess, SequenceStorage, SetAccess, SetStorage, Tag, VariantAccess,
35};
36use phon_schema::bytes::{Reader, write_u8, write_u32};
37use phon_schema::{
38    DecodeError, Field, Primitive, SchemaId, SchemaKind, SchemaRef, Value, Variant, VariantPayload,
39    read_value, write_value,
40};
41
42use crate::compact::{self, CompactError, Registry, Resolved, alignment, pad_to, skip_pad};
43use crate::compat::{self, FieldMatch, VariantMatch, incompatible};
44
45type Result<T> = core::result::Result<T, CompactError>;
46
47/// The wire (and, for our targets, in-memory) size of a fixed-width scalar, or
48/// `None` for the variable-length and uninhabited primitives, which need
49/// allocation or are never values and so are not plain copies.
50fn fixed_size(p: Primitive) -> Option<usize> {
51    Some(match p {
52        Primitive::Unit => 0,
53        Primitive::Bool | Primitive::U8 | Primitive::I8 => 1,
54        Primitive::U16 | Primitive::I16 => 2,
55        Primitive::U32 | Primitive::I32 | Primitive::F32 | Primitive::Char => 4,
56        Primitive::U64 | Primitive::I64 | Primitive::F64 => 8,
57        Primitive::U128 | Primitive::I128 => 16,
58        Primitive::String
59        | Primitive::Bytes
60        | Primitive::Never
61        | Primitive::DateTime
62        | Primitive::Uuid
63        | Primitive::QName => return None,
64    })
65}
66
67// ============================================================================
68// Lowering
69// ============================================================================
70
71/// Lower a descriptor into a flat [`MemProgram`]: a list of base-relative memory
72/// copies, in wire order. Build it once, run it many times.
73///
74/// # Errors
75/// [`CompactError`] if a referenced schema is missing, the descriptor and schema
76/// disagree, or a kind this first cut does not handle is reached.
77/// The minimum wire bytes one owned-sequence element occupies, for the
78/// length-vs-remaining guard (`r[validate.lengths]`). `0` when the element is
79/// zero-sized (an empty / all-ZST struct encodes to nothing, so the count is
80/// unbounded by the buffer and a fixed cap applies in `read_len` / the JIT
81/// stencil); `1` otherwise. An empty program is vacuously zero-sized.
82fn elem_min_wire(element: &MemProgram) -> usize {
83    let zero_sized = element
84        .iter()
85        .all(|op| matches!(op, MemOp::Scalar { size: 0, .. }));
86    usize::from(!zero_sized)
87}
88
89// r[impl ir.memory]
90// r[impl descriptors.fact-driven]
91pub fn lower(descriptor: &Descriptor, reg: &Registry) -> Result<MemProgram> {
92    let mut out = Vec::new();
93    lower_node(descriptor, reg, 0, &mut out)?;
94    // Coalesce contiguous scalar runs into single copies (e.g. a flat struct
95    // whose wire and memory layouts match becomes one memcpy).
96    Ok(fuse(out))
97}
98
99/// Lower a descriptor that may be recursive: the root program plus a block program per
100/// recursive (cyclic) schema, each lowered once from `descriptor_blocks` (which
101/// `phon::derive` collected). A `CallBlock` op resolves into [`Lowered::blocks`] at run
102/// time. For a non-recursive type `descriptor_blocks` is empty and the result is the
103/// familiar flat program with no blocks (`r[ir.recursion]`).
104// r[impl descriptors.separate-implementations]
105pub fn lower_typed(
106    descriptor: &Descriptor,
107    descriptor_blocks: &HashMap<SchemaId, Descriptor>,
108    reg: &Registry,
109) -> Result<Lowered> {
110    let mut root = Vec::new();
111    lower_node(descriptor, reg, 0, &mut root)?;
112    let mut blocks = BTreeMap::new();
113    for (id, body) in descriptor_blocks {
114        // A block's ops are relative to the recursive value's own start (base 0); a
115        // `CallBlock` supplies the actual pointer at run time.
116        let mut ops = Vec::new();
117        lower_node(body, reg, 0, &mut ops)?;
118        blocks.insert(*id, fuse(ops));
119    }
120    Ok(Lowered {
121        program: fuse(root),
122        blocks,
123    })
124}
125
126// r[impl ir.inlining]
127fn lower_node(d: &Descriptor, reg: &Registry, base: usize, out: &mut MemProgram) -> Result<()> {
128    // A back-edge to a recursive schema: emit a call into that schema's block, run at
129    // `base + offset` (the recursive value's location). The block itself is lowered once
130    // by `lower_typed` from `Derived::descriptor_blocks`. (`r[ir.recursion]`)
131    if matches!(d.access, Access::Recurse) {
132        let schema = match &d.schema {
133            SchemaRef::Concrete { id, .. } => *id,
134            SchemaRef::Var { .. } => {
135                return Err(CompactError::Unsupported(
136                    "typed: recursion via type-var ref",
137                ));
138            }
139        };
140        out.push(MemOp::CallBlock {
141            schema,
142            offset: base,
143        });
144        return Ok(());
145    }
146    match (&d.access, compact::resolve(reg, &d.schema)?) {
147        (Access::Scalar, Resolved::Primitive(p)) => {
148            let size = fixed_size(p).ok_or(CompactError::Unsupported(
149                "typed: variable-length scalar field",
150            ))?;
151            if d.layout.size == size {
152                out.push(MemOp::Scalar {
153                    offset: base,
154                    size,
155                    align: alignment(p),
156                });
157            } else if matches!(p, Primitive::U64 | Primitive::I64)
158                && matches!(d.layout.size, 1 | 2 | 4 | 8)
159            {
160                out.push(MemOp::NativeInt {
161                    offset: base,
162                    mem_size: d.layout.size,
163                    signed: matches!(p, Primitive::I64),
164                });
165            } else {
166                return Err(CompactError::Unsupported(
167                    "typed: scalar memory width differs from wire width",
168                ));
169            }
170            Ok(())
171        }
172        (Access::Record(ra), Resolved::Composite(kind)) => {
173            let arity = match &kind {
174                SchemaKind::Struct { fields, .. } => fields.len(),
175                SchemaKind::Tuple { elements } => elements.len(),
176                _ => {
177                    return Err(CompactError::TypeMismatch {
178                        expected: "struct or tuple for a record descriptor",
179                    });
180                }
181            };
182            if arity != ra.fields.len() {
183                return Err(CompactError::Malformed(
184                    "descriptor/schema field count mismatch",
185                ));
186            }
187            match &ra.construct {
188                Construct::InPlace => {}
189                Construct::Thunk(_) => {
190                    return Err(CompactError::Unsupported("typed: thunk construction"));
191                }
192            }
193            // Splice each field in wire order, folding its memory offset into the
194            // base. A field's own descriptor carries its schema and layout.
195            for fa in &ra.fields {
196                lower_node(&fa.descriptor, reg, base + fa.offset, out)?;
197            }
198            Ok(())
199        }
200        // r[impl ir.memory]
201        (Access::Sequence(seq), Resolved::Composite(SchemaKind::List { .. })) => {
202            let SequenceStorage::Vtable(thunks) = &seq.storage else {
203                return Err(CompactError::Unsupported(
204                    "typed: only vtable-backed owned sequences so far",
205                ));
206            };
207            // Lower the element once; it runs at each element slot (base 0).
208            let stride = seq.element.layout.size;
209            let elem_align = seq.element.layout.align;
210            let mut element = Vec::new();
211            lower_node(&seq.element, reg, 0, &mut element)?;
212            let element = fuse(element);
213            // Bulk-copy lowering: an element that is a single scalar covering its
214            // whole size, with no inter-element wire padding, decodes/encodes as
215            // ONE block copy — `Vec<u32>`, `Vec<f64>`, `Vec<u8>`, flat `repr(C)`
216            // structs. Anything with structure stays a per-element sequence.
217            let bulk = matches!(
218                element.as_slice(),
219                [MemOp::Scalar { offset: 0, size, align }]
220                    if *size == stride && stride.is_multiple_of(*align)
221            );
222            if bulk {
223                out.push(MemOp::Bytes(Box::new(BytesOp {
224                    field_offset: base,
225                    stride,
226                    elem_align,
227                    validate: validate_any,
228                    thunks: *thunks,
229                })));
230            } else {
231                let min_wire = elem_min_wire(&element);
232                out.push(MemOp::Sequence(Box::new(SeqOp {
233                    field_offset: base,
234                    element,
235                    stride,
236                    elem_align,
237                    min_wire,
238                    thunks: *thunks,
239                })));
240            }
241            Ok(())
242        }
243        (Access::Set(set), Resolved::Composite(SchemaKind::Set { .. })) => {
244            lower_set(set, reg, base, out)
245        }
246        // r[impl ir.memory] — String/Bytes: a bulk contiguous byte run.
247        (
248            Access::Sequence(seq),
249            Resolved::Primitive(p @ (Primitive::String | Primitive::Bytes)),
250        ) => {
251            match &seq.storage {
252                // A BORROWED leaf (`&str`/`&[u8]`): same wire as the owned run, but
253                // decode writes a fat pointer into the input (no alloc, no copy).
254                SequenceStorage::BorrowedVtable(thunks) => {
255                    out.push(MemOp::Borrow(Box::new(BorrowOp {
256                        field_offset: base,
257                        stride: 1,
258                        elem_align: 1,
259                        thunks: *thunks,
260                    })));
261                    Ok(())
262                }
263                SequenceStorage::Vtable(thunks) => {
264                    out.push(MemOp::Bytes(Box::new(BytesOp {
265                        field_offset: base,
266                        stride: 1,
267                        elem_align: 1,
268                        validate: if matches!(p, Primitive::String) {
269                            validate_utf8
270                        } else {
271                            validate_any
272                        },
273                        thunks: *thunks,
274                    })));
275                    Ok(())
276                }
277                _ => Err(CompactError::Unsupported(
278                    "typed: string/bytes needs vtable thunks",
279                )),
280            }
281        }
282        // r[impl ir.memory] — Option<T>: a presence byte then the inner value.
283        (Access::Option(opt), Resolved::Composite(SchemaKind::Option { .. })) => {
284            let Presence::Vtable(thunks) = &opt.presence else {
285                return Err(CompactError::Unsupported(
286                    "typed: option needs vtable presence thunks",
287                ));
288            };
289            // The some-payload sub-program runs at the inner value (base 0).
290            let mut some = Vec::new();
291            lower_node(&opt.some, reg, 0, &mut some)?;
292            out.push(MemOp::Option(Box::new(OptionOp {
293                field_offset: base,
294                some: fuse(some),
295                inner_size: opt.some.layout.size,
296                inner_align: opt.some.layout.align,
297                thunks: *thunks,
298            })));
299            Ok(())
300        }
301        // r[impl ir.memory] — #[repr(int)] enum: a u32 wire index then the payload.
302        (Access::Enum(ea), Resolved::Composite(SchemaKind::Enum { .. })) => {
303            let Tag::Direct { offset, width } = &ea.tag else {
304                return Err(CompactError::Unsupported(
305                    "typed: only #[repr(int)] enums (direct discriminant) so far",
306                ));
307            };
308            let mut variants = Vec::with_capacity(ea.variants.len());
309            for va in &ea.variants {
310                // The variant's fields live at base-relative offsets that already
311                // account for the discriminant (per facet); lower them as a record.
312                let mut payload = Vec::new();
313                for f in &va.payload.fields {
314                    lower_node(&f.descriptor, reg, base + f.offset, &mut payload)?;
315                }
316                variants.push(EnumVariantOp {
317                    wire_index: va.index,
318                    selector: va.selector,
319                    payload: fuse(payload),
320                });
321            }
322            out.push(MemOp::Enum(Box::new(EnumOp {
323                tag_offset: base + *offset,
324                tag_width: *width,
325                variants,
326                writer_only: Vec::new(),
327            })));
328            Ok(())
329        }
330        // r[impl ir.memory] — Map<K, V>: a u32 entry count then key+value pairs.
331        (Access::Map(ma), Resolved::Composite(SchemaKind::Map { .. })) => {
332            let MapStorage::Vtable(thunks) = &ma.storage else {
333                return Err(CompactError::Unsupported("typed: map needs vtable thunks"));
334            };
335            // The key and value sub-programs each run at their own value (base 0).
336            let mut key = Vec::new();
337            lower_node(&ma.key, reg, 0, &mut key)?;
338            let mut value = Vec::new();
339            lower_node(&ma.value, reg, 0, &mut value)?;
340            out.push(MemOp::Map(Box::new(MapOp {
341                field_offset: base,
342                key: fuse(key),
343                value: fuse(value),
344                key_size: ma.key.layout.size,
345                key_align: ma.key.layout.align,
346                value_size: ma.value.layout.size,
347                value_align: ma.value.layout.align,
348                thunks: *thunks,
349            })));
350            Ok(())
351        }
352        // r[impl ir.memory] — a self-describing dynamic `Value` field: encoded /
353        // decoded by the self-describing codec, self-delimiting on the wire.
354        (Access::Dynamic, Resolved::Composite(SchemaKind::Dynamic)) => {
355            out.push(MemOp::Dynamic { field_offset: base });
356            Ok(())
357        }
358        // r[impl ir.memory] — Result<T, E>: a u32 wire index then the active arm's
359        // payload (wire-identical to a two-variant enum). The schema gives the Ok/Err
360        // wire indices; the thunks drive the repr(Rust) layout.
361        (Access::Result(ra), Resolved::Composite(SchemaKind::Enum { variants, .. })) => {
362            out.push(MemOp::Result(Box::new(lower_result(
363                ra, &variants, reg, base,
364            )?)));
365            Ok(())
366        }
367        // r[impl descriptors.thunk-binding]
368        (Access::Pointer(pa), _) => {
369            let mut pointee = Vec::new();
370            lower_node(&pa.pointee, reg, 0, &mut pointee)?;
371            out.push(MemOp::Pointer(Box::new(PointerOp {
372                field_offset: base,
373                pointee: fuse(pointee),
374                pointee_size: pa.pointee.layout.size,
375                pointee_align: pa.pointee.layout.align,
376                thunks: pa.thunks,
377            })));
378            Ok(())
379        }
380        // r[impl ir.memory] — opaque field: a length-prefixed blob (wire-identical
381        // to a `Primitive::Bytes` run); the engine frames it and the thunks fill /
382        // consume the inner span.
383        (Access::Opaque(thunks), Resolved::Primitive(Primitive::Bytes)) => {
384            out.push(MemOp::Opaque(Box::new(OpaqueOp {
385                field_offset: base,
386                thunks: *thunks,
387            })));
388            Ok(())
389        }
390        _ => Err(CompactError::Unsupported(
391            "typed: only fixed scalars, in-place records, owned sequences, strings, options, #[repr(int)] enums, and opaque fields so far",
392        )),
393    }
394}
395
396// ============================================================================
397// Decode-compat lowering (writer schema ⋈ reader descriptor)
398// ============================================================================
399
400/// Lower a *writer* schema translated against a *reader* [`Descriptor`] into a
401/// flat [`MemProgram`] of reader-memory ops, in WIRE order. This is the typed
402/// (memory-side) analog of `plan::build_plan` + `plan::lower`: it bakes the
403/// writer↔reader compatibility decision in once, at lowering, so decode stays as
404/// fast as the single-schema path — there is no fast/slow path, only one program.
405///
406/// The compat rules mirror `plan.rs` exactly (the cross-engine oracle):
407/// struct fields match by name (writer-only skipped, reader-only defaulted or, if
408/// required, incompatible), enum variants match by name (writer-only → a decode
409/// error), and types match without implicit widening (`r[compat.*]`).
410///
411/// When `writer_root` resolves to the same schema the reader carries, the result
412/// is equivalent to [`lower`] (no skips/defaults) — the identity case.
413///
414/// # Errors
415/// [`CompactError::Incompatible`] (or a resolution error) if the writer and reader
416/// cannot be translated, or [`CompactError::Unsupported`] for a kind not yet
417/// carried by the typed path.
418// r[impl compat.plan-first]
419pub fn lower_decode(
420    writer_root: SchemaId,
421    reader: &Descriptor,
422    reader_blocks: &HashMap<SchemaId, Descriptor>,
423    reg: &Registry,
424) -> Result<Lowered> {
425    let mut out = Vec::new();
426    lower_decode_node(&SchemaRef::concrete(writer_root), reader, reg, 0, &mut out)?;
427    // A recursive reader lowers each of its cyclic schemas to a callable block, just
428    // as `lower_typed` does — a `Recurse` reader node became a `CallBlock` into one of
429    // these. For the same-schema path the writer's schema at every `Recurse`
430    // position is that same schema, so a block translates
431    // `concrete(R) ⋈ reader_blocks[R]` — the identity case. Compatibility across
432    // differing recursive schemas is the tracked follow-up; here the block's writer
433    // ref is the reader schema id.
434    let mut blocks = BTreeMap::new();
435    for (id, body) in reader_blocks {
436        let mut ops = Vec::new();
437        lower_decode_node(&SchemaRef::concrete(*id), body, reg, 0, &mut ops)?;
438        blocks.insert(*id, fuse(ops));
439    }
440    Ok(Lowered {
441        program: fuse(out),
442        blocks,
443    })
444}
445
446/// Append the reader-memory ops for one (writer schema ⋈ reader descriptor) node,
447/// folding the reader field offset into `base`.
448// r[impl compat.type-match]
449fn lower_decode_node(
450    writer: &SchemaRef,
451    reader: &Descriptor,
452    reg: &Registry,
453    base: usize,
454    out: &mut MemProgram,
455) -> Result<()> {
456    // A recursive reader back-edge: emit a call into that schema's block, run at
457    // `base + offset`. `lower_decode` lowers the block itself from `reader_blocks`.
458    // (`r[ir.recursion]`)
459    if matches!(reader.access, Access::Recurse) {
460        let schema = match &reader.schema {
461            SchemaRef::Concrete { id, .. } => *id,
462            SchemaRef::Var { .. } => {
463                return Err(CompactError::Unsupported(
464                    "typed: recursion via type-var ref (decode)",
465                ));
466            }
467        };
468        out.push(MemOp::CallBlock {
469            schema,
470            offset: base,
471        });
472        return Ok(());
473    }
474    let w = compact::resolve(reg, writer)?;
475    match (&reader.access, w) {
476        // Scalar ⋈ scalar: identical primitives copy through; differing ones are
477        // incompatible — NO implicit numeric widening (`r[compat.type-match]`).
478        (Access::Scalar, Resolved::Primitive(wp)) => {
479            let Resolved::Primitive(rp) = compact::resolve(reg, &reader.schema)? else {
480                return Err(CompactError::TypeMismatch {
481                    expected: "scalar reader schema for a scalar descriptor",
482                });
483            };
484            if wp != rp {
485                return Err(incompatible(format!("primitive {wp:?} is not {rp:?}")));
486            }
487            let size = fixed_size(wp).ok_or(CompactError::Unsupported(
488                "typed: variable-length scalar field",
489            ))?;
490            out.push(MemOp::Scalar {
491                offset: base,
492                size,
493                align: alignment(wp),
494            });
495            Ok(())
496        }
497        // Struct ⋈ struct: match fields by name, in WIRE order.
498        (Access::Record(ra), Resolved::Composite(SchemaKind::Struct { fields: wf, .. })) => {
499            lower_decode_record(&wf, ra, &reader.schema, RecordKind::Struct, reg, base, out)
500        }
501        // Tuple ⋈ tuple: positional record fields, carried as synthetic index names
502        // through the same field matcher.
503        (Access::Record(ra), Resolved::Composite(SchemaKind::Tuple { elements })) => {
504            let wf = tuple_fields(elements);
505            lower_decode_record(&wf, ra, &reader.schema, RecordKind::Tuple, reg, base, out)
506        }
507        // Enum ⋈ enum: match variants by name.
508        (Access::Enum(ea), Resolved::Composite(SchemaKind::Enum { variants: wv, .. })) => {
509            lower_decode_enum(&wv, ea, &reader.schema, reg, base, out)
510        }
511        // Option ⋈ Option: structural shapes match; translate the inner.
512        (Access::Option(opt), Resolved::Composite(SchemaKind::Option { element: we })) => {
513            require_reader_option(&reader.schema, reg)?;
514            let Presence::Vtable(thunks) = &opt.presence else {
515                return Err(CompactError::Unsupported(
516                    "typed: option needs vtable presence thunks",
517                ));
518            };
519            let mut some = Vec::new();
520            lower_decode_node(&we, &opt.some, reg, 0, &mut some)?;
521            out.push(MemOp::Option(Box::new(OptionOp {
522                field_offset: base,
523                some: fuse(some),
524                inner_size: opt.some.layout.size,
525                inner_align: opt.some.layout.align,
526                thunks: *thunks,
527            })));
528            Ok(())
529        }
530        // List ⋈ List: translate the element.
531        (Access::Sequence(seq), Resolved::Composite(SchemaKind::List { element: we })) => {
532            require_reader_list(&reader.schema, reg)?;
533            lower_decode_sequence(&we, seq, reg, base, out)
534        }
535        // Set ⋈ Set: translate the element.
536        (Access::Set(set), Resolved::Composite(SchemaKind::Set { element: we })) => {
537            require_reader_set(&reader.schema, reg)?;
538            lower_decode_set(&we, set, reg, base, out)
539        }
540        // String/Bytes ⋈ String/Bytes: a bulk byte run (no element translation).
541        (
542            Access::Sequence(seq),
543            Resolved::Primitive(p @ (Primitive::String | Primitive::Bytes)),
544        ) => {
545            let Resolved::Primitive(rp) = compact::resolve(reg, &reader.schema)? else {
546                return Err(CompactError::TypeMismatch {
547                    expected: "string/bytes reader schema",
548                });
549            };
550            if p != rp {
551                return Err(incompatible(format!("primitive {p:?} is not {rp:?}")));
552            }
553            match &seq.storage {
554                // A BORROWED leaf (`&str`/`&[u8]`): same wire, zero-copy decode.
555                SequenceStorage::BorrowedVtable(thunks) => {
556                    out.push(MemOp::Borrow(Box::new(BorrowOp {
557                        field_offset: base,
558                        stride: 1,
559                        elem_align: 1,
560                        thunks: *thunks,
561                    })));
562                    Ok(())
563                }
564                SequenceStorage::Vtable(thunks) => {
565                    out.push(MemOp::Bytes(Box::new(BytesOp {
566                        field_offset: base,
567                        stride: 1,
568                        elem_align: 1,
569                        validate: if matches!(p, Primitive::String) {
570                            validate_utf8
571                        } else {
572                            validate_any
573                        },
574                        thunks: *thunks,
575                    })));
576                    Ok(())
577                }
578                _ => Err(CompactError::Unsupported(
579                    "typed: string/bytes needs vtable thunks",
580                )),
581            }
582        }
583        // Map ⋈ Map: translate key and value.
584        (Access::Map(ma), Resolved::Composite(SchemaKind::Map { key: wk, value: wv })) => {
585            require_reader_map(&reader.schema, reg)?;
586            let MapStorage::Vtable(thunks) = &ma.storage else {
587                return Err(CompactError::Unsupported("typed: map needs vtable thunks"));
588            };
589            let mut key = Vec::new();
590            lower_decode_node(&wk, &ma.key, reg, 0, &mut key)?;
591            let mut value = Vec::new();
592            lower_decode_node(&wv, &ma.value, reg, 0, &mut value)?;
593            out.push(MemOp::Map(Box::new(MapOp {
594                field_offset: base,
595                key: fuse(key),
596                value: fuse(value),
597                key_size: ma.key.layout.size,
598                key_align: ma.key.layout.align,
599                value_size: ma.value.layout.size,
600                value_align: ma.value.layout.align,
601                thunks: *thunks,
602            })));
603            Ok(())
604        }
605        // Dynamic ⋈ Dynamic: both sides are self-describing; the value carries its
606        // own structure, so there is nothing to translate — passthrough.
607        (Access::Dynamic, Resolved::Composite(SchemaKind::Dynamic)) => {
608            require_reader_dynamic(&reader.schema, reg)?;
609            out.push(MemOp::Dynamic { field_offset: base });
610            Ok(())
611        }
612        // Result ⋈ enum: the writer's Result wire is a two-variant enum; match Ok/Err
613        // by name and translate each arm's payload (writer Ok ⋈ reader Ok, etc.).
614        (Access::Result(ra), Resolved::Composite(SchemaKind::Enum { variants: wv, .. })) => {
615            out.push(MemOp::Result(Box::new(lower_decode_result(
616                &wv, ra, reg, base,
617            )?)));
618            Ok(())
619        }
620        // r[impl descriptors.thunk-binding]
621        (Access::Pointer(pa), _) => {
622            let mut pointee = Vec::new();
623            lower_decode_node(writer, &pa.pointee, reg, 0, &mut pointee)?;
624            out.push(MemOp::Pointer(Box::new(PointerOp {
625                field_offset: base,
626                pointee: fuse(pointee),
627                pointee_size: pa.pointee.layout.size,
628                pointee_align: pa.pointee.layout.align,
629                thunks: pa.thunks,
630            })));
631            Ok(())
632        }
633        // Opaque ⋈ Bytes: the writer wire is a `Primitive::Bytes` run; the reader
634        // carries an opaque adapter. The inner bytes are never translated here — the
635        // adapter owns the inner type — so this is the single-schema op verbatim.
636        (Access::Opaque(thunks), Resolved::Primitive(Primitive::Bytes)) => {
637            require_reader_bytes(&reader.schema, reg)?;
638            out.push(MemOp::Opaque(Box::new(OpaqueOp {
639                field_offset: base,
640                thunks: *thunks,
641            })));
642            Ok(())
643        }
644        _ => Err(incompatible("writer and reader schema kinds differ")),
645    }
646}
647
648fn lower_decode_sequence(
649    writer_element: &SchemaRef,
650    seq: &SequenceAccess,
651    reg: &Registry,
652    base: usize,
653    out: &mut MemProgram,
654) -> Result<()> {
655    let SequenceStorage::Vtable(thunks) = &seq.storage else {
656        return Err(CompactError::Unsupported(
657            "typed: only vtable-backed owned sequences so far",
658        ));
659    };
660    let stride = seq.element.layout.size;
661    let elem_align = seq.element.layout.align;
662    let mut element = Vec::new();
663    lower_decode_node(writer_element, &seq.element, reg, 0, &mut element)?;
664    let element = fuse(element);
665    let bulk = matches!(
666        element.as_slice(),
667        [MemOp::Scalar { offset: 0, size, align }]
668            if *size == stride && stride.is_multiple_of(*align)
669    );
670    if bulk {
671        out.push(MemOp::Bytes(Box::new(BytesOp {
672            field_offset: base,
673            stride,
674            elem_align,
675            validate: validate_any,
676            thunks: *thunks,
677        })));
678    } else {
679        let min_wire = elem_min_wire(&element);
680        out.push(MemOp::Sequence(Box::new(SeqOp {
681            field_offset: base,
682            element,
683            stride,
684            elem_align,
685            min_wire,
686            thunks: *thunks,
687        })));
688    }
689    Ok(())
690}
691
692fn lower_set(set: &SetAccess, reg: &Registry, base: usize, out: &mut MemProgram) -> Result<()> {
693    let SetStorage::Vtable(thunks) = &set.storage;
694    let mut element = Vec::new();
695    lower_node(&set.element, reg, 0, &mut element)?;
696    let element = fuse(element);
697    let min_wire = elem_min_wire(&element);
698    out.push(MemOp::Set(Box::new(SetOp {
699        field_offset: base,
700        element,
701        elem_size: set.element.layout.size,
702        elem_align: set.element.layout.align,
703        min_wire,
704        thunks: *thunks,
705    })));
706    Ok(())
707}
708
709fn lower_decode_set(
710    writer_element: &SchemaRef,
711    set: &SetAccess,
712    reg: &Registry,
713    base: usize,
714    out: &mut MemProgram,
715) -> Result<()> {
716    let SetStorage::Vtable(thunks) = &set.storage;
717    let mut element = Vec::new();
718    lower_decode_node(writer_element, &set.element, reg, 0, &mut element)?;
719    let element = fuse(element);
720    let min_wire = elem_min_wire(&element);
721    out.push(MemOp::Set(Box::new(SetOp {
722        field_offset: base,
723        element,
724        elem_size: set.element.layout.size,
725        elem_align: set.element.layout.align,
726        min_wire,
727        thunks: *thunks,
728    })));
729    Ok(())
730}
731
732enum RecordKind {
733    Struct,
734    Tuple,
735}
736
737/// Translate a writer struct's wire fields against the reader's record descriptor.
738/// Reader field NAMES come from the reader schema (resolved here), aligned by index
739/// with the descriptor's fields (the bridge builds them in the same order).
740// r[impl compat.field-matching]
741fn lower_decode_record(
742    w_fields: &[Field],
743    ra: &RecordAccess,
744    reader_schema: &SchemaRef,
745    record_kind: RecordKind,
746    reg: &Registry,
747    base: usize,
748    out: &mut MemProgram,
749) -> Result<()> {
750    match &ra.construct {
751        Construct::InPlace => {}
752        Construct::Thunk(_) => {
753            return Err(CompactError::Unsupported("typed: thunk construction"));
754        }
755    }
756    // The reader field names, in the same order as `ra.fields`.
757    let r_named = reader_record_fields(reader_schema, record_kind, reg)?;
758    if r_named.len() != ra.fields.len() {
759        return Err(CompactError::Malformed(
760            "descriptor/schema field count mismatch",
761        ));
762    }
763
764    for step in compat::match_fields(
765        w_fields,
766        &r_named,
767        |ri, _| ra.fields[ri].default.is_some(),
768        |rf| {
769            incompatible(format!(
770                "required reader field '{}' is absent from the writer",
771                rf.name
772            ))
773        },
774    )? {
775        match step {
776            FieldMatch::Take {
777                writer,
778                reader_index: ri,
779            } => {
780                let fa = &ra.fields[ri];
781                lower_decode_node(&writer.schema, &fa.descriptor, reg, base + fa.offset, out)?;
782            }
783            FieldMatch::Skip { writer } => {
784                out.push(MemOp::SkipWire(Box::new(skip_op(&writer.schema, reg)?)));
785            }
786            FieldMatch::Default { reader_index: ri } => {
787                let fa = &ra.fields[ri];
788                let Some(d) = fa.default else {
789                    return Err(incompatible(format!(
790                        "required reader field '{}' is absent from the writer",
791                        r_named[ri].name
792                    )));
793                };
794                out.push(MemOp::Default(Box::new(DefaultOp {
795                    offset: base + fa.offset,
796                    ctx: d.ctx,
797                    default: d.thunk,
798                })));
799            }
800        }
801    }
802    Ok(())
803}
804
805/// Translate a writer enum's variants against the reader's enum descriptor, keyed
806/// by WRITER variant index → reader variant matched by NAME. Reader variant names
807/// come from the reader schema (resolved here), aligned by index with `ea.variants`.
808// r[impl compat.enum]
809fn lower_decode_enum(
810    w_variants: &[Variant],
811    ea: &EnumAccess,
812    reader_schema: &SchemaRef,
813    reg: &Registry,
814    base: usize,
815    out: &mut MemProgram,
816) -> Result<()> {
817    let Tag::Direct { offset, width } = &ea.tag else {
818        return Err(CompactError::Unsupported(
819            "typed: only #[repr(int)] enums (direct discriminant) so far",
820        ));
821    };
822    let r_named = reader_enum_variants(reader_schema, reg)?;
823    if r_named.len() != ea.variants.len() {
824        return Err(CompactError::Malformed(
825            "descriptor/schema variant count mismatch",
826        ));
827    }
828    let mut variants = Vec::new();
829    let mut writer_only = Vec::new();
830    for step in compat::match_variants(w_variants, &r_named) {
831        match step {
832            VariantMatch::Take {
833                writer,
834                reader_index: ri,
835            } => {
836                let va = &ea.variants[ri];
837                let payload =
838                    lower_decode_payload(&writer.payload, va, &r_named[ri].payload, reg, base)?;
839                variants.push(EnumVariantOp {
840                    wire_index: writer.index,
841                    selector: va.selector,
842                    payload,
843                });
844            }
845            VariantMatch::WriterOnly { writer } => {
846                writer_only.push(writer.index);
847            }
848        }
849    }
850    out.push(MemOp::Enum(Box::new(EnumOp {
851        tag_offset: base + *offset,
852        tag_width: *width,
853        variants,
854        writer_only,
855    })));
856    Ok(())
857}
858
859/// Translate one matched enum variant's payload (writer payload ⋈ reader payload).
860/// The reader payload fields live at base-relative offsets carried by the variant
861/// access; their names come from the reader schema payload.
862fn lower_decode_payload(
863    w: &VariantPayload,
864    va: &VariantAccess,
865    r_schema_payload: &VariantPayload,
866    reg: &Registry,
867    base: usize,
868) -> Result<MemProgram> {
869    let mut payload = Vec::new();
870    match (w, r_schema_payload) {
871        (VariantPayload::Unit, VariantPayload::Unit) => {}
872        (VariantPayload::Newtype(wr), VariantPayload::Newtype(_)) => {
873            // A single payload field at the variant's first field offset.
874            let fa = va.payload.fields.first().ok_or(CompactError::Malformed(
875                "newtype variant has no payload field",
876            ))?;
877            lower_decode_node(wr, &fa.descriptor, reg, base + fa.offset, &mut payload)?;
878        }
879        (VariantPayload::Tuple(wrs), VariantPayload::Tuple(rrs)) => {
880            if wrs.len() != rrs.len() || wrs.len() != va.payload.fields.len() {
881                return Err(incompatible("variant tuple arity differs"));
882            }
883            // Tuple fields are positional (no names): translate element-wise.
884            for (wr, fa) in wrs.iter().zip(&va.payload.fields) {
885                lower_decode_node(wr, &fa.descriptor, reg, base + fa.offset, &mut payload)?;
886            }
887        }
888        (VariantPayload::Struct(wfs), VariantPayload::Struct(rfs)) => {
889            // A struct-shaped payload matches by field name, like a top-level
890            // struct, but at the variant's base-relative offsets. Build a synthetic
891            // reader-schema ref is unnecessary: translate against the variant's own
892            // record access and the reader schema payload field list.
893            lower_decode_variant_struct(wfs, &va.payload, rfs, reg, base, &mut payload)?;
894        }
895        _ => return Err(incompatible("variant payload shapes differ")),
896    }
897    Ok(fuse(payload))
898}
899
900/// Translate a writer struct-variant payload against the reader's variant record
901/// access (matching by name, defaulting reader-only fields), at base-relative
902/// offsets. Mirrors [`lower_decode_struct`] but the reader names come straight from
903/// the reader schema payload field list (aligned with the variant's fields).
904fn lower_decode_variant_struct(
905    w_fields: &[Field],
906    ra: &RecordAccess,
907    r_fields: &[Field],
908    reg: &Registry,
909    base: usize,
910    out: &mut MemProgram,
911) -> Result<()> {
912    if r_fields.len() != ra.fields.len() {
913        return Err(CompactError::Malformed(
914            "variant descriptor/schema field count mismatch",
915        ));
916    }
917    for step in compat::match_fields(
918        w_fields,
919        r_fields,
920        |ri, _| ra.fields[ri].default.is_some(),
921        |rf| {
922            incompatible(format!(
923                "required reader variant field '{}' is absent from the writer",
924                rf.name
925            ))
926        },
927    )? {
928        match step {
929            FieldMatch::Take {
930                writer,
931                reader_index: ri,
932            } => {
933                let fa = &ra.fields[ri];
934                lower_decode_node(&writer.schema, &fa.descriptor, reg, base + fa.offset, out)?;
935            }
936            FieldMatch::Skip { writer } => {
937                out.push(MemOp::SkipWire(Box::new(skip_op(&writer.schema, reg)?)));
938            }
939            FieldMatch::Default { reader_index: ri } => {
940                let fa = &ra.fields[ri];
941                let Some(d) = fa.default else {
942                    return Err(incompatible(format!(
943                        "required reader variant field '{}' is absent from the writer",
944                        r_fields[ri].name
945                    )));
946                };
947                out.push(MemOp::Default(Box::new(DefaultOp {
948                    offset: base + fa.offset,
949                    ctx: d.ctx,
950                    default: d.thunk,
951                })));
952            }
953        }
954    }
955    Ok(())
956}
957
958fn tuple_fields(elements: Vec<SchemaRef>) -> Vec<Field> {
959    elements
960        .into_iter()
961        .enumerate()
962        .map(|(i, schema)| Field {
963            name: i.to_string(),
964            schema,
965            required: true,
966        })
967        .collect()
968}
969
970/// The reader record's fields (for names), resolved from a reader schema reference.
971fn reader_record_fields(
972    r: &SchemaRef,
973    record_kind: RecordKind,
974    reg: &Registry,
975) -> Result<Vec<Field>> {
976    match (record_kind, compact::resolve(reg, r)?) {
977        (RecordKind::Struct, Resolved::Composite(SchemaKind::Struct { fields, .. })) => Ok(fields),
978        (RecordKind::Tuple, Resolved::Composite(SchemaKind::Tuple { elements })) => {
979            Ok(tuple_fields(elements))
980        }
981        _ => Err(incompatible("schema kinds differ")),
982    }
983}
984
985/// The reader enum's variants (for names + payload shapes), resolved from a reader
986/// schema reference.
987fn reader_enum_variants(r: &SchemaRef, reg: &Registry) -> Result<Vec<Variant>> {
988    match compact::resolve(reg, r)? {
989        Resolved::Composite(SchemaKind::Enum { variants, .. }) => Ok(variants),
990        _ => Err(CompactError::TypeMismatch {
991            expected: "enum reader schema for an enum descriptor",
992        }),
993    }
994}
995
996fn require_reader_list(r: &SchemaRef, reg: &Registry) -> Result<()> {
997    match compact::resolve(reg, r)? {
998        Resolved::Composite(SchemaKind::List { .. }) => Ok(()),
999        _ => Err(incompatible("schema kinds differ")),
1000    }
1001}
1002
1003fn require_reader_set(r: &SchemaRef, reg: &Registry) -> Result<()> {
1004    match compact::resolve(reg, r)? {
1005        Resolved::Composite(SchemaKind::Set { .. }) => Ok(()),
1006        _ => Err(incompatible("schema kinds differ")),
1007    }
1008}
1009
1010fn require_reader_option(r: &SchemaRef, reg: &Registry) -> Result<()> {
1011    match compact::resolve(reg, r)? {
1012        Resolved::Composite(SchemaKind::Option { .. }) => Ok(()),
1013        _ => Err(incompatible("schema kinds differ")),
1014    }
1015}
1016
1017fn require_reader_map(r: &SchemaRef, reg: &Registry) -> Result<()> {
1018    match compact::resolve(reg, r)? {
1019        Resolved::Composite(SchemaKind::Map { .. }) => Ok(()),
1020        _ => Err(incompatible("schema kinds differ")),
1021    }
1022}
1023
1024fn require_reader_dynamic(r: &SchemaRef, reg: &Registry) -> Result<()> {
1025    match compact::resolve(reg, r)? {
1026        Resolved::Composite(SchemaKind::Dynamic) => Ok(()),
1027        _ => Err(incompatible("schema kinds differ")),
1028    }
1029}
1030
1031fn require_reader_bytes(r: &SchemaRef, reg: &Registry) -> Result<()> {
1032    match compact::resolve(reg, r)? {
1033        Resolved::Primitive(Primitive::Bytes) => Ok(()),
1034        _ => Err(incompatible("primitive Bytes is not reader schema")),
1035    }
1036}
1037
1038/// The wire index of the schema enum variant named `name` (`Ok`/`Err` for a
1039/// `Result`), for lowering a [`ResultOp`].
1040fn variant_index_by_name(variants: &[Variant], name: &str) -> Result<u32> {
1041    variants
1042        .iter()
1043        .find(|v| v.name == name)
1044        .map(|v| v.index)
1045        .ok_or(CompactError::Malformed(
1046            "Result schema missing Ok or Err variant",
1047        ))
1048}
1049
1050/// Lower a single-schema [`ResultOp`]: take the Ok/Err wire indices from the schema
1051/// and the Ok/Err payload sub-programs from the descriptor.
1052fn lower_result(
1053    ra: &ResultAccess,
1054    variants: &[Variant],
1055    reg: &Registry,
1056    base: usize,
1057) -> Result<ResultOp> {
1058    let ok_wire_index = variant_index_by_name(variants, "Ok")?;
1059    let err_wire_index = variant_index_by_name(variants, "Err")?;
1060    let mut ok = Vec::new();
1061    lower_node(&ra.ok, reg, 0, &mut ok)?;
1062    let mut err = Vec::new();
1063    lower_node(&ra.err, reg, 0, &mut err)?;
1064    Ok(ResultOp {
1065        field_offset: base,
1066        ok: fuse(ok),
1067        ok_size: ra.ok.layout.size,
1068        ok_align: ra.ok.layout.align,
1069        ok_wire_index,
1070        err: fuse(err),
1071        err_size: ra.err.layout.size,
1072        err_align: ra.err.layout.align,
1073        err_wire_index,
1074        thunks: ra.thunks,
1075    })
1076}
1077
1078/// Lower a decode-compat [`ResultOp`]: match the writer enum's Ok/Err variants by
1079/// name and translate each arm's payload against the reader's Ok/Err descriptor.
1080fn lower_decode_result(
1081    wv: &[Variant],
1082    ra: &ResultAccess,
1083    reg: &Registry,
1084    base: usize,
1085) -> Result<ResultOp> {
1086    let ok_wv = wv
1087        .iter()
1088        .find(|v| v.name == "Ok")
1089        .ok_or_else(|| incompatible("writer Result schema missing Ok variant"))?;
1090    let err_wv = wv
1091        .iter()
1092        .find(|v| v.name == "Err")
1093        .ok_or_else(|| incompatible("writer Result schema missing Err variant"))?;
1094    Ok(ResultOp {
1095        field_offset: base,
1096        ok: lower_decode_result_arm(&ok_wv.payload, &ra.ok, reg)?,
1097        ok_size: ra.ok.layout.size,
1098        ok_align: ra.ok.layout.align,
1099        ok_wire_index: ok_wv.index,
1100        err: lower_decode_result_arm(&err_wv.payload, &ra.err, reg)?,
1101        err_size: ra.err.layout.size,
1102        err_align: ra.err.layout.align,
1103        err_wire_index: err_wv.index,
1104        thunks: ra.thunks,
1105    })
1106}
1107
1108/// Translate one `Result` arm: the writer payload is a newtype (`Ok(T)`/`Err(E)`),
1109/// translated against the reader arm's descriptor at offset 0 (the arm value start).
1110fn lower_decode_result_arm(
1111    w: &VariantPayload,
1112    reader: &Descriptor,
1113    reg: &Registry,
1114) -> Result<MemProgram> {
1115    let VariantPayload::Newtype(wr) = w else {
1116        return Err(incompatible("Result arm payload must be a newtype"));
1117    };
1118    let mut prog = Vec::new();
1119    lower_decode_node(wr, reader, reg, 0, &mut prog)?;
1120    Ok(fuse(prog))
1121}
1122
1123// ============================================================================
1124// Wire-skeleton lowering (skip a writer-only value)
1125// ============================================================================
1126
1127/// Resolve a writer schema reference into a [`SkipOp`] wire skeleton — a pre-built
1128/// recipe to advance the cursor past one value of that schema without touching
1129/// memory. Used for writer-only fields (`r[compat.skip-writer-only]`).
1130///
1131/// # Errors
1132/// [`CompactError::Unsupported`] for a kind the skip walker does not carry, or a
1133/// resolution error.
1134fn skip_op(writer: &SchemaRef, reg: &Registry) -> Result<SkipOp> {
1135    match compact::resolve(reg, writer)? {
1136        Resolved::Primitive(p) => match p {
1137            Primitive::String | Primitive::Bytes => Ok(SkipOp::Bytes {
1138                stride: 1,
1139                elem_align: 1,
1140            }),
1141            other => {
1142                let size = fixed_size(other).ok_or(CompactError::Unsupported(
1143                    "skip: variable-length scalar (datetime/uuid/qname)",
1144                ))?;
1145                Ok(SkipOp::Scalar {
1146                    size,
1147                    align: alignment(other),
1148                })
1149            }
1150        },
1151        Resolved::Composite(kind) => match kind {
1152            SchemaKind::Struct { fields, .. } => {
1153                let mut fs = Vec::with_capacity(fields.len());
1154                for f in &fields {
1155                    fs.push(skip_op(&f.schema, reg)?);
1156                }
1157                Ok(SkipOp::Struct(fs))
1158            }
1159            SchemaKind::Tuple { elements } => {
1160                let mut fs = Vec::with_capacity(elements.len());
1161                for e in &elements {
1162                    fs.push(skip_op(e, reg)?);
1163                }
1164                Ok(SkipOp::Struct(fs))
1165            }
1166            SchemaKind::Enum { variants, .. } => {
1167                let mut arms = Vec::with_capacity(variants.len());
1168                for v in &variants {
1169                    let fields = match &v.payload {
1170                        VariantPayload::Unit => Vec::new(),
1171                        VariantPayload::Newtype(r) => vec![skip_op(r, reg)?],
1172                        VariantPayload::Tuple(rs) => {
1173                            let mut fs = Vec::with_capacity(rs.len());
1174                            for r in rs {
1175                                fs.push(skip_op(r, reg)?);
1176                            }
1177                            fs
1178                        }
1179                        VariantPayload::Struct(fields) => {
1180                            let mut fs = Vec::with_capacity(fields.len());
1181                            for f in fields {
1182                                fs.push(skip_op(&f.schema, reg)?);
1183                            }
1184                            fs
1185                        }
1186                    };
1187                    arms.push((v.index, fields));
1188                }
1189                Ok(SkipOp::Enum(arms))
1190            }
1191            SchemaKind::List { element } | SchemaKind::Set { element } => {
1192                // A bulk byte run when the element is a fixed scalar covering its
1193                // own size (no inter-element wire padding), else a per-element seq.
1194                if let Resolved::Primitive(ep) = compact::resolve(reg, &element)?
1195                    && let Some(size) = fixed_size(ep)
1196                    && !matches!(ep, Primitive::String | Primitive::Bytes)
1197                {
1198                    let align = alignment(ep);
1199                    if size % align == 0 {
1200                        return Ok(SkipOp::Bytes {
1201                            stride: size,
1202                            elem_align: align,
1203                        });
1204                    }
1205                }
1206                Ok(SkipOp::Seq(Box::new(skip_op(&element, reg)?)))
1207            }
1208            SchemaKind::Option { element } => Ok(SkipOp::Option(Box::new(skip_op(&element, reg)?))),
1209            SchemaKind::Map { key, value } => Ok(SkipOp::Map(
1210                Box::new(skip_op(&key, reg)?),
1211                Box::new(skip_op(&value, reg)?),
1212            )),
1213            SchemaKind::Array { .. } => Err(CompactError::Unsupported("skip: fixed array")),
1214            SchemaKind::Tensor { .. } => Err(CompactError::Unsupported("skip: tensor")),
1215            SchemaKind::Channel { .. } => Err(CompactError::Unsupported("skip: channel")),
1216            SchemaKind::External { .. } => Err(CompactError::Unsupported("skip: external")),
1217            // A self-describing value is self-delimiting: skip it by decoding one
1218            // value and discarding it.
1219            SchemaKind::Dynamic => Ok(SkipOp::Dynamic),
1220            SchemaKind::Primitive(_) => {
1221                // A composite that resolved to a primitive kind: treat as scalar.
1222                Err(CompactError::Malformed(
1223                    "skip: primitive in composite position",
1224                ))
1225            }
1226        },
1227    }
1228}
1229
1230/// Read `width` (1/2/4/8) little-endian bytes at `ptr` as a `u64`.
1231///
1232/// # Safety
1233/// `ptr` must be readable for `width` bytes.
1234unsafe fn read_uint(ptr: *const u8, width: usize) -> u64 {
1235    let mut buf = [0u8; 8];
1236    // Safety: forwarded; `width <= 8`.
1237    unsafe { core::ptr::copy_nonoverlapping(ptr, buf.as_mut_ptr(), width) };
1238    u64::from_le_bytes(buf)
1239}
1240
1241/// Write the low `width` (1/2/4/8) bytes of `val` little-endian at `ptr`.
1242///
1243/// # Safety
1244/// `ptr` must be writable for `width` bytes.
1245unsafe fn write_uint(ptr: *mut u8, width: usize, val: u64) {
1246    let bytes = val.to_le_bytes();
1247    // Safety: forwarded; `width <= 8`.
1248    unsafe { core::ptr::copy_nonoverlapping(bytes.as_ptr(), ptr, width) };
1249}
1250
1251fn sign_extend(raw: u64, width: usize) -> i64 {
1252    if width >= 8 {
1253        raw as i64
1254    } else {
1255        let shift = 64 - width * 8;
1256        ((raw << shift) as i64) >> shift
1257    }
1258}
1259
1260fn signed_fits_width(value: i64, width: usize) -> bool {
1261    if width >= 8 {
1262        return true;
1263    }
1264    let bits = width * 8;
1265    let min = -(1i64 << (bits - 1));
1266    let max = (1i64 << (bits - 1)) - 1;
1267    (min..=max).contains(&value)
1268}
1269
1270/// A mask of the low `width` bytes (`width >= 8` → all ones).
1271fn width_mask(width: usize) -> u64 {
1272    if width >= 8 {
1273        u64::MAX
1274    } else {
1275        (1u64 << (width * 8)) - 1
1276    }
1277}
1278
1279/// [`ByteValidator`] for `String` byte runs: the bytes must be valid UTF-8
1280/// (`r[validate.text]`). Both the interpreter and the JIT call this.
1281///
1282/// # Safety
1283/// `ptr` must point to `len` readable bytes (`len == 0` permits any non-null,
1284/// aligned `ptr`, which a slice's `as_ptr` always satisfies).
1285unsafe extern "C" fn validate_utf8(ptr: *const u8, len: usize) -> bool {
1286    // Safety: forwarded — `ptr`/`len` describe a readable byte run.
1287    let bytes = unsafe { core::slice::from_raw_parts(ptr, len) };
1288    core::str::from_utf8(bytes).is_ok()
1289}
1290
1291/// [`ByteValidator`] for byte runs with no content constraint — `Vec<u8>` and
1292/// bulk `Vec<scalar>` runs accept any bytes.
1293///
1294/// # Safety
1295/// Reads nothing; the signature matches [`ByteValidator`].
1296unsafe extern "C" fn validate_any(_ptr: *const u8, _len: usize) -> bool {
1297    true
1298}
1299
1300// ============================================================================
1301// Encode
1302// ============================================================================
1303
1304/// Encode the value at `base` into compact bytes, by a prebuilt program.
1305///
1306/// # Safety
1307/// `base` must point to an initialized value matching the descriptor the program
1308/// was lowered from, readable for every `offset + size` the program touches.
1309#[must_use]
1310pub unsafe fn encode_with(lowered: &Lowered, base: *const u8) -> Vec<u8> {
1311    let mut out = Vec::new();
1312    // Safety: forwarded from this function's contract.
1313    unsafe { encode_program(&lowered.program, base, &mut out, &lowered.blocks) };
1314    out
1315}
1316
1317unsafe fn encode_program(
1318    program: &MemProgram,
1319    base: *const u8,
1320    out: &mut Vec<u8>,
1321    blocks: &BTreeMap<SchemaId, MemProgram>,
1322) {
1323    for op in program {
1324        match op {
1325            // A recursive back-edge: run the callee schema's block at `base + offset`.
1326            MemOp::CallBlock { schema, offset } => {
1327                let block = blocks
1328                    .get(schema)
1329                    .expect("CallBlock references a lowered recursion block");
1330                // Safety: the recursive value lives at `base + offset`.
1331                unsafe { encode_program(block, base.add(*offset), out, blocks) };
1332            }
1333            MemOp::Scalar {
1334                offset,
1335                size,
1336                align,
1337            } => {
1338                pad_to(out, *align);
1339                // Safety: the value is valid for reads over this field's bytes.
1340                let src = unsafe { core::slice::from_raw_parts(base.add(*offset), *size) };
1341                out.extend_from_slice(src);
1342            }
1343            MemOp::NativeInt {
1344                offset,
1345                mem_size,
1346                signed,
1347            } => {
1348                pad_to(out, 8);
1349                // Safety: the native integer field is readable over `mem_size` bytes.
1350                let raw = unsafe { read_uint(base.add(*offset), *mem_size) };
1351                if *signed {
1352                    out.extend_from_slice(&sign_extend(raw, *mem_size).to_le_bytes());
1353                } else {
1354                    out.extend_from_slice(&raw.to_le_bytes());
1355                }
1356            }
1357            MemOp::Sequence(s) => {
1358                // Safety: the sequence handle lives at `field_offset`.
1359                let list = unsafe { base.add(s.field_offset) };
1360                let n = unsafe { (s.thunks.len)(s.thunks.ctx, list) };
1361                write_u32(out, n as u32);
1362                let data = unsafe { (s.thunks.data)(s.thunks.ctx, list) };
1363                for i in 0..n {
1364                    // Safety: element `i` lives at `data + i*stride`.
1365                    unsafe { encode_program(&s.element, data.add(i * s.stride), out, blocks) };
1366                }
1367            }
1368            MemOp::Set(s) => {
1369                // Safety: the set handle lives at `field_offset`.
1370                let set = unsafe { base.add(s.field_offset) };
1371                let n = unsafe { (s.thunks.len)(s.thunks.ctx, set) };
1372                write_u32(out, n as u32);
1373                let it = unsafe { (s.thunks.iter_init)(s.thunks.ctx, set) };
1374                loop {
1375                    let mut value: *const u8 = core::ptr::null();
1376                    // Safety: `it` is a live iterator; the out-param is valid.
1377                    if !unsafe { (s.thunks.iter_next)(s.thunks.ctx, it, &mut value) } {
1378                        break;
1379                    }
1380                    // Safety: `value` borrows the current set element.
1381                    unsafe { encode_program(&s.element, value, out, blocks) };
1382                }
1383                // Safety: `it` was built by `iter_init` and is freed exactly once.
1384                unsafe { (s.thunks.iter_dealloc)(s.thunks.ctx, it) };
1385            }
1386            MemOp::Bytes(b) => {
1387                // Safety: the handle lives at field_offset; one bulk read of its
1388                // contiguous `count * stride` bytes.
1389                let list = unsafe { base.add(b.field_offset) };
1390                let count = unsafe { (b.thunks.len)(b.thunks.ctx, list) };
1391                write_u32(out, count as u32);
1392                // Alignment pads BEFORE an element's bytes; an empty run has no
1393                // elements, so it writes no padding (`r[compact.alignment]`).
1394                if count > 0 {
1395                    pad_to(out, b.elem_align);
1396                }
1397                let data = unsafe { (b.thunks.data)(b.thunks.ctx, list) };
1398                let src = unsafe { core::slice::from_raw_parts(data, count * b.stride) };
1399                out.extend_from_slice(src);
1400            }
1401            // Encode of a borrowed leaf is byte-identical to the owned bulk run: the
1402            // `&str`/`&[u8]` reads its length and contiguous bytes through the borrow
1403            // thunks and writes them as a `u32` count + `count * stride` bytes.
1404            MemOp::Borrow(b) => {
1405                // Safety: the borrowed handle (fat pointer) lives at field_offset.
1406                let field = unsafe { base.add(b.field_offset) };
1407                let count = unsafe { (b.thunks.len)(b.thunks.ctx, field) };
1408                write_u32(out, count as u32);
1409                // Alignment pads BEFORE an element's bytes; an empty run has no
1410                // elements, so it writes no padding (`r[compact.alignment]`).
1411                if count > 0 {
1412                    pad_to(out, b.elem_align);
1413                }
1414                let data = unsafe { (b.thunks.data)(b.thunks.ctx, field) };
1415                let src = unsafe { core::slice::from_raw_parts(data, count * b.stride) };
1416                out.extend_from_slice(src);
1417            }
1418            MemOp::Option(o) => {
1419                // Safety: the option handle lives at field_offset.
1420                let option = unsafe { base.add(o.field_offset) };
1421                if unsafe { (o.thunks.is_some)(o.thunks.ctx, option) } {
1422                    write_u8(out, 1);
1423                    // Safety: present, so `get_value` returns a valid inner pointer.
1424                    let inner = unsafe { (o.thunks.get_value)(o.thunks.ctx, option) };
1425                    unsafe { encode_program(&o.some, inner, out, blocks) };
1426                } else {
1427                    write_u8(out, 0);
1428                }
1429            }
1430            MemOp::Enum(e) => {
1431                // Read the in-memory discriminant to pick the active variant.
1432                // Safety: the discriminant lives at base + tag_offset, tag_width wide.
1433                let disc = unsafe { read_uint(base.add(e.tag_offset), e.tag_width) };
1434                let mask = width_mask(e.tag_width);
1435                let variant = e
1436                    .variants
1437                    .iter()
1438                    .find(|v| (v.selector & mask) == (disc & mask))
1439                    .expect("enum discriminant matches no modelled variant (invalid value)");
1440                write_u32(out, variant.wire_index);
1441                // The payload fields live at base-relative offsets (same base).
1442                unsafe { encode_program(&variant.payload, base, out, blocks) };
1443            }
1444            MemOp::Map(m) => {
1445                // Safety: the map handle lives at field_offset.
1446                let map = unsafe { base.add(m.field_offset) };
1447                let n = unsafe { (m.thunks.len)(m.thunks.ctx, map) };
1448                write_u32(out, n as u32);
1449                // Drive a stateful iterator over the entries, encoding each
1450                // (key, value) pair in turn.
1451                let it = unsafe { (m.thunks.iter_init)(m.thunks.ctx, map) };
1452                loop {
1453                    let mut k: *const u8 = core::ptr::null();
1454                    let mut v: *const u8 = core::ptr::null();
1455                    // Safety: `it` is a live iterator; the out-params are valid.
1456                    if !unsafe { (m.thunks.iter_next)(m.thunks.ctx, it, &mut k, &mut v) } {
1457                        break;
1458                    }
1459                    // Safety: `k`/`v` borrow the current entry's key/value.
1460                    unsafe { encode_program(&m.key, k, out, blocks) };
1461                    unsafe { encode_program(&m.value, v, out, blocks) };
1462                }
1463                // Safety: `it` was built by `iter_init` and is freed exactly once.
1464                unsafe { (m.thunks.iter_dealloc)(m.thunks.ctx, it) };
1465            }
1466            // r[impl ir.memory] — a self-describing dynamic `Value`: write it through
1467            // the self-describing codec (self-delimiting; no length prefix).
1468            MemOp::Dynamic { field_offset } => {
1469                // Safety: the field at `field_offset` is an initialized `Value`.
1470                let v = unsafe { &*base.add(*field_offset).cast::<Value>() };
1471                write_value(out, v)
1472                    .expect("dynamic value is encodable by the self-describing codec");
1473            }
1474            // r[impl ir.memory] — Result<T, E>: read which arm is active via the
1475            // vtable, write its wire index, then encode that arm's payload at the
1476            // pointer the getter returns (the repr(Rust) layout is never assumed).
1477            MemOp::Result(rs) => {
1478                // Safety: the result handle lives at field_offset.
1479                let result = unsafe { base.add(rs.field_offset) };
1480                if unsafe { (rs.thunks.is_ok)(rs.thunks.ctx, result) } {
1481                    write_u32(out, rs.ok_wire_index);
1482                    // Safety: Ok, so `get_ok` returns a valid inner pointer.
1483                    let ok = unsafe { (rs.thunks.get_ok)(rs.thunks.ctx, result) };
1484                    unsafe { encode_program(&rs.ok, ok, out, blocks) };
1485                } else {
1486                    write_u32(out, rs.err_wire_index);
1487                    // Safety: Err, so `get_err` returns a valid inner pointer.
1488                    let err = unsafe { (rs.thunks.get_err)(rs.thunks.ctx, result) };
1489                    unsafe { encode_program(&rs.err, err, out, blocks) };
1490                }
1491            }
1492            // r[impl descriptors.thunk-binding]
1493            MemOp::Pointer(p) => {
1494                // Safety: the owning pointer handle lives at field_offset.
1495                let pointer = unsafe { base.add(p.field_offset) };
1496                // Safety: `borrow` returns a valid pointee pointer for initialized
1497                // strong pointers such as Box/Rc/Arc.
1498                let pointee = unsafe { (p.thunks.borrow)(p.thunks.ctx, pointer) };
1499                unsafe { encode_program(&p.pointee, pointee, out, blocks) };
1500            }
1501            // r[impl ir.memory] — opaque field: reserve a `u32`
1502            // length (align 1 — wire-identical to a `Primitive::Bytes` run, so no
1503            // pre-pad), append the inner bytes via the thunk, then backpatch the
1504            // length. The backpatch is what fixed-width (non-varint) framing buys.
1505            MemOp::Opaque(o) => {
1506                // Safety: the opaque field lives at `field_offset`.
1507                let field = unsafe { base.add(o.field_offset) };
1508                let len_pos = out.len();
1509                write_u32(out, 0); // length placeholder, backpatched below
1510                let start = out.len();
1511                // Safety: `field` points at the opaque field; the thunk appends the
1512                // inner value's encoded bytes to `out`.
1513                unsafe { (o.thunks.encode)(o.thunks.ctx, field, core::ptr::from_mut(out)) };
1514                let inner_len = (out.len() - start) as u32;
1515                out[len_pos..len_pos + 4].copy_from_slice(&inner_len.to_le_bytes());
1516            }
1517            // Compat-only decode ops never appear in an encode program (encode is
1518            // single-schema: `lower`, not `lower_decode`).
1519            MemOp::SkipWire(_) | MemOp::Default(_) => {
1520                unreachable!("typed encode never emits compat skip/default ops")
1521            }
1522        }
1523    }
1524}
1525
1526/// Lower `descriptor` and encode the value at `base` in one step.
1527///
1528/// # Safety
1529/// As [`encode_with`].
1530///
1531/// # Errors
1532/// As [`lower`].
1533pub unsafe fn encode(
1534    base: *const u8,
1535    descriptor: &Descriptor,
1536    descriptor_blocks: &HashMap<SchemaId, Descriptor>,
1537    reg: &Registry,
1538) -> Result<Vec<u8>> {
1539    let lowered = lower_typed(descriptor, descriptor_blocks, reg)?;
1540    // Safety: forwarded from this function's contract.
1541    Ok(unsafe { encode_with(&lowered, base) })
1542}
1543
1544// ============================================================================
1545// Decode
1546// ============================================================================
1547
1548/// Decode compact `bytes` into the value at `base`, by a prebuilt program,
1549/// rejecting trailing bytes.
1550///
1551/// # Safety
1552/// `base` must point to writable, suitably sized and aligned uninitialized
1553/// storage for the descriptor the program was lowered from. On `Ok` the bytes it
1554/// covers are initialized.
1555///
1556/// # Errors
1557/// [`CompactError`] for malformed or trailing input.
1558pub unsafe fn decode_with(lowered: &Lowered, bytes: &[u8], base: *mut u8) -> Result<()> {
1559    let mut r = Reader::new(bytes);
1560    // Safety: forwarded from this function's contract.
1561    unsafe { decode_program(&lowered.program, &mut r, base, &lowered.blocks)? };
1562    if r.remaining() != 0 {
1563        return Err(CompactError::Decode(DecodeError::TrailingBytes(
1564            r.remaining(),
1565        )));
1566    }
1567    Ok(())
1568}
1569
1570unsafe fn decode_program(
1571    program: &MemProgram,
1572    r: &mut Reader,
1573    base: *mut u8,
1574    blocks: &BTreeMap<SchemaId, MemProgram>,
1575) -> Result<()> {
1576    for op in program {
1577        match op {
1578            // A recursive back-edge: run the callee schema's block at `base + offset`.
1579            MemOp::CallBlock { schema, offset } => {
1580                let block = blocks
1581                    .get(schema)
1582                    .expect("CallBlock references a lowered recursion block");
1583                // Safety: the recursive value lives at `base + offset`.
1584                unsafe { decode_program(block, r, base.add(*offset), blocks)? };
1585            }
1586            MemOp::Scalar {
1587                offset,
1588                size,
1589                align,
1590            } => {
1591                skip_pad(r, *align)?;
1592                let src = r.read_slice(*size)?;
1593                // Safety: `base` is valid for writes over this field's bytes, and
1594                // the wire bytes equal the in-memory bytes for a fixed scalar.
1595                unsafe { core::ptr::copy_nonoverlapping(src.as_ptr(), base.add(*offset), *size) };
1596            }
1597            MemOp::NativeInt {
1598                offset,
1599                mem_size,
1600                signed,
1601            } => {
1602                skip_pad(r, 8)?;
1603                if *signed {
1604                    let value = r.read_i64()?;
1605                    if !signed_fits_width(value, *mem_size) {
1606                        return Err(DecodeError::Malformed(
1607                            "native-sized signed integer out of range",
1608                        )
1609                        .into());
1610                    }
1611                    // Safety: `base + offset` is writable for the native integer field.
1612                    unsafe { write_uint(base.add(*offset), *mem_size, value as u64) };
1613                } else {
1614                    let value = r.read_u64()?;
1615                    if *mem_size < 8 && value > width_mask(*mem_size) {
1616                        return Err(DecodeError::Malformed(
1617                            "native-sized unsigned integer out of range",
1618                        )
1619                        .into());
1620                    }
1621                    // Safety: `base + offset` is writable for the native integer field.
1622                    unsafe { write_uint(base.add(*offset), *mem_size, value) };
1623                }
1624            }
1625            MemOp::Sequence(s) => {
1626                let count = r.read_len(s.min_wire)?;
1627                // Engine owns the element buffer: allocate it, fill it directly,
1628                // then hand it to the sequence with `from_raw_parts`.
1629                // A zero total byte size — an empty sequence, OR any number of
1630                // zero-sized elements (`stride == 0`) — must not reach the
1631                // allocator: a zero-size `Layout` is UB to allocate. Use a
1632                // dangling-but-aligned pointer, exactly as `Vec` does for ZSTs;
1633                // `from_raw_parts` then adopts `count` elements over no bytes
1634                // (`size_of::<T>() * cap == 0` matches the empty allocation).
1635                let (buffer, cap) = if count == 0 || s.stride == 0 {
1636                    (s.elem_align as *mut u8, count)
1637                } else {
1638                    let layout = alloc::Layout::from_size_align(count * s.stride, s.elem_align)
1639                        .map_err(|_| {
1640                            CompactError::Decode(DecodeError::Malformed("sequence layout overflow"))
1641                        })?;
1642                    // Safety: layout has non-zero size (count > 0 and stride > 0).
1643                    let buf = unsafe { alloc::alloc(layout) };
1644                    if buf.is_null() {
1645                        alloc::handle_alloc_error(layout);
1646                    }
1647                    (buf, count)
1648                };
1649                for i in 0..count {
1650                    // Safety: element `i` occupies `buffer + i*stride`. For a ZST
1651                    // (`stride == 0`) every element shares the dangling pointer and
1652                    // `.add(0)` is sound (provenance is only required for non-zero
1653                    // offsets); the element program touches no buffer bytes.
1654                    if let Err(e) =
1655                        unsafe { decode_program(&s.element, r, buffer.add(i * s.stride), blocks) }
1656                    {
1657                        // Free the buffer on a mid-fill failure (elements are
1658                        // assumed trivially droppable for now). Only a real,
1659                        // non-zero-size allocation needs freeing.
1660                        if cap != 0 && s.stride != 0 {
1661                            let layout =
1662                                alloc::Layout::from_size_align(cap * s.stride, s.elem_align)
1663                                    .unwrap();
1664                            unsafe { alloc::dealloc(buffer, layout) };
1665                        }
1666                        return Err(e);
1667                    }
1668                }
1669                // Safety: the handle lives at `field_offset`; the buffer holds
1670                // `count` initialized elements allocated with the element layout.
1671                let list = unsafe { base.add(s.field_offset) };
1672                unsafe { (s.thunks.from_raw_parts)(s.thunks.ctx, list, buffer, count, cap) };
1673            }
1674            MemOp::Set(s) => {
1675                let count = r.read_len(s.min_wire)?;
1676                // Safety: the set handle lives at field_offset.
1677                let set = unsafe { base.add(s.field_offset) };
1678                // Initialize the (uninitialized) set with room for `count` entries.
1679                // NOTE: a decode error after this point leaks the partial set — the
1680                // same trivially-droppable limitation as sequences/options/maps.
1681                unsafe { (s.thunks.init_with_capacity)(s.thunks.ctx, set, count) };
1682                for _ in 0..count {
1683                    let (scratch, layout) = alloc_scratch(s.elem_size, s.elem_align)?;
1684                    // Safety: scratch is elem_size bytes at elem_align.
1685                    if let Err(e) = unsafe { decode_program(&s.element, r, scratch, blocks) } {
1686                        free_scratch(scratch, layout);
1687                        return Err(e);
1688                    }
1689                    // Safety: scratch holds an initialized element; `insert` moves it
1690                    // into the set and tells us whether it was unique.
1691                    let inserted = unsafe { (s.thunks.insert)(s.thunks.ctx, set, scratch) };
1692                    free_scratch(scratch, layout);
1693                    if !inserted {
1694                        // r[impl validate.uniqueness]
1695                        return Err(CompactError::Decode(DecodeError::DuplicateElement));
1696                    }
1697                }
1698            }
1699            MemOp::Bytes(b) => {
1700                let count = r.read_len(b.stride.max(1))?;
1701                // Symmetric with encode: only an empty run skips no padding.
1702                if count > 0 {
1703                    skip_pad(r, b.elem_align)?;
1704                }
1705                let total = count * b.stride;
1706                let src = r.read_slice(total)?;
1707                // r[impl validate.text] — validate the run before adopting it
1708                // (UTF-8 for `String`, a no-op for `Vec`). The JIT calls the very
1709                // same thunk, so both engines share one validation path.
1710                // Safety: `src` is `total` readable bytes.
1711                if !unsafe { (b.validate)(src.as_ptr(), total) } {
1712                    return Err(CompactError::Decode(DecodeError::InvalidUtf8));
1713                }
1714                // Allocate, bulk-copy the run in, adopt it via `from_raw_parts`.
1715                let (buffer, cap) = if total == 0 {
1716                    (b.elem_align as *mut u8, 0usize)
1717                } else {
1718                    let layout =
1719                        alloc::Layout::from_size_align(total, b.elem_align).map_err(|_| {
1720                            CompactError::Decode(DecodeError::Malformed("bytes layout overflow"))
1721                        })?;
1722                    // Safety: total > 0.
1723                    let buf = unsafe { alloc::alloc(layout) };
1724                    if buf.is_null() {
1725                        alloc::handle_alloc_error(layout);
1726                    }
1727                    // Safety: src and buf are both `total` bytes, non-overlapping.
1728                    unsafe { core::ptr::copy_nonoverlapping(src.as_ptr(), buf, total) };
1729                    (buf, count)
1730                };
1731                // Safety: the handle lives at field_offset; `from_raw_parts` adopts
1732                // the `count`-element buffer.
1733                let list = unsafe { base.add(b.field_offset) };
1734                unsafe { (b.thunks.from_raw_parts)(b.thunks.ctx, list, buffer, count, cap) };
1735            }
1736            // r[impl ir.memory] — BORROWED, zero-copy `&str`/`&[u8]`: same wire as
1737            // `Bytes`, but write a fat pointer INTO the input `bytes` — NO alloc, NO
1738            // copy. The written `&str`/`&[u8]` borrows the reader's input buffer, so
1739            // the caller must keep `bytes` alive as long as the decoded value (the
1740            // standard zero-copy contract, documented on `decode_with`'s `Safety`).
1741            MemOp::Borrow(b) => {
1742                let count = r.read_len(b.stride.max(1))?;
1743                // Symmetric with encode: only an empty run skips no padding.
1744                if count > 0 {
1745                    skip_pad(r, b.elem_align)?;
1746                }
1747                let total = count * b.stride;
1748                // `src` is a slice INTO the input `bytes` (no copy): the borrowed
1749                // value will point at exactly these bytes.
1750                let src = r.read_slice(total)?;
1751                // Safety: the borrowed handle lives at field_offset; the construct
1752                // thunk builds the `&str`/`&[u8]` fat pointer there, pointing into the
1753                // input. Returns false on invalid content (e.g. non-UTF-8 `&str`).
1754                let field = unsafe { base.add(b.field_offset) };
1755                if !unsafe { (b.thunks.set_borrowed)(b.thunks.ctx, field, src.as_ptr(), count) } {
1756                    return Err(CompactError::Decode(DecodeError::InvalidUtf8));
1757                }
1758            }
1759            MemOp::Option(o) => {
1760                // Safety: the option handle lives at field_offset.
1761                let option = unsafe { base.add(o.field_offset) };
1762                match r.read_u8()? {
1763                    0 => unsafe { (o.thunks.init_none)(o.thunks.ctx, option) },
1764                    1 => {
1765                        // Decode the inner into an engine-owned scratch buffer, then
1766                        // move it into the Option (init_some does a ptr::read) and
1767                        // free the scratch WITHOUT dropping (ownership transferred).
1768                        let (scratch, layout) = if o.inner_size == 0 {
1769                            (o.inner_align as *mut u8, None)
1770                        } else {
1771                            let layout =
1772                                alloc::Layout::from_size_align(o.inner_size, o.inner_align)
1773                                    .map_err(|_| {
1774                                        CompactError::Decode(DecodeError::Malformed(
1775                                            "option inner layout overflow",
1776                                        ))
1777                                    })?;
1778                            // Safety: inner_size > 0.
1779                            let buf = unsafe { alloc::alloc(layout) };
1780                            if buf.is_null() {
1781                                alloc::handle_alloc_error(layout);
1782                            }
1783                            (buf, Some(layout))
1784                        };
1785                        // Safety: scratch is inner_size bytes at inner_align.
1786                        if let Err(e) = unsafe { decode_program(&o.some, r, scratch, blocks) } {
1787                            if let Some(layout) = layout {
1788                                unsafe { alloc::dealloc(scratch, layout) };
1789                            }
1790                            return Err(e);
1791                        }
1792                        // Safety: scratch holds the initialized inner; init_some moves
1793                        // it into the option.
1794                        unsafe { (o.thunks.init_some)(o.thunks.ctx, option, scratch) };
1795                        if let Some(layout) = layout {
1796                            unsafe { alloc::dealloc(scratch, layout) };
1797                        }
1798                    }
1799                    b => return Err(CompactError::Decode(DecodeError::InvalidBool(b))),
1800                }
1801            }
1802            MemOp::Enum(e) => {
1803                let wire_index = r.read_u32()?;
1804                let variant = match e.variants.iter().find(|v| v.wire_index == wire_index) {
1805                    Some(v) => v,
1806                    None if e.writer_only.contains(&wire_index) => {
1807                        // A variant the writer has but the reader lacks
1808                        // (`r[compat.enum]`) — the same error plan.rs reports.
1809                        return Err(CompactError::WriterOnlyVariant(wire_index));
1810                    }
1811                    None => return Err(CompactError::BadVariantIndex(wire_index)),
1812                };
1813                // Write the in-memory discriminant, then decode the payload fields
1814                // (disjoint memory: the discriminant precedes every field).
1815                // Safety: the discriminant lives at base + tag_offset, tag_width wide.
1816                unsafe { write_uint(base.add(e.tag_offset), e.tag_width, variant.selector) };
1817                // Safety: payload fields write within the enum's storage at base.
1818                unsafe { decode_program(&variant.payload, r, base, blocks)? };
1819            }
1820            MemOp::Map(m) => {
1821                let n = r.read_len(1)?;
1822                // Safety: the map handle lives at field_offset.
1823                let map = unsafe { base.add(m.field_offset) };
1824                // Initialize the (uninitialized) map with room for `n` entries.
1825                // NOTE: a decode error after this point leaks the partial map — the
1826                // same trivially-droppable limitation as sequences/options.
1827                unsafe { (m.thunks.init_with_capacity)(m.thunks.ctx, map, n) };
1828                for _ in 0..n {
1829                    // Engine-owned scratch for the key and value: decode each in
1830                    // place, then `insert` moves both out (ptr::read), so we free the
1831                    // scratch WITHOUT dropping. A zero-size element needs no alloc — a
1832                    // dangling-but-aligned pointer suffices.
1833                    let (key_scratch, key_layout) = alloc_scratch(m.key_size, m.key_align)?;
1834                    let (value_scratch, value_layout) =
1835                        match alloc_scratch(m.value_size, m.value_align) {
1836                            Ok(s) => s,
1837                            Err(e) => {
1838                                free_scratch(key_scratch, key_layout);
1839                                return Err(e);
1840                            }
1841                        };
1842                    // Safety: key_scratch is key_size bytes at key_align.
1843                    if let Err(e) = unsafe { decode_program(&m.key, r, key_scratch, blocks) } {
1844                        free_scratch(key_scratch, key_layout);
1845                        free_scratch(value_scratch, value_layout);
1846                        return Err(e);
1847                    }
1848                    // Safety: value_scratch is value_size bytes at value_align.
1849                    if let Err(e) = unsafe { decode_program(&m.value, r, value_scratch, blocks) } {
1850                        free_scratch(key_scratch, key_layout);
1851                        free_scratch(value_scratch, value_layout);
1852                        return Err(e);
1853                    }
1854                    // Safety: both scratch buffers hold initialized values; `insert`
1855                    // moves them into the map.
1856                    unsafe {
1857                        (m.thunks.insert)(m.thunks.ctx, map, key_scratch, value_scratch);
1858                    }
1859                    // The key and value were moved into the map; free without dropping.
1860                    free_scratch(key_scratch, key_layout);
1861                    free_scratch(value_scratch, value_layout);
1862                }
1863                // A repeated key collapses two entries into one — reject it, matching
1864                // the dynamic codec's duplicate-key rejection (the oracle).
1865                if unsafe { (m.thunks.len)(m.thunks.ctx, map) } != n {
1866                    return Err(CompactError::Decode(DecodeError::DuplicateKey));
1867                }
1868            }
1869            // r[impl ir.memory] — a self-describing dynamic `Value`: decode one value
1870            // (self-delimiting) and write it into the field.
1871            MemOp::Dynamic { field_offset } => {
1872                let v = read_value(r)?;
1873                // Safety: `base + field_offset` is uninitialized `Value` storage; we
1874                // move the decoded value in.
1875                unsafe { core::ptr::write(base.add(*field_offset).cast::<Value>(), v) };
1876            }
1877            // r[impl ir.memory] — Result<T, E>: read the u32 wire index, decode the
1878            // matching arm's payload into an engine scratch buffer, then move it into
1879            // the Result via `init_ok`/`init_err` (the repr(Rust) layout is built by
1880            // the vtable, mirroring the Option some-arm). An index matching neither
1881            // arm is a decode error.
1882            MemOp::Result(rs) => {
1883                let idx = r.read_u32()?;
1884                // Safety: the result handle lives at field_offset.
1885                let result = unsafe { base.add(rs.field_offset) };
1886                if idx == rs.ok_wire_index {
1887                    // Safety: `result` is uninitialized Result storage; `init_ok`
1888                    // moves the decoded Ok payload in.
1889                    unsafe {
1890                        decode_into_via_init(
1891                            &rs.ok,
1892                            rs.ok_size,
1893                            rs.ok_align,
1894                            r,
1895                            InitTarget {
1896                                ctx: rs.thunks.ctx,
1897                                handle: result,
1898                                init: rs.thunks.init_ok,
1899                            },
1900                            blocks,
1901                        )?
1902                    };
1903                } else if idx == rs.err_wire_index {
1904                    // Safety: as above, `init_err` moves the decoded Err payload in.
1905                    unsafe {
1906                        decode_into_via_init(
1907                            &rs.err,
1908                            rs.err_size,
1909                            rs.err_align,
1910                            r,
1911                            InitTarget {
1912                                ctx: rs.thunks.ctx,
1913                                handle: result,
1914                                init: rs.thunks.init_err,
1915                            },
1916                            blocks,
1917                        )?
1918                    };
1919                } else {
1920                    return Err(CompactError::BadVariantIndex(idx));
1921                }
1922            }
1923            // r[impl descriptors.thunk-binding]
1924            MemOp::Pointer(p) => {
1925                // Safety: the pointer handle lives at field_offset and is
1926                // uninitialized; `init` moves the scratch-decoded pointee into it.
1927                unsafe {
1928                    decode_into_via_init(
1929                        &p.pointee,
1930                        p.pointee_size,
1931                        p.pointee_align,
1932                        r,
1933                        InitTarget {
1934                            ctx: p.thunks.ctx,
1935                            handle: base.add(p.field_offset),
1936                            init: p.thunks.init,
1937                        },
1938                        blocks,
1939                    )?
1940                };
1941            }
1942            // r[impl compat.skip-writer-only] — consume a writer-only value's wire
1943            // bytes; write nothing to memory. The walker lives in `phon-ir` next to
1944            // `SkipOp`, shared with the JIT so both decode engines skip identically.
1945            MemOp::SkipWire(s) => phon_ir::ir::skip(r, s)?,
1946            // r[impl compat.reader-only-fields]
1947            // r[impl compat.defaults-are-reader-side]
1948            // Write a reader-only field's default in place; read no wire.
1949            MemOp::Default(d) => {
1950                // Safety: `base + offset` is uninitialized storage of the reader
1951                // field's type; the bound thunk initializes it.
1952                unsafe { (d.default)(d.ctx, base.add(d.offset)) };
1953            }
1954            // r[impl ir.memory] — opaque field: read the `u32`
1955            // length (bounds-checked), borrow the inner span from the input, and hand
1956            // it to the adapter. The decoded value may borrow that span (zero-copy),
1957            // so the caller must keep the input alive as long as it (the contract on
1958            // `decode_with`). The inner schema is never known here.
1959            MemOp::Opaque(o) => {
1960                let len = r.read_len(1)?;
1961                let span = r.read_slice(len)?;
1962                // Safety: the opaque field lives at `field_offset`; the decode thunk
1963                // builds it from the borrowed span. `false` ⇒ the adapter rejected it,
1964                // leaving `slot` uninitialized.
1965                let slot = unsafe { base.add(o.field_offset) };
1966                if !unsafe { (o.thunks.decode)(o.thunks.ctx, span.as_ptr(), len, slot) } {
1967                    return Err(CompactError::Decode(DecodeError::Malformed(
1968                        "opaque adapter rejected input",
1969                    )));
1970                }
1971            }
1972        }
1973    }
1974    Ok(())
1975}
1976
1977/// Allocate an engine-owned scratch buffer of `size`/`align` for a decoded
1978/// key/value before it is moved into a map. A zero-size element needs no
1979/// allocation: a dangling-but-aligned pointer suffices (and `free_scratch` then
1980/// does nothing for it).
1981fn alloc_scratch(size: usize, align: usize) -> Result<(*mut u8, Option<alloc::Layout>)> {
1982    if size == 0 {
1983        Ok((align as *mut u8, None))
1984    } else {
1985        let layout = alloc::Layout::from_size_align(size, align).map_err(|_| {
1986            CompactError::Decode(DecodeError::Malformed("map scratch layout overflow"))
1987        })?;
1988        // Safety: size > 0.
1989        let buf = unsafe { alloc::alloc(layout) };
1990        if buf.is_null() {
1991            alloc::handle_alloc_error(layout);
1992        }
1993        Ok((buf, Some(layout)))
1994    }
1995}
1996
1997/// Free a scratch buffer from [`alloc_scratch`] WITHOUT dropping its contents
1998/// (ownership was moved into the map by `insert`). A `None` layout is a zero-size
1999/// dangling pointer that was never allocated.
2000fn free_scratch(buf: *mut u8, layout: Option<alloc::Layout>) {
2001    if let Some(layout) = layout {
2002        // Safety: `buf` was allocated by `alloc_scratch` with this exact layout.
2003        unsafe { alloc::dealloc(buf, layout) };
2004    }
2005}
2006
2007struct InitTarget {
2008    ctx: *const (),
2009    handle: *mut u8,
2010    init: unsafe extern "C" fn(ctx: *const (), handle: *mut u8, value: *mut u8),
2011}
2012
2013/// Decode one `program`'s value of `size`/`align` into an engine-owned scratch
2014/// buffer, then move it into `handle` via the `init` thunk (which `ptr::read`s the
2015/// scratch), freeing the scratch WITHOUT dropping. The construction half of a
2016/// [`MemOp::Result`] arm (and the same shape as the Option some-arm); `init` is
2017/// `init_ok` or `init_err`.
2018///
2019/// # Safety
2020/// `handle` must be uninitialized storage for the containing type; `program` must
2021/// match the arm payload of `size`/`align`; `ctx`/`init` are the bound result thunks.
2022unsafe fn decode_into_via_init(
2023    program: &MemProgram,
2024    size: usize,
2025    align: usize,
2026    r: &mut Reader,
2027    target: InitTarget,
2028    blocks: &BTreeMap<SchemaId, MemProgram>,
2029) -> Result<()> {
2030    let (scratch, layout) = alloc_scratch(size, align)?;
2031    // Safety: scratch is `size` bytes at `align`.
2032    if let Err(e) = unsafe { decode_program(program, r, scratch, blocks) } {
2033        free_scratch(scratch, layout);
2034        return Err(e);
2035    }
2036    // Safety: scratch holds the initialized arm payload; `init` moves it into `handle`.
2037    unsafe { (target.init)(target.ctx, target.handle, scratch) };
2038    free_scratch(scratch, layout);
2039    Ok(())
2040}
2041
2042/// Lower `descriptor` and decode `bytes` into the value at `base` in one step.
2043///
2044/// # Safety
2045/// As [`decode_with`].
2046///
2047/// # Errors
2048/// As [`lower`] and [`decode_with`].
2049pub unsafe fn decode(
2050    bytes: &[u8],
2051    descriptor: &Descriptor,
2052    descriptor_blocks: &HashMap<SchemaId, Descriptor>,
2053    reg: &Registry,
2054    base: *mut u8,
2055) -> Result<()> {
2056    let lowered = lower_typed(descriptor, descriptor_blocks, reg)?;
2057    // Safety: forwarded from this function's contract.
2058    unsafe { decode_with(&lowered, bytes, base) }
2059}
2060
2061#[cfg(test)]
2062mod tests {
2063    use super::*;
2064    use core::mem::{MaybeUninit, align_of, offset_of, size_of};
2065    use facet_value::{VArray, Value};
2066    use phon_ir::{FieldAccess, Layout, SeqThunks, SequenceAccess};
2067    use phon_schema::bytes::{write_i64, write_u64};
2068    use phon_schema::{Schema, SchemaId, SchemaRef, primitive_id};
2069
2070    // Hand-written list thunks for `Vec<u32>`. The facet bridge will generate
2071    // equivalents from the list vtable; here we wire them by hand to exercise the
2072    // engine's sequence machinery on a real `Vec`. The engine allocates the
2073    // buffer; `from_raw_parts` adopts it.
2074    unsafe extern "C" fn vu32_from_raw_parts(
2075        _ctx: *const (),
2076        list: *mut u8,
2077        ptr: *mut u8,
2078        len: usize,
2079        cap: usize,
2080    ) {
2081        let v = unsafe { Vec::<u32>::from_raw_parts(ptr.cast::<u32>(), len, cap) };
2082        unsafe { core::ptr::write(list.cast::<Vec<u32>>(), v) };
2083    }
2084    unsafe extern "C" fn vu32_len(_ctx: *const (), list: *const u8) -> usize {
2085        unsafe { (*list.cast::<Vec<u32>>()).len() }
2086    }
2087    unsafe extern "C" fn vu32_data(_ctx: *const (), list: *const u8) -> *const u8 {
2088        unsafe { (*list.cast::<Vec<u32>>()).as_ptr().cast::<u8>() }
2089    }
2090
2091    fn vu32_thunks() -> SeqThunks {
2092        SeqThunks {
2093            ctx: core::ptr::null(),
2094            from_raw_parts: vu32_from_raw_parts,
2095            len: vu32_len,
2096            data: vu32_data,
2097        }
2098    }
2099
2100    fn vec_u32_descriptor(schema: SchemaId) -> Descriptor {
2101        Descriptor {
2102            schema: SchemaRef::concrete(schema),
2103            layout: Layout {
2104                size: size_of::<Vec<u32>>(),
2105                align: align_of::<Vec<u32>>(),
2106            },
2107            access: Access::Sequence(SequenceAccess {
2108                element: Box::new(Descriptor {
2109                    schema: SchemaRef::concrete(primitive_id(Primitive::U32)),
2110                    layout: Layout { size: 4, align: 4 },
2111                    access: Access::Scalar,
2112                }),
2113                storage: SequenceStorage::Vtable(vu32_thunks()),
2114            }),
2115        }
2116    }
2117
2118    #[repr(C)]
2119    #[derive(Debug, PartialEq)]
2120    struct NarrowNativeInts {
2121        count: u32,
2122        delta: i32,
2123    }
2124
2125    fn narrow_native_int_schema(schema: SchemaId) -> Schema {
2126        Schema {
2127            id: schema,
2128            type_params: Vec::new(),
2129            kind: SchemaKind::Struct {
2130                name: "NarrowNativeInts".to_string(),
2131                fields: vec![
2132                    Field {
2133                        name: "count".to_string(),
2134                        schema: SchemaRef::concrete(primitive_id(Primitive::U64)),
2135                        required: true,
2136                    },
2137                    Field {
2138                        name: "delta".to_string(),
2139                        schema: SchemaRef::concrete(primitive_id(Primitive::I64)),
2140                        required: true,
2141                    },
2142                ],
2143            },
2144        }
2145    }
2146
2147    fn narrow_native_int_descriptor(schema: SchemaId) -> Descriptor {
2148        Descriptor {
2149            schema: SchemaRef::concrete(schema),
2150            layout: Layout {
2151                size: size_of::<NarrowNativeInts>(),
2152                align: align_of::<NarrowNativeInts>(),
2153            },
2154            access: Access::Record(RecordAccess {
2155                fields: vec![
2156                    FieldAccess {
2157                        offset: offset_of!(NarrowNativeInts, count),
2158                        descriptor: Descriptor {
2159                            schema: SchemaRef::concrete(primitive_id(Primitive::U64)),
2160                            layout: Layout {
2161                                size: size_of::<u32>(),
2162                                align: align_of::<u32>(),
2163                            },
2164                            access: Access::Scalar,
2165                        },
2166                        default: None,
2167                    },
2168                    FieldAccess {
2169                        offset: offset_of!(NarrowNativeInts, delta),
2170                        descriptor: Descriptor {
2171                            schema: SchemaRef::concrete(primitive_id(Primitive::I64)),
2172                            layout: Layout {
2173                                size: size_of::<i32>(),
2174                                align: align_of::<i32>(),
2175                            },
2176                            access: Access::Scalar,
2177                        },
2178                        default: None,
2179                    },
2180                ],
2181                construct: Construct::InPlace,
2182            }),
2183        }
2184    }
2185
2186    #[test]
2187    fn owned_vec_u32_roundtrips_and_matches_dynamic() {
2188        // Root type: List<u32> / Vec<u32>.
2189        let list = Schema {
2190            id: SchemaId(1),
2191            type_params: Vec::new(),
2192            kind: SchemaKind::List {
2193                element: SchemaRef::concrete(primitive_id(Primitive::U32)),
2194            },
2195        };
2196        let reg = Registry::new([list]);
2197
2198        let desc = vec_u32_descriptor(SchemaId(1));
2199
2200        let values = [1u32, 2, 999, 0xDEAD_BEEF];
2201
2202        // Oracle: the dynamic List<u32> codec over the equivalent array.
2203        let mut arr = VArray::new();
2204        for &v in &values {
2205            arr.push(Value::from(v));
2206        }
2207        let dyn_bytes = compact::to_bytes(&Value::from(arr), SchemaId(1), &reg).unwrap();
2208
2209        // Typed encode of a real Vec<u32> must produce identical bytes.
2210        let v: Vec<u32> = values.to_vec();
2211        let no_blocks = HashMap::new();
2212        let typed_bytes = unsafe {
2213            encode(
2214                core::ptr::from_ref(&v).cast::<u8>(),
2215                &desc,
2216                &no_blocks,
2217                &reg,
2218            )
2219        }
2220        .unwrap();
2221        assert_eq!(typed_bytes, dyn_bytes);
2222
2223        // Typed decode reconstructs the Vec.
2224        let mut slot = MaybeUninit::<Vec<u32>>::uninit();
2225        unsafe {
2226            decode(
2227                &typed_bytes,
2228                &desc,
2229                &no_blocks,
2230                &reg,
2231                slot.as_mut_ptr().cast::<u8>(),
2232            )
2233        }
2234        .unwrap();
2235        let back = unsafe { slot.assume_init() };
2236        assert_eq!(back, values.to_vec());
2237    }
2238
2239    #[test]
2240    // r[verify type-system.rust-subset]
2241    // r[verify exec.jit-optional]
2242    fn native_int_memops_roundtrip_and_reject_out_of_range_values() {
2243        let schema = SchemaId(1);
2244        let reg = Registry::new([narrow_native_int_schema(schema)]);
2245        let desc = narrow_native_int_descriptor(schema);
2246        let no_blocks = HashMap::new();
2247        let lowered = lower_typed(&desc, &no_blocks, &reg).unwrap();
2248
2249        assert_eq!(lowered.program.len(), 2);
2250        assert!(matches!(
2251            lowered.program[0],
2252            MemOp::NativeInt {
2253                mem_size: 4,
2254                signed: false,
2255                ..
2256            }
2257        ));
2258        assert!(matches!(
2259            lowered.program[1],
2260            MemOp::NativeInt {
2261                mem_size: 4,
2262                signed: true,
2263                ..
2264            }
2265        ));
2266
2267        let value = NarrowNativeInts {
2268            count: 0xCAFE_F00D,
2269            delta: -42,
2270        };
2271        let bytes = unsafe { encode_with(&lowered, core::ptr::from_ref(&value).cast::<u8>()) };
2272
2273        let mut expected = Vec::new();
2274        write_u64(&mut expected, u64::from(value.count));
2275        write_i64(&mut expected, i64::from(value.delta));
2276        assert_eq!(bytes, expected);
2277
2278        let mut slot = MaybeUninit::<NarrowNativeInts>::uninit();
2279        unsafe { decode_with(&lowered, &bytes, slot.as_mut_ptr().cast::<u8>()) }.unwrap();
2280        assert_eq!(unsafe { slot.assume_init() }, value);
2281
2282        let mut unsigned_out_of_range = Vec::new();
2283        write_u64(&mut unsigned_out_of_range, u64::from(u32::MAX) + 1);
2284        write_i64(&mut unsigned_out_of_range, 0);
2285        let mut slot = MaybeUninit::<NarrowNativeInts>::uninit();
2286        let err = unsafe {
2287            decode_with(
2288                &lowered,
2289                &unsigned_out_of_range,
2290                slot.as_mut_ptr().cast::<u8>(),
2291            )
2292        }
2293        .unwrap_err();
2294        assert!(matches!(
2295            err,
2296            CompactError::Decode(DecodeError::Malformed(
2297                "native-sized unsigned integer out of range"
2298            ))
2299        ));
2300
2301        let mut signed_out_of_range = Vec::new();
2302        write_u64(&mut signed_out_of_range, 0);
2303        write_i64(&mut signed_out_of_range, i64::from(i32::MIN) - 1);
2304        let mut slot = MaybeUninit::<NarrowNativeInts>::uninit();
2305        let err = unsafe {
2306            decode_with(
2307                &lowered,
2308                &signed_out_of_range,
2309                slot.as_mut_ptr().cast::<u8>(),
2310            )
2311        }
2312        .unwrap_err();
2313        assert!(matches!(
2314            err,
2315            CompactError::Decode(DecodeError::Malformed(
2316                "native-sized signed integer out of range"
2317            ))
2318        ));
2319    }
2320
2321    #[test]
2322    fn decode_compat_rejects_list_set_kind_mismatch() {
2323        let element = SchemaRef::concrete(primitive_id(Primitive::U32));
2324        let writer = Schema {
2325            id: SchemaId(1),
2326            type_params: Vec::new(),
2327            kind: SchemaKind::Set {
2328                element: element.clone(),
2329            },
2330        };
2331        let reader = Schema {
2332            id: SchemaId(2),
2333            type_params: Vec::new(),
2334            kind: SchemaKind::List { element },
2335        };
2336        let reg = Registry::new([writer, reader]);
2337        let desc = vec_u32_descriptor(SchemaId(2));
2338        let no_blocks = HashMap::new();
2339
2340        let typed = lower_decode(SchemaId(1), &desc, &no_blocks, &reg);
2341        assert!(
2342            matches!(typed, Err(CompactError::Incompatible(_))),
2343            "typed compat accepted Set writer for List reader: {typed:?}"
2344        );
2345
2346        let dynamic = crate::plan::build_plan(SchemaId(1), SchemaId(2), &reg);
2347        assert!(
2348            matches!(dynamic, Err(CompactError::Incompatible(_))),
2349            "dynamic compat unexpectedly accepted Set writer for List reader"
2350        );
2351    }
2352}