adlt 0.30.0

Library and tools to handle automotive DLT- (diagnostic log and trace) files.
Documentation
// todos:
// use https://lib.rs/crates/loom for concurrency testing (atomics,...)
// use https://lib.rs/crates/lasso for string interner or
// https://lib.rs/crates/arccstr or https://lib.rs/crates/arcstr
// once_cell for one time inits.
// use chrono::{Local, TimeZone};
use crate::{
    dlt::{
        control_msgs::parse_ctrl_sw_version_payload, DltChar4, DltMessage, DltMessageIndexType,
        SERVICE_ID_GET_SOFTWARE_VERSION,
    },
    utils::US_PER_SEC,
};
use std::hash::{Hash, Hasher};
use std::sync::mpsc::{Receiver, Sender};

pub type LifecycleId = u32;
pub type LifecycleItem = Lifecycle; // Box<Lifecycle>; V needs to be Eq+Hash+ShallowCopy (and Send?)
                                    // std::cell::RefCell misses ShallowCopy (makes sense as the destr wont be called properly to determine refcounts)
                                    // std::rc::Rc misses Send
                                    // std::sync::Arc ... cannot borrow data in an Arc as mutable -> mod.rs:149
                                    // RwLock&Mutex misses ShallowCopy, Eq and Hash

fn new_lifecycle_item(lc: &Lifecycle, idx: DltMessageIndexType) -> LifecycleItem {
    let mut lc = lc.clone();
    // we have to update the max_msg_index_update here as well as
    // otherwise buffered lcs are broadcasted with a too old index
    // and thus not send via remote
    if idx > lc.max_msg_index_update {
        lc.max_msg_index_update = idx;
    }
    lc
}

#[derive(Debug, Clone)]
pub struct ResumeLcInfo {
    pub id: LifecycleId,
    max_timestamp_us: u64,
    start_time: u64,
}

#[derive(Debug, Clone)]
pub struct Lifecycle {
    /// unique id
    id: LifecycleId,
    pub ecu: DltChar4,
    /// contains the number of messages belonging to this lifecycle. `0` indicates that this lifecycle is not valid anymore, e.g. was merged into different one.
    pub nr_msgs: u32,
    /// number of control request messages. They are counted additionaly to identify lifecycles that consists of only control request messages. See [Self::only_control_requests()].
    /// # Note: control request messages are treated differently as their timestamp is from a different clock domain (usually the logging device)
    pub nr_control_req_msgs: u32,
    /// contains the start time of this lifecycle. See [Self::end_time()] as well. During processing this start_time is adjusted.
    /// # Note:
    /// This is not the reception time of the first message but the calculated start time of that lifecycle.
    ///
    /// It's determined by MIN(reception time - timestamp) of all messages.
    ///
    /// The real start time will be slightly earlier as there is a minimal buffering time that is not considered / unknown.
    ///
    pub start_time: u64, // start time in us.
    initial_start_time: u64,
    min_timestamp_us: u64, // min. timestamp of the messages assigned to this lifecycle.
    max_timestamp_us: u64, // max. timestamp of the messages assigned to this lifecycle. Used to determine end_time()
    last_reception_time: u64, // last (should be max.) reception_time (i.e. from last message)

    /// was this a resumed lifecycle from another one?
    resume_lc: Option<ResumeLcInfo>,

    /// sw version detected for this lifecycle
    /// this is parsed from the control messages GET_SW_VERSION
    pub sw_version: Option<String>,

    /// the highest/maximum index of the msg that lead to an update
    /// this can be used as a heuristics to see whether the lifecycle was changed
    /// minor (time wise) updates will not be reflected due to buffering delays
    pub max_msg_index_update: DltMessageIndexType,
}

impl evmap::ShallowCopy for Lifecycle {
    unsafe fn shallow_copy(&self) -> std::mem::ManuallyDrop<Self> {
        std::mem::ManuallyDrop::new(self.clone())
    }
}

impl PartialEq for Lifecycle {
    fn eq(&self, other: &Self) -> bool {
        self.id == other.id
    }
}
impl Eq for Lifecycle {}
impl Hash for Lifecycle {
    fn hash<H: Hasher>(&self, state: &mut H) {
        self.id.hash(state);
    }
}

/// next lifecycle id. Zero is used as "no lifecycle" so first one must start with 1
static NEXT_LC_ID: std::sync::atomic::AtomicU32 = std::sync::atomic::AtomicU32::new(1);

impl Lifecycle {
    /// returns the unique id of this lifecycle.
    /// # Note:
    /// `0` is never used. And is / can be used as "no lifecycle".
    ///
    /// Take care: lifecycle ids are unique above the overall process run time. So don't rely on the first one being 1.
    /// but we dont support a "persisted" id for filters as this would need a more complex logic for lookup.
    pub fn id(&self) -> u32 {
        self.id
    }

    /// returns the end time of this lifecycle.
    /// The end_time is the start_time plus the maximum timestamp of the messages belonging to this lifecycle.
    /// # Note:
    /// This can be either:
    /// * the time of the last log message of this lifecycle or
    /// * the time until the logs have been recorded but the lifecycle might be continued.
    pub fn end_time(&self) -> u64 {
        if self.max_timestamp_us == 0 {
            // for lifecyces without max_timestamp_us we return the last reception time
            self.last_reception_time
        } else {
            self.start_time + self.max_timestamp_us
        }
    }

    /// returns the resume time of this lifecycle in us.
    /// If no resume was detected this is equal to the start_time
    pub fn resume_time(&self) -> u64 {
        if let Some(resume_lc) = &self.resume_lc {
            self.start_time + self.min_timestamp_us
                - if resume_lc.start_time < self.start_time {
                    self.start_time - resume_lc.start_time
                } else {
                    0
                }
        } else {
            self.start_time
        }
    }

    /// returns the suspend duration of this lifecycle in us.
    /// If no resume was detected this is 0.
    /// Gets calculated by the distance of the calculated start times
    /// of this lc vs. the resumed one.
    pub fn suspend_duration(&self) -> u64 {
        if let Some(resume_lc) = &self.resume_lc {
            if resume_lc.start_time < self.start_time {
                self.start_time - resume_lc.start_time
            } else {
                0
            }
        } else {
            0
        }
    }

    /// returns whether this lifecycle contains only control request messages.
    /// ### Note: the info is wrong on merged lifecycles (we want to get rid of them anyhow)
    pub fn only_control_requests(&self) -> bool {
        self.nr_control_req_msgs >= self.nr_msgs
    }

    /// returns whether this lifecycle is a "suspend/resume" lifecycle.
    pub fn is_resume(&self) -> bool {
        self.resume_lc.is_some()
    }

    /// create a new lifecycle with the first msg passed as parameter
    pub fn new(msg: &mut DltMessage) -> Lifecycle {
        // println!("new lifecycle created by {:?}", msg);
        let is_ctrl_request = msg.is_ctrl_request();
        let timestamp_us = if is_ctrl_request {
            0
        } else {
            let tmsp = msg.timestamp_us();
            if tmsp > msg.reception_time_us {
                0 // tmsp > reception_tims_us is invalid, we ignore the tmsp!
            } else {
                tmsp
            }
        };

        let alc = Lifecycle {
            id: NEXT_LC_ID.fetch_add(1, std::sync::atomic::Ordering::Relaxed),
            ecu: msg.ecu,
            nr_msgs: 1,
            nr_control_req_msgs: u32::from(is_ctrl_request),
            start_time: msg.reception_time_us - timestamp_us,
            initial_start_time: msg.reception_time_us - timestamp_us,
            min_timestamp_us: timestamp_us,
            max_timestamp_us: timestamp_us,
            last_reception_time: msg.reception_time_us,
            resume_lc: None,
            sw_version: None, // might be wrongif the first message is a GET_SW_VERSION but we ignore this case
            max_msg_index_update: msg.index,
        };
        msg.lifecycle = alc.id;
        alc
    }

    /// merge another lifecycle into this one.
    ///
    /// The other lifecycle afterwards indicates that it was merged with [Self::was_merged()]
    pub fn merge(&mut self, lc_to_merge: &mut Lifecycle) {
        assert_ne!(lc_to_merge.nr_msgs, 0);
        self.nr_msgs += lc_to_merge.nr_msgs;
        self.nr_control_req_msgs += lc_to_merge.nr_control_req_msgs;
        lc_to_merge.nr_msgs = 0; // this indicates a merged lc
        if lc_to_merge.max_timestamp_us > self.max_timestamp_us {
            self.max_timestamp_us = lc_to_merge.max_timestamp_us;
        }
        if lc_to_merge.min_timestamp_us < self.min_timestamp_us {
            self.min_timestamp_us = lc_to_merge.min_timestamp_us;
        }
        if lc_to_merge.start_time < self.start_time {
            self.start_time = lc_to_merge.start_time;
            self.initial_start_time = lc_to_merge.initial_start_time;
        }
        if lc_to_merge.last_reception_time > self.last_reception_time {
            self.last_reception_time = lc_to_merge.last_reception_time;
        }
        // we mark this in the merged lc as max_timestamp_dms <- id
        lc_to_merge.max_timestamp_us = self.id as u64;
        lc_to_merge.start_time = u64::MAX;

        // if the lc_to_merge has a sw_version and we not, we do use that one:
        if self.sw_version.is_none() && lc_to_merge.sw_version.is_some() {
            self.sw_version = lc_to_merge.sw_version.take();
        }

        if lc_to_merge.max_msg_index_update > self.max_msg_index_update {
            self.max_msg_index_update = lc_to_merge.max_msg_index_update;
        }
    }

