1use super::*;
2
3pub(super) fn message_json_paths(root: &Path) -> Result<Vec<PathBuf>> {
4 let dir = root.join("messages");
5 if !dir.exists() {
6 return Ok(Vec::new());
7 }
8 let mut paths = read_dir(&dir, "read messages")?
9 .into_iter()
10 .map(|entry| entry.path())
11 .filter(|path| path.extension().and_then(|s| s.to_str()) == Some("json"))
12 .collect::<Vec<_>>();
13 paths.sort();
14 Ok(paths)
15}
16
17#[derive(Clone, Debug, Deserialize, Serialize)]
18#[serde(deny_unknown_fields)]
19struct MessageRemoteFile {
20 schema_name: String,
21 schema_version: u64,
22 message_id: String,
23 #[serde(default, skip_serializing_if = "Vec::is_empty")]
24 locations: Vec<RemoteLocation>,
25}
26
27#[derive(Debug, Serialize)]
28struct MessageDispositionResult {
29 code: &'static str,
30 message_id: String,
31 special_use: String,
32 message_ids: Vec<String>,
33 location_count: usize,
34 queued_location_count: usize,
35 queued: bool,
36 push_id: Option<String>,
37}
38
39#[derive(Debug, Serialize)]
40struct MessageArchiveResult {
41 code: &'static str,
42 message_id: String,
43 archive_uid: String,
44 path: String,
45 special_use: String,
46 eligible_message_ids: Vec<String>,
47 location_count: usize,
48 queued_location_count: usize,
49 queued: bool,
50 push_ids: Vec<String>,
51 push_id: Option<String>,
52}
53
54#[derive(Clone, Debug, Default)]
55pub(super) struct MessageCacheRebuildTotals {
56 pub(super) rebuilt_count: usize,
57 pub(super) removed_text_cache_count: usize,
58}
59
60impl MessageRemoteFile {
61 fn from_message(message: &MessageFile) -> Option<Self> {
62 let remote = message.remote.as_ref()?;
63 if remote.locations.is_empty() {
64 return None;
65 }
66 Some(Self {
67 schema_name: "message_remote".to_string(),
68 schema_version: 1,
69 message_id: message.message_id.clone(),
70 locations: remote.locations.clone(),
71 })
72 }
73
74 fn remote_state(&self) -> RemoteState {
75 RemoteState {
76 locations: self.locations.clone(),
77 }
78 }
79}
80
/// Path of the raw `.eml` file for `message_id` under `<root>/.afmail/messages`.
fn message_eml_path(root: &Path, message_id: &str) -> PathBuf {
    let mut path = root.join(".afmail/messages");
    path.push(format!("{message_id}.eml"));
    path
}
85
/// Path of the `.state.json` file for `message_id` under `<root>/.afmail/messages`.
fn message_state_path(root: &Path, message_id: &str) -> PathBuf {
    let mut path = root.join(".afmail/messages");
    path.push(format!("{message_id}.state.json"));
    path
}
90
/// Path of the `.remote.json` sidecar for `message_id` under `<root>/.afmail/messages`.
fn message_remote_path(root: &Path, message_id: &str) -> PathBuf {
    let mut path = root.join(".afmail/messages");
    path.push(format!("{message_id}.remote.json"));
    path
}
95
96fn read_message_remote_file(root: &Path, message_id: &str) -> Result<Option<MessageRemoteFile>> {
97 let path = message_remote_path(root, message_id);
98 if !path.exists() {
99 return Ok(None);
100 }
101 let data = read_to_string(&path, "read message remote")?;
102 let remote: MessageRemoteFile =
103 serde_json::from_str(&data).map_err(|e| AppError::json("parse message remote", &e))?;
104 if remote.schema_name != "message_remote"
105 || remote.schema_version != 1
106 || remote.message_id != message_id
107 {
108 return Err(AppError::new(
109 "message_remote_invalid",
110 format!("invalid message remote sidecar: {}", rel_path(root, &path)),
111 ));
112 }
113 Ok(Some(remote))
114}
115
116fn write_message_remote_file(root: &Path, remote: &MessageRemoteFile) -> Result<()> {
117 let path = message_remote_path(root, &remote.message_id);
118 if let Some(parent) = path.parent() {
119 create_dir_all(parent)?;
120 }
121 let mut normalized = remote.clone();
122 normalized.schema_name = "message_remote".to_string();
123 normalized.schema_version = 1;
124 write_json_pretty(&path, &normalized)
125}
126
127fn default_message_workspace() -> WorkspaceState {
128 WorkspaceState {
129 status: "triage".to_string(),
130 archive_uid: None,
131 archived_rfc3339: None,
132 origin: None,
133 remote_sync: None,
134 push: None,
135 }
136}
137
138pub(super) fn purge_message_artifacts(root: &Path, message_id: &str) -> Result<()> {
139 validate_id("message_id", message_id)?;
140 let message_dir = root.join(".afmail/messages");
141 for path in [
142 root.join("messages").join(format!("{message_id}.json")),
143 message_dir.join(format!("{message_id}.json")),
144 message_dir.join(format!("{message_id}.eml")),
145 message_dir.join(format!("{message_id}.state.json")),
146 message_dir.join(format!("{message_id}.remote.json")),
147 message_dir.join(format!("{message_id}.txt")),
148 ] {
149 if path.exists() {
150 remove_file(&path)?;
151 }
152 }
153 let files_dir = message_dir.join(format!("{message_id}.files"));
154 if files_dir.exists() {
155 remove_dir_all(&files_dir)?;
156 }
157 Ok(())
158}
159
160pub(super) fn attachment_metadata_values(attachments: &[AttachmentRef]) -> Vec<Value> {
161 attachments
162 .iter()
163 .map(|attachment| {
164 json!({
165 "part_id": attachment.part_id.as_str(),
166 "filename": attachment.filename.as_str(),
167 "saved_filename": saved_filename_for_attachment(attachment),
168 "content_type": attachment.content_type.as_str(),
169 "size_bytes": attachment.size_bytes,
170 "fetched": attachment.fetched,
171 "file_path": attachment.file_path.as_deref().unwrap_or(""),
172 "storage": if attachment.fetched { "message_cache" } else { "" },
173 })
174 })
175 .collect()
176}
177
178pub(super) fn read_message(path: &Path) -> Result<MessageFile> {
179 let data = read_to_string(path, "read message json")?;
180 let message: MessageFile =
181 serde_json::from_str(&data).map_err(|e| AppError::json("parse message json", &e))?;
182 if message.schema_name != "message" || message.schema_version != 1 {
183 return Err(AppError::new(
184 "message_cache_invalid",
185 format!("unsupported message cache schema: {}", path_to_string(path)),
186 ));
187 }
188 MessageStatus::parse(&message.workspace.status)?;
189 if let Some(direction) = message.direction.as_deref() {
190 MailDirection::parse(direction)?;
191 }
192 Ok(message)
193}
194
195pub(super) fn normalize_rfc822_message_id(value: &str) -> Option<String> {
196 let normalized = value
197 .trim()
198 .trim_matches(|ch| matches!(ch, '<' | '>' | ',' | ';'))
199 .trim()
200 .to_ascii_lowercase();
201 if normalized.is_empty() {
202 None
203 } else {
204 Some(normalized)
205 }
206}
207
208pub(super) fn rfc822_message_id_candidates(value: &str) -> Vec<String> {
209 let mut ids = Vec::new();
210 let mut rest = value;
211 while let Some(start) = rest.find('<') {
212 let after_start = &rest[start + 1..];
213 let Some(end) = after_start.find('>') else {
214 break;
215 };
216 if let Some(id) = normalize_rfc822_message_id(&after_start[..end]) {
217 ids.push(id);
218 }
219 rest = &after_start[end + 1..];
220 }
221 if ids.is_empty() {
222 ids.extend(
223 value
224 .split_whitespace()
225 .filter_map(normalize_rfc822_message_id),
226 );
227 }
228 ids.sort();
229 ids.dedup();
230 ids
231}
232
233pub(super) fn message_reply_header_ids(message: &MessageFile) -> Vec<String> {
234 let mut ids = Vec::new();
235 if let Some(in_reply_to) = &message.in_reply_to {
236 ids.extend(rfc822_message_id_candidates(in_reply_to));
237 }
238 for reference in &message.references {
239 ids.extend(rfc822_message_id_candidates(reference));
240 }
241 ids.sort();
242 ids.dedup();
243 ids
244}
245
246impl Workspace {
247 pub(crate) fn persist_message_remote(&self, message: &MessageFile) -> Result<()> {
248 let path = message_remote_path(&self.root, &message.message_id);
249 if let Some(remote) = MessageRemoteFile::from_message(message) {
250 write_message_remote_file(&self.root, &remote)
251 } else if path.exists() {
252 remove_file(&path)
253 } else {
254 Ok(())
255 }
256 }
257
258 pub(crate) fn write_message_materialized_cache(&self, message: &MessageFile) -> Result<()> {
259 let mut message = message.clone();
260 message.schema_name = "message".to_string();
261 message.schema_version = 1;
262 if message.eml_path.is_none() {
263 message.eml_path = Some(format!(".afmail/messages/{}.eml", message.message_id));
264 }
265 let deprecated_state = message_state_path(&self.root, &message.message_id);
266 if deprecated_state.exists() {
267 remove_file(&deprecated_state)?;
268 }
269 let path = self.message_path(&message.message_id);
270 if let Some(parent) = path.parent() {
271 create_dir_all(parent)?;
272 }
273 write_json_pretty(&path, &message)
274 }
275
276 pub(super) fn write_message_cache(&self, message: &MessageFile) -> Result<()> {
277 self.write_message_materialized_cache(message)
278 }
279
280 pub(crate) fn write_message_artifacts(&self, message: &MessageFile) -> Result<()> {
281 self.persist_message_remote(message)?;
282 self.write_message_materialized_cache(message)
283 }
284
285 fn materialize_message_cache_if_needed(&self, message_id: &str) -> Result<bool> {
286 if !self.message_cache_needs_materialize(message_id)? {
287 return Ok(false);
288 }
289 self.materialize_message_cache(message_id)?;
290 Ok(true)
291 }
292
293 fn materialize_message_cache(&self, message_id: &str) -> Result<MessageFile> {
294 validate_id("message_id", message_id)?;
295 let eml_path = message_eml_path(&self.root, message_id);
296 let raw = fs::read(&eml_path).map_err(|e| AppError::io("read message eml", &e))?;
297 let prior = read_message(&self.message_path(message_id)).ok();
298 let remote =
299 read_message_remote_file(&self.root, message_id)?.map(|file| file.remote_state());
300 let direction = self.infer_materialized_direction(&raw, prior.as_ref(), remote.as_ref());
301 let mut parsed = crate::mail::parse_message_with_options(
302 message_id.to_string(),
303 &raw,
304 crate::mail::MessageParseOptions {
305 direction,
306 workspace: default_message_workspace(),
307 remote,
308 received_rfc3339: prior
309 .as_ref()
310 .and_then(|message| message.received_rfc3339.clone()),
311 sent_rfc3339: prior
312 .as_ref()
313 .and_then(|message| message.sent_rfc3339.clone()),
314 attachments: prior
315 .as_ref()
316 .map(|message| message.attachments.clone())
317 .unwrap_or_default(),
318 },
319 )?;
320 self.apply_fetched_attachment_files(message_id, &mut parsed.message.attachments);
321 self.apply_materialized_workspace_overlays(&mut parsed.message)?;
322 let config = MailConfig::load(&self.root)?;
323 self.apply_identity_match(&mut parsed.message, &config)?;
324 let contact_map = self.contact_email_map()?;
325 self.apply_contact_link(&mut parsed.message, &contact_map);
326 self.write_message_materialized_cache(&parsed.message)?;
327 Ok(parsed.message)
328 }
329
330 fn message_cache_needs_materialize(&self, message_id: &str) -> Result<bool> {
331 let cache_path = self.message_path(message_id);
332 if !cache_path.exists() {
333 return Ok(true);
334 }
335 let data = match read_to_string(&cache_path, "read message cache") {
336 Ok(data) => data,
337 Err(_) => return Ok(true),
338 };
339 let value: Value = match serde_json::from_str(&data) {
340 Ok(value) => value,
341 Err(_) => return Ok(true),
342 };
343 if value.get("schema_name").and_then(Value::as_str) != Some("message")
344 || value.get("schema_version").and_then(Value::as_u64) != Some(1)
345 {
346 return Ok(true);
347 }
348 let cache_modified = match fs::metadata(&cache_path).and_then(|meta| meta.modified()) {
349 Ok(time) => time,
350 Err(_) => return Ok(true),
351 };
352 for input in [
353 message_eml_path(&self.root, message_id),
354 message_remote_path(&self.root, message_id),
355 ] {
356 if !input.exists() {
357 continue;
358 }
359 if let Ok(input_modified) = fs::metadata(&input).and_then(|meta| meta.modified()) {
360 if input_modified > cache_modified {
361 return Ok(true);
362 }
363 }
364 }
365 Ok(false)
366 }
367
368 fn infer_materialized_direction(
369 &self,
370 raw: &[u8],
371 prior: Option<&MessageFile>,
372 remote: Option<&RemoteState>,
373 ) -> Option<String> {
374 if let Some(direction) = prior.and_then(|message| message.direction.clone()) {
375 return Some(direction);
376 }
377 if let Some(remote) = remote {
378 if let Ok(config) = MailConfig::load(&self.root) {
379 for location in &remote.locations {
380 if let Some(mailbox_id) = location.mailbox_id.as_deref() {
381 if let Ok(action) = config.pull_action(mailbox_id) {
382 return Some(action.direction.as_str().to_string());
383 }
384 }
385 }
386 }
387 }
388 let local_message_id = mail_parser::MessageParser::default()
389 .parse(raw)
390 .and_then(|message| message.message_id().map(ToString::to_string))
391 .is_some_and(|message_id| {
392 message_id
393 .trim()
394 .trim_matches(|ch| matches!(ch, '<' | '>'))
395 .ends_with("@afmail.local")
396 });
397 Some(if local_message_id {
398 "outbound".to_string()
399 } else {
400 "inbound".to_string()
401 })
402 }
403
404 fn apply_fetched_attachment_files(&self, message_id: &str, attachments: &mut [AttachmentRef]) {
405 let files_dir = self
406 .root
407 .join(".afmail/messages")
408 .join(format!("{message_id}.files"));
409 if !files_dir.is_dir() {
410 return;
411 }
412 for attachment in attachments {
413 if attachment.fetched
414 && attachment
415 .file_path
416 .as_deref()
417 .is_some_and(|path| self.root.join(path).is_file())
418 {
419 continue;
420 }
421 let candidate = files_dir.join(safe_attachment_filename(
422 &attachment.filename,
423 &attachment.part_id,
424 ));
425 if candidate.is_file() {
426 attachment.fetched = true;
427 attachment.file_path = Some(rel_path(&self.root, &candidate));
428 }
429 }
430 }
431
432 pub(super) fn apply_materialized_workspace_overlays(
433 &self,
434 message: &mut MessageFile,
435 ) -> Result<()> {
436 message.workspace.archive_uid = None;
437 message.workspace.archived_rfc3339 = None;
438 let cases = CaseIndex::build(self)?;
439 if let Some((status, _added_rfc3339)) =
440 self.disposition_state_for_message(&message.message_id)?
441 {
442 message.workspace.status = status.as_str().to_string();
443 message.workspace.origin = None;
444 } else if let Some((archive_uid, archived_rfc3339)) = self
445 .direct_archive_state_by_message()?
446 .get(&message.message_id)
447 {
448 message.workspace.status = "archived".to_string();
449 message.workspace.archive_uid = Some(archive_uid.clone());
450 message.workspace.archived_rfc3339 = Some(archived_rfc3339.clone());
451 message.workspace.origin = None;
452 } else if cases.has_any_reference(&message.message_id) {
453 message.workspace.status = MessageStatus::Case.as_str().to_string();
454 } else if message.workspace.origin.is_some() {
455 message.workspace.status = MessageStatus::Archived.as_str().to_string();
456 } else {
457 message.workspace.status = MessageStatus::Triage.as_str().to_string();
458 }
459 let last_completed_rfc3339 = message
460 .workspace
461 .push
462 .as_ref()
463 .and_then(|push| push.last_completed_rfc3339.clone());
464 message.workspace.push = self.pending_push_state_for_message(&message.message_id)?;
465 if let Some(last_completed_rfc3339) = last_completed_rfc3339 {
466 let push = message
467 .workspace
468 .push
469 .get_or_insert_with(WorkspacePushState::default);
470 if push.last_completed_rfc3339.is_none() {
471 push.last_completed_rfc3339 = Some(last_completed_rfc3339);
472 }
473 }
474 if message
475 .workspace
476 .push
477 .as_ref()
478 .is_some_and(|push| !push.pending.is_empty())
479 && MessageStatus::parse(&message.workspace.status)? == MessageStatus::Triage
480 {
481 message.workspace.status = MessageStatus::PushQueued.as_str().to_string();
482 }
483 Ok(())
484 }
485
486 pub(super) fn apply_contact_link(
490 &self,
491 message: &mut MessageFile,
492 contact_map: &BTreeMap<String, MessageContact>,
493 ) {
494 message.contact = message
495 .from
496 .as_deref()
497 .and_then(|from| super::contacts::contact_for_from(contact_map, from));
498 }
499
500 pub(crate) fn apply_identity_match(
501 &self,
502 message: &mut MessageFile,
503 config: &MailConfig,
504 ) -> Result<()> {
505 if message
506 .direction
507 .as_deref()
508 .and_then(|direction| MailDirection::parse(direction).ok())
509 == Some(MailDirection::Outbound)
510 {
511 message.identity = None;
512 message.identity_email = None;
513 message.identity_match = None;
514 message.identity_candidates.clear();
515 message.observed_recipient_emails.clear();
516 return Ok(());
517 }
518 let matched = config.match_message_identity(&self.root, message)?;
519 message.identity = matched.identity;
520 message.identity_email = matched.identity_email;
521 message.identity_match = Some(matched.identity_match);
522 message.identity_candidates = matched.identity_candidates;
523 message.observed_recipient_emails = matched.observed_recipient_emails;
524 Ok(())
525 }
526
527 fn pending_push_state_for_message(
528 &self,
529 message_id: &str,
530 ) -> Result<Option<WorkspacePushState>> {
531 let mut state = WorkspacePushState::default();
532 for item in crate::push_queue::pending_items(&self.root)? {
533 if !item.message_ids().iter().any(|id| id == message_id) {
534 continue;
535 }
536 state.pending.push(WorkspacePendingPush {
537 push_id: item.push_id.clone(),
538 kind: item.display_kind(),
539 queued_rfc3339: item.created_rfc3339.clone(),
540 last_error: item.last_error.clone(),
541 });
542 }
543 state.pending.sort_by(|a, b| a.push_id.cmp(&b.push_id));
544 if state.pending.is_empty() && state.last_completed_rfc3339.is_none() {
545 Ok(None)
546 } else {
547 Ok(Some(state))
548 }
549 }
550
551 pub(super) fn rebuild_message_cache_from_eml(&self) -> Result<MessageCacheRebuildTotals> {
552 let messages_dir = self.root.join(".afmail/messages");
553 if !messages_dir.exists() {
554 return Ok(MessageCacheRebuildTotals::default());
555 }
556 let mut totals = MessageCacheRebuildTotals::default();
557 let mut eml_paths = read_dir(&messages_dir, "read message eml cache")?
558 .into_iter()
559 .map(|entry| entry.path())
560 .filter(|path| path.extension().and_then(|s| s.to_str()) == Some("eml"))
561 .collect::<Vec<_>>();
562 eml_paths.sort();
563
564 for eml_path in eml_paths {
565 let Some(message_id) = eml_path.file_stem().and_then(|stem| stem.to_str()) else {
566 continue;
567 };
568 validate_id("message_id", message_id)?;
569 if self.materialize_message_cache_if_needed(message_id)? {
570 totals.rebuilt_count += 1;
571 }
572 }
573
574 for entry in read_dir(&messages_dir, "read message cache")? {
575 let path = entry.path();
576 if path.extension().and_then(|s| s.to_str()) == Some("txt") {
577 remove_file(&path)?;
578 totals.removed_text_cache_count += 1;
579 }
580 }
581 Ok(totals)
582 }
583
584 fn direct_archive_state_by_message(&self) -> Result<BTreeMap<String, (String, String)>> {
585 let mut out = BTreeMap::new();
586 for archive_uid in self.archive_message_category_ids()? {
587 let data = self.read_archive_messages(&archive_uid)?;
588 for item in data.items {
589 out.insert(item.message_id, (archive_uid.clone(), item.added_rfc3339));
590 }
591 }
592 Ok(out)
593 }
594
595 pub fn message_show(&self, message_id: &str) -> Result<Value> {
596 self.require_workspace()?;
597 validate_id("message_id", message_id)?;
598 let config = MailConfig::load(&self.root)?;
599 let message = self.read_message_by_id(message_id)?;
600 let body_text = self.message_body_text(&message)?;
601 let flags = message_remote_flags(&message);
602 let unread = !flags.iter().any(|flag| flag.eq_ignore_ascii_case("\\Seen"));
603 let flagged = flags
604 .iter()
605 .any(|flag| flag.eq_ignore_ascii_case("\\Flagged"));
606 let push = message.workspace.push.clone();
607 let contact = message.contact.clone();
608 Ok(json!({
609 "code": "message_show",
610 "message_id": message.message_id.as_str(),
611 "from": message.from.as_deref().unwrap_or(""),
612 "to": &message.to,
613 "cc": &message.cc,
614 "bcc": &message.bcc,
615 "reply_to": &message.reply_to,
616 "sender": message.sender.as_deref().unwrap_or(""),
617 "subject": message.subject.as_deref().unwrap_or(""),
618 "direction": message.direction.as_deref().unwrap_or(""),
619 "received_rfc3339": message.received_rfc3339.as_deref().unwrap_or(""),
620 "sent_rfc3339": message.sent_rfc3339.as_deref().unwrap_or(""),
621 "body_text": body_text,
622 "attachment_count": message.attachments.len(),
623 "attachments": attachment_metadata_values(&message.attachments),
624 "mailbox_ids": message_mailbox_ids(&message, &config),
625 "flags": flags,
626 "unread": unread,
627 "flagged": flagged,
628 "remote_missing": message_remote_missing(&message),
629 "remote_missing_since_rfc3339": message_remote_missing_since_rfc3339(&message),
630 "remote_effect_pending": message_remote_effect_pending(&message),
631 "push": push,
632 "identity": message.identity.as_deref().unwrap_or(""),
633 "identity_email": message.identity_email.as_deref().unwrap_or(""),
634 "identity_match": message.identity_match.as_deref().unwrap_or(""),
635 "identity_candidates": &message.identity_candidates,
636 "observed_recipient_emails": &message.observed_recipient_emails,
637 "view_path": self.message_existing_view_path(&message)?,
638 "json_path": format!("messages/{message_id}.json"),
639 "message": message_template_value(&message)?,
640 "contact": contact,
641 }))
642 }
643
644 pub fn spam_message(&self, message_id: &str, reason: Option<&str>) -> Result<Value> {
645 self.require_workspace()?;
646 let reason = self.checked_reason(reason)?;
647 let config = crate::config::MailConfig::load(&self.root)?;
648 validate_id("message_id", message_id)?;
649 self.ensure_no_related_conversation(message_id)?;
650 let message_ids = vec![message_id.to_string()];
651 self.ensure_message_ids_unreferenced(&message_ids, None)?;
652 let locations = self.message_remote_locations(&message_ids)?;
653 let transaction = self.begin_transaction(
654 "message_spam",
655 vec![
656 format!("messages/{message_id}.json"),
657 ".afmail/push".to_string(),
658 ],
659 )?;
660 let item = crate::push_queue::queue_action_steps(
661 &self.root,
662 "message.spam",
663 &message_ids,
664 &locations,
665 &config.actions.message_spam.steps,
666 None,
667 )?;
668 self.update_messages_workspace(&message_ids, "spam")?;
669 if let Some(item) = &item {
670 self.record_pending_push_item(item)?;
671 }
672 self.refresh_disposition_views()?;
673 transaction.commit()?;
674 self.append_audit_event(
675 "message_spam_marked",
676 vec![audit_target("message", message_id)],
677 reason,
678 json!({"message_id": message_id, "special_use": SpecialUseKind::Junk.as_str()}),
679 )?;
680 serde_json::to_value(MessageDispositionResult {
681 code: "message_spam_marked",
682 message_id: message_id.to_string(),
683 special_use: SpecialUseKind::Junk.as_str().to_string(),
684 message_ids,
685 location_count: locations.len(),
686 queued_location_count: locations.len(),
687 queued: item.is_some(),
688 push_id: item.as_ref().map(|item| item.push_id.clone()),
689 })
690 .map_err(|e| AppError::json("serialize message disposition result", &e))
691 }
692
693 pub fn restore_message(&self, message_id: &str, reason: Option<&str>) -> Result<Value> {
694 self.require_workspace()?;
695 let reason = self.checked_reason(reason)?;
696 validate_id("message_id", message_id)?;
697 let message = self.read_message_by_id(message_id)?;
698 match message.workspace.status.as_str() {
699 "spam" => self.restore_local_message_disposition(
700 message_id,
701 "spam",
702 "message.spam",
703 "message_restored",
704 "message_restored",
705 reason,
706 ),
707 "trashed" => self.restore_local_message_disposition(
708 message_id,
709 "trashed",
710 "message.trash",
711 "message_restored",
712 "message_restored",
713 reason,
714 ),
715 "archived" => {
716 let archive_uid = message.workspace.archive_uid.clone().ok_or_else(|| {
717 AppError::new(
718 "invalid_request",
719 format!("message {message_id} is not directly archived"),
720 )
721 })?;
722 self.restore_direct_archive_message(
723 &archive_uid,
724 message_id,
725 reason,
726 "message_restored",
727 "message_restored",
728 )
729 }
730 other => Err(AppError::new(
731 "invalid_request",
732 format!("message {message_id} has no disposition to restore (status: {other})"),
733 )),
734 }
735 }
736
737 pub fn archive_message(
738 &self,
739 message_id: &str,
740 archive_ref: &str,
741 summary: Option<&str>,
742 reason: Option<&str>,
743 ) -> Result<Value> {
744 self.require_workspace()?;
745 let reason = self.checked_reason(reason)?;
746 validate_id("message_id", message_id)?;
747 let (archive_uid, archive_dir) = self.resolve_archive_message_category(archive_ref)?;
748 self.ensure_no_related_conversation(message_id)?;
749 let transaction = self.begin_transaction(
750 "message_archive",
751 vec![
752 format!("messages/{message_id}.json"),
753 format!("archive/notifications/{archive_uid}"),
754 ".afmail/push".to_string(),
755 ],
756 )?;
757 let archived_rfc3339 = self.set_direct_message_archive(message_id, &archive_uid)?;
758 self.upsert_archive_message_item(&archive_uid, message_id, summary, &archived_rfc3339)?;
759 self.refresh_archive_message_category(&archive_uid)?;
760 let queue = self.queue_archive_for_archived_messages(&[message_id.to_string()], None)?;
761 transaction.commit()?;
762 self.append_audit_event(
763 "message_archived",
764 vec![
765 audit_target("message", message_id),
766 audit_target("archive", &archive_uid),
767 ],
768 reason,
769 json!({
770 "message_id": message_id,
771 "archive_uid": archive_uid,
772 "summary": summary,
773 "to_path": format!("{}/views/messages/{message_id}.md", rel_path(&self.root, &archive_dir)),
774 }),
775 )?;
776 serde_json::to_value(MessageArchiveResult {
777 code: "message_archived",
778 message_id: message_id.to_string(),
779 archive_uid,
780 path: rel_path(&self.root, &archive_dir),
781 special_use: SpecialUseKind::Archive.as_str().to_string(),
782 eligible_message_ids: queue.eligible_message_ids,
783 location_count: queue.location_count,
784 queued_location_count: queue.queued_location_count,
785 queued: !queue.items.is_empty(),
786 push_ids: queue
787 .items
788 .iter()
789 .map(|item| item.push_id.clone())
790 .collect::<Vec<_>>(),
791 push_id: queue.items.first().map(|item| item.push_id.clone()),
792 })
793 .map_err(|e| AppError::json("serialize message archive result", &e))
794 }
795
796 pub fn trash_message(&self, message_id: &str, reason: Option<&str>) -> Result<Value> {
797 self.require_workspace()?;
798 let reason = self.checked_reason(reason)?;
799 let config = crate::config::MailConfig::load(&self.root)?;
800 validate_id("message_id", message_id)?;
801 self.ensure_no_related_conversation(message_id)?;
802 let message_ids = vec![message_id.to_string()];
803 self.ensure_message_ids_unreferenced(&message_ids, None)?;
804 let locations = self.message_remote_locations(&message_ids)?;
805 let transaction = self.begin_transaction(
806 "message_trash",
807 vec![
808 format!("messages/{message_id}.json"),
809 ".afmail/push".to_string(),
810 ],
811 )?;
812 let item = crate::push_queue::queue_action_steps(
813 &self.root,
814 "message.trash",
815 &message_ids,
816 &locations,
817 &config.actions.message_trash.steps,
818 None,
819 )?;
820 self.update_messages_workspace(&message_ids, "trashed")?;
821 if let Some(item) = &item {
822 self.record_pending_push_item(item)?;
823 }
824 self.refresh_disposition_views()?;
825 transaction.commit()?;
826 self.append_audit_event(
827 "message_trashed",
828 vec![audit_target("message", message_id)],
829 reason,
830 json!({"message_id": message_id, "special_use": SpecialUseKind::Trash.as_str()}),
831 )?;
832 serde_json::to_value(MessageDispositionResult {
833 code: "message_trashed",
834 message_id: message_id.to_string(),
835 special_use: SpecialUseKind::Trash.as_str().to_string(),
836 message_ids,
837 location_count: locations.len(),
838 queued_location_count: locations.len(),
839 queued: item.is_some(),
840 push_id: item.as_ref().map(|item| item.push_id.clone()),
841 })
842 .map_err(|e| AppError::json("serialize message disposition result", &e))
843 }
844
845 pub(super) fn message_existing_view_path(
846 &self,
847 message: &MessageFile,
848 ) -> Result<Option<String>> {
849 let message_id = message.message_id.as_str();
850 let triage_path = self.root.join("triage").join(format!("{message_id}.md"));
851 if triage_path.is_file() {
852 return Ok(Some(rel_path(&self.root, &triage_path)));
853 }
854 for dir in ["spam", "trash", "deleted"] {
855 let path = self.root.join(dir).join(format!("{message_id}.md"));
856 if path.is_file() {
857 return Ok(Some(rel_path(&self.root, &path)));
858 }
859 }
860 for (case_uid, case_path) in self.all_case_entries()? {
861 let case_messages = read_case_messages(&case_path, &case_uid);
862 if case_messages
863 .as_ref()
864 .map(|messages| messages.contains_message(message_id))
865 .unwrap_or(false)
866 {
867 let message_view = case_message_view_path(&case_path, message_id);
868 if message_view.is_file() {
869 return Ok(Some(rel_path(&self.root, &message_view)));
870 }
871 let case_view = case_path.join("case.md");
872 if case_view.is_file() {
873 return Ok(Some(rel_path(&self.root, &case_view)));
874 }
875 }
876 }
877 if let Some(archive_uid) = message.workspace.archive_uid.as_deref() {
878 let archive_view = self.archive_message_view_path(archive_uid, message_id);
879 if archive_view.is_file() {
880 return Ok(Some(rel_path(&self.root, &archive_view)));
881 }
882 }
883 Ok(None)
884 }
885
886 pub(super) fn message_body_text(&self, message: &MessageFile) -> Result<String> {
887 Ok(message.body_text.clone())
888 }
889
890 pub(super) fn message_conversation_for_dir(
891 &self,
892 message: &MessageFile,
893 output_dir: Option<&Path>,
894 ) -> Result<String> {
895 let config = MailConfig::load(&self.root)?;
896 let mut renderer = MarkdownTemplateRenderer::new(&self.root, config.template_language());
897 self.message_conversation_with_renderer(message, &config, &mut renderer, output_dir)
898 }
899
900 pub(super) fn message_conversation_with_renderer(
901 &self,
902 message: &MessageFile,
903 config: &MailConfig,
904 renderer: &mut MarkdownTemplateRenderer<'_>,
905 output_dir: Option<&Path>,
906 ) -> Result<String> {
907 let body_text = self.message_body_text(message)?;
908 renderer.render(
909 TemplateKey::MessageSection,
910 &message_section_context(
911 Some(&self.root),
912 message,
913 &body_text,
914 config.template_language(),
915 config
916 .default_identity()
917 .ok()
918 .map(|identity| identity.email.as_str()),
919 output_dir,
920 )?,
921 )
922 }
923
924 pub(super) fn rfc822_message_id_index(&self) -> Result<BTreeMap<String, String>> {
925 self.rebuild_message_cache_from_eml()?;
926 let mut index = BTreeMap::new();
927 for path in message_json_paths(&self.root)? {
928 let message = read_message(&path)?;
929 if let Some(normalized) = message
930 .rfc822_message_id
931 .as_deref()
932 .and_then(normalize_rfc822_message_id)
933 {
934 index
935 .entry(normalized)
936 .or_insert_with(|| message.message_id.clone());
937 }
938 }
939 Ok(index)
940 }
941
942 pub(super) fn related_message_ids(&self, message_id: &str) -> Result<Vec<String>> {
943 validate_id("message_id", message_id)?;
944 let current = self.read_message_by_id(message_id)?;
945 let rfc822_index = self.rfc822_message_id_index()?;
946 let mut related = BTreeSet::new();
947
948 for header_id in message_reply_header_ids(¤t) {
949 if let Some(local_message_id) = rfc822_index.get(&header_id) {
950 if local_message_id != message_id {
951 related.insert(local_message_id.clone());
952 }
953 }
954 }
955
956 let Some(current_rfc822_id) = current
957 .rfc822_message_id
958 .as_deref()
959 .and_then(normalize_rfc822_message_id)
960 else {
961 return Ok(related.into_iter().collect());
962 };
963
964 for path in message_json_paths(&self.root)? {
965 let other = read_message(&path)?;
966 if other.message_id == message_id {
967 continue;
968 }
969 if message_reply_header_ids(&other)
970 .iter()
971 .any(|header_id| header_id == ¤t_rfc822_id)
972 {
973 related.insert(other.message_id);
974 }
975 }
976
977 Ok(related.into_iter().collect())
978 }
979
980 pub(super) fn ensure_no_related_conversation(&self, message_id: &str) -> Result<()> {
981 let related_message_ids = self.related_message_ids(message_id)?;
982 if related_message_ids.is_empty() {
983 return Ok(());
984 }
985 let mut suggested_commands = vec![format!(
986 "afmail case create --name NAME --message {message_id} --reason TEXT"
987 )];
988 for related_id in &related_message_ids {
989 suggested_commands.push(format!(
990 "afmail case add CASE_REF {related_id} --reason TEXT"
991 ));
992 }
993 suggested_commands.push("afmail case archive CASE_REF --reason TEXT".to_string());
994 Err(AppError::new(
995 "message_has_related_conversation_use_case",
996 "message has RFC-header-confirmed related conversation",
997 )
998 .with_hint(
999 "Create a case for the conversation, add the related messages, then archive the case.",
1000 )
1001 .with_details(json!({
1002 "message_id": message_id,
1003 "related_message_ids": related_message_ids,
1004 "suggested_commands": suggested_commands
1005 })))
1006 }
1007
1008 pub(super) fn refresh_messages_after_ref_change(&self, message_ids: &[String]) -> Result<()> {
1009 for message_id in message_ids {
1010 self.refresh_message_after_ref_change(message_id)?;
1011 }
1012 Ok(())
1013 }
1014
1015 pub(super) fn refresh_read_views_after_message_change(&self, message_id: &str) -> Result<()> {
1016 validate_id("message_id", message_id)?;
1017 let message = self.read_message_by_id(message_id)?;
1018 let cases = CaseIndex::build(self)?;
1019 if self.triage_candidate(&message, &cases)? {
1020 self.write_triage_view(&message)?;
1021 } else {
1022 self.remove_triage_view_for_message(message_id)?;
1023 }
1024 self.refresh_all_case_message_views()?;
1025 self.refresh_archive_indexes()
1026 }
1027
1028 pub(super) fn refresh_message_after_ref_change(&self, message_id: &str) -> Result<()> {
1029 validate_id("message_id", message_id)?;
1030 let mut msg = self.read_message_by_id(message_id)?;
1031 let cases = CaseIndex::build(self)?;
1032 self.apply_materialized_workspace_overlays(&mut msg)?;
1033 msg.workspace.remote_sync = None;
1034 self.write_message_materialized_cache(&msg)?;
1035 if self.triage_candidate(&msg, &cases)? {
1036 self.write_triage_view(&msg)?;
1037 } else {
1038 self.remove_triage_view_for_message(message_id)?;
1039 }
1040 Ok(())
1041 }
1042
1043 pub(super) fn update_messages_workspace(
1044 &self,
1045 message_ids: &[String],
1046 status: &str,
1047 ) -> Result<()> {
1048 let status = MessageStatus::parse(status)?;
1049 for message_id in message_ids {
1050 validate_id("message_id", message_id)?;
1051 if disposition_spec_for_status(status).is_some() {
1052 self.set_message_disposition(status, message_id, None, &now_rfc3339())?;
1053 } else {
1054 self.clear_message_from_all_dispositions(message_id)?;
1055 }
1056 let mut msg = self.read_message_by_id(message_id)?;
1057 msg.workspace.status = status.as_str().to_string();
1058 if matches!(status, MessageStatus::Spam | MessageStatus::Trashed) {
1059 msg.workspace.archive_uid = None;
1060 msg.workspace.archived_rfc3339 = None;
1061 msg.workspace.origin = None;
1062 }
1063 msg.workspace.remote_sync = None;
1064 self.write_message_materialized_cache(&msg)?;
1065 self.remove_triage_view_for_message(message_id)?;
1066 }
1067 Ok(())
1068 }
1069
1070 pub(super) fn restore_local_message_disposition(
1071 &self,
1072 message_id: &str,
1073 expected_status: &str,
1074 push_kind: &str,
1075 event_kind: &str,
1076 result_code: &str,
1077 reason: Option<&str>,
1078 ) -> Result<Value> {
1079 self.require_workspace()?;
1080 let reason = self.checked_reason(reason)?;
1081 validate_id("message_id", message_id)?;
1082 let mut message = self.read_message_by_id(message_id)?;
1083 if message.workspace.status != expected_status {
1084 return Err(AppError::new(
1085 "invalid_request",
1086 format!("message {message_id} is not {expected_status}"),
1087 ));
1088 }
1089 let removed_push =
1090 crate::push_queue::remove_pending_message_pushes(&self.root, message_id, push_kind)?;
1091 let push_ids = removed_push
1092 .iter()
1093 .map(|item| item.push_id.clone())
1094 .collect::<Vec<_>>();
1095 self.clear_message_disposition(MessageStatus::parse(expected_status)?, message_id)?;
1096 message.workspace.status = "triage".to_string();
1097 message.workspace.remote_sync = None;
1098 self.write_message_materialized_cache(&message)?;
1099 self.refresh_message_after_ref_change(message_id)?;
1100 self.clear_message_pending_pushes(message_id, &push_ids, false)?;
1101 self.refresh_disposition_views()?;
1102 self.append_audit_event(
1103 event_kind,
1104 vec![audit_target("message", message_id)],
1105 reason,
1106 json!({
1107 "message_id": message_id,
1108 "from_status": expected_status,
1109 "to_status": "triage",
1110 "removed_push_ids": push_ids.clone(),
1111 }),
1112 )?;
1113 Ok(json!({
1114 "code": result_code,
1115 "message_id": message_id,
1116 "from_status": expected_status,
1117 "status": "triage",
1118 "triage_path": format!("triage/{message_id}.md"),
1119 "removed_push_count": push_ids.len(),
1120 "push_ids": push_ids,
1121 }))
1122 }
1123
1124 pub(super) fn remove_triage_view_for_message(&self, message_id: &str) -> Result<()> {
1125 let path = self.root.join("triage").join(format!("{message_id}.md"));
1126 if path.exists() {
1127 remove_file(&path)?;
1128 }
1129 Ok(())
1130 }
1131
1132 pub(crate) fn read_message_by_id(&self, message_id: &str) -> Result<MessageFile> {
1133 validate_id("message_id", message_id)?;
1134 if message_eml_path(&self.root, message_id).is_file() {
1135 self.materialize_message_cache_if_needed(message_id)?;
1136 }
1137 let path = self.message_path(message_id);
1138 let mut message = read_message(&path)?;
1139 let original_workspace = message.workspace.clone();
1140 self.apply_materialized_workspace_overlays(&mut message)?;
1141 if message.workspace != original_workspace {
1142 self.write_message_materialized_cache(&message)?;
1143 }
1144 Ok(message)
1145 }
1146
1147 pub(crate) fn relocate_message(
1148 &self,
1149 message_id: &str,
1150 target_locations: &[crate::types::RemoteLocation],
1151 ) -> Result<()> {
1152 validate_id("message_id", message_id)?;
1153 let mut locations: Vec<crate::types::RemoteLocation> = Vec::new();
1154 for location in target_locations {
1155 if !locations.iter().any(|existing| {
1156 existing.mailbox_name == location.mailbox_name
1157 && existing.uid_validity == location.uid_validity
1158 && existing.uid == location.uid
1159 }) {
1160 locations.push(location.clone());
1161 }
1162 }
1163 if locations.is_empty() {
1164 return Ok(());
1165 };
1166 let mut message = self.read_message_by_id(message_id)?;
1167 message.remote = Some(crate::types::RemoteState { locations });
1168 self.persist_message_remote(&message)?;
1169 self.write_message_materialized_cache(&message)?;
1170 Ok(())
1171 }
1172}