1use rand_core::{OsRng, RngCore};
2
3use crate::channel::{ChannelId, RadioPort};
4use crate::radiotap::TxRadioParams;
5use crate::wfb::WfbError;
6use crate::wfb_tx::{WfbTransmitter, WfbTxKeypair};
7
/// Length of the sliding window, in milliseconds, over which signal and FEC
/// samples are kept; also the period after which the FEC level drops by one.
const WINDOW_MS: u64 = 1000;
/// Default spacing between feedback messages emitted by `AdaptiveLinkSender::tick`.
const DEFAULT_FEEDBACK_INTERVAL_MS: u64 = 100;
/// Default spacing between session packets emitted by `AdaptiveLinkSender::tick`.
const DEFAULT_SESSION_INTERVAL_MS: u64 = 1000;
/// Default number of consecutive feedback messages that carry a keyframe
/// (IDR) request code once a request has been raised.
const DEFAULT_IDR_REQUEST_MESSAGES: u32 = 20;
/// Default gap without FEC reports, in milliseconds, after which the next
/// report counts as a video (re)start.
const DEFAULT_VIDEO_START_IDLE_MS: u64 = 1000;
13
/// Snapshot of the link state aggregated over the current sample window,
/// as produced by `AdaptiveLink::quality`.
#[derive(Clone, Debug, PartialEq)]
pub struct LinkQuality {
    /// Sum of the `lost` counters of all FEC reports still in the window.
    pub lost_last_second: u32,
    /// Sum of the `recovered` counters of all FEC reports still in the window.
    pub recovered_last_second: u32,
    /// Sum of the `total` counters of all FEC reports still in the window.
    pub total_last_second: u32,
    /// Rounded average RSSI of the two tracked receive paths.
    pub rssi: [i32; 2],
    /// Rounded average SNR of the two tracked receive paths.
    pub snr: [i32; 2],
    /// Per-path score derived from the average RSSI and SNR.
    pub link_score: [i32; 2],
    /// Pending keyframe request code, or an empty string when none is active.
    pub idr_code: String,
}
24
/// One timestamped signal sample (RSSI or SNR) for the two tracked paths.
#[derive(Clone, Debug)]
struct SignalEntry {
    /// Time the sample was recorded, in milliseconds.
    at_ms: u64,
    /// Value measured on the first path.
    ant1: i32,
    /// Value measured on the second path.
    ant2: i32,
}
31
/// One timestamped FEC report as passed to `AdaptiveLink::record_fec`.
#[derive(Clone, Debug)]
struct FecEntry {
    /// Time the report was recorded, in milliseconds.
    at_ms: u64,
    /// `total` counter of the report.
    total: u32,
    /// `recovered` counter of the report.
    recovered: u32,
    /// `lost` counter of the report.
    lost: u32,
}
39
40#[derive(Debug, Clone)]
41struct FecController {
42 enabled: bool,
43 value: i32,
44 last_change_ms: u64,
45}
46
47impl FecController {
48 const fn new() -> Self {
49 Self {
50 enabled: true,
51 value: 0,
52 last_change_ms: 0,
53 }
54 }
55
56 fn value(&mut self, now_ms: u64) -> i32 {
57 if !self.enabled {
58 return 0;
59 }
60 self.decay(now_ms);
61 self.value
62 }
63
64 fn bump(&mut self, now_ms: u64, new_value: i32) {
65 if new_value > self.value {
66 self.value = new_value;
67 self.last_change_ms = now_ms;
68 }
69 }
70
71 fn decay(&mut self, now_ms: u64) {
72 if self.value == 0 {
73 return;
74 }
75 let elapsed = now_ms.saturating_sub(self.last_change_ms);
76 if elapsed < WINDOW_MS {
77 return;
78 }
79 let ticks = (elapsed / WINDOW_MS) as i32;
80 self.value = (self.value - ticks).max(0);
81 self.last_change_ms = self.last_change_ms.saturating_add(ticks as u64 * WINDOW_MS);
82 }
83}
84
85#[derive(Debug, Clone)]
86pub struct AdaptiveLink {
87 rssi: Vec<SignalEntry>,
88 snr: Vec<SignalEntry>,
89 fec: Vec<FecEntry>,
90 fec_controller: FecController,
91 idr_code: Option<String>,
92 idr_remaining_messages: u32,
93 idr_max_messages: u32,
94 last_video_activity_ms: Option<u64>,
95 video_start_idle_ms: u64,
96 ip_packet_id: u16,
97}
98
99impl AdaptiveLink {
100 pub fn new() -> Self {
101 Self {
102 rssi: Vec::new(),
103 snr: Vec::new(),
104 fec: Vec::new(),
105 fec_controller: FecController::new(),
106 idr_code: None,
107 idr_remaining_messages: 0,
108 idr_max_messages: DEFAULT_IDR_REQUEST_MESSAGES,
109 last_video_activity_ms: None,
110 video_start_idle_ms: DEFAULT_VIDEO_START_IDLE_MS,
111 ip_packet_id: 0,
112 }
113 }
114
115 pub fn record_rx_paths(&mut self, now_ms: u64, rssi: [u8; 4], snr: [i8; 4]) {
116 self.record_rx(now_ms, rssi[0], rssi[1], snr[0], snr[1]);
117 }
118
119 pub fn record_rx(&mut self, now_ms: u64, rssi0: u8, rssi1: u8, snr0: i8, snr1: i8) {
120 self.rssi.push(SignalEntry {
121 at_ms: now_ms,
122 ant1: rssi0 as i32,
123 ant2: rssi1 as i32,
124 });
125 self.snr.push(SignalEntry {
126 at_ms: now_ms,
127 ant1: snr0 as i32,
128 ant2: snr1 as i32,
129 });
130 self.cleanup(now_ms);
131 }
132
133 pub fn record_fec(&mut self, now_ms: u64, total: u32, recovered: u32, lost: u32) {
134 if total == 0 && recovered == 0 && lost == 0 {
135 return;
136 }
137 let video_started = self.video_started_after_idle(now_ms);
138 self.last_video_activity_ms = Some(now_ms);
139 if video_started || lost > 0 {
140 self.request_keyframe();
141 }
142 self.fec.push(FecEntry {
143 at_ms: now_ms,
144 total,
145 recovered,
146 lost,
147 });
148 self.cleanup(now_ms);
149 }
150
151 pub fn request_keyframe(&mut self) {
152 if self.idr_max_messages == 0 {
153 self.idr_code = None;
154 self.idr_remaining_messages = 0;
155 return;
156 }
157 self.idr_code = Some(random_idr_code());
158 self.idr_remaining_messages = self.idr_max_messages;
159 }
160
161 pub fn set_keyframe_request_messages(&mut self, messages: u32) {
162 self.idr_max_messages = messages;
163 if self.idr_remaining_messages > messages {
164 self.idr_remaining_messages = messages;
165 }
166 if messages == 0 {
167 self.idr_code = None;
168 self.idr_remaining_messages = 0;
169 }
170 }
171
172 pub fn set_video_start_idle_ms(&mut self, idle_ms: u64) {
173 self.video_start_idle_ms = idle_ms;
174 }
175
176 pub fn quality(&mut self, now_ms: u64) -> LinkQuality {
177 self.cleanup(now_ms);
178 let (avg_rssi0, avg_rssi1) = avg_signal(&self.rssi);
179 let (avg_snr0, avg_snr1) = avg_signal(&self.snr);
180 let (total, recovered, lost) = self.fec.iter().fold((0u32, 0u32, 0u32), |acc, entry| {
181 (
182 acc.0.saturating_add(entry.total),
183 acc.1.saturating_add(entry.recovered),
184 acc.2.saturating_add(entry.lost),
185 )
186 });
187
188 let rssi = [avg_rssi0.round() as i32, avg_rssi1.round() as i32];
189 let snr = [avg_snr0.round() as i32, avg_snr1.round() as i32];
190 let link_score = [
191 link_score(avg_rssi0, avg_snr0),
192 link_score(avg_rssi1, avg_snr1),
193 ];
194
195 LinkQuality {
196 lost_last_second: lost,
197 recovered_last_second: recovered,
198 total_last_second: total,
199 rssi,
200 snr,
201 link_score,
202 idr_code: self
203 .idr_code
204 .clone()
205 .filter(|_| self.idr_remaining_messages > 0)
206 .unwrap_or_default(),
207 }
208 }
209
210 pub fn feedback_udp_payload(&mut self, now_ms: u64) -> Vec<u8> {
211 let quality = self.quality(now_ms);
212 if quality.lost_last_second > 2 || quality.recovered_last_second > 30 {
213 self.fec_controller.bump(now_ms, 5);
214 } else if quality.recovered_last_second > 24 {
215 self.fec_controller.bump(now_ms, 3);
216 } else if quality.recovered_last_second > 22 {
217 self.fec_controller.bump(now_ms, 2);
218 } else if quality.recovered_last_second > 18 {
219 self.fec_controller.bump(now_ms, 1);
220 } else if quality.recovered_last_second < 18 {
221 self.fec_controller.bump(now_ms, 0);
222 }
223
224 let fec_change = self.fec_controller.value(now_ms);
225 let best_link_score = quality.link_score[0].max(quality.link_score[1]);
226 let best_rssi = quality.rssi[0].max(quality.rssi[1]);
227 let best_snr = quality.snr[0].max(quality.snr[1]);
228 let mut message = format!(
229 "{}:{}:{}:{}:{}:{}:{:.6}:0:-1:{}",
230 now_ms / 1000,
231 best_link_score,
232 best_link_score,
233 quality.recovered_last_second,
234 quality.lost_last_second,
235 best_rssi,
236 best_snr as f64,
237 fec_change
238 );
239 let idr_code = (self.idr_remaining_messages > 0)
240 .then(|| self.idr_code.clone())
241 .flatten();
242 if let Some(idr_code) = idr_code {
243 message.push(':');
244 message.push_str(&idr_code);
245 self.idr_remaining_messages = self.idr_remaining_messages.saturating_sub(1);
246 if self.idr_remaining_messages == 0 {
247 self.idr_code = None;
248 }
249 }
250 message.push('\n');
251 let mut udp_payload = Vec::with_capacity(4 + message.len());
252 udp_payload.extend_from_slice(&(message.len() as u32).to_be_bytes());
253 udp_payload.extend_from_slice(message.as_bytes());
254 udp_payload
255 }
256
257 pub fn feedback_ip_packet(&mut self, now_ms: u64) -> Vec<u8> {
258 let packet =
259 wrap_udp_ipv4_payload_with_id(&self.feedback_udp_payload(now_ms), self.ip_packet_id);
260 self.ip_packet_id = self.ip_packet_id.wrapping_add(1);
261 packet
262 }
263
264 fn cleanup(&mut self, now_ms: u64) {
265 let cutoff = now_ms.saturating_sub(WINDOW_MS);
266 self.rssi.retain(|entry| entry.at_ms >= cutoff);
267 self.snr.retain(|entry| entry.at_ms >= cutoff);
268 self.fec.retain(|entry| entry.at_ms >= cutoff);
269 }
270
271 fn video_started_after_idle(&self, now_ms: u64) -> bool {
272 self.last_video_activity_ms
273 .map(|last| now_ms.saturating_sub(last) >= self.video_start_idle_ms)
274 .unwrap_or(true)
275 }
276}
277
278impl Default for AdaptiveLink {
279 fn default() -> Self {
280 Self::new()
281 }
282}
283
284#[derive(Debug, Clone)]
285pub struct AdaptiveLinkSender {
286 link: AdaptiveLink,
287 tx: WfbTransmitter,
288 tx_params: TxRadioParams,
289 feedback_interval_ms: u64,
290 session_interval_ms: u64,
291 last_feedback_ms: Option<u64>,
292 last_session_ms: Option<u64>,
293}
294
295impl AdaptiveLinkSender {
296 pub fn new(
297 link_id: u32,
298 keypair: WfbTxKeypair,
299 epoch: u64,
300 fec_k: usize,
301 fec_n: usize,
302 ) -> Result<Self, WfbError> {
303 let channel_id = ChannelId::from_link_port(link_id, RadioPort::MavlinkTx);
304 Ok(Self {
305 link: AdaptiveLink::new(),
306 tx: WfbTransmitter::new(channel_id, keypair, epoch, fec_k, fec_n)?,
307 tx_params: TxRadioParams::openipc_uplink_default(),
308 feedback_interval_ms: DEFAULT_FEEDBACK_INTERVAL_MS,
309 session_interval_ms: DEFAULT_SESSION_INTERVAL_MS,
310 last_feedback_ms: None,
311 last_session_ms: None,
312 })
313 }
314
315 pub fn link(&self) -> &AdaptiveLink {
316 &self.link
317 }
318
319 pub fn link_mut(&mut self) -> &mut AdaptiveLink {
320 &mut self.link
321 }
322
323 pub fn set_tx_params(&mut self, params: TxRadioParams) {
324 self.tx_params = params;
325 }
326
327 pub fn record_rx_paths(&mut self, now_ms: u64, rssi: [u8; 4], snr: [i8; 4]) {
328 self.link.record_rx_paths(now_ms, rssi, snr);
329 }
330
331 pub fn record_fec(&mut self, now_ms: u64, total: u32, recovered: u32, lost: u32) {
332 self.link.record_fec(now_ms, total, recovered, lost);
333 }
334
335 pub fn tick(&mut self, now_ms: u64) -> Result<Vec<Vec<u8>>, WfbError> {
336 let mut out = Vec::new();
337 let send_session = self
338 .last_session_ms
339 .map(|last| now_ms.saturating_sub(last) >= self.session_interval_ms)
340 .unwrap_or(true);
341 if send_session {
342 out.push(self.tx.session_radio_packet(self.tx_params));
343 self.last_session_ms = Some(now_ms);
344 }
345
346 let send_feedback = self
347 .last_feedback_ms
348 .map(|last| now_ms.saturating_sub(last) >= self.feedback_interval_ms)
349 .unwrap_or(true);
350 if send_feedback {
351 let payload = self.link.feedback_ip_packet(now_ms);
352 out.extend(
353 self.tx
354 .radio_packets_for_payload(&payload, self.tx_params)?,
355 );
356 self.last_feedback_ms = Some(now_ms);
357 }
358 Ok(out)
359 }
360}
361
362pub fn wrap_udp_ipv4_payload(udp_payload: &[u8]) -> Vec<u8> {
363 wrap_udp_ipv4_payload_with_id(udp_payload, 0)
364}
365
366pub fn wrap_udp_ipv4_payload_with_id(udp_payload: &[u8], packet_id: u16) -> Vec<u8> {
367 let udp_len = 8 + udp_payload.len();
368 let ip_len = 20 + udp_len;
369 let mut out = Vec::with_capacity(2 + ip_len);
370 out.extend_from_slice(&(ip_len as u16).to_be_bytes());
371 out.push(0x45);
372 out.push(0x00);
373 out.extend_from_slice(&(ip_len as u16).to_be_bytes());
374 out.extend_from_slice(&packet_id.to_be_bytes());
375 out.extend_from_slice(&0u16.to_be_bytes());
376 out.push(64);
377 out.push(17);
378 out.extend_from_slice(&0u16.to_be_bytes());
379 out.extend_from_slice(&[10, 5, 0, 1]);
380 out.extend_from_slice(&[10, 5, 0, 10]);
381
382 let checksum = ipv4_checksum(&out[2..22]);
383 out[12] = (checksum >> 8) as u8;
384 out[13] = checksum as u8;
385
386 out.extend_from_slice(&54321u16.to_be_bytes());
387 out.extend_from_slice(&9999u16.to_be_bytes());
388 out.extend_from_slice(&(udp_len as u16).to_be_bytes());
389 out.extend_from_slice(&0u16.to_be_bytes());
390 out.extend_from_slice(udp_payload);
391 out
392}
393
394fn avg_signal(entries: &[SignalEntry]) -> (f64, f64) {
395 if entries.is_empty() {
396 return (0.0, 0.0);
397 }
398 let (sum0, sum1) = entries.iter().fold((0i64, 0i64), |acc, entry| {
399 (acc.0 + entry.ant1 as i64, acc.1 + entry.ant2 as i64)
400 });
401 let count = entries.len() as f64;
402 (sum0 as f64 / count, sum1 as f64 / count)
403}
404
405fn link_score(rssi: f64, snr: f64) -> i32 {
406 (0.5 * map_range(rssi, 50.0, 110.0, 1000.0, 2000.0)
407 + 0.5 * map_range(snr, 20.0, 50.0, 1000.0, 2000.0))
408 .round() as i32
409}
410
/// Linearly maps `input` from `[input_min, input_max]` onto
/// `[output_min, output_max]`, clamping `input` to the input range first.
fn map_range(input: f64, input_min: f64, input_max: f64, output_min: f64, output_max: f64) -> f64 {
    let offset = input.clamp(input_min, input_max) - input_min;
    let output_span = output_max - output_min;
    let input_span = input_max - input_min;
    // Multiply before dividing, matching the established evaluation order.
    output_min + offset * output_span / input_span
}
415
/// Computes the one's-complement checksum of `header`, read as big-endian
/// 16-bit words.
///
/// The checksum field inside `header` must be zero when computing a fresh
/// checksum. A trailing odd byte, if any, is not included in the sum.
fn ipv4_checksum(header: &[u8]) -> u16 {
    let mut acc: u32 = header
        .chunks_exact(2)
        .map(|word| u32::from(u16::from_be_bytes([word[0], word[1]])))
        .sum();
    // Fold the carries back into the low 16 bits until none remain.
    while acc > 0xffff {
        acc = (acc >> 16) + (acc & 0xffff);
    }
    !(acc as u16)
}
426
427fn random_idr_code() -> String {
428 let mut bytes = [0u8; 4];
429 OsRng.fill_bytes(&mut bytes);
430 bytes
431 .iter()
432 .map(|byte| (b'a' + (byte % 26)) as char)
433 .collect()
434}
435
#[cfg(test)]
mod tests {
    use super::*;

