use std::path::{Path, PathBuf};
use tokio::fs::File;
use tokio::io::{AsyncReadExt, AsyncSeekExt, BufReader, SeekFrom};
use super::reader::{TlogError, TlogReader};
pub struct TlogFile {
path: PathBuf,
}
impl TlogFile {
pub async fn open(path: impl AsRef<Path>) -> Result<Self, TlogError> {
let path = path.as_ref().to_path_buf();
let _ = File::open(&path).await?;
Ok(Self { path })
}
pub async fn entries(&self) -> Result<TlogReader<BufReader<File>>, TlogError> {
let file = File::open(&self.path).await?;
Ok(TlogReader::new(BufReader::new(file)))
}
pub async fn seek_to_timestamp(
&self,
target_usec: u64,
) -> Result<TlogReader<BufReader<File>>, TlogError> {
let file = File::open(&self.path).await?;
let mut buf_reader = BufReader::new(file);
let file_len = buf_reader.seek(SeekFrom::End(0)).await?;
const MIN_ENTRY_SIZE: u64 = 20;
const LINEAR_THRESHOLD: u64 = 4096;
if file_len < MIN_ENTRY_SIZE {
buf_reader.seek(SeekFrom::Start(0)).await?;
return Ok(TlogReader::new(buf_reader));
}
let mut lo: u64 = 0;
let mut hi: u64 = file_len;
while hi - lo > LINEAR_THRESHOLD {
let mid = lo + (hi - lo) / 2;
match Self::find_entry_at_or_after(&mut buf_reader, mid, hi).await? {
Some((entry_pos, ts)) => {
if ts < target_usec {
lo = entry_pos + MIN_ENTRY_SIZE;
} else {
hi = entry_pos;
}
}
None => {
hi = mid;
}
}
}
buf_reader.seek(SeekFrom::Start(lo)).await?;
Self::linear_seek(buf_reader, target_usec).await
}
async fn find_entry_at_or_after(
reader: &mut BufReader<File>,
start: u64,
limit: u64,
) -> Result<Option<(u64, u64)>, TlogError> {
const CHUNK_SIZE: usize = 4096;
let mut pos = start;
while pos < limit {
reader.seek(SeekFrom::Start(pos)).await?;
let to_read = CHUNK_SIZE.min((limit - pos) as usize);
let mut buf = vec![0u8; to_read];
let n = reader.read(&mut buf).await?;
if n == 0 {
return Ok(None);
}
buf.truncate(n);
for (i, &byte) in buf.iter().enumerate() {
if byte == 0xFD || byte == 0xFE {
let magic_file_pos = pos + i as u64;
if magic_file_pos < 8 {
continue;
}
let ts_pos = magic_file_pos - 8;
reader.seek(SeekFrom::Start(ts_pos)).await?;
let mut ts_buf = [0u8; 8];
match reader.read_exact(&mut ts_buf).await {
Ok(_) => {}
Err(e) if e.kind() == std::io::ErrorKind::UnexpectedEof => continue,
Err(e) => return Err(TlogError::Io(e)),
}
let ts = u64::from_le_bytes(ts_buf);
reader.seek(SeekFrom::Start(magic_file_pos)).await?;
let mut peek = mavlink::async_peek_reader::AsyncPeekReader::new(&mut *reader);
match mavlink::read_versioned_msg_async::<mavlink::common::MavMessage, _>(
&mut peek,
mavlink::ReadVersion::Any,
)
.await
{
Ok(_) => return Ok(Some((ts_pos, ts))),
Err(mavlink::error::MessageReadError::Io(ref e))
if e.kind() == std::io::ErrorKind::UnexpectedEof =>
{
return Ok(None);
}
Err(_) => continue, }
}
}
pos += n as u64;
}
Ok(None)
}
async fn linear_seek(
mut buf_reader: BufReader<File>,
target_usec: u64,
) -> Result<TlogReader<BufReader<File>>, TlogError> {
loop {
let pos = buf_reader.stream_position().await?;
let mut ts_buf = [0u8; 8];
match buf_reader.read_exact(&mut ts_buf).await {
Ok(_) => {}
Err(e) if e.kind() == std::io::ErrorKind::UnexpectedEof => {
return Ok(TlogReader::new(buf_reader));
}
Err(e) => return Err(TlogError::Io(e)),
}
let ts = u64::from_le_bytes(ts_buf);
if ts >= target_usec {
buf_reader.seek(SeekFrom::Start(pos)).await?;
return Ok(TlogReader::new(buf_reader));
}
use mavlink::async_peek_reader::AsyncPeekReader;
use mavlink::{ReadVersion, read_versioned_msg_async};
let mut peek = AsyncPeekReader::new(&mut buf_reader);
match read_versioned_msg_async::<mavlink::common::MavMessage, _>(
&mut peek,
ReadVersion::Any,
)
.await
{
Ok(_) => continue,
Err(mavlink::error::MessageReadError::Io(ref e))
if e.kind() == std::io::ErrorKind::UnexpectedEof =>
{
buf_reader.seek(SeekFrom::Start(pos)).await?;
return Ok(TlogReader::new(buf_reader));
}
Err(e) => return Err(TlogError::Parse(e)),
}
}
}
pub async fn time_range(&self) -> Result<Option<(u64, u64)>, TlogError> {
let file = File::open(&self.path).await?;
let mut buf_reader = BufReader::new(file);
let mut ts_buf = [0u8; 8];
match buf_reader.read_exact(&mut ts_buf).await {
Ok(_) => {}
Err(e) if e.kind() == std::io::ErrorKind::UnexpectedEof => return Ok(None),
Err(e) => return Err(TlogError::Io(e)),
}
let first_ts = u64::from_le_bytes(ts_buf);
let file_len = buf_reader.seek(SeekFrom::End(0)).await?;
const SCAN_SIZE: u64 = 8192;
let mut scan_start = file_len.saturating_sub(SCAN_SIZE);
let entry_pos = loop {
match Self::find_entry_at_or_after(&mut buf_reader, scan_start, file_len).await? {
Some((pos, _ts)) => break pos,
None => {
if scan_start == 0 {
return Ok(Some((first_ts, first_ts)));
}
scan_start = scan_start.saturating_sub(SCAN_SIZE);
}
}
};
buf_reader.seek(SeekFrom::Start(entry_pos)).await?;
let mut reader = TlogReader::new(buf_reader);
let mut last_ts = first_ts;
while let Some(entry) = reader.next().await? {
last_ts = entry.timestamp_usec;
}
Ok(Some((first_ts, last_ts)))
}
}
#[cfg(test)]
mod tests {
use super::*;
use mavlink::common::MavMessage;
use mavlink::{MavHeader, MavlinkVersion};
fn build_tlog(entries: &[(u64, MavMessage)]) -> Vec<u8> {
let mut buf = Vec::new();
let mut seq = 0u8;
for (ts, msg) in entries {
buf.extend_from_slice(&ts.to_le_bytes());
let header = MavHeader {
sequence: seq,
system_id: 1,
component_id: 1,
};
mavlink::write_versioned_msg(&mut buf, MavlinkVersion::V2, header, msg).unwrap();
seq = seq.wrapping_add(1);
}
buf
}
fn heartbeat() -> MavMessage {
MavMessage::HEARTBEAT(mavlink::common::HEARTBEAT_DATA {
custom_mode: 0,
mavtype: mavlink::common::MavType::MAV_TYPE_QUADROTOR,
autopilot: mavlink::common::MavAutopilot::MAV_AUTOPILOT_ARDUPILOTMEGA,
base_mode: mavlink::common::MavModeFlag::empty(),
system_status: mavlink::common::MavState::MAV_STATE_STANDBY,
mavlink_version: 3,
})
}
async fn write_temp_tlog(entries: &[(u64, MavMessage)]) -> tempfile::NamedTempFile {
let data = build_tlog(entries);
let mut tmp = tempfile::NamedTempFile::new().unwrap();
std::io::Write::write_all(&mut tmp, &data).unwrap();
tmp
}
#[tokio::test]
async fn seek_to_timestamp() {
let tmp = write_temp_tlog(&[
(1000, heartbeat()),
(2000, heartbeat()),
(3000, heartbeat()),
])
.await;
let tlog = TlogFile::open(tmp.path()).await.unwrap();
let mut reader = tlog.seek_to_timestamp(2000).await.unwrap();
let entry = reader.next().await.unwrap().unwrap();
assert_eq!(entry.timestamp_usec, 2000);
let entry = reader.next().await.unwrap().unwrap();
assert_eq!(entry.timestamp_usec, 3000);
assert!(reader.next().await.unwrap().is_none());
}
#[tokio::test]
async fn time_range() {
let tmp = write_temp_tlog(&[
(1000, heartbeat()),
(2000, heartbeat()),
(3000, heartbeat()),
])
.await;
let tlog = TlogFile::open(tmp.path()).await.unwrap();
let range = tlog.time_range().await.unwrap();
assert_eq!(range, Some((1000, 3000)));
}
#[tokio::test]
async fn time_range_empty() {
let tmp = tempfile::NamedTempFile::new().unwrap();
let tlog = TlogFile::open(tmp.path()).await.unwrap();
let range = tlog.time_range().await.unwrap();
assert_eq!(range, None);
}
#[tokio::test]
async fn seek_to_first_entry() {
let tmp = write_temp_tlog(&[
(1000, heartbeat()),
(2000, heartbeat()),
(3000, heartbeat()),
])
.await;
let tlog = TlogFile::open(tmp.path()).await.unwrap();
let mut reader = tlog.seek_to_timestamp(1000).await.unwrap();
let entry = reader.next().await.unwrap().unwrap();
assert_eq!(entry.timestamp_usec, 1000);
}
#[tokio::test]
async fn seek_before_all_entries() {
let tmp = write_temp_tlog(&[(1000, heartbeat()), (2000, heartbeat())]).await;
let tlog = TlogFile::open(tmp.path()).await.unwrap();
let mut reader = tlog.seek_to_timestamp(500).await.unwrap();
let entry = reader.next().await.unwrap().unwrap();
assert_eq!(entry.timestamp_usec, 1000);
}
#[tokio::test]
async fn seek_past_all_entries() {
let tmp = write_temp_tlog(&[(1000, heartbeat()), (2000, heartbeat())]).await;
let tlog = TlogFile::open(tmp.path()).await.unwrap();
let mut reader = tlog.seek_to_timestamp(9999).await.unwrap();
assert!(reader.next().await.unwrap().is_none());
}
#[tokio::test]
async fn seek_single_entry() {
let tmp = write_temp_tlog(&[(5000, heartbeat())]).await;
let tlog = TlogFile::open(tmp.path()).await.unwrap();
let mut reader = tlog.seek_to_timestamp(5000).await.unwrap();
let entry = reader.next().await.unwrap().unwrap();
assert_eq!(entry.timestamp_usec, 5000);
assert!(reader.next().await.unwrap().is_none());
}
#[tokio::test]
async fn seek_many_entries() {
let entries: Vec<(u64, MavMessage)> =
(0..200).map(|i| (1000 + i * 100, heartbeat())).collect();
let tmp = write_temp_tlog(&entries).await;
let tlog = TlogFile::open(tmp.path()).await.unwrap();
let mut reader = tlog.seek_to_timestamp(10_000).await.unwrap();
let entry = reader.next().await.unwrap().unwrap();
assert_eq!(entry.timestamp_usec, 10_000);
let mut reader = tlog.seek_to_timestamp(10_050).await.unwrap();
let entry = reader.next().await.unwrap().unwrap();
assert_eq!(entry.timestamp_usec, 10_100);
}
#[tokio::test]
async fn seek_empty_file() {
let tmp = tempfile::NamedTempFile::new().unwrap();
let tlog = TlogFile::open(tmp.path()).await.unwrap();
let mut reader = tlog.seek_to_timestamp(1000).await.unwrap();
assert!(reader.next().await.unwrap().is_none());
}
#[tokio::test]
async fn time_range_many_entries() {
let entries: Vec<(u64, MavMessage)> =
(0..500).map(|i| (1000 + i * 100, heartbeat())).collect();
let tmp = write_temp_tlog(&entries).await;
let tlog = TlogFile::open(tmp.path()).await.unwrap();
let range = tlog.time_range().await.unwrap();
assert_eq!(range, Some((1000, 1000 + 499 * 100)));
}
#[tokio::test]
async fn time_range_single_entry() {
let tmp = write_temp_tlog(&[(42_000, heartbeat())]).await;
let tlog = TlogFile::open(tmp.path()).await.unwrap();
let range = tlog.time_range().await.unwrap();
assert_eq!(range, Some((42_000, 42_000)));
}
}