Skip to main content

codec/
delta.rs

1//! Delta snapshot encoding/decoding.
2
3use bitstream::{BitReader, BitWriter};
4use schema::{schema_hash, ChangePolicy, ComponentDef, ComponentId, FieldDef};
5use wire::{decode_packet, encode_header, SectionTag, WirePacket};
6
7use crate::baseline::BaselineStore;
8use crate::error::{CodecError, CodecResult, LimitKind, MaskKind, MaskReason, ValueReason};
9use crate::limits::CodecLimits;
10use crate::scratch::CodecScratch;
11use crate::snapshot::{
12    ensure_known_components, read_field_value, read_mask, write_field_value, write_section,
13    ComponentSnapshot, EntitySnapshot, FieldValue, Snapshot,
14};
15use crate::types::{EntityId, SnapshotTick};
16
17/// Selects the latest baseline tick at or before the ack tick.
18#[must_use]
19pub fn select_baseline_tick<T>(
20    store: &BaselineStore<T>,
21    ack_tick: SnapshotTick,
22) -> Option<SnapshotTick> {
23    store.latest_at_or_before(ack_tick).map(|(tick, _)| tick)
24}
25
26/// Encodes a delta snapshot into the provided output buffer.
27///
28/// Baseline and current snapshots must have entities sorted by `EntityId`.
29pub fn encode_delta_snapshot(
30    schema: &schema::Schema,
31    tick: SnapshotTick,
32    baseline_tick: SnapshotTick,
33    baseline: &Snapshot,
34    current: &Snapshot,
35    limits: &CodecLimits,
36    out: &mut [u8],
37) -> CodecResult<usize> {
38    let mut scratch = CodecScratch::default();
39    encode_delta_snapshot_with_scratch(
40        schema,
41        tick,
42        baseline_tick,
43        baseline,
44        current,
45        limits,
46        &mut scratch,
47        out,
48    )
49}
50
51/// Encodes a delta snapshot using reusable scratch buffers.
52#[allow(clippy::too_many_arguments)]
53pub fn encode_delta_snapshot_with_scratch(
54    schema: &schema::Schema,
55    tick: SnapshotTick,
56    baseline_tick: SnapshotTick,
57    baseline: &Snapshot,
58    current: &Snapshot,
59    limits: &CodecLimits,
60    scratch: &mut CodecScratch,
61    out: &mut [u8],
62) -> CodecResult<usize> {
63    if out.len() < wire::HEADER_SIZE {
64        return Err(CodecError::OutputTooSmall {
65            needed: wire::HEADER_SIZE,
66            available: out.len(),
67        });
68    }
69
70    if baseline.tick != baseline_tick {
71        return Err(CodecError::BaselineTickMismatch {
72            expected: baseline.tick.raw(),
73            found: baseline_tick.raw(),
74        });
75    }
76
77    ensure_entities_sorted(&baseline.entities)?;
78    ensure_entities_sorted(&current.entities)?;
79
80    let mut counts = DiffCounts::default();
81    diff_counts(schema, baseline, current, limits, &mut counts)?;
82
83    if counts.creates > limits.max_entities_create {
84        return Err(CodecError::LimitsExceeded {
85            kind: LimitKind::EntitiesCreate,
86            limit: limits.max_entities_create,
87            actual: counts.creates,
88        });
89    }
90    if counts.updates > limits.max_entities_update {
91        return Err(CodecError::LimitsExceeded {
92            kind: LimitKind::EntitiesUpdate,
93            limit: limits.max_entities_update,
94            actual: counts.updates,
95        });
96    }
97    if counts.destroys > limits.max_entities_destroy {
98        return Err(CodecError::LimitsExceeded {
99            kind: LimitKind::EntitiesDestroy,
100            limit: limits.max_entities_destroy,
101            actual: counts.destroys,
102        });
103    }
104
105    let mut offset = wire::HEADER_SIZE;
106    if counts.destroys > 0 {
107        let written = write_section(
108            SectionTag::EntityDestroy,
109            &mut out[offset..],
110            limits,
111            |writer| encode_destroy_body(baseline, current, counts.destroys, limits, writer),
112        )?;
113        offset += written;
114    }
115    if counts.creates > 0 {
116        let written = write_section(
117            SectionTag::EntityCreate,
118            &mut out[offset..],
119            limits,
120            |writer| encode_create_body(schema, baseline, current, counts.creates, limits, writer),
121        )?;
122        offset += written;
123    }
124    if counts.updates > 0 {
125        let written = write_section(
126            SectionTag::EntityUpdate,
127            &mut out[offset..],
128            limits,
129            |writer| {
130                encode_update_body(
131                    schema,
132                    baseline,
133                    current,
134                    counts.updates,
135                    limits,
136                    scratch,
137                    writer,
138                )
139            },
140        )?;
141        offset += written;
142    }
143
144    let payload_len = offset - wire::HEADER_SIZE;
145    let header = wire::PacketHeader::delta_snapshot(
146        schema_hash(schema),
147        tick.raw(),
148        baseline_tick.raw(),
149        payload_len as u32,
150    );
151    encode_header(&header, &mut out[..wire::HEADER_SIZE]).map_err(|_| {
152        CodecError::OutputTooSmall {
153            needed: wire::HEADER_SIZE,
154            available: out.len(),
155        }
156    })?;
157
158    Ok(offset)
159}
160
161/// Applies a delta snapshot to a baseline snapshot.
162pub fn apply_delta_snapshot(
163    schema: &schema::Schema,
164    baseline: &Snapshot,
165    bytes: &[u8],
166    wire_limits: &wire::Limits,
167    limits: &CodecLimits,
168) -> CodecResult<Snapshot> {
169    let packet = decode_packet(bytes, wire_limits)?;
170    apply_delta_snapshot_from_packet(schema, baseline, &packet, limits)
171}
172
173/// Applies a delta snapshot from a parsed wire packet.
174pub fn apply_delta_snapshot_from_packet(
175    schema: &schema::Schema,
176    baseline: &Snapshot,
177    packet: &WirePacket<'_>,
178    limits: &CodecLimits,
179) -> CodecResult<Snapshot> {
180    let header = packet.header;
181    if !header.flags.is_delta_snapshot() {
182        return Err(CodecError::Wire(wire::DecodeError::InvalidFlags {
183            flags: header.flags.raw(),
184        }));
185    }
186    if header.baseline_tick == 0 {
187        return Err(CodecError::Wire(wire::DecodeError::InvalidBaselineTick {
188            baseline_tick: header.baseline_tick,
189            flags: header.flags.raw(),
190        }));
191    }
192    if header.baseline_tick != baseline.tick.raw() {
193        return Err(CodecError::BaselineTickMismatch {
194            expected: baseline.tick.raw(),
195            found: header.baseline_tick,
196        });
197    }
198
199    let expected_hash = schema_hash(schema);
200    if header.schema_hash != expected_hash {
201        return Err(CodecError::SchemaMismatch {
202            expected: expected_hash,
203            found: header.schema_hash,
204        });
205    }
206
207    let (destroys, creates, updates) = decode_delta_sections(schema, packet, limits)?;
208
209    ensure_entities_sorted(&baseline.entities)?;
210    ensure_entities_sorted(&creates)?;
211
212    let mut remaining = apply_destroys(&baseline.entities, &destroys)?;
213    remaining = apply_creates(remaining, creates)?;
214    if remaining.len() > limits.max_total_entities_after_apply {
215        return Err(CodecError::LimitsExceeded {
216            kind: LimitKind::TotalEntitiesAfterApply,
217            limit: limits.max_total_entities_after_apply,
218            actual: remaining.len(),
219        });
220    }
221    apply_updates(&mut remaining, &updates)?;
222
223    Ok(Snapshot {
224        tick: SnapshotTick::new(header.tick),
225        entities: remaining,
226    })
227}
228
229/// Decodes a delta packet without applying it to a baseline.
230pub fn decode_delta_packet(
231    schema: &schema::Schema,
232    packet: &WirePacket<'_>,
233    limits: &CodecLimits,
234) -> CodecResult<DeltaDecoded> {
235    let header = packet.header;
236    if !header.flags.is_delta_snapshot() {
237        return Err(CodecError::Wire(wire::DecodeError::InvalidFlags {
238            flags: header.flags.raw(),
239        }));
240    }
241    if header.baseline_tick == 0 {
242        return Err(CodecError::Wire(wire::DecodeError::InvalidBaselineTick {
243            baseline_tick: header.baseline_tick,
244            flags: header.flags.raw(),
245        }));
246    }
247    let expected_hash = schema_hash(schema);
248    if header.schema_hash != expected_hash {
249        return Err(CodecError::SchemaMismatch {
250            expected: expected_hash,
251            found: header.schema_hash,
252        });
253    }
254
255    let (destroys, creates, updates) = decode_delta_sections(schema, packet, limits)?;
256
257    Ok(DeltaDecoded {
258        tick: SnapshotTick::new(header.tick),
259        baseline_tick: SnapshotTick::new(header.baseline_tick),
260        destroys,
261        creates,
262        updates,
263    })
264}
265
/// Tally of entity-level changes between a baseline and a current snapshot,
/// as accumulated by `diff_counts`.
#[derive(Default)]
struct DiffCounts {
    /// Entities whose id appears only in the current snapshot.
    creates: usize,
    /// Entities present in both snapshots for which `entity_has_updates` returned `true`.
    updates: usize,
    /// Entities whose id appears only in the baseline snapshot.
    destroys: usize,
}
272
273fn diff_counts(
274    schema: &schema::Schema,
275    baseline: &Snapshot,
276    current: &Snapshot,
277    limits: &CodecLimits,
278    counts: &mut DiffCounts,
279) -> CodecResult<()> {
280    let mut i = 0usize;
281    let mut j = 0usize;
282    while i < baseline.entities.len() || j < current.entities.len() {
283        let base = baseline.entities.get(i);
284        let curr = current.entities.get(j);
285        match (base, curr) {
286            (Some(b), Some(c)) => {
287                if b.id.raw() < c.id.raw() {
288                    counts.destroys += 1;
289                    i += 1;
290                } else if b.id.raw() > c.id.raw() {
291                    counts.creates += 1;
292                    j += 1;
293                } else {
294                    if entity_has_updates(schema, b, c, limits)? {
295                        counts.updates += 1;
296                    }
297                    i += 1;
298                    j += 1;
299                }
300            }
301            (Some(_), None) => {
302                counts.destroys += 1;
303                i += 1;
304            }
305            (None, Some(_)) => {
306                counts.creates += 1;
307                j += 1;
308            }
309            (None, None) => break,
310        }
311    }
312    Ok(())
313}
314
315fn encode_destroy_body(
316    baseline: &Snapshot,
317    current: &Snapshot,
318    destroy_count: usize,
319    limits: &CodecLimits,
320    writer: &mut BitWriter<'_>,
321) -> CodecResult<()> {
322    if destroy_count > limits.max_entities_destroy {
323        return Err(CodecError::LimitsExceeded {
324            kind: LimitKind::EntitiesDestroy,
325            limit: limits.max_entities_destroy,
326            actual: destroy_count,
327        });
328    }
329
330    writer.align_to_byte()?;
331    writer.write_varu32(destroy_count as u32)?;
332
333    let mut i = 0usize;
334    let mut j = 0usize;
335    while i < baseline.entities.len() || j < current.entities.len() {
336        let base = baseline.entities.get(i);
337        let curr = current.entities.get(j);
338        match (base, curr) {
339            (Some(b), Some(c)) => {
340                if b.id.raw() < c.id.raw() {
341                    writer.align_to_byte()?;
342                    writer.write_u32_aligned(b.id.raw())?;
343                    i += 1;
344                } else if b.id.raw() > c.id.raw() {
345                    j += 1;
346                } else {
347                    i += 1;
348                    j += 1;
349                }
350            }
351            (Some(b), None) => {
352                writer.align_to_byte()?;
353                writer.write_u32_aligned(b.id.raw())?;
354                i += 1;
355            }
356            (None, Some(_)) => {
357                j += 1;
358            }
359            (None, None) => break,
360        }
361    }
362
363    writer.align_to_byte()?;
364    Ok(())
365}
366
367fn encode_create_body(
368    schema: &schema::Schema,
369    baseline: &Snapshot,
370    current: &Snapshot,
371    create_count: usize,
372    limits: &CodecLimits,
373    writer: &mut BitWriter<'_>,
374) -> CodecResult<()> {
375    if create_count > limits.max_entities_create {
376        return Err(CodecError::LimitsExceeded {
377            kind: LimitKind::EntitiesCreate,
378            limit: limits.max_entities_create,
379            actual: create_count,
380        });
381    }
382
383    writer.align_to_byte()?;
384    writer.write_varu32(create_count as u32)?;
385
386    let mut i = 0usize;
387    let mut j = 0usize;
388    while i < baseline.entities.len() || j < current.entities.len() {
389        let base = baseline.entities.get(i);
390        let curr = current.entities.get(j);
391        match (base, curr) {
392            (Some(b), Some(c)) => {
393                if b.id.raw() < c.id.raw() {
394                    i += 1;
395                } else if b.id.raw() > c.id.raw() {
396                    write_create_entity(schema, c, limits, writer)?;
397                    j += 1;
398                } else {
399                    i += 1;
400                    j += 1;
401                }
402            }
403            (Some(_), None) => {
404                i += 1;
405            }
406            (None, Some(c)) => {
407                write_create_entity(schema, c, limits, writer)?;
408                j += 1;
409            }
410            (None, None) => break,
411        }
412    }
413
414    writer.align_to_byte()?;
415    Ok(())
416}
417
418fn encode_update_body(
419    schema: &schema::Schema,
420    baseline: &Snapshot,
421    current: &Snapshot,
422    update_count: usize,
423    limits: &CodecLimits,
424    scratch: &mut CodecScratch,
425    writer: &mut BitWriter<'_>,
426) -> CodecResult<()> {
427    if update_count > limits.max_entities_update {
428        return Err(CodecError::LimitsExceeded {
429            kind: LimitKind::EntitiesUpdate,
430            limit: limits.max_entities_update,
431            actual: update_count,
432        });
433    }
434
435    writer.align_to_byte()?;
436    writer.write_varu32(update_count as u32)?;
437
438    let mut i = 0usize;
439    let mut j = 0usize;
440    while i < baseline.entities.len() || j < current.entities.len() {
441        let base = baseline.entities.get(i);
442        let curr = current.entities.get(j);
443        match (base, curr) {
444            (Some(b), Some(c)) => {
445                if b.id.raw() < c.id.raw() {
446                    i += 1;
447                } else if b.id.raw() > c.id.raw() {
448                    j += 1;
449                } else {
450                    if entity_has_updates(schema, b, c, limits)? {
451                        writer.align_to_byte()?;
452                        writer.write_u32_aligned(c.id.raw())?;
453                        ensure_component_presence_matches(schema, b, c)?;
454                        write_update_components(schema, b, c, limits, scratch, writer)?;
455                    }
456                    i += 1;
457                    j += 1;
458                }
459            }
460            (Some(_), None) => i += 1,
461            (None, Some(_)) => j += 1,
462            (None, None) => break,
463        }
464    }
465
466    writer.align_to_byte()?;
467    Ok(())
468}
469
470fn write_create_entity(
471    schema: &schema::Schema,
472    entity: &EntitySnapshot,
473    limits: &CodecLimits,
474    writer: &mut BitWriter<'_>,
475) -> CodecResult<()> {
476    writer.align_to_byte()?;
477    writer.write_u32_aligned(entity.id.raw())?;
478    ensure_known_components(schema, entity)?;
479    write_component_mask(schema, entity, writer)?;
480    for component in schema.components.iter() {
481        if let Some(snapshot) = find_component(entity, component.id) {
482            write_full_component(component, snapshot, limits, writer)?;
483        }
484    }
485    Ok(())
486}
487
488fn decode_destroy_section(body: &[u8], limits: &CodecLimits) -> CodecResult<Vec<EntityId>> {
489    if body.len() > limits.max_section_bytes {
490        return Err(CodecError::LimitsExceeded {
491            kind: LimitKind::SectionBytes,
492            limit: limits.max_section_bytes,
493            actual: body.len(),
494        });
495    }
496
497    let mut reader = BitReader::new(body);
498    reader.align_to_byte()?;
499    let count = reader.read_varu32()? as usize;
500    if count > limits.max_entities_destroy {
501        return Err(CodecError::LimitsExceeded {
502            kind: LimitKind::EntitiesDestroy,
503            limit: limits.max_entities_destroy,
504            actual: count,
505        });
506    }
507
508    let mut ids = Vec::with_capacity(count);
509    let mut prev: Option<u32> = None;
510    for _ in 0..count {
511        reader.align_to_byte()?;
512        let id = reader.read_u32_aligned()?;
513        if let Some(prev_id) = prev {
514            if id <= prev_id {
515                return Err(CodecError::InvalidEntityOrder {
516                    previous: prev_id,
517                    current: id,
518                });
519            }
520        }
521        prev = Some(id);
522        ids.push(EntityId::new(id));
523    }
524    reader.align_to_byte()?;
525    if reader.bits_remaining() != 0 {
526        return Err(CodecError::TrailingSectionData {
527            section: SectionTag::EntityDestroy,
528            remaining_bits: reader.bits_remaining(),
529        });
530    }
531    Ok(ids)
532}
533
534fn decode_create_section(
535    schema: &schema::Schema,
536    body: &[u8],
537    limits: &CodecLimits,
538) -> CodecResult<Vec<EntitySnapshot>> {
539    if body.len() > limits.max_section_bytes {
540        return Err(CodecError::LimitsExceeded {
541            kind: LimitKind::SectionBytes,
542            limit: limits.max_section_bytes,
543            actual: body.len(),
544        });
545    }
546
547    let mut reader = BitReader::new(body);
548    reader.align_to_byte()?;
549    let count = reader.read_varu32()? as usize;
550    if count > limits.max_entities_create {
551        return Err(CodecError::LimitsExceeded {
552            kind: LimitKind::EntitiesCreate,
553            limit: limits.max_entities_create,
554            actual: count,
555        });
556    }
557
558    let mut entities = Vec::with_capacity(count);
559    let mut prev: Option<u32> = None;
560    for _ in 0..count {
561        reader.align_to_byte()?;
562        let id = reader.read_u32_aligned()?;
563        if let Some(prev_id) = prev {
564            if id <= prev_id {
565                return Err(CodecError::InvalidEntityOrder {
566                    previous: prev_id,
567                    current: id,
568                });
569            }
570        }
571        prev = Some(id);
572
573        let component_mask = read_mask(
574            &mut reader,
575            schema.components.len(),
576            MaskKind::ComponentMask,
577        )?;
578
579        let mut components = Vec::new();
580        for (idx, component) in schema.components.iter().enumerate() {
581            if component_mask[idx] {
582                let fields = decode_full_component(component, &mut reader, limits)?;
583                components.push(ComponentSnapshot {
584                    id: component.id,
585                    fields,
586                });
587            }
588        }
589
590        let entity = EntitySnapshot {
591            id: EntityId::new(id),
592            components,
593        };
594        ensure_known_components(schema, &entity)?;
595        entities.push(entity);
596    }
597
598    reader.align_to_byte()?;
599    if reader.bits_remaining() != 0 {
600        return Err(CodecError::TrailingSectionData {
601            section: SectionTag::EntityCreate,
602            remaining_bits: reader.bits_remaining(),
603        });
604    }
605    Ok(entities)
606}
607
608fn decode_update_section(
609    schema: &schema::Schema,
610    body: &[u8],
611    limits: &CodecLimits,
612) -> CodecResult<Vec<DeltaUpdateEntity>> {
613    if body.len() > limits.max_section_bytes {
614        return Err(CodecError::LimitsExceeded {
615            kind: LimitKind::SectionBytes,
616            limit: limits.max_section_bytes,
617            actual: body.len(),
618        });
619    }
620
621    let mut reader = BitReader::new(body);
622    reader.align_to_byte()?;
623    let count = reader.read_varu32()? as usize;
624    if count > limits.max_entities_update {
625        return Err(CodecError::LimitsExceeded {
626            kind: LimitKind::EntitiesUpdate,
627            limit: limits.max_entities_update,
628            actual: count,
629        });
630    }
631
632    let mut updates = Vec::with_capacity(count);
633    let mut prev: Option<u32> = None;
634    for _ in 0..count {
635        reader.align_to_byte()?;
636        let id = reader.read_u32_aligned()?;
637        if let Some(prev_id) = prev {
638            if id <= prev_id {
639                return Err(CodecError::InvalidEntityOrder {
640                    previous: prev_id,
641                    current: id,
642                });
643            }
644        }
645        prev = Some(id);
646
647        let component_mask = read_mask(
648            &mut reader,
649            schema.components.len(),
650            MaskKind::ComponentMask,
651        )?;
652        let mut components = Vec::new();
653        for (idx, component) in schema.components.iter().enumerate() {
654            if component_mask[idx] {
655                let fields = decode_update_component(component, &mut reader, limits)?;
656                components.push(DeltaUpdateComponent {
657                    id: component.id,
658                    fields,
659                });
660            }
661        }
662
663        updates.push(DeltaUpdateEntity {
664            id: EntityId::new(id),
665            components,
666        });
667    }
668
669    reader.align_to_byte()?;
670    if reader.bits_remaining() != 0 {
671        return Err(CodecError::TrailingSectionData {
672            section: SectionTag::EntityUpdate,
673            remaining_bits: reader.bits_remaining(),
674        });
675    }
676    Ok(updates)
677}
678
679fn decode_delta_sections(
680    schema: &schema::Schema,
681    packet: &WirePacket<'_>,
682    limits: &CodecLimits,
683) -> CodecResult<(Vec<EntityId>, Vec<EntitySnapshot>, Vec<DeltaUpdateEntity>)> {
684    let mut destroys: Option<Vec<EntityId>> = None;
685    let mut creates: Option<Vec<EntitySnapshot>> = None;
686    let mut updates: Option<Vec<DeltaUpdateEntity>> = None;
687
688    for section in &packet.sections {
689        match section.tag {
690            SectionTag::EntityDestroy => {
691                if destroys.is_some() {
692                    return Err(CodecError::DuplicateSection {
693                        section: section.tag,
694                    });
695                }
696                destroys = Some(decode_destroy_section(section.body, limits)?);
697            }
698            SectionTag::EntityCreate => {
699                if creates.is_some() {
700                    return Err(CodecError::DuplicateSection {
701                        section: section.tag,
702                    });
703                }
704                creates = Some(decode_create_section(schema, section.body, limits)?);
705            }
706            SectionTag::EntityUpdate => {
707                if updates.is_some() {
708                    return Err(CodecError::DuplicateSection {
709                        section: section.tag,
710                    });
711                }
712                updates = Some(decode_update_section(schema, section.body, limits)?);
713            }
714            _ => {
715                return Err(CodecError::UnexpectedSection {
716                    section: section.tag,
717                });
718            }
719        }
720    }
721
722    Ok((
723        destroys.unwrap_or_default(),
724        creates.unwrap_or_default(),
725        updates.unwrap_or_default(),
726    ))
727}
728
/// Contents of a delta packet, decoded but not applied to any baseline.
///
/// Produced by `decode_delta_packet`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct DeltaDecoded {
    /// Tick taken from the packet header.
    pub tick: SnapshotTick,
    /// Baseline tick taken from the packet header.
    pub baseline_tick: SnapshotTick,
    /// Ids from the entity-destroy section (empty when the section is absent).
    pub destroys: Vec<EntityId>,
    /// Full entities from the entity-create section (empty when the section is absent).
    pub creates: Vec<EntitySnapshot>,
    /// Partial entity updates from the entity-update section (empty when the section is absent).
    pub updates: Vec<DeltaUpdateEntity>,
}
737
/// Decoded update for a single entity: which of its components changed and how.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct DeltaUpdateEntity {
    /// Id of the entity the update targets.
    pub id: EntityId,
    /// One entry per component flagged in the entity's component mask.
    pub components: Vec<DeltaUpdateComponent>,
}
743
/// Decoded update for a single component of an entity.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct DeltaUpdateComponent {
    /// Id of the component the update targets.
    pub id: ComponentId,
    /// `(field index, new value)` pairs; when applied, the index selects the
    /// slot in the target component's `fields` that receives the value.
    pub fields: Vec<(usize, FieldValue)>,
}
749
750fn apply_destroys(
751    baseline: &[EntitySnapshot],
752    destroys: &[EntityId],
753) -> CodecResult<Vec<EntitySnapshot>> {
754    let mut result = Vec::with_capacity(baseline.len());
755    let mut i = 0usize;
756    let mut j = 0usize;
757    while i < baseline.len() || j < destroys.len() {
758        let base = baseline.get(i);
759        let destroy = destroys.get(j);
760        match (base, destroy) {
761            (Some(b), Some(d)) => {
762                if b.id.raw() < d.raw() {
763                    result.push(b.clone());
764                    i += 1;
765                } else if b.id.raw() > d.raw() {
766                    return Err(CodecError::EntityNotFound { entity_id: d.raw() });
767                } else {
768                    i += 1;
769                    j += 1;
770                }
771            }
772            (Some(b), None) => {
773                result.push(b.clone());
774                i += 1;
775            }
776            (None, Some(d)) => {
777                return Err(CodecError::EntityNotFound { entity_id: d.raw() });
778            }
779            (None, None) => break,
780        }
781    }
782    Ok(result)
783}
784
785fn apply_creates(
786    baseline: Vec<EntitySnapshot>,
787    creates: Vec<EntitySnapshot>,
788) -> CodecResult<Vec<EntitySnapshot>> {
789    let mut result = Vec::with_capacity(baseline.len() + creates.len());
790    let mut i = 0usize;
791    let mut j = 0usize;
792    while i < baseline.len() || j < creates.len() {
793        let base = baseline.get(i);
794        let create = creates.get(j);
795        match (base, create) {
796            (Some(b), Some(c)) => {
797                if b.id.raw() < c.id.raw() {
798                    result.push(b.clone());
799                    i += 1;
800                } else if b.id.raw() > c.id.raw() {
801                    result.push(c.clone());
802                    j += 1;
803                } else {
804                    return Err(CodecError::EntityAlreadyExists {
805                        entity_id: c.id.raw(),
806                    });
807                }
808            }
809            (Some(b), None) => {
810                result.push(b.clone());
811                i += 1;
812            }
813            (None, Some(c)) => {
814                result.push(c.clone());
815                j += 1;
816            }
817            (None, None) => break,
818        }
819    }
820    Ok(result)
821}
822
823fn apply_updates(
824    entities: &mut [EntitySnapshot],
825    updates: &[DeltaUpdateEntity],
826) -> CodecResult<()> {
827    for update in updates {
828        let idx = entities
829            .binary_search_by_key(&update.id.raw(), |e| e.id.raw())
830            .map_err(|_| CodecError::EntityNotFound {
831                entity_id: update.id.raw(),
832            })?;
833        let entity = &mut entities[idx];
834        for component_update in &update.components {
835            let component = entity
836                .components
837                .iter_mut()
838                .find(|c| c.id == component_update.id)
839                .ok_or_else(|| CodecError::ComponentNotFound {
840                    entity_id: update.id.raw(),
841                    component_id: component_update.id.get(),
842                })?;
843            for (field_idx, value) in &component_update.fields {
844                if *field_idx >= component.fields.len() {
845                    return Err(CodecError::InvalidMask {
846                        kind: MaskKind::FieldMask {
847                            component: component_update.id,
848                        },
849                        reason: MaskReason::FieldCountMismatch {
850                            expected: component.fields.len(),
851                            actual: *field_idx + 1,
852                        },
853                    });
854                }
855                component.fields[*field_idx] = *value;
856            }
857        }
858    }
859    Ok(())
860}
861
862fn ensure_entities_sorted(entities: &[EntitySnapshot]) -> CodecResult<()> {
863    let mut prev: Option<u32> = None;
864    for entity in entities {
865        if let Some(prev_id) = prev {
866            if entity.id.raw() <= prev_id {
867                return Err(CodecError::InvalidEntityOrder {
868                    previous: prev_id,
869                    current: entity.id.raw(),
870                });
871            }
872        }
873        prev = Some(entity.id.raw());
874    }
875    Ok(())
876}
877
878fn write_component_mask(
879    schema: &schema::Schema,
880    entity: &EntitySnapshot,
881    writer: &mut BitWriter<'_>,
882) -> CodecResult<()> {
883    for component in &schema.components {
884        let present = find_component(entity, component.id).is_some();
885        writer.write_bit(present)?;
886    }
887    Ok(())
888}
889
890fn write_full_component(
891    component: &ComponentDef,
892    snapshot: &ComponentSnapshot,
893    limits: &CodecLimits,
894    writer: &mut BitWriter<'_>,
895) -> CodecResult<()> {
896    if snapshot.fields.len() != component.fields.len() {
897        return Err(CodecError::InvalidMask {
898            kind: MaskKind::FieldMask {
899                component: component.id,
900            },
901            reason: MaskReason::FieldCountMismatch {
902                expected: component.fields.len(),
903                actual: snapshot.fields.len(),
904            },
905        });
906    }
907    if snapshot.fields.len() > limits.max_fields_per_component {
908        return Err(CodecError::LimitsExceeded {
909            kind: LimitKind::FieldsPerComponent,
910            limit: limits.max_fields_per_component,
911            actual: snapshot.fields.len(),
912        });
913    }
914
915    for _ in &component.fields {
916        writer.write_bit(true)?;
917    }
918    for (field, value) in component.fields.iter().zip(snapshot.fields.iter()) {
919        write_field_value(component.id, *field, *value, writer)?;
920    }
921    Ok(())
922}
923
924fn decode_full_component(
925    component: &ComponentDef,
926    reader: &mut BitReader<'_>,
927    limits: &CodecLimits,
928) -> CodecResult<Vec<FieldValue>> {
929    if component.fields.len() > limits.max_fields_per_component {
930        return Err(CodecError::LimitsExceeded {
931            kind: LimitKind::FieldsPerComponent,
932            limit: limits.max_fields_per_component,
933            actual: component.fields.len(),
934        });
935    }
936
937    let mask = read_mask(
938        reader,
939        component.fields.len(),
940        MaskKind::FieldMask {
941            component: component.id,
942        },
943    )?;
944    let mut values = Vec::with_capacity(component.fields.len());
945    for (idx, field) in component.fields.iter().enumerate() {
946        if !mask[idx] {
947            return Err(CodecError::InvalidMask {
948                kind: MaskKind::FieldMask {
949                    component: component.id,
950                },
951                reason: MaskReason::MissingField { field: field.id },
952            });
953        }
954        values.push(read_field_value(component.id, *field, reader)?);
955    }
956    Ok(values)
957}
958
959fn write_update_components(
960    schema: &schema::Schema,
961    baseline: &EntitySnapshot,
962    current: &EntitySnapshot,
963    limits: &CodecLimits,
964    scratch: &mut CodecScratch,
965    writer: &mut BitWriter<'_>,
966) -> CodecResult<()> {
967    let component_count = schema.components.len();
968    let (component_changed, _) = scratch.component_and_field_masks_mut(component_count, 0);
969    component_changed.fill(false);
970    for (idx, component) in schema.components.iter().enumerate() {
971        let base = find_component(baseline, component.id);
972        let curr = find_component(current, component.id);
973        if base.is_some() != curr.is_some() {
974            return Err(CodecError::InvalidMask {
975                kind: MaskKind::ComponentMask,
976                reason: MaskReason::ComponentPresenceMismatch {
977                    component: component.id,
978                },
979            });
980        }
981        if let (Some(base), Some(curr)) = (base, curr) {
982            if base.fields.len() != component.fields.len()
983                || curr.fields.len() != component.fields.len()
984            {
985                return Err(CodecError::InvalidMask {
986                    kind: MaskKind::FieldMask {
987                        component: component.id,
988                    },
989                    reason: MaskReason::FieldCountMismatch {
990                        expected: component.fields.len(),
991                        actual: base.fields.len().max(curr.fields.len()),
992                    },
993                });
994            }
995            if component.fields.len() > limits.max_fields_per_component {
996                return Err(CodecError::LimitsExceeded {
997                    kind: LimitKind::FieldsPerComponent,
998                    limit: limits.max_fields_per_component,
999                    actual: component.fields.len(),
1000                });
1001            }
1002            let (component_changed, field_mask) =
1003                scratch.component_and_field_masks_mut(component_count, component.fields.len());
1004            let any_changed = compute_field_mask_into(component, base, curr, field_mask)?
1005                .iter()
1006                .any(|b| *b);
1007            writer.write_bit(any_changed)?;
1008            if any_changed {
1009                component_changed[idx] = true;
1010            }
1011        } else {
1012            writer.write_bit(false)?;
1013        }
1014    }
1015
1016    for (idx, component) in schema.components.iter().enumerate() {
1017        let (base, curr) = match (
1018            find_component(baseline, component.id),
1019            find_component(current, component.id),
1020        ) {
1021            (Some(base), Some(curr)) => (base, curr),
1022            _ => continue,
1023        };
1024        if component.fields.len() > limits.max_fields_per_component {
1025            return Err(CodecError::LimitsExceeded {
1026                kind: LimitKind::FieldsPerComponent,
1027                limit: limits.max_fields_per_component,
1028                actual: component.fields.len(),
1029            });
1030        }
1031        let (component_changed, field_mask) =
1032            scratch.component_and_field_masks_mut(component_count, component.fields.len());
1033        if component_changed[idx] {
1034            let field_mask = compute_field_mask_into(component, base, curr, field_mask)?;
1035            for bit in field_mask {
1036                writer.write_bit(*bit)?;
1037            }
1038            for (((field, _base_val), curr_val), changed) in component
1039                .fields
1040                .iter()
1041                .zip(base.fields.iter())
1042                .zip(curr.fields.iter())
1043                .zip(field_mask.iter())
1044            {
1045                if *changed {
1046                    write_field_value(component.id, *field, *curr_val, writer)?;
1047                }
1048            }
1049        }
1050    }
1051    Ok(())
1052}
1053
1054fn decode_update_component(
1055    component: &ComponentDef,
1056    reader: &mut BitReader<'_>,
1057    limits: &CodecLimits,
1058) -> CodecResult<Vec<(usize, FieldValue)>> {
1059    if component.fields.len() > limits.max_fields_per_component {
1060        return Err(CodecError::LimitsExceeded {
1061            kind: LimitKind::FieldsPerComponent,
1062            limit: limits.max_fields_per_component,
1063            actual: component.fields.len(),
1064        });
1065    }
1066    let mask = read_mask(
1067        reader,
1068        component.fields.len(),
1069        MaskKind::FieldMask {
1070            component: component.id,
1071        },
1072    )?;
1073    if !mask.iter().any(|b| *b) {
1074        return Err(CodecError::InvalidMask {
1075            kind: MaskKind::FieldMask {
1076                component: component.id,
1077            },
1078            reason: MaskReason::EmptyFieldMask {
1079                component: component.id,
1080            },
1081        });
1082    }
1083    let mut fields = Vec::new();
1084    for (idx, field) in component.fields.iter().enumerate() {
1085        if mask[idx] {
1086            let value = read_field_value(component.id, *field, reader)?;
1087            fields.push((idx, value));
1088        }
1089    }
1090    Ok(fields)
1091}
1092
1093fn compute_field_mask_into<'a>(
1094    component: &ComponentDef,
1095    baseline: &ComponentSnapshot,
1096    current: &ComponentSnapshot,
1097    field_mask: &'a mut [bool],
1098) -> CodecResult<&'a [bool]> {
1099    for (((field, base_val), curr_val), slot) in component
1100        .fields
1101        .iter()
1102        .zip(baseline.fields.iter())
1103        .zip(current.fields.iter())
1104        .zip(field_mask.iter_mut())
1105    {
1106        *slot = field_changed(component.id, *field, *base_val, *curr_val)?;
1107    }
1108    Ok(field_mask)
1109}
1110
1111fn field_changed(
1112    component_id: ComponentId,
1113    field: FieldDef,
1114    baseline: FieldValue,
1115    current: FieldValue,
1116) -> CodecResult<bool> {
1117    match field.change {
1118        ChangePolicy::Always => field_differs(component_id, field, baseline, current),
1119        ChangePolicy::Threshold { threshold_q } => {
1120            field_exceeds_threshold(component_id, field, baseline, current, threshold_q)
1121        }
1122    }
1123}
1124
1125fn field_differs(
1126    component_id: ComponentId,
1127    field: FieldDef,
1128    baseline: FieldValue,
1129    current: FieldValue,
1130) -> CodecResult<bool> {
1131    match (baseline, current) {
1132        (FieldValue::Bool(a), FieldValue::Bool(b)) => Ok(a != b),
1133        (FieldValue::UInt(a), FieldValue::UInt(b)) => Ok(a != b),
1134        (FieldValue::SInt(a), FieldValue::SInt(b)) => Ok(a != b),
1135        (FieldValue::VarUInt(a), FieldValue::VarUInt(b)) => Ok(a != b),
1136        (FieldValue::VarSInt(a), FieldValue::VarSInt(b)) => Ok(a != b),
1137        (FieldValue::FixedPoint(a), FieldValue::FixedPoint(b)) => Ok(a != b),
1138        _ => Err(CodecError::InvalidValue {
1139            component: component_id,
1140            field: field.id,
1141            reason: ValueReason::TypeMismatch {
1142                expected: codec_name(field.codec),
1143                found: value_name(current),
1144            },
1145        }),
1146    }
1147}
1148
1149fn field_exceeds_threshold(
1150    component_id: ComponentId,
1151    field: FieldDef,
1152    baseline: FieldValue,
1153    current: FieldValue,
1154    threshold_q: u32,
1155) -> CodecResult<bool> {
1156    let threshold_q = threshold_q as u64;
1157    match (baseline, current) {
1158        (FieldValue::FixedPoint(a), FieldValue::FixedPoint(b)) => {
1159            Ok((a - b).unsigned_abs() > threshold_q)
1160        }
1161        (FieldValue::UInt(a), FieldValue::UInt(b)) => Ok(a.abs_diff(b) > threshold_q),
1162        (FieldValue::SInt(a), FieldValue::SInt(b)) => Ok((a - b).unsigned_abs() > threshold_q),
1163        (FieldValue::VarUInt(a), FieldValue::VarUInt(b)) => Ok(a.abs_diff(b) > threshold_q),
1164        (FieldValue::VarSInt(a), FieldValue::VarSInt(b)) => {
1165            Ok((a - b).unsigned_abs() > threshold_q)
1166        }
1167        (FieldValue::Bool(a), FieldValue::Bool(b)) => Ok(a != b),
1168        _ => Err(CodecError::InvalidValue {
1169            component: component_id,
1170            field: field.id,
1171            reason: ValueReason::TypeMismatch {
1172                expected: codec_name(field.codec),
1173                found: value_name(current),
1174            },
1175        }),
1176    }
1177}
1178
1179fn entity_has_updates(
1180    schema: &schema::Schema,
1181    baseline: &EntitySnapshot,
1182    current: &EntitySnapshot,
1183    limits: &CodecLimits,
1184) -> CodecResult<bool> {
1185    ensure_component_presence_matches(schema, baseline, current)?;
1186    for component in &schema.components {
1187        let base = find_component(baseline, component.id);
1188        let curr = find_component(current, component.id);
1189        if let (Some(base), Some(curr)) = (base, curr) {
1190            if base.fields.len() != component.fields.len()
1191                || curr.fields.len() != component.fields.len()
1192            {
1193                return Err(CodecError::InvalidMask {
1194                    kind: MaskKind::FieldMask {
1195                        component: component.id,
1196                    },
1197                    reason: MaskReason::FieldCountMismatch {
1198                        expected: component.fields.len(),
1199                        actual: base.fields.len().max(curr.fields.len()),
1200                    },
1201                });
1202            }
1203            if component.fields.len() > limits.max_fields_per_component {
1204                return Err(CodecError::LimitsExceeded {
1205                    kind: LimitKind::FieldsPerComponent,
1206                    limit: limits.max_fields_per_component,
1207                    actual: component.fields.len(),
1208                });
1209            }
1210            for ((field, base_val), curr_val) in component
1211                .fields
1212                .iter()
1213                .zip(base.fields.iter())
1214                .zip(curr.fields.iter())
1215            {
1216                if field_changed(component.id, *field, *base_val, *curr_val)? {
1217                    return Ok(true);
1218                }
1219            }
1220        }
1221    }
1222    Ok(false)
1223}
1224
1225fn ensure_component_presence_matches(
1226    schema: &schema::Schema,
1227    baseline: &EntitySnapshot,
1228    current: &EntitySnapshot,
1229) -> CodecResult<()> {
1230    // In this version, component presence is stable across an entity's lifetime.
1231    for component in &schema.components {
1232        let base = find_component(baseline, component.id).is_some();
1233        let curr = find_component(current, component.id).is_some();
1234        if base != curr {
1235            return Err(CodecError::InvalidMask {
1236                kind: MaskKind::ComponentMask,
1237                reason: MaskReason::ComponentPresenceMismatch {
1238                    component: component.id,
1239                },
1240            });
1241        }
1242    }
1243    Ok(())
1244}
1245
1246fn find_component(entity: &EntitySnapshot, id: ComponentId) -> Option<&ComponentSnapshot> {
1247    entity.components.iter().find(|c| c.id == id)
1248}
1249
1250fn codec_name(codec: schema::FieldCodec) -> &'static str {
1251    match codec {
1252        schema::FieldCodec::Bool => "bool",
1253        schema::FieldCodec::UInt { .. } => "uint",
1254        schema::FieldCodec::SInt { .. } => "sint",
1255        schema::FieldCodec::VarUInt => "varuint",
1256        schema::FieldCodec::VarSInt => "varsint",
1257        schema::FieldCodec::FixedPoint(_) => "fixed-point",
1258    }
1259}
1260
1261fn value_name(value: FieldValue) -> &'static str {
1262    match value {
1263        FieldValue::Bool(_) => "bool",
1264        FieldValue::UInt(_) => "uint",
1265        FieldValue::SInt(_) => "sint",
1266        FieldValue::VarUInt(_) => "varuint",
1267        FieldValue::VarSInt(_) => "varsint",
1268        FieldValue::FixedPoint(_) => "fixed-point",
1269    }
1270}
1271
#[cfg(test)]
mod tests {
    use super::*;
    use schema::{ComponentDef, FieldCodec, FieldDef, FieldId, Schema};

