1use std::collections::BTreeMap;
2
3use crypto_box::aead::Aead;
4use crypto_box::{Nonce as BoxNonce, PublicKey, SalsaBox, SecretKey};
5
6use crate::channel::ChannelId;
7use crate::crypto::decrypt_chacha20poly1305_legacy_into;
8use crate::fec::FecCode;
9
/// Frame size budget, in bytes, from which the forwarder and FEC payload limits below are derived.
pub const WIFI_MTU: usize = 4045;
/// Bytes subtracted from [`WIFI_MTU`] for the IEEE 802.11 header when deriving payload limits.
pub const IEEE80211_HEADER_LEN: usize = 24;
/// Size in bytes of the receiver secret key stored in a [`WfbKeypair`].
pub const CRYPTO_BOX_SECRETKEY_LEN: usize = 32;
/// Size in bytes of the transmitter public key stored in a [`WfbKeypair`].
pub const CRYPTO_BOX_PUBLICKEY_LEN: usize = 32;
/// Size in bytes of the nonce carried by a session-key packet.
pub const CRYPTO_BOX_NONCE_LEN: usize = 24;
/// Bytes counted for the authentication tag when checking the minimum session-key packet size.
pub const CRYPTO_BOX_TAG_LEN: usize = 16;
/// Session-key packet header: one packet-type byte followed by the nonce.
pub const WSESSION_HDR_LEN: usize = 1 + CRYPTO_BOX_NONCE_LEN;
/// Fixed part of the decrypted session record: epoch (8) + channel id (4) + FEC type (1) +
/// FEC k (1) + FEC n (1) + session key.
pub const WSESSION_DATA_LEN: usize = 8 + 4 + 1 + 1 + 1 + CHACHA20_POLY1305_KEY_LEN;
/// Data packet header: one packet-type byte followed by the 8-byte data nonce.
pub const WBLOCK_HDR_LEN: usize = 9;
/// Header of a decrypted plain packet: one flags byte followed by a big-endian `u16` payload size.
pub const WPACKET_HDR_LEN: usize = 3;
/// Size in bytes of the per-session data key.
pub const CHACHA20_POLY1305_KEY_LEN: usize = 32;
/// Bytes counted for the authentication tag when checking the minimum data packet size.
pub const CHACHA20_POLY1305_TAG_LEN: usize = 16;
/// Largest decrypted fragment; the assembler zero-pads every stored fragment to this size.
pub const MAX_FEC_PAYLOAD: usize =
    WIFI_MTU - IEEE80211_HEADER_LEN - WBLOCK_HDR_LEN - CHACHA20_POLY1305_TAG_LEN;
/// Largest payload a plain packet may declare in its size field.
pub const MAX_PAYLOAD_SIZE: usize = MAX_FEC_PAYLOAD - WPACKET_HDR_LEN;
/// Largest forwarder packet accepted by [`parse_forwarder_packet`].
pub const MAX_FORWARDER_PACKET_SIZE: usize = WIFI_MTU - IEEE80211_HEADER_LEN;
/// Largest block index the assembler accepts (the data nonce shifted right by 8 bits).
pub const MAX_BLOCK_IDX: u64 = (1u64 << 55) - 1;

/// Leading byte of a forwarder packet that carries an encrypted data fragment.
pub const WFB_PACKET_DATA: u8 = 0x01;
/// Leading byte of a forwarder packet that carries an encrypted session record.
pub const WFB_PACKET_KEY: u8 = 0x02;
/// The only FEC type value accepted when parsing a session record.
pub const WFB_FEC_VDM_RS: u8 = 0x01;
/// Flag bit in a plain packet's first byte; when set, the packet yields no output payload.
pub const WFB_PACKET_FEC_ONLY: u8 = 0x01;
52
/// Every failure the WFB receive path can report.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum WfbError {
    /// The forwarder packet had no bytes at all.
    Empty,
    /// The forwarder packet was larger than the maximum forwarder size.
    TooLong,
    /// A data packet was shorter than its minimum size.
    ShortDataPacket,
    /// A session packet (or its nonce) had an invalid size.
    ShortSessionPacket,
    /// Keypair bytes did not have the expected length.
    InvalidKeypair,
    /// The session key could not be encrypted.
    SessionEncryptFailed,
    /// The session key could not be decrypted, or the decrypted record was too short.
    SessionDecryptFailed,
    /// A data packet could not be encrypted.
    DataEncryptFailed,
    /// A data packet could not be decrypted.
    DataDecryptFailed,
    /// The announced session epoch was below the minimum accepted epoch.
    SessionEpochTooOld {
        /// Epoch carried by the rejected session.
        session_epoch: u64,
        /// Lowest epoch that would have been accepted.
        minimum_epoch: u64,
    },
    /// The announced session was for a different channel.
    SessionChannelMismatch {
        /// Raw channel id the receiver was configured with.
        expected: u32,
        /// Raw channel id carried by the session.
        actual: u32,
    },
    /// The session announced a FEC type that is not supported.
    UnsupportedFecType(u8),
    /// The leading packet-type byte was not recognised.
    UnknownPacketType(u8),
    /// The FEC `k`/`n` parameters were rejected.
    InvalidFecParameters,
    /// The fragment index was outside the FEC block.
    InvalidFragmentIndex,
    /// The block index exceeded the maximum block index.
    BlockIndexOverflow,
    /// A decrypted fragment was too short to hold a plain packet header.
    InvalidPlainPacket,
    /// A plain packet declared more payload than allowed or available.
    PayloadTooLarge,
    /// A data packet arrived while no session was established.
    MissingSession,
    /// FEC recovery failed.
    FecRecoveryFailed,
}

