1use std::fs;
9use std::io::Write;
10use std::path::Path;
11
12use fs2::FileExt;
13
14use crate::errors::MdqlError;
15
/// Name of the lock file that `TableLock::acquire` opens inside a table directory.
pub const LOCK_FILENAME: &str = ".mdql_lock";

/// Name of the journal file that `TableTransaction` writes inside a table
/// directory and that `recover_journal` looks for.
pub const JOURNAL_FILENAME: &str = ".mdql_journal";

/// File-name suffix that `cleanup_tmp_files` treats as a leftover temporary file.
pub const TMP_SUFFIX: &str = ".mdql_tmp";
19
20pub fn atomic_write(path: &Path, content: &str) -> crate::errors::Result<()> {
24 let parent = path.parent().unwrap_or(Path::new("."));
25 let mut tmp = tempfile::NamedTempFile::new_in(parent)?;
26 tmp.write_all(content.as_bytes())?;
27 tmp.as_file().sync_all()?;
28 tmp.persist(path).map_err(|e| {
29 MdqlError::Io(std::io::Error::new(std::io::ErrorKind::Other, e.to_string()))
30 })?;
31 Ok(())
32}
33
34pub struct TableLock {
38 _file: fs::File,
39}
40
41impl TableLock {
42 pub fn acquire(table_dir: &Path) -> crate::errors::Result<Self> {
43 let lock_path = table_dir.join(LOCK_FILENAME);
44 let file = fs::OpenOptions::new()
45 .create(true)
46 .read(true)
47 .write(true)
48 .truncate(false)
49 .open(&lock_path)?;
50 file.lock_exclusive()?;
51 Ok(TableLock { _file: file })
52 }
53}
54
55impl Drop for TableLock {
56 fn drop(&mut self) {
57 let _ = self._file.unlock();
58 }
59}
60
61#[derive(serde::Serialize, serde::Deserialize, Debug)]
64pub struct JournalEntry {
65 pub action: String, pub path: String,
67 pub backup: Option<String>,
68}
69
70#[derive(serde::Serialize, serde::Deserialize, Debug)]
71pub struct Journal {
72 pub version: u32,
73 pub operation: String,
74 pub started_at: String,
75 pub entries: Vec<JournalEntry>,
76}
77
78pub struct TableTransaction {
79 _table_dir: std::path::PathBuf,
80 journal_path: std::path::PathBuf,
81 journal: Journal,
82}
83
84impl TableTransaction {
85 pub fn new(table_dir: &Path, operation: &str) -> crate::errors::Result<Self> {
86 let journal_path = table_dir.join(JOURNAL_FILENAME);
87 let journal = Journal {
88 version: 1,
89 operation: operation.to_string(),
90 started_at: chrono::Utc::now().to_rfc3339(),
91 entries: Vec::new(),
92 };
93
94 let t = TableTransaction {
95 _table_dir: table_dir.to_path_buf(),
96 journal_path,
97 journal,
98 };
99 t.flush()?;
100 Ok(t)
101 }
102
103 pub fn backup(&mut self, path: &Path) -> crate::errors::Result<()> {
104 let content = fs::read_to_string(path)?;
105 self.journal.entries.push(JournalEntry {
106 action: "modify".to_string(),
107 path: path.to_string_lossy().to_string(),
108 backup: Some(content),
109 });
110 self.flush()
111 }
112
113 pub fn record_create(&mut self, path: &Path) -> crate::errors::Result<()> {
114 self.journal.entries.push(JournalEntry {
115 action: "create".to_string(),
116 path: path.to_string_lossy().to_string(),
117 backup: None,
118 });
119 self.flush()
120 }
121
122 pub fn record_delete(&mut self, path: &Path, content: &str) -> crate::errors::Result<()> {
123 self.journal.entries.push(JournalEntry {
124 action: "delete".to_string(),
125 path: path.to_string_lossy().to_string(),
126 backup: Some(content.to_string()),
127 });
128 self.flush()
129 }
130
131 pub fn rollback(&self) -> crate::errors::Result<()> {
132 for entry in self.journal.entries.iter().rev() {
133 let path = Path::new(&entry.path);
134 match entry.action.as_str() {
135 "modify" => {
136 if let Some(ref backup) = entry.backup {
137 let _ = atomic_write(path, backup);
138 }
139 }
140 "create" => {
141 if path.exists() {
142 let _ = fs::remove_file(path);
143 }
144 }
145 "delete" => {
146 if let Some(ref backup) = entry.backup {
147 let _ = atomic_write(path, backup);
148 }
149 }
150 _ => {}
151 }
152 }
153 Ok(())
154 }
155
156 pub fn commit(&self) -> crate::errors::Result<()> {
157 let _ = fs::remove_file(&self.journal_path);
158 Ok(())
159 }
160
161 fn flush(&self) -> crate::errors::Result<()> {
162 let content = serde_json::to_string(&self.journal)
163 .map_err(|e| MdqlError::General(e.to_string()))?;
164 atomic_write(&self.journal_path, &content)
165 }
166}
167
168pub fn with_multi_file_txn<F>(
171 table_dir: &Path,
172 operation: &str,
173 f: F,
174) -> crate::errors::Result<()>
175where
176 F: FnOnce(&mut TableTransaction) -> crate::errors::Result<()>,
177{
178 let mut txn = TableTransaction::new(table_dir, operation)?;
179 match f(&mut txn) {
180 Ok(()) => {
181 txn.commit()?;
182 Ok(())
183 }
184 Err(e) => {
185 let _ = txn.rollback();
186 let _ = txn.commit(); Err(e)
188 }
189 }
190}
191
192pub fn recover_journal(table_dir: &Path) -> crate::errors::Result<bool> {
195 let journal_path = table_dir.join(JOURNAL_FILENAME);
196 if !journal_path.exists() {
197 cleanup_tmp_files(table_dir);
198 return Ok(false);
199 }
200
201 let text = match fs::read_to_string(&journal_path) {
202 Ok(t) => t,
203 Err(e) => {
204 let corrupt_path = journal_path.with_extension("corrupt");
205 let _ = fs::rename(&journal_path, &corrupt_path);
206 return Err(MdqlError::JournalRecovery(format!(
207 "Corrupt journal in {}, renamed to {}: {}",
208 table_dir.display(),
209 corrupt_path.file_name().unwrap_or_default().to_string_lossy(),
210 e
211 )));
212 }
213 };
214
215 let journal: Journal = match serde_json::from_str(&text) {
216 Ok(j) => j,
217 Err(e) => {
218 let corrupt_path = journal_path.with_extension("corrupt");
219 let _ = fs::rename(&journal_path, &corrupt_path);
220 return Err(MdqlError::JournalRecovery(format!(
221 "Corrupt journal in {}, renamed to {}: {}",
222 table_dir.display(),
223 corrupt_path.file_name().unwrap_or_default().to_string_lossy(),
224 e
225 )));
226 }
227 };
228
229 for entry in journal.entries.iter().rev() {
231 let path = Path::new(&entry.path);
232 match entry.action.as_str() {
233 "modify" => {
234 if let Some(ref backup) = entry.backup {
235 let _ = atomic_write(path, backup);
236 }
237 }
238 "create" => {
239 if path.exists() {
240 let _ = fs::remove_file(path);
241 }
242 }
243 "delete" => {
244 if let Some(ref backup) = entry.backup {
245 let _ = atomic_write(path, backup);
246 }
247 }
248 _ => {}
249 }
250 }
251
252 let _ = fs::remove_file(&journal_path);
253 cleanup_tmp_files(table_dir);
254 Ok(true)
255}
256
257fn cleanup_tmp_files(table_dir: &Path) {
258 if let Ok(entries) = fs::read_dir(table_dir) {
259 for entry in entries.flatten() {
260 let name = entry.file_name();
261 if name.to_string_lossy().ends_with(TMP_SUFFIX) {
262 let _ = fs::remove_file(entry.path());
263 }
264 }
265 }
266}