worktrunk 0.35.3

A CLI for Git worktree management, designed for parallel AI agent workflows
Documentation
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
//! Result processing and draining.
//!
//! Contains the logic for processing task results:
//! - `drain_results()` - drain channel and apply results to items
//! - `apply_default()` - apply defaults for failed tasks

use std::time::{Duration, Instant};

use crossbeam_channel as chan;
use worktrunk::git::LineDiff;

/// Deadline for the entire drain operation. Generous to avoid flaky timeouts
/// under CI load where process spawning for ~70 work items can be slow.
pub(super) const DRAIN_TIMEOUT: Duration = Duration::from_secs(120);

use super::super::model::{CommitDetails, ItemKind, ListItem, UpstreamStatus, WorkingTreeStatus};
use super::execution::ExpectedResults;
use super::types::{DrainOutcome, MissingResult, StatusContext, TaskError, TaskKind, TaskResult};

/// Apply default values for a failed task.
///
/// When a task fails, we still need to populate the item fields with sensible
/// defaults so the UI can render. This centralizes all default logic in one place.
pub(super) fn apply_default(
    items: &mut [ListItem],
    status_contexts: &mut [StatusContext],
    error: &TaskError,
) {
    let idx = error.item_idx;
    match error.kind {
        TaskKind::CommitDetails => {
            items[idx].commit = Some(CommitDetails::default());
        }
        TaskKind::AheadBehind => {
            // Leave as None — UI shows `⋯` for not-loaded tasks
            // Conservative: don't claim orphan if we couldn't check
            items[idx].is_orphan = Some(false);
        }
        TaskKind::CommittedTreesMatch => {
            // Conservative: don't claim integrated if we couldn't check
            items[idx].committed_trees_match = Some(false);
        }
        TaskKind::HasFileChanges => {
            // Conservative: assume has changes if we couldn't check
            items[idx].has_file_changes = Some(true);
        }
        TaskKind::WouldMergeAdd => {
            // Conservative: assume would add changes if we couldn't check
            items[idx].would_merge_add = Some(true);
            items[idx].is_patch_id_match = Some(false);
        }
        TaskKind::IsAncestor => {
            // Conservative: don't claim merged if we couldn't check
            items[idx].is_ancestor = Some(false);
        }
        TaskKind::BranchDiff => {
            // Leave as None — UI shows `…` for skipped/failed tasks
        }
        TaskKind::WorkingTreeDiff => {
            if let ItemKind::Worktree(data) = &mut items[idx].kind {
                data.working_tree_diff = Some(LineDiff::default());
            } else {
                debug_assert!(false, "WorkingTreeDiff task spawned for non-worktree item");
            }
            status_contexts[idx].working_tree_status = Some(WorkingTreeStatus::default());
            status_contexts[idx].has_conflicts = false;
        }
        TaskKind::MergeTreeConflicts => {
            // Don't show conflict symbol if we couldn't check
            status_contexts[idx].has_merge_tree_conflicts = false;
        }
        TaskKind::WorkingTreeConflicts => {
            // Fall back to commit-based check on failure
            status_contexts[idx].has_working_tree_conflicts = None;
        }
        TaskKind::GitOperation => {
            // Already defaults to ActiveGitOperation::None in WorktreeData
        }
        TaskKind::UserMarker => {
            // Already defaults to None
            status_contexts[idx].user_marker = None;
        }
        TaskKind::Upstream => {
            items[idx].upstream = Some(UpstreamStatus::default());
        }
        TaskKind::CiStatus => {
            // Leave as None (not fetched) on error. This allows the hint path
            // in mod.rs to run and show "install gh/glab" when CI tools fail.
            // Some(None) means "CI tool ran successfully but found no PR".
        }
        TaskKind::UrlStatus => {
            // URL is set at item creation, only default url_active
            items[idx].url_active = None;
        }
        TaskKind::SummaryGenerate => {
            // Leave as None — no summary available
        }
    }
}