    /// Returns the text of a feedback payload, skipping the 4-byte length prefix.
    fn message_text(payload: &[u8]) -> &str {
        std::str::from_utf8(&payload[4..]).unwrap()
    }

    /// Counts the colon-separated fields of a feedback payload's message.
    fn field_count(payload: &[u8]) -> usize {
        message_text(payload).trim_end().split(':').count()
    }

    /// Reads a big-endian `u16` starting at `offset`.
    fn be_u16(bytes: &[u8], offset: usize) -> u16 {
        u16::from_be_bytes([bytes[offset], bytes[offset + 1]])
    }

    #[test]
    fn computes_link_quality_and_feedback_payload() {
        let mut link = AdaptiveLink::new();
        link.set_keyframe_request_messages(0);
        link.record_rx(1_000, 80, 70, 35, 25);
        link.record_fec(1_000, 10, 2, 0);

        let quality = link.quality(1_050);
        assert_eq!(quality.rssi, [80, 70]);
        assert_eq!(quality.snr, [35, 25]);
        assert_eq!(quality.recovered_last_second, 2);

        let payload = link.feedback_udp_payload(1_050);
        let declared_len =
            u32::from_be_bytes([payload[0], payload[1], payload[2], payload[3]]) as usize;
        assert_eq!(declared_len, payload.len() - 4);
        assert!(message_text(&payload).contains(":2:0:"));
        assert_eq!(field_count(&payload), 10);
        assert_eq!(quality.idr_code, "");
    }

