use std::{
collections::BTreeSet,
fs,
path::{Path, PathBuf},
time::{SystemTime, UNIX_EPOCH},
};
use anyhow::{Result, anyhow};
use objects::{
error::{HeddleError, Result as HeddleResult},
lock::{RepoLock, WriteLockGuard},
object::{ChangeId, ContentHash, MarkerName, ThreadName},
};
use oplog::{IsolationKey, OpBatch, OpEntry, OpLogBackend, OpRecord, isolation_keys_for_record};
use refs::Head;
use repo::{
CommitGraphIndex, Repository, Thread, ThreadFreshness, ThreadIntegrationPolicy, ThreadManager,
ThreadState, VisibilitySidecarRestore,
atomic::{AtomicMutation, DeferredMutation, StagedCommit, Tx},
refresh_thread_freshness,
};
use sley::{
DeleteRef, FullName, GitObjectType, GitTime, HeadUpdateOptions, IndexWriteOptions, ObjectId,
RefPrecondition, ReferenceTarget, Repository as SleyRepository, Signature,
plumbing::sley_core::ByteString as GitByteString,
};
use super::{advice::RecoveryAdvice, thread_cmd::thread_not_found_advice};
use crate::bridge::git_core::{open_repo as open_git_repo, set_reference};
/// Checks, before anything is mutated, that undoing `batches` is compatible
/// with the current Git checkout.
///
/// Batches without any `GitCheckpoint` entry pass immediately. Otherwise the
/// checkpoints are walked in undo order (batches as given, entries of each
/// batch reversed) against a simulated HEAD: each checkpoint must find the
/// simulated HEAD at its `new_git_oid`, after which the simulation moves to
/// its `previous_git_oid` when one is recorded. The worktree must also be
/// clean.
///
/// # Errors
/// Fails if the Git HEAD cannot be read, if any checkpoint would be undone
/// from an unexpected HEAD, or if the worktree is dirty.
pub(super) fn preflight_undo_batches(repo: &Repository, batches: &[OpBatch]) -> Result<()> {
    const ACTION: &str = "undo git checkpoint";

    if !batches_have_git_checkpoint(batches) {
        return Ok(());
    }

    let mut expected_head = current_git_head(repo)?;
    let checkpoints = batches
        .iter()
        .flat_map(|batch| batch.entries.iter().rev())
        .filter_map(|entry| match &entry.operation {
            OpRecord::GitCheckpoint {
                new_git_oid,
                previous_git_oid,
                ..
            } => Some((new_git_oid, previous_git_oid)),
            _ => None,
        });

    for (undone_oid, restored_oid) in checkpoints {
        ensure_simulated_git_head_is(repo, &expected_head, undone_oid, ACTION)?;
        // A checkpoint without a predecessor leaves the simulated HEAD untouched.
        if let Some(restored) = restored_oid {
            expected_head = restored.clone();
        }
    }

    ensure_git_worktree_clean(repo, ACTION)
}
/// Checks, before anything is mutated, that redoing `batches` is compatible
/// with the current Git checkout.
///
/// Batches without any `GitCheckpoint` entry pass immediately. Otherwise the
/// checkpoints are walked in recorded order against a simulated HEAD: a
/// checkpoint that recorded a `previous_git_oid` must find the simulated HEAD
/// there, and every checkpoint then moves the simulation to its `new_git_oid`.
/// The worktree must also be clean.
///
/// # Errors
/// Fails if the Git HEAD cannot be read, if any checkpoint would be replayed
/// from an unexpected HEAD, or if the worktree is dirty.
pub(super) fn preflight_redo_batches(repo: &Repository, batches: &[OpBatch]) -> Result<()> {
    const ACTION: &str = "redo git checkpoint";

    if !batches_have_git_checkpoint(batches) {
        return Ok(());
    }

    let mut expected_head = current_git_head(repo)?;
    let checkpoints = batches
        .iter()
        .flat_map(|batch| batch.entries.iter())
        .filter_map(|entry| match &entry.operation {
            OpRecord::GitCheckpoint {
                previous_git_oid,
                new_git_oid,
                ..
            } => Some((previous_git_oid, new_git_oid)),
            _ => None,
        });

    for (required_oid, replayed_oid) in checkpoints {
        if let Some(required) = required_oid {
            ensure_simulated_git_head_is(repo, &expected_head, required, ACTION)?;
        }
        expected_head = replayed_oid.clone();
    }

    ensure_git_worktree_clean(repo, ACTION)
}
/// Returns `true` when at least one entry in any batch is a `GitCheckpoint`.
fn batches_have_git_checkpoint(batches: &[OpBatch]) -> bool {
    batches
        .iter()
        .flat_map(|batch| batch.entries.iter())
        .any(|entry| matches!(&entry.operation, OpRecord::GitCheckpoint { .. }))
}
fn current_git_head(repo: &Repository) -> Result<String> {
let git = git_checkout_repo(repo)?;
git.head()
.map_err(|error| anyhow!("failed to inspect Git HEAD: {error}"))?
.oid
.map(|id| id.to_string())
.ok_or_else(|| anyhow!("failed to inspect Git HEAD: HEAD is unborn"))
}
fn ensure_simulated_git_head_is(
repo: &Repository,
actual: &str,
expected: &str,
action: &str,
) -> Result<()> {
if actual == expected {
return Ok(());
}
Err(anyhow!(RecoveryAdvice::git_head_mismatch(
action,
actual,
expected,
repo.git_overlay_current_branch()?
.unwrap_or_else(|| "HEAD".to_string()),
git_dirty_paths(repo),
)))
}
// Per-thread fault-injection countdowns used by tests.
//
// `Some(n)` means "let `n` more probes pass, then fail the next one";
// `None` means no fault is armed.
#[cfg(test)]
thread_local! {
    static ENTRY_WRITE_FAULT: std::cell::Cell<Option<usize>> = const { std::cell::Cell::new(None) };
    static NONATOMIC_FORWARD_FAULT: std::cell::Cell<Option<usize>> = const { std::cell::Cell::new(None) };
}

