use std::collections::{BTreeSet, HashMap, HashSet};
use crate::{
error::table::TableError,
tables::{
diagnostic::ChatHandleDiagnostic,
table::{CHAT_HANDLE_JOIN, CHAT_MESSAGE_JOIN, Cacheable, Table},
},
util::union_find::UnionFind,
};
use rusqlite::{CachedStatement, Connection, Result, Row};
/// One row of the chat/handle join table, linking a chat to one handle that belongs to it.
///
/// Rows are decoded by [`Table::from_row`] and aggregated per chat by [`Cacheable::cache`].
#[derive(Debug)]
pub struct ChatToHandle {
    /// Chat ID, read from the `chat_id` column.
    chat_id: i32,
    /// Handle ID, read from the `handle_id` column.
    handle_id: i32,
}
impl Table for ChatToHandle {
    /// Decodes one join-table row by reading its `chat_id` and `handle_id` columns by name.
    fn from_row(row: &Row) -> Result<ChatToHandle> {
        let chat_id = row.get("chat_id")?;
        let handle_id = row.get("handle_id")?;
        Ok(Self { chat_id, handle_id })
    }

    /// Prepares (via the connection's statement cache) a query selecting every row of the
    /// chat/handle join table.
    fn get(db: &'_ Connection) -> Result<CachedStatement<'_>, TableError> {
        let query = format!("SELECT * FROM {CHAT_HANDLE_JOIN}");
        let statement = db.prepare_cached(&query)?;
        Ok(statement)
    }
}
impl Cacheable for ChatToHandle {
    type K = i32;
    type V = BTreeSet<i32>;

