use std::{
collections::BTreeSet,
fs,
path::{Path, PathBuf},
time::{SystemTime, UNIX_EPOCH},
};
use anyhow::{Result, anyhow};
use objects::{
error::{HeddleError, Result as HeddleResult},
lock::{RepoLock, WriteLockGuard},
object::{ChangeId, ContentHash, MarkerName, ThreadName},
};
use oplog::{IsolationKey, OpBatch, OpEntry, OpLogBackend, OpRecord, isolation_keys_for_record};
use refs::Head;
use repo::{
CommitGraphIndex, Repository, Thread, ThreadFreshness, ThreadIntegrationPolicy, ThreadManager,
ThreadState, VisibilitySidecarRestore,
atomic::{AtomicMutation, DeferredMutation, StagedCommit, Tx},
refresh_thread_freshness,
};
use sley::{
DeleteRef, FullName, GitObjectType, GitTime, IndexWriteOptions, ObjectId, RefPrecondition,
ReferenceTarget, Repository as SleyRepository, Signature,
plumbing::sley_core::ByteString as GitByteString,
};
use super::{advice::RecoveryAdvice, thread_cmd::thread_not_found_advice};
use crate::bridge::git_core::{open_repo as open_git_repo, set_reference};
/// Validates, without mutating anything, that the Git side of `batches` can be undone.
///
/// Returns immediately when no entry is a `GitCheckpoint`. Otherwise the current Git HEAD
/// is read once and then "walked backwards": batches are visited in the order given and
/// the entries of each batch in reverse. Every checkpoint must have its `new_git_oid`
/// equal to the simulated HEAD, after which the simulated HEAD moves to the checkpoint's
/// `previous_git_oid` (when it has one). Finally the Git worktree must be clean.
///
/// # Errors
/// Fails if HEAD cannot be read, if a checkpoint does not line up with the simulated HEAD,
/// or if the worktree is dirty.
pub(super) fn preflight_undo_batches(repo: &Repository, batches: &[OpBatch]) -> Result<()> {
    if !batches_have_git_checkpoint(batches) {
        return Ok(());
    }

    let mut expected_head = current_git_head(repo)?;
    let checkpoints = batches
        .iter()
        .flat_map(|batch| batch.entries.iter().rev())
        .filter_map(|entry| match &entry.operation {
            OpRecord::GitCheckpoint {
                new_git_oid,
                previous_git_oid,
                ..
            } => Some((new_git_oid, previous_git_oid)),
            _ => None,
        });

    for (new_git_oid, previous_git_oid) in checkpoints {
        ensure_simulated_git_head_is(repo, &expected_head, new_git_oid, "undo git checkpoint")?;
        if let Some(previous) = previous_git_oid {
            expected_head = previous.clone();
        }
    }

    ensure_git_worktree_clean(repo, "undo git checkpoint")
}
/// Validates, without mutating anything, that the Git side of `batches` can be redone.
///
/// Returns immediately when no entry is a `GitCheckpoint`. Otherwise the current Git HEAD
/// is read once and "walked forwards" through the batches and their entries in order: a
/// checkpoint that records a `previous_git_oid` must find the simulated HEAD at that oid,
/// and every checkpoint then advances the simulated HEAD to its `new_git_oid`. Finally the
/// Git worktree must be clean.
///
/// # Errors
/// Fails if HEAD cannot be read, if a checkpoint does not line up with the simulated HEAD,
/// or if the worktree is dirty.
pub(super) fn preflight_redo_batches(repo: &Repository, batches: &[OpBatch]) -> Result<()> {
    if !batches_have_git_checkpoint(batches) {
        return Ok(());
    }

    let mut expected_head = current_git_head(repo)?;
    let checkpoints = batches
        .iter()
        .flat_map(|batch| batch.entries.iter())
        .filter_map(|entry| match &entry.operation {
            OpRecord::GitCheckpoint {
                previous_git_oid,
                new_git_oid,
                ..
            } => Some((previous_git_oid, new_git_oid)),
            _ => None,
        });

    for (previous_git_oid, new_git_oid) in checkpoints {
        if let Some(previous) = previous_git_oid {
            ensure_simulated_git_head_is(repo, &expected_head, previous, "redo git checkpoint")?;
        }
        expected_head = new_git_oid.clone();
    }

    ensure_git_worktree_clean(repo, "redo git checkpoint")
}
/// Reports whether any entry in any of `batches` is an `OpRecord::GitCheckpoint`.
fn batches_have_git_checkpoint(batches: &[OpBatch]) -> bool {
    batches
        .iter()
        .flat_map(|batch| batch.entries.iter())
        .any(|entry| matches!(&entry.operation, OpRecord::GitCheckpoint { .. }))
}
fn current_git_head(repo: &Repository) -> Result<String> {
let git = git_checkout_repo(repo)?;
git.head()
.map_err(|error| anyhow!("failed to inspect Git HEAD: {error}"))?
.oid
.map(|id| id.to_string())
.ok_or_else(|| anyhow!("failed to inspect Git HEAD: HEAD is unborn"))
}
fn ensure_simulated_git_head_is(
repo: &Repository,
actual: &str,
expected: &str,
action: &str,
) -> Result<()> {
if actual == expected {
return Ok(());
}
Err(anyhow!(RecoveryAdvice::git_head_mismatch(
action,
actual,
expected,
repo.git_overlay_current_branch()?
.unwrap_or_else(|| "HEAD".to_string()),
git_dirty_paths(repo),
)))
}
// Test-only, per-thread fault-injection counters. `None` means disarmed; `Some(n)` means
// "let `n` more checks pass, then trip once" (see `fault_counter_trips`).
#[cfg(test)]
thread_local! {
// Checked at the top of `EntrySteps::step` and `EntrySteps::step_nonatomic`; when it trips
// the step returns an injected `Conflict` error without running anything.
static ENTRY_WRITE_FAULT: std::cell::Cell<Option<usize>> = const { std::cell::Cell::new(None) };
// Checked only by `EntrySteps::step_nonatomic`; when it trips the forward closure still
// runs, but the step then reports an injected `Conflict` error.
static NONATOMIC_FORWARD_FAULT: std::cell::Cell<Option<usize>> = const { std::cell::Cell::new(None) };
}
/// Runs `body` with `ENTRY_WRITE_FAULT` armed to `Some(skip_then_fail_at)` and returns
/// whatever `body` returns.
///
/// The counter is disarmed (set back to `None`) when `body` finishes, including when it
/// unwinds from a panic, so a failing assertion inside `body` cannot leave the fault armed
/// for later code running on the same thread.
#[cfg(test)]
fn with_entry_write_fault<T>(skip_then_fail_at: usize, body: impl FnOnce() -> T) -> T {
    /// Disarms `ENTRY_WRITE_FAULT` when dropped, on both normal exit and unwinding.
    struct Disarm;
    impl Drop for Disarm {
        fn drop(&mut self) {
            ENTRY_WRITE_FAULT.with(|f| f.set(None));
        }
    }

    ENTRY_WRITE_FAULT.with(|f| f.set(Some(skip_then_fail_at)));
    let _disarm = Disarm;
    body()
}
/// Runs `body` with `NONATOMIC_FORWARD_FAULT` armed to `Some(skip_then_fail_at)` and
/// returns whatever `body` returns.
///
/// The counter is disarmed (set back to `None`) when `body` finishes, including when it
/// unwinds from a panic, so a failing assertion inside `body` cannot leave the fault armed
/// for later code running on the same thread.
#[cfg(test)]
fn with_nonatomic_forward_fault<T>(skip_then_fail_at: usize, body: impl FnOnce() -> T) -> T {
    /// Disarms `NONATOMIC_FORWARD_FAULT` when dropped, on both normal exit and unwinding.
    struct Disarm;
    impl Drop for Disarm {
        fn drop(&mut self) {
            NONATOMIC_FORWARD_FAULT.with(|f| f.set(None));
        }
    }

    NONATOMIC_FORWARD_FAULT.with(|f| f.set(Some(skip_then_fail_at)));
    let _disarm = Disarm;
    body()
}
/// Advances a fault-injection counter by one check and reports whether it fires now.
///
/// * `None`: disarmed, never fires, left untouched.
/// * `Some(0)`: fires (returns `true`) and disarms itself.
/// * `Some(n)`: does not fire; the counter becomes `Some(n - 1)`.
#[cfg(test)]
fn fault_counter_trips(
    cell: &'static std::thread::LocalKey<std::cell::Cell<Option<usize>>>,
) -> bool {
    cell.with(|slot| {
        let Some(remaining) = slot.get() else {
            return false;
        };
        let fires_now = remaining == 0;
        slot.set(if fires_now { None } else { Some(remaining - 1) });
        fires_now
    })
}
/// Puts the repository back on a previously captured head.
///
/// When `state` is present the repository first goes to it via
/// `goto_without_record_discard_local`; the head ref is then rewritten to `head_ref` in
/// either case.
fn restore_head(repo: &Repository, state: Option<ChangeId>, head_ref: &Head) -> HeddleResult<()> {
    match state {
        Some(target) => repo.goto_without_record_discard_local(&target)?,
        None => {}
    }
    repo.refs().write_head(head_ref)
}
/// Thin wrapper around a mutable borrow of a `Tx` through which every mutation made while
/// replaying an op-log entry is issued as a forward/inverse step.
///
/// `'tx` is the lifetime of the borrow of the transaction; `'a` is the transaction's own
/// lifetime parameter (also the lifetime of the `Repository` returned by `repo()`).
struct EntrySteps<'tx, 'a> {
// The transaction all steps are forwarded to.
tx: &'tx mut Tx<'a>,
}
// Step helpers used by the undo/redo entry appliers. Each helper pairs a forward mutation
// with the closure that reverts it and hands both to the underlying `Tx`.
// NOTE(review): the exact rollback semantics live in `Tx::step` / `Tx::step_nonatomic`,
// which are not visible here — confirm there.
impl<'a> EntrySteps<'_, 'a> {
/// Wraps `tx`.
fn new<'tx>(tx: &'tx mut Tx<'a>) -> EntrySteps<'tx, 'a> {
EntrySteps { tx }
}
/// The repository the wrapped transaction operates on.
fn repo(&self) -> &'a Repository {
self.tx.repo()
}
/// Forwards `forward`/`inverse` to `Tx::step`.
///
/// In test builds, a tripped `ENTRY_WRITE_FAULT` makes this return an injected
/// `Conflict` error before either closure is handed to the transaction.
fn step<T>(
&mut self,
forward: impl FnOnce() -> HeddleResult<T>,
inverse: impl FnOnce() -> HeddleResult<()> + 'a,
) -> HeddleResult<T> {
#[cfg(test)]
if fault_counter_trips(&ENTRY_WRITE_FAULT) {
return Err(HeddleError::Conflict(
"injected mid-entry write fault".to_string(),
));
}
self.tx.step(forward, inverse)
}
/// Forwards `capture`/`restore`/`forward` to `Tx::step_nonatomic`.
///
/// In test builds two faults can be injected: `ENTRY_WRITE_FAULT` fails the step before
/// anything runs, while `NONATOMIC_FORWARD_FAULT` lets `forward` run to completion and
/// then reports an injected `Conflict` error in its place.
fn step_nonatomic<T, S: 'a>(
&mut self,
capture: impl FnOnce() -> HeddleResult<S>,
restore: impl FnOnce(S) -> HeddleResult<()> + 'a,
forward: impl FnOnce() -> HeddleResult<T>,
) -> HeddleResult<T> {
#[cfg(test)]
if fault_counter_trips(&ENTRY_WRITE_FAULT) {
return Err(HeddleError::Conflict(
"injected mid-entry write fault".to_string(),
));
}
#[cfg(test)]
if fault_counter_trips(&NONATOMIC_FORWARD_FAULT) {
return self.tx.step_nonatomic(capture, restore, move || {
// Perform the real forward work first so its side effects exist, then fail.
forward()?;
Err(HeddleError::Conflict(
"injected non-atomic forward fault".to_string(),
))
});
}
self.tx.step_nonatomic(capture, restore, forward)
}
/// Moves the repository to `target` with `goto_without_record_discard_local`.
///
/// The captured state is the pair (current head, current head ref); it is reinstated
/// with `restore_head`.
fn goto(&mut self, target: ChangeId) -> HeddleResult<()> {
let repo = self.repo();
self.step_nonatomic(
move || Ok((repo.head()?, repo.head_ref()?)),
move |(prev_state, prev_head_ref)| restore_head(repo, prev_state, &prev_head_ref),
move || repo.goto_without_record_discard_local(&target),
)
}
/// Fast-forwards the attached worktree to `target`, but only when HEAD is currently
/// attached to the thread called `name`; otherwise this is a no-op that registers no step.
fn restore_active_thread_worktree(&mut self, name: &str, target: ChangeId) -> HeddleResult<()> {
let repo = self.repo();
let head_ref = repo.head_ref()?;
let Head::Attached { thread } = &head_ref else {
return Ok(());
};
if thread != name {
return Ok(());
}
self.step_nonatomic(
move || Ok((repo.head()?, repo.head_ref()?)),
move |(prev_state, prev_head_ref)| restore_head(repo, prev_state, &prev_head_ref),
move || repo.fast_forward_attached_without_record_discard_local(&target),
)
}
/// Writes `head` as the head ref; the inverse writes back the head ref read beforehand.
fn write_head(&mut self, head: Head) -> HeddleResult<()> {
let repo = self.repo();
let prev = repo.head_ref()?;
self.step(
move || repo.refs().write_head(&head),
move || repo.refs().write_head(&prev),
)
}
/// Points thread ref `name` at `state`.
///
/// The inverse restores the ref's prior value, or deletes the ref if it did not exist.
fn set_thread(&mut self, name: &str, state: ChangeId) -> HeddleResult<()> {
let repo = self.repo();
let forward_name = ThreadName::new(name);
let prev = repo.refs().get_thread(&forward_name)?;
let restore_name = name.to_string();
self.step(
move || repo.refs().set_thread(&forward_name, &state),
move || {
let name = ThreadName::new(restore_name);
match prev {
Some(prev) => repo.refs().set_thread(&name, &prev),
None => repo.refs().delete_thread(&name).map(|_| ()),
}
},
)
}
/// Deletes thread ref `name`.
///
/// The inverse re-creates the ref at its prior value; if the ref was already absent the
/// inverse does nothing.
fn delete_thread(&mut self, name: &str) -> HeddleResult<()> {
let repo = self.repo();
let forward_name = ThreadName::new(name);
let prev = repo.refs().get_thread(&forward_name)?;
let restore_name = name.to_string();
self.step(
move || repo.refs().delete_thread(&forward_name).map(|_| ()),
move || match prev {
Some(prev) => repo
.refs()
.set_thread(&ThreadName::new(restore_name), &prev),
None => Ok(()),
},
)
}
/// Creates marker `name` at `state`.
///
/// The inverse re-creates the marker at its prior value, or deletes it if it did not
/// exist before.
fn create_marker(&mut self, name: &str, state: ChangeId) -> HeddleResult<()> {
let repo = self.repo();
let forward_name = MarkerName::new(name);
let prev = repo.refs().get_marker(&forward_name)?;
let restore_name = name.to_string();
self.step(
move || repo.refs().create_marker(&forward_name, &state),
move || {
let name = MarkerName::new(restore_name);
match prev {
Some(prev) => repo.refs().create_marker(&name, &prev),
None => repo.refs().delete_marker(&name).map(|_| ()),
}
},
)
}
/// Deletes marker `name`.
///
/// The inverse re-creates the marker at its prior value; if it was already absent the
/// inverse does nothing.
fn delete_marker(&mut self, name: &str) -> HeddleResult<()> {
let repo = self.repo();
let forward_name = MarkerName::new(name);
let prev = repo.refs().get_marker(&forward_name)?;
let restore_name = name.to_string();
self.step(
move || repo.refs().delete_marker(&forward_name).map(|_| ()),
move || match prev {
Some(prev) => repo
.refs()
.create_marker(&MarkerName::new(restore_name), &prev),
None => Ok(()),
},
)
}
/// Converges the `ThreadManager` records for thread `name` to `desired`.
///
/// The records are snapshotted before the step is issued and the restore closure
/// converges back to that snapshot.
fn converge_thread_records(&mut self, name: &str, desired: Vec<Thread>) -> HeddleResult<()> {
let repo = self.repo();
let forward_name = name.to_string();
let restore_name = name.to_string();
let prior = ThreadManager::new(repo.heddle_dir()).snapshot_records(name)?;
self.step_nonatomic(
|| Ok(prior),
move |prior| {
ThreadManager::new(repo.heddle_dir()).converge_records(&restore_name, &prior)
},
move || ThreadManager::new(repo.heddle_dir()).converge_records(&forward_name, &desired),
)
}
/// Converges the records of `record.thread` to exactly this one record.
fn save_thread_record(&mut self, record: Thread) -> HeddleResult<()> {
let name = record.thread.clone();
self.converge_thread_records(&name, vec![record])
}
/// Restores the `ThreadManager` record for `name` from a single encoded snapshot.
///
/// If `bytes` cannot be decoded the step still succeeds: a warning naming `op_label`
/// is printed to stderr and the records are left as they are.
fn restore_thread_record(
&mut self,
name: &str,
bytes: &[u8],
op_label: &'static str,
) -> HeddleResult<()> {
let repo = self.repo();
let forward_name = name.to_string();
let restore_name = name.to_string();
let warn_name = name.to_string();
let bytes = bytes.to_vec();
let prior = ThreadManager::new(repo.heddle_dir()).snapshot_records(name)?;
self.step_nonatomic(
|| Ok(prior),
move |prior| {
ThreadManager::new(repo.heddle_dir()).converge_records(&restore_name, &prior)
},
move || {
let manager = ThreadManager::new(repo.heddle_dir());
match manager.decode_thread_record_snapshot(&bytes) {
Ok(restored) => {
manager.converge_records(&forward_name, std::slice::from_ref(&restored))
}
Err(e) => {
// Deliberately best-effort: a bad snapshot degrades to a warning.
eprintln!(
"warning: replay of `{}` for '{}' restored the ref but failed \
to decode the ThreadManager record snapshot ({}). Record-backed \
commands (`thread cd`, delegate) may degrade on this thread — run \
`heddle thread start {}` to recreate the record.",
op_label, warn_name, e, warn_name
);
Ok(())
}
}
},
)
}
/// Restores the full set of `ThreadManager` records for `name` from encoded snapshots.
///
/// All snapshots are decoded up front; if any one fails, a warning is printed to stderr
/// and nothing is changed (no step is registered). Otherwise the records are converged
/// to the decoded set.
fn restore_thread_record_set(
&mut self,
name: &str,
snapshots: &[Vec<u8>],
op_label: &'static str,
) -> HeddleResult<()> {
let manager = ThreadManager::new(self.repo().heddle_dir());
let mut restored = Vec::with_capacity(snapshots.len());
for bytes in snapshots {
match manager.decode_thread_record_snapshot(bytes) {
Ok(record) => restored.push(record),
Err(e) => {
eprintln!(
"warning: replay of `{}` for '{}' skipped ThreadManager record-set \
restore because one snapshot failed to decode ({}).",
op_label, name, e
);
return Ok(());
}
}
}
self.converge_thread_records(name, restored)
}
/// Removes the redaction identified by (`blob`, `state`, `path`, `redaction_id`).
///
/// The blob's redaction sidecar is captured first and put back by the restore closure.
fn remove_redaction_sidecar(
&mut self,
blob: ContentHash,
state: ChangeId,
path: String,
redaction_id: ContentHash,
) -> HeddleResult<()> {
let repo = self.repo();
self.step_nonatomic(
move || repo.capture_redaction_sidecar(&blob).map_err(apply_error),
move |snapshot| {
repo.restore_redaction_sidecar(&blob, snapshot)
.map_err(apply_error)
},
move || {
repo.remove_redaction(&blob, &state, &path, &redaction_id)
.map(|_| ())
.map_err(apply_error)
},
)
}
/// Swaps the visibility sidecar of `state` from `expected_current` to `target`, guarded
/// by `restore_state_visibility_sidecar_if_unchanged`.
///
/// A `Superseded` outcome in the forward direction is turned into the error built by
/// `visibility_superseded_conflict`. The inverse performs the mirrored swap
/// (`target` back to `expected_current`) and ignores which outcome it gets.
fn restore_visibility_sidecar(
&mut self,
state: ChangeId,
expected_current: Option<Vec<u8>>,
target: Option<Vec<u8>>,
) -> HeddleResult<()> {
let repo = self.repo();
// The inverse expects to find what the forward step wrote, and writes what it replaced.
let inverse_expected = target.clone();
let inverse_target = expected_current.clone();
self.step(
move || match repo
.restore_state_visibility_sidecar_if_unchanged(&state, &expected_current, target)
.map_err(apply_error)?
{
VisibilitySidecarRestore::Applied => Ok(()),
VisibilitySidecarRestore::Superseded => Err(visibility_superseded_conflict(&state)),
},
move || {
repo.restore_state_visibility_sidecar_if_unchanged(
&state,
&inverse_expected,
inverse_target,
)
.map(|_| ())
.map_err(apply_error)
},
)
}
/// Runs one Git mutation (`forward`) whose rollback is "restore `snapshot` for `branch`"
/// via `restore_git_state`. The snapshot is cloned into the step rather than re-captured.
fn git_restore_snapshot(
&mut self,
repo: &'a Repository,
branch: &str,
snapshot: &GitState,
forward: impl FnOnce() -> Result<()>,
) -> HeddleResult<()> {
let snapshot = snapshot.clone();
let branch = branch.to_string();
self.step_nonatomic(
move || Ok(snapshot),
move |snapshot| restore_git_state(repo, &branch, &snapshot),
move || forward().map_err(apply_error),
)
}
/// Marks `batch` undone in the op log; the inverse marks it redone again.
fn mark_batch_undone(&mut self, batch: &OpBatch) -> HeddleResult<OpBatch> {
let repo = self.repo();
let forward_batch = batch.clone();
let inverse_batch = batch.clone();
self.step(
move || repo.oplog().mark_batch_undone(&forward_batch),
move || repo.oplog().mark_batch_redone(&inverse_batch).map(|_| ()),
)
}
/// Marks `batch` redone in the op log; the inverse marks it undone again.
fn mark_batch_redone(&mut self, batch: &OpBatch) -> HeddleResult<OpBatch> {
let repo = self.repo();
let forward_batch = batch.clone();
let inverse_batch = batch.clone();
self.step(
move || repo.oplog().mark_batch_redone(&forward_batch),
move || repo.oplog().mark_batch_undone(&inverse_batch).map(|_| ()),
)
}
}
/// Applies the inverse of a single op-log `entry` through `steps`.
///
/// Every mutation is issued as a step on `steps`, so the order of calls inside each arm is
/// also the order in which steps are registered. Record kinds listed in the final arm
/// (and the `prev_head: None` / partial `Collapse` cases) are treated as no-ops here.
///
/// # Errors
/// Propagates the first failing step.
fn apply_undo_entry(steps: &mut EntrySteps, entry: &OpEntry) -> HeddleResult<()> {
match &entry.operation {
// Snapshot with a known predecessor: go back to it and, when the snapshot was taken on
// a thread, rewind that thread's ref, re-attach HEAD, and update its record.
OpRecord::Snapshot {
prev_head: Some(prev),
thread,
new_state,
..
} => {
steps.goto(*prev)?;
if let Some(thread) = thread {
steps.set_thread(thread.as_str(), *prev)?;
steps.write_head(Head::Attached {
thread: ThreadName::new(thread.as_str()),
})?;
sync_thread_record_state(steps, thread, *prev)?;
mark_merged_threads_unintegrated_for_target(steps, thread, new_state, prev)?;
}
}
OpRecord::Goto {
prev_head: Some(prev),
..
} => {
steps.goto(*prev)?;
}
// No recorded predecessor: nothing to go back to.
OpRecord::Snapshot {
prev_head: None, ..
}
| OpRecord::Goto {
prev_head: None, ..
} => {}
OpRecord::ThreadCreate { name, .. } => {
delete_thread_safely(steps, &ThreadName::new(name.as_str()))?;
remove_thread_manager_record(steps, name)?;
}
OpRecord::ThreadDelete { name, state } => {
steps.set_thread(name.as_str(), *state)?;
}
OpRecord::ThreadUpdate {
name,
old_state,
manager_snapshots,
..
} => {
// If the snapshots say the ref did not exist before the update, undo deletes it;
// otherwise the ref (and the worktree, when this thread is attached) is rewound.
if manager_snapshots
.as_ref()
.is_some_and(|snapshots| snapshots.old_ref_absent)
{
steps.delete_thread(name.as_str())?;
} else {
steps.set_thread(name.as_str(), *old_state)?;
steps.restore_active_thread_worktree(name.as_str(), *old_state)?;
}
if let Some(snapshots) = manager_snapshots.as_ref() {
// Prefer the record-set snapshots when either side has any; fall back to the
// single `old` snapshot otherwise.
if !snapshots.old_records.is_empty() || !snapshots.new_records.is_empty() {
steps.restore_thread_record_set(
name,
&snapshots.old_records,
"ThreadUpdate",
)?;
} else if let Some(bytes) = snapshots.old.as_ref() {
steps.restore_thread_record(name, bytes, "ThreadUpdate")?;
}
}
}
OpRecord::MarkerCreate { name, .. } => {
steps.delete_marker(name.as_str())?;
}
OpRecord::MarkerDelete { name, state } => {
steps.create_marker(name.as_str(), *state)?;
}
// Only collapses that recorded both the thread and its pre-collapse state are undone.
OpRecord::Collapse {
thread: Some(thread),
pre_thread_state: Some(pre_thread_state),
..
} => {
steps.set_thread(thread.as_str(), *pre_thread_state)?;
sync_thread_record_state(steps, thread, *pre_thread_state)?;
}
OpRecord::Collapse { .. } => {}
OpRecord::Redact {
redaction_id,
blob,
state,
path,
} => {
steps.remove_redaction_sidecar(*blob, *state, path.clone(), *redaction_id)?;
}
OpRecord::FastForward {
source_thread,
target_thread,
pre_target_id,
..
} => {
apply_ff_undo(steps, source_thread, target_thread, pre_target_id)?;
}
OpRecord::GitCheckpoint {
branch,
previous_git_oid,
new_git_oid,
..
} => {
apply_git_checkpoint_undo(steps, branch, previous_git_oid.as_deref(), new_git_oid)?;
}
// Undo expects to find `new_sidecar` in place and swaps `prior_sidecar` back in.
OpRecord::StateVisibilitySet {
state,
prior_sidecar,
new_sidecar,
..
}
| OpRecord::StateVisibilityPromote {
state,
prior_sidecar,
new_sidecar,
..
} => {
steps.restore_visibility_sidecar(*state, new_sidecar.clone(), prior_sidecar.clone())?;
}
// Record kinds with no undo action in this function.
OpRecord::Fork { .. }
| OpRecord::Checkpoint { .. }
| OpRecord::TransactionAbort { .. }
| OpRecord::TransactionCommit { .. }
| OpRecord::ConflictResolved { .. }
| OpRecord::EphemeralThreadCollapse { .. }
| OpRecord::Purge { .. }
| OpRecord::RemoteThreadUpdate { .. }
| OpRecord::RemoteThreadDelete { .. }
| OpRecord::UndoRecoveryUpdate { .. } => {}
}
Ok(())
}
/// Undoes a fast-forward of `target_thread` from `source_thread`.
///
/// In order: go to `pre_target_id`, point `target_thread` at it, attach HEAD to
/// `target_thread`, sync the thread's record to that state, and finally mark
/// `source_thread` as unintegrated.
fn apply_ff_undo(
    steps: &mut EntrySteps,
    source_thread: &str,
    target_thread: &str,
    pre_target_id: &ChangeId,
) -> HeddleResult<()> {
    let rewound_to = *pre_target_id;
    steps.goto(rewound_to)?;
    steps.set_thread(target_thread, rewound_to)?;

    let attached = Head::Attached {
        thread: ThreadName::new(target_thread),
    };
    steps.write_head(attached)?;

    sync_thread_record_state(steps, target_thread, rewound_to)?;
    mark_source_thread_unintegrated(steps, source_thread, pre_target_id)
}
/// Re-applies a single op-log `entry` through `steps`.
///
/// Every mutation is issued as a step on `steps`, so the order of calls inside each arm is
/// also the order in which steps are registered. Record kinds listed in the final arm
/// (including `Redact`) and partial `Collapse` records are treated as no-ops here.
///
/// # Errors
/// Propagates the first failing step.
fn apply_redo_entry(steps: &mut EntrySteps, entry: &OpEntry) -> HeddleResult<()> {
match &entry.operation {
// Go forward to the snapshot and, when it was taken on a thread, advance that thread's
// ref, re-attach HEAD, and update its record.
OpRecord::Snapshot {
new_state,
prev_head,
thread,
..
} => {
steps.goto(*new_state)?;
if let Some(thread) = thread {
steps.set_thread(thread.as_str(), *new_state)?;
steps.write_head(Head::Attached {
thread: ThreadName::new(thread.as_str()),
})?;
sync_thread_record_state(steps, thread, *new_state)?;
mark_ready_threads_integrated_for_target(steps, thread, new_state, prev_head)?;
}
}
OpRecord::Goto { target, .. } => {
steps.goto(*target)?;
}
OpRecord::ThreadCreate {
name,
state,
manager_snapshot,
} => {
steps.set_thread(name.as_str(), *state)?;
if let Some(bytes) = manager_snapshot {
steps.restore_thread_record(name, bytes, "ThreadCreate")?;
}
}
OpRecord::ThreadDelete { name, .. } => {
delete_thread_safely(steps, &ThreadName::new(name.as_str()))?;
}
OpRecord::ThreadUpdate {
name,
new_state,
manager_snapshots,
..
} => {
steps.set_thread(name.as_str(), *new_state)?;
steps.restore_active_thread_worktree(name.as_str(), *new_state)?;
if let Some(snapshots) = manager_snapshots.as_ref() {
// Prefer the record-set snapshots when either side has any; fall back to the
// single `new` snapshot otherwise.
if !snapshots.old_records.is_empty() || !snapshots.new_records.is_empty() {
steps.restore_thread_record_set(
name,
&snapshots.new_records,
"ThreadUpdate",
)?;
} else if let Some(bytes) = snapshots.new.as_ref() {
steps.restore_thread_record(name, bytes, "ThreadUpdate")?;
}
}
}
OpRecord::MarkerCreate { name, state } => {
steps.create_marker(name.as_str(), *state)?;
}
OpRecord::MarkerDelete { name, .. } => {
steps.delete_marker(name.as_str())?;
}
// Mirrors the undo arm: only collapses that recorded both the thread and its
// pre-collapse state are replayed.
OpRecord::Collapse {
thread: Some(thread),
result,
pre_thread_state: Some(_),
..
} => {
steps.set_thread(thread.as_str(), *result)?;
sync_thread_record_state(steps, thread, *result)?;
}
OpRecord::Collapse { .. } => {}
OpRecord::FastForward {
source_thread,
target_thread,
post_target_id,
..
} => {
apply_ff_redo(steps, source_thread, target_thread, post_target_id)?;
}
OpRecord::GitCheckpoint {
branch,
previous_git_oid,
new_git_oid,
..
} => {
apply_git_checkpoint_redo(steps, branch, previous_git_oid.as_deref(), new_git_oid)?;
}
// Redo expects to find `prior_sidecar` in place and swaps `new_sidecar` back in.
OpRecord::StateVisibilitySet {
state,
prior_sidecar,
new_sidecar,
..
}
| OpRecord::StateVisibilityPromote {
state,
prior_sidecar,
new_sidecar,
..
} => {
steps.restore_visibility_sidecar(*state, prior_sidecar.clone(), new_sidecar.clone())?;
}
// Record kinds with no redo action in this function.
OpRecord::Fork { .. }
| OpRecord::Checkpoint { .. }
| OpRecord::TransactionAbort { .. }
| OpRecord::TransactionCommit { .. }
| OpRecord::ConflictResolved { .. }
| OpRecord::EphemeralThreadCollapse { .. }
| OpRecord::Redact { .. }
| OpRecord::Purge { .. }
| OpRecord::RemoteThreadUpdate { .. }
| OpRecord::RemoteThreadDelete { .. }
| OpRecord::UndoRecoveryUpdate { .. } => {}
}
Ok(())
}
/// Replays a fast-forward of `target_thread` from `source_thread`.
///
/// In order: go to `post_target_id`, point `target_thread` at it, attach HEAD to
/// `target_thread`, sync the thread's record to that state, and finally mark
/// `source_thread` as integrated.
fn apply_ff_redo(
    steps: &mut EntrySteps,
    source_thread: &str,
    target_thread: &str,
    post_target_id: &ChangeId,
) -> HeddleResult<()> {
    let advanced_to = *post_target_id;
    steps.goto(advanced_to)?;
    steps.set_thread(target_thread, advanced_to)?;

    let attached = Head::Attached {
        thread: ThreadName::new(target_thread),
    };
    steps.write_head(attached)?;

    sync_thread_record_state(steps, target_thread, advanced_to)?;
    mark_source_thread_integrated(steps, source_thread, post_target_id)
}
/// Snapshot of the Git-side state for one branch, taken by `capture_git_state` and put
/// back by `restore_git_state`.
#[derive(Clone)]
struct GitState {
// Raw text of the checkout's `HEAD` file; `None` if it could not be read.
head_file: Option<String>,
// Oid that `refs/heads/<branch>` resolved to in the checkout; `None` when the branch is
// "HEAD" or the ref was absent.
checkout_branch_oid: Option<ObjectId>,
// Oid HEAD resolved to in the checkout, as a string; `None` if HEAD could not be read or
// had no oid.
checkout_head_oid: Option<String>,
// Oid that `refs/heads/<branch>` resolved to in the mirror repository under the Heddle
// directory; `None` when the branch is "HEAD", the mirror is absent, or the ref is absent.
mirror_branch_oid: Option<ObjectId>,
}
/// Captures the Git state that a checkpoint undo/redo on `branch` may change.
///
/// Reading the `HEAD` file and resolving HEAD are best-effort (failures become `None`);
/// opening the repository and resolving the branch ref in the checkout and in the mirror
/// are not.
fn capture_git_state(repo: &Repository, branch: &str) -> HeddleResult<GitState> {
    let git = git_checkout_repo(repo).map_err(apply_error)?;

    let checkout_branch_oid = match branch {
        "HEAD" => None,
        name => ref_target_oid(&git, &format!("refs/heads/{name}")).map_err(apply_error)?,
    };

    let head_path = git.git_dir().join("HEAD");
    let head_file = fs::read_to_string(head_path).ok();

    let checkout_head_oid = match git.head() {
        Ok(head) => head.oid.map(|id| id.to_string()),
        Err(_) => None,
    };

    let mirror_branch_oid = capture_mirror_oid(repo, branch).map_err(apply_error)?;

    Ok(GitState {
        head_file,
        checkout_branch_oid,
        checkout_head_oid,
        mirror_branch_oid,
    })
}
fn restore_git_state(repo: &Repository, branch: &str, state: &GitState) -> HeddleResult<()> {
let git = git_checkout_repo(repo).map_err(apply_error)?;
if branch != "HEAD" {
let ref_name = format!("refs/heads/{branch}");
match state.checkout_branch_oid {
Some(oid) => set_reference(
&git,
&ref_name,
oid,
RefPrecondition::Any,
"heddle: rollback git checkpoint",
)
.map_err(|error| apply_error(anyhow!(error)))?,
None => delete_ref_if_present(&git, &ref_name).map_err(apply_error)?,
}
}
if let Some(head) = &state.head_file {
let head_path = git.git_dir().join("HEAD");
fs::write(&head_path, head)?;
fsync_file_and_parent(&head_path).map_err(apply_error)?;
}
if let Some(oid) = &state.checkout_head_oid {
let oid = parse_git_oid(oid).map_err(apply_error)?;
reset_git_index_to_commit(&git, oid).map_err(apply_error)?;
}
restore_mirror_oid(repo, branch, state.mirror_branch_oid).map_err(apply_error)?;
Ok(())
}
/// Resolves `refs/heads/<branch>` in the mirror repository at `<heddle_dir>/git`.
///
/// Returns `Ok(None)` when `branch` is `"HEAD"`, when the mirror directory does not exist,
/// or when the ref is absent.
fn capture_mirror_oid(repo: &Repository, branch: &str) -> Result<Option<ObjectId>> {
    let mirror = repo.heddle_dir().join("git");
    if branch == "HEAD" || !mirror.exists() {
        return Ok(None);
    }
    let mirror_git = open_git_repo(&mirror)?;
    let ref_name = format!("refs/heads/{branch}");
    ref_target_oid(&mirror_git, &ref_name)
}
fn restore_mirror_oid(repo: &Repository, branch: &str, oid: Option<ObjectId>) -> Result<()> {
if branch == "HEAD" {
return Ok(());
}
let mirror = repo.heddle_dir().join("git");
if !mirror.exists() {
return Ok(());
}
let git = open_git_repo(&mirror)?;
let ref_name = format!("refs/heads/{branch}");
match oid {
Some(oid) => set_reference(
&git,
&ref_name,
oid,
RefPrecondition::Any,
"heddle: rollback mirror checkpoint ref",
)
.map_err(|error| anyhow!(error)),
None => delete_ref_if_present(&git, &ref_name),
}
}
/// Deletes `ref_name` if it currently resolves to an object; otherwise does nothing.
fn delete_ref_if_present(git: &SleyRepository, ref_name: &str) -> Result<()> {
    match ref_target_oid(git, ref_name)? {
        Some(_) => delete_reference_matching(git, ref_name, None),
        None => Ok(()),
    }
}
/// Undoes a Git checkpoint on `branch`, moving Git from `new_git_oid` back to
/// `previous_git_oid` (or removing the branch ref when there was no previous commit).
///
/// Preconditions checked first: Git HEAD is at `new_git_oid` and the Git worktree is clean.
/// One `GitState` snapshot is captured up front and every mutation below is issued as its
/// own `git_restore_snapshot` step that rolls back to that same snapshot.
///
/// # Errors
/// Fails if a precondition does not hold, if an oid does not parse, or if any step fails.
fn apply_git_checkpoint_undo(
steps: &mut EntrySteps,
branch: &str,
previous_git_oid: Option<&str>,
new_git_oid: &str,
) -> HeddleResult<()> {
let repo = steps.repo();
ensure_git_head_is(repo, new_git_oid, "undo git checkpoint").map_err(apply_error)?;
ensure_git_worktree_clean(repo, "undo git checkpoint").map_err(apply_error)?;
let snapshot = capture_git_state(repo, branch)?;
let new_oid = parse_git_oid(new_git_oid).map_err(apply_error)?;
match previous_git_oid {
Some(previous) => {
let previous_oid = parse_git_oid(previous).map_err(apply_error)?;
if branch != "HEAD" {
let git = git_checkout_repo(repo).map_err(apply_error)?;
// Only move the branch ref if it is not already at the previous commit.
if ref_target_oid(&git, &format!("refs/heads/{branch}")).map_err(apply_error)?
!= Some(previous_oid)
{
// HEAD is attached before the ref moves; `set_reference_with_reflog` reads
// HEAD's symbolic target to decide whether to also append to HEAD's reflog.
steps.git_restore_snapshot(repo, branch, &snapshot, || {
attach_git_head_to_branch(&git_checkout_repo(repo)?, branch)
})?;
// Move the ref new -> previous, requiring it to currently be at `new_oid`.
steps.git_restore_snapshot(repo, branch, &snapshot, || {
set_attached_git_head(
&git_checkout_repo(repo)?,
branch,
previous_oid,
new_oid,
"heddle: undo git checkpoint",
)
})?;
}
// Runs in both cases, so HEAD ends up attached even when the ref needed no move.
steps.git_restore_snapshot(repo, branch, &snapshot, || {
attach_git_head_to_branch(&git_checkout_repo(repo)?, branch)
})?;
}
steps.git_restore_snapshot(repo, branch, &snapshot, || {
reset_git_index_to_commit(&git_checkout_repo(repo)?, previous_oid)
})?;
let previous = previous.to_string();
let new_git_oid = new_git_oid.to_string();
// Mirror ref: target = previous, expected current = new.
steps.git_restore_snapshot(repo, branch, &snapshot, || {
update_mirror_branch_ref(repo, branch, Some(&previous), Some(&new_git_oid))
})?;
}
None => {
// No previous commit: the checkpoint created the branch ref, so undo deletes it
// (only if it still points at `new_oid`).
if branch != "HEAD" {
steps.git_restore_snapshot(repo, branch, &snapshot, || {
delete_reference_matching(
&git_checkout_repo(repo)?,
&format!("refs/heads/{branch}"),
Some(new_oid),
)
})?;
}
let new_git_oid = new_git_oid.to_string();
// Mirror ref: no target (delete), expected current = new.
steps.git_restore_snapshot(repo, branch, &snapshot, || {
update_mirror_branch_ref(repo, branch, None, Some(&new_git_oid))
})?;
}
}
Ok(())
}
/// Replays a Git checkpoint on `branch`, moving Git from `previous_git_oid` (if any)
/// forward to `new_git_oid`.
///
/// When a previous oid is recorded, Git HEAD must currently be at it. Unlike the undo
/// path, this function does not itself check that the worktree is clean.
/// One `GitState` snapshot is captured up front and every mutation below is issued as its
/// own `git_restore_snapshot` step that rolls back to that same snapshot.
///
/// # Errors
/// Fails if the HEAD precondition does not hold, if an oid does not parse, or if any step
/// fails.
fn apply_git_checkpoint_redo(
steps: &mut EntrySteps,
branch: &str,
previous_git_oid: Option<&str>,
new_git_oid: &str,
) -> HeddleResult<()> {
let repo = steps.repo();
if let Some(previous) = previous_git_oid {
ensure_git_head_is(repo, previous, "redo git checkpoint").map_err(apply_error)?;
}
let snapshot = capture_git_state(repo, branch)?;
let new_oid = parse_git_oid(new_git_oid).map_err(apply_error)?;
if branch != "HEAD" {
match previous_git_oid {
Some(previous) => {
let previous_oid = parse_git_oid(previous).map_err(apply_error)?;
// Attach HEAD first, then move the ref previous -> new, requiring it to
// currently be at `previous_oid`.
steps.git_restore_snapshot(repo, branch, &snapshot, || {
attach_git_head_to_branch(&git_checkout_repo(repo)?, branch)
})?;
steps.git_restore_snapshot(repo, branch, &snapshot, || {
set_attached_git_head(
&git_checkout_repo(repo)?,
branch,
new_oid,
previous_oid,
"heddle: redo git checkpoint",
)
})?;
}
None => {
// No previous commit recorded: set the branch ref unconditionally, then
// attach HEAD to it.
steps.git_restore_snapshot(repo, branch, &snapshot, || {
set_reference(
&git_checkout_repo(repo)?,
&format!("refs/heads/{branch}"),
new_oid,
RefPrecondition::Any,
"heddle: redo git checkpoint",
)
.map_err(|error| anyhow!(error))
})?;
steps.git_restore_snapshot(repo, branch, &snapshot, || {
attach_git_head_to_branch(&git_checkout_repo(repo)?, branch)
})?;
}
}
}
steps.git_restore_snapshot(repo, branch, &snapshot, || {
reset_git_index_to_commit(&git_checkout_repo(repo)?, new_oid)
})?;
let previous_git_oid = previous_git_oid.map(|previous| previous.to_string());
let new_git_oid = new_git_oid.to_string();
// Mirror ref: target = new, expected current = previous (if one was recorded).
steps.git_restore_snapshot(repo, branch, &snapshot, || {
update_mirror_branch_ref(
repo,
branch,
Some(&new_git_oid),
previous_git_oid.as_deref(),
)
})?;
Ok(())
}
fn update_mirror_branch_ref(
repo: &Repository,
branch: &str,
target_oid: Option<&str>,
expected_old_oid: Option<&str>,
) -> Result<()> {
if branch == "HEAD" {
return Ok(());
}
let mirror = repo.heddle_dir().join("git");
if !mirror.exists() {
return Ok(());
}
let git = open_git_repo(&mirror)?;
let ref_name = format!("refs/heads/{branch}");
if let Some(target) = target_oid
&& ref_target_oid(&git, &ref_name)? == Some(parse_git_oid(target)?)
{
return Ok(());
}
match (target_oid, expected_old_oid) {
(Some(target), Some(expected)) => set_reference(
&git,
&ref_name,
parse_git_oid(target)?,
RefPrecondition::MustExistAndMatch(ReferenceTarget::Direct(parse_git_oid(expected)?)),
"heddle: update mirror checkpoint ref",
)
.map_err(|error| anyhow!(error)),
(Some(target), None) => set_reference(
&git,
&ref_name,
parse_git_oid(target)?,
RefPrecondition::Any,
"heddle: update mirror checkpoint ref",
)
.map_err(|error| anyhow!(error)),
(None, Some(expected)) => {
delete_reference_matching(&git, &ref_name, Some(parse_git_oid(expected)?))
}
(None, None) => delete_reference_matching(&git, &ref_name, None),
}
}
fn ensure_git_head_is(repo: &Repository, expected: &str, action: &str) -> Result<()> {
let actual = current_git_head(repo)?;
if actual == expected {
return Ok(());
}
Err(anyhow!(RecoveryAdvice::git_head_mismatch(
action,
&actual,
expected,
repo.git_overlay_current_branch()?
.unwrap_or_else(|| "HEAD".to_string()),
git_dirty_paths(repo),
)))
}
/// Succeeds when the Git overlay worktree is clean, or when there is no overlay status at
/// all; otherwise returns a `RecoveryAdvice::dirty_worktree` error listing the dirty paths.
///
/// `action` names the operation being guarded. It is also used to pick the trailing hint:
/// callers in this file pass either `"undo git checkpoint"` or `"redo git checkpoint"`,
/// and a redo must not be told that the *undo* batch was not applied.
///
/// # Errors
/// Fails if the worktree status cannot be read or if the worktree is dirty.
fn ensure_git_worktree_clean(repo: &Repository, action: &str) -> Result<()> {
    let Some(status) = repo.git_overlay_worktree_status()? else {
        return Ok(());
    };
    if status.is_clean() {
        return Ok(());
    }
    // Keep the historical wording for every action except redo.
    let not_applied = if action.starts_with("redo") {
        "the Heddle redo batch has not been applied"
    } else {
        "the Heddle undo batch has not been applied"
    };
    Err(anyhow!(RecoveryAdvice::dirty_worktree(
        action,
        git_status_paths(&status),
        not_applied,
    )))
}
/// Best-effort list of dirty Git paths, formatted by `git_status_paths`.
///
/// Returns an empty list when the status cannot be read or there is no overlay status.
fn git_dirty_paths(repo: &Repository) -> Vec<String> {
    match repo.git_overlay_worktree_status() {
        Ok(Some(status)) => git_status_paths(&status),
        _ => Vec::new(),
    }
}
/// Flattens a worktree status into labelled lines: all `modified` paths first, then
/// `added`, then `deleted`, each formatted by `format_status_paths`.
fn git_status_paths(status: &objects::worktree::WorktreeStatus) -> Vec<String> {
    format_status_paths("modified", &status.modified)
        .into_iter()
        .chain(format_status_paths("added", &status.added))
        .chain(format_status_paths("deleted", &status.deleted))
        .collect()
}
/// Renders each path as `"<kind>: <path>"`, preserving input order.
fn format_status_paths(kind: &str, paths: &[PathBuf]) -> Vec<String> {
    let mut labelled = Vec::with_capacity(paths.len());
    for path in paths {
        labelled.push(format!("{kind}: {}", path.display()));
    }
    labelled
}
fn git_checkout_repo(repo: &Repository) -> Result<SleyRepository> {
open_git_repo(repo.root()).map_err(|error| anyhow!(error))
}
fn parse_git_oid(oid: &str) -> Result<ObjectId> {
oid.parse::<ObjectId>()
.map_err(|error| anyhow!("invalid Git object id '{oid}': {error}"))
}
fn ref_target_oid(repo: &SleyRepository, name: &str) -> Result<Option<ObjectId>> {
let Some(reference) = repo
.find_reference(name)
.map_err(|error| anyhow!("failed to inspect Git reference '{name}': {error}"))?
else {
return Ok(None);
};
reference
.peeled_oid(repo)
.map_err(|error| anyhow!("failed to resolve Git reference '{name}': {error}"))
}
/// Rewrites the repository's `HEAD` file to `ref: refs/heads/<branch>` and syncs it
/// (and its parent directory) to disk. Does nothing when `branch` is `"HEAD"`.
fn attach_git_head_to_branch(repo: &SleyRepository, branch: &str) -> Result<()> {
    if branch != "HEAD" {
        let head_path = repo.git_dir().join("HEAD");
        let contents = format!("ref: refs/heads/{branch}\n");
        fs::write(&head_path, contents)
            .map_err(|error| anyhow!("failed to attach Git HEAD to branch '{branch}': {error}"))?;
        fsync_file_and_parent(&head_path)?;
    }
    Ok(())
}
/// Moves the ref behind `branch` to `target`, requiring it to currently be a direct ref
/// at `expected`, and records `log_message` in the reflog.
///
/// The ref updated is `refs/heads/<branch>`, or `HEAD` itself when `branch` is `"HEAD"`.
fn set_attached_git_head(
    repo: &SleyRepository,
    branch: &str,
    target: ObjectId,
    expected: ObjectId,
    log_message: &str,
) -> Result<()> {
    let ref_name = match branch {
        "HEAD" => "HEAD".to_string(),
        other => format!("refs/heads/{other}"),
    };
    let must_match = RefPrecondition::MustExistAndMatch(ReferenceTarget::Direct(expected));
    set_reference_with_reflog(repo, &ref_name, target, must_match, log_message)
        .map_err(|error| anyhow!("failed to update Git HEAD for branch '{branch}': {error}"))
}
/// Replaces the Git index with one built from the tree of commit `oid`.
///
/// The index is written with fsync and checksum validation enabled.
///
/// # Errors
/// Fails if `oid` cannot be read, is not a commit, or if building or writing the index
/// fails.
fn reset_git_index_to_commit(repo: &SleyRepository, oid: ObjectId) -> Result<()> {
    let raw_object = repo
        .read_object(&oid)
        .map_err(|error| anyhow!("failed to inspect Git commit {oid}: {error}"))?;
    if raw_object.object_type != GitObjectType::Commit {
        return Err(anyhow!("failed to inspect Git commit {oid}: not a commit"));
    }

    let commit = repo
        .read_commit(&oid)
        .map_err(|error| anyhow!("failed to inspect Git commit {oid}: {error}"))?;
    let fresh_index = repo
        .index_from_tree(&commit.tree)
        .map_err(|error| anyhow!("failed to build Git index for commit {oid}: {error}"))?;

    let durable = IndexWriteOptions {
        fsync: true,
        validate_checksum: true,
    };
    repo.write_index(&fresh_index, durable)
        .map_err(|error| anyhow!("failed to write Git index for commit {oid}: {error}"))?;
    Ok(())
}
fn delete_reference_matching(
repo: &SleyRepository,
name: &str,
expected: Option<ObjectId>,
) -> Result<()> {
let current = ref_target_oid(repo, name)?;
if current.is_none() {
return Err(anyhow!(
"failed to delete Git reference '{name}': ref is missing"
));
}
if let Some(expected) = expected
&& current != Some(expected)
{
return Err(anyhow!(
"failed to delete Git reference '{name}': expected {expected}, found {}",
current
.map(|oid| oid.to_string())
.unwrap_or_else(|| "missing".to_string())
));
}
let refs = repo.references();
match refs
.read_ref(name)
.map_err(|error| anyhow!("failed to inspect Git reference '{name}': {error}"))?
{
Some(ReferenceTarget::Direct(oid)) => repo
.delete_ref(DeleteRef {
name: FullName::new(name)
.map_err(|error| anyhow!("invalid Git reference '{name}': {error}"))?,
expected_old: Some(expected.unwrap_or(oid)),
expected: None,
reflog: None,
reflog_committer: None,
})
.map_err(|error| anyhow!("failed to delete Git reference '{name}': {error}")),
Some(ReferenceTarget::Symbolic(_)) => refs
.delete_symbolic_ref(name)
.map(|_| ())
.map_err(|error| anyhow!("failed to delete Git reference '{name}': {error}")),
None => Err(anyhow!(
"failed to delete Git reference '{name}': ref is missing"
)),
}
}
/// Builds the fixed "Heddle <heddle@local>" signature stamped with the current time.
///
/// The timestamp is whole seconds since the Unix epoch (0 if the clock reads earlier than
/// the epoch), with a `+0000` offset in the raw form.
fn git_signature() -> Signature {
    let author = "Heddle";
    let address = "heddle@local";
    let seconds = match SystemTime::now().duration_since(UNIX_EPOCH) {
        Ok(elapsed) => elapsed.as_secs() as i64,
        Err(_) => 0,
    };
    let raw = format!("{author} <{address}> {seconds} +0000").into_bytes();
    Signature {
        name: GitByteString::new(author.as_bytes().to_vec()),
        email: GitByteString::new(address.as_bytes().to_vec()),
        time: GitTime::new(seconds, 0),
        raw,
    }
}
/// Builds a reflog entry for a move from `old_oid` to `new_oid`, committed by
/// `git_signature()` with `message` as its text.
fn git_reflog_entry(
    old_oid: ObjectId,
    new_oid: ObjectId,
    message: &str,
) -> sley::plumbing::sley_refs::ReflogEntry {
    let committer = git_signature().to_ident_bytes();
    let message = message.as_bytes().to_vec();
    sley::plumbing::sley_refs::ReflogEntry {
        old_oid,
        new_oid,
        committer,
        message,
    }
}
fn set_reference_with_reflog(
repo: &SleyRepository,
name: &str,
target: ObjectId,
constraint: RefPrecondition,
log_message: &str,
) -> Result<()> {
let refs = repo.references();
let old_oid = match refs
.read_ref(name)
.map_err(|error| anyhow!("failed to inspect Git reference '{name}': {error}"))?
{
Some(ReferenceTarget::Direct(oid)) => oid,
_ => ObjectId::null(repo.object_format()),
};
let reflog = git_reflog_entry(old_oid, target, log_message);
let should_append_head_reflog = name != "HEAD"
&& repo
.head()
.ok()
.and_then(|head| head.symbolic_target.map(|target| target.to_string()))
.as_deref()
== Some(name);
let mut tx = refs.transaction();
tx.update_to(
name.to_string(),
ReferenceTarget::Direct(target),
constraint,
Some(reflog.clone()),
);
tx.commit()
.map_err(|error| anyhow!("failed to update Git reference '{name}': {error}"))?;
if should_append_head_reflog {
refs.append_reflog("HEAD", &reflog)
.map_err(|error| anyhow!("failed to append Git HEAD reflog: {error}"))?;
}
Ok(())
}
/// Flushes `path` and then its containing directory to stable storage.
///
/// The file is opened and `sync_all` is called on it; the same is then done
/// for the parent directory so the directory entry itself is persisted.
///
/// # Errors
/// Returns an error naming the offending path if either the file or the
/// directory cannot be opened or synced.
fn fsync_file_and_parent(path: &Path) -> Result<()> {
    fs::File::open(path)
        .and_then(|file| file.sync_all())
        .map_err(|error| anyhow!("failed to sync '{}': {error}", path.display()))?;
    if let Some(parent) = path.parent() {
        // `Path::parent` yields an empty path for a bare relative file name
        // (e.g. "index"). Opening "" always fails, so sync the current
        // directory, which is the directory that actually holds the file.
        let parent = if parent.as_os_str().is_empty() {
            Path::new(".")
        } else {
            parent
        };
        fs::File::open(parent)
            .and_then(|dir| dir.sync_all())
            .map_err(|error| anyhow!("failed to sync '{}': {error}", parent.display()))?;
    }
    Ok(())
}
/// Deletes the thread `name`, detaching HEAD first when HEAD is attached to it.
///
/// If HEAD is attached to `name`, HEAD is rewritten as detached at the state
/// the thread ref currently points to, and only then is the thread deleted.
///
/// # Errors
/// Returns `HeddleError::Conflict` (carrying the "thread not found" advice)
/// when HEAD is attached to `name` but the thread ref cannot be found, and
/// propagates any error from reading or writing refs.
fn delete_thread_safely(steps: &mut EntrySteps, name: &ThreadName) -> HeddleResult<()> {
    let repo = steps.repo();
    let head_on_thread = matches!(
        repo.head_ref()?,
        Head::Attached { thread } if thread == *name
    );
    if head_on_thread {
        let state = match repo.refs().get_thread(name)? {
            Some(state) => state,
            None => {
                return Err(HeddleError::Conflict(
                    thread_not_found_advice(name.as_str(), "delete thread").to_string(),
                ));
            }
        };
        steps.write_head(Head::Detached { state })?;
    }
    steps.delete_thread(name.as_str())?;
    Ok(())
}
/// Updates the thread-manager record for `thread_name` so its `current_state`
/// is the short form of `state`.
///
/// Does nothing when the manager has no record for that thread. The record's
/// `updated_at` is refreshed to the current UTC time before it is saved.
fn sync_thread_record_state(
    steps: &mut EntrySteps,
    thread_name: &str,
    state: objects::object::ChangeId,
) -> HeddleResult<()> {
    let manager = ThreadManager::new(steps.repo().heddle_dir());
    let Some(mut record) = manager.find_by_thread(thread_name)? else {
        return Ok(());
    };
    record.current_state = Some(state.short());
    record.updated_at = chrono::Utc::now();
    steps.save_thread_record(record)?;
    Ok(())
}
/// Reverts the thread-manager record of `source_thread` from "integrated" once
/// an undo has moved its target to `target_after_undo`.
///
/// Nothing happens when the manager has no record for the thread, or when the
/// thread's tip is still reported by `change_contains` as contained in
/// `target_after_undo`. Otherwise the record is rewritten:
/// - a `Merged` state becomes `Ready`;
/// - `current_state` follows the thread ref when one exists;
/// - `merged_state` is cleared;
/// - an `auto_integrated` policy result is reset to its default;
/// - freshness is recomputed, with `Unknown` coerced to `Current`.
fn mark_source_thread_unintegrated(
    steps: &mut EntrySteps,
    source_thread: &str,
    target_after_undo: &ChangeId,
) -> HeddleResult<()> {
    let repo = steps.repo();
    let manager = ThreadManager::new(repo.heddle_dir());
    let mut record = match manager.find_by_thread(source_thread)? {
        Some(record) => record,
        None => return Ok(()),
    };

    let source_tip = repo.refs().get_thread(&ThreadName::new(source_thread))?;
    let still_integrated = match &source_tip {
        Some(tip) => change_contains(repo, tip, target_after_undo),
        None => false,
    };
    if still_integrated {
        return Ok(());
    }

    if record.state == ThreadState::Merged {
        record.state = ThreadState::Ready;
    }
    if let Some(tip) = source_tip {
        record.current_state = Some(tip.short());
    }
    record.merged_state = None;

    let was_auto_integrated =
        record.integration_policy_result.status.as_deref() == Some("auto_integrated");
    if was_auto_integrated {
        record.integration_policy_result = ThreadIntegrationPolicy::default();
    }

    refresh_thread_freshness(repo, &mut record)?;
    if matches!(record.freshness, ThreadFreshness::Unknown) {
        record.freshness = ThreadFreshness::Current;
    }
    record.updated_at = chrono::Utc::now();
    steps.save_thread_record(record)?;
    Ok(())
}
/// Un-integrates every merged thread that targets `target_thread` and whose
/// recorded state resolves to `integrated_state`.
///
/// A record is considered when it is not the target thread itself, names
/// `target_thread` as its target, and is in the `Merged` state. Its
/// `merged_state` (falling back to `current_state`) is resolved through the
/// repository; resolution failures are treated as "no match". Matching
/// threads are handed to `mark_source_thread_unintegrated`.
fn mark_merged_threads_unintegrated_for_target(
    steps: &mut EntrySteps,
    target_thread: &str,
    integrated_state: &ChangeId,
    target_after_undo: &ChangeId,
) -> HeddleResult<()> {
    let repo = steps.repo();
    let manager = ThreadManager::new(repo.heddle_dir());
    for record in manager.list()? {
        let is_merged_source = record.thread != target_thread
            && record.target_thread.as_deref() == Some(target_thread)
            && record.state == ThreadState::Merged;
        if !is_merged_source {
            continue;
        }

        let recorded_state = record
            .merged_state
            .as_deref()
            .or(record.current_state.as_deref());
        let resolved = recorded_state.and_then(|state| repo.resolve_state(state).ok().flatten());
        if resolved.is_some_and(|state| state == *integrated_state) {
            mark_source_thread_unintegrated(steps, &record.thread, target_after_undo)?;
        }
    }
    Ok(())
}
/// Marks the thread-manager record of `source_thread` as merged after a redo
/// restored its target to `target_after_redo`.
///
/// Nothing happens when the manager has no record for the thread, when the
/// thread ref is missing, or when `change_contains` does not report the
/// thread's tip as contained in `target_after_redo`. Otherwise the record
/// becomes `Merged`, with `merged_state` set to the target, `current_state`
/// set to the thread tip, an `auto_integrated` policy status and freshness
/// `Current`.
fn mark_source_thread_integrated(
    steps: &mut EntrySteps,
    source_thread: &str,
    target_after_redo: &ChangeId,
) -> HeddleResult<()> {
    let repo = steps.repo();
    let manager = ThreadManager::new(repo.heddle_dir());
    let mut record = match manager.find_by_thread(source_thread)? {
        Some(record) => record,
        None => return Ok(()),
    };

    // Without a thread ref there is no tip that could have been integrated.
    let Some(source_tip) = repo.refs().get_thread(&ThreadName::new(source_thread))? else {
        return Ok(());
    };
    if !change_contains(repo, &source_tip, target_after_redo) {
        return Ok(());
    }

    record.state = ThreadState::Merged;
    record.merged_state = Some(target_after_redo.short());
    record.current_state = Some(source_tip.short());

    // Only the status and reason are replaced; the manual-resolution fields of
    // the existing policy result are kept as they are.
    record.integration_policy_result.status = Some("auto_integrated".to_string());
    record.integration_policy_result.reason =
        Some("redo restored integrated target state".to_string());

    record.freshness = ThreadFreshness::Current;
    record.updated_at = chrono::Utc::now();
    steps.save_thread_record(record)?;
    Ok(())
}
/// Re-marks as integrated every `Ready` thread that targets `target_thread`
/// and became contained in the target only through this redo.
///
/// A thread qualifies when it is not the target itself, names `target_thread`
/// as its target, is `Ready`, has a thread ref, and `change_contains` reports
/// its tip as contained in `integrated_state` but not in `target_before_redo`
/// (when a pre-redo target exists). Qualifying threads are passed to
/// `mark_source_thread_integrated`.
fn mark_ready_threads_integrated_for_target(
    steps: &mut EntrySteps,
    target_thread: &str,
    integrated_state: &ChangeId,
    target_before_redo: &Option<ChangeId>,
) -> HeddleResult<()> {
    let repo = steps.repo();
    let manager = ThreadManager::new(repo.heddle_dir());
    for record in manager.list()? {
        let is_ready_source = record.thread != target_thread
            && record.target_thread.as_deref() == Some(target_thread)
            && record.state == ThreadState::Ready;
        if !is_ready_source {
            continue;
        }
        let Some(source_tip) = repo.refs().get_thread(&ThreadName::new(&record.thread))? else {
            continue;
        };
        if !change_contains(repo, &source_tip, integrated_state) {
            continue;
        }
        // Already contained before the redo: this redo did not integrate it.
        let contained_before = match target_before_redo {
            Some(before) => change_contains(repo, &source_tip, before),
            None => false,
        };
        if contained_before {
            continue;
        }
        mark_source_thread_integrated(steps, &record.thread, integrated_state)?;
    }
    Ok(())
}
/// Reports whether the commit graph considers `ancestor` an ancestor of
/// `descendant`.
///
/// A failed graph lookup is treated as "not an ancestor" and yields `false`.
fn change_contains(repo: &Repository, ancestor: &ChangeId, descendant: &ChangeId) -> bool {
    let mut graph = CommitGraphIndex::new(repo);
    let lookup = graph.is_ancestor(ancestor, descendant);
    lookup.unwrap_or_default()
}
/// Converges the thread-manager records for `thread_name` onto an empty set.
fn remove_thread_manager_record(steps: &mut EntrySteps, thread_name: &str) -> HeddleResult<()> {
    let no_records = Vec::new();
    steps.converge_thread_records(thread_name, no_records)
}
/// Converts an `anyhow` error into `HeddleError::Conflict`, keeping the
/// alternate (`{:#}`) rendering of the error as the conflict message.
fn apply_error(err: anyhow::Error) -> HeddleError {
    let rendered = format!("{err:#}");
    HeddleError::Conflict(rendered)
}
/// Builds the conflict error reported when a visibility undo/redo on `state`
/// finds that a concurrent change superseded the sidecar.
///
/// The message embeds the full string form of `state`.
fn visibility_superseded_conflict(state: &ChangeId) -> HeddleError {
    let full_id = state.to_string_full();
    let message = format!(
        "cannot undo/redo visibility on state {}: a concurrent `visibility set`/`promote` \
         superseded the sidecar. The newer record is preserved; re-run undo/redo after \
         refreshing.",
        full_id
    );
    HeddleError::Conflict(message)
}
/// Takes the write lock at `locks/undo-redo.lock` under the repository's
/// Heddle directory and returns its guard.
///
/// # Errors
/// Returns an error describing the failure when the write lock cannot be
/// acquired.
pub(super) fn acquire_undo_redo_lock(repo: &Repository) -> Result<WriteLockGuard> {
    let lock_path = repo.heddle_dir().join("locks/undo-redo.lock");
    match RepoLock::at(lock_path).write() {
        Ok(guard) => Ok(guard),
        Err(e) => Err(anyhow!(
            "failed to acquire undo/redo serialization lock: {e}"
        )),
    }
}
/// Formats the transaction id for an undo or redo run.
///
/// The result has the shape `{action}:{scope}:gen{generation}:[id1,id2,...]`,
/// where the bracketed list holds the ids of `batches` in order, separated by
/// commas (empty brackets when there are no batches).
pub(super) fn undo_redo_transaction_id(
    action: &str,
    scope: &str,
    generation: u64,
    batches: &[OpBatch],
) -> String {
    let mut joined = String::new();
    for (index, batch) in batches.iter().enumerate() {
        if index > 0 {
            joined.push(',');
        }
        joined.push_str(&batch.id.to_string());
    }
    format!("{action}:{scope}:gen{generation}:[{joined}]")
}
/// Mutation that stores a state in the repository's undo-recovery ref.
struct StageUndoRecovery {
    /// State to record; `None` makes `apply` a no-op.
    head: Option<ChangeId>,
}

