Skip to main content

mkit_core/ops/
recovery.rs

1//! Recovery log — durable record of commits superseded by
2//! history-rewriting operations (`commit --amend`, `reset`, `rebase`),
3//! so they (a) remain GC roots within a retention window and (b) can be
4//! surfaced and restored. This is Part 2 of #260, the recovery half of
5//! the gc model (Part 1 added the reachability root set in
6//! [`super::gc`]).
7//!
8//! Why this exists: the per-branch history journal stores only opaque
9//! MMR digests, so a superseded tip is otherwise unrecoverable the moment
10//! it leaves the ref set — and `mkit gc` (#233) would reclaim it. Each
11//! rewrite appends the old tip here; gc treats every logged hash as a
12//! root (clock-free), and [`expire`] drops entries older than the
13//! retention policy so they stop pinning objects.
14//!
15//! On-disk: `.mkit/recovery-log`, append-only, one tab-delimited record
16//! per line — `<unix_ts>\t<op>\t<64-hex superseded>\t<branch>`. The
17//! branch is last because ref names cannot contain tabs or spaces; `op`
18//! is a short internal token. Parsing is **strict** (fail closed) so a
19//! corrupt log makes gc abort rather than under-count roots.
20//!
21//! NOTE: the *producers* (recording at the amend/reset/rebase rewrite
22//! sites) are Part 2b — this module is the format, store, retention
23//! policy, and gc-root integration.
24//!
25//! ## Concurrency
26//!
27//! [`record`] and [`expire`] are **not** internally synchronized.
28//! [`expire`] reads-filters-rewrites the log, so a [`record`] append
29//! that interleaves between its read and its atomic replace would be
30//! clobbered — and the superseded commit would silently drop out of
31//! [`roots`], un-pinning it for gc. Callers MUST therefore hold the repo
32//! lock, which every mutating command and `mkit gc` already do (they
33//! serialize via `worktree.lock`); gc's "expire then collect roots"
34//! sequence must run under that same lock. This module assumes that
35//! invariant rather than taking a second, nested lock.
36
37use std::collections::BTreeSet;
38use std::fmt::Write as _;
39use std::fs::{self, OpenOptions};
40use std::io::{self, Write as _};
41use std::path::Path;
42
43use crate::atomic::{sync_parent_dir, write_atomic};
44use crate::hash::{self, Hash};
45
/// Name of the append-only recovery log file inside `.mkit/`.
pub const RECOVERY_LOG: &str = "recovery-log";