    /// Id of the first component used throughout these tests.
    fn component_one() -> ComponentId {
        ComponentId::new(1).unwrap()
    }

    /// Id of the second component of the two-component schema.
    fn component_two() -> ComponentId {
        ComponentId::new(2).unwrap()
    }

    /// Component definition holding a single bool field with field id 1.
    fn bool_component(id: ComponentId) -> ComponentDef {
        ComponentDef::new(id).field(FieldDef::new(FieldId::new(1).unwrap(), FieldCodec::bool()))
    }

    /// Schema with one component that has one bool field.
    fn schema_one_bool() -> Schema {
        Schema::new(vec![bool_component(component_one())]).unwrap()
    }

    /// Schema with one component that has one 8-bit uint field using a
    /// threshold change policy.
    fn schema_uint_threshold(threshold_q: u32) -> Schema {
        let policy = ChangePolicy::Threshold { threshold_q };
        let field = FieldDef::new(FieldId::new(1).unwrap(), FieldCodec::uint(8)).change(policy);
        Schema::new(vec![ComponentDef::new(component_one()).field(field)]).unwrap()
    }

    /// Schema with two components, each with one bool field.
    fn schema_two_components() -> Schema {
        let defs = vec![
            bool_component(component_one()),
            bool_component(component_two()),
        ];
        Schema::new(defs).unwrap()
    }

