use crate::core::types::RawEntry;
use std::collections::hash_map::Entry;
use std::collections::HashMap;
/// An entry that can be collapsed with other copies of the same message.
///
/// The accumulator groups entries by `(dedup_scope, message_id)` and, within a
/// group, prefers the newest copy that reports a stop reason.
pub(crate) trait Deduplicatable {
    /// Identifier shared by every copy of the same message; `None` if the entry
    /// cannot be matched against other entries.
    fn message_id(&self) -> Option<&str>;

    /// Timestamp used to order copies of the same message (milliseconds, per
    /// the method name).
    fn timestamp_ms(&self) -> i64;

    /// Whether this entry carries a stop reason; such entries are treated as
    /// completed copies by the selection logic.
    fn has_stop_reason(&self) -> bool;

    /// Optional namespace for `message_id`: ids only collide within one scope.
    ///
    /// Defaults to `None`, which the accumulator treats as the empty scope.
    fn dedup_scope(&self) -> Option<&str> {
        Option::None
    }
}
/// Raw entries are deduplicated per session: a message id is only compared
/// against other entries that share the same `session_key`.
impl Deduplicatable for RawEntry {
    fn message_id(&self) -> Option<&str> {
        self.message_id.as_deref()
    }

    fn timestamp_ms(&self) -> i64 {
        self.timestamp_ms
    }

    fn has_stop_reason(&self) -> bool {
        matches!(self.stop_reason, Some(_))
    }

