// fsqlite_core/snapshot_shipping.rs
1//! §3.4.3 Fountain-Coded Snapshot Shipping (bd-1hi.15).
2//!
3//! Implements snapshot transfer for initializing new replicas using
4//! fountain coding. The entire database is partitioned into source blocks
5//! and streamed as rateless-coded symbols over UDP.
6//!
7//! Key advantages:
8//! - No handshake or acknowledgment needed
9//! - Receiver can start receiving from any point in the stream
10//! - Inherently resumable with zero protocol overhead
11//! - Natural multicast: initialize many replicas simultaneously
12//! - Progressive receive: partial queries after first block decoded
13
14use std::collections::{HashMap, HashSet};
15
16use fsqlite_error::{FrankenError, Result};
17use tracing::{debug, error, info, warn};
18
19use crate::replication_sender::{
20    CHANGESET_HEADER_SIZE, ChangesetId, PageEntry, ReplicationPacket, ReplicationPacketV2Header,
21    SenderConfig, compute_changeset_id, derive_seed_from_changeset_id, encode_changeset,
22};
23use crate::source_block_partition::{K_MAX, SourceBlock, partition_source_blocks};
24
25const BEAD_ID: &str = "bd-1hi.15";
26
27// ---------------------------------------------------------------------------
28// Resume State (persistent across connection losses)
29// ---------------------------------------------------------------------------
30
/// Per-block resume state: tracks which ISIs have been received.
///
/// `record_isi` keeps `num_received` equal to `received_isis.len()`; both
/// fields are public, however, so externally-constructed or deserialized
/// values may diverge — NOTE(review): treat `received_isis` as authoritative.
#[derive(Debug, Clone)]
pub struct BlockResumeState {
    /// Source block index (SBN).
    pub block_id: u32,
    /// Number of unique symbols received.
    pub num_received: u32,
    /// Set of received ISIs (for O(1) dedup).
    pub received_isis: HashSet<u32>,
    /// Whether this block has been fully decoded.
    pub decoded: bool,
}
43
44impl BlockResumeState {
45    /// Create a new empty resume state for a block.
46    #[must_use]
47    fn new(block_id: u32) -> Self {
48        Self {
49            block_id,
50            num_received: 0,
51            received_isis: HashSet::new(),
52            decoded: false,
53        }
54    }
55
56    /// Record a received ISI. Returns true if new (accepted).
57    fn record_isi(&mut self, isi: u32) -> bool {
58        if self.received_isis.insert(isi) {
59            self.num_received += 1;
60            true
61        } else {
62            false
63        }
64    }
65
66    /// Serialize to a compact binary format for persistence.
67    ///
68    /// Format: `block_id(4 LE) | num_received(4 LE) | decoded(1) | n_isis(4 LE) | isis(4 LE each)`
69    #[must_use]
70    pub fn to_bytes(&self) -> Vec<u8> {
71        let n = self.received_isis.len();
72        let mut buf = Vec::with_capacity(13 + n * 4);
73        buf.extend_from_slice(&self.block_id.to_le_bytes());
74        buf.extend_from_slice(&self.num_received.to_le_bytes());
75        buf.push(u8::from(self.decoded));
76        let n_u32 = u32::try_from(n).unwrap_or(u32::MAX);
77        buf.extend_from_slice(&n_u32.to_le_bytes());
78        let mut sorted_isis: Vec<u32> = self.received_isis.iter().copied().collect();
79        sorted_isis.sort_unstable();
80        for isi in sorted_isis {
81            buf.extend_from_slice(&isi.to_le_bytes());
82        }
83        buf
84    }
85
86    /// Deserialize from bytes.
87    ///
88    /// # Errors
89    ///
90    /// Returns error if buffer is too short or malformed.
91    pub fn from_bytes(buf: &[u8]) -> Result<(Self, usize)> {
92        if buf.len() < 13 {
93            return Err(FrankenError::DatabaseCorrupt {
94                detail: format!("BlockResumeState too short: {} < 13", buf.len()),
95            });
96        }
97        let block_id = u32::from_le_bytes(buf[0..4].try_into().expect("4 bytes"));
98        let num_received = u32::from_le_bytes(buf[4..8].try_into().expect("4 bytes"));
99        let decoded = buf[8] != 0;
100        let n_isis = u32::from_le_bytes(buf[9..13].try_into().expect("4 bytes"));
101        let n = n_isis as usize;
102        let expected = n
103            .checked_mul(4)
104            .and_then(|v| v.checked_add(13))
105            .ok_or_else(|| FrankenError::DatabaseCorrupt {
106                detail: format!("BlockResumeState n_isis ({n_isis}) causes size overflow"),
107            })?;
108        if buf.len() < expected {
109            return Err(FrankenError::DatabaseCorrupt {
110                detail: format!("BlockResumeState truncated: {} < {expected}", buf.len()),
111            });
112        }
113        let mut received_isis = HashSet::with_capacity(n);
114        for i in 0..n {
115            let offset = 13 + i * 4;
116            let isi = u32::from_le_bytes(buf[offset..offset + 4].try_into().expect("4 bytes"));
117            received_isis.insert(isi);
118        }
119        Ok((
120            Self {
121                block_id,
122                num_received,
123                received_isis,
124                decoded,
125            },
126            expected,
127        ))
128    }
129}
130
/// Full resume state for a snapshot transfer.
///
/// Persisted across connection losses so a receiver can resume. Both
/// constructors keep `blocks.len()` equal to `total_blocks`.
#[derive(Debug, Clone)]
pub struct ResumeState {
    /// Per-block resume states.
    pub blocks: Vec<BlockResumeState>,
    /// Total number of source blocks expected.
    pub total_blocks: u32,
}
139
140impl ResumeState {
141    /// Create a new resume state for a snapshot with `total_blocks` blocks.
142    #[must_use]
143    pub fn new(total_blocks: u32) -> Self {
144        let blocks = (0..total_blocks).map(BlockResumeState::new).collect();
145        Self {
146            blocks,
147            total_blocks,
148        }
149    }
150
151    /// Number of blocks fully decoded.
152    #[must_use]
153    pub fn decoded_count(&self) -> u32 {
154        u32::try_from(self.blocks.iter().filter(|b| b.decoded).count()).unwrap_or(u32::MAX)
155    }
156
157    /// Whether all blocks are decoded.
158    #[must_use]
159    pub fn all_decoded(&self) -> bool {
160        self.blocks.iter().all(|b| b.decoded)
161    }
162
163    /// Serialize to bytes.
164    #[must_use]
165    pub fn to_bytes(&self) -> Vec<u8> {
166        let mut buf = Vec::new();
167        buf.extend_from_slice(&self.total_blocks.to_le_bytes());
168        for block in &self.blocks {
169            buf.extend_from_slice(&block.to_bytes());
170        }
171        buf
172    }
173
174    /// Deserialize from bytes.
175    ///
176    /// # Errors
177    ///
178    /// Returns error if buffer is malformed.
179    pub fn from_bytes(buf: &[u8]) -> Result<Self> {
180        if buf.len() < 4 {
181            return Err(FrankenError::DatabaseCorrupt {
182                detail: format!("ResumeState too short: {} < 4", buf.len()),
183            });
184        }
185        let total_blocks = u32::from_le_bytes(buf[0..4].try_into().expect("4 bytes"));
186        let mut blocks = Vec::with_capacity(total_blocks as usize);
187        let mut offset = 4;
188        for _ in 0..total_blocks {
189            let (block, consumed) = BlockResumeState::from_bytes(&buf[offset..])?;
190            blocks.push(block);
191            offset += consumed;
192        }
193        Ok(Self {
194            blocks,
195            total_blocks,
196        })
197    }
198}
199
200// ---------------------------------------------------------------------------
201// Snapshot Sender
202// ---------------------------------------------------------------------------
203
/// Snapshot sender: partitions a database into source blocks and streams symbols.
///
/// Built via [`SnapshotSender::prepare`]; each `next_packet` call emits one
/// coded symbol, walking block-by-block through the snapshot.
#[derive(Debug)]
pub struct SnapshotSender {
    /// Source blocks from the partition algorithm.
    pub source_blocks: Vec<SourceBlock>,
    /// Page size of the database.
    pub page_size: u32,
    /// Current block being streamed.
    current_block: usize,
    /// Current ISI within the current block.
    current_isi: u32,
    /// Per-block changeset IDs (computed during prepare).
    block_changeset_ids: Vec<ChangesetId>,
    /// Per-block K_source values (`ceil(changeset_len / symbol_size)`).
    block_k_sources: Vec<u32>,
    /// Per-block changeset bytes.
    block_changesets: Vec<Vec<u8>>,
    /// Sender config.
    config: SenderConfig,
    /// Whether we're done.
    done: bool,
}
226
227impl SnapshotSender {
228    /// Prepare a snapshot sender for the given database pages.
229    ///
230    /// `all_pages` must be sorted by page number and cover the entire database.
231    ///
232    /// # Errors
233    ///
234    /// Returns error if partitioning fails or pages are empty.
235    #[allow(clippy::too_many_lines)]
236    pub fn prepare(
237        page_size: u32,
238        all_pages: &mut [PageEntry],
239        config: SenderConfig,
240    ) -> Result<Self> {
241        if all_pages.is_empty() {
242            return Err(FrankenError::OutOfRange {
243                what: "pages".to_owned(),
244                value: "0".to_owned(),
245            });
246        }
247
248        let total_pages = u32::try_from(all_pages.len()).map_err(|_| FrankenError::OutOfRange {
249            what: "total_pages".to_owned(),
250            value: all_pages.len().to_string(),
251        })?;
252
253        let source_blocks = partition_source_blocks(total_pages)?;
254        info!(
255            bead_id = BEAD_ID,
256            total_pages,
257            n_blocks = source_blocks.len(),
258            page_size,
259            "snapshot partitioned into source blocks"
260        );
261
262        // Sort all pages by page_number.
263        all_pages.sort_by_key(|p| p.page_number);
264
265        // Build per-block changesets.
266        let mut block_changeset_ids = Vec::with_capacity(source_blocks.len());
267        let mut block_k_sources = Vec::with_capacity(source_blocks.len());
268        let mut block_changesets = Vec::with_capacity(source_blocks.len());
269
270        let mut page_idx = 0_usize;
271        for block in &source_blocks {
272            let end = page_idx + block.num_pages as usize;
273            if end > all_pages.len() {
274                return Err(FrankenError::Internal(format!(
275                    "block {} requires pages up to index {end}, but only {} available",
276                    block.index,
277                    all_pages.len()
278                )));
279            }
280            let block_pages = &mut all_pages[page_idx..end];
281            let changeset_bytes = encode_changeset(page_size, block_pages)?;
282            let changeset_id = compute_changeset_id(&changeset_bytes);
283
284            // Compute K_source from changeset + symbol_size.
285            let t = u64::from(config.symbol_size);
286            let f = changeset_bytes.len() as u64;
287            let k_source = u32::try_from(f.div_ceil(t)).map_err(|_| FrankenError::OutOfRange {
288                what: "k_source".to_owned(),
289                value: f.div_ceil(t).to_string(),
290            })?;
291
292            debug!(
293                bead_id = BEAD_ID,
294                block_index = block.index,
295                num_pages = block.num_pages,
296                changeset_len = changeset_bytes.len(),
297                k_source,
298                "prepared block changeset"
299            );
300
301            block_changeset_ids.push(changeset_id);
302            block_k_sources.push(k_source);
303            block_changesets.push(changeset_bytes);
304            page_idx = end;
305        }
306
307        Ok(Self {
308            source_blocks,
309            page_size,
310            current_block: 0,
311            current_isi: 0,
312            block_changeset_ids,
313            block_k_sources,
314            block_changesets,
315            config,
316            done: false,
317        })
318    }
319
320    /// Generate the next snapshot packet.
321    ///
322    /// Returns `None` when the current streaming pass is complete.
323    /// Caller can restart from block 0 for continuous streaming.
324    pub fn next_packet(&mut self) -> Option<ReplicationPacket> {
325        if self.done || self.current_block >= self.source_blocks.len() {
326            self.done = true;
327            return None;
328        }
329
330        let k_source = self.block_k_sources[self.current_block];
331        let max_isi = k_source.saturating_mul(self.config.max_isi_multiplier);
332
333        if self.current_isi >= max_isi {
334            self.current_block += 1;
335            self.current_isi = 0;
336            if self.current_block >= self.source_blocks.len() {
337                self.done = true;
338                return None;
339            }
340        }
341
342        let changeset = &self.block_changesets[self.current_block];
343        let changeset_id = self.block_changeset_ids[self.current_block];
344        let k_source = self.block_k_sources[self.current_block];
345        let isi = self.current_isi;
346        let t = usize::from(self.config.symbol_size);
347
348        // Extract or generate symbol data.
349        let symbol_data = if u64::from(isi) < u64::from(k_source) {
350            let start = isi as usize * t;
351            let end = (start + t).min(changeset.len());
352            let mut data = vec![0_u8; t];
353            let available = end.saturating_sub(start);
354            if available > 0 {
355                data[..available].copy_from_slice(&changeset[start..end]);
356            }
357            data
358        } else {
359            // Repair symbol placeholder.
360            #[allow(clippy::cast_possible_truncation)]
361            {
362                let seed = derive_seed_from_changeset_id(&changeset_id);
363                let repair_seed = seed.wrapping_add(u64::from(isi));
364                let mut data = vec![0_u8; t];
365                for (i, byte) in data.iter_mut().enumerate() {
366                    let mixed = repair_seed
367                        .wrapping_mul(0x9E37_79B9_7F4A_7C15)
368                        .wrapping_add(i as u64);
369                    *byte = (mixed >> 32) as u8;
370                }
371                data
372            }
373        };
374
375        let seed = derive_seed_from_changeset_id(&changeset_id);
376        let r_repair = max_isi.saturating_sub(k_source);
377        let packet = ReplicationPacket::new_v2(
378            ReplicationPacketV2Header {
379                changeset_id,
380                sbn: 0,
381                esi: isi,
382                k_source,
383                r_repair,
384                symbol_size_t: self.config.symbol_size,
385                seed,
386            },
387            symbol_data,
388        );
389
390        self.current_isi += 1;
391        Some(packet)
392    }
393
394    /// Number of source blocks.
395    #[must_use]
396    pub fn num_blocks(&self) -> usize {
397        self.source_blocks.len()
398    }
399
400    /// Total source symbols across all blocks.
401    #[must_use]
402    pub fn total_source_symbols(&self) -> u64 {
403        self.block_k_sources.iter().map(|&k| u64::from(k)).sum()
404    }
405
406    /// Reset to re-stream from the beginning (for continuous multicast).
407    pub fn restart(&mut self) {
408        self.current_block = 0;
409        self.current_isi = 0;
410        self.done = false;
411        debug!(bead_id = BEAD_ID, "snapshot sender restarted for next pass");
412    }
413}
414
415// ---------------------------------------------------------------------------
416// Snapshot Receiver
417// ---------------------------------------------------------------------------
418
/// Snapshot receiver state.
///
/// Transitions: `Waiting` → `Receiving` on the first validated packet, and
/// → `Complete` once every block decoder reports decoded. A receiver restored
/// from a fully-decoded resume state starts directly in `Complete`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum SnapshotReceiverState {
    /// Waiting for first packet.
    Waiting,
    /// Actively collecting symbols.
    Receiving,
    /// All blocks decoded, snapshot complete.
    Complete,
}
429
/// A decoded source block's pages.
///
/// Produced by the receiver when a block decodes; drained by callers via
/// `SnapshotReceiver::take_decoded_blocks` for application to the local
/// database.
#[derive(Debug, Clone)]
pub struct DecodedBlock {
    /// Block index.
    pub block_index: u32,
    /// Decoded pages sorted by page number.
    pub pages: Vec<DecodedBlockPage>,
}
438
/// A single page from a decoded block.
///
/// `page_data` has already passed the per-page xxh3 check performed while
/// parsing the decoded block.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct DecodedBlockPage {
    /// Page number.
    pub page_number: u32,
    /// Page data.
    pub page_data: Vec<u8>,
}
447
/// Per-block decoder used by the snapshot receiver.
///
/// Starts empty; `initialize` binds it to a changeset when the first packet
/// for that changeset arrives.
#[derive(Debug)]
struct BlockDecoder {
    /// The changeset_id for this block (determined from first packet).
    changeset_id: Option<ChangesetId>,
    /// K_source for this block.
    k_source: u32,
    /// Symbol size.
    symbol_size: u32,
    /// Seed for RaptorQ.
    seed: u64,
    /// Symbols collected by ISI.
    symbols: HashMap<u32, Vec<u8>>,
    /// ISI dedup set (mirrors the key set of `symbols`).
    received_isis: HashSet<u32>,
    /// Whether decoded.
    decoded: bool,
}
466
467impl BlockDecoder {
468    fn new() -> Self {
469        Self {
470            changeset_id: None,
471            k_source: 0,
472            symbol_size: 0,
473            seed: 0,
474            symbols: HashMap::new(),
475            received_isis: HashSet::new(),
476            decoded: false,
477        }
478    }
479
480    fn initialize(&mut self, changeset_id: ChangesetId, k_source: u32, symbol_size: u32) {
481        self.changeset_id = Some(changeset_id);
482        self.k_source = k_source;
483        self.symbol_size = symbol_size;
484        self.seed = derive_seed_from_changeset_id(&changeset_id);
485    }
486
487    fn add_symbol(&mut self, isi: u32, data: Vec<u8>) -> bool {
488        if self.received_isis.insert(isi) {
489            self.symbols.insert(isi, data);
490            true
491        } else {
492            false
493        }
494    }
495
496    fn received_count(&self) -> u32 {
497        u32::try_from(self.received_isis.len()).unwrap_or(u32::MAX)
498    }
499
500    fn ready_to_decode(&self) -> bool {
501        self.received_count() >= self.k_source && self.k_source > 0
502    }
503
504    fn try_decode(&self) -> Option<Vec<u8>> {
505        if !self.ready_to_decode() {
506            return None;
507        }
508        let source_count = self
509            .symbols
510            .keys()
511            .filter(|&&isi| isi < self.k_source)
512            .count();
513        let k = self.k_source as usize;
514        let t = self.symbol_size as usize;
515        if source_count >= k {
516            let padded_len = k * t;
517            let mut padded = vec![0_u8; padded_len];
518            for isi in 0..self.k_source {
519                if let Some(data) = self.symbols.get(&isi) {
520                    let start = isi as usize * t;
521                    let copy_len = data.len().min(t);
522                    padded[start..start + copy_len].copy_from_slice(&data[..copy_len]);
523                }
524            }
525            Some(padded)
526        } else {
527            warn!(
528                bead_id = BEAD_ID,
529                source_count,
530                k_source = self.k_source,
531                "snapshot block decode needs repair symbols (production RaptorQ)"
532            );
533            None
534        }
535    }
536}
537
/// Snapshot receiver: collects symbols per source block, decodes progressively.
///
/// Blocks are identified by changeset_id; the first packet carrying an unseen
/// changeset_id claims the next free decoder slot (i.e. mapping follows
/// arrival order).
#[derive(Debug)]
pub struct SnapshotReceiver {
    state: SnapshotReceiverState,
    /// Per-changeset_id → block index mapping.
    changeset_to_block: HashMap<ChangesetId, usize>,
    /// Per-block decoders.
    block_decoders: Vec<BlockDecoder>,
    /// Number of blocks expected (set after first packet or from resume state).
    num_blocks: usize,
    /// Decoded blocks ready for application.
    decoded_blocks: Vec<DecodedBlock>,
    /// Resume state.
    resume: ResumeState,
    /// Page size.
    page_size: u32,
}
555
impl SnapshotReceiver {
    /// Create a new snapshot receiver.
    ///
    /// `num_blocks` is the expected number of source blocks (from partitioning).
    /// `page_size` is the database page size.
    #[must_use]
    pub fn new(num_blocks: usize, page_size: u32) -> Self {
        let block_decoders = (0..num_blocks).map(|_| BlockDecoder::new()).collect();
        Self {
            state: SnapshotReceiverState::Waiting,
            changeset_to_block: HashMap::new(),
            block_decoders,
            num_blocks,
            decoded_blocks: Vec::new(),
            resume: ResumeState::new(u32::try_from(num_blocks).unwrap_or(u32::MAX)),
            page_size,
        }
    }

