1use std::cmp::Ordering;
4
5use bitstream::{BitReader, BitWriter};
6use schema::{schema_hash, ChangePolicy, ComponentDef, ComponentId, FieldDef};
7use wire::{decode_packet, encode_header, SectionTag, WirePacket};
8
9use crate::baseline::BaselineStore;
10use crate::error::{CodecError, CodecResult, LimitKind, MaskKind, MaskReason, ValueReason};
11use crate::limits::CodecLimits;
12use crate::scratch::CodecScratch;
13use crate::snapshot::{
14 ensure_known_components, read_field_value, read_field_value_sparse, read_mask, required_bits,
15 write_field_value, write_field_value_sparse, write_section, ComponentSnapshot, EntitySnapshot,
16 FieldValue, Snapshot,
17};
18use crate::types::{EntityId, SnapshotTick};
19
20#[must_use]
22pub fn select_baseline_tick<T>(
23 store: &BaselineStore<T>,
24 ack_tick: SnapshotTick,
25) -> Option<SnapshotTick> {
26 store.latest_at_or_before(ack_tick).map(|(tick, _)| tick)
27}
28
29pub fn encode_delta_snapshot(
33 schema: &schema::Schema,
34 tick: SnapshotTick,
35 baseline_tick: SnapshotTick,
36 baseline: &Snapshot,
37 current: &Snapshot,
38 limits: &CodecLimits,
39 out: &mut [u8],
40) -> CodecResult<usize> {
41 let mut scratch = CodecScratch::default();
42 encode_delta_snapshot_with_scratch(
43 schema,
44 tick,
45 baseline_tick,
46 baseline,
47 current,
48 limits,
49 &mut scratch,
50 out,
51 )
52}
53
54#[allow(clippy::too_many_arguments)]
56pub fn encode_delta_snapshot_with_scratch(
57 schema: &schema::Schema,
58 tick: SnapshotTick,
59 baseline_tick: SnapshotTick,
60 baseline: &Snapshot,
61 current: &Snapshot,
62 limits: &CodecLimits,
63 scratch: &mut CodecScratch,
64 out: &mut [u8],
65) -> CodecResult<usize> {
66 encode_delta_snapshot_with_scratch_mode(
67 schema,
68 tick,
69 baseline_tick,
70 baseline,
71 current,
72 limits,
73 scratch,
74 out,
75 EncodeUpdateMode::Auto,
76 )
77}
78
79pub fn encode_delta_snapshot_for_client(
84 schema: &schema::Schema,
85 tick: SnapshotTick,
86 baseline_tick: SnapshotTick,
87 baseline: &Snapshot,
88 current: &Snapshot,
89 limits: &CodecLimits,
90 out: &mut [u8],
91) -> CodecResult<usize> {
92 let mut scratch = CodecScratch::default();
93 encode_delta_snapshot_for_client_with_scratch(
94 schema,
95 tick,
96 baseline_tick,
97 baseline,
98 current,
99 limits,
100 &mut scratch,
101 out,
102 )
103}
104
105#[allow(clippy::too_many_arguments)]
107pub fn encode_delta_snapshot_for_client_session(
108 schema: &schema::Schema,
109 tick: SnapshotTick,
110 baseline_tick: SnapshotTick,
111 baseline: &Snapshot,
112 current: &Snapshot,
113 limits: &CodecLimits,
114 last_tick: &mut SnapshotTick,
115 out: &mut [u8],
116) -> CodecResult<usize> {
117 let mut scratch = CodecScratch::default();
118 encode_delta_snapshot_for_client_session_with_scratch(
119 schema,
120 tick,
121 baseline_tick,
122 baseline,
123 current,
124 limits,
125 &mut scratch,
126 last_tick,
127 out,
128 )
129}
130
131#[allow(clippy::too_many_arguments)]
133pub fn encode_delta_snapshot_for_client_with_scratch(
134 schema: &schema::Schema,
135 tick: SnapshotTick,
136 baseline_tick: SnapshotTick,
137 baseline: &Snapshot,
138 current: &Snapshot,
139 limits: &CodecLimits,
140 scratch: &mut CodecScratch,
141 out: &mut [u8],
142) -> CodecResult<usize> {
143 encode_delta_snapshot_with_scratch_mode(
144 schema,
145 tick,
146 baseline_tick,
147 baseline,
148 current,
149 limits,
150 scratch,
151 out,
152 EncodeUpdateMode::Sparse,
153 )
154}
155
/// How the encoder decides which update-section encoding to emit.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum EncodeUpdateMode {
    /// Let the encoder choose an encoding by estimating the cost of each.
    Auto,
    /// Always emit the sparse update encoding.
    Sparse,
}
161
162#[allow(clippy::too_many_arguments)]
164pub fn encode_delta_snapshot_for_client_session_with_scratch(
165 schema: &schema::Schema,
166 tick: SnapshotTick,
167 baseline_tick: SnapshotTick,
168 baseline: &Snapshot,
169 current: &Snapshot,
170 limits: &CodecLimits,
171 scratch: &mut CodecScratch,
172 last_tick: &mut SnapshotTick,
173 out: &mut [u8],
174) -> CodecResult<usize> {
175 let max_header = wire::SESSION_MAX_HEADER_SIZE;
176 if out.len() < max_header {
177 return Err(CodecError::OutputTooSmall {
178 needed: max_header,
179 available: out.len(),
180 });
181 }
182 if tick.raw() <= last_tick.raw() {
183 return Err(CodecError::InvalidEntityOrder {
184 previous: last_tick.raw(),
185 current: tick.raw(),
186 });
187 }
188 if baseline_tick.raw() > tick.raw() {
189 return Err(CodecError::BaselineTickMismatch {
190 expected: baseline_tick.raw(),
191 found: tick.raw(),
192 });
193 }
194 let payload_len = encode_delta_payload_with_mode(
195 schema,
196 baseline_tick,
197 baseline,
198 current,
199 limits,
200 scratch,
201 &mut out[max_header..],
202 EncodeUpdateMode::Sparse,
203 )?;
204 let tick_delta = tick.raw() - last_tick.raw();
205 let baseline_delta = tick.raw() - baseline_tick.raw();
206 let header_len = wire::encode_session_header(
207 &mut out[..max_header],
208 wire::SessionFlags::delta_snapshot(),
209 tick_delta,
210 baseline_delta,
211 payload_len as u32,
212 )
213 .map_err(|_| CodecError::OutputTooSmall {
214 needed: max_header,
215 available: out.len(),
216 })?;
217 if header_len < max_header {
218 let payload_start = max_header;
219 let payload_end = max_header + payload_len;
220 out.copy_within(payload_start..payload_end, header_len);
221 }
222 *last_tick = tick;
223 Ok(header_len + payload_len)
224}
225
226#[allow(clippy::too_many_arguments)]
227fn encode_delta_snapshot_with_scratch_mode(
228 schema: &schema::Schema,
229 tick: SnapshotTick,
230 baseline_tick: SnapshotTick,
231 baseline: &Snapshot,
232 current: &Snapshot,
233 limits: &CodecLimits,
234 scratch: &mut CodecScratch,
235 out: &mut [u8],
236 mode: EncodeUpdateMode,
237) -> CodecResult<usize> {
238 if out.len() < wire::HEADER_SIZE {
239 return Err(CodecError::OutputTooSmall {
240 needed: wire::HEADER_SIZE,
241 available: out.len(),
242 });
243 }
244 let payload_len = encode_delta_payload_with_mode(
245 schema,
246 baseline_tick,
247 baseline,
248 current,
249 limits,
250 scratch,
251 &mut out[wire::HEADER_SIZE..],
252 mode,
253 )?;
254 let header = wire::PacketHeader::delta_snapshot(
255 schema_hash(schema),
256 tick.raw(),
257 baseline_tick.raw(),
258 payload_len as u32,
259 );
260 encode_header(&header, &mut out[..wire::HEADER_SIZE]).map_err(|_| {
261 CodecError::OutputTooSmall {
262 needed: wire::HEADER_SIZE,
263 available: out.len(),
264 }
265 })?;
266
267 Ok(wire::HEADER_SIZE + payload_len)
268}
269
270#[allow(clippy::too_many_arguments)]
271fn encode_delta_payload_with_mode(
272 schema: &schema::Schema,
273 baseline_tick: SnapshotTick,
274 baseline: &Snapshot,
275 current: &Snapshot,
276 limits: &CodecLimits,
277 scratch: &mut CodecScratch,
278 out: &mut [u8],
279 mode: EncodeUpdateMode,
280) -> CodecResult<usize> {
281 if baseline.tick != baseline_tick {
282 return Err(CodecError::BaselineTickMismatch {
283 expected: baseline.tick.raw(),
284 found: baseline_tick.raw(),
285 });
286 }
287
288 ensure_entities_sorted(&baseline.entities)?;
289 ensure_entities_sorted(¤t.entities)?;
290
291 let mut counts = DiffCounts::default();
292 diff_counts(schema, baseline, current, limits, &mut counts)?;
293
294 if counts.creates > limits.max_entities_create {
295 return Err(CodecError::LimitsExceeded {
296 kind: LimitKind::EntitiesCreate,
297 limit: limits.max_entities_create,
298 actual: counts.creates,
299 });
300 }
301 if counts.updates > limits.max_entities_update {
302 return Err(CodecError::LimitsExceeded {
303 kind: LimitKind::EntitiesUpdate,
304 limit: limits.max_entities_update,
305 actual: counts.updates,
306 });
307 }
308 if counts.destroys > limits.max_entities_destroy {
309 return Err(CodecError::LimitsExceeded {
310 kind: LimitKind::EntitiesDestroy,
311 limit: limits.max_entities_destroy,
312 actual: counts.destroys,
313 });
314 }
315
316 let mut offset = 0;
317 if counts.destroys > 0 {
318 let written = write_section(
319 SectionTag::EntityDestroy,
320 &mut out[offset..],
321 limits,
322 |writer| encode_destroy_body(baseline, current, counts.destroys, limits, writer),
323 )?;
324 offset += written;
325 }
326 if counts.creates > 0 {
327 let written = write_section(
328 SectionTag::EntityCreate,
329 &mut out[offset..],
330 limits,
331 |writer| encode_create_body(schema, baseline, current, counts.creates, limits, writer),
332 )?;
333 offset += written;
334 }
335 if counts.updates > 0 {
336 let update_encoding = match mode {
337 EncodeUpdateMode::Auto => {
338 select_update_encoding(schema, baseline, current, limits, scratch)?
339 }
340 EncodeUpdateMode::Sparse => UpdateEncoding::Sparse,
341 };
342 let section_tag = match update_encoding {
343 UpdateEncoding::Masked => SectionTag::EntityUpdate,
344 UpdateEncoding::Sparse => SectionTag::EntityUpdateSparsePacked,
345 };
346 let written =
347 write_section(
348 section_tag,
349 &mut out[offset..],
350 limits,
351 |writer| match update_encoding {
352 UpdateEncoding::Masked => encode_update_body_masked(
353 schema,
354 baseline,
355 current,
356 counts.updates,
357 limits,
358 scratch,
359 writer,
360 ),
361 UpdateEncoding::Sparse => encode_update_body_sparse_packed(
362 schema,
363 baseline,
364 current,
365 counts.updates,
366 limits,
367 scratch,
368 writer,
369 ),
370 },
371 )?;
372 offset += written;
373 }
374
375 Ok(offset)
376}
377
378pub fn apply_delta_snapshot(
380 schema: &schema::Schema,
381 baseline: &Snapshot,
382 bytes: &[u8],
383 wire_limits: &wire::Limits,
384 limits: &CodecLimits,
385) -> CodecResult<Snapshot> {
386 let packet = decode_packet(bytes, wire_limits)?;
387 apply_delta_snapshot_from_packet(schema, baseline, &packet, limits)
388}
389
390pub fn apply_delta_snapshot_from_packet(
392 schema: &schema::Schema,
393 baseline: &Snapshot,
394 packet: &WirePacket<'_>,
395 limits: &CodecLimits,
396) -> CodecResult<Snapshot> {
397 let header = packet.header;
398 if !header.flags.is_delta_snapshot() {
399 return Err(CodecError::Wire(wire::DecodeError::InvalidFlags {
400 flags: header.flags.raw(),
401 }));
402 }
403 if header.baseline_tick == 0 {
404 return Err(CodecError::Wire(wire::DecodeError::InvalidBaselineTick {
405 baseline_tick: header.baseline_tick,
406 flags: header.flags.raw(),
407 }));
408 }
409 if header.baseline_tick != baseline.tick.raw() {
410 return Err(CodecError::BaselineTickMismatch {
411 expected: baseline.tick.raw(),
412 found: header.baseline_tick,
413 });
414 }
415
416 let expected_hash = schema_hash(schema);
417 if header.schema_hash != expected_hash {
418 return Err(CodecError::SchemaMismatch {
419 expected: expected_hash,
420 found: header.schema_hash,
421 });
422 }
423
424 let (destroys, creates, updates) = decode_delta_sections(schema, packet, limits)?;
425
426 ensure_entities_sorted(&baseline.entities)?;
427 ensure_entities_sorted(&creates)?;
428
429 let mut remaining = apply_destroys(&baseline.entities, &destroys)?;
430 remaining = apply_creates(remaining, creates)?;
431 if remaining.len() > limits.max_total_entities_after_apply {
432 return Err(CodecError::LimitsExceeded {
433 kind: LimitKind::TotalEntitiesAfterApply,
434 limit: limits.max_total_entities_after_apply,
435 actual: remaining.len(),
436 });
437 }
438 apply_updates(&mut remaining, &updates)?;
439
440 Ok(Snapshot {
441 tick: SnapshotTick::new(header.tick),
442 entities: remaining,
443 })
444}
445
446pub fn decode_delta_packet(
448 schema: &schema::Schema,
449 packet: &WirePacket<'_>,
450 limits: &CodecLimits,
451) -> CodecResult<DeltaDecoded> {
452 let header = packet.header;
453 if !header.flags.is_delta_snapshot() {
454 return Err(CodecError::Wire(wire::DecodeError::InvalidFlags {
455 flags: header.flags.raw(),
456 }));
457 }
458 if header.baseline_tick == 0 {
459 return Err(CodecError::Wire(wire::DecodeError::InvalidBaselineTick {
460 baseline_tick: header.baseline_tick,
461 flags: header.flags.raw(),
462 }));
463 }
464 let expected_hash = schema_hash(schema);
465 if header.schema_hash != expected_hash {
466 return Err(CodecError::SchemaMismatch {
467 expected: expected_hash,
468 found: header.schema_hash,
469 });
470 }
471
472 let (destroys, creates, updates) = decode_delta_sections(schema, packet, limits)?;
473
474 Ok(DeltaDecoded {
475 tick: SnapshotTick::new(header.tick),
476 baseline_tick: SnapshotTick::new(header.baseline_tick),
477 destroys,
478 creates,
479 updates,
480 })
481}
482
/// Tallies of how a current snapshot differs from a baseline snapshot.
///
/// All counters start at zero via `Default`.
#[derive(Default)]
struct DiffCounts {
    /// Entities present in the current snapshot but not in the baseline.
    creates: usize,
    /// Entities present in both snapshots that have at least one update.
    updates: usize,
    /// Entities present in the baseline but not in the current snapshot.
    destroys: usize,
}
489
/// Concrete wire layout chosen for the update section.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
enum UpdateEncoding {
    /// Per-entity update written with component and field masks.
    Masked,
    /// One entry per changed component, listing only the changed fields.
    Sparse,
}
495
496fn diff_counts(
497 schema: &schema::Schema,
498 baseline: &Snapshot,
499 current: &Snapshot,
500 limits: &CodecLimits,
501 counts: &mut DiffCounts,
502) -> CodecResult<()> {
503 let mut i = 0usize;
504 let mut j = 0usize;
505 while i < baseline.entities.len() || j < current.entities.len() {
506 let base = baseline.entities.get(i);
507 let curr = current.entities.get(j);
508 match (base, curr) {
509 (Some(b), Some(c)) => {
510 if b.id.raw() < c.id.raw() {
511 counts.destroys += 1;
512 i += 1;
513 } else if b.id.raw() > c.id.raw() {
514 counts.creates += 1;
515 j += 1;
516 } else {
517 if entity_has_updates(schema, b, c, limits)? {
518 counts.updates += 1;
519 }
520 i += 1;
521 j += 1;
522 }
523 }
524 (Some(_), None) => {
525 counts.destroys += 1;
526 i += 1;
527 }
528 (None, Some(_)) => {
529 counts.creates += 1;
530 j += 1;
531 }
532 (None, None) => break,
533 }
534 }
535 Ok(())
536}
537
538fn select_update_encoding(
539 schema: &schema::Schema,
540 baseline: &Snapshot,
541 current: &Snapshot,
542 limits: &CodecLimits,
543 scratch: &mut CodecScratch,
544) -> CodecResult<UpdateEncoding> {
545 let component_count = schema.components.len();
546 let mut mask_bits = 0usize;
547 let mut sparse_bits = 0usize;
548 let mut baseline_iter = baseline.entities.iter();
549 let mut current_iter = current.entities.iter();
550 let mut baseline_next = baseline_iter.next();
551 let mut current_next = current_iter.next();
552
553 while let (Some(base), Some(curr)) = (baseline_next, current_next) {
554 match base.id.cmp(&curr.id) {
555 Ordering::Less => {
556 baseline_next = baseline_iter.next();
557 }
558 Ordering::Greater => {
559 current_next = current_iter.next();
560 }
561 Ordering::Equal => {
562 for component in &schema.components {
563 let base_component = find_component(base, component.id);
564 let curr_component = find_component(curr, component.id);
565 if base_component.is_some() != curr_component.is_some() {
566 return Err(CodecError::InvalidMask {
567 kind: MaskKind::ComponentMask,
568 reason: MaskReason::ComponentPresenceMismatch {
569 component: component.id,
570 },
571 });
572 }
573 if let (Some(base_component), Some(curr_component)) =
574 (base_component, curr_component)
575 {
576 if base_component.fields.len() != component.fields.len()
577 || curr_component.fields.len() != component.fields.len()
578 {
579 return Err(CodecError::InvalidMask {
580 kind: MaskKind::FieldMask {
581 component: component.id,
582 },
583 reason: MaskReason::FieldCountMismatch {
584 expected: component.fields.len(),
585 actual: base_component
586 .fields
587 .len()
588 .max(curr_component.fields.len()),
589 },
590 });
591 }
592 if component.fields.len() > limits.max_fields_per_component {
593 return Err(CodecError::LimitsExceeded {
594 kind: LimitKind::FieldsPerComponent,
595 limit: limits.max_fields_per_component,
596 actual: component.fields.len(),
597 });
598 }
599 let (_, field_mask) = scratch
600 .component_and_field_masks_mut(component_count, component.fields.len());
601 let field_mask = compute_field_mask_into(
603 component,
604 base_component,
605 curr_component,
606 field_mask,
607 )?;
608 let changed = field_mask.iter().filter(|bit| **bit).count();
609 if changed > 0 {
610 let field_count = component.fields.len();
611 let index_bits =
612 required_bits(field_count.saturating_sub(1) as u64) as usize;
613 mask_bits += field_count;
616 sparse_bits += index_bits * changed;
617 sparse_bits += varu32_len(curr.id.raw()) * 8;
618 sparse_bits += varu32_len(component.id.get() as u32) * 8;
619 sparse_bits += varu32_len(changed as u32) * 8;
620 }
621 }
622 }
623
624 baseline_next = baseline_iter.next();
625 current_next = current_iter.next();
626 }
627 }
628 }
629
630 if mask_bits == 0 {
631 return Ok(UpdateEncoding::Masked);
632 }
633
634 if sparse_bits <= mask_bits {
635 Ok(UpdateEncoding::Sparse)
636 } else {
637 Ok(UpdateEncoding::Masked)
638 }
639}
640
/// Returns how many bytes a `u32` occupies when stored 7 bits per byte.
///
/// The result is 1 for values below 2^7, 2 below 2^14, 3 below 2^21,
/// 4 below 2^28 and 5 otherwise.
///
/// NOTE(review): presumably mirrors the length produced by
/// `BitWriter::write_varu32` — confirm against that implementation.
fn varu32_len(value: u32) -> usize {
    match value {
        0..=0x7F => 1,
        0x80..=0x3FFF => 2,
        0x4000..=0x001F_FFFF => 3,
        0x0020_0000..=0x0FFF_FFFF => 4,
        _ => 5,
    }
}
654
655fn encode_destroy_body(
656 baseline: &Snapshot,
657 current: &Snapshot,
658 destroy_count: usize,
659 limits: &CodecLimits,
660 writer: &mut BitWriter<'_>,
661) -> CodecResult<()> {
662 if destroy_count > limits.max_entities_destroy {
663 return Err(CodecError::LimitsExceeded {
664 kind: LimitKind::EntitiesDestroy,
665 limit: limits.max_entities_destroy,
666 actual: destroy_count,
667 });
668 }
669
670 writer.align_to_byte()?;
671 writer.write_varu32(destroy_count as u32)?;
672
673 let mut i = 0usize;
674 let mut j = 0usize;
675 while i < baseline.entities.len() || j < current.entities.len() {
676 let base = baseline.entities.get(i);
677 let curr = current.entities.get(j);
678 match (base, curr) {
679 (Some(b), Some(c)) => {
680 if b.id.raw() < c.id.raw() {
681 writer.align_to_byte()?;
682 writer.write_u32_aligned(b.id.raw())?;
683 i += 1;
684 } else if b.id.raw() > c.id.raw() {
685 j += 1;
686 } else {
687 i += 1;
688 j += 1;
689 }
690 }
691 (Some(b), None) => {
692 writer.align_to_byte()?;
693 writer.write_u32_aligned(b.id.raw())?;
694 i += 1;
695 }
696 (None, Some(_)) => {
697 j += 1;
698 }
699 (None, None) => break,
700 }
701 }
702
703 writer.align_to_byte()?;
704 Ok(())
705}
706
707fn encode_create_body(
708 schema: &schema::Schema,
709 baseline: &Snapshot,
710 current: &Snapshot,
711 create_count: usize,
712 limits: &CodecLimits,
713 writer: &mut BitWriter<'_>,
714) -> CodecResult<()> {
715 if create_count > limits.max_entities_create {
716 return Err(CodecError::LimitsExceeded {
717 kind: LimitKind::EntitiesCreate,
718 limit: limits.max_entities_create,
719 actual: create_count,
720 });
721 }
722
723 writer.align_to_byte()?;
724 writer.write_varu32(create_count as u32)?;
725
726 let mut i = 0usize;
727 let mut j = 0usize;
728 while i < baseline.entities.len() || j < current.entities.len() {
729 let base = baseline.entities.get(i);
730 let curr = current.entities.get(j);
731 match (base, curr) {
732 (Some(b), Some(c)) => {
733 if b.id.raw() < c.id.raw() {
734 i += 1;
735 } else if b.id.raw() > c.id.raw() {
736 write_create_entity(schema, c, limits, writer)?;
737 j += 1;
738 } else {
739 i += 1;
740 j += 1;
741 }
742 }
743 (Some(_), None) => {
744 i += 1;
745 }
746 (None, Some(c)) => {
747 write_create_entity(schema, c, limits, writer)?;
748 j += 1;
749 }
750 (None, None) => break,
751 }
752 }
753
754 writer.align_to_byte()?;
755 Ok(())
756}
757
758fn encode_update_body_masked(
759 schema: &schema::Schema,
760 baseline: &Snapshot,
761 current: &Snapshot,
762 update_count: usize,
763 limits: &CodecLimits,
764 scratch: &mut CodecScratch,
765 writer: &mut BitWriter<'_>,
766) -> CodecResult<()> {
767 if update_count > limits.max_entities_update {
768 return Err(CodecError::LimitsExceeded {
769 kind: LimitKind::EntitiesUpdate,
770 limit: limits.max_entities_update,
771 actual: update_count,
772 });
773 }
774
775 writer.align_to_byte()?;
776 writer.write_varu32(update_count as u32)?;
777
778 let mut i = 0usize;
779 let mut j = 0usize;
780 while i < baseline.entities.len() || j < current.entities.len() {
781 let base = baseline.entities.get(i);
782 let curr = current.entities.get(j);
783 match (base, curr) {
784 (Some(b), Some(c)) => {
785 if b.id.raw() < c.id.raw() {
786 i += 1;
787 } else if b.id.raw() > c.id.raw() {
788 j += 1;
789 } else {
790 if entity_has_updates(schema, b, c, limits)? {
791 writer.align_to_byte()?;
792 writer.write_u32_aligned(c.id.raw())?;
793 ensure_component_presence_matches(schema, b, c)?;
794 write_update_components(schema, b, c, limits, scratch, writer)?;
795 }
796 i += 1;
797 j += 1;
798 }
799 }
800 (Some(_), None) => i += 1,
801 (None, Some(_)) => j += 1,
802 (None, None) => break,
803 }
804 }
805
806 writer.align_to_byte()?;
807 Ok(())
808}
809
810#[allow(dead_code)]
811fn encode_update_body_sparse_varint(
812 schema: &schema::Schema,
813 baseline: &Snapshot,
814 current: &Snapshot,
815 update_count: usize,
816 limits: &CodecLimits,
817 scratch: &mut CodecScratch,
818 writer: &mut BitWriter<'_>,
819) -> CodecResult<()> {
820 if update_count > limits.max_entities_update {
821 return Err(CodecError::LimitsExceeded {
822 kind: LimitKind::EntitiesUpdate,
823 limit: limits.max_entities_update,
824 actual: update_count,
825 });
826 }
827
828 let entry_count = count_sparse_update_entries(schema, baseline, current, limits, scratch)?;
829 let entry_limit = limits
830 .max_entities_update
831 .saturating_mul(limits.max_components_per_entity);
832 if entry_count > entry_limit {
833 return Err(CodecError::LimitsExceeded {
834 kind: LimitKind::EntitiesUpdate,
835 limit: entry_limit,
836 actual: entry_count,
837 });
838 }
839
840 writer.align_to_byte()?;
841 writer.write_varu32(entry_count as u32)?;
842
843 let mut baseline_iter = baseline.entities.iter();
844 let mut current_iter = current.entities.iter();
845 let mut baseline_next = baseline_iter.next();
846 let mut current_next = current_iter.next();
847 let component_count = schema.components.len();
848
849 while let (Some(base), Some(curr)) = (baseline_next, current_next) {
850 match base.id.cmp(&curr.id) {
851 Ordering::Less => {
852 baseline_next = baseline_iter.next();
853 }
854 Ordering::Greater => {
855 current_next = current_iter.next();
856 }
857 Ordering::Equal => {
858 if entity_has_updates(schema, base, curr, limits)? {
859 for component in &schema.components {
860 let base_component = find_component(base, component.id);
861 let curr_component = find_component(curr, component.id);
862 if base_component.is_some() != curr_component.is_some() {
863 return Err(CodecError::InvalidMask {
864 kind: MaskKind::ComponentMask,
865 reason: MaskReason::ComponentPresenceMismatch {
866 component: component.id,
867 },
868 });
869 }
870 if let (Some(base_component), Some(curr_component)) =
871 (base_component, curr_component)
872 {
873 if base_component.fields.len() != component.fields.len()
874 || curr_component.fields.len() != component.fields.len()
875 {
876 return Err(CodecError::InvalidMask {
877 kind: MaskKind::FieldMask {
878 component: component.id,
879 },
880 reason: MaskReason::FieldCountMismatch {
881 expected: component.fields.len(),
882 actual: base_component
883 .fields
884 .len()
885 .max(curr_component.fields.len()),
886 },
887 });
888 }
889 if component.fields.len() > limits.max_fields_per_component {
890 return Err(CodecError::LimitsExceeded {
891 kind: LimitKind::FieldsPerComponent,
892 limit: limits.max_fields_per_component,
893 actual: component.fields.len(),
894 });
895 }
896 let (_, field_mask) = scratch.component_and_field_masks_mut(
897 component_count,
898 component.fields.len(),
899 );
900 let field_mask = compute_field_mask_into(
901 component,
902 base_component,
903 curr_component,
904 field_mask,
905 )?;
906 let changed_fields = field_mask.iter().filter(|bit| **bit).count();
907 if changed_fields > 0 {
908 writer.align_to_byte()?;
909 writer.write_u32_aligned(curr.id.raw())?;
910 writer.write_u16_aligned(component.id.get())?;
911 writer.write_varu32(changed_fields as u32)?;
912 for (idx, field) in component.fields.iter().enumerate() {
913 if field_mask[idx] {
914 writer.align_to_byte()?;
915 writer.write_varu32(idx as u32)?;
916 write_field_value_sparse(
917 component.id,
918 *field,
919 curr_component.fields[idx],
920 writer,
921 )?;
922 }
923 }
924 }
925 }
926 }
927 }
928 baseline_next = baseline_iter.next();
929 current_next = current_iter.next();
930 }
931 }
932 }
933
934 writer.align_to_byte()?;
935 Ok(())
936}
937
938fn encode_update_body_sparse_packed(
939 schema: &schema::Schema,
940 baseline: &Snapshot,
941 current: &Snapshot,
942 update_count: usize,
943 limits: &CodecLimits,
944 scratch: &mut CodecScratch,
945 writer: &mut BitWriter<'_>,
946) -> CodecResult<()> {
947 if update_count > limits.max_entities_update {
948 return Err(CodecError::LimitsExceeded {
949 kind: LimitKind::EntitiesUpdate,
950 limit: limits.max_entities_update,
951 actual: update_count,
952 });
953 }
954
955 let entry_count = count_sparse_update_entries(schema, baseline, current, limits, scratch)?;
956 let entry_limit = limits
957 .max_entities_update
958 .saturating_mul(limits.max_components_per_entity);
959 if entry_count > entry_limit {
960 return Err(CodecError::LimitsExceeded {
961 kind: LimitKind::EntitiesUpdate,
962 limit: entry_limit,
963 actual: entry_count,
964 });
965 }
966
967 writer.align_to_byte()?;
968 writer.write_varu32(entry_count as u32)?;
969
970 let mut baseline_iter = baseline.entities.iter();
971 let mut current_iter = current.entities.iter();
972 let mut baseline_next = baseline_iter.next();
973 let mut current_next = current_iter.next();
974 let component_count = schema.components.len();
975
976 while let (Some(base), Some(curr)) = (baseline_next, current_next) {
977 match base.id.cmp(&curr.id) {
978 Ordering::Less => {
979 baseline_next = baseline_iter.next();
980 }
981 Ordering::Greater => {
982 current_next = current_iter.next();
983 }
984 Ordering::Equal => {
985 if entity_has_updates(schema, base, curr, limits)? {
986 for component in &schema.components {
987 let base_component = find_component(base, component.id);
988 let curr_component = find_component(curr, component.id);
989 if base_component.is_some() != curr_component.is_some() {
990 return Err(CodecError::InvalidMask {
991 kind: MaskKind::ComponentMask,
992 reason: MaskReason::ComponentPresenceMismatch {
993 component: component.id,
994 },
995 });
996 }
997 if let (Some(base_component), Some(curr_component)) =
998 (base_component, curr_component)
999 {
1000 if base_component.fields.len() != component.fields.len()
1001 || curr_component.fields.len() != component.fields.len()
1002 {
1003 return Err(CodecError::InvalidMask {
1004 kind: MaskKind::FieldMask {
1005 component: component.id,
1006 },
1007 reason: MaskReason::FieldCountMismatch {
1008 expected: component.fields.len(),
1009 actual: base_component
1010 .fields
1011 .len()
1012 .max(curr_component.fields.len()),
1013 },
1014 });
1015 }
1016 if component.fields.len() > limits.max_fields_per_component {
1017 return Err(CodecError::LimitsExceeded {
1018 kind: LimitKind::FieldsPerComponent,
1019 limit: limits.max_fields_per_component,
1020 actual: component.fields.len(),
1021 });
1022 }
1023 let (_, field_mask) = scratch.component_and_field_masks_mut(
1024 component_count,
1025 component.fields.len(),
1026 );
1027 let field_mask = compute_field_mask_into(
1028 component,
1029 base_component,
1030 curr_component,
1031 field_mask,
1032 )?;
1033 let changed_fields = field_mask.iter().filter(|bit| **bit).count();
1034 if changed_fields > 0 {
1035 writer.align_to_byte()?;
1036 writer.write_varu32(curr.id.raw())?;
1037 writer.write_varu32(component.id.get() as u32)?;
1038 writer.write_varu32(changed_fields as u32)?;
1039 let index_bits =
1040 required_bits(component.fields.len().saturating_sub(1) as u64);
1041 for (idx, field) in component.fields.iter().enumerate() {
1042 if field_mask[idx] {
1043 if index_bits > 0 {
1044 writer.write_bits(idx as u64, index_bits)?;
1045 }
1046 write_field_value_sparse(
1047 component.id,
1048 *field,
1049 curr_component.fields[idx],
1050 writer,
1051 )?;
1052 }
1053 }
1054 }
1055 }
1056 }
1057 }
1058 baseline_next = baseline_iter.next();
1059 current_next = current_iter.next();
1060 }
1061 }
1062 }
1063
1064 writer.align_to_byte()?;
1065 Ok(())
1066}
1067
1068fn count_sparse_update_entries(
1069 schema: &schema::Schema,
1070 baseline: &Snapshot,
1071 current: &Snapshot,
1072 limits: &CodecLimits,
1073 scratch: &mut CodecScratch,
1074) -> CodecResult<usize> {
1075 let component_count = schema.components.len();
1076 let mut count = 0usize;
1077 let mut baseline_iter = baseline.entities.iter();
1078 let mut current_iter = current.entities.iter();
1079 let mut baseline_next = baseline_iter.next();
1080 let mut current_next = current_iter.next();
1081
1082 while let (Some(base), Some(curr)) = (baseline_next, current_next) {
1083 match base.id.cmp(&curr.id) {
1084 Ordering::Less => baseline_next = baseline_iter.next(),
1085 Ordering::Greater => current_next = current_iter.next(),
1086 Ordering::Equal => {
1087 if entity_has_updates(schema, base, curr, limits)? {
1088 for component in &schema.components {
1089 let base_component = find_component(base, component.id);
1090 let curr_component = find_component(curr, component.id);
1091 if base_component.is_some() != curr_component.is_some() {
1092 return Err(CodecError::InvalidMask {
1093 kind: MaskKind::ComponentMask,
1094 reason: MaskReason::ComponentPresenceMismatch {
1095 component: component.id,
1096 },
1097 });
1098 }
1099 if let (Some(base_component), Some(curr_component)) =
1100 (base_component, curr_component)
1101 {
1102 if base_component.fields.len() != component.fields.len()
1103 || curr_component.fields.len() != component.fields.len()
1104 {
1105 return Err(CodecError::InvalidMask {
1106 kind: MaskKind::FieldMask {
1107 component: component.id,
1108 },
1109 reason: MaskReason::FieldCountMismatch {
1110 expected: component.fields.len(),
1111 actual: base_component
1112 .fields
1113 .len()
1114 .max(curr_component.fields.len()),
1115 },
1116 });
1117 }
1118 if component.fields.len() > limits.max_fields_per_component {
1119 return Err(CodecError::LimitsExceeded {
1120 kind: LimitKind::FieldsPerComponent,
1121 limit: limits.max_fields_per_component,
1122 actual: component.fields.len(),
1123 });
1124 }
1125 let (_, field_mask) = scratch.component_and_field_masks_mut(
1126 component_count,
1127 component.fields.len(),
1128 );
1129 let field_mask = compute_field_mask_into(
1130 component,
1131 base_component,
1132 curr_component,
1133 field_mask,
1134 )?;
1135 if field_mask.iter().any(|bit| *bit) {
1136 count += 1;
1137 }
1138 }
1139 }
1140 }
1141 baseline_next = baseline_iter.next();
1142 current_next = current_iter.next();
1143 }
1144 }
1145 }
1146
1147 Ok(count)
1148}
1149
1150fn write_create_entity(
1151 schema: &schema::Schema,
1152 entity: &EntitySnapshot,
1153 limits: &CodecLimits,
1154 writer: &mut BitWriter<'_>,
1155) -> CodecResult<()> {
1156 writer.align_to_byte()?;
1157 writer.write_u32_aligned(entity.id.raw())?;
1158 ensure_known_components(schema, entity)?;
1159 write_component_mask(schema, entity, writer)?;
1160 for component in schema.components.iter() {
1161 if let Some(snapshot) = find_component(entity, component.id) {
1162 write_full_component(component, snapshot, limits, writer)?;
1163 }
1164 }
1165 Ok(())
1166}
1167
1168fn decode_destroy_section(body: &[u8], limits: &CodecLimits) -> CodecResult<Vec<EntityId>> {
1169 if body.len() > limits.max_section_bytes {
1170 return Err(CodecError::LimitsExceeded {
1171 kind: LimitKind::SectionBytes,
1172 limit: limits.max_section_bytes,
1173 actual: body.len(),
1174 });
1175 }
1176
1177 let mut reader = BitReader::new(body);
1178 reader.align_to_byte()?;
1179 let count = reader.read_varu32()? as usize;
1180 if count > limits.max_entities_destroy {
1181 return Err(CodecError::LimitsExceeded {
1182 kind: LimitKind::EntitiesDestroy,
1183 limit: limits.max_entities_destroy,
1184 actual: count,
1185 });
1186 }
1187
1188 let mut ids = Vec::with_capacity(count);
1189 let mut prev: Option<u32> = None;
1190 for _ in 0..count {
1191 reader.align_to_byte()?;
1192 let id = reader.read_u32_aligned()?;
1193 if let Some(prev_id) = prev {
1194 if id <= prev_id {
1195 return Err(CodecError::InvalidEntityOrder {
1196 previous: prev_id,
1197 current: id,
1198 });
1199 }
1200 }
1201 prev = Some(id);
1202 ids.push(EntityId::new(id));
1203 }
1204 reader.align_to_byte()?;
1205 if reader.bits_remaining() != 0 {
1206 return Err(CodecError::TrailingSectionData {
1207 section: SectionTag::EntityDestroy,
1208 remaining_bits: reader.bits_remaining(),
1209 });
1210 }
1211 Ok(ids)
1212}
1213
1214fn decode_create_section(
1215 schema: &schema::Schema,
1216 body: &[u8],
1217 limits: &CodecLimits,
1218) -> CodecResult<Vec<EntitySnapshot>> {
1219 if body.len() > limits.max_section_bytes {
1220 return Err(CodecError::LimitsExceeded {
1221 kind: LimitKind::SectionBytes,
1222 limit: limits.max_section_bytes,
1223 actual: body.len(),
1224 });
1225 }
1226
1227 let mut reader = BitReader::new(body);
1228 reader.align_to_byte()?;
1229 let count = reader.read_varu32()? as usize;
1230 if count > limits.max_entities_create {
1231 return Err(CodecError::LimitsExceeded {
1232 kind: LimitKind::EntitiesCreate,
1233 limit: limits.max_entities_create,
1234 actual: count,
1235 });
1236 }
1237
1238 let mut entities = Vec::with_capacity(count);
1239 let mut prev: Option<u32> = None;
1240 for _ in 0..count {
1241 reader.align_to_byte()?;
1242 let id = reader.read_u32_aligned()?;
1243 if let Some(prev_id) = prev {
1244 if id <= prev_id {
1245 return Err(CodecError::InvalidEntityOrder {
1246 previous: prev_id,
1247 current: id,
1248 });
1249 }
1250 }
1251 prev = Some(id);
1252
1253 let component_mask = read_mask(
1254 &mut reader,
1255 schema.components.len(),
1256 MaskKind::ComponentMask,
1257 )?;
1258
1259 let mut components = Vec::new();
1260 for (idx, component) in schema.components.iter().enumerate() {
1261 if component_mask[idx] {
1262 let fields = decode_full_component(component, &mut reader, limits)?;
1263 components.push(ComponentSnapshot {
1264 id: component.id,
1265 fields,
1266 });
1267 }
1268 }
1269
1270 let entity = EntitySnapshot {
1271 id: EntityId::new(id),
1272 components,
1273 };
1274 ensure_known_components(schema, &entity)?;
1275 entities.push(entity);
1276 }
1277
1278 reader.align_to_byte()?;
1279 if reader.bits_remaining() != 0 {
1280 return Err(CodecError::TrailingSectionData {
1281 section: SectionTag::EntityCreate,
1282 remaining_bits: reader.bits_remaining(),
1283 });
1284 }
1285 Ok(entities)
1286}
1287
1288fn decode_update_section_masked(
1289 schema: &schema::Schema,
1290 body: &[u8],
1291 limits: &CodecLimits,
1292) -> CodecResult<Vec<DeltaUpdateEntity>> {
1293 if body.len() > limits.max_section_bytes {
1294 return Err(CodecError::LimitsExceeded {
1295 kind: LimitKind::SectionBytes,
1296 limit: limits.max_section_bytes,
1297 actual: body.len(),
1298 });
1299 }
1300
1301 let mut reader = BitReader::new(body);
1302 reader.align_to_byte()?;
1303 let count = reader.read_varu32()? as usize;
1304 if count > limits.max_entities_update {
1305 return Err(CodecError::LimitsExceeded {
1306 kind: LimitKind::EntitiesUpdate,
1307 limit: limits.max_entities_update,
1308 actual: count,
1309 });
1310 }
1311
1312 let mut updates = Vec::with_capacity(count);
1313 let mut prev: Option<u32> = None;
1314 for _ in 0..count {
1315 reader.align_to_byte()?;
1316 let id = reader.read_u32_aligned()?;
1317 if let Some(prev_id) = prev {
1318 if id <= prev_id {
1319 return Err(CodecError::InvalidEntityOrder {
1320 previous: prev_id,
1321 current: id,
1322 });
1323 }
1324 }
1325 prev = Some(id);
1326
1327 let component_mask = read_mask(
1328 &mut reader,
1329 schema.components.len(),
1330 MaskKind::ComponentMask,
1331 )?;
1332 let mut components = Vec::new();
1333 for (idx, component) in schema.components.iter().enumerate() {
1334 if component_mask[idx] {
1335 let fields = decode_update_component(component, &mut reader, limits)?;
1336 components.push(DeltaUpdateComponent {
1337 id: component.id,
1338 fields,
1339 });
1340 }
1341 }
1342
1343 updates.push(DeltaUpdateEntity {
1344 id: EntityId::new(id),
1345 components,
1346 });
1347 }
1348
1349 reader.align_to_byte()?;
1350 if reader.bits_remaining() != 0 {
1351 return Err(CodecError::TrailingSectionData {
1352 section: SectionTag::EntityUpdate,
1353 remaining_bits: reader.bits_remaining(),
1354 });
1355 }
1356 Ok(updates)
1357}
1358
1359fn decode_update_section_sparse_varint(
1360 schema: &schema::Schema,
1361 body: &[u8],
1362 limits: &CodecLimits,
1363) -> CodecResult<Vec<DeltaUpdateEntity>> {
1364 if body.len() > limits.max_section_bytes {
1365 return Err(CodecError::LimitsExceeded {
1366 kind: LimitKind::SectionBytes,
1367 limit: limits.max_section_bytes,
1368 actual: body.len(),
1369 });
1370 }
1371
1372 let mut reader = BitReader::new(body);
1373 reader.align_to_byte()?;
1374 let entry_count = reader.read_varu32()? as usize;
1375 let entry_limit = limits
1376 .max_entities_update
1377 .saturating_mul(limits.max_components_per_entity);
1378 if entry_count > entry_limit {
1379 return Err(CodecError::LimitsExceeded {
1380 kind: LimitKind::EntitiesUpdate,
1381 limit: entry_limit,
1382 actual: entry_count,
1383 });
1384 }
1385
1386 let mut updates: Vec<DeltaUpdateEntity> = Vec::new();
1387 let mut prev_entity: Option<u32> = None;
1388 let mut prev_component: Option<u16> = None;
1389 for _ in 0..entry_count {
1390 reader.align_to_byte()?;
1391 let entity_id = reader.read_u32_aligned()?;
1392 let component_raw = reader.read_u16_aligned()?;
1393 let component_id = ComponentId::new(component_raw).ok_or(CodecError::InvalidMask {
1394 kind: MaskKind::ComponentMask,
1395 reason: MaskReason::InvalidComponentId { raw: component_raw },
1396 })?;
1397 if let Some(prev) = prev_entity {
1398 if entity_id < prev {
1399 return Err(CodecError::InvalidEntityOrder {
1400 previous: prev,
1401 current: entity_id,
1402 });
1403 }
1404 if entity_id == prev {
1405 if let Some(prev_component) = prev_component {
1406 if component_raw <= prev_component {
1407 return Err(CodecError::InvalidEntityOrder {
1408 previous: prev,
1409 current: entity_id,
1410 });
1411 }
1412 }
1413 }
1414 }
1415 prev_entity = Some(entity_id);
1416 prev_component = Some(component_raw);
1417
1418 let component = schema
1419 .components
1420 .iter()
1421 .find(|component| component.id == component_id)
1422 .ok_or(CodecError::InvalidMask {
1423 kind: MaskKind::ComponentMask,
1424 reason: MaskReason::UnknownComponent {
1425 component: component_id,
1426 },
1427 })?;
1428
1429 let field_count = reader.read_varu32()? as usize;
1430 if field_count == 0 {
1431 return Err(CodecError::InvalidMask {
1432 kind: MaskKind::FieldMask {
1433 component: component.id,
1434 },
1435 reason: MaskReason::EmptyFieldMask {
1436 component: component.id,
1437 },
1438 });
1439 }
1440 if field_count > limits.max_fields_per_component {
1441 return Err(CodecError::LimitsExceeded {
1442 kind: LimitKind::FieldsPerComponent,
1443 limit: limits.max_fields_per_component,
1444 actual: field_count,
1445 });
1446 }
1447 if field_count > component.fields.len() {
1448 return Err(CodecError::InvalidMask {
1449 kind: MaskKind::FieldMask {
1450 component: component.id,
1451 },
1452 reason: MaskReason::FieldCountMismatch {
1453 expected: component.fields.len(),
1454 actual: field_count,
1455 },
1456 });
1457 }
1458
1459 let mut fields = Vec::with_capacity(field_count);
1460 let mut prev_index: Option<usize> = None;
1461 for _ in 0..field_count {
1462 reader.align_to_byte()?;
1463 let field_index = reader.read_varu32()? as usize;
1464 if field_index >= component.fields.len() {
1465 return Err(CodecError::InvalidMask {
1466 kind: MaskKind::FieldMask {
1467 component: component.id,
1468 },
1469 reason: MaskReason::InvalidFieldIndex {
1470 field_index,
1471 max: component.fields.len(),
1472 },
1473 });
1474 }
1475 if let Some(prev_index) = prev_index {
1476 if field_index <= prev_index {
1477 return Err(CodecError::InvalidMask {
1478 kind: MaskKind::FieldMask {
1479 component: component.id,
1480 },
1481 reason: MaskReason::InvalidFieldIndex {
1482 field_index,
1483 max: component.fields.len(),
1484 },
1485 });
1486 }
1487 }
1488 prev_index = Some(field_index);
1489 let field = component.fields[field_index];
1490 let value = read_field_value_sparse(component.id, field, &mut reader)?;
1491 fields.push((field_index, value));
1492 }
1493
1494 if let Some(last) = updates.last_mut() {
1495 if last.id.raw() == entity_id {
1496 last.components.push(DeltaUpdateComponent {
1497 id: component.id,
1498 fields,
1499 });
1500 continue;
1501 }
1502 }
1503
1504 updates.push(DeltaUpdateEntity {
1505 id: EntityId::new(entity_id),
1506 components: vec![DeltaUpdateComponent {
1507 id: component.id,
1508 fields,
1509 }],
1510 });
1511 }
1512
1513 reader.align_to_byte()?;
1514 if reader.bits_remaining() != 0 {
1515 return Err(CodecError::TrailingSectionData {
1516 section: SectionTag::EntityUpdateSparse,
1517 remaining_bits: reader.bits_remaining(),
1518 });
1519 }
1520
1521 let unique_entities = updates.len();
1522 if unique_entities > limits.max_entities_update {
1523 return Err(CodecError::LimitsExceeded {
1524 kind: LimitKind::EntitiesUpdate,
1525 limit: limits.max_entities_update,
1526 actual: unique_entities,
1527 });
1528 }
1529
1530 Ok(updates)
1531}
1532
1533fn decode_update_section_sparse_packed(
1534 schema: &schema::Schema,
1535 body: &[u8],
1536 limits: &CodecLimits,
1537) -> CodecResult<Vec<DeltaUpdateEntity>> {
1538 if body.len() > limits.max_section_bytes {
1539 return Err(CodecError::LimitsExceeded {
1540 kind: LimitKind::SectionBytes,
1541 limit: limits.max_section_bytes,
1542 actual: body.len(),
1543 });
1544 }
1545
1546 let mut reader = BitReader::new(body);
1547 reader.align_to_byte()?;
1548 let entry_count = reader.read_varu32()? as usize;
1549 let entry_limit = limits
1550 .max_entities_update
1551 .saturating_mul(limits.max_components_per_entity);
1552 if entry_count > entry_limit {
1553 return Err(CodecError::LimitsExceeded {
1554 kind: LimitKind::EntitiesUpdate,
1555 limit: entry_limit,
1556 actual: entry_count,
1557 });
1558 }
1559
1560 let mut updates: Vec<DeltaUpdateEntity> = Vec::new();
1561 let mut prev_entity: Option<u32> = None;
1562 let mut prev_component: Option<u16> = None;
1563 for _ in 0..entry_count {
1564 reader.align_to_byte()?;
1565 let entity_id = reader.read_varu32()?;
1566 let component_raw = reader.read_varu32()?;
1567 if component_raw > u16::MAX as u32 {
1568 return Err(CodecError::InvalidMask {
1569 kind: MaskKind::ComponentMask,
1570 reason: MaskReason::InvalidComponentId { raw: u16::MAX },
1571 });
1572 }
1573 let component_raw = component_raw as u16;
1574 let component_id = ComponentId::new(component_raw).ok_or(CodecError::InvalidMask {
1575 kind: MaskKind::ComponentMask,
1576 reason: MaskReason::InvalidComponentId { raw: component_raw },
1577 })?;
1578 if let Some(prev) = prev_entity {
1579 if entity_id < prev {
1580 return Err(CodecError::InvalidEntityOrder {
1581 previous: prev,
1582 current: entity_id,
1583 });
1584 }
1585 if entity_id == prev {
1586 if let Some(prev_component) = prev_component {
1587 if component_raw <= prev_component {
1588 return Err(CodecError::InvalidEntityOrder {
1589 previous: prev,
1590 current: entity_id,
1591 });
1592 }
1593 }
1594 }
1595 }
1596 prev_entity = Some(entity_id);
1597 prev_component = Some(component_raw);
1598
1599 let component = schema
1600 .components
1601 .iter()
1602 .find(|component| component.id == component_id)
1603 .ok_or(CodecError::InvalidMask {
1604 kind: MaskKind::ComponentMask,
1605 reason: MaskReason::UnknownComponent {
1606 component: component_id,
1607 },
1608 })?;
1609
1610 let field_count = reader.read_varu32()? as usize;
1611 if field_count == 0 {
1612 return Err(CodecError::InvalidMask {
1613 kind: MaskKind::FieldMask {
1614 component: component.id,
1615 },
1616 reason: MaskReason::EmptyFieldMask {
1617 component: component.id,
1618 },
1619 });
1620 }
1621 if field_count > limits.max_fields_per_component {
1622 return Err(CodecError::LimitsExceeded {
1623 kind: LimitKind::FieldsPerComponent,
1624 limit: limits.max_fields_per_component,
1625 actual: field_count,
1626 });
1627 }
1628 if field_count > component.fields.len() {
1629 return Err(CodecError::InvalidMask {
1630 kind: MaskKind::FieldMask {
1631 component: component.id,
1632 },
1633 reason: MaskReason::FieldCountMismatch {
1634 expected: component.fields.len(),
1635 actual: field_count,
1636 },
1637 });
1638 }
1639
1640 let index_bits = required_bits(component.fields.len().saturating_sub(1) as u64);
1641 let mut fields = Vec::with_capacity(field_count);
1642 let mut prev_index: Option<usize> = None;
1643 for _ in 0..field_count {
1644 let field_index = if index_bits == 0 {
1645 0
1646 } else {
1647 reader.read_bits(index_bits)? as usize
1648 };
1649 if field_index >= component.fields.len() {
1650 return Err(CodecError::InvalidMask {
1651 kind: MaskKind::FieldMask {
1652 component: component.id,
1653 },
1654 reason: MaskReason::InvalidFieldIndex {
1655 field_index,
1656 max: component.fields.len(),
1657 },
1658 });
1659 }
1660 if let Some(prev_index) = prev_index {
1661 if field_index <= prev_index {
1662 return Err(CodecError::InvalidMask {
1663 kind: MaskKind::FieldMask {
1664 component: component.id,
1665 },
1666 reason: MaskReason::InvalidFieldIndex {
1667 field_index,
1668 max: component.fields.len(),
1669 },
1670 });
1671 }
1672 }
1673 prev_index = Some(field_index);
1674 let field = component.fields[field_index];
1675 let value = read_field_value_sparse(component.id, field, &mut reader)?;
1676 fields.push((field_index, value));
1677 }
1678
1679 if let Some(last) = updates.last_mut() {
1680 if last.id.raw() == entity_id {
1681 last.components.push(DeltaUpdateComponent {
1682 id: component.id,
1683 fields,
1684 });
1685 continue;
1686 }
1687 }
1688
1689 updates.push(DeltaUpdateEntity {
1690 id: EntityId::new(entity_id),
1691 components: vec![DeltaUpdateComponent {
1692 id: component.id,
1693 fields,
1694 }],
1695 });
1696 }
1697
1698 reader.align_to_byte()?;
1699 if reader.bits_remaining() != 0 {
1700 return Err(CodecError::TrailingSectionData {
1701 section: SectionTag::EntityUpdateSparsePacked,
1702 remaining_bits: reader.bits_remaining(),
1703 });
1704 }
1705
1706 let unique_entities = updates.len();
1707 if unique_entities > limits.max_entities_update {
1708 return Err(CodecError::LimitsExceeded {
1709 kind: LimitKind::EntitiesUpdate,
1710 limit: limits.max_entities_update,
1711 actual: unique_entities,
1712 });
1713 }
1714
1715 Ok(updates)
1716}
1717
1718fn decode_delta_sections(
1719 schema: &schema::Schema,
1720 packet: &WirePacket<'_>,
1721 limits: &CodecLimits,
1722) -> CodecResult<(Vec<EntityId>, Vec<EntitySnapshot>, Vec<DeltaUpdateEntity>)> {
1723 let mut destroys: Option<Vec<EntityId>> = None;
1724 let mut creates: Option<Vec<EntitySnapshot>> = None;
1725 let mut updates_masked: Option<Vec<DeltaUpdateEntity>> = None;
1726 let mut updates_sparse: Option<Vec<DeltaUpdateEntity>> = None;
1727
1728 for section in &packet.sections {
1729 match section.tag {
1730 SectionTag::EntityDestroy => {
1731 if destroys.is_some() {
1732 return Err(CodecError::DuplicateSection {
1733 section: section.tag,
1734 });
1735 }
1736 destroys = Some(decode_destroy_section(section.body, limits)?);
1737 }
1738 SectionTag::EntityCreate => {
1739 if creates.is_some() {
1740 return Err(CodecError::DuplicateSection {
1741 section: section.tag,
1742 });
1743 }
1744 creates = Some(decode_create_section(schema, section.body, limits)?);
1745 }
1746 SectionTag::EntityUpdate => {
1747 if updates_masked.is_some() {
1748 return Err(CodecError::DuplicateSection {
1749 section: section.tag,
1750 });
1751 }
1752 updates_masked = Some(decode_update_section_masked(schema, section.body, limits)?);
1753 }
1754 SectionTag::EntityUpdateSparse => {
1755 if updates_sparse.is_some() {
1756 return Err(CodecError::DuplicateSection {
1757 section: section.tag,
1758 });
1759 }
1760 updates_sparse = Some(decode_update_section_sparse_varint(
1761 schema,
1762 section.body,
1763 limits,
1764 )?);
1765 }
1766 SectionTag::EntityUpdateSparsePacked => {
1767 if updates_sparse.is_some() {
1768 return Err(CodecError::DuplicateSection {
1769 section: section.tag,
1770 });
1771 }
1772 updates_sparse = Some(decode_update_section_sparse_packed(
1773 schema,
1774 section.body,
1775 limits,
1776 )?);
1777 }
1778 _ => {
1779 return Err(CodecError::UnexpectedSection {
1780 section: section.tag,
1781 });
1782 }
1783 }
1784 }
1785
1786 Ok((
1787 destroys.unwrap_or_default(),
1788 creates.unwrap_or_default(),
1789 match (updates_masked, updates_sparse) {
1790 (Some(_), Some(_)) => return Err(CodecError::DuplicateUpdateEncoding),
1791 (Some(updates), None) => updates,
1792 (None, Some(updates)) => updates,
1793 (None, None) => Vec::new(),
1794 },
1795 ))
1796}
1797
/// Decoded form of a delta: the tick it describes, the baseline tick it refers
/// to, and the destroy, create and update lists it carries.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct DeltaDecoded {
    /// Tick of the snapshot this delta describes.
    pub tick: SnapshotTick,
    /// Tick of the baseline associated with this delta.
    pub baseline_tick: SnapshotTick,
    /// Ids of destroyed entities.
    pub destroys: Vec<EntityId>,
    /// Full snapshots of created entities.
    pub creates: Vec<EntitySnapshot>,
    /// Per-entity field updates.
    pub updates: Vec<DeltaUpdateEntity>,
}
1806
/// The component updates decoded for a single entity.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct DeltaUpdateEntity {
    /// Id of the entity the updates apply to.
    pub id: EntityId,
    /// One entry per updated component of this entity.
    pub components: Vec<DeltaUpdateComponent>,
}
1812
/// The field updates decoded for a single component of an entity.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct DeltaUpdateComponent {
    /// Id of the updated component.
    pub id: ComponentId,
    /// `(field index, new value)` pairs; the index is the field's position in
    /// the component's field list.
    pub fields: Vec<(usize, FieldValue)>,
}
1818
1819fn apply_destroys(
1820 baseline: &[EntitySnapshot],
1821 destroys: &[EntityId],
1822) -> CodecResult<Vec<EntitySnapshot>> {
1823 let mut result = Vec::with_capacity(baseline.len());
1824 let mut i = 0usize;
1825 let mut j = 0usize;
1826 while i < baseline.len() || j < destroys.len() {
1827 let base = baseline.get(i);
1828 let destroy = destroys.get(j);
1829 match (base, destroy) {
1830 (Some(b), Some(d)) => {
1831 if b.id.raw() < d.raw() {
1832 result.push(b.clone());
1833 i += 1;
1834 } else if b.id.raw() > d.raw() {
1835 return Err(CodecError::EntityNotFound { entity_id: d.raw() });
1836 } else {
1837 i += 1;
1838 j += 1;
1839 }
1840 }
1841 (Some(b), None) => {
1842 result.push(b.clone());
1843 i += 1;
1844 }
1845 (None, Some(d)) => {
1846 return Err(CodecError::EntityNotFound { entity_id: d.raw() });
1847 }
1848 (None, None) => break,
1849 }
1850 }
1851 Ok(result)
1852}
1853
1854fn apply_creates(
1855 baseline: Vec<EntitySnapshot>,
1856 creates: Vec<EntitySnapshot>,
1857) -> CodecResult<Vec<EntitySnapshot>> {
1858 let mut result = Vec::with_capacity(baseline.len() + creates.len());
1859 let mut i = 0usize;
1860 let mut j = 0usize;
1861 while i < baseline.len() || j < creates.len() {
1862 let base = baseline.get(i);
1863 let create = creates.get(j);
1864 match (base, create) {
1865 (Some(b), Some(c)) => {
1866 if b.id.raw() < c.id.raw() {
1867 result.push(b.clone());
1868 i += 1;
1869 } else if b.id.raw() > c.id.raw() {
1870 result.push(c.clone());
1871 j += 1;
1872 } else {
1873 return Err(CodecError::EntityAlreadyExists {
1874 entity_id: c.id.raw(),
1875 });
1876 }
1877 }
1878 (Some(b), None) => {
1879 result.push(b.clone());
1880 i += 1;
1881 }
1882 (None, Some(c)) => {
1883 result.push(c.clone());
1884 j += 1;
1885 }
1886 (None, None) => break,
1887 }
1888 }
1889 Ok(result)
1890}
1891
1892fn apply_updates(
1893 entities: &mut [EntitySnapshot],
1894 updates: &[DeltaUpdateEntity],
1895) -> CodecResult<()> {
1896 for update in updates {
1897 let idx = entities
1898 .binary_search_by_key(&update.id.raw(), |e| e.id.raw())
1899 .map_err(|_| CodecError::EntityNotFound {
1900 entity_id: update.id.raw(),
1901 })?;
1902 let entity = &mut entities[idx];
1903 for component_update in &update.components {
1904 let component = entity
1905 .components
1906 .iter_mut()
1907 .find(|c| c.id == component_update.id)
1908 .ok_or_else(|| CodecError::ComponentNotFound {
1909 entity_id: update.id.raw(),
1910 component_id: component_update.id.get(),
1911 })?;
1912 for (field_idx, value) in &component_update.fields {
1913 if *field_idx >= component.fields.len() {
1914 return Err(CodecError::InvalidMask {
1915 kind: MaskKind::FieldMask {
1916 component: component_update.id,
1917 },
1918 reason: MaskReason::FieldCountMismatch {
1919 expected: component.fields.len(),
1920 actual: *field_idx + 1,
1921 },
1922 });
1923 }
1924 component.fields[*field_idx] = *value;
1925 }
1926 }
1927 }
1928 Ok(())
1929}
1930
1931fn ensure_entities_sorted(entities: &[EntitySnapshot]) -> CodecResult<()> {
1932 let mut prev: Option<u32> = None;
1933 for entity in entities {
1934 if let Some(prev_id) = prev {
1935 if entity.id.raw() <= prev_id {
1936 return Err(CodecError::InvalidEntityOrder {
1937 previous: prev_id,
1938 current: entity.id.raw(),
1939 });
1940 }
1941 }
1942 prev = Some(entity.id.raw());
1943 }
1944 Ok(())
1945}
1946
1947fn write_component_mask(
1948 schema: &schema::Schema,
1949 entity: &EntitySnapshot,
1950 writer: &mut BitWriter<'_>,
1951) -> CodecResult<()> {
1952 for component in &schema.components {
1953 let present = find_component(entity, component.id).is_some();
1954 writer.write_bit(present)?;
1955 }
1956 Ok(())
1957}
1958
1959fn write_full_component(
1960 component: &ComponentDef,
1961 snapshot: &ComponentSnapshot,
1962 limits: &CodecLimits,
1963 writer: &mut BitWriter<'_>,
1964) -> CodecResult<()> {
1965 if snapshot.fields.len() != component.fields.len() {
1966 return Err(CodecError::InvalidMask {
1967 kind: MaskKind::FieldMask {
1968 component: component.id,
1969 },
1970 reason: MaskReason::FieldCountMismatch {
1971 expected: component.fields.len(),
1972 actual: snapshot.fields.len(),
1973 },
1974 });
1975 }
1976 if snapshot.fields.len() > limits.max_fields_per_component {
1977 return Err(CodecError::LimitsExceeded {
1978 kind: LimitKind::FieldsPerComponent,
1979 limit: limits.max_fields_per_component,
1980 actual: snapshot.fields.len(),
1981 });
1982 }
1983
1984 for _ in &component.fields {
1985 writer.write_bit(true)?;
1986 }
1987 for (field, value) in component.fields.iter().zip(snapshot.fields.iter()) {
1988 write_field_value(component.id, *field, *value, writer)?;
1989 }
1990 Ok(())
1991}
1992
1993fn decode_full_component(
1994 component: &ComponentDef,
1995 reader: &mut BitReader<'_>,
1996 limits: &CodecLimits,
1997) -> CodecResult<Vec<FieldValue>> {
1998 if component.fields.len() > limits.max_fields_per_component {
1999 return Err(CodecError::LimitsExceeded {
2000 kind: LimitKind::FieldsPerComponent,
2001 limit: limits.max_fields_per_component,
2002 actual: component.fields.len(),
2003 });
2004 }
2005
2006 let mask = read_mask(
2007 reader,
2008 component.fields.len(),
2009 MaskKind::FieldMask {
2010 component: component.id,
2011 },
2012 )?;
2013 let mut values = Vec::with_capacity(component.fields.len());
2014 for (idx, field) in component.fields.iter().enumerate() {
2015 if !mask[idx] {
2016 return Err(CodecError::InvalidMask {
2017 kind: MaskKind::FieldMask {
2018 component: component.id,
2019 },
2020 reason: MaskReason::MissingField { field: field.id },
2021 });
2022 }
2023 values.push(read_field_value(component.id, *field, reader)?);
2024 }
2025 Ok(values)
2026}
2027
2028fn write_update_components(
2029 schema: &schema::Schema,
2030 baseline: &EntitySnapshot,
2031 current: &EntitySnapshot,
2032 limits: &CodecLimits,
2033 scratch: &mut CodecScratch,
2034 writer: &mut BitWriter<'_>,
2035) -> CodecResult<()> {
2036 let component_count = schema.components.len();
2037 let (component_changed, _) = scratch.component_and_field_masks_mut(component_count, 0);
2038 component_changed.fill(false);
2039 for (idx, component) in schema.components.iter().enumerate() {
2040 let base = find_component(baseline, component.id);
2041 let curr = find_component(current, component.id);
2042 if base.is_some() != curr.is_some() {
2043 return Err(CodecError::InvalidMask {
2044 kind: MaskKind::ComponentMask,
2045 reason: MaskReason::ComponentPresenceMismatch {
2046 component: component.id,
2047 },
2048 });
2049 }
2050 if let (Some(base), Some(curr)) = (base, curr) {
2051 if base.fields.len() != component.fields.len()
2052 || curr.fields.len() != component.fields.len()
2053 {
2054 return Err(CodecError::InvalidMask {
2055 kind: MaskKind::FieldMask {
2056 component: component.id,
2057 },
2058 reason: MaskReason::FieldCountMismatch {
2059 expected: component.fields.len(),
2060 actual: base.fields.len().max(curr.fields.len()),
2061 },
2062 });
2063 }
2064 if component.fields.len() > limits.max_fields_per_component {
2065 return Err(CodecError::LimitsExceeded {
2066 kind: LimitKind::FieldsPerComponent,
2067 limit: limits.max_fields_per_component,
2068 actual: component.fields.len(),
2069 });
2070 }
2071 let (component_changed, field_mask) =
2072 scratch.component_and_field_masks_mut(component_count, component.fields.len());
2073 let any_changed = compute_field_mask_into(component, base, curr, field_mask)?
2074 .iter()
2075 .any(|b| *b);
2076 writer.write_bit(any_changed)?;
2077 if any_changed {
2078 component_changed[idx] = true;
2079 }
2080 } else {
2081 writer.write_bit(false)?;
2082 }
2083 }
2084
2085 for (idx, component) in schema.components.iter().enumerate() {
2086 let (base, curr) = match (
2087 find_component(baseline, component.id),
2088 find_component(current, component.id),
2089 ) {
2090 (Some(base), Some(curr)) => (base, curr),
2091 _ => continue,
2092 };
2093 if component.fields.len() > limits.max_fields_per_component {
2094 return Err(CodecError::LimitsExceeded {
2095 kind: LimitKind::FieldsPerComponent,
2096 limit: limits.max_fields_per_component,
2097 actual: component.fields.len(),
2098 });
2099 }
2100 let (component_changed, field_mask) =
2101 scratch.component_and_field_masks_mut(component_count, component.fields.len());
2102 if component_changed[idx] {
2103 let field_mask = compute_field_mask_into(component, base, curr, field_mask)?;
2104 for bit in field_mask {
2105 writer.write_bit(*bit)?;
2106 }
2107 for (((field, _base_val), curr_val), changed) in component
2108 .fields
2109 .iter()
2110 .zip(base.fields.iter())
2111 .zip(curr.fields.iter())
2112 .zip(field_mask.iter())
2113 {
2114 if *changed {
2115 write_field_value(component.id, *field, *curr_val, writer)?;
2116 }
2117 }
2118 }
2119 }
2120 Ok(())
2121}
2122
2123fn decode_update_component(
2124 component: &ComponentDef,
2125 reader: &mut BitReader<'_>,
2126 limits: &CodecLimits,
2127) -> CodecResult<Vec<(usize, FieldValue)>> {
2128 if component.fields.len() > limits.max_fields_per_component {
2129 return Err(CodecError::LimitsExceeded {
2130 kind: LimitKind::FieldsPerComponent,
2131 limit: limits.max_fields_per_component,
2132 actual: component.fields.len(),
2133 });
2134 }
2135 let mask = read_mask(
2136 reader,
2137 component.fields.len(),
2138 MaskKind::FieldMask {
2139 component: component.id,
2140 },
2141 )?;
2142 if !mask.iter().any(|b| *b) {
2143 return Err(CodecError::InvalidMask {
2144 kind: MaskKind::FieldMask {
2145 component: component.id,
2146 },
2147 reason: MaskReason::EmptyFieldMask {
2148 component: component.id,
2149 },
2150 });
2151 }
2152 let mut fields = Vec::new();
2153 for (idx, field) in component.fields.iter().enumerate() {
2154 if mask[idx] {
2155 let value = read_field_value(component.id, *field, reader)?;
2156 fields.push((idx, value));
2157 }
2158 }
2159 Ok(fields)
2160}
2161
2162fn compute_field_mask_into<'a>(
2163 component: &ComponentDef,
2164 baseline: &ComponentSnapshot,
2165 current: &ComponentSnapshot,
2166 field_mask: &'a mut [bool],
2167) -> CodecResult<&'a [bool]> {
2168 for (((field, base_val), curr_val), slot) in component
2169 .fields
2170 .iter()
2171 .zip(baseline.fields.iter())
2172 .zip(current.fields.iter())
2173 .zip(field_mask.iter_mut())
2174 {
2175 *slot = field_changed(component.id, *field, *base_val, *curr_val)?;
2176 }
2177 Ok(field_mask)
2178}
2179
2180fn field_changed(
2181 component_id: ComponentId,
2182 field: FieldDef,
2183 baseline: FieldValue,
2184 current: FieldValue,
2185) -> CodecResult<bool> {
2186 match field.change {
2187 ChangePolicy::Always => field_differs(component_id, field, baseline, current),
2188 ChangePolicy::Threshold { threshold_q } => {
2189 field_exceeds_threshold(component_id, field, baseline, current, threshold_q)
2190 }
2191 }
2192}
2193
2194fn field_differs(
2195 component_id: ComponentId,
2196 field: FieldDef,
2197 baseline: FieldValue,
2198 current: FieldValue,
2199) -> CodecResult<bool> {
2200 match (baseline, current) {
2201 (FieldValue::Bool(a), FieldValue::Bool(b)) => Ok(a != b),
2202 (FieldValue::UInt(a), FieldValue::UInt(b)) => Ok(a != b),
2203 (FieldValue::SInt(a), FieldValue::SInt(b)) => Ok(a != b),
2204 (FieldValue::VarUInt(a), FieldValue::VarUInt(b)) => Ok(a != b),
2205 (FieldValue::VarSInt(a), FieldValue::VarSInt(b)) => Ok(a != b),
2206 (FieldValue::FixedPoint(a), FieldValue::FixedPoint(b)) => Ok(a != b),
2207 _ => Err(CodecError::InvalidValue {
2208 component: component_id,
2209 field: field.id,
2210 reason: ValueReason::TypeMismatch {
2211 expected: codec_name(field.codec),
2212 found: value_name(current),
2213 },
2214 }),
2215 }
2216}
2217
2218fn field_exceeds_threshold(
2219 component_id: ComponentId,
2220 field: FieldDef,
2221 baseline: FieldValue,
2222 current: FieldValue,
2223 threshold_q: u32,
2224) -> CodecResult<bool> {
2225 let threshold_q = threshold_q as u64;
2226 match (baseline, current) {
2227 (FieldValue::FixedPoint(a), FieldValue::FixedPoint(b)) => {
2228 Ok((a - b).unsigned_abs() > threshold_q)
2229 }
2230 (FieldValue::UInt(a), FieldValue::UInt(b)) => Ok(a.abs_diff(b) > threshold_q),
2231 (FieldValue::SInt(a), FieldValue::SInt(b)) => Ok((a - b).unsigned_abs() > threshold_q),
2232 (FieldValue::VarUInt(a), FieldValue::VarUInt(b)) => Ok(a.abs_diff(b) > threshold_q),
2233 (FieldValue::VarSInt(a), FieldValue::VarSInt(b)) => {
2234 Ok((a - b).unsigned_abs() > threshold_q)
2235 }
2236 (FieldValue::Bool(a), FieldValue::Bool(b)) => Ok(a != b),
2237 _ => Err(CodecError::InvalidValue {
2238 component: component_id,
2239 field: field.id,
2240 reason: ValueReason::TypeMismatch {
2241 expected: codec_name(field.codec),
2242 found: value_name(current),
2243 },
2244 }),
2245 }
2246}
2247
2248fn entity_has_updates(
2249 schema: &schema::Schema,
2250 baseline: &EntitySnapshot,
2251 current: &EntitySnapshot,
2252 limits: &CodecLimits,
2253) -> CodecResult<bool> {
2254 ensure_component_presence_matches(schema, baseline, current)?;
2255 for component in &schema.components {
2256 let base = find_component(baseline, component.id);
2257 let curr = find_component(current, component.id);
2258 if let (Some(base), Some(curr)) = (base, curr) {
2259 if base.fields.len() != component.fields.len()
2260 || curr.fields.len() != component.fields.len()
2261 {
2262 return Err(CodecError::InvalidMask {
2263 kind: MaskKind::FieldMask {
2264 component: component.id,
2265 },
2266 reason: MaskReason::FieldCountMismatch {
2267 expected: component.fields.len(),
2268 actual: base.fields.len().max(curr.fields.len()),
2269 },
2270 });
2271 }
2272 if component.fields.len() > limits.max_fields_per_component {
2273 return Err(CodecError::LimitsExceeded {
2274 kind: LimitKind::FieldsPerComponent,
2275 limit: limits.max_fields_per_component,
2276 actual: component.fields.len(),
2277 });
2278 }
2279 for ((field, base_val), curr_val) in component
2280 .fields
2281 .iter()
2282 .zip(base.fields.iter())
2283 .zip(curr.fields.iter())
2284 {
2285 if field_changed(component.id, *field, *base_val, *curr_val)? {
2286 return Ok(true);
2287 }
2288 }
2289 }
2290 }
2291 Ok(false)
2292}
2293
2294fn ensure_component_presence_matches(
2295 schema: &schema::Schema,
2296 baseline: &EntitySnapshot,
2297 current: &EntitySnapshot,
2298) -> CodecResult<()> {
2299 for component in &schema.components {
2301 let base = find_component(baseline, component.id).is_some();
2302 let curr = find_component(current, component.id).is_some();
2303 if base != curr {
2304 return Err(CodecError::InvalidMask {
2305 kind: MaskKind::ComponentMask,
2306 reason: MaskReason::ComponentPresenceMismatch {
2307 component: component.id,
2308 },
2309 });
2310 }
2311 }
2312 Ok(())
2313}
2314
2315fn find_component(entity: &EntitySnapshot, id: ComponentId) -> Option<&ComponentSnapshot> {
2316 entity.components.iter().find(|c| c.id == id)
2317}
2318
2319fn codec_name(codec: schema::FieldCodec) -> &'static str {
2320 match codec {
2321 schema::FieldCodec::Bool => "bool",
2322 schema::FieldCodec::UInt { .. } => "uint",
2323 schema::FieldCodec::SInt { .. } => "sint",
2324 schema::FieldCodec::VarUInt => "varuint",
2325 schema::FieldCodec::VarSInt => "varsint",
2326 schema::FieldCodec::FixedPoint(_) => "fixed-point",
2327 }
2328}
2329
2330fn value_name(value: FieldValue) -> &'static str {
2331 match value {
2332 FieldValue::Bool(_) => "bool",
2333 FieldValue::UInt(_) => "uint",
2334 FieldValue::SInt(_) => "sint",
2335 FieldValue::VarUInt(_) => "varuint",
2336 FieldValue::VarSInt(_) => "varsint",
2337 FieldValue::FixedPoint(_) => "fixed-point",
2338 }
2339}
2340
// Unit tests for delta snapshot encoding, decoding and application.
#[cfg(test)]
mod tests {
    use super::*;
    use schema::{ComponentDef, FieldCodec, FieldDef, FieldId, Schema};

