1use std::cmp::Ordering;
4
5use bitstream::{BitReader, BitWriter};
6use schema::{schema_hash, ChangePolicy, ComponentDef, ComponentId, FieldDef};
7use wire::{decode_packet, encode_header, SectionTag, WirePacket};
8
9use crate::baseline::BaselineStore;
10use crate::error::{CodecError, CodecResult, LimitKind, MaskKind, MaskReason, ValueReason};
11use crate::limits::CodecLimits;
12use crate::scratch::CodecScratch;
13use crate::snapshot::{
14 ensure_known_components, read_field_value, read_field_value_sparse, read_mask, required_bits,
15 write_field_value, write_field_value_sparse, write_section, ComponentSnapshot, EntitySnapshot,
16 FieldValue, Snapshot,
17};
18use crate::types::{EntityId, SnapshotTick};
19
20#[must_use]
22pub fn select_baseline_tick<T>(
23 store: &BaselineStore<T>,
24 ack_tick: SnapshotTick,
25) -> Option<SnapshotTick> {
26 store.latest_at_or_before(ack_tick).map(|(tick, _)| tick)
27}
28
29pub fn encode_delta_snapshot(
35 schema: &schema::Schema,
36 tick: SnapshotTick,
37 baseline_tick: SnapshotTick,
38 baseline: &Snapshot,
39 current: &Snapshot,
40 limits: &CodecLimits,
41 out: &mut [u8],
42) -> CodecResult<usize> {
43 let mut scratch = CodecScratch::default();
44 encode_delta_snapshot_with_scratch(
45 schema,
46 tick,
47 baseline_tick,
48 baseline,
49 current,
50 limits,
51 &mut scratch,
52 out,
53 )
54}
55
56#[allow(clippy::too_many_arguments)]
58pub fn encode_delta_snapshot_with_scratch(
59 schema: &schema::Schema,
60 tick: SnapshotTick,
61 baseline_tick: SnapshotTick,
62 baseline: &Snapshot,
63 current: &Snapshot,
64 limits: &CodecLimits,
65 scratch: &mut CodecScratch,
66 out: &mut [u8],
67) -> CodecResult<usize> {
68 encode_delta_snapshot_with_scratch_mode(
69 schema,
70 tick,
71 baseline_tick,
72 baseline,
73 current,
74 limits,
75 scratch,
76 out,
77 EncodeUpdateMode::Auto,
78 )
79}
80
81pub fn encode_delta_snapshot_for_client(
86 schema: &schema::Schema,
87 tick: SnapshotTick,
88 baseline_tick: SnapshotTick,
89 baseline: &Snapshot,
90 current: &Snapshot,
91 limits: &CodecLimits,
92 out: &mut [u8],
93) -> CodecResult<usize> {
94 let mut scratch = CodecScratch::default();
95 encode_delta_snapshot_for_client_with_scratch(
96 schema,
97 tick,
98 baseline_tick,
99 baseline,
100 current,
101 limits,
102 &mut scratch,
103 out,
104 )
105}
106
107pub struct SessionEncoder<'a> {
112 schema: &'a schema::Schema,
113 limits: &'a CodecLimits,
114 #[allow(dead_code)]
115 scratch: CodecScratch,
116}
117
118impl<'a> SessionEncoder<'a> {
119 #[must_use]
120 pub fn new(schema: &'a schema::Schema, limits: &'a CodecLimits) -> Self {
121 Self {
122 schema,
123 limits,
124 scratch: CodecScratch::default(),
125 }
126 }
127
128 #[must_use]
129 pub fn schema(&self) -> &'a schema::Schema {
130 self.schema
131 }
132
133 #[must_use]
134 pub fn limits(&self) -> &'a CodecLimits {
135 self.limits
136 }
137}
138
139pub fn encode_delta_from_changes(
144 session: &mut SessionEncoder<'_>,
145 tick: SnapshotTick,
146 baseline_tick: SnapshotTick,
147 creates: &[EntitySnapshot],
148 destroys: &[EntityId],
149 updates: &[DeltaUpdateEntity],
150 out: &mut [u8],
151) -> CodecResult<usize> {
152 encode_delta_snapshot_from_updates(
153 session.schema,
154 tick,
155 baseline_tick,
156 destroys,
157 creates,
158 updates,
159 session.limits,
160 out,
161 )
162}
163
164#[allow(clippy::too_many_arguments)]
169pub fn encode_delta_snapshot_from_updates(
170 schema: &schema::Schema,
171 tick: SnapshotTick,
172 baseline_tick: SnapshotTick,
173 destroys: &[EntityId],
174 creates: &[EntitySnapshot],
175 updates: &[DeltaUpdateEntity],
176 limits: &CodecLimits,
177 out: &mut [u8],
178) -> CodecResult<usize> {
179 if out.len() < wire::HEADER_SIZE {
180 return Err(CodecError::OutputTooSmall {
181 needed: wire::HEADER_SIZE,
182 available: out.len(),
183 });
184 }
185 let payload_len = encode_delta_payload_from_updates(
186 schema,
187 destroys,
188 creates,
189 updates,
190 limits,
191 &mut out[wire::HEADER_SIZE..],
192 )?;
193 let header = wire::PacketHeader::delta_snapshot(
194 schema_hash(schema),
195 tick.raw(),
196 baseline_tick.raw(),
197 payload_len as u32,
198 );
199 encode_header(&header, &mut out[..wire::HEADER_SIZE]).map_err(|_| {
200 CodecError::OutputTooSmall {
201 needed: wire::HEADER_SIZE,
202 available: out.len(),
203 }
204 })?;
205 Ok(wire::HEADER_SIZE + payload_len)
206}
207
208#[allow(clippy::too_many_arguments)]
210pub fn encode_delta_snapshot_for_client_session(
211 schema: &schema::Schema,
212 tick: SnapshotTick,
213 baseline_tick: SnapshotTick,
214 baseline: &Snapshot,
215 current: &Snapshot,
216 limits: &CodecLimits,
217 last_tick: &mut SnapshotTick,
218 out: &mut [u8],
219) -> CodecResult<usize> {
220 let mut scratch = CodecScratch::default();
221 encode_delta_snapshot_for_client_session_with_scratch(
222 schema,
223 tick,
224 baseline_tick,
225 baseline,
226 current,
227 limits,
228 &mut scratch,
229 last_tick,
230 out,
231 )
232}
233
234#[allow(clippy::too_many_arguments)]
236pub fn encode_delta_snapshot_for_client_with_scratch(
237 schema: &schema::Schema,
238 tick: SnapshotTick,
239 baseline_tick: SnapshotTick,
240 baseline: &Snapshot,
241 current: &Snapshot,
242 limits: &CodecLimits,
243 scratch: &mut CodecScratch,
244 out: &mut [u8],
245) -> CodecResult<usize> {
246 encode_delta_snapshot_with_scratch_mode(
247 schema,
248 tick,
249 baseline_tick,
250 baseline,
251 current,
252 limits,
253 scratch,
254 out,
255 EncodeUpdateMode::Sparse,
256 )
257}
258
/// Selects how the update section of a delta payload is encoded.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
enum EncodeUpdateMode {
    /// Let `select_update_encoding` choose between the masked and sparse layouts.
    Auto,
    /// Always use the sparse layout (`UpdateEncoding::Sparse`).
    Sparse,
}
264
265#[allow(clippy::too_many_arguments)]
267pub fn encode_delta_snapshot_for_client_session_with_scratch(
268 schema: &schema::Schema,
269 tick: SnapshotTick,
270 baseline_tick: SnapshotTick,
271 baseline: &Snapshot,
272 current: &Snapshot,
273 limits: &CodecLimits,
274 scratch: &mut CodecScratch,
275 last_tick: &mut SnapshotTick,
276 out: &mut [u8],
277) -> CodecResult<usize> {
278 let max_header = wire::SESSION_MAX_HEADER_SIZE;
279 if out.len() < max_header {
280 return Err(CodecError::OutputTooSmall {
281 needed: max_header,
282 available: out.len(),
283 });
284 }
285 if tick.raw() <= last_tick.raw() {
286 return Err(CodecError::InvalidEntityOrder {
287 previous: last_tick.raw(),
288 current: tick.raw(),
289 });
290 }
291 if baseline_tick.raw() > tick.raw() {
292 return Err(CodecError::BaselineTickMismatch {
293 expected: baseline_tick.raw(),
294 found: tick.raw(),
295 });
296 }
297 let payload_len = encode_delta_payload_with_mode(
298 schema,
299 baseline_tick,
300 baseline,
301 current,
302 limits,
303 scratch,
304 &mut out[max_header..],
305 EncodeUpdateMode::Sparse,
306 )?;
307 let tick_delta = tick.raw() - last_tick.raw();
308 let baseline_delta = tick.raw() - baseline_tick.raw();
309 let header_len = wire::encode_session_header(
310 &mut out[..max_header],
311 wire::SessionFlags::delta_snapshot(),
312 tick_delta,
313 baseline_delta,
314 payload_len as u32,
315 )
316 .map_err(|_| CodecError::OutputTooSmall {
317 needed: max_header,
318 available: out.len(),
319 })?;
320 if header_len < max_header {
321 let payload_start = max_header;
322 let payload_end = max_header + payload_len;
323 out.copy_within(payload_start..payload_end, header_len);
324 }
325 *last_tick = tick;
326 Ok(header_len + payload_len)
327}
328
329#[allow(clippy::too_many_arguments)]
330fn encode_delta_snapshot_with_scratch_mode(
331 schema: &schema::Schema,
332 tick: SnapshotTick,
333 baseline_tick: SnapshotTick,
334 baseline: &Snapshot,
335 current: &Snapshot,
336 limits: &CodecLimits,
337 scratch: &mut CodecScratch,
338 out: &mut [u8],
339 mode: EncodeUpdateMode,
340) -> CodecResult<usize> {
341 if out.len() < wire::HEADER_SIZE {
342 return Err(CodecError::OutputTooSmall {
343 needed: wire::HEADER_SIZE,
344 available: out.len(),
345 });
346 }
347 let payload_len = encode_delta_payload_with_mode(
348 schema,
349 baseline_tick,
350 baseline,
351 current,
352 limits,
353 scratch,
354 &mut out[wire::HEADER_SIZE..],
355 mode,
356 )?;
357 let header = wire::PacketHeader::delta_snapshot(
358 schema_hash(schema),
359 tick.raw(),
360 baseline_tick.raw(),
361 payload_len as u32,
362 );
363 encode_header(&header, &mut out[..wire::HEADER_SIZE]).map_err(|_| {
364 CodecError::OutputTooSmall {
365 needed: wire::HEADER_SIZE,
366 available: out.len(),
367 }
368 })?;
369
370 Ok(wire::HEADER_SIZE + payload_len)
371}
372
373#[allow(clippy::too_many_arguments)]
374fn encode_delta_payload_with_mode(
375 schema: &schema::Schema,
376 baseline_tick: SnapshotTick,
377 baseline: &Snapshot,
378 current: &Snapshot,
379 limits: &CodecLimits,
380 scratch: &mut CodecScratch,
381 out: &mut [u8],
382 mode: EncodeUpdateMode,
383) -> CodecResult<usize> {
384 if baseline.tick != baseline_tick {
385 return Err(CodecError::BaselineTickMismatch {
386 expected: baseline.tick.raw(),
387 found: baseline_tick.raw(),
388 });
389 }
390
391 ensure_entities_sorted(&baseline.entities)?;
392 ensure_entities_sorted(¤t.entities)?;
393
394 let mut counts = DiffCounts::default();
395 diff_counts(schema, baseline, current, limits, &mut counts)?;
396
397 if counts.creates > limits.max_entities_create {
398 return Err(CodecError::LimitsExceeded {
399 kind: LimitKind::EntitiesCreate,
400 limit: limits.max_entities_create,
401 actual: counts.creates,
402 });
403 }
404 if counts.updates > limits.max_entities_update {
405 return Err(CodecError::LimitsExceeded {
406 kind: LimitKind::EntitiesUpdate,
407 limit: limits.max_entities_update,
408 actual: counts.updates,
409 });
410 }
411 if counts.destroys > limits.max_entities_destroy {
412 return Err(CodecError::LimitsExceeded {
413 kind: LimitKind::EntitiesDestroy,
414 limit: limits.max_entities_destroy,
415 actual: counts.destroys,
416 });
417 }
418
419 let mut offset = 0;
420 if counts.destroys > 0 {
421 let written = write_section(
422 SectionTag::EntityDestroy,
423 &mut out[offset..],
424 limits,
425 |writer| encode_destroy_body(baseline, current, counts.destroys, limits, writer),
426 )?;
427 offset += written;
428 }
429 if counts.creates > 0 {
430 let written = write_section(
431 SectionTag::EntityCreate,
432 &mut out[offset..],
433 limits,
434 |writer| encode_create_body(schema, baseline, current, counts.creates, limits, writer),
435 )?;
436 offset += written;
437 }
438 if counts.updates > 0 {
439 let update_encoding = match mode {
440 EncodeUpdateMode::Auto => {
441 select_update_encoding(schema, baseline, current, limits, scratch)?
442 }
443 EncodeUpdateMode::Sparse => UpdateEncoding::Sparse,
444 };
445 let section_tag = match update_encoding {
446 UpdateEncoding::Masked => SectionTag::EntityUpdate,
447 UpdateEncoding::Sparse => SectionTag::EntityUpdateSparsePacked,
448 };
449 let written =
450 write_section(
451 section_tag,
452 &mut out[offset..],
453 limits,
454 |writer| match update_encoding {
455 UpdateEncoding::Masked => encode_update_body_masked(
456 schema,
457 baseline,
458 current,
459 counts.updates,
460 limits,
461 scratch,
462 writer,
463 ),
464 UpdateEncoding::Sparse => encode_update_body_sparse_packed(
465 schema,
466 baseline,
467 current,
468 counts.updates,
469 limits,
470 scratch,
471 writer,
472 ),
473 },
474 )?;
475 offset += written;
476 }
477
478 Ok(offset)
479}
480
481#[allow(clippy::too_many_arguments)]
482fn encode_delta_payload_from_updates(
483 schema: &schema::Schema,
484 destroys: &[EntityId],
485 creates: &[EntitySnapshot],
486 updates: &[DeltaUpdateEntity],
487 limits: &CodecLimits,
488 out: &mut [u8],
489) -> CodecResult<usize> {
490 if destroys.len() > limits.max_entities_destroy {
491 return Err(CodecError::LimitsExceeded {
492 kind: LimitKind::EntitiesDestroy,
493 limit: limits.max_entities_destroy,
494 actual: destroys.len(),
495 });
496 }
497 if creates.len() > limits.max_entities_create {
498 return Err(CodecError::LimitsExceeded {
499 kind: LimitKind::EntitiesCreate,
500 limit: limits.max_entities_create,
501 actual: creates.len(),
502 });
503 }
504 if updates.len() > limits.max_entities_update {
505 return Err(CodecError::LimitsExceeded {
506 kind: LimitKind::EntitiesUpdate,
507 limit: limits.max_entities_update,
508 actual: updates.len(),
509 });
510 }
511
512 ensure_entity_ids_sorted(destroys)?;
513 ensure_entities_sorted(creates)?;
514 validate_updates_for_encoding(schema, updates, limits)?;
515
516 let mut offset = 0;
517 if !destroys.is_empty() {
518 let written = write_section(
519 SectionTag::EntityDestroy,
520 &mut out[offset..],
521 limits,
522 |writer| encode_destroy_body_from_list(destroys, limits, writer),
523 )?;
524 offset += written;
525 }
526 if !creates.is_empty() {
527 let written = write_section(
528 SectionTag::EntityCreate,
529 &mut out[offset..],
530 limits,
531 |writer| encode_create_body_from_list(schema, creates, limits, writer),
532 )?;
533 offset += written;
534 }
535 if !updates.is_empty() {
536 let written = write_section(
537 SectionTag::EntityUpdateSparsePacked,
538 &mut out[offset..],
539 limits,
540 |writer| encode_update_body_sparse_packed_from_updates(schema, updates, limits, writer),
541 )?;
542 offset += written;
543 }
544
545 Ok(offset)
546}
547
548pub fn apply_delta_snapshot(
550 schema: &schema::Schema,
551 baseline: &Snapshot,
552 bytes: &[u8],
553 wire_limits: &wire::Limits,
554 limits: &CodecLimits,
555) -> CodecResult<Snapshot> {
556 let packet = decode_packet(bytes, wire_limits)?;
557 apply_delta_snapshot_from_packet(schema, baseline, &packet, limits)
558}
559
560pub fn apply_delta_snapshot_from_packet(
562 schema: &schema::Schema,
563 baseline: &Snapshot,
564 packet: &WirePacket<'_>,
565 limits: &CodecLimits,
566) -> CodecResult<Snapshot> {
567 let header = packet.header;
568 if !header.flags.is_delta_snapshot() {
569 return Err(CodecError::Wire(wire::DecodeError::InvalidFlags {
570 flags: header.flags.raw(),
571 }));
572 }
573 if header.baseline_tick == 0 {
574 return Err(CodecError::Wire(wire::DecodeError::InvalidBaselineTick {
575 baseline_tick: header.baseline_tick,
576 flags: header.flags.raw(),
577 }));
578 }
579 if header.baseline_tick != baseline.tick.raw() {
580 return Err(CodecError::BaselineTickMismatch {
581 expected: baseline.tick.raw(),
582 found: header.baseline_tick,
583 });
584 }
585
586 let expected_hash = schema_hash(schema);
587 if header.schema_hash != expected_hash {
588 return Err(CodecError::SchemaMismatch {
589 expected: expected_hash,
590 found: header.schema_hash,
591 });
592 }
593
594 let (destroys, creates, updates) = decode_delta_sections(schema, packet, limits)?;
595
596 ensure_entities_sorted(&baseline.entities)?;
597 ensure_entities_sorted(&creates)?;
598
599 let mut remaining = apply_destroys(&baseline.entities, &destroys)?;
600 remaining = apply_creates(remaining, creates)?;
601 if remaining.len() > limits.max_total_entities_after_apply {
602 return Err(CodecError::LimitsExceeded {
603 kind: LimitKind::TotalEntitiesAfterApply,
604 limit: limits.max_total_entities_after_apply,
605 actual: remaining.len(),
606 });
607 }
608 apply_updates(&mut remaining, &updates)?;
609
610 Ok(Snapshot {
611 tick: SnapshotTick::new(header.tick),
612 entities: remaining,
613 })
614}
615
616pub fn decode_delta_packet(
618 schema: &schema::Schema,
619 packet: &WirePacket<'_>,
620 limits: &CodecLimits,
621) -> CodecResult<DeltaDecoded> {
622 let header = packet.header;
623 if !header.flags.is_delta_snapshot() {
624 return Err(CodecError::Wire(wire::DecodeError::InvalidFlags {
625 flags: header.flags.raw(),
626 }));
627 }
628 if header.baseline_tick == 0 {
629 return Err(CodecError::Wire(wire::DecodeError::InvalidBaselineTick {
630 baseline_tick: header.baseline_tick,
631 flags: header.flags.raw(),
632 }));
633 }
634 let expected_hash = schema_hash(schema);
635 if header.schema_hash != expected_hash {
636 return Err(CodecError::SchemaMismatch {
637 expected: expected_hash,
638 found: header.schema_hash,
639 });
640 }
641
642 let (destroys, creates, updates) = decode_delta_sections(schema, packet, limits)?;
643
644 Ok(DeltaDecoded {
645 tick: SnapshotTick::new(header.tick),
646 baseline_tick: SnapshotTick::new(header.baseline_tick),
647 destroys,
648 creates,
649 updates,
650 })
651}
652
/// Tallies filled in by `diff_counts` when comparing a baseline snapshot with a
/// current one.
#[derive(Default)]
struct DiffCounts {
    /// Entities whose id appears only in the current snapshot.
    creates: usize,
    /// Entities present in both snapshots for which `entity_has_updates` returned true.
    updates: usize,
    /// Entities whose id appears only in the baseline snapshot.
    destroys: usize,
}
659
/// Layout used for the update section of a delta payload.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum UpdateEncoding {
    /// Written by `encode_update_body_masked` under `SectionTag::EntityUpdate`.
    Masked,
    /// Written by `encode_update_body_sparse_packed` under
    /// `SectionTag::EntityUpdateSparsePacked`.
    Sparse,
}
665
666fn diff_counts(
667 schema: &schema::Schema,
668 baseline: &Snapshot,
669 current: &Snapshot,
670 limits: &CodecLimits,
671 counts: &mut DiffCounts,
672) -> CodecResult<()> {
673 let mut i = 0usize;
674 let mut j = 0usize;
675 while i < baseline.entities.len() || j < current.entities.len() {
676 let base = baseline.entities.get(i);
677 let curr = current.entities.get(j);
678 match (base, curr) {
679 (Some(b), Some(c)) => {
680 if b.id.raw() < c.id.raw() {
681 counts.destroys += 1;
682 i += 1;
683 } else if b.id.raw() > c.id.raw() {
684 counts.creates += 1;
685 j += 1;
686 } else {
687 if entity_has_updates(schema, b, c, limits)? {
688 counts.updates += 1;
689 }
690 i += 1;
691 j += 1;
692 }
693 }
694 (Some(_), None) => {
695 counts.destroys += 1;
696 i += 1;
697 }
698 (None, Some(_)) => {
699 counts.creates += 1;
700 j += 1;
701 }
702 (None, None) => break,
703 }
704 }
705 Ok(())
706}
707
/// Chooses between the masked and sparse update layouts by comparing two bit estimates.
///
/// Entities with the same id in `baseline` and `current` are visited in lockstep. For
/// every schema component present on both sides, the changed-field mask is computed;
/// components with at least one changed field contribute to two running totals:
///
/// * `mask_bits`: one bit per field of the component.
/// * `sparse_bits`: one field index per changed field, plus the `varu32_len` sizes
///   (times 8) of the entity id, the component id and the changed-field count.
///
/// Returns `UpdateEncoding::Sparse` when `sparse_bits <= mask_bits` and something
/// changed; otherwise `UpdateEncoding::Masked`.
///
/// # Errors
///
/// * `CodecError::InvalidMask` if a component is present on only one side, or if a
///   component's field count differs from the schema definition.
/// * `CodecError::LimitsExceeded` if a component has more fields than
///   `limits.max_fields_per_component`.
/// * Any error from `compute_field_mask_into`.
fn select_update_encoding(
    schema: &schema::Schema,
    baseline: &Snapshot,
    current: &Snapshot,
    limits: &CodecLimits,
    scratch: &mut CodecScratch,
) -> CodecResult<UpdateEncoding> {
    let component_count = schema.components.len();
    let mut mask_bits = 0usize;
    let mut sparse_bits = 0usize;
    let mut baseline_iter = baseline.entities.iter();
    let mut current_iter = current.entities.iter();
    let mut baseline_next = baseline_iter.next();
    let mut current_next = current_iter.next();

    // Lockstep walk; it stops as soon as either list is exhausted because only
    // entities present in both snapshots can be updates.
    while let (Some(base), Some(curr)) = (baseline_next, current_next) {
        match base.id.cmp(&curr.id) {
            // Only in the baseline: not an update, skip it.
            Ordering::Less => {
                baseline_next = baseline_iter.next();
            }
            // Only in the current snapshot: not an update, skip it.
            Ordering::Greater => {
                current_next = current_iter.next();
            }
            Ordering::Equal => {
                for component in &schema.components {
                    let base_component = find_component(base, component.id);
                    let curr_component = find_component(curr, component.id);
                    // A component must be present on both sides or on neither.
                    if base_component.is_some() != curr_component.is_some() {
                        return Err(CodecError::InvalidMask {
                            kind: MaskKind::ComponentMask,
                            reason: MaskReason::ComponentPresenceMismatch {
                                component: component.id,
                            },
                        });
                    }
                    if let (Some(base_component), Some(curr_component)) =
                        (base_component, curr_component)
                    {
                        // Both sides must carry exactly the fields the schema defines.
                        if base_component.fields.len() != component.fields.len()
                            || curr_component.fields.len() != component.fields.len()
                        {
                            return Err(CodecError::InvalidMask {
                                kind: MaskKind::FieldMask {
                                    component: component.id,
                                },
                                reason: MaskReason::FieldCountMismatch {
                                    expected: component.fields.len(),
                                    actual: base_component
                                        .fields
                                        .len()
                                        .max(curr_component.fields.len()),
                                },
                            });
                        }
                        if component.fields.len() > limits.max_fields_per_component {
                            return Err(CodecError::LimitsExceeded {
                                kind: LimitKind::FieldsPerComponent,
                                limit: limits.max_fields_per_component,
                                actual: component.fields.len(),
                            });
                        }
                        // Only the field-mask half of the scratch pair is used here.
                        let (_, field_mask) = scratch
                            .component_and_field_masks_mut(component_count, component.fields.len());
                        let field_mask = compute_field_mask_into(
                            component,
                            base_component,
                            curr_component,
                            field_mask,
                        )?;
                        let changed = field_mask.iter().filter(|bit| **bit).count();
                        if changed > 0 {
                            let field_count = component.fields.len();
                            // Width of one field index in the sparse layout.
                            let index_bits =
                                required_bits(field_count.saturating_sub(1) as u64) as usize;
                            // Masked estimate: one bit per field of the component.
                            mask_bits += field_count;
                            // Sparse estimate: field indices plus three varints (in bits).
                            sparse_bits += index_bits * changed;
                            sparse_bits += varu32_len(curr.id.raw()) * 8;
                            sparse_bits += varu32_len(component.id.get() as u32) * 8;
                            sparse_bits += varu32_len(changed as u32) * 8;
                        }
                    }
                }

                baseline_next = baseline_iter.next();
                current_next = current_iter.next();
            }
        }
    }

    // Nothing changed in any shared component: default to the masked layout.
    if mask_bits == 0 {
        return Ok(UpdateEncoding::Masked);
    }

    // Ties go to the sparse layout.
    if sparse_bits <= mask_bits {
        Ok(UpdateEncoding::Sparse)
    } else {
        Ok(UpdateEncoding::Masked)
    }
}
810
/// Returns how many 7-bit groups are needed to represent `value` (between 1 and 5).
///
/// Callers multiply the result by 8 to estimate bits, so this is presumably the byte
/// length of `value` as written by `write_varu32` (not confirmed from this file).
fn varu32_len(value: u32) -> usize {
    match value {
        0..=0x7F => 1,
        0x80..=0x3FFF => 2,
        0x4000..=0x001F_FFFF => 3,
        0x0020_0000..=0x0FFF_FFFF => 4,
        _ => 5,
    }
}
824
825fn ensure_entity_ids_sorted(ids: &[EntityId]) -> CodecResult<()> {
826 let mut prev: Option<u32> = None;
827 for id in ids {
828 let raw = id.raw();
829 if let Some(prev_id) = prev {
830 if raw <= prev_id {
831 return Err(CodecError::InvalidEntityOrder {
832 previous: prev_id,
833 current: raw,
834 });
835 }
836 }
837 prev = Some(raw);
838 }
839 Ok(())
840}
841
842fn encode_destroy_body_from_list(
843 destroys: &[EntityId],
844 limits: &CodecLimits,
845 writer: &mut BitWriter<'_>,
846) -> CodecResult<()> {
847 if destroys.len() > limits.max_entities_destroy {
848 return Err(CodecError::LimitsExceeded {
849 kind: LimitKind::EntitiesDestroy,
850 limit: limits.max_entities_destroy,
851 actual: destroys.len(),
852 });
853 }
854 writer.align_to_byte()?;
855 writer.write_varu32(destroys.len() as u32)?;
856 for id in destroys {
857 writer.align_to_byte()?;
858 writer.write_u32_aligned(id.raw())?;
859 }
860 writer.align_to_byte()?;
861 Ok(())
862}
863
864fn encode_create_body_from_list(
865 schema: &schema::Schema,
866 creates: &[EntitySnapshot],
867 limits: &CodecLimits,
868 writer: &mut BitWriter<'_>,
869) -> CodecResult<()> {
870 if creates.len() > limits.max_entities_create {
871 return Err(CodecError::LimitsExceeded {
872 kind: LimitKind::EntitiesCreate,
873 limit: limits.max_entities_create,
874 actual: creates.len(),
875 });
876 }
877 writer.align_to_byte()?;
878 writer.write_varu32(creates.len() as u32)?;
879 for entity in creates {
880 write_create_entity(schema, entity, limits, writer)?;
881 }
882 writer.align_to_byte()?;
883 Ok(())
884}
885
886fn validate_updates_for_encoding(
887 schema: &schema::Schema,
888 updates: &[DeltaUpdateEntity],
889 limits: &CodecLimits,
890) -> CodecResult<()> {
891 let mut prev: Option<u32> = None;
892 for entity_update in updates {
893 let id = entity_update.id.raw();
894 if let Some(prev_id) = prev {
895 if id <= prev_id {
896 return Err(CodecError::InvalidEntityOrder {
897 previous: prev_id,
898 current: id,
899 });
900 }
901 }
902 prev = Some(id);
903 if entity_update.components.len() > limits.max_components_per_entity {
904 return Err(CodecError::LimitsExceeded {
905 kind: LimitKind::ComponentsPerEntity,
906 limit: limits.max_components_per_entity,
907 actual: entity_update.components.len(),
908 });
909 }
910 for component_update in &entity_update.components {
911 let component = schema
912 .components
913 .iter()
914 .find(|component| component.id == component_update.id)
915 .ok_or(CodecError::InvalidMask {
916 kind: MaskKind::ComponentMask,
917 reason: MaskReason::UnknownComponent {
918 component: component_update.id,
919 },
920 })?;
921 if component_update.fields.is_empty() {
922 return Err(CodecError::InvalidMask {
923 kind: MaskKind::FieldMask {
924 component: component_update.id,
925 },
926 reason: MaskReason::EmptyFieldMask {
927 component: component_update.id,
928 },
929 });
930 }
931 if component_update.fields.len() > limits.max_fields_per_component {
932 return Err(CodecError::LimitsExceeded {
933 kind: LimitKind::FieldsPerComponent,
934 limit: limits.max_fields_per_component,
935 actual: component_update.fields.len(),
936 });
937 }
938 let max_index = component.fields.len().saturating_sub(1);
939 for (field_idx, _value) in &component_update.fields {
940 if *field_idx >= component.fields.len() {
941 return Err(CodecError::InvalidMask {
942 kind: MaskKind::FieldMask {
943 component: component_update.id,
944 },
945 reason: MaskReason::InvalidFieldIndex {
946 field_index: *field_idx,
947 max: max_index,
948 },
949 });
950 }
951 }
952 }
953 }
954 Ok(())
955}
956
957fn encode_update_body_sparse_packed_from_updates(
958 schema: &schema::Schema,
959 updates: &[DeltaUpdateEntity],
960 limits: &CodecLimits,
961 writer: &mut BitWriter<'_>,
962) -> CodecResult<()> {
963 if updates.len() > limits.max_entities_update {
964 return Err(CodecError::LimitsExceeded {
965 kind: LimitKind::EntitiesUpdate,
966 limit: limits.max_entities_update,
967 actual: updates.len(),
968 });
969 }
970 let entry_count: usize = updates.iter().map(|entity| entity.components.len()).sum();
971 let entry_limit = limits
972 .max_entities_update
973 .saturating_mul(limits.max_components_per_entity);
974 if entry_count > entry_limit {
975 return Err(CodecError::LimitsExceeded {
976 kind: LimitKind::EntitiesUpdate,
977 limit: entry_limit,
978 actual: entry_count,
979 });
980 }
981 writer.align_to_byte()?;
982 writer.write_varu32(entry_count as u32)?;
983 for entity_update in updates {
984 let entity_id = entity_update.id.raw();
985 for component_update in &entity_update.components {
986 let component = schema
987 .components
988 .iter()
989 .find(|component| component.id == component_update.id)
990 .ok_or(CodecError::InvalidMask {
991 kind: MaskKind::ComponentMask,
992 reason: MaskReason::UnknownComponent {
993 component: component_update.id,
994 },
995 })?;
996 writer.align_to_byte()?;
997 writer.write_varu32(entity_id)?;
998 writer.write_varu32(component.id.get() as u32)?;
999 writer.write_varu32(component_update.fields.len() as u32)?;
1000 let index_bits = required_bits(component.fields.len().saturating_sub(1) as u64);
1001 for (field_idx, value) in &component_update.fields {
1002 if index_bits > 0 {
1003 writer.write_bits(*field_idx as u64, index_bits)?;
1004 }
1005 write_field_value_sparse(
1006 component.id,
1007 component.fields[*field_idx],
1008 *value,
1009 writer,
1010 )?;
1011 }
1012 }
1013 }
1014 writer.align_to_byte()?;
1015 Ok(())
1016}
1017
1018fn encode_destroy_body(
1019 baseline: &Snapshot,
1020 current: &Snapshot,
1021 destroy_count: usize,
1022 limits: &CodecLimits,
1023 writer: &mut BitWriter<'_>,
1024) -> CodecResult<()> {
1025 if destroy_count > limits.max_entities_destroy {
1026 return Err(CodecError::LimitsExceeded {
1027 kind: LimitKind::EntitiesDestroy,
1028 limit: limits.max_entities_destroy,
1029 actual: destroy_count,
1030 });
1031 }
1032
1033 writer.align_to_byte()?;
1034 writer.write_varu32(destroy_count as u32)?;
1035
1036 let mut i = 0usize;
1037 let mut j = 0usize;
1038 while i < baseline.entities.len() || j < current.entities.len() {
1039 let base = baseline.entities.get(i);
1040 let curr = current.entities.get(j);
1041 match (base, curr) {
1042 (Some(b), Some(c)) => {
1043 if b.id.raw() < c.id.raw() {
1044 writer.align_to_byte()?;
1045 writer.write_u32_aligned(b.id.raw())?;
1046 i += 1;
1047 } else if b.id.raw() > c.id.raw() {
1048 j += 1;
1049 } else {
1050 i += 1;
1051 j += 1;
1052 }
1053 }
1054 (Some(b), None) => {
1055 writer.align_to_byte()?;
1056 writer.write_u32_aligned(b.id.raw())?;
1057 i += 1;
1058 }
1059 (None, Some(_)) => {
1060 j += 1;
1061 }
1062 (None, None) => break,
1063 }
1064 }
1065
1066 writer.align_to_byte()?;
1067 Ok(())
1068}
1069
1070fn encode_create_body(
1071 schema: &schema::Schema,
1072 baseline: &Snapshot,
1073 current: &Snapshot,
1074 create_count: usize,
1075 limits: &CodecLimits,
1076 writer: &mut BitWriter<'_>,
1077) -> CodecResult<()> {
1078 if create_count > limits.max_entities_create {
1079 return Err(CodecError::LimitsExceeded {
1080 kind: LimitKind::EntitiesCreate,
1081 limit: limits.max_entities_create,
1082 actual: create_count,
1083 });
1084 }
1085
1086 writer.align_to_byte()?;
1087 writer.write_varu32(create_count as u32)?;
1088
1089 let mut i = 0usize;
1090 let mut j = 0usize;
1091 while i < baseline.entities.len() || j < current.entities.len() {
1092 let base = baseline.entities.get(i);
1093 let curr = current.entities.get(j);
1094 match (base, curr) {
1095 (Some(b), Some(c)) => {
1096 if b.id.raw() < c.id.raw() {
1097 i += 1;
1098 } else if b.id.raw() > c.id.raw() {
1099 write_create_entity(schema, c, limits, writer)?;
1100 j += 1;
1101 } else {
1102 i += 1;
1103 j += 1;
1104 }
1105 }
1106 (Some(_), None) => {
1107 i += 1;
1108 }
1109 (None, Some(c)) => {
1110 write_create_entity(schema, c, limits, writer)?;
1111 j += 1;
1112 }
1113 (None, None) => break,
1114 }
1115 }
1116
1117 writer.align_to_byte()?;
1118 Ok(())
1119}
1120
1121fn encode_update_body_masked(
1122 schema: &schema::Schema,
1123 baseline: &Snapshot,
1124 current: &Snapshot,
1125 update_count: usize,
1126 limits: &CodecLimits,
1127 scratch: &mut CodecScratch,
1128 writer: &mut BitWriter<'_>,
1129) -> CodecResult<()> {
1130 if update_count > limits.max_entities_update {
1131 return Err(CodecError::LimitsExceeded {
1132 kind: LimitKind::EntitiesUpdate,
1133 limit: limits.max_entities_update,
1134 actual: update_count,
1135 });
1136 }
1137
1138 writer.align_to_byte()?;
1139 writer.write_varu32(update_count as u32)?;
1140
1141 let mut i = 0usize;
1142 let mut j = 0usize;
1143 while i < baseline.entities.len() || j < current.entities.len() {
1144 let base = baseline.entities.get(i);
1145 let curr = current.entities.get(j);
1146 match (base, curr) {
1147 (Some(b), Some(c)) => {
1148 if b.id.raw() < c.id.raw() {
1149 i += 1;
1150 } else if b.id.raw() > c.id.raw() {
1151 j += 1;
1152 } else {
1153 if entity_has_updates(schema, b, c, limits)? {
1154 writer.align_to_byte()?;
1155 writer.write_u32_aligned(c.id.raw())?;
1156 ensure_component_presence_matches(schema, b, c)?;
1157 write_update_components(schema, b, c, limits, scratch, writer)?;
1158 }
1159 i += 1;
1160 j += 1;
1161 }
1162 }
1163 (Some(_), None) => i += 1,
1164 (None, Some(_)) => j += 1,
1165 (None, None) => break,
1166 }
1167 }
1168
1169 writer.align_to_byte()?;
1170 Ok(())
1171}
1172
/// Writes a sparse update body in which field indices are varints.
///
/// Marked `dead_code`: nothing in the visible part of this file calls it.
///
/// Layout: a byte-aligned varint entry count (from `count_sparse_update_entries`),
/// then one entry per component with at least one changed field: byte-aligned `u32`
/// entity id, `u16` component id, varint changed-field count, and for each changed
/// field a byte-aligned varint field index followed by the value written with
/// `write_field_value_sparse`. A final byte alignment closes the body.
///
/// # Errors
///
/// * `CodecError::LimitsExceeded` if `update_count`, the entry count, or a
///   component's field count exceeds its limit.
/// * `CodecError::InvalidMask` if a component is present on only one side, or if a
///   component's field count differs from the schema definition.
/// * Any error from the helpers or the writer.
#[allow(dead_code)]
fn encode_update_body_sparse_varint(
    schema: &schema::Schema,
    baseline: &Snapshot,
    current: &Snapshot,
    update_count: usize,
    limits: &CodecLimits,
    scratch: &mut CodecScratch,
    writer: &mut BitWriter<'_>,
) -> CodecResult<()> {
    if update_count > limits.max_entities_update {
        return Err(CodecError::LimitsExceeded {
            kind: LimitKind::EntitiesUpdate,
            limit: limits.max_entities_update,
            actual: update_count,
        });
    }

    let entry_count = count_sparse_update_entries(schema, baseline, current, limits, scratch)?;
    // Upper bound on entries: every updated entity touching every allowed component.
    let entry_limit = limits
        .max_entities_update
        .saturating_mul(limits.max_components_per_entity);
    if entry_count > entry_limit {
        return Err(CodecError::LimitsExceeded {
            kind: LimitKind::EntitiesUpdate,
            limit: entry_limit,
            actual: entry_count,
        });
    }

    writer.align_to_byte()?;
    writer.write_varu32(entry_count as u32)?;

    let mut baseline_iter = baseline.entities.iter();
    let mut current_iter = current.entities.iter();
    let mut baseline_next = baseline_iter.next();
    let mut current_next = current_iter.next();
    let component_count = schema.components.len();

    // Lockstep walk; it stops as soon as either list is exhausted because only
    // entities present in both snapshots can be updates.
    while let (Some(base), Some(curr)) = (baseline_next, current_next) {
        match base.id.cmp(&curr.id) {
            Ordering::Less => {
                baseline_next = baseline_iter.next();
            }
            Ordering::Greater => {
                current_next = current_iter.next();
            }
            Ordering::Equal => {
                if entity_has_updates(schema, base, curr, limits)? {
                    for component in &schema.components {
                        let base_component = find_component(base, component.id);
                        let curr_component = find_component(curr, component.id);
                        // A component must be present on both sides or on neither.
                        if base_component.is_some() != curr_component.is_some() {
                            return Err(CodecError::InvalidMask {
                                kind: MaskKind::ComponentMask,
                                reason: MaskReason::ComponentPresenceMismatch {
                                    component: component.id,
                                },
                            });
                        }
                        if let (Some(base_component), Some(curr_component)) =
                            (base_component, curr_component)
                        {
                            // Both sides must carry exactly the fields the schema defines.
                            if base_component.fields.len() != component.fields.len()
                                || curr_component.fields.len() != component.fields.len()
                            {
                                return Err(CodecError::InvalidMask {
                                    kind: MaskKind::FieldMask {
                                        component: component.id,
                                    },
                                    reason: MaskReason::FieldCountMismatch {
                                        expected: component.fields.len(),
                                        actual: base_component
                                            .fields
                                            .len()
                                            .max(curr_component.fields.len()),
                                    },
                                });
                            }
                            if component.fields.len() > limits.max_fields_per_component {
                                return Err(CodecError::LimitsExceeded {
                                    kind: LimitKind::FieldsPerComponent,
                                    limit: limits.max_fields_per_component,
                                    actual: component.fields.len(),
                                });
                            }
                            // Only the field-mask half of the scratch pair is used here.
                            let (_, field_mask) = scratch.component_and_field_masks_mut(
                                component_count,
                                component.fields.len(),
                            );
                            let field_mask = compute_field_mask_into(
                                component,
                                base_component,
                                curr_component,
                                field_mask,
                            )?;
                            let changed_fields = field_mask.iter().filter(|bit| **bit).count();
                            // Components without changed fields produce no entry.
                            if changed_fields > 0 {
                                writer.align_to_byte()?;
                                writer.write_u32_aligned(curr.id.raw())?;
                                writer.write_u16_aligned(component.id.get())?;
                                writer.write_varu32(changed_fields as u32)?;
                                for (idx, field) in component.fields.iter().enumerate() {
                                    if field_mask[idx] {
                                        writer.align_to_byte()?;
                                        writer.write_varu32(idx as u32)?;
                                        write_field_value_sparse(
                                            component.id,
                                            *field,
                                            curr_component.fields[idx],
                                            writer,
                                        )?;
                                    }
                                }
                            }
                        }
                    }
                }
                baseline_next = baseline_iter.next();
                current_next = current_iter.next();
            }
        }
    }

    writer.align_to_byte()?;
    Ok(())
}
1300
1301fn encode_update_body_sparse_packed(
1302 schema: &schema::Schema,
1303 baseline: &Snapshot,
1304 current: &Snapshot,
1305 update_count: usize,
1306 limits: &CodecLimits,
1307 scratch: &mut CodecScratch,
1308 writer: &mut BitWriter<'_>,
1309) -> CodecResult<()> {
1310 if update_count > limits.max_entities_update {
1311 return Err(CodecError::LimitsExceeded {
1312 kind: LimitKind::EntitiesUpdate,
1313 limit: limits.max_entities_update,
1314 actual: update_count,
1315 });
1316 }
1317
1318 let entry_count = count_sparse_update_entries(schema, baseline, current, limits, scratch)?;
1319 let entry_limit = limits
1320 .max_entities_update
1321 .saturating_mul(limits.max_components_per_entity);
1322 if entry_count > entry_limit {
1323 return Err(CodecError::LimitsExceeded {
1324 kind: LimitKind::EntitiesUpdate,
1325 limit: entry_limit,
1326 actual: entry_count,
1327 });
1328 }
1329
1330 writer.align_to_byte()?;
1331 writer.write_varu32(entry_count as u32)?;
1332
1333 let mut baseline_iter = baseline.entities.iter();
1334 let mut current_iter = current.entities.iter();
1335 let mut baseline_next = baseline_iter.next();
1336 let mut current_next = current_iter.next();
1337 let component_count = schema.components.len();
1338
1339 while let (Some(base), Some(curr)) = (baseline_next, current_next) {
1340 match base.id.cmp(&curr.id) {
1341 Ordering::Less => {
1342 baseline_next = baseline_iter.next();
1343 }
1344 Ordering::Greater => {
1345 current_next = current_iter.next();
1346 }
1347 Ordering::Equal => {
1348 if entity_has_updates(schema, base, curr, limits)? {
1349 for component in &schema.components {
1350 let base_component = find_component(base, component.id);
1351 let curr_component = find_component(curr, component.id);
1352 if base_component.is_some() != curr_component.is_some() {
1353 return Err(CodecError::InvalidMask {
1354 kind: MaskKind::ComponentMask,
1355 reason: MaskReason::ComponentPresenceMismatch {
1356 component: component.id,
1357 },
1358 });
1359 }
1360 if let (Some(base_component), Some(curr_component)) =
1361 (base_component, curr_component)
1362 {
1363 if base_component.fields.len() != component.fields.len()
1364 || curr_component.fields.len() != component.fields.len()
1365 {
1366 return Err(CodecError::InvalidMask {
1367 kind: MaskKind::FieldMask {
1368 component: component.id,
1369 },
1370 reason: MaskReason::FieldCountMismatch {
1371 expected: component.fields.len(),
1372 actual: base_component
1373 .fields
1374 .len()
1375 .max(curr_component.fields.len()),
1376 },
1377 });
1378 }
1379 if component.fields.len() > limits.max_fields_per_component {
1380 return Err(CodecError::LimitsExceeded {
1381 kind: LimitKind::FieldsPerComponent,
1382 limit: limits.max_fields_per_component,
1383 actual: component.fields.len(),
1384 });
1385 }
1386 let (_, field_mask) = scratch.component_and_field_masks_mut(
1387 component_count,
1388 component.fields.len(),
1389 );
1390 let field_mask = compute_field_mask_into(
1391 component,
1392 base_component,
1393 curr_component,
1394 field_mask,
1395 )?;
1396 let changed_fields = field_mask.iter().filter(|bit| **bit).count();
1397 if changed_fields > 0 {
1398 writer.align_to_byte()?;
1399 writer.write_varu32(curr.id.raw())?;
1400 writer.write_varu32(component.id.get() as u32)?;
1401 writer.write_varu32(changed_fields as u32)?;
1402 let index_bits =
1403 required_bits(component.fields.len().saturating_sub(1) as u64);
1404 for (idx, field) in component.fields.iter().enumerate() {
1405 if field_mask[idx] {
1406 if index_bits > 0 {
1407 writer.write_bits(idx as u64, index_bits)?;
1408 }
1409 write_field_value_sparse(
1410 component.id,
1411 *field,
1412 curr_component.fields[idx],
1413 writer,
1414 )?;
1415 }
1416 }
1417 }
1418 }
1419 }
1420 }
1421 baseline_next = baseline_iter.next();
1422 current_next = current_iter.next();
1423 }
1424 }
1425 }
1426
1427 writer.align_to_byte()?;
1428 Ok(())
1429}
1430
1431fn count_sparse_update_entries(
1432 schema: &schema::Schema,
1433 baseline: &Snapshot,
1434 current: &Snapshot,
1435 limits: &CodecLimits,
1436 scratch: &mut CodecScratch,
1437) -> CodecResult<usize> {
1438 let component_count = schema.components.len();
1439 let mut count = 0usize;
1440 let mut baseline_iter = baseline.entities.iter();
1441 let mut current_iter = current.entities.iter();
1442 let mut baseline_next = baseline_iter.next();
1443 let mut current_next = current_iter.next();
1444
1445 while let (Some(base), Some(curr)) = (baseline_next, current_next) {
1446 match base.id.cmp(&curr.id) {
1447 Ordering::Less => baseline_next = baseline_iter.next(),
1448 Ordering::Greater => current_next = current_iter.next(),
1449 Ordering::Equal => {
1450 if entity_has_updates(schema, base, curr, limits)? {
1451 for component in &schema.components {
1452 let base_component = find_component(base, component.id);
1453 let curr_component = find_component(curr, component.id);
1454 if base_component.is_some() != curr_component.is_some() {
1455 return Err(CodecError::InvalidMask {
1456 kind: MaskKind::ComponentMask,
1457 reason: MaskReason::ComponentPresenceMismatch {
1458 component: component.id,
1459 },
1460 });
1461 }
1462 if let (Some(base_component), Some(curr_component)) =
1463 (base_component, curr_component)
1464 {
1465 if base_component.fields.len() != component.fields.len()
1466 || curr_component.fields.len() != component.fields.len()
1467 {
1468 return Err(CodecError::InvalidMask {
1469 kind: MaskKind::FieldMask {
1470 component: component.id,
1471 },
1472 reason: MaskReason::FieldCountMismatch {
1473 expected: component.fields.len(),
1474 actual: base_component
1475 .fields
1476 .len()
1477 .max(curr_component.fields.len()),
1478 },
1479 });
1480 }
1481 if component.fields.len() > limits.max_fields_per_component {
1482 return Err(CodecError::LimitsExceeded {
1483 kind: LimitKind::FieldsPerComponent,
1484 limit: limits.max_fields_per_component,
1485 actual: component.fields.len(),
1486 });
1487 }
1488 let (_, field_mask) = scratch.component_and_field_masks_mut(
1489 component_count,
1490 component.fields.len(),
1491 );
1492 let field_mask = compute_field_mask_into(
1493 component,
1494 base_component,
1495 curr_component,
1496 field_mask,
1497 )?;
1498 if field_mask.iter().any(|bit| *bit) {
1499 count += 1;
1500 }
1501 }
1502 }
1503 }
1504 baseline_next = baseline_iter.next();
1505 current_next = current_iter.next();
1506 }
1507 }
1508 }
1509
1510 Ok(count)
1511}
1512
1513fn write_create_entity(
1514 schema: &schema::Schema,
1515 entity: &EntitySnapshot,
1516 limits: &CodecLimits,
1517 writer: &mut BitWriter<'_>,
1518) -> CodecResult<()> {
1519 writer.align_to_byte()?;
1520 writer.write_u32_aligned(entity.id.raw())?;
1521 ensure_known_components(schema, entity)?;
1522 write_component_mask(schema, entity, writer)?;
1523 for component in schema.components.iter() {
1524 if let Some(snapshot) = find_component(entity, component.id) {
1525 write_full_component(component, snapshot, limits, writer)?;
1526 }
1527 }
1528 Ok(())
1529}
1530
1531fn decode_destroy_section(body: &[u8], limits: &CodecLimits) -> CodecResult<Vec<EntityId>> {
1532 if body.len() > limits.max_section_bytes {
1533 return Err(CodecError::LimitsExceeded {
1534 kind: LimitKind::SectionBytes,
1535 limit: limits.max_section_bytes,
1536 actual: body.len(),
1537 });
1538 }
1539
1540 let mut reader = BitReader::new(body);
1541 reader.align_to_byte()?;
1542 let count = reader.read_varu32()? as usize;
1543 if count > limits.max_entities_destroy {
1544 return Err(CodecError::LimitsExceeded {
1545 kind: LimitKind::EntitiesDestroy,
1546 limit: limits.max_entities_destroy,
1547 actual: count,
1548 });
1549 }
1550
1551 let mut ids = Vec::with_capacity(count);
1552 let mut prev: Option<u32> = None;
1553 for _ in 0..count {
1554 reader.align_to_byte()?;
1555 let id = reader.read_u32_aligned()?;
1556 if let Some(prev_id) = prev {
1557 if id <= prev_id {
1558 return Err(CodecError::InvalidEntityOrder {
1559 previous: prev_id,
1560 current: id,
1561 });
1562 }
1563 }
1564 prev = Some(id);
1565 ids.push(EntityId::new(id));
1566 }
1567 reader.align_to_byte()?;
1568 if reader.bits_remaining() != 0 {
1569 return Err(CodecError::TrailingSectionData {
1570 section: SectionTag::EntityDestroy,
1571 remaining_bits: reader.bits_remaining(),
1572 });
1573 }
1574 Ok(ids)
1575}
1576
1577fn decode_create_section(
1578 schema: &schema::Schema,
1579 body: &[u8],
1580 limits: &CodecLimits,
1581) -> CodecResult<Vec<EntitySnapshot>> {
1582 if body.len() > limits.max_section_bytes {
1583 return Err(CodecError::LimitsExceeded {
1584 kind: LimitKind::SectionBytes,
1585 limit: limits.max_section_bytes,
1586 actual: body.len(),
1587 });
1588 }
1589
1590 let mut reader = BitReader::new(body);
1591 reader.align_to_byte()?;
1592 let count = reader.read_varu32()? as usize;
1593 if count > limits.max_entities_create {
1594 return Err(CodecError::LimitsExceeded {
1595 kind: LimitKind::EntitiesCreate,
1596 limit: limits.max_entities_create,
1597 actual: count,
1598 });
1599 }
1600
1601 let mut entities = Vec::with_capacity(count);
1602 let mut prev: Option<u32> = None;
1603 for _ in 0..count {
1604 reader.align_to_byte()?;
1605 let id = reader.read_u32_aligned()?;
1606 if let Some(prev_id) = prev {
1607 if id <= prev_id {
1608 return Err(CodecError::InvalidEntityOrder {
1609 previous: prev_id,
1610 current: id,
1611 });
1612 }
1613 }
1614 prev = Some(id);
1615
1616 let component_mask = read_mask(
1617 &mut reader,
1618 schema.components.len(),
1619 MaskKind::ComponentMask,
1620 )?;
1621
1622 let mut components = Vec::new();
1623 for (idx, component) in schema.components.iter().enumerate() {
1624 if component_mask[idx] {
1625 let fields = decode_full_component(component, &mut reader, limits)?;
1626 components.push(ComponentSnapshot {
1627 id: component.id,
1628 fields,
1629 });
1630 }
1631 }
1632
1633 let entity = EntitySnapshot {
1634 id: EntityId::new(id),
1635 components,
1636 };
1637 ensure_known_components(schema, &entity)?;
1638 entities.push(entity);
1639 }
1640
1641 reader.align_to_byte()?;
1642 if reader.bits_remaining() != 0 {
1643 return Err(CodecError::TrailingSectionData {
1644 section: SectionTag::EntityCreate,
1645 remaining_bits: reader.bits_remaining(),
1646 });
1647 }
1648 Ok(entities)
1649}
1650
1651fn decode_update_section_masked(
1652 schema: &schema::Schema,
1653 body: &[u8],
1654 limits: &CodecLimits,
1655) -> CodecResult<Vec<DeltaUpdateEntity>> {
1656 if body.len() > limits.max_section_bytes {
1657 return Err(CodecError::LimitsExceeded {
1658 kind: LimitKind::SectionBytes,
1659 limit: limits.max_section_bytes,
1660 actual: body.len(),
1661 });
1662 }
1663
1664 let mut reader = BitReader::new(body);
1665 reader.align_to_byte()?;
1666 let count = reader.read_varu32()? as usize;
1667 if count > limits.max_entities_update {
1668 return Err(CodecError::LimitsExceeded {
1669 kind: LimitKind::EntitiesUpdate,
1670 limit: limits.max_entities_update,
1671 actual: count,
1672 });
1673 }
1674
1675 let mut updates = Vec::with_capacity(count);
1676 let mut prev: Option<u32> = None;
1677 for _ in 0..count {
1678 reader.align_to_byte()?;
1679 let id = reader.read_u32_aligned()?;
1680 if let Some(prev_id) = prev {
1681 if id <= prev_id {
1682 return Err(CodecError::InvalidEntityOrder {
1683 previous: prev_id,
1684 current: id,
1685 });
1686 }
1687 }
1688 prev = Some(id);
1689
1690 let component_mask = read_mask(
1691 &mut reader,
1692 schema.components.len(),
1693 MaskKind::ComponentMask,
1694 )?;
1695 let mut components = Vec::new();
1696 for (idx, component) in schema.components.iter().enumerate() {
1697 if component_mask[idx] {
1698 let fields = decode_update_component(component, &mut reader, limits)?;
1699 components.push(DeltaUpdateComponent {
1700 id: component.id,
1701 fields,
1702 });
1703 }
1704 }
1705
1706 updates.push(DeltaUpdateEntity {
1707 id: EntityId::new(id),
1708 components,
1709 });
1710 }
1711
1712 reader.align_to_byte()?;
1713 if reader.bits_remaining() != 0 {
1714 return Err(CodecError::TrailingSectionData {
1715 section: SectionTag::EntityUpdate,
1716 remaining_bits: reader.bits_remaining(),
1717 });
1718 }
1719 Ok(updates)
1720}
1721
1722fn decode_update_section_sparse_varint(
1723 schema: &schema::Schema,
1724 body: &[u8],
1725 limits: &CodecLimits,
1726) -> CodecResult<Vec<DeltaUpdateEntity>> {
1727 if body.len() > limits.max_section_bytes {
1728 return Err(CodecError::LimitsExceeded {
1729 kind: LimitKind::SectionBytes,
1730 limit: limits.max_section_bytes,
1731 actual: body.len(),
1732 });
1733 }
1734
1735 let mut reader = BitReader::new(body);
1736 reader.align_to_byte()?;
1737 let entry_count = reader.read_varu32()? as usize;
1738 let entry_limit = limits
1739 .max_entities_update
1740 .saturating_mul(limits.max_components_per_entity);
1741 if entry_count > entry_limit {
1742 return Err(CodecError::LimitsExceeded {
1743 kind: LimitKind::EntitiesUpdate,
1744 limit: entry_limit,
1745 actual: entry_count,
1746 });
1747 }
1748
1749 let mut updates: Vec<DeltaUpdateEntity> = Vec::new();
1750 let mut prev_entity: Option<u32> = None;
1751 let mut prev_component: Option<u16> = None;
1752 for _ in 0..entry_count {
1753 reader.align_to_byte()?;
1754 let entity_id = reader.read_u32_aligned()?;
1755 let component_raw = reader.read_u16_aligned()?;
1756 let component_id = ComponentId::new(component_raw).ok_or(CodecError::InvalidMask {
1757 kind: MaskKind::ComponentMask,
1758 reason: MaskReason::InvalidComponentId { raw: component_raw },
1759 })?;
1760 if let Some(prev) = prev_entity {
1761 if entity_id < prev {
1762 return Err(CodecError::InvalidEntityOrder {
1763 previous: prev,
1764 current: entity_id,
1765 });
1766 }
1767 if entity_id == prev {
1768 if let Some(prev_component) = prev_component {
1769 if component_raw <= prev_component {
1770 return Err(CodecError::InvalidEntityOrder {
1771 previous: prev,
1772 current: entity_id,
1773 });
1774 }
1775 }
1776 }
1777 }
1778 prev_entity = Some(entity_id);
1779 prev_component = Some(component_raw);
1780
1781 let component = schema
1782 .components
1783 .iter()
1784 .find(|component| component.id == component_id)
1785 .ok_or(CodecError::InvalidMask {
1786 kind: MaskKind::ComponentMask,
1787 reason: MaskReason::UnknownComponent {
1788 component: component_id,
1789 },
1790 })?;
1791
1792 let field_count = reader.read_varu32()? as usize;
1793 if field_count == 0 {
1794 return Err(CodecError::InvalidMask {
1795 kind: MaskKind::FieldMask {
1796 component: component.id,
1797 },
1798 reason: MaskReason::EmptyFieldMask {
1799 component: component.id,
1800 },
1801 });
1802 }
1803 if field_count > limits.max_fields_per_component {
1804 return Err(CodecError::LimitsExceeded {
1805 kind: LimitKind::FieldsPerComponent,
1806 limit: limits.max_fields_per_component,
1807 actual: field_count,
1808 });
1809 }
1810 if field_count > component.fields.len() {
1811 return Err(CodecError::InvalidMask {
1812 kind: MaskKind::FieldMask {
1813 component: component.id,
1814 },
1815 reason: MaskReason::FieldCountMismatch {
1816 expected: component.fields.len(),
1817 actual: field_count,
1818 },
1819 });
1820 }
1821
1822 let mut fields = Vec::with_capacity(field_count);
1823 let mut prev_index: Option<usize> = None;
1824 for _ in 0..field_count {
1825 reader.align_to_byte()?;
1826 let field_index = reader.read_varu32()? as usize;
1827 if field_index >= component.fields.len() {
1828 return Err(CodecError::InvalidMask {
1829 kind: MaskKind::FieldMask {
1830 component: component.id,
1831 },
1832 reason: MaskReason::InvalidFieldIndex {
1833 field_index,
1834 max: component.fields.len(),
1835 },
1836 });
1837 }
1838 if let Some(prev_index) = prev_index {
1839 if field_index <= prev_index {
1840 return Err(CodecError::InvalidMask {
1841 kind: MaskKind::FieldMask {
1842 component: component.id,
1843 },
1844 reason: MaskReason::InvalidFieldIndex {
1845 field_index,
1846 max: component.fields.len(),
1847 },
1848 });
1849 }
1850 }
1851 prev_index = Some(field_index);
1852 let field = component.fields[field_index];
1853 let value = read_field_value_sparse(component.id, field, &mut reader)?;
1854 fields.push((field_index, value));
1855 }
1856
1857 if let Some(last) = updates.last_mut() {
1858 if last.id.raw() == entity_id {
1859 last.components.push(DeltaUpdateComponent {
1860 id: component.id,
1861 fields,
1862 });
1863 continue;
1864 }
1865 }
1866
1867 updates.push(DeltaUpdateEntity {
1868 id: EntityId::new(entity_id),
1869 components: vec![DeltaUpdateComponent {
1870 id: component.id,
1871 fields,
1872 }],
1873 });
1874 }
1875
1876 reader.align_to_byte()?;
1877 if reader.bits_remaining() != 0 {
1878 return Err(CodecError::TrailingSectionData {
1879 section: SectionTag::EntityUpdateSparse,
1880 remaining_bits: reader.bits_remaining(),
1881 });
1882 }
1883
1884 let unique_entities = updates.len();
1885 if unique_entities > limits.max_entities_update {
1886 return Err(CodecError::LimitsExceeded {
1887 kind: LimitKind::EntitiesUpdate,
1888 limit: limits.max_entities_update,
1889 actual: unique_entities,
1890 });
1891 }
1892
1893 Ok(updates)
1894}
1895
1896fn decode_update_section_sparse_packed(
1897 schema: &schema::Schema,
1898 body: &[u8],
1899 limits: &CodecLimits,
1900) -> CodecResult<Vec<DeltaUpdateEntity>> {
1901 if body.len() > limits.max_section_bytes {
1902 return Err(CodecError::LimitsExceeded {
1903 kind: LimitKind::SectionBytes,
1904 limit: limits.max_section_bytes,
1905 actual: body.len(),
1906 });
1907 }
1908
1909 let mut reader = BitReader::new(body);
1910 reader.align_to_byte()?;
1911 let entry_count = reader.read_varu32()? as usize;
1912 let entry_limit = limits
1913 .max_entities_update
1914 .saturating_mul(limits.max_components_per_entity);
1915 if entry_count > entry_limit {
1916 return Err(CodecError::LimitsExceeded {
1917 kind: LimitKind::EntitiesUpdate,
1918 limit: entry_limit,
1919 actual: entry_count,
1920 });
1921 }
1922
1923 let mut updates: Vec<DeltaUpdateEntity> = Vec::new();
1924 let mut prev_entity: Option<u32> = None;
1925 let mut prev_component: Option<u16> = None;
1926 for _ in 0..entry_count {
1927 reader.align_to_byte()?;
1928 let entity_id = reader.read_varu32()?;
1929 let component_raw = reader.read_varu32()?;
1930 if component_raw > u16::MAX as u32 {
1931 return Err(CodecError::InvalidMask {
1932 kind: MaskKind::ComponentMask,
1933 reason: MaskReason::InvalidComponentId { raw: u16::MAX },
1934 });
1935 }
1936 let component_raw = component_raw as u16;
1937 let component_id = ComponentId::new(component_raw).ok_or(CodecError::InvalidMask {
1938 kind: MaskKind::ComponentMask,
1939 reason: MaskReason::InvalidComponentId { raw: component_raw },
1940 })?;
1941 if let Some(prev) = prev_entity {
1942 if entity_id < prev {
1943 return Err(CodecError::InvalidEntityOrder {
1944 previous: prev,
1945 current: entity_id,
1946 });
1947 }
1948 if entity_id == prev {
1949 if let Some(prev_component) = prev_component {
1950 if component_raw <= prev_component {
1951 return Err(CodecError::InvalidEntityOrder {
1952 previous: prev,
1953 current: entity_id,
1954 });
1955 }
1956 }
1957 }
1958 }
1959 prev_entity = Some(entity_id);
1960 prev_component = Some(component_raw);
1961
1962 let component = schema
1963 .components
1964 .iter()
1965 .find(|component| component.id == component_id)
1966 .ok_or(CodecError::InvalidMask {
1967 kind: MaskKind::ComponentMask,
1968 reason: MaskReason::UnknownComponent {
1969 component: component_id,
1970 },
1971 })?;
1972
1973 let field_count = reader.read_varu32()? as usize;
1974 if field_count == 0 {
1975 return Err(CodecError::InvalidMask {
1976 kind: MaskKind::FieldMask {
1977 component: component.id,
1978 },
1979 reason: MaskReason::EmptyFieldMask {
1980 component: component.id,
1981 },
1982 });
1983 }
1984 if field_count > limits.max_fields_per_component {
1985 return Err(CodecError::LimitsExceeded {
1986 kind: LimitKind::FieldsPerComponent,
1987 limit: limits.max_fields_per_component,
1988 actual: field_count,
1989 });
1990 }
1991 if field_count > component.fields.len() {
1992 return Err(CodecError::InvalidMask {
1993 kind: MaskKind::FieldMask {
1994 component: component.id,
1995 },
1996 reason: MaskReason::FieldCountMismatch {
1997 expected: component.fields.len(),
1998 actual: field_count,
1999 },
2000 });
2001 }
2002
2003 let index_bits = required_bits(component.fields.len().saturating_sub(1) as u64);
2004 let mut fields = Vec::with_capacity(field_count);
2005 let mut prev_index: Option<usize> = None;
2006 for _ in 0..field_count {
2007 let field_index = if index_bits == 0 {
2008 0
2009 } else {
2010 reader.read_bits(index_bits)? as usize
2011 };
2012 if field_index >= component.fields.len() {
2013 return Err(CodecError::InvalidMask {
2014 kind: MaskKind::FieldMask {
2015 component: component.id,
2016 },
2017 reason: MaskReason::InvalidFieldIndex {
2018 field_index,
2019 max: component.fields.len(),
2020 },
2021 });
2022 }
2023 if let Some(prev_index) = prev_index {
2024 if field_index <= prev_index {
2025 return Err(CodecError::InvalidMask {
2026 kind: MaskKind::FieldMask {
2027 component: component.id,
2028 },
2029 reason: MaskReason::InvalidFieldIndex {
2030 field_index,
2031 max: component.fields.len(),
2032 },
2033 });
2034 }
2035 }
2036 prev_index = Some(field_index);
2037 let field = component.fields[field_index];
2038 let value = read_field_value_sparse(component.id, field, &mut reader)?;
2039 fields.push((field_index, value));
2040 }
2041
2042 if let Some(last) = updates.last_mut() {
2043 if last.id.raw() == entity_id {
2044 last.components.push(DeltaUpdateComponent {
2045 id: component.id,
2046 fields,
2047 });
2048 continue;
2049 }
2050 }
2051
2052 updates.push(DeltaUpdateEntity {
2053 id: EntityId::new(entity_id),
2054 components: vec![DeltaUpdateComponent {
2055 id: component.id,
2056 fields,
2057 }],
2058 });
2059 }
2060
2061 reader.align_to_byte()?;
2062 if reader.bits_remaining() != 0 {
2063 return Err(CodecError::TrailingSectionData {
2064 section: SectionTag::EntityUpdateSparsePacked,
2065 remaining_bits: reader.bits_remaining(),
2066 });
2067 }
2068
2069 let unique_entities = updates.len();
2070 if unique_entities > limits.max_entities_update {
2071 return Err(CodecError::LimitsExceeded {
2072 kind: LimitKind::EntitiesUpdate,
2073 limit: limits.max_entities_update,
2074 actual: unique_entities,
2075 });
2076 }
2077
2078 Ok(updates)
2079}
2080
2081fn decode_delta_sections(
2082 schema: &schema::Schema,
2083 packet: &WirePacket<'_>,
2084 limits: &CodecLimits,
2085) -> CodecResult<(Vec<EntityId>, Vec<EntitySnapshot>, Vec<DeltaUpdateEntity>)> {
2086 let mut destroys: Option<Vec<EntityId>> = None;
2087 let mut creates: Option<Vec<EntitySnapshot>> = None;
2088 let mut updates_masked: Option<Vec<DeltaUpdateEntity>> = None;
2089 let mut updates_sparse: Option<Vec<DeltaUpdateEntity>> = None;
2090
2091 for section in &packet.sections {
2092 match section.tag {
2093 SectionTag::EntityDestroy => {
2094 if destroys.is_some() {
2095 return Err(CodecError::DuplicateSection {
2096 section: section.tag,
2097 });
2098 }
2099 destroys = Some(decode_destroy_section(section.body, limits)?);
2100 }
2101 SectionTag::EntityCreate => {
2102 if creates.is_some() {
2103 return Err(CodecError::DuplicateSection {
2104 section: section.tag,
2105 });
2106 }
2107 creates = Some(decode_create_section(schema, section.body, limits)?);
2108 }
2109 SectionTag::EntityUpdate => {
2110 if updates_masked.is_some() {
2111 return Err(CodecError::DuplicateSection {
2112 section: section.tag,
2113 });
2114 }
2115 updates_masked = Some(decode_update_section_masked(schema, section.body, limits)?);
2116 }
2117 SectionTag::EntityUpdateSparse => {
2118 if updates_sparse.is_some() {
2119 return Err(CodecError::DuplicateSection {
2120 section: section.tag,
2121 });
2122 }
2123 updates_sparse = Some(decode_update_section_sparse_varint(
2124 schema,
2125 section.body,
2126 limits,
2127 )?);
2128 }
2129 SectionTag::EntityUpdateSparsePacked => {
2130 if updates_sparse.is_some() {
2131 return Err(CodecError::DuplicateSection {
2132 section: section.tag,
2133 });
2134 }
2135 updates_sparse = Some(decode_update_section_sparse_packed(
2136 schema,
2137 section.body,
2138 limits,
2139 )?);
2140 }
2141 _ => {
2142 return Err(CodecError::UnexpectedSection {
2143 section: section.tag,
2144 });
2145 }
2146 }
2147 }
2148
2149 Ok((
2150 destroys.unwrap_or_default(),
2151 creates.unwrap_or_default(),
2152 match (updates_masked, updates_sparse) {
2153 (Some(_), Some(_)) => return Err(CodecError::DuplicateUpdateEncoding),
2154 (Some(updates), None) => updates,
2155 (None, Some(updates)) => updates,
2156 (None, None) => Vec::new(),
2157 },
2158 ))
2159}
2160
/// Decoded form of a delta snapshot: its ticks plus the destroy, create and update lists.
///
/// NOTE(review): the code that builds this value is outside this excerpt; the field notes
/// below follow the field names and types — confirm against the decoder entry point.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct DeltaDecoded {
    /// Tick of the decoded snapshot (presumably the tick this delta produces).
    pub tick: SnapshotTick,
    /// Tick of the baseline the delta is relative to (presumably; TODO confirm).
    pub baseline_tick: SnapshotTick,
    /// Ids of entities to remove.
    pub destroys: Vec<EntityId>,
    /// Full snapshots of entities to add.
    pub creates: Vec<EntitySnapshot>,
    /// Per-entity field updates.
    pub updates: Vec<DeltaUpdateEntity>,
}
2169
/// Field updates for a single entity, grouped by component.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct DeltaUpdateEntity {
    /// Id of the entity being updated.
    pub id: EntityId,
    /// One element per updated component of this entity.
    pub components: Vec<DeltaUpdateComponent>,
}
2175
/// Changed fields of one component within an entity update.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct DeltaUpdateComponent {
    /// Id of the component whose fields changed.
    pub id: ComponentId,
    /// `(field index, new value)` pairs; the index addresses the component's field list.
    pub fields: Vec<(usize, FieldValue)>,
}
2181
2182fn apply_destroys(
2183 baseline: &[EntitySnapshot],
2184 destroys: &[EntityId],
2185) -> CodecResult<Vec<EntitySnapshot>> {
2186 let mut result = Vec::with_capacity(baseline.len());
2187 let mut i = 0usize;
2188 let mut j = 0usize;
2189 while i < baseline.len() || j < destroys.len() {
2190 let base = baseline.get(i);
2191 let destroy = destroys.get(j);
2192 match (base, destroy) {
2193 (Some(b), Some(d)) => {
2194 if b.id.raw() < d.raw() {
2195 result.push(b.clone());
2196 i += 1;
2197 } else if b.id.raw() > d.raw() {
2198 return Err(CodecError::EntityNotFound { entity_id: d.raw() });
2199 } else {
2200 i += 1;
2201 j += 1;
2202 }
2203 }
2204 (Some(b), None) => {
2205 result.push(b.clone());
2206 i += 1;
2207 }
2208 (None, Some(d)) => {
2209 return Err(CodecError::EntityNotFound { entity_id: d.raw() });
2210 }
2211 (None, None) => break,
2212 }
2213 }
2214 Ok(result)
2215}
2216
2217fn apply_creates(
2218 baseline: Vec<EntitySnapshot>,
2219 creates: Vec<EntitySnapshot>,
2220) -> CodecResult<Vec<EntitySnapshot>> {
2221 let mut result = Vec::with_capacity(baseline.len() + creates.len());
2222 let mut i = 0usize;
2223 let mut j = 0usize;
2224 while i < baseline.len() || j < creates.len() {
2225 let base = baseline.get(i);
2226 let create = creates.get(j);
2227 match (base, create) {
2228 (Some(b), Some(c)) => {
2229 if b.id.raw() < c.id.raw() {
2230 result.push(b.clone());
2231 i += 1;
2232 } else if b.id.raw() > c.id.raw() {
2233 result.push(c.clone());
2234 j += 1;
2235 } else {
2236 return Err(CodecError::EntityAlreadyExists {
2237 entity_id: c.id.raw(),
2238 });
2239 }
2240 }
2241 (Some(b), None) => {
2242 result.push(b.clone());
2243 i += 1;
2244 }
2245 (None, Some(c)) => {
2246 result.push(c.clone());
2247 j += 1;
2248 }
2249 (None, None) => break,
2250 }
2251 }
2252 Ok(result)
2253}
2254
2255fn apply_updates(
2256 entities: &mut [EntitySnapshot],
2257 updates: &[DeltaUpdateEntity],
2258) -> CodecResult<()> {
2259 for update in updates {
2260 let idx = entities
2261 .binary_search_by_key(&update.id.raw(), |e| e.id.raw())
2262 .map_err(|_| CodecError::EntityNotFound {
2263 entity_id: update.id.raw(),
2264 })?;
2265 let entity = &mut entities[idx];
2266 for component_update in &update.components {
2267 let component = entity
2268 .components
2269 .iter_mut()
2270 .find(|c| c.id == component_update.id)
2271 .ok_or_else(|| CodecError::ComponentNotFound {
2272 entity_id: update.id.raw(),
2273 component_id: component_update.id.get(),
2274 })?;
2275 for (field_idx, value) in &component_update.fields {
2276 if *field_idx >= component.fields.len() {
2277 return Err(CodecError::InvalidMask {
2278 kind: MaskKind::FieldMask {
2279 component: component_update.id,
2280 },
2281 reason: MaskReason::FieldCountMismatch {
2282 expected: component.fields.len(),
2283 actual: *field_idx + 1,
2284 },
2285 });
2286 }
2287 component.fields[*field_idx] = *value;
2288 }
2289 }
2290 }
2291 Ok(())
2292}
2293
2294fn ensure_entities_sorted(entities: &[EntitySnapshot]) -> CodecResult<()> {
2295 let mut prev: Option<u32> = None;
2296 for entity in entities {
2297 if let Some(prev_id) = prev {
2298 if entity.id.raw() <= prev_id {
2299 return Err(CodecError::InvalidEntityOrder {
2300 previous: prev_id,
2301 current: entity.id.raw(),
2302 });
2303 }
2304 }
2305 prev = Some(entity.id.raw());
2306 }
2307 Ok(())
2308}
2309
2310fn write_component_mask(
2311 schema: &schema::Schema,
2312 entity: &EntitySnapshot,
2313 writer: &mut BitWriter<'_>,
2314) -> CodecResult<()> {
2315 for component in &schema.components {
2316 let present = find_component(entity, component.id).is_some();
2317 writer.write_bit(present)?;
2318 }
2319 Ok(())
2320}
2321
2322fn write_full_component(
2323 component: &ComponentDef,
2324 snapshot: &ComponentSnapshot,
2325 limits: &CodecLimits,
2326 writer: &mut BitWriter<'_>,
2327) -> CodecResult<()> {
2328 if snapshot.fields.len() != component.fields.len() {
2329 return Err(CodecError::InvalidMask {
2330 kind: MaskKind::FieldMask {
2331 component: component.id,
2332 },
2333 reason: MaskReason::FieldCountMismatch {
2334 expected: component.fields.len(),
2335 actual: snapshot.fields.len(),
2336 },
2337 });
2338 }
2339 if snapshot.fields.len() > limits.max_fields_per_component {
2340 return Err(CodecError::LimitsExceeded {
2341 kind: LimitKind::FieldsPerComponent,
2342 limit: limits.max_fields_per_component,
2343 actual: snapshot.fields.len(),
2344 });
2345 }
2346
2347 for _ in &component.fields {
2348 writer.write_bit(true)?;
2349 }
2350 for (field, value) in component.fields.iter().zip(snapshot.fields.iter()) {
2351 write_field_value(component.id, *field, *value, writer)?;
2352 }
2353 Ok(())
2354}
2355
2356fn decode_full_component(
2357 component: &ComponentDef,
2358 reader: &mut BitReader<'_>,
2359 limits: &CodecLimits,
2360) -> CodecResult<Vec<FieldValue>> {
2361 if component.fields.len() > limits.max_fields_per_component {
2362 return Err(CodecError::LimitsExceeded {
2363 kind: LimitKind::FieldsPerComponent,
2364 limit: limits.max_fields_per_component,
2365 actual: component.fields.len(),
2366 });
2367 }
2368
2369 let mask = read_mask(
2370 reader,
2371 component.fields.len(),
2372 MaskKind::FieldMask {
2373 component: component.id,
2374 },
2375 )?;
2376 let mut values = Vec::with_capacity(component.fields.len());
2377 for (idx, field) in component.fields.iter().enumerate() {
2378 if !mask[idx] {
2379 return Err(CodecError::InvalidMask {
2380 kind: MaskKind::FieldMask {
2381 component: component.id,
2382 },
2383 reason: MaskReason::MissingField { field: field.id },
2384 });
2385 }
2386 values.push(read_field_value(component.id, *field, reader)?);
2387 }
2388 Ok(values)
2389}
2390
2391fn write_update_components(
2392 schema: &schema::Schema,
2393 baseline: &EntitySnapshot,
2394 current: &EntitySnapshot,
2395 limits: &CodecLimits,
2396 scratch: &mut CodecScratch,
2397 writer: &mut BitWriter<'_>,
2398) -> CodecResult<()> {
2399 let component_count = schema.components.len();
2400 let (component_changed, _) = scratch.component_and_field_masks_mut(component_count, 0);
2401 component_changed.fill(false);
2402 for (idx, component) in schema.components.iter().enumerate() {
2403 let base = find_component(baseline, component.id);
2404 let curr = find_component(current, component.id);
2405 if base.is_some() != curr.is_some() {
2406 return Err(CodecError::InvalidMask {
2407 kind: MaskKind::ComponentMask,
2408 reason: MaskReason::ComponentPresenceMismatch {
2409 component: component.id,
2410 },
2411 });
2412 }
2413 if let (Some(base), Some(curr)) = (base, curr) {
2414 if base.fields.len() != component.fields.len()
2415 || curr.fields.len() != component.fields.len()
2416 {
2417 return Err(CodecError::InvalidMask {
2418 kind: MaskKind::FieldMask {
2419 component: component.id,
2420 },
2421 reason: MaskReason::FieldCountMismatch {
2422 expected: component.fields.len(),
2423 actual: base.fields.len().max(curr.fields.len()),
2424 },
2425 });
2426 }
2427 if component.fields.len() > limits.max_fields_per_component {
2428 return Err(CodecError::LimitsExceeded {
2429 kind: LimitKind::FieldsPerComponent,
2430 limit: limits.max_fields_per_component,
2431 actual: component.fields.len(),
2432 });
2433 }
2434 let (component_changed, field_mask) =
2435 scratch.component_and_field_masks_mut(component_count, component.fields.len());
2436 let any_changed = compute_field_mask_into(component, base, curr, field_mask)?
2437 .iter()
2438 .any(|b| *b);
2439 writer.write_bit(any_changed)?;
2440 if any_changed {
2441 component_changed[idx] = true;
2442 }
2443 } else {
2444 writer.write_bit(false)?;
2445 }
2446 }
2447
2448 for (idx, component) in schema.components.iter().enumerate() {
2449 let (base, curr) = match (
2450 find_component(baseline, component.id),
2451 find_component(current, component.id),
2452 ) {
2453 (Some(base), Some(curr)) => (base, curr),
2454 _ => continue,
2455 };
2456 if component.fields.len() > limits.max_fields_per_component {
2457 return Err(CodecError::LimitsExceeded {
2458 kind: LimitKind::FieldsPerComponent,
2459 limit: limits.max_fields_per_component,
2460 actual: component.fields.len(),
2461 });
2462 }
2463 let (component_changed, field_mask) =
2464 scratch.component_and_field_masks_mut(component_count, component.fields.len());
2465 if component_changed[idx] {
2466 let field_mask = compute_field_mask_into(component, base, curr, field_mask)?;
2467 for bit in field_mask {
2468 writer.write_bit(*bit)?;
2469 }
2470 for (((field, _base_val), curr_val), changed) in component
2471 .fields
2472 .iter()
2473 .zip(base.fields.iter())
2474 .zip(curr.fields.iter())
2475 .zip(field_mask.iter())
2476 {
2477 if *changed {
2478 write_field_value(component.id, *field, *curr_val, writer)?;
2479 }
2480 }
2481 }
2482 }
2483 Ok(())
2484}
2485
2486fn decode_update_component(
2487 component: &ComponentDef,
2488 reader: &mut BitReader<'_>,
2489 limits: &CodecLimits,
2490) -> CodecResult<Vec<(usize, FieldValue)>> {
2491 if component.fields.len() > limits.max_fields_per_component {
2492 return Err(CodecError::LimitsExceeded {
2493 kind: LimitKind::FieldsPerComponent,
2494 limit: limits.max_fields_per_component,
2495 actual: component.fields.len(),
2496 });
2497 }
2498 let mask = read_mask(
2499 reader,
2500 component.fields.len(),
2501 MaskKind::FieldMask {
2502 component: component.id,
2503 },
2504 )?;
2505 if !mask.iter().any(|b| *b) {
2506 return Err(CodecError::InvalidMask {
2507 kind: MaskKind::FieldMask {
2508 component: component.id,
2509 },
2510 reason: MaskReason::EmptyFieldMask {
2511 component: component.id,
2512 },
2513 });
2514 }
2515 let mut fields = Vec::new();
2516 for (idx, field) in component.fields.iter().enumerate() {
2517 if mask[idx] {
2518 let value = read_field_value(component.id, *field, reader)?;
2519 fields.push((idx, value));
2520 }
2521 }
2522 Ok(fields)
2523}
2524
2525fn compute_field_mask_into<'a>(
2526 component: &ComponentDef,
2527 baseline: &ComponentSnapshot,
2528 current: &ComponentSnapshot,
2529 field_mask: &'a mut [bool],
2530) -> CodecResult<&'a [bool]> {
2531 for (((field, base_val), curr_val), slot) in component
2532 .fields
2533 .iter()
2534 .zip(baseline.fields.iter())
2535 .zip(current.fields.iter())
2536 .zip(field_mask.iter_mut())
2537 {
2538 *slot = field_changed(component.id, *field, *base_val, *curr_val)?;
2539 }
2540 Ok(field_mask)
2541}
2542
2543fn field_changed(
2544 component_id: ComponentId,
2545 field: FieldDef,
2546 baseline: FieldValue,
2547 current: FieldValue,
2548) -> CodecResult<bool> {
2549 match field.change {
2550 ChangePolicy::Always => field_differs(component_id, field, baseline, current),
2551 ChangePolicy::Threshold { threshold_q } => {
2552 field_exceeds_threshold(component_id, field, baseline, current, threshold_q)
2553 }
2554 }
2555}
2556
2557fn field_differs(
2558 component_id: ComponentId,
2559 field: FieldDef,
2560 baseline: FieldValue,
2561 current: FieldValue,
2562) -> CodecResult<bool> {
2563 match (baseline, current) {
2564 (FieldValue::Bool(a), FieldValue::Bool(b)) => Ok(a != b),
2565 (FieldValue::UInt(a), FieldValue::UInt(b)) => Ok(a != b),
2566 (FieldValue::SInt(a), FieldValue::SInt(b)) => Ok(a != b),
2567 (FieldValue::VarUInt(a), FieldValue::VarUInt(b)) => Ok(a != b),
2568 (FieldValue::VarSInt(a), FieldValue::VarSInt(b)) => Ok(a != b),
2569 (FieldValue::FixedPoint(a), FieldValue::FixedPoint(b)) => Ok(a != b),
2570 _ => Err(CodecError::InvalidValue {
2571 component: component_id,
2572 field: field.id,
2573 reason: ValueReason::TypeMismatch {
2574 expected: codec_name(field.codec),
2575 found: value_name(current),
2576 },
2577 }),
2578 }
2579}
2580
2581fn field_exceeds_threshold(
2582 component_id: ComponentId,
2583 field: FieldDef,
2584 baseline: FieldValue,
2585 current: FieldValue,
2586 threshold_q: u32,
2587) -> CodecResult<bool> {
2588 let threshold_q = threshold_q as u64;
2589 match (baseline, current) {
2590 (FieldValue::FixedPoint(a), FieldValue::FixedPoint(b)) => {
2591 Ok((a - b).unsigned_abs() > threshold_q)
2592 }
2593 (FieldValue::UInt(a), FieldValue::UInt(b)) => Ok(a.abs_diff(b) > threshold_q),
2594 (FieldValue::SInt(a), FieldValue::SInt(b)) => Ok((a - b).unsigned_abs() > threshold_q),
2595 (FieldValue::VarUInt(a), FieldValue::VarUInt(b)) => Ok(a.abs_diff(b) > threshold_q),
2596 (FieldValue::VarSInt(a), FieldValue::VarSInt(b)) => {
2597 Ok((a - b).unsigned_abs() > threshold_q)
2598 }
2599 (FieldValue::Bool(a), FieldValue::Bool(b)) => Ok(a != b),
2600 _ => Err(CodecError::InvalidValue {
2601 component: component_id,
2602 field: field.id,
2603 reason: ValueReason::TypeMismatch {
2604 expected: codec_name(field.codec),
2605 found: value_name(current),
2606 },
2607 }),
2608 }
2609}
2610
2611fn entity_has_updates(
2612 schema: &schema::Schema,
2613 baseline: &EntitySnapshot,
2614 current: &EntitySnapshot,
2615 limits: &CodecLimits,
2616) -> CodecResult<bool> {
2617 ensure_component_presence_matches(schema, baseline, current)?;
2618 for component in &schema.components {
2619 let base = find_component(baseline, component.id);
2620 let curr = find_component(current, component.id);
2621 if let (Some(base), Some(curr)) = (base, curr) {
2622 if base.fields.len() != component.fields.len()
2623 || curr.fields.len() != component.fields.len()
2624 {
2625 return Err(CodecError::InvalidMask {
2626 kind: MaskKind::FieldMask {
2627 component: component.id,
2628 },
2629 reason: MaskReason::FieldCountMismatch {
2630 expected: component.fields.len(),
2631 actual: base.fields.len().max(curr.fields.len()),
2632 },
2633 });
2634 }
2635 if component.fields.len() > limits.max_fields_per_component {
2636 return Err(CodecError::LimitsExceeded {
2637 kind: LimitKind::FieldsPerComponent,
2638 limit: limits.max_fields_per_component,
2639 actual: component.fields.len(),
2640 });
2641 }
2642 for ((field, base_val), curr_val) in component
2643 .fields
2644 .iter()
2645 .zip(base.fields.iter())
2646 .zip(curr.fields.iter())
2647 {
2648 if field_changed(component.id, *field, *base_val, *curr_val)? {
2649 return Ok(true);
2650 }
2651 }
2652 }
2653 }
2654 Ok(false)
2655}
2656
2657fn ensure_component_presence_matches(
2658 schema: &schema::Schema,
2659 baseline: &EntitySnapshot,
2660 current: &EntitySnapshot,
2661) -> CodecResult<()> {
2662 for component in &schema.components {
2664 let base = find_component(baseline, component.id).is_some();
2665 let curr = find_component(current, component.id).is_some();
2666 if base != curr {
2667 return Err(CodecError::InvalidMask {
2668 kind: MaskKind::ComponentMask,
2669 reason: MaskReason::ComponentPresenceMismatch {
2670 component: component.id,
2671 },
2672 });
2673 }
2674 }
2675 Ok(())
2676}
2677
2678fn find_component(entity: &EntitySnapshot, id: ComponentId) -> Option<&ComponentSnapshot> {
2679 entity.components.iter().find(|c| c.id == id)
2680}
2681
2682fn codec_name(codec: schema::FieldCodec) -> &'static str {
2683 match codec {
2684 schema::FieldCodec::Bool => "bool",
2685 schema::FieldCodec::UInt { .. } => "uint",
2686 schema::FieldCodec::SInt { .. } => "sint",
2687 schema::FieldCodec::VarUInt => "varuint",
2688 schema::FieldCodec::VarSInt => "varsint",
2689 schema::FieldCodec::FixedPoint(_) => "fixed-point",
2690 }
2691}
2692
2693fn value_name(value: FieldValue) -> &'static str {
2694 match value {
2695 FieldValue::Bool(_) => "bool",
2696 FieldValue::UInt(_) => "uint",
2697 FieldValue::SInt(_) => "sint",
2698 FieldValue::VarUInt(_) => "varuint",
2699 FieldValue::VarSInt(_) => "varsint",
2700 FieldValue::FixedPoint(_) => "fixed-point",
2701 }
2702}
2703
#[cfg(test)]
mod tests {
    use super::*;
    use schema::{ComponentDef, FieldCodec, FieldDef, FieldId, Schema};