    #[test]
    fn keyframe_request_code_is_sent_only_for_active_window() {
        let mut link = AdaptiveLink::new();

        let no_request = link.feedback_udp_payload(1_000);
        assert_eq!(field_count(&no_request), 10);

        link.record_fec(1_100, 10, 0, 1);
        let first_request = link.feedback_udp_payload(1_100);
        let fields: Vec<&str> = message_text(&first_request)
            .trim_end()
            .split(':')
            .collect();
        assert_eq!(fields.len(), 11);
        assert_eq!(fields[10].len(), 4);

        // The first message was already sent above; the rest of the budget follows.
        for offset in 1..u64::from(DEFAULT_IDR_REQUEST_MESSAGES) {
            let request = link.feedback_udp_payload(1_100 + offset);
            assert_eq!(field_count(&request), 11);
        }

        let expired = link.feedback_udp_payload(2_000);
        assert_eq!(field_count(&expired), 10);
    }

    #[test]
    fn keyframe_request_can_be_disabled() {
        let mut link = AdaptiveLink::new();
        link.set_keyframe_request_messages(0);
        link.record_fec(1_000, 1, 0, 1);
        assert_eq!(link.quality(1_000).idr_code, "");

        let payload = link.feedback_udp_payload(1_000);
        assert_eq!(field_count(&payload), 10);
    }