    /// Create from a resume state (after crash/reconnect).
    ///
    /// NOTE(review): decoder slots are rebuilt empty — collected symbols and
    /// per-slot `decoded` flags are NOT restored from `resume`, so previously
    /// decoded (but not fully complete) transfers re-receive and re-decode
    /// those blocks. Correct block indices then rely on the sender streaming
    /// blocks in the same arrival order as before — TODO confirm against the
    /// sender's restart behavior.
    #[must_use]
    pub fn from_resume(resume: ResumeState, page_size: u32) -> Self {
        let num_blocks = resume.total_blocks as usize;
        let block_decoders = (0..num_blocks).map(|_| BlockDecoder::new()).collect();
        Self {
            state: if resume.all_decoded() {
                SnapshotReceiverState::Complete
            } else {
                SnapshotReceiverState::Waiting
            },
            changeset_to_block: HashMap::new(),
            block_decoders,
            num_blocks,
            decoded_blocks: Vec::new(),
            resume,
            page_size,
        }
    }

    /// Current state.
    #[must_use]
    pub const fn state(&self) -> SnapshotReceiverState {
        self.state
    }

    /// Number of blocks decoded so far.
    ///
    /// Counts only blocks still queued in `decoded_blocks`; draining them via
    /// `take_decoded_blocks` resets this to zero.
    #[must_use]
    pub fn blocks_decoded(&self) -> usize {
        self.decoded_blocks.len()
    }

