agent_first_mail/store/
transactions.rs1use super::*;
2
3#[derive(Clone, Debug, Deserialize, Serialize)]
4#[serde(deny_unknown_fields)]
5pub(super) struct TransactionFile {
6 pub(super) schema_name: String,
7 pub(super) schema_version: u64,
8 pub(super) transaction_id: String,
9 pub(super) kind: String,
10 pub(super) created_rfc3339: String,
11 #[serde(default, skip_serializing_if = "Vec::is_empty")]
12 pub(super) paths: Vec<String>,
13}
14
15#[derive(Debug)]
16pub(crate) struct LocalTransaction {
17 path: PathBuf,
18 finished: bool,
19}
20
21impl Workspace {
22 pub(crate) fn begin_transaction(
23 &self,
24 kind: &str,
25 paths: impl IntoIterator<Item = String>,
26 ) -> Result<LocalTransaction> {
27 validate_id("transaction kind", kind)?;
28 let transactions_dir = self.root.join(".afmail/transactions");
29 create_dir_all(&transactions_dir)?;
30 let transaction_id = unique_transaction_id(kind);
31 let path = transactions_dir.join(format!("{transaction_id}.json"));
32 let transaction = TransactionFile {
33 schema_name: "local_transaction".to_string(),
34 schema_version: 1,
35 transaction_id,
36 kind: kind.to_string(),
37 created_rfc3339: now_rfc3339(),
38 paths: paths.into_iter().collect(),
39 };
40 write_json_pretty(&path, &transaction)?;
41 Ok(LocalTransaction {
42 path,
43 finished: false,
44 })
45 }
46
47 pub fn ensure_no_incomplete_transactions(&self) -> Result<()> {
48 let transactions = incomplete_transaction_paths(&self.root)?;
49 if transactions.is_empty() {
50 return Ok(());
51 }
52 Err(AppError::new(
53 "transaction_incomplete",
54 format!(
55 "incomplete afmail transaction(s) detected: {}; run `afmail doctor` for details",
56 transactions.join(", ")
57 ),
58 )
59 .with_hint("Run `afmail doctor` to inspect incomplete transactions; use `afmail doctor repair --confirm` only after reviewing the issue.")
60 .with_details(json!({
61 "transaction_paths": transactions,
62 "suggested_commands": [
63 "afmail doctor",
64 "afmail doctor repair --confirm"
65 ]
66 })))
67 }
68
69 pub(super) fn incomplete_transactions(&self) -> Result<Vec<TransactionFile>> {
70 let dir = self.root.join(".afmail/transactions");
71 if !dir.exists() {
72 return Ok(Vec::new());
73 }
74 let mut out = Vec::new();
75 for entry in read_dir(&dir, "read transactions")? {
76 let path = entry.path();
77 if path.extension().and_then(|s| s.to_str()) != Some("json") {
78 continue;
79 }
80 let data = read_to_string(&path, "read transaction")?;
81 let transaction: TransactionFile =
82 serde_json::from_str(&data).map_err(|e| AppError::json("parse transaction", &e))?;
83 if transaction.schema_name != "local_transaction" || transaction.schema_version != 1 {
84 return Err(AppError::new(
85 "transaction_invalid",
86 format!(
87 "invalid local transaction schema: {}",
88 rel_path(&self.root, &path)
89 ),
90 ));
91 }
92 out.push(transaction);
93 }
94 out.sort_by(|a, b| a.transaction_id.cmp(&b.transaction_id));
95 Ok(out)
96 }
97}
98
99impl LocalTransaction {
100 pub(crate) fn commit(mut self) -> Result<()> {
101 if self.path.exists() {
102 remove_file(&self.path)?;
103 }
104 self.finished = true;
105 Ok(())
106 }
107}
108
109impl Drop for LocalTransaction {
110 fn drop(&mut self) {
111 if !self.finished && !std::thread::panicking() {
112 let _ = fs::remove_file(&self.path);
113 }
114 }
115}
116
117fn incomplete_transaction_paths(root: &Path) -> Result<Vec<String>> {
118 let dir = root.join(".afmail/transactions");
119 if !dir.exists() {
120 return Ok(Vec::new());
121 }
122 let mut out = Vec::new();
123 for entry in read_dir(&dir, "read transactions")? {
124 let path = entry.path();
125 if path.extension().and_then(|s| s.to_str()) == Some("json") {
126 out.push(rel_path(root, &path));
127 }
128 }
129 out.sort();
130 Ok(out)
131}
132
133fn unique_transaction_id(kind: &str) -> String {
134 let safe_kind = kind.replace('.', "_");
135 format!(
136 "transaction_{}_{}",
137 now_rfc3339().replace([':', '-'], ""),
138 safe_kind
139 )
140}