use std::fs;
use std::io::Write;
use std::path::{Path, PathBuf};
use std::time::{SystemTime, UNIX_EPOCH};
use anyhow::{Context, Result};
use blake3::Hash;
use serde::{Deserialize, Serialize};
/// File-name suffix carried by every journal sidecar built by [`journal_path`];
/// [`recover_orphan_journals`] also matches on it to recognise journal files in a directory.
const JOURNAL_EXT: &str = ".atomwrite.journal.json";
/// One record of a journal sidecar file; each record is written as a single JSON line.
///
/// The variant name is stored in a `"phase"` field in snake_case (`"started"`,
/// `"committed"`, `"aborted"`), with the variant's own fields alongside it in the same
/// JSON object.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
#[serde(tag = "phase", rename_all = "snake_case")]
pub enum JournalEntry {
    /// Written by [`journal_started`] when an operation on `target` begins.
    Started {
        /// Identifier returned to the caller of [`journal_started`]; callers pass it
        /// back when recording the matching `Committed`/`Aborted` entry.
        op_id: String,
        /// Kind of operation being performed.
        op: JournalOp,
        /// `Path::display` rendering of the target file.
        target: String,
        /// Hex-encoded BLAKE3 hash the caller supplied as the pre-operation checksum, if any.
        checksum_before: Option<String>,
        /// Hex-encoded BLAKE3 hash the caller supplied as the expected post-operation checksum.
        checksum_after: String,
        /// Process id of the writer.
        pid: u32,
        /// Seconds since the Unix epoch (0 if the clock reads earlier than the epoch).
        started_at_unix: u64,
    },
    /// Written by [`journal_committed`]; closes the most recent `Started` entry.
    Committed {
        op_id: String,
        /// Seconds since the Unix epoch (0 if the clock reads earlier than the epoch).
        committed_at_unix: u64,
    },
    /// Written by [`journal_aborted`]; closes the most recent `Started` entry.
    Aborted {
        op_id: String,
        /// Seconds since the Unix epoch (0 if the clock reads earlier than the epoch).
        aborted_at_unix: u64,
        /// Free-form reason supplied by the caller.
        reason: String,
    },
}
/// Kind of mutation recorded in a `Started` journal entry.
///
/// Serialized as snake_case strings: `"write"`, `"edit"`, `"replace"`, `"set"`.
/// NOTE(review): the exact meaning of each kind is defined by the callers, not this module.
#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq)]
#[serde(rename_all = "snake_case")]
pub enum JournalOp {
    Write,
    Edit,
    Replace,
    Set,
}
/// Returns the path of the journal sidecar for `target`.
///
/// The sidecar lives in the same directory as `target` and is named
/// `.atomwrite.journal.<basename>.atomwrite.journal.json`. A target with no parent
/// (e.g. `/`) gets its journal in the current directory (`.`), and a target with no file
/// name uses `unknown` as the basename.
///
/// Non-UTF-8 file names are converted lossily rather than replaced by `unknown`, so
/// distinct non-UTF-8 targets in one directory no longer all share a single journal.
/// UTF-8 names produce exactly the same path as before.
pub fn journal_path(target: &Path) -> PathBuf {
    let dir = target.parent().unwrap_or_else(|| Path::new("."));
    let basename = target
        .file_name()
        .map_or_else(|| "unknown".into(), |name| name.to_string_lossy());
    dir.join(format!(".atomwrite.journal.{}{}", basename, JOURNAL_EXT))
}
/// Generates a 16-character lowercase hex identifier for a journaled operation.
///
/// The id is the first 16 hex digits of a BLAKE3 hash over the process id, the current
/// time in nanoseconds, and a process-wide sequence number. The sequence number makes
/// ids from the same process distinct even when two calls fall within one clock tick,
/// which happens on platforms with coarse clocks.
pub fn generate_op_id() -> String {
    use std::sync::atomic::{AtomicU64, Ordering};

    // Relaxed is sufficient: only the atomicity of fetch_add (uniqueness) matters,
    // not ordering relative to other memory operations.
    static SEQ: AtomicU64 = AtomicU64::new(0);

    let pid = std::process::id();
    let nanos = SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .map(|d| d.as_nanos())
        .unwrap_or(0);
    let seq = SEQ.fetch_add(1, Ordering::Relaxed);
    let input = format!("{}-{}-{}", pid, nanos, seq);
    let hex = blake3::hash(input.as_bytes()).to_hex();
    // Hex output is ASCII, so byte slicing cannot split a character.
    hex.as_str()[..16].to_owned()
}
/// Current wall-clock time in whole seconds since the Unix epoch, or 0 if the system
/// clock reads earlier than the epoch.
fn unix_now_secs() -> u64 {
    SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .map(|d| d.as_secs())
        .unwrap_or(0)
}