    fn dedup_scope(&self) -> Option<&str> {
        let scope: &str = &self.session_key;
        Some(scope)
    }
}
/// Per-key selection state: the newest entry seen, plus a separately retained
/// completed entry that is not `latest`.
///
/// The `T: Deduplicatable` bound lives on the `impl` blocks that need it rather
/// than on the struct definition (Rust API guidelines, C-STRUCT-BOUNDS).
#[derive(Debug)]
struct CandidateState<T> {
    /// Completed entry retained outside `latest`, if any.
    best_completed: Option<T>,
    /// Entry with the greatest timestamp seen so far; an incoming entry with an
    /// equal timestamp does not displace it.
    latest: T,
}
/// Selection logic for a single `(scope, message_id)` group.
///
/// NOTE(review): invariants below are derived from reading `update`, not from a
/// written spec — keep them in sync if the logic changes:
/// - the newest completed timestamp seen so far equals the max of
///   `best_completed`'s timestamp and `latest`'s timestamp (the latter only if
///   `latest` has a stop reason);
/// - `best_completed`, when present, is never newer than `latest`.
impl<T: Deduplicatable> CandidateState<T> {
    /// Starts a group whose only (and therefore latest) candidate is `entry`.
    fn new(entry: T) -> Self {
        Self {
            best_completed: None,
            latest: entry,
        }
    }
    /// Timestamp of the newest completed entry currently held, looking at both
    /// `best_completed` and `latest` (the latter only if it has a stop reason).
    /// Returns `None` when no held entry is completed.
    fn best_completed_ts(&self) -> Option<i64> {
        let latest_completed_ts = self
            .latest
            .has_stop_reason()
            .then_some(self.latest.timestamp_ms());
        match (&self.best_completed, latest_completed_ts) {
            (Some(entry), Some(ts)) => Some(entry.timestamp_ms().max(ts)),
            (Some(entry), None) => Some(entry.timestamp_ms()),
            (None, Some(ts)) => Some(ts),
            (None, None) => None,
        }
    }
    /// Stores `entry` as `best_completed` only if it is strictly newer than every
    /// completed entry already held; on a timestamp tie the held entry is kept.
    /// Both call sites in `update` only pass entries that have a stop reason.
    fn replace_best_completed_if_newer(&mut self, entry: T) {
        let entry_ts = entry.timestamp_ms();
        let should_replace = match self.best_completed_ts() {
            Some(best_ts) => entry_ts > best_ts,
            None => true,
        };
        if should_replace {
            self.best_completed = Some(entry);
        }
    }
    /// Folds one more entry of the same group into the state.
    ///
    /// - A strictly newer entry becomes `latest`. If the displaced `latest` had a
    ///   stop reason, it is offered to `best_completed`.
    /// - An entry that is not strictly newer is kept only if it has a stop reason
    ///   and is strictly newer than every completed entry held; otherwise it is
    ///   dropped.
    fn update(&mut self, entry: T) {
        let entry_ts = entry.timestamp_ms();
        if entry_ts > self.latest.timestamp_ms() {
            if self.latest.has_stop_reason() {
                // Swap first so `best_completed_ts` already sees the new `latest`
                // when deciding whether the old one is still worth keeping.
                let old_latest = std::mem::replace(&mut self.latest, entry);
                self.replace_best_completed_if_newer(old_latest);
            } else {
                // An incomplete `latest` is simply superseded.
                self.latest = entry;
            }
            return;
        }
        if entry.has_stop_reason() {
            self.replace_best_completed_if_newer(entry);
        }
    }
    /// Folds another state for the same key into this one by replaying its
    /// `best_completed` (if any) and then its `latest` through `update`.
    /// Entries that `other` had already discarded are not recovered.
    fn merge(&mut self, other: CandidateState<T>) {
        let CandidateState {
            best_completed,
            latest,
        } = other;
        if let Some(entry) = best_completed {
            self.update(entry);
        }
        self.update(latest);
    }
    /// Picks the group's representative.
    ///
    /// Returns `best_completed` when `latest` has no stop reason (or when
    /// `best_completed` is strictly newer); otherwise returns `latest`. If no
    /// completed entry was retained, this is always `latest`.
    fn finalize(self) -> T {
        match self.best_completed {
            Some(best)
                if !self.latest.has_stop_reason()
                    || best.timestamp_ms() > self.latest.timestamp_ms() =>
            {
                best
            }
            _ => self.latest,
        }
    }
}
/// Streaming deduplicator: entries are pushed one at a time (or merged in from
/// another accumulator) and resolved into survivors by `finalize`.
#[derive(Debug)]
pub(crate) struct DedupAccumulator<T>
where
    T: Deduplicatable,
{
    /// One candidate group per `(scope, message_id)` key.
    message_map: HashMap<(String, String), CandidateState<T>>,
    /// Id-less entries that were kept (only those with a stop reason).
    no_id_entries: Vec<T>,
    /// Count of pushed entries that carried a message id, duplicates included.
    total_with_id: i64,
}
// Hand-written rather than derived: `#[derive(Default)]` would add an
// unnecessary `T: Default` bound.
impl<T> Default for DedupAccumulator<T>
where
    T: Deduplicatable,
{
    /// Creates an empty accumulator: no keyed groups, no id-less entries, and a
    /// zero id count.
    fn default() -> Self {
        let message_map = HashMap::default();
        let no_id_entries = Vec::default();
        Self {
            message_map,
            no_id_entries,
            total_with_id: 0,
        }
    }
}
impl<T: Deduplicatable> DedupAccumulator<T> {
pub(crate) fn new() -> Self {
Self::default()
}
pub(crate) fn push(&mut self, entry: T) {
if let Some(id) = entry.message_id() {
self.total_with_id += 1;
let key = (
entry.dedup_scope().unwrap_or_default().to_string(),
id.to_string(),
);
match self.message_map.get_mut(&key) {
Some(state) => state.update(entry),
None => {
self.message_map.insert(key, CandidateState::new(entry));
}
}
} else if entry.has_stop_reason() {
self.no_id_entries.push(entry);
}
}
pub(crate) fn extend<I>(&mut self, entries: I)
where
I: IntoIterator<Item = T>,
{
for entry in entries {
self.push(entry);
}
}
pub(crate) fn merge(&mut self, other: DedupAccumulator<T>) {
self.total_with_id += other.total_with_id;
self.no_id_entries.extend(other.no_id_entries);
for (key, state) in other.message_map {
match self.message_map.get_mut(&key) {
Some(existing) => existing.merge(state),
None => {
self.message_map.insert(key, state);
}
}
}
}
pub(crate) fn finalize(self) -> (Vec<T>, i64) {
let unique_count = self.message_map.len() as i64;
let skipped = (self.total_with_id - unique_count).max(0);
let mut result: Vec<T> = self
.message_map
.into_values()
.map(CandidateState::finalize)
.collect();
result.extend(self.no_id_entries);
(result, skipped)
}
}
/// Test-only convenience: runs `entries` through a fresh `DedupAccumulator` and
/// returns `(survivors, skipped_count)`.
#[cfg(test)]
pub(crate) fn deduplicate<T, I>(entries: I) -> (Vec<T>, i64)
where
    T: Deduplicatable,
    I: IntoIterator<Item = T>,
{
    let mut acc: DedupAccumulator<T> = DedupAccumulator::default();
    entries.into_iter().for_each(|entry| acc.push(entry));
    acc.finalize()
}
#[cfg(test)]
mod tests {
    use super::*;