impl AtomicMutation for StageUndoRecovery {
    type Output = ();

    /// Fixed id used for this mutation.
    fn transaction_id(&self) -> String {
        String::from("undo:stage-recovery")
    }

    /// Declares the local HEAD of the repository's op scope as the only key.
    fn isolation_keys(&self, repo: &Repository) -> HeddleResult<BTreeSet<IsolationKey>> {
        let local_head = IsolationKey::LocalHead {
            scope: repo.op_scope(),
        };
        Ok(BTreeSet::from([local_head]))
    }

    /// Registers one step that sets the undo-recovery ref to `head`.
    ///
    /// The second closure restores the previously stored value, or clears the
    /// ref when there was none (presumably run by the transaction on rollback).
    fn apply(&mut self, tx: &mut Tx<'_>) -> HeddleResult<StagedCommit<()>> {
        let state = match self.head {
            Some(state) => state,
            None => return Ok(StagedCommit::pure(())),
        };
        let repo = tx.repo();
        let previous = repo.refs().get_undo_recovery()?;
        tx.step(
            || repo.refs().set_undo_recovery(&state),
            move || match previous {
                Some(previous) => repo.refs().set_undo_recovery(&previous),
                None => repo.refs().clear_undo_recovery(),
            },
        )?;
        Ok(StagedCommit::pure(()))
    }
}

