use std::path::PathBuf;
use objects::fs_atomic::write_file_atomic;
use oplog::OpRecord;
use repo::Repository;
use serde::{Deserialize, Serialize};
/// On-disk transaction sentinel, as read and rewritten by crash recovery.
///
/// Parsed from a `*.toml` file in the transaction sentinel directory. When an
/// active transaction is recovered, `state`, `aborted_reason` and
/// `buffered_ops` are updated and the struct is serialized back to the same
/// path; the values of all other fields are carried through unchanged.
#[derive(Debug, Clone, Serialize, Deserialize)]
struct ReplaySentinel {
/// Transaction identifier; reported in `ReplayReport` and in the oplog abort record.
transaction_id: String,
// The next three fields are never inspected in this file.
// NOTE(review): their meaning is only implied by their names — confirm with the code that writes sentinels.
repo_path: String,
thread: String,
message: String,
/// Lifecycle state; compared against `STATE_ACTIVE` and set to `STATE_ABORTED` on recovery.
state: String,
// Not inspected in this file. NOTE(review): presumably a Unix timestamp in seconds — confirm.
started_at_secs: i64,
// Not inspected in this file.
started_by_email: String,
// Not inspected in this file.
base_state: String,
/// Cleared on recovery; defaults to empty when the key is absent from the file.
#[serde(default)]
buffered_ops: Vec<String>,
/// Set to `REPLAY_RECOVERY_REASON` on recovery; defaults to `None` when the key is absent.
#[serde(default)]
aborted_reason: Option<String>,
}
/// `state` value that marks a sentinel for recovery; sentinels in any other state are left alone.
const STATE_ACTIVE: &str = "active";
/// `state` value written to a sentinel once recovery has aborted its transaction.
const STATE_ABORTED: &str = "aborted";
/// Reason stored in a recovered sentinel's `aborted_reason` and in the matching
/// `TransactionAbort` oplog record.
pub const REPLAY_RECOVERY_REASON: &str = "recovered from crash on startup";
/// Outcome of one recovery pass over the transaction sentinel directory.
///
/// Every field starts out empty / zero / `None`; the recovery pass fills in
/// whatever it did and whatever went wrong.
#[derive(Debug, Default, Clone)]
pub struct ReplayReport {
    /// IDs of transactions whose sentinel was rewritten as aborted. An ID is
    /// listed here even when the follow-up oplog append failed (it then also
    /// appears in `failed_oplog_appends`).
    pub recovered_transaction_ids: Vec<String>,
    /// Number of orphan temp files that were deleted.
    pub orphan_temp_files_removed: usize,
    /// Orphan temp files that could not be deleted.
    pub failed_orphan_deletes: Vec<PathBuf>,
    /// Sentinel files that could not be read, decoded as UTF-8, or parsed as TOML.
    pub unparseable_sentinels: Vec<PathBuf>,
    /// Sentinels whose aborted form could not be serialized or written back.
    pub failed_sentinel_writes: Vec<PathBuf>,
    /// IDs of transactions whose abort record could not be appended to the oplog.
    pub failed_oplog_appends: Vec<String>,
    /// Error text if the sentinel directory itself could not be listed.
    pub scan_error: Option<String>,
    /// Number of directory entries that failed to load while iterating.
    pub unreadable_entries: usize,
}

impl ReplayReport {
    /// `true` when the pass neither did any work nor hit any problem, i.e. the
    /// report is indistinguishable from a freshly defaulted one.
    pub fn is_clean(&self) -> bool {
        let did_no_work =
            self.recovered_transaction_ids.is_empty() && self.orphan_temp_files_removed == 0;
        did_no_work && !self.has_hard_failures() && !self.has_recoverable_failures()
    }

    /// `true` when the directory scan failed or at least one oplog append failed.
    pub fn has_hard_failures(&self) -> bool {
        let scan_ok = self.scan_error.is_none();
        let all_appended = self.failed_oplog_appends.is_empty();
        !(scan_ok && all_appended)
    }