    /// Schema with one component (id 1) that holds a single bool field (id 1).
    fn schema_one_bool() -> Schema {
        let component = ComponentDef::new(ComponentId::new(1).unwrap())
            .field(FieldDef::new(FieldId::new(1).unwrap(), FieldCodec::bool()));
        Schema::new(vec![component]).unwrap()
    }

    /// Schema with one component (id 1) that holds a single 8-bit uint field
    /// using a threshold change policy of `threshold_q`.
    fn schema_uint_threshold(threshold_q: u32) -> Schema {
        let field = FieldDef::new(FieldId::new(1).unwrap(), FieldCodec::uint(8))
            .change(ChangePolicy::Threshold { threshold_q });
        let component = ComponentDef::new(ComponentId::new(1).unwrap()).field(field);
        Schema::new(vec![component]).unwrap()
    }

    /// Schema with two components (ids 1 and 2), each holding one bool field.
    fn schema_two_components() -> Schema {
        let c1 = ComponentDef::new(ComponentId::new(1).unwrap())
            .field(FieldDef::new(FieldId::new(1).unwrap(), FieldCodec::bool()));
        let c2 = ComponentDef::new(ComponentId::new(2).unwrap())
            .field(FieldDef::new(FieldId::new(1).unwrap(), FieldCodec::bool()));
        Schema::new(vec![c1, c2]).unwrap()
    }

