Skip to main content

mail_threading/
lib.rs

1#![deny(missing_docs)]
2#![doc = include_str!("../README.md")]
3
4use chrono::{DateTime, Utc};
5use std::collections::HashMap;
6
7/// A message projected into the fields needed by RFC 5256/JWZ threading.
8#[derive(Debug, Clone, PartialEq, Eq)]
9#[cfg_attr(feature = "serde", derive(serde::Deserialize, serde::Serialize))]
10pub struct Message {
11    /// Caller-stable message identity returned in thread output.
12    pub id: String,
13    /// The RFC 5322 `Message-ID` value, if present and valid.
14    pub message_id: Option<String>,
15    /// The RFC 5322 `In-Reply-To` value, if present.
16    pub in_reply_to: Option<String>,
17    /// The ordered RFC 5322 `References` chain.
18    pub references: Vec<String>,
19    /// Message date used for deterministic output ordering.
20    pub date: DateTime<Utc>,
21    /// Message subject used only by optional subject fallback merging.
22    pub subject: String,
23}
24
/// One output thread: a root plus the flat, ordered list of its members.
#[derive(Clone, Debug, Eq, PartialEq)]
#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
pub struct Thread {
    /// Identifier of the thread's root: the caller `id` of the root message, or
    /// the root container's key when the root holds no message.
    pub root_message_id: String,
    /// Caller ids of every member, sorted by date and then by id.
    pub messages: Vec<String>,
}
34
/// Knobs that control how messages are grouped into threads.
#[derive(Clone, Debug, Eq, PartialEq)]
#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
pub struct ThreadingOptions {
    /// When `true`, threads with no valid reply headers are folded into an
    /// earlier thread that shares their normalized subject.
    pub subject_merge: bool,
    /// When `true`, containers without a message are never reported as a
    /// thread root; their children are promoted instead.
    pub prune_phantoms: bool,
    /// Reply/forward prefixes (matched ignoring ASCII case) removed from
    /// subjects before subject fallback comparison.
    pub subject_prefixes: Vec<String>,
}
46
47impl Default for ThreadingOptions {
48    fn default() -> Self {
49        Self {
50            subject_merge: true,
51            prune_phantoms: true,
52            subject_prefixes: [
53                "re", "fw", "fwd", "aw", "sv", "antw", "rv", "odp", "tr", "wg",
54            ]
55            .into_iter()
56            .map(str::to_string)
57            .collect(),
58        }
59    }
60}
61
62/// Thread messages with default RFC 5256/JWZ options.
63pub fn thread_messages(messages: &[Message]) -> Vec<Thread> {
64    thread_messages_with(messages, &ThreadingOptions::default())
65}
66
67/// Thread messages with explicit options.
68pub fn thread_messages_with(messages: &[Message], options: &ThreadingOptions) -> Vec<Thread> {
69    if messages.is_empty() {
70        return Vec::new();
71    }
72
73    let mut table = HashMap::new();
74    let mut output_index = HashMap::new();
75    let mut next_order = 0;
76
77    for message in messages {
78        if message.id.is_empty() {
79            continue;
80        }
81
82        let current_key = message_container_key(&table, message);
83        ensure_container(&mut table, &current_key, &mut next_order);
84
85        if let Some(container) = table.get_mut(&current_key) {
86            container.message = Some(message.clone());
87        }
88        output_index.insert(message.id.clone(), current_key.clone());
89
90        let reference_chain = effective_reference_chain(message);
91        let mut previous_id: Option<&str> = None;
92        for reference_id in &reference_chain {
93            ensure_container(&mut table, reference_id, &mut next_order);
94            if let Some(parent_id) = previous_id {
95                link_parent_child(&mut table, parent_id, reference_id, false);
96            }
97            previous_id = Some(reference_id);
98        }
99
100        if let Some(parent_id) = reference_chain.last() {
101            ensure_container(&mut table, parent_id, &mut next_order);
102            link_parent_child(&mut table, parent_id, &current_key, true);
103        } else {
104            detach_parent(&mut table, &current_key);
105        }
106    }
107
108    let mut public_roots = Vec::new();
109    let mut root_ids = table
110        .iter()
111        .filter_map(|(id, container)| container.parent.is_none().then_some(id.clone()))
112        .collect::<Vec<_>>();
113    root_ids.sort_by(|left, right| compare_container_roots(&table, left, right));
114
115    for root_id in root_ids {
116        collect_public_roots(&table, &root_id, options.prune_phantoms, &mut public_roots);
117    }
118
119    let mut threads = Vec::new();
120    for root_id in public_roots {
121        let mut thread_messages = Vec::new();
122        collect_thread_messages(&table, &root_id, &mut thread_messages);
123        thread_messages
124            .sort_by(|left, right| compare_message_ids(&table, &output_index, left, right));
125        thread_messages.dedup();
126
127        if !thread_messages.is_empty() {
128            let root = table
129                .get(&root_id)
130                .and_then(|container| container.message.as_ref())
131                .map(|message| message.id.clone())
132                .unwrap_or(root_id);
133            threads.push(Thread {
134                root_message_id: root,
135                messages: thread_messages,
136            });
137        }
138    }
139
140    threads.sort_by(|left, right| compare_threads(&table, &output_index, left, right));
141
142    if options.subject_merge {
143        merge_subject_fallback_threads(threads, &table, &output_index, options)
144    } else {
145        threads
146    }
147}
148
149#[derive(Debug, Clone)]
150struct Container {
151    message: Option<Message>,
152    parent: Option<String>,
153    children: Vec<String>,
154    order: usize,
155}
156
157impl Container {
158    fn empty(order: usize) -> Self {
159        Self {
160            message: None,
161            parent: None,
162            children: Vec::new(),
163            order,
164        }
165    }
166}
167
168fn ensure_container(table: &mut HashMap<String, Container>, id: &str, next_order: &mut usize) {
169    table.entry(id.to_string()).or_insert_with(|| {
170        let order = *next_order;
171        *next_order += 1;
172        Container::empty(order)
173    });
174}
175
176fn message_container_key(table: &HashMap<String, Container>, message: &Message) -> String {
177    if let Some(normalized) = message.message_id.as_deref().and_then(normalize_message_id) {
178        if table
179            .get(&normalized)
180            .and_then(|container| container.message.as_ref())
181            .is_none()
182        {
183            return normalized;
184        }
185    }
186
187    synthetic_message_id(&message.id)
188}
189
/// Builds the private table key used for a message whose own `Message-ID`
/// cannot be used: the caller `id` prefixed with `synthetic:`.
fn synthetic_message_id(id: &str) -> String {
    let mut key = String::from("synthetic:");
    key.push_str(id);
    key
}
193
194fn effective_reference_chain(message: &Message) -> Vec<String> {
195    let references = message
196        .references
197        .iter()
198        .filter_map(|reference| normalize_message_id(reference))
199        .collect::<Vec<_>>();
200
201    if references.is_empty() {
202        message
203            .in_reply_to
204            .as_deref()
205            .and_then(normalize_message_id)
206            .into_iter()
207            .collect()
208    } else {
209        references
210    }
211}
212
213fn link_parent_child(
214    table: &mut HashMap<String, Container>,
215    parent_id: &str,
216    child_id: &str,
217    replace_existing: bool,
218) {
219    if parent_id == child_id || would_create_cycle(table, parent_id, child_id) {
220        return;
221    }
222
223    let has_parent = table
224        .get(child_id)
225        .is_some_and(|container| container.parent.is_some());
226    if has_parent && !replace_existing {
227        return;
228    }
229
230    if replace_existing {
231        detach_parent(table, child_id);
232    }
233
234    if let Some(child) = table.get_mut(child_id) {
235        child.parent = Some(parent_id.to_string());
236    }
237
238    if let Some(parent) = table.get_mut(parent_id) {
239        let child = child_id.to_string();
240        if !parent.children.contains(&child) {
241            parent.children.push(child);
242        }
243    }
244}
245
246fn detach_parent(table: &mut HashMap<String, Container>, child_id: &str) {
247    let parent_id = table.get(child_id).and_then(|child| child.parent.clone());
248
249    if let Some(parent_id) = parent_id {
250        if let Some(parent) = table.get_mut(&parent_id) {
251            parent.children.retain(|id| id != child_id);
252        }
253    }
254
255    if let Some(child) = table.get_mut(child_id) {
256        child.parent = None;
257    }
258}
259
260fn would_create_cycle(table: &HashMap<String, Container>, parent_id: &str, child_id: &str) -> bool {
261    let mut current = Some(parent_id);
262    while let Some(id) = current {
263        if id == child_id {
264            return true;
265        }
266        current = table
267            .get(id)
268            .and_then(|container| container.parent.as_deref());
269    }
270    false
271}
272
273fn collect_public_roots(
274    table: &HashMap<String, Container>,
275    id: &str,
276    prune_phantoms: bool,
277    out: &mut Vec<String>,
278) {
279    let Some(container) = table.get(id) else {
280        return;
281    };
282
283    if prune_phantoms && container.message.is_none() {
284        let mut children = container.children.clone();
285        children.sort_by(|left, right| compare_container_roots(table, left, right));
286        for child_id in children {
287            collect_public_roots(table, &child_id, prune_phantoms, out);
288        }
289    } else {
290        out.push(id.to_string());
291    }
292}
293
294fn collect_thread_messages(table: &HashMap<String, Container>, id: &str, out: &mut Vec<String>) {
295    let Some(container) = table.get(id) else {
296        return;
297    };
298
299    if let Some(message) = &container.message {
300        out.push(message.id.clone());
301    }
302
303    let mut children = container.children.clone();
304    children.sort_by(|left, right| compare_container_roots(table, left, right));
305    for child_id in children {
306        collect_thread_messages(table, &child_id, out);
307    }
308}
309
310fn compare_container_roots(
311    table: &HashMap<String, Container>,
312    left: &str,
313    right: &str,
314) -> std::cmp::Ordering {
315    earliest_date(table, left)
316        .cmp(&earliest_date(table, right))
317        .then_with(|| {
318            table
319                .get(left)
320                .map(|container| container.order)
321                .cmp(&table.get(right).map(|container| container.order))
322        })
323        .then_with(|| left.cmp(right))
324}
325
326fn compare_message_ids(
327    table: &HashMap<String, Container>,
328    output_index: &HashMap<String, String>,
329    left_id: &str,
330    right_id: &str,
331) -> std::cmp::Ordering {
332    message_date(table, output_index, left_id)
333        .cmp(&message_date(table, output_index, right_id))
334        .then_with(|| left_id.cmp(right_id))
335}
336
337fn compare_threads(
338    table: &HashMap<String, Container>,
339    output_index: &HashMap<String, String>,
340    left: &Thread,
341    right: &Thread,
342) -> std::cmp::Ordering {
343    thread_first_date(table, output_index, left)
344        .cmp(&thread_first_date(table, output_index, right))
345        .then_with(|| left.root_message_id.cmp(&right.root_message_id))
346}
347
348fn earliest_date(table: &HashMap<String, Container>, root_id: &str) -> DateTime<Utc> {
349    earliest_message_date(table, root_id).unwrap_or_default()
350}
351
352fn earliest_message_date(
353    table: &HashMap<String, Container>,
354    root_id: &str,
355) -> Option<DateTime<Utc>> {
356    let container = table.get(root_id)?;
357
358    container
359        .message
360        .as_ref()
361        .map(|message| message.date)
362        .into_iter()
363        .chain(
364            container
365                .children
366                .iter()
367                .filter_map(|child_id| earliest_message_date(table, child_id)),
368        )
369        .min()
370}
371
372fn message_date(
373    table: &HashMap<String, Container>,
374    output_index: &HashMap<String, String>,
375    output_id: &str,
376) -> DateTime<Utc> {
377    output_index
378        .get(output_id)
379        .and_then(|key| table.get(key))
380        .and_then(|container| container.message.as_ref())
381        .map(|message| message.date)
382        .unwrap_or_default()
383}
384
385fn thread_first_date(
386    table: &HashMap<String, Container>,
387    output_index: &HashMap<String, String>,
388    thread: &Thread,
389) -> DateTime<Utc> {
390    thread
391        .messages
392        .iter()
393        .map(|id| message_date(table, output_index, id))
394        .min()
395        .unwrap_or_default()
396}
397
398fn merge_subject_fallback_threads(
399    mut threads: Vec<Thread>,
400    table: &HashMap<String, Container>,
401    output_index: &HashMap<String, String>,
402    options: &ThreadingOptions,
403) -> Vec<Thread> {
404    threads.sort_by(|left, right| {
405        let left_has_headers = thread_has_headers(left, table, output_index);
406        let right_has_headers = thread_has_headers(right, table, output_index);
407        right_has_headers
408            .cmp(&left_has_headers)
409            .then_with(|| compare_threads(table, output_index, left, right))
410    });
411
412    let mut merged = Vec::new();
413    let mut subject_roots: HashMap<String, usize> = HashMap::new();
414
415    for mut thread in threads {
416        thread
417            .messages
418            .sort_by(|left, right| compare_message_ids(table, output_index, left, right));
419        let key = thread
420            .messages
421            .first()
422            .and_then(|id| output_index.get(id))
423            .and_then(|key| table.get(key))
424            .and_then(|container| container.message.as_ref())
425            .map(|message| normalize_subject(&message.subject, &options.subject_prefixes))
426            .unwrap_or_default();
427        let has_headers = thread_has_headers(&thread, table, output_index);
428
429        if !has_headers && !key.is_empty() {
430            if let Some(index) = subject_roots.get(&key).copied() {
431                let target: &mut Thread = &mut merged[index];
432                for message_id in thread.messages {
433                    if !target.messages.contains(&message_id) {
434                        target.messages.push(message_id);
435                    }
436                }
437                target
438                    .messages
439                    .sort_by(|left, right| compare_message_ids(table, output_index, left, right));
440                continue;
441            }
442        }
443
444        if !key.is_empty() {
445            subject_roots.entry(key).or_insert_with(|| merged.len());
446        }
447        merged.push(thread);
448    }
449
450    merged.sort_by(|left, right| compare_threads(table, output_index, left, right));
451    merged
452}
453
454fn thread_has_headers(
455    thread: &Thread,
456    table: &HashMap<String, Container>,
457    output_index: &HashMap<String, String>,
458) -> bool {
459    thread.messages.iter().any(|id| {
460        output_index
461            .get(id)
462            .and_then(|key| table.get(key))
463            .and_then(|container| container.message.as_ref())
464            .is_some_and(has_valid_threading_header)
465    })
466}
467
468fn has_valid_threading_header(message: &Message) -> bool {
469    message
470        .in_reply_to
471        .as_deref()
472        .and_then(normalize_message_id)
473        .is_some()
474        || message
475            .references
476            .iter()
477            .any(|reference| normalize_message_id(reference).is_some())
478}
479
/// Normalizes a raw message id into the `local@domain` form used as a table key.
///
/// Surrounding whitespace is trimmed. One enclosing `<...>` pair is removed
/// only when both brackets are present. The id is split at its first `@`, and
/// both halves are trimmed and must be non-empty. A fully double-quoted local
/// part loses its quotes. Finally `\"` and `\\` escapes in the local part are
/// unescaped. Returns `None` when there is no `@` or either half is empty.
fn normalize_message_id(raw: &str) -> Option<String> {
    let trimmed = raw.trim();
    let inner = match trimmed
        .strip_prefix('<')
        .and_then(|body| body.strip_suffix('>'))
    {
        Some(body) => body.trim(),
        None => trimmed,
    };

    // `split_once` fails when there is no '@', which also covers the empty case.
    let (local, domain) = inner.split_once('@')?;
    let (local, domain) = (local.trim(), domain.trim());
    if local.is_empty() || domain.is_empty() {
        return None;
    }

    let unquoted = match local
        .strip_prefix('"')
        .and_then(|body| body.strip_suffix('"'))
    {
        Some(body) => body,
        None => local,
    };
    let local = unquoted.replace("\\\"", "\"").replace("\\\\", "\\");

    Some(format!("{local}@{domain}"))
}
508
509fn normalize_subject(subject: &str, prefixes: &[String]) -> String {
510    let collapsed = subject.split_whitespace().collect::<Vec<_>>().join(" ");
511    let mut normalized = collapsed.trim();
512
513    loop {
514        if let Some(stripped) = normalized
515            .strip_suffix("(fwd)")
516            .or_else(|| normalized.strip_suffix("(FWD)"))
517            .or_else(|| normalized.strip_suffix("(Fwd)"))
518        {
519            normalized = stripped.trim();
520            continue;
521        }
522
523        let lower = normalized.to_ascii_lowercase();
524        if lower.starts_with("[fwd:") && normalized.ends_with(']') {
525            normalized = normalized[5..normalized.len() - 1].trim();
526            continue;
527        }
528
529        if let Some(stripped) = strip_leading_subject_blobs(normalized) {
530            normalized = stripped;
531            continue;
532        }
533
534        let Some(prefix_end) = normalized.find(':') else {
535            break;
536        };
537        let prefix = normalized[..prefix_end].trim();
538        let lower = prefix.to_ascii_lowercase();
539        let base = lower
540            .trim_end_matches(|ch: char| {
541                ch.is_ascii_digit() || matches!(ch, '[' | ']' | '(' | ')' | ' ')
542            })
543            .trim();
544
545        if prefixes
546            .iter()
547            .any(|candidate| candidate.eq_ignore_ascii_case(base))
548        {
549            normalized = normalized[prefix_end + 1..].trim();
550            continue;
551        }
552
553        break;
554    }
555
556    normalized.to_ascii_lowercase()
557}
558
/// Removes leading `[blob]` tags (e.g. mailing-list tags) from `subject`.
///
/// Blobs are removed one at a time from the front. Stripping stops at the first
/// non-blob, at a blob whose removal would leave nothing, or at a candidate blob
/// that contains another `[`. RFC 5256 `BLOBCHAR` excludes both brackets, so
/// `[a[b]` is not a blob. Returns the trimmed remainder when at least one blob
/// was removed, otherwise `None`.
fn strip_leading_subject_blobs(subject: &str) -> Option<&str> {
    let mut cursor = subject.trim_start();
    let mut stripped_any = false;

    while let Some(rest) = cursor.strip_prefix('[') {
        let Some(blob_end) = rest.find(']') else {
            break;
        };
        // Previously "[[x]] y" was cut to "] y"; a nested '[' means this is not a blob.
        if rest[..blob_end].contains('[') {
            break;
        }
        let after_blob = rest[blob_end + 1..].trim_start();
        // RFC 5256: only remove a blob if that leaves a non-empty subject.
        if after_blob.is_empty() {
            break;
        }

        stripped_any = true;
        cursor = after_blob;
    }

    stripped_any.then_some(cursor.trim())
}