Skip to main content

openipc_core/
wfb.rs

1use std::collections::BTreeMap;
2
3use crypto_box::aead::Aead;
4use crypto_box::{Nonce as BoxNonce, PublicKey, SalsaBox, SecretKey};
5
6use crate::channel::ChannelId;
7use crate::crypto::decrypt_chacha20poly1305_legacy;
8use crate::fec::FecCode;
9
10pub const WIFI_MTU: usize = 4045;
11pub const IEEE80211_HEADER_LEN: usize = 24;
12pub const CRYPTO_BOX_SECRETKEY_LEN: usize = 32;
13pub const CRYPTO_BOX_PUBLICKEY_LEN: usize = 32;
14pub const CRYPTO_BOX_NONCE_LEN: usize = 24;
15pub const CRYPTO_BOX_TAG_LEN: usize = 16;
16pub const WSESSION_HDR_LEN: usize = 1 + CRYPTO_BOX_NONCE_LEN;
17pub const WSESSION_DATA_LEN: usize = 8 + 4 + 1 + 1 + 1 + CHACHA20_POLY1305_KEY_LEN;
18pub const WBLOCK_HDR_LEN: usize = 9;
19pub const WPACKET_HDR_LEN: usize = 3;
20pub const CHACHA20_POLY1305_KEY_LEN: usize = 32;
21pub const CHACHA20_POLY1305_TAG_LEN: usize = 16;
22pub const MAX_FEC_PAYLOAD: usize =
23    WIFI_MTU - IEEE80211_HEADER_LEN - WBLOCK_HDR_LEN - CHACHA20_POLY1305_TAG_LEN;
24pub const MAX_PAYLOAD_SIZE: usize = MAX_FEC_PAYLOAD - WPACKET_HDR_LEN;
25pub const MAX_FORWARDER_PACKET_SIZE: usize = WIFI_MTU - IEEE80211_HEADER_LEN;
26pub const MAX_BLOCK_IDX: u64 = (1u64 << 55) - 1;
27
28pub const WFB_PACKET_DATA: u8 = 0x01;
29pub const WFB_PACKET_KEY: u8 = 0x02;
30pub const WFB_FEC_VDM_RS: u8 = 0x01;
31pub const WFB_PACKET_FEC_ONLY: u8 = 0x01;
32
33#[derive(Debug, Clone, PartialEq, Eq)]
34pub enum WfbError {
35    Empty,
36    TooLong,
37    ShortDataPacket,
38    ShortSessionPacket,
39    InvalidKeypair,
40    SessionEncryptFailed,
41    SessionDecryptFailed,
42    DataEncryptFailed,
43    DataDecryptFailed,
44    SessionEpochTooOld {
45        session_epoch: u64,
46        minimum_epoch: u64,
47    },
48    SessionChannelMismatch {
49        expected: u32,
50        actual: u32,
51    },
52    UnsupportedFecType(u8),
53    UnknownPacketType(u8),
54    InvalidFecParameters,
55    InvalidFragmentIndex,
56    BlockIndexOverflow,
57    InvalidPlainPacket,
58    PayloadTooLarge,
59    MissingSession,
60    FecRecoveryFailed,
61}
62
63impl std::fmt::Display for WfbError {
64    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
65        match self {
66            Self::Empty => write!(f, "empty WFB packet"),
67            Self::TooLong => write!(f, "WFB packet exceeds maximum forwarder size"),
68            Self::ShortDataPacket => write!(f, "short WFB data packet"),
69            Self::ShortSessionPacket => write!(f, "invalid WFB session packet size"),
70            Self::InvalidKeypair => write!(f, "WFB keypair must be 64 bytes"),
71            Self::SessionEncryptFailed => write!(f, "unable to encrypt WFB session key"),
72            Self::SessionDecryptFailed => write!(f, "unable to decrypt WFB session key"),
73            Self::DataEncryptFailed => write!(f, "unable to encrypt WFB data packet"),
74            Self::DataDecryptFailed => write!(f, "unable to decrypt WFB data packet"),
75            Self::SessionEpochTooOld {
76                session_epoch,
77                minimum_epoch,
78            } => write!(
79                f,
80                "WFB session epoch {session_epoch} is older than minimum {minimum_epoch}"
81            ),
82            Self::SessionChannelMismatch { expected, actual } => write!(
83                f,
84                "WFB session channel mismatch: expected 0x{expected:08x}, got 0x{actual:08x}"
85            ),
86            Self::UnsupportedFecType(fec_type) => {
87                write!(f, "unsupported WFB FEC type {fec_type}")
88            }
89            Self::UnknownPacketType(packet_type) => {
90                write!(f, "unknown WFB packet type 0x{packet_type:02x}")
91            }
92            Self::InvalidFecParameters => write!(f, "invalid WFB FEC parameters"),
93            Self::InvalidFragmentIndex => write!(f, "invalid WFB fragment index"),
94            Self::BlockIndexOverflow => write!(f, "WFB block index overflow"),
95            Self::InvalidPlainPacket => write!(f, "invalid decrypted WFB packet"),
96            Self::PayloadTooLarge => write!(f, "decrypted WFB payload is too large"),
97            Self::MissingSession => write!(f, "WFB data packet arrived before session key"),
98            Self::FecRecoveryFailed => write!(f, "WFB FEC recovery failed"),
99        }
100    }
101}
102
103impl std::error::Error for WfbError {}
104
105#[derive(Debug, Clone, Copy, PartialEq, Eq)]
106pub enum WfbPacket<'a> {
107    Data {
108        data_nonce: u64,
109        encrypted_payload: &'a [u8],
110        associated_data: &'a [u8],
111    },
112    SessionKey {
113        session_nonce: &'a [u8],
114        encrypted_session: &'a [u8],
115    },
116}
117
118#[derive(Debug, Clone, Copy, PartialEq, Eq)]
119pub struct WfbKeypair {
120    pub rx_secretkey: [u8; CRYPTO_BOX_SECRETKEY_LEN],
121    pub tx_publickey: [u8; CRYPTO_BOX_PUBLICKEY_LEN],
122}
123
124impl WfbKeypair {
125    pub fn from_bytes(bytes: &[u8]) -> Result<Self, WfbError> {
126        if bytes.len() != CRYPTO_BOX_SECRETKEY_LEN + CRYPTO_BOX_PUBLICKEY_LEN {
127            return Err(WfbError::InvalidKeypair);
128        }
129        let mut rx_secretkey = [0; CRYPTO_BOX_SECRETKEY_LEN];
130        let mut tx_publickey = [0; CRYPTO_BOX_PUBLICKEY_LEN];
131        rx_secretkey.copy_from_slice(&bytes[..CRYPTO_BOX_SECRETKEY_LEN]);
132        tx_publickey.copy_from_slice(&bytes[CRYPTO_BOX_SECRETKEY_LEN..]);
133        Ok(Self {
134            rx_secretkey,
135            tx_publickey,
136        })
137    }
138}
139
140#[derive(Debug, Default, Clone, Copy, PartialEq, Eq)]
141pub struct FecCounters {
142    pub total_packets: u64,
143    pub recovered_packets: u64,
144    pub lost_packets: u64,
145    pub bad_packets: u64,
146}
147
148#[derive(Debug, Clone, PartialEq, Eq)]
149pub struct WfbSession {
150    pub epoch: u64,
151    pub channel_id: ChannelId,
152    pub fec_type: u8,
153    pub fec_k: usize,
154    pub fec_n: usize,
155    pub session_key: [u8; CHACHA20_POLY1305_KEY_LEN],
156}
157
158impl WfbSession {
159    fn parse(
160        plaintext: &[u8],
161        expected_channel_id: ChannelId,
162        minimum_epoch: u64,
163    ) -> Result<Self, WfbError> {
164        if plaintext.len() != WSESSION_DATA_LEN {
165            return Err(WfbError::SessionDecryptFailed);
166        }
167        let epoch = u64::from_be_bytes(plaintext[0..8].try_into().expect("checked length"));
168        if epoch < minimum_epoch {
169            return Err(WfbError::SessionEpochTooOld {
170                session_epoch: epoch,
171                minimum_epoch,
172            });
173        }
174
175        let raw_channel = u32::from_be_bytes(plaintext[8..12].try_into().expect("checked length"));
176        let channel_id = ChannelId::new(raw_channel);
177        if channel_id != expected_channel_id {
178            return Err(WfbError::SessionChannelMismatch {
179                expected: expected_channel_id.raw(),
180                actual: raw_channel,
181            });
182        }
183
184        let fec_type = plaintext[12];
185        if fec_type != WFB_FEC_VDM_RS {
186            return Err(WfbError::UnsupportedFecType(fec_type));
187        }
188        let fec_k = plaintext[13] as usize;
189        let fec_n = plaintext[14] as usize;
190        if fec_k == 0 || fec_n == 0 || fec_k > fec_n {
191            return Err(WfbError::InvalidFecParameters);
192        }
193
194        let mut session_key = [0; CHACHA20_POLY1305_KEY_LEN];
195        session_key.copy_from_slice(&plaintext[15..47]);
196        Ok(Self {
197            epoch,
198            channel_id,
199            fec_type,
200            fec_k,
201            fec_n,
202            session_key,
203        })
204    }
205}
206
207#[derive(Debug, Clone, PartialEq, Eq)]
208pub enum WfbEvent {
209    Session(WfbSession),
210    Payload(WfbOutput),
211}
212
213#[derive(Debug, Clone)]
214pub struct WfbReceiver {
215    channel_id: ChannelId,
216    minimum_epoch: u64,
217    keypair: WfbKeypair,
218    session: Option<WfbSession>,
219    assembler: Option<PlainAssembler>,
220    incoming_packets: u64,
221    session_packets: u64,
222    data_packets: u64,
223}
224
225impl WfbReceiver {
226    pub fn new(channel_id: ChannelId, keypair: WfbKeypair, minimum_epoch: u64) -> Self {
227        Self {
228            channel_id,
229            minimum_epoch,
230            keypair,
231            session: None,
232            assembler: None,
233            incoming_packets: 0,
234            session_packets: 0,
235            data_packets: 0,
236        }
237    }
238
239    pub fn session(&self) -> Option<&WfbSession> {
240        self.session.as_ref()
241    }
242
243    pub fn counters(&self) -> FecCounters {
244        let assembler = self
245            .assembler
246            .as_ref()
247            .map(PlainAssembler::counters)
248            .unwrap_or_default();
249        FecCounters {
250            total_packets: self.incoming_packets,
251            recovered_packets: assembler.recovered_packets,
252            lost_packets: assembler.lost_packets,
253            bad_packets: assembler.bad_packets,
254        }
255    }
256
257    pub fn push_forwarder_packet(&mut self, buf: &[u8]) -> Result<Vec<WfbEvent>, WfbError> {
258        match parse_forwarder_packet(buf)? {
259            WfbPacket::SessionKey {
260                session_nonce,
261                encrypted_session,
262            } => {
263                self.incoming_packets += 1;
264                self.session_packets += 1;
265                let session = self.decrypt_session(session_nonce, encrypted_session)?;
266                let changed = self
267                    .session
268                    .as_ref()
269                    .map(|current| current.session_key != session.session_key)
270                    .unwrap_or(true);
271                if changed {
272                    self.assembler = Some(PlainAssembler::new(session.fec_k, session.fec_n)?);
273                    self.session = Some(session.clone());
274                    Ok(vec![WfbEvent::Session(session)])
275                } else {
276                    Ok(Vec::new())
277                }
278            }
279            WfbPacket::Data {
280                data_nonce,
281                encrypted_payload,
282                associated_data,
283            } => {
284                self.incoming_packets += 1;
285                self.data_packets += 1;
286                let session = self.session.as_ref().ok_or(WfbError::MissingSession)?;
287                let nonce = &associated_data[1..WBLOCK_HDR_LEN];
288                let decrypted = decrypt_chacha20poly1305_legacy(
289                    &session.session_key,
290                    nonce,
291                    associated_data,
292                    encrypted_payload,
293                )
294                .map_err(|_| WfbError::DataDecryptFailed)?;
295                let assembler = self.assembler.as_mut().ok_or(WfbError::MissingSession)?;
296                Ok(assembler
297                    .push_decrypted_fragment(data_nonce, &decrypted)?
298                    .into_iter()
299                    .map(WfbEvent::Payload)
300                    .collect())
301            }
302        }
303    }
304
305    fn decrypt_session(
306        &self,
307        session_nonce: &[u8],
308        encrypted_session: &[u8],
309    ) -> Result<WfbSession, WfbError> {
310        let nonce: [u8; CRYPTO_BOX_NONCE_LEN] = session_nonce
311            .try_into()
312            .map_err(|_| WfbError::ShortSessionPacket)?;
313        let rx_secret = SecretKey::from(self.keypair.rx_secretkey);
314        let tx_public = PublicKey::from(self.keypair.tx_publickey);
315        let cipher = SalsaBox::new(&tx_public, &rx_secret);
316        let plaintext = cipher
317            .decrypt(BoxNonce::from_slice(&nonce), encrypted_session)
318            .map_err(|_| WfbError::SessionDecryptFailed)?;
319        WfbSession::parse(&plaintext, self.channel_id, self.minimum_epoch)
320    }
321}
322
323pub fn parse_forwarder_packet(buf: &[u8]) -> Result<WfbPacket<'_>, WfbError> {
324    if buf.is_empty() {
325        return Err(WfbError::Empty);
326    }
327    if buf.len() > MAX_FORWARDER_PACKET_SIZE {
328        return Err(WfbError::TooLong);
329    }
330
331    match buf[0] {
332        WFB_PACKET_DATA => {
333            if buf.len() < WBLOCK_HDR_LEN + WPACKET_HDR_LEN {
334                return Err(WfbError::ShortDataPacket);
335            }
336            let mut nonce = [0; 8];
337            nonce.copy_from_slice(&buf[1..9]);
338            Ok(WfbPacket::Data {
339                data_nonce: u64::from_be_bytes(nonce),
340                encrypted_payload: &buf[WBLOCK_HDR_LEN..],
341                associated_data: &buf[..WBLOCK_HDR_LEN],
342            })
343        }
344        WFB_PACKET_KEY => {
345            if buf.len() != WSESSION_HDR_LEN + WSESSION_DATA_LEN + CRYPTO_BOX_TAG_LEN {
346                return Err(WfbError::ShortSessionPacket);
347            }
348            Ok(WfbPacket::SessionKey {
349                session_nonce: &buf[1..WSESSION_HDR_LEN],
350                encrypted_session: &buf[WSESSION_HDR_LEN..],
351            })
352        }
353        other => Err(WfbError::UnknownPacketType(other)),
354    }
355}
356
357#[derive(Debug, Clone, PartialEq, Eq)]
358pub struct WfbOutput {
359    pub packet_seq: u64,
360    pub payload: Vec<u8>,
361}
362
363#[derive(Debug, Clone)]
364struct Block {
365    fragments: Vec<Option<Vec<u8>>>,
366    received: usize,
367    next_fragment: usize,
368}
369
370impl Block {
371    fn new(n: usize) -> Self {
372        Self {
373            fragments: vec![None; n],
374            received: 0,
375            next_fragment: 0,
376        }
377    }
378}
379
380#[derive(Debug, Clone)]
381pub struct PlainAssembler {
382    fec_k: usize,
383    fec_n: usize,
384    fec: FecCode,
385    blocks: BTreeMap<u64, Block>,
386    next_block: Option<u64>,
387    pub total_packets: u64,
388    pub lost_packets: u64,
389    pub recovered_packets: u64,
390    pub bad_packets: u64,
391}
392
393impl PlainAssembler {
394    pub fn new(fec_k: usize, fec_n: usize) -> Result<Self, WfbError> {
395        if fec_k == 0 || fec_n == 0 || fec_k > fec_n || fec_n > 255 {
396            return Err(WfbError::InvalidFecParameters);
397        }
398        let fec = FecCode::new(fec_k, fec_n).map_err(|_| WfbError::InvalidFecParameters)?;
399        Ok(Self {
400            fec_k,
401            fec_n,
402            fec,
403            blocks: BTreeMap::new(),
404            next_block: None,
405            total_packets: 0,
406            lost_packets: 0,
407            recovered_packets: 0,
408            bad_packets: 0,
409        })
410    }
411
412    pub const fn fec_k(&self) -> usize {
413        self.fec_k
414    }
415
416    pub const fn fec_n(&self) -> usize {
417        self.fec_n
418    }
419
420    pub fn reset_fec(&mut self, fec_k: usize, fec_n: usize) -> Result<(), WfbError> {
421        *self = Self::new(fec_k, fec_n)?;
422        Ok(())
423    }
424
425    pub fn counters(&self) -> FecCounters {
426        FecCounters {
427            total_packets: self.total_packets,
428            recovered_packets: self.recovered_packets,
429            lost_packets: self.lost_packets,
430            bad_packets: self.bad_packets,
431        }
432    }
433
434    /// Push a decrypted WFB FEC fragment.
435    pub fn push_decrypted_fragment(
436        &mut self,
437        data_nonce: u64,
438        fragment: &[u8],
439    ) -> Result<Vec<WfbOutput>, WfbError> {
440        let block_idx = data_nonce >> 8;
441        let fragment_idx = (data_nonce & 0xff) as usize;
442
443        if block_idx > MAX_BLOCK_IDX {
444            return Err(WfbError::BlockIndexOverflow);
445        }
446        if fragment_idx >= self.fec_n {
447            return Err(WfbError::InvalidFragmentIndex);
448        }
449        self.total_packets += 1;
450
451        if self.next_block.is_none() {
452            self.next_block = Some(block_idx);
453        }
454
455        let block = self
456            .blocks
457            .entry(block_idx)
458            .or_insert_with(|| Block::new(self.fec_n));
459        if block.fragments[fragment_idx].is_none() {
460            let mut padded = vec![0; MAX_FEC_PAYLOAD];
461            let len = fragment.len().min(MAX_FEC_PAYLOAD);
462            padded[..len].copy_from_slice(&fragment[..len]);
463            block.fragments[fragment_idx] = Some(padded);
464            block.received += 1;
465        }
466
467        Ok(self.drain_ready_blocks())
468    }
469
470    fn drain_ready_blocks(&mut self) -> Vec<WfbOutput> {
471        let mut out = Vec::new();
472        while let Some(block_idx) = self.next_block {
473            if !self.blocks.contains_key(&block_idx) {
474                break;
475            }
476
477            self.emit_contiguous_primary(block_idx, &mut out);
478            let complete = self
479                .blocks
480                .get(&block_idx)
481                .map(|block| block.next_fragment == self.fec_k)
482                .unwrap_or(false);
483            if complete {
484                self.blocks.remove(&block_idx);
485                self.next_block = Some(block_idx + 1);
486                continue;
487            }
488
489            let can_recover = self
490                .blocks
491                .get(&block_idx)
492                .map(|block| block.received >= self.fec_k)
493                .unwrap_or(false);
494            if can_recover {
495                if let Some(block) = self.blocks.get_mut(&block_idx) {
496                    match self
497                        .fec
498                        .recover_primary(&mut block.fragments, MAX_FEC_PAYLOAD)
499                    {
500                        Ok(recovered) => {
501                            self.recovered_packets += recovered as u64;
502                        }
503                        Err(_) => {
504                            self.bad_packets += 1;
505                            self.force_flush_block(block_idx, &mut out);
506                            continue;
507                        }
508                    }
509                }
510                self.emit_contiguous_primary(block_idx, &mut out);
511                self.blocks.remove(&block_idx);
512                self.next_block = Some(block_idx + 1);
513                continue;
514            }
515
516            if self.should_force_flush(block_idx) {
517                self.force_flush_block(block_idx, &mut out);
518                continue;
519            }
520
521            break;
522        }
523        out
524    }
525
526    fn emit_contiguous_primary(&mut self, block_idx: u64, out: &mut Vec<WfbOutput>) {
527        let Some(block) = self.blocks.get_mut(&block_idx) else {
528            return;
529        };
530        while block.next_fragment < self.fec_k {
531            let fragment_idx = block.next_fragment;
532            let Some(fragment) = block.fragments[fragment_idx].as_deref() else {
533                break;
534            };
535            let packet_seq = block_idx * self.fec_k as u64 + fragment_idx as u64;
536            match parse_plain_packet(fragment) {
537                Ok(Some(payload)) => out.push(WfbOutput {
538                    packet_seq,
539                    payload: payload.to_vec(),
540                }),
541                Ok(None) => {}
542                Err(_) => {
543                    self.bad_packets += 1;
544                }
545            }
546            block.next_fragment += 1;
547        }
548    }
549
550    fn should_force_flush(&self, block_idx: u64) -> bool {
551        if self.blocks.len() > 40 {
552            return true;
553        }
554        self.blocks
555            .range((block_idx + 1)..)
556            .any(|(_, block)| block.received >= self.fec_k)
557    }
558
559    fn force_flush_block(&mut self, block_idx: u64, out: &mut Vec<WfbOutput>) {
560        if let Some(block) = self.blocks.remove(&block_idx) {
561            for fragment_idx in block.next_fragment..self.fec_k {
562                let packet_seq = block_idx * self.fec_k as u64 + fragment_idx as u64;
563                match block.fragments[fragment_idx].as_deref() {
564                    Some(fragment) => match parse_plain_packet(fragment) {
565                        Ok(Some(payload)) => out.push(WfbOutput {
566                            packet_seq,
567                            payload: payload.to_vec(),
568                        }),
569                        Ok(None) => {}
570                        Err(_) => {
571                            self.bad_packets += 1;
572                        }
573                    },
574                    None => {
575                        self.lost_packets += 1;
576                    }
577                }
578            }
579            self.next_block = Some(block_idx + 1);
580        }
581    }
582}
583
584pub fn parse_plain_packet(fragment: &[u8]) -> Result<Option<&[u8]>, WfbError> {
585    if fragment.len() < WPACKET_HDR_LEN {
586        return Err(WfbError::InvalidPlainPacket);
587    }
588    let flags = fragment[0];
589    let packet_size = u16::from_be_bytes([fragment[1], fragment[2]]) as usize;
590    if packet_size > MAX_PAYLOAD_SIZE || WPACKET_HDR_LEN + packet_size > fragment.len() {
591        return Err(WfbError::PayloadTooLarge);
592    }
593    if flags & WFB_PACKET_FEC_ONLY != 0 {
594        return Ok(None);
595    }
596    Ok(Some(
597        &fragment[WPACKET_HDR_LEN..WPACKET_HDR_LEN + packet_size],
598    ))
599}
600
601#[cfg(test)]
602mod tests {
603    use super::*;
604    use crate::crypto::encrypt_chacha20poly1305_legacy;
605    use crypto_box::aead::Aead;
606
607    fn plain(payload: &[u8]) -> Vec<u8> {
608        let mut out = Vec::new();
609        out.push(0);
610        out.extend_from_slice(&(payload.len() as u16).to_be_bytes());
611        out.extend_from_slice(payload);
612        out
613    }
614
615    fn padded(fragment: &[u8]) -> Vec<u8> {
616        let mut out = vec![0; MAX_FEC_PAYLOAD];
617        out[..fragment.len()].copy_from_slice(fragment);
618        out
619    }
620
621    #[test]
622    fn parses_forwarder_data_packet() {
623        let mut packet = vec![WFB_PACKET_DATA];
624        packet.extend_from_slice(&0x0102_0304_0506_0708u64.to_be_bytes());
625        packet.extend_from_slice(&[9, 10, 11]);
626
627        let parsed = parse_forwarder_packet(&packet).unwrap();
628        match parsed {
629            WfbPacket::Data {
630                data_nonce,
631                encrypted_payload,
632                associated_data,
633            } => {
634                assert_eq!(data_nonce, 0x0102_0304_0506_0708);
635                assert_eq!(encrypted_payload, &[9, 10, 11]);
636                assert_eq!(associated_data.len(), WBLOCK_HDR_LEN);
637            }
638            WfbPacket::SessionKey { .. } => panic!("expected data"),
639        }
640    }
641
642    #[test]
643    fn emits_primary_fragments_in_order() {
644        let mut assembler = PlainAssembler::new(2, 4).unwrap();
645        let first = assembler
646            .push_decrypted_fragment(0, &plain(b"first"))
647            .unwrap();
648        assert_eq!(first.len(), 1);
649        assert_eq!(first[0].payload, b"first");
650        let out = assembler
651            .push_decrypted_fragment(1, &plain(b"second"))
652            .unwrap();
653        assert_eq!(out.len(), 1);
654        assert_eq!(out[0].payload, b"second");
655    }
656
657    #[test]
658    fn recovers_missing_primary_fragment_from_fec() {
659        let fec = FecCode::new(3, 5).unwrap();
660        let primary = vec![
661            padded(&plain(b"first")),
662            padded(&plain(b"second")),
663            padded(&plain(b"third")),
664        ];
665        let parity = fec.encode(&primary, MAX_FEC_PAYLOAD).unwrap();
666
667        let mut assembler = PlainAssembler::new(3, 5).unwrap();
668        let first = assembler.push_decrypted_fragment(0, &primary[0]).unwrap();
669        assert_eq!(first[0].payload, b"first");
670        assert!(assembler
671            .push_decrypted_fragment(2, &primary[2])
672            .unwrap()
673            .is_empty());
674        let recovered = assembler.push_decrypted_fragment(3, &parity[0]).unwrap();
675        assert_eq!(recovered.len(), 2);
676        assert_eq!(recovered[0].payload, b"second");
677        assert_eq!(recovered[1].payload, b"third");
678        assert_eq!(assembler.recovered_packets, 1);
679    }
680
681    #[test]
682    fn skips_fec_only_plain_packets() {
683        let mut fragment = vec![WFB_PACKET_FEC_ONLY];
684        fragment.extend_from_slice(&4u16.to_be_bytes());
685        fragment.extend_from_slice(b"skip");
686        assert!(parse_plain_packet(&fragment).unwrap().is_none());
687    }
688
689    #[test]
690    fn receiver_decrypts_session_and_data_packet() {
691        let rx_secret = SecretKey::from([1; CRYPTO_BOX_SECRETKEY_LEN]);
692        let tx_secret = SecretKey::from([2; CRYPTO_BOX_SECRETKEY_LEN]);
693        let keypair = WfbKeypair {
694            rx_secretkey: rx_secret.to_bytes(),
695            tx_publickey: *tx_secret.public_key().as_bytes(),
696        };
697        let channel_id = ChannelId::default_video();
698        let session_key = [7; CHACHA20_POLY1305_KEY_LEN];
699
700        let mut session_plain = Vec::new();
701        session_plain.extend_from_slice(&1u64.to_be_bytes());
702        session_plain.extend_from_slice(&channel_id.raw().to_be_bytes());
703        session_plain.push(WFB_FEC_VDM_RS);
704        session_plain.push(1);
705        session_plain.push(1);
706        session_plain.extend_from_slice(&session_key);
707        assert_eq!(session_plain.len(), WSESSION_DATA_LEN);
708
709        let session_nonce = [3; CRYPTO_BOX_NONCE_LEN];
710        let tx_box = SalsaBox::new(&rx_secret.public_key(), &tx_secret);
711        let encrypted_session = tx_box
712            .encrypt(
713                BoxNonce::from_slice(&session_nonce),
714                session_plain.as_slice(),
715            )
716            .unwrap();
717        let mut session_packet = vec![WFB_PACKET_KEY];
718        session_packet.extend_from_slice(&session_nonce);
719        session_packet.extend_from_slice(&encrypted_session);
720
721        let mut receiver = WfbReceiver::new(channel_id, keypair, 0);
722        let session_events = receiver.push_forwarder_packet(&session_packet).unwrap();
723        assert!(matches!(session_events.as_slice(), [WfbEvent::Session(_)]));
724
725        let data_nonce = 0u64;
726        let mut block_header = vec![WFB_PACKET_DATA];
727        block_header.extend_from_slice(&data_nonce.to_be_bytes());
728        let encrypted_data = encrypt_chacha20poly1305_legacy(
729            &session_key,
730            &block_header[1..WBLOCK_HDR_LEN],
731            &block_header,
732            &plain(b"rtp payload"),
733        )
734        .unwrap();
735        let mut data_packet = block_header;
736        data_packet.extend_from_slice(&encrypted_data);
737
738        let payload_events = receiver.push_forwarder_packet(&data_packet).unwrap();
739        match payload_events.as_slice() {
740            [WfbEvent::Payload(payload)] => assert_eq!(payload.payload, b"rtp payload"),
741            other => panic!("unexpected events: {other:?}"),
742        }
743    }
744}