    #[derive(Debug, Clone)]
    struct TestEntry {
        id: Option<String>,
        ts: i64,
        stop: bool,
        value: i32,
    }

    impl Deduplicatable for TestEntry {
        fn timestamp_ms(&self) -> i64 {
            self.ts
        }
        fn has_stop_reason(&self) -> bool {
            self.stop
        }
        fn message_id(&self) -> Option<&str> {
            self.id.as_deref()
        }
    }

    /// Builds a `TestEntry`. `value` is a tag identifying which copy survived.
    fn entry(id: Option<&str>, ts: i64, stop: bool, value: i32) -> TestEntry {
        TestEntry {
            id: id.map(String::from),
            ts,
            stop,
            value,
        }
    }

    /// Surviving `value` tags, sorted, so assertions do not depend on hash-map
    /// iteration order.
    fn sorted_values(entries: &[TestEntry]) -> Vec<i32> {
        let mut values: Vec<i32> = entries.iter().map(|e| e.value).collect();
        values.sort_unstable();
        values
    }

    #[test]
    fn test_deduplicate_keeps_completed() {
        let entries = vec![
            entry(Some("msg1"), 100, false, 1),
            entry(Some("msg1"), 200, true, 2),
            entry(Some("msg1"), 300, false, 3),
        ];
        let (result, skipped) = deduplicate(entries);
        assert_eq!(sorted_values(&result), vec![2]);
        assert_eq!(skipped, 2);
    }

    #[test]
    fn test_deduplicate_same_timestamp_completed_wins() {
        let entries = vec![
            entry(Some("msg1"), 100, false, 1),
            entry(Some("msg1"), 100, true, 2),
        ];
        let (result, skipped) = deduplicate(entries);
        assert_eq!(sorted_values(&result), vec![2]);
        assert_eq!(skipped, 1);
    }

    #[test]
    fn test_deduplicate_same_timestamp_completed_first_wins() {
        let entries = vec![
            entry(Some("msg1"), 100, true, 1),
            entry(Some("msg1"), 100, false, 2),
        ];
        let (result, skipped) = deduplicate(entries);
        assert_eq!(sorted_values(&result), vec![1]);
        assert_eq!(skipped, 1);
    }

    #[test]
    fn test_deduplicate_fallback_to_latest() {
        let entries = vec![
            entry(Some("msg1"), 100, false, 1),
            entry(Some("msg1"), 200, false, 2),
        ];
        let (result, skipped) = deduplicate(entries);
        assert_eq!(sorted_values(&result), vec![2]);
        assert_eq!(skipped, 1);
    }

    #[test]
    fn test_deduplicate_out_of_order_completed() {
        // The newest entry is incomplete, and the completed copies arrive late
        // and out of order; the newest completed copy must still win.
        let entries = vec![
            entry(Some("msg1"), 300, false, 3),
            entry(Some("msg1"), 100, true, 1),
            entry(Some("msg1"), 200, true, 2),
        ];
        let (result, skipped) = deduplicate(entries);
        assert_eq!(sorted_values(&result), vec![2]);
        assert_eq!(skipped, 2);
    }