impl std::fmt::Display for WfbError {
    /// Writes the human-readable message for the error.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        // Variants that carry data are formatted directly; every other variant maps to a fixed
        // message that is written once at the end.
        let message = match self {
            Self::SessionEpochTooOld {
                session_epoch,
                minimum_epoch,
            } => {
                return write!(
                    f,
                    "WFB session epoch {session_epoch} is older than minimum {minimum_epoch}"
                );
            }
            Self::SessionChannelMismatch { expected, actual } => {
                return write!(
                    f,
                    "WFB session channel mismatch: expected 0x{expected:08x}, got 0x{actual:08x}"
                );
            }
            Self::UnsupportedFecType(fec_type) => {
                return write!(f, "unsupported WFB FEC type {fec_type}");
            }
            Self::UnknownPacketType(packet_type) => {
                return write!(f, "unknown WFB packet type 0x{packet_type:02x}");
            }
            Self::Empty => "empty WFB packet",
            Self::TooLong => "WFB packet exceeds maximum forwarder size",
            Self::ShortDataPacket => "short WFB data packet",
            Self::ShortSessionPacket => "invalid WFB session packet size",
            Self::InvalidKeypair => "WFB keypair must be 64 bytes",
            Self::SessionEncryptFailed => "unable to encrypt WFB session key",
            Self::SessionDecryptFailed => "unable to decrypt WFB session key",
            Self::DataEncryptFailed => "unable to encrypt WFB data packet",
            Self::DataDecryptFailed => "unable to decrypt WFB data packet",
            Self::InvalidFecParameters => "invalid WFB FEC parameters",
            Self::InvalidFragmentIndex => "invalid WFB fragment index",
            Self::BlockIndexOverflow => "WFB block index overflow",
            Self::InvalidPlainPacket => "invalid decrypted WFB packet",
            Self::PayloadTooLarge => "decrypted WFB payload is too large",
            Self::MissingSession => "WFB data packet arrived before session key",
            Self::FecRecoveryFailed => "WFB FEC recovery failed",
        };
        f.write_str(message)
    }
}

