1use bitstream::{BitReader, BitWriter};
4use schema::{schema_hash, ChangePolicy, ComponentDef, ComponentId, FieldDef};
5use wire::{decode_packet, encode_header, SectionTag, WirePacket};
6
7use crate::baseline::BaselineStore;
8use crate::error::{CodecError, CodecResult, LimitKind, MaskKind, MaskReason, ValueReason};
9use crate::limits::CodecLimits;
10use crate::scratch::CodecScratch;
11use crate::snapshot::{
12 ensure_known_components, read_field_value, read_mask, write_field_value, write_section,
13 ComponentSnapshot, EntitySnapshot, FieldValue, Snapshot,
14};
15use crate::types::{EntityId, SnapshotTick};
16
17#[must_use]
19pub fn select_baseline_tick<T>(
20 store: &BaselineStore<T>,
21 ack_tick: SnapshotTick,
22) -> Option<SnapshotTick> {
23 store.latest_at_or_before(ack_tick).map(|(tick, _)| tick)
24}
25
26pub fn encode_delta_snapshot(
30 schema: &schema::Schema,
31 tick: SnapshotTick,
32 baseline_tick: SnapshotTick,
33 baseline: &Snapshot,
34 current: &Snapshot,
35 limits: &CodecLimits,
36 out: &mut [u8],
37) -> CodecResult<usize> {
38 let mut scratch = CodecScratch::default();
39 encode_delta_snapshot_with_scratch(
40 schema,
41 tick,
42 baseline_tick,
43 baseline,
44 current,
45 limits,
46 &mut scratch,
47 out,
48 )
49}
50
51#[allow(clippy::too_many_arguments)]
53pub fn encode_delta_snapshot_with_scratch(
54 schema: &schema::Schema,
55 tick: SnapshotTick,
56 baseline_tick: SnapshotTick,
57 baseline: &Snapshot,
58 current: &Snapshot,
59 limits: &CodecLimits,
60 scratch: &mut CodecScratch,
61 out: &mut [u8],
62) -> CodecResult<usize> {
63 if out.len() < wire::HEADER_SIZE {
64 return Err(CodecError::OutputTooSmall {
65 needed: wire::HEADER_SIZE,
66 available: out.len(),
67 });
68 }
69
70 if baseline.tick != baseline_tick {
71 return Err(CodecError::BaselineTickMismatch {
72 expected: baseline.tick.raw(),
73 found: baseline_tick.raw(),
74 });
75 }
76
77 ensure_entities_sorted(&baseline.entities)?;
78 ensure_entities_sorted(¤t.entities)?;
79
80 let mut counts = DiffCounts::default();
81 diff_counts(schema, baseline, current, limits, &mut counts)?;
82
83 if counts.creates > limits.max_entities_create {
84 return Err(CodecError::LimitsExceeded {
85 kind: LimitKind::EntitiesCreate,
86 limit: limits.max_entities_create,
87 actual: counts.creates,
88 });
89 }
90 if counts.updates > limits.max_entities_update {
91 return Err(CodecError::LimitsExceeded {
92 kind: LimitKind::EntitiesUpdate,
93 limit: limits.max_entities_update,
94 actual: counts.updates,
95 });
96 }
97 if counts.destroys > limits.max_entities_destroy {
98 return Err(CodecError::LimitsExceeded {
99 kind: LimitKind::EntitiesDestroy,
100 limit: limits.max_entities_destroy,
101 actual: counts.destroys,
102 });
103 }
104
105 let mut offset = wire::HEADER_SIZE;
106 if counts.destroys > 0 {
107 let written = write_section(
108 SectionTag::EntityDestroy,
109 &mut out[offset..],
110 limits,
111 |writer| encode_destroy_body(baseline, current, counts.destroys, limits, writer),
112 )?;
113 offset += written;
114 }
115 if counts.creates > 0 {
116 let written = write_section(
117 SectionTag::EntityCreate,
118 &mut out[offset..],
119 limits,
120 |writer| encode_create_body(schema, baseline, current, counts.creates, limits, writer),
121 )?;
122 offset += written;
123 }
124 if counts.updates > 0 {
125 let written = write_section(
126 SectionTag::EntityUpdate,
127 &mut out[offset..],
128 limits,
129 |writer| {
130 encode_update_body(
131 schema,
132 baseline,
133 current,
134 counts.updates,
135 limits,
136 scratch,
137 writer,
138 )
139 },
140 )?;
141 offset += written;
142 }
143
144 let payload_len = offset - wire::HEADER_SIZE;
145 let header = wire::PacketHeader::delta_snapshot(
146 schema_hash(schema),
147 tick.raw(),
148 baseline_tick.raw(),
149 payload_len as u32,
150 );
151 encode_header(&header, &mut out[..wire::HEADER_SIZE]).map_err(|_| {
152 CodecError::OutputTooSmall {
153 needed: wire::HEADER_SIZE,
154 available: out.len(),
155 }
156 })?;
157
158 Ok(offset)
159}
160
161pub fn apply_delta_snapshot(
163 schema: &schema::Schema,
164 baseline: &Snapshot,
165 bytes: &[u8],
166 wire_limits: &wire::Limits,
167 limits: &CodecLimits,
168) -> CodecResult<Snapshot> {
169 let packet = decode_packet(bytes, wire_limits)?;
170 apply_delta_snapshot_from_packet(schema, baseline, &packet, limits)
171}
172
173pub fn apply_delta_snapshot_from_packet(
175 schema: &schema::Schema,
176 baseline: &Snapshot,
177 packet: &WirePacket<'_>,
178 limits: &CodecLimits,
179) -> CodecResult<Snapshot> {
180 let header = packet.header;
181 if !header.flags.is_delta_snapshot() {
182 return Err(CodecError::Wire(wire::DecodeError::InvalidFlags {
183 flags: header.flags.raw(),
184 }));
185 }
186 if header.baseline_tick == 0 {
187 return Err(CodecError::Wire(wire::DecodeError::InvalidBaselineTick {
188 baseline_tick: header.baseline_tick,
189 flags: header.flags.raw(),
190 }));
191 }
192 if header.baseline_tick != baseline.tick.raw() {
193 return Err(CodecError::BaselineTickMismatch {
194 expected: baseline.tick.raw(),
195 found: header.baseline_tick,
196 });
197 }
198
199 let expected_hash = schema_hash(schema);
200 if header.schema_hash != expected_hash {
201 return Err(CodecError::SchemaMismatch {
202 expected: expected_hash,
203 found: header.schema_hash,
204 });
205 }
206
207 let (destroys, creates, updates) = decode_delta_sections(schema, packet, limits)?;
208
209 ensure_entities_sorted(&baseline.entities)?;
210 ensure_entities_sorted(&creates)?;
211
212 let mut remaining = apply_destroys(&baseline.entities, &destroys)?;
213 remaining = apply_creates(remaining, creates)?;
214 if remaining.len() > limits.max_total_entities_after_apply {
215 return Err(CodecError::LimitsExceeded {
216 kind: LimitKind::TotalEntitiesAfterApply,
217 limit: limits.max_total_entities_after_apply,
218 actual: remaining.len(),
219 });
220 }
221 apply_updates(&mut remaining, &updates)?;
222
223 Ok(Snapshot {
224 tick: SnapshotTick::new(header.tick),
225 entities: remaining,
226 })
227}
228
229pub fn decode_delta_packet(
231 schema: &schema::Schema,
232 packet: &WirePacket<'_>,
233 limits: &CodecLimits,
234) -> CodecResult<DeltaDecoded> {
235 let header = packet.header;
236 if !header.flags.is_delta_snapshot() {
237 return Err(CodecError::Wire(wire::DecodeError::InvalidFlags {
238 flags: header.flags.raw(),
239 }));
240 }
241 if header.baseline_tick == 0 {
242 return Err(CodecError::Wire(wire::DecodeError::InvalidBaselineTick {
243 baseline_tick: header.baseline_tick,
244 flags: header.flags.raw(),
245 }));
246 }
247 let expected_hash = schema_hash(schema);
248 if header.schema_hash != expected_hash {
249 return Err(CodecError::SchemaMismatch {
250 expected: expected_hash,
251 found: header.schema_hash,
252 });
253 }
254
255 let (destroys, creates, updates) = decode_delta_sections(schema, packet, limits)?;
256
257 Ok(DeltaDecoded {
258 tick: SnapshotTick::new(header.tick),
259 baseline_tick: SnapshotTick::new(header.baseline_tick),
260 destroys,
261 creates,
262 updates,
263 })
264}
265
266#[derive(Default)]
267struct DiffCounts {
268 creates: usize,
269 updates: usize,
270 destroys: usize,
271}
272
273fn diff_counts(
274 schema: &schema::Schema,
275 baseline: &Snapshot,
276 current: &Snapshot,
277 limits: &CodecLimits,
278 counts: &mut DiffCounts,
279) -> CodecResult<()> {
280 let mut i = 0usize;
281 let mut j = 0usize;
282 while i < baseline.entities.len() || j < current.entities.len() {
283 let base = baseline.entities.get(i);
284 let curr = current.entities.get(j);
285 match (base, curr) {
286 (Some(b), Some(c)) => {
287 if b.id.raw() < c.id.raw() {
288 counts.destroys += 1;
289 i += 1;
290 } else if b.id.raw() > c.id.raw() {
291 counts.creates += 1;
292 j += 1;
293 } else {
294 if entity_has_updates(schema, b, c, limits)? {
295 counts.updates += 1;
296 }
297 i += 1;
298 j += 1;
299 }
300 }
301 (Some(_), None) => {
302 counts.destroys += 1;
303 i += 1;
304 }
305 (None, Some(_)) => {
306 counts.creates += 1;
307 j += 1;
308 }
309 (None, None) => break,
310 }
311 }
312 Ok(())
313}
314
315fn encode_destroy_body(
316 baseline: &Snapshot,
317 current: &Snapshot,
318 destroy_count: usize,
319 limits: &CodecLimits,
320 writer: &mut BitWriter<'_>,
321) -> CodecResult<()> {
322 if destroy_count > limits.max_entities_destroy {
323 return Err(CodecError::LimitsExceeded {
324 kind: LimitKind::EntitiesDestroy,
325 limit: limits.max_entities_destroy,
326 actual: destroy_count,
327 });
328 }
329
330 writer.align_to_byte()?;
331 writer.write_varu32(destroy_count as u32)?;
332
333 let mut i = 0usize;
334 let mut j = 0usize;
335 while i < baseline.entities.len() || j < current.entities.len() {
336 let base = baseline.entities.get(i);
337 let curr = current.entities.get(j);
338 match (base, curr) {
339 (Some(b), Some(c)) => {
340 if b.id.raw() < c.id.raw() {
341 writer.align_to_byte()?;
342 writer.write_u32_aligned(b.id.raw())?;
343 i += 1;
344 } else if b.id.raw() > c.id.raw() {
345 j += 1;
346 } else {
347 i += 1;
348 j += 1;
349 }
350 }
351 (Some(b), None) => {
352 writer.align_to_byte()?;
353 writer.write_u32_aligned(b.id.raw())?;
354 i += 1;
355 }
356 (None, Some(_)) => {
357 j += 1;
358 }
359 (None, None) => break,
360 }
361 }
362
363 writer.align_to_byte()?;
364 Ok(())
365}
366
367fn encode_create_body(
368 schema: &schema::Schema,
369 baseline: &Snapshot,
370 current: &Snapshot,
371 create_count: usize,
372 limits: &CodecLimits,
373 writer: &mut BitWriter<'_>,
374) -> CodecResult<()> {
375 if create_count > limits.max_entities_create {
376 return Err(CodecError::LimitsExceeded {
377 kind: LimitKind::EntitiesCreate,
378 limit: limits.max_entities_create,
379 actual: create_count,
380 });
381 }
382
383 writer.align_to_byte()?;
384 writer.write_varu32(create_count as u32)?;
385
386 let mut i = 0usize;
387 let mut j = 0usize;
388 while i < baseline.entities.len() || j < current.entities.len() {
389 let base = baseline.entities.get(i);
390 let curr = current.entities.get(j);
391 match (base, curr) {
392 (Some(b), Some(c)) => {
393 if b.id.raw() < c.id.raw() {
394 i += 1;
395 } else if b.id.raw() > c.id.raw() {
396 write_create_entity(schema, c, limits, writer)?;
397 j += 1;
398 } else {
399 i += 1;
400 j += 1;
401 }
402 }
403 (Some(_), None) => {
404 i += 1;
405 }
406 (None, Some(c)) => {
407 write_create_entity(schema, c, limits, writer)?;
408 j += 1;
409 }
410 (None, None) => break,
411 }
412 }
413
414 writer.align_to_byte()?;
415 Ok(())
416}
417
418fn encode_update_body(
419 schema: &schema::Schema,
420 baseline: &Snapshot,
421 current: &Snapshot,
422 update_count: usize,
423 limits: &CodecLimits,
424 scratch: &mut CodecScratch,
425 writer: &mut BitWriter<'_>,
426) -> CodecResult<()> {
427 if update_count > limits.max_entities_update {
428 return Err(CodecError::LimitsExceeded {
429 kind: LimitKind::EntitiesUpdate,
430 limit: limits.max_entities_update,
431 actual: update_count,
432 });
433 }
434
435 writer.align_to_byte()?;
436 writer.write_varu32(update_count as u32)?;
437
438 let mut i = 0usize;
439 let mut j = 0usize;
440 while i < baseline.entities.len() || j < current.entities.len() {
441 let base = baseline.entities.get(i);
442 let curr = current.entities.get(j);
443 match (base, curr) {
444 (Some(b), Some(c)) => {
445 if b.id.raw() < c.id.raw() {
446 i += 1;
447 } else if b.id.raw() > c.id.raw() {
448 j += 1;
449 } else {
450 if entity_has_updates(schema, b, c, limits)? {
451 writer.align_to_byte()?;
452 writer.write_u32_aligned(c.id.raw())?;
453 ensure_component_presence_matches(schema, b, c)?;
454 write_update_components(schema, b, c, limits, scratch, writer)?;
455 }
456 i += 1;
457 j += 1;
458 }
459 }
460 (Some(_), None) => i += 1,
461 (None, Some(_)) => j += 1,
462 (None, None) => break,
463 }
464 }
465
466 writer.align_to_byte()?;
467 Ok(())
468}
469
470fn write_create_entity(
471 schema: &schema::Schema,
472 entity: &EntitySnapshot,
473 limits: &CodecLimits,
474 writer: &mut BitWriter<'_>,
475) -> CodecResult<()> {
476 writer.align_to_byte()?;
477 writer.write_u32_aligned(entity.id.raw())?;
478 ensure_known_components(schema, entity)?;
479 write_component_mask(schema, entity, writer)?;
480 for component in schema.components.iter() {
481 if let Some(snapshot) = find_component(entity, component.id) {
482 write_full_component(component, snapshot, limits, writer)?;
483 }
484 }
485 Ok(())
486}
487
488fn decode_destroy_section(body: &[u8], limits: &CodecLimits) -> CodecResult<Vec<EntityId>> {
489 if body.len() > limits.max_section_bytes {
490 return Err(CodecError::LimitsExceeded {
491 kind: LimitKind::SectionBytes,
492 limit: limits.max_section_bytes,
493 actual: body.len(),
494 });
495 }
496
497 let mut reader = BitReader::new(body);
498 reader.align_to_byte()?;
499 let count = reader.read_varu32()? as usize;
500 if count > limits.max_entities_destroy {
501 return Err(CodecError::LimitsExceeded {
502 kind: LimitKind::EntitiesDestroy,
503 limit: limits.max_entities_destroy,
504 actual: count,
505 });
506 }
507
508 let mut ids = Vec::with_capacity(count);
509 let mut prev: Option<u32> = None;
510 for _ in 0..count {
511 reader.align_to_byte()?;
512 let id = reader.read_u32_aligned()?;
513 if let Some(prev_id) = prev {
514 if id <= prev_id {
515 return Err(CodecError::InvalidEntityOrder {
516 previous: prev_id,
517 current: id,
518 });
519 }
520 }
521 prev = Some(id);
522 ids.push(EntityId::new(id));
523 }
524 reader.align_to_byte()?;
525 if reader.bits_remaining() != 0 {
526 return Err(CodecError::TrailingSectionData {
527 section: SectionTag::EntityDestroy,
528 remaining_bits: reader.bits_remaining(),
529 });
530 }
531 Ok(ids)
532}
533
534fn decode_create_section(
535 schema: &schema::Schema,
536 body: &[u8],
537 limits: &CodecLimits,
538) -> CodecResult<Vec<EntitySnapshot>> {
539 if body.len() > limits.max_section_bytes {
540 return Err(CodecError::LimitsExceeded {
541 kind: LimitKind::SectionBytes,
542 limit: limits.max_section_bytes,
543 actual: body.len(),
544 });
545 }
546
547 let mut reader = BitReader::new(body);
548 reader.align_to_byte()?;
549 let count = reader.read_varu32()? as usize;
550 if count > limits.max_entities_create {
551 return Err(CodecError::LimitsExceeded {
552 kind: LimitKind::EntitiesCreate,
553 limit: limits.max_entities_create,
554 actual: count,
555 });
556 }
557
558 let mut entities = Vec::with_capacity(count);
559 let mut prev: Option<u32> = None;
560 for _ in 0..count {
561 reader.align_to_byte()?;
562 let id = reader.read_u32_aligned()?;
563 if let Some(prev_id) = prev {
564 if id <= prev_id {
565 return Err(CodecError::InvalidEntityOrder {
566 previous: prev_id,
567 current: id,
568 });
569 }
570 }
571 prev = Some(id);
572
573 let component_mask = read_mask(
574 &mut reader,
575 schema.components.len(),
576 MaskKind::ComponentMask,
577 )?;
578
579 let mut components = Vec::new();
580 for (idx, component) in schema.components.iter().enumerate() {
581 if component_mask[idx] {
582 let fields = decode_full_component(component, &mut reader, limits)?;
583 components.push(ComponentSnapshot {
584 id: component.id,
585 fields,
586 });
587 }
588 }
589
590 let entity = EntitySnapshot {
591 id: EntityId::new(id),
592 components,
593 };
594 ensure_known_components(schema, &entity)?;
595 entities.push(entity);
596 }
597
598 reader.align_to_byte()?;
599 if reader.bits_remaining() != 0 {
600 return Err(CodecError::TrailingSectionData {
601 section: SectionTag::EntityCreate,
602 remaining_bits: reader.bits_remaining(),
603 });
604 }
605 Ok(entities)
606}
607
608fn decode_update_section(
609 schema: &schema::Schema,
610 body: &[u8],
611 limits: &CodecLimits,
612) -> CodecResult<Vec<DeltaUpdateEntity>> {
613 if body.len() > limits.max_section_bytes {
614 return Err(CodecError::LimitsExceeded {
615 kind: LimitKind::SectionBytes,
616 limit: limits.max_section_bytes,
617 actual: body.len(),
618 });
619 }
620
621 let mut reader = BitReader::new(body);
622 reader.align_to_byte()?;
623 let count = reader.read_varu32()? as usize;
624 if count > limits.max_entities_update {
625 return Err(CodecError::LimitsExceeded {
626 kind: LimitKind::EntitiesUpdate,
627 limit: limits.max_entities_update,
628 actual: count,
629 });
630 }
631
632 let mut updates = Vec::with_capacity(count);
633 let mut prev: Option<u32> = None;
634 for _ in 0..count {
635 reader.align_to_byte()?;
636 let id = reader.read_u32_aligned()?;
637 if let Some(prev_id) = prev {
638 if id <= prev_id {
639 return Err(CodecError::InvalidEntityOrder {
640 previous: prev_id,
641 current: id,
642 });
643 }
644 }
645 prev = Some(id);
646
647 let component_mask = read_mask(
648 &mut reader,
649 schema.components.len(),
650 MaskKind::ComponentMask,
651 )?;
652 let mut components = Vec::new();
653 for (idx, component) in schema.components.iter().enumerate() {
654 if component_mask[idx] {
655 let fields = decode_update_component(component, &mut reader, limits)?;
656 components.push(DeltaUpdateComponent {
657 id: component.id,
658 fields,
659 });
660 }
661 }
662
663 updates.push(DeltaUpdateEntity {
664 id: EntityId::new(id),
665 components,
666 });
667 }
668
669 reader.align_to_byte()?;
670 if reader.bits_remaining() != 0 {
671 return Err(CodecError::TrailingSectionData {
672 section: SectionTag::EntityUpdate,
673 remaining_bits: reader.bits_remaining(),
674 });
675 }
676 Ok(updates)
677}
678
679fn decode_delta_sections(
680 schema: &schema::Schema,
681 packet: &WirePacket<'_>,
682 limits: &CodecLimits,
683) -> CodecResult<(Vec<EntityId>, Vec<EntitySnapshot>, Vec<DeltaUpdateEntity>)> {
684 let mut destroys: Option<Vec<EntityId>> = None;
685 let mut creates: Option<Vec<EntitySnapshot>> = None;
686 let mut updates: Option<Vec<DeltaUpdateEntity>> = None;
687
688 for section in &packet.sections {
689 match section.tag {
690 SectionTag::EntityDestroy => {
691 if destroys.is_some() {
692 return Err(CodecError::DuplicateSection {
693 section: section.tag,
694 });
695 }
696 destroys = Some(decode_destroy_section(section.body, limits)?);
697 }
698 SectionTag::EntityCreate => {
699 if creates.is_some() {
700 return Err(CodecError::DuplicateSection {
701 section: section.tag,
702 });
703 }
704 creates = Some(decode_create_section(schema, section.body, limits)?);
705 }
706 SectionTag::EntityUpdate => {
707 if updates.is_some() {
708 return Err(CodecError::DuplicateSection {
709 section: section.tag,
710 });
711 }
712 updates = Some(decode_update_section(schema, section.body, limits)?);
713 }
714 _ => {
715 return Err(CodecError::UnexpectedSection {
716 section: section.tag,
717 });
718 }
719 }
720 }
721
722 Ok((
723 destroys.unwrap_or_default(),
724 creates.unwrap_or_default(),
725 updates.unwrap_or_default(),
726 ))
727}
728
729#[derive(Debug, Clone, PartialEq, Eq)]
730pub struct DeltaDecoded {
731 pub tick: SnapshotTick,
732 pub baseline_tick: SnapshotTick,
733 pub destroys: Vec<EntityId>,
734 pub creates: Vec<EntitySnapshot>,
735 pub updates: Vec<DeltaUpdateEntity>,
736}
737
738#[derive(Debug, Clone, PartialEq, Eq)]
739pub struct DeltaUpdateEntity {
740 pub id: EntityId,
741 pub components: Vec<DeltaUpdateComponent>,
742}
743
744#[derive(Debug, Clone, PartialEq, Eq)]
745pub struct DeltaUpdateComponent {
746 pub id: ComponentId,
747 pub fields: Vec<(usize, FieldValue)>,
748}
749
750fn apply_destroys(
751 baseline: &[EntitySnapshot],
752 destroys: &[EntityId],
753) -> CodecResult<Vec<EntitySnapshot>> {
754 let mut result = Vec::with_capacity(baseline.len());
755 let mut i = 0usize;
756 let mut j = 0usize;
757 while i < baseline.len() || j < destroys.len() {
758 let base = baseline.get(i);
759 let destroy = destroys.get(j);
760 match (base, destroy) {
761 (Some(b), Some(d)) => {
762 if b.id.raw() < d.raw() {
763 result.push(b.clone());
764 i += 1;
765 } else if b.id.raw() > d.raw() {
766 return Err(CodecError::EntityNotFound { entity_id: d.raw() });
767 } else {
768 i += 1;
769 j += 1;
770 }
771 }
772 (Some(b), None) => {
773 result.push(b.clone());
774 i += 1;
775 }
776 (None, Some(d)) => {
777 return Err(CodecError::EntityNotFound { entity_id: d.raw() });
778 }
779 (None, None) => break,
780 }
781 }
782 Ok(result)
783}
784
785fn apply_creates(
786 baseline: Vec<EntitySnapshot>,
787 creates: Vec<EntitySnapshot>,
788) -> CodecResult<Vec<EntitySnapshot>> {
789 let mut result = Vec::with_capacity(baseline.len() + creates.len());
790 let mut i = 0usize;
791 let mut j = 0usize;
792 while i < baseline.len() || j < creates.len() {
793 let base = baseline.get(i);
794 let create = creates.get(j);
795 match (base, create) {
796 (Some(b), Some(c)) => {
797 if b.id.raw() < c.id.raw() {
798 result.push(b.clone());
799 i += 1;
800 } else if b.id.raw() > c.id.raw() {
801 result.push(c.clone());
802 j += 1;
803 } else {
804 return Err(CodecError::EntityAlreadyExists {
805 entity_id: c.id.raw(),
806 });
807 }
808 }
809 (Some(b), None) => {
810 result.push(b.clone());
811 i += 1;
812 }
813 (None, Some(c)) => {
814 result.push(c.clone());
815 j += 1;
816 }
817 (None, None) => break,
818 }
819 }
820 Ok(result)
821}
822
823fn apply_updates(
824 entities: &mut [EntitySnapshot],
825 updates: &[DeltaUpdateEntity],
826) -> CodecResult<()> {
827 for update in updates {
828 let idx = entities
829 .binary_search_by_key(&update.id.raw(), |e| e.id.raw())
830 .map_err(|_| CodecError::EntityNotFound {
831 entity_id: update.id.raw(),
832 })?;
833 let entity = &mut entities[idx];
834 for component_update in &update.components {
835 let component = entity
836 .components
837 .iter_mut()
838 .find(|c| c.id == component_update.id)
839 .ok_or_else(|| CodecError::ComponentNotFound {
840 entity_id: update.id.raw(),
841 component_id: component_update.id.get(),
842 })?;
843 for (field_idx, value) in &component_update.fields {
844 if *field_idx >= component.fields.len() {
845 return Err(CodecError::InvalidMask {
846 kind: MaskKind::FieldMask {
847 component: component_update.id,
848 },
849 reason: MaskReason::FieldCountMismatch {
850 expected: component.fields.len(),
851 actual: *field_idx + 1,
852 },
853 });
854 }
855 component.fields[*field_idx] = *value;
856 }
857 }
858 }
859 Ok(())
860}
861
862fn ensure_entities_sorted(entities: &[EntitySnapshot]) -> CodecResult<()> {
863 let mut prev: Option<u32> = None;
864 for entity in entities {
865 if let Some(prev_id) = prev {
866 if entity.id.raw() <= prev_id {
867 return Err(CodecError::InvalidEntityOrder {
868 previous: prev_id,
869 current: entity.id.raw(),
870 });
871 }
872 }
873 prev = Some(entity.id.raw());
874 }
875 Ok(())
876}
877
878fn write_component_mask(
879 schema: &schema::Schema,
880 entity: &EntitySnapshot,
881 writer: &mut BitWriter<'_>,
882) -> CodecResult<()> {
883 for component in &schema.components {
884 let present = find_component(entity, component.id).is_some();
885 writer.write_bit(present)?;
886 }
887 Ok(())
888}
889
890fn write_full_component(
891 component: &ComponentDef,
892 snapshot: &ComponentSnapshot,
893 limits: &CodecLimits,
894 writer: &mut BitWriter<'_>,
895) -> CodecResult<()> {
896 if snapshot.fields.len() != component.fields.len() {
897 return Err(CodecError::InvalidMask {
898 kind: MaskKind::FieldMask {
899 component: component.id,
900 },
901 reason: MaskReason::FieldCountMismatch {
902 expected: component.fields.len(),
903 actual: snapshot.fields.len(),
904 },
905 });
906 }
907 if snapshot.fields.len() > limits.max_fields_per_component {
908 return Err(CodecError::LimitsExceeded {
909 kind: LimitKind::FieldsPerComponent,
910 limit: limits.max_fields_per_component,
911 actual: snapshot.fields.len(),
912 });
913 }
914
915 for _ in &component.fields {
916 writer.write_bit(true)?;
917 }
918 for (field, value) in component.fields.iter().zip(snapshot.fields.iter()) {
919 write_field_value(component.id, *field, *value, writer)?;
920 }
921 Ok(())
922}
923
924fn decode_full_component(
925 component: &ComponentDef,
926 reader: &mut BitReader<'_>,
927 limits: &CodecLimits,
928) -> CodecResult<Vec<FieldValue>> {
929 if component.fields.len() > limits.max_fields_per_component {
930 return Err(CodecError::LimitsExceeded {
931 kind: LimitKind::FieldsPerComponent,
932 limit: limits.max_fields_per_component,
933 actual: component.fields.len(),
934 });
935 }
936
937 let mask = read_mask(
938 reader,
939 component.fields.len(),
940 MaskKind::FieldMask {
941 component: component.id,
942 },
943 )?;
944 let mut values = Vec::with_capacity(component.fields.len());
945 for (idx, field) in component.fields.iter().enumerate() {
946 if !mask[idx] {
947 return Err(CodecError::InvalidMask {
948 kind: MaskKind::FieldMask {
949 component: component.id,
950 },
951 reason: MaskReason::MissingField { field: field.id },
952 });
953 }
954 values.push(read_field_value(component.id, *field, reader)?);
955 }
956 Ok(values)
957}
958
959fn write_update_components(
960 schema: &schema::Schema,
961 baseline: &EntitySnapshot,
962 current: &EntitySnapshot,
963 limits: &CodecLimits,
964 scratch: &mut CodecScratch,
965 writer: &mut BitWriter<'_>,
966) -> CodecResult<()> {
967 let component_count = schema.components.len();
968 let (component_changed, _) = scratch.component_and_field_masks_mut(component_count, 0);
969 component_changed.fill(false);
970 for (idx, component) in schema.components.iter().enumerate() {
971 let base = find_component(baseline, component.id);
972 let curr = find_component(current, component.id);
973 if base.is_some() != curr.is_some() {
974 return Err(CodecError::InvalidMask {
975 kind: MaskKind::ComponentMask,
976 reason: MaskReason::ComponentPresenceMismatch {
977 component: component.id,
978 },
979 });
980 }
981 if let (Some(base), Some(curr)) = (base, curr) {
982 if base.fields.len() != component.fields.len()
983 || curr.fields.len() != component.fields.len()
984 {
985 return Err(CodecError::InvalidMask {
986 kind: MaskKind::FieldMask {
987 component: component.id,
988 },
989 reason: MaskReason::FieldCountMismatch {
990 expected: component.fields.len(),
991 actual: base.fields.len().max(curr.fields.len()),
992 },
993 });
994 }
995 if component.fields.len() > limits.max_fields_per_component {
996 return Err(CodecError::LimitsExceeded {
997 kind: LimitKind::FieldsPerComponent,
998 limit: limits.max_fields_per_component,
999 actual: component.fields.len(),
1000 });
1001 }
1002 let (component_changed, field_mask) =
1003 scratch.component_and_field_masks_mut(component_count, component.fields.len());
1004 let any_changed = compute_field_mask_into(component, base, curr, field_mask)?
1005 .iter()
1006 .any(|b| *b);
1007 writer.write_bit(any_changed)?;
1008 if any_changed {
1009 component_changed[idx] = true;
1010 }
1011 } else {
1012 writer.write_bit(false)?;
1013 }
1014 }
1015
1016 for (idx, component) in schema.components.iter().enumerate() {
1017 let (base, curr) = match (
1018 find_component(baseline, component.id),
1019 find_component(current, component.id),
1020 ) {
1021 (Some(base), Some(curr)) => (base, curr),
1022 _ => continue,
1023 };
1024 if component.fields.len() > limits.max_fields_per_component {
1025 return Err(CodecError::LimitsExceeded {
1026 kind: LimitKind::FieldsPerComponent,
1027 limit: limits.max_fields_per_component,
1028 actual: component.fields.len(),
1029 });
1030 }
1031 let (component_changed, field_mask) =
1032 scratch.component_and_field_masks_mut(component_count, component.fields.len());
1033 if component_changed[idx] {
1034 let field_mask = compute_field_mask_into(component, base, curr, field_mask)?;
1035 for bit in field_mask {
1036 writer.write_bit(*bit)?;
1037 }
1038 for (((field, _base_val), curr_val), changed) in component
1039 .fields
1040 .iter()
1041 .zip(base.fields.iter())
1042 .zip(curr.fields.iter())
1043 .zip(field_mask.iter())
1044 {
1045 if *changed {
1046 write_field_value(component.id, *field, *curr_val, writer)?;
1047 }
1048 }
1049 }
1050 }
1051 Ok(())
1052}
1053
1054fn decode_update_component(
1055 component: &ComponentDef,
1056 reader: &mut BitReader<'_>,
1057 limits: &CodecLimits,
1058) -> CodecResult<Vec<(usize, FieldValue)>> {
1059 if component.fields.len() > limits.max_fields_per_component {
1060 return Err(CodecError::LimitsExceeded {
1061 kind: LimitKind::FieldsPerComponent,
1062 limit: limits.max_fields_per_component,
1063 actual: component.fields.len(),
1064 });
1065 }
1066 let mask = read_mask(
1067 reader,
1068 component.fields.len(),
1069 MaskKind::FieldMask {
1070 component: component.id,
1071 },
1072 )?;
1073 if !mask.iter().any(|b| *b) {
1074 return Err(CodecError::InvalidMask {
1075 kind: MaskKind::FieldMask {
1076 component: component.id,
1077 },
1078 reason: MaskReason::EmptyFieldMask {
1079 component: component.id,
1080 },
1081 });
1082 }
1083 let mut fields = Vec::new();
1084 for (idx, field) in component.fields.iter().enumerate() {
1085 if mask[idx] {
1086 let value = read_field_value(component.id, *field, reader)?;
1087 fields.push((idx, value));
1088 }
1089 }
1090 Ok(fields)
1091}
1092
1093fn compute_field_mask_into<'a>(
1094 component: &ComponentDef,
1095 baseline: &ComponentSnapshot,
1096 current: &ComponentSnapshot,
1097 field_mask: &'a mut [bool],
1098) -> CodecResult<&'a [bool]> {
1099 for (((field, base_val), curr_val), slot) in component
1100 .fields
1101 .iter()
1102 .zip(baseline.fields.iter())
1103 .zip(current.fields.iter())
1104 .zip(field_mask.iter_mut())
1105 {
1106 *slot = field_changed(component.id, *field, *base_val, *curr_val)?;
1107 }
1108 Ok(field_mask)
1109}
1110
1111fn field_changed(
1112 component_id: ComponentId,
1113 field: FieldDef,
1114 baseline: FieldValue,
1115 current: FieldValue,
1116) -> CodecResult<bool> {
1117 match field.change {
1118 ChangePolicy::Always => field_differs(component_id, field, baseline, current),
1119 ChangePolicy::Threshold { threshold_q } => {
1120 field_exceeds_threshold(component_id, field, baseline, current, threshold_q)
1121 }
1122 }
1123}
1124
1125fn field_differs(
1126 component_id: ComponentId,
1127 field: FieldDef,
1128 baseline: FieldValue,
1129 current: FieldValue,
1130) -> CodecResult<bool> {
1131 match (baseline, current) {
1132 (FieldValue::Bool(a), FieldValue::Bool(b)) => Ok(a != b),
1133 (FieldValue::UInt(a), FieldValue::UInt(b)) => Ok(a != b),
1134 (FieldValue::SInt(a), FieldValue::SInt(b)) => Ok(a != b),
1135 (FieldValue::VarUInt(a), FieldValue::VarUInt(b)) => Ok(a != b),
1136 (FieldValue::VarSInt(a), FieldValue::VarSInt(b)) => Ok(a != b),
1137 (FieldValue::FixedPoint(a), FieldValue::FixedPoint(b)) => Ok(a != b),
1138 _ => Err(CodecError::InvalidValue {
1139 component: component_id,
1140 field: field.id,
1141 reason: ValueReason::TypeMismatch {
1142 expected: codec_name(field.codec),
1143 found: value_name(current),
1144 },
1145 }),
1146 }
1147}
1148
1149fn field_exceeds_threshold(
1150 component_id: ComponentId,
1151 field: FieldDef,
1152 baseline: FieldValue,
1153 current: FieldValue,
1154 threshold_q: u32,
1155) -> CodecResult<bool> {
1156 let threshold_q = threshold_q as u64;
1157 match (baseline, current) {
1158 (FieldValue::FixedPoint(a), FieldValue::FixedPoint(b)) => {
1159 Ok((a - b).unsigned_abs() > threshold_q)
1160 }
1161 (FieldValue::UInt(a), FieldValue::UInt(b)) => Ok(a.abs_diff(b) > threshold_q),
1162 (FieldValue::SInt(a), FieldValue::SInt(b)) => Ok((a - b).unsigned_abs() > threshold_q),
1163 (FieldValue::VarUInt(a), FieldValue::VarUInt(b)) => Ok(a.abs_diff(b) > threshold_q),
1164 (FieldValue::VarSInt(a), FieldValue::VarSInt(b)) => {
1165 Ok((a - b).unsigned_abs() > threshold_q)
1166 }
1167 (FieldValue::Bool(a), FieldValue::Bool(b)) => Ok(a != b),
1168 _ => Err(CodecError::InvalidValue {
1169 component: component_id,
1170 field: field.id,
1171 reason: ValueReason::TypeMismatch {
1172 expected: codec_name(field.codec),
1173 found: value_name(current),
1174 },
1175 }),
1176 }
1177}
1178
1179fn entity_has_updates(
1180 schema: &schema::Schema,
1181 baseline: &EntitySnapshot,
1182 current: &EntitySnapshot,
1183 limits: &CodecLimits,
1184) -> CodecResult<bool> {
1185 ensure_component_presence_matches(schema, baseline, current)?;
1186 for component in &schema.components {
1187 let base = find_component(baseline, component.id);
1188 let curr = find_component(current, component.id);
1189 if let (Some(base), Some(curr)) = (base, curr) {
1190 if base.fields.len() != component.fields.len()
1191 || curr.fields.len() != component.fields.len()
1192 {
1193 return Err(CodecError::InvalidMask {
1194 kind: MaskKind::FieldMask {
1195 component: component.id,
1196 },
1197 reason: MaskReason::FieldCountMismatch {
1198 expected: component.fields.len(),
1199 actual: base.fields.len().max(curr.fields.len()),
1200 },
1201 });
1202 }
1203 if component.fields.len() > limits.max_fields_per_component {
1204 return Err(CodecError::LimitsExceeded {
1205 kind: LimitKind::FieldsPerComponent,
1206 limit: limits.max_fields_per_component,
1207 actual: component.fields.len(),
1208 });
1209 }
1210 for ((field, base_val), curr_val) in component
1211 .fields
1212 .iter()
1213 .zip(base.fields.iter())
1214 .zip(curr.fields.iter())
1215 {
1216 if field_changed(component.id, *field, *base_val, *curr_val)? {
1217 return Ok(true);
1218 }
1219 }
1220 }
1221 }
1222 Ok(false)
1223}
1224
1225fn ensure_component_presence_matches(
1226 schema: &schema::Schema,
1227 baseline: &EntitySnapshot,
1228 current: &EntitySnapshot,
1229) -> CodecResult<()> {
1230 for component in &schema.components {
1232 let base = find_component(baseline, component.id).is_some();
1233 let curr = find_component(current, component.id).is_some();
1234 if base != curr {
1235 return Err(CodecError::InvalidMask {
1236 kind: MaskKind::ComponentMask,
1237 reason: MaskReason::ComponentPresenceMismatch {
1238 component: component.id,
1239 },
1240 });
1241 }
1242 }
1243 Ok(())
1244}
1245
1246fn find_component(entity: &EntitySnapshot, id: ComponentId) -> Option<&ComponentSnapshot> {
1247 entity.components.iter().find(|c| c.id == id)
1248}
1249
1250fn codec_name(codec: schema::FieldCodec) -> &'static str {
1251 match codec {
1252 schema::FieldCodec::Bool => "bool",
1253 schema::FieldCodec::UInt { .. } => "uint",
1254 schema::FieldCodec::SInt { .. } => "sint",
1255 schema::FieldCodec::VarUInt => "varuint",
1256 schema::FieldCodec::VarSInt => "varsint",
1257 schema::FieldCodec::FixedPoint(_) => "fixed-point",
1258 }
1259}
1260
1261fn value_name(value: FieldValue) -> &'static str {
1262 match value {
1263 FieldValue::Bool(_) => "bool",
1264 FieldValue::UInt(_) => "uint",
1265 FieldValue::SInt(_) => "sint",
1266 FieldValue::VarUInt(_) => "varuint",
1267 FieldValue::VarSInt(_) => "varsint",
1268 FieldValue::FixedPoint(_) => "fixed-point",
1269 }
1270}
1271
1272#[cfg(test)]
1273mod tests {
1274 use super::*;
1275 use schema::{ComponentDef, FieldCodec, FieldDef, FieldId, Schema};
1276
1277 fn schema_one_bool() -> Schema {
1278 let component = ComponentDef::new(ComponentId::new(1).unwrap())
1279 .field(FieldDef::new(FieldId::new(1).unwrap(), FieldCodec::bool()));
1280 Schema::new(vec![component]).unwrap()
1281 }
1282
1283 fn schema_uint_threshold(threshold_q: u32) -> Schema {
1284 let field = FieldDef::new(FieldId::new(1).unwrap(), FieldCodec::uint(8))
1285 .change(ChangePolicy::Threshold { threshold_q });
1286 let component = ComponentDef::new(ComponentId::new(1).unwrap()).field(field);
1287 Schema::new(vec![component]).unwrap()
1288 }
1289
1290 fn schema_two_components() -> Schema {
1291 let c1 = ComponentDef::new(ComponentId::new(1).unwrap())
1292 .field(FieldDef::new(FieldId::new(1).unwrap(), FieldCodec::bool()));
1293 let c2 = ComponentDef::new(ComponentId::new(2).unwrap())
1294 .field(FieldDef::new(FieldId::new(1).unwrap(), FieldCodec::bool()));
1295 Schema::new(vec![c1, c2]).unwrap()
1296 }
1297
1298 fn baseline_snapshot() -> Snapshot {
1299 Snapshot {
1300 tick: SnapshotTick::new(10),
1301 entities: vec![EntitySnapshot {
1302 id: EntityId::new(1),
1303 components: vec![ComponentSnapshot {
1304 id: ComponentId::new(1).unwrap(),
1305 fields: vec![FieldValue::Bool(false)],
1306 }],
1307 }],
1308 }
1309 }
1310
1311 #[test]
1312 fn no_op_delta_is_empty() {
1313 let schema = schema_one_bool();
1314 let baseline = baseline_snapshot();
1315 let current = baseline.clone();
1316 let mut buf = [0u8; 128];
1317 let bytes = encode_delta_snapshot(
1318 &schema,
1319 SnapshotTick::new(11),
1320 baseline.tick,
1321 &baseline,
1322 ¤t,
1323 &CodecLimits::for_testing(),
1324 &mut buf,
1325 )
1326 .unwrap();
1327 let header =
1328 wire::PacketHeader::delta_snapshot(schema_hash(&schema), 11, baseline.tick.raw(), 0);
1329 let mut expected = [0u8; wire::HEADER_SIZE];
1330 encode_header(&header, &mut expected).unwrap();
1331 assert_eq!(&buf[..bytes], expected.as_slice());
1332 }
1333
1334 #[test]
1335 fn delta_roundtrip_single_update() {
1336 let schema = schema_one_bool();
1337 let baseline = baseline_snapshot();
1338 let current = Snapshot {
1339 tick: SnapshotTick::new(11),
1340 entities: vec![EntitySnapshot {
1341 id: EntityId::new(1),
1342 components: vec![ComponentSnapshot {
1343 id: ComponentId::new(1).unwrap(),
1344 fields: vec![FieldValue::Bool(true)],
1345 }],
1346 }],
1347 };
1348
1349 let mut buf = [0u8; 128];
1350 let bytes = encode_delta_snapshot(
1351 &schema,
1352 current.tick,
1353 baseline.tick,
1354 &baseline,
1355 ¤t,
1356 &CodecLimits::for_testing(),
1357 &mut buf,
1358 )
1359 .unwrap();
1360 let applied = apply_delta_snapshot(
1361 &schema,
1362 &baseline,
1363 &buf[..bytes],
1364 &wire::Limits::for_testing(),
1365 &CodecLimits::for_testing(),
1366 )
1367 .unwrap();
1368 assert_eq!(applied.entities, current.entities);
1369 }
1370
1371 #[test]
1372 fn delta_roundtrip_reuse_scratch() {
1373 let schema = schema_one_bool();
1374 let baseline = baseline_snapshot();
1375 let current_one = Snapshot {
1376 tick: SnapshotTick::new(11),
1377 entities: vec![EntitySnapshot {
1378 id: EntityId::new(1),
1379 components: vec![ComponentSnapshot {
1380 id: ComponentId::new(1).unwrap(),
1381 fields: vec![FieldValue::Bool(true)],
1382 }],
1383 }],
1384 };
1385 let current_two = Snapshot {
1386 tick: SnapshotTick::new(12),
1387 entities: vec![EntitySnapshot {
1388 id: EntityId::new(1),
1389 components: vec![ComponentSnapshot {
1390 id: ComponentId::new(1).unwrap(),
1391 fields: vec![FieldValue::Bool(false)],
1392 }],
1393 }],
1394 };
1395
1396 let mut scratch = CodecScratch::default();
1397 let mut buf_one = [0u8; 128];
1398 let mut buf_two = [0u8; 128];
1399
1400 let bytes_one = encode_delta_snapshot_with_scratch(
1401 &schema,
1402 current_one.tick,
1403 baseline.tick,
1404 &baseline,
1405 ¤t_one,
1406 &CodecLimits::for_testing(),
1407 &mut scratch,
1408 &mut buf_one,
1409 )
1410 .unwrap();
1411 let applied_one = apply_delta_snapshot(
1412 &schema,
1413 &baseline,
1414 &buf_one[..bytes_one],
1415 &wire::Limits::for_testing(),
1416 &CodecLimits::for_testing(),
1417 )
1418 .unwrap();
1419 assert_eq!(applied_one.entities, current_one.entities);
1420
1421 let bytes_two = encode_delta_snapshot_with_scratch(
1422 &schema,
1423 current_two.tick,
1424 baseline.tick,
1425 &baseline,
1426 ¤t_two,
1427 &CodecLimits::for_testing(),
1428 &mut scratch,
1429 &mut buf_two,
1430 )
1431 .unwrap();
1432 let applied_two = apply_delta_snapshot(
1433 &schema,
1434 &baseline,
1435 &buf_two[..bytes_two],
1436 &wire::Limits::for_testing(),
1437 &CodecLimits::for_testing(),
1438 )
1439 .unwrap();
1440 assert_eq!(applied_two.entities, current_two.entities);
1441 }
1442
1443 #[test]
1444 fn delta_roundtrip_create_destroy_update() {
1445 let schema = schema_one_bool();
1446 let baseline = Snapshot {
1447 tick: SnapshotTick::new(10),
1448 entities: vec![
1449 EntitySnapshot {
1450 id: EntityId::new(1),
1451 components: vec![ComponentSnapshot {
1452 id: ComponentId::new(1).unwrap(),
1453 fields: vec![FieldValue::Bool(false)],
1454 }],
1455 },
1456 EntitySnapshot {
1457 id: EntityId::new(2),
1458 components: vec![ComponentSnapshot {
1459 id: ComponentId::new(1).unwrap(),
1460 fields: vec![FieldValue::Bool(false)],
1461 }],
1462 },
1463 ],
1464 };
1465 let current = Snapshot {
1466 tick: SnapshotTick::new(11),
1467 entities: vec![
1468 EntitySnapshot {
1469 id: EntityId::new(2),
1470 components: vec![ComponentSnapshot {
1471 id: ComponentId::new(1).unwrap(),
1472 fields: vec![FieldValue::Bool(true)],
1473 }],
1474 },
1475 EntitySnapshot {
1476 id: EntityId::new(3),
1477 components: vec![ComponentSnapshot {
1478 id: ComponentId::new(1).unwrap(),
1479 fields: vec![FieldValue::Bool(true)],
1480 }],
1481 },
1482 ],
1483 };
1484
1485 let mut buf = [0u8; 256];
1486 let bytes = encode_delta_snapshot(
1487 &schema,
1488 current.tick,
1489 baseline.tick,
1490 &baseline,
1491 ¤t,
1492 &CodecLimits::for_testing(),
1493 &mut buf,
1494 )
1495 .unwrap();
1496 let applied = apply_delta_snapshot(
1497 &schema,
1498 &baseline,
1499 &buf[..bytes],
1500 &wire::Limits::for_testing(),
1501 &CodecLimits::for_testing(),
1502 )
1503 .unwrap();
1504 assert_eq!(applied.entities, current.entities);
1505 }
1506
1507 #[test]
1508 fn delta_roundtrip_single_component_change() {
1509 let schema = schema_two_components();
1510 let baseline = Snapshot {
1511 tick: SnapshotTick::new(10),
1512 entities: vec![EntitySnapshot {
1513 id: EntityId::new(1),
1514 components: vec![
1515 ComponentSnapshot {
1516 id: ComponentId::new(1).unwrap(),
1517 fields: vec![FieldValue::Bool(false)],
1518 },
1519 ComponentSnapshot {
1520 id: ComponentId::new(2).unwrap(),
1521 fields: vec![FieldValue::Bool(false)],
1522 },
1523 ],
1524 }],
1525 };
1526 let current = Snapshot {
1527 tick: SnapshotTick::new(11),
1528 entities: vec![EntitySnapshot {
1529 id: EntityId::new(1),
1530 components: vec![
1531 ComponentSnapshot {
1532 id: ComponentId::new(1).unwrap(),
1533 fields: vec![FieldValue::Bool(true)],
1534 },
1535 ComponentSnapshot {
1536 id: ComponentId::new(2).unwrap(),
1537 fields: vec![FieldValue::Bool(false)],
1538 },
1539 ],
1540 }],
1541 };
1542
1543 let mut buf = [0u8; 256];
1544 let bytes = encode_delta_snapshot(
1545 &schema,
1546 current.tick,
1547 baseline.tick,
1548 &baseline,
1549 ¤t,
1550 &CodecLimits::for_testing(),
1551 &mut buf,
1552 )
1553 .unwrap();
1554 let applied = apply_delta_snapshot(
1555 &schema,
1556 &baseline,
1557 &buf[..bytes],
1558 &wire::Limits::for_testing(),
1559 &CodecLimits::for_testing(),
1560 )
1561 .unwrap();
1562 assert_eq!(applied.entities, current.entities);
1563 }
1564
1565 #[test]
1566 fn baseline_tick_mismatch_is_error() {
1567 let schema = schema_one_bool();
1568 let baseline = baseline_snapshot();
1569 let current = baseline.clone();
1570 let mut buf = [0u8; 128];
1571 let bytes = encode_delta_snapshot(
1572 &schema,
1573 SnapshotTick::new(11),
1574 baseline.tick,
1575 &baseline,
1576 ¤t,
1577 &CodecLimits::for_testing(),
1578 &mut buf,
1579 )
1580 .unwrap();
1581 let mut packet = wire::decode_packet(&buf[..bytes], &wire::Limits::for_testing()).unwrap();
1582 packet.header.baseline_tick = 999;
1583 wire::encode_header(&packet.header, &mut buf[..wire::HEADER_SIZE]).unwrap();
1584 let err = apply_delta_snapshot(
1585 &schema,
1586 &baseline,
1587 &buf[..bytes],
1588 &wire::Limits::for_testing(),
1589 &CodecLimits::for_testing(),
1590 )
1591 .unwrap_err();
1592 assert!(matches!(err, CodecError::BaselineTickMismatch { .. }));
1593 }
1594
1595 #[test]
1596 fn threshold_suppresses_small_change() {
1597 let schema = schema_uint_threshold(5);
1598 let baseline = Snapshot {
1599 tick: SnapshotTick::new(10),
1600 entities: vec![EntitySnapshot {
1601 id: EntityId::new(1),
1602 components: vec![ComponentSnapshot {
1603 id: ComponentId::new(1).unwrap(),
1604 fields: vec![FieldValue::UInt(10)],
1605 }],
1606 }],
1607 };
1608 let current = Snapshot {
1609 tick: SnapshotTick::new(11),
1610 entities: vec![EntitySnapshot {
1611 id: EntityId::new(1),
1612 components: vec![ComponentSnapshot {
1613 id: ComponentId::new(1).unwrap(),
1614 fields: vec![FieldValue::UInt(12)],
1615 }],
1616 }],
1617 };
1618
1619 let mut buf = [0u8; 128];
1620 let bytes = encode_delta_snapshot(
1621 &schema,
1622 current.tick,
1623 baseline.tick,
1624 &baseline,
1625 ¤t,
1626 &CodecLimits::for_testing(),
1627 &mut buf,
1628 )
1629 .unwrap();
1630
1631 let packet = wire::decode_packet(&buf[..bytes], &wire::Limits::for_testing()).unwrap();
1632 assert_eq!(packet.sections.len(), 0);
1633
1634 let applied = apply_delta_snapshot(
1635 &schema,
1636 &baseline,
1637 &buf[..bytes],
1638 &wire::Limits::for_testing(),
1639 &CodecLimits::for_testing(),
1640 )
1641 .unwrap();
1642 assert_eq!(applied.entities, baseline.entities);
1643 }
1644}