    /// Get the resume state for persistence.
    #[must_use]
    pub fn resume_state(&self) -> &ResumeState {
        &self.resume
    }

    /// Take decoded blocks (for application to local database).
    pub fn take_decoded_blocks(&mut self) -> Vec<DecodedBlock> {
        std::mem::take(&mut self.decoded_blocks)
    }

    /// Process a snapshot packet.
    ///
    /// The receiver maps packets to blocks by changeset_id. The first packet
    /// for a new changeset_id establishes the mapping to the next unmapped block.
    ///
    /// # Errors
    ///
    /// Returns error if the packet is malformed or validation fails.
    #[allow(clippy::too_many_lines)]
    pub fn process_packet(&mut self, packet: &ReplicationPacket) -> Result<SnapshotPacketResult> {
        if self.state == SnapshotReceiverState::Complete {
            return Ok(SnapshotPacketResult::AlreadyComplete);
        }

        // V1 rule.
        if packet.sbn != 0 {
            return Err(FrankenError::Internal(format!(
                "V1: SBN must be 0, got {}",
                packet.sbn
            )));
        }
        if packet.k_source == 0 || packet.k_source > K_MAX {
            return Err(FrankenError::OutOfRange {
                what: "k_source".to_owned(),
                value: packet.k_source.to_string(),
            });
        }
        // Symbol size is derived from the payload length, not a header field.
        let symbol_size =
            u32::try_from(packet.symbol_data.len()).map_err(|_| FrankenError::OutOfRange {
                what: "symbol_size".to_owned(),
                value: packet.symbol_data.len().to_string(),
            })?;
        if symbol_size == 0 {
            return Err(FrankenError::OutOfRange {
                what: "symbol_size".to_owned(),
                value: "0".to_owned(),
            });
        }

        if self.state == SnapshotReceiverState::Waiting {
            self.state = SnapshotReceiverState::Receiving;
            info!(bead_id = BEAD_ID, "snapshot receiving started");
        }

        let changeset_id = packet.changeset_id;

        // Map changeset_id to block index.
        let block_idx = if let Some(&idx) = self.changeset_to_block.get(&changeset_id) {
            idx
        } else {
            // Find the next unmapped, undecoded block.
            let next_idx = self
                .block_decoders
                .iter()
                .position(|d| d.changeset_id.is_none() && !d.decoded);
            if let Some(idx) = next_idx {
                self.changeset_to_block.insert(changeset_id, idx);
                self.block_decoders[idx].initialize(changeset_id, packet.k_source, symbol_size);
                debug!(
                    bead_id = BEAD_ID,
                    block_index = idx,
                    k_source = packet.k_source,
                    "mapped new changeset to block"
                );
                idx
            } else {
                warn!(
                    bead_id = BEAD_ID,
                    "no available block slot for new changeset_id"
                );
                return Ok(SnapshotPacketResult::Rejected);
            }
        };

        // Defensive bound check; `block_idx` always originates from
        // `block_decoders` above, so this should be unreachable.
        if block_idx >= self.block_decoders.len() {
            return Ok(SnapshotPacketResult::Rejected);
        }

        let decoder = &mut self.block_decoders[block_idx];
        if decoder.decoded {
            return Ok(SnapshotPacketResult::BlockAlreadyDecoded);
        }

        // Validate consistency with the values the first packet established.
        if decoder.k_source != packet.k_source {
            return Err(FrankenError::DatabaseCorrupt {
                detail: format!(
                    "k_source mismatch for block {block_idx}: {} vs {}",
                    decoder.k_source, packet.k_source
                ),
            });
        }
        if decoder.symbol_size != symbol_size {
            return Err(FrankenError::DatabaseCorrupt {
                detail: format!(
                    "symbol_size mismatch for block {block_idx}: {} vs {symbol_size}",
                    decoder.symbol_size
                ),
            });
        }

        // Add symbol.
        let accepted = decoder.add_symbol(packet.esi, packet.symbol_data.clone());
        if !accepted {
            return Ok(SnapshotPacketResult::Duplicate);
        }

        // Update resume state (duplicates are already filtered above).
        if block_idx < self.resume.blocks.len() {
            self.resume.blocks[block_idx].record_isi(packet.esi);
        }

        // Check if ready to decode this block (progressive decode).
        if decoder.ready_to_decode() && !decoder.decoded {
            if let Some(padded) = decoder.try_decode() {
                match parse_decoded_snapshot_block(&padded, self.page_size) {
                    Ok(pages) => {
                        let block_id = u32::try_from(block_idx).unwrap_or(u32::MAX);
                        decoder.decoded = true;
                        if block_idx < self.resume.blocks.len() {
                            self.resume.blocks[block_idx].decoded = true;
                        }
                        let n_pages = pages.len();
                        self.decoded_blocks.push(DecodedBlock {
                            block_index: block_id,
                            pages,
                        });
                        info!(
                            bead_id = BEAD_ID,
                            block_index = block_idx,
                            n_pages,
                            decoded_so_far = self.decoded_blocks.len(),
                            total_blocks = self.num_blocks,
                            "source block decoded (progressive)"
                        );

                        // Check if all blocks are done.
                        if self.block_decoders.iter().all(|d| d.decoded) {
                            self.state = SnapshotReceiverState::Complete;
                            info!(
                                bead_id = BEAD_ID,
                                total_blocks = self.num_blocks,
                                "snapshot fully received"
                            );
                        }
                        return Ok(SnapshotPacketResult::BlockDecoded(block_id));
                    }
                    Err(e) => {
                        error!(
                            bead_id = BEAD_ID,
                            block_index = block_idx,
                            error = %e,
                            "snapshot block validation failed"
                        );
                        return Err(e);
                    }
                }
            }
        }

        Ok(SnapshotPacketResult::Accepted)
    }
}
781
/// Result of processing a snapshot packet.
///
/// Non-error outcomes of `SnapshotReceiver::process_packet`; malformed or
/// inconsistent packets surface as `Err` instead.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum SnapshotPacketResult {
    /// Symbol accepted, need more.
    Accepted,
    /// Duplicate ISI, ignored.
    Duplicate,
    /// A source block was fully decoded (progressive).
    BlockDecoded(u32),
    /// This block was already decoded.
    BlockAlreadyDecoded,
    /// Packet rejected (no available block slot or already complete).
    Rejected,
    /// Snapshot already complete.
    AlreadyComplete,
}
798
799// ---------------------------------------------------------------------------
800// Helpers
801// ---------------------------------------------------------------------------
802
803/// Parse decoded snapshot block bytes into pages with xxh3 validation.
804fn parse_decoded_snapshot_block(
805    padded_bytes: &[u8],
806    _page_size: u32,
807) -> Result<Vec<DecodedBlockPage>> {
808    use crate::replication_sender::ChangesetHeader;
809
810    if padded_bytes.len() < CHANGESET_HEADER_SIZE {
811        return Err(FrankenError::DatabaseCorrupt {
812            detail: format!(
813                "decoded block too short for header: {} < {CHANGESET_HEADER_SIZE}",
814                padded_bytes.len()
815            ),
816        });
817    }
818
819    let header_bytes: [u8; CHANGESET_HEADER_SIZE] = padded_bytes[..CHANGESET_HEADER_SIZE]
820        .try_into()
821        .expect("checked length");
822    let header = ChangesetHeader::from_bytes(&header_bytes)?;
823
824    let total_len = usize::try_from(header.total_len).map_err(|_| FrankenError::OutOfRange {
825        what: "total_len".to_owned(),
826        value: header.total_len.to_string(),
827    })?;
828    if total_len > padded_bytes.len() {
829        return Err(FrankenError::DatabaseCorrupt {
830            detail: format!(
831                "total_len ({total_len}) exceeds decoded bytes ({})",
832                padded_bytes.len()
833            ),
834        });
835    }
836    let changeset_bytes = &padded_bytes[..total_len];
837
838    let entry_size = 4_usize + 8 + header.page_size as usize;
839    let data_bytes = &changeset_bytes[CHANGESET_HEADER_SIZE..];
840
841    let required_data_len = (header.n_pages as usize)
842        .checked_mul(entry_size)
843        .ok_or_else(|| FrankenError::DatabaseCorrupt {
844            detail: "n_pages causes size overflow".to_owned(),
845        })?;
846
847    if data_bytes.len() < required_data_len {
848        return Err(FrankenError::DatabaseCorrupt {
849            detail: format!(
850                "changeset truncated: expected {} data bytes, got {}",
851                required_data_len,
852                data_bytes.len()
853            ),
854        });
855    }
856
857    let mut pages = Vec::with_capacity(header.n_pages as usize);
858    for i in 0..header.n_pages as usize {
859        let offset = i * entry_size;
860        let page_number =
861            u32::from_le_bytes(data_bytes[offset..offset + 4].try_into().expect("4 bytes"));
862        let page_xxh3 = u64::from_le_bytes(
863            data_bytes[offset + 4..offset + 12]
864                .try_into()
865                .expect("8 bytes"),
866        );
867        let page_data = data_bytes[offset + 12..offset + 12 + header.page_size as usize].to_vec();
868
869        let computed_xxh3 = xxhash_rust::xxh3::xxh3_64(&page_data);
870        if computed_xxh3 != page_xxh3 {
871            error!(
872                bead_id = BEAD_ID,
873                page_number,
874                expected_xxh3 = page_xxh3,
875                computed_xxh3,
876                "snapshot page xxh3 mismatch"
877            );
878            return Err(FrankenError::DatabaseCorrupt {
879                detail: format!(
880                    "snapshot page {page_number} xxh3 mismatch: {page_xxh3:#x} vs {computed_xxh3:#x}"
881                ),
882            });
883        }
884
885        pages.push(DecodedBlockPage {
886            page_number,
887            page_data,
888        });
889    }
890
891    Ok(pages)
892}
893
// Unit, integration, property, and compliance tests for §3.4.3
// fountain-coded snapshot shipping (bd-1hi.15). Assertion messages carry
// `bead_id=… case=…` markers used by the compliance tooling — do not edit.
#[cfg(test)]
mod tests {
    use super::*;
    use crate::replication_sender::PageEntry;

