Skip to main content

codec/
delta.rs

1//! Delta snapshot encoding/decoding.
2
3use std::cmp::Ordering;
4
5use bitstream::{BitReader, BitWriter};
6use schema::{schema_hash, ChangePolicy, ComponentDef, ComponentId, FieldDef};
7use wire::{decode_packet, encode_header, SectionTag, WirePacket};
8
9use crate::baseline::BaselineStore;
10use crate::error::{CodecError, CodecResult, LimitKind, MaskKind, MaskReason, ValueReason};
11use crate::limits::CodecLimits;
12use crate::scratch::CodecScratch;
13use crate::snapshot::{
14    ensure_known_components, read_field_value, read_field_value_sparse, read_mask, required_bits,
15    write_field_value, write_field_value_sparse, write_section, ComponentSnapshot, EntitySnapshot,
16    FieldValue, Snapshot,
17};
18use crate::types::{EntityId, SnapshotTick};
19
20/// Selects the latest baseline tick at or before the ack tick.
21#[must_use]
22pub fn select_baseline_tick<T>(
23    store: &BaselineStore<T>,
24    ack_tick: SnapshotTick,
25) -> Option<SnapshotTick> {
26    store.latest_at_or_before(ack_tick).map(|(tick, _)| tick)
27}
28
29/// Encodes a delta snapshot into the provided output buffer.
30///
31/// This is the scan-based path and is kept as a convenience/fallback. For
32/// production engines with dirty/change lists, prefer `encode_delta_from_changes`.
33/// Baseline and current snapshots must have entities sorted by `EntityId`.
34pub fn encode_delta_snapshot(
35    schema: &schema::Schema,
36    tick: SnapshotTick,
37    baseline_tick: SnapshotTick,
38    baseline: &Snapshot,
39    current: &Snapshot,
40    limits: &CodecLimits,
41    out: &mut [u8],
42) -> CodecResult<usize> {
43    let mut scratch = CodecScratch::default();
44    encode_delta_snapshot_with_scratch(
45        schema,
46        tick,
47        baseline_tick,
48        baseline,
49        current,
50        limits,
51        &mut scratch,
52        out,
53    )
54}
55
56/// Encodes a delta snapshot using reusable scratch buffers.
57#[allow(clippy::too_many_arguments)]
58pub fn encode_delta_snapshot_with_scratch(
59    schema: &schema::Schema,
60    tick: SnapshotTick,
61    baseline_tick: SnapshotTick,
62    baseline: &Snapshot,
63    current: &Snapshot,
64    limits: &CodecLimits,
65    scratch: &mut CodecScratch,
66    out: &mut [u8],
67) -> CodecResult<usize> {
68    encode_delta_snapshot_with_scratch_mode(
69        schema,
70        tick,
71        baseline_tick,
72        baseline,
73        current,
74        limits,
75        scratch,
76        out,
77        EncodeUpdateMode::Auto,
78    )
79}
80
81/// Encodes a delta snapshot for a client-specific view.
82///
83/// This assumes `baseline` and `current` are already filtered for the client interest set.
84/// Updates are encoded in sparse mode to optimize for small per-client packets.
85pub fn encode_delta_snapshot_for_client(
86    schema: &schema::Schema,
87    tick: SnapshotTick,
88    baseline_tick: SnapshotTick,
89    baseline: &Snapshot,
90    current: &Snapshot,
91    limits: &CodecLimits,
92    out: &mut [u8],
93) -> CodecResult<usize> {
94    let mut scratch = CodecScratch::default();
95    encode_delta_snapshot_for_client_with_scratch(
96        schema,
97        tick,
98        baseline_tick,
99        baseline,
100        current,
101        limits,
102        &mut scratch,
103        out,
104    )
105}
106
107/// Encoder context for delta packets that use caller-provided change lists.
108///
109/// This avoids scanning baseline/current snapshots and is preferred for
110/// production engines that already track dirty state.
111pub struct SessionEncoder<'a> {
112    schema: &'a schema::Schema,
113    limits: &'a CodecLimits,
114    #[allow(dead_code)]
115    scratch: CodecScratch,
116}
117
118impl<'a> SessionEncoder<'a> {
119    #[must_use]
120    pub fn new(schema: &'a schema::Schema, limits: &'a CodecLimits) -> Self {
121        Self {
122            schema,
123            limits,
124            scratch: CodecScratch::default(),
125        }
126    }
127
128    #[must_use]
129    pub fn schema(&self) -> &'a schema::Schema {
130        self.schema
131    }
132
133    #[must_use]
134    pub fn limits(&self) -> &'a CodecLimits {
135        self.limits
136    }
137}
138
139/// Encodes a delta snapshot from precomputed change lists.
140///
141/// Preferred for production engines (dirty/change-list driven). The scan-based
142/// encode paths remain as a convenience/fallback.
143pub fn encode_delta_from_changes(
144    session: &mut SessionEncoder<'_>,
145    tick: SnapshotTick,
146    baseline_tick: SnapshotTick,
147    creates: &[EntitySnapshot],
148    destroys: &[EntityId],
149    updates: &[DeltaUpdateEntity],
150    out: &mut [u8],
151) -> CodecResult<usize> {
152    encode_delta_snapshot_from_updates(
153        session.schema,
154        tick,
155        baseline_tick,
156        destroys,
157        creates,
158        updates,
159        session.limits,
160        out,
161    )
162}
163
164/// Encodes a delta snapshot using precomputed change lists.
165///
166/// This is a lower-level helper. Prefer `encode_delta_from_changes` with a
167/// `SessionEncoder` for production use.
168#[allow(clippy::too_many_arguments)]
169pub fn encode_delta_snapshot_from_updates(
170    schema: &schema::Schema,
171    tick: SnapshotTick,
172    baseline_tick: SnapshotTick,
173    destroys: &[EntityId],
174    creates: &[EntitySnapshot],
175    updates: &[DeltaUpdateEntity],
176    limits: &CodecLimits,
177    out: &mut [u8],
178) -> CodecResult<usize> {
179    if out.len() < wire::HEADER_SIZE {
180        return Err(CodecError::OutputTooSmall {
181            needed: wire::HEADER_SIZE,
182            available: out.len(),
183        });
184    }
185    let payload_len = encode_delta_payload_from_updates(
186        schema,
187        destroys,
188        creates,
189        updates,
190        limits,
191        &mut out[wire::HEADER_SIZE..],
192    )?;
193    let header = wire::PacketHeader::delta_snapshot(
194        schema_hash(schema),
195        tick.raw(),
196        baseline_tick.raw(),
197        payload_len as u32,
198    );
199    encode_header(&header, &mut out[..wire::HEADER_SIZE]).map_err(|_| {
200        CodecError::OutputTooSmall {
201            needed: wire::HEADER_SIZE,
202            available: out.len(),
203        }
204    })?;
205    Ok(wire::HEADER_SIZE + payload_len)
206}
207
208/// Encodes a client delta snapshot using a compact session header.
209#[allow(clippy::too_many_arguments)]
210pub fn encode_delta_snapshot_for_client_session(
211    schema: &schema::Schema,
212    tick: SnapshotTick,
213    baseline_tick: SnapshotTick,
214    baseline: &Snapshot,
215    current: &Snapshot,
216    limits: &CodecLimits,
217    last_tick: &mut SnapshotTick,
218    out: &mut [u8],
219) -> CodecResult<usize> {
220    let mut scratch = CodecScratch::default();
221    encode_delta_snapshot_for_client_session_with_scratch(
222        schema,
223        tick,
224        baseline_tick,
225        baseline,
226        current,
227        limits,
228        &mut scratch,
229        last_tick,
230        out,
231    )
232}
233
234/// Encodes a delta snapshot for a client-specific view using reusable scratch buffers.
235#[allow(clippy::too_many_arguments)]
236pub fn encode_delta_snapshot_for_client_with_scratch(
237    schema: &schema::Schema,
238    tick: SnapshotTick,
239    baseline_tick: SnapshotTick,
240    baseline: &Snapshot,
241    current: &Snapshot,
242    limits: &CodecLimits,
243    scratch: &mut CodecScratch,
244    out: &mut [u8],
245) -> CodecResult<usize> {
246    encode_delta_snapshot_with_scratch_mode(
247        schema,
248        tick,
249        baseline_tick,
250        baseline,
251        current,
252        limits,
253        scratch,
254        out,
255        EncodeUpdateMode::Sparse,
256    )
257}
258
/// How the scan-based encoder decides the wire encoding of the update section.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum EncodeUpdateMode {
    /// Let `select_update_encoding` choose between masked and sparse.
    Auto,
    /// Always use the sparse encoding.
    Sparse,
}
264
265/// Encodes a client delta snapshot using a compact session header.
266#[allow(clippy::too_many_arguments)]
267pub fn encode_delta_snapshot_for_client_session_with_scratch(
268    schema: &schema::Schema,
269    tick: SnapshotTick,
270    baseline_tick: SnapshotTick,
271    baseline: &Snapshot,
272    current: &Snapshot,
273    limits: &CodecLimits,
274    scratch: &mut CodecScratch,
275    last_tick: &mut SnapshotTick,
276    out: &mut [u8],
277) -> CodecResult<usize> {
278    let max_header = wire::SESSION_MAX_HEADER_SIZE;
279    if out.len() < max_header {
280        return Err(CodecError::OutputTooSmall {
281            needed: max_header,
282            available: out.len(),
283        });
284    }
285    if tick.raw() <= last_tick.raw() {
286        return Err(CodecError::InvalidEntityOrder {
287            previous: last_tick.raw(),
288            current: tick.raw(),
289        });
290    }
291    if baseline_tick.raw() > tick.raw() {
292        return Err(CodecError::BaselineTickMismatch {
293            expected: baseline_tick.raw(),
294            found: tick.raw(),
295        });
296    }
297    let payload_len = encode_delta_payload_with_mode(
298        schema,
299        baseline_tick,
300        baseline,
301        current,
302        limits,
303        scratch,
304        &mut out[max_header..],
305        EncodeUpdateMode::Sparse,
306    )?;
307    let tick_delta = tick.raw() - last_tick.raw();
308    let baseline_delta = tick.raw() - baseline_tick.raw();
309    let header_len = wire::encode_session_header(
310        &mut out[..max_header],
311        wire::SessionFlags::delta_snapshot(),
312        tick_delta,
313        baseline_delta,
314        payload_len as u32,
315    )
316    .map_err(|_| CodecError::OutputTooSmall {
317        needed: max_header,
318        available: out.len(),
319    })?;
320    if header_len < max_header {
321        let payload_start = max_header;
322        let payload_end = max_header + payload_len;
323        out.copy_within(payload_start..payload_end, header_len);
324    }
325    *last_tick = tick;
326    Ok(header_len + payload_len)
327}
328
329#[allow(clippy::too_many_arguments)]
330fn encode_delta_snapshot_with_scratch_mode(
331    schema: &schema::Schema,
332    tick: SnapshotTick,
333    baseline_tick: SnapshotTick,
334    baseline: &Snapshot,
335    current: &Snapshot,
336    limits: &CodecLimits,
337    scratch: &mut CodecScratch,
338    out: &mut [u8],
339    mode: EncodeUpdateMode,
340) -> CodecResult<usize> {
341    if out.len() < wire::HEADER_SIZE {
342        return Err(CodecError::OutputTooSmall {
343            needed: wire::HEADER_SIZE,
344            available: out.len(),
345        });
346    }
347    let payload_len = encode_delta_payload_with_mode(
348        schema,
349        baseline_tick,
350        baseline,
351        current,
352        limits,
353        scratch,
354        &mut out[wire::HEADER_SIZE..],
355        mode,
356    )?;
357    let header = wire::PacketHeader::delta_snapshot(
358        schema_hash(schema),
359        tick.raw(),
360        baseline_tick.raw(),
361        payload_len as u32,
362    );
363    encode_header(&header, &mut out[..wire::HEADER_SIZE]).map_err(|_| {
364        CodecError::OutputTooSmall {
365            needed: wire::HEADER_SIZE,
366            available: out.len(),
367        }
368    })?;
369
370    Ok(wire::HEADER_SIZE + payload_len)
371}
372
373#[allow(clippy::too_many_arguments)]
374fn encode_delta_payload_with_mode(
375    schema: &schema::Schema,
376    baseline_tick: SnapshotTick,
377    baseline: &Snapshot,
378    current: &Snapshot,
379    limits: &CodecLimits,
380    scratch: &mut CodecScratch,
381    out: &mut [u8],
382    mode: EncodeUpdateMode,
383) -> CodecResult<usize> {
384    if baseline.tick != baseline_tick {
385        return Err(CodecError::BaselineTickMismatch {
386            expected: baseline.tick.raw(),
387            found: baseline_tick.raw(),
388        });
389    }
390
391    ensure_entities_sorted(&baseline.entities)?;
392    ensure_entities_sorted(&current.entities)?;
393
394    let mut counts = DiffCounts::default();
395    diff_counts(schema, baseline, current, limits, &mut counts)?;
396
397    if counts.creates > limits.max_entities_create {
398        return Err(CodecError::LimitsExceeded {
399            kind: LimitKind::EntitiesCreate,
400            limit: limits.max_entities_create,
401            actual: counts.creates,
402        });
403    }
404    if counts.updates > limits.max_entities_update {
405        return Err(CodecError::LimitsExceeded {
406            kind: LimitKind::EntitiesUpdate,
407            limit: limits.max_entities_update,
408            actual: counts.updates,
409        });
410    }
411    if counts.destroys > limits.max_entities_destroy {
412        return Err(CodecError::LimitsExceeded {
413            kind: LimitKind::EntitiesDestroy,
414            limit: limits.max_entities_destroy,
415            actual: counts.destroys,
416        });
417    }
418
419    let mut offset = 0;
420    if counts.destroys > 0 {
421        let written = write_section(
422            SectionTag::EntityDestroy,
423            &mut out[offset..],
424            limits,
425            |writer| encode_destroy_body(baseline, current, counts.destroys, limits, writer),
426        )?;
427        offset += written;
428    }
429    if counts.creates > 0 {
430        let written = write_section(
431            SectionTag::EntityCreate,
432            &mut out[offset..],
433            limits,
434            |writer| encode_create_body(schema, baseline, current, counts.creates, limits, writer),
435        )?;
436        offset += written;
437    }
438    if counts.updates > 0 {
439        let update_encoding = match mode {
440            EncodeUpdateMode::Auto => {
441                select_update_encoding(schema, baseline, current, limits, scratch)?
442            }
443            EncodeUpdateMode::Sparse => UpdateEncoding::Sparse,
444        };
445        let section_tag = match update_encoding {
446            UpdateEncoding::Masked => SectionTag::EntityUpdate,
447            UpdateEncoding::Sparse => SectionTag::EntityUpdateSparsePacked,
448        };
449        let written =
450            write_section(
451                section_tag,
452                &mut out[offset..],
453                limits,
454                |writer| match update_encoding {
455                    UpdateEncoding::Masked => encode_update_body_masked(
456                        schema,
457                        baseline,
458                        current,
459                        counts.updates,
460                        limits,
461                        scratch,
462                        writer,
463                    ),
464                    UpdateEncoding::Sparse => encode_update_body_sparse_packed(
465                        schema,
466                        baseline,
467                        current,
468                        counts.updates,
469                        limits,
470                        scratch,
471                        writer,
472                    ),
473                },
474            )?;
475        offset += written;
476    }
477
478    Ok(offset)
479}
480
481#[allow(clippy::too_many_arguments)]
482fn encode_delta_payload_from_updates(
483    schema: &schema::Schema,
484    destroys: &[EntityId],
485    creates: &[EntitySnapshot],
486    updates: &[DeltaUpdateEntity],
487    limits: &CodecLimits,
488    out: &mut [u8],
489) -> CodecResult<usize> {
490    if destroys.len() > limits.max_entities_destroy {
491        return Err(CodecError::LimitsExceeded {
492            kind: LimitKind::EntitiesDestroy,
493            limit: limits.max_entities_destroy,
494            actual: destroys.len(),
495        });
496    }
497    if creates.len() > limits.max_entities_create {
498        return Err(CodecError::LimitsExceeded {
499            kind: LimitKind::EntitiesCreate,
500            limit: limits.max_entities_create,
501            actual: creates.len(),
502        });
503    }
504    if updates.len() > limits.max_entities_update {
505        return Err(CodecError::LimitsExceeded {
506            kind: LimitKind::EntitiesUpdate,
507            limit: limits.max_entities_update,
508            actual: updates.len(),
509        });
510    }
511
512    ensure_entity_ids_sorted(destroys)?;
513    ensure_entities_sorted(creates)?;
514    let lookup = build_component_lookup(schema);
515    validate_updates_for_encoding(schema, updates, limits, &lookup)?;
516
517    let mut offset = 0;
518    if !destroys.is_empty() {
519        let written = write_section(
520            SectionTag::EntityDestroy,
521            &mut out[offset..],
522            limits,
523            |writer| encode_destroy_body_from_list(destroys, limits, writer),
524        )?;
525        offset += written;
526    }
527    if !creates.is_empty() {
528        let written = write_section(
529            SectionTag::EntityCreate,
530            &mut out[offset..],
531            limits,
532            |writer| encode_create_body_from_list(schema, creates, limits, writer),
533        )?;
534        offset += written;
535    }
536    if !updates.is_empty() {
537        let written = write_section(
538            SectionTag::EntityUpdateSparsePacked,
539            &mut out[offset..],
540            limits,
541            |writer| {
542                encode_update_body_sparse_packed_from_updates(
543                    schema, updates, limits, &lookup, writer,
544                )
545            },
546        )?;
547        offset += written;
548    }
549
550    Ok(offset)
551}
552
553/// Applies a delta snapshot to a baseline snapshot.
554pub fn apply_delta_snapshot(
555    schema: &schema::Schema,
556    baseline: &Snapshot,
557    bytes: &[u8],
558    wire_limits: &wire::Limits,
559    limits: &CodecLimits,
560) -> CodecResult<Snapshot> {
561    let packet = decode_packet(bytes, wire_limits)?;
562    apply_delta_snapshot_from_packet(schema, baseline, &packet, limits)
563}
564
565/// Applies a delta snapshot from a parsed wire packet.
566pub fn apply_delta_snapshot_from_packet(
567    schema: &schema::Schema,
568    baseline: &Snapshot,
569    packet: &WirePacket<'_>,
570    limits: &CodecLimits,
571) -> CodecResult<Snapshot> {
572    let header = packet.header;
573    if !header.flags.is_delta_snapshot() {
574        return Err(CodecError::Wire(wire::DecodeError::InvalidFlags {
575            flags: header.flags.raw(),
576        }));
577    }
578    if header.baseline_tick == 0 {
579        return Err(CodecError::Wire(wire::DecodeError::InvalidBaselineTick {
580            baseline_tick: header.baseline_tick,
581            flags: header.flags.raw(),
582        }));
583    }
584    if header.baseline_tick != baseline.tick.raw() {
585        return Err(CodecError::BaselineTickMismatch {
586            expected: baseline.tick.raw(),
587            found: header.baseline_tick,
588        });
589    }
590
591    let expected_hash = schema_hash(schema);
592    if header.schema_hash != expected_hash {
593        return Err(CodecError::SchemaMismatch {
594            expected: expected_hash,
595            found: header.schema_hash,
596        });
597    }
598
599    let (destroys, creates, updates) = decode_delta_sections(schema, packet, limits)?;
600
601    ensure_entities_sorted(&baseline.entities)?;
602    ensure_entities_sorted(&creates)?;
603
604    let mut remaining = apply_destroys(&baseline.entities, &destroys)?;
605    remaining = apply_creates(remaining, creates)?;
606    if remaining.len() > limits.max_total_entities_after_apply {
607        return Err(CodecError::LimitsExceeded {
608            kind: LimitKind::TotalEntitiesAfterApply,
609            limit: limits.max_total_entities_after_apply,
610            actual: remaining.len(),
611        });
612    }
613    apply_updates(&mut remaining, &updates)?;
614
615    Ok(Snapshot {
616        tick: SnapshotTick::new(header.tick),
617        entities: remaining,
618    })
619}
620
621/// Decodes a delta packet without applying it to a baseline.
622pub fn decode_delta_packet(
623    schema: &schema::Schema,
624    packet: &WirePacket<'_>,
625    limits: &CodecLimits,
626) -> CodecResult<DeltaDecoded> {
627    let header = packet.header;
628    if !header.flags.is_delta_snapshot() {
629        return Err(CodecError::Wire(wire::DecodeError::InvalidFlags {
630            flags: header.flags.raw(),
631        }));
632    }
633    if header.baseline_tick == 0 {
634        return Err(CodecError::Wire(wire::DecodeError::InvalidBaselineTick {
635            baseline_tick: header.baseline_tick,
636            flags: header.flags.raw(),
637        }));
638    }
639    let expected_hash = schema_hash(schema);
640    if header.schema_hash != expected_hash {
641        return Err(CodecError::SchemaMismatch {
642            expected: expected_hash,
643            found: header.schema_hash,
644        });
645    }
646
647    let (destroys, creates, updates) = decode_delta_sections(schema, packet, limits)?;
648
649    Ok(DeltaDecoded {
650        tick: SnapshotTick::new(header.tick),
651        baseline_tick: SnapshotTick::new(header.baseline_tick),
652        destroys,
653        creates,
654        updates,
655    })
656}
657
/// Tallies produced by `diff_counts` when walking baseline and current entity lists.
#[derive(Default)]
struct DiffCounts {
    /// Entities present only in the current snapshot.
    creates: usize,
    /// Entities present in both snapshots for which `entity_has_updates` returned true.
    updates: usize,
    /// Entities present only in the baseline snapshot.
    destroys: usize,
}
664
/// Wire encoding chosen for the update section of a delta payload.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
enum UpdateEncoding {
    /// Written under `SectionTag::EntityUpdate`.
    Masked,
    /// Written under `SectionTag::EntityUpdateSparsePacked`.
    Sparse,
}
670
671fn diff_counts(
672    schema: &schema::Schema,
673    baseline: &Snapshot,
674    current: &Snapshot,
675    limits: &CodecLimits,
676    counts: &mut DiffCounts,
677) -> CodecResult<()> {
678    let mut i = 0usize;
679    let mut j = 0usize;
680    while i < baseline.entities.len() || j < current.entities.len() {
681        let base = baseline.entities.get(i);
682        let curr = current.entities.get(j);
683        match (base, curr) {
684            (Some(b), Some(c)) => {
685                if b.id.raw() < c.id.raw() {
686                    counts.destroys += 1;
687                    i += 1;
688                } else if b.id.raw() > c.id.raw() {
689                    counts.creates += 1;
690                    j += 1;
691                } else {
692                    if entity_has_updates(schema, b, c, limits)? {
693                        counts.updates += 1;
694                    }
695                    i += 1;
696                    j += 1;
697                }
698            }
699            (Some(_), None) => {
700                counts.destroys += 1;
701                i += 1;
702            }
703            (None, Some(_)) => {
704                counts.creates += 1;
705                j += 1;
706            }
707            (None, None) => break,
708        }
709    }
710    Ok(())
711}
712
713fn select_update_encoding(
714    schema: &schema::Schema,
715    baseline: &Snapshot,
716    current: &Snapshot,
717    limits: &CodecLimits,
718    scratch: &mut CodecScratch,
719) -> CodecResult<UpdateEncoding> {
720    let component_count = schema.components.len();
721    let mut mask_bits = 0usize;
722    let mut sparse_bits = 0usize;
723    let mut baseline_iter = baseline.entities.iter();
724    let mut current_iter = current.entities.iter();
725    let mut baseline_next = baseline_iter.next();
726    let mut current_next = current_iter.next();
727
728    while let (Some(base), Some(curr)) = (baseline_next, current_next) {
729        match base.id.cmp(&curr.id) {
730            Ordering::Less => {
731                baseline_next = baseline_iter.next();
732            }
733            Ordering::Greater => {
734                current_next = current_iter.next();
735            }
736            Ordering::Equal => {
737                for component in &schema.components {
738                    let base_component = find_component(base, component.id);
739                    let curr_component = find_component(curr, component.id);
740                    if base_component.is_some() != curr_component.is_some() {
741                        return Err(CodecError::InvalidMask {
742                            kind: MaskKind::ComponentMask,
743                            reason: MaskReason::ComponentPresenceMismatch {
744                                component: component.id,
745                            },
746                        });
747                    }
748                    if let (Some(base_component), Some(curr_component)) =
749                        (base_component, curr_component)
750                    {
751                        if base_component.fields.len() != component.fields.len()
752                            || curr_component.fields.len() != component.fields.len()
753                        {
754                            return Err(CodecError::InvalidMask {
755                                kind: MaskKind::FieldMask {
756                                    component: component.id,
757                                },
758                                reason: MaskReason::FieldCountMismatch {
759                                    expected: component.fields.len(),
760                                    actual: base_component
761                                        .fields
762                                        .len()
763                                        .max(curr_component.fields.len()),
764                                },
765                            });
766                        }
767                        if component.fields.len() > limits.max_fields_per_component {
768                            return Err(CodecError::LimitsExceeded {
769                                kind: LimitKind::FieldsPerComponent,
770                                limit: limits.max_fields_per_component,
771                                actual: component.fields.len(),
772                            });
773                        }
774                        let (_, field_mask) = scratch
775                            .component_and_field_masks_mut(component_count, component.fields.len());
776                        // Use the same change predicate as masked/sparse encoding.
777                        let field_mask = compute_field_mask_into(
778                            component,
779                            base_component,
780                            curr_component,
781                            field_mask,
782                        )?;
783                        let changed = field_mask.iter().filter(|bit| **bit).count();
784                        if changed > 0 {
785                            let field_count = component.fields.len();
786                            let index_bits =
787                                required_bits(field_count.saturating_sub(1) as u64) as usize;
788                            // Compare per-component mask bits vs packed sparse index bits.
789                            // Values are sent in both encodings, so we only estimate index/mask + id overhead.
790                            mask_bits += field_count;
791                            sparse_bits += index_bits * changed;
792                            sparse_bits += varu32_len(curr.id.raw()) * 8;
793                            sparse_bits += varu32_len(component.id.get() as u32) * 8;
794                            sparse_bits += varu32_len(changed as u32) * 8;
795                        }
796                    }
797                }
798
799                baseline_next = baseline_iter.next();
800                current_next = current_iter.next();
801            }
802        }
803    }
804
805    if mask_bits == 0 {
806        return Ok(UpdateEncoding::Masked);
807    }
808
809    if sparse_bits <= mask_bits {
810        Ok(UpdateEncoding::Sparse)
811    } else {
812        Ok(UpdateEncoding::Masked)
813    }
814}
815
/// Returns a byte length from 1 to 5 for `value`, stepping up at each 7-bit boundary.
///
/// Values below 2^7 map to 1, below 2^14 to 2, below 2^21 to 3, below 2^28 to 4, and
/// everything else to 5.
fn varu32_len(value: u32) -> usize {
    match value {
        0..=0x7F => 1,
        0x80..=0x3FFF => 2,
        0x4000..=0x1F_FFFF => 3,
        0x20_0000..=0x0FFF_FFFF => 4,
        _ => 5,
    }
}
829
830fn ensure_entity_ids_sorted(ids: &[EntityId]) -> CodecResult<()> {
831    let mut prev: Option<u32> = None;
832    for id in ids {
833        let raw = id.raw();
834        if let Some(prev_id) = prev {
835            if raw <= prev_id {
836                return Err(CodecError::InvalidEntityOrder {
837                    previous: prev_id,
838                    current: raw,
839                });
840            }
841        }
842        prev = Some(raw);
843    }
844    Ok(())
845}
846
847fn encode_destroy_body_from_list(
848    destroys: &[EntityId],
849    limits: &CodecLimits,
850    writer: &mut BitWriter<'_>,
851) -> CodecResult<()> {
852    if destroys.len() > limits.max_entities_destroy {
853        return Err(CodecError::LimitsExceeded {
854            kind: LimitKind::EntitiesDestroy,
855            limit: limits.max_entities_destroy,
856            actual: destroys.len(),
857        });
858    }
859    writer.align_to_byte()?;
860    writer.write_varu32(destroys.len() as u32)?;
861    for id in destroys {
862        writer.align_to_byte()?;
863        writer.write_u32_aligned(id.raw())?;
864    }
865    writer.align_to_byte()?;
866    Ok(())
867}
868
869fn encode_create_body_from_list(
870    schema: &schema::Schema,
871    creates: &[EntitySnapshot],
872    limits: &CodecLimits,
873    writer: &mut BitWriter<'_>,
874) -> CodecResult<()> {
875    if creates.len() > limits.max_entities_create {
876        return Err(CodecError::LimitsExceeded {
877            kind: LimitKind::EntitiesCreate,
878            limit: limits.max_entities_create,
879            actual: creates.len(),
880        });
881    }
882    writer.align_to_byte()?;
883    writer.write_varu32(creates.len() as u32)?;
884    for entity in creates {
885        write_create_entity(schema, entity, limits, writer)?;
886    }
887    writer.align_to_byte()?;
888    Ok(())
889}
890
891fn validate_updates_for_encoding(
892    schema: &schema::Schema,
893    updates: &[DeltaUpdateEntity],
894    limits: &CodecLimits,
895    lookup: &ComponentLookup,
896) -> CodecResult<()> {
897    let mut prev: Option<u32> = None;
898    for entity_update in updates {
899        let id = entity_update.id.raw();
900        if let Some(prev_id) = prev {
901            if id <= prev_id {
902                return Err(CodecError::InvalidEntityOrder {
903                    previous: prev_id,
904                    current: id,
905                });
906            }
907        }
908        prev = Some(id);
909        if entity_update.components.len() > limits.max_components_per_entity {
910            return Err(CodecError::LimitsExceeded {
911                kind: LimitKind::ComponentsPerEntity,
912                limit: limits.max_components_per_entity,
913                actual: entity_update.components.len(),
914            });
915        }
916        for component_update in &entity_update.components {
917            let component = lookup.component(schema, component_update.id)?;
918            if component_update.fields.is_empty() {
919                return Err(CodecError::InvalidMask {
920                    kind: MaskKind::FieldMask {
921                        component: component_update.id,
922                    },
923                    reason: MaskReason::EmptyFieldMask {
924                        component: component_update.id,
925                    },
926                });
927            }
928            if component_update.fields.len() > limits.max_fields_per_component {
929                return Err(CodecError::LimitsExceeded {
930                    kind: LimitKind::FieldsPerComponent,
931                    limit: limits.max_fields_per_component,
932                    actual: component_update.fields.len(),
933                });
934            }
935            let max_index = component.fields.len().saturating_sub(1);
936            for (field_idx, _value) in &component_update.fields {
937                if *field_idx >= component.fields.len() {
938                    return Err(CodecError::InvalidMask {
939                        kind: MaskKind::FieldMask {
940                            component: component_update.id,
941                        },
942                        reason: MaskReason::InvalidFieldIndex {
943                            field_index: *field_idx,
944                            max: max_index,
945                        },
946                    });
947                }
948            }
949        }
950    }
951    Ok(())
952}
953
954fn encode_update_body_sparse_packed_from_updates(
955    schema: &schema::Schema,
956    updates: &[DeltaUpdateEntity],
957    limits: &CodecLimits,
958    lookup: &ComponentLookup,
959    writer: &mut BitWriter<'_>,
960) -> CodecResult<()> {
961    if updates.len() > limits.max_entities_update {
962        return Err(CodecError::LimitsExceeded {
963            kind: LimitKind::EntitiesUpdate,
964            limit: limits.max_entities_update,
965            actual: updates.len(),
966        });
967    }
968    let entry_count: usize = updates.iter().map(|entity| entity.components.len()).sum();
969    let entry_limit = limits
970        .max_entities_update
971        .saturating_mul(limits.max_components_per_entity);
972    if entry_count > entry_limit {
973        return Err(CodecError::LimitsExceeded {
974            kind: LimitKind::EntitiesUpdate,
975            limit: entry_limit,
976            actual: entry_count,
977        });
978    }
979    writer.align_to_byte()?;
980    writer.write_varu32(entry_count as u32)?;
981    for entity_update in updates {
982        let entity_id = entity_update.id.raw();
983        for component_update in &entity_update.components {
984            let component = lookup.component(schema, component_update.id)?;
985            writer.align_to_byte()?;
986            writer.write_varu32(entity_id)?;
987            writer.write_varu32(component.id.get() as u32)?;
988            writer.write_varu32(component_update.fields.len() as u32)?;
989            let index_bits = lookup.index_bits(component.id);
990            for (field_idx, value) in &component_update.fields {
991                if index_bits > 0 {
992                    writer.write_bits(*field_idx as u64, index_bits)?;
993                }
994                write_field_value_sparse(
995                    component.id,
996                    component.fields[*field_idx],
997                    *value,
998                    writer,
999                )?;
1000            }
1001        }
1002    }
1003    writer.align_to_byte()?;
1004    Ok(())
1005}
1006
1007struct ComponentLookup {
1008    index: Vec<Option<usize>>,
1009    index_bits: Vec<u8>,
1010}
1011
1012impl ComponentLookup {
1013    fn component<'a>(
1014        &self,
1015        schema: &'a schema::Schema,
1016        id: ComponentId,
1017    ) -> CodecResult<&'a ComponentDef> {
1018        let idx = id.get() as usize;
1019        let Some(Some(component_index)) = self.index.get(idx) else {
1020            return Err(CodecError::InvalidMask {
1021                kind: MaskKind::ComponentMask,
1022                reason: MaskReason::UnknownComponent { component: id },
1023            });
1024        };
1025        Ok(&schema.components[*component_index])
1026    }
1027
1028    fn index_bits(&self, id: ComponentId) -> u8 {
1029        let idx = id.get() as usize;
1030        let Some(bits) = self.index_bits.get(idx).copied() else {
1031            return 0;
1032        };
1033        bits
1034    }
1035}
1036
1037fn build_component_lookup(schema: &schema::Schema) -> ComponentLookup {
1038    let max_id = schema
1039        .components
1040        .iter()
1041        .map(|component| component.id.get() as usize)
1042        .max()
1043        .unwrap_or(0);
1044    let mut index = vec![None; max_id + 1];
1045    let mut index_bits = vec![0u8; max_id + 1];
1046    for (component_index, component) in schema.components.iter().enumerate() {
1047        let id = component.id.get() as usize;
1048        index[id] = Some(component_index);
1049        let bits = required_bits(component.fields.len().saturating_sub(1) as u64);
1050        index_bits[id] = bits;
1051    }
1052    ComponentLookup { index, index_bits }
1053}
1054
1055fn encode_destroy_body(
1056    baseline: &Snapshot,
1057    current: &Snapshot,
1058    destroy_count: usize,
1059    limits: &CodecLimits,
1060    writer: &mut BitWriter<'_>,
1061) -> CodecResult<()> {
1062    if destroy_count > limits.max_entities_destroy {
1063        return Err(CodecError::LimitsExceeded {
1064            kind: LimitKind::EntitiesDestroy,
1065            limit: limits.max_entities_destroy,
1066            actual: destroy_count,
1067        });
1068    }
1069
1070    writer.align_to_byte()?;
1071    writer.write_varu32(destroy_count as u32)?;
1072
1073    let mut i = 0usize;
1074    let mut j = 0usize;
1075    while i < baseline.entities.len() || j < current.entities.len() {
1076        let base = baseline.entities.get(i);
1077        let curr = current.entities.get(j);
1078        match (base, curr) {
1079            (Some(b), Some(c)) => {
1080                if b.id.raw() < c.id.raw() {
1081                    writer.align_to_byte()?;
1082                    writer.write_u32_aligned(b.id.raw())?;
1083                    i += 1;
1084                } else if b.id.raw() > c.id.raw() {
1085                    j += 1;
1086                } else {
1087                    i += 1;
1088                    j += 1;
1089                }
1090            }
1091            (Some(b), None) => {
1092                writer.align_to_byte()?;
1093                writer.write_u32_aligned(b.id.raw())?;
1094                i += 1;
1095            }
1096            (None, Some(_)) => {
1097                j += 1;
1098            }
1099            (None, None) => break,
1100        }
1101    }
1102
1103    writer.align_to_byte()?;
1104    Ok(())
1105}
1106
1107fn encode_create_body(
1108    schema: &schema::Schema,
1109    baseline: &Snapshot,
1110    current: &Snapshot,
1111    create_count: usize,
1112    limits: &CodecLimits,
1113    writer: &mut BitWriter<'_>,
1114) -> CodecResult<()> {
1115    if create_count > limits.max_entities_create {
1116        return Err(CodecError::LimitsExceeded {
1117            kind: LimitKind::EntitiesCreate,
1118            limit: limits.max_entities_create,
1119            actual: create_count,
1120        });
1121    }
1122
1123    writer.align_to_byte()?;
1124    writer.write_varu32(create_count as u32)?;
1125
1126    let mut i = 0usize;
1127    let mut j = 0usize;
1128    while i < baseline.entities.len() || j < current.entities.len() {
1129        let base = baseline.entities.get(i);
1130        let curr = current.entities.get(j);
1131        match (base, curr) {
1132            (Some(b), Some(c)) => {
1133                if b.id.raw() < c.id.raw() {
1134                    i += 1;
1135                } else if b.id.raw() > c.id.raw() {
1136                    write_create_entity(schema, c, limits, writer)?;
1137                    j += 1;
1138                } else {
1139                    i += 1;
1140                    j += 1;
1141                }
1142            }
1143            (Some(_), None) => {
1144                i += 1;
1145            }
1146            (None, Some(c)) => {
1147                write_create_entity(schema, c, limits, writer)?;
1148                j += 1;
1149            }
1150            (None, None) => break,
1151        }
1152    }
1153
1154    writer.align_to_byte()?;
1155    Ok(())
1156}
1157
1158fn encode_update_body_masked(
1159    schema: &schema::Schema,
1160    baseline: &Snapshot,
1161    current: &Snapshot,
1162    update_count: usize,
1163    limits: &CodecLimits,
1164    scratch: &mut CodecScratch,
1165    writer: &mut BitWriter<'_>,
1166) -> CodecResult<()> {
1167    if update_count > limits.max_entities_update {
1168        return Err(CodecError::LimitsExceeded {
1169            kind: LimitKind::EntitiesUpdate,
1170            limit: limits.max_entities_update,
1171            actual: update_count,
1172        });
1173    }
1174
1175    writer.align_to_byte()?;
1176    writer.write_varu32(update_count as u32)?;
1177
1178    let mut i = 0usize;
1179    let mut j = 0usize;
1180    while i < baseline.entities.len() || j < current.entities.len() {
1181        let base = baseline.entities.get(i);
1182        let curr = current.entities.get(j);
1183        match (base, curr) {
1184            (Some(b), Some(c)) => {
1185                if b.id.raw() < c.id.raw() {
1186                    i += 1;
1187                } else if b.id.raw() > c.id.raw() {
1188                    j += 1;
1189                } else {
1190                    if entity_has_updates(schema, b, c, limits)? {
1191                        writer.align_to_byte()?;
1192                        writer.write_u32_aligned(c.id.raw())?;
1193                        ensure_component_presence_matches(schema, b, c)?;
1194                        write_update_components(schema, b, c, limits, scratch, writer)?;
1195                    }
1196                    i += 1;
1197                    j += 1;
1198                }
1199            }
1200            (Some(_), None) => i += 1,
1201            (None, Some(_)) => j += 1,
1202            (None, None) => break,
1203        }
1204    }
1205
1206    writer.align_to_byte()?;
1207    Ok(())
1208}
1209
1210#[allow(dead_code)]
1211fn encode_update_body_sparse_varint(
1212    schema: &schema::Schema,
1213    baseline: &Snapshot,
1214    current: &Snapshot,
1215    update_count: usize,
1216    limits: &CodecLimits,
1217    scratch: &mut CodecScratch,
1218    writer: &mut BitWriter<'_>,
1219) -> CodecResult<()> {
1220    if update_count > limits.max_entities_update {
1221        return Err(CodecError::LimitsExceeded {
1222            kind: LimitKind::EntitiesUpdate,
1223            limit: limits.max_entities_update,
1224            actual: update_count,
1225        });
1226    }
1227
1228    let entry_count = count_sparse_update_entries(schema, baseline, current, limits, scratch)?;
1229    let entry_limit = limits
1230        .max_entities_update
1231        .saturating_mul(limits.max_components_per_entity);
1232    if entry_count > entry_limit {
1233        return Err(CodecError::LimitsExceeded {
1234            kind: LimitKind::EntitiesUpdate,
1235            limit: entry_limit,
1236            actual: entry_count,
1237        });
1238    }
1239
1240    writer.align_to_byte()?;
1241    writer.write_varu32(entry_count as u32)?;
1242
1243    let mut baseline_iter = baseline.entities.iter();
1244    let mut current_iter = current.entities.iter();
1245    let mut baseline_next = baseline_iter.next();
1246    let mut current_next = current_iter.next();
1247    let component_count = schema.components.len();
1248
1249    while let (Some(base), Some(curr)) = (baseline_next, current_next) {
1250        match base.id.cmp(&curr.id) {
1251            Ordering::Less => {
1252                baseline_next = baseline_iter.next();
1253            }
1254            Ordering::Greater => {
1255                current_next = current_iter.next();
1256            }
1257            Ordering::Equal => {
1258                if entity_has_updates(schema, base, curr, limits)? {
1259                    for component in &schema.components {
1260                        let base_component = find_component(base, component.id);
1261                        let curr_component = find_component(curr, component.id);
1262                        if base_component.is_some() != curr_component.is_some() {
1263                            return Err(CodecError::InvalidMask {
1264                                kind: MaskKind::ComponentMask,
1265                                reason: MaskReason::ComponentPresenceMismatch {
1266                                    component: component.id,
1267                                },
1268                            });
1269                        }
1270                        if let (Some(base_component), Some(curr_component)) =
1271                            (base_component, curr_component)
1272                        {
1273                            if base_component.fields.len() != component.fields.len()
1274                                || curr_component.fields.len() != component.fields.len()
1275                            {
1276                                return Err(CodecError::InvalidMask {
1277                                    kind: MaskKind::FieldMask {
1278                                        component: component.id,
1279                                    },
1280                                    reason: MaskReason::FieldCountMismatch {
1281                                        expected: component.fields.len(),
1282                                        actual: base_component
1283                                            .fields
1284                                            .len()
1285                                            .max(curr_component.fields.len()),
1286                                    },
1287                                });
1288                            }
1289                            if component.fields.len() > limits.max_fields_per_component {
1290                                return Err(CodecError::LimitsExceeded {
1291                                    kind: LimitKind::FieldsPerComponent,
1292                                    limit: limits.max_fields_per_component,
1293                                    actual: component.fields.len(),
1294                                });
1295                            }
1296                            let (_, field_mask) = scratch.component_and_field_masks_mut(
1297                                component_count,
1298                                component.fields.len(),
1299                            );
1300                            let field_mask = compute_field_mask_into(
1301                                component,
1302                                base_component,
1303                                curr_component,
1304                                field_mask,
1305                            )?;
1306                            let changed_fields = field_mask.iter().filter(|bit| **bit).count();
1307                            if changed_fields > 0 {
1308                                writer.align_to_byte()?;
1309                                writer.write_u32_aligned(curr.id.raw())?;
1310                                writer.write_u16_aligned(component.id.get())?;
1311                                writer.write_varu32(changed_fields as u32)?;
1312                                for (idx, field) in component.fields.iter().enumerate() {
1313                                    if field_mask[idx] {
1314                                        writer.align_to_byte()?;
1315                                        writer.write_varu32(idx as u32)?;
1316                                        write_field_value_sparse(
1317                                            component.id,
1318                                            *field,
1319                                            curr_component.fields[idx],
1320                                            writer,
1321                                        )?;
1322                                    }
1323                                }
1324                            }
1325                        }
1326                    }
1327                }
1328                baseline_next = baseline_iter.next();
1329                current_next = current_iter.next();
1330            }
1331        }
1332    }
1333
1334    writer.align_to_byte()?;
1335    Ok(())
1336}
1337
1338fn encode_update_body_sparse_packed(
1339    schema: &schema::Schema,
1340    baseline: &Snapshot,
1341    current: &Snapshot,
1342    update_count: usize,
1343    limits: &CodecLimits,
1344    scratch: &mut CodecScratch,
1345    writer: &mut BitWriter<'_>,
1346) -> CodecResult<()> {
1347    if update_count > limits.max_entities_update {
1348        return Err(CodecError::LimitsExceeded {
1349            kind: LimitKind::EntitiesUpdate,
1350            limit: limits.max_entities_update,
1351            actual: update_count,
1352        });
1353    }
1354
1355    let entry_count = count_sparse_update_entries(schema, baseline, current, limits, scratch)?;
1356    let entry_limit = limits
1357        .max_entities_update
1358        .saturating_mul(limits.max_components_per_entity);
1359    if entry_count > entry_limit {
1360        return Err(CodecError::LimitsExceeded {
1361            kind: LimitKind::EntitiesUpdate,
1362            limit: entry_limit,
1363            actual: entry_count,
1364        });
1365    }
1366
1367    writer.align_to_byte()?;
1368    writer.write_varu32(entry_count as u32)?;
1369
1370    let mut baseline_iter = baseline.entities.iter();
1371    let mut current_iter = current.entities.iter();
1372    let mut baseline_next = baseline_iter.next();
1373    let mut current_next = current_iter.next();
1374    let component_count = schema.components.len();
1375
1376    while let (Some(base), Some(curr)) = (baseline_next, current_next) {
1377        match base.id.cmp(&curr.id) {
1378            Ordering::Less => {
1379                baseline_next = baseline_iter.next();
1380            }
1381            Ordering::Greater => {
1382                current_next = current_iter.next();
1383            }
1384            Ordering::Equal => {
1385                if entity_has_updates(schema, base, curr, limits)? {
1386                    for component in &schema.components {
1387                        let base_component = find_component(base, component.id);
1388                        let curr_component = find_component(curr, component.id);
1389                        if base_component.is_some() != curr_component.is_some() {
1390                            return Err(CodecError::InvalidMask {
1391                                kind: MaskKind::ComponentMask,
1392                                reason: MaskReason::ComponentPresenceMismatch {
1393                                    component: component.id,
1394                                },
1395                            });
1396                        }
1397                        if let (Some(base_component), Some(curr_component)) =
1398                            (base_component, curr_component)
1399                        {
1400                            if base_component.fields.len() != component.fields.len()
1401                                || curr_component.fields.len() != component.fields.len()
1402                            {
1403                                return Err(CodecError::InvalidMask {
1404                                    kind: MaskKind::FieldMask {
1405                                        component: component.id,
1406                                    },
1407                                    reason: MaskReason::FieldCountMismatch {
1408                                        expected: component.fields.len(),
1409                                        actual: base_component
1410                                            .fields
1411                                            .len()
1412                                            .max(curr_component.fields.len()),
1413                                    },
1414                                });
1415                            }
1416                            if component.fields.len() > limits.max_fields_per_component {
1417                                return Err(CodecError::LimitsExceeded {
1418                                    kind: LimitKind::FieldsPerComponent,
1419                                    limit: limits.max_fields_per_component,
1420                                    actual: component.fields.len(),
1421                                });
1422                            }
1423                            let (_, field_mask) = scratch.component_and_field_masks_mut(
1424                                component_count,
1425                                component.fields.len(),
1426                            );
1427                            let field_mask = compute_field_mask_into(
1428                                component,
1429                                base_component,
1430                                curr_component,
1431                                field_mask,
1432                            )?;
1433                            let changed_fields = field_mask.iter().filter(|bit| **bit).count();
1434                            if changed_fields > 0 {
1435                                writer.align_to_byte()?;
1436                                writer.write_varu32(curr.id.raw())?;
1437                                writer.write_varu32(component.id.get() as u32)?;
1438                                writer.write_varu32(changed_fields as u32)?;
1439                                let index_bits =
1440                                    required_bits(component.fields.len().saturating_sub(1) as u64);
1441                                for (idx, field) in component.fields.iter().enumerate() {
1442                                    if field_mask[idx] {
1443                                        if index_bits > 0 {
1444                                            writer.write_bits(idx as u64, index_bits)?;
1445                                        }
1446                                        write_field_value_sparse(
1447                                            component.id,
1448                                            *field,
1449                                            curr_component.fields[idx],
1450                                            writer,
1451                                        )?;
1452                                    }
1453                                }
1454                            }
1455                        }
1456                    }
1457                }
1458                baseline_next = baseline_iter.next();
1459                current_next = current_iter.next();
1460            }
1461        }
1462    }
1463
1464    writer.align_to_byte()?;
1465    Ok(())
1466}
1467
1468fn count_sparse_update_entries(
1469    schema: &schema::Schema,
1470    baseline: &Snapshot,
1471    current: &Snapshot,
1472    limits: &CodecLimits,
1473    scratch: &mut CodecScratch,
1474) -> CodecResult<usize> {
1475    let component_count = schema.components.len();
1476    let mut count = 0usize;
1477    let mut baseline_iter = baseline.entities.iter();
1478    let mut current_iter = current.entities.iter();
1479    let mut baseline_next = baseline_iter.next();
1480    let mut current_next = current_iter.next();
1481
1482    while let (Some(base), Some(curr)) = (baseline_next, current_next) {
1483        match base.id.cmp(&curr.id) {
1484            Ordering::Less => baseline_next = baseline_iter.next(),
1485            Ordering::Greater => current_next = current_iter.next(),
1486            Ordering::Equal => {
1487                if entity_has_updates(schema, base, curr, limits)? {
1488                    for component in &schema.components {
1489                        let base_component = find_component(base, component.id);
1490                        let curr_component = find_component(curr, component.id);
1491                        if base_component.is_some() != curr_component.is_some() {
1492                            return Err(CodecError::InvalidMask {
1493                                kind: MaskKind::ComponentMask,
1494                                reason: MaskReason::ComponentPresenceMismatch {
1495                                    component: component.id,
1496                                },
1497                            });
1498                        }
1499                        if let (Some(base_component), Some(curr_component)) =
1500                            (base_component, curr_component)
1501                        {
1502                            if base_component.fields.len() != component.fields.len()
1503                                || curr_component.fields.len() != component.fields.len()
1504                            {
1505                                return Err(CodecError::InvalidMask {
1506                                    kind: MaskKind::FieldMask {
1507                                        component: component.id,
1508                                    },
1509                                    reason: MaskReason::FieldCountMismatch {
1510                                        expected: component.fields.len(),
1511                                        actual: base_component
1512                                            .fields
1513                                            .len()
1514                                            .max(curr_component.fields.len()),
1515                                    },
1516                                });
1517                            }
1518                            if component.fields.len() > limits.max_fields_per_component {
1519                                return Err(CodecError::LimitsExceeded {
1520                                    kind: LimitKind::FieldsPerComponent,
1521                                    limit: limits.max_fields_per_component,
1522                                    actual: component.fields.len(),
1523                                });
1524                            }
1525                            let (_, field_mask) = scratch.component_and_field_masks_mut(
1526                                component_count,
1527                                component.fields.len(),
1528                            );
1529                            let field_mask = compute_field_mask_into(
1530                                component,
1531                                base_component,
1532                                curr_component,
1533                                field_mask,
1534                            )?;
1535                            if field_mask.iter().any(|bit| *bit) {
1536                                count += 1;
1537                            }
1538                        }
1539                    }
1540                }
1541                baseline_next = baseline_iter.next();
1542                current_next = current_iter.next();
1543            }
1544        }
1545    }
1546
1547    Ok(count)
1548}
1549
1550fn write_create_entity(
1551    schema: &schema::Schema,
1552    entity: &EntitySnapshot,
1553    limits: &CodecLimits,
1554    writer: &mut BitWriter<'_>,
1555) -> CodecResult<()> {
1556    writer.align_to_byte()?;
1557    writer.write_u32_aligned(entity.id.raw())?;
1558    ensure_known_components(schema, entity)?;
1559    write_component_mask(schema, entity, writer)?;
1560    for component in schema.components.iter() {
1561        if let Some(snapshot) = find_component(entity, component.id) {
1562            write_full_component(component, snapshot, limits, writer)?;
1563        }
1564    }
1565    Ok(())
1566}
1567
1568fn decode_destroy_section(body: &[u8], limits: &CodecLimits) -> CodecResult<Vec<EntityId>> {
1569    if body.len() > limits.max_section_bytes {
1570        return Err(CodecError::LimitsExceeded {
1571            kind: LimitKind::SectionBytes,
1572            limit: limits.max_section_bytes,
1573            actual: body.len(),
1574        });
1575    }
1576
1577    let mut reader = BitReader::new(body);
1578    reader.align_to_byte()?;
1579    let count = reader.read_varu32()? as usize;
1580    if count > limits.max_entities_destroy {
1581        return Err(CodecError::LimitsExceeded {
1582            kind: LimitKind::EntitiesDestroy,
1583            limit: limits.max_entities_destroy,
1584            actual: count,
1585        });
1586    }
1587
1588    let mut ids = Vec::with_capacity(count);
1589    let mut prev: Option<u32> = None;
1590    for _ in 0..count {
1591        reader.align_to_byte()?;
1592        let id = reader.read_u32_aligned()?;
1593        if let Some(prev_id) = prev {
1594            if id <= prev_id {
1595                return Err(CodecError::InvalidEntityOrder {
1596                    previous: prev_id,
1597                    current: id,
1598                });
1599            }
1600        }
1601        prev = Some(id);
1602        ids.push(EntityId::new(id));
1603    }
1604    reader.align_to_byte()?;
1605    if reader.bits_remaining() != 0 {
1606        return Err(CodecError::TrailingSectionData {
1607            section: SectionTag::EntityDestroy,
1608            remaining_bits: reader.bits_remaining(),
1609        });
1610    }
1611    Ok(ids)
1612}
1613
1614fn decode_create_section(
1615    schema: &schema::Schema,
1616    body: &[u8],
1617    limits: &CodecLimits,
1618) -> CodecResult<Vec<EntitySnapshot>> {
1619    if body.len() > limits.max_section_bytes {
1620        return Err(CodecError::LimitsExceeded {
1621            kind: LimitKind::SectionBytes,
1622            limit: limits.max_section_bytes,
1623            actual: body.len(),
1624        });
1625    }
1626
1627    let mut reader = BitReader::new(body);
1628    reader.align_to_byte()?;
1629    let count = reader.read_varu32()? as usize;
1630    if count > limits.max_entities_create {
1631        return Err(CodecError::LimitsExceeded {
1632            kind: LimitKind::EntitiesCreate,
1633            limit: limits.max_entities_create,
1634            actual: count,
1635        });
1636    }
1637
1638    let mut entities = Vec::with_capacity(count);
1639    let mut prev: Option<u32> = None;
1640    for _ in 0..count {
1641        reader.align_to_byte()?;
1642        let id = reader.read_u32_aligned()?;
1643        if let Some(prev_id) = prev {
1644            if id <= prev_id {
1645                return Err(CodecError::InvalidEntityOrder {
1646                    previous: prev_id,
1647                    current: id,
1648                });
1649            }
1650        }
1651        prev = Some(id);
1652
1653        let component_mask = read_mask(
1654            &mut reader,
1655            schema.components.len(),
1656            MaskKind::ComponentMask,
1657        )?;
1658
1659        let mut components = Vec::new();
1660        for (idx, component) in schema.components.iter().enumerate() {
1661            if component_mask[idx] {
1662                let fields = decode_full_component(component, &mut reader, limits)?;
1663                components.push(ComponentSnapshot {
1664                    id: component.id,
1665                    fields,
1666                });
1667            }
1668        }
1669
1670        let entity = EntitySnapshot {
1671            id: EntityId::new(id),
1672            components,
1673        };
1674        ensure_known_components(schema, &entity)?;
1675        entities.push(entity);
1676    }
1677
1678    reader.align_to_byte()?;
1679    if reader.bits_remaining() != 0 {
1680        return Err(CodecError::TrailingSectionData {
1681            section: SectionTag::EntityCreate,
1682            remaining_bits: reader.bits_remaining(),
1683        });
1684    }
1685    Ok(entities)
1686}
1687
1688fn decode_update_section_masked(
1689    schema: &schema::Schema,
1690    body: &[u8],
1691    limits: &CodecLimits,
1692) -> CodecResult<Vec<DeltaUpdateEntity>> {
1693    if body.len() > limits.max_section_bytes {
1694        return Err(CodecError::LimitsExceeded {
1695            kind: LimitKind::SectionBytes,
1696            limit: limits.max_section_bytes,
1697            actual: body.len(),
1698        });
1699    }
1700
1701    let mut reader = BitReader::new(body);
1702    reader.align_to_byte()?;
1703    let count = reader.read_varu32()? as usize;
1704    if count > limits.max_entities_update {
1705        return Err(CodecError::LimitsExceeded {
1706            kind: LimitKind::EntitiesUpdate,
1707            limit: limits.max_entities_update,
1708            actual: count,
1709        });
1710    }
1711
1712    let mut updates = Vec::with_capacity(count);
1713    let mut prev: Option<u32> = None;
1714    for _ in 0..count {
1715        reader.align_to_byte()?;
1716        let id = reader.read_u32_aligned()?;
1717        if let Some(prev_id) = prev {
1718            if id <= prev_id {
1719                return Err(CodecError::InvalidEntityOrder {
1720                    previous: prev_id,
1721                    current: id,
1722                });
1723            }
1724        }
1725        prev = Some(id);
1726
1727        let component_mask = read_mask(
1728            &mut reader,
1729            schema.components.len(),
1730            MaskKind::ComponentMask,
1731        )?;
1732        let mut components = Vec::new();
1733        for (idx, component) in schema.components.iter().enumerate() {
1734            if component_mask[idx] {
1735                let fields = decode_update_component(component, &mut reader, limits)?;
1736                components.push(DeltaUpdateComponent {
1737                    id: component.id,
1738                    fields,
1739                });
1740            }
1741        }
1742
1743        updates.push(DeltaUpdateEntity {
1744            id: EntityId::new(id),
1745            components,
1746        });
1747    }
1748
1749    reader.align_to_byte()?;
1750    if reader.bits_remaining() != 0 {
1751        return Err(CodecError::TrailingSectionData {
1752            section: SectionTag::EntityUpdate,
1753            remaining_bits: reader.bits_remaining(),
1754        });
1755    }
1756    Ok(updates)
1757}
1758
1759fn decode_update_section_sparse_varint(
1760    schema: &schema::Schema,
1761    body: &[u8],
1762    limits: &CodecLimits,
1763) -> CodecResult<Vec<DeltaUpdateEntity>> {
1764    if body.len() > limits.max_section_bytes {
1765        return Err(CodecError::LimitsExceeded {
1766            kind: LimitKind::SectionBytes,
1767            limit: limits.max_section_bytes,
1768            actual: body.len(),
1769        });
1770    }
1771
1772    let mut reader = BitReader::new(body);
1773    reader.align_to_byte()?;
1774    let entry_count = reader.read_varu32()? as usize;
1775    let entry_limit = limits
1776        .max_entities_update
1777        .saturating_mul(limits.max_components_per_entity);
1778    if entry_count > entry_limit {
1779        return Err(CodecError::LimitsExceeded {
1780            kind: LimitKind::EntitiesUpdate,
1781            limit: entry_limit,
1782            actual: entry_count,
1783        });
1784    }
1785
1786    let mut updates: Vec<DeltaUpdateEntity> = Vec::new();
1787    let mut prev_entity: Option<u32> = None;
1788    let mut prev_component: Option<u16> = None;
1789    for _ in 0..entry_count {
1790        reader.align_to_byte()?;
1791        let entity_id = reader.read_u32_aligned()?;
1792        let component_raw = reader.read_u16_aligned()?;
1793        let component_id = ComponentId::new(component_raw).ok_or(CodecError::InvalidMask {
1794            kind: MaskKind::ComponentMask,
1795            reason: MaskReason::InvalidComponentId { raw: component_raw },
1796        })?;
1797        if let Some(prev) = prev_entity {
1798            if entity_id < prev {
1799                return Err(CodecError::InvalidEntityOrder {
1800                    previous: prev,
1801                    current: entity_id,
1802                });
1803            }
1804            if entity_id == prev {
1805                if let Some(prev_component) = prev_component {
1806                    if component_raw <= prev_component {
1807                        return Err(CodecError::InvalidEntityOrder {
1808                            previous: prev,
1809                            current: entity_id,
1810                        });
1811                    }
1812                }
1813            }
1814        }
1815        prev_entity = Some(entity_id);
1816        prev_component = Some(component_raw);
1817
1818        let component = schema
1819            .components
1820            .iter()
1821            .find(|component| component.id == component_id)
1822            .ok_or(CodecError::InvalidMask {
1823                kind: MaskKind::ComponentMask,
1824                reason: MaskReason::UnknownComponent {
1825                    component: component_id,
1826                },
1827            })?;
1828
1829        let field_count = reader.read_varu32()? as usize;
1830        if field_count == 0 {
1831            return Err(CodecError::InvalidMask {
1832                kind: MaskKind::FieldMask {
1833                    component: component.id,
1834                },
1835                reason: MaskReason::EmptyFieldMask {
1836                    component: component.id,
1837                },
1838            });
1839        }
1840        if field_count > limits.max_fields_per_component {
1841            return Err(CodecError::LimitsExceeded {
1842                kind: LimitKind::FieldsPerComponent,
1843                limit: limits.max_fields_per_component,
1844                actual: field_count,
1845            });
1846        }
1847        if field_count > component.fields.len() {
1848            return Err(CodecError::InvalidMask {
1849                kind: MaskKind::FieldMask {
1850                    component: component.id,
1851                },
1852                reason: MaskReason::FieldCountMismatch {
1853                    expected: component.fields.len(),
1854                    actual: field_count,
1855                },
1856            });
1857        }
1858
1859        let mut fields = Vec::with_capacity(field_count);
1860        let mut prev_index: Option<usize> = None;
1861        for _ in 0..field_count {
1862            reader.align_to_byte()?;
1863            let field_index = reader.read_varu32()? as usize;
1864            if field_index >= component.fields.len() {
1865                return Err(CodecError::InvalidMask {
1866                    kind: MaskKind::FieldMask {
1867                        component: component.id,
1868                    },
1869                    reason: MaskReason::InvalidFieldIndex {
1870                        field_index,
1871                        max: component.fields.len(),
1872                    },
1873                });
1874            }
1875            if let Some(prev_index) = prev_index {
1876                if field_index <= prev_index {
1877                    return Err(CodecError::InvalidMask {
1878                        kind: MaskKind::FieldMask {
1879                            component: component.id,
1880                        },
1881                        reason: MaskReason::InvalidFieldIndex {
1882                            field_index,
1883                            max: component.fields.len(),
1884                        },
1885                    });
1886                }
1887            }
1888            prev_index = Some(field_index);
1889            let field = component.fields[field_index];
1890            let value = read_field_value_sparse(component.id, field, &mut reader)?;
1891            fields.push((field_index, value));
1892        }
1893
1894        if let Some(last) = updates.last_mut() {
1895            if last.id.raw() == entity_id {
1896                last.components.push(DeltaUpdateComponent {
1897                    id: component.id,
1898                    fields,
1899                });
1900                continue;
1901            }
1902        }
1903
1904        updates.push(DeltaUpdateEntity {
1905            id: EntityId::new(entity_id),
1906            components: vec![DeltaUpdateComponent {
1907                id: component.id,
1908                fields,
1909            }],
1910        });
1911    }
1912
1913    reader.align_to_byte()?;
1914    if reader.bits_remaining() != 0 {
1915        return Err(CodecError::TrailingSectionData {
1916            section: SectionTag::EntityUpdateSparse,
1917            remaining_bits: reader.bits_remaining(),
1918        });
1919    }
1920
1921    let unique_entities = updates.len();
1922    if unique_entities > limits.max_entities_update {
1923        return Err(CodecError::LimitsExceeded {
1924            kind: LimitKind::EntitiesUpdate,
1925            limit: limits.max_entities_update,
1926            actual: unique_entities,
1927        });
1928    }
1929
1930    Ok(updates)
1931}
1932
1933fn decode_update_section_sparse_packed(
1934    schema: &schema::Schema,
1935    body: &[u8],
1936    limits: &CodecLimits,
1937) -> CodecResult<Vec<DeltaUpdateEntity>> {
1938    if body.len() > limits.max_section_bytes {
1939        return Err(CodecError::LimitsExceeded {
1940            kind: LimitKind::SectionBytes,
1941            limit: limits.max_section_bytes,
1942            actual: body.len(),
1943        });
1944    }
1945
1946    let mut reader = BitReader::new(body);
1947    reader.align_to_byte()?;
1948    let entry_count = reader.read_varu32()? as usize;
1949    let entry_limit = limits
1950        .max_entities_update
1951        .saturating_mul(limits.max_components_per_entity);
1952    if entry_count > entry_limit {
1953        return Err(CodecError::LimitsExceeded {
1954            kind: LimitKind::EntitiesUpdate,
1955            limit: entry_limit,
1956            actual: entry_count,
1957        });
1958    }
1959
1960    let mut updates: Vec<DeltaUpdateEntity> = Vec::new();
1961    let mut prev_entity: Option<u32> = None;
1962    let mut prev_component: Option<u16> = None;
1963    for _ in 0..entry_count {
1964        reader.align_to_byte()?;
1965        let entity_id = reader.read_varu32()?;
1966        let component_raw = reader.read_varu32()?;
1967        if component_raw > u16::MAX as u32 {
1968            return Err(CodecError::InvalidMask {
1969                kind: MaskKind::ComponentMask,
1970                reason: MaskReason::InvalidComponentId { raw: u16::MAX },
1971            });
1972        }
1973        let component_raw = component_raw as u16;
1974        let component_id = ComponentId::new(component_raw).ok_or(CodecError::InvalidMask {
1975            kind: MaskKind::ComponentMask,
1976            reason: MaskReason::InvalidComponentId { raw: component_raw },
1977        })?;
1978        if let Some(prev) = prev_entity {
1979            if entity_id < prev {
1980                return Err(CodecError::InvalidEntityOrder {
1981                    previous: prev,
1982                    current: entity_id,
1983                });
1984            }
1985            if entity_id == prev {
1986                if let Some(prev_component) = prev_component {
1987                    if component_raw <= prev_component {
1988                        return Err(CodecError::InvalidEntityOrder {
1989                            previous: prev,
1990                            current: entity_id,
1991                        });
1992                    }
1993                }
1994            }
1995        }
1996        prev_entity = Some(entity_id);
1997        prev_component = Some(component_raw);
1998
1999        let component = schema
2000            .components
2001            .iter()
2002            .find(|component| component.id == component_id)
2003            .ok_or(CodecError::InvalidMask {
2004                kind: MaskKind::ComponentMask,
2005                reason: MaskReason::UnknownComponent {
2006                    component: component_id,
2007                },
2008            })?;
2009
2010        let field_count = reader.read_varu32()? as usize;
2011        if field_count == 0 {
2012            return Err(CodecError::InvalidMask {
2013                kind: MaskKind::FieldMask {
2014                    component: component.id,
2015                },
2016                reason: MaskReason::EmptyFieldMask {
2017                    component: component.id,
2018                },
2019            });
2020        }
2021        if field_count > limits.max_fields_per_component {
2022            return Err(CodecError::LimitsExceeded {
2023                kind: LimitKind::FieldsPerComponent,
2024                limit: limits.max_fields_per_component,
2025                actual: field_count,
2026            });
2027        }
2028        if field_count > component.fields.len() {
2029            return Err(CodecError::InvalidMask {
2030                kind: MaskKind::FieldMask {
2031                    component: component.id,
2032                },
2033                reason: MaskReason::FieldCountMismatch {
2034                    expected: component.fields.len(),
2035                    actual: field_count,
2036                },
2037            });
2038        }
2039
2040        let index_bits = required_bits(component.fields.len().saturating_sub(1) as u64);
2041        let mut fields = Vec::with_capacity(field_count);
2042        let mut prev_index: Option<usize> = None;
2043        for _ in 0..field_count {
2044            let field_index = if index_bits == 0 {
2045                0
2046            } else {
2047                reader.read_bits(index_bits)? as usize
2048            };
2049            if field_index >= component.fields.len() {
2050                return Err(CodecError::InvalidMask {
2051                    kind: MaskKind::FieldMask {
2052                        component: component.id,
2053                    },
2054                    reason: MaskReason::InvalidFieldIndex {
2055                        field_index,
2056                        max: component.fields.len(),
2057                    },
2058                });
2059            }
2060            if let Some(prev_index) = prev_index {
2061                if field_index <= prev_index {
2062                    return Err(CodecError::InvalidMask {
2063                        kind: MaskKind::FieldMask {
2064                            component: component.id,
2065                        },
2066                        reason: MaskReason::InvalidFieldIndex {
2067                            field_index,
2068                            max: component.fields.len(),
2069                        },
2070                    });
2071                }
2072            }
2073            prev_index = Some(field_index);
2074            let field = component.fields[field_index];
2075            let value = read_field_value_sparse(component.id, field, &mut reader)?;
2076            fields.push((field_index, value));
2077        }
2078
2079        if let Some(last) = updates.last_mut() {
2080            if last.id.raw() == entity_id {
2081                last.components.push(DeltaUpdateComponent {
2082                    id: component.id,
2083                    fields,
2084                });
2085                continue;
2086            }
2087        }
2088
2089        updates.push(DeltaUpdateEntity {
2090            id: EntityId::new(entity_id),
2091            components: vec![DeltaUpdateComponent {
2092                id: component.id,
2093                fields,
2094            }],
2095        });
2096    }
2097
2098    reader.align_to_byte()?;
2099    if reader.bits_remaining() != 0 {
2100        return Err(CodecError::TrailingSectionData {
2101            section: SectionTag::EntityUpdateSparsePacked,
2102            remaining_bits: reader.bits_remaining(),
2103        });
2104    }
2105
2106    let unique_entities = updates.len();
2107    if unique_entities > limits.max_entities_update {
2108        return Err(CodecError::LimitsExceeded {
2109            kind: LimitKind::EntitiesUpdate,
2110            limit: limits.max_entities_update,
2111            actual: unique_entities,
2112        });
2113    }
2114
2115    Ok(updates)
2116}
2117
2118fn decode_delta_sections(
2119    schema: &schema::Schema,
2120    packet: &WirePacket<'_>,
2121    limits: &CodecLimits,
2122) -> CodecResult<(Vec<EntityId>, Vec<EntitySnapshot>, Vec<DeltaUpdateEntity>)> {
2123    let mut destroys: Option<Vec<EntityId>> = None;
2124    let mut creates: Option<Vec<EntitySnapshot>> = None;
2125    let mut updates_masked: Option<Vec<DeltaUpdateEntity>> = None;
2126    let mut updates_sparse: Option<Vec<DeltaUpdateEntity>> = None;
2127
2128    for section in &packet.sections {
2129        match section.tag {
2130            SectionTag::EntityDestroy => {
2131                if destroys.is_some() {
2132                    return Err(CodecError::DuplicateSection {
2133                        section: section.tag,
2134                    });
2135                }
2136                destroys = Some(decode_destroy_section(section.body, limits)?);
2137            }
2138            SectionTag::EntityCreate => {
2139                if creates.is_some() {
2140                    return Err(CodecError::DuplicateSection {
2141                        section: section.tag,
2142                    });
2143                }
2144                creates = Some(decode_create_section(schema, section.body, limits)?);
2145            }
2146            SectionTag::EntityUpdate => {
2147                if updates_masked.is_some() {
2148                    return Err(CodecError::DuplicateSection {
2149                        section: section.tag,
2150                    });
2151                }
2152                updates_masked = Some(decode_update_section_masked(schema, section.body, limits)?);
2153            }
2154            SectionTag::EntityUpdateSparse => {
2155                if updates_sparse.is_some() {
2156                    return Err(CodecError::DuplicateSection {
2157                        section: section.tag,
2158                    });
2159                }
2160                updates_sparse = Some(decode_update_section_sparse_varint(
2161                    schema,
2162                    section.body,
2163                    limits,
2164                )?);
2165            }
2166            SectionTag::EntityUpdateSparsePacked => {
2167                if updates_sparse.is_some() {
2168                    return Err(CodecError::DuplicateSection {
2169                        section: section.tag,
2170                    });
2171                }
2172                updates_sparse = Some(decode_update_section_sparse_packed(
2173                    schema,
2174                    section.body,
2175                    limits,
2176                )?);
2177            }
2178            _ => {
2179                return Err(CodecError::UnexpectedSection {
2180                    section: section.tag,
2181                });
2182            }
2183        }
2184    }
2185
2186    Ok((
2187        destroys.unwrap_or_default(),
2188        creates.unwrap_or_default(),
2189        match (updates_masked, updates_sparse) {
2190            (Some(_), Some(_)) => return Err(CodecError::DuplicateUpdateEncoding),
2191            (Some(updates), None) => updates,
2192            (None, Some(updates)) => updates,
2193            (None, None) => Vec::new(),
2194        },
2195    ))
2196}
2197
/// Decoded form of a delta snapshot: two ticks plus the destroy, create and
/// update lists.
// NOTE(review): the code that builds this struct is outside this part of the
// file; the field descriptions below are inferred from names and types.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct DeltaDecoded {
    /// Tick carried by the delta itself.
    pub tick: SnapshotTick,
    /// Baseline tick carried by the delta (presumably the tick it was encoded against).
    pub baseline_tick: SnapshotTick,
    /// Ids of entities listed for destruction.
    pub destroys: Vec<EntityId>,
    /// Full snapshots of entities listed for creation.
    pub creates: Vec<EntitySnapshot>,
    /// Per-entity field updates.
    pub updates: Vec<DeltaUpdateEntity>,
}
2206
/// Field updates for a single entity, grouped by component.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct DeltaUpdateEntity {
    /// Id of the entity being updated.
    pub id: EntityId,
    /// One entry per updated component of this entity.
    pub components: Vec<DeltaUpdateComponent>,
}
2212
/// Field updates for a single component of an entity.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct DeltaUpdateComponent {
    /// Id of the component being updated.
    pub id: ComponentId,
    /// `(field index, new value)` pairs; the index addresses the component's
    /// field list.
    pub fields: Vec<(usize, FieldValue)>,
}
2218
2219fn apply_destroys(
2220    baseline: &[EntitySnapshot],
2221    destroys: &[EntityId],
2222) -> CodecResult<Vec<EntitySnapshot>> {
2223    let mut result = Vec::with_capacity(baseline.len());
2224    let mut i = 0usize;
2225    let mut j = 0usize;
2226    while i < baseline.len() || j < destroys.len() {
2227        let base = baseline.get(i);
2228        let destroy = destroys.get(j);
2229        match (base, destroy) {
2230            (Some(b), Some(d)) => {
2231                if b.id.raw() < d.raw() {
2232                    result.push(b.clone());
2233                    i += 1;
2234                } else if b.id.raw() > d.raw() {
2235                    return Err(CodecError::EntityNotFound { entity_id: d.raw() });
2236                } else {
2237                    i += 1;
2238                    j += 1;
2239                }
2240            }
2241            (Some(b), None) => {
2242                result.push(b.clone());
2243                i += 1;
2244            }
2245            (None, Some(d)) => {
2246                return Err(CodecError::EntityNotFound { entity_id: d.raw() });
2247            }
2248            (None, None) => break,
2249        }
2250    }
2251    Ok(result)
2252}
2253
2254fn apply_creates(
2255    baseline: Vec<EntitySnapshot>,
2256    creates: Vec<EntitySnapshot>,
2257) -> CodecResult<Vec<EntitySnapshot>> {
2258    let mut result = Vec::with_capacity(baseline.len() + creates.len());
2259    let mut i = 0usize;
2260    let mut j = 0usize;
2261    while i < baseline.len() || j < creates.len() {
2262        let base = baseline.get(i);
2263        let create = creates.get(j);
2264        match (base, create) {
2265            (Some(b), Some(c)) => {
2266                if b.id.raw() < c.id.raw() {
2267                    result.push(b.clone());
2268                    i += 1;
2269                } else if b.id.raw() > c.id.raw() {
2270                    result.push(c.clone());
2271                    j += 1;
2272                } else {
2273                    return Err(CodecError::EntityAlreadyExists {
2274                        entity_id: c.id.raw(),
2275                    });
2276                }
2277            }
2278            (Some(b), None) => {
2279                result.push(b.clone());
2280                i += 1;
2281            }
2282            (None, Some(c)) => {
2283                result.push(c.clone());
2284                j += 1;
2285            }
2286            (None, None) => break,
2287        }
2288    }
2289    Ok(result)
2290}
2291
2292fn apply_updates(
2293    entities: &mut [EntitySnapshot],
2294    updates: &[DeltaUpdateEntity],
2295) -> CodecResult<()> {
2296    for update in updates {
2297        let idx = entities
2298            .binary_search_by_key(&update.id.raw(), |e| e.id.raw())
2299            .map_err(|_| CodecError::EntityNotFound {
2300                entity_id: update.id.raw(),
2301            })?;
2302        let entity = &mut entities[idx];
2303        for component_update in &update.components {
2304            let component = entity
2305                .components
2306                .iter_mut()
2307                .find(|c| c.id == component_update.id)
2308                .ok_or_else(|| CodecError::ComponentNotFound {
2309                    entity_id: update.id.raw(),
2310                    component_id: component_update.id.get(),
2311                })?;
2312            for (field_idx, value) in &component_update.fields {
2313                if *field_idx >= component.fields.len() {
2314                    return Err(CodecError::InvalidMask {
2315                        kind: MaskKind::FieldMask {
2316                            component: component_update.id,
2317                        },
2318                        reason: MaskReason::FieldCountMismatch {
2319                            expected: component.fields.len(),
2320                            actual: *field_idx + 1,
2321                        },
2322                    });
2323                }
2324                component.fields[*field_idx] = *value;
2325            }
2326        }
2327    }
2328    Ok(())
2329}
2330
2331fn ensure_entities_sorted(entities: &[EntitySnapshot]) -> CodecResult<()> {
2332    let mut prev: Option<u32> = None;
2333    for entity in entities {
2334        if let Some(prev_id) = prev {
2335            if entity.id.raw() <= prev_id {
2336                return Err(CodecError::InvalidEntityOrder {
2337                    previous: prev_id,
2338                    current: entity.id.raw(),
2339                });
2340            }
2341        }
2342        prev = Some(entity.id.raw());
2343    }
2344    Ok(())
2345}
2346
2347fn write_component_mask(
2348    schema: &schema::Schema,
2349    entity: &EntitySnapshot,
2350    writer: &mut BitWriter<'_>,
2351) -> CodecResult<()> {
2352    for component in &schema.components {
2353        let present = find_component(entity, component.id).is_some();
2354        writer.write_bit(present)?;
2355    }
2356    Ok(())
2357}
2358
2359fn write_full_component(
2360    component: &ComponentDef,
2361    snapshot: &ComponentSnapshot,
2362    limits: &CodecLimits,
2363    writer: &mut BitWriter<'_>,
2364) -> CodecResult<()> {
2365    if snapshot.fields.len() != component.fields.len() {
2366        return Err(CodecError::InvalidMask {
2367            kind: MaskKind::FieldMask {
2368                component: component.id,
2369            },
2370            reason: MaskReason::FieldCountMismatch {
2371                expected: component.fields.len(),
2372                actual: snapshot.fields.len(),
2373            },
2374        });
2375    }
2376    if snapshot.fields.len() > limits.max_fields_per_component {
2377        return Err(CodecError::LimitsExceeded {
2378            kind: LimitKind::FieldsPerComponent,
2379            limit: limits.max_fields_per_component,
2380            actual: snapshot.fields.len(),
2381        });
2382    }
2383
2384    for _ in &component.fields {
2385        writer.write_bit(true)?;
2386    }
2387    for (field, value) in component.fields.iter().zip(snapshot.fields.iter()) {
2388        write_field_value(component.id, *field, *value, writer)?;
2389    }
2390    Ok(())
2391}
2392
2393fn decode_full_component(
2394    component: &ComponentDef,
2395    reader: &mut BitReader<'_>,
2396    limits: &CodecLimits,
2397) -> CodecResult<Vec<FieldValue>> {
2398    if component.fields.len() > limits.max_fields_per_component {
2399        return Err(CodecError::LimitsExceeded {
2400            kind: LimitKind::FieldsPerComponent,
2401            limit: limits.max_fields_per_component,
2402            actual: component.fields.len(),
2403        });
2404    }
2405
2406    let mask = read_mask(
2407        reader,
2408        component.fields.len(),
2409        MaskKind::FieldMask {
2410            component: component.id,
2411        },
2412    )?;
2413    let mut values = Vec::with_capacity(component.fields.len());
2414    for (idx, field) in component.fields.iter().enumerate() {
2415        if !mask[idx] {
2416            return Err(CodecError::InvalidMask {
2417                kind: MaskKind::FieldMask {
2418                    component: component.id,
2419                },
2420                reason: MaskReason::MissingField { field: field.id },
2421            });
2422        }
2423        values.push(read_field_value(component.id, *field, reader)?);
2424    }
2425    Ok(values)
2426}
2427
/// Writes the masked update payload for one entity that is present in both
/// `baseline` and `current`.
///
/// Output, in order:
/// 1. one bit per schema component, set when `compute_field_mask_into`
///    reports at least one changed field for it (always clear for components
///    absent from both snapshots);
/// 2. for each component whose bit was set, in schema order: its per-field
///    change mask, then the current value of every changed field.
///
/// # Errors
/// - `InvalidMask` / `ComponentPresenceMismatch` when a component exists in
///   only one of the two snapshots.
/// - `InvalidMask` / `FieldCountMismatch` when either snapshot's field count
///   differs from the schema definition.
/// - `LimitsExceeded` when a component has more fields than
///   `limits.max_fields_per_component`.
/// - Any error from `compute_field_mask_into`, `write_field_value`, or the
///   bit writer.
fn write_update_components(
    schema: &schema::Schema,
    baseline: &EntitySnapshot,
    current: &EntitySnapshot,
    limits: &CodecLimits,
    scratch: &mut CodecScratch,
    writer: &mut BitWriter<'_>,
) -> CodecResult<()> {
    let component_count = schema.components.len();
    // Clear the per-component "changed" flags before the first pass.
    let (component_changed, _) = scratch.component_and_field_masks_mut(component_count, 0);
    component_changed.fill(false);
    // Pass 1: validate each component, write its "changed" bit, and remember
    // the result for the second pass.
    for (idx, component) in schema.components.iter().enumerate() {
        let base = find_component(baseline, component.id);
        let curr = find_component(current, component.id);
        // An update cannot add or remove a component.
        if base.is_some() != curr.is_some() {
            return Err(CodecError::InvalidMask {
                kind: MaskKind::ComponentMask,
                reason: MaskReason::ComponentPresenceMismatch {
                    component: component.id,
                },
            });
        }
        if let (Some(base), Some(curr)) = (base, curr) {
            if base.fields.len() != component.fields.len()
                || curr.fields.len() != component.fields.len()
            {
                return Err(CodecError::InvalidMask {
                    kind: MaskKind::FieldMask {
                        component: component.id,
                    },
                    reason: MaskReason::FieldCountMismatch {
                        expected: component.fields.len(),
                        actual: base.fields.len().max(curr.fields.len()),
                    },
                });
            }
            if component.fields.len() > limits.max_fields_per_component {
                return Err(CodecError::LimitsExceeded {
                    kind: LimitKind::FieldsPerComponent,
                    limit: limits.max_fields_per_component,
                    actual: component.fields.len(),
                });
            }
            // Re-borrow both scratch masks, sized for this component.
            let (component_changed, field_mask) =
                scratch.component_and_field_masks_mut(component_count, component.fields.len());
            let any_changed = compute_field_mask_into(component, base, curr, field_mask)?
                .iter()
                .any(|b| *b);
            writer.write_bit(any_changed)?;
            if any_changed {
                component_changed[idx] = true;
            }
        } else {
            // Component absent from both snapshots: nothing to update.
            writer.write_bit(false)?;
        }
    }

    // Pass 2: emit the field mask and changed values of every flagged component.
    for (idx, component) in schema.components.iter().enumerate() {
        let (base, curr) = match (
            find_component(baseline, component.id),
            find_component(current, component.id),
        ) {
            (Some(base), Some(curr)) => (base, curr),
            _ => continue,
        };
        // Same limit check as in the first pass.
        if component.fields.len() > limits.max_fields_per_component {
            return Err(CodecError::LimitsExceeded {
                kind: LimitKind::FieldsPerComponent,
                limit: limits.max_fields_per_component,
                actual: component.fields.len(),
            });
        }
        let (component_changed, field_mask) =
            scratch.component_and_field_masks_mut(component_count, component.fields.len());
        if component_changed[idx] {
            // NOTE(review): the field mask is recomputed here instead of being
            // kept from pass 1 — presumably because the scratch field-mask
            // buffer is shared between components; confirm against CodecScratch.
            let field_mask = compute_field_mask_into(component, base, curr, field_mask)?;
            for bit in field_mask {
                writer.write_bit(*bit)?;
            }
            // Only the current value of each changed field is written.
            for (((field, _base_val), curr_val), changed) in component
                .fields
                .iter()
                .zip(base.fields.iter())
                .zip(curr.fields.iter())
                .zip(field_mask.iter())
            {
                if *changed {
                    write_field_value(component.id, *field, *curr_val, writer)?;
                }
            }
        }
    }
    Ok(())
}
2522
2523fn decode_update_component(
2524    component: &ComponentDef,
2525    reader: &mut BitReader<'_>,
2526    limits: &CodecLimits,
2527) -> CodecResult<Vec<(usize, FieldValue)>> {
2528    if component.fields.len() > limits.max_fields_per_component {
2529        return Err(CodecError::LimitsExceeded {
2530            kind: LimitKind::FieldsPerComponent,
2531            limit: limits.max_fields_per_component,
2532            actual: component.fields.len(),
2533        });
2534    }
2535    let mask = read_mask(
2536        reader,
2537        component.fields.len(),
2538        MaskKind::FieldMask {
2539            component: component.id,
2540        },
2541    )?;
2542    if !mask.iter().any(|b| *b) {
2543        return Err(CodecError::InvalidMask {
2544            kind: MaskKind::FieldMask {
2545                component: component.id,
2546            },
2547            reason: MaskReason::EmptyFieldMask {
2548                component: component.id,
2549            },
2550        });
2551    }
2552    let mut fields = Vec::new();
2553    for (idx, field) in component.fields.iter().enumerate() {
2554        if mask[idx] {
2555            let value = read_field_value(component.id, *field, reader)?;
2556            fields.push((idx, value));
2557        }
2558    }
2559    Ok(fields)
2560}
2561
2562fn compute_field_mask_into<'a>(
2563    component: &ComponentDef,
2564    baseline: &ComponentSnapshot,
2565    current: &ComponentSnapshot,
2566    field_mask: &'a mut [bool],
2567) -> CodecResult<&'a [bool]> {
2568    for (((field, base_val), curr_val), slot) in component
2569        .fields
2570        .iter()
2571        .zip(baseline.fields.iter())
2572        .zip(current.fields.iter())
2573        .zip(field_mask.iter_mut())
2574    {
2575        *slot = field_changed(component.id, *field, *base_val, *curr_val)?;
2576    }
2577    Ok(field_mask)
2578}
2579
2580fn field_changed(
2581    component_id: ComponentId,
2582    field: FieldDef,
2583    baseline: FieldValue,
2584    current: FieldValue,
2585) -> CodecResult<bool> {
2586    match field.change {
2587        ChangePolicy::Always => field_differs(component_id, field, baseline, current),
2588        ChangePolicy::Threshold { threshold_q } => {
2589            field_exceeds_threshold(component_id, field, baseline, current, threshold_q)
2590        }
2591    }
2592}
2593
2594fn field_differs(
2595    component_id: ComponentId,
2596    field: FieldDef,
2597    baseline: FieldValue,
2598    current: FieldValue,
2599) -> CodecResult<bool> {
2600    match (baseline, current) {
2601        (FieldValue::Bool(a), FieldValue::Bool(b)) => Ok(a != b),
2602        (FieldValue::UInt(a), FieldValue::UInt(b)) => Ok(a != b),
2603        (FieldValue::SInt(a), FieldValue::SInt(b)) => Ok(a != b),
2604        (FieldValue::VarUInt(a), FieldValue::VarUInt(b)) => Ok(a != b),
2605        (FieldValue::VarSInt(a), FieldValue::VarSInt(b)) => Ok(a != b),
2606        (FieldValue::FixedPoint(a), FieldValue::FixedPoint(b)) => Ok(a != b),
2607        _ => Err(CodecError::InvalidValue {
2608            component: component_id,
2609            field: field.id,
2610            reason: ValueReason::TypeMismatch {
2611                expected: codec_name(field.codec),
2612                found: value_name(current),
2613            },
2614        }),
2615    }
2616}
2617
2618fn field_exceeds_threshold(
2619    component_id: ComponentId,
2620    field: FieldDef,
2621    baseline: FieldValue,
2622    current: FieldValue,
2623    threshold_q: u32,
2624) -> CodecResult<bool> {
2625    let threshold_q = threshold_q as u64;
2626    match (baseline, current) {
2627        (FieldValue::FixedPoint(a), FieldValue::FixedPoint(b)) => {
2628            Ok((a - b).unsigned_abs() > threshold_q)
2629        }
2630        (FieldValue::UInt(a), FieldValue::UInt(b)) => Ok(a.abs_diff(b) > threshold_q),
2631        (FieldValue::SInt(a), FieldValue::SInt(b)) => Ok((a - b).unsigned_abs() > threshold_q),
2632        (FieldValue::VarUInt(a), FieldValue::VarUInt(b)) => Ok(a.abs_diff(b) > threshold_q),
2633        (FieldValue::VarSInt(a), FieldValue::VarSInt(b)) => {
2634            Ok((a - b).unsigned_abs() > threshold_q)
2635        }
2636        (FieldValue::Bool(a), FieldValue::Bool(b)) => Ok(a != b),
2637        _ => Err(CodecError::InvalidValue {
2638            component: component_id,
2639            field: field.id,
2640            reason: ValueReason::TypeMismatch {
2641                expected: codec_name(field.codec),
2642                found: value_name(current),
2643            },
2644        }),
2645    }
2646}
2647
2648fn entity_has_updates(
2649    schema: &schema::Schema,
2650    baseline: &EntitySnapshot,
2651    current: &EntitySnapshot,
2652    limits: &CodecLimits,
2653) -> CodecResult<bool> {
2654    ensure_component_presence_matches(schema, baseline, current)?;
2655    for component in &schema.components {
2656        let base = find_component(baseline, component.id);
2657        let curr = find_component(current, component.id);
2658        if let (Some(base), Some(curr)) = (base, curr) {
2659            if base.fields.len() != component.fields.len()
2660                || curr.fields.len() != component.fields.len()
2661            {
2662                return Err(CodecError::InvalidMask {
2663                    kind: MaskKind::FieldMask {
2664                        component: component.id,
2665                    },
2666                    reason: MaskReason::FieldCountMismatch {
2667                        expected: component.fields.len(),
2668                        actual: base.fields.len().max(curr.fields.len()),
2669                    },
2670                });
2671            }
2672            if component.fields.len() > limits.max_fields_per_component {
2673                return Err(CodecError::LimitsExceeded {
2674                    kind: LimitKind::FieldsPerComponent,
2675                    limit: limits.max_fields_per_component,
2676                    actual: component.fields.len(),
2677                });
2678            }
2679            for ((field, base_val), curr_val) in component
2680                .fields
2681                .iter()
2682                .zip(base.fields.iter())
2683                .zip(curr.fields.iter())
2684            {
2685                if field_changed(component.id, *field, *base_val, *curr_val)? {
2686                    return Ok(true);
2687                }
2688            }
2689        }
2690    }
2691    Ok(false)
2692}
2693
2694fn ensure_component_presence_matches(
2695    schema: &schema::Schema,
2696    baseline: &EntitySnapshot,
2697    current: &EntitySnapshot,
2698) -> CodecResult<()> {
2699    // In this version, component presence is stable across an entity's lifetime.
2700    for component in &schema.components {
2701        let base = find_component(baseline, component.id).is_some();
2702        let curr = find_component(current, component.id).is_some();
2703        if base != curr {
2704            return Err(CodecError::InvalidMask {
2705                kind: MaskKind::ComponentMask,
2706                reason: MaskReason::ComponentPresenceMismatch {
2707                    component: component.id,
2708                },
2709            });
2710        }
2711    }
2712    Ok(())
2713}
2714
2715fn find_component(entity: &EntitySnapshot, id: ComponentId) -> Option<&ComponentSnapshot> {
2716    entity.components.iter().find(|c| c.id == id)
2717}
2718
2719fn codec_name(codec: schema::FieldCodec) -> &'static str {
2720    match codec {
2721        schema::FieldCodec::Bool => "bool",
2722        schema::FieldCodec::UInt { .. } => "uint",
2723        schema::FieldCodec::SInt { .. } => "sint",
2724        schema::FieldCodec::VarUInt => "varuint",
2725        schema::FieldCodec::VarSInt => "varsint",
2726        schema::FieldCodec::FixedPoint(_) => "fixed-point",
2727    }
2728}
2729
2730fn value_name(value: FieldValue) -> &'static str {
2731    match value {
2732        FieldValue::Bool(_) => "bool",
2733        FieldValue::UInt(_) => "uint",
2734        FieldValue::SInt(_) => "sint",
2735        FieldValue::VarUInt(_) => "varuint",
2736        FieldValue::VarSInt(_) => "varsint",
2737        FieldValue::FixedPoint(_) => "fixed-point",
2738    }
2739}
2740
#[cfg(test)]
mod tests {
    use super::*;
    use schema::{ComponentDef, FieldCodec, FieldDef, FieldId, Schema};