impl DeferredMutation for StageUndoRecovery {}
/// Mutation that undoes a single op-log batch.
struct ApplyUndoBatch {
    /// Batch whose entries are undone.
    batch: OpBatch,
    /// Test-only fault injection: fail once this many entries were undone.
    #[cfg(test)]
    fail_after_entries: Option<usize>,
}

impl ApplyUndoBatch {
    /// Creates the mutation with no fault injection.
    fn new(batch: OpBatch) -> Self {
        Self {
            batch,
            #[cfg(test)]
            fail_after_entries: None,
        }
    }

    /// Creates a mutation that fails right after `entries` entries were undone.
    #[cfg(test)]
    fn failing_after(batch: OpBatch, entries: usize) -> Self {
        let mut mutation = Self::new(batch);
        mutation.fail_after_entries = Some(entries);
        mutation
    }
}

impl AtomicMutation for ApplyUndoBatch {
    type Output = OpBatch;

    /// Id of the form `undo:batch:{batch id}`.
    fn transaction_id(&self) -> String {
        let id = &self.batch.id;
        format!("undo:batch:{id}")
    }

    /// Keys derived from the batch's records plus the local HEAD of the op scope.
    fn isolation_keys(&self, repo: &Repository) -> HeddleResult<BTreeSet<IsolationKey>> {
        let scope = repo.op_scope();
        let only_batch = std::slice::from_ref(&self.batch);
        Ok(isolation_keys_for_batches(only_batch, &scope))
    }

