use crate::dlt::{DltChar4, DltMessage};
use std::hash::{Hash, Hasher};
use std::sync::mpsc::{Receiver, Sender};
/// Numeric identifier of a lifecycle. Ids start at 1 (see `NEXT_LC_ID`);
/// 0 is used in messages as the "no lifecycle assigned" marker.
pub type LifecycleId = u32;
/// Item type stored in the evmap; currently the lifecycle itself.
pub type LifecycleItem = Lifecycle;

/// Wraps a `Lifecycle` into the `LifecycleItem` kept in the evmap.
/// As `LifecycleItem` is an alias for `Lifecycle` this is the identity today,
/// but keeps a single place to change should the item type ever differ.
fn new_lifecycle_item(lc: Lifecycle) -> LifecycleItem {
    lc
}
/// A detected lifecycle (one run/boot) of an ECU.
///
/// Merged lifecycles are encoded via sentinel values: a lifecycle with
/// `nr_msgs == 0` was merged into another one and then stores the id of the
/// lifecycle it was merged into in `max_timestamp_us`
/// (see `Lifecycle::merge` / `Lifecycle::was_merged`).
#[derive(Debug, Clone, Copy)]
pub struct Lifecycle {
    // unique id, assigned from NEXT_LC_ID; private so it cannot be changed from outside
    id: LifecycleId,
    /// ECU this lifecycle belongs to
    pub ecu: DltChar4,
    /// number of messages assigned to this lifecycle (0 => merged, see above)
    pub nr_msgs: u32,
    /// number of control request messages assigned; their timestamps are ignored
    pub nr_control_req_msgs: u32,
    /// calculated start time (reception time - timestamp), minimum over all msgs
    pub start_time: u64,
    // start time derived from the first message; preserved on merges
    initial_start_time: u64,
    // highest timestamp seen so far; end_time() = start_time + max_timestamp_us.
    // NOTE: after a merge this field is reused to store the merged-into id.
    max_timestamp_us: u64,
}
// evmap requires stored values to be shallow-copyable.
impl evmap::ShallowCopy for Lifecycle {
    // SAFETY: Lifecycle is `Copy` (it owns no heap data), so a bitwise copy
    // is a valid shallow copy and cannot lead to double drops or aliasing
    // of owned resources.
    unsafe fn shallow_copy(&self) -> std::mem::ManuallyDrop<Self> {
        std::mem::ManuallyDrop::new(*self)
    }
}
// Equality is based solely on the unique `id` so a lifecycle keeps its
// identity while its statistics (nr_msgs, times) keep changing.
impl PartialEq for Lifecycle {
    fn eq(&self, other: &Self) -> bool {
        self.id == other.id
    }
}
impl Eq for Lifecycle {}
// Hash must stay consistent with PartialEq: hash only the id.
impl Hash for Lifecycle {
    fn hash<H: Hasher>(&self, state: &mut H) {
        self.id.hash(state);
    }
}
/// Source of unique lifecycle ids. Starts at 1 so that 0 can serve as the
/// "no lifecycle assigned" marker in messages.
static NEXT_LC_ID: std::sync::atomic::AtomicU32 = std::sync::atomic::AtomicU32::new(1);
impl Lifecycle {
    /// Returns the unique id of this lifecycle (assigned from `NEXT_LC_ID`, never 0).
    pub fn id(&self) -> u32 {
        self.id
    }

    /// Returns the end time in us: start time plus the highest timestamp seen,
    /// i.e. the calculated send time of the last message assigned.
    ///
    /// NOTE: meaningless for a merged lifecycle (`was_merged().is_some()`)
    /// because `max_timestamp_us` is reused as the merged-into id there.
    pub fn end_time(&self) -> u64 {
        // idiomatic expression instead of a trailing `return ...;`
        self.start_time + self.max_timestamp_us
    }

    /// Returns true if this lifecycle contains only control request messages.
    /// Those carry no usable ECU timestamp, so such a "lifecycle" is an
    /// artifact rather than a real ECU run.
    pub fn only_control_requests(&self) -> bool {
        self.nr_control_req_msgs >= self.nr_msgs
    }

    /// Creates a new lifecycle from the given message and assigns the message
    /// to it (sets `msg.lifecycle` to the new id).
    ///
    /// Control request messages are treated as timestamp 0 since their
    /// timestamp does not stem from the ECU's own clock.
    pub fn new(msg: &mut DltMessage) -> Lifecycle {
        let is_ctrl_request = msg.is_ctrl_request();
        let timestamp_us = if is_ctrl_request {
            0
        } else {
            msg.timestamp_us()
        };
        let alc = Lifecycle {
            id: NEXT_LC_ID.fetch_add(1, std::sync::atomic::Ordering::Relaxed),
            ecu: msg.ecu.clone(),
            nr_msgs: 1,
            nr_control_req_msgs: if is_ctrl_request { 1 } else { 0 },
            // lifecycle start = reception time minus time since ECU start
            start_time: msg.reception_time_us - timestamp_us,
            initial_start_time: msg.reception_time_us - timestamp_us,
            max_timestamp_us: timestamp_us,
        };
        msg.lifecycle = alc.id;
        alc
    }