    /// Schema with a single component (id 1) holding one bool field (id 1).
    fn schema_one_bool() -> Schema {
        let component = ComponentDef::new(ComponentId::new(1).unwrap())
            .field(FieldDef::new(FieldId::new(1).unwrap(), FieldCodec::bool()));
        Schema::new(vec![component]).unwrap()
    }

    /// Schema with a single component holding one 8-bit uint field that uses a
    /// threshold change policy.
    fn schema_uint_threshold(threshold_q: u32) -> Schema {
        let field = FieldDef::new(FieldId::new(1).unwrap(), FieldCodec::uint(8))
            .change(ChangePolicy::Threshold { threshold_q });
        let component = ComponentDef::new(ComponentId::new(1).unwrap()).field(field);
        Schema::new(vec![component]).unwrap()
    }

    /// Schema with two components (ids 1 and 2), each holding one bool field.
    fn schema_two_components() -> Schema {
        let c1 = ComponentDef::new(ComponentId::new(1).unwrap())
            .field(FieldDef::new(FieldId::new(1).unwrap(), FieldCodec::bool()));
        let c2 = ComponentDef::new(ComponentId::new(2).unwrap())
            .field(FieldDef::new(FieldId::new(1).unwrap(), FieldCodec::bool()));
        Schema::new(vec![c1, c2]).unwrap()
    }

