1use std::collections::{HashMap, HashSet};
10
11use fsqlite_error::{FrankenError, Result};
12use fsqlite_types::ObjectId;
13use tracing::{debug, error, info, warn};
14
15use crate::decode_proofs::{DecodeAuditEntry, EcsDecodeProof};
16use crate::replication_sender::{
17 CHANGESET_HEADER_SIZE, ChangesetHeader, ChangesetId, DEFAULT_RPC_MESSAGE_CAP_BYTES, PageEntry,
18 ReplicationPacket, ReplicationWireVersion,
19};
20use crate::source_block_partition::K_MAX;
21
/// Identifier attached as the `bead_id` field to the tracing events emitted in this module.
const BEAD_ID: &str = "bd-1hi.14";
/// Default value for [`ReceiverConfig::max_inflight_decoders`].
const DEFAULT_MAX_INFLIGHT_DECODERS: usize = 128;
/// Default value for [`ReceiverConfig::max_buffered_symbol_bytes`] (64 MiB).
const DEFAULT_MAX_BUFFERED_SYMBOL_BYTES: usize = 64 * 1024 * 1024;
25
/// Lifecycle state of a [`ReplicationReceiver`].
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ReceiverState {
    /// Initial state; also restored by `reset_to_listening` and `force_reset`.
    Listening,
    /// A packet has passed header validation and symbols are being buffered.
    Collecting,
    /// Set while a decode attempt runs for a changeset that reached `k_source` symbols.
    Decoding,
    /// A decoded changeset has been queued; `apply_pending` may be called.
    Applying,
    /// `apply_pending` has drained the queue; `reset_to_listening` is now permitted.
    Complete,
}
44
45#[derive(Debug)]
47pub struct DecoderState {
48 pub k_source: u32,
50 pub symbol_size: u32,
52 pub seed: u64,
54 symbols: HashMap<u32, Vec<u8>>,
56 received_isis: HashSet<u32>,
58}
59
60impl DecoderState {
61 fn new(k_source: u32, symbol_size: u32, seed: u64) -> Self {
63 Self {
64 k_source,
65 symbol_size,
66 seed,
67 symbols: HashMap::with_capacity(k_source as usize),
68 received_isis: HashSet::with_capacity(k_source as usize),
69 }
70 }
71
72 #[must_use]
74 pub fn received_count(&self) -> u32 {
75 u32::try_from(self.received_isis.len()).unwrap_or(u32::MAX)
76 }
77
78 #[must_use]
80 pub fn ready_to_decode(&self) -> bool {
81 self.received_count() >= self.k_source
82 }
83
84 #[must_use]
86 pub fn source_symbol_count(&self) -> u32 {
87 let count = self
88 .symbols
89 .keys()
90 .filter(|&&isi| isi < self.k_source)
91 .count();
92 u32::try_from(count).unwrap_or(u32::MAX)
93 }
94
95 #[must_use]
97 pub fn has_repair_symbols(&self) -> bool {
98 self.symbols.keys().any(|&isi| isi >= self.k_source)
99 }
100
101 #[must_use]
103 pub fn sorted_isis(&self) -> Vec<u32> {
104 let mut isis: Vec<u32> = self.symbols.keys().copied().collect();
105 isis.sort_unstable();
106 isis.dedup();
107 isis
108 }
109
110 fn add_symbol(&mut self, isi: u32, data: Vec<u8>) -> bool {
112 if self.received_isis.contains(&isi) {
113 return false;
114 }
115 self.received_isis.insert(isi);
116 self.symbols.insert(isi, data);
117 true
118 }
119
120 #[must_use]
121 fn has_symbol(&self, isi: u32) -> bool {
122 self.received_isis.contains(&isi)
123 }
124
125 #[must_use]
126 fn buffered_bytes(&self) -> usize {
127 self.symbols.values().map(Vec::len).sum()
128 }
129
130 fn try_decode(&self) -> Option<Vec<u8>> {
139 if !self.ready_to_decode() {
140 return None;
141 }
142
143 let source_count = usize::try_from(self.source_symbol_count()).unwrap_or(usize::MAX);
145
146 let k = self.k_source as usize;
147 let t = self.symbol_size as usize;
148
149 if source_count >= k {
150 let padded_len = k * t;
152 let mut padded = vec![0_u8; padded_len];
153 for isi in 0..self.k_source {
154 if let Some(data) = self.symbols.get(&isi) {
155 let start = isi as usize * t;
156 let copy_len = data.len().min(t);
157 padded[start..start + copy_len].copy_from_slice(&data[..copy_len]);
158 }
159 }
160 Some(padded)
161 } else {
162 warn!(
165 bead_id = BEAD_ID,
166 source_count,
167 k_source = self.k_source,
168 total_received = self.received_count(),
169 "decode requires repair symbols (production uses RaptorQ decoder)"
170 );
171 None
172 }
173 }
174}
175
/// One page extracted from a decoded changeset.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct DecodedPage {
    /// Page number read from the page entry (little-endian `u32`).
    pub page_number: u32,
    /// Page payload; its xxh3 digest matched the digest stored in the entry.
    pub page_data: Vec<u8>,
}
184
/// Outcome of successfully decoding and validating one changeset.
#[derive(Debug)]
pub struct DecodeResult {
    /// Changeset the pages belong to.
    pub changeset_id: ChangesetId,
    /// Validated pages, in the order they appear in the changeset.
    pub pages: Vec<DecodedPage>,
    /// Number of distinct symbols the decoder held when the changeset was parsed.
    pub symbols_used: u32,
    /// Decode proof, present only when the emission policy produced one for this decode.
    pub decode_proof: Option<EcsDecodeProof>,
}
197
/// Arguments bundle for `ReplicationReceiver::build_decode_proof`.
#[derive(Debug, Clone, Copy)]
struct DecodeProofBuildInput<'a> {
    /// Changeset the proof refers to; its bytes also form the proof's object id.
    changeset_id: ChangesetId,
    /// Number of source symbols of the changeset.
    k_source: u32,
    /// Symbol size in bytes; only feeds the deterministic timing value.
    symbol_size: u32,
    /// Seed recorded by the decoder.
    seed: u64,
    /// ISIs held by the decoder at the time of the attempt.
    received_isis: &'a [u32],
    /// Whether the decode attempt succeeded.
    decode_success: bool,
    /// Rank value forwarded to the proof unchanged.
    intermediate_rank: Option<u32>,
    /// Symbol count; only feeds the deterministic timing value.
    symbols_used: u32,
}
209
/// Receives replication packets, buffers symbols per changeset, and queues decoded
/// changesets for application.
#[derive(Debug)]
pub struct ReplicationReceiver {
    /// Configuration supplied at construction.
    config: ReceiverConfig,
    /// Current lifecycle state.
    state: ReceiverState,
    /// In-flight decoder state, one entry per changeset.
    decoders: HashMap<ChangesetId, DecoderState>,
    /// Per-changeset count of accepted symbols (reported in log events).
    received_counts: HashMap<ChangesetId, u32>,
    /// Total symbol payload bytes held across all decoders; checked against
    /// `config.max_buffered_symbol_bytes`.
    buffered_symbol_bytes: usize,
    /// Decoded changesets waiting for `apply_pending`.
    pending_results: Vec<DecodeResult>,
    /// Running total of changesets returned by `apply_pending`.
    applied_count: u64,
    /// Recorded decode proofs.
    decode_audit: Vec<DecodeAuditEntry>,
    /// Sequence number assigned to the most recently recorded audit entry.
    decode_audit_seq: u64,
}
230
/// Selects which decode outcomes produce a decode proof.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct DecodeProofEmissionPolicy {
    /// Record a proof when a decode attempt fails.
    pub emit_on_decode_failure: bool,
    /// Record a proof when a decode succeeds and repair symbols were present.
    pub emit_on_repair_success: bool,
}

impl DecodeProofEmissionPolicy {
    /// Policy that never emits proofs.
    #[must_use]
    pub const fn disabled() -> Self {
        Self::uniform(false)
    }

    /// Policy that emits proofs for both failures and repair-assisted successes.
    #[must_use]
    pub const fn durability_critical() -> Self {
        Self::uniform(true)
    }

    /// Builds a policy whose two switches share the same value.
    const fn uniform(emit: bool) -> Self {
        Self {
            emit_on_decode_failure: emit,
            emit_on_repair_success: emit,
        }
    }
}