    /// Undoes the batch's entries newest-first, then marks the batch undone
    /// and returns the updated batch.
    fn apply(&mut self, tx: &mut Tx<'_>) -> HeddleResult<StagedCommit<OpBatch>> {
        let mut steps = EntrySteps::new(tx);
        for (index, entry) in self.batch.entries.iter().rev().enumerate() {
            apply_undo_entry(&mut steps, entry)?;
            let applied = index + 1;
            #[cfg(test)]
            if self.fail_after_entries == Some(applied) {
                return Err(HeddleError::Conflict("injected mid-undo fault".to_string()));
            }
            // Outside tests the counter is otherwise unused.
            #[cfg(not(test))]
            let _ = applied;
        }
        let updated = steps.mark_batch_undone(&self.batch)?;
        Ok(StagedCommit::pure(updated))
    }
}

impl DeferredMutation for ApplyUndoBatch {}
/// Mutation that redoes a single op-log batch.
struct ApplyRedoBatch {
    /// Batch whose entries are re-applied.
    batch: OpBatch,
    /// Test-only fault injection: fail once this many entries were redone.
    #[cfg(test)]
    fail_after_entries: Option<usize>,
}

impl ApplyRedoBatch {
    /// Creates the mutation with no fault injection.
    fn new(batch: OpBatch) -> Self {
        Self {
            batch,
            #[cfg(test)]
            fail_after_entries: None,
        }
    }