    /// returns whether this lifecycled was merged (so is not valid any longer) into a different lifecycle.
    /// Returns None if not merged otherwise the interims lifecycle id of the lifecycle it was merged into.
    /// # Note:
    /// Take care the returned lifecycle id is interims as well and could be or will be merged as well into another lifecycle!
    ///
    pub fn was_merged(&self) -> Option<u32> {
        if self.nr_msgs == 0 {
            Some(self.max_timestamp_us as u32)
        } else {
            None
        }
    }

    /// update the Lifecycle. If this msg doesn't seem to belong to the current one
    /// a new lifecycle is created and returned.
    /// # TODOs:
    /// * ignore/handle control messages
    /// * if the lifecycle is longer than time x (e.g. a few mins) stop adjusting starttime to reduce impact of different clock speed/skews between recorder and ecu
    ///
    pub fn update(&mut self, msg: &mut DltMessage) -> Option<Lifecycle> {
        // check whether this msg belongs to the lifecycle:
        // 0) ignore any CTRL REQUEST msgs:
        if msg.is_ctrl_request() {
            // we dont check any params but
            // simply add to this one
            msg.lifecycle = self.id;
            self.nr_msgs += 1;
            self.nr_control_req_msgs += 1;
            return None;
        }

        // 1) the calc start time needs to be no later than the current end time
        // or
        // 2) the reception_time - timestamp <= reception_time from last msg
        // rationale for 2): there must be at least a gap of timestamp_us to last message if the message is from a new lifecycle
        // 3) msg has no timestamp. This is for logs without a timestamp (e.g. from SER) where no lifecycle detection is possible.
        let msg_timestamp_us = msg.timestamp_us();
        let msg_reception_time_us = msg.reception_time_us;
        let msg_lc_start = msg.reception_time_us - msg_timestamp_us;
        let cur_end_time = self.end_time();

        let is_part_of_cur_lc = msg_lc_start <= cur_end_time
            || msg_lc_start <= self.last_reception_time
            || !msg.standard_header.has_timestamp() /*(msg_timestamp_us == 0 && self.max_timestamp_us == 0)*/;

        // resume (e.g. from Android STR) detection:
        // - a gap of >MIN_RESUME_RECEPTION_TIME_GAP s in reception time
        // - msg.timestamp ~> last_timestamp
        // - a shift in the calc start time >MIN_RESUME_RECEPTION_TIME_GAP s (otherwise it was just e.g. a logger interuption)
        // - gap in reception time > gap in calc start time (as reception time consists of suspend_time + reconnect_time)
        //
        // note: with this we might identify a new lifecycle after a short lifecycle as a resume case. We handle this later and might "unresume" the lifecycle later.

        const MIN_RESUME_RECEPTION_TIME_GAP_US: u64 = US_PER_SEC * 10;
        let is_resume = (msg_reception_time_us
            >= self.last_reception_time + MIN_RESUME_RECEPTION_TIME_GAP_US)
            && (msg_timestamp_us >= self.max_timestamp_us)
            && (msg_lc_start >= self.start_time + MIN_RESUME_RECEPTION_TIME_GAP_US)
            && (msg_reception_time_us - self.last_reception_time > msg_lc_start - self.start_time);

        if !is_resume && is_part_of_cur_lc {
            // ok belongs to this lifecycle
            if self.min_timestamp_us > msg_timestamp_us {
                if let Some(resume_lc) = &self.resume_lc {
                    if msg_timestamp_us > resume_lc.max_timestamp_us {
                        // we ignore the buffered msgs from prev lc...
                        self.min_timestamp_us = msg_timestamp_us;
                    }
                } else {
                    self.min_timestamp_us = msg_timestamp_us;
                }
            }

            if self.max_timestamp_us < msg_timestamp_us {
                self.max_timestamp_us = msg_timestamp_us;
            } else if let Some(resume_lc) = &self.resume_lc {
                // we sometimes get in a resume case prev. msgs that had been buffered before the
                // suspend.
                // Best case we'd add those to the prev lc... (todo)
                // (those msgs will have a wrong calculated time in the resume lc...)
                // For now we consider them part of the resume lc as long as their timestamp
                // is >= half of the timestamp of prev lc:
                if msg_timestamp_us
                    < (resume_lc.max_timestamp_us - (resume_lc.max_timestamp_us / 8))
                {
                    // this was not a resume but a new lifecycle as condition 2 is not met any longer
                    /*println!(
                        "update: untagged resume lifecycle with msg with timestamp diff={}\n{:?}\nLC:{:?}",
                        resume_lc.max_timestamp_us - msg_timestamp_us,
                        msg, &self
                    );*/
                    self.resume_lc = None;
                    // afterwards we might be merged into prev lc as well
                }
            }

            /* if self.last_reception_time > msg_reception_time_us {
                // bug in dlt viewer: https://github.com/COVESA/dlt-viewer/issues/232
                // println!("msg.update reception time going backwards! LC:{:?} {:?} {}", self.last_reception_time, msg.reception_time_us, msg.index);
            } */
            self.last_reception_time = msg_reception_time_us;

            // does it move the start to earlier? (e.g. has a smaller buffering delay)
            // todo this can as well be caused by clock drift. Need to add clock_drift detection/compensation.
            // the clock drift is relative to the recording devices time clock.
            /* not needed as for buffered msgs the buffering delay is larger: if let Some(resume_lc) = &self.resume_lc {
                // for a resume case we take only msgs with timestamp > resume_lc.max_timestamp into account
                // so we do ignore the msgs that had been buffered before resume
                if msg_timestamp_us > resume_lc.max_timestamp_us && msg_lc_start < self.start_time {
                    self.start_time = msg_lc_start;
                }
            } else */
            if msg_lc_start < self.start_time {
                self.start_time = msg_lc_start;
            }
            msg.lifecycle = self.id;
            self.nr_msgs += 1;
            if msg.index > self.max_msg_index_update {
                self.max_msg_index_update = msg.index;
            }

            // sw-version contained?
            if self.sw_version.is_none() && msg.is_ctrl_response() {
                let mut args = msg.into_iter();
                let message_id_arg = args.next();
                let message_id = match message_id_arg {
                    Some(a) => {
                        if a.is_big_endian {
                            u32::from_be_bytes(a.payload_raw.get(0..4).unwrap().try_into().unwrap())
                        } else {
                            u32::from_le_bytes(a.payload_raw.get(0..4).unwrap().try_into().unwrap())
                        }
                    }
                    None => 0,
                };
                if message_id == SERVICE_ID_GET_SOFTWARE_VERSION {
                    let payload_arg = args.next();
                    let (payload, is_big_endian) = match payload_arg {
                        Some(a) => (a.payload_raw, a.is_big_endian),
                        None => (&[] as &[u8], false),
                    };
                    if payload.len() >= 5 {
                        if let Some(sw_vers) =
                            parse_ctrl_sw_version_payload(is_big_endian, &payload[1..])
                        {
                            self.sw_version = Some(sw_vers);
                        }
                    }
                }
            }

            None
        } else {
            /* if is_resume && is_part_of_cur_lc
            {
                println!(
                    "update: new resume lifecycle created by\n{:?}\ntimestamp_diff={}\nlc_start_diff= {}\nrecp_time_diff={}\nLC:{:?}",
                    msg, msg_timestamp_us - self.max_timestamp_us, msg_lc_start - self.start_time,
                    msg_reception_time_us - self.last_reception_time, &self
                );
                // assert!(false, "is_resume!");
            } */
            // new lifecycle:
            let mut lc = Lifecycle::new(msg);
            if is_resume {
                lc.resume_lc = Some(ResumeLcInfo {
                    id: self.id,
                    start_time: self.start_time,
                    max_timestamp_us: self.max_timestamp_us,
                });
            }
            Some(lc)
        }
    }
}