    const TEST_BEAD_ID: &str = "bd-1hi.15";

    // Build deterministic test pages: each page is `page_size` bytes whose
    // contents are a fixed function of the page number and byte offset, so
    // any corruption, truncation, or mis-ordering is detectable after a
    // sender→receiver round-trip.
    #[allow(clippy::cast_possible_truncation)]
    fn make_pages(page_size: u32, page_numbers: &[u32]) -> Vec<PageEntry> {
        page_numbers
            .iter()
            .map(|&pn| {
                let mut data = vec![0_u8; page_size as usize];
                for (i, byte) in data.iter_mut().enumerate() {
                    // Arbitrary co-prime multipliers (251, 31) vary the
                    // pattern across both pages and byte offsets.
                    *byte = ((pn as usize * 251 + i * 31) % 256) as u8;
                }
                PageEntry::new(pn, data)
            })
            .collect()
    }

    // -----------------------------------------------------------------------
    // Resume state tests
    // -----------------------------------------------------------------------

    // Round-trip ResumeState through to_bytes/from_bytes and verify per-block
    // received counts, individual ISIs, and decoded flags all survive.
    #[test]
    fn test_resume_state_persistence() {
        let mut resume = ResumeState::new(3);
        resume.blocks[0].record_isi(0);
        resume.blocks[0].record_isi(5);
        resume.blocks[0].record_isi(10);
        resume.blocks[1].decoded = true;

        let bytes = resume.to_bytes();
        let restored = ResumeState::from_bytes(&bytes).expect("deserialize");

        assert_eq!(
            restored.total_blocks, 3,
            "bead_id={TEST_BEAD_ID} case=resume_total_blocks"
        );
        assert_eq!(
            restored.blocks[0].num_received, 3,
            "bead_id={TEST_BEAD_ID} case=resume_block0_received"
        );
        assert!(
            restored.blocks[0].received_isis.contains(&5),
            "bead_id={TEST_BEAD_ID} case=resume_block0_isi_5"
        );
        assert!(
            restored.blocks[1].decoded,
            "bead_id={TEST_BEAD_ID} case=resume_block1_decoded"
        );
        assert!(
            !restored.blocks[2].decoded,
            "bead_id={TEST_BEAD_ID} case=resume_block2_not_decoded"
        );
    }

