1use std::collections::HashSet;
11use std::fs::{self, OpenOptions};
12use std::io::Write;
13use std::path::{Path, PathBuf};
14
15use rayon::prelude::*;
16use tracing::{debug, trace};
17
18use crate::error::{Error, IoCtx, Result};
19use crate::plan::{FileChange, Plan};
20
/// Summary returned by a successful [`apply_changes`] run.
///
/// Also derives `serde::Serialize` when the crate's `serde` feature is enabled.
#[derive(Debug, Clone, Copy)]
#[cfg_attr(feature = "serde", derive(serde::Serialize))]
pub struct ApplyOutcome {
    /// Number of files replaced on disk (zero when the plan had no changes).
    pub files_written: usize,
    /// Match count copied unchanged from the plan that was applied.
    pub total_matches: usize,
}
29
/// A replacement that has been written to a temp file but not yet swapped into place.
struct Staged {
    /// File that the commit phase will replace.
    target: PathBuf,
    /// Temp file in the target's directory holding the new contents.
    temp_path: PathBuf,
}
34
/// A target whose new contents are in place, with its previous contents kept as a backup.
struct Committed {
    /// File that now holds the new contents.
    target: PathBuf,
    /// Backup file in the target's directory holding the previous contents.
    backup_path: PathBuf,
}
39
40pub fn apply_changes(plan: &Plan) -> Result<ApplyOutcome> {
53 if plan.changes.is_empty() {
54 debug!("apply: no changes; nothing to do");
55 return Ok(ApplyOutcome { files_written: 0, total_matches: plan.total_matches });
56 }
57
58 let nonces = NonceGen::new();
59 debug!(files = plan.changes.len(), "apply: stage phase begin");
60 let staged = stage_all(&plan.changes, &nonces)?;
61 debug!(files = staged.len(), "apply: stage phase complete");
62
63 finalize_apply(plan, &staged, commit_all(&staged, &nonces))
64}
65
66fn finalize_apply(
67 plan: &Plan,
68 staged: &[Staged],
69 result: std::result::Result<Vec<Committed>, CommitFailure>,
70) -> Result<ApplyOutcome> {
71 match result {
72 Ok(committed) => {
73 debug!(files = committed.len(), "apply: commit phase complete");
74 best_effort_fsync_parents(&committed);
78 best_effort_cleanup_backups(&committed);
79 Ok(ApplyOutcome {
80 files_written: plan.changes.len(),
81 total_matches: plan.total_matches,
82 })
83 }
84 Err(CommitFailure { committed, remaining_staged, error }) => {
85 debug!(
86 committed = committed.len(),
87 remaining = remaining_staged,
88 "apply: commit failed, rolling back"
89 );
90 rollback_committed(&committed);
91 cleanup_remaining_staged(staged, remaining_staged);
92 Err(error)
93 }
94 }
95}
96
/// Test-only variant of [`apply_changes`] that runs `between_commits` after each commit.
///
/// The hook receives the index of the file just committed; returning an error from it
/// fails the run at that point, which exercises the rollback path.
#[cfg(test)]
fn apply_inner<F>(plan: &Plan, between_commits: F) -> Result<ApplyOutcome>
where
    F: Fn(usize) -> Result<()>,
{
    let total_matches = plan.total_matches;
    if plan.changes.is_empty() {
        return Ok(ApplyOutcome { files_written: 0, total_matches });
    }

    let nonces = NonceGen::new();
    let staged = stage_all(&plan.changes, &nonces)?;
    let commit_result = commit_all_with(&staged, &nonces, between_commits);
    finalize_apply(plan, &staged, commit_result)
}
112
113fn stage_all(changes: &[FileChange], nonces: &NonceGen) -> Result<Vec<Staged>> {
114 let results: Vec<Result<Staged>> = changes.par_iter().map(|c| stage_one(c, nonces)).collect();
118
119 let mut staged: Vec<Staged> = Vec::with_capacity(results.len());
120 let mut first_error: Option<Error> = None;
121 for r in results {
122 match r {
123 Ok(s) => staged.push(s),
124 Err(e) => {
125 if first_error.is_none() {
126 first_error = Some(e);
127 }
128 }
129 }
130 }
131 if let Some(e) = first_error {
132 for s in &staged {
133 let _ = fs::remove_file(&s.temp_path);
134 }
135 return Err(e);
136 }
137 Ok(staged)
138}
139
140fn stage_one(change: &FileChange, nonces: &NonceGen) -> Result<Staged> {
141 let parent = parent_dir(&change.path)?;
142
143 let permissions = change.permissions.clone();
147
148 let temp_name = sibling_temp_name(&change.path, SiblingKind::Temp, nonces);
149 let temp_path = parent.join(&temp_name);
150
151 let mut file =
152 OpenOptions::new().write(true).create_new(true).open(&temp_path).io_ctx(&temp_path)?;
153 file.write_all(change.after.as_bytes()).io_ctx(&temp_path)?;
154 file.flush().io_ctx(&temp_path)?;
155 file.sync_all().io_ctx(&temp_path)?;
156 drop(file);
157
158 if let Some(perm) = permissions
159 && let Err(e) = fs::set_permissions(&temp_path, perm)
160 {
161 let _ = fs::remove_file(&temp_path);
162 return Err(Error::Io { path: temp_path, source: e });
163 }
164
165 Ok(Staged { target: change.path.clone(), temp_path })
166}
167
168fn parent_dir(path: &Path) -> Result<&Path> {
169 path.parent().ok_or_else(|| Error::Io {
170 path: path.to_path_buf(),
171 source: std::io::Error::new(std::io::ErrorKind::InvalidInput, "no parent directory"),
172 })
173}
174
175struct CommitFailure {
176 committed: Vec<Committed>,
177 remaining_staged: usize,
178 error: Error,
179}
180
181fn commit_all(
182 staged: &[Staged],
183 nonces: &NonceGen,
184) -> std::result::Result<Vec<Committed>, CommitFailure> {
185 commit_all_with(staged, nonces, |_| Ok(()))
186}
187
188fn commit_all_with<F>(
189 staged: &[Staged],
190 nonces: &NonceGen,
191 between_commits: F,
192) -> std::result::Result<Vec<Committed>, CommitFailure>
193where
194 F: Fn(usize) -> Result<()>,
195{
196 let mut committed: Vec<Committed> = Vec::with_capacity(staged.len());
197 for (i, s) in staged.iter().enumerate() {
198 match commit_one(s, nonces) {
199 Ok(c) => committed.push(c),
200 Err(error) => {
201 return Err(CommitFailure { committed, remaining_staged: staged.len() - i, error });
202 }
203 }
204 if let Err(error) = between_commits(i) {
205 return Err(CommitFailure { committed, remaining_staged: staged.len() - i - 1, error });
206 }
207 }
208 Ok(committed)
209}
210
211fn commit_one(staged: &Staged, nonces: &NonceGen) -> Result<Committed> {
212 trace!(target = %staged.target.display(), "commit: rename");
213 let backup_name = sibling_temp_name(&staged.target, SiblingKind::Backup, nonces);
214 let backup_path = parent_dir(&staged.target)?.join(&backup_name);
215
216 rename_with_exdev_fallback(&staged.target, &backup_path).io_ctx(&staged.target)?;
217
218 if let Err(e) = rename_with_exdev_fallback(&staged.temp_path, &staged.target) {
219 let _ = rename_with_exdev_fallback(&backup_path, &staged.target);
220 return Err(Error::Io { path: staged.target.clone(), source: e });
221 }
222
223 Ok(Committed { target: staged.target.clone(), backup_path })
224}
225
226fn rollback_committed(committed: &[Committed]) {
227 for c in committed.iter().rev() {
228 let _ = rename_with_exdev_fallback(&c.backup_path, &c.target);
229 }
230}
231
/// Renames `src` to `dst`, falling back to copy-and-delete across filesystems.
///
/// When `fs::rename` fails with `ErrorKind::CrossesDevices`, the contents are copied to
/// `dst`, `dst` is fsynced on a best-effort basis, and `src` is removed. Any other rename
/// error is returned unchanged.
///
/// # Errors
///
/// Returns the rename error, the copy error, or the error from removing `src` after a
/// successful copy. If the fallback copy fails, `dst` is removed (best effort) before the
/// error is returned: a failed copy can leave `dst` truncated or half-written, and
/// `recover_sweep` treats an existing target as healthy and deletes its backups.
fn rename_with_exdev_fallback(src: &Path, dst: &Path) -> std::io::Result<()> {
    let rename_err = match fs::rename(src, dst) {
        Ok(()) => return Ok(()),
        Err(e) => e,
    };
    if rename_err.kind() != std::io::ErrorKind::CrossesDevices {
        return Err(rename_err);
    }

    if let Err(copy_err) = fs::copy(src, dst) {
        // `src` is still intact here, so dropping the partial `dst` loses nothing.
        let _ = fs::remove_file(dst);
        return Err(copy_err);
    }
    // Flush the copied data before the only other copy is deleted.
    if let Ok(file) = std::fs::File::open(dst) {
        let _ = file.sync_all();
    }
    fs::remove_file(src)
}
255
256fn cleanup_remaining_staged(staged: &[Staged], remaining_count: usize) {
257 let start = staged.len().saturating_sub(remaining_count);
258 for s in &staged[start..] {
259 let _ = fs::remove_file(&s.temp_path);
260 }
261}
262
263fn best_effort_cleanup_backups(committed: &[Committed]) {
264 for c in committed {
265 let _ = fs::remove_file(&c.backup_path);
266 }
267}
268
269fn best_effort_fsync_parents(committed: &[Committed]) {
270 let mut seen: HashSet<&Path> = HashSet::new();
271 for c in committed {
272 if let Some(parent) = c.target.parent()
273 && seen.insert(parent)
274 && let Ok(dir) = std::fs::File::open(parent)
275 {
276 let _ = dir.sync_all();
277 }
278 }
279}
280
/// Counters describing what [`recover_sweep`] did.
///
/// Also derives `serde::Serialize` when the crate's `serde` feature is enabled.
#[derive(Debug, Clone, Copy, Default)]
#[cfg_attr(feature = "serde", derive(serde::Serialize))]
pub struct RecoverySummary {
    /// Backups renamed back onto a missing target.
    pub backups_restored: usize,
    /// Backup files deleted.
    pub backups_removed: usize,
    /// Temp files deleted.
    pub temps_removed: usize,
}
289
290pub fn recover_sweep<P: AsRef<Path>>(roots: &[P]) -> Result<RecoverySummary> {
300 use ignore::WalkBuilder;
301
302 let mut iter = if let Some(first) = roots.first() {
303 WalkBuilder::new(first.as_ref())
304 } else {
305 WalkBuilder::new(".")
306 };
307 for extra in roots.iter().skip(1) {
308 iter.add(extra.as_ref());
309 }
310 iter.hidden(false).ignore(false).git_ignore(false).git_global(false).git_exclude(false);
311
312 let mut groups: std::collections::HashMap<PathBuf, RecoveryGroup> =
313 std::collections::HashMap::new();
314 for entry in iter.build() {
315 let entry = entry.map_err(Error::Walk)?;
316 let path = entry.into_path();
317 let name = match path.file_name().and_then(|n| n.to_str()) {
318 Some(s) => s.to_owned(),
319 None => continue,
320 };
321 if let Some((target_name, kind, nonce)) = parse_sibling_name(&name) {
322 let Some(parent) = path.parent() else {
328 trace!(name = %name, "recover: sibling has no parent; skipping");
329 continue;
330 };
331 let target = parent.join(&target_name);
332 let g = groups.entry(target).or_default();
333 match kind {
334 SiblingKind::Backup => g.backups.push((nonce, path.clone())),
335 SiblingKind::Temp => g.temps.push((nonce, path.clone())),
336 }
337 }
338 }
339
340 let mut summary = RecoverySummary::default();
341 let mut first_error: Option<Error> = None;
345 for (target, mut group) in groups {
346 group.backups.sort_by_key(|(n, _)| *n);
347 group.temps.sort_by_key(|(n, _)| *n);
348 if target.exists() {
349 remove_nonced(&group.backups, &mut summary.backups_removed, &mut first_error);
350 remove_nonced(&group.temps, &mut summary.temps_removed, &mut first_error);
351 continue;
352 }
353 if let Some((_, newest)) = group.backups.pop() {
354 match rename_with_exdev_fallback(&newest, &target).io_ctx(&newest) {
355 Ok(()) => summary.backups_restored += 1,
356 Err(e) => {
357 if first_error.is_none() {
358 first_error = Some(e);
359 }
360 continue;
361 }
362 }
363 remove_nonced(&group.backups, &mut summary.backups_removed, &mut first_error);
364 remove_nonced(&group.temps, &mut summary.temps_removed, &mut first_error);
365 }
366 }
367 if let Some(e) = first_error {
368 return Err(e);
369 }
370 Ok(summary)
371}
372
373fn remove_nonced(entries: &[(u64, PathBuf)], counter: &mut usize, first_error: &mut Option<Error>) {
374 for (_, p) in entries {
375 match fs::remove_file(p).io_ctx(p) {
376 Ok(()) => *counter += 1,
377 Err(e) => {
378 if first_error.is_none() {
379 *first_error = Some(e);
380 }
381 }
382 }
383 }
384}
385
/// Leftover sibling files found for a single target during a recovery sweep.
#[derive(Default)]
struct RecoveryGroup {
    /// `(nonce, path)` of every backup sibling.
    backups: Vec<(u64, PathBuf)>,
    /// `(nonce, path)` of every temp sibling.
    temps: Vec<(u64, PathBuf)>,
}
391
/// Which kind of sibling file a generated name refers to.
#[derive(Copy, Clone)]
enum SiblingKind {
    /// Previous contents of a target, kept while a commit is in flight.
    Backup,
    /// New contents of a target, staged before being swapped in.
    Temp,
}
397
398impl SiblingKind {
399 fn as_str(self) -> &'static str {
403 match self {
404 SiblingKind::Backup => "bak",
405 SiblingKind::Temp => "tmp",
406 }
407 }
408
409 fn from_token(token: &str) -> Option<Self> {
410 match token {
411 "bak" => Some(SiblingKind::Backup),
412 "tmp" => Some(SiblingKind::Temp),
413 _ => None,
414 }
415 }
416}
417
418fn parse_sibling_name(name: &str) -> Option<(String, SiblingKind, u64)> {
419 let rest = name.strip_prefix('.')?;
420 let idx_recast = rest.find(".recast.")?;
421 let (target, suffix) = rest.split_at(idx_recast);
422 if target.is_empty() {
423 return None;
424 }
425 let suffix = suffix.strip_prefix(".recast.")?;
426 let dot = suffix.find('.')?;
427 let (kind_str, nonce_str) = suffix.split_at(dot);
428 let kind = SiblingKind::from_token(kind_str)?;
429 let nonce: u64 = nonce_str.strip_prefix('.')?.parse().ok()?;
430 Some((target.to_owned(), kind, nonce))
431}
432
433fn sibling_temp_name(target: &Path, kind: SiblingKind, nonces: &NonceGen) -> String {
434 let name = target.file_name().map(|n| n.to_string_lossy().into_owned()).unwrap_or_default();
435 let nonce = nonces.next();
436 format!(".{name}.recast.{token}.{nonce}", token = kind.as_str())
437}
438
/// Source of distinct numeric suffixes for temp and backup file names.
struct NonceGen {
    /// Base value fixed at construction time.
    seed: u64,
    /// Number of nonces handed out so far; atomic so the generator can be shared by
    /// reference across threads.
    counter: std::sync::atomic::AtomicU64,
}
454
455impl NonceGen {
456 fn new() -> Self {
457 use std::time::{SystemTime, UNIX_EPOCH};
458 let ts =
459 SystemTime::now().duration_since(UNIX_EPOCH).map(|d| d.as_nanos() as u64).unwrap_or(0);
460 let seed = ts.wrapping_mul(0x9E37_79B9_7F4A_7C15);
461 Self { seed, counter: std::sync::atomic::AtomicU64::new(0) }
462 }
463
464 fn next(&self) -> u64 {
465 use std::sync::atomic::Ordering;
466 let n = self.counter.fetch_add(1, Ordering::Relaxed);
467 self.seed.wrapping_add(n)
468 }
469}
470
// Unit tests for this module live in the sibling file named below and are only compiled
// for test builds.
#[cfg(test)]
#[path = "commit_tests.rs"]
mod tests;