/// Calculate lifecycles from a stream of DltMessages.
/// # Assumptions:
/// * the stream is per ecu in the order as generated by the ecu. So messages are not in random order.
/// * if the origin are multiple files they are sorted by reception time already before sending to here
///
/// The messages are passed via a stream and will be forwarded to one after processing.
/// Messages might be buffered internally and are only forwarded as soon as the lifecycle is determined.
/// Messages are not sorted and output in same order as incoming.
///
/// # Examples
/// ````
/// let (tx, rx) = std::sync::mpsc::channel();
/// let (tx2, _rx2) = std::sync::mpsc::channel();
/// // add msgs here to the tx side
/// // tx.send(msg);
/// drop(tx); // close the channel tx to indicate last msg otherwise the function wont end
/// let (_lcs_r, lcs_w) = evmap::new::<adlt::lifecycle::LifecycleId, adlt::lifecycle::LifecycleItem>();
/// let lcs_w = adlt::lifecycle::parse_lifecycles_buffered_from_stream(lcs_w, rx, tx2);
/// ````
/// # Note
/// As soon as the lcs_w is dropped the lcs_r returns no data. That's why the lcs_w is returned and
/// can be used e.g. by the caller even if a thread is spawned like
/// ````
/// let (tx, rx) = std::sync::mpsc::channel();
/// let (tx2, _rx2) = std::sync::mpsc::channel();
/// // add msgs here to the tx side
/// // tx.send(msg);
/// drop(tx); // close the channel tx to indicate last msg otherwise the function wont end
/// let (lcs_r, lcs_w) = evmap::new::<adlt::lifecycle::LifecycleId, adlt::lifecycle::LifecycleItem>();
/// let t = std::thread::spawn(move || adlt::lifecycle::parse_lifecycles_buffered_from_stream(lcs_w, rx, tx2));
/// let lcs_w = t.join().unwrap();
/// // now lcs_r still contains valid data!
/// ````
pub fn parse_lifecycles_buffered_from_stream<M, S>(
    mut lcs_w: evmap::WriteHandle<LifecycleId, LifecycleItem, M, S>,
    inflow: Receiver<DltMessage>,
    outflow: Sender<DltMessage>,
) -> evmap::WriteHandle<LifecycleId, LifecycleItem, M, S>
where
    S: std::hash::BuildHasher + Clone,
    M: 'static + Clone,
{
    let max_buffering_delay_us: u64 = 60_000_000; // 60s

    // create a map of ecu:vec<lifecycle.id>
    // we can maintain that here as we're the only one modifying the lcs_ evmap
    let mut ecu_map: std::collections::HashMap<DltChar4, Vec<Lifecycle>> =
        std::collections::HashMap::new();

    if let Some(lci) = lcs_w.read() {
        for (_id, b) in &lci {
            let lc = b.get_one().unwrap();
            match ecu_map.get_mut(&lc.ecu) {
                None => {
                    //ecu_map.insert(lc.ecu.clone(), [lc.id].to_vec());
                    ecu_map.insert(lc.ecu, [lc.clone()].to_vec());
                }
                Some(v) => v.push(lc.clone()),
            }
        }
    }

    /*
    println!("parse_lifecycles_buffered_from_stream. Have ecu_map.len={}", ecu_map.len());
    for (k, v) in &ecu_map {
        println!("Have for ecu {:?} {:?}", &k, &v);
    } */

    // we buffer all messages until we do treat the lifecycles as stable (e.g. likelhood of being merged with prev one low)
    // we buffer all messages in the same order as they arrive. So not e.g per ECU as we want to output them in the same order.
    let mut buffered_msgs: std::collections::VecDeque<DltMessage> =
        std::collections::VecDeque::with_capacity(10_000_000); // todo what is a good value to buffer at least 60s?
                                                               // the lifecycles that have a likelyhood of being merged with prev or changing start times are kept here:
    let mut buffered_lcs: std::collections::HashSet<LifecycleId> = std::collections::HashSet::new();

    // todo add check that msg.received times increase monotonically! (ignoring the dlt viewer bug)
    let mut next_buffer_check_time: u64 = 0;
    let mut merged_needed_id: LifecycleId = 0;
    let start = std::time::Instant::now();
    let mut lcs_w_needs_refresh = false;
    let mut last_msg_index: DltMessageIndexType = 0;
    for mut msg in inflow {
        /* if msg.ecu == DltChar4::from_str("ECU").unwrap() && msg.timestamp_dms > 0 {
            println!(
                "got msg:{} {:?}:{:?} {} {}",
                msg.index,
                msg.apid(),
                msg.ctid(),
                msg.reception_time_us,
                msg.timestamp_dms
            );
        }*/
        // get the lifecycles for the ecu from that msg:
        last_msg_index = msg.index;
        let msg_reception_time = msg.reception_time_us;

        let msg_timestamp_us = msg.timestamp_us();

        let ecu_lcs = ecu_map.entry(msg.ecu).or_insert_with(Vec::new);

        let ecu_lcs_len = ecu_lcs.len();
        if ecu_lcs_len > 0 {
            // get LC with that id:
            let (last_lc, rest_lcs) = ecu_lcs.as_mut_slice().split_last_mut().unwrap();
            let lc2 = last_lc;
            let mut remove_last_lc = false;
            match lc2.update(&mut msg) {
                None => {
                    // lc2 was updated
                    // now we have to check whether it overlaps with the prev. one and needs to be merged:
                    if ecu_lcs_len > 1 {
                        let prev_lc = rest_lcs.last_mut().unwrap(); // : &mut Lifecycle = &mut last_lcs[ecu_lcs_len - 2];
                        if lc2.start_time <= prev_lc.end_time() && !lc2.is_resume() {
                            // todo consider clock skew here. the earliest start time needs to be close to the prev start time and not just within...
                            //println!("merge needed:\n {:?}\n {:?}", prev_lc, lc2);
                            // we merge into the prev. one (so use the prev.one only)
                            let is_buffered = buffered_lcs.contains(&prev_lc.id);
                            if is_buffered {
                                // the buffered lcs shall be merged again (so lc2 is invalid afterwards)
                                // this is easy now:
                                prev_lc.merge(lc2);
                                msg.lifecycle = prev_lc.id;
                                // and now update the buffered msgs:
                                {
                                    buffered_msgs.iter_mut().for_each(|m| {
                                        /*println!(
                                            "modifying lifecycle from {} to {} for {:?}",
                                            lc2.id, prev_lc.id, m
                                        );*/
                                        if m.lifecycle == lc2.id {
                                            m.lifecycle = prev_lc.id;
                                        }
                                    });
                                };
                                // we can delete the buffered_lcs elem now:
                                assert!(
                                    buffered_lcs.contains(&lc2.id),
                                    "buffered_lcs does not contain {} msg:{:?}",
                                    lc2.id,
                                    msg
                                ); // logical error otherwise
                                buffered_lcs.remove(&lc2.id);
                                remove_last_lc = true;
                                // if we have no more yet, send the other msgs: (not possible as prev_lc exists)
                            } else {
                                #[allow(clippy::collapsible_else_if)]
                                if merged_needed_id != lc2.id {
                                    println!("merge needed but prev_lc not buffered anymore! (todo!):\n {:?}\n {:?} msg #{}", prev_lc, lc2, last_msg_index);
                                    merged_needed_id = lc2.id;
                                }
                                //panic!("todo shouldn't happen yet!");
                                // this is the rare case where there had been already 2 lifecycles from prev. run and now
                                // the 2nd got merged... todo think about how to handle that... as we dont want to have our callers
                                // have to support/handle interims lifecycles!
                                /*
                                prev_lc.merge(lc2);
                                lcs_w.update(prev_lc.id, *prev_lc);
                                // we will store lc2 later as the msgs still point to this one
                                // but we have to make sure that this is not ecu_lcs anymore
                                remove_last_lc = true;
                                // check whether prev_lc now overlaps with the prevprev one... todo
                                */
                            }
                        }
                    }

                    // quick fix for wrong lifecycle end/nr msgs, otherwise the next .contains_key might be wrong!
                    // happens when the lc2.id is not in buffered_lcs anymore.
                    // todo refresh logic needed, e.g. by option every x sec or every x msgs
                    if lcs_w_needs_refresh {
                        lcs_w.refresh();
                        lcs_w_needs_refresh = false;
                    }
                    if lcs_w.contains_key(&lc2.id) {
                        // this assert is met. so we can ignore the above prev_lc merge part assert!(!buffered_lcs.contains(&lc2.id));
                        // and prev_lc is still buffered as well to as well not contained.
                        // to update nr of msgs in lifecycle and end time:
                        lcs_w.update(lc2.id, new_lifecycle_item(lc2, last_msg_index));
                        lcs_w_needs_refresh = true;
                    }
                }
                Some(lc3) => {
                    // new lc was created (as calc. lc start_time was past prev lc end time)

                    // we buffer here the messages until its clear that this is really
                    // a new lifecycle and wont be merged soonish into the prev. lifecycle!
                    // this would allow us to still correct the msgs and dont have to handle the "interims" lifecycles later on!
                    // disadvantage is that we'd need to delay here the output. But we might have to do so anyhow later.
                    // can we define an upper limit on the time to delay? or some criteria on when to stop buffering?
                    // possible criteria:
                    // 1. once the new lifecycle overlaps the prev. one -> and needs a merge (see above)
                    // 2. once the new lifecycle contains messages with timestamp > x (max buffering at (start plus at runtime) buffer)
                    // println!("added lc id {} to buffered_lcs", lc3.id);
                    buffered_lcs.insert(lc3.id);
                    ecu_lcs.push(lc3);
                }
            }
            if remove_last_lc {
                let _removed = ecu_lcs.remove(ecu_lcs_len - 1);
                // assert!(!buffered_lcs.contains(&removed.id));
            }
        } else {
            // msg.ecu not known yet:
            let lc = Lifecycle::new(&mut msg);
            // even though the first lifecycle per ecu cannot disappear it still has to be buffered
            // as the 2nd lifecycle might want to merge into that one
            buffered_lcs.insert(lc.id);
            ecu_lcs.push(lc);
        }

        // if we have buffered lifecycles check whether we can stop buffering them and mark as valid ones:
        // once the lifecycle start even including a max buffering delay can not fit into the prev one any longer:
        // we do this only once per sec
        if next_buffer_check_time < msg_reception_time {
            let min_lc_start_time =
                if msg.reception_time_us > (msg_timestamp_us + max_buffering_delay_us) {
                    (msg.reception_time_us - msg_timestamp_us) - max_buffering_delay_us
                } else {
                    0
                };
            for ecu_lcs in ecu_map.values() {
                for lc in ecu_lcs.iter().rev() {
                    if !buffered_lcs.contains(&lc.id) {
                        break;
                    } else {
                        // this lc is still buffered:
                        if min_lc_start_time > lc.start_time {
                            //println!("confirmed buffered lc as min_lc_start_time {} > lc.start_time {}, confirmed lc={:?}", min_lc_start_time, lc.start_time, lc);
                            buffered_lcs.remove(&lc.id);
                            /*println!("remaining buffered_lcs={}", buffered_lcs.len());
                            for lc in &buffered_lcs {
                                println!(" buffered_lc={}", lc);
                            }*/
                            lcs_w.update(lc.id, new_lifecycle_item(lc, last_msg_index)); // update is safer and handles add case as well lcs_w.insert(lc.id, new_lifecycle_item(lc.clone()));
                            lcs_w_needs_refresh = true;

                            // if the first msg in buffered_msgs belongs to this confirmed lc
                            // then send all msgs until one msgs belongs to a buffered_lcs
                            let mut prune_lc_id = lc.id;
                            while !buffered_msgs.is_empty() {
                                let msg_lc = buffered_msgs[0].lifecycle;
                                if msg_lc == prune_lc_id {
                                    let msg = buffered_msgs.pop_front().unwrap(); // .remove(0);
                                    if lcs_w_needs_refresh {
                                        lcs_w.refresh();
                                        lcs_w_needs_refresh = false;
                                    }
                                    if let Err(e) = outflow.send(msg) {
                                        println!("parse_lifecycles_buffered_from_stream .send 1 got err={}", e);
                                        break; // exit. the receiver has stopped
                                    }
                                } else if !buffered_lcs.contains(&msg_lc) {
                                    prune_lc_id = msg_lc;
                                    // and we can delete right away
                                    let msg = buffered_msgs.pop_front().unwrap(); // .remove(0);
                                    if lcs_w_needs_refresh {
                                        lcs_w.refresh();
                                        lcs_w_needs_refresh = false;
                                    }
                                    if let Err(e) = outflow.send(msg) {
                                        println!("parse_lifecycles_buffered_from_stream .send 2 got err={}", e);
                                        break;
                                    }
                                } else {
                                    break;
                                }
                            }
                        }
                    }
                }
                next_buffer_check_time = msg_reception_time + US_PER_SEC; // in 1s again
            }
        }

        // pass msg to outflow only if we dont have buffered lcs:
        if !buffered_lcs.is_empty() {
            buffered_msgs.push_back(msg);
        } else {
            // todo slog... println!("sending non-buffered_msg {:?}", msg);
            if lcs_w_needs_refresh {
                lcs_w.refresh();
                lcs_w_needs_refresh = false;
            }
            if let Err(e) = outflow.send(msg) {
                println!(
                    "parse_lifecycles_buffered_from_stream .send 3 got err={}",
                    e
                );
                break;
            }
        }
    }

    // if we have still buffered lcs we have to make them valid now:
    //println!("adding {} buffered_lcs to lcs_w at end", buffered_lcs.len());
    for lc_id in buffered_lcs {
        'outer: for vs in ecu_map.values() {
            for v in vs {
                if v.id == lc_id {
                    lcs_w.update(lc_id, new_lifecycle_item(v, last_msg_index));
                    // println!("lcs_w content added at end id={:?} lc={:?}", lc_id, *v);
                    break 'outer;
                }
            }
        }
    }
    lcs_w.refresh();

    // if we have buffered msgs we have to output them now:
    for m in buffered_msgs.into_iter() {
        // println!("sending buffered_msg {:?}", m);
        if let Err(e) = outflow.send(m) {
            println!(
                "parse_lifecycles_buffered_from_stream .send 4 got err={}",
                e
            );
            break;
        }
    }

    /*
    println!(
        "After processing stream: Have ecu_map.len={}",
        ecu_map.len()
    );
    for (k, v) in &ecu_map {
        println!("Have for ecu {:?} {:?}", &k, &v);
    }

    for a in lcs_w.read().iter() {
        println!("lcs_w a...");
        for (id, b) in a {
            println!("lcs_w content id={:?} lc={:?}", id, b);
        }
    }*/
    let duration = start.elapsed();
    if duration > std::time::Duration::from_millis(1) {
        // println!("parse_lifecycles_buffered_from_stream took {:?}", duration);
    }
    lcs_w
}

