use rusmes_proto::Mail;
use sha2::{Digest, Sha256};
use std::collections::HashMap;
use std::path::{Path, PathBuf};
/// On-disk thread index: maps message IDs (angle brackets stripped, or a
/// `uuid:<internal id>` fallback) and `subj:<normalized subject>` keys to the
/// thread ID assigned to them.
type ThreadIndex = HashMap<String, String>;

/// Assigns messages to conversation threads for a single mailbox, backed by a
/// JSON index file stored inside the mailbox directory.
#[derive(Debug, Clone)]
pub struct ThreadingEngine {
    /// Location of the JSON thread index (`<mailbox_dir>/.thread_index.json`).
    index_path: PathBuf,
}
impl ThreadingEngine {
    /// Create an engine for the mailbox stored in `mailbox_dir`.
    ///
    /// The thread index lives at `<mailbox_dir>/.thread_index.json`. No I/O is
    /// performed here. The file is read on each lookup and rewritten on each
    /// assignment.
    pub fn new(mailbox_dir: &Path) -> Self {
        Self {
            index_path: mailbox_dir.join(".thread_index.json"),
        }
    }

    /// Determine the thread `mail` belongs to, record it in the index, and
    /// return the thread ID.
    ///
    /// Resolution order, first hit wins:
    /// 1. the message's own `Message-ID`, if it is already indexed. This keeps
    ///    a re-delivered message on the thread ID it was first given.
    /// 2. the first `References` ID, then `In-Reply-To` ID, already indexed.
    /// 3. an earlier message with the same normalized subject.
    /// 4. otherwise, a new ID: the first 16 hex digits of SHA-256 over the
    ///    message ID.
    ///
    /// A missing or blank `Message-ID` header is replaced by
    /// `uuid:<mail.message_id()>`.
    ///
    /// The whole index is loaded and rewritten on every call.
    /// NOTE(review): concurrent calls for the same mailbox are not serialized
    /// here, so parallel deliveries may lose each other's index updates.
    ///
    /// # Errors
    /// Propagates failures from loading or persisting the index.
    pub async fn assign_thread_id(&self, mail: &Mail) -> anyhow::Result<String> {
        let message = mail.message();
        let headers = message.headers();

        // A present-but-blank Message-ID must not become the shared "" key
        // (and thus the same hash-derived thread ID) for every such message.
        let rfc_message_id: String = headers
            .get_first("message-id")
            .map(strip_angle_brackets)
            .filter(|id| !id.is_empty())
            .unwrap_or_else(|| format!("uuid:{}", mail.message_id()));

        // Ancestor IDs in header order: References first, then In-Reply-To.
        let mut refs: Vec<String> = Vec::new();
        if let Some(references_hdr) = headers.get_first("references") {
            parse_message_id_list(references_hdr, &mut refs);
        }
        if let Some(in_reply_to_hdr) = headers.get_first("in-reply-to") {
            parse_message_id_list(in_reply_to_hdr, &mut refs);
        }
        dedup_keep_order(&mut refs);

        // Normalized once; used both for the fallback lookup and for recording.
        let subj_key: Option<String> = headers
            .get_first("subject")
            .map(normalize_subject)
            .filter(|subject| !subject.is_empty())
            .map(|subject| format!("subj:{}", subject));

        let mut index = load_index(&self.index_path).await?;

        // Check the message's own ID first so an already-indexed message keeps
        // its thread ID. Otherwise the insert below would overwrite it with
        // whatever its references happen to resolve to now.
        let thread_id = std::iter::once(&rfc_message_id)
            .chain(refs.iter())
            .chain(subj_key.iter())
            .find_map(|key| index.get(key).cloned())
            .unwrap_or_else(|| {
                let mut hasher = Sha256::new();
                hasher.update(rfc_message_id.as_bytes());
                let digest = hasher.finalize();
                format!("{:x}", digest).chars().take(16).collect()
            });

        index.insert(rfc_message_id, thread_id.clone());
        if let Some(key) = subj_key {
            // The first thread recorded for a subject keeps it.
            index.entry(key).or_insert_with(|| thread_id.clone());
        }
        persist_index(&self.index_path, &index).await?;
        Ok(thread_id)
    }

