1use std::collections::BTreeSet;
38use std::fmt::Write as _;
39use std::fs::{self, OpenOptions};
40use std::io::{self, Write as _};
41use std::path::Path;
42
43use crate::atomic::{sync_parent_dir, write_atomic};
44use crate::hash::{self, Hash};
45
/// File name of the recovery log, joined onto the `mkit_dir` passed to this module's functions.
pub const RECOVERY_LOG: &str = "recovery-log";

/// Default grace period in seconds (90 days): entries at most this old are always retained.
pub const DEFAULT_GRACE_SECS: u64 = 60 * 60 * 24 * 90;

/// Default count of most recently appended entries that are retained regardless of age.
pub const DEFAULT_KEEP_LAST: usize = 50;
55
56#[derive(Debug, thiserror::Error)]
58pub enum RecoveryError {
59 #[error("io: {0}")]
60 Io(#[from] io::Error),
61 #[error("malformed object id in recovery log: {0}")]
62 BadHash(#[from] hash::FromHexError),
63 #[error("malformed recovery-log line: {0}")]
64 Malformed(String),
65 #[error("recovery-log field {0} may not contain a tab or newline")]
66 InvalidField(&'static str),
67}
68
69type Result<T> = std::result::Result<T, RecoveryError>;
70
71#[derive(Debug, Clone, PartialEq, Eq)]
73pub struct RecoveryEntry {
74 pub timestamp: u64,
76 pub op: String,
78 pub superseded: Hash,
80 pub branch: String,
82}
83
84#[derive(Debug, Clone, Copy)]
87pub struct RetentionPolicy {
88 pub grace_secs: u64,
89 pub keep_last: usize,
90}
91
92impl Default for RetentionPolicy {
93 fn default() -> Self {
94 Self {
95 grace_secs: DEFAULT_GRACE_SECS,
96 keep_last: DEFAULT_KEEP_LAST,
97 }
98 }
99}
100
101pub fn record(mkit_dir: &Path, entry: &RecoveryEntry) -> Result<()> {
118 if entry.op.contains(['\t', '\n']) {
119 return Err(RecoveryError::InvalidField("op"));
120 }
121 if entry.branch.contains(['\t', '\n']) {
122 return Err(RecoveryError::InvalidField("branch"));
123 }
124 if entry.superseded == hash::ZERO {
125 return Ok(());
126 }
127 fs::create_dir_all(mkit_dir)?;
128 let line = format!(
129 "{}\t{}\t{}\t{}\n",
130 entry.timestamp,
131 entry.op,
132 hash::to_hex(&entry.superseded),
133 entry.branch,
134 );
135 let path = mkit_dir.join(RECOVERY_LOG);
136 let mut f = OpenOptions::new().create(true).append(true).open(&path)?;
137 f.write_all(line.as_bytes())?;
138 f.sync_all()?;
141 sync_parent_dir(mkit_dir)?;
142 Ok(())
143}
144
145pub fn read_all(mkit_dir: &Path) -> Result<Vec<RecoveryEntry>> {
150 let raw = match fs::read_to_string(mkit_dir.join(RECOVERY_LOG)) {
151 Ok(s) => s,
152 Err(e) if e.kind() == io::ErrorKind::NotFound => return Ok(Vec::new()),
153 Err(e) => return Err(e.into()),
154 };
155 raw.lines()
156 .filter(|l| !l.is_empty())
157 .map(parse_line)
158 .collect()
159}
160
161pub fn roots(mkit_dir: &Path) -> Result<BTreeSet<Hash>> {
168 Ok(read_all(mkit_dir)?
169 .into_iter()
170 .map(|e| e.superseded)
171 .filter(|h| *h != hash::ZERO)
172 .collect())
173}
174
175fn is_retained(
180 i: usize,
181 e: &RecoveryEntry,
182 total: usize,
183 now: u64,
184 policy: &RetentionPolicy,
185) -> bool {
186 let fresh = now.saturating_sub(e.timestamp) <= policy.grace_secs;
187 let keep_floor = total.saturating_sub(policy.keep_last);
188 fresh || i >= keep_floor
189}
190
191pub fn would_expire(mkit_dir: &Path, now: u64, policy: &RetentionPolicy) -> Result<usize> {
197 let all = read_all(mkit_dir)?;
198 let total = all.len();
199 Ok(all
200 .iter()
201 .enumerate()
202 .filter(|(i, e)| !is_retained(*i, e, total, now, policy))
203 .count())
204}
205
206pub fn expire(mkit_dir: &Path, now: u64, policy: &RetentionPolicy) -> Result<usize> {
217 let all = read_all(mkit_dir)?;
218 let total = all.len();
219 if total == 0 {
220 return Ok(0);
221 }
222 let kept: Vec<RecoveryEntry> = all
223 .into_iter()
224 .enumerate()
225 .filter(|(i, e)| is_retained(*i, e, total, now, policy))
226 .map(|(_, e)| e)
227 .collect();
228 let pruned = total - kept.len();
229 if pruned == 0 {
230 return Ok(0);
231 }
232 let mut buf = String::new();
233 for e in &kept {
234 let _ = writeln!(
235 buf,
236 "{}\t{}\t{}\t{}",
237 e.timestamp,
238 e.op,
239 hash::to_hex(&e.superseded),
240 e.branch
241 );
242 }
243 write_atomic(&mkit_dir.join(RECOVERY_LOG), buf.as_bytes(), true)?;
244 Ok(pruned)
245}
246
247fn parse_line(line: &str) -> Result<RecoveryEntry> {
248 let mut it = line.splitn(4, '\t');
249 let ts = it.next().ok_or_else(|| malformed(line))?;
250 let op = it.next().ok_or_else(|| malformed(line))?;
251 let hex = it.next().ok_or_else(|| malformed(line))?;
252 let branch = it.next().ok_or_else(|| malformed(line))?;
253 let timestamp = ts.parse::<u64>().map_err(|_| malformed(line))?;
254 let superseded = hash::from_hex(hex)?;
255 Ok(RecoveryEntry {
256 timestamp,
257 op: op.to_owned(),
258 superseded,
259 branch: branch.to_owned(),
260 })
261}
262
263fn malformed(line: &str) -> RecoveryError {
264 RecoveryError::Malformed(line.to_owned())
265}
266
#[cfg(test)]
mod tests {
    use super::*;
    use tempfile::TempDir;

    /// Creates a temporary root containing an empty metadata directory.
    ///
    /// The returned `TempDir` guard must stay alive for the duration of the test.
    fn md() -> (TempDir, std::path::PathBuf) {
        let d = TempDir::new().unwrap();
        let md = d.path().join(crate::MKIT_DIR);
        fs::create_dir_all(&md).unwrap();
        (d, md)
    }

    /// A deterministic object id with every byte set to `byte`.
    fn h(byte: u8) -> Hash {
        [byte; 32]
    }

    /// An `amend` entry on `main` at time `ts` superseding `h(byte)`.
    fn entry(ts: u64, byte: u8) -> RecoveryEntry {
        RecoveryEntry {
            timestamp: ts,
            op: "amend".into(),
            superseded: h(byte),
            branch: "main".into(),
        }
    }

    #[test]
    fn record_then_read_roundtrips_in_order() {
        let (_d, md) = md();
        record(&md, &entry(100, 1)).unwrap();
        record(&md, &entry(200, 2)).unwrap();
        let all = read_all(&md).unwrap();
        assert_eq!(all, vec![entry(100, 1), entry(200, 2)]);
        assert_eq!(roots(&md).unwrap(), BTreeSet::from([h(1), h(2)]));
    }

    #[test]
    fn absent_log_is_empty_not_error() {
        let (_d, md) = md();
        assert!(read_all(&md).unwrap().is_empty());
        assert!(roots(&md).unwrap().is_empty());
    }

    #[test]
    fn record_rejects_tab_in_fields_and_skips_zero_hash() {
        let (_d, md) = md();
        let mut e = entry(1, 1);
        e.branch = "a\tb".into();
        assert!(matches!(
            record(&md, &e),
            Err(RecoveryError::InvalidField("branch"))
        ));
        record(&md, &entry(1, 0)).unwrap();
        assert!(roots(&md).unwrap().is_empty());
    }

    /// The `op` field is validated too, and a rejected entry leaves no trace on disk.
    #[test]
    fn record_rejects_newline_in_op_without_writing() {
        let (_d, md) = md();
        let mut e = entry(1, 1);
        e.op = "amend\nrebase".into();
        assert!(matches!(
            record(&md, &e),
            Err(RecoveryError::InvalidField("op"))
        ));
        assert!(read_all(&md).unwrap().is_empty());
    }

    #[test]
    fn read_fails_closed_on_corrupt_line() {
        let (_d, md) = md();
        fs::write(md.join(RECOVERY_LOG), "not-a-valid-line\n").unwrap();
        assert!(read_all(&md).is_err(), "corrupt log must fail closed");
    }

    #[test]
    fn expire_drops_old_entries_outside_keep_last() {
        let (_d, md) = md();
        record(&md, &entry(10, 1)).unwrap();
        record(&md, &entry(850, 2)).unwrap();
        record(&md, &entry(900, 3)).unwrap();
        let pruned = expire(
            &md,
            1000,
            &RetentionPolicy {
                grace_secs: 200,
                keep_last: 1,
            },
        )
        .unwrap();
        assert_eq!(pruned, 1, "the ts=10 entry is old and not in last-1");
        assert_eq!(roots(&md).unwrap(), BTreeSet::from([h(2), h(3)]));
    }

    #[test]
    fn would_expire_counts_without_mutating() {
        let (_d, md) = md();
        record(&md, &entry(10, 1)).unwrap();
        record(&md, &entry(850, 2)).unwrap();
        record(&md, &entry(900, 3)).unwrap();
        let policy = RetentionPolicy {
            grace_secs: 200,
            keep_last: 1,
        };
        let before = read_all(&md).unwrap();
        let n = would_expire(&md, 1000, &policy).unwrap();
        assert_eq!(n, 1, "ts=10 is the only expirable entry");
        assert_eq!(
            read_all(&md).unwrap(),
            before,
            "would_expire must not mutate"
        );
    }

    #[test]
    fn expire_keep_last_retains_old_recent_entries() {
        let (_d, md) = md();
        record(&md, &entry(1, 1)).unwrap();
        record(&md, &entry(2, 2)).unwrap();
        let pruned = expire(
            &md,
            1_000_000,
            &RetentionPolicy {
                grace_secs: 0,
                keep_last: 5,
            },
        )
        .unwrap();
        assert_eq!(pruned, 0);
        assert_eq!(roots(&md).unwrap(), BTreeSet::from([h(1), h(2)]));
    }

    /// With no log on disk, both the dry run and the real prune are no-ops and create nothing.
    #[test]
    fn expire_on_absent_log_is_noop() {
        let (_d, md) = md();
        let policy = RetentionPolicy::default();
        assert_eq!(would_expire(&md, 1_000, &policy).unwrap(), 0);
        assert_eq!(expire(&md, 1_000, &policy).unwrap(), 0);
        assert!(
            !md.join(RECOVERY_LOG).exists(),
            "a no-op expire must not create the log"
        );
    }

    /// A policy that retains nothing empties the log, which then still reads back cleanly.
    #[test]
    fn expire_can_prune_every_entry() {
        let (_d, md) = md();
        record(&md, &entry(10, 1)).unwrap();
        record(&md, &entry(20, 2)).unwrap();
        let policy = RetentionPolicy {
            grace_secs: 0,
            keep_last: 0,
        };
        assert_eq!(expire(&md, 1_000, &policy).unwrap(), 2);
        assert!(read_all(&md).unwrap().is_empty());
        assert!(roots(&md).unwrap().is_empty());
    }
}