nebulus 0.1.29

Low-latency native OpenIPC FPV ground station built with egui
use super::{crc8_dvb_s2, MspDirectionFilter, MspVersionFilter, TelemetryUpdate};

/// Maximum number of bytes kept in the parser's reassembly buffer. `Parser::push`
/// discards the oldest bytes once the buffer grows beyond this limit.
const MAX_BUFFER: usize = 8 * 1024;
// MSP command identifiers that `decode_message` knows how to decode.
/// GPS fix state, satellite count, position, altitude, ground speed and heading.
const MSP_RAW_GPS: u16 = 106;
/// Distance to home.
const MSP_COMP_GPS: u16 = 107;
/// Roll, pitch and yaw.
const MSP_ATTITUDE: u16 = 108;
/// Relative altitude and vertical speed.
const MSP_ALTITUDE: u16 = 109;
/// Battery voltage, consumed capacity, link quality and current.
const MSP_ANALOG: u16 = 110;
/// Battery voltage, consumed capacity and current.
const MSP_BATTERY_STATE: u16 = 130;

/// Incremental MSP frame parser: bytes are fed in through `push`, complete frames are
/// validated, filtered by protocol version and direction, and decoded into telemetry.
pub(super) struct Parser {
    /// Bytes received so far that have not yet been consumed as (or rejected from) a frame.
    buffer: Vec<u8>,
    /// Protocol version a checksum-valid frame must have to be accepted.
    version: MspVersionFilter,
    /// Direction a checksum-valid frame must have to be accepted.
    direction: MspDirectionFilter,
}

impl Parser {
    pub(super) const fn new(version: MspVersionFilter, direction: MspDirectionFilter) -> Self {
        Self {
            buffer: Vec::new(),
            version,
            direction,
        }
    }

    pub(super) fn push(&mut self, bytes: &[u8]) -> TelemetryUpdate {
        self.buffer.extend_from_slice(bytes);
        if self.buffer.len() > MAX_BUFFER {
            let excess = self.buffer.len() - MAX_BUFFER;
            self.buffer.drain(..excess);
        }

        let mut combined = TelemetryUpdate::default();
        loop {
            let Some(start) = self.buffer.iter().position(|byte| *byte == b'$') else {
                self.buffer.clear();
                break;
            };
            if start > 0 {
                self.buffer.drain(..start);
            }
            if self.buffer.len() < 3 {
                break;
            }
            let parsed = match self.buffer[1] {
                b'M' => self.parse_v1(),
                b'X' => self.parse_v2(),
                _ => {
                    self.buffer.drain(..1);
                    continue;
                }
            };
            match parsed {
                FrameResult::NeedMore => break,
                FrameResult::Invalid(consumed) => {
                    self.buffer.drain(..consumed.min(self.buffer.len()));
                }
                FrameResult::Valid {
                    consumed,
                    version,
                    direction,
                    command,
                    payload,
                } => {
                    self.buffer.drain(..consumed);
                    if !self.accepts(version, direction) {
                        combined.counters.filtered_frames += 1;
                        continue;
                    }
                    combined.counters.accepted_frames += 1;
                    if let Some(update) = decode_message(command, &payload) {
                        combined.merge(update);
                    }
                }
            }
        }
        combined
    }

    fn accepts(&self, version: MspVersionFilter, direction: u8) -> bool {
        let version_matches =
            matches!(self.version, MspVersionFilter::Any) || self.version == version;
        let direction_matches = match self.direction {
            MspDirectionFilter::Any => true,
            MspDirectionFilter::FromFlightController => matches!(direction, b'>' | b'!'),
            MspDirectionFilter::ToFlightController => direction == b'<',
        };
        version_matches && direction_matches
    }

    fn parse_v1(&self) -> FrameResult {
        if self.buffer.len() < 6 {
            return FrameResult::NeedMore;
        }
        if !matches!(self.buffer[2], b'>' | b'<' | b'!') {
            return FrameResult::Invalid(1);
        }
        let payload_len = usize::from(self.buffer[3]);
        let total = 6 + payload_len;
        if self.buffer.len() < total {
            return FrameResult::NeedMore;
        }
        let checksum = self.buffer[3..5 + payload_len]
            .iter()
            .fold(0, |checksum, byte| checksum ^ byte);
        if checksum != self.buffer[5 + payload_len] {
            return FrameResult::Invalid(total);
        }
        FrameResult::Valid {
            consumed: total,
            version: MspVersionFilter::V1,
            direction: self.buffer[2],
            command: u16::from(self.buffer[4]),
            payload: self.buffer[5..5 + payload_len].to_vec(),
        }
    }