    /// `true` when a sentinel write, an orphan delete, a sentinel parse, or a
    /// directory-entry read failed.
    pub fn has_recoverable_failures(&self) -> bool {
        let path_lists = [
            &self.failed_sentinel_writes,
            &self.failed_orphan_deletes,
            &self.unparseable_sentinels,
        ];
        self.unreadable_entries != 0 || path_lists.iter().any(|paths| !paths.is_empty())
    }
}
/// Tells whether `name` is treated as a leftover temp file: it must begin
/// with a dot and contain `.tmp-` somewhere (for example
/// `.tx-a.toml.tmp-100-200-1`).
fn is_orphan_temp_name(name: &str) -> bool {
    match name.chars().next() {
        // The `.tmp-` search runs over the whole name, leading dot included.
        Some('.') => name.contains(".tmp-"),
        _ => false,
    }
}
pub fn replay_active_transactions(repo: &Repository) -> ReplayReport {
let mut report = ReplayReport::default();
let dir = repo.heddle_dir().join("state").join("transactions");
let entries = match std::fs::read_dir(&dir) {
Ok(e) => e,
Err(err) if err.kind() == std::io::ErrorKind::NotFound => return report,
Err(err) => {
tracing::warn!(error = %err, dir = %dir.display(),
"transaction-replay: failed to read sentinel directory");
report.scan_error = Some(err.to_string());
return report;
}
};
for entry_result in entries {
let entry = match entry_result {
Ok(e) => e,
Err(err) => {
tracing::warn!(error = %err, dir = %dir.display(),
"transaction-replay: failed to read directory entry");
report.unreadable_entries += 1;
continue;
}
};
let path = entry.path();
let name = match path.file_name().and_then(|n| n.to_str()) {
Some(n) => n,
None => continue,
};
if is_orphan_temp_name(name) {
match std::fs::remove_file(&path) {
Ok(()) => report.orphan_temp_files_removed += 1,
Err(err) => {
tracing::warn!(error = %err, path = %path.display(),
"transaction-replay: failed to remove orphan temp file");
report.failed_orphan_deletes.push(path.clone());
}
}
continue;
}
if path.extension().and_then(|e| e.to_str()) != Some("toml") {
continue;
}
let bytes = match std::fs::read(&path) {
Ok(b) => b,
Err(_) => {
report.unparseable_sentinels.push(path.clone());
continue;
}
};
let text = match std::str::from_utf8(&bytes) {
Ok(t) => t,
Err(_) => {
report.unparseable_sentinels.push(path.clone());
continue;
}
};
let mut sentinel: ReplaySentinel = match toml::from_str(text) {
Ok(s) => s,
Err(_) => {
report.unparseable_sentinels.push(path.clone());
continue;
}
};
if sentinel.state != STATE_ACTIVE {
continue;
}
let txn_id = sentinel.transaction_id.clone();
sentinel.state = STATE_ABORTED.to_string();
sentinel.aborted_reason = Some(REPLAY_RECOVERY_REASON.to_string());
sentinel.buffered_ops.clear();
let serialized = match toml::to_string_pretty(&sentinel) {
Ok(s) => s,
Err(err) => {
tracing::warn!(error = %err, txn = %txn_id,
"transaction-replay: failed to serialize recovered sentinel");
report.failed_sentinel_writes.push(path.clone());
continue;
}
};
if let Err(err) = write_file_atomic(&path, serialized.as_bytes()) {
tracing::warn!(error = %err, txn = %txn_id,
"transaction-replay: failed to persist recovered sentinel");
report.failed_sentinel_writes.push(path.clone());
continue;
}
if let Err(err) = repo.oplog().record_batch(vec![OpRecord::TransactionAbort {
transaction_id: txn_id.clone(),
reason: REPLAY_RECOVERY_REASON.to_string(),
}]) {
tracing::warn!(error = %err, txn = %txn_id,
"transaction-replay: failed to record TransactionAbort");
report.failed_oplog_appends.push(txn_id.clone());
}
report.recovered_transaction_ids.push(txn_id);
}
report
}
/// Tests for crash recovery of transaction sentinels.
///
/// Every test is serialized under the `process_global` key and works in its
/// own temporary repository.
#[cfg(test)]
mod tests {
    use std::{fs, path::Path};

