use std::time::{Duration, Instant};
use crossbeam_channel as chan;
use worktrunk::git::LineDiff;
/// Timeout of 120 seconds for draining task results.
///
/// NOTE(review): within this file the constant is only referenced by the
/// tests, which add it to `Instant::now()` to build the `deadline` argument of
/// `drain_results`; confirm that production callers use it the same way.
pub(super) const DRAIN_TIMEOUT: Duration = Duration::from_secs(120);
use super::super::model::{CommitDetails, ItemKind, ListItem, UpstreamStatus, WorkingTreeStatus};
use super::execution::ExpectedResults;
use super::types::{DrainOutcome, MissingResult, StatusContext, TaskError, TaskKind, TaskResult};
/// Writes fallback values for a task that reported an error.
///
/// `error.item_idx` selects the entry in `items` / `status_contexts`, and
/// `error.kind` selects which fields receive a fallback. Kinds that have no
/// fallback leave both slices untouched.
///
/// # Panics
///
/// Panics if `error.item_idx` is out of bounds for a slice that the matched
/// arm indexes.
pub(super) fn apply_default(
    items: &mut [ListItem],
    status_contexts: &mut [StatusContext],
    error: &TaskError,
) {
    let slot = error.item_idx;
    match error.kind {
        // Nothing is written for these kinds; the fields keep whatever value
        // they already hold.
        TaskKind::BranchDiff
        | TaskKind::GitOperation
        | TaskKind::CiStatus
        | TaskKind::SummaryGenerate => {}

        // Fallbacks stored on the list item itself.
        TaskKind::CommitDetails => items[slot].commit = Some(CommitDetails::default()),
        TaskKind::AheadBehind => items[slot].is_orphan = Some(false),
        TaskKind::CommittedTreesMatch => items[slot].committed_trees_match = Some(false),
        TaskKind::HasFileChanges => items[slot].has_file_changes = Some(true),
        TaskKind::WouldMergeAdd => {
            let item = &mut items[slot];
            item.would_merge_add = Some(true);
            item.is_patch_id_match = Some(false);
        }
        TaskKind::IsAncestor => items[slot].is_ancestor = Some(false),
        TaskKind::Upstream => items[slot].upstream = Some(UpstreamStatus::default()),
        TaskKind::UrlStatus => items[slot].url_active = None,

        // The working-tree diff touches both the item and its status context.
        TaskKind::WorkingTreeDiff => {
            if let ItemKind::Worktree(data) = &mut items[slot].kind {
                data.working_tree_diff = Some(LineDiff::default());
            } else {
                debug_assert!(false, "WorkingTreeDiff task spawned for non-worktree item");
            }
            let ctx = &mut status_contexts[slot];
            ctx.working_tree_status = Some(WorkingTreeStatus::default());
            ctx.has_conflicts = false;
        }

        // Fallbacks stored only on the status context.
        TaskKind::MergeTreeConflicts => status_contexts[slot].has_merge_tree_conflicts = false,
        TaskKind::WorkingTreeConflicts => status_contexts[slot].has_working_tree_conflicts = None,
        TaskKind::UserMarker => status_contexts[slot].user_marker = None,
    }
}
pub(super) fn drain_results(
rx: chan::Receiver<Result<TaskResult, TaskError>>,
items: &mut [ListItem],
errors: &mut Vec<TaskError>,
expected_results: &ExpectedResults,
deadline: Instant,
mut on_result: impl FnMut(usize, &mut ListItem, &StatusContext),
) -> DrainOutcome {
let mut received_by_item: Vec<Vec<TaskKind>> = vec![Vec::new(); items.len()];
let mut status_contexts = vec![StatusContext::default(); items.len()];
loop {
let remaining = deadline.saturating_duration_since(Instant::now());
if remaining.is_zero() {
let received_count: usize = received_by_item.iter().map(|v| v.len()).sum();
let mut items_with_missing: Vec<MissingResult> = Vec::new();
for (item_idx, item) in items.iter().enumerate() {
let expected = expected_results.results_for(item_idx);
let received = received_by_item[item_idx].as_slice();
let missing_kinds: Vec<TaskKind> = expected
.iter()
.filter(|kind| !received.contains(kind))
.copied()
.collect();
if !missing_kinds.is_empty() {
let name = item
.branch
.clone()
.unwrap_or_else(|| item.head[..8.min(item.head.len())].to_string());
items_with_missing.push(MissingResult {
item_idx,
name,
missing_kinds,
});
}
}
items_with_missing.sort_by_key(|result| result.item_idx);
items_with_missing.truncate(5);
return DrainOutcome::TimedOut {
received_count,
items_with_missing,
};
}
let outcome = match rx.recv_timeout(remaining) {
Ok(outcome) => outcome,
Err(chan::RecvTimeoutError::Timeout) => continue, Err(chan::RecvTimeoutError::Disconnected) => break, };
let (item_idx, kind) = match outcome {
Ok(ref result) => (result.item_idx(), TaskKind::from(result)),
Err(ref error) => (error.item_idx, error.kind),
};
received_by_item[item_idx].push(kind);
if let Err(error) = outcome {
apply_default(items, &mut status_contexts, &error);
errors.push(error);
let item = &mut items[item_idx];
let status_ctx = &status_contexts[item_idx];
on_result(item_idx, item, status_ctx);
continue;
}
let result = outcome.unwrap();
let item = &mut items[item_idx];
let status_ctx = &mut status_contexts[item_idx];
match result {
TaskResult::CommitDetails { commit, .. } => {
item.commit = Some(commit);
}
TaskResult::AheadBehind {
counts, is_orphan, ..
} => {
item.counts = Some(counts);
item.is_orphan = Some(is_orphan);
}
TaskResult::CommittedTreesMatch {
committed_trees_match,
..
} => {
item.committed_trees_match = Some(committed_trees_match);
}
TaskResult::HasFileChanges {
has_file_changes, ..
} => {
item.has_file_changes = Some(has_file_changes);
}
TaskResult::WouldMergeAdd {
would_merge_add,
is_patch_id_match,
..
} => {
item.would_merge_add = Some(would_merge_add);
item.is_patch_id_match = Some(is_patch_id_match);
}
TaskResult::IsAncestor { is_ancestor, .. } => {
item.is_ancestor = Some(is_ancestor);
}
TaskResult::BranchDiff { branch_diff, .. } => {
item.branch_diff = Some(branch_diff);
}
TaskResult::WorkingTreeDiff {
working_tree_diff,
working_tree_status,
has_conflicts,
..
} => {
if let ItemKind::Worktree(data) = &mut item.kind {
data.working_tree_diff = Some(working_tree_diff);
} else {
debug_assert!(false, "WorkingTreeDiff result for non-worktree item");
}
status_ctx.working_tree_status = Some(working_tree_status);
status_ctx.has_conflicts = has_conflicts;
}
TaskResult::MergeTreeConflicts {
has_merge_tree_conflicts,
..
} => {
status_ctx.has_merge_tree_conflicts = has_merge_tree_conflicts;
}
TaskResult::WorkingTreeConflicts {
has_working_tree_conflicts,
..
} => {
status_ctx.has_working_tree_conflicts = has_working_tree_conflicts;
}
TaskResult::GitOperation { git_operation, .. } => {
if let ItemKind::Worktree(data) = &mut item.kind {
data.git_operation = git_operation;
} else {
debug_assert!(false, "GitOperation result for non-worktree item");
}
}
TaskResult::UserMarker { user_marker, .. } => {
status_ctx.user_marker = user_marker;
}
TaskResult::Upstream { upstream, .. } => {
item.upstream = Some(upstream);
}
TaskResult::CiStatus { pr_status, .. } => {
item.pr_status = Some(pr_status);
}
TaskResult::UrlStatus { url, active, .. } => {
if url.is_some() {
item.url = url;
}
if active.is_some() {
item.url_active = active;
}
}
TaskResult::SummaryGenerate { summary, .. } => {
item.summary = Some(summary);
}
}
on_result(item_idx, item, status_ctx);
}
DrainOutcome::Complete
}
#[cfg(test)]
mod tests {
    use super::super::types::ErrorCause;
    use super::*;