    #[test]
    fn test_deduplicate_no_id_with_stop() {
        let entries = vec![entry(None, 100, true, 1), entry(None, 200, false, 2)];
        let (result, skipped) = deduplicate(entries);
        assert_eq!(sorted_values(&result), vec![1]);
        assert_eq!(skipped, 0);
    }

    #[test]
    fn test_deduplicate_empty_input() {
        let entries: Vec<TestEntry> = vec![];
        let (result, skipped) = deduplicate(entries);
        assert!(result.is_empty());
        assert_eq!(skipped, 0);
    }

    #[test]
    fn test_deduplicate_single_entry() {
        let entries = vec![entry(Some("msg1"), 100, true, 1)];
        let (result, skipped) = deduplicate(entries);
        assert_eq!(sorted_values(&result), vec![1]);
        assert_eq!(skipped, 0);
    }

    #[test]
    fn test_deduplicate_all_duplicates_all_completed() {
        let entries = vec![
            entry(Some("msg1"), 100, true, 1),
            entry(Some("msg1"), 300, true, 3),
            entry(Some("msg1"), 200, true, 2),
        ];
        let (result, skipped) = deduplicate(entries);
        assert_eq!(sorted_values(&result), vec![3]);
        assert_eq!(skipped, 2);
    }

    #[test]
    fn test_deduplicate_multiple_distinct_ids() {
        let entries = vec![
            entry(Some("a"), 100, false, 1),
            entry(Some("b"), 200, true, 2),
            entry(Some("a"), 300, true, 3),
            entry(Some("c"), 400, false, 4),
        ];
        let (result, skipped) = deduplicate(entries);
        assert_eq!(sorted_values(&result), vec![2, 3, 4]);
        assert_eq!(skipped, 1);
    }

    #[test]
    fn test_deduplicate_no_id_without_stop_dropped() {
        let entries = vec![entry(None, 100, false, 1), entry(None, 200, false, 2)];
        let (result, skipped) = deduplicate(entries);
        assert!(result.is_empty());
        assert_eq!(skipped, 0);
    }

    #[test]
    fn test_deduplicate_mixed_id_and_no_id() {
        let entries = vec![
            entry(Some("msg1"), 100, true, 1),
            entry(None, 200, true, 2),
            entry(None, 300, false, 3),
        ];
        let (result, skipped) = deduplicate(entries);
        assert_eq!(sorted_values(&result), vec![1, 2]);
        assert_eq!(skipped, 0);
    }

    #[test]
    fn test_dedup_accumulator_merge() {
        let mut left = DedupAccumulator::new();
        left.extend(vec![
            entry(Some("msg1"), 100, false, 1),
            entry(Some("msg2"), 100, true, 10),
        ]);
        let mut right = DedupAccumulator::new();
        right.extend(vec![
            entry(Some("msg1"), 200, true, 2),
            entry(Some("msg2"), 120, false, 11),
        ]);
        left.merge(right);
        let (result, skipped) = left.finalize();
        assert_eq!(sorted_values(&result), vec![2, 10]);
        assert_eq!(skipped, 2);
    }

    #[test]
    fn test_dedup_accumulator_merge_is_order_independent() {
        let build_left = || {
            let mut acc = DedupAccumulator::new();
            acc.push(entry(Some("msg1"), 300, true, 1));
            acc
        };
        let build_right = || {
            let mut acc = DedupAccumulator::new();
            acc.extend(vec![
                entry(Some("msg1"), 200, true, 2),
                entry(Some("msg1"), 400, false, 3),
            ]);
            acc
        };
        let pairs = vec![(build_left(), build_right()), (build_right(), build_left())];
        for (mut target, source) in pairs {
            target.merge(source);
            let (result, skipped) = target.finalize();
            assert_eq!(sorted_values(&result), vec![1]);
            assert_eq!(skipped, 2);
        }
    }
}