/// Disarms a fault cell when dropped, so the cell is cleared on every exit
/// path of the arming helper — including unwinding from a panicking body.
#[cfg(test)]
struct FaultCellReset(&'static std::thread::LocalKey<std::cell::Cell<Option<usize>>>);

#[cfg(test)]
impl Drop for FaultCellReset {
    fn drop(&mut self) {
        // `try_with` never panics, which matters because this may run while
        // the thread is already unwinding.
        let _ = self.0.try_with(|f| f.set(None));
    }
}

/// Runs `body` with `ENTRY_WRITE_FAULT` armed to let `skip_then_fail_at`
/// probes pass and fail the following one. The cell is disarmed afterwards,
/// even if `body` panics.
#[cfg(test)]
fn with_entry_write_fault<T>(skip_then_fail_at: usize, body: impl FnOnce() -> T) -> T {
    ENTRY_WRITE_FAULT.with(|f| f.set(Some(skip_then_fail_at)));
    let _reset = FaultCellReset(&ENTRY_WRITE_FAULT);
    body()
}

/// Runs `body` with `NONATOMIC_FORWARD_FAULT` armed to let
/// `skip_then_fail_at` probes pass and fail the following one. The cell is
/// disarmed afterwards, even if `body` panics.
#[cfg(test)]
fn with_nonatomic_forward_fault<T>(skip_then_fail_at: usize, body: impl FnOnce() -> T) -> T {
    NONATOMIC_FORWARD_FAULT.with(|f| f.set(Some(skip_then_fail_at)));
    let _reset = FaultCellReset(&NONATOMIC_FORWARD_FAULT);
    body()
}

/// Probes a fault cell once.
///
/// Returns `true` exactly when the countdown has reached zero, disarming the
/// cell at the same time; otherwise decrements an armed countdown and returns
/// `false`. An unarmed cell always yields `false`.
#[cfg(test)]
fn fault_counter_trips(
    cell: &'static std::thread::LocalKey<std::cell::Cell<Option<usize>>>,
) -> bool {
    cell.with(|f| match f.get() {
        Some(0) => {
            f.set(None);
            true
        }
        Some(n) => {
            f.set(Some(n - 1));
            false
        }
        None => false,
    })
}
/// Puts the repository back on a previously captured position: first moves to
/// `state` (when one was captured) via `goto_without_record_discard_local`,
/// then rewrites the head reference to `head_ref`.
fn restore_head(repo: &Repository, state: Option<ChangeId>, head_ref: &Head) -> HeddleResult<()> {
    state
        .map(|target| repo.goto_without_record_discard_local(&target))
        .transpose()?;
    repo.refs().write_head(head_ref)
}
/// Wrapper around a mutable transaction borrow through which the undo/redo
/// entry replay in this file registers its individual steps.
struct EntrySteps<'tx, 'a> {
// Transaction that every `step` / `step_nonatomic` call is forwarded to.
tx: &'tx mut Tx<'a>,
}
impl<'a> EntrySteps<'_, 'a> {
/// Wraps a mutable borrow of `tx`.
fn new<'tx>(tx: &'tx mut Tx<'a>) -> EntrySteps<'tx, 'a> {
EntrySteps { tx }
}
/// Returns the repository exposed by the wrapped transaction (`Tx::repo`).
fn repo(&self) -> &'a Repository {
self.tx.repo()
}
/// Forwards a `forward` / `inverse` closure pair to `Tx::step`.
///
/// In test builds a tripped `ENTRY_WRITE_FAULT` countdown short-circuits with
/// an injected `HeddleError::Conflict` before either closure reaches the
/// transaction.
// NOTE(review): presumably `Tx::step` runs `forward` right away and keeps
// `inverse` for rollback — confirm against `repo::atomic::Tx`.
fn step<T>(
&mut self,
forward: impl FnOnce() -> HeddleResult<T>,
inverse: impl FnOnce() -> HeddleResult<()> + 'a,
) -> HeddleResult<T> {
#[cfg(test)]
if fault_counter_trips(&ENTRY_WRITE_FAULT) {
return Err(HeddleError::Conflict(
"injected mid-entry write fault".to_string(),
));
}
self.tx.step(forward, inverse)
}
/// Forwards a `capture` / `restore` / `forward` closure triple to
/// `Tx::step_nonatomic`.
///
/// Test builds probe two fault cells first, in this order:
/// * `ENTRY_WRITE_FAULT` — returns an injected conflict without calling the
///   transaction at all;
/// * `NONATOMIC_FORWARD_FAULT` — still runs `forward`, but reports an injected
///   conflict afterwards even when `forward` succeeded.
fn step_nonatomic<T, S: 'a>(
&mut self,
capture: impl FnOnce() -> HeddleResult<S>,
restore: impl FnOnce(S) -> HeddleResult<()> + 'a,
forward: impl FnOnce() -> HeddleResult<T>,
) -> HeddleResult<T> {
#[cfg(test)]
if fault_counter_trips(&ENTRY_WRITE_FAULT) {
return Err(HeddleError::Conflict(
"injected mid-entry write fault".to_string(),
));
}
#[cfg(test)]
if fault_counter_trips(&NONATOMIC_FORWARD_FAULT) {
return self.tx.step_nonatomic(capture, restore, move || {
// Let the real forward action take effect, then pretend it failed.
forward()?;
Err(HeddleError::Conflict(
"injected non-atomic forward fault".to_string(),
))
});
}
self.tx.step_nonatomic(capture, restore, forward)
}
/// Moves the repository to `target` with `goto_without_record_discard_local`.
/// The captured state is the pair (`head`, `head_ref`), which `restore_head`
/// re-applies.
fn goto(&mut self, target: ChangeId) -> HeddleResult<()> {
let repo = self.repo();
self.step_nonatomic(
move || Ok((repo.head()?, repo.head_ref()?)),
move |(prev_state, prev_head_ref)| restore_head(repo, prev_state, &prev_head_ref),
move || repo.goto_without_record_discard_local(&target),
)
}
/// Fast-forwards the attached worktree to `target`, but only when the head is
/// currently attached to the thread called `name`; in every other case this is
/// a no-op that registers no step.
fn restore_active_thread_worktree(&mut self, name: &str, target: ChangeId) -> HeddleResult<()> {
let repo = self.repo();
let head_ref = repo.head_ref()?;
let Head::Attached { thread } = &head_ref else {
return Ok(());
};
if thread != name {
return Ok(());
}
self.step_nonatomic(
move || Ok((repo.head()?, repo.head_ref()?)),
move |(prev_state, prev_head_ref)| restore_head(repo, prev_state, &prev_head_ref),
move || repo.fast_forward_attached_without_record_discard_local(&target),
)
}
/// Writes `head` as the head reference; the inverse writes back the head
/// reference that was read before the step was registered.
fn write_head(&mut self, head: Head) -> HeddleResult<()> {
let repo = self.repo();
let prev = repo.head_ref()?;
self.step(
move || repo.refs().write_head(&head),
move || repo.refs().write_head(&prev),
)
}
/// Points thread `name` at `state`. The inverse restores the previously read
/// target, or deletes the thread ref when it did not exist beforehand.
fn set_thread(&mut self, name: &str, state: ChangeId) -> HeddleResult<()> {
let repo = self.repo();
let forward_name = ThreadName::new(name);
let prev = repo.refs().get_thread(&forward_name)?;
let restore_name = name.to_string();
self.step(
move || repo.refs().set_thread(&forward_name, &state),
move || {
let name = ThreadName::new(restore_name);
match prev {
Some(prev) => repo.refs().set_thread(&name, &prev),
None => repo.refs().delete_thread(&name).map(|_| ()),
}
},
)
}
/// Deletes thread ref `name`. The inverse re-creates it at the previously read
/// target and does nothing if the ref was already absent.
fn delete_thread(&mut self, name: &str) -> HeddleResult<()> {
let repo = self.repo();
let forward_name = ThreadName::new(name);
let prev = repo.refs().get_thread(&forward_name)?;
let restore_name = name.to_string();
self.step(
move || repo.refs().delete_thread(&forward_name).map(|_| ()),
move || match prev {
Some(prev) => repo
.refs()
.set_thread(&ThreadName::new(restore_name), &prev),
None => Ok(()),
},
)
}
/// Creates marker `name` at `state`. The inverse re-creates the previously read
/// marker target, or deletes the marker when none existed beforehand.
fn create_marker(&mut self, name: &str, state: ChangeId) -> HeddleResult<()> {
let repo = self.repo();
let forward_name = MarkerName::new(name);
let prev = repo.refs().get_marker(&forward_name)?;
let restore_name = name.to_string();
self.step(
move || repo.refs().create_marker(&forward_name, &state),
move || {
let name = MarkerName::new(restore_name);
match prev {
Some(prev) => repo.refs().create_marker(&name, &prev),
None => repo.refs().delete_marker(&name).map(|_| ()),
}
},
)
}
/// Deletes marker `name`. The inverse re-creates it at the previously read
/// target and does nothing if the marker was already absent.
fn delete_marker(&mut self, name: &str) -> HeddleResult<()> {
let repo = self.repo();
let forward_name = MarkerName::new(name);
let prev = repo.refs().get_marker(&forward_name)?;
let restore_name = name.to_string();
self.step(
move || repo.refs().delete_marker(&forward_name).map(|_| ()),
move || match prev {
Some(prev) => repo
.refs()
.create_marker(&MarkerName::new(restore_name), &prev),
None => Ok(()),
},
)
}
/// Converges the `ThreadManager` records for thread `name` to `desired`.
///
/// The existing records are snapshotted before the step is registered and
/// handed over as the captured state; restoring converges back to that
/// snapshot.
fn converge_thread_records(&mut self, name: &str, desired: Vec<Thread>) -> HeddleResult<()> {
let repo = self.repo();
let forward_name = name.to_string();
let restore_name = name.to_string();
let prior = ThreadManager::new(repo.heddle_dir()).snapshot_records(name)?;
self.step_nonatomic(
|| Ok(prior),
move |prior| {
ThreadManager::new(repo.heddle_dir()).converge_records(&restore_name, &prior)
},
move || ThreadManager::new(repo.heddle_dir()).converge_records(&forward_name, &desired),
)
}
/// Converges the records of `record.thread` to exactly this one record.
fn save_thread_record(&mut self, record: Thread) -> HeddleResult<()> {
let name = record.thread.clone();
self.converge_thread_records(&name, vec![record])
}
/// Restores the records of thread `name` from one encoded record snapshot.
///
/// If `bytes` cannot be decoded, a warning naming `op_label` is printed to
/// stderr and the forward action succeeds without changing any record.
/// Restoring converges back to the records snapshotted before the step.
fn restore_thread_record(
&mut self,
name: &str,
bytes: &[u8],
op_label: &'static str,
) -> HeddleResult<()> {
let repo = self.repo();
let forward_name = name.to_string();
let restore_name = name.to_string();
let warn_name = name.to_string();
let bytes = bytes.to_vec();
let prior = ThreadManager::new(repo.heddle_dir()).snapshot_records(name)?;
self.step_nonatomic(
|| Ok(prior),
move |prior| {
ThreadManager::new(repo.heddle_dir()).converge_records(&restore_name, &prior)
},
move || {
let manager = ThreadManager::new(repo.heddle_dir());
match manager.decode_thread_record_snapshot(&bytes) {
Ok(restored) => {
manager.converge_records(&forward_name, std::slice::from_ref(&restored))
}
Err(e) => {
// Deliberately best-effort: warn and keep going instead of failing the replay.
eprintln!(
"warning: replay of `{}` for '{}' restored the ref but failed \
to decode the ThreadManager record snapshot ({}). Record-backed \
commands (`thread cd`, delegate) may degrade on this thread — run \
`heddle thread start {}` to recreate the record.",
op_label, warn_name, e, warn_name
);
Ok(())
}
}
},
)
}
/// Restores the records of thread `name` from a set of encoded snapshots.
///
/// Every snapshot is decoded before anything is written; if any one fails, a
/// warning is printed to stderr and the whole restore is skipped (`Ok(())`,
/// no step registered). Otherwise the decoded set is converged via
/// `converge_thread_records`.
fn restore_thread_record_set(
&mut self,
name: &str,
snapshots: &[Vec<u8>],
op_label: &'static str,
) -> HeddleResult<()> {
let manager = ThreadManager::new(self.repo().heddle_dir());
let mut restored = Vec::with_capacity(snapshots.len());
for bytes in snapshots {
match manager.decode_thread_record_snapshot(bytes) {
Ok(record) => restored.push(record),
Err(e) => {
eprintln!(
"warning: replay of `{}` for '{}' skipped ThreadManager record-set \
restore because one snapshot failed to decode ({}).",
op_label, name, e
);
return Ok(());
}
}
}
self.converge_thread_records(name, restored)
}
/// Removes a redaction via `Repository::remove_redaction`.
///
/// The sidecar for `blob` is captured with `capture_redaction_sidecar` and put
/// back with `restore_redaction_sidecar`; all three calls map their errors
/// through `apply_error`.
fn remove_redaction_sidecar(
&mut self,
blob: ContentHash,
state: ChangeId,
path: String,
redaction_id: ContentHash,
) -> HeddleResult<()> {
let repo = self.repo();
self.step_nonatomic(
move || repo.capture_redaction_sidecar(&blob).map_err(apply_error),
move |snapshot| {
repo.restore_redaction_sidecar(&blob, snapshot)
.map_err(apply_error)
},
move || {
repo.remove_redaction(&blob, &state, &path, &redaction_id)
.map(|_| ())
.map_err(apply_error)
},
)
}
/// Replaces the visibility sidecar of `state` with `target`, conditional on it
/// currently matching `expected_current`.
///
/// A `Superseded` outcome in the forward direction becomes the error from
/// `visibility_superseded_conflict`. The inverse swaps the roles (expects
/// `target`, writes back `expected_current`) and ignores which outcome it got.
fn restore_visibility_sidecar(
&mut self,
state: ChangeId,
expected_current: Option<Vec<u8>>,
target: Option<Vec<u8>>,
) -> HeddleResult<()> {
let repo = self.repo();
let inverse_expected = target.clone();
let inverse_target = expected_current.clone();
self.step(
move || match repo
.restore_state_visibility_sidecar_if_unchanged(&state, &expected_current, target)
.map_err(apply_error)?
{
VisibilitySidecarRestore::Applied => Ok(()),
VisibilitySidecarRestore::Superseded => Err(visibility_superseded_conflict(&state)),
},
move || {
repo.restore_state_visibility_sidecar_if_unchanged(
&state,
&inverse_expected,
inverse_target,
)
.map(|_| ())
.map_err(apply_error)
},
)
}
/// Runs one Git-side `forward` action as a non-atomic step whose restore
/// re-applies `snapshot` through `restore_git_state` for `branch`.
///
/// `snapshot` and `branch` are cloned so the registered closures own them.
fn git_restore_snapshot(
&mut self,
repo: &'a Repository,
branch: &str,
snapshot: &GitState,
forward: impl FnOnce() -> Result<()>,
) -> HeddleResult<()> {
let snapshot = snapshot.clone();
let branch = branch.to_string();
self.step_nonatomic(
move || Ok(snapshot),
move |snapshot| restore_git_state(repo, &branch, &snapshot),
move || forward().map_err(apply_error),
)
}
/// Marks `batch` as undone in the op log; the inverse marks it redone again.
fn mark_batch_undone(&mut self, batch: &OpBatch) -> HeddleResult<OpBatch> {
let repo = self.repo();
let forward_batch = batch.clone();
let inverse_batch = batch.clone();
self.step(
move || repo.oplog().mark_batch_undone(&forward_batch),
move || repo.oplog().mark_batch_redone(&inverse_batch).map(|_| ()),
)
}
/// Marks `batch` as redone in the op log; the inverse marks it undone again.
fn mark_batch_redone(&mut self, batch: &OpBatch) -> HeddleResult<OpBatch> {
let repo = self.repo();
let forward_batch = batch.clone();
let inverse_batch = batch.clone();
self.step(
move || repo.oplog().mark_batch_redone(&forward_batch),
move || repo.oplog().mark_batch_undone(&inverse_batch).map(|_| ()),
)
}
}
/// Registers, on `steps`, the actions that reverse one op-log `entry`.
///
/// Each arm handles one `OpRecord` variant; variants listed in the final arm
/// have no undo action here. The first failing step aborts the entry with its
/// error.
fn apply_undo_entry(steps: &mut EntrySteps, entry: &OpEntry) -> HeddleResult<()> {
match &entry.operation {
// Snapshot with a known predecessor: go back to it and, when the snapshot
// was taken on a thread, rewind and re-attach that thread as well.
OpRecord::Snapshot {
prev_head: Some(prev),
thread,
new_state,
..
} => {
steps.goto(*prev)?;
if let Some(thread) = thread {
steps.set_thread(thread.as_str(), *prev)?;
steps.write_head(Head::Attached {
thread: ThreadName::new(thread.as_str()),
})?;
sync_thread_record_state(steps, thread, *prev)?;
mark_merged_threads_unintegrated_for_target(steps, thread, new_state, prev)?;
}
}
OpRecord::Goto {
prev_head: Some(prev),
..
} => {
steps.goto(*prev)?;
}
// Without a recorded predecessor there is nowhere to go back to.
OpRecord::Snapshot {
prev_head: None, ..
}
| OpRecord::Goto {
prev_head: None, ..
} => {}
OpRecord::ThreadCreate { name, .. } => {
delete_thread_safely(steps, &ThreadName::new(name.as_str()))?;
remove_thread_manager_record(steps, name)?;
}
OpRecord::ThreadDelete { name, state } => {
steps.set_thread(name.as_str(), *state)?;
}
OpRecord::ThreadUpdate {
name,
old_state,
manager_snapshots,
..
} => {
// `old_ref_absent` means the ref did not exist before the update, so
// undoing deletes it instead of rewinding it.
if manager_snapshots
.as_ref()
.is_some_and(|snapshots| snapshots.old_ref_absent)
{
steps.delete_thread(name.as_str())?;
} else {
steps.set_thread(name.as_str(), *old_state)?;
steps.restore_active_thread_worktree(name.as_str(), *old_state)?;
}
if let Some(snapshots) = manager_snapshots.as_ref() {
// Record sets take precedence; the single `old` snapshot is only
// consulted when both record sets are empty.
if !snapshots.old_records.is_empty() || !snapshots.new_records.is_empty() {
steps.restore_thread_record_set(
name,
&snapshots.old_records,
"ThreadUpdate",
)?;
} else if let Some(bytes) = snapshots.old.as_ref() {
steps.restore_thread_record(name, bytes, "ThreadUpdate")?;
}
}
}
OpRecord::MarkerCreate { name, .. } => {
steps.delete_marker(name.as_str())?;
}
OpRecord::MarkerDelete { name, state } => {
steps.create_marker(name.as_str(), *state)?;
}
// A collapse is only reversed when both the thread and its pre-collapse
// state were recorded.
OpRecord::Collapse {
thread: Some(thread),
pre_thread_state: Some(pre_thread_state),
..
} => {
steps.set_thread(thread.as_str(), *pre_thread_state)?;
sync_thread_record_state(steps, thread, *pre_thread_state)?;
}
OpRecord::Collapse { .. } => {}
OpRecord::Redact {
redaction_id,
blob,
state,
path,
} => {
steps.remove_redaction_sidecar(*blob, *state, path.clone(), *redaction_id)?;
}
OpRecord::FastForward {
source_thread,
target_thread,
pre_target_id,
..
} => {
apply_ff_undo(steps, source_thread, target_thread, pre_target_id)?;
}
OpRecord::GitCheckpoint {
branch,
previous_git_oid,
new_git_oid,
..
} => {
apply_git_checkpoint_undo(steps, branch, previous_git_oid.as_deref(), new_git_oid)?;
}
// Undo expects the sidecar to still equal `new_sidecar` and writes back
// `prior_sidecar`.
OpRecord::StateVisibilitySet {
state,
prior_sidecar,
new_sidecar,
..
}
| OpRecord::StateVisibilityPromote {
state,
prior_sidecar,
new_sidecar,
..
} => {
steps.restore_visibility_sidecar(*state, new_sidecar.clone(), prior_sidecar.clone())?;
}
// No undo action is registered for these variants.
OpRecord::Fork { .. }
| OpRecord::Checkpoint { .. }
| OpRecord::TransactionAbort { .. }
| OpRecord::TransactionCommit { .. }
| OpRecord::ConflictResolved { .. }
| OpRecord::EphemeralThreadCollapse { .. }
| OpRecord::Purge { .. }
| OpRecord::RemoteThreadUpdate { .. }
| OpRecord::RemoteThreadDelete { .. }
| OpRecord::UndoRecoveryUpdate { .. } => {}
}
Ok(())
}
/// Reverses a fast-forward of `target_thread`.
///
/// In order: moves to `pre_target_id`, rewinds the target thread ref to it,
/// attaches the head to the target thread, syncs the thread's manager record,
/// and finally marks `source_thread` as unintegrated.
fn apply_ff_undo(
    steps: &mut EntrySteps,
    source_thread: &str,
    target_thread: &str,
    pre_target_id: &ChangeId,
) -> HeddleResult<()> {
    let restored = *pre_target_id;

    steps.goto(restored)?;
    steps.set_thread(target_thread, restored)?;

    let attached = Head::Attached {
        thread: ThreadName::new(target_thread),
    };
    steps.write_head(attached)?;

    sync_thread_record_state(steps, target_thread, restored)?;
    mark_source_thread_unintegrated(steps, source_thread, pre_target_id)
}
/// Registers, on `steps`, the actions that re-apply one op-log `entry`.
///
/// Each arm handles one `OpRecord` variant; variants listed in the final arm
/// (which, unlike undo, includes `Redact`) have no redo action here. The first
/// failing step aborts the entry with its error.
fn apply_redo_entry(steps: &mut EntrySteps, entry: &OpEntry) -> HeddleResult<()> {
match &entry.operation {
// Move forward to the snapshot and, when it was taken on a thread, advance
// and re-attach that thread as well.
OpRecord::Snapshot {
new_state,
prev_head,
thread,
..
} => {
steps.goto(*new_state)?;
if let Some(thread) = thread {
steps.set_thread(thread.as_str(), *new_state)?;
steps.write_head(Head::Attached {
thread: ThreadName::new(thread.as_str()),
})?;
sync_thread_record_state(steps, thread, *new_state)?;
mark_ready_threads_integrated_for_target(steps, thread, new_state, prev_head)?;
}
}
OpRecord::Goto { target, .. } => {
steps.goto(*target)?;
}
OpRecord::ThreadCreate {
name,
state,
manager_snapshot,
} => {
steps.set_thread(name.as_str(), *state)?;
if let Some(bytes) = manager_snapshot {
steps.restore_thread_record(name, bytes, "ThreadCreate")?;
}
}
OpRecord::ThreadDelete { name, .. } => {
delete_thread_safely(steps, &ThreadName::new(name.as_str()))?;
}
OpRecord::ThreadUpdate {
name,
new_state,
manager_snapshots,
..
} => {
steps.set_thread(name.as_str(), *new_state)?;
steps.restore_active_thread_worktree(name.as_str(), *new_state)?;
if let Some(snapshots) = manager_snapshots.as_ref() {
// Record sets take precedence; the single `new` snapshot is only
// consulted when both record sets are empty.
if !snapshots.old_records.is_empty() || !snapshots.new_records.is_empty() {
steps.restore_thread_record_set(
name,
&snapshots.new_records,
"ThreadUpdate",
)?;
} else if let Some(bytes) = snapshots.new.as_ref() {
steps.restore_thread_record(name, bytes, "ThreadUpdate")?;
}
}
}
OpRecord::MarkerCreate { name, state } => {
steps.create_marker(name.as_str(), *state)?;
}
OpRecord::MarkerDelete { name, .. } => {
steps.delete_marker(name.as_str())?;
}
// Mirrors the undo arm: only replayed when the thread and its pre-collapse
// state were both recorded.
OpRecord::Collapse {
thread: Some(thread),
result,
pre_thread_state: Some(_),
..
} => {
steps.set_thread(thread.as_str(), *result)?;
sync_thread_record_state(steps, thread, *result)?;
}
OpRecord::Collapse { .. } => {}
OpRecord::FastForward {
source_thread,
target_thread,
post_target_id,
..
} => {
apply_ff_redo(steps, source_thread, target_thread, post_target_id)?;
}
OpRecord::GitCheckpoint {
branch,
previous_git_oid,
new_git_oid,
..
} => {
apply_git_checkpoint_redo(steps, branch, previous_git_oid.as_deref(), new_git_oid)?;
}
// Redo expects the sidecar to still equal `prior_sidecar` and writes
// `new_sidecar`.
OpRecord::StateVisibilitySet {
state,
prior_sidecar,
new_sidecar,
..
}
| OpRecord::StateVisibilityPromote {
state,
prior_sidecar,
new_sidecar,
..
} => {
steps.restore_visibility_sidecar(*state, prior_sidecar.clone(), new_sidecar.clone())?;
}
// No redo action is registered for these variants.
OpRecord::Fork { .. }
| OpRecord::Checkpoint { .. }
| OpRecord::TransactionAbort { .. }
| OpRecord::TransactionCommit { .. }
| OpRecord::ConflictResolved { .. }
| OpRecord::EphemeralThreadCollapse { .. }
| OpRecord::Redact { .. }
| OpRecord::Purge { .. }
| OpRecord::RemoteThreadUpdate { .. }
| OpRecord::RemoteThreadDelete { .. }
| OpRecord::UndoRecoveryUpdate { .. } => {}
}
Ok(())
}
/// Re-applies a fast-forward of `target_thread`.
///
/// In order: moves to `post_target_id`, advances the target thread ref to it,
/// attaches the head to the target thread, syncs the thread's manager record,
/// and finally marks `source_thread` as integrated.
fn apply_ff_redo(
    steps: &mut EntrySteps,
    source_thread: &str,
    target_thread: &str,
    post_target_id: &ChangeId,
) -> HeddleResult<()> {
    let advanced = *post_target_id;

    steps.goto(advanced)?;
    steps.set_thread(target_thread, advanced)?;

    let attached = Head::Attached {
        thread: ThreadName::new(target_thread),
    };
    steps.write_head(attached)?;

    sync_thread_record_state(steps, target_thread, advanced)?;
    mark_source_thread_integrated(steps, source_thread, post_target_id)
}
/// Git-side state recorded by `capture_git_state` and re-applied by
/// `restore_git_state`.
#[derive(Clone)]
struct GitState {
/// Raw contents of the checkout's `HEAD` file; `None` when it could not be read.
head_file: Option<String>,
/// Target of `refs/heads/<branch>` in the checkout; always `None` when the
/// branch is `"HEAD"`.
checkout_branch_oid: Option<ObjectId>,
/// Stringified object id HEAD resolved to; `None` when HEAD could not be read
/// or has no object id.
checkout_head_oid: Option<String>,
/// Target of `refs/heads/<branch>` in the mirror repository under
/// `<heddle_dir>/git`; `None` when the branch is `"HEAD"`, the mirror does not
/// exist, or the ref is absent.
mirror_branch_oid: Option<ObjectId>,
}
/// Records the Git state that `restore_git_state` can later put back for
/// `branch`.
///
/// The branch ref of the checkout and of the mirror are looked up strictly
/// (errors propagate through `apply_error`), whereas the raw `HEAD` file and
/// the resolved HEAD id are best-effort and become `None` on failure.
fn capture_git_state(repo: &Repository, branch: &str) -> HeddleResult<GitState> {
    let checkout = git_checkout_repo(repo).map_err(apply_error)?;

    let checkout_branch_oid = match branch {
        "HEAD" => None,
        named => {
            ref_target_oid(&checkout, &format!("refs/heads/{named}")).map_err(apply_error)?
        }
    };

    let head_file = fs::read_to_string(checkout.git_dir().join("HEAD")).ok();

    let checkout_head_oid = match checkout.head() {
        Ok(head) => head.oid.map(|id| id.to_string()),
        Err(_) => None,
    };

    let mirror_branch_oid = capture_mirror_oid(repo, branch).map_err(apply_error)?;

    Ok(GitState {
        head_file,
        checkout_branch_oid,
        checkout_head_oid,
        mirror_branch_oid,
    })
}
fn restore_git_state(repo: &Repository, branch: &str, state: &GitState) -> HeddleResult<()> {
let git = git_checkout_repo(repo).map_err(apply_error)?;
if branch != "HEAD" {
let ref_name = format!("refs/heads/{branch}");
match state.checkout_branch_oid {
Some(oid) => set_reference(
&git,
&ref_name,
oid,
RefPrecondition::Any,
"heddle: rollback git checkpoint",
)
.map_err(|error| apply_error(anyhow!(error)))?,
None => delete_ref_if_present(&git, &ref_name).map_err(apply_error)?,
}
}
if let Some(head) = &state.head_file {
let head_path = git.git_dir().join("HEAD");
fs::write(&head_path, head)?;
fsync_file_and_parent(&head_path).map_err(apply_error)?;
}
if let Some(oid) = &state.checkout_head_oid {
let oid = parse_git_oid(oid).map_err(apply_error)?;
reset_git_index_to_commit(&git, oid).map_err(apply_error)?;
}
restore_mirror_oid(repo, branch, state.mirror_branch_oid).map_err(apply_error)?;
Ok(())
}
/// Looks up `refs/heads/<branch>` in the mirror repository at
/// `<heddle_dir>/git`.
///
/// Returns `Ok(None)` without opening anything when `branch` is `"HEAD"` or
/// the mirror directory does not exist.
fn capture_mirror_oid(repo: &Repository, branch: &str) -> Result<Option<ObjectId>> {
    let mirror_dir = (branch != "HEAD")
        .then(|| repo.heddle_dir().join("git"))
        .filter(|dir| dir.exists());
    let Some(mirror_dir) = mirror_dir else {
        return Ok(None);
    };

    let mirror = open_git_repo(&mirror_dir)?;
    ref_target_oid(&mirror, &format!("refs/heads/{branch}"))
}
fn restore_mirror_oid(repo: &Repository, branch: &str, oid: Option<ObjectId>) -> Result<()> {
if branch == "HEAD" {
return Ok(());
}
let mirror = repo.heddle_dir().join("git");
if !mirror.exists() {
return Ok(());
}
let git = open_git_repo(&mirror)?;
let ref_name = format!("refs/heads/{branch}");
match oid {
Some(oid) => set_reference(
&git,
&ref_name,
oid,
RefPrecondition::Any,
"heddle: rollback mirror checkpoint ref",
)
.map_err(|error| anyhow!(error)),
None => delete_ref_if_present(&git, &ref_name),
}
}
/// Deletes `ref_name` unconditionally when it currently resolves to an object
/// id; an absent ref is not an error.
fn delete_ref_if_present(git: &SleyRepository, ref_name: &str) -> Result<()> {
    match ref_target_oid(git, ref_name)? {
        Some(_) => delete_reference_matching(git, ref_name, None),
        None => Ok(()),
    }
}
/// Reverses a Git checkpoint on `branch`, moving Git from `new_git_oid` back
/// to `previous_git_oid` (or removing the branch ref when there was no
/// previous commit).
///
/// Preconditions checked up front: Git HEAD must be at `new_git_oid` and the
/// worktree must be clean. A single `GitState` snapshot is captured before the
/// first mutation; every Git action is then registered through
/// `git_restore_snapshot` with that same snapshot as its restore point.
fn apply_git_checkpoint_undo(
steps: &mut EntrySteps,
branch: &str,
previous_git_oid: Option<&str>,
new_git_oid: &str,
) -> HeddleResult<()> {
let repo = steps.repo();
ensure_git_head_is(repo, new_git_oid, "undo git checkpoint").map_err(apply_error)?;
ensure_git_worktree_clean(repo, "undo git checkpoint").map_err(apply_error)?;
let snapshot = capture_git_state(repo, branch)?;
let new_oid = parse_git_oid(new_git_oid).map_err(apply_error)?;
match previous_git_oid {
Some(previous) => {
let previous_oid = parse_git_oid(previous).map_err(apply_error)?;
if branch != "HEAD" {
let git = git_checkout_repo(repo).map_err(apply_error)?;
// Only move the branch ref when it is not already at the previous commit.
if ref_target_oid(&git, &format!("refs/heads/{branch}")).map_err(apply_error)?
!= Some(previous_oid)
{
steps.git_restore_snapshot(repo, branch, &snapshot, || {
attach_git_head_to_branch(&git_checkout_repo(repo)?, branch)
})?;
// Target is the previous commit; the ref must currently be at `new_oid`.
steps.git_restore_snapshot(repo, branch, &snapshot, || {
set_attached_git_head(
&git_checkout_repo(repo)?,
branch,
previous_oid,
new_oid,
"heddle: undo git checkpoint",
)
})?;
}
// NOTE(review): when the ref was moved above, HEAD has already been
// attached once; this second attach looks redundant on that path —
// confirm it is intentional.
steps.git_restore_snapshot(repo, branch, &snapshot, || {
attach_git_head_to_branch(&git_checkout_repo(repo)?, branch)
})?;
}
steps.git_restore_snapshot(repo, branch, &snapshot, || {
reset_git_index_to_commit(&git_checkout_repo(repo)?, previous_oid)
})?;
let previous = previous.to_string();
let new_git_oid = new_git_oid.to_string();
steps.git_restore_snapshot(repo, branch, &snapshot, || {
update_mirror_branch_ref(repo, branch, Some(&previous), Some(&new_git_oid))
})?;
}
None => {
// No previous commit was recorded: undo removes the branch ref (which
// must still be at `new_oid`) from the checkout and from the mirror.
if branch != "HEAD" {
steps.git_restore_snapshot(repo, branch, &snapshot, || {
delete_reference_matching(
&git_checkout_repo(repo)?,
&format!("refs/heads/{branch}"),
Some(new_oid),
)
})?;
}
let new_git_oid = new_git_oid.to_string();
steps.git_restore_snapshot(repo, branch, &snapshot, || {
update_mirror_branch_ref(repo, branch, None, Some(&new_git_oid))
})?;
}
}
Ok(())
}
/// Re-applies a Git checkpoint on `branch`, moving Git from
/// `previous_git_oid` (if any) forward to `new_git_oid`.
///
/// When a previous commit was recorded, Git HEAD must currently be at it. A
/// single `GitState` snapshot is captured before the first mutation; every Git
/// action is then registered through `git_restore_snapshot` with that same
/// snapshot as its restore point.
// NOTE(review): unlike `apply_git_checkpoint_undo`, this does not call
// `ensure_git_worktree_clean`; `preflight_redo_batches` does — confirm that
// the preflight check is sufficient for every caller.
fn apply_git_checkpoint_redo(
steps: &mut EntrySteps,
branch: &str,
previous_git_oid: Option<&str>,
new_git_oid: &str,
) -> HeddleResult<()> {
let repo = steps.repo();
if let Some(previous) = previous_git_oid {
ensure_git_head_is(repo, previous, "redo git checkpoint").map_err(apply_error)?;
}
let snapshot = capture_git_state(repo, branch)?;
let new_oid = parse_git_oid(new_git_oid).map_err(apply_error)?;
if branch != "HEAD" {
match previous_git_oid {
Some(previous) => {
let previous_oid = parse_git_oid(previous).map_err(apply_error)?;
steps.git_restore_snapshot(repo, branch, &snapshot, || {
attach_git_head_to_branch(&git_checkout_repo(repo)?, branch)
})?;
// Target is the new commit; the ref must currently be at `previous_oid`.
steps.git_restore_snapshot(repo, branch, &snapshot, || {
set_attached_git_head(
&git_checkout_repo(repo)?,
branch,
new_oid,
previous_oid,
"heddle: redo git checkpoint",
)
})?;
}
None => {
// No previous commit: write the branch ref without a precondition
// first, then attach HEAD to it.
steps.git_restore_snapshot(repo, branch, &snapshot, || {
set_reference(
&git_checkout_repo(repo)?,
&format!("refs/heads/{branch}"),
new_oid,
RefPrecondition::Any,
"heddle: redo git checkpoint",
)
.map_err(|error| anyhow!(error))
})?;
steps.git_restore_snapshot(repo, branch, &snapshot, || {
attach_git_head_to_branch(&git_checkout_repo(repo)?, branch)
})?;
}
}
}
steps.git_restore_snapshot(repo, branch, &snapshot, || {
reset_git_index_to_commit(&git_checkout_repo(repo)?, new_oid)
})?;
let previous_git_oid = previous_git_oid.map(|previous| previous.to_string());
let new_git_oid = new_git_oid.to_string();
steps.git_restore_snapshot(repo, branch, &snapshot, || {
update_mirror_branch_ref(
repo,
branch,
Some(&new_git_oid),
previous_git_oid.as_deref(),
)
})?;
Ok(())
}
fn update_mirror_branch_ref(
repo: &Repository,
branch: &str,
target_oid: Option<&str>,
expected_old_oid: Option<&str>,
) -> Result<()> {
if branch == "HEAD" {
return Ok(());
}
let mirror = repo.heddle_dir().join("git");
if !mirror.exists() {
return Ok(());
}
let git = open_git_repo(&mirror)?;
let ref_name = format!("refs/heads/{branch}");
if let Some(target) = target_oid
&& ref_target_oid(&git, &ref_name)? == Some(parse_git_oid(target)?)
{
return Ok(());
}
match (target_oid, expected_old_oid) {
(Some(target), Some(expected)) => set_reference(
&git,
&ref_name,
parse_git_oid(target)?,
RefPrecondition::MustExistAndMatch(ReferenceTarget::Direct(parse_git_oid(expected)?)),
"heddle: update mirror checkpoint ref",
)
.map_err(|error| anyhow!(error)),
(Some(target), None) => set_reference(
&git,
&ref_name,
parse_git_oid(target)?,
RefPrecondition::Any,
"heddle: update mirror checkpoint ref",
)
.map_err(|error| anyhow!(error)),
(None, Some(expected)) => {
delete_reference_matching(&git, &ref_name, Some(parse_git_oid(expected)?))
}
(None, None) => delete_reference_matching(&git, &ref_name, None),
}
}
fn ensure_git_head_is(repo: &Repository, expected: &str, action: &str) -> Result<()> {
let actual = current_git_head(repo)?;
if actual == expected {
return Ok(());
}
Err(anyhow!(RecoveryAdvice::git_head_mismatch(
action,
&actual,
expected,
repo.git_overlay_current_branch()?
.unwrap_or_else(|| "HEAD".to_string()),
git_dirty_paths(repo),
)))
}
/// Fails with `RecoveryAdvice::dirty_worktree` when the Git overlay reports a
/// worktree status that is not clean.
///
/// A repository that reports no overlay status at all passes.
fn ensure_git_worktree_clean(repo: &Repository, action: &str) -> Result<()> {
    match repo.git_overlay_worktree_status()? {
        Some(status) if !status.is_clean() => Err(anyhow!(RecoveryAdvice::dirty_worktree(
            action,
            git_status_paths(&status),
            "the Heddle undo batch has not been applied",
        ))),
        _ => Ok(()),
    }
}
/// Best-effort list of labelled dirty paths for error messages.
///
/// Yields an empty list when the overlay status cannot be read or is not
/// reported.
fn git_dirty_paths(repo: &Repository) -> Vec<String> {
    match repo.git_overlay_worktree_status() {
        Ok(Some(status)) => git_status_paths(&status),
        _ => Vec::new(),
    }
}
/// Flattens a worktree status into labelled lines: all modified paths first,
/// then added, then deleted.
fn git_status_paths(status: &objects::worktree::WorktreeStatus) -> Vec<String> {
    format_status_paths("modified", &status.modified)
        .into_iter()
        .chain(format_status_paths("added", &status.added))
        .chain(format_status_paths("deleted", &status.deleted))
        .collect()
}
/// Renders each path as `"<kind>: <path>"`, preserving input order.
fn format_status_paths(kind: &str, paths: &[PathBuf]) -> Vec<String> {
    let mut lines = Vec::with_capacity(paths.len());
    for path in paths {
        lines.push(format!("{kind}: {}", path.display()));
    }
    lines
}
fn git_checkout_repo(repo: &Repository) -> Result<SleyRepository> {
open_git_repo(repo.root()).map_err(|error| anyhow!(error))
}
fn parse_git_oid(oid: &str) -> Result<ObjectId> {
oid.parse::<ObjectId>()
.map_err(|error| anyhow!("invalid Git object id '{oid}': {error}"))
}
fn ref_target_oid(repo: &SleyRepository, name: &str) -> Result<Option<ObjectId>> {
let Some(reference) = repo
.find_reference(name)
.map_err(|error| anyhow!("failed to inspect Git reference '{name}': {error}"))?
else {
return Ok(None);
};
reference
.peeled_oid(repo)
.map_err(|error| anyhow!("failed to resolve Git reference '{name}': {error}"))
}
fn attach_git_head_to_branch(repo: &SleyRepository, branch: &str) -> Result<()> {
if branch == "HEAD" {
return Ok(());
}
repo.set_head_symref(
format!("refs/heads/{branch}"),
HeadUpdateOptions::new(),
)
.map_err(|error| anyhow!("failed to attach Git HEAD to branch '{branch}': {error}"))?;
Ok(())
}
/// Moves the ref behind `branch` (`HEAD` itself for the pseudo-branch
/// `"HEAD"`, otherwise `refs/heads/<branch>`) to `target`, requiring that it
/// currently exists and points at `expected`. The update is logged with
/// `log_message` via `set_reference_with_reflog`.
fn set_attached_git_head(
    repo: &SleyRepository,
    branch: &str,
    target: ObjectId,
    expected: ObjectId,
    log_message: &str,
) -> Result<()> {
    let ref_name = match branch {
        "HEAD" => "HEAD".to_string(),
        named => format!("refs/heads/{named}"),
    };
    let precondition = RefPrecondition::MustExistAndMatch(ReferenceTarget::Direct(expected));

    set_reference_with_reflog(repo, &ref_name, target, precondition, log_message)
        .map_err(|error| anyhow!("failed to update Git HEAD for branch '{branch}': {error}"))
}
/// Rewrites the Git index so it matches the tree of commit `oid`.
///
/// # Errors
/// Fails when `oid` cannot be read, is not a commit, or when the index cannot
/// be built from the commit's tree or written out.
fn reset_git_index_to_commit(repo: &SleyRepository, oid: ObjectId) -> Result<()> {
    let object = repo
        .read_object(&oid)
        .map_err(|error| anyhow!("failed to inspect Git commit {oid}: {error}"))?;
    if object.object_type != GitObjectType::Commit {
        return Err(anyhow!("failed to inspect Git commit {oid}: not a commit"));
    }

    let commit = repo
        .read_commit(&oid)
        .map_err(|error| anyhow!("failed to inspect Git commit {oid}: {error}"))?;
    let rebuilt = repo
        .index_from_tree(&commit.tree)
        .map_err(|error| anyhow!("failed to build Git index for commit {oid}: {error}"))?;

    // Both durability options are switched on for the index write.
    let write_options = IndexWriteOptions {
        fsync: true,
        validate_checksum: true,
    };
    repo.write_index(&rebuilt, write_options)
        .map_err(|error| anyhow!("failed to write Git index for commit {oid}: {error}"))?;
    Ok(())
}
fn delete_reference_matching(
repo: &SleyRepository,
name: &str,
expected: Option<ObjectId>,
) -> Result<()> {
let current = ref_target_oid(repo, name)?;
if current.is_none() {
return Err(anyhow!(
"failed to delete Git reference '{name}': ref is missing"
));
}
if let Some(expected) = expected
&& current != Some(expected)
{
return Err(anyhow!(
"failed to delete Git reference '{name}': expected {expected}, found {}",
current
.map(|oid| oid.to_string())
.unwrap_or_else(|| "missing".to_string())
));
}
let refs = repo.references();
match refs
.read_ref(name)
.map_err(|error| anyhow!("failed to inspect Git reference '{name}': {error}"))?
{
Some(ReferenceTarget::Direct(oid)) => repo
.delete_ref(DeleteRef {
name: FullName::new(name)
.map_err(|error| anyhow!("invalid Git reference '{name}': {error}"))?,
expected_old: Some(expected.unwrap_or(oid)),
expected: None,
reflog: None,
reflog_committer: None,
})
.map_err(|error| anyhow!("failed to delete Git reference '{name}': {error}")),
Some(ReferenceTarget::Symbolic(_)) => refs
.delete_symbolic_ref(name)
.map(|_| ())
.map_err(|error| anyhow!("failed to delete Git reference '{name}': {error}")),
None => Err(anyhow!(
"failed to delete Git reference '{name}': ref is missing"
)),
}
}
/// Builds the fixed `Heddle <heddle@local>` signature stamped with the current
/// wall-clock time (seconds since the Unix epoch, offset `+0000`).
///
/// A clock reading before the epoch falls back to `0` seconds.
fn git_signature() -> Signature {
    let name = "Heddle";
    let email = "heddle@local";
    let seconds = match SystemTime::now().duration_since(UNIX_EPOCH) {
        Ok(elapsed) => elapsed.as_secs() as i64,
        Err(_) => 0,
    };

    Signature {
        name: GitByteString::new(name.as_bytes().to_vec()),
        email: GitByteString::new(email.as_bytes().to_vec()),
        time: GitTime::new(seconds, 0),
        raw: format!("{name} <{email}> {seconds} +0000").into_bytes(),
    }
}
/// Builds a reflog entry for a move from `old_oid` to `new_oid`, attributed to
/// the Heddle signature and carrying `message`.
fn git_reflog_entry(
    old_oid: ObjectId,
    new_oid: ObjectId,
    message: &str,
) -> sley::plumbing::sley_refs::ReflogEntry {
    let committer = git_signature().to_ident_bytes();
    let message = message.as_bytes().to_vec();
    sley::plumbing::sley_refs::ReflogEntry {
        old_oid,
        new_oid,
        committer,
        message,
    }
}
fn set_reference_with_reflog(
repo: &SleyRepository,
name: &str,
target: ObjectId,
constraint: RefPrecondition,
log_message: &str,
) -> Result<()> {
let refs = repo.references();
let old_oid = match refs
.read_ref(name)
.map_err(|error| anyhow!("failed to inspect Git reference '{name}': {error}"))?
{
Some(ReferenceTarget::Direct(oid)) => oid,
_ => ObjectId::null(repo.object_format()),
};
let reflog = git_reflog_entry(old_oid, target, log_message);
let should_append_head_reflog = name != "HEAD"
&& repo
.head()
.ok()
.and_then(|head| head.symbolic_target.map(|target| target.to_string()))
.as_deref()
== Some(name);
let mut tx = refs.transaction();
tx.update_to(
name.to_string(),
ReferenceTarget::Direct(target),
constraint,
Some(reflog.clone()),
);
tx.commit()
.map_err(|error| anyhow!("failed to update Git reference '{name}': {error}"))?;
if should_append_head_reflog {
refs.append_reflog("HEAD", &reflog)
.map_err(|error| anyhow!("failed to append Git HEAD reflog: {error}"))?;
}
Ok(())
}
/// Flushes `path` to stable storage, then flushes its containing directory.
///
/// Both steps open the target read-only and call `sync_all` on the handle.
///
/// # Errors
/// Returns an error naming the path that could not be opened or synced.
fn fsync_file_and_parent(path: &Path) -> Result<()> {
    fs::File::open(path)
        .and_then(|file| file.sync_all())
        .map_err(|error| anyhow!("failed to sync '{}': {error}", path.display()))?;
    if let Some(parent) = path.parent() {
        // `Path::parent` yields an empty path for a bare relative file name such as
        // `"state.json"`; opening `""` fails, so sync the current directory instead.
        let parent = if parent.as_os_str().is_empty() {
            Path::new(".")
        } else {
            parent
        };
        fs::File::open(parent)
            .and_then(|dir| dir.sync_all())
            .map_err(|error| anyhow!("failed to sync '{}': {error}", parent.display()))?;
    }
    Ok(())
}
/// Deletes thread `name`, detaching HEAD first when HEAD is attached to it.
///
/// If HEAD is attached to `name`, HEAD is rewritten as detached at the thread's
/// current state before the thread is deleted.
///
/// # Errors
/// Returns `HeddleError::Conflict` when HEAD is attached to `name` but the thread
/// ref has no state, and propagates any failure from the underlying steps.
fn delete_thread_safely(steps: &mut EntrySteps, name: &ThreadName) -> HeddleResult<()> {
    let repo = steps.repo();
    let head = repo.head_ref()?;
    let head_is_on_thread = matches!(&head, Head::Attached { thread } if thread == name);
    if head_is_on_thread {
        let Some(state) = repo.refs().get_thread(name)? else {
            let advice = thread_not_found_advice(name.as_str(), "delete thread");
            return Err(HeddleError::Conflict(advice.to_string()));
        };
        steps.write_head(Head::Detached { state })?;
    }
    steps.delete_thread(name.as_str())?;
    Ok(())
}
/// Updates the thread record for `thread_name` so its `current_state` is `state`.
///
/// Does nothing when the thread manager has no record for `thread_name`. When a
/// record exists, its `updated_at` is refreshed and the record is saved through
/// `steps`.
fn sync_thread_record_state(
    steps: &mut EntrySteps,
    thread_name: &str,
    state: objects::object::ChangeId,
) -> HeddleResult<()> {
    let manager = ThreadManager::new(steps.repo().heddle_dir());
    let Some(mut record) = manager.find_by_thread(thread_name)? else {
        return Ok(());
    };
    record.current_state = Some(state.short());
    record.updated_at = chrono::Utc::now();
    steps.save_thread_record(record)?;
    Ok(())
}
/// Clears integration bookkeeping on `source_thread`'s record after an undo.
///
/// Nothing changes when the record is missing, or when `change_contains` still
/// reports the source tip as contained in `target_after_undo`. Otherwise the
/// record is moved from `Merged` back to `Ready`, `merged_state` is cleared, an
/// `auto_integrated` policy result is reset to its default, freshness is
/// refreshed (an `Unknown` result is replaced by `Current`), and the record is saved.
fn mark_source_thread_unintegrated(
    steps: &mut EntrySteps,
    source_thread: &str,
    target_after_undo: &ChangeId,
) -> HeddleResult<()> {
    let repo = steps.repo();
    let manager = ThreadManager::new(repo.heddle_dir());
    let mut thread = match manager.find_by_thread(source_thread)? {
        Some(record) => record,
        None => return Ok(()),
    };

    let source_tip = repo.refs().get_thread(&ThreadName::new(source_thread))?;
    let still_integrated = match &source_tip {
        Some(tip) => change_contains(repo, tip, target_after_undo),
        // A thread without a tip cannot be contained in the target.
        None => false,
    };
    if still_integrated {
        return Ok(());
    }

    if thread.state == ThreadState::Merged {
        thread.state = ThreadState::Ready;
    }
    if let Some(tip) = source_tip {
        thread.current_state = Some(tip.short());
    }
    thread.merged_state = None;

    let was_auto_integrated =
        thread.integration_policy_result.status.as_deref() == Some("auto_integrated");
    if was_auto_integrated {
        thread.integration_policy_result = ThreadIntegrationPolicy::default();
    }

    // Freshness is recomputed only after the record fields above are reset.
    refresh_thread_freshness(repo, &mut thread)?;
    if matches!(thread.freshness, ThreadFreshness::Unknown) {
        thread.freshness = ThreadFreshness::Current;
    }
    thread.updated_at = chrono::Utc::now();
    steps.save_thread_record(thread)?;
    Ok(())
}
/// Un-integrates every merged thread that was integrated into `target_thread`
/// at `integrated_state`.
///
/// A thread record qualifies when it is not the target itself, names
/// `target_thread` as its target, is in the `Merged` state, and its
/// `merged_state` (or, failing that, its `current_state`) resolves to
/// `integrated_state`. Each qualifying thread is passed to
/// [`mark_source_thread_unintegrated`].
fn mark_merged_threads_unintegrated_for_target(
    steps: &mut EntrySteps,
    target_thread: &str,
    integrated_state: &ChangeId,
    target_after_undo: &ChangeId,
) -> HeddleResult<()> {
    let repo = steps.repo();
    let manager = ThreadManager::new(repo.heddle_dir());
    for candidate in manager.list()? {
        let is_merged_source = candidate.thread != target_thread
            && candidate.target_thread.as_deref() == Some(target_thread)
            && candidate.state == ThreadState::Merged;
        if !is_merged_source {
            continue;
        }

        let recorded_state = candidate
            .merged_state
            .as_deref()
            .or(candidate.current_state.as_deref());
        // A state that fails to resolve is treated as "does not match".
        let resolved = recorded_state.and_then(|state| repo.resolve_state(state).ok().flatten());
        if resolved.is_some_and(|state| state == *integrated_state) {
            mark_source_thread_unintegrated(steps, &candidate.thread, target_after_undo)?;
        }
    }
    Ok(())
}
/// Marks `source_thread`'s record as merged after a redo restored the target.
///
/// Nothing changes when the record is missing, the thread has no tip, or
/// `change_contains` does not report the tip as contained in
/// `target_after_redo`. Otherwise the record becomes `Merged` at
/// `target_after_redo`, its policy status is set to `auto_integrated`, its
/// freshness to `Current`, and it is saved.
fn mark_source_thread_integrated(
    steps: &mut EntrySteps,
    source_thread: &str,
    target_after_redo: &ChangeId,
) -> HeddleResult<()> {
    let repo = steps.repo();
    let manager = ThreadManager::new(repo.heddle_dir());
    let Some(mut thread) = manager.find_by_thread(source_thread)? else {
        return Ok(());
    };
    let Some(source_tip) = repo.refs().get_thread(&ThreadName::new(source_thread))? else {
        // Without a tip the thread cannot count as integrated.
        return Ok(());
    };
    if !change_contains(repo, &source_tip, target_after_redo) {
        return Ok(());
    }

    thread.state = ThreadState::Merged;
    thread.merged_state = Some(target_after_redo.short());
    thread.current_state = Some(source_tip.short());
    // Only status and reason change; the manual-resolution fields are kept as they were.
    thread.integration_policy_result.status = Some("auto_integrated".to_string());
    thread.integration_policy_result.reason =
        Some("redo restored integrated target state".to_string());
    thread.freshness = ThreadFreshness::Current;
    thread.updated_at = chrono::Utc::now();
    steps.save_thread_record(thread)?;
    Ok(())
}
/// Re-integrates every ready thread that a redo folded back into `target_thread`.
///
/// A thread record qualifies when it is not the target itself, names
/// `target_thread` as its target, is in the `Ready` state, has a tip, and that tip
/// is contained in `integrated_state` but was not already contained in
/// `target_before_redo`. Each qualifying thread is passed to
/// [`mark_source_thread_integrated`].
fn mark_ready_threads_integrated_for_target(
    steps: &mut EntrySteps,
    target_thread: &str,
    integrated_state: &ChangeId,
    target_before_redo: &Option<ChangeId>,
) -> HeddleResult<()> {
    let repo = steps.repo();
    let manager = ThreadManager::new(repo.heddle_dir());
    for candidate in manager.list()? {
        let is_ready_source = candidate.thread != target_thread
            && candidate.target_thread.as_deref() == Some(target_thread)
            && candidate.state == ThreadState::Ready;
        if !is_ready_source {
            continue;
        }
        let Some(source_tip) = repo.refs().get_thread(&ThreadName::new(&candidate.thread))? else {
            continue;
        };
        if !change_contains(repo, &source_tip, integrated_state) {
            continue;
        }
        // Skip threads the target already contained before the redo ran.
        let contained_before = match target_before_redo {
            Some(before) => change_contains(repo, &source_tip, before),
            None => false,
        };
        if contained_before {
            continue;
        }
        mark_source_thread_integrated(steps, &candidate.thread, integrated_state)?;
    }
    Ok(())
}
/// Reports whether the commit graph says `ancestor` is an ancestor of `descendant`.
///
/// A failed ancestry lookup is reported as `false`.
fn change_contains(repo: &Repository, ancestor: &ChangeId, descendant: &ChangeId) -> bool {
    let mut index = CommitGraphIndex::new(repo);
    matches!(index.is_ancestor(ancestor, descendant), Ok(true))
}
/// Converges the thread records for `thread_name` onto an empty set of records.
fn remove_thread_manager_record(steps: &mut EntrySteps, thread_name: &str) -> HeddleResult<()> {
    let no_records = Vec::new();
    steps.converge_thread_records(thread_name, no_records)
}
/// Converts an `anyhow` error into a `HeddleError::Conflict`.
///
/// The error is rendered with the alternate (`{:#}`) format.
fn apply_error(err: anyhow::Error) -> HeddleError {
    let rendered = format!("{err:#}");
    HeddleError::Conflict(rendered)
}
/// Builds the conflict error reported when a visibility undo/redo on `state`
/// finds the sidecar superseded by a concurrent change.
///
/// The message carries the full form of the state id.
fn visibility_superseded_conflict(state: &ChangeId) -> HeddleError {
    let full_id = state.to_string_full();
    let message = format!(
        "cannot undo/redo visibility on state {}: a concurrent `visibility set`/`promote` \
         superseded the sidecar. The newer record is preserved; re-run undo/redo after \
         refreshing.",
        full_id
    );
    HeddleError::Conflict(message)
}
/// Takes the write lock at `locks/undo-redo.lock` under the repository's heddle
/// directory.
///
/// # Errors
/// Returns an error when the write lock cannot be acquired.
pub(super) fn acquire_undo_redo_lock(repo: &Repository) -> Result<WriteLockGuard> {
    let lock_path = repo.heddle_dir().join("locks/undo-redo.lock");
    match RepoLock::at(lock_path).write() {
        Ok(guard) => Ok(guard),
        Err(e) => Err(anyhow!(
            "failed to acquire undo/redo serialization lock: {e}"
        )),
    }
}
/// Formats the transaction id for an undo/redo over `batches`.
///
/// The result has the shape `{action}:{scope}:gen{generation}:[id1,id2,...]`,
/// listing batch ids in slice order; an empty slice yields `[]`.
pub(super) fn undo_redo_transaction_id(
    action: &str,
    scope: &str,
    generation: u64,
    batches: &[OpBatch],
) -> String {
    let mut ids = Vec::with_capacity(batches.len());
    for batch in batches {
        ids.push(batch.id.to_string());
    }
    let joined = ids.join(",");
    format!("{action}:{scope}:gen{generation}:[{}]", joined)
}
/// Mutation that stores a state as the undo-recovery ref.
///
/// Its `AtomicMutation::apply` writes `head` via `set_undo_recovery`; `UndoOp`
/// enrolls it before any batch is undone.
struct StageUndoRecovery {
/// State to record; `None` makes the mutation a no-op.
head: Option<ChangeId>,
}
impl AtomicMutation for StageUndoRecovery {
    type Output = ();

