1use std::fmt;
12
13use fsqlite_error::{FrankenError, Result};
14use tracing::{debug, error, info, warn};
15
16use crate::source_block_partition::K_MAX;
17
/// Bead identifier attached to the tracing events emitted by this module.
const BEAD_ID: &str = "bd-1hi.13";

/// Magic bytes that open every encoded changeset header.
pub const CHANGESET_MAGIC: [u8; 4] = *b"FSRP";

/// Changeset format version written by the encoder and required by the decoder.
pub const CHANGESET_VERSION: u16 = 1;

/// Domain-separation string hashed ahead of the changeset bytes when deriving a
/// [`ChangesetId`].
pub const CHANGESET_DOMAIN: &str = "fsqlite:replication:changeset:v1";

/// Size in bytes of the framed (V2) replication packet header.
pub const REPLICATION_HEADER_SIZE: usize = 72;

/// Size in bytes of the legacy (V1) replication packet header.
pub const REPLICATION_HEADER_SIZE_LEGACY: usize = 24;

/// Magic bytes that open a framed (V2) replication packet.
pub const REPLICATION_PROTOCOL_MAGIC: [u8; 4] = *b"FSRP";

/// Version byte that follows the magic in a framed (V2) replication packet.
pub const REPLICATION_PROTOCOL_VERSION_V2: u8 = 2;

/// Alias of [`REPLICATION_HEADER_SIZE`] under a V2-specific name.
pub const REPLICATION_HEADER_SIZE_V2: usize = REPLICATION_HEADER_SIZE;
/// The V2 header size as a `u16`, the width of the header-length field on the wire.
pub const REPLICATION_HEADER_SIZE_V2_U16: u16 = 72;

/// Flag bit set in the V2 header when the packet carries an auth tag.
pub const REPLICATION_FLAG_AUTH_PRESENT: u8 = 0b0000_0001;

/// Domain-separation string hashed ahead of the packet fields when computing an
/// auth tag.
pub const REPLICATION_PACKET_AUTH_DOMAIN: &str = "fsqlite:replication:packet-auth:v1";

/// Largest UDP payload, in bytes, that this module assumes a datagram can carry.
pub const MAX_UDP_PAYLOAD: usize = 65_507;

/// Largest symbol size that still fits in one datagram next to a V2 header.
pub const MAX_REPLICATION_SYMBOL_SIZE: usize = MAX_UDP_PAYLOAD - REPLICATION_HEADER_SIZE;

/// Default symbol size used by [`SenderConfig::default`].
pub const MTU_SAFE_SYMBOL_SIZE: u16 = 1400;

/// Default multiplier applied to a shard's `k_source` to bound how many symbols
/// the sender emits for that shard.
pub const DEFAULT_MAX_ISI_MULTIPLIER: u32 = 2;

/// Default cap on a single message, in bytes (4 MiB).
pub const DEFAULT_RPC_MESSAGE_CAP_BYTES: usize = 4 * 1024 * 1024;

/// Default limit on concurrent HTTP/2 streams.
pub const DEFAULT_HTTP2_MAX_CONCURRENT_STREAMS: u32 = 256;

/// Default limit on the HTTP/2 header list size, in bytes.
pub const DEFAULT_HTTP2_MAX_HEADER_LIST_SIZE: usize = 65_536;

/// Default HTTP/2 continuation timeout, in milliseconds.
pub const DEFAULT_HTTP2_CONTINUATION_TIMEOUT_MS: u64 = 5_000;

/// Default cap on accumulated HTTP/2 header fragments, in bytes.
pub const DEFAULT_HTTP2_HEADER_FRAGMENT_CAP: usize = 262_144;

/// Default handshake timeout, in milliseconds.
pub const DEFAULT_HANDSHAKE_TIMEOUT_MS: u64 = 500;
/// Size in bytes of an encoded [`ChangesetHeader`]:
/// magic (4) + version (2) + page_size (4) + n_pages (4) + total_len (8).
pub const CHANGESET_HEADER_SIZE: usize = 4 + 2 + 4 + 4 + 8;

/// Transport security mode selected for the network stack.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum TransportSecurityMode {
    /// TLS transport; this is what [`NetworkStackConfig::default`] selects.
    RustlsTls,
    /// Unencrypted transport; only accepted together with an explicit opt-in.
    Plaintext,
}
102
/// Hard limits applied to HTTP/2 traffic.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct Http2HardLimits {
    /// Maximum number of concurrent streams.
    pub max_concurrent_streams: u32,
    /// Maximum header list size, in bytes.
    pub max_header_list_size: usize,
    /// Continuation timeout, in milliseconds.
    pub continuation_timeout_ms: u64,
    /// Cap on accumulated header fragments, in bytes.
    pub header_fragment_cap: usize,
}

impl Default for Http2HardLimits {
    /// Builds the limits from the module's `DEFAULT_HTTP2_*` constants.
    fn default() -> Self {
        Self {
            max_concurrent_streams: DEFAULT_HTTP2_MAX_CONCURRENT_STREAMS,
            max_header_list_size: DEFAULT_HTTP2_MAX_HEADER_LIST_SIZE,
            continuation_timeout_ms: DEFAULT_HTTP2_CONTINUATION_TIMEOUT_MS,
            header_fragment_cap: DEFAULT_HTTP2_HEADER_FRAGMENT_CAP,
        }
    }
}
122
/// Configuration of the network stack: security mode, timeouts and size caps.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct NetworkStackConfig {
    /// Selected transport security mode.
    pub security: TransportSecurityMode,
    /// Must be `true` for [`TransportSecurityMode::Plaintext`] to pass
    /// [`NetworkStackConfig::validate_security`].
    pub explicit_plaintext_opt_in: bool,
    /// Handshake timeout, in milliseconds.
    pub handshake_timeout_ms: u64,
    /// Cap on a single message, in bytes.
    pub message_size_cap_bytes: usize,
    /// HTTP/2 hard limits.
    pub http2: Http2HardLimits,
}

impl Default for NetworkStackConfig {
    /// TLS transport, no plaintext opt-in, and the module's default timeout,
    /// message cap and HTTP/2 limits.
    fn default() -> Self {
        Self {
            security: TransportSecurityMode::RustlsTls,
            explicit_plaintext_opt_in: false,
            handshake_timeout_ms: DEFAULT_HANDSHAKE_TIMEOUT_MS,
            message_size_cap_bytes: DEFAULT_RPC_MESSAGE_CAP_BYTES,
            http2: Http2HardLimits::default(),
        }
    }
}
144
145impl NetworkStackConfig {
146 pub fn plaintext_local_dev(explicit_opt_in: bool) -> Result<Self> {
153 if !explicit_opt_in {
154 return Err(FrankenError::Unsupported);
155 }
156 Ok(Self {
157 security: TransportSecurityMode::Plaintext,
158 explicit_plaintext_opt_in: true,
159 ..Self::default()
160 })
161 }
162
163 pub fn validate_security(&self) -> Result<()> {
169 if self.security == TransportSecurityMode::Plaintext && !self.explicit_plaintext_opt_in {
170 return Err(FrankenError::Unsupported);
171 }
172 Ok(())
173 }
174
175 pub fn validate_concurrent_streams(&self, streams: u32) -> Result<()> {
181 if streams > self.http2.max_concurrent_streams {
182 return Err(FrankenError::Busy);
183 }
184 Ok(())
185 }
186
187 pub fn validate_header_list_size(&self, header_bytes: usize) -> Result<()> {
193 if header_bytes > self.http2.max_header_list_size {
194 return Err(FrankenError::TooBig);
195 }
196 Ok(())
197 }
198
199 pub fn validate_continuation_elapsed(&self, elapsed_ms: u64) -> Result<()> {
206 if elapsed_ms > self.http2.continuation_timeout_ms {
207 return Err(FrankenError::BusyRecovery);
208 }
209 Ok(())
210 }
211
212 pub fn validate_handshake_elapsed(&self, elapsed_ms: u64) -> Result<()> {
218 if elapsed_ms > self.handshake_timeout_ms {
219 return Err(FrankenError::BusyRecovery);
220 }
221 Ok(())
222 }
223
224 pub fn validate_message_size(&self, message_bytes: usize) -> Result<()> {
230 if message_bytes > self.message_size_cap_bytes {
231 return Err(FrankenError::TooBig);
232 }
233 Ok(())
234 }
235}
236
237#[derive(Debug, Clone, Copy, PartialEq, Eq)]
239pub struct VirtualTcpFaultProfile {
240 pub drop_per_million: u32,
241 pub reorder_per_million: u32,
242 pub corrupt_per_million: u32,
243}
244
245impl VirtualTcpFaultProfile {
246 pub fn validate(&self) -> Result<()> {
252 const PPM_MAX: u32 = 1_000_000;
253 if self.drop_per_million > PPM_MAX {
254 return Err(FrankenError::OutOfRange {
255 what: "drop_per_million".to_owned(),
256 value: self.drop_per_million.to_string(),
257 });
258 }
259 if self.reorder_per_million > PPM_MAX {
260 return Err(FrankenError::OutOfRange {
261 what: "reorder_per_million".to_owned(),
262 value: self.reorder_per_million.to_string(),
263 });
264 }
265 if self.corrupt_per_million > PPM_MAX {
266 return Err(FrankenError::OutOfRange {
267 what: "corrupt_per_million".to_owned(),
268 value: self.corrupt_per_million.to_string(),
269 });
270 }
271 Ok(())
272 }
273}
274
/// What [`VirtualTcp`] did with a payload.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum VirtualTcpTraceKind {
    /// The payload was dropped and never delivered.
    Dropped,
    /// The payload was held back so that a later payload overtakes it.
    BufferedForReorder,
    /// The payload was delivered unmodified.
    Delivered,
    /// The payload was delivered with one bit flipped.
    DeliveredCorrupt,
    /// A previously held-back payload was released.
    FlushedReordered,
}

