Skip to main content

codec/
delta.rs

1//! Delta snapshot encoding/decoding.
2
3use std::cmp::Ordering;
4
5use bitstream::{BitReader, BitWriter};
6use schema::{schema_hash, ChangePolicy, ComponentDef, ComponentId, FieldDef};
7use wire::{decode_packet, encode_header, SectionTag, WirePacket};
8
9use crate::baseline::BaselineStore;
10use crate::error::{CodecError, CodecResult, LimitKind, MaskKind, MaskReason, ValueReason};
11use crate::limits::CodecLimits;
12use crate::scratch::CodecScratch;
13use crate::snapshot::{
14    ensure_known_components, read_field_value, read_field_value_sparse, read_mask, required_bits,
15    write_field_value, write_field_value_sparse, write_section, ComponentSnapshot, EntitySnapshot,
16    FieldValue, Snapshot,
17};
18use crate::types::{EntityId, SnapshotTick};
19
20/// Selects the latest baseline tick at or before the ack tick.
21#[must_use]
22pub fn select_baseline_tick<T>(
23    store: &BaselineStore<T>,
24    ack_tick: SnapshotTick,
25) -> Option<SnapshotTick> {
26    store.latest_at_or_before(ack_tick).map(|(tick, _)| tick)
27}
28
29/// Encodes a delta snapshot into the provided output buffer.
30///
31/// This is the scan-based path and is kept as a convenience/fallback. For
32/// production engines with dirty/change lists, prefer `encode_delta_from_changes`.
33/// Baseline and current snapshots must have entities sorted by `EntityId`.
34pub fn encode_delta_snapshot(
35    schema: &schema::Schema,
36    tick: SnapshotTick,
37    baseline_tick: SnapshotTick,
38    baseline: &Snapshot,
39    current: &Snapshot,
40    limits: &CodecLimits,
41    out: &mut [u8],
42) -> CodecResult<usize> {
43    let mut scratch = CodecScratch::default();
44    encode_delta_snapshot_with_scratch(
45        schema,
46        tick,
47        baseline_tick,
48        baseline,
49        current,
50        limits,
51        &mut scratch,
52        out,
53    )
54}
55
56/// Encodes a delta snapshot using reusable scratch buffers.
57#[allow(clippy::too_many_arguments)]
58pub fn encode_delta_snapshot_with_scratch(
59    schema: &schema::Schema,
60    tick: SnapshotTick,
61    baseline_tick: SnapshotTick,
62    baseline: &Snapshot,
63    current: &Snapshot,
64    limits: &CodecLimits,
65    scratch: &mut CodecScratch,
66    out: &mut [u8],
67) -> CodecResult<usize> {
68    encode_delta_snapshot_with_scratch_mode(
69        schema,
70        tick,
71        baseline_tick,
72        baseline,
73        current,
74        limits,
75        scratch,
76        out,
77        EncodeUpdateMode::Auto,
78    )
79}
80
81/// Encodes a delta snapshot for a client-specific view.
82///
83/// This assumes `baseline` and `current` are already filtered for the client interest set.
84/// Updates are encoded in sparse mode to optimize for small per-client packets.
85pub fn encode_delta_snapshot_for_client(
86    schema: &schema::Schema,
87    tick: SnapshotTick,
88    baseline_tick: SnapshotTick,
89    baseline: &Snapshot,
90    current: &Snapshot,
91    limits: &CodecLimits,
92    out: &mut [u8],
93) -> CodecResult<usize> {
94    let mut scratch = CodecScratch::default();
95    encode_delta_snapshot_for_client_with_scratch(
96        schema,
97        tick,
98        baseline_tick,
99        baseline,
100        current,
101        limits,
102        &mut scratch,
103        out,
104    )
105}
106
107/// Encoder context for delta packets that use caller-provided change lists.
108///
109/// This avoids scanning baseline/current snapshots and is preferred for
110/// production engines that already track dirty state.
111pub struct SessionEncoder<'a> {
112    schema: &'a schema::Schema,
113    limits: &'a CodecLimits,
114    #[allow(dead_code)]
115    scratch: CodecScratch,
116}
117
118impl<'a> SessionEncoder<'a> {
119    #[must_use]
120    pub fn new(schema: &'a schema::Schema, limits: &'a CodecLimits) -> Self {
121        Self {
122            schema,
123            limits,
124            scratch: CodecScratch::default(),
125        }
126    }
127
128    #[must_use]
129    pub fn schema(&self) -> &'a schema::Schema {
130        self.schema
131    }
132
133    #[must_use]
134    pub fn limits(&self) -> &'a CodecLimits {
135        self.limits
136    }
137}
138
139/// Encodes a delta snapshot from precomputed change lists.
140///
141/// Preferred for production engines (dirty/change-list driven). The scan-based
142/// encode paths remain as a convenience/fallback.
143pub fn encode_delta_from_changes(
144    session: &mut SessionEncoder<'_>,
145    tick: SnapshotTick,
146    baseline_tick: SnapshotTick,
147    creates: &[EntitySnapshot],
148    destroys: &[EntityId],
149    updates: &[DeltaUpdateEntity],
150    out: &mut [u8],
151) -> CodecResult<usize> {
152    encode_delta_snapshot_from_updates(
153        session.schema,
154        tick,
155        baseline_tick,
156        destroys,
157        creates,
158        updates,
159        session.limits,
160        out,
161    )
162}
163
164/// Encodes a delta snapshot using precomputed change lists.
165///
166/// This is a lower-level helper. Prefer `encode_delta_from_changes` with a
167/// `SessionEncoder` for production use.
168#[allow(clippy::too_many_arguments)]
169pub fn encode_delta_snapshot_from_updates(
170    schema: &schema::Schema,
171    tick: SnapshotTick,
172    baseline_tick: SnapshotTick,
173    destroys: &[EntityId],
174    creates: &[EntitySnapshot],
175    updates: &[DeltaUpdateEntity],
176    limits: &CodecLimits,
177    out: &mut [u8],
178) -> CodecResult<usize> {
179    if out.len() < wire::HEADER_SIZE {
180        return Err(CodecError::OutputTooSmall {
181            needed: wire::HEADER_SIZE,
182            available: out.len(),
183        });
184    }
185    let payload_len = encode_delta_payload_from_updates(
186        schema,
187        destroys,
188        creates,
189        updates,
190        limits,
191        &mut out[wire::HEADER_SIZE..],
192    )?;
193    let header = wire::PacketHeader::delta_snapshot(
194        schema_hash(schema),
195        tick.raw(),
196        baseline_tick.raw(),
197        payload_len as u32,
198    );
199    encode_header(&header, &mut out[..wire::HEADER_SIZE]).map_err(|_| {
200        CodecError::OutputTooSmall {
201            needed: wire::HEADER_SIZE,
202            available: out.len(),
203        }
204    })?;
205    Ok(wire::HEADER_SIZE + payload_len)
206}
207
208/// Encodes a client delta snapshot using a compact session header.
209#[allow(clippy::too_many_arguments)]
210pub fn encode_delta_snapshot_for_client_session(
211    schema: &schema::Schema,
212    tick: SnapshotTick,
213    baseline_tick: SnapshotTick,
214    baseline: &Snapshot,
215    current: &Snapshot,
216    limits: &CodecLimits,
217    last_tick: &mut SnapshotTick,
218    out: &mut [u8],
219) -> CodecResult<usize> {
220    let mut scratch = CodecScratch::default();
221    encode_delta_snapshot_for_client_session_with_scratch(
222        schema,
223        tick,
224        baseline_tick,
225        baseline,
226        current,
227        limits,
228        &mut scratch,
229        last_tick,
230        out,
231    )
232}
233
234/// Encodes a delta snapshot for a client-specific view using reusable scratch buffers.
235#[allow(clippy::too_many_arguments)]
236pub fn encode_delta_snapshot_for_client_with_scratch(
237    schema: &schema::Schema,
238    tick: SnapshotTick,
239    baseline_tick: SnapshotTick,
240    baseline: &Snapshot,
241    current: &Snapshot,
242    limits: &CodecLimits,
243    scratch: &mut CodecScratch,
244    out: &mut [u8],
245) -> CodecResult<usize> {
246    encode_delta_snapshot_with_scratch_mode(
247        schema,
248        tick,
249        baseline_tick,
250        baseline,
251        current,
252        limits,
253        scratch,
254        out,
255        EncodeUpdateMode::Sparse,
256    )
257}
258
/// How the scan-based encoder picks the encoding of the update section.
#[derive(Debug, Clone, Copy, Eq, PartialEq)]
enum EncodeUpdateMode {
    /// Let `select_update_encoding` choose between masked and sparse.
    Auto,
    /// Always use the sparse encoding.
    Sparse,
}
264
265/// Encodes a client delta snapshot using a compact session header.
266#[allow(clippy::too_many_arguments)]
267pub fn encode_delta_snapshot_for_client_session_with_scratch(
268    schema: &schema::Schema,
269    tick: SnapshotTick,
270    baseline_tick: SnapshotTick,
271    baseline: &Snapshot,
272    current: &Snapshot,
273    limits: &CodecLimits,
274    scratch: &mut CodecScratch,
275    last_tick: &mut SnapshotTick,
276    out: &mut [u8],
277) -> CodecResult<usize> {
278    let max_header = wire::SESSION_MAX_HEADER_SIZE;
279    if out.len() < max_header {
280        return Err(CodecError::OutputTooSmall {
281            needed: max_header,
282            available: out.len(),
283        });
284    }
285    if tick.raw() <= last_tick.raw() {
286        return Err(CodecError::InvalidEntityOrder {
287            previous: last_tick.raw(),
288            current: tick.raw(),
289        });
290    }
291    if baseline_tick.raw() > tick.raw() {
292        return Err(CodecError::BaselineTickMismatch {
293            expected: baseline_tick.raw(),
294            found: tick.raw(),
295        });
296    }
297    let payload_len = encode_delta_payload_with_mode(
298        schema,
299        baseline_tick,
300        baseline,
301        current,
302        limits,
303        scratch,
304        &mut out[max_header..],
305        EncodeUpdateMode::Sparse,
306    )?;
307    let tick_delta = tick.raw() - last_tick.raw();
308    let baseline_delta = tick.raw() - baseline_tick.raw();
309    let header_len = wire::encode_session_header(
310        &mut out[..max_header],
311        wire::SessionFlags::delta_snapshot(),
312        tick_delta,
313        baseline_delta,
314        payload_len as u32,
315    )
316    .map_err(|_| CodecError::OutputTooSmall {
317        needed: max_header,
318        available: out.len(),
319    })?;
320    if header_len < max_header {
321        let payload_start = max_header;
322        let payload_end = max_header + payload_len;
323        out.copy_within(payload_start..payload_end, header_len);
324    }
325    *last_tick = tick;
326    Ok(header_len + payload_len)
327}
328
329#[allow(clippy::too_many_arguments)]
330fn encode_delta_snapshot_with_scratch_mode(
331    schema: &schema::Schema,
332    tick: SnapshotTick,
333    baseline_tick: SnapshotTick,
334    baseline: &Snapshot,
335    current: &Snapshot,
336    limits: &CodecLimits,
337    scratch: &mut CodecScratch,
338    out: &mut [u8],
339    mode: EncodeUpdateMode,
340) -> CodecResult<usize> {
341    if out.len() < wire::HEADER_SIZE {
342        return Err(CodecError::OutputTooSmall {
343            needed: wire::HEADER_SIZE,
344            available: out.len(),
345        });
346    }
347    let payload_len = encode_delta_payload_with_mode(
348        schema,
349        baseline_tick,
350        baseline,
351        current,
352        limits,
353        scratch,
354        &mut out[wire::HEADER_SIZE..],
355        mode,
356    )?;
357    let header = wire::PacketHeader::delta_snapshot(
358        schema_hash(schema),
359        tick.raw(),
360        baseline_tick.raw(),
361        payload_len as u32,
362    );
363    encode_header(&header, &mut out[..wire::HEADER_SIZE]).map_err(|_| {
364        CodecError::OutputTooSmall {
365            needed: wire::HEADER_SIZE,
366            available: out.len(),
367        }
368    })?;
369
370    Ok(wire::HEADER_SIZE + payload_len)
371}
372
373#[allow(clippy::too_many_arguments)]
374fn encode_delta_payload_with_mode(
375    schema: &schema::Schema,
376    baseline_tick: SnapshotTick,
377    baseline: &Snapshot,
378    current: &Snapshot,
379    limits: &CodecLimits,
380    scratch: &mut CodecScratch,
381    out: &mut [u8],
382    mode: EncodeUpdateMode,
383) -> CodecResult<usize> {
384    if baseline.tick != baseline_tick {
385        return Err(CodecError::BaselineTickMismatch {
386            expected: baseline.tick.raw(),
387            found: baseline_tick.raw(),
388        });
389    }
390
391    ensure_entities_sorted(&baseline.entities)?;
392    ensure_entities_sorted(&current.entities)?;
393
394    let mut counts = DiffCounts::default();
395    diff_counts(schema, baseline, current, limits, &mut counts)?;
396
397    if counts.creates > limits.max_entities_create {
398        return Err(CodecError::LimitsExceeded {
399            kind: LimitKind::EntitiesCreate,
400            limit: limits.max_entities_create,
401            actual: counts.creates,
402        });
403    }
404    if counts.updates > limits.max_entities_update {
405        return Err(CodecError::LimitsExceeded {
406            kind: LimitKind::EntitiesUpdate,
407            limit: limits.max_entities_update,
408            actual: counts.updates,
409        });
410    }
411    if counts.destroys > limits.max_entities_destroy {
412        return Err(CodecError::LimitsExceeded {
413            kind: LimitKind::EntitiesDestroy,
414            limit: limits.max_entities_destroy,
415            actual: counts.destroys,
416        });
417    }
418
419    let mut offset = 0;
420    if counts.destroys > 0 {
421        let written = write_section(
422            SectionTag::EntityDestroy,
423            &mut out[offset..],
424            limits,
425            |writer| encode_destroy_body(baseline, current, counts.destroys, limits, writer),
426        )?;
427        offset += written;
428    }
429    if counts.creates > 0 {
430        let written = write_section(
431            SectionTag::EntityCreate,
432            &mut out[offset..],
433            limits,
434            |writer| encode_create_body(schema, baseline, current, counts.creates, limits, writer),
435        )?;
436        offset += written;
437    }
438    if counts.updates > 0 {
439        let update_encoding = match mode {
440            EncodeUpdateMode::Auto => {
441                select_update_encoding(schema, baseline, current, limits, scratch)?
442            }
443            EncodeUpdateMode::Sparse => UpdateEncoding::Sparse,
444        };
445        let section_tag = match update_encoding {
446            UpdateEncoding::Masked => SectionTag::EntityUpdate,
447            UpdateEncoding::Sparse => SectionTag::EntityUpdateSparsePacked,
448        };
449        let written =
450            write_section(
451                section_tag,
452                &mut out[offset..],
453                limits,
454                |writer| match update_encoding {
455                    UpdateEncoding::Masked => encode_update_body_masked(
456                        schema,
457                        baseline,
458                        current,
459                        counts.updates,
460                        limits,
461                        scratch,
462                        writer,
463                    ),
464                    UpdateEncoding::Sparse => encode_update_body_sparse_packed(
465                        schema,
466                        baseline,
467                        current,
468                        counts.updates,
469                        limits,
470                        scratch,
471                        writer,
472                    ),
473                },
474            )?;
475        offset += written;
476    }
477
478    Ok(offset)
479}
480
481#[allow(clippy::too_many_arguments)]
482fn encode_delta_payload_from_updates(
483    schema: &schema::Schema,
484    destroys: &[EntityId],
485    creates: &[EntitySnapshot],
486    updates: &[DeltaUpdateEntity],
487    limits: &CodecLimits,
488    out: &mut [u8],
489) -> CodecResult<usize> {
490    if destroys.len() > limits.max_entities_destroy {
491        return Err(CodecError::LimitsExceeded {
492            kind: LimitKind::EntitiesDestroy,
493            limit: limits.max_entities_destroy,
494            actual: destroys.len(),
495        });
496    }
497    if creates.len() > limits.max_entities_create {
498        return Err(CodecError::LimitsExceeded {
499            kind: LimitKind::EntitiesCreate,
500            limit: limits.max_entities_create,
501            actual: creates.len(),
502        });
503    }
504    if updates.len() > limits.max_entities_update {
505        return Err(CodecError::LimitsExceeded {
506            kind: LimitKind::EntitiesUpdate,
507            limit: limits.max_entities_update,
508            actual: updates.len(),
509        });
510    }
511
512    ensure_entity_ids_sorted(destroys)?;
513    ensure_entities_sorted(creates)?;
514    validate_updates_for_encoding(schema, updates, limits)?;
515
516    let mut offset = 0;
517    if !destroys.is_empty() {
518        let written = write_section(
519            SectionTag::EntityDestroy,
520            &mut out[offset..],
521            limits,
522            |writer| encode_destroy_body_from_list(destroys, limits, writer),
523        )?;
524        offset += written;
525    }
526    if !creates.is_empty() {
527        let written = write_section(
528            SectionTag::EntityCreate,
529            &mut out[offset..],
530            limits,
531            |writer| encode_create_body_from_list(schema, creates, limits, writer),
532        )?;
533        offset += written;
534    }
535    if !updates.is_empty() {
536        let written = write_section(
537            SectionTag::EntityUpdateSparsePacked,
538            &mut out[offset..],
539            limits,
540            |writer| encode_update_body_sparse_packed_from_updates(schema, updates, limits, writer),
541        )?;
542        offset += written;
543    }
544
545    Ok(offset)
546}
547
548/// Applies a delta snapshot to a baseline snapshot.
549pub fn apply_delta_snapshot(
550    schema: &schema::Schema,
551    baseline: &Snapshot,
552    bytes: &[u8],
553    wire_limits: &wire::Limits,
554    limits: &CodecLimits,
555) -> CodecResult<Snapshot> {
556    let packet = decode_packet(bytes, wire_limits)?;
557    apply_delta_snapshot_from_packet(schema, baseline, &packet, limits)
558}
559
560/// Applies a delta snapshot from a parsed wire packet.
561pub fn apply_delta_snapshot_from_packet(
562    schema: &schema::Schema,
563    baseline: &Snapshot,
564    packet: &WirePacket<'_>,
565    limits: &CodecLimits,
566) -> CodecResult<Snapshot> {
567    let header = packet.header;
568    if !header.flags.is_delta_snapshot() {
569        return Err(CodecError::Wire(wire::DecodeError::InvalidFlags {
570            flags: header.flags.raw(),
571        }));
572    }
573    if header.baseline_tick == 0 {
574        return Err(CodecError::Wire(wire::DecodeError::InvalidBaselineTick {
575            baseline_tick: header.baseline_tick,
576            flags: header.flags.raw(),
577        }));
578    }
579    if header.baseline_tick != baseline.tick.raw() {
580        return Err(CodecError::BaselineTickMismatch {
581            expected: baseline.tick.raw(),
582            found: header.baseline_tick,
583        });
584    }
585
586    let expected_hash = schema_hash(schema);
587    if header.schema_hash != expected_hash {
588        return Err(CodecError::SchemaMismatch {
589            expected: expected_hash,
590            found: header.schema_hash,
591        });
592    }
593
594    let (destroys, creates, updates) = decode_delta_sections(schema, packet, limits)?;
595
596    ensure_entities_sorted(&baseline.entities)?;
597    ensure_entities_sorted(&creates)?;
598
599    let mut remaining = apply_destroys(&baseline.entities, &destroys)?;
600    remaining = apply_creates(remaining, creates)?;
601    if remaining.len() > limits.max_total_entities_after_apply {
602        return Err(CodecError::LimitsExceeded {
603            kind: LimitKind::TotalEntitiesAfterApply,
604            limit: limits.max_total_entities_after_apply,
605            actual: remaining.len(),
606        });
607    }
608    apply_updates(&mut remaining, &updates)?;
609
610    Ok(Snapshot {
611        tick: SnapshotTick::new(header.tick),
612        entities: remaining,
613    })
614}
615
616/// Decodes a delta packet without applying it to a baseline.
617pub fn decode_delta_packet(
618    schema: &schema::Schema,
619    packet: &WirePacket<'_>,
620    limits: &CodecLimits,
621) -> CodecResult<DeltaDecoded> {
622    let header = packet.header;
623    if !header.flags.is_delta_snapshot() {
624        return Err(CodecError::Wire(wire::DecodeError::InvalidFlags {
625            flags: header.flags.raw(),
626        }));
627    }
628    if header.baseline_tick == 0 {
629        return Err(CodecError::Wire(wire::DecodeError::InvalidBaselineTick {
630            baseline_tick: header.baseline_tick,
631            flags: header.flags.raw(),
632        }));
633    }
634    let expected_hash = schema_hash(schema);
635    if header.schema_hash != expected_hash {
636        return Err(CodecError::SchemaMismatch {
637            expected: expected_hash,
638            found: header.schema_hash,
639        });
640    }
641
642    let (destroys, creates, updates) = decode_delta_sections(schema, packet, limits)?;
643
644    Ok(DeltaDecoded {
645        tick: SnapshotTick::new(header.tick),
646        baseline_tick: SnapshotTick::new(header.baseline_tick),
647        destroys,
648        creates,
649        updates,
650    })
651}
652
/// Per-category entity tallies produced by `diff_counts`.
#[derive(Default)]
struct DiffCounts {
    /// Entities present only in the baseline.
    destroys: usize,
    /// Entities present only in the current snapshot.
    creates: usize,
    /// Entities present in both for which `entity_has_updates` returned `true`.
    updates: usize,
}
659
/// Wire encoding used for the update section of a delta payload.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
enum UpdateEncoding {
    /// Written under `SectionTag::EntityUpdate`.
    Masked,
    /// Written under `SectionTag::EntityUpdateSparsePacked`.
    Sparse,
}
665
666fn diff_counts(
667    schema: &schema::Schema,
668    baseline: &Snapshot,
669    current: &Snapshot,
670    limits: &CodecLimits,
671    counts: &mut DiffCounts,
672) -> CodecResult<()> {
673    let mut i = 0usize;
674    let mut j = 0usize;
675    while i < baseline.entities.len() || j < current.entities.len() {
676        let base = baseline.entities.get(i);
677        let curr = current.entities.get(j);
678        match (base, curr) {
679            (Some(b), Some(c)) => {
680                if b.id.raw() < c.id.raw() {
681                    counts.destroys += 1;
682                    i += 1;
683                } else if b.id.raw() > c.id.raw() {
684                    counts.creates += 1;
685                    j += 1;
686                } else {
687                    if entity_has_updates(schema, b, c, limits)? {
688                        counts.updates += 1;
689                    }
690                    i += 1;
691                    j += 1;
692                }
693            }
694            (Some(_), None) => {
695                counts.destroys += 1;
696                i += 1;
697            }
698            (None, Some(_)) => {
699                counts.creates += 1;
700                j += 1;
701            }
702            (None, None) => break,
703        }
704    }
705    Ok(())
706}
707
708fn select_update_encoding(
709    schema: &schema::Schema,
710    baseline: &Snapshot,
711    current: &Snapshot,
712    limits: &CodecLimits,
713    scratch: &mut CodecScratch,
714) -> CodecResult<UpdateEncoding> {
715    let component_count = schema.components.len();
716    let mut mask_bits = 0usize;
717    let mut sparse_bits = 0usize;
718    let mut baseline_iter = baseline.entities.iter();
719    let mut current_iter = current.entities.iter();
720    let mut baseline_next = baseline_iter.next();
721    let mut current_next = current_iter.next();
722
723    while let (Some(base), Some(curr)) = (baseline_next, current_next) {
724        match base.id.cmp(&curr.id) {
725            Ordering::Less => {
726                baseline_next = baseline_iter.next();
727            }
728            Ordering::Greater => {
729                current_next = current_iter.next();
730            }
731            Ordering::Equal => {
732                for component in &schema.components {
733                    let base_component = find_component(base, component.id);
734                    let curr_component = find_component(curr, component.id);
735                    if base_component.is_some() != curr_component.is_some() {
736                        return Err(CodecError::InvalidMask {
737                            kind: MaskKind::ComponentMask,
738                            reason: MaskReason::ComponentPresenceMismatch {
739                                component: component.id,
740                            },
741                        });
742                    }
743                    if let (Some(base_component), Some(curr_component)) =
744                        (base_component, curr_component)
745                    {
746                        if base_component.fields.len() != component.fields.len()
747                            || curr_component.fields.len() != component.fields.len()
748                        {
749                            return Err(CodecError::InvalidMask {
750                                kind: MaskKind::FieldMask {
751                                    component: component.id,
752                                },
753                                reason: MaskReason::FieldCountMismatch {
754                                    expected: component.fields.len(),
755                                    actual: base_component
756                                        .fields
757                                        .len()
758                                        .max(curr_component.fields.len()),
759                                },
760                            });
761                        }
762                        if component.fields.len() > limits.max_fields_per_component {
763                            return Err(CodecError::LimitsExceeded {
764                                kind: LimitKind::FieldsPerComponent,
765                                limit: limits.max_fields_per_component,
766                                actual: component.fields.len(),
767                            });
768                        }
769                        let (_, field_mask) = scratch
770                            .component_and_field_masks_mut(component_count, component.fields.len());
771                        // Use the same change predicate as masked/sparse encoding.
772                        let field_mask = compute_field_mask_into(
773                            component,
774                            base_component,
775                            curr_component,
776                            field_mask,
777                        )?;
778                        let changed = field_mask.iter().filter(|bit| **bit).count();
779                        if changed > 0 {
780                            let field_count = component.fields.len();
781                            let index_bits =
782                                required_bits(field_count.saturating_sub(1) as u64) as usize;
783                            // Compare per-component mask bits vs packed sparse index bits.
784                            // Values are sent in both encodings, so we only estimate index/mask + id overhead.
785                            mask_bits += field_count;
786                            sparse_bits += index_bits * changed;
787                            sparse_bits += varu32_len(curr.id.raw()) * 8;
788                            sparse_bits += varu32_len(component.id.get() as u32) * 8;
789                            sparse_bits += varu32_len(changed as u32) * 8;
790                        }
791                    }
792                }
793
794                baseline_next = baseline_iter.next();
795                current_next = current_iter.next();
796            }
797        }
798    }
799
800    if mask_bits == 0 {
801        return Ok(UpdateEncoding::Masked);
802    }
803
804    if sparse_bits <= mask_bits {
805        Ok(UpdateEncoding::Sparse)
806    } else {
807        Ok(UpdateEncoding::Masked)
808    }
809}
810
/// Byte length assumed for a variable-length `u32` in the encoding-size estimate.
///
/// One byte per started group of 7 value bits: values below `1 << 7` take one
/// byte, below `1 << 14` two, below `1 << 21` three, below `1 << 28` four, and
/// everything else five.
fn varu32_len(value: u32) -> usize {
    match value {
        0..=0x7F => 1,
        0x80..=0x3FFF => 2,
        0x4000..=0x001F_FFFF => 3,
        0x0020_0000..=0x0FFF_FFFF => 4,
        _ => 5,
    }
}
824
825fn ensure_entity_ids_sorted(ids: &[EntityId]) -> CodecResult<()> {
826    let mut prev: Option<u32> = None;
827    for id in ids {
828        let raw = id.raw();
829        if let Some(prev_id) = prev {
830            if raw <= prev_id {
831                return Err(CodecError::InvalidEntityOrder {
832                    previous: prev_id,
833                    current: raw,
834                });
835            }
836        }
837        prev = Some(raw);
838    }
839    Ok(())
840}
841
842fn encode_destroy_body_from_list(
843    destroys: &[EntityId],
844    limits: &CodecLimits,
845    writer: &mut BitWriter<'_>,
846) -> CodecResult<()> {
847    if destroys.len() > limits.max_entities_destroy {
848        return Err(CodecError::LimitsExceeded {
849            kind: LimitKind::EntitiesDestroy,
850            limit: limits.max_entities_destroy,
851            actual: destroys.len(),
852        });
853    }
854    writer.align_to_byte()?;
855    writer.write_varu32(destroys.len() as u32)?;
856    for id in destroys {
857        writer.align_to_byte()?;
858        writer.write_u32_aligned(id.raw())?;
859    }
860    writer.align_to_byte()?;
861    Ok(())
862}
863
864fn encode_create_body_from_list(
865    schema: &schema::Schema,
866    creates: &[EntitySnapshot],
867    limits: &CodecLimits,
868    writer: &mut BitWriter<'_>,
869) -> CodecResult<()> {
870    if creates.len() > limits.max_entities_create {
871        return Err(CodecError::LimitsExceeded {
872            kind: LimitKind::EntitiesCreate,
873            limit: limits.max_entities_create,
874            actual: creates.len(),
875        });
876    }
877    writer.align_to_byte()?;
878    writer.write_varu32(creates.len() as u32)?;
879    for entity in creates {
880        write_create_entity(schema, entity, limits, writer)?;
881    }
882    writer.align_to_byte()?;
883    Ok(())
884}
885
886fn validate_updates_for_encoding(
887    schema: &schema::Schema,
888    updates: &[DeltaUpdateEntity],
889    limits: &CodecLimits,
890) -> CodecResult<()> {
891    let mut prev: Option<u32> = None;
892    for entity_update in updates {
893        let id = entity_update.id.raw();
894        if let Some(prev_id) = prev {
895            if id <= prev_id {
896                return Err(CodecError::InvalidEntityOrder {
897                    previous: prev_id,
898                    current: id,
899                });
900            }
901        }
902        prev = Some(id);
903        if entity_update.components.len() > limits.max_components_per_entity {
904            return Err(CodecError::LimitsExceeded {
905                kind: LimitKind::ComponentsPerEntity,
906                limit: limits.max_components_per_entity,
907                actual: entity_update.components.len(),
908            });
909        }
910        for component_update in &entity_update.components {
911            let component = schema
912                .components
913                .iter()
914                .find(|component| component.id == component_update.id)
915                .ok_or(CodecError::InvalidMask {
916                    kind: MaskKind::ComponentMask,
917                    reason: MaskReason::UnknownComponent {
918                        component: component_update.id,
919                    },
920                })?;
921            if component_update.fields.is_empty() {
922                return Err(CodecError::InvalidMask {
923                    kind: MaskKind::FieldMask {
924                        component: component_update.id,
925                    },
926                    reason: MaskReason::EmptyFieldMask {
927                        component: component_update.id,
928                    },
929                });
930            }
931            if component_update.fields.len() > limits.max_fields_per_component {
932                return Err(CodecError::LimitsExceeded {
933                    kind: LimitKind::FieldsPerComponent,
934                    limit: limits.max_fields_per_component,
935                    actual: component_update.fields.len(),
936                });
937            }
938            let max_index = component.fields.len().saturating_sub(1);
939            for (field_idx, _value) in &component_update.fields {
940                if *field_idx >= component.fields.len() {
941                    return Err(CodecError::InvalidMask {
942                        kind: MaskKind::FieldMask {
943                            component: component_update.id,
944                        },
945                        reason: MaskReason::InvalidFieldIndex {
946                            field_index: *field_idx,
947                            max: max_index,
948                        },
949                    });
950                }
951            }
952        }
953    }
954    Ok(())
955}
956
957fn encode_update_body_sparse_packed_from_updates(
958    schema: &schema::Schema,
959    updates: &[DeltaUpdateEntity],
960    limits: &CodecLimits,
961    writer: &mut BitWriter<'_>,
962) -> CodecResult<()> {
963    if updates.len() > limits.max_entities_update {
964        return Err(CodecError::LimitsExceeded {
965            kind: LimitKind::EntitiesUpdate,
966            limit: limits.max_entities_update,
967            actual: updates.len(),
968        });
969    }
970    let entry_count: usize = updates.iter().map(|entity| entity.components.len()).sum();
971    let entry_limit = limits
972        .max_entities_update
973        .saturating_mul(limits.max_components_per_entity);
974    if entry_count > entry_limit {
975        return Err(CodecError::LimitsExceeded {
976            kind: LimitKind::EntitiesUpdate,
977            limit: entry_limit,
978            actual: entry_count,
979        });
980    }
981    writer.align_to_byte()?;
982    writer.write_varu32(entry_count as u32)?;
983    for entity_update in updates {
984        let entity_id = entity_update.id.raw();
985        for component_update in &entity_update.components {
986            let component = schema
987                .components
988                .iter()
989                .find(|component| component.id == component_update.id)
990                .ok_or(CodecError::InvalidMask {
991                    kind: MaskKind::ComponentMask,
992                    reason: MaskReason::UnknownComponent {
993                        component: component_update.id,
994                    },
995                })?;
996            writer.align_to_byte()?;
997            writer.write_varu32(entity_id)?;
998            writer.write_varu32(component.id.get() as u32)?;
999            writer.write_varu32(component_update.fields.len() as u32)?;
1000            let index_bits = required_bits(component.fields.len().saturating_sub(1) as u64);
1001            for (field_idx, value) in &component_update.fields {
1002                if index_bits > 0 {
1003                    writer.write_bits(*field_idx as u64, index_bits)?;
1004                }
1005                write_field_value_sparse(
1006                    component.id,
1007                    component.fields[*field_idx],
1008                    *value,
1009                    writer,
1010                )?;
1011            }
1012        }
1013    }
1014    writer.align_to_byte()?;
1015    Ok(())
1016}
1017
1018fn encode_destroy_body(
1019    baseline: &Snapshot,
1020    current: &Snapshot,
1021    destroy_count: usize,
1022    limits: &CodecLimits,
1023    writer: &mut BitWriter<'_>,
1024) -> CodecResult<()> {
1025    if destroy_count > limits.max_entities_destroy {
1026        return Err(CodecError::LimitsExceeded {
1027            kind: LimitKind::EntitiesDestroy,
1028            limit: limits.max_entities_destroy,
1029            actual: destroy_count,
1030        });
1031    }
1032
1033    writer.align_to_byte()?;
1034    writer.write_varu32(destroy_count as u32)?;
1035
1036    let mut i = 0usize;
1037    let mut j = 0usize;
1038    while i < baseline.entities.len() || j < current.entities.len() {
1039        let base = baseline.entities.get(i);
1040        let curr = current.entities.get(j);
1041        match (base, curr) {
1042            (Some(b), Some(c)) => {
1043                if b.id.raw() < c.id.raw() {
1044                    writer.align_to_byte()?;
1045                    writer.write_u32_aligned(b.id.raw())?;
1046                    i += 1;
1047                } else if b.id.raw() > c.id.raw() {
1048                    j += 1;
1049                } else {
1050                    i += 1;
1051                    j += 1;
1052                }
1053            }
1054            (Some(b), None) => {
1055                writer.align_to_byte()?;
1056                writer.write_u32_aligned(b.id.raw())?;
1057                i += 1;
1058            }
1059            (None, Some(_)) => {
1060                j += 1;
1061            }
1062            (None, None) => break,
1063        }
1064    }
1065
1066    writer.align_to_byte()?;
1067    Ok(())
1068}
1069
1070fn encode_create_body(
1071    schema: &schema::Schema,
1072    baseline: &Snapshot,
1073    current: &Snapshot,
1074    create_count: usize,
1075    limits: &CodecLimits,
1076    writer: &mut BitWriter<'_>,
1077) -> CodecResult<()> {
1078    if create_count > limits.max_entities_create {
1079        return Err(CodecError::LimitsExceeded {
1080            kind: LimitKind::EntitiesCreate,
1081            limit: limits.max_entities_create,
1082            actual: create_count,
1083        });
1084    }
1085
1086    writer.align_to_byte()?;
1087    writer.write_varu32(create_count as u32)?;
1088
1089    let mut i = 0usize;
1090    let mut j = 0usize;
1091    while i < baseline.entities.len() || j < current.entities.len() {
1092        let base = baseline.entities.get(i);
1093        let curr = current.entities.get(j);
1094        match (base, curr) {
1095            (Some(b), Some(c)) => {
1096                if b.id.raw() < c.id.raw() {
1097                    i += 1;
1098                } else if b.id.raw() > c.id.raw() {
1099                    write_create_entity(schema, c, limits, writer)?;
1100                    j += 1;
1101                } else {
1102                    i += 1;
1103                    j += 1;
1104                }
1105            }
1106            (Some(_), None) => {
1107                i += 1;
1108            }
1109            (None, Some(c)) => {
1110                write_create_entity(schema, c, limits, writer)?;
1111                j += 1;
1112            }
1113            (None, None) => break,
1114        }
1115    }
1116
1117    writer.align_to_byte()?;
1118    Ok(())
1119}
1120
1121fn encode_update_body_masked(
1122    schema: &schema::Schema,
1123    baseline: &Snapshot,
1124    current: &Snapshot,
1125    update_count: usize,
1126    limits: &CodecLimits,
1127    scratch: &mut CodecScratch,
1128    writer: &mut BitWriter<'_>,
1129) -> CodecResult<()> {
1130    if update_count > limits.max_entities_update {
1131        return Err(CodecError::LimitsExceeded {
1132            kind: LimitKind::EntitiesUpdate,
1133            limit: limits.max_entities_update,
1134            actual: update_count,
1135        });
1136    }
1137
1138    writer.align_to_byte()?;
1139    writer.write_varu32(update_count as u32)?;
1140
1141    let mut i = 0usize;
1142    let mut j = 0usize;
1143    while i < baseline.entities.len() || j < current.entities.len() {
1144        let base = baseline.entities.get(i);
1145        let curr = current.entities.get(j);
1146        match (base, curr) {
1147            (Some(b), Some(c)) => {
1148                if b.id.raw() < c.id.raw() {
1149                    i += 1;
1150                } else if b.id.raw() > c.id.raw() {
1151                    j += 1;
1152                } else {
1153                    if entity_has_updates(schema, b, c, limits)? {
1154                        writer.align_to_byte()?;
1155                        writer.write_u32_aligned(c.id.raw())?;
1156                        ensure_component_presence_matches(schema, b, c)?;
1157                        write_update_components(schema, b, c, limits, scratch, writer)?;
1158                    }
1159                    i += 1;
1160                    j += 1;
1161                }
1162            }
1163            (Some(_), None) => i += 1,
1164            (None, Some(_)) => j += 1,
1165            (None, None) => break,
1166        }
1167    }
1168
1169    writer.align_to_byte()?;
1170    Ok(())
1171}
1172
1173#[allow(dead_code)]
1174fn encode_update_body_sparse_varint(
1175    schema: &schema::Schema,
1176    baseline: &Snapshot,
1177    current: &Snapshot,
1178    update_count: usize,
1179    limits: &CodecLimits,
1180    scratch: &mut CodecScratch,
1181    writer: &mut BitWriter<'_>,
1182) -> CodecResult<()> {
1183    if update_count > limits.max_entities_update {
1184        return Err(CodecError::LimitsExceeded {
1185            kind: LimitKind::EntitiesUpdate,
1186            limit: limits.max_entities_update,
1187            actual: update_count,
1188        });
1189    }
1190
1191    let entry_count = count_sparse_update_entries(schema, baseline, current, limits, scratch)?;
1192    let entry_limit = limits
1193        .max_entities_update
1194        .saturating_mul(limits.max_components_per_entity);
1195    if entry_count > entry_limit {
1196        return Err(CodecError::LimitsExceeded {
1197            kind: LimitKind::EntitiesUpdate,
1198            limit: entry_limit,
1199            actual: entry_count,
1200        });
1201    }
1202
1203    writer.align_to_byte()?;
1204    writer.write_varu32(entry_count as u32)?;
1205
1206    let mut baseline_iter = baseline.entities.iter();
1207    let mut current_iter = current.entities.iter();
1208    let mut baseline_next = baseline_iter.next();
1209    let mut current_next = current_iter.next();
1210    let component_count = schema.components.len();
1211
1212    while let (Some(base), Some(curr)) = (baseline_next, current_next) {
1213        match base.id.cmp(&curr.id) {
1214            Ordering::Less => {
1215                baseline_next = baseline_iter.next();
1216            }
1217            Ordering::Greater => {
1218                current_next = current_iter.next();
1219            }
1220            Ordering::Equal => {
1221                if entity_has_updates(schema, base, curr, limits)? {
1222                    for component in &schema.components {
1223                        let base_component = find_component(base, component.id);
1224                        let curr_component = find_component(curr, component.id);
1225                        if base_component.is_some() != curr_component.is_some() {
1226                            return Err(CodecError::InvalidMask {
1227                                kind: MaskKind::ComponentMask,
1228                                reason: MaskReason::ComponentPresenceMismatch {
1229                                    component: component.id,
1230                                },
1231                            });
1232                        }
1233                        if let (Some(base_component), Some(curr_component)) =
1234                            (base_component, curr_component)
1235                        {
1236                            if base_component.fields.len() != component.fields.len()
1237                                || curr_component.fields.len() != component.fields.len()
1238                            {
1239                                return Err(CodecError::InvalidMask {
1240                                    kind: MaskKind::FieldMask {
1241                                        component: component.id,
1242                                    },
1243                                    reason: MaskReason::FieldCountMismatch {
1244                                        expected: component.fields.len(),
1245                                        actual: base_component
1246                                            .fields
1247                                            .len()
1248                                            .max(curr_component.fields.len()),
1249                                    },
1250                                });
1251                            }
1252                            if component.fields.len() > limits.max_fields_per_component {
1253                                return Err(CodecError::LimitsExceeded {
1254                                    kind: LimitKind::FieldsPerComponent,
1255                                    limit: limits.max_fields_per_component,
1256                                    actual: component.fields.len(),
1257                                });
1258                            }
1259                            let (_, field_mask) = scratch.component_and_field_masks_mut(
1260                                component_count,
1261                                component.fields.len(),
1262                            );
1263                            let field_mask = compute_field_mask_into(
1264                                component,
1265                                base_component,
1266                                curr_component,
1267                                field_mask,
1268                            )?;
1269                            let changed_fields = field_mask.iter().filter(|bit| **bit).count();
1270                            if changed_fields > 0 {
1271                                writer.align_to_byte()?;
1272                                writer.write_u32_aligned(curr.id.raw())?;
1273                                writer.write_u16_aligned(component.id.get())?;
1274                                writer.write_varu32(changed_fields as u32)?;
1275                                for (idx, field) in component.fields.iter().enumerate() {
1276                                    if field_mask[idx] {
1277                                        writer.align_to_byte()?;
1278                                        writer.write_varu32(idx as u32)?;
1279                                        write_field_value_sparse(
1280                                            component.id,
1281                                            *field,
1282                                            curr_component.fields[idx],
1283                                            writer,
1284                                        )?;
1285                                    }
1286                                }
1287                            }
1288                        }
1289                    }
1290                }
1291                baseline_next = baseline_iter.next();
1292                current_next = current_iter.next();
1293            }
1294        }
1295    }
1296
1297    writer.align_to_byte()?;
1298    Ok(())
1299}
1300
1301fn encode_update_body_sparse_packed(
1302    schema: &schema::Schema,
1303    baseline: &Snapshot,
1304    current: &Snapshot,
1305    update_count: usize,
1306    limits: &CodecLimits,
1307    scratch: &mut CodecScratch,
1308    writer: &mut BitWriter<'_>,
1309) -> CodecResult<()> {
1310    if update_count > limits.max_entities_update {
1311        return Err(CodecError::LimitsExceeded {
1312            kind: LimitKind::EntitiesUpdate,
1313            limit: limits.max_entities_update,
1314            actual: update_count,
1315        });
1316    }
1317
1318    let entry_count = count_sparse_update_entries(schema, baseline, current, limits, scratch)?;
1319    let entry_limit = limits
1320        .max_entities_update
1321        .saturating_mul(limits.max_components_per_entity);
1322    if entry_count > entry_limit {
1323        return Err(CodecError::LimitsExceeded {
1324            kind: LimitKind::EntitiesUpdate,
1325            limit: entry_limit,
1326            actual: entry_count,
1327        });
1328    }
1329
1330    writer.align_to_byte()?;
1331    writer.write_varu32(entry_count as u32)?;
1332
1333    let mut baseline_iter = baseline.entities.iter();
1334    let mut current_iter = current.entities.iter();
1335    let mut baseline_next = baseline_iter.next();
1336    let mut current_next = current_iter.next();
1337    let component_count = schema.components.len();
1338
1339    while let (Some(base), Some(curr)) = (baseline_next, current_next) {
1340        match base.id.cmp(&curr.id) {
1341            Ordering::Less => {
1342                baseline_next = baseline_iter.next();
1343            }
1344            Ordering::Greater => {
1345                current_next = current_iter.next();
1346            }
1347            Ordering::Equal => {
1348                if entity_has_updates(schema, base, curr, limits)? {
1349                    for component in &schema.components {
1350                        let base_component = find_component(base, component.id);
1351                        let curr_component = find_component(curr, component.id);
1352                        if base_component.is_some() != curr_component.is_some() {
1353                            return Err(CodecError::InvalidMask {
1354                                kind: MaskKind::ComponentMask,
1355                                reason: MaskReason::ComponentPresenceMismatch {
1356                                    component: component.id,
1357                                },
1358                            });
1359                        }
1360                        if let (Some(base_component), Some(curr_component)) =
1361                            (base_component, curr_component)
1362                        {
1363                            if base_component.fields.len() != component.fields.len()
1364                                || curr_component.fields.len() != component.fields.len()
1365                            {
1366                                return Err(CodecError::InvalidMask {
1367                                    kind: MaskKind::FieldMask {
1368                                        component: component.id,
1369                                    },
1370                                    reason: MaskReason::FieldCountMismatch {
1371                                        expected: component.fields.len(),
1372                                        actual: base_component
1373                                            .fields
1374                                            .len()
1375                                            .max(curr_component.fields.len()),
1376                                    },
1377                                });
1378                            }
1379                            if component.fields.len() > limits.max_fields_per_component {
1380                                return Err(CodecError::LimitsExceeded {
1381                                    kind: LimitKind::FieldsPerComponent,
1382                                    limit: limits.max_fields_per_component,
1383                                    actual: component.fields.len(),
1384                                });
1385                            }
1386                            let (_, field_mask) = scratch.component_and_field_masks_mut(
1387                                component_count,
1388                                component.fields.len(),
1389                            );
1390                            let field_mask = compute_field_mask_into(
1391                                component,
1392                                base_component,
1393                                curr_component,
1394                                field_mask,
1395                            )?;
1396                            let changed_fields = field_mask.iter().filter(|bit| **bit).count();
1397                            if changed_fields > 0 {
1398                                writer.align_to_byte()?;
1399                                writer.write_varu32(curr.id.raw())?;
1400                                writer.write_varu32(component.id.get() as u32)?;
1401                                writer.write_varu32(changed_fields as u32)?;
1402                                let index_bits =
1403                                    required_bits(component.fields.len().saturating_sub(1) as u64);
1404                                for (idx, field) in component.fields.iter().enumerate() {
1405                                    if field_mask[idx] {
1406                                        if index_bits > 0 {
1407                                            writer.write_bits(idx as u64, index_bits)?;
1408                                        }
1409                                        write_field_value_sparse(
1410                                            component.id,
1411                                            *field,
1412                                            curr_component.fields[idx],
1413                                            writer,
1414                                        )?;
1415                                    }
1416                                }
1417                            }
1418                        }
1419                    }
1420                }
1421                baseline_next = baseline_iter.next();
1422                current_next = current_iter.next();
1423            }
1424        }
1425    }
1426
1427    writer.align_to_byte()?;
1428    Ok(())
1429}
1430
1431fn count_sparse_update_entries(
1432    schema: &schema::Schema,
1433    baseline: &Snapshot,
1434    current: &Snapshot,
1435    limits: &CodecLimits,
1436    scratch: &mut CodecScratch,
1437) -> CodecResult<usize> {
1438    let component_count = schema.components.len();
1439    let mut count = 0usize;
1440    let mut baseline_iter = baseline.entities.iter();
1441    let mut current_iter = current.entities.iter();
1442    let mut baseline_next = baseline_iter.next();
1443    let mut current_next = current_iter.next();
1444
1445    while let (Some(base), Some(curr)) = (baseline_next, current_next) {
1446        match base.id.cmp(&curr.id) {
1447            Ordering::Less => baseline_next = baseline_iter.next(),
1448            Ordering::Greater => current_next = current_iter.next(),
1449            Ordering::Equal => {
1450                if entity_has_updates(schema, base, curr, limits)? {
1451                    for component in &schema.components {
1452                        let base_component = find_component(base, component.id);
1453                        let curr_component = find_component(curr, component.id);
1454                        if base_component.is_some() != curr_component.is_some() {
1455                            return Err(CodecError::InvalidMask {
1456                                kind: MaskKind::ComponentMask,
1457                                reason: MaskReason::ComponentPresenceMismatch {
1458                                    component: component.id,
1459                                },
1460                            });
1461                        }
1462                        if let (Some(base_component), Some(curr_component)) =
1463                            (base_component, curr_component)
1464                        {
1465                            if base_component.fields.len() != component.fields.len()
1466                                || curr_component.fields.len() != component.fields.len()
1467                            {
1468                                return Err(CodecError::InvalidMask {
1469                                    kind: MaskKind::FieldMask {
1470                                        component: component.id,
1471                                    },
1472                                    reason: MaskReason::FieldCountMismatch {
1473                                        expected: component.fields.len(),
1474                                        actual: base_component
1475                                            .fields
1476                                            .len()
1477                                            .max(curr_component.fields.len()),
1478                                    },
1479                                });
1480                            }
1481                            if component.fields.len() > limits.max_fields_per_component {
1482                                return Err(CodecError::LimitsExceeded {
1483                                    kind: LimitKind::FieldsPerComponent,
1484                                    limit: limits.max_fields_per_component,
1485                                    actual: component.fields.len(),
1486                                });
1487                            }
1488                            let (_, field_mask) = scratch.component_and_field_masks_mut(
1489                                component_count,
1490                                component.fields.len(),
1491                            );
1492                            let field_mask = compute_field_mask_into(
1493                                component,
1494                                base_component,
1495                                curr_component,
1496                                field_mask,
1497                            )?;
1498                            if field_mask.iter().any(|bit| *bit) {
1499                                count += 1;
1500                            }
1501                        }
1502                    }
1503                }
1504                baseline_next = baseline_iter.next();
1505                current_next = current_iter.next();
1506            }
1507        }
1508    }
1509
1510    Ok(count)
1511}
1512
1513fn write_create_entity(
1514    schema: &schema::Schema,
1515    entity: &EntitySnapshot,
1516    limits: &CodecLimits,
1517    writer: &mut BitWriter<'_>,
1518) -> CodecResult<()> {
1519    writer.align_to_byte()?;
1520    writer.write_u32_aligned(entity.id.raw())?;
1521    ensure_known_components(schema, entity)?;
1522    write_component_mask(schema, entity, writer)?;
1523    for component in schema.components.iter() {
1524        if let Some(snapshot) = find_component(entity, component.id) {
1525            write_full_component(component, snapshot, limits, writer)?;
1526        }
1527    }
1528    Ok(())
1529}
1530
1531fn decode_destroy_section(body: &[u8], limits: &CodecLimits) -> CodecResult<Vec<EntityId>> {
1532    if body.len() > limits.max_section_bytes {
1533        return Err(CodecError::LimitsExceeded {
1534            kind: LimitKind::SectionBytes,
1535            limit: limits.max_section_bytes,
1536            actual: body.len(),
1537        });
1538    }
1539
1540    let mut reader = BitReader::new(body);
1541    reader.align_to_byte()?;
1542    let count = reader.read_varu32()? as usize;
1543    if count > limits.max_entities_destroy {
1544        return Err(CodecError::LimitsExceeded {
1545            kind: LimitKind::EntitiesDestroy,
1546            limit: limits.max_entities_destroy,
1547            actual: count,
1548        });
1549    }
1550
1551    let mut ids = Vec::with_capacity(count);
1552    let mut prev: Option<u32> = None;
1553    for _ in 0..count {
1554        reader.align_to_byte()?;
1555        let id = reader.read_u32_aligned()?;
1556        if let Some(prev_id) = prev {
1557            if id <= prev_id {
1558                return Err(CodecError::InvalidEntityOrder {
1559                    previous: prev_id,
1560                    current: id,
1561                });
1562            }
1563        }
1564        prev = Some(id);
1565        ids.push(EntityId::new(id));
1566    }
1567    reader.align_to_byte()?;
1568    if reader.bits_remaining() != 0 {
1569        return Err(CodecError::TrailingSectionData {
1570            section: SectionTag::EntityDestroy,
1571            remaining_bits: reader.bits_remaining(),
1572        });
1573    }
1574    Ok(ids)
1575}
1576
1577fn decode_create_section(
1578    schema: &schema::Schema,
1579    body: &[u8],
1580    limits: &CodecLimits,
1581) -> CodecResult<Vec<EntitySnapshot>> {
1582    if body.len() > limits.max_section_bytes {
1583        return Err(CodecError::LimitsExceeded {
1584            kind: LimitKind::SectionBytes,
1585            limit: limits.max_section_bytes,
1586            actual: body.len(),
1587        });
1588    }
1589
1590    let mut reader = BitReader::new(body);
1591    reader.align_to_byte()?;
1592    let count = reader.read_varu32()? as usize;
1593    if count > limits.max_entities_create {
1594        return Err(CodecError::LimitsExceeded {
1595            kind: LimitKind::EntitiesCreate,
1596            limit: limits.max_entities_create,
1597            actual: count,
1598        });
1599    }
1600
1601    let mut entities = Vec::with_capacity(count);
1602    let mut prev: Option<u32> = None;
1603    for _ in 0..count {
1604        reader.align_to_byte()?;
1605        let id = reader.read_u32_aligned()?;
1606        if let Some(prev_id) = prev {
1607            if id <= prev_id {
1608                return Err(CodecError::InvalidEntityOrder {
1609                    previous: prev_id,
1610                    current: id,
1611                });
1612            }
1613        }
1614        prev = Some(id);
1615
1616        let component_mask = read_mask(
1617            &mut reader,
1618            schema.components.len(),
1619            MaskKind::ComponentMask,
1620        )?;
1621
1622        let mut components = Vec::new();
1623        for (idx, component) in schema.components.iter().enumerate() {
1624            if component_mask[idx] {
1625                let fields = decode_full_component(component, &mut reader, limits)?;
1626                components.push(ComponentSnapshot {
1627                    id: component.id,
1628                    fields,
1629                });
1630            }
1631        }
1632
1633        let entity = EntitySnapshot {
1634            id: EntityId::new(id),
1635            components,
1636        };
1637        ensure_known_components(schema, &entity)?;
1638        entities.push(entity);
1639    }
1640
1641    reader.align_to_byte()?;
1642    if reader.bits_remaining() != 0 {
1643        return Err(CodecError::TrailingSectionData {
1644            section: SectionTag::EntityCreate,
1645            remaining_bits: reader.bits_remaining(),
1646        });
1647    }
1648    Ok(entities)
1649}
1650
1651fn decode_update_section_masked(
1652    schema: &schema::Schema,
1653    body: &[u8],
1654    limits: &CodecLimits,
1655) -> CodecResult<Vec<DeltaUpdateEntity>> {
1656    if body.len() > limits.max_section_bytes {
1657        return Err(CodecError::LimitsExceeded {
1658            kind: LimitKind::SectionBytes,
1659            limit: limits.max_section_bytes,
1660            actual: body.len(),
1661        });
1662    }
1663
1664    let mut reader = BitReader::new(body);
1665    reader.align_to_byte()?;
1666    let count = reader.read_varu32()? as usize;
1667    if count > limits.max_entities_update {
1668        return Err(CodecError::LimitsExceeded {
1669            kind: LimitKind::EntitiesUpdate,
1670            limit: limits.max_entities_update,
1671            actual: count,
1672        });
1673    }
1674
1675    let mut updates = Vec::with_capacity(count);
1676    let mut prev: Option<u32> = None;
1677    for _ in 0..count {
1678        reader.align_to_byte()?;
1679        let id = reader.read_u32_aligned()?;
1680        if let Some(prev_id) = prev {
1681            if id <= prev_id {
1682                return Err(CodecError::InvalidEntityOrder {
1683                    previous: prev_id,
1684                    current: id,
1685                });
1686            }
1687        }
1688        prev = Some(id);
1689
1690        let component_mask = read_mask(
1691            &mut reader,
1692            schema.components.len(),
1693            MaskKind::ComponentMask,
1694        )?;
1695        let mut components = Vec::new();
1696        for (idx, component) in schema.components.iter().enumerate() {
1697            if component_mask[idx] {
1698                let fields = decode_update_component(component, &mut reader, limits)?;
1699                components.push(DeltaUpdateComponent {
1700                    id: component.id,
1701                    fields,
1702                });
1703            }
1704        }
1705
1706        updates.push(DeltaUpdateEntity {
1707            id: EntityId::new(id),
1708            components,
1709        });
1710    }
1711
1712    reader.align_to_byte()?;
1713    if reader.bits_remaining() != 0 {
1714        return Err(CodecError::TrailingSectionData {
1715            section: SectionTag::EntityUpdate,
1716            remaining_bits: reader.bits_remaining(),
1717        });
1718    }
1719    Ok(updates)
1720}
1721
1722fn decode_update_section_sparse_varint(
1723    schema: &schema::Schema,
1724    body: &[u8],
1725    limits: &CodecLimits,
1726) -> CodecResult<Vec<DeltaUpdateEntity>> {
1727    if body.len() > limits.max_section_bytes {
1728        return Err(CodecError::LimitsExceeded {
1729            kind: LimitKind::SectionBytes,
1730            limit: limits.max_section_bytes,
1731            actual: body.len(),
1732        });
1733    }
1734
1735    let mut reader = BitReader::new(body);
1736    reader.align_to_byte()?;
1737    let entry_count = reader.read_varu32()? as usize;
1738    let entry_limit = limits
1739        .max_entities_update
1740        .saturating_mul(limits.max_components_per_entity);
1741    if entry_count > entry_limit {
1742        return Err(CodecError::LimitsExceeded {
1743            kind: LimitKind::EntitiesUpdate,
1744            limit: entry_limit,
1745            actual: entry_count,
1746        });
1747    }
1748
1749    let mut updates: Vec<DeltaUpdateEntity> = Vec::new();
1750    let mut prev_entity: Option<u32> = None;
1751    let mut prev_component: Option<u16> = None;
1752    for _ in 0..entry_count {
1753        reader.align_to_byte()?;
1754        let entity_id = reader.read_u32_aligned()?;
1755        let component_raw = reader.read_u16_aligned()?;
1756        let component_id = ComponentId::new(component_raw).ok_or(CodecError::InvalidMask {
1757            kind: MaskKind::ComponentMask,
1758            reason: MaskReason::InvalidComponentId { raw: component_raw },
1759        })?;
1760        if let Some(prev) = prev_entity {
1761            if entity_id < prev {
1762                return Err(CodecError::InvalidEntityOrder {
1763                    previous: prev,
1764                    current: entity_id,
1765                });
1766            }
1767            if entity_id == prev {
1768                if let Some(prev_component) = prev_component {
1769                    if component_raw <= prev_component {
1770                        return Err(CodecError::InvalidEntityOrder {
1771                            previous: prev,
1772                            current: entity_id,
1773                        });
1774                    }
1775                }
1776            }
1777        }
1778        prev_entity = Some(entity_id);
1779        prev_component = Some(component_raw);
1780
1781        let component = schema
1782            .components
1783            .iter()
1784            .find(|component| component.id == component_id)
1785            .ok_or(CodecError::InvalidMask {
1786                kind: MaskKind::ComponentMask,
1787                reason: MaskReason::UnknownComponent {
1788                    component: component_id,
1789                },
1790            })?;
1791
1792        let field_count = reader.read_varu32()? as usize;
1793        if field_count == 0 {
1794            return Err(CodecError::InvalidMask {
1795                kind: MaskKind::FieldMask {
1796                    component: component.id,
1797                },
1798                reason: MaskReason::EmptyFieldMask {
1799                    component: component.id,
1800                },
1801            });
1802        }
1803        if field_count > limits.max_fields_per_component {
1804            return Err(CodecError::LimitsExceeded {
1805                kind: LimitKind::FieldsPerComponent,
1806                limit: limits.max_fields_per_component,
1807                actual: field_count,
1808            });
1809        }
1810        if field_count > component.fields.len() {
1811            return Err(CodecError::InvalidMask {
1812                kind: MaskKind::FieldMask {
1813                    component: component.id,
1814                },
1815                reason: MaskReason::FieldCountMismatch {
1816                    expected: component.fields.len(),
1817                    actual: field_count,
1818                },
1819            });
1820        }
1821
1822        let mut fields = Vec::with_capacity(field_count);
1823        let mut prev_index: Option<usize> = None;
1824        for _ in 0..field_count {
1825            reader.align_to_byte()?;
1826            let field_index = reader.read_varu32()? as usize;
1827            if field_index >= component.fields.len() {
1828                return Err(CodecError::InvalidMask {
1829                    kind: MaskKind::FieldMask {
1830                        component: component.id,
1831                    },
1832                    reason: MaskReason::InvalidFieldIndex {
1833                        field_index,
1834                        max: component.fields.len(),
1835                    },
1836                });
1837            }
1838            if let Some(prev_index) = prev_index {
1839                if field_index <= prev_index {
1840                    return Err(CodecError::InvalidMask {
1841                        kind: MaskKind::FieldMask {
1842                            component: component.id,
1843                        },
1844                        reason: MaskReason::InvalidFieldIndex {
1845                            field_index,
1846                            max: component.fields.len(),
1847                        },
1848                    });
1849                }
1850            }
1851            prev_index = Some(field_index);
1852            let field = component.fields[field_index];
1853            let value = read_field_value_sparse(component.id, field, &mut reader)?;
1854            fields.push((field_index, value));
1855        }
1856
1857        if let Some(last) = updates.last_mut() {
1858            if last.id.raw() == entity_id {
1859                last.components.push(DeltaUpdateComponent {
1860                    id: component.id,
1861                    fields,
1862                });
1863                continue;
1864            }
1865        }
1866
1867        updates.push(DeltaUpdateEntity {
1868            id: EntityId::new(entity_id),
1869            components: vec![DeltaUpdateComponent {
1870                id: component.id,
1871                fields,
1872            }],
1873        });
1874    }
1875
1876    reader.align_to_byte()?;
1877    if reader.bits_remaining() != 0 {
1878        return Err(CodecError::TrailingSectionData {
1879            section: SectionTag::EntityUpdateSparse,
1880            remaining_bits: reader.bits_remaining(),
1881        });
1882    }
1883
1884    let unique_entities = updates.len();
1885    if unique_entities > limits.max_entities_update {
1886        return Err(CodecError::LimitsExceeded {
1887            kind: LimitKind::EntitiesUpdate,
1888            limit: limits.max_entities_update,
1889            actual: unique_entities,
1890        });
1891    }
1892
1893    Ok(updates)
1894}
1895
1896fn decode_update_section_sparse_packed(
1897    schema: &schema::Schema,
1898    body: &[u8],
1899    limits: &CodecLimits,
1900) -> CodecResult<Vec<DeltaUpdateEntity>> {
1901    if body.len() > limits.max_section_bytes {
1902        return Err(CodecError::LimitsExceeded {
1903            kind: LimitKind::SectionBytes,
1904            limit: limits.max_section_bytes,
1905            actual: body.len(),
1906        });
1907    }
1908
1909    let mut reader = BitReader::new(body);
1910    reader.align_to_byte()?;
1911    let entry_count = reader.read_varu32()? as usize;
1912    let entry_limit = limits
1913        .max_entities_update
1914        .saturating_mul(limits.max_components_per_entity);
1915    if entry_count > entry_limit {
1916        return Err(CodecError::LimitsExceeded {
1917            kind: LimitKind::EntitiesUpdate,
1918            limit: entry_limit,
1919            actual: entry_count,
1920        });
1921    }
1922
1923    let mut updates: Vec<DeltaUpdateEntity> = Vec::new();
1924    let mut prev_entity: Option<u32> = None;
1925    let mut prev_component: Option<u16> = None;
1926    for _ in 0..entry_count {
1927        reader.align_to_byte()?;
1928        let entity_id = reader.read_varu32()?;
1929        let component_raw = reader.read_varu32()?;
1930        if component_raw > u16::MAX as u32 {
1931            return Err(CodecError::InvalidMask {
1932                kind: MaskKind::ComponentMask,
1933                reason: MaskReason::InvalidComponentId { raw: u16::MAX },
1934            });
1935        }
1936        let component_raw = component_raw as u16;
1937        let component_id = ComponentId::new(component_raw).ok_or(CodecError::InvalidMask {
1938            kind: MaskKind::ComponentMask,
1939            reason: MaskReason::InvalidComponentId { raw: component_raw },
1940        })?;
1941        if let Some(prev) = prev_entity {
1942            if entity_id < prev {
1943                return Err(CodecError::InvalidEntityOrder {
1944                    previous: prev,
1945                    current: entity_id,
1946                });
1947            }
1948            if entity_id == prev {
1949                if let Some(prev_component) = prev_component {
1950                    if component_raw <= prev_component {
1951                        return Err(CodecError::InvalidEntityOrder {
1952                            previous: prev,
1953                            current: entity_id,
1954                        });
1955                    }
1956                }
1957            }
1958        }
1959        prev_entity = Some(entity_id);
1960        prev_component = Some(component_raw);
1961
1962        let component = schema
1963            .components
1964            .iter()
1965            .find(|component| component.id == component_id)
1966            .ok_or(CodecError::InvalidMask {
1967                kind: MaskKind::ComponentMask,
1968                reason: MaskReason::UnknownComponent {
1969                    component: component_id,
1970                },
1971            })?;
1972
1973        let field_count = reader.read_varu32()? as usize;
1974        if field_count == 0 {
1975            return Err(CodecError::InvalidMask {
1976                kind: MaskKind::FieldMask {
1977                    component: component.id,
1978                },
1979                reason: MaskReason::EmptyFieldMask {
1980                    component: component.id,
1981                },
1982            });
1983        }
1984        if field_count > limits.max_fields_per_component {
1985            return Err(CodecError::LimitsExceeded {
1986                kind: LimitKind::FieldsPerComponent,
1987                limit: limits.max_fields_per_component,
1988                actual: field_count,
1989            });
1990        }
1991        if field_count > component.fields.len() {
1992            return Err(CodecError::InvalidMask {
1993                kind: MaskKind::FieldMask {
1994                    component: component.id,
1995                },
1996                reason: MaskReason::FieldCountMismatch {
1997                    expected: component.fields.len(),
1998                    actual: field_count,
1999                },
2000            });
2001        }
2002
2003        let index_bits = required_bits(component.fields.len().saturating_sub(1) as u64);
2004        let mut fields = Vec::with_capacity(field_count);
2005        let mut prev_index: Option<usize> = None;
2006        for _ in 0..field_count {
2007            let field_index = if index_bits == 0 {
2008                0
2009            } else {
2010                reader.read_bits(index_bits)? as usize
2011            };
2012            if field_index >= component.fields.len() {
2013                return Err(CodecError::InvalidMask {
2014                    kind: MaskKind::FieldMask {
2015                        component: component.id,
2016                    },
2017                    reason: MaskReason::InvalidFieldIndex {
2018                        field_index,
2019                        max: component.fields.len(),
2020                    },
2021                });
2022            }
2023            if let Some(prev_index) = prev_index {
2024                if field_index <= prev_index {
2025                    return Err(CodecError::InvalidMask {
2026                        kind: MaskKind::FieldMask {
2027                            component: component.id,
2028                        },
2029                        reason: MaskReason::InvalidFieldIndex {
2030                            field_index,
2031                            max: component.fields.len(),
2032                        },
2033                    });
2034                }
2035            }
2036            prev_index = Some(field_index);
2037            let field = component.fields[field_index];
2038            let value = read_field_value_sparse(component.id, field, &mut reader)?;
2039            fields.push((field_index, value));
2040        }
2041
2042        if let Some(last) = updates.last_mut() {
2043            if last.id.raw() == entity_id {
2044                last.components.push(DeltaUpdateComponent {
2045                    id: component.id,
2046                    fields,
2047                });
2048                continue;
2049            }
2050        }
2051
2052        updates.push(DeltaUpdateEntity {
2053            id: EntityId::new(entity_id),
2054            components: vec![DeltaUpdateComponent {
2055                id: component.id,
2056                fields,
2057            }],
2058        });
2059    }
2060
2061    reader.align_to_byte()?;
2062    if reader.bits_remaining() != 0 {
2063        return Err(CodecError::TrailingSectionData {
2064            section: SectionTag::EntityUpdateSparsePacked,
2065            remaining_bits: reader.bits_remaining(),
2066        });
2067    }
2068
2069    let unique_entities = updates.len();
2070    if unique_entities > limits.max_entities_update {
2071        return Err(CodecError::LimitsExceeded {
2072            kind: LimitKind::EntitiesUpdate,
2073            limit: limits.max_entities_update,
2074            actual: unique_entities,
2075        });
2076    }
2077
2078    Ok(updates)
2079}
2080
2081fn decode_delta_sections(
2082    schema: &schema::Schema,
2083    packet: &WirePacket<'_>,
2084    limits: &CodecLimits,
2085) -> CodecResult<(Vec<EntityId>, Vec<EntitySnapshot>, Vec<DeltaUpdateEntity>)> {
2086    let mut destroys: Option<Vec<EntityId>> = None;
2087    let mut creates: Option<Vec<EntitySnapshot>> = None;
2088    let mut updates_masked: Option<Vec<DeltaUpdateEntity>> = None;
2089    let mut updates_sparse: Option<Vec<DeltaUpdateEntity>> = None;
2090
2091    for section in &packet.sections {
2092        match section.tag {
2093            SectionTag::EntityDestroy => {
2094                if destroys.is_some() {
2095                    return Err(CodecError::DuplicateSection {
2096                        section: section.tag,
2097                    });
2098                }
2099                destroys = Some(decode_destroy_section(section.body, limits)?);
2100            }
2101            SectionTag::EntityCreate => {
2102                if creates.is_some() {
2103                    return Err(CodecError::DuplicateSection {
2104                        section: section.tag,
2105                    });
2106                }
2107                creates = Some(decode_create_section(schema, section.body, limits)?);
2108            }
2109            SectionTag::EntityUpdate => {
2110                if updates_masked.is_some() {
2111                    return Err(CodecError::DuplicateSection {
2112                        section: section.tag,
2113                    });
2114                }
2115                updates_masked = Some(decode_update_section_masked(schema, section.body, limits)?);
2116            }
2117            SectionTag::EntityUpdateSparse => {
2118                if updates_sparse.is_some() {
2119                    return Err(CodecError::DuplicateSection {
2120                        section: section.tag,
2121                    });
2122                }
2123                updates_sparse = Some(decode_update_section_sparse_varint(
2124                    schema,
2125                    section.body,
2126                    limits,
2127                )?);
2128            }
2129            SectionTag::EntityUpdateSparsePacked => {
2130                if updates_sparse.is_some() {
2131                    return Err(CodecError::DuplicateSection {
2132                        section: section.tag,
2133                    });
2134                }
2135                updates_sparse = Some(decode_update_section_sparse_packed(
2136                    schema,
2137                    section.body,
2138                    limits,
2139                )?);
2140            }
2141            _ => {
2142                return Err(CodecError::UnexpectedSection {
2143                    section: section.tag,
2144                });
2145            }
2146        }
2147    }
2148
2149    Ok((
2150        destroys.unwrap_or_default(),
2151        creates.unwrap_or_default(),
2152        match (updates_masked, updates_sparse) {
2153            (Some(_), Some(_)) => return Err(CodecError::DuplicateUpdateEncoding),
2154            (Some(updates), None) => updates,
2155            (None, Some(updates)) => updates,
2156            (None, None) => Vec::new(),
2157        },
2158    ))
2159}
2160
2161#[derive(Debug, Clone, PartialEq, Eq)]
2162pub struct DeltaDecoded {
2163    pub tick: SnapshotTick,
2164    pub baseline_tick: SnapshotTick,
2165    pub destroys: Vec<EntityId>,
2166    pub creates: Vec<EntitySnapshot>,
2167    pub updates: Vec<DeltaUpdateEntity>,
2168}
2169
2170#[derive(Debug, Clone, PartialEq, Eq)]
2171pub struct DeltaUpdateEntity {
2172    pub id: EntityId,
2173    pub components: Vec<DeltaUpdateComponent>,
2174}
2175
2176#[derive(Debug, Clone, PartialEq, Eq)]
2177pub struct DeltaUpdateComponent {
2178    pub id: ComponentId,
2179    pub fields: Vec<(usize, FieldValue)>,
2180}
2181
2182fn apply_destroys(
2183    baseline: &[EntitySnapshot],
2184    destroys: &[EntityId],
2185) -> CodecResult<Vec<EntitySnapshot>> {
2186    let mut result = Vec::with_capacity(baseline.len());
2187    let mut i = 0usize;
2188    let mut j = 0usize;
2189    while i < baseline.len() || j < destroys.len() {
2190        let base = baseline.get(i);
2191        let destroy = destroys.get(j);
2192        match (base, destroy) {
2193            (Some(b), Some(d)) => {
2194                if b.id.raw() < d.raw() {
2195                    result.push(b.clone());
2196                    i += 1;
2197                } else if b.id.raw() > d.raw() {
2198                    return Err(CodecError::EntityNotFound { entity_id: d.raw() });
2199                } else {
2200                    i += 1;
2201                    j += 1;
2202                }
2203            }
2204            (Some(b), None) => {
2205                result.push(b.clone());
2206                i += 1;
2207            }
2208            (None, Some(d)) => {
2209                return Err(CodecError::EntityNotFound { entity_id: d.raw() });
2210            }
2211            (None, None) => break,
2212        }
2213    }
2214    Ok(result)
2215}
2216
2217fn apply_creates(
2218    baseline: Vec<EntitySnapshot>,
2219    creates: Vec<EntitySnapshot>,
2220) -> CodecResult<Vec<EntitySnapshot>> {
2221    let mut result = Vec::with_capacity(baseline.len() + creates.len());
2222    let mut i = 0usize;
2223    let mut j = 0usize;
2224    while i < baseline.len() || j < creates.len() {
2225        let base = baseline.get(i);
2226        let create = creates.get(j);
2227        match (base, create) {
2228            (Some(b), Some(c)) => {
2229                if b.id.raw() < c.id.raw() {
2230                    result.push(b.clone());
2231                    i += 1;
2232                } else if b.id.raw() > c.id.raw() {
2233                    result.push(c.clone());
2234                    j += 1;
2235                } else {
2236                    return Err(CodecError::EntityAlreadyExists {
2237                        entity_id: c.id.raw(),
2238                    });
2239                }
2240            }
2241            (Some(b), None) => {
2242                result.push(b.clone());
2243                i += 1;
2244            }
2245            (None, Some(c)) => {
2246                result.push(c.clone());
2247                j += 1;
2248            }
2249            (None, None) => break,
2250        }
2251    }
2252    Ok(result)
2253}
2254
2255fn apply_updates(
2256    entities: &mut [EntitySnapshot],
2257    updates: &[DeltaUpdateEntity],
2258) -> CodecResult<()> {
2259    for update in updates {
2260        let idx = entities
2261            .binary_search_by_key(&update.id.raw(), |e| e.id.raw())
2262            .map_err(|_| CodecError::EntityNotFound {
2263                entity_id: update.id.raw(),
2264            })?;
2265        let entity = &mut entities[idx];
2266        for component_update in &update.components {
2267            let component = entity
2268                .components
2269                .iter_mut()
2270                .find(|c| c.id == component_update.id)
2271                .ok_or_else(|| CodecError::ComponentNotFound {
2272                    entity_id: update.id.raw(),
2273                    component_id: component_update.id.get(),
2274                })?;
2275            for (field_idx, value) in &component_update.fields {
2276                if *field_idx >= component.fields.len() {
2277                    return Err(CodecError::InvalidMask {
2278                        kind: MaskKind::FieldMask {
2279                            component: component_update.id,
2280                        },
2281                        reason: MaskReason::FieldCountMismatch {
2282                            expected: component.fields.len(),
2283                            actual: *field_idx + 1,
2284                        },
2285                    });
2286                }
2287                component.fields[*field_idx] = *value;
2288            }
2289        }
2290    }
2291    Ok(())
2292}
2293
2294fn ensure_entities_sorted(entities: &[EntitySnapshot]) -> CodecResult<()> {
2295    let mut prev: Option<u32> = None;
2296    for entity in entities {
2297        if let Some(prev_id) = prev {
2298            if entity.id.raw() <= prev_id {
2299                return Err(CodecError::InvalidEntityOrder {
2300                    previous: prev_id,
2301                    current: entity.id.raw(),
2302                });
2303            }
2304        }
2305        prev = Some(entity.id.raw());
2306    }
2307    Ok(())
2308}
2309
2310fn write_component_mask(
2311    schema: &schema::Schema,
2312    entity: &EntitySnapshot,
2313    writer: &mut BitWriter<'_>,
2314) -> CodecResult<()> {
2315    for component in &schema.components {
2316        let present = find_component(entity, component.id).is_some();
2317        writer.write_bit(present)?;
2318    }
2319    Ok(())
2320}
2321
2322fn write_full_component(
2323    component: &ComponentDef,
2324    snapshot: &ComponentSnapshot,
2325    limits: &CodecLimits,
2326    writer: &mut BitWriter<'_>,
2327) -> CodecResult<()> {
2328    if snapshot.fields.len() != component.fields.len() {
2329        return Err(CodecError::InvalidMask {
2330            kind: MaskKind::FieldMask {
2331                component: component.id,
2332            },
2333            reason: MaskReason::FieldCountMismatch {
2334                expected: component.fields.len(),
2335                actual: snapshot.fields.len(),
2336            },
2337        });
2338    }
2339    if snapshot.fields.len() > limits.max_fields_per_component {
2340        return Err(CodecError::LimitsExceeded {
2341            kind: LimitKind::FieldsPerComponent,
2342            limit: limits.max_fields_per_component,
2343            actual: snapshot.fields.len(),
2344        });
2345    }
2346
2347    for _ in &component.fields {
2348        writer.write_bit(true)?;
2349    }
2350    for (field, value) in component.fields.iter().zip(snapshot.fields.iter()) {
2351        write_field_value(component.id, *field, *value, writer)?;
2352    }
2353    Ok(())
2354}
2355
2356fn decode_full_component(
2357    component: &ComponentDef,
2358    reader: &mut BitReader<'_>,
2359    limits: &CodecLimits,
2360) -> CodecResult<Vec<FieldValue>> {
2361    if component.fields.len() > limits.max_fields_per_component {
2362        return Err(CodecError::LimitsExceeded {
2363            kind: LimitKind::FieldsPerComponent,
2364            limit: limits.max_fields_per_component,
2365            actual: component.fields.len(),
2366        });
2367    }
2368
2369    let mask = read_mask(
2370        reader,
2371        component.fields.len(),
2372        MaskKind::FieldMask {
2373            component: component.id,
2374        },
2375    )?;
2376    let mut values = Vec::with_capacity(component.fields.len());
2377    for (idx, field) in component.fields.iter().enumerate() {
2378        if !mask[idx] {
2379            return Err(CodecError::InvalidMask {
2380                kind: MaskKind::FieldMask {
2381                    component: component.id,
2382                },
2383                reason: MaskReason::MissingField { field: field.id },
2384            });
2385        }
2386        values.push(read_field_value(component.id, *field, reader)?);
2387    }
2388    Ok(values)
2389}
2390
2391fn write_update_components(
2392    schema: &schema::Schema,
2393    baseline: &EntitySnapshot,
2394    current: &EntitySnapshot,
2395    limits: &CodecLimits,
2396    scratch: &mut CodecScratch,
2397    writer: &mut BitWriter<'_>,
2398) -> CodecResult<()> {
2399    let component_count = schema.components.len();
2400    let (component_changed, _) = scratch.component_and_field_masks_mut(component_count, 0);
2401    component_changed.fill(false);
2402    for (idx, component) in schema.components.iter().enumerate() {
2403        let base = find_component(baseline, component.id);
2404        let curr = find_component(current, component.id);
2405        if base.is_some() != curr.is_some() {
2406            return Err(CodecError::InvalidMask {
2407                kind: MaskKind::ComponentMask,
2408                reason: MaskReason::ComponentPresenceMismatch {
2409                    component: component.id,
2410                },
2411            });
2412        }
2413        if let (Some(base), Some(curr)) = (base, curr) {
2414            if base.fields.len() != component.fields.len()
2415                || curr.fields.len() != component.fields.len()
2416            {
2417                return Err(CodecError::InvalidMask {
2418                    kind: MaskKind::FieldMask {
2419                        component: component.id,
2420                    },
2421                    reason: MaskReason::FieldCountMismatch {
2422                        expected: component.fields.len(),
2423                        actual: base.fields.len().max(curr.fields.len()),
2424                    },
2425                });
2426            }
2427            if component.fields.len() > limits.max_fields_per_component {
2428                return Err(CodecError::LimitsExceeded {
2429                    kind: LimitKind::FieldsPerComponent,
2430                    limit: limits.max_fields_per_component,
2431                    actual: component.fields.len(),
2432                });
2433            }
2434            let (component_changed, field_mask) =
2435                scratch.component_and_field_masks_mut(component_count, component.fields.len());
2436            let any_changed = compute_field_mask_into(component, base, curr, field_mask)?
2437                .iter()
2438                .any(|b| *b);
2439            writer.write_bit(any_changed)?;
2440            if any_changed {
2441                component_changed[idx] = true;
2442            }
2443        } else {
2444            writer.write_bit(false)?;
2445        }
2446    }
2447
2448    for (idx, component) in schema.components.iter().enumerate() {
2449        let (base, curr) = match (
2450            find_component(baseline, component.id),
2451            find_component(current, component.id),
2452        ) {
2453            (Some(base), Some(curr)) => (base, curr),
2454            _ => continue,
2455        };
2456        if component.fields.len() > limits.max_fields_per_component {
2457            return Err(CodecError::LimitsExceeded {
2458                kind: LimitKind::FieldsPerComponent,
2459                limit: limits.max_fields_per_component,
2460                actual: component.fields.len(),
2461            });
2462        }
2463        let (component_changed, field_mask) =
2464            scratch.component_and_field_masks_mut(component_count, component.fields.len());
2465        if component_changed[idx] {
2466            let field_mask = compute_field_mask_into(component, base, curr, field_mask)?;
2467            for bit in field_mask {
2468                writer.write_bit(*bit)?;
2469            }
2470            for (((field, _base_val), curr_val), changed) in component
2471                .fields
2472                .iter()
2473                .zip(base.fields.iter())
2474                .zip(curr.fields.iter())
2475                .zip(field_mask.iter())
2476            {
2477                if *changed {
2478                    write_field_value(component.id, *field, *curr_val, writer)?;
2479                }
2480            }
2481        }
2482    }
2483    Ok(())
2484}
2485
2486fn decode_update_component(
2487    component: &ComponentDef,
2488    reader: &mut BitReader<'_>,
2489    limits: &CodecLimits,
2490) -> CodecResult<Vec<(usize, FieldValue)>> {
2491    if component.fields.len() > limits.max_fields_per_component {
2492        return Err(CodecError::LimitsExceeded {
2493            kind: LimitKind::FieldsPerComponent,
2494            limit: limits.max_fields_per_component,
2495            actual: component.fields.len(),
2496        });
2497    }
2498    let mask = read_mask(
2499        reader,
2500        component.fields.len(),
2501        MaskKind::FieldMask {
2502            component: component.id,
2503        },
2504    )?;
2505    if !mask.iter().any(|b| *b) {
2506        return Err(CodecError::InvalidMask {
2507            kind: MaskKind::FieldMask {
2508                component: component.id,
2509            },
2510            reason: MaskReason::EmptyFieldMask {
2511                component: component.id,
2512            },
2513        });
2514    }
2515    let mut fields = Vec::new();
2516    for (idx, field) in component.fields.iter().enumerate() {
2517        if mask[idx] {
2518            let value = read_field_value(component.id, *field, reader)?;
2519            fields.push((idx, value));
2520        }
2521    }
2522    Ok(fields)
2523}
2524
2525fn compute_field_mask_into<'a>(
2526    component: &ComponentDef,
2527    baseline: &ComponentSnapshot,
2528    current: &ComponentSnapshot,
2529    field_mask: &'a mut [bool],
2530) -> CodecResult<&'a [bool]> {
2531    for (((field, base_val), curr_val), slot) in component
2532        .fields
2533        .iter()
2534        .zip(baseline.fields.iter())
2535        .zip(current.fields.iter())
2536        .zip(field_mask.iter_mut())
2537    {
2538        *slot = field_changed(component.id, *field, *base_val, *curr_val)?;
2539    }
2540    Ok(field_mask)
2541}
2542
2543fn field_changed(
2544    component_id: ComponentId,
2545    field: FieldDef,
2546    baseline: FieldValue,
2547    current: FieldValue,
2548) -> CodecResult<bool> {
2549    match field.change {
2550        ChangePolicy::Always => field_differs(component_id, field, baseline, current),
2551        ChangePolicy::Threshold { threshold_q } => {
2552            field_exceeds_threshold(component_id, field, baseline, current, threshold_q)
2553        }
2554    }
2555}
2556
2557fn field_differs(
2558    component_id: ComponentId,
2559    field: FieldDef,
2560    baseline: FieldValue,
2561    current: FieldValue,
2562) -> CodecResult<bool> {
2563    match (baseline, current) {
2564        (FieldValue::Bool(a), FieldValue::Bool(b)) => Ok(a != b),
2565        (FieldValue::UInt(a), FieldValue::UInt(b)) => Ok(a != b),
2566        (FieldValue::SInt(a), FieldValue::SInt(b)) => Ok(a != b),
2567        (FieldValue::VarUInt(a), FieldValue::VarUInt(b)) => Ok(a != b),
2568        (FieldValue::VarSInt(a), FieldValue::VarSInt(b)) => Ok(a != b),
2569        (FieldValue::FixedPoint(a), FieldValue::FixedPoint(b)) => Ok(a != b),
2570        _ => Err(CodecError::InvalidValue {
2571            component: component_id,
2572            field: field.id,
2573            reason: ValueReason::TypeMismatch {
2574                expected: codec_name(field.codec),
2575                found: value_name(current),
2576            },
2577        }),
2578    }
2579}
2580
2581fn field_exceeds_threshold(
2582    component_id: ComponentId,
2583    field: FieldDef,
2584    baseline: FieldValue,
2585    current: FieldValue,
2586    threshold_q: u32,
2587) -> CodecResult<bool> {
2588    let threshold_q = threshold_q as u64;
2589    match (baseline, current) {
2590        (FieldValue::FixedPoint(a), FieldValue::FixedPoint(b)) => {
2591            Ok((a - b).unsigned_abs() > threshold_q)
2592        }
2593        (FieldValue::UInt(a), FieldValue::UInt(b)) => Ok(a.abs_diff(b) > threshold_q),
2594        (FieldValue::SInt(a), FieldValue::SInt(b)) => Ok((a - b).unsigned_abs() > threshold_q),
2595        (FieldValue::VarUInt(a), FieldValue::VarUInt(b)) => Ok(a.abs_diff(b) > threshold_q),
2596        (FieldValue::VarSInt(a), FieldValue::VarSInt(b)) => {
2597            Ok((a - b).unsigned_abs() > threshold_q)
2598        }
2599        (FieldValue::Bool(a), FieldValue::Bool(b)) => Ok(a != b),
2600        _ => Err(CodecError::InvalidValue {
2601            component: component_id,
2602            field: field.id,
2603            reason: ValueReason::TypeMismatch {
2604                expected: codec_name(field.codec),
2605                found: value_name(current),
2606            },
2607        }),
2608    }
2609}
2610
2611fn entity_has_updates(
2612    schema: &schema::Schema,
2613    baseline: &EntitySnapshot,
2614    current: &EntitySnapshot,
2615    limits: &CodecLimits,
2616) -> CodecResult<bool> {
2617    ensure_component_presence_matches(schema, baseline, current)?;
2618    for component in &schema.components {
2619        let base = find_component(baseline, component.id);
2620        let curr = find_component(current, component.id);
2621        if let (Some(base), Some(curr)) = (base, curr) {
2622            if base.fields.len() != component.fields.len()
2623                || curr.fields.len() != component.fields.len()
2624            {
2625                return Err(CodecError::InvalidMask {
2626                    kind: MaskKind::FieldMask {
2627                        component: component.id,
2628                    },
2629                    reason: MaskReason::FieldCountMismatch {
2630                        expected: component.fields.len(),
2631                        actual: base.fields.len().max(curr.fields.len()),
2632                    },
2633                });
2634            }
2635            if component.fields.len() > limits.max_fields_per_component {
2636                return Err(CodecError::LimitsExceeded {
2637                    kind: LimitKind::FieldsPerComponent,
2638                    limit: limits.max_fields_per_component,
2639                    actual: component.fields.len(),
2640                });
2641            }
2642            for ((field, base_val), curr_val) in component
2643                .fields
2644                .iter()
2645                .zip(base.fields.iter())
2646                .zip(curr.fields.iter())
2647            {
2648                if field_changed(component.id, *field, *base_val, *curr_val)? {
2649                    return Ok(true);
2650                }
2651            }
2652        }
2653    }
2654    Ok(false)
2655}
2656
2657fn ensure_component_presence_matches(
2658    schema: &schema::Schema,
2659    baseline: &EntitySnapshot,
2660    current: &EntitySnapshot,
2661) -> CodecResult<()> {
2662    // In this version, component presence is stable across an entity's lifetime.
2663    for component in &schema.components {
2664        let base = find_component(baseline, component.id).is_some();
2665        let curr = find_component(current, component.id).is_some();
2666        if base != curr {
2667            return Err(CodecError::InvalidMask {
2668                kind: MaskKind::ComponentMask,
2669                reason: MaskReason::ComponentPresenceMismatch {
2670                    component: component.id,
2671                },
2672            });
2673        }
2674    }
2675    Ok(())
2676}
2677
2678fn find_component(entity: &EntitySnapshot, id: ComponentId) -> Option<&ComponentSnapshot> {
2679    entity.components.iter().find(|c| c.id == id)
2680}
2681
2682fn codec_name(codec: schema::FieldCodec) -> &'static str {
2683    match codec {
2684        schema::FieldCodec::Bool => "bool",
2685        schema::FieldCodec::UInt { .. } => "uint",
2686        schema::FieldCodec::SInt { .. } => "sint",
2687        schema::FieldCodec::VarUInt => "varuint",
2688        schema::FieldCodec::VarSInt => "varsint",
2689        schema::FieldCodec::FixedPoint(_) => "fixed-point",
2690    }
2691}
2692
2693fn value_name(value: FieldValue) -> &'static str {
2694    match value {
2695        FieldValue::Bool(_) => "bool",
2696        FieldValue::UInt(_) => "uint",
2697        FieldValue::SInt(_) => "sint",
2698        FieldValue::VarUInt(_) => "varuint",
2699        FieldValue::VarSInt(_) => "varsint",
2700        FieldValue::FixedPoint(_) => "fixed-point",
2701    }
2702}
2703
2704#[cfg(test)]
2705mod tests {
2706    use super::*;
2707    use schema::{ComponentDef, FieldCodec, FieldDef, FieldId, Schema};
2708
2709    fn schema_one_bool() -> Schema {
2710        let component = ComponentDef::new(ComponentId::new(1).unwrap())
2711            .field(FieldDef::new(FieldId::new(1).unwrap(), FieldCodec::bool()));
2712        Schema::new(vec![component]).unwrap()
2713    }
2714
2715    fn schema_uint_threshold(threshold_q: u32) -> Schema {
2716        let field = FieldDef::new(FieldId::new(1).unwrap(), FieldCodec::uint(8))
2717            .change(ChangePolicy::Threshold { threshold_q });
2718        let component = ComponentDef::new(ComponentId::new(1).unwrap()).field(field);
2719        Schema::new(vec![component]).unwrap()
2720    }
2721
2722    fn schema_two_components() -> Schema {
2723        let c1 = ComponentDef::new(ComponentId::new(1).unwrap())
2724            .field(FieldDef::new(FieldId::new(1).unwrap(), FieldCodec::bool()));
2725        let c2 = ComponentDef::new(ComponentId::new(2).unwrap())
2726            .field(FieldDef::new(FieldId::new(1).unwrap(), FieldCodec::bool()));
2727        Schema::new(vec![c1, c2]).unwrap()
2728    }
2729
2730    fn baseline_snapshot() -> Snapshot {
2731        Snapshot {
2732            tick: SnapshotTick::new(10),
2733            entities: vec![EntitySnapshot {
2734                id: EntityId::new(1),
2735                components: vec![ComponentSnapshot {
2736                    id: ComponentId::new(1).unwrap(),
2737                    fields: vec![FieldValue::Bool(false)],
2738                }],
2739            }],
2740        }
2741    }
2742
2743    #[test]
2744    fn no_op_delta_is_empty() {
2745        let schema = schema_one_bool();
2746        let baseline = baseline_snapshot();
2747        let current = baseline.clone();
2748        let mut buf = [0u8; 128];
2749        let bytes = encode_delta_snapshot(
2750            &schema,
2751            SnapshotTick::new(11),
2752            baseline.tick,
2753            &baseline,
2754            &current,
2755            &CodecLimits::for_testing(),
2756            &mut buf,
2757        )
2758        .unwrap();
2759        let header =
2760            wire::PacketHeader::delta_snapshot(schema_hash(&schema), 11, baseline.tick.raw(), 0);
2761        let mut expected = [0u8; wire::HEADER_SIZE];
2762        encode_header(&header, &mut expected).unwrap();
2763        assert_eq!(&buf[..bytes], expected.as_slice());
2764    }
2765
2766    #[test]
2767    fn delta_roundtrip_single_update() {
2768        let schema = schema_one_bool();
2769        let baseline = baseline_snapshot();
2770        let current = Snapshot {
2771            tick: SnapshotTick::new(11),
2772            entities: vec![EntitySnapshot {
2773                id: EntityId::new(1),
2774                components: vec![ComponentSnapshot {
2775                    id: ComponentId::new(1).unwrap(),
2776                    fields: vec![FieldValue::Bool(true)],
2777                }],
2778            }],
2779        };
2780
2781        let mut buf = [0u8; 128];
2782        let bytes = encode_delta_snapshot(
2783            &schema,
2784            current.tick,
2785            baseline.tick,
2786            &baseline,
2787            &current,
2788            &CodecLimits::for_testing(),
2789            &mut buf,
2790        )
2791        .unwrap();
2792        let applied = apply_delta_snapshot(
2793            &schema,
2794            &baseline,
2795            &buf[..bytes],
2796            &wire::Limits::for_testing(),
2797            &CodecLimits::for_testing(),
2798        )
2799        .unwrap();
2800        assert_eq!(applied.entities, current.entities);
2801    }
2802
2803    #[test]
2804    fn delta_roundtrip_reuse_scratch() {
2805        let schema = schema_one_bool();
2806        let baseline = baseline_snapshot();
2807        let current_one = Snapshot {
2808            tick: SnapshotTick::new(11),
2809            entities: vec![EntitySnapshot {
2810                id: EntityId::new(1),
2811                components: vec![ComponentSnapshot {
2812                    id: ComponentId::new(1).unwrap(),
2813                    fields: vec![FieldValue::Bool(true)],
2814                }],
2815            }],
2816        };
2817        let current_two = Snapshot {
2818            tick: SnapshotTick::new(12),
2819            entities: vec![EntitySnapshot {
2820                id: EntityId::new(1),
2821                components: vec![ComponentSnapshot {
2822                    id: ComponentId::new(1).unwrap(),
2823                    fields: vec![FieldValue::Bool(false)],
2824                }],
2825            }],
2826        };
2827
2828        let mut scratch = CodecScratch::default();
2829        let mut buf_one = [0u8; 128];
2830        let mut buf_two = [0u8; 128];
2831
2832        let bytes_one = encode_delta_snapshot_with_scratch(
2833            &schema,
2834            current_one.tick,
2835            baseline.tick,
2836            &baseline,
2837            &current_one,
2838            &CodecLimits::for_testing(),
2839            &mut scratch,
2840            &mut buf_one,
2841        )
2842        .unwrap();
2843        let applied_one = apply_delta_snapshot(
2844            &schema,
2845            &baseline,
2846            &buf_one[..bytes_one],
2847            &wire::Limits::for_testing(),
2848            &CodecLimits::for_testing(),
2849        )
2850        .unwrap();
2851        assert_eq!(applied_one.entities, current_one.entities);
2852
2853        let bytes_two = encode_delta_snapshot_with_scratch(
2854            &schema,
2855            current_two.tick,
2856            baseline.tick,
2857            &baseline,
2858            &current_two,
2859            &CodecLimits::for_testing(),
2860            &mut scratch,
2861            &mut buf_two,
2862        )
2863        .unwrap();
2864        let applied_two = apply_delta_snapshot(
2865            &schema,
2866            &baseline,
2867            &buf_two[..bytes_two],
2868            &wire::Limits::for_testing(),
2869            &CodecLimits::for_testing(),
2870        )
2871        .unwrap();
2872        assert_eq!(applied_two.entities, current_two.entities);
2873    }
2874
2875    #[test]
2876    fn delta_rejects_both_update_encodings() {
2877        let schema = schema_one_bool();
2878        let baseline = baseline_snapshot();
2879        let update_body = [0u8; 1];
2880        let mut update_section = [0u8; 8];
2881        let update_len =
2882            wire::encode_section(SectionTag::EntityUpdate, &update_body, &mut update_section)
2883                .unwrap();
2884
2885        let mut sparse_section = [0u8; 8];
2886        let sparse_len = wire::encode_section(
2887            SectionTag::EntityUpdateSparse,
2888            &update_body,
2889            &mut sparse_section,
2890        )
2891        .unwrap();
2892
2893        let payload_len = (update_len + sparse_len) as u32;
2894        let header = wire::PacketHeader::delta_snapshot(
2895            schema_hash(&schema),
2896            baseline.tick.raw() + 1,
2897            baseline.tick.raw(),
2898            payload_len,
2899        );
2900        let mut buf = vec![0u8; wire::HEADER_SIZE + payload_len as usize];
2901        wire::encode_header(&header, &mut buf[..wire::HEADER_SIZE]).unwrap();
2902        buf[wire::HEADER_SIZE..wire::HEADER_SIZE + update_len]
2903            .copy_from_slice(&update_section[..update_len]);
2904        buf[wire::HEADER_SIZE + update_len..wire::HEADER_SIZE + update_len + sparse_len]
2905            .copy_from_slice(&sparse_section[..sparse_len]);
2906
2907        let packet = wire::decode_packet(&buf, &wire::Limits::for_testing()).unwrap();
2908        let err = decode_delta_packet(&schema, &packet, &CodecLimits::for_testing()).unwrap_err();
2909        assert!(matches!(err, CodecError::DuplicateUpdateEncoding));
2910
2911        let err = apply_delta_snapshot_from_packet(
2912            &schema,
2913            &baseline,
2914            &packet,
2915            &CodecLimits::for_testing(),
2916        )
2917        .unwrap_err();
2918        assert!(matches!(err, CodecError::DuplicateUpdateEncoding));
2919    }
2920
2921    #[test]
2922    fn delta_roundtrip_create_destroy_update() {
2923        let schema = schema_one_bool();
2924        let baseline = Snapshot {
2925            tick: SnapshotTick::new(10),
2926            entities: vec![
2927                EntitySnapshot {
2928                    id: EntityId::new(1),
2929                    components: vec![ComponentSnapshot {
2930                        id: ComponentId::new(1).unwrap(),
2931                        fields: vec![FieldValue::Bool(false)],
2932                    }],
2933                },
2934                EntitySnapshot {
2935                    id: EntityId::new(2),
2936                    components: vec![ComponentSnapshot {
2937                        id: ComponentId::new(1).unwrap(),
2938                        fields: vec![FieldValue::Bool(false)],
2939                    }],
2940                },
2941            ],
2942        };
2943        let current = Snapshot {
2944            tick: SnapshotTick::new(11),
2945            entities: vec![
2946                EntitySnapshot {
2947                    id: EntityId::new(2),
2948                    components: vec![ComponentSnapshot {
2949                        id: ComponentId::new(1).unwrap(),
2950                        fields: vec![FieldValue::Bool(true)],
2951                    }],
2952                },
2953                EntitySnapshot {
2954                    id: EntityId::new(3),
2955                    components: vec![ComponentSnapshot {
2956                        id: ComponentId::new(1).unwrap(),
2957                        fields: vec![FieldValue::Bool(true)],
2958                    }],
2959                },
2960            ],
2961        };
2962
2963        let mut buf = [0u8; 256];
2964        let bytes = encode_delta_snapshot(
2965            &schema,
2966            current.tick,
2967            baseline.tick,
2968            &baseline,
2969            &current,
2970            &CodecLimits::for_testing(),
2971            &mut buf,
2972        )
2973        .unwrap();
2974        let applied = apply_delta_snapshot(
2975            &schema,
2976            &baseline,
2977            &buf[..bytes],
2978            &wire::Limits::for_testing(),
2979            &CodecLimits::for_testing(),
2980        )
2981        .unwrap();
2982        assert_eq!(applied.entities, current.entities);
2983    }
2984
2985    #[test]
2986    fn delta_session_header_matches_payload() {
2987        let schema = schema_one_bool();
2988        let baseline = baseline_snapshot();
2989        let current = Snapshot {
2990            tick: SnapshotTick::new(11),
2991            entities: vec![EntitySnapshot {
2992                id: EntityId::new(1),
2993                components: vec![ComponentSnapshot {
2994                    id: ComponentId::new(1).unwrap(),
2995                    fields: vec![FieldValue::Bool(true)],
2996                }],
2997            }],
2998        };
2999
3000        let mut full_buf = [0u8; 128];
3001        let full_bytes = encode_delta_snapshot_for_client(
3002            &schema,
3003            current.tick,
3004            baseline.tick,
3005            &baseline,
3006            &current,
3007            &CodecLimits::for_testing(),
3008            &mut full_buf,
3009        )
3010        .unwrap();
3011        let full_payload = &full_buf[wire::HEADER_SIZE..full_bytes];
3012
3013        let mut session_buf = [0u8; 128];
3014        let mut last_tick = baseline.tick;
3015        let session_bytes = encode_delta_snapshot_for_client_session_with_scratch(
3016            &schema,
3017            current.tick,
3018            baseline.tick,
3019            &baseline,
3020            &current,
3021            &CodecLimits::for_testing(),
3022            &mut CodecScratch::default(),
3023            &mut last_tick,
3024            &mut session_buf,
3025        )
3026        .unwrap();
3027
3028        let session_header =
3029            wire::decode_session_header(&session_buf[..session_bytes], baseline.tick.raw())
3030                .unwrap();
3031        let payload_start = session_header.header_len;
3032        let payload_end = payload_start + session_header.payload_len as usize;
3033        let session_payload = &session_buf[payload_start..payload_end];
3034        assert_eq!(full_payload, session_payload);
3035    }
3036
3037    #[test]
3038    fn delta_roundtrip_single_component_change() {
3039        let schema = schema_two_components();
3040        let baseline = Snapshot {
3041            tick: SnapshotTick::new(10),
3042            entities: vec![EntitySnapshot {
3043                id: EntityId::new(1),
3044                components: vec![
3045                    ComponentSnapshot {
3046                        id: ComponentId::new(1).unwrap(),
3047                        fields: vec![FieldValue::Bool(false)],
3048                    },
3049                    ComponentSnapshot {
3050                        id: ComponentId::new(2).unwrap(),
3051                        fields: vec![FieldValue::Bool(false)],
3052                    },
3053                ],
3054            }],
3055        };
3056        let current = Snapshot {
3057            tick: SnapshotTick::new(11),
3058            entities: vec![EntitySnapshot {
3059                id: EntityId::new(1),
3060                components: vec![
3061                    ComponentSnapshot {
3062                        id: ComponentId::new(1).unwrap(),
3063                        fields: vec![FieldValue::Bool(true)],
3064                    },
3065                    ComponentSnapshot {
3066                        id: ComponentId::new(2).unwrap(),
3067                        fields: vec![FieldValue::Bool(false)],
3068                    },
3069                ],
3070            }],
3071        };
3072
3073        let mut buf = [0u8; 256];
3074        let bytes = encode_delta_snapshot(
3075            &schema,
3076            current.tick,
3077            baseline.tick,
3078            &baseline,
3079            &current,
3080            &CodecLimits::for_testing(),
3081            &mut buf,
3082        )
3083        .unwrap();
3084        let applied = apply_delta_snapshot(
3085            &schema,
3086            &baseline,
3087            &buf[..bytes],
3088            &wire::Limits::for_testing(),
3089            &CodecLimits::for_testing(),
3090        )
3091        .unwrap();
3092        assert_eq!(applied.entities, current.entities);
3093    }
3094
3095    #[test]
3096    fn baseline_tick_mismatch_is_error() {
3097        let schema = schema_one_bool();
3098        let baseline = baseline_snapshot();
3099        let current = baseline.clone();
3100        let mut buf = [0u8; 128];
3101        let bytes = encode_delta_snapshot(
3102            &schema,
3103            SnapshotTick::new(11),
3104            baseline.tick,
3105            &baseline,
3106            &current,
3107            &CodecLimits::for_testing(),
3108            &mut buf,
3109        )
3110        .unwrap();
3111        let mut packet = wire::decode_packet(&buf[..bytes], &wire::Limits::for_testing()).unwrap();
3112        packet.header.baseline_tick = 999;
3113        wire::encode_header(&packet.header, &mut buf[..wire::HEADER_SIZE]).unwrap();
3114        let err = apply_delta_snapshot(
3115            &schema,
3116            &baseline,
3117            &buf[..bytes],
3118            &wire::Limits::for_testing(),
3119            &CodecLimits::for_testing(),
3120        )
3121        .unwrap_err();
3122        assert!(matches!(err, CodecError::BaselineTickMismatch { .. }));
3123    }
3124
3125    #[test]
3126    fn threshold_suppresses_small_change() {
3127        let schema = schema_uint_threshold(5);
3128        let baseline = Snapshot {
3129            tick: SnapshotTick::new(10),
3130            entities: vec![EntitySnapshot {
3131                id: EntityId::new(1),
3132                components: vec![ComponentSnapshot {
3133                    id: ComponentId::new(1).unwrap(),
3134                    fields: vec![FieldValue::UInt(10)],
3135                }],
3136            }],
3137        };
3138        let current = Snapshot {
3139            tick: SnapshotTick::new(11),
3140            entities: vec![EntitySnapshot {
3141                id: EntityId::new(1),
3142                components: vec![ComponentSnapshot {
3143                    id: ComponentId::new(1).unwrap(),
3144                    fields: vec![FieldValue::UInt(12)],
3145                }],
3146            }],
3147        };
3148
3149        let mut buf = [0u8; 128];
3150        let bytes = encode_delta_snapshot(
3151            &schema,
3152            current.tick,
3153            baseline.tick,
3154            &baseline,
3155            &current,
3156            &CodecLimits::for_testing(),
3157            &mut buf,
3158        )
3159        .unwrap();
3160
3161        let packet = wire::decode_packet(&buf[..bytes], &wire::Limits::for_testing()).unwrap();
3162        assert_eq!(packet.sections.len(), 0);
3163
3164        let applied = apply_delta_snapshot(
3165            &schema,
3166            &baseline,
3167            &buf[..bytes],
3168            &wire::Limits::for_testing(),
3169            &CodecLimits::for_testing(),
3170        )
3171        .unwrap();
3172        assert_eq!(applied.entities, baseline.entities);
3173    }
3174}