    /// Component snapshot with the given id and exactly one field value.
    fn component_with_field(id: ComponentId, value: FieldValue) -> ComponentSnapshot {
        ComponentSnapshot {
            id,
            fields: vec![value],
        }
    }

    /// Entity that carries only component 1 with a single field set to `value`.
    fn entity_with_field(id: EntityId, value: FieldValue) -> EntitySnapshot {
        EntitySnapshot {
            id,
            components: vec![component_with_field(ComponentId::new(1).unwrap(), value)],
        }
    }

    /// Snapshot at `tick` containing only entity 1 (see `entity_with_field`).
    fn single_entity_snapshot(tick: SnapshotTick, value: FieldValue) -> Snapshot {
        Snapshot {
            tick,
            entities: vec![entity_with_field(EntityId::new(1), value)],
        }
    }

    /// Baseline shared by most tests: tick 10, entity 1, bool field `false`.
    fn baseline_snapshot() -> Snapshot {
        single_entity_snapshot(SnapshotTick::new(10), FieldValue::Bool(false))
    }

    /// Encodes `current` as a delta against `baseline`, applies the packet back
    /// onto `baseline`, and asserts the resulting entities equal `current`'s.
    fn assert_delta_roundtrip(schema: &Schema, baseline: &Snapshot, current: &Snapshot) {
        let mut buf = [0u8; 256];
        let bytes = encode_delta_snapshot(
            schema,
            current.tick,
            baseline.tick,
            baseline,
            current,
            &CodecLimits::for_testing(),
            &mut buf,
        )
        .unwrap();
        let applied = apply_delta_snapshot(
            schema,
            baseline,
            &buf[..bytes],
            &wire::Limits::for_testing(),
            &CodecLimits::for_testing(),
        )
        .unwrap();
        assert_eq!(applied.entities, current.entities);
    }

