1mod execute;
2mod io;
3mod preview;
4
5use execute::*;
6use io::*;
7use preview::*;
8
9use crate::config::{ActionStep, ActionStepOn, MailConfig, SpecialUseKind};
10use crate::error::{AppError, Result};
11use crate::frontmatter::DraftFrontmatter;
12use crate::progress::ProgressCallback;
13use crate::types::{
14 MessageActionPush, MessagePushAction, OutboundPush, PushItem, PushLocation, PushPayload,
15 PushStepState, PushStepStatus,
16};
17use crate::util::{write_bytes_atomic, write_string_atomic};
18use serde_json::{json, Value};
19use std::collections::BTreeMap;
20use std::fs;
21use std::path::{Component, Path, PathBuf};
22
23#[derive(Clone, Copy, Debug, PartialEq, Eq)]
24pub enum PushMode {
25 All,
26 Drafts,
27 DraftsSend,
28 Archive,
29 Spam,
30 Trash,
31}
32
33impl PushMode {
34 pub(crate) fn as_str(self) -> &'static str {
35 match self {
36 Self::All => "all",
37 Self::Drafts => "drafts",
38 Self::DraftsSend => "drafts-send",
39 Self::Archive => "archive",
40 Self::Spam => "spam",
41 Self::Trash => "trash",
42 }
43 }
44}
45
46#[derive(Clone, Debug)]
47pub struct RemovedOutbound {
48 pub push_id: String,
49 pub eml_path: Option<String>,
50}
51
52#[derive(Clone, Debug)]
53pub struct RemovedMessagePush {
54 pub push_id: String,
55}
56
57pub fn queue_outbound(
58 root: &Path,
59 case_path: &Path,
60 case_uid: &str,
61 draft_name: &str,
62 draft_hash: &str,
63 config: &MailConfig,
64) -> Result<Value> {
65 let existing = find_outbound_item(root, case_uid, draft_name)?;
66 let push_id = existing
67 .as_ref()
68 .map(|item| item.push_id.clone())
69 .unwrap_or_else(|| unique_push_id(root));
70 let message_id = existing
71 .as_ref()
72 .and_then(|item| item.outbound())
73 .map(|outbound| outbound.message_id.as_str());
74 let prepared = crate::smtp_send::prepare_outbound(
75 root, case_path, case_uid, draft_name, config, message_id,
76 )?;
77 let draft_text = fs::read_to_string(case_path.join("drafts").join(draft_name))
78 .map_err(|e| AppError::io("read draft", &e))?;
79 let (frontmatter, _) = crate::markdown::read_doc::<DraftFrontmatter>(&draft_text)?;
80 let push_dir = root.join(".afmail/push");
81 create_dir_all(&push_dir)?;
82 let eml_path = push_dir.join(format!("{push_id}.eml"));
83 write_bytes_atomic(&eml_path, &prepared.raw, "write push eml")?;
84 let now = crate::store::now_rfc3339();
85 let item = PushItem {
86 schema_name: "push_item".to_string(),
87 schema_version: 1,
88 push_id: push_id.clone(),
89 payload: PushPayload::Outbound(Box::new(OutboundPush {
90 case_uid: case_uid.to_string(),
91 draft_name: draft_name.to_string(),
92 draft_hash: draft_hash.to_string(),
93 message_id: prepared.message_id.clone(),
94 reply_to_message_id: frontmatter.reply_to_message_id,
95 eml_path: rel_path(root, &eml_path),
96 envelope_from: prepared.envelope_from,
97 envelope_to: prepared.envelope_to,
98 drafts_mailbox_name: config.special_use_folder(SpecialUseKind::Drafts),
99 sent_mailbox_name: config.special_use_folder(SpecialUseKind::Sent),
100 draft_uid_validity: existing
101 .as_ref()
102 .and_then(|item| item.outbound())
103 .and_then(|outbound| outbound.draft_uid_validity),
104 draft_uid: existing
105 .as_ref()
106 .and_then(|item| item.outbound())
107 .and_then(|outbound| outbound.draft_uid),
108 draft_save_steps: config.actions.draft_save.steps.clone(),
109 draft_send_steps: config.actions.draft_send.steps.clone(),
110 })),
111 created_rfc3339: existing
112 .as_ref()
113 .map(|item| item.created_rfc3339.clone())
114 .unwrap_or_else(|| now.clone()),
115 updated_rfc3339: now,
116 attempt_count: existing.as_ref().map_or(0, |item| item.attempt_count),
117 step_states: existing
118 .as_ref()
119 .map(|item| item.step_states.clone())
120 .unwrap_or_default(),
121 last_error: None,
122 };
123 write_item(root, &item)?;
124 Ok(json!({
125 "code": "push_queued",
126 "push_id": push_id,
127 "kind": "outbound",
128 "case_uid": case_uid,
129 "draft_name": draft_name,
130 "draft_hash": draft_hash,
131 "message_id": prepared.message_id
132 }))
133}
134
135pub fn queue_action_steps(
136 root: &Path,
137 kind: &str,
138 message_ids: &[String],
139 locations: &[PushLocation],
140 steps: &[ActionStep],
141 reply_to_message_id: Option<String>,
142) -> Result<Option<PushItem>> {
143 if locations.is_empty() || steps.is_empty() {
144 return Ok(None);
145 }
146 let action = MessagePushAction::from_kind(kind).ok_or_else(|| {
147 AppError::new(
148 "push_item_invalid",
149 format!("unsupported message push action kind: {kind}"),
150 )
151 })?;
152 let push_dir = root.join(".afmail/push");
153 create_dir_all(&push_dir)?;
154 let push_id = unique_push_id(root);
155 let now = crate::store::now_rfc3339();
156 let item = PushItem {
157 schema_name: "push_item".to_string(),
158 schema_version: 1,
159 push_id,
160 payload: PushPayload::MessageAction(MessageActionPush {
161 action,
162 message_ids: message_ids.to_vec(),
163 locations: locations.to_vec(),
164 steps: steps.to_vec(),
165 reply_to_message_id,
166 }),
167 created_rfc3339: now.clone(),
168 updated_rfc3339: now,
169 attempt_count: 0,
170 step_states: Vec::new(),
171 last_error: None,
172 };
173 write_item(root, &item)?;
174 Ok(Some(item))
175}
176
177#[derive(Clone, Debug, Default, serde::Serialize)]
178pub struct PushStatus {
179 pub drafts: usize,
181 pub case: usize,
183 pub archive: usize,
185 pub spam: usize,
187 pub trash: usize,
189}
190
191pub fn push_status(root: &Path) -> Result<PushStatus> {
192 let mut status = PushStatus::default();
193 for item in sorted_items(root)? {
194 match item_summary_label(&item) {
195 "drafts" => status.drafts += 1,
196 "case" => status.case += 1,
197 "archive" => status.archive += 1,
198 "spam" => status.spam += 1,
199 "trash" => status.trash += 1,
200 _ => {}
201 }
202 }
203 Ok(status)
204}
205
206pub fn list(root: &Path) -> Result<Value> {
207 let items = sorted_items(root)?;
208 Ok(json!({
209 "code": "push_list",
210 "count": items.len(),
211 "items": items
212 }))
213}
214
215pub(crate) fn pending_items(root: &Path) -> Result<Vec<PushItem>> {
216 sorted_items(root)
217}
218
219pub fn push(root: &Path, mode: PushMode, confirmed: bool) -> Result<Value> {
220 push_with_progress(root, mode, confirmed, None)
221}
222
223pub fn push_with_progress(
224 root: &Path,
225 mode: PushMode,
226 confirmed: bool,
227 progress: Option<&mut ProgressCallback<'_>>,
228) -> Result<Value> {
229 let mut progress = progress;
230 let items = filtered_items(root, mode)?;
231 if !confirmed {
232 crate::progress::emit(
233 &mut progress,
234 "push_preview",
235 json!({
236 "mode": mode.as_str(),
237 "item_count": items.len(),
238 }),
239 );
240 let rendered = items
241 .iter()
242 .map(|item| {
243 let outbound = item.outbound();
244 json!({
245 "push_id": item.push_id,
246 "kind": item.kind(),
247 "display_kind": item.display_kind(),
248 "actions": actions_for(mode, item),
249 "case_uid": outbound.map(|outbound| outbound.case_uid.as_str()),
250 "draft_name": outbound.map(|outbound| outbound.draft_name.as_str())
251 })
252 })
253 .collect::<Vec<_>>();
254 return Ok(json!({
255 "code": "push_dry_run",
256 "confirmed": false,
257 "hint": preview_hint(mode),
258 "items": rendered,
259 "count": rendered.len()
260 }));
261 }
262
263 let config = MailConfig::load(root)?;
264 let remote = crate::remote::ImapSmtpRemote::new(&config);
265 let mut pushed = 0usize;
266 let mut failed = 0usize;
267 let mut failures = Vec::new();
268 let item_count = items.len();
269 crate::progress::emit(
270 &mut progress,
271 "push_start",
272 json!({
273 "mode": mode.as_str(),
274 "item_count": item_count,
275 }),
276 );
277 for (index, mut item) in items.into_iter().enumerate() {
278 let progress_context = PushProgressContext {
279 item_index: index,
280 item_count,
281 };
282 crate::progress::emit(
283 &mut progress,
284 "push_item_start",
285 push_item_progress_fields(mode, &item, index, item_count, None),
286 );
287 let result = match (&item.payload, mode) {
288 (PushPayload::Outbound(_), PushMode::DraftsSend) => {
289 ensure_outbound_draft_fresh(root, &item).and_then(|_| {
290 push_outbound_send(
291 root,
292 &config,
293 &remote,
294 &mut item,
295 progress_context,
296 progress.as_deref_mut(),
297 )
298 })
299 }
300 (PushPayload::Outbound(_), PushMode::Drafts | PushMode::All) => {
301 ensure_outbound_draft_fresh(root, &item).and_then(|_| {
302 push_outbound_drafts(
303 root,
304 &config,
305 &remote,
306 &mut item,
307 progress_context,
308 progress.as_deref_mut(),
309 )
310 })
311 }
312 (PushPayload::MessageAction(_), _) => push_action_steps(
313 root,
314 &config,
315 &remote,
316 &mut item,
317 progress_context,
318 progress.as_deref_mut(),
319 ),
320 _ => Ok(()),
321 };
322 match result {
323 Ok(()) => {
324 let workspace = crate::store::Workspace::at(root);
325 let transaction = workspace.begin_transaction(
326 "push_commit",
327 vec![
328 format!(".afmail/push/{}.json", item.push_id),
329 "messages".to_string(),
330 ],
331 )?;
332 workspace.clear_pending_push_item(&item)?;
333 delete_item(root, &item)?;
334 transaction.commit()?;
335 let _ = audit_push(root, "push_succeeded", &item, None);
336 pushed += 1;
337 crate::progress::emit(
338 &mut progress,
339 "push_item_done",
340 push_item_progress_fields(mode, &item, index, item_count, None),
341 );
342 }
343 Err(err) => {
344 let _ = audit_push(root, "push_failed", &item, Some(&err));
345 failed += 1;
346 failures.push(json!({
347 "push_id": item.push_id,
348 "error_code": err.error_code,
349 "error": err.message
350 }));
351 item.attempt_count += 1;
352 item.updated_rfc3339 = crate::store::now_rfc3339();
353 item.last_error = Some(err.to_string());
354 crate::store::Workspace::at(root)
355 .mark_pending_push_error(&item, &err.to_string())?;
356 write_item(root, &item)?;
357 crate::progress::emit(
358 &mut progress,
359 "push_item_failed",
360 push_item_progress_fields(mode, &item, index, item_count, Some(&err)),
361 );
362 }
363 }
364 }
365 crate::progress::emit(
366 &mut progress,
367 "push_done",
368 json!({
369 "mode": mode.as_str(),
370 "item_count": item_count,
371 "pushed_count": pushed,
372 "failed_count": failed,
373 }),
374 );
375 Ok(json!({
376 "code": "push_result",
377 "confirmed": true,
378 "pushed_count": pushed,
379 "failed_count": failed,
380 "failures": failures
381 }))
382}
383
384fn push_item_progress_fields(
385 mode: PushMode,
386 item: &PushItem,
387 index: usize,
388 item_count: usize,
389 err: Option<&AppError>,
390) -> Value {
391 let mut value = json!({
392 "mode": mode.as_str(),
393 "push_id": item.push_id.as_str(),
394 "kind": item.kind(),
395 "display_kind": item.display_kind(),
396 "index": index + 1,
397 "item_count": item_count,
398 });
399 if let Some(err) = err {
400 if let Value::Object(map) = &mut value {
401 map.insert("error_code".to_string(), json!(err.error_code));
402 map.insert("error".to_string(), json!(err.message.as_str()));
403 map.insert("retryable".to_string(), json!(err.retryable));
404 }
405 }
406 value
407}
408
409fn audit_push(root: &Path, kind: &str, item: &PushItem, err: Option<&AppError>) -> Result<()> {
410 let mut targets = vec![json!({"kind": "push", "id": item.push_id.as_str()})];
411 if let Some(outbound) = item.outbound() {
412 targets.push(json!({"kind": "case", "id": outbound.case_uid.as_str()}));
413 targets.push(json!({"kind": "message", "id": outbound.message_id.as_str()}));
414 } else {
415 targets.extend(
416 item.message_ids()
417 .iter()
418 .map(|message_id| json!({"kind": "message", "id": message_id})),
419 );
420 }
421 let mut fields = json!({
422 "push_id": item.push_id.as_str(),
423 "push_kind": item.display_kind(),
424 "succeeded_step_count": item.succeeded_step_count(),
425 "attempt_count": item.attempt_count,
426 });
427 if let Some(outbound) = item.outbound() {
428 if let Value::Object(map) = &mut fields {
429 map.insert("case_uid".to_string(), json!(outbound.case_uid.as_str()));
430 map.insert(
431 "draft_name".to_string(),
432 json!(outbound.draft_name.as_str()),
433 );
434 map.insert(
435 "message_id".to_string(),
436 json!(outbound.message_id.as_str()),
437 );
438 }
439 }
440 if let Some(err) = err {
441 if let Value::Object(map) = &mut fields {
442 map.insert("error_code".to_string(), json!(err.error_code));
443 map.insert("error".to_string(), json!(err.message.as_str()));
444 map.insert("retryable".to_string(), json!(err.retryable));
445 }
446 }
447 crate::store::Workspace::at(root).append_audit_event(kind, targets, None, fields)
448}
449
450pub fn remove_outbound_for_draft(
451 root: &Path,
452 case_uid: &str,
453 draft_name: &str,
454) -> Result<Vec<RemovedOutbound>> {
455 let items = read_items(root)?
456 .into_iter()
457 .filter(|item| {
458 item.outbound().is_some_and(|outbound| {
459 outbound.case_uid == case_uid && outbound.draft_name == draft_name
460 })
461 })
462 .collect::<Vec<_>>();
463 if let Some(item) = items.iter().find(|item| item.has_started_steps()) {
464 return Err(AppError::new(
465 "push_already_started",
466 format!(
467 "draft has an outbound push item that already started: {}",
468 item.push_id
469 ),
470 ));
471 }
472 let mut removed = Vec::new();
473 for item in items {
474 removed.push(RemovedOutbound {
475 push_id: item.push_id.clone(),
476 eml_path: item.outbound().map(|outbound| outbound.eml_path.clone()),
477 });
478 delete_item(root, &item)?;
479 }
480 Ok(removed)
481}
482
483pub fn remove_pending_message_pushes(
484 root: &Path,
485 message_id: &str,
486 kind: &str,
487) -> Result<Vec<RemovedMessagePush>> {
488 let action = MessagePushAction::from_kind(kind).ok_or_else(|| {
489 AppError::new(
490 "push_item_invalid",
491 format!("unsupported message push action kind: {kind}"),
492 )
493 })?;
494 let items = read_items(root)?
495 .into_iter()
496 .filter(|item| {
497 item.message_action().is_some_and(|payload| {
498 payload.action == action
499 && (payload.message_ids.iter().any(|id| id == message_id)
500 || payload
501 .locations
502 .iter()
503 .any(|loc| loc.message_id == message_id))
504 })
505 })
506 .collect::<Vec<_>>();
507 if let Some(item) = items.iter().find(|item| item.has_started_steps()) {
508 return Err(AppError::new(
509 "push_already_started",
510 format!(
511 "push item already started and cannot be undone locally: {}",
512 item.push_id
513 ),
514 ));
515 }
516
517 let mut removed = Vec::new();
518 for mut item in items {
519 let push_id = item.push_id.clone();
520 if let Some(payload) = item.message_action_mut() {
521 payload.message_ids.retain(|id| id != message_id);
522 payload.locations.retain(|loc| loc.message_id != message_id);
523 }
524 let empty = item
525 .message_action()
526 .is_some_and(|payload| payload.message_ids.is_empty() && payload.locations.is_empty());
527 if empty {
528 delete_item(root, &item)?;
529 } else {
530 item.updated_rfc3339 = crate::store::now_rfc3339();
531 write_item(root, &item)?;
532 }
533 removed.push(RemovedMessagePush { push_id });
534 }
535 Ok(removed)
536}
537
538fn preview_hint(mode: PushMode) -> &'static str {
539 if mode == PushMode::DraftsSend {
540 "No mail was sent. Re-run with --confirm to apply queued effects."
541 } else {
542 "No remote changes were made. Re-run with --confirm to apply queued effects."
543 }
544}