Skip to main content

agent_first_mail/push_queue/
mod.rs

1mod execute;
2mod io;
3mod preview;
4
5use execute::*;
6use io::*;
7use preview::*;
8
9use crate::config::{ActionStep, ActionStepOn, MailConfig, SpecialUseKind};
10use crate::error::{AppError, Result};
11use crate::frontmatter::DraftFrontmatter;
12use crate::progress::ProgressCallback;
13use crate::types::{
14    MessageActionPush, MessagePushAction, OutboundPush, PushItem, PushLocation, PushPayload,
15    PushStepState, PushStepStatus,
16};
17use crate::util::{write_bytes_atomic, write_string_atomic};
18use serde_json::{json, Value};
19use std::collections::BTreeMap;
20use std::fs;
21use std::path::{Component, Path, PathBuf};
22
/// Selects which queued items a push run should handle.
///
/// Every variant has a stable lowercase label, available through `as_str`,
/// which is the value reported as `mode` in push progress events.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum PushMode {
    All,
    Drafts,
    DraftsSend,
    Archive,
    Spam,
    Trash,
}

impl PushMode {
    /// Returns the kebab-case label of this mode, e.g. `"drafts-send"`.
    pub(crate) fn as_str(self) -> &'static str {
        match self {
            PushMode::All => "all",
            PushMode::Drafts => "drafts",
            PushMode::DraftsSend => "drafts-send",
            PushMode::Archive => "archive",
            PushMode::Spam => "spam",
            PushMode::Trash => "trash",
        }
    }
}
45
/// Record of one outbound push item deleted by `remove_outbound_for_draft`.
#[derive(Debug, Clone)]
pub struct RemovedOutbound {
    /// Identifier of the push item that was deleted.
    pub push_id: String,
    /// The `eml_path` stored on the item's outbound payload, if the item had one.
    pub eml_path: Option<String>,
}
51
/// Record of one message-action push item touched by
/// `remove_pending_message_pushes` (either deleted or rewritten without the
/// removed message).
#[derive(Debug, Clone)]
pub struct RemovedMessagePush {
    /// Identifier of the affected push item.
    pub push_id: String,
}
56
57pub fn queue_outbound(
58    root: &Path,
59    case_path: &Path,
60    case_uid: &str,
61    draft_name: &str,
62    draft_hash: &str,
63    config: &MailConfig,
64) -> Result<Value> {
65    let existing = find_outbound_item(root, case_uid, draft_name)?;
66    let push_id = existing
67        .as_ref()
68        .map(|item| item.push_id.clone())
69        .unwrap_or_else(|| unique_push_id(root));
70    let message_id = existing
71        .as_ref()
72        .and_then(|item| item.outbound())
73        .map(|outbound| outbound.message_id.as_str());
74    let prepared = crate::smtp_send::prepare_outbound(
75        root, case_path, case_uid, draft_name, config, message_id,
76    )?;
77    let draft_text = fs::read_to_string(case_path.join("drafts").join(draft_name))
78        .map_err(|e| AppError::io("read draft", &e))?;
79    let (frontmatter, _) = crate::markdown::read_doc::<DraftFrontmatter>(&draft_text)?;
80    let push_dir = root.join(".afmail/push");
81    create_dir_all(&push_dir)?;
82    let eml_path = push_dir.join(format!("{push_id}.eml"));
83    write_bytes_atomic(&eml_path, &prepared.raw, "write push eml")?;
84    let now = crate::store::now_rfc3339();
85    let item = PushItem {
86        schema_name: "push_item".to_string(),
87        schema_version: 1,
88        push_id: push_id.clone(),
89        payload: PushPayload::Outbound(Box::new(OutboundPush {
90            case_uid: case_uid.to_string(),
91            draft_name: draft_name.to_string(),
92            draft_hash: draft_hash.to_string(),
93            message_id: prepared.message_id.clone(),
94            reply_to_message_id: frontmatter.reply_to_message_id,
95            eml_path: rel_path(root, &eml_path),
96            envelope_from: prepared.envelope_from,
97            envelope_to: prepared.envelope_to,
98            drafts_mailbox_name: config.special_use_folder(SpecialUseKind::Drafts),
99            sent_mailbox_name: config.special_use_folder(SpecialUseKind::Sent),
100            draft_uid_validity: existing
101                .as_ref()
102                .and_then(|item| item.outbound())
103                .and_then(|outbound| outbound.draft_uid_validity),
104            draft_uid: existing
105                .as_ref()
106                .and_then(|item| item.outbound())
107                .and_then(|outbound| outbound.draft_uid),
108            draft_save_steps: config.actions.draft_save.steps.clone(),
109            draft_send_steps: config.actions.draft_send.steps.clone(),
110        })),
111        created_rfc3339: existing
112            .as_ref()
113            .map(|item| item.created_rfc3339.clone())
114            .unwrap_or_else(|| now.clone()),
115        updated_rfc3339: now,
116        attempt_count: existing.as_ref().map_or(0, |item| item.attempt_count),
117        step_states: existing
118            .as_ref()
119            .map(|item| item.step_states.clone())
120            .unwrap_or_default(),
121        last_error: None,
122    };
123    write_item(root, &item)?;
124    Ok(json!({
125        "code": "push_queued",
126        "push_id": push_id,
127        "kind": "outbound",
128        "case_uid": case_uid,
129        "draft_name": draft_name,
130        "draft_hash": draft_hash,
131        "message_id": prepared.message_id
132    }))
133}
134
135pub fn queue_action_steps(
136    root: &Path,
137    kind: &str,
138    message_ids: &[String],
139    locations: &[PushLocation],
140    steps: &[ActionStep],
141    reply_to_message_id: Option<String>,
142) -> Result<Option<PushItem>> {
143    if locations.is_empty() || steps.is_empty() {
144        return Ok(None);
145    }
146    let action = MessagePushAction::from_kind(kind).ok_or_else(|| {
147        AppError::new(
148            "push_item_invalid",
149            format!("unsupported message push action kind: {kind}"),
150        )
151    })?;
152    let push_dir = root.join(".afmail/push");
153    create_dir_all(&push_dir)?;
154    let push_id = unique_push_id(root);
155    let now = crate::store::now_rfc3339();
156    let item = PushItem {
157        schema_name: "push_item".to_string(),
158        schema_version: 1,
159        push_id,
160        payload: PushPayload::MessageAction(MessageActionPush {
161            action,
162            message_ids: message_ids.to_vec(),
163            locations: locations.to_vec(),
164            steps: steps.to_vec(),
165            reply_to_message_id,
166        }),
167        created_rfc3339: now.clone(),
168        updated_rfc3339: now,
169        attempt_count: 0,
170        step_states: Vec::new(),
171        last_error: None,
172    };
173    write_item(root, &item)?;
174    Ok(Some(item))
175}
176
177#[derive(Clone, Debug, Default, serde::Serialize)]
178pub struct PushStatus {
179    /// Outbound drafts queued to save or send.
180    pub drafts: usize,
181    /// Case-membership flag operations queued.
182    pub case: usize,
183    /// Archive moves queued, not yet applied on the server.
184    pub archive: usize,
185    /// Spam (junk) moves queued, not yet applied on the server.
186    pub spam: usize,
187    /// Trash moves queued, not yet applied on the server.
188    pub trash: usize,
189}
190
191pub fn push_status(root: &Path) -> Result<PushStatus> {
192    let mut status = PushStatus::default();
193    for item in sorted_items(root)? {
194        match item_summary_label(&item) {
195            "drafts" => status.drafts += 1,
196            "case" => status.case += 1,
197            "archive" => status.archive += 1,
198            "spam" => status.spam += 1,
199            "trash" => status.trash += 1,
200            _ => {}
201        }
202    }
203    Ok(status)
204}
205
206pub fn list(root: &Path) -> Result<Value> {
207    let items = sorted_items(root)?;
208    Ok(json!({
209        "code": "push_list",
210        "count": items.len(),
211        "items": items
212    }))
213}
214
215pub(crate) fn pending_items(root: &Path) -> Result<Vec<PushItem>> {
216    sorted_items(root)
217}
218
219pub fn push(root: &Path, mode: PushMode, confirmed: bool) -> Result<Value> {
220    push_with_progress(root, mode, confirmed, None)
221}
222
223pub fn push_with_progress(
224    root: &Path,
225    mode: PushMode,
226    confirmed: bool,
227    progress: Option<&mut ProgressCallback<'_>>,
228) -> Result<Value> {
229    let mut progress = progress;
230    let items = filtered_items(root, mode)?;
231    if !confirmed {
232        crate::progress::emit(
233            &mut progress,
234            "push_preview",
235            json!({
236                "mode": mode.as_str(),
237                "item_count": items.len(),
238            }),
239        );
240        let rendered = items
241            .iter()
242            .map(|item| {
243                let outbound = item.outbound();
244                json!({
245                    "push_id": item.push_id,
246                    "kind": item.kind(),
247                    "display_kind": item.display_kind(),
248                    "actions": actions_for(mode, item),
249                    "case_uid": outbound.map(|outbound| outbound.case_uid.as_str()),
250                    "draft_name": outbound.map(|outbound| outbound.draft_name.as_str())
251                })
252            })
253            .collect::<Vec<_>>();
254        return Ok(json!({
255            "code": "push_dry_run",
256            "confirmed": false,
257            "hint": preview_hint(mode),
258            "items": rendered,
259            "count": rendered.len()
260        }));
261    }
262
263    let config = MailConfig::load(root)?;
264    let remote = crate::remote::ImapSmtpRemote::new(&config);
265    let mut pushed = 0usize;
266    let mut failed = 0usize;
267    let mut failures = Vec::new();
268    let item_count = items.len();
269    crate::progress::emit(
270        &mut progress,
271        "push_start",
272        json!({
273            "mode": mode.as_str(),
274            "item_count": item_count,
275        }),
276    );
277    for (index, mut item) in items.into_iter().enumerate() {
278        let progress_context = PushProgressContext {
279            item_index: index,
280            item_count,
281        };
282        crate::progress::emit(
283            &mut progress,
284            "push_item_start",
285            push_item_progress_fields(mode, &item, index, item_count, None),
286        );
287        let result = match (&item.payload, mode) {
288            (PushPayload::Outbound(_), PushMode::DraftsSend) => {
289                ensure_outbound_draft_fresh(root, &item).and_then(|_| {
290                    push_outbound_send(
291                        root,
292                        &config,
293                        &remote,
294                        &mut item,
295                        progress_context,
296                        progress.as_deref_mut(),
297                    )
298                })
299            }
300            (PushPayload::Outbound(_), PushMode::Drafts | PushMode::All) => {
301                ensure_outbound_draft_fresh(root, &item).and_then(|_| {
302                    push_outbound_drafts(
303                        root,
304                        &config,
305                        &remote,
306                        &mut item,
307                        progress_context,
308                        progress.as_deref_mut(),
309                    )
310                })
311            }
312            (PushPayload::MessageAction(_), _) => push_action_steps(
313                root,
314                &config,
315                &remote,
316                &mut item,
317                progress_context,
318                progress.as_deref_mut(),
319            ),
320            _ => Ok(()),
321        };
322        match result {
323            Ok(()) => {
324                let workspace = crate::store::Workspace::at(root);
325                let transaction = workspace.begin_transaction(
326                    "push_commit",
327                    vec![
328                        format!(".afmail/push/{}.json", item.push_id),
329                        "messages".to_string(),
330                    ],
331                )?;
332                workspace.clear_pending_push_item(&item)?;
333                delete_item(root, &item)?;
334                transaction.commit()?;
335                let _ = audit_push(root, "push_succeeded", &item, None);
336                pushed += 1;
337                crate::progress::emit(
338                    &mut progress,
339                    "push_item_done",
340                    push_item_progress_fields(mode, &item, index, item_count, None),
341                );
342            }
343            Err(err) => {
344                let _ = audit_push(root, "push_failed", &item, Some(&err));
345                failed += 1;
346                failures.push(json!({
347                    "push_id": item.push_id,
348                    "error_code": err.error_code,
349                    "error": err.message
350                }));
351                item.attempt_count += 1;
352                item.updated_rfc3339 = crate::store::now_rfc3339();
353                item.last_error = Some(err.to_string());
354                crate::store::Workspace::at(root)
355                    .mark_pending_push_error(&item, &err.to_string())?;
356                write_item(root, &item)?;
357                crate::progress::emit(
358                    &mut progress,
359                    "push_item_failed",
360                    push_item_progress_fields(mode, &item, index, item_count, Some(&err)),
361                );
362            }
363        }
364    }
365    crate::progress::emit(
366        &mut progress,
367        "push_done",
368        json!({
369            "mode": mode.as_str(),
370            "item_count": item_count,
371            "pushed_count": pushed,
372            "failed_count": failed,
373        }),
374    );
375    Ok(json!({
376        "code": "push_result",
377        "confirmed": true,
378        "pushed_count": pushed,
379        "failed_count": failed,
380        "failures": failures
381    }))
382}
383
384fn push_item_progress_fields(
385    mode: PushMode,
386    item: &PushItem,
387    index: usize,
388    item_count: usize,
389    err: Option<&AppError>,
390) -> Value {
391    let mut value = json!({
392        "mode": mode.as_str(),
393        "push_id": item.push_id.as_str(),
394        "kind": item.kind(),
395        "display_kind": item.display_kind(),
396        "index": index + 1,
397        "item_count": item_count,
398    });
399    if let Some(err) = err {
400        if let Value::Object(map) = &mut value {
401            map.insert("error_code".to_string(), json!(err.error_code));
402            map.insert("error".to_string(), json!(err.message.as_str()));
403            map.insert("retryable".to_string(), json!(err.retryable));
404        }
405    }
406    value
407}
408
409fn audit_push(root: &Path, kind: &str, item: &PushItem, err: Option<&AppError>) -> Result<()> {
410    let mut targets = vec![json!({"kind": "push", "id": item.push_id.as_str()})];
411    if let Some(outbound) = item.outbound() {
412        targets.push(json!({"kind": "case", "id": outbound.case_uid.as_str()}));
413        targets.push(json!({"kind": "message", "id": outbound.message_id.as_str()}));
414    } else {
415        targets.extend(
416            item.message_ids()
417                .iter()
418                .map(|message_id| json!({"kind": "message", "id": message_id})),
419        );
420    }
421    let mut fields = json!({
422        "push_id": item.push_id.as_str(),
423        "push_kind": item.display_kind(),
424        "succeeded_step_count": item.succeeded_step_count(),
425        "attempt_count": item.attempt_count,
426    });
427    if let Some(outbound) = item.outbound() {
428        if let Value::Object(map) = &mut fields {
429            map.insert("case_uid".to_string(), json!(outbound.case_uid.as_str()));
430            map.insert(
431                "draft_name".to_string(),
432                json!(outbound.draft_name.as_str()),
433            );
434            map.insert(
435                "message_id".to_string(),
436                json!(outbound.message_id.as_str()),
437            );
438        }
439    }
440    if let Some(err) = err {
441        if let Value::Object(map) = &mut fields {
442            map.insert("error_code".to_string(), json!(err.error_code));
443            map.insert("error".to_string(), json!(err.message.as_str()));
444            map.insert("retryable".to_string(), json!(err.retryable));
445        }
446    }
447    crate::store::Workspace::at(root).append_audit_event(kind, targets, None, fields)
448}
449
450pub fn remove_outbound_for_draft(
451    root: &Path,
452    case_uid: &str,
453    draft_name: &str,
454) -> Result<Vec<RemovedOutbound>> {
455    let items = read_items(root)?
456        .into_iter()
457        .filter(|item| {
458            item.outbound().is_some_and(|outbound| {
459                outbound.case_uid == case_uid && outbound.draft_name == draft_name
460            })
461        })
462        .collect::<Vec<_>>();
463    if let Some(item) = items.iter().find(|item| item.has_started_steps()) {
464        return Err(AppError::new(
465            "push_already_started",
466            format!(
467                "draft has an outbound push item that already started: {}",
468                item.push_id
469            ),
470        ));
471    }
472    let mut removed = Vec::new();
473    for item in items {
474        removed.push(RemovedOutbound {
475            push_id: item.push_id.clone(),
476            eml_path: item.outbound().map(|outbound| outbound.eml_path.clone()),
477        });
478        delete_item(root, &item)?;
479    }
480    Ok(removed)
481}
482
483pub fn remove_pending_message_pushes(
484    root: &Path,
485    message_id: &str,
486    kind: &str,
487) -> Result<Vec<RemovedMessagePush>> {
488    let action = MessagePushAction::from_kind(kind).ok_or_else(|| {
489        AppError::new(
490            "push_item_invalid",
491            format!("unsupported message push action kind: {kind}"),
492        )
493    })?;
494    let items = read_items(root)?
495        .into_iter()
496        .filter(|item| {
497            item.message_action().is_some_and(|payload| {
498                payload.action == action
499                    && (payload.message_ids.iter().any(|id| id == message_id)
500                        || payload
501                            .locations
502                            .iter()
503                            .any(|loc| loc.message_id == message_id))
504            })
505        })
506        .collect::<Vec<_>>();
507    if let Some(item) = items.iter().find(|item| item.has_started_steps()) {
508        return Err(AppError::new(
509            "push_already_started",
510            format!(
511                "push item already started and cannot be undone locally: {}",
512                item.push_id
513            ),
514        ));
515    }
516
517    let mut removed = Vec::new();
518    for mut item in items {
519        let push_id = item.push_id.clone();
520        if let Some(payload) = item.message_action_mut() {
521            payload.message_ids.retain(|id| id != message_id);
522            payload.locations.retain(|loc| loc.message_id != message_id);
523        }
524        let empty = item
525            .message_action()
526            .is_some_and(|payload| payload.message_ids.is_empty() && payload.locations.is_empty());
527        if empty {
528            delete_item(root, &item)?;
529        } else {
530            item.updated_rfc3339 = crate::store::now_rfc3339();
531            write_item(root, &item)?;
532        }
533        removed.push(RemovedMessagePush { push_id });
534    }
535    Ok(removed)
536}
537
538fn preview_hint(mode: PushMode) -> &'static str {
539    if mode == PushMode::DraftsSend {
540        "No mail was sent. Re-run with --confirm to apply queued effects."
541    } else {
542        "No remote changes were made. Re-run with --confirm to apply queued effects."
543    }
544}