impl Default for DecodeProofEmissionPolicy {
    /// The default policy is the disabled one.
    fn default() -> Self {
        Self::uniform(false)
    }
}
265
266#[derive(Debug, Clone)]
268pub struct ReceiverConfig {
269 pub auth_key: Option<[u8; 32]>,
271 pub decode_proof_policy: DecodeProofEmissionPolicy,
273 pub max_inflight_decoders: usize,
275 pub max_buffered_symbol_bytes: usize,
277}
278
279impl ReceiverConfig {
280 #[must_use]
282 pub const fn with_auth_key(auth_key: [u8; 32]) -> Self {
283 Self {
284 auth_key: Some(auth_key),
285 decode_proof_policy: DecodeProofEmissionPolicy::disabled(),
286 max_inflight_decoders: DEFAULT_MAX_INFLIGHT_DECODERS,
287 max_buffered_symbol_bytes: DEFAULT_MAX_BUFFERED_SYMBOL_BYTES,
288 }
289 }
290}
291
292impl Default for ReceiverConfig {
293 fn default() -> Self {
294 Self {
295 auth_key: None,
296 decode_proof_policy: DecodeProofEmissionPolicy::disabled(),
297 max_inflight_decoders: DEFAULT_MAX_INFLIGHT_DECODERS,
298 max_buffered_symbol_bytes: DEFAULT_MAX_BUFFERED_SYMBOL_BYTES,
299 }
300 }
301}
302
303impl ReplicationReceiver {
304 fn remove_decoder(&mut self, changeset_id: ChangesetId) {
305 if let Some(decoder) = self.decoders.remove(&changeset_id) {
306 self.buffered_symbol_bytes = self
307 .buffered_symbol_bytes
308 .saturating_sub(decoder.buffered_bytes());
309 }
310 self.received_counts.remove(&changeset_id);
311 }
312
/// Creates a receiver in the `Listening` state with no decoders, no pending
/// results, and empty audit state, using the supplied configuration.
#[must_use]
pub fn with_config(config: ReceiverConfig) -> Self {
    Self {
        config,
        state: ReceiverState::Listening,
        decoders: HashMap::new(),
        received_counts: HashMap::new(),
        buffered_symbol_bytes: 0,
        pending_results: Vec::new(),
        applied_count: 0,
        decode_audit: Vec::new(),
        decode_audit_seq: 0,
    }
}
328
/// Creates a receiver using `ReceiverConfig::default()`.
#[must_use]
pub fn new() -> Self {
    Self::with_config(ReceiverConfig::default())
}
334
/// Returns the receiver's current lifecycle state.
#[must_use]
pub const fn state(&self) -> ReceiverState {
    self.state
}
340
/// Returns the running total of changesets handed out by `apply_pending`.
#[must_use]
pub const fn applied_count(&self) -> u64 {
    self.applied_count
}
346
/// Returns the number of changesets that currently have an in-flight decoder.
#[must_use]
pub fn active_decoders(&self) -> usize {
    self.decoders.len()
}
352
/// Borrows the decode audit entries recorded so far, without removing them.
#[must_use]
pub fn decode_audit_entries(&self) -> &[DecodeAuditEntry] {
    &self.decode_audit
}
358
/// Returns the recorded decode audit entries and leaves the internal list empty.
///
/// The audit sequence counter is not touched here.
pub fn take_decode_audit_entries(&mut self) -> Vec<DecodeAuditEntry> {
    std::mem::take(&mut self.decode_audit)
}
363
364 pub fn process_packet(&mut self, packet_bytes: &[u8]) -> Result<PacketResult> {
374 if packet_bytes.len() > DEFAULT_RPC_MESSAGE_CAP_BYTES {
375 return Err(FrankenError::TooBig);
376 }
377 let packet = ReplicationPacket::from_bytes(packet_bytes)?;
378 if !packet.verify_integrity(self.config.auth_key.as_ref()) {
379 warn!(
380 bead_id = BEAD_ID,
381 wire_version = ?packet.wire_version,
382 has_auth = packet.auth_tag.is_some(),
383 "packet integrity/auth verification failed; treating as erasure"
384 );
385 return Ok(PacketResult::Erasure);
386 }
387 self.process_parsed_packet(&packet)
388 }
389
/// Validates an already-parsed packet, buffers its symbol, and attempts a decode
/// once `k_source` distinct symbols are buffered for the packet's changeset.
///
/// Returns:
/// - `PacketResult::Duplicate` if the ESI was already buffered for this changeset;
/// - `PacketResult::Accepted` if the symbol was stored and more are needed;
/// - `PacketResult::DecodeReady` if the changeset decoded and validated (the
///   result is queued and the state becomes `Applying`);
/// - `PacketResult::NeedMore` if `k_source` symbols are present but the decode
///   could not complete.
///
/// # Errors
///
/// - `FrankenError::Internal` if `sbn` is non-zero.
/// - `FrankenError::OutOfRange` if `k_source` is zero or above `K_MAX`, or the
///   symbol size is zero.
/// - `FrankenError::DatabaseCorrupt` if the header symbol size disagrees with the
///   payload length, if `k_source`/symbol size/seed disagree with the existing
///   decoder, or if a `FramedV2` seed does not match the derived seed.
/// - `FrankenError::Busy` if a new changeset would exceed `max_inflight_decoders`.
/// - `FrankenError::TooBig` if storing the symbol would exceed
///   `max_buffered_symbol_bytes`.
/// - Any error from changeset validation after a decode; the decoder is dropped
///   in that case.
///
/// # Panics
///
/// Panics only if the decoder for the changeset is missing right after it was
/// looked up or inserted, which the preceding code rules out.
#[allow(clippy::too_many_lines)]
pub fn process_parsed_packet(&mut self, packet: &ReplicationPacket) -> Result<PacketResult> {
    // Only source block 0 is accepted.
    if packet.sbn != 0 {
        error!(
            bead_id = BEAD_ID,
            sbn = packet.sbn,
            "V1 rule: SBN must be 0"
        );
        return Err(FrankenError::Internal(format!(
            "V1 replication: source_block must be 0, got {}",
            packet.sbn
        )));
    }

    // K must lie in 1..=K_MAX.
    if packet.k_source == 0 || packet.k_source > K_MAX {
        error!(
            bead_id = BEAD_ID,
            k_source = packet.k_source,
            k_max = K_MAX,
            "K_source out of valid range"
        );
        return Err(FrankenError::OutOfRange {
            what: "k_source".to_owned(),
            value: packet.k_source.to_string(),
        });
    }

    // The declared symbol size must match the payload actually carried.
    if usize::from(packet.symbol_size_t) != packet.symbol_data.len() {
        return Err(FrankenError::DatabaseCorrupt {
            detail: format!(
                "symbol_size_t mismatch: header={}, payload={}",
                packet.symbol_size_t,
                packet.symbol_data.len()
            ),
        });
    }
    let symbol_size = u32::from(packet.symbol_size_t);
    if symbol_size == 0 {
        return Err(FrankenError::OutOfRange {
            what: "symbol_size".to_owned(),
            value: "0".to_owned(),
        });
    }

    // The first structurally valid packet moves the receiver out of Listening.
    if self.state == ReceiverState::Listening {
        self.state = ReceiverState::Collecting;
        info!(bead_id = BEAD_ID, "first packet received, now COLLECTING");
    }

    let changeset_id = packet.changeset_id;
    // Remembered so a budget rejection below can undo the decoder creation.
    let mut created_decoder = false;

    if let Some(decoder) = self.decoders.get(&changeset_id) {
        // Existing changeset: the packet must agree with the recorded parameters.
        if decoder.k_source != packet.k_source {
            error!(
                bead_id = BEAD_ID,
                expected_k = decoder.k_source,
                got_k = packet.k_source,
                "K_source mismatch for existing changeset"
            );
            return Err(FrankenError::DatabaseCorrupt {
                detail: format!(
                    "K_source mismatch: expected {}, got {}",
                    decoder.k_source, packet.k_source
                ),
            });
        }
        if decoder.symbol_size != symbol_size {
            error!(
                bead_id = BEAD_ID,
                expected_t = decoder.symbol_size,
                got_t = symbol_size,
                "symbol_size mismatch for existing changeset"
            );
            return Err(FrankenError::DatabaseCorrupt {
                detail: format!(
                    "symbol_size mismatch: expected {}, got {}",
                    decoder.symbol_size, symbol_size
                ),
            });
        }
        // The seed is only compared for the FramedV2 wire version.
        if packet.wire_version == ReplicationWireVersion::FramedV2
            && decoder.seed != packet.seed
        {
            return Err(FrankenError::DatabaseCorrupt {
                detail: format!(
                    "seed mismatch: expected {}, got {}",
                    decoder.seed, packet.seed
                ),
            });
        }
    } else {
        // New changeset: enforce the decoder cap before allocating any state.
        if self.decoders.len() >= self.config.max_inflight_decoders {
            warn!(
                bead_id = BEAD_ID,
                active_decoders = self.decoders.len(),
                max_inflight_decoders = self.config.max_inflight_decoders,
                "decoder cap reached; rejecting new changeset"
            );
            return Err(FrankenError::Busy);
        }
        // The seed is always derived locally; a FramedV2 packet must carry the same value.
        let expected_seed =
            crate::replication_sender::derive_seed_from_changeset_id(&changeset_id);
        if packet.wire_version == ReplicationWireVersion::FramedV2
            && packet.seed != expected_seed
        {
            return Err(FrankenError::DatabaseCorrupt {
                detail: format!(
                    "seed does not match deterministic derivation for changeset: expected {expected_seed}, got {}",
                    packet.seed
                ),
            });
        }
        let seed = expected_seed;
        debug!(
            bead_id = BEAD_ID,
            k_source = packet.k_source,
            symbol_size,
            seed,
            "created decoder for new changeset"
        );
        self.decoders.insert(
            changeset_id,
            DecoderState::new(packet.k_source, symbol_size, seed),
        );
        self.received_counts.insert(changeset_id, 0);
        created_decoder = true;
    }

    // Budget check: only symbols that would actually be stored count against the limit.
    if let Some(decoder) = self.decoders.get(&changeset_id) {
        if !decoder.has_symbol(packet.esi) {
            let next_total = self
                .buffered_symbol_bytes
                .saturating_add(packet.symbol_data.len());
            if next_total > self.config.max_buffered_symbol_bytes {
                warn!(
                    bead_id = BEAD_ID,
                    buffered_symbol_bytes = self.buffered_symbol_bytes,
                    incoming_symbol_bytes = packet.symbol_data.len(),
                    max_buffered_symbol_bytes = self.config.max_buffered_symbol_bytes,
                    "buffered symbol budget exceeded"
                );
                // Roll back a decoder created for this very packet so it does not
                // occupy a slot while holding no symbols.
                if created_decoder {
                    self.remove_decoder(changeset_id);
                    self.state = if self.decoders.is_empty() {
                        ReceiverState::Listening
                    } else {
                        ReceiverState::Collecting
                    };
                }
                return Err(FrankenError::TooBig);
            }
        }
    }

    // Store the symbol and snapshot everything the decode/proof logic needs, so the
    // mutable borrow of the decoder ends before `self` is used again below.
    let (
        ready_to_decode,
        k_source_ctx,
        symbol_size_ctx,
        seed_ctx,
        received_isis_ctx,
        received_count_ctx,
        source_count_ctx,
        has_repair_ctx,
        decoded_padded,
    ) = {
        let decoder = self.decoders.get_mut(&changeset_id).expect("just inserted");
        let accepted = decoder.add_symbol(packet.esi, packet.symbol_data.clone());

        if !accepted {
            debug!(
                bead_id = BEAD_ID,
                isi = packet.esi,
                "duplicate ISI, symbol ignored"
            );
            return Ok(PacketResult::Duplicate);
        }

        self.buffered_symbol_bytes = self
            .buffered_symbol_bytes
            .saturating_add(packet.symbol_data.len());
        let count = self.received_counts.entry(changeset_id).or_insert(0);
        *count += 1;
        debug!(
            bead_id = BEAD_ID,
            isi = packet.esi,
            received = *count,
            k_source = packet.k_source,
            "symbol accepted"
        );

        let ready = decoder.ready_to_decode();
        let padded = if ready { decoder.try_decode() } else { None };
        (
            ready,
            decoder.k_source,
            decoder.symbol_size,
            decoder.seed,
            decoder.sorted_isis(),
            decoder.received_count(),
            decoder.source_symbol_count(),
            decoder.has_repair_symbols(),
            padded,
        )
    };

    if ready_to_decode {
        info!(
            bead_id = BEAD_ID,
            received = received_count_ctx,
            k_source = k_source_ctx,
            "attempting decode"
        );
        self.state = ReceiverState::Decoding;

        if let Some(padded_bytes) = decoded_padded {
            // A success proof is built only when the policy asks for it and repair
            // symbols were buffered; it is recorded only if validation passes.
            let success_proof =
                if self.config.decode_proof_policy.emit_on_repair_success && has_repair_ctx {
                    Some(Self::build_decode_proof(DecodeProofBuildInput {
                        changeset_id,
                        k_source: k_source_ctx,
                        symbol_size: symbol_size_ctx,
                        seed: seed_ctx,
                        received_isis: &received_isis_ctx,
                        decode_success: true,
                        intermediate_rank: Some(k_source_ctx),
                        symbols_used: received_count_ctx,
                    }))
                } else {
                    None
                };

            // Validation runs while the decoder still exists; it is removed afterwards
            // on both the success and the failure path.
            match self.parse_and_validate_changeset(changeset_id, &padded_bytes) {
                Ok(mut result) => {
                    let n_pages = result.pages.len();
                    if let Some(proof) = success_proof {
                        self.record_decode_proof(proof.clone());
                        result.decode_proof = Some(proof);
                    }
                    self.pending_results.push(result);
                    self.state = ReceiverState::Applying;
                    info!(
                        bead_id = BEAD_ID,
                        n_pages, "decode succeeded, ready to apply"
                    );
                    self.remove_decoder(changeset_id);
                    return Ok(PacketResult::DecodeReady);
                }
                Err(e) => {
                    error!(
                        bead_id = BEAD_ID,
                        error = %e,
                        "changeset validation failed after decode"
                    );
                    self.remove_decoder(changeset_id);
                    self.state = if self.decoders.is_empty() {
                        ReceiverState::Listening
                    } else {
                        ReceiverState::Collecting
                    };
                    return Err(e);
                }
            }
        }

        // Enough symbols arrived but no decoded bytes were produced: optionally
        // record a failure proof, then keep collecting.
        if self.config.decode_proof_policy.emit_on_decode_failure {
            let failure_proof = Self::build_decode_proof(DecodeProofBuildInput {
                changeset_id,
                k_source: k_source_ctx,
                symbol_size: symbol_size_ctx,
                seed: seed_ctx,
                received_isis: &received_isis_ctx,
                decode_success: false,
                intermediate_rank: Some(source_count_ctx),
                symbols_used: received_count_ctx,
            });
            self.record_decode_proof(failure_proof);
        }

        warn!(
            bead_id = BEAD_ID,
            source_count = source_count_ctx,
            k_source = k_source_ctx,
            "decode failed at K_source, continuing collection"
        );
        self.state = ReceiverState::Collecting;
        return Ok(PacketResult::NeedMore);
    }

    Ok(PacketResult::Accepted)
}
699
700 #[allow(clippy::too_many_lines)]
702 fn parse_and_validate_changeset(
703 &self,
704 changeset_id: ChangesetId,
705 padded_bytes: &[u8],
706 ) -> Result<DecodeResult> {
707 if padded_bytes.len() < CHANGESET_HEADER_SIZE {
708 return Err(FrankenError::DatabaseCorrupt {
709 detail: format!(
710 "decoded bytes too short for header: {} < {CHANGESET_HEADER_SIZE}",
711 padded_bytes.len()
712 ),
713 });
714 }
715
716 let header_bytes: [u8; CHANGESET_HEADER_SIZE] = padded_bytes[..CHANGESET_HEADER_SIZE]
718 .try_into()
719 .expect("checked length");
720 let header = ChangesetHeader::from_bytes(&header_bytes)?;
721
722 let total_len =
724 usize::try_from(header.total_len).map_err(|_| FrankenError::OutOfRange {
725 what: "total_len".to_owned(),
726 value: header.total_len.to_string(),
727 })?;
728 if total_len < CHANGESET_HEADER_SIZE {
729 return Err(FrankenError::DatabaseCorrupt {
730 detail: format!(
731 "total_len ({total_len}) smaller than changeset header size ({CHANGESET_HEADER_SIZE})"
732 ),
733 });
734 }
735 if total_len > padded_bytes.len() {
736 return Err(FrankenError::DatabaseCorrupt {
737 detail: format!(
738 "total_len ({total_len}) exceeds decoded bytes ({})",
739 padded_bytes.len()
740 ),
741 });
742 }
743 let changeset_bytes = &padded_bytes[..total_len];
744
745 let page_size =
747 usize::try_from(header.page_size).map_err(|_| FrankenError::OutOfRange {
748 what: "page_size".to_owned(),
749 value: header.page_size.to_string(),
750 })?;
751 let entry_size = 4_usize
752 .checked_add(8)
753 .and_then(|value| value.checked_add(page_size))
754 .ok_or_else(|| FrankenError::OutOfRange {
755 what: "entry_size".to_owned(),
756 value: format!("page_size={}", header.page_size),
757 })?; let n_pages = usize::try_from(header.n_pages).map_err(|_| FrankenError::OutOfRange {
759 what: "n_pages".to_owned(),
760 value: header.n_pages.to_string(),
761 })?;
762 let data_start = CHANGESET_HEADER_SIZE;
763 let data_bytes = &changeset_bytes[data_start..];
764 let required_bytes =
765 entry_size
766 .checked_mul(n_pages)
767 .ok_or_else(|| FrankenError::OutOfRange {
768 what: "changeset payload size".to_owned(),
769 value: format!("entry_size={entry_size}, n_pages={}", header.n_pages),
770 })?;
771
772 if data_bytes.len() < required_bytes {
773 return Err(FrankenError::DatabaseCorrupt {
774 detail: format!(
775 "insufficient data for {} pages: {} < {}",
776 header.n_pages,
777 data_bytes.len(),
778 required_bytes,
779 ),
780 });
781 }
782
783 let mut pages = Vec::with_capacity(n_pages);
784 let decoder_state_symbols = self
785 .decoders
786 .get(&changeset_id)
787 .map_or(0, DecoderState::received_count);
788
789 for i in 0..n_pages {
790 let offset = i
791 .checked_mul(entry_size)
792 .ok_or_else(|| FrankenError::OutOfRange {
793 what: "page entry offset".to_owned(),
794 value: format!("index={i}, entry_size={entry_size}"),
795 })?;
796 let page_number =
797 u32::from_le_bytes(data_bytes[offset..offset + 4].try_into().expect("4 bytes"));
798 let page_xxh3 = u64::from_le_bytes(
799 data_bytes[offset + 4..offset + 12]
800 .try_into()
801 .expect("8 bytes"),
802 );
803 let page_data = data_bytes[offset + 12..offset + 12 + page_size].to_vec();
804
805 let computed_xxh3 = xxhash_rust::xxh3::xxh3_64(&page_data);
807 if computed_xxh3 != page_xxh3 {
808 error!(
809 bead_id = BEAD_ID,
810 page_number,
811 expected_xxh3 = page_xxh3,
812 computed_xxh3,
813 "page xxh3 validation failed"
814 );
815 return Err(FrankenError::DatabaseCorrupt {
816 detail: format!(
817 "page {page_number} xxh3 mismatch: expected {page_xxh3:#x}, got {computed_xxh3:#x}"
818 ),
819 });
820 }
821
822 pages.push(DecodedPage {
823 page_number,
824 page_data,
825 });
826 }
827
828 debug_assert!(
830 pages
831 .windows(2)
832 .all(|w| w[0].page_number <= w[1].page_number)
833 );
834
835 Ok(DecodeResult {
836 changeset_id,
837 pages,
838 symbols_used: decoder_state_symbols,
839 decode_proof: None,
840 })
841 }
842
843 fn build_decode_proof(input: DecodeProofBuildInput<'_>) -> EcsDecodeProof {
844 let object_id = ObjectId::from_bytes(*input.changeset_id.as_bytes());
845 let timing_ns =
846 deterministic_timing_ns(input.k_source, input.symbol_size, input.symbols_used);
847 EcsDecodeProof::from_esis(
848 object_id,
849 input.k_source,
850 input.received_isis,
851 input.decode_success,
852 input.intermediate_rank,
853 timing_ns,
854 input.seed,
855 )
856 .with_changeset_id(*input.changeset_id.as_bytes())
857 }
858
859 fn record_decode_proof(&mut self, proof: EcsDecodeProof) {
860 self.decode_audit_seq = self.decode_audit_seq.saturating_add(1);
861 self.decode_audit.push(DecodeAuditEntry {
862 proof,
863 seq: self.decode_audit_seq,
864 lab_mode: false,
865 });
866 }
867
868 pub fn apply_pending(&mut self) -> Result<Vec<DecodeResult>> {
877 if self.state != ReceiverState::Applying {
878 return Err(FrankenError::Internal(format!(
879 "receiver must be APPLYING to apply, current state: {:?}",
880 self.state
881 )));
882 }
883
884 let results = std::mem::take(&mut self.pending_results);
885 let n = results.len();
886 self.applied_count += u64::try_from(n).unwrap_or(u64::MAX);
887
888 info!(
889 bead_id = BEAD_ID,
890 applied = n,
891 total_applied = self.applied_count,
892 "applied pending changesets"
893 );
894
895 self.state = ReceiverState::Complete;
897 Ok(results)
898 }
899
900 pub fn reset_to_listening(&mut self) -> Result<()> {
906 if self.state != ReceiverState::Complete {
907 return Err(FrankenError::Internal(format!(
908 "receiver must be COMPLETE to reset, current state: {:?}",
909 self.state
910 )));
911 }
912 self.state = ReceiverState::Listening;
913 debug!(bead_id = BEAD_ID, "receiver reset to LISTENING");
914 Ok(())
915 }
916
917 pub fn force_reset(&mut self) {
919 self.decoders.clear();
920 self.received_counts.clear();
921 self.buffered_symbol_bytes = 0;
922 self.pending_results.clear();
923 self.state = ReceiverState::Listening;
924 warn!(bead_id = BEAD_ID, "receiver force-reset to LISTENING");
925 }
926}
927
/// `Default` is equivalent to [`ReplicationReceiver::new`].
impl Default for ReplicationReceiver {
    fn default() -> Self {
        Self::new()
    }
}
933
934fn deterministic_timing_ns(k_source: u32, symbol_size: u32, symbols_used: u32) -> u64 {
935 let mut material = [0_u8; 12];
936 material[..4].copy_from_slice(&k_source.to_le_bytes());
937 material[4..8].copy_from_slice(&symbol_size.to_le_bytes());
938 material[8..12].copy_from_slice(&symbols_used.to_le_bytes());
939 xxhash_rust::xxh3::xxh3_64(&material)
940}
941
/// Outcome of feeding one packet to the receiver.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum PacketResult {
    /// The symbol was stored; more symbols are needed before a decode is attempted.
    Accepted,
    /// The packet failed integrity/auth verification and was dropped.
    Erasure,
    /// A symbol with the same ESI was already buffered for this changeset.
    Duplicate,
    /// The changeset decoded and validated; call `apply_pending` to retrieve it.
    DecodeReady,
    /// `k_source` symbols are buffered but the decode could not complete yet.
    NeedMore,
}
956
957pub fn parse_changeset_pages(changeset_bytes: &[u8]) -> Result<(ChangesetHeader, Vec<PageEntry>)> {
967 if changeset_bytes.len() < CHANGESET_HEADER_SIZE {
968 return Err(FrankenError::DatabaseCorrupt {
969 detail: format!(
970 "changeset too short: {} < {CHANGESET_HEADER_SIZE}",
971 changeset_bytes.len()
972 ),
973 });
974 }
975
976 let header_bytes: [u8; CHANGESET_HEADER_SIZE] = changeset_bytes[..CHANGESET_HEADER_SIZE]
977 .try_into()
978 .expect("checked length");
979 let header = ChangesetHeader::from_bytes(&header_bytes)?;
980
981 let total_len = usize::try_from(header.total_len).map_err(|_| FrankenError::OutOfRange {
982 what: "total_len".to_owned(),
983 value: header.total_len.to_string(),
984 })?;
985 if total_len < CHANGESET_HEADER_SIZE {
986 return Err(FrankenError::DatabaseCorrupt {
987 detail: format!(
988 "total_len ({total_len}) smaller than changeset header size ({CHANGESET_HEADER_SIZE})"
989 ),
990 });
991 }
992 if total_len > changeset_bytes.len() {
993 return Err(FrankenError::DatabaseCorrupt {
994 detail: format!(
995 "total_len ({total_len}) exceeds available bytes ({})",
996 changeset_bytes.len()
997 ),
998 });
999 }
1000 let changeset_bytes = &changeset_bytes[..total_len];
1001
1002 let page_size = usize::try_from(header.page_size).map_err(|_| FrankenError::OutOfRange {
1003 what: "page_size".to_owned(),
1004 value: header.page_size.to_string(),
1005 })?;
1006 let entry_size = 4_usize
1007 .checked_add(8)
1008 .and_then(|value| value.checked_add(page_size))
1009 .ok_or_else(|| FrankenError::OutOfRange {
1010 what: "entry_size".to_owned(),
1011 value: format!("page_size={}", header.page_size),
1012 })?;
1013 let n_pages = usize::try_from(header.n_pages).map_err(|_| FrankenError::OutOfRange {
1014 what: "n_pages".to_owned(),
1015 value: header.n_pages.to_string(),
1016 })?;
1017 let data_start = CHANGESET_HEADER_SIZE;
1018 let data_bytes = &changeset_bytes[data_start..];
1019 let required_bytes =
1020 entry_size
1021 .checked_mul(n_pages)
1022 .ok_or_else(|| FrankenError::OutOfRange {
1023 what: "changeset payload size".to_owned(),
1024 value: format!("entry_size={entry_size}, n_pages={}", header.n_pages),
1025 })?;
1026 if data_bytes.len() < required_bytes {
1027 return Err(FrankenError::DatabaseCorrupt {
1028 detail: format!(
1029 "insufficient data for {} pages: {} < {}",
1030 header.n_pages,
1031 data_bytes.len(),
1032 required_bytes
1033 ),
1034 });
1035 }
1036
1037 let mut pages = Vec::with_capacity(n_pages);
1038 for i in 0..n_pages {
1039 let offset = i
1040 .checked_mul(entry_size)
1041 .ok_or_else(|| FrankenError::OutOfRange {
1042 what: "page entry offset".to_owned(),
1043 value: format!("index={i}, entry_size={entry_size}"),
1044 })?;
1045 let page_number =
1046 u32::from_le_bytes(data_bytes[offset..offset + 4].try_into().expect("4 bytes"));
1047 let page_xxh3 = u64::from_le_bytes(
1048 data_bytes[offset + 4..offset + 12]
1049 .try_into()
1050 .expect("8 bytes"),
1051 );
1052 let page_bytes = data_bytes[offset + 12..offset + 12 + page_size].to_vec();
1053
1054 pages.push(PageEntry {
1055 page_number,
1056 page_xxh3,
1057 page_bytes,
1058 });
1059 }
1060
1061 Ok((header, pages))
1062}
1063
1064#[cfg(test)]
1065mod tests {
1066 use asupersync::runtime::RuntimeBuilder;
1067 use asupersync::security::authenticated::AuthenticatedSymbol;
1068 use asupersync::security::tag::AuthenticationTag;
1069 use asupersync::transport::{
1070 SimNetwork, SimTransportConfig, SymbolSinkExt as _, SymbolStreamExt as _,
1071 };
1072 use asupersync::types::{Symbol, SymbolId, SymbolKind};
1073 use std::collections::HashSet;
1074
1075 use super::*;
1076 use crate::replication_sender::{
1077 CHANGESET_HEADER_SIZE, ChangesetId, PageEntry, REPLICATION_HEADER_SIZE, ReplicationPacket,
1078 ReplicationPacketV2Header, ReplicationSender, ReplicationWireVersion, SenderConfig,
1079 compute_changeset_id, derive_seed_from_changeset_id, encode_changeset,
1080 };
1081
// Identifier interpolated into assertion messages so failures name this work item.
const TEST_BEAD_ID: &str = "bd-1hi.14";
1083
1084 #[allow(clippy::cast_possible_truncation)]
1085 fn make_pages(page_size: u32, page_numbers: &[u32]) -> Vec<PageEntry> {
1086 page_numbers
1087 .iter()
1088 .map(|&pn| {
1089 let mut data = vec![0_u8; page_size as usize];
1090 for (i, byte) in data.iter_mut().enumerate() {
1091 *byte = ((pn as usize * 251 + i * 31) % 256) as u8;
1092 }
1093 PageEntry::new(pn, data)
1094 })
1095 .collect()
1096 }
1097
/// Encodes the given pages with a sender and returns every emitted packet as
/// wire bytes, using an ISI multiplier of 1.
fn generate_sender_packets(
    page_size: u32,
    page_numbers: &[u32],
    symbol_size: u16,
) -> Vec<Vec<u8>> {
    generate_sender_packets_with_multiplier(page_size, page_numbers, symbol_size, 1)
}
1106
1107 fn generate_sender_packets_with_multiplier(
1108 page_size: u32,
1109 page_numbers: &[u32],
1110 symbol_size: u16,
1111 max_isi_multiplier: u32,
1112 ) -> Vec<Vec<u8>> {
1113 let mut sender = ReplicationSender::new();
1114 let mut pages = make_pages(page_size, page_numbers);
1115 let config = SenderConfig {
1116 symbol_size,
1117 max_isi_multiplier,
1118 };
1119 sender
1120 .prepare(page_size, &mut pages, config)
1121 .expect("prepare");
1122 sender.start_streaming().expect("start");
1123
1124 let mut packets = Vec::new();
1125 while let Some(packet) = sender.next_packet().expect("next") {
1126 packets.push(packet.to_bytes().expect("encode"));
1127 }
1128 packets
1129 }
1130
/// What came out of a simulated network run.
#[derive(Debug)]
struct SimNetworkDelivery {
    /// Number of packets handed to the simulated sink.
    sent_count: usize,
    /// `(esi, wire bytes)` for each delivered symbol, in delivery order.
    delivered: Vec<(u32, Vec<u8>)>,
}
1136
1137 fn packet_symbol(esi: u32, wire_bytes: Vec<u8>) -> AuthenticatedSymbol {
1138 let symbol_id = SymbolId::new_for_test(0xBEEF, 0, esi);
1139 let symbol = Symbol::new(symbol_id, wire_bytes, SymbolKind::Source);
1140 AuthenticatedSymbol::new_verified(symbol, AuthenticationTag::zero())
1141 }
1142
/// Sends every packet from node 0 to node 1 of a two-node simulated network and
/// collects whatever the receiving stream yields.
///
/// Each packet is tagged with its index in `packet_bytes` as the ESI, so callers
/// can map delivered items back to the originals.
fn transmit_packets_simnetwork(
    config: SimTransportConfig,
    packet_bytes: &[Vec<u8>],
) -> SimNetworkDelivery {
    let network = SimNetwork::fully_connected(2, config);
    let (mut sink, mut stream) = network.transport(0, 1);
    let runtime = RuntimeBuilder::current_thread()
        .build()
        .expect("runtime build");

    runtime.block_on(async {
        // Send everything, then close the sink before draining the stream.
        for (index, bytes) in packet_bytes.iter().enumerate() {
            let esi = u32::try_from(index).expect("test packet index fits u32");
            sink.send(packet_symbol(esi, bytes.clone()))
                .await
                .expect("send simulated symbol");
        }
        sink.close().await.expect("close simulated sink");

        // Drain until the stream ends, recording ESI and payload of each item.
        let mut delivered = Vec::new();
        while let Some(item) = stream.next().await {
            let auth = item.expect("sim stream item");
            delivered.push((auth.symbol().id().esi(), auth.symbol().data().to_vec()));
        }

        SimNetworkDelivery {
            sent_count: packet_bytes.len(),
            delivered,
        }
    })
}
1174
1175 fn has_duplicate_esies(delivery: &SimNetworkDelivery) -> bool {
1176 let mut seen = HashSet::new();
1177 delivery.delivered.iter().any(|(esi, _)| !seen.insert(*esi))
1178 }
1179
1180 fn has_reordered_esies(delivery: &SimNetworkDelivery) -> bool {
1181 delivery
1182 .delivered
1183 .windows(2)
1184 .any(|window| window[0].0 > window[1].0)
1185 }
1186
1187 fn has_corrupted_wire_bytes(delivery: &SimNetworkDelivery, original: &[Vec<u8>]) -> bool {
1188 delivery.delivered.iter().any(|(esi, bytes)| {
1189 usize::try_from(*esi)
1190 .ok()
1191 .and_then(|index| original.get(index))
1192 .is_some_and(|expected| expected.as_slice() != bytes.as_slice())
1193 })
1194 }
1195
1196 fn decode_from_wire_packets(
1197 delivered: &[(u32, Vec<u8>)],
1198 ) -> (Option<Vec<DecodedPage>>, usize, usize) {
1199 let mut receiver = ReplicationReceiver::new();
1200 let mut erasures = 0_usize;
1201 let mut parse_errors = 0_usize;
1202
1203 for (_, wire) in delivered {
1204 match receiver.process_packet(wire) {
1205 Ok(PacketResult::DecodeReady) => {
1206 let mut applied = receiver.apply_pending().expect("apply decoded changeset");
1207 let pages = applied.pop().expect("decode result pages").pages;
1208 return (Some(pages), erasures, parse_errors);
1209 }
1210 Ok(PacketResult::Erasure) => erasures += 1,
1211 Ok(PacketResult::Accepted | PacketResult::Duplicate | PacketResult::NeedMore) => {}
1212 Err(_) => parse_errors += 1,
1213 }
1214 }
1215
1216 (None, erasures, parse_errors)
1217 }
1218
1219 fn decoded_matches_original(decoded: &[DecodedPage], original: &[PageEntry]) -> bool {
1220 if decoded.len() != original.len() {
1221 return false;
1222 }
1223 for (decoded, original) in decoded.iter().zip(original.iter()) {
1224 if decoded.page_number != original.page_number {
1225 return false;
1226 }
1227 if decoded.page_data != original.page_bytes {
1228 return false;
1229 }
1230 }
1231 true
1232 }
1233
1234 fn make_packet(
1235 changeset_id: ChangesetId,
1236 sbn: u8,
1237 esi: u32,
1238 k_source: u32,
1239 symbol_data: Vec<u8>,
1240 ) -> ReplicationPacket {
1241 let symbol_size_t =
1242 u16::try_from(symbol_data.len()).expect("test symbol payload must fit u16");
1243 let seed = derive_seed_from_changeset_id(&changeset_id);
1244 ReplicationPacket::new_v2(
1245 ReplicationPacketV2Header {
1246 changeset_id,
1247 sbn,
1248 esi,
1249 k_source,
1250 r_repair: 0,
1251 symbol_size_t,
1252 seed,
1253 },
1254 symbol_data,
1255 )
1256 }
1257
1258 fn receiver_with_decode_proofs() -> ReplicationReceiver {
1259 ReplicationReceiver::with_config(ReceiverConfig {
1260 auth_key: None,
1261 decode_proof_policy: DecodeProofEmissionPolicy::durability_critical(),
1262 ..ReceiverConfig::default()
1263 })
1264 }
1265
/// A fresh receiver starts in `Listening` and leaves that state once the first
/// valid packet is processed.
#[test]
fn test_receiver_listening_to_collecting() {
    let mut receiver = ReplicationReceiver::new();
    assert_eq!(
        receiver.state(),
        ReceiverState::Listening,
        "bead_id={TEST_BEAD_ID} case=initial_state"
    );

    let packets = generate_sender_packets(512, &[1], 512);
    assert!(!packets.is_empty());

    receiver.process_packet(&packets[0]).expect("first packet");
    // Only "no longer Listening" is asserted; the exact follow-up state is not pinned.
    assert_ne!(
        receiver.state(),
        ReceiverState::Listening,
        "bead_id={TEST_BEAD_ID} case=transition_on_first_packet"
    );
}
1289
/// The receiver starts with zero active decoders and is no longer `Listening`
/// after the first packet of a changeset is processed.
#[test]
fn test_receiver_decoder_creation() {
    let mut rx = ReplicationReceiver::new();
    let wire_packets = generate_sender_packets(512, &[1, 2], 512);
    assert_eq!(rx.active_decoders(), 0);

    let first_wire = &wire_packets[0];
    rx.process_packet(first_wire).expect("first packet");

    let state_after_first = rx.state();
    assert_ne!(
        state_after_first,
        ReceiverState::Listening,
        "bead_id={TEST_BEAD_ID} case=decoder_created"
    );
}
1306
/// With `max_inflight_decoders` set to 1, a packet for a second changeset is
/// rejected with `FrankenError::Busy` and the decoder count stays at 1.
#[test]
fn test_receiver_rejects_new_changeset_when_decoder_limit_hit() {
    let config = ReceiverConfig {
        max_inflight_decoders: 1,
        ..ReceiverConfig::default()
    };
    let mut rx = ReplicationReceiver::with_config(config);

    // The first changeset occupies the only decoder slot.
    let admitted_id = ChangesetId::from_bytes([0x31; 16]);
    let admitted = make_packet(admitted_id, 0, 0, 100, vec![0x11; 256]);
    rx.process_parsed_packet(&admitted).expect("first decoder");
    assert_eq!(rx.active_decoders(), 1);

    // A different changeset id needs a new decoder and must be refused.
    let refused_id = ChangesetId::from_bytes([0x32; 16]);
    let refused = make_packet(refused_id, 0, 0, 100, vec![0x22; 256]);
    let outcome = rx.process_parsed_packet(&refused);
    let err = outcome.unwrap_err();
    assert!(matches!(err, FrankenError::Busy));
    assert_eq!(rx.active_decoders(), 1);
}
1337
/// With a 512-byte buffered-symbol budget, a 400-byte symbol is accepted and a
/// following 200-byte symbol for another changeset fails with
/// `FrankenError::TooBig`, leaving one active decoder.
#[test]
fn test_receiver_enforces_buffered_symbol_budget() {
    let config = ReceiverConfig {
        max_buffered_symbol_bytes: 512,
        ..ReceiverConfig::default()
    };
    let mut rx = ReplicationReceiver::with_config(config);

    // 400 bytes fit inside the 512-byte budget.
    let within_budget_id = ChangesetId::from_bytes([0x41; 16]);
    let within_budget = make_packet(within_budget_id, 0, 0, 100, vec![0x55; 400]);
    rx.process_parsed_packet(&within_budget)
        .expect("first packet");
    assert_eq!(rx.active_decoders(), 1);

    // 400 + 200 bytes would exceed the budget.
    let over_budget_id = ChangesetId::from_bytes([0x42; 16]);
    let over_budget = make_packet(over_budget_id, 0, 0, 100, vec![0x77; 200]);
    let outcome = rx.process_parsed_packet(&over_budget);
    let err = outcome.unwrap_err();
    assert!(matches!(err, FrankenError::TooBig));
    assert_eq!(rx.active_decoders(), 1);
}
1369
/// `derive_seed_from_changeset_id` must equal the xxh3-64 hash of the raw
/// changeset id bytes.
#[test]
fn test_receiver_seed_derivation() {
    let raw_id = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16];
    let id = ChangesetId::from_bytes(raw_id);

    let expected = xxhash_rust::xxh3::xxh3_64(id.as_bytes());
    let seed = derive_seed_from_changeset_id(&id);

    assert_eq!(
        seed, expected,
        "bead_id={TEST_BEAD_ID} case=seed_matches_sender"
    );
}
1382
/// A wire packet whose source block number is 1 must be rejected by
/// `process_packet`.
#[test]
fn test_receiver_v1_reject_sbn_nonzero() {
    let mut rx = ReplicationReceiver::new();

    let id = ChangesetId::from_bytes([0xAA; 16]);
    // sbn = 1 is the field under test; everything else is a plausible packet.
    let nonzero_sbn_packet = make_packet(id, 1, 0, 10, vec![0x55; 512]);
    let wire = nonzero_sbn_packet.to_bytes().expect("encode");

    let outcome = rx.process_packet(&wire);
    assert!(
        outcome.is_err(),
        "bead_id={TEST_BEAD_ID} case=v1_sbn_rejected"
    );
}
1400
/// Exercises the `k_source` bounds: 0 is rejected on the wire path, `K_MAX + 1`
/// is rejected on the parsed path, and exactly `K_MAX` is accepted.
#[test]
fn test_receiver_k_source_validation() {
    let mut rx = ReplicationReceiver::new();
    let payload = || vec![0x55; 512];

    // k_source == 0, sent through the wire decoding path.
    let zero_k = make_packet(ChangesetId::from_bytes([0xBB; 16]), 0, 0, 0, payload());
    let zero_k_wire = zero_k.to_bytes().expect("encode");
    let zero_k_outcome = rx.process_packet(&zero_k_wire);
    assert!(
        zero_k_outcome.is_err(),
        "bead_id={TEST_BEAD_ID} case=k_source_zero_rejected"
    );

    // k_source one above the maximum, fed as an already-parsed packet.
    let over_max = make_packet(
        ChangesetId::from_bytes([0xCC; 16]),
        0,
        0,
        K_MAX + 1,
        payload(),
    );
    let over_max_outcome = rx.process_parsed_packet(&over_max);
    assert!(
        over_max_outcome.is_err(),
        "bead_id={TEST_BEAD_ID} case=k_source_over_max_rejected"
    );

    // k_source exactly at the maximum is still valid.
    let at_max = make_packet(ChangesetId::from_bytes([0xDD; 16]), 0, 0, K_MAX, payload());
    let at_max_outcome = rx.process_parsed_packet(&at_max);
    assert!(
        at_max_outcome.is_ok(),
        "bead_id={TEST_BEAD_ID} case=k_source_at_max_accepted"
    );
}
1449
/// The decoder created for a 1024-byte symbol records `symbol_size == 1024`,
/// and a packet with an empty payload is rejected.
#[test]
fn test_receiver_symbol_size_inference() {
    let mut rx = ReplicationReceiver::new();
    let id = ChangesetId::from_bytes([0xEE; 16]);
    let packet = make_packet(id, 0, 0, 100, vec![0x42; 1024]);
    rx.process_parsed_packet(&packet).expect("accept packet");

    let decoder = rx
        .decoders
        .get(&packet.changeset_id)
        .expect("decoder exists");
    assert_eq!(
        decoder.symbol_size, 1024,
        "bead_id={TEST_BEAD_ID} case=symbol_size_inferred"
    );

    // A separate receiver so the empty packet is the first thing it sees.
    let mut fresh_rx = ReplicationReceiver::new();
    let empty_id = ChangesetId::from_bytes([0xFF; 16]);
    let empty_packet = make_packet(empty_id, 0, 0, 10, vec![]);
    let empty_outcome = fresh_rx.process_parsed_packet(&empty_packet);
    assert!(
        empty_outcome.is_err(),
        "bead_id={TEST_BEAD_ID} case=zero_symbol_size_rejected"
    );
}
1482
/// Once a changeset has been seen with `k_source = 100`, a later packet for
/// the same changeset claiming `k_source = 200` is rejected.
#[test]
fn test_receiver_k_source_mismatch_rejected() {
    let mut rx = ReplicationReceiver::new();
    let id = ChangesetId::from_bytes([0x11; 16]);

    let opener = make_packet(id, 0, 0, 100, vec![0x42; 512]);
    rx.process_parsed_packet(&opener).expect("first packet ok");

    // Same changeset, different k_source.
    let conflicting = make_packet(id, 0, 1, 200, vec![0x42; 512]);
    let outcome = rx.process_parsed_packet(&conflicting);
    assert!(
        outcome.is_err(),
        "bead_id={TEST_BEAD_ID} case=k_source_mismatch_rejected"
    );
}
1500
/// Once a changeset has been seen with 512-byte symbols, a later packet for
/// the same changeset carrying a 1024-byte symbol is rejected.
#[test]
fn test_receiver_symbol_size_mismatch_rejected() {
    let mut rx = ReplicationReceiver::new();
    let id = ChangesetId::from_bytes([0x22; 16]);

    let opener = make_packet(id, 0, 0, 100, vec![0x42; 512]);
    rx.process_parsed_packet(&opener).expect("first packet ok");

    // Same changeset and k_source, but a different symbol length.
    let conflicting = make_packet(id, 0, 1, 100, vec![0x42; 1024]);
    let outcome = rx.process_parsed_packet(&conflicting);
    assert!(
        outcome.is_err(),
        "bead_id={TEST_BEAD_ID} case=symbol_size_mismatch_rejected"
    );
}
1518
/// Feeding the same packet twice yields `Accepted` then `Duplicate`, and the
/// per-changeset received count remains 1.
#[test]
fn test_receiver_isi_deduplication() {
    let mut rx = ReplicationReceiver::new();
    let id = ChangesetId::from_bytes([0x33; 16]);
    let packet = make_packet(id, 0, 0, 100, vec![0x42; 512]);

    let first_outcome = rx.process_parsed_packet(&packet).expect("first");
    assert_eq!(
        first_outcome,
        PacketResult::Accepted,
        "bead_id={TEST_BEAD_ID} case=first_accepted"
    );

    let replay_outcome = rx.process_parsed_packet(&packet).expect("duplicate");
    assert_eq!(
        replay_outcome,
        PacketResult::Duplicate,
        "bead_id={TEST_BEAD_ID} case=isi_dedup"
    );

    // The replay must not have been counted; a missing entry is read as 0.
    let count = rx.received_counts.get(&id).map_or(0, |n| *n);
    assert_eq!(
        count, 1,
        "bead_id={TEST_BEAD_ID} case=dedup_count_unchanged"
    );
}
1548
/// A packet whose `payload_xxh3` no longer matches its payload is reported as
/// `PacketResult::Erasure` rather than as an error.
#[test]
fn test_receiver_treats_payload_hash_mismatch_as_erasure() {
    let mut rx = ReplicationReceiver::new();
    let id = ChangesetId::from_bytes([0x44; 16]);

    let mut tampered = make_packet(id, 0, 0, 100, vec![0x42; 512]);
    // Flip bits in the stored hash so it disagrees with the payload.
    tampered.payload_xxh3 ^= 0xDEAD_BEEF;
    let wire = tampered.to_bytes().expect("encode tampered packet");

    let outcome = rx.process_packet(&wire).expect("process packet");
    assert_eq!(outcome, PacketResult::Erasure);
}
1564
/// A packet tagged with a key different from the receiver's auth key is
/// reported as `PacketResult::Erasure`.
#[test]
fn test_receiver_treats_invalid_auth_tag_as_erasure() {
    let sender_key = [0x22_u8; 32];
    let receiver_key = [0x11_u8; 32];
    let config = ReceiverConfig::with_auth_key(receiver_key);
    let mut rx = ReplicationReceiver::with_config(config);

    let id = ChangesetId::from_bytes([0x45; 16]);
    let mut signed = make_packet(id, 0, 0, 100, vec![0x24; 512]);
    // Tag with the sender's key, which the receiver does not share.
    signed.attach_auth_tag(&sender_key);
    let wire = signed.to_bytes().expect("encode auth packet");

    let outcome = rx.process_packet(&wire).expect("process packet");
    assert_eq!(outcome, PacketResult::Erasure);
}
1583
/// A hand-built `LegacyV1` packet survives an encode/decode round trip with
/// its wire version intact and is `Accepted` by the receiver.
#[test]
fn test_receiver_accepts_legacy_v1_packets() {
    let mut rx = ReplicationReceiver::new();
    let id = ChangesetId::from_bytes([0x46; 16]);
    let payload = vec![0x5A; 512];

    // Compute derived fields before the payload is moved into the packet.
    let seed = derive_seed_from_changeset_id(&id);
    let payload_xxh3 = ReplicationPacket::compute_payload_xxh3(&payload);
    let legacy = ReplicationPacket {
        wire_version: ReplicationWireVersion::LegacyV1,
        changeset_id: id,
        sbn: 0,
        esi: 0,
        k_source: 100,
        r_repair: 0,
        symbol_size_t: 512,
        seed,
        payload_xxh3,
        auth_tag: None,
        symbol_data: payload,
    };

    let wire = legacy.to_bytes().expect("encode legacy packet");
    let reparsed = ReplicationPacket::from_bytes(&wire).expect("decode legacy packet");
    assert_eq!(reparsed.wire_version, ReplicationWireVersion::LegacyV1);

    let outcome = rx.process_packet(&wire).expect("process legacy packet");
    assert_eq!(outcome, PacketResult::Accepted);
}
1610
/// After every sender packet for a three-page changeset has been processed,
/// the last result is `DecodeReady` and the receiver is in `Applying`.
///
/// Any processing error fails the test immediately with a message that
/// includes the bead id and the underlying error.
#[test]
fn test_receiver_decode_at_k_source() {
    let page_size = 512_u32;
    let mut receiver = ReplicationReceiver::new();
    let packets = generate_sender_packets(page_size, &[1, 2, 3], 512);

    let mut last_result = PacketResult::Accepted;
    for pkt in &packets {
        // `expect` takes a plain `&str` and never interpolates `{TEST_BEAD_ID}`,
        // so the panic message is formatted explicitly here.
        last_result = receiver.process_packet(pkt).unwrap_or_else(|err| {
            panic!("bead_id={TEST_BEAD_ID} case=decode_at_k unexpected error: {err:?}")
        });
    }

    assert_eq!(
        last_result,
        PacketResult::DecodeReady,
        "bead_id={TEST_BEAD_ID} case=decode_triggers_at_k_source"
    );
    assert_eq!(
        receiver.state(),
        ReceiverState::Applying,
        "bead_id={TEST_BEAD_ID} case=state_applying_after_decode"
    );
}
1637
/// With decode proofs enabled, two packets at ESI 2 and 3 for a `k_source = 2`
/// changeset end in `NeedMore` and leave exactly one audit entry whose proof
/// records an unsuccessful, self-consistent decode for that changeset.
#[test]
fn test_receiver_decode_failure_emits_proof_when_enabled() {
    let mut rx = receiver_with_decode_proofs();
    let changeset_id = ChangesetId::from_bytes([0x5A; 16]);

    // Both symbols have ESI >= k_source (2).
    let first = make_packet(changeset_id, 0, 2, 2, vec![0xA1; 64]);
    let second = make_packet(changeset_id, 0, 3, 2, vec![0xA2; 64]);

    let first_outcome = rx.process_parsed_packet(&first).expect("first packet");
    assert_eq!(first_outcome, PacketResult::Accepted);
    let second_outcome = rx.process_parsed_packet(&second).expect("second packet");
    assert_eq!(second_outcome, PacketResult::NeedMore);

    let audit_entries = rx.take_decode_audit_entries();
    assert_eq!(
        audit_entries.len(),
        1,
        "bead_id=bd-faz4 case=failure_proof_emitted"
    );

    let failure_proof = &audit_entries[0].proof;
    assert!(
        !failure_proof.decode_success,
        "bead_id=bd-faz4 case=failure_proof_decode_success_false"
    );
    assert_eq!(failure_proof.changeset_id, Some(*changeset_id.as_bytes()));
    assert!(
        failure_proof.is_consistent(),
        "bead_id=bd-faz4 case=failure_proof_consistent"
    );
}
1665
/// With only `emit_on_repair_success` enabled, a changeset received as source
/// symbol 0, one repair symbol, then source symbol 1 decodes successfully.
/// The applied result carries a consistent repair proof and one audit entry
/// is recorded.
#[test]
fn test_receiver_decode_success_with_repair_emits_proof_when_enabled() {
    let policy = DecodeProofEmissionPolicy {
        emit_on_decode_failure: false,
        emit_on_repair_success: true,
    };
    let mut rx = ReplicationReceiver::with_config(ReceiverConfig {
        auth_key: None,
        decode_proof_policy: policy,
        ..ReceiverConfig::default()
    });

    let page_size = 64_u32;
    let mut pages = make_pages(page_size, &[7]);
    let changeset_bytes = encode_changeset(page_size, &mut pages).expect("encode changeset");
    let changeset_id = compute_changeset_id(&changeset_bytes);

    // Split the encoded changeset across two zero-padded source symbols.
    let symbol_size = 64_usize;
    let (head, tail) = changeset_bytes.split_at(changeset_bytes.len().min(symbol_size));
    let mut s0 = vec![0_u8; symbol_size];
    s0[..head.len()].copy_from_slice(head);
    let mut s1 = vec![0_u8; symbol_size];
    if !tail.is_empty() {
        s1[..tail.len()].copy_from_slice(tail);
    }

    let source_0 = make_packet(changeset_id, 0, 0, 2, s0);
    let repair = make_packet(changeset_id, 0, 2, 2, vec![0xCC; symbol_size]);
    let source_1 = make_packet(changeset_id, 0, 1, 2, s1);

    let outcome_0 = rx.process_parsed_packet(&source_0).expect("p0");
    assert_eq!(outcome_0, PacketResult::Accepted);
    let outcome_repair = rx.process_parsed_packet(&repair).expect("repair");
    assert_eq!(outcome_repair, PacketResult::NeedMore);
    let outcome_1 = rx.process_parsed_packet(&source_1).expect("p1");
    assert_eq!(outcome_1, PacketResult::DecodeReady);
    assert_eq!(rx.state(), ReceiverState::Applying);

    let applied = rx.apply_pending().expect("apply");
    assert_eq!(applied.len(), 1);

    let success_proof = applied[0]
        .decode_proof
        .as_ref()
        .expect("bead_id=bd-faz4 case=success_proof_attached_to_result");
    assert!(success_proof.decode_success);
    assert!(success_proof.is_repair());
    assert!(
        success_proof.is_consistent(),
        "bead_id=bd-faz4 case=success_proof_consistent"
    );

    let audit_entries = rx.take_decode_audit_entries();
    assert_eq!(
        audit_entries.len(),
        1,
        "bead_id=bd-faz4 case=success_proof_emitted"
    );
}
1727
/// When the receiver reaches `Applying`, every decoded page must be exactly
/// `page_size` bytes long.
///
/// NOTE(review): the checks only run if `Applying` is reached; otherwise the
/// test passes without asserting anything.
#[test]
fn test_receiver_decode_success_truncation() {
    let page_size = 128_u32;
    let mut rx = ReplicationReceiver::new();
    let wire_packets = generate_sender_packets(page_size, &[1], 128);

    // Per-packet outcomes are deliberately ignored; only the final state matters.
    for wire in &wire_packets {
        let _ = rx.process_packet(wire);
    }

    if rx.state() != ReceiverState::Applying {
        return;
    }

    let applied = rx.apply_pending().expect("apply");
    assert!(
        !applied.is_empty(),
        "bead_id={TEST_BEAD_ID} case=has_results"
    );
    for page in applied.iter().flat_map(|changeset| changeset.pages.iter()) {
        assert_eq!(
            page.page_data.len(),
            page_size as usize,
            "bead_id={TEST_BEAD_ID} case=page_data_correct_size"
        );
    }
}
1756
/// Inverting one byte of an encoded changeset makes
/// `parse_and_validate_changeset` return an error.
#[test]
fn test_receiver_page_xxh3_validation() {
    let page_size = 256_u32;
    let mut pages = make_pages(page_size, &[1]);
    let changeset_bytes = encode_changeset(page_size, &mut pages).expect("encode");

    // NOTE(review): presumably 4 + 8 skips the per-page number and hash fields
    // so the offset lands 10 bytes into the first page's data — confirm against
    // the changeset layout.
    let corrupt_at = CHANGESET_HEADER_SIZE + 4 + 8 + 10;
    let mut tampered = changeset_bytes.clone();
    tampered[corrupt_at] = !tampered[corrupt_at];

    // The id is computed from the untampered bytes.
    let changeset_id = compute_changeset_id(&changeset_bytes);
    let rx = ReplicationReceiver::new();
    let outcome = rx.parse_and_validate_changeset(changeset_id, &tampered);
    assert!(
        outcome.is_err(),
        "bead_id={TEST_BEAD_ID} case=xxh3_validation_catches_corruption"
    );
}
1777
/// A header-sized buffer whose u64 field at offset 14 is `1` must be rejected
/// by `parse_and_validate_changeset` with `DatabaseCorrupt`.
#[test]
fn test_parse_and_validate_rejects_total_len_smaller_than_header() {
    let rx = ReplicationReceiver::new();
    let changeset_id = ChangesetId::from_bytes([0xA5; 16]);

    // (offset, little-endian bytes) for each header field written by this test.
    let header_fields: [(usize, Vec<u8>); 5] = [
        (0, b"FSRP".to_vec()),
        (4, 1_u16.to_le_bytes().to_vec()),
        (6, 4096_u32.to_le_bytes().to_vec()),
        (10, 1_u32.to_le_bytes().to_vec()),
        (14, 1_u64.to_le_bytes().to_vec()),
    ];
    let mut malformed = vec![0_u8; CHANGESET_HEADER_SIZE];
    for (offset, bytes) in &header_fields {
        malformed[*offset..*offset + bytes.len()].copy_from_slice(bytes);
    }

    let outcome = rx.parse_and_validate_changeset(changeset_id, &malformed);
    assert!(matches!(outcome, Err(FrankenError::DatabaseCorrupt { .. })));
}
1793
/// A buffer of header size plus 8 bytes, with its own length written into the
/// u64 field at offset 14, must be rejected by `parse_changeset_pages` with
/// `DatabaseCorrupt`.
#[test]
fn test_parse_changeset_pages_rejects_truncated_payload() {
    let total_len = CHANGESET_HEADER_SIZE + 8;
    let total_len_le = u64::try_from(total_len)
        .expect("test total_len fits into u64")
        .to_le_bytes();

    // (offset, little-endian bytes) for each header field written by this test.
    let header_fields: [(usize, Vec<u8>); 5] = [
        (0, b"FSRP".to_vec()),
        (4, 1_u16.to_le_bytes().to_vec()),
        (6, 4096_u32.to_le_bytes().to_vec()),
        (10, 1_u32.to_le_bytes().to_vec()),
        (14, total_len_le.to_vec()),
    ];
    let mut malformed = vec![0_u8; total_len];
    for (offset, bytes) in &header_fields {
        malformed[*offset..*offset + bytes.len()].copy_from_slice(bytes);
    }

    let outcome = parse_changeset_pages(&malformed);
    assert!(matches!(outcome, Err(FrankenError::DatabaseCorrupt { .. })));
}
1811
/// Pages sent in the order 5, 1, 3, 2, 4 must come back from `apply_pending`
/// with non-decreasing page numbers.
///
/// NOTE(review): the check only runs if the receiver reaches `Applying`.
#[test]
fn test_receiver_pages_applied_in_order() {
    let page_size = 256_u32;
    let mut rx = ReplicationReceiver::new();
    let wire_packets = generate_sender_packets(page_size, &[5, 1, 3, 2, 4], 256);

    // Per-packet outcomes are deliberately ignored; only the final state matters.
    for wire in &wire_packets {
        let _ = rx.process_packet(wire);
    }

    if rx.state() != ReceiverState::Applying {
        return;
    }

    let applied = rx.apply_pending().expect("apply");
    let pages = &applied[0].pages;
    // Compare each page with its successor.
    for (earlier, later) in pages.iter().zip(pages.iter().skip(1)) {
        assert!(
            earlier.page_number <= later.page_number,
            "bead_id={TEST_BEAD_ID} case=pages_sorted pn0={} pn1={}",
            earlier.page_number,
            later.page_number
        );
    }
}
1835
/// For changesets of 1, 3, 5 and 10 pages, feeding the sender's packets to a
/// fresh receiver must produce `DecodeReady` at some point.
#[test]
fn prop_any_k_symbols_decode() {
    let page_size = 256_u32;
    for n_pages in [1_u32, 3, 5, 10] {
        let page_numbers: Vec<u32> = (1..=n_pages).collect();
        let wire_packets = generate_sender_packets(page_size, &page_numbers, 256);
        let mut rx = ReplicationReceiver::new();

        // `any` stops feeding packets at the first `DecodeReady`.
        let decode_ready = wire_packets
            .iter()
            .any(|wire| matches!(rx.process_packet(wire), Ok(PacketResult::DecodeReady)));

        assert!(
            decode_ready,
            "bead_id={TEST_BEAD_ID} case=prop_any_k_decode n_pages={n_pages}"
        );
    }
}
1863
/// After a packet is accepted once, five replays each return `Duplicate` and
/// the received count for the changeset stays at 1.
#[test]
fn prop_dedup_idempotent() {
    let mut rx = ReplicationReceiver::new();
    let id = ChangesetId::from_bytes([0x77; 16]);
    let packet = make_packet(id, 0, 0, 100, vec![0x42; 512]);

    let first_outcome = rx.process_parsed_packet(&packet).expect("first");
    assert_eq!(
        first_outcome,
        PacketResult::Accepted,
        "bead_id={TEST_BEAD_ID} case=dedup_first_accepted"
    );

    for _replay in 0..5 {
        let replay_outcome = rx.process_parsed_packet(&packet).expect("duplicate");
        assert_eq!(
            replay_outcome,
            PacketResult::Duplicate,
            "bead_id={TEST_BEAD_ID} case=dedup_subsequent_always_duplicate"
        );
    }

    // A missing entry is read as 0.
    let count = rx.received_counts.get(&id).map_or(0, |n| *n);
    assert_eq!(count, 1, "bead_id={TEST_BEAD_ID} case=dedup_count_stable");
}
1893
/// A buffer one byte longer than `DEFAULT_RPC_MESSAGE_CAP_BYTES` is rejected
/// with `FrankenError::TooBig`.
#[test]
fn test_packet_reject_over_message_cap() {
    let mut rx = ReplicationReceiver::new();
    let oversized_len = DEFAULT_RPC_MESSAGE_CAP_BYTES + 1;
    let oversized = vec![0_u8; oversized_len];

    let outcome = rx.process_packet(&oversized);
    let err = outcome.unwrap_err();
    assert!(matches!(err, FrankenError::TooBig));
}
1905
/// End-to-end round trip: 20 pages are sent, the receiver reaches `Applying`,
/// one changeset is applied with page numbers and bytes identical to the
/// originals, and the receiver resets back to `Listening`.
#[test]
fn test_e2e_sender_receiver_roundtrip() {
    let page_size = 512_u32;
    let page_numbers: Vec<u32> = (1..=20).collect();
    let original_pages = make_pages(page_size, &page_numbers);
    let wire_packets = generate_sender_packets(page_size, &page_numbers, 512);

    let mut rx = ReplicationReceiver::new();
    // Per-packet outcomes are deliberately ignored; the state is checked below.
    for wire in &wire_packets {
        let _ = rx.process_packet(wire);
    }

    assert_eq!(
        rx.state(),
        ReceiverState::Applying,
        "bead_id={TEST_BEAD_ID} case=e2e_roundtrip_applying"
    );

    let applied = rx.apply_pending().expect("apply");
    assert_eq!(
        applied.len(),
        1,
        "bead_id={TEST_BEAD_ID} case=e2e_one_changeset"
    );

    let decoded_pages = &applied[0].pages;
    assert_eq!(
        decoded_pages.len(),
        original_pages.len(),
        "bead_id={TEST_BEAD_ID} case=e2e_page_count"
    );

    let page_pairs = decoded_pages.iter().zip(original_pages.iter());
    for (got, want) in page_pairs {
        assert_eq!(
            got.page_number, want.page_number,
            "bead_id={TEST_BEAD_ID} case=e2e_page_number_match"
        );
        assert_eq!(
            got.page_data, want.page_bytes,
            "bead_id={TEST_BEAD_ID} case=e2e_page_data_identical pn={}",
            want.page_number
        );
    }

    rx.reset_to_listening().expect("reset");
    assert_eq!(
        rx.state(),
        ReceiverState::Listening,
        "bead_id={TEST_BEAD_ID} case=e2e_back_to_listening"
    );
}
1960
/// Interleaves the packets of two changesets (A, B, A, B, …) into a single
/// receiver and requires that at least one of them reaches `DecodeReady`.
#[test]
fn test_e2e_concurrent_changesets() {
    let mut rx = ReplicationReceiver::new();

    let packets_a = generate_sender_packets(256, &[1, 2, 3], 256);
    let packets_b = generate_sender_packets(256, &[10, 20, 30], 256);

    // Alternate between the two streams until both are exhausted.
    let mut interleaved = Vec::new();
    let mut stream_a = packets_a.iter();
    let mut stream_b = packets_b.iter();
    loop {
        match (stream_a.next(), stream_b.next()) {
            (None, None) => break,
            (from_a, from_b) => interleaved.extend(from_a.into_iter().chain(from_b).cloned()),
        }
    }

    let mut decode_count = 0_u32;
    for wire in &interleaved {
        if !matches!(rx.process_packet(wire), Ok(PacketResult::DecodeReady)) {
            continue;
        }
        decode_count += 1;

        if rx.state() != ReceiverState::Applying {
            continue;
        }
        // Apply the finished changeset; the result is intentionally ignored.
        let _ = rx.apply_pending();
        // While another decoder is still in flight, set the state back to
        // Collecting by hand.
        if !rx.decoders.is_empty() {
            rx.state = ReceiverState::Collecting;
        }
    }

    assert!(
        decode_count >= 1,
        "bead_id={TEST_BEAD_ID} case=e2e_concurrent_at_least_one_decoded count={decode_count}"
    );
}
2001
/// End-to-end compliance run with a real `ReplicationSender`.
///
/// Ten 1024-byte pages are prepared and streamed to wire bytes, then fed to a
/// fresh receiver until it reports `DecodeReady`. The test then checks the
/// `Applying` → `Complete` → `Listening` state sequence, that the decoded
/// pages equal the originals, and that `applied_count()` is 1.
///
/// Any processing error fails the test immediately with a message that
/// includes the bead id and the underlying error.
#[test]
fn test_e2e_bd_1hi_14_compliance() {
    let page_size = 1024_u32;
    let page_numbers: Vec<u32> = (1..=10).collect();
    let original_pages = make_pages(page_size, &page_numbers);

    // `prepare` takes the pages mutably, so it gets its own copy.
    let mut sender = ReplicationSender::new();
    let mut pages = make_pages(page_size, &page_numbers);
    sender
        .prepare(page_size, &mut pages, SenderConfig::default())
        .expect("prepare");
    sender.start_streaming().expect("start");

    // Drain the sender into encoded wire packets.
    let mut wire_packets = Vec::new();
    while let Some(packet) = sender.next_packet().expect("next") {
        wire_packets.push(packet.to_bytes().expect("encode"));
    }

    let mut receiver = ReplicationReceiver::new();
    assert_eq!(receiver.state(), ReceiverState::Listening);

    let mut last_result = PacketResult::Accepted;
    for pkt in &wire_packets {
        // `expect` takes a plain `&str` and never interpolates `{TEST_BEAD_ID}`,
        // so the panic message is formatted explicitly here.
        last_result = receiver.process_packet(pkt).unwrap_or_else(|err| {
            panic!("bead_id={TEST_BEAD_ID} case=e2e_compliance unexpected error: {err:?}")
        });
        // Stop feeding packets as soon as decoding is possible.
        if last_result == PacketResult::DecodeReady {
            break;
        }
    }

    assert_eq!(
        last_result,
        PacketResult::DecodeReady,
        "bead_id={TEST_BEAD_ID} case=e2e_compliance_decoded"
    );
    assert_eq!(receiver.state(), ReceiverState::Applying);

    let results = receiver.apply_pending().expect("apply");
    assert_eq!(receiver.state(), ReceiverState::Complete);
    assert_eq!(results.len(), 1);

    let decoded = &results[0].pages;
    assert_eq!(decoded.len(), original_pages.len());
    for (d, o) in decoded.iter().zip(original_pages.iter()) {
        assert_eq!(d.page_number, o.page_number);
        assert_eq!(d.page_data, o.page_bytes);
    }

    receiver.reset_to_listening().expect("reset");
    assert_eq!(
        receiver.state(),
        ReceiverState::Listening,
        "bead_id={TEST_BEAD_ID} case=e2e_compliance_reset"
    );
    assert_eq!(receiver.applied_count(), 1);
}
2068
/// For loss rates of 5% and 30%, searches seeds 1..=20_000 for a simulated
/// delivery that includes at least one non-source symbol and decodes to the
/// original pages. The 30% profile additionally requires an observed drop.
#[test]
fn test_simnetwork_loss_profiles_converge_with_repair_symbols() {
    let page_size = 128_u32;
    let page_numbers = [1_u32, 2];
    let original_pages = make_pages(page_size, &page_numbers);
    let base_packets = generate_sender_packets_with_multiplier(page_size, &page_numbers, 128, 2);

    // Each packet is transmitted twice, back to back.
    let mut loss_packets: Vec<Vec<u8>> = Vec::with_capacity(base_packets.len() * 2);
    for wire in &base_packets {
        loss_packets.push(wire.clone());
        loss_packets.push(wire.clone());
    }

    let profiles = [(0.05_f64, false), (0.30_f64, true)];
    for (loss_rate, require_observed_drop) in profiles {
        let found_seed = (1_u64..=20_000).find(|&seed| {
            let mut config = SimTransportConfig::deterministic(seed);
            config.loss_rate = loss_rate;
            config.preserve_order = true;
            let delivery = transmit_packets_simnetwork(config, &loss_packets);

            // Skip seeds that lost nothing when a drop is required.
            let observed_drop = delivery.delivered.len() < delivery.sent_count;
            if require_observed_drop && !observed_drop {
                return false;
            }

            // Skip seeds whose delivery contains no parseable non-source symbol.
            let saw_repair_symbol = delivery.delivered.iter().any(|(_, wire)| {
                matches!(
                    ReplicationPacket::from_bytes(wire),
                    Ok(packet) if !packet.is_source_symbol()
                )
            });
            if !saw_repair_symbol {
                return false;
            }

            let (decoded, _erasures, _parse_errors) =
                decode_from_wire_packets(&delivery.delivered);
            match &decoded {
                Some(pages) => decoded_matches_original(pages, &original_pages),
                None => false,
            }
        });

        assert!(
            found_seed.is_some(),
            "bead_id=bd-xgoe case=loss_profile_convergence loss_rate={loss_rate} require_drop={require_observed_drop} did not find deterministic convergent seed"
        );
    }
}
2117
/// Searches seeds 1..=2_000 for an unordered delivery with 35% duplication
/// that shows both duplicated and reordered ESIs and still decodes to the
/// original pages.
#[test]
fn test_simnetwork_reorder_and_dup_converge() {
    let page_size = 128_u32;
    let page_numbers = [7_u32, 11];
    let original_pages = make_pages(page_size, &page_numbers);
    let wire_packets = generate_sender_packets_with_multiplier(page_size, &page_numbers, 128, 2);

    let found_seed = (1_u64..=2_000).find(|&seed| {
        let mut config = SimTransportConfig::deterministic(seed);
        config.preserve_order = false;
        config.duplication_rate = 0.35;
        let delivery = transmit_packets_simnetwork(config, &wire_packets);

        // Only deliveries showing both effects are of interest.
        if !(has_duplicate_esies(&delivery) && has_reordered_esies(&delivery)) {
            return false;
        }

        let (decoded, _erasures, _parse_errors) = decode_from_wire_packets(&delivery.delivered);
        match &decoded {
            Some(pages) => decoded_matches_original(pages, &original_pages),
            None => false,
        }
    });

    assert!(
        found_seed.is_some(),
        "bead_id=bd-xgoe case=reorder_dup_convergence no deterministic seed achieved reorder+dup convergence"
    );
}
2151
/// Searches seeds 1..=20_000 for an unordered delivery with 20% corruption
/// where bytes were corrupted, the receiver reported at least one erasure or
/// parse error, and decoding still yields the original pages.
#[test]
fn test_simnetwork_corruption_is_rejected_and_recovered() {
    let page_size = 128_u32;
    let page_numbers = [21_u32, 34];
    let original_pages = make_pages(page_size, &page_numbers);
    let wire_packets = generate_sender_packets_with_multiplier(page_size, &page_numbers, 128, 2);

    let found_seed = (1_u64..=20_000).find(|&seed| {
        let mut config = SimTransportConfig::deterministic(seed);
        config.corruption_rate = 0.20;
        config.preserve_order = false;
        let delivery = transmit_packets_simnetwork(config, &wire_packets);

        // Skip seeds where no delivered bytes were corrupted.
        if !has_corrupted_wire_bytes(&delivery, &wire_packets) {
            return false;
        }

        let (decoded, erasures, parse_errors) = decode_from_wire_packets(&delivery.delivered);
        // The receiver must have rejected at least one packet.
        if erasures + parse_errors == 0 {
            return false;
        }
        match &decoded {
            Some(pages) => decoded_matches_original(pages, &original_pages),
            None => false,
        }
    });

    assert!(
        found_seed.is_some(),
        "bead_id=bd-xgoe case=corruption_recovery no deterministic seed achieved corruption rejection + convergence"
    );
}
2188
/// Sends packets one at a time over a reliable simulated link and stops as
/// soon as the receiver reports `DecodeReady`. The number of packets sent
/// must be lower than the count from sending the whole batch.
#[test]
fn test_simnetwork_stop_early_reduces_traffic() {
    let page_size = 256_u32;
    let page_numbers = [1_u32, 2, 3];
    let wire_packets = generate_sender_packets_with_multiplier(page_size, &page_numbers, 256, 2);

    // Baseline: transmit everything and record how many packets were sent.
    let baseline = transmit_packets_simnetwork(SimTransportConfig::reliable(), &wire_packets);
    let full_sent = baseline.sent_count;

    let sim = SimNetwork::fully_connected(2, SimTransportConfig::reliable());
    let (mut sink, mut stream) = sim.transport(0, 1);
    let rt = RuntimeBuilder::current_thread()
        .build()
        .expect("runtime build");

    let mut rx = ReplicationReceiver::new();
    let mut stop_early_sent = 0_usize;
    let mut decoded = false;

    rt.block_on(async {
        for (position, payload) in wire_packets.iter().enumerate() {
            // Send one packet, then wait for it to arrive before sending more.
            let esi = u32::try_from(position).expect("test packet index fits u32");
            let outgoing = packet_symbol(esi, payload.clone());
            sink.send(outgoing).await.expect("send simulated symbol");
            stop_early_sent += 1;

            let arrived = stream
                .next()
                .await
                .expect("delivered packet")
                .expect("stream item");
            let wire = arrived.symbol().data().to_vec();

            let outcome = rx.process_packet(&wire).expect("receiver process");
            if outcome == PacketResult::DecodeReady {
                decoded = true;
                break;
            }
        }
        sink.close().await.expect("close simulated sink");
    });

    assert!(
        decoded,
        "bead_id=bd-xgoe case=stop_early_decode_not_reached"
    );
    assert!(
        stop_early_sent < full_sent,
        "bead_id=bd-xgoe case=stop_early_not_reduced stop_early_sent={stop_early_sent} full_sent={full_sent}"
    );
}
2242
/// Compliance gate: names every `ReceiverState` and `PacketResult` variant
/// used by the suite, checks a fresh receiver's initial counters, and pins
/// `REPLICATION_HEADER_SIZE` at 72.
#[test]
fn test_bd_1hi_14_unit_compliance_gate() {
    // Naming the variants makes this test fail to compile if one is removed.
    let _receiver_states = [
        ReceiverState::Listening,
        ReceiverState::Collecting,
        ReceiverState::Decoding,
        ReceiverState::Applying,
        ReceiverState::Complete,
    ];
    let _packet_results = [
        PacketResult::Accepted,
        PacketResult::Erasure,
        PacketResult::Duplicate,
        PacketResult::DecodeReady,
        PacketResult::NeedMore,
    ];

    let rx = ReplicationReceiver::new();
    assert_eq!(rx.state(), ReceiverState::Listening);
    assert_eq!(rx.applied_count(), 0);
    assert_eq!(rx.active_decoders(), 0);

    assert_eq!(REPLICATION_HEADER_SIZE, 72);
}
2270
/// After all packets of a two-page changeset are fed in, the receiver must be
/// in `Applying` or `Collecting`. If it is `Applying`, applying moves it to
/// `Complete` and a reset returns it to `Listening`.
#[test]
fn prop_bd_1hi_14_structure_compliance() {
    let page_size = 256_u32;
    let mut rx = ReplicationReceiver::new();
    assert_eq!(rx.state(), ReceiverState::Listening);

    let wire_packets = generate_sender_packets(page_size, &[1, 2], 256);
    // Per-packet outcomes are deliberately ignored; the state is checked below.
    for wire in &wire_packets {
        let _ = rx.process_packet(wire);
    }

    assert!(
        matches!(
            rx.state(),
            ReceiverState::Applying | ReceiverState::Collecting
        ),
        "bead_id={TEST_BEAD_ID} case=prop_state_machine state={:?}",
        rx.state()
    );

    if rx.state() != ReceiverState::Applying {
        return;
    }

    let applied = rx.apply_pending().expect("apply");
    assert!(!applied.is_empty());
    assert_eq!(rx.state(), ReceiverState::Complete);

    rx.reset_to_listening().expect("reset");
    assert_eq!(rx.state(), ReceiverState::Listening);
}
2299}