    /// Fixed id shared by every recovery-staging mutation.
    fn transaction_id(&self) -> String {
        String::from("undo:stage-recovery")
    }

    /// Declares only the local-head key for the repository's op scope.
    fn isolation_keys(&self, repo: &Repository) -> HeddleResult<BTreeSet<IsolationKey>> {
        let local_head = IsolationKey::LocalHead {
            scope: repo.op_scope(),
        };
        Ok(BTreeSet::from([local_head]))
    }

    /// Stores `head` as the undo-recovery ref, registering a rollback that puts
    /// back the previous recovery value (or clears it when there was none).
    ///
    /// Does nothing when `head` is `None`.
    fn apply(&mut self, tx: &mut Tx<'_>) -> HeddleResult<StagedCommit<()>> {
        if let Some(state) = self.head {
            let repo = tx.repo();
            // Read before the write so the rollback can restore exactly what was there.
            let previous = repo.refs().get_undo_recovery()?;
            tx.step(
                || repo.refs().set_undo_recovery(&state),
                move || match previous {
                    Some(earlier) => repo.refs().set_undo_recovery(&earlier),
                    None => repo.refs().clear_undo_recovery(),
                },
            )?;
        }
        Ok(StagedCommit::pure(()))
    }
}

impl DeferredMutation for StageUndoRecovery {}
/// Mutation that undoes one [`OpBatch`].
///
/// Its `AtomicMutation::apply` runs `apply_undo_entry` over the batch's entries in
/// reverse order and then marks the batch as undone.
struct ApplyUndoBatch {
/// The batch whose entries are undone.
batch: OpBatch,
/// Test-only fault injection: when `Some(n)`, `apply` returns an injected
/// conflict error right after the `n`-th entry has been undone.
#[cfg(test)]
fail_after_entries: Option<usize>,
}
impl ApplyUndoBatch {
/// Creates a mutation for `batch` with no fault injection.
fn new(batch: OpBatch) -> Self {
Self {
batch,
#[cfg(test)]
fail_after_entries: None,
}
}
/// Test-only constructor: like `new`, but `apply` fails after `entries` entries.
#[cfg(test)]
fn failing_after(batch: OpBatch, entries: usize) -> Self {
Self {
batch,
fail_after_entries: Some(entries),
}
}
}
impl AtomicMutation for ApplyUndoBatch {
    type Output = OpBatch;