/// Drain task results from the channel and apply them to items.
///
/// This is the shared logic between progressive and buffered collection modes.
/// The `on_result` callback is called after each result is processed with the
/// item index and a reference to the updated item, allowing progressive mode
/// to update the live table while buffered mode does nothing.
///
/// Uses a caller-provided `deadline` to cap wall-clock time. When the deadline
/// is reached, returns `DrainOutcome::TimedOut` with diagnostic info.
///
/// Errors are collected in the `errors` vec for display after rendering.
/// Default values are applied for failed tasks so the UI can still render.
///
/// Callers decide how to handle timeout:
/// - `collect()`: Shows user-facing diagnostic (interactive command)
/// - `populate_item()`: Logs silently (used by statusline)
pub(super) fn drain_results(
    rx: chan::Receiver<Result<TaskResult, TaskError>>,
    items: &mut [ListItem],
    errors: &mut Vec<TaskError>,
    expected_results: &ExpectedResults,
    deadline: Instant,
    mut on_result: impl FnMut(usize, &mut ListItem, &StatusContext),
) -> DrainOutcome {
    // Track which result kinds we've received per item (for timeout diagnostics)
    let mut received_by_item: Vec<Vec<TaskKind>> = vec![Vec::new(); items.len()];

    // Temporary storage for data needed by status_symbols computation
    let mut status_contexts = vec![StatusContext::default(); items.len()];

    // Process task results as they arrive (with deadline)
    loop {
        let remaining = deadline.saturating_duration_since(Instant::now());
        if remaining.is_zero() {
            // Deadline exceeded - build diagnostic info showing MISSING results
            let received_count: usize = received_by_item.iter().map(|v| v.len()).sum();

            // Find items with missing results by comparing received vs expected
            let mut items_with_missing: Vec<MissingResult> = Vec::new();

            for (item_idx, item) in items.iter().enumerate() {
                // Get expected results for this item (populated at spawn time)
                let expected = expected_results.results_for(item_idx);

                // Get received results for this item (empty vec if none received)
                let received = received_by_item[item_idx].as_slice();

                // Find missing results
                let missing_kinds: Vec<TaskKind> = expected
                    .iter()
                    .filter(|kind| !received.contains(kind))
                    .copied()
                    .collect();

                if !missing_kinds.is_empty() {
                    let name = item
                        .branch
                        .clone()
                        .unwrap_or_else(|| item.head[..8.min(item.head.len())].to_string());
                    items_with_missing.push(MissingResult {
                        item_idx,
                        name,
                        missing_kinds,
                    });
                }
            }

            // Sort by item index and limit to first 5
            items_with_missing.sort_by_key(|result| result.item_idx);
            items_with_missing.truncate(5);

            return DrainOutcome::TimedOut {
                received_count,
                items_with_missing,
            };
        }

        let outcome = match rx.recv_timeout(remaining) {
            Ok(outcome) => outcome,
            Err(chan::RecvTimeoutError::Timeout) => continue, // Check deadline in next iteration
            Err(chan::RecvTimeoutError::Disconnected) => break, // All senders dropped - done
        };

        // Handle success or error
        let (item_idx, kind) = match outcome {
            Ok(ref result) => (result.item_idx(), TaskKind::from(result)),
            Err(ref error) => (error.item_idx, error.kind),
        };

        // Track this result for diagnostics (both success and error count as "received")
        received_by_item[item_idx].push(kind);

        // Handle error case: apply defaults and collect error
        if let Err(error) = outcome {
            apply_default(items, &mut status_contexts, &error);
            errors.push(error);
            let item = &mut items[item_idx];
            let status_ctx = &status_contexts[item_idx];
            on_result(item_idx, item, status_ctx);
            continue;
        }

        // Handle success case
        let result = outcome.unwrap();
        let item = &mut items[item_idx];
        let status_ctx = &mut status_contexts[item_idx];

        match result {
            TaskResult::CommitDetails { commit, .. } => {
                item.commit = Some(commit);
            }
            TaskResult::AheadBehind {
                counts, is_orphan, ..
            } => {
                item.counts = Some(counts);
                item.is_orphan = Some(is_orphan);
            }
            TaskResult::CommittedTreesMatch {
                committed_trees_match,
                ..
            } => {
                item.committed_trees_match = Some(committed_trees_match);
            }
            TaskResult::HasFileChanges {
                has_file_changes, ..
            } => {
                item.has_file_changes = Some(has_file_changes);
            }
            TaskResult::WouldMergeAdd {
                would_merge_add,
                is_patch_id_match,
                ..
            } => {
                item.would_merge_add = Some(would_merge_add);
                item.is_patch_id_match = Some(is_patch_id_match);
            }
            TaskResult::IsAncestor { is_ancestor, .. } => {
                item.is_ancestor = Some(is_ancestor);
            }
            TaskResult::BranchDiff { branch_diff, .. } => {
                item.branch_diff = Some(branch_diff);
            }
            TaskResult::WorkingTreeDiff {
                working_tree_diff,
                working_tree_status,
                has_conflicts,
                ..
            } => {
                if let ItemKind::Worktree(data) = &mut item.kind {
                    data.working_tree_diff = Some(working_tree_diff);
                } else {
                    debug_assert!(false, "WorkingTreeDiff result for non-worktree item");
                }
                // Store for status_symbols computation
                status_ctx.working_tree_status = Some(working_tree_status);
                status_ctx.has_conflicts = has_conflicts;
            }
            TaskResult::MergeTreeConflicts {
                has_merge_tree_conflicts,
                ..
            } => {
                // Store for status_symbols computation
                status_ctx.has_merge_tree_conflicts = has_merge_tree_conflicts;
            }
            TaskResult::WorkingTreeConflicts {
                has_working_tree_conflicts,
                ..
            } => {
                // Store for status_symbols computation (takes precedence over commit check)
                status_ctx.has_working_tree_conflicts = has_working_tree_conflicts;
            }
            TaskResult::GitOperation { git_operation, .. } => {
                if let ItemKind::Worktree(data) = &mut item.kind {
                    data.git_operation = git_operation;
                } else {
                    debug_assert!(false, "GitOperation result for non-worktree item");
                }
            }
            TaskResult::UserMarker { user_marker, .. } => {
                // Store for status_symbols computation
                status_ctx.user_marker = user_marker;
            }
            TaskResult::Upstream { upstream, .. } => {
                item.upstream = Some(upstream);
            }
            TaskResult::CiStatus { pr_status, .. } => {
                // Wrap in Some() to indicate "loaded" (Some(None) = no CI, Some(Some(status)) = has CI)
                item.pr_status = Some(pr_status);
            }
            TaskResult::UrlStatus { url, active, .. } => {
                // Two-phase URL rendering:
                // 1. First result (from spawning code): url=Some, active=None → URL appears in normal styling
                // 2. Second result (from health check): url=None, active=Some → dims if inactive
                // Only update non-None fields to preserve values from earlier results.
                if url.is_some() {
                    item.url = url;
                }
                if active.is_some() {
                    item.url_active = active;
                }
            }
            TaskResult::SummaryGenerate { summary, .. } => {
                item.summary = Some(summary);
            }
        }

        // Invoke callback (progressive mode re-renders rows, buffered mode does nothing)
        on_result(item_idx, item, status_ctx);
    }

    DrainOutcome::Complete
}