    /// Creates a mutation that fails right after `entries` entries were redone.
    #[cfg(test)]
    fn failing_after(batch: OpBatch, entries: usize) -> Self {
        let mut mutation = Self::new(batch);
        mutation.fail_after_entries = Some(entries);
        mutation
    }
}

impl AtomicMutation for ApplyRedoBatch {
    type Output = OpBatch;

    /// Id of the form `redo:batch:{batch id}`.
    fn transaction_id(&self) -> String {
        let id = &self.batch.id;
        format!("redo:batch:{id}")
    }

    /// Keys derived from the batch's records plus the local HEAD of the op scope.
    fn isolation_keys(&self, repo: &Repository) -> HeddleResult<BTreeSet<IsolationKey>> {
        let scope = repo.op_scope();
        let only_batch = std::slice::from_ref(&self.batch);
        Ok(isolation_keys_for_batches(only_batch, &scope))
    }

    /// Re-applies the batch's entries in recorded order, then marks the batch
    /// redone and returns the updated batch.
    fn apply(&mut self, tx: &mut Tx<'_>) -> HeddleResult<StagedCommit<OpBatch>> {
        let mut steps = EntrySteps::new(tx);
        for (index, entry) in self.batch.entries.iter().enumerate() {
            apply_redo_entry(&mut steps, entry)?;
            let applied = index + 1;
            #[cfg(test)]
            if self.fail_after_entries == Some(applied) {
                return Err(HeddleError::Conflict("injected mid-redo fault".to_string()));
            }
            // Outside tests the counter is otherwise unused.
            #[cfg(not(test))]
            let _ = applied;
        }
        let updated = steps.mark_batch_redone(&self.batch)?;
        Ok(StagedCommit::pure(updated))
    }
}

impl DeferredMutation for ApplyRedoBatch {}
/// Collects the isolation keys touched by `batches`.
///
/// The set is the union of `isolation_keys_for_record` over every entry of
/// every batch (using each entry's own scope), plus a `LocalHead` key for
/// `scope`, which is always present even when `batches` is empty.
fn isolation_keys_for_batches(batches: &[OpBatch], scope: &str) -> BTreeSet<IsolationKey> {
    let mut keys: BTreeSet<IsolationKey> = batches
        .iter()
        .flat_map(|batch| batch.entries.iter())
        .flat_map(|entry| isolation_keys_for_record(&entry.operation, entry.scope.as_deref()))
        .collect();
    let local_head = IsolationKey::LocalHead {
        scope: scope.to_string(),
    };
    keys.insert(local_head);
    keys
}
/// Top-level undo mutation: stages the recovery pointer, then undoes each
/// batch in the given order.
pub(super) struct UndoOp {
    /// Batches to undo, in the order they are applied.
    batches: Vec<OpBatch>,
    /// State handed to `StageUndoRecovery` before any batch is undone.
    recovery_head: Option<ChangeId>,
    /// Id reported by `transaction_id`.
    transaction_id: String,
}

impl UndoOp {
    /// Bundles the batches, recovery head and transaction id into an undo mutation.
    pub(super) fn new(
        batches: Vec<OpBatch>,
        recovery_head: Option<ChangeId>,
        transaction_id: String,
    ) -> Self {
        Self {
            batches,
            recovery_head,
            transaction_id,
        }
    }
}

impl AtomicMutation for UndoOp {
    type Output = Vec<OpBatch>;

    /// Returns the id supplied at construction.
    fn transaction_id(&self) -> String {
        self.transaction_id.clone()
    }

    /// Keys for all batches plus the local HEAD of the repository's op scope.
    fn isolation_keys(&self, repo: &Repository) -> HeddleResult<BTreeSet<IsolationKey>> {
        let scope = repo.op_scope();
        Ok(isolation_keys_for_batches(&self.batches, &scope))
    }

    /// Enrolls the recovery-pointer mutation, then one `ApplyUndoBatch` per
    /// batch, and returns the updated batches in the same order.
    fn apply(&mut self, tx: &mut Tx<'_>) -> HeddleResult<StagedCommit<Vec<OpBatch>>> {
        let recovery = StageUndoRecovery {
            head: self.recovery_head,
        };
        tx.enroll(recovery)?;

        let mut updated = Vec::with_capacity(self.batches.len());
        for batch in self.batches.iter().cloned() {
            let mutation = ApplyUndoBatch::new(batch);
            updated.push(tx.enroll(mutation)?.output);
        }
        Ok(StagedCommit::pure(updated))
    }
}
/// Top-level redo mutation: re-applies each batch in the given order.
pub(super) struct RedoOp {
    /// Batches to redo, in the order they are applied.
    batches: Vec<OpBatch>,
    /// Id reported by `transaction_id`.
    transaction_id: String,
}

impl RedoOp {
    /// Bundles the batches and transaction id into a redo mutation.
    pub(super) fn new(batches: Vec<OpBatch>, transaction_id: String) -> Self {
        Self {
            batches,
            transaction_id,
        }
    }
}

impl AtomicMutation for RedoOp {
    type Output = Vec<OpBatch>;

    /// Returns the id supplied at construction.
    fn transaction_id(&self) -> String {
        self.transaction_id.clone()
    }

    /// Keys for all batches plus the local HEAD of the repository's op scope.
    fn isolation_keys(&self, repo: &Repository) -> HeddleResult<BTreeSet<IsolationKey>> {
        let scope = repo.op_scope();
        Ok(isolation_keys_for_batches(&self.batches, &scope))
    }

    /// Enrolls one `ApplyRedoBatch` per batch and returns the updated batches
    /// in the same order.
    fn apply(&mut self, tx: &mut Tx<'_>) -> HeddleResult<StagedCommit<Vec<OpBatch>>> {
        let mut updated = Vec::with_capacity(self.batches.len());
        for batch in self.batches.iter().cloned() {
            let mutation = ApplyRedoBatch::new(batch);
            updated.push(tx.enroll(mutation)?.output);
        }
        Ok(StagedCommit::pure(updated))
    }
}
#[cfg(test)]
mod atomic_tests {
use oplog::ThreadUpdateSnapshots;
use tempfile::TempDir;
use super::*;
/// Creates a fresh repository in a temp dir holding two snapshots: `s1` after
/// writing `a.txt`, then `s2` after writing `b.txt`.
///
/// Returns the temp dir (which must be kept alive), the repository, and the
/// change ids of `s1` and `s2` in that order.
fn repo_with_two_snapshots() -> (TempDir, Repository, ChangeId, ChangeId) {
    let temp = TempDir::new().unwrap();
    let repo = Repository::init_default(temp.path()).unwrap();

    let write_and_snapshot = |file: &str, contents: &str, label: &str| {
        std::fs::write(temp.path().join(file), contents).unwrap();
        repo.snapshot(Some(label.to_string()), None).unwrap().change_id
    };
    let first = write_and_snapshot("a.txt", "a", "s1");
    let second = write_and_snapshot("b.txt", "b", "s2");

    (temp, repo, first, second)
}
/// `apply_error` must turn an `anyhow` error into `HeddleError::Conflict`
/// whose message still contains the original text.
#[test]
fn apply_error_wraps_anyhow_into_conflict() {
let wrapped = apply_error(anyhow!("boom"));
assert!(
matches!(&wrapped, HeddleError::Conflict(message) if message.contains("boom")),
"an apply-helper error must surface as a HeddleError::Conflict carrying the message"
);
}
/// Counts the `TransactionCommit` records among the 256 most recent op-log
/// entries.
fn commit_marker_count(repo: &Repository) -> usize {
    let oplog = repo.oplog();
    let entries = oplog.recent(256).unwrap();
    let mut markers = 0;
    for entry in entries.iter() {
        if matches!(entry.operation, OpRecord::TransactionCommit { .. }) {
            markers += 1;
        }
    }
    markers
}
/// Counts the `TransactionCommit` records for transaction `txid` among the
/// 256 most recent op-log entries.
fn commit_marker_count_for(repo: &Repository, txid: &str) -> usize {
    let oplog = repo.oplog();
    let entries = oplog.recent(256).unwrap();
    let mut markers = 0;
    for entry in entries.iter() {
        let is_match = matches!(
            &entry.operation,
            OpRecord::TransactionCommit { transaction_id, .. } if transaction_id == txid
        );
        if is_match {
            markers += 1;
        }
    }
    markers
}
/// Reads the state the `main` thread ref points to, if the ref exists.
fn main_thread(repo: &Repository) -> Option<ChangeId> {
    let main = ThreadName::new("main");
    repo.refs().get_thread(&main).unwrap()
}
/// Test mutation mirroring `UndoOp`, except that the last batch is undone
/// with a fault injected after `fail_after` entries.
struct FaultyUndo {
    /// Batches to undo; must be non-empty.
    batches: Vec<OpBatch>,
    /// State staged in the undo-recovery ref before the batches run.
    recovery_head: Option<ChangeId>,
    /// Number of entries of the last batch to undo before failing.
    fail_after: usize,
}

impl AtomicMutation for FaultyUndo {
    type Output = ();

    fn transaction_id(&self) -> String {
        String::from("test-undo-fault")
    }

    fn isolation_keys(&self, repo: &Repository) -> HeddleResult<BTreeSet<IsolationKey>> {
        let scope = repo.op_scope();
        Ok(isolation_keys_for_batches(&self.batches, &scope))
    }

    fn apply(&mut self, tx: &mut Tx<'_>) -> HeddleResult<StagedCommit<()>> {
        let recovery = StageUndoRecovery {
            head: self.recovery_head,
        };
        tx.enroll(recovery)?;

        let last = self.batches.len() - 1;
        for (position, batch) in self.batches.iter().enumerate() {
            // Only the final batch carries the injected fault.
            let mutation = if position == last {
                ApplyUndoBatch::failing_after(batch.clone(), self.fail_after)
            } else {
                ApplyUndoBatch::new(batch.clone())
            };
            tx.enroll(mutation)?;
        }
        Ok(StagedCommit::pure(()))
    }
}
/// Test mutation mirroring `RedoOp`, except that the last batch is redone
/// with a fault injected after `fail_after` entries.
struct FaultyRedo {
    /// Batches to redo; must be non-empty.
    batches: Vec<OpBatch>,
    /// Number of entries of the last batch to redo before failing.
    fail_after: usize,
}

impl AtomicMutation for FaultyRedo {
    type Output = ();

    fn transaction_id(&self) -> String {
        String::from("test-redo-fault")
    }

    fn isolation_keys(&self, repo: &Repository) -> HeddleResult<BTreeSet<IsolationKey>> {
        let scope = repo.op_scope();
        Ok(isolation_keys_for_batches(&self.batches, &scope))
    }