impl std::error::Error for WfbError {}
149
/// A forwarder packet split into its parts; every slice borrows from the input buffer.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum WfbPacket<'a> {
    /// An encrypted data fragment.
    Data {
        /// The 8 bytes following the packet-type byte, read as a big-endian `u64`.
        data_nonce: u64,
        /// Everything after the block header.
        encrypted_payload: &'a [u8],
        /// The block header itself (packet-type byte plus nonce bytes).
        associated_data: &'a [u8],
    },
    /// An encrypted session record.
    SessionKey {
        /// The nonce bytes that follow the packet-type byte.
        session_nonce: &'a [u8],
        /// Everything after the session header.
        encrypted_session: &'a [u8],
    },
}
170
171#[derive(Debug, Clone, Copy, PartialEq, Eq)]
173pub struct WfbKeypair {
174 pub rx_secretkey: [u8; CRYPTO_BOX_SECRETKEY_LEN],
176 pub tx_publickey: [u8; CRYPTO_BOX_PUBLICKEY_LEN],
178}
179
180impl WfbKeypair {
181 pub fn from_bytes(bytes: &[u8]) -> Result<Self, WfbError> {
183 if bytes.len() != CRYPTO_BOX_SECRETKEY_LEN + CRYPTO_BOX_PUBLICKEY_LEN {
184 return Err(WfbError::InvalidKeypair);
185 }
186 let mut rx_secretkey = [0; CRYPTO_BOX_SECRETKEY_LEN];
187 let mut tx_publickey = [0; CRYPTO_BOX_PUBLICKEY_LEN];
188 rx_secretkey.copy_from_slice(&bytes[..CRYPTO_BOX_SECRETKEY_LEN]);
189 tx_publickey.copy_from_slice(&bytes[CRYPTO_BOX_SECRETKEY_LEN..]);
190 Ok(Self {
191 rx_secretkey,
192 tx_publickey,
193 })
194 }
195}
196
/// Snapshot of packet statistics; all counters start at zero.
#[derive(Debug, Default, Clone, Copy, PartialEq, Eq)]
pub struct FecCounters {
    /// Packets counted as received.
    pub total_packets: u64,
    /// Primary fragments reconstructed through FEC.
    pub recovered_packets: u64,
    /// Primary fragments given up on without being received or recovered.
    pub lost_packets: u64,
    /// Fragments that failed plain-packet parsing, plus blocks whose FEC recovery failed.
    pub bad_packets: u64,
}
209
210#[derive(Debug, Clone, PartialEq, Eq)]
212pub struct WfbSession {
213 pub epoch: u64,
215 pub channel_id: ChannelId,
217 pub fec_type: u8,
219 pub fec_k: usize,
221 pub fec_n: usize,
223 pub session_key: [u8; CHACHA20_POLY1305_KEY_LEN],
225}
226
227impl WfbSession {
228 fn parse(
229 plaintext: &[u8],
230 expected_channel_id: ChannelId,
231 minimum_epoch: u64,
232 ) -> Result<Self, WfbError> {
233 if plaintext.len() < WSESSION_DATA_LEN {
234 return Err(WfbError::SessionDecryptFailed);
235 }
236 let epoch = u64::from_be_bytes(plaintext[0..8].try_into().expect("checked length"));
237 if epoch < minimum_epoch {
238 return Err(WfbError::SessionEpochTooOld {
239 session_epoch: epoch,
240 minimum_epoch,
241 });
242 }
243
244 let raw_channel = u32::from_be_bytes(plaintext[8..12].try_into().expect("checked length"));
245 let channel_id = ChannelId::new(raw_channel);
246 if channel_id != expected_channel_id {
247 return Err(WfbError::SessionChannelMismatch {
248 expected: expected_channel_id.raw(),
249 actual: raw_channel,
250 });
251 }
252
253 let fec_type = plaintext[12];
254 if fec_type != WFB_FEC_VDM_RS {
255 return Err(WfbError::UnsupportedFecType(fec_type));
256 }
257 let fec_k = plaintext[13] as usize;
258 let fec_n = plaintext[14] as usize;
259 if fec_k == 0 || fec_n == 0 || fec_k > fec_n || fec_n >= 256 {
260 return Err(WfbError::InvalidFecParameters);
261 }
262
263 let mut session_key = [0; CHACHA20_POLY1305_KEY_LEN];
264 session_key.copy_from_slice(&plaintext[15..47]);
265 Ok(Self {
266 epoch,
267 channel_id,
268 fec_type,
269 fec_k,
270 fec_n,
271 session_key,
272 })
273 }
274}
275
/// Something the receiver produced while processing a forwarder packet.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum WfbEvent {
    /// A session with a new session key was accepted.
    Session(WfbSession),
    /// A payload was extracted from the data stream.
    Payload(WfbOutput),
}
284
/// Stateful receiver that turns forwarder packets into [`WfbEvent`]s.
#[derive(Debug, Clone)]
pub struct WfbReceiver {
    // Channel every accepted session must match.
    channel_id: ChannelId,
    // Lowest session epoch accepted before any session is active.
    minimum_epoch: u64,
    // Keys used to decrypt session packets.
    keypair: WfbKeypair,
    // Currently active session, if one has been accepted.
    session: Option<WfbSession>,
    // Fragment assembler; replaced whenever a session with a new key is accepted.
    assembler: Option<PlainAssembler>,
    // Buffer that receives each decrypted data fragment; kept across packets.
    decrypt_scratch: Vec<u8>,
    // Packets that parsed successfully, of either kind.
    incoming_packets: u64,
    // Session-key packets that parsed successfully.
    session_packets: u64,
    // Data packets that parsed successfully.
    data_packets: u64,
}
298
299impl WfbReceiver {
300 pub fn new(channel_id: ChannelId, keypair: WfbKeypair, minimum_epoch: u64) -> Self {
302 Self {
303 channel_id,
304 minimum_epoch,
305 keypair,
306 session: None,
307 assembler: None,
308 decrypt_scratch: Vec::with_capacity(MAX_FEC_PAYLOAD),
309 incoming_packets: 0,
310 session_packets: 0,
311 data_packets: 0,
312 }
313 }
314
315 pub fn session(&self) -> Option<&WfbSession> {
317 self.session.as_ref()
318 }
319
320 pub fn counters(&self) -> FecCounters {
322 let assembler = self
323 .assembler
324 .as_ref()
325 .map(PlainAssembler::counters)
326 .unwrap_or_default();
327 FecCounters {
328 total_packets: self.incoming_packets,
329 recovered_packets: assembler.recovered_packets,
330 lost_packets: assembler.lost_packets,
331 bad_packets: assembler.bad_packets,
332 }
333 }
334
335 pub fn push_forwarder_packet(&mut self, buf: &[u8]) -> Result<Vec<WfbEvent>, WfbError> {
337 log::trace!(target: "openipc_core::wfb", "received WFB forwarder packet bytes={}", buf.len());
338 match parse_forwarder_packet(buf)? {
339 WfbPacket::SessionKey {
340 session_nonce,
341 encrypted_session,
342 } => {
343 self.incoming_packets += 1;
344 self.session_packets += 1;
345 let session = self.decrypt_session(session_nonce, encrypted_session)?;
346 let changed = self
347 .session
348 .as_ref()
349 .map(|current| current.session_key != session.session_key)
350 .unwrap_or(true);
351 if changed {
352 log::info!(
353 target: "openipc_core::wfb",
354 "accepted WFB session epoch={} channel=0x{:08x} fec={}/{}",
355 session.epoch,
356 session.channel_id.raw(),
357 session.fec_k,
358 session.fec_n
359 );
360 self.assembler = Some(PlainAssembler::new(session.fec_k, session.fec_n)?);
361 self.session = Some(session.clone());
362 Ok(vec![WfbEvent::Session(session)])
363 } else {
364 Ok(Vec::new())
365 }
366 }
367 WfbPacket::Data {
368 data_nonce,
369 encrypted_payload,
370 associated_data,
371 } => {
372 self.incoming_packets += 1;
373 self.data_packets += 1;
374 let session = self.session.as_ref().ok_or(WfbError::MissingSession)?;
375 let nonce = &associated_data[1..WBLOCK_HDR_LEN];
376 decrypt_chacha20poly1305_legacy_into(
377 &session.session_key,
378 nonce,
379 associated_data,
380 encrypted_payload,
381 &mut self.decrypt_scratch,
382 )
383 .map_err(|_| WfbError::DataDecryptFailed)?;
384 let assembler = self.assembler.as_mut().ok_or(WfbError::MissingSession)?;
385 let payloads: Vec<_> = assembler
386 .push_decrypted_fragment(data_nonce, &self.decrypt_scratch)?
387 .into_iter()
388 .map(WfbEvent::Payload)
389 .collect();
390 log::trace!(
391 target: "openipc_core::wfb",
392 "processed encrypted WFB data fragment nonce={} payloads={}",
393 data_nonce,
394 payloads.len()
395 );
396 Ok(payloads)
397 }
398 }
399 }
400
401 fn decrypt_session(
402 &self,
403 session_nonce: &[u8],
404 encrypted_session: &[u8],
405 ) -> Result<WfbSession, WfbError> {
406 let nonce: [u8; CRYPTO_BOX_NONCE_LEN] = session_nonce
407 .try_into()
408 .map_err(|_| WfbError::ShortSessionPacket)?;
409 let rx_secret = SecretKey::from(self.keypair.rx_secretkey);
410 let tx_public = PublicKey::from(self.keypair.tx_publickey);
411 let cipher = SalsaBox::new(&tx_public, &rx_secret);
412 let plaintext = cipher
413 .decrypt(BoxNonce::from_slice(&nonce), encrypted_session)
414 .map_err(|_| WfbError::SessionDecryptFailed)?;
415 let minimum_epoch = self
416 .session
417 .as_ref()
418 .map(|session| session.epoch.max(self.minimum_epoch))
419 .unwrap_or(self.minimum_epoch);
420 WfbSession::parse(&plaintext, self.channel_id, minimum_epoch)
421 }
422}
423
424pub fn parse_forwarder_packet(buf: &[u8]) -> Result<WfbPacket<'_>, WfbError> {
426 if buf.is_empty() {
427 return Err(WfbError::Empty);
428 }
429 if buf.len() > MAX_FORWARDER_PACKET_SIZE {
430 return Err(WfbError::TooLong);
431 }
432
433 match buf[0] {
434 WFB_PACKET_DATA => {
435 if buf.len() < WBLOCK_HDR_LEN + WPACKET_HDR_LEN + CHACHA20_POLY1305_TAG_LEN {
436 return Err(WfbError::ShortDataPacket);
437 }
438 let mut nonce = [0; 8];
439 nonce.copy_from_slice(&buf[1..9]);
440 Ok(WfbPacket::Data {
441 data_nonce: u64::from_be_bytes(nonce),
442 encrypted_payload: &buf[WBLOCK_HDR_LEN..],
443 associated_data: &buf[..WBLOCK_HDR_LEN],
444 })
445 }
446 WFB_PACKET_KEY => {
447 if buf.len() < WSESSION_HDR_LEN + WSESSION_DATA_LEN + CRYPTO_BOX_TAG_LEN {
448 return Err(WfbError::ShortSessionPacket);
449 }
450 Ok(WfbPacket::SessionKey {
451 session_nonce: &buf[1..WSESSION_HDR_LEN],
452 encrypted_session: &buf[WSESSION_HDR_LEN..],
453 })
454 }
455 other => Err(WfbError::UnknownPacketType(other)),
456 }
457}
458
/// One payload extracted from a primary fragment.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct WfbOutput {
    /// Sequence number computed as `block_idx * fec_k + fragment_idx`.
    pub packet_seq: u64,
    /// Payload bytes copied out of the plain packet.
    pub payload: Vec<u8>,
}
467
/// Reassembly state for a single FEC block.
#[derive(Debug, Clone)]
struct Block {
    // One slot per fragment index; `None` until that fragment is stored.
    fragments: Vec<Option<Vec<u8>>>,
    // Number of slots that have been filled by received fragments.
    received: usize,
    // Index of the next primary fragment to emit.
    next_fragment: usize,
}

