use std::collections::{BTreeMap, BTreeSet};
use openipc_core::{
ChannelId, PayloadRouteId, RadioPort, ReceiverBatchOptions, ReceiverRuntime, RtpPayloadTap,
WfbKeypair,
};
use web_time::Instant;
use crate::{
audio::AudioPlayer,
model::AudioStats,
recording::{AudioTrackConfig, RecordedAudioPacket},
settings::{PayloadRouteSettings, RouteAction},
};
use super::{RouteMetricDelta, StartRequest};
pub(super) const VPN_ROUTE_ID: PayloadRouteId = PayloadRouteId::new(u64::MAX);
fn route_id_is_reserved(id: u64) -> bool {
id <= 1 || id == VPN_ROUTE_ID.raw()
}
pub(super) struct RouteLog {
pub(super) warning: bool,
pub(super) message: String,
}
struct ActiveRoute {
settings: PayloadRouteSettings,
audio: Option<AudioPlayer>,
audio_unavailable: Option<AudioStats>,
#[cfg(not(target_arch = "wasm32"))]
udp: Option<std::net::UdpSocket>,
last_log: Option<Instant>,
}
pub(super) struct RouteProcessor {
routes: BTreeMap<u64, ActiveRoute>,
recording_audio_route: Option<u64>,
startup_logs: Vec<RouteLog>,
}
impl RouteProcessor {
pub(super) fn new(request: &StartRequest) -> Result<Self, String> {
let mut routes = BTreeMap::new();
let mut startup_logs = Vec::new();
for settings in request.payload_routes.iter().filter(|route| route.enabled) {
if route_id_is_reserved(settings.id) {
return Err(format!(
"payload route id {} is reserved by the receiver",
settings.id
));
}
if routes.contains_key(&settings.id) {
return Err(format!("duplicate payload route id {}", settings.id));
}
#[cfg(target_arch = "wasm32")]
if settings.action == RouteAction::Udp {
startup_logs.push(RouteLog {
warning: true,
message: format!(
"{} disabled: UDP forwarding is unavailable in browsers",
settings.name
),
});
continue;
}
let (audio, audio_unavailable) = if settings.action == RouteAction::Audio {
match AudioPlayer::new(
settings.sample_rate,
settings.channels,
request.audio_volume,
) {
Ok(player) => (Some(player), None),
Err(error) => {
startup_logs.push(RouteLog {
warning: true,
message: format!("{} audio unavailable: {error}", settings.name),
});
(
None,
Some(AudioStats {
enabled: true,
supported: false,
decoder_name: "Unavailable".to_owned(),
errors: 1,
..AudioStats::default()
}),
)
}
}
} else {
(None, None)
};
#[cfg(not(target_arch = "wasm32"))]
let udp = if settings.action == RouteAction::Udp {
let socket = std::net::UdpSocket::bind("0.0.0.0:0")
.map_err(|error| format!("{} UDP bind failed: {error}", settings.name))?;
socket
.connect((settings.udp_host.as_str(), settings.udp_port))
.map_err(|error| {
format!("{} UDP destination failed: {error}", settings.name)
})?;
Some(socket)
} else {
None
};
routes.insert(
settings.id,
ActiveRoute {
settings: settings.clone(),
audio,
audio_unavailable,
#[cfg(not(target_arch = "wasm32"))]
udp,
last_log: None,
},
);
}
let recording_audio_route = routes
.iter()
.find_map(|(id, route)| (route.settings.action == RouteAction::Audio).then_some(*id));
Ok(Self {
routes,
recording_audio_route,
startup_logs,
})
}
pub(super) fn take_startup_logs(&mut self) -> Vec<RouteLog> {
std::mem::take(&mut self.startup_logs)
}
pub(super) fn set_audio_volume(&mut self, volume: u8) {
for route in self.routes.values_mut() {
if let Some(audio) = route.audio.as_mut() {
audio.set_volume(volume);
}
}
}
pub(super) fn process(
&mut self,
payloads: &[openipc_core::RoutePayload],
capture_audio: bool,
) -> (
Vec<RouteMetricDelta>,
Vec<RouteLog>,
Vec<RecordedAudioPacket>,
) {
let mut updates = BTreeMap::<u64, RouteMetricDelta>::new();
let mut logs = Vec::new();
let mut recorded_audio = Vec::new();
for payload in payloads {
let Some(route) = self.routes.get_mut(&payload.route_id.raw()) else {
continue;
};
let update = updates
.entry(route.settings.id)
.or_insert_with(|| RouteMetricDelta {
route_id: route.settings.id,
..RouteMetricDelta::default()
});
update.packets = update.packets.saturating_add(1);
update.bytes = update.bytes.saturating_add(payload.data.len() as u64);
update.last_bytes = payload.data.len();
let result = match route.settings.action {
RouteAction::Inspect => Ok(()),
RouteAction::Log => {
if log_due(&mut route.last_log) {
logs.push(RouteLog {
warning: false,
message: format!(
"{} seq={} bytes={} preview={}",
route.settings.name,
payload.packet_seq,
payload.data.len(),
hex_preview(&payload.data)
),
});
}
Ok(())
}
RouteAction::Audio => {
if capture_audio && self.recording_audio_route == Some(route.settings.id) {
if let Ok(header) = openipc_core::RtpHeader::parse(&payload.data) {
recorded_audio.push(RecordedAudioPacket {
timestamp: header.timestamp,
data: header.payload(&payload.data).to_vec(),
});
}
}
route
.audio
.as_mut()
.ok_or_else(|| "audio output unavailable".to_owned())
.and_then(|audio| audio.push_rtp(&payload.data))
}
RouteAction::Udp => {
#[cfg(not(target_arch = "wasm32"))]
{
match route.udp.as_ref() {
Some(socket) => socket
.send(&payload.data)
.map(|_| ())
.map_err(|error| format!("UDP send failed: {error}")),
None => Err("UDP socket unavailable".to_owned()),
}
}
#[cfg(target_arch = "wasm32")]
{
Err("UDP forwarding is unavailable in browsers".to_owned())
}
}
};
if let Err(error) = result {
update.errors = update.errors.saturating_add(1);
if let Some(audio) = route.audio.as_mut() {
audio.record_error();
}
if log_due(&mut route.last_log) {
logs.push(RouteLog {
warning: true,
message: format!("{}: {error}", route.settings.name),
});
}
}
}
(updates.into_values().collect(), logs, recorded_audio)
}
pub(super) fn recording_audio_config(&self) -> Option<AudioTrackConfig> {
let id = self.recording_audio_route?;
let route = self.routes.get(&id)?;
Some(AudioTrackConfig {
sample_rate: 48_000,
channels: route.settings.channels.max(1),
})
}
pub(super) fn audio_stats(&self) -> AudioStats {
let mut combined = AudioStats::default();
for route in self.routes.values() {
let stats = route
.audio
.as_ref()
.map(AudioPlayer::stats)
.or_else(|| route.audio_unavailable.clone());
let Some(stats) = stats else {
continue;
};
combined.enabled |= stats.enabled;
combined.supported |= stats.supported;
if combined.decoder_name.is_empty() {
combined.decoder_name = stats.decoder_name;
}
combined.packets = combined.packets.saturating_add(stats.packets);
combined.bytes = combined.bytes.saturating_add(stats.bytes);
combined.decoded_frames = combined.decoded_frames.saturating_add(stats.decoded_frames);
combined.errors = combined.errors.saturating_add(stats.errors);
combined.queued_ms = combined.queued_ms.max(stats.queued_ms);
}
combined
}
}
pub(super) fn configure_receiver(
receiver: &mut ReceiverRuntime,
request: &StartRequest,
) -> Result<ReceiverBatchOptions, String> {
let mut options = ReceiverBatchOptions::default();
let mut ids = BTreeSet::new();
for route in request.payload_routes.iter().filter(|route| route.enabled) {
#[cfg(target_arch = "wasm32")]
if route.action == RouteAction::Udp {
continue;
}
if route_id_is_reserved(route.id) || !ids.insert(route.id) {
return Err(format!(
"invalid or duplicate payload route id {}",
route.id
));
}
let route_id = PayloadRouteId::new(route.id);
let channel_id =
ChannelId::from_link_port(request.channel_id >> 8, RadioPort::Custom(route.radio_port));
let keypair = WfbKeypair::from_bytes(&request.key_bytes)
.map_err(|error| format!("{} key is invalid: {error}", route.name))?;
receiver
.add_keyed_route(route_id, channel_id, 0, keypair, request.minimum_epoch)
.map_err(|error| format!("{} route setup failed: {error}", route.name))?;
if route.action == RouteAction::Audio {
options.rtp_payload_taps.push(RtpPayloadTap {
route_id,
payload_type: route.payload_type.min(127),
});
} else {
options.raw_payload_routes.push(route_id);
}
}
if request.vpn_enabled {
let keypair = WfbKeypair::from_bytes(&request.key_bytes)
.map_err(|error| format!("VPN key is invalid: {error}"))?;
receiver
.add_keyed_route(
VPN_ROUTE_ID,
ChannelId::from_link_port(request.channel_id >> 8, RadioPort::TunnelRx),
0,
keypair,
request.minimum_epoch,
)
.map_err(|error| format!("VPN route setup failed: {error}"))?;
options.raw_payload_routes.push(VPN_ROUTE_ID);
}
Ok(options)
}
#[cfg(debug_assertions)]
pub(super) fn configure_mock_receiver(
receiver: &mut ReceiverRuntime,
request: &StartRequest,
) -> ReceiverBatchOptions {
let mut options = ReceiverBatchOptions::default();
let channel_id = ChannelId::new(request.channel_id);
for route in request.payload_routes.iter().filter(|route| {
route.enabled
&& route.action == RouteAction::Audio
&& route.radio_port == RadioPort::Video.as_u8()
}) {
let route_id = PayloadRouteId::new(route.id);
receiver.add_mock_route(route_id, channel_id, 0);
options.rtp_payload_taps.push(RtpPayloadTap {
route_id,
payload_type: route.payload_type.min(127),
});
}
options
}
fn log_due(last: &mut Option<Instant>) -> bool {
let now = Instant::now();
if last.is_some_and(|previous| now.duration_since(previous).as_secs_f32() < 1.0) {
return false;
}
*last = Some(now);
true
}
fn hex_preview(bytes: &[u8]) -> String {
use std::fmt::Write as _;
let mut preview = String::new();
for byte in bytes.iter().take(16) {
let _ = write!(preview, "{byte:02x}");
}
if bytes.len() > 16 {
preview.push_str("...");
}
preview
}
#[cfg(test)]
mod tests {
use openipc_core::{
ChannelId, FrameLayout, PayloadRouteId, RadioPort, ReceiverRuntime, RoutePayload,
WfbKeypair,
};
use super::{configure_receiver, RouteProcessor};
use crate::{runtime::StartRequest, settings::Settings};
fn request_from_settings(settings: Settings) -> StartRequest {
StartRequest {
#[cfg(target_os = "android")]
video_output: None,
device_id: None,
channel: settings.channel,
channel_width_mhz: settings.channel_width_mhz,
channel_offset: settings.channel_offset,
channel_id: settings.video_channel().raw(),
minimum_epoch: settings.minimum_epoch,
transfer_size: settings.transfer_size,
codec_preference: settings.codec_preference,
rtp_reorder: false,
adaptive_link: false,
tx_power: settings.tx_power,
key_bytes: settings.key_bytes,
audio_volume: settings.audio_volume,
vpn_enabled: settings.vpn_enabled,
payload_routes: settings.payload_routes,
}
}
#[test]
fn default_audio_route_shares_the_video_runtime() {
let settings = Settings::default();
let keypair = WfbKeypair::from_bytes(&settings.key_bytes).unwrap();
let mut receiver = ReceiverRuntime::with_keyed_video_route(
FrameLayout::WithFcs,
PayloadRouteId::new(1),
ChannelId::new(settings.video_channel().raw()),
0,
keypair,
settings.minimum_epoch,
)
.unwrap();
let request = request_from_settings(settings);
let options = configure_receiver(&mut receiver, &request).unwrap();
assert_eq!(receiver.routes().runtime_count(), 2);
assert_eq!(options.raw_payload_routes, [PayloadRouteId::new(2)]);
assert_eq!(options.rtp_payload_taps.len(), 1);
assert_eq!(options.rtp_payload_taps[0].route_id, PayloadRouteId::new(3));
}
#[test]
fn vpn_adds_a_raw_tunnel_route() {
let settings = Settings {
vpn_enabled: true,
..Settings::default()
};
let keypair = WfbKeypair::from_bytes(&settings.key_bytes).unwrap();
let mut receiver = ReceiverRuntime::with_keyed_video_route(
FrameLayout::WithFcs,
PayloadRouteId::new(1),
settings.video_channel(),
0,
keypair,
settings.minimum_epoch,
)
.unwrap();
let request = request_from_settings(settings);
let options = configure_receiver(&mut receiver, &request).unwrap();
assert!(options.raw_payload_routes.contains(&super::VPN_ROUTE_ID));
}
#[test]
fn custom_routes_cannot_use_the_internal_vpn_route_id() {
let mut settings = Settings::default();
settings.payload_routes[0].id = super::VPN_ROUTE_ID.raw();
let request = request_from_settings(settings);
let error = RouteProcessor::new(&request)
.err()
.expect("reserved route id must be rejected");
assert!(error.contains("reserved"));
}
#[test]
fn recording_tap_removes_the_opus_rtp_header() {
let request = request_from_settings(Settings::default());
let mut routes = RouteProcessor::new(&request).unwrap();
let opus = [0xf8, 0xff, 0xfe];
let mut rtp = vec![0x80, 0x80 | openipc_core::rtp::RTP_PAYLOAD_TYPE_OPUS];
rtp.extend_from_slice(&7u16.to_be_bytes());
rtp.extend_from_slice(&48_000u32.to_be_bytes());
rtp.extend_from_slice(&1u32.to_be_bytes());
rtp.extend_from_slice(&opus);
let payload = RoutePayload {
route_id: PayloadRouteId::new(3),
channel_id: ChannelId::from_link_port(request.channel_id >> 8, RadioPort::Video),
packet_seq: 9,
data: rtp,
};
let (_, _, recorded) = routes.process(&[payload], true);
assert_eq!(recorded.len(), 1);
assert_eq!(recorded[0].timestamp, 48_000);
assert_eq!(recorded[0].data, opus);
}
}