use crate::dlt::{DltArg, DltMessage, DltMessageIndexType, DLT_TYPE_INFO_RAWD, DLT_TYPE_INFO_STRG};
use std::{
io::BufRead,
sync::mpsc::{Receiver, Sender},
};
mod lowmarkbufreader;
pub use self::lowmarkbufreader::LowMarkBufReader;
mod asc2dltmsgiterator;
use self::asc2dltmsgiterator::Asc2DltMsgIterator;
mod dltmessageiterator;
pub use self::dltmessageiterator::{get_first_message_from_file, DltMessageIterator};
pub mod eac_stats;
pub mod remote_types;
pub fn get_dlt_message_iterator<'a, R: 'a + BufRead>(
file_ext: &str,
start_index: DltMessageIndexType,
reader: R,
log: Option<&'a slog::Logger>,
) -> Box<dyn Iterator<Item = DltMessage> + 'a> {
match file_ext.to_lowercase().as_str() {
"asc" => Box::new(Asc2DltMsgIterator::new(start_index, reader, log)),
_ => Box::new({
let mut it = DltMessageIterator::new(start_index, reader);
it.log = log;
it
}),
}
}
pub const US_PER_SEC: u64 = 1_000_000;
pub fn utc_time_from_us(time_us: u64) -> chrono::NaiveDateTime {
chrono::NaiveDateTime::from_timestamp_opt(
(time_us / US_PER_SEC) as i64,
1_000u32 * (time_us % 1_000_000) as u32,
)
.unwrap_or_else(|| chrono::NaiveDateTime::from_timestamp_opt(0, 0).unwrap())
}
static U8_HEX_LOW: [u8; 16] = [
b'0', b'1', b'2', b'3', b'4', b'5', b'6', b'7', b'8', b'9', b'a', b'b', b'c', b'd', b'e', b'f',
];
static CHARS_HEX_LOW: [char; 16] = [
'0', '1', '2', '3', '4', '5', '6', '7', '8', '9', 'a', 'b', 'c', 'd', 'e', 'f',
];
pub fn buf_as_printable_ascii_to_write(
writer: &mut impl std::fmt::Write,
buf: &[u8],
replacement_char: char,
) -> Result<(), std::fmt::Error> {
for item in buf.iter() {
if *item >= 0x20 && *item <= 0x7e {
writer.write_char(*item as char)?;
} else {
writer.write_char(replacement_char)?;
}
}
Ok(())
}
pub fn buf_as_hex_to_write(
writer: &mut impl std::fmt::Write,
buf: &[u8],
) -> Result<(), std::fmt::Error> {
for (i, item) in buf.iter().enumerate() {
let c1 = CHARS_HEX_LOW[(item >> 4) as usize];
let c2 = CHARS_HEX_LOW[(item & 0x0f) as usize];
if i > 0 {
writer.write_char(' ')?;
}
writer.write_char(c1)?;
writer.write_char(c2)?;
}
Ok(())
}
pub fn buf_as_hex_to_io_write(
writer: &mut impl std::io::Write,
buf: &[u8],
) -> Result<(), std::io::Error> {
for (i, item) in buf.iter().enumerate() {
let c1 = U8_HEX_LOW[(item >> 4) as usize];
let c2 = U8_HEX_LOW[(item & 0x0f) as usize];
if i > 0 {
writer.write_all(&[b' ', c1, c2])?
} else {
writer.write_all(&[c1, c2])?
}
}
Ok(())
}
pub fn hex_to_bytes(s: &str) -> Option<Vec<u8>> {
if s.len() < 2 || (s.len() - 2) % 3 != 0 {
None
} else {
let mut v: Vec<u8> = Vec::with_capacity((s.len() + 1) / 3);
for i in (0..s.len()).step_by(3) {
let s = u8::from_str_radix(&s[i..i + 2], 16);
if let Ok(s) = s {
v.push(s);
} else {
return None;
}
}
Some(v)
}
}
#[macro_export]
macro_rules! to_endian_vec {
($x:expr, $i:expr) => {
if $i {
$x.to_be_bytes().to_vec()
} else {
$x.to_le_bytes().to_vec()
}
};
}
pub enum BufferElementsAmount {
NumberElements(usize),
}
pub struct BufferElementsOptions {
pub amount: BufferElementsAmount,
}
pub fn buffer_elements<T>(inflow: Receiver<T>, outflow: Sender<T>, options: BufferElementsOptions) {
match options.amount {
BufferElementsAmount::NumberElements(number_elems) => {
let mut buffer = std::collections::VecDeque::<T>::with_capacity(number_elems);
for e in inflow {
if buffer.len() == number_elems {
outflow.send(buffer.pop_front().unwrap()).unwrap(); }
buffer.push_back(e);
}
for e in buffer.into_iter() {
outflow.send(e).unwrap();
}
}
}
}
pub fn buffer_sort_elements<T>(
inflow: Receiver<T>,
outflow: Sender<T>,
options: BufferElementsOptions,
) where
T: std::cmp::Ord,
{
match options.amount {
BufferElementsAmount::NumberElements(number_elems) => {
let mut buffer = std::collections::VecDeque::<T>::with_capacity(number_elems);
for e in inflow {
if buffer.len() == number_elems {
outflow.send(buffer.pop_front().unwrap()).unwrap();
}
let idx = buffer.binary_search(&e).unwrap_or_else(|x| x); buffer.insert(idx, e);
}
for e in buffer.into_iter() {
outflow.send(e).unwrap();
}
}
}
}
struct SortedDltMessage {
m: crate::dlt::DltMessage,
calculated_time_us: u64, }
impl std::cmp::PartialEq for SortedDltMessage {
fn eq(&self, other: &Self) -> bool {
self.calculated_time_us == other.calculated_time_us }
}
impl std::cmp::Ord for SortedDltMessage {
fn cmp(&self, other: &Self) -> std::cmp::Ordering {
if self.m.lifecycle == other.m.lifecycle {
if self.m.timestamp_dms == other.m.timestamp_dms {
self.m.index.cmp(&other.m.index) } else {
self.m.timestamp_dms.cmp(&other.m.timestamp_dms)
}
} else if self.calculated_time_us == other.calculated_time_us {
self.m.index.cmp(&other.m.index) } else {
self.calculated_time_us.cmp(&other.calculated_time_us)
}
}
}
impl std::cmp::PartialOrd for SortedDltMessage {
fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
Some(self.cmp(other))
}
}
impl std::cmp::Eq for SortedDltMessage {}
pub fn buffer_sort_messages<M, S>(
inflow: Receiver<crate::dlt::DltMessage>,
outflow: Sender<crate::dlt::DltMessage>,
lcs_r: &evmap::ReadHandle<crate::lifecycle::LifecycleId, crate::lifecycle::LifecycleItem, M, S>,
windows_size_secs: u8,
min_buffer_delay_us: u64,
) -> Result<(), std::sync::mpsc::SendError<crate::dlt::DltMessage>>
where
S: std::hash::BuildHasher + Clone,
M: 'static + Clone,
{
let mut buffer = std::collections::VecDeque::<SortedDltMessage>::new();
let mut lc_map = std::collections::BTreeMap::<crate::lifecycle::LifecycleId, u64>::new();
let mut get_lc_start_time = |ref x: crate::lifecycle::LifecycleId| -> u64 {
match lc_map.get(x) {
Some(t) => *t,
None => {
let start_time = match lcs_r.read() {
Some(map_read_ref) => {
let l = map_read_ref.get_one(x);
match l {
Some(l) => l.start_time,
None => 0,
}
}
None => 0,
};
lc_map.insert(*x, start_time);
start_time
}
}
};
struct MaxBufferDelayEntry {
start_time: u64, max_buffering_delay: u64,
}
let mut max_buffering_delays = std::collections::HashMap::<
crate::dlt::DltChar4,
(
crate::lifecycle::LifecycleId,
std::collections::VecDeque<MaxBufferDelayEntry>,
u64,
),
>::new();
let mut max_buffer_time_us = min_buffer_delay_us;
let mut update_max_buffering_delays =
|max_buffer_time_us: u64,
ecu: &crate::dlt::DltChar4,
lifecycle_id: &crate::lifecycle::LifecycleId,
msg_reception_time_us: u64,
buffering_delay: u64| {
let mut entry = max_buffering_delays.entry(*ecu).or_insert_with(|| {
(
*lifecycle_id,
std::collections::VecDeque::with_capacity(windows_size_secs as usize),
0,
)
});
let mut recalc_max_buffer_time_us = false;
if entry.0 != *lifecycle_id {
entry.1.clear();
entry.0 = *lifecycle_id;
entry.2 = buffering_delay;
recalc_max_buffer_time_us = true;
}
let mut recalc_buffering_delay = false;
let insert_new = entry.1.is_empty()
|| entry.1.back().unwrap().start_time + crate::utils::US_PER_SEC
< msg_reception_time_us;
if insert_new {
if entry.1.len() == windows_size_secs as usize {
if entry.1.front().unwrap().max_buffering_delay == entry.2 {
recalc_buffering_delay = true; }
entry.1.pop_front(); }
entry.1.push_back(MaxBufferDelayEntry {
start_time: msg_reception_time_us,
max_buffering_delay: buffering_delay,
});
if buffering_delay > entry.2 {
recalc_buffering_delay = false;
entry.2 = buffering_delay;
}
recalc_max_buffer_time_us = true; } else {
let last = entry.1.back_mut().unwrap();
if last.max_buffering_delay < buffering_delay {
last.max_buffering_delay = buffering_delay;
if buffering_delay > entry.2 {
recalc_buffering_delay = false;
recalc_max_buffer_time_us = true;
entry.2 = buffering_delay;
}
}
}
if recalc_buffering_delay {
entry.2 = entry
.1
.iter()
.max_by_key(|x| x.max_buffering_delay)
.unwrap()
.max_buffering_delay;
recalc_max_buffer_time_us = true;
}
if recalc_max_buffer_time_us {
let new_max_buffer_time_us = min_buffer_delay_us + {
let x = max_buffering_delays
.iter()
.max_by_key(|x| {
if x.1 .1.front().unwrap().start_time
+ (windows_size_secs - 1) as u64 * crate::utils::US_PER_SEC
> msg_reception_time_us
{
1000 * crate::utils::US_PER_SEC
} else {
x.1 .2
}
})
.unwrap();
if x.1 .1.front().unwrap().start_time
+ (windows_size_secs - 1) as u64 * crate::utils::US_PER_SEC
> msg_reception_time_us
{
1000 * crate::utils::US_PER_SEC
} else {
x.1 .2
}
};
new_max_buffer_time_us
} else {
max_buffer_time_us
}
};
for m in inflow {
let msg_reception_time_us = m.reception_time_us;
let mut calculated_time_us: u64 = if m.is_ctrl_request() {
m.reception_time_us
} else {
get_lc_start_time(m.lifecycle) + m.timestamp_us()
};
if calculated_time_us > msg_reception_time_us {
calculated_time_us = msg_reception_time_us;
}
let buffering_delay = msg_reception_time_us - calculated_time_us;
max_buffer_time_us = update_max_buffering_delays(
max_buffer_time_us,
&m.ecu,
&m.lifecycle,
msg_reception_time_us,
buffering_delay,
);
let sm = SortedDltMessage {
m,
calculated_time_us,
};
let idx = buffer.binary_search(&sm).unwrap_or_else(|x| x); buffer.insert(idx, sm);
while let Some(sm) = buffer.front() {
if sm.calculated_time_us + max_buffer_time_us < msg_reception_time_us {
let sm2 = buffer.pop_front().unwrap();
outflow.send(sm2.m)?;
} else {
break; }
}
}
for sm in buffer.into_iter() {
outflow.send(sm.m)?;
}
Ok(())
}
pub fn payload_from_args<'a>(args: &'a [DltArg<'a>]) -> Vec<u8> {
if !args.is_empty() {
let mut payload = Vec::new();
let big_endian = args[0].is_big_endian;
for arg in args {
let persist_len_u16 = if arg.type_info & (DLT_TYPE_INFO_STRG | DLT_TYPE_INFO_RAWD) != 0
{
arg.payload_raw.len() as u16
} else {
0u16
};
let type_info = if big_endian {
arg.type_info.to_be_bytes()
} else {
arg.type_info.to_le_bytes()
};
payload.extend_from_slice(&type_info);
if persist_len_u16 > 0 {
payload.extend_from_slice(&if big_endian {
persist_len_u16.to_be_bytes()
} else {
persist_len_u16.to_le_bytes()
})
};
payload.extend_from_slice(arg.payload_raw);
}
payload
} else {
vec![]
}
}
#[cfg(test)]
mod tests {
use crate::dlt::DltMessage;
use crate::lifecycle::*;
use crate::utils::*;
use std::sync::mpsc::channel;
use chrono::{Datelike, Timelike};
#[test]
fn get_dlt_message_it() {
let mut it_asc = get_dlt_message_iterator("asc", 0, &[] as &[u8], None);
assert!(it_asc.next().is_none());
let mut it_dlt = get_dlt_message_iterator("dlt", 0, &[] as &[u8], None);
assert!(it_dlt.next().is_none());
}
#[test]
fn time_utc() {
let utc_time = utc_time_from_us(1640995200000001); assert_eq!(utc_time.date().day(), 1);
assert_eq!(utc_time.date().month(), 1);
assert_eq!(utc_time.date().year(), 2022);
assert_eq!(utc_time.time().hour(), 0);
assert_eq!(utc_time.time().minute(), 0);
assert_eq!(utc_time.time().second(), 0);
assert_eq!(utc_time.time().nanosecond(), 1000);
let utc_time = utc_time_from_us((i64::MAX as u64) + 42); assert_eq!(utc_time.timestamp(), 0);
assert_eq!(utc_time.date().day(), 1);
assert_eq!(utc_time.date().month(), 1);
assert_eq!(utc_time.date().year(), 1970);
}
#[test]
fn buf_as_ascii() {
let mut s = String::new();
buf_as_printable_ascii_to_write(&mut s, &[], '-').unwrap();
assert_eq!(s.len(), 0);
buf_as_printable_ascii_to_write(
&mut s,
&[0x00_u8, 0x1f, 0x20, 0x40, 0x7c, 0x7e, 0x7f, 0xff],
'-',
)
.unwrap();
assert_eq!(s, "-- @|~--");
}
#[test]
fn buf_as_hex() {
let mut s = String::new();
buf_as_hex_to_write(&mut s, &[]).unwrap();
assert_eq!(s.len(), 0);
buf_as_hex_to_write(&mut s, &[0x0f_u8]).unwrap();
assert_eq!(s, "0f");
let mut s = String::new();
buf_as_hex_to_write(&mut s, &[0x0f_u8, 0x00_u8, 0xff_u8]).unwrap();
assert_eq!(s, "0f 00 ff");
let mut v = Vec::<u8>::new();
buf_as_hex_to_io_write(&mut v, &[0x0f_u8, 0x00_u8, 0xff_u8]).unwrap();
assert_eq!(std::str::from_utf8(v.as_slice()).unwrap(), "0f 00 ff");
}
#[test]
fn hex_to_bytes1() {
assert!(hex_to_bytes("").is_none());
assert!(hex_to_bytes("1").is_none());
assert_eq!(hex_to_bytes("02").unwrap(), vec![0x2]);
assert_eq!(hex_to_bytes("12").unwrap(), vec![0x12]);
assert!(hex_to_bytes("123").is_none());
assert!(hex_to_bytes("12 ").is_none());
assert!(hex_to_bytes("12 3").is_none());
assert_eq!(hex_to_bytes("12 34").unwrap(), vec![0x12, 0x34]);
assert!(hex_to_bytes("12 34 ").is_none());
assert!(hex_to_bytes("12 34 5").is_none());
assert_eq!(hex_to_bytes("ff dd").unwrap(), vec![0xff, 0xdd]);
assert_eq!(hex_to_bytes("fF Dd").unwrap(), vec![0xff, 0xdd]);
assert!(hex_to_bytes("gh").is_none());
assert!(hex_to_bytes("gh 10 23 ij").is_none());
}
#[test]
fn buffer_messages() {
let (tx, rx) = channel();
const NUMBER_MSGS: usize = 1_000;
for _ in 0..NUMBER_MSGS {
tx.send(DltMessage::for_test()).unwrap();
}
let (tx2, rx2) = channel();
let t = std::thread::spawn(move || {
buffer_elements(
rx,
tx2,
BufferElementsOptions {
amount: BufferElementsAmount::NumberElements(NUMBER_MSGS),
},
)
});
assert!(rx2
.recv_timeout(std::time::Duration::from_millis(50))
.is_err());
for _ in 0..NUMBER_MSGS {
tx.send(DltMessage::for_test()).unwrap();
}
let mut last_time_stamp = 0;
for i in 0..NUMBER_MSGS {
let mr = rx2.recv_timeout(std::time::Duration::from_millis(50));
assert!(mr.is_ok(), "failed to get msg#{}", i);
let m = mr.unwrap();
assert!(
m.timestamp_dms > last_time_stamp,
"msg#{} has wrong order/time_stamp! {} vs. exp. > {}",
i,
m.timestamp_dms,
last_time_stamp
);
last_time_stamp = m.timestamp_dms;
}
assert!(rx2
.recv_timeout(std::time::Duration::from_millis(50))
.is_err());
drop(tx);
t.join().unwrap();
for i in 0..NUMBER_MSGS {
let mr = rx2.recv();
assert!(mr.is_ok(), "failed to get last msg#{}", i);
let m = mr.unwrap();
assert!(
m.timestamp_dms > last_time_stamp,
"msg#{} has wrong order/time_stamp! {} vs. exp. > {}",
NUMBER_MSGS + i,
m.timestamp_dms,
last_time_stamp
);
last_time_stamp = m.timestamp_dms;
}
assert!(rx2
.recv_timeout(std::time::Duration::from_millis(50))
.is_err());
}
struct SortedMsg(DltMessage);
impl std::cmp::Ord for SortedMsg {
fn cmp(&self, other: &Self) -> std::cmp::Ordering {
self.0.timestamp_dms.cmp(&other.0.timestamp_dms)
}
}
impl std::cmp::PartialOrd for SortedMsg {
fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
Some(self.0.timestamp_dms.cmp(&other.0.timestamp_dms))
}
}
impl PartialEq for SortedMsg {
fn eq(&self, other: &Self) -> bool {
self.0.timestamp_dms == other.0.timestamp_dms
}
}
impl Eq for SortedMsg {}
impl From<DltMessage> for SortedMsg {
fn from(msg: DltMessage) -> Self {
Self(msg)
}
}
#[test]
fn buffer_sort_elements2() {
let (tx, rx) = channel();
const NUMBER_MSGS: usize = 1_000;
let mut msgs: std::vec::Vec<SortedMsg> = std::vec::Vec::with_capacity(NUMBER_MSGS);
for _ in 0..NUMBER_MSGS {
msgs.push(SortedMsg::from(crate::dlt::DltMessage::for_test()));
}
msgs.reverse();
let mut last_time_stamp = u32::MAX;
for m in msgs {
assert!(
m.0.timestamp_dms <= last_time_stamp,
"msg has wrong order/time_stamp! {} vs. exp. > {}",
m.0.timestamp_dms,
last_time_stamp
);
last_time_stamp = m.0.timestamp_dms;
tx.send(m).unwrap();
}
let (tx2, rx2) = channel();
let t = std::thread::spawn(move || {
buffer_sort_elements(
rx,
tx2,
BufferElementsOptions {
amount: BufferElementsAmount::NumberElements(NUMBER_MSGS),
},
)
});
assert!(rx2
.recv_timeout(std::time::Duration::from_millis(50))
.is_err());
drop(tx);
let mut last_time_stamp = 0;
for i in 0..NUMBER_MSGS {
let mr = rx2.recv_timeout(std::time::Duration::from_millis(50));
assert!(mr.is_ok(), "failed to get msg#{}", i);
let m = mr.unwrap().0;
assert!(
m.timestamp_dms > last_time_stamp,
"msg#{} has wrong order/time_stamp! {} vs. exp. > {}",
i,
m.timestamp_dms,
last_time_stamp
);
last_time_stamp = m.timestamp_dms;
}
assert!(rx2
.recv_timeout(std::time::Duration::from_millis(50))
.is_err());
t.join().unwrap();
assert!(rx2
.recv_timeout(std::time::Duration::from_millis(50))
.is_err());
}
#[test]
fn buffer_sort_elements3() {
let (tx, rx) = channel();
const NUMBER_MSGS: usize = 1_000;
let mut msgs: std::vec::Vec<SortedMsg> = std::vec::Vec::with_capacity(NUMBER_MSGS);
let mut second_msg_timestamp = 0;
for i in 0..NUMBER_MSGS {
let m = crate::dlt::DltMessage::for_test();
if i == 1 {
second_msg_timestamp = m.timestamp_us()
}
msgs.push(SortedMsg::from(m));
}
msgs.reverse();
let mut last_time_stamp = u32::MAX;
for m in msgs {
assert!(
m.0.timestamp_dms <= last_time_stamp,
"msg has wrong order/time_stamp! {} vs. exp. > {}",
m.0.timestamp_dms,
last_time_stamp
);
last_time_stamp = m.0.timestamp_dms;
tx.send(m).unwrap();
}
let (tx2, rx2) = channel();
let t = std::thread::spawn(move || {
buffer_sort_elements(
rx,
tx2,
BufferElementsOptions {
amount: BufferElementsAmount::NumberElements(NUMBER_MSGS - 1),
},
)
});
let m = rx2.recv().unwrap();
assert_eq!(m.0.timestamp_us(), second_msg_timestamp); assert!(rx2
.recv_timeout(std::time::Duration::from_millis(50))
.is_err());
drop(tx);
let mut last_time_stamp = 0;
for i in 0..NUMBER_MSGS - 1 {
let mr = rx2.recv_timeout(std::time::Duration::from_millis(50));
assert!(mr.is_ok(), "failed to get msg#{}", i);
let m = mr.unwrap().0;
assert!(
m.timestamp_dms > last_time_stamp,
"msg#{} has wrong order/time_stamp! {} vs. exp. > {}",
i,
m.timestamp_dms,
last_time_stamp
);
last_time_stamp = m.timestamp_dms;
}
t.join().unwrap();
}
#[test]
fn buffer_sort_message_sorted_basic1() {
let (tx, parse_lc_in) = channel();
tx.send(DltMessage::for_test_rcv_tms_ms(1_000, 1_000))
.unwrap();
tx.send(DltMessage::for_test_rcv_tms_ms(1_200, 1_100))
.unwrap();
drop(tx);
let (parse_lc_out, sort_in) = channel();
let (sort_out, rx) = channel();
let (lcs_r, lcs_w) = evmap::new::<LifecycleId, LifecycleItem>();
let _lcs_w = parse_lifecycles_buffered_from_stream(lcs_w, parse_lc_in, parse_lc_out);
assert_eq!(1, lcs_r.len(), "wrong number of lcs!");
let res = buffer_sort_messages(sort_in, sort_out, &lcs_r, 3, 2_000_000);
assert!(res.is_ok());
let mut last_timestamp = 0;
for _ in 0..2 {
let m = rx.recv().unwrap();
assert!(m.timestamp_us() > last_timestamp);
last_timestamp = m.timestamp_us();
}
}
#[test]
fn buffer_sort_message_sorted_basic2() {
let (tx, sort_in) = channel();
tx.send(DltMessage::for_test_rcv_tms_ms(1_000, 1_000))
.unwrap();
tx.send(DltMessage::for_test_rcv_tms_ms(1_200, 1_100))
.unwrap();
drop(tx);
let (sort_out, rx) = channel();
let (lcs_r, _lcs_w) = evmap::new::<LifecycleId, LifecycleItem>();
let res = buffer_sort_messages(sort_in, sort_out, &lcs_r, 3, 2_000_000);
assert!(res.is_ok());
let mut last_timestamp = 0;
for _ in 0..2 {
let m = rx.recv().unwrap();
assert!(m.timestamp_us() > last_timestamp);
last_timestamp = m.timestamp_us();
}
}
#[test]
fn buffer_sort_message_sorted_basic3() {
let (tx, sort_in) = channel();
let mut m1 = DltMessage::for_test_rcv_tms_ms(0, 1_100);
let mut lc = Lifecycle::new(&mut m1);
tx.send(m1).unwrap();
let mut m2 = DltMessage::for_test_rcv_tms_ms(1, 1_000);
assert!(lc.update(&mut m2).is_none());
tx.send(m2).unwrap();
drop(tx);
let (sort_out, rx) = channel();
let (lcs_r, mut lcs_w) = evmap::new::<LifecycleId, LifecycleItem>();
lcs_w.insert(lc.id(), lc);
lcs_w.refresh();
let res = buffer_sort_messages(sort_in, sort_out, &lcs_r, 3, 2_000_000);
assert!(res.is_ok());
let mut last_timestamp = 0;
for i in 0..2 {
let m = rx.recv().unwrap();
assert!(
m.timestamp_us() > last_timestamp,
"wrong order at msg#{}: {:?}",
i,
m
);
last_timestamp = m.timestamp_us();
}
}
}