impl Block {
    /// Creates a block with `n` empty fragment slots and zeroed counters.
    fn new(n: usize) -> Self {
        let fragments = std::iter::repeat_with(|| None).take(n).collect();
        Self {
            fragments,
            received: 0,
            next_fragment: 0,
        }
    }
}
484
/// Reassembles decrypted fragments into in-order payloads, recovering missing primary
/// fragments through FEC when enough fragments of a block are available.
#[derive(Debug, Clone)]
pub struct PlainAssembler {
    // Number of primary fragments per block.
    fec_k: usize,
    // Total number of fragments per block.
    fec_n: usize,
    // Codec used to recover missing primary fragments.
    fec: FecCode,
    // Blocks that are still buffered, keyed by block index.
    blocks: BTreeMap<u64, Block>,
    // Oldest block still being waited for; `None` until the first fragment arrives.
    next_block: Option<u64>,
    /// Fragments accepted by `push_decrypted_fragment` (after index validation).
    pub total_packets: u64,
    /// Primary fragments given up on.
    pub lost_packets: u64,
    /// Primary fragments recovered through FEC.
    pub recovered_packets: u64,
    /// Fragments that failed plain-packet parsing, plus blocks whose FEC recovery failed.
    pub bad_packets: u64,
}
506
507impl PlainAssembler {
508 pub fn new(fec_k: usize, fec_n: usize) -> Result<Self, WfbError> {
510 if fec_k == 0 || fec_n == 0 || fec_k > fec_n || fec_n > 255 {
511 return Err(WfbError::InvalidFecParameters);
512 }
513 let fec = FecCode::new(fec_k, fec_n).map_err(|_| WfbError::InvalidFecParameters)?;
514 Ok(Self {
515 fec_k,
516 fec_n,
517 fec,
518 blocks: BTreeMap::new(),
519 next_block: None,
520 total_packets: 0,
521 lost_packets: 0,
522 recovered_packets: 0,
523 bad_packets: 0,
524 })
525 }
526
527 pub const fn fec_k(&self) -> usize {
529 self.fec_k
530 }
531
532 pub const fn fec_n(&self) -> usize {
534 self.fec_n
535 }
536
537 pub fn reset_fec(&mut self, fec_k: usize, fec_n: usize) -> Result<(), WfbError> {
539 *self = Self::new(fec_k, fec_n)?;
540 Ok(())
541 }
542
543 pub fn counters(&self) -> FecCounters {
545 FecCounters {
546 total_packets: self.total_packets,
547 recovered_packets: self.recovered_packets,
548 lost_packets: self.lost_packets,
549 bad_packets: self.bad_packets,
550 }
551 }
552
553 pub fn push_decrypted_fragment(
555 &mut self,
556 data_nonce: u64,
557 fragment: &[u8],
558 ) -> Result<Vec<WfbOutput>, WfbError> {
559 let block_idx = data_nonce >> 8;
560 let fragment_idx = (data_nonce & 0xff) as usize;
561
562 if block_idx > MAX_BLOCK_IDX {
563 return Err(WfbError::BlockIndexOverflow);
564 }
565 if fragment_idx >= self.fec_n {
566 return Err(WfbError::InvalidFragmentIndex);
567 }
568 self.total_packets += 1;
569
570 if self.next_block.is_none() {
571 self.next_block = Some(block_idx);
572 }
573 if self
574 .next_block
575 .map(|next_block| block_idx < next_block)
576 .unwrap_or(false)
577 {
578 return Ok(Vec::new());
579 }
580
581 let block = self
582 .blocks
583 .entry(block_idx)
584 .or_insert_with(|| Block::new(self.fec_n));
585 if block.fragments[fragment_idx].is_none() {
586 let mut padded = vec![0; MAX_FEC_PAYLOAD];
587 let len = fragment.len().min(MAX_FEC_PAYLOAD);
588 padded[..len].copy_from_slice(&fragment[..len]);
589 block.fragments[fragment_idx] = Some(padded);
590 block.received += 1;
591 }
592
593 Ok(self.drain_ready_blocks())
594 }
595
596 fn drain_ready_blocks(&mut self) -> Vec<WfbOutput> {
597 let mut out = Vec::new();
598 while let Some(block_idx) = self.next_block {
599 if !self.blocks.contains_key(&block_idx) {
600 if self.should_skip_missing_block(block_idx) {
601 self.lost_packets += self.fec_k as u64;
602 self.next_block = Some(block_idx + 1);
603 continue;
604 }
605 break;
606 }
607
608 self.emit_contiguous_primary(block_idx, &mut out);
609 let complete = self
610 .blocks
611 .get(&block_idx)
612 .map(|block| block.next_fragment == self.fec_k)
613 .unwrap_or(false);
614 if complete {
615 self.blocks.remove(&block_idx);
616 self.next_block = Some(block_idx + 1);
617 continue;
618 }
619
620 let can_recover = self
621 .blocks
622 .get(&block_idx)
623 .map(|block| block.received >= self.fec_k)
624 .unwrap_or(false);
625 if can_recover {
626 if let Some(block) = self.blocks.get_mut(&block_idx) {
627 match self
628 .fec
629 .recover_primary(&mut block.fragments, MAX_FEC_PAYLOAD)
630 {
631 Ok(recovered) => {
632 if recovered > 0 {
633 log::debug!(
634 target: "openipc_core::fec",
635 "recovered missing primary WFB fragments block={} recovered={}",
636 block_idx,
637 recovered
638 );
639 }
640 self.recovered_packets += recovered as u64;
641 }
642 Err(error) => {
643 log::warn!(
644 target: "openipc_core::fec",
645 "FEC recovery failed block={block_idx}: {error}"
646 );
647 self.bad_packets += 1;
648 self.force_flush_block(block_idx, &mut out);
649 continue;
650 }
651 }
652 }
653 self.emit_contiguous_primary(block_idx, &mut out);
654 self.blocks.remove(&block_idx);
655 self.next_block = Some(block_idx + 1);
656 continue;
657 }
658
659 if self.should_force_flush(block_idx) {
660 self.force_flush_block(block_idx, &mut out);
661 continue;
662 }
663
664 break;
665 }
666 out
667 }
668
669 fn should_skip_missing_block(&self, block_idx: u64) -> bool {
670 let Some((&next_present_block, block)) = self.blocks.range((block_idx + 1)..).next() else {
671 return false;
672 };
673
674 block.received >= self.fec_k
675 || self.blocks.len() > 40
676 || next_present_block.saturating_sub(block_idx) >= 40
677 }
678
679 fn emit_contiguous_primary(&mut self, block_idx: u64, out: &mut Vec<WfbOutput>) {
680 let Some(block) = self.blocks.get_mut(&block_idx) else {
681 return;
682 };
683 while block.next_fragment < self.fec_k {
684 let fragment_idx = block.next_fragment;
685 let Some(fragment) = block.fragments[fragment_idx].as_deref() else {
686 break;
687 };
688 let packet_seq = block_idx * self.fec_k as u64 + fragment_idx as u64;
689 match parse_plain_packet(fragment) {
690 Ok(Some(payload)) => out.push(WfbOutput {
691 packet_seq,
692 payload: payload.to_vec(),
693 }),
694 Ok(None) => {}
695 Err(_) => {
696 self.bad_packets += 1;
697 }
698 }
699 block.next_fragment += 1;
700 }
701 }
702
703 fn should_force_flush(&self, block_idx: u64) -> bool {
704 if self.blocks.len() > 40 {
705 return true;
706 }
707 self.blocks
708 .range((block_idx + 1)..)
709 .any(|(_, block)| block.received >= self.fec_k)
710 }
711
712 fn force_flush_block(&mut self, block_idx: u64, out: &mut Vec<WfbOutput>) {
713 if let Some(block) = self.blocks.remove(&block_idx) {
714 for fragment_idx in block.next_fragment..self.fec_k {
715 let packet_seq = block_idx * self.fec_k as u64 + fragment_idx as u64;
716 match block.fragments[fragment_idx].as_deref() {
717 Some(fragment) => match parse_plain_packet(fragment) {
718 Ok(Some(payload)) => out.push(WfbOutput {
719 packet_seq,
720 payload: payload.to_vec(),
721 }),
722 Ok(None) => {}
723 Err(_) => {
724 self.bad_packets += 1;
725 }
726 },
727 None => {
728 self.lost_packets += 1;
729 }
730 }
731 }
732 self.next_block = Some(block_idx + 1);
733 }
734 }
735}
736
737pub fn parse_plain_packet(fragment: &[u8]) -> Result<Option<&[u8]>, WfbError> {
739 if fragment.len() < WPACKET_HDR_LEN {
740 return Err(WfbError::InvalidPlainPacket);
741 }
742 let flags = fragment[0];
743 let packet_size = u16::from_be_bytes([fragment[1], fragment[2]]) as usize;
744 if packet_size > MAX_PAYLOAD_SIZE || WPACKET_HDR_LEN + packet_size > fragment.len() {
745 return Err(WfbError::PayloadTooLarge);
746 }
747 if flags & WFB_PACKET_FEC_ONLY != 0 {
748 return Ok(None);
749 }
750 Ok(Some(
751 &fragment[WPACKET_HDR_LEN..WPACKET_HDR_LEN + packet_size],
752 ))
753}
754
#[cfg(test)]
mod tests {
    use super::*;
    use crate::crypto::encrypt_chacha20poly1305_legacy;
    use crypto_box::aead::Aead;

