Skip to main content

codec/
delta.rs

1//! Delta snapshot encoding/decoding.
2
3use std::cmp::Ordering;
4
5use bitstream::{BitReader, BitWriter};
6use schema::{schema_hash, ChangePolicy, ComponentDef, ComponentId, FieldDef};
7use wire::{decode_packet, encode_header, SectionTag, WirePacket};
8
9use crate::baseline::BaselineStore;
10use crate::error::{CodecError, CodecResult, LimitKind, MaskKind, MaskReason, ValueReason};
11use crate::limits::CodecLimits;
12use crate::scratch::CodecScratch;
13use crate::snapshot::{
14    ensure_known_components, read_field_value, read_field_value_sparse, read_mask, required_bits,
15    write_field_value, write_field_value_sparse, write_section, ComponentSnapshot, EntitySnapshot,
16    FieldValue, Snapshot,
17};
18use crate::types::{EntityId, SnapshotTick};
19
20/// Selects the latest baseline tick at or before the ack tick.
21#[must_use]
22pub fn select_baseline_tick<T>(
23    store: &BaselineStore<T>,
24    ack_tick: SnapshotTick,
25) -> Option<SnapshotTick> {
26    store.latest_at_or_before(ack_tick).map(|(tick, _)| tick)
27}
28
29/// Encodes a delta snapshot into the provided output buffer.
30///
31/// Baseline and current snapshots must have entities sorted by `EntityId`.
32pub fn encode_delta_snapshot(
33    schema: &schema::Schema,
34    tick: SnapshotTick,
35    baseline_tick: SnapshotTick,
36    baseline: &Snapshot,
37    current: &Snapshot,
38    limits: &CodecLimits,
39    out: &mut [u8],
40) -> CodecResult<usize> {
41    let mut scratch = CodecScratch::default();
42    encode_delta_snapshot_with_scratch(
43        schema,
44        tick,
45        baseline_tick,
46        baseline,
47        current,
48        limits,
49        &mut scratch,
50        out,
51    )
52}
53
54/// Encodes a delta snapshot using reusable scratch buffers.
55#[allow(clippy::too_many_arguments)]
56pub fn encode_delta_snapshot_with_scratch(
57    schema: &schema::Schema,
58    tick: SnapshotTick,
59    baseline_tick: SnapshotTick,
60    baseline: &Snapshot,
61    current: &Snapshot,
62    limits: &CodecLimits,
63    scratch: &mut CodecScratch,
64    out: &mut [u8],
65) -> CodecResult<usize> {
66    encode_delta_snapshot_with_scratch_mode(
67        schema,
68        tick,
69        baseline_tick,
70        baseline,
71        current,
72        limits,
73        scratch,
74        out,
75        EncodeUpdateMode::Auto,
76    )
77}
78
79/// Encodes a delta snapshot for a client-specific view.
80///
81/// This assumes `baseline` and `current` are already filtered for the client interest set.
82/// Updates are encoded in sparse mode to optimize for small per-client packets.
83pub fn encode_delta_snapshot_for_client(
84    schema: &schema::Schema,
85    tick: SnapshotTick,
86    baseline_tick: SnapshotTick,
87    baseline: &Snapshot,
88    current: &Snapshot,
89    limits: &CodecLimits,
90    out: &mut [u8],
91) -> CodecResult<usize> {
92    let mut scratch = CodecScratch::default();
93    encode_delta_snapshot_for_client_with_scratch(
94        schema,
95        tick,
96        baseline_tick,
97        baseline,
98        current,
99        limits,
100        &mut scratch,
101        out,
102    )
103}
104
105/// Encodes a client delta snapshot using a compact session header.
106#[allow(clippy::too_many_arguments)]
107pub fn encode_delta_snapshot_for_client_session(
108    schema: &schema::Schema,
109    tick: SnapshotTick,
110    baseline_tick: SnapshotTick,
111    baseline: &Snapshot,
112    current: &Snapshot,
113    limits: &CodecLimits,
114    last_tick: &mut SnapshotTick,
115    out: &mut [u8],
116) -> CodecResult<usize> {
117    let mut scratch = CodecScratch::default();
118    encode_delta_snapshot_for_client_session_with_scratch(
119        schema,
120        tick,
121        baseline_tick,
122        baseline,
123        current,
124        limits,
125        &mut scratch,
126        last_tick,
127        out,
128    )
129}
130
131/// Encodes a delta snapshot for a client-specific view using reusable scratch buffers.
132#[allow(clippy::too_many_arguments)]
133pub fn encode_delta_snapshot_for_client_with_scratch(
134    schema: &schema::Schema,
135    tick: SnapshotTick,
136    baseline_tick: SnapshotTick,
137    baseline: &Snapshot,
138    current: &Snapshot,
139    limits: &CodecLimits,
140    scratch: &mut CodecScratch,
141    out: &mut [u8],
142) -> CodecResult<usize> {
143    encode_delta_snapshot_with_scratch_mode(
144        schema,
145        tick,
146        baseline_tick,
147        baseline,
148        current,
149        limits,
150        scratch,
151        out,
152        EncodeUpdateMode::Sparse,
153    )
154}
155
/// How the encoder decides which update-section encoding to emit.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum EncodeUpdateMode {
    /// Let `select_update_encoding` choose between the masked and sparse encodings.
    Auto,
    /// Always use the sparse encoding, skipping the selection step.
    Sparse,
}
161
162/// Encodes a client delta snapshot using a compact session header.
163#[allow(clippy::too_many_arguments)]
164pub fn encode_delta_snapshot_for_client_session_with_scratch(
165    schema: &schema::Schema,
166    tick: SnapshotTick,
167    baseline_tick: SnapshotTick,
168    baseline: &Snapshot,
169    current: &Snapshot,
170    limits: &CodecLimits,
171    scratch: &mut CodecScratch,
172    last_tick: &mut SnapshotTick,
173    out: &mut [u8],
174) -> CodecResult<usize> {
175    let max_header = wire::SESSION_MAX_HEADER_SIZE;
176    if out.len() < max_header {
177        return Err(CodecError::OutputTooSmall {
178            needed: max_header,
179            available: out.len(),
180        });
181    }
182    if tick.raw() <= last_tick.raw() {
183        return Err(CodecError::InvalidEntityOrder {
184            previous: last_tick.raw(),
185            current: tick.raw(),
186        });
187    }
188    if baseline_tick.raw() > tick.raw() {
189        return Err(CodecError::BaselineTickMismatch {
190            expected: baseline_tick.raw(),
191            found: tick.raw(),
192        });
193    }
194    let payload_len = encode_delta_payload_with_mode(
195        schema,
196        baseline_tick,
197        baseline,
198        current,
199        limits,
200        scratch,
201        &mut out[max_header..],
202        EncodeUpdateMode::Sparse,
203    )?;
204    let tick_delta = tick.raw() - last_tick.raw();
205    let baseline_delta = tick.raw() - baseline_tick.raw();
206    let header_len = wire::encode_session_header(
207        &mut out[..max_header],
208        wire::SessionFlags::delta_snapshot(),
209        tick_delta,
210        baseline_delta,
211        payload_len as u32,
212    )
213    .map_err(|_| CodecError::OutputTooSmall {
214        needed: max_header,
215        available: out.len(),
216    })?;
217    if header_len < max_header {
218        let payload_start = max_header;
219        let payload_end = max_header + payload_len;
220        out.copy_within(payload_start..payload_end, header_len);
221    }
222    *last_tick = tick;
223    Ok(header_len + payload_len)
224}
225
226#[allow(clippy::too_many_arguments)]
227fn encode_delta_snapshot_with_scratch_mode(
228    schema: &schema::Schema,
229    tick: SnapshotTick,
230    baseline_tick: SnapshotTick,
231    baseline: &Snapshot,
232    current: &Snapshot,
233    limits: &CodecLimits,
234    scratch: &mut CodecScratch,
235    out: &mut [u8],
236    mode: EncodeUpdateMode,
237) -> CodecResult<usize> {
238    if out.len() < wire::HEADER_SIZE {
239        return Err(CodecError::OutputTooSmall {
240            needed: wire::HEADER_SIZE,
241            available: out.len(),
242        });
243    }
244    let payload_len = encode_delta_payload_with_mode(
245        schema,
246        baseline_tick,
247        baseline,
248        current,
249        limits,
250        scratch,
251        &mut out[wire::HEADER_SIZE..],
252        mode,
253    )?;
254    let header = wire::PacketHeader::delta_snapshot(
255        schema_hash(schema),
256        tick.raw(),
257        baseline_tick.raw(),
258        payload_len as u32,
259    );
260    encode_header(&header, &mut out[..wire::HEADER_SIZE]).map_err(|_| {
261        CodecError::OutputTooSmall {
262            needed: wire::HEADER_SIZE,
263            available: out.len(),
264        }
265    })?;
266
267    Ok(wire::HEADER_SIZE + payload_len)
268}
269
270#[allow(clippy::too_many_arguments)]
271fn encode_delta_payload_with_mode(
272    schema: &schema::Schema,
273    baseline_tick: SnapshotTick,
274    baseline: &Snapshot,
275    current: &Snapshot,
276    limits: &CodecLimits,
277    scratch: &mut CodecScratch,
278    out: &mut [u8],
279    mode: EncodeUpdateMode,
280) -> CodecResult<usize> {
281    if baseline.tick != baseline_tick {
282        return Err(CodecError::BaselineTickMismatch {
283            expected: baseline.tick.raw(),
284            found: baseline_tick.raw(),
285        });
286    }
287
288    ensure_entities_sorted(&baseline.entities)?;
289    ensure_entities_sorted(&current.entities)?;
290
291    let mut counts = DiffCounts::default();
292    diff_counts(schema, baseline, current, limits, &mut counts)?;
293
294    if counts.creates > limits.max_entities_create {
295        return Err(CodecError::LimitsExceeded {
296            kind: LimitKind::EntitiesCreate,
297            limit: limits.max_entities_create,
298            actual: counts.creates,
299        });
300    }
301    if counts.updates > limits.max_entities_update {
302        return Err(CodecError::LimitsExceeded {
303            kind: LimitKind::EntitiesUpdate,
304            limit: limits.max_entities_update,
305            actual: counts.updates,
306        });
307    }
308    if counts.destroys > limits.max_entities_destroy {
309        return Err(CodecError::LimitsExceeded {
310            kind: LimitKind::EntitiesDestroy,
311            limit: limits.max_entities_destroy,
312            actual: counts.destroys,
313        });
314    }
315
316    let mut offset = 0;
317    if counts.destroys > 0 {
318        let written = write_section(
319            SectionTag::EntityDestroy,
320            &mut out[offset..],
321            limits,
322            |writer| encode_destroy_body(baseline, current, counts.destroys, limits, writer),
323        )?;
324        offset += written;
325    }
326    if counts.creates > 0 {
327        let written = write_section(
328            SectionTag::EntityCreate,
329            &mut out[offset..],
330            limits,
331            |writer| encode_create_body(schema, baseline, current, counts.creates, limits, writer),
332        )?;
333        offset += written;
334    }
335    if counts.updates > 0 {
336        let update_encoding = match mode {
337            EncodeUpdateMode::Auto => {
338                select_update_encoding(schema, baseline, current, limits, scratch)?
339            }
340            EncodeUpdateMode::Sparse => UpdateEncoding::Sparse,
341        };
342        let section_tag = match update_encoding {
343            UpdateEncoding::Masked => SectionTag::EntityUpdate,
344            UpdateEncoding::Sparse => SectionTag::EntityUpdateSparsePacked,
345        };
346        let written =
347            write_section(
348                section_tag,
349                &mut out[offset..],
350                limits,
351                |writer| match update_encoding {
352                    UpdateEncoding::Masked => encode_update_body_masked(
353                        schema,
354                        baseline,
355                        current,
356                        counts.updates,
357                        limits,
358                        scratch,
359                        writer,
360                    ),
361                    UpdateEncoding::Sparse => encode_update_body_sparse_packed(
362                        schema,
363                        baseline,
364                        current,
365                        counts.updates,
366                        limits,
367                        scratch,
368                        writer,
369                    ),
370                },
371            )?;
372        offset += written;
373    }
374
375    Ok(offset)
376}
377
378/// Applies a delta snapshot to a baseline snapshot.
379pub fn apply_delta_snapshot(
380    schema: &schema::Schema,
381    baseline: &Snapshot,
382    bytes: &[u8],
383    wire_limits: &wire::Limits,
384    limits: &CodecLimits,
385) -> CodecResult<Snapshot> {
386    let packet = decode_packet(bytes, wire_limits)?;
387    apply_delta_snapshot_from_packet(schema, baseline, &packet, limits)
388}
389
390/// Applies a delta snapshot from a parsed wire packet.
391pub fn apply_delta_snapshot_from_packet(
392    schema: &schema::Schema,
393    baseline: &Snapshot,
394    packet: &WirePacket<'_>,
395    limits: &CodecLimits,
396) -> CodecResult<Snapshot> {
397    let header = packet.header;
398    if !header.flags.is_delta_snapshot() {
399        return Err(CodecError::Wire(wire::DecodeError::InvalidFlags {
400            flags: header.flags.raw(),
401        }));
402    }
403    if header.baseline_tick == 0 {
404        return Err(CodecError::Wire(wire::DecodeError::InvalidBaselineTick {
405            baseline_tick: header.baseline_tick,
406            flags: header.flags.raw(),
407        }));
408    }
409    if header.baseline_tick != baseline.tick.raw() {
410        return Err(CodecError::BaselineTickMismatch {
411            expected: baseline.tick.raw(),
412            found: header.baseline_tick,
413        });
414    }
415
416    let expected_hash = schema_hash(schema);
417    if header.schema_hash != expected_hash {
418        return Err(CodecError::SchemaMismatch {
419            expected: expected_hash,
420            found: header.schema_hash,
421        });
422    }
423
424    let (destroys, creates, updates) = decode_delta_sections(schema, packet, limits)?;
425
426    ensure_entities_sorted(&baseline.entities)?;
427    ensure_entities_sorted(&creates)?;
428
429    let mut remaining = apply_destroys(&baseline.entities, &destroys)?;
430    remaining = apply_creates(remaining, creates)?;
431    if remaining.len() > limits.max_total_entities_after_apply {
432        return Err(CodecError::LimitsExceeded {
433            kind: LimitKind::TotalEntitiesAfterApply,
434            limit: limits.max_total_entities_after_apply,
435            actual: remaining.len(),
436        });
437    }
438    apply_updates(&mut remaining, &updates)?;
439
440    Ok(Snapshot {
441        tick: SnapshotTick::new(header.tick),
442        entities: remaining,
443    })
444}
445
446/// Decodes a delta packet without applying it to a baseline.
447pub fn decode_delta_packet(
448    schema: &schema::Schema,
449    packet: &WirePacket<'_>,
450    limits: &CodecLimits,
451) -> CodecResult<DeltaDecoded> {
452    let header = packet.header;
453    if !header.flags.is_delta_snapshot() {
454        return Err(CodecError::Wire(wire::DecodeError::InvalidFlags {
455            flags: header.flags.raw(),
456        }));
457    }
458    if header.baseline_tick == 0 {
459        return Err(CodecError::Wire(wire::DecodeError::InvalidBaselineTick {
460            baseline_tick: header.baseline_tick,
461            flags: header.flags.raw(),
462        }));
463    }
464    let expected_hash = schema_hash(schema);
465    if header.schema_hash != expected_hash {
466        return Err(CodecError::SchemaMismatch {
467            expected: expected_hash,
468            found: header.schema_hash,
469        });
470    }
471
472    let (destroys, creates, updates) = decode_delta_sections(schema, packet, limits)?;
473
474    Ok(DeltaDecoded {
475        tick: SnapshotTick::new(header.tick),
476        baseline_tick: SnapshotTick::new(header.baseline_tick),
477        destroys,
478        creates,
479        updates,
480    })
481}
482
/// Tallies produced by `diff_counts` when comparing a baseline snapshot to a current one.
#[derive(Default)]
struct DiffCounts {
    /// Entities present only in the current snapshot.
    creates: usize,
    /// Entities present in both snapshots for which `entity_has_updates` reported a change.
    updates: usize,
    /// Entities present only in the baseline snapshot.
    destroys: usize,
}
489
/// Wire encoding chosen for the entity-update section.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum UpdateEncoding {
    /// Written under `SectionTag::EntityUpdate` by `encode_update_body_masked`.
    Masked,
    /// Written under `SectionTag::EntityUpdateSparsePacked` by
    /// `encode_update_body_sparse_packed`.
    Sparse,
}
495
496fn diff_counts(
497    schema: &schema::Schema,
498    baseline: &Snapshot,
499    current: &Snapshot,
500    limits: &CodecLimits,
501    counts: &mut DiffCounts,
502) -> CodecResult<()> {
503    let mut i = 0usize;
504    let mut j = 0usize;
505    while i < baseline.entities.len() || j < current.entities.len() {
506        let base = baseline.entities.get(i);
507        let curr = current.entities.get(j);
508        match (base, curr) {
509            (Some(b), Some(c)) => {
510                if b.id.raw() < c.id.raw() {
511                    counts.destroys += 1;
512                    i += 1;
513                } else if b.id.raw() > c.id.raw() {
514                    counts.creates += 1;
515                    j += 1;
516                } else {
517                    if entity_has_updates(schema, b, c, limits)? {
518                        counts.updates += 1;
519                    }
520                    i += 1;
521                    j += 1;
522                }
523            }
524            (Some(_), None) => {
525                counts.destroys += 1;
526                i += 1;
527            }
528            (None, Some(_)) => {
529                counts.creates += 1;
530                j += 1;
531            }
532            (None, None) => break,
533        }
534    }
535    Ok(())
536}
537
538fn select_update_encoding(
539    schema: &schema::Schema,
540    baseline: &Snapshot,
541    current: &Snapshot,
542    limits: &CodecLimits,
543    scratch: &mut CodecScratch,
544) -> CodecResult<UpdateEncoding> {
545    let component_count = schema.components.len();
546    let mut mask_bits = 0usize;
547    let mut sparse_bits = 0usize;
548    let mut baseline_iter = baseline.entities.iter();
549    let mut current_iter = current.entities.iter();
550    let mut baseline_next = baseline_iter.next();
551    let mut current_next = current_iter.next();
552
553    while let (Some(base), Some(curr)) = (baseline_next, current_next) {
554        match base.id.cmp(&curr.id) {
555            Ordering::Less => {
556                baseline_next = baseline_iter.next();
557            }
558            Ordering::Greater => {
559                current_next = current_iter.next();
560            }
561            Ordering::Equal => {
562                for component in &schema.components {
563                    let base_component = find_component(base, component.id);
564                    let curr_component = find_component(curr, component.id);
565                    if base_component.is_some() != curr_component.is_some() {
566                        return Err(CodecError::InvalidMask {
567                            kind: MaskKind::ComponentMask,
568                            reason: MaskReason::ComponentPresenceMismatch {
569                                component: component.id,
570                            },
571                        });
572                    }
573                    if let (Some(base_component), Some(curr_component)) =
574                        (base_component, curr_component)
575                    {
576                        if base_component.fields.len() != component.fields.len()
577                            || curr_component.fields.len() != component.fields.len()
578                        {
579                            return Err(CodecError::InvalidMask {
580                                kind: MaskKind::FieldMask {
581                                    component: component.id,
582                                },
583                                reason: MaskReason::FieldCountMismatch {
584                                    expected: component.fields.len(),
585                                    actual: base_component
586                                        .fields
587                                        .len()
588                                        .max(curr_component.fields.len()),
589                                },
590                            });
591                        }
592                        if component.fields.len() > limits.max_fields_per_component {
593                            return Err(CodecError::LimitsExceeded {
594                                kind: LimitKind::FieldsPerComponent,
595                                limit: limits.max_fields_per_component,
596                                actual: component.fields.len(),
597                            });
598                        }
599                        let (_, field_mask) = scratch
600                            .component_and_field_masks_mut(component_count, component.fields.len());
601                        // Use the same change predicate as masked/sparse encoding.
602                        let field_mask = compute_field_mask_into(
603                            component,
604                            base_component,
605                            curr_component,
606                            field_mask,
607                        )?;
608                        let changed = field_mask.iter().filter(|bit| **bit).count();
609                        if changed > 0 {
610                            let field_count = component.fields.len();
611                            let index_bits =
612                                required_bits(field_count.saturating_sub(1) as u64) as usize;
613                            // Compare per-component mask bits vs packed sparse index bits.
614                            // Values are sent in both encodings, so we only estimate index/mask + id overhead.
615                            mask_bits += field_count;
616                            sparse_bits += index_bits * changed;
617                            sparse_bits += varu32_len(curr.id.raw()) * 8;
618                            sparse_bits += varu32_len(component.id.get() as u32) * 8;
619                            sparse_bits += varu32_len(changed as u32) * 8;
620                        }
621                    }
622                }
623
624                baseline_next = baseline_iter.next();
625                current_next = current_iter.next();
626            }
627        }
628    }
629
630    if mask_bits == 0 {
631        return Ok(UpdateEncoding::Masked);
632    }
633
634    if sparse_bits <= mask_bits {
635        Ok(UpdateEncoding::Sparse)
636    } else {
637        Ok(UpdateEncoding::Masked)
638    }
639}
640
/// Returns how many 7-bit groups are needed to represent `value`, between 1 and 5.
///
/// Callers multiply the result by 8, i.e. treat it as the byte length of a variable-length
/// `u32`.
fn varu32_len(value: u32) -> usize {
    match value {
        0..=0x7F => 1,
        0x80..=0x3FFF => 2,
        0x4000..=0x001F_FFFF => 3,
        0x0020_0000..=0x0FFF_FFFF => 4,
        _ => 5,
    }
}
654
655fn encode_destroy_body(
656    baseline: &Snapshot,
657    current: &Snapshot,
658    destroy_count: usize,
659    limits: &CodecLimits,
660    writer: &mut BitWriter<'_>,
661) -> CodecResult<()> {
662    if destroy_count > limits.max_entities_destroy {
663        return Err(CodecError::LimitsExceeded {
664            kind: LimitKind::EntitiesDestroy,
665            limit: limits.max_entities_destroy,
666            actual: destroy_count,
667        });
668    }
669
670    writer.align_to_byte()?;
671    writer.write_varu32(destroy_count as u32)?;
672
673    let mut i = 0usize;
674    let mut j = 0usize;
675    while i < baseline.entities.len() || j < current.entities.len() {
676        let base = baseline.entities.get(i);
677        let curr = current.entities.get(j);
678        match (base, curr) {
679            (Some(b), Some(c)) => {
680                if b.id.raw() < c.id.raw() {
681                    writer.align_to_byte()?;
682                    writer.write_u32_aligned(b.id.raw())?;
683                    i += 1;
684                } else if b.id.raw() > c.id.raw() {
685                    j += 1;
686                } else {
687                    i += 1;
688                    j += 1;
689                }
690            }
691            (Some(b), None) => {
692                writer.align_to_byte()?;
693                writer.write_u32_aligned(b.id.raw())?;
694                i += 1;
695            }
696            (None, Some(_)) => {
697                j += 1;
698            }
699            (None, None) => break,
700        }
701    }
702
703    writer.align_to_byte()?;
704    Ok(())
705}
706
707fn encode_create_body(
708    schema: &schema::Schema,
709    baseline: &Snapshot,
710    current: &Snapshot,
711    create_count: usize,
712    limits: &CodecLimits,
713    writer: &mut BitWriter<'_>,
714) -> CodecResult<()> {
715    if create_count > limits.max_entities_create {
716        return Err(CodecError::LimitsExceeded {
717            kind: LimitKind::EntitiesCreate,
718            limit: limits.max_entities_create,
719            actual: create_count,
720        });
721    }
722
723    writer.align_to_byte()?;
724    writer.write_varu32(create_count as u32)?;
725
726    let mut i = 0usize;
727    let mut j = 0usize;
728    while i < baseline.entities.len() || j < current.entities.len() {
729        let base = baseline.entities.get(i);
730        let curr = current.entities.get(j);
731        match (base, curr) {
732            (Some(b), Some(c)) => {
733                if b.id.raw() < c.id.raw() {
734                    i += 1;
735                } else if b.id.raw() > c.id.raw() {
736                    write_create_entity(schema, c, limits, writer)?;
737                    j += 1;
738                } else {
739                    i += 1;
740                    j += 1;
741                }
742            }
743            (Some(_), None) => {
744                i += 1;
745            }
746            (None, Some(c)) => {
747                write_create_entity(schema, c, limits, writer)?;
748                j += 1;
749            }
750            (None, None) => break,
751        }
752    }
753
754    writer.align_to_byte()?;
755    Ok(())
756}
757
758fn encode_update_body_masked(
759    schema: &schema::Schema,
760    baseline: &Snapshot,
761    current: &Snapshot,
762    update_count: usize,
763    limits: &CodecLimits,
764    scratch: &mut CodecScratch,
765    writer: &mut BitWriter<'_>,
766) -> CodecResult<()> {
767    if update_count > limits.max_entities_update {
768        return Err(CodecError::LimitsExceeded {
769            kind: LimitKind::EntitiesUpdate,
770            limit: limits.max_entities_update,
771            actual: update_count,
772        });
773    }
774
775    writer.align_to_byte()?;
776    writer.write_varu32(update_count as u32)?;
777
778    let mut i = 0usize;
779    let mut j = 0usize;
780    while i < baseline.entities.len() || j < current.entities.len() {
781        let base = baseline.entities.get(i);
782        let curr = current.entities.get(j);
783        match (base, curr) {
784            (Some(b), Some(c)) => {
785                if b.id.raw() < c.id.raw() {
786                    i += 1;
787                } else if b.id.raw() > c.id.raw() {
788                    j += 1;
789                } else {
790                    if entity_has_updates(schema, b, c, limits)? {
791                        writer.align_to_byte()?;
792                        writer.write_u32_aligned(c.id.raw())?;
793                        ensure_component_presence_matches(schema, b, c)?;
794                        write_update_components(schema, b, c, limits, scratch, writer)?;
795                    }
796                    i += 1;
797                    j += 1;
798                }
799            }
800            (Some(_), None) => i += 1,
801            (None, Some(_)) => j += 1,
802            (None, None) => break,
803        }
804    }
805
806    writer.align_to_byte()?;
807    Ok(())
808}
809
810#[allow(dead_code)]
811fn encode_update_body_sparse_varint(
812    schema: &schema::Schema,
813    baseline: &Snapshot,
814    current: &Snapshot,
815    update_count: usize,
816    limits: &CodecLimits,
817    scratch: &mut CodecScratch,
818    writer: &mut BitWriter<'_>,
819) -> CodecResult<()> {
820    if update_count > limits.max_entities_update {
821        return Err(CodecError::LimitsExceeded {
822            kind: LimitKind::EntitiesUpdate,
823            limit: limits.max_entities_update,
824            actual: update_count,
825        });
826    }
827
828    let entry_count = count_sparse_update_entries(schema, baseline, current, limits, scratch)?;
829    let entry_limit = limits
830        .max_entities_update
831        .saturating_mul(limits.max_components_per_entity);
832    if entry_count > entry_limit {
833        return Err(CodecError::LimitsExceeded {
834            kind: LimitKind::EntitiesUpdate,
835            limit: entry_limit,
836            actual: entry_count,
837        });
838    }
839
840    writer.align_to_byte()?;
841    writer.write_varu32(entry_count as u32)?;
842
843    let mut baseline_iter = baseline.entities.iter();
844    let mut current_iter = current.entities.iter();
845    let mut baseline_next = baseline_iter.next();
846    let mut current_next = current_iter.next();
847    let component_count = schema.components.len();
848
849    while let (Some(base), Some(curr)) = (baseline_next, current_next) {
850        match base.id.cmp(&curr.id) {
851            Ordering::Less => {
852                baseline_next = baseline_iter.next();
853            }
854            Ordering::Greater => {
855                current_next = current_iter.next();
856            }
857            Ordering::Equal => {
858                if entity_has_updates(schema, base, curr, limits)? {
859                    for component in &schema.components {
860                        let base_component = find_component(base, component.id);
861                        let curr_component = find_component(curr, component.id);
862                        if base_component.is_some() != curr_component.is_some() {
863                            return Err(CodecError::InvalidMask {
864                                kind: MaskKind::ComponentMask,
865                                reason: MaskReason::ComponentPresenceMismatch {
866                                    component: component.id,
867                                },
868                            });
869                        }
870                        if let (Some(base_component), Some(curr_component)) =
871                            (base_component, curr_component)
872                        {
873                            if base_component.fields.len() != component.fields.len()
874                                || curr_component.fields.len() != component.fields.len()
875                            {
876                                return Err(CodecError::InvalidMask {
877                                    kind: MaskKind::FieldMask {
878                                        component: component.id,
879                                    },
880                                    reason: MaskReason::FieldCountMismatch {
881                                        expected: component.fields.len(),
882                                        actual: base_component
883                                            .fields
884                                            .len()
885                                            .max(curr_component.fields.len()),
886                                    },
887                                });
888                            }
889                            if component.fields.len() > limits.max_fields_per_component {
890                                return Err(CodecError::LimitsExceeded {
891                                    kind: LimitKind::FieldsPerComponent,
892                                    limit: limits.max_fields_per_component,
893                                    actual: component.fields.len(),
894                                });
895                            }
896                            let (_, field_mask) = scratch.component_and_field_masks_mut(
897                                component_count,
898                                component.fields.len(),
899                            );
900                            let field_mask = compute_field_mask_into(
901                                component,
902                                base_component,
903                                curr_component,
904                                field_mask,
905                            )?;
906                            let changed_fields = field_mask.iter().filter(|bit| **bit).count();
907                            if changed_fields > 0 {
908                                writer.align_to_byte()?;
909                                writer.write_u32_aligned(curr.id.raw())?;
910                                writer.write_u16_aligned(component.id.get())?;
911                                writer.write_varu32(changed_fields as u32)?;
912                                for (idx, field) in component.fields.iter().enumerate() {
913                                    if field_mask[idx] {
914                                        writer.align_to_byte()?;
915                                        writer.write_varu32(idx as u32)?;
916                                        write_field_value_sparse(
917                                            component.id,
918                                            *field,
919                                            curr_component.fields[idx],
920                                            writer,
921                                        )?;
922                                    }
923                                }
924                            }
925                        }
926                    }
927                }
928                baseline_next = baseline_iter.next();
929                current_next = current_iter.next();
930            }
931        }
932    }
933
934    writer.align_to_byte()?;
935    Ok(())
936}
937
938fn encode_update_body_sparse_packed(
939    schema: &schema::Schema,
940    baseline: &Snapshot,
941    current: &Snapshot,
942    update_count: usize,
943    limits: &CodecLimits,
944    scratch: &mut CodecScratch,
945    writer: &mut BitWriter<'_>,
946) -> CodecResult<()> {
947    if update_count > limits.max_entities_update {
948        return Err(CodecError::LimitsExceeded {
949            kind: LimitKind::EntitiesUpdate,
950            limit: limits.max_entities_update,
951            actual: update_count,
952        });
953    }
954
955    let entry_count = count_sparse_update_entries(schema, baseline, current, limits, scratch)?;
956    let entry_limit = limits
957        .max_entities_update
958        .saturating_mul(limits.max_components_per_entity);
959    if entry_count > entry_limit {
960        return Err(CodecError::LimitsExceeded {
961            kind: LimitKind::EntitiesUpdate,
962            limit: entry_limit,
963            actual: entry_count,
964        });
965    }
966
967    writer.align_to_byte()?;
968    writer.write_varu32(entry_count as u32)?;
969
970    let mut baseline_iter = baseline.entities.iter();
971    let mut current_iter = current.entities.iter();
972    let mut baseline_next = baseline_iter.next();
973    let mut current_next = current_iter.next();
974    let component_count = schema.components.len();
975
976    while let (Some(base), Some(curr)) = (baseline_next, current_next) {
977        match base.id.cmp(&curr.id) {
978            Ordering::Less => {
979                baseline_next = baseline_iter.next();
980            }
981            Ordering::Greater => {
982                current_next = current_iter.next();
983            }
984            Ordering::Equal => {
985                if entity_has_updates(schema, base, curr, limits)? {
986                    for component in &schema.components {
987                        let base_component = find_component(base, component.id);
988                        let curr_component = find_component(curr, component.id);
989                        if base_component.is_some() != curr_component.is_some() {
990                            return Err(CodecError::InvalidMask {
991                                kind: MaskKind::ComponentMask,
992                                reason: MaskReason::ComponentPresenceMismatch {
993                                    component: component.id,
994                                },
995                            });
996                        }
997                        if let (Some(base_component), Some(curr_component)) =
998                            (base_component, curr_component)
999                        {
1000                            if base_component.fields.len() != component.fields.len()
1001                                || curr_component.fields.len() != component.fields.len()
1002                            {
1003                                return Err(CodecError::InvalidMask {
1004                                    kind: MaskKind::FieldMask {
1005                                        component: component.id,
1006                                    },
1007                                    reason: MaskReason::FieldCountMismatch {
1008                                        expected: component.fields.len(),
1009                                        actual: base_component
1010                                            .fields
1011                                            .len()
1012                                            .max(curr_component.fields.len()),
1013                                    },
1014                                });
1015                            }
1016                            if component.fields.len() > limits.max_fields_per_component {
1017                                return Err(CodecError::LimitsExceeded {
1018                                    kind: LimitKind::FieldsPerComponent,
1019                                    limit: limits.max_fields_per_component,
1020                                    actual: component.fields.len(),
1021                                });
1022                            }
1023                            let (_, field_mask) = scratch.component_and_field_masks_mut(
1024                                component_count,
1025                                component.fields.len(),
1026                            );
1027                            let field_mask = compute_field_mask_into(
1028                                component,
1029                                base_component,
1030                                curr_component,
1031                                field_mask,
1032                            )?;
1033                            let changed_fields = field_mask.iter().filter(|bit| **bit).count();
1034                            if changed_fields > 0 {
1035                                writer.align_to_byte()?;
1036                                writer.write_varu32(curr.id.raw())?;
1037                                writer.write_varu32(component.id.get() as u32)?;
1038                                writer.write_varu32(changed_fields as u32)?;
1039                                let index_bits =
1040                                    required_bits(component.fields.len().saturating_sub(1) as u64);
1041                                for (idx, field) in component.fields.iter().enumerate() {
1042                                    if field_mask[idx] {
1043                                        if index_bits > 0 {
1044                                            writer.write_bits(idx as u64, index_bits)?;
1045                                        }
1046                                        write_field_value_sparse(
1047                                            component.id,
1048                                            *field,
1049                                            curr_component.fields[idx],
1050                                            writer,
1051                                        )?;
1052                                    }
1053                                }
1054                            }
1055                        }
1056                    }
1057                }
1058                baseline_next = baseline_iter.next();
1059                current_next = current_iter.next();
1060            }
1061        }
1062    }
1063
1064    writer.align_to_byte()?;
1065    Ok(())
1066}
1067
1068fn count_sparse_update_entries(
1069    schema: &schema::Schema,
1070    baseline: &Snapshot,
1071    current: &Snapshot,
1072    limits: &CodecLimits,
1073    scratch: &mut CodecScratch,
1074) -> CodecResult<usize> {
1075    let component_count = schema.components.len();
1076    let mut count = 0usize;
1077    let mut baseline_iter = baseline.entities.iter();
1078    let mut current_iter = current.entities.iter();
1079    let mut baseline_next = baseline_iter.next();
1080    let mut current_next = current_iter.next();
1081
1082    while let (Some(base), Some(curr)) = (baseline_next, current_next) {
1083        match base.id.cmp(&curr.id) {
1084            Ordering::Less => baseline_next = baseline_iter.next(),
1085            Ordering::Greater => current_next = current_iter.next(),
1086            Ordering::Equal => {
1087                if entity_has_updates(schema, base, curr, limits)? {
1088                    for component in &schema.components {
1089                        let base_component = find_component(base, component.id);
1090                        let curr_component = find_component(curr, component.id);
1091                        if base_component.is_some() != curr_component.is_some() {
1092                            return Err(CodecError::InvalidMask {
1093                                kind: MaskKind::ComponentMask,
1094                                reason: MaskReason::ComponentPresenceMismatch {
1095                                    component: component.id,
1096                                },
1097                            });
1098                        }
1099                        if let (Some(base_component), Some(curr_component)) =
1100                            (base_component, curr_component)
1101                        {
1102                            if base_component.fields.len() != component.fields.len()
1103                                || curr_component.fields.len() != component.fields.len()
1104                            {
1105                                return Err(CodecError::InvalidMask {
1106                                    kind: MaskKind::FieldMask {
1107                                        component: component.id,
1108                                    },
1109                                    reason: MaskReason::FieldCountMismatch {
1110                                        expected: component.fields.len(),
1111                                        actual: base_component
1112                                            .fields
1113                                            .len()
1114                                            .max(curr_component.fields.len()),
1115                                    },
1116                                });
1117                            }
1118                            if component.fields.len() > limits.max_fields_per_component {
1119                                return Err(CodecError::LimitsExceeded {
1120                                    kind: LimitKind::FieldsPerComponent,
1121                                    limit: limits.max_fields_per_component,
1122                                    actual: component.fields.len(),
1123                                });
1124                            }
1125                            let (_, field_mask) = scratch.component_and_field_masks_mut(
1126                                component_count,
1127                                component.fields.len(),
1128                            );
1129                            let field_mask = compute_field_mask_into(
1130                                component,
1131                                base_component,
1132                                curr_component,
1133                                field_mask,
1134                            )?;
1135                            if field_mask.iter().any(|bit| *bit) {
1136                                count += 1;
1137                            }
1138                        }
1139                    }
1140                }
1141                baseline_next = baseline_iter.next();
1142                current_next = current_iter.next();
1143            }
1144        }
1145    }
1146
1147    Ok(count)
1148}
1149
1150fn write_create_entity(
1151    schema: &schema::Schema,
1152    entity: &EntitySnapshot,
1153    limits: &CodecLimits,
1154    writer: &mut BitWriter<'_>,
1155) -> CodecResult<()> {
1156    writer.align_to_byte()?;
1157    writer.write_u32_aligned(entity.id.raw())?;
1158    ensure_known_components(schema, entity)?;
1159    write_component_mask(schema, entity, writer)?;
1160    for component in schema.components.iter() {
1161        if let Some(snapshot) = find_component(entity, component.id) {
1162            write_full_component(component, snapshot, limits, writer)?;
1163        }
1164    }
1165    Ok(())
1166}
1167
1168fn decode_destroy_section(body: &[u8], limits: &CodecLimits) -> CodecResult<Vec<EntityId>> {
1169    if body.len() > limits.max_section_bytes {
1170        return Err(CodecError::LimitsExceeded {
1171            kind: LimitKind::SectionBytes,
1172            limit: limits.max_section_bytes,
1173            actual: body.len(),
1174        });
1175    }
1176
1177    let mut reader = BitReader::new(body);
1178    reader.align_to_byte()?;
1179    let count = reader.read_varu32()? as usize;
1180    if count > limits.max_entities_destroy {
1181        return Err(CodecError::LimitsExceeded {
1182            kind: LimitKind::EntitiesDestroy,
1183            limit: limits.max_entities_destroy,
1184            actual: count,
1185        });
1186    }
1187
1188    let mut ids = Vec::with_capacity(count);
1189    let mut prev: Option<u32> = None;
1190    for _ in 0..count {
1191        reader.align_to_byte()?;
1192        let id = reader.read_u32_aligned()?;
1193        if let Some(prev_id) = prev {
1194            if id <= prev_id {
1195                return Err(CodecError::InvalidEntityOrder {
1196                    previous: prev_id,
1197                    current: id,
1198                });
1199            }
1200        }
1201        prev = Some(id);
1202        ids.push(EntityId::new(id));
1203    }
1204    reader.align_to_byte()?;
1205    if reader.bits_remaining() != 0 {
1206        return Err(CodecError::TrailingSectionData {
1207            section: SectionTag::EntityDestroy,
1208            remaining_bits: reader.bits_remaining(),
1209        });
1210    }
1211    Ok(ids)
1212}
1213
1214fn decode_create_section(
1215    schema: &schema::Schema,
1216    body: &[u8],
1217    limits: &CodecLimits,
1218) -> CodecResult<Vec<EntitySnapshot>> {
1219    if body.len() > limits.max_section_bytes {
1220        return Err(CodecError::LimitsExceeded {
1221            kind: LimitKind::SectionBytes,
1222            limit: limits.max_section_bytes,
1223            actual: body.len(),
1224        });
1225    }
1226
1227    let mut reader = BitReader::new(body);
1228    reader.align_to_byte()?;
1229    let count = reader.read_varu32()? as usize;
1230    if count > limits.max_entities_create {
1231        return Err(CodecError::LimitsExceeded {
1232            kind: LimitKind::EntitiesCreate,
1233            limit: limits.max_entities_create,
1234            actual: count,
1235        });
1236    }
1237
1238    let mut entities = Vec::with_capacity(count);
1239    let mut prev: Option<u32> = None;
1240    for _ in 0..count {
1241        reader.align_to_byte()?;
1242        let id = reader.read_u32_aligned()?;
1243        if let Some(prev_id) = prev {
1244            if id <= prev_id {
1245                return Err(CodecError::InvalidEntityOrder {
1246                    previous: prev_id,
1247                    current: id,
1248                });
1249            }
1250        }
1251        prev = Some(id);
1252
1253        let component_mask = read_mask(
1254            &mut reader,
1255            schema.components.len(),
1256            MaskKind::ComponentMask,
1257        )?;
1258
1259        let mut components = Vec::new();
1260        for (idx, component) in schema.components.iter().enumerate() {
1261            if component_mask[idx] {
1262                let fields = decode_full_component(component, &mut reader, limits)?;
1263                components.push(ComponentSnapshot {
1264                    id: component.id,
1265                    fields,
1266                });
1267            }
1268        }
1269
1270        let entity = EntitySnapshot {
1271            id: EntityId::new(id),
1272            components,
1273        };
1274        ensure_known_components(schema, &entity)?;
1275        entities.push(entity);
1276    }
1277
1278    reader.align_to_byte()?;
1279    if reader.bits_remaining() != 0 {
1280        return Err(CodecError::TrailingSectionData {
1281            section: SectionTag::EntityCreate,
1282            remaining_bits: reader.bits_remaining(),
1283        });
1284    }
1285    Ok(entities)
1286}
1287
1288fn decode_update_section_masked(
1289    schema: &schema::Schema,
1290    body: &[u8],
1291    limits: &CodecLimits,
1292) -> CodecResult<Vec<DeltaUpdateEntity>> {
1293    if body.len() > limits.max_section_bytes {
1294        return Err(CodecError::LimitsExceeded {
1295            kind: LimitKind::SectionBytes,
1296            limit: limits.max_section_bytes,
1297            actual: body.len(),
1298        });
1299    }
1300
1301    let mut reader = BitReader::new(body);
1302    reader.align_to_byte()?;
1303    let count = reader.read_varu32()? as usize;
1304    if count > limits.max_entities_update {
1305        return Err(CodecError::LimitsExceeded {
1306            kind: LimitKind::EntitiesUpdate,
1307            limit: limits.max_entities_update,
1308            actual: count,
1309        });
1310    }
1311
1312    let mut updates = Vec::with_capacity(count);
1313    let mut prev: Option<u32> = None;
1314    for _ in 0..count {
1315        reader.align_to_byte()?;
1316        let id = reader.read_u32_aligned()?;
1317        if let Some(prev_id) = prev {
1318            if id <= prev_id {
1319                return Err(CodecError::InvalidEntityOrder {
1320                    previous: prev_id,
1321                    current: id,
1322                });
1323            }
1324        }
1325        prev = Some(id);
1326
1327        let component_mask = read_mask(
1328            &mut reader,
1329            schema.components.len(),
1330            MaskKind::ComponentMask,
1331        )?;
1332        let mut components = Vec::new();
1333        for (idx, component) in schema.components.iter().enumerate() {
1334            if component_mask[idx] {
1335                let fields = decode_update_component(component, &mut reader, limits)?;
1336                components.push(DeltaUpdateComponent {
1337                    id: component.id,
1338                    fields,
1339                });
1340            }
1341        }
1342
1343        updates.push(DeltaUpdateEntity {
1344            id: EntityId::new(id),
1345            components,
1346        });
1347    }
1348
1349    reader.align_to_byte()?;
1350    if reader.bits_remaining() != 0 {
1351        return Err(CodecError::TrailingSectionData {
1352            section: SectionTag::EntityUpdate,
1353            remaining_bits: reader.bits_remaining(),
1354        });
1355    }
1356    Ok(updates)
1357}
1358
1359fn decode_update_section_sparse_varint(
1360    schema: &schema::Schema,
1361    body: &[u8],
1362    limits: &CodecLimits,
1363) -> CodecResult<Vec<DeltaUpdateEntity>> {
1364    if body.len() > limits.max_section_bytes {
1365        return Err(CodecError::LimitsExceeded {
1366            kind: LimitKind::SectionBytes,
1367            limit: limits.max_section_bytes,
1368            actual: body.len(),
1369        });
1370    }
1371
1372    let mut reader = BitReader::new(body);
1373    reader.align_to_byte()?;
1374    let entry_count = reader.read_varu32()? as usize;
1375    let entry_limit = limits
1376        .max_entities_update
1377        .saturating_mul(limits.max_components_per_entity);
1378    if entry_count > entry_limit {
1379        return Err(CodecError::LimitsExceeded {
1380            kind: LimitKind::EntitiesUpdate,
1381            limit: entry_limit,
1382            actual: entry_count,
1383        });
1384    }
1385
1386    let mut updates: Vec<DeltaUpdateEntity> = Vec::new();
1387    let mut prev_entity: Option<u32> = None;
1388    let mut prev_component: Option<u16> = None;
1389    for _ in 0..entry_count {
1390        reader.align_to_byte()?;
1391        let entity_id = reader.read_u32_aligned()?;
1392        let component_raw = reader.read_u16_aligned()?;
1393        let component_id = ComponentId::new(component_raw).ok_or(CodecError::InvalidMask {
1394            kind: MaskKind::ComponentMask,
1395            reason: MaskReason::InvalidComponentId { raw: component_raw },
1396        })?;
1397        if let Some(prev) = prev_entity {
1398            if entity_id < prev {
1399                return Err(CodecError::InvalidEntityOrder {
1400                    previous: prev,
1401                    current: entity_id,
1402                });
1403            }
1404            if entity_id == prev {
1405                if let Some(prev_component) = prev_component {
1406                    if component_raw <= prev_component {
1407                        return Err(CodecError::InvalidEntityOrder {
1408                            previous: prev,
1409                            current: entity_id,
1410                        });
1411                    }
1412                }
1413            }
1414        }
1415        prev_entity = Some(entity_id);
1416        prev_component = Some(component_raw);
1417
1418        let component = schema
1419            .components
1420            .iter()
1421            .find(|component| component.id == component_id)
1422            .ok_or(CodecError::InvalidMask {
1423                kind: MaskKind::ComponentMask,
1424                reason: MaskReason::UnknownComponent {
1425                    component: component_id,
1426                },
1427            })?;
1428
1429        let field_count = reader.read_varu32()? as usize;
1430        if field_count == 0 {
1431            return Err(CodecError::InvalidMask {
1432                kind: MaskKind::FieldMask {
1433                    component: component.id,
1434                },
1435                reason: MaskReason::EmptyFieldMask {
1436                    component: component.id,
1437                },
1438            });
1439        }
1440        if field_count > limits.max_fields_per_component {
1441            return Err(CodecError::LimitsExceeded {
1442                kind: LimitKind::FieldsPerComponent,
1443                limit: limits.max_fields_per_component,
1444                actual: field_count,
1445            });
1446        }
1447        if field_count > component.fields.len() {
1448            return Err(CodecError::InvalidMask {
1449                kind: MaskKind::FieldMask {
1450                    component: component.id,
1451                },
1452                reason: MaskReason::FieldCountMismatch {
1453                    expected: component.fields.len(),
1454                    actual: field_count,
1455                },
1456            });
1457        }
1458
1459        let mut fields = Vec::with_capacity(field_count);
1460        let mut prev_index: Option<usize> = None;
1461        for _ in 0..field_count {
1462            reader.align_to_byte()?;
1463            let field_index = reader.read_varu32()? as usize;
1464            if field_index >= component.fields.len() {
1465                return Err(CodecError::InvalidMask {
1466                    kind: MaskKind::FieldMask {
1467                        component: component.id,
1468                    },
1469                    reason: MaskReason::InvalidFieldIndex {
1470                        field_index,
1471                        max: component.fields.len(),
1472                    },
1473                });
1474            }
1475            if let Some(prev_index) = prev_index {
1476                if field_index <= prev_index {
1477                    return Err(CodecError::InvalidMask {
1478                        kind: MaskKind::FieldMask {
1479                            component: component.id,
1480                        },
1481                        reason: MaskReason::InvalidFieldIndex {
1482                            field_index,
1483                            max: component.fields.len(),
1484                        },
1485                    });
1486                }
1487            }
1488            prev_index = Some(field_index);
1489            let field = component.fields[field_index];
1490            let value = read_field_value_sparse(component.id, field, &mut reader)?;
1491            fields.push((field_index, value));
1492        }
1493
1494        if let Some(last) = updates.last_mut() {
1495            if last.id.raw() == entity_id {
1496                last.components.push(DeltaUpdateComponent {
1497                    id: component.id,
1498                    fields,
1499                });
1500                continue;
1501            }
1502        }
1503
1504        updates.push(DeltaUpdateEntity {
1505            id: EntityId::new(entity_id),
1506            components: vec![DeltaUpdateComponent {
1507                id: component.id,
1508                fields,
1509            }],
1510        });
1511    }
1512
1513    reader.align_to_byte()?;
1514    if reader.bits_remaining() != 0 {
1515        return Err(CodecError::TrailingSectionData {
1516            section: SectionTag::EntityUpdateSparse,
1517            remaining_bits: reader.bits_remaining(),
1518        });
1519    }
1520
1521    let unique_entities = updates.len();
1522    if unique_entities > limits.max_entities_update {
1523        return Err(CodecError::LimitsExceeded {
1524            kind: LimitKind::EntitiesUpdate,
1525            limit: limits.max_entities_update,
1526            actual: unique_entities,
1527        });
1528    }
1529
1530    Ok(updates)
1531}
1532
1533fn decode_update_section_sparse_packed(
1534    schema: &schema::Schema,
1535    body: &[u8],
1536    limits: &CodecLimits,
1537) -> CodecResult<Vec<DeltaUpdateEntity>> {
1538    if body.len() > limits.max_section_bytes {
1539        return Err(CodecError::LimitsExceeded {
1540            kind: LimitKind::SectionBytes,
1541            limit: limits.max_section_bytes,
1542            actual: body.len(),
1543        });
1544    }
1545
1546    let mut reader = BitReader::new(body);
1547    reader.align_to_byte()?;
1548    let entry_count = reader.read_varu32()? as usize;
1549    let entry_limit = limits
1550        .max_entities_update
1551        .saturating_mul(limits.max_components_per_entity);
1552    if entry_count > entry_limit {
1553        return Err(CodecError::LimitsExceeded {
1554            kind: LimitKind::EntitiesUpdate,
1555            limit: entry_limit,
1556            actual: entry_count,
1557        });
1558    }
1559
1560    let mut updates: Vec<DeltaUpdateEntity> = Vec::new();
1561    let mut prev_entity: Option<u32> = None;
1562    let mut prev_component: Option<u16> = None;
1563    for _ in 0..entry_count {
1564        reader.align_to_byte()?;
1565        let entity_id = reader.read_varu32()?;
1566        let component_raw = reader.read_varu32()?;
1567        if component_raw > u16::MAX as u32 {
1568            return Err(CodecError::InvalidMask {
1569                kind: MaskKind::ComponentMask,
1570                reason: MaskReason::InvalidComponentId { raw: u16::MAX },
1571            });
1572        }
1573        let component_raw = component_raw as u16;
1574        let component_id = ComponentId::new(component_raw).ok_or(CodecError::InvalidMask {
1575            kind: MaskKind::ComponentMask,
1576            reason: MaskReason::InvalidComponentId { raw: component_raw },
1577        })?;
1578        if let Some(prev) = prev_entity {
1579            if entity_id < prev {
1580                return Err(CodecError::InvalidEntityOrder {
1581                    previous: prev,
1582                    current: entity_id,
1583                });
1584            }
1585            if entity_id == prev {
1586                if let Some(prev_component) = prev_component {
1587                    if component_raw <= prev_component {
1588                        return Err(CodecError::InvalidEntityOrder {
1589                            previous: prev,
1590                            current: entity_id,
1591                        });
1592                    }
1593                }
1594            }
1595        }
1596        prev_entity = Some(entity_id);
1597        prev_component = Some(component_raw);
1598
1599        let component = schema
1600            .components
1601            .iter()
1602            .find(|component| component.id == component_id)
1603            .ok_or(CodecError::InvalidMask {
1604                kind: MaskKind::ComponentMask,
1605                reason: MaskReason::UnknownComponent {
1606                    component: component_id,
1607                },
1608            })?;
1609
1610        let field_count = reader.read_varu32()? as usize;
1611        if field_count == 0 {
1612            return Err(CodecError::InvalidMask {
1613                kind: MaskKind::FieldMask {
1614                    component: component.id,
1615                },
1616                reason: MaskReason::EmptyFieldMask {
1617                    component: component.id,
1618                },
1619            });
1620        }
1621        if field_count > limits.max_fields_per_component {
1622            return Err(CodecError::LimitsExceeded {
1623                kind: LimitKind::FieldsPerComponent,
1624                limit: limits.max_fields_per_component,
1625                actual: field_count,
1626            });
1627        }
1628        if field_count > component.fields.len() {
1629            return Err(CodecError::InvalidMask {
1630                kind: MaskKind::FieldMask {
1631                    component: component.id,
1632                },
1633                reason: MaskReason::FieldCountMismatch {
1634                    expected: component.fields.len(),
1635                    actual: field_count,
1636                },
1637            });
1638        }
1639
1640        let index_bits = required_bits(component.fields.len().saturating_sub(1) as u64);
1641        let mut fields = Vec::with_capacity(field_count);
1642        let mut prev_index: Option<usize> = None;
1643        for _ in 0..field_count {
1644            let field_index = if index_bits == 0 {
1645                0
1646            } else {
1647                reader.read_bits(index_bits)? as usize
1648            };
1649            if field_index >= component.fields.len() {
1650                return Err(CodecError::InvalidMask {
1651                    kind: MaskKind::FieldMask {
1652                        component: component.id,
1653                    },
1654                    reason: MaskReason::InvalidFieldIndex {
1655                        field_index,
1656                        max: component.fields.len(),
1657                    },
1658                });
1659            }
1660            if let Some(prev_index) = prev_index {
1661                if field_index <= prev_index {
1662                    return Err(CodecError::InvalidMask {
1663                        kind: MaskKind::FieldMask {
1664                            component: component.id,
1665                        },
1666                        reason: MaskReason::InvalidFieldIndex {
1667                            field_index,
1668                            max: component.fields.len(),
1669                        },
1670                    });
1671                }
1672            }
1673            prev_index = Some(field_index);
1674            let field = component.fields[field_index];
1675            let value = read_field_value_sparse(component.id, field, &mut reader)?;
1676            fields.push((field_index, value));
1677        }
1678
1679        if let Some(last) = updates.last_mut() {
1680            if last.id.raw() == entity_id {
1681                last.components.push(DeltaUpdateComponent {
1682                    id: component.id,
1683                    fields,
1684                });
1685                continue;
1686            }
1687        }
1688
1689        updates.push(DeltaUpdateEntity {
1690            id: EntityId::new(entity_id),
1691            components: vec![DeltaUpdateComponent {
1692                id: component.id,
1693                fields,
1694            }],
1695        });
1696    }
1697
1698    reader.align_to_byte()?;
1699    if reader.bits_remaining() != 0 {
1700        return Err(CodecError::TrailingSectionData {
1701            section: SectionTag::EntityUpdateSparsePacked,
1702            remaining_bits: reader.bits_remaining(),
1703        });
1704    }
1705
1706    let unique_entities = updates.len();
1707    if unique_entities > limits.max_entities_update {
1708        return Err(CodecError::LimitsExceeded {
1709            kind: LimitKind::EntitiesUpdate,
1710            limit: limits.max_entities_update,
1711            actual: unique_entities,
1712        });
1713    }
1714
1715    Ok(updates)
1716}
1717
1718fn decode_delta_sections(
1719    schema: &schema::Schema,
1720    packet: &WirePacket<'_>,
1721    limits: &CodecLimits,
1722) -> CodecResult<(Vec<EntityId>, Vec<EntitySnapshot>, Vec<DeltaUpdateEntity>)> {
1723    let mut destroys: Option<Vec<EntityId>> = None;
1724    let mut creates: Option<Vec<EntitySnapshot>> = None;
1725    let mut updates_masked: Option<Vec<DeltaUpdateEntity>> = None;
1726    let mut updates_sparse: Option<Vec<DeltaUpdateEntity>> = None;
1727
1728    for section in &packet.sections {
1729        match section.tag {
1730            SectionTag::EntityDestroy => {
1731                if destroys.is_some() {
1732                    return Err(CodecError::DuplicateSection {
1733                        section: section.tag,
1734                    });
1735                }
1736                destroys = Some(decode_destroy_section(section.body, limits)?);
1737            }
1738            SectionTag::EntityCreate => {
1739                if creates.is_some() {
1740                    return Err(CodecError::DuplicateSection {
1741                        section: section.tag,
1742                    });
1743                }
1744                creates = Some(decode_create_section(schema, section.body, limits)?);
1745            }
1746            SectionTag::EntityUpdate => {
1747                if updates_masked.is_some() {
1748                    return Err(CodecError::DuplicateSection {
1749                        section: section.tag,
1750                    });
1751                }
1752                updates_masked = Some(decode_update_section_masked(schema, section.body, limits)?);
1753            }
1754            SectionTag::EntityUpdateSparse => {
1755                if updates_sparse.is_some() {
1756                    return Err(CodecError::DuplicateSection {
1757                        section: section.tag,
1758                    });
1759                }
1760                updates_sparse = Some(decode_update_section_sparse_varint(
1761                    schema,
1762                    section.body,
1763                    limits,
1764                )?);
1765            }
1766            SectionTag::EntityUpdateSparsePacked => {
1767                if updates_sparse.is_some() {
1768                    return Err(CodecError::DuplicateSection {
1769                        section: section.tag,
1770                    });
1771                }
1772                updates_sparse = Some(decode_update_section_sparse_packed(
1773                    schema,
1774                    section.body,
1775                    limits,
1776                )?);
1777            }
1778            _ => {
1779                return Err(CodecError::UnexpectedSection {
1780                    section: section.tag,
1781                });
1782            }
1783        }
1784    }
1785
1786    Ok((
1787        destroys.unwrap_or_default(),
1788        creates.unwrap_or_default(),
1789        match (updates_masked, updates_sparse) {
1790            (Some(_), Some(_)) => return Err(CodecError::DuplicateUpdateEncoding),
1791            (Some(updates), None) => updates,
1792            (None, Some(updates)) => updates,
1793            (None, None) => Vec::new(),
1794        },
1795    ))
1796}
1797
/// Decoded contents of a delta snapshot: two ticks plus destroy, create, and
/// update lists.
///
/// NOTE(review): the code that populates this struct is outside the visible
/// part of the file; the field notes below are inferred from names and from the
/// encoder's `tick` / `baseline_tick` parameters — confirm against the decoder.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct DeltaDecoded {
    /// Presumably the tick of the snapshot this delta produces.
    pub tick: SnapshotTick,
    /// Presumably the tick of the baseline the delta was encoded against.
    pub baseline_tick: SnapshotTick,
    /// Ids of entities to remove.
    pub destroys: Vec<EntityId>,
    /// Full snapshots of entities to add.
    pub creates: Vec<EntitySnapshot>,
    /// Per-entity field updates.
    pub updates: Vec<DeltaUpdateEntity>,
}
1806
/// Field updates for a single entity, grouped by component.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct DeltaUpdateEntity {
    /// Id of the entity the updates apply to.
    pub id: EntityId,
    /// One entry per updated component of this entity.
    pub components: Vec<DeltaUpdateComponent>,
}
1812
/// Updated fields of a single component.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct DeltaUpdateComponent {
    /// Id of the component being updated.
    pub id: ComponentId,
    /// `(field index, new value)` pairs; the index addresses the component's
    /// field list.
    pub fields: Vec<(usize, FieldValue)>,
}
1818
1819fn apply_destroys(
1820    baseline: &[EntitySnapshot],
1821    destroys: &[EntityId],
1822) -> CodecResult<Vec<EntitySnapshot>> {
1823    let mut result = Vec::with_capacity(baseline.len());
1824    let mut i = 0usize;
1825    let mut j = 0usize;
1826    while i < baseline.len() || j < destroys.len() {
1827        let base = baseline.get(i);
1828        let destroy = destroys.get(j);
1829        match (base, destroy) {
1830            (Some(b), Some(d)) => {
1831                if b.id.raw() < d.raw() {
1832                    result.push(b.clone());
1833                    i += 1;
1834                } else if b.id.raw() > d.raw() {
1835                    return Err(CodecError::EntityNotFound { entity_id: d.raw() });
1836                } else {
1837                    i += 1;
1838                    j += 1;
1839                }
1840            }
1841            (Some(b), None) => {
1842                result.push(b.clone());
1843                i += 1;
1844            }
1845            (None, Some(d)) => {
1846                return Err(CodecError::EntityNotFound { entity_id: d.raw() });
1847            }
1848            (None, None) => break,
1849        }
1850    }
1851    Ok(result)
1852}
1853
1854fn apply_creates(
1855    baseline: Vec<EntitySnapshot>,
1856    creates: Vec<EntitySnapshot>,
1857) -> CodecResult<Vec<EntitySnapshot>> {
1858    let mut result = Vec::with_capacity(baseline.len() + creates.len());
1859    let mut i = 0usize;
1860    let mut j = 0usize;
1861    while i < baseline.len() || j < creates.len() {
1862        let base = baseline.get(i);
1863        let create = creates.get(j);
1864        match (base, create) {
1865            (Some(b), Some(c)) => {
1866                if b.id.raw() < c.id.raw() {
1867                    result.push(b.clone());
1868                    i += 1;
1869                } else if b.id.raw() > c.id.raw() {
1870                    result.push(c.clone());
1871                    j += 1;
1872                } else {
1873                    return Err(CodecError::EntityAlreadyExists {
1874                        entity_id: c.id.raw(),
1875                    });
1876                }
1877            }
1878            (Some(b), None) => {
1879                result.push(b.clone());
1880                i += 1;
1881            }
1882            (None, Some(c)) => {
1883                result.push(c.clone());
1884                j += 1;
1885            }
1886            (None, None) => break,
1887        }
1888    }
1889    Ok(result)
1890}
1891
1892fn apply_updates(
1893    entities: &mut [EntitySnapshot],
1894    updates: &[DeltaUpdateEntity],
1895) -> CodecResult<()> {
1896    for update in updates {
1897        let idx = entities
1898            .binary_search_by_key(&update.id.raw(), |e| e.id.raw())
1899            .map_err(|_| CodecError::EntityNotFound {
1900                entity_id: update.id.raw(),
1901            })?;
1902        let entity = &mut entities[idx];
1903        for component_update in &update.components {
1904            let component = entity
1905                .components
1906                .iter_mut()
1907                .find(|c| c.id == component_update.id)
1908                .ok_or_else(|| CodecError::ComponentNotFound {
1909                    entity_id: update.id.raw(),
1910                    component_id: component_update.id.get(),
1911                })?;
1912            for (field_idx, value) in &component_update.fields {
1913                if *field_idx >= component.fields.len() {
1914                    return Err(CodecError::InvalidMask {
1915                        kind: MaskKind::FieldMask {
1916                            component: component_update.id,
1917                        },
1918                        reason: MaskReason::FieldCountMismatch {
1919                            expected: component.fields.len(),
1920                            actual: *field_idx + 1,
1921                        },
1922                    });
1923                }
1924                component.fields[*field_idx] = *value;
1925            }
1926        }
1927    }
1928    Ok(())
1929}
1930
1931fn ensure_entities_sorted(entities: &[EntitySnapshot]) -> CodecResult<()> {
1932    let mut prev: Option<u32> = None;
1933    for entity in entities {
1934        if let Some(prev_id) = prev {
1935            if entity.id.raw() <= prev_id {
1936                return Err(CodecError::InvalidEntityOrder {
1937                    previous: prev_id,
1938                    current: entity.id.raw(),
1939                });
1940            }
1941        }
1942        prev = Some(entity.id.raw());
1943    }
1944    Ok(())
1945}
1946
1947fn write_component_mask(
1948    schema: &schema::Schema,
1949    entity: &EntitySnapshot,
1950    writer: &mut BitWriter<'_>,
1951) -> CodecResult<()> {
1952    for component in &schema.components {
1953        let present = find_component(entity, component.id).is_some();
1954        writer.write_bit(present)?;
1955    }
1956    Ok(())
1957}
1958
1959fn write_full_component(
1960    component: &ComponentDef,
1961    snapshot: &ComponentSnapshot,
1962    limits: &CodecLimits,
1963    writer: &mut BitWriter<'_>,
1964) -> CodecResult<()> {
1965    if snapshot.fields.len() != component.fields.len() {
1966        return Err(CodecError::InvalidMask {
1967            kind: MaskKind::FieldMask {
1968                component: component.id,
1969            },
1970            reason: MaskReason::FieldCountMismatch {
1971                expected: component.fields.len(),
1972                actual: snapshot.fields.len(),
1973            },
1974        });
1975    }
1976    if snapshot.fields.len() > limits.max_fields_per_component {
1977        return Err(CodecError::LimitsExceeded {
1978            kind: LimitKind::FieldsPerComponent,
1979            limit: limits.max_fields_per_component,
1980            actual: snapshot.fields.len(),
1981        });
1982    }
1983
1984    for _ in &component.fields {
1985        writer.write_bit(true)?;
1986    }
1987    for (field, value) in component.fields.iter().zip(snapshot.fields.iter()) {
1988        write_field_value(component.id, *field, *value, writer)?;
1989    }
1990    Ok(())
1991}
1992
1993fn decode_full_component(
1994    component: &ComponentDef,
1995    reader: &mut BitReader<'_>,
1996    limits: &CodecLimits,
1997) -> CodecResult<Vec<FieldValue>> {
1998    if component.fields.len() > limits.max_fields_per_component {
1999        return Err(CodecError::LimitsExceeded {
2000            kind: LimitKind::FieldsPerComponent,
2001            limit: limits.max_fields_per_component,
2002            actual: component.fields.len(),
2003        });
2004    }
2005
2006    let mask = read_mask(
2007        reader,
2008        component.fields.len(),
2009        MaskKind::FieldMask {
2010            component: component.id,
2011        },
2012    )?;
2013    let mut values = Vec::with_capacity(component.fields.len());
2014    for (idx, field) in component.fields.iter().enumerate() {
2015        if !mask[idx] {
2016            return Err(CodecError::InvalidMask {
2017                kind: MaskKind::FieldMask {
2018                    component: component.id,
2019                },
2020                reason: MaskReason::MissingField { field: field.id },
2021            });
2022        }
2023        values.push(read_field_value(component.id, *field, reader)?);
2024    }
2025    Ok(values)
2026}
2027
/// Writes the update payload for one entity: a component change mask, then for
/// each changed component its field mask and the values of the changed fields.
///
/// The work is done in two passes over the schema's components:
///
/// 1. Validate presence and field counts, compute whether any field changed,
///    write one bit per schema component, and remember the result in the
///    scratch component mask.
/// 2. For every component flagged as changed, recompute its field mask, write
///    the mask bits, then write the current value of each changed field.
///
/// # Errors
///
/// * `CodecError::InvalidMask` (`ComponentPresenceMismatch`) when a component
///   is present in only one of `baseline` / `current`.
/// * `CodecError::InvalidMask` (`FieldCountMismatch`) when either snapshot's
///   field count differs from the schema definition.
/// * `CodecError::LimitsExceeded` when a component has more fields than
///   `limits.max_fields_per_component`.
/// * Any error produced while comparing fields or writing to `writer`.
fn write_update_components(
    schema: &schema::Schema,
    baseline: &EntitySnapshot,
    current: &EntitySnapshot,
    limits: &CodecLimits,
    scratch: &mut CodecScratch,
    writer: &mut BitWriter<'_>,
) -> CodecResult<()> {
    let component_count = schema.components.len();
    // Clear the component mask up front so flags left by an earlier use of the
    // scratch do not leak in.
    let (component_changed, _) = scratch.component_and_field_masks_mut(component_count, 0);
    component_changed.fill(false);
    // Pass 1: one "component changed" bit per schema component.
    for (idx, component) in schema.components.iter().enumerate() {
        let base = find_component(baseline, component.id);
        let curr = find_component(current, component.id);
        if base.is_some() != curr.is_some() {
            return Err(CodecError::InvalidMask {
                kind: MaskKind::ComponentMask,
                reason: MaskReason::ComponentPresenceMismatch {
                    component: component.id,
                },
            });
        }
        if let (Some(base), Some(curr)) = (base, curr) {
            if base.fields.len() != component.fields.len()
                || curr.fields.len() != component.fields.len()
            {
                return Err(CodecError::InvalidMask {
                    kind: MaskKind::FieldMask {
                        component: component.id,
                    },
                    reason: MaskReason::FieldCountMismatch {
                        expected: component.fields.len(),
                        actual: base.fields.len().max(curr.fields.len()),
                    },
                });
            }
            if component.fields.len() > limits.max_fields_per_component {
                return Err(CodecError::LimitsExceeded {
                    kind: LimitKind::FieldsPerComponent,
                    limit: limits.max_fields_per_component,
                    actual: component.fields.len(),
                });
            }
            // Re-fetch both masks from the scratch, sized for this component's
            // fields. NOTE(review): assumes the component mask contents persist
            // across these repeated calls — confirm against `CodecScratch`.
            let (component_changed, field_mask) =
                scratch.component_and_field_masks_mut(component_count, component.fields.len());
            let any_changed = compute_field_mask_into(component, base, curr, field_mask)?
                .iter()
                .any(|b| *b);
            writer.write_bit(any_changed)?;
            if any_changed {
                component_changed[idx] = true;
            }
        } else {
            // Component absent on both sides: nothing can have changed.
            writer.write_bit(false)?;
        }
    }

    // Pass 2: field mask and changed values for each flagged component.
    for (idx, component) in schema.components.iter().enumerate() {
        let (base, curr) = match (
            find_component(baseline, component.id),
            find_component(current, component.id),
        ) {
            (Some(base), Some(curr)) => (base, curr),
            _ => continue,
        };
        // Same limit check as in pass 1, repeated here.
        if component.fields.len() > limits.max_fields_per_component {
            return Err(CodecError::LimitsExceeded {
                kind: LimitKind::FieldsPerComponent,
                limit: limits.max_fields_per_component,
                actual: component.fields.len(),
            });
        }
        let (component_changed, field_mask) =
            scratch.component_and_field_masks_mut(component_count, component.fields.len());
        if component_changed[idx] {
            // The field mask is recomputed here rather than kept from pass 1.
            let field_mask = compute_field_mask_into(component, base, curr, field_mask)?;
            for bit in field_mask {
                writer.write_bit(*bit)?;
            }
            // Only fields whose mask bit is set have their current value written.
            for (((field, _base_val), curr_val), changed) in component
                .fields
                .iter()
                .zip(base.fields.iter())
                .zip(curr.fields.iter())
                .zip(field_mask.iter())
            {
                if *changed {
                    write_field_value(component.id, *field, *curr_val, writer)?;
                }
            }
        }
    }
    Ok(())
}
2122
2123fn decode_update_component(
2124    component: &ComponentDef,
2125    reader: &mut BitReader<'_>,
2126    limits: &CodecLimits,
2127) -> CodecResult<Vec<(usize, FieldValue)>> {
2128    if component.fields.len() > limits.max_fields_per_component {
2129        return Err(CodecError::LimitsExceeded {
2130            kind: LimitKind::FieldsPerComponent,
2131            limit: limits.max_fields_per_component,
2132            actual: component.fields.len(),
2133        });
2134    }
2135    let mask = read_mask(
2136        reader,
2137        component.fields.len(),
2138        MaskKind::FieldMask {
2139            component: component.id,
2140        },
2141    )?;
2142    if !mask.iter().any(|b| *b) {
2143        return Err(CodecError::InvalidMask {
2144            kind: MaskKind::FieldMask {
2145                component: component.id,
2146            },
2147            reason: MaskReason::EmptyFieldMask {
2148                component: component.id,
2149            },
2150        });
2151    }
2152    let mut fields = Vec::new();
2153    for (idx, field) in component.fields.iter().enumerate() {
2154        if mask[idx] {
2155            let value = read_field_value(component.id, *field, reader)?;
2156            fields.push((idx, value));
2157        }
2158    }
2159    Ok(fields)
2160}
2161
2162fn compute_field_mask_into<'a>(
2163    component: &ComponentDef,
2164    baseline: &ComponentSnapshot,
2165    current: &ComponentSnapshot,
2166    field_mask: &'a mut [bool],
2167) -> CodecResult<&'a [bool]> {
2168    for (((field, base_val), curr_val), slot) in component
2169        .fields
2170        .iter()
2171        .zip(baseline.fields.iter())
2172        .zip(current.fields.iter())
2173        .zip(field_mask.iter_mut())
2174    {
2175        *slot = field_changed(component.id, *field, *base_val, *curr_val)?;
2176    }
2177    Ok(field_mask)
2178}
2179
2180fn field_changed(
2181    component_id: ComponentId,
2182    field: FieldDef,
2183    baseline: FieldValue,
2184    current: FieldValue,
2185) -> CodecResult<bool> {
2186    match field.change {
2187        ChangePolicy::Always => field_differs(component_id, field, baseline, current),
2188        ChangePolicy::Threshold { threshold_q } => {
2189            field_exceeds_threshold(component_id, field, baseline, current, threshold_q)
2190        }
2191    }
2192}
2193
2194fn field_differs(
2195    component_id: ComponentId,
2196    field: FieldDef,
2197    baseline: FieldValue,
2198    current: FieldValue,
2199) -> CodecResult<bool> {
2200    match (baseline, current) {
2201        (FieldValue::Bool(a), FieldValue::Bool(b)) => Ok(a != b),
2202        (FieldValue::UInt(a), FieldValue::UInt(b)) => Ok(a != b),
2203        (FieldValue::SInt(a), FieldValue::SInt(b)) => Ok(a != b),
2204        (FieldValue::VarUInt(a), FieldValue::VarUInt(b)) => Ok(a != b),
2205        (FieldValue::VarSInt(a), FieldValue::VarSInt(b)) => Ok(a != b),
2206        (FieldValue::FixedPoint(a), FieldValue::FixedPoint(b)) => Ok(a != b),
2207        _ => Err(CodecError::InvalidValue {
2208            component: component_id,
2209            field: field.id,
2210            reason: ValueReason::TypeMismatch {
2211                expected: codec_name(field.codec),
2212                found: value_name(current),
2213            },
2214        }),
2215    }
2216}
2217
2218fn field_exceeds_threshold(
2219    component_id: ComponentId,
2220    field: FieldDef,
2221    baseline: FieldValue,
2222    current: FieldValue,
2223    threshold_q: u32,
2224) -> CodecResult<bool> {
2225    let threshold_q = threshold_q as u64;
2226    match (baseline, current) {
2227        (FieldValue::FixedPoint(a), FieldValue::FixedPoint(b)) => {
2228            Ok((a - b).unsigned_abs() > threshold_q)
2229        }
2230        (FieldValue::UInt(a), FieldValue::UInt(b)) => Ok(a.abs_diff(b) > threshold_q),
2231        (FieldValue::SInt(a), FieldValue::SInt(b)) => Ok((a - b).unsigned_abs() > threshold_q),
2232        (FieldValue::VarUInt(a), FieldValue::VarUInt(b)) => Ok(a.abs_diff(b) > threshold_q),
2233        (FieldValue::VarSInt(a), FieldValue::VarSInt(b)) => {
2234            Ok((a - b).unsigned_abs() > threshold_q)
2235        }
2236        (FieldValue::Bool(a), FieldValue::Bool(b)) => Ok(a != b),
2237        _ => Err(CodecError::InvalidValue {
2238            component: component_id,
2239            field: field.id,
2240            reason: ValueReason::TypeMismatch {
2241                expected: codec_name(field.codec),
2242                found: value_name(current),
2243            },
2244        }),
2245    }
2246}
2247
2248fn entity_has_updates(
2249    schema: &schema::Schema,
2250    baseline: &EntitySnapshot,
2251    current: &EntitySnapshot,
2252    limits: &CodecLimits,
2253) -> CodecResult<bool> {
2254    ensure_component_presence_matches(schema, baseline, current)?;
2255    for component in &schema.components {
2256        let base = find_component(baseline, component.id);
2257        let curr = find_component(current, component.id);
2258        if let (Some(base), Some(curr)) = (base, curr) {
2259            if base.fields.len() != component.fields.len()
2260                || curr.fields.len() != component.fields.len()
2261            {
2262                return Err(CodecError::InvalidMask {
2263                    kind: MaskKind::FieldMask {
2264                        component: component.id,
2265                    },
2266                    reason: MaskReason::FieldCountMismatch {
2267                        expected: component.fields.len(),
2268                        actual: base.fields.len().max(curr.fields.len()),
2269                    },
2270                });
2271            }
2272            if component.fields.len() > limits.max_fields_per_component {
2273                return Err(CodecError::LimitsExceeded {
2274                    kind: LimitKind::FieldsPerComponent,
2275                    limit: limits.max_fields_per_component,
2276                    actual: component.fields.len(),
2277                });
2278            }
2279            for ((field, base_val), curr_val) in component
2280                .fields
2281                .iter()
2282                .zip(base.fields.iter())
2283                .zip(curr.fields.iter())
2284            {
2285                if field_changed(component.id, *field, *base_val, *curr_val)? {
2286                    return Ok(true);
2287                }
2288            }
2289        }
2290    }
2291    Ok(false)
2292}
2293
2294fn ensure_component_presence_matches(
2295    schema: &schema::Schema,
2296    baseline: &EntitySnapshot,
2297    current: &EntitySnapshot,
2298) -> CodecResult<()> {
2299    // In this version, component presence is stable across an entity's lifetime.
2300    for component in &schema.components {
2301        let base = find_component(baseline, component.id).is_some();
2302        let curr = find_component(current, component.id).is_some();
2303        if base != curr {
2304            return Err(CodecError::InvalidMask {
2305                kind: MaskKind::ComponentMask,
2306                reason: MaskReason::ComponentPresenceMismatch {
2307                    component: component.id,
2308                },
2309            });
2310        }
2311    }
2312    Ok(())
2313}
2314
2315fn find_component(entity: &EntitySnapshot, id: ComponentId) -> Option<&ComponentSnapshot> {
2316    entity.components.iter().find(|c| c.id == id)
2317}
2318
2319fn codec_name(codec: schema::FieldCodec) -> &'static str {
2320    match codec {
2321        schema::FieldCodec::Bool => "bool",
2322        schema::FieldCodec::UInt { .. } => "uint",
2323        schema::FieldCodec::SInt { .. } => "sint",
2324        schema::FieldCodec::VarUInt => "varuint",
2325        schema::FieldCodec::VarSInt => "varsint",
2326        schema::FieldCodec::FixedPoint(_) => "fixed-point",
2327    }
2328}
2329
2330fn value_name(value: FieldValue) -> &'static str {
2331    match value {
2332        FieldValue::Bool(_) => "bool",
2333        FieldValue::UInt(_) => "uint",
2334        FieldValue::SInt(_) => "sint",
2335        FieldValue::VarUInt(_) => "varuint",
2336        FieldValue::VarSInt(_) => "varsint",
2337        FieldValue::FixedPoint(_) => "fixed-point",
2338    }
2339}
2340
2341#[cfg(test)]
2342mod tests {
2343    use super::*;
2344    use schema::{ComponentDef, FieldCodec, FieldDef, FieldId, Schema};
2345
2346    fn schema_one_bool() -> Schema {
2347        let component = ComponentDef::new(ComponentId::new(1).unwrap())
2348            .field(FieldDef::new(FieldId::new(1).unwrap(), FieldCodec::bool()));
2349        Schema::new(vec![component]).unwrap()
2350    }
2351
2352    fn schema_uint_threshold(threshold_q: u32) -> Schema {
2353        let field = FieldDef::new(FieldId::new(1).unwrap(), FieldCodec::uint(8))
2354            .change(ChangePolicy::Threshold { threshold_q });
2355        let component = ComponentDef::new(ComponentId::new(1).unwrap()).field(field);
2356        Schema::new(vec![component]).unwrap()
2357    }
2358
2359    fn schema_two_components() -> Schema {
2360        let c1 = ComponentDef::new(ComponentId::new(1).unwrap())
2361            .field(FieldDef::new(FieldId::new(1).unwrap(), FieldCodec::bool()));
2362        let c2 = ComponentDef::new(ComponentId::new(2).unwrap())
2363            .field(FieldDef::new(FieldId::new(1).unwrap(), FieldCodec::bool()));
2364        Schema::new(vec![c1, c2]).unwrap()
2365    }
2366
2367    fn baseline_snapshot() -> Snapshot {
2368        Snapshot {
2369            tick: SnapshotTick::new(10),
2370            entities: vec![EntitySnapshot {
2371                id: EntityId::new(1),
2372                components: vec![ComponentSnapshot {
2373                    id: ComponentId::new(1).unwrap(),
2374                    fields: vec![FieldValue::Bool(false)],
2375                }],
2376            }],
2377        }
2378    }
2379
2380    #[test]
2381    fn no_op_delta_is_empty() {
2382        let schema = schema_one_bool();
2383        let baseline = baseline_snapshot();
2384        let current = baseline.clone();
2385        let mut buf = [0u8; 128];
2386        let bytes = encode_delta_snapshot(
2387            &schema,
2388            SnapshotTick::new(11),
2389            baseline.tick,
2390            &baseline,
2391            &current,
2392            &CodecLimits::for_testing(),
2393            &mut buf,
2394        )
2395        .unwrap();
2396        let header =
2397            wire::PacketHeader::delta_snapshot(schema_hash(&schema), 11, baseline.tick.raw(), 0);
2398        let mut expected = [0u8; wire::HEADER_SIZE];
2399        encode_header(&header, &mut expected).unwrap();
2400        assert_eq!(&buf[..bytes], expected.as_slice());
2401    }
2402
2403    #[test]
2404    fn delta_roundtrip_single_update() {
2405        let schema = schema_one_bool();
2406        let baseline = baseline_snapshot();
2407        let current = Snapshot {
2408            tick: SnapshotTick::new(11),
2409            entities: vec![EntitySnapshot {
2410                id: EntityId::new(1),
2411                components: vec![ComponentSnapshot {
2412                    id: ComponentId::new(1).unwrap(),
2413                    fields: vec![FieldValue::Bool(true)],
2414                }],
2415            }],
2416        };
2417
2418        let mut buf = [0u8; 128];
2419        let bytes = encode_delta_snapshot(
2420            &schema,
2421            current.tick,
2422            baseline.tick,
2423            &baseline,
2424            &current,
2425            &CodecLimits::for_testing(),
2426            &mut buf,
2427        )
2428        .unwrap();
2429        let applied = apply_delta_snapshot(
2430            &schema,
2431            &baseline,
2432            &buf[..bytes],
2433            &wire::Limits::for_testing(),
2434            &CodecLimits::for_testing(),
2435        )
2436        .unwrap();
2437        assert_eq!(applied.entities, current.entities);
2438    }
2439
2440    #[test]
2441    fn delta_roundtrip_reuse_scratch() {
2442        let schema = schema_one_bool();
2443        let baseline = baseline_snapshot();
2444        let current_one = Snapshot {
2445            tick: SnapshotTick::new(11),
2446            entities: vec![EntitySnapshot {
2447                id: EntityId::new(1),
2448                components: vec![ComponentSnapshot {
2449                    id: ComponentId::new(1).unwrap(),
2450                    fields: vec![FieldValue::Bool(true)],
2451                }],
2452            }],
2453        };
2454        let current_two = Snapshot {
2455            tick: SnapshotTick::new(12),
2456            entities: vec![EntitySnapshot {
2457                id: EntityId::new(1),
2458                components: vec![ComponentSnapshot {
2459                    id: ComponentId::new(1).unwrap(),
2460                    fields: vec![FieldValue::Bool(false)],
2461                }],
2462            }],
2463        };
2464
2465        let mut scratch = CodecScratch::default();
2466        let mut buf_one = [0u8; 128];
2467        let mut buf_two = [0u8; 128];
2468
2469        let bytes_one = encode_delta_snapshot_with_scratch(
2470            &schema,
2471            current_one.tick,
2472            baseline.tick,
2473            &baseline,
2474            &current_one,
2475            &CodecLimits::for_testing(),
2476            &mut scratch,
2477            &mut buf_one,
2478        )
2479        .unwrap();
2480        let applied_one = apply_delta_snapshot(
2481            &schema,
2482            &baseline,
2483            &buf_one[..bytes_one],
2484            &wire::Limits::for_testing(),
2485            &CodecLimits::for_testing(),
2486        )
2487        .unwrap();
2488        assert_eq!(applied_one.entities, current_one.entities);
2489
2490        let bytes_two = encode_delta_snapshot_with_scratch(
2491            &schema,
2492            current_two.tick,
2493            baseline.tick,
2494            &baseline,
2495            &current_two,
2496            &CodecLimits::for_testing(),
2497            &mut scratch,
2498            &mut buf_two,
2499        )
2500        .unwrap();
2501        let applied_two = apply_delta_snapshot(
2502            &schema,
2503            &baseline,
2504            &buf_two[..bytes_two],
2505            &wire::Limits::for_testing(),
2506            &CodecLimits::for_testing(),
2507        )
2508        .unwrap();
2509        assert_eq!(applied_two.entities, current_two.entities);
2510    }
2511
2512    #[test]
2513    fn delta_rejects_both_update_encodings() {
2514        let schema = schema_one_bool();
2515        let baseline = baseline_snapshot();
2516        let update_body = [0u8; 1];
2517        let mut update_section = [0u8; 8];
2518        let update_len =
2519            wire::encode_section(SectionTag::EntityUpdate, &update_body, &mut update_section)
2520                .unwrap();
2521
2522        let mut sparse_section = [0u8; 8];
2523        let sparse_len = wire::encode_section(
2524            SectionTag::EntityUpdateSparse,
2525            &update_body,
2526            &mut sparse_section,
2527        )
2528        .unwrap();
2529
2530        let payload_len = (update_len + sparse_len) as u32;
2531        let header = wire::PacketHeader::delta_snapshot(
2532            schema_hash(&schema),
2533            baseline.tick.raw() + 1,
2534            baseline.tick.raw(),
2535            payload_len,
2536        );
2537        let mut buf = vec![0u8; wire::HEADER_SIZE + payload_len as usize];
2538        wire::encode_header(&header, &mut buf[..wire::HEADER_SIZE]).unwrap();
2539        buf[wire::HEADER_SIZE..wire::HEADER_SIZE + update_len]
2540            .copy_from_slice(&update_section[..update_len]);
2541        buf[wire::HEADER_SIZE + update_len..wire::HEADER_SIZE + update_len + sparse_len]
2542            .copy_from_slice(&sparse_section[..sparse_len]);
2543
2544        let packet = wire::decode_packet(&buf, &wire::Limits::for_testing()).unwrap();
2545        let err = decode_delta_packet(&schema, &packet, &CodecLimits::for_testing()).unwrap_err();
2546        assert!(matches!(err, CodecError::DuplicateUpdateEncoding));
2547
2548        let err = apply_delta_snapshot_from_packet(
2549            &schema,
2550            &baseline,
2551            &packet,
2552            &CodecLimits::for_testing(),
2553        )
2554        .unwrap_err();
2555        assert!(matches!(err, CodecError::DuplicateUpdateEncoding));
2556    }
2557
2558    #[test]
2559    fn delta_roundtrip_create_destroy_update() {
2560        let schema = schema_one_bool();
2561        let baseline = Snapshot {
2562            tick: SnapshotTick::new(10),
2563            entities: vec![
2564                EntitySnapshot {
2565                    id: EntityId::new(1),
2566                    components: vec![ComponentSnapshot {
2567                        id: ComponentId::new(1).unwrap(),
2568                        fields: vec![FieldValue::Bool(false)],
2569                    }],
2570                },
2571                EntitySnapshot {
2572                    id: EntityId::new(2),
2573                    components: vec![ComponentSnapshot {
2574                        id: ComponentId::new(1).unwrap(),
2575                        fields: vec![FieldValue::Bool(false)],
2576                    }],
2577                },
2578            ],
2579        };
2580        let current = Snapshot {
2581            tick: SnapshotTick::new(11),
2582            entities: vec![
2583                EntitySnapshot {
2584                    id: EntityId::new(2),
2585                    components: vec![ComponentSnapshot {
2586                        id: ComponentId::new(1).unwrap(),
2587                        fields: vec![FieldValue::Bool(true)],
2588                    }],
2589                },
2590                EntitySnapshot {
2591                    id: EntityId::new(3),
2592                    components: vec![ComponentSnapshot {
2593                        id: ComponentId::new(1).unwrap(),
2594                        fields: vec![FieldValue::Bool(true)],
2595                    }],
2596                },
2597            ],
2598        };
2599
2600        let mut buf = [0u8; 256];
2601        let bytes = encode_delta_snapshot(
2602            &schema,
2603            current.tick,
2604            baseline.tick,
2605            &baseline,
2606            &current,
2607            &CodecLimits::for_testing(),
2608            &mut buf,
2609        )
2610        .unwrap();
2611        let applied = apply_delta_snapshot(
2612            &schema,
2613            &baseline,
2614            &buf[..bytes],
2615            &wire::Limits::for_testing(),
2616            &CodecLimits::for_testing(),
2617        )
2618        .unwrap();
2619        assert_eq!(applied.entities, current.entities);
2620    }
2621
2622    #[test]
2623    fn delta_session_header_matches_payload() {
2624        let schema = schema_one_bool();
2625        let baseline = baseline_snapshot();
2626        let current = Snapshot {
2627            tick: SnapshotTick::new(11),
2628            entities: vec![EntitySnapshot {
2629                id: EntityId::new(1),
2630                components: vec![ComponentSnapshot {
2631                    id: ComponentId::new(1).unwrap(),
2632                    fields: vec![FieldValue::Bool(true)],
2633                }],
2634            }],
2635        };
2636
2637        let mut full_buf = [0u8; 128];
2638        let full_bytes = encode_delta_snapshot_for_client(
2639            &schema,
2640            current.tick,
2641            baseline.tick,
2642            &baseline,
2643            &current,
2644            &CodecLimits::for_testing(),
2645            &mut full_buf,
2646        )
2647        .unwrap();
2648        let full_payload = &full_buf[wire::HEADER_SIZE..full_bytes];
2649
2650        let mut session_buf = [0u8; 128];
2651        let mut last_tick = baseline.tick;
2652        let session_bytes = encode_delta_snapshot_for_client_session_with_scratch(
2653            &schema,
2654            current.tick,
2655            baseline.tick,
2656            &baseline,
2657            &current,
2658            &CodecLimits::for_testing(),
2659            &mut CodecScratch::default(),
2660            &mut last_tick,
2661            &mut session_buf,
2662        )
2663        .unwrap();
2664
2665        let session_header =
2666            wire::decode_session_header(&session_buf[..session_bytes], baseline.tick.raw())
2667                .unwrap();
2668        let payload_start = session_header.header_len;
2669        let payload_end = payload_start + session_header.payload_len as usize;
2670        let session_payload = &session_buf[payload_start..payload_end];
2671        assert_eq!(full_payload, session_payload);
2672    }
2673
2674    #[test]
2675    fn delta_roundtrip_single_component_change() {
2676        let schema = schema_two_components();
2677        let baseline = Snapshot {
2678            tick: SnapshotTick::new(10),
2679            entities: vec![EntitySnapshot {
2680                id: EntityId::new(1),
2681                components: vec![
2682                    ComponentSnapshot {
2683                        id: ComponentId::new(1).unwrap(),
2684                        fields: vec![FieldValue::Bool(false)],
2685                    },
2686                    ComponentSnapshot {
2687                        id: ComponentId::new(2).unwrap(),
2688                        fields: vec![FieldValue::Bool(false)],
2689                    },
2690                ],
2691            }],
2692        };
2693        let current = Snapshot {
2694            tick: SnapshotTick::new(11),
2695            entities: vec![EntitySnapshot {
2696                id: EntityId::new(1),
2697                components: vec![
2698                    ComponentSnapshot {
2699                        id: ComponentId::new(1).unwrap(),
2700                        fields: vec![FieldValue::Bool(true)],
2701                    },
2702                    ComponentSnapshot {
2703                        id: ComponentId::new(2).unwrap(),
2704                        fields: vec![FieldValue::Bool(false)],
2705                    },
2706                ],
2707            }],
2708        };
2709
2710        let mut buf = [0u8; 256];
2711        let bytes = encode_delta_snapshot(
2712            &schema,
2713            current.tick,
2714            baseline.tick,
2715            &baseline,
2716            &current,
2717            &CodecLimits::for_testing(),
2718            &mut buf,
2719        )
2720        .unwrap();
2721        let applied = apply_delta_snapshot(
2722            &schema,
2723            &baseline,
2724            &buf[..bytes],
2725            &wire::Limits::for_testing(),
2726            &CodecLimits::for_testing(),
2727        )
2728        .unwrap();
2729        assert_eq!(applied.entities, current.entities);
2730    }
2731
2732    #[test]
2733    fn baseline_tick_mismatch_is_error() {
2734        let schema = schema_one_bool();
2735        let baseline = baseline_snapshot();
2736        let current = baseline.clone();
2737        let mut buf = [0u8; 128];
2738        let bytes = encode_delta_snapshot(
2739            &schema,
2740            SnapshotTick::new(11),
2741            baseline.tick,
2742            &baseline,
2743            &current,
2744            &CodecLimits::for_testing(),
2745            &mut buf,
2746        )
2747        .unwrap();
2748        let mut packet = wire::decode_packet(&buf[..bytes], &wire::Limits::for_testing()).unwrap();
2749        packet.header.baseline_tick = 999;
2750        wire::encode_header(&packet.header, &mut buf[..wire::HEADER_SIZE]).unwrap();
2751        let err = apply_delta_snapshot(
2752            &schema,
2753            &baseline,
2754            &buf[..bytes],
2755            &wire::Limits::for_testing(),
2756            &CodecLimits::for_testing(),
2757        )
2758        .unwrap_err();
2759        assert!(matches!(err, CodecError::BaselineTickMismatch { .. }));
2760    }
2761
2762    #[test]
2763    fn threshold_suppresses_small_change() {
2764        let schema = schema_uint_threshold(5);
2765        let baseline = Snapshot {
2766            tick: SnapshotTick::new(10),
2767            entities: vec![EntitySnapshot {
2768                id: EntityId::new(1),
2769                components: vec![ComponentSnapshot {
2770                    id: ComponentId::new(1).unwrap(),
2771                    fields: vec![FieldValue::UInt(10)],
2772                }],
2773            }],
2774        };
2775        let current = Snapshot {
2776            tick: SnapshotTick::new(11),
2777            entities: vec![EntitySnapshot {
2778                id: EntityId::new(1),
2779                components: vec![ComponentSnapshot {
2780                    id: ComponentId::new(1).unwrap(),
2781                    fields: vec![FieldValue::UInt(12)],
2782                }],
2783            }],
2784        };
2785
2786        let mut buf = [0u8; 128];
2787        let bytes = encode_delta_snapshot(
2788            &schema,
2789            current.tick,
2790            baseline.tick,
2791            &baseline,
2792            &current,
2793            &CodecLimits::for_testing(),
2794            &mut buf,
2795        )
2796        .unwrap();
2797
2798        let packet = wire::decode_packet(&buf[..bytes], &wire::Limits::for_testing()).unwrap();
2799        assert_eq!(packet.sections.len(), 0);
2800
2801        let applied = apply_delta_snapshot(
2802            &schema,
2803            &baseline,
2804            &buf[..bytes],
2805            &wire::Limits::for_testing(),
2806            &CodecLimits::for_testing(),
2807        )
2808        .unwrap();
2809        assert_eq!(applied.entities, baseline.entities);
2810    }
2811}