1use crate::buffered_eprintln;
2use crate::config::Config;
3use crate::github::cache::CacheConfig;
4use crate::github::types::PullRequest;
5use crate::scoring::{calculate_score, merge_scoring_configs, ScoreResult};
6use crate::snooze::{filter_active_prs, filter_snoozed_prs, SnoozeState};
7use anyhow::Result;
8use futures::stream::{FuturesUnordered, StreamExt};
9use std::collections::{HashMap, HashSet};
10use std::fmt;
11
/// Error value that carries nothing but a human-readable message.
///
/// `fetch_and_score_prs` in this file checks for this type via `downcast_ref` and, when a
/// query fails with it, returns the error immediately instead of logging it and continuing.
/// NOTE(review): the name suggests it is raised for GitHub authentication failures — the
/// code that constructs it is not visible here, confirm at the construction site.
#[derive(Debug)]
pub struct AuthError {
    /// Text emitted verbatim by the `Display` implementation.
    pub message: String,
}

// Marker impl: no `source()` override, so the default (no underlying cause) applies.
impl std::error::Error for AuthError {}

impl fmt::Display for AuthError {
    /// Writes `message` exactly as stored, with no prefix, suffix or padding.
    fn fmt(&self, out: &mut fmt::Formatter<'_>) -> fmt::Result {
        out.write_str(&self.message)
    }
}
27
28pub async fn fetch_and_score_prs(
34 client: &octocrab::Octocrab,
35 config: &Config,
36 snooze_state: &SnoozeState,
37 cache_config: &CacheConfig,
38 verbose: bool,
39 auth_username: Option<&str>,
40) -> Result<(
41 Vec<(PullRequest, ScoreResult)>,
42 Vec<(PullRequest, ScoreResult)>,
43 Option<u64>,
44)> {
45 if verbose {
46 let cache_status = if cache_config.enabled {
47 "enabled"
48 } else {
49 "disabled (--no-cache)"
50 };
51 buffered_eprintln!("Cache: {}", cache_status);
52 }
53
54 let global_scoring = config.scoring.clone().unwrap_or_default();
56
57 let mut all_prs = Vec::new();
59 let mut any_succeeded = false;
60
61 let mut futures = FuturesUnordered::new();
62 let auth_username_owned = auth_username.map(|s| s.to_string());
63 for (query_index, query_config) in config.queries.iter().enumerate() {
64 let client = client.clone();
65 let query = query_config.query.clone();
66 let query_name = query_config.name.clone();
67 let auth_username_clone = auth_username_owned.clone();
68 let merged_scoring = merge_scoring_configs(&global_scoring, query_config.scoring.as_ref());
70 let exclude_patterns = merged_scoring.size.and_then(|s| s.exclude);
71 futures.push(async move {
72 let result = crate::github::search_and_enrich_prs(
73 &client,
74 &query,
75 auth_username_clone.as_deref(),
76 exclude_patterns,
77 )
78 .await;
79 (query_name, query, query_index, result)
80 });
81 }
82
83 while let Some((name, query, query_index, result)) = futures.next().await {
84 match result {
85 Ok(prs) => {
86 if verbose {
87 buffered_eprintln!(
88 " Found {} PRs for {}",
89 prs.len(),
90 name.as_deref().unwrap_or(&query)
91 );
92 }
93 all_prs.extend(prs.into_iter().map(|pr| (pr, query_index)));
95 any_succeeded = true;
96 }
97 Err(e) => {
98 if e.downcast_ref::<AuthError>().is_some() {
100 return Err(e);
101 }
102 buffered_eprintln!(
103 "Query failed: {} - {}",
104 name.as_deref().unwrap_or(&query),
105 e
106 );
107 }
108 }
109 }
110
111 if !any_succeeded && !config.queries.is_empty() {
113 anyhow::bail!("All queries failed. Check your network connection and GitHub token.");
114 }
115
116 let mut seen_urls = HashSet::new();
119 let mut pr_to_query_index = HashMap::new();
120 let unique_prs: Vec<_> = all_prs
121 .into_iter()
122 .filter_map(|(pr, query_idx)| {
123 if seen_urls.insert(pr.url.clone()) {
124 pr_to_query_index.insert(pr.url.clone(), query_idx);
125 Some(pr)
126 } else {
127 None
128 }
129 })
130 .collect();
131
132 if verbose {
133 buffered_eprintln!("After deduplication: {} unique PRs", unique_prs.len());
134 }
135
136 let active_prs = filter_active_prs(unique_prs.clone(), snooze_state);
138 let snoozed_prs = filter_snoozed_prs(unique_prs, snooze_state);
139
140 if verbose {
141 buffered_eprintln!(
142 "After filter: {} active, {} snoozed",
143 active_prs.len(),
144 snoozed_prs.len()
145 );
146 }
147
148 let mut active_scored: Vec<_> = active_prs
150 .into_iter()
151 .map(|pr| {
152 let query_idx = pr_to_query_index.get(&pr.url).copied().unwrap_or(0);
154 let scoring =
155 merge_scoring_configs(&global_scoring, config.queries[query_idx].scoring.as_ref());
156 let result = calculate_score(&pr, &scoring);
157 (pr, result)
158 })
159 .collect();
160
161 let mut snoozed_scored: Vec<_> = snoozed_prs
163 .into_iter()
164 .map(|pr| {
165 let query_idx = pr_to_query_index.get(&pr.url).copied().unwrap_or(0);
167 let scoring =
168 merge_scoring_configs(&global_scoring, config.queries[query_idx].scoring.as_ref());
169 let result = calculate_score(&pr, &scoring);
170 (pr, result)
171 })
172 .collect();
173
174 let sort_fn = |a: &(PullRequest, ScoreResult), b: &(PullRequest, ScoreResult)| {
176 let score_cmp =
178 b.1.score
179 .partial_cmp(&a.1.score)
180 .unwrap_or(std::cmp::Ordering::Equal);
181 if score_cmp != std::cmp::Ordering::Equal {
182 return score_cmp;
183 }
184 a.0.created_at.cmp(&b.0.created_at)
186 };
187
188 active_scored.sort_by(sort_fn);
189 snoozed_scored.sort_by(sort_fn);
190
191 let rate_limit_remaining = match client.ratelimit().get().await {
193 Ok(rate_limit) => Some(rate_limit.resources.core.remaining as u64),
194 Err(_) => None,
195 };
196
197 Ok((active_scored, snoozed_scored, rate_limit_remaining))
198}