1use std::collections::{HashMap, HashSet};
15
16use fsqlite_error::{FrankenError, Result};
17use tracing::{debug, error, info, warn};
18
19use crate::replication_sender::{
20 CHANGESET_HEADER_SIZE, ChangesetId, PageEntry, ReplicationPacket, ReplicationPacketV2Header,
21 SenderConfig, compute_changeset_id, derive_seed_from_changeset_id, encode_changeset,
22};
23use crate::source_block_partition::{K_MAX, SourceBlock, partition_source_blocks};
24
25const BEAD_ID: &str = "bd-1hi.15";
26
27#[derive(Debug, Clone)]
33pub struct BlockResumeState {
34 pub block_id: u32,
36 pub num_received: u32,
38 pub received_isis: HashSet<u32>,
40 pub decoded: bool,
42}
43
44impl BlockResumeState {
45 #[must_use]
47 fn new(block_id: u32) -> Self {
48 Self {
49 block_id,
50 num_received: 0,
51 received_isis: HashSet::new(),
52 decoded: false,
53 }
54 }
55
56 fn record_isi(&mut self, isi: u32) -> bool {
58 if self.received_isis.insert(isi) {
59 self.num_received += 1;
60 true
61 } else {
62 false
63 }
64 }
65
66 #[must_use]
70 pub fn to_bytes(&self) -> Vec<u8> {
71 let n = self.received_isis.len();
72 let mut buf = Vec::with_capacity(13 + n * 4);
73 buf.extend_from_slice(&self.block_id.to_le_bytes());
74 buf.extend_from_slice(&self.num_received.to_le_bytes());
75 buf.push(u8::from(self.decoded));
76 let n_u32 = u32::try_from(n).unwrap_or(u32::MAX);
77 buf.extend_from_slice(&n_u32.to_le_bytes());
78 let mut sorted_isis: Vec<u32> = self.received_isis.iter().copied().collect();
79 sorted_isis.sort_unstable();
80 for isi in sorted_isis {
81 buf.extend_from_slice(&isi.to_le_bytes());
82 }
83 buf
84 }
85
86 pub fn from_bytes(buf: &[u8]) -> Result<(Self, usize)> {
92 if buf.len() < 13 {
93 return Err(FrankenError::DatabaseCorrupt {
94 detail: format!("BlockResumeState too short: {} < 13", buf.len()),
95 });
96 }
97 let block_id = u32::from_le_bytes(buf[0..4].try_into().expect("4 bytes"));
98 let num_received = u32::from_le_bytes(buf[4..8].try_into().expect("4 bytes"));
99 let decoded = buf[8] != 0;
100 let n_isis = u32::from_le_bytes(buf[9..13].try_into().expect("4 bytes"));
101 let n = n_isis as usize;
102 let expected = n
103 .checked_mul(4)
104 .and_then(|v| v.checked_add(13))
105 .ok_or_else(|| FrankenError::DatabaseCorrupt {
106 detail: format!("BlockResumeState n_isis ({n_isis}) causes size overflow"),
107 })?;
108 if buf.len() < expected {
109 return Err(FrankenError::DatabaseCorrupt {
110 detail: format!("BlockResumeState truncated: {} < {expected}", buf.len()),
111 });
112 }
113 let mut received_isis = HashSet::with_capacity(n);
114 for i in 0..n {
115 let offset = 13 + i * 4;
116 let isi = u32::from_le_bytes(buf[offset..offset + 4].try_into().expect("4 bytes"));
117 received_isis.insert(isi);
118 }
119 Ok((
120 Self {
121 block_id,
122 num_received,
123 received_isis,
124 decoded,
125 },
126 expected,
127 ))
128 }
129}
130
131#[derive(Debug, Clone)]
133pub struct ResumeState {
134 pub blocks: Vec<BlockResumeState>,
136 pub total_blocks: u32,
138}
139
140impl ResumeState {
141 #[must_use]
143 pub fn new(total_blocks: u32) -> Self {
144 let blocks = (0..total_blocks).map(BlockResumeState::new).collect();
145 Self {
146 blocks,
147 total_blocks,
148 }
149 }
150
151 #[must_use]
153 pub fn decoded_count(&self) -> u32 {
154 u32::try_from(self.blocks.iter().filter(|b| b.decoded).count()).unwrap_or(u32::MAX)
155 }
156
157 #[must_use]
159 pub fn all_decoded(&self) -> bool {
160 self.blocks.iter().all(|b| b.decoded)
161 }
162
163 #[must_use]
165 pub fn to_bytes(&self) -> Vec<u8> {
166 let mut buf = Vec::new();
167 buf.extend_from_slice(&self.total_blocks.to_le_bytes());
168 for block in &self.blocks {
169 buf.extend_from_slice(&block.to_bytes());
170 }
171 buf
172 }
173
174 pub fn from_bytes(buf: &[u8]) -> Result<Self> {
180 if buf.len() < 4 {
181 return Err(FrankenError::DatabaseCorrupt {
182 detail: format!("ResumeState too short: {} < 4", buf.len()),
183 });
184 }
185 let total_blocks = u32::from_le_bytes(buf[0..4].try_into().expect("4 bytes"));
186 let mut blocks = Vec::with_capacity(total_blocks as usize);
187 let mut offset = 4;
188 for _ in 0..total_blocks {
189 let (block, consumed) = BlockResumeState::from_bytes(&buf[offset..])?;
190 blocks.push(block);
191 offset += consumed;
192 }
193 Ok(Self {
194 blocks,
195 total_blocks,
196 })
197 }
198}
199
200#[derive(Debug)]
206pub struct SnapshotSender {
207 pub source_blocks: Vec<SourceBlock>,
209 pub page_size: u32,
211 current_block: usize,
213 current_isi: u32,
215 block_changeset_ids: Vec<ChangesetId>,
217 block_k_sources: Vec<u32>,
219 block_changesets: Vec<Vec<u8>>,
221 config: SenderConfig,
223 done: bool,
225}
226
227impl SnapshotSender {
228 #[allow(clippy::too_many_lines)]
236 pub fn prepare(
237 page_size: u32,
238 all_pages: &mut [PageEntry],
239 config: SenderConfig,
240 ) -> Result<Self> {
241 if all_pages.is_empty() {
242 return Err(FrankenError::OutOfRange {
243 what: "pages".to_owned(),
244 value: "0".to_owned(),
245 });
246 }
247
248 let total_pages = u32::try_from(all_pages.len()).map_err(|_| FrankenError::OutOfRange {
249 what: "total_pages".to_owned(),
250 value: all_pages.len().to_string(),
251 })?;
252
253 let source_blocks = partition_source_blocks(total_pages)?;
254 info!(
255 bead_id = BEAD_ID,
256 total_pages,
257 n_blocks = source_blocks.len(),
258 page_size,
259 "snapshot partitioned into source blocks"
260 );
261
262 all_pages.sort_by_key(|p| p.page_number);
264
265 let mut block_changeset_ids = Vec::with_capacity(source_blocks.len());
267 let mut block_k_sources = Vec::with_capacity(source_blocks.len());
268 let mut block_changesets = Vec::with_capacity(source_blocks.len());
269
270 let mut page_idx = 0_usize;
271 for block in &source_blocks {
272 let end = page_idx + block.num_pages as usize;
273 if end > all_pages.len() {
274 return Err(FrankenError::Internal(format!(
275 "block {} requires pages up to index {end}, but only {} available",
276 block.index,
277 all_pages.len()
278 )));
279 }
280 let block_pages = &mut all_pages[page_idx..end];
281 let changeset_bytes = encode_changeset(page_size, block_pages)?;
282 let changeset_id = compute_changeset_id(&changeset_bytes);
283
284 let t = u64::from(config.symbol_size);
286 let f = changeset_bytes.len() as u64;
287 let k_source = u32::try_from(f.div_ceil(t)).map_err(|_| FrankenError::OutOfRange {
288 what: "k_source".to_owned(),
289 value: f.div_ceil(t).to_string(),
290 })?;
291
292 debug!(
293 bead_id = BEAD_ID,
294 block_index = block.index,
295 num_pages = block.num_pages,
296 changeset_len = changeset_bytes.len(),
297 k_source,
298 "prepared block changeset"
299 );
300
301 block_changeset_ids.push(changeset_id);
302 block_k_sources.push(k_source);
303 block_changesets.push(changeset_bytes);
304 page_idx = end;
305 }
306
307 Ok(Self {
308 source_blocks,
309 page_size,
310 current_block: 0,
311 current_isi: 0,
312 block_changeset_ids,
313 block_k_sources,
314 block_changesets,
315 config,
316 done: false,
317 })
318 }
319
320 pub fn next_packet(&mut self) -> Option<ReplicationPacket> {
325 if self.done || self.current_block >= self.source_blocks.len() {
326 self.done = true;
327 return None;
328 }
329
330 let k_source = self.block_k_sources[self.current_block];
331 let max_isi = k_source.saturating_mul(self.config.max_isi_multiplier);
332
333 if self.current_isi >= max_isi {
334 self.current_block += 1;
335 self.current_isi = 0;
336 if self.current_block >= self.source_blocks.len() {
337 self.done = true;
338 return None;
339 }
340 }
341
342 let changeset = &self.block_changesets[self.current_block];
343 let changeset_id = self.block_changeset_ids[self.current_block];
344 let k_source = self.block_k_sources[self.current_block];
345 let isi = self.current_isi;
346 let t = usize::from(self.config.symbol_size);
347
348 let symbol_data = if u64::from(isi) < u64::from(k_source) {
350 let start = isi as usize * t;
351 let end = (start + t).min(changeset.len());
352 let mut data = vec![0_u8; t];
353 let available = end.saturating_sub(start);
354 if available > 0 {
355 data[..available].copy_from_slice(&changeset[start..end]);
356 }
357 data
358 } else {
359 #[allow(clippy::cast_possible_truncation)]
361 {
362 let seed = derive_seed_from_changeset_id(&changeset_id);
363 let repair_seed = seed.wrapping_add(u64::from(isi));
364 let mut data = vec![0_u8; t];
365 for (i, byte) in data.iter_mut().enumerate() {
366 let mixed = repair_seed
367 .wrapping_mul(0x9E37_79B9_7F4A_7C15)
368 .wrapping_add(i as u64);
369 *byte = (mixed >> 32) as u8;
370 }
371 data
372 }
373 };
374
375 let seed = derive_seed_from_changeset_id(&changeset_id);
376 let r_repair = max_isi.saturating_sub(k_source);
377 let packet = ReplicationPacket::new_v2(
378 ReplicationPacketV2Header {
379 changeset_id,
380 sbn: 0,
381 esi: isi,
382 k_source,
383 r_repair,
384 symbol_size_t: self.config.symbol_size,
385 seed,
386 },
387 symbol_data,
388 );
389
390 self.current_isi += 1;
391 Some(packet)
392 }
393
394 #[must_use]
396 pub fn num_blocks(&self) -> usize {
397 self.source_blocks.len()
398 }
399
400 #[must_use]
402 pub fn total_source_symbols(&self) -> u64 {
403 self.block_k_sources.iter().map(|&k| u64::from(k)).sum()
404 }
405
406 pub fn restart(&mut self) {
408 self.current_block = 0;
409 self.current_isi = 0;
410 self.done = false;
411 debug!(bead_id = BEAD_ID, "snapshot sender restarted for next pass");
412 }
413}
414
415#[derive(Debug, Clone, Copy, PartialEq, Eq)]
421pub enum SnapshotReceiverState {
422 Waiting,
424 Receiving,
426 Complete,
428}
429
430#[derive(Debug, Clone)]
432pub struct DecodedBlock {
433 pub block_index: u32,
435 pub pages: Vec<DecodedBlockPage>,
437}
438
439#[derive(Debug, Clone, PartialEq, Eq)]
441pub struct DecodedBlockPage {
442 pub page_number: u32,
444 pub page_data: Vec<u8>,
446}
447
448#[derive(Debug)]
450struct BlockDecoder {
451 changeset_id: Option<ChangesetId>,
453 k_source: u32,
455 symbol_size: u32,
457 seed: u64,
459 symbols: HashMap<u32, Vec<u8>>,
461 received_isis: HashSet<u32>,
463 decoded: bool,
465}
466
467impl BlockDecoder {
468 fn new() -> Self {
469 Self {
470 changeset_id: None,
471 k_source: 0,
472 symbol_size: 0,
473 seed: 0,
474 symbols: HashMap::new(),
475 received_isis: HashSet::new(),
476 decoded: false,
477 }
478 }
479
480 fn initialize(&mut self, changeset_id: ChangesetId, k_source: u32, symbol_size: u32) {
481 self.changeset_id = Some(changeset_id);
482 self.k_source = k_source;
483 self.symbol_size = symbol_size;
484 self.seed = derive_seed_from_changeset_id(&changeset_id);
485 }
486
487 fn add_symbol(&mut self, isi: u32, data: Vec<u8>) -> bool {
488 if self.received_isis.insert(isi) {
489 self.symbols.insert(isi, data);
490 true
491 } else {
492 false
493 }
494 }
495
496 fn received_count(&self) -> u32 {
497 u32::try_from(self.received_isis.len()).unwrap_or(u32::MAX)
498 }
499
500 fn ready_to_decode(&self) -> bool {
501 self.received_count() >= self.k_source && self.k_source > 0
502 }
503
504 fn try_decode(&self) -> Option<Vec<u8>> {
505 if !self.ready_to_decode() {
506 return None;
507 }
508 let source_count = self
509 .symbols
510 .keys()
511 .filter(|&&isi| isi < self.k_source)
512 .count();
513 let k = self.k_source as usize;
514 let t = self.symbol_size as usize;
515 if source_count >= k {
516 let padded_len = k * t;
517 let mut padded = vec![0_u8; padded_len];
518 for isi in 0..self.k_source {
519 if let Some(data) = self.symbols.get(&isi) {
520 let start = isi as usize * t;
521 let copy_len = data.len().min(t);
522 padded[start..start + copy_len].copy_from_slice(&data[..copy_len]);
523 }
524 }
525 Some(padded)
526 } else {
527 warn!(
528 bead_id = BEAD_ID,
529 source_count,
530 k_source = self.k_source,
531 "snapshot block decode needs repair symbols (production RaptorQ)"
532 );
533 None
534 }
535 }
536}
537
538#[derive(Debug)]
540pub struct SnapshotReceiver {
541 state: SnapshotReceiverState,
542 changeset_to_block: HashMap<ChangesetId, usize>,
544 block_decoders: Vec<BlockDecoder>,
546 num_blocks: usize,
548 decoded_blocks: Vec<DecodedBlock>,
550 resume: ResumeState,
552 page_size: u32,
554}
555
556impl SnapshotReceiver {
557 #[must_use]
562 pub fn new(num_blocks: usize, page_size: u32) -> Self {
563 let block_decoders = (0..num_blocks).map(|_| BlockDecoder::new()).collect();
564 Self {
565 state: SnapshotReceiverState::Waiting,
566 changeset_to_block: HashMap::new(),
567 block_decoders,
568 num_blocks,
569 decoded_blocks: Vec::new(),
570 resume: ResumeState::new(u32::try_from(num_blocks).unwrap_or(u32::MAX)),
571 page_size,
572 }
573 }
574
575 #[must_use]
577 pub fn from_resume(resume: ResumeState, page_size: u32) -> Self {
578 let num_blocks = resume.total_blocks as usize;
579 let block_decoders = (0..num_blocks).map(|_| BlockDecoder::new()).collect();
580 Self {
581 state: if resume.all_decoded() {
582 SnapshotReceiverState::Complete
583 } else {
584 SnapshotReceiverState::Waiting
585 },
586 changeset_to_block: HashMap::new(),
587 block_decoders,
588 num_blocks,
589 decoded_blocks: Vec::new(),
590 resume,
591 page_size,
592 }
593 }
594
595 #[must_use]
597 pub const fn state(&self) -> SnapshotReceiverState {
598 self.state
599 }
600
601 #[must_use]
603 pub fn blocks_decoded(&self) -> usize {
604 self.decoded_blocks.len()
605 }
606
607 #[must_use]
609 pub fn resume_state(&self) -> &ResumeState {
610 &self.resume
611 }
612
613 pub fn take_decoded_blocks(&mut self) -> Vec<DecodedBlock> {
615 std::mem::take(&mut self.decoded_blocks)
616 }
617
618 #[allow(clippy::too_many_lines)]
627 pub fn process_packet(&mut self, packet: &ReplicationPacket) -> Result<SnapshotPacketResult> {
628 if self.state == SnapshotReceiverState::Complete {
629 return Ok(SnapshotPacketResult::AlreadyComplete);
630 }
631
632 if packet.sbn != 0 {
634 return Err(FrankenError::Internal(format!(
635 "V1: SBN must be 0, got {}",
636 packet.sbn
637 )));
638 }
639 if packet.k_source == 0 || packet.k_source > K_MAX {
640 return Err(FrankenError::OutOfRange {
641 what: "k_source".to_owned(),
642 value: packet.k_source.to_string(),
643 });
644 }
645 let symbol_size =
646 u32::try_from(packet.symbol_data.len()).map_err(|_| FrankenError::OutOfRange {
647 what: "symbol_size".to_owned(),
648 value: packet.symbol_data.len().to_string(),
649 })?;
650 if symbol_size == 0 {
651 return Err(FrankenError::OutOfRange {
652 what: "symbol_size".to_owned(),
653 value: "0".to_owned(),
654 });
655 }
656
657 if self.state == SnapshotReceiverState::Waiting {
658 self.state = SnapshotReceiverState::Receiving;
659 info!(bead_id = BEAD_ID, "snapshot receiving started");
660 }
661
662 let changeset_id = packet.changeset_id;
663
664 let block_idx = if let Some(&idx) = self.changeset_to_block.get(&changeset_id) {
666 idx
667 } else {
668 let next_idx = self
670 .block_decoders
671 .iter()
672 .position(|d| d.changeset_id.is_none() && !d.decoded);
673 if let Some(idx) = next_idx {
674 self.changeset_to_block.insert(changeset_id, idx);
675 self.block_decoders[idx].initialize(changeset_id, packet.k_source, symbol_size);
676 debug!(
677 bead_id = BEAD_ID,
678 block_index = idx,
679 k_source = packet.k_source,
680 "mapped new changeset to block"
681 );
682 idx
683 } else {
684 warn!(
685 bead_id = BEAD_ID,
686 "no available block slot for new changeset_id"
687 );
688 return Ok(SnapshotPacketResult::Rejected);
689 }
690 };
691
692 if block_idx >= self.block_decoders.len() {
693 return Ok(SnapshotPacketResult::Rejected);
694 }
695
696 let decoder = &mut self.block_decoders[block_idx];
697 if decoder.decoded {
698 return Ok(SnapshotPacketResult::BlockAlreadyDecoded);
699 }
700
701 if decoder.k_source != packet.k_source {
703 return Err(FrankenError::DatabaseCorrupt {
704 detail: format!(
705 "k_source mismatch for block {block_idx}: {} vs {}",
706 decoder.k_source, packet.k_source
707 ),
708 });
709 }
710 if decoder.symbol_size != symbol_size {
711 return Err(FrankenError::DatabaseCorrupt {
712 detail: format!(
713 "symbol_size mismatch for block {block_idx}: {} vs {symbol_size}",
714 decoder.symbol_size
715 ),
716 });
717 }
718
719 let accepted = decoder.add_symbol(packet.esi, packet.symbol_data.clone());
721 if !accepted {
722 return Ok(SnapshotPacketResult::Duplicate);
723 }
724
725 if block_idx < self.resume.blocks.len() {
727 self.resume.blocks[block_idx].record_isi(packet.esi);
728 }
729
730 if decoder.ready_to_decode() && !decoder.decoded {
732 if let Some(padded) = decoder.try_decode() {
733 match parse_decoded_snapshot_block(&padded, self.page_size) {
734 Ok(pages) => {
735 let block_id = u32::try_from(block_idx).unwrap_or(u32::MAX);
736 decoder.decoded = true;
737 if block_idx < self.resume.blocks.len() {
738 self.resume.blocks[block_idx].decoded = true;
739 }
740 let n_pages = pages.len();
741 self.decoded_blocks.push(DecodedBlock {
742 block_index: block_id,
743 pages,
744 });
745 info!(
746 bead_id = BEAD_ID,
747 block_index = block_idx,
748 n_pages,
749 decoded_so_far = self.decoded_blocks.len(),
750 total_blocks = self.num_blocks,
751 "source block decoded (progressive)"
752 );
753
754 if self.block_decoders.iter().all(|d| d.decoded) {
756 self.state = SnapshotReceiverState::Complete;
757 info!(
758 bead_id = BEAD_ID,
759 total_blocks = self.num_blocks,
760 "snapshot fully received"
761 );
762 }
763 return Ok(SnapshotPacketResult::BlockDecoded(block_id));
764 }
765 Err(e) => {
766 error!(
767 bead_id = BEAD_ID,
768 block_index = block_idx,
769 error = %e,
770 "snapshot block validation failed"
771 );
772 return Err(e);
773 }
774 }
775 }
776 }
777
778 Ok(SnapshotPacketResult::Accepted)
779 }
780}
781
782#[derive(Debug, Clone, Copy, PartialEq, Eq)]
784pub enum SnapshotPacketResult {
785 Accepted,
787 Duplicate,
789 BlockDecoded(u32),
791 BlockAlreadyDecoded,
793 Rejected,
795 AlreadyComplete,
797}
798
799fn parse_decoded_snapshot_block(
805 padded_bytes: &[u8],
806 _page_size: u32,
807) -> Result<Vec<DecodedBlockPage>> {
808 use crate::replication_sender::ChangesetHeader;
809
810 if padded_bytes.len() < CHANGESET_HEADER_SIZE {
811 return Err(FrankenError::DatabaseCorrupt {
812 detail: format!(
813 "decoded block too short for header: {} < {CHANGESET_HEADER_SIZE}",
814 padded_bytes.len()
815 ),
816 });
817 }
818
819 let header_bytes: [u8; CHANGESET_HEADER_SIZE] = padded_bytes[..CHANGESET_HEADER_SIZE]
820 .try_into()
821 .expect("checked length");
822 let header = ChangesetHeader::from_bytes(&header_bytes)?;
823
824 let total_len = usize::try_from(header.total_len).map_err(|_| FrankenError::OutOfRange {
825 what: "total_len".to_owned(),
826 value: header.total_len.to_string(),
827 })?;
828 if total_len > padded_bytes.len() {
829 return Err(FrankenError::DatabaseCorrupt {
830 detail: format!(
831 "total_len ({total_len}) exceeds decoded bytes ({})",
832 padded_bytes.len()
833 ),
834 });
835 }
836 let changeset_bytes = &padded_bytes[..total_len];
837
838 let entry_size = 4_usize + 8 + header.page_size as usize;
839 let data_bytes = &changeset_bytes[CHANGESET_HEADER_SIZE..];
840
841 let required_data_len = (header.n_pages as usize)
842 .checked_mul(entry_size)
843 .ok_or_else(|| FrankenError::DatabaseCorrupt {
844 detail: "n_pages causes size overflow".to_owned(),
845 })?;
846
847 if data_bytes.len() < required_data_len {
848 return Err(FrankenError::DatabaseCorrupt {
849 detail: format!(
850 "changeset truncated: expected {} data bytes, got {}",
851 required_data_len,
852 data_bytes.len()
853 ),
854 });
855 }
856
857 let mut pages = Vec::with_capacity(header.n_pages as usize);
858 for i in 0..header.n_pages as usize {
859 let offset = i * entry_size;
860 let page_number =
861 u32::from_le_bytes(data_bytes[offset..offset + 4].try_into().expect("4 bytes"));
862 let page_xxh3 = u64::from_le_bytes(
863 data_bytes[offset + 4..offset + 12]
864 .try_into()
865 .expect("8 bytes"),
866 );
867 let page_data = data_bytes[offset + 12..offset + 12 + header.page_size as usize].to_vec();
868
869 let computed_xxh3 = xxhash_rust::xxh3::xxh3_64(&page_data);
870 if computed_xxh3 != page_xxh3 {
871 error!(
872 bead_id = BEAD_ID,
873 page_number,
874 expected_xxh3 = page_xxh3,
875 computed_xxh3,
876 "snapshot page xxh3 mismatch"
877 );
878 return Err(FrankenError::DatabaseCorrupt {
879 detail: format!(
880 "snapshot page {page_number} xxh3 mismatch: {page_xxh3:#x} vs {computed_xxh3:#x}"
881 ),
882 });
883 }
884
885 pages.push(DecodedBlockPage {
886 page_number,
887 page_data,
888 });
889 }
890
891 Ok(pages)
892}
893
894#[cfg(test)]
895mod tests {
896 use super::*;
897 use crate::replication_sender::PageEntry;
898
899 const TEST_BEAD_ID: &str = "bd-1hi.15";
900
901 #[allow(clippy::cast_possible_truncation)]
902 fn make_pages(page_size: u32, page_numbers: &[u32]) -> Vec<PageEntry> {
903 page_numbers
904 .iter()
905 .map(|&pn| {
906 let mut data = vec![0_u8; page_size as usize];
907 for (i, byte) in data.iter_mut().enumerate() {
908 *byte = ((pn as usize * 251 + i * 31) % 256) as u8;
909 }
910 PageEntry::new(pn, data)
911 })
912 .collect()
913 }
914
915 #[test]
920 fn test_resume_state_persistence() {
921 let mut resume = ResumeState::new(3);
922 resume.blocks[0].record_isi(0);
923 resume.blocks[0].record_isi(5);
924 resume.blocks[0].record_isi(10);
925 resume.blocks[1].decoded = true;
926
927 let bytes = resume.to_bytes();
928 let restored = ResumeState::from_bytes(&bytes).expect("deserialize");
929
930 assert_eq!(
931 restored.total_blocks, 3,
932 "bead_id={TEST_BEAD_ID} case=resume_total_blocks"
933 );
934 assert_eq!(
935 restored.blocks[0].num_received, 3,
936 "bead_id={TEST_BEAD_ID} case=resume_block0_received"
937 );
938 assert!(
939 restored.blocks[0].received_isis.contains(&5),
940 "bead_id={TEST_BEAD_ID} case=resume_block0_isi_5"
941 );
942 assert!(
943 restored.blocks[1].decoded,
944 "bead_id={TEST_BEAD_ID} case=resume_block1_decoded"
945 );
946 assert!(
947 !restored.blocks[2].decoded,
948 "bead_id={TEST_BEAD_ID} case=resume_block2_not_decoded"
949 );
950 }
951
952 #[test]
953 fn test_resume_no_protocol_negotiation() {
954 let mut resume = ResumeState::new(2);
956 resume.blocks[0].record_isi(0);
957 resume.blocks[0].record_isi(1);
958
959 let bytes = resume.to_bytes();
961 let restored = ResumeState::from_bytes(&bytes).expect("deserialize");
962 assert_eq!(
963 restored.blocks[0].num_received, 2,
964 "bead_id={TEST_BEAD_ID} case=resume_no_negotiation"
965 );
966 assert!(!restored.all_decoded());
967 }
968
969 #[test]
974 fn test_snapshot_single_block() {
975 let page_size = 256_u32;
976 let page_numbers: Vec<u32> = (1..=10).collect();
977 let mut pages = make_pages(page_size, &page_numbers);
978
979 let config = SenderConfig {
980 symbol_size: 256,
981 max_isi_multiplier: 1,
982 };
983 let mut sender = SnapshotSender::prepare(page_size, &mut pages, config).expect("prepare");
984 assert_eq!(
985 sender.num_blocks(),
986 1,
987 "bead_id={TEST_BEAD_ID} case=single_block"
988 );
989
990 let mut packets = Vec::new();
992 while let Some(pkt) = sender.next_packet() {
993 packets.push(pkt);
994 }
995 assert!(
996 !packets.is_empty(),
997 "bead_id={TEST_BEAD_ID} case=has_packets"
998 );
999
1000 let mut receiver = SnapshotReceiver::new(1, page_size);
1002 for pkt in &packets {
1003 let _ = receiver.process_packet(pkt);
1004 }
1005
1006 assert_eq!(
1007 receiver.state(),
1008 SnapshotReceiverState::Complete,
1009 "bead_id={TEST_BEAD_ID} case=single_block_complete"
1010 );
1011
1012 let blocks = receiver.take_decoded_blocks();
1013 assert_eq!(blocks.len(), 1);
1014 assert_eq!(blocks[0].pages.len(), 10);
1015 }
1016
1017 #[test]
1018 fn test_snapshot_multi_block_small() {
1019 let page_size = 64_u32;
1023 let n_pages = 200_u32;
1024 let page_numbers: Vec<u32> = (1..=n_pages).collect();
1025 let mut pages = make_pages(page_size, &page_numbers);
1026
1027 let config = SenderConfig {
1028 symbol_size: 64,
1029 max_isi_multiplier: 1,
1030 };
1031 let mut sender = SnapshotSender::prepare(page_size, &mut pages, config).expect("prepare");
1032
1033 assert_eq!(
1035 sender.num_blocks(),
1036 1,
1037 "bead_id={TEST_BEAD_ID} case=multi_block_small_count"
1038 );
1039
1040 let mut packets = Vec::new();
1041 while let Some(pkt) = sender.next_packet() {
1042 packets.push(pkt);
1043 }
1044
1045 let mut receiver = SnapshotReceiver::new(sender.num_blocks(), page_size);
1046 for pkt in &packets {
1047 let _ = receiver.process_packet(pkt);
1048 }
1049
1050 assert_eq!(
1051 receiver.state(),
1052 SnapshotReceiverState::Complete,
1053 "bead_id={TEST_BEAD_ID} case=multi_block_small_complete"
1054 );
1055
1056 let blocks = receiver.take_decoded_blocks();
1057 let total_pages: usize = blocks.iter().map(|b| b.pages.len()).sum();
1058 assert_eq!(
1059 total_pages, n_pages as usize,
1060 "bead_id={TEST_BEAD_ID} case=multi_block_all_pages"
1061 );
1062 }
1063
1064 #[test]
1065 fn test_duplicate_isi_discarded() {
1066 let page_size = 128_u32;
1067 let mut pages = make_pages(page_size, &[1, 2, 3]);
1068 let config = SenderConfig {
1069 symbol_size: 128,
1070 max_isi_multiplier: 1,
1071 };
1072 let mut sender = SnapshotSender::prepare(page_size, &mut pages, config).expect("prepare");
1073
1074 let mut packets = Vec::new();
1075 while let Some(pkt) = sender.next_packet() {
1076 packets.push(pkt);
1077 }
1078
1079 let mut receiver = SnapshotReceiver::new(1, page_size);
1080
1081 let r1 = receiver.process_packet(&packets[0]).expect("first");
1083 assert_ne!(
1084 r1,
1085 SnapshotPacketResult::Duplicate,
1086 "bead_id={TEST_BEAD_ID} case=first_not_dup"
1087 );
1088 let r2 = receiver.process_packet(&packets[0]).expect("duplicate");
1089 assert_eq!(
1090 r2,
1091 SnapshotPacketResult::Duplicate,
1092 "bead_id={TEST_BEAD_ID} case=dup_discarded"
1093 );
1094 }
1095
1096 #[test]
1097 fn test_snapshot_progressive_receive() {
1098 let page_size = 128_u32;
1102 let mut pages = make_pages(page_size, &[1, 2, 3, 4, 5]);
1103 let config = SenderConfig {
1104 symbol_size: 128,
1105 max_isi_multiplier: 1,
1106 };
1107 let mut sender = SnapshotSender::prepare(page_size, &mut pages, config).expect("prepare");
1108
1109 let mut packets = Vec::new();
1110 while let Some(pkt) = sender.next_packet() {
1111 packets.push(pkt);
1112 }
1113
1114 let mut receiver = SnapshotReceiver::new(1, page_size);
1115 let mut block_decoded_at = None;
1116
1117 for (i, pkt) in packets.iter().enumerate() {
1118 if let Ok(SnapshotPacketResult::BlockDecoded(_)) = receiver.process_packet(pkt) {
1119 block_decoded_at = Some(i);
1120 break;
1121 }
1122 }
1123
1124 assert!(
1125 block_decoded_at.is_some(),
1126 "bead_id={TEST_BEAD_ID} case=progressive_block_decoded"
1127 );
1128
1129 let blocks = receiver.take_decoded_blocks();
1131 assert!(
1132 !blocks.is_empty(),
1133 "bead_id={TEST_BEAD_ID} case=progressive_has_pages"
1134 );
1135 }
1136
1137 #[test]
1142 fn test_e2e_sender_receiver_roundtrip() {
1143 let page_size = 512_u32;
1144 let n_pages = 50_u32;
1145 let page_numbers: Vec<u32> = (1..=n_pages).collect();
1146 let original_pages = make_pages(page_size, &page_numbers);
1147 let mut pages = original_pages.clone();
1148
1149 let config = SenderConfig {
1150 symbol_size: 512,
1151 max_isi_multiplier: 1,
1152 };
1153 let mut sender = SnapshotSender::prepare(page_size, &mut pages, config).expect("prepare");
1154
1155 let mut packets = Vec::new();
1156 while let Some(pkt) = sender.next_packet() {
1157 packets.push(pkt);
1158 }
1159
1160 let mut receiver = SnapshotReceiver::new(sender.num_blocks(), page_size);
1161 for pkt in &packets {
1162 let _ = receiver.process_packet(pkt);
1163 }
1164
1165 assert_eq!(
1166 receiver.state(),
1167 SnapshotReceiverState::Complete,
1168 "bead_id={TEST_BEAD_ID} case=e2e_roundtrip_complete"
1169 );
1170
1171 let blocks = receiver.take_decoded_blocks();
1172 let mut all_decoded_pages: Vec<&DecodedBlockPage> =
1173 blocks.iter().flat_map(|b| b.pages.iter()).collect();
1174 all_decoded_pages.sort_by_key(|p| p.page_number);
1175
1176 assert_eq!(
1177 all_decoded_pages.len(),
1178 original_pages.len(),
1179 "bead_id={TEST_BEAD_ID} case=e2e_page_count"
1180 );
1181
1182 for (decoded, original) in all_decoded_pages.iter().zip(original_pages.iter()) {
1183 assert_eq!(
1184 decoded.page_number, original.page_number,
1185 "bead_id={TEST_BEAD_ID} case=e2e_page_number"
1186 );
1187 assert_eq!(
1188 decoded.page_data, original.page_bytes,
1189 "bead_id={TEST_BEAD_ID} case=e2e_page_data pn={}",
1190 original.page_number
1191 );
1192 }
1193 }
1194
1195 #[test]
1196 fn test_e2e_resume_after_partial() {
1197 let page_size = 128_u32;
1198 let n_pages = 20_u32;
1199 let mut pages = make_pages(page_size, &(1..=n_pages).collect::<Vec<_>>());
1200
1201 let config = SenderConfig {
1202 symbol_size: 128,
1203 max_isi_multiplier: 1,
1204 };
1205 let mut sender = SnapshotSender::prepare(page_size, &mut pages, config).expect("prepare");
1206
1207 let mut packets = Vec::new();
1208 while let Some(pkt) = sender.next_packet() {
1209 packets.push(pkt);
1210 }
1211
1212 let half = packets.len() / 2;
1214 let mut receiver1 = SnapshotReceiver::new(sender.num_blocks(), page_size);
1215 for pkt in &packets[..half] {
1216 let _ = receiver1.process_packet(pkt);
1217 }
1218
1219 let resume_bytes = receiver1.resume_state().to_bytes();
1221
1222 let resume = ResumeState::from_bytes(&resume_bytes).expect("restore");
1224 let mut receiver2 = SnapshotReceiver::from_resume(resume, page_size);
1225
1226 for pkt in &packets {
1228 let _ = receiver2.process_packet(pkt);
1229 }
1230
1231 assert_eq!(
1233 receiver2.state(),
1234 SnapshotReceiverState::Complete,
1235 "bead_id={TEST_BEAD_ID} case=e2e_resume_complete"
1236 );
1237 }
1238
1239 #[test]
1240 fn test_e2e_bd_1hi_15_compliance() {
1241 let page_size = 256_u32;
1243 let n_pages = 30_u32;
1244 let original_pages = make_pages(page_size, &(1..=n_pages).collect::<Vec<_>>());
1245 let mut pages = original_pages;
1246
1247 let config = SenderConfig {
1248 symbol_size: 256,
1249 max_isi_multiplier: 1,
1250 };
1251 let mut sender = SnapshotSender::prepare(page_size, &mut pages, config).expect("prepare");
1252
1253 assert!(
1255 sender.num_blocks() >= 1,
1256 "bead_id={TEST_BEAD_ID} case=compliance_has_blocks"
1257 );
1258 assert!(
1259 sender.total_source_symbols() > 0,
1260 "bead_id={TEST_BEAD_ID} case=compliance_has_symbols"
1261 );
1262
1263 let mut packets = Vec::new();
1264 while let Some(pkt) = sender.next_packet() {
1265 packets.push(pkt);
1266 }
1267
1268 let mut receiver = SnapshotReceiver::new(sender.num_blocks(), page_size);
1269 assert_eq!(receiver.state(), SnapshotReceiverState::Waiting);
1270
1271 for pkt in &packets {
1272 let _ = receiver.process_packet(pkt);
1273 }
1274 assert_eq!(receiver.state(), SnapshotReceiverState::Complete);
1275
1276 let blocks = receiver.take_decoded_blocks();
1277 let total_decoded: usize = blocks.iter().map(|b| b.pages.len()).sum();
1278 assert_eq!(
1279 total_decoded, n_pages as usize,
1280 "bead_id={TEST_BEAD_ID} case=compliance_all_pages_decoded"
1281 );
1282
1283 assert!(
1285 receiver.resume_state().all_decoded(),
1286 "bead_id={TEST_BEAD_ID} case=compliance_resume_all_decoded"
1287 );
1288 }
1289
1290 #[test]
1295 fn prop_partition_covers_all_pages() {
1296 for p in [1_u32, 10, 100, 1000, 56_403, 56_404, 100_000] {
1297 let blocks = partition_source_blocks(p).expect("partition");
1298 let total: u32 = blocks.iter().map(|b| b.num_pages).sum();
1299 assert_eq!(
1300 total, p,
1301 "bead_id={TEST_BEAD_ID} case=prop_partition_covers p={p}"
1302 );
1303 }
1304 }
1305
1306 #[test]
1307 fn prop_partition_block_sizes_valid() {
1308 for p in [1_u32, 56_403, 56_404, 200_000] {
1309 let blocks = partition_source_blocks(p).expect("partition");
1310 for block in &blocks {
1311 assert!(
1312 block.num_pages <= K_MAX,
1313 "bead_id={TEST_BEAD_ID} case=prop_block_size p={p} block={} num_pages={}",
1314 block.index,
1315 block.num_pages
1316 );
1317 }
1318 }
1319 }
1320
1321 #[test]
1326 fn test_bd_1hi_15_unit_compliance_gate() {
1327 let _ = SnapshotReceiverState::Waiting;
1329 let _ = SnapshotReceiverState::Receiving;
1330 let _ = SnapshotReceiverState::Complete;
1331
1332 let _ = SnapshotPacketResult::Accepted;
1333 let _ = SnapshotPacketResult::Duplicate;
1334 let _ = SnapshotPacketResult::Rejected;
1335 let _ = SnapshotPacketResult::AlreadyComplete;
1336
1337 let resume = ResumeState::new(3);
1338 assert_eq!(resume.total_blocks, 3);
1339 assert!(!resume.all_decoded());
1340 assert_eq!(resume.decoded_count(), 0);
1341
1342 let block = BlockResumeState::new(0);
1344 let bytes = block.to_bytes();
1345 let (restored, _) = BlockResumeState::from_bytes(&bytes).expect("deser");
1346 assert_eq!(restored.block_id, 0);
1347 }
1348
1349 #[test]
1350 fn prop_bd_1hi_15_structure_compliance() {
1351 let page_size = 128_u32;
1353 let mut pages = make_pages(page_size, &[1, 2]);
1354 let config = SenderConfig {
1355 symbol_size: 128,
1356 max_isi_multiplier: 1,
1357 };
1358 let mut sender = SnapshotSender::prepare(page_size, &mut pages, config).expect("prepare");
1359 assert!(sender.num_blocks() >= 1);
1360
1361 let mut packets = Vec::new();
1362 while let Some(pkt) = sender.next_packet() {
1363 packets.push(pkt);
1364 }
1365
1366 let mut receiver = SnapshotReceiver::new(sender.num_blocks(), page_size);
1367 for pkt in &packets {
1368 let _ = receiver.process_packet(pkt);
1369 }
1370 assert_eq!(receiver.state(), SnapshotReceiverState::Complete);
1371 }
1372}