use std::{
cmp::{Ord, Ordering},
hash::Hasher,
slice::{Iter, IterMut},
};
use chrono::{DateTime, Utc};
use crate::{data::Data, id_hash::IdHash, packet::PacketId, utils::current_timestamp};
#[derive(Clone, Debug)]
pub struct DataSet {
pub timestamp: DateTime<Utc>,
set: Vec<Data>,
}
impl DataSet {
pub fn new() -> DataSet {
DataSet {
timestamp: current_timestamp(),
set: Vec::new(),
}
}
pub fn from_data(timestamp: DateTime<Utc>, set: Vec<Data>) -> DataSet {
DataSet { timestamp, set }
}
pub fn len(&self) -> usize {
self.set.len()
}
pub fn is_empty(&self) -> bool {
self.set.is_empty()
}
pub fn as_data_slice(&self) -> &[Data] {
&self.set[..]
}
pub fn add_data(&mut self, data: Data) {
let timestamp = data.as_header().timestamp;
let position = self.set.iter().position(|d| d.eq(&data));
match position {
Some(index) => self.set[index] = data,
None => self.set.push(data),
};
if self.timestamp < timestamp {
self.timestamp = timestamp;
}
}
pub fn add_data_set(&mut self, data_set: DataSet) {
let timestamp = data_set.timestamp;
for data in data_set.set.into_iter() {
self.add_data(data);
}
if self.timestamp < timestamp {
self.timestamp = timestamp;
}
}
pub fn remove_all_data(&mut self) {
self.set.clear();
}
pub fn remove_data_older_than(&mut self, min_timestamp: DateTime<Utc>) {
self.set
.retain(|data| data.as_header().timestamp >= min_timestamp);
}
pub fn clear_all_packets(&mut self) {
for data in self.set.iter_mut() {
if let Data::Packet(ref mut packet) = *data {
if packet.frame_count > 0 {
packet.frame_count = 0;
}
}
}
}
pub fn clear_packets_older_than(&mut self, min_timestamp: DateTime<Utc>) {
for data in self.set.iter_mut() {
if let Data::Packet(ref mut packet) = *data {
if packet.header.timestamp < min_timestamp && packet.frame_count > 0 {
packet.frame_count = 0;
}
}
}
}
pub fn iter(&self) -> Iter<'_, Data> {
self.set.iter()
}
pub fn iter_mut(&mut self) -> IterMut<'_, Data> {
self.set.iter_mut()
}
pub fn sort(&mut self) {
self.set.sort_by(|l, r| l.partial_cmp(r).unwrap());
}
pub fn sort_by<F>(&mut self, f: F)
where
F: FnMut(&Data, &Data) -> Ordering,
{
self.set.sort_by(f);
}
pub fn sort_by_id_slice(&mut self, ids: &[PacketId]) {
self.sort_by(|l, r| {
if l.is_packet() && r.is_packet() {
let l_id = l.as_packet().packet_id();
let r_id = r.as_packet().packet_id();
let l_pos = ids.iter().position(|id| *id == l_id);
let r_pos = ids.iter().position(|id| *id == r_id);
if l_pos.is_some() && r_pos.is_some() {
l_pos.cmp(&r_pos)
} else if l_pos.is_some() {
Ordering::Less
} else if r_pos.is_some() {
Ordering::Greater
} else {
l_id.cmp(&r_id)
}
} else if l.is_packet() {
Ordering::Less
} else if r.is_packet() {
Ordering::Greater
} else {
Ordering::Equal
}
})
}
}
impl IdHash for DataSet {
fn id_hash<H: Hasher>(&self, h: &mut H) {
for data in self.set.iter() {
data.id_hash(h);
}
}
}
impl Default for DataSet {
fn default() -> DataSet {
DataSet::new()
}
}
impl AsRef<[Data]> for DataSet {
fn as_ref(&self) -> &[Data] {
&self.set
}
}
#[cfg(test)]
mod tests {
use super::*;
use chrono::{Duration, TimeZone};
use crate::{
id_hash::id_hash,
live_data_decoder::data_from_checked_bytes,
test_data::{LIVE_DATA_1, LIVE_TELEGRAM_1},
};
#[test]
fn test_add_data() {
let timestamp = Utc.timestamp(1485688933, 0);
let channel = 0x11;
let packet_data = data_from_checked_bytes(timestamp, channel, &LIVE_DATA_1[0..]);
let dgram_data = data_from_checked_bytes(timestamp, channel, &LIVE_DATA_1[352..]);
let tgram_data = data_from_checked_bytes(timestamp, channel, &LIVE_TELEGRAM_1[0..]);
let mut data_set = DataSet::new();
data_set.timestamp = Utc.timestamp(0, 0);
assert_eq!(0, data_set.as_data_slice().len());
data_set.add_data(packet_data.clone());
assert_eq!(timestamp, data_set.timestamp);
assert_eq!(1, data_set.as_data_slice().len());
assert_eq!(
"11_0010_7E11_10_0100",
data_set.as_data_slice()[0].id_string()
);
let other_timestamp = timestamp + Duration::seconds(1);
let data = data_from_checked_bytes(other_timestamp, channel, &LIVE_DATA_1[0..]);
data_set.add_data(data);
assert_eq!(other_timestamp, data_set.timestamp);
assert_eq!(1, data_set.as_data_slice().len());
assert_eq!(
"11_0010_7E11_10_0100",
data_set.as_data_slice()[0].id_string()
);
let other_channel = channel + 1;
let data = data_from_checked_bytes(timestamp, other_channel, &LIVE_DATA_1[0..]);
data_set.add_data(data);
assert_eq!(other_timestamp, data_set.timestamp);
assert_eq!(2, data_set.as_data_slice().len());
assert_eq!(
"11_0010_7E11_10_0100",
data_set.as_data_slice()[0].id_string()
);
assert_eq!(
"12_0010_7E11_10_0100",
data_set.as_data_slice()[1].id_string()
);
data_set.add_data(dgram_data.clone());
assert_eq!(other_timestamp, data_set.timestamp);
assert_eq!(3, data_set.as_data_slice().len());
assert_eq!(
"11_0010_7E11_10_0100",
data_set.as_data_slice()[0].id_string()
);
assert_eq!(
"12_0010_7E11_10_0100",
data_set.as_data_slice()[1].id_string()
);
assert_eq!(
"11_0000_7E11_20_0500_0000",
data_set.as_data_slice()[2].id_string()
);
data_set.add_data(tgram_data.clone());
assert_eq!(other_timestamp, data_set.timestamp);
assert_eq!(4, data_set.as_data_slice().len());
assert_eq!(
"11_0010_7E11_10_0100",
data_set.as_data_slice()[0].id_string()
);
assert_eq!(
"12_0010_7E11_10_0100",
data_set.as_data_slice()[1].id_string()
);
assert_eq!(
"11_0000_7E11_20_0500_0000",
data_set.as_data_slice()[2].id_string()
);
assert_eq!(
"11_7771_2011_30_25",
data_set.as_data_slice()[3].id_string()
);
}
#[test]
fn test_add_data_set() {
let timestamp = Utc.timestamp(1485688933, 0);
let channel = 0x11;
let mut data_set = DataSet::new();
data_set.timestamp = Utc.timestamp(0, 0);
data_set.add_data(data_from_checked_bytes(
timestamp,
channel,
&LIVE_DATA_1[0..],
));
data_set.add_data(data_from_checked_bytes(
timestamp,
channel,
&LIVE_DATA_1[352..],
));
data_set.add_data(data_from_checked_bytes(
timestamp,
channel,
&LIVE_TELEGRAM_1[0..],
));
let mut other_data_set = DataSet::new();
other_data_set.timestamp = Utc.timestamp(0, 0);
other_data_set.add_data_set(data_set);
assert_eq!(timestamp, other_data_set.timestamp);
assert_eq!(3, other_data_set.as_data_slice().len());
assert_eq!(
"11_0010_7E11_10_0100",
other_data_set.as_data_slice()[0].id_string()
);
assert_eq!(
"11_0000_7E11_20_0500_0000",
other_data_set.as_data_slice()[1].id_string()
);
assert_eq!(
"11_7771_2011_30_25",
other_data_set.as_data_slice()[2].id_string()
);
}
#[test]
fn test_remove_data_older_than() {
let timestamp = Utc.timestamp(1485688933, 0);
let channel = 0x11;
let mut data_set = DataSet::new();
data_set.timestamp = Utc.timestamp(0, 0);
data_set.add_data(data_from_checked_bytes(
timestamp + Duration::seconds(10),
channel,
&LIVE_DATA_1[0..],
));
data_set.add_data(data_from_checked_bytes(
timestamp + Duration::seconds(20),
channel,
&LIVE_DATA_1[352..],
));
data_set.add_data(data_from_checked_bytes(
timestamp + Duration::seconds(30),
channel,
&LIVE_TELEGRAM_1[0..],
));
data_set.remove_data_older_than(timestamp + Duration::seconds(20));
assert_eq!(timestamp + Duration::seconds(30), data_set.timestamp);
assert_eq!(2, data_set.as_data_slice().len());
assert_eq!(
"11_0000_7E11_20_0500_0000",
data_set.as_data_slice()[0].id_string()
);
assert_eq!(
"11_7771_2011_30_25",
data_set.as_data_slice()[1].id_string()
);
}
#[test]
fn test_clear_packets_older_than() {
let timestamp = Utc.timestamp(1485688933, 0);
let channel = 0x11;
let mut data_set = DataSet::new();
data_set.timestamp = Utc.timestamp(0, 0);
data_set.add_data(data_from_checked_bytes(
timestamp + Duration::seconds(10),
channel,
&LIVE_DATA_1[0..],
));
data_set.add_data(data_from_checked_bytes(
timestamp + Duration::seconds(20),
channel,
&LIVE_DATA_1[352..],
));
data_set.add_data(data_from_checked_bytes(
timestamp + Duration::seconds(30),
channel,
&LIVE_TELEGRAM_1[0..],
));
data_set.clear_packets_older_than(timestamp + Duration::seconds(20));
assert_eq!(timestamp + Duration::seconds(30), data_set.timestamp);
let data_slice = data_set.as_data_slice();
assert_eq!(3, data_slice.len());
assert_eq!("11_0010_7E11_10_0100", data_slice[0].id_string());
if let Data::Packet(ref packet) = data_slice[0] {
assert_eq!(0, packet.frame_count);
} else {
panic!("First element should have been a packet");
}
assert_eq!("11_0000_7E11_20_0500_0000", data_slice[1].id_string());
assert_eq!("11_7771_2011_30_25", data_slice[2].id_string());
}
#[test]
fn test_sort() {
let timestamp = Utc.timestamp(1485688933, 0);
let channel = 0x11;
let mut data_set = DataSet::new();
data_set.timestamp = Utc.timestamp(0, 0);
data_set.add_data(data_from_checked_bytes(
timestamp,
channel + 1,
&LIVE_DATA_1[0..],
));
data_set.add_data(data_from_checked_bytes(
timestamp,
channel,
&LIVE_DATA_1[352..],
));
data_set.add_data(data_from_checked_bytes(
timestamp,
channel,
&LIVE_DATA_1[0..],
));
data_set.add_data(data_from_checked_bytes(
timestamp,
channel,
&LIVE_TELEGRAM_1[0..],
));
data_set.add_data(data_from_checked_bytes(
timestamp,
channel,
&LIVE_DATA_1[258..],
));
data_set.add_data(data_from_checked_bytes(
timestamp,
channel,
&LIVE_DATA_1[242..],
));
data_set.add_data(data_from_checked_bytes(
timestamp,
channel,
&LIVE_DATA_1[172..],
));
assert_eq!(7, data_set.as_data_slice().len());
assert_eq!(
"12_0010_7E11_10_0100",
data_set.as_data_slice()[0].id_string()
);
assert_eq!(
"11_0000_7E11_20_0500_0000",
data_set.as_data_slice()[1].id_string()
);
assert_eq!(
"11_0010_7E11_10_0100",
data_set.as_data_slice()[2].id_string()
);
assert_eq!(
"11_7771_2011_30_25",
data_set.as_data_slice()[3].id_string()
);
assert_eq!(
"11_6651_7E11_10_0200",
data_set.as_data_slice()[4].id_string()
);
assert_eq!(
"11_0010_7E22_10_0100",
data_set.as_data_slice()[5].id_string()
);
assert_eq!(
"11_0015_7E11_10_0100",
data_set.as_data_slice()[6].id_string()
);
data_set.sort();
assert_eq!(7, data_set.as_data_slice().len());
assert_eq!(
"11_0000_7E11_20_0500_0000",
data_set.as_data_slice()[0].id_string()
);
assert_eq!(
"11_0010_7E11_10_0100",
data_set.as_data_slice()[1].id_string()
);
assert_eq!(
"11_0010_7E22_10_0100",
data_set.as_data_slice()[2].id_string()
);
assert_eq!(
"11_0015_7E11_10_0100",
data_set.as_data_slice()[3].id_string()
);
assert_eq!(
"11_6651_7E11_10_0200",
data_set.as_data_slice()[4].id_string()
);
assert_eq!(
"11_7771_2011_30_25",
data_set.as_data_slice()[5].id_string()
);
assert_eq!(
"12_0010_7E11_10_0100",
data_set.as_data_slice()[6].id_string()
);
}
#[test]
fn test_sort_by() {
let timestamp = Utc.timestamp(1485688933, 0);
let channel = 0x11;
let mut data_set = DataSet::new();
data_set.timestamp = Utc.timestamp(0, 0);
data_set.add_data(data_from_checked_bytes(
timestamp,
channel + 1,
&LIVE_DATA_1[0..],
));
data_set.add_data(data_from_checked_bytes(
timestamp,
channel,
&LIVE_DATA_1[352..],
));
data_set.add_data(data_from_checked_bytes(
timestamp,
channel,
&LIVE_DATA_1[0..],
));
data_set.add_data(data_from_checked_bytes(
timestamp,
channel,
&LIVE_TELEGRAM_1[0..],
));
data_set.add_data(data_from_checked_bytes(
timestamp,
channel,
&LIVE_DATA_1[258..],
));
data_set.add_data(data_from_checked_bytes(
timestamp,
channel,
&LIVE_DATA_1[242..],
));
data_set.add_data(data_from_checked_bytes(
timestamp,
channel,
&LIVE_DATA_1[172..],
));
assert_eq!(7, data_set.as_data_slice().len());
assert_eq!(
"12_0010_7E11_10_0100",
data_set.as_data_slice()[0].id_string()
);
assert_eq!(
"11_0000_7E11_20_0500_0000",
data_set.as_data_slice()[1].id_string()
);
assert_eq!(
"11_0010_7E11_10_0100",
data_set.as_data_slice()[2].id_string()
);
assert_eq!(
"11_7771_2011_30_25",
data_set.as_data_slice()[3].id_string()
);
assert_eq!(
"11_6651_7E11_10_0200",
data_set.as_data_slice()[4].id_string()
);
assert_eq!(
"11_0010_7E22_10_0100",
data_set.as_data_slice()[5].id_string()
);
assert_eq!(
"11_0015_7E11_10_0100",
data_set.as_data_slice()[6].id_string()
);
data_set.sort_by(|l, r| {
let l_id = &l.id_string()[8..];
let r_id = &r.id_string()[8..];
l_id.cmp(r_id)
});
assert_eq!(7, data_set.as_data_slice().len());
assert_eq!(
"11_7771_2011_30_25",
data_set.as_data_slice()[0].id_string()
);
assert_eq!(
"12_0010_7E11_10_0100",
data_set.as_data_slice()[1].id_string()
);
assert_eq!(
"11_0010_7E11_10_0100",
data_set.as_data_slice()[2].id_string()
);
assert_eq!(
"11_0015_7E11_10_0100",
data_set.as_data_slice()[3].id_string()
);
assert_eq!(
"11_6651_7E11_10_0200",
data_set.as_data_slice()[4].id_string()
);
assert_eq!(
"11_0000_7E11_20_0500_0000",
data_set.as_data_slice()[5].id_string()
);
assert_eq!(
"11_0010_7E22_10_0100",
data_set.as_data_slice()[6].id_string()
);
}
#[test]
fn test_id_hash() {
let timestamp = Utc.timestamp(1485688933, 0);
let channel = 0x11;
let mut data_set = DataSet::new();
data_set.timestamp = Utc.timestamp(0, 0);
data_set.add_data(data_from_checked_bytes(
timestamp,
channel,
&LIVE_DATA_1[0..],
));
data_set.add_data(data_from_checked_bytes(
timestamp,
channel,
&LIVE_DATA_1[352..],
));
data_set.add_data(data_from_checked_bytes(
timestamp,
channel,
&LIVE_TELEGRAM_1[0..],
));
let result = id_hash(&data_set);
assert_eq!(13725728793204414233, result);
}
}