    /// Builds a map from each chat ID to the set of handle IDs joined to it.
    ///
    /// Every row of the chat/handle join table adds its `handle_id` to the set stored under
    /// its `chat_id`. Chats that have no rows in the join table do not appear in the map.
    ///
    /// # Errors
    /// Returns an error if preparing the statement, starting the query, or extracting any
    /// row fails.
    fn cache(db: &Connection) -> Result<HashMap<Self::K, Self::V>, TableError> {
        let mut cache: HashMap<i32, BTreeSet<i32>> = HashMap::new();
        let mut rows = ChatToHandle::get(db)?;
        let mappings = rows.query_map([], |row| Ok(ChatToHandle::from_row(row)))?;
        for mapping in mappings {
            let joiner = ChatToHandle::extract(mapping)?;
            // Single hash lookup per row: create the chat's set on first sight, then insert.
            cache
                .entry(joiner.chat_id)
                .or_default()
                .insert(joiner.handle_id);
        }
        Ok(cache)
    }
}
impl ChatToHandle {
    /// Computes summary statistics about chats and their handle links.
    ///
    /// The returned [`ChatHandleDiagnostic`] contains:
    /// - `total_chats`: the number of chats with at least one row in the chat/handle join
    ///   table (the keys of [`Cacheable::cache`]);
    /// - `total_duplicated`: that count minus the number of distinct conversation IDs that
    ///   [`ChatToHandle::dedupe`] assigns to those chats;
    /// - `chats_with_no_handles`: the number of distinct chat IDs referenced by the
    ///   chat/message join table that have no row in the chat/handle join table.
    ///
    /// # Errors
    /// Returns an error if the message query cannot be prepared or started, or if building
    /// the handle cache, the chat lookup map, or the deduplication fails. Individual
    /// message-join rows that fail to decode are skipped rather than reported.
    pub fn run_diagnostic(db: &Connection) -> Result<ChatHandleDiagnostic, TableError> {
        // Distinct chats that own at least one message. Undecodable rows are skipped
        // (best effort), as before.
        let mut message_chats_statement =
            db.prepare(&format!("SELECT DISTINCT chat_id from {CHAT_MESSAGE_JOIN}"))?;
        let chats_with_messages: HashSet<i32> = message_chats_statement
            .query_map([], |row: &Row| -> Result<i32> { row.get(0) })?
            .filter_map(|row| row.ok())
            .collect();

        // `cache` already groups the chat/handle join table by chat, so its keys are
        // exactly the distinct chats that have handles; querying that table a second
        // time for `DISTINCT chat_id` is unnecessary.
        let all_chats = Self::cache(db)?;
        let chat_handle_lookup = Self::get_chat_lookup_map(db)?;
        let real_chatrooms = Self::dedupe(&all_chats, &chat_handle_lookup)?;

        // `dedupe` assigns an ID to every key of `all_chats`, so the number of distinct
        // IDs never exceeds `all_chats.len()` and this subtraction cannot underflow.
        let distinct_conversations = real_chatrooms.values().collect::<HashSet<_>>().len();
        let total_duplicated = all_chats.len() - distinct_conversations;

        let chats_with_no_handles = chats_with_messages
            .iter()
            .filter(|chat_id| !all_chats.contains_key(*chat_id))
            .count();

        Ok(ChatHandleDiagnostic {
            total_chats: all_chats.len(),
            total_duplicated,
            chats_with_no_handles,
        })
    }
}
impl ChatToHandle {
    /// Builds a map from chat ID to a canonical chat ID using the `chat_lookup` table.
    ///
    /// Two chats are connected when their `chat_lookup` rows share an `identifier`. The
    /// connection is followed transitively, and every chat in a connected group maps to the
    /// largest chat ID in that group. Chats whose rows have only NULL identifiers never
    /// match a join and therefore do not appear in the result.
    ///
    /// Preparing and starting the query are best effort: if either fails, an empty map is
    /// returned instead of an error. NOTE(review): presumably this is so databases without a
    /// `chat_lookup` table still work — confirm.
    ///
    /// # Errors
    /// Returns an error if reading or decoding an individual result row fails.
    pub fn get_chat_lookup_map(conn: &Connection) -> Result<HashMap<i32, i32>, TableError> {
        // `adj`:   pairs of chats sharing an identifier (each chat also pairs with itself).
        // `reach`: transitive closure of `adj`.
        // `canon`: for each chat, the largest chat ID that reaches it.
        const CHAT_LOOKUP_QUERY: &str = "
WITH RECURSIVE
adj AS (
SELECT DISTINCT a.chat AS u, b.chat AS v
FROM chat_lookup a
JOIN chat_lookup b
ON a.identifier = b.identifier
),
reach(root, chat) AS (
SELECT u AS root, v AS chat FROM adj
UNION
SELECT r.root, a.v
FROM reach r
JOIN adj a ON a.u = r.chat
),
canon AS (
SELECT chat, MAX(root) AS canonical_chat
FROM reach
GROUP BY chat
)
SELECT chat, canonical_chat
FROM canon
ORDER BY chat;
";
        let mut canonical_by_chat: HashMap<i32, i32> = HashMap::new();

        let Ok(mut statement) = conn.prepare(CHAT_LOOKUP_QUERY) else {
            return Ok(canonical_by_chat);
        };
        let Ok(pairs) = statement.query_map([], |row| {
            let chat: i32 = row.get(0)?;
            let canonical: i32 = row.get(1)?;
            Ok((chat, canonical))
        }) else {
            return Ok(canonical_by_chat);
        };

        for pair in pairs {
            let (chat_id, canonical_chat) = pair?;
            canonical_by_chat.insert(chat_id, canonical_chat);
        }
        Ok(canonical_by_chat)
    }