    // Resume is purely receiver-side state: persisting and restoring it
    // involves no messages to (or cooperation from) the sender.
    #[test]
    fn test_resume_no_protocol_negotiation() {
        // Resume state works without any sender-side coordination.
        let mut resume = ResumeState::new(2);
        resume.blocks[0].record_isi(0);
        resume.blocks[0].record_isi(1);

        // Persist and restore.
        let bytes = resume.to_bytes();
        let restored = ResumeState::from_bytes(&bytes).expect("deserialize");
        assert_eq!(
            restored.blocks[0].num_received, 2,
            "bead_id={TEST_BEAD_ID} case=resume_no_negotiation"
        );
        assert!(!restored.all_decoded());
    }

    // -----------------------------------------------------------------------
    // Snapshot sender/receiver integration
    // -----------------------------------------------------------------------

    // Small page set → exactly one source block; all emitted packets fed to
    // a fresh receiver must decode the block to completion.
    #[test]
    fn test_snapshot_single_block() {
        let page_size = 256_u32;
        let page_numbers: Vec<u32> = (1..=10).collect();
        let mut pages = make_pages(page_size, &page_numbers);

        let config = SenderConfig {
            symbol_size: 256,
            max_isi_multiplier: 1,
        };
        let mut sender = SnapshotSender::prepare(page_size, &mut pages, config).expect("prepare");
        assert_eq!(
            sender.num_blocks(),
            1,
            "bead_id={TEST_BEAD_ID} case=single_block"
        );

        // Collect all packets.
        let mut packets = Vec::new();
        while let Some(pkt) = sender.next_packet() {
            packets.push(pkt);
        }
        assert!(
            !packets.is_empty(),
            "bead_id={TEST_BEAD_ID} case=has_packets"
        );

        // Feed to receiver.
        let mut receiver = SnapshotReceiver::new(1, page_size);
        for pkt in &packets {
            let _ = receiver.process_packet(pkt);
        }

        assert_eq!(
            receiver.state(),
            SnapshotReceiverState::Complete,
            "bead_id={TEST_BEAD_ID} case=single_block_complete"
        );

        let blocks = receiver.take_decoded_blocks();
        assert_eq!(blocks.len(), 1);
        assert_eq!(blocks[0].pages.len(), 10);
    }

