use crate::channel::ChannelId;
use crate::ieee80211::FrameLayout;
use crate::realtek::{
parse_rx_aggregate, parse_rx_aggregate_with_kind, AggregateError, RealtekRxPacket,
RxDescriptorKind, RxPacketType,
};
use crate::routes::{
PayloadRouteError, PayloadRouteEvent, PayloadRouteId, PayloadRouteManager, PayloadRuntimeKey,
};
use crate::rtp::{
DepacketizedFrame, RtpDepacketizer, RtpDepacketizerStatus, RtpHeader, RtpReorderBuffer,
RtpReorderStatus,
};
use crate::wfb::{FecCounters, WfbKeypair};
/// Receive-side runtime that owns the payload route manager together with the
/// RTP depacketizer state used for the designated video route.
#[derive(Debug, Clone)]
pub struct ReceiverRuntime {
/// Route manager that received frames, fragments and payloads are pushed into.
routes: PayloadRouteManager,
/// Runtime key of the video route; used to look up its FEC counters.
video_runtime: PayloadRuntimeKey,
/// Route id whose payloads are counted and depacketized as video.
video_route_id: PayloadRouteId,
/// Depacketizer that video payloads are fed into.
rtp: RtpDepacketizer,
/// Reorder buffer placed in front of `rtp`; `None` while reordering is disabled.
rtp_reorder: Option<RtpReorderBuffer>,
}
/// Per-call options that control how a push into [`ReceiverRuntime`] is processed.
#[derive(Debug, Clone)]
pub struct ReceiverBatchOptions {
/// When `true`, `push_rx_packets` keeps packets flagged with `crc_err` or
/// `icv_err` instead of dropping them.
pub accept_corrupted: bool,
/// Routes whose recovered payloads are copied verbatim into
/// `ReceiverBatch::raw_payloads`.
pub raw_payload_routes: Vec<PayloadRouteId>,
/// Taps that copy a payload only when it parses as RTP with the tap's
/// payload type and was delivered to the tap's route.
pub rtp_payload_taps: Vec<RtpPayloadTap>,
/// When `true`, payloads of the video route are run through the RTP depacketizer.
pub depacketize_video: bool,
}
impl Default for ReceiverBatchOptions {
fn default() -> Self {
Self {
accept_corrupted: false,
raw_payload_routes: Vec::new(),
rtp_payload_taps: Vec::new(),
depacketize_video: true,
}
}
}
/// Selects RTP packets of one payload type on one route for raw copying.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct RtpPayloadTap {
/// Route the payload must have been delivered to; also the route id the
/// copied payload is tagged with.
pub route_id: PayloadRouteId,
/// Value compared against the `payload_type` of the parsed RTP header.
pub payload_type: u8,
}
/// Owned copy of a recovered payload, tagged with the route it was copied for.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct RoutePayload {
/// Route the copy was made for.
pub route_id: PayloadRouteId,
/// Channel id taken from the recovered payload.
pub channel_id: ChannelId,
/// Packet sequence number taken from the recovered payload.
pub packet_seq: u64,
/// Cloned payload bytes.
pub data: Vec<u8>,
}
/// Counters accumulated while processing a single batch.
#[derive(Debug, Default, Clone, Copy, PartialEq, Eq)]
pub struct ReceiverBatchCounters {
/// Every packet seen by `push_rx_packets`, dropped or not.
pub packets: usize,
/// Packets that passed the CRC/ICV/report checks and were handed to the route manager.
pub accepted_packets: usize,
/// Sum of `crc_dropped`, `icv_dropped` and `report_dropped`.
pub dropped_packets: usize,
/// Packets dropped because `crc_err` was set and corrupted packets are not accepted.
pub crc_dropped: usize,
/// Packets dropped because `icv_err` was set and corrupted packets are not accepted.
pub icv_dropped: usize,
/// Packets dropped because their report type was not `RxPacketType::NormalRx`.
pub report_dropped: usize,
/// `IgnoredFrame` route events, plus frames the route manager rejected with an
/// error in `push_rx_packets`.
pub ignored_frames: usize,
/// `SessionEstablished` route events.
pub sessions: usize,
/// Payload events that were delivered to the video route.
pub wfb_payloads: usize,
/// Incremented together with `wfb_payloads` for every video-route payload.
pub rtp_packets: usize,
/// Frames produced by the RTP depacketizer from video payloads.
pub video_frames: usize,
/// Number of payload copies pushed into `ReceiverBatch::raw_payloads`.
pub raw_payload_count: usize,
/// Total byte length of those payload copies.
pub raw_payload_bytes: usize,
/// Frames for which the route manager returned an error in `push_rx_packets`.
pub route_errors: usize,
}
/// Result of one push into [`ReceiverRuntime`].
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ReceiverBatch {
/// Frames emitted by the RTP depacketizer during this push.
pub frames: Vec<DepacketizedFrame>,
/// Payload copies requested through raw routes and RTP payload taps.
pub raw_payloads: Vec<RoutePayload>,
/// Counters gathered during this push.
pub counters: ReceiverBatchCounters,
/// FEC counters of the video route, sampled after the push.
pub fec_counters: FecCounters,
/// Depacketizer status, sampled after route events were applied.
pub rtp_status: RtpDepacketizerStatus,
/// Reorder buffer status (default value while reordering is disabled).
pub rtp_reorder_status: RtpReorderStatus,
}
impl ReceiverRuntime {
/// Wraps an already configured route manager.
///
/// `video_runtime` and `video_route_id` identify the route treated as video.
/// The runtime starts with a fresh depacketizer and with RTP reordering
/// disabled.
pub fn from_routes(
    routes: PayloadRouteManager,
    video_runtime: PayloadRuntimeKey,
    video_route_id: PayloadRouteId,
) -> Self {
    let rtp = RtpDepacketizer::new();
    Self {
        rtp,
        rtp_reorder: None,
        video_route_id,
        video_runtime,
        routes,
    }
}
/// Creates a runtime whose video route is registered through
/// `PayloadRouteManager::add_plain_route`.
///
/// # Errors
///
/// Returns the `PayloadRouteError` reported by the route manager when the
/// route cannot be added.
pub fn with_plain_video_route(
    frame_layout: FrameLayout,
    video_route_id: PayloadRouteId,
    channel_id: ChannelId,
    key_slot: u64,
    fec_k: usize,
    fec_n: usize,
) -> Result<Self, PayloadRouteError> {
    let mut manager = PayloadRouteManager::new(frame_layout);
    manager
        .add_plain_route(video_route_id, channel_id, key_slot, fec_k, fec_n)
        .map(|runtime_key| Self::from_routes(manager, runtime_key, video_route_id))
}
/// Creates a runtime whose video route is registered through
/// `PayloadRouteManager::add_keyed_route`, forwarding `keypair` and
/// `minimum_epoch` unchanged.
///
/// # Errors
///
/// Returns the `PayloadRouteError` reported by the route manager when the
/// route cannot be added.
pub fn with_keyed_video_route(
    frame_layout: FrameLayout,
    video_route_id: PayloadRouteId,
    channel_id: ChannelId,
    key_slot: u64,
    keypair: WfbKeypair,
    minimum_epoch: u64,
) -> Result<Self, PayloadRouteError> {
    let mut manager = PayloadRouteManager::new(frame_layout);
    manager
        .add_keyed_route(video_route_id, channel_id, key_slot, keypair, minimum_epoch)
        .map(|runtime_key| Self::from_routes(manager, runtime_key, video_route_id))
}
/// Creates a runtime whose video route is registered through
/// `PayloadRouteManager::add_direct_route`; this constructor cannot fail.
pub fn with_direct_video_route(
    frame_layout: FrameLayout,
    video_route_id: PayloadRouteId,
    channel_id: ChannelId,
    key_slot: u64,
) -> Self {
    let mut manager = PayloadRouteManager::new(frame_layout);
    let runtime_key = manager.add_direct_route(video_route_id, channel_id, key_slot);
    Self::from_routes(manager, runtime_key, video_route_id)
}
/// Alias for [`Self::with_direct_video_route`]; forwards all arguments unchanged.
pub fn with_mock_video_route(
frame_layout: FrameLayout,
video_route_id: PayloadRouteId,
channel_id: ChannelId,
key_slot: u64,
) -> Self {
Self::with_direct_video_route(frame_layout, video_route_id, channel_id, key_slot)
}
/// Returns the runtime key stored for the video route.
pub const fn video_runtime(&self) -> PayloadRuntimeKey {
self.video_runtime
}
/// Returns the route id that is treated as the video route.
pub const fn video_route_id(&self) -> PayloadRouteId {
self.video_route_id
}
/// Borrows the underlying route manager.
pub fn routes(&self) -> &PayloadRouteManager {
&self.routes
}
/// Mutably borrows the underlying route manager.
pub fn routes_mut(&mut self) -> &mut PayloadRouteManager {
&mut self.routes
}
/// Mutably borrows the RTP depacketizer used for video payloads.
pub fn rtp_mut(&mut self) -> &mut RtpDepacketizer {
&mut self.rtp
}
/// Returns the status currently reported by the RTP depacketizer.
pub fn rtp_status(&self) -> RtpDepacketizerStatus {
self.rtp.status()
}
pub fn rtp_reorder_status(&self) -> RtpReorderStatus {
self.rtp_reorder
.as_ref()
.map(RtpReorderBuffer::status)
.unwrap_or_default()
}
/// Turns RTP reordering on or off.
///
/// Enabling keeps an existing reorder buffer untouched and only creates a
/// default one when none is present; disabling discards the buffer.
pub fn set_rtp_reorder_enabled(&mut self, enabled: bool) {
    if !enabled {
        self.rtp_reorder = None;
        return;
    }
    if self.rtp_reorder.is_none() {
        self.rtp_reorder = Some(RtpReorderBuffer::default());
    }
}
/// Returns `true` while a reorder buffer is installed.
pub const fn rtp_reorder_enabled(&self) -> bool {
self.rtp_reorder.is_some()
}
/// Feeds one RTP packet through the (optional) reorder buffer and the
/// depacketizer and returns the frames completed by it.
///
/// # Errors
///
/// Propagates the `RtpError` reported while processing the packet; frames
/// gathered before the error are not returned.
pub fn push_rtp_packet(
    &mut self,
    packet: &[u8],
) -> Result<Vec<DepacketizedFrame>, crate::rtp::RtpError> {
    let mut completed = Vec::new();
    self.push_video_payload_into(packet, &mut completed)
        .map(|_| completed)
}
/// Feeds `payload` to the depacketizer (through the reorder buffer when one is
/// installed) and appends every completed frame to `frames`.
///
/// Returns the number of frames appended by this call.
///
/// # Errors
///
/// Returns the reorder buffer's error immediately if it rejects `payload`.
/// When the depacketizer rejects a packet released by the reorder buffer, the
/// remaining released packets are still processed and the first error is
/// returned afterwards; frames appended in the meantime stay in `frames`.
fn push_video_payload_into(
    &mut self,
    payload: &[u8],
    frames: &mut Vec<DepacketizedFrame>,
) -> Result<usize, crate::rtp::RtpError> {
    let before = frames.len();
    let mut first_error: Option<crate::rtp::RtpError> = None;
    if let Some(reorder) = self.rtp_reorder.as_mut() {
        for ordered in reorder.push(payload)? {
            // The reorder buffer has already released these packets, so bailing
            // out on the first failure would silently lose all the later ones.
            match self.rtp.push(&ordered) {
                Ok(Some(frame)) => frames.push(frame),
                Ok(None) => {}
                Err(error) => {
                    if first_error.is_none() {
                        first_error = Some(error);
                    }
                }
            }
        }
    } else if let Some(frame) = self.rtp.push(payload)? {
        frames.push(frame);
    }
    match first_error {
        Some(error) => Err(error),
        None => Ok(frames.len() - before),
    }
}
/// Registers an additional route via `PayloadRouteManager::add_plain_route`
/// and returns the runtime key it reports.
///
/// # Errors
///
/// Returns the `PayloadRouteError` produced by the route manager.
pub fn add_plain_route(
    &mut self,
    route_id: PayloadRouteId,
    channel_id: ChannelId,
    key_slot: u64,
    fec_k: usize,
    fec_n: usize,
) -> Result<PayloadRuntimeKey, PayloadRouteError> {
    let manager = &mut self.routes;
    manager.add_plain_route(route_id, channel_id, key_slot, fec_k, fec_n)
}
/// Registers an additional route via `PayloadRouteManager::add_keyed_route`
/// and returns the runtime key it reports.
///
/// # Errors
///
/// Returns the `PayloadRouteError` produced by the route manager.
pub fn add_keyed_route(
    &mut self,
    route_id: PayloadRouteId,
    channel_id: ChannelId,
    key_slot: u64,
    keypair: WfbKeypair,
    minimum_epoch: u64,
) -> Result<PayloadRuntimeKey, PayloadRouteError> {
    let manager = &mut self.routes;
    manager.add_keyed_route(route_id, channel_id, key_slot, keypair, minimum_epoch)
}
/// Registers an additional route via `PayloadRouteManager::add_direct_route`
/// and returns the runtime key it reports.
pub fn add_direct_route(
    &mut self,
    route_id: PayloadRouteId,
    channel_id: ChannelId,
    key_slot: u64,
) -> PayloadRuntimeKey {
    let manager = &mut self.routes;
    manager.add_direct_route(route_id, channel_id, key_slot)
}
/// Alias for [`Self::add_direct_route`]; forwards all arguments unchanged.
pub fn add_mock_route(
&mut self,
route_id: PayloadRouteId,
channel_id: ChannelId,
key_slot: u64,
) -> PayloadRuntimeKey {
self.add_direct_route(route_id, channel_id, key_slot)
}
/// Returns the FEC counters the route manager reports for the video runtime,
/// falling back to the default counters when it reports none.
pub fn video_fec_counters(&self) -> FecCounters {
    let reported = self.routes.fec_counters(self.video_runtime);
    reported.unwrap_or_default()
}
/// Asks the route manager whether `frame` is accepted by the video runtime.
pub fn accepts_video_frame(&self, frame: &[u8]) -> bool {
self.routes.accepts_80211_frame(self.video_runtime, frame)
}
/// Parses `transfer` with `parse_rx_aggregate` and processes the resulting
/// packets with [`Self::push_rx_packets`].
///
/// # Errors
///
/// Returns the `AggregateError` raised while parsing the transfer.
pub fn push_rx_transfer(
    &mut self,
    transfer: &[u8],
    options: &ReceiverBatchOptions,
) -> Result<ReceiverBatch, AggregateError> {
    let parsed = parse_rx_aggregate(transfer)?;
    let batch = self.push_rx_packets(parsed, options);
    Ok(batch)
}
/// Same as [`Self::push_rx_transfer`], but parses the transfer with an
/// explicitly chosen descriptor layout via `parse_rx_aggregate_with_kind`.
///
/// # Errors
///
/// Returns the `AggregateError` raised while parsing the transfer.
pub fn push_rx_transfer_with_kind(
    &mut self,
    transfer: &[u8],
    descriptor_kind: RxDescriptorKind,
    options: &ReceiverBatchOptions,
) -> Result<ReceiverBatch, AggregateError> {
    let parsed = parse_rx_aggregate_with_kind(transfer, descriptor_kind)?;
    let batch = self.push_rx_packets(parsed, options);
    Ok(batch)
}
/// Filters already parsed receive packets and routes the surviving ones.
///
/// Every packet is counted. Packets flagged `crc_err` or `icv_err` are
/// dropped unless `options.accept_corrupted` is set, and packets whose
/// report type is not `RxPacketType::NormalRx` are always dropped. Accepted
/// packets are pushed into the route manager; a routing error is counted as
/// both an ignored frame and a route error and does not abort the batch.
pub fn push_rx_packets<'a>(
    &mut self,
    packets: impl IntoIterator<Item = RealtekRxPacket<'a>>,
    options: &ReceiverBatchOptions,
) -> ReceiverBatch {
    let reject_corrupted = !options.accept_corrupted;
    let mut batch = self.empty_batch();

    for packet in packets {
        batch.counters.packets += 1;

        // Pick the drop counter (if any) in the same priority order as the
        // checks: CRC first, then ICV, then non-data report packets.
        let drop_counter = if reject_corrupted && packet.attrib.crc_err {
            Some(&mut batch.counters.crc_dropped)
        } else if reject_corrupted && packet.attrib.icv_err {
            Some(&mut batch.counters.icv_dropped)
        } else if packet.attrib.pkt_rpt_type != RxPacketType::NormalRx {
            Some(&mut batch.counters.report_dropped)
        } else {
            None
        };
        if let Some(counter) = drop_counter {
            *counter += 1;
            continue;
        }

        batch.counters.accepted_packets += 1;
        let Ok(events) = self.routes.push_80211_frame(packet.data) else {
            batch.counters.ignored_frames += 1;
            batch.counters.route_errors += 1;
            continue;
        };
        self.apply_route_events(events, options, &mut batch);
    }

    let counters = &mut batch.counters;
    counters.dropped_packets =
        counters.crc_dropped + counters.icv_dropped + counters.report_dropped;
    batch.fec_counters = self.video_fec_counters();
    batch
}
pub fn push_80211_frame(
&mut self,
frame: &[u8],
options: &ReceiverBatchOptions,
) -> Result<ReceiverBatch, PayloadRouteError> {
let mut batch = self.empty_batch();
let events = self.routes.push_80211_frame(frame)?;
self.apply_route_events(events, options, &mut batch);
batch.fec_counters = self.video_fec_counters();
Ok(batch)
}
pub fn push_decrypted_80211_frame(
&mut self,
runtime: PayloadRuntimeKey,
frame: &[u8],
decrypted_fragment: &[u8],
options: &ReceiverBatchOptions,
) -> Result<ReceiverBatch, PayloadRouteError> {
let mut batch = self.empty_batch();
let events = self
.routes
.push_decrypted_80211_frame(runtime, frame, decrypted_fragment)?;
self.apply_route_events(events, options, &mut batch);
batch.fec_counters = self.video_fec_counters();
Ok(batch)
}
pub fn push_decrypted_fragment(
&mut self,
runtime: PayloadRuntimeKey,
data_nonce: u64,
decrypted_fragment: &[u8],
options: &ReceiverBatchOptions,
) -> Result<ReceiverBatch, PayloadRouteError> {
let mut batch = self.empty_batch();
let events =
self.routes
.push_decrypted_fragment(runtime, data_nonce, decrypted_fragment)?;
self.apply_route_events(events, options, &mut batch);
batch.fec_counters = self.video_fec_counters();
Ok(batch)
}
pub fn push_direct_payload(
&mut self,
runtime: PayloadRuntimeKey,
packet_seq: u64,
payload: &[u8],
options: &ReceiverBatchOptions,
) -> Result<ReceiverBatch, PayloadRouteError> {
let mut batch = self.empty_batch();
let events = self
.routes
.push_direct_payload(runtime, packet_seq, payload)?;
self.apply_route_events(events, options, &mut batch);
batch.fec_counters = self.video_fec_counters();
Ok(batch)
}
/// Alias for [`Self::push_direct_payload`]; forwards all arguments unchanged.
pub fn push_mock_payload(
&mut self,
runtime: PayloadRuntimeKey,
packet_seq: u64,
payload: &[u8],
options: &ReceiverBatchOptions,
) -> Result<ReceiverBatch, PayloadRouteError> {
self.push_direct_payload(runtime, packet_seq, payload, options)
}
/// Creates a batch with no frames or payloads and zeroed counters, seeded
/// with the runtime's current FEC, depacketizer and reorder status.
fn empty_batch(&self) -> ReceiverBatch {
    let fec_counters = self.video_fec_counters();
    let rtp_status = self.rtp_status();
    let rtp_reorder_status = self.rtp_reorder_status();
    ReceiverBatch {
        frames: Vec::new(),
        raw_payloads: Vec::new(),
        counters: ReceiverBatchCounters::default(),
        fec_counters,
        rtp_status,
        rtp_reorder_status,
    }
}
/// Folds route events into `batch`.
///
/// * `IgnoredFrame` and `SessionEstablished` only bump their counters.
/// * A `Payload` delivered to the video route is counted and, when
///   `options.depacketize_video` is set, fed to the RTP depacketizer.
/// * The payload is copied into `batch.raw_payloads` once for every delivered
///   route listed in `options.raw_payload_routes`, and once for every RTP tap
///   whose payload type matches the parsed RTP header and whose route
///   received the payload.
///
/// The depacketizer and reorder status in `batch` are refreshed after all
/// events have been applied.
fn apply_route_events(
    &mut self,
    events: Vec<PayloadRouteEvent>,
    options: &ReceiverBatchOptions,
    batch: &mut ReceiverBatch,
) {
    for event in events {
        match event {
            PayloadRouteEvent::IgnoredFrame => batch.counters.ignored_frames += 1,
            PayloadRouteEvent::SessionEstablished { .. } => batch.counters.sessions += 1,
            PayloadRouteEvent::Payload {
                route_ids, payload, ..
            } => {
                if route_ids.contains(&self.video_route_id) {
                    batch.counters.wfb_payloads += 1;
                    batch.counters.rtp_packets += 1;
                    if options.depacketize_video {
                        let before = batch.frames.len();
                        // A depacketizer error is deliberately not fatal for
                        // the batch. Frames may have been appended before the
                        // error surfaced, so count by length instead of by
                        // the `Ok` value to keep the counter in sync with
                        // `batch.frames`.
                        let _ = self.push_video_payload_into(&payload.data, &mut batch.frames);
                        batch.counters.video_frames += batch.frames.len() - before;
                    }
                }

                for &route_id in &route_ids {
                    if options.raw_payload_routes.contains(&route_id) {
                        copy_raw_payload(route_id, &payload, batch);
                    }
                }

                // Only parse the RTP header when at least one tap could use it.
                if !options.rtp_payload_taps.is_empty() {
                    if let Ok(header) = RtpHeader::parse(&payload.data) {
                        for tap in &options.rtp_payload_taps {
                            if header.payload_type == tap.payload_type
                                && route_ids.contains(&tap.route_id)
                            {
                                copy_raw_payload(tap.route_id, &payload, batch);
                            }
                        }
                    }
                }
            }
        }
    }
    batch.rtp_status = self.rtp_status();
    batch.rtp_reorder_status = self.rtp_reorder_status();
}
}
/// Appends a copy of `payload` tagged with `route_id` to `batch.raw_payloads`
/// and updates the raw payload count and byte counters.
fn copy_raw_payload(
    route_id: PayloadRouteId,
    payload: &crate::pipeline::RecoveredPayload,
    batch: &mut ReceiverBatch,
) {
    let data = payload.data.clone();

    let counters = &mut batch.counters;
    counters.raw_payload_count += 1;
    counters.raw_payload_bytes += data.len();

    batch.raw_payloads.push(RoutePayload {
        route_id,
        channel_id: payload.channel_id,
        packet_seq: payload.packet_seq,
        data,
    });
}
#[cfg(test)]
mod tests {
use super::*;
use crate::RadioPort;
/// Builds a test fragment: one zero byte, the payload length as a big-endian
/// `u16`, then the payload bytes.
///
/// # Panics
///
/// Panics when the payload is longer than `u16::MAX` bytes, because the
/// length field could not represent it (the old `as u16` cast silently
/// truncated and produced a wrong header).
fn plain(payload: &[u8]) -> Vec<u8> {
    let declared_len: u16 = std::convert::TryFrom::try_from(payload.len())
        .expect("test payload must fit the 16-bit length field");
    let mut out = Vec::with_capacity(3 + payload.len());
    out.push(0);
    out.extend_from_slice(&declared_len.to_be_bytes());
    out.extend_from_slice(payload);
    out
}
/// Builds a minimal RTP packet for tests: a 12-byte header with first byte
/// `0x80`, the payload type masked to 7 bits, fixed sequence number,
/// timestamp and SSRC values, followed by `payload`.
fn rtp(payload_type: u8, payload: &[u8]) -> Vec<u8> {
    const SEQUENCE_NUMBER: u16 = 7;
    const TIMESTAMP: u32 = 48_000;
    const SSRC: u32 = 0x1122_3344;

    let mut packet = Vec::with_capacity(12 + payload.len());
    packet.extend_from_slice(&[0x80, payload_type & 0x7f]);
    packet.extend_from_slice(&SEQUENCE_NUMBER.to_be_bytes());
    packet.extend_from_slice(&TIMESTAMP.to_be_bytes());
    packet.extend_from_slice(&SSRC.to_be_bytes());
    packet.extend_from_slice(payload);
    packet
}
/// Builds an H.264 RTP packet whose payload starts with byte 24 (the STAP-A
/// aggregation type per RFC 6184) followed by three length-prefixed NAL
/// units, and sets the top bit of the second header byte (the RTP marker bit).
fn h264_stap_a_rtp() -> Vec<u8> {
    let nal_units: [&[u8]; 3] = [
        &[0x67, 0x42, 0x00, 0x1e, 0xab],
        &[0x68, 0xce, 0x06, 0xe2],
        &[0x65, 0x88, 0x84, 0x21],
    ];

    let mut aggregate = vec![24];
    for unit in nal_units {
        // Each unit is preceded by its length as a big-endian u16.
        let size = unit.len() as u16;
        aggregate.extend_from_slice(&size.to_be_bytes());
        aggregate.extend_from_slice(unit);
    }

    let mut packet = rtp(crate::rtp::RTP_PAYLOAD_TYPE_H264, &aggregate);
    packet[1] |= 0x80;
    packet
}
/// A payload recovered on the video route is also copied out raw when that
/// route is listed in `raw_payload_routes`.
#[test]
fn decrypted_fragment_can_fan_out_to_raw_route() {
    let route = PayloadRouteId::new(7);
    let mut runtime = ReceiverRuntime::with_plain_video_route(
        FrameLayout::WithFcs,
        route,
        ChannelId::default_video(),
        0,
        1,
        1,
    )
    .unwrap();

    let options = ReceiverBatchOptions {
        raw_payload_routes: vec![route],
        ..ReceiverBatchOptions::default()
    };
    let fragment = plain(b"payload bytes");
    let video_key = runtime.video_runtime();

    let batch = runtime
        .push_decrypted_fragment(video_key, 0, &fragment, &options)
        .unwrap();

    assert_eq!(batch.counters.wfb_payloads, 1);
    assert_eq!(batch.counters.raw_payload_count, 1);
    assert_eq!(batch.raw_payloads[0].data, b"payload bytes");
}
/// An RTP payload tap ignores packets with a different payload type and
/// copies packets whose payload type matches.
#[test]
fn rtp_payload_tap_copies_only_matching_payload_type() {
    let video_route = PayloadRouteId::new(1);
    let audio_route = PayloadRouteId::new(3);
    let mut runtime = ReceiverRuntime::with_plain_video_route(
        FrameLayout::WithFcs,
        video_route,
        ChannelId::default_video(),
        0,
        1,
        1,
    )
    .unwrap();
    runtime
        .add_plain_route(audio_route, ChannelId::default_video(), 0, 1, 1)
        .unwrap();

    // The same tap configuration is used for both pushes.
    let opus_tap = ReceiverBatchOptions {
        rtp_payload_taps: vec![RtpPayloadTap {
            route_id: audio_route,
            payload_type: crate::rtp::RTP_PAYLOAD_TYPE_OPUS,
        }],
        ..ReceiverBatchOptions::default()
    };
    let video_key = runtime.video_runtime();

    let h264_packet = rtp(crate::rtp::RTP_PAYLOAD_TYPE_H264, b"video");
    let ignored = runtime
        .push_decrypted_fragment(video_key, 0, &plain(&h264_packet), &opus_tap)
        .unwrap();
    assert_eq!(ignored.counters.raw_payload_count, 0);

    let opus_packet = rtp(crate::rtp::RTP_PAYLOAD_TYPE_OPUS, b"opus");
    let tapped = runtime
        .push_decrypted_fragment(video_key, 1 << 8, &plain(&opus_packet), &opus_tap)
        .unwrap();
    assert_eq!(tapped.counters.raw_payload_count, 1);
    assert_eq!(tapped.raw_payloads[0].route_id, audio_route);
    assert_eq!(tapped.raw_payloads[0].data, opus_packet);
}
/// Reordering is off for a new runtime, can be switched on, and reports the
/// default status again once switched off.
#[test]
fn rtp_reorder_is_opt_in() {
    let mut runtime = ReceiverRuntime::with_plain_video_route(
        FrameLayout::WithFcs,
        PayloadRouteId::new(1),
        ChannelId::default_video(),
        0,
        1,
        1,
    )
    .unwrap();
    let disabled_status = RtpReorderStatus::default();

    // Freshly constructed: disabled.
    assert!(!runtime.rtp_reorder_enabled());
    assert_eq!(runtime.rtp_reorder_status(), disabled_status);

    runtime.set_rtp_reorder_enabled(true);
    assert!(runtime.rtp_reorder_enabled());

    runtime.set_rtp_reorder_enabled(false);
    assert!(!runtime.rtp_reorder_enabled());
    assert_eq!(runtime.rtp_reorder_status(), disabled_status);
}
/// A payload recovered on a non-video route is copied raw but does not touch
/// the video payload or RTP packet counters.
#[test]
fn auxiliary_route_does_not_count_as_video_payload() {
    let video_route = PayloadRouteId::new(1);
    let data_route = PayloadRouteId::new(2);
    let mut runtime = ReceiverRuntime::with_plain_video_route(
        FrameLayout::WithFcs,
        video_route,
        ChannelId::default_video(),
        0,
        1,
        1,
    )
    .unwrap();

    let data_channel =
        ChannelId::from_link_port(crate::channel::DEFAULT_LINK_ID, RadioPort::TunnelRx);
    let data_runtime = runtime
        .add_plain_route(data_route, data_channel, 0, 1, 1)
        .unwrap();

    let options = ReceiverBatchOptions {
        raw_payload_routes: vec![data_route],
        ..ReceiverBatchOptions::default()
    };
    let fragment = plain(b"data bytes");
    let batch = runtime
        .push_decrypted_fragment(data_runtime, 0, &fragment, &options)
        .unwrap();

    assert_eq!(batch.counters.wfb_payloads, 0);
    assert_eq!(batch.counters.rtp_packets, 0);
    assert_eq!(batch.counters.raw_payload_count, 1);
    assert_eq!(batch.raw_payloads[0].data, b"data bytes");
}
/// A payload pushed directly into the video runtime is counted, depacketized
/// into one H.264 keyframe and copied raw, while the FEC counters stay at
/// their defaults.
#[test]
fn direct_payload_runtime_uses_same_video_route_and_rtp_depacketizer() {
    let video_route = PayloadRouteId::new(1);
    let mut runtime = ReceiverRuntime::with_direct_video_route(
        FrameLayout::WithFcs,
        video_route,
        ChannelId::default_video(),
        0,
    );
    let packet = h264_stap_a_rtp();
    let options = ReceiverBatchOptions {
        raw_payload_routes: vec![video_route],
        ..ReceiverBatchOptions::default()
    };
    let video_key = runtime.video_runtime();

    let batch = runtime
        .push_direct_payload(video_key, 123, &packet, &options)
        .unwrap();

    let counters = batch.counters;
    assert_eq!(counters.wfb_payloads, 1);
    assert_eq!(counters.rtp_packets, 1);
    assert_eq!(counters.video_frames, 1);
    assert_eq!(batch.frames.len(), 1);
    assert_eq!(batch.frames[0].codec, crate::rtp::Codec::H264);
    assert!(batch.frames[0].is_keyframe);
    assert_eq!(batch.raw_payloads[0].data, packet);
    assert_eq!(batch.fec_counters, FecCounters::default());
}
/// With `depacketize_video` off, the video payload is still counted and
/// copied raw, but no frames are produced and the depacketizer status stays
/// at its default.
#[test]
fn video_depacketization_can_be_delegated_without_losing_raw_rtp() {
    let video_route = PayloadRouteId::new(1);
    let mut runtime = ReceiverRuntime::with_direct_video_route(
        FrameLayout::WithFcs,
        video_route,
        ChannelId::default_video(),
        0,
    );
    let packet = h264_stap_a_rtp();
    let options = ReceiverBatchOptions {
        raw_payload_routes: vec![video_route],
        depacketize_video: false,
        ..ReceiverBatchOptions::default()
    };
    let video_key = runtime.video_runtime();

    let batch = runtime
        .push_direct_payload(video_key, 123, &packet, &options)
        .unwrap();

    let counters = batch.counters;
    assert_eq!(counters.wfb_payloads, 1);
    assert_eq!(counters.rtp_packets, 1);
    assert_eq!(counters.video_frames, 0);
    assert!(batch.frames.is_empty());
    assert_eq!(batch.raw_payloads[0].data, packet);
    assert_eq!(batch.rtp_status, RtpDepacketizerStatus::default());
}
/// A hand-built 32-byte transfer parsed with the Jaguar3 descriptor layout
/// yields one accepted packet whose frame ends up counted as ignored.
#[test]
fn rx_transfer_accepts_explicit_jaguar3_descriptor_layout() {
    let mut runtime = ReceiverRuntime::with_plain_video_route(
        FrameLayout::WithFcs,
        PayloadRouteId::new(1),
        ChannelId::default_video(),
        0,
        1,
        1,
    )
    .unwrap();

    // NOTE(review): presumably a 24-byte descriptor announcing an 8-byte
    // frame, followed by that frame — confirm against the Jaguar3 parser.
    let first_word = 8u32.to_le_bytes();
    let frame_bytes = [0x08, 0, 0, 0, 0, 0, 0, 0];
    let mut transfer = vec![0u8; 32];
    transfer[..4].copy_from_slice(&first_word);
    transfer[24..32].copy_from_slice(&frame_bytes);

    let options = ReceiverBatchOptions::default();
    let batch = runtime
        .push_rx_transfer_with_kind(&transfer, RxDescriptorKind::Jaguar3, &options)
        .unwrap();

    let counters = batch.counters;
    assert_eq!(counters.packets, 1);
    assert_eq!(counters.accepted_packets, 1);
    assert_eq!(counters.ignored_frames, 1);
}
}