    /// Assigns every chat in `duplicated_data` a deduplicated conversation ID.
    ///
    /// Chats are placed in the same group, transitively, when either:
    /// - `chat_lookup_map` maps one of them (a key present in `duplicated_data`) to the
    ///   other as its canonical chat; or
    /// - they have exactly the same participant set.
    ///
    /// The result has one entry per key of `duplicated_data`. Group IDs are dense, starting
    /// at `0`, and are numbered in ascending order of each group's smallest chat ID, so the
    /// output does not depend on `HashMap` iteration order.
    ///
    /// NOTE(review): canonical IDs absent from `duplicated_data` are passed to
    /// `UnionFind::union` without a prior `make_set`. This assumes `UnionFind` accepts such
    /// IDs — confirm.
    ///
    /// # Errors
    /// The visible code never produces an `Err`; the `Result` return type is kept for
    /// callers.
    pub fn dedupe(
        duplicated_data: &HashMap<i32, BTreeSet<i32>>,
        chat_lookup_map: &HashMap<i32, i32>,
    ) -> Result<HashMap<i32, i32>, TableError> {
        let mut groups = UnionFind::new();
        for &chat_id in duplicated_data.keys() {
            groups.make_set(chat_id);
        }

        // Link each known chat to its canonical chat, in ascending chat ID order.
        // Keys of a map are unique, so sorting the pairs equals sorting by chat ID.
        let mut links: Vec<(i32, i32)> = chat_lookup_map
            .iter()
            .filter(|(chat_id, _)| duplicated_data.contains_key(*chat_id))
            .map(|(&chat_id, &canonical)| (chat_id, canonical))
            .collect();
        links.sort_unstable();
        for (chat_id, canonical) in links {
            groups.union(chat_id, canonical);
        }

        // Walk chats in ascending ID order. The first chat seen with a given participant
        // set becomes its anchor, and later chats with the same set are joined to it.
        let mut chats_by_id: Vec<(i32, &BTreeSet<i32>)> = duplicated_data
            .iter()
            .map(|(&chat_id, participants)| (chat_id, participants))
            .collect();
        chats_by_id.sort_unstable_by_key(|&(chat_id, _)| chat_id);

        let mut anchor_by_participants: HashMap<&BTreeSet<i32>, i32> = HashMap::new();
        for &(chat_id, participants) in &chats_by_id {
            let anchor = *anchor_by_participants
                .entry(participants)
                .or_insert(chat_id);
            // Chat IDs are unique, so `anchor == chat_id` only when this set was just seen.
            if anchor != chat_id {
                groups.union(chat_id, anchor);
            }
        }

        // Number groups densely in order of first appearance (smallest member chat ID).
        let mut group_ids: HashMap<i32, i32> = HashMap::new();
        let mut deduplicated_chats: HashMap<i32, i32> = HashMap::new();
        let mut next_group_id = 0;
        for &(chat_id, _) in &chats_by_id {
            let root = groups.find(chat_id);
            let group_id = match group_ids.get(&root) {
                Some(&existing) => existing,
                None => {
                    let fresh = next_group_id;
                    next_group_id += 1;
                    group_ids.insert(root, fresh);
                    fresh
                }
            };
            deduplicated_chats.insert(chat_id, group_id);
        }
        Ok(deduplicated_chats)
    }
}
#[cfg(test)]
mod tests {
    use crate::tables::chat_handle::ChatToHandle;
    use std::collections::{BTreeSet, HashMap, HashSet};

    /// Runs `dedupe` and returns its mapping as `(chat_id, dedup_id)` pairs in ascending
    /// order, so outputs computed from separately built maps can be compared directly.
    fn sorted_dedupe(
        input: &HashMap<i32, BTreeSet<i32>>,
        chat_lookup_map: &HashMap<i32, i32>,
    ) -> Vec<(i32, i32)> {
        let mut pairs: Vec<(i32, i32)> = ChatToHandle::dedupe(input, chat_lookup_map)
            .unwrap()
            .into_iter()
            .collect();
        pairs.sort_unstable();
        pairs
    }

    /// Number of distinct deduplicated IDs present in a `dedupe` result.
    fn distinct_ids(output: &HashMap<i32, i32>) -> usize {
        output.values().copied().collect::<HashSet<i32>>().len()
    }

    #[test]
    fn can_dedupe() {
        let input = HashMap::from([
            (1, BTreeSet::from([1])),
            (2, BTreeSet::from([1])),
            (3, BTreeSet::from([1])),
            (4, BTreeSet::from([2])),
            (5, BTreeSet::from([2])),
            (6, BTreeSet::from([3])),
        ]);
        let output = ChatToHandle::dedupe(&input, &HashMap::new()).unwrap();
        assert_eq!(distinct_ids(&output), 3);
    }

