Skip to main content

phon_engine/
plan.rs

1//! Compatibility planning: translate a *writer* schema with a *reader* schema
2//! into a [`Plan`], then decode the writer's compact bytes into a
3//! reader-shaped [`Value`].
4//!
5//! The plan is built from the two schemas alone, before any payload is touched
6//! (`r[compat.plan-first]`): if it cannot be built the schemas are incompatible
7//! and decoding never begins. Struct fields are matched by name
8//! (`r[compat.field-matching]`) — writer-only fields are skipped
9//! (`r[compat.skip-writer-only]`), reader-only fields are defaulted or, when
10//! required, fail the plan (`r[compat.reader-only-fields]`). Enum variants are
11//! matched by name (`r[compat.enum]`). Types match only by the rules of
12//! `r[compat.type-match]` — no implicit numeric widening.
13//!
14//! This is the dynamic-`Value` path: reader-only fields default to `null` (the
15//! typed path will use a descriptor-supplied default instead). It builds on the
16//! compact codec's registry, resolution, and decoders.
17//!
18//! Spec: "Compatibility".
19
20use std::collections::{BTreeMap, HashMap, HashSet};
21
22use facet_value::{VArray, VObject, VString, Value};
23use phon_schema::bytes::Reader;
24use phon_schema::{
25    DecodeError, Field, Primitive, SchemaId, SchemaKind, SchemaRef, Variant, VariantPayload,
26    read_value,
27};
28
29use phon_ir::ir::{EnumArm, Op, Program, ValueProgram};
30
31use crate::compact::{self, CompactError, Registry, Resolved};
32use crate::compat::{self, FieldMatch, VariantMatch, incompatible};
33
34type Result<T> = core::result::Result<T, CompactError>;
35
36const MAX_DEPTH: usize = 128;
37
38// ============================================================================
39// Plan
40// ============================================================================
41
42/// A built translation plan from a writer schema to a reader schema. Build it
43/// once with [`build_plan`], then decode many messages with [`decode_with_plan`].
44pub struct Plan {
45    root: Node,
46    blocks: BTreeMap<SchemaId, Node>,
47}
48
49enum Node {
50    /// A primitive copied through (writer and reader primitive are identical).
51    Scalar(Primitive),
52    /// A back-edge into a recursive reader schema, resolved through
53    /// [`Plan::blocks`] at execution/lowering time.
54    CallBlock(SchemaId),
55    Struct(StructPlan),
56    /// Writer variant index -> how to produce the reader's variant. An index
57    /// absent here is a writer-only variant: a decode error if it arrives.
58    Enum(HashMap<u32, VariantPlan>),
59    Tuple(Vec<Node>),
60    /// A `list` (`set == false`) or `set` (`set == true`). `min_wire` is the
61    /// element's minimum wire size for the `r[validate.lengths]` count guard —
62    /// `0` for a zero-sized element (an empty struct, `unit`, …), else `1`.
63    Seq {
64        set: bool,
65        element: Box<Node>,
66        min_wire: usize,
67    },
68    Map {
69        key: Box<Node>,
70        value: Box<Node>,
71    },
72    /// A fixed-shape array. `min_wire` bounds `product(dims)` exactly as in `Seq`.
73    Array {
74        element: Box<Node>,
75        dims: Vec<u64>,
76        min_wire: usize,
77    },
78    Option(Box<Node>),
79    Dynamic,
80}
81
82struct StructPlan {
83    /// One step per writer field, in wire order.
84    steps: Vec<Step>,
85    /// Reader-only, non-required field names to fill with a default.
86    defaults: Vec<String>,
87}
88
89enum Step {
90    /// A writer field matched to this reader field; decode it with `node`.
91    Take { reader: String, node: Node },
92    /// A writer-only field: decode it by its writer schema and discard.
93    Skip(SchemaRef),
94}
95
96struct VariantPlan {
97    reader: String,
98    payload: Payload,
99}
100
101enum Payload {
102    Unit,
103    Newtype(Box<Node>),
104    Tuple(Vec<Node>),
105    Struct(StructPlan),
106}
107
108struct RecCtx {
109    recursive: HashSet<SchemaId>,
110    blocks: BTreeMap<SchemaId, Node>,
111    building: HashSet<SchemaId>,
112}
113
114// ============================================================================
115// Public API
116// ============================================================================
117
118#[derive(Clone, Copy, Debug, Eq, PartialEq)]
119pub enum CompatDirection {
120    /// The newer schema can read bytes written by the older schema.
121    Backward,
122    /// The older schema can read bytes written by the newer schema.
123    Forward,
124    /// Both schema versions can read each other's bytes.
125    Bidirectional,
126    /// Neither schema version can read the other's bytes.
127    Incompatible,
128}
129
130/// Build the translation plan from `writer_root` to `reader_root`.
131///
132/// # Errors
133/// [`CompactError::Incompatible`] (or a resolution error) if the schemas cannot
134/// be translated.
135// r[impl compat.plan-first]
136pub fn build_plan(writer_root: SchemaId, reader_root: SchemaId, reg: &Registry) -> Result<Plan> {
137    let mut rec = RecCtx {
138        recursive: recursive_schema_ids(reader_root, reg),
139        blocks: BTreeMap::new(),
140        building: HashSet::new(),
141    };
142    let root = plan_ref(
143        &SchemaRef::concrete(writer_root),
144        &SchemaRef::concrete(reader_root),
145        reg,
146        &mut rec,
147        0,
148    )?;
149    Ok(Plan {
150        root,
151        blocks: rec.blocks,
152    })
153}
154
155/// Classify compatibility between an older and newer schema by planning both
156/// directions. This is tooling over [`build_plan`], not a decode path.
157// r[impl compat.direction]
158#[must_use]
159pub fn compatibility_direction(
160    older_root: SchemaId,
161    newer_root: SchemaId,
162    reg: &Registry,
163) -> CompatDirection {
164    let backward = build_plan(older_root, newer_root, reg).is_ok();
165    let forward = build_plan(newer_root, older_root, reg).is_ok();
166    match (backward, forward) {
167        (true, true) => CompatDirection::Bidirectional,
168        (true, false) => CompatDirection::Backward,
169        (false, true) => CompatDirection::Forward,
170        (false, false) => CompatDirection::Incompatible,
171    }
172}
173
174/// Decode writer compact `bytes` into a reader-shaped value using a prebuilt plan.
175///
176/// # Errors
177/// [`CompactError`] for malformed input, or a writer-only enum variant.
178pub fn decode_with_plan(bytes: &[u8], plan: &Plan, reg: &Registry) -> Result<Value> {
179    let mut r = Reader::new(bytes);
180    let v = exec(&plan.root, &mut r, reg, &plan.blocks, 0)?;
181    if r.remaining() != 0 {
182        return Err(CompactError::Decode(DecodeError::TrailingBytes(
183            r.remaining(),
184        )));
185    }
186    Ok(v)
187}
188
189/// Build a plan and decode in one step.
190///
191/// # Errors
192/// As [`build_plan`] and [`decode_with_plan`].
193pub fn decode(
194    bytes: &[u8],
195    writer_root: SchemaId,
196    reader_root: SchemaId,
197    reg: &Registry,
198) -> Result<Value> {
199    let plan = build_plan(writer_root, reader_root, reg)?;
200    decode_with_plan(bytes, &plan, reg)
201}
202
203/// Build a plan, lower it to the linear IR, and run the interpreter — the flat
204/// counterpart to [`decode`]. The interpreter must produce the same value the
205/// recursive [`decode_with_plan`] would; the compat tests assert exactly that.
206///
207/// # Errors
208/// As [`build_plan`] and [`crate::interp::run`].
209pub fn decode_via_ir(
210    bytes: &[u8],
211    writer_root: SchemaId,
212    reader_root: SchemaId,
213    reg: &Registry,
214) -> Result<Value> {
215    let plan = build_plan(writer_root, reader_root, reg)?;
216    let program = lower(&plan);
217    crate::interp::run_lowered(&program, bytes, reg)
218}
219
220// ============================================================================
221// Building the plan
222// ============================================================================
223
224fn plan_ref(
225    w: &SchemaRef,
226    r: &SchemaRef,
227    reg: &Registry,
228    rec: &mut RecCtx,
229    depth: usize,
230) -> Result<Node> {
231    if depth > MAX_DEPTH {
232        return Err(incompatible("schema nests too deep"));
233    }
234    if let SchemaRef::Concrete { id, .. } = r
235        && rec.recursive.contains(id)
236    {
237        if !rec.blocks.contains_key(id) && !rec.building.contains(id) {
238            rec.building.insert(*id);
239            let body = plan_resolved(
240                compact::resolve(reg, w)?,
241                compact::resolve(reg, r)?,
242                reg,
243                rec,
244                depth,
245            )?;
246            rec.building.remove(id);
247            rec.blocks.insert(*id, body);
248        }
249        return Ok(Node::CallBlock(*id));
250    }
251    plan_resolved(
252        compact::resolve(reg, w)?,
253        compact::resolve(reg, r)?,
254        reg,
255        rec,
256        depth,
257    )
258}
259
260fn plan_resolved(
261    w: Resolved,
262    r: Resolved,
263    reg: &Registry,
264    rec: &mut RecCtx,
265    depth: usize,
266) -> Result<Node> {
267    match (w, r) {
268        (Resolved::Primitive(wp), Resolved::Primitive(rp)) => {
269            if wp == rp {
270                Ok(Node::Scalar(wp))
271            } else {
272                Err(incompatible(format!("primitive {wp:?} is not {rp:?}")))
273            }
274        }
275        (Resolved::Composite(wk), Resolved::Composite(rk)) => plan_kind(wk, rk, reg, rec, depth),
276        _ => Err(incompatible("primitive does not match composite")),
277    }
278}
279
280// r[impl compat.type-match]
281fn plan_kind(
282    wk: SchemaKind,
283    rk: SchemaKind,
284    reg: &Registry,
285    rec: &mut RecCtx,
286    depth: usize,
287) -> Result<Node> {
288    match (wk, rk) {
289        (SchemaKind::Primitive(wp), SchemaKind::Primitive(rp)) => {
290            if wp == rp {
291                Ok(Node::Scalar(wp))
292            } else {
293                Err(incompatible(format!("primitive {wp:?} is not {rp:?}")))
294            }
295        }
296        (SchemaKind::Struct { fields: wf, .. }, SchemaKind::Struct { fields: rf, .. }) => {
297            Ok(Node::Struct(plan_struct(&wf, &rf, reg, rec, depth)?))
298        }
299        (SchemaKind::Enum { variants: wv, .. }, SchemaKind::Enum { variants: rv, .. }) => {
300            plan_enum(&wv, &rv, reg, rec, depth)
301        }
302        (SchemaKind::Tuple { elements: we }, SchemaKind::Tuple { elements: re }) => {
303            if we.len() != re.len() {
304                return Err(incompatible("tuple arity differs"));
305            }
306            let mut nodes = Vec::with_capacity(we.len());
307            for (w, r) in we.iter().zip(&re) {
308                nodes.push(plan_ref(w, r, reg, rec, depth + 1)?);
309            }
310            Ok(Node::Tuple(nodes))
311        }
312        (SchemaKind::List { element: we }, SchemaKind::List { element: re }) => Ok(Node::Seq {
313            set: false,
314            min_wire: compact::min_wire_size_ref(reg, &we),
315            element: Box::new(plan_ref(&we, &re, reg, rec, depth + 1)?),
316        }),
317        (SchemaKind::Set { element: we }, SchemaKind::Set { element: re }) => Ok(Node::Seq {
318            set: true,
319            min_wire: compact::min_wire_size_ref(reg, &we),
320            element: Box::new(plan_ref(&we, &re, reg, rec, depth + 1)?),
321        }),
322        (SchemaKind::Option { element: we }, SchemaKind::Option { element: re }) => Ok(
323            Node::Option(Box::new(plan_ref(&we, &re, reg, rec, depth + 1)?)),
324        ),
325        (SchemaKind::Map { key: wk, value: wv }, SchemaKind::Map { key: rk, value: rv }) => {
326            Ok(Node::Map {
327                key: Box::new(plan_ref(&wk, &rk, reg, rec, depth + 1)?),
328                value: Box::new(plan_ref(&wv, &rv, reg, rec, depth + 1)?),
329            })
330        }
331        (
332            SchemaKind::Array {
333                element: we,
334                dimensions: wd,
335            },
336            SchemaKind::Array {
337                element: re,
338                dimensions: rd,
339            },
340        ) => {
341            if wd != rd {
342                return Err(incompatible("array dimensions differ"));
343            }
344            Ok(Node::Array {
345                min_wire: compact::min_wire_size_ref(reg, &we),
346                element: Box::new(plan_ref(&we, &re, reg, rec, depth + 1)?),
347                dims: wd,
348            })
349        }
350        (SchemaKind::Dynamic, SchemaKind::Dynamic) => Ok(Node::Dynamic),
351        (SchemaKind::Tensor { .. }, SchemaKind::Tensor { .. }) => {
352            Err(CompactError::Unsupported("tensor"))
353        }
354        (SchemaKind::Channel { .. }, SchemaKind::Channel { .. }) => {
355            Err(CompactError::Unsupported("channel"))
356        }
357        (SchemaKind::External { .. }, SchemaKind::External { .. }) => {
358            Err(CompactError::Unsupported("external"))
359        }
360        _ => Err(incompatible("schema kinds differ")),
361    }
362}
363
364// r[impl compat.field-matching]
365// r[impl compat.skip-writer-only]
366// r[impl compat.reader-only-fields]
367// r[impl compat.defaults-are-reader-side]
368fn plan_struct(
369    w_fields: &[Field],
370    r_fields: &[Field],
371    reg: &Registry,
372    rec: &mut RecCtx,
373    depth: usize,
374) -> Result<StructPlan> {
375    let mut steps = Vec::with_capacity(w_fields.len());
376    let mut defaults = Vec::new();
377    for step in compat::match_fields(
378        w_fields,
379        r_fields,
380        |_, rf| !rf.required,
381        |rf| {
382            incompatible(format!(
383                "required reader field '{}' is absent from the writer",
384                rf.name
385            ))
386        },
387    )? {
388        match step {
389            FieldMatch::Take {
390                writer,
391                reader_index,
392            } => {
393                let rf = &r_fields[reader_index];
394                let node = plan_ref(&writer.schema, &rf.schema, reg, rec, depth + 1)?;
395                steps.push(Step::Take {
396                    reader: rf.name.clone(),
397                    node,
398                });
399            }
400            FieldMatch::Skip { writer } => steps.push(Step::Skip(writer.schema.clone())),
401            FieldMatch::Default { reader_index } => {
402                defaults.push(r_fields[reader_index].name.clone());
403            }
404        }
405    }
406
407    Ok(StructPlan { steps, defaults })
408}
409
410// r[impl compat.enum]
411fn plan_enum(
412    w_variants: &[Variant],
413    r_variants: &[Variant],
414    reg: &Registry,
415    rec: &mut RecCtx,
416    depth: usize,
417) -> Result<Node> {
418    let mut by_index = HashMap::new();
419    for step in compat::match_variants(w_variants, r_variants) {
420        let VariantMatch::Take {
421            writer,
422            reader_index,
423        } = step
424        else {
425            continue;
426        };
427        let rv = &r_variants[reader_index];
428        let payload = plan_payload(&writer.payload, &rv.payload, reg, rec, depth)?;
429        by_index.insert(
430            writer.index,
431            VariantPlan {
432                reader: rv.name.clone(),
433                payload,
434            },
435        );
436    }
437    Ok(Node::Enum(by_index))
438}
439
440fn plan_payload(
441    w: &VariantPayload,
442    r: &VariantPayload,
443    reg: &Registry,
444    rec: &mut RecCtx,
445    depth: usize,
446) -> Result<Payload> {
447    match (w, r) {
448        (VariantPayload::Unit, VariantPayload::Unit) => Ok(Payload::Unit),
449        (VariantPayload::Newtype(wr), VariantPayload::Newtype(rr)) => Ok(Payload::Newtype(
450            Box::new(plan_ref(wr, rr, reg, rec, depth + 1)?),
451        )),
452        (VariantPayload::Tuple(wrs), VariantPayload::Tuple(rrs)) => {
453            if wrs.len() != rrs.len() {
454                return Err(incompatible("variant tuple arity differs"));
455            }
456            let mut nodes = Vec::with_capacity(wrs.len());
457            for (w, r) in wrs.iter().zip(rrs) {
458                nodes.push(plan_ref(w, r, reg, rec, depth + 1)?);
459            }
460            Ok(Payload::Tuple(nodes))
461        }
462        (VariantPayload::Struct(wfs), VariantPayload::Struct(rfs)) => {
463            Ok(Payload::Struct(plan_struct(wfs, rfs, reg, rec, depth)?))
464        }
465        _ => Err(incompatible("variant payload shapes differ")),
466    }
467}
468
469fn recursive_schema_ids(root: SchemaId, reg: &Registry) -> HashSet<SchemaId> {
470    let mut recursive = HashSet::new();
471    let mut visited = HashSet::new();
472    let mut on_stack = HashSet::new();
473    let mut stack = Vec::new();
474    visit_schema(
475        root,
476        reg,
477        &mut recursive,
478        &mut visited,
479        &mut on_stack,
480        &mut stack,
481    );
482    recursive
483}
484
485fn visit_schema(
486    id: SchemaId,
487    reg: &Registry,
488    recursive: &mut HashSet<SchemaId>,
489    visited: &mut HashSet<SchemaId>,
490    on_stack: &mut HashSet<SchemaId>,
491    stack: &mut Vec<SchemaId>,
492) {
493    visited.insert(id);
494    on_stack.insert(id);
495    stack.push(id);
496
497    if let Ok(Resolved::Composite(kind)) = compact::resolve(reg, &SchemaRef::concrete(id)) {
498        for target in ref_targets(&kind) {
499            if on_stack.contains(&target) {
500                for &candidate in stack.iter().rev() {
501                    recursive.insert(candidate);
502                    if candidate == target {
503                        break;
504                    }
505                }
506            } else if !visited.contains(&target) {
507                visit_schema(target, reg, recursive, visited, on_stack, stack);
508            }
509        }
510    }
511
512    stack.pop();
513    on_stack.remove(&id);
514}
515
516fn ref_targets(kind: &SchemaKind) -> Vec<SchemaId> {
517    let mut out = Vec::new();
518    match kind {
519        SchemaKind::Struct { fields, .. } => {
520            for f in fields {
521                add_ref_targets(&f.schema, &mut out);
522            }
523        }
524        SchemaKind::Enum { variants, .. } => {
525            for v in variants {
526                match &v.payload {
527                    VariantPayload::Unit => {}
528                    VariantPayload::Newtype(r) => add_ref_targets(r, &mut out),
529                    VariantPayload::Tuple(refs) => {
530                        for r in refs {
531                            add_ref_targets(r, &mut out);
532                        }
533                    }
534                    VariantPayload::Struct(fields) => {
535                        for f in fields {
536                            add_ref_targets(&f.schema, &mut out);
537                        }
538                    }
539                }
540            }
541        }
542        SchemaKind::Tuple { elements } => {
543            for r in elements {
544                add_ref_targets(r, &mut out);
545            }
546        }
547        SchemaKind::List { element }
548        | SchemaKind::Set { element }
549        | SchemaKind::Array { element, .. }
550        | SchemaKind::Tensor { element, .. }
551        | SchemaKind::Option { element }
552        | SchemaKind::Channel { element, .. } => add_ref_targets(element, &mut out),
553        SchemaKind::Map { key, value } => {
554            add_ref_targets(key, &mut out);
555            add_ref_targets(value, &mut out);
556        }
557        SchemaKind::External { metadata, .. } => {
558            if let Some(r) = metadata {
559                add_ref_targets(r, &mut out);
560            }
561        }
562        SchemaKind::Primitive(_) | SchemaKind::Dynamic => {}
563    }
564    out
565}
566
567fn add_ref_targets(r: &SchemaRef, out: &mut Vec<SchemaId>) {
568    if let SchemaRef::Concrete { id, args } = r {
569        out.push(*id);
570        for arg in args {
571            add_ref_targets(arg, out);
572        }
573    }
574}
575
576// ============================================================================
577// Lowering the plan to the linear IR
578// ============================================================================
579
580/// Flatten a built plan's `Node` tree into a linear [`Program`]. Every
581/// type-directed decision the tree encodes is resolved here, once; what the
582/// interpreter runs carries only data-directed control flow. A struct of structs
583/// of scalars lowers to a single branch-free run of ops.
584// r[impl ir.two-forms]
585#[must_use]
586pub fn lower(plan: &Plan) -> ValueProgram {
587    let mut out = Vec::new();
588    lower_node(&plan.root, &mut out);
589    let blocks = plan
590        .blocks
591        .iter()
592        .map(|(id, node)| (*id, lower_subtree(node)))
593        .collect();
594    ValueProgram {
595        program: out,
596        blocks,
597    }
598}
599
600fn lower_subtree(node: &Node) -> Program {
601    let mut out = Vec::new();
602    lower_node(node, &mut out);
603    out
604}
605
606/// Append the ops for `node`. A complete node nets one value on the stack.
607fn lower_node(node: &Node, out: &mut Program) {
608    match node {
609        Node::Scalar(p) => out.push(Op::Scalar(*p)),
610        Node::Dynamic => out.push(Op::Dynamic),
611        Node::CallBlock(schema) => out.push(Op::CallBlock { schema: *schema }),
612        Node::Struct(sp) => lower_struct(sp, out),
613        Node::Enum(by_index) => {
614            let mut arms: Vec<EnumArm> = by_index
615                .iter()
616                .map(|(idx, vp)| EnumArm {
617                    writer_index: *idx,
618                    reader_name: vp.reader.clone(),
619                    payload: lower_payload(&vp.payload),
620                })
621                .collect();
622            // Deterministic order; the interpreter dispatches by writer_index.
623            arms.sort_by_key(|a| a.writer_index);
624            out.push(Op::Enum { arms });
625        }
626        Node::Tuple(nodes) => {
627            for n in nodes {
628                lower_node(n, out);
629            }
630            out.push(Op::Array { count: nodes.len() });
631        }
632        Node::Seq {
633            set,
634            element,
635            min_wire,
636        } => out.push(Op::Seq {
637            set: *set,
638            min_wire: *min_wire,
639            body: lower_subtree(element),
640        }),
641        Node::Map { key, value } => out.push(Op::Map {
642            key: lower_subtree(key),
643            value: lower_subtree(value),
644        }),
645        Node::Array {
646            element,
647            dims,
648            min_wire,
649        } => out.push(Op::FixedArray {
650            dimensions: dims.clone(),
651            min_wire: *min_wire,
652            body: lower_subtree(element),
653        }),
654        Node::Option(element) => out.push(Op::Option {
655            some: lower_subtree(element),
656        }),
657    }
658}
659
660/// Each writer field in wire order (a matched field's value, or a skip for a
661/// writer-only one), then a null per reader-only default, then assemble the
662/// object. The `keys` track the stack values the `Object` op will name.
663///
664/// A field that is itself a fixed structure splices its ops inline here (via
665/// `lower_node`); only dynamic-length and branching children become sub-programs.
666// r[impl ir.inlining]
667fn lower_struct(sp: &StructPlan, out: &mut Program) {
668    let mut keys = Vec::new();
669    for step in &sp.steps {
670        match step {
671            Step::Take { reader, node } => {
672                lower_node(node, out);
673                keys.push(reader.clone());
674            }
675            Step::Skip(writer_ref) => out.push(Op::Skip(writer_ref.clone())),
676        }
677    }
678    for name in &sp.defaults {
679        out.push(Op::Null);
680        keys.push(name.clone());
681    }
682    out.push(Op::Object { keys });
683}
684
685fn lower_payload(payload: &Payload) -> Program {
686    let mut out = Vec::new();
687    match payload {
688        Payload::Unit => out.push(Op::Null),
689        Payload::Newtype(node) => lower_node(node, &mut out),
690        Payload::Tuple(nodes) => {
691            for n in nodes {
692                lower_node(n, &mut out);
693            }
694            out.push(Op::Array { count: nodes.len() });
695        }
696        Payload::Struct(sp) => lower_struct(sp, &mut out),
697    }
698    out
699}
700
701// ============================================================================
702// Executing the plan
703// ============================================================================
704
705fn exec(
706    node: &Node,
707    r: &mut Reader,
708    reg: &Registry,
709    blocks: &BTreeMap<SchemaId, Node>,
710    depth: usize,
711) -> Result<Value> {
712    if depth > MAX_DEPTH {
713        return Err(CompactError::Decode(DecodeError::DepthExceeded));
714    }
715    match node {
716        Node::Scalar(p) => compact::decode_primitive(r, *p),
717        Node::CallBlock(schema) => {
718            let block = blocks
719                .get(schema)
720                .ok_or(CompactError::Decode(DecodeError::Malformed(
721                    "missing recursion block",
722                )))?;
723            exec(block, r, reg, blocks, depth + 1)
724        }
725        Node::Struct(sp) => exec_struct(sp, r, reg, blocks, depth),
726        Node::Enum(by_index) => {
727            let idx = r.read_u32()?;
728            let v = by_index
729                .get(&idx)
730                .ok_or(CompactError::WriterOnlyVariant(idx))?;
731            let payload = exec_payload(&v.payload, r, reg, blocks, depth)?;
732            let mut obj = VObject::new();
733            obj.insert(VString::new(&v.reader), payload);
734            Ok(obj.into())
735        }
736        Node::Tuple(nodes) => {
737            let mut a = VArray::new();
738            for n in nodes {
739                a.push(exec(n, r, reg, blocks, depth + 1)?);
740            }
741            Ok(a.into())
742        }
743        Node::Seq {
744            set,
745            element,
746            min_wire,
747        } => {
748            let n = r.read_len(*min_wire)?;
749            let mut a = VArray::new();
750            let mut seen = if *set { Some(HashSet::new()) } else { None };
751            for _ in 0..n {
752                let v = exec(element, r, reg, blocks, depth + 1)?;
753                if let Some(s) = &mut seen
754                    && !s.insert(v.clone())
755                {
756                    return Err(CompactError::Decode(DecodeError::DuplicateElement));
757                }
758                a.push(v);
759            }
760            Ok(a.into())
761        }
762        Node::Map { key, value } => {
763            let n = r.read_len(1)?;
764            let mut obj = VObject::new();
765            for _ in 0..n {
766                let k = exec(key, r, reg, blocks, depth + 1)?;
767                let v = exec(value, r, reg, blocks, depth + 1)?;
768                let ks = k
769                    .as_string()
770                    .ok_or(CompactError::Unsupported("map with non-string keys"))?;
771                if obj.insert(VString::new(ks.as_str()), v).is_some() {
772                    return Err(CompactError::Decode(DecodeError::DuplicateKey));
773                }
774            }
775            Ok(obj.into())
776        }
777        Node::Array {
778            element,
779            dims,
780            min_wire,
781        } => {
782            let count = compact::product(dims)?;
783            compact::check_fixed_count(count, *min_wire, r.remaining())?;
784            let mut a = VArray::new();
785            for _ in 0..count {
786                a.push(exec(element, r, reg, blocks, depth + 1)?);
787            }
788            Ok(a.into())
789        }
790        Node::Option(element) => match r.read_u8()? {
791            0 => Ok(Value::NULL),
792            1 => exec(element, r, reg, blocks, depth + 1),
793            b => Err(CompactError::Decode(DecodeError::InvalidBool(b))),
794        },
795        Node::Dynamic => Ok(read_value(r)?),
796    }
797}
798
799fn exec_struct(
800    sp: &StructPlan,
801    r: &mut Reader,
802    reg: &Registry,
803    blocks: &BTreeMap<SchemaId, Node>,
804    depth: usize,
805) -> Result<Value> {
806    let mut obj = VObject::new();
807    for step in &sp.steps {
808        match step {
809            Step::Take { reader, node } => {
810                let v = exec(node, r, reg, blocks, depth + 1)?;
811                obj.insert(VString::new(reader), v);
812            }
813            Step::Skip(writer_ref) => {
814                // Walk the writer field by its own schema and discard it.
815                compact::decode_ref(r, writer_ref, reg, depth + 1)?;
816            }
817        }
818    }
819    for name in &sp.defaults {
820        obj.insert(VString::new(name), Value::NULL);
821    }
822    Ok(obj.into())
823}
824
825fn exec_payload(
826    p: &Payload,
827    r: &mut Reader,
828    reg: &Registry,
829    blocks: &BTreeMap<SchemaId, Node>,
830    depth: usize,
831) -> Result<Value> {
832    match p {
833        Payload::Unit => Ok(Value::NULL),
834        Payload::Newtype(n) => exec(n, r, reg, blocks, depth + 1),
835        Payload::Tuple(ns) => {
836            let mut a = VArray::new();
837            for n in ns {
838                a.push(exec(n, r, reg, blocks, depth + 1)?);
839            }
840            Ok(a.into())
841        }
842        Payload::Struct(sp) => exec_struct(sp, r, reg, blocks, depth),
843    }
844}
845
846#[cfg(test)]
847mod tests {
848    use super::*;
849    use crate::compact;
850    use phon_schema::{Schema, primitive_id};
851
852    fn prim(p: Primitive) -> SchemaRef {
853        SchemaRef::concrete(primitive_id(p))
854    }
855
856    fn schema(id: u64, kind: SchemaKind) -> Schema {
857        Schema {
858            id: SchemaId(id),
859            type_params: Vec::new(),
860            kind,
861        }
862    }
863
864    fn field(name: &str, schema: SchemaRef, required: bool) -> Field {
865        Field {
866            name: name.to_string(),
867            schema,
868            required,
869        }
870    }
871
872    fn point_struct(id: u64, fields: Vec<Field>) -> Schema {
873        schema(
874            id,
875            SchemaKind::Struct {
876                name: "P".to_string(),
877                fields,
878            },
879        )
880    }
881
882    fn obj(entries: &[(&str, Value)]) -> Value {
883        let mut o = VObject::new();
884        for (k, v) in entries {
885            o.insert(VString::new(k), v.clone());
886        }
887        o.into()
888    }
889
890    /// Decode through both the recursive `exec` and the lowered IR, assert they
891    /// agree, and return the value. `exec` is the oracle for the interpreter.
892    fn decode_both(bytes: &[u8], w: SchemaId, r: SchemaId, reg: &Registry) -> Value {
893        let recursive = decode(bytes, w, r, reg).unwrap();
894        let flat = decode_via_ir(bytes, w, r, reg).unwrap();
895        assert_eq!(
896            recursive, flat,
897            "IR interpreter disagreed with recursive exec"
898        );
899        recursive
900    }
901
902    // r[verify compat.field-matching]
903    #[test]
904    fn field_reorder_is_transparent() {
905        // writer: { x: u32, y: u32 }; reader: { y: u32, x: u32 }
906        let writer = point_struct(
907            1,
908            vec![
909                field("x", prim(Primitive::U32), true),
910                field("y", prim(Primitive::U32), true),
911            ],
912        );
913        let reader = point_struct(
914            2,
915            vec![
916                field("y", prim(Primitive::U32), true),
917                field("x", prim(Primitive::U32), true),
918            ],
919        );
920        let reg = Registry::new([writer, reader]);
921        let bytes = compact::to_bytes(
922            &obj(&[("x", Value::from(7u32)), ("y", Value::from(9u32))]),
923            SchemaId(1),
924            &reg,
925        )
926        .unwrap();
927        let got = decode_both(&bytes, SchemaId(1), SchemaId(2), &reg);
928        assert_eq!(
929            got,
930            obj(&[("x", Value::from(7u32)), ("y", Value::from(9u32))])
931        );
932    }
933
934    // r[verify compat.skip-writer-only]
935    #[test]
936    fn writer_only_field_is_skipped() {
937        // writer: { x: u32, gone: string }; reader: { x: u32 }
938        let writer = point_struct(
939            1,
940            vec![
941                field("x", prim(Primitive::U32), true),
942                field("gone", prim(Primitive::String), true),
943            ],
944        );
945        let reader = point_struct(2, vec![field("x", prim(Primitive::U32), true)]);
946        let reg = Registry::new([writer, reader]);
947        let bytes = compact::to_bytes(
948            &obj(&[("x", Value::from(7u32)), ("gone", Value::from("bye"))]),
949            SchemaId(1),
950            &reg,
951        )
952        .unwrap();
953        let got = decode_both(&bytes, SchemaId(1), SchemaId(2), &reg);
954        assert_eq!(got, obj(&[("x", Value::from(7u32))]));
955    }
956
957    // r[verify compat.plan-first]
958    // r[verify compat.reader-only-fields]
959    // r[verify compat.defaults-are-reader-side]
960    #[test]
961    fn reader_only_field_defaults_or_fails() {
962        // writer: { x: u32 }; reader: { x: u32, extra: u32 }
963        let writer = point_struct(1, vec![field("x", prim(Primitive::U32), true)]);
964        let optional = point_struct(
965            2,
966            vec![
967                field("x", prim(Primitive::U32), true),
968                field("extra", prim(Primitive::U32), false), // non-required -> default
969            ],
970        );
971        let required = point_struct(
972            3,
973            vec![
974                field("x", prim(Primitive::U32), true),
975                field("extra", prim(Primitive::U32), true), // required -> plan fails
976            ],
977        );
978        let reg = Registry::new([writer, optional, required]);
979        let bytes =
980            compact::to_bytes(&obj(&[("x", Value::from(7u32))]), SchemaId(1), &reg).unwrap();
981
982        let got = decode_both(&bytes, SchemaId(1), SchemaId(2), &reg);
983        assert_eq!(
984            got,
985            obj(&[("x", Value::from(7u32)), ("extra", Value::NULL)])
986        );
987
988        assert!(matches!(
989            build_plan(SchemaId(1), SchemaId(3), &reg),
990            Err(CompactError::Incompatible(_))
991        ));
992        assert!(matches!(
993            decode_via_ir(&bytes, SchemaId(1), SchemaId(3), &reg),
994            Err(CompactError::Incompatible(_))
995        ));
996    }
997
998    // r[verify compat.type-match]
999    #[test]
1000    fn numeric_widening_is_not_implicit() {
1001        let writer = schema(
1002            1,
1003            SchemaKind::List {
1004                element: prim(Primitive::U32),
1005            },
1006        );
1007        let reader = schema(
1008            2,
1009            SchemaKind::List {
1010                element: prim(Primitive::U64),
1011            },
1012        );
1013        let reg = Registry::new([writer, reader]);
1014        assert!(matches!(
1015            build_plan(SchemaId(1), SchemaId(2), &reg),
1016            Err(CompactError::Incompatible(_))
1017        ));
1018        assert!(matches!(
1019            decode_via_ir(&[], SchemaId(1), SchemaId(2), &reg),
1020            Err(CompactError::Incompatible(_))
1021        ));
1022    }
1023
1024    // r[verify compat.enum]
1025    #[test]
1026    fn enum_variant_added_and_removed() {
1027        // writer enum { A, B(u32) }; reader enum { A, B(u32), C } (C added).
1028        let writer = schema(
1029            1,
1030            SchemaKind::Enum {
1031                name: "E".to_string(),
1032                variants: vec![
1033                    Variant {
1034                        name: "A".to_string(),
1035                        index: 0,
1036                        payload: VariantPayload::Unit,
1037                    },
1038                    Variant {
1039                        name: "B".to_string(),
1040                        index: 1,
1041                        payload: VariantPayload::Newtype(prim(Primitive::U32)),
1042                    },
1043                ],
1044            },
1045        );
1046        let reader_more = schema(
1047            2,
1048            SchemaKind::Enum {
1049                name: "E".to_string(),
1050                variants: vec![
1051                    Variant {
1052                        name: "A".to_string(),
1053                        index: 0,
1054                        payload: VariantPayload::Unit,
1055                    },
1056                    Variant {
1057                        name: "B".to_string(),
1058                        index: 1,
1059                        payload: VariantPayload::Newtype(prim(Primitive::U32)),
1060                    },
1061                    Variant {
1062                        name: "C".to_string(),
1063                        index: 2,
1064                        payload: VariantPayload::Unit,
1065                    },
1066                ],
1067            },
1068        );
1069        // reader that lacks B: receiving B at runtime is a decode error.
1070        let reader_fewer = schema(
1071            3,
1072            SchemaKind::Enum {
1073                name: "E".to_string(),
1074                variants: vec![Variant {
1075                    name: "A".to_string(),
1076                    index: 0,
1077                    payload: VariantPayload::Unit,
1078                }],
1079            },
1080        );
1081        let reg = Registry::new([writer, reader_more, reader_fewer]);
1082
1083        let b = obj(&[("B", Value::from(42u32))]);
1084        let bytes = compact::to_bytes(&b, SchemaId(1), &reg).unwrap();
1085
1086        // reader_more can read B fine (C just goes unused).
1087        assert_eq!(decode_both(&bytes, SchemaId(1), SchemaId(2), &reg), b);
1088
1089        // reader_fewer plans (A matches), but receiving B is a decode error.
1090        assert!(matches!(
1091            decode(&bytes, SchemaId(1), SchemaId(3), &reg),
1092            Err(CompactError::WriterOnlyVariant(1))
1093        ));
1094        assert!(matches!(
1095            decode_via_ir(&bytes, SchemaId(1), SchemaId(3), &reg),
1096            Err(CompactError::WriterOnlyVariant(1))
1097        ));
1098        // an A value still decodes against reader_fewer.
1099        let a = obj(&[("A", Value::NULL)]);
1100        let a_bytes = compact::to_bytes(&a, SchemaId(1), &reg).unwrap();
1101        assert_eq!(decode_both(&a_bytes, SchemaId(1), SchemaId(3), &reg), a);
1102    }
1103
1104    #[test]
1105    fn nested_struct_compat() {
1106        // Inner differs (field added); Outer holds an Inner.
1107        let w_inner = point_struct(10, vec![field("a", prim(Primitive::U32), true)]);
1108        let r_inner = point_struct(
1109            20,
1110            vec![
1111                field("a", prim(Primitive::U32), true),
1112                field("b", prim(Primitive::Bool), false),
1113            ],
1114        );
1115        let w_outer = schema(
1116            1,
1117            SchemaKind::Struct {
1118                name: "Outer".to_string(),
1119                fields: vec![field("inner", SchemaRef::concrete(SchemaId(10)), true)],
1120            },
1121        );
1122        let r_outer = schema(
1123            2,
1124            SchemaKind::Struct {
1125                name: "Outer".to_string(),
1126                fields: vec![field("inner", SchemaRef::concrete(SchemaId(20)), true)],
1127            },
1128        );
1129        let reg = Registry::new([w_inner, r_inner, w_outer, r_outer]);
1130        let bytes = compact::to_bytes(
1131            &obj(&[("inner", obj(&[("a", Value::from(5u32))]))]),
1132            SchemaId(1),
1133            &reg,
1134        )
1135        .unwrap();
1136        let got = decode_both(&bytes, SchemaId(1), SchemaId(2), &reg);
1137        assert_eq!(
1138            got,
1139            obj(&[(
1140                "inner",
1141                obj(&[("a", Value::from(5u32)), ("b", Value::NULL)])
1142            )])
1143        );
1144    }
1145
1146    // r[verify compat.direction]
1147    #[test]
1148    fn compatibility_direction_reports_both_ways() {
1149        let older = point_struct(1, vec![field("x", prim(Primitive::U32), true)]);
1150        let newer_optional = point_struct(
1151            2,
1152            vec![
1153                field("x", prim(Primitive::U32), true),
1154                field("y", prim(Primitive::U32), false),
1155            ],
1156        );
1157        let newer_required = point_struct(
1158            3,
1159            vec![
1160                field("x", prim(Primitive::U32), true),
1161                field("y", prim(Primitive::U32), true),
1162            ],
1163        );
1164        let different = point_struct(4, vec![field("x", prim(Primitive::U64), true)]);
1165        let reg = Registry::new([older, newer_optional, newer_required, different]);
1166
1167        assert_eq!(
1168            compatibility_direction(SchemaId(1), SchemaId(2), &reg),
1169            CompatDirection::Bidirectional
1170        );
1171        assert_eq!(
1172            compatibility_direction(SchemaId(1), SchemaId(3), &reg),
1173            CompatDirection::Forward
1174        );
1175        assert_eq!(
1176            compatibility_direction(SchemaId(1), SchemaId(4), &reg),
1177            CompatDirection::Incompatible
1178        );
1179    }
1180
1181    /// Regression (found by `tests/compat_fuzz.rs`): a `list` of *zero-sized*
1182    /// elements (an empty struct) encodes to nothing but its count, so after the
1183    /// count is read the buffer is empty. The `r[validate.lengths]` guard wrongly
1184    /// rejected this — it assumed every element costs at least one wire byte —
1185    /// even at writer==reader. The element's true minimum wire size (0 here) now
1186    /// flows into the guard, which falls back to a fixed count cap for zero-sized
1187    /// elements. Both decode paths must accept `[{}, {}, {}]`.
1188    #[test]
1189    fn list_of_zero_sized_structs_decodes() {
1190        // Inner = empty struct; List<Inner>.
1191        let inner = point_struct(1, vec![]);
1192        let list = schema(
1193            2,
1194            SchemaKind::List {
1195                element: SchemaRef::concrete(SchemaId(1)),
1196            },
1197        );
1198        let reg = Registry::new([inner, list]);
1199
1200        // [ {}, {}, {} ] — three empty structs, zero payload bytes each.
1201        let mut arr = VArray::new();
1202        for _ in 0..3 {
1203            arr.push(obj(&[]));
1204        }
1205        let value = Value::from(arr);
1206        let bytes = compact::to_bytes(&value, SchemaId(2), &reg).unwrap();
1207        // The whole message is just the u32 count: 4 bytes.
1208        assert_eq!(bytes.len(), 4);
1209
1210        let got = decode_both(&bytes, SchemaId(2), SchemaId(2), &reg);
1211        assert_eq!(got, value);
1212    }
1213
1214    /// Regression: a fixed `array` of zero-sized elements has element count from
1215    /// the schema (here `[3]`) but zero wire bytes, so the in-decoder count bound
1216    /// (`count > remaining`) wrongly rejected it. The fixed-count check now uses
1217    /// the same zero-sized fallback as the wire-driven path.
1218    #[test]
1219    fn array_of_zero_sized_units_decodes() {
1220        // Array<unit, 3> — three units, zero payload bytes total.
1221        let arr_schema = schema(
1222            1,
1223            SchemaKind::Array {
1224                element: prim(Primitive::Unit),
1225                dimensions: vec![3],
1226            },
1227        );
1228        let reg = Registry::new([arr_schema]);
1229        let mut arr = VArray::new();
1230        for _ in 0..3 {
1231            arr.push(Value::NULL);
1232        }
1233        let value = Value::from(arr);
1234        let bytes = compact::to_bytes(&value, SchemaId(1), &reg).unwrap();
1235        assert_eq!(bytes.len(), 0);
1236
1237        let got = decode_both(&bytes, SchemaId(1), SchemaId(1), &reg);
1238        assert_eq!(got, value);
1239    }
1240}