    // A larger page count (200 pages, small symbols) that is still well
    // below K_MAX, so partitioning yields a single source block; the test
    // verifies the bigger payload round-trips intact end-to-end.
    #[test]
    fn test_snapshot_multi_block_small() {
        // 200 pages is below K_MAX, so this remains a single source block;
        // the assertion below documents that expectation explicitly.
        let page_size = 64_u32;
        let n_pages = 200_u32;
        let page_numbers: Vec<u32> = (1..=n_pages).collect();
        let mut pages = make_pages(page_size, &page_numbers);

        let config = SenderConfig {
            symbol_size: 64,
            max_isi_multiplier: 1,
        };
        let mut sender = SnapshotSender::prepare(page_size, &mut pages, config).expect("prepare");

        // Should be 1 block (200 < K_MAX).
        assert_eq!(
            sender.num_blocks(),
            1,
            "bead_id={TEST_BEAD_ID} case=multi_block_small_count"
        );

        let mut packets = Vec::new();
        while let Some(pkt) = sender.next_packet() {
            packets.push(pkt);
        }

        let mut receiver = SnapshotReceiver::new(sender.num_blocks(), page_size);
        for pkt in &packets {
            let _ = receiver.process_packet(pkt);
        }

        assert_eq!(
            receiver.state(),
            SnapshotReceiverState::Complete,
            "bead_id={TEST_BEAD_ID} case=multi_block_small_complete"
        );

        let blocks = receiver.take_decoded_blocks();
        let total_pages: usize = blocks.iter().map(|b| b.pages.len()).sum();
        assert_eq!(
            total_pages, n_pages as usize,
            "bead_id={TEST_BEAD_ID} case=multi_block_all_pages"
        );
    }