/// Appends a `Started` entry for an operation on `target` and returns its new op id.
///
/// `checksum_before` and `checksum_after` are stored as hex strings. The returned id
/// should be passed to [`journal_committed`] or [`journal_aborted`] once the operation
/// finishes.
///
/// # Errors
/// Returns an error if the journal file cannot be created, written, or fsynced.
pub fn journal_started(
    target: &Path,
    op: JournalOp,
    checksum_before: Option<Hash>,
    checksum_after: Hash,
) -> Result<String> {
    let op_id = generate_op_id();
    let entry = JournalEntry::Started {
        op_id: op_id.clone(),
        op,
        target: target.display().to_string(),
        checksum_before: checksum_before.map(|h| h.to_hex().to_string()),
        checksum_after: checksum_after.to_hex().to_string(),
        pid: std::process::id(),
        started_at_unix: unix_now_secs(),
    };
    append_entry(target, &entry)?;
    Ok(op_id)
}

/// Appends a `Committed` entry for `op_id` to `target`'s journal.
///
/// # Errors
/// Returns an error if the journal file cannot be created, written, or fsynced.
pub fn journal_committed(target: &Path, op_id: &str) -> Result<()> {
    let entry = JournalEntry::Committed {
        op_id: op_id.to_owned(),
        committed_at_unix: unix_now_secs(),
    };
    append_entry(target, &entry)
}