    #[test]
    fn no_op_delta_is_empty() {
        let schema = schema_one_bool();
        let baseline = baseline_snapshot();
        let current = baseline.clone();
        let mut buf = [0u8; 128];
        let bytes = encode_delta_snapshot(
            &schema,
            SnapshotTick::new(11),
            baseline.tick,
            &baseline,
            &current,
            &CodecLimits::for_testing(),
            &mut buf,
        )
        .unwrap();
        // An unchanged snapshot must encode to a bare header with a zero-length payload.
        let header =
            wire::PacketHeader::delta_snapshot(schema_hash(&schema), 11, baseline.tick.raw(), 0);
        let mut expected = [0u8; wire::HEADER_SIZE];
        encode_header(&header, &mut expected).unwrap();
        assert_eq!(&buf[..bytes], expected.as_slice());
    }

    #[test]
    fn delta_roundtrip_single_update() {
        let schema = schema_one_bool();
        let baseline = baseline_snapshot();
        let current = single_entity_snapshot(SnapshotTick::new(11), FieldValue::Bool(true));
        assert_delta_roundtrip(&schema, &baseline, &current);
    }

    #[test]
    fn delta_roundtrip_reuse_scratch() {
        let schema = schema_one_bool();
        let baseline = baseline_snapshot();
        let current_one = single_entity_snapshot(SnapshotTick::new(11), FieldValue::Bool(true));
        let current_two = single_entity_snapshot(SnapshotTick::new(12), FieldValue::Bool(false));

        // The same scratch instance is deliberately shared by both encodes.
        let mut scratch = CodecScratch::default();
        let mut buf_one = [0u8; 128];
        let mut buf_two = [0u8; 128];

        let bytes_one = encode_delta_snapshot_with_scratch(
            &schema,
            current_one.tick,
            baseline.tick,
            &baseline,
            &current_one,
            &CodecLimits::for_testing(),
            &mut scratch,
            &mut buf_one,
        )
        .unwrap();
        let applied_one = apply_delta_snapshot(
            &schema,
            &baseline,
            &buf_one[..bytes_one],
            &wire::Limits::for_testing(),
            &CodecLimits::for_testing(),
        )
        .unwrap();
        assert_eq!(applied_one.entities, current_one.entities);

        let bytes_two = encode_delta_snapshot_with_scratch(
            &schema,
            current_two.tick,
            baseline.tick,
            &baseline,
            &current_two,
            &CodecLimits::for_testing(),
            &mut scratch,
            &mut buf_two,
        )
        .unwrap();
        let applied_two = apply_delta_snapshot(
            &schema,
            &baseline,
            &buf_two[..bytes_two],
            &wire::Limits::for_testing(),
            &CodecLimits::for_testing(),
        )
        .unwrap();
        assert_eq!(applied_two.entities, current_two.entities);
    }