    use oplog::OpLogBackend;
    use proptest::prelude::*;
    use repo::Repository;
    use tempfile::TempDir;

    use super::*;

    /// Writes `<dir>/<id>.toml` by hand (bypassing the production writer) with
    /// the given `state` and buffered ops; all other fields are blank / zero.
    fn write_sentinel_raw(dir: &Path, id: &str, state: &str, buffered_ops: &[&str]) {
        fs::create_dir_all(dir).unwrap();
        // An empty slice joins to "", which renders below as the empty TOML array `[]`.
        let buffered = buffered_ops
            .iter()
            .map(|op| format!("\"{op}\""))
            .collect::<Vec<_>>()
            .join(", ");
        let body = format!(
            r#"transaction_id = "{id}"
repo_path = ""
thread = ""
message = ""
state = "{state}"
started_at_secs = 0
started_by_email = ""
base_state = ""
buffered_ops = [{buffered}]
"#
        );
        fs::write(dir.join(format!("{id}.toml")), body).unwrap();
    }

    /// Creates a default-initialized repository inside a fresh temp dir. The
    /// `TempDir` is returned so the caller keeps it alive for the test.
    fn fresh_repo() -> (TempDir, Repository) {
        let temp = TempDir::new().unwrap();
        let repo = Repository::init_default(temp.path()).unwrap();
        (temp, repo)
    }

    /// Path of the sentinel directory that the code under test scans.
    fn sentinel_dir(repo: &Repository) -> PathBuf {
        repo.heddle_dir().join("state").join("transactions")
    }

    /// Parses `<dir>/<id>.toml` back into a `ReplaySentinel`, panicking on any failure.
    fn load_written_sentinel(dir: &Path, id: &str) -> ReplaySentinel {
        let body = fs::read_to_string(dir.join(format!("{id}.toml"))).unwrap();
        toml::from_str(&body).unwrap()
    }

    /// Current `state` of the sentinel `<dir>/<id>.toml`.
    fn read_state(dir: &Path, id: &str) -> String {
        load_written_sentinel(dir, id).state
    }

    /// Current `aborted_reason` of the sentinel `<dir>/<id>.toml`.
    fn read_reason(dir: &Path, id: &str) -> Option<String> {
        load_written_sentinel(dir, id).aborted_reason
    }

    /// Counts entries in `dir` whose name matches `is_orphan_temp_name`;
    /// an unreadable or missing directory counts as zero.
    fn count_orphan_tmps(dir: &Path) -> usize {
        let entries = match fs::read_dir(dir) {
            Ok(entries) => entries,
            Err(_) => return 0,
        };
        entries
            .flatten()
            .filter(|e| {
                e.path()
                    .file_name()
                    .and_then(|n| n.to_str())
                    .map(is_orphan_temp_name)
                    .unwrap_or(false)
            })
            .count()
    }

    /// `true` when the tests run as uid 0, where directory permission bits do
    /// not block writes and the read-only-dir tests cannot work.
    #[cfg(unix)]
    fn running_as_root() -> bool {
        // SAFETY: `getuid` takes no arguments, has no preconditions and always succeeds.
        unsafe { libc::getuid() == 0 }
    }

    /// Makes a directory read-only (mode `0o555`) and restores the previous
    /// mode when dropped, so a panic in the code under test cannot leave the
    /// temp directory un-removable.
    #[cfg(unix)]
    struct ReadOnlyGuard {
        dir: PathBuf,
        original_mode: u32,
    }

    #[cfg(unix)]
    impl ReadOnlyGuard {
        fn new(dir: &Path) -> Self {
            use std::os::unix::fs::PermissionsExt;
            let mut perms = fs::metadata(dir).unwrap().permissions();
            let original_mode = perms.mode();
            perms.set_mode(0o555);
            fs::set_permissions(dir, perms).unwrap();
            Self {
                dir: dir.to_path_buf(),
                original_mode,
            }
        }
    }