/// return a vector of lifecycles sorted by start_time
/// asserts if an interims lifecycle is contained!
/// todo add example
pub fn get_sorted_lifecycles_as_vec<'a, M, S>(
    lcr: &'a evmap::MapReadRef<LifecycleId, LifecycleItem, M, S>,
) -> std::vec::Vec<&'a Lifecycle>
where
    S: std::hash::BuildHasher + Clone,
    M: 'static + Clone,
{
    let mut sorted_lcs: std::vec::Vec<&'a Lifecycle> = lcr
        .iter()
        .map(|(id, b)| {
            let lc = b.get_one().unwrap();
            assert_eq!(&lc.id, id);
            lc
        })
        .collect();
    sorted_lcs.sort_by(|a, b| a.start_time.cmp(&b.start_time));
    sorted_lcs
}

#[cfg(test)]
mod tests {
    //use super::*;
    use crate::dlt::*;
    use crate::lifecycle::*;
    use crate::utils::get_dlt_message_iterator;
    use crate::utils::LowMarkBufReader;
    use ntest::timeout;
    use std::fs::File;
    use std::str::FromStr;
    use std::sync::mpsc::channel;
    use std::time::Instant;
    extern crate nohash_hasher;
    #[test]
    fn one_ecu() {
        let (tx, rx) = channel();
        const NUMBER_ITERATIONS: usize = 2_000_000;
        let start = Instant::now();
        for _ in 0..NUMBER_ITERATIONS {
            tx.send(crate::dlt::DltMessage::for_test()).unwrap();
        }
        let duration = start.elapsed();
        println!(
            "Time elapsed sending {}msgs is: {:?}",
            NUMBER_ITERATIONS, duration
        );
        let (tx2, rx2) = channel();
        drop(tx);
        let (lcs_r, lcs_w) = evmap::Options::default()
            .with_hasher(nohash_hasher::BuildNoHashHasher::<LifecycleId>::default())
            .construct::<LifecycleId, LifecycleItem>(); //  evmap::new::<u32, Box<Lifecycle>>();
        let start = Instant::now();
        let t = std::thread::spawn(move || parse_lifecycles_buffered_from_stream(lcs_w, rx, tx2));
        if let Some(a) = lcs_r.read() {
            println!("lcs_r content before join {:?}", a);
        }
        let lcs_w = t.join().unwrap();
        let duration = start.elapsed();
        println!(
            "Time elapsed parse_lifecycles {}msgs is: {:?}",
            NUMBER_ITERATIONS, duration
        );
        // all messages should be passed on
        let start = Instant::now();
        {
            let read_handle = lcs_r.read();
            assert!(read_handle.is_some());
            let read_handle = read_handle.unwrap();
            for i in 0..NUMBER_ITERATIONS {
                // check whether all msgs have a lifecycle:
                let m = rx2.recv();
                assert!(m.is_ok(), "{}th message missing", i + 1);
                let msg = m.unwrap();
                assert_ne!(msg.lifecycle, 0, "{}th message without lifecycle", i + 1);
                // check that the lifecycle is known as well: (this seems time consuming! around if omitted 90ms instead of 180ms)
                //let l = lcs_r.get_one(&msg.lifecycle);
                // using the read_handle its a lot faster: 106ms instead of 180ms/90ms
                let l = read_handle.get_one(&msg.lifecycle);
                assert!(l.is_some());
            }
        }
        let duration = start.elapsed();
        println!(
            "Time elapsed reading/verifying {}msgs is: {:?}",
            NUMBER_ITERATIONS, duration
        );
        assert!(rx2.recv().is_err());
        // and lifecycle info be available
        if let Some(a) = lcs_r.read() {
            println!("lcs_r content {:?}", a);
        }
        if let Some(a) = lcs_w.read() {
            for (id, b) in &a {
                println!("lcs_w2 content id={:?} lc={:?}", id, b);
            }
        }
        assert!(!lcs_r.is_empty(), "empty lcs!");
        assert_eq!(lcs_r.len(), 1, "wrong number of lcs!");
    }
    #[test]
    fn basics() {
        let (tx, rx) = channel();
        let (tx2, rx2) = channel();
        drop(tx);
        let (_lcs_r, lcs_w) = evmap::new::<LifecycleId, LifecycleItem>();
        parse_lifecycles_buffered_from_stream(lcs_w, rx, tx2);
        assert!(rx2.recv().is_err());
    }
    #[test]
    fn basics_read_in_different_thread() {
        let (tx, rx) = channel();
        let (tx2, rx2) = channel();
        drop(tx);
        let (lcs_r, lcs_w) = evmap::new::<LifecycleId, LifecycleItem>();
        parse_lifecycles_buffered_from_stream(lcs_w, rx, tx2);
        assert!(rx2.recv().is_err());
        let r = lcs_r;
        let t = std::thread::spawn(move || {
            if let Some(a) = r.read() {
                println!("r content {:?}", a);
            }
            assert_eq!(r.len(), 0);
        });
        t.join().unwrap();
    }