    /// Id of the form `undo:batch:<batch id>`.
    fn transaction_id(&self) -> String {
        format!("undo:batch:{}", self.batch.id)
    }

    /// Isolation keys of this single batch plus the local-head key.
    fn isolation_keys(&self, repo: &Repository) -> HeddleResult<BTreeSet<IsolationKey>> {
        let scope = repo.op_scope();
        let this_batch = std::slice::from_ref(&self.batch);
        Ok(isolation_keys_for_batches(this_batch, &scope))
    }

    /// Undoes the batch's entries newest-first, then marks the batch undone and
    /// returns the updated batch.
    fn apply(&mut self, tx: &mut Tx<'_>) -> HeddleResult<StagedCommit<OpBatch>> {
        let mut steps = EntrySteps::new(tx);
        let newest_first = self.batch.entries.iter().rev();
        for (index, entry) in newest_first.enumerate() {
            apply_undo_entry(&mut steps, entry)?;
            // Test builds can inject a failure once a given number of entries is undone.
            #[cfg(test)]
            if self.fail_after_entries == Some(index + 1) {
                return Err(HeddleError::Conflict("injected mid-undo fault".to_string()));
            }
            // Outside tests the counter is otherwise unused.
            #[cfg(not(test))]
            let _ = index;
        }
        let updated = steps.mark_batch_undone(&self.batch)?;
        Ok(StagedCommit::pure(updated))
    }
}

