1use std::collections::{BTreeSet, HashMap, HashSet};
6
7use crate::{
8 error::table::TableError,
9 tables::{
10 diagnostic::ChatHandleDiagnostic,
11 table::{CHAT_HANDLE_JOIN, CHAT_MESSAGE_JOIN, Cacheable, Table},
12 },
13 util::union_find::UnionFind,
14};
15use rusqlite::{CachedStatement, Connection, Result, Row};
16
17#[derive(Debug)]
20pub struct ChatToHandle {
21 chat_id: i32,
22 handle_id: i32,
23}
24
25impl Table for ChatToHandle {
26 fn from_row(row: &Row) -> Result<ChatToHandle> {
27 Ok(ChatToHandle {
28 chat_id: row.get("chat_id")?,
29 handle_id: row.get("handle_id")?,
30 })
31 }
32
33 fn get(db: &'_ Connection) -> Result<CachedStatement<'_>, TableError> {
34 Ok(db.prepare_cached(&format!("SELECT * FROM {CHAT_HANDLE_JOIN}"))?)
35 }
36}
37
38impl Cacheable for ChatToHandle {
40 type K = i32;
41 type V = BTreeSet<i32>;
42 fn cache(db: &Connection) -> Result<HashMap<Self::K, Self::V>, TableError> {
56 let mut cache: HashMap<i32, BTreeSet<i32>> = HashMap::new();
57
58 let mut rows = ChatToHandle::get(db)?;
59 for mapping in ChatToHandle::rows(&mut rows, [])? {
60 let joiner = mapping?;
61 if let Some(handles) = cache.get_mut(&joiner.chat_id) {
62 handles.insert(joiner.handle_id);
63 } else {
64 let mut data_to_cache = BTreeSet::new();
65 data_to_cache.insert(joiner.handle_id);
66 cache.insert(joiner.chat_id, data_to_cache);
67 }
68 }
69
70 Ok(cache)
71 }
72}
73
74impl ChatToHandle {
76 pub fn run_diagnostic(db: &Connection) -> Result<ChatHandleDiagnostic, TableError> {
93 let mut statement_message_chats =
95 db.prepare(&format!("SELECT DISTINCT chat_id from {CHAT_MESSAGE_JOIN}"))?;
96 let statement_message_chat_rows =
97 statement_message_chats.query_map([], |row: &Row| -> Result<i32> { row.get(0) })?;
98 let mut unique_chats_from_messages: HashSet<i32> = HashSet::new();
99 statement_message_chat_rows.into_iter().for_each(|row| {
100 if let Ok(row) = row {
101 unique_chats_from_messages.insert(row);
102 }
103 });
104
105 let mut statement_handle_chats =
107 db.prepare(&format!("SELECT DISTINCT chat_id from {CHAT_HANDLE_JOIN}"))?;
108 let statement_handle_chat_rows =
109 statement_handle_chats.query_map([], |row: &Row| -> Result<i32> { row.get(0) })?;
110 let mut unique_chats_from_handles: HashSet<i32> = HashSet::new();
111 statement_handle_chat_rows.into_iter().for_each(|row| {
112 if let Ok(row) = row {
113 unique_chats_from_handles.insert(row);
114 }
115 });
116
117 let all_chats = Self::cache(db)?;
119
120 let chat_handle_lookup = ChatToHandle::get_chat_lookup_map(db)?;
122
123 let real_chatrooms = ChatToHandle::dedupe(&all_chats, &chat_handle_lookup)?;
125
126 let total_duplicated =
128 all_chats.len() - HashSet::<&i32>::from_iter(real_chatrooms.values()).len();
129
130 let chats_with_no_handles = unique_chats_from_messages
132 .difference(&unique_chats_from_handles)
133 .count();
134
135 Ok(ChatHandleDiagnostic {
136 total_chats: all_chats.len(),
137 total_duplicated,
138 chats_with_no_handles,
139 })
140 }
141}
142
143impl ChatToHandle {
144 pub fn get_chat_lookup_map(conn: &Connection) -> Result<HashMap<i32, i32>, TableError> {
161 let mut stmt = conn.prepare(
163 "
164WITH RECURSIVE
165 adj AS (
166 SELECT DISTINCT a.chat AS u, b.chat AS v
167 FROM chat_lookup a
168 JOIN chat_lookup b
169 ON a.identifier = b.identifier
170 ),
171 reach(root, chat) AS (
172 SELECT u AS root, v AS chat FROM adj
173 UNION
174 SELECT r.root, a.v
175 FROM reach r
176 JOIN adj a ON a.u = r.chat
177 ),
178 canon AS (
179 SELECT chat, MAX(root) AS canonical_chat
180 FROM reach
181 GROUP BY chat
182 )
183SELECT chat, canonical_chat
184FROM canon
185ORDER BY chat;
186 ",
187 );
188 let mut chat_lookup_map: HashMap<i32, i32> = HashMap::new();
189
190 if let Ok(statement) = stmt.as_mut() {
191 let chat_lookup_rows = statement.query_map([], |row| {
193 let chat: i32 = row.get(0)?;
194 let canonical: i32 = row.get(1)?;
195 Ok((chat, canonical))
196 });
197
198 if let Ok(chat_lookup_rows) = chat_lookup_rows {
200 for row in chat_lookup_rows {
201 let (chat_id, canonical_chat) = row?;
202 chat_lookup_map.insert(chat_id, canonical_chat);
203 }
204 }
205 }
206 Ok(chat_lookup_map)
207 }
208
209 pub fn dedupe(
231 duplicated_data: &HashMap<i32, BTreeSet<i32>>,
232 chat_lookup_map: &HashMap<i32, i32>,
233 ) -> Result<HashMap<i32, i32>, TableError> {
234 let mut uf = UnionFind::new();
235
236 for chat_id in duplicated_data.keys() {
238 uf.make_set(*chat_id);
239 }
240
241 let mut sorted_lookup: Vec<(&i32, &i32)> = chat_lookup_map.iter().collect();
246 sorted_lookup.sort_by_key(|(chat_id, _)| *chat_id);
247 for (chat_id, canonical) in sorted_lookup {
248 if duplicated_data.contains_key(chat_id) {
249 uf.union(*chat_id, *canonical);
250 }
251 }
252
253 let mut sorted_chats: Vec<(&i32, &BTreeSet<i32>)> = duplicated_data.iter().collect();
256 sorted_chats.sort_by_key(|(id, _)| *id);
257
258 let mut participants_to_chat: HashMap<&BTreeSet<i32>, i32> = HashMap::new();
259 for (chat_id, participants) in &sorted_chats {
260 if let Some(&first_chat) = participants_to_chat.get(participants) {
261 uf.union(**chat_id, first_chat);
262 } else {
263 participants_to_chat.insert(participants, **chat_id);
264 }
265 }
266
267 let mut deduplicated_chats: HashMap<i32, i32> = HashMap::new();
270 let mut representative_to_id: HashMap<i32, i32> = HashMap::new();
271 let mut next_id = 0;
272
273 for (chat_id, _) in &sorted_chats {
274 let rep = uf.find(**chat_id);
275 let dedup_id = *representative_to_id.entry(rep).or_insert_with(|| {
276 let id = next_id;
277 next_id += 1;
278 id
279 });
280 deduplicated_chats.insert(**chat_id, dedup_id);
281 }
282
283 Ok(deduplicated_chats)
284 }
285}
286
287#[cfg(test)]
289mod tests {
290 use crate::tables::chat_handle::ChatToHandle;
291 use std::collections::{BTreeSet, HashMap, HashSet};
292
293 #[test]
294 fn can_dedupe() {
295 let mut input: HashMap<i32, BTreeSet<i32>> = HashMap::new();
296 input.insert(1, BTreeSet::from([1])); input.insert(2, BTreeSet::from([1])); input.insert(3, BTreeSet::from([1])); input.insert(4, BTreeSet::from([2])); input.insert(5, BTreeSet::from([2])); input.insert(6, BTreeSet::from([3])); let output = ChatToHandle::dedupe(&input, &HashMap::new());
304 let expected_deduped_ids: HashSet<i32> = output.unwrap().values().copied().collect();
305 assert_eq!(expected_deduped_ids.len(), 3);
306 }
307
308 #[test]
309 fn can_dedupe_multi() {
310 let mut input: HashMap<i32, BTreeSet<i32>> = HashMap::new();
311 input.insert(1, BTreeSet::from([1, 2])); input.insert(2, BTreeSet::from([1])); input.insert(3, BTreeSet::from([1])); input.insert(4, BTreeSet::from([2, 1])); input.insert(5, BTreeSet::from([2, 3])); input.insert(6, BTreeSet::from([3])); let output = ChatToHandle::dedupe(&input, &HashMap::new());
319 let expected_deduped_ids: HashSet<i32> = output.unwrap().values().copied().collect();
320 assert_eq!(expected_deduped_ids.len(), 4);
321 }
322
323 #[test]
324 fn test_same_values() {
326 let mut input_1: HashMap<i32, BTreeSet<i32>> = HashMap::new();
327 input_1.insert(1, BTreeSet::from([1]));
328 input_1.insert(2, BTreeSet::from([1]));
329 input_1.insert(3, BTreeSet::from([1]));
330 input_1.insert(4, BTreeSet::from([2]));
331 input_1.insert(5, BTreeSet::from([2]));
332 input_1.insert(6, BTreeSet::from([3]));
333
334 let mut input_2: HashMap<i32, BTreeSet<i32>> = HashMap::new();
335 input_2.insert(1, BTreeSet::from([1]));
336 input_2.insert(2, BTreeSet::from([1]));
337 input_2.insert(3, BTreeSet::from([1]));
338 input_2.insert(4, BTreeSet::from([2]));
339 input_2.insert(5, BTreeSet::from([2]));
340 input_2.insert(6, BTreeSet::from([3]));
341
342 let mut input_3: HashMap<i32, BTreeSet<i32>> = HashMap::new();
343 input_3.insert(1, BTreeSet::from([1]));
344 input_3.insert(2, BTreeSet::from([1]));
345 input_3.insert(3, BTreeSet::from([1]));
346 input_3.insert(4, BTreeSet::from([2]));
347 input_3.insert(5, BTreeSet::from([2]));
348 input_3.insert(6, BTreeSet::from([3]));
349
350 let mut output_1 = ChatToHandle::dedupe(&input_1, &HashMap::new())
351 .unwrap()
352 .into_iter()
353 .collect::<Vec<(i32, i32)>>();
354 let mut output_2 = ChatToHandle::dedupe(&input_2, &HashMap::new())
355 .unwrap()
356 .into_iter()
357 .collect::<Vec<(i32, i32)>>();
358 let mut output_3 = ChatToHandle::dedupe(&input_3, &HashMap::new())
359 .unwrap()
360 .into_iter()
361 .collect::<Vec<(i32, i32)>>();
362
363 output_1.sort_unstable();
364 output_2.sort_unstable();
365 output_3.sort_unstable();
366
367 assert_eq!(output_1, output_2);
368 assert_eq!(output_1, output_3);
369 assert_eq!(output_2, output_3);
370 }
371
372 #[test]
373 fn test_same_values_with_lookup() {
374 fn build_input() -> HashMap<i32, BTreeSet<i32>> {
377 let mut input: HashMap<i32, BTreeSet<i32>> = HashMap::new();
378 input.insert(0, BTreeSet::from([1]));
379 input.insert(1, BTreeSet::from([1]));
380 input.insert(2, BTreeSet::from([3]));
381 input.insert(4, BTreeSet::from([2]));
382 input.insert(5, BTreeSet::from([1]));
383 input
384 }
385
386 fn build_lookup() -> HashMap<i32, i32> {
387 let mut lookup: HashMap<i32, i32> = HashMap::new();
388 lookup.insert(2, 5);
389 lookup.insert(4, 0);
390 lookup
391 }
392
393 let mut output_1 = ChatToHandle::dedupe(&build_input(), &build_lookup())
394 .unwrap()
395 .into_iter()
396 .collect::<Vec<(i32, i32)>>();
397 let mut output_2 = ChatToHandle::dedupe(&build_input(), &build_lookup())
398 .unwrap()
399 .into_iter()
400 .collect::<Vec<(i32, i32)>>();
401 let mut output_3 = ChatToHandle::dedupe(&build_input(), &build_lookup())
402 .unwrap()
403 .into_iter()
404 .collect::<Vec<(i32, i32)>>();
405
406 output_1.sort_unstable();
407 output_2.sort_unstable();
408 output_3.sort_unstable();
409
410 assert_eq!(output_1, output_2);
411 assert_eq!(output_1, output_3);
412 assert_eq!(output_2, output_3);
413 }
414
415 #[test]
416 fn can_dedupe_with_chat_lookup_map() {
417 let mut input: HashMap<i32, BTreeSet<i32>> = HashMap::new();
418 input.insert(0, BTreeSet::from([1])); input.insert(1, BTreeSet::from([1])); input.insert(2, BTreeSet::from([3])); input.insert(4, BTreeSet::from([2])); input.insert(5, BTreeSet::from([1])); let mut chat_lookup_map: HashMap<i32, i32> = HashMap::new();
425 chat_lookup_map.insert(2, 5);
426 chat_lookup_map.insert(4, 0);
427
428 let output = ChatToHandle::dedupe(&input, &chat_lookup_map).unwrap();
429
430 assert_eq!(output.get(&0), output.get(&1));
433 assert_eq!(output.get(&0), output.get(&4));
434 assert_eq!(output.get(&0), output.get(&5));
435 assert_eq!(output.get(&2), output.get(&5));
436 }
437
438 #[test]
439 fn can_dedupe_with_lookup_map_overriding_participants() {
440 let mut input: HashMap<i32, BTreeSet<i32>> = HashMap::new();
441 input.insert(0, BTreeSet::from([1, 2])); input.insert(1, BTreeSet::from([1, 2])); input.insert(2, BTreeSet::from([3, 4])); input.insert(3, BTreeSet::from([1, 2])); let mut chat_lookup_map: HashMap<i32, i32> = HashMap::new();
447 chat_lookup_map.insert(1, 0);
448 chat_lookup_map.insert(2, 0);
449
450 let output = ChatToHandle::dedupe(&input, &chat_lookup_map).unwrap();
451
452 assert_eq!(output.get(&0), output.get(&1));
454 assert_eq!(output.get(&0), output.get(&2));
455 assert_eq!(output.get(&3), output.get(&0));
457 }
458
459 #[test]
460 fn can_dedupe_mixed_lookup_and_participants() {
461 let mut input: HashMap<i32, BTreeSet<i32>> = HashMap::new();
462 input.insert(0, BTreeSet::from([1])); input.insert(1, BTreeSet::from([1])); input.insert(2, BTreeSet::from([3])); input.insert(3, BTreeSet::from([2])); input.insert(4, BTreeSet::from([3])); let mut chat_lookup_map: HashMap<i32, i32> = HashMap::new();
469 chat_lookup_map.insert(1, 0);
470 chat_lookup_map.insert(3, 0);
471
472 let output = ChatToHandle::dedupe(&input, &chat_lookup_map).unwrap();
473
474 assert_eq!(output.get(&0), output.get(&1));
476 assert_eq!(output.get(&0), output.get(&3));
477 assert_ne!(output.get(&2), output.get(&1));
479 assert_eq!(output.get(&4), output.get(&2));
481 assert_ne!(output.get(&3), output.get(&4));
483 }
484
485 #[test]
486 fn lookup_merges_when_canonical_has_higher_id() {
487 let mut input: HashMap<i32, BTreeSet<i32>> = HashMap::new();
491 input.insert(10, BTreeSet::from([1])); input.insert(20, BTreeSet::from([2])); let mut chat_lookup_map: HashMap<i32, i32> = HashMap::new();
495 chat_lookup_map.insert(10, 20);
496 chat_lookup_map.insert(20, 20);
497
498 let output = ChatToHandle::dedupe(&input, &chat_lookup_map).unwrap();
499
500 assert_eq!(
502 output.get(&10),
503 output.get(&20),
504 "Chats linked by chat_lookup should merge regardless of ID ordering"
505 );
506 }
507
508 #[test]
509 fn lookup_merge_is_order_independent() {
510 let mut input: HashMap<i32, BTreeSet<i32>> = HashMap::new();
513 input.insert(0, BTreeSet::from([1, 2]));
514 input.insert(5, BTreeSet::from([3, 4]));
515
516 let mut lookup_a: HashMap<i32, i32> = HashMap::new();
518 lookup_a.insert(0, 0);
519 lookup_a.insert(5, 0);
520 let output_a = ChatToHandle::dedupe(&input, &lookup_a).unwrap();
521
522 let mut lookup_b: HashMap<i32, i32> = HashMap::new();
524 lookup_b.insert(0, 5);
525 lookup_b.insert(5, 5);
526 let output_b = ChatToHandle::dedupe(&input, &lookup_b).unwrap();
527
528 assert_eq!(
530 output_a.get(&0) == output_a.get(&5),
531 output_b.get(&0) == output_b.get(&5),
532 "Merge result must not depend on which chat ID is canonical"
533 );
534 assert_eq!(
536 output_b.get(&0),
537 output_b.get(&5),
538 "Chats linked by lookup should merge even when canonical is the higher ID"
539 );
540 }
541
542 #[test]
543 fn transitive_merge_across_participants_and_lookup() {
544 let mut input: HashMap<i32, BTreeSet<i32>> = HashMap::new();
548 input.insert(0, BTreeSet::from([1])); input.insert(2, BTreeSet::from([3])); input.insert(5, BTreeSet::from([1])); let mut chat_lookup_map: HashMap<i32, i32> = HashMap::new();
553 chat_lookup_map.insert(2, 5);
554 chat_lookup_map.insert(5, 5);
555
556 let output = ChatToHandle::dedupe(&input, &chat_lookup_map).unwrap();
557
558 assert_eq!(output.get(&0), output.get(&5));
560 assert_eq!(
562 output.get(&2),
563 output.get(&5),
564 "Chat linked by lookup to a chat that merged by participants should join the same group"
565 );
566 }
567
568 #[test]
569 fn multiple_service_splits_all_merge() {
570 let mut input: HashMap<i32, BTreeSet<i32>> = HashMap::new();
574 input.insert(100, BTreeSet::from([10])); input.insert(200, BTreeSet::from([20])); input.insert(300, BTreeSet::from([30])); let mut chat_lookup_map: HashMap<i32, i32> = HashMap::new();
579 chat_lookup_map.insert(100, 300);
580 chat_lookup_map.insert(200, 300);
581 chat_lookup_map.insert(300, 300);
582
583 let output = ChatToHandle::dedupe(&input, &chat_lookup_map).unwrap();
584
585 let unique_ids: HashSet<i32> = output.values().copied().collect();
586 assert_eq!(
587 unique_ids.len(),
588 1,
589 "All three service-split chats should merge into one conversation, got {:?}",
590 output
591 );
592 }
593
594 #[test]
595 fn lookup_merges_through_missing_canonical() {
596 let mut input: HashMap<i32, BTreeSet<i32>> = HashMap::new();
600 input.insert(10, BTreeSet::from([1]));
601 input.insert(20, BTreeSet::from([2]));
602
603 let mut chat_lookup_map: HashMap<i32, i32> = HashMap::new();
604 chat_lookup_map.insert(10, 99);
605 chat_lookup_map.insert(20, 99);
606
607 let output = ChatToHandle::dedupe(&input, &chat_lookup_map).unwrap();
608
609 assert_eq!(
611 output.get(&10),
612 output.get(&20),
613 "Chats sharing a canonical absent from duplicated_data should still merge"
614 );
615 assert_eq!(output.len(), 2);
617 assert!(!output.contains_key(&99));
618 }
619}