    /// Builds a plain packet: zero flags byte, big-endian `u16` length, then the payload.
    fn plain(payload: &[u8]) -> Vec<u8> {
        let mut out = Vec::new();
        out.push(0);
        out.extend_from_slice(&(payload.len() as u16).to_be_bytes());
        out.extend_from_slice(payload);
        out
    }

    /// Zero-pads `fragment` to `MAX_FEC_PAYLOAD` bytes, the size the assembler stores.
    fn padded(fragment: &[u8]) -> Vec<u8> {
        let mut out = vec![0; MAX_FEC_PAYLOAD];
        out[..fragment.len()].copy_from_slice(fragment);
        out
    }

    /// A minimum-size data packet splits into nonce, payload and block header.
    #[test]
    fn parses_forwarder_data_packet() {
        let mut packet = vec![WFB_PACKET_DATA];
        packet.extend_from_slice(&0x0102_0304_0506_0708u64.to_be_bytes());
        let encrypted = [9; WPACKET_HDR_LEN + CHACHA20_POLY1305_TAG_LEN];
        packet.extend_from_slice(&encrypted);

        let parsed = parse_forwarder_packet(&packet).unwrap();
        match parsed {
            WfbPacket::Data {
                data_nonce,
                encrypted_payload,
                associated_data,
            } => {
                assert_eq!(data_nonce, 0x0102_0304_0506_0708);
                assert_eq!(encrypted_payload, encrypted);
                assert_eq!(associated_data.len(), WBLOCK_HDR_LEN);
            }
            WfbPacket::SessionKey { .. } => panic!("expected data"),
        }
    }