    /// Tick-10 snapshot with entity 1 carrying component 1 = `Bool(false)`.
    fn baseline_snapshot() -> Snapshot {
        Snapshot {
            tick: SnapshotTick::new(10),
            entities: vec![EntitySnapshot {
                id: EntityId::new(1),
                components: vec![ComponentSnapshot {
                    id: ComponentId::new(1).unwrap(),
                    fields: vec![FieldValue::Bool(false)],
                }],
            }],
        }
    }

    /// Encoding an unchanged snapshot yields exactly a header with an empty payload.
    #[test]
    fn no_op_delta_is_empty() {
        let schema = schema_one_bool();
        let baseline = baseline_snapshot();
        let current = baseline.clone();
        let mut buf = [0u8; 128];
        let bytes = encode_delta_snapshot(
            &schema,
            SnapshotTick::new(11),
            baseline.tick,
            &baseline,
            &current,
            &CodecLimits::for_testing(),
            &mut buf,
        )
        .unwrap();
        let header =
            wire::PacketHeader::delta_snapshot(schema_hash(&schema), 11, baseline.tick.raw(), 0);
        let mut expected = [0u8; wire::HEADER_SIZE];
        encode_header(&header, &mut expected).unwrap();
        assert_eq!(&buf[..bytes], expected.as_slice());
    }

