brainwires_tool_system/
transaction.rs1use std::collections::HashMap;
38use std::fs;
39use std::path::PathBuf;
40use std::sync::{Arc, Mutex};
41
42use anyhow::{Context, Result};
43
44use brainwires_core::{CommitResult, StagedWrite, StagingBackend};
45
/// Bookkeeping record for one write that has been staged but not yet committed.
#[derive(Debug)]
struct StagedEntry {
    /// Temporary file inside the staging directory that holds the staged content.
    staged_path: PathBuf,
    /// Final destination the content is moved to when the transaction commits.
    target_path: PathBuf,
    /// In-memory copy of the content; commit writes it directly to the target
    /// when the staged file cannot be renamed into place.
    content: String,
}
55
56#[derive(Debug)]
57struct Inner {
58 staging_dir: PathBuf,
59 staged: HashMap<String, StagedEntry>,
61}
62
63#[derive(Debug, Clone)]
70pub struct TransactionManager {
71 inner: Arc<Mutex<Inner>>,
72}
73
74impl TransactionManager {
75 pub fn new() -> Result<Self> {
80 let staging_dir =
81 std::env::temp_dir().join(format!("brainwires-txn-{}", uuid::Uuid::new_v4()));
82 Self::with_dir(staging_dir)
83 }
84
85 pub fn with_dir(staging_dir: PathBuf) -> Result<Self> {
89 fs::create_dir_all(&staging_dir)
90 .with_context(|| format!("Failed to create staging dir: {}", staging_dir.display()))?;
91 Ok(Self {
92 inner: Arc::new(Mutex::new(Inner {
93 staging_dir,
94 staged: HashMap::new(),
95 })),
96 })
97 }
98}
99
100impl StagingBackend for TransactionManager {
101 fn stage(&self, write: StagedWrite) -> bool {
102 let mut inner = self.inner.lock().expect("transaction log lock poisoned");
103
104 if inner.staged.contains_key(&write.key) {
106 return false;
107 }
108
109 let safe_name = format!("{}.staged", write.key);
111 let staged_path = inner.staging_dir.join(&safe_name);
112
113 if let Err(e) = fs::write(&staged_path, &write.content) {
114 tracing::error!(
115 key = %write.key,
116 path = %staged_path.display(),
117 error = %e,
118 "TransactionManager: failed to stage write"
119 );
120 return false;
121 }
122
123 tracing::debug!(
124 key = %write.key,
125 target = %write.target_path.display(),
126 "TransactionManager: staged write"
127 );
128
129 inner.staged.insert(
130 write.key,
131 StagedEntry {
132 staged_path,
133 target_path: write.target_path,
134 content: write.content,
135 },
136 );
137
138 true
139 }
140
141 fn commit(&self) -> Result<CommitResult> {
142 let mut inner = self.inner.lock().expect("transaction log lock poisoned");
143 let mut committed = 0;
144 let mut paths = Vec::new();
145
146 for entry in inner.staged.values() {
147 if let Some(parent) = entry.target_path.parent() {
149 fs::create_dir_all(parent).with_context(|| {
150 format!("Failed to create parent dir: {}", parent.display())
151 })?;
152 }
153
154 if fs::rename(&entry.staged_path, &entry.target_path).is_err() {
156 fs::write(&entry.target_path, &entry.content).with_context(|| {
157 format!(
158 "Failed to commit staged write to {}",
159 entry.target_path.display()
160 )
161 })?;
162 let _ = fs::remove_file(&entry.staged_path);
163 }
164
165 tracing::debug!(target = %entry.target_path.display(), "TransactionManager: committed");
166 committed += 1;
167 paths.push(entry.target_path.clone());
168 }
169
170 inner.staged.clear();
171 Ok(CommitResult { committed, paths })
172 }
173
174 fn rollback(&self) {
175 let mut inner = self.inner.lock().expect("transaction log lock poisoned");
176 for entry in inner.staged.values() {
177 let _ = fs::remove_file(&entry.staged_path);
178 }
179 inner.staged.clear();
180 tracing::debug!("TransactionManager: rolled back");
181 }
182
183 fn pending_count(&self) -> usize {
184 self.inner
185 .lock()
186 .expect("transaction log lock poisoned")
187 .staged
188 .len()
189 }
190}
191
192impl Drop for TransactionManager {
193 fn drop(&mut self) {
194 if Arc::strong_count(&self.inner) == 1 {
196 self.rollback();
197 let inner = self.inner.lock().expect("transaction log lock poisoned");
199 let _ = fs::remove_dir(&inner.staging_dir);
200 }
201 }
202}
203
#[cfg(test)]
mod tests {
    use super::*;
    use brainwires_core::{StagedWrite, StagingBackend};
    use std::path::Path;
    use tempfile::TempDir;