    /// Component snapshot carrying exactly one field value.
    fn single_field_component(id: ComponentId, value: FieldValue) -> ComponentSnapshot {
        ComponentSnapshot {
            id,
            fields: vec![value],
        }
    }

    /// Entity snapshot with the given components.
    fn make_entity(id: EntityId, components: Vec<ComponentSnapshot>) -> EntitySnapshot {
        EntitySnapshot { id, components }
    }

    /// Entity snapshot with only component one, holding a single bool.
    fn bool_entity(id: EntityId, value: bool) -> EntitySnapshot {
        let component = single_field_component(component_one(), FieldValue::Bool(value));
        make_entity(id, vec![component])
    }

    /// Snapshot at `tick` with the given entities.
    fn make_snapshot(tick: SnapshotTick, entities: Vec<EntitySnapshot>) -> Snapshot {
        Snapshot { tick, entities }
    }

    /// Baseline at tick 10: entity 1 with its bool field set to `false`.
    fn baseline_snapshot() -> Snapshot {
        make_snapshot(
            SnapshotTick::new(10),
            vec![bool_entity(EntityId::new(1), false)],
        )
    }

    /// Encodes a delta at `tick` against `baseline` into `buf` and returns the
    /// number of bytes written.
    fn encode_delta(
        schema: &Schema,
        tick: SnapshotTick,
        baseline: &Snapshot,
        current: &Snapshot,
        buf: &mut [u8],
    ) -> usize {
        encode_delta_snapshot(
            schema,
            tick,
            baseline.tick,
            baseline,
            current,
            &CodecLimits::for_testing(),
            buf,
        )
        .unwrap()
    }