    fn apply(&mut self, tx: &mut Tx<'_>) -> HeddleResult<StagedCommit<()>> {
        let last = self.batches.len() - 1;
        for (position, batch) in self.batches.iter().enumerate() {
            // Only the final batch carries the injected fault.
            let mutation = if position == last {
                ApplyRedoBatch::failing_after(batch.clone(), self.fail_after)
            } else {
                ApplyRedoBatch::new(batch.clone())
            };
            tx.enroll(mutation)?;
        }
        Ok(StagedCommit::pure(()))
    }
}
/// A successful one-batch `UndoOp` moves HEAD and `main` back to `s1`,
/// removes `b.txt` from the worktree, stores the pre-undo tip (`s2`) in the
/// undo-recovery ref, and leaves exactly one commit marker for its
/// transaction id.
#[test]
fn atomic_undo_success_reverts_and_records_recovery() {
let (temp, repo, s1, s2) = repo_with_two_snapshots();
let scope = repo.op_scope();
let recovery_head = repo.head().unwrap();
let batches = repo.oplog().undo_batches_scoped(1, Some(&scope)).unwrap();
let generation = repo.oplog().head_id().unwrap();
let txid = undo_redo_transaction_id("undo", &scope, generation, &batches);
let updated =
repo::atomic::execute(&repo, UndoOp::new(batches, recovery_head, txid.clone()))
.unwrap();
assert_eq!(updated.len(), 1);
assert!(updated[0].entries.iter().all(|e| e.undone));
assert_eq!(repo.head().unwrap(), Some(s1), "HEAD reverted to s1");
assert_eq!(main_thread(&repo), Some(s1));
assert!(temp.path().join("a.txt").exists(), "s1 file kept");
assert!(!temp.path().join("b.txt").exists(), "s2 file reverted");
assert_eq!(
repo.refs().get_undo_recovery().unwrap(),
Some(s2),
"recovery pointer pins the pre-undo tip"
);
assert_eq!(
commit_marker_count_for(&repo, &txid),
1,
"exactly one undo commit marker"
);
}
/// A fault injected after the first entry of the last of two undo batches
/// must fail the transaction and leave HEAD, `main`, the worktree files, the
/// set of undoable batches, the undo-recovery ref and the commit-marker count
/// exactly as they were before the undo started.
#[test]
fn fault_mid_undo_rewinds_to_pre_operation_state() {
let (temp, repo, _s1, s2) = repo_with_two_snapshots();
let scope = repo.op_scope();
// Capture the pre-undo state to compare against after the failure.
let pre_head = repo.head().unwrap();
assert_eq!(pre_head, Some(s2));
let pre_main = main_thread(&repo);
assert_eq!(repo.refs().get_undo_recovery().unwrap(), None);
let pre_markers = commit_marker_count(&repo);
let batches = repo.oplog().undo_batches_scoped(2, Some(&scope)).unwrap();
assert_eq!(batches.len(), 2, "two snapshots are undoable");
let result = repo::atomic::execute(
&repo,
FaultyUndo {
batches,
recovery_head: pre_head,
fail_after: 1,
},
);
assert!(result.is_err(), "the injected fault must fail the undo");
assert_eq!(
repo.head().unwrap(),
Some(s2),
"HEAD rewound to pre-undo tip"
);
assert_eq!(main_thread(&repo), pre_main, "main ref rewound");
assert!(temp.path().join("a.txt").exists(), "s1 file restored");
assert!(temp.path().join("b.txt").exists(), "s2 file restored");
assert_eq!(
repo.oplog()
.undo_batches_scoped(2, Some(&scope))
.unwrap()
.len(),
2,
"no batch left marked undone"
);
assert_eq!(
repo.refs().get_undo_recovery().unwrap(),
None,
"recovery pointer cleared by rewind (it had no prior value)"
);
assert_eq!(
commit_marker_count(&repo),
pre_markers,
"a failed transaction commits no marker"
);
}
/// After a successful undo of a single snapshot, a redo with a fault injected
/// after the first entry must fail and leave HEAD, `main`, the worktree, the
/// redoable batch and the commit-marker count as they were before the redo.
#[test]
fn fault_mid_redo_rewinds_to_pre_operation_state() {
let temp = TempDir::new().unwrap();
let repo = Repository::init_default(temp.path()).unwrap();
std::fs::write(temp.path().join("a.txt"), "a").unwrap();
let _s1 = repo.snapshot(Some("s1".to_string()), None).unwrap();
let scope = repo.op_scope();
let recovery_head = repo.head().unwrap();
// Undo the only snapshot first so that there is something to redo.
let undo_batches = repo.oplog().undo_batches_scoped(1, Some(&scope)).unwrap();
let generation = repo.oplog().head_id().unwrap();
let txid = undo_redo_transaction_id("undo", &scope, generation, &undo_batches);
repo::atomic::execute(&repo, UndoOp::new(undo_batches, recovery_head, txid)).unwrap();
assert!(!temp.path().join("a.txt").exists(), "undone: a.txt gone");
let pre_redo_head = repo.head().unwrap();
let pre_redo_main = main_thread(&repo);
assert_eq!(
repo.oplog()
.redo_batches_scoped(1, Some(&scope))
.unwrap()
.len(),
1,
"one batch is redoable"
);
let pre_markers = commit_marker_count(&repo);
let redo_batches = repo.oplog().redo_batches_scoped(1, Some(&scope)).unwrap();
let result = repo::atomic::execute(
&repo,
FaultyRedo {
batches: redo_batches,
fail_after: 1,
},
);
assert!(result.is_err(), "the injected fault must fail the redo");
assert_eq!(repo.head().unwrap(), pre_redo_head, "HEAD rewound");
assert_eq!(main_thread(&repo), pre_redo_main, "main ref rewound");
assert!(
!temp.path().join("a.txt").exists(),
"s1 file not resurrected"
);
assert_eq!(
repo.oplog()
.redo_batches_scoped(1, Some(&scope))
.unwrap()
.len(),
1,
"batch still redoable"
);
assert_eq!(
commit_marker_count(&repo),
pre_markers,
"a failed transaction commits no marker"
);
}
/// An undo of the newest snapshot that fails under `with_entry_write_fault(1, ..)`
/// must be rolled back completely: HEAD, `main`, both worktree files, the
/// undoable batch, the undo-recovery ref and the commit-marker count all
/// match the pre-undo state.
///
/// NOTE(review): `with_entry_write_fault` is defined outside this view; per
/// the assertion message below, an argument of `1` fails the second write.
#[test]
fn per_effect_rollback_threaded_snapshot_undo() {
let (temp, repo, _s1, s2) = repo_with_two_snapshots();
let scope = repo.op_scope();
let pre_head = repo.head().unwrap();
assert_eq!(pre_head, Some(s2));
let pre_main = main_thread(&repo);
let pre_markers = commit_marker_count(&repo);
assert_eq!(repo.refs().get_undo_recovery().unwrap(), None);
let batches = repo.oplog().undo_batches_scoped(1, Some(&scope)).unwrap();
let generation = repo.oplog().head_id().unwrap();
let txid = undo_redo_transaction_id("undo", &scope, generation, &batches);
let result = with_entry_write_fault(1, || {
repo::atomic::execute(&repo, UndoOp::new(batches, pre_head, txid))
});
assert!(
result.is_err(),
"the injected 2nd-write fault must fail the undo"
);
assert_eq!(
repo.head().unwrap(),
Some(s2),
"HEAD goto rolled back to the pre-undo tip"
);
assert_eq!(main_thread(&repo), pre_main, "main ref unchanged");
assert!(temp.path().join("a.txt").exists(), "s1 file present");
assert!(
temp.path().join("b.txt").exists(),
"s2 file restored by the goto rollback (the per-effect inverse ran)"
);
assert_eq!(
repo.oplog()
.undo_batches_scoped(1, Some(&scope))
.unwrap()
.len(),
1,
"no batch left marked undone"
);
assert_eq!(
repo.refs().get_undo_recovery().unwrap(),
None,
"recovery pointer cleared by rewind"
);
assert_eq!(
commit_marker_count(&repo),
pre_markers,
"no marker committed"
);
}
/// After undoing `s2`, a redo that fails under `with_entry_write_fault(1, ..)`
/// must be rolled back completely: HEAD and `main` stay at the undone state,
/// `b.txt` is not resurrected, the batch remains redoable and no commit
/// marker is added.
///
/// NOTE(review): `with_entry_write_fault` is defined outside this view; per
/// the assertion message below, an argument of `1` fails the second write.
#[test]
fn per_effect_rollback_threaded_snapshot_redo() {
let (temp, repo, s1, _s2) = repo_with_two_snapshots();
let scope = repo.op_scope();
let recovery_head = repo.head().unwrap();
// Undo `s2` successfully so that it becomes redoable.
let undo_batches = repo.oplog().undo_batches_scoped(1, Some(&scope)).unwrap();
let generation = repo.oplog().head_id().unwrap();
let txid = undo_redo_transaction_id("undo", &scope, generation, &undo_batches);
repo::atomic::execute(&repo, UndoOp::new(undo_batches, recovery_head, txid)).unwrap();
assert_eq!(repo.head().unwrap(), Some(s1), "undone to s1");
assert!(!temp.path().join("b.txt").exists(), "b.txt gone after undo");
let pre_redo_head = repo.head().unwrap();
let pre_redo_main = main_thread(&repo);
let pre_markers = commit_marker_count(&repo);
let redo_batches = repo.oplog().redo_batches_scoped(1, Some(&scope)).unwrap();
let generation = repo.oplog().head_id().unwrap();
let txid = undo_redo_transaction_id("redo", &scope, generation, &redo_batches);
let result = with_entry_write_fault(1, || {
repo::atomic::execute(&repo, RedoOp::new(redo_batches, txid))
});
assert!(
result.is_err(),
"the injected 2nd-write fault must fail the redo"
);
assert_eq!(
repo.head().unwrap(),
pre_redo_head,
"HEAD goto rolled back to the pre-redo (fully-undone) state"
);
assert_eq!(main_thread(&repo), pre_redo_main, "main ref unchanged");
assert!(temp.path().join("a.txt").exists(), "s1 file present");
assert!(
!temp.path().join("b.txt").exists(),
"s2 file NOT resurrected — the goto's per-effect inverse rolled it back"
);
assert_eq!(
repo.oplog()
.redo_batches_scoped(1, Some(&scope))
.unwrap()
.len(),
1,
"batch still redoable"
);
assert_eq!(
commit_marker_count(&repo),
pre_markers,
"no marker committed"
);
}
/// Undoing a hand-recorded batch (`ThreadUpdate`, `MarkerCreate` "mc",
/// `MarkerDelete` "md") under `with_entry_write_fault(2, ..)` must fail and
/// leave the markers as before: `mc` still points at `s1` and `md` does not
/// exist.
///
/// NOTE(review): `with_entry_write_fault` is defined outside this view; which
/// write an argument of `2` fails is not visible here.
#[test]
fn per_effect_rollback_restores_marker_writes() {
let (_temp, repo, s1, _s2) = repo_with_two_snapshots();
let scope = repo.op_scope();
// Only `mc` exists on disk; `md` is recorded as deleted and is absent.
repo.refs()
.create_marker(&MarkerName::new("mc"), &s1)
.unwrap();
let main_state = main_thread(&repo).unwrap();
repo.oplog()
.record_batch_scoped(
vec![
OpRecord::ThreadUpdate {
name: "main".to_string(),
old_state: main_state,
new_state: main_state,
manager_snapshots: None,
},
OpRecord::MarkerCreate {
name: "mc".to_string(),
state: s1,
},
OpRecord::MarkerDelete {
name: "md".to_string(),
state: s1,
},
],
Some(&scope),
)
.unwrap();
let batches = repo.oplog().undo_batches_scoped(1, Some(&scope)).unwrap();
let recovery_head = repo.head().unwrap();
let generation = repo.oplog().head_id().unwrap();
let txid = undo_redo_transaction_id("undo", &scope, generation, &batches);
let result = with_entry_write_fault(2, || {
repo::atomic::execute(&repo, UndoOp::new(batches, recovery_head, txid))
});
assert!(result.is_err(), "the injected fault must fail the undo");
assert_eq!(
repo.refs().get_marker(&MarkerName::new("mc")).unwrap(),
Some(s1),
"mc restored by the delete_marker inverse"
);
assert_eq!(
repo.refs().get_marker(&MarkerName::new("md")).unwrap(),
None,
"md removed again by the create_marker inverse"
);
}
/// Undoing a hand-recorded batch (`ThreadUpdate`, `ThreadCreate` "old",
/// `ThreadDelete` "new") under `with_entry_write_fault(2, ..)` must fail and
/// leave the thread refs as before: `old` still points at `s1` and `new` does
/// not exist.
///
/// NOTE(review): `with_entry_write_fault` is defined outside this view; which
/// write an argument of `2` fails is not visible here.
#[test]
fn per_effect_rollback_restores_thread_ref_writes() {
let (_temp, repo, s1, _s2) = repo_with_two_snapshots();
let scope = repo.op_scope();
// Only `old` exists on disk; `new` is recorded as deleted and is absent.
repo.refs()
.set_thread(&ThreadName::new("old"), &s1)
.unwrap();
let main_state = main_thread(&repo).unwrap();
repo.oplog()
.record_batch_scoped(
vec![
OpRecord::ThreadUpdate {
name: "main".to_string(),
old_state: main_state,
new_state: main_state,
manager_snapshots: None,
},
OpRecord::ThreadCreate {
name: "old".to_string(),
state: s1,
manager_snapshot: None,
},
OpRecord::ThreadDelete {
name: "new".to_string(),
state: s1,
},
],
Some(&scope),
)
.unwrap();
let batches = repo.oplog().undo_batches_scoped(1, Some(&scope)).unwrap();
let recovery_head = repo.head().unwrap();
let generation = repo.oplog().head_id().unwrap();
let txid = undo_redo_transaction_id("undo", &scope, generation, &batches);
let result = with_entry_write_fault(2, || {
repo::atomic::execute(&repo, UndoOp::new(batches, recovery_head, txid))
});
assert!(result.is_err(), "the injected fault must fail the undo");
assert_eq!(
repo.refs().get_thread(&ThreadName::new("old")).unwrap(),
Some(s1),
"old restored by the delete_thread inverse"
);
assert_eq!(
repo.refs().get_thread(&ThreadName::new("new")).unwrap(),
None,
"new removed again by the set_thread inverse"
);
}
/// Undo followed by redo returns HEAD to `s2` and restores `b.txt`. After the
/// undo, asking for two undoable batches yields only one (the `s1` snapshot),
/// i.e. the undo's own commit marker is not offered for undo.
#[test]
fn atomic_undo_redo_round_trip_ignores_commit_markers() {
let (temp, repo, s1, s2) = repo_with_two_snapshots();
let scope = repo.op_scope();
let recovery_head = repo.head().unwrap();
let undo_batches = repo.oplog().undo_batches_scoped(1, Some(&scope)).unwrap();
let generation = repo.oplog().head_id().unwrap();
let txid = undo_redo_transaction_id("undo", &scope, generation, &undo_batches);
repo::atomic::execute(&repo, UndoOp::new(undo_batches, recovery_head, txid)).unwrap();
assert_eq!(repo.head().unwrap(), Some(s1));
let still_undoable = repo.oplog().undo_batches_scoped(2, Some(&scope)).unwrap();
assert_eq!(
still_undoable.len(),
1,
"only the s1 snapshot remains undoable; the commit marker is excluded"
);
let redo_batches = repo.oplog().redo_batches_scoped(1, Some(&scope)).unwrap();
let generation = repo.oplog().head_id().unwrap();
let txid = undo_redo_transaction_id("redo", &scope, generation, &redo_batches);
repo::atomic::execute(&repo, RedoOp::new(redo_batches, txid)).unwrap();
assert_eq!(repo.head().unwrap(), Some(s2), "redo restored the s2 tip");
assert!(
temp.path().join("b.txt").exists(),
"s2 file restored by redo"
);
}
/// While the guard from `acquire_undo_redo_lock` is held, `try_write` on the
/// same lock file must return `None`; once the guard is dropped, `try_write`
/// must succeed again.
#[test]
fn undo_redo_lock_is_exclusive() {
let (_temp, repo, _s1, _s2) = repo_with_two_snapshots();
let lock_path = repo.heddle_dir().join("locks/undo-redo.lock");
let guard = acquire_undo_redo_lock(&repo).unwrap();
let contended = RepoLock::at(lock_path.clone()).try_write().unwrap();
assert!(
contended.is_none(),
"a second writer must be blocked while the lock is held"
);
drop(guard);
let reacquired = RepoLock::at(lock_path).try_write().unwrap();
assert!(
reacquired.is_some(),
"the lock is acquirable again after the holder releases it"
);
}
/// Two undos performed one after the other under the undo/redo lock must not
/// pick the same batch: the batch ids selected after the first undo completed
/// differ from the ids the first undo consumed.
#[test]
fn serialized_second_undo_selects_a_different_batch() {
let (_temp, repo, s1, _s2) = repo_with_two_snapshots();
let scope = repo.op_scope();
// First undo: select and execute while holding the lock; the lock is
// released at the end of this block.
let first_ids: Vec<u64> = {
let _lock = acquire_undo_redo_lock(&repo).unwrap();
let batches = repo.oplog().undo_batches_scoped(1, Some(&scope)).unwrap();
let ids = batches.iter().map(|b| b.id).collect();
let recovery = repo.head().unwrap();
let generation = repo.oplog().head_id().unwrap();
let txid = undo_redo_transaction_id("undo", &scope, generation, &batches);
repo::atomic::execute(&repo, UndoOp::new(batches, recovery, txid)).unwrap();
ids
};
assert_eq!(repo.head().unwrap(), Some(s1), "first undo reverted to s1");
// Second undo: only the selection is exercised, again under the lock.
let _lock = acquire_undo_redo_lock(&repo).unwrap();
let second = repo.oplog().undo_batches_scoped(1, Some(&scope)).unwrap();
let second_ids: Vec<u64> = second.iter().map(|b| b.id).collect();
assert!(!second_ids.is_empty(), "a preceding op is still undoable");
assert_ne!(
second_ids, first_ids,
"the serialized second undo must not re-select the batch the first already undid"
);
}
/// After an undo, the newest raw batch is the undo's transaction marker,
/// while `recent_user_batches_scoped(1, ..)` skips it and returns exactly one
/// real batch containing a `Snapshot` record.
#[test]
fn list_depth_one_returns_preceding_user_op_past_commit_marker() {
let (_temp, repo, _s1, _s2) = repo_with_two_snapshots();
let scope = repo.op_scope();
let recovery_head = repo.head().unwrap();
let undo_batches = repo.oplog().undo_batches_scoped(1, Some(&scope)).unwrap();
let generation = repo.oplog().head_id().unwrap();
let txid = undo_redo_transaction_id("undo", &scope, generation, &undo_batches);
repo::atomic::execute(&repo, UndoOp::new(undo_batches, recovery_head, txid)).unwrap();
// Raw listing: the marker-only batch is the newest one.
let raw = repo.oplog().recent_batches_scoped(1, Some(&scope)).unwrap();
assert_eq!(raw.len(), 1);
assert!(
raw[0].is_transaction_marker_only(),
"the newest batch is the undo's commit marker"
);
// User-facing listing: the marker is skipped.
let user = repo
.oplog()
.recent_user_batches_scoped(1, Some(&scope))
.unwrap();
assert_eq!(
user.len(),
1,
"depth 1 returns exactly one user-facing batch"
);
assert!(
!user[0].is_transaction_marker_only(),
"the returned batch is a real op, not the marker sentinel"
);
assert!(
user[0]
.entries
.iter()
.any(|e| matches!(e.operation, OpRecord::Snapshot { .. })),
"it is the preceding s1 snapshot"
);
}
/// Undoing a recorded `MarkerDelete` while a marker of the same name already
/// exists must fail, and the pre-existing marker must still point at `s1`
/// afterwards.
#[test]
fn undo_marker_delete_forward_failure_keeps_preexisting_marker() {
let (_temp, repo, s1, _s2) = repo_with_two_snapshots();
let scope = repo.op_scope();
let marker = MarkerName::new("keep");
repo.oplog()
.record_batch_scoped(
vec![OpRecord::MarkerDelete {
name: "keep".to_string(),
state: s1,
}],
Some(&scope),
)
.unwrap();
// Create the marker after recording its deletion, so that the undo's
// re-creation collides with it.
repo.refs().create_marker(&marker, &s1).unwrap();
let batches = repo.oplog().undo_batches_scoped(1, Some(&scope)).unwrap();
assert!(
matches!(
batches[0].entries[0].operation,
OpRecord::MarkerDelete { .. }
),
"the newest undoable batch is the MarkerDelete"
);
let recovery_head = repo.head().unwrap();
let generation = repo.oplog().head_id().unwrap();
let txid = undo_redo_transaction_id("undo", &scope, generation, &batches);
let result = repo::atomic::execute(&repo, UndoOp::new(batches, recovery_head, txid));
assert!(
result.is_err(),
"the colliding create_marker must fail the undo"
);
assert_eq!(
repo.refs().get_marker(&marker).unwrap(),
Some(s1),
"the pre-existing marker survives the rolled-back undo (no delete inverse ran)"
);
}
/// Redoing a recorded `MarkerCreate` while a marker of the same name already
/// exists must fail, and the pre-existing marker must still point at `s1`
/// afterwards.
#[test]
fn redo_marker_create_forward_failure_keeps_preexisting_marker() {
let (_temp, repo, s1, _s2) = repo_with_two_snapshots();
let scope = repo.op_scope();
let marker = MarkerName::new("keep");
repo.oplog()
.record_batch_scoped(
vec![OpRecord::MarkerCreate {
name: "keep".to_string(),
state: s1,
}],
Some(&scope),
)
.unwrap();
// Flag the batch as undone directly in the op log (no refs are touched),
// then create the marker so that the redo's creation collides with it.
let created = repo.oplog().undo_batches_scoped(1, Some(&scope)).unwrap();
repo.oplog().mark_batch_undone(&created[0]).unwrap();
repo.refs().create_marker(&marker, &s1).unwrap();
let redo_batches = repo.oplog().redo_batches_scoped(1, Some(&scope)).unwrap();
assert!(
matches!(
redo_batches[0].entries[0].operation,
OpRecord::MarkerCreate { .. }
),
"the redoable batch is the MarkerCreate"
);
let generation = repo.oplog().head_id().unwrap();
let txid = undo_redo_transaction_id("redo", &scope, generation, &redo_batches);
let result = repo::atomic::execute(&repo, RedoOp::new(redo_batches, txid));
assert!(
result.is_err(),
"the colliding create_marker must fail the redo"
);
assert_eq!(
repo.refs().get_marker(&marker).unwrap(),
Some(s1),
"the pre-existing marker survives the rolled-back redo (no delete inverse ran)"
);
}
/// An undo that fails under `with_nonatomic_forward_fault(0, ..)` must leave
/// HEAD, `main`, both worktree files, the undo-recovery ref and the
/// commit-marker count in their pre-undo state.
///
/// NOTE(review): `with_nonatomic_forward_fault` is defined outside this view;
/// per the assertion messages below it simulates a partially-applied goto.
#[test]
fn step_nonatomic_rolls_back_partially_applied_goto() {
let (temp, repo, _s1, s2) = repo_with_two_snapshots();
let scope = repo.op_scope();
let pre_head = repo.head().unwrap();
assert_eq!(pre_head, Some(s2));
let pre_main = main_thread(&repo);
let pre_markers = commit_marker_count(&repo);
assert_eq!(repo.refs().get_undo_recovery().unwrap(), None);
let batches = repo.oplog().undo_batches_scoped(1, Some(&scope)).unwrap();
let generation = repo.oplog().head_id().unwrap();
let txid = undo_redo_transaction_id("undo", &scope, generation, &batches);
let result = with_nonatomic_forward_fault(0, || {
repo::atomic::execute(&repo, UndoOp::new(batches, pre_head, txid))
});
assert!(
result.is_err(),
"the injected partial-goto fault must fail the undo"
);
assert_eq!(
repo.head().unwrap(),
Some(s2),
"HEAD restored to the pre-undo tip after a partially-applied goto"
);
assert_eq!(main_thread(&repo), pre_main, "main ref unchanged");
assert!(temp.path().join("a.txt").exists(), "s1 file present");
assert!(
temp.path().join("b.txt").exists(),
"s2 worktree material restored by the goto's restore-before inverse"
);
assert_eq!(
repo.refs().get_undo_recovery().unwrap(),
None,
"recovery pointer cleared by rewind"
);
assert_eq!(
commit_marker_count(&repo),
pre_markers,
"no marker committed"
);
}
/// Builds a fixture `Thread` record for the thread named `main`.
///
/// `current_state` becomes the record's `current_state` and `materialized`
/// its `materialized_path`; every other field is a fixed placeholder or the
/// type's default. `created_at` and `updated_at` are sampled from the clock
/// separately, in that order.
fn sample_main_thread(current_state: &str, materialized: &str) -> Thread {
    let created_at = chrono::Utc::now();
    let updated_at = chrono::Utc::now();
    Thread {
        id: String::from("thread-main"),
        thread: String::from("main"),
        target_thread: None,
        parent_thread: None,
        mode: repo::ThreadMode::Solid,
        state: ThreadState::Active,
        base_state: String::from("base"),
        base_root: String::from("root"),
        current_state: Some(String::from(current_state)),
        merged_state: None,
        task: None,
        execution_path: PathBuf::from("/work/exec"),
        materialized_path: Some(PathBuf::from(materialized)),
        changed_paths: Vec::new(),
        impact_categories: Vec::new(),
        heavy_impact_paths: Vec::new(),
        promotion_suggested: false,
        freshness: ThreadFreshness::Current,
        verification_summary: repo::ThreadVerificationSummary::default(),
        confidence_summary: repo::ThreadConfidenceSummary::default(),
        integration_policy_result: ThreadIntegrationPolicy::default(),
        created_at,
        updated_at,
        ephemeral: None,
        auto: false,
        shared_target_dir: None,
    }
}
/// Encodes each record in `records` with
/// `ThreadManager::encode_thread_record_snapshot`, preserving input order.
///
/// Panics if any record fails to encode.
fn encode_thread_record_set(manager: &ThreadManager, records: &[Thread]) -> Vec<Vec<u8>> {
    let mut encoded = Vec::with_capacity(records.len());
    for record in records {
        let bytes = manager.encode_thread_record_snapshot(record).unwrap();
        encoded.push(bytes);
    }
    encoded
}
/// Undoes the newest undoable batch in `scope`, panicking on any failure.
///
/// The current HEAD is passed to `UndoOp::new` as its second argument, and the
/// transaction id is derived from the scope, the oplog head id and the batches.
fn apply_undo_once(repo: &Repository, scope: &str) {
    let undo_batches = repo.oplog().undo_batches_scoped(1, Some(scope)).unwrap();
    let head_before = repo.head().unwrap();
    let oplog_generation = repo.oplog().head_id().unwrap();
    let transaction_id = undo_redo_transaction_id("undo", scope, oplog_generation, &undo_batches);
    let undo = UndoOp::new(undo_batches, head_before, transaction_id);
    repo::atomic::execute(repo, undo).unwrap();
}
/// Redoes the newest redoable batch in `scope`, panicking on any failure.
///
/// The transaction id is derived from the scope, the oplog head id and the
/// batches being redone.
fn apply_redo_once(repo: &Repository, scope: &str) {
    let redo_batches = repo.oplog().redo_batches_scoped(1, Some(scope)).unwrap();
    let oplog_generation = repo.oplog().head_id().unwrap();
    let transaction_id = undo_redo_transaction_id("redo", scope, oplog_generation, &redo_batches);
    let redo = RedoOp::new(redo_batches, transaction_id);
    repo::atomic::execute(repo, redo).unwrap();
}
/// Undo/redo of a `ThreadUpdate` recorded for a thread whose ref is absent.
///
/// The thread ref is deleted before the batch is recorded, and the snapshots
/// are built with the final `from_record_sets` flag set to `true`. After undo
/// the ref must still be absent and the manager record must carry the old
/// state; after redo the ref must point at the new state and the manager
/// record must carry the new state.
#[test]
fn thread_update_undo_preserves_missing_ref_fallback_absence() {
    const THREAD: &str = "missing-ref";

    let (_temp, repo, from_state, to_state) = repo_with_two_snapshots();
    let op_scope = repo.op_scope();
    let manager = ThreadManager::new(repo.heddle_dir());
    let from_short = from_state.short();
    let to_short = to_state.short();

    // Manager record as it looked before the update...
    let before = {
        let mut record = sample_main_thread(&from_short, "/work/missing-ref");
        record.id = THREAD.to_string();
        record.thread = THREAD.to_string();
        record.base_state = from_short.clone();
        record.current_state = Some(from_short.clone());
        record
    };
    // ...and after it: only the state and the timestamp move forward.
    let after = {
        let mut record = before.clone();
        record.current_state = Some(to_short.clone());
        record.updated_at = before.updated_at + chrono::Duration::seconds(1);
        record
    };

    manager.save(&after).unwrap();
    let thread_name = ThreadName::new(THREAD);
    repo.refs().delete_thread(&thread_name).unwrap();

    let old_snapshot = manager.encode_thread_record_snapshot(&before).unwrap();
    let new_snapshot = manager.encode_thread_record_snapshot(&after).unwrap();
    let old_set = encode_thread_record_set(&manager, std::slice::from_ref(&before));
    let new_set = encode_thread_record_set(&manager, std::slice::from_ref(&after));
    let update = OpRecord::ThreadUpdate {
        name: THREAD.to_string(),
        old_state: from_state,
        new_state: to_state,
        manager_snapshots: ThreadUpdateSnapshots::from_record_sets(
            Some(old_snapshot),
            Some(new_snapshot),
            old_set,
            new_set,
            true,
        ),
    };
    repo.oplog()
        .record_batch_scoped(vec![update], Some(&op_scope))
        .unwrap();

    apply_undo_once(&repo, &op_scope);
    let ref_after_undo = repo.refs().get_thread(&thread_name).unwrap();
    assert_eq!(
        ref_after_undo, None,
        "undo restores the pre-update absence instead of fabricating a ref"
    );
    let record_after_undo = manager.find_by_thread(THREAD).unwrap().unwrap();
    assert_eq!(
        record_after_undo.current_state.as_deref(),
        Some(from_short.as_str()),
        "undo restores the old ThreadManager record"
    );

    apply_redo_once(&repo, &op_scope);
    let ref_after_redo = repo.refs().get_thread(&thread_name).unwrap();
    assert_eq!(
        ref_after_redo,
        Some(to_state),
        "redo recreates the post-update thread ref"
    );
    let record_after_redo = manager.find_by_thread(THREAD).unwrap().unwrap();
    assert_eq!(
        record_after_redo.current_state.as_deref(),
        Some(to_short.as_str()),
        "redo restores the new ThreadManager record"
    );
}
/// Undo/redo of a `ThreadUpdate` when two manager records share the thread
/// name `main`.
///
/// A "winner" record changes state across the update while an older
/// "duplicate" record stays untouched. Both undo and redo must keep both
/// record ids filed under `main`, restore the winner's matching body, and
/// leave the duplicate's `materialized_path` as it was.
#[test]
fn thread_update_undo_redo_restores_duplicate_same_name_record_sets() {
    let (_temp, repo, from_state, to_state) = repo_with_two_snapshots();
    let op_scope = repo.op_scope();
    let manager = ThreadManager::new(repo.heddle_dir());
    let from_short = from_state.short();
    let to_short = to_state.short();

    let winner_before = {
        let mut record = sample_main_thread(&from_short, "/work/winner-old");
        record.id = "rec-winner".to_string();
        record.updated_at = chrono::Utc::now();
        record
    };
    let duplicate = {
        let mut record = sample_main_thread(&from_short, "/work/duplicate");
        record.id = "rec-duplicate".to_string();
        record.updated_at = winner_before.updated_at - chrono::Duration::seconds(30);
        record
    };
    let winner_after = {
        let mut record = winner_before.clone();
        record.current_state = Some(to_short.clone());
        record.materialized_path = Some(PathBuf::from("/work/winner-new"));
        record.updated_at = winner_before.updated_at + chrono::Duration::seconds(30);
        record
    };

    // On-disk state corresponds to the post-update side of the batch.
    manager.save(&winner_after).unwrap();
    manager.save(&duplicate).unwrap();
    repo.refs()
        .set_thread(&ThreadName::new("main"), &to_state)
        .unwrap();

    let records_before = [winner_before.clone(), duplicate.clone()];
    let records_after = [winner_after.clone(), duplicate.clone()];
    let winner_before_bytes = manager
        .encode_thread_record_snapshot(&winner_before)
        .unwrap();
    let winner_after_bytes = manager
        .encode_thread_record_snapshot(&winner_after)
        .unwrap();
    let set_before = encode_thread_record_set(&manager, &records_before);
    let set_after = encode_thread_record_set(&manager, &records_after);
    let update = OpRecord::ThreadUpdate {
        name: "main".to_string(),
        old_state: from_state,
        new_state: to_state,
        manager_snapshots: ThreadUpdateSnapshots::from_record_sets(
            Some(winner_before_bytes),
            Some(winner_after_bytes),
            set_before,
            set_after,
            false,
        ),
    };
    repo.oplog()
        .record_batch_scoped(vec![update], Some(&op_scope))
        .unwrap();

    let expected_ids = std::collections::HashSet::from(["rec-winner", "rec-duplicate"]);
    let duplicate_path = Some(PathBuf::from("/work/duplicate"));

    apply_undo_once(&repo, &op_scope);
    let after_undo = manager.snapshot_records("main").unwrap();
    let ids_after_undo: std::collections::HashSet<&str> =
        after_undo.iter().map(|rec| rec.id.as_str()).collect();
    assert_eq!(
        ids_after_undo, expected_ids,
        "undo preserves every same-name record"
    );
    let winner_undone = manager.load("rec-winner").unwrap().unwrap();
    assert_eq!(
        winner_undone.current_state.as_deref(),
        Some(from_short.as_str()),
        "undo restores the winner's old body"
    );
    let duplicate_undone = manager.load("rec-duplicate").unwrap().unwrap();
    assert_eq!(
        duplicate_undone.materialized_path, duplicate_path,
        "undo keeps the non-winner duplicate worktree metadata"
    );

    apply_redo_once(&repo, &op_scope);
    let after_redo = manager.snapshot_records("main").unwrap();
    let ids_after_redo: std::collections::HashSet<&str> =
        after_redo.iter().map(|rec| rec.id.as_str()).collect();
    assert_eq!(
        ids_after_redo, expected_ids,
        "redo preserves every same-name record"
    );
    let winner_redone = manager.load("rec-winner").unwrap().unwrap();
    assert_eq!(
        winner_redone.current_state.as_deref(),
        Some(to_short.as_str()),
        "redo restores the winner's new body"
    );
    let duplicate_redone = manager.load("rec-duplicate").unwrap().unwrap();
    assert_eq!(
        duplicate_redone.materialized_path, duplicate_path,
        "redo keeps the non-winner duplicate worktree metadata"
    );
}
/// Test-only mutation whose single step is `EntrySteps::save_thread_record`
/// on a clone of `record`.
struct SaveOnly {
    /// The thread record to save when the mutation is applied.
    record: Thread,
}

impl AtomicMutation for SaveOnly {
    type Output = ();