/// One entry of the [`VirtualTcp`] trace.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct VirtualTcpTraceEvent {
    /// Value of the transport's sequence counter when the event was recorded.
    pub seq: u64,
    /// What happened to the payload.
    pub kind: VirtualTcpTraceKind,
    /// XXH3-64 hash of the payload bytes the event refers to.
    pub payload_hash: u64,
}
292
293#[derive(Debug, Clone)]
295pub struct VirtualTcp {
296 state: u64,
297 seq: u64,
298 faults: VirtualTcpFaultProfile,
299 pending_reorder: Option<Vec<u8>>,
300 trace: Vec<VirtualTcpTraceEvent>,
301}
302
303impl VirtualTcp {
304 pub fn new(seed: u64, faults: VirtualTcpFaultProfile) -> Result<Self> {
310 faults.validate()?;
311 Ok(Self {
312 state: seed,
313 seq: 0,
314 faults,
315 pending_reorder: None,
316 trace: Vec::new(),
317 })
318 }
319
320 #[must_use]
322 pub fn trace(&self) -> &[VirtualTcpTraceEvent] {
323 &self.trace
324 }
325
326 #[must_use]
330 pub fn transmit(&mut self, payload: &[u8]) -> Vec<Vec<u8>> {
331 self.seq = self.seq.saturating_add(1);
332
333 if self.coin_flip(self.faults.drop_per_million) {
334 self.push_trace(VirtualTcpTraceKind::Dropped, payload);
335 return Vec::new();
336 }
337
338 let mut wire = payload.to_vec();
339 let corrupted = if !wire.is_empty() && self.coin_flip(self.faults.corrupt_per_million) {
340 let idx = (self.next_u32() as usize) % wire.len();
341 wire[idx] ^= 0x01;
342 true
343 } else {
344 false
345 };
346
347 if self.coin_flip(self.faults.reorder_per_million) && self.pending_reorder.is_none() {
348 self.push_trace(VirtualTcpTraceKind::BufferedForReorder, &wire);
349 self.pending_reorder = Some(wire);
350 return Vec::new();
351 }
352
353 let mut out = Vec::with_capacity(2);
354 if let Some(previous) = self.pending_reorder.take() {
355 let kind = if corrupted {
356 VirtualTcpTraceKind::DeliveredCorrupt
357 } else {
358 VirtualTcpTraceKind::Delivered
359 };
360 self.push_trace(kind, &wire);
361 out.push(wire);
362 self.push_trace(VirtualTcpTraceKind::FlushedReordered, &previous);
363 out.push(previous);
364 return out;
365 }
366
367 let kind = if corrupted {
368 VirtualTcpTraceKind::DeliveredCorrupt
369 } else {
370 VirtualTcpTraceKind::Delivered
371 };
372 self.push_trace(kind, &wire);
373 out.push(wire);
374 out
375 }
376
377 pub fn flush(&mut self) -> Option<Vec<u8>> {
379 let pending = self.pending_reorder.take()?;
380 self.seq = self.seq.saturating_add(1);
381 self.push_trace(VirtualTcpTraceKind::FlushedReordered, &pending);
382 Some(pending)
383 }
384
385 fn push_trace(&mut self, kind: VirtualTcpTraceKind, payload: &[u8]) {
386 self.trace.push(VirtualTcpTraceEvent {
387 seq: self.seq,
388 kind,
389 payload_hash: xxhash_rust::xxh3::xxh3_64(payload),
390 });
391 }
392
393 fn coin_flip(&mut self, per_million: u32) -> bool {
394 const PPM_MAX: u32 = 1_000_000;
395 if per_million == 0 {
396 return false;
397 }
398 if per_million >= PPM_MAX {
399 return true;
400 }
401 self.next_u32() % PPM_MAX < per_million
402 }
403
404 fn next_u32(&mut self) -> u32 {
405 self.state = self
407 .state
408 .wrapping_mul(6_364_136_223_846_793_005)
409 .wrapping_add(1);
410 (self.state >> 32) as u32
411 }
412}
413
414#[derive(Debug, Clone, Copy, PartialEq, Eq)]
420pub struct ChangesetHeader {
421 pub magic: [u8; 4],
422 pub version: u16,
423 pub page_size: u32,
424 pub n_pages: u32,
425 pub total_len: u64,
426}
427
428impl ChangesetHeader {
429 #[must_use]
431 pub fn to_bytes(&self) -> [u8; CHANGESET_HEADER_SIZE] {
432 let mut buf = [0_u8; CHANGESET_HEADER_SIZE];
433 buf[0..4].copy_from_slice(&self.magic);
434 buf[4..6].copy_from_slice(&self.version.to_le_bytes());
435 buf[6..10].copy_from_slice(&self.page_size.to_le_bytes());
436 buf[10..14].copy_from_slice(&self.n_pages.to_le_bytes());
437 buf[14..22].copy_from_slice(&self.total_len.to_le_bytes());
438 buf
439 }
440
441 pub fn from_bytes(buf: &[u8; CHANGESET_HEADER_SIZE]) -> Result<Self> {
447 let magic: [u8; 4] = buf[0..4].try_into().expect("4 bytes");
448 if magic != CHANGESET_MAGIC {
449 return Err(FrankenError::DatabaseCorrupt {
450 detail: format!("changeset magic mismatch: expected FSRP, got {magic:?}"),
451 });
452 }
453 let version = u16::from_le_bytes(buf[4..6].try_into().expect("2 bytes"));
454 if version != CHANGESET_VERSION {
455 return Err(FrankenError::DatabaseCorrupt {
456 detail: format!(
457 "changeset version mismatch: expected {CHANGESET_VERSION}, got {version}"
458 ),
459 });
460 }
461 let page_size = u32::from_le_bytes(buf[6..10].try_into().expect("4 bytes"));
462 let n_pages = u32::from_le_bytes(buf[10..14].try_into().expect("4 bytes"));
463 let total_len = u64::from_le_bytes(buf[14..22].try_into().expect("8 bytes"));
464 Ok(Self {
465 magic,
466 version,
467 page_size,
468 n_pages,
469 total_len,
470 })
471 }
472}
473
474#[derive(Debug, Clone, PartialEq, Eq)]
476pub struct PageEntry {
477 pub page_number: u32,
478 pub page_xxh3: u64,
479 pub page_bytes: Vec<u8>,
480}
481
482impl PageEntry {
483 #[must_use]
485 pub fn new(page_number: u32, page_bytes: Vec<u8>) -> Self {
486 let page_xxh3 = xxhash_rust::xxh3::xxh3_64(&page_bytes);
487 Self {
488 page_number,
489 page_xxh3,
490 page_bytes,
491 }
492 }
493
494 #[must_use]
496 pub fn validate_xxh3(&self) -> bool {
497 xxhash_rust::xxh3::xxh3_64(&self.page_bytes) == self.page_xxh3
498 }
499}
500
/// 16-byte identifier of a changeset.
#[derive(Clone, Copy, PartialEq, Eq, Hash)]
pub struct ChangesetId([u8; 16]);

impl ChangesetId {
    /// Borrows the raw identifier bytes.
    #[must_use]
    pub const fn as_bytes(&self) -> &[u8; 16] {
        let Self(raw) = self;
        raw
    }

    /// Wraps raw bytes as an identifier.
    #[must_use]
    pub const fn from_bytes(bytes: [u8; 16]) -> Self {
        Self(bytes)
    }
}

