Skip to main content

brainwires_tool_system/
transaction.rs

1//! Two-phase commit transaction manager for file write operations.
2//!
3//! [`TransactionManager`] implements [`StagingBackend`] from `brainwires-core`.
4//!
5//! ## Protocol
6//!
7//! 1. **Stage** — calls to [`TransactionManager::stage`] write content to a
8//!    temporary directory with a key-addressed filename.  The target path is
9//!    **not** touched.
10//!
11//! 2. **Commit** — [`TransactionManager::commit`] atomically renames each staged
12//!    file to its target path.  On cross-filesystem moves a copy+delete fallback
13//!    is used.  Parent directories are created as needed.
14//!
15//! 3. **Rollback** — [`TransactionManager::rollback`] deletes all staged files
16//!    from the temp dir without touching any target path.
17//!
18//! A `TransactionManager` is single-use per transaction: after `commit()` or
19//! `rollback()` the queue is empty and new stages can be accepted.
20//!
21//! ## Usage
22//!
23//! ```rust,ignore
24//! use std::sync::Arc;
25//! use brainwires_tool_system::transaction::TransactionManager;
26//! use brainwires_core::{StagedWrite, ToolContext};
27//!
28//! let mgr = Arc::new(TransactionManager::new().unwrap());
29//! let ctx = ToolContext::default().with_staging_backend(mgr.clone());
30//!
31//! // … execute file tools against `ctx` — writes are staged, not applied …
32//!
33//! mgr.commit().unwrap(); // atomically write all staged files
34//! // or mgr.rollback();  // discard everything
35//! ```
36
37use std::collections::HashMap;
38use std::fs;
39use std::path::PathBuf;
40use std::sync::{Arc, Mutex};
41
42use anyhow::{Context, Result};
43
44use brainwires_core::{CommitResult, StagedWrite, StagingBackend};
45
46// ── Internal types ────────────────────────────────────────────────────────────
47
48/// One pending staged write entry.
49#[derive(Debug)]
50struct StagedEntry {
51    staged_path: PathBuf,
52    target_path: PathBuf,
53    content: String,
54}
55
56#[derive(Debug)]
57struct Inner {
58    staging_dir: PathBuf,
59    /// key → entry
60    staged: HashMap<String, StagedEntry>,
61}
62
63// ── TransactionManager ────────────────────────────────────────────────────────
64
65/// Filesystem-backed two-phase commit transaction manager.
66///
67/// Wrap in [`Arc`] and attach to [`ToolContext`][brainwires_core::ToolContext]
68/// via [`with_staging_backend`][brainwires_core::ToolContext::with_staging_backend].
69#[derive(Debug, Clone)]
70pub struct TransactionManager {
71    inner: Arc<Mutex<Inner>>,
72}
73
74impl TransactionManager {
75    /// Create a new manager using the system temp directory.
76    ///
77    /// The staging directory is `<tmpdir>/brainwires-txn-<uuid>` and is created
78    /// on construction.
79    pub fn new() -> Result<Self> {
80        let staging_dir =
81            std::env::temp_dir().join(format!("brainwires-txn-{}", uuid::Uuid::new_v4()));
82        Self::with_dir(staging_dir)
83    }
84
85    /// Create a new manager using a specific staging directory.
86    ///
87    /// The directory is created if it does not already exist.
88    pub fn with_dir(staging_dir: PathBuf) -> Result<Self> {
89        fs::create_dir_all(&staging_dir)
90            .with_context(|| format!("Failed to create staging dir: {}", staging_dir.display()))?;
91        Ok(Self {
92            inner: Arc::new(Mutex::new(Inner {
93                staging_dir,
94                staged: HashMap::new(),
95            })),
96        })
97    }
98}
99
100impl StagingBackend for TransactionManager {
101    fn stage(&self, write: StagedWrite) -> bool {
102        let mut inner = self.inner.lock().expect("transaction log lock poisoned");
103
104        // Idempotent: same key staged twice is a no-op (first write wins)
105        if inner.staged.contains_key(&write.key) {
106            return false;
107        }
108
109        // Write content to the staging directory under a key-addressed name
110        let safe_name = format!("{}.staged", write.key);
111        let staged_path = inner.staging_dir.join(&safe_name);
112
113        if let Err(e) = fs::write(&staged_path, &write.content) {
114            tracing::error!(
115                key = %write.key,
116                path = %staged_path.display(),
117                error = %e,
118                "TransactionManager: failed to stage write"
119            );
120            return false;
121        }
122
123        tracing::debug!(
124            key = %write.key,
125            target = %write.target_path.display(),
126            "TransactionManager: staged write"
127        );
128
129        inner.staged.insert(
130            write.key,
131            StagedEntry {
132                staged_path,
133                target_path: write.target_path,
134                content: write.content,
135            },
136        );
137
138        true
139    }
140
141    fn commit(&self) -> Result<CommitResult> {
142        let mut inner = self.inner.lock().expect("transaction log lock poisoned");
143        let mut committed = 0;
144        let mut paths = Vec::new();
145
146        for entry in inner.staged.values() {
147            // Ensure target parent directory exists
148            if let Some(parent) = entry.target_path.parent() {
149                fs::create_dir_all(parent).with_context(|| {
150                    format!("Failed to create parent dir: {}", parent.display())
151                })?;
152            }
153
154            // Attempt atomic rename (same filesystem); fall back to copy+delete
155            if fs::rename(&entry.staged_path, &entry.target_path).is_err() {
156                fs::write(&entry.target_path, &entry.content).with_context(|| {
157                    format!(
158                        "Failed to commit staged write to {}",
159                        entry.target_path.display()
160                    )
161                })?;
162                let _ = fs::remove_file(&entry.staged_path);
163            }
164
165            tracing::debug!(target = %entry.target_path.display(), "TransactionManager: committed");
166            committed += 1;
167            paths.push(entry.target_path.clone());
168        }
169
170        inner.staged.clear();
171        Ok(CommitResult { committed, paths })
172    }
173
174    fn rollback(&self) {
175        let mut inner = self.inner.lock().expect("transaction log lock poisoned");
176        for entry in inner.staged.values() {
177            let _ = fs::remove_file(&entry.staged_path);
178        }
179        inner.staged.clear();
180        tracing::debug!("TransactionManager: rolled back");
181    }
182
183    fn pending_count(&self) -> usize {
184        self.inner
185            .lock()
186            .expect("transaction log lock poisoned")
187            .staged
188            .len()
189    }
190}
191
192impl Drop for TransactionManager {
193    fn drop(&mut self) {
194        // If we hold the last Arc reference, auto-rollback any remaining staged files
195        if Arc::strong_count(&self.inner) == 1 {
196            self.rollback();
197            // Best-effort: remove the (now-empty) staging directory
198            let inner = self.inner.lock().expect("transaction log lock poisoned");
199            let _ = fs::remove_dir(&inner.staging_dir);
200        }
201    }
202}
203
204// ── Tests ─────────────────────────────────────────────────────────────────────
205
206#[cfg(test)]
207mod tests {
208    use super::*;
209    use brainwires_core::{StagedWrite, StagingBackend};
210    use std::path::Path;
211    use tempfile::TempDir;
212
213    fn make_write(key: &str, path: &Path, content: &str) -> StagedWrite {
214        StagedWrite {
215            key: key.to_string(),
216            target_path: path.to_path_buf(),
217            content: content.to_string(),
218        }
219    }
220
221    #[test]
222    fn test_stage_and_commit() {
223        let temp = TempDir::new().unwrap();
224        let target = temp.path().join("output.txt");
225        let mgr = TransactionManager::new().unwrap();
226
227        let staged = mgr.stage(make_write("k1", &target, "hello world"));
228        assert!(staged);
229        assert_eq!(mgr.pending_count(), 1);
230        assert!(!target.exists(), "Target must not exist before commit");
231
232        let result = mgr.commit().unwrap();
233        assert_eq!(result.committed, 1);
234        assert!(target.exists());
235        assert_eq!(fs::read_to_string(&target).unwrap(), "hello world");
236        assert_eq!(mgr.pending_count(), 0);
237    }
238
239    #[test]
240    fn test_rollback_discards_staged_writes() {
241        let temp = TempDir::new().unwrap();
242        let target = temp.path().join("discard.txt");
243        let mgr = TransactionManager::new().unwrap();
244
245        mgr.stage(make_write("k1", &target, "data"));
246        assert_eq!(mgr.pending_count(), 1);
247
248        mgr.rollback();
249        assert_eq!(mgr.pending_count(), 0);
250        assert!(!target.exists(), "Target must not exist after rollback");
251    }
252
253    #[test]
254    fn test_duplicate_key_is_idempotent() {
255        let temp = TempDir::new().unwrap();
256        let target = temp.path().join("idem.txt");
257        let mgr = TransactionManager::new().unwrap();
258
259        let first = mgr.stage(make_write("same-key", &target, "v1"));
260        assert!(first);
261        let second = mgr.stage(make_write("same-key", &target, "v2"));
262        assert!(!second, "Same key must not be staged twice");
263        assert_eq!(mgr.pending_count(), 1);
264
265        mgr.commit().unwrap();
266        // Only the first content should have been committed
267        assert_eq!(fs::read_to_string(&target).unwrap(), "v1");
268    }
269
270    #[test]
271    fn test_commit_multiple_files() {
272        let temp = TempDir::new().unwrap();
273        let mgr = TransactionManager::new().unwrap();
274
275        let f1 = temp.path().join("a.txt");
276        let f2 = temp.path().join("b.txt");
277        mgr.stage(make_write("k-a", &f1, "alpha"));
278        mgr.stage(make_write("k-b", &f2, "beta"));
279        assert_eq!(mgr.pending_count(), 2);
280
281        let result = mgr.commit().unwrap();
282        assert_eq!(result.committed, 2);
283        assert_eq!(fs::read_to_string(&f1).unwrap(), "alpha");
284        assert_eq!(fs::read_to_string(&f2).unwrap(), "beta");
285    }
286
287    #[test]
288    fn test_empty_commit_succeeds() {
289        let mgr = TransactionManager::new().unwrap();
290        let result = mgr.commit().unwrap();
291        assert_eq!(result.committed, 0);
292        assert!(result.paths.is_empty());
293    }
294
295    #[test]
296    fn test_commit_creates_parent_directories() {
297        let temp = TempDir::new().unwrap();
298        let nested = temp.path().join("nested/deep/file.txt");
299        let mgr = TransactionManager::new().unwrap();
300
301        mgr.stage(make_write("k-nested", &nested, "content"));
302        mgr.commit().unwrap();
303
304        assert!(nested.exists());
305        assert_eq!(fs::read_to_string(&nested).unwrap(), "content");
306    }
307
308    #[test]
309    fn test_commit_clears_queue() {
310        let temp = TempDir::new().unwrap();
311        let target = temp.path().join("f.txt");
312        let mgr = TransactionManager::new().unwrap();
313
314        mgr.stage(make_write("k", &target, "x"));
315        mgr.commit().unwrap();
316        assert_eq!(mgr.pending_count(), 0);
317
318        // After commit, new stages are accepted
319        mgr.stage(make_write("k2", &temp.path().join("g.txt"), "y"));
320        assert_eq!(mgr.pending_count(), 1);
321    }
322
323    #[test]
324    fn test_rollback_clears_queue() {
325        let temp = TempDir::new().unwrap();
326        let mgr = TransactionManager::new().unwrap();
327
328        mgr.stage(make_write("k", &temp.path().join("f.txt"), "x"));
329        mgr.rollback();
330        assert_eq!(mgr.pending_count(), 0);
331    }
332}