    /// Encodes `current` against `baseline`, applies the delta back onto
    /// `baseline` and checks that the result has the entities of `current`.
    fn assert_roundtrip(schema: &Schema, baseline: &Snapshot, current: &Snapshot, buf: &mut [u8]) {
        let written = encode_delta(schema, current.tick, baseline, current, buf);
        let applied = apply_delta_snapshot(
            schema,
            baseline,
            &buf[..written],
            &wire::Limits::for_testing(),
            &CodecLimits::for_testing(),
        )
        .unwrap();
        assert_eq!(applied.entities, current.entities);
    }

    /// A delta between identical snapshots consists of the packet header only.
    #[test]
    fn no_op_delta_is_empty() {
        let schema = schema_one_bool();
        let baseline = baseline_snapshot();
        let current = baseline.clone();

        let mut buf = [0u8; 128];
        let written = encode_delta(
            &schema,
            SnapshotTick::new(11),
            &baseline,
            &current,
            &mut buf,
        );

        let mut expected = [0u8; wire::HEADER_SIZE];
        let header =
            wire::PacketHeader::delta_snapshot(schema_hash(&schema), 11, baseline.tick.raw(), 0);
        encode_header(&header, &mut expected).unwrap();
        assert_eq!(&buf[..written], expected.as_slice());
    }

