Skip to main content

agent_first_mail/store/
messages.rs

1use super::*;
2
3pub(super) fn message_json_paths(root: &Path) -> Result<Vec<PathBuf>> {
4    let dir = root.join("messages");
5    if !dir.exists() {
6        return Ok(Vec::new());
7    }
8    let mut paths = read_dir(&dir, "read messages")?
9        .into_iter()
10        .map(|entry| entry.path())
11        .filter(|path| path.extension().and_then(|s| s.to_str()) == Some("json"))
12        .collect::<Vec<_>>();
13    paths.sort();
14    Ok(paths)
15}
16
17#[derive(Clone, Debug, Deserialize, Serialize)]
18#[serde(deny_unknown_fields)]
19struct MessageRemoteFile {
20    schema_name: String,
21    schema_version: u64,
22    message_id: String,
23    #[serde(default, skip_serializing_if = "Vec::is_empty")]
24    locations: Vec<RemoteLocation>,
25}
26
27#[derive(Debug, Serialize)]
28struct MessageDispositionResult {
29    code: &'static str,
30    message_id: String,
31    special_use: String,
32    message_ids: Vec<String>,
33    location_count: usize,
34    queued_location_count: usize,
35    queued: bool,
36    push_id: Option<String>,
37}
38
39#[derive(Debug, Serialize)]
40struct MessageArchiveResult {
41    code: &'static str,
42    message_id: String,
43    archive_uid: String,
44    path: String,
45    special_use: String,
46    eligible_message_ids: Vec<String>,
47    location_count: usize,
48    queued_location_count: usize,
49    queued: bool,
50    push_ids: Vec<String>,
51    push_id: Option<String>,
52}
53
54#[derive(Clone, Debug, Default)]
55pub(super) struct MessageCacheRebuildTotals {
56    pub(super) rebuilt_count: usize,
57    pub(super) removed_text_cache_count: usize,
58}
59
60impl MessageRemoteFile {
61    fn from_message(message: &MessageFile) -> Option<Self> {
62        let remote = message.remote.as_ref()?;
63        if remote.locations.is_empty() {
64            return None;
65        }
66        Some(Self {
67            schema_name: "message_remote".to_string(),
68            schema_version: 1,
69            message_id: message.message_id.clone(),
70            locations: remote.locations.clone(),
71        })
72    }
73
74    fn remote_state(&self) -> RemoteState {
75        RemoteState {
76            locations: self.locations.clone(),
77        }
78    }
79}
80
/// Path of the raw `.eml` file for `message_id` under `<root>/.afmail/messages`.
fn message_eml_path(root: &Path, message_id: &str) -> PathBuf {
    let file_name = format!("{message_id}.eml");
    root.join(".afmail/messages").join(file_name)
}
85
/// Path of the `.state.json` file for `message_id` under `<root>/.afmail/messages`.
fn message_state_path(root: &Path, message_id: &str) -> PathBuf {
    let mut path = root.join(".afmail/messages");
    path.push(format!("{message_id}.state.json"));
    path
}
90
/// Path of the `.remote.json` sidecar for `message_id` under `<root>/.afmail/messages`.
fn message_remote_path(root: &Path, message_id: &str) -> PathBuf {
    let sidecar_name = format!("{message_id}.remote.json");
    let cache_dir = root.join(".afmail/messages");
    cache_dir.join(sidecar_name)
}
95
96fn read_message_remote_file(root: &Path, message_id: &str) -> Result<Option<MessageRemoteFile>> {
97    let path = message_remote_path(root, message_id);
98    if !path.exists() {
99        return Ok(None);
100    }
101    let data = read_to_string(&path, "read message remote")?;
102    let remote: MessageRemoteFile =
103        serde_json::from_str(&data).map_err(|e| AppError::json("parse message remote", &e))?;
104    if remote.schema_name != "message_remote"
105        || remote.schema_version != 1
106        || remote.message_id != message_id
107    {
108        return Err(AppError::new(
109            "message_remote_invalid",
110            format!("invalid message remote sidecar: {}", rel_path(root, &path)),
111        ));
112    }
113    Ok(Some(remote))
114}
115
116fn write_message_remote_file(root: &Path, remote: &MessageRemoteFile) -> Result<()> {
117    let path = message_remote_path(root, &remote.message_id);
118    if let Some(parent) = path.parent() {
119        create_dir_all(parent)?;
120    }
121    let mut normalized = remote.clone();
122    normalized.schema_name = "message_remote".to_string();
123    normalized.schema_version = 1;
124    write_json_pretty(&path, &normalized)
125}
126
127fn default_message_workspace() -> WorkspaceState {
128    WorkspaceState {
129        status: "triage".to_string(),
130        archive_uid: None,
131        archived_rfc3339: None,
132        origin: None,
133        remote_sync: None,
134        push: None,
135    }
136}
137
138pub(super) fn purge_message_artifacts(root: &Path, message_id: &str) -> Result<()> {
139    validate_id("message_id", message_id)?;
140    let message_dir = root.join(".afmail/messages");
141    for path in [
142        root.join("messages").join(format!("{message_id}.json")),
143        message_dir.join(format!("{message_id}.json")),
144        message_dir.join(format!("{message_id}.eml")),
145        message_dir.join(format!("{message_id}.state.json")),
146        message_dir.join(format!("{message_id}.remote.json")),
147        message_dir.join(format!("{message_id}.txt")),
148    ] {
149        if path.exists() {
150            remove_file(&path)?;
151        }
152    }
153    let files_dir = message_dir.join(format!("{message_id}.files"));
154    if files_dir.exists() {
155        remove_dir_all(&files_dir)?;
156    }
157    Ok(())
158}
159
160pub(super) fn attachment_metadata_values(attachments: &[AttachmentRef]) -> Vec<Value> {
161    attachments
162        .iter()
163        .map(|attachment| {
164            json!({
165                "part_id": attachment.part_id.as_str(),
166                "filename": attachment.filename.as_str(),
167                "saved_filename": saved_filename_for_attachment(attachment),
168                "content_type": attachment.content_type.as_str(),
169                "size_bytes": attachment.size_bytes,
170                "fetched": attachment.fetched,
171                "file_path": attachment.file_path.as_deref().unwrap_or(""),
172                "storage": if attachment.fetched { "message_cache" } else { "" },
173            })
174        })
175        .collect()
176}
177
178pub(super) fn read_message(path: &Path) -> Result<MessageFile> {
179    let data = read_to_string(path, "read message json")?;
180    let message: MessageFile =
181        serde_json::from_str(&data).map_err(|e| AppError::json("parse message json", &e))?;
182    if message.schema_name != "message" || message.schema_version != 1 {
183        return Err(AppError::new(
184            "message_cache_invalid",
185            format!("unsupported message cache schema: {}", path_to_string(path)),
186        ));
187    }
188    MessageStatus::parse(&message.workspace.status)?;
189    if let Some(direction) = message.direction.as_deref() {
190        MailDirection::parse(direction)?;
191    }
192    Ok(message)
193}
194
195pub(super) fn normalize_rfc822_message_id(value: &str) -> Option<String> {
196    let normalized = value
197        .trim()
198        .trim_matches(|ch| matches!(ch, '<' | '>' | ',' | ';'))
199        .trim()
200        .to_ascii_lowercase();
201    if normalized.is_empty() {
202        None
203    } else {
204        Some(normalized)
205    }
206}
207
208pub(super) fn rfc822_message_id_candidates(value: &str) -> Vec<String> {
209    let mut ids = Vec::new();
210    let mut rest = value;
211    while let Some(start) = rest.find('<') {
212        let after_start = &rest[start + 1..];
213        let Some(end) = after_start.find('>') else {
214            break;
215        };
216        if let Some(id) = normalize_rfc822_message_id(&after_start[..end]) {
217            ids.push(id);
218        }
219        rest = &after_start[end + 1..];
220    }
221    if ids.is_empty() {
222        ids.extend(
223            value
224                .split_whitespace()
225                .filter_map(normalize_rfc822_message_id),
226        );
227    }
228    ids.sort();
229    ids.dedup();
230    ids
231}
232
233pub(super) fn message_reply_header_ids(message: &MessageFile) -> Vec<String> {
234    let mut ids = Vec::new();
235    if let Some(in_reply_to) = &message.in_reply_to {
236        ids.extend(rfc822_message_id_candidates(in_reply_to));
237    }
238    for reference in &message.references {
239        ids.extend(rfc822_message_id_candidates(reference));
240    }
241    ids.sort();
242    ids.dedup();
243    ids
244}
245
246impl Workspace {
247    pub(crate) fn persist_message_remote(&self, message: &MessageFile) -> Result<()> {
248        let path = message_remote_path(&self.root, &message.message_id);
249        if let Some(remote) = MessageRemoteFile::from_message(message) {
250            write_message_remote_file(&self.root, &remote)
251        } else if path.exists() {
252            remove_file(&path)
253        } else {
254            Ok(())
255        }
256    }
257
258    pub(crate) fn write_message_materialized_cache(&self, message: &MessageFile) -> Result<()> {
259        let mut message = message.clone();
260        message.schema_name = "message".to_string();
261        message.schema_version = 1;
262        if message.eml_path.is_none() {
263            message.eml_path = Some(format!(".afmail/messages/{}.eml", message.message_id));
264        }
265        let deprecated_state = message_state_path(&self.root, &message.message_id);
266        if deprecated_state.exists() {
267            remove_file(&deprecated_state)?;
268        }
269        let path = self.message_path(&message.message_id);
270        if let Some(parent) = path.parent() {
271            create_dir_all(parent)?;
272        }
273        write_json_pretty(&path, &message)
274    }
275
276    pub(super) fn write_message_cache(&self, message: &MessageFile) -> Result<()> {
277        self.write_message_materialized_cache(message)
278    }
279
280    pub(crate) fn write_message_artifacts(&self, message: &MessageFile) -> Result<()> {
281        self.persist_message_remote(message)?;
282        self.write_message_materialized_cache(message)
283    }
284
285    fn materialize_message_cache_if_needed(&self, message_id: &str) -> Result<bool> {
286        if !self.message_cache_needs_materialize(message_id)? {
287            return Ok(false);
288        }
289        self.materialize_message_cache(message_id)?;
290        Ok(true)
291    }
292
293    fn materialize_message_cache(&self, message_id: &str) -> Result<MessageFile> {
294        validate_id("message_id", message_id)?;
295        let eml_path = message_eml_path(&self.root, message_id);
296        let raw = fs::read(&eml_path).map_err(|e| AppError::io("read message eml", &e))?;
297        let prior = read_message(&self.message_path(message_id)).ok();
298        let remote =
299            read_message_remote_file(&self.root, message_id)?.map(|file| file.remote_state());
300        let direction = self.infer_materialized_direction(&raw, prior.as_ref(), remote.as_ref());
301        let mut parsed = crate::mail::parse_message_with_options(
302            message_id.to_string(),
303            &raw,
304            crate::mail::MessageParseOptions {
305                direction,
306                workspace: default_message_workspace(),
307                remote,
308                received_rfc3339: prior
309                    .as_ref()
310                    .and_then(|message| message.received_rfc3339.clone()),
311                sent_rfc3339: prior
312                    .as_ref()
313                    .and_then(|message| message.sent_rfc3339.clone()),
314                attachments: prior
315                    .as_ref()
316                    .map(|message| message.attachments.clone())
317                    .unwrap_or_default(),
318            },
319        )?;
320        self.apply_fetched_attachment_files(message_id, &mut parsed.message.attachments);
321        self.apply_materialized_workspace_overlays(&mut parsed.message)?;
322        let config = MailConfig::load(&self.root)?;
323        self.apply_identity_match(&mut parsed.message, &config)?;
324        let contact_map = self.contact_email_map()?;
325        self.apply_contact_link(&mut parsed.message, &contact_map);
326        self.write_message_materialized_cache(&parsed.message)?;
327        Ok(parsed.message)
328    }
329
330    fn message_cache_needs_materialize(&self, message_id: &str) -> Result<bool> {
331        let cache_path = self.message_path(message_id);
332        if !cache_path.exists() {
333            return Ok(true);
334        }
335        let data = match read_to_string(&cache_path, "read message cache") {
336            Ok(data) => data,
337            Err(_) => return Ok(true),
338        };
339        let value: Value = match serde_json::from_str(&data) {
340            Ok(value) => value,
341            Err(_) => return Ok(true),
342        };
343        if value.get("schema_name").and_then(Value::as_str) != Some("message")
344            || value.get("schema_version").and_then(Value::as_u64) != Some(1)
345        {
346            return Ok(true);
347        }
348        let cache_modified = match fs::metadata(&cache_path).and_then(|meta| meta.modified()) {
349            Ok(time) => time,
350            Err(_) => return Ok(true),
351        };
352        for input in [
353            message_eml_path(&self.root, message_id),
354            message_remote_path(&self.root, message_id),
355        ] {
356            if !input.exists() {
357                continue;
358            }
359            if let Ok(input_modified) = fs::metadata(&input).and_then(|meta| meta.modified()) {
360                if input_modified > cache_modified {
361                    return Ok(true);
362                }
363            }
364        }
365        Ok(false)
366    }
367
368    fn infer_materialized_direction(
369        &self,
370        raw: &[u8],
371        prior: Option<&MessageFile>,
372        remote: Option<&RemoteState>,
373    ) -> Option<String> {
374        if let Some(direction) = prior.and_then(|message| message.direction.clone()) {
375            return Some(direction);
376        }
377        if let Some(remote) = remote {
378            if let Ok(config) = MailConfig::load(&self.root) {
379                for location in &remote.locations {
380                    if let Some(mailbox_id) = location.mailbox_id.as_deref() {
381                        if let Ok(action) = config.pull_action(mailbox_id) {
382                            return Some(action.direction.as_str().to_string());
383                        }
384                    }
385                }
386            }
387        }
388        let local_message_id = mail_parser::MessageParser::default()
389            .parse(raw)
390            .and_then(|message| message.message_id().map(ToString::to_string))
391            .is_some_and(|message_id| {
392                message_id
393                    .trim()
394                    .trim_matches(|ch| matches!(ch, '<' | '>'))
395                    .ends_with("@afmail.local")
396            });
397        Some(if local_message_id {
398            "outbound".to_string()
399        } else {
400            "inbound".to_string()
401        })
402    }
403
404    fn apply_fetched_attachment_files(&self, message_id: &str, attachments: &mut [AttachmentRef]) {
405        let files_dir = self
406            .root
407            .join(".afmail/messages")
408            .join(format!("{message_id}.files"));
409        if !files_dir.is_dir() {
410            return;
411        }
412        for attachment in attachments {
413            if attachment.fetched
414                && attachment
415                    .file_path
416                    .as_deref()
417                    .is_some_and(|path| self.root.join(path).is_file())
418            {
419                continue;
420            }
421            let candidate = files_dir.join(safe_attachment_filename(
422                &attachment.filename,
423                &attachment.part_id,
424            ));
425            if candidate.is_file() {
426                attachment.fetched = true;
427                attachment.file_path = Some(rel_path(&self.root, &candidate));
428            }
429        }
430    }
431
432    pub(super) fn apply_materialized_workspace_overlays(
433        &self,
434        message: &mut MessageFile,
435    ) -> Result<()> {
436        message.workspace.archive_uid = None;
437        message.workspace.archived_rfc3339 = None;
438        let cases = CaseIndex::build(self)?;
439        if let Some((status, _added_rfc3339)) =
440            self.disposition_state_for_message(&message.message_id)?
441        {
442            message.workspace.status = status.as_str().to_string();
443            message.workspace.origin = None;
444        } else if let Some((archive_uid, archived_rfc3339)) = self
445            .direct_archive_state_by_message()?
446            .get(&message.message_id)
447        {
448            message.workspace.status = "archived".to_string();
449            message.workspace.archive_uid = Some(archive_uid.clone());
450            message.workspace.archived_rfc3339 = Some(archived_rfc3339.clone());
451            message.workspace.origin = None;
452        } else if cases.has_any_reference(&message.message_id) {
453            message.workspace.status = MessageStatus::Case.as_str().to_string();
454        } else if message.workspace.origin.is_some() {
455            message.workspace.status = MessageStatus::Archived.as_str().to_string();
456        } else {
457            message.workspace.status = MessageStatus::Triage.as_str().to_string();
458        }
459        let last_completed_rfc3339 = message
460            .workspace
461            .push
462            .as_ref()
463            .and_then(|push| push.last_completed_rfc3339.clone());
464        message.workspace.push = self.pending_push_state_for_message(&message.message_id)?;
465        if let Some(last_completed_rfc3339) = last_completed_rfc3339 {
466            let push = message
467                .workspace
468                .push
469                .get_or_insert_with(WorkspacePushState::default);
470            if push.last_completed_rfc3339.is_none() {
471                push.last_completed_rfc3339 = Some(last_completed_rfc3339);
472            }
473        }
474        if message
475            .workspace
476            .push
477            .as_ref()
478            .is_some_and(|push| !push.pending.is_empty())
479            && MessageStatus::parse(&message.workspace.status)? == MessageStatus::Triage
480        {
481            message.workspace.status = MessageStatus::PushQueued.as_str().to_string();
482        }
483        Ok(())
484    }
485
486    /// Set `message.contact` from the sender address using a prebuilt
487    /// email→contact map. Called only from materialize/render paths so plain
488    /// `message show` reads the stored field without scanning contacts.
489    pub(super) fn apply_contact_link(
490        &self,
491        message: &mut MessageFile,
492        contact_map: &BTreeMap<String, MessageContact>,
493    ) {
494        message.contact = message
495            .from
496            .as_deref()
497            .and_then(|from| super::contacts::contact_for_from(contact_map, from));
498    }
499
500    pub(crate) fn apply_identity_match(
501        &self,
502        message: &mut MessageFile,
503        config: &MailConfig,
504    ) -> Result<()> {
505        if message
506            .direction
507            .as_deref()
508            .and_then(|direction| MailDirection::parse(direction).ok())
509            == Some(MailDirection::Outbound)
510        {
511            message.identity = None;
512            message.identity_email = None;
513            message.identity_match = None;
514            message.identity_candidates.clear();
515            message.observed_recipient_emails.clear();
516            return Ok(());
517        }
518        let matched = config.match_message_identity(&self.root, message)?;
519        message.identity = matched.identity;
520        message.identity_email = matched.identity_email;
521        message.identity_match = Some(matched.identity_match);
522        message.identity_candidates = matched.identity_candidates;
523        message.observed_recipient_emails = matched.observed_recipient_emails;
524        Ok(())
525    }
526
527    fn pending_push_state_for_message(
528        &self,
529        message_id: &str,
530    ) -> Result<Option<WorkspacePushState>> {
531        let mut state = WorkspacePushState::default();
532        for item in crate::push_queue::pending_items(&self.root)? {
533            if !item.message_ids().iter().any(|id| id == message_id) {
534                continue;
535            }
536            state.pending.push(WorkspacePendingPush {
537                push_id: item.push_id.clone(),
538                kind: item.display_kind(),
539                queued_rfc3339: item.created_rfc3339.clone(),
540                last_error: item.last_error.clone(),
541            });
542        }
543        state.pending.sort_by(|a, b| a.push_id.cmp(&b.push_id));
544        if state.pending.is_empty() && state.last_completed_rfc3339.is_none() {
545            Ok(None)
546        } else {
547            Ok(Some(state))
548        }
549    }
550
551    pub(super) fn rebuild_message_cache_from_eml(&self) -> Result<MessageCacheRebuildTotals> {
552        let messages_dir = self.root.join(".afmail/messages");
553        if !messages_dir.exists() {
554            return Ok(MessageCacheRebuildTotals::default());
555        }
556        let mut totals = MessageCacheRebuildTotals::default();
557        let mut eml_paths = read_dir(&messages_dir, "read message eml cache")?
558            .into_iter()
559            .map(|entry| entry.path())
560            .filter(|path| path.extension().and_then(|s| s.to_str()) == Some("eml"))
561            .collect::<Vec<_>>();
562        eml_paths.sort();
563
564        for eml_path in eml_paths {
565            let Some(message_id) = eml_path.file_stem().and_then(|stem| stem.to_str()) else {
566                continue;
567            };
568            validate_id("message_id", message_id)?;
569            if self.materialize_message_cache_if_needed(message_id)? {
570                totals.rebuilt_count += 1;
571            }
572        }
573
574        for entry in read_dir(&messages_dir, "read message cache")? {
575            let path = entry.path();
576            if path.extension().and_then(|s| s.to_str()) == Some("txt") {
577                remove_file(&path)?;
578                totals.removed_text_cache_count += 1;
579            }
580        }
581        Ok(totals)
582    }
583
584    fn direct_archive_state_by_message(&self) -> Result<BTreeMap<String, (String, String)>> {
585        let mut out = BTreeMap::new();
586        for archive_uid in self.archive_message_category_ids()? {
587            let data = self.read_archive_messages(&archive_uid)?;
588            for item in data.items {
589                out.insert(item.message_id, (archive_uid.clone(), item.added_rfc3339));
590            }
591        }
592        Ok(out)
593    }
594
595    pub fn message_show(&self, message_id: &str) -> Result<Value> {
596        self.require_workspace()?;
597        validate_id("message_id", message_id)?;
598        let config = MailConfig::load(&self.root)?;
599        let message = self.read_message_by_id(message_id)?;
600        let body_text = self.message_body_text(&message)?;
601        let flags = message_remote_flags(&message);
602        let unread = !flags.iter().any(|flag| flag.eq_ignore_ascii_case("\\Seen"));
603        let flagged = flags
604            .iter()
605            .any(|flag| flag.eq_ignore_ascii_case("\\Flagged"));
606        let push = message.workspace.push.clone();
607        let contact = message.contact.clone();
608        Ok(json!({
609            "code": "message_show",
610            "message_id": message.message_id.as_str(),
611            "from": message.from.as_deref().unwrap_or(""),
612            "to": &message.to,
613            "cc": &message.cc,
614            "bcc": &message.bcc,
615            "reply_to": &message.reply_to,
616            "sender": message.sender.as_deref().unwrap_or(""),
617            "subject": message.subject.as_deref().unwrap_or(""),
618            "direction": message.direction.as_deref().unwrap_or(""),
619            "received_rfc3339": message.received_rfc3339.as_deref().unwrap_or(""),
620            "sent_rfc3339": message.sent_rfc3339.as_deref().unwrap_or(""),
621            "body_text": body_text,
622            "attachment_count": message.attachments.len(),
623            "attachments": attachment_metadata_values(&message.attachments),
624            "mailbox_ids": message_mailbox_ids(&message, &config),
625            "flags": flags,
626            "unread": unread,
627            "flagged": flagged,
628            "remote_missing": message_remote_missing(&message),
629            "remote_missing_since_rfc3339": message_remote_missing_since_rfc3339(&message),
630            "remote_effect_pending": message_remote_effect_pending(&message),
631            "push": push,
632            "identity": message.identity.as_deref().unwrap_or(""),
633            "identity_email": message.identity_email.as_deref().unwrap_or(""),
634            "identity_match": message.identity_match.as_deref().unwrap_or(""),
635            "identity_candidates": &message.identity_candidates,
636            "observed_recipient_emails": &message.observed_recipient_emails,
637            "view_path": self.message_existing_view_path(&message)?,
638            "json_path": format!("messages/{message_id}.json"),
639            "message": message_template_value(&message)?,
640            "contact": contact,
641        }))
642    }
643
644    pub fn spam_message(&self, message_id: &str, reason: Option<&str>) -> Result<Value> {
645        self.require_workspace()?;
646        let reason = self.checked_reason(reason)?;
647        let config = crate::config::MailConfig::load(&self.root)?;
648        validate_id("message_id", message_id)?;
649        self.ensure_no_related_conversation(message_id)?;
650        let message_ids = vec![message_id.to_string()];
651        self.ensure_message_ids_unreferenced(&message_ids, None)?;
652        let locations = self.message_remote_locations(&message_ids)?;
653        let transaction = self.begin_transaction(
654            "message_spam",
655            vec![
656                format!("messages/{message_id}.json"),
657                ".afmail/push".to_string(),
658            ],
659        )?;
660        let item = crate::push_queue::queue_action_steps(
661            &self.root,
662            "message.spam",
663            &message_ids,
664            &locations,
665            &config.actions.message_spam.steps,
666            None,
667        )?;
668        self.update_messages_workspace(&message_ids, "spam")?;
669        if let Some(item) = &item {
670            self.record_pending_push_item(item)?;
671        }
672        self.refresh_disposition_views()?;
673        transaction.commit()?;
674        self.append_audit_event(
675            "message_spam_marked",
676            vec![audit_target("message", message_id)],
677            reason,
678            json!({"message_id": message_id, "special_use": SpecialUseKind::Junk.as_str()}),
679        )?;
680        serde_json::to_value(MessageDispositionResult {
681            code: "message_spam_marked",
682            message_id: message_id.to_string(),
683            special_use: SpecialUseKind::Junk.as_str().to_string(),
684            message_ids,
685            location_count: locations.len(),
686            queued_location_count: locations.len(),
687            queued: item.is_some(),
688            push_id: item.as_ref().map(|item| item.push_id.clone()),
689        })
690        .map_err(|e| AppError::json("serialize message disposition result", &e))
691    }
692
693    pub fn restore_message(&self, message_id: &str, reason: Option<&str>) -> Result<Value> {
694        self.require_workspace()?;
695        let reason = self.checked_reason(reason)?;
696        validate_id("message_id", message_id)?;
697        let message = self.read_message_by_id(message_id)?;
698        match message.workspace.status.as_str() {
699            "spam" => self.restore_local_message_disposition(
700                message_id,
701                "spam",
702                "message.spam",
703                "message_restored",
704                "message_restored",
705                reason,
706            ),
707            "trashed" => self.restore_local_message_disposition(
708                message_id,
709                "trashed",
710                "message.trash",
711                "message_restored",
712                "message_restored",
713                reason,
714            ),
715            "archived" => {
716                let archive_uid = message.workspace.archive_uid.clone().ok_or_else(|| {
717                    AppError::new(
718                        "invalid_request",
719                        format!("message {message_id} is not directly archived"),
720                    )
721                })?;
722                self.restore_direct_archive_message(
723                    &archive_uid,
724                    message_id,
725                    reason,
726                    "message_restored",
727                    "message_restored",
728                )
729            }
730            other => Err(AppError::new(
731                "invalid_request",
732                format!("message {message_id} has no disposition to restore (status: {other})"),
733            )),
734        }
735    }
736
737    pub fn archive_message(
738        &self,
739        message_id: &str,
740        archive_ref: &str,
741        summary: Option<&str>,
742        reason: Option<&str>,
743    ) -> Result<Value> {
744        self.require_workspace()?;
745        let reason = self.checked_reason(reason)?;
746        validate_id("message_id", message_id)?;
747        let (archive_uid, archive_dir) = self.resolve_archive_message_category(archive_ref)?;
748        self.ensure_no_related_conversation(message_id)?;
749        let transaction = self.begin_transaction(
750            "message_archive",
751            vec![
752                format!("messages/{message_id}.json"),
753                format!("archive/notifications/{archive_uid}"),
754                ".afmail/push".to_string(),
755            ],
756        )?;
757        let archived_rfc3339 = self.set_direct_message_archive(message_id, &archive_uid)?;
758        self.upsert_archive_message_item(&archive_uid, message_id, summary, &archived_rfc3339)?;
759        self.refresh_archive_message_category(&archive_uid)?;
760        let queue = self.queue_archive_for_archived_messages(&[message_id.to_string()], None)?;
761        transaction.commit()?;
762        self.append_audit_event(
763            "message_archived",
764            vec![
765                audit_target("message", message_id),
766                audit_target("archive", &archive_uid),
767            ],
768            reason,
769            json!({
770                "message_id": message_id,
771                "archive_uid": archive_uid,
772                "summary": summary,
773                "to_path": format!("{}/views/messages/{message_id}.md", rel_path(&self.root, &archive_dir)),
774            }),
775        )?;
776        serde_json::to_value(MessageArchiveResult {
777            code: "message_archived",
778            message_id: message_id.to_string(),
779            archive_uid,
780            path: rel_path(&self.root, &archive_dir),
781            special_use: SpecialUseKind::Archive.as_str().to_string(),
782            eligible_message_ids: queue.eligible_message_ids,
783            location_count: queue.location_count,
784            queued_location_count: queue.queued_location_count,
785            queued: !queue.items.is_empty(),
786            push_ids: queue
787                .items
788                .iter()
789                .map(|item| item.push_id.clone())
790                .collect::<Vec<_>>(),
791            push_id: queue.items.first().map(|item| item.push_id.clone()),
792        })
793        .map_err(|e| AppError::json("serialize message archive result", &e))
794    }
795
796    pub fn trash_message(&self, message_id: &str, reason: Option<&str>) -> Result<Value> {
797        self.require_workspace()?;
798        let reason = self.checked_reason(reason)?;
799        let config = crate::config::MailConfig::load(&self.root)?;
800        validate_id("message_id", message_id)?;
801        self.ensure_no_related_conversation(message_id)?;
802        let message_ids = vec![message_id.to_string()];
803        self.ensure_message_ids_unreferenced(&message_ids, None)?;
804        let locations = self.message_remote_locations(&message_ids)?;
805        let transaction = self.begin_transaction(
806            "message_trash",
807            vec![
808                format!("messages/{message_id}.json"),
809                ".afmail/push".to_string(),
810            ],
811        )?;
812        let item = crate::push_queue::queue_action_steps(
813            &self.root,
814            "message.trash",
815            &message_ids,
816            &locations,
817            &config.actions.message_trash.steps,
818            None,
819        )?;
820        self.update_messages_workspace(&message_ids, "trashed")?;
821        if let Some(item) = &item {
822            self.record_pending_push_item(item)?;
823        }
824        self.refresh_disposition_views()?;
825        transaction.commit()?;
826        self.append_audit_event(
827            "message_trashed",
828            vec![audit_target("message", message_id)],
829            reason,
830            json!({"message_id": message_id, "special_use": SpecialUseKind::Trash.as_str()}),
831        )?;
832        serde_json::to_value(MessageDispositionResult {
833            code: "message_trashed",
834            message_id: message_id.to_string(),
835            special_use: SpecialUseKind::Trash.as_str().to_string(),
836            message_ids,
837            location_count: locations.len(),
838            queued_location_count: locations.len(),
839            queued: item.is_some(),
840            push_id: item.as_ref().map(|item| item.push_id.clone()),
841        })
842        .map_err(|e| AppError::json("serialize message disposition result", &e))
843    }
844
845    pub(super) fn message_existing_view_path(
846        &self,
847        message: &MessageFile,
848    ) -> Result<Option<String>> {
849        let message_id = message.message_id.as_str();
850        let triage_path = self.root.join("triage").join(format!("{message_id}.md"));
851        if triage_path.is_file() {
852            return Ok(Some(rel_path(&self.root, &triage_path)));
853        }
854        for dir in ["spam", "trash", "deleted"] {
855            let path = self.root.join(dir).join(format!("{message_id}.md"));
856            if path.is_file() {
857                return Ok(Some(rel_path(&self.root, &path)));
858            }
859        }
860        for (case_uid, case_path) in self.all_case_entries()? {
861            let case_messages = read_case_messages(&case_path, &case_uid);
862            if case_messages
863                .as_ref()
864                .map(|messages| messages.contains_message(message_id))
865                .unwrap_or(false)
866            {
867                let message_view = case_message_view_path(&case_path, message_id);
868                if message_view.is_file() {
869                    return Ok(Some(rel_path(&self.root, &message_view)));
870                }
871                let case_view = case_path.join("case.md");
872                if case_view.is_file() {
873                    return Ok(Some(rel_path(&self.root, &case_view)));
874                }
875            }
876        }
877        if let Some(archive_uid) = message.workspace.archive_uid.as_deref() {
878            let archive_view = self.archive_message_view_path(archive_uid, message_id);
879            if archive_view.is_file() {
880                return Ok(Some(rel_path(&self.root, &archive_view)));
881            }
882        }
883        Ok(None)
884    }
885
886    pub(super) fn message_body_text(&self, message: &MessageFile) -> Result<String> {
887        Ok(message.body_text.clone())
888    }
889
890    pub(super) fn message_conversation_for_dir(
891        &self,
892        message: &MessageFile,
893        output_dir: Option<&Path>,
894    ) -> Result<String> {
895        let config = MailConfig::load(&self.root)?;
896        let mut renderer = MarkdownTemplateRenderer::new(&self.root, config.template_language());
897        self.message_conversation_with_renderer(message, &config, &mut renderer, output_dir)
898    }
899
900    pub(super) fn message_conversation_with_renderer(
901        &self,
902        message: &MessageFile,
903        config: &MailConfig,
904        renderer: &mut MarkdownTemplateRenderer<'_>,
905        output_dir: Option<&Path>,
906    ) -> Result<String> {
907        let body_text = self.message_body_text(message)?;
908        renderer.render(
909            TemplateKey::MessageSection,
910            &message_section_context(
911                Some(&self.root),
912                message,
913                &body_text,
914                config.template_language(),
915                config
916                    .default_identity()
917                    .ok()
918                    .map(|identity| identity.email.as_str()),
919                output_dir,
920            )?,
921        )
922    }
923
924    pub(super) fn rfc822_message_id_index(&self) -> Result<BTreeMap<String, String>> {
925        self.rebuild_message_cache_from_eml()?;
926        let mut index = BTreeMap::new();
927        for path in message_json_paths(&self.root)? {
928            let message = read_message(&path)?;
929            if let Some(normalized) = message
930                .rfc822_message_id
931                .as_deref()
932                .and_then(normalize_rfc822_message_id)
933            {
934                index
935                    .entry(normalized)
936                    .or_insert_with(|| message.message_id.clone());
937            }
938        }
939        Ok(index)
940    }
941
942    pub(super) fn related_message_ids(&self, message_id: &str) -> Result<Vec<String>> {
943        validate_id("message_id", message_id)?;
944        let current = self.read_message_by_id(message_id)?;
945        let rfc822_index = self.rfc822_message_id_index()?;
946        let mut related = BTreeSet::new();
947
948        for header_id in message_reply_header_ids(&current) {
949            if let Some(local_message_id) = rfc822_index.get(&header_id) {
950                if local_message_id != message_id {
951                    related.insert(local_message_id.clone());
952                }
953            }
954        }
955
956        let Some(current_rfc822_id) = current
957            .rfc822_message_id
958            .as_deref()
959            .and_then(normalize_rfc822_message_id)
960        else {
961            return Ok(related.into_iter().collect());
962        };
963
964        for path in message_json_paths(&self.root)? {
965            let other = read_message(&path)?;
966            if other.message_id == message_id {
967                continue;
968            }
969            if message_reply_header_ids(&other)
970                .iter()
971                .any(|header_id| header_id == &current_rfc822_id)
972            {
973                related.insert(other.message_id);
974            }
975        }
976
977        Ok(related.into_iter().collect())
978    }
979
980    pub(super) fn ensure_no_related_conversation(&self, message_id: &str) -> Result<()> {
981        let related_message_ids = self.related_message_ids(message_id)?;
982        if related_message_ids.is_empty() {
983            return Ok(());
984        }
985        let mut suggested_commands = vec![format!(
986            "afmail case create --name NAME --message {message_id} --reason TEXT"
987        )];
988        for related_id in &related_message_ids {
989            suggested_commands.push(format!(
990                "afmail case add CASE_REF {related_id} --reason TEXT"
991            ));
992        }
993        suggested_commands.push("afmail case archive CASE_REF --reason TEXT".to_string());
994        Err(AppError::new(
995            "message_has_related_conversation_use_case",
996            "message has RFC-header-confirmed related conversation",
997        )
998        .with_hint(
999            "Create a case for the conversation, add the related messages, then archive the case.",
1000        )
1001        .with_details(json!({
1002            "message_id": message_id,
1003            "related_message_ids": related_message_ids,
1004            "suggested_commands": suggested_commands
1005        })))
1006    }
1007
1008    pub(super) fn refresh_messages_after_ref_change(&self, message_ids: &[String]) -> Result<()> {
1009        for message_id in message_ids {
1010            self.refresh_message_after_ref_change(message_id)?;
1011        }
1012        Ok(())
1013    }
1014
1015    pub(super) fn refresh_read_views_after_message_change(&self, message_id: &str) -> Result<()> {
1016        validate_id("message_id", message_id)?;
1017        let message = self.read_message_by_id(message_id)?;
1018        let cases = CaseIndex::build(self)?;
1019        if self.triage_candidate(&message, &cases)? {
1020            self.write_triage_view(&message)?;
1021        } else {
1022            self.remove_triage_view_for_message(message_id)?;
1023        }
1024        self.refresh_all_case_message_views()?;
1025        self.refresh_archive_indexes()
1026    }
1027
1028    pub(super) fn refresh_message_after_ref_change(&self, message_id: &str) -> Result<()> {
1029        validate_id("message_id", message_id)?;
1030        let mut msg = self.read_message_by_id(message_id)?;
1031        let cases = CaseIndex::build(self)?;
1032        self.apply_materialized_workspace_overlays(&mut msg)?;
1033        msg.workspace.remote_sync = None;
1034        self.write_message_materialized_cache(&msg)?;
1035        if self.triage_candidate(&msg, &cases)? {
1036            self.write_triage_view(&msg)?;
1037        } else {
1038            self.remove_triage_view_for_message(message_id)?;
1039        }
1040        Ok(())
1041    }
1042
1043    pub(super) fn update_messages_workspace(
1044        &self,
1045        message_ids: &[String],
1046        status: &str,
1047    ) -> Result<()> {
1048        let status = MessageStatus::parse(status)?;
1049        for message_id in message_ids {
1050            validate_id("message_id", message_id)?;
1051            if disposition_spec_for_status(status).is_some() {
1052                self.set_message_disposition(status, message_id, None, &now_rfc3339())?;
1053            } else {
1054                self.clear_message_from_all_dispositions(message_id)?;
1055            }
1056            let mut msg = self.read_message_by_id(message_id)?;
1057            msg.workspace.status = status.as_str().to_string();
1058            if matches!(status, MessageStatus::Spam | MessageStatus::Trashed) {
1059                msg.workspace.archive_uid = None;
1060                msg.workspace.archived_rfc3339 = None;
1061                msg.workspace.origin = None;
1062            }
1063            msg.workspace.remote_sync = None;
1064            self.write_message_materialized_cache(&msg)?;
1065            self.remove_triage_view_for_message(message_id)?;
1066        }
1067        Ok(())
1068    }
1069
1070    pub(super) fn restore_local_message_disposition(
1071        &self,
1072        message_id: &str,
1073        expected_status: &str,
1074        push_kind: &str,
1075        event_kind: &str,
1076        result_code: &str,
1077        reason: Option<&str>,
1078    ) -> Result<Value> {
1079        self.require_workspace()?;
1080        let reason = self.checked_reason(reason)?;
1081        validate_id("message_id", message_id)?;
1082        let mut message = self.read_message_by_id(message_id)?;
1083        if message.workspace.status != expected_status {
1084            return Err(AppError::new(
1085                "invalid_request",
1086                format!("message {message_id} is not {expected_status}"),
1087            ));
1088        }
1089        let removed_push =
1090            crate::push_queue::remove_pending_message_pushes(&self.root, message_id, push_kind)?;
1091        let push_ids = removed_push
1092            .iter()
1093            .map(|item| item.push_id.clone())
1094            .collect::<Vec<_>>();
1095        self.clear_message_disposition(MessageStatus::parse(expected_status)?, message_id)?;
1096        message.workspace.status = "triage".to_string();
1097        message.workspace.remote_sync = None;
1098        self.write_message_materialized_cache(&message)?;
1099        self.refresh_message_after_ref_change(message_id)?;
1100        self.clear_message_pending_pushes(message_id, &push_ids, false)?;
1101        self.refresh_disposition_views()?;
1102        self.append_audit_event(
1103            event_kind,
1104            vec![audit_target("message", message_id)],
1105            reason,
1106            json!({
1107                "message_id": message_id,
1108                "from_status": expected_status,
1109                "to_status": "triage",
1110                "removed_push_ids": push_ids.clone(),
1111            }),
1112        )?;
1113        Ok(json!({
1114            "code": result_code,
1115            "message_id": message_id,
1116            "from_status": expected_status,
1117            "status": "triage",
1118            "triage_path": format!("triage/{message_id}.md"),
1119            "removed_push_count": push_ids.len(),
1120            "push_ids": push_ids,
1121        }))
1122    }
1123
1124    pub(super) fn remove_triage_view_for_message(&self, message_id: &str) -> Result<()> {
1125        let path = self.root.join("triage").join(format!("{message_id}.md"));
1126        if path.exists() {
1127            remove_file(&path)?;
1128        }
1129        Ok(())
1130    }
1131
1132    pub(crate) fn read_message_by_id(&self, message_id: &str) -> Result<MessageFile> {
1133        validate_id("message_id", message_id)?;
1134        if message_eml_path(&self.root, message_id).is_file() {
1135            self.materialize_message_cache_if_needed(message_id)?;
1136        }
1137        let path = self.message_path(message_id);
1138        let mut message = read_message(&path)?;
1139        let original_workspace = message.workspace.clone();
1140        self.apply_materialized_workspace_overlays(&mut message)?;
1141        if message.workspace != original_workspace {
1142            self.write_message_materialized_cache(&message)?;
1143        }
1144        Ok(message)
1145    }
1146
1147    pub(crate) fn relocate_message(
1148        &self,
1149        message_id: &str,
1150        target_locations: &[crate::types::RemoteLocation],
1151    ) -> Result<()> {
1152        validate_id("message_id", message_id)?;
1153        let mut locations: Vec<crate::types::RemoteLocation> = Vec::new();
1154        for location in target_locations {
1155            if !locations.iter().any(|existing| {
1156                existing.mailbox_name == location.mailbox_name
1157                    && existing.uid_validity == location.uid_validity
1158                    && existing.uid == location.uid
1159            }) {
1160                locations.push(location.clone());
1161            }
1162        }
1163        if locations.is_empty() {
1164            return Ok(());
1165        };
1166        let mut message = self.read_message_by_id(message_id)?;
1167        message.remote = Some(crate::types::RemoteState { locations });
1168        self.persist_message_remote(&message)?;
1169        self.write_message_materialized_cache(&message)?;
1170        Ok(())
1171    }
1172}