    /// Builds the one-element item list shared by every test in this module.
    fn single_branch_item() -> Vec<ListItem> {
        vec![ListItem::new_branch("abc123".into(), "feat".into())]
    }

    /// A failed `SummaryGenerate` task must leave `summary` unset.
    #[test]
    fn test_apply_default_summary_generate() {
        let mut items = single_branch_item();
        let mut status_contexts = vec![StatusContext::default()];
        let failure = TaskError::new(
            0,
            TaskKind::SummaryGenerate,
            "llm failed",
            ErrorCause::Other,
        );

        apply_default(&mut items, &mut status_contexts, &failure);

        assert!(items[0].summary.is_none());
    }

    /// A successful `SummaryGenerate` result is stored on the item, and a
    /// closed channel ends the drain with `Complete`.
    #[test]
    fn test_drain_results_summary_generate() {
        let (sender, receiver) = crossbeam_channel::unbounded();
        let mut items = single_branch_item();
        let mut errors = Vec::new();
        let expected = ExpectedResults::default();

        sender
            .send(Ok(TaskResult::SummaryGenerate {
                item_idx: 0,
                summary: Some("Add feature".into()),
            }))
            .unwrap();
        // Dropping the only sender disconnects the channel once it is empty.
        drop(sender);

        let outcome = drain_results(
            receiver,
            &mut items,
            &mut errors,
            &expected,
            Instant::now() + DRAIN_TIMEOUT,
            |_, _, _| {},
        );

        assert!(matches!(outcome, DrainOutcome::Complete));
        assert_eq!(items[0].summary, Some(Some("Add feature".into())));
    }

    /// With a deadline that has already passed, the drain reports every
    /// expected kind as missing.
    #[test]
    fn test_drain_results_timeout_returns_missing_diagnostics() {
        // Keep the sender alive so the channel is not reported as disconnected.
        let (_sender, receiver) = crossbeam_channel::unbounded();
        let mut items = single_branch_item();
        let mut errors = Vec::new();
        let expected = ExpectedResults::default();
        expected.expect(0, TaskKind::CommitDetails);
        expected.expect(0, TaskKind::AheadBehind);

        let outcome = drain_results(
            receiver,
            &mut items,
            &mut errors,
            &expected,
            Instant::now(),
            |_, _, _| {},
        );

        let (received_count, items_with_missing) = match outcome {
            DrainOutcome::TimedOut {
                received_count,
                items_with_missing,
            } => (received_count, items_with_missing),
            _ => panic!("expected TimedOut with immediate deadline"),
        };

        assert_eq!(received_count, 0);
        assert_eq!(items_with_missing.len(), 1);

        let missing = &items_with_missing[0];
        assert_eq!(missing.name, "feat");
        for kind in [TaskKind::CommitDetails, TaskKind::AheadBehind] {
            assert!(missing.missing_kinds.contains(&kind));
        }
    }
}