    #[test]
    fn lc_invalid_msg_timestamps() {
        // lifecycle use case 1:
        // one long lifecycle but with timestamp_dms all 0 (e.g. from Dlt-Viewer SER/ASC)
        let (tx, parse_lc_in) = channel();
        // 3 lifecycle messages:
        let mut m1 = crate::dlt::DltMessage::for_test();
        m1.timestamp_dms += 40_000 * 10; // that should now be invalid!
        assert!(m1.timestamp_us() > m1.reception_time_us);

        tx.send(m1).unwrap();
        drop(tx);
        let (parse_lc_out, _rx) = channel();
        let (lcs_r, lcs_w) = evmap::new::<LifecycleId, LifecycleItem>();
        let _lcs_w = parse_lifecycles_buffered_from_stream(lcs_w, parse_lc_in, parse_lc_out);
        assert_eq!(1, lcs_r.len(), "wrong number of lcs!");
        // todo
    }

    #[test]
    fn lc_uc_1() {
        // lifecycle use case 1:
        // one long lifecycle but with timestamp_dms all 0 (e.g. from Dlt-Viewer SER/ASC)
        // todo
    }

    #[test]
    fn lc_uc_2() {
        // lifecycle use case 2:
        // one lc but first message is with higher timestamp, then smaller ones
        // todo
    }

    #[test]
    fn lc_uc_3() {
        // lifecycle use case 3:
        // two lc with each first message is with higher timestamp, then smaller ones
        // todo
    }

    #[test]
    fn lc_uc_4() {
        // lifecycle use case 4:
        // three small lifecycles each 40s (<60s max_buffering_delay_us) long
        let (tx, parse_lc_in) = channel();
        // 3 lifecycle messages:
        let mut m1 = crate::dlt::DltMessage::for_test();
        m1.timestamp_dms = 40_000 * 10; // 40s
        m1.reception_time_us = m1.timestamp_us() + 1_000_000_000;

        let mut m2 = crate::dlt::DltMessage::for_test();
        m2.timestamp_dms = 40_000 * 10;
        m2.reception_time_us = m2.timestamp_us() + m1.reception_time_us + m1.timestamp_us() + 1;

        let mut m3 = crate::dlt::DltMessage::for_test();
        m3.timestamp_dms = 40_000 * 10;
        m3.reception_time_us = m3.timestamp_us() + m2.reception_time_us + m2.timestamp_us() + 1;

        tx.send(m1).unwrap();
        tx.send(m2).unwrap();
        tx.send(m3).unwrap();
        drop(tx);
        let (parse_lc_out, _rx) = channel();
        let (lcs_r, lcs_w) = evmap::new::<LifecycleId, LifecycleItem>();
        let _lcs_w = parse_lifecycles_buffered_from_stream(lcs_w, parse_lc_in, parse_lc_out);
        assert_eq!(3, lcs_r.len(), "wrong number of lcs!");
    }

    #[test]
    #[timeout(1000)]
    fn lc_uc_4_buf_delay_easy() {
        // lifecycle use case 4:
        // three small lifecycles each 70s (>60s max_buffering_delay_us) long, 1 one needs to be streamed from parse_lifecycles_buffered_from_stream while stream is not done
        let (tx, parse_lc_in) = channel();
        // 3 lifecycle messages:

        let m1 = crate::dlt::DltMessage::for_test_rcv_tms_ms(70_000, 50_000); // 20s buf delay
        tx.send(m1).unwrap();
        let m1 = crate::dlt::DltMessage::for_test_rcv_tms_ms(70_000, 70_000); // 0s buf delay
        tx.send(m1).unwrap();

        // next lifecycle starts 1ms after end of prev one (so at timestamp 70.001s)
        let m1 = crate::dlt::DltMessage::for_test_rcv_tms_ms(70_000 + 70_001, 50_000); // 20s buf delay
        tx.send(m1).unwrap();
        let m1 = crate::dlt::DltMessage::for_test_rcv_tms_ms(140_001, 70_000); // 0s buf delay
        tx.send(m1).unwrap();

        let m1 = crate::dlt::DltMessage::for_test_rcv_tms_ms(140_001 + 70_001, 50_000); // 20s buf delay
        tx.send(m1).unwrap();
        let m1 = crate::dlt::DltMessage::for_test_rcv_tms_ms(210_002, 70_000); // 0s buf delay
        tx.send(m1).unwrap();

        let (parse_lc_out, rx) = channel();
        let (lcs_r, lcs_w) = evmap::new::<LifecycleId, LifecycleItem>();

        let t = std::thread::spawn(move || {
            for _ in 0..4 {
                // 4 messages can be received. two from first and the two from second lc
                assert!(rx.recv().is_ok()); // one msg can be received
            }
            drop(tx);
            rx
        });

        let _lcs_w = parse_lifecycles_buffered_from_stream(lcs_w, parse_lc_in, parse_lc_out);
        // wait until one message could be received
        let _rx = t.join().unwrap(); // need result to avoid channel being closed too early!

        assert_eq!(3, lcs_r.len(), "wrong number of lcs!");
    }
    #[test]
    #[timeout(1000)]
    fn lc_uc_4_buf_delay_easy2() {
        // lifecycle use case 4:
        // three small lifecycles each 70s (>60s max_buffering_delay_us) long, 1 one needs to be streamed from parse_lifecycles_buffered_from_stream while stream is not done
        let (tx, parse_lc_in) = channel();
        // 3 lifecycle messages:

        let m1 = crate::dlt::DltMessage::for_test_rcv_tms_ms(70_000, 70_000); // 0s buf delay
        tx.send(m1).unwrap();

        // next lifecycle starts 1ms after end of prev one (so at timestamp 70.001s)
        let m1 = crate::dlt::DltMessage::for_test_rcv_tms_ms(140_001, 70_000); // 0s buf delay
        tx.send(m1).unwrap();

        let m1 = crate::dlt::DltMessage::for_test_rcv_tms_ms(210_002, 70_000); // 0s buf delay
        tx.send(m1).unwrap();

        let (parse_lc_out, rx) = channel();
        let (lcs_r, lcs_w) = evmap::new::<LifecycleId, LifecycleItem>();

        let t = std::thread::spawn(move || {
            for _ in 0..2 {
                // 2 messages can be received. one from first and one from second lc
                assert!(rx.recv().is_ok()); // one msg can be received
            }
            drop(tx);
            rx
        });

        let _lcs_w = parse_lifecycles_buffered_from_stream(lcs_w, parse_lc_in, parse_lc_out);
        // wait until one message could be received
        let _rx = t.join().unwrap(); // need result to avoid channel being closed too early!

        assert_eq!(3, lcs_r.len(), "wrong number of lcs!");
    }