    /// Look up the thread ID previously recorded for `message_id`.
    ///
    /// Surrounding whitespace and one pair of enclosing angle brackets are
    /// ignored. Returns `Ok(None)` if the message has not been indexed.
    ///
    /// # Errors
    /// Propagates failures from loading the index.
    pub async fn get_thread_id(&self, message_id: &str) -> anyhow::Result<Option<String>> {
        let normalized = strip_angle_brackets(message_id);
        let index = load_index(&self.index_path).await?;
        Ok(index.get(&normalized).cloned())
    }
}
/// Trim surrounding whitespace and, if the result is wrapped in `<` … `>`,
/// remove that one enclosing pair. Input that is not fully wrapped is returned
/// trimmed but otherwise unchanged.
pub(super) fn strip_angle_brackets(s: &str) -> String {
    let trimmed = s.trim();
    trimmed
        .strip_prefix('<')
        .and_then(|inner| inner.strip_suffix('>'))
        .unwrap_or(trimmed)
        .to_string()
}
/// Extract message IDs from a `References` / `In-Reply-To` header value and
/// append them to `out` in order of appearance (brackets removed).
///
/// RFC 5322 msg-ids are delimited by `<` … `>` and may be separated by
/// whitespace or by nothing at all (`<a@x><b@y>`). The obsolete
/// `In-Reply-To` syntax also allows free-text phrases around them.
///
/// - If the value contains a `<`, only bracketed IDs are taken and any text
///   between them is ignored. An unterminated final `<…` is taken up to the
///   end of the value.
/// - If the value contains no `<` at all, it is treated leniently as bare IDs
///   separated by spaces, tabs, newlines or commas.
fn parse_message_id_list(value: &str, out: &mut Vec<String>) {
    if !value.contains('<') {
        out.extend(
            value
                .split([' ', ',', '\t', '\n', '\r'])
                .map(str::trim)
                .filter(|token| !token.is_empty())
                .map(str::to_string),
        );
        return;
    }

    let mut rest = value;
    while let Some(open) = rest.find('<') {
        let after_open = &rest[open + 1..];
        let (id, remainder) = match after_open.find('>') {
            Some(close) => (&after_open[..close], &after_open[close + 1..]),
            None => (after_open, ""),
        };
        let id = id.trim();
        if !id.is_empty() {
            out.push(id.to_string());
        }
        rest = remainder;
    }
}
/// Remove repeated entries from `v` in place. The first occurrence of each
/// string is kept, and the survivors stay in their original relative order.
fn dedup_keep_order(v: &mut Vec<String>) {
    let mut already_seen: std::collections::HashSet<String> = std::collections::HashSet::new();
    v.retain(|candidate| {
        if already_seen.contains(candidate) {
            false
        } else {
            already_seen.insert(candidate.clone());
            true
        }
    });
}
/// Reduce a `Subject` header to a key shared by all messages of a
/// conversation. The steps loosely follow RFC 5256 base-subject extraction:
///
/// 1. Collapse every run of whitespace (including folded-header CR/LF and
///    tabs) to one space, trim, and lowercase.
/// 2. Repeatedly strip leading reply/forward markers (`re`, `fw`, `fwd`,
///    optionally followed by whitespace and a `[blob]` such as `[2]`, then
///    `:`) and leading `[tag]` list/patch markers.
///
/// Encoded-word decoding and trailing `(fwd)` removal are not performed.
/// NOTE: the result is persisted as a `subj:` index key, so changing this
/// function changes which existing entries later messages will match.
fn normalize_subject(subject: &str) -> String {
    let collapsed = subject
        .split_whitespace()
        .collect::<Vec<_>>()
        .join(" ")
        .to_lowercase();
    let mut s = collapsed.as_str();
    loop {
        if let Some(rest) = strip_reply_forward_prefix(s) {
            s = rest;
            continue;
        }
        // Drop one leading "[tag]" (mailing-list or patch marker).
        if let Some(inner) = s.strip_prefix('[') {
            if let Some(end) = inner.find(']') {
                s = inner[end + 1..].trim_start();
                continue;
            }
        }
        break;
    }
    s.to_string()
}

