use std::net::{SocketAddr, TcpStream};
use std::time::Duration;
use anyhow::Context;
use worktrunk::git::{LineDiff, Repository};
use super::super::ci_status::{CiBranchName, PrStatus};
use super::super::model::{
ActiveGitOperation, AheadBehind, BranchDiffTotals, CommitDetails, UpstreamStatus,
WorkingTreeStatus,
};
use super::types::{ErrorCause, TaskError, TaskKind, TaskResult};
#[derive(Clone)]
pub struct TaskContext {
pub repo: Repository,
pub branch_ref: worktrunk::git::BranchRef,
pub item_idx: usize,
pub item_url: Option<String>,
pub llm_command: Option<String>,
}
impl TaskContext {
pub(super) fn error(&self, kind: TaskKind, err: &anyhow::Error) -> TaskError {
let is_timeout = err.chain().any(|e| {
e.downcast_ref::<std::io::Error>()
.is_some_and(|io_err| io_err.kind() == std::io::ErrorKind::TimedOut)
});
let cause = if is_timeout {
let kind_str: &'static str = kind.into();
let sha = &self.branch_ref.commit_sha;
let short_sha = &sha[..sha.len().min(8)];
let branch = self.branch_ref.branch.as_deref().unwrap_or(short_sha);
log::debug!("Task {} timed out for {}", kind_str, branch);
ErrorCause::Timeout
} else {
ErrorCause::Other
};
TaskError::new(self.item_idx, kind, err.to_string(), cause)
}
pub(super) fn default_branch(&self) -> Option<String> {
self.repo.default_branch()
}
pub(super) fn integration_target(&self) -> Option<String> {
self.repo.integration_target()
}
}
pub trait Task: Send + Sync + 'static {
const KIND: TaskKind;
fn compute(ctx: TaskContext) -> Result<TaskResult, TaskError>;
}
pub struct CommitDetailsTask;
impl Task for CommitDetailsTask {
const KIND: TaskKind = TaskKind::CommitDetails;
fn compute(ctx: TaskContext) -> Result<TaskResult, TaskError> {
let repo = &ctx.repo;
let (timestamp, commit_message) = repo
.commit_details(&ctx.branch_ref.commit_sha)
.map_err(|e| ctx.error(Self::KIND, &e))?;
Ok(TaskResult::CommitDetails {
item_idx: ctx.item_idx,
commit: CommitDetails {
timestamp,
commit_message,
},
})
}
}
pub struct AheadBehindTask;
impl Task for AheadBehindTask {
const KIND: TaskKind = TaskKind::AheadBehind;
fn compute(ctx: TaskContext) -> Result<TaskResult, TaskError> {
let Some(base) = ctx.default_branch() else {
return Ok(TaskResult::AheadBehind {
item_idx: ctx.item_idx,
counts: AheadBehind::default(),
is_orphan: false,
});
};
let repo = &ctx.repo;
let is_orphan = repo
.merge_base(&base, &ctx.branch_ref.commit_sha)
.map_err(|e| ctx.error(Self::KIND, &e))?
.is_none();
if is_orphan {
return Ok(TaskResult::AheadBehind {
item_idx: ctx.item_idx,
counts: AheadBehind::default(),
is_orphan: true,
});
}
let (ahead, behind) = if let Some(branch) = ctx.branch_ref.branch.as_deref() {
if let Some(counts) = repo.cached_ahead_behind(&base, branch) {
counts
} else {
repo.ahead_behind(&base, branch)
.map_err(|e| ctx.error(Self::KIND, &e))?
}
} else {
repo.ahead_behind(&base, &ctx.branch_ref.commit_sha)
.map_err(|e| ctx.error(Self::KIND, &e))?
};
Ok(TaskResult::AheadBehind {
item_idx: ctx.item_idx,
counts: AheadBehind { ahead, behind },
is_orphan: false,
})
}
}
pub struct CommittedTreesMatchTask;
impl Task for CommittedTreesMatchTask {
const KIND: TaskKind = TaskKind::CommittedTreesMatch;
fn compute(ctx: TaskContext) -> Result<TaskResult, TaskError> {
let Some(base) = ctx.integration_target() else {
return Ok(TaskResult::CommittedTreesMatch {
item_idx: ctx.item_idx,
committed_trees_match: false,
});
};
let repo = &ctx.repo;
let ref_to_check = ctx
.branch_ref
.branch
.as_deref()
.unwrap_or(&ctx.branch_ref.commit_sha);
let committed_trees_match = repo
.trees_match(ref_to_check, &base)
.map_err(|e| ctx.error(Self::KIND, &e))?;
Ok(TaskResult::CommittedTreesMatch {
item_idx: ctx.item_idx,
committed_trees_match,
})
}
}
pub struct HasFileChangesTask;
impl Task for HasFileChangesTask {
const KIND: TaskKind = TaskKind::HasFileChanges;
fn compute(ctx: TaskContext) -> Result<TaskResult, TaskError> {
let Some(branch) = ctx.branch_ref.branch.as_deref() else {
return Ok(TaskResult::HasFileChanges {
item_idx: ctx.item_idx,
has_file_changes: true,
});
};
let Some(target) = ctx.integration_target() else {
return Ok(TaskResult::HasFileChanges {
item_idx: ctx.item_idx,
has_file_changes: true,
});
};
let repo = &ctx.repo;
let has_file_changes = repo
.has_added_changes(branch, &target)
.map_err(|e| ctx.error(Self::KIND, &e))?;
Ok(TaskResult::HasFileChanges {
item_idx: ctx.item_idx,
has_file_changes,
})
}
}
pub struct WouldMergeAddTask;
impl Task for WouldMergeAddTask {
const KIND: TaskKind = TaskKind::WouldMergeAdd;
fn compute(ctx: TaskContext) -> Result<TaskResult, TaskError> {
let Some(branch) = ctx.branch_ref.branch.as_deref() else {
return Ok(TaskResult::WouldMergeAdd {
item_idx: ctx.item_idx,
would_merge_add: true,
is_patch_id_match: false,
});
};
let Some(base) = ctx.integration_target() else {
return Ok(TaskResult::WouldMergeAdd {
item_idx: ctx.item_idx,
would_merge_add: true,
is_patch_id_match: false,
});
};
let probe = ctx
.repo
.merge_integration_probe(branch, &base)
.map_err(|e| ctx.error(Self::KIND, &e))?;
Ok(TaskResult::WouldMergeAdd {
item_idx: ctx.item_idx,
would_merge_add: probe.would_merge_add,
is_patch_id_match: probe.is_patch_id_match,
})
}
}
pub struct IsAncestorTask;
impl Task for IsAncestorTask {
const KIND: TaskKind = TaskKind::IsAncestor;
fn compute(ctx: TaskContext) -> Result<TaskResult, TaskError> {
let Some(base) = ctx.integration_target() else {
return Ok(TaskResult::IsAncestor {
item_idx: ctx.item_idx,
is_ancestor: false,
});
};
let repo = &ctx.repo;
let ref_to_check = ctx
.branch_ref
.branch
.as_deref()
.unwrap_or(&ctx.branch_ref.commit_sha);
let is_ancestor = repo
.is_ancestor(ref_to_check, &base)
.map_err(|e| ctx.error(Self::KIND, &e))?;
Ok(TaskResult::IsAncestor {
item_idx: ctx.item_idx,
is_ancestor,
})
}
}
pub struct BranchDiffTask;
impl Task for BranchDiffTask {
const KIND: TaskKind = TaskKind::BranchDiff;
fn compute(ctx: TaskContext) -> Result<TaskResult, TaskError> {
let Some(base) = ctx.default_branch() else {
return Ok(TaskResult::BranchDiff {
item_idx: ctx.item_idx,
branch_diff: BranchDiffTotals::default(),
});
};
let repo = &ctx.repo;
let ref_to_check = ctx
.branch_ref
.branch
.as_deref()
.unwrap_or(&ctx.branch_ref.commit_sha);
let diff = repo
.branch_diff_stats(&base, ref_to_check)
.map_err(|e| ctx.error(Self::KIND, &e))?;
Ok(TaskResult::BranchDiff {
item_idx: ctx.item_idx,
branch_diff: BranchDiffTotals { diff },
})
}
}
pub struct WorkingTreeDiffTask;
impl Task for WorkingTreeDiffTask {
const KIND: TaskKind = TaskKind::WorkingTreeDiff;
fn compute(ctx: TaskContext) -> Result<TaskResult, TaskError> {
let wt = ctx
.branch_ref
.working_tree(&ctx.repo)
.ok_or_else(|| ctx.error(Self::KIND, &anyhow::anyhow!("requires a worktree")))?;
let status_output = wt
.run_command(&["--no-optional-locks", "status", "--porcelain"])
.map_err(|e| ctx.error(Self::KIND, &e))?;
let (working_tree_status, is_dirty, has_conflicts) =
parse_working_tree_status(&status_output);
let working_tree_diff = if is_dirty {
wt.working_tree_diff_stats()
.map_err(|e| ctx.error(Self::KIND, &e))?
} else {
LineDiff::default()
};
Ok(TaskResult::WorkingTreeDiff {
item_idx: ctx.item_idx,
working_tree_diff,
working_tree_status,
has_conflicts,
})
}
}
pub struct MergeTreeConflictsTask;
impl Task for MergeTreeConflictsTask {
const KIND: TaskKind = TaskKind::MergeTreeConflicts;
fn compute(ctx: TaskContext) -> Result<TaskResult, TaskError> {
let Some(base) = ctx.default_branch() else {
return Ok(TaskResult::MergeTreeConflicts {
item_idx: ctx.item_idx,
has_merge_tree_conflicts: false,
});
};
let repo = &ctx.repo;
let ref_to_check = ctx
.branch_ref
.branch
.as_deref()
.unwrap_or(&ctx.branch_ref.commit_sha);
let has_merge_tree_conflicts = repo
.has_merge_conflicts(&base, ref_to_check)
.map_err(|e| ctx.error(Self::KIND, &e))?;
Ok(TaskResult::MergeTreeConflicts {
item_idx: ctx.item_idx,
has_merge_tree_conflicts,
})
}
}
pub struct WorkingTreeConflictsTask;
impl Task for WorkingTreeConflictsTask {
const KIND: TaskKind = TaskKind::WorkingTreeConflicts;
fn compute(ctx: TaskContext) -> Result<TaskResult, TaskError> {
let Some(base) = ctx.default_branch() else {
return Ok(TaskResult::WorkingTreeConflicts {
item_idx: ctx.item_idx,
has_working_tree_conflicts: None,
});
};
let wt = ctx
.branch_ref
.working_tree(&ctx.repo)
.ok_or_else(|| ctx.error(Self::KIND, &anyhow::anyhow!("requires a worktree")))?;
let status_output = wt
.run_command(&["--no-optional-locks", "status", "--porcelain"])
.map_err(|e| ctx.error(Self::KIND, &e))?;
let is_dirty = !status_output.trim().is_empty();
if !is_dirty {
return Ok(TaskResult::WorkingTreeConflicts {
item_idx: ctx.item_idx,
has_working_tree_conflicts: None,
});
}
let has_unmerged = has_unmerged_entries(&status_output);
if has_unmerged {
return Ok(TaskResult::WorkingTreeConflicts {
item_idx: ctx.item_idx,
has_working_tree_conflicts: None,
});
}
let needs_working_tree = status_output
.lines()
.any(|l| l.starts_with("??") || l.as_bytes().get(1) != Some(&b' '));
let tree_sha = if needs_working_tree {
write_tree_with_working_tree(&wt).map_err(|e| ctx.error(Self::KIND, &e))?
} else {
wt.run_command(&["write-tree"])
.map(|s| s.trim().to_string())
.map_err(|e| ctx.error(Self::KIND, &e))?
};
let has_conflicts = ctx
.repo
.has_merge_conflicts_by_tree(&base, &ctx.branch_ref.commit_sha, &tree_sha)
.map_err(|e| ctx.error(Self::KIND, &e))?;
Ok(TaskResult::WorkingTreeConflicts {
item_idx: ctx.item_idx,
has_working_tree_conflicts: Some(has_conflicts),
})
}
}
fn write_tree_with_working_tree(wt: &worktrunk::git::WorkingTree) -> anyhow::Result<String> {
use worktrunk::shell_exec::Cmd;
let git_dir = wt.git_dir()?;
let worktree_root = wt.root()?;
let real_index = git_dir.join("index");
let log_ctx = wt
.path()
.file_name()
.and_then(|n| n.to_str())
.unwrap_or(".")
.to_string();
let temp_index = tempfile::NamedTempFile::new().context("Failed to create temporary index")?;
std::fs::copy(&real_index, temp_index.path()).context("Failed to copy index file")?;
let temp_index_path = temp_index
.path()
.to_str()
.context("Temporary index path is not valid UTF-8")?;
Cmd::new("git")
.args(["add", "-A"])
.current_dir(&worktree_root)
.context(&log_ctx)
.env("GIT_INDEX_FILE", temp_index_path)
.run()
.context("Failed to stage working tree changes")?;
let output = Cmd::new("git")
.args(["write-tree"])
.current_dir(&worktree_root)
.context(&log_ctx)
.env("GIT_INDEX_FILE", temp_index_path)
.run()
.context("Failed to write tree from temporary index")?;
Ok(String::from_utf8_lossy(&output.stdout).trim().to_string())
}
pub struct GitOperationTask;
impl Task for GitOperationTask {
const KIND: TaskKind = TaskKind::GitOperation;
fn compute(ctx: TaskContext) -> Result<TaskResult, TaskError> {
let wt = ctx
.branch_ref
.working_tree(&ctx.repo)
.ok_or_else(|| ctx.error(Self::KIND, &anyhow::anyhow!("requires a worktree")))?;
let git_operation = detect_active_git_operation(&wt);
Ok(TaskResult::GitOperation {
item_idx: ctx.item_idx,
git_operation,
})
}
}
pub struct UserMarkerTask;
impl Task for UserMarkerTask {
const KIND: TaskKind = TaskKind::UserMarker;
fn compute(ctx: TaskContext) -> Result<TaskResult, TaskError> {
let repo = &ctx.repo;
let user_marker = repo.user_marker(ctx.branch_ref.branch.as_deref());
Ok(TaskResult::UserMarker {
item_idx: ctx.item_idx,
user_marker,
})
}
}
pub struct UpstreamTask;
impl Task for UpstreamTask {
const KIND: TaskKind = TaskKind::Upstream;
fn compute(ctx: TaskContext) -> Result<TaskResult, TaskError> {
let repo = &ctx.repo;
let Some(branch) = ctx.branch_ref.branch.as_deref() else {
return Ok(TaskResult::Upstream {
item_idx: ctx.item_idx,
upstream: UpstreamStatus::default(),
});
};
let upstream_branch = repo
.branch(branch)
.upstream()
.map_err(|e| ctx.error(Self::KIND, &e))?;
let Some(upstream_branch) = upstream_branch else {
return Ok(TaskResult::Upstream {
item_idx: ctx.item_idx,
upstream: UpstreamStatus::default(),
});
};
let remote = upstream_branch.split_once('/').map(|(r, _)| r.to_string());
let (ahead, behind) = repo
.ahead_behind(&upstream_branch, &ctx.branch_ref.commit_sha)
.map_err(|e| ctx.error(Self::KIND, &e))?;
Ok(TaskResult::Upstream {
item_idx: ctx.item_idx,
upstream: UpstreamStatus {
remote,
ahead,
behind,
},
})
}
}
pub struct CiStatusTask;
impl Task for CiStatusTask {
const KIND: TaskKind = TaskKind::CiStatus;
fn compute(ctx: TaskContext) -> Result<TaskResult, TaskError> {
let repo = &ctx.repo;
let pr_status = ctx.branch_ref.branch.as_deref().and_then(|branch| {
let ci_branch = CiBranchName::from_branch_ref(branch, ctx.branch_ref.is_remote);
PrStatus::detect(repo, &ci_branch, &ctx.branch_ref.commit_sha)
});
Ok(TaskResult::CiStatus {
item_idx: ctx.item_idx,
pr_status,
})
}
}
pub struct UrlStatusTask;
impl Task for UrlStatusTask {
const KIND: TaskKind = TaskKind::UrlStatus;
fn compute(ctx: TaskContext) -> Result<TaskResult, TaskError> {
let Some(ref url) = ctx.item_url else {
return Ok(TaskResult::UrlStatus {
item_idx: ctx.item_idx,
url: None,
active: None,
});
};
let active = if std::env::var("WORKTRUNK_TEST_SKIP_URL_HEALTH_CHECK").is_ok() {
Some(false)
} else {
parse_port_from_url(url).map(|port| {
let addr = SocketAddr::from(([127, 0, 0, 1], port));
TcpStream::connect_timeout(&addr, Duration::from_millis(50)).is_ok()
})
};
Ok(TaskResult::UrlStatus {
item_idx: ctx.item_idx,
url: None,
active,
})
}
}
pub struct SummaryGenerateTask;
impl Task for SummaryGenerateTask {
const KIND: TaskKind = TaskKind::SummaryGenerate;
fn compute(ctx: TaskContext) -> Result<TaskResult, TaskError> {
let Some(ref llm_command) = ctx.llm_command else {
return Err(ctx.error(
Self::KIND,
&anyhow::anyhow!("SummaryGenerateTask requires llm_command"),
));
};
let branch = ctx.branch_ref.branch.as_deref().unwrap_or("(detached)");
let worktree_path = ctx.branch_ref.worktree_path.as_deref();
let summary = crate::summary::generate_summary_core(
branch,
&ctx.branch_ref.commit_sha,
worktree_path,
llm_command,
&ctx.repo,
)
.map_err(|e| ctx.error(Self::KIND, &e))?;
let subject = summary.as_deref().map(first_line);
Ok(TaskResult::SummaryGenerate {
item_idx: ctx.item_idx,
summary: subject,
})
}
}
fn first_line(s: &str) -> String {
s.lines()
.find(|l| !l.trim().is_empty())
.unwrap_or(s)
.to_string()
}
pub(crate) fn detect_active_git_operation(
wt: &worktrunk::git::WorkingTree<'_>,
) -> ActiveGitOperation {
if wt.is_rebasing().unwrap_or(false) {
ActiveGitOperation::Rebase
} else if wt.is_merging().unwrap_or(false) {
ActiveGitOperation::Merge
} else {
ActiveGitOperation::None
}
}
pub(crate) fn parse_port_from_url(url: &str) -> Option<u16> {
let url = url
.strip_prefix("http://")
.or_else(|| url.strip_prefix("https://"))?;
let host_port = url.split(&['/', '?', '#'][..]).next()?;
let (_host, port_str) = host_port.rsplit_once(':')?;
port_str.parse().ok()
}
pub(super) fn parse_working_tree_status(status_output: &str) -> (WorkingTreeStatus, bool, bool) {
let mut has_untracked = false;
let mut has_modified = false;
let mut has_staged = false;
let mut has_renamed = false;
let mut has_deleted = false;
let mut has_conflicts = false;
for line in status_output.lines() {
if line.len() < 2 {
continue;
}
let bytes = line.as_bytes();
let index_status = bytes[0] as char;
let worktree_status = bytes[1] as char;
if index_status == '?' && worktree_status == '?' {
has_untracked = true;
}
if matches!(worktree_status, 'M' | 'A' | 'T') {
has_modified = true;
}
if matches!(index_status, 'A' | 'M' | 'C' | 'T') {
has_staged = true;
}
if index_status == 'R' {
has_renamed = true;
}
if index_status == 'D' || worktree_status == 'D' {
has_deleted = true;
}
let is_unmerged_pair = matches!(
(index_status, worktree_status),
('U', _) | (_, 'U') | ('A', 'A') | ('D', 'D')
);
if is_unmerged_pair {
has_conflicts = true;
}
}
let working_tree_status = WorkingTreeStatus::new(
has_staged,
has_modified,
has_untracked,
has_renamed,
has_deleted,
);
let is_dirty = working_tree_status.is_dirty();
(working_tree_status, is_dirty, has_conflicts)
}
fn has_unmerged_entries(status_output: &str) -> bool {
status_output.lines().any(|l| {
l.len() >= 2 && {
let xy = &l.as_bytes()[0..2];
xy.contains(&b'U') || xy == b"AA" || xy == b"DD"
}
})
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_first_line_simple() {
assert_eq!(first_line("Add feature\n\nDetails here"), "Add feature");
}
#[test]
fn test_first_line_skips_empty() {
assert_eq!(first_line("\n\nAdd feature\nMore"), "Add feature");
}
#[test]
fn test_first_line_single_line() {
assert_eq!(first_line("Single line"), "Single line");
}
#[test]
fn test_first_line_empty_string() {
assert_eq!(first_line(""), "");
}
#[test]
fn unmerged_entries_detected_with_u() {
assert!(has_unmerged_entries("UU src/main.rs"));
assert!(has_unmerged_entries("AU src/main.rs"));
assert!(has_unmerged_entries("UA src/main.rs"));
assert!(has_unmerged_entries("DU src/main.rs"));
assert!(has_unmerged_entries("UD src/main.rs"));
}
#[test]
fn unmerged_entries_detected_aa_dd() {
assert!(has_unmerged_entries("AA src/main.rs"));
assert!(has_unmerged_entries("DD src/main.rs"));
}
#[test]
fn unmerged_entries_mixed_status() {
assert!(has_unmerged_entries("M src/lib.rs\nAA src/main.rs"));
assert!(has_unmerged_entries("?? untracked.txt\nDD deleted.rs"));
}
#[test]
fn unmerged_entries_not_detected_for_normal_status() {
assert!(!has_unmerged_entries("M src/main.rs"));
assert!(!has_unmerged_entries("A src/new.rs"));
assert!(!has_unmerged_entries("D src/old.rs"));
assert!(!has_unmerged_entries("?? untracked.txt"));
assert!(!has_unmerged_entries(""));
}
}