    // ---------------------------------------------------------------------
    // Schema fixtures
    // ---------------------------------------------------------------------

    /// Component definition with a single bool field (field id 1).
    fn bool_component_def(id: ComponentId) -> ComponentDef {
        ComponentDef::new(id).field(FieldDef::new(FieldId::new(1).unwrap(), FieldCodec::bool()))
    }

    /// Schema with one component (id 1) holding one bool field.
    fn schema_one_bool() -> Schema {
        Schema::new(vec![bool_component_def(ComponentId::new(1).unwrap())]).unwrap()
    }

    /// Schema with one component (id 1) holding one 8-bit uint field that uses
    /// a threshold change policy.
    fn schema_uint_threshold(threshold_q: u32) -> Schema {
        let field = FieldDef::new(FieldId::new(1).unwrap(), FieldCodec::uint(8))
            .change(ChangePolicy::Threshold { threshold_q });
        let component = ComponentDef::new(ComponentId::new(1).unwrap()).field(field);
        Schema::new(vec![component]).unwrap()
    }

    /// Schema with two components (ids 1 and 2), each holding one bool field.
    fn schema_two_components() -> Schema {
        Schema::new(vec![
            bool_component_def(ComponentId::new(1).unwrap()),
            bool_component_def(ComponentId::new(2).unwrap()),
        ])
        .unwrap()
    }