#[cfg(test)]
mod tests {
    use super::super::types::ErrorCause;
    use super::*;

    #[test]
    fn test_apply_default_summary_generate() {
        let mut items = vec![ListItem::new_branch("abc123".into(), "feat".into())];
        let mut status_contexts = vec![StatusContext::default()];

        let error = TaskError::new(
            0,
            TaskKind::SummaryGenerate,
            "llm failed",
            ErrorCause::Other,
        );
        apply_default(&mut items, &mut status_contexts, &error);

        // SummaryGenerate default leaves summary as None
        assert!(items[0].summary.is_none());
    }

    #[test]
    fn test_drain_results_summary_generate() {
        let (tx, rx) = crossbeam_channel::unbounded();
        let mut items = vec![ListItem::new_branch("abc123".into(), "feat".into())];
        let mut errors = Vec::new();
        let expected = ExpectedResults::default();

        // Send a SummaryGenerate result
        tx.send(Ok(TaskResult::SummaryGenerate {
            item_idx: 0,
            summary: Some("Add feature".into()),
        }))
        .unwrap();
        drop(tx);

        let outcome = drain_results(
            rx,
            &mut items,
            &mut errors,
            &expected,
            Instant::now() + DRAIN_TIMEOUT,
            |_, _, _| {},
        );
        assert!(matches!(outcome, DrainOutcome::Complete));
        assert_eq!(items[0].summary, Some(Some("Add feature".into())));
    }

    #[test]
    fn test_drain_results_timeout_returns_missing_diagnostics() {
        let (_tx, rx) = crossbeam_channel::unbounded();
        let mut items = vec![ListItem::new_branch("abc123".into(), "feat".into())];
        let mut errors = Vec::new();

        // Register expected results but don't send any — simulates tasks still running
        let expected = ExpectedResults::default();
        expected.expect(0, TaskKind::CommitDetails);
        expected.expect(0, TaskKind::AheadBehind);

        // Use an already-expired deadline — remaining.is_zero() triggers immediately
        let outcome = drain_results(
            rx,
            &mut items,
            &mut errors,
            &expected,
            Instant::now(),
            |_, _, _| {},
        );

        let DrainOutcome::TimedOut {
            received_count,
            items_with_missing,
        } = outcome
        else {
            panic!("expected TimedOut with immediate deadline");
        };

        assert_eq!(received_count, 0);
        assert_eq!(items_with_missing.len(), 1);
        assert_eq!(items_with_missing[0].name, "feat");
        assert!(
            items_with_missing[0]
                .missing_kinds
                .contains(&TaskKind::CommitDetails)
        );
        assert!(
            items_with_missing[0]
                .missing_kinds
                .contains(&TaskKind::AheadBehind)
        );
    }
}