use std::sync::mpsc::{Receiver, Sender};
pub enum BufferElementsAmount {
NumberElements(usize),
}
pub struct BufferElementsOptions {
pub amount: BufferElementsAmount,
}
pub fn buffer_elements<T>(inflow: Receiver<T>, outflow: Sender<T>, options: BufferElementsOptions) {
match options.amount {
BufferElementsAmount::NumberElements(number_elems) => {
let mut buffer = std::collections::VecDeque::<T>::with_capacity(number_elems);
for e in inflow {
if buffer.len() == number_elems {
outflow.send(buffer.pop_front().unwrap()).unwrap(); }
buffer.push_back(e);
}
for e in buffer.into_iter() {
outflow.send(e).unwrap();
}
}
}
}
pub fn buffer_sort_elements<T>(
inflow: Receiver<T>,
outflow: Sender<T>,
options: BufferElementsOptions,
) where
T: std::cmp::Ord,
{
match options.amount {
BufferElementsAmount::NumberElements(number_elems) => {
let mut buffer = std::collections::VecDeque::<T>::with_capacity(number_elems);
for e in inflow {
if buffer.len() == number_elems {
outflow.send(buffer.pop_front().unwrap()).unwrap();
}
let idx = buffer.binary_search(&e).unwrap_or_else(|x| x); buffer.insert(idx, e);
}
for e in buffer.into_iter() {
outflow.send(e).unwrap();
}
}
}
}
#[cfg(test)]
mod tests {
use crate::utils::*;
use std::sync::mpsc::channel;
use crate::{dlt::DltMessage};
#[test]
fn buffer_messages() {
let (tx, rx) = channel();
const NUMBER_MSGS: usize = 1_000;
for _ in 0..NUMBER_MSGS {
tx.send(DltMessage::for_test()).unwrap();
}
let (tx2, rx2) = channel();
let t = std::thread::spawn(move || {
buffer_elements(
rx,
tx2,
BufferElementsOptions {
amount: BufferElementsAmount::NumberElements(NUMBER_MSGS),
},
)
});
assert!(rx2
.recv_timeout(std::time::Duration::from_millis(50))
.is_err());
for _ in 0..NUMBER_MSGS {
tx.send(DltMessage::for_test()).unwrap();
}
let mut last_time_stamp = 0;
for i in 0..NUMBER_MSGS {
let mr = rx2.recv_timeout(std::time::Duration::from_millis(50));
assert!(mr.is_ok(), "failed to get msg#{}", i);
let m = mr.unwrap();
assert!(
m.timestamp_dms > last_time_stamp,
"msg#{} has wrong order/time_stamp! {} vs. exp. > {}",
i,
m.timestamp_dms,
last_time_stamp
);
last_time_stamp = m.timestamp_dms;
}
assert!(rx2
.recv_timeout(std::time::Duration::from_millis(50))
.is_err());
drop(tx);
t.join().unwrap();
for i in 0..NUMBER_MSGS {
let mr = rx2.recv();
assert!(mr.is_ok(), "failed to get last msg#{}", i);
let m = mr.unwrap();
assert!(
m.timestamp_dms > last_time_stamp,
"msg#{} has wrong order/time_stamp! {} vs. exp. > {}",
NUMBER_MSGS + i,
m.timestamp_dms,
last_time_stamp
);
last_time_stamp = m.timestamp_dms;
}
assert!(rx2
.recv_timeout(std::time::Duration::from_millis(50))
.is_err());
}
struct SortedMsg(DltMessage);
impl std::cmp::Ord for SortedMsg {
fn cmp(&self, other: &Self) -> std::cmp::Ordering {
self.0.timestamp_dms.cmp(&other.0.timestamp_dms)
}
}
impl std::cmp::PartialOrd for SortedMsg {
fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
Some(self.0.timestamp_dms.cmp(&other.0.timestamp_dms))
}
}
impl PartialEq for SortedMsg {
fn eq(&self, other: &Self) -> bool {
self.0.timestamp_dms == other.0.timestamp_dms
}
}
impl Eq for SortedMsg {}
impl From<DltMessage> for SortedMsg {
fn from(msg: DltMessage) -> Self {
Self(msg)
}
}
#[test]
fn buffer_sort_messages() {
let (tx, rx) = channel();
const NUMBER_MSGS: usize = 1_000;
let mut msgs: std::vec::Vec<SortedMsg> = std::vec::Vec::with_capacity(NUMBER_MSGS);
for _ in 0..NUMBER_MSGS {
msgs.push(SortedMsg::from(crate::dlt::DltMessage::for_test()));
}
msgs.reverse();
let mut last_time_stamp = u32::MAX;
for m in msgs {
assert!(
m.0.timestamp_dms <= last_time_stamp,
"msg has wrong order/time_stamp! {} vs. exp. > {}",
m.0.timestamp_dms,
last_time_stamp
);
last_time_stamp = m.0.timestamp_dms;
tx.send(m).unwrap();
}
let (tx2, rx2) = channel();
let t = std::thread::spawn(move || {
buffer_sort_elements(
rx,
tx2,
BufferElementsOptions {
amount: BufferElementsAmount::NumberElements(NUMBER_MSGS),
},
)
});
assert!(rx2
.recv_timeout(std::time::Duration::from_millis(50))
.is_err());
drop(tx);
let mut last_time_stamp = 0;
for i in 0..NUMBER_MSGS {
let mr = rx2.recv_timeout(std::time::Duration::from_millis(50));
assert!(mr.is_ok(), "failed to get msg#{}", i);
let m = mr.unwrap().0;
assert!(
m.timestamp_dms > last_time_stamp,
"msg#{} has wrong order/time_stamp! {} vs. exp. > {}",
i,
m.timestamp_dms,
last_time_stamp
);
last_time_stamp = m.timestamp_dms;
}
assert!(rx2
.recv_timeout(std::time::Duration::from_millis(50))
.is_err());
t.join().unwrap();
assert!(rx2
.recv_timeout(std::time::Duration::from_millis(50))
.is_err());
}
}