use std::{
borrow::Borrow,
collections::HashMap,
sync::Arc,
thread::{self, JoinHandle},
};
use super::{
missing_frames, DefaultDerandomizer, DefaultReedSolomon, Derandomizer, Frame, Integrity,
IntegrityAlgorithm, VCDUHeader,
};
use crate::prelude::*;
use crossbeam::channel::{bounded, unbounded, Receiver};
use tracing::{debug, span, Level};
/// Builder-style configuration for decoding raw input blocks into [`DecodedFrame`]s.
///
/// Every option is unset by default (see the derived `Default`); use the
/// `with_*` methods to enable derandomization, integrity checking, or to size
/// the integrity thread pool.
#[derive(Default)]
pub struct FrameDecoder {
    /// Requested number of threads for the integrity thread pool. When `None`
    /// the pool builder is left at its own default.
    num_threads: Option<u32>,
    /// Derandomizer applied to each block before its header is decoded, if any.
    derandomization: Option<Box<dyn Derandomizer>>,
    /// Integrity algorithm performed on each block whose header decodes, if any.
    integrity: Option<Box<dyn IntegrityAlgorithm>>,
}
impl FrameDecoder {
    /// Capacity of the bounded channel that carries per-frame result receivers
    /// from the producer thread to the returned iterator.
    const DEFAULT_BUFFER_SIZE: usize = 1024;

    /// Creates a decoder with no derandomization, no integrity checking and
    /// the thread pool's default thread count.
    pub fn new() -> Self {
        FrameDecoder::default()
    }

    /// Applies `derandomizer` to every input block before its header is decoded.
    pub fn with_derandomization(mut self, derandomizer: Box<dyn Derandomizer>) -> Self {
        self.derandomization = Some(derandomizer);
        self
    }

    /// Performs `integrity` on every block whose header decodes successfully.
    pub fn with_integrity(mut self, integrity: Box<dyn IntegrityAlgorithm>) -> Self {
        self.integrity = Some(integrity);
        self
    }

    /// Requests `num` threads for the pool that runs the integrity algorithm.
    pub fn with_integrity_threads(mut self, num: u32) -> Self {
        self.num_threads = Some(num);
        self
    }

    /// Decodes `cadus` on a background thread and returns an iterator over the results.
    ///
    /// For each input block the background thread:
    /// 1. derandomizes it when a derandomizer is configured,
    /// 2. decodes the header, skipping the block (with a debug log) when that fails,
    /// 3. hands the block to the thread pool, where the configured integrity
    ///    algorithm is performed, or the block is wrapped unchanged when none is set.
    ///
    /// The returned iterator yields results in input order: one receiver per
    /// frame is queued in order and the iterator waits on each in turn.
    /// `DecodedFrame::missing` is always `0` here; the iterator fills it in.
    ///
    /// Once the returned iterator has been dropped, the background thread
    /// stops consuming `cadus`.
    ///
    /// # Panics
    ///
    /// The background thread panics if the thread pool cannot be built; that
    /// panic is propagated when the iterator joins the thread at end of input.
    pub fn decode<B>(self, cadus: B) -> impl Iterator<Item = Result<DecodedFrame>>
    where
        B: Iterator<Item = Vec<u8>> + Send + 'static,
    {
        let (jobs_tx, jobs_rx) = bounded(Self::DEFAULT_BUFFER_SIZE);

        let handle = thread::spawn(move || {
            let mut builder = rayon::ThreadPoolBuilder::new();
            if let Some(num) = self.num_threads {
                builder = builder.num_threads(num as usize);
            }
            let pool = builder
                .build()
                .expect("failed to construct RS threadpool with requested number of threads");

            // Shared by every pool job; the algorithm itself is never mutated.
            let integrity_alg = Arc::new(self.integrity);

            for (idx, mut block) in cadus.enumerate() {
                if let Some(ref pn) = self.derandomization {
                    block = pn.derandomize(&block).to_vec();
                }
                let Some(hdr) = VCDUHeader::decode(&block) else {
                    debug!(block_idx = idx, "cannot decode header; skipping");
                    continue;
                };

                // Only set up the per-frame channel once we know the block
                // will actually be processed.
                let (future_tx, future_rx) = unbounded();
                let integrity_alg = Arc::clone(&integrity_alg);

                pool.spawn_fifo(move || {
                    // `Arc<T>: Borrow<T>` gives us `&Option<Box<dyn IntegrityAlgorithm>>`.
                    let decoded_frame = if let Some(alg) = integrity_alg.borrow() {
                        alg.perform(&hdr, &block).map(|(status, data)| DecodedFrame {
                            frame: Frame { header: hdr, data },
                            missing: 0,
                            integrity: Some(status),
                        })
                    } else {
                        Ok(DecodedFrame {
                            frame: Frame {
                                header: hdr,
                                data: block,
                            },
                            missing: 0,
                            integrity: None,
                        })
                    };
                    if future_tx.send(decoded_frame).is_err() {
                        debug!(block_idx = idx, "failed to send frame");
                    }
                });

                if let Err(err) = jobs_tx.send(future_rx) {
                    debug!("failed to send frame future: {err}");
                    // The receiving iterator is gone, so no later send can
                    // succeed; stop instead of decoding the rest for nobody.
                    break;
                }
            }
        });

        DecodedFrameIter {
            done: false,
            jobs: jobs_rx,
            handle: Some(handle),
            last: HashMap::new(),
        }
    }
}
/// A frame produced by the decoder together with bookkeeping about how it was decoded.
#[derive(Debug, Clone)]
pub struct DecodedFrame {
    /// The decoded frame (header plus data).
    pub frame: super::Frame,
    /// Number of frames missing before this one, as computed by
    /// `missing_frames` from the previous counter seen on the same VCID.
    /// `0` for fill frames and for the first frame seen on a VCID.
    pub missing: u32,
    /// Status returned by the integrity algorithm, or `None` when no
    /// integrity algorithm was configured.
    pub integrity: Option<Integrity>,
}
/// Iterator returned by `FrameDecoder::decode`; drains decoded frames from the
/// background decoding thread.
struct DecodedFrameIter {
    /// Set once the job channel has disconnected.
    done: bool,
    /// Channel of per-frame receivers; each inner receiver delivers the result
    /// for a single frame.
    jobs: Receiver<Receiver<Result<DecodedFrame>>>,
    /// Join handle of the background decoding thread; taken when it is joined.
    handle: Option<JoinHandle<()>>,
    /// Last frame counter seen for each VCID, used to compute `missing`.
    last: HashMap<super::Vcid, u32>,
}
impl Iterator for DecodedFrameIter {
    type Item = Result<DecodedFrame>;