    #[cfg(unix)]
    impl Drop for ReadOnlyGuard {
        fn drop(&mut self) {
            use std::os::unix::fs::PermissionsExt;
            // Best effort only: a panic inside `drop` while already unwinding
            // would abort the whole test binary.
            let _ = fs::set_permissions(&self.dir, fs::Permissions::from_mode(self.original_mode));
        }
    }

    /// A freshly initialized repository yields a clean report.
    #[test]
    #[serial_test::serial(process_global)]
    fn empty_directory_is_a_no_op() {
        let (_t, repo) = fresh_repo();
        let report = replay_active_transactions(&repo);
        assert!(report.is_clean());
    }

    /// An active sentinel is rewritten as aborted and an abort record lands at the oplog tail.
    #[test]
    #[serial_test::serial(process_global)]
    fn aborts_active_sentinel_from_prior_run() {
        let (_t, repo) = fresh_repo();
        let dir = sentinel_dir(&repo);
        write_sentinel_raw(&dir, "tx-stuck", "active", &["capture", "merge"]);
        let report = replay_active_transactions(&repo);
        assert_eq!(
            report.recovered_transaction_ids,
            vec!["tx-stuck".to_string()]
        );
        assert_eq!(report.orphan_temp_files_removed, 0);
        assert!(report.unparseable_sentinels.is_empty());
        assert_eq!(read_state(&dir, "tx-stuck"), STATE_ABORTED);
        assert_eq!(
            read_reason(&dir, "tx-stuck").as_deref(),
            Some(REPLAY_RECOVERY_REASON)
        );
        let tail = repo.oplog().recent(64).unwrap();
        let last = tail.last().expect("oplog non-empty");
        match &last.operation {
            OpRecord::TransactionAbort {
                transaction_id,
                reason,
            } => {
                assert_eq!(transaction_id, "tx-stuck");
                assert_eq!(reason, REPLAY_RECOVERY_REASON);
            }
            other => panic!("expected TransactionAbort, got {other:?}"),
        }
    }

    /// Sentinels in a non-active state are neither rewritten nor logged to the oplog.
    #[test]
    #[serial_test::serial(process_global)]
    fn leaves_terminal_sentinels_alone() {
        let (_t, repo) = fresh_repo();
        let dir = sentinel_dir(&repo);
        write_sentinel_raw(&dir, "tx-committed", "committed", &[]);
        write_sentinel_raw(&dir, "tx-aborted", "aborted", &[]);
        let before_oplog = repo.oplog().recent(64).unwrap().len();
        let report = replay_active_transactions(&repo);
        assert!(report.is_clean());
        assert_eq!(read_state(&dir, "tx-committed"), "committed");
        assert_eq!(read_state(&dir, "tx-aborted"), "aborted");
        assert_eq!(repo.oplog().recent(64).unwrap().len(), before_oplog);
    }

    /// Dot-prefixed `.tmp-` files are deleted and counted.
    #[test]
    #[serial_test::serial(process_global)]
    fn removes_orphan_temp_files() {
        let (_t, repo) = fresh_repo();
        let dir = sentinel_dir(&repo);
        fs::create_dir_all(&dir).unwrap();
        fs::write(dir.join(".tx-a.toml.tmp-100-200-1"), b"partial").unwrap();
        fs::write(dir.join(".tx-b.toml.tmp-100-200-2"), b"partial").unwrap();
        fs::write(dir.join(".tx-c.toml.tmp-999-1234-5"), b"").unwrap();
        let report = replay_active_transactions(&repo);
        assert_eq!(report.orphan_temp_files_removed, 3);
        assert_eq!(count_orphan_tmps(&dir), 0);
    }