    #[test]
    #[timeout(1000)]
    fn lc_uc_4_buf_delay_small() {
        // lifecycle use case 4:
        // four small lifecycles each 40s (<60s max_buffering_delay_us) long, 1 one needs to be streamed from parse_lifecycles_buffered_from_stream while stream is not done
        // this is a bit trickier than the lc_uc_4_buf_delay_easy as the lifecycles itself are not confirmed directly
        // 4 lifecycle messages:
        let (tx, parse_lc_in) = channel();

        let m1 = crate::dlt::DltMessage::for_test_rcv_tms_ms(40_000, 20_000); // 20s buf delay
        tx.send(m1).unwrap();
        let m1 = crate::dlt::DltMessage::for_test_rcv_tms_ms(40_000, 40_000); // 0s buf delay
        tx.send(m1).unwrap();

        // next lifecycle starts 1ms after end of prev one (so at timestamp 40.001s)
        let m1 = crate::dlt::DltMessage::for_test_rcv_tms_ms(40_000 + 40_001, 20_000); // 20s buf delay
        tx.send(m1).unwrap();
        let m1 = crate::dlt::DltMessage::for_test_rcv_tms_ms(80_001, 40_000); // 0s buf delay
        tx.send(m1).unwrap();

        let m1 = crate::dlt::DltMessage::for_test_rcv_tms_ms(80_001 + 40_001, 20_000); // 20s buf delay
        tx.send(m1).unwrap();
        let m1 = crate::dlt::DltMessage::for_test_rcv_tms_ms(120_002, 40_000); // 0s buf delay
        tx.send(m1).unwrap();

        let m1 = crate::dlt::DltMessage::for_test_rcv_tms_ms(120_002 + 40_001, 20_000); // 20s buf delay
        tx.send(m1).unwrap();
        let m1 = crate::dlt::DltMessage::for_test_rcv_tms_ms(160_003, 40_000); // 0s buf delay
        tx.send(m1).unwrap();

        let (parse_lc_out, rx) = channel();
        let (lcs_r, lcs_w) = evmap::new::<LifecycleId, LifecycleItem>();

        let t = std::thread::spawn(move || {
            for _ in 0..6 {
                // 4 messages can be received. two from first LC and two from 2nd lc and two from 3rd
                assert!(rx.recv().is_ok()); // one msg can be received
            }
            assert!(rx
                .recv_timeout(std::time::Duration::from_millis(10))
                .is_err());
            drop(tx);
            rx
        });

        let _lcs_w = parse_lifecycles_buffered_from_stream(lcs_w, parse_lc_in, parse_lc_out);
        // wait until one message could be received
        let rx = t.join().unwrap(); // need result to avoid channel being closed too early!
        assert!(rx.try_recv().is_ok()); // now msgs can be recvd
        assert_eq!(4, lcs_r.len(), "wrong number of lcs!");
    }
    #[test]
    #[timeout(1000)]
    fn lc_uc_4_buf_delay_small2() {
        // lifecycle use case 4:
        // four small lifecycles each 40s (<60s max_buffering_delay_us) long, 1 one needs to be streamed from parse_lifecycles_buffered_from_stream while stream is not done
        // this is a bit trickier than the lc_uc_4_buf_delay_easy as the lifecycles itself are not confirmed directly
        // 4 lifecycle messages:
        let (tx, parse_lc_in) = channel();

        let m1 = crate::dlt::DltMessage::for_test_rcv_tms_ms(40_000, 40_000); // 0s buf delay
        tx.send(m1).unwrap();

        // next lifecycle starts 1ms after end of prev one (so at timestamp 40.001s)
        let m1 = crate::dlt::DltMessage::for_test_rcv_tms_ms(80_001, 40_000); // 0s buf delay
        tx.send(m1).unwrap();

        let m1 = crate::dlt::DltMessage::for_test_rcv_tms_ms(120_002, 40_000); // 0s buf delay
        tx.send(m1).unwrap();

        let m1 = crate::dlt::DltMessage::for_test_rcv_tms_ms(160_003, 40_000); // 0s buf delay
        tx.send(m1).unwrap();

        let (parse_lc_out, rx) = channel();
        let (lcs_r, lcs_w) = evmap::new::<LifecycleId, LifecycleItem>();

        let t = std::thread::spawn(move || {
            for _ in 0..2 {
                // 2 messages can be received. one from first LC and one from 2nd lc
                assert!(rx.recv().is_ok()); // one msg can be received
            }
            assert!(rx
                .recv_timeout(std::time::Duration::from_millis(10))
                .is_err());
            drop(tx);
            rx
        });

        let _lcs_w = parse_lifecycles_buffered_from_stream(lcs_w, parse_lc_in, parse_lc_out);
        // wait until one message could be received
        let rx = t.join().unwrap(); // need result to avoid channel being closed too early!
        assert!(rx.try_recv().is_ok());

        assert_eq!(4, lcs_r.len(), "wrong number of lcs!");
    }

    #[test]
    fn lc_merge_1() {
        // test where 2nd lc gets merged into first
        let (tx, parse_lc_in) = channel();
        // 0s buffering delay assumed, lc start at 0
        tx.send(DltMessage::for_test_rcv_tms_ms(1_000, 1_000))
            .unwrap();
        // 1.5s buffering delay  -> but could be a new lifecycle as well with lc start at 1.5
        tx.send(DltMessage::for_test_rcv_tms_ms(2_000, 500))
            .unwrap();
        // 1s buffering delay
        tx.send(DltMessage::for_test_rcv_tms_ms(2_500, 1_500))
            .unwrap();

        drop(tx);
        let (parse_lc_out, _rx) = channel();
        let (lcs_r, lcs_w) = evmap::new::<LifecycleId, LifecycleItem>();
        let _lcs_w = parse_lifecycles_buffered_from_stream(lcs_w, parse_lc_in, parse_lc_out);
        assert_eq!(1, lcs_r.len(), "wrong number of lcs: {:?}", lcs_r.read());
    }

    #[test]
    fn lc_merge_2() {
        // test where 3rd lc gets merged into 2nd lc
        let (tx, parse_lc_in) = channel();
        // 0s buffering delay assumed, lc start at 0
        tx.send(DltMessage::for_test_rcv_tms_ms(1_000, 1_000))
            .unwrap();

        // new lc with 0s buf delay assumed, lc start at 2_000
        tx.send(DltMessage::for_test_rcv_tms_ms(3_000, 1_000))
            .unwrap();

        // 1.5s buffering delay  -> but could be a new lifecycle as well with lc start at 3.5
        tx.send(DltMessage::for_test_rcv_tms_ms(4_000, 500))
            .unwrap();
        // 1s buffering delay
        tx.send(DltMessage::for_test_rcv_tms_ms(4_500, 1_500))
            .unwrap();

        drop(tx);
        let (parse_lc_out, _rx) = channel();
        let (lcs_r, lcs_w) = evmap::new::<LifecycleId, LifecycleItem>();
        let _lcs_w = parse_lifecycles_buffered_from_stream(lcs_w, parse_lc_in, parse_lc_out);
        assert_eq!(2, lcs_r.len(), "wrong number of lcs!");
    }

    #[test]
    fn lc_merge_3() {
        // test where 3rd lc gets merged into 2nd lc and then into 1st lc
        let (tx, parse_lc_in) = channel();
        // 0s buffering delay assumed, lc start at 0
        tx.send(DltMessage::for_test_rcv_tms_ms(1_000, 1_000))
            .unwrap();

        // 2s buffering delay  -> but could be a new lifecycle as well with lc start at 2
        tx.send(DltMessage::for_test_rcv_tms_ms(3_000, 1_000))
            .unwrap();

        // 2s buffering delay  -> but could be a new lifecycle as well with lc start at 4
        tx.send(DltMessage::for_test_rcv_tms_ms(5_000, 1_00))
            .unwrap();
        // 1s buffering delay -> now we should have just one lifecycle
        tx.send(DltMessage::for_test_rcv_tms_ms(5_500, 4_500))
            .unwrap();
        // todo bug: but we currently need a 2nd message to trigger next merge
        tx.send(DltMessage::for_test_rcv_tms_ms(5_501, 4_501))
            .unwrap();

        drop(tx);
        let (parse_lc_out, _rx) = channel();
        let (lcs_r, lcs_w) = evmap::new::<LifecycleId, LifecycleItem>();
        let _lcs_w = parse_lifecycles_buffered_from_stream(lcs_w, parse_lc_in, parse_lc_out);
        assert_eq!(1, lcs_r.len(), "wrong number of lcs!");
    }

    /// a generator for messages to ease test scenarios for lifecycles
    struct MessageGenerator {
        msgs: std::vec::Vec<DltMessage>,
    }
    struct MessageGeneratorOptions {
        frequency: u64,
        ecu: DltChar4,
    }
    impl Default for MessageGeneratorOptions {
        fn default() -> Self {
            MessageGeneratorOptions {
                frequency: 1_000,
                ecu: DltChar4::from_buf(&[0x41, 0x42, 0x43, 0x45]),
            }
        }
    }

    impl MessageGenerator {
        fn new(
            lc_start_time: u64,
            initial_delays: &[(u64, u64)],
            nr_msgs: usize,
            options: MessageGeneratorOptions,
        ) -> MessageGenerator {
            let mut msgs: std::vec::Vec<DltMessage> = std::vec::Vec::new();
            for (buf_delay, start_delay) in initial_delays {
                for i in 0..nr_msgs {
                    let timestamp_us = start_delay + ((i as u64) * options.frequency); // frequency
                    let min_send_time = std::cmp::max(buf_delay + (i as u64), timestamp_us);
                    msgs.push(DltMessage {
                        index: i as crate::dlt::DltMessageIndexType,
                        reception_time_us: lc_start_time + min_send_time,
                        timestamp_dms: (timestamp_us / 100) as u32,
                        lifecycle: 0,
                        ecu: options.ecu,
                        standard_header: crate::dlt::DltStandardHeader {
                            htyp: DLT_STD_HDR_HAS_TIMESTAMP,
                            len: 0,
                            mcnt: 0,
                        },
                        extended_header: None,
                        payload: [].to_vec(),
                        payload_text: None,
                    });
                }
            }
            // sort msgs by reception time
            msgs.sort_by(|a, b| a.reception_time_us.cmp(&b.reception_time_us));
            MessageGenerator { msgs }
        }
    }

    impl Iterator for MessageGenerator {
        type Item = DltMessage;
        fn next(&mut self) -> Option<Self::Item> {
            // Check to see if we've finished counting or not.
            if !self.msgs.is_empty() {
                Some(self.msgs.remove(0))
            } else {
                None
            }
        }
    }

