Skip to main content

recast_core/
commit.rs

//! Two-phase atomic commit for an approved [`Plan`].
//!
//! The stage phase writes a sibling temp per file (fsynced, with the
//! original's permissions preserved); the commit phase then, per file,
//! swaps original → backup and temp → original.
//! Any commit-phase failure walks the rename log in reverse to restore
//! every already-renamed original from its backup; remaining staged
//! temps are deleted. On success, parent dirs are fsynced first and the
//! backups are removed afterwards, so the rename batch is durable before
//! the safety net disappears.
9
use std::collections::HashSet;
use std::fs::{self, OpenOptions};
use std::io::Write;
use std::path::{Path, PathBuf};

use rayon::prelude::*;
use tracing::{debug, trace, warn};

use crate::error::{Error, IoCtx, Result};
use crate::plan::{FileChange, Plan};
20
/// Returned by [`apply_changes`] on success: how many files were
/// written and how many matches they covered.
#[derive(Debug, Clone, Copy)]
#[cfg_attr(feature = "serde", derive(serde::Serialize))]
pub struct ApplyOutcome {
    /// Number of files rewritten (one per change in the plan); `0` when the
    /// plan contained no changes.
    pub files_written: usize,
    /// Total matches covered, copied from the plan's `total_matches`
    /// (reported even when the plan had no changes).
    pub total_matches: usize,
}
29
/// A change whose new content has been written to a sibling temp file and
/// is ready to be renamed over its target during the commit phase.
struct Staged {
    /// The file being rewritten.
    target: PathBuf,
    /// Sibling temp (`.{name}.recast.tmp.{nonce}`) holding the new content.
    temp_path: PathBuf,
}
34
/// A file whose new content is in place. The original survives at
/// `backup_path` until it is either deleted (success) or renamed back over
/// `target` (rollback).
struct Committed {
    /// The rewritten file.
    target: PathBuf,
    /// Sibling backup (`.{name}.recast.bak.{nonce}`) holding the original.
    backup_path: PathBuf,
}
39
40/// Two-phase atomic commit.
41///
42/// Phase A (stage): for every change, write the new content to a sibling
43/// temp file, fsync the temp, copy the original's permissions across, and
44/// fsync the parent directory so the entry is durable. Any failure during
45/// stage deletes every temp created so far; originals are untouched.
46///
47/// Phase B (commit): for every change, rename the original aside to a
48/// sibling `*.recast.bak` and rename the temp into place. Any failure
49/// during commit walks the rename log in reverse to restore originals;
50/// remaining staged temps are deleted. The on-disk tree ends up either
51/// fully rewritten or bit-identical to the pre-image.
52pub fn apply_changes(plan: &Plan) -> Result<ApplyOutcome> {
53    if plan.changes.is_empty() {
54        debug!("apply: no changes; nothing to do");
55        return Ok(ApplyOutcome { files_written: 0, total_matches: plan.total_matches });
56    }
57
58    let nonces = NonceGen::new();
59    debug!(files = plan.changes.len(), "apply: stage phase begin");
60    let staged = stage_all(&plan.changes, &nonces)?;
61    debug!(files = staged.len(), "apply: stage phase complete");
62
63    finalize_apply(plan, &staged, commit_all(&staged, &nonces))
64}
65
66fn finalize_apply(
67    plan: &Plan,
68    staged: &[Staged],
69    result: std::result::Result<Vec<Committed>, CommitFailure>,
70) -> Result<ApplyOutcome> {
71    match result {
72        Ok(committed) => {
73            debug!(files = committed.len(), "apply: commit phase complete");
74            // Backups must outlive the parent-dir fsync; reversing this
75            // order opens a crash window where the rename isn't durable
76            // and the safety net is already gone.
77            best_effort_fsync_parents(&committed);
78            best_effort_cleanup_backups(&committed);
79            Ok(ApplyOutcome {
80                files_written: plan.changes.len(),
81                total_matches: plan.total_matches,
82            })
83        }
84        Err(CommitFailure { committed, remaining_staged, error }) => {
85            debug!(
86                committed = committed.len(),
87                remaining = remaining_staged,
88                "apply: commit failed, rolling back"
89            );
90            rollback_committed(&committed);
91            cleanup_remaining_staged(staged, remaining_staged);
92            Err(error)
93        }
94    }
95}
96
/// Test-only twin of [`apply_changes`] that invokes `between_commits(i)`
/// right after the `i`-th file has been committed. Returning an error from
/// the hook aborts the commit phase, which lets tests drive the rollback
/// path. Production callers use [`apply_changes`], whose signature has no
/// hook.
#[cfg(test)]
fn apply_inner<F>(plan: &Plan, between_commits: F) -> Result<ApplyOutcome>
where
    F: Fn(usize) -> Result<()>,
{
    if !plan.changes.is_empty() {
        let nonces = NonceGen::new();
        let staged = stage_all(&plan.changes, &nonces)?;
        let commit_result = commit_all_with(&staged, &nonces, between_commits);
        return finalize_apply(plan, &staged, commit_result);
    }
    Ok(ApplyOutcome { files_written: 0, total_matches: plan.total_matches })
}
112
113fn stage_all(changes: &[FileChange], nonces: &NonceGen) -> Result<Vec<Staged>> {
114    // Per-file stage is dominated by `sync_all` on the temp file, which
115    // the kernel can overlap across workers. Commit phase stays serial
116    // so rollback ordering remains deterministic.
117    let results: Vec<Result<Staged>> = changes.par_iter().map(|c| stage_one(c, nonces)).collect();
118
119    let mut staged: Vec<Staged> = Vec::with_capacity(results.len());
120    let mut first_error: Option<Error> = None;
121    for r in results {
122        match r {
123            Ok(s) => staged.push(s),
124            Err(e) => {
125                if first_error.is_none() {
126                    first_error = Some(e);
127                }
128            }
129        }
130    }
131    if let Some(e) = first_error {
132        for s in &staged {
133            let _ = fs::remove_file(&s.temp_path);
134        }
135        return Err(e);
136    }
137    Ok(staged)
138}
139
140fn stage_one(change: &FileChange, nonces: &NonceGen) -> Result<Staged> {
141    let parent = parent_dir(&change.path)?;
142
143    // Permissions were captured by the planner alongside the file
144    // read; reuse them so the stage hot path doesn't do a second
145    // `fs::metadata` syscall per file.
146    let permissions = change.permissions.clone();
147
148    let temp_name = sibling_temp_name(&change.path, SiblingKind::Temp, nonces);
149    let temp_path = parent.join(&temp_name);
150
151    let mut file =
152        OpenOptions::new().write(true).create_new(true).open(&temp_path).io_ctx(&temp_path)?;
153    file.write_all(change.after.as_bytes()).io_ctx(&temp_path)?;
154    file.flush().io_ctx(&temp_path)?;
155    file.sync_all().io_ctx(&temp_path)?;
156    drop(file);
157
158    if let Some(perm) = permissions
159        && let Err(e) = fs::set_permissions(&temp_path, perm)
160    {
161        let _ = fs::remove_file(&temp_path);
162        return Err(Error::Io { path: temp_path, source: e });
163    }
164
165    Ok(Staged { target: change.path.clone(), temp_path })
166}
167
168fn parent_dir(path: &Path) -> Result<&Path> {
169    path.parent().ok_or_else(|| Error::Io {
170        path: path.to_path_buf(),
171        source: std::io::Error::new(std::io::ErrorKind::InvalidInput, "no parent directory"),
172    })
173}
174
/// Why the commit phase stopped, plus what the caller needs to roll back.
struct CommitFailure {
    /// Files already swapped (original → backup, temp → target), in commit
    /// order; rollback walks them in reverse.
    committed: Vec<Committed>,
    /// Number of trailing `staged` entries whose temps were never committed
    /// and must be deleted.
    remaining_staged: usize,
    /// The error that aborted the commit phase.
    error: Error,
}
180
181fn commit_all(
182    staged: &[Staged],
183    nonces: &NonceGen,
184) -> std::result::Result<Vec<Committed>, CommitFailure> {
185    commit_all_with(staged, nonces, |_| Ok(()))
186}
187
188fn commit_all_with<F>(
189    staged: &[Staged],
190    nonces: &NonceGen,
191    between_commits: F,
192) -> std::result::Result<Vec<Committed>, CommitFailure>
193where
194    F: Fn(usize) -> Result<()>,
195{
196    let mut committed: Vec<Committed> = Vec::with_capacity(staged.len());
197    for (i, s) in staged.iter().enumerate() {
198        match commit_one(s, nonces) {
199            Ok(c) => committed.push(c),
200            Err(error) => {
201                return Err(CommitFailure { committed, remaining_staged: staged.len() - i, error });
202            }
203        }
204        if let Err(error) = between_commits(i) {
205            return Err(CommitFailure { committed, remaining_staged: staged.len() - i - 1, error });
206        }
207    }
208    Ok(committed)
209}
210
211fn commit_one(staged: &Staged, nonces: &NonceGen) -> Result<Committed> {
212    trace!(target = %staged.target.display(), "commit: rename");
213    let backup_name = sibling_temp_name(&staged.target, SiblingKind::Backup, nonces);
214    let backup_path = parent_dir(&staged.target)?.join(&backup_name);
215
216    rename_with_exdev_fallback(&staged.target, &backup_path).io_ctx(&staged.target)?;
217
218    if let Err(e) = rename_with_exdev_fallback(&staged.temp_path, &staged.target) {
219        let _ = rename_with_exdev_fallback(&backup_path, &staged.target);
220        return Err(Error::Io { path: staged.target.clone(), source: e });
221    }
222
223    Ok(Committed { target: staged.target.clone(), backup_path })
224}
225
226fn rollback_committed(committed: &[Committed]) {
227    for c in committed.iter().rev() {
228        let _ = rename_with_exdev_fallback(&c.backup_path, &c.target);
229    }
230}
231
/// Rename `src` to `dst`. If the kernel rejects the rename with `EXDEV`
/// (cross-device link), fall back to copy, best-effort fsync of the copy,
/// then unlink `src`.
///
/// Same-directory renames on a normal filesystem never take the fallback.
/// Overlayfs, unionfs, bind-mounts with mismatched lowerdirs, FUSE
/// backends, and some container layouts can return `EXDEV` even for a
/// lexically sibling rename, so the apply degrades instead of aborting.
/// The fallback is *not* atomic: an error or crash between the copy and the
/// unlink can leave both paths present. Any non-`EXDEV` rename error is
/// returned unchanged.
fn rename_with_exdev_fallback(src: &Path, dst: &Path) -> std::io::Result<()> {
    let rename_err = match fs::rename(src, dst) {
        Ok(()) => return Ok(()),
        Err(err) => err,
    };
    if rename_err.kind() != std::io::ErrorKind::CrossesDevices {
        return Err(rename_err);
    }

    fs::copy(src, dst)?;
    // Flushing the copy is best-effort: if it fails, `src` is still
    // unlinked below, as in the plain-rename case.
    if let Ok(copied) = fs::File::open(dst) {
        let _ = copied.sync_all();
    }
    fs::remove_file(src)
}
255
256fn cleanup_remaining_staged(staged: &[Staged], remaining_count: usize) {
257    let start = staged.len().saturating_sub(remaining_count);
258    for s in &staged[start..] {
259        let _ = fs::remove_file(&s.temp_path);
260    }
261}
262
263fn best_effort_cleanup_backups(committed: &[Committed]) {
264    for c in committed {
265        let _ = fs::remove_file(&c.backup_path);
266    }
267}
268
269fn best_effort_fsync_parents(committed: &[Committed]) {
270    let mut seen: HashSet<&Path> = HashSet::new();
271    for c in committed {
272        if let Some(parent) = c.target.parent()
273            && seen.insert(parent)
274            && let Ok(dir) = std::fs::File::open(parent)
275        {
276            let _ = dir.sync_all();
277        }
278    }
279}
280
/// Summary of a [`recover_sweep`] call.
#[derive(Debug, Clone, Copy, Default)]
#[cfg_attr(feature = "serde", derive(serde::Serialize))]
pub struct RecoverySummary {
    /// Missing targets brought back by renaming a backup into place.
    pub backups_restored: usize,
    /// Leftover backup files deleted.
    pub backups_removed: usize,
    /// Leftover temp files deleted.
    pub temps_removed: usize,
}
289
290/// Walk every regular file under `roots` and reconcile leftover
291/// `.recast.bak.*` / `.recast.tmp.*` siblings from a previous interrupted
292/// apply.
293///
294/// Rules per target `foo`:
295/// - target exists, only `.foo.recast.bak.*`/`.tmp.*` leftovers → delete leftovers
296/// - target missing, `.foo.recast.bak.N` present → rename newest backup → target,
297///   delete older backups and any temps
298/// - target missing, only temps present → leave untouched (can't decide safely)
299pub fn recover_sweep<P: AsRef<Path>>(roots: &[P]) -> Result<RecoverySummary> {
300    use ignore::WalkBuilder;
301
302    let mut iter = if let Some(first) = roots.first() {
303        WalkBuilder::new(first.as_ref())
304    } else {
305        WalkBuilder::new(".")
306    };
307    for extra in roots.iter().skip(1) {
308        iter.add(extra.as_ref());
309    }
310    iter.hidden(false).ignore(false).git_ignore(false).git_global(false).git_exclude(false);
311
312    let mut groups: std::collections::HashMap<PathBuf, RecoveryGroup> =
313        std::collections::HashMap::new();
314    for entry in iter.build() {
315        let entry = entry.map_err(Error::Walk)?;
316        let path = entry.into_path();
317        let name = match path.file_name().and_then(|n| n.to_str()) {
318            Some(s) => s.to_owned(),
319            None => continue,
320        };
321        if let Some((target_name, kind, nonce)) = parse_sibling_name(&name) {
322            // A recast sibling file lives next to its target, so its
323            // path must have a parent. If the walker handed back a
324            // parentless path (root-level entry, weird filesystem),
325            // bucketing under PathBuf::new() would silently merge
326            // unrelated targets — skip instead.
327            let Some(parent) = path.parent() else {
328                trace!(name = %name, "recover: sibling has no parent; skipping");
329                continue;
330            };
331            let target = parent.join(&target_name);
332            let g = groups.entry(target).or_default();
333            match kind {
334                SiblingKind::Backup => g.backups.push((nonce, path.clone())),
335                SiblingKind::Temp => g.temps.push((nonce, path.clone())),
336            }
337        }
338    }
339
340    let mut summary = RecoverySummary::default();
341    // Best-effort: keep sweeping past per-group failures (the tree is
342    // already partial — that's why --recover was called) and surface the
343    // first error at the end so the exit code still reflects failure.
344    let mut first_error: Option<Error> = None;
345    for (target, mut group) in groups {
346        group.backups.sort_by_key(|(n, _)| *n);
347        group.temps.sort_by_key(|(n, _)| *n);
348        if target.exists() {
349            remove_nonced(&group.backups, &mut summary.backups_removed, &mut first_error);
350            remove_nonced(&group.temps, &mut summary.temps_removed, &mut first_error);
351            continue;
352        }
353        if let Some((_, newest)) = group.backups.pop() {
354            match rename_with_exdev_fallback(&newest, &target).io_ctx(&newest) {
355                Ok(()) => summary.backups_restored += 1,
356                Err(e) => {
357                    if first_error.is_none() {
358                        first_error = Some(e);
359                    }
360                    continue;
361                }
362            }
363            remove_nonced(&group.backups, &mut summary.backups_removed, &mut first_error);
364            remove_nonced(&group.temps, &mut summary.temps_removed, &mut first_error);
365        }
366    }
367    if let Some(e) = first_error {
368        return Err(e);
369    }
370    Ok(summary)
371}
372
373fn remove_nonced(entries: &[(u64, PathBuf)], counter: &mut usize, first_error: &mut Option<Error>) {
374    for (_, p) in entries {
375        match fs::remove_file(p).io_ctx(p) {
376            Ok(()) => *counter += 1,
377            Err(e) => {
378                if first_error.is_none() {
379                    *first_error = Some(e);
380                }
381            }
382        }
383    }
384}
385
/// Leftover siblings found for one target during [`recover_sweep`], each
/// stored as `(nonce, path)`.
#[derive(Default)]
struct RecoveryGroup {
    /// `.{name}.recast.bak.{nonce}` files.
    backups: Vec<(u64, PathBuf)>,
    /// `.{name}.recast.tmp.{nonce}` files.
    temps: Vec<(u64, PathBuf)>,
}
391
/// Which kind of `.{target}.recast.{token}.{nonce}` sibling a file is.
#[derive(Copy, Clone)]
enum SiblingKind {
    /// The original file, renamed aside during commit (`commit_one`).
    Backup,
    /// New content written ahead of commit (`stage_one`).
    Temp,
}
397
398impl SiblingKind {
399    /// Filename token used in `.{target}.recast.{token}.{nonce}` sibling
400    /// names. Single source of truth for both emission
401    /// ([`sibling_temp_name`]) and parsing ([`parse_sibling_name`]).
402    fn as_str(self) -> &'static str {
403        match self {
404            SiblingKind::Backup => "bak",
405            SiblingKind::Temp => "tmp",
406        }
407    }
408
409    fn from_token(token: &str) -> Option<Self> {
410        match token {
411            "bak" => Some(SiblingKind::Backup),
412            "tmp" => Some(SiblingKind::Temp),
413            _ => None,
414        }
415    }
416}
417
418fn parse_sibling_name(name: &str) -> Option<(String, SiblingKind, u64)> {
419    let rest = name.strip_prefix('.')?;
420    let idx_recast = rest.find(".recast.")?;
421    let (target, suffix) = rest.split_at(idx_recast);
422    if target.is_empty() {
423        return None;
424    }
425    let suffix = suffix.strip_prefix(".recast.")?;
426    let dot = suffix.find('.')?;
427    let (kind_str, nonce_str) = suffix.split_at(dot);
428    let kind = SiblingKind::from_token(kind_str)?;
429    let nonce: u64 = nonce_str.strip_prefix('.')?.parse().ok()?;
430    Some((target.to_owned(), kind, nonce))
431}
432
433fn sibling_temp_name(target: &Path, kind: SiblingKind, nonces: &NonceGen) -> String {
434    let name = target.file_name().map(|n| n.to_string_lossy().into_owned()).unwrap_or_default();
435    let nonce = nonces.next();
436    format!(".{name}.recast.{token}.{nonce}", token = kind.as_str())
437}
438
/// Per-apply nonce generator for `.{target}.recast.{kind}.{nonce}` sibling
/// filenames.
///
/// The seed is the wall-clock time in epoch nanoseconds, sampled once at
/// construction. Each [`NonceGen::next`] call adds a shared atomic counter
/// to it. This gives:
/// - uniqueness within an apply, including across parallel stage workers,
///   without a clock read per file;
/// - ordering across applies: a later apply starts from a later clock
///   reading, so a larger nonce means a newer sibling. [`recover_sweep`]
///   relies on this when it restores the backup with the largest nonce.
///   This holds as long as the clock advances between applies by more than
///   the number of nonces drawn, which is true in practice since each nonce
///   accompanies file I/O, and as long as the wall clock is not stepped
///   backwards.
///
/// The seed is deliberately *not* hashed or scrambled: that would keep
/// uniqueness but destroy the cross-apply ordering.
///
/// One instance per [`apply_changes`] call, passed by reference into the
/// stage and commit phases, so the counter is scoped to the apply that owns
/// it instead of living in static mutable state.
struct NonceGen {
    seed: u64,
    counter: std::sync::atomic::AtomicU64,
}

impl NonceGen {
    fn new() -> Self {
        use std::time::{SystemTime, UNIX_EPOCH};
        // Truncating the u128 nanosecond count to u64 only wraps after
        // ~year 2554. A clock set before the epoch degrades to seed 0: still
        // unique within the apply, but without cross-apply ordering.
        let seed =
            SystemTime::now().duration_since(UNIX_EPOCH).map(|d| d.as_nanos() as u64).unwrap_or(0);
        Self { seed, counter: std::sync::atomic::AtomicU64::new(0) }
    }

    fn next(&self) -> u64 {
        use std::sync::atomic::Ordering;
        // Only the atomicity of the increment matters (no other memory is
        // published through it), so `Relaxed` is sufficient.
        let n = self.counter.fetch_add(1, Ordering::Relaxed);
        self.seed.wrapping_add(n)
    }
}
470
// Unit tests for this module live in `commit_tests.rs`, loaded via
// `#[path]` and compiled only for test builds.
#[cfg(test)]
#[path = "commit_tests.rs"]
mod tests;