    /// A single flipped bool survives an encode/apply round trip.
    #[test]
    fn delta_roundtrip_single_update() {
        let schema = schema_one_bool();
        let baseline = baseline_snapshot();
        let current = Snapshot {
            tick: SnapshotTick::new(11),
            entities: vec![EntitySnapshot {
                id: EntityId::new(1),
                components: vec![ComponentSnapshot {
                    id: ComponentId::new(1).unwrap(),
                    fields: vec![FieldValue::Bool(true)],
                }],
            }],
        };

        let mut buf = [0u8; 128];
        let bytes = encode_delta_snapshot(
            &schema,
            current.tick,
            baseline.tick,
            &baseline,
            &current,
            &CodecLimits::for_testing(),
            &mut buf,
        )
        .unwrap();
        let applied = apply_delta_snapshot(
            &schema,
            &baseline,
            &buf[..bytes],
            &wire::Limits::for_testing(),
            &CodecLimits::for_testing(),
        )
        .unwrap();
        assert_eq!(applied.entities, current.entities);
    }

    /// Reusing one `CodecScratch` across two encodes does not leak state
    /// between them.
    #[test]
    fn delta_roundtrip_reuse_scratch() {
        let schema = schema_one_bool();
        let baseline = baseline_snapshot();
        let current_one = Snapshot {
            tick: SnapshotTick::new(11),
            entities: vec![EntitySnapshot {
                id: EntityId::new(1),
                components: vec![ComponentSnapshot {
                    id: ComponentId::new(1).unwrap(),
                    fields: vec![FieldValue::Bool(true)],
                }],
            }],
        };
        let current_two = Snapshot {
            tick: SnapshotTick::new(12),
            entities: vec![EntitySnapshot {
                id: EntityId::new(1),
                components: vec![ComponentSnapshot {
                    id: ComponentId::new(1).unwrap(),
                    fields: vec![FieldValue::Bool(false)],
                }],
            }],
        };

        let mut scratch = CodecScratch::default();
        let mut buf_one = [0u8; 128];
        let mut buf_two = [0u8; 128];

        let bytes_one = encode_delta_snapshot_with_scratch(
            &schema,
            current_one.tick,
            baseline.tick,
            &baseline,
            &current_one,
            &CodecLimits::for_testing(),
            &mut scratch,
            &mut buf_one,
        )
        .unwrap();
        let applied_one = apply_delta_snapshot(
            &schema,
            &baseline,
            &buf_one[..bytes_one],
            &wire::Limits::for_testing(),
            &CodecLimits::for_testing(),
        )
        .unwrap();
        assert_eq!(applied_one.entities, current_one.entities);

        let bytes_two = encode_delta_snapshot_with_scratch(
            &schema,
            current_two.tick,
            baseline.tick,
            &baseline,
            &current_two,
            &CodecLimits::for_testing(),
            &mut scratch,
            &mut buf_two,
        )
        .unwrap();
        let applied_two = apply_delta_snapshot(
            &schema,
            &baseline,
            &buf_two[..bytes_two],
            &wire::Limits::for_testing(),
            &CodecLimits::for_testing(),
        )
        .unwrap();
        assert_eq!(applied_two.entities, current_two.entities);
    }