    #[test]
    fn first_video_after_idle_requests_keyframe() {
        let mut link = AdaptiveLink::new();
        link.set_keyframe_request_messages(1);

        // Very first report: counts as a start.
        link.record_fec(1_000, 10, 0, 0);
        let started = link.feedback_udp_payload(1_000);
        assert_eq!(field_count(&started), 11);

        // Budget of one message is used up.
        let expired = link.feedback_udp_payload(1_001);
        assert_eq!(field_count(&expired), 10);

        // Report within the idle gap: no new request.
        link.record_fec(1_500, 10, 0, 0);
        let continuous = link.feedback_udp_payload(1_500);
        assert_eq!(field_count(&continuous), 10);

        // Report after the idle gap: treated as a restart.
        link.record_fec(2_500, 10, 0, 0);
        let restarted = link.feedback_udp_payload(2_500);
        assert_eq!(field_count(&restarted), 11);
    }

    #[test]
    fn loss_renews_keyframe_request_during_active_video() {
        let mut link = AdaptiveLink::new();
        link.set_keyframe_request_messages(1);

        link.record_fec(1_000, 10, 0, 0);
        let _ = link.feedback_udp_payload(1_000);
        link.record_fec(1_100, 10, 0, 1);
        let payload = link.feedback_udp_payload(1_100);
        assert_eq!(field_count(&payload), 11);
    }

    #[test]
    fn wraps_udp_payload_in_length_prefixed_ipv4_packet() {
        let packet = wrap_udp_ipv4_payload(b"abc");
        let ip_len = usize::from(be_u16(&packet, 0));
        assert_eq!(ip_len, packet.len() - 2);
        assert_eq!(&packet[2..4], &[0x45, 0x00]);
        assert_eq!(&packet[14..18], &[10, 5, 0, 1]);
        assert_eq!(&packet[18..22], &[10, 5, 0, 10]);
        assert_eq!(be_u16(&packet, 22), 54321);
        assert_eq!(be_u16(&packet, 24), 9999);
    }

    #[test]
    fn feedback_ip_packet_increments_ipv4_id() {
        let mut link = AdaptiveLink::new();
        let first = link.feedback_ip_packet(1_000);
        let second = link.feedback_ip_packet(1_100);
        assert_eq!(be_u16(&first, 6), 0);
        assert_eq!(be_u16(&second, 6), 1);
    }
}