1use std::collections::{BTreeSet, HashMap, HashSet};
6
7use crate::{
8 error::table::TableError,
9 tables::{
10 diagnostic::ChatHandleDiagnostic,
11 table::{CHAT_HANDLE_JOIN, CHAT_MESSAGE_JOIN, Cacheable, Table},
12 },
13 util::union_find::UnionFind,
14};
15use rusqlite::{CachedStatement, Connection, Result, Row};
16
/// One row of the chat/handle join table: a single association between a chat
/// and a handle.
///
/// Rows are read from the table named by `CHAT_HANDLE_JOIN`; see the `Table`
/// implementation for the query and column mapping.
#[derive(Debug)]
pub struct ChatToHandle {
    /// Value of the row's `chat_id` column.
    chat_id: i32,
    /// Value of the row's `handle_id` column.
    handle_id: i32,
}
24
25impl Table for ChatToHandle {
26 fn from_row(row: &Row) -> Result<ChatToHandle> {
27 Ok(ChatToHandle {
28 chat_id: row.get("chat_id")?,
29 handle_id: row.get("handle_id")?,
30 })
31 }
32
33 fn get(db: &'_ Connection) -> Result<CachedStatement<'_>, TableError> {
34 Ok(db.prepare_cached(&format!("SELECT * FROM {CHAT_HANDLE_JOIN}"))?)
35 }
36}
37
38impl Cacheable for ChatToHandle {
40 type K = i32;
41 type V = BTreeSet<i32>;
42 fn cache(db: &Connection) -> Result<HashMap<Self::K, Self::V>, TableError> {
56 let mut cache: HashMap<i32, BTreeSet<i32>> = HashMap::new();
57
58 let mut rows = ChatToHandle::get(db)?;
59 let mappings = rows.query_map([], |row| Ok(ChatToHandle::from_row(row)))?;
60
61 for mapping in mappings {
62 let joiner = ChatToHandle::extract(mapping)?;
63 if let Some(handles) = cache.get_mut(&joiner.chat_id) {
64 handles.insert(joiner.handle_id);
65 } else {
66 let mut data_to_cache = BTreeSet::new();
67 data_to_cache.insert(joiner.handle_id);
68 cache.insert(joiner.chat_id, data_to_cache);
69 }
70 }
71
72 Ok(cache)
73 }
74}
75
76impl ChatToHandle {
78 pub fn run_diagnostic(db: &Connection) -> Result<ChatHandleDiagnostic, TableError> {
95 let mut statement_message_chats =
97 db.prepare(&format!("SELECT DISTINCT chat_id from {CHAT_MESSAGE_JOIN}"))?;
98 let statement_message_chat_rows =
99 statement_message_chats.query_map([], |row: &Row| -> Result<i32> { row.get(0) })?;
100 let mut unique_chats_from_messages: HashSet<i32> = HashSet::new();
101 statement_message_chat_rows.into_iter().for_each(|row| {
102 if let Ok(row) = row {
103 unique_chats_from_messages.insert(row);
104 }
105 });
106
107 let mut statement_handle_chats =
109 db.prepare(&format!("SELECT DISTINCT chat_id from {CHAT_HANDLE_JOIN}"))?;
110 let statement_handle_chat_rows =
111 statement_handle_chats.query_map([], |row: &Row| -> Result<i32> { row.get(0) })?;
112 let mut unique_chats_from_handles: HashSet<i32> = HashSet::new();
113 statement_handle_chat_rows.into_iter().for_each(|row| {
114 if let Ok(row) = row {
115 unique_chats_from_handles.insert(row);
116 }
117 });
118
119 let all_chats = Self::cache(db)?;
121
122 let chat_handle_lookup = ChatToHandle::get_chat_lookup_map(db)?;
124
125 let real_chatrooms = ChatToHandle::dedupe(&all_chats, &chat_handle_lookup)?;
127
128 let total_duplicated =
130 all_chats.len() - HashSet::<&i32>::from_iter(real_chatrooms.values()).len();
131
132 let chats_with_no_handles = unique_chats_from_messages
134 .difference(&unique_chats_from_handles)
135 .count();
136
137 Ok(ChatHandleDiagnostic {
138 total_chats: all_chats.len(),
139 total_duplicated,
140 chats_with_no_handles,
141 })
142 }
143}
144
145impl ChatToHandle {
146 pub fn get_chat_lookup_map(conn: &Connection) -> Result<HashMap<i32, i32>, TableError> {
163 let mut stmt = conn.prepare(
165 "
166WITH RECURSIVE
167 adj AS (
168 SELECT DISTINCT a.chat AS u, b.chat AS v
169 FROM chat_lookup a
170 JOIN chat_lookup b
171 ON a.identifier = b.identifier
172 ),
173 reach(root, chat) AS (
174 SELECT u AS root, v AS chat FROM adj
175 UNION
176 SELECT r.root, a.v
177 FROM reach r
178 JOIN adj a ON a.u = r.chat
179 ),
180 canon AS (
181 SELECT chat, MAX(root) AS canonical_chat
182 FROM reach
183 GROUP BY chat
184 )
185SELECT chat, canonical_chat
186FROM canon
187ORDER BY chat;
188 ",
189 );
190 let mut chat_lookup_map: HashMap<i32, i32> = HashMap::new();
191
192 if let Ok(statement) = stmt.as_mut() {
193 let chat_lookup_rows = statement.query_map([], |row| {
195 let chat: i32 = row.get(0)?;
196 let canonical: i32 = row.get(1)?;
197 Ok((chat, canonical))
198 });
199
200 if let Ok(chat_lookup_rows) = chat_lookup_rows {
202 for row in chat_lookup_rows {
203 let (chat_id, canonical_chat) = row?;
204 chat_lookup_map.insert(chat_id, canonical_chat);
205 }
206 }
207 }
208 Ok(chat_lookup_map)
209 }
210
211 pub fn dedupe(
233 duplicated_data: &HashMap<i32, BTreeSet<i32>>,
234 chat_lookup_map: &HashMap<i32, i32>,
235 ) -> Result<HashMap<i32, i32>, TableError> {
236 let mut uf = UnionFind::new();
237
238 for chat_id in duplicated_data.keys() {
240 uf.make_set(*chat_id);
241 }
242
243 let mut sorted_lookup: Vec<(&i32, &i32)> = chat_lookup_map.iter().collect();
248 sorted_lookup.sort_by_key(|(chat_id, _)| *chat_id);
249 for (chat_id, canonical) in sorted_lookup {
250 if duplicated_data.contains_key(chat_id) {
251 uf.union(*chat_id, *canonical);
252 }
253 }
254
255 let mut sorted_chats: Vec<(&i32, &BTreeSet<i32>)> = duplicated_data.iter().collect();
258 sorted_chats.sort_by_key(|(id, _)| *id);
259
260 let mut participants_to_chat: HashMap<&BTreeSet<i32>, i32> = HashMap::new();
261 for (chat_id, participants) in &sorted_chats {
262 if let Some(&first_chat) = participants_to_chat.get(participants) {
263 uf.union(**chat_id, first_chat);
264 } else {
265 participants_to_chat.insert(participants, **chat_id);
266 }
267 }
268
269 let mut deduplicated_chats: HashMap<i32, i32> = HashMap::new();
272 let mut representative_to_id: HashMap<i32, i32> = HashMap::new();
273 let mut next_id = 0;
274
275 for (chat_id, _) in &sorted_chats {
276 let rep = uf.find(**chat_id);
277 let dedup_id = *representative_to_id.entry(rep).or_insert_with(|| {
278 let id = next_id;
279 next_id += 1;
280 id
281 });
282 deduplicated_chats.insert(**chat_id, dedup_id);
283 }
284
285 Ok(deduplicated_chats)
286 }
287}
288
#[cfg(test)]
mod tests {
    use crate::tables::chat_handle::ChatToHandle;
    use std::collections::{BTreeSet, HashMap, HashSet};

    /// Chat ID -> participant handle IDs, as consumed by `dedupe`.
    type Chats = HashMap<i32, BTreeSet<i32>>;
    /// Chat ID -> canonical chat ID, as consumed by `dedupe`.
    type Lookup = HashMap<i32, i32>;

    /// Number of distinct deduplicated IDs in a `dedupe` result.
    fn distinct_ids(output: &HashMap<i32, i32>) -> usize {
        output.values().copied().collect::<HashSet<i32>>().len()
    }

    /// Flatten a `dedupe` result into a sorted list so two runs can be compared.
    fn sorted_pairs(output: HashMap<i32, i32>) -> Vec<(i32, i32)> {
        let mut pairs: Vec<(i32, i32)> = output.into_iter().collect();
        pairs.sort_unstable();
        pairs
    }

    /// Six chats that fall into three groups by participants alone.
    fn three_group_chats() -> Chats {
        HashMap::from([
            (1, BTreeSet::from([1])),
            (2, BTreeSet::from([1])),
            (3, BTreeSet::from([1])),
            (4, BTreeSet::from([2])),
            (5, BTreeSet::from([2])),
            (6, BTreeSet::from([3])),
        ])
    }

    /// Five chats used together with `bridging_lookup`.
    fn bridged_chats() -> Chats {
        HashMap::from([
            (0, BTreeSet::from([1])),
            (1, BTreeSet::from([1])),
            (2, BTreeSet::from([3])),
            (4, BTreeSet::from([2])),
            (5, BTreeSet::from([1])),
        ])
    }

    /// Lookup entries linking chat 2 to chat 5 and chat 4 to chat 0.
    fn bridging_lookup() -> Lookup {
        HashMap::from([(2, 5), (4, 0)])
    }

    #[test]
    fn can_dedupe() {
        let input = three_group_chats();

        let output = ChatToHandle::dedupe(&input, &HashMap::new()).unwrap();

        assert_eq!(distinct_ids(&output), 3);
    }

    #[test]
    fn can_dedupe_multi() {
        let input: Chats = HashMap::from([
            (1, BTreeSet::from([1, 2])),
            (2, BTreeSet::from([1])),
            (3, BTreeSet::from([1])),
            (4, BTreeSet::from([2, 1])),
            (5, BTreeSet::from([2, 3])),
            (6, BTreeSet::from([3])),
        ]);

        let output = ChatToHandle::dedupe(&input, &HashMap::new()).unwrap();

        assert_eq!(distinct_ids(&output), 4);
    }

    #[test]
    fn test_same_values() {
        // Each run gets its own freshly built (and therefore independently
        // hashed) input map; the results must still agree.
        let run = || {
            let input = three_group_chats();
            sorted_pairs(ChatToHandle::dedupe(&input, &HashMap::new()).unwrap())
        };

        let output_1 = run();
        let output_2 = run();
        let output_3 = run();

        assert_eq!(output_1, output_2);
        assert_eq!(output_1, output_3);
        assert_eq!(output_2, output_3);
    }

    #[test]
    fn test_same_values_with_lookup() {
        let run = || {
            let input = bridged_chats();
            let lookup = bridging_lookup();
            sorted_pairs(ChatToHandle::dedupe(&input, &lookup).unwrap())
        };

        let output_1 = run();
        let output_2 = run();
        let output_3 = run();

        assert_eq!(output_1, output_2);
        assert_eq!(output_1, output_3);
        assert_eq!(output_2, output_3);
    }

    #[test]
    fn can_dedupe_with_chat_lookup_map() {
        let input = bridged_chats();
        let chat_lookup_map = bridging_lookup();

        let output = ChatToHandle::dedupe(&input, &chat_lookup_map).unwrap();

        // Chats 0, 1 and 5 share participants; 4 and 2 join them through the lookup.
        assert_eq!(output.get(&0), output.get(&1));
        assert_eq!(output.get(&0), output.get(&4));
        assert_eq!(output.get(&0), output.get(&5));
        assert_eq!(output.get(&2), output.get(&5));
    }

    #[test]
    fn can_dedupe_with_lookup_map_overriding_participants() {
        let input: Chats = HashMap::from([
            (0, BTreeSet::from([1, 2])),
            (1, BTreeSet::from([1, 2])),
            (2, BTreeSet::from([3, 4])),
            (3, BTreeSet::from([1, 2])),
        ]);
        let chat_lookup_map: Lookup = HashMap::from([(1, 0), (2, 0)]);

        let output = ChatToHandle::dedupe(&input, &chat_lookup_map).unwrap();

        assert_eq!(output.get(&0), output.get(&1));
        // Different participants, but the lookup says it is the same conversation.
        assert_eq!(output.get(&0), output.get(&2));
        assert_eq!(output.get(&3), output.get(&0));
    }

    #[test]
    fn can_dedupe_mixed_lookup_and_participants() {
        let input: Chats = HashMap::from([
            (0, BTreeSet::from([1])),
            (1, BTreeSet::from([1])),
            (2, BTreeSet::from([3])),
            (3, BTreeSet::from([2])),
            (4, BTreeSet::from([3])),
        ]);
        let chat_lookup_map: Lookup = HashMap::from([(1, 0), (3, 0)]);

        let output = ChatToHandle::dedupe(&input, &chat_lookup_map).unwrap();

        assert_eq!(output.get(&0), output.get(&1));
        assert_eq!(output.get(&0), output.get(&3));
        assert_ne!(output.get(&2), output.get(&1));
        assert_eq!(output.get(&4), output.get(&2));
        assert_ne!(output.get(&3), output.get(&4));
    }

    #[test]
    fn lookup_merges_when_canonical_has_higher_id() {
        let input: Chats = HashMap::from([
            (10, BTreeSet::from([1])),
            (20, BTreeSet::from([2])),
        ]);
        let chat_lookup_map: Lookup = HashMap::from([(10, 20), (20, 20)]);

        let output = ChatToHandle::dedupe(&input, &chat_lookup_map).unwrap();

        assert_eq!(
            output.get(&10),
            output.get(&20),
            "Chats linked by chat_lookup should merge regardless of ID ordering"
        );
    }

    #[test]
    fn lookup_merge_is_order_independent() {
        let input: Chats = HashMap::from([
            (0, BTreeSet::from([1, 2])),
            (5, BTreeSet::from([3, 4])),
        ]);

        // Canonical chat is the lower ID.
        let lookup_a: Lookup = HashMap::from([(0, 0), (5, 0)]);
        let output_a = ChatToHandle::dedupe(&input, &lookup_a).unwrap();

        // Canonical chat is the higher ID.
        let lookup_b: Lookup = HashMap::from([(0, 5), (5, 5)]);
        let output_b = ChatToHandle::dedupe(&input, &lookup_b).unwrap();

        assert_eq!(
            output_a.get(&0) == output_a.get(&5),
            output_b.get(&0) == output_b.get(&5),
            "Merge result must not depend on which chat ID is canonical"
        );
        assert_eq!(
            output_b.get(&0),
            output_b.get(&5),
            "Chats linked by lookup should merge even when canonical is the higher ID"
        );
    }

    #[test]
    fn transitive_merge_across_participants_and_lookup() {
        let input: Chats = HashMap::from([
            (0, BTreeSet::from([1])),
            (2, BTreeSet::from([3])),
            (5, BTreeSet::from([1])),
        ]);
        let chat_lookup_map: Lookup = HashMap::from([(2, 5), (5, 5)]);

        let output = ChatToHandle::dedupe(&input, &chat_lookup_map).unwrap();

        assert_eq!(output.get(&0), output.get(&5));
        assert_eq!(
            output.get(&2),
            output.get(&5),
            "Chat linked by lookup to a chat that merged by participants should join the same group"
        );
    }

    #[test]
    fn multiple_service_splits_all_merge() {
        let input: Chats = HashMap::from([
            (100, BTreeSet::from([10])),
            (200, BTreeSet::from([20])),
            (300, BTreeSet::from([30])),
        ]);
        let chat_lookup_map: Lookup = HashMap::from([(100, 300), (200, 300), (300, 300)]);

        let output = ChatToHandle::dedupe(&input, &chat_lookup_map).unwrap();

        let unique_ids: HashSet<i32> = output.values().copied().collect();
        assert_eq!(
            unique_ids.len(),
            1,
            "All three service-split chats should merge into one conversation, got {:?}",
            output
        );
    }

    #[test]
    fn lookup_merges_through_missing_canonical() {
        let input: Chats = HashMap::from([
            (10, BTreeSet::from([1])),
            (20, BTreeSet::from([2])),
        ]);
        // Chat 99 is referenced as canonical but is not one of the input chats.
        let chat_lookup_map: Lookup = HashMap::from([(10, 99), (20, 99)]);

        let output = ChatToHandle::dedupe(&input, &chat_lookup_map).unwrap();

        assert_eq!(
            output.get(&10),
            output.get(&20),
            "Chats sharing a canonical absent from duplicated_data should still merge"
        );
        assert_eq!(output.len(), 2);
        assert!(!output.contains_key(&99));
    }
}