1use std::collections::{BTreeSet, HashMap, HashSet};
6
7use crate::{
8 error::table::TableError,
9 tables::{
10 diagnostic::ChatHandleDiagnostic,
11 table::{CHAT_HANDLE_JOIN, CHAT_MESSAGE_JOIN, Cacheable, Table},
12 },
13 util::union_find::UnionFind,
14};
15use rusqlite::{CachedStatement, Connection, Result, Row};
16
17#[derive(Debug)]
20pub struct ChatToHandle {
21 chat_id: i32,
22 handle_id: i32,
23}
24
25impl Table for ChatToHandle {
26 fn from_row(row: &Row) -> Result<ChatToHandle> {
27 Ok(ChatToHandle {
28 chat_id: row.get("chat_id")?,
29 handle_id: row.get("handle_id")?,
30 })
31 }
32
33 fn get(db: &'_ Connection) -> Result<CachedStatement<'_>, TableError> {
34 Ok(db.prepare_cached(&format!("SELECT * FROM {CHAT_HANDLE_JOIN}"))?)
35 }
36}
37
38impl Cacheable for ChatToHandle {
40 type K = i32;
41 type V = BTreeSet<i32>;
42 fn cache(db: &Connection) -> Result<HashMap<Self::K, Self::V>, TableError> {
56 let mut cache: HashMap<i32, BTreeSet<i32>> = HashMap::new();
57
58 let mut rows = ChatToHandle::get(db)?;
59 for mapping in ChatToHandle::rows(&mut rows, [])? {
60 let joiner = mapping?;
61 if let Some(handles) = cache.get_mut(&joiner.chat_id) {
62 handles.insert(joiner.handle_id);
63 } else {
64 let mut data_to_cache = BTreeSet::new();
65 data_to_cache.insert(joiner.handle_id);
66 cache.insert(joiner.chat_id, data_to_cache);
67 }
68 }
69
70 Ok(cache)
71 }
72}
73
74impl ChatToHandle {
76 pub fn run_diagnostic(db: &Connection) -> Result<ChatHandleDiagnostic, TableError> {
92 let mut statement_message_chats =
94 db.prepare(&format!("SELECT DISTINCT chat_id from {CHAT_MESSAGE_JOIN}"))?;
95 let statement_message_chat_rows =
96 statement_message_chats.query_map([], |row: &Row| -> Result<i32> { row.get(0) })?;
97 let mut unique_chats_from_messages: HashSet<i32> = HashSet::new();
98 statement_message_chat_rows.into_iter().for_each(|row| {
99 if let Ok(row) = row {
100 unique_chats_from_messages.insert(row);
101 }
102 });
103
104 let mut statement_handle_chats =
106 db.prepare(&format!("SELECT DISTINCT chat_id from {CHAT_HANDLE_JOIN}"))?;
107 let statement_handle_chat_rows =
108 statement_handle_chats.query_map([], |row: &Row| -> Result<i32> { row.get(0) })?;
109 let mut unique_chats_from_handles: HashSet<i32> = HashSet::new();
110 statement_handle_chat_rows.into_iter().for_each(|row| {
111 if let Ok(row) = row {
112 unique_chats_from_handles.insert(row);
113 }
114 });
115
116 let all_chats = Self::cache(db)?;
118
119 let chat_handle_lookup = ChatToHandle::get_chat_lookup_map(db)?;
121
122 let real_chatrooms = ChatToHandle::dedupe(&all_chats, &chat_handle_lookup)?;
124
125 let total_duplicated =
127 all_chats.len() - HashSet::<&i32>::from_iter(real_chatrooms.values()).len();
128
129 let chats_with_no_handles = unique_chats_from_messages
131 .difference(&unique_chats_from_handles)
132 .count();
133
134 Ok(ChatHandleDiagnostic {
135 total_chats: all_chats.len(),
136 total_duplicated,
137 chats_with_no_handles,
138 })
139 }
140}
141
142impl ChatToHandle {
143 pub fn get_chat_lookup_map(conn: &Connection) -> Result<HashMap<i32, i32>, TableError> {
160 let mut stmt = conn.prepare(
162 "
163WITH RECURSIVE
164 adj AS (
165 SELECT DISTINCT a.chat AS u, b.chat AS v
166 FROM chat_lookup a
167 JOIN chat_lookup b
168 ON a.identifier = b.identifier
169 ),
170 reach(root, chat) AS (
171 SELECT u AS root, v AS chat FROM adj
172 UNION
173 SELECT r.root, a.v
174 FROM reach r
175 JOIN adj a ON a.u = r.chat
176 ),
177 canon AS (
178 SELECT chat, MAX(root) AS canonical_chat
179 FROM reach
180 GROUP BY chat
181 )
182SELECT chat, canonical_chat
183FROM canon
184ORDER BY chat;
185 ",
186 );
187 let mut chat_lookup_map: HashMap<i32, i32> = HashMap::new();
188
189 if let Ok(statement) = stmt.as_mut() {
190 let chat_lookup_rows = statement.query_map([], |row| {
192 let chat: i32 = row.get(0)?;
193 let canonical: i32 = row.get(1)?;
194 Ok((chat, canonical))
195 });
196
197 if let Ok(chat_lookup_rows) = chat_lookup_rows {
199 for row in chat_lookup_rows {
200 let (chat_id, canonical_chat) = row?;
201 chat_lookup_map.insert(chat_id, canonical_chat);
202 }
203 }
204 }
205 Ok(chat_lookup_map)
206 }
207
208 pub fn dedupe(
230 duplicated_data: &HashMap<i32, BTreeSet<i32>>,
231 chat_lookup_map: &HashMap<i32, i32>,
232 ) -> Result<HashMap<i32, i32>, TableError> {
233 let mut uf = UnionFind::new();
234
235 for chat_id in duplicated_data.keys() {
236 uf.make_set(*chat_id);
237 }
238
239 let mut sorted_lookup: Vec<(&i32, &i32)> = chat_lookup_map.iter().collect();
244 sorted_lookup.sort_by_key(|(chat_id, _)| *chat_id);
245 for (chat_id, canonical) in sorted_lookup {
246 if duplicated_data.contains_key(chat_id) {
247 uf.union(*chat_id, *canonical);
248 }
249 }
250
251 let mut sorted_chats: Vec<(&i32, &BTreeSet<i32>)> = duplicated_data.iter().collect();
254 sorted_chats.sort_by_key(|(id, _)| *id);
255
256 let mut participants_to_chat: HashMap<&BTreeSet<i32>, i32> = HashMap::new();
257 for (chat_id, participants) in &sorted_chats {
258 if let Some(&first_chat) = participants_to_chat.get(participants) {
259 uf.union(**chat_id, first_chat);
260 } else {
261 participants_to_chat.insert(participants, **chat_id);
262 }
263 }
264
265 let mut deduplicated_chats: HashMap<i32, i32> = HashMap::new();
268 let mut representative_to_id: HashMap<i32, i32> = HashMap::new();
269 let mut next_id = 0;
270
271 for (chat_id, _) in &sorted_chats {
272 let rep = uf.find(**chat_id);
273 let dedup_id = *representative_to_id.entry(rep).or_insert_with(|| {
274 let id = next_id;
275 next_id += 1;
276 id
277 });
278 deduplicated_chats.insert(**chat_id, dedup_id);
279 }
280
281 Ok(deduplicated_chats)
282 }
283}
284
285#[cfg(test)]
287mod tests {
288 use crate::tables::chat_handle::ChatToHandle;
289 use std::collections::{BTreeSet, HashMap, HashSet};
290
291 #[test]
292 fn can_dedupe() {
293 let mut input: HashMap<i32, BTreeSet<i32>> = HashMap::new();
294 input.insert(1, BTreeSet::from([1])); input.insert(2, BTreeSet::from([1])); input.insert(3, BTreeSet::from([1])); input.insert(4, BTreeSet::from([2])); input.insert(5, BTreeSet::from([2])); input.insert(6, BTreeSet::from([3])); let output = ChatToHandle::dedupe(&input, &HashMap::new());
302 let expected_deduped_ids: HashSet<i32> = output.unwrap().values().copied().collect();
303 assert_eq!(expected_deduped_ids.len(), 3);
304 }
305
306 #[test]
307 fn can_dedupe_multi() {
308 let mut input: HashMap<i32, BTreeSet<i32>> = HashMap::new();
309 input.insert(1, BTreeSet::from([1, 2])); input.insert(2, BTreeSet::from([1])); input.insert(3, BTreeSet::from([1])); input.insert(4, BTreeSet::from([2, 1])); input.insert(5, BTreeSet::from([2, 3])); input.insert(6, BTreeSet::from([3])); let output = ChatToHandle::dedupe(&input, &HashMap::new());
317 let expected_deduped_ids: HashSet<i32> = output.unwrap().values().copied().collect();
318 assert_eq!(expected_deduped_ids.len(), 4);
319 }
320
321 #[test]
322 fn test_same_values() {
324 let mut input_1: HashMap<i32, BTreeSet<i32>> = HashMap::new();
325 input_1.insert(1, BTreeSet::from([1]));
326 input_1.insert(2, BTreeSet::from([1]));
327 input_1.insert(3, BTreeSet::from([1]));
328 input_1.insert(4, BTreeSet::from([2]));
329 input_1.insert(5, BTreeSet::from([2]));
330 input_1.insert(6, BTreeSet::from([3]));
331
332 let mut input_2: HashMap<i32, BTreeSet<i32>> = HashMap::new();
333 input_2.insert(1, BTreeSet::from([1]));
334 input_2.insert(2, BTreeSet::from([1]));
335 input_2.insert(3, BTreeSet::from([1]));
336 input_2.insert(4, BTreeSet::from([2]));
337 input_2.insert(5, BTreeSet::from([2]));
338 input_2.insert(6, BTreeSet::from([3]));
339
340 let mut input_3: HashMap<i32, BTreeSet<i32>> = HashMap::new();
341 input_3.insert(1, BTreeSet::from([1]));
342 input_3.insert(2, BTreeSet::from([1]));
343 input_3.insert(3, BTreeSet::from([1]));
344 input_3.insert(4, BTreeSet::from([2]));
345 input_3.insert(5, BTreeSet::from([2]));
346 input_3.insert(6, BTreeSet::from([3]));
347
348 let mut output_1 = ChatToHandle::dedupe(&input_1, &HashMap::new())
349 .unwrap()
350 .into_iter()
351 .collect::<Vec<(i32, i32)>>();
352 let mut output_2 = ChatToHandle::dedupe(&input_2, &HashMap::new())
353 .unwrap()
354 .into_iter()
355 .collect::<Vec<(i32, i32)>>();
356 let mut output_3 = ChatToHandle::dedupe(&input_3, &HashMap::new())
357 .unwrap()
358 .into_iter()
359 .collect::<Vec<(i32, i32)>>();
360
361 output_1.sort_unstable();
362 output_2.sort_unstable();
363 output_3.sort_unstable();
364
365 assert_eq!(output_1, output_2);
366 assert_eq!(output_1, output_3);
367 assert_eq!(output_2, output_3);
368 }
369
370 #[test]
371 fn test_same_values_with_lookup() {
372 fn build_input() -> HashMap<i32, BTreeSet<i32>> {
375 let mut input: HashMap<i32, BTreeSet<i32>> = HashMap::new();
376 input.insert(0, BTreeSet::from([1]));
377 input.insert(1, BTreeSet::from([1]));
378 input.insert(2, BTreeSet::from([3]));
379 input.insert(4, BTreeSet::from([2]));
380 input.insert(5, BTreeSet::from([1]));
381 input
382 }
383
384 fn build_lookup() -> HashMap<i32, i32> {
385 let mut lookup: HashMap<i32, i32> = HashMap::new();
386 lookup.insert(2, 5);
387 lookup.insert(4, 0);
388 lookup
389 }
390
391 let mut output_1 = ChatToHandle::dedupe(&build_input(), &build_lookup())
392 .unwrap()
393 .into_iter()
394 .collect::<Vec<(i32, i32)>>();
395 let mut output_2 = ChatToHandle::dedupe(&build_input(), &build_lookup())
396 .unwrap()
397 .into_iter()
398 .collect::<Vec<(i32, i32)>>();
399 let mut output_3 = ChatToHandle::dedupe(&build_input(), &build_lookup())
400 .unwrap()
401 .into_iter()
402 .collect::<Vec<(i32, i32)>>();
403
404 output_1.sort_unstable();
405 output_2.sort_unstable();
406 output_3.sort_unstable();
407
408 assert_eq!(output_1, output_2);
409 assert_eq!(output_1, output_3);
410 assert_eq!(output_2, output_3);
411 }
412
413 #[test]
414 fn can_dedupe_with_chat_lookup_map() {
415 let mut input: HashMap<i32, BTreeSet<i32>> = HashMap::new();
416 input.insert(0, BTreeSet::from([1])); input.insert(1, BTreeSet::from([1])); input.insert(2, BTreeSet::from([3])); input.insert(4, BTreeSet::from([2])); input.insert(5, BTreeSet::from([1])); let mut chat_lookup_map: HashMap<i32, i32> = HashMap::new();
423 chat_lookup_map.insert(2, 5);
424 chat_lookup_map.insert(4, 0);
425
426 let output = ChatToHandle::dedupe(&input, &chat_lookup_map).unwrap();
427
428 assert_eq!(output.get(&0), output.get(&1));
431 assert_eq!(output.get(&0), output.get(&4));
432 assert_eq!(output.get(&0), output.get(&5));
433 assert_eq!(output.get(&2), output.get(&5));
434 }
435
436 #[test]
437 fn can_dedupe_with_lookup_map_overriding_participants() {
438 let mut input: HashMap<i32, BTreeSet<i32>> = HashMap::new();
439 input.insert(0, BTreeSet::from([1, 2])); input.insert(1, BTreeSet::from([1, 2])); input.insert(2, BTreeSet::from([3, 4])); input.insert(3, BTreeSet::from([1, 2])); let mut chat_lookup_map: HashMap<i32, i32> = HashMap::new();
445 chat_lookup_map.insert(1, 0);
446 chat_lookup_map.insert(2, 0);
447
448 let output = ChatToHandle::dedupe(&input, &chat_lookup_map).unwrap();
449
450 assert_eq!(output.get(&0), output.get(&1));
452 assert_eq!(output.get(&0), output.get(&2));
453 assert_eq!(output.get(&3), output.get(&0));
455 }
456
457 #[test]
458 fn can_dedupe_mixed_lookup_and_participants() {
459 let mut input: HashMap<i32, BTreeSet<i32>> = HashMap::new();
460 input.insert(0, BTreeSet::from([1])); input.insert(1, BTreeSet::from([1])); input.insert(2, BTreeSet::from([3])); input.insert(3, BTreeSet::from([2])); input.insert(4, BTreeSet::from([3])); let mut chat_lookup_map: HashMap<i32, i32> = HashMap::new();
467 chat_lookup_map.insert(1, 0);
468 chat_lookup_map.insert(3, 0);
469
470 let output = ChatToHandle::dedupe(&input, &chat_lookup_map).unwrap();
471
472 assert_eq!(output.get(&0), output.get(&1));
474 assert_eq!(output.get(&0), output.get(&3));
475 assert_ne!(output.get(&2), output.get(&1));
477 assert_eq!(output.get(&4), output.get(&2));
479 assert_ne!(output.get(&3), output.get(&4));
481 }
482
483 #[test]
484 fn lookup_merges_when_canonical_has_higher_id() {
485 let mut input: HashMap<i32, BTreeSet<i32>> = HashMap::new();
489 input.insert(10, BTreeSet::from([1])); input.insert(20, BTreeSet::from([2])); let mut chat_lookup_map: HashMap<i32, i32> = HashMap::new();
493 chat_lookup_map.insert(10, 20);
494 chat_lookup_map.insert(20, 20);
495
496 let output = ChatToHandle::dedupe(&input, &chat_lookup_map).unwrap();
497
498 assert_eq!(
500 output.get(&10),
501 output.get(&20),
502 "Chats linked by chat_lookup should merge regardless of ID ordering"
503 );
504 }
505
506 #[test]
507 fn lookup_merge_is_order_independent() {
508 let mut input: HashMap<i32, BTreeSet<i32>> = HashMap::new();
511 input.insert(0, BTreeSet::from([1, 2]));
512 input.insert(5, BTreeSet::from([3, 4]));
513
514 let mut lookup_a: HashMap<i32, i32> = HashMap::new();
516 lookup_a.insert(0, 0);
517 lookup_a.insert(5, 0);
518 let output_a = ChatToHandle::dedupe(&input, &lookup_a).unwrap();
519
520 let mut lookup_b: HashMap<i32, i32> = HashMap::new();
522 lookup_b.insert(0, 5);
523 lookup_b.insert(5, 5);
524 let output_b = ChatToHandle::dedupe(&input, &lookup_b).unwrap();
525
526 assert_eq!(
528 output_a.get(&0) == output_a.get(&5),
529 output_b.get(&0) == output_b.get(&5),
530 "Merge result must not depend on which chat ID is canonical"
531 );
532 assert_eq!(
534 output_b.get(&0),
535 output_b.get(&5),
536 "Chats linked by lookup should merge even when canonical is the higher ID"
537 );
538 }
539
540 #[test]
541 fn transitive_merge_across_participants_and_lookup() {
542 let mut input: HashMap<i32, BTreeSet<i32>> = HashMap::new();
546 input.insert(0, BTreeSet::from([1])); input.insert(2, BTreeSet::from([3])); input.insert(5, BTreeSet::from([1])); let mut chat_lookup_map: HashMap<i32, i32> = HashMap::new();
551 chat_lookup_map.insert(2, 5);
552 chat_lookup_map.insert(5, 5);
553
554 let output = ChatToHandle::dedupe(&input, &chat_lookup_map).unwrap();
555
556 assert_eq!(output.get(&0), output.get(&5));
558 assert_eq!(
560 output.get(&2),
561 output.get(&5),
562 "Chat linked by lookup to a chat that merged by participants should join the same group"
563 );
564 }
565
566 #[test]
567 fn multiple_service_splits_all_merge() {
568 let mut input: HashMap<i32, BTreeSet<i32>> = HashMap::new();
572 input.insert(100, BTreeSet::from([10])); input.insert(200, BTreeSet::from([20])); input.insert(300, BTreeSet::from([30])); let mut chat_lookup_map: HashMap<i32, i32> = HashMap::new();
577 chat_lookup_map.insert(100, 300);
578 chat_lookup_map.insert(200, 300);
579 chat_lookup_map.insert(300, 300);
580
581 let output = ChatToHandle::dedupe(&input, &chat_lookup_map).unwrap();
582
583 let unique_ids: HashSet<i32> = output.values().copied().collect();
584 assert_eq!(
585 unique_ids.len(),
586 1,
587 "All three service-split chats should merge into one conversation, got {:?}",
588 output
589 );
590 }
591
592 #[test]
593 fn lookup_merges_through_missing_canonical() {
594 let mut input: HashMap<i32, BTreeSet<i32>> = HashMap::new();
598 input.insert(10, BTreeSet::from([1]));
599 input.insert(20, BTreeSet::from([2]));
600
601 let mut chat_lookup_map: HashMap<i32, i32> = HashMap::new();
602 chat_lookup_map.insert(10, 99);
603 chat_lookup_map.insert(20, 99);
604
605 let output = ChatToHandle::dedupe(&input, &chat_lookup_map).unwrap();
606
607 assert_eq!(
609 output.get(&10),
610 output.get(&20),
611 "Chats sharing a canonical absent from duplicated_data should still merge"
612 );
613 assert_eq!(output.len(), 2);
615 assert!(!output.contains_key(&99));
616 }
617}