    #[test]
    fn delta_rejects_both_update_encodings() {
        let schema = schema_one_bool();
        let baseline = baseline_snapshot();

        // Hand-build a packet that carries both a dense and a sparse update section.
        let update_body = [0u8; 1];
        let mut update_section = [0u8; 8];
        let update_len =
            wire::encode_section(SectionTag::EntityUpdate, &update_body, &mut update_section)
                .unwrap();

        let mut sparse_section = [0u8; 8];
        let sparse_len = wire::encode_section(
            SectionTag::EntityUpdateSparse,
            &update_body,
            &mut sparse_section,
        )
        .unwrap();

        let payload_len = (update_len + sparse_len) as u32;
        let header = wire::PacketHeader::delta_snapshot(
            schema_hash(&schema),
            baseline.tick.raw() + 1,
            baseline.tick.raw(),
            payload_len,
        );
        let mut buf = vec![0u8; wire::HEADER_SIZE + payload_len as usize];
        wire::encode_header(&header, &mut buf[..wire::HEADER_SIZE]).unwrap();
        buf[wire::HEADER_SIZE..wire::HEADER_SIZE + update_len]
            .copy_from_slice(&update_section[..update_len]);
        buf[wire::HEADER_SIZE + update_len..wire::HEADER_SIZE + update_len + sparse_len]
            .copy_from_slice(&sparse_section[..sparse_len]);

        let packet = wire::decode_packet(&buf, &wire::Limits::for_testing()).unwrap();
        let err = decode_delta_packet(&schema, &packet, &CodecLimits::for_testing()).unwrap_err();
        assert!(matches!(err, CodecError::DuplicateUpdateEncoding));

        let err = apply_delta_snapshot_from_packet(
            &schema,
            &baseline,
            &packet,
            &CodecLimits::for_testing(),
        )
        .unwrap_err();
        assert!(matches!(err, CodecError::DuplicateUpdateEncoding));
    }