    /// Fixed transaction id used by every `SaveOnly` instance.
    fn transaction_id(&self) -> String {
        String::from("test-save-only")
    }

    /// Declares a single isolation key: the thread named by the record.
    fn isolation_keys(&self, _repo: &Repository) -> HeddleResult<BTreeSet<IsolationKey>> {
        let thread_key = IsolationKey::Thread(self.record.thread.clone());
        Ok(BTreeSet::from([thread_key]))
    }

    /// Saves the record through `EntrySteps` and stages a unit commit.
    fn apply(&mut self, tx: &mut Tx<'_>) -> HeddleResult<StagedCommit<()>> {
        let mut entry_steps = EntrySteps::new(tx);
        let to_save = self.record.clone();
        entry_steps.save_thread_record(to_save)?;
        Ok(StagedCommit::pure(()))
    }
}

impl DeferredMutation for SaveOnly {}
/// A `SaveOnly` that fails under an injected forward fault must leave the
/// previously saved record in place.
///
/// The replacement differs from the saved record in `current_state` and
/// `materialized_path`; both must read back with their original values.
#[test]
fn step_nonatomic_restores_record_and_workspace_on_save_failure() {
    let workdir = TempDir::new().unwrap();
    let repo = Repository::init_default(workdir.path()).unwrap();
    let manager = ThreadManager::new(repo.heddle_dir());

    let baseline = sample_main_thread("current-A", "/work/A");
    manager.save(&baseline).unwrap();

    // Same id as the baseline, but with a different state and workspace path.
    let replacement = Thread {
        current_state: Some("current-B".to_string()),
        materialized_path: Some(PathBuf::from("/work/B")),
        ..baseline.clone()
    };

    let outcome = with_nonatomic_forward_fault(0, || {
        let op = SaveOnly {
            record: replacement,
        };
        repo::atomic::execute(&repo, op)
    });
    assert!(outcome.is_err(), "the injected save fault must fail the op");

    let on_disk = manager.find_by_thread("main").unwrap().unwrap();
    assert_eq!(
        on_disk.current_state.as_deref(),
        Some("current-A"),
        "the record half (current_state) was restored to R0"
    );
    let expected_path = Some(PathBuf::from("/work/A"));
    assert_eq!(
        on_disk.materialized_path, expected_path,
        "the workspace half (materialized_path) was restored to R0"
    );
}
/// A failed `SaveOnly` whose record carries a *new* id for an existing thread
/// must not leave that new record behind.
///
/// After the injected fault, the new id must not load, the manager must list
/// exactly one record, and `find_by_thread("main")` must return the prior
/// record with its original state.
#[test]
fn step_nonatomic_restores_replacement_save_deleting_leaked_new_record() {
    let workdir = TempDir::new().unwrap();
    let repo = Repository::init_default(workdir.path()).unwrap();
    let manager = ThreadManager::new(repo.heddle_dir());

    let prior = {
        let mut record = sample_main_thread("current-A", "/work/A");
        record.id = "thread-main-v1".to_string();
        record.updated_at = chrono::Utc::now();
        record
    };
    manager.save(&prior).unwrap();

    // Newer record for the same thread under a different id.
    let replacement = Thread {
        id: "thread-main-v2".to_string(),
        current_state: Some("current-B".to_string()),
        updated_at: prior.updated_at + chrono::Duration::seconds(60),
        ..prior.clone()
    };

    let outcome = with_nonatomic_forward_fault(0, || {
        let op = SaveOnly {
            record: replacement,
        };
        repo::atomic::execute(&repo, op)
    });
    assert!(outcome.is_err(), "the injected save fault must fail the op");

    let leaked = manager.load("thread-main-v2").unwrap();
    assert!(
        leaked.is_none(),
        "the leaked new_id record must be deleted on rollback"
    );
    let filed = manager.list().unwrap();
    assert_eq!(
        filed.len(),
        1,
        "only the prior record survives for the thread, no leaked newer record"
    );
    let survivor = manager.find_by_thread("main").unwrap().unwrap();
    assert_eq!(
        survivor.id, "thread-main-v1",
        "find_by_thread returns ONLY prev"
    );
    assert_eq!(survivor.current_state.as_deref(), Some("current-A"));
}
/// A failed `SaveOnly` for a thread with no prior record must leave no record
/// behind: neither the new id nor any record under `main` may remain.
#[test]
fn step_nonatomic_create_save_rollback_removes_created_record() {
    let workdir = TempDir::new().unwrap();
    let repo = Repository::init_default(workdir.path()).unwrap();
    let manager = ThreadManager::new(repo.heddle_dir());

    // Nothing is saved beforehand, so the mutation acts as a create.
    let fresh = {
        let mut record = sample_main_thread("current-A", "/work/A");
        record.id = "thread-main-new".to_string();
        record
    };

    let outcome = with_nonatomic_forward_fault(0, || {
        let op = SaveOnly { record: fresh };
        repo::atomic::execute(&repo, op)
    });
    assert!(outcome.is_err(), "the injected save fault must fail the op");

    let by_id = manager.load("thread-main-new").unwrap();
    assert!(
        by_id.is_none(),
        "the created record must be removed on rollback"
    );
    let by_name = manager.find_by_thread("main").unwrap();
    assert!(
        by_name.is_none(),
        "no record survives for a rolled-back create save"
    );
}
/// Test-only mutation whose single step is `EntrySteps::restore_thread_record`
/// for the thread `name`, using the encoded snapshot in `bytes`.
struct RestoreSnapshotOnly {
    /// Thread name the snapshot is restored under.
    name: String,
    /// Encoded thread-record snapshot handed to the restore step.
    bytes: Vec<u8>,
}

impl AtomicMutation for RestoreSnapshotOnly {
    type Output = ();

    /// Fixed transaction id used by every `RestoreSnapshotOnly` instance.
    fn transaction_id(&self) -> String {
        String::from("test-restore-snapshot-only")
    }

    /// Declares a single isolation key: the thread being restored.
    fn isolation_keys(&self, _repo: &Repository) -> HeddleResult<BTreeSet<IsolationKey>> {
        let thread_key = IsolationKey::Thread(self.name.clone());
        Ok(BTreeSet::from([thread_key]))
    }

