1use super::*;
2
3pub(super) fn message_json_paths(root: &Path) -> Result<Vec<PathBuf>> {
4 let dir = root.join("messages");
5 if !dir.exists() {
6 return Ok(Vec::new());
7 }
8 let mut paths = read_dir(&dir, "read messages")?
9 .into_iter()
10 .map(|entry| entry.path())
11 .filter(|path| path.extension().and_then(|s| s.to_str()) == Some("json"))
12 .collect::<Vec<_>>();
13 paths.sort();
14 Ok(paths)
15}
16
17#[derive(Clone, Debug, Deserialize, Serialize)]
18#[serde(deny_unknown_fields)]
19struct MessageRemoteFile {
20 schema_name: String,
21 schema_version: u64,
22 message_id: String,
23 #[serde(default, skip_serializing_if = "Vec::is_empty")]
24 locations: Vec<RemoteLocation>,
25}
26
27#[derive(Debug, Serialize)]
28struct MessageDispositionResult {
29 code: &'static str,
30 message_id: String,
31 special_use: String,
32 message_ids: Vec<String>,
33 location_count: usize,
34 queued_location_count: usize,
35 queued: bool,
36 push_id: Option<String>,
37}
38
39#[derive(Debug, Serialize)]
40struct MessageArchiveResult {
41 code: &'static str,
42 message_id: String,
43 archive_uid: String,
44 path: String,
45 special_use: String,
46 eligible_message_ids: Vec<String>,
47 location_count: usize,
48 queued_location_count: usize,
49 queued: bool,
50 push_ids: Vec<String>,
51 push_id: Option<String>,
52}
53
54#[derive(Clone, Debug, Default)]
55pub(super) struct MessageCacheRebuildTotals {
56 pub(super) rebuilt_count: usize,
57 pub(super) removed_text_cache_count: usize,
58}
59
60impl MessageRemoteFile {
61 fn from_message(message: &MessageFile) -> Option<Self> {
62 let remote = message.remote.as_ref()?;
63 if remote.locations.is_empty() {
64 return None;
65 }
66 Some(Self {
67 schema_name: "message_remote".to_string(),
68 schema_version: 1,
69 message_id: message.message_id.clone(),
70 locations: remote.locations.clone(),
71 })
72 }
73
74 fn remote_state(&self) -> RemoteState {
75 RemoteState {
76 locations: self.locations.clone(),
77 }
78 }
79}
80
/// Path of the raw `.eml` file for `message_id` under
/// `<root>/.afmail/messages`.
fn message_eml_path(root: &Path, message_id: &str) -> PathBuf {
    let mut eml_path = root.join(".afmail/messages");
    eml_path.push(format!("{message_id}.eml"));
    eml_path
}
85
/// Path of the `.state.json` file for `message_id` under
/// `<root>/.afmail/messages`.
fn message_state_path(root: &Path, message_id: &str) -> PathBuf {
    let mut state_path = root.join(".afmail/messages");
    state_path.push(format!("{message_id}.state.json"));
    state_path
}
90
/// Path of the `.remote.json` sidecar for `message_id` under
/// `<root>/.afmail/messages`.
fn message_remote_path(root: &Path, message_id: &str) -> PathBuf {
    let mut sidecar_path = root.join(".afmail/messages");
    sidecar_path.push(format!("{message_id}.remote.json"));
    sidecar_path
}
95
96fn read_message_remote_file(root: &Path, message_id: &str) -> Result<Option<MessageRemoteFile>> {
97 let path = message_remote_path(root, message_id);
98 if !path.exists() {
99 return Ok(None);
100 }
101 let data = read_to_string(&path, "read message remote")?;
102 let remote: MessageRemoteFile =
103 serde_json::from_str(&data).map_err(|e| AppError::json("parse message remote", &e))?;
104 if remote.schema_name != "message_remote"
105 || remote.schema_version != 1
106 || remote.message_id != message_id
107 {
108 return Err(AppError::new(
109 "message_remote_invalid",
110 format!("invalid message remote sidecar: {}", rel_path(root, &path)),
111 ));
112 }
113 Ok(Some(remote))
114}
115
116fn write_message_remote_file(root: &Path, remote: &MessageRemoteFile) -> Result<()> {
117 let path = message_remote_path(root, &remote.message_id);
118 if let Some(parent) = path.parent() {
119 create_dir_all(parent)?;
120 }
121 let mut normalized = remote.clone();
122 normalized.schema_name = "message_remote".to_string();
123 normalized.schema_version = 1;
124 write_json_pretty(&path, &normalized)
125}
126
127fn default_message_workspace() -> WorkspaceState {
128 WorkspaceState {
129 status: "triage".to_string(),
130 archive_uid: None,
131 archived_rfc3339: None,
132 origin: None,
133 remote_sync: None,
134 push: None,
135 }
136}
137
138pub(super) fn purge_message_artifacts(root: &Path, message_id: &str) -> Result<()> {
139 validate_id("message_id", message_id)?;
140 let message_dir = root.join(".afmail/messages");
141 for path in [
142 root.join("messages").join(format!("{message_id}.json")),
143 message_dir.join(format!("{message_id}.json")),
144 message_dir.join(format!("{message_id}.eml")),
145 message_dir.join(format!("{message_id}.state.json")),
146 message_dir.join(format!("{message_id}.remote.json")),
147 message_dir.join(format!("{message_id}.txt")),
148 ] {
149 if path.exists() {
150 remove_file(&path)?;
151 }
152 }
153 let files_dir = message_dir.join(format!("{message_id}.files"));
154 if files_dir.exists() {
155 remove_dir_all(&files_dir)?;
156 }
157 Ok(())
158}
159
160pub(super) fn attachment_metadata_values(attachments: &[AttachmentRef]) -> Vec<Value> {
161 attachments
162 .iter()
163 .map(|attachment| {
164 json!({
165 "part_id": attachment.part_id.as_str(),
166 "filename": attachment.filename.as_str(),
167 "saved_filename": saved_filename_for_attachment(attachment),
168 "content_type": attachment.content_type.as_str(),
169 "size_bytes": attachment.size_bytes,
170 "fetched": attachment.fetched,
171 "file_path": attachment.file_path.as_deref().unwrap_or(""),
172 "storage": if attachment.fetched { "message_cache" } else { "" },
173 })
174 })
175 .collect()
176}
177
178pub(super) fn read_message(path: &Path) -> Result<MessageFile> {
179 let data = read_to_string(path, "read message json")?;
180 let message: MessageFile =
181 serde_json::from_str(&data).map_err(|e| AppError::json("parse message json", &e))?;
182 if message.schema_name != "message" || message.schema_version != 1 {
183 return Err(AppError::new(
184 "message_cache_invalid",
185 format!("unsupported message cache schema: {}", path_to_string(path)),
186 ));
187 }
188 MessageStatus::parse(&message.workspace.status)?;
189 if let Some(direction) = message.direction.as_deref() {
190 MailDirection::parse(direction)?;
191 }
192 Ok(message)
193}
194
195pub(super) fn normalize_rfc822_message_id(value: &str) -> Option<String> {
196 let normalized = value
197 .trim()
198 .trim_matches(|ch| matches!(ch, '<' | '>' | ',' | ';'))
199 .trim()
200 .to_ascii_lowercase();
201 if normalized.is_empty() {
202 None
203 } else {
204 Some(normalized)
205 }
206}
207
208pub(super) fn rfc822_message_id_candidates(value: &str) -> Vec<String> {
209 let mut ids = Vec::new();
210 let mut rest = value;
211 while let Some(start) = rest.find('<') {
212 let after_start = &rest[start + 1..];
213 let Some(end) = after_start.find('>') else {
214 break;
215 };
216 if let Some(id) = normalize_rfc822_message_id(&after_start[..end]) {
217 ids.push(id);
218 }
219 rest = &after_start[end + 1..];
220 }
221 if ids.is_empty() {
222 ids.extend(
223 value
224 .split_whitespace()
225 .filter_map(normalize_rfc822_message_id),
226 );
227 }
228 ids.sort();
229 ids.dedup();
230 ids
231}
232
233pub(super) fn message_reply_header_ids(message: &MessageFile) -> Vec<String> {
234 let mut ids = Vec::new();
235 if let Some(in_reply_to) = &message.in_reply_to {
236 ids.extend(rfc822_message_id_candidates(in_reply_to));
237 }
238 for reference in &message.references {
239 ids.extend(rfc822_message_id_candidates(reference));
240 }
241 ids.sort();
242 ids.dedup();
243 ids
244}
245
246impl Workspace {
247 pub(crate) fn persist_message_remote(&self, message: &MessageFile) -> Result<()> {
248 let path = message_remote_path(&self.root, &message.message_id);
249 if let Some(remote) = MessageRemoteFile::from_message(message) {
250 write_message_remote_file(&self.root, &remote)
251 } else if path.exists() {
252 remove_file(&path)
253 } else {
254 Ok(())
255 }
256 }
257
258 pub(crate) fn write_message_materialized_cache(&self, message: &MessageFile) -> Result<()> {
259 let mut message = message.clone();
260 message.schema_name = "message".to_string();
261 message.schema_version = 1;
262 if message.eml_path.is_none() {
263 message.eml_path = Some(format!(".afmail/messages/{}.eml", message.message_id));
264 }
265 let deprecated_state = message_state_path(&self.root, &message.message_id);
266 if deprecated_state.exists() {
267 remove_file(&deprecated_state)?;
268 }
269 let path = self.message_path(&message.message_id);
270 if let Some(parent) = path.parent() {
271 create_dir_all(parent)?;
272 }
273 write_json_pretty(&path, &message)
274 }
275
276 pub(super) fn write_message_cache(&self, message: &MessageFile) -> Result<()> {
277 self.write_message_materialized_cache(message)
278 }
279
280 pub(crate) fn write_message_artifacts(&self, message: &MessageFile) -> Result<()> {
281 self.persist_message_remote(message)?;
282 self.write_message_materialized_cache(message)
283 }
284
285 fn materialize_message_cache_if_needed(&self, message_id: &str) -> Result<bool> {
286 if !self.message_cache_needs_materialize(message_id)? {
287 return Ok(false);
288 }
289 self.materialize_message_cache(message_id)?;
290 Ok(true)
291 }
292
293 fn materialize_message_cache(&self, message_id: &str) -> Result<MessageFile> {
294 validate_id("message_id", message_id)?;
295 let eml_path = message_eml_path(&self.root, message_id);
296 let raw = fs::read(&eml_path).map_err(|e| AppError::io("read message eml", &e))?;
297 let prior = read_message(&self.message_path(message_id)).ok();
298 let remote =
299 read_message_remote_file(&self.root, message_id)?.map(|file| file.remote_state());
300 let direction = self.infer_materialized_direction(&raw, prior.as_ref(), remote.as_ref());
301 let mut parsed = crate::mail::parse_message_with_options(
302 message_id.to_string(),
303 &raw,
304 crate::mail::MessageParseOptions {
305 direction,
306 workspace: default_message_workspace(),
307 remote,
308 received_rfc3339: prior
309 .as_ref()
310 .and_then(|message| message.received_rfc3339.clone()),
311 sent_rfc3339: prior
312 .as_ref()
313 .and_then(|message| message.sent_rfc3339.clone()),
314 attachments: prior
315 .as_ref()
316 .map(|message| message.attachments.clone())
317 .unwrap_or_default(),
318 },
319 )?;
320 self.apply_fetched_attachment_files(message_id, &mut parsed.message.attachments);
321 self.apply_materialized_workspace_overlays(&mut parsed.message)?;
322 self.write_message_materialized_cache(&parsed.message)?;
323 Ok(parsed.message)
324 }
325
326 fn message_cache_needs_materialize(&self, message_id: &str) -> Result<bool> {
327 let cache_path = self.message_path(message_id);
328 if !cache_path.exists() {
329 return Ok(true);
330 }
331 let data = match read_to_string(&cache_path, "read message cache") {
332 Ok(data) => data,
333 Err(_) => return Ok(true),
334 };
335 let value: Value = match serde_json::from_str(&data) {
336 Ok(value) => value,
337 Err(_) => return Ok(true),
338 };
339 if value.get("schema_name").and_then(Value::as_str) != Some("message")
340 || value.get("schema_version").and_then(Value::as_u64) != Some(1)
341 {
342 return Ok(true);
343 }
344 let cache_modified = match fs::metadata(&cache_path).and_then(|meta| meta.modified()) {
345 Ok(time) => time,
346 Err(_) => return Ok(true),
347 };
348 for input in [
349 message_eml_path(&self.root, message_id),
350 message_remote_path(&self.root, message_id),
351 ] {
352 if !input.exists() {
353 continue;
354 }
355 if let Ok(input_modified) = fs::metadata(&input).and_then(|meta| meta.modified()) {
356 if input_modified > cache_modified {
357 return Ok(true);
358 }
359 }
360 }
361 Ok(false)
362 }
363
364 fn infer_materialized_direction(
365 &self,
366 raw: &[u8],
367 prior: Option<&MessageFile>,
368 remote: Option<&RemoteState>,
369 ) -> Option<String> {
370 if let Some(direction) = prior.and_then(|message| message.direction.clone()) {
371 return Some(direction);
372 }
373 if let Some(remote) = remote {
374 if let Ok(config) = MailConfig::load(&self.root) {
375 for location in &remote.locations {
376 if let Some(mailbox_id) = location.mailbox_id.as_deref() {
377 if let Ok(action) = config.pull_action(mailbox_id) {
378 return Some(action.direction.as_str().to_string());
379 }
380 }
381 }
382 }
383 }
384 let local_message_id = mail_parser::MessageParser::default()
385 .parse(raw)
386 .and_then(|message| message.message_id().map(ToString::to_string))
387 .is_some_and(|message_id| {
388 message_id
389 .trim()
390 .trim_matches(|ch| matches!(ch, '<' | '>'))
391 .ends_with("@afmail.local")
392 });
393 Some(if local_message_id {
394 "outbound".to_string()
395 } else {
396 "inbound".to_string()
397 })
398 }
399
400 fn apply_fetched_attachment_files(&self, message_id: &str, attachments: &mut [AttachmentRef]) {
401 let files_dir = self
402 .root
403 .join(".afmail/messages")
404 .join(format!("{message_id}.files"));
405 if !files_dir.is_dir() {
406 return;
407 }
408 for attachment in attachments {
409 if attachment.fetched
410 && attachment
411 .file_path
412 .as_deref()
413 .is_some_and(|path| self.root.join(path).is_file())
414 {
415 continue;
416 }
417 let candidate = files_dir.join(safe_attachment_filename(
418 &attachment.filename,
419 &attachment.part_id,
420 ));
421 if candidate.is_file() {
422 attachment.fetched = true;
423 attachment.file_path = Some(rel_path(&self.root, &candidate));
424 }
425 }
426 }
427
428 pub(super) fn apply_materialized_workspace_overlays(
429 &self,
430 message: &mut MessageFile,
431 ) -> Result<()> {
432 message.workspace.archive_uid = None;
433 message.workspace.archived_rfc3339 = None;
434 let cases = CaseIndex::build(self)?;
435 if let Some((status, _added_rfc3339)) =
436 self.disposition_state_for_message(&message.message_id)?
437 {
438 message.workspace.status = status.as_str().to_string();
439 message.workspace.origin = None;
440 } else if let Some((archive_uid, archived_rfc3339)) = self
441 .direct_archive_state_by_message()?
442 .get(&message.message_id)
443 {
444 message.workspace.status = "archived".to_string();
445 message.workspace.archive_uid = Some(archive_uid.clone());
446 message.workspace.archived_rfc3339 = Some(archived_rfc3339.clone());
447 message.workspace.origin = None;
448 } else if cases.has_any_reference(&message.message_id) {
449 message.workspace.status = MessageStatus::Case.as_str().to_string();
450 } else if message.workspace.origin.is_some() {
451 message.workspace.status = MessageStatus::Archived.as_str().to_string();
452 } else {
453 message.workspace.status = MessageStatus::Triage.as_str().to_string();
454 }
455 let last_completed_rfc3339 = message
456 .workspace
457 .push
458 .as_ref()
459 .and_then(|push| push.last_completed_rfc3339.clone());
460 message.workspace.push = self.pending_push_state_for_message(&message.message_id)?;
461 if let Some(last_completed_rfc3339) = last_completed_rfc3339 {
462 let push = message
463 .workspace
464 .push
465 .get_or_insert_with(WorkspacePushState::default);
466 if push.last_completed_rfc3339.is_none() {
467 push.last_completed_rfc3339 = Some(last_completed_rfc3339);
468 }
469 }
470 if message
471 .workspace
472 .push
473 .as_ref()
474 .is_some_and(|push| !push.pending.is_empty())
475 && MessageStatus::parse(&message.workspace.status)? == MessageStatus::Triage
476 {
477 message.workspace.status = MessageStatus::PushQueued.as_str().to_string();
478 }
479 Ok(())
480 }
481
482 fn pending_push_state_for_message(
483 &self,
484 message_id: &str,
485 ) -> Result<Option<WorkspacePushState>> {
486 let mut state = WorkspacePushState::default();
487 for item in crate::push_queue::pending_items(&self.root)? {
488 if !item.message_ids().iter().any(|id| id == message_id) {
489 continue;
490 }
491 state.pending.push(WorkspacePendingPush {
492 push_id: item.push_id.clone(),
493 kind: item.display_kind(),
494 queued_rfc3339: item.created_rfc3339.clone(),
495 last_error: item.last_error.clone(),
496 });
497 }
498 state.pending.sort_by(|a, b| a.push_id.cmp(&b.push_id));
499 if state.pending.is_empty() && state.last_completed_rfc3339.is_none() {
500 Ok(None)
501 } else {
502 Ok(Some(state))
503 }
504 }
505
506 pub(super) fn rebuild_message_cache_from_eml(&self) -> Result<MessageCacheRebuildTotals> {
507 let messages_dir = self.root.join(".afmail/messages");
508 if !messages_dir.exists() {
509 return Ok(MessageCacheRebuildTotals::default());
510 }
511 let mut totals = MessageCacheRebuildTotals::default();
512 let mut eml_paths = read_dir(&messages_dir, "read message eml cache")?
513 .into_iter()
514 .map(|entry| entry.path())
515 .filter(|path| path.extension().and_then(|s| s.to_str()) == Some("eml"))
516 .collect::<Vec<_>>();
517 eml_paths.sort();
518
519 for eml_path in eml_paths {
520 let Some(message_id) = eml_path.file_stem().and_then(|stem| stem.to_str()) else {
521 continue;
522 };
523 validate_id("message_id", message_id)?;
524 if self.materialize_message_cache_if_needed(message_id)? {
525 totals.rebuilt_count += 1;
526 }
527 }
528
529 for entry in read_dir(&messages_dir, "read message cache")? {
530 let path = entry.path();
531 if path.extension().and_then(|s| s.to_str()) == Some("txt") {
532 remove_file(&path)?;
533 totals.removed_text_cache_count += 1;
534 }
535 }
536 Ok(totals)
537 }
538
539 fn direct_archive_state_by_message(&self) -> Result<BTreeMap<String, (String, String)>> {
540 let mut out = BTreeMap::new();
541 for archive_uid in self.archive_message_category_ids()? {
542 let data = self.read_archive_messages(&archive_uid)?;
543 for item in data.items {
544 out.insert(item.message_id, (archive_uid.clone(), item.added_rfc3339));
545 }
546 }
547 Ok(out)
548 }
549
550 pub fn message_show(&self, message_id: &str) -> Result<Value> {
551 self.require_workspace()?;
552 validate_id("message_id", message_id)?;
553 let config = MailConfig::load(&self.root)?;
554 let message = self.read_message_by_id(message_id)?;
555 let body_text = self.message_body_text(&message)?;
556 let flags = message_remote_flags(&message);
557 let unread = !flags.iter().any(|flag| flag.eq_ignore_ascii_case("\\Seen"));
558 let flagged = flags
559 .iter()
560 .any(|flag| flag.eq_ignore_ascii_case("\\Flagged"));
561 let push = message.workspace.push.clone();
562 Ok(json!({
563 "code": "message_show",
564 "message_id": message.message_id.as_str(),
565 "from": message.from.as_deref().unwrap_or(""),
566 "to": &message.to,
567 "cc": &message.cc,
568 "bcc": &message.bcc,
569 "reply_to": &message.reply_to,
570 "sender": message.sender.as_deref().unwrap_or(""),
571 "subject": message.subject.as_deref().unwrap_or(""),
572 "direction": message.direction.as_deref().unwrap_or(""),
573 "received_rfc3339": message.received_rfc3339.as_deref().unwrap_or(""),
574 "sent_rfc3339": message.sent_rfc3339.as_deref().unwrap_or(""),
575 "body_text": body_text,
576 "attachment_count": message.attachments.len(),
577 "attachments": attachment_metadata_values(&message.attachments),
578 "mailbox_ids": message_mailbox_ids(&message, &config),
579 "flags": flags,
580 "unread": unread,
581 "flagged": flagged,
582 "remote_missing": message_remote_missing(&message),
583 "remote_missing_since_rfc3339": message_remote_missing_since_rfc3339(&message),
584 "remote_effect_pending": message_remote_effect_pending(&message),
585 "push": push,
586 "view_path": self.message_existing_view_path(&message)?,
587 "json_path": format!("messages/{message_id}.json"),
588 "message": message_template_value(&message)?,
589 }))
590 }
591
592 pub fn spam_message(&self, message_id: &str, reason: Option<&str>) -> Result<Value> {
593 self.require_workspace()?;
594 let reason = self.checked_reason(reason)?;
595 let config = crate::config::MailConfig::load(&self.root)?;
596 validate_id("message_id", message_id)?;
597 self.ensure_no_related_conversation(message_id)?;
598 let message_ids = vec![message_id.to_string()];
599 self.ensure_message_ids_unreferenced(&message_ids, None)?;
600 let locations = self.message_remote_locations(&message_ids)?;
601 let transaction = self.begin_transaction(
602 "message_spam",
603 vec![
604 format!("messages/{message_id}.json"),
605 ".afmail/push".to_string(),
606 ],
607 )?;
608 let item = crate::push_queue::queue_action_steps(
609 &self.root,
610 "message.spam",
611 &message_ids,
612 &locations,
613 &config.actions.message_spam.steps,
614 None,
615 )?;
616 self.update_messages_workspace(&message_ids, "spam")?;
617 if let Some(item) = &item {
618 self.record_pending_push_item(item)?;
619 }
620 self.refresh_disposition_views()?;
621 transaction.commit()?;
622 self.append_audit_event(
623 "message_spam_marked",
624 vec![audit_target("message", message_id)],
625 reason,
626 json!({"message_id": message_id, "special_use": SpecialUseKind::Junk.as_str()}),
627 )?;
628 serde_json::to_value(MessageDispositionResult {
629 code: "message_spam_marked",
630 message_id: message_id.to_string(),
631 special_use: SpecialUseKind::Junk.as_str().to_string(),
632 message_ids,
633 location_count: locations.len(),
634 queued_location_count: locations.len(),
635 queued: item.is_some(),
636 push_id: item.as_ref().map(|item| item.push_id.clone()),
637 })
638 .map_err(|e| AppError::json("serialize message disposition result", &e))
639 }
640
641 pub fn restore_message(&self, message_id: &str, reason: Option<&str>) -> Result<Value> {
642 self.require_workspace()?;
643 let reason = self.checked_reason(reason)?;
644 validate_id("message_id", message_id)?;
645 let message = self.read_message_by_id(message_id)?;
646 match message.workspace.status.as_str() {
647 "spam" => self.restore_local_message_disposition(
648 message_id,
649 "spam",
650 "message.spam",
651 "message_restored",
652 "message_restored",
653 reason,
654 ),
655 "trashed" => self.restore_local_message_disposition(
656 message_id,
657 "trashed",
658 "message.trash",
659 "message_restored",
660 "message_restored",
661 reason,
662 ),
663 "archived" => {
664 let archive_uid = message.workspace.archive_uid.clone().ok_or_else(|| {
665 AppError::new(
666 "invalid_request",
667 format!("message {message_id} is not directly archived"),
668 )
669 })?;
670 self.restore_direct_archive_message(
671 &archive_uid,
672 message_id,
673 reason,
674 "message_restored",
675 "message_restored",
676 )
677 }
678 other => Err(AppError::new(
679 "invalid_request",
680 format!("message {message_id} has no disposition to restore (status: {other})"),
681 )),
682 }
683 }
684
685 pub fn archive_message(
686 &self,
687 message_id: &str,
688 archive_ref: &str,
689 summary: Option<&str>,
690 reason: Option<&str>,
691 ) -> Result<Value> {
692 self.require_workspace()?;
693 let reason = self.checked_reason(reason)?;
694 validate_id("message_id", message_id)?;
695 let (archive_uid, archive_dir) = self.resolve_archive_message_category(archive_ref)?;
696 self.ensure_no_related_conversation(message_id)?;
697 let transaction = self.begin_transaction(
698 "message_archive",
699 vec![
700 format!("messages/{message_id}.json"),
701 format!("archive/notifications/{archive_uid}"),
702 ".afmail/push".to_string(),
703 ],
704 )?;
705 let archived_rfc3339 = self.set_direct_message_archive(message_id, &archive_uid)?;
706 self.upsert_archive_message_item(&archive_uid, message_id, summary, &archived_rfc3339)?;
707 self.refresh_archive_message_category(&archive_uid)?;
708 let queue = self.queue_archive_for_archived_messages(&[message_id.to_string()], None)?;
709 transaction.commit()?;
710 self.append_audit_event(
711 "message_archived",
712 vec![
713 audit_target("message", message_id),
714 audit_target("archive", &archive_uid),
715 ],
716 reason,
717 json!({
718 "message_id": message_id,
719 "archive_uid": archive_uid,
720 "summary": summary,
721 "to_path": format!("{}/views/messages/{message_id}.md", rel_path(&self.root, &archive_dir)),
722 }),
723 )?;
724 serde_json::to_value(MessageArchiveResult {
725 code: "message_archived",
726 message_id: message_id.to_string(),
727 archive_uid,
728 path: rel_path(&self.root, &archive_dir),
729 special_use: SpecialUseKind::Archive.as_str().to_string(),
730 eligible_message_ids: queue.eligible_message_ids,
731 location_count: queue.location_count,
732 queued_location_count: queue.queued_location_count,
733 queued: !queue.items.is_empty(),
734 push_ids: queue
735 .items
736 .iter()
737 .map(|item| item.push_id.clone())
738 .collect::<Vec<_>>(),
739 push_id: queue.items.first().map(|item| item.push_id.clone()),
740 })
741 .map_err(|e| AppError::json("serialize message archive result", &e))
742 }
743
744 pub fn trash_message(&self, message_id: &str, reason: Option<&str>) -> Result<Value> {
745 self.require_workspace()?;
746 let reason = self.checked_reason(reason)?;
747 let config = crate::config::MailConfig::load(&self.root)?;
748 validate_id("message_id", message_id)?;
749 self.ensure_no_related_conversation(message_id)?;
750 let message_ids = vec![message_id.to_string()];
751 self.ensure_message_ids_unreferenced(&message_ids, None)?;
752 let locations = self.message_remote_locations(&message_ids)?;
753 let transaction = self.begin_transaction(
754 "message_trash",
755 vec![
756 format!("messages/{message_id}.json"),
757 ".afmail/push".to_string(),
758 ],
759 )?;
760 let item = crate::push_queue::queue_action_steps(
761 &self.root,
762 "message.trash",
763 &message_ids,
764 &locations,
765 &config.actions.message_trash.steps,
766 None,
767 )?;
768 self.update_messages_workspace(&message_ids, "trashed")?;
769 if let Some(item) = &item {
770 self.record_pending_push_item(item)?;
771 }
772 self.refresh_disposition_views()?;
773 transaction.commit()?;
774 self.append_audit_event(
775 "message_trashed",
776 vec![audit_target("message", message_id)],
777 reason,
778 json!({"message_id": message_id, "special_use": SpecialUseKind::Trash.as_str()}),
779 )?;
780 serde_json::to_value(MessageDispositionResult {
781 code: "message_trashed",
782 message_id: message_id.to_string(),
783 special_use: SpecialUseKind::Trash.as_str().to_string(),
784 message_ids,
785 location_count: locations.len(),
786 queued_location_count: locations.len(),
787 queued: item.is_some(),
788 push_id: item.as_ref().map(|item| item.push_id.clone()),
789 })
790 .map_err(|e| AppError::json("serialize message disposition result", &e))
791 }
792
793 pub(super) fn message_existing_view_path(
794 &self,
795 message: &MessageFile,
796 ) -> Result<Option<String>> {
797 let message_id = message.message_id.as_str();
798 let triage_path = self.root.join("triage").join(format!("{message_id}.md"));
799 if triage_path.is_file() {
800 return Ok(Some(rel_path(&self.root, &triage_path)));
801 }
802 for dir in ["spam", "trash", "deleted"] {
803 let path = self.root.join(dir).join(format!("{message_id}.md"));
804 if path.is_file() {
805 return Ok(Some(rel_path(&self.root, &path)));
806 }
807 }
808 for (case_uid, case_path) in self.all_case_entries()? {
809 let case_messages = read_case_messages(&case_path, &case_uid);
810 if case_messages
811 .as_ref()
812 .map(|messages| messages.contains_message(message_id))
813 .unwrap_or(false)
814 {
815 let message_view = case_message_view_path(&case_path, message_id);
816 if message_view.is_file() {
817 return Ok(Some(rel_path(&self.root, &message_view)));
818 }
819 let case_view = case_path.join("case.md");
820 if case_view.is_file() {
821 return Ok(Some(rel_path(&self.root, &case_view)));
822 }
823 }
824 }
825 if let Some(archive_uid) = message.workspace.archive_uid.as_deref() {
826 let archive_view = self.archive_message_view_path(archive_uid, message_id);
827 if archive_view.is_file() {
828 return Ok(Some(rel_path(&self.root, &archive_view)));
829 }
830 }
831 Ok(None)
832 }
833
834 pub(super) fn message_body_text(&self, message: &MessageFile) -> Result<String> {
835 Ok(message.body_text.clone())
836 }
837
838 pub(super) fn message_conversation_for_dir(
839 &self,
840 message: &MessageFile,
841 output_dir: Option<&Path>,
842 ) -> Result<String> {
843 let config = MailConfig::load(&self.root)?;
844 let mut renderer = MarkdownTemplateRenderer::new(&self.root, config.template_language());
845 self.message_conversation_with_renderer(message, &config, &mut renderer, output_dir)
846 }
847
848 pub(super) fn message_conversation_with_renderer(
849 &self,
850 message: &MessageFile,
851 config: &MailConfig,
852 renderer: &mut MarkdownTemplateRenderer<'_>,
853 output_dir: Option<&Path>,
854 ) -> Result<String> {
855 let body_text = self.message_body_text(message)?;
856 renderer.render(
857 TemplateKey::MessageSection,
858 &message_section_context(
859 Some(&self.root),
860 message,
861 &body_text,
862 config.template_language(),
863 config
864 .smtp
865 .from
866 .as_deref()
867 .or(config.imap.username.as_deref()),
868 output_dir,
869 )?,
870 )
871 }
872
873 pub(super) fn rfc822_message_id_index(&self) -> Result<BTreeMap<String, String>> {
874 self.rebuild_message_cache_from_eml()?;
875 let mut index = BTreeMap::new();
876 for path in message_json_paths(&self.root)? {
877 let message = read_message(&path)?;
878 if let Some(normalized) = message
879 .rfc822_message_id
880 .as_deref()
881 .and_then(normalize_rfc822_message_id)
882 {
883 index
884 .entry(normalized)
885 .or_insert_with(|| message.message_id.clone());
886 }
887 }
888 Ok(index)
889 }
890
891 pub(super) fn related_message_ids(&self, message_id: &str) -> Result<Vec<String>> {
892 validate_id("message_id", message_id)?;
893 let current = self.read_message_by_id(message_id)?;
894 let rfc822_index = self.rfc822_message_id_index()?;
895 let mut related = BTreeSet::new();
896
897 for header_id in message_reply_header_ids(¤t) {
898 if let Some(local_message_id) = rfc822_index.get(&header_id) {
899 if local_message_id != message_id {
900 related.insert(local_message_id.clone());
901 }
902 }
903 }
904
905 let Some(current_rfc822_id) = current
906 .rfc822_message_id
907 .as_deref()
908 .and_then(normalize_rfc822_message_id)
909 else {
910 return Ok(related.into_iter().collect());
911 };
912
913 for path in message_json_paths(&self.root)? {
914 let other = read_message(&path)?;
915 if other.message_id == message_id {
916 continue;
917 }
918 if message_reply_header_ids(&other)
919 .iter()
920 .any(|header_id| header_id == ¤t_rfc822_id)
921 {
922 related.insert(other.message_id);
923 }
924 }
925
926 Ok(related.into_iter().collect())
927 }
928
929 pub(super) fn ensure_no_related_conversation(&self, message_id: &str) -> Result<()> {
930 let related_message_ids = self.related_message_ids(message_id)?;
931 if related_message_ids.is_empty() {
932 return Ok(());
933 }
934 let mut suggested_commands = vec![format!(
935 "afmail case create --name NAME --message {message_id} --reason TEXT"
936 )];
937 for related_id in &related_message_ids {
938 suggested_commands.push(format!(
939 "afmail case add CASE_REF {related_id} --reason TEXT"
940 ));
941 }
942 suggested_commands.push("afmail case archive CASE_REF --reason TEXT".to_string());
943 Err(AppError::new(
944 "message_has_related_conversation_use_case",
945 "message has RFC-header-confirmed related conversation",
946 )
947 .with_hint(
948 "Create a case for the conversation, add the related messages, then archive the case.",
949 )
950 .with_details(json!({
951 "message_id": message_id,
952 "related_message_ids": related_message_ids,
953 "suggested_commands": suggested_commands
954 })))
955 }
956
957 pub(super) fn refresh_messages_after_ref_change(&self, message_ids: &[String]) -> Result<()> {
958 for message_id in message_ids {
959 self.refresh_message_after_ref_change(message_id)?;
960 }
961 Ok(())
962 }
963
964 pub(super) fn refresh_read_views_after_message_change(&self, message_id: &str) -> Result<()> {
965 validate_id("message_id", message_id)?;
966 let message = self.read_message_by_id(message_id)?;
967 let cases = CaseIndex::build(self)?;
968 if self.triage_candidate(&message, &cases)? {
969 self.write_triage_view(&message)?;
970 } else {
971 self.remove_triage_view_for_message(message_id)?;
972 }
973 self.refresh_all_case_message_views()?;
974 self.refresh_archive_indexes()
975 }
976
977 pub(super) fn refresh_message_after_ref_change(&self, message_id: &str) -> Result<()> {
978 validate_id("message_id", message_id)?;
979 let mut msg = self.read_message_by_id(message_id)?;
980 let cases = CaseIndex::build(self)?;
981 self.apply_materialized_workspace_overlays(&mut msg)?;
982 msg.workspace.remote_sync = None;
983 self.write_message_materialized_cache(&msg)?;
984 if self.triage_candidate(&msg, &cases)? {
985 self.write_triage_view(&msg)?;
986 } else {
987 self.remove_triage_view_for_message(message_id)?;
988 }
989 Ok(())
990 }
991
992 pub(super) fn update_messages_workspace(
993 &self,
994 message_ids: &[String],
995 status: &str,
996 ) -> Result<()> {
997 let status = MessageStatus::parse(status)?;
998 for message_id in message_ids {
999 validate_id("message_id", message_id)?;
1000 if disposition_spec_for_status(status).is_some() {
1001 self.set_message_disposition(status, message_id, None, &now_rfc3339())?;
1002 } else {
1003 self.clear_message_from_all_dispositions(message_id)?;
1004 }
1005 let mut msg = self.read_message_by_id(message_id)?;
1006 msg.workspace.status = status.as_str().to_string();
1007 if matches!(status, MessageStatus::Spam | MessageStatus::Trashed) {
1008 msg.workspace.archive_uid = None;
1009 msg.workspace.archived_rfc3339 = None;
1010 msg.workspace.origin = None;
1011 }
1012 msg.workspace.remote_sync = None;
1013 self.write_message_materialized_cache(&msg)?;
1014 self.remove_triage_view_for_message(message_id)?;
1015 }
1016 Ok(())
1017 }
1018
1019 pub(super) fn restore_local_message_disposition(
1020 &self,
1021 message_id: &str,
1022 expected_status: &str,
1023 push_kind: &str,
1024 event_kind: &str,
1025 result_code: &str,
1026 reason: Option<&str>,
1027 ) -> Result<Value> {
1028 self.require_workspace()?;
1029 let reason = self.checked_reason(reason)?;
1030 validate_id("message_id", message_id)?;
1031 let mut message = self.read_message_by_id(message_id)?;
1032 if message.workspace.status != expected_status {
1033 return Err(AppError::new(
1034 "invalid_request",
1035 format!("message {message_id} is not {expected_status}"),
1036 ));
1037 }
1038 let removed_push =
1039 crate::push_queue::remove_pending_message_pushes(&self.root, message_id, push_kind)?;
1040 let push_ids = removed_push
1041 .iter()
1042 .map(|item| item.push_id.clone())
1043 .collect::<Vec<_>>();
1044 self.clear_message_disposition(MessageStatus::parse(expected_status)?, message_id)?;
1045 message.workspace.status = "triage".to_string();
1046 message.workspace.remote_sync = None;
1047 self.write_message_materialized_cache(&message)?;
1048 self.refresh_message_after_ref_change(message_id)?;
1049 self.clear_message_pending_pushes(message_id, &push_ids, false)?;
1050 self.refresh_disposition_views()?;
1051 self.append_audit_event(
1052 event_kind,
1053 vec![audit_target("message", message_id)],
1054 reason,
1055 json!({
1056 "message_id": message_id,
1057 "from_status": expected_status,
1058 "to_status": "triage",
1059 "removed_push_ids": push_ids.clone(),
1060 }),
1061 )?;
1062 Ok(json!({
1063 "code": result_code,
1064 "message_id": message_id,
1065 "from_status": expected_status,
1066 "status": "triage",
1067 "triage_path": format!("triage/{message_id}.md"),
1068 "removed_push_count": push_ids.len(),
1069 "push_ids": push_ids,
1070 }))
1071 }
1072
1073 pub(super) fn remove_triage_view_for_message(&self, message_id: &str) -> Result<()> {
1074 let path = self.root.join("triage").join(format!("{message_id}.md"));
1075 if path.exists() {
1076 remove_file(&path)?;
1077 }
1078 Ok(())
1079 }
1080
1081 pub(crate) fn read_message_by_id(&self, message_id: &str) -> Result<MessageFile> {
1082 validate_id("message_id", message_id)?;
1083 if message_eml_path(&self.root, message_id).is_file() {
1084 self.materialize_message_cache_if_needed(message_id)?;
1085 }
1086 let path = self.message_path(message_id);
1087 let mut message = read_message(&path)?;
1088 let original_workspace = message.workspace.clone();
1089 self.apply_materialized_workspace_overlays(&mut message)?;
1090 if message.workspace != original_workspace {
1091 self.write_message_materialized_cache(&message)?;
1092 }
1093 Ok(message)
1094 }
1095
1096 pub(crate) fn relocate_message(
1097 &self,
1098 message_id: &str,
1099 target_locations: &[crate::types::RemoteLocation],
1100 ) -> Result<()> {
1101 validate_id("message_id", message_id)?;
1102 let mut locations: Vec<crate::types::RemoteLocation> = Vec::new();
1103 for location in target_locations {
1104 if !locations.iter().any(|existing| {
1105 existing.mailbox_name == location.mailbox_name
1106 && existing.uid_validity == location.uid_validity
1107 && existing.uid == location.uid
1108 }) {
1109 locations.push(location.clone());
1110 }
1111 }
1112 if locations.is_empty() {
1113 return Ok(());
1114 };
1115 let mut message = self.read_message_by_id(message_id)?;
1116 message.remote = Some(crate::types::RemoteState { locations });
1117 self.persist_message_remote(&message)?;
1118 self.write_message_materialized_cache(&message)?;
1119 Ok(())
1120 }
1121}