impl DeferredMutation for ApplyUndoBatch {}
/// Mutation that redoes one [`OpBatch`].
///
/// Its `AtomicMutation::apply` runs `apply_redo_entry` over the batch's entries in
/// their stored order and then marks the batch as redone.
struct ApplyRedoBatch {
/// The batch whose entries are re-applied.
batch: OpBatch,
/// Test-only fault injection: when `Some(n)`, `apply` returns an injected
/// conflict error right after the `n`-th entry has been redone.
#[cfg(test)]
fail_after_entries: Option<usize>,
}
impl ApplyRedoBatch {
/// Creates a mutation for `batch` with no fault injection.
fn new(batch: OpBatch) -> Self {
Self {
batch,
#[cfg(test)]
fail_after_entries: None,
}
}
/// Test-only constructor: like `new`, but `apply` fails after `entries` entries.
#[cfg(test)]
fn failing_after(batch: OpBatch, entries: usize) -> Self {
Self {
batch,
fail_after_entries: Some(entries),
}
}
}
impl AtomicMutation for ApplyRedoBatch {
    type Output = OpBatch;

    /// Id of the form `redo:batch:<batch id>`.
    fn transaction_id(&self) -> String {
        format!("redo:batch:{}", self.batch.id)
    }

    /// Isolation keys of this single batch plus the local-head key.
    fn isolation_keys(&self, repo: &Repository) -> HeddleResult<BTreeSet<IsolationKey>> {
        let scope = repo.op_scope();
        let this_batch = std::slice::from_ref(&self.batch);
        Ok(isolation_keys_for_batches(this_batch, &scope))
    }

