Skip to main content

agent_first_mail/store/
messages.rs

1use super::*;
2
3pub(super) fn message_json_paths(root: &Path) -> Result<Vec<PathBuf>> {
4    let dir = root.join("messages");
5    if !dir.exists() {
6        return Ok(Vec::new());
7    }
8    let mut paths = read_dir(&dir, "read messages")?
9        .into_iter()
10        .map(|entry| entry.path())
11        .filter(|path| path.extension().and_then(|s| s.to_str()) == Some("json"))
12        .collect::<Vec<_>>();
13    paths.sort();
14    Ok(paths)
15}
16
/// Serialized form of a message's `<message_id>.state.json` sidecar under
/// `.afmail/messages`.
///
/// Built from a message's workspace fields by `MessageStateFile::from_message`
/// and turned back into a `WorkspaceState` by `MessageStateFile::workspace`.
#[derive(Clone, Debug, Deserialize, Serialize)]
// Deserialization fails on any key that is not declared below.
#[serde(deny_unknown_fields)]
struct MessageStateFile {
    /// Schema tag; `read_message_state_file` requires `"message_state"`.
    schema_name: String,
    /// Schema revision; `read_message_state_file` requires `1`.
    schema_version: u64,
    /// Id of the owning message; must equal the id used to locate the sidecar.
    message_id: String,
    /// Workspace status string; must be accepted by `MessageStatus::parse`.
    status: String,
    /// Workspace `archive_uid`; left out of the JSON when `None`, `None` when absent.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    archive_uid: Option<String>,
    /// Workspace `archived_rfc3339`; left out of the JSON when `None`, `None` when absent.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    archived_rfc3339: Option<String>,
    /// Workspace `origin`; left out of the JSON when `None`, `None` when absent.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    origin: Option<String>,
    /// Value of `now_rfc3339()` at the time the sidecar value was built.
    updated_rfc3339: String,
}
32
/// Serialized form of a message's `<message_id>.remote.json` sidecar under
/// `.afmail/messages`, holding the message's remote locations.
#[derive(Clone, Debug, Deserialize, Serialize)]
// Deserialization fails on any key that is not declared below.
#[serde(deny_unknown_fields)]
struct MessageRemoteFile {
    /// Schema tag; `read_message_remote_file` requires `"message_remote"`.
    schema_name: String,
    /// Schema revision; `read_message_remote_file` requires `1`.
    schema_version: u64,
    /// Id of the owning message; must equal the id used to locate the sidecar.
    message_id: String,
    /// Remote locations of the message; left out of the JSON when empty and
    /// read back as an empty list when the key is absent.
    #[serde(default, skip_serializing_if = "Vec::is_empty")]
    locations: Vec<RemoteLocation>,
}
42
/// JSON result returned by `spam_message` and `trash_message`.
#[derive(Debug, Serialize)]
struct MessageDispositionResult {
    /// Result code, e.g. `"message_spam_marked"` or `"message_trashed"`.
    code: &'static str,
    /// Id of the message the operation was requested for.
    message_id: String,
    /// `SpecialUseKind` name the disposition targets (`Junk` or `Trash` in this file).
    special_use: String,
    /// Ids of all messages affected; the callers in this file pass a single id.
    message_ids: Vec<String>,
    /// Number of remote locations found for the affected messages.
    location_count: usize,
    /// Number of locations reported as queued; callers in this file set it to
    /// the same value as `location_count`.
    queued_location_count: usize,
    /// Whether a push item was created for the operation.
    queued: bool,
    /// Id of the created push item, if any.
    push_id: Option<String>,
}
54
/// JSON result returned by `archive_message`.
#[derive(Debug, Serialize)]
struct MessageArchiveResult {
    /// Result code; `archive_message` sets `"message_archived"`.
    code: &'static str,
    /// Id of the archived message.
    message_id: String,
    /// Uid of the archive category the message was filed under.
    archive_uid: String,
    /// Workspace-relative path of the archive category directory.
    path: String,
    /// `SpecialUseKind::Archive` name.
    special_use: String,
    /// `eligible_message_ids` reported by the archive queueing step.
    eligible_message_ids: Vec<String>,
    /// `location_count` reported by the archive queueing step.
    location_count: usize,
    /// `queued_location_count` reported by the archive queueing step.
    queued_location_count: usize,
    /// True when the queueing step produced at least one push item.
    queued: bool,
    /// Ids of every push item produced by the queueing step.
    push_ids: Vec<String>,
    /// Id of the first push item, if any.
    push_id: Option<String>,
}
69
/// Counters reported by `Workspace::rebuild_message_cache_from_eml`.
#[derive(Clone, Debug, Default)]
pub(super) struct MessageCacheRebuildTotals {
    /// Number of messages whose materialized cache was regenerated.
    pub(super) rebuilt_count: usize,
    /// Number of `*.txt` files removed from `.afmail/messages`.
    pub(super) removed_text_cache_count: usize,
}
75
76impl MessageStateFile {
77    fn from_message(message: &MessageFile) -> Self {
78        Self {
79            schema_name: "message_state".to_string(),
80            schema_version: 1,
81            message_id: message.message_id.clone(),
82            status: message.workspace.status.clone(),
83            archive_uid: message.workspace.archive_uid.clone(),
84            archived_rfc3339: message.workspace.archived_rfc3339.clone(),
85            origin: message.workspace.origin.clone(),
86            updated_rfc3339: now_rfc3339(),
87        }
88    }
89
90    fn workspace(&self) -> Result<WorkspaceState> {
91        let status = MessageStatus::parse(&self.status)?.as_str().to_string();
92        Ok(WorkspaceState {
93            status,
94            archive_uid: self.archive_uid.clone(),
95            archived_rfc3339: self.archived_rfc3339.clone(),
96            origin: self.origin.clone(),
97            remote_sync: None,
98            push: None,
99        })
100    }
101}
102
103impl MessageRemoteFile {
104    fn from_message(message: &MessageFile) -> Option<Self> {
105        let remote = message.remote.as_ref()?;
106        if remote.locations.is_empty() {
107            return None;
108        }
109        Some(Self {
110            schema_name: "message_remote".to_string(),
111            schema_version: 1,
112            message_id: message.message_id.clone(),
113            locations: remote.locations.clone(),
114        })
115    }
116
117    fn remote_state(&self) -> RemoteState {
118        RemoteState {
119            locations: self.locations.clone(),
120        }
121    }
122}
123
/// Path of the raw message file: `<root>/.afmail/messages/<message_id>.eml`.
fn message_eml_path(root: &Path, message_id: &str) -> PathBuf {
    let mut path = root.join(".afmail/messages");
    path.push(format!("{message_id}.eml"));
    path
}
128
/// Path of the state sidecar: `<root>/.afmail/messages/<message_id>.state.json`.
fn message_state_path(root: &Path, message_id: &str) -> PathBuf {
    let mut path = root.join(".afmail/messages");
    path.push(format!("{message_id}.state.json"));
    path
}
133
/// Path of the remote sidecar: `<root>/.afmail/messages/<message_id>.remote.json`.
fn message_remote_path(root: &Path, message_id: &str) -> PathBuf {
    let mut path = root.join(".afmail/messages");
    path.push(format!("{message_id}.remote.json"));
    path
}
138
139pub(super) fn message_state_updated_rfc3339(
140    root: &Path,
141    message_id: &str,
142) -> Result<Option<String>> {
143    Ok(read_message_state_file(root, message_id)?.map(|state| state.updated_rfc3339))
144}
145
146fn read_message_state_file(root: &Path, message_id: &str) -> Result<Option<MessageStateFile>> {
147    let path = message_state_path(root, message_id);
148    if !path.exists() {
149        return Ok(None);
150    }
151    let data = read_to_string(&path, "read message state")?;
152    let state: MessageStateFile =
153        serde_json::from_str(&data).map_err(|e| AppError::json("parse message state", &e))?;
154    if state.schema_name != "message_state"
155        || state.schema_version != 1
156        || state.message_id != message_id
157    {
158        return Err(AppError::new(
159            "message_state_invalid",
160            format!("invalid message state sidecar: {}", rel_path(root, &path)),
161        ));
162    }
163    MessageStatus::parse(&state.status)?;
164    Ok(Some(state))
165}
166
167fn write_message_state_file(root: &Path, state: &MessageStateFile) -> Result<()> {
168    let path = message_state_path(root, &state.message_id);
169    if let Some(parent) = path.parent() {
170        create_dir_all(parent)?;
171    }
172    let mut normalized = state.clone();
173    normalized.schema_name = "message_state".to_string();
174    normalized.schema_version = 1;
175    write_json_pretty(&path, &normalized)
176}
177
178fn read_message_remote_file(root: &Path, message_id: &str) -> Result<Option<MessageRemoteFile>> {
179    let path = message_remote_path(root, message_id);
180    if !path.exists() {
181        return Ok(None);
182    }
183    let data = read_to_string(&path, "read message remote")?;
184    let remote: MessageRemoteFile =
185        serde_json::from_str(&data).map_err(|e| AppError::json("parse message remote", &e))?;
186    if remote.schema_name != "message_remote"
187        || remote.schema_version != 1
188        || remote.message_id != message_id
189    {
190        return Err(AppError::new(
191            "message_remote_invalid",
192            format!("invalid message remote sidecar: {}", rel_path(root, &path)),
193        ));
194    }
195    Ok(Some(remote))
196}
197
198fn write_message_remote_file(root: &Path, remote: &MessageRemoteFile) -> Result<()> {
199    let path = message_remote_path(root, &remote.message_id);
200    if let Some(parent) = path.parent() {
201        create_dir_all(parent)?;
202    }
203    let mut normalized = remote.clone();
204    normalized.schema_name = "message_remote".to_string();
205    normalized.schema_version = 1;
206    write_json_pretty(&path, &normalized)
207}
208
209fn default_message_workspace() -> WorkspaceState {
210    WorkspaceState {
211        status: "triage".to_string(),
212        archive_uid: None,
213        archived_rfc3339: None,
214        origin: None,
215        remote_sync: None,
216        push: None,
217    }
218}
219
220pub(super) fn purge_message_artifacts(root: &Path, message_id: &str) -> Result<()> {
221    validate_id("message_id", message_id)?;
222    let message_dir = root.join(".afmail/messages");
223    for path in [
224        root.join("messages").join(format!("{message_id}.json")),
225        message_dir.join(format!("{message_id}.json")),
226        message_dir.join(format!("{message_id}.eml")),
227        message_dir.join(format!("{message_id}.state.json")),
228        message_dir.join(format!("{message_id}.remote.json")),
229        message_dir.join(format!("{message_id}.txt")),
230    ] {
231        if path.exists() {
232            remove_file(&path)?;
233        }
234    }
235    let files_dir = message_dir.join(format!("{message_id}.files"));
236    if files_dir.exists() {
237        remove_dir_all(&files_dir)?;
238    }
239    Ok(())
240}
241
242pub(super) fn attachment_metadata_values(attachments: &[AttachmentRef]) -> Vec<Value> {
243    attachments
244        .iter()
245        .map(|attachment| {
246            json!({
247                "part_id": attachment.part_id.as_str(),
248                "filename": attachment.filename.as_str(),
249                "saved_filename": saved_filename_for_attachment(attachment),
250                "content_type": attachment.content_type.as_str(),
251                "size_bytes": attachment.size_bytes,
252                "fetched": attachment.fetched,
253                "file_path": attachment.file_path.as_deref().unwrap_or(""),
254                "storage": if attachment.fetched { "message_cache" } else { "" },
255            })
256        })
257        .collect()
258}
259
260pub(super) fn read_message(path: &Path) -> Result<MessageFile> {
261    let data = read_to_string(path, "read message json")?;
262    let message: MessageFile =
263        serde_json::from_str(&data).map_err(|e| AppError::json("parse message json", &e))?;
264    if message.schema_name != "message" || message.schema_version != 1 {
265        return Err(AppError::new(
266            "message_cache_invalid",
267            format!("unsupported message cache schema: {}", path_to_string(path)),
268        ));
269    }
270    MessageStatus::parse(&message.workspace.status)?;
271    if let Some(direction) = message.direction.as_deref() {
272        MailDirection::parse(direction)?;
273    }
274    Ok(message)
275}
276
277pub(super) fn normalize_rfc822_message_id(value: &str) -> Option<String> {
278    let normalized = value
279        .trim()
280        .trim_matches(|ch| matches!(ch, '<' | '>' | ',' | ';'))
281        .trim()
282        .to_ascii_lowercase();
283    if normalized.is_empty() {
284        None
285    } else {
286        Some(normalized)
287    }
288}
289
290pub(super) fn rfc822_message_id_candidates(value: &str) -> Vec<String> {
291    let mut ids = Vec::new();
292    let mut rest = value;
293    while let Some(start) = rest.find('<') {
294        let after_start = &rest[start + 1..];
295        let Some(end) = after_start.find('>') else {
296            break;
297        };
298        if let Some(id) = normalize_rfc822_message_id(&after_start[..end]) {
299            ids.push(id);
300        }
301        rest = &after_start[end + 1..];
302    }
303    if ids.is_empty() {
304        ids.extend(
305            value
306                .split_whitespace()
307                .filter_map(normalize_rfc822_message_id),
308        );
309    }
310    ids.sort();
311    ids.dedup();
312    ids
313}
314
315pub(super) fn message_reply_header_ids(message: &MessageFile) -> Vec<String> {
316    let mut ids = Vec::new();
317    if let Some(in_reply_to) = &message.in_reply_to {
318        ids.extend(rfc822_message_id_candidates(in_reply_to));
319    }
320    for reference in &message.references {
321        ids.extend(rfc822_message_id_candidates(reference));
322    }
323    ids.sort();
324    ids.dedup();
325    ids
326}
327
328impl Workspace {
329    pub(crate) fn persist_message_state(&self, message: &MessageFile) -> Result<()> {
330        let state = MessageStateFile::from_message(message);
331        write_message_state_file(&self.root, &state)
332    }
333
334    pub(crate) fn persist_message_remote(&self, message: &MessageFile) -> Result<()> {
335        let path = message_remote_path(&self.root, &message.message_id);
336        if let Some(remote) = MessageRemoteFile::from_message(message) {
337            write_message_remote_file(&self.root, &remote)
338        } else if path.exists() {
339            remove_file(&path)
340        } else {
341            Ok(())
342        }
343    }
344
345    pub(crate) fn write_message_materialized_cache(&self, message: &MessageFile) -> Result<()> {
346        let mut message = message.clone();
347        message.schema_name = "message".to_string();
348        message.schema_version = 1;
349        if message.eml_path.is_none() {
350            message.eml_path = Some(format!(".afmail/messages/{}.eml", message.message_id));
351        }
352        let path = self.message_path(&message.message_id);
353        if let Some(parent) = path.parent() {
354            create_dir_all(parent)?;
355        }
356        write_json_pretty(&path, &message)
357    }
358
359    pub(super) fn write_message_cache(&self, message: &MessageFile) -> Result<()> {
360        self.persist_message_state(message)?;
361        self.write_message_materialized_cache(message)
362    }
363
364    pub(crate) fn write_message_artifacts(&self, message: &MessageFile) -> Result<()> {
365        self.persist_message_state(message)?;
366        self.persist_message_remote(message)?;
367        self.write_message_materialized_cache(message)
368    }
369
370    fn materialize_message_cache_if_needed(&self, message_id: &str) -> Result<bool> {
371        if !self.message_cache_needs_materialize(message_id)? {
372            return Ok(false);
373        }
374        self.materialize_message_cache(message_id)?;
375        Ok(true)
376    }
377
378    fn materialize_message_cache(&self, message_id: &str) -> Result<MessageFile> {
379        validate_id("message_id", message_id)?;
380        let eml_path = message_eml_path(&self.root, message_id);
381        let raw = fs::read(&eml_path).map_err(|e| AppError::io("read message eml", &e))?;
382        let prior = read_message(&self.message_path(message_id)).ok();
383        let state = read_message_state_file(&self.root, message_id)?;
384        let remote =
385            read_message_remote_file(&self.root, message_id)?.map(|file| file.remote_state());
386        let workspace = state
387            .as_ref()
388            .map(MessageStateFile::workspace)
389            .transpose()?
390            .unwrap_or_else(default_message_workspace);
391        let direction = self.infer_materialized_direction(&raw, prior.as_ref(), remote.as_ref());
392        let mut parsed = crate::mail::parse_message_with_options(
393            message_id.to_string(),
394            &raw,
395            crate::mail::MessageParseOptions {
396                direction,
397                workspace,
398                remote,
399                received_rfc3339: prior
400                    .as_ref()
401                    .and_then(|message| message.received_rfc3339.clone()),
402                sent_rfc3339: prior
403                    .as_ref()
404                    .and_then(|message| message.sent_rfc3339.clone()),
405                attachments: prior
406                    .as_ref()
407                    .map(|message| message.attachments.clone())
408                    .unwrap_or_default(),
409            },
410        )?;
411        self.apply_fetched_attachment_files(message_id, &mut parsed.message.attachments);
412        self.apply_materialized_workspace_overlays(&mut parsed.message)?;
413        self.write_message_materialized_cache(&parsed.message)?;
414        Ok(parsed.message)
415    }
416
417    fn message_cache_needs_materialize(&self, message_id: &str) -> Result<bool> {
418        let cache_path = self.message_path(message_id);
419        if !cache_path.exists() {
420            return Ok(true);
421        }
422        let data = match read_to_string(&cache_path, "read message cache") {
423            Ok(data) => data,
424            Err(_) => return Ok(true),
425        };
426        let value: Value = match serde_json::from_str(&data) {
427            Ok(value) => value,
428            Err(_) => return Ok(true),
429        };
430        if value.get("schema_name").and_then(Value::as_str) != Some("message")
431            || value.get("schema_version").and_then(Value::as_u64) != Some(1)
432        {
433            return Ok(true);
434        }
435        let cache_modified = match fs::metadata(&cache_path).and_then(|meta| meta.modified()) {
436            Ok(time) => time,
437            Err(_) => return Ok(true),
438        };
439        for input in [
440            message_eml_path(&self.root, message_id),
441            message_state_path(&self.root, message_id),
442            message_remote_path(&self.root, message_id),
443        ] {
444            if !input.exists() {
445                continue;
446            }
447            if let Ok(input_modified) = fs::metadata(&input).and_then(|meta| meta.modified()) {
448                if input_modified > cache_modified {
449                    return Ok(true);
450                }
451            }
452        }
453        Ok(false)
454    }
455
456    fn infer_materialized_direction(
457        &self,
458        raw: &[u8],
459        prior: Option<&MessageFile>,
460        remote: Option<&RemoteState>,
461    ) -> Option<String> {
462        if let Some(direction) = prior.and_then(|message| message.direction.clone()) {
463            return Some(direction);
464        }
465        if let Some(remote) = remote {
466            if let Ok(config) = MailConfig::load(&self.root) {
467                for location in &remote.locations {
468                    if let Some(mailbox_id) = location.mailbox_id.as_deref() {
469                        if let Ok(action) = config.pull_action(mailbox_id) {
470                            return Some(action.direction.as_str().to_string());
471                        }
472                    }
473                }
474            }
475        }
476        let local_message_id = mail_parser::MessageParser::default()
477            .parse(raw)
478            .and_then(|message| message.message_id().map(ToString::to_string))
479            .is_some_and(|message_id| {
480                message_id
481                    .trim()
482                    .trim_matches(|ch| matches!(ch, '<' | '>'))
483                    .ends_with("@afmail.local")
484            });
485        Some(if local_message_id {
486            "outbound".to_string()
487        } else {
488            "inbound".to_string()
489        })
490    }
491
492    fn apply_fetched_attachment_files(&self, message_id: &str, attachments: &mut [AttachmentRef]) {
493        let files_dir = self
494            .root
495            .join(".afmail/messages")
496            .join(format!("{message_id}.files"));
497        if !files_dir.is_dir() {
498            return;
499        }
500        for attachment in attachments {
501            if attachment.fetched
502                && attachment
503                    .file_path
504                    .as_deref()
505                    .is_some_and(|path| self.root.join(path).is_file())
506            {
507                continue;
508            }
509            let candidate = files_dir.join(safe_attachment_filename(
510                &attachment.filename,
511                &attachment.part_id,
512            ));
513            if candidate.is_file() {
514                attachment.fetched = true;
515                attachment.file_path = Some(rel_path(&self.root, &candidate));
516            }
517        }
518    }
519
520    fn apply_materialized_workspace_overlays(&self, message: &mut MessageFile) -> Result<()> {
521        if let Some((archive_uid, archived_rfc3339)) = self
522            .direct_archive_state_by_message()?
523            .get(&message.message_id)
524        {
525            message.workspace.status = "archived".to_string();
526            message.workspace.archive_uid = Some(archive_uid.clone());
527            message.workspace.archived_rfc3339 = Some(archived_rfc3339.clone());
528            message.workspace.origin = None;
529        }
530        let cases = CaseIndex::build(self)?;
531        message.workspace.status = self.derived_message_status(message, &cases)?;
532        message.workspace.push = self.pending_push_state_for_message(&message.message_id)?;
533        Ok(())
534    }
535
536    fn pending_push_state_for_message(
537        &self,
538        message_id: &str,
539    ) -> Result<Option<WorkspacePushState>> {
540        let mut state = WorkspacePushState::default();
541        for item in crate::push_queue::pending_items(&self.root)? {
542            if !item.message_ids().iter().any(|id| id == message_id) {
543                continue;
544            }
545            state.pending.push(WorkspacePendingPush {
546                push_id: item.push_id.clone(),
547                kind: item.display_kind(),
548                queued_rfc3339: item.created_rfc3339.clone(),
549                last_error: item.last_error.clone(),
550            });
551        }
552        state.pending.sort_by(|a, b| a.push_id.cmp(&b.push_id));
553        if state.pending.is_empty() && state.last_completed_rfc3339.is_none() {
554            Ok(None)
555        } else {
556            Ok(Some(state))
557        }
558    }
559
560    pub(super) fn rebuild_message_cache_from_eml(&self) -> Result<MessageCacheRebuildTotals> {
561        let messages_dir = self.root.join(".afmail/messages");
562        if !messages_dir.exists() {
563            return Ok(MessageCacheRebuildTotals::default());
564        }
565        let mut totals = MessageCacheRebuildTotals::default();
566        let mut eml_paths = read_dir(&messages_dir, "read message eml cache")?
567            .into_iter()
568            .map(|entry| entry.path())
569            .filter(|path| path.extension().and_then(|s| s.to_str()) == Some("eml"))
570            .collect::<Vec<_>>();
571        eml_paths.sort();
572
573        for eml_path in eml_paths {
574            let Some(message_id) = eml_path.file_stem().and_then(|stem| stem.to_str()) else {
575                continue;
576            };
577            validate_id("message_id", message_id)?;
578            if self.materialize_message_cache_if_needed(message_id)? {
579                totals.rebuilt_count += 1;
580            }
581        }
582
583        for entry in read_dir(&messages_dir, "read message cache")? {
584            let path = entry.path();
585            if path.extension().and_then(|s| s.to_str()) == Some("txt") {
586                remove_file(&path)?;
587                totals.removed_text_cache_count += 1;
588            }
589        }
590        Ok(totals)
591    }
592
593    fn direct_archive_state_by_message(&self) -> Result<BTreeMap<String, (String, String)>> {
594        let mut out = BTreeMap::new();
595        for archive_uid in self.archive_message_category_ids()? {
596            let data = self.read_archive_messages(&archive_uid)?;
597            for item in data.items {
598                out.insert(
599                    item.message_id,
600                    (archive_uid.clone(), item.archived_rfc3339),
601                );
602            }
603        }
604        Ok(out)
605    }
606
    /// Returns the full JSON description of a message (`code: "message_show"`).
    ///
    /// Loads the mail config and the message, then reports headers, body text,
    /// attachment metadata, mailbox ids, remote flags and derived booleans,
    /// pending push state, and the message's view and JSON paths. Optional
    /// string fields that are unset are emitted as `""`.
    ///
    /// # Errors
    /// Fails when the workspace check, id validation, config load, message
    /// read, body text load, view path lookup, or template rendering fails.
    pub fn message_show(&self, message_id: &str) -> Result<Value> {
        self.require_workspace()?;
        validate_id("message_id", message_id)?;
        let config = MailConfig::load(&self.root)?;
        let message = self.read_message_by_id(message_id)?;
        let body_text = self.message_body_text(&message)?;
        let flags = message_remote_flags(&message);
        // A message counts as unread unless some flag equals `\Seen` (ASCII case-insensitive).
        let unread = !flags.iter().any(|flag| flag.eq_ignore_ascii_case("\\Seen"));
        let flagged = flags
            .iter()
            .any(|flag| flag.eq_ignore_ascii_case("\\Flagged"));
        let push = message.workspace.push.clone();
        Ok(json!({
            "code": "message_show",
            "message_id": message.message_id.as_str(),
            "from": message.from.as_deref().unwrap_or(""),
            "to": &message.to,
            "cc": &message.cc,
            "bcc": &message.bcc,
            "reply_to": &message.reply_to,
            "sender": message.sender.as_deref().unwrap_or(""),
            "subject": message.subject.as_deref().unwrap_or(""),
            "direction": message.direction.as_deref().unwrap_or(""),
            "received_rfc3339": message.received_rfc3339.as_deref().unwrap_or(""),
            "sent_rfc3339": message.sent_rfc3339.as_deref().unwrap_or(""),
            "body_text": body_text,
            "attachment_count": message.attachments.len(),
            "attachments": attachment_metadata_values(&message.attachments),
            "mailbox_ids": message_mailbox_ids(&message, &config),
            "flags": flags,
            "unread": unread,
            "flagged": flagged,
            "remote_missing": message_remote_missing(&message),
            "remote_missing_since_rfc3339": message_remote_missing_since_rfc3339(&message),
            "remote_effect_pending": message_remote_effect_pending(&message),
            "push": push,
            "view_path": self.message_existing_view_path(&message)?,
            "json_path": format!("messages/{message_id}.json"),
            "message": message_template_value(&message)?,
        }))
    }
648
    /// Marks a message as spam and queues the configured `message.spam` push steps.
    ///
    /// Preconditions are checked first: the workspace, the reason, the config,
    /// the id, that the message has no related conversation, and that the
    /// message id is unreferenced. The state changes then run inside a
    /// `"message_spam"` transaction, in this order: queue the push steps, set
    /// the workspace status to `"spam"`, record the pending push item (if one
    /// was queued), refresh the disposition views, commit.
    ///
    /// The audit event is appended after the commit, so a failure there returns
    /// an error even though the change has already been committed.
    ///
    /// Returns a serialized `MessageDispositionResult` with code
    /// `"message_spam_marked"`.
    pub fn spam_message(&self, message_id: &str, reason: Option<&str>) -> Result<Value> {
        self.require_workspace()?;
        let reason = self.checked_reason(reason)?;
        let config = crate::config::MailConfig::load(&self.root)?;
        validate_id("message_id", message_id)?;
        self.ensure_no_related_conversation(message_id)?;
        let message_ids = vec![message_id.to_string()];
        self.ensure_message_ids_unreferenced(&message_ids, None)?;
        let locations = self.message_remote_locations(&message_ids)?;
        // The transaction is declared over the message JSON and the push queue directory.
        let transaction = self.begin_transaction(
            "message_spam",
            vec![
                format!("messages/{message_id}.json"),
                ".afmail/push".to_string(),
            ],
        )?;
        // `item` is `None` when nothing was queued.
        let item = crate::push_queue::queue_action_steps(
            &self.root,
            "message.spam",
            &message_ids,
            &locations,
            &config.actions.message_spam.steps,
            None,
        )?;
        self.update_messages_workspace(&message_ids, "spam")?;
        if let Some(item) = &item {
            self.record_pending_push_item(item)?;
        }
        self.refresh_disposition_views()?;
        transaction.commit()?;
        self.append_audit_event(
            "message_spam_marked",
            vec![audit_target("message", message_id)],
            reason,
            json!({"message_id": message_id, "special_use": SpecialUseKind::Junk.as_str()}),
        )?;
        serde_json::to_value(MessageDispositionResult {
            code: "message_spam_marked",
            message_id: message_id.to_string(),
            special_use: SpecialUseKind::Junk.as_str().to_string(),
            message_ids,
            location_count: locations.len(),
            // NOTE(review): reported as `locations.len()` even when `item` is `None`
            // (`queued == false`) — confirm whether this should be 0 in that case.
            queued_location_count: locations.len(),
            queued: item.is_some(),
            push_id: item.as_ref().map(|item| item.push_id.clone()),
        })
        .map_err(|e| AppError::json("serialize message disposition result", &e))
    }
697
698    pub fn unspam_message(&self, message_id: &str, reason: Option<&str>) -> Result<Value> {
699        self.restore_local_message_disposition(
700            message_id,
701            "spam",
702            "message.spam",
703            "message_unspammed",
704            "message_unspammed",
705            reason,
706        )
707    }
708
    /// Files a message directly under an archive category and queues the
    /// archive push for it.
    ///
    /// `archive_ref` is resolved to an archive uid and directory with
    /// `resolve_archive_message_category`; `summary` is stored with the archive
    /// item and echoed in the audit event.
    ///
    /// The state changes run inside a `"message_archive"` transaction, in this
    /// order: set the direct archive on the message, upsert the archive item,
    /// refresh the archive category, queue the archive push, commit.
    ///
    /// The audit event is appended after the commit, so a failure there returns
    /// an error even though the change has already been committed.
    ///
    /// Returns a serialized `MessageArchiveResult` with code `"message_archived"`.
    pub fn archive_message(
        &self,
        message_id: &str,
        archive_ref: &str,
        summary: Option<&str>,
        reason: Option<&str>,
    ) -> Result<Value> {
        self.require_workspace()?;
        let reason = self.checked_reason(reason)?;
        validate_id("message_id", message_id)?;
        let (archive_uid, archive_dir) = self.resolve_archive_message_category(archive_ref)?;
        self.ensure_no_related_conversation(message_id)?;
        // The transaction is declared over the message JSON, the archive
        // category path, and the push queue directory.
        let transaction = self.begin_transaction(
            "message_archive",
            vec![
                format!("messages/{message_id}.json"),
                format!("archive/notifications/{archive_uid}"),
                ".afmail/push".to_string(),
            ],
        )?;
        // The timestamp returned here is reused for the archive item so both agree.
        let archived_rfc3339 = self.set_direct_message_archive(message_id, &archive_uid)?;
        self.upsert_archive_message_item(&archive_uid, message_id, summary, &archived_rfc3339)?;
        self.refresh_archive_message_category(&archive_uid)?;
        let queue = self.queue_archive_for_archived_messages(&[message_id.to_string()], None)?;
        transaction.commit()?;
        self.append_audit_event(
            "message_archived",
            vec![
                audit_target("message", message_id),
                audit_target("archive", &archive_uid),
            ],
            reason,
            json!({
                "message_id": message_id,
                "archive_uid": archive_uid,
                "summary": summary,
                "to_path": format!("{}/views/messages/{message_id}.md", rel_path(&self.root, &archive_dir)),
            }),
        )?;
        serde_json::to_value(MessageArchiveResult {
            code: "message_archived",
            message_id: message_id.to_string(),
            archive_uid,
            path: rel_path(&self.root, &archive_dir),
            special_use: SpecialUseKind::Archive.as_str().to_string(),
            eligible_message_ids: queue.eligible_message_ids,
            location_count: queue.location_count,
            queued_location_count: queue.queued_location_count,
            queued: !queue.items.is_empty(),
            push_ids: queue
                .items
                .iter()
                .map(|item| item.push_id.clone())
                .collect::<Vec<_>>(),
            // First push id only; the full list is in `push_ids`.
            push_id: queue.items.first().map(|item| item.push_id.clone()),
        })
        .map_err(|e| AppError::json("serialize message archive result", &e))
    }
767
    /// Marks a message as trashed and queues the configured `message.trash`
    /// push steps.
    ///
    /// Preconditions are checked first: the workspace, the reason, the config,
    /// the id, that the message has no related conversation, and that the
    /// message id is unreferenced. The state changes then run inside a
    /// `"message_trash"` transaction, in this order: queue the push steps, set
    /// the workspace status to `"trashed"`, record the pending push item (if
    /// one was queued), refresh the disposition views, commit.
    ///
    /// The audit event is appended after the commit, so a failure there returns
    /// an error even though the change has already been committed.
    ///
    /// Returns a serialized `MessageDispositionResult` with code
    /// `"message_trashed"`.
    pub fn trash_message(&self, message_id: &str, reason: Option<&str>) -> Result<Value> {
        self.require_workspace()?;
        let reason = self.checked_reason(reason)?;
        let config = crate::config::MailConfig::load(&self.root)?;
        validate_id("message_id", message_id)?;
        self.ensure_no_related_conversation(message_id)?;
        let message_ids = vec![message_id.to_string()];
        self.ensure_message_ids_unreferenced(&message_ids, None)?;
        let locations = self.message_remote_locations(&message_ids)?;
        // The transaction is declared over the message JSON and the push queue directory.
        let transaction = self.begin_transaction(
            "message_trash",
            vec![
                format!("messages/{message_id}.json"),
                ".afmail/push".to_string(),
            ],
        )?;
        // `item` is `None` when nothing was queued.
        let item = crate::push_queue::queue_action_steps(
            &self.root,
            "message.trash",
            &message_ids,
            &locations,
            &config.actions.message_trash.steps,
            None,
        )?;
        self.update_messages_workspace(&message_ids, "trashed")?;
        if let Some(item) = &item {
            self.record_pending_push_item(item)?;
        }
        self.refresh_disposition_views()?;
        transaction.commit()?;
        self.append_audit_event(
            "message_trashed",
            vec![audit_target("message", message_id)],
            reason,
            json!({"message_id": message_id, "special_use": SpecialUseKind::Trash.as_str()}),
        )?;
        serde_json::to_value(MessageDispositionResult {
            code: "message_trashed",
            message_id: message_id.to_string(),
            special_use: SpecialUseKind::Trash.as_str().to_string(),
            message_ids,
            location_count: locations.len(),
            // NOTE(review): reported as `locations.len()` even when `item` is `None`
            // (`queued == false`) — confirm whether this should be 0 in that case.
            queued_location_count: locations.len(),
            queued: item.is_some(),
            push_id: item.as_ref().map(|item| item.push_id.clone()),
        })
        .map_err(|e| AppError::json("serialize message disposition result", &e))
    }
816
817    pub fn untrash_message(&self, message_id: &str, reason: Option<&str>) -> Result<Value> {
818        self.restore_local_message_disposition(
819            message_id,
820            "trashed",
821            "message.trash",
822            "message_untrashed",
823            "message_untrashed",
824            reason,
825        )
826    }
827
828    pub fn unarchive_message(&self, message_id: &str, reason: Option<&str>) -> Result<Value> {
829        self.require_workspace()?;
830        let reason = self.checked_reason(reason)?;
831        validate_id("message_id", message_id)?;
832        let message = self.read_message_by_id(message_id)?;
833        let archive_uid = message.workspace.archive_uid.clone().ok_or_else(|| {
834            AppError::new(
835                "invalid_request",
836                format!("message {message_id} is not directly archived"),
837            )
838        })?;
839        self.restore_direct_archive_message(
840            &archive_uid,
841            message_id,
842            reason,
843            "message_unarchived",
844            "message_unarchived",
845        )
846    }
847
848    pub(super) fn message_existing_view_path(
849        &self,
850        message: &MessageFile,
851    ) -> Result<Option<String>> {
852        let message_id = message.message_id.as_str();
853        let triage_path = self.root.join("triage").join(format!("{message_id}.md"));
854        if triage_path.is_file() {
855            return Ok(Some(rel_path(&self.root, &triage_path)));
856        }
857        for dir in ["spam", "trash", "deleted"] {
858            let path = self.root.join(dir).join(format!("{message_id}.md"));
859            if path.is_file() {
860                return Ok(Some(rel_path(&self.root, &path)));
861            }
862        }
863        for (case_uid, case_path) in self.all_case_entries()? {
864            let messages_path = case_messages_json_path(&case_path);
865            let case_messages = read_case_messages(&messages_path, &case_uid);
866            if case_messages
867                .as_ref()
868                .map(|messages| messages.message_ids.iter().any(|id| id == message_id))
869                .unwrap_or(false)
870            {
871                let message_view = case_message_view_path(&case_path, message_id);
872                if message_view.is_file() {
873                    return Ok(Some(rel_path(&self.root, &message_view)));
874                }
875                let case_view = case_path.join("case.md");
876                if case_view.is_file() {
877                    return Ok(Some(rel_path(&self.root, &case_view)));
878                }
879            }
880        }
881        if let Some(archive_uid) = message.workspace.archive_uid.as_deref() {
882            let archive_view = self.archive_message_view_path(archive_uid, message_id);
883            if archive_view.is_file() {
884                return Ok(Some(rel_path(&self.root, &archive_view)));
885            }
886        }
887        Ok(None)
888    }
889
890    pub(super) fn message_body_text(&self, message: &MessageFile) -> Result<String> {
891        Ok(message.body_text.clone())
892    }
893
894    pub(super) fn message_conversation_for_dir(
895        &self,
896        message: &MessageFile,
897        output_dir: Option<&Path>,
898    ) -> Result<String> {
899        let config = MailConfig::load(&self.root)?;
900        let mut renderer = MarkdownTemplateRenderer::new(&self.root, config.template_language());
901        self.message_conversation_with_renderer(message, &config, &mut renderer, output_dir)
902    }
903
904    pub(super) fn message_conversation_with_renderer(
905        &self,
906        message: &MessageFile,
907        config: &MailConfig,
908        renderer: &mut MarkdownTemplateRenderer<'_>,
909        output_dir: Option<&Path>,
910    ) -> Result<String> {
911        let body_text = self.message_body_text(message)?;
912        renderer.render(
913            TemplateKey::MessageSection,
914            &message_section_context(
915                Some(&self.root),
916                message,
917                &body_text,
918                config.template_language(),
919                config
920                    .smtp
921                    .from
922                    .as_deref()
923                    .or(config.imap.username.as_deref()),
924                output_dir,
925            )?,
926        )
927    }
928
929    pub(super) fn rfc822_message_id_index(&self) -> Result<BTreeMap<String, String>> {
930        self.rebuild_message_cache_from_eml()?;
931        let mut index = BTreeMap::new();
932        for path in message_json_paths(&self.root)? {
933            let message = read_message(&path)?;
934            if let Some(normalized) = message
935                .rfc822_message_id
936                .as_deref()
937                .and_then(normalize_rfc822_message_id)
938            {
939                index
940                    .entry(normalized)
941                    .or_insert_with(|| message.message_id.clone());
942            }
943        }
944        Ok(index)
945    }
946
947    pub(super) fn related_message_ids(&self, message_id: &str) -> Result<Vec<String>> {
948        validate_id("message_id", message_id)?;
949        let current = self.read_message_by_id(message_id)?;
950        let rfc822_index = self.rfc822_message_id_index()?;
951        let mut related = BTreeSet::new();
952
953        for header_id in message_reply_header_ids(&current) {
954            if let Some(local_message_id) = rfc822_index.get(&header_id) {
955                if local_message_id != message_id {
956                    related.insert(local_message_id.clone());
957                }
958            }
959        }
960
961        let Some(current_rfc822_id) = current
962            .rfc822_message_id
963            .as_deref()
964            .and_then(normalize_rfc822_message_id)
965        else {
966            return Ok(related.into_iter().collect());
967        };
968
969        for path in message_json_paths(&self.root)? {
970            let other = read_message(&path)?;
971            if other.message_id == message_id {
972                continue;
973            }
974            if message_reply_header_ids(&other)
975                .iter()
976                .any(|header_id| header_id == &current_rfc822_id)
977            {
978                related.insert(other.message_id);
979            }
980        }
981
982        Ok(related.into_iter().collect())
983    }
984
985    pub(super) fn ensure_no_related_conversation(&self, message_id: &str) -> Result<()> {
986        let related_message_ids = self.related_message_ids(message_id)?;
987        if related_message_ids.is_empty() {
988            return Ok(());
989        }
990        let mut suggested_commands = vec![format!(
991            "afmail case create --name NAME --message {message_id} --reason TEXT"
992        )];
993        for related_id in &related_message_ids {
994            suggested_commands.push(format!(
995                "afmail case add CASE_REF {related_id} --reason TEXT"
996            ));
997        }
998        suggested_commands.push("afmail case archive CASE_REF --reason TEXT".to_string());
999        Err(AppError::new(
1000            "message_has_related_conversation_use_case",
1001            "message has RFC-header-confirmed related conversation",
1002        )
1003        .with_hint(
1004            "Create a case for the conversation, add the related messages, then archive the case.",
1005        )
1006        .with_details(json!({
1007            "message_id": message_id,
1008            "related_message_ids": related_message_ids,
1009            "suggested_commands": suggested_commands
1010        })))
1011    }
1012
1013    pub(super) fn refresh_messages_after_ref_change(&self, message_ids: &[String]) -> Result<()> {
1014        for message_id in message_ids {
1015            self.refresh_message_after_ref_change(message_id)?;
1016        }
1017        Ok(())
1018    }
1019
1020    pub(super) fn refresh_read_views_after_message_change(&self, message_id: &str) -> Result<()> {
1021        validate_id("message_id", message_id)?;
1022        let message = self.read_message_by_id(message_id)?;
1023        let cases = CaseIndex::build(self)?;
1024        if self.triage_candidate(&message, &cases)? {
1025            self.write_triage_view(&message)?;
1026        } else {
1027            self.remove_triage_view_for_message(message_id)?;
1028        }
1029        self.refresh_all_case_message_views()?;
1030        self.refresh_archive_indexes()
1031    }
1032
1033    pub(super) fn refresh_message_after_ref_change(&self, message_id: &str) -> Result<()> {
1034        validate_id("message_id", message_id)?;
1035        let mut msg = self.read_message_by_id(message_id)?;
1036        let cases = CaseIndex::build(self)?;
1037        msg.workspace.status = self.derived_message_status(&msg, &cases)?;
1038        msg.workspace.remote_sync = None;
1039        self.write_message_cache(&msg)?;
1040        if self.triage_candidate(&msg, &cases)? {
1041            self.write_triage_view(&msg)?;
1042        } else {
1043            self.remove_triage_view_for_message(message_id)?;
1044        }
1045        Ok(())
1046    }
1047
1048    pub(super) fn update_messages_workspace(
1049        &self,
1050        message_ids: &[String],
1051        status: &str,
1052    ) -> Result<()> {
1053        let status = MessageStatus::parse(status)?;
1054        for message_id in message_ids {
1055            validate_id("message_id", message_id)?;
1056            let mut msg = self.read_message_by_id(message_id)?;
1057            msg.workspace.status = status.as_str().to_string();
1058            if matches!(status, MessageStatus::Spam | MessageStatus::Trashed) {
1059                msg.workspace.archive_uid = None;
1060                msg.workspace.archived_rfc3339 = None;
1061                msg.workspace.origin = None;
1062            }
1063            msg.workspace.remote_sync = None;
1064            self.write_message_cache(&msg)?;
1065            self.remove_triage_view_for_message(message_id)?;
1066        }
1067        Ok(())
1068    }
1069
    /// Reverts a local disposition (for example `trashed`) and puts the
    /// message back into triage.
    ///
    /// # Parameters
    /// * `message_id` – local ID of the message to restore.
    /// * `expected_status` – status the message must currently have; any
    ///   other status yields an `invalid_request` error.
    /// * `push_kind` – kind of pending push entries to remove for this message.
    /// * `event_kind` – kind recorded in the audit event.
    /// * `result_code` – value of the `code` field in the returned JSON.
    /// * `reason` – optional reason, checked via `checked_reason`.
    ///
    /// # Returns
    /// A JSON object with the result code, the message ID, the previous
    /// status, the literal status `triage`, the triage view path, and the
    /// IDs/count of the removed pending pushes.
    ///
    /// # Errors
    /// Propagates failures from the checks, the push queue, the cache
    /// writes, the view refresh, and the audit log.
    ///
    /// NOTE(review): unlike `trash_message`, no transaction is opened in
    /// this function — confirm whether the caller is expected to provide one.
    pub(super) fn restore_local_message_disposition(
        &self,
        message_id: &str,
        expected_status: &str,
        push_kind: &str,
        event_kind: &str,
        result_code: &str,
        reason: Option<&str>,
    ) -> Result<Value> {
        self.require_workspace()?;
        let reason = self.checked_reason(reason)?;
        validate_id("message_id", message_id)?;
        let mut message = self.read_message_by_id(message_id)?;
        // Refuse to restore a message that is not in the expected disposition.
        if message.workspace.status != expected_status {
            return Err(AppError::new(
                "invalid_request",
                format!("message {message_id} is not {expected_status}"),
            ));
        }
        // Drop the not-yet-executed remote pushes for this disposition.
        let removed_push =
            crate::push_queue::remove_pending_message_pushes(&self.root, message_id, push_kind)?;
        let push_ids = removed_push
            .iter()
            .map(|item| item.push_id.clone())
            .collect::<Vec<_>>();
        message.workspace.status = "triage".to_string();
        message.workspace.remote_sync = None;
        self.write_message_cache(&message)?;
        // Re-reads the message and stores the status derived from the case index.
        // NOTE(review): the response below always reports "triage", even if
        // the derived status differs — confirm that cannot happen here.
        self.refresh_message_after_ref_change(message_id)?;
        self.clear_message_pending_pushes(message_id, &push_ids, false)?;
        self.refresh_disposition_views()?;
        self.append_audit_event(
            event_kind,
            vec![audit_target("message", message_id)],
            reason,
            json!({
                "message_id": message_id,
                "from_status": expected_status,
                "to_status": "triage",
                "removed_push_ids": push_ids.clone(),
            }),
        )?;
        Ok(json!({
            "code": result_code,
            "message_id": message_id,
            "from_status": expected_status,
            "status": "triage",
            "triage_path": format!("triage/{message_id}.md"),
            "removed_push_count": push_ids.len(),
            "push_ids": push_ids,
        }))
    }
1122
1123    pub(super) fn remove_triage_view_for_message(&self, message_id: &str) -> Result<()> {
1124        let path = self.root.join("triage").join(format!("{message_id}.md"));
1125        if path.exists() {
1126            remove_file(&path)?;
1127        }
1128        Ok(())
1129    }
1130
1131    pub(crate) fn read_message_by_id(&self, message_id: &str) -> Result<MessageFile> {
1132        validate_id("message_id", message_id)?;
1133        if message_eml_path(&self.root, message_id).is_file() {
1134            self.materialize_message_cache_if_needed(message_id)?;
1135        }
1136        let path = self.message_path(message_id);
1137        read_message(&path)
1138    }
1139
1140    pub(crate) fn relocate_message(
1141        &self,
1142        message_id: &str,
1143        target_locations: &[crate::types::RemoteLocation],
1144    ) -> Result<()> {
1145        validate_id("message_id", message_id)?;
1146        let mut locations: Vec<crate::types::RemoteLocation> = Vec::new();
1147        for location in target_locations {
1148            if !locations.iter().any(|existing| {
1149                existing.mailbox_name == location.mailbox_name
1150                    && existing.uid_validity == location.uid_validity
1151                    && existing.uid == location.uid
1152            }) {
1153                locations.push(location.clone());
1154            }
1155        }
1156        if locations.is_empty() {
1157            return Ok(());
1158        };
1159        let mut message = self.read_message_by_id(message_id)?;
1160        message.remote = Some(crate::types::RemoteState { locations });
1161        self.persist_message_remote(&message)?;
1162        self.write_message_materialized_cache(&message)?;
1163        Ok(())
1164    }
1165}