// fsqlite_core/replication_receiver.rs
1//! §3.4.2 Fountain-Coded Replication Receiver (bd-1hi.14).
2//!
3//! Implements the receiver-side state machine for fountain-coded database
4//! replication. Listens for UDP packets, collects symbols per changeset,
5//! decodes when sufficient, validates and applies recovered pages.
6//!
7//! State machine: LISTENING → COLLECTING → DECODING → APPLYING → COMPLETE
8
9use std::collections::{HashMap, HashSet};
10
11use fsqlite_error::{FrankenError, Result};
12use fsqlite_types::ObjectId;
13use tracing::{debug, error, info, warn};
14
15use crate::decode_proofs::{DecodeAuditEntry, EcsDecodeProof};
16use crate::replication_sender::{
17    CHANGESET_HEADER_SIZE, ChangesetHeader, ChangesetId, DEFAULT_RPC_MESSAGE_CAP_BYTES, PageEntry,
18    ReplicationPacket, ReplicationWireVersion,
19};
20use crate::source_block_partition::K_MAX;
21
/// Bead identifier stamped on every structured log line for correlation (§3.4.2).
const BEAD_ID: &str = "bd-1hi.14";
/// Upper bound on concurrent per-changeset decoder sessions (resource guard).
const DEFAULT_MAX_INFLIGHT_DECODERS: usize = 128;
/// Upper bound on total bytes buffered across all decoder symbol maps (64 MiB).
const DEFAULT_MAX_BUFFERED_SYMBOL_BYTES: usize = 64 * 1024 * 1024;
25
26// ---------------------------------------------------------------------------
27// Receiver State Machine
28// ---------------------------------------------------------------------------
29
/// Receiver state (§3.4.2).
///
/// Transitions: LISTENING → COLLECTING → DECODING → APPLYING → COMPLETE,
/// with DECODING falling back to COLLECTING when more symbols are needed.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ReceiverState {
    /// Ready to accept replication data.
    Listening,
    /// At least one packet received; collecting symbols.
    Collecting,
    /// Sufficient symbols collected; decoding in progress.
    Decoding,
    /// Pages decoded; applying to local database.
    Applying,
    /// All pages applied; ready for next changeset.
    Complete,
}
44
/// Per-changeset decoder state, created on first packet.
#[derive(Debug)]
pub struct DecoderState {
    /// Number of source symbols expected.
    pub k_source: u32,
    /// Symbol size in bytes (inferred from first packet).
    pub symbol_size: u32,
    /// Deterministic seed derived from changeset_id.
    pub seed: u64,
    /// Collected symbols indexed by ISI.
    // NOTE(review): keys here mirror `received_isis`; the set exists for
    // membership checks while this map owns the payload bytes.
    symbols: HashMap<u32, Vec<u8>>,
    /// Set of received ISIs for O(1) deduplication.
    received_isis: HashSet<u32>,
}
59
60impl DecoderState {
61    /// Create a new decoder state for a changeset.
62    fn new(k_source: u32, symbol_size: u32, seed: u64) -> Self {
63        Self {
64            k_source,
65            symbol_size,
66            seed,
67            symbols: HashMap::with_capacity(k_source as usize),
68            received_isis: HashSet::with_capacity(k_source as usize),
69        }
70    }
71
72    /// Number of unique symbols received.
73    #[must_use]
74    pub fn received_count(&self) -> u32 {
75        u32::try_from(self.received_isis.len()).unwrap_or(u32::MAX)
76    }
77
78    /// Whether enough symbols have been collected to attempt decode.
79    #[must_use]
80    pub fn ready_to_decode(&self) -> bool {
81        self.received_count() >= self.k_source
82    }
83
84    /// Number of collected source symbols (`isi < k_source`).
85    #[must_use]
86    pub fn source_symbol_count(&self) -> u32 {
87        let count = self
88            .symbols
89            .keys()
90            .filter(|&&isi| isi < self.k_source)
91            .count();
92        u32::try_from(count).unwrap_or(u32::MAX)
93    }
94
95    /// Whether any collected symbol is a repair symbol (`isi >= k_source`).
96    #[must_use]
97    pub fn has_repair_symbols(&self) -> bool {
98        self.symbols.keys().any(|&isi| isi >= self.k_source)
99    }
100
101    /// Sorted unique ISIs of all collected symbols.
102    #[must_use]
103    pub fn sorted_isis(&self) -> Vec<u32> {
104        let mut isis: Vec<u32> = self.symbols.keys().copied().collect();
105        isis.sort_unstable();
106        isis.dedup();
107        isis
108    }
109
110    /// Add a symbol. Returns `true` if the symbol was new (accepted).
111    fn add_symbol(&mut self, isi: u32, data: Vec<u8>) -> bool {
112        if self.received_isis.contains(&isi) {
113            return false;
114        }
115        self.received_isis.insert(isi);
116        self.symbols.insert(isi, data);
117        true
118    }
119
120    #[must_use]
121    fn has_symbol(&self, isi: u32) -> bool {
122        self.received_isis.contains(&isi)
123    }
124
125    #[must_use]
126    fn buffered_bytes(&self) -> usize {
127        self.symbols.values().map(Vec::len).sum()
128    }
129
130    /// Attempt to decode the collected symbols into changeset bytes.
131    ///
132    /// For source symbols (ISI < k_source), this reconstructs the padded
133    /// changeset by placing each symbol at offset `ISI * symbol_size`.
134    /// Repair symbols would require RaptorQ decoding in production;
135    /// this implementation handles the source-symbol-only case.
136    ///
137    /// Returns `None` if insufficient symbols or decode fails.
138    fn try_decode(&self) -> Option<Vec<u8>> {
139        if !self.ready_to_decode() {
140            return None;
141        }
142
143        // Count source symbols available.
144        let source_count = usize::try_from(self.source_symbol_count()).unwrap_or(usize::MAX);
145
146        let k = self.k_source as usize;
147        let t = self.symbol_size as usize;
148
149        if source_count >= k {
150            // All source symbols available — reconstruct directly.
151            let padded_len = k * t;
152            let mut padded = vec![0_u8; padded_len];
153            for isi in 0..self.k_source {
154                if let Some(data) = self.symbols.get(&isi) {
155                    let start = isi as usize * t;
156                    let copy_len = data.len().min(t);
157                    padded[start..start + copy_len].copy_from_slice(&data[..copy_len]);
158                }
159            }
160            Some(padded)
161        } else {
162            // Need repair symbols + RaptorQ decoder (production path via asupersync).
163            // For now, return None to stay in COLLECTING.
164            warn!(
165                bead_id = BEAD_ID,
166                source_count,
167                k_source = self.k_source,
168                total_received = self.received_count(),
169                "decode requires repair symbols (production uses RaptorQ decoder)"
170            );
171            None
172        }
173    }
174}
175
/// A decoded and validated page ready for application.
///
/// Produced by `parse_and_validate_changeset` only after the page's xxh3
/// checksum has been verified.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct DecodedPage {
    /// Page number in the database.
    pub page_number: u32,
    /// Validated page data.
    pub page_data: Vec<u8>,
}
184
/// Result of a successful decode operation.
#[derive(Debug)]
pub struct DecodeResult {
    /// The changeset identifier that was decoded.
    pub changeset_id: ChangesetId,
    /// Decoded and validated pages, sorted by page number.
    pub pages: Vec<DecodedPage>,
    /// Number of symbols used for decoding.
    pub symbols_used: u32,
    /// Optional decode proof emitted under policy control.
    pub decode_proof: Option<EcsDecodeProof>,
}
197
/// Context captured at decode time, used to assemble an `EcsDecodeProof`.
#[derive(Debug, Clone, Copy)]
struct DecodeProofBuildInput<'a> {
    /// Changeset the decode attempt belongs to.
    changeset_id: ChangesetId,
    /// Expected number of source symbols.
    k_source: u32,
    /// Symbol size in bytes for this changeset.
    symbol_size: u32,
    /// Deterministic seed for this changeset.
    seed: u64,
    /// Sorted unique ISIs collected at the time of the attempt.
    received_isis: &'a [u32],
    /// Whether the decode attempt succeeded.
    decode_success: bool,
    /// Intermediate matrix rank reported in the proof, if known.
    intermediate_rank: Option<u32>,
    /// Number of symbols the attempt consumed.
    symbols_used: u32,
}
209
/// Replication receiver state machine.
///
/// Owns per-changeset decoder sessions plus global accounting; packets are
/// fed in via `process_packet` / `process_parsed_packet`.
#[derive(Debug)]
pub struct ReplicationReceiver {
    /// Policy/limits: auth key, proof emission, decoder and buffer caps.
    config: ReceiverConfig,
    /// Current position in the §3.4.2 state machine.
    state: ReceiverState,
    /// Per-changeset decoder states.
    decoders: HashMap<ChangesetId, DecoderState>,
    /// Received symbol counts per changeset.
    received_counts: HashMap<ChangesetId, u32>,
    /// Total bytes currently buffered across all decoder symbol sets.
    buffered_symbol_bytes: usize,
    /// Decoded results waiting for application.
    pending_results: Vec<DecodeResult>,
    /// Applied results (for metrics/ACK).
    applied_count: u64,
    /// Decode-proof audit entries emitted by this receiver.
    decode_audit: Vec<DecodeAuditEntry>,
    /// Monotonic audit sequence.
    decode_audit_seq: u64,
}
230
/// Policy controlling when ECS decode proofs are emitted.
///
/// (The previous summary here was a copy-paste of the `ReceiverConfig` doc;
/// this type gates proof emission only.)
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct DecodeProofEmissionPolicy {
    /// Emit proofs on decode failure (durability-critical requirement).
    pub emit_on_decode_failure: bool,
    /// Emit proofs on successful decode that included repair symbols.
    pub emit_on_repair_success: bool,
}
239
240impl DecodeProofEmissionPolicy {
241    /// Default production posture: disabled.
242    #[must_use]
243    pub const fn disabled() -> Self {
244        Self {
245            emit_on_decode_failure: false,
246            emit_on_repair_success: false,
247        }
248    }
249
250    /// Durability-critical posture for replication apply paths.
251    #[must_use]
252    pub const fn durability_critical() -> Self {
253        Self {
254            emit_on_decode_failure: true,
255            emit_on_repair_success: true,
256        }
257    }
258}
259
impl Default for DecodeProofEmissionPolicy {
    /// Defaults to the disabled (production) posture.
    fn default() -> Self {
        Self::disabled()
    }
}
265
/// Receiver policy knobs: packet auth, proof emission, and resource caps.
#[derive(Debug, Clone)]
pub struct ReceiverConfig {
    /// Optional auth key for validating packet auth tags.
    pub auth_key: Option<[u8; 32]>,
    /// Decode proof emission hooks.
    pub decode_proof_policy: DecodeProofEmissionPolicy,
    /// Maximum number of concurrent in-flight changeset decoders.
    pub max_inflight_decoders: usize,
    /// Maximum total bytes buffered across all decoder symbol maps.
    pub max_buffered_symbol_bytes: usize,
}
278
279impl ReceiverConfig {
280    /// Build a receiver config with authenticated transport enabled.
281    #[must_use]
282    pub const fn with_auth_key(auth_key: [u8; 32]) -> Self {
283        Self {
284            auth_key: Some(auth_key),
285            decode_proof_policy: DecodeProofEmissionPolicy::disabled(),
286            max_inflight_decoders: DEFAULT_MAX_INFLIGHT_DECODERS,
287            max_buffered_symbol_bytes: DEFAULT_MAX_BUFFERED_SYMBOL_BYTES,
288        }
289    }
290}
291
292impl Default for ReceiverConfig {
293    fn default() -> Self {
294        Self {
295            auth_key: None,
296            decode_proof_policy: DecodeProofEmissionPolicy::disabled(),
297            max_inflight_decoders: DEFAULT_MAX_INFLIGHT_DECODERS,
298            max_buffered_symbol_bytes: DEFAULT_MAX_BUFFERED_SYMBOL_BYTES,
299        }
300    }
301}
302
303impl ReplicationReceiver {
304    fn remove_decoder(&mut self, changeset_id: ChangesetId) {
305        if let Some(decoder) = self.decoders.remove(&changeset_id) {
306            self.buffered_symbol_bytes = self
307                .buffered_symbol_bytes
308                .saturating_sub(decoder.buffered_bytes());
309        }
310        self.received_counts.remove(&changeset_id);
311    }
312
313    /// Create a new receiver with explicit configuration.
314    #[must_use]
315    pub fn with_config(config: ReceiverConfig) -> Self {
316        Self {
317            config,
318            state: ReceiverState::Listening,
319            decoders: HashMap::new(),
320            received_counts: HashMap::new(),
321            buffered_symbol_bytes: 0,
322            pending_results: Vec::new(),
323            applied_count: 0,
324            decode_audit: Vec::new(),
325            decode_audit_seq: 0,
326        }
327    }
328
329    /// Create a new receiver in LISTENING state.
330    #[must_use]
331    pub fn new() -> Self {
332        Self::with_config(ReceiverConfig::default())
333    }
334
    /// Current state-machine position.
    #[must_use]
    pub const fn state(&self) -> ReceiverState {
        self.state
    }
340
    /// Number of changesets successfully applied (for metrics/ACK).
    #[must_use]
    pub const fn applied_count(&self) -> u64 {
        self.applied_count
    }
346
    /// Number of active decoder sessions (bounded by `max_inflight_decoders`).
    #[must_use]
    pub fn active_decoders(&self) -> usize {
        self.decoders.len()
    }
352
    /// View decode-proof audit entries emitted so far (non-draining).
    #[must_use]
    pub fn decode_audit_entries(&self) -> &[DecodeAuditEntry] {
        &self.decode_audit
    }
358
    /// Drain decode-proof audit entries, leaving an empty buffer in place.
    pub fn take_decode_audit_entries(&mut self) -> Vec<DecodeAuditEntry> {
        std::mem::take(&mut self.decode_audit)
    }
363
    /// Process a raw packet from the wire.
    ///
    /// Oversized datagrams are rejected before parsing. A packet that fails
    /// integrity/auth verification is treated as an erasure (dropped without
    /// error, `PacketResult::Erasure`) rather than an error, so a corrupted
    /// or forged datagram cannot poison decoder state.
    ///
    /// # Errors
    ///
    /// Returns error if:
    /// - Packet exceeds the RPC message cap
    /// - Packet is malformed (too short, symbol_size = 0)
    /// - V1 rule violated (SBN != 0)
    /// - K_source out of range
    /// - K_source or symbol_size mismatch for existing decoder
    pub fn process_packet(&mut self, packet_bytes: &[u8]) -> Result<PacketResult> {
        // Bound work before parsing anything.
        if packet_bytes.len() > DEFAULT_RPC_MESSAGE_CAP_BYTES {
            return Err(FrankenError::TooBig);
        }
        let packet = ReplicationPacket::from_bytes(packet_bytes)?;
        if !packet.verify_integrity(self.config.auth_key.as_ref()) {
            warn!(
                bead_id = BEAD_ID,
                wire_version = ?packet.wire_version,
                has_auth = packet.auth_tag.is_some(),
                "packet integrity/auth verification failed; treating as erasure"
            );
            return Ok(PacketResult::Erasure);
        }
        self.process_parsed_packet(&packet)
    }
389
    /// Process a parsed packet.
    ///
    /// Drives the LISTENING → COLLECTING → DECODING → APPLYING transitions
    /// as symbols arrive, creating a per-changeset `DecoderState` on first
    /// contact and attempting a decode once `k_source` unique symbols exist.
    ///
    /// # Errors
    ///
    /// See `process_packet`.
    #[allow(clippy::too_many_lines)]
    pub fn process_parsed_packet(&mut self, packet: &ReplicationPacket) -> Result<PacketResult> {
        // V1 rule: reject multi-block packets.
        if packet.sbn != 0 {
            error!(
                bead_id = BEAD_ID,
                sbn = packet.sbn,
                "V1 rule: SBN must be 0"
            );
            return Err(FrankenError::Internal(format!(
                "V1 replication: source_block must be 0, got {}",
                packet.sbn
            )));
        }

        // Validate K_source range.
        if packet.k_source == 0 || packet.k_source > K_MAX {
            error!(
                bead_id = BEAD_ID,
                k_source = packet.k_source,
                k_max = K_MAX,
                "K_source out of valid range"
            );
            return Err(FrankenError::OutOfRange {
                what: "k_source".to_owned(),
                value: packet.k_source.to_string(),
            });
        }

        // Compute symbol_size from packet header and validate payload consistency.
        if usize::from(packet.symbol_size_t) != packet.symbol_data.len() {
            return Err(FrankenError::DatabaseCorrupt {
                detail: format!(
                    "symbol_size_t mismatch: header={}, payload={}",
                    packet.symbol_size_t,
                    packet.symbol_data.len()
                ),
            });
        }
        let symbol_size = u32::from(packet.symbol_size_t);
        if symbol_size == 0 {
            return Err(FrankenError::OutOfRange {
                what: "symbol_size".to_owned(),
                value: "0".to_owned(),
            });
        }

        // Transition LISTENING → COLLECTING on first packet.
        if self.state == ReceiverState::Listening {
            self.state = ReceiverState::Collecting;
            info!(bead_id = BEAD_ID, "first packet received, now COLLECTING");
        }

        let changeset_id = packet.changeset_id;
        // Tracks whether THIS packet created the decoder, so the budget check
        // below can roll the creation back on rejection.
        let mut created_decoder = false;

        // Get or create decoder state.
        if let Some(decoder) = self.decoders.get(&changeset_id) {
            // Validate consistency with existing decoder.
            if decoder.k_source != packet.k_source {
                error!(
                    bead_id = BEAD_ID,
                    expected_k = decoder.k_source,
                    got_k = packet.k_source,
                    "K_source mismatch for existing changeset"
                );
                return Err(FrankenError::DatabaseCorrupt {
                    detail: format!(
                        "K_source mismatch: expected {}, got {}",
                        decoder.k_source, packet.k_source
                    ),
                });
            }
            if decoder.symbol_size != symbol_size {
                error!(
                    bead_id = BEAD_ID,
                    expected_t = decoder.symbol_size,
                    got_t = symbol_size,
                    "symbol_size mismatch for existing changeset"
                );
                return Err(FrankenError::DatabaseCorrupt {
                    detail: format!(
                        "symbol_size mismatch: expected {}, got {}",
                        decoder.symbol_size, symbol_size
                    ),
                });
            }
            // Seed is only carried/validated on FramedV2 frames.
            if packet.wire_version == ReplicationWireVersion::FramedV2
                && decoder.seed != packet.seed
            {
                return Err(FrankenError::DatabaseCorrupt {
                    detail: format!(
                        "seed mismatch: expected {}, got {}",
                        decoder.seed, packet.seed
                    ),
                });
            }
        } else {
            if self.decoders.len() >= self.config.max_inflight_decoders {
                warn!(
                    bead_id = BEAD_ID,
                    active_decoders = self.decoders.len(),
                    max_inflight_decoders = self.config.max_inflight_decoders,
                    "decoder cap reached; rejecting new changeset"
                );
                return Err(FrankenError::Busy);
            }
            // Create new decoder state.
            let expected_seed =
                crate::replication_sender::derive_seed_from_changeset_id(&changeset_id);
            if packet.wire_version == ReplicationWireVersion::FramedV2
                && packet.seed != expected_seed
            {
                return Err(FrankenError::DatabaseCorrupt {
                    detail: format!(
                        "seed does not match deterministic derivation for changeset: expected {expected_seed}, got {}",
                        packet.seed
                    ),
                });
            }
            let seed = expected_seed;
            debug!(
                bead_id = BEAD_ID,
                k_source = packet.k_source,
                symbol_size,
                seed,
                "created decoder for new changeset"
            );
            self.decoders.insert(
                changeset_id,
                DecoderState::new(packet.k_source, symbol_size, seed),
            );
            self.received_counts.insert(changeset_id, 0);
            created_decoder = true;
        }

        // Enforce global buffered-symbol bound before accepting a new symbol.
        if let Some(decoder) = self.decoders.get(&changeset_id) {
            if !decoder.has_symbol(packet.esi) {
                let next_total = self
                    .buffered_symbol_bytes
                    .saturating_add(packet.symbol_data.len());
                if next_total > self.config.max_buffered_symbol_bytes {
                    warn!(
                        bead_id = BEAD_ID,
                        buffered_symbol_bytes = self.buffered_symbol_bytes,
                        incoming_symbol_bytes = packet.symbol_data.len(),
                        max_buffered_symbol_bytes = self.config.max_buffered_symbol_bytes,
                        "buffered symbol budget exceeded"
                    );
                    // Roll back a decoder created by this very packet so a
                    // rejected packet leaves no empty session behind.
                    if created_decoder {
                        self.remove_decoder(changeset_id);
                        self.state = if self.decoders.is_empty() {
                            ReceiverState::Listening
                        } else {
                            ReceiverState::Collecting
                        };
                    }
                    return Err(FrankenError::TooBig);
                }
            }
        }

        // Add symbol to decoder (with ISI deduplication) and capture decode context.
        // The context tuple is copied out so the mutable borrow of the decoder
        // ends before self.state / proof bookkeeping below.
        let (
            ready_to_decode,
            k_source_ctx,
            symbol_size_ctx,
            seed_ctx,
            received_isis_ctx,
            received_count_ctx,
            source_count_ctx,
            has_repair_ctx,
            decoded_padded,
        ) = {
            let decoder = self.decoders.get_mut(&changeset_id).expect("just inserted");
            let accepted = decoder.add_symbol(packet.esi, packet.symbol_data.clone());

            if !accepted {
                debug!(
                    bead_id = BEAD_ID,
                    isi = packet.esi,
                    "duplicate ISI, symbol ignored"
                );
                return Ok(PacketResult::Duplicate);
            }

            self.buffered_symbol_bytes = self
                .buffered_symbol_bytes
                .saturating_add(packet.symbol_data.len());
            let count = self.received_counts.entry(changeset_id).or_insert(0);
            *count += 1;
            debug!(
                bead_id = BEAD_ID,
                isi = packet.esi,
                received = *count,
                k_source = packet.k_source,
                "symbol accepted"
            );

            let ready = decoder.ready_to_decode();
            let padded = if ready { decoder.try_decode() } else { None };
            (
                ready,
                decoder.k_source,
                decoder.symbol_size,
                decoder.seed,
                decoder.sorted_isis(),
                decoder.received_count(),
                decoder.source_symbol_count(),
                decoder.has_repair_symbols(),
                padded,
            )
        };

        if ready_to_decode {
            info!(
                bead_id = BEAD_ID,
                received = received_count_ctx,
                k_source = k_source_ctx,
                "attempting decode"
            );
            self.state = ReceiverState::Decoding;

            if let Some(padded_bytes) = decoded_padded {
                // Success proofs are only emitted when the decode actually
                // consumed repair symbols and policy asks for them.
                let success_proof =
                    if self.config.decode_proof_policy.emit_on_repair_success && has_repair_ctx {
                        Some(Self::build_decode_proof(DecodeProofBuildInput {
                            changeset_id,
                            k_source: k_source_ctx,
                            symbol_size: symbol_size_ctx,
                            seed: seed_ctx,
                            received_isis: &received_isis_ctx,
                            decode_success: true,
                            intermediate_rank: Some(k_source_ctx),
                            symbols_used: received_count_ctx,
                        }))
                    } else {
                        None
                    };

                // Decode succeeded: truncate to total_len and parse pages.
                match self.parse_and_validate_changeset(changeset_id, &padded_bytes) {
                    Ok(mut result) => {
                        let n_pages = result.pages.len();
                        if let Some(proof) = success_proof {
                            self.record_decode_proof(proof.clone());
                            result.decode_proof = Some(proof);
                        }
                        self.pending_results.push(result);
                        self.state = ReceiverState::Applying;
                        info!(
                            bead_id = BEAD_ID,
                            n_pages, "decode succeeded, ready to apply"
                        );
                        // Clean up decoder for this changeset.
                        self.remove_decoder(changeset_id);
                        return Ok(PacketResult::DecodeReady);
                    }
                    Err(e) => {
                        error!(
                            bead_id = BEAD_ID,
                            error = %e,
                            "changeset validation failed after decode"
                        );
                        // Clean up failed decoder.
                        self.remove_decoder(changeset_id);
                        self.state = if self.decoders.is_empty() {
                            ReceiverState::Listening
                        } else {
                            ReceiverState::Collecting
                        };
                        return Err(e);
                    }
                }
            }

            if self.config.decode_proof_policy.emit_on_decode_failure {
                let failure_proof = Self::build_decode_proof(DecodeProofBuildInput {
                    changeset_id,
                    k_source: k_source_ctx,
                    symbol_size: symbol_size_ctx,
                    seed: seed_ctx,
                    received_isis: &received_isis_ctx,
                    decode_success: false,
                    intermediate_rank: Some(source_count_ctx),
                    symbols_used: received_count_ctx,
                });
                self.record_decode_proof(failure_proof);
            }

            // Decode failed (need more symbols).
            warn!(
                bead_id = BEAD_ID,
                source_count = source_count_ctx,
                k_source = k_source_ctx,
                "decode failed at K_source, continuing collection"
            );
            self.state = ReceiverState::Collecting;
            return Ok(PacketResult::NeedMore);
        }

        Ok(PacketResult::Accepted)
    }
699
    /// Parse and validate decoded changeset bytes.
    ///
    /// Layout: `ChangesetHeader` followed by `n_pages` entries of
    /// `page_number (4 LE) | xxh3 (8 LE) | page_data (page_size)`.
    /// Every length/offset is checked (with overflow-safe arithmetic)
    /// before slicing, and each page's xxh3 is verified.
    #[allow(clippy::too_many_lines)]
    fn parse_and_validate_changeset(
        &self,
        changeset_id: ChangesetId,
        padded_bytes: &[u8],
    ) -> Result<DecodeResult> {
        if padded_bytes.len() < CHANGESET_HEADER_SIZE {
            return Err(FrankenError::DatabaseCorrupt {
                detail: format!(
                    "decoded bytes too short for header: {} < {CHANGESET_HEADER_SIZE}",
                    padded_bytes.len()
                ),
            });
        }

        // Parse header.
        let header_bytes: [u8; CHANGESET_HEADER_SIZE] = padded_bytes[..CHANGESET_HEADER_SIZE]
            .try_into()
            .expect("checked length");
        let header = ChangesetHeader::from_bytes(&header_bytes)?;

        // Truncate to total_len (decoded buffer is padded to K * T).
        let total_len =
            usize::try_from(header.total_len).map_err(|_| FrankenError::OutOfRange {
                what: "total_len".to_owned(),
                value: header.total_len.to_string(),
            })?;
        if total_len < CHANGESET_HEADER_SIZE {
            return Err(FrankenError::DatabaseCorrupt {
                detail: format!(
                    "total_len ({total_len}) smaller than changeset header size ({CHANGESET_HEADER_SIZE})"
                ),
            });
        }
        if total_len > padded_bytes.len() {
            return Err(FrankenError::DatabaseCorrupt {
                detail: format!(
                    "total_len ({total_len}) exceeds decoded bytes ({})",
                    padded_bytes.len()
                ),
            });
        }
        let changeset_bytes = &padded_bytes[..total_len];

        // Parse page entries.
        let page_size =
            usize::try_from(header.page_size).map_err(|_| FrankenError::OutOfRange {
                what: "page_size".to_owned(),
                value: header.page_size.to_string(),
            })?;
        let entry_size = 4_usize
            .checked_add(8)
            .and_then(|value| value.checked_add(page_size))
            .ok_or_else(|| FrankenError::OutOfRange {
                what: "entry_size".to_owned(),
                value: format!("page_size={}", header.page_size),
            })?; // page_number + xxh3 + data
        let n_pages = usize::try_from(header.n_pages).map_err(|_| FrankenError::OutOfRange {
            what: "n_pages".to_owned(),
            value: header.n_pages.to_string(),
        })?;
        let data_start = CHANGESET_HEADER_SIZE;
        let data_bytes = &changeset_bytes[data_start..];
        let required_bytes =
            entry_size
                .checked_mul(n_pages)
                .ok_or_else(|| FrankenError::OutOfRange {
                    what: "changeset payload size".to_owned(),
                    value: format!("entry_size={entry_size}, n_pages={}", header.n_pages),
                })?;

        if data_bytes.len() < required_bytes {
            return Err(FrankenError::DatabaseCorrupt {
                detail: format!(
                    "insufficient data for {} pages: {} < {}",
                    header.n_pages,
                    data_bytes.len(),
                    required_bytes,
                ),
            });
        }

        let mut pages = Vec::with_capacity(n_pages);
        // Snapshot the symbol count for the result; the decoder may already
        // have been removed, in which case 0 is reported.
        let decoder_state_symbols = self
            .decoders
            .get(&changeset_id)
            .map_or(0, DecoderState::received_count);

        for i in 0..n_pages {
            let offset = i
                .checked_mul(entry_size)
                .ok_or_else(|| FrankenError::OutOfRange {
                    what: "page entry offset".to_owned(),
                    value: format!("index={i}, entry_size={entry_size}"),
                })?;
            // Slices below are safe: required_bytes was verified above.
            let page_number =
                u32::from_le_bytes(data_bytes[offset..offset + 4].try_into().expect("4 bytes"));
            let page_xxh3 = u64::from_le_bytes(
                data_bytes[offset + 4..offset + 12]
                    .try_into()
                    .expect("8 bytes"),
            );
            let page_data = data_bytes[offset + 12..offset + 12 + page_size].to_vec();

            // Validate page xxh3.
            let computed_xxh3 = xxhash_rust::xxh3::xxh3_64(&page_data);
            if computed_xxh3 != page_xxh3 {
                error!(
                    bead_id = BEAD_ID,
                    page_number,
                    expected_xxh3 = page_xxh3,
                    computed_xxh3,
                    "page xxh3 validation failed"
                );
                return Err(FrankenError::DatabaseCorrupt {
                    detail: format!(
                        "page {page_number} xxh3 mismatch: expected {page_xxh3:#x}, got {computed_xxh3:#x}"
                    ),
                });
            }

            pages.push(DecodedPage {
                page_number,
                page_data,
            });
        }

        // Pages should already be sorted (sender sorts them).
        debug_assert!(
            pages
                .windows(2)
                .all(|w| w[0].page_number <= w[1].page_number)
        );

        Ok(DecodeResult {
            changeset_id,
            pages,
            symbols_used: decoder_state_symbols,
            decode_proof: None,
        })
    }
842
843    fn build_decode_proof(input: DecodeProofBuildInput<'_>) -> EcsDecodeProof {
844        let object_id = ObjectId::from_bytes(*input.changeset_id.as_bytes());
845        let timing_ns =
846            deterministic_timing_ns(input.k_source, input.symbol_size, input.symbols_used);
847        EcsDecodeProof::from_esis(
848            object_id,
849            input.k_source,
850            input.received_isis,
851            input.decode_success,
852            input.intermediate_rank,
853            timing_ns,
854            input.seed,
855        )
856        .with_changeset_id(*input.changeset_id.as_bytes())
857    }
858
859    fn record_decode_proof(&mut self, proof: EcsDecodeProof) {
860        self.decode_audit_seq = self.decode_audit_seq.saturating_add(1);
861        self.decode_audit.push(DecodeAuditEntry {
862            proof,
863            seq: self.decode_audit_seq,
864            lab_mode: false,
865        });
866    }
867
868    /// Apply pending decoded results. Returns applied page counts.
869    ///
870    /// In production, this writes pages to the local database. Here we
871    /// validate and return the results for the caller to apply.
872    ///
873    /// # Errors
874    ///
875    /// Returns error if not in APPLYING state.
876    pub fn apply_pending(&mut self) -> Result<Vec<DecodeResult>> {
877        if self.state != ReceiverState::Applying {
878            return Err(FrankenError::Internal(format!(
879                "receiver must be APPLYING to apply, current state: {:?}",
880                self.state
881            )));
882        }
883
884        let results = std::mem::take(&mut self.pending_results);
885        let n = results.len();
886        self.applied_count += u64::try_from(n).unwrap_or(u64::MAX);
887
888        info!(
889            bead_id = BEAD_ID,
890            applied = n,
891            total_applied = self.applied_count,
892            "applied pending changesets"
893        );
894
895        // Transition to COMPLETE.
896        self.state = ReceiverState::Complete;
897        Ok(results)
898    }
899
900    /// Transition from COMPLETE back to LISTENING for the next changeset.
901    ///
902    /// # Errors
903    ///
904    /// Returns error if not in COMPLETE state.
905    pub fn reset_to_listening(&mut self) -> Result<()> {
906        if self.state != ReceiverState::Complete {
907            return Err(FrankenError::Internal(format!(
908                "receiver must be COMPLETE to reset, current state: {:?}",
909                self.state
910            )));
911        }
912        self.state = ReceiverState::Listening;
913        debug!(bead_id = BEAD_ID, "receiver reset to LISTENING");
914        Ok(())
915    }
916
917    /// Force reset to LISTENING from any state (e.g., on error recovery).
918    pub fn force_reset(&mut self) {
919        self.decoders.clear();
920        self.received_counts.clear();
921        self.buffered_symbol_bytes = 0;
922        self.pending_results.clear();
923        self.state = ReceiverState::Listening;
924        warn!(bead_id = BEAD_ID, "receiver force-reset to LISTENING");
925    }
926}
927
/// `Default` delegates to `ReplicationReceiver::new`, yielding a receiver in
/// the LISTENING state with the default limits.
impl Default for ReplicationReceiver {
    fn default() -> Self {
        Self::new()
    }
}
933
934fn deterministic_timing_ns(k_source: u32, symbol_size: u32, symbols_used: u32) -> u64 {
935    let mut material = [0_u8; 12];
936    material[..4].copy_from_slice(&k_source.to_le_bytes());
937    material[4..8].copy_from_slice(&symbol_size.to_le_bytes());
938    material[8..12].copy_from_slice(&symbols_used.to_le_bytes());
939    xxhash_rust::xxh3::xxh3_64(&material)
940}
941
/// Result of processing a single packet.
///
/// Returned to the caller to report how the symbol was handled and whether a
/// full decode completed for its changeset.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum PacketResult {
    /// Symbol accepted, need more for decode.
    Accepted,
    /// Integrity/auth invalid; packet ignored as erasure.
    Erasure,
    /// Duplicate ISI, silently ignored.
    Duplicate,
    /// Enough symbols collected, decode succeeded and ready to apply.
    DecodeReady,
    /// Had enough symbols but decode failed, need more.
    NeedMore,
}
956
957// ---------------------------------------------------------------------------
958// Changeset parsing utility (used by tests and receiver)
959// ---------------------------------------------------------------------------
960
961/// Parse changeset bytes into page entries (for validation/testing).
962///
963/// # Errors
964///
965/// Returns error if the changeset is malformed.
966pub fn parse_changeset_pages(changeset_bytes: &[u8]) -> Result<(ChangesetHeader, Vec<PageEntry>)> {
967    if changeset_bytes.len() < CHANGESET_HEADER_SIZE {
968        return Err(FrankenError::DatabaseCorrupt {
969            detail: format!(
970                "changeset too short: {} < {CHANGESET_HEADER_SIZE}",
971                changeset_bytes.len()
972            ),
973        });
974    }
975
976    let header_bytes: [u8; CHANGESET_HEADER_SIZE] = changeset_bytes[..CHANGESET_HEADER_SIZE]
977        .try_into()
978        .expect("checked length");
979    let header = ChangesetHeader::from_bytes(&header_bytes)?;
980
981    let total_len = usize::try_from(header.total_len).map_err(|_| FrankenError::OutOfRange {
982        what: "total_len".to_owned(),
983        value: header.total_len.to_string(),
984    })?;
985    if total_len < CHANGESET_HEADER_SIZE {
986        return Err(FrankenError::DatabaseCorrupt {
987            detail: format!(
988                "total_len ({total_len}) smaller than changeset header size ({CHANGESET_HEADER_SIZE})"
989            ),
990        });
991    }
992    if total_len > changeset_bytes.len() {
993        return Err(FrankenError::DatabaseCorrupt {
994            detail: format!(
995                "total_len ({total_len}) exceeds available bytes ({})",
996                changeset_bytes.len()
997            ),
998        });
999    }
1000    let changeset_bytes = &changeset_bytes[..total_len];
1001
1002    let page_size = usize::try_from(header.page_size).map_err(|_| FrankenError::OutOfRange {
1003        what: "page_size".to_owned(),
1004        value: header.page_size.to_string(),
1005    })?;
1006    let entry_size = 4_usize
1007        .checked_add(8)
1008        .and_then(|value| value.checked_add(page_size))
1009        .ok_or_else(|| FrankenError::OutOfRange {
1010            what: "entry_size".to_owned(),
1011            value: format!("page_size={}", header.page_size),
1012        })?;
1013    let n_pages = usize::try_from(header.n_pages).map_err(|_| FrankenError::OutOfRange {
1014        what: "n_pages".to_owned(),
1015        value: header.n_pages.to_string(),
1016    })?;
1017    let data_start = CHANGESET_HEADER_SIZE;
1018    let data_bytes = &changeset_bytes[data_start..];
1019    let required_bytes =
1020        entry_size
1021            .checked_mul(n_pages)
1022            .ok_or_else(|| FrankenError::OutOfRange {
1023                what: "changeset payload size".to_owned(),
1024                value: format!("entry_size={entry_size}, n_pages={}", header.n_pages),
1025            })?;
1026    if data_bytes.len() < required_bytes {
1027        return Err(FrankenError::DatabaseCorrupt {
1028            detail: format!(
1029                "insufficient data for {} pages: {} < {}",
1030                header.n_pages,
1031                data_bytes.len(),
1032                required_bytes
1033            ),
1034        });
1035    }
1036
1037    let mut pages = Vec::with_capacity(n_pages);
1038    for i in 0..n_pages {
1039        let offset = i
1040            .checked_mul(entry_size)
1041            .ok_or_else(|| FrankenError::OutOfRange {
1042                what: "page entry offset".to_owned(),
1043                value: format!("index={i}, entry_size={entry_size}"),
1044            })?;
1045        let page_number =
1046            u32::from_le_bytes(data_bytes[offset..offset + 4].try_into().expect("4 bytes"));
1047        let page_xxh3 = u64::from_le_bytes(
1048            data_bytes[offset + 4..offset + 12]
1049                .try_into()
1050                .expect("8 bytes"),
1051        );
1052        let page_bytes = data_bytes[offset + 12..offset + 12 + page_size].to_vec();
1053
1054        pages.push(PageEntry {
1055            page_number,
1056            page_xxh3,
1057            page_bytes,
1058        });
1059    }
1060
1061    Ok((header, pages))
1062}
1063
1064#[cfg(test)]
1065mod tests {
1066    use asupersync::runtime::RuntimeBuilder;
1067    use asupersync::security::authenticated::AuthenticatedSymbol;
1068    use asupersync::security::tag::AuthenticationTag;
1069    use asupersync::transport::{
1070        SimNetwork, SimTransportConfig, SymbolSinkExt as _, SymbolStreamExt as _,
1071    };
1072    use asupersync::types::{Symbol, SymbolId, SymbolKind};
1073    use std::collections::HashSet;
1074
1075    use super::*;
1076    use crate::replication_sender::{
1077        CHANGESET_HEADER_SIZE, ChangesetId, PageEntry, REPLICATION_HEADER_SIZE, ReplicationPacket,
1078        ReplicationPacketV2Header, ReplicationSender, ReplicationWireVersion, SenderConfig,
1079        compute_changeset_id, derive_seed_from_changeset_id, encode_changeset,
1080    };
1081
1082    const TEST_BEAD_ID: &str = "bd-1hi.14";
1083
1084    #[allow(clippy::cast_possible_truncation)]
1085    fn make_pages(page_size: u32, page_numbers: &[u32]) -> Vec<PageEntry> {
1086        page_numbers
1087            .iter()
1088            .map(|&pn| {
1089                let mut data = vec![0_u8; page_size as usize];
1090                for (i, byte) in data.iter_mut().enumerate() {
1091                    *byte = ((pn as usize * 251 + i * 31) % 256) as u8;
1092                }
1093                PageEntry::new(pn, data)
1094            })
1095            .collect()
1096    }
1097
    /// Helper: generate sender packets for a set of pages.
    ///
    /// Convenience wrapper around `generate_sender_packets_with_multiplier`
    /// with a max-ISI multiplier of 1.
    fn generate_sender_packets(
        page_size: u32,
        page_numbers: &[u32],
        symbol_size: u16,
    ) -> Vec<Vec<u8>> {
        generate_sender_packets_with_multiplier(page_size, page_numbers, symbol_size, 1)
    }
1106
1107    fn generate_sender_packets_with_multiplier(
1108        page_size: u32,
1109        page_numbers: &[u32],
1110        symbol_size: u16,
1111        max_isi_multiplier: u32,
1112    ) -> Vec<Vec<u8>> {
1113        let mut sender = ReplicationSender::new();
1114        let mut pages = make_pages(page_size, page_numbers);
1115        let config = SenderConfig {
1116            symbol_size,
1117            max_isi_multiplier,
1118        };
1119        sender
1120            .prepare(page_size, &mut pages, config)
1121            .expect("prepare");
1122        sender.start_streaming().expect("start");
1123
1124        let mut packets = Vec::new();
1125        while let Some(packet) = sender.next_packet().expect("next") {
1126            packets.push(packet.to_bytes().expect("encode"));
1127        }
1128        packets
1129    }
1130
    /// Outcome of pushing packets through the simulated network.
    #[derive(Debug)]
    struct SimNetworkDelivery {
        // How many packets were handed to the sink.
        sent_count: usize,
        // (esi, wire bytes) pairs in the order the stream yielded them.
        delivered: Vec<(u32, Vec<u8>)>,
    }
1136
1137    fn packet_symbol(esi: u32, wire_bytes: Vec<u8>) -> AuthenticatedSymbol {
1138        let symbol_id = SymbolId::new_for_test(0xBEEF, 0, esi);
1139        let symbol = Symbol::new(symbol_id, wire_bytes, SymbolKind::Source);
1140        AuthenticatedSymbol::new_verified(symbol, AuthenticationTag::zero())
1141    }
1142
    /// Send `packet_bytes` across a 2-node `SimNetwork` and record what arrives.
    ///
    /// Each packet's index becomes its symbol ESI so the delivery record can
    /// later detect loss, duplication, and reordering.
    fn transmit_packets_simnetwork(
        config: SimTransportConfig,
        packet_bytes: &[Vec<u8>],
    ) -> SimNetworkDelivery {
        let network = SimNetwork::fully_connected(2, config);
        let (mut sink, mut stream) = network.transport(0, 1);
        let runtime = RuntimeBuilder::current_thread()
            .build()
            .expect("runtime build");

        runtime.block_on(async {
            // Push every packet, tagging it with its index as the ESI.
            for (index, bytes) in packet_bytes.iter().enumerate() {
                let esi = u32::try_from(index).expect("test packet index fits u32");
                sink.send(packet_symbol(esi, bytes.clone()))
                    .await
                    .expect("send simulated symbol");
            }
            sink.close().await.expect("close simulated sink");

            // Drain the receiving side until the stream ends.
            let mut delivered = Vec::new();
            while let Some(item) = stream.next().await {
                let auth = item.expect("sim stream item");
                delivered.push((auth.symbol().id().esi(), auth.symbol().data().to_vec()));
            }

            SimNetworkDelivery {
                sent_count: packet_bytes.len(),
                delivered,
            }
        })
    }
1174
1175    fn has_duplicate_esies(delivery: &SimNetworkDelivery) -> bool {
1176        let mut seen = HashSet::new();
1177        delivery.delivered.iter().any(|(esi, _)| !seen.insert(*esi))
1178    }
1179
1180    fn has_reordered_esies(delivery: &SimNetworkDelivery) -> bool {
1181        delivery
1182            .delivered
1183            .windows(2)
1184            .any(|window| window[0].0 > window[1].0)
1185    }
1186
1187    fn has_corrupted_wire_bytes(delivery: &SimNetworkDelivery, original: &[Vec<u8>]) -> bool {
1188        delivery.delivered.iter().any(|(esi, bytes)| {
1189            usize::try_from(*esi)
1190                .ok()
1191                .and_then(|index| original.get(index))
1192                .is_some_and(|expected| expected.as_slice() != bytes.as_slice())
1193        })
1194    }
1195
1196    fn decode_from_wire_packets(
1197        delivered: &[(u32, Vec<u8>)],
1198    ) -> (Option<Vec<DecodedPage>>, usize, usize) {
1199        let mut receiver = ReplicationReceiver::new();
1200        let mut erasures = 0_usize;
1201        let mut parse_errors = 0_usize;
1202
1203        for (_, wire) in delivered {
1204            match receiver.process_packet(wire) {
1205                Ok(PacketResult::DecodeReady) => {
1206                    let mut applied = receiver.apply_pending().expect("apply decoded changeset");
1207                    let pages = applied.pop().expect("decode result pages").pages;
1208                    return (Some(pages), erasures, parse_errors);
1209                }
1210                Ok(PacketResult::Erasure) => erasures += 1,
1211                Ok(PacketResult::Accepted | PacketResult::Duplicate | PacketResult::NeedMore) => {}
1212                Err(_) => parse_errors += 1,
1213            }
1214        }
1215
1216        (None, erasures, parse_errors)
1217    }
1218
1219    fn decoded_matches_original(decoded: &[DecodedPage], original: &[PageEntry]) -> bool {
1220        if decoded.len() != original.len() {
1221            return false;
1222        }
1223        for (decoded, original) in decoded.iter().zip(original.iter()) {
1224            if decoded.page_number != original.page_number {
1225                return false;
1226            }
1227            if decoded.page_data != original.page_bytes {
1228                return false;
1229            }
1230        }
1231        true
1232    }
1233
1234    fn make_packet(
1235        changeset_id: ChangesetId,
1236        sbn: u8,
1237        esi: u32,
1238        k_source: u32,
1239        symbol_data: Vec<u8>,
1240    ) -> ReplicationPacket {
1241        let symbol_size_t =
1242            u16::try_from(symbol_data.len()).expect("test symbol payload must fit u16");
1243        let seed = derive_seed_from_changeset_id(&changeset_id);
1244        ReplicationPacket::new_v2(
1245            ReplicationPacketV2Header {
1246                changeset_id,
1247                sbn,
1248                esi,
1249                k_source,
1250                r_repair: 0,
1251                symbol_size_t,
1252                seed,
1253            },
1254            symbol_data,
1255        )
1256    }
1257
1258    fn receiver_with_decode_proofs() -> ReplicationReceiver {
1259        ReplicationReceiver::with_config(ReceiverConfig {
1260            auth_key: None,
1261            decode_proof_policy: DecodeProofEmissionPolicy::durability_critical(),
1262            ..ReceiverConfig::default()
1263        })
1264    }
1265
1266    // -----------------------------------------------------------------------
1267    // State transition tests
1268    // -----------------------------------------------------------------------
1269
1270    #[test]
1271    fn test_receiver_listening_to_collecting() {
1272        let mut receiver = ReplicationReceiver::new();
1273        assert_eq!(
1274            receiver.state(),
1275            ReceiverState::Listening,
1276            "bead_id={TEST_BEAD_ID} case=initial_state"
1277        );
1278
1279        let packets = generate_sender_packets(512, &[1], 512);
1280        assert!(!packets.is_empty());
1281
1282        receiver.process_packet(&packets[0]).expect("first packet");
1283        assert_ne!(
1284            receiver.state(),
1285            ReceiverState::Listening,
1286            "bead_id={TEST_BEAD_ID} case=transition_on_first_packet"
1287        );
1288    }
1289
1290    #[test]
1291    fn test_receiver_decoder_creation() {
1292        let mut receiver = ReplicationReceiver::new();
1293        let packets = generate_sender_packets(512, &[1, 2], 512);
1294        assert_eq!(receiver.active_decoders(), 0);
1295
1296        receiver.process_packet(&packets[0]).expect("first packet");
1297        // Should have created exactly one decoder.
1298        // Note: if decode triggers, the decoder may be cleaned up,
1299        // so just check that processing succeeded.
1300        assert_ne!(
1301            receiver.state(),
1302            ReceiverState::Listening,
1303            "bead_id={TEST_BEAD_ID} case=decoder_created"
1304        );
1305    }
1306
1307    #[test]
1308    fn test_receiver_rejects_new_changeset_when_decoder_limit_hit() {
1309        let mut receiver = ReplicationReceiver::with_config(ReceiverConfig {
1310            max_inflight_decoders: 1,
1311            ..ReceiverConfig::default()
1312        });
1313
1314        let first = make_packet(
1315            ChangesetId::from_bytes([0x31; 16]),
1316            0,
1317            0,
1318            100,
1319            vec![0x11; 256],
1320        );
1321        receiver
1322            .process_parsed_packet(&first)
1323            .expect("first decoder");
1324        assert_eq!(receiver.active_decoders(), 1);
1325
1326        let second = make_packet(
1327            ChangesetId::from_bytes([0x32; 16]),
1328            0,
1329            0,
1330            100,
1331            vec![0x22; 256],
1332        );
1333        let err = receiver.process_parsed_packet(&second).unwrap_err();
1334        assert!(matches!(err, FrankenError::Busy));
1335        assert_eq!(receiver.active_decoders(), 1);
1336    }
1337
1338    #[test]
1339    fn test_receiver_enforces_buffered_symbol_budget() {
1340        let mut receiver = ReplicationReceiver::with_config(ReceiverConfig {
1341            max_buffered_symbol_bytes: 512,
1342            ..ReceiverConfig::default()
1343        });
1344
1345        let first = make_packet(
1346            ChangesetId::from_bytes([0x41; 16]),
1347            0,
1348            0,
1349            100,
1350            vec![0x55; 400],
1351        );
1352        receiver
1353            .process_parsed_packet(&first)
1354            .expect("first packet");
1355        assert_eq!(receiver.active_decoders(), 1);
1356
1357        // New changeset would exceed budget and should be rejected/cleaned up.
1358        let second = make_packet(
1359            ChangesetId::from_bytes([0x42; 16]),
1360            0,
1361            0,
1362            100,
1363            vec![0x77; 200],
1364        );
1365        let err = receiver.process_parsed_packet(&second).unwrap_err();
1366        assert!(matches!(err, FrankenError::TooBig));
1367        assert_eq!(receiver.active_decoders(), 1);
1368    }
1369
1370    #[test]
1371    fn test_receiver_seed_derivation() {
1372        // Verify seed = xxh3_64(changeset_id_bytes) matches sender.
1373        let id = ChangesetId::from_bytes([1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16]);
1374        let seed = derive_seed_from_changeset_id(&id);
1375
1376        let expected = xxhash_rust::xxh3::xxh3_64(id.as_bytes());
1377        assert_eq!(
1378            seed, expected,
1379            "bead_id={TEST_BEAD_ID} case=seed_matches_sender"
1380        );
1381    }
1382
1383    #[test]
1384    fn test_receiver_v1_reject_sbn_nonzero() {
1385        let mut receiver = ReplicationReceiver::new();
1386        let packet = make_packet(
1387            ChangesetId::from_bytes([0xAA; 16]),
1388            1, // V1 violation
1389            0,
1390            10,
1391            vec![0x55; 512],
1392        );
1393        let wire = packet.to_bytes().expect("encode");
1394        let result = receiver.process_packet(&wire);
1395        assert!(
1396            result.is_err(),
1397            "bead_id={TEST_BEAD_ID} case=v1_sbn_rejected"
1398        );
1399    }
1400
1401    #[test]
1402    fn test_receiver_k_source_validation() {
1403        let mut receiver = ReplicationReceiver::new();
1404
1405        // K_source = 0 → rejected.
1406        let packet_zero = make_packet(
1407            ChangesetId::from_bytes([0xBB; 16]),
1408            0,
1409            0,
1410            0,
1411            vec![0x55; 512],
1412        );
1413        let wire_zero = packet_zero.to_bytes().expect("encode");
1414        assert!(
1415            receiver.process_packet(&wire_zero).is_err(),
1416            "bead_id={TEST_BEAD_ID} case=k_source_zero_rejected"
1417        );
1418
1419        // K_source = K_MAX + 1 → rejected.
1420        let packet_over = make_packet(
1421            ChangesetId::from_bytes([0xCC; 16]),
1422            0,
1423            0,
1424            K_MAX + 1,
1425            vec![0x55; 512],
1426        );
1427        // ESI only has 24 bits, K_source > K_MAX might not fit in packet format
1428        // but we test the validation path directly.
1429        let result = receiver.process_parsed_packet(&packet_over);
1430        assert!(
1431            result.is_err(),
1432            "bead_id={TEST_BEAD_ID} case=k_source_over_max_rejected"
1433        );
1434
1435        // K_source = K_MAX → accepted.
1436        let packet_max = make_packet(
1437            ChangesetId::from_bytes([0xDD; 16]),
1438            0,
1439            0,
1440            K_MAX,
1441            vec![0x55; 512],
1442        );
1443        let result = receiver.process_parsed_packet(&packet_max);
1444        assert!(
1445            result.is_ok(),
1446            "bead_id={TEST_BEAD_ID} case=k_source_at_max_accepted"
1447        );
1448    }
1449
1450    #[test]
1451    fn test_receiver_symbol_size_inference() {
1452        let mut receiver = ReplicationReceiver::new();
1453        let packet = make_packet(
1454            ChangesetId::from_bytes([0xEE; 16]),
1455            0,
1456            0,
1457            100,
1458            vec![0x42; 1024],
1459        );
1460        receiver
1461            .process_parsed_packet(&packet)
1462            .expect("accept packet");
1463
1464        // Symbol size should be inferred as 1024.
1465        let decoder = receiver
1466            .decoders
1467            .get(&packet.changeset_id)
1468            .expect("decoder exists");
1469        assert_eq!(
1470            decoder.symbol_size, 1024,
1471            "bead_id={TEST_BEAD_ID} case=symbol_size_inferred"
1472        );
1473
1474        // Zero-length symbol data → rejected.
1475        let mut receiver2 = ReplicationReceiver::new();
1476        let empty_packet = make_packet(ChangesetId::from_bytes([0xFF; 16]), 0, 0, 10, vec![]);
1477        assert!(
1478            receiver2.process_parsed_packet(&empty_packet).is_err(),
1479            "bead_id={TEST_BEAD_ID} case=zero_symbol_size_rejected"
1480        );
1481    }
1482
1483    #[test]
1484    fn test_receiver_k_source_mismatch_rejected() {
1485        let mut receiver = ReplicationReceiver::new();
1486        let id = ChangesetId::from_bytes([0x11; 16]);
1487
1488        let p1 = make_packet(id, 0, 0, 100, vec![0x42; 512]);
1489        receiver
1490            .process_parsed_packet(&p1)
1491            .expect("first packet ok");
1492
1493        // Same changeset_id, different K_source.
1494        let p2 = make_packet(id, 0, 1, 200, vec![0x42; 512]); // mismatch
1495        assert!(
1496            receiver.process_parsed_packet(&p2).is_err(),
1497            "bead_id={TEST_BEAD_ID} case=k_source_mismatch_rejected"
1498        );
1499    }
1500
1501    #[test]
1502    fn test_receiver_symbol_size_mismatch_rejected() {
1503        let mut receiver = ReplicationReceiver::new();
1504        let id = ChangesetId::from_bytes([0x22; 16]);
1505
1506        let p1 = make_packet(id, 0, 0, 100, vec![0x42; 512]);
1507        receiver
1508            .process_parsed_packet(&p1)
1509            .expect("first packet ok");
1510
1511        // Same changeset_id, different symbol_size.
1512        let p2 = make_packet(id, 0, 1, 100, vec![0x42; 1024]); // different size
1513        assert!(
1514            receiver.process_parsed_packet(&p2).is_err(),
1515            "bead_id={TEST_BEAD_ID} case=symbol_size_mismatch_rejected"
1516        );
1517    }
1518
1519    #[test]
1520    fn test_receiver_isi_deduplication() {
1521        let mut receiver = ReplicationReceiver::new();
1522        let id = ChangesetId::from_bytes([0x33; 16]);
1523
1524        let p1 = make_packet(id, 0, 0, 100, vec![0x42; 512]);
1525
1526        let r1 = receiver.process_parsed_packet(&p1).expect("first");
1527        assert_eq!(
1528            r1,
1529            PacketResult::Accepted,
1530            "bead_id={TEST_BEAD_ID} case=first_accepted"
1531        );
1532
1533        // Same ISI again → duplicate.
1534        let r2 = receiver.process_parsed_packet(&p1).expect("duplicate");
1535        assert_eq!(
1536            r2,
1537            PacketResult::Duplicate,
1538            "bead_id={TEST_BEAD_ID} case=isi_dedup"
1539        );
1540
1541        // Count should still be 1.
1542        let count = receiver.received_counts.get(&id).copied().unwrap_or(0);
1543        assert_eq!(
1544            count, 1,
1545            "bead_id={TEST_BEAD_ID} case=dedup_count_unchanged"
1546        );
1547    }
1548
1549    #[test]
1550    fn test_receiver_treats_payload_hash_mismatch_as_erasure() {
1551        let mut receiver = ReplicationReceiver::new();
1552        let mut packet = make_packet(
1553            ChangesetId::from_bytes([0x44; 16]),
1554            0,
1555            0,
1556            100,
1557            vec![0x42; 512],
1558        );
1559        packet.payload_xxh3 ^= 0xDEAD_BEEF;
1560        let wire = packet.to_bytes().expect("encode tampered packet");
1561        let result = receiver.process_packet(&wire).expect("process packet");
1562        assert_eq!(result, PacketResult::Erasure);
1563    }
1564
1565    #[test]
1566    fn test_receiver_treats_invalid_auth_tag_as_erasure() {
1567        let receiver_key = [0x11_u8; 32];
1568        let sender_key = [0x22_u8; 32];
1569        let mut receiver =
1570            ReplicationReceiver::with_config(ReceiverConfig::with_auth_key(receiver_key));
1571        let mut packet = make_packet(
1572            ChangesetId::from_bytes([0x45; 16]),
1573            0,
1574            0,
1575            100,
1576            vec![0x24; 512],
1577        );
1578        packet.attach_auth_tag(&sender_key);
1579        let wire = packet.to_bytes().expect("encode auth packet");
1580        let result = receiver.process_packet(&wire).expect("process packet");
1581        assert_eq!(result, PacketResult::Erasure);
1582    }
1583
    #[test]
    fn test_receiver_accepts_legacy_v1_packets() {
        let mut receiver = ReplicationReceiver::new();
        let id = ChangesetId::from_bytes([0x46; 16]);
        let symbol_data = vec![0x5A; 512];
        // Hand-build a legacy V1 packet (no auth tag, sbn/esi fixed at 0).
        let legacy = ReplicationPacket {
            wire_version: ReplicationWireVersion::LegacyV1,
            changeset_id: id,
            sbn: 0,
            esi: 0,
            k_source: 100,
            r_repair: 0,
            symbol_size_t: 512,
            seed: derive_seed_from_changeset_id(&id),
            payload_xxh3: ReplicationPacket::compute_payload_xxh3(&symbol_data),
            auth_tag: None,
            symbol_data,
        };
        // Round-trip through the wire format must preserve the V1 version tag.
        let wire = legacy.to_bytes().expect("encode legacy packet");
        let parsed = ReplicationPacket::from_bytes(&wire).expect("decode legacy packet");
        assert_eq!(parsed.wire_version, ReplicationWireVersion::LegacyV1);
        // The receiver accepts legacy packets alongside V2.
        let result = receiver
            .process_packet(&wire)
            .expect("process legacy packet");
        assert_eq!(result, PacketResult::Accepted);
    }
1610
1611    #[test]
1612    fn test_receiver_decode_at_k_source() {
1613        // Use the sender to generate proper packets, then feed to receiver.
1614        let page_size = 512_u32;
1615        let mut receiver = ReplicationReceiver::new();
1616        let packets = generate_sender_packets(page_size, &[1, 2, 3], 512);
1617
1618        let mut last_result = PacketResult::Accepted;
1619        for pkt in &packets {
1620            let result = receiver
1621                .process_packet(pkt)
1622                .expect("bead_id={TEST_BEAD_ID} case=decode_at_k unexpected error");
1623            last_result = result;
1624        }
1625
1626        assert_eq!(
1627            last_result,
1628            PacketResult::DecodeReady,
1629            "bead_id={TEST_BEAD_ID} case=decode_triggers_at_k_source"
1630        );
1631        assert_eq!(
1632            receiver.state(),
1633            ReceiverState::Applying,
1634            "bead_id={TEST_BEAD_ID} case=state_applying_after_decode"
1635        );
1636    }
1637
    #[test]
    fn test_receiver_decode_failure_emits_proof_when_enabled() {
        let mut receiver = receiver_with_decode_proofs();
        let changeset_id = ChangesetId::from_bytes([0x5A; 16]);

        // Two repair-only symbols at K=2: ready_to_decode => true, but decode fails.
        let p1 = make_packet(changeset_id, 0, 2, 2, vec![0xA1; 64]);
        let p2 = make_packet(changeset_id, 0, 3, 2, vec![0xA2; 64]);

        let r1 = receiver.process_parsed_packet(&p1).expect("first packet");
        assert_eq!(r1, PacketResult::Accepted);
        // Second symbol reaches K and triggers a decode attempt that fails.
        let r2 = receiver.process_parsed_packet(&p2).expect("second packet");
        assert_eq!(r2, PacketResult::NeedMore);

        // The failure policy is enabled, so exactly one audit proof is recorded.
        let audit = receiver.take_decode_audit_entries();
        assert_eq!(audit.len(), 1, "bead_id=bd-faz4 case=failure_proof_emitted");
        let proof = &audit[0].proof;
        assert!(
            !proof.decode_success,
            "bead_id=bd-faz4 case=failure_proof_decode_success_false"
        );
        assert_eq!(proof.changeset_id, Some(*changeset_id.as_bytes()));
        assert!(
            proof.is_consistent(),
            "bead_id=bd-faz4 case=failure_proof_consistent"
        );
    }
1665
1666    #[test]
1667    fn test_receiver_decode_success_with_repair_emits_proof_when_enabled() {
1668        let mut receiver = ReplicationReceiver::with_config(ReceiverConfig {
1669            auth_key: None,
1670            decode_proof_policy: DecodeProofEmissionPolicy {
1671                emit_on_decode_failure: false,
1672                emit_on_repair_success: true,
1673            },
1674            ..ReceiverConfig::default()
1675        });
1676        let page_size = 64_u32;
1677        let mut pages = make_pages(page_size, &[7]);
1678        let changeset_bytes = encode_changeset(page_size, &mut pages).expect("encode changeset");
1679        let changeset_id = compute_changeset_id(&changeset_bytes);
1680
1681        // Build K=2 source symbols from encoded bytes.
1682        let symbol_size = 64_usize;
1683        let mut s0 = vec![0_u8; symbol_size];
1684        let mut s1 = vec![0_u8; symbol_size];
1685        let split = changeset_bytes.len().min(symbol_size);
1686        s0[..split].copy_from_slice(&changeset_bytes[..split]);
1687        if changeset_bytes.len() > symbol_size {
1688            let rem = changeset_bytes.len() - symbol_size;
1689            s1[..rem].copy_from_slice(&changeset_bytes[symbol_size..]);
1690        }
1691
1692        // Interleave source+repair so K is reached with at least one repair symbol present.
1693        let p0 = make_packet(changeset_id, 0, 0, 2, s0);
1694        let p_repair = make_packet(changeset_id, 0, 2, 2, vec![0xCC; symbol_size]);
1695        let p1 = make_packet(changeset_id, 0, 1, 2, s1);
1696
1697        assert_eq!(
1698            receiver.process_parsed_packet(&p0).expect("p0"),
1699            PacketResult::Accepted
1700        );
1701        assert_eq!(
1702            receiver.process_parsed_packet(&p_repair).expect("repair"),
1703            PacketResult::NeedMore
1704        );
1705        assert_eq!(
1706            receiver.process_parsed_packet(&p1).expect("p1"),
1707            PacketResult::DecodeReady
1708        );
1709        assert_eq!(receiver.state(), ReceiverState::Applying);
1710
1711        let results = receiver.apply_pending().expect("apply");
1712        assert_eq!(results.len(), 1);
1713        let decode_proof = results[0]
1714            .decode_proof
1715            .as_ref()
1716            .expect("bead_id=bd-faz4 case=success_proof_attached_to_result");
1717        assert!(decode_proof.decode_success);
1718        assert!(decode_proof.is_repair());
1719        assert!(
1720            decode_proof.is_consistent(),
1721            "bead_id=bd-faz4 case=success_proof_consistent"
1722        );
1723
1724        let audit = receiver.take_decode_audit_entries();
1725        assert_eq!(audit.len(), 1, "bead_id=bd-faz4 case=success_proof_emitted");
1726    }
1727
1728    #[test]
1729    fn test_receiver_decode_success_truncation() {
1730        let page_size = 128_u32;
1731        let mut receiver = ReplicationReceiver::new();
1732        let packets = generate_sender_packets(page_size, &[1], 128);
1733
1734        for pkt in &packets {
1735            let _ = receiver.process_packet(pkt);
1736        }
1737
1738        // Apply and check that pages are correctly truncated.
1739        if receiver.state() == ReceiverState::Applying {
1740            let results = receiver.apply_pending().expect("apply");
1741            assert!(
1742                !results.is_empty(),
1743                "bead_id={TEST_BEAD_ID} case=has_results"
1744            );
1745            for result in &results {
1746                for page in &result.pages {
1747                    assert_eq!(
1748                        page.page_data.len(),
1749                        page_size as usize,
1750                        "bead_id={TEST_BEAD_ID} case=page_data_correct_size"
1751                    );
1752                }
1753            }
1754        }
1755    }
1756
1757    #[test]
1758    fn test_receiver_page_xxh3_validation() {
1759        let page_size = 256_u32;
1760        let mut pages = make_pages(page_size, &[1]);
1761        let changeset_bytes = encode_changeset(page_size, &mut pages).expect("encode");
1762
1763        // Tamper with a page byte in the changeset (after header + page_number + xxh3).
1764        let mut tampered = changeset_bytes.clone();
1765        let tamper_offset = CHANGESET_HEADER_SIZE + 4 + 8 + 10; // into page data
1766        tampered[tamper_offset] ^= 0xFF;
1767
1768        // Now create a "decoded" changeset and try to parse it.
1769        let receiver = ReplicationReceiver::new();
1770        let changeset_id = compute_changeset_id(&changeset_bytes);
1771        let result = receiver.parse_and_validate_changeset(changeset_id, &tampered);
1772        assert!(
1773            result.is_err(),
1774            "bead_id={TEST_BEAD_ID} case=xxh3_validation_catches_corruption"
1775        );
1776    }
1777
1778    #[test]
1779    fn test_parse_and_validate_rejects_total_len_smaller_than_header() {
1780        let receiver = ReplicationReceiver::new();
1781        let changeset_id = ChangesetId::from_bytes([0xA5; 16]);
1782
1783        let mut malformed = vec![0_u8; CHANGESET_HEADER_SIZE];
1784        malformed[0..4].copy_from_slice(b"FSRP");
1785        malformed[4..6].copy_from_slice(&1_u16.to_le_bytes());
1786        malformed[6..10].copy_from_slice(&4096_u32.to_le_bytes());
1787        malformed[10..14].copy_from_slice(&1_u32.to_le_bytes());
1788        malformed[14..22].copy_from_slice(&1_u64.to_le_bytes());
1789
1790        let result = receiver.parse_and_validate_changeset(changeset_id, &malformed);
1791        assert!(matches!(result, Err(FrankenError::DatabaseCorrupt { .. })));
1792    }
1793
1794    #[test]
1795    fn test_parse_changeset_pages_rejects_truncated_payload() {
1796        let total_len = CHANGESET_HEADER_SIZE + 8;
1797        let mut malformed = vec![0_u8; total_len];
1798        malformed[0..4].copy_from_slice(b"FSRP");
1799        malformed[4..6].copy_from_slice(&1_u16.to_le_bytes());
1800        malformed[6..10].copy_from_slice(&4096_u32.to_le_bytes());
1801        malformed[10..14].copy_from_slice(&1_u32.to_le_bytes());
1802        malformed[14..22].copy_from_slice(
1803            &u64::try_from(total_len)
1804                .expect("test total_len fits into u64")
1805                .to_le_bytes(),
1806        );
1807
1808        let result = parse_changeset_pages(&malformed);
1809        assert!(matches!(result, Err(FrankenError::DatabaseCorrupt { .. })));
1810    }
1811
1812    #[test]
1813    fn test_receiver_pages_applied_in_order() {
1814        let page_size = 256_u32;
1815        let mut receiver = ReplicationReceiver::new();
1816        let packets = generate_sender_packets(page_size, &[5, 1, 3, 2, 4], 256);
1817
1818        for pkt in &packets {
1819            let _ = receiver.process_packet(pkt);
1820        }
1821
1822        if receiver.state() == ReceiverState::Applying {
1823            let results = receiver.apply_pending().expect("apply");
1824            let pages = &results[0].pages;
1825            for w in pages.windows(2) {
1826                assert!(
1827                    w[0].page_number <= w[1].page_number,
1828                    "bead_id={TEST_BEAD_ID} case=pages_sorted pn0={} pn1={}",
1829                    w[0].page_number,
1830                    w[1].page_number
1831                );
1832            }
1833        }
1834    }
1835
1836    // -----------------------------------------------------------------------
1837    // Property tests
1838    // -----------------------------------------------------------------------
1839
1840    #[test]
1841    fn prop_any_k_symbols_decode() {
1842        // With only source symbols and k_source = actual source count,
1843        // providing all k source symbols always decodes.
1844        for n_pages in [1_u32, 3, 5, 10] {
1845            let page_size = 256_u32;
1846            let mut receiver = ReplicationReceiver::new();
1847            let packets =
1848                generate_sender_packets(page_size, &(1..=n_pages).collect::<Vec<_>>(), 256);
1849
1850            let mut decode_ready = false;
1851            for pkt in &packets {
1852                if matches!(receiver.process_packet(pkt), Ok(PacketResult::DecodeReady)) {
1853                    decode_ready = true;
1854                    break;
1855                }
1856            }
1857            assert!(
1858                decode_ready,
1859                "bead_id={TEST_BEAD_ID} case=prop_any_k_decode n_pages={n_pages}"
1860            );
1861        }
1862    }
1863
1864    #[test]
1865    fn prop_dedup_idempotent() {
1866        // Use a large K_source so we can feed duplicates before decode triggers.
1867        let mut receiver = ReplicationReceiver::new();
1868        let id = ChangesetId::from_bytes([0x77; 16]);
1869
1870        // Feed the same ISI multiple times within a single decoder session.
1871        let p1 = make_packet(id, 0, 0, 100, vec![0x42; 512]); // large enough that one symbol won't trigger decode
1872
1873        let r1 = receiver.process_parsed_packet(&p1).expect("first");
1874        assert_eq!(
1875            r1,
1876            PacketResult::Accepted,
1877            "bead_id={TEST_BEAD_ID} case=dedup_first_accepted"
1878        );
1879
1880        for _ in 0..5 {
1881            let r = receiver.process_parsed_packet(&p1).expect("duplicate");
1882            assert_eq!(
1883                r,
1884                PacketResult::Duplicate,
1885                "bead_id={TEST_BEAD_ID} case=dedup_subsequent_always_duplicate"
1886            );
1887        }
1888
1889        // Count should still be 1.
1890        let count = receiver.received_counts.get(&id).copied().unwrap_or(0);
1891        assert_eq!(count, 1, "bead_id={TEST_BEAD_ID} case=dedup_count_stable");
1892    }
1893
1894    // -----------------------------------------------------------------------
1895    // E2E tests
1896    // -----------------------------------------------------------------------
1897
1898    #[test]
1899    fn test_packet_reject_over_message_cap() {
1900        let mut receiver = ReplicationReceiver::new();
1901        let oversized = vec![0_u8; DEFAULT_RPC_MESSAGE_CAP_BYTES + 1];
1902        let err = receiver.process_packet(&oversized).unwrap_err();
1903        assert!(matches!(err, FrankenError::TooBig));
1904    }
1905
1906    #[test]
1907    fn test_e2e_sender_receiver_roundtrip() {
1908        // Sender encodes pages. Receiver collects and decodes. Byte-identical.
1909        let page_size = 512_u32;
1910        let page_numbers: Vec<u32> = (1..=20).collect();
1911        let original_pages = make_pages(page_size, &page_numbers);
1912
1913        let mut receiver = ReplicationReceiver::new();
1914        let packets = generate_sender_packets(page_size, &page_numbers, 512);
1915
1916        for pkt in &packets {
1917            let _ = receiver.process_packet(pkt);
1918        }
1919
1920        assert_eq!(
1921            receiver.state(),
1922            ReceiverState::Applying,
1923            "bead_id={TEST_BEAD_ID} case=e2e_roundtrip_applying"
1924        );
1925
1926        let results = receiver.apply_pending().expect("apply");
1927        assert_eq!(
1928            results.len(),
1929            1,
1930            "bead_id={TEST_BEAD_ID} case=e2e_one_changeset"
1931        );
1932
1933        let decoded_pages = &results[0].pages;
1934        assert_eq!(
1935            decoded_pages.len(),
1936            original_pages.len(),
1937            "bead_id={TEST_BEAD_ID} case=e2e_page_count"
1938        );
1939
1940        for (decoded, original) in decoded_pages.iter().zip(original_pages.iter()) {
1941            assert_eq!(
1942                decoded.page_number, original.page_number,
1943                "bead_id={TEST_BEAD_ID} case=e2e_page_number_match"
1944            );
1945            assert_eq!(
1946                decoded.page_data, original.page_bytes,
1947                "bead_id={TEST_BEAD_ID} case=e2e_page_data_identical pn={}",
1948                original.page_number
1949            );
1950        }
1951
1952        // Complete the cycle.
1953        receiver.reset_to_listening().expect("reset");
1954        assert_eq!(
1955            receiver.state(),
1956            ReceiverState::Listening,
1957            "bead_id={TEST_BEAD_ID} case=e2e_back_to_listening"
1958        );
1959    }
1960
1961    #[test]
1962    fn test_e2e_concurrent_changesets() {
1963        // Two changesets streaming simultaneously.
1964        let mut receiver = ReplicationReceiver::new();
1965
1966        let packets_a = generate_sender_packets(256, &[1, 2, 3], 256);
1967        let packets_b = generate_sender_packets(256, &[10, 20, 30], 256);
1968
1969        // Interleave packets from two different changesets.
1970        let mut all_packets = Vec::new();
1971        let max_len = packets_a.len().max(packets_b.len());
1972        for i in 0..max_len {
1973            if i < packets_a.len() {
1974                all_packets.push(packets_a[i].clone());
1975            }
1976            if i < packets_b.len() {
1977                all_packets.push(packets_b[i].clone());
1978            }
1979        }
1980
1981        let mut decode_count = 0_u32;
1982        for pkt in &all_packets {
1983            if matches!(receiver.process_packet(pkt), Ok(PacketResult::DecodeReady)) {
1984                decode_count += 1;
1985                // Apply immediately and reset if needed.
1986                if receiver.state() == ReceiverState::Applying {
1987                    let _ = receiver.apply_pending();
1988                    // If more decoders remain, go back to collecting.
1989                    if !receiver.decoders.is_empty() {
1990                        receiver.state = ReceiverState::Collecting;
1991                    }
1992                }
1993            }
1994        }
1995
1996        assert!(
1997            decode_count >= 1,
1998            "bead_id={TEST_BEAD_ID} case=e2e_concurrent_at_least_one_decoded count={decode_count}"
1999        );
2000    }
2001
2002    #[test]
2003    fn test_e2e_bd_1hi_14_compliance() {
2004        // Full end-to-end compliance test.
2005        let page_size = 1024_u32;
2006        let page_numbers: Vec<u32> = (1..=10).collect();
2007        let original_pages = make_pages(page_size, &page_numbers);
2008
2009        // Encode via sender.
2010        let mut sender = ReplicationSender::new();
2011        let mut pages = make_pages(page_size, &page_numbers);
2012        sender
2013            .prepare(page_size, &mut pages, SenderConfig::default())
2014            .expect("prepare");
2015        sender.start_streaming().expect("start");
2016
2017        // Collect all packets.
2018        let mut wire_packets = Vec::new();
2019        while let Some(packet) = sender.next_packet().expect("next") {
2020            wire_packets.push(packet.to_bytes().expect("encode"));
2021        }
2022
2023        // Feed to receiver.
2024        let mut receiver = ReplicationReceiver::new();
2025        assert_eq!(receiver.state(), ReceiverState::Listening);
2026
2027        let mut last_result = PacketResult::Accepted;
2028        for pkt in &wire_packets {
2029            let result = receiver
2030                .process_packet(pkt)
2031                .expect("bead_id={TEST_BEAD_ID} case=e2e_compliance unexpected error");
2032            last_result = result;
2033            if result == PacketResult::DecodeReady {
2034                break;
2035            }
2036        }
2037
2038        // Verify decode happened.
2039        assert_eq!(
2040            last_result,
2041            PacketResult::DecodeReady,
2042            "bead_id={TEST_BEAD_ID} case=e2e_compliance_decoded"
2043        );
2044        assert_eq!(receiver.state(), ReceiverState::Applying);
2045
2046        // Apply.
2047        let results = receiver.apply_pending().expect("apply");
2048        assert_eq!(receiver.state(), ReceiverState::Complete);
2049        assert_eq!(results.len(), 1);
2050
2051        // Verify byte-identical pages.
2052        let decoded = &results[0].pages;
2053        assert_eq!(decoded.len(), original_pages.len());
2054        for (d, o) in decoded.iter().zip(original_pages.iter()) {
2055            assert_eq!(d.page_number, o.page_number);
2056            assert_eq!(d.page_data, o.page_bytes);
2057        }
2058
2059        // Reset and verify.
2060        receiver.reset_to_listening().expect("reset");
2061        assert_eq!(
2062            receiver.state(),
2063            ReceiverState::Listening,
2064            "bead_id={TEST_BEAD_ID} case=e2e_compliance_reset"
2065        );
2066        assert_eq!(receiver.applied_count(), 1);
2067    }
2068
    #[test]
    fn test_simnetwork_loss_profiles_converge_with_repair_symbols() {
        // For each loss profile there must exist a deterministic seed where the
        // delivered packets (including at least one repair symbol) decode back
        // to the original pages.
        let page_size = 128_u32;
        let page_numbers = [1_u32, 2];
        let original_pages = make_pages(page_size, &page_numbers);
        let packets = generate_sender_packets_with_multiplier(page_size, &page_numbers, 128, 2);
        // Send every wire packet twice so enough symbols survive a 30% loss rate.
        let loss_packets: Vec<Vec<u8>> = packets
            .iter()
            .flat_map(|packet| [packet.clone(), packet.clone()])
            .collect();

        // (loss_rate, require_observed_drop): the heavier profile only counts
        // runs where at least one packet was actually dropped.
        for (loss_rate, require_observed_drop) in [(0.05_f64, false), (0.30_f64, true)] {
            let mut found_seed = None;
            for seed in 1_u64..=20_000 {
                let mut config = SimTransportConfig::deterministic(seed);
                config.loss_rate = loss_rate;
                config.preserve_order = true;

                let delivery = transmit_packets_simnetwork(config, &loss_packets);
                let observed_drop = delivery.delivered.len() < delivery.sent_count;
                if require_observed_drop && !observed_drop {
                    continue;
                }
                // Skip runs where no repair (non-source) symbol made it through;
                // the test specifically exercises repair-assisted convergence.
                let saw_repair_symbol = delivery.delivered.iter().any(|(_, wire)| {
                    ReplicationPacket::from_bytes(wire)
                        .is_ok_and(|packet| !packet.is_source_symbol())
                });
                if !saw_repair_symbol {
                    continue;
                }

                let (decoded, _erasures, _parse_errors) =
                    decode_from_wire_packets(&delivery.delivered);
                if decoded
                    .as_ref()
                    .is_some_and(|pages| decoded_matches_original(pages, &original_pages))
                {
                    found_seed = Some(seed);
                    break;
                }
            }

            assert!(
                found_seed.is_some(),
                "bead_id=bd-xgoe case=loss_profile_convergence loss_rate={loss_rate} require_drop={require_observed_drop} did not find deterministic convergent seed"
            );
        }
    }
2117
2118    #[test]
2119    fn test_simnetwork_reorder_and_dup_converge() {
2120        let page_size = 128_u32;
2121        let page_numbers = [7_u32, 11];
2122        let original_pages = make_pages(page_size, &page_numbers);
2123        let packets = generate_sender_packets_with_multiplier(page_size, &page_numbers, 128, 2);
2124
2125        let mut found_seed = None;
2126        for seed in 1_u64..=2_000 {
2127            let mut config = SimTransportConfig::deterministic(seed);
2128            config.preserve_order = false;
2129            config.duplication_rate = 0.35;
2130
2131            let delivery = transmit_packets_simnetwork(config, &packets);
2132            if !has_duplicate_esies(&delivery) || !has_reordered_esies(&delivery) {
2133                continue;
2134            }
2135
2136            let (decoded, _erasures, _parse_errors) = decode_from_wire_packets(&delivery.delivered);
2137            if decoded
2138                .as_ref()
2139                .is_some_and(|pages| decoded_matches_original(pages, &original_pages))
2140            {
2141                found_seed = Some(seed);
2142                break;
2143            }
2144        }
2145
2146        assert!(
2147            found_seed.is_some(),
2148            "bead_id=bd-xgoe case=reorder_dup_convergence no deterministic seed achieved reorder+dup convergence"
2149        );
2150    }
2151
    #[test]
    fn test_simnetwork_corruption_is_rejected_and_recovered() {
        // There must exist a deterministic seed where corrupted wire bytes are
        // detected (as erasures or parse errors) and decode still converges to
        // the original pages from the surviving symbols.
        let page_size = 128_u32;
        let page_numbers = [21_u32, 34];
        let original_pages = make_pages(page_size, &page_numbers);
        let packets = generate_sender_packets_with_multiplier(page_size, &page_numbers, 128, 2);

        let mut found_seed = None;
        for seed in 1_u64..=20_000 {
            let mut config = SimTransportConfig::deterministic(seed);
            config.corruption_rate = 0.20;
            config.preserve_order = false;

            // Only count runs where corruption actually altered wire bytes.
            let delivery = transmit_packets_simnetwork(config, &packets);
            if !has_corrupted_wire_bytes(&delivery, &packets) {
                continue;
            }

            // The corruption must have been *noticed* — at least one erasure
            // or parse error — not silently accepted.
            let (decoded, erasures, parse_errors) = decode_from_wire_packets(&delivery.delivered);
            if erasures + parse_errors == 0 {
                continue;
            }
            if decoded
                .as_ref()
                .is_some_and(|pages| decoded_matches_original(pages, &original_pages))
            {
                found_seed = Some(seed);
                break;
            }
        }

        assert!(
            found_seed.is_some(),
            "bead_id=bd-xgoe case=corruption_recovery no deterministic seed achieved corruption rejection + convergence"
        );
    }
2188
    #[test]
    fn test_simnetwork_stop_early_reduces_traffic() {
        // Stopping transmission as soon as the receiver reports DecodeReady
        // must use strictly fewer sends than transmitting the full packet set.
        let page_size = 256_u32;
        let page_numbers = [1_u32, 2, 3];
        let packets = generate_sender_packets_with_multiplier(page_size, &page_numbers, 256, 2);

        // Baseline: send everything over a reliable transport.
        let full_delivery = transmit_packets_simnetwork(SimTransportConfig::reliable(), &packets);
        let full_sent = full_delivery.sent_count;

        // Stop-early run: drive a sink/stream pair over a 2-node sim network
        // on a single-threaded async runtime.
        let network = SimNetwork::fully_connected(2, SimTransportConfig::reliable());
        let (mut sink, mut stream) = network.transport(0, 1);
        let runtime = RuntimeBuilder::current_thread()
            .build()
            .expect("runtime build");

        let mut receiver = ReplicationReceiver::new();
        let mut stop_early_sent = 0_usize;
        let mut decoded = false;

        runtime.block_on(async {
            for (index, bytes) in packets.iter().enumerate() {
                let esi = u32::try_from(index).expect("test packet index fits u32");
                sink.send(packet_symbol(esi, bytes.clone()))
                    .await
                    .expect("send simulated symbol");
                stop_early_sent += 1;

                // Reliable transport: each send is delivered before the next,
                // so we can await the delivery and feed it in lock-step.
                let delivered = stream
                    .next()
                    .await
                    .expect("delivered packet")
                    .expect("stream item");
                let wire = delivered.symbol().data().to_vec();
                // Stop sending the moment decode becomes possible.
                if matches!(
                    receiver.process_packet(&wire).expect("receiver process"),
                    PacketResult::DecodeReady
                ) {
                    decoded = true;
                    break;
                }
            }
            sink.close().await.expect("close simulated sink");
        });

        assert!(
            decoded,
            "bead_id=bd-xgoe case=stop_early_decode_not_reached"
        );
        assert!(
            stop_early_sent < full_sent,
            "bead_id=bd-xgoe case=stop_early_not_reduced stop_early_sent={stop_early_sent} full_sent={full_sent}"
        );
    }
2242
2243    // -----------------------------------------------------------------------
2244    // Compliance gate tests
2245    // -----------------------------------------------------------------------
2246
2247    #[test]
2248    fn test_bd_1hi_14_unit_compliance_gate() {
2249        // Verify all required types and functions exist.
2250        let _ = ReceiverState::Listening;
2251        let _ = ReceiverState::Collecting;
2252        let _ = ReceiverState::Decoding;
2253        let _ = ReceiverState::Applying;
2254        let _ = ReceiverState::Complete;
2255
2256        let _ = PacketResult::Accepted;
2257        let _ = PacketResult::Erasure;
2258        let _ = PacketResult::Duplicate;
2259        let _ = PacketResult::DecodeReady;
2260        let _ = PacketResult::NeedMore;
2261
2262        let receiver = ReplicationReceiver::new();
2263        assert_eq!(receiver.state(), ReceiverState::Listening);
2264        assert_eq!(receiver.applied_count(), 0);
2265        assert_eq!(receiver.active_decoders(), 0);
2266
2267        // Verify REPLICATION_HEADER_SIZE is correct.
2268        assert_eq!(REPLICATION_HEADER_SIZE, 72);
2269    }
2270
2271    #[test]
2272    fn prop_bd_1hi_14_structure_compliance() {
2273        // Full state machine cycle.
2274        let page_size = 256_u32;
2275        let mut receiver = ReplicationReceiver::new();
2276        assert_eq!(receiver.state(), ReceiverState::Listening);
2277
2278        let packets = generate_sender_packets(page_size, &[1, 2], 256);
2279        for pkt in &packets {
2280            let _ = receiver.process_packet(pkt);
2281        }
2282
2283        // Should have transitioned through the state machine.
2284        assert!(
2285            receiver.state() == ReceiverState::Applying
2286                || receiver.state() == ReceiverState::Collecting,
2287            "bead_id={TEST_BEAD_ID} case=prop_state_machine state={:?}",
2288            receiver.state()
2289        );
2290
2291        if receiver.state() == ReceiverState::Applying {
2292            let results = receiver.apply_pending().expect("apply");
2293            assert!(!results.is_empty());
2294            assert_eq!(receiver.state(), ReceiverState::Complete);
2295            receiver.reset_to_listening().expect("reset");
2296            assert_eq!(receiver.state(), ReceiverState::Listening);
2297        }
2298    }
2299}