impl fmt::Debug for ChangesetId {
    /// Formats as `ChangesetId(<32 lowercase hex digits>)`.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.write_str("ChangesetId(")?;
        self.0
            .iter()
            .try_for_each(|octet| write!(f, "{octet:02x}"))?;
        f.write_str(")")
    }
}
528
529#[must_use]
531pub fn compute_changeset_id(changeset_bytes: &[u8]) -> ChangesetId {
532 let mut hasher = blake3::Hasher::new();
533 hasher.update(CHANGESET_DOMAIN.as_bytes());
534 hasher.update(changeset_bytes);
535 let hash = hasher.finalize();
536 let mut id = [0_u8; 16];
537 id.copy_from_slice(&hash.as_bytes()[..16]);
538 ChangesetId(id)
539}
540
541#[must_use]
543pub fn derive_seed_from_changeset_id(id: &ChangesetId) -> u64 {
544 xxhash_rust::xxh3::xxh3_64(id.as_bytes())
545}
546
547pub fn compute_k_source(total_bytes: usize, symbol_size: u16) -> Result<u64> {
555 if symbol_size == 0 {
556 return Err(FrankenError::OutOfRange {
557 what: "symbol_size".to_owned(),
558 value: "0".to_owned(),
559 });
560 }
561 let f = u64::try_from(total_bytes).map_err(|_| FrankenError::OutOfRange {
562 what: "total_bytes".to_owned(),
563 value: total_bytes.to_string(),
564 })?;
565 let t = u64::from(symbol_size);
566 Ok(f.div_ceil(t))
567}
568
569fn canonicalize_changeset_pages(pages: &mut [PageEntry]) {
575 pages.sort_by(|lhs, rhs| {
576 lhs.page_number
577 .cmp(&rhs.page_number)
578 .then_with(|| lhs.page_xxh3.cmp(&rhs.page_xxh3))
579 .then_with(|| lhs.page_bytes.cmp(&rhs.page_bytes))
580 });
581}
582
583pub fn encode_changeset(page_size: u32, pages: &mut [PageEntry]) -> Result<Vec<u8>> {
596 if pages.is_empty() {
597 return Err(FrankenError::OutOfRange {
598 what: "pages".to_owned(),
599 value: "0".to_owned(),
600 });
601 }
602 if page_size == 0 {
603 return Err(FrankenError::OutOfRange {
604 what: "page_size".to_owned(),
605 value: "0".to_owned(),
606 });
607 }
608
609 canonicalize_changeset_pages(pages);
610
611 let n_pages = u32::try_from(pages.len()).map_err(|_| FrankenError::OutOfRange {
612 what: "n_pages".to_owned(),
613 value: pages.len().to_string(),
614 })?;
615
616 let entry_size = 4_u64 + 8 + u64::from(page_size);
618 let total_len = CHANGESET_HEADER_SIZE as u64 + entry_size * u64::from(n_pages);
619
620 let header = ChangesetHeader {
621 magic: CHANGESET_MAGIC,
622 version: CHANGESET_VERSION,
623 page_size,
624 n_pages,
625 total_len,
626 };
627
628 let buf_cap = usize::try_from(total_len).map_err(|_| FrankenError::OutOfRange {
629 what: "changeset total_len".to_owned(),
630 value: total_len.to_string(),
631 })?;
632 let mut buf = Vec::with_capacity(buf_cap);
633 buf.extend_from_slice(&header.to_bytes());
634
635 for page in pages.iter() {
636 buf.extend_from_slice(&page.page_number.to_le_bytes());
637 buf.extend_from_slice(&page.page_xxh3.to_le_bytes());
638 buf.extend_from_slice(&page.page_bytes);
639 }
640
641 debug!(
642 bead_id = BEAD_ID,
643 n_pages, page_size, total_len, "encoded changeset"
644 );
645
646 debug_assert_eq!(buf.len() as u64, total_len);
647 Ok(buf)
648}
649
/// One independently identified piece of a changeset, as produced by
/// [`shard_changeset`].
#[derive(Debug, Clone)]
pub struct ChangesetShard {
    /// Bytes covered by this shard.
    pub changeset_bytes: Vec<u8>,
    /// Id computed over this shard's bytes.
    pub changeset_id: ChangesetId,
    /// Seed derived from `changeset_id`.
    pub seed: u64,
    /// Number of source symbols needed to cover this shard.
    pub k_source: u32,
}
666
667pub fn shard_changeset(changeset_bytes: Vec<u8>, symbol_size: u16) -> Result<Vec<ChangesetShard>> {
680 let t = u64::from(symbol_size);
681 let f = u64::try_from(changeset_bytes.len()).map_err(|_| FrankenError::OutOfRange {
682 what: "changeset_bytes".to_owned(),
683 value: changeset_bytes.len().to_string(),
684 })?;
685 let k_source_total = compute_k_source(changeset_bytes.len(), symbol_size)?;
686
687 if k_source_total <= u64::from(K_MAX) {
688 let id = compute_changeset_id(&changeset_bytes);
689 let seed = derive_seed_from_changeset_id(&id);
690 let k_source = u32::try_from(k_source_total).expect("checked <= K_MAX");
691 info!(
692 bead_id = BEAD_ID,
693 k_source,
694 symbol_size,
695 changeset_len = changeset_bytes.len(),
696 "single-shard changeset"
697 );
698 return Ok(vec![ChangesetShard {
699 changeset_bytes,
700 changeset_id: id,
701 seed,
702 k_source,
703 }]);
704 }
705
706 let max_chunk = u64::from(K_MAX) * t;
709 let n_shards = f.div_ceil(max_chunk);
710
711 info!(
712 bead_id = BEAD_ID,
713 n_shards,
714 k_source_total,
715 symbol_size,
716 changeset_len = changeset_bytes.len(),
717 "sharding large changeset"
718 );
719
720 let n_shards_usize = usize::try_from(n_shards).map_err(|_| FrankenError::OutOfRange {
721 what: "n_shards".to_owned(),
722 value: n_shards.to_string(),
723 })?;
724 let mut shards = Vec::with_capacity(n_shards_usize);
725 let max_chunk_usize = usize::try_from(max_chunk).map_err(|_| FrankenError::OutOfRange {
726 what: "max_chunk".to_owned(),
727 value: max_chunk.to_string(),
728 })?;
729
730 for (i, chunk) in changeset_bytes.chunks(max_chunk_usize).enumerate() {
731 let shard_bytes = chunk.to_vec();
732 let id = compute_changeset_id(&shard_bytes);
733 let seed = derive_seed_from_changeset_id(&id);
734 let k = compute_k_source(chunk.len(), symbol_size)?;
735 let k_source = u32::try_from(k).expect("each shard <= K_MAX symbols");
736
737 debug!(
738 bead_id = BEAD_ID,
739 shard_index = i,
740 k_source,
741 shard_len = chunk.len(),
742 "created changeset shard"
743 );
744
745 shards.push(ChangesetShard {
746 changeset_bytes: shard_bytes,
747 changeset_id: id,
748 seed,
749 k_source,
750 });
751 }
752
753 Ok(shards)
754}
755
/// A replication packet: header fields plus one symbol of payload.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ReplicationPacket {
    /// Wire format used when the packet is serialized.
    pub wire_version: ReplicationWireVersion,
    /// Id of the changeset (or shard) the symbol belongs to.
    pub changeset_id: ChangesetId,
    /// Source block number.
    pub sbn: u8,
    /// Symbol id; must fit in 24 bits to be serialized.
    pub esi: u32,
    /// Number of source symbols in the block.
    pub k_source: u32,
    /// Number of repair symbols; not carried by the legacy format.
    pub r_repair: u32,
    /// Symbol size in bytes; must equal `symbol_data.len()` to be serialized.
    pub symbol_size_t: u16,
    /// Seed associated with the changeset.
    pub seed: u64,
    /// XXH3-64 hash recorded for `symbol_data`.
    pub payload_xxh3: u64,
    /// Optional 16-byte authentication tag (V2 only).
    pub auth_tag: Option<[u8; 16]>,
    /// Symbol payload.
    pub symbol_data: Vec<u8>,
}

/// Wire formats understood by [`ReplicationPacket`].
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ReplicationWireVersion {
    /// Legacy format with a 24-byte header.
    LegacyV1,
    /// Framed format with a 72-byte header opened by magic and version.
    FramedV2,
}

/// Header fields a caller supplies to [`ReplicationPacket::new_v2`].
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct ReplicationPacketV2Header {
    /// Id of the changeset (or shard) the symbol belongs to.
    pub changeset_id: ChangesetId,
    /// Source block number.
    pub sbn: u8,
    /// Symbol id.
    pub esi: u32,
    /// Number of source symbols in the block.
    pub k_source: u32,
    /// Number of repair symbols.
    pub r_repair: u32,
    /// Symbol size in bytes.
    pub symbol_size_t: u16,
    /// Seed associated with the changeset.
    pub seed: u64,
}
807
impl ReplicationPacket {
    /// Builds a V2 packet from `header` and `symbol_data`.
    ///
    /// The payload hash is computed here; no auth tag is attached.
    #[must_use]
    pub fn new_v2(header: ReplicationPacketV2Header, symbol_data: Vec<u8>) -> Self {
        let payload_xxh3 = Self::compute_payload_xxh3(&symbol_data);
        Self {
            wire_version: ReplicationWireVersion::FramedV2,
            changeset_id: header.changeset_id,
            sbn: header.sbn,
            esi: header.esi,
            k_source: header.k_source,
            r_repair: header.r_repair,
            symbol_size_t: header.symbol_size_t,
            seed: header.seed,
            payload_xxh3,
            auth_tag: None,
            symbol_data,
        }
    }

    /// Returns the XXH3-64 hash of `symbol_data`.
    #[must_use]
    pub fn compute_payload_xxh3(symbol_data: &[u8]) -> u64 {
        xxhash_rust::xxh3::xxh3_64(symbol_data)
    }

    /// Serializes the header fields covered by the auth tag, big-endian, in a
    /// fixed order. The payload is covered indirectly through `payload_xxh3`.
    fn auth_material(&self) -> Vec<u8> {
        let mut material = Vec::with_capacity(16 + 1 + 4 + 4 + 2 + 8 + 8);
        material.extend_from_slice(self.changeset_id.as_bytes());
        material.push(self.sbn);
        material.extend_from_slice(&self.esi.to_be_bytes());
        material.extend_from_slice(&self.k_source.to_be_bytes());
        material.extend_from_slice(&self.r_repair.to_be_bytes());
        material.extend_from_slice(&self.symbol_size_t.to_be_bytes());
        material.extend_from_slice(&self.seed.to_be_bytes());
        material.extend_from_slice(&self.payload_xxh3.to_be_bytes());
        material
    }

    /// Computes the auth tag: the first 16 bytes of keyed BLAKE3 over
    /// [`REPLICATION_PACKET_AUTH_DOMAIN`] followed by [`Self::auth_material`].
    fn compute_auth_tag(&self, auth_key: &[u8; 32]) -> [u8; 16] {
        let mut hasher = blake3::Hasher::new_keyed(auth_key);
        hasher.update(REPLICATION_PACKET_AUTH_DOMAIN.as_bytes());
        hasher.update(&self.auth_material());
        let digest = hasher.finalize();
        let mut out = [0_u8; 16];
        out.copy_from_slice(&digest.as_bytes()[..16]);
        out
    }

    /// Computes the auth tag for the packet's current fields and stores it.
    pub fn attach_auth_tag(&mut self, auth_key: &[u8; 32]) {
        self.auth_tag = Some(self.compute_auth_tag(auth_key));
    }

    /// Checks the payload hash and, when a tag is present, the auth tag.
    ///
    /// Returns `false` when the payload hash does not match, when a tag is
    /// present but no key is supplied, or when the tag does not match the key.
    #[must_use]
    pub fn verify_integrity(&self, auth_key: Option<&[u8; 32]>) -> bool {
        if Self::compute_payload_xxh3(&self.symbol_data) != self.payload_xxh3 {
            return false;
        }
        match (self.auth_tag, auth_key) {
            (Some(tag), Some(key)) => tag == self.compute_auth_tag(key),
            (Some(_), None) => false,
            // NOTE(review): an untagged packet passes even when a key is supplied,
            // so a caller that requires authentication must also check `auth_tag`
            // itself — confirm this is intended.
            (None, _) => true,
        }
    }

    /// Checks that a symbol size fits in one datagram next to a V2 header.
    ///
    /// # Errors
    ///
    /// Returns [`FrankenError::OutOfRange`] (after logging an error event) when
    /// `symbol_size` exceeds [`MAX_REPLICATION_SYMBOL_SIZE`].
    pub fn validate_symbol_size(symbol_size: usize) -> Result<()> {
        if symbol_size > MAX_REPLICATION_SYMBOL_SIZE {
            error!(
                bead_id = BEAD_ID,
                symbol_size,
                max = MAX_REPLICATION_SYMBOL_SIZE,
                "symbol size exceeds UDP hard wire limit"
            );
            return Err(FrankenError::OutOfRange {
                what: "symbol_size".to_owned(),
                value: symbol_size.to_string(),
            });
        }
        Ok(())
    }