    #[test]
    fn can_dedupe_multi() {
        let input = HashMap::from([
            (1, BTreeSet::from([1, 2])),
            (2, BTreeSet::from([1])),
            (3, BTreeSet::from([1])),
            (4, BTreeSet::from([2, 1])),
            (5, BTreeSet::from([2, 3])),
            (6, BTreeSet::from([3])),
        ]);
        let output = ChatToHandle::dedupe(&input, &HashMap::new()).unwrap();
        assert_eq!(distinct_ids(&output), 4);
    }

    #[test]
    fn test_same_values() {
        // Each call builds a fresh map with its own hasher state, so iteration order may
        // differ between maps; the deduplicated output must still agree.
        fn build_input() -> HashMap<i32, BTreeSet<i32>> {
            HashMap::from([
                (1, BTreeSet::from([1])),
                (2, BTreeSet::from([1])),
                (3, BTreeSet::from([1])),
                (4, BTreeSet::from([2])),
                (5, BTreeSet::from([2])),
                (6, BTreeSet::from([3])),
            ])
        }
        let no_lookup: HashMap<i32, i32> = HashMap::new();
        let output_1 = sorted_dedupe(&build_input(), &no_lookup);
        let output_2 = sorted_dedupe(&build_input(), &no_lookup);
        let output_3 = sorted_dedupe(&build_input(), &no_lookup);
        assert_eq!(output_1, output_2);
        assert_eq!(output_1, output_3);
        assert_eq!(output_2, output_3);
    }

    #[test]
    fn test_same_values_with_lookup() {
        fn build_input() -> HashMap<i32, BTreeSet<i32>> {
            HashMap::from([
                (0, BTreeSet::from([1])),
                (1, BTreeSet::from([1])),
                (2, BTreeSet::from([3])),
                (4, BTreeSet::from([2])),
                (5, BTreeSet::from([1])),
            ])
        }
        fn build_lookup() -> HashMap<i32, i32> {
            HashMap::from([(2, 5), (4, 0)])
        }
        let output_1 = sorted_dedupe(&build_input(), &build_lookup());
        let output_2 = sorted_dedupe(&build_input(), &build_lookup());
        let output_3 = sorted_dedupe(&build_input(), &build_lookup());
        assert_eq!(output_1, output_2);
        assert_eq!(output_1, output_3);
        assert_eq!(output_2, output_3);
    }

    #[test]
    fn can_dedupe_with_chat_lookup_map() {
        let input = HashMap::from([
            (0, BTreeSet::from([1])),
            (1, BTreeSet::from([1])),
            (2, BTreeSet::from([3])),
            (4, BTreeSet::from([2])),
            (5, BTreeSet::from([1])),
        ]);
        let chat_lookup_map = HashMap::from([(2, 5), (4, 0)]);
        let output = ChatToHandle::dedupe(&input, &chat_lookup_map).unwrap();
        assert_eq!(output.get(&0), output.get(&1));
        assert_eq!(output.get(&0), output.get(&4));
        assert_eq!(output.get(&0), output.get(&5));
        assert_eq!(output.get(&2), output.get(&5));
    }

    #[test]
    fn can_dedupe_with_lookup_map_overriding_participants() {
        let input = HashMap::from([
            (0, BTreeSet::from([1, 2])),
            (1, BTreeSet::from([1, 2])),
            (2, BTreeSet::from([3, 4])),
            (3, BTreeSet::from([1, 2])),
        ]);
        let chat_lookup_map = HashMap::from([(1, 0), (2, 0)]);
        let output = ChatToHandle::dedupe(&input, &chat_lookup_map).unwrap();
        assert_eq!(output.get(&0), output.get(&1));
        assert_eq!(output.get(&0), output.get(&2));
        assert_eq!(output.get(&3), output.get(&0));
    }