/// Appends an `Aborted` entry for `op_id`, with a caller-supplied `reason`, to
/// `target`'s journal.
///
/// # Errors
/// Returns an error if the journal file cannot be created, written, or fsynced.
pub fn journal_aborted(target: &Path, op_id: &str, reason: &str) -> Result<()> {
    let entry = JournalEntry::Aborted {
        op_id: op_id.to_owned(),
        aborted_at_unix: unix_now_secs(),
        reason: reason.to_owned(),
    };
    append_entry(target, &entry)
}
/// Serializes `entry` as one JSON line, appends it to `target`'s journal, and fsyncs
/// the journal's data.
///
/// The journal's parent directory is created if it does not exist.
///
/// # Errors
/// Returns an error if the directory cannot be created, the entry cannot be serialized,
/// or the journal cannot be opened, written, or fsynced.
fn append_entry(target: &Path, entry: &JournalEntry) -> Result<()> {
    let path = journal_path(target);
    if let Some(parent) = path.parent() {
        fs::create_dir_all(parent)
            .with_context(|| format!("failed to create journal dir {}", parent.display()))?;
    }
    // Serialize before touching the file, so that a serialization failure does not leave
    // behind an empty journal.
    let mut line = serde_json::to_string(entry)
        .with_context(|| format!("failed to serialize journal entry for {}", target.display()))?;
    line.push('\n');
    let mut file = fs::OpenOptions::new()
        .create(true)
        .append(true)
        .open(&path)
        .with_context(|| format!("failed to open journal {}", path.display()))?;
    // A single write of the complete line. `writeln!` issues separate writes for the
    // payload and the newline, which concurrent appenders can interleave.
    file.write_all(line.as_bytes())
        .with_context(|| format!("failed to write journal entry to {}", path.display()))?;
    file.sync_data()
        .with_context(|| format!("failed to fsync journal {}", path.display()))?;
    Ok(())
}
/// A journal whose most recent `Started` entry was never followed by a `Committed` or
/// `Aborted` entry, as found by [`recover_orphan_journals`].
#[derive(Debug, Clone, Serialize)]
// NOTE(review): silences clippy's pedantic `struct_field_names` lint for this struct;
// confirm it still fires on the current field names, otherwise drop the allow.
#[allow(clippy::struct_field_names)]
pub struct OrphanJournalReport {
    /// `Path::display` rendering of the journal file itself.
    pub journal_path: String,
    /// Target path as recorded in the `Started` entry.
    pub target: String,
    /// Op id of the unfinished `Started` entry.
    pub op_id: String,
    /// Kind of operation that was started.
    pub op: JournalOp,
    /// The `checksum_after` hex value recorded in the `Started` entry.
    pub expected_new_checksum: String,
    /// The `checksum_before` hex value recorded in the `Started` entry, if any.
    pub checksum_before: Option<String>,
    /// Start time in seconds since the Unix epoch, as recorded.
    pub started_at_unix: u64,
    /// Process id that wrote the `Started` entry (that process may no longer be running).
    pub pid: u32,
}
pub fn recover_orphan_journals(dir: &Path) -> Result<Vec<OrphanJournalReport>> {
let mut reports = Vec::new();
if !dir.exists() {
return Ok(reports);
}
let entries =
fs::read_dir(dir).with_context(|| format!("failed to read dir {}", dir.display()))?;
for entry in entries.flatten() {
let path = entry.path();
if !path.is_file() {
continue;
}
let Some(name) = path.file_name().and_then(|s| s.to_str()) else {
continue;
};
if !name.starts_with(".atomwrite.journal.") || !name.ends_with(JOURNAL_EXT) {
continue;
}
match parse_orphan(&path) {
Ok(Some(report)) => reports.push(report),
Ok(None) => {
}
Err(e) => {
tracing::warn!(path = %path.display(), error = %e, "failed to parse journal");
}
}
}
Ok(reports)
}
fn parse_orphan(path: &Path) -> Result<Option<OrphanJournalReport>> {
let content = fs::read_to_string(path)
.with_context(|| format!("failed to read journal {}", path.display()))?;
let mut last_started: Option<JournalEntry> = None;
for line in content.lines() {
if line.trim().is_empty() {
continue;
}
let entry: JournalEntry = serde_json::from_str(line)
.with_context(|| format!("invalid JSON in journal {}", path.display()))?;
match &entry {
JournalEntry::Started { .. } => last_started = Some(entry),
JournalEntry::Committed { .. } | JournalEntry::Aborted { .. } => {
last_started = None;
}
}
}
let Some(last) = last_started else {
return Ok(None);
};
let JournalEntry::Started {
op_id,
op,
target,
checksum_before,
checksum_after,
pid,
started_at_unix,
} = last
else {
return Ok(None);
};
Ok(Some(OrphanJournalReport {
journal_path: path.display().to_string(),
target,
op_id,
op,
expected_new_checksum: checksum_after,
checksum_before,
started_at_unix,
pid,
}))
}
/// Test helper: parses every non-blank line of the journal at `path` into a
/// [`JournalEntry`], stopping at the first line that fails to parse.
#[cfg(test)]
pub(crate) fn read_entries(path: &Path) -> Result<Vec<JournalEntry>> {
    let raw = fs::read_to_string(path)
        .with_context(|| format!("failed to read journal {}", path.display()))?;
    let mut parsed = Vec::new();
    for line in raw.lines() {
        if line.trim().is_empty() {
            continue;
        }
        let entry: JournalEntry = serde_json::from_str(line).context("invalid JSON")?;
        parsed.push(entry);
    }
    Ok(parsed)
}
#[cfg(test)]
mod tests {
    use super::*;
    use tempfile::TempDir;

    #[test]
    fn journal_path_appends_atomwrite_journal_json() {
        let target = Path::new("/tmp/foo.txt");
        let jp = journal_path(target);
        assert!(jp.ends_with(".atomwrite.journal.foo.txt.atomwrite.journal.json"));
    }

    #[test]
    fn journal_started_creates_sidecar_and_records_op_id() {
        let tmp = TempDir::new().unwrap();
        let target = tmp.path().join("file.txt");
        let before = blake3::hash(b"old");
        let after = blake3::hash(b"new");
        let op_id = journal_started(&target, JournalOp::Write, Some(before), after).unwrap();
        assert_eq!(op_id.len(), 16);
        let jp = journal_path(&target);
        assert!(jp.exists());
        let entries = read_entries(&jp).unwrap();
        assert_eq!(entries.len(), 1);
        let JournalEntry::Started {
            op_id: recorded_id,
            op,
            target: t,
            checksum_before: cb,
            checksum_after: ca,
            pid,
            started_at_unix,
        } = &entries[0]
        else {
            panic!("expected Started entry");
        };
        assert_eq!(recorded_id, &op_id);
        assert_eq!(*op, JournalOp::Write);
        assert_eq!(t, &target.display().to_string());
        assert_eq!(cb.as_deref(), Some(before.to_hex().to_string().as_str()));
        assert_eq!(ca, &after.to_hex().to_string());
        assert_eq!(*pid, std::process::id());
        assert!(*started_at_unix > 0);
    }