    // ---------------------------------------------------------------------
    // Snapshot fixtures
    // ---------------------------------------------------------------------

    /// Component snapshot with exactly one field value.
    fn component(id: ComponentId, value: FieldValue) -> ComponentSnapshot {
        ComponentSnapshot {
            id,
            fields: vec![value],
        }
    }

    /// Entity snapshot with the given components.
    fn entity(id: EntityId, components: Vec<ComponentSnapshot>) -> EntitySnapshot {
        EntitySnapshot { id, components }
    }

    /// Entity carrying only component 1 with a single bool field.
    fn bool_entity(id: EntityId, value: bool) -> EntitySnapshot {
        entity(
            id,
            vec![component(
                ComponentId::new(1).unwrap(),
                FieldValue::Bool(value),
            )],
        )
    }

    /// Snapshot containing only entity 1 with component 1 and one field value.
    fn lone_entity_snapshot(tick: SnapshotTick, value: FieldValue) -> Snapshot {
        Snapshot {
            tick,
            entities: vec![entity(
                EntityId::new(1),
                vec![component(ComponentId::new(1).unwrap(), value)],
            )],
        }
    }

    /// Baseline used by most tests: tick 10, entity 1, bool field `false`.
    fn baseline_snapshot() -> Snapshot {
        lone_entity_snapshot(SnapshotTick::new(10), FieldValue::Bool(false))
    }