    /// Returns the next decoding result, blocking until it is available.
    ///
    /// Results arrive in the order their receivers were queued on the job
    /// channel. For each successfully decoded frame, `missing` is computed
    /// from the previous counter seen on the same VCID (always `0` for fill
    /// frames and for the first frame on a VCID).
    ///
    /// Returns `None` once the job channel has disconnected, and keeps
    /// returning `None` on every later call.
    ///
    /// # Panics
    ///
    /// Panics if the background thread panicked, or if a frame's result
    /// sender was dropped without sending a result.
    fn next(&mut self) -> Option<Self::Item> {
        // Stay exhausted after the first `None`: the join handle has already
        // been consumed, so we must not try to receive or join again.
        if self.done {
            return None;
        }

        let Ok(rx) = self.jobs.recv() else {
            self.done = true;
            if let Some(handle) = self.handle.take() {
                handle.join().expect("reedsolomon thread paniced");
            }
            return None;
        };

        let mut decoded_frame = match rx.recv().expect("failed to receive frame future") {
            Ok(decoded_frame) => decoded_frame,
            Err(err) => return Some(Err(err)),
        };

        let header = &decoded_frame.frame.header;
        let span = span!(
            Level::TRACE,
            "frame",
            scid = header.scid,
            vcid = header.vcid
        );
        let _guard = span.enter();

        let vcid = header.vcid;
        let counter = header.counter;

        // Record this counter and fetch the previous one in a single lookup.
        let previous = self.last.insert(vcid, counter);
        decoded_frame.missing = match previous {
            Some(last) if vcid != VCDUHeader::FILL => missing_frames(counter, last),
            // Fill frames and the first frame on a VCID never report gaps.
            _ => 0,
        };

        Some(Ok(decoded_frame))
    }
}
/// Decodes `cadus` into frames using an optional integrity algorithm and an
/// optional derandomizer.
///
/// Convenience wrapper that configures a [`FrameDecoder`] with whichever of
/// `integrity` and `pn` are provided and then calls [`FrameDecoder::decode`].
pub fn decode_frames<I>(
    cadus: I,
    integrity: Option<Box<dyn IntegrityAlgorithm>>,
    pn: Option<Box<dyn Derandomizer>>,
) -> impl Iterator<Item = Result<DecodedFrame>>
where
    I: Iterator<Item = Vec<u8>> + Send + 'static,
{
    let decoder = match pn {
        Some(derandomizer) => FrameDecoder::new().with_derandomization(derandomizer),
        None => FrameDecoder::new(),
    };
    let decoder = match integrity {
        Some(algorithm) => decoder.with_integrity(algorithm),
        None => decoder,
    };
    decoder.decode(cadus)
}
pub fn decode_frames_rs<I>(cadus: I, interleave: u8) -> impl Iterator<Item = Result<DecodedFrame>>
where
I: Iterator<Item = Vec<u8>> + Send + 'static,
{
decode_frames(
cadus,
Some(Box::new(DefaultReedSolomon::new(interleave))),
Some(Box::new(DefaultDerandomizer)),
)
}
#[cfg(test)]
mod tests {
    use crate::framing::MPDU;

    use super::*;

    /// A short buffer decodes, and the frame's data keeps the full input length.
    #[test]
    fn decode_frame() {
        let bytes: Vec<u8> = vec![0x55, 0x61, 0x01, 0xe2, 0x40, 0x05, 0x00, 0x00, 0x00];
        let num_bytes = bytes.len();

        let frame = Frame::decode(bytes).unwrap();

        assert_eq!(frame.data.len(), num_bytes);
    }

    /// A frame padded to 892 bytes with `0xff` decodes to the expected
    /// SCID/VCID, and its MPDU is not fill and carries no header.
    #[test]
    fn test_decode_single_frame() {
        let mut bytes: Vec<u8> = vec![0x67, 0x50, 0x96, 0x30, 0xbc, 0x80, 0x07, 0xff];
        bytes.resize(892, 0xff);
        assert_eq!(bytes.len(), 892);

        let frame = Frame::decode(bytes).unwrap();
        assert_eq!(frame.header.scid, 157);
        assert_eq!(frame.header.vcid, 16);

        let mpdu = frame.mpdu(0, 0).unwrap();
        assert!(!mpdu.is_fill());
        assert!(
            mpdu.first_header == MPDU::NO_HEADER,
            "expected {} got {}",
            MPDU::NO_HEADER,
            mpdu.first_header
        );
        assert!(!mpdu.has_header());
    }
}