/// Default retention window in seconds: 90 days (90 × 86 400 s). Paired
/// with [`DEFAULT_KEEP_LAST`], which additionally retains the newest
/// entries regardless of age so a recent mistake stays recoverable even
/// on a long-idle repo.
pub const DEFAULT_GRACE_SECS: u64 = 90 * 86_400;
/// Default number of most-recent entries retained no matter how old
/// they are.
pub const DEFAULT_KEEP_LAST: usize = 50;
55
56/// Errors from the recovery log.
57#[derive(Debug, thiserror::Error)]
58pub enum RecoveryError {
59    #[error("io: {0}")]
60    Io(#[from] io::Error),
61    #[error("malformed object id in recovery log: {0}")]
62    BadHash(#[from] hash::FromHexError),
63    #[error("malformed recovery-log line: {0}")]
64    Malformed(String),
65    #[error("recovery-log field {0} may not contain a tab or newline")]
66    InvalidField(&'static str),
67}
68
69type Result<T> = std::result::Result<T, RecoveryError>;
70
71/// One superseded-commit record.
72#[derive(Debug, Clone, PartialEq, Eq)]
73pub struct RecoveryEntry {
74    /// Unix seconds when the rewrite happened.
75    pub timestamp: u64,
76    /// Short operation token, e.g. `amend`, `reset`, `rebase`.
77    pub op: String,
78    /// The commit that was superseded (and would otherwise dangle).
79    pub superseded: Hash,
80    /// Branch the rewrite moved (empty for a detached HEAD).
81    pub branch: String,
82}
83
84/// Retention policy for [`expire`]. An entry is kept if it is newer than
85/// `grace_secs` **or** among the most recent `keep_last`.
86#[derive(Debug, Clone, Copy)]
87pub struct RetentionPolicy {
88    pub grace_secs: u64,
89    pub keep_last: usize,
90}
91
92impl Default for RetentionPolicy {
93    fn default() -> Self {
94        Self {
95            grace_secs: DEFAULT_GRACE_SECS,
96            keep_last: DEFAULT_KEEP_LAST,
97        }
98    }
99}
100
101/// Append `entry` to the recovery log, creating it if absent. The
102/// zero-hash is rejected (nothing to recover).
103///
104/// **Durable**: the appended line is `fsync`'d (and the parent directory
105/// `fsync`'d) before returning, so a crash cannot leave a ref rewrite
106/// durable while its recovery entry is lost — that would reopen the
107/// unrecoverable-superseded-commit gap this log exists to close.
108///
109/// **Not internally synchronized**: callers MUST hold the repo lock (as
110/// every mutating command and `mkit gc` do). See the module-level
111/// concurrency note — a concurrent [`expire`] could otherwise clobber a
112/// racing append.
113///
114/// # Errors
115/// [`RecoveryError::InvalidField`] if `op`/`branch` contain a tab or
116/// newline; [`RecoveryError::Io`] on filesystem failure.
117pub fn record(mkit_dir: &Path, entry: &RecoveryEntry) -> Result<()> {
118    if entry.op.contains(['\t', '\n']) {
119        return Err(RecoveryError::InvalidField("op"));
120    }
121    if entry.branch.contains(['\t', '\n']) {
122        return Err(RecoveryError::InvalidField("branch"));
123    }
124    if entry.superseded == hash::ZERO {
125        return Ok(());
126    }
127    fs::create_dir_all(mkit_dir)?;
128    let line = format!(
129        "{}\t{}\t{}\t{}\n",
130        entry.timestamp,
131        entry.op,
132        hash::to_hex(&entry.superseded),
133        entry.branch,
134    );
135    let path = mkit_dir.join(RECOVERY_LOG);
136    let mut f = OpenOptions::new().create(true).append(true).open(&path)?;
137    f.write_all(line.as_bytes())?;
138    // fsync the data, then the directory entry (cheap — records happen
139    // only on history rewrites, and durability is the whole point).
140    f.sync_all()?;
141    sync_parent_dir(mkit_dir)?;
142    Ok(())
143}
144
145/// Read every recovery-log entry, oldest first. Absent log ⇒ empty.
146///
147/// # Errors
148/// Strict: any unparseable line or bad hash errors (fail closed for gc).
149pub fn read_all(mkit_dir: &Path) -> Result<Vec<RecoveryEntry>> {
150    let raw = match fs::read_to_string(mkit_dir.join(RECOVERY_LOG)) {
151        Ok(s) => s,
152        Err(e) if e.kind() == io::ErrorKind::NotFound => return Ok(Vec::new()),
153        Err(e) => return Err(e.into()),
154    };
155    raw.lines()
156        .filter(|l| !l.is_empty())
157        .map(parse_line)
158        .collect()
159}
160
161/// The set of superseded-commit hashes currently in the log — gc roots,
162/// clock-free. Call [`expire`] first to drop entries past the retention
163/// policy before treating these as live.
164///
165/// # Errors
166/// Propagates [`RecoveryError`] from a strict [`read_all`].
167pub fn roots(mkit_dir: &Path) -> Result<BTreeSet<Hash>> {
168    Ok(read_all(mkit_dir)?
169        .into_iter()
170        .map(|e| e.superseded)
171        .filter(|h| *h != hash::ZERO)
172        .collect())
173}
174
175/// Whether the entry at position `i` of a `total`-length log is retained
176/// under `policy`: kept if fresher than `grace_secs` OR among the last
177/// `keep_last`. Shared by [`expire`] and [`would_expire`] so the preview
178/// can't drift from the real prune.
179fn is_retained(
180    i: usize,
181    e: &RecoveryEntry,
182    total: usize,
183    now: u64,
184    policy: &RetentionPolicy,
185) -> bool {
186    let fresh = now.saturating_sub(e.timestamp) <= policy.grace_secs;
187    let keep_floor = total.saturating_sub(policy.keep_last);
188    fresh || i >= keep_floor
189}
190
191/// Count how many entries [`expire`] would prune, **without** mutating the
192/// log. For `gc --dry-run` previews.
193///
194/// # Errors
195/// [`RecoveryError`] from a strict [`read_all`].
196pub fn would_expire(mkit_dir: &Path, now: u64, policy: &RetentionPolicy) -> Result<usize> {
197    let all = read_all(mkit_dir)?;
198    let total = all.len();
199    Ok(all
200        .iter()
201        .enumerate()
202        .filter(|(i, e)| !is_retained(*i, e, total, now, policy))
203        .count())
204}
205
206/// Drop entries that are both older than `policy.grace_secs` (relative to
207/// `now`, unix seconds) and not among the most recent `policy.keep_last`.
208/// Rewrites the log atomically (durably, via `write_atomic`). Returns
209/// the number of entries pruned.
210///
211/// Must be called under the repo lock — see the module concurrency note;
212/// a concurrent [`record`] append would otherwise be lost in the rewrite.
213///
214/// # Errors
215/// [`RecoveryError`] on a strict read or filesystem failure.
216pub fn expire(mkit_dir: &Path, now: u64, policy: &RetentionPolicy) -> Result<usize> {
217    let all = read_all(mkit_dir)?;
218    let total = all.len();
219    if total == 0 {
220        return Ok(0);
221    }
222    let kept: Vec<RecoveryEntry> = all
223        .into_iter()
224        .enumerate()
225        .filter(|(i, e)| is_retained(*i, e, total, now, policy))
226        .map(|(_, e)| e)
227        .collect();
228    let pruned = total - kept.len();
229    if pruned == 0 {
230        return Ok(0);
231    }
232    let mut buf = String::new();
233    for e in &kept {
234        let _ = writeln!(
235            buf,
236            "{}\t{}\t{}\t{}",
237            e.timestamp,
238            e.op,
239            hash::to_hex(&e.superseded),
240            e.branch
241        );
242    }
243    write_atomic(&mkit_dir.join(RECOVERY_LOG), buf.as_bytes(), true)?;
244    Ok(pruned)
245}
246
247fn parse_line(line: &str) -> Result<RecoveryEntry> {
248    let mut it = line.splitn(4, '\t');
249    let ts = it.next().ok_or_else(|| malformed(line))?;
250    let op = it.next().ok_or_else(|| malformed(line))?;
251    let hex = it.next().ok_or_else(|| malformed(line))?;
252    let branch = it.next().ok_or_else(|| malformed(line))?;
253    let timestamp = ts.parse::<u64>().map_err(|_| malformed(line))?;
254    let superseded = hash::from_hex(hex)?;
255    Ok(RecoveryEntry {
256        timestamp,
257        op: op.to_owned(),
258        superseded,
259        branch: branch.to_owned(),
260    })
261}
262
263fn malformed(line: &str) -> RecoveryError {
264    RecoveryError::Malformed(line.to_owned())
265}
266
267// =====================================================================
268// Tests
269// =====================================================================
270
#[cfg(test)]
mod tests {
    use super::*;
    use tempfile::TempDir;

    /// Create a temp directory containing an (empty) mkit dir. The
    /// `TempDir` guard is returned so the directory outlives the test.
    fn scratch_mkit_dir() -> (TempDir, std::path::PathBuf) {
        let tmp = TempDir::new().unwrap();
        let dir = tmp.path().join(crate::MKIT_DIR);
        fs::create_dir_all(&dir).unwrap();
        (tmp, dir)
    }

    /// A hash made of 32 copies of `byte`.
    fn hash_of(byte: u8) -> Hash {
        [byte; 32]
    }

    /// An `amend` on `main` at `ts`, superseding `hash_of(byte)`.
    fn amend_on_main(ts: u64, byte: u8) -> RecoveryEntry {
        RecoveryEntry {
            timestamp: ts,
            op: String::from("amend"),
            superseded: hash_of(byte),
            branch: String::from("main"),
        }
    }

    /// Record one entry per `(timestamp, hash byte)` pair, in order.
    fn seed(dir: &Path, rows: &[(u64, u8)]) {
        for &(ts, byte) in rows {
            record(dir, &amend_on_main(ts, byte)).unwrap();
        }
    }

    #[test]
    fn record_then_read_roundtrips_in_order() {
        let (_guard, dir) = scratch_mkit_dir();
        seed(&dir, &[(100, 1), (200, 2)]);

        let expected = vec![amend_on_main(100, 1), amend_on_main(200, 2)];
        assert_eq!(read_all(&dir).unwrap(), expected);

        let expected_roots = BTreeSet::from([hash_of(1), hash_of(2)]);
        assert_eq!(roots(&dir).unwrap(), expected_roots);
    }

    #[test]
    fn absent_log_is_empty_not_error() {
        let (_guard, dir) = scratch_mkit_dir();
        assert!(read_all(&dir).unwrap().is_empty());
        assert!(roots(&dir).unwrap().is_empty());
    }

    #[test]
    fn record_rejects_tab_in_fields_and_skips_zero_hash() {
        let (_guard, dir) = scratch_mkit_dir();

        let tabbed = RecoveryEntry {
            branch: String::from("a\tb"),
            ..amend_on_main(1, 1)
        };
        let outcome = record(&dir, &tabbed);
        assert!(matches!(
            outcome,
            Err(RecoveryError::InvalidField("branch"))
        ));

        // Recording the zero hash succeeds but writes nothing: there is
        // nothing to recover.
        record(&dir, &amend_on_main(1, 0)).unwrap();
        assert!(roots(&dir).unwrap().is_empty());
    }

    #[test]
    fn read_fails_closed_on_corrupt_line() {
        let (_guard, dir) = scratch_mkit_dir();
        fs::write(dir.join(RECOVERY_LOG), "not-a-valid-line\n").unwrap();
        assert!(read_all(&dir).is_err(), "corrupt log must fail closed");
    }

    #[test]
    fn expire_drops_old_entries_outside_keep_last() {
        let (_guard, dir) = scratch_mkit_dir();
        // With now=1000 and grace=200, anything with ts>=800 is fresh:
        // ts=850 and ts=900 survive on age alone. keep_last=1 protects
        // only the final entry, so ts=10 — old and first — is pruned.
        seed(&dir, &[(10, 1), (850, 2), (900, 3)]);
        let policy = RetentionPolicy {
            grace_secs: 200,
            keep_last: 1,
        };

        let pruned = expire(&dir, 1000, &policy).unwrap();

        assert_eq!(pruned, 1, "the ts=10 entry is old and not in last-1");
        assert_eq!(
            roots(&dir).unwrap(),
            BTreeSet::from([hash_of(2), hash_of(3)])
        );
    }

    #[test]
    fn would_expire_counts_without_mutating() {
        let (_guard, dir) = scratch_mkit_dir();
        seed(&dir, &[(10, 1), (850, 2), (900, 3)]);
        let policy = RetentionPolicy {
            grace_secs: 200,
            keep_last: 1,
        };

        let snapshot = read_all(&dir).unwrap();
        let count = would_expire(&dir, 1000, &policy).unwrap();
        assert_eq!(count, 1, "ts=10 is the only expirable entry");

        // A preview must leave the on-disk log exactly as it was.
        let after = read_all(&dir).unwrap();
        assert_eq!(after, snapshot, "would_expire must not mutate");
    }

    #[test]
    fn expire_keep_last_retains_old_recent_entries() {
        let (_guard, dir) = scratch_mkit_dir();
        seed(&dir, &[(1, 1), (2, 2)]);
        // Both entries are far past a zero-second grace window, yet
        // keep_last=5 exceeds the log length and protects them all.
        let policy = RetentionPolicy {
            grace_secs: 0,
            keep_last: 5,
        };

        let pruned = expire(&dir, 1_000_000, &policy).unwrap();

        assert_eq!(pruned, 0);
        assert_eq!(
            roots(&dir).unwrap(),
            BTreeSet::from([hash_of(1), hash_of(2)])
        );
    }
}