    // ---------------------------------------------------------------------
    // Encode / apply helpers
    // ---------------------------------------------------------------------

    /// Encodes a delta against `baseline` (using its own tick as baseline
    /// tick) with the testing limits and returns the number of bytes written.
    #[track_caller]
    fn encode_delta(
        schema: &Schema,
        tick: SnapshotTick,
        baseline: &Snapshot,
        current: &Snapshot,
        out: &mut [u8],
    ) -> usize {
        encode_delta_snapshot(
            schema,
            tick,
            baseline.tick,
            baseline,
            current,
            &CodecLimits::for_testing(),
            out,
        )
        .unwrap()
    }

    /// Encodes `current` as a delta against `baseline`, applies it back onto
    /// `baseline`, and asserts the resulting entities equal `current`'s.
    #[track_caller]
    fn assert_delta_roundtrip(
        schema: &Schema,
        baseline: &Snapshot,
        current: &Snapshot,
        out: &mut [u8],
    ) {
        let bytes = encode_delta(schema, current.tick, baseline, current, out);
        let applied = apply_delta_snapshot(
            schema,
            baseline,
            &out[..bytes],
            &wire::Limits::for_testing(),
            &CodecLimits::for_testing(),
        )
        .unwrap();
        assert_eq!(applied.entities, current.entities);
    }