    /// Serializes the packet in the format selected by `wire_version`.
    ///
    /// Legacy layout (24-byte header): id (16), sbn (1), esi (3), k_source (4),
    /// then the payload. V2 layout (72-byte header): magic (4), version (1),
    /// flags (1), header length (2), id (16), sbn (1), esi (3), k_source (4),
    /// r_repair (4), symbol size (2), two zero bytes, seed (8), payload hash
    /// (8), auth tag (16, zero-filled when absent), then the payload.
    /// Multi-byte integers are big-endian.
    ///
    /// # Errors
    ///
    /// Returns [`FrankenError::OutOfRange`] when `esi` does not fit in 24 bits
    /// or the payload exceeds [`MAX_REPLICATION_SYMBOL_SIZE`], and
    /// [`FrankenError::DatabaseCorrupt`] when `symbol_size_t` differs from the
    /// payload length.
    pub fn to_bytes(&self) -> Result<Vec<u8>> {
        if self.esi > 0x00FF_FFFF {
            return Err(FrankenError::OutOfRange {
                what: "esi".to_owned(),
                value: self.esi.to_string(),
            });
        }
        if usize::from(self.symbol_size_t) != self.symbol_data.len() {
            return Err(FrankenError::DatabaseCorrupt {
                detail: format!(
                    "symbol_size_t mismatch: header={}, payload={}",
                    self.symbol_size_t,
                    self.symbol_data.len()
                ),
            });
        }
        Self::validate_symbol_size(self.symbol_data.len())?;

        match self.wire_version {
            ReplicationWireVersion::LegacyV1 => {
                let total = REPLICATION_HEADER_SIZE_LEGACY + self.symbol_data.len();
                let mut buf = Vec::with_capacity(total);
                buf.extend_from_slice(self.changeset_id.as_bytes());
                buf.push(self.sbn);
                // Only the low three bytes of the ESI go on the wire.
                let esi_bytes = self.esi.to_be_bytes();
                buf.extend_from_slice(&esi_bytes[1..4]);
                buf.extend_from_slice(&self.k_source.to_be_bytes());
                buf.extend_from_slice(&self.symbol_data);
                Ok(buf)
            }
            ReplicationWireVersion::FramedV2 => {
                let total = REPLICATION_HEADER_SIZE + self.symbol_data.len();
                let mut buf = Vec::with_capacity(total);
                let mut flags = 0_u8;
                if self.auth_tag.is_some() {
                    flags |= REPLICATION_FLAG_AUTH_PRESENT;
                }
                buf.extend_from_slice(&REPLICATION_PROTOCOL_MAGIC);
                buf.push(REPLICATION_PROTOCOL_VERSION_V2);
                buf.push(flags);
                buf.extend_from_slice(&REPLICATION_HEADER_SIZE_V2_U16.to_be_bytes());
                buf.extend_from_slice(self.changeset_id.as_bytes());
                buf.push(self.sbn);
                // Only the low three bytes of the ESI go on the wire.
                let esi_bytes = self.esi.to_be_bytes();
                buf.extend_from_slice(&esi_bytes[1..4]);
                buf.extend_from_slice(&self.k_source.to_be_bytes());
                buf.extend_from_slice(&self.r_repair.to_be_bytes());
                buf.extend_from_slice(&self.symbol_size_t.to_be_bytes());
                // Two zero bytes at offsets 38..40; the decoder skips them.
                buf.extend_from_slice(&0_u16.to_be_bytes());
                buf.extend_from_slice(&self.seed.to_be_bytes());
                buf.extend_from_slice(&self.payload_xxh3.to_be_bytes());
                // The tag slot is always present; it is zero-filled without a tag.
                if let Some(tag) = self.auth_tag {
                    buf.extend_from_slice(&tag);
                } else {
                    buf.extend_from_slice(&[0_u8; 16]);
                }
                buf.extend_from_slice(&self.symbol_data);
                Ok(buf)
            }
        }
    }

    /// Parses a packet from its wire form.
    ///
    /// A buffer of at least 72 bytes that opens with the V2 magic and version
    /// byte is parsed as V2; every other buffer is parsed as legacy. For legacy
    /// packets `r_repair` is set to 0, `seed` is derived from the changeset id,
    /// and `payload_xxh3` is computed from the received payload. The payload
    /// hash of a V2 packet is not checked here; see [`Self::verify_integrity`].
    ///
    /// # Errors
    ///
    /// Returns [`FrankenError::DatabaseCorrupt`] when the buffer is shorter
    /// than the legacy header, when a V2 header declares a length other than
    /// 72, or when a V2 payload length differs from the declared symbol size.
    /// Returns [`FrankenError::OutOfRange`] when a legacy payload is too long
    /// for its length to fit in a `u16`.
    pub fn from_bytes(buf: &[u8]) -> Result<Self> {
        if buf.len() < REPLICATION_HEADER_SIZE_LEGACY {
            return Err(FrankenError::DatabaseCorrupt {
                detail: format!(
                    "replication packet too short: {} < {REPLICATION_HEADER_SIZE_LEGACY}",
                    buf.len()
                ),
            });
        }
        let is_v2 = buf.len() >= REPLICATION_HEADER_SIZE
            && buf[0..4] == REPLICATION_PROTOCOL_MAGIC
            && buf[4] == REPLICATION_PROTOCOL_VERSION_V2;
        if is_v2 {
            let flags = buf[5];
            let header_len = usize::from(u16::from_be_bytes([buf[6], buf[7]]));
            if header_len != REPLICATION_HEADER_SIZE {
                return Err(FrankenError::DatabaseCorrupt {
                    detail: format!(
                        "unsupported replication header length: expected {}, got {header_len}",
                        REPLICATION_HEADER_SIZE
                    ),
                });
            }
            if buf.len() < header_len {
                return Err(FrankenError::DatabaseCorrupt {
                    detail: format!("packet shorter than declared header length: {header_len}"),
                });
            }
            let mut id_bytes = [0_u8; 16];
            id_bytes.copy_from_slice(&buf[8..24]);
            let changeset_id = ChangesetId::from_bytes(id_bytes);
            let sbn = buf[24];
            // 24-bit big-endian ESI.
            let esi = u32::from(buf[25]) << 16 | u32::from(buf[26]) << 8 | u32::from(buf[27]);
            let k_source = u32::from_be_bytes(buf[28..32].try_into().expect("4 bytes"));
            let r_repair = u32::from_be_bytes(buf[32..36].try_into().expect("4 bytes"));
            let symbol_size_t = u16::from_be_bytes(buf[36..38].try_into().expect("2 bytes"));
            // Bytes 38..40 are skipped.
            let seed = u64::from_be_bytes(buf[40..48].try_into().expect("8 bytes"));
            let payload_xxh3 = u64::from_be_bytes(buf[48..56].try_into().expect("8 bytes"));
            let mut auth_tag_bytes = [0_u8; 16];
            auth_tag_bytes.copy_from_slice(&buf[56..72]);
            // The tag slot only counts when the flag bit says so.
            let auth_tag = if (flags & REPLICATION_FLAG_AUTH_PRESENT) != 0 {
                Some(auth_tag_bytes)
            } else {
                None
            };
            let symbol_data = buf[header_len..].to_vec();
            if symbol_data.len() != usize::from(symbol_size_t) {
                return Err(FrankenError::DatabaseCorrupt {
                    detail: format!(
                        "symbol_size_t mismatch in packet: header={symbol_size_t}, payload={}",
                        symbol_data.len()
                    ),
                });
            }
            return Ok(Self {
                wire_version: ReplicationWireVersion::FramedV2,
                changeset_id,
                sbn,
                esi,
                k_source,
                r_repair,
                symbol_size_t,
                seed,
                payload_xxh3,
                auth_tag,
                symbol_data,
            });
        }

        // Legacy layout: id (16), sbn (1), esi (3), k_source (4), payload.
        let mut id_bytes = [0_u8; 16];
        id_bytes.copy_from_slice(&buf[0..16]);
        let changeset_id = ChangesetId::from_bytes(id_bytes);
        let sbn = buf[16];
        let esi = u32::from(buf[17]) << 16 | u32::from(buf[18]) << 8 | u32::from(buf[19]);
        let k_source = u32::from_be_bytes(buf[20..24].try_into().expect("4 bytes"));
        let symbol_data = buf[24..].to_vec();
        let symbol_size_t =
            u16::try_from(symbol_data.len()).map_err(|_| FrankenError::OutOfRange {
                what: "symbol_size_t".to_owned(),
                value: symbol_data.len().to_string(),
            })?;

        Ok(Self {
            wire_version: ReplicationWireVersion::LegacyV1,
            changeset_id,
            sbn,
            esi,
            k_source,
            r_repair: 0,
            symbol_size_t,
            seed: derive_seed_from_changeset_id(&changeset_id),
            payload_xxh3: Self::compute_payload_xxh3(&symbol_data),
            auth_tag: None,
            symbol_data,
        })
    }

    /// Returns the serialized size: header size for `wire_version` plus payload length.
    #[must_use]
    pub fn wire_size(&self) -> usize {
        let header_size = match self.wire_version {
            ReplicationWireVersion::LegacyV1 => REPLICATION_HEADER_SIZE_LEGACY,
            ReplicationWireVersion::FramedV2 => REPLICATION_HEADER_SIZE,
        };
        header_size + self.symbol_data.len()
    }

    /// Returns `true` when `esi` is below `k_source`.
    #[must_use]
    pub fn is_source_symbol(&self) -> bool {
        self.esi < self.k_source
    }
}
1081
/// Lifecycle state of a [`ReplicationSender`].
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum SenderState {
    /// No session; `prepare` may be called.
    Idle,
    /// A session has been prepared; `start_streaming` may be called.
    Encoding,
    /// Packets may be generated with `next_packet`.
    Streaming,
    /// The session has finished; `reset` returns the sender to `Idle`.
    Complete,
}

/// Tunables of a sending session.
#[derive(Debug, Clone)]
pub struct SenderConfig {
    /// Symbol size in bytes.
    pub symbol_size: u16,
    /// Each shard emits at most `k_source * max_isi_multiplier` symbols.
    pub max_isi_multiplier: u32,
}

