1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
//! Background thread spawning for git status and PR status fetches.
use std::collections::{HashMap, VecDeque};
use std::path::PathBuf;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::{Arc, Mutex};
use crate::git;
use super::super::agent;
use super::App;
use super::types::AppEvent;
impl App {
/// Spawn a background thread to fetch git status for all agent worktrees
pub(super) fn spawn_git_status_fetch(&self) {
// Skip if a fetch is already in progress (prevents thread pile-up)
if self
.is_git_fetching
.compare_exchange(false, true, Ordering::SeqCst, Ordering::SeqCst)
.is_err()
{
return;
}
let tx = self.event_tx.clone();
let is_fetching = self.is_git_fetching.clone();
let main_branch = self.config.main_branch.clone();
// Include both agent paths and worktree paths so the worktree view gets git status too
let mut paths: Vec<PathBuf> = self.all_agents.iter().map(|a| a.path.clone()).collect();
for wt in &self.worktrees {
if !paths.contains(&wt.path) {
paths.push(wt.path.clone());
}
}
std::thread::spawn(move || {
// Reset flag when thread completes (even on panic)
struct ResetFlag(Arc<AtomicBool>);
impl Drop for ResetFlag {
fn drop(&mut self) {
self.0.store(false, Ordering::SeqCst);
}
}
let _reset = ResetFlag(is_fetching);
for path in paths {
let status = git::get_git_status(&path, main_branch.as_deref());
let _ = tx.send(AppEvent::GitStatus(path, status));
}
});
}
/// Spawn a background thread to fetch PR status for all repos.
/// Returns true if a fetch was started, false if one is already in progress.
pub(super) fn spawn_pr_status_fetch(&self) -> bool {
// Skip if already fetching
if self
.is_pr_fetching
.compare_exchange(false, true, Ordering::SeqCst, Ordering::SeqCst)
.is_err()
{
return false;
}
// Collect branches per repo root from agents
// Use all_agents (not filtered agents) because apply_filters() runs after
// this in refresh(), so self.agents may still be empty on first call.
let mut repo_branches: HashMap<PathBuf, Vec<String>> = HashMap::new();
for agent in &self.all_agents {
let Some(status) = self.git_statuses.get(&agent.path) else {
continue;
};
let Some(ref branch) = status.branch else {
continue;
};
if branch == "main" || branch == "master" {
continue;
}
if let Some(repo_root) = self.repo_roots.get(&agent.path) {
repo_branches
.entry(repo_root.clone())
.or_default()
.push(branch.clone());
}
}
// Also collect branches from worktrees (keyed by main worktree path as repo root)
// Group non-main worktrees by their project's main worktree path
let main_paths: HashMap<String, PathBuf> = self
.all_worktrees
.iter()
.filter(|w| w.is_main)
.map(|w| {
let project = agent::extract_project_name(&w.path);
(project, w.path.clone())
})
.collect();
for wt in &self.all_worktrees {
if wt.is_main || wt.branch == "main" || wt.branch == "master" {
continue;
}
let project = agent::extract_project_name(&wt.path);
if let Some(repo_root) = main_paths.get(&project) {
repo_branches
.entry(repo_root.clone())
.or_default()
.push(wt.branch.clone());
}
}
// Deduplicate branches per repo
for branches in repo_branches.values_mut() {
branches.sort();
branches.dedup();
}
if repo_branches.is_empty() {
self.is_pr_fetching.store(false, Ordering::SeqCst);
// Return false so the caller doesn't reset the timer — no fetch
// was actually started, and we want to retry once git statuses
// or worktree data arrive.
return false;
}
let tx = self.event_tx.clone();
let is_fetching = self.is_pr_fetching.clone();
// Identify the priority repo (current project) so it fetches first
let priority_repo = self
.worktree_project_override
.as_ref()
.map(|(_, p)| p.clone())
.or_else(|| {
self.current_worktree
.as_ref()
.and_then(|p| self.repo_roots.get(p).cloned())
});
std::thread::spawn(move || {
struct ResetFlag(Arc<AtomicBool>);
impl Drop for ResetFlag {
fn drop(&mut self) {
self.0.store(false, Ordering::SeqCst);
}
}
let _reset = ResetFlag(is_fetching);
// Sort repos so the priority repo (current project) is fetched first
let mut repos: VecDeque<_> = repo_branches.into_iter().collect();
if let Some(ref priority) = priority_repo {
repos
.make_contiguous()
.sort_by_key(|(repo, _)| repo != priority);
}
// Fetch repos in parallel with bounded concurrency
let queue = Arc::new(Mutex::new(repos));
let workers = queue.lock().unwrap().len().min(4);
std::thread::scope(|s| {
for _ in 0..workers {
let queue = Arc::clone(&queue);
let tx = tx.clone();
s.spawn(move || {
loop {
let Some((repo_root, branches)) = queue.lock().unwrap().pop_front()
else {
break;
};
match crate::github::list_prs_for_branches(&repo_root, &branches) {
Ok(prs) => {
let _ = tx.send(AppEvent::PrStatus(repo_root, prs));
}
Err(e) => {
tracing::warn!(
"Failed to fetch PRs for {:?}: {}",
repo_root,
e
);
}
}
}
});
}
});
});
true
}
}