    // ---------------------------------------------------------------------
    // Tests
    // ---------------------------------------------------------------------

    #[test]
    fn no_op_delta_is_empty() {
        let schema = schema_one_bool();
        let baseline = baseline_snapshot();
        let current = baseline.clone();
        let mut buf = [0u8; 128];
        let bytes = encode_delta(
            &schema,
            SnapshotTick::new(11),
            &baseline,
            &current,
            &mut buf,
        );
        let header =
            wire::PacketHeader::delta_snapshot(schema_hash(&schema), 11, baseline.tick.raw(), 0);
        let mut expected = [0u8; wire::HEADER_SIZE];
        encode_header(&header, &mut expected).unwrap();
        assert_eq!(&buf[..bytes], expected.as_slice());
    }

    #[test]
    fn delta_roundtrip_single_update() {
        let schema = schema_one_bool();
        let baseline = baseline_snapshot();
        let current = lone_entity_snapshot(SnapshotTick::new(11), FieldValue::Bool(true));

        assert_delta_roundtrip(&schema, &baseline, &current, &mut [0u8; 128]);
    }

    #[test]
    fn delta_roundtrip_reuse_scratch() {
        let schema = schema_one_bool();
        let baseline = baseline_snapshot();
        let current_one = lone_entity_snapshot(SnapshotTick::new(11), FieldValue::Bool(true));
        let current_two = lone_entity_snapshot(SnapshotTick::new(12), FieldValue::Bool(false));

        // The same scratch is reused across both encodes; each encode gets a
        // fresh output buffer.
        let mut scratch = CodecScratch::default();
        for current in &[current_one, current_two] {
            let mut buf = [0u8; 128];
            let bytes = encode_delta_snapshot_with_scratch(
                &schema,
                current.tick,
                baseline.tick,
                &baseline,
                current,
                &CodecLimits::for_testing(),
                &mut scratch,
                &mut buf,
            )
            .unwrap();
            let applied = apply_delta_snapshot(
                &schema,
                &baseline,
                &buf[..bytes],
                &wire::Limits::for_testing(),
                &CodecLimits::for_testing(),
            )
            .unwrap();
            assert_eq!(applied.entities, current.entities);
        }
    }