    // Re-delivering the same packet must be reported as Duplicate, not
    // counted twice (dedup is by ISI — see BlockResumeState.received_isis).
    #[test]
    fn test_duplicate_isi_discarded() {
        let page_size = 128_u32;
        let mut pages = make_pages(page_size, &[1, 2, 3]);
        let config = SenderConfig {
            symbol_size: 128,
            max_isi_multiplier: 1,
        };
        let mut sender = SnapshotSender::prepare(page_size, &mut pages, config).expect("prepare");

        let mut packets = Vec::new();
        while let Some(pkt) = sender.next_packet() {
            packets.push(pkt);
        }

        let mut receiver = SnapshotReceiver::new(1, page_size);

        // Feed first packet twice.
        let r1 = receiver.process_packet(&packets[0]).expect("first");
        assert_ne!(
            r1,
            SnapshotPacketResult::Duplicate,
            "bead_id={TEST_BEAD_ID} case=first_not_dup"
        );
        let r2 = receiver.process_packet(&packets[0]).expect("duplicate");
        assert_eq!(
            r2,
            SnapshotPacketResult::Duplicate,
            "bead_id={TEST_BEAD_ID} case=dup_discarded"
        );
    }

    // Progressive receive: a BlockDecoded result may arrive before the full
    // packet stream is consumed, at which point that block's pages are
    // already queryable.
    #[test]
    fn test_snapshot_progressive_receive() {
        // With a single block, after decode the receiver is complete.
        // Progressive receive means we can query pages from decoded blocks
        // while other blocks are still being received.
        let page_size = 128_u32;
        let mut pages = make_pages(page_size, &[1, 2, 3, 4, 5]);
        let config = SenderConfig {
            symbol_size: 128,
            max_isi_multiplier: 1,
        };
        let mut sender = SnapshotSender::prepare(page_size, &mut pages, config).expect("prepare");

        let mut packets = Vec::new();
        while let Some(pkt) = sender.next_packet() {
            packets.push(pkt);
        }

        let mut receiver = SnapshotReceiver::new(1, page_size);
        let mut block_decoded_at = None;

        // Stop at the first packet that completes a block decode.
        for (i, pkt) in packets.iter().enumerate() {
            if let Ok(SnapshotPacketResult::BlockDecoded(_)) = receiver.process_packet(pkt) {
                block_decoded_at = Some(i);
                break;
            }
        }

        assert!(
            block_decoded_at.is_some(),
            "bead_id={TEST_BEAD_ID} case=progressive_block_decoded"
        );

        // After decoding, pages are available.
        let blocks = receiver.take_decoded_blocks();
        assert!(
            !blocks.is_empty(),
            "bead_id={TEST_BEAD_ID} case=progressive_has_pages"
        );
    }

    // -----------------------------------------------------------------------
    // E2E tests
    // -----------------------------------------------------------------------

    // Full round-trip: every decoded page must match the original byte-for-
    // byte, in page-number order.
    #[test]
    fn test_e2e_sender_receiver_roundtrip() {
        let page_size = 512_u32;
        let n_pages = 50_u32;
        let page_numbers: Vec<u32> = (1..=n_pages).collect();
        let original_pages = make_pages(page_size, &page_numbers);
        let mut pages = original_pages.clone();

        let config = SenderConfig {
            symbol_size: 512,
            max_isi_multiplier: 1,
        };
        let mut sender = SnapshotSender::prepare(page_size, &mut pages, config).expect("prepare");

        let mut packets = Vec::new();
        while let Some(pkt) = sender.next_packet() {
            packets.push(pkt);
        }

        let mut receiver = SnapshotReceiver::new(sender.num_blocks(), page_size);
        for pkt in &packets {
            let _ = receiver.process_packet(pkt);
        }

        assert_eq!(
            receiver.state(),
            SnapshotReceiverState::Complete,
            "bead_id={TEST_BEAD_ID} case=e2e_roundtrip_complete"
        );

        // Flatten and sort decoded pages so they can be zipped against the
        // originals (which make_pages produced in ascending page order).
        let blocks = receiver.take_decoded_blocks();
        let mut all_decoded_pages: Vec<&DecodedBlockPage> =
            blocks.iter().flat_map(|b| b.pages.iter()).collect();
        all_decoded_pages.sort_by_key(|p| p.page_number);

        assert_eq!(
            all_decoded_pages.len(),
            original_pages.len(),
            "bead_id={TEST_BEAD_ID} case=e2e_page_count"
        );

        for (decoded, original) in all_decoded_pages.iter().zip(original_pages.iter()) {
            assert_eq!(
                decoded.page_number, original.page_number,
                "bead_id={TEST_BEAD_ID} case=e2e_page_number"
            );
            assert_eq!(
                decoded.page_data, original.page_bytes,
                "bead_id={TEST_BEAD_ID} case=e2e_page_data pn={}",
                original.page_number
            );
        }
    }