/// Strip one leading reply/forward marker from an already-lowercased subject.
///
/// Accepts `re`, `fw` or `fwd`, then optional whitespace, an optional
/// `[blob]`, and a required `:`. Returns the remainder with leading
/// whitespace removed, or `None` if `s` does not start with such a marker.
fn strip_reply_forward_prefix(s: &str) -> Option<&str> {
    // "fwd" must be tried before "fw"; otherwise "fwd:" would leave "d:".
    let rest = ["re", "fwd", "fw"]
        .iter()
        .find_map(|prefix| s.strip_prefix(*prefix))?;
    let rest = rest.trim_start();
    let rest = match rest.strip_prefix('[') {
        Some(inner) => inner[inner.find(']')? + 1..].trim_start(),
        None => rest,
    };
    rest.strip_prefix(':').map(str::trim_start)
}
async fn load_index(path: &Path) -> anyhow::Result<ThreadIndex> {
if !tokio::fs::try_exists(path).await.unwrap_or(false) {
return Ok(HashMap::new());
}
let bytes = tokio::fs::read(path).await?;
let index: ThreadIndex = serde_json::from_slice(&bytes).unwrap_or_else(|_| HashMap::new());
Ok(index)
}
async fn persist_index(path: &Path, index: &ThreadIndex) -> anyhow::Result<()> {
let json = serde_json::to_vec(index)
.map_err(|e| anyhow::anyhow!("Failed to serialize thread index: {}", e))?;
let tmp_path = path.with_extension("json.tmp");
if let Some(parent) = path.parent() {
tokio::fs::create_dir_all(parent).await?;
}
tokio::fs::write(&tmp_path, &json).await?;
tokio::fs::rename(&tmp_path, path).await?;
Ok(())
}
#[cfg(test)]
mod tests {
    use super::*;
    use bytes::Bytes;
    use rusmes_proto::{HeaderMap, Mail, MessageBody, MimeMessage};

    /// Build a `Mail` carrying exactly `headers` and a fixed text body.
    fn make_mail(headers: Vec<(&str, &str)>) -> Mail {
        let mut hmap = HeaderMap::new();
        for (name, value) in headers {
            hmap.insert(name, value.to_string());
        }
        let body = MessageBody::Small(Bytes::from("test body"));
        let mime = MimeMessage::new(hmap, body);
        Mail::new(None, vec![], mime, None, None)
    }

    /// A uniquely named mailbox directory under the system temp dir.
    ///
    /// The directory is removed when the guard is dropped. Drop also runs
    /// while unwinding from a failed assertion, whereas cleanup placed at the
    /// end of a test is skipped whenever the test fails.
    struct TempMailbox {
        dir: std::path::PathBuf,
    }

    impl TempMailbox {
        async fn new() -> Self {
            let dir = std::env::temp_dir()
                .join(format!("threading-test-{}", uuid::Uuid::new_v4()));
            tokio::fs::create_dir_all(&dir).await.unwrap();
            Self { dir }
        }

        /// A fresh engine over this mailbox. Engines hold only the index path,
        /// so separate engines share state solely through the file on disk.
        fn engine(&self) -> ThreadingEngine {
            ThreadingEngine::new(&self.dir)
        }
    }

    impl Drop for TempMailbox {
        fn drop(&mut self) {
            // `Drop` cannot await, so use the blocking call; cleanup is best effort.
            let _ = std::fs::remove_dir_all(&self.dir);
        }
    }

    #[tokio::test]
    async fn test_new_message_gets_new_thread_id() {
        let mailbox = TempMailbox::new().await;
        let engine = mailbox.engine();
        let mail = make_mail(vec![
            ("message-id", "<msg001@example.com>"),
            ("subject", "Hello world"),
        ]);
        let tid = engine.assign_thread_id(&mail).await.unwrap();
        assert!(!tid.is_empty(), "thread_id must not be empty");
        assert_eq!(tid.len(), 16, "thread_id must be 16 hex chars");
        assert!(
            tid.chars().all(|c| c.is_ascii_hexdigit()),
            "thread_id must be hex"
        );
    }

    #[tokio::test]
    async fn test_reply_gets_same_thread_id() {
        let mailbox = TempMailbox::new().await;
        let engine = mailbox.engine();
        let original = make_mail(vec![
            ("message-id", "<original@example.com>"),
            ("subject", "Original topic"),
        ]);
        let original_tid = engine.assign_thread_id(&original).await.unwrap();
        let reply = make_mail(vec![
            ("message-id", "<reply001@example.com>"),
            ("in-reply-to", "<original@example.com>"),
            ("subject", "Re: Original topic"),
        ]);
        let reply_tid = engine.assign_thread_id(&reply).await.unwrap();
        assert_eq!(
            original_tid, reply_tid,
            "Reply must share thread_id with original"
        );
    }

