use std::time::{Duration, Instant};
use crossbeam_channel as chan;
pub(super) const DRAIN_TIMEOUT: Duration = Duration::from_secs(120);
use super::super::model::{ItemKind, ListItem};
use super::execution::ExpectedResults;
use super::types::{DrainOutcome, MissingResult, TaskError, TaskKind, TaskResult};
pub(super) fn drain_results(
rx: chan::Receiver<Result<TaskResult, TaskError>>,
items: &mut [ListItem],
errors: &mut Vec<TaskError>,
expected_results: &ExpectedResults,
deadline: Instant,
integration_target: Option<&str>,
mut on_result: impl FnMut(usize, &mut ListItem),
) -> DrainOutcome {
let mut received_by_item: Vec<Vec<TaskKind>> = vec![Vec::new(); items.len()];
loop {
let remaining = deadline.saturating_duration_since(Instant::now());
if remaining.is_zero() {
let received_count: usize = received_by_item.iter().map(|v| v.len()).sum();
let mut items_with_missing: Vec<MissingResult> = Vec::new();
for (item_idx, item) in items.iter().enumerate() {
let expected = expected_results.results_for(item_idx);
let received = received_by_item[item_idx].as_slice();
let missing_kinds: Vec<TaskKind> = expected
.iter()
.filter(|kind| !received.contains(kind))
.copied()
.collect();
if !missing_kinds.is_empty() {
let name = item
.branch
.clone()
.unwrap_or_else(|| item.head[..8.min(item.head.len())].to_string());
items_with_missing.push(MissingResult {
item_idx,
name,
missing_kinds,
});
}
}
items_with_missing.sort_by_key(|result| result.item_idx);
return DrainOutcome::TimedOut {
received_count,
items_with_missing,
};
}
let outcome = match rx.recv_timeout(remaining) {
Ok(outcome) => outcome,
Err(chan::RecvTimeoutError::Timeout) => continue, Err(chan::RecvTimeoutError::Disconnected) => break, };
let (item_idx, kind) = match outcome {
Ok(ref result) => (result.item_idx(), TaskKind::from(result)),
Err(ref error) => (error.item_idx, error.kind),
};
received_by_item[item_idx].push(kind);
if let Err(error) = outcome {
errors.push(error);
on_result(item_idx, &mut items[item_idx]);
continue;
}
let result = outcome.unwrap();
let item = &mut items[item_idx];
match result {
TaskResult::CommitDetails { commit, .. } => {
item.commit = Some(commit);
}
TaskResult::AheadBehind {
counts, is_orphan, ..
} => {
item.counts = Some(counts);
item.is_orphan = Some(is_orphan);
}
TaskResult::CommittedTreesMatch {
committed_trees_match,
..
} => {
item.committed_trees_match = Some(committed_trees_match);
}
TaskResult::HasFileChanges {
has_file_changes, ..
} => {
item.has_file_changes = Some(has_file_changes);
}
TaskResult::WouldMergeAdd {
would_merge_add,
is_patch_id_match,
..
} => {
item.would_merge_add = Some(would_merge_add);
item.is_patch_id_match = Some(is_patch_id_match);
}
TaskResult::IsAncestor { is_ancestor, .. } => {
item.is_ancestor = Some(is_ancestor);
}
TaskResult::BranchDiff { branch_diff, .. } => {
item.branch_diff = Some(branch_diff);
}
TaskResult::WorkingTreeDiff {
working_tree_diff,
working_tree_status,
has_conflicts,
..
} => {
if let ItemKind::Worktree(data) = &mut item.kind {
data.working_tree_diff = Some(working_tree_diff);
data.working_tree_status = Some(working_tree_status);
data.has_conflicts = Some(has_conflicts);
} else {
debug_assert!(false, "WorkingTreeDiff result for non-worktree item");
}
}
TaskResult::MergeTreeConflicts {
has_merge_tree_conflicts,
..
} => {
item.has_merge_tree_conflicts = Some(has_merge_tree_conflicts);
}
TaskResult::WorkingTreeConflicts {
has_working_tree_conflicts,
..
} => {
if let ItemKind::Worktree(data) = &mut item.kind {
data.has_working_tree_conflicts = Some(has_working_tree_conflicts);
} else {
debug_assert!(false, "WorkingTreeConflicts result for non-worktree item");
}
}
TaskResult::GitOperation { git_operation, .. } => {
if let ItemKind::Worktree(data) = &mut item.kind {
data.git_operation = Some(git_operation);
} else {
debug_assert!(false, "GitOperation result for non-worktree item");
}
}
TaskResult::UserMarker { user_marker, .. } => {
item.user_marker = Some(user_marker);
}
TaskResult::Upstream { upstream, .. } => {
item.upstream = Some(upstream);
}
TaskResult::CiStatus { pr_status, .. } => {
item.pr_status = Some(pr_status);
}
TaskResult::UrlStatus { url, active, .. } => {
if url.is_some() {
item.url = url;
}
if active.is_some() {
item.url_active = active;
}
}
TaskResult::SummaryGenerate { summary, .. } => {
item.summary = Some(summary);
}
}
item.refresh_status_symbols(integration_target);
on_result(item_idx, item);
}
DrainOutcome::Complete
}
#[cfg(test)]
mod tests {
use super::super::super::model::{
AheadBehind, MainState, UpstreamStatus, WorkingTreeStatus, WorktreeState,
};
use super::super::execution::seed_skipped_task_defaults;
use super::super::types::ErrorCause;
use super::*;
use worktrunk::git::LineDiff;
fn seed_all_fields(item: &mut ListItem) {
for kind in [
TaskKind::UserMarker,
TaskKind::MergeTreeConflicts,
TaskKind::IsAncestor,
TaskKind::CommittedTreesMatch,
TaskKind::HasFileChanges,
TaskKind::WouldMergeAdd,
TaskKind::WorkingTreeConflicts,
TaskKind::GitOperation,
] {
seed_skipped_task_defaults(item, kind);
}
if let ItemKind::Worktree(data) = &mut item.kind {
data.working_tree_diff = Some(LineDiff::default());
data.working_tree_status = Some(WorkingTreeStatus::default());
data.has_conflicts = Some(false);
}
item.counts = Some(AheadBehind {
ahead: 3,
behind: 5,
});
item.is_orphan = Some(false);
item.upstream = Some(UpstreamStatus::default());
}
fn full_computation_ran(item: &ListItem) -> bool {
item.status_symbols.main_state == Some(MainState::Diverged)
}
fn fully_seeded_branch_item() -> ListItem {
let mut item = ListItem::new_branch("abc123".into(), "feat".into());
seed_all_fields(&mut item);
item
}
fn fully_seeded_worktree_item() -> ListItem {
use crate::commands::list::collect::build_worktree_item;
use worktrunk::git::WorktreeInfo;
let wt = WorktreeInfo {
path: std::path::PathBuf::from("/tmp/wt"),
head: "abc123".into(),
branch: Some("feat".into()),
bare: false,
detached: false,
locked: None,
prunable: None,
};
let mut item = build_worktree_item(&wt, false, false, false);
seed_all_fields(&mut item);
item
}
#[test]
fn test_drain_results_summary_generate() {
let (tx, rx) = crossbeam_channel::unbounded();
let mut items = vec![ListItem::new_branch("abc123".into(), "feat".into())];
let mut errors = Vec::new();
let expected = ExpectedResults::default();
tx.send(Ok(TaskResult::SummaryGenerate {
item_idx: 0,
summary: Some("Add feature".into()),
}))
.unwrap();
drop(tx);
let outcome = drain_results(
rx,
&mut items,
&mut errors,
&expected,
Instant::now() + DRAIN_TIMEOUT,
None,
|_, _| {},
);
assert!(matches!(outcome, DrainOutcome::Complete));
assert_eq!(items[0].summary, Some(Some("Add feature".into())));
}
#[test]
fn test_refresh_fully_seeded_worktree_resolves_all_gates() {
let mut item = fully_seeded_worktree_item();
item.refresh_status_symbols(Some("main"));
let s = &item.status_symbols;
assert!(s.working_tree.is_some(), "gate 1 (working tree flags)");
assert!(s.operation_state.is_some(), "gate 2 (operation state)");
assert!(s.worktree_state.is_some(), "gate 2 (metadata)");
assert_eq!(
s.main_state,
Some(MainState::Diverged),
"gate 3 — baseline counts (3, 5) produce Diverged"
);
assert!(
s.upstream_divergence.is_some(),
"gate 4 (upstream divergence)"
);
assert!(s.user_marker.is_some(), "gate 5 (user marker)");
}
#[test]
fn test_refresh_fully_seeded_branch_resolves_all_gates() {
let mut item = fully_seeded_branch_item();
item.refresh_status_symbols(Some("main"));
let s = &item.status_symbols;
assert!(s.working_tree.is_some(), "gate 1 (branches are clean)");
assert!(s.operation_state.is_some(), "gate 2 (branches have no op)");
assert_eq!(s.worktree_state, Some(WorktreeState::Branch));
assert_eq!(
s.main_state,
Some(MainState::Diverged),
"gate 3 — baseline counts (3, 5) produce Diverged"
);
assert!(s.upstream_divergence.is_some());
assert!(s.user_marker.is_some());
}
#[test]
fn test_drain_results_status_stays_none_while_field_pending() {
let mut item = fully_seeded_branch_item();
item.counts = None;
let (tx, rx) = crossbeam_channel::unbounded();
let expected = ExpectedResults::default();
expected.expect(0, TaskKind::Upstream);
tx.send(Ok(TaskResult::Upstream {
item_idx: 0,
upstream: UpstreamStatus::default(),
}))
.unwrap();
drop(tx);
let mut errors = Vec::new();
let outcome = drain_results(
rx,
std::slice::from_mut(&mut item),
&mut errors,
&expected,
Instant::now() + DRAIN_TIMEOUT,
Some("main"),
|_, _| {},
);
assert!(matches!(outcome, DrainOutcome::Complete));
assert!(
!full_computation_ran(&item),
"full status computation should not run while a required field is pending",
);
}
#[test]
fn test_drain_results_status_snaps_when_final_field_arrives() {
let mut item = fully_seeded_branch_item();
item.counts = None;
item.is_orphan = None;
let (tx, rx) = crossbeam_channel::unbounded();
let expected = ExpectedResults::default();
expected.expect(0, TaskKind::AheadBehind);
tx.send(Ok(TaskResult::AheadBehind {
item_idx: 0,
counts: AheadBehind {
ahead: 3,
behind: 5,
},
is_orphan: false,
}))
.unwrap();
drop(tx);
let mut errors = Vec::new();
let outcome = drain_results(
rx,
std::slice::from_mut(&mut item),
&mut errors,
&expected,
Instant::now() + DRAIN_TIMEOUT,
Some("main"),
|_, _| {},
);
assert!(matches!(outcome, DrainOutcome::Complete));
assert!(
full_computation_ran(&item),
"full status computation should run once the final required field arrives",
);
}
#[test]
fn test_drain_results_status_stays_none_when_feeder_errors() {
let mut item = fully_seeded_branch_item();
item.counts = None;
item.is_orphan = None;
let (tx, rx) = crossbeam_channel::unbounded();
let expected = ExpectedResults::default();
expected.expect(0, TaskKind::AheadBehind);
tx.send(Err(TaskError::new(
0,
TaskKind::AheadBehind,
"boom",
ErrorCause::Other,
)))
.unwrap();
drop(tx);
let mut errors = Vec::new();
let outcome = drain_results(
rx,
std::slice::from_mut(&mut item),
&mut errors,
&expected,
Instant::now() + DRAIN_TIMEOUT,
Some("main"),
|_, _| {},
);
assert!(matches!(outcome, DrainOutcome::Complete));
assert!(
!full_computation_ran(&item),
"full status computation should not run when its sole feeder errored",
);
assert_eq!(errors.len(), 1);
}
#[test]
fn test_drain_results_timeout_returns_missing_diagnostics() {
let (_tx, rx) = crossbeam_channel::unbounded();
let mut items = vec![ListItem::new_branch("abc123".into(), "feat".into())];
let mut errors = Vec::new();
let expected = ExpectedResults::default();
expected.expect(0, TaskKind::CommitDetails);
expected.expect(0, TaskKind::AheadBehind);
let outcome = drain_results(
rx,
&mut items,
&mut errors,
&expected,
Instant::now(),
None,
|_, _| {},
);
let DrainOutcome::TimedOut {
received_count,
items_with_missing,
} = outcome
else {
panic!("expected TimedOut with immediate deadline");
};
assert_eq!(received_count, 0);
assert_eq!(items_with_missing.len(), 1);
assert_eq!(items_with_missing[0].name, "feat");
assert!(
items_with_missing[0]
.missing_kinds
.contains(&TaskKind::CommitDetails)
);
assert!(
items_with_missing[0]
.missing_kinds
.contains(&TaskKind::AheadBehind)
);
}
}