    /// A data packet one byte below the minimum size is rejected.
    #[test]
    fn rejects_data_packets_without_encrypted_plain_header_and_tag() {
        let mut packet = vec![WFB_PACKET_DATA];
        packet.extend_from_slice(&0x0102_0304_0506_0708u64.to_be_bytes());
        packet.extend_from_slice(&[0; WPACKET_HDR_LEN + CHACHA20_POLY1305_TAG_LEN - 1]);

        assert_eq!(
            parse_forwarder_packet(&packet),
            Err(WfbError::ShortDataPacket)
        );
    }

    /// Primary fragments arriving in order are emitted immediately, one per push.
    #[test]
    fn emits_primary_fragments_in_order() {
        let mut assembler = PlainAssembler::new(2, 4).unwrap();
        let first = assembler
            .push_decrypted_fragment(0, &plain(b"first"))
            .unwrap();
        assert_eq!(first.len(), 1);
        assert_eq!(first[0].payload, b"first");
        let out = assembler
            .push_decrypted_fragment(1, &plain(b"second"))
            .unwrap();
        assert_eq!(out.len(), 1);
        assert_eq!(out[0].payload, b"second");
    }

    /// With k=3, n=5: fragment 1 is never sent; once a parity fragment brings the block to
    /// three received fragments, the missing one is recovered and emitted in order.
    #[test]
    fn recovers_missing_primary_fragment_from_fec() {
        let fec = FecCode::new(3, 5).unwrap();
        let primary = vec![
            padded(&plain(b"first")),
            padded(&plain(b"second")),
            padded(&plain(b"third")),
        ];
        let parity = fec.encode(&primary, MAX_FEC_PAYLOAD).unwrap();

        let mut assembler = PlainAssembler::new(3, 5).unwrap();
        let first = assembler.push_decrypted_fragment(0, &primary[0]).unwrap();
        assert_eq!(first[0].payload, b"first");
        // Fragment 2 cannot be emitted while fragment 1 is still missing.
        assert!(assembler
            .push_decrypted_fragment(2, &primary[2])
            .unwrap()
            .is_empty());
        let recovered = assembler.push_decrypted_fragment(3, &parity[0]).unwrap();
        assert_eq!(recovered.len(), 2);
        assert_eq!(recovered[0].payload, b"second");
        assert_eq!(recovered[1].payload, b"third");
        assert_eq!(assembler.recovered_packets, 1);
    }

