use rs1090::prelude::*;
use std::cmp::Reverse;
use std::collections::{BinaryHeap, HashMap};
use std::time::SystemTime;
use tokio::sync::mpsc;
use tracing::info;
pub async fn deduplicate_messages(
mut rx: mpsc::Receiver<TimedMessage>,
tx: mpsc::Sender<TimedMessage>,
dedup_threshold: u32,
reorder_window: u32,
) {
let mut cache: HashMap<Vec<u8>, Vec<TimedMessage>> = HashMap::new();
let mut expiration_heap: BinaryHeap<Reverse<(u128, Vec<u8>)>> =
BinaryHeap::new();
let mut emission_buffer: Vec<TimedMessage> = Vec::new();
let reorder_window_enabled = reorder_window > 0;
while let Some(msg) = rx.recv().await {
let timestamp_ms = (msg.timestamp * 1e3) as u128;
let frame = msg.frame.clone();
cache.entry(frame.clone()).or_default().push(msg);
if cache[&frame].len() == 1 {
expiration_heap.push(Reverse((
timestamp_ms + dedup_threshold as u128,
frame.clone(),
)));
}
while let Some(Reverse((next_expiration, _))) = expiration_heap.peek() {
let next_expiration = *next_expiration;
if next_expiration > timestamp_ms {
break;
}
let Reverse((_, frame)) = expiration_heap.pop().unwrap();
if let Some(mut entries) = cache.remove(&frame) {
entries.sort_by(|a, b| {
a.timestamp.partial_cmp(&b.timestamp).unwrap()
});
let merged_metadata: Vec<SensorMetadata> = entries
.iter()
.flat_map(|entry| entry.metadata.clone())
.collect();
let mut tmsg = entries.remove(0);
tmsg.metadata = merged_metadata;
let start = SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.expect("SystemTime before unix epoch")
.as_secs_f64();
if let Ok((_, msg)) = Message::from_bytes((&tmsg.frame, 0)) {
tmsg.decode_time = Some(
SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.expect("SystemTime before unix epoch")
.as_secs_f64()
- start,
);
tmsg.message = Some(msg);
if reorder_window_enabled {
emission_buffer.push(tmsg);
} else {
if let Err(e) = tx.send(tmsg).await {
info!("Failed to send deduplicated entries: {}", e);
}
}
}
}
}
if reorder_window_enabled && !emission_buffer.is_empty() {
emission_buffer
.sort_by(|a, b| a.timestamp.partial_cmp(&b.timestamp).unwrap());
while !emission_buffer.is_empty() {
let msg_timestamp_ms =
(emission_buffer[0].timestamp * 1e3) as u128;
if timestamp_ms - msg_timestamp_ms > reorder_window as u128 {
let tmsg = emission_buffer.remove(0);
if let Err(e) = tx.send(tmsg).await {
info!("Failed to send reordered message: {}", e);
}
} else {
break;
}
}
}
}
if reorder_window_enabled && !emission_buffer.is_empty() {
emission_buffer
.sort_by(|a, b| a.timestamp.partial_cmp(&b.timestamp).unwrap());
for tmsg in emission_buffer {
if let Err(e) = tx.send(tmsg).await {
info!("Failed to send final buffered message: {}", e);
}
}
}
}