Skip to main content

openipc_core/
adaptive.rs

1use rand_core::{OsRng, RngCore};
2
3use crate::channel::{ChannelId, RadioPort};
4use crate::radiotap::TxRadioParams;
5use crate::wfb::WfbError;
6use crate::wfb_tx::{WfbTransmitter, WfbTxKeypair};
7
8const WINDOW_MS: u64 = 1_000;
9const DEFAULT_FEEDBACK_INTERVAL_MS: u64 = 100;
10const DEFAULT_SESSION_INTERVAL_MS: u64 = 1_000;
11const DEFAULT_IDR_REQUEST_MESSAGES: u32 = 20;
12const DEFAULT_VIDEO_START_IDLE_MS: u64 = 1_000;
13
14/// Link-quality report used by OpenIPC adaptive-link feedback.
15#[derive(Debug, Clone, PartialEq)]
16pub struct LinkQuality {
17    /// Lost FEC fragments over the recent window.
18    pub lost_last_second: u32,
19    /// Recovered FEC fragments over the recent window.
20    pub recovered_last_second: u32,
21    /// Total FEC fragments over the recent window.
22    pub total_last_second: u32,
23    /// Averaged RSSI for the first two RF paths.
24    pub rssi: [i32; 2],
25    /// Averaged SNR for the first two RF paths.
26    pub snr: [i32; 2],
27    /// Computed link-score values for the first two RF paths.
28    pub link_score: [i32; 2],
29    /// IDR/keyframe request code to include in feedback.
30    pub idr_code: String,
31}
32
33#[derive(Debug, Clone)]
34struct SignalEntry {
35    at_ms: u64,
36    ant1: i32,
37    ant2: i32,
38}
39
40#[derive(Debug, Clone)]
41struct FecEntry {
42    at_ms: u64,
43    total: u32,
44    recovered: u32,
45    lost: u32,
46}
47
48#[derive(Debug, Clone)]
49struct FecController {
50    enabled: bool,
51    value: i32,
52    last_change_ms: u64,
53}
54
55impl FecController {
56    const fn new() -> Self {
57        Self {
58            enabled: true,
59            value: 0,
60            last_change_ms: 0,
61        }
62    }
63
64    fn value(&mut self, now_ms: u64) -> i32 {
65        if !self.enabled {
66            return 0;
67        }
68        self.decay(now_ms);
69        self.value
70    }
71
72    fn bump(&mut self, now_ms: u64, new_value: i32) {
73        if new_value > self.value {
74            self.value = new_value;
75            self.last_change_ms = now_ms;
76        }
77    }
78
79    fn decay(&mut self, now_ms: u64) {
80        if self.value == 0 {
81            return;
82        }
83        let elapsed = now_ms.saturating_sub(self.last_change_ms);
84        if elapsed < WINDOW_MS {
85            return;
86        }
87        let ticks = (elapsed / WINDOW_MS) as i32;
88        self.value = (self.value - ticks).max(0);
89        self.last_change_ms = self.last_change_ms.saturating_add(ticks as u64 * WINDOW_MS);
90    }
91}
92
93/// Adaptive-link quality estimator and feedback payload builder.
94#[derive(Debug, Clone)]
95pub struct AdaptiveLink {
96    rssi: Vec<SignalEntry>,
97    snr: Vec<SignalEntry>,
98    fec: Vec<FecEntry>,
99    fec_controller: FecController,
100    idr_code: Option<String>,
101    idr_remaining_messages: u32,
102    idr_max_messages: u32,
103    last_video_activity_ms: Option<u64>,
104    video_start_idle_ms: u64,
105    ip_packet_id: u16,
106}
107
108impl AdaptiveLink {
109    /// Create an empty adaptive-link estimator.
110    pub fn new() -> Self {
111        Self {
112            rssi: Vec::new(),
113            snr: Vec::new(),
114            fec: Vec::new(),
115            fec_controller: FecController::new(),
116            idr_code: None,
117            idr_remaining_messages: 0,
118            idr_max_messages: DEFAULT_IDR_REQUEST_MESSAGES,
119            last_video_activity_ms: None,
120            video_start_idle_ms: DEFAULT_VIDEO_START_IDLE_MS,
121            ip_packet_id: 0,
122        }
123    }
124
125    /// Record RSSI/SNR arrays from a Realtek RX descriptor.
126    pub fn record_rx_paths(&mut self, now_ms: u64, rssi: [u8; 4], snr: [i8; 4]) {
127        self.record_rx(now_ms, rssi[0], rssi[1], snr[0], snr[1]);
128    }
129
130    /// Record RSSI/SNR for the first two RF paths.
131    pub fn record_rx(&mut self, now_ms: u64, rssi0: u8, rssi1: u8, snr0: i8, snr1: i8) {
132        self.rssi.push(SignalEntry {
133            at_ms: now_ms,
134            ant1: rssi0 as i32,
135            ant2: rssi1 as i32,
136        });
137        self.snr.push(SignalEntry {
138            at_ms: now_ms,
139            ant1: snr0 as i32,
140            ant2: snr1 as i32,
141        });
142        self.cleanup(now_ms);
143    }
144
145    /// Record FEC totals for the current quality window.
146    pub fn record_fec(&mut self, now_ms: u64, total: u32, recovered: u32, lost: u32) {
147        if total == 0 && recovered == 0 && lost == 0 {
148            return;
149        }
150        let video_started = self.video_started_after_idle(now_ms);
151        self.last_video_activity_ms = Some(now_ms);
152        if video_started || lost > 0 {
153            self.request_keyframe();
154        }
155        self.fec.push(FecEntry {
156            at_ms: now_ms,
157            total,
158            recovered,
159            lost,
160        });
161        self.cleanup(now_ms);
162    }
163
164    /// Request keyframes in upcoming feedback packets.
165    pub fn request_keyframe(&mut self) {
166        if self.idr_max_messages == 0 {
167            self.idr_code = None;
168            self.idr_remaining_messages = 0;
169            return;
170        }
171        self.idr_code = Some(random_idr_code());
172        self.idr_remaining_messages = self.idr_max_messages;
173    }
174
175    /// Configure how many feedback packets carry a keyframe request.
176    pub fn set_keyframe_request_messages(&mut self, messages: u32) {
177        self.idr_max_messages = messages;
178        if self.idr_remaining_messages > messages {
179            self.idr_remaining_messages = messages;
180        }
181        if messages == 0 {
182            self.idr_code = None;
183            self.idr_remaining_messages = 0;
184        }
185    }
186
187    /// Configure the idle duration after which new video asks for keyframes.
188    pub fn set_video_start_idle_ms(&mut self, idle_ms: u64) {
189        self.video_start_idle_ms = idle_ms;
190    }
191
192    /// Compute current quality over the rolling window.
193    pub fn quality(&mut self, now_ms: u64) -> LinkQuality {
194        self.cleanup(now_ms);
195        let (avg_rssi0, avg_rssi1) = avg_signal(&self.rssi);
196        let (avg_snr0, avg_snr1) = avg_signal(&self.snr);
197        let (total, recovered, lost) = self.fec.iter().fold((0u32, 0u32, 0u32), |acc, entry| {
198            (
199                acc.0.saturating_add(entry.total),
200                acc.1.saturating_add(entry.recovered),
201                acc.2.saturating_add(entry.lost),
202            )
203        });
204
205        let rssi = [avg_rssi0.round() as i32, avg_rssi1.round() as i32];
206        let snr = [avg_snr0.round() as i32, avg_snr1.round() as i32];
207        let link_score = [
208            link_score(avg_rssi0, avg_snr0),
209            link_score(avg_rssi1, avg_snr1),
210        ];
211
212        LinkQuality {
213            lost_last_second: lost,
214            recovered_last_second: recovered,
215            total_last_second: total,
216            rssi,
217            snr,
218            link_score,
219            idr_code: self
220                .idr_code
221                .clone()
222                .filter(|_| self.idr_remaining_messages > 0)
223                .unwrap_or_default(),
224        }
225    }
226
227    /// Build the adaptive-link UDP payload.
228    pub fn feedback_udp_payload(&mut self, now_ms: u64) -> Vec<u8> {
229        let quality = self.quality(now_ms);
230        if quality.lost_last_second > 2 {
231            self.fec_controller.bump(now_ms, 5);
232        } else if quality.recovered_last_second > 30 {
233            self.fec_controller.bump(now_ms, 4);
234        } else if quality.recovered_last_second > 24 {
235            self.fec_controller.bump(now_ms, 3);
236        } else if quality.recovered_last_second > 14 {
237            self.fec_controller.bump(now_ms, 2);
238        } else if quality.recovered_last_second > 8 {
239            self.fec_controller.bump(now_ms, 1);
240        }
241
242        let fec_change = self.fec_controller.value(now_ms);
243        let best_link_score = quality.link_score[0].max(quality.link_score[1]);
244        let best_rssi = quality.rssi[0].max(quality.rssi[1]);
245        let best_snr = quality.snr[0].max(quality.snr[1]);
246        let mut message = format!(
247            "{}:{}:{}:{}:{}:{}:{:.6}:0:-1:{}",
248            now_ms / 1000,
249            best_link_score,
250            best_link_score,
251            quality.recovered_last_second,
252            quality.lost_last_second,
253            best_rssi,
254            best_snr as f64,
255            fec_change
256        );
257        let idr_code = (self.idr_remaining_messages > 0)
258            .then(|| self.idr_code.clone())
259            .flatten();
260        if let Some(idr_code) = idr_code {
261            message.push(':');
262            message.push_str(&idr_code);
263            self.idr_remaining_messages = self.idr_remaining_messages.saturating_sub(1);
264            if self.idr_remaining_messages == 0 {
265                self.idr_code = None;
266            }
267        }
268        message.push('\n');
269        let mut udp_payload = Vec::with_capacity(4 + message.len());
270        udp_payload.extend_from_slice(&(message.len() as u32).to_be_bytes());
271        udp_payload.extend_from_slice(message.as_bytes());
272        udp_payload
273    }
274
275    /// Build a length-prefixed IPv4/UDP feedback packet.
276    pub fn feedback_ip_packet(&mut self, now_ms: u64) -> Vec<u8> {
277        let packet =
278            wrap_udp_ipv4_payload_with_id(&self.feedback_udp_payload(now_ms), self.ip_packet_id);
279        self.ip_packet_id = self.ip_packet_id.wrapping_add(1);
280        packet
281    }
282
283    fn cleanup(&mut self, now_ms: u64) {
284        let cutoff = now_ms.saturating_sub(WINDOW_MS);
285        self.rssi.retain(|entry| entry.at_ms >= cutoff);
286        self.snr.retain(|entry| entry.at_ms >= cutoff);
287        self.fec.retain(|entry| entry.at_ms >= cutoff);
288    }
289
290    fn video_started_after_idle(&self, now_ms: u64) -> bool {
291        self.last_video_activity_ms
292            .map(|last| now_ms.saturating_sub(last) >= self.video_start_idle_ms)
293            .unwrap_or(true)
294    }
295}
296
297impl Default for AdaptiveLink {
298    fn default() -> Self {
299        Self::new()
300    }
301}
302
303/// Periodic encrypted WFB sender for adaptive-link feedback.
304#[derive(Debug, Clone)]
305pub struct AdaptiveLinkSender {
306    link: AdaptiveLink,
307    tx: WfbTransmitter,
308    tx_params: TxRadioParams,
309    feedback_interval_ms: u64,
310    session_interval_ms: u64,
311    last_feedback_ms: Option<u64>,
312    last_session_ms: Option<u64>,
313}
314
315impl AdaptiveLinkSender {
316    /// Create an adaptive-link sender for a link id and TX keypair.
317    pub fn new(
318        link_id: u32,
319        keypair: WfbTxKeypair,
320        epoch: u64,
321        fec_k: usize,
322        fec_n: usize,
323    ) -> Result<Self, WfbError> {
324        let channel_id = ChannelId::from_link_port(link_id, RadioPort::TunnelTx);
325        Ok(Self {
326            link: AdaptiveLink::new(),
327            tx: WfbTransmitter::new(channel_id, keypair, epoch, fec_k, fec_n)?,
328            tx_params: TxRadioParams::openipc_uplink_default(),
329            feedback_interval_ms: DEFAULT_FEEDBACK_INTERVAL_MS,
330            session_interval_ms: DEFAULT_SESSION_INTERVAL_MS,
331            last_feedback_ms: None,
332            last_session_ms: None,
333        })
334    }
335
336    /// Borrow the quality estimator.
337    pub fn link(&self) -> &AdaptiveLink {
338        &self.link
339    }
340
341    /// Mutably borrow the quality estimator.
342    pub fn link_mut(&mut self) -> &mut AdaptiveLink {
343        &mut self.link
344    }
345
346    /// Override radiotap/TX parameters for generated feedback frames.
347    pub fn set_tx_params(&mut self, params: TxRadioParams) {
348        self.tx_params = params;
349    }
350
351    /// Record RSSI/SNR arrays from a Realtek RX descriptor.
352    pub fn record_rx_paths(&mut self, now_ms: u64, rssi: [u8; 4], snr: [i8; 4]) {
353        self.link.record_rx_paths(now_ms, rssi, snr);
354    }
355
356    /// Record FEC totals for the current quality window.
357    pub fn record_fec(&mut self, now_ms: u64, total: u32, recovered: u32, lost: u32) {
358        self.link.record_fec(now_ms, total, recovered, lost);
359    }
360
361    /// Return WFB radio packets that should be transmitted at `now_ms`.
362    pub fn tick(&mut self, now_ms: u64) -> Result<Vec<Vec<u8>>, WfbError> {
363        let mut out = Vec::new();
364        let send_session = self
365            .last_session_ms
366            .map(|last| now_ms.saturating_sub(last) >= self.session_interval_ms)
367            .unwrap_or(true);
368        if send_session {
369            out.push(self.tx.session_radio_packet(self.tx_params));
370            self.last_session_ms = Some(now_ms);
371        }
372
373        let send_feedback = self
374            .last_feedback_ms
375            .map(|last| now_ms.saturating_sub(last) >= self.feedback_interval_ms)
376            .unwrap_or(true);
377        if send_feedback {
378            let payload = self.link.feedback_ip_packet(now_ms);
379            out.extend(
380                self.tx
381                    .radio_packets_for_payload(&payload, self.tx_params)?,
382            );
383            self.last_feedback_ms = Some(now_ms);
384        }
385        Ok(out)
386    }
387}
388
389/// Wrap a UDP payload in the adaptive-link length-prefixed IPv4/UDP shape.
390pub fn wrap_udp_ipv4_payload(udp_payload: &[u8]) -> Vec<u8> {
391    wrap_udp_ipv4_payload_with_id(udp_payload, 0)
392}
393
394/// Wrap a UDP payload with an explicit IPv4 packet id.
395pub fn wrap_udp_ipv4_payload_with_id(udp_payload: &[u8], packet_id: u16) -> Vec<u8> {
396    let udp_len = 8 + udp_payload.len();
397    let ip_len = 20 + udp_len;
398    let mut out = Vec::with_capacity(2 + ip_len);
399    out.extend_from_slice(&(ip_len as u16).to_be_bytes());
400    out.push(0x45);
401    out.push(0x00);
402    out.extend_from_slice(&(ip_len as u16).to_be_bytes());
403    out.extend_from_slice(&packet_id.to_be_bytes());
404    out.extend_from_slice(&0u16.to_be_bytes());
405    out.push(64);
406    out.push(17);
407    out.extend_from_slice(&0u16.to_be_bytes());
408    out.extend_from_slice(&[10, 5, 0, 1]);
409    out.extend_from_slice(&[10, 5, 0, 10]);
410
411    let checksum = ipv4_checksum(&out[2..22]);
412    out[12] = (checksum >> 8) as u8;
413    out[13] = checksum as u8;
414
415    out.extend_from_slice(&54321u16.to_be_bytes());
416    out.extend_from_slice(&9999u16.to_be_bytes());
417    out.extend_from_slice(&(udp_len as u16).to_be_bytes());
418    out.extend_from_slice(&0u16.to_be_bytes());
419    out.extend_from_slice(udp_payload);
420    out
421}
422
423fn avg_signal(entries: &[SignalEntry]) -> (f64, f64) {
424    if entries.is_empty() {
425        return (0.0, 0.0);
426    }
427    let (sum0, sum1) = entries.iter().fold((0i64, 0i64), |acc, entry| {
428        (acc.0 + entry.ant1 as i64, acc.1 + entry.ant2 as i64)
429    });
430    let count = entries.len() as f64;
431    (sum0 as f64 / count, sum1 as f64 / count)
432}
433
434fn link_score(rssi: f64, _snr: f64) -> i32 {
435    // PixelPilot maps the Realtek raw RSSI byte from 0..80 onto the
436    // adaptive-link score range. Keep SNR in the public report and message
437    // fields, but use the same score scale for transmitter decisions.
438    map_range(rssi, 0.0, 80.0, 1000.0, 2000.0).round() as i32
439}
440
441fn map_range(input: f64, input_min: f64, input_max: f64, output_min: f64, output_max: f64) -> f64 {
442    let clamped = input.clamp(input_min, input_max);
443    output_min + (clamped - input_min) * (output_max - output_min) / (input_max - input_min)
444}
445
446fn ipv4_checksum(header: &[u8]) -> u16 {
447    let mut sum = 0u32;
448    for chunk in header.chunks_exact(2) {
449        sum += u16::from_be_bytes([chunk[0], chunk[1]]) as u32;
450    }
451    while sum >> 16 != 0 {
452        sum = (sum & 0xffff) + (sum >> 16);
453    }
454    !(sum as u16)
455}
456
457fn random_idr_code() -> String {
458    let mut bytes = [0u8; 4];
459    OsRng.fill_bytes(&mut bytes);
460    bytes
461        .iter()
462        .map(|byte| (b'a' + (byte % 26)) as char)
463        .collect()
464}
465
466#[cfg(test)]
467mod tests {
468    use super::*;
469
470    #[test]
471    fn computes_link_quality_and_feedback_payload() {
472        let mut link = AdaptiveLink::new();
473        link.set_keyframe_request_messages(0);
474        link.record_rx(1_000, 80, 70, 35, 25);
475        link.record_fec(1_000, 10, 2, 0);
476        let quality = link.quality(1_050);
477        assert_eq!(quality.rssi, [80, 70]);
478        assert_eq!(quality.snr, [35, 25]);
479        assert_eq!(quality.link_score, [2000, 1875]);
480        assert_eq!(quality.recovered_last_second, 2);
481
482        let payload = link.feedback_udp_payload(1_050);
483        let len = u32::from_be_bytes(payload[0..4].try_into().unwrap()) as usize;
484        assert_eq!(len, payload.len() - 4);
485        let text = std::str::from_utf8(&payload[4..]).unwrap();
486        assert!(text.contains(":2:0:"));
487        assert_eq!(text.trim_end().split(':').count(), 10);
488        assert_eq!(quality.idr_code, "");
489    }
490
491    #[test]
492    fn fec_change_thresholds_match_pixelpilot_defaults() {
493        fn fec_change_for(recovered: u32, lost: u32) -> i32 {
494            let mut link = AdaptiveLink::new();
495            link.set_keyframe_request_messages(0);
496            link.record_rx(1_000, 80, 70, 35, 25);
497            link.record_fec(1_000, recovered + lost, recovered, lost);
498            let payload = link.feedback_udp_payload(1_000);
499            let text = std::str::from_utf8(&payload[4..]).unwrap();
500            text.trim_end()
501                .split(':')
502                .nth(9)
503                .unwrap()
504                .parse::<i32>()
505                .unwrap()
506        }
507
508        assert_eq!(fec_change_for(8, 0), 0);
509        assert_eq!(fec_change_for(9, 0), 1);
510        assert_eq!(fec_change_for(15, 0), 2);
511        assert_eq!(fec_change_for(25, 0), 3);
512        assert_eq!(fec_change_for(31, 0), 4);
513        assert_eq!(fec_change_for(0, 3), 5);
514    }
515
516    #[test]
517    fn keyframe_request_code_is_sent_only_for_active_window() {
518        let mut link = AdaptiveLink::new();
519
520        let no_request = link.feedback_udp_payload(1_000);
521        let text = std::str::from_utf8(&no_request[4..]).unwrap();
522        assert_eq!(text.trim_end().split(':').count(), 10);
523
524        link.record_fec(1_100, 10, 0, 1);
525        let first_request = link.feedback_udp_payload(1_100);
526        let text = std::str::from_utf8(&first_request[4..]).unwrap();
527        let fields: Vec<_> = text.trim_end().split(':').collect();
528        assert_eq!(fields.len(), 11);
529        assert_eq!(fields[10].len(), 4);
530
531        for i in 1..DEFAULT_IDR_REQUEST_MESSAGES {
532            let request = link.feedback_udp_payload(1_100 + i as u64);
533            let text = std::str::from_utf8(&request[4..]).unwrap();
534            assert_eq!(text.trim_end().split(':').count(), 11);
535        }
536
537        let expired = link.feedback_udp_payload(2_000);
538        let text = std::str::from_utf8(&expired[4..]).unwrap();
539        assert_eq!(text.trim_end().split(':').count(), 10);
540    }
541
542    #[test]
543    fn keyframe_request_can_be_disabled() {
544        let mut link = AdaptiveLink::new();
545        link.set_keyframe_request_messages(0);
546        link.record_fec(1_000, 1, 0, 1);
547        assert_eq!(link.quality(1_000).idr_code, "");
548
549        let payload = link.feedback_udp_payload(1_000);
550        let text = std::str::from_utf8(&payload[4..]).unwrap();
551        assert_eq!(text.trim_end().split(':').count(), 10);
552    }
553
554    #[test]
555    fn first_video_after_idle_requests_keyframe() {
556        let mut link = AdaptiveLink::new();
557        link.set_keyframe_request_messages(1);
558
559        link.record_fec(1_000, 10, 0, 0);
560        let payload = link.feedback_udp_payload(1_000);
561        let text = std::str::from_utf8(&payload[4..]).unwrap();
562        assert_eq!(text.trim_end().split(':').count(), 11);
563
564        let expired = link.feedback_udp_payload(1_001);
565        let text = std::str::from_utf8(&expired[4..]).unwrap();
566        assert_eq!(text.trim_end().split(':').count(), 10);
567
568        link.record_fec(1_500, 10, 0, 0);
569        let continuous = link.feedback_udp_payload(1_500);
570        let text = std::str::from_utf8(&continuous[4..]).unwrap();
571        assert_eq!(text.trim_end().split(':').count(), 10);
572
573        link.record_fec(2_500, 10, 0, 0);
574        let restarted = link.feedback_udp_payload(2_500);
575        let text = std::str::from_utf8(&restarted[4..]).unwrap();
576        assert_eq!(text.trim_end().split(':').count(), 11);
577    }
578
579    #[test]
580    fn loss_renews_keyframe_request_during_active_video() {
581        let mut link = AdaptiveLink::new();
582        link.set_keyframe_request_messages(1);
583
584        link.record_fec(1_000, 10, 0, 0);
585        let _ = link.feedback_udp_payload(1_000);
586        link.record_fec(1_100, 10, 0, 1);
587        let payload = link.feedback_udp_payload(1_100);
588        let text = std::str::from_utf8(&payload[4..]).unwrap();
589        assert_eq!(text.trim_end().split(':').count(), 11);
590    }
591
592    #[test]
593    fn wraps_udp_payload_in_length_prefixed_ipv4_packet() {
594        let packet = wrap_udp_ipv4_payload(b"abc");
595        let ip_len = u16::from_be_bytes([packet[0], packet[1]]) as usize;
596        assert_eq!(ip_len, packet.len() - 2);
597        assert_eq!(&packet[2..4], &[0x45, 0x00]);
598        assert_eq!(&packet[14..18], &[10, 5, 0, 1]);
599        assert_eq!(&packet[18..22], &[10, 5, 0, 10]);
600        assert_eq!(u16::from_be_bytes([packet[22], packet[23]]), 54321);
601        assert_eq!(u16::from_be_bytes([packet[24], packet[25]]), 9999);
602    }
603
604    #[test]
605    fn feedback_ip_packet_increments_ipv4_id() {
606        let mut link = AdaptiveLink::new();
607        let first = link.feedback_ip_packet(1_000);
608        let second = link.feedback_ip_packet(1_100);
609        assert_eq!(u16::from_be_bytes([first[6], first[7]]), 0);
610        assert_eq!(u16::from_be_bytes([second[6], second[7]]), 1);
611    }
612}