use bytes::{Buf, Bytes, BytesMut};
use mavlink::{MavHeader, MavlinkVersion};
use memchr::memchr2; use std::io::Cursor;
use tracing::warn;
const MAX_BUFFER_SIZE: usize = 1024 * 1024;
pub struct MavlinkFrame {
pub header: MavHeader,
pub message: mavlink::common::MavMessage,
pub version: MavlinkVersion,
pub raw_bytes: Bytes,
}
pub struct StreamParser {
buffer: BytesMut,
}
impl Default for StreamParser {
fn default() -> Self {
Self::new()
}
}
impl StreamParser {
pub fn new() -> Self {
Self {
buffer: BytesMut::with_capacity(4096),
}
}
pub fn push(&mut self, data: &[u8]) {
let new_len = self.buffer.len() + data.len();
if new_len > MAX_BUFFER_SIZE {
let overflow = new_len - MAX_BUFFER_SIZE;
warn!(
"StreamParser buffer full, dropping {} oldest bytes to make room",
overflow
);
if overflow <= self.buffer.len() {
self.buffer.advance(overflow);
} else {
self.buffer.clear();
}
}
self.buffer.extend_from_slice(data);
}
pub fn parse_next(&mut self) -> Option<MavlinkFrame> {
loop {
if self.buffer.is_empty() {
return None;
}
let start_idx = memchr2(0xFD, 0xFE, &self.buffer);
if let Some(idx) = start_idx {
if idx > 0 {
self.buffer.advance(idx);
}
} else {
self.buffer.clear();
return None;
}
let mut cursor = Cursor::new(&self.buffer[..]);
let start_pos = cursor.position();
let res_v2 = mavlink::read_v2_msg::<mavlink::common::MavMessage, _>(&mut cursor);
match res_v2 {
Ok((header, message)) => {
let len = cursor.position() as usize;
let raw_bytes = self.buffer.split_to(len).freeze();
return Some(MavlinkFrame {
header,
message,
version: MavlinkVersion::V2,
raw_bytes,
});
}
Err(e) => {
cursor.set_position(start_pos);
let res_v1 =
mavlink::read_v1_msg::<mavlink::common::MavMessage, _>(&mut cursor);
match res_v1 {
Ok((header, message)) => {
let len = cursor.position() as usize;
let raw_bytes = self.buffer.split_to(len).freeze();
return Some(MavlinkFrame {
header,
message,
version: MavlinkVersion::V1,
raw_bytes,
});
}
Err(e_v1) => {
if is_eof(&e) || is_eof(&e_v1) {
return None;
}
self.buffer.advance(1);
continue;
}
}
}
}
}
}
}
fn is_eof(e: &mavlink::error::MessageReadError) -> bool {
match e {
mavlink::error::MessageReadError::Io(io_err) => {
io_err.kind() == std::io::ErrorKind::UnexpectedEof
}
_ => false,
}
}
#[cfg(test)]
#[allow(clippy::expect_used)]
mod tests {
use super::*;
use mavlink::common::MavMessage;
use mavlink::Message;
#[test]
fn test_partial_packet() {
let mut parser = StreamParser::new();
let header = MavHeader::default();
let msg = MavMessage::HEARTBEAT(mavlink::common::HEARTBEAT_DATA::default());
let mut buf = Vec::new();
mavlink::write_v2_msg(&mut buf, header, &msg).expect("Failed to write test message");
let split_idx = buf.len() / 2;
parser.push(&buf[..split_idx]);
assert!(parser.parse_next().is_none());
parser.push(&buf[split_idx..]);
let res = parser.parse_next();
assert!(res.is_some());
assert_eq!(
res.expect("Should have parsed packet").message.message_id(),
0
);
}
#[test]
fn test_v1_packet_parsing() {
let mut parser = StreamParser::new();
let header = MavHeader {
system_id: 1,
component_id: 1,
sequence: 0,
};
let msg = MavMessage::HEARTBEAT(mavlink::common::HEARTBEAT_DATA::default());
let mut buf = Vec::new();
mavlink::write_v1_msg(&mut buf, header, &msg).expect("Failed to write V1 message");
parser.push(&buf);
let res = parser.parse_next();
assert!(res.is_some());
let frame = res.expect("Should parse V1 packet");
assert_eq!(frame.version, MavlinkVersion::V1);
assert_eq!(frame.header.system_id, 1);
}
#[test]
fn test_v2_packet_parsing() {
let mut parser = StreamParser::new();
let header = MavHeader {
system_id: 255,
component_id: 190,
sequence: 42,
};
let msg = MavMessage::HEARTBEAT(mavlink::common::HEARTBEAT_DATA::default());
let mut buf = Vec::new();
mavlink::write_v2_msg(&mut buf, header, &msg).expect("Failed to write V2 message");
parser.push(&buf);
let res = parser.parse_next();
assert!(res.is_some());
let frame = res.expect("Should parse V2 packet");
assert_eq!(frame.version, MavlinkVersion::V2);
assert_eq!(frame.header.system_id, 255);
assert_eq!(frame.header.component_id, 190);
}
#[test]
fn test_garbage_before_packet() {
let mut parser = StreamParser::new();
let header = MavHeader::default();
let msg = MavMessage::HEARTBEAT(mavlink::common::HEARTBEAT_DATA::default());
let mut buf = Vec::new();
mavlink::write_v2_msg(&mut buf, header, &msg).expect("Failed to write message");
let mut garbage = vec![0x00, 0x11, 0x22, 0x33, 0x44];
garbage.extend_from_slice(&buf);
parser.push(&garbage);
let res = parser.parse_next();
assert!(res.is_some(), "Should skip garbage and find packet");
}
#[test]
fn test_empty_buffer_returns_none() {
let mut parser = StreamParser::new();
assert!(parser.parse_next().is_none());
}
#[test]
fn test_no_stx_clears_buffer() {
let mut parser = StreamParser::new();
parser.push(&[0x00, 0x11, 0x22, 0x33]);
assert!(parser.parse_next().is_none());
}
#[test]
fn test_multiple_packets_in_sequence() {
let mut parser = StreamParser::new();
let header = MavHeader::default();
let msg = MavMessage::HEARTBEAT(mavlink::common::HEARTBEAT_DATA::default());
let mut buf = Vec::new();
mavlink::write_v2_msg(&mut buf, header, &msg).expect("write msg 1");
mavlink::write_v2_msg(&mut buf, header, &msg).expect("write msg 2");
mavlink::write_v2_msg(&mut buf, header, &msg).expect("write msg 3");
parser.push(&buf);
assert!(parser.parse_next().is_some());
assert!(parser.parse_next().is_some());
assert!(parser.parse_next().is_some());
assert!(parser.parse_next().is_none());
}
#[test]
fn test_default_trait() {
let parser = StreamParser::default();
assert!(parser.buffer.is_empty());
}
}