    /// Block 0 stays incomplete and block 1 never arrives; once block 2 is complete, block 0
    /// is flushed and block 1 skipped.
    #[test]
    fn skips_fully_missing_blocks_when_later_block_is_ready() {
        let mut assembler = PlainAssembler::new(2, 2).unwrap();

        let first = assembler
            .push_decrypted_fragment(0, &plain(b"b0-f0"))
            .unwrap();
        assert_eq!(first[0].payload, b"b0-f0");

        // The data nonce carries the block index in the bits above the low 8.
        assert!(assembler
            .push_decrypted_fragment(2 << 8, &plain(b"b2-f0"))
            .unwrap()
            .is_empty());
        let out = assembler
            .push_decrypted_fragment((2 << 8) | 1, &plain(b"b2-f1"))
            .unwrap();

        assert_eq!(out.len(), 2);
        assert_eq!(out[0].payload, b"b2-f0");
        assert_eq!(out[1].payload, b"b2-f1");
        // One fragment lost from block 0 plus both primary fragments of block 1.
        assert_eq!(assembler.lost_packets, 3);
    }

    /// A fragment for a block the assembler has already moved past produces no output.
    #[test]
    fn ignores_late_fragments_from_already_flushed_blocks() {
        let mut assembler = PlainAssembler::new(2, 2).unwrap();

        assembler
            .push_decrypted_fragment(0, &plain(b"b0-f0"))
            .unwrap();
        assembler
            .push_decrypted_fragment(2 << 8, &plain(b"b2-f0"))
            .unwrap();
        assembler
            .push_decrypted_fragment((2 << 8) | 1, &plain(b"b2-f1"))
            .unwrap();

        let late = assembler
            .push_decrypted_fragment(1 << 8, &plain(b"late-b1-f0"))
            .unwrap();
        assert!(late.is_empty());
    }

