use std::time::{Duration, Instant};
use crossbeam_channel as chan;
use super::super::model::{ItemKind, ListItem};
use super::execution::ExpectedResults;
use super::types::{DrainOutcome, MissingResult, TaskError, TaskKind, TaskResult};
pub(super) const DRAIN_TIMEOUT: Duration = Duration::from_secs(120);
pub(super) const STALL_TIMINGS: StallTimings = StallTimings {
threshold: Duration::from_secs(5),
tick: Duration::from_millis(500),
};
#[derive(Debug, Clone, Copy)]
pub(super) struct StallTimings {
pub threshold: Duration,
pub tick: Duration,
}
pub(super) enum DrainEvent<'a> {
Result {
item_idx: usize,
item: &'a mut ListItem,
},
Reveal { items: &'a [ListItem] },
Stall {
pending_count: usize,
first_kind: TaskKind,
first_name: &'a str,
},
}
fn pick_pending_hint<'a>(
expected: &ExpectedResults,
received_by_item: &[Vec<TaskKind>],
items: &'a [ListItem],
) -> (usize, Option<(TaskKind, &'a str)>) {
let received_count: usize = received_by_item.iter().map(|v| v.len()).sum();
let pending_count = expected.count().saturating_sub(received_count);
let first = received_by_item
.iter()
.enumerate()
.zip(items.iter())
.find_map(|((item_idx, received), item)| {
expected
.results_for(item_idx)
.into_iter()
.find(|kind| !received.contains(kind))
.map(|kind| (kind, item.display_name()))
});
(pending_count, first)
}
#[allow(clippy::too_many_arguments)]
pub(super) fn drain_results(
rx: chan::Receiver<Result<TaskResult, TaskError>>,
items: &mut [ListItem],
errors: &mut Vec<TaskError>,
expected_results: &ExpectedResults,
deadline: Instant,
integration_target: Option<&str>,
on_event: impl FnMut(DrainEvent<'_>),
reveal_at: Option<Instant>,
) -> DrainOutcome {
drain_results_with_timings(
rx,
items,
errors,
expected_results,
deadline,
integration_target,
STALL_TIMINGS,
on_event,
reveal_at,
)
}
#[allow(clippy::too_many_arguments)]
pub(super) fn drain_results_with_timings(
rx: chan::Receiver<Result<TaskResult, TaskError>>,
items: &mut [ListItem],
errors: &mut Vec<TaskError>,
expected_results: &ExpectedResults,
deadline: Instant,
integration_target: Option<&str>,
stall_timings: StallTimings,
mut on_event: impl FnMut(DrainEvent<'_>),
mut reveal_at: Option<Instant>,
) -> DrainOutcome {
let mut received_by_item: Vec<Vec<TaskKind>> = vec![Vec::new(); items.len()];
let mut last_result_time = Instant::now();
loop {
if let Some(at) = reveal_at
&& Instant::now() >= at
{
on_event(DrainEvent::Reveal { items });
reveal_at = None;
}
let now = Instant::now();
let remaining = deadline.saturating_duration_since(now);
let mut recv_timeout_dur = remaining.min(stall_timings.tick);
if let Some(at) = reveal_at {
recv_timeout_dur = recv_timeout_dur.min(at.saturating_duration_since(now));
}
if remaining.is_zero() {
let received_count: usize = received_by_item.iter().map(|v| v.len()).sum();
let mut items_with_missing: Vec<MissingResult> = Vec::new();
for (item_idx, item) in items.iter().enumerate() {
let expected = expected_results.results_for(item_idx);
let received = received_by_item[item_idx].as_slice();
let missing_kinds: Vec<TaskKind> = expected
.iter()
.filter(|kind| !received.contains(kind))
.copied()
.collect();
if !missing_kinds.is_empty() {
items_with_missing.push(MissingResult {
item_idx,
name: item.display_name().to_string(),
missing_kinds,
});
}
}
items_with_missing.sort_by_key(|result| result.item_idx);
return DrainOutcome::TimedOut {
received_count,
items_with_missing,
};
}
let outcome = match rx.recv_timeout(recv_timeout_dur) {
Ok(outcome) => outcome,
Err(chan::RecvTimeoutError::Timeout) => {
if last_result_time.elapsed() >= stall_timings.threshold {
let (pending_count, first) =
pick_pending_hint(expected_results, &received_by_item, items);
if let Some((first_kind, first_name)) = first {
on_event(DrainEvent::Stall {
pending_count,
first_kind,
first_name,
});
}
}
continue;
}
Err(chan::RecvTimeoutError::Disconnected) => break, };
let (item_idx, kind) = match outcome {
Ok(ref result) => (result.item_idx(), TaskKind::from(result)),
Err(ref error) => (error.item_idx, error.kind),
};
received_by_item[item_idx].push(kind);
last_result_time = Instant::now();
if let Err(error) = outcome {
errors.push(error);
on_event(DrainEvent::Result {
item_idx,
item: &mut items[item_idx],
});
continue;
}
let result = outcome.unwrap();
let item = &mut items[item_idx];
match result {
TaskResult::CommitDetails { commit, .. } => {
item.commit = Some(commit);
}
TaskResult::AheadBehind {
counts, is_orphan, ..
} => {
item.counts = Some(counts);
item.is_orphan = Some(is_orphan);
}
TaskResult::CommittedTreesMatch {
committed_trees_match,
..
} => {
item.committed_trees_match = Some(committed_trees_match);
}
TaskResult::HasFileChanges {
has_file_changes, ..
} => {
item.has_file_changes = Some(has_file_changes);
}
TaskResult::WouldMergeAdd {
would_merge_add,
is_patch_id_match,
..
} => {
item.would_merge_add = Some(would_merge_add);
item.is_patch_id_match = Some(is_patch_id_match);
}
TaskResult::IsAncestor { is_ancestor, .. } => {
item.is_ancestor = Some(is_ancestor);
}
TaskResult::BranchDiff { branch_diff, .. } => {
item.branch_diff = Some(branch_diff);
}
TaskResult::WorkingTreeDiff {
working_tree_diff,
working_tree_status,
has_conflicts,
..
} => {
if let ItemKind::Worktree(data) = &mut item.kind {
data.working_tree_diff = Some(working_tree_diff);
data.working_tree_status = Some(working_tree_status);
data.has_conflicts = Some(has_conflicts);
} else {
debug_assert!(false, "WorkingTreeDiff result for non-worktree item");
}
}
TaskResult::MergeTreeConflicts {
has_merge_tree_conflicts,
..
} => {
item.has_merge_tree_conflicts = Some(has_merge_tree_conflicts);
}
TaskResult::WorkingTreeConflicts {
has_working_tree_conflicts,
..
} => {
if let ItemKind::Worktree(data) = &mut item.kind {
data.has_working_tree_conflicts = Some(has_working_tree_conflicts);
} else {
debug_assert!(false, "WorkingTreeConflicts result for non-worktree item");
}
}
TaskResult::GitOperation { git_operation, .. } => {
if let ItemKind::Worktree(data) = &mut item.kind {
data.git_operation = Some(git_operation);
} else {
debug_assert!(false, "GitOperation result for non-worktree item");
}
}
TaskResult::UserMarker { user_marker, .. } => {
item.user_marker = Some(user_marker);
}
TaskResult::Upstream { upstream, .. } => {
item.upstream = Some(upstream);
}
TaskResult::CiStatus { pr_status, .. } => {
item.pr_status = Some(pr_status);
}
TaskResult::UrlStatus { url, active, .. } => {
if url.is_some() {
item.url = url;
}
if active.is_some() {
item.url_active = active;
}
}
TaskResult::SummaryGenerate { summary, .. } => {
item.summary = Some(summary);
}
}
item.refresh_status_symbols(integration_target);
on_event(DrainEvent::Result { item_idx, item });
}
DrainOutcome::Complete
}
#[cfg(test)]
mod tests {
use super::super::super::model::{
AheadBehind, MainState, UpstreamStatus, WorkingTreeStatus, WorktreeState,
};
use super::super::execution::seed_skipped_task_defaults;
use super::super::types::ErrorCause;
use super::*;
use worktrunk::git::LineDiff;
fn seed_all_fields(item: &mut ListItem) {
for kind in [
TaskKind::UserMarker,
TaskKind::MergeTreeConflicts,
TaskKind::IsAncestor,
TaskKind::CommittedTreesMatch,
TaskKind::HasFileChanges,
TaskKind::WouldMergeAdd,
TaskKind::WorkingTreeConflicts,
TaskKind::GitOperation,
] {
seed_skipped_task_defaults(item, kind);
}
if let ItemKind::Worktree(data) = &mut item.kind {
data.working_tree_diff = Some(LineDiff::default());
data.working_tree_status = Some(WorkingTreeStatus::default());
data.has_conflicts = Some(false);
}
item.counts = Some(AheadBehind {
ahead: 3,
behind: 5,
});
item.is_orphan = Some(false);
item.upstream = Some(UpstreamStatus::default());
}
fn full_computation_ran(item: &ListItem) -> bool {
item.status_symbols.main_state == Some(MainState::Diverged)
}
fn fully_seeded_branch_item() -> ListItem {
let mut item = ListItem::new_branch("abc123".into(), "feat".into());
seed_all_fields(&mut item);
item
}
fn fully_seeded_worktree_item() -> ListItem {
use crate::commands::list::collect::build_worktree_item;
use worktrunk::git::WorktreeInfo;
let wt = WorktreeInfo {
path: std::path::PathBuf::from("/tmp/wt"),
head: "abc123".into(),
branch: Some("feat".into()),
bare: false,
detached: false,
locked: None,
prunable: None,
};
let mut item = build_worktree_item(&wt, false, false, false);
seed_all_fields(&mut item);
item
}
#[test]
fn test_drain_results_summary_generate() {
let (tx, rx) = crossbeam_channel::unbounded();
let mut items = vec![ListItem::new_branch("abc123".into(), "feat".into())];
let mut errors = Vec::new();
let expected = ExpectedResults::default();
tx.send(Ok(TaskResult::SummaryGenerate {
item_idx: 0,
summary: Some("Add feature".into()),
}))
.unwrap();
drop(tx);
let outcome = drain_results(
rx,
&mut items,
&mut errors,
&expected,
Instant::now() + DRAIN_TIMEOUT,
None,
|_| {},
None,
);
assert!(matches!(outcome, DrainOutcome::Complete));
assert_eq!(items[0].summary, Some(Some("Add feature".into())));
}
#[test]
fn test_refresh_fully_seeded_worktree_resolves_all_gates() {
let mut item = fully_seeded_worktree_item();
item.refresh_status_symbols(Some("main"));
let s = &item.status_symbols;
assert!(s.working_tree.is_some(), "gate 1 (working tree flags)");
assert!(s.operation_state.is_some(), "gate 2 (operation state)");
assert!(s.worktree_state.is_some(), "gate 2 (metadata)");
assert_eq!(
s.main_state,
Some(MainState::Diverged),
"gate 3 — baseline counts (3, 5) produce Diverged"
);
assert!(
s.upstream_divergence.is_some(),
"gate 4 (upstream divergence)"
);
assert!(s.user_marker.is_some(), "gate 5 (user marker)");
}
#[test]
fn test_refresh_fully_seeded_branch_resolves_all_gates() {
let mut item = fully_seeded_branch_item();
item.refresh_status_symbols(Some("main"));
let s = &item.status_symbols;
assert!(s.working_tree.is_some(), "gate 1 (branches are clean)");
assert!(s.operation_state.is_some(), "gate 2 (branches have no op)");
assert_eq!(s.worktree_state, Some(WorktreeState::Branch));
assert_eq!(
s.main_state,
Some(MainState::Diverged),
"gate 3 — baseline counts (3, 5) produce Diverged"
);
assert!(s.upstream_divergence.is_some());
assert!(s.user_marker.is_some());
}
#[test]
fn test_drain_results_status_stays_none_while_field_pending() {
let mut item = fully_seeded_branch_item();
item.counts = None;
let (tx, rx) = crossbeam_channel::unbounded();
let expected = ExpectedResults::default();
expected.expect(0, TaskKind::Upstream);
tx.send(Ok(TaskResult::Upstream {
item_idx: 0,
upstream: UpstreamStatus::default(),
}))
.unwrap();
drop(tx);
let mut errors = Vec::new();
let outcome = drain_results(
rx,
std::slice::from_mut(&mut item),
&mut errors,
&expected,
Instant::now() + DRAIN_TIMEOUT,
Some("main"),
|_| {},
None,
);
assert!(matches!(outcome, DrainOutcome::Complete));
assert!(
!full_computation_ran(&item),
"full status computation should not run while a required field is pending",
);
}
#[test]
fn test_drain_results_status_snaps_when_final_field_arrives() {
let mut item = fully_seeded_branch_item();
item.counts = None;
item.is_orphan = None;
let (tx, rx) = crossbeam_channel::unbounded();
let expected = ExpectedResults::default();
expected.expect(0, TaskKind::AheadBehind);
tx.send(Ok(TaskResult::AheadBehind {
item_idx: 0,
counts: AheadBehind {
ahead: 3,
behind: 5,
},
is_orphan: false,
}))
.unwrap();
drop(tx);
let mut errors = Vec::new();
let outcome = drain_results(
rx,
std::slice::from_mut(&mut item),
&mut errors,
&expected,
Instant::now() + DRAIN_TIMEOUT,
Some("main"),
|_| {},
None,
);
assert!(matches!(outcome, DrainOutcome::Complete));
assert!(
full_computation_ran(&item),
"full status computation should run once the final required field arrives",
);
}
#[test]
fn test_drain_results_status_stays_none_when_feeder_errors() {
let mut item = fully_seeded_branch_item();
item.counts = None;
item.is_orphan = None;
let (tx, rx) = crossbeam_channel::unbounded();
let expected = ExpectedResults::default();
expected.expect(0, TaskKind::AheadBehind);
tx.send(Err(TaskError::new(
0,
TaskKind::AheadBehind,
"boom",
ErrorCause::Other,
)))
.unwrap();
drop(tx);
let mut errors = Vec::new();
let outcome = drain_results(
rx,
std::slice::from_mut(&mut item),
&mut errors,
&expected,
Instant::now() + DRAIN_TIMEOUT,
Some("main"),
|_| {},
None,
);
assert!(matches!(outcome, DrainOutcome::Complete));
assert!(
!full_computation_ran(&item),
"full status computation should not run when its sole feeder errored",
);
assert_eq!(errors.len(), 1);
}
#[test]
fn test_drain_results_timeout_returns_missing_diagnostics() {
let (_tx, rx) = crossbeam_channel::unbounded();
let mut items = vec![ListItem::new_branch("abc123".into(), "feat".into())];
let mut errors = Vec::new();
let expected = ExpectedResults::default();
expected.expect(0, TaskKind::CommitDetails);
expected.expect(0, TaskKind::AheadBehind);
let outcome = drain_results(
rx,
&mut items,
&mut errors,
&expected,
Instant::now(),
None,
|_| {},
None,
);
let DrainOutcome::TimedOut {
received_count,
items_with_missing,
} = outcome
else {
panic!("expected TimedOut with immediate deadline");
};
assert_eq!(received_count, 0);
assert_eq!(items_with_missing.len(), 1);
assert_eq!(items_with_missing[0].name, "feat");
assert!(
items_with_missing[0]
.missing_kinds
.contains(&TaskKind::CommitDetails)
);
assert!(
items_with_missing[0]
.missing_kinds
.contains(&TaskKind::AheadBehind)
);
}
#[test]
fn test_drain_results_fires_stall_when_silent_past_threshold() {
let (_tx, rx) = crossbeam_channel::unbounded();
let mut items = vec![
ListItem::new_branch("abc123".into(), "feat".into()),
ListItem::new_branch("def456".into(), "other".into()),
];
let mut errors = Vec::new();
let expected = ExpectedResults::default();
expected.expect(0, TaskKind::AheadBehind);
expected.expect(1, TaskKind::CommitDetails);
let mut stall_events: Vec<(usize, TaskKind, String)> = Vec::new();
let outcome = drain_results_with_timings(
rx,
&mut items,
&mut errors,
&expected,
Instant::now() + Duration::from_millis(200),
None,
StallTimings {
threshold: Duration::from_millis(20),
tick: Duration::from_millis(20),
},
|event| {
if let DrainEvent::Stall {
pending_count,
first_kind,
first_name,
} = event
{
stall_events.push((pending_count, first_kind, first_name.to_string()));
}
},
None,
);
assert!(matches!(outcome, DrainOutcome::TimedOut { .. }));
assert!(
!stall_events.is_empty(),
"expected at least one stall event before the deadline"
);
let (count, kind, name) = &stall_events[0];
assert_eq!(*count, 2);
assert_eq!(*kind, TaskKind::AheadBehind);
assert_eq!(name, "feat");
}
#[test]
fn test_drain_results_does_not_fire_stall_when_results_flow() {
let (tx, rx) = crossbeam_channel::unbounded();
let mut items = vec![ListItem::new_branch("abc123".into(), "feat".into())];
let mut errors = Vec::new();
let expected = ExpectedResults::default();
expected.expect(0, TaskKind::CommitDetails);
tx.send(Ok(TaskResult::SummaryGenerate {
item_idx: 0,
summary: None,
}))
.unwrap();
drop(tx);
let mut stall_count = 0;
let outcome = drain_results_with_timings(
rx,
&mut items,
&mut errors,
&expected,
Instant::now() + Duration::from_millis(50),
None,
StallTimings {
threshold: Duration::from_secs(10), tick: Duration::from_millis(20),
},
|event| {
if matches!(event, DrainEvent::Stall { .. }) {
stall_count += 1;
}
},
None,
);
assert!(matches!(outcome, DrainOutcome::Complete));
assert_eq!(stall_count, 0, "stall must not fire under the threshold");
}
#[test]
fn test_drain_results_survives_mid_stall_result() {
let (tx, rx) = crossbeam_channel::unbounded();
let mut items = vec![ListItem::new_branch("abc123".into(), "feat".into())];
let mut errors = Vec::new();
let expected = ExpectedResults::default();
expected.expect(0, TaskKind::CommitDetails);
expected.expect(0, TaskKind::AheadBehind);
let mut sender = Some(tx);
let mut saw_result = false;
let mut stalls_before_result = 0;
let mut stalls_after_result = 0;
let outcome = drain_results_with_timings(
rx,
&mut items,
&mut errors,
&expected,
Instant::now() + Duration::from_secs(5),
None,
StallTimings {
threshold: Duration::from_millis(20),
tick: Duration::from_millis(10),
},
|event| match event {
DrainEvent::Stall { .. } => {
if saw_result {
stalls_after_result += 1;
sender.take();
} else {
stalls_before_result += 1;
if let Some(tx) = sender.as_ref() {
tx.send(Ok(TaskResult::SummaryGenerate {
item_idx: 0,
summary: None,
}))
.unwrap();
}
}
}
DrainEvent::Result { .. } => {
saw_result = true;
}
_ => {}
},
None,
);
assert!(matches!(outcome, DrainOutcome::Complete));
assert!(stalls_before_result >= 1);
assert!(saw_result, "drain should deliver the injected result");
assert!(
stalls_after_result >= 1,
"drain should keep emitting stalls after a mid-stall result"
);
}
}