    /// A packet carrying both an `EntityUpdate` and an `EntityUpdateSparse`
    /// section is rejected by both the decode and the apply entry points.
    #[test]
    fn delta_rejects_both_update_encodings() {
        let schema = schema_one_bool();
        let baseline = baseline_snapshot();
        let update_body = [0u8; 1];
        let mut update_section = [0u8; 8];
        let update_len =
            wire::encode_section(SectionTag::EntityUpdate, &update_body, &mut update_section)
                .unwrap();

        let mut sparse_section = [0u8; 8];
        let sparse_len = wire::encode_section(
            SectionTag::EntityUpdateSparse,
            &update_body,
            &mut sparse_section,
        )
        .unwrap();

        // Hand-assemble a packet: header followed by both sections back to back.
        let payload_len = (update_len + sparse_len) as u32;
        let header = wire::PacketHeader::delta_snapshot(
            schema_hash(&schema),
            baseline.tick.raw() + 1,
            baseline.tick.raw(),
            payload_len,
        );
        let mut buf = vec![0u8; wire::HEADER_SIZE + payload_len as usize];
        wire::encode_header(&header, &mut buf[..wire::HEADER_SIZE]).unwrap();
        buf[wire::HEADER_SIZE..wire::HEADER_SIZE + update_len]
            .copy_from_slice(&update_section[..update_len]);
        buf[wire::HEADER_SIZE + update_len..wire::HEADER_SIZE + update_len + sparse_len]
            .copy_from_slice(&sparse_section[..sparse_len]);

        let packet = wire::decode_packet(&buf, &wire::Limits::for_testing()).unwrap();
        let err = decode_delta_packet(&schema, &packet, &CodecLimits::for_testing()).unwrap_err();
        assert!(matches!(err, CodecError::DuplicateUpdateEncoding));

        let err = apply_delta_snapshot_from_packet(
            &schema,
            &baseline,
            &packet,
            &CodecLimits::for_testing(),
        )
        .unwrap_err();
        assert!(matches!(err, CodecError::DuplicateUpdateEncoding));
    }

