agent_first_mail/store/
transactions.rs1use super::*;
2
3#[derive(Clone, Debug, Deserialize, Serialize)]
4#[serde(deny_unknown_fields)]
5pub(super) struct TransactionFile {
6 pub(super) schema_name: String,
7 pub(super) schema_version: u64,
8 pub(super) transaction_id: String,
9 pub(super) kind: String,
10 pub(super) created_rfc3339: String,
11 #[serde(default, skip_serializing_if = "Vec::is_empty")]
12 pub(super) paths: Vec<String>,
13}
14
15#[derive(Debug)]
16pub(crate) struct LocalTransaction {
17 path: PathBuf,
18}
19
20impl Workspace {
21 pub(crate) fn begin_transaction(
22 &self,
23 kind: &str,
24 paths: impl IntoIterator<Item = String>,
25 ) -> Result<LocalTransaction> {
26 validate_id("transaction kind", kind)?;
27 let transactions_dir = self.root.join(".afmail/transactions");
28 create_dir_all(&transactions_dir)?;
29 let transaction_id = unique_transaction_id(kind);
30 let path = transactions_dir.join(format!("{transaction_id}.json"));
31 let transaction = TransactionFile {
32 schema_name: "local_transaction".to_string(),
33 schema_version: 1,
34 transaction_id,
35 kind: kind.to_string(),
36 created_rfc3339: now_rfc3339(),
37 paths: paths.into_iter().collect(),
38 };
39 write_json_pretty(&path, &transaction)?;
40 Ok(LocalTransaction { path })
41 }
42
43 pub fn ensure_no_incomplete_transactions(&self) -> Result<()> {
44 let transactions = incomplete_transaction_paths(&self.root)?;
45 if transactions.is_empty() {
46 return Ok(());
47 }
48 Err(AppError::new(
49 "transaction_incomplete",
50 format!(
51 "incomplete afmail transaction(s) detected: {}; run `afmail doctor` for details",
52 transactions.join(", ")
53 ),
54 )
55 .with_hint("Run `afmail doctor` to inspect incomplete transactions; use `afmail doctor repair --confirm` only after reviewing the issue.")
56 .with_details(json!({
57 "transaction_paths": transactions,
58 "suggested_commands": [
59 "afmail doctor",
60 "afmail doctor repair --confirm"
61 ]
62 })))
63 }
64
65 pub(super) fn incomplete_transactions(&self) -> Result<Vec<TransactionFile>> {
66 let dir = self.root.join(".afmail/transactions");
67 if !dir.exists() {
68 return Ok(Vec::new());
69 }
70 let mut out = Vec::new();
71 for entry in read_dir(&dir, "read transactions")? {
72 let path = entry.path();
73 if path.extension().and_then(|s| s.to_str()) != Some("json") {
74 continue;
75 }
76 let data = read_to_string(&path, "read transaction")?;
77 let transaction: TransactionFile =
78 serde_json::from_str(&data).map_err(|e| AppError::json("parse transaction", &e))?;
79 if transaction.schema_name != "local_transaction" || transaction.schema_version != 1 {
80 return Err(AppError::new(
81 "transaction_invalid",
82 format!(
83 "invalid local transaction schema: {}",
84 rel_path(&self.root, &path)
85 ),
86 ));
87 }
88 out.push(transaction);
89 }
90 out.sort_by(|a, b| a.transaction_id.cmp(&b.transaction_id));
91 Ok(out)
92 }
93}
94
95impl LocalTransaction {
96 pub(crate) fn commit(self) -> Result<()> {
97 if self.path.exists() {
98 remove_file(&self.path)?;
99 }
100 Ok(())
101 }
102}
103
104fn incomplete_transaction_paths(root: &Path) -> Result<Vec<String>> {
105 let dir = root.join(".afmail/transactions");
106 if !dir.exists() {
107 return Ok(Vec::new());
108 }
109 let mut out = Vec::new();
110 for entry in read_dir(&dir, "read transactions")? {
111 let path = entry.path();
112 if path.extension().and_then(|s| s.to_str()) == Some("json") {
113 out.push(rel_path(root, &path));
114 }
115 }
116 out.sort();
117 Ok(out)
118}
119
120fn unique_transaction_id(kind: &str) -> String {
121 let safe_kind = kind.replace('.', "_");
122 format!(
123 "transaction_{}_{}",
124 now_rfc3339().replace([':', '-'], ""),
125 safe_kind
126 )
127}