    /// A single changed field survives an encode/apply roundtrip.
    #[test]
    fn delta_roundtrip_single_update() {
        let schema = schema_one_bool();
        let baseline = baseline_snapshot();
        let current = make_snapshot(
            SnapshotTick::new(11),
            vec![bool_entity(EntityId::new(1), true)],
        );

        let mut buf = [0u8; 128];
        assert_roundtrip(&schema, &baseline, &current, &mut buf);
    }

    /// The same scratch value can be reused for consecutive encodes.
    #[test]
    fn delta_roundtrip_reuse_scratch() {
        let schema = schema_one_bool();
        let baseline = baseline_snapshot();
        let current_one = make_snapshot(
            SnapshotTick::new(11),
            vec![bool_entity(EntityId::new(1), true)],
        );
        let current_two = make_snapshot(
            SnapshotTick::new(12),
            vec![bool_entity(EntityId::new(1), false)],
        );

        let mut scratch = CodecScratch::default();
        for current in [current_one, current_two].iter() {
            // Each encode gets a fresh output buffer; only the scratch is shared.
            let mut buf = [0u8; 128];
            let written = encode_delta_snapshot_with_scratch(
                &schema,
                current.tick,
                baseline.tick,
                &baseline,
                current,
                &CodecLimits::for_testing(),
                &mut scratch,
                &mut buf,
            )
            .unwrap();
            let applied = apply_delta_snapshot(
                &schema,
                &baseline,
                &buf[..written],
                &wire::Limits::for_testing(),
                &CodecLimits::for_testing(),
            )
            .unwrap();
            assert_eq!(applied.entities, current.entities);
        }
    }