    /// One delta that destroys entity 1, updates entity 2 and creates entity 3
    /// round-trips to the current snapshot.
    #[test]
    fn delta_roundtrip_create_destroy_update() {
        let schema = schema_one_bool();
        let baseline = Snapshot {
            tick: SnapshotTick::new(10),
            entities: vec![
                EntitySnapshot {
                    id: EntityId::new(1),
                    components: vec![ComponentSnapshot {
                        id: ComponentId::new(1).unwrap(),
                        fields: vec![FieldValue::Bool(false)],
                    }],
                },
                EntitySnapshot {
                    id: EntityId::new(2),
                    components: vec![ComponentSnapshot {
                        id: ComponentId::new(1).unwrap(),
                        fields: vec![FieldValue::Bool(false)],
                    }],
                },
            ],
        };
        let current = Snapshot {
            tick: SnapshotTick::new(11),
            entities: vec![
                EntitySnapshot {
                    id: EntityId::new(2),
                    components: vec![ComponentSnapshot {
                        id: ComponentId::new(1).unwrap(),
                        fields: vec![FieldValue::Bool(true)],
                    }],
                },
                EntitySnapshot {
                    id: EntityId::new(3),
                    components: vec![ComponentSnapshot {
                        id: ComponentId::new(1).unwrap(),
                        fields: vec![FieldValue::Bool(true)],
                    }],
                },
            ],
        };

        let mut buf = [0u8; 256];
        let bytes = encode_delta_snapshot(
            &schema,
            current.tick,
            baseline.tick,
            &baseline,
            &current,
            &CodecLimits::for_testing(),
            &mut buf,
        )
        .unwrap();
        let applied = apply_delta_snapshot(
            &schema,
            &baseline,
            &buf[..bytes],
            &wire::Limits::for_testing(),
            &CodecLimits::for_testing(),
        )
        .unwrap();
        assert_eq!(applied.entities, current.entities);
    }

