Skip to main content

fsqlite_core/
replication_sender.rs

1//! §3.4.2 Fountain-Coded Replication Sender (bd-1hi.13).
2//!
3//! Implements the sender-side state machine for fountain-coded database
4//! replication using RaptorQ encoding over UDP.
5//!
6//! State machine: IDLE → ENCODING → STREAMING → COMPLETE
7//!
8//! Changeset encoding is deterministic, self-delimiting, and uses
9//! domain-separated BLAKE3 for changeset identity.
10
11use std::fmt;
12
13use fsqlite_error::{FrankenError, Result};
14use tracing::{debug, error, info, warn};
15
16use crate::source_block_partition::K_MAX;
17
const BEAD_ID: &str = "bd-1hi.13";

// ---------------------------------------------------------------------------
// Constants
// ---------------------------------------------------------------------------

/// Changeset header magic bytes.
///
/// NOTE(review): these are the same four bytes as
/// `REPLICATION_PROTOCOL_MAGIC`; the two formats are distinguished by their
/// version fields rather than the magic — confirm the shared magic is
/// deliberate.
pub const CHANGESET_MAGIC: [u8; 4] = *b"FSRP";

/// Changeset format version.
pub const CHANGESET_VERSION: u16 = 1;

/// BLAKE3 domain separation context for changeset identity.
pub const CHANGESET_DOMAIN: &str = "fsqlite:replication:changeset:v1";

/// Replication packet header size (bytes) for the current fixed-size format.
pub const REPLICATION_HEADER_SIZE: usize = 72;

/// Legacy replication header size from bd-1hi.13 (bytes).
pub const REPLICATION_HEADER_SIZE_LEGACY: usize = 24;

/// Protocol magic for the fixed-size replication packet header.
pub const REPLICATION_PROTOCOL_MAGIC: [u8; 4] = *b"FSRP";

/// Current fixed-header packet protocol version.
pub const REPLICATION_PROTOCOL_VERSION_V2: u8 = 2;

/// Fixed-size V2 replication header length encoded on wire.
pub const REPLICATION_HEADER_SIZE_V2: usize = REPLICATION_HEADER_SIZE;
/// Fixed-size V2 replication header length encoded on wire (`u16` form).
/// Must stay numerically equal to `REPLICATION_HEADER_SIZE_V2`.
pub const REPLICATION_HEADER_SIZE_V2_U16: u16 = 72;

/// Header flag: packet carries an authentication tag.
pub const REPLICATION_FLAG_AUTH_PRESENT: u8 = 0b0000_0001;

/// Domain separator for packet authentication tags.
pub const REPLICATION_PACKET_AUTH_DOMAIN: &str = "fsqlite:replication:packet-auth:v1";

/// Maximum UDP application payload (IPv4): 65_535 - 20 (IPv4) - 8 (UDP).
pub const MAX_UDP_PAYLOAD: usize = 65_507;

/// Maximum symbol size for replication: `MAX_UDP_PAYLOAD - REPLICATION_HEADER_SIZE`.
pub const MAX_REPLICATION_SYMBOL_SIZE: usize = MAX_UDP_PAYLOAD - REPLICATION_HEADER_SIZE;

/// Recommended MTU-safe symbol size for Ethernet.
/// 1500 MTU - 20 IPv4 - 8 UDP - 72 replication header = 1400.
pub const MTU_SAFE_SYMBOL_SIZE: u16 = 1400;

/// Default maximum ISI multiplier for streaming stop.
pub const DEFAULT_MAX_ISI_MULTIPLIER: u32 = 2;

/// Default hard cap for a single remote message (4 MiB, §4.19.6).
pub const DEFAULT_RPC_MESSAGE_CAP_BYTES: usize = 4 * 1024 * 1024;

/// HTTP/2 default: max concurrent streams.
pub const DEFAULT_HTTP2_MAX_CONCURRENT_STREAMS: u32 = 256;

/// HTTP/2 default: maximum compressed header list size (64 KiB).
pub const DEFAULT_HTTP2_MAX_HEADER_LIST_SIZE: usize = 65_536;

/// HTTP/2 default: CONTINUATION timeout in milliseconds (5s).
pub const DEFAULT_HTTP2_CONTINUATION_TIMEOUT_MS: u64 = 5_000;

/// HTTP/2 default: absolute header fragment cap (256 KiB).
pub const DEFAULT_HTTP2_HEADER_FRAGMENT_CAP: usize = 262_144;

/// Default handshake timeout in milliseconds.
pub const DEFAULT_HANDSHAKE_TIMEOUT_MS: u64 = 500;

/// Changeset header size in bytes.
pub const CHANGESET_HEADER_SIZE: usize = 4 + 2 + 4 + 4 + 8; // magic + version + page_size + n_pages + total_len = 22
89
90// ---------------------------------------------------------------------------
91// §4.19.6 Network Policy + Deterministic VirtualTcp
92// ---------------------------------------------------------------------------
93
/// How remote connections are secured on the wire.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum TransportSecurityMode {
    /// Encrypted transport backed by rustls TLS (the default posture).
    RustlsTls,
    /// Unencrypted transport; permitted only with an explicit local-dev opt-in.
    Plaintext,
}
102
/// Hard ceilings enforced on every HTTP/2 connection.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct Http2HardLimits {
    /// Upper bound on simultaneously open streams per connection.
    pub max_concurrent_streams: u32,
    /// Maximum accepted header-list size, in bytes.
    pub max_header_list_size: usize,
    /// Deadline for CONTINUATION sequences, in milliseconds.
    pub continuation_timeout_ms: u64,
    /// Absolute cap on accumulated header fragment bytes.
    pub header_fragment_cap: usize,
}
111
112impl Default for Http2HardLimits {
113    fn default() -> Self {
114        Self {
115            max_concurrent_streams: DEFAULT_HTTP2_MAX_CONCURRENT_STREAMS,
116            max_header_list_size: DEFAULT_HTTP2_MAX_HEADER_LIST_SIZE,
117            continuation_timeout_ms: DEFAULT_HTTP2_CONTINUATION_TIMEOUT_MS,
118            header_fragment_cap: DEFAULT_HTTP2_HEADER_FRAGMENT_CAP,
119        }
120    }
121}
122
123/// Networking stack policy for remote effects and replication transport.
124#[derive(Debug, Clone, Copy, PartialEq, Eq)]
125pub struct NetworkStackConfig {
126    pub security: TransportSecurityMode,
127    pub explicit_plaintext_opt_in: bool,
128    pub handshake_timeout_ms: u64,
129    pub message_size_cap_bytes: usize,
130    pub http2: Http2HardLimits,
131}
132
133impl Default for NetworkStackConfig {
134    fn default() -> Self {
135        Self {
136            security: TransportSecurityMode::RustlsTls,
137            explicit_plaintext_opt_in: false,
138            handshake_timeout_ms: DEFAULT_HANDSHAKE_TIMEOUT_MS,
139            message_size_cap_bytes: DEFAULT_RPC_MESSAGE_CAP_BYTES,
140            http2: Http2HardLimits::default(),
141        }
142    }
143}
144
145impl NetworkStackConfig {
146    /// Build plaintext config for explicit local development.
147    ///
148    /// # Errors
149    ///
150    /// Returns `FrankenError::Unsupported` when plaintext is requested
151    /// without explicit opt-in.
152    pub fn plaintext_local_dev(explicit_opt_in: bool) -> Result<Self> {
153        if !explicit_opt_in {
154            return Err(FrankenError::Unsupported);
155        }
156        Ok(Self {
157            security: TransportSecurityMode::Plaintext,
158            explicit_plaintext_opt_in: true,
159            ..Self::default()
160        })
161    }
162
163    /// Validate the transport security policy.
164    ///
165    /// # Errors
166    ///
167    /// Returns `FrankenError::Unsupported` if plaintext is not explicitly opted in.
168    pub fn validate_security(&self) -> Result<()> {
169        if self.security == TransportSecurityMode::Plaintext && !self.explicit_plaintext_opt_in {
170            return Err(FrankenError::Unsupported);
171        }
172        Ok(())
173    }
174
175    /// Validate stream concurrency against HTTP/2 hard limits.
176    ///
177    /// # Errors
178    ///
179    /// Returns `FrankenError::Busy` when `streams` exceeds the configured maximum.
180    pub fn validate_concurrent_streams(&self, streams: u32) -> Result<()> {
181        if streams > self.http2.max_concurrent_streams {
182            return Err(FrankenError::Busy);
183        }
184        Ok(())
185    }
186
187    /// Validate HTTP header-list size.
188    ///
189    /// # Errors
190    ///
191    /// Returns `FrankenError::TooBig` if header bytes exceed configured limit.
192    pub fn validate_header_list_size(&self, header_bytes: usize) -> Result<()> {
193        if header_bytes > self.http2.max_header_list_size {
194            return Err(FrankenError::TooBig);
195        }
196        Ok(())
197    }
198
199    /// Validate elapsed time for HTTP/2 continuation.
200    ///
201    /// # Errors
202    ///
203    /// Returns `FrankenError::BusyRecovery` when continuation elapsed time
204    /// exceeds the configured timeout.
205    pub fn validate_continuation_elapsed(&self, elapsed_ms: u64) -> Result<()> {
206        if elapsed_ms > self.http2.continuation_timeout_ms {
207            return Err(FrankenError::BusyRecovery);
208        }
209        Ok(())
210    }
211
212    /// Validate elapsed handshake time against timeout budget.
213    ///
214    /// # Errors
215    ///
216    /// Returns `FrankenError::BusyRecovery` when elapsed time exceeds budget.
217    pub fn validate_handshake_elapsed(&self, elapsed_ms: u64) -> Result<()> {
218        if elapsed_ms > self.handshake_timeout_ms {
219            return Err(FrankenError::BusyRecovery);
220        }
221        Ok(())
222    }
223
224    /// Validate message size against the hard cap.
225    ///
226    /// # Errors
227    ///
228    /// Returns `FrankenError::TooBig` when `message_bytes` exceeds the cap.
229    pub fn validate_message_size(&self, message_bytes: usize) -> Result<()> {
230        if message_bytes > self.message_size_cap_bytes {
231            return Err(FrankenError::TooBig);
232        }
233        Ok(())
234    }
235}
236
/// Fault profile for deterministic in-memory VirtualTcp transport.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct VirtualTcpFaultProfile {
    /// Probability of dropping a payload, in parts-per-million.
    pub drop_per_million: u32,
    /// Probability of buffering a payload for reordering, in parts-per-million.
    pub reorder_per_million: u32,
    /// Probability of flipping one payload bit, in parts-per-million.
    pub corrupt_per_million: u32,
}
244
245impl VirtualTcpFaultProfile {
246    /// Validate rates in parts-per-million (`0..=1_000_000`).
247    ///
248    /// # Errors
249    ///
250    /// Returns `FrankenError::OutOfRange` when any rate is above 1_000_000.
251    pub fn validate(&self) -> Result<()> {
252        const PPM_MAX: u32 = 1_000_000;
253        if self.drop_per_million > PPM_MAX {
254            return Err(FrankenError::OutOfRange {
255                what: "drop_per_million".to_owned(),
256                value: self.drop_per_million.to_string(),
257            });
258        }
259        if self.reorder_per_million > PPM_MAX {
260            return Err(FrankenError::OutOfRange {
261                what: "reorder_per_million".to_owned(),
262                value: self.reorder_per_million.to_string(),
263            });
264        }
265        if self.corrupt_per_million > PPM_MAX {
266            return Err(FrankenError::OutOfRange {
267                what: "corrupt_per_million".to_owned(),
268                value: self.corrupt_per_million.to_string(),
269            });
270        }
271        Ok(())
272    }
273}
274
/// Trace event kind for deterministic VirtualTcp replay.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum VirtualTcpTraceKind {
    /// Payload was discarded by fault injection.
    Dropped,
    /// Payload was held back to be released out of order later.
    BufferedForReorder,
    /// Payload was delivered unmodified.
    Delivered,
    /// Payload was delivered with an injected bit flip.
    DeliveredCorrupt,
    /// A previously buffered payload was released.
    FlushedReordered,
}
284
285/// Deterministic trace event emitted by VirtualTcp.
286#[derive(Debug, Clone, PartialEq, Eq)]
287pub struct VirtualTcpTraceEvent {
288    pub seq: u64,
289    pub kind: VirtualTcpTraceKind,
290    pub payload_hash: u64,
291}
292
293/// Deterministic in-memory network shim for lab/DPOR.
294#[derive(Debug, Clone)]
295pub struct VirtualTcp {
296    state: u64,
297    seq: u64,
298    faults: VirtualTcpFaultProfile,
299    pending_reorder: Option<Vec<u8>>,
300    trace: Vec<VirtualTcpTraceEvent>,
301}
302
impl VirtualTcp {
    /// Construct a new deterministic VirtualTcp instance.
    ///
    /// `seed` initializes the internal LCG state; two instances built with
    /// the same seed and fault profile produce identical traces for the same
    /// input sequence.
    ///
    /// # Errors
    ///
    /// Returns `FrankenError::OutOfRange` when fault probabilities are invalid.
    pub fn new(seed: u64, faults: VirtualTcpFaultProfile) -> Result<Self> {
        faults.validate()?;
        Ok(Self {
            state: seed,
            seq: 0,
            faults,
            pending_reorder: None,
            trace: Vec::new(),
        })
    }