    #[tokio::test]
    async fn test_references_chain_assigns_thread() {
        let mailbox = TempMailbox::new().await;
        let engine = mailbox.engine();
        let root = make_mail(vec![
            ("message-id", "<root@example.com>"),
            ("subject", "Root thread"),
        ]);
        let root_tid = engine.assign_thread_id(&root).await.unwrap();
        let mid = make_mail(vec![
            ("message-id", "<mid@example.com>"),
            ("references", "<root@example.com>"),
            ("subject", "Re: Root thread"),
        ]);
        let mid_tid = engine.assign_thread_id(&mid).await.unwrap();
        assert_eq!(root_tid, mid_tid, "Mid reply must be in same thread");
        let late = make_mail(vec![
            ("message-id", "<late@example.com>"),
            ("references", "<root@example.com> <mid@example.com>"),
            ("subject", "Re: Root thread"),
        ]);
        let late_tid = engine.assign_thread_id(&late).await.unwrap();
        assert_eq!(root_tid, late_tid, "Late reply must be in same thread");
    }

    #[tokio::test]
    async fn test_subject_fallback_threading() {
        let mailbox = TempMailbox::new().await;
        let engine = mailbox.engine();
        let first = make_mail(vec![
            ("message-id", "<first@example.com>"),
            ("subject", "Meeting tomorrow"),
        ]);
        let first_tid = engine.assign_thread_id(&first).await.unwrap();
        let second = make_mail(vec![
            ("message-id", "<second@example.com>"),
            ("subject", "Re: Meeting tomorrow"),
        ]);
        let second_tid = engine.assign_thread_id(&second).await.unwrap();
        assert_eq!(
            first_tid, second_tid,
            "Same normalized subject must produce same thread_id via fallback"
        );
    }

    #[tokio::test]
    async fn test_different_subjects_different_threads() {
        let mailbox = TempMailbox::new().await;
        let engine = mailbox.engine();
        let first = make_mail(vec![
            ("message-id", "<alpha@example.com>"),
            ("subject", "Alpha topic"),
        ]);
        let second = make_mail(vec![
            ("message-id", "<beta@example.com>"),
            ("subject", "Beta topic"),
        ]);
        let tid_a = engine.assign_thread_id(&first).await.unwrap();
        let tid_b = engine.assign_thread_id(&second).await.unwrap();
        assert_ne!(
            tid_a, tid_b,
            "Different subjects must produce different thread IDs"
        );
    }

    #[tokio::test]
    async fn test_get_thread_id_after_assign() {
        let mailbox = TempMailbox::new().await;
        let engine = mailbox.engine();
        let mail = make_mail(vec![
            ("message-id", "<lookup@example.com>"),
            ("subject", "Lookup"),
        ]);
        let tid = engine.assign_thread_id(&mail).await.unwrap();
        assert_eq!(
            engine.get_thread_id("<lookup@example.com>").await.unwrap(),
            Some(tid.clone())
        );
        assert_eq!(
            engine.get_thread_id("lookup@example.com").await.unwrap(),
            Some(tid)
        );
        assert_eq!(
            engine.get_thread_id("<unknown@example.com>").await.unwrap(),
            None
        );
    }

    #[tokio::test]
    async fn test_index_shared_across_engine_instances() {
        let mailbox = TempMailbox::new().await;
        let root = make_mail(vec![
            ("message-id", "<persist-root@example.com>"),
            ("subject", "Persisted"),
        ]);
        let root_tid = mailbox.engine().assign_thread_id(&root).await.unwrap();
        let reply = make_mail(vec![
            ("message-id", "<persist-reply@example.com>"),
            ("references", "<persist-root@example.com>"),
            ("subject", "Re: Persisted"),
        ]);
        let reply_tid = mailbox.engine().assign_thread_id(&reply).await.unwrap();
        assert_eq!(root_tid, reply_tid, "Index must persist on disk between engines");
    }

    #[test]
    fn test_strip_angle_brackets() {
        assert_eq!(strip_angle_brackets("<foo@bar.com>"), "foo@bar.com");
        assert_eq!(strip_angle_brackets("foo@bar.com"), "foo@bar.com");
        assert_eq!(
            strip_angle_brackets("  <spaced@bar.com> "),
            "spaced@bar.com"
        );
    }

    #[test]
    fn test_normalize_subject() {
        assert_eq!(normalize_subject("Re: Hello"), "hello");
        assert_eq!(normalize_subject("Fwd: Test"), "test");
        assert_eq!(normalize_subject("FW: Test"), "test");
        assert_eq!(normalize_subject("[List] Re: Hello"), "hello");
        assert_eq!(normalize_subject("Hello World"), "hello world");
        assert_eq!(normalize_subject("Re: Re: Deep"), "deep");
    }
}