use std::path::PathBuf;
use repo::Repository;
use serde::{Deserialize, Serialize};
/// One transaction sentinel record, (de)serialized with serde.
///
/// The functions in this file parse it from the contents of `*.toml` files
/// and, when appending an operation, serialize it back to TOML.
#[derive(Debug, Clone, Deserialize, Serialize)]
pub struct ActiveTransaction {
/// Identifier of the transaction; reported back by
/// `append_op_to_active_for_thread` for every sentinel it rewrites.
pub transaction_id: String,
/// NOTE(review): presumably the path of the owning repository; not read in this file.
pub repo_path: String,
/// Thread name; compared against the `thread_name` argument of the lookups in this file.
pub thread: String,
/// NOTE(review): presumably a human-readable description; not read in this file.
pub message: String,
/// Lifecycle state. This file only checks for the exact value `"active"`.
pub state: String,
/// NOTE(review): presumably a Unix timestamp in seconds; confirm against the writer.
pub started_at_secs: i64,
/// NOTE(review): presumably the e-mail of whoever opened the transaction; not read in this file.
pub started_by_email: String,
/// NOTE(review): meaning not visible in this file; confirm against the writer.
pub base_state: String,
/// Verb names recorded by `append_op_to_active_for_thread`, in append order.
/// Deserializes to an empty list when the key is absent.
#[serde(default)]
pub buffered_ops: Vec<String>,
/// Deserializes to `None` when the key is absent.
/// NOTE(review): presumably set when the transaction is aborted; confirm against the writer.
#[serde(default)]
pub aborted_reason: Option<String>,
}
pub fn active_transactions(repo: &Repository) -> Vec<ActiveTransaction> {
let dir = sentinels_dir(repo);
let mut out = Vec::new();
let entries = match std::fs::read_dir(&dir) {
Ok(e) => e,
Err(_) => return out,
};
for entry in entries.flatten() {
let path = entry.path();
if path.extension().and_then(|e| e.to_str()) != Some("toml") {
continue;
}
let bytes = match std::fs::read(&path) {
Ok(b) => b,
Err(_) => continue,
};
let txn: ActiveTransaction = match toml::from_str(&String::from_utf8_lossy(&bytes)) {
Ok(t) => t,
Err(_) => continue,
};
if txn.state == "active" {
out.push(txn);
}
}
out
}
/// Returns the first active transaction whose `thread` equals `thread_name`,
/// or `None` when no active sentinel matches.
///
/// "First" follows the order produced by [`active_transactions`].
pub fn active_for_thread(repo: &Repository, thread_name: &str) -> Option<ActiveTransaction> {
    for candidate in active_transactions(repo) {
        if candidate.thread == thread_name {
            return Some(candidate);
        }
    }
    None
}
/// Builds the path `<heddle_dir>/state/transactions`, the directory this file
/// scans for transaction sentinels.
fn sentinels_dir(repo: &Repository) -> PathBuf {
    let state_root = repo.heddle_dir().join("state");
    state_root.join("transactions")
}
/// Appends `verb_name` to `buffered_ops` of every active sentinel whose
/// `thread` equals `thread_name`, rewriting each matching sentinel file.
///
/// Returns the `transaction_id` of every sentinel that was successfully
/// rewritten; the list is empty when the sentinel directory cannot be listed
/// or nothing matched.
///
/// The operation is best-effort:
/// - entries that error, lack a `.toml` extension, cannot be read, or do not
///   parse as an [`ActiveTransaction`] are skipped silently;
/// - a serialization or write failure is logged with `tracing::warn!` and the
///   sentinel is left out of the returned list.
///
/// Updated sentinels are written through
/// `objects::fs_atomic::write_file_atomic`.
pub fn append_op_to_active_for_thread(
    repo: &Repository,
    thread_name: &str,
    verb_name: &str,
) -> Vec<String> {
    let mut touched_ids = Vec::new();

    // Kept lazy on purpose: the directory is walked while sentinels are being
    // rewritten, exactly one entry at a time.
    let sentinel_paths = std::fs::read_dir(sentinels_dir(repo))
        .into_iter()
        .flatten()
        .flatten()
        .map(|entry| entry.path())
        .filter(|candidate| candidate.extension().and_then(|ext| ext.to_str()) == Some("toml"));

    for sentinel_path in sentinel_paths {
        let parsed: Option<ActiveTransaction> = std::fs::read(&sentinel_path)
            .ok()
            .and_then(|raw| toml::from_str(&String::from_utf8_lossy(&raw)).ok());

        // Only active sentinels belonging to the requested thread are touched.
        let mut txn = match parsed {
            Some(txn) if txn.state == "active" && txn.thread == thread_name => txn,
            _ => continue,
        };

        txn.buffered_ops.push(verb_name.to_owned());

        let body = match toml::to_string_pretty(&txn) {
            Ok(body) => body,
            Err(err) => {
                tracing::warn!(error = %err, txn = %txn.transaction_id,
                    "transaction-sentinel: failed to serialize after append");
                continue;
            }
        };

        match objects::fs_atomic::write_file_atomic(&sentinel_path, body.as_bytes()) {
            Ok(_) => touched_ids.push(txn.transaction_id),
            Err(err) => {
                tracing::warn!(error = %err, txn = %txn.transaction_id,
                    "transaction-sentinel: failed to persist appended op");
            }
        }
    }

    touched_ids
}
/// Records `verb_name` on the active transaction(s) of `thread_name` and
/// reports whether at least one sentinel was updated.
///
/// This is not a pure query: it calls [`append_op_to_active_for_thread`], so a
/// `true` result means the verb has been appended to at least one sentinel.
pub fn buffered_for_thread(repo: &Repository, thread_name: &str, verb_name: &str) -> bool {
    let updated_ids = append_op_to_active_for_thread(repo, thread_name, verb_name);
    !updated_ids.is_empty()
}
#[cfg(test)]
mod tests {
    use std::fs;