    #[test]
    fn gen_two_lcs() {
        let (tx, rx) = channel();
        let (tx2, rx2) = channel();
        const NUMBER_PER_MSG_CAT: usize = 50;
        const MSG_DELAYS: [(u64, u64); 2] = [(45_000, 0), (30_000, 10_000)];
        const LC_START_TIMES: [u64; 2] = [1_000_000, 1_060_000];
        const NUMBER_MSGS: usize = LC_START_TIMES.len() * NUMBER_PER_MSG_CAT * MSG_DELAYS.len();
        let gen_lc1 = MessageGenerator::new(
            LC_START_TIMES[0],
            &MSG_DELAYS,
            NUMBER_PER_MSG_CAT,
            Default::default(),
        );
        for m in gen_lc1 {
            tx.send(m).unwrap();
        }
        let gen_lc2 = MessageGenerator::new(
            LC_START_TIMES[1],
            &MSG_DELAYS,
            NUMBER_PER_MSG_CAT,
            Default::default(),
        );
        for m in gen_lc2 {
            tx.send(m).unwrap();
        }
        drop(tx);
        let (lcs_r, lcs_w) = evmap::new::<LifecycleId, LifecycleItem>();
        let _lcs_w = parse_lifecycles_buffered_from_stream(lcs_w, rx, tx2);
        // now check the lifecycles:
        println!("have {} interims lifecycles", lcs_r.len());
        if let Some(a) = lcs_r.read() {
            println!("have interims lifecycles");
            for (id, b) in a.iter() {
                println!("lcs_r content id={:?} lc={:?}", id, b);
            }
            // view on final lifecycles:
            let mut final_lcs: std::vec::Vec<&Lifecycle> = a
                .iter()
                .filter(|(_id, b)| b.get_one().unwrap().was_merged().is_none())
                .map(|(_id, b)| b.get_one().unwrap())
                .collect();
            println!("have {} final lifecycles", final_lcs.len());
            final_lcs.sort_by(|a, b| a.start_time.cmp(&b.start_time));
            for (i, lc) in final_lcs.iter().enumerate() {
                println!("lc={:?}", lc);
                match i {
                    0 => {
                        assert_eq!(lc.start_time, LC_START_TIMES[0]);
                        assert_eq!(lc.nr_msgs as usize, NUMBER_PER_MSG_CAT * MSG_DELAYS.len());
                        assert_eq!(
                            lc.end_time(),
                            LC_START_TIMES[0]
                                + ((NUMBER_PER_MSG_CAT as u64 - 1) * 1_000)
                                + MSG_DELAYS[1].1
                        );
                    }
                    1 => {
                        assert_eq!(lc.start_time, LC_START_TIMES[1]);
                        assert_eq!(lc.nr_msgs as usize, NUMBER_PER_MSG_CAT * MSG_DELAYS.len());
                        assert_eq!(
                            lc.end_time(),
                            LC_START_TIMES[1]
                                + ((NUMBER_PER_MSG_CAT as u64 - 1) * 1_000)
                                + MSG_DELAYS[1].1
                        );
                    }
                    _ => {
                        assert_eq!(true, false, "too many lifecycles detected {}", i)
                    }
                }
            }

            // now check whether each message has a valid lifecycle in mapped_lcs:
            // the msg has only an interims lifecycle id which might point to a
            // lifecycle that has been merged into a different one later on
            // or will be merged later on.
            // We could modify the msg as well to point to the final lc.id but
            // for streaming that doesn't really work as the ids might change later
            // so using the mapped lifecycles gives the current view
            for _i in 0..NUMBER_MSGS {
                let rm = rx2.recv();
                assert!(rm.is_ok());
                let m = rm.unwrap();
                assert!(m.lifecycle != 0);
                assert!(
                    final_lcs.iter().any(|x| x.id == m.lifecycle),
                    "no mapped_lcs for lc id {}",
                    &m.lifecycle
                );
                assert!(final_lcs
                    .iter()
                    .find(|x| x.id == m.lifecycle)
                    .unwrap()
                    .was_merged()
                    .is_none());
                //println!("got msg:{:?}", rm.unwrap());
            }
            assert!(rx2.recv().is_err());
        } else {
            assert_eq!(true, false);
        };
    }
    #[test]
    fn gen_two_lcs_two_ecus() {
        let (tx, rx1) = channel();
        let (tx1, rx) = channel();
        let (tx2, rx2) = channel();
        const NUMBER_PER_MSG_CAT: usize = 50;
        const MSG_DELAYS: [(u64, u64); 2] = [(45_000, 0), (30_000, 10_000)];
        const LC_START_TIMES: [u64; 2] = [1_000_000, 1_060_000];
        const NUMBER_MSGS: usize = LC_START_TIMES.len() * NUMBER_PER_MSG_CAT * MSG_DELAYS.len();
        for ecu in 0x45..0x47 {
            let gen_lc1 = MessageGenerator::new(
                LC_START_TIMES[0],
                &MSG_DELAYS,
                NUMBER_PER_MSG_CAT,
                MessageGeneratorOptions {
                    ecu: DltChar4::from_buf(&[0x41, 0x42, 0x43, ecu]),
                    ..Default::default()
                },
            );
            for m in gen_lc1 {
                tx.send(m).unwrap();
            }
            let gen_lc2 = MessageGenerator::new(
                LC_START_TIMES[1],
                &MSG_DELAYS,
                NUMBER_PER_MSG_CAT,
                MessageGeneratorOptions {
                    ecu: DltChar4::from_buf(&[0x41, 0x42, 0x43, ecu]),
                    ..Default::default()
                },
            );
            for m in gen_lc2 {
                tx.send(m).unwrap();
            }
        }
        drop(tx);
        // need to sort again: (buffer_sort_elements is somewhat unuseable...)
        let mut sort_buffer: std::vec::Vec<DltMessage> =
            std::vec::Vec::with_capacity(2 * NUMBER_MSGS);
        for m in rx1 {
            sort_buffer.push(m);
        }
        sort_buffer.sort_by(|a, b| a.reception_time_us.cmp(&b.reception_time_us));
        for m in sort_buffer.into_iter() {
            tx1.send(m).unwrap();
        }
        drop(tx1);

        let (lcs_r, lcs_w) = evmap::new::<LifecycleId, LifecycleItem>();
        let _lcs_w = parse_lifecycles_buffered_from_stream(lcs_w, rx, tx2);
        // now check the lifecycles:
        println!("have {} interims lifecycles", lcs_r.len());
        if let Some(a) = lcs_r.read() {
            println!("have interims lifecycles");
            for (id, b) in a.iter() {
                println!("lcs_r content id={:?} lc={:?}", id, b);
            }
            // view on final lifecycles:
            let mut final_lcs: std::vec::Vec<&Lifecycle> = a
                .iter()
                .filter(|(_id, b)| b.get_one().unwrap().was_merged().is_none())
                .map(|(_id, b)| b.get_one().unwrap())
                .collect();
            println!("have {} final lifecycles", final_lcs.len());
            final_lcs.sort_by(|a, b| {
                if a.start_time == b.start_time {
                    a.id.cmp(&b.id)
                } else {
                    a.start_time.cmp(&b.start_time)
                }
            });
            for (i, lc) in final_lcs.iter().enumerate() {
                println!("lc={:?}", lc);
                match i {
                    0 | 1 => {
                        assert_eq!(lc.start_time, LC_START_TIMES[0]);
                        assert_eq!(lc.nr_msgs as usize, NUMBER_PER_MSG_CAT * MSG_DELAYS.len());
                        assert_eq!(
                            lc.end_time(),
                            LC_START_TIMES[0]
                                + ((NUMBER_PER_MSG_CAT as u64 - 1) * 1_000)
                                + MSG_DELAYS[1].1
                        );
                    }
                    2 | 3 => {
                        assert_eq!(lc.start_time, LC_START_TIMES[1]);
                        assert_eq!(lc.nr_msgs as usize, NUMBER_PER_MSG_CAT * MSG_DELAYS.len());
                        assert_eq!(
                            lc.end_time(),
                            LC_START_TIMES[1]
                                + ((NUMBER_PER_MSG_CAT as u64 - 1) * 1_000)
                                + MSG_DELAYS[1].1
                        );
                    }
                    _ => {
                        assert_eq!(true, false, "too many lifecycles detected {}", i)
                    }
                }
            }

            // now check whether each message has a valid lifecycle in mapped_lcs:
            // the msg has only an interims lifecycle id which might point to a
            // lifecycle that has been merged into a different one later on
            // or will be merged later on.
            // We could modify the msg as well to point to the final lc.id but
            // for streaming that doesn't really work as the ids might change later
            // so using the mapped lifecycles gives the current view
            for _i in 0..2 * NUMBER_MSGS {
                let rm = rx2.recv();
                assert!(rm.is_ok());
                let m = rm.unwrap();
                assert!(m.lifecycle != 0);
                assert!(
                    final_lcs.iter().any(|x| x.id == m.lifecycle),
                    "no mapped_lcs for lc id {}",
                    &m.lifecycle
                );
                assert!(final_lcs
                    .iter()
                    .find(|x| x.id == m.lifecycle)
                    .unwrap()
                    .was_merged()
                    .is_none());
            }
            assert!(rx2.recv().is_err());
        } else {
            assert_eq!(true, false);
        };
    }

