mod execution;
mod results;
mod tasks;
mod types;
use anyhow::Context;
use std::collections::HashSet;
use std::sync::Arc;
use anstyle::Style;
use color_print::cformat;
use crossbeam_channel as chan;
use dunce::canonicalize;
use once_cell::sync::OnceCell;
use rayon::prelude::*;
use worktrunk::git::{Repository, WorktreeInfo};
use worktrunk::styling::{
INFO_SYMBOL, eprintln, format_with_gutter, hint_message, warning_message,
};
use crate::commands::is_worktree_at_expected_path;
use super::model::{DisplayFields, ItemKind, ListItem, StatusSymbols, WorktreeData};
use super::progressive_table::ProgressiveTable;
pub(crate) use tasks::parse_port_from_url;
pub(crate) use types::TaskKind;
pub(crate) use execution::ExpectedResults;
use execution::{work_items_for_branch, work_items_for_worktree};
use results::drain_results;
use types::DrainOutcome;
use types::{TaskError, TaskResult};
/// Deferred final rendering of the list table: hand the finished rows to the
/// in-place progressive table when stdout is a real terminal, otherwise fall
/// back to a single buffered print.
struct TableRenderPlan {
    progressive_table: Option<ProgressiveTable>,
    header: String,
    rows: Vec<String>,
    summary: String,
}