    /// Builds a `StagedWrite` from borrowed pieces.
    fn make_write(key: &str, path: &Path, content: &str) -> StagedWrite {
        let key = key.to_string();
        let target_path = path.to_path_buf();
        let content = content.to_string();
        StagedWrite {
            key,
            target_path,
            content,
        }
    }

    /// A staged write only reaches its target once the transaction commits.
    #[test]
    fn test_stage_and_commit() {
        let sandbox = TempDir::new().unwrap();
        let destination = sandbox.path().join("output.txt");
        let txn = TransactionManager::new().unwrap();

        assert!(txn.stage(make_write("k1", &destination, "hello world")));
        assert_eq!(txn.pending_count(), 1);
        assert!(!destination.exists(), "Target must not exist before commit");

        let outcome = txn.commit().unwrap();
        assert_eq!(outcome.committed, 1);
        assert!(destination.exists());
        assert_eq!(fs::read_to_string(&destination).unwrap(), "hello world");
        assert_eq!(txn.pending_count(), 0);
    }

    /// Rolling back drops the pending write without creating the target.
    #[test]
    fn test_rollback_discards_staged_writes() {
        let sandbox = TempDir::new().unwrap();
        let destination = sandbox.path().join("discard.txt");
        let txn = TransactionManager::new().unwrap();

        txn.stage(make_write("k1", &destination, "data"));
        assert_eq!(txn.pending_count(), 1);

        txn.rollback();
        assert_eq!(txn.pending_count(), 0);
        assert!(!destination.exists(), "Target must not exist after rollback");
    }

    /// Staging the same key twice keeps the first write and rejects the second.
    #[test]
    fn test_duplicate_key_is_idempotent() {
        let sandbox = TempDir::new().unwrap();
        let destination = sandbox.path().join("idem.txt");
        let txn = TransactionManager::new().unwrap();

        let accepted_first = txn.stage(make_write("same-key", &destination, "v1"));
        assert!(accepted_first);
        let accepted_second = txn.stage(make_write("same-key", &destination, "v2"));
        assert!(!accepted_second, "Same key must not be staged twice");
        assert_eq!(txn.pending_count(), 1);

        txn.commit().unwrap();
        assert_eq!(fs::read_to_string(&destination).unwrap(), "v1");
    }

    /// Several staged writes are all applied by a single commit.
    #[test]
    fn test_commit_multiple_files() {
        let sandbox = TempDir::new().unwrap();
        let txn = TransactionManager::new().unwrap();

        let first = sandbox.path().join("a.txt");
        let second = sandbox.path().join("b.txt");
        txn.stage(make_write("k-a", &first, "alpha"));
        txn.stage(make_write("k-b", &second, "beta"));
        assert_eq!(txn.pending_count(), 2);

        let outcome = txn.commit().unwrap();
        assert_eq!(outcome.committed, 2);
        assert_eq!(fs::read_to_string(&first).unwrap(), "alpha");
        assert_eq!(fs::read_to_string(&second).unwrap(), "beta");
    }

    /// Committing with nothing staged succeeds and reports no paths.
    #[test]
    fn test_empty_commit_succeeds() {
        let txn = TransactionManager::new().unwrap();
        let outcome = txn.commit().unwrap();
        assert_eq!(outcome.committed, 0);
        assert!(outcome.paths.is_empty());
    }

    /// Commit creates missing parent directories of the target.
    #[test]
    fn test_commit_creates_parent_directories() {
        let sandbox = TempDir::new().unwrap();
        let deep_target = sandbox.path().join("nested/deep/file.txt");
        let txn = TransactionManager::new().unwrap();

        txn.stage(make_write("k-nested", &deep_target, "content"));
        txn.commit().unwrap();

        assert!(deep_target.exists());
        assert_eq!(fs::read_to_string(&deep_target).unwrap(), "content");
    }

    /// After a commit the queue is empty and accepts new writes again.
    #[test]
    fn test_commit_clears_queue() {
        let sandbox = TempDir::new().unwrap();
        let destination = sandbox.path().join("f.txt");
        let txn = TransactionManager::new().unwrap();

        txn.stage(make_write("k", &destination, "x"));
        txn.commit().unwrap();
        assert_eq!(txn.pending_count(), 0);

        txn.stage(make_write("k2", &sandbox.path().join("g.txt"), "y"));
        assert_eq!(txn.pending_count(), 1);
    }

    /// After a rollback the queue is empty.
    #[test]
    fn test_rollback_clears_queue() {
        let sandbox = TempDir::new().unwrap();
        let txn = TransactionManager::new().unwrap();

        txn.stage(make_write("k", &sandbox.path().join("f.txt"), "x"));
        txn.rollback();
        assert_eq!(txn.pending_count(), 0);
    }
}