    /// Entity 1 is destroyed, entity 2 is updated and entity 3 is created.
    #[test]
    fn delta_roundtrip_create_destroy_update() {
        let schema = schema_one_bool();
        let baseline = make_snapshot(
            SnapshotTick::new(10),
            vec![
                bool_entity(EntityId::new(1), false),
                bool_entity(EntityId::new(2), false),
            ],
        );
        let current = make_snapshot(
            SnapshotTick::new(11),
            vec![
                bool_entity(EntityId::new(2), true),
                bool_entity(EntityId::new(3), true),
            ],
        );

        let mut buf = [0u8; 256];
        assert_roundtrip(&schema, &baseline, &current, &mut buf);
    }

    /// Only one of an entity's two components changes.
    #[test]
    fn delta_roundtrip_single_component_change() {
        let schema = schema_two_components();
        let two_component_entity = |first: bool, second: bool| {
            make_entity(
                EntityId::new(1),
                vec![
                    single_field_component(component_one(), FieldValue::Bool(first)),
                    single_field_component(component_two(), FieldValue::Bool(second)),
                ],
            )
        };
        let baseline = make_snapshot(
            SnapshotTick::new(10),
            vec![two_component_entity(false, false)],
        );
        let current = make_snapshot(
            SnapshotTick::new(11),
            vec![two_component_entity(true, false)],
        );

        let mut buf = [0u8; 256];
        assert_roundtrip(&schema, &baseline, &current, &mut buf);
    }