impl TableRenderPlan {
    /// Consume the plan and emit the table exactly once.
    fn render(mut self) -> anyhow::Result<()> {
        if let Some(mut table) = self.progressive_table.take() {
            // Only a real TTY gets the in-place finalize path; a non-TTY
            // progressive table is dropped and we print buffered instead.
            if table.is_tty() {
                table.finalize(self.rows, self.summary)?;
                return Ok(());
            }
        }
        print_buffered_table(&self.header, &self.rows, &self.summary);
        Ok(())
    }
}
/// Print the whole table to stdout in one pass: header, each row on its own
/// line, a blank separator line, then the summary.
fn print_buffered_table(header: &str, rows: &[String], summary: &str) {
    // Assemble header + rows into a single buffer; the trailing '\n' plus the
    // newline inserted by the format string yields the blank separator line.
    let mut body = String::from(header);
    for row in rows {
        body.push('\n');
        body.push_str(row);
    }
    body.push('\n');
    println!("{body}\n{summary}");
}
#[derive(Clone, Default)]
pub struct CollectOptions {
pub skip_tasks: std::collections::HashSet<TaskKind>,
pub url_template: Option<String>,
pub llm_command: Option<String>,
pub stale_branches: std::collections::HashSet<String>,
}
/// Collect the set of branch names that already have a checked-out worktree.
/// Detached worktrees (no branch) contribute nothing.
fn worktree_branch_set(worktrees: &[WorktreeInfo]) -> HashSet<&str> {
    let mut branches = HashSet::new();
    for wt in worktrees {
        if let Some(branch) = wt.branch.as_deref() {
            branches.insert(branch);
        }
    }
    branches
}
/// How the branch/remote display flags and task limits were decided.
#[cfg_attr(not(unix), allow(dead_code))]
pub enum ShowConfig {
    /// Flags fully resolved by the caller before `collect` runs.
    Resolved {
        show_branches: bool,
        show_remotes: bool,
        // Task kinds to skip entirely.
        skip_tasks: HashSet<TaskKind>,
        // Per-task command timeout, if any.
        command_timeout: Option<std::time::Duration>,
        // Absolute deadline for draining all task results, if any.
        collect_deadline: Option<std::time::Instant>,
    },
    /// Only the raw CLI flags are known; the final values are resolved inside
    /// `collect`, overlapped with the initial parallel git queries (the
    /// config load is prefetched there).
    DeferredToParallel {
        cli_branches: bool,
        cli_remotes: bool,
        cli_full: bool,
    },
}
/// Top-level data collection for `wt list`.
///
/// Phases:
/// 1. prefetch independent git facts in parallel (worktrees, default branch,
///    URL template, branch lists, config warm-up),
/// 2. resolve display flags / task limits (possibly deferred to here),
/// 3. build sorted skeleton items and, if requested, render a skeleton table,
/// 4. fan all per-item git tasks out to a worker on the rayon pool and drain
///    results over a channel, progressively repainting changed rows,
/// 5. render the final table and report task errors.
///
/// Returns `Ok(None)` when the repo has no worktrees, or when one of the
/// `WORKTRUNK_SKELETON_ONLY` / `WORKTRUNK_FIRST_OUTPUT` env hooks stops
/// collection right after the skeleton render (presumably startup-latency
/// test hooks — TODO confirm).
pub fn collect(
    repo: &Repository,
    show_config: ShowConfig,
    show_progress: bool,
    render_table: bool,
    skip_expensive_for_stale: bool,
) -> anyhow::Result<Option<super::model::ListData>> {
    worktrunk::shell_exec::trace_instant("List collect started");
    // Decide which branch lists to prefetch. In the deferred case the final
    // `show_branches` flag isn't known until config loads below, so local
    // branches are fetched unconditionally (and possibly discarded later).
    let (fetch_branches, fetch_remotes) = match &show_config {
        ShowConfig::Resolved {
            show_branches,
            show_remotes,
            ..
        } => (*show_branches, *show_remotes),
        ShowConfig::DeferredToParallel { cli_remotes, .. } => {
            let fetch_branches = true;
            let fetch_remotes = *cli_remotes;
            (fetch_branches, fetch_remotes)
        }
    };
    // Results of the parallel prefetch; each cell is set by at most one spawn.
    let worktrees_cell: OnceCell<anyhow::Result<Vec<WorktreeInfo>>> = OnceCell::new();
    let default_branch_cell: OnceCell<Option<String>> = OnceCell::new();
    let url_template_cell: OnceCell<Option<String>> = OnceCell::new();
    let local_branches_cell: OnceCell<anyhow::Result<Vec<(String, String)>>> = OnceCell::new();
    let remote_branches_cell: OnceCell<anyhow::Result<Vec<(String, String)>>> = OnceCell::new();
    rayon::scope(|s| {
        s.spawn(|_| {
            let _ = worktrees_cell.set(repo.list_worktrees());
        });
        s.spawn(|_| {
            let _ = default_branch_cell.set(repo.default_branch());
        });
        // The discarded `is_bare()` / `config()` calls presumably just warm
        // Repository-internal caches so later synchronous calls are cheap —
        // TODO confirm Repository memoizes them.
        s.spawn(|_| {
            let _ = repo.is_bare();
        });
        s.spawn(|_| {
            let _ = url_template_cell.set(repo.url_template());
        });
        s.spawn(|_| {
            let _ = repo.config();
        });
        s.spawn(|_| {
            if fetch_branches {
                let _ = local_branches_cell.set(repo.list_local_branches());
            }
        });
        s.spawn(|_| {
            if fetch_remotes {
                let _ = remote_branches_cell.set(repo.list_untracked_remote_branches());
            }
        });
    });
    // The scope joined all spawns, so this cell is guaranteed to be set.
    let worktrees = worktrees_cell
        .into_inner()
        .unwrap()
        .context("Failed to list worktrees")?;
    if worktrees.is_empty() {
        return Ok(None);
    }
    let default_branch = default_branch_cell.into_inner().unwrap();
    let url_template = url_template_cell.into_inner().unwrap();
    // Resolve display flags and task limits; the deferred arm reads the
    // (already prefetched) repo config.
    let (show_branches, show_remotes, skip_tasks, command_timeout, collect_deadline) =
        match show_config {
            ShowConfig::Resolved {
                show_branches,
                show_remotes,
                skip_tasks,
                command_timeout,
                collect_deadline,
            } => (
                show_branches,
                show_remotes,
                skip_tasks,
                command_timeout,
                collect_deadline,
            ),
            ShowConfig::DeferredToParallel {
                cli_branches,
                cli_remotes,
                cli_full,
            } => {
                let config = repo.config();
                // CLI flags win; config supplies the defaults.
                let show_branches = cli_branches || config.list.branches();
                let show_remotes = cli_remotes || config.list.remotes();
                let show_full = cli_full || config.list.full();
                // Without --full, skip the expensive per-item tasks.
                let skip_tasks: HashSet<TaskKind> = if show_full {
                    HashSet::new()
                } else {
                    [
                        TaskKind::BranchDiff,
                        TaskKind::CiStatus,
                        TaskKind::SummaryGenerate,
                    ]
                    .into_iter()
                    .collect()
                };
                // --full also disables both timeout mechanisms.
                let (command_timeout, collect_deadline) = if show_full {
                    (None, None)
                } else {
                    let task_timeout = config.list.task_timeout();
                    let deadline = config.list.timeout().map(|d| std::time::Instant::now() + d);
                    (task_timeout, deadline)
                };
                (
                    show_branches,
                    show_remotes,
                    skip_tasks,
                    command_timeout,
                    collect_deadline,
                )
            }
        };
    // Local branches that have no worktree. The synchronous fallback is
    // defensive: with the current flag logic the cell is always set whenever
    // show_branches is true.
    let branches_without_worktrees = if show_branches {
        let all_local = if let Some(result) = local_branches_cell.into_inner() {
            result?
        } else {
            repo.list_local_branches()?
        };
        let worktree_branches = worktree_branch_set(&worktrees);
        all_local
            .into_iter()
            .filter(|(name, _)| !worktree_branches.contains(name.as_str()))
            .collect()
    } else {
        Vec::new()
    };
    let remote_branches = if show_remotes {
        if let Some(result) = remote_branches_cell.into_inner() {
            result?
        } else {
            repo.list_untracked_remote_branches()?
        }
    } else {
        Vec::new()
    };
    // Identify the worktree we're running from by canonical-path comparison
    // (git-reported paths may differ from cwd in symlinks/case).
    let current_worktree_path = repo.current_worktree().root().ok().and_then(|root| {
        worktrees
            .iter()
            .find(|wt| canonicalize(&wt.path).map(|p| p == root).unwrap_or(false))
            .map(|wt| wt.path.clone())
    });
    // Warn early if the configured default branch no longer exists locally.
    if let Some(configured) = repo.invalid_default_branch_config() {
        let msg =
            cformat!("Configured default branch <bold>{configured}</> does not exist locally");
        eprintln!("{}", warning_message(msg));
        let hint = cformat!("To reset, run <underline>wt config state default-branch clear</>");
        eprintln!("{}", hint_message(hint));
    }
    // The "main" worktree: the primary one when resolvable, otherwise the
    // first non-prunable entry.
    let primary_path = repo.primary_worktree()?;
    let main_worktree = primary_path
        .as_ref()
        .and_then(|p| worktrees.iter().find(|wt| wt.path == *p))
        .or_else(|| worktrees.iter().find(|wt| !wt.is_prunable()))
        .cloned()
        .ok_or_else(|| anyhow::anyhow!("No worktrees found"))?;
    // Batch-fetch commit timestamps for every SHA we sort by; the null OID
    // (e.g. unborn HEAD) has no commit to look up.
    let all_shas: Vec<&str> = worktrees
        .iter()
        .map(|wt| wt.head.as_str())
        .chain(
            branches_without_worktrees
                .iter()
                .map(|(_, sha)| sha.as_str()),
        )
        .chain(remote_branches.iter().map(|(_, sha)| sha.as_str()))
        .filter(|sha| *sha != worktrunk::git::NULL_OID)
        .collect();
    let timestamps = repo.commit_timestamps(&all_shas).unwrap_or_default();
    // Display order: current worktree first, then main, then newest-first.
    let sorted_worktrees = sort_worktrees_with_cache(
        worktrees.clone(),
        &main_worktree,
        current_worktree_path.as_ref(),
        &timestamps,
    );
    let branches_without_worktrees =
        sort_by_timestamp_desc_with_cache(branches_without_worktrees, &timestamps, |(_, sha)| {
            sha.as_str()
        });
    let remote_branches =
        sort_by_timestamp_desc_with_cache(remote_branches, &timestamps, |(_, sha)| sha.as_str());
    let main_worktree_canonical = canonicalize(&main_worktree.path).ok();
    // Build skeleton items for every worktree; task results fill the `None`
    // fields in later. (Mirrors build_worktree_item, plus the mismatch flag.)
    let mut all_items: Vec<ListItem> = sorted_worktrees
        .iter()
        .map(|wt| {
            let wt_canonical = canonicalize(&wt.path).ok();
            let is_main = match (&wt_canonical, &main_worktree_canonical) {
                (Some(wt_c), Some(main_c)) => wt_c == main_c,
                // Canonicalization failed (e.g. path vanished): fall back to
                // raw path equality.
                _ => wt.path == main_worktree.path,
            };
            let is_current = current_worktree_path
                .as_ref()
                .is_some_and(|cp| wt_canonical.as_ref() == Some(cp));
            // Previous-branch marking happens later, once switch_previous()
            // has been fetched.
            let is_previous = false;
            let branch_worktree_mismatch =
                !is_worktree_at_expected_path(wt, repo, repo.user_config());
            let mut worktree_data =
                WorktreeData::from_worktree(wt, is_main, is_current, is_previous);
            worktree_data.branch_worktree_mismatch = branch_worktree_mismatch;
            ListItem {
                head: wt.head.clone(),
                branch: wt.branch.clone(),
                // All fields below are populated asynchronously by tasks.
                commit: None,
                counts: None,
                branch_diff: None,
                committed_trees_match: None,
                has_file_changes: None,
                would_merge_add: None,
                is_patch_id_match: None,
                is_ancestor: None,
                is_orphan: None,
                upstream: None,
                pr_status: None,
                url: None,
                url_active: None,
                summary: None,
                has_merge_tree_conflicts: None,
                user_marker: None,
                status_symbols: StatusSymbols::default(),
                display: DisplayFields::default(),
                kind: ItemKind::Worktree(Box::new(worktree_data)),
            }
        })
        .collect();
    // Branch items are appended after worktrees; remember the offsets so task
    // results can be routed back by item index.
    let branch_start_idx = all_items.len();
    all_items.extend(
        branches_without_worktrees
            .iter()
            .map(|(name, sha)| ListItem::new_branch(sha.clone(), name.clone())),
    );
    let remote_start_idx = all_items.len();
    all_items.extend(
        remote_branches
            .iter()
            .map(|(name, sha)| ListItem::new_branch(sha.clone(), name.clone())),
    );
    // Additionally skip tasks that cannot produce anything in this repo.
    let mut effective_skip_tasks = skip_tasks.clone();
    if url_template.is_none() {
        effective_skip_tasks.insert(TaskKind::UrlStatus);
    }
    let config = repo.config();
    let llm_command = config.commit_generation.command.clone();
    if !config.list.summary() || llm_command.is_none() {
        effective_skip_tasks.insert(TaskKind::SummaryGenerate);
    }
    let layout = super::layout::calculate_layout_from_basics(
        &all_items,
        &effective_skip_tasks,
        &main_worktree.path,
        url_template.as_deref(),
    );
    let max_width = crate::display::terminal_width();
    // Returned to the caller so follow-up refreshes skip the same tasks.
    let returned_skip_tasks = effective_skip_tasks.clone();
    let mut options = CollectOptions {
        skip_tasks: effective_skip_tasks,
        url_template: url_template.clone(),
        llm_command,
        ..Default::default()
    };
    let expected_results = std::sync::Arc::new(ExpectedResults::default());
    let num_worktrees = all_items
        .iter()
        .filter(|item| item.worktree_data().is_some())
        .count();
    let num_local_branches = branches_without_worktrees.len();
    let num_remote_branches = remote_branches.len();
    // Footer text, e.g. "Showing 3 worktrees, 2 branches".
    let footer_base =
        if (show_branches && num_local_branches > 0) || (show_remotes && num_remote_branches > 0) {
            let mut parts = vec![format!("{} worktrees", num_worktrees)];
            if show_branches && num_local_branches > 0 {
                parts.push(format!("{} branches", num_local_branches));
            }
            if show_remotes && num_remote_branches > 0 {
                parts.push(format!("{} remote branches", num_remote_branches));
            }
            format!("Showing {}", parts.join(", "))
        } else {
            let plural = if num_worktrees == 1 { "" } else { "s" };
            format!("Showing {} worktree{}", num_worktrees, plural)
        };
    // Render a skeleton table immediately so the user sees structure before
    // any task results arrive.
    let mut progressive_table = if show_progress {
        let dim = Style::new().dimmed();
        let skeletons: Vec<String> = all_items
            .iter()
            .map(|item| layout.render_skeleton_row(item).render())
            .collect();
        let initial_footer = format!("{INFO_SYMBOL} {dim}{footer_base} (loading...){dim:#}");
        let mut table = ProgressiveTable::new(
            layout.format_header_line(),
            skeletons,
            initial_footer,
            max_width,
        );
        table.render_skeleton()?;
        worktrunk::shell_exec::trace_instant("Skeleton rendered");
        Some(table)
    } else {
        None
    };
    // Env hooks: bail out right after the skeleton/first output — presumably
    // used for startup-latency measurement; TODO confirm.
    if std::env::var_os("WORKTRUNK_SKELETON_ONLY").is_some()
        || std::env::var_os("WORKTRUNK_FIRST_OUTPUT").is_some()
    {
        return Ok(None);
    }
    // macOS only: pre-start the builtin fsmonitor daemon in each live
    // worktree so later status queries don't pay its startup cost.
    #[cfg(target_os = "macos")]
    let fsmonitor_worktrees: Vec<_> = if repo.is_builtin_fsmonitor_enabled() {
        sorted_worktrees
            .iter()
            .filter(|wt| !wt.is_prunable())
            .collect()
    } else {
        vec![]
    };
    #[cfg(not(target_os = "macos"))]
    let fsmonitor_worktrees: Vec<&WorktreeInfo> = vec![];
    let previous_branch_cell: OnceCell<Option<String>> = OnceCell::new();
    let integration_target_cell: OnceCell<Option<String>> = OnceCell::new();
    rayon::scope(|s| {
        s.spawn(|_| {
            let _ = previous_branch_cell.set(repo.switch_previous());
        });
        s.spawn(|_| {
            let _ = integration_target_cell.set(repo.integration_target());
        });
        for wt in &fsmonitor_worktrees {
            s.spawn(|_| {
                repo.start_fsmonitor_daemon_at(&wt.path);
            });
        }
    });
    let previous_branch = previous_branch_cell.into_inner().flatten();
    let integration_target = integration_target_cell.into_inner().flatten();
    // Mark the item holding the previously-switched-to branch.
    if let Some(prev) = previous_branch.as_deref() {
        for item in &mut all_items {
            if item.branch.as_deref() == Some(prev)
                && let Some(wt_data) = item.worktree_data_mut()
            {
                wt_data.is_previous = true;
            }
        }
    }
    // Optionally flag branches far behind the default branch as stale; task
    // spawning can then skip expensive work for them.
    if skip_expensive_for_stale && let Some(ref db) = default_branch {
        // Env override exists for tests.
        let threshold: usize = std::env::var("WORKTRUNK_TEST_SKIP_EXPENSIVE_THRESHOLD")
            .ok()
            .and_then(|s| s.parse().ok())
            .unwrap_or(50);
        let ahead_behind = repo.batch_ahead_behind(db);
        options.stale_branches = ahead_behind
            .into_iter()
            .filter_map(|(branch, (_, behind))| (behind > threshold).then_some(branch))
            .collect();
    }
    // Per-row cache of the last rendered line, to skip redundant repaints.
    let mut last_rendered_lines: Vec<String> = vec![String::new(); all_items.len()];
    let (tx, rx) = chan::unbounded::<Result<TaskResult, TaskError>>();
    let mut errors: Vec<TaskError> = Vec::new();
    // (item index, branch name, sha, is_remote) for every branch row.
    let branch_data: Vec<(usize, String, String, bool)> =
        if show_branches || show_remotes {
            let mut all_branches = Vec::new();
            if show_branches {
                all_branches.extend(branches_without_worktrees.iter().enumerate().map(
                    |(idx, (name, sha))| (branch_start_idx + idx, name.clone(), sha.clone(), false),
                ));
            }
            if show_remotes {
                all_branches.extend(remote_branches.iter().enumerate().map(
                    |(idx, (name, sha))| (remote_start_idx + idx, name.clone(), sha.clone(), true),
                ));
            }
            all_branches
        } else {
            Vec::new()
        };
    // Gather every task to run: worktree items first, then branch items.
    let mut all_work_items = Vec::new();
    for (idx, wt) in sorted_worktrees.iter().enumerate() {
        all_work_items.extend(work_items_for_worktree(
            repo,
            wt,
            idx,
            &options,
            &expected_results,
            &tx,
            &mut all_items[idx],
        ));
    }
    for (item_idx, branch_name, commit_sha, is_remote) in &branch_data {
        all_work_items.extend(work_items_for_branch(
            repo,
            execution::BranchSpawn {
                name: branch_name,
                commit_sha,
                item_idx: *item_idx,
                is_remote: *is_remote,
            },
            &options,
            &expected_results,
            &mut all_items[*item_idx],
        ));
    }
    // Run local tasks before network ones (false sorts before true; stable).
    all_work_items.sort_by_key(|item| item.kind.is_network());
    let tx_worker = tx.clone();
    worktrunk::shell_exec::trace_instant("Spawning worker thread");
    // Detached worker: executes all tasks on the rayon pool and streams
    // results back over the channel; its tx clone drops when it finishes.
    std::thread::spawn(move || {
        worktrunk::shell_exec::trace_instant("Parallel execution started");
        all_work_items.into_par_iter().for_each(|item| {
            worktrunk::shell_exec::set_command_timeout(command_timeout);
            let result = item.execute();
            let _ = tx_worker.send(result);
        });
    });
    // Drop our sender so `rx` disconnects once the worker is done.
    drop(tx);
    let mut completed_results = 0;
    let mut progress_overflow = false;
    let mut first_result_traced = false;
    let drain_deadline =
        collect_deadline.unwrap_or_else(|| std::time::Instant::now() + results::DRAIN_TIMEOUT);
    // Drain task results, repainting changed rows and the progress footer.
    let drain_outcome = drain_results(
        rx,
        &mut all_items,
        &mut errors,
        &expected_results,
        drain_deadline,
        integration_target.as_deref(),
        |item_idx, item| {
            if !first_result_traced {
                first_result_traced = true;
                worktrunk::shell_exec::trace_instant("First result received");
            }
            if let Some(ref mut table) = progressive_table {
                let dim = Style::new().dimmed();
                completed_results += 1;
                let total_results = expected_results.count();
                // More results than registered expectations is a task-spawn
                // bug: debug builds assert, release builds surface it as a
                // warning after the drain (progress_overflow).
                debug_assert!(
                    completed_results <= total_results,
                    "completed ({completed_results}) > expected ({total_results}): \
                     task result sent without registering expectation"
                );
                if completed_results > total_results {
                    progress_overflow = true;
                }
                let footer_msg = format!(
                    "{INFO_SYMBOL} {dim}{footer_base} ({completed_results}/{total_results} loaded){dim:#}"
                );
                table.update_footer(footer_msg);
                // Only repaint rows whose rendered text actually changed.
                let rendered = layout.format_list_item_line(item);
                if rendered != last_rendered_lines[item_idx] {
                    last_rendered_lines[item_idx] = rendered.clone();
                    table.update_row(item_idx, rendered);
                }
                if let Err(e) = table.flush() {
                    log::debug!("Progressive table flush failed: {}", e);
                }
            }
        },
    );
    worktrunk::shell_exec::trace_instant("All results drained");
    // Only report a drain timeout when it came from the fallback
    // DRAIN_TIMEOUT; a user-configured collect_deadline cutting collection
    // short is expected behavior, not a hang.
    if collect_deadline.is_none()
        && let DrainOutcome::TimedOut {
            received_count,
            items_with_missing,
        } = drain_outcome
    {
        let mut diag = format!(
            "wt list timed out after {}s ({received_count} results received)",
            results::DRAIN_TIMEOUT.as_secs()
        );
        if !items_with_missing.is_empty() {
            diag.push_str("\nBlocked tasks:");
            // Show at most five blocked items to keep the diagnostic short.
            let missing_lines: Vec<String> = items_with_missing
                .iter()
                .take(5)
                .map(|result| {
                    let missing_names: Vec<&str> =
                        result.missing_kinds.iter().map(|k| k.into()).collect();
                    cformat!("<bold>{}</>: {}", result.name, missing_names.join(", "))
                })
                .collect();
            diag.push_str(&format!(
                "\n{}",
                format_with_gutter(&missing_lines.join("\n"), None)
            ));
        }
        eprintln!("{}", warning_message(&diag));
        eprintln!(
            "{}",
            hint_message(cformat!(
                "A git command likely hung; run <underline>wt list -v</> for details or <underline>wt list -vv</> to create a diagnostic file"
            ))
        );
    }
    // Recompute status symbols now that all task data has landed.
    for item in all_items.iter_mut() {
        item.refresh_status_symbols(integration_target.as_deref());
    }
    let error_count = errors.len();
    let timed_out_count = errors.iter().filter(|e| e.is_timeout()).count();
    let table_render = render_table.then(|| TableRenderPlan {
        progressive_table,
        header: layout.format_header_line(),
        rows: all_items
            .iter()
            .map(|item| layout.format_list_item_line(item))
            .collect(),
        summary: super::format_summary_message(
            &all_items,
            show_branches || show_remotes,
            layout.hidden_column_count,
            error_count,
            timed_out_count,
        ),
    });
    if let Some(table_render) = table_render {
        table_render.render()?;
    }
    // Timeouts are already reflected in the summary; only surface hard errors
    // (and any progress-counter overflow) as warnings.
    let non_timeout_errors: Vec<_> = errors.iter().filter(|e| !e.is_timeout()).collect();
    if !non_timeout_errors.is_empty() || progress_overflow {
        let mut warning_parts = Vec::new();
        if !non_timeout_errors.is_empty() {
            let mut sorted_errors = non_timeout_errors;
            sorted_errors.sort_by_key(|e| (e.item_idx, e.kind));
            let error_lines: Vec<String> = sorted_errors
                .iter()
                .map(|error| {
                    let name = all_items[error.item_idx].branch_name();
                    let kind_str: &'static str = error.kind.into();
                    // First line only; task error messages can be multi-line.
                    let msg = error.message.lines().next().unwrap_or(&error.message);
                    cformat!("<bold>{}</>: {} ({})", name, kind_str, msg)
                })
                .collect();
            warning_parts.push(format!(
                "Some git operations failed:\n{}",
                format_with_gutter(&error_lines.join("\n"), None)
            ));
        }
        if progress_overflow {
            warning_parts.push("Progress counter overflow (completed > expected)".to_string());
        }
        let warning = warning_parts.join("\n");
        eprintln!("{}", warning_message(&warning));
        eprintln!("{}", hint_message(crate::diagnostic::issue_hint()));
    }
    for item in &mut all_items {
        item.finalize_display();
    }
    let items = all_items;
    worktrunk::shell_exec::trace_instant("List collect complete");
    Ok(Some(super::model::ListData {
        items,
        main_worktree_path: main_worktree.path.clone(),
        skip_tasks: returned_skip_tasks,
    }))
}
/// Sort `items` newest-first by the commit timestamp of the SHA that
/// `get_sha` extracts, using the pre-fetched `timestamps` cache.
///
/// SHAs missing from the cache sort as timestamp 0, i.e. last. The sort is
/// stable, so items with equal timestamps keep their incoming relative order.
fn sort_by_timestamp_desc_with_cache<T, F>(
    mut items: Vec<T>,
    timestamps: &std::collections::HashMap<String, i64>,
    get_sha: F,
) -> Vec<T>
where
    F: Fn(&T) -> &str,
{
    // `sort_by_cached_key` computes each key exactly once — same guarantee as
    // the previous hand-rolled decorate/sort/undecorate tuple Vec, but
    // without allocating and rebuilding an intermediate vector. It is stable,
    // like `sort_by_key`.
    items.sort_by_cached_key(|item| std::cmp::Reverse(*timestamps.get(get_sha(item)).unwrap_or(&0)));
    items
}
/// Order worktrees for display: the current worktree first, then the main
/// worktree, then everything else newest-commit-first. SHAs missing from the
/// `timestamps` cache sort as timestamp 0 (oldest). Stable for ties.
fn sort_worktrees_with_cache(
    worktrees: Vec<WorktreeInfo>,
    main_worktree: &WorktreeInfo,
    current_path: Option<&std::path::PathBuf>,
    timestamps: &std::collections::HashMap<String, i64>,
) -> Vec<WorktreeInfo> {
    let mut sorted = worktrees;
    // Stable in-place sort; the cached key is (priority bucket, newest-first
    // timestamp), computed once per worktree.
    sorted.sort_by_cached_key(|wt| {
        let priority = if current_path.is_some_and(|cp| &wt.path == cp) {
            0
        } else if wt.path == main_worktree.path {
            1
        } else {
            2
        };
        (priority, std::cmp::Reverse(*timestamps.get(&wt.head).unwrap_or(&0)))
    });
    sorted
}
/// Build a bare `ListItem` for a worktree with every lazily-computed field
/// unset; task results fill those fields in later (see `populate_item`).
///
/// The `is_main` / `is_current` / `is_previous` flags are passed through to
/// the embedded `WorktreeData`.
pub fn build_worktree_item(
    wt: &WorktreeInfo,
    is_main: bool,
    is_current: bool,
    is_previous: bool,
) -> ListItem {
    ListItem {
        head: wt.head.clone(),
        branch: wt.branch.clone(),
        // All fields below are populated asynchronously by tasks.
        commit: None,
        counts: None,
        branch_diff: None,
        committed_trees_match: None,
        has_file_changes: None,
        would_merge_add: None,
        is_patch_id_match: None,
        is_ancestor: None,
        is_orphan: None,
        upstream: None,
        pr_status: None,
        url: None,
        url_active: None,
        summary: None,
        has_merge_tree_conflicts: None,
        user_marker: None,
        status_symbols: StatusSymbols::default(),
        display: DisplayFields::default(),
        kind: ItemKind::Worktree(Box::new(WorktreeData::from_worktree(
            wt,
            is_main,
            is_current,
            is_previous,
        ))),
    }
}
/// Synchronously (re)populate a single worktree `ListItem` in place by
/// spawning its tasks and draining all results before returning.
///
/// Branch items (no worktree data) are returned untouched. Task errors and
/// drain timeouts are logged, never propagated — the item is finalized with
/// whatever data arrived.
pub fn populate_item(
    repo: &Repository,
    item: &mut ListItem,
    options: CollectOptions,
) -> anyhow::Result<()> {
    let Some(data) = item.worktree_data() else {
        return Ok(());
    };
    let target = repo.integration_target();
    let (tx, rx) = chan::unbounded::<Result<TaskResult, TaskError>>();
    let expected_results = Arc::new(ExpectedResults::default());
    let mut errors: Vec<TaskError> = Vec::new();
    // Reconstruct a WorktreeInfo from the item; bare/detached/locked/prunable
    // state isn't carried on ListItem, so those fields default to unset.
    let wt = WorktreeInfo {
        path: data.path.clone(),
        head: item.head.clone(),
        branch: item.branch.clone(),
        bare: false,
        detached: false,
        locked: None,
        prunable: None,
    };
    // Single item, so its index is always 0.
    let mut work_items = work_items_for_worktree(
        repo,
        &wt,
        0, &options,
        &expected_results,
        &tx,
        item,
    );
    // Run local tasks before network ones (false sorts before true).
    work_items.sort_by_key(|w| w.kind.is_network());
    // Worker thread executes the tasks on the rayon pool; moving `tx` into
    // the closure means the channel disconnects once the worker finishes.
    std::thread::spawn(move || {
        work_items.into_par_iter().for_each(|w| {
            let result = w.execute();
            let _ = tx.send(result);
        });
    });
    // Drain with the fallback timeout and no progress callback.
    let drain_outcome = drain_results(
        rx,
        std::slice::from_mut(item),
        &mut errors,
        &expected_results,
        std::time::Instant::now() + results::DRAIN_TIMEOUT,
        target.as_deref(),
        |_item_idx, _item| {},
    );
    if let DrainOutcome::TimedOut { received_count, .. } = drain_outcome {
        log::warn!(
            "populate_item timed out after {}s ({received_count} results received)",
            results::DRAIN_TIMEOUT.as_secs()
        );
    }
    if !errors.is_empty() {
        log::warn!("populate_item had {} task errors", errors.len());
        for error in &errors {
            let kind_str: &'static str = error.kind.into();
            log::debug!(
                " - item {}: {} ({})",
                error.item_idx,
                kind_str,
                error.message
            );
        }
    }
    item.refresh_status_symbols(target.as_deref());
    item.finalize_display();
    Ok(())
}