    /// Re-applies the batch's entries in stored order, then marks the batch redone
    /// and returns the updated batch.
    fn apply(&mut self, tx: &mut Tx<'_>) -> HeddleResult<StagedCommit<OpBatch>> {
        let mut steps = EntrySteps::new(tx);
        let oldest_first = self.batch.entries.iter();
        for (index, entry) in oldest_first.enumerate() {
            apply_redo_entry(&mut steps, entry)?;
            // Test builds can inject a failure once a given number of entries is redone.
            #[cfg(test)]
            if self.fail_after_entries == Some(index + 1) {
                return Err(HeddleError::Conflict("injected mid-redo fault".to_string()));
            }
            // Outside tests the counter is otherwise unused.
            #[cfg(not(test))]
            let _ = index;
        }
        let updated = steps.mark_batch_redone(&self.batch)?;
        Ok(StagedCommit::pure(updated))
    }
}

impl DeferredMutation for ApplyRedoBatch {}
/// Collects the isolation keys touched by every entry of `batches`, plus the
/// local-head key for `scope`.
///
/// Per-entry keys come from `isolation_keys_for_record`, called with the entry's
/// operation and its own optional scope.
fn isolation_keys_for_batches(batches: &[OpBatch], scope: &str) -> BTreeSet<IsolationKey> {
    let mut keys: BTreeSet<IsolationKey> = batches
        .iter()
        .flat_map(|batch| batch.entries.iter())
        .flat_map(|entry| isolation_keys_for_record(&entry.operation, entry.scope.as_deref()))
        .collect();
    // The local head is always included, even for an empty batch list.
    keys.insert(IsolationKey::LocalHead {
        scope: scope.to_string(),
    });
    keys
}
/// Top-level undo mutation covering a sequence of batches.
///
/// Its `AtomicMutation::apply` first enrolls a `StageUndoRecovery` for
/// `recovery_head`, then one `ApplyUndoBatch` per batch in order.
pub(super) struct UndoOp {
/// Batches to undo, processed in slice order.
batches: Vec<OpBatch>,
/// State handed to `StageUndoRecovery`; `None` stages nothing.
recovery_head: Option<ChangeId>,
/// Id returned verbatim from `transaction_id()`.
transaction_id: String,
}
impl UndoOp {
/// Bundles the batches, optional recovery head, and transaction id into an `UndoOp`.
pub(super) fn new(
batches: Vec<OpBatch>,
recovery_head: Option<ChangeId>,
transaction_id: String,
) -> Self {
Self {
batches,
recovery_head,
transaction_id,
}
}
}
impl AtomicMutation for UndoOp {
    type Output = Vec<OpBatch>;