    /// A plain packet with the FEC-only flag set parses successfully but yields no payload.
    #[test]
    fn skips_fec_only_plain_packets() {
        let mut fragment = vec![WFB_PACKET_FEC_ONLY];
        fragment.extend_from_slice(&4u16.to_be_bytes());
        fragment.extend_from_slice(b"skip");
        assert!(parse_plain_packet(&fragment).unwrap().is_none());
    }

    /// End-to-end: a session packet is accepted, a data packet encrypted with its key is
    /// decoded, and a later session with a lower epoch is rejected.
    #[test]
    fn receiver_decrypts_session_and_data_packet() {
        let rx_secret = SecretKey::from([1; CRYPTO_BOX_SECRETKEY_LEN]);
        let tx_secret = SecretKey::from([2; CRYPTO_BOX_SECRETKEY_LEN]);
        let keypair = WfbKeypair {
            rx_secretkey: rx_secret.to_bytes(),
            tx_publickey: *tx_secret.public_key().as_bytes(),
        };
        let channel_id = ChannelId::default_video();
        let session_key = [7; CHACHA20_POLY1305_KEY_LEN];

        // Session record: epoch 1, channel, FEC type, k=1, n=1, session key.
        let mut session_plain = Vec::new();
        session_plain.extend_from_slice(&1u64.to_be_bytes());
        session_plain.extend_from_slice(&channel_id.raw().to_be_bytes());
        session_plain.push(WFB_FEC_VDM_RS);
        session_plain.push(1);
        session_plain.push(1);
        session_plain.extend_from_slice(&session_key);
        assert_eq!(session_plain.len(), WSESSION_DATA_LEN);
        // Extra bytes after the fixed record; the session is still expected to be accepted.
        session_plain.extend_from_slice(&[0x42, 0x00, 0x01, 0x99]);

        let session_nonce = [3; CRYPTO_BOX_NONCE_LEN];
        let tx_box = SalsaBox::new(&rx_secret.public_key(), &tx_secret);
        let encrypted_session = tx_box
            .encrypt(
                BoxNonce::from_slice(&session_nonce),
                session_plain.as_slice(),
            )
            .unwrap();
        let mut session_packet = vec![WFB_PACKET_KEY];
        session_packet.extend_from_slice(&session_nonce);
        session_packet.extend_from_slice(&encrypted_session);

        let mut receiver = WfbReceiver::new(channel_id, keypair, 0);
        let session_events = receiver.push_forwarder_packet(&session_packet).unwrap();
        assert!(matches!(session_events.as_slice(), [WfbEvent::Session(_)]));

        // Data packet: the block header is the associated data, and the header without its
        // leading packet-type byte is the cipher nonce.
        let data_nonce = 0u64;
        let mut block_header = vec![WFB_PACKET_DATA];
        block_header.extend_from_slice(&data_nonce.to_be_bytes());
        let encrypted_data = encrypt_chacha20poly1305_legacy(
            &session_key,
            &block_header[1..WBLOCK_HDR_LEN],
            &block_header,
            &plain(b"rtp payload"),
        )
        .unwrap();
        let mut data_packet = block_header;
        data_packet.extend_from_slice(&encrypted_data);

        let payload_events = receiver.push_forwarder_packet(&data_packet).unwrap();
        match payload_events.as_slice() {
            [WfbEvent::Payload(payload)] => assert_eq!(payload.payload, b"rtp payload"),
            other => panic!("unexpected events: {other:?}"),
        }

        // A session announcing epoch 0 is older than the active epoch 1 and must be refused.
        let mut older_session_plain = Vec::new();
        older_session_plain.extend_from_slice(&0u64.to_be_bytes());
        older_session_plain.extend_from_slice(&channel_id.raw().to_be_bytes());
        older_session_plain.push(WFB_FEC_VDM_RS);
        older_session_plain.push(1);
        older_session_plain.push(1);
        older_session_plain.extend_from_slice(&[8; CHACHA20_POLY1305_KEY_LEN]);
        let older_session_nonce = [4; CRYPTO_BOX_NONCE_LEN];
        let encrypted_older_session = tx_box
            .encrypt(
                BoxNonce::from_slice(&older_session_nonce),
                older_session_plain.as_slice(),
            )
            .unwrap();
        let mut older_session_packet = vec![WFB_PACKET_KEY];
        older_session_packet.extend_from_slice(&older_session_nonce);
        older_session_packet.extend_from_slice(&encrypted_older_session);

        assert_eq!(
            receiver.push_forwarder_packet(&older_session_packet),
            Err(WfbError::SessionEpochTooOld {
                session_epoch: 0,
                minimum_epoch: 1,
            })
        );
    }
}