    /// Return deterministic trace events for replay/debugging.
    #[must_use]
    pub fn trace(&self) -> &[VirtualTcpTraceEvent] {
        &self.trace
    }

    /// Transmit one payload through deterministic drop/reorder/corrupt rules.
    ///
    /// Fault coins are drawn in a fixed order (drop, then corrupt, then
    /// reorder), so the RNG stream — and therefore the trace — is fully
    /// reproducible. Note that `coin_flip` consumes no RNG value for rates
    /// of 0 or >= 1_000_000, which is part of the replay contract.
    ///
    /// Returns zero, one, or two delivered payloads (reorder flush path).
    #[must_use]
    pub fn transmit(&mut self, payload: &[u8]) -> Vec<Vec<u8>> {
        // Every transmit attempt consumes a sequence number, even if dropped.
        self.seq = self.seq.saturating_add(1);

        if self.coin_flip(self.faults.drop_per_million) {
            self.push_trace(VirtualTcpTraceKind::Dropped, payload);
            return Vec::new();
        }

        // Corruption flips the lowest bit at a deterministic offset; empty
        // payloads draw no corruption coin and are never corrupted.
        let mut wire = payload.to_vec();
        let corrupted = if !wire.is_empty() && self.coin_flip(self.faults.corrupt_per_million) {
            let idx = (self.next_u32() as usize) % wire.len();
            wire[idx] ^= 0x01;
            true
        } else {
            false
        };

        // At most one payload is buffered for reordering at a time; a second
        // reorder hit while one is buffered falls through to delivery.
        if self.coin_flip(self.faults.reorder_per_million) && self.pending_reorder.is_none() {
            self.push_trace(VirtualTcpTraceKind::BufferedForReorder, &wire);
            self.pending_reorder = Some(wire);
            return Vec::new();
        }

        let mut out = Vec::with_capacity(2);
        if let Some(previous) = self.pending_reorder.take() {
            // Deliver the current payload first, then flush the buffered one:
            // this inversion is what makes the buffered payload arrive late.
            let kind = if corrupted {
                VirtualTcpTraceKind::DeliveredCorrupt
            } else {
                VirtualTcpTraceKind::Delivered
            };
            self.push_trace(kind, &wire);
            out.push(wire);
            self.push_trace(VirtualTcpTraceKind::FlushedReordered, &previous);
            out.push(previous);
            return out;
        }

        let kind = if corrupted {
            VirtualTcpTraceKind::DeliveredCorrupt
        } else {
            VirtualTcpTraceKind::Delivered
        };
        self.push_trace(kind, &wire);
        out.push(wire);
        out
    }

    /// Flush any pending reordered payload.
    ///
    /// Consumes its own sequence number so the flush is visible in the trace.
    pub fn flush(&mut self) -> Option<Vec<u8>> {
        let pending = self.pending_reorder.take()?;
        self.seq = self.seq.saturating_add(1);
        self.push_trace(VirtualTcpTraceKind::FlushedReordered, &pending);
        Some(pending)
    }

    // Record a trace event under the current sequence number; payloads are
    // identified by xxh3-64 hash rather than stored bytes.
    fn push_trace(&mut self, kind: VirtualTcpTraceKind, payload: &[u8]) {
        self.trace.push(VirtualTcpTraceEvent {
            seq: self.seq,
            kind,
            payload_hash: xxhash_rust::xxh3::xxh3_64(payload),
        });
    }

    // Draw one fault coin: true with probability `per_million` / 1e6.
    // Rates of 0 and >= 1_000_000 short-circuit WITHOUT consuming an RNG
    // value — callers rely on this for reproducible RNG streams.
    fn coin_flip(&mut self, per_million: u32) -> bool {
        const PPM_MAX: u32 = 1_000_000;
        if per_million == 0 {
            return false;
        }
        if per_million >= PPM_MAX {
            return true;
        }
        self.next_u32() % PPM_MAX < per_million
    }

    fn next_u32(&mut self) -> u32 {
        // Deterministic LCG for lab replay. The multiplier is Knuth's MMIX
        // constant; the upper 32 bits of the state are returned because the
        // low bits of a power-of-two-modulus LCG have short periods.
        self.state = self
            .state
            .wrapping_mul(6_364_136_223_846_793_005)
            .wrapping_add(1);
        (self.state >> 32) as u32
    }
}
413
414// ---------------------------------------------------------------------------
415// Changeset Encoding
416// ---------------------------------------------------------------------------
417
/// Self-delimiting changeset header (§3.4.2).
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct ChangesetHeader {
    /// Format magic; expected to equal `CHANGESET_MAGIC`.
    pub magic: [u8; 4],
    /// Format version; expected to equal `CHANGESET_VERSION`.
    pub version: u16,
    /// Database page size in bytes.
    pub page_size: u32,
    /// Number of page entries that follow the header.
    pub n_pages: u32,
    /// Total encoded stream length (header + entries), making the stream
    /// self-delimiting.
    pub total_len: u64,
}
427
428impl ChangesetHeader {
429    /// Encode to little-endian bytes.
430    #[must_use]
431    pub fn to_bytes(&self) -> [u8; CHANGESET_HEADER_SIZE] {
432        let mut buf = [0_u8; CHANGESET_HEADER_SIZE];
433        buf[0..4].copy_from_slice(&self.magic);
434        buf[4..6].copy_from_slice(&self.version.to_le_bytes());
435        buf[6..10].copy_from_slice(&self.page_size.to_le_bytes());
436        buf[10..14].copy_from_slice(&self.n_pages.to_le_bytes());
437        buf[14..22].copy_from_slice(&self.total_len.to_le_bytes());
438        buf
439    }
440
441    /// Decode from little-endian bytes.
442    ///
443    /// # Errors
444    ///
445    /// Returns error if magic or version mismatch.
446    pub fn from_bytes(buf: &[u8; CHANGESET_HEADER_SIZE]) -> Result<Self> {
447        let magic: [u8; 4] = buf[0..4].try_into().expect("4 bytes");
448        if magic != CHANGESET_MAGIC {
449            return Err(FrankenError::DatabaseCorrupt {
450                detail: format!("changeset magic mismatch: expected FSRP, got {magic:?}"),
451            });
452        }
453        let version = u16::from_le_bytes(buf[4..6].try_into().expect("2 bytes"));
454        if version != CHANGESET_VERSION {
455            return Err(FrankenError::DatabaseCorrupt {
456                detail: format!(
457                    "changeset version mismatch: expected {CHANGESET_VERSION}, got {version}"
458                ),
459            });
460        }
461        let page_size = u32::from_le_bytes(buf[6..10].try_into().expect("4 bytes"));
462        let n_pages = u32::from_le_bytes(buf[10..14].try_into().expect("4 bytes"));
463        let total_len = u64::from_le_bytes(buf[14..22].try_into().expect("8 bytes"));
464        Ok(Self {
465            magic,
466            version,
467            page_size,
468            n_pages,
469            total_len,
470        })
471    }
472}
473
/// A single page entry in the changeset.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct PageEntry {
    /// Database page number this entry replaces.
    pub page_number: u32,
    /// xxh3-64 checksum of `page_bytes`.
    pub page_xxh3: u64,
    /// Raw page contents.
    pub page_bytes: Vec<u8>,
}
481
482impl PageEntry {
483    /// Create a page entry, computing the xxh3 checksum.
484    #[must_use]
485    pub fn new(page_number: u32, page_bytes: Vec<u8>) -> Self {
486        let page_xxh3 = xxhash_rust::xxh3::xxh3_64(&page_bytes);
487        Self {
488            page_number,
489            page_xxh3,
490            page_bytes,
491        }
492    }
493
494    /// Validate that the stored xxh3 matches the page bytes.
495    #[must_use]
496    pub fn validate_xxh3(&self) -> bool {
497        xxhash_rust::xxh3::xxh3_64(&self.page_bytes) == self.page_xxh3
498    }
499}
500
/// 128-bit changeset identifier (truncated BLAKE3).
#[derive(Clone, Copy, PartialEq, Eq, Hash)]
pub struct ChangesetId([u8; 16]);

impl ChangesetId {
    /// Borrow the raw identifier bytes.
    #[must_use]
    pub const fn as_bytes(&self) -> &[u8; 16] {
        &self.0
    }

    /// Wrap raw bytes as an identifier.
    #[must_use]
    pub const fn from_bytes(bytes: [u8; 16]) -> Self {
        Self(bytes)
    }
}

