Skip to main content

openipc_core/
adaptive.rs

1use rand_core::{OsRng, RngCore};
2
3use crate::channel::{ChannelId, RadioPort};
4use crate::radiotap::TxRadioParams;
5use crate::wfb::WfbError;
6use crate::wfb_tx::{WfbTransmitter, WfbTxKeypair};
7
8const WINDOW_MS: u64 = 1_000;
9const DEFAULT_FEEDBACK_INTERVAL_MS: u64 = 100;
10const DEFAULT_SESSION_INTERVAL_MS: u64 = 1_000;
11const DEFAULT_IDR_REQUEST_MESSAGES: u32 = 20;
12const DEFAULT_VIDEO_START_IDLE_MS: u64 = 1_000;
13
14#[derive(Debug, Clone, PartialEq)]
15pub struct LinkQuality {
16    pub lost_last_second: u32,
17    pub recovered_last_second: u32,
18    pub total_last_second: u32,
19    pub rssi: [i32; 2],
20    pub snr: [i32; 2],
21    pub link_score: [i32; 2],
22    pub idr_code: String,
23}
24
25#[derive(Debug, Clone)]
26struct SignalEntry {
27    at_ms: u64,
28    ant1: i32,
29    ant2: i32,
30}
31
32#[derive(Debug, Clone)]
33struct FecEntry {
34    at_ms: u64,
35    total: u32,
36    recovered: u32,
37    lost: u32,
38}
39
40#[derive(Debug, Clone)]
41struct FecController {
42    enabled: bool,
43    value: i32,
44    last_change_ms: u64,
45}
46
47impl FecController {
48    const fn new() -> Self {
49        Self {
50            enabled: true,
51            value: 0,
52            last_change_ms: 0,
53        }
54    }
55
56    fn value(&mut self, now_ms: u64) -> i32 {
57        if !self.enabled {
58            return 0;
59        }
60        self.decay(now_ms);
61        self.value
62    }
63
64    fn bump(&mut self, now_ms: u64, new_value: i32) {
65        if new_value > self.value {
66            self.value = new_value;
67            self.last_change_ms = now_ms;
68        }
69    }
70
71    fn decay(&mut self, now_ms: u64) {
72        if self.value == 0 {
73            return;
74        }
75        let elapsed = now_ms.saturating_sub(self.last_change_ms);
76        if elapsed < WINDOW_MS {
77            return;
78        }
79        let ticks = (elapsed / WINDOW_MS) as i32;
80        self.value = (self.value - ticks).max(0);
81        self.last_change_ms = self.last_change_ms.saturating_add(ticks as u64 * WINDOW_MS);
82    }
83}
84
85#[derive(Debug, Clone)]
86pub struct AdaptiveLink {
87    rssi: Vec<SignalEntry>,
88    snr: Vec<SignalEntry>,
89    fec: Vec<FecEntry>,
90    fec_controller: FecController,
91    idr_code: Option<String>,
92    idr_remaining_messages: u32,
93    idr_max_messages: u32,
94    last_video_activity_ms: Option<u64>,
95    video_start_idle_ms: u64,
96    ip_packet_id: u16,
97}
98
99impl AdaptiveLink {
100    pub fn new() -> Self {
101        Self {
102            rssi: Vec::new(),
103            snr: Vec::new(),
104            fec: Vec::new(),
105            fec_controller: FecController::new(),
106            idr_code: None,
107            idr_remaining_messages: 0,
108            idr_max_messages: DEFAULT_IDR_REQUEST_MESSAGES,
109            last_video_activity_ms: None,
110            video_start_idle_ms: DEFAULT_VIDEO_START_IDLE_MS,
111            ip_packet_id: 0,
112        }
113    }
114
115    pub fn record_rx_paths(&mut self, now_ms: u64, rssi: [u8; 4], snr: [i8; 4]) {
116        self.record_rx(now_ms, rssi[0], rssi[1], snr[0], snr[1]);
117    }
118
119    pub fn record_rx(&mut self, now_ms: u64, rssi0: u8, rssi1: u8, snr0: i8, snr1: i8) {
120        self.rssi.push(SignalEntry {
121            at_ms: now_ms,
122            ant1: rssi0 as i32,
123            ant2: rssi1 as i32,
124        });
125        self.snr.push(SignalEntry {
126            at_ms: now_ms,
127            ant1: snr0 as i32,
128            ant2: snr1 as i32,
129        });
130        self.cleanup(now_ms);
131    }
132
133    pub fn record_fec(&mut self, now_ms: u64, total: u32, recovered: u32, lost: u32) {
134        if total == 0 && recovered == 0 && lost == 0 {
135            return;
136        }
137        let video_started = self.video_started_after_idle(now_ms);
138        self.last_video_activity_ms = Some(now_ms);
139        if video_started || lost > 0 {
140            self.request_keyframe();
141        }
142        self.fec.push(FecEntry {
143            at_ms: now_ms,
144            total,
145            recovered,
146            lost,
147        });
148        self.cleanup(now_ms);
149    }
150
151    pub fn request_keyframe(&mut self) {
152        if self.idr_max_messages == 0 {
153            self.idr_code = None;
154            self.idr_remaining_messages = 0;
155            return;
156        }
157        self.idr_code = Some(random_idr_code());
158        self.idr_remaining_messages = self.idr_max_messages;
159    }
160
161    pub fn set_keyframe_request_messages(&mut self, messages: u32) {
162        self.idr_max_messages = messages;
163        if self.idr_remaining_messages > messages {
164            self.idr_remaining_messages = messages;
165        }
166        if messages == 0 {
167            self.idr_code = None;
168            self.idr_remaining_messages = 0;
169        }
170    }
171
172    pub fn set_video_start_idle_ms(&mut self, idle_ms: u64) {
173        self.video_start_idle_ms = idle_ms;
174    }
175
176    pub fn quality(&mut self, now_ms: u64) -> LinkQuality {
177        self.cleanup(now_ms);
178        let (avg_rssi0, avg_rssi1) = avg_signal(&self.rssi);
179        let (avg_snr0, avg_snr1) = avg_signal(&self.snr);
180        let (total, recovered, lost) = self.fec.iter().fold((0u32, 0u32, 0u32), |acc, entry| {
181            (
182                acc.0.saturating_add(entry.total),
183                acc.1.saturating_add(entry.recovered),
184                acc.2.saturating_add(entry.lost),
185            )
186        });
187
188        let rssi = [avg_rssi0.round() as i32, avg_rssi1.round() as i32];
189        let snr = [avg_snr0.round() as i32, avg_snr1.round() as i32];
190        let link_score = [
191            link_score(avg_rssi0, avg_snr0),
192            link_score(avg_rssi1, avg_snr1),
193        ];
194
195        LinkQuality {
196            lost_last_second: lost,
197            recovered_last_second: recovered,
198            total_last_second: total,
199            rssi,
200            snr,
201            link_score,
202            idr_code: self
203                .idr_code
204                .clone()
205                .filter(|_| self.idr_remaining_messages > 0)
206                .unwrap_or_default(),
207        }
208    }
209
210    pub fn feedback_udp_payload(&mut self, now_ms: u64) -> Vec<u8> {
211        let quality = self.quality(now_ms);
212        if quality.lost_last_second > 2 || quality.recovered_last_second > 30 {
213            self.fec_controller.bump(now_ms, 5);
214        } else if quality.recovered_last_second > 24 {
215            self.fec_controller.bump(now_ms, 3);
216        } else if quality.recovered_last_second > 22 {
217            self.fec_controller.bump(now_ms, 2);
218        } else if quality.recovered_last_second > 18 {
219            self.fec_controller.bump(now_ms, 1);
220        } else if quality.recovered_last_second < 18 {
221            self.fec_controller.bump(now_ms, 0);
222        }
223
224        let fec_change = self.fec_controller.value(now_ms);
225        let best_link_score = quality.link_score[0].max(quality.link_score[1]);
226        let best_rssi = quality.rssi[0].max(quality.rssi[1]);
227        let best_snr = quality.snr[0].max(quality.snr[1]);
228        let mut message = format!(
229            "{}:{}:{}:{}:{}:{}:{:.6}:0:-1:{}",
230            now_ms / 1000,
231            best_link_score,
232            best_link_score,
233            quality.recovered_last_second,
234            quality.lost_last_second,
235            best_rssi,
236            best_snr as f64,
237            fec_change
238        );
239        let idr_code = (self.idr_remaining_messages > 0)
240            .then(|| self.idr_code.clone())
241            .flatten();
242        if let Some(idr_code) = idr_code {
243            message.push(':');
244            message.push_str(&idr_code);
245            self.idr_remaining_messages = self.idr_remaining_messages.saturating_sub(1);
246            if self.idr_remaining_messages == 0 {
247                self.idr_code = None;
248            }
249        }
250        message.push('\n');
251        let mut udp_payload = Vec::with_capacity(4 + message.len());
252        udp_payload.extend_from_slice(&(message.len() as u32).to_be_bytes());
253        udp_payload.extend_from_slice(message.as_bytes());
254        udp_payload
255    }
256
257    pub fn feedback_ip_packet(&mut self, now_ms: u64) -> Vec<u8> {
258        let packet =
259            wrap_udp_ipv4_payload_with_id(&self.feedback_udp_payload(now_ms), self.ip_packet_id);
260        self.ip_packet_id = self.ip_packet_id.wrapping_add(1);
261        packet
262    }
263
264    fn cleanup(&mut self, now_ms: u64) {
265        let cutoff = now_ms.saturating_sub(WINDOW_MS);
266        self.rssi.retain(|entry| entry.at_ms >= cutoff);
267        self.snr.retain(|entry| entry.at_ms >= cutoff);
268        self.fec.retain(|entry| entry.at_ms >= cutoff);
269    }
270
271    fn video_started_after_idle(&self, now_ms: u64) -> bool {
272        self.last_video_activity_ms
273            .map(|last| now_ms.saturating_sub(last) >= self.video_start_idle_ms)
274            .unwrap_or(true)
275    }
276}
277
278impl Default for AdaptiveLink {
279    fn default() -> Self {
280        Self::new()
281    }
282}
283
284#[derive(Debug, Clone)]
285pub struct AdaptiveLinkSender {
286    link: AdaptiveLink,
287    tx: WfbTransmitter,
288    tx_params: TxRadioParams,
289    feedback_interval_ms: u64,
290    session_interval_ms: u64,
291    last_feedback_ms: Option<u64>,
292    last_session_ms: Option<u64>,
293}
294
295impl AdaptiveLinkSender {
296    pub fn new(
297        link_id: u32,
298        keypair: WfbTxKeypair,
299        epoch: u64,
300        fec_k: usize,
301        fec_n: usize,
302    ) -> Result<Self, WfbError> {
303        let channel_id = ChannelId::from_link_port(link_id, RadioPort::MavlinkTx);
304        Ok(Self {
305            link: AdaptiveLink::new(),
306            tx: WfbTransmitter::new(channel_id, keypair, epoch, fec_k, fec_n)?,
307            tx_params: TxRadioParams::openipc_uplink_default(),
308            feedback_interval_ms: DEFAULT_FEEDBACK_INTERVAL_MS,
309            session_interval_ms: DEFAULT_SESSION_INTERVAL_MS,
310            last_feedback_ms: None,
311            last_session_ms: None,
312        })
313    }
314
315    pub fn link(&self) -> &AdaptiveLink {
316        &self.link
317    }
318
319    pub fn link_mut(&mut self) -> &mut AdaptiveLink {
320        &mut self.link
321    }
322
323    pub fn set_tx_params(&mut self, params: TxRadioParams) {
324        self.tx_params = params;
325    }
326
327    pub fn record_rx_paths(&mut self, now_ms: u64, rssi: [u8; 4], snr: [i8; 4]) {
328        self.link.record_rx_paths(now_ms, rssi, snr);
329    }
330
331    pub fn record_fec(&mut self, now_ms: u64, total: u32, recovered: u32, lost: u32) {
332        self.link.record_fec(now_ms, total, recovered, lost);
333    }
334
335    pub fn tick(&mut self, now_ms: u64) -> Result<Vec<Vec<u8>>, WfbError> {
336        let mut out = Vec::new();
337        let send_session = self
338            .last_session_ms
339            .map(|last| now_ms.saturating_sub(last) >= self.session_interval_ms)
340            .unwrap_or(true);
341        if send_session {
342            out.push(self.tx.session_radio_packet(self.tx_params));
343            self.last_session_ms = Some(now_ms);
344        }
345
346        let send_feedback = self
347            .last_feedback_ms
348            .map(|last| now_ms.saturating_sub(last) >= self.feedback_interval_ms)
349            .unwrap_or(true);
350        if send_feedback {
351            let payload = self.link.feedback_ip_packet(now_ms);
352            out.extend(
353                self.tx
354                    .radio_packets_for_payload(&payload, self.tx_params)?,
355            );
356            self.last_feedback_ms = Some(now_ms);
357        }
358        Ok(out)
359    }
360}
361
362pub fn wrap_udp_ipv4_payload(udp_payload: &[u8]) -> Vec<u8> {
363    wrap_udp_ipv4_payload_with_id(udp_payload, 0)
364}
365
366pub fn wrap_udp_ipv4_payload_with_id(udp_payload: &[u8], packet_id: u16) -> Vec<u8> {
367    let udp_len = 8 + udp_payload.len();
368    let ip_len = 20 + udp_len;
369    let mut out = Vec::with_capacity(2 + ip_len);
370    out.extend_from_slice(&(ip_len as u16).to_be_bytes());
371    out.push(0x45);
372    out.push(0x00);
373    out.extend_from_slice(&(ip_len as u16).to_be_bytes());
374    out.extend_from_slice(&packet_id.to_be_bytes());
375    out.extend_from_slice(&0u16.to_be_bytes());
376    out.push(64);
377    out.push(17);
378    out.extend_from_slice(&0u16.to_be_bytes());
379    out.extend_from_slice(&[10, 5, 0, 1]);
380    out.extend_from_slice(&[10, 5, 0, 10]);
381
382    let checksum = ipv4_checksum(&out[2..22]);
383    out[12] = (checksum >> 8) as u8;
384    out[13] = checksum as u8;
385
386    out.extend_from_slice(&54321u16.to_be_bytes());
387    out.extend_from_slice(&9999u16.to_be_bytes());
388    out.extend_from_slice(&(udp_len as u16).to_be_bytes());
389    out.extend_from_slice(&0u16.to_be_bytes());
390    out.extend_from_slice(udp_payload);
391    out
392}
393
394fn avg_signal(entries: &[SignalEntry]) -> (f64, f64) {
395    if entries.is_empty() {
396        return (0.0, 0.0);
397    }
398    let (sum0, sum1) = entries.iter().fold((0i64, 0i64), |acc, entry| {
399        (acc.0 + entry.ant1 as i64, acc.1 + entry.ant2 as i64)
400    });
401    let count = entries.len() as f64;
402    (sum0 as f64 / count, sum1 as f64 / count)
403}
404
405fn link_score(rssi: f64, snr: f64) -> i32 {
406    (0.5 * map_range(rssi, 50.0, 110.0, 1000.0, 2000.0)
407        + 0.5 * map_range(snr, 20.0, 50.0, 1000.0, 2000.0))
408    .round() as i32
409}
410
411fn map_range(input: f64, input_min: f64, input_max: f64, output_min: f64, output_max: f64) -> f64 {
412    let clamped = input.clamp(input_min, input_max);
413    output_min + (clamped - input_min) * (output_max - output_min) / (input_max - input_min)
414}
415
416fn ipv4_checksum(header: &[u8]) -> u16 {
417    let mut sum = 0u32;
418    for chunk in header.chunks_exact(2) {
419        sum += u16::from_be_bytes([chunk[0], chunk[1]]) as u32;
420    }
421    while sum >> 16 != 0 {
422        sum = (sum & 0xffff) + (sum >> 16);
423    }
424    !(sum as u16)
425}
426
427fn random_idr_code() -> String {
428    let mut bytes = [0u8; 4];
429    OsRng.fill_bytes(&mut bytes);
430    bytes
431        .iter()
432        .map(|byte| (b'a' + (byte % 26)) as char)
433        .collect()
434}
435
436#[cfg(test)]
437mod tests {
438    use super::*;
439
440    #[test]
441    fn computes_link_quality_and_feedback_payload() {
442        let mut link = AdaptiveLink::new();
443        link.set_keyframe_request_messages(0);
444        link.record_rx(1_000, 80, 70, 35, 25);
445        link.record_fec(1_000, 10, 2, 0);
446        let quality = link.quality(1_050);
447        assert_eq!(quality.rssi, [80, 70]);
448        assert_eq!(quality.snr, [35, 25]);
449        assert_eq!(quality.recovered_last_second, 2);
450
451        let payload = link.feedback_udp_payload(1_050);
452        let len = u32::from_be_bytes(payload[0..4].try_into().unwrap()) as usize;
453        assert_eq!(len, payload.len() - 4);
454        let text = std::str::from_utf8(&payload[4..]).unwrap();
455        assert!(text.contains(":2:0:"));
456        assert_eq!(text.trim_end().split(':').count(), 10);
457        assert_eq!(quality.idr_code, "");
458    }
459
460    #[test]
461    fn keyframe_request_code_is_sent_only_for_active_window() {
462        let mut link = AdaptiveLink::new();
463
464        let no_request = link.feedback_udp_payload(1_000);
465        let text = std::str::from_utf8(&no_request[4..]).unwrap();
466        assert_eq!(text.trim_end().split(':').count(), 10);
467
468        link.record_fec(1_100, 10, 0, 1);
469        let first_request = link.feedback_udp_payload(1_100);
470        let text = std::str::from_utf8(&first_request[4..]).unwrap();
471        let fields: Vec<_> = text.trim_end().split(':').collect();
472        assert_eq!(fields.len(), 11);
473        assert_eq!(fields[10].len(), 4);
474
475        for i in 1..DEFAULT_IDR_REQUEST_MESSAGES {
476            let request = link.feedback_udp_payload(1_100 + i as u64);
477            let text = std::str::from_utf8(&request[4..]).unwrap();
478            assert_eq!(text.trim_end().split(':').count(), 11);
479        }
480
481        let expired = link.feedback_udp_payload(2_000);
482        let text = std::str::from_utf8(&expired[4..]).unwrap();
483        assert_eq!(text.trim_end().split(':').count(), 10);
484    }
485
486    #[test]
487    fn keyframe_request_can_be_disabled() {
488        let mut link = AdaptiveLink::new();
489        link.set_keyframe_request_messages(0);
490        link.record_fec(1_000, 1, 0, 1);
491        assert_eq!(link.quality(1_000).idr_code, "");
492
493        let payload = link.feedback_udp_payload(1_000);
494        let text = std::str::from_utf8(&payload[4..]).unwrap();
495        assert_eq!(text.trim_end().split(':').count(), 10);
496    }
497
498    #[test]
499    fn first_video_after_idle_requests_keyframe() {
500        let mut link = AdaptiveLink::new();
501        link.set_keyframe_request_messages(1);
502
503        link.record_fec(1_000, 10, 0, 0);
504        let payload = link.feedback_udp_payload(1_000);
505        let text = std::str::from_utf8(&payload[4..]).unwrap();
506        assert_eq!(text.trim_end().split(':').count(), 11);
507
508        let expired = link.feedback_udp_payload(1_001);
509        let text = std::str::from_utf8(&expired[4..]).unwrap();
510        assert_eq!(text.trim_end().split(':').count(), 10);
511
512        link.record_fec(1_500, 10, 0, 0);
513        let continuous = link.feedback_udp_payload(1_500);
514        let text = std::str::from_utf8(&continuous[4..]).unwrap();
515        assert_eq!(text.trim_end().split(':').count(), 10);
516
517        link.record_fec(2_500, 10, 0, 0);
518        let restarted = link.feedback_udp_payload(2_500);
519        let text = std::str::from_utf8(&restarted[4..]).unwrap();
520        assert_eq!(text.trim_end().split(':').count(), 11);
521    }
522
523    #[test]
524    fn loss_renews_keyframe_request_during_active_video() {
525        let mut link = AdaptiveLink::new();
526        link.set_keyframe_request_messages(1);
527
528        link.record_fec(1_000, 10, 0, 0);
529        let _ = link.feedback_udp_payload(1_000);
530        link.record_fec(1_100, 10, 0, 1);
531        let payload = link.feedback_udp_payload(1_100);
532        let text = std::str::from_utf8(&payload[4..]).unwrap();
533        assert_eq!(text.trim_end().split(':').count(), 11);
534    }
535
536    #[test]
537    fn wraps_udp_payload_in_length_prefixed_ipv4_packet() {
538        let packet = wrap_udp_ipv4_payload(b"abc");
539        let ip_len = u16::from_be_bytes([packet[0], packet[1]]) as usize;
540        assert_eq!(ip_len, packet.len() - 2);
541        assert_eq!(&packet[2..4], &[0x45, 0x00]);
542        assert_eq!(&packet[14..18], &[10, 5, 0, 1]);
543        assert_eq!(&packet[18..22], &[10, 5, 0, 10]);
544        assert_eq!(u16::from_be_bytes([packet[22], packet[23]]), 54321);
545        assert_eq!(u16::from_be_bytes([packet[24], packet[25]]), 9999);
546    }
547
548    #[test]
549    fn feedback_ip_packet_increments_ipv4_id() {
550        let mut link = AdaptiveLink::new();
551        let first = link.feedback_ip_packet(1_000);
552        let second = link.feedback_ip_packet(1_100);
553        assert_eq!(u16::from_be_bytes([first[6], first[7]]), 0);
554        assert_eq!(u16::from_be_bytes([second[6], second[7]]), 1);
555    }
556}