1use rand_core::{OsRng, RngCore};
2
3use crate::channel::{ChannelId, RadioPort};
4use crate::radiotap::TxRadioParams;
5use crate::wfb::WfbError;
6use crate::wfb_tx::{WfbTransmitter, WfbTxKeypair};
7
8const WINDOW_MS: u64 = 1_000;
9const DEFAULT_FEEDBACK_INTERVAL_MS: u64 = 100;
10const DEFAULT_SESSION_INTERVAL_MS: u64 = 1_000;
11const DEFAULT_IDR_REQUEST_MESSAGES: u32 = 20;
12const DEFAULT_VIDEO_START_IDLE_MS: u64 = 1_000;
13
14#[derive(Debug, Clone, PartialEq)]
16pub struct LinkQuality {
17 pub lost_last_second: u32,
19 pub recovered_last_second: u32,
21 pub total_last_second: u32,
23 pub rssi: [i32; 2],
25 pub snr: [i32; 2],
27 pub link_score: [i32; 2],
29 pub idr_code: String,
31}
32
33#[derive(Debug, Clone)]
34struct SignalEntry {
35 at_ms: u64,
36 ant1: i32,
37 ant2: i32,
38}
39
40#[derive(Debug, Clone)]
41struct FecEntry {
42 at_ms: u64,
43 total: u32,
44 recovered: u32,
45 lost: u32,
46}
47
48#[derive(Debug, Clone)]
49struct FecController {
50 enabled: bool,
51 value: i32,
52 last_change_ms: u64,
53}
54
55impl FecController {
56 const fn new() -> Self {
57 Self {
58 enabled: true,
59 value: 0,
60 last_change_ms: 0,
61 }
62 }
63
64 fn value(&mut self, now_ms: u64) -> i32 {
65 if !self.enabled {
66 return 0;
67 }
68 self.decay(now_ms);
69 self.value
70 }
71
72 fn bump(&mut self, now_ms: u64, new_value: i32) {
73 if new_value > self.value {
74 self.value = new_value;
75 self.last_change_ms = now_ms;
76 }
77 }
78
79 fn decay(&mut self, now_ms: u64) {
80 if self.value == 0 {
81 return;
82 }
83 let elapsed = now_ms.saturating_sub(self.last_change_ms);
84 if elapsed < WINDOW_MS {
85 return;
86 }
87 let ticks = (elapsed / WINDOW_MS) as i32;
88 self.value = (self.value - ticks).max(0);
89 self.last_change_ms = self.last_change_ms.saturating_add(ticks as u64 * WINDOW_MS);
90 }
91}
92
93#[derive(Debug, Clone)]
95pub struct AdaptiveLink {
96 rssi: Vec<SignalEntry>,
97 snr: Vec<SignalEntry>,
98 fec: Vec<FecEntry>,
99 fec_controller: FecController,
100 idr_code: Option<String>,
101 idr_remaining_messages: u32,
102 idr_max_messages: u32,
103 last_video_activity_ms: Option<u64>,
104 video_start_idle_ms: u64,
105 ip_packet_id: u16,
106}
107
108impl AdaptiveLink {
109 pub fn new() -> Self {
111 Self {
112 rssi: Vec::new(),
113 snr: Vec::new(),
114 fec: Vec::new(),
115 fec_controller: FecController::new(),
116 idr_code: None,
117 idr_remaining_messages: 0,
118 idr_max_messages: DEFAULT_IDR_REQUEST_MESSAGES,
119 last_video_activity_ms: None,
120 video_start_idle_ms: DEFAULT_VIDEO_START_IDLE_MS,
121 ip_packet_id: 0,
122 }
123 }
124
125 pub fn record_rx_paths(&mut self, now_ms: u64, rssi: [u8; 4], snr: [i8; 4]) {
127 self.record_rx(now_ms, rssi[0], rssi[1], snr[0], snr[1]);
128 }
129
130 pub fn record_rx(&mut self, now_ms: u64, rssi0: u8, rssi1: u8, snr0: i8, snr1: i8) {
132 self.rssi.push(SignalEntry {
133 at_ms: now_ms,
134 ant1: rssi0 as i32,
135 ant2: rssi1 as i32,
136 });
137 self.snr.push(SignalEntry {
138 at_ms: now_ms,
139 ant1: snr0 as i32,
140 ant2: snr1 as i32,
141 });
142 self.cleanup(now_ms);
143 }
144
145 pub fn record_fec(&mut self, now_ms: u64, total: u32, recovered: u32, lost: u32) {
147 if total == 0 && recovered == 0 && lost == 0 {
148 return;
149 }
150 let video_started = self.video_started_after_idle(now_ms);
151 self.last_video_activity_ms = Some(now_ms);
152 if video_started || lost > 0 {
153 self.request_keyframe();
154 }
155 self.fec.push(FecEntry {
156 at_ms: now_ms,
157 total,
158 recovered,
159 lost,
160 });
161 self.cleanup(now_ms);
162 }
163
164 pub fn request_keyframe(&mut self) {
166 if self.idr_max_messages == 0 {
167 self.idr_code = None;
168 self.idr_remaining_messages = 0;
169 return;
170 }
171 self.idr_code = Some(random_idr_code());
172 self.idr_remaining_messages = self.idr_max_messages;
173 }
174
175 pub fn set_keyframe_request_messages(&mut self, messages: u32) {
177 self.idr_max_messages = messages;
178 if self.idr_remaining_messages > messages {
179 self.idr_remaining_messages = messages;
180 }
181 if messages == 0 {
182 self.idr_code = None;
183 self.idr_remaining_messages = 0;
184 }
185 }
186
187 pub fn set_video_start_idle_ms(&mut self, idle_ms: u64) {
189 self.video_start_idle_ms = idle_ms;
190 }
191
192 pub fn quality(&mut self, now_ms: u64) -> LinkQuality {
194 self.cleanup(now_ms);
195 let (avg_rssi0, avg_rssi1) = avg_signal(&self.rssi);
196 let (avg_snr0, avg_snr1) = avg_signal(&self.snr);
197 let (total, recovered, lost) = self.fec.iter().fold((0u32, 0u32, 0u32), |acc, entry| {
198 (
199 acc.0.saturating_add(entry.total),
200 acc.1.saturating_add(entry.recovered),
201 acc.2.saturating_add(entry.lost),
202 )
203 });
204
205 let rssi = [avg_rssi0.round() as i32, avg_rssi1.round() as i32];
206 let snr = [avg_snr0.round() as i32, avg_snr1.round() as i32];
207 let link_score = [
208 link_score(avg_rssi0, avg_snr0),
209 link_score(avg_rssi1, avg_snr1),
210 ];
211
212 LinkQuality {
213 lost_last_second: lost,
214 recovered_last_second: recovered,
215 total_last_second: total,
216 rssi,
217 snr,
218 link_score,
219 idr_code: self
220 .idr_code
221 .clone()
222 .filter(|_| self.idr_remaining_messages > 0)
223 .unwrap_or_default(),
224 }
225 }
226
227 pub fn feedback_udp_payload(&mut self, now_ms: u64) -> Vec<u8> {
229 let quality = self.quality(now_ms);
230 if quality.lost_last_second > 2 {
231 self.fec_controller.bump(now_ms, 5);
232 } else if quality.recovered_last_second > 30 {
233 self.fec_controller.bump(now_ms, 4);
234 } else if quality.recovered_last_second > 24 {
235 self.fec_controller.bump(now_ms, 3);
236 } else if quality.recovered_last_second > 14 {
237 self.fec_controller.bump(now_ms, 2);
238 } else if quality.recovered_last_second > 8 {
239 self.fec_controller.bump(now_ms, 1);
240 }
241
242 let fec_change = self.fec_controller.value(now_ms);
243 let best_link_score = quality.link_score[0].max(quality.link_score[1]);
244 let best_rssi = quality.rssi[0].max(quality.rssi[1]);
245 let best_snr = quality.snr[0].max(quality.snr[1]);
246 let mut message = format!(
247 "{}:{}:{}:{}:{}:{}:{:.6}:0:-1:{}",
248 now_ms / 1000,
249 best_link_score,
250 best_link_score,
251 quality.recovered_last_second,
252 quality.lost_last_second,
253 best_rssi,
254 best_snr as f64,
255 fec_change
256 );
257 let idr_code = (self.idr_remaining_messages > 0)
258 .then(|| self.idr_code.clone())
259 .flatten();
260 if let Some(idr_code) = idr_code {
261 message.push(':');
262 message.push_str(&idr_code);
263 self.idr_remaining_messages = self.idr_remaining_messages.saturating_sub(1);
264 if self.idr_remaining_messages == 0 {
265 self.idr_code = None;
266 }
267 }
268 message.push('\n');
269 let mut udp_payload = Vec::with_capacity(4 + message.len());
270 udp_payload.extend_from_slice(&(message.len() as u32).to_be_bytes());
271 udp_payload.extend_from_slice(message.as_bytes());
272 udp_payload
273 }
274
275 pub fn feedback_ip_packet(&mut self, now_ms: u64) -> Vec<u8> {
277 let packet =
278 wrap_udp_ipv4_payload_with_id(&self.feedback_udp_payload(now_ms), self.ip_packet_id);
279 self.ip_packet_id = self.ip_packet_id.wrapping_add(1);
280 packet
281 }
282
283 fn cleanup(&mut self, now_ms: u64) {
284 let cutoff = now_ms.saturating_sub(WINDOW_MS);
285 self.rssi.retain(|entry| entry.at_ms >= cutoff);
286 self.snr.retain(|entry| entry.at_ms >= cutoff);
287 self.fec.retain(|entry| entry.at_ms >= cutoff);
288 }
289
290 fn video_started_after_idle(&self, now_ms: u64) -> bool {
291 self.last_video_activity_ms
292 .map(|last| now_ms.saturating_sub(last) >= self.video_start_idle_ms)
293 .unwrap_or(true)
294 }
295}
296
297impl Default for AdaptiveLink {
298 fn default() -> Self {
299 Self::new()
300 }
301}
302
303#[derive(Debug, Clone)]
305pub struct AdaptiveLinkSender {
306 link: AdaptiveLink,
307 tx: WfbTransmitter,
308 tx_params: TxRadioParams,
309 feedback_interval_ms: u64,
310 session_interval_ms: u64,
311 last_feedback_ms: Option<u64>,
312 last_session_ms: Option<u64>,
313}
314
315impl AdaptiveLinkSender {
316 pub fn new(
318 link_id: u32,
319 keypair: WfbTxKeypair,
320 epoch: u64,
321 fec_k: usize,
322 fec_n: usize,
323 ) -> Result<Self, WfbError> {
324 let channel_id = ChannelId::from_link_port(link_id, RadioPort::TunnelTx);
325 Ok(Self {
326 link: AdaptiveLink::new(),
327 tx: WfbTransmitter::new(channel_id, keypair, epoch, fec_k, fec_n)?,
328 tx_params: TxRadioParams::openipc_uplink_default(),
329 feedback_interval_ms: DEFAULT_FEEDBACK_INTERVAL_MS,
330 session_interval_ms: DEFAULT_SESSION_INTERVAL_MS,
331 last_feedback_ms: None,
332 last_session_ms: None,
333 })
334 }
335
336 pub fn link(&self) -> &AdaptiveLink {
338 &self.link
339 }
340
341 pub fn link_mut(&mut self) -> &mut AdaptiveLink {
343 &mut self.link
344 }
345
346 pub fn set_tx_params(&mut self, params: TxRadioParams) {
348 self.tx_params = params;
349 }
350
351 pub fn record_rx_paths(&mut self, now_ms: u64, rssi: [u8; 4], snr: [i8; 4]) {
353 self.link.record_rx_paths(now_ms, rssi, snr);
354 }
355
356 pub fn record_fec(&mut self, now_ms: u64, total: u32, recovered: u32, lost: u32) {
358 self.link.record_fec(now_ms, total, recovered, lost);
359 }
360
361 pub fn tick(&mut self, now_ms: u64) -> Result<Vec<Vec<u8>>, WfbError> {
363 let mut out = Vec::new();
364 let send_session = self
365 .last_session_ms
366 .map(|last| now_ms.saturating_sub(last) >= self.session_interval_ms)
367 .unwrap_or(true);
368 if send_session {
369 out.push(self.tx.session_radio_packet(self.tx_params));
370 self.last_session_ms = Some(now_ms);
371 }
372
373 let send_feedback = self
374 .last_feedback_ms
375 .map(|last| now_ms.saturating_sub(last) >= self.feedback_interval_ms)
376 .unwrap_or(true);
377 if send_feedback {
378 let payload = self.link.feedback_ip_packet(now_ms);
379 out.extend(
380 self.tx
381 .radio_packets_for_payload(&payload, self.tx_params)?,
382 );
383 self.last_feedback_ms = Some(now_ms);
384 }
385 Ok(out)
386 }
387}
388
389pub fn wrap_udp_ipv4_payload(udp_payload: &[u8]) -> Vec<u8> {
391 wrap_udp_ipv4_payload_with_id(udp_payload, 0)
392}
393
394pub fn wrap_udp_ipv4_payload_with_id(udp_payload: &[u8], packet_id: u16) -> Vec<u8> {
396 let udp_len = 8 + udp_payload.len();
397 let ip_len = 20 + udp_len;
398 let mut out = Vec::with_capacity(2 + ip_len);
399 out.extend_from_slice(&(ip_len as u16).to_be_bytes());
400 out.push(0x45);
401 out.push(0x00);
402 out.extend_from_slice(&(ip_len as u16).to_be_bytes());
403 out.extend_from_slice(&packet_id.to_be_bytes());
404 out.extend_from_slice(&0u16.to_be_bytes());
405 out.push(64);
406 out.push(17);
407 out.extend_from_slice(&0u16.to_be_bytes());
408 out.extend_from_slice(&[10, 5, 0, 1]);
409 out.extend_from_slice(&[10, 5, 0, 10]);
410
411 let checksum = ipv4_checksum(&out[2..22]);
412 out[12] = (checksum >> 8) as u8;
413 out[13] = checksum as u8;
414
415 out.extend_from_slice(&54321u16.to_be_bytes());
416 out.extend_from_slice(&9999u16.to_be_bytes());
417 out.extend_from_slice(&(udp_len as u16).to_be_bytes());
418 out.extend_from_slice(&0u16.to_be_bytes());
419 out.extend_from_slice(udp_payload);
420 out
421}
422
423fn avg_signal(entries: &[SignalEntry]) -> (f64, f64) {
424 if entries.is_empty() {
425 return (0.0, 0.0);
426 }
427 let (sum0, sum1) = entries.iter().fold((0i64, 0i64), |acc, entry| {
428 (acc.0 + entry.ant1 as i64, acc.1 + entry.ant2 as i64)
429 });
430 let count = entries.len() as f64;
431 (sum0 as f64 / count, sum1 as f64 / count)
432}
433
434fn link_score(rssi: f64, _snr: f64) -> i32 {
435 map_range(rssi, 0.0, 80.0, 1000.0, 2000.0).round() as i32
439}
440
441fn map_range(input: f64, input_min: f64, input_max: f64, output_min: f64, output_max: f64) -> f64 {
442 let clamped = input.clamp(input_min, input_max);
443 output_min + (clamped - input_min) * (output_max - output_min) / (input_max - input_min)
444}
445
446fn ipv4_checksum(header: &[u8]) -> u16 {
447 let mut sum = 0u32;
448 for chunk in header.chunks_exact(2) {
449 sum += u16::from_be_bytes([chunk[0], chunk[1]]) as u32;
450 }
451 while sum >> 16 != 0 {
452 sum = (sum & 0xffff) + (sum >> 16);
453 }
454 !(sum as u16)
455}
456
457fn random_idr_code() -> String {
458 let mut bytes = [0u8; 4];
459 OsRng.fill_bytes(&mut bytes);
460 bytes
461 .iter()
462 .map(|byte| (b'a' + (byte % 26)) as char)
463 .collect()
464}
465
466#[cfg(test)]
467mod tests {
468 use super::*;
469
470 #[test]
471 fn computes_link_quality_and_feedback_payload() {
472 let mut link = AdaptiveLink::new();
473 link.set_keyframe_request_messages(0);
474 link.record_rx(1_000, 80, 70, 35, 25);
475 link.record_fec(1_000, 10, 2, 0);
476 let quality = link.quality(1_050);
477 assert_eq!(quality.rssi, [80, 70]);
478 assert_eq!(quality.snr, [35, 25]);
479 assert_eq!(quality.link_score, [2000, 1875]);
480 assert_eq!(quality.recovered_last_second, 2);
481
482 let payload = link.feedback_udp_payload(1_050);
483 let len = u32::from_be_bytes(payload[0..4].try_into().unwrap()) as usize;
484 assert_eq!(len, payload.len() - 4);
485 let text = std::str::from_utf8(&payload[4..]).unwrap();
486 assert!(text.contains(":2:0:"));
487 assert_eq!(text.trim_end().split(':').count(), 10);
488 assert_eq!(quality.idr_code, "");
489 }
490
491 #[test]
492 fn fec_change_thresholds_match_pixelpilot_defaults() {
493 fn fec_change_for(recovered: u32, lost: u32) -> i32 {
494 let mut link = AdaptiveLink::new();
495 link.set_keyframe_request_messages(0);
496 link.record_rx(1_000, 80, 70, 35, 25);
497 link.record_fec(1_000, recovered + lost, recovered, lost);
498 let payload = link.feedback_udp_payload(1_000);
499 let text = std::str::from_utf8(&payload[4..]).unwrap();
500 text.trim_end()
501 .split(':')
502 .nth(9)
503 .unwrap()
504 .parse::<i32>()
505 .unwrap()
506 }
507
508 assert_eq!(fec_change_for(8, 0), 0);
509 assert_eq!(fec_change_for(9, 0), 1);
510 assert_eq!(fec_change_for(15, 0), 2);
511 assert_eq!(fec_change_for(25, 0), 3);
512 assert_eq!(fec_change_for(31, 0), 4);
513 assert_eq!(fec_change_for(0, 3), 5);
514 }
515
516 #[test]
517 fn keyframe_request_code_is_sent_only_for_active_window() {
518 let mut link = AdaptiveLink::new();
519
520 let no_request = link.feedback_udp_payload(1_000);
521 let text = std::str::from_utf8(&no_request[4..]).unwrap();
522 assert_eq!(text.trim_end().split(':').count(), 10);
523
524 link.record_fec(1_100, 10, 0, 1);
525 let first_request = link.feedback_udp_payload(1_100);
526 let text = std::str::from_utf8(&first_request[4..]).unwrap();
527 let fields: Vec<_> = text.trim_end().split(':').collect();
528 assert_eq!(fields.len(), 11);
529 assert_eq!(fields[10].len(), 4);
530
531 for i in 1..DEFAULT_IDR_REQUEST_MESSAGES {
532 let request = link.feedback_udp_payload(1_100 + i as u64);
533 let text = std::str::from_utf8(&request[4..]).unwrap();
534 assert_eq!(text.trim_end().split(':').count(), 11);
535 }
536
537 let expired = link.feedback_udp_payload(2_000);
538 let text = std::str::from_utf8(&expired[4..]).unwrap();
539 assert_eq!(text.trim_end().split(':').count(), 10);
540 }
541
542 #[test]
543 fn keyframe_request_can_be_disabled() {
544 let mut link = AdaptiveLink::new();
545 link.set_keyframe_request_messages(0);
546 link.record_fec(1_000, 1, 0, 1);
547 assert_eq!(link.quality(1_000).idr_code, "");
548
549 let payload = link.feedback_udp_payload(1_000);
550 let text = std::str::from_utf8(&payload[4..]).unwrap();
551 assert_eq!(text.trim_end().split(':').count(), 10);
552 }
553
554 #[test]
555 fn first_video_after_idle_requests_keyframe() {
556 let mut link = AdaptiveLink::new();
557 link.set_keyframe_request_messages(1);
558
559 link.record_fec(1_000, 10, 0, 0);
560 let payload = link.feedback_udp_payload(1_000);
561 let text = std::str::from_utf8(&payload[4..]).unwrap();
562 assert_eq!(text.trim_end().split(':').count(), 11);
563
564 let expired = link.feedback_udp_payload(1_001);
565 let text = std::str::from_utf8(&expired[4..]).unwrap();
566 assert_eq!(text.trim_end().split(':').count(), 10);
567
568 link.record_fec(1_500, 10, 0, 0);
569 let continuous = link.feedback_udp_payload(1_500);
570 let text = std::str::from_utf8(&continuous[4..]).unwrap();
571 assert_eq!(text.trim_end().split(':').count(), 10);
572
573 link.record_fec(2_500, 10, 0, 0);
574 let restarted = link.feedback_udp_payload(2_500);
575 let text = std::str::from_utf8(&restarted[4..]).unwrap();
576 assert_eq!(text.trim_end().split(':').count(), 11);
577 }
578
579 #[test]
580 fn loss_renews_keyframe_request_during_active_video() {
581 let mut link = AdaptiveLink::new();
582 link.set_keyframe_request_messages(1);
583
584 link.record_fec(1_000, 10, 0, 0);
585 let _ = link.feedback_udp_payload(1_000);
586 link.record_fec(1_100, 10, 0, 1);
587 let payload = link.feedback_udp_payload(1_100);
588 let text = std::str::from_utf8(&payload[4..]).unwrap();
589 assert_eq!(text.trim_end().split(':').count(), 11);
590 }
591
592 #[test]
593 fn wraps_udp_payload_in_length_prefixed_ipv4_packet() {
594 let packet = wrap_udp_ipv4_payload(b"abc");
595 let ip_len = u16::from_be_bytes([packet[0], packet[1]]) as usize;
596 assert_eq!(ip_len, packet.len() - 2);
597 assert_eq!(&packet[2..4], &[0x45, 0x00]);
598 assert_eq!(&packet[14..18], &[10, 5, 0, 1]);
599 assert_eq!(&packet[18..22], &[10, 5, 0, 10]);
600 assert_eq!(u16::from_be_bytes([packet[22], packet[23]]), 54321);
601 assert_eq!(u16::from_be_bytes([packet[24], packet[25]]), 9999);
602 }
603
604 #[test]
605 fn feedback_ip_packet_increments_ipv4_id() {
606 let mut link = AdaptiveLink::new();
607 let first = link.feedback_ip_packet(1_000);
608 let second = link.feedback_ip_packet(1_100);
609 assert_eq!(u16::from_be_bytes([first[6], first[7]]), 0);
610 assert_eq!(u16::from_be_bytes([second[6], second[7]]), 1);
611 }
612}