Skip to main content

mdql_core/
txn.rs

1//! Process-level ACID primitives for MDQL.
2//!
3//! Three layers:
4//! 1. `atomic_write` — crash-safe single-file write via temp+rename
5//! 2. `table_lock` — exclusive per-table lock via flock
6//! 3. `multi_file_txn` — write-ahead journal for multi-file operations
7
8use std::fs;
9use std::io::Write;
10use std::path::Path;
11
12use fs2::FileExt;
13
14use crate::errors::MdqlError;
15
16pub const LOCK_FILENAME: &str = ".mdql_lock";
17pub const JOURNAL_FILENAME: &str = ".mdql_journal";
18pub const TMP_SUFFIX: &str = ".mdql_tmp";
19
20// ── Atomic single-file write ─────────────────────────────────────────────
21
22/// Write content to path atomically via temp file + rename.
23pub fn atomic_write(path: &Path, content: &str) -> crate::errors::Result<()> {
24    let parent = path.parent().unwrap_or(Path::new("."));
25    let mut tmp = tempfile::NamedTempFile::new_in(parent)?;
26    tmp.write_all(content.as_bytes())?;
27    tmp.as_file().sync_all()?;
28    tmp.persist(path).map_err(|e| {
29        MdqlError::Io(std::io::Error::new(std::io::ErrorKind::Other, e.to_string()))
30    })?;
31    Ok(())
32}
33
34// ── Table-level lock ──────────────────────────────────────────────────────
35
36/// RAII guard for an exclusive table lock.
37pub struct TableLock {
38    _file: fs::File,
39}
40
41impl TableLock {
42    pub fn acquire(table_dir: &Path) -> crate::errors::Result<Self> {
43        let lock_path = table_dir.join(LOCK_FILENAME);
44        let file = fs::OpenOptions::new()
45            .create(true)
46            .read(true)
47            .write(true)
48            .truncate(false)
49            .open(&lock_path)?;
50        file.lock_exclusive()?;
51        Ok(TableLock { _file: file })
52    }
53}
54
55impl Drop for TableLock {
56    fn drop(&mut self) {
57        let _ = self._file.unlock();
58    }
59}
60
61// ── Write-ahead journal ──────────────────────────────────────────────────
62
63#[derive(serde::Serialize, serde::Deserialize, Debug)]
64pub struct JournalEntry {
65    pub action: String,  // "modify", "create", "delete"
66    pub path: String,
67    pub backup: Option<String>,
68}
69
70#[derive(serde::Serialize, serde::Deserialize, Debug)]
71pub struct Journal {
72    pub version: u32,
73    pub operation: String,
74    pub started_at: String,
75    pub entries: Vec<JournalEntry>,
76}
77
78pub struct TableTransaction {
79    _table_dir: std::path::PathBuf,
80    journal_path: std::path::PathBuf,
81    journal: Journal,
82}
83
84impl TableTransaction {
85    pub fn new(table_dir: &Path, operation: &str) -> crate::errors::Result<Self> {
86        let journal_path = table_dir.join(JOURNAL_FILENAME);
87        let journal = Journal {
88            version: 1,
89            operation: operation.to_string(),
90            started_at: chrono::Utc::now().to_rfc3339(),
91            entries: Vec::new(),
92        };
93
94        let t = TableTransaction {
95            _table_dir: table_dir.to_path_buf(),
96            journal_path,
97            journal,
98        };
99        t.flush()?;
100        Ok(t)
101    }
102
103    pub fn backup(&mut self, path: &Path) -> crate::errors::Result<()> {
104        let content = fs::read_to_string(path)?;
105        self.journal.entries.push(JournalEntry {
106            action: "modify".to_string(),
107            path: path.to_string_lossy().to_string(),
108            backup: Some(content),
109        });
110        self.flush()
111    }
112
113    pub fn record_create(&mut self, path: &Path) -> crate::errors::Result<()> {
114        self.journal.entries.push(JournalEntry {
115            action: "create".to_string(),
116            path: path.to_string_lossy().to_string(),
117            backup: None,
118        });
119        self.flush()
120    }
121
122    pub fn record_delete(&mut self, path: &Path, content: &str) -> crate::errors::Result<()> {
123        self.journal.entries.push(JournalEntry {
124            action: "delete".to_string(),
125            path: path.to_string_lossy().to_string(),
126            backup: Some(content.to_string()),
127        });
128        self.flush()
129    }
130
131    pub fn rollback(&self) -> crate::errors::Result<()> {
132        for entry in self.journal.entries.iter().rev() {
133            let path = Path::new(&entry.path);
134            match entry.action.as_str() {
135                "modify" => {
136                    if let Some(ref backup) = entry.backup {
137                        let _ = atomic_write(path, backup);
138                    }
139                }
140                "create" => {
141                    if path.exists() {
142                        let _ = fs::remove_file(path);
143                    }
144                }
145                "delete" => {
146                    if let Some(ref backup) = entry.backup {
147                        let _ = atomic_write(path, backup);
148                    }
149                }
150                _ => {}
151            }
152        }
153        Ok(())
154    }
155
156    pub fn commit(&self) -> crate::errors::Result<()> {
157        let _ = fs::remove_file(&self.journal_path);
158        Ok(())
159    }
160
161    fn flush(&self) -> crate::errors::Result<()> {
162        let content = serde_json::to_string(&self.journal)
163            .map_err(|e| MdqlError::General(e.to_string()))?;
164        atomic_write(&self.journal_path, &content)
165    }
166}
167
168/// Context manager equivalent — runs a closure within a transaction.
169/// On success, commits. On error, rolls back and re-raises.
170pub fn with_multi_file_txn<F>(
171    table_dir: &Path,
172    operation: &str,
173    f: F,
174) -> crate::errors::Result<()>
175where
176    F: FnOnce(&mut TableTransaction) -> crate::errors::Result<()>,
177{
178    let mut txn = TableTransaction::new(table_dir, operation)?;
179    match f(&mut txn) {
180        Ok(()) => {
181            txn.commit()?;
182            Ok(())
183        }
184        Err(e) => {
185            let _ = txn.rollback();
186            let _ = txn.commit(); // Clean up journal after rollback
187            Err(e)
188        }
189    }
190}
191
192/// If a journal exists from a crashed transaction, roll back.
193/// Returns true if recovery was performed.
194pub fn recover_journal(table_dir: &Path) -> crate::errors::Result<bool> {
195    let journal_path = table_dir.join(JOURNAL_FILENAME);
196    if !journal_path.exists() {
197        cleanup_tmp_files(table_dir);
198        return Ok(false);
199    }
200
201    let text = match fs::read_to_string(&journal_path) {
202        Ok(t) => t,
203        Err(e) => {
204            let corrupt_path = journal_path.with_extension("corrupt");
205            let _ = fs::rename(&journal_path, &corrupt_path);
206            return Err(MdqlError::JournalRecovery(format!(
207                "Corrupt journal in {}, renamed to {}: {}",
208                table_dir.display(),
209                corrupt_path.file_name().unwrap_or_default().to_string_lossy(),
210                e
211            )));
212        }
213    };
214
215    let journal: Journal = match serde_json::from_str(&text) {
216        Ok(j) => j,
217        Err(e) => {
218            let corrupt_path = journal_path.with_extension("corrupt");
219            let _ = fs::rename(&journal_path, &corrupt_path);
220            return Err(MdqlError::JournalRecovery(format!(
221                "Corrupt journal in {}, renamed to {}: {}",
222                table_dir.display(),
223                corrupt_path.file_name().unwrap_or_default().to_string_lossy(),
224                e
225            )));
226        }
227    };
228
229    // Roll back in reverse
230    for entry in journal.entries.iter().rev() {
231        let path = Path::new(&entry.path);
232        match entry.action.as_str() {
233            "modify" => {
234                if let Some(ref backup) = entry.backup {
235                    let _ = atomic_write(path, backup);
236                }
237            }
238            "create" => {
239                if path.exists() {
240                    let _ = fs::remove_file(path);
241                }
242            }
243            "delete" => {
244                if let Some(ref backup) = entry.backup {
245                    let _ = atomic_write(path, backup);
246                }
247            }
248            _ => {}
249        }
250    }
251
252    let _ = fs::remove_file(&journal_path);
253    cleanup_tmp_files(table_dir);
254    Ok(true)
255}
256
257fn cleanup_tmp_files(table_dir: &Path) {
258    if let Ok(entries) = fs::read_dir(table_dir) {
259        for entry in entries.flatten() {
260            let name = entry.file_name();
261            if name.to_string_lossy().ends_with(TMP_SUFFIX) {
262                let _ = fs::remove_file(entry.path());
263            }
264        }
265    }
266}