    #[test]
    fn journal_committed_after_started_does_not_orphan() {
        let tmp = TempDir::new().unwrap();
        let target = tmp.path().join("file.txt");
        let op_id = journal_started(&target, JournalOp::Edit, None, blake3::hash(b"x")).unwrap();
        journal_committed(&target, &op_id).unwrap();
        let reports = recover_orphan_journals(tmp.path()).unwrap();
        assert!(
            reports.is_empty(),
            "expected zero orphans, got {:?}",
            reports
        );
    }

    #[test]
    fn orphan_detected_when_started_without_committed() {
        let tmp = TempDir::new().unwrap();
        let target = tmp.path().join("file.txt");
        let op_id = journal_started(
            &target,
            JournalOp::Write,
            Some(blake3::hash(b"old")),
            blake3::hash(b"new"),
        )
        .unwrap();
        let reports = recover_orphan_journals(tmp.path()).unwrap();
        assert_eq!(reports.len(), 1);
        let r = &reports[0];
        assert_eq!(r.op_id, op_id);
        assert_eq!(r.op, JournalOp::Write);
        assert_eq!(r.target, target.display().to_string());
        assert!(r.checksum_before.is_some());
        assert_eq!(r.pid, std::process::id());
    }

    /// A commit only closes the operations before it; a later `Started` is still an orphan.
    #[test]
    fn started_after_commit_is_reported_as_orphan() {
        let tmp = TempDir::new().unwrap();
        let target = tmp.path().join("file.txt");
        let first = journal_started(&target, JournalOp::Set, None, blake3::hash(b"a")).unwrap();
        journal_committed(&target, &first).unwrap();
        let second = journal_started(&target, JournalOp::Set, None, blake3::hash(b"b")).unwrap();
        let reports = recover_orphan_journals(tmp.path()).unwrap();
        assert_eq!(reports.len(), 1);
        assert_eq!(reports[0].op_id, second);
        assert_eq!(
            reports[0].expected_new_checksum,
            blake3::hash(b"b").to_hex().to_string()
        );
    }

    #[test]
    fn journal_aborted_clears_orphan() {
        let tmp = TempDir::new().unwrap();
        let target = tmp.path().join("file.txt");
        let op_id = journal_started(&target, JournalOp::Replace, None, blake3::hash(b"x")).unwrap();
        journal_aborted(&target, &op_id, "caller cancelled").unwrap();
        let reports = recover_orphan_journals(tmp.path()).unwrap();
        assert!(reports.is_empty());
    }

    #[test]
    fn generate_op_id_is_16_hex_chars_and_unique() {
        let a = generate_op_id();
        let b = generate_op_id();
        assert_eq!(a.len(), 16);
        assert_eq!(b.len(), 16);
        assert_ne!(a, b);
        assert!(a.chars().all(|c| c.is_ascii_hexdigit()));
    }

    #[test]
    fn recover_on_empty_dir_returns_empty() {
        let tmp = TempDir::new().unwrap();
        let reports = recover_orphan_journals(tmp.path()).unwrap();
        assert!(reports.is_empty());
    }

    /// Files that do not match the journal naming scheme must not be parsed or reported.
    #[test]
    fn recover_ignores_files_that_are_not_journals() {
        let tmp = TempDir::new().unwrap();
        fs::write(tmp.path().join("file.txt"), "payload").unwrap();
        fs::write(tmp.path().join("other.json"), "{}").unwrap();
        let reports = recover_orphan_journals(tmp.path()).unwrap();
        assert!(reports.is_empty());
    }

    #[test]
    fn recover_on_missing_dir_returns_empty() {
        // Use a path inside a private TempDir rather than a fixed name in the shared
        // system temp dir, so the test never removes or races on anything it doesn't own.
        let tmp = TempDir::new().unwrap();
        let missing = tmp.path().join("does-not-exist");
        let reports = recover_orphan_journals(&missing).unwrap();
        assert!(reports.is_empty());
    }
}