    /// Returns the id supplied at construction.
    fn transaction_id(&self) -> String {
        self.transaction_id.clone()
    }

    /// Isolation keys of all batches plus the local-head key.
    fn isolation_keys(&self, repo: &Repository) -> HeddleResult<BTreeSet<IsolationKey>> {
        let scope = repo.op_scope();
        Ok(isolation_keys_for_batches(self.batches.as_slice(), &scope))
    }

    /// Stages the recovery head, then undoes each batch in order and returns the
    /// updated batches. Stops at the first enrollment that fails.
    fn apply(&mut self, tx: &mut Tx<'_>) -> HeddleResult<StagedCommit<Vec<OpBatch>>> {
        // The recovery head is staged before any batch is touched.
        let recovery = StageUndoRecovery {
            head: self.recovery_head,
        };
        tx.enroll(recovery)?;

        let mut updated = Vec::with_capacity(self.batches.len());
        for batch in self.batches.iter() {
            let undo = ApplyUndoBatch::new(batch.clone());
            updated.push(tx.enroll(undo)?.output);
        }
        Ok(StagedCommit::pure(updated))
    }
}
/// Top-level redo mutation covering a sequence of batches.
///
/// Its `AtomicMutation::apply` enrolls one `ApplyRedoBatch` per batch in order.
pub(super) struct RedoOp {
/// Batches to redo, processed in slice order.
batches: Vec<OpBatch>,
/// Id returned verbatim from `transaction_id()`.
transaction_id: String,
}
impl RedoOp {
/// Bundles the batches and transaction id into a `RedoOp`.
pub(super) fn new(batches: Vec<OpBatch>, transaction_id: String) -> Self {
Self {
batches,
transaction_id,
}
}
}
impl AtomicMutation for RedoOp {
    type Output = Vec<OpBatch>;