    /// The session-header encoding carries the same payload bytes as the
    /// full-header client encoding.
    #[test]
    fn delta_session_header_matches_payload() {
        let schema = schema_one_bool();
        let baseline = baseline_snapshot();
        let current = Snapshot {
            tick: SnapshotTick::new(11),
            entities: vec![EntitySnapshot {
                id: EntityId::new(1),
                components: vec![ComponentSnapshot {
                    id: ComponentId::new(1).unwrap(),
                    fields: vec![FieldValue::Bool(true)],
                }],
            }],
        };

        let mut full_buf = [0u8; 128];
        let full_bytes = encode_delta_snapshot_for_client(
            &schema,
            current.tick,
            baseline.tick,
            &baseline,
            &current,
            &CodecLimits::for_testing(),
            &mut full_buf,
        )
        .unwrap();
        let full_payload = &full_buf[wire::HEADER_SIZE..full_bytes];

        let mut session_buf = [0u8; 128];
        let mut last_tick = baseline.tick;
        let session_bytes = encode_delta_snapshot_for_client_session_with_scratch(
            &schema,
            current.tick,
            baseline.tick,
            &baseline,
            &current,
            &CodecLimits::for_testing(),
            &mut CodecScratch::default(),
            &mut last_tick,
            &mut session_buf,
        )
        .unwrap();

        let session_header =
            wire::decode_session_header(&session_buf[..session_bytes], baseline.tick.raw())
                .unwrap();
        let payload_start = session_header.header_len;
        let payload_end = payload_start + session_header.payload_len as usize;
        let session_payload = &session_buf[payload_start..payload_end];
        assert_eq!(full_payload, session_payload);
    }

