1mod execute;
2mod io;
3mod preview;
4
5use execute::*;
6use io::*;
7use preview::*;
8
9use crate::config::{ActionStep, ActionStepOn, MailConfig, SpecialUseKind};
10use crate::error::{AppError, Result};
11use crate::frontmatter::DraftFrontmatter;
12use crate::progress::ProgressCallback;
13use crate::types::{
14 MessageActionPush, MessagePushAction, OutboundAction, OutboundPush, PushItem, PushLocation,
15 PushPayload, PushStepState, PushStepStatus,
16};
17use crate::util::write_string_atomic;
18use serde_json::{json, Value};
19use std::collections::BTreeMap;
20use std::fs;
21use std::path::{Path, PathBuf};
22
23#[derive(Clone, Debug)]
24pub struct RemovedOutbound {
25 pub push_id: String,
26 pub action: OutboundAction,
27}
28
29#[derive(Clone, Debug)]
30pub struct RemovedMessagePush {
31 pub push_id: String,
32}
33
34pub fn queue_outbound(
35 root: &Path,
36 case_uid: &str,
37 draft_name: &str,
38 action: OutboundAction,
39) -> Result<Value> {
40 let existing = find_outbound_item(root, case_uid, draft_name)?;
41 let push_id = existing
42 .as_ref()
43 .map(|item| item.push_id.clone())
44 .unwrap_or_else(|| unique_push_id(root));
45 let push_dir = root.join(".afmail/push");
46 create_dir_all(&push_dir)?;
47 let now = crate::store::now_rfc3339();
48 let existing_outbound = existing.as_ref().and_then(|item| item.outbound());
49 let same_action = existing_outbound.is_some_and(|outbound| outbound.action == action);
50 let step_states = if same_action {
51 existing
52 .as_ref()
53 .map(|item| item.step_states.clone())
54 .unwrap_or_default()
55 } else {
56 Vec::new()
57 };
58 let item = PushItem {
59 schema_name: "push_item".to_string(),
60 schema_version: 1,
61 push_id: push_id.clone(),
62 payload: PushPayload::Outbound(Box::new(OutboundPush {
63 action,
64 case_uid: case_uid.to_string(),
65 draft_name: draft_name.to_string(),
66 draft_uid_validity: existing_outbound.and_then(|outbound| outbound.draft_uid_validity),
67 draft_uid: existing_outbound.and_then(|outbound| outbound.draft_uid),
68 })),
69 created_rfc3339: existing
70 .as_ref()
71 .map(|item| item.created_rfc3339.clone())
72 .unwrap_or_else(|| now.clone()),
73 updated_rfc3339: now,
74 attempt_count: if same_action {
75 existing.as_ref().map_or(0, |item| item.attempt_count)
76 } else {
77 0
78 },
79 step_states,
80 last_error: None,
81 };
82 write_item(root, &item)?;
83 Ok(json!({
84 "code": "push_queued",
85 "push_id": push_id,
86 "kind": "outbound",
87 "action": action.as_str(),
88 "case_uid": case_uid,
89 "draft_name": draft_name
90 }))
91}
92
93pub(crate) fn find_outbound_for_draft(
94 root: &Path,
95 case_uid: &str,
96 draft_name: &str,
97) -> Result<Option<PushItem>> {
98 find_outbound_item(root, case_uid, draft_name)
99}
100
101pub fn queue_action_steps(
102 root: &Path,
103 kind: &str,
104 message_ids: &[String],
105 locations: &[PushLocation],
106 steps: &[ActionStep],
107 reply_to_message_id: Option<String>,
108) -> Result<Option<PushItem>> {
109 if locations.is_empty() || steps.is_empty() {
110 return Ok(None);
111 }
112 let action = MessagePushAction::from_kind(kind).ok_or_else(|| {
113 AppError::new(
114 "push_item_invalid",
115 format!("unsupported message push action kind: {kind}"),
116 )
117 })?;
118 let push_dir = root.join(".afmail/push");
119 create_dir_all(&push_dir)?;
120 let push_id = unique_push_id(root);
121 let now = crate::store::now_rfc3339();
122 let item = PushItem {
123 schema_name: "push_item".to_string(),
124 schema_version: 1,
125 push_id,
126 payload: PushPayload::MessageAction(MessageActionPush {
127 action,
128 message_ids: message_ids.to_vec(),
129 locations: locations.to_vec(),
130 steps: steps.to_vec(),
131 reply_to_message_id,
132 }),
133 created_rfc3339: now.clone(),
134 updated_rfc3339: now,
135 attempt_count: 0,
136 step_states: Vec::new(),
137 last_error: None,
138 };
139 write_item(root, &item)?;
140 Ok(Some(item))
141}
142
143#[derive(Clone, Debug, Default, serde::Serialize)]
144pub struct PushStatus {
145 pub drafts: usize,
147 pub case: usize,
149 pub archive: usize,
151 pub spam: usize,
153 pub trash: usize,
155}
156
157pub fn push_status(root: &Path) -> Result<PushStatus> {
158 let mut status = PushStatus::default();
159 for item in sorted_items(root)? {
160 match item_summary_label(&item) {
161 "drafts" => status.drafts += 1,
162 "case" => status.case += 1,
163 "archive" => status.archive += 1,
164 "spam" => status.spam += 1,
165 "trash" => status.trash += 1,
166 _ => {}
167 }
168 }
169 Ok(status)
170}
171
172pub fn list(root: &Path) -> Result<Value> {
173 let items = sorted_items(root)?;
174 Ok(json!({
175 "code": "push_list",
176 "count": items.len(),
177 "items": items
178 }))
179}
180
181pub(crate) fn pending_items(root: &Path) -> Result<Vec<PushItem>> {
182 sorted_items(root)
183}
184
185pub fn push_with_progress(
186 root: &Path,
187 confirmed: bool,
188 progress: Option<&mut ProgressCallback<'_>>,
189) -> Result<Value> {
190 let mut progress = progress;
191 let items = sorted_items(root)?;
192 let config = MailConfig::load(root)?;
193 if !confirmed {
194 crate::progress::emit(
195 &mut progress,
196 "push_preview",
197 json!({
198 "item_count": items.len(),
199 }),
200 );
201 let rendered = items
202 .iter()
203 .map(|item| {
204 let outbound = item.outbound();
205 Ok(json!({
206 "push_id": item.push_id,
207 "kind": item.kind(),
208 "display_kind": item.display_kind(),
209 "actions": actions_for(root, &config, item)?,
210 "case_uid": outbound.map(|outbound| outbound.case_uid.as_str()),
211 "draft_name": outbound.map(|outbound| outbound.draft_name.as_str()),
212 "action": outbound.map(|outbound| outbound.action.as_str())
213 }))
214 })
215 .collect::<Result<Vec<_>>>()?;
216 return Ok(json!({
217 "code": "push_dry_run",
218 "confirmed": false,
219 "hint": preview_hint(),
220 "items": rendered,
221 "count": rendered.len()
222 }));
223 }
224
225 let remote = crate::remote::ImapSmtpRemote::new(&config);
226 let mut pushed = 0usize;
227 let mut failed = 0usize;
228 let mut failures = Vec::new();
229 let item_count = items.len();
230 crate::progress::emit(
231 &mut progress,
232 "push_start",
233 json!({
234 "item_count": item_count,
235 }),
236 );
237 for (index, mut item) in items.into_iter().enumerate() {
238 let progress_context = PushProgressContext {
239 item_index: index,
240 item_count,
241 };
242 crate::progress::emit(
243 &mut progress,
244 "push_item_start",
245 push_item_progress_fields(&item, index, item_count, None),
246 );
247 let result = match &item.payload {
248 PushPayload::Outbound(_) => push_outbound(
249 root,
250 &config,
251 &remote,
252 &mut item,
253 progress_context,
254 progress.as_deref_mut(),
255 ),
256 PushPayload::MessageAction(_) => push_action_steps(
257 root,
258 &config,
259 &remote,
260 &mut item,
261 progress_context,
262 progress.as_deref_mut(),
263 ),
264 };
265 match result {
266 Ok(()) => {
267 let workspace = crate::store::Workspace::at(root);
268 let transaction = workspace.begin_transaction(
269 "push_commit",
270 vec![
271 format!(".afmail/push/{}.json", item.push_id),
272 "cases".to_string(),
273 "messages".to_string(),
274 ],
275 )?;
276 workspace.consume_outbound_draft_after_push(&item, &config)?;
277 delete_item(root, &item)?;
278 workspace.clear_pending_push_item(&item)?;
279 transaction.commit()?;
280 let _ = audit_push(root, "push_succeeded", &item, None);
281 pushed += 1;
282 crate::progress::emit(
283 &mut progress,
284 "push_item_done",
285 push_item_progress_fields(&item, index, item_count, None),
286 );
287 }
288 Err(err) => {
289 let _ = audit_push(root, "push_failed", &item, Some(&err));
290 failed += 1;
291 failures.push(json!({
292 "push_id": item.push_id,
293 "error_code": err.error_code,
294 "error": err.message
295 }));
296 item.attempt_count += 1;
297 item.updated_rfc3339 = crate::store::now_rfc3339();
298 item.last_error = Some(err.to_string());
299 write_item(root, &item)?;
300 crate::store::Workspace::at(root)
301 .mark_pending_push_error(&item, &err.to_string())?;
302 crate::progress::emit(
303 &mut progress,
304 "push_item_failed",
305 push_item_progress_fields(&item, index, item_count, Some(&err)),
306 );
307 }
308 }
309 }
310 crate::progress::emit(
311 &mut progress,
312 "push_done",
313 json!({
314 "item_count": item_count,
315 "pushed_count": pushed,
316 "failed_count": failed,
317 }),
318 );
319 Ok(json!({
320 "code": "push_result",
321 "confirmed": true,
322 "pushed_count": pushed,
323 "failed_count": failed,
324 "failures": failures
325 }))
326}
327
328fn push_item_progress_fields(
329 item: &PushItem,
330 index: usize,
331 item_count: usize,
332 err: Option<&AppError>,
333) -> Value {
334 let mut value = json!({
335 "push_id": item.push_id.as_str(),
336 "kind": item.kind(),
337 "display_kind": item.display_kind(),
338 "index": index + 1,
339 "item_count": item_count,
340 });
341 if let Some(err) = err {
342 if let Value::Object(map) = &mut value {
343 map.insert("error_code".to_string(), json!(err.error_code));
344 map.insert("error".to_string(), json!(err.message.as_str()));
345 map.insert("retryable".to_string(), json!(err.retryable));
346 }
347 }
348 value
349}
350
351fn audit_push(root: &Path, kind: &str, item: &PushItem, err: Option<&AppError>) -> Result<()> {
352 let mut targets = vec![json!({"kind": "push", "id": item.push_id.as_str()})];
353 if let Some(outbound) = item.outbound() {
354 targets.push(json!({"kind": "case", "id": outbound.case_uid.as_str()}));
355 } else {
356 targets.extend(
357 item.message_ids()
358 .iter()
359 .map(|message_id| json!({"kind": "message", "id": message_id})),
360 );
361 }
362 let mut fields = json!({
363 "push_id": item.push_id.as_str(),
364 "push_kind": item.display_kind(),
365 "succeeded_step_count": item.succeeded_step_count(),
366 "attempt_count": item.attempt_count,
367 });
368 if let Some(outbound) = item.outbound() {
369 if let Value::Object(map) = &mut fields {
370 map.insert("case_uid".to_string(), json!(outbound.case_uid.as_str()));
371 map.insert(
372 "draft_name".to_string(),
373 json!(outbound.draft_name.as_str()),
374 );
375 map.insert("action".to_string(), json!(outbound.action.as_str()));
376 }
377 }
378 if let Some(err) = err {
379 if let Value::Object(map) = &mut fields {
380 map.insert("error_code".to_string(), json!(err.error_code));
381 map.insert("error".to_string(), json!(err.message.as_str()));
382 map.insert("retryable".to_string(), json!(err.retryable));
383 }
384 }
385 crate::store::Workspace::at(root).append_audit_event(kind, targets, None, fields)
386}
387
388pub fn remove_outbound_for_draft(
389 root: &Path,
390 case_uid: &str,
391 draft_name: &str,
392) -> Result<Vec<RemovedOutbound>> {
393 let items = read_items(root)?
394 .into_iter()
395 .filter(|item| {
396 item.outbound().is_some_and(|outbound| {
397 outbound.case_uid == case_uid && outbound.draft_name == draft_name
398 })
399 })
400 .collect::<Vec<_>>();
401 let mut removed = Vec::new();
402 for item in items {
403 removed.push(RemovedOutbound {
404 push_id: item.push_id.clone(),
405 action: item
406 .outbound()
407 .map(|outbound| outbound.action)
408 .unwrap_or(OutboundAction::Send),
409 });
410 delete_item(root, &item)?;
411 }
412 Ok(removed)
413}
414
415pub fn remove_pending_message_pushes(
416 root: &Path,
417 message_id: &str,
418 kind: &str,
419) -> Result<Vec<RemovedMessagePush>> {
420 let action = MessagePushAction::from_kind(kind).ok_or_else(|| {
421 AppError::new(
422 "push_item_invalid",
423 format!("unsupported message push action kind: {kind}"),
424 )
425 })?;
426 let items = read_items(root)?
427 .into_iter()
428 .filter(|item| {
429 item.message_action().is_some_and(|payload| {
430 payload.action == action
431 && (payload.message_ids.iter().any(|id| id == message_id)
432 || payload
433 .locations
434 .iter()
435 .any(|loc| loc.message_id == message_id))
436 })
437 })
438 .collect::<Vec<_>>();
439 if let Some(item) = items.iter().find(|item| item.has_started_steps()) {
440 return Err(AppError::new(
441 "push_already_started",
442 format!(
443 "push item already started and cannot be undone locally: {}",
444 item.push_id
445 ),
446 ));
447 }
448
449 let mut removed = Vec::new();
450 for mut item in items {
451 let push_id = item.push_id.clone();
452 if let Some(payload) = item.message_action_mut() {
453 payload.message_ids.retain(|id| id != message_id);
454 payload.locations.retain(|loc| loc.message_id != message_id);
455 }
456 let empty = item
457 .message_action()
458 .is_some_and(|payload| payload.message_ids.is_empty() && payload.locations.is_empty());
459 if empty {
460 delete_item(root, &item)?;
461 } else {
462 item.updated_rfc3339 = crate::store::now_rfc3339();
463 write_item(root, &item)?;
464 }
465 removed.push(RemovedMessagePush { push_id });
466 }
467 Ok(removed)
468}
469
470fn preview_hint() -> &'static str {
471 "No remote changes were made. Re-run with --confirm to apply queued effects."
472}