    fn parse_v2(&self) -> FrameResult {
        if self.buffer.len() < 9 {
            return FrameResult::NeedMore;
        }
        if !matches!(self.buffer[2], b'>' | b'<' | b'!') {
            return FrameResult::Invalid(1);
        }
        let payload_len = usize::from(u16::from_le_bytes([self.buffer[6], self.buffer[7]]));
        let total = 9 + payload_len;
        if self.buffer.len() < total {
            return FrameResult::NeedMore;
        }
        if crc8_dvb_s2(&self.buffer[3..8 + payload_len]) != self.buffer[8 + payload_len] {
            return FrameResult::Invalid(total);
        }
        FrameResult::Valid {
            consumed: total,
            version: MspVersionFilter::V2,
            direction: self.buffer[2],
            command: u16::from_le_bytes([self.buffer[4], self.buffer[5]]),
            payload: self.buffer[8..8 + payload_len].to_vec(),
        }
    }
}

/// Outcome of trying to parse one frame at the start of the parser's buffer.
enum FrameResult {
    /// The buffer does not yet hold enough bytes to decide; wait for more input.
    NeedMore,
    /// No valid frame starts here; the value is the number of leading bytes to discard.
    Invalid(usize),
    /// A frame whose checksum matched.
    Valid {
        /// Total number of buffer bytes the frame occupies.
        consumed: usize,
        /// Protocol version the frame was parsed as.
        version: MspVersionFilter,
        /// Raw direction byte of the frame (`>`, `<` or `!`).
        direction: u8,
        /// Command identifier carried by the frame.
        command: u16,
        /// Copy of the frame's payload bytes.
        payload: Vec<u8>,
    },
}

/// Decodes the payload of a known MSP command into a `TelemetryUpdate`.
///
/// The returned update has `messages` set to 1 and only the fields carried by
/// `command` populated. Returns `None` when `command` is not one of the handled
/// identifiers or when `payload` is shorter than the minimum for that command.
fn decode_message(command: u16, payload: &[u8]) -> Option<TelemetryUpdate> {
    let mut update = TelemetryUpdate {
        messages: 1,
        ..TelemetryUpdate::default()
    };
    match command {
        MSP_RAW_GPS if payload.len() >= 16 => {
            update.gps_fix = Some(payload[0]);
            update.satellites = Some(payload[1]);
            // `coordinate` maps a raw value of 0 to `None`.
            update.latitude_deg = coordinate(le_i32(payload, 2)?);
            update.longitude_deg = coordinate(le_i32(payload, 6)?);
            update.altitude_m = Some(f32::from(le_u16(payload, 10)?));
            update.ground_speed_mps = Some(f32::from(le_u16(payload, 12)?) / 100.0);
            update.heading_deg = Some(f32::from(le_u16(payload, 14)?) / 10.0);
        }
        MSP_COMP_GPS if payload.len() >= 4 => {
            update.home_distance_m = Some(f32::from(le_u16(payload, 0)?));
        }
        MSP_ATTITUDE if payload.len() >= 6 => {
            // Roll and pitch are scaled by 1/10; yaw is used as-is.
            update.roll_deg = Some(f32::from(le_i16(payload, 0)?) / 10.0);
            update.pitch_deg = Some(f32::from(le_i16(payload, 2)?) / 10.0);
            update.yaw_deg = Some(f32::from(le_i16(payload, 4)?));
        }
        MSP_ALTITUDE if payload.len() >= 6 => {
            update.relative_altitude_m = Some(le_i32(payload, 0)? as f32 / 100.0);
            update.vertical_speed_mps = Some(f32::from(le_i16(payload, 4)?) / 100.0);
        }
        MSP_ANALOG if payload.len() >= 7 => {
            update.battery_voltage_v = Some(f32::from(payload[0]) / 10.0);
            update.battery_consumed_mah = Some(u32::from(le_u16(payload, 1)?));
            // Raw value is rescaled from a 0..=1023 range to a percentage, capped at 100.
            update.rc_link_quality_pct =
                Some(((u32::from(le_u16(payload, 3)?) * 100) / 1_023).min(100) as u8);
            update.battery_current_a = Some(f32::from(le_i16(payload, 5)?) / 100.0);
            // Longer payloads carry a higher-resolution voltage; prefer it when non-zero.
            if payload.len() >= 9 {
                let precise_voltage = le_u16(payload, 7)?;
                if precise_voltage > 0 {
                    update.battery_voltage_v = Some(f32::from(precise_voltage) / 100.0);
                }
            }
        }
        MSP_BATTERY_STATE if payload.len() >= 8 => {
            update.battery_voltage_v = Some(f32::from(payload[3]) / 10.0);
            update.battery_consumed_mah = Some(u32::from(le_u16(payload, 4)?));
            // Current is a signed quantity, read the same way as in MSP_ANALOG; an
            // unsigned read would turn a small negative current into a huge positive one.
            update.battery_current_a = Some(f32::from(le_i16(payload, 6)?) / 100.0);
            // Longer payloads carry a higher-resolution voltage; prefer it when non-zero.
            if payload.len() >= 11 {
                let precise_voltage = le_u16(payload, 9)?;
                if precise_voltage > 0 {
                    update.battery_voltage_v = Some(f32::from(precise_voltage) / 100.0);
                }
            }
        }
        _ => return None,
    }
    Some(update)
}