    #[test]
    fn delta_roundtrip_create_destroy_update() {
        let schema = schema_one_bool();
        // Entity 1 is destroyed, entity 2 is updated, entity 3 is created.
        let baseline = Snapshot {
            tick: SnapshotTick::new(10),
            entities: vec![
                entity_with_field(EntityId::new(1), FieldValue::Bool(false)),
                entity_with_field(EntityId::new(2), FieldValue::Bool(false)),
            ],
        };
        let current = Snapshot {
            tick: SnapshotTick::new(11),
            entities: vec![
                entity_with_field(EntityId::new(2), FieldValue::Bool(true)),
                entity_with_field(EntityId::new(3), FieldValue::Bool(true)),
            ],
        };
        assert_delta_roundtrip(&schema, &baseline, &current);
    }

    #[test]
    fn delta_session_header_matches_payload() {
        let schema = schema_one_bool();
        let baseline = baseline_snapshot();
        let current = single_entity_snapshot(SnapshotTick::new(11), FieldValue::Bool(true));

        let mut full_buf = [0u8; 128];
        let full_bytes = encode_delta_snapshot_for_client(
            &schema,
            current.tick,
            baseline.tick,
            &baseline,
            &current,
            &CodecLimits::for_testing(),
            &mut full_buf,
        )
        .unwrap();
        let full_payload = &full_buf[wire::HEADER_SIZE..full_bytes];

        let mut session_buf = [0u8; 128];
        let mut last_tick = baseline.tick;
        let session_bytes = encode_delta_snapshot_for_client_session_with_scratch(
            &schema,
            current.tick,
            baseline.tick,
            &baseline,
            &current,
            &CodecLimits::for_testing(),
            &mut CodecScratch::default(),
            &mut last_tick,
            &mut session_buf,
        )
        .unwrap();

        // Only the header encoding may differ; the payload bytes must be identical.
        let session_header =
            wire::decode_session_header(&session_buf[..session_bytes], baseline.tick.raw())
                .unwrap();
        let payload_start = session_header.header_len;
        let payload_end = payload_start + session_header.payload_len as usize;
        let session_payload = &session_buf[payload_start..payload_end];
        assert_eq!(full_payload, session_payload);
    }