    /// Returns the id supplied at construction.
    fn transaction_id(&self) -> String {
        self.transaction_id.clone()
    }

    /// Isolation keys of all batches plus the local-head key.
    fn isolation_keys(&self, repo: &Repository) -> HeddleResult<BTreeSet<IsolationKey>> {
        let scope = repo.op_scope();
        Ok(isolation_keys_for_batches(self.batches.as_slice(), &scope))
    }

    /// Redoes each batch in order and returns the updated batches. Stops at the
    /// first enrollment that fails.
    fn apply(&mut self, tx: &mut Tx<'_>) -> HeddleResult<StagedCommit<Vec<OpBatch>>> {
        let mut updated = Vec::with_capacity(self.batches.len());
        for batch in self.batches.iter() {
            let redo = ApplyRedoBatch::new(batch.clone());
            updated.push(tx.enroll(redo)?.output);
        }
        Ok(StagedCommit::pure(updated))
    }
}
#[cfg(test)]
mod head_symref_tests {
    use super::attach_git_head_to_branch;
    use sley::{HeadUpdateOptions, Repository as SleyRepository};

    /// Attaching HEAD to a branch must leave `HEAD` holding exactly
    /// `ref: refs/heads/<branch>\n`, both on a fresh repository and after HEAD was
    /// pointed elsewhere through `set_head_symref`.
    #[test]
    fn attach_git_head_writes_legacy_head_bytes() {
        let tmp = tempfile::TempDir::new().unwrap();
        let git_dir = tmp.path().join(".git");
        let head_file = git_dir.join("HEAD");
        let read_head = || std::fs::read_to_string(&head_file);

        let repo = SleyRepository::init_bare(&git_dir).expect("init bare");

        attach_git_head_to_branch(&repo, "feature").expect("attach HEAD");
        assert_eq!(read_head().expect("read HEAD"), "ref: refs/heads/feature\n");

        // Move HEAD through the library directly, then re-attach via the helper.
        repo.set_head_symref("refs/heads/other", HeadUpdateOptions::new())
            .expect("direct symref");
        attach_git_head_to_branch(&repo, "main").expect("reattach");
        assert_eq!(read_head().unwrap(), "ref: refs/heads/main\n");
    }
}
// Test-only submodule; `mod atomic_tests;` loads its body from a separate source file.
#[cfg(test)]
mod atomic_tests;