    #[test]
    fn delta_rejects_both_update_encodings() {
        let schema = schema_one_bool();
        let baseline = baseline_snapshot();
        let update_body = [0u8; 1];
        let mut update_section = [0u8; 8];
        let update_len =
            wire::encode_section(SectionTag::EntityUpdate, &update_body, &mut update_section)
                .unwrap();

        let mut sparse_section = [0u8; 8];
        let sparse_len = wire::encode_section(
            SectionTag::EntityUpdateSparse,
            &update_body,
            &mut sparse_section,
        )
        .unwrap();

        // Hand-build a packet that carries both update encodings back to back.
        let payload_len = (update_len + sparse_len) as u32;
        let header = wire::PacketHeader::delta_snapshot(
            schema_hash(&schema),
            baseline.tick.raw() + 1,
            baseline.tick.raw(),
            payload_len,
        );
        let mut buf = vec![0u8; wire::HEADER_SIZE + payload_len as usize];
        wire::encode_header(&header, &mut buf[..wire::HEADER_SIZE]).unwrap();
        buf[wire::HEADER_SIZE..wire::HEADER_SIZE + update_len]
            .copy_from_slice(&update_section[..update_len]);
        buf[wire::HEADER_SIZE + update_len..wire::HEADER_SIZE + update_len + sparse_len]
            .copy_from_slice(&sparse_section[..sparse_len]);

        let packet = wire::decode_packet(&buf, &wire::Limits::for_testing()).unwrap();
        let err = decode_delta_packet(&schema, &packet, &CodecLimits::for_testing()).unwrap_err();
        assert!(matches!(err, CodecError::DuplicateUpdateEncoding));

        let err = apply_delta_snapshot_from_packet(
            &schema,
            &baseline,
            &packet,
            &CodecLimits::for_testing(),
        )
        .unwrap_err();
        assert!(matches!(err, CodecError::DuplicateUpdateEncoding));
    }