    /// Restores the snapshot through `EntrySteps`, labelled "ThreadCreate",
    /// and stages a unit commit.
    fn apply(&mut self, tx: &mut Tx<'_>) -> HeddleResult<StagedCommit<()>> {
        let mut entry_steps = EntrySteps::new(tx);
        entry_steps.restore_thread_record(&self.name, &self.bytes, "ThreadCreate")?;
        Ok(StagedCommit::pure(()))
    }
}

impl DeferredMutation for RestoreSnapshotOnly {}
/// A `RestoreSnapshotOnly` that fails under an injected forward fault must not
/// leave a record behind under the snapshot's id.
///
/// Setup: a prior record (`thread-main-v1`) is saved, a newer record
/// (`thread-main-v2`) is saved just long enough to take a snapshot of `main`,
/// and is then deleted so only the prior record is filed when the restore
/// runs.
#[test]
fn step_nonatomic_restores_redo_snapshot_deleting_leaked_record() {
    let workdir = TempDir::new().unwrap();
    let repo = Repository::init_default(workdir.path()).unwrap();
    let manager = ThreadManager::new(repo.heddle_dir());

    let prior = {
        let mut record = sample_main_thread("current-A", "/work/A");
        record.id = "thread-main-v1".to_string();
        record.updated_at = chrono::Utc::now();
        record
    };
    manager.save(&prior).unwrap();

    let newer = {
        let mut record = prior.clone();
        record.id = "thread-main-v2".to_string();
        record.current_state = Some("current-B".to_string());
        record.updated_at = prior.updated_at + chrono::Duration::seconds(60);
        record
    };
    manager.save(&newer).unwrap();
    let snapshot_bytes = manager.snapshot_thread_record("main").unwrap().unwrap();
    manager.delete("thread-main-v2").unwrap();

    let filed_before = manager.list().unwrap();
    assert_eq!(
        filed_before.len(),
        1,
        "precondition: only the prior record exists at capture time"
    );

    let outcome = with_nonatomic_forward_fault(0, || {
        let op = RestoreSnapshotOnly {
            name: "main".to_string(),
            bytes: snapshot_bytes,
        };
        repo::atomic::execute(&repo, op)
    });
    assert!(
        outcome.is_err(),
        "the injected restore fault must fail the op"
    );

    let leaked = manager.load("thread-main-v2").unwrap();
    assert!(
        leaked.is_none(),
        "the leaked snapshot-id record must be deleted on rollback"
    );
    let filed_after = manager.list().unwrap();
    assert_eq!(
        filed_after.len(),
        1,
        "only the prior record survives, no leaked newer record"
    );
    let survivor = manager.find_by_thread("main").unwrap().unwrap();
    assert_eq!(
        survivor.id, "thread-main-v1",
        "find_by_thread returns ONLY prev"
    );
    assert_eq!(survivor.current_state.as_deref(), Some("current-A"));
}
/// A successful `RestoreSnapshotOnly` must leave only the restored record
/// under the thread name, removing a different record that was already filed
/// there.
///
/// Setup: `rec-restored` is saved, snapshotted and deleted; a newer `rec-dup`
/// is then saved under the same thread name before the restore runs.
#[test]
fn redo_restore_thread_record_converges_away_preexisting_duplicate() {
    let workdir = TempDir::new().unwrap();
    let repo = Repository::init_default(workdir.path()).unwrap();
    let manager = ThreadManager::new(repo.heddle_dir());

    let to_restore = {
        let mut record = sample_main_thread("current-restored", "/work/R");
        record.id = "rec-restored".to_string();
        record.updated_at = chrono::Utc::now();
        record
    };
    manager.save(&to_restore).unwrap();
    let snapshot_bytes = manager.snapshot_thread_record("main").unwrap().unwrap();
    manager.delete("rec-restored").unwrap();

    let duplicate = {
        let mut record = sample_main_thread("current-dup", "/work/D");
        record.id = "rec-dup".to_string();
        record.updated_at = to_restore.updated_at + chrono::Duration::seconds(60);
        record
    };
    manager.save(&duplicate).unwrap();

    let filed_before = manager.list().unwrap();
    let main_before = filed_before
        .iter()
        .filter(|record| record.thread == "main")
        .count();
    assert_eq!(
        main_before, 1,
        "precondition: only the duplicate is filed at redo time"
    );

    let op = RestoreSnapshotOnly {
        name: "main".to_string(),
        bytes: snapshot_bytes,
    };
    repo::atomic::execute(&repo, op).unwrap();

    let mut main_after = manager.list().unwrap();
    main_after.retain(|record| record.thread == "main");
    assert_eq!(
        main_after.len(),
        1,
        "ONLY the restored record remains — the duplicate is converged away"
    );
    assert_eq!(main_after[0].id, "rec-restored");

    let winner = manager.find_by_thread("main").unwrap().unwrap();
    assert_eq!(
        winner.id, "rec-restored",
        "find_by_thread returns the restored record, not the leaked duplicate"
    );
    let stale = manager.load("rec-dup").unwrap();
    assert!(
        stale.is_none(),
        "the pre-existing duplicate record file is gone"
    );
}
/// Test-only mutation whose single step is `remove_thread_manager_record`
/// for the thread `name`.
struct RemoveRecordOnly {
    /// Thread name whose manager record(s) are removed.
    name: String,
}

impl AtomicMutation for RemoveRecordOnly {
    type Output = ();

    /// Fixed transaction id used by every `RemoveRecordOnly` instance.
    fn transaction_id(&self) -> String {
        String::from("test-remove-record-only")
    }

    /// Declares a single isolation key: the thread being removed.
    fn isolation_keys(&self, _repo: &Repository) -> HeddleResult<BTreeSet<IsolationKey>> {
        let thread_key = IsolationKey::Thread(self.name.clone());
        Ok(BTreeSet::from([thread_key]))
    }

    /// Runs the removal through `EntrySteps` and stages a unit commit.
    fn apply(&mut self, tx: &mut Tx<'_>) -> HeddleResult<StagedCommit<()>> {
        let mut entry_steps = EntrySteps::new(tx);
        remove_thread_manager_record(&mut entry_steps, &self.name)?;
        Ok(StagedCommit::pure(()))
    }
}

impl DeferredMutation for RemoveRecordOnly {}
/// A successful `RemoveRecordOnly` must remove every manager record filed
/// under the thread name, not only the one `find_by_thread` would return.
#[test]
fn remove_thread_manager_record_converges_name_to_empty() {
    let workdir = TempDir::new().unwrap();
    let repo = Repository::init_default(workdir.path()).unwrap();
    let manager = ThreadManager::new(repo.heddle_dir());

    // Two records share the thread name; `newer` has the later timestamp.
    let newer = {
        let mut record = sample_main_thread("current-A", "/work/A");
        record.id = "rec-winner".to_string();
        record.updated_at = chrono::Utc::now();
        record
    };
    manager.save(&newer).unwrap();
    let stale = {
        let mut record = sample_main_thread("current-B", "/work/B");
        record.id = "rec-older".to_string();
        record.updated_at = newer.updated_at - chrono::Duration::seconds(60);
        record
    };
    manager.save(&stale).unwrap();

    let filed_before = manager.list().unwrap();
    assert_eq!(filed_before.len(), 2, "precondition: two records");

    let op = RemoveRecordOnly {
        name: "main".to_string(),
    };
    repo::atomic::execute(&repo, op).unwrap();

    let winner = manager.find_by_thread("main").unwrap();
    assert!(
        winner.is_none(),
        "converge-to-empty: no record survives under the name"
    );
    let filed_after = manager.list().unwrap();
    let any_left = filed_after.iter().any(|record| record.thread == "main");
    assert!(
        !any_left,
        "EVERY same-name record removed, not just the find_by_thread winner"
    );
}
/// A `RemoveRecordOnly` that fails under an injected forward fault must leave
/// both same-name records filed, with `find_by_thread` still returning the
/// newer one.
#[test]
fn remove_thread_manager_record_rollback_resaves_all_records() {
    let workdir = TempDir::new().unwrap();
    let repo = Repository::init_default(workdir.path()).unwrap();
    let manager = ThreadManager::new(repo.heddle_dir());

    // Two records share the thread name; `newer` has the later timestamp.
    let newer = {
        let mut record = sample_main_thread("current-A", "/work/A");
        record.id = "rec-winner".to_string();
        record.updated_at = chrono::Utc::now();
        record
    };
    manager.save(&newer).unwrap();
    let stale = {
        let mut record = sample_main_thread("current-B", "/work/B");
        record.id = "rec-older".to_string();
        record.updated_at = newer.updated_at - chrono::Duration::seconds(60);
        record
    };
    manager.save(&stale).unwrap();

    let outcome = with_nonatomic_forward_fault(0, || {
        let op = RemoveRecordOnly {
            name: "main".to_string(),
        };
        repo::atomic::execute(&repo, op)
    });
    assert!(
        outcome.is_err(),
        "the injected forward fault must fail the op"
    );

    let filed = manager.list().unwrap();
    assert_eq!(
        filed.len(),
        2,
        "rollback re-converged to ALL same-name records, not just the winner"
    );
    let is_filed = |wanted: &str| filed.iter().any(|record| record.id == wanted);
    assert!(
        is_filed("rec-winner") && is_filed("rec-older"),
        "both the winner and the older duplicate were restored"
    );
    let winner = manager.find_by_thread("main").unwrap().unwrap();
    assert_eq!(
        winner.id, "rec-winner",
        "find_by_thread still selects the newer winner after rollback"
    );
}
/// A two-batch undo that fails on its second batch must leave the redaction
/// planted at the start of the test on disk.
///
/// A redaction is stored, then a no-op `ThreadUpdate` batch and a `Redact`
/// batch are recorded (in that order). The undo is run through `FaultyUndo`
/// with `fail_after: 1`; the assertion messages describe this as a fault on
/// the later batch. Afterwards the blob must still have exactly one redaction
/// and the record must be retrievable by id.
#[test]
fn step_nonatomic_restores_redaction_sidecar_when_a_later_batch_fails() {
    use objects::object::{Principal, Redaction};
    const SECRET_PATH: &str = "config/secrets.toml";

    let (_temp, repo, redacted_state, _s2) = repo_with_two_snapshots();
    let op_scope = repo.op_scope();
    let main_tip = main_thread(&repo).unwrap();
    let blob = ContentHash::from_bytes([7u8; 32]);

    let planted = Redaction {
        redacted_blob: blob,
        state: redacted_state,
        path: SECRET_PATH.to_string(),
        reason: "leaked credential".to_string(),
        redactor: Principal {
            name: "Grace Hopper".to_string(),
            email: "grace@example.com".to_string(),
        },
        redacted_at: chrono::Utc::now(),
        signature: None,
        purged_at: None,
        supersedes: None,
    };
    let redaction_id = repo.put_redaction(planted).unwrap();
    let on_disk = repo.get_redactions_for_blob(&blob).unwrap();
    assert_eq!(on_disk.redactions.len(), 1, "redaction planted on disk");

    // Oldest first: the no-op thread update, then the redact entry.
    let noop_update = OpRecord::ThreadUpdate {
        name: "main".to_string(),
        old_state: main_tip,
        new_state: main_tip,
        manager_snapshots: None,
    };
    let redact = OpRecord::Redact {
        redaction_id,
        blob,
        state: redacted_state,
        path: SECRET_PATH.to_string(),
    };
    for record in [noop_update, redact] {
        repo.oplog()
            .record_batch_scoped(vec![record], Some(&op_scope))
            .unwrap();
    }

    let undo_batches = repo
        .oplog()
        .undo_batches_scoped(2, Some(&op_scope))
        .unwrap();
    assert_eq!(undo_batches.len(), 2);
    let newest_is_redact = matches!(
        undo_batches[0].entries[0].operation,
        OpRecord::Redact { .. }
    );
    assert!(
        newest_is_redact,
        "the newest undoable batch is the Redact (undone first)"
    );

    let head_before = repo.head().unwrap();
    let faulty = FaultyUndo {
        batches: undo_batches,
        recovery_head: head_before,
        fail_after: 1,
    };
    let outcome = repo::atomic::execute(&repo, faulty);
    assert!(
        outcome.is_err(),
        "the injected fault on the later batch must fail the undo"
    );

    let after_rollback = repo.get_redactions_for_blob(&blob).unwrap();
    assert_eq!(
        after_rollback.redactions.len(),
        1,
        "redaction sidecar restored by the rollback — the blob is NOT re-exposed"
    );
    let by_id = repo.get_redaction(&redaction_id).unwrap();
    assert!(
        by_id.is_some(),
        "the exact redaction record is back on disk"
    );
}
/// Builds a `StateVisibility` fixture for `state` at the given `tier`.
///
/// `ts` is a Unix timestamp in whole seconds used for `declared_at`; the
/// function panics if `chrono::DateTime::from_timestamp` rejects it. The
/// declarer is a fixed principal, and the embargo, signature and supersedes
/// fields are all `None`.
fn visibility_record(
    state: ChangeId,
    tier: objects::object::VisibilityTier,
    ts: i64,
) -> objects::object::StateVisibility {
    use objects::object::{Principal, StateVisibility};

    let declarer = Principal {
        name: String::from("Grace Hopper"),
        email: String::from("grace@example.com"),
    };
    let declared_at = chrono::DateTime::from_timestamp(ts, 0).unwrap();
    StateVisibility {
        state,
        tier,
        embargo_until: None,
        declarer,
        declared_at,
        signature: None,
        supersedes: None,
    }
}
/// An undo planned against visibility commit A must fail once a second
/// visibility commit C has landed for the same state, and must leave C's
/// sidecar bytes untouched.
///
/// The undo batches are fetched *before* C is committed; the undo itself is
/// executed afterwards.
#[test]
fn concurrent_set_during_undo_is_not_clobbered() {
    use objects::object::VisibilityTier;

    let (_temp, repo, state, _s2) = repo_with_two_snapshots();
    let op_scope = repo.op_scope();

    // Commit A: the visibility set the undo will target.
    let record_a = visibility_record(state, VisibilityTier::Internal, 1_700_000_000);
    repo.commit_state_visibility(record_a, repo::VisibilityCommitKind::Set)
        .expect("commit A")
        .expect("a set always commits");

    let undo_batches = repo
        .oplog()
        .undo_batches_scoped(1, Some(&op_scope))
        .unwrap();
    let newest_has_set = undo_batches[0]
        .entries
        .iter()
        .any(|entry| matches!(entry.operation, OpRecord::StateVisibilitySet { .. }));
    assert!(
        newest_has_set,
        "the newest undoable batch is the visibility set"
    );

    // Commit C: a later set for the same state, landing after the batches
    // were read.
    let team_tier = VisibilityTier::TeamScoped {
        team_id: "infra".to_string(),
    };
    let record_c = visibility_record(state, team_tier, 1_700_000_060);
    repo.commit_state_visibility(record_c, repo::VisibilityCommitKind::Set)
        .expect("commit C")
        .expect("a set always commits");
    let sidecar_after_c = repo
        .get_state_visibility_bytes_for_state(&state)
        .expect("read sidecar after C");
    assert!(sidecar_after_c.is_some(), "C is on disk");

    let head_before = repo.head().unwrap();
    let oplog_generation = repo.oplog().head_id().unwrap();
    let transaction_id =
        undo_redo_transaction_id("undo", &op_scope, oplog_generation, &undo_batches);
    let undo = UndoOp::new(undo_batches, head_before, transaction_id);
    let outcome = repo::atomic::execute(&repo, undo);
    assert!(
        outcome.is_err(),
        "the undo must abort on the superseding concurrent visibility commit"
    );

    let sidecar_after_undo = repo.get_state_visibility_bytes_for_state(&state).unwrap();
    assert_eq!(
        sidecar_after_undo, sidecar_after_c,
        "the newer concurrent visibility record C must survive the aborted undo"
    );
    let still_restricted = repo.has_visibility_for_state(&state).unwrap();
    assert!(
        still_restricted,
        "the state stays non-public (C's tier), not dropped to public-by-absence"
    );
}
/// Round-trips a single visibility set through undo and redo.
///
/// Undo must remove the sidecar so the state has no visibility record again;
/// redo must bring back exactly the bytes that were on disk after the set.
#[test]
fn undo_redo_visibility_roundtrip_still_works() {
    use objects::object::VisibilityTier;

    let (_temp, repo, state, _s2) = repo_with_two_snapshots();
    let op_scope = repo.op_scope();

    let visible_at_start = repo.has_visibility_for_state(&state).unwrap();
    assert!(!visible_at_start, "the state starts public-by-absence");

    // Commit A and remember the sidecar bytes it produced.
    let record_a = visibility_record(state, VisibilityTier::Internal, 1_700_000_000);
    repo.commit_state_visibility(record_a, repo::VisibilityCommitKind::Set)
        .expect("commit A")
        .expect("a set always commits");
    let sidecar_after_set = repo
        .get_state_visibility_bytes_for_state(&state)
        .expect("read A");
    assert!(sidecar_after_set.is_some(), "A is on disk");

    // Undo.
    let head_before_undo = repo.head().unwrap();
    let undo_batches = repo
        .oplog()
        .undo_batches_scoped(1, Some(&op_scope))
        .unwrap();
    let undo_generation = repo.oplog().head_id().unwrap();
    let undo_txid = undo_redo_transaction_id("undo", &op_scope, undo_generation, &undo_batches);
    let undo = UndoOp::new(undo_batches, head_before_undo, undo_txid);
    repo::atomic::execute(&repo, undo).expect("undo succeeds with no concurrent writer");

    let visible_after_undo = repo.has_visibility_for_state(&state).unwrap();
    assert!(!visible_after_undo, "undo restored public-by-absence");
    let sidecar_after_undo = repo.get_state_visibility_bytes_for_state(&state).unwrap();
    assert!(
        sidecar_after_undo.is_none(),
        "the sidecar was removed by the undo"
    );

    // Redo.
    let redo_batches = repo
        .oplog()
        .redo_batches_scoped(1, Some(&op_scope))
        .unwrap();
    let redo_generation = repo.oplog().head_id().unwrap();
    let redo_txid = undo_redo_transaction_id("redo", &op_scope, redo_generation, &redo_batches);
    let redo = RedoOp::new(redo_batches, redo_txid);
    repo::atomic::execute(&repo, redo).expect("redo succeeds");

    let sidecar_after_redo = repo.get_state_visibility_bytes_for_state(&state).unwrap();
    assert_eq!(
        sidecar_after_redo, sidecar_after_set,
        "redo restored exactly A's sidecar bytes"
    );
    let visible_after_redo = repo.has_visibility_for_state(&state).unwrap();
    assert!(
        visible_after_redo,
        "the state is non-public again after redo"
    );
}
}