Skip to main content

agent_first_mail/push_queue/
mod.rs

1mod execute;
2mod io;
3mod preview;
4
5use execute::*;
6use io::*;
7use preview::*;
8
9use crate::config::{ActionStep, ActionStepOn, MailConfig, SpecialUseKind};
10use crate::error::{AppError, Result};
11use crate::frontmatter::DraftFrontmatter;
12use crate::progress::ProgressCallback;
13use crate::types::{
14    MessageActionPush, MessagePushAction, OutboundAction, OutboundPush, PushItem, PushLocation,
15    PushPayload, PushStepState, PushStepStatus,
16};
17use crate::util::write_string_atomic;
18use serde_json::{json, Value};
19use std::collections::BTreeMap;
20use std::fs;
21use std::path::{Path, PathBuf};
22
23#[derive(Clone, Debug)]
24pub struct RemovedOutbound {
25    pub push_id: String,
26    pub action: OutboundAction,
27}
28
29#[derive(Clone, Debug)]
30pub struct RemovedMessagePush {
31    pub push_id: String,
32}
33
34pub fn queue_outbound(
35    root: &Path,
36    case_uid: &str,
37    draft_name: &str,
38    action: OutboundAction,
39) -> Result<Value> {
40    let existing = find_outbound_item(root, case_uid, draft_name)?;
41    let push_id = existing
42        .as_ref()
43        .map(|item| item.push_id.clone())
44        .unwrap_or_else(|| unique_push_id(root));
45    let push_dir = root.join(".afmail/push");
46    create_dir_all(&push_dir)?;
47    let now = crate::store::now_rfc3339();
48    let existing_outbound = existing.as_ref().and_then(|item| item.outbound());
49    let same_action = existing_outbound.is_some_and(|outbound| outbound.action == action);
50    let step_states = if same_action {
51        existing
52            .as_ref()
53            .map(|item| item.step_states.clone())
54            .unwrap_or_default()
55    } else {
56        Vec::new()
57    };
58    let item = PushItem {
59        schema_name: "push_item".to_string(),
60        schema_version: 1,
61        push_id: push_id.clone(),
62        payload: PushPayload::Outbound(Box::new(OutboundPush {
63            action,
64            case_uid: case_uid.to_string(),
65            draft_name: draft_name.to_string(),
66            draft_uid_validity: existing_outbound.and_then(|outbound| outbound.draft_uid_validity),
67            draft_uid: existing_outbound.and_then(|outbound| outbound.draft_uid),
68        })),
69        created_rfc3339: existing
70            .as_ref()
71            .map(|item| item.created_rfc3339.clone())
72            .unwrap_or_else(|| now.clone()),
73        updated_rfc3339: now,
74        attempt_count: if same_action {
75            existing.as_ref().map_or(0, |item| item.attempt_count)
76        } else {
77            0
78        },
79        step_states,
80        last_error: None,
81    };
82    write_item(root, &item)?;
83    Ok(json!({
84        "code": "push_queued",
85        "push_id": push_id,
86        "kind": "outbound",
87        "action": action.as_str(),
88        "case_uid": case_uid,
89        "draft_name": draft_name
90    }))
91}
92
93pub(crate) fn find_outbound_for_draft(
94    root: &Path,
95    case_uid: &str,
96    draft_name: &str,
97) -> Result<Option<PushItem>> {
98    find_outbound_item(root, case_uid, draft_name)
99}
100
101pub fn queue_action_steps(
102    root: &Path,
103    kind: &str,
104    message_ids: &[String],
105    locations: &[PushLocation],
106    steps: &[ActionStep],
107    reply_to_message_id: Option<String>,
108) -> Result<Option<PushItem>> {
109    if locations.is_empty() || steps.is_empty() {
110        return Ok(None);
111    }
112    let action = MessagePushAction::from_kind(kind).ok_or_else(|| {
113        AppError::new(
114            "push_item_invalid",
115            format!("unsupported message push action kind: {kind}"),
116        )
117    })?;
118    let push_dir = root.join(".afmail/push");
119    create_dir_all(&push_dir)?;
120    let push_id = unique_push_id(root);
121    let now = crate::store::now_rfc3339();
122    let item = PushItem {
123        schema_name: "push_item".to_string(),
124        schema_version: 1,
125        push_id,
126        payload: PushPayload::MessageAction(MessageActionPush {
127            action,
128            message_ids: message_ids.to_vec(),
129            locations: locations.to_vec(),
130            steps: steps.to_vec(),
131            reply_to_message_id,
132        }),
133        created_rfc3339: now.clone(),
134        updated_rfc3339: now,
135        attempt_count: 0,
136        step_states: Vec::new(),
137        last_error: None,
138    };
139    write_item(root, &item)?;
140    Ok(Some(item))
141}
142
143#[derive(Clone, Debug, Default, serde::Serialize)]
144pub struct PushStatus {
145    /// Outbound drafts queued to save or send.
146    pub drafts: usize,
147    /// Case-membership flag operations queued.
148    pub case: usize,
149    /// Archive moves queued, not yet applied on the server.
150    pub archive: usize,
151    /// Spam (junk) moves queued, not yet applied on the server.
152    pub spam: usize,
153    /// Trash moves queued, not yet applied on the server.
154    pub trash: usize,
155}
156
157pub fn push_status(root: &Path) -> Result<PushStatus> {
158    let mut status = PushStatus::default();
159    for item in sorted_items(root)? {
160        match item_summary_label(&item) {
161            "drafts" => status.drafts += 1,
162            "case" => status.case += 1,
163            "archive" => status.archive += 1,
164            "spam" => status.spam += 1,
165            "trash" => status.trash += 1,
166            _ => {}
167        }
168    }
169    Ok(status)
170}
171
172pub fn list(root: &Path) -> Result<Value> {
173    let items = sorted_items(root)?;
174    Ok(json!({
175        "code": "push_list",
176        "count": items.len(),
177        "items": items
178    }))
179}
180
181pub(crate) fn pending_items(root: &Path) -> Result<Vec<PushItem>> {
182    sorted_items(root)
183}
184
185pub fn push_with_progress(
186    root: &Path,
187    confirmed: bool,
188    progress: Option<&mut ProgressCallback<'_>>,
189) -> Result<Value> {
190    let mut progress = progress;
191    let items = sorted_items(root)?;
192    let config = MailConfig::load(root)?;
193    if !confirmed {
194        crate::progress::emit(
195            &mut progress,
196            "push_preview",
197            json!({
198                "item_count": items.len(),
199            }),
200        );
201        let rendered = items
202            .iter()
203            .map(|item| {
204                let outbound = item.outbound();
205                Ok(json!({
206                    "push_id": item.push_id,
207                    "kind": item.kind(),
208                    "display_kind": item.display_kind(),
209                    "actions": actions_for(root, &config, item)?,
210                    "case_uid": outbound.map(|outbound| outbound.case_uid.as_str()),
211                    "draft_name": outbound.map(|outbound| outbound.draft_name.as_str()),
212                    "action": outbound.map(|outbound| outbound.action.as_str())
213                }))
214            })
215            .collect::<Result<Vec<_>>>()?;
216        return Ok(json!({
217            "code": "push_dry_run",
218            "confirmed": false,
219            "hint": preview_hint(),
220            "items": rendered,
221            "count": rendered.len()
222        }));
223    }
224
225    let remote = crate::remote::ImapSmtpRemote::new(&config);
226    let mut pushed = 0usize;
227    let mut failed = 0usize;
228    let mut failures = Vec::new();
229    let item_count = items.len();
230    crate::progress::emit(
231        &mut progress,
232        "push_start",
233        json!({
234            "item_count": item_count,
235        }),
236    );
237    for (index, mut item) in items.into_iter().enumerate() {
238        let progress_context = PushProgressContext {
239            item_index: index,
240            item_count,
241        };
242        crate::progress::emit(
243            &mut progress,
244            "push_item_start",
245            push_item_progress_fields(&item, index, item_count, None),
246        );
247        let result = match &item.payload {
248            PushPayload::Outbound(_) => push_outbound(
249                root,
250                &config,
251                &remote,
252                &mut item,
253                progress_context,
254                progress.as_deref_mut(),
255            ),
256            PushPayload::MessageAction(_) => push_action_steps(
257                root,
258                &config,
259                &remote,
260                &mut item,
261                progress_context,
262                progress.as_deref_mut(),
263            ),
264        };
265        match result {
266            Ok(()) => {
267                let workspace = crate::store::Workspace::at(root);
268                let transaction = workspace.begin_transaction(
269                    "push_commit",
270                    vec![
271                        format!(".afmail/push/{}.json", item.push_id),
272                        "cases".to_string(),
273                        "messages".to_string(),
274                    ],
275                )?;
276                workspace.consume_outbound_draft_after_push(&item, &config)?;
277                delete_item(root, &item)?;
278                workspace.clear_pending_push_item(&item)?;
279                transaction.commit()?;
280                let _ = audit_push(root, "push_succeeded", &item, None);
281                pushed += 1;
282                crate::progress::emit(
283                    &mut progress,
284                    "push_item_done",
285                    push_item_progress_fields(&item, index, item_count, None),
286                );
287            }
288            Err(err) => {
289                let _ = audit_push(root, "push_failed", &item, Some(&err));
290                failed += 1;
291                failures.push(json!({
292                    "push_id": item.push_id,
293                    "error_code": err.error_code,
294                    "error": err.message
295                }));
296                item.attempt_count += 1;
297                item.updated_rfc3339 = crate::store::now_rfc3339();
298                item.last_error = Some(err.to_string());
299                write_item(root, &item)?;
300                crate::store::Workspace::at(root)
301                    .mark_pending_push_error(&item, &err.to_string())?;
302                crate::progress::emit(
303                    &mut progress,
304                    "push_item_failed",
305                    push_item_progress_fields(&item, index, item_count, Some(&err)),
306                );
307            }
308        }
309    }
310    crate::progress::emit(
311        &mut progress,
312        "push_done",
313        json!({
314            "item_count": item_count,
315            "pushed_count": pushed,
316            "failed_count": failed,
317        }),
318    );
319    Ok(json!({
320        "code": "push_result",
321        "confirmed": true,
322        "pushed_count": pushed,
323        "failed_count": failed,
324        "failures": failures
325    }))
326}
327
328fn push_item_progress_fields(
329    item: &PushItem,
330    index: usize,
331    item_count: usize,
332    err: Option<&AppError>,
333) -> Value {
334    let mut value = json!({
335        "push_id": item.push_id.as_str(),
336        "kind": item.kind(),
337        "display_kind": item.display_kind(),
338        "index": index + 1,
339        "item_count": item_count,
340    });
341    if let Some(err) = err {
342        if let Value::Object(map) = &mut value {
343            map.insert("error_code".to_string(), json!(err.error_code));
344            map.insert("error".to_string(), json!(err.message.as_str()));
345            map.insert("retryable".to_string(), json!(err.retryable));
346        }
347    }
348    value
349}
350
351fn audit_push(root: &Path, kind: &str, item: &PushItem, err: Option<&AppError>) -> Result<()> {
352    let mut targets = vec![json!({"kind": "push", "id": item.push_id.as_str()})];
353    if let Some(outbound) = item.outbound() {
354        targets.push(json!({"kind": "case", "id": outbound.case_uid.as_str()}));
355    } else {
356        targets.extend(
357            item.message_ids()
358                .iter()
359                .map(|message_id| json!({"kind": "message", "id": message_id})),
360        );
361    }
362    let mut fields = json!({
363        "push_id": item.push_id.as_str(),
364        "push_kind": item.display_kind(),
365        "succeeded_step_count": item.succeeded_step_count(),
366        "attempt_count": item.attempt_count,
367    });
368    if let Some(outbound) = item.outbound() {
369        if let Value::Object(map) = &mut fields {
370            map.insert("case_uid".to_string(), json!(outbound.case_uid.as_str()));
371            map.insert(
372                "draft_name".to_string(),
373                json!(outbound.draft_name.as_str()),
374            );
375            map.insert("action".to_string(), json!(outbound.action.as_str()));
376        }
377    }
378    if let Some(err) = err {
379        if let Value::Object(map) = &mut fields {
380            map.insert("error_code".to_string(), json!(err.error_code));
381            map.insert("error".to_string(), json!(err.message.as_str()));
382            map.insert("retryable".to_string(), json!(err.retryable));
383        }
384    }
385    crate::store::Workspace::at(root).append_audit_event(kind, targets, None, fields)
386}
387
388pub fn remove_outbound_for_draft(
389    root: &Path,
390    case_uid: &str,
391    draft_name: &str,
392) -> Result<Vec<RemovedOutbound>> {
393    let items = read_items(root)?
394        .into_iter()
395        .filter(|item| {
396            item.outbound().is_some_and(|outbound| {
397                outbound.case_uid == case_uid && outbound.draft_name == draft_name
398            })
399        })
400        .collect::<Vec<_>>();
401    let mut removed = Vec::new();
402    for item in items {
403        removed.push(RemovedOutbound {
404            push_id: item.push_id.clone(),
405            action: item
406                .outbound()
407                .map(|outbound| outbound.action)
408                .unwrap_or(OutboundAction::Send),
409        });
410        delete_item(root, &item)?;
411    }
412    Ok(removed)
413}
414
415pub fn remove_pending_message_pushes(
416    root: &Path,
417    message_id: &str,
418    kind: &str,
419) -> Result<Vec<RemovedMessagePush>> {
420    let action = MessagePushAction::from_kind(kind).ok_or_else(|| {
421        AppError::new(
422            "push_item_invalid",
423            format!("unsupported message push action kind: {kind}"),
424        )
425    })?;
426    let items = read_items(root)?
427        .into_iter()
428        .filter(|item| {
429            item.message_action().is_some_and(|payload| {
430                payload.action == action
431                    && (payload.message_ids.iter().any(|id| id == message_id)
432                        || payload
433                            .locations
434                            .iter()
435                            .any(|loc| loc.message_id == message_id))
436            })
437        })
438        .collect::<Vec<_>>();
439    if let Some(item) = items.iter().find(|item| item.has_started_steps()) {
440        return Err(AppError::new(
441            "push_already_started",
442            format!(
443                "push item already started and cannot be undone locally: {}",
444                item.push_id
445            ),
446        ));
447    }
448
449    let mut removed = Vec::new();
450    for mut item in items {
451        let push_id = item.push_id.clone();
452        if let Some(payload) = item.message_action_mut() {
453            payload.message_ids.retain(|id| id != message_id);
454            payload.locations.retain(|loc| loc.message_id != message_id);
455        }
456        let empty = item
457            .message_action()
458            .is_some_and(|payload| payload.message_ids.is_empty() && payload.locations.is_empty());
459        if empty {
460            delete_item(root, &item)?;
461        } else {
462            item.updated_rfc3339 = crate::store::now_rfc3339();
463            write_item(root, &item)?;
464        }
465        removed.push(RemovedMessagePush { push_id });
466    }
467    Ok(removed)
468}
469
470fn preview_hint() -> &'static str {
471    "No remote changes were made. Re-run with --confirm to apply queued effects."
472}