    #[test]
    fn delta_roundtrip_create_destroy_update() {
        let schema = schema_one_bool();
        // Entity 1 is destroyed, entity 2 is updated, entity 3 is created.
        let baseline = Snapshot {
            tick: SnapshotTick::new(10),
            entities: vec![
                bool_entity(EntityId::new(1), false),
                bool_entity(EntityId::new(2), false),
            ],
        };
        let current = Snapshot {
            tick: SnapshotTick::new(11),
            entities: vec![
                bool_entity(EntityId::new(2), true),
                bool_entity(EntityId::new(3), true),
            ],
        };

        assert_delta_roundtrip(&schema, &baseline, &current, &mut [0u8; 256]);
    }

    #[test]
    fn delta_session_header_matches_payload() {
        let schema = schema_one_bool();
        let baseline = baseline_snapshot();
        let current = lone_entity_snapshot(SnapshotTick::new(11), FieldValue::Bool(true));

        let mut full_buf = [0u8; 128];
        let full_bytes = encode_delta_snapshot_for_client(
            &schema,
            current.tick,
            baseline.tick,
            &baseline,
            &current,
            &CodecLimits::for_testing(),
            &mut full_buf,
        )
        .unwrap();
        let full_payload = &full_buf[wire::HEADER_SIZE..full_bytes];

        let mut session_buf = [0u8; 128];
        let mut last_tick = baseline.tick;
        let session_bytes = encode_delta_snapshot_for_client_session_with_scratch(
            &schema,
            current.tick,
            baseline.tick,
            &baseline,
            &current,
            &CodecLimits::for_testing(),
            &mut CodecScratch::default(),
            &mut last_tick,
            &mut session_buf,
        )
        .unwrap();

        let session_header =
            wire::decode_session_header(&session_buf[..session_bytes], baseline.tick.raw())
                .unwrap();
        let payload_start = session_header.header_len;
        let payload_end = payload_start + session_header.payload_len as usize;
        let session_payload = &session_buf[payload_start..payload_end];
        assert_eq!(full_payload, session_payload);
    }