    /// A second pass over an already recovered directory finds nothing to do.
    #[test]
    #[serial_test::serial(process_global)]
    fn is_idempotent() {
        let (_t, repo) = fresh_repo();
        let dir = sentinel_dir(&repo);
        write_sentinel_raw(&dir, "tx-1", "active", &["capture"]);
        fs::write(dir.join(".tx-1.toml.tmp-1-1-1"), b"x").unwrap();
        let first = replay_active_transactions(&repo);
        assert_eq!(first.recovered_transaction_ids.len(), 1);
        assert_eq!(first.orphan_temp_files_removed, 1);
        let second = replay_active_transactions(&repo);
        assert!(
            second.is_clean(),
            "second pass should be a no-op (was {second:?})"
        );
    }

    /// A regular file sitting where the sentinel directory should be is reported as a scan error.
    #[test]
    #[serial_test::serial(process_global)]
    fn scan_error_set_when_sentinel_dir_is_not_a_directory() {
        let (_t, repo) = fresh_repo();
        let dir = sentinel_dir(&repo);
        fs::create_dir_all(dir.parent().unwrap()).unwrap();
        fs::write(&dir, b"not a directory").unwrap();
        let report = replay_active_transactions(&repo);
        assert!(
            report.scan_error.is_some(),
            "expected scan_error to be set, got {report:?}"
        );
        assert!(!report.is_clean());
        assert!(report.recovered_transaction_ids.is_empty());
    }

    /// With the directory read-only the aborted sentinel cannot be written;
    /// the failure is reported and the sentinel stays active.
    #[cfg(unix)]
    #[test]
    #[serial_test::serial(process_global)]
    fn failed_sentinel_write_surfaced_on_readonly_dir() {
        if running_as_root() {
            eprintln!(
                "skipping failed_sentinel_write_surfaced_on_readonly_dir: \
                 running as root, DAC checks bypassed"
            );
            return;
        }
        let (_t, repo) = fresh_repo();
        let dir = sentinel_dir(&repo);
        write_sentinel_raw(&dir, "tx-ro", "active", &[]);
        let path = dir.join("tx-ro.toml");
        let readonly = ReadOnlyGuard::new(&dir);
        let report = replay_active_transactions(&repo);
        // Restore permissions before asserting so a failed assertion cannot
        // leave the directory read-only either.
        drop(readonly);
        assert_eq!(report.failed_sentinel_writes, vec![path]);
        assert!(report.recovered_transaction_ids.is_empty());
        assert!(!report.is_clean());
        assert_eq!(read_state(&dir, "tx-ro"), STATE_ACTIVE);
    }

    /// With the directory read-only the orphan cannot be deleted; this is a
    /// recoverable failure, not a hard one, and the file stays on disk.
    #[cfg(unix)]
    #[test]
    #[serial_test::serial(process_global)]
    fn failed_orphan_delete_surfaced_on_readonly_dir() {
        if running_as_root() {
            eprintln!(
                "skipping failed_orphan_delete_surfaced_on_readonly_dir: \
                 running as root, DAC checks bypassed"
            );
            return;
        }
        let (_t, repo) = fresh_repo();
        let dir = sentinel_dir(&repo);
        fs::create_dir_all(&dir).unwrap();
        let orphan = dir.join(".tx-stuck.toml.tmp-1-2-3");
        fs::write(&orphan, b"partial").unwrap();
        let readonly = ReadOnlyGuard::new(&dir);
        let report = replay_active_transactions(&repo);
        drop(readonly);
        assert_eq!(report.failed_orphan_deletes, vec![orphan.clone()]);
        assert_eq!(report.orphan_temp_files_removed, 0);
        assert!(!report.is_clean());
        assert!(report.has_recoverable_failures());
        assert!(!report.has_hard_failures());
        assert!(orphan.exists());
    }

    /// A `.toml` file that does not parse is reported but never deleted.
    #[test]
    #[serial_test::serial(process_global)]
    fn leaves_unparseable_sentinels_in_place() {
        let (_t, repo) = fresh_repo();
        let dir = sentinel_dir(&repo);
        fs::create_dir_all(&dir).unwrap();
        let bad = dir.join("tx-garbage.toml");
        fs::write(&bad, b"not = valid toml = oops").unwrap();
        let report = replay_active_transactions(&repo);
        assert!(report.recovered_transaction_ids.is_empty());
        assert_eq!(report.unparseable_sentinels, vec![bad.clone()]);
        assert!(bad.exists());
    }

