Skip to main content

openipc_core/
wfb.rs

1use std::collections::BTreeMap;
2
3use crypto_box::aead::Aead;
4use crypto_box::{Nonce as BoxNonce, PublicKey, SalsaBox, SecretKey};
5
6use crate::channel::ChannelId;
7use crate::crypto::decrypt_chacha20poly1305_legacy_into;
8use crate::fec::FecCode;
9
/// WiFi MTU, in bytes, assumed for OpenIPC WFB forwarder packets.
pub const WIFI_MTU: usize = 4045;
/// Length of the 802.11 header; it is subtracted from [`WIFI_MTU`] to get the WFB packet capacity.
pub const IEEE80211_HEADER_LEN: usize = 24;

/// Length of a crypto_box secret key.
pub const CRYPTO_BOX_SECRETKEY_LEN: usize = 32;
/// Length of a crypto_box public key.
pub const CRYPTO_BOX_PUBLICKEY_LEN: usize = 32;
/// Length of a crypto_box nonce.
pub const CRYPTO_BOX_NONCE_LEN: usize = 24;
/// Length of a crypto_box authentication tag.
pub const CRYPTO_BOX_TAG_LEN: usize = 16;

/// Length of the ChaCha20-Poly1305 key carried by a WFB session packet.
pub const CHACHA20_POLY1305_KEY_LEN: usize = 32;
/// Length of the ChaCha20-Poly1305 authentication tag on WFB data packets.
pub const CHACHA20_POLY1305_TAG_LEN: usize = 16;

/// WFB session packet header length: one packet-type byte plus the crypto_box nonce.
pub const WSESSION_HDR_LEN: usize = CRYPTO_BOX_NONCE_LEN + 1;
/// Plain WFB session body length before crypto_box encryption:
/// epoch (8) + channel id (4) + FEC type (1) + FEC k (1) + FEC n (1) + session key.
pub const WSESSION_DATA_LEN: usize = 8 + 4 + 1 + 1 + 1 + CHACHA20_POLY1305_KEY_LEN;
/// WFB data-block header length: one packet-type byte plus the 8-byte data nonce.
pub const WBLOCK_HDR_LEN: usize = 9;
/// Plain WFB payload-fragment header length: one flags byte plus a 16-bit payload size.
pub const WPACKET_HDR_LEN: usize = 3;

/// Maximum WFB forwarder packet payload after the 802.11 header.
pub const MAX_FORWARDER_PACKET_SIZE: usize = WIFI_MTU - IEEE80211_HEADER_LEN;
/// Maximum encrypted FEC fragment payload carried by one WFB data packet.
pub const MAX_FEC_PAYLOAD: usize =
    MAX_FORWARDER_PACKET_SIZE - WBLOCK_HDR_LEN - CHACHA20_POLY1305_TAG_LEN;
/// Maximum application payload before WFB fragment headers are added.
pub const MAX_PAYLOAD_SIZE: usize = MAX_FEC_PAYLOAD - WPACKET_HDR_LEN;
/// Largest WFB block index (2^55 - 1) before a transmitter must rotate session keys.
pub const MAX_BLOCK_IDX: u64 = u64::MAX >> 9;

