use mavlink::async_peek_reader::AsyncPeekReader;
use mavlink::common::MavMessage;
use mavlink::{ReadVersion, read_versioned_msg_async};
use tokio::io::AsyncRead;
#[derive(Debug)]
pub struct TlogEntry {
pub timestamp_usec: u64,
pub message: MavMessage,
}
#[derive(Debug, thiserror::Error)]
pub enum TlogError {
#[error("I/O error: {0}")]
Io(#[from] std::io::Error),
#[error("MAVLink parse error: {0}")]
Parse(#[from] mavlink::error::MessageReadError),
}
const MAX_RECOVERY_ATTEMPTS: usize = 256;
pub struct TlogReader<R: AsyncRead + Unpin> {
reader: AsyncPeekReader<R>,
}
impl<R: AsyncRead + Unpin> TlogReader<R> {
pub fn new(reader: R) -> Self {
Self {
reader: AsyncPeekReader::new(reader),
}
}
pub async fn next(&mut self) -> Result<Option<TlogEntry>, TlogError> {
let ts_bytes = match self.reader.read_exact(8).await {
Ok(bytes) => bytes.try_into().unwrap(),
Err(mavlink::error::MessageReadError::Io(ref e))
if e.kind() == std::io::ErrorKind::UnexpectedEof =>
{
return Ok(None);
}
Err(e) => return Err(TlogError::Parse(e)),
};
let timestamp_usec = u64::from_le_bytes(ts_bytes);
let mut attempts = 0;
loop {
match read_versioned_msg_async::<MavMessage, _>(&mut self.reader, ReadVersion::Any)
.await
{
Ok((_header, message)) => {
return Ok(Some(TlogEntry {
timestamp_usec,
message,
}));
}
Err(mavlink::error::MessageReadError::Io(ref e))
if e.kind() == std::io::ErrorKind::UnexpectedEof =>
{
return Ok(None);
}
Err(mavlink::error::MessageReadError::Parse(_)) => {
attempts += 1;
if attempts >= MAX_RECOVERY_ATTEMPTS {
return Ok(None);
}
continue;
}
Err(e) => return Err(TlogError::Parse(e)),
}
}
}
pub async fn collect(mut self) -> Result<Vec<TlogEntry>, TlogError> {
let mut entries = Vec::new();
while let Some(entry) = self.next().await? {
entries.push(entry);
}
Ok(entries)
}
}
#[cfg(test)]
mod tests {
use super::*;
use mavlink::common::MavMessage;
use mavlink::{MavHeader, MavlinkVersion};
fn build_tlog(entries: &[(u64, MavMessage)]) -> Vec<u8> {
let mut buf = Vec::new();
let mut seq = 0u8;
for (ts, msg) in entries {
buf.extend_from_slice(&ts.to_le_bytes());
let header = MavHeader {
sequence: seq,
system_id: 1,
component_id: 1,
};
mavlink::write_versioned_msg(&mut buf, MavlinkVersion::V2, header, msg).unwrap();
seq = seq.wrapping_add(1);
}
buf
}
fn heartbeat() -> MavMessage {
MavMessage::HEARTBEAT(mavlink::common::HEARTBEAT_DATA {
custom_mode: 0,
mavtype: mavlink::common::MavType::MAV_TYPE_QUADROTOR,
autopilot: mavlink::common::MavAutopilot::MAV_AUTOPILOT_ARDUPILOTMEGA,
base_mode: mavlink::common::MavModeFlag::empty(),
system_status: mavlink::common::MavState::MAV_STATE_STANDBY,
mavlink_version: 3,
})
}
#[tokio::test]
async fn basic_parsing() {
let data = build_tlog(&[
(1000, heartbeat()),
(2000, heartbeat()),
(3000, heartbeat()),
]);
let reader = TlogReader::new(data.as_slice());
let entries = reader.collect().await.unwrap();
assert_eq!(entries.len(), 3);
assert_eq!(entries[0].timestamp_usec, 1000);
assert_eq!(entries[1].timestamp_usec, 2000);
assert_eq!(entries[2].timestamp_usec, 3000);
for entry in &entries {
assert!(matches!(entry.message, MavMessage::HEARTBEAT(_)));
}
}
#[tokio::test]
async fn error_recovery() {
let entry1 = build_tlog(&[(1000, heartbeat())]);
let entry2 = build_tlog(&[(2000, heartbeat())]);
let mut data = Vec::new();
data.extend_from_slice(&entry1);
data.extend_from_slice(&[0xFF, 0x00, 0x42, 0x13, 0x37, 0xDE, 0xAD, 0xBE]);
data.extend_from_slice(&entry2);
let reader = TlogReader::new(data.as_slice());
let entries = reader.collect().await.unwrap();
assert!(!entries.is_empty());
assert_eq!(entries[0].timestamp_usec, 1000);
}
#[tokio::test]
async fn empty_input() {
let reader = TlogReader::new(&b""[..]);
let entries = reader.collect().await.unwrap();
assert!(entries.is_empty());
}
}