    #[test]
    fn delta_roundtrip_single_component_change() {
        let schema = schema_two_components();
        let baseline = Snapshot {
            tick: SnapshotTick::new(10),
            entities: vec![EntitySnapshot {
                id: EntityId::new(1),
                components: vec![
                    component_with_field(ComponentId::new(1).unwrap(), FieldValue::Bool(false)),
                    component_with_field(ComponentId::new(2).unwrap(), FieldValue::Bool(false)),
                ],
            }],
        };
        // Only component 1 changes; component 2 keeps its baseline value.
        let current = Snapshot {
            tick: SnapshotTick::new(11),
            entities: vec![EntitySnapshot {
                id: EntityId::new(1),
                components: vec![
                    component_with_field(ComponentId::new(1).unwrap(), FieldValue::Bool(true)),
                    component_with_field(ComponentId::new(2).unwrap(), FieldValue::Bool(false)),
                ],
            }],
        };
        assert_delta_roundtrip(&schema, &baseline, &current);
    }

    #[test]
    fn baseline_tick_mismatch_is_error() {
        let schema = schema_one_bool();
        let baseline = baseline_snapshot();
        let current = baseline.clone();
        let mut buf = [0u8; 128];
        let bytes = encode_delta_snapshot(
            &schema,
            SnapshotTick::new(11),
            baseline.tick,
            &baseline,
            &current,
            &CodecLimits::for_testing(),
            &mut buf,
        )
        .unwrap();
        // Rewrite the header in place so it references a baseline tick we do not hold.
        let mut packet = wire::decode_packet(&buf[..bytes], &wire::Limits::for_testing()).unwrap();
        packet.header.baseline_tick = 999;
        wire::encode_header(&packet.header, &mut buf[..wire::HEADER_SIZE]).unwrap();
        let err = apply_delta_snapshot(
            &schema,
            &baseline,
            &buf[..bytes],
            &wire::Limits::for_testing(),
            &CodecLimits::for_testing(),
        )
        .unwrap_err();
        assert!(matches!(err, CodecError::BaselineTickMismatch { .. }));
    }

    #[test]
    fn threshold_suppresses_small_change() {
        let schema = schema_uint_threshold(5);
        // 10 -> 12 is a change of 2, below the threshold of 5.
        let baseline = single_entity_snapshot(SnapshotTick::new(10), FieldValue::UInt(10));
        let current = single_entity_snapshot(SnapshotTick::new(11), FieldValue::UInt(12));

        let mut buf = [0u8; 128];
        let bytes = encode_delta_snapshot(
            &schema,
            current.tick,
            baseline.tick,
            &baseline,
            &current,
            &CodecLimits::for_testing(),
            &mut buf,
        )
        .unwrap();

        let packet = wire::decode_packet(&buf[..bytes], &wire::Limits::for_testing()).unwrap();
        assert_eq!(packet.sections.len(), 0);

        let applied = apply_delta_snapshot(
            &schema,
            &baseline,
            &buf[..bytes],
            &wire::Limits::for_testing(),
            &CodecLimits::for_testing(),
        )
        .unwrap();
        assert_eq!(applied.entities, baseline.entities);
    }
}