use std::{
fs,
path::{Path, PathBuf},
};
use anyhow::{Result, anyhow};
use chrono::Utc;
use objects::{
fs_ops::remove_path_recursively,
object::{ChangeId, ThreadName},
store::{AgentRegistry, ObjectStore},
};
use oplog::{OpLogRecorder, ThreadUpdateSnapshots};
use refs::Head;
use repo::{
Repository, Thread, ThreadFreshness, ThreadManager, ThreadMode, ThreadState,
describe_thread_advice,
};
use serde::Serialize;
use tokio::time::{Duration, sleep};
use super::{
action_line::print_next,
advice::RecoveryAdvice,
git_overlay_health::{PlainGitVerificationProbe, build_plain_git_verification_probe},
marker::cmd_thread_marker,
mount_lifecycle,
next_action::normalized_action,
operator_core::{OperatorAction, OperatorCommandOutput},
operator_loop::primary_next_action,
thread::{
cmd_thread_cd, cmd_thread_create, cmd_thread_current, cmd_thread_delete, cmd_thread_list,
cmd_thread_rename, cmd_thread_show, cmd_thread_switch, find_thread_summary,
show_thread_summary,
},
thread_shaping::{cmd_thread_absorb, cmd_thread_move, cmd_thread_resolve},
worktree_cmd::helpers::{prepare_worktree_target, write_isolated_checkout},
worktree_safety::ensure_worktree_clean,
};
use crate::cli::{Cli, ThreadCleanupArgs, ThreadCommands, should_output_json, style};
#[derive(Serialize)]
struct ThreadOutput {
#[serde(flatten)]
operator: OperatorCommandOutput,
thread: Thread,
changed_path_count: usize,
}
pub(crate) struct ThreadRefState {
pub state: ChangeId,
pub ref_absent: bool,
}
pub(crate) struct ThreadUpdateBefore {
pub state: ChangeId,
pub ref_absent: bool,
pub manager_snapshot: Option<Vec<u8>>,
pub manager_records: Vec<Thread>,
}
pub(crate) fn thread_manager(repo: &Repository) -> ThreadManager {
ThreadManager::new(repo.heddle_dir())
}
pub(crate) fn current_thread_ref_state_with_presence(
repo: &Repository,
thread: &Thread,
) -> Result<ThreadRefState> {
if let Some(state) = repo.refs().get_thread(&ThreadName::new(&thread.thread))? {
return Ok(ThreadRefState {
state,
ref_absent: false,
});
}
let state = if let Some(current_state) = thread.current_state.as_deref() {
repo.resolve_state(current_state)?
.ok_or_else(|| anyhow!("current state not found for thread '{}'", thread.thread))?
} else {
repo.resolve_state(&thread.base_state)?
.ok_or_else(|| anyhow!("base state not found for thread '{}'", thread.thread))?
};
Ok(ThreadRefState {
state,
ref_absent: true,
})
}
pub(crate) fn current_thread_ref_state(repo: &Repository, thread: &Thread) -> Result<ChangeId> {
Ok(current_thread_ref_state_with_presence(repo, thread)?.state)
}
pub(crate) fn capture_thread_update_before(
repo: &Repository,
manager: &ThreadManager,
thread: &Thread,
) -> Result<ThreadUpdateBefore> {
let ref_state = current_thread_ref_state_with_presence(repo, thread)?;
Ok(ThreadUpdateBefore {
state: ref_state.state,
ref_absent: ref_state.ref_absent,
manager_snapshot: manager.snapshot_thread_record(&thread.thread)?,
manager_records: manager.snapshot_records(&thread.thread)?,
})
}
pub(crate) fn save_thread_update_with_oplog(
repo: &Repository,
manager: &ThreadManager,
thread: &Thread,
before: ThreadUpdateBefore,
new_state: ChangeId,
) -> Result<()> {
let new_manager_snapshot = Some(manager.encode_thread_record_snapshot(thread)?);
let mut new_manager_records = before.manager_records.clone();
if let Some(existing) = new_manager_records
.iter_mut()
.find(|record| record.id == thread.id)
{
*existing = thread.clone();
} else {
new_manager_records.push(thread.clone());
}
let old_manager_records = encode_thread_records(manager, &before.manager_records)?;
let new_manager_records = encode_thread_records(manager, &new_manager_records)?;
objects::fault_inject::maybe_fail_at("thread_manager_save_in_thread_update")?;
manager.save(thread)?;
repo.oplog().record_thread_update(
&ThreadName::new(&thread.thread),
&before.state,
&new_state,
ThreadUpdateSnapshots::from_record_sets(
before.manager_snapshot,
new_manager_snapshot,
old_manager_records,
new_manager_records,
before.ref_absent,
),
Some(&repo.op_scope()),
)?;
Ok(())
}
fn encode_thread_records(manager: &ThreadManager, records: &[Thread]) -> Result<Vec<Vec<u8>>> {
records
.iter()
.map(|record| {
manager
.encode_thread_record_snapshot(record)
.map_err(Into::into)
})
.collect()
}
pub(crate) fn resolve_thread_name_or_current(
repo: &Repository,
name: Option<String>,
command: &'static str,
primary_command: impl Into<String>,
) -> Result<String> {
if let Some(name) = name {
return Ok(name);
}
if let Some(lane) = repo.current_lane()? {
return Ok(lane);
}
if let Some(thread) = current_thread(repo)? {
return Ok(thread.thread);
}
Err(anyhow!(RecoveryAdvice::no_current_thread(
command,
Some("<THREAD>"),
primary_command,
)))
}
pub(crate) fn current_thread(repo: &Repository) -> Result<Option<Thread>> {
if let Some(thread) = thread_manager(repo).find_by_execution_root(repo.root())? {
return Ok(Some(thread));
}
let Head::Attached { thread } = repo.head_ref()? else {
return Ok(None);
};
let current_state = repo.refs().get_thread(&thread)?.map(|id| id.short());
let base_root = current_state
.as_deref()
.and_then(|state| repo.resolve_state(state).ok().flatten())
.and_then(|id| repo.store().get_state(&id).ok().flatten())
.map(|state| state.tree.short())
.unwrap_or_default();
let thread_str = thread.to_string();
Ok(Some(Thread {
id: thread_str.clone(),
thread: thread_str,
target_thread: None,
parent_thread: None,
mode: ThreadMode::Materialized,
state: ThreadState::Active,
base_state: current_state.clone().unwrap_or_default(),
base_root,
current_state,
merged_state: None,
task: None,
execution_path: repo.root().to_path_buf(),
materialized_path: None,
changed_paths: Vec::new(),
impact_categories: Vec::new(),
heavy_impact_paths: Vec::new(),
promotion_suggested: false,
freshness: ThreadFreshness::Unknown,
verification_summary: Default::default(),
confidence_summary: Default::default(),
integration_policy_result: Default::default(),
created_at: Utc::now(),
updated_at: Utc::now(),
ephemeral: None,
auto: false,
shared_target_dir: None,
}))
}
pub(crate) fn load_thread(repo: &Repository, thread_id: &str) -> Result<Thread> {
match thread_manager(repo).load(thread_id)? {
Some(thread) => Ok(thread),
None if repo
.refs()
.get_thread(&ThreadName::new(thread_id))?
.is_some() =>
{
Err(anyhow!(imported_git_ref_not_managed_thread_advice(
thread_id
)))
}
None => Err(anyhow!(thread_not_found_advice(thread_id, "load thread"))),
}
}
fn render_plain_git_thread_list(cli: &Cli, probe: &PlainGitVerificationProbe) -> Result<()> {
if should_output_json(cli, None) {
println!(
"{}",
serde_json::to_string(&serde_json::json!({
"repository_capability": "plain-git",
"storage_model": "git",
"hosted_enabled": false,
"threads": [],
"current": null,
"verification": &probe.trust,
"recommended_action": &probe.trust.recommended_action,
"recommended_action_template": &probe.trust.recommended_action_template,
"recovery_commands": &probe.trust.recovery_commands,
"recovery_action_templates": &probe.trust.recovery_action_templates,
}))?
);
} else {
println!("Git repo, Heddle not adopted");
if let Some(branch) = &probe.git_branch {
println!("Git branch: {}", style::bold(branch));
}
println!("Threads: none");
println!(
"Next step: {}",
style::bold(probe.trust.recommended_action.as_str())
);
}
Ok(())
}
fn render_plain_git_thread_show(
cli: &Cli,
probe: &PlainGitVerificationProbe,
requested: Option<&str>,
) -> Result<()> {
if should_output_json(cli, None) {
println!(
"{}",
serde_json::to_string(&serde_json::json!({
"repository_capability": "plain-git",
"storage_model": "git",
"requested_thread": requested,
"thread": null,
"verification": &probe.trust,
"recommended_action": &probe.trust.recommended_action,
"recommended_action_template": &probe.trust.recommended_action_template,
"recovery_commands": &probe.trust.recovery_commands,
"recovery_action_templates": &probe.trust.recovery_action_templates,
}))?
);
} else {
println!("Git repo, Heddle not adopted");
if let Some(name) = requested {
println!("Thread: {}", style::bold(name));
}
println!("No Heddle thread is available until this Git repo is adopted.");
println!(
"Next step: {}",
style::bold(probe.trust.recommended_action.as_str())
);
}
Ok(())
}
pub(crate) use repo::refresh_thread_freshness;
pub async fn cmd_thread(cli: &Cli, command: ThreadCommands) -> Result<()> {
let current_dir = std::env::current_dir()?;
let repo_path = cli.repo.as_ref().unwrap_or(¤t_dir);
match &command {
ThreadCommands::List(_) => {
if let Some(probe) = build_plain_git_verification_probe(repo_path)? {
return render_plain_git_thread_list(cli, &probe);
}
}
ThreadCommands::Show(args) if !args.watch => {
if let Some(probe) = build_plain_git_verification_probe(repo_path)? {
return render_plain_git_thread_show(cli, &probe, args.thread.as_deref());
}
}
_ => {}
}
let repo = Repository::open(repo_path)?;
match command {
ThreadCommands::Create {
name,
ephemeral,
ttl_secs,
} => cmd_thread_create(cli, &repo, name, ephemeral, ttl_secs),
ThreadCommands::Switch {
name,
print_cd_path,
force,
} => cmd_thread_switch(cli, &repo, name, print_cd_path, force),
ThreadCommands::Current => cmd_thread_current(cli, &repo),
ThreadCommands::Cd { name } => cmd_thread_cd(&repo, name),
ThreadCommands::List(args) => cmd_thread_list(cli, &repo, args),
ThreadCommands::Cleanup(args) => cmd_thread_cleanup(cli, &repo, args),
ThreadCommands::Marker { command } => cmd_thread_marker(cli, &repo, command),
ThreadCommands::Show(args) => {
if args.watch {
let thread = resolve_thread_name_or_current(
&repo,
args.thread,
"thread show",
"heddle thread show <THREAD>",
)?;
watch_thread_show(
cli,
&repo,
&thread,
args.watch_iterations,
args.watch_interval_ms,
)
.await
} else {
cmd_thread_show(cli, &repo, args.thread)
}
}
ThreadCommands::Captures(args) => {
let thread = resolve_thread_name_or_current(
&repo,
args.thread,
"thread captures",
"heddle thread captures <THREAD>",
)?;
super::thread::cmd_thread_captures(cli, &repo, &thread, args.limit)
}
ThreadCommands::Rename(args) => cmd_thread_rename(cli, &repo, args.old, args.new),
ThreadCommands::Refresh(args) => {
let thread = resolve_thread_name_or_current(
&repo,
args.thread,
"thread refresh",
"heddle thread refresh <THREAD>",
)?;
cmd_thread_refresh(cli, &repo, &thread)
}
ThreadCommands::Move(args) => {
cmd_thread_move(cli, args.from, args.to, args.paths, args.message)
}
ThreadCommands::Absorb(args) => {
cmd_thread_absorb(cli, args.thread, args.into, args.message, args.preview)
}
ThreadCommands::Resolve(args) => cmd_thread_resolve(cli, args.thread),
ThreadCommands::Promote(args) => {
cmd_thread_promote(cli, &repo, &args.thread, args.path, args.force)
}
ThreadCommands::Drop(args) => {
cmd_thread_drop(cli, &repo, &args.thread, args.delete_thread, args.force)
}
#[cfg(feature = "client")]
ThreadCommands::Approve(args) => {
require_hosted_repo(&repo, "thread approvals")?;
super::thread_approval::cmd_thread_approve(cli, args).await
}
#[cfg(feature = "client")]
ThreadCommands::Approvals(args) => {
require_hosted_repo(&repo, "thread approvals")?;
super::thread_approval::cmd_thread_approvals(cli, args).await
}
#[cfg(feature = "client")]
ThreadCommands::RevokeApproval(args) => {
require_hosted_repo(&repo, "thread approvals")?;
super::thread_approval::cmd_thread_revoke_approval(cli, args).await
}
#[cfg(feature = "client")]
ThreadCommands::CheckMerge(args) => {
require_hosted_repo(&repo, "hosted merge checks")?;
super::thread_approval::cmd_thread_check_merge(cli, args).await
}
#[cfg(not(feature = "client"))]
ThreadCommands::Approve(_)
| ThreadCommands::Approvals(_)
| ThreadCommands::RevokeApproval(_)
| ThreadCommands::CheckMerge(_) => Err(anyhow!(
"rebuild cli with --features client to use thread approvals"
)),
}
}
#[cfg(feature = "client")]
fn require_hosted_repo(repo: &Repository, feature: &str) -> Result<()> {
if repo.hosted_enabled() {
Ok(())
} else {
Err(anyhow!(
"{} require a repository linked to a Heddle hosted upstream. Configure [hosted] in .heddle/config.toml or run this in a hosted-enabled repository.",
feature
))
}
}
async fn watch_thread_show(
cli: &Cli,
repo: &Repository,
thread_id: &str,
watch_iterations: Option<usize>,
watch_interval_ms: Option<u64>,
) -> Result<()> {
let interval = Duration::from_millis(watch_interval_ms.unwrap_or(1000));
let mut iterations = 0usize;
loop {
let summary = find_thread_summary(repo, thread_id)?
.ok_or_else(|| anyhow!(thread_not_found_advice(thread_id, "watch thread")))?;
show_thread_summary(cli, repo, &summary)?;
iterations += 1;
if watch_iterations.is_some_and(|limit| iterations >= limit) {
break;
}
sleep(interval).await;
}
Ok(())
}
fn cmd_thread_refresh(cli: &Cli, repo: &Repository, thread_id: &str) -> Result<()> {
let thread = refresh_thread(repo, thread_id, cli)?;
print_thread_output(
cli,
repo,
OperatorAction::ThreadRefresh,
thread,
format!("Refreshed thread '{}'", thread_id),
)
}
pub(crate) fn refresh_thread(repo: &Repository, thread_id: &str, _cli: &Cli) -> Result<Thread> {
let manager = thread_manager(repo);
let mut thread = manager
.load(thread_id)?
.ok_or_else(|| anyhow!(thread_not_found_advice(thread_id, "refresh thread")))?;
let target_thread = thread.target_thread.clone().ok_or_else(|| {
anyhow!(RecoveryAdvice::missing_target_thread(
thread_id,
"refresh thread"
))
})?;
refresh_thread_freshness(repo, &mut thread)?;
if thread.freshness == ThreadFreshness::Current {
return Ok(thread);
}
let opened_thread_repo;
let current_lane = repo.current_lane()?;
let thread_repo = if thread.execution_path.as_os_str().is_empty() {
if current_lane.as_deref() == Some(thread.thread.as_str()) {
repo
} else {
return Err(anyhow!(thread_refresh_requires_checkout_advice(
&thread.thread,
current_lane.as_deref(),
)));
}
} else {
opened_thread_repo = Repository::open(&thread.execution_path).map_err(|error| {
anyhow!(thread_refresh_checkout_unavailable_advice(
&thread.thread,
&thread.execution_path,
error
))
})?;
&opened_thread_repo
};
if let Some(ThreeWayMergeRefresh::Conflicted {
tree,
paths,
ours,
theirs,
base,
}) = preflight_three_way_refresh_conflict(repo, thread_repo, &thread, &target_thread)?
{
persist_refresh_conflict_state(thread_repo, tree, ours, theirs, base, paths.clone())?;
return Err(anyhow!(thread_refresh_conflicted_advice(
thread_id,
&paths,
thread_repo.root(),
)));
}
let before_update = capture_thread_update_before(repo, &manager, &thread)?;
let rebase_state_path = thread_repo.heddle_dir().join("REBASE_STATE");
if rebase_state_path.exists() {
super::rebase::cmd_rebase_silent(thread_repo, None, false, true)?;
} else {
super::rebase::cmd_rebase_silent(thread_repo, Some(&target_thread), false, false)?;
}
if rebase_state_path.exists() {
let rebase_state = super::rebase::load_persisted_rebase_state(&rebase_state_path)?;
let current_state = thread_repo
.head()?
.ok_or_else(|| anyhow!("Thread '{}' has no current state after refresh", thread_id))?;
if rebase_state
.pending_manual_resolution
.is_some_and(|pending| pending != current_state)
{
fs::remove_file(&rebase_state_path)?;
thread_repo
.refs()
.set_thread(&ThreadName::new(&thread.thread), ¤t_state)?;
thread.integration_policy_result.status = Some("manual_resolved".to_string());
thread.integration_policy_result.reason =
Some("manual integration resolution captured".to_string());
thread.integration_policy_result.manual_resolution_state = Some(current_state.short());
thread.integration_policy_result.conflicts_resolved_manually = true;
} else {
match try_three_way_merge_refresh(repo, thread_repo, &thread, &target_thread) {
Err(error) => {
restore_refresh_rebase_abort(thread_repo, &rebase_state_path)?;
return Err(error);
}
Ok(ThreeWayMergeRefresh::Clean { new_state }) => {
let _ = fs::remove_file(&rebase_state_path);
thread.integration_policy_result.status = Some("manual_resolved".to_string());
thread.integration_policy_result.reason =
Some("thread refreshed cleanly via 3-way merge fallback".to_string());
thread.integration_policy_result.manual_resolution_state =
Some(new_state.short());
thread.integration_policy_result.conflicts_resolved_manually = false;
}
Ok(ThreeWayMergeRefresh::Conflicted {
tree,
paths,
ours,
theirs,
base,
}) => {
restore_refresh_rebase_abort(thread_repo, &rebase_state_path)?;
persist_refresh_conflict_state(
thread_repo,
tree,
ours,
theirs,
base,
paths.clone(),
)?;
return Err(anyhow!(thread_refresh_conflicted_advice(
thread_id,
&paths,
thread_repo.root(),
)));
}
}
}
}
let current_state = thread_repo
.head()?
.ok_or_else(|| anyhow!("Thread '{}' has no current state after refresh", thread_id))?;
let target_state = repo
.refs()
.get_thread(&ThreadName::new(&target_thread))?
.ok_or_else(|| anyhow!(thread_not_found_advice(&target_thread, "refresh thread")))?;
let target_state_obj = repo
.store()
.get_state(&target_state)?
.ok_or_else(|| anyhow!("Target state not found"))?;
thread.base_state = target_state.short();
thread.base_root = target_state_obj.tree.short();
thread.current_state = Some(current_state.short());
thread.integration_policy_result.status = Some("manual_resolved".to_string());
thread.integration_policy_result.reason =
Some("thread refreshed cleanly onto target".to_string());
thread.integration_policy_result.manual_resolution_state = Some(current_state.short());
thread.integration_policy_result.conflicts_resolved_manually = false;
thread.updated_at = Utc::now();
thread.freshness = ThreadFreshness::Current;
save_thread_update_with_oplog(repo, &manager, &thread, before_update, current_state)?;
Ok(thread)
}
fn restore_refresh_rebase_abort(repo: &Repository, rebase_state_path: &Path) -> Result<()> {
let rebase_state = super::rebase::load_persisted_rebase_state(rebase_state_path)?;
let head_before = repo.head_ref()?;
repo.goto_without_record_discard_local(&rebase_state.original_head)?;
if let Head::Attached { thread } = head_before {
repo.refs()
.set_thread(&thread, &rebase_state.original_head)?;
repo.refs().write_head(&Head::Attached {
thread: thread.clone(),
})?;
let manager = thread_manager(repo);
if let Some(mut metadata) = manager.find_by_thread(&thread)? {
let state = repo
.store()
.get_state(&rebase_state.original_head)?
.ok_or_else(|| anyhow!("Original rebase state not found"))?;
repo::update_thread_state_from_state(&mut metadata, &state);
refresh_thread_freshness(repo, &mut metadata)?;
metadata.updated_at = Utc::now();
manager.save(&metadata)?;
}
}
if rebase_state_path.exists() {
fs::remove_file(rebase_state_path)?;
}
Ok(())
}
fn persist_refresh_conflict_state(
thread_repo: &Repository,
tree: objects::object::Tree,
ours: objects::object::ChangeId,
theirs: objects::object::ChangeId,
base: Option<objects::object::ChangeId>,
paths: Vec<String>,
) -> Result<()> {
super::merge::apply_merged_tree_external(thread_repo, &tree)?;
ensure_refresh_conflict_markers_materialized(thread_repo, &ours, &theirs, &paths)?;
thread_repo
.merge_state_manager()
.start(ours, theirs, base, paths)?;
Ok(())
}
fn ensure_refresh_conflict_markers_materialized(
repo: &Repository,
ours: &objects::object::ChangeId,
theirs: &objects::object::ChangeId,
paths: &[String],
) -> Result<()> {
let ours_tree = tree_for_state(repo, ours)?;
let theirs_tree = tree_for_state(repo, theirs)?;
for path in paths {
let full_path = repo.root().join(path);
let existing = fs::read(&full_path).unwrap_or_default();
if contains_conflict_marker_bytes(&existing) {
continue;
}
let ours_content = conflict_side_content(repo, &ours_tree, path)?;
let theirs_content = conflict_side_content(repo, &theirs_tree, path)?;
if let Some(parent) = full_path.parent() {
fs::create_dir_all(parent)?;
}
fs::write(
full_path,
format_refresh_conflict_markers(&ours_content, &theirs_content),
)?;
}
Ok(())
}
fn tree_for_state(
repo: &Repository,
state_id: &objects::object::ChangeId,
) -> Result<objects::object::Tree> {
let state = repo
.store()
.get_state(state_id)?
.ok_or_else(|| anyhow!("State '{}' not found", state_id.short()))?;
repo.require_tree(&state.tree).map_err(Into::into)
}
fn conflict_side_content(
repo: &Repository,
tree: &objects::object::Tree,
path: &str,
) -> Result<Vec<u8>> {
let Some(entry) = tree.get(path) else {
return Ok(Vec::new());
};
if entry.entry_type != objects::object::EntryType::Blob
&& entry.entry_type != objects::object::EntryType::Symlink
{
return Ok(Vec::new());
}
Ok(repo.require_blob(&entry.hash)?.content().to_vec())
}
fn contains_conflict_marker_bytes(content: &[u8]) -> bool {
content
.windows("<<<<<<<".len())
.any(|window| window == b"<<<<<<<")
&& content
.windows("=======".len())
.any(|window| window == b"=======")
&& content
.windows(">>>>>>>".len())
.any(|window| window == b">>>>>>>")
}
fn format_refresh_conflict_markers(ours: &[u8], theirs: &[u8]) -> Vec<u8> {
let mut out = Vec::new();
out.extend_from_slice(b"<<<<<<< CURRENT\n");
out.extend_from_slice(ours);
if !ours.ends_with(b"\n") {
out.push(b'\n');
}
out.extend_from_slice(b"=======\n");
out.extend_from_slice(theirs);
if !theirs.ends_with(b"\n") {
out.push(b'\n');
}
out.extend_from_slice(b">>>>>>> INCOMING\n");
out
}
fn thread_refresh_conflicted_advice(
thread_id: &str,
paths: &[String],
conflict_repo: &Path,
) -> RecoveryAdvice {
let path_summary = if paths.is_empty() {
"merge conflict detected".to_string()
} else {
paths
.iter()
.take(12)
.cloned()
.collect::<Vec<_>>()
.join(", ")
};
let overflow = paths.len().saturating_sub(12);
let unsafe_condition = if overflow == 0 {
format!("{} conflicting path(s): {}", paths.len(), path_summary)
} else {
format!(
"{} conflicting path(s): {}, and {} more",
paths.len(),
path_summary,
overflow
)
};
let repo_arg = quote_recommended_action_arg(&conflict_repo.display().to_string());
let conflict_list_command = format!("heddle --repo {repo_arg} resolve --list");
let resolve_command = paths.first().map(|path| {
format!(
"heddle --repo {repo_arg} resolve {}",
quote_recommended_action_arg(path)
)
});
let continue_command = format!("heddle --repo {repo_arg} continue");
let preview_command = super::thread_landing::merge_preview_command(thread_id);
let mut recovery_commands = vec![conflict_list_command.clone()];
if let Some(resolve) = resolve_command.clone() {
recovery_commands.push(resolve);
}
recovery_commands.push(continue_command.clone());
recovery_commands.push(preview_command.clone());
RecoveryAdvice::safety_refusal(
"thread_refresh_conflicted",
format!(
"Thread '{thread_id}' could not be refreshed cleanly: {} conflicting path(s) ({path_summary})",
paths.len(),
),
format!(
"Refresh wrote conflict markers and merge state in {}. Inspect them with `{conflict_list_command}`, resolve the files, then run `{continue_command}`.",
conflict_repo.display()
),
unsafe_condition,
"refresh would need resolved file contents before advancing the thread ref",
"the thread ref was left unchanged; conflict markers and merge state were written to the thread checkout for inspection",
conflict_list_command,
recovery_commands,
)
}
fn quote_recommended_action_arg(value: &str) -> String {
if !value.is_empty()
&& value
.bytes()
.all(|b| b.is_ascii_alphanumeric() || matches!(b, b'/' | b'.' | b'_' | b'-' | b'+'))
{
value.to_string()
} else {
format!("\"{}\"", value.replace('\\', "\\\\").replace('"', "\\\""))
}
}
fn thread_refresh_requires_checkout_advice(
thread_id: &str,
current: Option<&str>,
) -> RecoveryAdvice {
let switch_command = format!("heddle switch {thread_id}");
let retry_command = format!("heddle thread refresh {thread_id}");
let current_summary = current
.map(|name| format!("current checkout is attached to '{name}'"))
.unwrap_or_else(|| "current checkout is not attached to a thread".to_string());
RecoveryAdvice::safety_refusal(
"thread_refresh_requires_checkout",
format!("Thread '{thread_id}' must be the current checkout before refresh"),
format!("Run `{switch_command}`, then `{retry_command}`."),
format!("thread '{thread_id}' has no dedicated checkout; {current_summary}"),
"refresh would update the requested thread checkout and cannot safely do that from another branch-like checkout",
"thread refs, checkout files, and Heddle metadata were left unchanged",
switch_command.clone(),
vec![switch_command, retry_command],
)
}
fn thread_refresh_checkout_unavailable_advice(
thread_id: &str,
path: &Path,
error: impl std::fmt::Display,
) -> RecoveryAdvice {
let inspect_command = format!("heddle thread show {thread_id}");
let switch_command = format!("heddle switch {thread_id}");
RecoveryAdvice::safety_refusal(
"thread_refresh_checkout_unavailable",
format!("Thread '{thread_id}' checkout could not be opened"),
format!(
"Inspect the recorded checkout with `{inspect_command}`. If this is a branch-like thread, run `{switch_command}` from the main checkout before refreshing."
),
format!(
"recorded checkout path '{}' is unavailable: {error}",
path.display()
),
"refresh needs a readable Heddle checkout before it can rebase or merge the thread state",
"thread refs, checkout files, and Heddle metadata were left unchanged",
inspect_command.clone(),
vec![inspect_command, switch_command],
)
}
pub(crate) fn thread_not_found_advice(thread_id: &str, action: &str) -> RecoveryAdvice {
RecoveryAdvice::safety_refusal(
"thread_not_found",
format!("Thread '{thread_id}' not found"),
"Inspect available threads with `heddle thread list`, then retry with an existing thread.",
format!("{action} was requested for missing thread '{thread_id}'"),
"the command cannot safely change or remove thread metadata that does not exist",
"no thread refs, checkout directories, mounts, or agent reservations were changed",
"heddle thread list",
vec!["heddle thread list".to_string()],
)
}
fn imported_git_ref_not_managed_thread_advice(thread_id: &str) -> RecoveryAdvice {
let reconcile_preview =
super::git_overlay_health::canonical_bridge_reconcile_ref_preview_command(None, thread_id);
RecoveryAdvice::safety_refusal(
"imported_git_ref_not_managed_thread",
format!("'{thread_id}' is an imported Git ref, not a managed Heddle thread"),
format!(
"Preview Git/Heddle reconciliation with `{reconcile_preview}`. Use managed threads for `ready` and `land`."
),
format!("thread ref '{thread_id}' exists, but no managed thread metadata exists for it"),
"ready/land require managed thread metadata and explicit integration authority; treating an imported Git ref as landable would be ambiguous",
"thread refs, Git refs, checkout files, and thread metadata were left unchanged",
reconcile_preview.clone(),
vec![reconcile_preview, "heddle thread list".to_string()],
)
}
fn current_thread_drop_advice(repo: &Repository, thread_id: &str) -> RecoveryAdvice {
let (primary, recovery, hint) =
super::thread::current_thread_drop_recovery(repo, thread_id, super::thread::DropMode::Drop);
RecoveryAdvice::safety_refusal(
"current_thread_not_droppable",
format!("Thread '{thread_id}' is the current checkout thread and cannot be dropped"),
hint,
format!("drop thread was requested for the attached checkout thread '{thread_id}'"),
"dropping the current checkout would remove the thread that owns this working tree",
"no thread refs, checkout directories, mounts, or agent reservations were changed",
primary,
recovery,
)
}
fn thread_cleanup_mode_required_advice() -> RecoveryAdvice {
RecoveryAdvice::safety_refusal(
"thread_cleanup_mode_required",
"heddle thread cleanup requires at least one mode flag: pass --merged to sweep merged threads, --auto --older-than <duration> to sweep stale auto-threads, or both.",
"Choose `heddle thread cleanup --merged --dry-run` or `heddle thread cleanup --auto --older-than 7d --dry-run` first.",
"thread cleanup was invoked without a mode flag",
"cleanup without an explicit mode could remove threads the user did not mean to sweep",
"no thread metadata, checkout directories, mounts, or reservations were changed",
"heddle thread cleanup --merged --dry-run",
vec![
"heddle thread cleanup --merged --dry-run".to_string(),
"heddle thread cleanup --auto --older-than 7d --dry-run".to_string(),
],
)
}
fn thread_cleanup_auto_required_advice() -> RecoveryAdvice {
RecoveryAdvice::safety_refusal(
"thread_cleanup_auto_required",
"--older-than only applies with --auto; pass --auto to sweep stale auto-threads.",
"Run `heddle thread cleanup --auto --older-than 7d --dry-run`, or remove `--older-than` when sweeping merged threads.",
"`--older-than` was provided without `--auto`",
"cleanup cannot apply age-based filtering unless it is restricted to auto-threads",
"no thread metadata, checkout directories, mounts, or reservations were changed",
"heddle thread cleanup --auto --older-than 7d --dry-run",
vec!["heddle thread cleanup --auto --older-than 7d --dry-run".to_string()],
)
}
fn thread_cleanup_older_than_required_advice() -> RecoveryAdvice {
RecoveryAdvice::safety_refusal(
"thread_cleanup_older_than_required",
"--auto requires --older-than <duration> (e.g. --older-than 7d) so the sweep does not drop a thread you just created.",
"Run `heddle thread cleanup --auto --older-than 7d --dry-run` before sweeping stale auto-threads.",
"`--auto` was provided without an age threshold",
"cleanup cannot distinguish stale auto-threads from freshly created auto-threads",
"no thread metadata, checkout directories, mounts, or reservations were changed",
"heddle thread cleanup --auto --older-than 7d --dry-run",
vec!["heddle thread cleanup --auto --older-than 7d --dry-run".to_string()],
)
}
fn thread_cleanup_duration_advice(spec: &str, detail: impl Into<String>) -> RecoveryAdvice {
let detail = detail.into();
RecoveryAdvice::safety_refusal(
"thread_cleanup_invalid_duration",
detail.clone(),
"Use a non-negative duration like `7d`, `24h`, `30m`, or bare seconds.",
format!("invalid --older-than duration '{spec}': {detail}"),
"cleanup cannot decide which auto-threads are stale without a valid duration",
"no thread metadata, checkout directories, mounts, or reservations were changed",
"heddle thread cleanup --auto --older-than 7d --dry-run",
vec!["heddle thread cleanup --auto --older-than 7d --dry-run".to_string()],
)
}
enum ThreeWayMergeRefresh {
Clean {
new_state: objects::object::ChangeId,
},
Conflicted {
tree: objects::object::Tree,
paths: Vec<String>,
ours: objects::object::ChangeId,
theirs: objects::object::ChangeId,
base: Option<objects::object::ChangeId>,
},
}
fn preflight_three_way_refresh_conflict(
parent_repo: &Repository,
thread_repo: &Repository,
thread: &Thread,
target_thread_name: &str,
) -> Result<Option<ThreeWayMergeRefresh>> {
use super::merge::{
ConflictLabels, MergeStrategy, ThreeWayMergeOutcome, try_three_way_merge_between_tips,
};
let target_tip = parent_repo
.refs()
.get_thread(&ThreadName::new(target_thread_name))?
.ok_or_else(|| {
anyhow!(thread_not_found_advice(
target_thread_name,
"preflight thread refresh",
))
})?;
let thread_tip = parent_repo
.refs()
.get_thread(&ThreadName::new(&thread.thread))?
.ok_or_else(|| {
anyhow!(
"managed thread '{}' is missing its ref during refresh preflight",
thread.thread
)
})?;
let current_label = format!("CURRENT ({})", thread.thread);
let incoming_label = format!("INCOMING ({})", target_thread_name);
match try_three_way_merge_between_tips(
parent_repo,
&thread_tip,
&target_tip,
ConflictLabels {
current: ¤t_label,
incoming: &incoming_label,
strategy: MergeStrategy::Semantic,
},
)? {
ThreeWayMergeOutcome::Conflicted { tree, paths, base } => {
let _ = thread_repo;
Ok(Some(ThreeWayMergeRefresh::Conflicted {
tree,
paths,
ours: thread_tip,
theirs: target_tip,
base: Some(base),
}))
}
ThreeWayMergeOutcome::Clean { .. }
| ThreeWayMergeOutcome::AlreadyIntegrated { .. }
| ThreeWayMergeOutcome::FastForward { .. } => Ok(None),
}
}
fn try_three_way_merge_refresh(
parent_repo: &Repository,
thread_repo: &Repository,
thread: &Thread,
target_thread_name: &str,
) -> Result<ThreeWayMergeRefresh> {
use objects::object::Attribution;
use super::merge::{
ConflictLabels, MergeStrategy, ThreeWayMergeOutcome, try_three_way_merge_between_tips,
};
let target_tip = parent_repo
.refs()
.get_thread(&ThreadName::new(target_thread_name))?
.ok_or_else(|| {
anyhow!(thread_not_found_advice(
target_thread_name,
"refresh thread",
))
})?;
let thread_tip = parent_repo
.refs()
.get_thread(&ThreadName::new(&thread.thread))?
.ok_or_else(|| {
anyhow!(
"managed thread '{}' is missing its ref during refresh",
thread.thread
)
})?;
let current_label = format!("CURRENT ({})", thread.thread);
let incoming_label = format!("INCOMING ({})", target_thread_name);
let outcome = try_three_way_merge_between_tips(
parent_repo,
&thread_tip,
&target_tip,
ConflictLabels {
current: ¤t_label,
incoming: &incoming_label,
strategy: MergeStrategy::Semantic,
},
)?;
match outcome {
ThreeWayMergeOutcome::AlreadyIntegrated { target } => {
Ok(ThreeWayMergeRefresh::Clean { new_state: target })
}
ThreeWayMergeOutcome::FastForward { target } => {
parent_repo
.refs()
.set_thread(&ThreadName::new(&thread.thread), &target)?;
let target_state = parent_repo
.store()
.get_state(&target)?
.ok_or_else(|| anyhow!("Target state not found during fast-forward refresh"))?;
let target_tree = parent_repo
.store()
.get_tree(&target_state.tree)?
.ok_or_else(|| anyhow!("Target tree not found during fast-forward refresh"))?;
super::merge::apply_merged_tree_external(thread_repo, &target_tree)?;
Ok(ThreeWayMergeRefresh::Clean { new_state: target })
}
ThreeWayMergeOutcome::Clean { tree } => {
super::merge::apply_merged_tree_external(thread_repo, &tree)?;
let attribution = Attribution::human(thread_repo.get_principal()?);
let new_state = thread_repo.snapshot_merge_with_attribution(
&target_tip,
Some(format!(
"Refresh thread '{}' onto '{}'",
thread.thread, target_thread_name
)),
None,
attribution,
None,
false,
)?;
parent_repo
.refs()
.set_thread(&ThreadName::new(&thread.thread), &new_state.change_id)?;
Ok(ThreeWayMergeRefresh::Clean {
new_state: new_state.change_id,
})
}
ThreeWayMergeOutcome::Conflicted { tree, paths, base } => {
Ok(ThreeWayMergeRefresh::Conflicted {
tree,
paths,
ours: thread_tip,
theirs: target_tip,
base: Some(base),
})
}
}
}
fn cmd_thread_promote(
cli: &Cli,
repo: &Repository,
thread_id: &str,
path: Option<PathBuf>,
force: bool,
) -> Result<()> {
let manager = thread_manager(repo);
let mut thread = manager
.load(thread_id)?
.ok_or_else(|| anyhow!(thread_not_found_advice(thread_id, "promote thread")))?;
let state_id = repo
.refs()
.get_thread(&ThreadName::new(&thread.thread))?
.ok_or_else(|| {
anyhow!(
"managed thread '{}' is missing its ref during promote",
thread.thread
)
})?;
if !force {
if thread.execution_path.exists()
&& thread.execution_path != *repo.root()
&& thread.execution_path.join(".heddle").exists()
{
match Repository::open(&thread.execution_path) {
Ok(guard_repo) => ensure_worktree_clean(&guard_repo, "promote thread")?,
Err(_) => ensure_worktree_clean(repo, "promote thread")?,
}
} else {
ensure_worktree_clean(repo, "promote thread")?;
}
}
if matches!(thread.mode, ThreadMode::Virtualized) {
mount_lifecycle::unmount_thread_if_mounted(thread_id);
if let Err(error) =
crate::cli::commands::daemon_client::unmount_via_daemon(repo.root(), thread_id)
{
tracing::warn!(
thread = thread_id,
%error,
"daemon unmount RPC failed during promote; continuing"
);
}
}
let using_default = path.is_none();
let target = path.unwrap_or_else(|| default_materialized_thread_path(repo, thread_id));
if using_default && matches!(thread.mode, ThreadMode::Materialized | ThreadMode::Solid) {
let existing = thread
.materialized_path
.clone()
.filter(|p| !p.as_os_str().is_empty())
.unwrap_or_else(|| thread.execution_path.clone());
if !existing.as_os_str().is_empty()
&& existing.join(".heddle").exists()
&& same_existing_dir(&existing, &target)
{
remove_path_recursively(&existing)?;
}
}
let abs_path = prepare_worktree_target(repo, &target, Some(thread_id))?.path;
write_isolated_checkout(repo, &abs_path, &state_id, Some(&thread.thread))?;
thread.mode = ThreadMode::Solid;
thread.state = ThreadState::Promoted;
thread.materialized_path = Some(abs_path.clone());
thread.updated_at = Utc::now();
manager.save(&thread)?;
print_thread_output(
cli,
repo,
OperatorAction::ThreadPromote,
thread,
format!(
"Promoted thread '{}' to '{}'",
thread_id,
abs_path.display()
),
)
}
pub(crate) fn cmd_thread_drop(
cli: &Cli,
repo: &Repository,
thread_id: &str,
delete_thread: bool,
force: bool,
) -> Result<()> {
let outcome = drop_thread_silent(repo, thread_id, delete_thread, force)?;
match outcome {
DropOutcome::Dropped(thread) => print_thread_output(
cli,
repo,
OperatorAction::ThreadDrop,
*thread,
format!("Dropped thread '{}'", thread_id),
),
DropOutcome::Deleted => cmd_thread_delete(cli, repo, thread_id.to_string()),
}
}
pub(crate) enum DropOutcome {
Dropped(Box<Thread>),
Deleted,
}
pub(crate) fn drop_thread_silent(
repo: &Repository,
thread_id: &str,
delete_thread: bool,
force: bool,
) -> Result<DropOutcome> {
let manager = thread_manager(repo);
let Some(mut thread) = manager.load(thread_id)? else {
if !delete_thread && repo.current_lane()?.as_deref() == Some(thread_id) {
return Err(anyhow!(current_thread_drop_advice(repo, thread_id)));
}
if delete_thread {
return Ok(DropOutcome::Deleted);
}
return Err(anyhow!(thread_not_found_advice(thread_id, "drop thread")));
};
if !force {
if thread.execution_path.exists()
&& thread.execution_path != *repo.root()
&& thread.execution_path.join(".heddle").exists()
{
match Repository::open(&thread.execution_path) {
Ok(guard_repo) => ensure_worktree_clean(&guard_repo, "drop thread")?,
Err(_) => ensure_worktree_clean(repo, "drop thread")?,
}
} else {
ensure_worktree_clean(repo, "drop thread")?;
}
}
if matches!(thread.mode, ThreadMode::Virtualized) {
mount_lifecycle::unmount_thread_if_mounted(thread_id);
if let Err(error) =
crate::cli::commands::daemon_client::unmount_via_daemon(repo.root(), thread_id)
{
tracing::warn!(thread = thread_id, %error, "daemon unmount RPC failed; continuing with drop");
}
}
if thread.execution_path.exists() {
remove_path_recursively(&thread.execution_path)?;
}
repo::thread_manifest::remove_thread_manifest_dir(repo.heddle_dir(), &thread.thread)?;
thread.state = ThreadState::Abandoned;
thread.updated_at = Utc::now();
manager.save(&thread)?;
let registry = AgentRegistry::new(repo.heddle_dir());
for entry in registry.list()? {
if entry.thread == thread.thread || entry.thread_id.as_deref() == Some(thread_id) {
registry.delete(&entry.session_id)?;
}
}
let tn = ThreadName::new(&thread.thread);
if delete_thread && repo.refs().get_thread(&tn)?.is_some() {
repo.refs().delete_thread(&tn)?;
}
Ok(DropOutcome::Dropped(Box::new(thread)))
}
fn print_thread_output(
cli: &Cli,
repo: &Repository,
action: OperatorAction,
mut thread: Thread,
message: String,
) -> Result<()> {
refresh_thread_freshness(repo, &mut thread)?;
let mut advice = describe_thread_advice(&thread, false, 0, false);
if matches!(thread.state, ThreadState::Abandoned) {
advice.blockers.clear();
advice.recommended_action.clear();
}
let operation = repo.operation_status()?;
let remote_tracking = repo.git_remote_tracking_status()?;
let import_hint = repo.git_overlay_import_hint()?;
let recommended_action = primary_next_action(
operation.as_ref(),
remote_tracking.as_ref(),
import_hint.as_ref(),
Some(&advice.recommended_action),
);
let recommended_action = normalized_action(recommended_action);
if should_output_json(cli, Some(repo.config())) {
println!(
"{}",
serde_json::to_string(&ThreadOutput {
operator: OperatorCommandOutput {
status: "completed".to_string(),
action,
message,
blockers: advice.blockers.clone(),
warnings: Vec::new(),
next_action: recommended_action.clone(),
recommended_action: recommended_action.clone(),
},
changed_path_count: thread.changed_paths.len(),
thread,
})?
);
} else {
println!("{}", message);
if !advice.blockers.is_empty() {
println!("Blockers:");
for blocker in &advice.blockers {
println!(" - {}", blocker);
}
}
if let Some(recommended_action) = &recommended_action {
print_next(recommended_action);
}
}
Ok(())
}
fn same_existing_dir(a: &Path, b: &Path) -> bool {
match (a.canonicalize(), b.canonicalize()) {
(Ok(ca), Ok(cb)) => ca == cb,
_ => a == b,
}
}
fn default_materialized_thread_path(repo: &Repository, thread_id: &str) -> PathBuf {
repo.managed_checkout_path(thread_id)
}
#[derive(Debug, Clone, Serialize)]
struct ThreadCleanupOutput {
#[serde(flatten)]
operator: OperatorCommandOutput,
dry_run: bool,
merged: Vec<DroppedThread>,
auto: Vec<DroppedThread>,
reclaimed_bytes: u64,
would_reclaim_bytes: u64,
#[serde(default, skip_serializing_if = "Vec::is_empty")]
skipped: Vec<SkippedThread>,
}
#[derive(Debug, Clone, Serialize)]
struct DroppedThread {
thread: String,
id: String,
reason: &'static str,
age_seconds: i64,
bytes: u64,
execution_path: Option<String>,
}
#[derive(Debug, Clone, Serialize)]
struct SkippedThread {
thread: String,
id: String,
reason: &'static str,
note: String,
}
fn parse_duration(spec: &str) -> Result<chrono::Duration> {
let trimmed = spec.trim();
if trimmed.is_empty() {
return Err(anyhow!(thread_cleanup_duration_advice(
spec,
"duration is empty"
)));
}
if let Ok(seconds) = trimmed.parse::<i64>() {
if seconds < 0 {
return Err(anyhow!(thread_cleanup_duration_advice(
spec,
"duration must be non-negative"
)));
}
return chrono::Duration::try_seconds(seconds).ok_or_else(|| {
anyhow!(thread_cleanup_duration_advice(
spec,
format!("duration overflow: '{spec}' exceeds chrono's range")
))
});
}
let (num_part, unit) = trimmed.split_at(trimmed.len() - 1);
let value: i64 = num_part.parse().map_err(|_| {
anyhow!(thread_cleanup_duration_advice(
spec,
format!(
"could not parse duration '{spec}' - expected a non-negative integer with optional suffix s/m/h/d/w (e.g. 7d, 24h)"
)
))
})?;
if value < 0 {
return Err(anyhow!(thread_cleanup_duration_advice(
spec,
"duration must be non-negative"
)));
}
let duration = match unit {
"s" => chrono::Duration::try_seconds(value),
"m" => chrono::Duration::try_minutes(value),
"h" => chrono::Duration::try_hours(value),
"d" => chrono::Duration::try_days(value),
"w" => chrono::Duration::try_weeks(value),
other => {
return Err(anyhow!(thread_cleanup_duration_advice(
spec,
format!(
"unknown duration unit '{other}' in '{spec}' - expected one of s, m, h, d, w"
)
)));
}
};
duration.ok_or_else(|| {
anyhow!(thread_cleanup_duration_advice(
spec,
format!("duration overflow: {spec}")
))
})
}
fn directory_size(path: &std::path::Path) -> u64 {
if !path.exists() {
return 0;
}
let mut total: u64 = 0;
let walker = match fs::read_dir(path) {
Ok(walker) => walker,
Err(err) => {
tracing::debug!(path = %path.display(), %err, "directory_size: read_dir failed");
return 0;
}
};
for entry in walker.flatten() {
let entry_path = entry.path();
match entry.file_type() {
Ok(ft) if ft.is_dir() => total += directory_size(&entry_path),
Ok(ft) if ft.is_file() => {
if let Ok(meta) = entry.metadata() {
total = total.saturating_add(meta.len());
}
}
_ => {}
}
}
total
}
fn format_bytes(bytes: u64) -> String {
const KB: f64 = 1024.0;
const MB: f64 = KB * 1024.0;
const GB: f64 = MB * 1024.0;
const TB: f64 = GB * 1024.0;
let b = bytes as f64;
if b >= TB {
format!("{:.1} TB", b / TB)
} else if b >= GB {
format!("{:.1} GB", b / GB)
} else if b >= MB {
format!("{:.1} MB", b / MB)
} else if b >= KB {
format!("{:.1} KB", b / KB)
} else {
format!("{} B", bytes)
}
}
fn cmd_thread_cleanup(cli: &Cli, repo: &Repository, args: ThreadCleanupArgs) -> Result<()> {
if !args.merged && !args.auto {
return Err(anyhow!(thread_cleanup_mode_required_advice()));
}
if args.older_than.is_some() && !args.auto {
return Err(anyhow!(thread_cleanup_auto_required_advice()));
}
let staleness_cutoff = if args.auto {
let spec = args
.older_than
.as_deref()
.ok_or_else(|| anyhow!(thread_cleanup_older_than_required_advice()))?;
Some(parse_duration(spec)?)
} else {
None
};
let manager = thread_manager(repo);
let threads = manager.list()?;
let now = Utc::now();
let active_thread_id: Option<String> = match current_thread(repo) {
Ok(Some(t)) => Some(t.id),
_ => None,
};
let mut merged_drops: Vec<(Thread, DroppedThread)> = Vec::new();
let mut auto_drops: Vec<(Thread, DroppedThread)> = Vec::new();
let mut skipped: Vec<SkippedThread> = Vec::new();
for thread in threads {
if matches!(thread.state, ThreadState::Abandoned) {
continue;
}
let age_seconds = (now - thread.updated_at).num_seconds().max(0);
let bytes = directory_size(&thread.execution_path);
let execution_path = thread
.execution_path
.to_str()
.map(ToString::to_string)
.filter(|s| !s.is_empty());
let is_active = active_thread_id
.as_deref()
.is_some_and(|id| id == thread.id);
if args.merged && matches!(thread.state, ThreadState::Merged) {
if is_active {
tracing::info!(
thread = %thread.thread,
"skipping cleanup of active thread (currently in use)"
);
skipped.push(SkippedThread {
thread: thread.thread.clone(),
id: thread.id.clone(),
reason: "active",
note:
"currently the active thread; would leave the user in a deleted directory"
.to_string(),
});
continue;
}
let dropped = DroppedThread {
thread: thread.thread.clone(),
id: thread.id.clone(),
reason: "merged",
age_seconds,
bytes,
execution_path: execution_path.clone(),
};
merged_drops.push((thread, dropped));
continue;
}
if args.auto && thread.auto {
if let Some(cutoff) = staleness_cutoff
&& age_seconds < cutoff.num_seconds()
{
continue;
}
if is_active {
tracing::info!(
thread = %thread.thread,
"skipping cleanup of active thread (currently in use)"
);
skipped.push(SkippedThread {
thread: thread.thread.clone(),
id: thread.id.clone(),
reason: "active",
note:
"currently the active thread; would leave the user in a deleted directory"
.to_string(),
});
continue;
}
let dropped = DroppedThread {
thread: thread.thread.clone(),
id: thread.id.clone(),
reason: "auto-stale",
age_seconds,
bytes,
execution_path,
};
auto_drops.push((thread, dropped));
}
}
let mut reclaimed_bytes: u64 = 0;
if !args.dry_run {
for (thread, dropped) in merged_drops.iter().chain(auto_drops.iter()) {
apply_thread_drop(repo, &manager, thread)?;
reclaimed_bytes = reclaimed_bytes.saturating_add(dropped.bytes);
}
}
let merged_summary: Vec<DroppedThread> = merged_drops.iter().map(|(_, d)| d.clone()).collect();
let auto_summary: Vec<DroppedThread> = auto_drops.iter().map(|(_, d)| d.clone()).collect();
let total_dropped = merged_summary.len() + auto_summary.len();
let would_reclaim: u64 = merged_summary
.iter()
.chain(auto_summary.iter())
.map(|d| d.bytes)
.sum();
let action_word = if args.dry_run {
"would drop"
} else {
"dropped"
};
let mut parts: Vec<String> = Vec::new();
if args.merged {
parts.push(format!(
"{} {} merged thread(s)",
action_word,
merged_summary.len()
));
}
if args.auto {
parts.push(format!(
"{} {} stale auto-thread(s)",
action_word,
auto_summary.len()
));
}
let bytes_for_message = if args.dry_run {
would_reclaim
} else {
reclaimed_bytes
};
let reclaim_word = if args.dry_run {
"would reclaim"
} else {
"reclaimed"
};
let summary_message = format!(
"{} ({} {})",
parts.join(", "),
reclaim_word,
format_bytes(bytes_for_message)
);
let output = ThreadCleanupOutput {
operator: OperatorCommandOutput {
status: "completed".to_string(),
action: OperatorAction::ThreadCleanup,
message: summary_message.clone(),
blockers: Vec::new(),
warnings: Vec::new(),
next_action: None,
recommended_action: None,
},
dry_run: args.dry_run,
merged: merged_summary.clone(),
auto: auto_summary.clone(),
reclaimed_bytes: if args.dry_run { 0 } else { reclaimed_bytes },
would_reclaim_bytes: would_reclaim,
skipped: skipped.clone(),
};
if should_output_json(cli, Some(repo.config())) {
println!("{}", serde_json::to_string(&output)?);
} else {
println!("{}", summary_message);
if total_dropped == 0 {
println!("No threads matched the cleanup criteria.");
} else {
for entry in merged_summary.iter().chain(auto_summary.iter()) {
println!(
" - {} ({}) [{}] {} age {}s",
entry.thread,
entry.id,
entry.reason,
format_bytes(entry.bytes),
entry.age_seconds,
);
}
}
for entry in &skipped {
println!(
" - {} ({}) [skipped: {}] {}",
entry.thread, entry.id, entry.reason, entry.note,
);
}
}
Ok(())
}
fn apply_thread_drop(repo: &Repository, manager: &ThreadManager, thread: &Thread) -> Result<()> {
if matches!(thread.mode, ThreadMode::Virtualized) {
mount_lifecycle::unmount_thread_if_mounted(&thread.thread);
if let Err(error) =
crate::cli::commands::daemon_client::unmount_via_daemon(repo.root(), &thread.thread)
{
tracing::warn!(
thread = %thread.thread,
%error,
"daemon unmount RPC failed during cleanup; continuing"
);
}
}
if thread.execution_path.exists() {
remove_path_recursively(&thread.execution_path)?;
}
repo::thread_manifest::remove_thread_manifest_dir(repo.heddle_dir(), &thread.thread)?;
let mut updated = thread.clone();
updated.state = ThreadState::Abandoned;
updated.updated_at = Utc::now();
manager.save(&updated)?;
let tn = ThreadName::new(&thread.thread);
if repo.refs().get_thread(&tn)?.is_some() {
repo.refs().delete_thread(&tn)?;
}
let registry = AgentRegistry::new(repo.heddle_dir());
for entry in registry.list()? {
if entry.thread == thread.thread || entry.thread_id.as_deref() == Some(&thread.id) {
registry.delete(&entry.session_id)?;
}
}
Ok(())
}
#[cfg(test)]
mod cleanup_tests {
use super::*;
#[test]
fn promote_default_path_matches_canonical_thread_dir() {
let dir = tempfile::TempDir::new().unwrap();
let repo = Repository::init_default(dir.path()).unwrap();
for id in ["foo", "foo/bar", "team@scope", "feature/foo"] {
let promote = default_materialized_thread_path(&repo, id);
let canonical = repo.managed_checkout_path(id);
assert_eq!(
promote, canonical,
"promote default must match the canonical thread_dir for {id:?}"
);
}
}
#[test]
fn parse_duration_handles_supported_units() {
assert_eq!(parse_duration("0").unwrap(), chrono::Duration::seconds(0));
assert_eq!(parse_duration("90").unwrap(), chrono::Duration::seconds(90));
assert_eq!(
parse_duration("30s").unwrap(),
chrono::Duration::seconds(30)
);
assert_eq!(
parse_duration("15m").unwrap(),
chrono::Duration::minutes(15)
);
assert_eq!(parse_duration("4h").unwrap(), chrono::Duration::hours(4));
assert_eq!(parse_duration("7d").unwrap(), chrono::Duration::days(7));
assert_eq!(parse_duration("2w").unwrap(), chrono::Duration::weeks(2));
}
#[test]
fn parse_duration_rejects_unknown_units_and_negatives() {
assert!(parse_duration("").is_err());
assert!(parse_duration("3y").is_err());
assert!(parse_duration("foo").is_err());
assert!(parse_duration("-5").is_err());
}
#[test]
fn parse_duration_rejects_overflow() {
let huge_weeks = format!("{}w", i64::MAX);
let err = parse_duration(&huge_weeks).expect_err("must reject overflow");
let msg = format!("{err}");
assert!(
msg.contains("overflow"),
"error should mention overflow; got: {msg}"
);
for suffix in ["d", "h", "m"] {
let huge = format!("{}{}", i64::MAX, suffix);
assert!(
parse_duration(&huge).is_err(),
"{suffix} overflow should error",
);
}
}
#[test]
fn format_bytes_picks_a_reasonable_unit() {
assert_eq!(format_bytes(0), "0 B");
assert_eq!(format_bytes(512), "512 B");
assert_eq!(format_bytes(2048), "2.0 KB");
assert_eq!(format_bytes(1024 * 1024 * 5), "5.0 MB");
}
#[test]
fn current_thread_ref_state_resolves_persisted_short_state() {
let dir = tempfile::TempDir::new().unwrap();
let repo = Repository::init_default(dir.path()).unwrap();
fs::write(dir.path().join("file.txt"), "content\n").unwrap();
let state = repo.snapshot(Some("content".to_string()), None).unwrap();
let thread = Thread {
id: "feature".to_string(),
thread: "feature".to_string(),
target_thread: Some("main".to_string()),
parent_thread: Some("main".to_string()),
mode: ThreadMode::Materialized,
state: ThreadState::Active,
base_state: state.change_id.short(),
base_root: state.tree.short(),
current_state: Some(state.change_id.short()),
merged_state: None,
task: None,
execution_path: dir.path().to_path_buf(),
materialized_path: None,
changed_paths: Vec::new(),
impact_categories: Vec::new(),
heavy_impact_paths: Vec::new(),
promotion_suggested: false,
freshness: ThreadFreshness::Unknown,
verification_summary: Default::default(),
confidence_summary: Default::default(),
integration_policy_result: Default::default(),
created_at: Utc::now(),
updated_at: Utc::now(),
ephemeral: None,
auto: false,
shared_target_dir: None,
};
let resolved = current_thread_ref_state(&repo, &thread).unwrap();
assert_eq!(
resolved, state.change_id,
"fallback must resolve thread-record short current_state values"
);
}
}