    #[test]
    fn delta_roundtrip_single_component_change() {
        let schema = schema_two_components();
        let first = ComponentId::new(1).unwrap();
        let second = ComponentId::new(2).unwrap();
        // Only the first component's field flips; the second stays unchanged.
        let baseline = Snapshot {
            tick: SnapshotTick::new(10),
            entities: vec![entity(
                EntityId::new(1),
                vec![
                    component(first, FieldValue::Bool(false)),
                    component(second, FieldValue::Bool(false)),
                ],
            )],
        };
        let current = Snapshot {
            tick: SnapshotTick::new(11),
            entities: vec![entity(
                EntityId::new(1),
                vec![
                    component(first, FieldValue::Bool(true)),
                    component(second, FieldValue::Bool(false)),
                ],
            )],
        };

        assert_delta_roundtrip(&schema, &baseline, &current, &mut [0u8; 256]);
    }

    #[test]
    fn baseline_tick_mismatch_is_error() {
        let schema = schema_one_bool();
        let baseline = baseline_snapshot();
        let current = baseline.clone();
        let mut buf = [0u8; 128];
        let bytes = encode_delta(
            &schema,
            SnapshotTick::new(11),
            &baseline,
            &current,
            &mut buf,
        );
        // Rewrite the header so it names a baseline tick the receiver lacks.
        let mut packet = wire::decode_packet(&buf[..bytes], &wire::Limits::for_testing()).unwrap();
        packet.header.baseline_tick = 999;
        wire::encode_header(&packet.header, &mut buf[..wire::HEADER_SIZE]).unwrap();
        let err = apply_delta_snapshot(
            &schema,
            &baseline,
            &buf[..bytes],
            &wire::Limits::for_testing(),
            &CodecLimits::for_testing(),
        )
        .unwrap_err();
        assert!(matches!(err, CodecError::BaselineTickMismatch { .. }));
    }

    #[test]
    fn threshold_suppresses_small_change() {
        let schema = schema_uint_threshold(5);
        let baseline = lone_entity_snapshot(SnapshotTick::new(10), FieldValue::UInt(10));
        // 10 -> 12 is a difference of 2, which does not exceed the threshold of 5.
        let current = lone_entity_snapshot(SnapshotTick::new(11), FieldValue::UInt(12));

        let mut buf = [0u8; 128];
        let bytes = encode_delta(&schema, current.tick, &baseline, &current, &mut buf);

        let packet = wire::decode_packet(&buf[..bytes], &wire::Limits::for_testing()).unwrap();
        assert_eq!(packet.sections.len(), 0);

        let applied = apply_delta_snapshot(
            &schema,
            &baseline,
            &buf[..bytes],
            &wire::Limits::for_testing(),
            &CodecLimits::for_testing(),
        )
        .unwrap();
        assert_eq!(applied.entities, baseline.entities);
    }
}