    use tempfile::TempDir;

    use super::*;

    /// Writes a minimal sentinel named `<id>.toml` into `dir`, creating the
    /// directory first. Only the id, thread and state vary between fixtures.
    fn write_sentinel(dir: &std::path::Path, id: &str, thread: &str, state: &str) {
        fs::create_dir_all(dir).unwrap();
        let sentinel_file = dir.join(format!("{id}.toml"));
        let body = format!(
            r#"transaction_id = "{id}"
repo_path = ""
thread = "{thread}"
message = ""
state = "{state}"
started_at_secs = 0
started_by_email = ""
base_state = ""
buffered_ops = []
"#
        );
        fs::write(sentinel_file, body).unwrap();
    }

    /// Initializes a repository in a fresh temporary directory. The `TempDir`
    /// is returned so the caller keeps it alive for the duration of the test.
    fn fresh_repo() -> (TempDir, Repository) {
        let workdir = TempDir::new().unwrap();
        let repo = Repository::init_default(workdir.path()).unwrap();
        (workdir, repo)
    }

    /// Sentinel directory, spelled out independently of `sentinels_dir` so the
    /// tests pin the on-disk layout.
    fn txn_dir(repo: &Repository) -> std::path::PathBuf {
        repo.heddle_dir().join("state").join("transactions")
    }

    #[test]
    fn returns_empty_when_no_sentinels() {
        let (_workdir, repo) = fresh_repo();
        let found = active_transactions(&repo);
        assert!(found.is_empty());
    }

    #[test]
    fn returns_only_active_sentinels() {
        let (_workdir, repo) = fresh_repo();
        let dir = txn_dir(&repo);
        for (id, thread, state) in [
            ("tx-active", "main", "active"),
            ("tx-committed", "main", "committed"),
            ("tx-aborted", "feature", "aborted"),
        ] {
            write_sentinel(&dir, id, thread, state);
        }

        let ids: Vec<String> = active_transactions(&repo)
            .into_iter()
            .map(|txn| txn.transaction_id)
            .collect();
        assert_eq!(ids, ["tx-active"]);
    }

    #[test]
    fn active_for_thread_filters_by_thread() {
        let (_workdir, repo) = fresh_repo();
        let dir = txn_dir(&repo);
        write_sentinel(&dir, "tx-main", "main", "active");
        write_sentinel(&dir, "tx-feat", "feature", "active");

        let main_txn = active_for_thread(&repo, "main").unwrap();
        assert_eq!(main_txn.transaction_id, "tx-main");

        let feature_txn = active_for_thread(&repo, "feature").unwrap();
        assert_eq!(feature_txn.transaction_id, "tx-feat");

        assert!(active_for_thread(&repo, "unrelated").is_none());
    }

    #[test]
    fn append_op_to_active_for_thread_writes_only_matching_active_sentinels() {
        let (_workdir, repo) = fresh_repo();
        let dir = txn_dir(&repo);
        write_sentinel(&dir, "tx-main", "main", "active");
        write_sentinel(&dir, "tx-other-thread", "feature", "active");
        write_sentinel(&dir, "tx-main-aborted", "main", "aborted");

        let touched = append_op_to_active_for_thread(&repo, "main", "capture");
        assert_eq!(touched, ["tx-main"]);

        let main_after = active_for_thread(&repo, "main").unwrap();
        assert_eq!(main_after.buffered_ops, ["capture"]);

        let feature_after = active_for_thread(&repo, "feature").unwrap();
        assert!(feature_after.buffered_ops.is_empty());
    }

    #[cfg(unix)]
    #[test]
    fn append_publishes_fresh_inode_proving_atomic_rename() {
        use std::os::unix::fs::MetadataExt;

        let (_workdir, repo) = fresh_repo();
        let dir = txn_dir(&repo);
        write_sentinel(&dir, "tx-main", "main", "active");
        let sentinel_file = dir.join("tx-main.toml");

        let inode_before = fs::metadata(&sentinel_file).unwrap().ino();
        let touched = append_op_to_active_for_thread(&repo, "main", "capture");
        assert_eq!(touched, ["tx-main"]);
        let inode_after = fs::metadata(&sentinel_file).unwrap().ino();

        assert_ne!(
            inode_before, inode_after,
            "sentinel must be published via atomic rename (fresh inode), \
             not rewritten in place"
        );

        // Nothing but the sentinel itself may remain in the directory.
        let stray: Vec<_> = fs::read_dir(&dir)
            .unwrap()
            .flatten()
            .map(|entry| entry.file_name())
            .filter(|name| name != "tx-main.toml")
            .collect();
        assert!(stray.is_empty(), "atomic write left orphans: {stray:?}");
    }

    #[test]
    fn append_appends_in_order() {
        let (_workdir, repo) = fresh_repo();
        let dir = txn_dir(&repo);
        write_sentinel(&dir, "tx-main", "main", "active");

        for verb in ["capture", "merge", "marker"] {
            append_op_to_active_for_thread(&repo, "main", verb);
        }

        let txn = active_for_thread(&repo, "main").unwrap();
        assert_eq!(txn.buffered_ops, ["capture", "merge", "marker"]);
    }
}