impl fmt::Debug for ChangesetId {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        // Renders as `ChangesetId(<32 lowercase hex digits>)`.
        write!(f, "ChangesetId(")?;
        for byte in self.0 {
            write!(f, "{byte:02x}")?;
        }
        write!(f, ")")
    }
}
528
529/// Compute the changeset identifier: `Trunc128(BLAKE3(domain || changeset_bytes))`.
530#[must_use]
531pub fn compute_changeset_id(changeset_bytes: &[u8]) -> ChangesetId {
532    let mut hasher = blake3::Hasher::new();
533    hasher.update(CHANGESET_DOMAIN.as_bytes());
534    hasher.update(changeset_bytes);
535    let hash = hasher.finalize();
536    let mut id = [0_u8; 16];
537    id.copy_from_slice(&hash.as_bytes()[..16]);
538    ChangesetId(id)
539}
540
541/// Derive the deterministic RaptorQ seed from a changeset identifier.
542#[must_use]
543pub fn derive_seed_from_changeset_id(id: &ChangesetId) -> u64 {
544    xxhash_rust::xxh3::xxh3_64(id.as_bytes())
545}
546
547/// Compute `K_source = ceil(F / T_replication)` for a payload length `F`.
548///
549/// This is the normative symbol-count mapping for replication object sizing.
550///
551/// # Errors
552///
553/// Returns `FrankenError::OutOfRange` if `symbol_size` is 0.
554pub fn compute_k_source(total_bytes: usize, symbol_size: u16) -> Result<u64> {
555    if symbol_size == 0 {
556        return Err(FrankenError::OutOfRange {
557            what: "symbol_size".to_owned(),
558            value: "0".to_owned(),
559        });
560    }
561    let f = u64::try_from(total_bytes).map_err(|_| FrankenError::OutOfRange {
562        what: "total_bytes".to_owned(),
563        value: total_bytes.to_string(),
564    })?;
565    let t = u64::from(symbol_size);
566    Ok(f.div_ceil(t))
567}
568
569/// Canonicalize page entries for deterministic `changeset_bytes`.
570///
571/// Sorting by page number is the primary key. Tie-breakers remove dependence
572/// on input iteration order (e.g., hash-map traversal) for duplicate page
573/// numbers.
574fn canonicalize_changeset_pages(pages: &mut [PageEntry]) {
575    pages.sort_by(|lhs, rhs| {
576        lhs.page_number
577            .cmp(&rhs.page_number)
578            .then_with(|| lhs.page_xxh3.cmp(&rhs.page_xxh3))
579            .then_with(|| lhs.page_bytes.cmp(&rhs.page_bytes))
580    });
581}
582
583/// Encode pages into a deterministic changeset byte stream.
584///
585/// Canonicalization rule:
586/// - sort pages by `(page_number, page_xxh3, page_bytes)` before encoding.
587/// - this removes non-deterministic map-iteration effects from `changeset_bytes`.
588///
589/// The encoded stream is self-delimiting via the `total_len` field in the
590/// header.
591///
592/// # Errors
593///
594/// Returns error if `page_size` is 0 or pages are empty.
595pub fn encode_changeset(page_size: u32, pages: &mut [PageEntry]) -> Result<Vec<u8>> {
596    if pages.is_empty() {
597        return Err(FrankenError::OutOfRange {
598            what: "pages".to_owned(),
599            value: "0".to_owned(),
600        });
601    }
602    if page_size == 0 {
603        return Err(FrankenError::OutOfRange {
604            what: "page_size".to_owned(),
605            value: "0".to_owned(),
606        });
607    }
608
609    canonicalize_changeset_pages(pages);
610
611    let n_pages = u32::try_from(pages.len()).map_err(|_| FrankenError::OutOfRange {
612        what: "n_pages".to_owned(),
613        value: pages.len().to_string(),
614    })?;
615
616    // Per-page entry size: 4 (page_number) + 8 (xxh3) + page_size
617    let entry_size = 4_u64 + 8 + u64::from(page_size);
618    let total_len = CHANGESET_HEADER_SIZE as u64 + entry_size * u64::from(n_pages);
619
620    let header = ChangesetHeader {
621        magic: CHANGESET_MAGIC,
622        version: CHANGESET_VERSION,
623        page_size,
624        n_pages,
625        total_len,
626    };
627
628    let buf_cap = usize::try_from(total_len).map_err(|_| FrankenError::OutOfRange {
629        what: "changeset total_len".to_owned(),
630        value: total_len.to_string(),
631    })?;
632    let mut buf = Vec::with_capacity(buf_cap);
633    buf.extend_from_slice(&header.to_bytes());
634
635    for page in pages.iter() {
636        buf.extend_from_slice(&page.page_number.to_le_bytes());
637        buf.extend_from_slice(&page.page_xxh3.to_le_bytes());
638        buf.extend_from_slice(&page.page_bytes);
639    }
640
641    debug!(
642        bead_id = BEAD_ID,
643        n_pages, page_size, total_len, "encoded changeset"
644    );
645
646    debug_assert_eq!(buf.len() as u64, total_len);
647    Ok(buf)
648}
649
650// ---------------------------------------------------------------------------
651// Sharding
652// ---------------------------------------------------------------------------
653
654/// A shard of a large changeset that fits within a single RaptorQ source block.
655#[derive(Debug, Clone)]
656pub struct ChangesetShard {
657    /// The changeset bytes for this shard.
658    pub changeset_bytes: Vec<u8>,
659    /// The changeset identifier for this shard.
660    pub changeset_id: ChangesetId,
661    /// The deterministic seed for RaptorQ encoding.
662    pub seed: u64,
663    /// Number of source symbols (K_source) for this shard.
664    pub k_source: u32,
665}
666
/// Shard a changeset into pieces that each fit within K_MAX source symbols.
///
/// If the changeset fits in one block, returns a single shard.
///
/// Large changesets use deterministic contiguous byte-range sharding:
/// - max shard payload = `K_MAX * T_replication`
/// - shard `i` = bytes `[i * max_payload .. min((i+1) * max_payload, F))`
/// - each shard gets its own `changeset_id` and seed derived from shard bytes
///
/// # Errors
///
/// Returns error if `symbol_size` is 0.
pub fn shard_changeset(changeset_bytes: Vec<u8>, symbol_size: u16) -> Result<Vec<ChangesetShard>> {
    let t = u64::from(symbol_size);
    let f = u64::try_from(changeset_bytes.len()).map_err(|_| FrankenError::OutOfRange {
        what: "changeset_bytes".to_owned(),
        value: changeset_bytes.len().to_string(),
    })?;
    // compute_k_source rejects symbol_size == 0 here, before `t` is ever
    // used as a chunk width below — so the `chunks(0)` panic is unreachable.
    let k_source_total = compute_k_source(changeset_bytes.len(), symbol_size)?;

    // Fast path: the whole changeset fits in a single RaptorQ source block,
    // so it becomes one shard whose identity covers all of the bytes.
    if k_source_total <= u64::from(K_MAX) {
        let id = compute_changeset_id(&changeset_bytes);
        let seed = derive_seed_from_changeset_id(&id);
        let k_source = u32::try_from(k_source_total).expect("checked <= K_MAX");
        info!(
            bead_id = BEAD_ID,
            k_source,
            symbol_size,
            changeset_len = changeset_bytes.len(),
            "single-shard changeset"
        );
        return Ok(vec![ChangesetShard {
            changeset_bytes,
            changeset_id: id,
            seed,
            k_source,
        }]);
    }

    // Need to shard: split the changeset bytes into chunks
    // Each chunk gets its own changeset_id and seed.
    let max_chunk = u64::from(K_MAX) * t;
    let n_shards = f.div_ceil(max_chunk);

    info!(
        bead_id = BEAD_ID,
        n_shards,
        k_source_total,
        symbol_size,
        changeset_len = changeset_bytes.len(),
        "sharding large changeset"
    );

    let n_shards_usize = usize::try_from(n_shards).map_err(|_| FrankenError::OutOfRange {
        what: "n_shards".to_owned(),
        value: n_shards.to_string(),
    })?;
    let mut shards = Vec::with_capacity(n_shards_usize);
    let max_chunk_usize = usize::try_from(max_chunk).map_err(|_| FrankenError::OutOfRange {
        what: "max_chunk".to_owned(),
        value: max_chunk.to_string(),
    })?;

    // `chunks` yields contiguous byte ranges; only the final chunk may be
    // shorter than `max_chunk_usize`, so every shard stays within K_MAX.
    for (i, chunk) in changeset_bytes.chunks(max_chunk_usize).enumerate() {
        let shard_bytes = chunk.to_vec();
        // Shard identity is derived from the shard's own bytes, so each
        // shard can be verified independently of its siblings.
        let id = compute_changeset_id(&shard_bytes);
        let seed = derive_seed_from_changeset_id(&id);
        let k = compute_k_source(chunk.len(), symbol_size)?;
        let k_source = u32::try_from(k).expect("each shard <= K_MAX symbols");

        debug!(
            bead_id = BEAD_ID,
            shard_index = i,
            k_source,
            shard_len = chunk.len(),
            "created changeset shard"
        );

        shards.push(ChangesetShard {
            changeset_bytes: shard_bytes,
            changeset_id: id,
            seed,
            k_source,
        });
    }

    Ok(shards)
}
755
756// ---------------------------------------------------------------------------
757// UDP Packet Format
758// ---------------------------------------------------------------------------
759
760/// Replication packet: big-endian header + little-endian symbol payload.
761#[derive(Debug, Clone, PartialEq, Eq)]
762pub struct ReplicationPacket {
763    /// Packet framing format.
764    pub wire_version: ReplicationWireVersion,
765    /// 16-byte changeset identifier for multiplexing.
766    pub changeset_id: ChangesetId,
767    /// Source block number (MUST be 0 in V1).
768    pub sbn: u8,
769    /// Encoding Symbol ID (ISI).
770    pub esi: u32,
771    /// Number of source symbols.
772    pub k_source: u32,
773    /// Number of planned repair symbols for this stream configuration.
774    pub r_repair: u32,
775    /// Symbol size T encoded on wire.
776    pub symbol_size_t: u16,
777    /// Deterministic seed for the object's symbol schedule.
778    pub seed: u64,
779    /// Integrity hash over `symbol_data`.
780    pub payload_xxh3: u64,
781    /// Optional authenticated tag for security mode.
782    pub auth_tag: Option<[u8; 16]>,
783    /// Symbol data (T bytes).
784    pub symbol_data: Vec<u8>,
785}
786
/// Packet framing versions for compatibility.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ReplicationWireVersion {
    /// Legacy bd-1hi.13 packet layout (24-byte header).
    LegacyV1,
    /// Fixed-size versioned packet header carrying integrity/auth metadata.
    FramedV2,
}
795
796/// Metadata carried in a versioned V2 replication packet header.
797#[derive(Debug, Clone, Copy, PartialEq, Eq)]
798pub struct ReplicationPacketV2Header {
799    pub changeset_id: ChangesetId,
800    pub sbn: u8,
801    pub esi: u32,
802    pub k_source: u32,
803    pub r_repair: u32,
804    pub symbol_size_t: u16,
805    pub seed: u64,
806}
807
808impl ReplicationPacket {
809    /// Create a versioned fixed-header packet and compute payload integrity hash.
810    #[must_use]
811    pub fn new_v2(header: ReplicationPacketV2Header, symbol_data: Vec<u8>) -> Self {
812        let payload_xxh3 = Self::compute_payload_xxh3(&symbol_data);
813        Self {
814            wire_version: ReplicationWireVersion::FramedV2,
815            changeset_id: header.changeset_id,
816            sbn: header.sbn,
817            esi: header.esi,
818            k_source: header.k_source,
819            r_repair: header.r_repair,
820            symbol_size_t: header.symbol_size_t,
821            seed: header.seed,
822            payload_xxh3,
823            auth_tag: None,
824            symbol_data,
825        }
826    }
827
    /// Compute packet payload hash: xxh3-64 over the raw symbol bytes.
    #[must_use]
    pub fn compute_payload_xxh3(symbol_data: &[u8]) -> u64 {
        xxhash_rust::xxh3::xxh3_64(symbol_data)
    }
833
834    fn auth_material(&self) -> Vec<u8> {
835        let mut material = Vec::with_capacity(16 + 1 + 4 + 4 + 2 + 8 + 8);
836        material.extend_from_slice(self.changeset_id.as_bytes());
837        material.push(self.sbn);
838        material.extend_from_slice(&self.esi.to_be_bytes());
839        material.extend_from_slice(&self.k_source.to_be_bytes());
840        material.extend_from_slice(&self.r_repair.to_be_bytes());
841        material.extend_from_slice(&self.symbol_size_t.to_be_bytes());
842        material.extend_from_slice(&self.seed.to_be_bytes());
843        material.extend_from_slice(&self.payload_xxh3.to_be_bytes());
844        material
845    }
846
847    fn compute_auth_tag(&self, auth_key: &[u8; 32]) -> [u8; 16] {
848        let mut hasher = blake3::Hasher::new_keyed(auth_key);
849        hasher.update(REPLICATION_PACKET_AUTH_DOMAIN.as_bytes());
850        hasher.update(&self.auth_material());
851        let digest = hasher.finalize();
852        let mut out = [0_u8; 16];
853        out.copy_from_slice(&digest.as_bytes()[..16]);
854        out
855    }
856
857    /// Attach an auth tag for authenticated transport mode.
858    pub fn attach_auth_tag(&mut self, auth_key: &[u8; 32]) {
859        self.auth_tag = Some(self.compute_auth_tag(auth_key));
860    }
861
862    /// Verify payload hash and optional auth tag.
863    #[must_use]
864    pub fn verify_integrity(&self, auth_key: Option<&[u8; 32]>) -> bool {
865        if Self::compute_payload_xxh3(&self.symbol_data) != self.payload_xxh3 {
866            return false;
867        }
868        match (self.auth_tag, auth_key) {
869            (Some(tag), Some(key)) => tag == self.compute_auth_tag(key),
870            (Some(_), None) => false,
871            (None, _) => true,
872        }
873    }
874
875    /// Validate the symbol size against the hard wire limit.
876    ///
877    /// # Errors
878    ///
879    /// Returns error if symbol size exceeds `MAX_REPLICATION_SYMBOL_SIZE`.
880    pub fn validate_symbol_size(symbol_size: usize) -> Result<()> {
881        if symbol_size > MAX_REPLICATION_SYMBOL_SIZE {
882            error!(
883                bead_id = BEAD_ID,
884                symbol_size,
885                max = MAX_REPLICATION_SYMBOL_SIZE,
886                "symbol size exceeds UDP hard wire limit"
887            );
888            return Err(FrankenError::OutOfRange {
889                what: "symbol_size".to_owned(),
890                value: symbol_size.to_string(),
891            });
892        }
893        Ok(())
894    }
895
    /// Encode to wire format: big-endian header + symbol data.
    ///
    /// The header is 24 bytes (`REPLICATION_HEADER_SIZE_LEGACY`) for
    /// `LegacyV1` packets and 72 bytes (`REPLICATION_HEADER_SIZE`) for
    /// `FramedV2` packets, selected by `self.wire_version`.
    ///
    /// # Errors
    ///
    /// Returns error if ESI doesn't fit in 24 bits, if `symbol_size_t`
    /// disagrees with the actual payload length, or if the symbol exceeds
    /// the wire limit.
    pub fn to_bytes(&self) -> Result<Vec<u8>> {
        // ESI travels as a u24 on the wire in both formats.
        if self.esi > 0x00FF_FFFF {
            return Err(FrankenError::OutOfRange {
                what: "esi".to_owned(),
                value: self.esi.to_string(),
            });
        }
        // The header's symbol_size_t must describe the payload we carry.
        if usize::from(self.symbol_size_t) != self.symbol_data.len() {
            return Err(FrankenError::DatabaseCorrupt {
                detail: format!(
                    "symbol_size_t mismatch: header={}, payload={}",
                    self.symbol_size_t,
                    self.symbol_data.len()
                ),
            });
        }
        Self::validate_symbol_size(self.symbol_data.len())?;

        match self.wire_version {
            ReplicationWireVersion::LegacyV1 => {
                // Legacy layout: changeset_id(16) | sbn(1) | esi(3) | k_source(4).
                let total = REPLICATION_HEADER_SIZE_LEGACY + self.symbol_data.len();
                let mut buf = Vec::with_capacity(total);
                buf.extend_from_slice(self.changeset_id.as_bytes());
                buf.push(self.sbn);
                // Drop the high byte of the big-endian u32 to get the u24.
                let esi_bytes = self.esi.to_be_bytes();
                buf.extend_from_slice(&esi_bytes[1..4]);
                buf.extend_from_slice(&self.k_source.to_be_bytes());
                buf.extend_from_slice(&self.symbol_data);
                Ok(buf)
            }
            ReplicationWireVersion::FramedV2 => {
                // V2 layout (offsets): magic[0..4] | version[4] | flags[5] |
                // header_len[6..8] | changeset_id[8..24] | sbn[24] | esi[25..28] |
                // k_source[28..32] | r_repair[32..36] | symbol_size_t[36..38] |
                // reserved[38..40] | seed[40..48] | payload_xxh3[48..56] |
                // auth_tag[56..72] = 72-byte fixed header.
                let total = REPLICATION_HEADER_SIZE + self.symbol_data.len();
                let mut buf = Vec::with_capacity(total);
                let mut flags = 0_u8;
                if self.auth_tag.is_some() {
                    flags |= REPLICATION_FLAG_AUTH_PRESENT;
                }
                buf.extend_from_slice(&REPLICATION_PROTOCOL_MAGIC);
                buf.push(REPLICATION_PROTOCOL_VERSION_V2);
                buf.push(flags);
                buf.extend_from_slice(&REPLICATION_HEADER_SIZE_V2_U16.to_be_bytes());
                buf.extend_from_slice(self.changeset_id.as_bytes());
                buf.push(self.sbn);
                let esi_bytes = self.esi.to_be_bytes();
                buf.extend_from_slice(&esi_bytes[1..4]);
                buf.extend_from_slice(&self.k_source.to_be_bytes());
                buf.extend_from_slice(&self.r_repair.to_be_bytes());
                buf.extend_from_slice(&self.symbol_size_t.to_be_bytes());
                buf.extend_from_slice(&0_u16.to_be_bytes()); // reserved
                buf.extend_from_slice(&self.seed.to_be_bytes());
                buf.extend_from_slice(&self.payload_xxh3.to_be_bytes());
                // The auth-tag slot is always present on the wire; it is
                // zero-filled when the packet is unauthenticated.
                if let Some(tag) = self.auth_tag {
                    buf.extend_from_slice(&tag);
                } else {
                    buf.extend_from_slice(&[0_u8; 16]);
                }
                buf.extend_from_slice(&self.symbol_data);
                Ok(buf)
            }
        }
    }
962
    /// Decode from wire format.
    ///
    /// Framing is auto-detected: a buffer that is at least 72 bytes long and
    /// starts with the V2 magic and version byte is parsed as `FramedV2`;
    /// anything else falls back to the legacy 24-byte layout.
    ///
    /// NOTE(review): a legacy packet whose leading changeset_id bytes happen
    /// to equal the magic "FSRP" followed by version 0x02 would be
    /// misdetected as V2 — confirm this collision is acceptable for the
    /// deployed sender population.
    ///
    /// # Errors
    ///
    /// Returns error if the buffer is too short, declares an unsupported
    /// header length, or the payload length disagrees with `symbol_size_t`.
    pub fn from_bytes(buf: &[u8]) -> Result<Self> {
        // Even a legacy packet needs the full 24-byte header.
        if buf.len() < REPLICATION_HEADER_SIZE_LEGACY {
            return Err(FrankenError::DatabaseCorrupt {
                detail: format!(
                    "replication packet too short: {} < {REPLICATION_HEADER_SIZE_LEGACY}",
                    buf.len()
                ),
            });
        }
        let is_v2 = buf.len() >= REPLICATION_HEADER_SIZE
            && buf[0..4] == REPLICATION_PROTOCOL_MAGIC
            && buf[4] == REPLICATION_PROTOCOL_VERSION_V2;
        if is_v2 {
            let flags = buf[5];
            // Declared header length must match the fixed 72-byte V2 header;
            // variable-length headers are not supported.
            let header_len = usize::from(u16::from_be_bytes([buf[6], buf[7]]));
            if header_len != REPLICATION_HEADER_SIZE {
                return Err(FrankenError::DatabaseCorrupt {
                    detail: format!(
                        "unsupported replication header length: expected {}, got {header_len}",
                        REPLICATION_HEADER_SIZE
                    ),
                });
            }
            if buf.len() < header_len {
                return Err(FrankenError::DatabaseCorrupt {
                    detail: format!("packet shorter than declared header length: {header_len}"),
                });
            }
            let mut id_bytes = [0_u8; 16];
            id_bytes.copy_from_slice(&buf[8..24]);
            let changeset_id = ChangesetId::from_bytes(id_bytes);
            let sbn = buf[24];
            // ESI is a big-endian u24 at bytes 25..28.
            let esi = u32::from(buf[25]) << 16 | u32::from(buf[26]) << 8 | u32::from(buf[27]);
            let k_source = u32::from_be_bytes(buf[28..32].try_into().expect("4 bytes"));
            let r_repair = u32::from_be_bytes(buf[32..36].try_into().expect("4 bytes"));
            let symbol_size_t = u16::from_be_bytes(buf[36..38].try_into().expect("2 bytes"));
            // Bytes 38..40 are the reserved u16 written by `to_bytes`; skipped.
            let seed = u64::from_be_bytes(buf[40..48].try_into().expect("8 bytes"));
            let payload_xxh3 = u64::from_be_bytes(buf[48..56].try_into().expect("8 bytes"));
            let mut auth_tag_bytes = [0_u8; 16];
            auth_tag_bytes.copy_from_slice(&buf[56..72]);
            // The tag slot is always present on the wire; the flag decides
            // whether its contents are meaningful. Other flag bits are ignored.
            let auth_tag = if (flags & REPLICATION_FLAG_AUTH_PRESENT) != 0 {
                Some(auth_tag_bytes)
            } else {
                None
            };
            let symbol_data = buf[header_len..].to_vec();
            if symbol_data.len() != usize::from(symbol_size_t) {
                return Err(FrankenError::DatabaseCorrupt {
                    detail: format!(
                        "symbol_size_t mismatch in packet: header={symbol_size_t}, payload={}",
                        symbol_data.len()
                    ),
                });
            }
            return Ok(Self {
                wire_version: ReplicationWireVersion::FramedV2,
                changeset_id,
                sbn,
                esi,
                k_source,
                r_repair,
                symbol_size_t,
                seed,
                payload_xxh3,
                auth_tag,
                symbol_data,
            });
        }

        // Legacy V1 layout: changeset_id[0..16] | sbn[16] | esi[17..20] |
        // k_source[20..24] | payload. Missing V2 fields are reconstructed:
        // seed from the changeset id, payload_xxh3 recomputed locally.
        let mut id_bytes = [0_u8; 16];
        id_bytes.copy_from_slice(&buf[0..16]);
        let changeset_id = ChangesetId::from_bytes(id_bytes);
        let sbn = buf[16];
        let esi = u32::from(buf[17]) << 16 | u32::from(buf[18]) << 8 | u32::from(buf[19]);
        let k_source = u32::from_be_bytes(buf[20..24].try_into().expect("4 bytes"));
        let symbol_data = buf[24..].to_vec();
        let symbol_size_t =
            u16::try_from(symbol_data.len()).map_err(|_| FrankenError::OutOfRange {
                what: "symbol_size_t".to_owned(),
                value: symbol_data.len().to_string(),
            })?;

        Ok(Self {
            wire_version: ReplicationWireVersion::LegacyV1,
            changeset_id,
            sbn,
            esi,
            k_source,
            r_repair: 0,
            symbol_size_t,
            seed: derive_seed_from_changeset_id(&changeset_id),
            payload_xxh3: Self::compute_payload_xxh3(&symbol_data),
            auth_tag: None,
            symbol_data,
        })
    }
1064
1065    /// Total packet size on the wire.
1066    #[must_use]
1067    pub fn wire_size(&self) -> usize {
1068        let header_size = match self.wire_version {
1069            ReplicationWireVersion::LegacyV1 => REPLICATION_HEADER_SIZE_LEGACY,
1070            ReplicationWireVersion::FramedV2 => REPLICATION_HEADER_SIZE,
1071        };
1072        header_size + self.symbol_data.len()
1073    }
1074
    /// Whether this packet carries a source symbol (systematic).
    ///
    /// ESIs `0..k_source` carry verbatim changeset bytes; ESIs at or above
    /// `k_source` are repair symbols.
    #[must_use]
    pub fn is_source_symbol(&self) -> bool {
        self.esi < self.k_source
    }
1080}
1081
1082// ---------------------------------------------------------------------------
1083// Sender State Machine
1084// ---------------------------------------------------------------------------
1085
/// Sender state (§3.4.2).
///
/// Legal transitions: IDLE → ENCODING (`prepare`), ENCODING → STREAMING
/// (`start_streaming`), STREAMING → COMPLETE (`acknowledge_complete` or
/// `complete`; `complete` also works from ENCODING). `reset` returns to
/// IDLE from any state.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum SenderState {
    /// No active replication session.
    Idle,
    /// Changeset encoded, encoder prepared.
    Encoding,
    /// Streaming symbols to receiver(s).
    Streaming,
    /// Streaming complete, resources released.
    Complete,
}
1098
/// Configuration for the replication sender.
#[derive(Debug, Clone)]
pub struct SenderConfig {
    /// Symbol size for replication transport.
    ///
    /// Defaults to `MTU_SAFE_SYMBOL_SIZE`; validated against the hard wire
    /// limit during `ReplicationSender::prepare`.
    pub symbol_size: u16,
    /// Maximum ISI = `max_isi_multiplier * k_source`.
    ///
    /// Bounds the total symbols (source + repair) emitted per shard before
    /// the sender advances to the next shard.
    pub max_isi_multiplier: u32,
}
1107
1108impl Default for SenderConfig {
1109    fn default() -> Self {
1110        Self {
1111            symbol_size: MTU_SAFE_SYMBOL_SIZE,
1112            max_isi_multiplier: DEFAULT_MAX_ISI_MULTIPLIER,
1113        }
1114    }
1115}
1116
/// Prepared encoding session ready for streaming.
///
/// Holds the sharded changeset plus a cursor (`current_shard`,
/// `current_isi`) that `ReplicationSender::next_packet` advances one symbol
/// at a time.
#[derive(Debug)]
pub struct EncodingSession {
    /// Shards of the changeset.
    pub shards: Vec<ChangesetShard>,
    /// Current shard index being streamed.
    pub current_shard: usize,
    /// Current ISI within the current shard.
    pub current_isi: u32,
    /// Configuration.
    pub config: SenderConfig,
}
1129
/// Replication sender state machine.
///
/// Owns the current `SenderState` and, while a session is active, the
/// `EncodingSession` cursor used to emit packets.
#[derive(Debug)]
pub struct ReplicationSender {
    // Position in the IDLE → ENCODING → STREAMING → COMPLETE machine.
    state: SenderState,
    // Set by `prepare`, cleared by `reset`; `next_packet` expects it to be
    // present whenever the state is STREAMING.
    session: Option<EncodingSession>,
}
1136
1137impl ReplicationSender {
1138    /// Create a new sender in IDLE state.
1139    #[must_use]
1140    pub fn new() -> Self {
1141        Self {
1142            state: SenderState::Idle,
1143            session: None,
1144        }
1145    }
1146
1147    /// Current state.
1148    #[must_use]
1149    pub const fn state(&self) -> SenderState {
1150        self.state
1151    }
1152
1153    /// Transition from IDLE to ENCODING: prepare a changeset for streaming.
1154    ///
1155    /// # Errors
1156    ///
1157    /// Returns error if not in IDLE state, pages are empty, or symbol size invalid.
1158    pub fn prepare(
1159        &mut self,
1160        page_size: u32,
1161        pages: &mut [PageEntry],
1162        config: SenderConfig,
1163    ) -> Result<()> {
1164        if self.state != SenderState::Idle {
1165            return Err(FrankenError::Internal(format!(
1166                "sender must be IDLE to prepare, current state: {:?}",
1167                self.state
1168            )));
1169        }
1170
1171        ReplicationPacket::validate_symbol_size(usize::from(config.symbol_size))?;
1172
1173        let changeset_bytes = encode_changeset(page_size, pages)?;
1174        let shards = shard_changeset(changeset_bytes, config.symbol_size)?;
1175
1176        info!(
1177            bead_id = BEAD_ID,
1178            n_shards = shards.len(),
1179            symbol_size = config.symbol_size,
1180            "sender prepared for streaming"
1181        );
1182
1183        self.session = Some(EncodingSession {
1184            shards,
1185            current_shard: 0,
1186            current_isi: 0,
1187            config,
1188        });
1189        self.state = SenderState::Encoding;
1190        Ok(())
1191    }
1192
1193    /// Transition from ENCODING to STREAMING.
1194    ///
1195    /// # Errors
1196    ///
1197    /// Returns error if not in ENCODING state.
1198    pub fn start_streaming(&mut self) -> Result<()> {
1199        if self.state != SenderState::Encoding {
1200            return Err(FrankenError::Internal(format!(
1201                "sender must be ENCODING to start streaming, current state: {:?}",
1202                self.state
1203            )));
1204        }
1205        self.state = SenderState::Streaming;
1206        info!(bead_id = BEAD_ID, "sender started streaming");
1207        Ok(())
1208    }
1209
1210    /// Generate the next replication packet in the stream.
1211    ///
1212    /// Returns `None` when all shards have been fully streamed (ISI limit reached).
1213    ///
1214    /// # Errors
1215    ///
1216    /// Returns error if not in STREAMING state.
1217    #[allow(clippy::too_many_lines)]
1218    pub fn next_packet(&mut self) -> Result<Option<ReplicationPacket>> {
1219        if self.state != SenderState::Streaming {
1220            return Err(FrankenError::Internal(format!(
1221                "sender must be STREAMING to generate packets, current state: {:?}",
1222                self.state
1223            )));
1224        }
1225
1226        let session = self
1227            .session
1228            .as_mut()
1229            .expect("session exists in STREAMING state");
1230
1231        if session.current_shard >= session.shards.len() {
1232            // All shards complete.
1233            return Ok(None);
1234        }
1235
1236        let shard = &session.shards[session.current_shard];
1237        let max_isi = shard
1238            .k_source
1239            .saturating_mul(session.config.max_isi_multiplier);
1240
1241        if session.current_isi >= max_isi {
1242            // Move to next shard.
1243            session.current_shard += 1;
1244            session.current_isi = 0;
1245
1246            if session.current_shard >= session.shards.len() {
1247                return Ok(None);
1248            }
1249
1250            let next_shard = &session.shards[session.current_shard];
1251            debug!(
1252                bead_id = BEAD_ID,
1253                shard_index = session.current_shard,
1254                k_source = next_shard.k_source,
1255                "advancing to next shard"
1256            );
1257        }
1258
1259        let shard = &session.shards[session.current_shard];
1260        let isi = session.current_isi;
1261        let t = usize::from(session.config.symbol_size);
1262
1263        // Generate symbol data for current ISI.
1264        // For source symbols (ISI < K_source): extract from changeset bytes.
1265        // For repair symbols (ISI >= K_source): would use RaptorQ encoder in production.
1266        // Here we provide the framework; actual FEC encoding is delegated to asupersync.
1267        let symbol_data = if u64::from(isi) < u64::from(shard.k_source) {
1268            // Source symbol: extract T bytes starting at ISI * T.
1269            let start = isi as usize * t;
1270            let end = (start + t).min(shard.changeset_bytes.len());
1271            let mut data = vec![0_u8; t];
1272            let available = end.saturating_sub(start);
1273            if available > 0 {
1274                data[..available].copy_from_slice(&shard.changeset_bytes[start..end]);
1275            }
1276            // Remaining bytes are zero-padded (per RFC 6330 symbol alignment).
1277            data
1278        } else {
1279            // Repair symbol: placeholder (production uses RaptorQ intermediate symbols).
1280            // For now, generate deterministic placeholder from seed + ISI.
1281            let mut data = vec![0_u8; t];
1282            #[allow(clippy::cast_possible_truncation)]
1283            {
1284                let repair_seed = shard.seed.wrapping_add(u64::from(isi));
1285                for (i, byte) in data.iter_mut().enumerate() {
1286                    let mixed = repair_seed
1287                        .wrapping_mul(0x9E37_79B9_7F4A_7C15)
1288                        .wrapping_add(i as u64);
1289                    *byte = (mixed >> 32) as u8;
1290                }
1291            }
1292            warn!(
1293                bead_id = BEAD_ID,
1294                isi,
1295                shard_index = session.current_shard,
1296                "generated placeholder repair symbol (production uses RaptorQ encoder)"
1297            );
1298            data
1299        };
1300
1301        let r_repair = max_isi.saturating_sub(shard.k_source);
1302        let packet = ReplicationPacket::new_v2(
1303            ReplicationPacketV2Header {
1304                changeset_id: shard.changeset_id,
1305                sbn: 0, // V1/V2 single-source-block path
1306                esi: isi,
1307                k_source: shard.k_source,
1308                r_repair,
1309                symbol_size_t: session.config.symbol_size,
1310                seed: shard.seed,
1311            },
1312            symbol_data,
1313        );
1314
1315        session.current_isi += 1;
1316        Ok(Some(packet))
1317    }
1318
1319    /// Acknowledge completion from receiver: stop streaming and transition to COMPLETE.
1320    ///
1321    /// # Errors
1322    ///
1323    /// Returns error if not in STREAMING state.
1324    pub fn acknowledge_complete(&mut self) -> Result<()> {
1325        if self.state != SenderState::Streaming {
1326            return Err(FrankenError::Internal(format!(
1327                "sender must be STREAMING to acknowledge, current state: {:?}",
1328                self.state
1329            )));
1330        }
1331        self.state = SenderState::Complete;
1332        info!(bead_id = BEAD_ID, "sender acknowledged completion");
1333        Ok(())
1334    }
1335
1336    /// Complete streaming: release resources and transition to COMPLETE.
1337    ///
1338    /// This is called when ISI limit is reached or explicit stop.
1339    pub fn complete(&mut self) {
1340        if self.state == SenderState::Streaming || self.state == SenderState::Encoding {
1341            self.state = SenderState::Complete;
1342            info!(bead_id = BEAD_ID, "sender completed");
1343        }
1344    }
1345
1346    /// Reset to IDLE for the next replication session.
1347    pub fn reset(&mut self) {
1348        self.state = SenderState::Idle;
1349        self.session = None;
1350        debug!(bead_id = BEAD_ID, "sender reset to IDLE");
1351    }
1352}
1353
1354impl Default for ReplicationSender {
1355    fn default() -> Self {
1356        Self::new()
1357    }
1358}
1359
1360#[cfg(test)]
1361mod tests {
1362    use super::*;
1363
    // Bead identifiers embedded in assertion messages for traceability.
    const TEST_BEAD_ID: &str = "bd-1hi.13";
    const TEST_BEAD_BD_1SQU: &str = "bd-1squ";
1366
    // Build one PageEntry per requested page number, each filled with a
    // deterministic byte pattern derived from the page number so tests are
    // reproducible without random data.
    #[allow(clippy::cast_possible_truncation)]
    fn make_pages(page_size: u32, page_numbers: &[u32]) -> Vec<PageEntry> {
        page_numbers
            .iter()
            .map(|&pn| {
                let mut data = vec![0_u8; page_size as usize];
                // Fill with deterministic data based on page number.
                for (i, byte) in data.iter_mut().enumerate() {
                    *byte = ((pn as usize * 251 + i * 31) % 256) as u8;
                }
                PageEntry::new(pn, data)
            })
            .collect()
    }
1381
1382    // -----------------------------------------------------------------------
1383    // Changeset encoding tests
1384    // -----------------------------------------------------------------------
1385
    #[test]
    // Header carries the FSRP magic, has the fixed size, and round-trips.
    fn test_changeset_header_format() {
        let header = ChangesetHeader {
            magic: CHANGESET_MAGIC,
            version: CHANGESET_VERSION,
            page_size: 4096,
            n_pages: 10,
            total_len: 42_000,
        };
        let bytes = header.to_bytes();
        assert_eq!(
            &bytes[0..4],
            b"FSRP",
            "bead_id={TEST_BEAD_ID} case=header_magic"
        );
        assert_eq!(bytes.len(), CHANGESET_HEADER_SIZE);

        let decoded = ChangesetHeader::from_bytes(&bytes).expect("decode should succeed");
        assert_eq!(
            header, decoded,
            "bead_id={TEST_BEAD_ID} case=header_roundtrip"
        );
    }
1409
    #[test]
    // Page input order must not affect the encoded bytes or changeset id.
    fn test_changeset_encoding_deterministic() {
        let page_size = 512_u32;
        let mut pages_a = make_pages(page_size, &[3, 1, 2]);
        let mut pages_b = make_pages(page_size, &[2, 3, 1]); // different order

        let bytes_a = encode_changeset(page_size, &mut pages_a).expect("encode a");
        let bytes_b = encode_changeset(page_size, &mut pages_b).expect("encode b");

        // Same pages (different input order) → same changeset bytes (sorted).
        assert_eq!(
            bytes_a, bytes_b,
            "bead_id={TEST_BEAD_ID} case=deterministic_encoding"
        );

        // Same bytes → same changeset_id.
        let id_a = compute_changeset_id(&bytes_a);
        let id_b = compute_changeset_id(&bytes_b);
        assert_eq!(
            id_a, id_b,
            "bead_id={TEST_BEAD_ID} case=deterministic_changeset_id"
        );
    }
1433
    #[test]
    // Same payload hashed under a different domain must yield a different id.
    fn test_changeset_id_domain_separation() {
        let data = b"test payload";

        // Changeset domain
        let changeset_id = compute_changeset_id(data);

        // Different domain (simulating ECS)
        let mut hasher = blake3::Hasher::new();
        hasher.update(b"fsqlite:ecs:v1");
        hasher.update(data);
        let ecs_hash = hasher.finalize();
        let mut ecs_id = [0_u8; 16];
        ecs_id.copy_from_slice(&ecs_hash.as_bytes()[..16]);

        assert_ne!(
            changeset_id.as_bytes(),
            &ecs_id,
            "bead_id={TEST_BEAD_ID} case=domain_separation"
        );
    }
1455
    #[test]
    // Seed derivation is deterministic and non-trivial for a fixed id.
    fn test_seed_derivation() {
        let id = ChangesetId::from_bytes([1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16]);
        let seed = derive_seed_from_changeset_id(&id);

        // Deterministic: same id → same seed.
        let seed2 = derive_seed_from_changeset_id(&id);
        assert_eq!(
            seed, seed2,
            "bead_id={TEST_BEAD_ID} case=seed_deterministic"
        );

        // Non-trivial.
        assert_ne!(seed, 0, "bead_id={TEST_BEAD_ID} case=seed_nonzero");
    }
1471
    #[test]
    // Identical payloads share an id; a one-byte change produces a new id.
    fn test_bd_1squ_changeset_id_stability() {
        let payload = b"deterministic-changeset-payload";
        let id_a = compute_changeset_id(payload);
        let id_b = compute_changeset_id(payload);
        assert_eq!(
            id_a, id_b,
            "bead_id={TEST_BEAD_BD_1SQU} case=id_stability_same_payload"
        );

        let mut altered = payload.to_vec();
        altered[0] ^= 0xFF;
        let id_c = compute_changeset_id(&altered);
        assert_ne!(
            id_a, id_c,
            "bead_id={TEST_BEAD_BD_1SQU} case=id_stability_diff_payload"
        );
    }
1490
    #[test]
    // Seeds are stable per changeset id and differ across distinct ids.
    fn test_bd_1squ_seed_stability() {
        let id = compute_changeset_id(b"seed-stability");
        let seed_a = derive_seed_from_changeset_id(&id);
        let seed_b = derive_seed_from_changeset_id(&id);
        assert_eq!(
            seed_a, seed_b,
            "bead_id={TEST_BEAD_BD_1SQU} case=seed_stability_same_id"
        );

        let other = compute_changeset_id(b"seed-stability-other");
        let seed_other = derive_seed_from_changeset_id(&other);
        assert_ne!(
            seed_a, seed_other,
            "bead_id={TEST_BEAD_BD_1SQU} case=seed_stability_diff_id"
        );
    }
1508
    #[test]
    // k_source = ceil(len / symbol_size), including K_MAX boundary and the
    // rejection of a zero symbol size.
    fn test_bd_1squ_k_source_computation() {
        assert_eq!(
            compute_k_source(0, 256).expect("k_source"),
            0,
            "bead_id={TEST_BEAD_BD_1SQU} case=k_source_empty"
        );
        assert_eq!(
            compute_k_source(1, 256).expect("k_source"),
            1,
            "bead_id={TEST_BEAD_BD_1SQU} case=k_source_single_byte"
        );
        assert_eq!(
            compute_k_source(256, 256).expect("k_source"),
            1,
            "bead_id={TEST_BEAD_BD_1SQU} case=k_source_exact_division"
        );
        assert_eq!(
            compute_k_source(257, 256).expect("k_source"),
            2,
            "bead_id={TEST_BEAD_BD_1SQU} case=k_source_round_up"
        );
        assert_eq!(
            compute_k_source(usize::try_from(K_MAX).unwrap() * 64, 64).expect("k_source"),
            u64::from(K_MAX),
            "bead_id={TEST_BEAD_BD_1SQU} case=k_source_kmax_boundary"
        );
        assert_eq!(
            compute_k_source(usize::try_from(K_MAX).unwrap() * 64 + 1, 64).expect("k_source"),
            u64::from(K_MAX) + 1,
            "bead_id={TEST_BEAD_BD_1SQU} case=k_source_kmax_plus_one"
        );
        assert!(
            compute_k_source(10, 0).is_err(),
            "bead_id={TEST_BEAD_BD_1SQU} case=k_source_zero_symbol_rejected"
        );
    }
1546
    #[test]
    // Exactly K_MAX symbols fit one shard; one byte more spills a second
    // shard holding a single symbol.
    fn test_bd_1squ_sharding_threshold_rule() {
        let symbol_size = 64_u16;
        let max_payload = usize::try_from(u64::from(K_MAX) * u64::from(symbol_size)).unwrap();

        let exact = vec![0xA5_u8; max_payload];
        let exact_shards = shard_changeset(exact, symbol_size).expect("exact shard");
        assert_eq!(
            exact_shards.len(),
            1,
            "bead_id={TEST_BEAD_BD_1SQU} case=exact_threshold_single_shard"
        );
        assert_eq!(
            exact_shards[0].k_source, K_MAX,
            "bead_id={TEST_BEAD_BD_1SQU} case=exact_threshold_kmax"
        );

        let over = vec![0x5A_u8; max_payload + 1];
        let over_shards = shard_changeset(over, symbol_size).expect("over shard");
        assert_eq!(
            over_shards.len(),
            2,
            "bead_id={TEST_BEAD_BD_1SQU} case=over_threshold_two_shards"
        );
        assert_eq!(
            over_shards[0].k_source, K_MAX,
            "bead_id={TEST_BEAD_BD_1SQU} case=over_threshold_first_kmax"
        );
        assert_eq!(
            over_shards[1].k_source, 1,
            "bead_id={TEST_BEAD_BD_1SQU} case=over_threshold_second_one_symbol"
        );
    }
1580
    #[test]
    // encode_changeset sorts pages in place and writes a consistent header.
    fn test_page_entries_sorted() {
        let page_size = 128_u32;
        let mut pages = make_pages(page_size, &[5, 1, 3, 2, 4]);
        let bytes = encode_changeset(page_size, &mut pages).expect("encode");

        // Verify pages are sorted in the output.
        assert_eq!(pages[0].page_number, 1);
        assert_eq!(pages[1].page_number, 2);
        assert_eq!(pages[2].page_number, 3);
        assert_eq!(pages[3].page_number, 4);
        assert_eq!(pages[4].page_number, 5);

        // Verify total_len from header matches actual length.
        let header_bytes: [u8; CHANGESET_HEADER_SIZE] =
            bytes[..CHANGESET_HEADER_SIZE].try_into().unwrap();
        let header = ChangesetHeader::from_bytes(&header_bytes).expect("decode header");
        assert_eq!(
            header.total_len,
            bytes.len() as u64,
            "bead_id={TEST_BEAD_ID} case=total_len_matches"
        );
        assert_eq!(header.n_pages, 5);
    }
1605
    #[test]
    // A pristine page passes xxh3 validation; a flipped byte fails it.
    fn test_page_xxh3_validation() {
        let page = PageEntry::new(1, vec![0xAA; 4096]);
        assert!(
            page.validate_xxh3(),
            "bead_id={TEST_BEAD_ID} case=xxh3_valid"
        );

        // Tampered page fails validation.
        let mut tampered = page;
        tampered.page_bytes[0] ^= 0xFF;
        assert!(
            !tampered.validate_xxh3(),
            "bead_id={TEST_BEAD_ID} case=xxh3_tampered"
        );
    }
1622
1623    // -----------------------------------------------------------------------
1624    // UDP Packet format tests
1625    // -----------------------------------------------------------------------
1626
    #[test]
    // V2 packet encodes the documented fixed header layout and round-trips.
    fn test_udp_packet_format() {
        let id = ChangesetId::from_bytes([0xAA; 16]);
        let packet = ReplicationPacket::new_v2(
            ReplicationPacketV2Header {
                changeset_id: id,
                sbn: 0,
                esi: 42,
                k_source: 100,
                r_repair: 12,
                symbol_size_t: 512,
                seed: derive_seed_from_changeset_id(&id),
            },
            vec![0x55; 512],
        );

        let wire = packet.to_bytes().expect("encode");
        assert_eq!(
            wire.len(),
            REPLICATION_HEADER_SIZE + 512,
            "bead_id={TEST_BEAD_ID} case=packet_size"
        );

        // Header is versioned and fixed-size.
        assert_eq!(&wire[0..4], &REPLICATION_PROTOCOL_MAGIC);
        assert_eq!(wire[4], REPLICATION_PROTOCOL_VERSION_V2);
        assert_eq!(wire[5], 0, "flags");
        assert_eq!(&wire[8..24], &[0xAA; 16], "changeset_id");
        assert_eq!(wire[24], 0, "sbn");
        assert_eq!(&wire[25..28], &[0, 0, 42], "esi u24 big-endian");
        assert_eq!(&wire[28..32], &100_u32.to_be_bytes(), "k_source");
        assert_eq!(&wire[32..36], &12_u32.to_be_bytes(), "r_repair");
        assert_eq!(&wire[36..38], &512_u16.to_be_bytes(), "symbol_size_t");

        // Roundtrip.
        let decoded = ReplicationPacket::from_bytes(&wire).expect("decode");
        assert_eq!(
            packet, decoded,
            "bead_id={TEST_BEAD_ID} case=packet_roundtrip"
        );
    }
1668
    #[test]
    // Default symbol size keeps the full datagram within a 1500-byte MTU.
    fn test_udp_packet_mtu_safe() {
        // T=1400 → packet 1472 bytes. With IP(20) + UDP(8) = 1500 = Ethernet MTU.
        let t = usize::from(MTU_SAFE_SYMBOL_SIZE);
        let total = REPLICATION_HEADER_SIZE + t;
        assert_eq!(
            total, 1472,
            "bead_id={TEST_BEAD_ID} case=mtu_safe_packet_size"
        );
        // Plus IP + UDP headers: 1472 + 20 + 8 = 1500.
        assert_eq!(total + 20 + 8, 1500, "fits in Ethernet MTU");
    }
1681
    #[test]
    // Symbol size validation rejects just above the limit, accepts at it.
    fn test_hard_wire_limit() {
        // Symbol that exceeds the hard wire limit.
        let oversized = MAX_REPLICATION_SYMBOL_SIZE + 1;
        let result = ReplicationPacket::validate_symbol_size(oversized);
        assert!(
            result.is_err(),
            "bead_id={TEST_BEAD_ID} case=hard_wire_limit_rejected"
        );

        // At the limit: OK.
        let at_limit = MAX_REPLICATION_SYMBOL_SIZE;
        let result = ReplicationPacket::validate_symbol_size(at_limit);
        assert!(
            result.is_ok(),
            "bead_id={TEST_BEAD_ID} case=hard_wire_limit_at_max"
        );
    }
1700
1701    // -----------------------------------------------------------------------
1702    // State machine tests
1703    // -----------------------------------------------------------------------
1704
    #[test]
    // prepare() moves a fresh sender from IDLE to ENCODING.
    fn test_sender_idle_to_encoding() {
        let mut sender = ReplicationSender::new();
        assert_eq!(sender.state(), SenderState::Idle);

        let mut pages = make_pages(512, &[1, 2, 3]);
        sender
            .prepare(512, &mut pages, SenderConfig::default())
            .expect("prepare");
        assert_eq!(
            sender.state(),
            SenderState::Encoding,
            "bead_id={TEST_BEAD_ID} case=idle_to_encoding"
        );
    }
1720
1721    #[test]
1722    fn test_streaming_source_then_repair() {
1723        let mut sender = ReplicationSender::new();
1724        let mut pages = make_pages(512, &[1, 2]);
1725        let config = SenderConfig {
1726            symbol_size: 512,
1727            max_isi_multiplier: 2,
1728        };
1729        sender.prepare(512, &mut pages, config).expect("prepare");
1730        sender.start_streaming().expect("start");
1731
1732        let session = sender.session.as_ref().unwrap();
1733        let k_source = session.shards[0].k_source;
1734
1735        let mut source_count = 0_u32;
1736        let mut repair_count = 0_u32;
1737        let mut last_isi = 0_u32;
1738
1739        while let Some(packet) = sender.next_packet().expect("next") {
1740            if packet.is_source_symbol() {
1741                source_count += 1;
1742            } else {
1743                repair_count += 1;
1744            }
1745            last_isi = packet.esi;
1746        }
1747
1748        assert!(
1749            source_count > 0,
1750            "bead_id={TEST_BEAD_ID} case=has_source_symbols"
1751        );
1752        assert!(
1753            repair_count > 0,
1754            "bead_id={TEST_BEAD_ID} case=has_repair_symbols"
1755        );
1756        assert_eq!(
1757            source_count, k_source,
1758            "bead_id={TEST_BEAD_ID} case=source_count_matches_k"
1759        );
1760        assert_eq!(
1761            last_isi,
1762            k_source * 2 - 1,
1763            "bead_id={TEST_BEAD_ID} case=max_isi_reached"
1764        );
1765    }
1766
1767    #[test]
1768    fn test_streaming_systematic_first_ordering() {
1769        let mut sender = ReplicationSender::new();
1770        let mut pages = make_pages(512, &[1, 2]);
1771        let config = SenderConfig {
1772            symbol_size: 512,
1773            max_isi_multiplier: 2,
1774        };
1775        sender.prepare(512, &mut pages, config).expect("prepare");
1776        sender.start_streaming().expect("start");
1777
1778        let session = sender.session.as_ref().expect("session");
1779        let k_source = session.shards[0].k_source;
1780        let k_source_usize = usize::try_from(k_source).expect("K_source fits usize");
1781
1782        let mut observed_esis = Vec::new();
1783        while let Some(packet) = sender.next_packet().expect("next") {
1784            observed_esis.push(packet.esi);
1785        }
1786
1787        assert!(
1788            observed_esis.len() >= k_source_usize,
1789            "bead_id={TEST_BEAD_ID} case=have_at_least_k_source_packets"
1790        );
1791
1792        let expected_systematic: Vec<u32> = (0..k_source).collect();
1793        assert_eq!(
1794            &observed_esis[..k_source_usize],
1795            expected_systematic.as_slice(),
1796            "bead_id={TEST_BEAD_ID} case=systematic_first_ordering"
1797        );
1798
1799        if observed_esis.len() > k_source_usize {
1800            assert!(
1801                observed_esis[k_source_usize] >= k_source,
1802                "bead_id={TEST_BEAD_ID} case=repair_starts_after_systematic"
1803            );
1804        }
1805    }
1806
1807    #[test]
1808    fn test_streaming_schedule_deterministic_across_runs() {
1809        fn collect_packets(
1810            page_size: u32,
1811            page_numbers: &[u32],
1812            config: &SenderConfig,
1813        ) -> Vec<ReplicationPacket> {
1814            let mut sender = ReplicationSender::new();
1815            let mut pages = make_pages(page_size, page_numbers);
1816            sender
1817                .prepare(page_size, &mut pages, config.clone())
1818                .expect("prepare");
1819            sender.start_streaming().expect("start");
1820
1821            let mut packets = Vec::new();
1822            while let Some(packet) = sender.next_packet().expect("next") {
1823                packets.push(packet);
1824            }
1825            packets
1826        }
1827
1828        let config = SenderConfig {
1829            symbol_size: 256,
1830            max_isi_multiplier: 2,
1831        };
1832        let run_a = collect_packets(512, &[1, 3, 2], &config);
1833        let run_b = collect_packets(512, &[1, 3, 2], &config);
1834
1835        assert_eq!(
1836            run_a.len(),
1837            run_b.len(),
1838            "bead_id={TEST_BEAD_ID} case=deterministic_run_packet_count"
1839        );
1840        assert_eq!(
1841            run_a, run_b,
1842            "bead_id={TEST_BEAD_ID} case=deterministic_schedule_reproducible"
1843        );
1844    }
1845
1846    #[test]
1847    fn test_streaming_stop_on_ack() {
1848        let mut sender = ReplicationSender::new();
1849        let mut pages = make_pages(512, &[1]);
1850        sender
1851            .prepare(512, &mut pages, SenderConfig::default())
1852            .expect("prepare");
1853        sender.start_streaming().expect("start");
1854
1855        // Generate a few packets.
1856        let _p1 = sender.next_packet().expect("next").expect("packet");
1857
1858        // Receiver ACKs completion.
1859        sender.acknowledge_complete().expect("ack");
1860        assert_eq!(
1861            sender.state(),
1862            SenderState::Complete,
1863            "bead_id={TEST_BEAD_ID} case=stop_on_ack"
1864        );
1865        assert!(
1866            sender.next_packet().is_err(),
1867            "bead_id={TEST_BEAD_ID} case=no_packets_after_ack_complete"
1868        );
1869    }
1870
1871    #[test]
1872    fn test_streaming_stop_on_max_isi() {
1873        let mut sender = ReplicationSender::new();
1874        let mut pages = make_pages(128, &[1]);
1875        let config = SenderConfig {
1876            symbol_size: 128,
1877            max_isi_multiplier: 2,
1878        };
1879        sender.prepare(128, &mut pages, config).expect("prepare");
1880        sender.start_streaming().expect("start");
1881
1882        let mut count = 0_u32;
1883        while sender.next_packet().expect("next").is_some() {
1884            count += 1;
1885        }
1886
1887        // Should have generated exactly k_source * max_isi_multiplier packets.
1888        let session = sender.session.as_ref().unwrap();
1889        let expected = session.shards[0].k_source * 2;
1890        assert_eq!(
1891            count, expected,
1892            "bead_id={TEST_BEAD_ID} case=stop_on_max_isi"
1893        );
1894    }
1895
1896    #[test]
1897    fn test_block_size_limit_sharding() {
1898        // Create a changeset that exceeds K_MAX source symbols.
1899        let symbol_size = 64_u16;
1900        let bytes_per_max_block = u64::from(K_MAX) * u64::from(symbol_size);
1901        // Make changeset bytes just over the limit.
1902        let changeset_bytes = vec![0xAB_u8; usize::try_from(bytes_per_max_block).unwrap() + 1];
1903        let shards = shard_changeset(changeset_bytes.clone(), symbol_size).expect("shard");
1904
1905        assert!(
1906            shards.len() > 1,
1907            "bead_id={TEST_BEAD_ID} case=sharding_triggered shards={}",
1908            shards.len()
1909        );
1910
1911        // Each shard has k_source <= K_MAX.
1912        for (i, shard) in shards.iter().enumerate() {
1913            assert!(
1914                shard.k_source <= K_MAX,
1915                "bead_id={TEST_BEAD_ID} case=shard_k_max shard={i} k_source={}",
1916                shard.k_source
1917            );
1918        }
1919
1920        // All bytes covered.
1921        let total_bytes: usize = shards.iter().map(|s| s.changeset_bytes.len()).sum();
1922        assert_eq!(
1923            total_bytes,
1924            changeset_bytes.len(),
1925            "bead_id={TEST_BEAD_ID} case=sharding_coverage"
1926        );
1927    }
1928
1929    // -----------------------------------------------------------------------
1930    // Property tests
1931    // -----------------------------------------------------------------------
1932
1933    #[test]
1934    fn prop_changeset_id_unique() {
1935        let page_size = 128_u32;
1936        let mut ids = Vec::new();
1937        for seed in 0_u32..20 {
1938            let mut pages = vec![PageEntry::new(
1939                1,
1940                vec![u8::try_from(seed).unwrap(); page_size as usize],
1941            )];
1942            let bytes = encode_changeset(page_size, &mut pages).expect("encode");
1943            ids.push(compute_changeset_id(&bytes));
1944        }
1945
1946        // All IDs should be unique.
1947        for i in 0..ids.len() {
1948            for j in (i + 1)..ids.len() {
1949                assert_ne!(
1950                    ids[i], ids[j],
1951                    "bead_id={TEST_BEAD_ID} case=prop_id_unique i={i} j={j}"
1952                );
1953            }
1954        }
1955    }
1956
1957    #[test]
1958    fn prop_sharding_covers_all_pages() {
1959        let symbol_size = 64_u16;
1960        for size_multiplier in [1_u64, 2, 5] {
1961            let total = u64::from(K_MAX) * u64::from(symbol_size) * size_multiplier + 7;
1962            let changeset = vec![0xCC_u8; usize::try_from(total).unwrap()];
1963            let shards = shard_changeset(changeset.clone(), symbol_size).expect("shard");
1964
1965            let reassembled: Vec<u8> = shards
1966                .iter()
1967                .flat_map(|s| s.changeset_bytes.iter().copied())
1968                .collect();
1969
1970            assert_eq!(
1971                reassembled, changeset,
1972                "bead_id={TEST_BEAD_ID} case=prop_sharding_coverage multiplier={size_multiplier}"
1973            );
1974        }
1975    }
1976
1977    // -----------------------------------------------------------------------
1978    // Compliance tests
1979    // -----------------------------------------------------------------------
1980
    #[test]
    fn test_bd_1hi_13_unit_compliance_gate() {
        // Wire-format constants pinned by this test: changing any of these
        // values is a protocol break and must fail loudly here.
        assert_eq!(CHANGESET_MAGIC, *b"FSRP");
        assert_eq!(CHANGESET_VERSION, 1);
        assert_eq!(CHANGESET_HEADER_SIZE, 22);
        assert_eq!(REPLICATION_HEADER_SIZE_LEGACY, 24);
        assert_eq!(REPLICATION_HEADER_SIZE, 72);
        assert_eq!(REPLICATION_HEADER_SIZE_V2, 72);
        assert_eq!(MAX_UDP_PAYLOAD, 65_507);
        // Compile-time guard: the largest permitted symbol always fits in a
        // single UDP datagram.
        const { assert!(MAX_REPLICATION_SYMBOL_SIZE < MAX_UDP_PAYLOAD) };

        // Verify core functions exist (link/compile check only; results unused).
        let _ = ChangesetId::from_bytes([0; 16]);
        let _ = compute_changeset_id(b"test");
        let _ = derive_seed_from_changeset_id(&ChangesetId::from_bytes([0; 16]));
    }
1997
    #[test]
    fn prop_bd_1hi_13_structure_compliance() {
        // Walk the full sender lifecycle in order:
        // IDLE → ENCODING → STREAMING → COMPLETE → (reset) → IDLE.
        let mut sender = ReplicationSender::new();
        assert_eq!(sender.state(), SenderState::Idle);

        // prepare() encodes the changeset and enters ENCODING.
        let mut pages = make_pages(256, &[1, 2]);
        sender
            .prepare(256, &mut pages, SenderConfig::default())
            .expect("prepare");
        assert_eq!(sender.state(), SenderState::Encoding);

        sender.start_streaming().expect("start");
        assert_eq!(sender.state(), SenderState::Streaming);

        sender.complete();
        assert_eq!(sender.state(), SenderState::Complete);

        // reset() returns the machine to IDLE for reuse.
        sender.reset();
        assert_eq!(sender.state(), SenderState::Idle);
    }
2019
2020    // -----------------------------------------------------------------------
2021    // §4.19.6 networking policy tests (bd-i0m5)
2022    // -----------------------------------------------------------------------
2023
2024    #[test]
2025    fn test_tls_by_default() {
2026        let cfg = NetworkStackConfig::default();
2027        assert_eq!(cfg.security, TransportSecurityMode::RustlsTls);
2028        assert!(cfg.validate_security().is_ok());
2029    }
2030
2031    #[test]
2032    fn test_plaintext_requires_explicit_opt_in() {
2033        let cfg = NetworkStackConfig {
2034            security: TransportSecurityMode::Plaintext,
2035            explicit_plaintext_opt_in: false,
2036            ..NetworkStackConfig::default()
2037        };
2038        let err = cfg.validate_security().unwrap_err();
2039        assert!(matches!(err, FrankenError::Unsupported));
2040
2041        let opted_in = NetworkStackConfig::plaintext_local_dev(true).unwrap();
2042        assert_eq!(opted_in.security, TransportSecurityMode::Plaintext);
2043        assert!(opted_in.validate_security().is_ok());
2044    }
2045
2046    #[test]
2047    fn test_http2_max_concurrent_streams() {
2048        let cfg = NetworkStackConfig::default();
2049        assert!(
2050            cfg.validate_concurrent_streams(DEFAULT_HTTP2_MAX_CONCURRENT_STREAMS)
2051                .is_ok()
2052        );
2053        let err = cfg
2054            .validate_concurrent_streams(DEFAULT_HTTP2_MAX_CONCURRENT_STREAMS + 1)
2055            .unwrap_err();
2056        assert!(matches!(err, FrankenError::Busy));
2057    }
2058
2059    #[test]
2060    fn test_http2_max_header_list_size() {
2061        let cfg = NetworkStackConfig::default();
2062        assert!(
2063            cfg.validate_header_list_size(DEFAULT_HTTP2_MAX_HEADER_LIST_SIZE)
2064                .is_ok()
2065        );
2066        let err = cfg
2067            .validate_header_list_size(DEFAULT_HTTP2_MAX_HEADER_LIST_SIZE + 1)
2068            .unwrap_err();
2069        assert!(matches!(err, FrankenError::TooBig));
2070    }
2071
2072    #[test]
2073    fn test_http2_continuation_timeout() {
2074        let cfg = NetworkStackConfig::default();
2075        assert!(
2076            cfg.validate_continuation_elapsed(DEFAULT_HTTP2_CONTINUATION_TIMEOUT_MS)
2077                .is_ok()
2078        );
2079        let err = cfg
2080            .validate_continuation_elapsed(DEFAULT_HTTP2_CONTINUATION_TIMEOUT_MS + 1)
2081            .unwrap_err();
2082        assert!(matches!(err, FrankenError::BusyRecovery));
2083    }
2084
2085    #[test]
2086    fn test_message_size_cap_enforced() {
2087        let cfg = NetworkStackConfig::default();
2088        assert!(
2089            cfg.validate_message_size(DEFAULT_RPC_MESSAGE_CAP_BYTES)
2090                .is_ok()
2091        );
2092        let err = cfg
2093            .validate_message_size(DEFAULT_RPC_MESSAGE_CAP_BYTES + 1)
2094            .unwrap_err();
2095        assert!(matches!(err, FrankenError::TooBig));
2096    }
2097
2098    #[test]
2099    fn test_handshake_timeout_bounded() {
2100        let cfg = NetworkStackConfig {
2101            handshake_timeout_ms: DEFAULT_HANDSHAKE_TIMEOUT_MS,
2102            ..NetworkStackConfig::default()
2103        };
2104        assert!(
2105            cfg.validate_handshake_elapsed(DEFAULT_HANDSHAKE_TIMEOUT_MS)
2106                .is_ok()
2107        );
2108        let err = cfg
2109            .validate_handshake_elapsed(DEFAULT_HANDSHAKE_TIMEOUT_MS + 500)
2110            .unwrap_err();
2111        assert!(matches!(err, FrankenError::BusyRecovery));
2112    }
2113
2114    #[test]
2115    fn test_virtual_tcp_deterministic() {
2116        let faults = VirtualTcpFaultProfile {
2117            drop_per_million: 150_000,
2118            reorder_per_million: 200_000,
2119            corrupt_per_million: 125_000,
2120        };
2121        let payloads = vec![
2122            b"alpha".to_vec(),
2123            b"beta".to_vec(),
2124            b"gamma".to_vec(),
2125            b"delta".to_vec(),
2126            b"epsilon".to_vec(),
2127        ];
2128
2129        let mut left = VirtualTcp::new(42, faults).unwrap();
2130        let mut left_out = Vec::new();
2131        for payload in &payloads {
2132            left_out.extend(left.transmit(payload));
2133        }
2134        if let Some(flush) = left.flush() {
2135            left_out.push(flush);
2136        }
2137        let left_trace = left.trace().to_vec();
2138
2139        let mut right = VirtualTcp::new(42, faults).unwrap();
2140        let mut right_out = Vec::new();
2141        for payload in &payloads {
2142            right_out.extend(right.transmit(payload));
2143        }
2144        if let Some(flush) = right.flush() {
2145            right_out.push(flush);
2146        }
2147        let right_trace = right.trace().to_vec();
2148
2149        assert_eq!(left_out, right_out);
2150        assert_eq!(left_trace, right_trace);
2151    }
2152
2153    #[test]
2154    fn test_virtual_tcp_fault_injection() {
2155        let mut vtcp = VirtualTcp::new(
2156            7,
2157            VirtualTcpFaultProfile {
2158                drop_per_million: 0,
2159                reorder_per_million: 1_000_000,
2160                corrupt_per_million: 1_000_000,
2161            },
2162        )
2163        .unwrap();
2164
2165        let out_first = vtcp.transmit(b"packet-a");
2166        assert!(out_first.is_empty(), "first packet must be buffered");
2167
2168        let out_second = vtcp.transmit(b"packet-b");
2169        assert_eq!(out_second.len(), 2, "second transmit flushes reorder queue");
2170        assert_ne!(
2171            out_second[0],
2172            b"packet-b".to_vec(),
2173            "corruption must alter delivered payload"
2174        );
2175
2176        let has_buffer = vtcp
2177            .trace()
2178            .iter()
2179            .any(|event| event.kind == VirtualTcpTraceKind::BufferedForReorder);
2180        let has_corrupt_delivery = vtcp
2181            .trace()
2182            .iter()
2183            .any(|event| event.kind == VirtualTcpTraceKind::DeliveredCorrupt);
2184        let has_flush = vtcp
2185            .trace()
2186            .iter()
2187            .any(|event| event.kind == VirtualTcpTraceKind::FlushedReordered);
2188
2189        assert!(has_buffer);
2190        assert!(has_corrupt_delivery);
2191        assert!(has_flush);
2192    }
2193}