/// Scales a raw integer coordinate by 1/10,000,000 for use as a latitude or
/// longitude in degrees.
///
/// A raw value of exactly `0` is treated as "no coordinate" and yields `None`.
fn coordinate(value: i32) -> Option<f64> {
    if value == 0 {
        return None;
    }
    let degrees = f64::from(value) / 10_000_000.0;
    Some(degrees)
}

/// Reads a little-endian `u16` from `bytes` starting at `offset`.
///
/// Returns `None` when fewer than two bytes are available at `offset`, including the
/// case where `offset + 2` would overflow `usize` (which previously panicked in
/// builds with overflow checks enabled).
fn le_u16(bytes: &[u8], offset: usize) -> Option<u16> {
    let end = offset.checked_add(2)?;
    match *bytes.get(offset..end)? {
        [lo, hi] => Some(u16::from_le_bytes([lo, hi])),
        _ => None,
    }
}

/// Reads a little-endian `i16` from `bytes` starting at `offset`.
///
/// Returns `None` when fewer than two bytes are available at `offset`, including the
/// case where `offset + 2` would overflow `usize` (which previously panicked in
/// builds with overflow checks enabled).
fn le_i16(bytes: &[u8], offset: usize) -> Option<i16> {
    let end = offset.checked_add(2)?;
    match *bytes.get(offset..end)? {
        [lo, hi] => Some(i16::from_le_bytes([lo, hi])),
        _ => None,
    }
}

/// Reads a little-endian `i32` from `bytes` starting at `offset`.
///
/// Returns `None` when fewer than four bytes are available at `offset`, including the
/// case where `offset + 4` would overflow `usize` (which previously panicked in
/// builds with overflow checks enabled).
fn le_i32(bytes: &[u8], offset: usize) -> Option<i32> {
    let end = offset.checked_add(4)?;
    match *bytes.get(offset..end)? {
        [b0, b1, b2, b3] => Some(i32::from_le_bytes([b0, b1, b2, b3])),
        _ => None,
    }
}

#[cfg(test)]
mod tests {
    use super::{MspDirectionFilter, MspVersionFilter, Parser, MSP_ALTITUDE, MSP_ANALOG};

    /// Parser that lets frames of every version and direction through.
    fn parser() -> Parser {
        Parser::new(MspVersionFilter::Any, MspDirectionFilter::Any)
    }

    /// Builds an MSP v1 frame with direction `>` carrying `command` and `payload`,
    /// terminated by the XOR checksum over length, command and payload.
    fn v1_frame(command: u8, payload: &[u8]) -> Vec<u8> {
        let header = [b'$', b'M', b'>', payload.len() as u8, command];
        let mut frame = Vec::with_capacity(header.len() + payload.len() + 1);
        frame.extend_from_slice(&header);
        frame.extend_from_slice(payload);
        let xor = frame[3..].iter().fold(0u8, |acc, byte| acc ^ byte);
        frame.push(xor);
        frame
    }

    #[test]
    fn decodes_msp_analog_and_altitude() {
        let analog_payload: [u8; 7] = [168, 0x7b, 0x00, 0xff, 0x03, 0x2e, 0x09];
        let altitude_payload = [
            &12_345i32.to_le_bytes()[..],
            &(-125i16).to_le_bytes()[..],
        ]
        .concat();
        // Both frames arrive back to back in a single push.
        let stream = [
            v1_frame(MSP_ANALOG as u8, &analog_payload),
            v1_frame(MSP_ALTITUDE as u8, &altitude_payload),
        ]
        .concat();

        let update = parser().push(&stream);
        assert_eq!(update.messages, 2);
        assert_eq!(update.battery_voltage_v, Some(16.8));
        assert_eq!(update.battery_current_a, Some(23.5));
        assert_eq!(update.relative_altitude_m, Some(123.45));
        assert_eq!(update.vertical_speed_mps, Some(-1.25));
    }

    #[test]
    fn filters_version_and_direction_after_checksum_validation() {
        // A well-formed v1 frame fed to a parser that only accepts v2.
        let mut v2_only = Parser::new(
            MspVersionFilter::V2,
            MspDirectionFilter::FromFlightController,
        );
        let v1_bytes = v1_frame(MSP_ANALOG as u8, &[168, 0, 0, 0, 0, 0, 0]);

        let update = v2_only.push(&v1_bytes);
        assert_eq!(update.messages, 0);
        assert_eq!(update.counters.filtered_frames, 1);
    }
}