    #[test]
    fn can_dedupe_mixed_lookup_and_participants() {
        let input = HashMap::from([
            (0, BTreeSet::from([1])),
            (1, BTreeSet::from([1])),
            (2, BTreeSet::from([3])),
            (3, BTreeSet::from([2])),
            (4, BTreeSet::from([3])),
        ]);
        let chat_lookup_map = HashMap::from([(1, 0), (3, 0)]);
        let output = ChatToHandle::dedupe(&input, &chat_lookup_map).unwrap();
        assert_eq!(output.get(&0), output.get(&1));
        assert_eq!(output.get(&0), output.get(&3));
        assert_ne!(output.get(&2), output.get(&1));
        assert_eq!(output.get(&4), output.get(&2));
        assert_ne!(output.get(&3), output.get(&4));
    }

    #[test]
    fn lookup_merges_when_canonical_has_higher_id() {
        let input = HashMap::from([(10, BTreeSet::from([1])), (20, BTreeSet::from([2]))]);
        let chat_lookup_map = HashMap::from([(10, 20), (20, 20)]);
        let output = ChatToHandle::dedupe(&input, &chat_lookup_map).unwrap();
        assert_eq!(
            output.get(&10),
            output.get(&20),
            "Chats linked by chat_lookup should merge regardless of ID ordering"
        );
    }

    #[test]
    fn lookup_merge_is_order_independent() {
        let input = HashMap::from([(0, BTreeSet::from([1, 2])), (5, BTreeSet::from([3, 4]))]);

        let lookup_a = HashMap::from([(0, 0), (5, 0)]);
        let output_a = ChatToHandle::dedupe(&input, &lookup_a).unwrap();

        let lookup_b = HashMap::from([(0, 5), (5, 5)]);
        let output_b = ChatToHandle::dedupe(&input, &lookup_b).unwrap();

        assert_eq!(
            output_a.get(&0) == output_a.get(&5),
            output_b.get(&0) == output_b.get(&5),
            "Merge result must not depend on which chat ID is canonical"
        );
        assert_eq!(
            output_b.get(&0),
            output_b.get(&5),
            "Chats linked by lookup should merge even when canonical is the higher ID"
        );
    }

    #[test]
    fn transitive_merge_across_participants_and_lookup() {
        let input = HashMap::from([
            (0, BTreeSet::from([1])),
            (2, BTreeSet::from([3])),
            (5, BTreeSet::from([1])),
        ]);
        let chat_lookup_map = HashMap::from([(2, 5), (5, 5)]);
        let output = ChatToHandle::dedupe(&input, &chat_lookup_map).unwrap();
        assert_eq!(output.get(&0), output.get(&5));
        assert_eq!(
            output.get(&2),
            output.get(&5),
            "Chat linked by lookup to a chat that merged by participants should join the same group"
        );
    }

    #[test]
    fn multiple_service_splits_all_merge() {
        let input = HashMap::from([
            (100, BTreeSet::from([10])),
            (200, BTreeSet::from([20])),
            (300, BTreeSet::from([30])),
        ]);
        let chat_lookup_map = HashMap::from([(100, 300), (200, 300), (300, 300)]);
        let output = ChatToHandle::dedupe(&input, &chat_lookup_map).unwrap();
        assert_eq!(
            distinct_ids(&output),
            1,
            "All three service-split chats should merge into one conversation, got {:?}",
            output
        );
    }

    #[test]
    fn lookup_merges_through_missing_canonical() {
        let input = HashMap::from([(10, BTreeSet::from([1])), (20, BTreeSet::from([2]))]);
        let chat_lookup_map = HashMap::from([(10, 99), (20, 99)]);
        let output = ChatToHandle::dedupe(&input, &chat_lookup_map).unwrap();
        assert_eq!(
            output.get(&10),
            output.get(&20),
            "Chats sharing a canonical absent from duplicated_data should still merge"
        );
        assert_eq!(output.len(), 2);
        assert!(!output.contains_key(&99));
    }
}