impl Default for SenderConfig {
    /// Uses [`MTU_SAFE_SYMBOL_SIZE`] and [`DEFAULT_MAX_ISI_MULTIPLIER`].
    fn default() -> Self {
        Self {
            symbol_size: MTU_SAFE_SYMBOL_SIZE,
            max_isi_multiplier: DEFAULT_MAX_ISI_MULTIPLIER,
        }
    }
}

/// State of one sending session: the shards and the position within them.
#[derive(Debug)]
pub struct EncodingSession {
    /// Shards to send, in order.
    pub shards: Vec<ChangesetShard>,
    /// Index of the shard currently being sent.
    pub current_shard: usize,
    /// Id of the next symbol to emit for the current shard.
    pub current_isi: u32,
    /// Configuration the session was prepared with.
    pub config: SenderConfig,
}

/// State machine that turns a set of pages into a stream of replication packets.
#[derive(Debug)]
pub struct ReplicationSender {
    /// Current lifecycle state.
    state: SenderState,
    /// Session data; set by `prepare` and cleared by `reset`.
    session: Option<EncodingSession>,
}
1136
1137impl ReplicationSender {
1138 #[must_use]
1140 pub fn new() -> Self {
1141 Self {
1142 state: SenderState::Idle,
1143 session: None,
1144 }
1145 }
1146
1147 #[must_use]
1149 pub const fn state(&self) -> SenderState {
1150 self.state
1151 }
1152
1153 pub fn prepare(
1159 &mut self,
1160 page_size: u32,
1161 pages: &mut [PageEntry],
1162 config: SenderConfig,
1163 ) -> Result<()> {
1164 if self.state != SenderState::Idle {
1165 return Err(FrankenError::Internal(format!(
1166 "sender must be IDLE to prepare, current state: {:?}",
1167 self.state
1168 )));
1169 }
1170
1171 ReplicationPacket::validate_symbol_size(usize::from(config.symbol_size))?;
1172
1173 let changeset_bytes = encode_changeset(page_size, pages)?;
1174 let shards = shard_changeset(changeset_bytes, config.symbol_size)?;
1175
1176 info!(
1177 bead_id = BEAD_ID,
1178 n_shards = shards.len(),
1179 symbol_size = config.symbol_size,
1180 "sender prepared for streaming"
1181 );
1182
1183 self.session = Some(EncodingSession {
1184 shards,
1185 current_shard: 0,
1186 current_isi: 0,
1187 config,
1188 });
1189 self.state = SenderState::Encoding;
1190 Ok(())
1191 }
1192
1193 pub fn start_streaming(&mut self) -> Result<()> {
1199 if self.state != SenderState::Encoding {
1200 return Err(FrankenError::Internal(format!(
1201 "sender must be ENCODING to start streaming, current state: {:?}",
1202 self.state
1203 )));
1204 }
1205 self.state = SenderState::Streaming;
1206 info!(bead_id = BEAD_ID, "sender started streaming");
1207 Ok(())
1208 }
1209
1210 #[allow(clippy::too_many_lines)]
1218 pub fn next_packet(&mut self) -> Result<Option<ReplicationPacket>> {
1219 if self.state != SenderState::Streaming {
1220 return Err(FrankenError::Internal(format!(
1221 "sender must be STREAMING to generate packets, current state: {:?}",
1222 self.state
1223 )));
1224 }
1225
1226 let session = self
1227 .session
1228 .as_mut()
1229 .expect("session exists in STREAMING state");
1230
1231 if session.current_shard >= session.shards.len() {
1232 return Ok(None);
1234 }
1235
1236 let shard = &session.shards[session.current_shard];
1237 let max_isi = shard
1238 .k_source
1239 .saturating_mul(session.config.max_isi_multiplier);
1240
1241 if session.current_isi >= max_isi {
1242 session.current_shard += 1;
1244 session.current_isi = 0;
1245
1246 if session.current_shard >= session.shards.len() {
1247 return Ok(None);
1248 }
1249
1250 let next_shard = &session.shards[session.current_shard];
1251 debug!(
1252 bead_id = BEAD_ID,
1253 shard_index = session.current_shard,
1254 k_source = next_shard.k_source,
1255 "advancing to next shard"
1256 );
1257 }
1258
1259 let shard = &session.shards[session.current_shard];
1260 let isi = session.current_isi;
1261 let t = usize::from(session.config.symbol_size);
1262
1263 let symbol_data = if u64::from(isi) < u64::from(shard.k_source) {
1268 let start = isi as usize * t;
1270 let end = (start + t).min(shard.changeset_bytes.len());
1271 let mut data = vec![0_u8; t];
1272 let available = end.saturating_sub(start);
1273 if available > 0 {
1274 data[..available].copy_from_slice(&shard.changeset_bytes[start..end]);
1275 }
1276 data
1278 } else {
1279 let mut data = vec![0_u8; t];
1282 #[allow(clippy::cast_possible_truncation)]
1283 {
1284 let repair_seed = shard.seed.wrapping_add(u64::from(isi));
1285 for (i, byte) in data.iter_mut().enumerate() {
1286 let mixed = repair_seed
1287 .wrapping_mul(0x9E37_79B9_7F4A_7C15)
1288 .wrapping_add(i as u64);
1289 *byte = (mixed >> 32) as u8;
1290 }
1291 }
1292 warn!(
1293 bead_id = BEAD_ID,
1294 isi,
1295 shard_index = session.current_shard,
1296 "generated placeholder repair symbol (production uses RaptorQ encoder)"
1297 );
1298 data
1299 };
1300
1301 let r_repair = max_isi.saturating_sub(shard.k_source);
1302 let packet = ReplicationPacket::new_v2(
1303 ReplicationPacketV2Header {
1304 changeset_id: shard.changeset_id,
1305 sbn: 0, esi: isi,
1307 k_source: shard.k_source,
1308 r_repair,
1309 symbol_size_t: session.config.symbol_size,
1310 seed: shard.seed,
1311 },
1312 symbol_data,
1313 );
1314
1315 session.current_isi += 1;
1316 Ok(Some(packet))
1317 }
1318
1319 pub fn acknowledge_complete(&mut self) -> Result<()> {
1325 if self.state != SenderState::Streaming {
1326 return Err(FrankenError::Internal(format!(
1327 "sender must be STREAMING to acknowledge, current state: {:?}",
1328 self.state
1329 )));
1330 }
1331 self.state = SenderState::Complete;
1332 info!(bead_id = BEAD_ID, "sender acknowledged completion");
1333 Ok(())
1334 }
1335
1336 pub fn complete(&mut self) {
1340 if self.state == SenderState::Streaming || self.state == SenderState::Encoding {
1341 self.state = SenderState::Complete;
1342 info!(bead_id = BEAD_ID, "sender completed");
1343 }
1344 }
1345
1346 pub fn reset(&mut self) {
1348 self.state = SenderState::Idle;
1349 self.session = None;
1350 debug!(bead_id = BEAD_ID, "sender reset to IDLE");
1351 }
1352}
1353
1354impl Default for ReplicationSender {
1355 fn default() -> Self {
1356 Self::new()
1357 }
1358}
1359
1360#[cfg(test)]
1361mod tests {
1362 use super::*;
1363
1364 const TEST_BEAD_ID: &str = "bd-1hi.13";
1365 const TEST_BEAD_BD_1SQU: &str = "bd-1squ";
1366
    /// Builds one [`PageEntry`] per page number, in the given order, each with
    /// `page_size` bytes of content that depends on the page number and the
    /// byte offset.
    #[allow(clippy::cast_possible_truncation)]
    fn make_pages(page_size: u32, page_numbers: &[u32]) -> Vec<PageEntry> {
        page_numbers
            .iter()
            .map(|&pn| {
                let mut data = vec![0_u8; page_size as usize];
                for (i, byte) in data.iter_mut().enumerate() {
                    // Reduced modulo 256 before the cast, so nothing is lost.
                    *byte = ((pn as usize * 251 + i * 31) % 256) as u8;
                }
                PageEntry::new(pn, data)
            })
            .collect()
    }
1381
    /// The encoded header opens with `FSRP`, has the declared size, and
    /// decodes back to the value it was encoded from.
    #[test]
    fn test_changeset_header_format() {
        let header = ChangesetHeader {
            magic: CHANGESET_MAGIC,
            version: CHANGESET_VERSION,
            page_size: 4096,
            n_pages: 10,
            total_len: 42_000,
        };
        let bytes = header.to_bytes();
        assert_eq!(
            &bytes[0..4],
            b"FSRP",
            "bead_id={TEST_BEAD_ID} case=header_magic"
        );
        assert_eq!(bytes.len(), CHANGESET_HEADER_SIZE);

        let decoded = ChangesetHeader::from_bytes(&bytes).expect("decode should succeed");
        assert_eq!(
            header, decoded,
            "bead_id={TEST_BEAD_ID} case=header_roundtrip"
        );
    }
1409
/// Encoding the same set of pages supplied in two different orders must
/// produce identical bytes and therefore identical changeset ids.
#[test]
fn test_changeset_encoding_deterministic() {
    let page_size = 512_u32;

    // Same page numbers, different input order.
    let mut first_order = make_pages(page_size, &[3, 1, 2]);
    let first_bytes = encode_changeset(page_size, &mut first_order).expect("encode a");

    let mut second_order = make_pages(page_size, &[2, 3, 1]);
    let second_bytes = encode_changeset(page_size, &mut second_order).expect("encode b");

    assert_eq!(
        first_bytes, second_bytes,
        "bead_id={TEST_BEAD_ID} case=deterministic_encoding"
    );
    assert_eq!(
        compute_changeset_id(&first_bytes),
        compute_changeset_id(&second_bytes),
        "bead_id={TEST_BEAD_ID} case=deterministic_changeset_id"
    );
}
1433
/// The changeset id of a payload must differ from the first 16 bytes of a
/// BLAKE3 hash of the same payload prefixed with `fsqlite:ecs:v1`.
#[test]
fn test_changeset_id_domain_separation() {
    let data = b"test payload";
    let changeset_id = compute_changeset_id(data);

    // Hash the same payload under a different prefix.
    let other_domain_hash = blake3::Hasher::new()
        .update(b"fsqlite:ecs:v1")
        .update(data)
        .finalize();
    let mut other_domain_id = [0_u8; 16];
    other_domain_id.copy_from_slice(&other_domain_hash.as_bytes()[..16]);

    assert_ne!(
        changeset_id.as_bytes(),
        &other_domain_id,
        "bead_id={TEST_BEAD_ID} case=domain_separation"
    );
}
1455
/// Deriving a seed twice from one id gives the same value, and for this
/// particular id the seed is not zero.
#[test]
fn test_seed_derivation() {
    let changeset_id =
        ChangesetId::from_bytes([1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16]);

    let first = derive_seed_from_changeset_id(&changeset_id);
    let second = derive_seed_from_changeset_id(&changeset_id);

    assert_eq!(
        first, second,
        "bead_id={TEST_BEAD_ID} case=seed_deterministic"
    );
    assert_ne!(first, 0, "bead_id={TEST_BEAD_ID} case=seed_nonzero");
}
1471
/// Equal payloads hash to equal ids; flipping every bit of the first byte
/// changes the id.
#[test]
fn test_bd_1squ_changeset_id_stability() {
    let payload = b"deterministic-changeset-payload";

    let baseline = compute_changeset_id(payload);
    assert_eq!(
        baseline,
        compute_changeset_id(payload),
        "bead_id={TEST_BEAD_BD_1SQU} case=id_stability_same_payload"
    );

    let mut mutated = payload.to_vec();
    mutated[0] ^= 0xFF;
    assert_ne!(
        baseline,
        compute_changeset_id(&mutated),
        "bead_id={TEST_BEAD_BD_1SQU} case=id_stability_diff_payload"
    );
}
1490
/// One id always yields the same seed; ids of two different payloads yield
/// different seeds.
#[test]
fn test_bd_1squ_seed_stability() {
    let changeset_id = compute_changeset_id(b"seed-stability");
    let seed = derive_seed_from_changeset_id(&changeset_id);
    let seed_again = derive_seed_from_changeset_id(&changeset_id);
    assert_eq!(
        seed, seed_again,
        "bead_id={TEST_BEAD_BD_1SQU} case=seed_stability_same_id"
    );

    let other_id = compute_changeset_id(b"seed-stability-other");
    assert_ne!(
        seed,
        derive_seed_from_changeset_id(&other_id),
        "bead_id={TEST_BEAD_BD_1SQU} case=seed_stability_diff_id"
    );
}
1508
/// Pins `compute_k_source` on empty input, a single byte, an exact multiple,
/// a round-up case, the `K_MAX` boundary, one byte past it, and a zero
/// symbol size (which must be rejected).
#[test]
fn test_bd_1squ_k_source_computation() {
    let empty = compute_k_source(0, 256).expect("k_source");
    assert_eq!(
        empty, 0,
        "bead_id={TEST_BEAD_BD_1SQU} case=k_source_empty"
    );

    let single_byte = compute_k_source(1, 256).expect("k_source");
    assert_eq!(
        single_byte, 1,
        "bead_id={TEST_BEAD_BD_1SQU} case=k_source_single_byte"
    );

    let exact_division = compute_k_source(256, 256).expect("k_source");
    assert_eq!(
        exact_division, 1,
        "bead_id={TEST_BEAD_BD_1SQU} case=k_source_exact_division"
    );

    let round_up = compute_k_source(257, 256).expect("k_source");
    assert_eq!(
        round_up, 2,
        "bead_id={TEST_BEAD_BD_1SQU} case=k_source_round_up"
    );

    // Byte count that fills exactly `K_MAX` symbols of 64 bytes.
    let k_max_bytes = usize::try_from(K_MAX).unwrap() * 64;

    let at_k_max = compute_k_source(k_max_bytes, 64).expect("k_source");
    assert_eq!(
        at_k_max,
        u64::from(K_MAX),
        "bead_id={TEST_BEAD_BD_1SQU} case=k_source_kmax_boundary"
    );

    let past_k_max = compute_k_source(k_max_bytes + 1, 64).expect("k_source");
    assert_eq!(
        past_k_max,
        u64::from(K_MAX) + 1,
        "bead_id={TEST_BEAD_BD_1SQU} case=k_source_kmax_plus_one"
    );

    let zero_symbol = compute_k_source(10, 0);
    assert!(
        zero_symbol.is_err(),
        "bead_id={TEST_BEAD_BD_1SQU} case=k_source_zero_symbol_rejected"
    );
}
1546
/// A payload of exactly `K_MAX * symbol_size` bytes stays in one shard with
/// `k_source == K_MAX`; one extra byte produces a second shard holding a
/// single symbol.
#[test]
fn test_bd_1squ_sharding_threshold_rule() {
    let symbol_size = 64_u16;
    let max_payload = usize::try_from(u64::from(K_MAX) * u64::from(symbol_size)).unwrap();

    // Exactly at the threshold.
    let at_threshold =
        shard_changeset(vec![0xA5_u8; max_payload], symbol_size).expect("exact shard");
    assert_eq!(
        at_threshold.len(),
        1,
        "bead_id={TEST_BEAD_BD_1SQU} case=exact_threshold_single_shard"
    );
    assert_eq!(
        at_threshold[0].k_source, K_MAX,
        "bead_id={TEST_BEAD_BD_1SQU} case=exact_threshold_kmax"
    );

    // One byte over the threshold.
    let past_threshold =
        shard_changeset(vec![0x5A_u8; max_payload + 1], symbol_size).expect("over shard");
    assert_eq!(
        past_threshold.len(),
        2,
        "bead_id={TEST_BEAD_BD_1SQU} case=over_threshold_two_shards"
    );
    assert_eq!(
        past_threshold[0].k_source, K_MAX,
        "bead_id={TEST_BEAD_BD_1SQU} case=over_threshold_first_kmax"
    );
    assert_eq!(
        past_threshold[1].k_source, 1,
        "bead_id={TEST_BEAD_BD_1SQU} case=over_threshold_second_one_symbol"
    );
}
1580
/// After encoding, the caller's page list is in ascending page-number order,
/// and the encoded header reports the full byte length and the page count.
#[test]
fn test_page_entries_sorted() {
    let page_size = 128_u32;
    let mut pages = make_pages(page_size, &[5, 1, 3, 2, 4]);
    let encoded = encode_changeset(page_size, &mut pages).expect("encode");

    // The first five entries must now read 1, 2, 3, 4, 5.
    for (slot, expected) in [1, 2, 3, 4, 5].into_iter().enumerate() {
        assert_eq!(pages[slot].page_number, expected);
    }

    let header_bytes =
        <[u8; CHANGESET_HEADER_SIZE]>::try_from(&encoded[..CHANGESET_HEADER_SIZE]).unwrap();
    let header = ChangesetHeader::from_bytes(&header_bytes).expect("decode header");
    assert_eq!(
        header.total_len,
        encoded.len() as u64,
        "bead_id={TEST_BEAD_ID} case=total_len_matches"
    );
    assert_eq!(header.n_pages, 5);
}
1605
/// A freshly built page validates; after its first byte is altered without
/// going through the constructor, validation fails.
#[test]
fn test_page_xxh3_validation() {
    let mut entry = PageEntry::new(1, vec![0xAA; 4096]);
    assert!(
        entry.validate_xxh3(),
        "bead_id={TEST_BEAD_ID} case=xxh3_valid"
    );

    // Flip every bit of the first payload byte in place.
    entry.page_bytes[0] ^= 0xFF;
    let still_valid = entry.validate_xxh3();
    assert!(
        !still_valid,
        "bead_id={TEST_BEAD_ID} case=xxh3_tampered"
    );
}
1622
/// Encodes a v2 packet and checks the byte offsets of the leading header
/// fields, the total length, and that decoding the bytes restores the packet.
#[test]
fn test_udp_packet_format() {
    const PAYLOAD_LEN: usize = 512;

    let changeset_id = ChangesetId::from_bytes([0xAA; 16]);
    let seed = derive_seed_from_changeset_id(&changeset_id);
    let header = ReplicationPacketV2Header {
        changeset_id,
        sbn: 0,
        esi: 42,
        k_source: 100,
        r_repair: 12,
        symbol_size_t: 512,
        seed,
    };
    let packet = ReplicationPacket::new_v2(header, vec![0x55; PAYLOAD_LEN]);

    let wire = packet.to_bytes().expect("encode");
    assert_eq!(
        wire.len(),
        REPLICATION_HEADER_SIZE + PAYLOAD_LEN,
        "bead_id={TEST_BEAD_ID} case=packet_size"
    );

    // Field-by-field layout of the first 38 header bytes.
    assert_eq!(wire[..4], REPLICATION_PROTOCOL_MAGIC);
    assert_eq!(wire[4], REPLICATION_PROTOCOL_VERSION_V2);
    assert_eq!(wire[5], 0, "flags");
    assert_eq!(wire[8..24], [0xAA; 16], "changeset_id");
    assert_eq!(wire[24], 0, "sbn");
    assert_eq!(wire[25..28], [0, 0, 42], "esi u24 big-endian");
    assert_eq!(wire[28..32], 100_u32.to_be_bytes(), "k_source");
    assert_eq!(wire[32..36], 12_u32.to_be_bytes(), "r_repair");
    assert_eq!(wire[36..38], 512_u16.to_be_bytes(), "symbol_size_t");

    let decoded = ReplicationPacket::from_bytes(&wire).expect("decode");
    assert_eq!(
        packet, decoded,
        "bead_id={TEST_BEAD_ID} case=packet_roundtrip"
    );
}
1668
/// Header plus an MTU-safe symbol is 1472 bytes, which together with 28
/// further bytes of overhead totals exactly 1500.
#[test]
fn test_udp_packet_mtu_safe() {
    let packet_len = REPLICATION_HEADER_SIZE + usize::from(MTU_SAFE_SYMBOL_SIZE);
    assert_eq!(
        packet_len, 1472,
        "bead_id={TEST_BEAD_ID} case=mtu_safe_packet_size"
    );

    // NOTE(review): 20 + 8 is presumably the IPv4 and UDP header sizes — confirm.
    let on_wire_len = packet_len + 20 + 8;
    assert_eq!(on_wire_len, 1500, "fits in Ethernet MTU");
}
1681
/// `validate_symbol_size` rejects `MAX_REPLICATION_SYMBOL_SIZE + 1` and
/// accepts `MAX_REPLICATION_SYMBOL_SIZE` itself.
#[test]
fn test_hard_wire_limit() {
    let past_limit = ReplicationPacket::validate_symbol_size(MAX_REPLICATION_SYMBOL_SIZE + 1);
    assert!(
        past_limit.is_err(),
        "bead_id={TEST_BEAD_ID} case=hard_wire_limit_rejected"
    );

    let at_limit = ReplicationPacket::validate_symbol_size(MAX_REPLICATION_SYMBOL_SIZE);
    assert!(
        at_limit.is_ok(),
        "bead_id={TEST_BEAD_ID} case=hard_wire_limit_at_max"
    );
}
1700
/// A new sender starts in IDLE and is in ENCODING after a successful
/// `prepare`.
#[test]
fn test_sender_idle_to_encoding() {
    let mut sender = ReplicationSender::new();
    assert_eq!(sender.state(), SenderState::Idle);

    let mut pages = make_pages(512, &[1, 2, 3]);
    let config = SenderConfig::default();
    sender.prepare(512, &mut pages, config).expect("prepare");

    let state_after_prepare = sender.state();
    assert_eq!(
        state_after_prepare,
        SenderState::Encoding,
        "bead_id={TEST_BEAD_ID} case=idle_to_encoding"
    );
}
1720
/// Drains the sender and checks that both source and repair packets were
/// produced, that the source count equals `k_source`, and that the final
/// ESI is `k_source * 2 - 1` for a multiplier of 2.
#[test]
fn test_streaming_source_then_repair() {
    let mut sender = ReplicationSender::new();
    let mut pages = make_pages(512, &[1, 2]);
    sender
        .prepare(
            512,
            &mut pages,
            SenderConfig {
                symbol_size: 512,
                max_isi_multiplier: 2,
            },
        )
        .expect("prepare");
    sender.start_streaming().expect("start");

    let k_source = sender.session.as_ref().unwrap().shards[0].k_source;

    let (mut source_count, mut repair_count) = (0_u32, 0_u32);
    let mut last_isi = 0_u32;
    loop {
        let Some(packet) = sender.next_packet().expect("next") else {
            break;
        };
        // Pick which tally this packet belongs to, then bump it.
        let tally = if packet.is_source_symbol() {
            &mut source_count
        } else {
            &mut repair_count
        };
        *tally += 1;
        last_isi = packet.esi;
    }

    assert!(
        source_count > 0,
        "bead_id={TEST_BEAD_ID} case=has_source_symbols"
    );
    assert!(
        repair_count > 0,
        "bead_id={TEST_BEAD_ID} case=has_repair_symbols"
    );
    assert_eq!(
        source_count, k_source,
        "bead_id={TEST_BEAD_ID} case=source_count_matches_k"
    );
    assert_eq!(
        last_isi,
        k_source * 2 - 1,
        "bead_id={TEST_BEAD_ID} case=max_isi_reached"
    );
}
1766
/// The first `k_source` packets carry ESIs `0..k_source` in order, and the
/// packet right after them (if any) has an ESI of at least `k_source`.
#[test]
fn test_streaming_systematic_first_ordering() {
    let mut sender = ReplicationSender::new();
    let mut pages = make_pages(512, &[1, 2]);
    let config = SenderConfig {
        symbol_size: 512,
        max_isi_multiplier: 2,
    };
    sender.prepare(512, &mut pages, config).expect("prepare");
    sender.start_streaming().expect("start");

    let k_source = sender.session.as_ref().expect("session").shards[0].k_source;
    let k_source_usize = usize::try_from(k_source).expect("K_source fits usize");

    // Pull packets until the sender reports `None`, keeping only the ESIs.
    let observed_esis: Vec<u32> = std::iter::from_fn(|| sender.next_packet().expect("next"))
        .map(|packet| packet.esi)
        .collect();

    assert!(
        observed_esis.len() >= k_source_usize,
        "bead_id={TEST_BEAD_ID} case=have_at_least_k_source_packets"
    );

    let expected_systematic: Vec<u32> = (0..k_source).collect();
    assert_eq!(
        &observed_esis[..k_source_usize],
        expected_systematic.as_slice(),
        "bead_id={TEST_BEAD_ID} case=systematic_first_ordering"
    );

    if let Some(&first_after_systematic) = observed_esis.get(k_source_usize) {
        assert!(
            first_after_systematic >= k_source,
            "bead_id={TEST_BEAD_ID} case=repair_starts_after_systematic"
        );
    }
}
1806
/// Two independent senders given the same pages and configuration must emit
/// the same packets in the same order.
#[test]
fn test_streaming_schedule_deterministic_across_runs() {
    /// Runs a fresh sender to exhaustion and returns everything it emitted.
    fn collect_packets(
        page_size: u32,
        page_numbers: &[u32],
        config: &SenderConfig,
    ) -> Vec<ReplicationPacket> {
        let mut sender = ReplicationSender::new();
        let mut pages = make_pages(page_size, page_numbers);
        sender
            .prepare(page_size, &mut pages, config.clone())
            .expect("prepare");
        sender.start_streaming().expect("start");

        std::iter::from_fn(|| sender.next_packet().expect("next")).collect()
    }

    let config = SenderConfig {
        symbol_size: 256,
        max_isi_multiplier: 2,
    };
    let page_order = [1, 3, 2];
    let first_run = collect_packets(512, &page_order, &config);
    let second_run = collect_packets(512, &page_order, &config);

    assert_eq!(
        first_run.len(),
        second_run.len(),
        "bead_id={TEST_BEAD_ID} case=deterministic_run_packet_count"
    );
    assert_eq!(
        first_run, second_run,
        "bead_id={TEST_BEAD_ID} case=deterministic_schedule_reproducible"
    );
}
1845
/// After one packet has been sent, acknowledging completion moves the sender
/// to COMPLETE and any further `next_packet` call returns an error.
#[test]
fn test_streaming_stop_on_ack() {
    let mut sender = ReplicationSender::new();
    let mut pages = make_pages(512, &[1]);
    let config = SenderConfig::default();
    sender.prepare(512, &mut pages, config).expect("prepare");
    sender.start_streaming().expect("start");

    // At least one packet must be available before the acknowledgement.
    let _first_packet = sender.next_packet().expect("next").expect("packet");

    sender.acknowledge_complete().expect("ack");
    assert_eq!(
        sender.state(),
        SenderState::Complete,
        "bead_id={TEST_BEAD_ID} case=stop_on_ack"
    );

    let after_ack = sender.next_packet();
    assert!(
        after_ack.is_err(),
        "bead_id={TEST_BEAD_ID} case=no_packets_after_ack_complete"
    );
}
1870
/// With a multiplier of 2 the sender stops after exactly `k_source * 2`
/// packets.
#[test]
fn test_streaming_stop_on_max_isi() {
    let mut sender = ReplicationSender::new();
    let mut pages = make_pages(128, &[1]);
    sender
        .prepare(
            128,
            &mut pages,
            SenderConfig {
                symbol_size: 128,
                max_isi_multiplier: 2,
            },
        )
        .expect("prepare");
    sender.start_streaming().expect("start");

    let mut emitted = 0_u32;
    loop {
        if sender.next_packet().expect("next").is_none() {
            break;
        }
        emitted += 1;
    }

    let k_source = sender.session.as_ref().unwrap().shards[0].k_source;
    assert_eq!(
        emitted,
        k_source * 2,
        "bead_id={TEST_BEAD_ID} case=stop_on_max_isi"
    );
}
1895
/// A changeset one byte larger than `K_MAX * symbol_size` must be split into
/// more than one shard, every shard must stay within `K_MAX` source symbols,
/// and the shard payload lengths must add up to the input length.
#[test]
fn test_block_size_limit_sharding() {
    let symbol_size = 64_u16;
    let bytes_per_max_block = u64::from(K_MAX) * u64::from(symbol_size);
    let changeset_len = usize::try_from(bytes_per_max_block).unwrap() + 1;

    // Only the length is needed after sharding, so the buffer is moved into
    // `shard_changeset` rather than cloned just to keep `.len()` available.
    let shards = shard_changeset(vec![0xAB_u8; changeset_len], symbol_size).expect("shard");

    assert!(
        shards.len() > 1,
        "bead_id={TEST_BEAD_ID} case=sharding_triggered shards={}",
        shards.len()
    );

    for (i, shard) in shards.iter().enumerate() {
        assert!(
            shard.k_source <= K_MAX,
            "bead_id={TEST_BEAD_ID} case=shard_k_max shard={i} k_source={}",
            shard.k_source
        );
    }

    let total_bytes: usize = shards.iter().map(|s| s.changeset_bytes.len()).sum();
    assert_eq!(
        total_bytes, changeset_len,
        "bead_id={TEST_BEAD_ID} case=sharding_coverage"
    );
}
1928
/// Twenty single-page changesets that differ only in their fill byte must
/// all receive pairwise distinct ids.
#[test]
fn prop_changeset_id_unique() {
    let page_size = 128_u32;

    let ids: Vec<_> = (0_u32..20)
        .map(|seed| {
            let fill = u8::try_from(seed).unwrap();
            let mut pages = vec![PageEntry::new(1, vec![fill; page_size as usize])];
            let bytes = encode_changeset(page_size, &mut pages).expect("encode");
            compute_changeset_id(&bytes)
        })
        .collect();

    // Compare every unordered pair exactly once.
    for (i, left) in ids.iter().enumerate() {
        for (j, right) in ids.iter().enumerate().skip(i + 1) {
            assert_ne!(
                left, right,
                "bead_id={TEST_BEAD_ID} case=prop_id_unique i={i} j={j}"
            );
        }
    }
}
1956
/// For several oversized inputs, concatenating the shard payloads in order
/// must reproduce the original changeset bytes.
#[test]
fn prop_sharding_covers_all_pages() {
    let symbol_size = 64_u16;
    for size_multiplier in [1_u64, 2, 5] {
        // `size_multiplier` full blocks plus a 7-byte tail.
        let total = u64::from(K_MAX) * u64::from(symbol_size) * size_multiplier + 7;
        let changeset = vec![0xCC_u8; usize::try_from(total).unwrap()];
        let shards = shard_changeset(changeset.clone(), symbol_size).expect("shard");

        let mut reassembled: Vec<u8> = Vec::new();
        for shard in &shards {
            reassembled.extend(shard.changeset_bytes.iter().copied());
        }

        assert_eq!(
            reassembled, changeset,
            "bead_id={TEST_BEAD_ID} case=prop_sharding_coverage multiplier={size_multiplier}"
        );
    }
}
1976
/// Pins the wire-format constants to their expected values and checks that
/// the id/seed helpers are callable.
#[test]
fn test_bd_1hi_13_unit_compliance_gate() {
    // Changeset framing.
    assert_eq!(CHANGESET_MAGIC, *b"FSRP");
    assert_eq!(CHANGESET_VERSION, 1);
    assert_eq!(CHANGESET_HEADER_SIZE, 22);

    // Packet header sizes and UDP bounds.
    assert_eq!(REPLICATION_HEADER_SIZE_LEGACY, 24);
    assert_eq!(REPLICATION_HEADER_SIZE, 72);
    assert_eq!(REPLICATION_HEADER_SIZE_V2, 72);
    assert_eq!(MAX_UDP_PAYLOAD, 65_507);
    const { assert!(MAX_REPLICATION_SYMBOL_SIZE < MAX_UDP_PAYLOAD) };

    // Results are discarded; these calls only have to compile and not panic.
    let _ = ChangesetId::from_bytes([0; 16]);
    let _ = compute_changeset_id(b"test");
    let zero_id = ChangesetId::from_bytes([0; 16]);
    let _ = derive_seed_from_changeset_id(&zero_id);
}
1997
/// Walks the sender through IDLE -> ENCODING -> STREAMING -> COMPLETE -> IDLE
/// and checks the reported state after every step.
#[test]
fn prop_bd_1hi_13_structure_compliance() {
    let mut sender = ReplicationSender::new();
    assert_eq!(sender.state(), SenderState::Idle);

    // prepare: IDLE -> ENCODING
    let mut pages = make_pages(256, &[1, 2]);
    let config = SenderConfig::default();
    sender.prepare(256, &mut pages, config).expect("prepare");
    assert_eq!(sender.state(), SenderState::Encoding);

    // start_streaming: ENCODING -> STREAMING
    sender.start_streaming().expect("start");
    assert_eq!(sender.state(), SenderState::Streaming);

    // complete: STREAMING -> COMPLETE
    sender.complete();
    assert_eq!(sender.state(), SenderState::Complete);

    // reset: COMPLETE -> IDLE
    sender.reset();
    assert_eq!(sender.state(), SenderState::Idle);
}
2019
/// The default network configuration selects `RustlsTls` and passes the
/// security validation.
#[test]
fn test_tls_by_default() {
    let defaults = NetworkStackConfig::default();
    assert_eq!(defaults.security, TransportSecurityMode::RustlsTls);

    let verdict = defaults.validate_security();
    assert!(verdict.is_ok());
}
2030
/// Plaintext without the opt-in flag fails validation with
/// `FrankenError::Unsupported`; `plaintext_local_dev(true)` yields a
/// plaintext configuration that validates.
#[test]
fn test_plaintext_requires_explicit_opt_in() {
    let not_opted_in = NetworkStackConfig {
        security: TransportSecurityMode::Plaintext,
        explicit_plaintext_opt_in: false,
        ..NetworkStackConfig::default()
    };
    let rejection = not_opted_in.validate_security().unwrap_err();
    assert!(matches!(rejection, FrankenError::Unsupported));

    let local_dev = NetworkStackConfig::plaintext_local_dev(true).unwrap();
    assert_eq!(local_dev.security, TransportSecurityMode::Plaintext);
    let verdict = local_dev.validate_security();
    assert!(verdict.is_ok());
}
2045
/// Exactly `DEFAULT_HTTP2_MAX_CONCURRENT_STREAMS` streams are accepted; one
/// more is rejected with `FrankenError::Busy`.
#[test]
fn test_http2_max_concurrent_streams() {
    let cfg = NetworkStackConfig::default();
    let limit = DEFAULT_HTTP2_MAX_CONCURRENT_STREAMS;

    let at_limit = cfg.validate_concurrent_streams(limit);
    assert!(at_limit.is_ok());

    let over_limit = cfg.validate_concurrent_streams(limit + 1).unwrap_err();
    assert!(matches!(over_limit, FrankenError::Busy));
}
2058
/// A header list of exactly `DEFAULT_HTTP2_MAX_HEADER_LIST_SIZE` bytes is
/// accepted; one byte more is rejected with `FrankenError::TooBig`.
#[test]
fn test_http2_max_header_list_size() {
    let cfg = NetworkStackConfig::default();
    let limit = DEFAULT_HTTP2_MAX_HEADER_LIST_SIZE;

    let at_limit = cfg.validate_header_list_size(limit);
    assert!(at_limit.is_ok());

    let over_limit = cfg.validate_header_list_size(limit + 1).unwrap_err();
    assert!(matches!(over_limit, FrankenError::TooBig));
}
2071
/// An elapsed time equal to `DEFAULT_HTTP2_CONTINUATION_TIMEOUT_MS` is
/// accepted; one millisecond more is rejected with
/// `FrankenError::BusyRecovery`.
#[test]
fn test_http2_continuation_timeout() {
    let cfg = NetworkStackConfig::default();
    let limit_ms = DEFAULT_HTTP2_CONTINUATION_TIMEOUT_MS;

    let at_limit = cfg.validate_continuation_elapsed(limit_ms);
    assert!(at_limit.is_ok());

    let over_limit = cfg.validate_continuation_elapsed(limit_ms + 1).unwrap_err();
    assert!(matches!(over_limit, FrankenError::BusyRecovery));
}
2084
/// A message of exactly `DEFAULT_RPC_MESSAGE_CAP_BYTES` is accepted; one
/// byte more is rejected with `FrankenError::TooBig`.
#[test]
fn test_message_size_cap_enforced() {
    let cfg = NetworkStackConfig::default();
    let cap = DEFAULT_RPC_MESSAGE_CAP_BYTES;

    let at_cap = cfg.validate_message_size(cap);
    assert!(at_cap.is_ok());

    let over_cap = cfg.validate_message_size(cap + 1).unwrap_err();
    assert!(matches!(over_cap, FrankenError::TooBig));
}
2097
/// With the handshake timeout set to `DEFAULT_HANDSHAKE_TIMEOUT_MS`, an
/// elapsed time equal to it is accepted and one 500 ms longer is rejected
/// with `FrankenError::BusyRecovery`.
#[test]
fn test_handshake_timeout_bounded() {
    let timeout_ms = DEFAULT_HANDSHAKE_TIMEOUT_MS;
    let cfg = NetworkStackConfig {
        handshake_timeout_ms: timeout_ms,
        ..NetworkStackConfig::default()
    };

    let within = cfg.validate_handshake_elapsed(timeout_ms);
    assert!(within.is_ok());

    let exceeded = cfg.validate_handshake_elapsed(timeout_ms + 500).unwrap_err();
    assert!(matches!(exceeded, FrankenError::BusyRecovery));
}
2113
/// Two `VirtualTcp` instances built with the same seed and fault profile
/// must deliver the same payloads and record the same trace for the same
/// inputs.
#[test]
fn test_virtual_tcp_deterministic() {
    let faults = VirtualTcpFaultProfile {
        drop_per_million: 150_000,
        reorder_per_million: 200_000,
        corrupt_per_million: 125_000,
    };
    let payloads = vec![
        b"alpha".to_vec(),
        b"beta".to_vec(),
        b"gamma".to_vec(),
        b"delta".to_vec(),
        b"epsilon".to_vec(),
    ];

    // One complete run: transmit every payload, flush, then snapshot the trace.
    let run_once = || {
        let mut vtcp = VirtualTcp::new(42, faults).unwrap();
        let mut delivered = Vec::new();
        for payload in &payloads {
            delivered.extend(vtcp.transmit(payload));
        }
        if let Some(flush) = vtcp.flush() {
            delivered.push(flush);
        }
        let trace = vtcp.trace().to_vec();
        (delivered, trace)
    };

    let (left_out, left_trace) = run_once();
    let (right_out, right_trace) = run_once();

    assert_eq!(left_out, right_out);
    assert_eq!(left_trace, right_trace);
}
2152
/// With reorder and corrupt probabilities at 100% and drops disabled, the
/// first transmit delivers nothing, the second delivers two payloads whose
/// first differs from `packet-b`, and the trace contains buffer,
/// corrupt-delivery and flush events.
#[test]
fn test_virtual_tcp_fault_injection() {
    let always_reorder_and_corrupt = VirtualTcpFaultProfile {
        drop_per_million: 0,
        reorder_per_million: 1_000_000,
        corrupt_per_million: 1_000_000,
    };
    let mut vtcp = VirtualTcp::new(7, always_reorder_and_corrupt).unwrap();

    let first_delivery = vtcp.transmit(b"packet-a");
    assert!(first_delivery.is_empty(), "first packet must be buffered");

    let second_delivery = vtcp.transmit(b"packet-b");
    assert_eq!(
        second_delivery.len(),
        2,
        "second transmit flushes reorder queue"
    );
    assert_ne!(
        second_delivery[0],
        b"packet-b".to_vec(),
        "corruption must alter delivered payload"
    );

    // True when the trace holds at least one event of the requested kind.
    let trace_has = |wanted: VirtualTcpTraceKind| {
        vtcp.trace().iter().any(|event| event.kind == wanted)
    };
    let has_buffer = trace_has(VirtualTcpTraceKind::BufferedForReorder);
    let has_corrupt_delivery = trace_has(VirtualTcpTraceKind::DeliveredCorrupt);
    let has_flush = trace_has(VirtualTcpTraceKind::FlushedReordered);

    assert!(has_buffer);
    assert!(has_corrupt_delivery);
    assert!(has_flush);
}
2193}