    // Simulated crash mid-transfer: half the packets go to receiver1, its
    // resume state is serialized, and a fresh receiver restored from those
    // bytes finishes the transfer (overlapping packets are deduped).
    #[test]
    fn test_e2e_resume_after_partial() {
        let page_size = 128_u32;
        let n_pages = 20_u32;
        let mut pages = make_pages(page_size, &(1..=n_pages).collect::<Vec<_>>());

        let config = SenderConfig {
            symbol_size: 128,
            max_isi_multiplier: 1,
        };
        let mut sender = SnapshotSender::prepare(page_size, &mut pages, config).expect("prepare");

        let mut packets = Vec::new();
        while let Some(pkt) = sender.next_packet() {
            packets.push(pkt);
        }

        // First receiver: receive only half the packets.
        let half = packets.len() / 2;
        let mut receiver1 = SnapshotReceiver::new(sender.num_blocks(), page_size);
        for pkt in &packets[..half] {
            let _ = receiver1.process_packet(pkt);
        }

        // Persist resume state.
        let resume_bytes = receiver1.resume_state().to_bytes();

        // "Crash" — create new receiver from resume state.
        let resume = ResumeState::from_bytes(&resume_bytes).expect("restore");
        let mut receiver2 = SnapshotReceiver::from_resume(resume, page_size);

        // Continue with remaining packets (and possibly some overlap).
        for pkt in &packets {
            let _ = receiver2.process_packet(pkt);
        }

        // Should be complete now.
        assert_eq!(
            receiver2.state(),
            SnapshotReceiverState::Complete,
            "bead_id={TEST_BEAD_ID} case=e2e_resume_complete"
        );
    }

    // Bead bd-1hi.15 compliance: exercises sender preparation, the state
    // machine transition Waiting → Complete, full page decode, and the
    // all-decoded resume-state invariant in one pass.
    #[test]
    fn test_e2e_bd_1hi_15_compliance() {
        // Full compliance test.
        let page_size = 256_u32;
        let n_pages = 30_u32;
        let original_pages = make_pages(page_size, &(1..=n_pages).collect::<Vec<_>>());
        let mut pages = original_pages;

        let config = SenderConfig {
            symbol_size: 256,
            max_isi_multiplier: 1,
        };
        let mut sender = SnapshotSender::prepare(page_size, &mut pages, config).expect("prepare");

        // Verify sender state.
        assert!(
            sender.num_blocks() >= 1,
            "bead_id={TEST_BEAD_ID} case=compliance_has_blocks"
        );
        assert!(
            sender.total_source_symbols() > 0,
            "bead_id={TEST_BEAD_ID} case=compliance_has_symbols"
        );

        let mut packets = Vec::new();
        while let Some(pkt) = sender.next_packet() {
            packets.push(pkt);
        }

        let mut receiver = SnapshotReceiver::new(sender.num_blocks(), page_size);
        assert_eq!(receiver.state(), SnapshotReceiverState::Waiting);

        for pkt in &packets {
            let _ = receiver.process_packet(pkt);
        }
        assert_eq!(receiver.state(), SnapshotReceiverState::Complete);

        let blocks = receiver.take_decoded_blocks();
        let total_decoded: usize = blocks.iter().map(|b| b.pages.len()).sum();
        assert_eq!(
            total_decoded, n_pages as usize,
            "bead_id={TEST_BEAD_ID} case=compliance_all_pages_decoded"
        );

        // Verify resume state.
        assert!(
            receiver.resume_state().all_decoded(),
            "bead_id={TEST_BEAD_ID} case=compliance_resume_all_decoded"
        );
    }

    // -----------------------------------------------------------------------
    // Property tests
    // -----------------------------------------------------------------------

    // Partitioning must account for every page exactly once, including the
    // boundary around 56_403/56_404 (presumably a K_MAX partition edge —
    // confirm against source_block_partition).
    #[test]
    fn prop_partition_covers_all_pages() {
        for p in [1_u32, 10, 100, 1000, 56_403, 56_404, 100_000] {
            let blocks = partition_source_blocks(p).expect("partition");
            let total: u32 = blocks.iter().map(|b| b.num_pages).sum();
            assert_eq!(
                total, p,
                "bead_id={TEST_BEAD_ID} case=prop_partition_covers p={p}"
            );
        }
    }

    // No block may exceed K_MAX source symbols, regardless of total size.
    #[test]
    fn prop_partition_block_sizes_valid() {
        for p in [1_u32, 56_403, 56_404, 200_000] {
            let blocks = partition_source_blocks(p).expect("partition");
            for block in &blocks {
                assert!(
                    block.num_pages <= K_MAX,
                    "bead_id={TEST_BEAD_ID} case=prop_block_size p={p} block={} num_pages={}",
                    block.index,
                    block.num_pages
                );
            }
        }
    }

    // -----------------------------------------------------------------------
    // Compliance gate tests
    // -----------------------------------------------------------------------

    // Gate: all public state/result enum variants exist, and both resume
    // types serialize/deserialize losslessly from a fresh state.
    #[test]
    fn test_bd_1hi_15_unit_compliance_gate() {
        // Verify all required types exist.
        let _ = SnapshotReceiverState::Waiting;
        let _ = SnapshotReceiverState::Receiving;
        let _ = SnapshotReceiverState::Complete;

        let _ = SnapshotPacketResult::Accepted;
        let _ = SnapshotPacketResult::Duplicate;
        let _ = SnapshotPacketResult::Rejected;
        let _ = SnapshotPacketResult::AlreadyComplete;

        let resume = ResumeState::new(3);
        assert_eq!(resume.total_blocks, 3);
        assert!(!resume.all_decoded());
        assert_eq!(resume.decoded_count(), 0);

        // Verify BlockResumeState serialization.
        let block = BlockResumeState::new(0);
        let bytes = block.to_bytes();
        let (restored, _) = BlockResumeState::from_bytes(&bytes).expect("deser");
        assert_eq!(restored.block_id, 0);
    }

    // Gate: minimal sender→receiver pipeline (two pages) reaches Complete.
    #[test]
    fn prop_bd_1hi_15_structure_compliance() {
        // Verify snapshot sender + receiver integration.
        let page_size = 128_u32;
        let mut pages = make_pages(page_size, &[1, 2]);
        let config = SenderConfig {
            symbol_size: 128,
            max_isi_multiplier: 1,
        };
        let mut sender = SnapshotSender::prepare(page_size, &mut pages, config).expect("prepare");
        assert!(sender.num_blocks() >= 1);

        let mut packets = Vec::new();
        while let Some(pkt) = sender.next_packet() {
            packets.push(pkt);
        }

        let mut receiver = SnapshotReceiver::new(sender.num_blocks(), page_size);
        for pkt in &packets {
            let _ = receiver.process_packet(pkt);
        }
        assert_eq!(receiver.state(), SnapshotReceiverState::Complete);
    }
}