use crate::{
dlt::{HEADER_MIN_LENGTH, STORAGE_HEADER_LENGTH},
filtering::ProcessedDltFilterConfig,
parse::{dlt_message, parse_length, DltParseError, ParsedMessage, DLT_PATTERN},
};
use std::io::{BufReader, Read};
pub(crate) const DEFAULT_BUFFER_CAPACITY: usize = 10 * 1024 * 1024;
pub(crate) const DEFAULT_MESSAGE_MAX_LEN: usize =
STORAGE_HEADER_LENGTH as usize + u16::MAX as usize;
pub fn read_message<S: Read>(
reader: &mut DltMessageReader<S>,
filter_config_opt: Option<&ProcessedDltFilterConfig>,
) -> Result<Option<ParsedMessage>, DltParseError> {
let with_storage_header = reader.with_storage_header();
let slice = reader.next_message_slice()?;
if !slice.is_empty() {
Ok(Some(
dlt_message(slice, filter_config_opt, with_storage_header)?.1,
))
} else {
Ok(None)
}
}
pub struct DltMessageReader<S: Read> {
source: BufReader<S>,
with_storage_header: bool,
buffer: Vec<u8>,
}
impl<S: Read> DltMessageReader<S> {
pub fn new(source: S, with_storage_header: bool) -> Self {
DltMessageReader::with_capacity(
DEFAULT_BUFFER_CAPACITY,
DEFAULT_MESSAGE_MAX_LEN,
source,
with_storage_header,
)
}
pub fn with_capacity(
buffer_capacity: usize,
message_max_len: usize,
source: S,
with_storage_header: bool,
) -> Self {
assert!(buffer_capacity >= message_max_len);
DltMessageReader {
source: BufReader::with_capacity(buffer_capacity, source),
with_storage_header,
buffer: vec![0u8; message_max_len],
}
}
pub fn next_message_slice(&mut self) -> Result<&[u8], DltParseError> {
loop {
let storage_len = if self.with_storage_header {
let storage_len = STORAGE_HEADER_LENGTH as usize;
loop {
if self
.source
.read_exact(&mut self.buffer[..storage_len])
.is_err()
{
return Ok(&[]);
}
if &self.buffer[..DLT_PATTERN.len()] == DLT_PATTERN {
break;
}
}
storage_len
} else {
0
};
let header_len = storage_len + HEADER_MIN_LENGTH as usize;
if self
.source
.read_exact(&mut self.buffer[storage_len..header_len])
.is_err()
{
return Ok(&[]);
}
let (_, message_len) = parse_length(&self.buffer[storage_len..header_len])?;
let total_len = storage_len + message_len as usize;
if total_len < header_len {
continue;
}
self.source
.read_exact(&mut self.buffer[header_len..total_len])?;
return Ok(&self.buffer[..total_len]);
}
}
pub fn with_storage_header(&self) -> bool {
self.with_storage_header
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::{
dlt::Message,
proptest_strategies::{messages_strat, messages_with_storage_header_strat},
tests::{DLT_MESSAGE, DLT_MESSAGE_WITH_STORAGE_HEADER},
};
use proptest::prelude::*;
#[test]
fn test_message_reader() {
let messages_with_storage = [
(DLT_MESSAGE, false),
(DLT_MESSAGE_WITH_STORAGE_HEADER, true),
];
for message_with_storage in &messages_with_storage {
let bytes = message_with_storage.0;
let with_storage_header = message_with_storage.1;
let mut reader = DltMessageReader::new(bytes, with_storage_header);
assert_eq!(with_storage_header, reader.with_storage_header());
let slice = reader.next_message_slice().expect("message");
assert_eq!(bytes, slice);
assert!(reader.next_message_slice().expect("message").is_empty());
}
}
#[test]
fn test_read_message() {
let messages_with_storage = [
(DLT_MESSAGE, false),
(DLT_MESSAGE_WITH_STORAGE_HEADER, true),
];
for message_with_storage in &messages_with_storage {
let bytes = message_with_storage.0;
let with_storage_header = message_with_storage.1;
let mut reader = DltMessageReader::new(bytes, with_storage_header);
if let Some(ParsedMessage::Item(message)) =
read_message(&mut reader, None).expect("message")
{
assert_eq!(bytes, message.as_bytes());
}
assert_eq!(None, read_message(&mut reader, None).expect("message"));
}
}
#[test]
fn test_read_message_robustness() {
#[rustfmt::skip]
let bytes = [
[
0xFF, 0x4C, 0x54, 0x01, 0x2B, 0x2C, 0xC9, 0x4D,
0x7A, 0xE8, 0x01, 0x00, 0x45, 0x43, 0x55, 0x00,
]
.to_vec(),
[
0x44, 0x4C, 0x54, 0x01, 0x2B, 0x2C, 0xC9, 0x4D,
0x7A, 0xE8, 0x01, 0x00, 0x45, 0x43, 0x55, 0x00,
0x21, 0x0A, 0x00, 0x00,
]
.to_vec(),
DLT_MESSAGE_WITH_STORAGE_HEADER.to_vec(),
]
.concat();
let mut reader = DltMessageReader::new(bytes.as_slice(), true);
assert!(read_message(&mut reader, None).expect("message").is_some());
assert!(read_message(&mut reader, None).expect("message").is_none());
}
proptest! {
#[test]
fn test_read_messages_proptest(messages in messages_strat(10)) {
test_read_messages(messages, false);
}
#[test]
fn test_read_messages_with_storage_header_proptest(messages in messages_with_storage_header_strat(10)) {
test_read_messages(messages, true);
}
}
fn test_read_messages(messages: Vec<Message>, with_storage_header: bool) {
let mut bytes = vec![];
for message in &messages {
bytes.extend(message.as_bytes());
}
let mut reader = DltMessageReader::new(bytes.as_slice(), with_storage_header);
let mut parsed = 0usize;
loop {
match read_message(&mut reader, None).expect("read") {
Some(ParsedMessage::Item(message)) => {
assert_eq!(messages.get(parsed).unwrap().as_bytes(), message.as_bytes());
parsed += 1;
}
None => {
break;
}
_ => {
panic!("unexpected item");
}
};
}
assert_eq!(messages.len(), parsed);
}
}