    /// Applying a delta whose header names a different baseline tick fails.
    #[test]
    fn baseline_tick_mismatch_is_error() {
        let schema = schema_one_bool();
        let baseline = baseline_snapshot();
        let current = baseline.clone();

        let mut buf = [0u8; 128];
        let written = encode_delta(
            &schema,
            SnapshotTick::new(11),
            &baseline,
            &current,
            &mut buf,
        );

        // Rewrite the header in place so it points at a tick the baseline does not have.
        let mut packet =
            wire::decode_packet(&buf[..written], &wire::Limits::for_testing()).unwrap();
        packet.header.baseline_tick = 999;
        wire::encode_header(&packet.header, &mut buf[..wire::HEADER_SIZE]).unwrap();

        let err = apply_delta_snapshot(
            &schema,
            &baseline,
            &buf[..written],
            &wire::Limits::for_testing(),
            &CodecLimits::for_testing(),
        )
        .unwrap_err();
        assert!(matches!(err, CodecError::BaselineTickMismatch { .. }));
    }

    /// A change of 2 under a threshold of 5 produces no sections, and applying
    /// the delta yields the baseline entities.
    #[test]
    fn threshold_suppresses_small_change() {
        let schema = schema_uint_threshold(5);
        let uint_entity = |value| {
            make_entity(
                EntityId::new(1),
                vec![single_field_component(component_one(), FieldValue::UInt(value))],
            )
        };
        let baseline = make_snapshot(SnapshotTick::new(10), vec![uint_entity(10)]);
        let current = make_snapshot(SnapshotTick::new(11), vec![uint_entity(12)]);

        let mut buf = [0u8; 128];
        let written = encode_delta(&schema, current.tick, &baseline, &current, &mut buf);

        let packet = wire::decode_packet(&buf[..written], &wire::Limits::for_testing()).unwrap();
        assert_eq!(packet.sections.len(), 0);

        let applied = apply_delta_snapshot(
            &schema,
            &baseline,
            &buf[..written],
            &wire::Limits::for_testing(),
            &CodecLimits::for_testing(),
        )
        .unwrap();
        assert_eq!(applied.entities, baseline.entities);
    }
}