    /// Changing one of two components round-trips and leaves the other intact.
    #[test]
    fn delta_roundtrip_single_component_change() {
        let schema = schema_two_components();
        let baseline = Snapshot {
            tick: SnapshotTick::new(10),
            entities: vec![EntitySnapshot {
                id: EntityId::new(1),
                components: vec![
                    ComponentSnapshot {
                        id: ComponentId::new(1).unwrap(),
                        fields: vec![FieldValue::Bool(false)],
                    },
                    ComponentSnapshot {
                        id: ComponentId::new(2).unwrap(),
                        fields: vec![FieldValue::Bool(false)],
                    },
                ],
            }],
        };
        let current = Snapshot {
            tick: SnapshotTick::new(11),
            entities: vec![EntitySnapshot {
                id: EntityId::new(1),
                components: vec![
                    ComponentSnapshot {
                        id: ComponentId::new(1).unwrap(),
                        fields: vec![FieldValue::Bool(true)],
                    },
                    ComponentSnapshot {
                        id: ComponentId::new(2).unwrap(),
                        fields: vec![FieldValue::Bool(false)],
                    },
                ],
            }],
        };

        let mut buf = [0u8; 256];
        let bytes = encode_delta_snapshot(
            &schema,
            current.tick,
            baseline.tick,
            &baseline,
            &current,
            &CodecLimits::for_testing(),
            &mut buf,
        )
        .unwrap();
        let applied = apply_delta_snapshot(
            &schema,
            &baseline,
            &buf[..bytes],
            &wire::Limits::for_testing(),
            &CodecLimits::for_testing(),
        )
        .unwrap();
        assert_eq!(applied.entities, current.entities);
    }

    /// Applying a delta whose header names a different baseline tick fails
    /// with `BaselineTickMismatch`.
    #[test]
    fn baseline_tick_mismatch_is_error() {
        let schema = schema_one_bool();
        let baseline = baseline_snapshot();
        let current = baseline.clone();
        let mut buf = [0u8; 128];
        let bytes = encode_delta_snapshot(
            &schema,
            SnapshotTick::new(11),
            baseline.tick,
            &baseline,
            &current,
            &CodecLimits::for_testing(),
            &mut buf,
        )
        .unwrap();
        // Rewrite the header in place so it points at a tick we do not hold.
        let mut packet = wire::decode_packet(&buf[..bytes], &wire::Limits::for_testing()).unwrap();
        packet.header.baseline_tick = 999;
        wire::encode_header(&packet.header, &mut buf[..wire::HEADER_SIZE]).unwrap();
        let err = apply_delta_snapshot(
            &schema,
            &baseline,
            &buf[..bytes],
            &wire::Limits::for_testing(),
            &CodecLimits::for_testing(),
        )
        .unwrap_err();
        assert!(matches!(err, CodecError::BaselineTickMismatch { .. }));
    }

    /// A change of 2 under a threshold of 5 produces no sections, and applying
    /// the delta reproduces the baseline entities.
    #[test]
    fn threshold_suppresses_small_change() {
        let schema = schema_uint_threshold(5);
        let baseline = Snapshot {
            tick: SnapshotTick::new(10),
            entities: vec![EntitySnapshot {
                id: EntityId::new(1),
                components: vec![ComponentSnapshot {
                    id: ComponentId::new(1).unwrap(),
                    fields: vec![FieldValue::UInt(10)],
                }],
            }],
        };
        let current = Snapshot {
            tick: SnapshotTick::new(11),
            entities: vec![EntitySnapshot {
                id: EntityId::new(1),
                components: vec![ComponentSnapshot {
                    id: ComponentId::new(1).unwrap(),
                    fields: vec![FieldValue::UInt(12)],
                }],
            }],
        };

        let mut buf = [0u8; 128];
        let bytes = encode_delta_snapshot(
            &schema,
            current.tick,
            baseline.tick,
            &baseline,
            &current,
            &CodecLimits::for_testing(),
            &mut buf,
        )
        .unwrap();

        let packet = wire::decode_packet(&buf[..bytes], &wire::Limits::for_testing()).unwrap();
        assert_eq!(packet.sections.len(), 0);

        let applied = apply_delta_snapshot(
            &schema,
            &baseline,
            &buf[..bytes],
            &wire::Limits::for_testing(),
            &CodecLimits::for_testing(),
        )
        .unwrap();
        assert_eq!(applied.entities, baseline.entities);
    }
}