    struct SortedDltMessage {
        m: DltMessage,
        lc_start_time: u64,
    }
    impl std::cmp::PartialEq for SortedDltMessage {
        fn eq(&self, other: &Self) -> bool {
            self.lc_start_time + self.m.timestamp_us()
                == other.lc_start_time + other.m.timestamp_us()
        }
    }
    impl std::cmp::Ord for SortedDltMessage {
        fn cmp(&self, other: &Self) -> std::cmp::Ordering {
            if self.m.lifecycle == other.m.lifecycle {
                self.m.timestamp_dms.cmp(&other.m.timestamp_dms)
            } else {
                let t1 = self.lc_start_time + self.m.timestamp_us();
                let t2 = other.lc_start_time + other.m.timestamp_us();
                t1.cmp(&t2)
            }
        }
    }
    impl std::cmp::PartialOrd for SortedDltMessage {
        fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
            Some(self.cmp(other))
        }
    }
    impl std::cmp::Eq for SortedDltMessage {}

    #[test]
    fn async_lc_export_sorted() {
        use crate::utils::*;
        // lets try to model a real use-case:
        // export sorted messages async from a stream
        let (tx, rx) = channel();
        let (tx2, rx2) = channel();
        const NUMBER_PER_MSG_CAT: usize = 50;
        const MSG_DELAYS: [(u64, u64); 2] = [(45_000, 0), (30_000, 10_000)];
        const LC_START_TIMES: [u64; 2] = [1_000_000, 1_060_000];
        // const NUMBER_MSGS: usize = LC_START_TIMES.len() * NUMBER_PER_MSG_CAT * MSG_DELAYS.len();
        let t1 = std::thread::spawn(move || {
            let gen_lc1 = MessageGenerator::new(
                LC_START_TIMES[0],
                &MSG_DELAYS,
                NUMBER_PER_MSG_CAT,
                Default::default(),
            );
            for m in gen_lc1 {
                tx.send(m).unwrap();
                std::thread::sleep(std::time::Duration::from_millis(10));
            }
            let gen_lc2 = MessageGenerator::new(
                LC_START_TIMES[1],
                &MSG_DELAYS,
                NUMBER_PER_MSG_CAT,
                Default::default(),
            );
            for m in gen_lc2 {
                tx.send(m).unwrap();
                std::thread::sleep(std::time::Duration::from_millis(10));
            }
            // not needed as done autom. drop(tx);
        });
        let (lcs_r, lcs_w) = evmap::new::<LifecycleId, LifecycleItem>();
        let t2 = std::thread::spawn(move || parse_lifecycles_buffered_from_stream(lcs_w, rx, tx2));
        // now we need to buffer/delay the messages, to let the lcs settle a bit
        let (tx3, rx3) = channel();
        let t3 = std::thread::spawn(move || {
            buffer_elements(
                rx2,
                tx3,
                BufferElementsOptions {
                    amount: BufferElementsAmount::NumberElements(40),
                },
            )
        });
        // now sort them:
        let t4 = std::thread::spawn(move || {
            let mut buffer = std::collections::VecDeque::<SortedDltMessage>::with_capacity(100);
            let mut last_time = 0;
            for m in rx3 {
                // todo that's still poor... slow...
                // we need to get the .read() but while keeping it we do block the writer...
                // we need to get a view on the current lifecycles:
                let read = lcs_r.read().unwrap();
                let interims_lcs = read.get_one(&m.lifecycle);
                let lc_start_time = interims_lcs.unwrap().start_time;
                let s_m = SortedDltMessage { m, lc_start_time };
                if buffer.len() == buffer.capacity() {
                    let s_m2 = buffer.pop_front().unwrap();
                    // todo verify
                    let s_m2_time = s_m2.lc_start_time + s_m2.m.timestamp_us();
                    println!(
                        "received msg with lc_start_time {} {:?}",
                        s_m2.lc_start_time, s_m2.m
                    );
                    assert!(
                        last_time <= s_m2_time,
                        "last_time={} vs {} with msg {:?}",
                        last_time,
                        s_m2_time,
                        s_m2.m
                    );
                    last_time = s_m2_time;
                }
                let idx = buffer.binary_search(&s_m).unwrap_or_else(|x| x); // todo this is not stable!
                buffer.insert(idx, s_m);
            }
            while !buffer.is_empty() {
                let s_m2 = buffer.pop_front().unwrap();
                // todo verify
                let s_m2_time = s_m2.lc_start_time + s_m2.m.timestamp_us();
                assert!(
                    last_time <= s_m2_time,
                    "last_time={} vs {}",
                    last_time,
                    s_m2_time
                );
                last_time = s_m2_time;
            }
        });

        t1.join().unwrap();
        let _lcs_w = t2.join().unwrap();
        t3.join().unwrap();
        t4.join().unwrap();
    }

    /// return a control message
    fn get_testmsg_control(big_endian: bool, noar: u8, payload_buf: &[u8]) -> DltMessage {
        let sh = DltStorageHeader {
            secs: 0,
            micros: 0,
            ecu: DltChar4::from_str("ECU1").unwrap(),
        };
        let exth = DltExtendedHeader {
            verb_mstp_mtin: 0x3 << 1 | (0x02 << 4),
            noar,
            apid: DltChar4::from_buf(b"DA1\0"),
            ctid: DltChar4::from_buf(b"DC1\0"),
        };
        let stdh = DltStandardHeader {
            htyp: 0x21
                | (if big_endian {
                    DLT_STD_HDR_BIG_ENDIAN
                } else {
                    0
                }),
            mcnt: 0,
            len: (DLT_MIN_STD_HEADER_SIZE + DLT_EXT_HEADER_SIZE + payload_buf.len()) as u16,
        };
        let mut add_header_buf = Vec::new();
        exth.to_write(&mut add_header_buf).unwrap();

        DltMessage::from_headers(1, sh, stdh, &add_header_buf, payload_buf.to_vec())
    }

    #[test]
    fn sw_version() {
        let mut m = get_testmsg_control(
            false,
            1,
            &[19, 0, 0, 0, 0, 4, 0, 0, 0, b'S', b'W', b' ', b'1'],
        );
        let mut m2 = get_testmsg_control(
            false,
            1,
            &[19, 0, 0, 0, 0, 4, 0, 0, 0, b'S', b'W', b' ', b'2'],
        );
        let mut lc = Lifecycle::new(&mut m);
        assert!(lc.sw_version.is_none()); // this is weird but currently accepted impl.
        lc.update(&mut m2);
        assert!(lc.sw_version.is_some());
        assert_eq!(lc.sw_version.unwrap(), "SW 2");
    }

    fn nr_lcs_for_file(file_name: &str) -> (DltMessageIndexType, usize) {
        let (tx_for_parse_thread, rx_from_parse_thread) = channel();
        let (lcs_r, lcs_w) = evmap::new::<LifecycleId, LifecycleItem>();

        let (tx_for_lc_thread, rx_from_lc_thread) = channel();
        let lc_thread = std::thread::spawn(move || {
            parse_lifecycles_buffered_from_stream(lcs_w, rx_from_parse_thread, tx_for_lc_thread)
        });
        let junk_thread = std::thread::spawn(move || for _msg in rx_from_lc_thread {});

        let fi = File::open(file_name).unwrap();
        const BUFREADER_CAPACITY: usize = 512 * 1024;
        let mut messages_processed: DltMessageIndexType = 0;
        let buf_reader = LowMarkBufReader::new(fi, BUFREADER_CAPACITY, DLT_MAX_STORAGE_MSG_SIZE);
        let mut it = get_dlt_message_iterator(
            std::path::Path::new(file_name)
                .extension()
                .and_then(|s| s.to_str())
                .unwrap_or(""),
            messages_processed,
            buf_reader,
            None,
        );
        for msg in it.by_ref() {
            messages_processed += 1;
            tx_for_parse_thread.send(msg).unwrap(); // todo handle error
        }
        drop(tx_for_parse_thread);

        // wait for the threads
        junk_thread.join().unwrap();
        let _lcs_w = lc_thread.join().unwrap();
        let nr_lcs = if let Some(a) = lcs_r.read() {
            let sorted_lcs = get_sorted_lifecycles_as_vec(&a);
            sorted_lcs.len()
        } else {
            0
        };

        (messages_processed, nr_lcs)
    }

    fn get_tests_filename(file_name: &str) -> std::path::PathBuf {
        let mut test_dir = std::path::PathBuf::from(env!("CARGO_MANIFEST_DIR"));
        test_dir.push("tests");
        test_dir.push(file_name);
        test_dir
    }

    #[test]
    fn lc_ex001() {
        // lc_ex001 should have 426464 msgs and 26 lifecycles:
        let ex001 = get_tests_filename("lc_ex001.dlt");
        if ex001.exists() {
            assert_eq!(nr_lcs_for_file(&ex001.to_string_lossy()), (426464, 26));
        } else {
            // consider adding to github...
            println!("skipped test lc_ex001 as file not available!");
        }
    }

    #[test]
    fn lc_ex002() {
        // very short lifecycle that start with timestamp 0
        assert_eq!(
            nr_lcs_for_file(&get_tests_filename("lc_ex002.dlt").to_string_lossy()),
            (11696, 4)
        );
    }
    #[test]
    fn lc_ex003() {
        // timestamp not available on all msgs -> 1 LC
        assert_eq!(
            nr_lcs_for_file(&get_tests_filename("lc_ex003.dlt").to_string_lossy()),
            (8045, 1)
        );
    }
}