Skip to main content

agent_first_mail/store/
messages.rs

1use super::*;
2
3pub(super) fn message_json_paths(root: &Path) -> Result<Vec<PathBuf>> {
4    let dir = root.join("messages");
5    if !dir.exists() {
6        return Ok(Vec::new());
7    }
8    let mut paths = read_dir(&dir, "read messages")?
9        .into_iter()
10        .map(|entry| entry.path())
11        .filter(|path| path.extension().and_then(|s| s.to_str()) == Some("json"))
12        .collect::<Vec<_>>();
13    paths.sort();
14    Ok(paths)
15}
16
17#[derive(Clone, Debug, Deserialize, Serialize)]
18#[serde(deny_unknown_fields)]
19struct MessageRemoteFile {
20    schema_name: String,
21    schema_version: u64,
22    message_id: String,
23    #[serde(default, skip_serializing_if = "Vec::is_empty")]
24    locations: Vec<RemoteLocation>,
25}
26
27#[derive(Debug, Serialize)]
28struct MessageDispositionResult {
29    code: &'static str,
30    message_id: String,
31    special_use: String,
32    message_ids: Vec<String>,
33    location_count: usize,
34    queued_location_count: usize,
35    queued: bool,
36    push_id: Option<String>,
37}
38
39#[derive(Debug, Serialize)]
40struct MessageArchiveResult {
41    code: &'static str,
42    message_id: String,
43    archive_uid: String,
44    path: String,
45    special_use: String,
46    eligible_message_ids: Vec<String>,
47    location_count: usize,
48    queued_location_count: usize,
49    queued: bool,
50    push_ids: Vec<String>,
51    push_id: Option<String>,
52}
53
54#[derive(Clone, Debug, Default)]
55pub(super) struct MessageCacheRebuildTotals {
56    pub(super) rebuilt_count: usize,
57    pub(super) removed_text_cache_count: usize,
58}
59
60impl MessageRemoteFile {
61    fn from_message(message: &MessageFile) -> Option<Self> {
62        let remote = message.remote.as_ref()?;
63        if remote.locations.is_empty() {
64            return None;
65        }
66        Some(Self {
67            schema_name: "message_remote".to_string(),
68            schema_version: 1,
69            message_id: message.message_id.clone(),
70            locations: remote.locations.clone(),
71        })
72    }
73
74    fn remote_state(&self) -> RemoteState {
75        RemoteState {
76            locations: self.locations.clone(),
77        }
78    }
79}
80
81fn message_eml_path(root: &Path, message_id: &str) -> PathBuf {
82    root.join(".afmail/messages")
83        .join(format!("{message_id}.eml"))
84}
85
86fn message_state_path(root: &Path, message_id: &str) -> PathBuf {
87    root.join(".afmail/messages")
88        .join(format!("{message_id}.state.json"))
89}
90
91fn message_remote_path(root: &Path, message_id: &str) -> PathBuf {
92    root.join(".afmail/messages")
93        .join(format!("{message_id}.remote.json"))
94}
95
96fn read_message_remote_file(root: &Path, message_id: &str) -> Result<Option<MessageRemoteFile>> {
97    let path = message_remote_path(root, message_id);
98    if !path.exists() {
99        return Ok(None);
100    }
101    let data = read_to_string(&path, "read message remote")?;
102    let remote: MessageRemoteFile =
103        serde_json::from_str(&data).map_err(|e| AppError::json("parse message remote", &e))?;
104    if remote.schema_name != "message_remote"
105        || remote.schema_version != 1
106        || remote.message_id != message_id
107    {
108        return Err(AppError::new(
109            "message_remote_invalid",
110            format!("invalid message remote sidecar: {}", rel_path(root, &path)),
111        ));
112    }
113    Ok(Some(remote))
114}
115
116fn write_message_remote_file(root: &Path, remote: &MessageRemoteFile) -> Result<()> {
117    let path = message_remote_path(root, &remote.message_id);
118    if let Some(parent) = path.parent() {
119        create_dir_all(parent)?;
120    }
121    let mut normalized = remote.clone();
122    normalized.schema_name = "message_remote".to_string();
123    normalized.schema_version = 1;
124    write_json_pretty(&path, &normalized)
125}
126
127fn default_message_workspace() -> WorkspaceState {
128    WorkspaceState {
129        status: "triage".to_string(),
130        archive_uid: None,
131        archived_rfc3339: None,
132        origin: None,
133        remote_sync: None,
134        push: None,
135    }
136}
137
138pub(super) fn purge_message_artifacts(root: &Path, message_id: &str) -> Result<()> {
139    validate_id("message_id", message_id)?;
140    let message_dir = root.join(".afmail/messages");
141    for path in [
142        root.join("messages").join(format!("{message_id}.json")),
143        message_dir.join(format!("{message_id}.json")),
144        message_dir.join(format!("{message_id}.eml")),
145        message_dir.join(format!("{message_id}.state.json")),
146        message_dir.join(format!("{message_id}.remote.json")),
147        message_dir.join(format!("{message_id}.txt")),
148    ] {
149        if path.exists() {
150            remove_file(&path)?;
151        }
152    }
153    let files_dir = message_dir.join(format!("{message_id}.files"));
154    if files_dir.exists() {
155        remove_dir_all(&files_dir)?;
156    }
157    Ok(())
158}
159
160pub(super) fn attachment_metadata_values(attachments: &[AttachmentRef]) -> Vec<Value> {
161    attachments
162        .iter()
163        .map(|attachment| {
164            json!({
165                "part_id": attachment.part_id.as_str(),
166                "filename": attachment.filename.as_str(),
167                "saved_filename": saved_filename_for_attachment(attachment),
168                "content_type": attachment.content_type.as_str(),
169                "size_bytes": attachment.size_bytes,
170                "fetched": attachment.fetched,
171                "file_path": attachment.file_path.as_deref().unwrap_or(""),
172                "storage": if attachment.fetched { "message_cache" } else { "" },
173            })
174        })
175        .collect()
176}
177
178pub(super) fn read_message(path: &Path) -> Result<MessageFile> {
179    let data = read_to_string(path, "read message json")?;
180    let message: MessageFile =
181        serde_json::from_str(&data).map_err(|e| AppError::json("parse message json", &e))?;
182    if message.schema_name != "message" || message.schema_version != 1 {
183        return Err(AppError::new(
184            "message_cache_invalid",
185            format!("unsupported message cache schema: {}", path_to_string(path)),
186        ));
187    }
188    MessageStatus::parse(&message.workspace.status)?;
189    if let Some(direction) = message.direction.as_deref() {
190        MailDirection::parse(direction)?;
191    }
192    Ok(message)
193}
194
195pub(super) fn normalize_rfc822_message_id(value: &str) -> Option<String> {
196    let normalized = value
197        .trim()
198        .trim_matches(|ch| matches!(ch, '<' | '>' | ',' | ';'))
199        .trim()
200        .to_ascii_lowercase();
201    if normalized.is_empty() {
202        None
203    } else {
204        Some(normalized)
205    }
206}
207
208pub(super) fn rfc822_message_id_candidates(value: &str) -> Vec<String> {
209    let mut ids = Vec::new();
210    let mut rest = value;
211    while let Some(start) = rest.find('<') {
212        let after_start = &rest[start + 1..];
213        let Some(end) = after_start.find('>') else {
214            break;
215        };
216        if let Some(id) = normalize_rfc822_message_id(&after_start[..end]) {
217            ids.push(id);
218        }
219        rest = &after_start[end + 1..];
220    }
221    if ids.is_empty() {
222        ids.extend(
223            value
224                .split_whitespace()
225                .filter_map(normalize_rfc822_message_id),
226        );
227    }
228    ids.sort();
229    ids.dedup();
230    ids
231}
232
233pub(super) fn message_reply_header_ids(message: &MessageFile) -> Vec<String> {
234    let mut ids = Vec::new();
235    if let Some(in_reply_to) = &message.in_reply_to {
236        ids.extend(rfc822_message_id_candidates(in_reply_to));
237    }
238    for reference in &message.references {
239        ids.extend(rfc822_message_id_candidates(reference));
240    }
241    ids.sort();
242    ids.dedup();
243    ids
244}
245
246impl Workspace {
247    pub(crate) fn persist_message_remote(&self, message: &MessageFile) -> Result<()> {
248        let path = message_remote_path(&self.root, &message.message_id);
249        if let Some(remote) = MessageRemoteFile::from_message(message) {
250            write_message_remote_file(&self.root, &remote)
251        } else if path.exists() {
252            remove_file(&path)
253        } else {
254            Ok(())
255        }
256    }
257
258    pub(crate) fn write_message_materialized_cache(&self, message: &MessageFile) -> Result<()> {
259        let mut message = message.clone();
260        message.schema_name = "message".to_string();
261        message.schema_version = 1;
262        if message.eml_path.is_none() {
263            message.eml_path = Some(format!(".afmail/messages/{}.eml", message.message_id));
264        }
265        let deprecated_state = message_state_path(&self.root, &message.message_id);
266        if deprecated_state.exists() {
267            remove_file(&deprecated_state)?;
268        }
269        let path = self.message_path(&message.message_id);
270        if let Some(parent) = path.parent() {
271            create_dir_all(parent)?;
272        }
273        write_json_pretty(&path, &message)
274    }
275
276    pub(super) fn write_message_cache(&self, message: &MessageFile) -> Result<()> {
277        self.write_message_materialized_cache(message)
278    }
279
280    pub(crate) fn write_message_artifacts(&self, message: &MessageFile) -> Result<()> {
281        self.persist_message_remote(message)?;
282        self.write_message_materialized_cache(message)
283    }
284
285    fn materialize_message_cache_if_needed(&self, message_id: &str) -> Result<bool> {
286        if !self.message_cache_needs_materialize(message_id)? {
287            return Ok(false);
288        }
289        self.materialize_message_cache(message_id)?;
290        Ok(true)
291    }
292
293    fn materialize_message_cache(&self, message_id: &str) -> Result<MessageFile> {
294        validate_id("message_id", message_id)?;
295        let eml_path = message_eml_path(&self.root, message_id);
296        let raw = fs::read(&eml_path).map_err(|e| AppError::io("read message eml", &e))?;
297        let prior = read_message(&self.message_path(message_id)).ok();
298        let remote =
299            read_message_remote_file(&self.root, message_id)?.map(|file| file.remote_state());
300        let direction = self.infer_materialized_direction(&raw, prior.as_ref(), remote.as_ref());
301        let mut parsed = crate::mail::parse_message_with_options(
302            message_id.to_string(),
303            &raw,
304            crate::mail::MessageParseOptions {
305                direction,
306                workspace: default_message_workspace(),
307                remote,
308                received_rfc3339: prior
309                    .as_ref()
310                    .and_then(|message| message.received_rfc3339.clone()),
311                sent_rfc3339: prior
312                    .as_ref()
313                    .and_then(|message| message.sent_rfc3339.clone()),
314                attachments: prior
315                    .as_ref()
316                    .map(|message| message.attachments.clone())
317                    .unwrap_or_default(),
318            },
319        )?;
320        self.apply_fetched_attachment_files(message_id, &mut parsed.message.attachments);
321        self.apply_materialized_workspace_overlays(&mut parsed.message)?;
322        self.write_message_materialized_cache(&parsed.message)?;
323        Ok(parsed.message)
324    }
325
326    fn message_cache_needs_materialize(&self, message_id: &str) -> Result<bool> {
327        let cache_path = self.message_path(message_id);
328        if !cache_path.exists() {
329            return Ok(true);
330        }
331        let data = match read_to_string(&cache_path, "read message cache") {
332            Ok(data) => data,
333            Err(_) => return Ok(true),
334        };
335        let value: Value = match serde_json::from_str(&data) {
336            Ok(value) => value,
337            Err(_) => return Ok(true),
338        };
339        if value.get("schema_name").and_then(Value::as_str) != Some("message")
340            || value.get("schema_version").and_then(Value::as_u64) != Some(1)
341        {
342            return Ok(true);
343        }
344        let cache_modified = match fs::metadata(&cache_path).and_then(|meta| meta.modified()) {
345            Ok(time) => time,
346            Err(_) => return Ok(true),
347        };
348        for input in [
349            message_eml_path(&self.root, message_id),
350            message_remote_path(&self.root, message_id),
351        ] {
352            if !input.exists() {
353                continue;
354            }
355            if let Ok(input_modified) = fs::metadata(&input).and_then(|meta| meta.modified()) {
356                if input_modified > cache_modified {
357                    return Ok(true);
358                }
359            }
360        }
361        Ok(false)
362    }
363
364    fn infer_materialized_direction(
365        &self,
366        raw: &[u8],
367        prior: Option<&MessageFile>,
368        remote: Option<&RemoteState>,
369    ) -> Option<String> {
370        if let Some(direction) = prior.and_then(|message| message.direction.clone()) {
371            return Some(direction);
372        }
373        if let Some(remote) = remote {
374            if let Ok(config) = MailConfig::load(&self.root) {
375                for location in &remote.locations {
376                    if let Some(mailbox_id) = location.mailbox_id.as_deref() {
377                        if let Ok(action) = config.pull_action(mailbox_id) {
378                            return Some(action.direction.as_str().to_string());
379                        }
380                    }
381                }
382            }
383        }
384        let local_message_id = mail_parser::MessageParser::default()
385            .parse(raw)
386            .and_then(|message| message.message_id().map(ToString::to_string))
387            .is_some_and(|message_id| {
388                message_id
389                    .trim()
390                    .trim_matches(|ch| matches!(ch, '<' | '>'))
391                    .ends_with("@afmail.local")
392            });
393        Some(if local_message_id {
394            "outbound".to_string()
395        } else {
396            "inbound".to_string()
397        })
398    }
399
400    fn apply_fetched_attachment_files(&self, message_id: &str, attachments: &mut [AttachmentRef]) {
401        let files_dir = self
402            .root
403            .join(".afmail/messages")
404            .join(format!("{message_id}.files"));
405        if !files_dir.is_dir() {
406            return;
407        }
408        for attachment in attachments {
409            if attachment.fetched
410                && attachment
411                    .file_path
412                    .as_deref()
413                    .is_some_and(|path| self.root.join(path).is_file())
414            {
415                continue;
416            }
417            let candidate = files_dir.join(safe_attachment_filename(
418                &attachment.filename,
419                &attachment.part_id,
420            ));
421            if candidate.is_file() {
422                attachment.fetched = true;
423                attachment.file_path = Some(rel_path(&self.root, &candidate));
424            }
425        }
426    }
427
428    pub(super) fn apply_materialized_workspace_overlays(
429        &self,
430        message: &mut MessageFile,
431    ) -> Result<()> {
432        message.workspace.archive_uid = None;
433        message.workspace.archived_rfc3339 = None;
434        let cases = CaseIndex::build(self)?;
435        if let Some((status, _added_rfc3339)) =
436            self.disposition_state_for_message(&message.message_id)?
437        {
438            message.workspace.status = status.as_str().to_string();
439            message.workspace.origin = None;
440        } else if let Some((archive_uid, archived_rfc3339)) = self
441            .direct_archive_state_by_message()?
442            .get(&message.message_id)
443        {
444            message.workspace.status = "archived".to_string();
445            message.workspace.archive_uid = Some(archive_uid.clone());
446            message.workspace.archived_rfc3339 = Some(archived_rfc3339.clone());
447            message.workspace.origin = None;
448        } else if cases.has_any_reference(&message.message_id) {
449            message.workspace.status = MessageStatus::Case.as_str().to_string();
450        } else if message.workspace.origin.is_some() {
451            message.workspace.status = MessageStatus::Archived.as_str().to_string();
452        } else {
453            message.workspace.status = MessageStatus::Triage.as_str().to_string();
454        }
455        let last_completed_rfc3339 = message
456            .workspace
457            .push
458            .as_ref()
459            .and_then(|push| push.last_completed_rfc3339.clone());
460        message.workspace.push = self.pending_push_state_for_message(&message.message_id)?;
461        if let Some(last_completed_rfc3339) = last_completed_rfc3339 {
462            let push = message
463                .workspace
464                .push
465                .get_or_insert_with(WorkspacePushState::default);
466            if push.last_completed_rfc3339.is_none() {
467                push.last_completed_rfc3339 = Some(last_completed_rfc3339);
468            }
469        }
470        if message
471            .workspace
472            .push
473            .as_ref()
474            .is_some_and(|push| !push.pending.is_empty())
475            && MessageStatus::parse(&message.workspace.status)? == MessageStatus::Triage
476        {
477            message.workspace.status = MessageStatus::PushQueued.as_str().to_string();
478        }
479        Ok(())
480    }
481
482    fn pending_push_state_for_message(
483        &self,
484        message_id: &str,
485    ) -> Result<Option<WorkspacePushState>> {
486        let mut state = WorkspacePushState::default();
487        for item in crate::push_queue::pending_items(&self.root)? {
488            if !item.message_ids().iter().any(|id| id == message_id) {
489                continue;
490            }
491            state.pending.push(WorkspacePendingPush {
492                push_id: item.push_id.clone(),
493                kind: item.display_kind(),
494                queued_rfc3339: item.created_rfc3339.clone(),
495                last_error: item.last_error.clone(),
496            });
497        }
498        state.pending.sort_by(|a, b| a.push_id.cmp(&b.push_id));
499        if state.pending.is_empty() && state.last_completed_rfc3339.is_none() {
500            Ok(None)
501        } else {
502            Ok(Some(state))
503        }
504    }
505
506    pub(super) fn rebuild_message_cache_from_eml(&self) -> Result<MessageCacheRebuildTotals> {
507        let messages_dir = self.root.join(".afmail/messages");
508        if !messages_dir.exists() {
509            return Ok(MessageCacheRebuildTotals::default());
510        }
511        let mut totals = MessageCacheRebuildTotals::default();
512        let mut eml_paths = read_dir(&messages_dir, "read message eml cache")?
513            .into_iter()
514            .map(|entry| entry.path())
515            .filter(|path| path.extension().and_then(|s| s.to_str()) == Some("eml"))
516            .collect::<Vec<_>>();
517        eml_paths.sort();
518
519        for eml_path in eml_paths {
520            let Some(message_id) = eml_path.file_stem().and_then(|stem| stem.to_str()) else {
521                continue;
522            };
523            validate_id("message_id", message_id)?;
524            if self.materialize_message_cache_if_needed(message_id)? {
525                totals.rebuilt_count += 1;
526            }
527        }
528
529        for entry in read_dir(&messages_dir, "read message cache")? {
530            let path = entry.path();
531            if path.extension().and_then(|s| s.to_str()) == Some("txt") {
532                remove_file(&path)?;
533                totals.removed_text_cache_count += 1;
534            }
535        }
536        Ok(totals)
537    }
538
539    fn direct_archive_state_by_message(&self) -> Result<BTreeMap<String, (String, String)>> {
540        let mut out = BTreeMap::new();
541        for archive_uid in self.archive_message_category_ids()? {
542            let data = self.read_archive_messages(&archive_uid)?;
543            for item in data.items {
544                out.insert(item.message_id, (archive_uid.clone(), item.added_rfc3339));
545            }
546        }
547        Ok(out)
548    }
549
550    pub fn message_show(&self, message_id: &str) -> Result<Value> {
551        self.require_workspace()?;
552        validate_id("message_id", message_id)?;
553        let config = MailConfig::load(&self.root)?;
554        let message = self.read_message_by_id(message_id)?;
555        let body_text = self.message_body_text(&message)?;
556        let flags = message_remote_flags(&message);
557        let unread = !flags.iter().any(|flag| flag.eq_ignore_ascii_case("\\Seen"));
558        let flagged = flags
559            .iter()
560            .any(|flag| flag.eq_ignore_ascii_case("\\Flagged"));
561        let push = message.workspace.push.clone();
562        Ok(json!({
563            "code": "message_show",
564            "message_id": message.message_id.as_str(),
565            "from": message.from.as_deref().unwrap_or(""),
566            "to": &message.to,
567            "cc": &message.cc,
568            "bcc": &message.bcc,
569            "reply_to": &message.reply_to,
570            "sender": message.sender.as_deref().unwrap_or(""),
571            "subject": message.subject.as_deref().unwrap_or(""),
572            "direction": message.direction.as_deref().unwrap_or(""),
573            "received_rfc3339": message.received_rfc3339.as_deref().unwrap_or(""),
574            "sent_rfc3339": message.sent_rfc3339.as_deref().unwrap_or(""),
575            "body_text": body_text,
576            "attachment_count": message.attachments.len(),
577            "attachments": attachment_metadata_values(&message.attachments),
578            "mailbox_ids": message_mailbox_ids(&message, &config),
579            "flags": flags,
580            "unread": unread,
581            "flagged": flagged,
582            "remote_missing": message_remote_missing(&message),
583            "remote_missing_since_rfc3339": message_remote_missing_since_rfc3339(&message),
584            "remote_effect_pending": message_remote_effect_pending(&message),
585            "push": push,
586            "view_path": self.message_existing_view_path(&message)?,
587            "json_path": format!("messages/{message_id}.json"),
588            "message": message_template_value(&message)?,
589        }))
590    }
591
592    pub fn spam_message(&self, message_id: &str, reason: Option<&str>) -> Result<Value> {
593        self.require_workspace()?;
594        let reason = self.checked_reason(reason)?;
595        let config = crate::config::MailConfig::load(&self.root)?;
596        validate_id("message_id", message_id)?;
597        self.ensure_no_related_conversation(message_id)?;
598        let message_ids = vec![message_id.to_string()];
599        self.ensure_message_ids_unreferenced(&message_ids, None)?;
600        let locations = self.message_remote_locations(&message_ids)?;
601        let transaction = self.begin_transaction(
602            "message_spam",
603            vec![
604                format!("messages/{message_id}.json"),
605                ".afmail/push".to_string(),
606            ],
607        )?;
608        let item = crate::push_queue::queue_action_steps(
609            &self.root,
610            "message.spam",
611            &message_ids,
612            &locations,
613            &config.actions.message_spam.steps,
614            None,
615        )?;
616        self.update_messages_workspace(&message_ids, "spam")?;
617        if let Some(item) = &item {
618            self.record_pending_push_item(item)?;
619        }
620        self.refresh_disposition_views()?;
621        transaction.commit()?;
622        self.append_audit_event(
623            "message_spam_marked",
624            vec![audit_target("message", message_id)],
625            reason,
626            json!({"message_id": message_id, "special_use": SpecialUseKind::Junk.as_str()}),
627        )?;
628        serde_json::to_value(MessageDispositionResult {
629            code: "message_spam_marked",
630            message_id: message_id.to_string(),
631            special_use: SpecialUseKind::Junk.as_str().to_string(),
632            message_ids,
633            location_count: locations.len(),
634            queued_location_count: locations.len(),
635            queued: item.is_some(),
636            push_id: item.as_ref().map(|item| item.push_id.clone()),
637        })
638        .map_err(|e| AppError::json("serialize message disposition result", &e))
639    }
640
641    pub fn restore_message(&self, message_id: &str, reason: Option<&str>) -> Result<Value> {
642        self.require_workspace()?;
643        let reason = self.checked_reason(reason)?;
644        validate_id("message_id", message_id)?;
645        let message = self.read_message_by_id(message_id)?;
646        match message.workspace.status.as_str() {
647            "spam" => self.restore_local_message_disposition(
648                message_id,
649                "spam",
650                "message.spam",
651                "message_restored",
652                "message_restored",
653                reason,
654            ),
655            "trashed" => self.restore_local_message_disposition(
656                message_id,
657                "trashed",
658                "message.trash",
659                "message_restored",
660                "message_restored",
661                reason,
662            ),
663            "archived" => {
664                let archive_uid = message.workspace.archive_uid.clone().ok_or_else(|| {
665                    AppError::new(
666                        "invalid_request",
667                        format!("message {message_id} is not directly archived"),
668                    )
669                })?;
670                self.restore_direct_archive_message(
671                    &archive_uid,
672                    message_id,
673                    reason,
674                    "message_restored",
675                    "message_restored",
676                )
677            }
678            other => Err(AppError::new(
679                "invalid_request",
680                format!("message {message_id} has no disposition to restore (status: {other})"),
681            )),
682        }
683    }
684
685    pub fn archive_message(
686        &self,
687        message_id: &str,
688        archive_ref: &str,
689        summary: Option<&str>,
690        reason: Option<&str>,
691    ) -> Result<Value> {
692        self.require_workspace()?;
693        let reason = self.checked_reason(reason)?;
694        validate_id("message_id", message_id)?;
695        let (archive_uid, archive_dir) = self.resolve_archive_message_category(archive_ref)?;
696        self.ensure_no_related_conversation(message_id)?;
697        let transaction = self.begin_transaction(
698            "message_archive",
699            vec![
700                format!("messages/{message_id}.json"),
701                format!("archive/notifications/{archive_uid}"),
702                ".afmail/push".to_string(),
703            ],
704        )?;
705        let archived_rfc3339 = self.set_direct_message_archive(message_id, &archive_uid)?;
706        self.upsert_archive_message_item(&archive_uid, message_id, summary, &archived_rfc3339)?;
707        self.refresh_archive_message_category(&archive_uid)?;
708        let queue = self.queue_archive_for_archived_messages(&[message_id.to_string()], None)?;
709        transaction.commit()?;
710        self.append_audit_event(
711            "message_archived",
712            vec![
713                audit_target("message", message_id),
714                audit_target("archive", &archive_uid),
715            ],
716            reason,
717            json!({
718                "message_id": message_id,
719                "archive_uid": archive_uid,
720                "summary": summary,
721                "to_path": format!("{}/views/messages/{message_id}.md", rel_path(&self.root, &archive_dir)),
722            }),
723        )?;
724        serde_json::to_value(MessageArchiveResult {
725            code: "message_archived",
726            message_id: message_id.to_string(),
727            archive_uid,
728            path: rel_path(&self.root, &archive_dir),
729            special_use: SpecialUseKind::Archive.as_str().to_string(),
730            eligible_message_ids: queue.eligible_message_ids,
731            location_count: queue.location_count,
732            queued_location_count: queue.queued_location_count,
733            queued: !queue.items.is_empty(),
734            push_ids: queue
735                .items
736                .iter()
737                .map(|item| item.push_id.clone())
738                .collect::<Vec<_>>(),
739            push_id: queue.items.first().map(|item| item.push_id.clone()),
740        })
741        .map_err(|e| AppError::json("serialize message archive result", &e))
742    }
743
744    pub fn trash_message(&self, message_id: &str, reason: Option<&str>) -> Result<Value> {
745        self.require_workspace()?;
746        let reason = self.checked_reason(reason)?;
747        let config = crate::config::MailConfig::load(&self.root)?;
748        validate_id("message_id", message_id)?;
749        self.ensure_no_related_conversation(message_id)?;
750        let message_ids = vec![message_id.to_string()];
751        self.ensure_message_ids_unreferenced(&message_ids, None)?;
752        let locations = self.message_remote_locations(&message_ids)?;
753        let transaction = self.begin_transaction(
754            "message_trash",
755            vec![
756                format!("messages/{message_id}.json"),
757                ".afmail/push".to_string(),
758            ],
759        )?;
760        let item = crate::push_queue::queue_action_steps(
761            &self.root,
762            "message.trash",
763            &message_ids,
764            &locations,
765            &config.actions.message_trash.steps,
766            None,
767        )?;
768        self.update_messages_workspace(&message_ids, "trashed")?;
769        if let Some(item) = &item {
770            self.record_pending_push_item(item)?;
771        }
772        self.refresh_disposition_views()?;
773        transaction.commit()?;
774        self.append_audit_event(
775            "message_trashed",
776            vec![audit_target("message", message_id)],
777            reason,
778            json!({"message_id": message_id, "special_use": SpecialUseKind::Trash.as_str()}),
779        )?;
780        serde_json::to_value(MessageDispositionResult {
781            code: "message_trashed",
782            message_id: message_id.to_string(),
783            special_use: SpecialUseKind::Trash.as_str().to_string(),
784            message_ids,
785            location_count: locations.len(),
786            queued_location_count: locations.len(),
787            queued: item.is_some(),
788            push_id: item.as_ref().map(|item| item.push_id.clone()),
789        })
790        .map_err(|e| AppError::json("serialize message disposition result", &e))
791    }
792
793    pub(super) fn message_existing_view_path(
794        &self,
795        message: &MessageFile,
796    ) -> Result<Option<String>> {
797        let message_id = message.message_id.as_str();
798        let triage_path = self.root.join("triage").join(format!("{message_id}.md"));
799        if triage_path.is_file() {
800            return Ok(Some(rel_path(&self.root, &triage_path)));
801        }
802        for dir in ["spam", "trash", "deleted"] {
803            let path = self.root.join(dir).join(format!("{message_id}.md"));
804            if path.is_file() {
805                return Ok(Some(rel_path(&self.root, &path)));
806            }
807        }
808        for (case_uid, case_path) in self.all_case_entries()? {
809            let case_messages = read_case_messages(&case_path, &case_uid);
810            if case_messages
811                .as_ref()
812                .map(|messages| messages.contains_message(message_id))
813                .unwrap_or(false)
814            {
815                let message_view = case_message_view_path(&case_path, message_id);
816                if message_view.is_file() {
817                    return Ok(Some(rel_path(&self.root, &message_view)));
818                }
819                let case_view = case_path.join("case.md");
820                if case_view.is_file() {
821                    return Ok(Some(rel_path(&self.root, &case_view)));
822                }
823            }
824        }
825        if let Some(archive_uid) = message.workspace.archive_uid.as_deref() {
826            let archive_view = self.archive_message_view_path(archive_uid, message_id);
827            if archive_view.is_file() {
828                return Ok(Some(rel_path(&self.root, &archive_view)));
829            }
830        }
831        Ok(None)
832    }
833
834    pub(super) fn message_body_text(&self, message: &MessageFile) -> Result<String> {
835        Ok(message.body_text.clone())
836    }
837
838    pub(super) fn message_conversation_for_dir(
839        &self,
840        message: &MessageFile,
841        output_dir: Option<&Path>,
842    ) -> Result<String> {
843        let config = MailConfig::load(&self.root)?;
844        let mut renderer = MarkdownTemplateRenderer::new(&self.root, config.template_language());
845        self.message_conversation_with_renderer(message, &config, &mut renderer, output_dir)
846    }
847
848    pub(super) fn message_conversation_with_renderer(
849        &self,
850        message: &MessageFile,
851        config: &MailConfig,
852        renderer: &mut MarkdownTemplateRenderer<'_>,
853        output_dir: Option<&Path>,
854    ) -> Result<String> {
855        let body_text = self.message_body_text(message)?;
856        renderer.render(
857            TemplateKey::MessageSection,
858            &message_section_context(
859                Some(&self.root),
860                message,
861                &body_text,
862                config.template_language(),
863                config
864                    .smtp
865                    .from
866                    .as_deref()
867                    .or(config.imap.username.as_deref()),
868                output_dir,
869            )?,
870        )
871    }
872
873    pub(super) fn rfc822_message_id_index(&self) -> Result<BTreeMap<String, String>> {
874        self.rebuild_message_cache_from_eml()?;
875        let mut index = BTreeMap::new();
876        for path in message_json_paths(&self.root)? {
877            let message = read_message(&path)?;
878            if let Some(normalized) = message
879                .rfc822_message_id
880                .as_deref()
881                .and_then(normalize_rfc822_message_id)
882            {
883                index
884                    .entry(normalized)
885                    .or_insert_with(|| message.message_id.clone());
886            }
887        }
888        Ok(index)
889    }
890
891    pub(super) fn related_message_ids(&self, message_id: &str) -> Result<Vec<String>> {
892        validate_id("message_id", message_id)?;
893        let current = self.read_message_by_id(message_id)?;
894        let rfc822_index = self.rfc822_message_id_index()?;
895        let mut related = BTreeSet::new();
896
897        for header_id in message_reply_header_ids(&current) {
898            if let Some(local_message_id) = rfc822_index.get(&header_id) {
899                if local_message_id != message_id {
900                    related.insert(local_message_id.clone());
901                }
902            }
903        }
904
905        let Some(current_rfc822_id) = current
906            .rfc822_message_id
907            .as_deref()
908            .and_then(normalize_rfc822_message_id)
909        else {
910            return Ok(related.into_iter().collect());
911        };
912
913        for path in message_json_paths(&self.root)? {
914            let other = read_message(&path)?;
915            if other.message_id == message_id {
916                continue;
917            }
918            if message_reply_header_ids(&other)
919                .iter()
920                .any(|header_id| header_id == &current_rfc822_id)
921            {
922                related.insert(other.message_id);
923            }
924        }
925
926        Ok(related.into_iter().collect())
927    }
928
929    pub(super) fn ensure_no_related_conversation(&self, message_id: &str) -> Result<()> {
930        let related_message_ids = self.related_message_ids(message_id)?;
931        if related_message_ids.is_empty() {
932            return Ok(());
933        }
934        let mut suggested_commands = vec![format!(
935            "afmail case create --name NAME --message {message_id} --reason TEXT"
936        )];
937        for related_id in &related_message_ids {
938            suggested_commands.push(format!(
939                "afmail case add CASE_REF {related_id} --reason TEXT"
940            ));
941        }
942        suggested_commands.push("afmail case archive CASE_REF --reason TEXT".to_string());
943        Err(AppError::new(
944            "message_has_related_conversation_use_case",
945            "message has RFC-header-confirmed related conversation",
946        )
947        .with_hint(
948            "Create a case for the conversation, add the related messages, then archive the case.",
949        )
950        .with_details(json!({
951            "message_id": message_id,
952            "related_message_ids": related_message_ids,
953            "suggested_commands": suggested_commands
954        })))
955    }
956
957    pub(super) fn refresh_messages_after_ref_change(&self, message_ids: &[String]) -> Result<()> {
958        for message_id in message_ids {
959            self.refresh_message_after_ref_change(message_id)?;
960        }
961        Ok(())
962    }
963
964    pub(super) fn refresh_read_views_after_message_change(&self, message_id: &str) -> Result<()> {
965        validate_id("message_id", message_id)?;
966        let message = self.read_message_by_id(message_id)?;
967        let cases = CaseIndex::build(self)?;
968        if self.triage_candidate(&message, &cases)? {
969            self.write_triage_view(&message)?;
970        } else {
971            self.remove_triage_view_for_message(message_id)?;
972        }
973        self.refresh_all_case_message_views()?;
974        self.refresh_archive_indexes()
975    }
976
977    pub(super) fn refresh_message_after_ref_change(&self, message_id: &str) -> Result<()> {
978        validate_id("message_id", message_id)?;
979        let mut msg = self.read_message_by_id(message_id)?;
980        let cases = CaseIndex::build(self)?;
981        self.apply_materialized_workspace_overlays(&mut msg)?;
982        msg.workspace.remote_sync = None;
983        self.write_message_materialized_cache(&msg)?;
984        if self.triage_candidate(&msg, &cases)? {
985            self.write_triage_view(&msg)?;
986        } else {
987            self.remove_triage_view_for_message(message_id)?;
988        }
989        Ok(())
990    }
991
992    pub(super) fn update_messages_workspace(
993        &self,
994        message_ids: &[String],
995        status: &str,
996    ) -> Result<()> {
997        let status = MessageStatus::parse(status)?;
998        for message_id in message_ids {
999            validate_id("message_id", message_id)?;
1000            if disposition_spec_for_status(status).is_some() {
1001                self.set_message_disposition(status, message_id, None, &now_rfc3339())?;
1002            } else {
1003                self.clear_message_from_all_dispositions(message_id)?;
1004            }
1005            let mut msg = self.read_message_by_id(message_id)?;
1006            msg.workspace.status = status.as_str().to_string();
1007            if matches!(status, MessageStatus::Spam | MessageStatus::Trashed) {
1008                msg.workspace.archive_uid = None;
1009                msg.workspace.archived_rfc3339 = None;
1010                msg.workspace.origin = None;
1011            }
1012            msg.workspace.remote_sync = None;
1013            self.write_message_materialized_cache(&msg)?;
1014            self.remove_triage_view_for_message(message_id)?;
1015        }
1016        Ok(())
1017    }
1018
1019    pub(super) fn restore_local_message_disposition(
1020        &self,
1021        message_id: &str,
1022        expected_status: &str,
1023        push_kind: &str,
1024        event_kind: &str,
1025        result_code: &str,
1026        reason: Option<&str>,
1027    ) -> Result<Value> {
1028        self.require_workspace()?;
1029        let reason = self.checked_reason(reason)?;
1030        validate_id("message_id", message_id)?;
1031        let mut message = self.read_message_by_id(message_id)?;
1032        if message.workspace.status != expected_status {
1033            return Err(AppError::new(
1034                "invalid_request",
1035                format!("message {message_id} is not {expected_status}"),
1036            ));
1037        }
1038        let removed_push =
1039            crate::push_queue::remove_pending_message_pushes(&self.root, message_id, push_kind)?;
1040        let push_ids = removed_push
1041            .iter()
1042            .map(|item| item.push_id.clone())
1043            .collect::<Vec<_>>();
1044        self.clear_message_disposition(MessageStatus::parse(expected_status)?, message_id)?;
1045        message.workspace.status = "triage".to_string();
1046        message.workspace.remote_sync = None;
1047        self.write_message_materialized_cache(&message)?;
1048        self.refresh_message_after_ref_change(message_id)?;
1049        self.clear_message_pending_pushes(message_id, &push_ids, false)?;
1050        self.refresh_disposition_views()?;
1051        self.append_audit_event(
1052            event_kind,
1053            vec![audit_target("message", message_id)],
1054            reason,
1055            json!({
1056                "message_id": message_id,
1057                "from_status": expected_status,
1058                "to_status": "triage",
1059                "removed_push_ids": push_ids.clone(),
1060            }),
1061        )?;
1062        Ok(json!({
1063            "code": result_code,
1064            "message_id": message_id,
1065            "from_status": expected_status,
1066            "status": "triage",
1067            "triage_path": format!("triage/{message_id}.md"),
1068            "removed_push_count": push_ids.len(),
1069            "push_ids": push_ids,
1070        }))
1071    }
1072
1073    pub(super) fn remove_triage_view_for_message(&self, message_id: &str) -> Result<()> {
1074        let path = self.root.join("triage").join(format!("{message_id}.md"));
1075        if path.exists() {
1076            remove_file(&path)?;
1077        }
1078        Ok(())
1079    }
1080
1081    pub(crate) fn read_message_by_id(&self, message_id: &str) -> Result<MessageFile> {
1082        validate_id("message_id", message_id)?;
1083        if message_eml_path(&self.root, message_id).is_file() {
1084            self.materialize_message_cache_if_needed(message_id)?;
1085        }
1086        let path = self.message_path(message_id);
1087        let mut message = read_message(&path)?;
1088        let original_workspace = message.workspace.clone();
1089        self.apply_materialized_workspace_overlays(&mut message)?;
1090        if message.workspace != original_workspace {
1091            self.write_message_materialized_cache(&message)?;
1092        }
1093        Ok(message)
1094    }
1095
1096    pub(crate) fn relocate_message(
1097        &self,
1098        message_id: &str,
1099        target_locations: &[crate::types::RemoteLocation],
1100    ) -> Result<()> {
1101        validate_id("message_id", message_id)?;
1102        let mut locations: Vec<crate::types::RemoteLocation> = Vec::new();
1103        for location in target_locations {
1104            if !locations.iter().any(|existing| {
1105                existing.mailbox_name == location.mailbox_name
1106                    && existing.uid_validity == location.uid_validity
1107                    && existing.uid == location.uid
1108            }) {
1109                locations.push(location.clone());
1110            }
1111        }
1112        if locations.is_empty() {
1113            return Ok(());
1114        };
1115        let mut message = self.read_message_by_id(message_id)?;
1116        message.remote = Some(crate::types::RemoteState { locations });
1117        self.persist_message_remote(&message)?;
1118        self.write_message_materialized_cache(&message)?;
1119        Ok(())
1120    }
1121}