1use std::cmp::Ordering;
4
5use bitstream::{BitReader, BitWriter};
6use schema::{schema_hash, ChangePolicy, ComponentDef, ComponentId, FieldDef};
7use wire::{decode_packet, encode_header, SectionTag, WirePacket};
8
9use crate::baseline::BaselineStore;
10use crate::error::{CodecError, CodecResult, LimitKind, MaskKind, MaskReason, ValueReason};
11use crate::limits::CodecLimits;
12use crate::scratch::CodecScratch;
13use crate::snapshot::{
14 ensure_known_components, read_field_value, read_field_value_sparse, read_mask, required_bits,
15 write_field_value, write_field_value_sparse, write_section, ComponentSnapshot, EntitySnapshot,
16 FieldValue, Snapshot,
17};
18use crate::types::{EntityId, SnapshotTick};
19
20#[must_use]
22pub fn select_baseline_tick<T>(
23 store: &BaselineStore<T>,
24 ack_tick: SnapshotTick,
25) -> Option<SnapshotTick> {
26 store.latest_at_or_before(ack_tick).map(|(tick, _)| tick)
27}
28
29pub fn encode_delta_snapshot(
35 schema: &schema::Schema,
36 tick: SnapshotTick,
37 baseline_tick: SnapshotTick,
38 baseline: &Snapshot,
39 current: &Snapshot,
40 limits: &CodecLimits,
41 out: &mut [u8],
42) -> CodecResult<usize> {
43 let mut scratch = CodecScratch::default();
44 encode_delta_snapshot_with_scratch(
45 schema,
46 tick,
47 baseline_tick,
48 baseline,
49 current,
50 limits,
51 &mut scratch,
52 out,
53 )
54}
55
56#[allow(clippy::too_many_arguments)]
58pub fn encode_delta_snapshot_with_scratch(
59 schema: &schema::Schema,
60 tick: SnapshotTick,
61 baseline_tick: SnapshotTick,
62 baseline: &Snapshot,
63 current: &Snapshot,
64 limits: &CodecLimits,
65 scratch: &mut CodecScratch,
66 out: &mut [u8],
67) -> CodecResult<usize> {
68 encode_delta_snapshot_with_scratch_mode(
69 schema,
70 tick,
71 baseline_tick,
72 baseline,
73 current,
74 limits,
75 scratch,
76 out,
77 EncodeUpdateMode::Auto,
78 )
79}
80
81pub fn encode_delta_snapshot_for_client(
86 schema: &schema::Schema,
87 tick: SnapshotTick,
88 baseline_tick: SnapshotTick,
89 baseline: &Snapshot,
90 current: &Snapshot,
91 limits: &CodecLimits,
92 out: &mut [u8],
93) -> CodecResult<usize> {
94 let mut scratch = CodecScratch::default();
95 encode_delta_snapshot_for_client_with_scratch(
96 schema,
97 tick,
98 baseline_tick,
99 baseline,
100 current,
101 limits,
102 &mut scratch,
103 out,
104 )
105}
106
107pub struct SessionEncoder<'a> {
112 schema: &'a schema::Schema,
113 limits: &'a CodecLimits,
114 #[allow(dead_code)]
115 scratch: CodecScratch,
116}
117
118impl<'a> SessionEncoder<'a> {
119 #[must_use]
120 pub fn new(schema: &'a schema::Schema, limits: &'a CodecLimits) -> Self {
121 Self {
122 schema,
123 limits,
124 scratch: CodecScratch::default(),
125 }
126 }
127
128 #[must_use]
129 pub fn schema(&self) -> &'a schema::Schema {
130 self.schema
131 }
132
133 #[must_use]
134 pub fn limits(&self) -> &'a CodecLimits {
135 self.limits
136 }
137}
138
139pub fn encode_delta_from_changes(
144 session: &mut SessionEncoder<'_>,
145 tick: SnapshotTick,
146 baseline_tick: SnapshotTick,
147 creates: &[EntitySnapshot],
148 destroys: &[EntityId],
149 updates: &[DeltaUpdateEntity],
150 out: &mut [u8],
151) -> CodecResult<usize> {
152 encode_delta_snapshot_from_updates(
153 session.schema,
154 tick,
155 baseline_tick,
156 destroys,
157 creates,
158 updates,
159 session.limits,
160 out,
161 )
162}
163
164#[allow(clippy::too_many_arguments)]
169pub fn encode_delta_snapshot_from_updates(
170 schema: &schema::Schema,
171 tick: SnapshotTick,
172 baseline_tick: SnapshotTick,
173 destroys: &[EntityId],
174 creates: &[EntitySnapshot],
175 updates: &[DeltaUpdateEntity],
176 limits: &CodecLimits,
177 out: &mut [u8],
178) -> CodecResult<usize> {
179 if out.len() < wire::HEADER_SIZE {
180 return Err(CodecError::OutputTooSmall {
181 needed: wire::HEADER_SIZE,
182 available: out.len(),
183 });
184 }
185 let payload_len = encode_delta_payload_from_updates(
186 schema,
187 destroys,
188 creates,
189 updates,
190 limits,
191 &mut out[wire::HEADER_SIZE..],
192 )?;
193 let header = wire::PacketHeader::delta_snapshot(
194 schema_hash(schema),
195 tick.raw(),
196 baseline_tick.raw(),
197 payload_len as u32,
198 );
199 encode_header(&header, &mut out[..wire::HEADER_SIZE]).map_err(|_| {
200 CodecError::OutputTooSmall {
201 needed: wire::HEADER_SIZE,
202 available: out.len(),
203 }
204 })?;
205 Ok(wire::HEADER_SIZE + payload_len)
206}
207
208#[allow(clippy::too_many_arguments)]
210pub fn encode_delta_snapshot_for_client_session(
211 schema: &schema::Schema,
212 tick: SnapshotTick,
213 baseline_tick: SnapshotTick,
214 baseline: &Snapshot,
215 current: &Snapshot,
216 limits: &CodecLimits,
217 last_tick: &mut SnapshotTick,
218 out: &mut [u8],
219) -> CodecResult<usize> {
220 let mut scratch = CodecScratch::default();
221 encode_delta_snapshot_for_client_session_with_scratch(
222 schema,
223 tick,
224 baseline_tick,
225 baseline,
226 current,
227 limits,
228 &mut scratch,
229 last_tick,
230 out,
231 )
232}
233
234#[allow(clippy::too_many_arguments)]
236pub fn encode_delta_snapshot_for_client_with_scratch(
237 schema: &schema::Schema,
238 tick: SnapshotTick,
239 baseline_tick: SnapshotTick,
240 baseline: &Snapshot,
241 current: &Snapshot,
242 limits: &CodecLimits,
243 scratch: &mut CodecScratch,
244 out: &mut [u8],
245) -> CodecResult<usize> {
246 encode_delta_snapshot_with_scratch_mode(
247 schema,
248 tick,
249 baseline_tick,
250 baseline,
251 current,
252 limits,
253 scratch,
254 out,
255 EncodeUpdateMode::Sparse,
256 )
257}
258
/// How the payload encoder picks the encoding of the update section.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
enum EncodeUpdateMode {
    /// Let `select_update_encoding` choose between masked and sparse.
    Auto,
    /// Always use the sparse encoding.
    Sparse,
}
264
265#[allow(clippy::too_many_arguments)]
267pub fn encode_delta_snapshot_for_client_session_with_scratch(
268 schema: &schema::Schema,
269 tick: SnapshotTick,
270 baseline_tick: SnapshotTick,
271 baseline: &Snapshot,
272 current: &Snapshot,
273 limits: &CodecLimits,
274 scratch: &mut CodecScratch,
275 last_tick: &mut SnapshotTick,
276 out: &mut [u8],
277) -> CodecResult<usize> {
278 let max_header = wire::SESSION_MAX_HEADER_SIZE;
279 if out.len() < max_header {
280 return Err(CodecError::OutputTooSmall {
281 needed: max_header,
282 available: out.len(),
283 });
284 }
285 if tick.raw() <= last_tick.raw() {
286 return Err(CodecError::InvalidEntityOrder {
287 previous: last_tick.raw(),
288 current: tick.raw(),
289 });
290 }
291 if baseline_tick.raw() > tick.raw() {
292 return Err(CodecError::BaselineTickMismatch {
293 expected: baseline_tick.raw(),
294 found: tick.raw(),
295 });
296 }
297 let payload_len = encode_delta_payload_with_mode(
298 schema,
299 baseline_tick,
300 baseline,
301 current,
302 limits,
303 scratch,
304 &mut out[max_header..],
305 EncodeUpdateMode::Sparse,
306 )?;
307 let tick_delta = tick.raw() - last_tick.raw();
308 let baseline_delta = tick.raw() - baseline_tick.raw();
309 let header_len = wire::encode_session_header(
310 &mut out[..max_header],
311 wire::SessionFlags::delta_snapshot(),
312 tick_delta,
313 baseline_delta,
314 payload_len as u32,
315 )
316 .map_err(|_| CodecError::OutputTooSmall {
317 needed: max_header,
318 available: out.len(),
319 })?;
320 if header_len < max_header {
321 let payload_start = max_header;
322 let payload_end = max_header + payload_len;
323 out.copy_within(payload_start..payload_end, header_len);
324 }
325 *last_tick = tick;
326 Ok(header_len + payload_len)
327}
328
329#[allow(clippy::too_many_arguments)]
330fn encode_delta_snapshot_with_scratch_mode(
331 schema: &schema::Schema,
332 tick: SnapshotTick,
333 baseline_tick: SnapshotTick,
334 baseline: &Snapshot,
335 current: &Snapshot,
336 limits: &CodecLimits,
337 scratch: &mut CodecScratch,
338 out: &mut [u8],
339 mode: EncodeUpdateMode,
340) -> CodecResult<usize> {
341 if out.len() < wire::HEADER_SIZE {
342 return Err(CodecError::OutputTooSmall {
343 needed: wire::HEADER_SIZE,
344 available: out.len(),
345 });
346 }
347 let payload_len = encode_delta_payload_with_mode(
348 schema,
349 baseline_tick,
350 baseline,
351 current,
352 limits,
353 scratch,
354 &mut out[wire::HEADER_SIZE..],
355 mode,
356 )?;
357 let header = wire::PacketHeader::delta_snapshot(
358 schema_hash(schema),
359 tick.raw(),
360 baseline_tick.raw(),
361 payload_len as u32,
362 );
363 encode_header(&header, &mut out[..wire::HEADER_SIZE]).map_err(|_| {
364 CodecError::OutputTooSmall {
365 needed: wire::HEADER_SIZE,
366 available: out.len(),
367 }
368 })?;
369
370 Ok(wire::HEADER_SIZE + payload_len)
371}
372
373#[allow(clippy::too_many_arguments)]
374fn encode_delta_payload_with_mode(
375 schema: &schema::Schema,
376 baseline_tick: SnapshotTick,
377 baseline: &Snapshot,
378 current: &Snapshot,
379 limits: &CodecLimits,
380 scratch: &mut CodecScratch,
381 out: &mut [u8],
382 mode: EncodeUpdateMode,
383) -> CodecResult<usize> {
384 if baseline.tick != baseline_tick {
385 return Err(CodecError::BaselineTickMismatch {
386 expected: baseline.tick.raw(),
387 found: baseline_tick.raw(),
388 });
389 }
390
391 ensure_entities_sorted(&baseline.entities)?;
392 ensure_entities_sorted(¤t.entities)?;
393
394 let mut counts = DiffCounts::default();
395 diff_counts(schema, baseline, current, limits, &mut counts)?;
396
397 if counts.creates > limits.max_entities_create {
398 return Err(CodecError::LimitsExceeded {
399 kind: LimitKind::EntitiesCreate,
400 limit: limits.max_entities_create,
401 actual: counts.creates,
402 });
403 }
404 if counts.updates > limits.max_entities_update {
405 return Err(CodecError::LimitsExceeded {
406 kind: LimitKind::EntitiesUpdate,
407 limit: limits.max_entities_update,
408 actual: counts.updates,
409 });
410 }
411 if counts.destroys > limits.max_entities_destroy {
412 return Err(CodecError::LimitsExceeded {
413 kind: LimitKind::EntitiesDestroy,
414 limit: limits.max_entities_destroy,
415 actual: counts.destroys,
416 });
417 }
418
419 let mut offset = 0;
420 if counts.destroys > 0 {
421 let written = write_section(
422 SectionTag::EntityDestroy,
423 &mut out[offset..],
424 limits,
425 |writer| encode_destroy_body(baseline, current, counts.destroys, limits, writer),
426 )?;
427 offset += written;
428 }
429 if counts.creates > 0 {
430 let written = write_section(
431 SectionTag::EntityCreate,
432 &mut out[offset..],
433 limits,
434 |writer| encode_create_body(schema, baseline, current, counts.creates, limits, writer),
435 )?;
436 offset += written;
437 }
438 if counts.updates > 0 {
439 let update_encoding = match mode {
440 EncodeUpdateMode::Auto => {
441 select_update_encoding(schema, baseline, current, limits, scratch)?
442 }
443 EncodeUpdateMode::Sparse => UpdateEncoding::Sparse,
444 };
445 let section_tag = match update_encoding {
446 UpdateEncoding::Masked => SectionTag::EntityUpdate,
447 UpdateEncoding::Sparse => SectionTag::EntityUpdateSparsePacked,
448 };
449 let written =
450 write_section(
451 section_tag,
452 &mut out[offset..],
453 limits,
454 |writer| match update_encoding {
455 UpdateEncoding::Masked => encode_update_body_masked(
456 schema,
457 baseline,
458 current,
459 counts.updates,
460 limits,
461 scratch,
462 writer,
463 ),
464 UpdateEncoding::Sparse => encode_update_body_sparse_packed(
465 schema,
466 baseline,
467 current,
468 counts.updates,
469 limits,
470 scratch,
471 writer,
472 ),
473 },
474 )?;
475 offset += written;
476 }
477
478 Ok(offset)
479}
480
481#[allow(clippy::too_many_arguments)]
482fn encode_delta_payload_from_updates(
483 schema: &schema::Schema,
484 destroys: &[EntityId],
485 creates: &[EntitySnapshot],
486 updates: &[DeltaUpdateEntity],
487 limits: &CodecLimits,
488 out: &mut [u8],
489) -> CodecResult<usize> {
490 if destroys.len() > limits.max_entities_destroy {
491 return Err(CodecError::LimitsExceeded {
492 kind: LimitKind::EntitiesDestroy,
493 limit: limits.max_entities_destroy,
494 actual: destroys.len(),
495 });
496 }
497 if creates.len() > limits.max_entities_create {
498 return Err(CodecError::LimitsExceeded {
499 kind: LimitKind::EntitiesCreate,
500 limit: limits.max_entities_create,
501 actual: creates.len(),
502 });
503 }
504 if updates.len() > limits.max_entities_update {
505 return Err(CodecError::LimitsExceeded {
506 kind: LimitKind::EntitiesUpdate,
507 limit: limits.max_entities_update,
508 actual: updates.len(),
509 });
510 }
511
512 ensure_entity_ids_sorted(destroys)?;
513 ensure_entities_sorted(creates)?;
514 let lookup = build_component_lookup(schema);
515 validate_updates_for_encoding(schema, updates, limits, &lookup)?;
516
517 let mut offset = 0;
518 if !destroys.is_empty() {
519 let written = write_section(
520 SectionTag::EntityDestroy,
521 &mut out[offset..],
522 limits,
523 |writer| encode_destroy_body_from_list(destroys, limits, writer),
524 )?;
525 offset += written;
526 }
527 if !creates.is_empty() {
528 let written = write_section(
529 SectionTag::EntityCreate,
530 &mut out[offset..],
531 limits,
532 |writer| encode_create_body_from_list(schema, creates, limits, writer),
533 )?;
534 offset += written;
535 }
536 if !updates.is_empty() {
537 let written = write_section(
538 SectionTag::EntityUpdateSparsePacked,
539 &mut out[offset..],
540 limits,
541 |writer| {
542 encode_update_body_sparse_packed_from_updates(
543 schema, updates, limits, &lookup, writer,
544 )
545 },
546 )?;
547 offset += written;
548 }
549
550 Ok(offset)
551}
552
553pub fn apply_delta_snapshot(
555 schema: &schema::Schema,
556 baseline: &Snapshot,
557 bytes: &[u8],
558 wire_limits: &wire::Limits,
559 limits: &CodecLimits,
560) -> CodecResult<Snapshot> {
561 let packet = decode_packet(bytes, wire_limits)?;
562 apply_delta_snapshot_from_packet(schema, baseline, &packet, limits)
563}
564
565pub fn apply_delta_snapshot_from_packet(
567 schema: &schema::Schema,
568 baseline: &Snapshot,
569 packet: &WirePacket<'_>,
570 limits: &CodecLimits,
571) -> CodecResult<Snapshot> {
572 let header = packet.header;
573 if !header.flags.is_delta_snapshot() {
574 return Err(CodecError::Wire(wire::DecodeError::InvalidFlags {
575 flags: header.flags.raw(),
576 }));
577 }
578 if header.baseline_tick == 0 {
579 return Err(CodecError::Wire(wire::DecodeError::InvalidBaselineTick {
580 baseline_tick: header.baseline_tick,
581 flags: header.flags.raw(),
582 }));
583 }
584 if header.baseline_tick != baseline.tick.raw() {
585 return Err(CodecError::BaselineTickMismatch {
586 expected: baseline.tick.raw(),
587 found: header.baseline_tick,
588 });
589 }
590
591 let expected_hash = schema_hash(schema);
592 if header.schema_hash != expected_hash {
593 return Err(CodecError::SchemaMismatch {
594 expected: expected_hash,
595 found: header.schema_hash,
596 });
597 }
598
599 let (destroys, creates, updates) = decode_delta_sections(schema, packet, limits)?;
600
601 ensure_entities_sorted(&baseline.entities)?;
602 ensure_entities_sorted(&creates)?;
603
604 let mut remaining = apply_destroys(&baseline.entities, &destroys)?;
605 remaining = apply_creates(remaining, creates)?;
606 if remaining.len() > limits.max_total_entities_after_apply {
607 return Err(CodecError::LimitsExceeded {
608 kind: LimitKind::TotalEntitiesAfterApply,
609 limit: limits.max_total_entities_after_apply,
610 actual: remaining.len(),
611 });
612 }
613 apply_updates(&mut remaining, &updates)?;
614
615 Ok(Snapshot {
616 tick: SnapshotTick::new(header.tick),
617 entities: remaining,
618 })
619}
620
621pub fn decode_delta_packet(
623 schema: &schema::Schema,
624 packet: &WirePacket<'_>,
625 limits: &CodecLimits,
626) -> CodecResult<DeltaDecoded> {
627 let header = packet.header;
628 if !header.flags.is_delta_snapshot() {
629 return Err(CodecError::Wire(wire::DecodeError::InvalidFlags {
630 flags: header.flags.raw(),
631 }));
632 }
633 if header.baseline_tick == 0 {
634 return Err(CodecError::Wire(wire::DecodeError::InvalidBaselineTick {
635 baseline_tick: header.baseline_tick,
636 flags: header.flags.raw(),
637 }));
638 }
639 let expected_hash = schema_hash(schema);
640 if header.schema_hash != expected_hash {
641 return Err(CodecError::SchemaMismatch {
642 expected: expected_hash,
643 found: header.schema_hash,
644 });
645 }
646
647 let (destroys, creates, updates) = decode_delta_sections(schema, packet, limits)?;
648
649 Ok(DeltaDecoded {
650 tick: SnapshotTick::new(header.tick),
651 baseline_tick: SnapshotTick::new(header.baseline_tick),
652 destroys,
653 creates,
654 updates,
655 })
656}
657
/// Tallies of each kind of change found by `diff_counts`.
#[derive(Default)]
struct DiffCounts {
    /// Entities present only in the current snapshot.
    creates: usize,
    /// Entities present in both snapshots for which `entity_has_updates` is true.
    updates: usize,
    /// Entities present only in the baseline snapshot.
    destroys: usize,
}
664
/// The concrete encoding used for the update section of a payload.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum UpdateEncoding {
    /// Written under `SectionTag::EntityUpdate`.
    Masked,
    /// Written under `SectionTag::EntityUpdateSparsePacked`.
    Sparse,
}
670
671fn diff_counts(
672 schema: &schema::Schema,
673 baseline: &Snapshot,
674 current: &Snapshot,
675 limits: &CodecLimits,
676 counts: &mut DiffCounts,
677) -> CodecResult<()> {
678 let mut i = 0usize;
679 let mut j = 0usize;
680 while i < baseline.entities.len() || j < current.entities.len() {
681 let base = baseline.entities.get(i);
682 let curr = current.entities.get(j);
683 match (base, curr) {
684 (Some(b), Some(c)) => {
685 if b.id.raw() < c.id.raw() {
686 counts.destroys += 1;
687 i += 1;
688 } else if b.id.raw() > c.id.raw() {
689 counts.creates += 1;
690 j += 1;
691 } else {
692 if entity_has_updates(schema, b, c, limits)? {
693 counts.updates += 1;
694 }
695 i += 1;
696 j += 1;
697 }
698 }
699 (Some(_), None) => {
700 counts.destroys += 1;
701 i += 1;
702 }
703 (None, Some(_)) => {
704 counts.creates += 1;
705 j += 1;
706 }
707 (None, None) => break,
708 }
709 }
710 Ok(())
711}
712
713fn select_update_encoding(
714 schema: &schema::Schema,
715 baseline: &Snapshot,
716 current: &Snapshot,
717 limits: &CodecLimits,
718 scratch: &mut CodecScratch,
719) -> CodecResult<UpdateEncoding> {
720 let component_count = schema.components.len();
721 let mut mask_bits = 0usize;
722 let mut sparse_bits = 0usize;
723 let mut baseline_iter = baseline.entities.iter();
724 let mut current_iter = current.entities.iter();
725 let mut baseline_next = baseline_iter.next();
726 let mut current_next = current_iter.next();
727
728 while let (Some(base), Some(curr)) = (baseline_next, current_next) {
729 match base.id.cmp(&curr.id) {
730 Ordering::Less => {
731 baseline_next = baseline_iter.next();
732 }
733 Ordering::Greater => {
734 current_next = current_iter.next();
735 }
736 Ordering::Equal => {
737 for component in &schema.components {
738 let base_component = find_component(base, component.id);
739 let curr_component = find_component(curr, component.id);
740 if base_component.is_some() != curr_component.is_some() {
741 return Err(CodecError::InvalidMask {
742 kind: MaskKind::ComponentMask,
743 reason: MaskReason::ComponentPresenceMismatch {
744 component: component.id,
745 },
746 });
747 }
748 if let (Some(base_component), Some(curr_component)) =
749 (base_component, curr_component)
750 {
751 if base_component.fields.len() != component.fields.len()
752 || curr_component.fields.len() != component.fields.len()
753 {
754 return Err(CodecError::InvalidMask {
755 kind: MaskKind::FieldMask {
756 component: component.id,
757 },
758 reason: MaskReason::FieldCountMismatch {
759 expected: component.fields.len(),
760 actual: base_component
761 .fields
762 .len()
763 .max(curr_component.fields.len()),
764 },
765 });
766 }
767 if component.fields.len() > limits.max_fields_per_component {
768 return Err(CodecError::LimitsExceeded {
769 kind: LimitKind::FieldsPerComponent,
770 limit: limits.max_fields_per_component,
771 actual: component.fields.len(),
772 });
773 }
774 let (_, field_mask) = scratch
775 .component_and_field_masks_mut(component_count, component.fields.len());
776 let field_mask = compute_field_mask_into(
778 component,
779 base_component,
780 curr_component,
781 field_mask,
782 )?;
783 let changed = field_mask.iter().filter(|bit| **bit).count();
784 if changed > 0 {
785 let field_count = component.fields.len();
786 let index_bits =
787 required_bits(field_count.saturating_sub(1) as u64) as usize;
788 mask_bits += field_count;
791 sparse_bits += index_bits * changed;
792 sparse_bits += varu32_len(curr.id.raw()) * 8;
793 sparse_bits += varu32_len(component.id.get() as u32) * 8;
794 sparse_bits += varu32_len(changed as u32) * 8;
795 }
796 }
797 }
798
799 baseline_next = baseline_iter.next();
800 current_next = current_iter.next();
801 }
802 }
803 }
804
805 if mask_bits == 0 {
806 return Ok(UpdateEncoding::Masked);
807 }
808
809 if sparse_bits <= mask_bits {
810 Ok(UpdateEncoding::Sparse)
811 } else {
812 Ok(UpdateEncoding::Masked)
813 }
814}
815
/// Returns the size estimate (1 to 5) used for `value` when costing a varint.
///
/// The thresholds sit at 2^7, 2^14, 2^21 and 2^28: each step up adds one to
/// the result.
fn varu32_len(value: u32) -> usize {
    match value {
        0..=0x7F => 1,
        0x80..=0x3FFF => 2,
        0x4000..=0x001F_FFFF => 3,
        0x0020_0000..=0x0FFF_FFFF => 4,
        _ => 5,
    }
}
829
830fn ensure_entity_ids_sorted(ids: &[EntityId]) -> CodecResult<()> {
831 let mut prev: Option<u32> = None;
832 for id in ids {
833 let raw = id.raw();
834 if let Some(prev_id) = prev {
835 if raw <= prev_id {
836 return Err(CodecError::InvalidEntityOrder {
837 previous: prev_id,
838 current: raw,
839 });
840 }
841 }
842 prev = Some(raw);
843 }
844 Ok(())
845}
846
847fn encode_destroy_body_from_list(
848 destroys: &[EntityId],
849 limits: &CodecLimits,
850 writer: &mut BitWriter<'_>,
851) -> CodecResult<()> {
852 if destroys.len() > limits.max_entities_destroy {
853 return Err(CodecError::LimitsExceeded {
854 kind: LimitKind::EntitiesDestroy,
855 limit: limits.max_entities_destroy,
856 actual: destroys.len(),
857 });
858 }
859 writer.align_to_byte()?;
860 writer.write_varu32(destroys.len() as u32)?;
861 for id in destroys {
862 writer.align_to_byte()?;
863 writer.write_u32_aligned(id.raw())?;
864 }
865 writer.align_to_byte()?;
866 Ok(())
867}
868
869fn encode_create_body_from_list(
870 schema: &schema::Schema,
871 creates: &[EntitySnapshot],
872 limits: &CodecLimits,
873 writer: &mut BitWriter<'_>,
874) -> CodecResult<()> {
875 if creates.len() > limits.max_entities_create {
876 return Err(CodecError::LimitsExceeded {
877 kind: LimitKind::EntitiesCreate,
878 limit: limits.max_entities_create,
879 actual: creates.len(),
880 });
881 }
882 writer.align_to_byte()?;
883 writer.write_varu32(creates.len() as u32)?;
884 for entity in creates {
885 write_create_entity(schema, entity, limits, writer)?;
886 }
887 writer.align_to_byte()?;
888 Ok(())
889}
890
891fn validate_updates_for_encoding(
892 schema: &schema::Schema,
893 updates: &[DeltaUpdateEntity],
894 limits: &CodecLimits,
895 lookup: &ComponentLookup,
896) -> CodecResult<()> {
897 let mut prev: Option<u32> = None;
898 for entity_update in updates {
899 let id = entity_update.id.raw();
900 if let Some(prev_id) = prev {
901 if id <= prev_id {
902 return Err(CodecError::InvalidEntityOrder {
903 previous: prev_id,
904 current: id,
905 });
906 }
907 }
908 prev = Some(id);
909 if entity_update.components.len() > limits.max_components_per_entity {
910 return Err(CodecError::LimitsExceeded {
911 kind: LimitKind::ComponentsPerEntity,
912 limit: limits.max_components_per_entity,
913 actual: entity_update.components.len(),
914 });
915 }
916 for component_update in &entity_update.components {
917 let component = lookup.component(schema, component_update.id)?;
918 if component_update.fields.is_empty() {
919 return Err(CodecError::InvalidMask {
920 kind: MaskKind::FieldMask {
921 component: component_update.id,
922 },
923 reason: MaskReason::EmptyFieldMask {
924 component: component_update.id,
925 },
926 });
927 }
928 if component_update.fields.len() > limits.max_fields_per_component {
929 return Err(CodecError::LimitsExceeded {
930 kind: LimitKind::FieldsPerComponent,
931 limit: limits.max_fields_per_component,
932 actual: component_update.fields.len(),
933 });
934 }
935 let max_index = component.fields.len().saturating_sub(1);
936 for (field_idx, _value) in &component_update.fields {
937 if *field_idx >= component.fields.len() {
938 return Err(CodecError::InvalidMask {
939 kind: MaskKind::FieldMask {
940 component: component_update.id,
941 },
942 reason: MaskReason::InvalidFieldIndex {
943 field_index: *field_idx,
944 max: max_index,
945 },
946 });
947 }
948 }
949 }
950 }
951 Ok(())
952}
953
954fn encode_update_body_sparse_packed_from_updates(
955 schema: &schema::Schema,
956 updates: &[DeltaUpdateEntity],
957 limits: &CodecLimits,
958 lookup: &ComponentLookup,
959 writer: &mut BitWriter<'_>,
960) -> CodecResult<()> {
961 if updates.len() > limits.max_entities_update {
962 return Err(CodecError::LimitsExceeded {
963 kind: LimitKind::EntitiesUpdate,
964 limit: limits.max_entities_update,
965 actual: updates.len(),
966 });
967 }
968 let entry_count: usize = updates.iter().map(|entity| entity.components.len()).sum();
969 let entry_limit = limits
970 .max_entities_update
971 .saturating_mul(limits.max_components_per_entity);
972 if entry_count > entry_limit {
973 return Err(CodecError::LimitsExceeded {
974 kind: LimitKind::EntitiesUpdate,
975 limit: entry_limit,
976 actual: entry_count,
977 });
978 }
979 writer.align_to_byte()?;
980 writer.write_varu32(entry_count as u32)?;
981 for entity_update in updates {
982 let entity_id = entity_update.id.raw();
983 for component_update in &entity_update.components {
984 let component = lookup.component(schema, component_update.id)?;
985 writer.align_to_byte()?;
986 writer.write_varu32(entity_id)?;
987 writer.write_varu32(component.id.get() as u32)?;
988 writer.write_varu32(component_update.fields.len() as u32)?;
989 let index_bits = lookup.index_bits(component.id);
990 for (field_idx, value) in &component_update.fields {
991 if index_bits > 0 {
992 writer.write_bits(*field_idx as u64, index_bits)?;
993 }
994 write_field_value_sparse(
995 component.id,
996 component.fields[*field_idx],
997 *value,
998 writer,
999 )?;
1000 }
1001 }
1002 }
1003 writer.align_to_byte()?;
1004 Ok(())
1005}
1006
1007struct ComponentLookup {
1008 index: Vec<Option<usize>>,
1009 index_bits: Vec<u8>,
1010}
1011
1012impl ComponentLookup {
1013 fn component<'a>(
1014 &self,
1015 schema: &'a schema::Schema,
1016 id: ComponentId,
1017 ) -> CodecResult<&'a ComponentDef> {
1018 let idx = id.get() as usize;
1019 let Some(Some(component_index)) = self.index.get(idx) else {
1020 return Err(CodecError::InvalidMask {
1021 kind: MaskKind::ComponentMask,
1022 reason: MaskReason::UnknownComponent { component: id },
1023 });
1024 };
1025 Ok(&schema.components[*component_index])
1026 }
1027
1028 fn index_bits(&self, id: ComponentId) -> u8 {
1029 let idx = id.get() as usize;
1030 let Some(bits) = self.index_bits.get(idx).copied() else {
1031 return 0;
1032 };
1033 bits
1034 }
1035}
1036
1037fn build_component_lookup(schema: &schema::Schema) -> ComponentLookup {
1038 let max_id = schema
1039 .components
1040 .iter()
1041 .map(|component| component.id.get() as usize)
1042 .max()
1043 .unwrap_or(0);
1044 let mut index = vec![None; max_id + 1];
1045 let mut index_bits = vec![0u8; max_id + 1];
1046 for (component_index, component) in schema.components.iter().enumerate() {
1047 let id = component.id.get() as usize;
1048 index[id] = Some(component_index);
1049 let bits = required_bits(component.fields.len().saturating_sub(1) as u64);
1050 index_bits[id] = bits;
1051 }
1052 ComponentLookup { index, index_bits }
1053}
1054
1055fn encode_destroy_body(
1056 baseline: &Snapshot,
1057 current: &Snapshot,
1058 destroy_count: usize,
1059 limits: &CodecLimits,
1060 writer: &mut BitWriter<'_>,
1061) -> CodecResult<()> {
1062 if destroy_count > limits.max_entities_destroy {
1063 return Err(CodecError::LimitsExceeded {
1064 kind: LimitKind::EntitiesDestroy,
1065 limit: limits.max_entities_destroy,
1066 actual: destroy_count,
1067 });
1068 }
1069
1070 writer.align_to_byte()?;
1071 writer.write_varu32(destroy_count as u32)?;
1072
1073 let mut i = 0usize;
1074 let mut j = 0usize;
1075 while i < baseline.entities.len() || j < current.entities.len() {
1076 let base = baseline.entities.get(i);
1077 let curr = current.entities.get(j);
1078 match (base, curr) {
1079 (Some(b), Some(c)) => {
1080 if b.id.raw() < c.id.raw() {
1081 writer.align_to_byte()?;
1082 writer.write_u32_aligned(b.id.raw())?;
1083 i += 1;
1084 } else if b.id.raw() > c.id.raw() {
1085 j += 1;
1086 } else {
1087 i += 1;
1088 j += 1;
1089 }
1090 }
1091 (Some(b), None) => {
1092 writer.align_to_byte()?;
1093 writer.write_u32_aligned(b.id.raw())?;
1094 i += 1;
1095 }
1096 (None, Some(_)) => {
1097 j += 1;
1098 }
1099 (None, None) => break,
1100 }
1101 }
1102
1103 writer.align_to_byte()?;
1104 Ok(())
1105}
1106
1107fn encode_create_body(
1108 schema: &schema::Schema,
1109 baseline: &Snapshot,
1110 current: &Snapshot,
1111 create_count: usize,
1112 limits: &CodecLimits,
1113 writer: &mut BitWriter<'_>,
1114) -> CodecResult<()> {
1115 if create_count > limits.max_entities_create {
1116 return Err(CodecError::LimitsExceeded {
1117 kind: LimitKind::EntitiesCreate,
1118 limit: limits.max_entities_create,
1119 actual: create_count,
1120 });
1121 }
1122
1123 writer.align_to_byte()?;
1124 writer.write_varu32(create_count as u32)?;
1125
1126 let mut i = 0usize;
1127 let mut j = 0usize;
1128 while i < baseline.entities.len() || j < current.entities.len() {
1129 let base = baseline.entities.get(i);
1130 let curr = current.entities.get(j);
1131 match (base, curr) {
1132 (Some(b), Some(c)) => {
1133 if b.id.raw() < c.id.raw() {
1134 i += 1;
1135 } else if b.id.raw() > c.id.raw() {
1136 write_create_entity(schema, c, limits, writer)?;
1137 j += 1;
1138 } else {
1139 i += 1;
1140 j += 1;
1141 }
1142 }
1143 (Some(_), None) => {
1144 i += 1;
1145 }
1146 (None, Some(c)) => {
1147 write_create_entity(schema, c, limits, writer)?;
1148 j += 1;
1149 }
1150 (None, None) => break,
1151 }
1152 }
1153
1154 writer.align_to_byte()?;
1155 Ok(())
1156}
1157
1158fn encode_update_body_masked(
1159 schema: &schema::Schema,
1160 baseline: &Snapshot,
1161 current: &Snapshot,
1162 update_count: usize,
1163 limits: &CodecLimits,
1164 scratch: &mut CodecScratch,
1165 writer: &mut BitWriter<'_>,
1166) -> CodecResult<()> {
1167 if update_count > limits.max_entities_update {
1168 return Err(CodecError::LimitsExceeded {
1169 kind: LimitKind::EntitiesUpdate,
1170 limit: limits.max_entities_update,
1171 actual: update_count,
1172 });
1173 }
1174
1175 writer.align_to_byte()?;
1176 writer.write_varu32(update_count as u32)?;
1177
1178 let mut i = 0usize;
1179 let mut j = 0usize;
1180 while i < baseline.entities.len() || j < current.entities.len() {
1181 let base = baseline.entities.get(i);
1182 let curr = current.entities.get(j);
1183 match (base, curr) {
1184 (Some(b), Some(c)) => {
1185 if b.id.raw() < c.id.raw() {
1186 i += 1;
1187 } else if b.id.raw() > c.id.raw() {
1188 j += 1;
1189 } else {
1190 if entity_has_updates(schema, b, c, limits)? {
1191 writer.align_to_byte()?;
1192 writer.write_u32_aligned(c.id.raw())?;
1193 ensure_component_presence_matches(schema, b, c)?;
1194 write_update_components(schema, b, c, limits, scratch, writer)?;
1195 }
1196 i += 1;
1197 j += 1;
1198 }
1199 }
1200 (Some(_), None) => i += 1,
1201 (None, Some(_)) => j += 1,
1202 (None, None) => break,
1203 }
1204 }
1205
1206 writer.align_to_byte()?;
1207 Ok(())
1208}
1209
1210#[allow(dead_code)]
1211fn encode_update_body_sparse_varint(
1212 schema: &schema::Schema,
1213 baseline: &Snapshot,
1214 current: &Snapshot,
1215 update_count: usize,
1216 limits: &CodecLimits,
1217 scratch: &mut CodecScratch,
1218 writer: &mut BitWriter<'_>,
1219) -> CodecResult<()> {
1220 if update_count > limits.max_entities_update {
1221 return Err(CodecError::LimitsExceeded {
1222 kind: LimitKind::EntitiesUpdate,
1223 limit: limits.max_entities_update,
1224 actual: update_count,
1225 });
1226 }
1227
1228 let entry_count = count_sparse_update_entries(schema, baseline, current, limits, scratch)?;
1229 let entry_limit = limits
1230 .max_entities_update
1231 .saturating_mul(limits.max_components_per_entity);
1232 if entry_count > entry_limit {
1233 return Err(CodecError::LimitsExceeded {
1234 kind: LimitKind::EntitiesUpdate,
1235 limit: entry_limit,
1236 actual: entry_count,
1237 });
1238 }
1239
1240 writer.align_to_byte()?;
1241 writer.write_varu32(entry_count as u32)?;
1242
1243 let mut baseline_iter = baseline.entities.iter();
1244 let mut current_iter = current.entities.iter();
1245 let mut baseline_next = baseline_iter.next();
1246 let mut current_next = current_iter.next();
1247 let component_count = schema.components.len();
1248
1249 while let (Some(base), Some(curr)) = (baseline_next, current_next) {
1250 match base.id.cmp(&curr.id) {
1251 Ordering::Less => {
1252 baseline_next = baseline_iter.next();
1253 }
1254 Ordering::Greater => {
1255 current_next = current_iter.next();
1256 }
1257 Ordering::Equal => {
1258 if entity_has_updates(schema, base, curr, limits)? {
1259 for component in &schema.components {
1260 let base_component = find_component(base, component.id);
1261 let curr_component = find_component(curr, component.id);
1262 if base_component.is_some() != curr_component.is_some() {
1263 return Err(CodecError::InvalidMask {
1264 kind: MaskKind::ComponentMask,
1265 reason: MaskReason::ComponentPresenceMismatch {
1266 component: component.id,
1267 },
1268 });
1269 }
1270 if let (Some(base_component), Some(curr_component)) =
1271 (base_component, curr_component)
1272 {
1273 if base_component.fields.len() != component.fields.len()
1274 || curr_component.fields.len() != component.fields.len()
1275 {
1276 return Err(CodecError::InvalidMask {
1277 kind: MaskKind::FieldMask {
1278 component: component.id,
1279 },
1280 reason: MaskReason::FieldCountMismatch {
1281 expected: component.fields.len(),
1282 actual: base_component
1283 .fields
1284 .len()
1285 .max(curr_component.fields.len()),
1286 },
1287 });
1288 }
1289 if component.fields.len() > limits.max_fields_per_component {
1290 return Err(CodecError::LimitsExceeded {
1291 kind: LimitKind::FieldsPerComponent,
1292 limit: limits.max_fields_per_component,
1293 actual: component.fields.len(),
1294 });
1295 }
1296 let (_, field_mask) = scratch.component_and_field_masks_mut(
1297 component_count,
1298 component.fields.len(),
1299 );
1300 let field_mask = compute_field_mask_into(
1301 component,
1302 base_component,
1303 curr_component,
1304 field_mask,
1305 )?;
1306 let changed_fields = field_mask.iter().filter(|bit| **bit).count();
1307 if changed_fields > 0 {
1308 writer.align_to_byte()?;
1309 writer.write_u32_aligned(curr.id.raw())?;
1310 writer.write_u16_aligned(component.id.get())?;
1311 writer.write_varu32(changed_fields as u32)?;
1312 for (idx, field) in component.fields.iter().enumerate() {
1313 if field_mask[idx] {
1314 writer.align_to_byte()?;
1315 writer.write_varu32(idx as u32)?;
1316 write_field_value_sparse(
1317 component.id,
1318 *field,
1319 curr_component.fields[idx],
1320 writer,
1321 )?;
1322 }
1323 }
1324 }
1325 }
1326 }
1327 }
1328 baseline_next = baseline_iter.next();
1329 current_next = current_iter.next();
1330 }
1331 }
1332 }
1333
1334 writer.align_to_byte()?;
1335 Ok(())
1336}
1337
1338fn encode_update_body_sparse_packed(
1339 schema: &schema::Schema,
1340 baseline: &Snapshot,
1341 current: &Snapshot,
1342 update_count: usize,
1343 limits: &CodecLimits,
1344 scratch: &mut CodecScratch,
1345 writer: &mut BitWriter<'_>,
1346) -> CodecResult<()> {
1347 if update_count > limits.max_entities_update {
1348 return Err(CodecError::LimitsExceeded {
1349 kind: LimitKind::EntitiesUpdate,
1350 limit: limits.max_entities_update,
1351 actual: update_count,
1352 });
1353 }
1354
1355 let entry_count = count_sparse_update_entries(schema, baseline, current, limits, scratch)?;
1356 let entry_limit = limits
1357 .max_entities_update
1358 .saturating_mul(limits.max_components_per_entity);
1359 if entry_count > entry_limit {
1360 return Err(CodecError::LimitsExceeded {
1361 kind: LimitKind::EntitiesUpdate,
1362 limit: entry_limit,
1363 actual: entry_count,
1364 });
1365 }
1366
1367 writer.align_to_byte()?;
1368 writer.write_varu32(entry_count as u32)?;
1369
1370 let mut baseline_iter = baseline.entities.iter();
1371 let mut current_iter = current.entities.iter();
1372 let mut baseline_next = baseline_iter.next();
1373 let mut current_next = current_iter.next();
1374 let component_count = schema.components.len();
1375
1376 while let (Some(base), Some(curr)) = (baseline_next, current_next) {
1377 match base.id.cmp(&curr.id) {
1378 Ordering::Less => {
1379 baseline_next = baseline_iter.next();
1380 }
1381 Ordering::Greater => {
1382 current_next = current_iter.next();
1383 }
1384 Ordering::Equal => {
1385 if entity_has_updates(schema, base, curr, limits)? {
1386 for component in &schema.components {
1387 let base_component = find_component(base, component.id);
1388 let curr_component = find_component(curr, component.id);
1389 if base_component.is_some() != curr_component.is_some() {
1390 return Err(CodecError::InvalidMask {
1391 kind: MaskKind::ComponentMask,
1392 reason: MaskReason::ComponentPresenceMismatch {
1393 component: component.id,
1394 },
1395 });
1396 }
1397 if let (Some(base_component), Some(curr_component)) =
1398 (base_component, curr_component)
1399 {
1400 if base_component.fields.len() != component.fields.len()
1401 || curr_component.fields.len() != component.fields.len()
1402 {
1403 return Err(CodecError::InvalidMask {
1404 kind: MaskKind::FieldMask {
1405 component: component.id,
1406 },
1407 reason: MaskReason::FieldCountMismatch {
1408 expected: component.fields.len(),
1409 actual: base_component
1410 .fields
1411 .len()
1412 .max(curr_component.fields.len()),
1413 },
1414 });
1415 }
1416 if component.fields.len() > limits.max_fields_per_component {
1417 return Err(CodecError::LimitsExceeded {
1418 kind: LimitKind::FieldsPerComponent,
1419 limit: limits.max_fields_per_component,
1420 actual: component.fields.len(),
1421 });
1422 }
1423 let (_, field_mask) = scratch.component_and_field_masks_mut(
1424 component_count,
1425 component.fields.len(),
1426 );
1427 let field_mask = compute_field_mask_into(
1428 component,
1429 base_component,
1430 curr_component,
1431 field_mask,
1432 )?;
1433 let changed_fields = field_mask.iter().filter(|bit| **bit).count();
1434 if changed_fields > 0 {
1435 writer.align_to_byte()?;
1436 writer.write_varu32(curr.id.raw())?;
1437 writer.write_varu32(component.id.get() as u32)?;
1438 writer.write_varu32(changed_fields as u32)?;
1439 let index_bits =
1440 required_bits(component.fields.len().saturating_sub(1) as u64);
1441 for (idx, field) in component.fields.iter().enumerate() {
1442 if field_mask[idx] {
1443 if index_bits > 0 {
1444 writer.write_bits(idx as u64, index_bits)?;
1445 }
1446 write_field_value_sparse(
1447 component.id,
1448 *field,
1449 curr_component.fields[idx],
1450 writer,
1451 )?;
1452 }
1453 }
1454 }
1455 }
1456 }
1457 }
1458 baseline_next = baseline_iter.next();
1459 current_next = current_iter.next();
1460 }
1461 }
1462 }
1463
1464 writer.align_to_byte()?;
1465 Ok(())
1466}
1467
1468fn count_sparse_update_entries(
1469 schema: &schema::Schema,
1470 baseline: &Snapshot,
1471 current: &Snapshot,
1472 limits: &CodecLimits,
1473 scratch: &mut CodecScratch,
1474) -> CodecResult<usize> {
1475 let component_count = schema.components.len();
1476 let mut count = 0usize;
1477 let mut baseline_iter = baseline.entities.iter();
1478 let mut current_iter = current.entities.iter();
1479 let mut baseline_next = baseline_iter.next();
1480 let mut current_next = current_iter.next();
1481
1482 while let (Some(base), Some(curr)) = (baseline_next, current_next) {
1483 match base.id.cmp(&curr.id) {
1484 Ordering::Less => baseline_next = baseline_iter.next(),
1485 Ordering::Greater => current_next = current_iter.next(),
1486 Ordering::Equal => {
1487 if entity_has_updates(schema, base, curr, limits)? {
1488 for component in &schema.components {
1489 let base_component = find_component(base, component.id);
1490 let curr_component = find_component(curr, component.id);
1491 if base_component.is_some() != curr_component.is_some() {
1492 return Err(CodecError::InvalidMask {
1493 kind: MaskKind::ComponentMask,
1494 reason: MaskReason::ComponentPresenceMismatch {
1495 component: component.id,
1496 },
1497 });
1498 }
1499 if let (Some(base_component), Some(curr_component)) =
1500 (base_component, curr_component)
1501 {
1502 if base_component.fields.len() != component.fields.len()
1503 || curr_component.fields.len() != component.fields.len()
1504 {
1505 return Err(CodecError::InvalidMask {
1506 kind: MaskKind::FieldMask {
1507 component: component.id,
1508 },
1509 reason: MaskReason::FieldCountMismatch {
1510 expected: component.fields.len(),
1511 actual: base_component
1512 .fields
1513 .len()
1514 .max(curr_component.fields.len()),
1515 },
1516 });
1517 }
1518 if component.fields.len() > limits.max_fields_per_component {
1519 return Err(CodecError::LimitsExceeded {
1520 kind: LimitKind::FieldsPerComponent,
1521 limit: limits.max_fields_per_component,
1522 actual: component.fields.len(),
1523 });
1524 }
1525 let (_, field_mask) = scratch.component_and_field_masks_mut(
1526 component_count,
1527 component.fields.len(),
1528 );
1529 let field_mask = compute_field_mask_into(
1530 component,
1531 base_component,
1532 curr_component,
1533 field_mask,
1534 )?;
1535 if field_mask.iter().any(|bit| *bit) {
1536 count += 1;
1537 }
1538 }
1539 }
1540 }
1541 baseline_next = baseline_iter.next();
1542 current_next = current_iter.next();
1543 }
1544 }
1545 }
1546
1547 Ok(count)
1548}
1549
1550fn write_create_entity(
1551 schema: &schema::Schema,
1552 entity: &EntitySnapshot,
1553 limits: &CodecLimits,
1554 writer: &mut BitWriter<'_>,
1555) -> CodecResult<()> {
1556 writer.align_to_byte()?;
1557 writer.write_u32_aligned(entity.id.raw())?;
1558 ensure_known_components(schema, entity)?;
1559 write_component_mask(schema, entity, writer)?;
1560 for component in schema.components.iter() {
1561 if let Some(snapshot) = find_component(entity, component.id) {
1562 write_full_component(component, snapshot, limits, writer)?;
1563 }
1564 }
1565 Ok(())
1566}
1567
1568fn decode_destroy_section(body: &[u8], limits: &CodecLimits) -> CodecResult<Vec<EntityId>> {
1569 if body.len() > limits.max_section_bytes {
1570 return Err(CodecError::LimitsExceeded {
1571 kind: LimitKind::SectionBytes,
1572 limit: limits.max_section_bytes,
1573 actual: body.len(),
1574 });
1575 }
1576
1577 let mut reader = BitReader::new(body);
1578 reader.align_to_byte()?;
1579 let count = reader.read_varu32()? as usize;
1580 if count > limits.max_entities_destroy {
1581 return Err(CodecError::LimitsExceeded {
1582 kind: LimitKind::EntitiesDestroy,
1583 limit: limits.max_entities_destroy,
1584 actual: count,
1585 });
1586 }
1587
1588 let mut ids = Vec::with_capacity(count);
1589 let mut prev: Option<u32> = None;
1590 for _ in 0..count {
1591 reader.align_to_byte()?;
1592 let id = reader.read_u32_aligned()?;
1593 if let Some(prev_id) = prev {
1594 if id <= prev_id {
1595 return Err(CodecError::InvalidEntityOrder {
1596 previous: prev_id,
1597 current: id,
1598 });
1599 }
1600 }
1601 prev = Some(id);
1602 ids.push(EntityId::new(id));
1603 }
1604 reader.align_to_byte()?;
1605 if reader.bits_remaining() != 0 {
1606 return Err(CodecError::TrailingSectionData {
1607 section: SectionTag::EntityDestroy,
1608 remaining_bits: reader.bits_remaining(),
1609 });
1610 }
1611 Ok(ids)
1612}
1613
1614fn decode_create_section(
1615 schema: &schema::Schema,
1616 body: &[u8],
1617 limits: &CodecLimits,
1618) -> CodecResult<Vec<EntitySnapshot>> {
1619 if body.len() > limits.max_section_bytes {
1620 return Err(CodecError::LimitsExceeded {
1621 kind: LimitKind::SectionBytes,
1622 limit: limits.max_section_bytes,
1623 actual: body.len(),
1624 });
1625 }
1626
1627 let mut reader = BitReader::new(body);
1628 reader.align_to_byte()?;
1629 let count = reader.read_varu32()? as usize;
1630 if count > limits.max_entities_create {
1631 return Err(CodecError::LimitsExceeded {
1632 kind: LimitKind::EntitiesCreate,
1633 limit: limits.max_entities_create,
1634 actual: count,
1635 });
1636 }
1637
1638 let mut entities = Vec::with_capacity(count);
1639 let mut prev: Option<u32> = None;
1640 for _ in 0..count {
1641 reader.align_to_byte()?;
1642 let id = reader.read_u32_aligned()?;
1643 if let Some(prev_id) = prev {
1644 if id <= prev_id {
1645 return Err(CodecError::InvalidEntityOrder {
1646 previous: prev_id,
1647 current: id,
1648 });
1649 }
1650 }
1651 prev = Some(id);
1652
1653 let component_mask = read_mask(
1654 &mut reader,
1655 schema.components.len(),
1656 MaskKind::ComponentMask,
1657 )?;
1658
1659 let mut components = Vec::new();
1660 for (idx, component) in schema.components.iter().enumerate() {
1661 if component_mask[idx] {
1662 let fields = decode_full_component(component, &mut reader, limits)?;
1663 components.push(ComponentSnapshot {
1664 id: component.id,
1665 fields,
1666 });
1667 }
1668 }
1669
1670 let entity = EntitySnapshot {
1671 id: EntityId::new(id),
1672 components,
1673 };
1674 ensure_known_components(schema, &entity)?;
1675 entities.push(entity);
1676 }
1677
1678 reader.align_to_byte()?;
1679 if reader.bits_remaining() != 0 {
1680 return Err(CodecError::TrailingSectionData {
1681 section: SectionTag::EntityCreate,
1682 remaining_bits: reader.bits_remaining(),
1683 });
1684 }
1685 Ok(entities)
1686}
1687
1688fn decode_update_section_masked(
1689 schema: &schema::Schema,
1690 body: &[u8],
1691 limits: &CodecLimits,
1692) -> CodecResult<Vec<DeltaUpdateEntity>> {
1693 if body.len() > limits.max_section_bytes {
1694 return Err(CodecError::LimitsExceeded {
1695 kind: LimitKind::SectionBytes,
1696 limit: limits.max_section_bytes,
1697 actual: body.len(),
1698 });
1699 }
1700
1701 let mut reader = BitReader::new(body);
1702 reader.align_to_byte()?;
1703 let count = reader.read_varu32()? as usize;
1704 if count > limits.max_entities_update {
1705 return Err(CodecError::LimitsExceeded {
1706 kind: LimitKind::EntitiesUpdate,
1707 limit: limits.max_entities_update,
1708 actual: count,
1709 });
1710 }
1711
1712 let mut updates = Vec::with_capacity(count);
1713 let mut prev: Option<u32> = None;
1714 for _ in 0..count {
1715 reader.align_to_byte()?;
1716 let id = reader.read_u32_aligned()?;
1717 if let Some(prev_id) = prev {
1718 if id <= prev_id {
1719 return Err(CodecError::InvalidEntityOrder {
1720 previous: prev_id,
1721 current: id,
1722 });
1723 }
1724 }
1725 prev = Some(id);
1726
1727 let component_mask = read_mask(
1728 &mut reader,
1729 schema.components.len(),
1730 MaskKind::ComponentMask,
1731 )?;
1732 let mut components = Vec::new();
1733 for (idx, component) in schema.components.iter().enumerate() {
1734 if component_mask[idx] {
1735 let fields = decode_update_component(component, &mut reader, limits)?;
1736 components.push(DeltaUpdateComponent {
1737 id: component.id,
1738 fields,
1739 });
1740 }
1741 }
1742
1743 updates.push(DeltaUpdateEntity {
1744 id: EntityId::new(id),
1745 components,
1746 });
1747 }
1748
1749 reader.align_to_byte()?;
1750 if reader.bits_remaining() != 0 {
1751 return Err(CodecError::TrailingSectionData {
1752 section: SectionTag::EntityUpdate,
1753 remaining_bits: reader.bits_remaining(),
1754 });
1755 }
1756 Ok(updates)
1757}
1758
1759fn decode_update_section_sparse_varint(
1760 schema: &schema::Schema,
1761 body: &[u8],
1762 limits: &CodecLimits,
1763) -> CodecResult<Vec<DeltaUpdateEntity>> {
1764 if body.len() > limits.max_section_bytes {
1765 return Err(CodecError::LimitsExceeded {
1766 kind: LimitKind::SectionBytes,
1767 limit: limits.max_section_bytes,
1768 actual: body.len(),
1769 });
1770 }
1771
1772 let mut reader = BitReader::new(body);
1773 reader.align_to_byte()?;
1774 let entry_count = reader.read_varu32()? as usize;
1775 let entry_limit = limits
1776 .max_entities_update
1777 .saturating_mul(limits.max_components_per_entity);
1778 if entry_count > entry_limit {
1779 return Err(CodecError::LimitsExceeded {
1780 kind: LimitKind::EntitiesUpdate,
1781 limit: entry_limit,
1782 actual: entry_count,
1783 });
1784 }
1785
1786 let mut updates: Vec<DeltaUpdateEntity> = Vec::new();
1787 let mut prev_entity: Option<u32> = None;
1788 let mut prev_component: Option<u16> = None;
1789 for _ in 0..entry_count {
1790 reader.align_to_byte()?;
1791 let entity_id = reader.read_u32_aligned()?;
1792 let component_raw = reader.read_u16_aligned()?;
1793 let component_id = ComponentId::new(component_raw).ok_or(CodecError::InvalidMask {
1794 kind: MaskKind::ComponentMask,
1795 reason: MaskReason::InvalidComponentId { raw: component_raw },
1796 })?;
1797 if let Some(prev) = prev_entity {
1798 if entity_id < prev {
1799 return Err(CodecError::InvalidEntityOrder {
1800 previous: prev,
1801 current: entity_id,
1802 });
1803 }
1804 if entity_id == prev {
1805 if let Some(prev_component) = prev_component {
1806 if component_raw <= prev_component {
1807 return Err(CodecError::InvalidEntityOrder {
1808 previous: prev,
1809 current: entity_id,
1810 });
1811 }
1812 }
1813 }
1814 }
1815 prev_entity = Some(entity_id);
1816 prev_component = Some(component_raw);
1817
1818 let component = schema
1819 .components
1820 .iter()
1821 .find(|component| component.id == component_id)
1822 .ok_or(CodecError::InvalidMask {
1823 kind: MaskKind::ComponentMask,
1824 reason: MaskReason::UnknownComponent {
1825 component: component_id,
1826 },
1827 })?;
1828
1829 let field_count = reader.read_varu32()? as usize;
1830 if field_count == 0 {
1831 return Err(CodecError::InvalidMask {
1832 kind: MaskKind::FieldMask {
1833 component: component.id,
1834 },
1835 reason: MaskReason::EmptyFieldMask {
1836 component: component.id,
1837 },
1838 });
1839 }
1840 if field_count > limits.max_fields_per_component {
1841 return Err(CodecError::LimitsExceeded {
1842 kind: LimitKind::FieldsPerComponent,
1843 limit: limits.max_fields_per_component,
1844 actual: field_count,
1845 });
1846 }
1847 if field_count > component.fields.len() {
1848 return Err(CodecError::InvalidMask {
1849 kind: MaskKind::FieldMask {
1850 component: component.id,
1851 },
1852 reason: MaskReason::FieldCountMismatch {
1853 expected: component.fields.len(),
1854 actual: field_count,
1855 },
1856 });
1857 }
1858
1859 let mut fields = Vec::with_capacity(field_count);
1860 let mut prev_index: Option<usize> = None;
1861 for _ in 0..field_count {
1862 reader.align_to_byte()?;
1863 let field_index = reader.read_varu32()? as usize;
1864 if field_index >= component.fields.len() {
1865 return Err(CodecError::InvalidMask {
1866 kind: MaskKind::FieldMask {
1867 component: component.id,
1868 },
1869 reason: MaskReason::InvalidFieldIndex {
1870 field_index,
1871 max: component.fields.len(),
1872 },
1873 });
1874 }
1875 if let Some(prev_index) = prev_index {
1876 if field_index <= prev_index {
1877 return Err(CodecError::InvalidMask {
1878 kind: MaskKind::FieldMask {
1879 component: component.id,
1880 },
1881 reason: MaskReason::InvalidFieldIndex {
1882 field_index,
1883 max: component.fields.len(),
1884 },
1885 });
1886 }
1887 }
1888 prev_index = Some(field_index);
1889 let field = component.fields[field_index];
1890 let value = read_field_value_sparse(component.id, field, &mut reader)?;
1891 fields.push((field_index, value));
1892 }
1893
1894 if let Some(last) = updates.last_mut() {
1895 if last.id.raw() == entity_id {
1896 last.components.push(DeltaUpdateComponent {
1897 id: component.id,
1898 fields,
1899 });
1900 continue;
1901 }
1902 }
1903
1904 updates.push(DeltaUpdateEntity {
1905 id: EntityId::new(entity_id),
1906 components: vec![DeltaUpdateComponent {
1907 id: component.id,
1908 fields,
1909 }],
1910 });
1911 }
1912
1913 reader.align_to_byte()?;
1914 if reader.bits_remaining() != 0 {
1915 return Err(CodecError::TrailingSectionData {
1916 section: SectionTag::EntityUpdateSparse,
1917 remaining_bits: reader.bits_remaining(),
1918 });
1919 }
1920
1921 let unique_entities = updates.len();
1922 if unique_entities > limits.max_entities_update {
1923 return Err(CodecError::LimitsExceeded {
1924 kind: LimitKind::EntitiesUpdate,
1925 limit: limits.max_entities_update,
1926 actual: unique_entities,
1927 });
1928 }
1929
1930 Ok(updates)
1931}
1932
1933fn decode_update_section_sparse_packed(
1934 schema: &schema::Schema,
1935 body: &[u8],
1936 limits: &CodecLimits,
1937) -> CodecResult<Vec<DeltaUpdateEntity>> {
1938 if body.len() > limits.max_section_bytes {
1939 return Err(CodecError::LimitsExceeded {
1940 kind: LimitKind::SectionBytes,
1941 limit: limits.max_section_bytes,
1942 actual: body.len(),
1943 });
1944 }
1945
1946 let mut reader = BitReader::new(body);
1947 reader.align_to_byte()?;
1948 let entry_count = reader.read_varu32()? as usize;
1949 let entry_limit = limits
1950 .max_entities_update
1951 .saturating_mul(limits.max_components_per_entity);
1952 if entry_count > entry_limit {
1953 return Err(CodecError::LimitsExceeded {
1954 kind: LimitKind::EntitiesUpdate,
1955 limit: entry_limit,
1956 actual: entry_count,
1957 });
1958 }
1959
1960 let mut updates: Vec<DeltaUpdateEntity> = Vec::new();
1961 let mut prev_entity: Option<u32> = None;
1962 let mut prev_component: Option<u16> = None;
1963 for _ in 0..entry_count {
1964 reader.align_to_byte()?;
1965 let entity_id = reader.read_varu32()?;
1966 let component_raw = reader.read_varu32()?;
1967 if component_raw > u16::MAX as u32 {
1968 return Err(CodecError::InvalidMask {
1969 kind: MaskKind::ComponentMask,
1970 reason: MaskReason::InvalidComponentId { raw: u16::MAX },
1971 });
1972 }
1973 let component_raw = component_raw as u16;
1974 let component_id = ComponentId::new(component_raw).ok_or(CodecError::InvalidMask {
1975 kind: MaskKind::ComponentMask,
1976 reason: MaskReason::InvalidComponentId { raw: component_raw },
1977 })?;
1978 if let Some(prev) = prev_entity {
1979 if entity_id < prev {
1980 return Err(CodecError::InvalidEntityOrder {
1981 previous: prev,
1982 current: entity_id,
1983 });
1984 }
1985 if entity_id == prev {
1986 if let Some(prev_component) = prev_component {
1987 if component_raw <= prev_component {
1988 return Err(CodecError::InvalidEntityOrder {
1989 previous: prev,
1990 current: entity_id,
1991 });
1992 }
1993 }
1994 }
1995 }
1996 prev_entity = Some(entity_id);
1997 prev_component = Some(component_raw);
1998
1999 let component = schema
2000 .components
2001 .iter()
2002 .find(|component| component.id == component_id)
2003 .ok_or(CodecError::InvalidMask {
2004 kind: MaskKind::ComponentMask,
2005 reason: MaskReason::UnknownComponent {
2006 component: component_id,
2007 },
2008 })?;
2009
2010 let field_count = reader.read_varu32()? as usize;
2011 if field_count == 0 {
2012 return Err(CodecError::InvalidMask {
2013 kind: MaskKind::FieldMask {
2014 component: component.id,
2015 },
2016 reason: MaskReason::EmptyFieldMask {
2017 component: component.id,
2018 },
2019 });
2020 }
2021 if field_count > limits.max_fields_per_component {
2022 return Err(CodecError::LimitsExceeded {
2023 kind: LimitKind::FieldsPerComponent,
2024 limit: limits.max_fields_per_component,
2025 actual: field_count,
2026 });
2027 }
2028 if field_count > component.fields.len() {
2029 return Err(CodecError::InvalidMask {
2030 kind: MaskKind::FieldMask {
2031 component: component.id,
2032 },
2033 reason: MaskReason::FieldCountMismatch {
2034 expected: component.fields.len(),
2035 actual: field_count,
2036 },
2037 });
2038 }
2039
2040 let index_bits = required_bits(component.fields.len().saturating_sub(1) as u64);
2041 let mut fields = Vec::with_capacity(field_count);
2042 let mut prev_index: Option<usize> = None;
2043 for _ in 0..field_count {
2044 let field_index = if index_bits == 0 {
2045 0
2046 } else {
2047 reader.read_bits(index_bits)? as usize
2048 };
2049 if field_index >= component.fields.len() {
2050 return Err(CodecError::InvalidMask {
2051 kind: MaskKind::FieldMask {
2052 component: component.id,
2053 },
2054 reason: MaskReason::InvalidFieldIndex {
2055 field_index,
2056 max: component.fields.len(),
2057 },
2058 });
2059 }
2060 if let Some(prev_index) = prev_index {
2061 if field_index <= prev_index {
2062 return Err(CodecError::InvalidMask {
2063 kind: MaskKind::FieldMask {
2064 component: component.id,
2065 },
2066 reason: MaskReason::InvalidFieldIndex {
2067 field_index,
2068 max: component.fields.len(),
2069 },
2070 });
2071 }
2072 }
2073 prev_index = Some(field_index);
2074 let field = component.fields[field_index];
2075 let value = read_field_value_sparse(component.id, field, &mut reader)?;
2076 fields.push((field_index, value));
2077 }
2078
2079 if let Some(last) = updates.last_mut() {
2080 if last.id.raw() == entity_id {
2081 last.components.push(DeltaUpdateComponent {
2082 id: component.id,
2083 fields,
2084 });
2085 continue;
2086 }
2087 }
2088
2089 updates.push(DeltaUpdateEntity {
2090 id: EntityId::new(entity_id),
2091 components: vec![DeltaUpdateComponent {
2092 id: component.id,
2093 fields,
2094 }],
2095 });
2096 }
2097
2098 reader.align_to_byte()?;
2099 if reader.bits_remaining() != 0 {
2100 return Err(CodecError::TrailingSectionData {
2101 section: SectionTag::EntityUpdateSparsePacked,
2102 remaining_bits: reader.bits_remaining(),
2103 });
2104 }
2105
2106 let unique_entities = updates.len();
2107 if unique_entities > limits.max_entities_update {
2108 return Err(CodecError::LimitsExceeded {
2109 kind: LimitKind::EntitiesUpdate,
2110 limit: limits.max_entities_update,
2111 actual: unique_entities,
2112 });
2113 }
2114
2115 Ok(updates)
2116}
2117
2118fn decode_delta_sections(
2119 schema: &schema::Schema,
2120 packet: &WirePacket<'_>,
2121 limits: &CodecLimits,
2122) -> CodecResult<(Vec<EntityId>, Vec<EntitySnapshot>, Vec<DeltaUpdateEntity>)> {
2123 let mut destroys: Option<Vec<EntityId>> = None;
2124 let mut creates: Option<Vec<EntitySnapshot>> = None;
2125 let mut updates_masked: Option<Vec<DeltaUpdateEntity>> = None;
2126 let mut updates_sparse: Option<Vec<DeltaUpdateEntity>> = None;
2127
2128 for section in &packet.sections {
2129 match section.tag {
2130 SectionTag::EntityDestroy => {
2131 if destroys.is_some() {
2132 return Err(CodecError::DuplicateSection {
2133 section: section.tag,
2134 });
2135 }
2136 destroys = Some(decode_destroy_section(section.body, limits)?);
2137 }
2138 SectionTag::EntityCreate => {
2139 if creates.is_some() {
2140 return Err(CodecError::DuplicateSection {
2141 section: section.tag,
2142 });
2143 }
2144 creates = Some(decode_create_section(schema, section.body, limits)?);
2145 }
2146 SectionTag::EntityUpdate => {
2147 if updates_masked.is_some() {
2148 return Err(CodecError::DuplicateSection {
2149 section: section.tag,
2150 });
2151 }
2152 updates_masked = Some(decode_update_section_masked(schema, section.body, limits)?);
2153 }
2154 SectionTag::EntityUpdateSparse => {
2155 if updates_sparse.is_some() {
2156 return Err(CodecError::DuplicateSection {
2157 section: section.tag,
2158 });
2159 }
2160 updates_sparse = Some(decode_update_section_sparse_varint(
2161 schema,
2162 section.body,
2163 limits,
2164 )?);
2165 }
2166 SectionTag::EntityUpdateSparsePacked => {
2167 if updates_sparse.is_some() {
2168 return Err(CodecError::DuplicateSection {
2169 section: section.tag,
2170 });
2171 }
2172 updates_sparse = Some(decode_update_section_sparse_packed(
2173 schema,
2174 section.body,
2175 limits,
2176 )?);
2177 }
2178 _ => {
2179 return Err(CodecError::UnexpectedSection {
2180 section: section.tag,
2181 });
2182 }
2183 }
2184 }
2185
2186 Ok((
2187 destroys.unwrap_or_default(),
2188 creates.unwrap_or_default(),
2189 match (updates_masked, updates_sparse) {
2190 (Some(_), Some(_)) => return Err(CodecError::DuplicateUpdateEncoding),
2191 (Some(updates), None) => updates,
2192 (None, Some(updates)) => updates,
2193 (None, None) => Vec::new(),
2194 },
2195 ))
2196}
2197
2198#[derive(Debug, Clone, PartialEq, Eq)]
2199pub struct DeltaDecoded {
2200 pub tick: SnapshotTick,
2201 pub baseline_tick: SnapshotTick,
2202 pub destroys: Vec<EntityId>,
2203 pub creates: Vec<EntitySnapshot>,
2204 pub updates: Vec<DeltaUpdateEntity>,
2205}
2206
2207#[derive(Debug, Clone, PartialEq, Eq)]
2208pub struct DeltaUpdateEntity {
2209 pub id: EntityId,
2210 pub components: Vec<DeltaUpdateComponent>,
2211}
2212
2213#[derive(Debug, Clone, PartialEq, Eq)]
2214pub struct DeltaUpdateComponent {
2215 pub id: ComponentId,
2216 pub fields: Vec<(usize, FieldValue)>,
2217}
2218
2219fn apply_destroys(
2220 baseline: &[EntitySnapshot],
2221 destroys: &[EntityId],
2222) -> CodecResult<Vec<EntitySnapshot>> {
2223 let mut result = Vec::with_capacity(baseline.len());
2224 let mut i = 0usize;
2225 let mut j = 0usize;
2226 while i < baseline.len() || j < destroys.len() {
2227 let base = baseline.get(i);
2228 let destroy = destroys.get(j);
2229 match (base, destroy) {
2230 (Some(b), Some(d)) => {
2231 if b.id.raw() < d.raw() {
2232 result.push(b.clone());
2233 i += 1;
2234 } else if b.id.raw() > d.raw() {
2235 return Err(CodecError::EntityNotFound { entity_id: d.raw() });
2236 } else {
2237 i += 1;
2238 j += 1;
2239 }
2240 }
2241 (Some(b), None) => {
2242 result.push(b.clone());
2243 i += 1;
2244 }
2245 (None, Some(d)) => {
2246 return Err(CodecError::EntityNotFound { entity_id: d.raw() });
2247 }
2248 (None, None) => break,
2249 }
2250 }
2251 Ok(result)
2252}
2253
2254fn apply_creates(
2255 baseline: Vec<EntitySnapshot>,
2256 creates: Vec<EntitySnapshot>,
2257) -> CodecResult<Vec<EntitySnapshot>> {
2258 let mut result = Vec::with_capacity(baseline.len() + creates.len());
2259 let mut i = 0usize;
2260 let mut j = 0usize;
2261 while i < baseline.len() || j < creates.len() {
2262 let base = baseline.get(i);
2263 let create = creates.get(j);
2264 match (base, create) {
2265 (Some(b), Some(c)) => {
2266 if b.id.raw() < c.id.raw() {
2267 result.push(b.clone());
2268 i += 1;
2269 } else if b.id.raw() > c.id.raw() {
2270 result.push(c.clone());
2271 j += 1;
2272 } else {
2273 return Err(CodecError::EntityAlreadyExists {
2274 entity_id: c.id.raw(),
2275 });
2276 }
2277 }
2278 (Some(b), None) => {
2279 result.push(b.clone());
2280 i += 1;
2281 }
2282 (None, Some(c)) => {
2283 result.push(c.clone());
2284 j += 1;
2285 }
2286 (None, None) => break,
2287 }
2288 }
2289 Ok(result)
2290}
2291
2292fn apply_updates(
2293 entities: &mut [EntitySnapshot],
2294 updates: &[DeltaUpdateEntity],
2295) -> CodecResult<()> {
2296 for update in updates {
2297 let idx = entities
2298 .binary_search_by_key(&update.id.raw(), |e| e.id.raw())
2299 .map_err(|_| CodecError::EntityNotFound {
2300 entity_id: update.id.raw(),
2301 })?;
2302 let entity = &mut entities[idx];
2303 for component_update in &update.components {
2304 let component = entity
2305 .components
2306 .iter_mut()
2307 .find(|c| c.id == component_update.id)
2308 .ok_or_else(|| CodecError::ComponentNotFound {
2309 entity_id: update.id.raw(),
2310 component_id: component_update.id.get(),
2311 })?;
2312 for (field_idx, value) in &component_update.fields {
2313 if *field_idx >= component.fields.len() {
2314 return Err(CodecError::InvalidMask {
2315 kind: MaskKind::FieldMask {
2316 component: component_update.id,
2317 },
2318 reason: MaskReason::FieldCountMismatch {
2319 expected: component.fields.len(),
2320 actual: *field_idx + 1,
2321 },
2322 });
2323 }
2324 component.fields[*field_idx] = *value;
2325 }
2326 }
2327 }
2328 Ok(())
2329}
2330
2331fn ensure_entities_sorted(entities: &[EntitySnapshot]) -> CodecResult<()> {
2332 let mut prev: Option<u32> = None;
2333 for entity in entities {
2334 if let Some(prev_id) = prev {
2335 if entity.id.raw() <= prev_id {
2336 return Err(CodecError::InvalidEntityOrder {
2337 previous: prev_id,
2338 current: entity.id.raw(),
2339 });
2340 }
2341 }
2342 prev = Some(entity.id.raw());
2343 }
2344 Ok(())
2345}
2346
2347fn write_component_mask(
2348 schema: &schema::Schema,
2349 entity: &EntitySnapshot,
2350 writer: &mut BitWriter<'_>,
2351) -> CodecResult<()> {
2352 for component in &schema.components {
2353 let present = find_component(entity, component.id).is_some();
2354 writer.write_bit(present)?;
2355 }
2356 Ok(())
2357}
2358
2359fn write_full_component(
2360 component: &ComponentDef,
2361 snapshot: &ComponentSnapshot,
2362 limits: &CodecLimits,
2363 writer: &mut BitWriter<'_>,
2364) -> CodecResult<()> {
2365 if snapshot.fields.len() != component.fields.len() {
2366 return Err(CodecError::InvalidMask {
2367 kind: MaskKind::FieldMask {
2368 component: component.id,
2369 },
2370 reason: MaskReason::FieldCountMismatch {
2371 expected: component.fields.len(),
2372 actual: snapshot.fields.len(),
2373 },
2374 });
2375 }
2376 if snapshot.fields.len() > limits.max_fields_per_component {
2377 return Err(CodecError::LimitsExceeded {
2378 kind: LimitKind::FieldsPerComponent,
2379 limit: limits.max_fields_per_component,
2380 actual: snapshot.fields.len(),
2381 });
2382 }
2383
2384 for _ in &component.fields {
2385 writer.write_bit(true)?;
2386 }
2387 for (field, value) in component.fields.iter().zip(snapshot.fields.iter()) {
2388 write_field_value(component.id, *field, *value, writer)?;
2389 }
2390 Ok(())
2391}
2392
2393fn decode_full_component(
2394 component: &ComponentDef,
2395 reader: &mut BitReader<'_>,
2396 limits: &CodecLimits,
2397) -> CodecResult<Vec<FieldValue>> {
2398 if component.fields.len() > limits.max_fields_per_component {
2399 return Err(CodecError::LimitsExceeded {
2400 kind: LimitKind::FieldsPerComponent,
2401 limit: limits.max_fields_per_component,
2402 actual: component.fields.len(),
2403 });
2404 }
2405
2406 let mask = read_mask(
2407 reader,
2408 component.fields.len(),
2409 MaskKind::FieldMask {
2410 component: component.id,
2411 },
2412 )?;
2413 let mut values = Vec::with_capacity(component.fields.len());
2414 for (idx, field) in component.fields.iter().enumerate() {
2415 if !mask[idx] {
2416 return Err(CodecError::InvalidMask {
2417 kind: MaskKind::FieldMask {
2418 component: component.id,
2419 },
2420 reason: MaskReason::MissingField { field: field.id },
2421 });
2422 }
2423 values.push(read_field_value(component.id, *field, reader)?);
2424 }
2425 Ok(values)
2426}
2427
2428fn write_update_components(
2429 schema: &schema::Schema,
2430 baseline: &EntitySnapshot,
2431 current: &EntitySnapshot,
2432 limits: &CodecLimits,
2433 scratch: &mut CodecScratch,
2434 writer: &mut BitWriter<'_>,
2435) -> CodecResult<()> {
2436 let component_count = schema.components.len();
2437 let (component_changed, _) = scratch.component_and_field_masks_mut(component_count, 0);
2438 component_changed.fill(false);
2439 for (idx, component) in schema.components.iter().enumerate() {
2440 let base = find_component(baseline, component.id);
2441 let curr = find_component(current, component.id);
2442 if base.is_some() != curr.is_some() {
2443 return Err(CodecError::InvalidMask {
2444 kind: MaskKind::ComponentMask,
2445 reason: MaskReason::ComponentPresenceMismatch {
2446 component: component.id,
2447 },
2448 });
2449 }
2450 if let (Some(base), Some(curr)) = (base, curr) {
2451 if base.fields.len() != component.fields.len()
2452 || curr.fields.len() != component.fields.len()
2453 {
2454 return Err(CodecError::InvalidMask {
2455 kind: MaskKind::FieldMask {
2456 component: component.id,
2457 },
2458 reason: MaskReason::FieldCountMismatch {
2459 expected: component.fields.len(),
2460 actual: base.fields.len().max(curr.fields.len()),
2461 },
2462 });
2463 }
2464 if component.fields.len() > limits.max_fields_per_component {
2465 return Err(CodecError::LimitsExceeded {
2466 kind: LimitKind::FieldsPerComponent,
2467 limit: limits.max_fields_per_component,
2468 actual: component.fields.len(),
2469 });
2470 }
2471 let (component_changed, field_mask) =
2472 scratch.component_and_field_masks_mut(component_count, component.fields.len());
2473 let any_changed = compute_field_mask_into(component, base, curr, field_mask)?
2474 .iter()
2475 .any(|b| *b);
2476 writer.write_bit(any_changed)?;
2477 if any_changed {
2478 component_changed[idx] = true;
2479 }
2480 } else {
2481 writer.write_bit(false)?;
2482 }
2483 }
2484
2485 for (idx, component) in schema.components.iter().enumerate() {
2486 let (base, curr) = match (
2487 find_component(baseline, component.id),
2488 find_component(current, component.id),
2489 ) {
2490 (Some(base), Some(curr)) => (base, curr),
2491 _ => continue,
2492 };
2493 if component.fields.len() > limits.max_fields_per_component {
2494 return Err(CodecError::LimitsExceeded {
2495 kind: LimitKind::FieldsPerComponent,
2496 limit: limits.max_fields_per_component,
2497 actual: component.fields.len(),
2498 });
2499 }
2500 let (component_changed, field_mask) =
2501 scratch.component_and_field_masks_mut(component_count, component.fields.len());
2502 if component_changed[idx] {
2503 let field_mask = compute_field_mask_into(component, base, curr, field_mask)?;
2504 for bit in field_mask {
2505 writer.write_bit(*bit)?;
2506 }
2507 for (((field, _base_val), curr_val), changed) in component
2508 .fields
2509 .iter()
2510 .zip(base.fields.iter())
2511 .zip(curr.fields.iter())
2512 .zip(field_mask.iter())
2513 {
2514 if *changed {
2515 write_field_value(component.id, *field, *curr_val, writer)?;
2516 }
2517 }
2518 }
2519 }
2520 Ok(())
2521}
2522
2523fn decode_update_component(
2524 component: &ComponentDef,
2525 reader: &mut BitReader<'_>,
2526 limits: &CodecLimits,
2527) -> CodecResult<Vec<(usize, FieldValue)>> {
2528 if component.fields.len() > limits.max_fields_per_component {
2529 return Err(CodecError::LimitsExceeded {
2530 kind: LimitKind::FieldsPerComponent,
2531 limit: limits.max_fields_per_component,
2532 actual: component.fields.len(),
2533 });
2534 }
2535 let mask = read_mask(
2536 reader,
2537 component.fields.len(),
2538 MaskKind::FieldMask {
2539 component: component.id,
2540 },
2541 )?;
2542 if !mask.iter().any(|b| *b) {
2543 return Err(CodecError::InvalidMask {
2544 kind: MaskKind::FieldMask {
2545 component: component.id,
2546 },
2547 reason: MaskReason::EmptyFieldMask {
2548 component: component.id,
2549 },
2550 });
2551 }
2552 let mut fields = Vec::new();
2553 for (idx, field) in component.fields.iter().enumerate() {
2554 if mask[idx] {
2555 let value = read_field_value(component.id, *field, reader)?;
2556 fields.push((idx, value));
2557 }
2558 }
2559 Ok(fields)
2560}
2561
2562fn compute_field_mask_into<'a>(
2563 component: &ComponentDef,
2564 baseline: &ComponentSnapshot,
2565 current: &ComponentSnapshot,
2566 field_mask: &'a mut [bool],
2567) -> CodecResult<&'a [bool]> {
2568 for (((field, base_val), curr_val), slot) in component
2569 .fields
2570 .iter()
2571 .zip(baseline.fields.iter())
2572 .zip(current.fields.iter())
2573 .zip(field_mask.iter_mut())
2574 {
2575 *slot = field_changed(component.id, *field, *base_val, *curr_val)?;
2576 }
2577 Ok(field_mask)
2578}
2579
2580fn field_changed(
2581 component_id: ComponentId,
2582 field: FieldDef,
2583 baseline: FieldValue,
2584 current: FieldValue,
2585) -> CodecResult<bool> {
2586 match field.change {
2587 ChangePolicy::Always => field_differs(component_id, field, baseline, current),
2588 ChangePolicy::Threshold { threshold_q } => {
2589 field_exceeds_threshold(component_id, field, baseline, current, threshold_q)
2590 }
2591 }
2592}
2593
2594fn field_differs(
2595 component_id: ComponentId,
2596 field: FieldDef,
2597 baseline: FieldValue,
2598 current: FieldValue,
2599) -> CodecResult<bool> {
2600 match (baseline, current) {
2601 (FieldValue::Bool(a), FieldValue::Bool(b)) => Ok(a != b),
2602 (FieldValue::UInt(a), FieldValue::UInt(b)) => Ok(a != b),
2603 (FieldValue::SInt(a), FieldValue::SInt(b)) => Ok(a != b),
2604 (FieldValue::VarUInt(a), FieldValue::VarUInt(b)) => Ok(a != b),
2605 (FieldValue::VarSInt(a), FieldValue::VarSInt(b)) => Ok(a != b),
2606 (FieldValue::FixedPoint(a), FieldValue::FixedPoint(b)) => Ok(a != b),
2607 _ => Err(CodecError::InvalidValue {
2608 component: component_id,
2609 field: field.id,
2610 reason: ValueReason::TypeMismatch {
2611 expected: codec_name(field.codec),
2612 found: value_name(current),
2613 },
2614 }),
2615 }
2616}
2617
2618fn field_exceeds_threshold(
2619 component_id: ComponentId,
2620 field: FieldDef,
2621 baseline: FieldValue,
2622 current: FieldValue,
2623 threshold_q: u32,
2624) -> CodecResult<bool> {
2625 let threshold_q = threshold_q as u64;
2626 match (baseline, current) {
2627 (FieldValue::FixedPoint(a), FieldValue::FixedPoint(b)) => {
2628 Ok((a - b).unsigned_abs() > threshold_q)
2629 }
2630 (FieldValue::UInt(a), FieldValue::UInt(b)) => Ok(a.abs_diff(b) > threshold_q),
2631 (FieldValue::SInt(a), FieldValue::SInt(b)) => Ok((a - b).unsigned_abs() > threshold_q),
2632 (FieldValue::VarUInt(a), FieldValue::VarUInt(b)) => Ok(a.abs_diff(b) > threshold_q),
2633 (FieldValue::VarSInt(a), FieldValue::VarSInt(b)) => {
2634 Ok((a - b).unsigned_abs() > threshold_q)
2635 }
2636 (FieldValue::Bool(a), FieldValue::Bool(b)) => Ok(a != b),
2637 _ => Err(CodecError::InvalidValue {
2638 component: component_id,
2639 field: field.id,
2640 reason: ValueReason::TypeMismatch {
2641 expected: codec_name(field.codec),
2642 found: value_name(current),
2643 },
2644 }),
2645 }
2646}
2647
2648fn entity_has_updates(
2649 schema: &schema::Schema,
2650 baseline: &EntitySnapshot,
2651 current: &EntitySnapshot,
2652 limits: &CodecLimits,
2653) -> CodecResult<bool> {
2654 ensure_component_presence_matches(schema, baseline, current)?;
2655 for component in &schema.components {
2656 let base = find_component(baseline, component.id);
2657 let curr = find_component(current, component.id);
2658 if let (Some(base), Some(curr)) = (base, curr) {
2659 if base.fields.len() != component.fields.len()
2660 || curr.fields.len() != component.fields.len()
2661 {
2662 return Err(CodecError::InvalidMask {
2663 kind: MaskKind::FieldMask {
2664 component: component.id,
2665 },
2666 reason: MaskReason::FieldCountMismatch {
2667 expected: component.fields.len(),
2668 actual: base.fields.len().max(curr.fields.len()),
2669 },
2670 });
2671 }
2672 if component.fields.len() > limits.max_fields_per_component {
2673 return Err(CodecError::LimitsExceeded {
2674 kind: LimitKind::FieldsPerComponent,
2675 limit: limits.max_fields_per_component,
2676 actual: component.fields.len(),
2677 });
2678 }
2679 for ((field, base_val), curr_val) in component
2680 .fields
2681 .iter()
2682 .zip(base.fields.iter())
2683 .zip(curr.fields.iter())
2684 {
2685 if field_changed(component.id, *field, *base_val, *curr_val)? {
2686 return Ok(true);
2687 }
2688 }
2689 }
2690 }
2691 Ok(false)
2692}
2693
2694fn ensure_component_presence_matches(
2695 schema: &schema::Schema,
2696 baseline: &EntitySnapshot,
2697 current: &EntitySnapshot,
2698) -> CodecResult<()> {
2699 for component in &schema.components {
2701 let base = find_component(baseline, component.id).is_some();
2702 let curr = find_component(current, component.id).is_some();
2703 if base != curr {
2704 return Err(CodecError::InvalidMask {
2705 kind: MaskKind::ComponentMask,
2706 reason: MaskReason::ComponentPresenceMismatch {
2707 component: component.id,
2708 },
2709 });
2710 }
2711 }
2712 Ok(())
2713}
2714
2715fn find_component(entity: &EntitySnapshot, id: ComponentId) -> Option<&ComponentSnapshot> {
2716 entity.components.iter().find(|c| c.id == id)
2717}
2718
2719fn codec_name(codec: schema::FieldCodec) -> &'static str {
2720 match codec {
2721 schema::FieldCodec::Bool => "bool",
2722 schema::FieldCodec::UInt { .. } => "uint",
2723 schema::FieldCodec::SInt { .. } => "sint",
2724 schema::FieldCodec::VarUInt => "varuint",
2725 schema::FieldCodec::VarSInt => "varsint",
2726 schema::FieldCodec::FixedPoint(_) => "fixed-point",
2727 }
2728}
2729
2730fn value_name(value: FieldValue) -> &'static str {
2731 match value {
2732 FieldValue::Bool(_) => "bool",
2733 FieldValue::UInt(_) => "uint",
2734 FieldValue::SInt(_) => "sint",
2735 FieldValue::VarUInt(_) => "varuint",
2736 FieldValue::VarSInt(_) => "varsint",
2737 FieldValue::FixedPoint(_) => "fixed-point",
2738 }
2739}
2740
2741#[cfg(test)]
2742mod tests {
2743 use super::*;
2744 use schema::{ComponentDef, FieldCodec, FieldDef, FieldId, Schema};
2745
2746 fn schema_one_bool() -> Schema {
2747 let component = ComponentDef::new(ComponentId::new(1).unwrap())
2748 .field(FieldDef::new(FieldId::new(1).unwrap(), FieldCodec::bool()));
2749 Schema::new(vec![component]).unwrap()
2750 }
2751
2752 fn schema_uint_threshold(threshold_q: u32) -> Schema {
2753 let field = FieldDef::new(FieldId::new(1).unwrap(), FieldCodec::uint(8))
2754 .change(ChangePolicy::Threshold { threshold_q });
2755 let component = ComponentDef::new(ComponentId::new(1).unwrap()).field(field);
2756 Schema::new(vec![component]).unwrap()
2757 }
2758
2759 fn schema_two_components() -> Schema {
2760 let c1 = ComponentDef::new(ComponentId::new(1).unwrap())
2761 .field(FieldDef::new(FieldId::new(1).unwrap(), FieldCodec::bool()));
2762 let c2 = ComponentDef::new(ComponentId::new(2).unwrap())
2763 .field(FieldDef::new(FieldId::new(1).unwrap(), FieldCodec::bool()));
2764 Schema::new(vec![c1, c2]).unwrap()
2765 }
2766
2767 fn baseline_snapshot() -> Snapshot {
2768 Snapshot {
2769 tick: SnapshotTick::new(10),
2770 entities: vec![EntitySnapshot {
2771 id: EntityId::new(1),
2772 components: vec![ComponentSnapshot {
2773 id: ComponentId::new(1).unwrap(),
2774 fields: vec![FieldValue::Bool(false)],
2775 }],
2776 }],
2777 }
2778 }
2779
2780 #[test]
2781 fn no_op_delta_is_empty() {
2782 let schema = schema_one_bool();
2783 let baseline = baseline_snapshot();
2784 let current = baseline.clone();
2785 let mut buf = [0u8; 128];
2786 let bytes = encode_delta_snapshot(
2787 &schema,
2788 SnapshotTick::new(11),
2789 baseline.tick,
2790 &baseline,
2791 ¤t,
2792 &CodecLimits::for_testing(),
2793 &mut buf,
2794 )
2795 .unwrap();
2796 let header =
2797 wire::PacketHeader::delta_snapshot(schema_hash(&schema), 11, baseline.tick.raw(), 0);
2798 let mut expected = [0u8; wire::HEADER_SIZE];
2799 encode_header(&header, &mut expected).unwrap();
2800 assert_eq!(&buf[..bytes], expected.as_slice());
2801 }
2802
2803 #[test]
2804 fn delta_roundtrip_single_update() {
2805 let schema = schema_one_bool();
2806 let baseline = baseline_snapshot();
2807 let current = Snapshot {
2808 tick: SnapshotTick::new(11),
2809 entities: vec![EntitySnapshot {
2810 id: EntityId::new(1),
2811 components: vec![ComponentSnapshot {
2812 id: ComponentId::new(1).unwrap(),
2813 fields: vec![FieldValue::Bool(true)],
2814 }],
2815 }],
2816 };
2817
2818 let mut buf = [0u8; 128];
2819 let bytes = encode_delta_snapshot(
2820 &schema,
2821 current.tick,
2822 baseline.tick,
2823 &baseline,
2824 ¤t,
2825 &CodecLimits::for_testing(),
2826 &mut buf,
2827 )
2828 .unwrap();
2829 let applied = apply_delta_snapshot(
2830 &schema,
2831 &baseline,
2832 &buf[..bytes],
2833 &wire::Limits::for_testing(),
2834 &CodecLimits::for_testing(),
2835 )
2836 .unwrap();
2837 assert_eq!(applied.entities, current.entities);
2838 }
2839
2840 #[test]
2841 fn delta_roundtrip_reuse_scratch() {
2842 let schema = schema_one_bool();
2843 let baseline = baseline_snapshot();
2844 let current_one = Snapshot {
2845 tick: SnapshotTick::new(11),
2846 entities: vec![EntitySnapshot {
2847 id: EntityId::new(1),
2848 components: vec![ComponentSnapshot {
2849 id: ComponentId::new(1).unwrap(),
2850 fields: vec![FieldValue::Bool(true)],
2851 }],
2852 }],
2853 };
2854 let current_two = Snapshot {
2855 tick: SnapshotTick::new(12),
2856 entities: vec![EntitySnapshot {
2857 id: EntityId::new(1),
2858 components: vec![ComponentSnapshot {
2859 id: ComponentId::new(1).unwrap(),
2860 fields: vec![FieldValue::Bool(false)],
2861 }],
2862 }],
2863 };
2864
2865 let mut scratch = CodecScratch::default();
2866 let mut buf_one = [0u8; 128];
2867 let mut buf_two = [0u8; 128];
2868
2869 let bytes_one = encode_delta_snapshot_with_scratch(
2870 &schema,
2871 current_one.tick,
2872 baseline.tick,
2873 &baseline,
2874 ¤t_one,
2875 &CodecLimits::for_testing(),
2876 &mut scratch,
2877 &mut buf_one,
2878 )
2879 .unwrap();
2880 let applied_one = apply_delta_snapshot(
2881 &schema,
2882 &baseline,
2883 &buf_one[..bytes_one],
2884 &wire::Limits::for_testing(),
2885 &CodecLimits::for_testing(),
2886 )
2887 .unwrap();
2888 assert_eq!(applied_one.entities, current_one.entities);
2889
2890 let bytes_two = encode_delta_snapshot_with_scratch(
2891 &schema,
2892 current_two.tick,
2893 baseline.tick,
2894 &baseline,
2895 ¤t_two,
2896 &CodecLimits::for_testing(),
2897 &mut scratch,
2898 &mut buf_two,
2899 )
2900 .unwrap();
2901 let applied_two = apply_delta_snapshot(
2902 &schema,
2903 &baseline,
2904 &buf_two[..bytes_two],
2905 &wire::Limits::for_testing(),
2906 &CodecLimits::for_testing(),
2907 )
2908 .unwrap();
2909 assert_eq!(applied_two.entities, current_two.entities);
2910 }
2911
2912 #[test]
2913 fn delta_rejects_both_update_encodings() {
2914 let schema = schema_one_bool();
2915 let baseline = baseline_snapshot();
2916 let update_body = [0u8; 1];
2917 let mut update_section = [0u8; 8];
2918 let update_len =
2919 wire::encode_section(SectionTag::EntityUpdate, &update_body, &mut update_section)
2920 .unwrap();
2921
2922 let mut sparse_section = [0u8; 8];
2923 let sparse_len = wire::encode_section(
2924 SectionTag::EntityUpdateSparse,
2925 &update_body,
2926 &mut sparse_section,
2927 )
2928 .unwrap();
2929
2930 let payload_len = (update_len + sparse_len) as u32;
2931 let header = wire::PacketHeader::delta_snapshot(
2932 schema_hash(&schema),
2933 baseline.tick.raw() + 1,
2934 baseline.tick.raw(),
2935 payload_len,
2936 );
2937 let mut buf = vec![0u8; wire::HEADER_SIZE + payload_len as usize];
2938 wire::encode_header(&header, &mut buf[..wire::HEADER_SIZE]).unwrap();
2939 buf[wire::HEADER_SIZE..wire::HEADER_SIZE + update_len]
2940 .copy_from_slice(&update_section[..update_len]);
2941 buf[wire::HEADER_SIZE + update_len..wire::HEADER_SIZE + update_len + sparse_len]
2942 .copy_from_slice(&sparse_section[..sparse_len]);
2943
2944 let packet = wire::decode_packet(&buf, &wire::Limits::for_testing()).unwrap();
2945 let err = decode_delta_packet(&schema, &packet, &CodecLimits::for_testing()).unwrap_err();
2946 assert!(matches!(err, CodecError::DuplicateUpdateEncoding));
2947
2948 let err = apply_delta_snapshot_from_packet(
2949 &schema,
2950 &baseline,
2951 &packet,
2952 &CodecLimits::for_testing(),
2953 )
2954 .unwrap_err();
2955 assert!(matches!(err, CodecError::DuplicateUpdateEncoding));
2956 }
2957
2958 #[test]
2959 fn delta_roundtrip_create_destroy_update() {
2960 let schema = schema_one_bool();
2961 let baseline = Snapshot {
2962 tick: SnapshotTick::new(10),
2963 entities: vec![
2964 EntitySnapshot {
2965 id: EntityId::new(1),
2966 components: vec![ComponentSnapshot {
2967 id: ComponentId::new(1).unwrap(),
2968 fields: vec![FieldValue::Bool(false)],
2969 }],
2970 },
2971 EntitySnapshot {
2972 id: EntityId::new(2),
2973 components: vec![ComponentSnapshot {
2974 id: ComponentId::new(1).unwrap(),
2975 fields: vec![FieldValue::Bool(false)],
2976 }],
2977 },
2978 ],
2979 };
2980 let current = Snapshot {
2981 tick: SnapshotTick::new(11),
2982 entities: vec![
2983 EntitySnapshot {
2984 id: EntityId::new(2),
2985 components: vec![ComponentSnapshot {
2986 id: ComponentId::new(1).unwrap(),
2987 fields: vec![FieldValue::Bool(true)],
2988 }],
2989 },
2990 EntitySnapshot {
2991 id: EntityId::new(3),
2992 components: vec![ComponentSnapshot {
2993 id: ComponentId::new(1).unwrap(),
2994 fields: vec![FieldValue::Bool(true)],
2995 }],
2996 },
2997 ],
2998 };
2999
3000 let mut buf = [0u8; 256];
3001 let bytes = encode_delta_snapshot(
3002 &schema,
3003 current.tick,
3004 baseline.tick,
3005 &baseline,
3006 ¤t,
3007 &CodecLimits::for_testing(),
3008 &mut buf,
3009 )
3010 .unwrap();
3011 let applied = apply_delta_snapshot(
3012 &schema,
3013 &baseline,
3014 &buf[..bytes],
3015 &wire::Limits::for_testing(),
3016 &CodecLimits::for_testing(),
3017 )
3018 .unwrap();
3019 assert_eq!(applied.entities, current.entities);
3020 }
3021
3022 #[test]
3023 fn delta_session_header_matches_payload() {
3024 let schema = schema_one_bool();
3025 let baseline = baseline_snapshot();
3026 let current = Snapshot {
3027 tick: SnapshotTick::new(11),
3028 entities: vec![EntitySnapshot {
3029 id: EntityId::new(1),
3030 components: vec![ComponentSnapshot {
3031 id: ComponentId::new(1).unwrap(),
3032 fields: vec![FieldValue::Bool(true)],
3033 }],
3034 }],
3035 };
3036
3037 let mut full_buf = [0u8; 128];
3038 let full_bytes = encode_delta_snapshot_for_client(
3039 &schema,
3040 current.tick,
3041 baseline.tick,
3042 &baseline,
3043 ¤t,
3044 &CodecLimits::for_testing(),
3045 &mut full_buf,
3046 )
3047 .unwrap();
3048 let full_payload = &full_buf[wire::HEADER_SIZE..full_bytes];
3049
3050 let mut session_buf = [0u8; 128];
3051 let mut last_tick = baseline.tick;
3052 let session_bytes = encode_delta_snapshot_for_client_session_with_scratch(
3053 &schema,
3054 current.tick,
3055 baseline.tick,
3056 &baseline,
3057 ¤t,
3058 &CodecLimits::for_testing(),
3059 &mut CodecScratch::default(),
3060 &mut last_tick,
3061 &mut session_buf,
3062 )
3063 .unwrap();
3064
3065 let session_header =
3066 wire::decode_session_header(&session_buf[..session_bytes], baseline.tick.raw())
3067 .unwrap();
3068 let payload_start = session_header.header_len;
3069 let payload_end = payload_start + session_header.payload_len as usize;
3070 let session_payload = &session_buf[payload_start..payload_end];
3071 assert_eq!(full_payload, session_payload);
3072 }
3073
3074 #[test]
3075 fn delta_roundtrip_single_component_change() {
3076 let schema = schema_two_components();
3077 let baseline = Snapshot {
3078 tick: SnapshotTick::new(10),
3079 entities: vec![EntitySnapshot {
3080 id: EntityId::new(1),
3081 components: vec![
3082 ComponentSnapshot {
3083 id: ComponentId::new(1).unwrap(),
3084 fields: vec![FieldValue::Bool(false)],
3085 },
3086 ComponentSnapshot {
3087 id: ComponentId::new(2).unwrap(),
3088 fields: vec![FieldValue::Bool(false)],
3089 },
3090 ],
3091 }],
3092 };
3093 let current = Snapshot {
3094 tick: SnapshotTick::new(11),
3095 entities: vec![EntitySnapshot {
3096 id: EntityId::new(1),
3097 components: vec![
3098 ComponentSnapshot {
3099 id: ComponentId::new(1).unwrap(),
3100 fields: vec![FieldValue::Bool(true)],
3101 },
3102 ComponentSnapshot {
3103 id: ComponentId::new(2).unwrap(),
3104 fields: vec![FieldValue::Bool(false)],
3105 },
3106 ],
3107 }],
3108 };
3109
3110 let mut buf = [0u8; 256];
3111 let bytes = encode_delta_snapshot(
3112 &schema,
3113 current.tick,
3114 baseline.tick,
3115 &baseline,
3116 ¤t,
3117 &CodecLimits::for_testing(),
3118 &mut buf,
3119 )
3120 .unwrap();
3121 let applied = apply_delta_snapshot(
3122 &schema,
3123 &baseline,
3124 &buf[..bytes],
3125 &wire::Limits::for_testing(),
3126 &CodecLimits::for_testing(),
3127 )
3128 .unwrap();
3129 assert_eq!(applied.entities, current.entities);
3130 }
3131
3132 #[test]
3133 fn baseline_tick_mismatch_is_error() {
3134 let schema = schema_one_bool();
3135 let baseline = baseline_snapshot();
3136 let current = baseline.clone();
3137 let mut buf = [0u8; 128];
3138 let bytes = encode_delta_snapshot(
3139 &schema,
3140 SnapshotTick::new(11),
3141 baseline.tick,
3142 &baseline,
3143 ¤t,
3144 &CodecLimits::for_testing(),
3145 &mut buf,
3146 )
3147 .unwrap();
3148 let mut packet = wire::decode_packet(&buf[..bytes], &wire::Limits::for_testing()).unwrap();
3149 packet.header.baseline_tick = 999;
3150 wire::encode_header(&packet.header, &mut buf[..wire::HEADER_SIZE]).unwrap();
3151 let err = apply_delta_snapshot(
3152 &schema,
3153 &baseline,
3154 &buf[..bytes],
3155 &wire::Limits::for_testing(),
3156 &CodecLimits::for_testing(),
3157 )
3158 .unwrap_err();
3159 assert!(matches!(err, CodecError::BaselineTickMismatch { .. }));
3160 }
3161
3162 #[test]
3163 fn threshold_suppresses_small_change() {
3164 let schema = schema_uint_threshold(5);
3165 let baseline = Snapshot {
3166 tick: SnapshotTick::new(10),
3167 entities: vec![EntitySnapshot {
3168 id: EntityId::new(1),
3169 components: vec![ComponentSnapshot {
3170 id: ComponentId::new(1).unwrap(),
3171 fields: vec![FieldValue::UInt(10)],
3172 }],
3173 }],
3174 };
3175 let current = Snapshot {
3176 tick: SnapshotTick::new(11),
3177 entities: vec![EntitySnapshot {
3178 id: EntityId::new(1),
3179 components: vec![ComponentSnapshot {
3180 id: ComponentId::new(1).unwrap(),
3181 fields: vec![FieldValue::UInt(12)],
3182 }],
3183 }],
3184 };
3185
3186 let mut buf = [0u8; 128];
3187 let bytes = encode_delta_snapshot(
3188 &schema,
3189 current.tick,
3190 baseline.tick,
3191 &baseline,
3192 ¤t,
3193 &CodecLimits::for_testing(),
3194 &mut buf,
3195 )
3196 .unwrap();
3197
3198 let packet = wire::decode_packet(&buf[..bytes], &wire::Limits::for_testing()).unwrap();
3199 assert_eq!(packet.sections.len(), 0);
3200
3201 let applied = apply_delta_snapshot(
3202 &schema,
3203 &baseline,
3204 &buf[..bytes],
3205 &wire::Limits::for_testing(),
3206 &CodecLimits::for_testing(),
3207 )
3208 .unwrap();
3209 assert_eq!(applied.entities, baseline.entities);
3210 }
3211}