    /// Simulated crash point of the previous process; it decides which stray
    /// files sit next to the active sentinel.
    #[derive(Debug, Clone, Copy)]
    enum CrashKind {
        DuringTmpWrite,
        BeforeRename,
        AfterRename,
        NoCrash,
    }

    /// Picks any of the four crash kinds.
    fn arb_crash_kind() -> impl Strategy<Value = CrashKind> {
        prop_oneof![
            Just(CrashKind::DuringTmpWrite),
            Just(CrashKind::BeforeRename),
            Just(CrashKind::AfterRename),
            Just(CrashKind::NoCrash),
        ]
    }

    /// Zero to five buffered op names of one to eight lowercase letters each.
    fn arb_ops() -> impl Strategy<Value = Vec<String>> {
        prop::collection::vec("[a-z]{1,8}", 0..6)
    }

    proptest! {
        #![proptest_config(ProptestConfig {
            cases: 64,
            ..ProptestConfig::default()
        })]
        /// Whatever the crash point, one replay leaves an aborted sentinel, no
        /// temp files and an abort record at the oplog tail; a second replay is clean.
        #[test]
        #[serial_test::serial(process_global)]
        fn crash_matrix_replay_reaches_consistent_terminal_state(
            crash in arb_crash_kind(),
            ops in arb_ops(),
        ) {
            let temp = TempDir::new().unwrap();
            let repo = Repository::init_default(temp.path()).unwrap();
            let dir = sentinel_dir(&repo);
            fs::create_dir_all(&dir).unwrap();
            let txn_id = "tx-crashed";
            let op_refs: Vec<&str> = ops.iter().map(|s| s.as_str()).collect();
            // Every scenario starts from the same active sentinel; the crash
            // kind only decides which temp file (if any) is left beside it.
            write_sentinel_raw(&dir, txn_id, "active", &op_refs);
            match crash {
                CrashKind::DuringTmpWrite => {
                    fs::write(
                        dir.join(format!(".{txn_id}.toml.tmp-1-2-3")),
                        b"partial bytes\n",
                    )
                    .unwrap();
                }
                CrashKind::BeforeRename => {
                    let pretend_new = r#"transaction_id = "tx-crashed"
repo_path = ""
thread = ""
message = ""
state = "committed"
started_at_secs = 0
started_by_email = ""
base_state = ""
buffered_ops = []
"#;
                    fs::write(
                        dir.join(format!(".{txn_id}.toml.tmp-4-5-6")),
                        pretend_new,
                    )
                    .unwrap();
                }
                // Nothing is left behind besides the sentinel itself.
                CrashKind::AfterRename | CrashKind::NoCrash => {}
            }
            let report = replay_active_transactions(&repo);
            prop_assert_eq!(
                report.recovered_transaction_ids.clone(),
                vec![txn_id.to_string()]
            );
            prop_assert_eq!(count_orphan_tmps(&dir), 0);
            prop_assert_eq!(read_state(&dir, txn_id), STATE_ABORTED);
            let recovered_reason = read_reason(&dir, txn_id);
            prop_assert_eq!(
                recovered_reason.as_deref(),
                Some(REPLAY_RECOVERY_REASON)
            );
            let tail = repo.oplog().recent(64).unwrap();
            let last = tail.last().expect("oplog non-empty after recovery");
            match &last.operation {
                OpRecord::TransactionAbort {
                    transaction_id,
                    reason,
                } => {
                    prop_assert_eq!(transaction_id, txn_id);
                    prop_assert_eq!(reason, REPLAY_RECOVERY_REASON);
                }
                _ => prop_assert!(false, "expected TransactionAbort at oplog tail"),
            }
            let again = replay_active_transactions(&repo);
            prop_assert!(again.is_clean(), "second pass not clean: {:?}", again);
        }
    }
}