1use super::*;
2
3pub(super) fn message_json_paths(root: &Path) -> Result<Vec<PathBuf>> {
4 let dir = root.join("messages");
5 if !dir.exists() {
6 return Ok(Vec::new());
7 }
8 let mut paths = read_dir(&dir, "read messages")?
9 .into_iter()
10 .map(|entry| entry.path())
11 .filter(|path| path.extension().and_then(|s| s.to_str()) == Some("json"))
12 .collect::<Vec<_>>();
13 paths.sort();
14 Ok(paths)
15}
16
/// Serialized form of a message's state sidecar, stored at
/// `.afmail/messages/<message_id>.state.json`.
///
/// Unknown JSON fields are rejected on deserialization.
#[derive(Clone, Debug, Deserialize, Serialize)]
#[serde(deny_unknown_fields)]
struct MessageStateFile {
    /// Schema tag; readers require `"message_state"` and writers force it.
    schema_name: String,
    /// Schema revision; readers require `1` and writers force it.
    schema_version: u64,
    /// Id of the message this sidecar belongs to; must match the id it is read for.
    message_id: String,
    /// Workspace status string; must be accepted by `MessageStatus::parse`.
    status: String,
    /// Copied from the message's workspace state; omitted from the JSON when absent.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    archive_uid: Option<String>,
    /// Copied from the message's workspace state; omitted from the JSON when absent.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    archived_rfc3339: Option<String>,
    /// Copied from the message's workspace state; omitted from the JSON when absent.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    origin: Option<String>,
    /// Timestamp taken from `now_rfc3339()` when the record is built from a message.
    updated_rfc3339: String,
}
32
/// Serialized form of a message's remote-location sidecar, stored at
/// `.afmail/messages/<message_id>.remote.json`.
///
/// Unknown JSON fields are rejected on deserialization.
#[derive(Clone, Debug, Deserialize, Serialize)]
#[serde(deny_unknown_fields)]
struct MessageRemoteFile {
    /// Schema tag; readers require `"message_remote"` and writers force it.
    schema_name: String,
    /// Schema revision; readers require `1` and writers force it.
    schema_version: u64,
    /// Id of the message this sidecar belongs to; must match the id it is read for.
    message_id: String,
    /// Remote locations of the message; defaults to empty on read and is omitted
    /// from the JSON when empty.
    #[serde(default, skip_serializing_if = "Vec::is_empty")]
    locations: Vec<RemoteLocation>,
}
42
/// JSON result payload produced by `spam_message` and `trash_message`.
#[derive(Debug, Serialize)]
struct MessageDispositionResult {
    /// Result code, e.g. `"message_spam_marked"` or `"message_trashed"`.
    code: &'static str,
    /// Id of the message the command was invoked for.
    message_id: String,
    /// String form of the `SpecialUseKind` associated with the action.
    special_use: String,
    /// All message ids the action was applied to.
    message_ids: Vec<String>,
    /// Number of remote locations resolved for the messages.
    location_count: usize,
    /// Number of remote locations reported as queued.
    queued_location_count: usize,
    /// True when a push item was created for the action.
    queued: bool,
    /// Id of the created push item, if any.
    push_id: Option<String>,
}
54
/// JSON result payload produced by `archive_message`.
#[derive(Debug, Serialize)]
struct MessageArchiveResult {
    /// Result code; `archive_message` sets `"message_archived"`.
    code: &'static str,
    /// Id of the archived message.
    message_id: String,
    /// Uid of the archive category the message was placed in.
    archive_uid: String,
    /// Workspace-relative path of the archive category directory.
    path: String,
    /// String form of `SpecialUseKind::Archive`.
    special_use: String,
    /// Message ids reported by the archive queue step as eligible.
    eligible_message_ids: Vec<String>,
    /// Location count reported by the archive queue step.
    location_count: usize,
    /// Queued-location count reported by the archive queue step.
    queued_location_count: usize,
    /// True when at least one push item was queued.
    queued: bool,
    /// Ids of every queued push item.
    push_ids: Vec<String>,
    /// Id of the first queued push item, if any.
    push_id: Option<String>,
}
69
/// Counters reported by `rebuild_message_cache_from_eml`.
#[derive(Clone, Debug, Default)]
pub(super) struct MessageCacheRebuildTotals {
    /// Number of messages whose JSON cache was re-materialized.
    pub(super) rebuilt_count: usize,
    /// Number of `.txt` files removed from the message cache directory.
    pub(super) removed_text_cache_count: usize,
}
75
76impl MessageStateFile {
77 fn from_message(message: &MessageFile) -> Self {
78 Self {
79 schema_name: "message_state".to_string(),
80 schema_version: 1,
81 message_id: message.message_id.clone(),
82 status: message.workspace.status.clone(),
83 archive_uid: message.workspace.archive_uid.clone(),
84 archived_rfc3339: message.workspace.archived_rfc3339.clone(),
85 origin: message.workspace.origin.clone(),
86 updated_rfc3339: now_rfc3339(),
87 }
88 }
89
90 fn workspace(&self) -> Result<WorkspaceState> {
91 let status = MessageStatus::parse(&self.status)?.as_str().to_string();
92 Ok(WorkspaceState {
93 status,
94 archive_uid: self.archive_uid.clone(),
95 archived_rfc3339: self.archived_rfc3339.clone(),
96 origin: self.origin.clone(),
97 remote_sync: None,
98 push: None,
99 })
100 }
101}
102
103impl MessageRemoteFile {
104 fn from_message(message: &MessageFile) -> Option<Self> {
105 let remote = message.remote.as_ref()?;
106 if remote.locations.is_empty() {
107 return None;
108 }
109 Some(Self {
110 schema_name: "message_remote".to_string(),
111 schema_version: 1,
112 message_id: message.message_id.clone(),
113 locations: remote.locations.clone(),
114 })
115 }
116
117 fn remote_state(&self) -> RemoteState {
118 RemoteState {
119 locations: self.locations.clone(),
120 }
121 }
122}
123
/// Path of the raw `.eml` file for `message_id` under `<root>/.afmail/messages`.
fn message_eml_path(root: &Path, message_id: &str) -> PathBuf {
    let file_name = format!("{message_id}.eml");
    let mut path = root.join(".afmail/messages");
    path.push(file_name);
    path
}
128
/// Path of the `.state.json` sidecar for `message_id` under `<root>/.afmail/messages`.
fn message_state_path(root: &Path, message_id: &str) -> PathBuf {
    let file_name = format!("{message_id}.state.json");
    let mut path = root.join(".afmail/messages");
    path.push(file_name);
    path
}
133
/// Path of the `.remote.json` sidecar for `message_id` under `<root>/.afmail/messages`.
fn message_remote_path(root: &Path, message_id: &str) -> PathBuf {
    let file_name = format!("{message_id}.remote.json");
    let mut path = root.join(".afmail/messages");
    path.push(file_name);
    path
}
138
139pub(super) fn message_state_updated_rfc3339(
140 root: &Path,
141 message_id: &str,
142) -> Result<Option<String>> {
143 Ok(read_message_state_file(root, message_id)?.map(|state| state.updated_rfc3339))
144}
145
146fn read_message_state_file(root: &Path, message_id: &str) -> Result<Option<MessageStateFile>> {
147 let path = message_state_path(root, message_id);
148 if !path.exists() {
149 return Ok(None);
150 }
151 let data = read_to_string(&path, "read message state")?;
152 let state: MessageStateFile =
153 serde_json::from_str(&data).map_err(|e| AppError::json("parse message state", &e))?;
154 if state.schema_name != "message_state"
155 || state.schema_version != 1
156 || state.message_id != message_id
157 {
158 return Err(AppError::new(
159 "message_state_invalid",
160 format!("invalid message state sidecar: {}", rel_path(root, &path)),
161 ));
162 }
163 MessageStatus::parse(&state.status)?;
164 Ok(Some(state))
165}
166
167fn write_message_state_file(root: &Path, state: &MessageStateFile) -> Result<()> {
168 let path = message_state_path(root, &state.message_id);
169 if let Some(parent) = path.parent() {
170 create_dir_all(parent)?;
171 }
172 let mut normalized = state.clone();
173 normalized.schema_name = "message_state".to_string();
174 normalized.schema_version = 1;
175 write_json_pretty(&path, &normalized)
176}
177
178fn read_message_remote_file(root: &Path, message_id: &str) -> Result<Option<MessageRemoteFile>> {
179 let path = message_remote_path(root, message_id);
180 if !path.exists() {
181 return Ok(None);
182 }
183 let data = read_to_string(&path, "read message remote")?;
184 let remote: MessageRemoteFile =
185 serde_json::from_str(&data).map_err(|e| AppError::json("parse message remote", &e))?;
186 if remote.schema_name != "message_remote"
187 || remote.schema_version != 1
188 || remote.message_id != message_id
189 {
190 return Err(AppError::new(
191 "message_remote_invalid",
192 format!("invalid message remote sidecar: {}", rel_path(root, &path)),
193 ));
194 }
195 Ok(Some(remote))
196}
197
198fn write_message_remote_file(root: &Path, remote: &MessageRemoteFile) -> Result<()> {
199 let path = message_remote_path(root, &remote.message_id);
200 if let Some(parent) = path.parent() {
201 create_dir_all(parent)?;
202 }
203 let mut normalized = remote.clone();
204 normalized.schema_name = "message_remote".to_string();
205 normalized.schema_version = 1;
206 write_json_pretty(&path, &normalized)
207}
208
209fn default_message_workspace() -> WorkspaceState {
210 WorkspaceState {
211 status: "triage".to_string(),
212 archive_uid: None,
213 archived_rfc3339: None,
214 origin: None,
215 remote_sync: None,
216 push: None,
217 }
218}
219
220pub(super) fn purge_message_artifacts(root: &Path, message_id: &str) -> Result<()> {
221 validate_id("message_id", message_id)?;
222 let message_dir = root.join(".afmail/messages");
223 for path in [
224 root.join("messages").join(format!("{message_id}.json")),
225 message_dir.join(format!("{message_id}.json")),
226 message_dir.join(format!("{message_id}.eml")),
227 message_dir.join(format!("{message_id}.state.json")),
228 message_dir.join(format!("{message_id}.remote.json")),
229 message_dir.join(format!("{message_id}.txt")),
230 ] {
231 if path.exists() {
232 remove_file(&path)?;
233 }
234 }
235 let files_dir = message_dir.join(format!("{message_id}.files"));
236 if files_dir.exists() {
237 remove_dir_all(&files_dir)?;
238 }
239 Ok(())
240}
241
242pub(super) fn attachment_metadata_values(attachments: &[AttachmentRef]) -> Vec<Value> {
243 attachments
244 .iter()
245 .map(|attachment| {
246 json!({
247 "part_id": attachment.part_id.as_str(),
248 "filename": attachment.filename.as_str(),
249 "saved_filename": saved_filename_for_attachment(attachment),
250 "content_type": attachment.content_type.as_str(),
251 "size_bytes": attachment.size_bytes,
252 "fetched": attachment.fetched,
253 "file_path": attachment.file_path.as_deref().unwrap_or(""),
254 "storage": if attachment.fetched { "message_cache" } else { "" },
255 })
256 })
257 .collect()
258}
259
260pub(super) fn read_message(path: &Path) -> Result<MessageFile> {
261 let data = read_to_string(path, "read message json")?;
262 let message: MessageFile =
263 serde_json::from_str(&data).map_err(|e| AppError::json("parse message json", &e))?;
264 if message.schema_name != "message" || message.schema_version != 1 {
265 return Err(AppError::new(
266 "message_cache_invalid",
267 format!("unsupported message cache schema: {}", path_to_string(path)),
268 ));
269 }
270 MessageStatus::parse(&message.workspace.status)?;
271 if let Some(direction) = message.direction.as_deref() {
272 MailDirection::parse(direction)?;
273 }
274 Ok(message)
275}
276
277pub(super) fn normalize_rfc822_message_id(value: &str) -> Option<String> {
278 let normalized = value
279 .trim()
280 .trim_matches(|ch| matches!(ch, '<' | '>' | ',' | ';'))
281 .trim()
282 .to_ascii_lowercase();
283 if normalized.is_empty() {
284 None
285 } else {
286 Some(normalized)
287 }
288}
289
290pub(super) fn rfc822_message_id_candidates(value: &str) -> Vec<String> {
291 let mut ids = Vec::new();
292 let mut rest = value;
293 while let Some(start) = rest.find('<') {
294 let after_start = &rest[start + 1..];
295 let Some(end) = after_start.find('>') else {
296 break;
297 };
298 if let Some(id) = normalize_rfc822_message_id(&after_start[..end]) {
299 ids.push(id);
300 }
301 rest = &after_start[end + 1..];
302 }
303 if ids.is_empty() {
304 ids.extend(
305 value
306 .split_whitespace()
307 .filter_map(normalize_rfc822_message_id),
308 );
309 }
310 ids.sort();
311 ids.dedup();
312 ids
313}
314
315pub(super) fn message_reply_header_ids(message: &MessageFile) -> Vec<String> {
316 let mut ids = Vec::new();
317 if let Some(in_reply_to) = &message.in_reply_to {
318 ids.extend(rfc822_message_id_candidates(in_reply_to));
319 }
320 for reference in &message.references {
321 ids.extend(rfc822_message_id_candidates(reference));
322 }
323 ids.sort();
324 ids.dedup();
325 ids
326}
327
328impl Workspace {
329 pub(crate) fn persist_message_state(&self, message: &MessageFile) -> Result<()> {
330 let state = MessageStateFile::from_message(message);
331 write_message_state_file(&self.root, &state)
332 }
333
334 pub(crate) fn persist_message_remote(&self, message: &MessageFile) -> Result<()> {
335 let path = message_remote_path(&self.root, &message.message_id);
336 if let Some(remote) = MessageRemoteFile::from_message(message) {
337 write_message_remote_file(&self.root, &remote)
338 } else if path.exists() {
339 remove_file(&path)
340 } else {
341 Ok(())
342 }
343 }
344
345 pub(crate) fn write_message_materialized_cache(&self, message: &MessageFile) -> Result<()> {
346 let mut message = message.clone();
347 message.schema_name = "message".to_string();
348 message.schema_version = 1;
349 if message.eml_path.is_none() {
350 message.eml_path = Some(format!(".afmail/messages/{}.eml", message.message_id));
351 }
352 let path = self.message_path(&message.message_id);
353 if let Some(parent) = path.parent() {
354 create_dir_all(parent)?;
355 }
356 write_json_pretty(&path, &message)
357 }
358
359 pub(super) fn write_message_cache(&self, message: &MessageFile) -> Result<()> {
360 self.persist_message_state(message)?;
361 self.write_message_materialized_cache(message)
362 }
363
364 pub(crate) fn write_message_artifacts(&self, message: &MessageFile) -> Result<()> {
365 self.persist_message_state(message)?;
366 self.persist_message_remote(message)?;
367 self.write_message_materialized_cache(message)
368 }
369
370 fn materialize_message_cache_if_needed(&self, message_id: &str) -> Result<bool> {
371 if !self.message_cache_needs_materialize(message_id)? {
372 return Ok(false);
373 }
374 self.materialize_message_cache(message_id)?;
375 Ok(true)
376 }
377
378 fn materialize_message_cache(&self, message_id: &str) -> Result<MessageFile> {
379 validate_id("message_id", message_id)?;
380 let eml_path = message_eml_path(&self.root, message_id);
381 let raw = fs::read(&eml_path).map_err(|e| AppError::io("read message eml", &e))?;
382 let prior = read_message(&self.message_path(message_id)).ok();
383 let state = read_message_state_file(&self.root, message_id)?;
384 let remote =
385 read_message_remote_file(&self.root, message_id)?.map(|file| file.remote_state());
386 let workspace = state
387 .as_ref()
388 .map(MessageStateFile::workspace)
389 .transpose()?
390 .unwrap_or_else(default_message_workspace);
391 let direction = self.infer_materialized_direction(&raw, prior.as_ref(), remote.as_ref());
392 let mut parsed = crate::mail::parse_message_with_options(
393 message_id.to_string(),
394 &raw,
395 crate::mail::MessageParseOptions {
396 direction,
397 workspace,
398 remote,
399 received_rfc3339: prior
400 .as_ref()
401 .and_then(|message| message.received_rfc3339.clone()),
402 sent_rfc3339: prior
403 .as_ref()
404 .and_then(|message| message.sent_rfc3339.clone()),
405 attachments: prior
406 .as_ref()
407 .map(|message| message.attachments.clone())
408 .unwrap_or_default(),
409 },
410 )?;
411 self.apply_fetched_attachment_files(message_id, &mut parsed.message.attachments);
412 self.apply_materialized_workspace_overlays(&mut parsed.message)?;
413 self.write_message_materialized_cache(&parsed.message)?;
414 Ok(parsed.message)
415 }
416
417 fn message_cache_needs_materialize(&self, message_id: &str) -> Result<bool> {
418 let cache_path = self.message_path(message_id);
419 if !cache_path.exists() {
420 return Ok(true);
421 }
422 let data = match read_to_string(&cache_path, "read message cache") {
423 Ok(data) => data,
424 Err(_) => return Ok(true),
425 };
426 let value: Value = match serde_json::from_str(&data) {
427 Ok(value) => value,
428 Err(_) => return Ok(true),
429 };
430 if value.get("schema_name").and_then(Value::as_str) != Some("message")
431 || value.get("schema_version").and_then(Value::as_u64) != Some(1)
432 {
433 return Ok(true);
434 }
435 let cache_modified = match fs::metadata(&cache_path).and_then(|meta| meta.modified()) {
436 Ok(time) => time,
437 Err(_) => return Ok(true),
438 };
439 for input in [
440 message_eml_path(&self.root, message_id),
441 message_state_path(&self.root, message_id),
442 message_remote_path(&self.root, message_id),
443 ] {
444 if !input.exists() {
445 continue;
446 }
447 if let Ok(input_modified) = fs::metadata(&input).and_then(|meta| meta.modified()) {
448 if input_modified > cache_modified {
449 return Ok(true);
450 }
451 }
452 }
453 Ok(false)
454 }
455
456 fn infer_materialized_direction(
457 &self,
458 raw: &[u8],
459 prior: Option<&MessageFile>,
460 remote: Option<&RemoteState>,
461 ) -> Option<String> {
462 if let Some(direction) = prior.and_then(|message| message.direction.clone()) {
463 return Some(direction);
464 }
465 if let Some(remote) = remote {
466 if let Ok(config) = MailConfig::load(&self.root) {
467 for location in &remote.locations {
468 if let Some(mailbox_id) = location.mailbox_id.as_deref() {
469 if let Ok(action) = config.pull_action(mailbox_id) {
470 return Some(action.direction.as_str().to_string());
471 }
472 }
473 }
474 }
475 }
476 let local_message_id = mail_parser::MessageParser::default()
477 .parse(raw)
478 .and_then(|message| message.message_id().map(ToString::to_string))
479 .is_some_and(|message_id| {
480 message_id
481 .trim()
482 .trim_matches(|ch| matches!(ch, '<' | '>'))
483 .ends_with("@afmail.local")
484 });
485 Some(if local_message_id {
486 "outbound".to_string()
487 } else {
488 "inbound".to_string()
489 })
490 }
491
492 fn apply_fetched_attachment_files(&self, message_id: &str, attachments: &mut [AttachmentRef]) {
493 let files_dir = self
494 .root
495 .join(".afmail/messages")
496 .join(format!("{message_id}.files"));
497 if !files_dir.is_dir() {
498 return;
499 }
500 for attachment in attachments {
501 if attachment.fetched
502 && attachment
503 .file_path
504 .as_deref()
505 .is_some_and(|path| self.root.join(path).is_file())
506 {
507 continue;
508 }
509 let candidate = files_dir.join(safe_attachment_filename(
510 &attachment.filename,
511 &attachment.part_id,
512 ));
513 if candidate.is_file() {
514 attachment.fetched = true;
515 attachment.file_path = Some(rel_path(&self.root, &candidate));
516 }
517 }
518 }
519
520 fn apply_materialized_workspace_overlays(&self, message: &mut MessageFile) -> Result<()> {
521 if let Some((archive_uid, archived_rfc3339)) = self
522 .direct_archive_state_by_message()?
523 .get(&message.message_id)
524 {
525 message.workspace.status = "archived".to_string();
526 message.workspace.archive_uid = Some(archive_uid.clone());
527 message.workspace.archived_rfc3339 = Some(archived_rfc3339.clone());
528 message.workspace.origin = None;
529 }
530 let cases = CaseIndex::build(self)?;
531 message.workspace.status = self.derived_message_status(message, &cases)?;
532 message.workspace.push = self.pending_push_state_for_message(&message.message_id)?;
533 Ok(())
534 }
535
536 fn pending_push_state_for_message(
537 &self,
538 message_id: &str,
539 ) -> Result<Option<WorkspacePushState>> {
540 let mut state = WorkspacePushState::default();
541 for item in crate::push_queue::pending_items(&self.root)? {
542 if !item.message_ids().iter().any(|id| id == message_id) {
543 continue;
544 }
545 state.pending.push(WorkspacePendingPush {
546 push_id: item.push_id.clone(),
547 kind: item.display_kind(),
548 queued_rfc3339: item.created_rfc3339.clone(),
549 last_error: item.last_error.clone(),
550 });
551 }
552 state.pending.sort_by(|a, b| a.push_id.cmp(&b.push_id));
553 if state.pending.is_empty() && state.last_completed_rfc3339.is_none() {
554 Ok(None)
555 } else {
556 Ok(Some(state))
557 }
558 }
559
560 pub(super) fn rebuild_message_cache_from_eml(&self) -> Result<MessageCacheRebuildTotals> {
561 let messages_dir = self.root.join(".afmail/messages");
562 if !messages_dir.exists() {
563 return Ok(MessageCacheRebuildTotals::default());
564 }
565 let mut totals = MessageCacheRebuildTotals::default();
566 let mut eml_paths = read_dir(&messages_dir, "read message eml cache")?
567 .into_iter()
568 .map(|entry| entry.path())
569 .filter(|path| path.extension().and_then(|s| s.to_str()) == Some("eml"))
570 .collect::<Vec<_>>();
571 eml_paths.sort();
572
573 for eml_path in eml_paths {
574 let Some(message_id) = eml_path.file_stem().and_then(|stem| stem.to_str()) else {
575 continue;
576 };
577 validate_id("message_id", message_id)?;
578 if self.materialize_message_cache_if_needed(message_id)? {
579 totals.rebuilt_count += 1;
580 }
581 }
582
583 for entry in read_dir(&messages_dir, "read message cache")? {
584 let path = entry.path();
585 if path.extension().and_then(|s| s.to_str()) == Some("txt") {
586 remove_file(&path)?;
587 totals.removed_text_cache_count += 1;
588 }
589 }
590 Ok(totals)
591 }
592
593 fn direct_archive_state_by_message(&self) -> Result<BTreeMap<String, (String, String)>> {
594 let mut out = BTreeMap::new();
595 for archive_uid in self.archive_message_category_ids()? {
596 let data = self.read_archive_messages(&archive_uid)?;
597 for item in data.items {
598 out.insert(
599 item.message_id,
600 (archive_uid.clone(), item.archived_rfc3339),
601 );
602 }
603 }
604 Ok(out)
605 }
606
607 pub fn message_show(&self, message_id: &str) -> Result<Value> {
608 self.require_workspace()?;
609 validate_id("message_id", message_id)?;
610 let config = MailConfig::load(&self.root)?;
611 let message = self.read_message_by_id(message_id)?;
612 let body_text = self.message_body_text(&message)?;
613 let flags = message_remote_flags(&message);
614 let unread = !flags.iter().any(|flag| flag.eq_ignore_ascii_case("\\Seen"));
615 let flagged = flags
616 .iter()
617 .any(|flag| flag.eq_ignore_ascii_case("\\Flagged"));
618 let push = message.workspace.push.clone();
619 Ok(json!({
620 "code": "message_show",
621 "message_id": message.message_id.as_str(),
622 "from": message.from.as_deref().unwrap_or(""),
623 "to": &message.to,
624 "cc": &message.cc,
625 "bcc": &message.bcc,
626 "reply_to": &message.reply_to,
627 "sender": message.sender.as_deref().unwrap_or(""),
628 "subject": message.subject.as_deref().unwrap_or(""),
629 "direction": message.direction.as_deref().unwrap_or(""),
630 "received_rfc3339": message.received_rfc3339.as_deref().unwrap_or(""),
631 "sent_rfc3339": message.sent_rfc3339.as_deref().unwrap_or(""),
632 "body_text": body_text,
633 "attachment_count": message.attachments.len(),
634 "attachments": attachment_metadata_values(&message.attachments),
635 "mailbox_ids": message_mailbox_ids(&message, &config),
636 "flags": flags,
637 "unread": unread,
638 "flagged": flagged,
639 "remote_missing": message_remote_missing(&message),
640 "remote_missing_since_rfc3339": message_remote_missing_since_rfc3339(&message),
641 "remote_effect_pending": message_remote_effect_pending(&message),
642 "push": push,
643 "view_path": self.message_existing_view_path(&message)?,
644 "json_path": format!("messages/{message_id}.json"),
645 "message": message_template_value(&message)?,
646 }))
647 }
648
649 pub fn spam_message(&self, message_id: &str, reason: Option<&str>) -> Result<Value> {
650 self.require_workspace()?;
651 let reason = self.checked_reason(reason)?;
652 let config = crate::config::MailConfig::load(&self.root)?;
653 validate_id("message_id", message_id)?;
654 self.ensure_no_related_conversation(message_id)?;
655 let message_ids = vec![message_id.to_string()];
656 self.ensure_message_ids_unreferenced(&message_ids, None)?;
657 let locations = self.message_remote_locations(&message_ids)?;
658 let transaction = self.begin_transaction(
659 "message_spam",
660 vec![
661 format!("messages/{message_id}.json"),
662 ".afmail/push".to_string(),
663 ],
664 )?;
665 let item = crate::push_queue::queue_action_steps(
666 &self.root,
667 "message.spam",
668 &message_ids,
669 &locations,
670 &config.actions.message_spam.steps,
671 None,
672 )?;
673 self.update_messages_workspace(&message_ids, "spam")?;
674 if let Some(item) = &item {
675 self.record_pending_push_item(item)?;
676 }
677 self.refresh_disposition_views()?;
678 transaction.commit()?;
679 self.append_audit_event(
680 "message_spam_marked",
681 vec![audit_target("message", message_id)],
682 reason,
683 json!({"message_id": message_id, "special_use": SpecialUseKind::Junk.as_str()}),
684 )?;
685 serde_json::to_value(MessageDispositionResult {
686 code: "message_spam_marked",
687 message_id: message_id.to_string(),
688 special_use: SpecialUseKind::Junk.as_str().to_string(),
689 message_ids,
690 location_count: locations.len(),
691 queued_location_count: locations.len(),
692 queued: item.is_some(),
693 push_id: item.as_ref().map(|item| item.push_id.clone()),
694 })
695 .map_err(|e| AppError::json("serialize message disposition result", &e))
696 }
697
698 pub fn unspam_message(&self, message_id: &str, reason: Option<&str>) -> Result<Value> {
699 self.restore_local_message_disposition(
700 message_id,
701 "spam",
702 "message.spam",
703 "message_unspammed",
704 "message_unspammed",
705 reason,
706 )
707 }
708
709 pub fn archive_message(
710 &self,
711 message_id: &str,
712 archive_ref: &str,
713 summary: Option<&str>,
714 reason: Option<&str>,
715 ) -> Result<Value> {
716 self.require_workspace()?;
717 let reason = self.checked_reason(reason)?;
718 validate_id("message_id", message_id)?;
719 let (archive_uid, archive_dir) = self.resolve_archive_message_category(archive_ref)?;
720 self.ensure_no_related_conversation(message_id)?;
721 let transaction = self.begin_transaction(
722 "message_archive",
723 vec![
724 format!("messages/{message_id}.json"),
725 format!("archive/notifications/{archive_uid}"),
726 ".afmail/push".to_string(),
727 ],
728 )?;
729 let archived_rfc3339 = self.set_direct_message_archive(message_id, &archive_uid)?;
730 self.upsert_archive_message_item(&archive_uid, message_id, summary, &archived_rfc3339)?;
731 self.refresh_archive_message_category(&archive_uid)?;
732 let queue = self.queue_archive_for_archived_messages(&[message_id.to_string()], None)?;
733 transaction.commit()?;
734 self.append_audit_event(
735 "message_archived",
736 vec![
737 audit_target("message", message_id),
738 audit_target("archive", &archive_uid),
739 ],
740 reason,
741 json!({
742 "message_id": message_id,
743 "archive_uid": archive_uid,
744 "summary": summary,
745 "to_path": format!("{}/views/messages/{message_id}.md", rel_path(&self.root, &archive_dir)),
746 }),
747 )?;
748 serde_json::to_value(MessageArchiveResult {
749 code: "message_archived",
750 message_id: message_id.to_string(),
751 archive_uid,
752 path: rel_path(&self.root, &archive_dir),
753 special_use: SpecialUseKind::Archive.as_str().to_string(),
754 eligible_message_ids: queue.eligible_message_ids,
755 location_count: queue.location_count,
756 queued_location_count: queue.queued_location_count,
757 queued: !queue.items.is_empty(),
758 push_ids: queue
759 .items
760 .iter()
761 .map(|item| item.push_id.clone())
762 .collect::<Vec<_>>(),
763 push_id: queue.items.first().map(|item| item.push_id.clone()),
764 })
765 .map_err(|e| AppError::json("serialize message archive result", &e))
766 }
767
768 pub fn trash_message(&self, message_id: &str, reason: Option<&str>) -> Result<Value> {
769 self.require_workspace()?;
770 let reason = self.checked_reason(reason)?;
771 let config = crate::config::MailConfig::load(&self.root)?;
772 validate_id("message_id", message_id)?;
773 self.ensure_no_related_conversation(message_id)?;
774 let message_ids = vec![message_id.to_string()];
775 self.ensure_message_ids_unreferenced(&message_ids, None)?;
776 let locations = self.message_remote_locations(&message_ids)?;
777 let transaction = self.begin_transaction(
778 "message_trash",
779 vec![
780 format!("messages/{message_id}.json"),
781 ".afmail/push".to_string(),
782 ],
783 )?;
784 let item = crate::push_queue::queue_action_steps(
785 &self.root,
786 "message.trash",
787 &message_ids,
788 &locations,
789 &config.actions.message_trash.steps,
790 None,
791 )?;
792 self.update_messages_workspace(&message_ids, "trashed")?;
793 if let Some(item) = &item {
794 self.record_pending_push_item(item)?;
795 }
796 self.refresh_disposition_views()?;
797 transaction.commit()?;
798 self.append_audit_event(
799 "message_trashed",
800 vec![audit_target("message", message_id)],
801 reason,
802 json!({"message_id": message_id, "special_use": SpecialUseKind::Trash.as_str()}),
803 )?;
804 serde_json::to_value(MessageDispositionResult {
805 code: "message_trashed",
806 message_id: message_id.to_string(),
807 special_use: SpecialUseKind::Trash.as_str().to_string(),
808 message_ids,
809 location_count: locations.len(),
810 queued_location_count: locations.len(),
811 queued: item.is_some(),
812 push_id: item.as_ref().map(|item| item.push_id.clone()),
813 })
814 .map_err(|e| AppError::json("serialize message disposition result", &e))
815 }
816
817 pub fn untrash_message(&self, message_id: &str, reason: Option<&str>) -> Result<Value> {
818 self.restore_local_message_disposition(
819 message_id,
820 "trashed",
821 "message.trash",
822 "message_untrashed",
823 "message_untrashed",
824 reason,
825 )
826 }
827
828 pub fn unarchive_message(&self, message_id: &str, reason: Option<&str>) -> Result<Value> {
829 self.require_workspace()?;
830 let reason = self.checked_reason(reason)?;
831 validate_id("message_id", message_id)?;
832 let message = self.read_message_by_id(message_id)?;
833 let archive_uid = message.workspace.archive_uid.clone().ok_or_else(|| {
834 AppError::new(
835 "invalid_request",
836 format!("message {message_id} is not directly archived"),
837 )
838 })?;
839 self.restore_direct_archive_message(
840 &archive_uid,
841 message_id,
842 reason,
843 "message_unarchived",
844 "message_unarchived",
845 )
846 }
847
848 pub(super) fn message_existing_view_path(
849 &self,
850 message: &MessageFile,
851 ) -> Result<Option<String>> {
852 let message_id = message.message_id.as_str();
853 let triage_path = self.root.join("triage").join(format!("{message_id}.md"));
854 if triage_path.is_file() {
855 return Ok(Some(rel_path(&self.root, &triage_path)));
856 }
857 for dir in ["spam", "trash", "deleted"] {
858 let path = self.root.join(dir).join(format!("{message_id}.md"));
859 if path.is_file() {
860 return Ok(Some(rel_path(&self.root, &path)));
861 }
862 }
863 for (case_uid, case_path) in self.all_case_entries()? {
864 let messages_path = case_messages_json_path(&case_path);
865 let case_messages = read_case_messages(&messages_path, &case_uid);
866 if case_messages
867 .as_ref()
868 .map(|messages| messages.message_ids.iter().any(|id| id == message_id))
869 .unwrap_or(false)
870 {
871 let message_view = case_message_view_path(&case_path, message_id);
872 if message_view.is_file() {
873 return Ok(Some(rel_path(&self.root, &message_view)));
874 }
875 let case_view = case_path.join("case.md");
876 if case_view.is_file() {
877 return Ok(Some(rel_path(&self.root, &case_view)));
878 }
879 }
880 }
881 if let Some(archive_uid) = message.workspace.archive_uid.as_deref() {
882 let archive_view = self.archive_message_view_path(archive_uid, message_id);
883 if archive_view.is_file() {
884 return Ok(Some(rel_path(&self.root, &archive_view)));
885 }
886 }
887 Ok(None)
888 }
889
890 pub(super) fn message_body_text(&self, message: &MessageFile) -> Result<String> {
891 Ok(message.body_text.clone())
892 }
893
894 pub(super) fn message_conversation_for_dir(
895 &self,
896 message: &MessageFile,
897 output_dir: Option<&Path>,
898 ) -> Result<String> {
899 let config = MailConfig::load(&self.root)?;
900 let mut renderer = MarkdownTemplateRenderer::new(&self.root, config.template_language());
901 self.message_conversation_with_renderer(message, &config, &mut renderer, output_dir)
902 }
903
904 pub(super) fn message_conversation_with_renderer(
905 &self,
906 message: &MessageFile,
907 config: &MailConfig,
908 renderer: &mut MarkdownTemplateRenderer<'_>,
909 output_dir: Option<&Path>,
910 ) -> Result<String> {
911 let body_text = self.message_body_text(message)?;
912 renderer.render(
913 TemplateKey::MessageSection,
914 &message_section_context(
915 Some(&self.root),
916 message,
917 &body_text,
918 config.template_language(),
919 config
920 .smtp
921 .from
922 .as_deref()
923 .or(config.imap.username.as_deref()),
924 output_dir,
925 )?,
926 )
927 }
928
929 pub(super) fn rfc822_message_id_index(&self) -> Result<BTreeMap<String, String>> {
930 self.rebuild_message_cache_from_eml()?;
931 let mut index = BTreeMap::new();
932 for path in message_json_paths(&self.root)? {
933 let message = read_message(&path)?;
934 if let Some(normalized) = message
935 .rfc822_message_id
936 .as_deref()
937 .and_then(normalize_rfc822_message_id)
938 {
939 index
940 .entry(normalized)
941 .or_insert_with(|| message.message_id.clone());
942 }
943 }
944 Ok(index)
945 }
946
947 pub(super) fn related_message_ids(&self, message_id: &str) -> Result<Vec<String>> {
948 validate_id("message_id", message_id)?;
949 let current = self.read_message_by_id(message_id)?;
950 let rfc822_index = self.rfc822_message_id_index()?;
951 let mut related = BTreeSet::new();
952
953 for header_id in message_reply_header_ids(¤t) {
954 if let Some(local_message_id) = rfc822_index.get(&header_id) {
955 if local_message_id != message_id {
956 related.insert(local_message_id.clone());
957 }
958 }
959 }
960
961 let Some(current_rfc822_id) = current
962 .rfc822_message_id
963 .as_deref()
964 .and_then(normalize_rfc822_message_id)
965 else {
966 return Ok(related.into_iter().collect());
967 };
968
969 for path in message_json_paths(&self.root)? {
970 let other = read_message(&path)?;
971 if other.message_id == message_id {
972 continue;
973 }
974 if message_reply_header_ids(&other)
975 .iter()
976 .any(|header_id| header_id == ¤t_rfc822_id)
977 {
978 related.insert(other.message_id);
979 }
980 }
981
982 Ok(related.into_iter().collect())
983 }
984
985 pub(super) fn ensure_no_related_conversation(&self, message_id: &str) -> Result<()> {
986 let related_message_ids = self.related_message_ids(message_id)?;
987 if related_message_ids.is_empty() {
988 return Ok(());
989 }
990 let mut suggested_commands = vec![format!(
991 "afmail case create --name NAME --message {message_id} --reason TEXT"
992 )];
993 for related_id in &related_message_ids {
994 suggested_commands.push(format!(
995 "afmail case add CASE_REF {related_id} --reason TEXT"
996 ));
997 }
998 suggested_commands.push("afmail case archive CASE_REF --reason TEXT".to_string());
999 Err(AppError::new(
1000 "message_has_related_conversation_use_case",
1001 "message has RFC-header-confirmed related conversation",
1002 )
1003 .with_hint(
1004 "Create a case for the conversation, add the related messages, then archive the case.",
1005 )
1006 .with_details(json!({
1007 "message_id": message_id,
1008 "related_message_ids": related_message_ids,
1009 "suggested_commands": suggested_commands
1010 })))
1011 }
1012
1013 pub(super) fn refresh_messages_after_ref_change(&self, message_ids: &[String]) -> Result<()> {
1014 for message_id in message_ids {
1015 self.refresh_message_after_ref_change(message_id)?;
1016 }
1017 Ok(())
1018 }
1019
1020 pub(super) fn refresh_read_views_after_message_change(&self, message_id: &str) -> Result<()> {
1021 validate_id("message_id", message_id)?;
1022 let message = self.read_message_by_id(message_id)?;
1023 let cases = CaseIndex::build(self)?;
1024 if self.triage_candidate(&message, &cases)? {
1025 self.write_triage_view(&message)?;
1026 } else {
1027 self.remove_triage_view_for_message(message_id)?;
1028 }
1029 self.refresh_all_case_message_views()?;
1030 self.refresh_archive_indexes()
1031 }
1032
1033 pub(super) fn refresh_message_after_ref_change(&self, message_id: &str) -> Result<()> {
1034 validate_id("message_id", message_id)?;
1035 let mut msg = self.read_message_by_id(message_id)?;
1036 let cases = CaseIndex::build(self)?;
1037 msg.workspace.status = self.derived_message_status(&msg, &cases)?;
1038 msg.workspace.remote_sync = None;
1039 self.write_message_cache(&msg)?;
1040 if self.triage_candidate(&msg, &cases)? {
1041 self.write_triage_view(&msg)?;
1042 } else {
1043 self.remove_triage_view_for_message(message_id)?;
1044 }
1045 Ok(())
1046 }
1047
1048 pub(super) fn update_messages_workspace(
1049 &self,
1050 message_ids: &[String],
1051 status: &str,
1052 ) -> Result<()> {
1053 let status = MessageStatus::parse(status)?;
1054 for message_id in message_ids {
1055 validate_id("message_id", message_id)?;
1056 let mut msg = self.read_message_by_id(message_id)?;
1057 msg.workspace.status = status.as_str().to_string();
1058 if matches!(status, MessageStatus::Spam | MessageStatus::Trashed) {
1059 msg.workspace.archive_uid = None;
1060 msg.workspace.archived_rfc3339 = None;
1061 msg.workspace.origin = None;
1062 }
1063 msg.workspace.remote_sync = None;
1064 self.write_message_cache(&msg)?;
1065 self.remove_triage_view_for_message(message_id)?;
1066 }
1067 Ok(())
1068 }
1069
1070 pub(super) fn restore_local_message_disposition(
1071 &self,
1072 message_id: &str,
1073 expected_status: &str,
1074 push_kind: &str,
1075 event_kind: &str,
1076 result_code: &str,
1077 reason: Option<&str>,
1078 ) -> Result<Value> {
1079 self.require_workspace()?;
1080 let reason = self.checked_reason(reason)?;
1081 validate_id("message_id", message_id)?;
1082 let mut message = self.read_message_by_id(message_id)?;
1083 if message.workspace.status != expected_status {
1084 return Err(AppError::new(
1085 "invalid_request",
1086 format!("message {message_id} is not {expected_status}"),
1087 ));
1088 }
1089 let removed_push =
1090 crate::push_queue::remove_pending_message_pushes(&self.root, message_id, push_kind)?;
1091 let push_ids = removed_push
1092 .iter()
1093 .map(|item| item.push_id.clone())
1094 .collect::<Vec<_>>();
1095 message.workspace.status = "triage".to_string();
1096 message.workspace.remote_sync = None;
1097 self.write_message_cache(&message)?;
1098 self.refresh_message_after_ref_change(message_id)?;
1099 self.clear_message_pending_pushes(message_id, &push_ids, false)?;
1100 self.refresh_disposition_views()?;
1101 self.append_audit_event(
1102 event_kind,
1103 vec![audit_target("message", message_id)],
1104 reason,
1105 json!({
1106 "message_id": message_id,
1107 "from_status": expected_status,
1108 "to_status": "triage",
1109 "removed_push_ids": push_ids.clone(),
1110 }),
1111 )?;
1112 Ok(json!({
1113 "code": result_code,
1114 "message_id": message_id,
1115 "from_status": expected_status,
1116 "status": "triage",
1117 "triage_path": format!("triage/{message_id}.md"),
1118 "removed_push_count": push_ids.len(),
1119 "push_ids": push_ids,
1120 }))
1121 }
1122
1123 pub(super) fn remove_triage_view_for_message(&self, message_id: &str) -> Result<()> {
1124 let path = self.root.join("triage").join(format!("{message_id}.md"));
1125 if path.exists() {
1126 remove_file(&path)?;
1127 }
1128 Ok(())
1129 }
1130
1131 pub(crate) fn read_message_by_id(&self, message_id: &str) -> Result<MessageFile> {
1132 validate_id("message_id", message_id)?;
1133 if message_eml_path(&self.root, message_id).is_file() {
1134 self.materialize_message_cache_if_needed(message_id)?;
1135 }
1136 let path = self.message_path(message_id);
1137 read_message(&path)
1138 }
1139
1140 pub(crate) fn relocate_message(
1141 &self,
1142 message_id: &str,
1143 target_locations: &[crate::types::RemoteLocation],
1144 ) -> Result<()> {
1145 validate_id("message_id", message_id)?;
1146 let mut locations: Vec<crate::types::RemoteLocation> = Vec::new();
1147 for location in target_locations {
1148 if !locations.iter().any(|existing| {
1149 existing.mailbox_name == location.mailbox_name
1150 && existing.uid_validity == location.uid_validity
1151 && existing.uid == location.uid
1152 }) {
1153 locations.push(location.clone());
1154 }
1155 }
1156 if locations.is_empty() {
1157 return Ok(());
1158 };
1159 let mut message = self.read_message_by_id(message_id)?;
1160 message.remote = Some(crate::types::RemoteState { locations });
1161 self.persist_message_remote(&message)?;
1162 self.write_message_materialized_cache(&message)?;
1163 Ok(())
1164 }
1165}