/// WFB packet type for encrypted data fragments.
pub const WFB_PACKET_DATA: u8 = 0x01;
/// WFB packet type for encrypted session-key packets.
pub const WFB_PACKET_KEY: u8 = 0x02;
/// FEC type used by WFB's Vandermonde Reed-Solomon blocks.
pub const WFB_FEC_VDM_RS: u8 = 0x01;
/// Flag bit marking a plain WFB fragment that exists only for FEC and
/// carries no application payload to emit.
pub const WFB_PACKET_FEC_ONLY: u8 = 0x01;
52
/// Error returned while parsing, decrypting, or assembling WFB packets.
///
/// All variants are plain data, so errors can be cloned and compared in tests.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum WfbError {
    /// Packet buffer is empty, so not even the packet-type byte is present.
    Empty,
    /// Packet exceeds `MAX_FORWARDER_PACKET_SIZE`.
    TooLong,
    /// Data packet is too short to hold the block header, an encrypted
    /// fragment header, and the authentication tag.
    ShortDataPacket,
    /// Session packet is too short, or its nonce does not have the
    /// crypto_box nonce length.
    ShortSessionPacket,
    /// WFB keypair is not the expected 64-byte file shape.
    InvalidKeypair,
    /// Session-key encryption failed.
    SessionEncryptFailed,
    /// Session-key decryption failed, or the decrypted body was shorter than
    /// `WSESSION_DATA_LEN`.
    SessionDecryptFailed,
    /// Data encryption failed.
    DataEncryptFailed,
    /// Data decryption failed.
    DataDecryptFailed,
    /// Session epoch was older than the configured minimum.
    SessionEpochTooOld {
        /// Epoch from the received session packet.
        session_epoch: u64,
        /// Minimum epoch accepted by the receiver.
        minimum_epoch: u64,
    },
    /// Session packet was for a different WFB channel.
    SessionChannelMismatch {
        /// Expected channel id.
        expected: u32,
        /// Actual channel id in the session packet.
        actual: u32,
    },
    /// FEC type is not the supported VDM Reed-Solomon mode; carries the
    /// offending type byte.
    UnsupportedFecType(u8),
    /// Forwarder packet type is unknown; carries the offending type byte.
    UnknownPacketType(u8),
    /// FEC parameters are invalid (zero counts, `k > n`, or `n` above 255).
    InvalidFecParameters,
    /// Fragment index is outside the current FEC block.
    InvalidFragmentIndex,
    /// Data nonce encoded a block index beyond `MAX_BLOCK_IDX`.
    BlockIndexOverflow,
    /// Decrypted plain packet is too short to contain the fragment header.
    InvalidPlainPacket,
    /// Declared plain payload size exceeds `MAX_PAYLOAD_SIZE` or the bytes
    /// actually present in the fragment.
    PayloadTooLarge,
    /// Encrypted data packet arrived before a session key.
    MissingSession,
    /// FEC recovery failed.
    FecRecoveryFailed,
}
107
108impl std::fmt::Display for WfbError {
109    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
110        match self {
111            Self::Empty => write!(f, "empty WFB packet"),
112            Self::TooLong => write!(f, "WFB packet exceeds maximum forwarder size"),
113            Self::ShortDataPacket => write!(f, "short WFB data packet"),
114            Self::ShortSessionPacket => write!(f, "invalid WFB session packet size"),
115            Self::InvalidKeypair => write!(f, "WFB keypair must be 64 bytes"),
116            Self::SessionEncryptFailed => write!(f, "unable to encrypt WFB session key"),
117            Self::SessionDecryptFailed => write!(f, "unable to decrypt WFB session key"),
118            Self::DataEncryptFailed => write!(f, "unable to encrypt WFB data packet"),
119            Self::DataDecryptFailed => write!(f, "unable to decrypt WFB data packet"),
120            Self::SessionEpochTooOld {
121                session_epoch,
122                minimum_epoch,
123            } => write!(
124                f,
125                "WFB session epoch {session_epoch} is older than minimum {minimum_epoch}"
126            ),
127            Self::SessionChannelMismatch { expected, actual } => write!(
128                f,
129                "WFB session channel mismatch: expected 0x{expected:08x}, got 0x{actual:08x}"
130            ),
131            Self::UnsupportedFecType(fec_type) => {
132                write!(f, "unsupported WFB FEC type {fec_type}")
133            }
134            Self::UnknownPacketType(packet_type) => {
135                write!(f, "unknown WFB packet type 0x{packet_type:02x}")
136            }
137            Self::InvalidFecParameters => write!(f, "invalid WFB FEC parameters"),
138            Self::InvalidFragmentIndex => write!(f, "invalid WFB fragment index"),
139            Self::BlockIndexOverflow => write!(f, "WFB block index overflow"),
140            Self::InvalidPlainPacket => write!(f, "invalid decrypted WFB packet"),
141            Self::PayloadTooLarge => write!(f, "decrypted WFB payload is too large"),
142            Self::MissingSession => write!(f, "WFB data packet arrived before session key"),
143            Self::FecRecoveryFailed => write!(f, "WFB FEC recovery failed"),
144        }
145    }
146}
147
148impl std::error::Error for WfbError {}
149
/// Borrowed WFB forwarder packet.
///
/// Every slice borrows from the buffer that was parsed, so a parsed packet
/// cannot outlive that buffer.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum WfbPacket<'a> {
    /// Encrypted WFB data/FEC fragment packet.
    Data {
        /// Data nonce; high bits are block index and low byte is fragment index.
        data_nonce: u64,
        /// Encrypted fragment payload plus authentication tag (everything
        /// after the block header).
        encrypted_payload: &'a [u8],
        /// Associated data used for WFB data authentication (the block
        /// header: packet type followed by the nonce bytes).
        associated_data: &'a [u8],
    },
    /// Encrypted WFB session-key packet.
    SessionKey {
        /// crypto_box session nonce.
        session_nonce: &'a [u8],
        /// Encrypted session data, including its authentication tag.
        encrypted_session: &'a [u8],
    },
}
170
/// Ground-station WFB keypair file contents.
///
/// The two keys are used together to open the crypto_box that protects
/// session-key packets.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct WfbKeypair {
    /// Ground-station receive secret key (first 32 bytes of the key file).
    pub rx_secretkey: [u8; CRYPTO_BOX_SECRETKEY_LEN],
    /// Air-unit transmit public key (last 32 bytes of the key file).
    pub tx_publickey: [u8; CRYPTO_BOX_PUBLICKEY_LEN],
}
179
180impl WfbKeypair {
181    /// Parse the 64-byte `gs.key` style keypair.
182    pub fn from_bytes(bytes: &[u8]) -> Result<Self, WfbError> {
183        if bytes.len() != CRYPTO_BOX_SECRETKEY_LEN + CRYPTO_BOX_PUBLICKEY_LEN {
184            return Err(WfbError::InvalidKeypair);
185        }
186        let mut rx_secretkey = [0; CRYPTO_BOX_SECRETKEY_LEN];
187        let mut tx_publickey = [0; CRYPTO_BOX_PUBLICKEY_LEN];
188        rx_secretkey.copy_from_slice(&bytes[..CRYPTO_BOX_SECRETKEY_LEN]);
189        tx_publickey.copy_from_slice(&bytes[CRYPTO_BOX_SECRETKEY_LEN..]);
190        Ok(Self {
191            rx_secretkey,
192            tx_publickey,
193        })
194    }
195}
196
/// Cumulative FEC counters for a WFB receiver or assembler.
///
/// A plain snapshot of counts; the `Default` value has every counter at zero.
#[derive(Debug, Default, Clone, Copy, PartialEq, Eq)]
pub struct FecCounters {
    /// Total data fragments observed.
    pub total_packets: u64,
    /// Primary fragments recovered by FEC.
    pub recovered_packets: u64,
    /// Primary fragments considered lost.
    pub lost_packets: u64,
    /// Malformed or unrecoverable fragments.
    pub bad_packets: u64,
}
209
/// Decrypted WFB session parameters.
///
/// Holds the values decoded from a session-key packet after validation.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct WfbSession {
    /// Session epoch (big-endian `u64` at the start of the session body).
    pub epoch: u64,
    /// Channel id this session applies to.
    pub channel_id: ChannelId,
    /// WFB FEC type; only `WFB_FEC_VDM_RS` passes validation.
    pub fec_type: u8,
    /// Primary fragment count.
    pub fec_k: usize,
    /// Total primary plus parity fragment count.
    pub fec_n: usize,
    /// Symmetric key used for WFB data packets.
    pub session_key: [u8; CHACHA20_POLY1305_KEY_LEN],
}
226
227impl WfbSession {
228    fn parse(
229        plaintext: &[u8],
230        expected_channel_id: ChannelId,
231        minimum_epoch: u64,
232    ) -> Result<Self, WfbError> {
233        if plaintext.len() < WSESSION_DATA_LEN {
234            return Err(WfbError::SessionDecryptFailed);
235        }
236        let epoch = u64::from_be_bytes(plaintext[0..8].try_into().expect("checked length"));
237        if epoch < minimum_epoch {
238            return Err(WfbError::SessionEpochTooOld {
239                session_epoch: epoch,
240                minimum_epoch,
241            });
242        }
243
244        let raw_channel = u32::from_be_bytes(plaintext[8..12].try_into().expect("checked length"));
245        let channel_id = ChannelId::new(raw_channel);
246        if channel_id != expected_channel_id {
247            return Err(WfbError::SessionChannelMismatch {
248                expected: expected_channel_id.raw(),
249                actual: raw_channel,
250            });
251        }
252
253        let fec_type = plaintext[12];
254        if fec_type != WFB_FEC_VDM_RS {
255            return Err(WfbError::UnsupportedFecType(fec_type));
256        }
257        let fec_k = plaintext[13] as usize;
258        let fec_n = plaintext[14] as usize;
259        if fec_k == 0 || fec_n == 0 || fec_k > fec_n || fec_n >= 256 {
260            return Err(WfbError::InvalidFecParameters);
261        }
262
263        let mut session_key = [0; CHACHA20_POLY1305_KEY_LEN];
264        session_key.copy_from_slice(&plaintext[15..47]);
265        Ok(Self {
266            epoch,
267            channel_id,
268            fec_type,
269            fec_k,
270            fec_n,
271            session_key,
272        })
273    }
274}
275
/// Event emitted by an encrypted WFB receiver.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum WfbEvent {
    /// A session key was accepted (emitted only when the key differs from
    /// the one already in use).
    Session(WfbSession),
    /// One recovered payload was emitted.
    Payload(WfbOutput),
}
284
/// Encrypted WFB receiver for one channel.
///
/// Decrypts session-key packets with the configured keypair, then decrypts
/// data packets with the accepted session key and feeds them to a FEC
/// assembler.
#[derive(Debug, Clone)]
pub struct WfbReceiver {
    // Channel that incoming session packets must match.
    channel_id: ChannelId,
    // Lowest session epoch accepted, as configured at construction.
    minimum_epoch: u64,
    // Keys used to open session-key packets.
    keypair: WfbKeypair,
    // Most recently accepted session; `None` until a session packet is accepted.
    session: Option<WfbSession>,
    // FEC assembler built from the current session's parameters.
    assembler: Option<PlainAssembler>,
    // Reusable output buffer for data-packet decryption.
    decrypt_scratch: Vec<u8>,
    // Count of packets that parsed successfully as session or data packets.
    incoming_packets: u64,
    // Count of parsed session-key packets.
    session_packets: u64,
    // Count of parsed data packets.
    data_packets: u64,
}
298
299impl WfbReceiver {
300    /// Create a receiver for one channel and keypair.
301    pub fn new(channel_id: ChannelId, keypair: WfbKeypair, minimum_epoch: u64) -> Self {
302        Self {
303            channel_id,
304            minimum_epoch,
305            keypair,
306            session: None,
307            assembler: None,
308            decrypt_scratch: Vec::with_capacity(MAX_FEC_PAYLOAD),
309            incoming_packets: 0,
310            session_packets: 0,
311            data_packets: 0,
312        }
313    }
314
315    /// Return the currently accepted WFB session, if any.
316    pub fn session(&self) -> Option<&WfbSession> {
317        self.session.as_ref()
318    }
319
320    /// Return cumulative receive/FEC counters.
321    pub fn counters(&self) -> FecCounters {
322        let assembler = self
323            .assembler
324            .as_ref()
325            .map(PlainAssembler::counters)
326            .unwrap_or_default();
327        FecCounters {
328            total_packets: self.incoming_packets,
329            recovered_packets: assembler.recovered_packets,
330            lost_packets: assembler.lost_packets,
331            bad_packets: assembler.bad_packets,
332        }
333    }
334
335    /// Push one WFB forwarder packet payload.
336    pub fn push_forwarder_packet(&mut self, buf: &[u8]) -> Result<Vec<WfbEvent>, WfbError> {
337        log::trace!(target: "openipc_core::wfb", "received WFB forwarder packet bytes={}", buf.len());
338        match parse_forwarder_packet(buf)? {
339            WfbPacket::SessionKey {
340                session_nonce,
341                encrypted_session,
342            } => {
343                self.incoming_packets += 1;
344                self.session_packets += 1;
345                let session = self.decrypt_session(session_nonce, encrypted_session)?;
346                let changed = self
347                    .session
348                    .as_ref()
349                    .map(|current| current.session_key != session.session_key)
350                    .unwrap_or(true);
351                if changed {
352                    log::info!(
353                        target: "openipc_core::wfb",
354                        "accepted WFB session epoch={} channel=0x{:08x} fec={}/{}",
355                        session.epoch,
356                        session.channel_id.raw(),
357                        session.fec_k,
358                        session.fec_n
359                    );
360                    self.assembler = Some(PlainAssembler::new(session.fec_k, session.fec_n)?);
361                    self.session = Some(session.clone());
362                    Ok(vec![WfbEvent::Session(session)])
363                } else {
364                    Ok(Vec::new())
365                }
366            }
367            WfbPacket::Data {
368                data_nonce,
369                encrypted_payload,
370                associated_data,
371            } => {
372                self.incoming_packets += 1;
373                self.data_packets += 1;
374                let session = self.session.as_ref().ok_or(WfbError::MissingSession)?;
375                let nonce = &associated_data[1..WBLOCK_HDR_LEN];
376                decrypt_chacha20poly1305_legacy_into(
377                    &session.session_key,
378                    nonce,
379                    associated_data,
380                    encrypted_payload,
381                    &mut self.decrypt_scratch,
382                )
383                .map_err(|_| WfbError::DataDecryptFailed)?;
384                let assembler = self.assembler.as_mut().ok_or(WfbError::MissingSession)?;
385                let payloads: Vec<_> = assembler
386                    .push_decrypted_fragment(data_nonce, &self.decrypt_scratch)?
387                    .into_iter()
388                    .map(WfbEvent::Payload)
389                    .collect();
390                log::trace!(
391                    target: "openipc_core::wfb",
392                    "processed encrypted WFB data fragment nonce={} payloads={}",
393                    data_nonce,
394                    payloads.len()
395                );
396                Ok(payloads)
397            }
398        }
399    }
400
401    fn decrypt_session(
402        &self,
403        session_nonce: &[u8],
404        encrypted_session: &[u8],
405    ) -> Result<WfbSession, WfbError> {
406        let nonce: [u8; CRYPTO_BOX_NONCE_LEN] = session_nonce
407            .try_into()
408            .map_err(|_| WfbError::ShortSessionPacket)?;
409        let rx_secret = SecretKey::from(self.keypair.rx_secretkey);
410        let tx_public = PublicKey::from(self.keypair.tx_publickey);
411        let cipher = SalsaBox::new(&tx_public, &rx_secret);
412        let plaintext = cipher
413            .decrypt(BoxNonce::from_slice(&nonce), encrypted_session)
414            .map_err(|_| WfbError::SessionDecryptFailed)?;
415        let minimum_epoch = self
416            .session
417            .as_ref()
418            .map(|session| session.epoch.max(self.minimum_epoch))
419            .unwrap_or(self.minimum_epoch);
420        WfbSession::parse(&plaintext, self.channel_id, minimum_epoch)
421    }
422}
423
424/// Parse a WFB forwarder packet as data or session-key payload.
425pub fn parse_forwarder_packet(buf: &[u8]) -> Result<WfbPacket<'_>, WfbError> {
426    if buf.is_empty() {
427        return Err(WfbError::Empty);
428    }
429    if buf.len() > MAX_FORWARDER_PACKET_SIZE {
430        return Err(WfbError::TooLong);
431    }
432
433    match buf[0] {
434        WFB_PACKET_DATA => {
435            if buf.len() < WBLOCK_HDR_LEN + WPACKET_HDR_LEN + CHACHA20_POLY1305_TAG_LEN {
436                return Err(WfbError::ShortDataPacket);
437            }
438            let mut nonce = [0; 8];
439            nonce.copy_from_slice(&buf[1..9]);
440            Ok(WfbPacket::Data {
441                data_nonce: u64::from_be_bytes(nonce),
442                encrypted_payload: &buf[WBLOCK_HDR_LEN..],
443                associated_data: &buf[..WBLOCK_HDR_LEN],
444            })
445        }
446        WFB_PACKET_KEY => {
447            if buf.len() < WSESSION_HDR_LEN + WSESSION_DATA_LEN + CRYPTO_BOX_TAG_LEN {
448                return Err(WfbError::ShortSessionPacket);
449            }
450            Ok(WfbPacket::SessionKey {
451                session_nonce: &buf[1..WSESSION_HDR_LEN],
452                encrypted_session: &buf[WSESSION_HDR_LEN..],
453            })
454        }
455        other => Err(WfbError::UnknownPacketType(other)),
456    }
457}
458
/// Recovered WFB application payload.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct WfbOutput {
    /// Recovered packet sequence number, computed as
    /// `block index * fec_k + fragment index`.
    pub packet_seq: u64,
    /// Raw application payload bytes (fragment header stripped).
    pub payload: Vec<u8>,
}
467
/// Reassembly state for one FEC block.
#[derive(Debug, Clone)]
struct Block {
    // One slot per fragment index (primary and parity); `None` until received.
    fragments: Vec<Option<Vec<u8>>>,
    // Number of distinct fragments stored so far.
    received: usize,
    // Index of the next primary fragment that has not been emitted yet.
    next_fragment: usize,
}
474
475impl Block {
476    fn new(n: usize) -> Self {
477        Self {
478            fragments: vec![None; n],
479            received: 0,
480            next_fragment: 0,
481        }
482    }
483}
484
/// Plain WFB FEC assembler.
///
/// This is used after data decryption, or directly for tests/pre-decrypted
/// captures. It accepts primary and parity fragments and emits recovered
/// application payloads in order.
#[derive(Debug, Clone)]
pub struct PlainAssembler {
    // Number of primary fragments per block.
    fec_k: usize,
    // Total fragments (primary plus parity) per block.
    fec_n: usize,
    // Codec used to rebuild missing primary fragments.
    fec: FecCode,
    // Blocks that have received fragments but are not fully emitted, by block index.
    blocks: BTreeMap<u64, Block>,
    // Index of the next block to emit; `None` until the first fragment arrives.
    next_block: Option<u64>,
    /// Total fragments observed.
    pub total_packets: u64,
    /// Primary fragments considered lost.
    pub lost_packets: u64,
    /// Primary fragments recovered by FEC.
    pub recovered_packets: u64,
    /// Malformed or unrecoverable fragments.
    pub bad_packets: u64,
}
506
507impl PlainAssembler {
508    /// Create a plain assembler for `fec_k` primary and `fec_n` total fragments.
509    pub fn new(fec_k: usize, fec_n: usize) -> Result<Self, WfbError> {
510        if fec_k == 0 || fec_n == 0 || fec_k > fec_n || fec_n > 255 {
511            return Err(WfbError::InvalidFecParameters);
512        }
513        let fec = FecCode::new(fec_k, fec_n).map_err(|_| WfbError::InvalidFecParameters)?;
514        Ok(Self {
515            fec_k,
516            fec_n,
517            fec,
518            blocks: BTreeMap::new(),
519            next_block: None,
520            total_packets: 0,
521            lost_packets: 0,
522            recovered_packets: 0,
523            bad_packets: 0,
524        })
525    }
526
527    /// Return the primary fragment count.
528    pub const fn fec_k(&self) -> usize {
529        self.fec_k
530    }
531
532    /// Return the total primary plus parity fragment count.
533    pub const fn fec_n(&self) -> usize {
534        self.fec_n
535    }
536
537    /// Reset assembler state and FEC parameters.
538    pub fn reset_fec(&mut self, fec_k: usize, fec_n: usize) -> Result<(), WfbError> {
539        *self = Self::new(fec_k, fec_n)?;
540        Ok(())
541    }
542
543    /// Return cumulative FEC counters.
544    pub fn counters(&self) -> FecCounters {
545        FecCounters {
546            total_packets: self.total_packets,
547            recovered_packets: self.recovered_packets,
548            lost_packets: self.lost_packets,
549            bad_packets: self.bad_packets,
550        }
551    }
552
553    /// Push a decrypted WFB FEC fragment.
554    pub fn push_decrypted_fragment(
555        &mut self,
556        data_nonce: u64,
557        fragment: &[u8],
558    ) -> Result<Vec<WfbOutput>, WfbError> {
559        let block_idx = data_nonce >> 8;
560        let fragment_idx = (data_nonce & 0xff) as usize;
561
562        if block_idx > MAX_BLOCK_IDX {
563            return Err(WfbError::BlockIndexOverflow);
564        }
565        if fragment_idx >= self.fec_n {
566            return Err(WfbError::InvalidFragmentIndex);
567        }
568        self.total_packets += 1;
569
570        if self.next_block.is_none() {
571            self.next_block = Some(block_idx);
572        }
573        if self
574            .next_block
575            .map(|next_block| block_idx < next_block)
576            .unwrap_or(false)
577        {
578            return Ok(Vec::new());
579        }
580
581        let block = self
582            .blocks
583            .entry(block_idx)
584            .or_insert_with(|| Block::new(self.fec_n));
585        if block.fragments[fragment_idx].is_none() {
586            let mut padded = vec![0; MAX_FEC_PAYLOAD];
587            let len = fragment.len().min(MAX_FEC_PAYLOAD);
588            padded[..len].copy_from_slice(&fragment[..len]);
589            block.fragments[fragment_idx] = Some(padded);
590            block.received += 1;
591        }
592
593        Ok(self.drain_ready_blocks())
594    }
595
596    fn drain_ready_blocks(&mut self) -> Vec<WfbOutput> {
597        let mut out = Vec::new();
598        while let Some(block_idx) = self.next_block {
599            if !self.blocks.contains_key(&block_idx) {
600                if self.should_skip_missing_block(block_idx) {
601                    self.lost_packets += self.fec_k as u64;
602                    self.next_block = Some(block_idx + 1);
603                    continue;
604                }
605                break;
606            }
607
608            self.emit_contiguous_primary(block_idx, &mut out);
609            let complete = self
610                .blocks
611                .get(&block_idx)
612                .map(|block| block.next_fragment == self.fec_k)
613                .unwrap_or(false);
614            if complete {
615                self.blocks.remove(&block_idx);
616                self.next_block = Some(block_idx + 1);
617                continue;
618            }
619
620            let can_recover = self
621                .blocks
622                .get(&block_idx)
623                .map(|block| block.received >= self.fec_k)
624                .unwrap_or(false);
625            if can_recover {
626                if let Some(block) = self.blocks.get_mut(&block_idx) {
627                    match self
628                        .fec
629                        .recover_primary(&mut block.fragments, MAX_FEC_PAYLOAD)
630                    {
631                        Ok(recovered) => {
632                            if recovered > 0 {
633                                log::debug!(
634                                    target: "openipc_core::fec",
635                                    "recovered missing primary WFB fragments block={} recovered={}",
636                                    block_idx,
637                                    recovered
638                                );
639                            }
640                            self.recovered_packets += recovered as u64;
641                        }
642                        Err(error) => {
643                            log::warn!(
644                                target: "openipc_core::fec",
645                                "FEC recovery failed block={block_idx}: {error}"
646                            );
647                            self.bad_packets += 1;
648                            self.force_flush_block(block_idx, &mut out);
649                            continue;
650                        }
651                    }
652                }
653                self.emit_contiguous_primary(block_idx, &mut out);
654                self.blocks.remove(&block_idx);
655                self.next_block = Some(block_idx + 1);
656                continue;
657            }
658
659            if self.should_force_flush(block_idx) {
660                self.force_flush_block(block_idx, &mut out);
661                continue;
662            }
663
664            break;
665        }
666        out
667    }
668
669    fn should_skip_missing_block(&self, block_idx: u64) -> bool {
670        let Some((&next_present_block, block)) = self.blocks.range((block_idx + 1)..).next() else {
671            return false;
672        };
673
674        block.received >= self.fec_k
675            || self.blocks.len() > 40
676            || next_present_block.saturating_sub(block_idx) >= 40
677    }
678
679    fn emit_contiguous_primary(&mut self, block_idx: u64, out: &mut Vec<WfbOutput>) {
680        let Some(block) = self.blocks.get_mut(&block_idx) else {
681            return;
682        };
683        while block.next_fragment < self.fec_k {
684            let fragment_idx = block.next_fragment;
685            let Some(fragment) = block.fragments[fragment_idx].as_deref() else {
686                break;
687            };
688            let packet_seq = block_idx * self.fec_k as u64 + fragment_idx as u64;
689            match parse_plain_packet(fragment) {
690                Ok(Some(payload)) => out.push(WfbOutput {
691                    packet_seq,
692                    payload: payload.to_vec(),
693                }),
694                Ok(None) => {}
695                Err(_) => {
696                    self.bad_packets += 1;
697                }
698            }
699            block.next_fragment += 1;
700        }
701    }
702
703    fn should_force_flush(&self, block_idx: u64) -> bool {
704        if self.blocks.len() > 40 {
705            return true;
706        }
707        self.blocks
708            .range((block_idx + 1)..)
709            .any(|(_, block)| block.received >= self.fec_k)
710    }
711
712    fn force_flush_block(&mut self, block_idx: u64, out: &mut Vec<WfbOutput>) {
713        if let Some(block) = self.blocks.remove(&block_idx) {
714            for fragment_idx in block.next_fragment..self.fec_k {
715                let packet_seq = block_idx * self.fec_k as u64 + fragment_idx as u64;
716                match block.fragments[fragment_idx].as_deref() {
717                    Some(fragment) => match parse_plain_packet(fragment) {
718                        Ok(Some(payload)) => out.push(WfbOutput {
719                            packet_seq,
720                            payload: payload.to_vec(),
721                        }),
722                        Ok(None) => {}
723                        Err(_) => {
724                            self.bad_packets += 1;
725                        }
726                    },
727                    None => {
728                        self.lost_packets += 1;
729                    }
730                }
731            }
732            self.next_block = Some(block_idx + 1);
733        }
734    }
735}
736
737/// Parse a decrypted WFB plain packet and return payload bytes when present.
738pub fn parse_plain_packet(fragment: &[u8]) -> Result<Option<&[u8]>, WfbError> {
739    if fragment.len() < WPACKET_HDR_LEN {
740        return Err(WfbError::InvalidPlainPacket);
741    }
742    let flags = fragment[0];
743    let packet_size = u16::from_be_bytes([fragment[1], fragment[2]]) as usize;
744    if packet_size > MAX_PAYLOAD_SIZE || WPACKET_HDR_LEN + packet_size > fragment.len() {
745        return Err(WfbError::PayloadTooLarge);
746    }
747    if flags & WFB_PACKET_FEC_ONLY != 0 {
748        return Ok(None);
749    }
750    Ok(Some(
751        &fragment[WPACKET_HDR_LEN..WPACKET_HDR_LEN + packet_size],
752    ))
753}
754
755#[cfg(test)]
756mod tests {
757    use super::*;
758    use crate::crypto::encrypt_chacha20poly1305_legacy;
759    use crypto_box::aead::Aead;
760
761    fn plain(payload: &[u8]) -> Vec<u8> {
762        let mut out = Vec::new();
763        out.push(0);
764        out.extend_from_slice(&(payload.len() as u16).to_be_bytes());
765        out.extend_from_slice(payload);
766        out
767    }
768
769    fn padded(fragment: &[u8]) -> Vec<u8> {
770        let mut out = vec![0; MAX_FEC_PAYLOAD];
771        out[..fragment.len()].copy_from_slice(fragment);
772        out
773    }
774
775    #[test]
776    fn parses_forwarder_data_packet() {
777        let mut packet = vec![WFB_PACKET_DATA];
778        packet.extend_from_slice(&0x0102_0304_0506_0708u64.to_be_bytes());
779        let encrypted = [9; WPACKET_HDR_LEN + CHACHA20_POLY1305_TAG_LEN];
780        packet.extend_from_slice(&encrypted);
781
782        let parsed = parse_forwarder_packet(&packet).unwrap();
783        match parsed {
784            WfbPacket::Data {
785                data_nonce,
786                encrypted_payload,
787                associated_data,
788            } => {
789                assert_eq!(data_nonce, 0x0102_0304_0506_0708);
790                assert_eq!(encrypted_payload, encrypted);
791                assert_eq!(associated_data.len(), WBLOCK_HDR_LEN);
792            }
793            WfbPacket::SessionKey { .. } => panic!("expected data"),
794        }
795    }
796
797    #[test]
798    fn rejects_data_packets_without_encrypted_plain_header_and_tag() {
799        let mut packet = vec![WFB_PACKET_DATA];
800        packet.extend_from_slice(&0x0102_0304_0506_0708u64.to_be_bytes());
801        packet.extend_from_slice(&[0; WPACKET_HDR_LEN + CHACHA20_POLY1305_TAG_LEN - 1]);
802
803        assert_eq!(
804            parse_forwarder_packet(&packet),
805            Err(WfbError::ShortDataPacket)
806        );
807    }
808
809    #[test]
810    fn emits_primary_fragments_in_order() {
811        let mut assembler = PlainAssembler::new(2, 4).unwrap();
812        let first = assembler
813            .push_decrypted_fragment(0, &plain(b"first"))
814            .unwrap();
815        assert_eq!(first.len(), 1);
816        assert_eq!(first[0].payload, b"first");
817        let out = assembler
818            .push_decrypted_fragment(1, &plain(b"second"))
819            .unwrap();
820        assert_eq!(out.len(), 1);
821        assert_eq!(out[0].payload, b"second");
822    }
823
824    #[test]
825    fn recovers_missing_primary_fragment_from_fec() {
826        let fec = FecCode::new(3, 5).unwrap();
827        let primary = vec![
828            padded(&plain(b"first")),
829            padded(&plain(b"second")),
830            padded(&plain(b"third")),
831        ];
832        let parity = fec.encode(&primary, MAX_FEC_PAYLOAD).unwrap();
833
834        let mut assembler = PlainAssembler::new(3, 5).unwrap();
835        let first = assembler.push_decrypted_fragment(0, &primary[0]).unwrap();
836        assert_eq!(first[0].payload, b"first");
837        assert!(assembler
838            .push_decrypted_fragment(2, &primary[2])
839            .unwrap()
840            .is_empty());
841        let recovered = assembler.push_decrypted_fragment(3, &parity[0]).unwrap();
842        assert_eq!(recovered.len(), 2);
843        assert_eq!(recovered[0].payload, b"second");
844        assert_eq!(recovered[1].payload, b"third");
845        assert_eq!(assembler.recovered_packets, 1);
846    }
847
848    #[test]
849    fn skips_fully_missing_blocks_when_later_block_is_ready() {
850        let mut assembler = PlainAssembler::new(2, 2).unwrap();
851
852        let first = assembler
853            .push_decrypted_fragment(0, &plain(b"b0-f0"))
854            .unwrap();
855        assert_eq!(first[0].payload, b"b0-f0");
856
857        assert!(assembler
858            .push_decrypted_fragment(2 << 8, &plain(b"b2-f0"))
859            .unwrap()
860            .is_empty());
861        let out = assembler
862            .push_decrypted_fragment((2 << 8) | 1, &plain(b"b2-f1"))
863            .unwrap();
864
865        assert_eq!(out.len(), 2);
866        assert_eq!(out[0].payload, b"b2-f0");
867        assert_eq!(out[1].payload, b"b2-f1");
868        assert_eq!(assembler.lost_packets, 3);
869    }
870
871    #[test]
872    fn ignores_late_fragments_from_already_flushed_blocks() {
873        let mut assembler = PlainAssembler::new(2, 2).unwrap();
874
875        assembler
876            .push_decrypted_fragment(0, &plain(b"b0-f0"))
877            .unwrap();
878        assembler
879            .push_decrypted_fragment(2 << 8, &plain(b"b2-f0"))
880            .unwrap();
881        assembler
882            .push_decrypted_fragment((2 << 8) | 1, &plain(b"b2-f1"))
883            .unwrap();
884
885        let late = assembler
886            .push_decrypted_fragment(1 << 8, &plain(b"late-b1-f0"))
887            .unwrap();
888        assert!(late.is_empty());
889    }
890
891    #[test]
892    fn skips_fec_only_plain_packets() {
893        let mut fragment = vec![WFB_PACKET_FEC_ONLY];
894        fragment.extend_from_slice(&4u16.to_be_bytes());
895        fragment.extend_from_slice(b"skip");
896        assert!(parse_plain_packet(&fragment).unwrap().is_none());
897    }
898
899    #[test]
900    fn receiver_decrypts_session_and_data_packet() {
901        let rx_secret = SecretKey::from([1; CRYPTO_BOX_SECRETKEY_LEN]);
902        let tx_secret = SecretKey::from([2; CRYPTO_BOX_SECRETKEY_LEN]);
903        let keypair = WfbKeypair {
904            rx_secretkey: rx_secret.to_bytes(),
905            tx_publickey: *tx_secret.public_key().as_bytes(),
906        };
907        let channel_id = ChannelId::default_video();
908        let session_key = [7; CHACHA20_POLY1305_KEY_LEN];
909
910        let mut session_plain = Vec::new();
911        session_plain.extend_from_slice(&1u64.to_be_bytes());
912        session_plain.extend_from_slice(&channel_id.raw().to_be_bytes());
913        session_plain.push(WFB_FEC_VDM_RS);
914        session_plain.push(1);
915        session_plain.push(1);
916        session_plain.extend_from_slice(&session_key);
917        assert_eq!(session_plain.len(), WSESSION_DATA_LEN);
918        // wfb-ng allows encrypted optional session TLVs after the fixed fields.
919        session_plain.extend_from_slice(&[0x42, 0x00, 0x01, 0x99]);
920
921        let session_nonce = [3; CRYPTO_BOX_NONCE_LEN];
922        let tx_box = SalsaBox::new(&rx_secret.public_key(), &tx_secret);
923        let encrypted_session = tx_box
924            .encrypt(
925                BoxNonce::from_slice(&session_nonce),
926                session_plain.as_slice(),
927            )
928            .unwrap();
929        let mut session_packet = vec![WFB_PACKET_KEY];
930        session_packet.extend_from_slice(&session_nonce);
931        session_packet.extend_from_slice(&encrypted_session);
932
933        let mut receiver = WfbReceiver::new(channel_id, keypair, 0);
934        let session_events = receiver.push_forwarder_packet(&session_packet).unwrap();
935        assert!(matches!(session_events.as_slice(), [WfbEvent::Session(_)]));
936
937        let data_nonce = 0u64;
938        let mut block_header = vec![WFB_PACKET_DATA];
939        block_header.extend_from_slice(&data_nonce.to_be_bytes());
940        let encrypted_data = encrypt_chacha20poly1305_legacy(
941            &session_key,
942            &block_header[1..WBLOCK_HDR_LEN],
943            &block_header,
944            &plain(b"rtp payload"),
945        )
946        .unwrap();
947        let mut data_packet = block_header;
948        data_packet.extend_from_slice(&encrypted_data);
949
950        let payload_events = receiver.push_forwarder_packet(&data_packet).unwrap();
951        match payload_events.as_slice() {
952            [WfbEvent::Payload(payload)] => assert_eq!(payload.payload, b"rtp payload"),
953            other => panic!("unexpected events: {other:?}"),
954        }
955
956        let mut older_session_plain = Vec::new();
957        older_session_plain.extend_from_slice(&0u64.to_be_bytes());
958        older_session_plain.extend_from_slice(&channel_id.raw().to_be_bytes());
959        older_session_plain.push(WFB_FEC_VDM_RS);
960        older_session_plain.push(1);
961        older_session_plain.push(1);
962        older_session_plain.extend_from_slice(&[8; CHACHA20_POLY1305_KEY_LEN]);
963        let older_session_nonce = [4; CRYPTO_BOX_NONCE_LEN];
964        let encrypted_older_session = tx_box
965            .encrypt(
966                BoxNonce::from_slice(&older_session_nonce),
967                older_session_plain.as_slice(),
968            )
969            .unwrap();
970        let mut older_session_packet = vec![WFB_PACKET_KEY];
971        older_session_packet.extend_from_slice(&older_session_nonce);
972        older_session_packet.extend_from_slice(&encrypted_older_session);
973
974        assert_eq!(
975            receiver.push_forwarder_packet(&older_session_packet),
976            Err(WfbError::SessionEpochTooOld {
977                session_epoch: 0,
978                minimum_epoch: 1,
979            })
980        );
981    }
982}