    /// Merges `lc_to_merge` into this lifecycle (summing msg counters and
    /// widening the time range).
    ///
    /// Afterwards `lc_to_merge` becomes a "merged" marker:
    /// * `nr_msgs` is set to 0 (the indicator checked by `was_merged`),
    /// * `max_timestamp_us` is reused to store the id of this lifecycle,
    /// * `start_time` is set to `u64::MAX` so it can never overlap again.
    ///
    /// # Panics
    /// Panics if `lc_to_merge` was merged already (`nr_msgs == 0`).
    pub fn merge(&mut self, lc_to_merge: &mut Lifecycle) {
        assert_ne!(lc_to_merge.nr_msgs, 0);
        self.nr_msgs += lc_to_merge.nr_msgs;
        self.nr_control_req_msgs += lc_to_merge.nr_control_req_msgs;
        lc_to_merge.nr_msgs = 0;
        if lc_to_merge.max_timestamp_us > self.max_timestamp_us {
            self.max_timestamp_us = lc_to_merge.max_timestamp_us;
        }
        if lc_to_merge.start_time < self.start_time {
            self.start_time = lc_to_merge.start_time;
            self.initial_start_time = lc_to_merge.initial_start_time;
        }
        // turn lc_to_merge into the merged-into-self marker (see doc above)
        lc_to_merge.max_timestamp_us = self.id as u64;
        lc_to_merge.start_time = u64::MAX;
    }

