use std::sync::mpsc::{Receiver, Sender};
pub const US_PER_SEC: u64 = 1_000_000;
pub fn utc_time_from_us(time_us: u64) -> chrono::NaiveDateTime {
chrono::NaiveDateTime::from_timestamp_opt(
(time_us / US_PER_SEC) as i64,
1_000u32 * (time_us % 1_000_000) as u32,
)
.unwrap_or_else(|| chrono::NaiveDateTime::from_timestamp(0, 0))
}
pub enum BufferElementsAmount {
NumberElements(usize),
}
pub struct BufferElementsOptions {
pub amount: BufferElementsAmount,
}
pub fn buffer_elements<T>(inflow: Receiver<T>, outflow: Sender<T>, options: BufferElementsOptions) {
match options.amount {
BufferElementsAmount::NumberElements(number_elems) => {
let mut buffer = std::collections::VecDeque::<T>::with_capacity(number_elems);
for e in inflow {
if buffer.len() == number_elems {
outflow.send(buffer.pop_front().unwrap()).unwrap(); }
buffer.push_back(e);
}
for e in buffer.into_iter() {
outflow.send(e).unwrap();
}
}
}
}
pub fn buffer_sort_elements<T>(
inflow: Receiver<T>,
outflow: Sender<T>,
options: BufferElementsOptions,
) where
T: std::cmp::Ord,
{
match options.amount {
BufferElementsAmount::NumberElements(number_elems) => {
let mut buffer = std::collections::VecDeque::<T>::with_capacity(number_elems);
for e in inflow {
if buffer.len() == number_elems {
outflow.send(buffer.pop_front().unwrap()).unwrap();
}
let idx = buffer.binary_search(&e).unwrap_or_else(|x| x); buffer.insert(idx, e);
}
for e in buffer.into_iter() {
outflow.send(e).unwrap();
}
}
}
}
struct SortedDltMessage {
m: crate::dlt::DltMessage,
calculated_time_us: u64, }
impl std::cmp::PartialEq for SortedDltMessage {
fn eq(&self, other: &Self) -> bool {
self.calculated_time_us == other.calculated_time_us }
}
impl std::cmp::Ord for SortedDltMessage {
fn cmp(&self, other: &Self) -> std::cmp::Ordering {
if self.m.lifecycle == other.m.lifecycle {
if self.m.timestamp_dms == other.m.timestamp_dms {
self.m.index.cmp(&other.m.index) } else {
self.m.timestamp_dms.cmp(&other.m.timestamp_dms)
}
} else {
if self.calculated_time_us == other.calculated_time_us {
self.m.index.cmp(&other.m.index) } else {
self.calculated_time_us.cmp(&other.calculated_time_us)
}
}
}
}
impl std::cmp::PartialOrd for SortedDltMessage {
fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
Some(self.cmp(other))
}
}
impl std::cmp::Eq for SortedDltMessage {}
pub fn buffer_sort_messages<'a, M, S>(
inflow: Receiver<crate::dlt::DltMessage>,
outflow: Sender<crate::dlt::DltMessage>,
lcs_r: &'a evmap::ReadHandle<
crate::lifecycle::LifecycleId,
crate::lifecycle::LifecycleItem,
M,
S,
>,
windows_size_secs: u8,
min_buffer_delay_us: u64,
) -> Result<(), std::sync::mpsc::SendError<crate::dlt::DltMessage>>
where
S: std::hash::BuildHasher + Clone,
M: 'static + Clone,
{
let mut buffer = std::collections::VecDeque::<SortedDltMessage>::new();
let mut lc_map = std::collections::BTreeMap::<crate::lifecycle::LifecycleId, u64>::new();
let mut get_lc_start_time = |ref x: crate::lifecycle::LifecycleId| -> u64 {
match lc_map.get(x) {
Some(t) => *t,
None => {
let start_time = match lcs_r.read() {
Some(map_read_ref) => {
let l = map_read_ref.get_one(x);
match l {
Some(l) => l.start_time,
None => 0,
}
}
None => 0,
};
lc_map.insert(*x, start_time);
println!("added lc_map {} {}", x, start_time);
start_time
}
}
};
struct MaxBufferDelayEntry {
start_time: u64, max_buffering_delay: u64,
}
let mut max_buffering_delays = std::collections::HashMap::<
crate::dlt::DltChar4,
(
crate::lifecycle::LifecycleId,
std::collections::VecDeque<MaxBufferDelayEntry>,
u64,
),
>::new();
let mut max_buffer_time_us = min_buffer_delay_us;
let mut update_max_buffering_delays =
|max_buffer_time_us: u64,
ecu: &crate::dlt::DltChar4,
lifecycle_id: &crate::lifecycle::LifecycleId,
msg_reception_time_us: u64,
buffering_delay: u64| {
let mut entry = max_buffering_delays.entry(*ecu).or_insert_with(|| {
(
*lifecycle_id,
std::collections::VecDeque::with_capacity(windows_size_secs as usize),
0,
)
});
let mut recalc_max_buffer_time_us = false;
if entry.0 != *lifecycle_id {
entry.1.clear();
entry.0 = *lifecycle_id;
entry.2 = buffering_delay;
recalc_max_buffer_time_us = true;
}
let mut recalc_buffering_delay = false;
let insert_new = entry.1.len() == 0
|| entry.1.back().unwrap().start_time + crate::utils::US_PER_SEC
< msg_reception_time_us;
if insert_new {
if entry.1.len() == windows_size_secs as usize {
if entry.1.front().unwrap().max_buffering_delay == entry.2 {
recalc_buffering_delay = true; }
entry.1.pop_front(); }
entry.1.push_back(MaxBufferDelayEntry {
start_time: msg_reception_time_us,
max_buffering_delay: buffering_delay,
});
if buffering_delay > entry.2 {
recalc_buffering_delay = false;
entry.2 = buffering_delay;
}
recalc_max_buffer_time_us = true; } else {
let last = entry.1.back_mut().unwrap();
if last.max_buffering_delay < buffering_delay {
last.max_buffering_delay = buffering_delay;
if buffering_delay > entry.2 {
recalc_buffering_delay = false;
recalc_max_buffer_time_us = true;
entry.2 = buffering_delay;
}
}
}
if recalc_buffering_delay {
entry.2 = entry
.1
.iter()
.max_by_key(|x| x.max_buffering_delay)
.unwrap()
.max_buffering_delay;
recalc_max_buffer_time_us = true;
}
if recalc_max_buffer_time_us {
let new_max_buffer_time_us = min_buffer_delay_us + {
let x = max_buffering_delays
.iter()
.max_by_key(|x| {
if x.1 .1.front().unwrap().start_time + (windows_size_secs-1) as u64 * crate::utils::US_PER_SEC > msg_reception_time_us {
1000 * crate::utils::US_PER_SEC
} else {
x.1 .2
}
})
.unwrap();
if x.1 .1.front().unwrap().start_time + (windows_size_secs-1) as u64 * crate::utils::US_PER_SEC > msg_reception_time_us {
1000 * crate::utils::US_PER_SEC
} else {
x.1 .2
}
};
if new_max_buffer_time_us != max_buffer_time_us && new_max_buffer_time_us > min_buffer_delay_us*2{
println!("max_buffer_time_us={}", new_max_buffer_time_us);
}
new_max_buffer_time_us
} else {
max_buffer_time_us
}
};
for m in inflow {
let msg_reception_time_us = m.reception_time_us;
let mut calculated_time_us: u64 = if m.is_ctrl_request() {
m.reception_time_us
} else {
get_lc_start_time(m.lifecycle) + m.timestamp_us()
};
if calculated_time_us > msg_reception_time_us {
calculated_time_us = msg_reception_time_us;
}
let buffering_delay = msg_reception_time_us - calculated_time_us;
max_buffer_time_us = update_max_buffering_delays(
max_buffer_time_us,
&m.ecu,
&m.lifecycle,
msg_reception_time_us,
buffering_delay,
);
let sm = SortedDltMessage {
m,
calculated_time_us,
};
let idx = buffer.binary_search(&sm).unwrap_or_else(|x| x); buffer.insert(idx, sm);
loop {
match buffer.front() {
Some(sm) => {
if sm.calculated_time_us + max_buffer_time_us < msg_reception_time_us {
let sm2 = buffer.pop_front().unwrap();
outflow.send(sm2.m)?;
} else {
break; }
}
None => {
break;
}
}
}
}
for sm in buffer.into_iter() {
outflow.send(sm.m)?;
}
Ok(())
}
#[cfg(test)]
mod tests {
use crate::dlt::DltMessage;
use crate::utils::*;
use std::sync::mpsc::channel;
#[test]
fn buffer_messages() {
let (tx, rx) = channel();
const NUMBER_MSGS: usize = 1_000;
for _ in 0..NUMBER_MSGS {
tx.send(DltMessage::for_test()).unwrap();
}
let (tx2, rx2) = channel();
let t = std::thread::spawn(move || {
buffer_elements(
rx,
tx2,
BufferElementsOptions {
amount: BufferElementsAmount::NumberElements(NUMBER_MSGS),
},
)
});
assert!(rx2
.recv_timeout(std::time::Duration::from_millis(50))
.is_err());
for _ in 0..NUMBER_MSGS {
tx.send(DltMessage::for_test()).unwrap();
}
let mut last_time_stamp = 0;
for i in 0..NUMBER_MSGS {
let mr = rx2.recv_timeout(std::time::Duration::from_millis(50));
assert!(mr.is_ok(), "failed to get msg#{}", i);
let m = mr.unwrap();
assert!(
m.timestamp_dms > last_time_stamp,
"msg#{} has wrong order/time_stamp! {} vs. exp. > {}",
i,
m.timestamp_dms,
last_time_stamp
);
last_time_stamp = m.timestamp_dms;
}
assert!(rx2
.recv_timeout(std::time::Duration::from_millis(50))
.is_err());
drop(tx);
t.join().unwrap();
for i in 0..NUMBER_MSGS {
let mr = rx2.recv();
assert!(mr.is_ok(), "failed to get last msg#{}", i);
let m = mr.unwrap();
assert!(
m.timestamp_dms > last_time_stamp,
"msg#{} has wrong order/time_stamp! {} vs. exp. > {}",
NUMBER_MSGS + i,
m.timestamp_dms,
last_time_stamp
);
last_time_stamp = m.timestamp_dms;
}
assert!(rx2
.recv_timeout(std::time::Duration::from_millis(50))
.is_err());
}
struct SortedMsg(DltMessage);
impl std::cmp::Ord for SortedMsg {
fn cmp(&self, other: &Self) -> std::cmp::Ordering {
self.0.timestamp_dms.cmp(&other.0.timestamp_dms)
}
}
impl std::cmp::PartialOrd for SortedMsg {
fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
Some(self.0.timestamp_dms.cmp(&other.0.timestamp_dms))
}
}
impl PartialEq for SortedMsg {
fn eq(&self, other: &Self) -> bool {
self.0.timestamp_dms == other.0.timestamp_dms
}
}
impl Eq for SortedMsg {}
impl From<DltMessage> for SortedMsg {
fn from(msg: DltMessage) -> Self {
Self(msg)
}
}
#[test]
fn buffer_sort_messages() {
let (tx, rx) = channel();
const NUMBER_MSGS: usize = 1_000;
let mut msgs: std::vec::Vec<SortedMsg> = std::vec::Vec::with_capacity(NUMBER_MSGS);
for _ in 0..NUMBER_MSGS {
msgs.push(SortedMsg::from(crate::dlt::DltMessage::for_test()));
}
msgs.reverse();
let mut last_time_stamp = u32::MAX;
for m in msgs {
assert!(
m.0.timestamp_dms <= last_time_stamp,
"msg has wrong order/time_stamp! {} vs. exp. > {}",
m.0.timestamp_dms,
last_time_stamp
);
last_time_stamp = m.0.timestamp_dms;
tx.send(m).unwrap();
}
let (tx2, rx2) = channel();
let t = std::thread::spawn(move || {
buffer_sort_elements(
rx,
tx2,
BufferElementsOptions {
amount: BufferElementsAmount::NumberElements(NUMBER_MSGS),
},
)
});
assert!(rx2
.recv_timeout(std::time::Duration::from_millis(50))
.is_err());
drop(tx);
let mut last_time_stamp = 0;
for i in 0..NUMBER_MSGS {
let mr = rx2.recv_timeout(std::time::Duration::from_millis(50));
assert!(mr.is_ok(), "failed to get msg#{}", i);
let m = mr.unwrap().0;
assert!(
m.timestamp_dms > last_time_stamp,
"msg#{} has wrong order/time_stamp! {} vs. exp. > {}",
i,
m.timestamp_dms,
last_time_stamp
);
last_time_stamp = m.timestamp_dms;
}
assert!(rx2
.recv_timeout(std::time::Duration::from_millis(50))
.is_err());
t.join().unwrap();
assert!(rx2
.recv_timeout(std::time::Duration::from_millis(50))
.is_err());
}
}