    /// Resolves the final (non-merged) lifecycle for this one by following
    /// the merged-into chain through `interims_lcs`.
    ///
    /// # Panics
    /// Panics if a merged-into id is not contained in `interims_lcs`.
    pub fn final_lc<'a>(
        &'a self,
        interims_lcs: &'a std::collections::HashMap<LifecycleId, &Lifecycle>,
    ) -> &'a Lifecycle {
        if self.nr_msgs == 0 {
            // merged: max_timestamp_us holds the merged-into lifecycle id
            interims_lcs
                .get(&(self.max_timestamp_us as u32))
                .unwrap()
                .final_lc(interims_lcs)
        } else {
            self
        }
    }

    /// Returns `Some(id)` of the lifecycle this one was merged into,
    /// or `None` if it was not merged.
    pub fn was_merged(&self) -> Option<u32> {
        if self.nr_msgs == 0 {
            Some(self.max_timestamp_us as u32)
        } else {
            None
        }
    }

    /// Updates this lifecycle with the given message.
    ///
    /// Returns `None` if the message was assigned to this lifecycle
    /// (its calculated start overlaps the current end), or `Some(new_lc)`
    /// if the message starts a new lifecycle.
    ///
    /// Control request messages are always assigned to the current lifecycle
    /// as they carry no ECU timestamp.
    pub fn update(&mut self, msg: &mut DltMessage) -> Option<Lifecycle> {
        if msg.is_ctrl_request() {
            msg.lifecycle = self.id;
            self.nr_msgs += 1;
            self.nr_control_req_msgs += 1;
            return None;
        }
        let msg_timestamp_us = msg.timestamp_us();
        // the lifecycle start time implied by this message
        let msg_lc_start = msg.reception_time_us - msg_timestamp_us;
        let cur_end_time = self.end_time();
        if msg_lc_start <= cur_end_time {
            // message belongs to this lifecycle: extend its bounds
            if self.max_timestamp_us < msg_timestamp_us {
                self.max_timestamp_us = msg_timestamp_us;
            }
            if msg_lc_start < self.start_time {
                // earlier msgs had a larger buffering delay; move start earlier
                self.start_time = msg_lc_start;
            }
            msg.lifecycle = self.id;
            self.nr_msgs += 1;
            None
        } else {
            Some(Lifecycle::new(msg))
        }
    }
}
/// Parses lifecycles from a stream of DLT messages.
///
/// Each message from `inflow` is assigned to an existing or new lifecycle of
/// its ECU, then forwarded to `outflow`. The evmap behind `lcs_w` is kept up
/// to date with the interims lifecycles; the write handle is returned once
/// `inflow` is closed.
///
/// Overlapping lifecycles of the same ECU get merged (see `Lifecycle::merge`);
/// a merged lifecycle stays in the map as a marker pointing to the one it was
/// merged into (`Lifecycle::was_merged`). Messages are forwarded immediately,
/// so a forwarded message may carry a lifecycle id that gets merged later.
pub fn parse_lifecycles_from_stream<M, S>(
mut lcs_w: evmap::WriteHandle<LifecycleId, LifecycleItem, M, S>,
inflow: Receiver<DltMessage>,
outflow: Sender<DltMessage>,
) -> evmap::WriteHandle<LifecycleId, LifecycleItem, M, S>
where
S: std::hash::BuildHasher + Clone,
M: 'static + Clone,
{
// rebuild the per-ECU lifecycle vectors from an already filled map
// (allows resuming a previous parse with the same write handle)
let mut ecu_map: std::collections::HashMap<DltChar4, Vec<Lifecycle>> =
std::collections::HashMap::new();
for lci in lcs_w.read().iter() {
for (_id, b) in lci {
let lc = b.get_one().unwrap();
match ecu_map.get_mut(&lc.ecu) {
None => {
ecu_map.insert(lc.ecu.clone(), [lc.clone()].to_vec());
}
Some(v) => v.push(lc.clone()),
}
}
}
println!("Have ecu_map.len={}", ecu_map.len());
for (k, v) in &ecu_map {
println!("Have for ecu {:?} {:?}", &k, &v);
}
// id of the lifecycle worked on last; only used for the consistency assert below.
// NOTE(review): the assert assumes a single ECU in the stream - with
// interleaved ECUs the last lifecycle per ECU differs from the global last.
let mut last_last_lc_id = 0;
for mut msg in inflow {
let ecu_lcs = ecu_map.entry(msg.ecu.clone()).or_insert_with(|| Vec::new());
let ecu_lcs_len = ecu_lcs.len();
if ecu_lcs_len > 0 {
// split borrow: mutate the last lifecycle while reading the previous ones
let (last_lc, rest_lcs) = ecu_lcs.as_mut_slice().split_last_mut().unwrap();
let lc2 = last_lc; assert_eq!(last_last_lc_id, lc2.id);
let mut remove_last_lc = false;
match lc2.update(&mut msg) {
None => {
// msg was assigned to the last lifecycle; its start may have moved
// earlier - check whether it now overlaps the previous lifecycle
if ecu_lcs_len > 1 {
let prev_lc = rest_lcs.last_mut().unwrap(); if lc2.start_time <= prev_lc.end_time() {
println!("merge needed:\n {:?}\n {:?}", prev_lc, lc2);
prev_lc.merge(lc2);
lcs_w.update(prev_lc.id, *prev_lc);
last_last_lc_id = prev_lc.id;
remove_last_lc = true;
}
}
// publish the updated lifecycle (or its merged-marker form)
lcs_w.update(lc2.id, *lc2);
}
Some(lc3) => {
// msg did not fit -> a new lifecycle was started
last_last_lc_id = lc3.id;
lcs_w.insert(lc3.id, new_lifecycle_item(lc3));
ecu_lcs.push(lc3);
lcs_w.refresh();
}
}
if remove_last_lc {
// drop the merged lifecycle from the per-ECU vector (it stays in the
// map as a merged-marker). ecu_lcs_len-1 is still its index here as
// the Some(..) branch above never sets remove_last_lc.
ecu_lcs.remove(ecu_lcs_len - 1);
}
} else {
// first message for this ECU: start its first lifecycle
let lc = Lifecycle::new(&mut msg);
last_last_lc_id = lc.id;
lcs_w.insert(lc.id, new_lifecycle_item(lc));
ecu_lcs.push(lc);
lcs_w.refresh();
}
outflow.send(msg).unwrap(); }
// make all pending updates visible to readers
lcs_w.refresh();
lcs_w
}
/// Parses lifecycles from a stream of DLT messages, holding back messages of
/// not-yet-confirmed lifecycles.
///
/// Difference to `parse_lifecycles_from_stream`: a newly detected lifecycle
/// might still get merged into the previous one once later messages reveal an
/// earlier start time. Until such a lifecycle is either merged or confirmed
/// (proven older than `max_buffering_delay`), its messages are buffered in
/// `buffered_msgs` so they are forwarded to `outflow` with their final
/// lifecycle id. Returns the write handle once `inflow` is closed.
pub fn parse_lifecycles_buffered_from_stream<M, S>(
mut lcs_w: evmap::WriteHandle<LifecycleId, LifecycleItem, M, S>,
inflow: Receiver<DltMessage>,
outflow: Sender<DltMessage>,
) -> evmap::WriteHandle<LifecycleId, LifecycleItem, M, S>
where
S: std::hash::BuildHasher + Clone,
M: 'static + Clone,
{
// max assumed buffering delay (same unit as reception_time_us/timestamp_us)
// after which a new lifecycle counts as confirmed
let max_buffering_delay: u64 = 60_000;
// rebuild per-ECU lifecycle vectors from an already filled map
let mut ecu_map: std::collections::HashMap<DltChar4, Vec<Lifecycle>> =
std::collections::HashMap::new();
for lci in lcs_w.read().iter() {
for (_id, b) in lci {
let lc = b.get_one().unwrap();
match ecu_map.get_mut(&lc.ecu) {
None => {
ecu_map.insert(lc.ecu.clone(), [lc.clone()].to_vec());
}
Some(v) => v.push(lc.clone()),
}
}
}
// messages held back because their lifecycle is not confirmed yet
let mut buffered_msgs: std::vec::Vec<DltMessage> = std::vec::Vec::new();
// ids of the not-yet-confirmed lifecycles
let mut buffered_lcs: std::collections::HashSet<LifecycleId> = std::collections::HashSet::new();
for mut msg in inflow {
let msg_ecu = msg.ecu.clone();
let ecu_lcs = ecu_map.entry(msg_ecu).or_insert_with(|| Vec::new());
let ecu_lcs_len = ecu_lcs.len();
if ecu_lcs_len > 0 {
// split borrow: mutate last lifecycle while reading the previous ones
let (last_lc, rest_lcs) = ecu_lcs.as_mut_slice().split_last_mut().unwrap();
let lc2 = last_lc;
let mut remove_last_lc = false;
match lc2.update(&mut msg) {
None => {
// msg was assigned to the last lifecycle; check for overlap
// with the previous lifecycle of this ECU
if ecu_lcs_len > 1 {
let prev_lc = rest_lcs.last_mut().unwrap(); if lc2.start_time <= prev_lc.end_time() {
println!("merge needed:\n {:?}\n {:?}", prev_lc, lc2);
let is_buffered = buffered_lcs.contains(&lc2.id);
if is_buffered {
prev_lc.merge(lc2);
msg.lifecycle = prev_lc.id;
{
// rewrite the lifecycle id of all buffered msgs of the merged lc
buffered_msgs.iter_mut().for_each(|m| {
println!(
"modifying lifecycle from {} to {} for {:?}",
lc2.id, prev_lc.id, m
);
if m.lifecycle == lc2.id {
(*m).lifecycle = prev_lc.id;
}
});
// sanity check: no buffered msg references the merged lc any more
buffered_msgs.iter().for_each(|m| {
assert_ne!(m.lifecycle, lc2.id);
});
};
buffered_lcs.remove(&lc2.id);
remove_last_lc = true;
if buffered_lcs.len() == 0 {
// no unconfirmed lifecycle left -> flush the buffer in order.
// NOTE(review): remove(0) in a loop is O(n^2); drain(..) would do.
while buffered_msgs.len() > 0 {
let msg = buffered_msgs.remove(0);
outflow.send(msg).unwrap();
}
assert_eq!(buffered_msgs.len(), 0);
}
} else {
// merging two already confirmed lifecycles is not supported yet
assert!(false, "todo shouldn't happen yet!");
prev_lc.merge(lc2);
lcs_w.update(prev_lc.id, *prev_lc);
remove_last_lc = true;
}
}
let is_buffered = buffered_lcs.contains(&lc2.id);
if is_buffered {
// confirm a buffered lifecycle once this msg proves that no
// earlier msg (within max_buffering_delay) can merge it away
let min_lc_start_time =
(msg.reception_time_us - msg.timestamp_us()) - max_buffering_delay;
if min_lc_start_time > prev_lc.start_time {
buffered_lcs.remove(&lc2.id);
lcs_w.insert(lc2.id, new_lifecycle_item(*lc2));
lcs_w.refresh();
if buffered_lcs.len() == 0 {
// all confirmed -> flush buffered msgs in order
while buffered_msgs.len() > 0 {
let msg = buffered_msgs.remove(0);
outflow.send(msg).unwrap();
}
assert_eq!(buffered_msgs.len(), 0);
}
}
}
}
// NOTE(review): empty block - dead code, presumably a leftover; confirm and remove
if buffered_lcs.len() > 0 {
}
// publish updates only for lifecycles already contained (i.e. confirmed)
if lcs_w.contains_key(&lc2.id) {
lcs_w.update(lc2.id, *lc2);
lcs_w.refresh(); }
}
Some(lc3) => {
// new lifecycle: keep it unconfirmed for now - it might still be
// merged into the previous one (so don't publish it yet)
buffered_lcs.insert(lc3.id);
ecu_lcs.push(lc3);
}
}
if remove_last_lc {
let removed = ecu_lcs.remove(ecu_lcs_len - 1);
assert!(!buffered_lcs.contains(&removed.id));
}
} else {
assert_eq!(
buffered_lcs.len(),
0,
"buffered_lcs exist already. not yet implemented/tested!"
);
// first message of this ECU: its lifecycle counts as confirmed directly
let lc = Lifecycle::new(&mut msg);
lcs_w.insert(lc.id, new_lifecycle_item(lc));
ecu_lcs.push(lc);
lcs_w.refresh();
}
// hold the msg back while any lifecycle is unconfirmed, else pass it on
if buffered_lcs.len() > 0 {
buffered_msgs.push(msg);
} else {
outflow.send(msg).unwrap(); }
}
// inflow closed: publish the still unconfirmed lifecycles...
for lc_id in buffered_lcs {
'outer: for vs in ecu_map.values() {
for v in vs {
if v.id == lc_id {
lcs_w.insert(lc_id, new_lifecycle_item(*v));
break 'outer;
}
}
}
}
lcs_w.refresh();
// ...and flush the remaining buffered messages
for m in buffered_msgs.into_iter() {
outflow.send(m).unwrap();
}
lcs_w
}
/// Collects all interims lifecycles (including merged markers) from the evmap
/// read ref into a HashMap keyed by lifecycle id.
///
/// # Panics
/// Panics if an entry holds no value (`get_one()` returns None).
pub fn get_interims_lifecycles_as_hashmap<'a, M, S>(
    lcr: &'a evmap::MapReadRef<LifecycleId, LifecycleItem, M, S>,
) -> std::collections::HashMap<LifecycleId, &'a Lifecycle>
where
    S: std::hash::BuildHasher + Clone,
    M: 'static + Clone,
{
    let mut interims = std::collections::HashMap::new();
    for (id, values) in lcr.iter() {
        interims.insert(*id, values.get_one().unwrap());
    }
    interims
}
/// Maps every interims lifecycle id to its final (non-merged) lifecycle by
/// resolving merge chains via `Lifecycle::final_lc`.
pub fn get_mapped_lifecycles_as_hashmap<'a>(
    interims_lcs: &'a std::collections::HashMap<LifecycleId, &'a Lifecycle>,
) -> std::collections::HashMap<LifecycleId, &'a Lifecycle> {
    let mut mapped = std::collections::HashMap::new();
    for (id, lc) in interims_lcs {
        mapped.insert(*id, lc.final_lc(interims_lcs));
    }
    mapped
}
/// Returns all lifecycles from the evmap read ref sorted by start time
/// (ascending). Merged markers have `start_time == u64::MAX` and thus sort last.
///
/// # Panics
/// Panics if an entry holds no value or its key differs from the stored id.
pub fn get_sorted_lifecycles_as_vec<'a, M, S>(
    lcr: &'a evmap::MapReadRef<LifecycleId, LifecycleItem, M, S>,
) -> std::vec::Vec<&'a Lifecycle>
where
    S: std::hash::BuildHasher + Clone,
    M: 'static + Clone,
{
    let mut sorted_lcs: std::vec::Vec<&'a Lifecycle> = lcr
        .iter()
        .map(|(id, b)| {
            let lc = b.get_one().unwrap();
            // map key and lifecycle id must agree
            assert_eq!(&lc.id, id);
            lc
        })
        .collect();
    // stable sort_by_key: simpler than a manual comparator and keeps the
    // relative order of lifecycles with equal start times
    sorted_lcs.sort_by_key(|lc| lc.start_time);
    sorted_lcs
}
#[cfg(test)]
mod tests {
use crate::lifecycle::*;
use std::sync::mpsc::channel;
use std::time::Instant;
extern crate nohash_hasher;
#[test]
fn one_ecu() {
// throughput/sanity test: many identical msgs of one ECU must all be
// forwarded and end up in exactly one lifecycle
let (tx, rx) = channel();
const NUMBER_ITERATIONS: usize = 2_000_000;
let start = Instant::now();
for _ in 0..NUMBER_ITERATIONS {
tx.send(crate::dlt::DltMessage::for_test()).unwrap();
}
let duration = start.elapsed();
println!(
"Time elapsed sending {}msgs is: {:?}",
NUMBER_ITERATIONS, duration
);
let (tx2, rx2) = channel();
// close the input so the parser thread terminates after draining
drop(tx);
// nohash hasher: LifecycleId is already a unique u32
let (lcs_r, lcs_w) = evmap::Options::default()
.with_hasher(nohash_hasher::BuildNoHashHasher::<LifecycleId>::default())
.construct::<LifecycleId, LifecycleItem>(); let start = Instant::now();
let t = std::thread::spawn(move || parse_lifecycles_buffered_from_stream(lcs_w, rx, tx2));
// reading concurrently while the writer thread runs must be safe
for a in lcs_r.read().iter() {
println!("lcs_r content before join {:?}", a);
}
let lcs_w = t.join().unwrap();
let duration = start.elapsed();
println!(
"Time elapsed parse_lifecycles {}msgs is: {:?}",
NUMBER_ITERATIONS, duration
);
let start = Instant::now();
{
// every forwarded msg must carry a lifecycle id that exists in the map
let read_handle = lcs_r.read();
assert_eq!(read_handle.is_some(), true);
let read_handle = read_handle.unwrap();
for i in 0..NUMBER_ITERATIONS {
let m = rx2.recv();
assert_eq!(m.is_ok(), true, "{}th message missing", i + 1);
let msg = m.unwrap();
assert_ne!(msg.lifecycle, 0, "{}th message without lifecycle", i + 1);
let l = read_handle.get_one(&msg.lifecycle);
assert_eq!(l.is_some(), true);
}
}
let duration = start.elapsed();
println!(
"Time elapsed reading/verifying {}msgs is: {:?}",
NUMBER_ITERATIONS, duration
);
// no more msgs than were sent
assert_ne!(rx2.recv().is_ok(), true);
for a in lcs_r.read().iter() {
println!("lcs_r content {:?}", a);
}
for a in lcs_w.read().iter() {
for (id, b) in a {
println!("lcs_w2 content id={:?} lc={:?}", id, b);
}
}
// all msgs are identical -> exactly one lifecycle expected
assert_eq!(lcs_r.is_empty(), false, "empty lcs!");
assert_eq!(lcs_r.len(), 1, "wrong number of lcs!");
}
#[test]
fn basics() {
    // empty input stream: parsing must terminate and produce no output msgs
    let (tx, rx) = channel();
    let (tx2, rx2) = channel();
    drop(tx); // close the input channel so the parser returns immediately
    let (_lcs_r, lcs_w) = evmap::new::<LifecycleId, LifecycleItem>();
    parse_lifecycles_buffered_from_stream(lcs_w, rx, tx2);
    // output channel closed without any message
    assert!(rx2.recv().is_err());
}
#[test]
fn basics_read_in_different_thread() {
    // an empty parse must leave the map usable (and empty) from another thread
    let (tx, rx) = channel();
    let (tx2, rx2) = channel();
    drop(tx); // close input -> parser returns immediately
    let (lcs_r, lcs_w) = evmap::new::<LifecycleId, LifecycleItem>();
    parse_lifecycles_buffered_from_stream(lcs_w, rx, tx2);
    assert!(rx2.recv().is_err());
    // ReadHandle clones can be moved to other threads
    let r = lcs_r.clone();
    let t = std::thread::spawn(move || {
        for a in r.read().iter() {
            println!("r content {:?}", a);
        }
        assert_eq!(r.len(), 0);
    });
    t.join().unwrap();
}
/// Test helper producing the DLT messages of one lifecycle, sorted by
/// reception time, consumable via `Iterator`.
struct MessageGenerator {
// remaining messages; next() pops from the front
msgs: std::vec::Vec<DltMessage>,
}
/// Options for `MessageGenerator::new`.
struct MessageGeneratorOptions {
// time between two consecutive msgs (same unit as timestamp_us)
frequency: u64,
// ECU id used for all generated msgs
ecu: DltChar4,
}
impl Default for MessageGeneratorOptions {
fn default() -> Self {
MessageGeneratorOptions {
frequency: 1_000,
ecu: DltChar4::from_buf(&[0x41, 0x42, 0x43, 0x45]), // "ABCE"
}
}
}
impl MessageGenerator {
/// Creates `nr_msgs` messages per `(buffering delay, start delay)` tuple for
/// a lifecycle starting at `lc_start_time`, sorted by reception time.
///
/// Each message's timestamp is relative to the lifecycle start; its
/// reception is delayed by at least the tuple's buffering delay to simulate
/// buffered logging.
fn new(
lc_start_time: u64,
initial_delays: &[(u64, u64)],
nr_msgs: usize,
options: MessageGeneratorOptions,
) -> MessageGenerator {
let mut msgs: std::vec::Vec<DltMessage> = std::vec::Vec::new();
for (buf_delay, start_delay) in initial_delays {
for i in 0..nr_msgs {
let timestamp_us = start_delay + ((i as u64) * options.frequency); let min_send_time = std::cmp::max(buf_delay + (i as u64), timestamp_us);
msgs.push(DltMessage {
index: i as crate::dlt::DltMessageIndexType,
reception_time_us: lc_start_time + min_send_time,
timestamp_dms: (timestamp_us / 100) as u32, // dms = 0.1ms units
lifecycle: 0, // not assigned yet
ecu: options.ecu,
standard_header: crate::dlt::DltStandardHeader {
htyp: 1,
len: 0,
mcnt: 0,
},
extended_header: None,
payload: [].to_vec(),
});
}
}
// deliver in reception order, as a real capture would
msgs.sort_by(|a, b| a.reception_time_us.cmp(&b.reception_time_us));
MessageGenerator { msgs }
}
}
impl Iterator for MessageGenerator {
    type Item = DltMessage;
    /// Pops the next (earliest by reception time) message, `None` when exhausted.
    fn next(&mut self) -> Option<Self::Item> {
        // Vec::remove(0) is O(n) but fine for the small test fixtures here
        if self.msgs.is_empty() {
            None
        } else {
            Some(self.msgs.remove(0))
        }
    }
}
#[test]
fn gen_two_lcs() {
// two lifecycles of one ECU, each fed with two delay profiles: the parser
// must detect exactly two final lifecycles with the expected time ranges
let (tx, rx) = channel();
let (tx2, rx2) = channel();
const NUMBER_PER_MSG_CAT: usize = 50;
// (buffering delay, start delay) profiles per lifecycle
const MSG_DELAYS: [(u64, u64); 2] = [(45_000, 0), (30_000, 10_000)];
const LC_START_TIMES: [u64; 2] = [1_000_000, 1_060_000];
const NUMBER_MSGS: usize = LC_START_TIMES.len() * NUMBER_PER_MSG_CAT * MSG_DELAYS.len();
let gen_lc1 = MessageGenerator::new(
LC_START_TIMES[0],
&MSG_DELAYS,
NUMBER_PER_MSG_CAT,
Default::default(),
);
for m in gen_lc1 {
tx.send(m).unwrap();
}
let gen_lc2 = MessageGenerator::new(
LC_START_TIMES[1],
&MSG_DELAYS,
NUMBER_PER_MSG_CAT,
Default::default(),
);
for m in gen_lc2 {
tx.send(m).unwrap();
}
drop(tx);
let (lcs_r, lcs_w) = evmap::new::<LifecycleId, LifecycleItem>();
let _lcs_w = parse_lifecycles_buffered_from_stream(lcs_w, rx, tx2);
println!("have {} interims lifecycles", lcs_r.len());
if let Some(a) = lcs_r.read() {
println!("have interims lifecycles");
for (id, b) in a.iter() {
println!("lcs_r content id={:?} lc={:?}", id, b);
}
// final lifecycles = interims ones that were not merged away
let mut final_lcs: std::vec::Vec<&Lifecycle> = a
.iter()
.filter(|(_id, b)| b.get_one().unwrap().was_merged().is_none())
.map(|(_id, b)| b.get_one().unwrap())
.collect();
println!("have {} final lifecycles", final_lcs.len());
final_lcs.sort_by(|a, b| a.start_time.cmp(&b.start_time));
for (i, lc) in final_lcs.iter().enumerate() {
println!("lc={:?}", lc);
match i {
0 => {
assert_eq!(lc.start_time, LC_START_TIMES[0]);
assert_eq!(lc.nr_msgs as usize, NUMBER_PER_MSG_CAT * MSG_DELAYS.len());
// end = start + last msg timestamp (incl. the 2nd profile's start delay)
assert_eq!(
lc.end_time(),
LC_START_TIMES[0]
+ ((NUMBER_PER_MSG_CAT as u64 - 1) * 1_000)
+ MSG_DELAYS[1].1
);
}
1 => {
assert_eq!(lc.start_time, LC_START_TIMES[1]);
assert_eq!(lc.nr_msgs as usize, NUMBER_PER_MSG_CAT * MSG_DELAYS.len());
assert_eq!(
lc.end_time(),
LC_START_TIMES[1]
+ ((NUMBER_PER_MSG_CAT as u64 - 1) * 1_000)
+ MSG_DELAYS[1].1
);
}
_ => {
assert_eq!(true, false, "too many lifecycles detected {}", i)
}
}
}
let interims_lcs = get_interims_lifecycles_as_hashmap(&a);
println!("have {} interims lifecycles", interims_lcs.len());
let mapped_lcs = get_mapped_lifecycles_as_hashmap(&interims_lcs);
println!("have mapped lifecycles: {:?}", mapped_lcs);
// every forwarded msg must map to a final (non-merged) lifecycle
for _i in 0..NUMBER_MSGS {
let rm = rx2.recv();
assert_eq!(rm.is_err(), false);
let m = rm.unwrap();
assert!(m.lifecycle != 0);
assert!(
mapped_lcs.get(&m.lifecycle).is_some(),
"no mapped_lcs for lc id {}",
&m.lifecycle
);
assert!(mapped_lcs.get(&m.lifecycle).unwrap().was_merged().is_none());
}
assert_eq!(rx2.recv().is_err(), true);
} else {
assert_eq!(true, false);
};
}
#[test]
fn gen_two_lcs_two_ecus() {
// like gen_two_lcs but with two ECUs interleaved by reception time:
// expects two final lifecycles per ECU (four overall)
let (tx, rx1) = channel();
let (tx1, rx) = channel();
let (tx2, rx2) = channel();
const NUMBER_PER_MSG_CAT: usize = 50;
const MSG_DELAYS: [(u64, u64); 2] = [(45_000, 0), (30_000, 10_000)];
const LC_START_TIMES: [u64; 2] = [1_000_000, 1_060_000];
const NUMBER_MSGS: usize = LC_START_TIMES.len() * NUMBER_PER_MSG_CAT * MSG_DELAYS.len();
// generate the same two lifecycles for ECUs "ABCE" and "ABCF"
for ecu in 0x45..0x47 {
let gen_lc1 = MessageGenerator::new(
LC_START_TIMES[0],
&MSG_DELAYS,
NUMBER_PER_MSG_CAT,
MessageGeneratorOptions {
ecu: DltChar4::from_buf(&[0x41, 0x42, 0x43, ecu]),
..Default::default()
},
);
for m in gen_lc1 {
tx.send(m).unwrap();
}
let gen_lc2 = MessageGenerator::new(
LC_START_TIMES[1],
&MSG_DELAYS,
NUMBER_PER_MSG_CAT,
MessageGeneratorOptions {
ecu: DltChar4::from_buf(&[0x41, 0x42, 0x43, ecu]),
..Default::default()
},
);
for m in gen_lc2 {
tx.send(m).unwrap();
}
}
drop(tx);
// interleave both ECUs by sorting all msgs by reception time before parsing
let mut sort_buffer: std::vec::Vec<DltMessage> =
std::vec::Vec::with_capacity(2 * NUMBER_MSGS);
for m in rx1 {
sort_buffer.push(m);
}
sort_buffer.sort_by(|a, b| a.reception_time_us.cmp(&b.reception_time_us));
for m in sort_buffer.into_iter() {
tx1.send(m).unwrap();
}
drop(tx1);
let (lcs_r, lcs_w) = evmap::new::<LifecycleId, LifecycleItem>();
let _lcs_w = parse_lifecycles_buffered_from_stream(lcs_w, rx, tx2);
println!("have {} interims lifecycles", lcs_r.len());
if let Some(a) = lcs_r.read() {
println!("have interims lifecycles");
for (id, b) in a.iter() {
println!("lcs_r content id={:?} lc={:?}", id, b);
}
// final lifecycles = interims ones that were not merged away
let mut final_lcs: std::vec::Vec<&Lifecycle> = a
.iter()
.filter(|(_id, b)| b.get_one().unwrap().was_merged().is_none())
.map(|(_id, b)| b.get_one().unwrap())
.collect();
println!("have {} final lifecycles", final_lcs.len());
// sort by start time, tie-break on id for a deterministic order of the
// two ECUs' concurrent lifecycles
final_lcs.sort_by(|a, b| {
if a.start_time == b.start_time {
a.id.cmp(&b.id)
} else {
a.start_time.cmp(&b.start_time)
}
});
for (i, lc) in final_lcs.iter().enumerate() {
println!("lc={:?}", lc);
match i {
0 | 1 => {
assert_eq!(lc.start_time, LC_START_TIMES[0]);
assert_eq!(lc.nr_msgs as usize, NUMBER_PER_MSG_CAT * MSG_DELAYS.len());
assert_eq!(
lc.end_time(),
LC_START_TIMES[0]
+ ((NUMBER_PER_MSG_CAT as u64 - 1) * 1_000)
+ MSG_DELAYS[1].1
);
}
2 | 3 => {
assert_eq!(lc.start_time, LC_START_TIMES[1]);
assert_eq!(lc.nr_msgs as usize, NUMBER_PER_MSG_CAT * MSG_DELAYS.len());
assert_eq!(
lc.end_time(),
LC_START_TIMES[1]
+ ((NUMBER_PER_MSG_CAT as u64 - 1) * 1_000)
+ MSG_DELAYS[1].1
);
}
_ => {
assert_eq!(true, false, "too many lifecycles detected {}", i)
}
}
}
let interims_lcs = get_interims_lifecycles_as_hashmap(&a);
println!("have {} interims lifecycles", interims_lcs.len());
let mapped_lcs = get_mapped_lifecycles_as_hashmap(&interims_lcs);
println!("have mapped lifecycles: {:?}", mapped_lcs);
// every forwarded msg must map to a final (non-merged) lifecycle
for _i in 0..2 * NUMBER_MSGS {
let rm = rx2.recv();
assert_eq!(rm.is_err(), false);
let m = rm.unwrap();
assert!(m.lifecycle != 0);
assert!(
mapped_lcs.get(&m.lifecycle).is_some(),
"no mapped_lcs for lc id {}",
&m.lifecycle
);
assert!(mapped_lcs.get(&m.lifecycle).unwrap().was_merged().is_none());
}
assert_eq!(rx2.recv().is_err(), true);
} else {
assert_eq!(true, false);
};
}
/// Test helper: a DltMessage together with the start time of its final
/// lifecycle, ordered by calculated send time (lc_start_time + timestamp).
struct SortedDltMessage {
m: DltMessage,
// start time of the msg's (final) lifecycle; used to compute the absolute send time
lc_start_time: u64,
}
impl std::cmp::PartialEq for SortedDltMessage {
    /// Delegates to `Ord::cmp` so that equality is consistent with the total
    /// order, as the `Ord`/`Eq` contracts require. (The previous version
    /// compared only the absolute send times while `cmp` tie-breaks on
    /// `timestamp_dms` within the same lifecycle, which made `eq` and `cmp`
    /// disagree for some pairs.)
    fn eq(&self, other: &Self) -> bool {
        self.cmp(other) == std::cmp::Ordering::Equal
    }
}
impl std::cmp::Ord for SortedDltMessage {
/// Orders by timestamp within the same lifecycle, otherwise by absolute
/// send time (lifecycle start + timestamp).
fn cmp(&self, other: &Self) -> std::cmp::Ordering {
if self.m.lifecycle == other.m.lifecycle {
self.m.timestamp_dms.cmp(&other.m.timestamp_dms)
} else {
let t1 = self.lc_start_time + self.m.timestamp_us();
let t2 = other.lc_start_time + other.m.timestamp_us();
t1.cmp(&t2)
}
}
}
impl std::cmp::PartialOrd for SortedDltMessage {
fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
// delegate to the total order
Some(self.cmp(other))
}
}
impl std::cmp::Eq for SortedDltMessage {}
#[test]
fn async_lc_export_sorted() {
// end-to-end pipeline: generate -> parse lifecycles -> buffer -> sort by
// calculated send time; verifies the sorted output is monotonic even while
// lifecycle info is still being updated concurrently
use crate::utils::*;
let (tx, rx) = channel();
let (tx2, rx2) = channel();
const NUMBER_PER_MSG_CAT: usize = 50;
const MSG_DELAYS: [(u64, u64); 2] = [(45_000, 0), (30_000, 10_000)];
const LC_START_TIMES: [u64; 2] = [1_000_000, 1_060_000];
// producer thread: sends both lifecycles slowly to exercise the async path
let t1 = std::thread::spawn(move || {
let gen_lc1 = MessageGenerator::new(
LC_START_TIMES[0],
&MSG_DELAYS,
NUMBER_PER_MSG_CAT,
Default::default(),
);
for m in gen_lc1 {
tx.send(m).unwrap();
std::thread::sleep(std::time::Duration::from_millis(10));
}
let gen_lc2 = MessageGenerator::new(
LC_START_TIMES[1],
&MSG_DELAYS,
NUMBER_PER_MSG_CAT,
Default::default(),
);
for m in gen_lc2 {
tx.send(m).unwrap();
std::thread::sleep(std::time::Duration::from_millis(10));
}
});
let (lcs_r, lcs_w) = evmap::new::<LifecycleId, LifecycleItem>();
let t2 = std::thread::spawn(move || parse_lifecycles_buffered_from_stream(lcs_w, rx, tx2));
let (tx3, rx3) = channel();
// buffer stage: delays msgs so the sorter below sees a fuller window
let t3 = std::thread::spawn(move || {
buffer_elements(
rx2,
tx3,
BufferElementsOptions {
amount: BufferElementsAmount::NumberElements(40),
},
)
});
// consumer: insert into a bounded sort window and assert monotonic output
let t4 = std::thread::spawn(move || {
let mut buffer = std::collections::VecDeque::<SortedDltMessage>::with_capacity(100);
let mut last_time = 0;
for m in rx3 {
// resolve the msg's final lifecycle to get its start time
let read = lcs_r.read().unwrap();
let interims_lcs = get_interims_lifecycles_as_hashmap(&read);
let lc_start_time = interims_lcs
.get(&m.lifecycle)
.unwrap()
.final_lc(&interims_lcs)
.start_time;
let s_m = SortedDltMessage {
m: m,
lc_start_time,
};
if buffer.len() == buffer.capacity() {
// window full: emit the earliest msg and check monotonicity
let s_m2 = buffer.pop_front().unwrap();
let s_m2_time = s_m2.lc_start_time + s_m2.m.timestamp_us();
println!(
"received msg with lc_start_time {} {:?}",
s_m2.lc_start_time, s_m2.m
);
assert!(
last_time <= s_m2_time,
"last_time={} vs {} with msg {:?}",
last_time,
s_m2_time,
s_m2.m
);
last_time = s_m2_time;
}
// insert sorted (binary_search uses SortedDltMessage's Ord)
let idx = buffer.binary_search(&s_m).unwrap_or_else(|x| x); buffer.insert(idx, s_m);
}
// drain the remaining window, still checking monotonicity
while buffer.len() > 0 {
let s_m2 = buffer.pop_front().unwrap();
let s_m2_time = s_m2.lc_start_time + s_m2.m.timestamp_us();
assert!(
last_time <= s_m2_time,
"last_time={} vs {}",
last_time,
s_m2_time
);
last_time = s_m2_time;
}
});
t1.join().unwrap();
let _lcs_w = t2.join().unwrap();
t3.join().unwrap();
t4.join().unwrap();
}
}