use futures::StreamExt as _;
use tracing::{info, warn};
use crate::collect::github::client::{build_http_client, GitHubReview};
use crate::collect::github::org_discovery::{discover_org_repos, effective_orgs};
use crate::collect::github::reviewer_store::upsert_github_pr_reviewer;
use crate::collect::github::GitHubClient;
use crate::core::config::GithubConfig;
use crate::core::db::Database;
use crate::collect::collector::CollectionStats;
/// Discovers repositories for every configured GitHub organisation.
///
/// The org list is produced by `effective_orgs` from `gh_cfg.org` and
/// `gh_cfg.orgs`. Orgs are queried one after another. When discovery for an org
/// fails, the failure is logged with `warn!` and that org is skipped.
///
/// The repository pairs returned by `discover_org_repos` are de-duplicated
/// across orgs, keeping first-seen order. An empty list is returned when no
/// orgs are configured or when the HTTP client cannot be built.
pub(super) async fn run_github_org_discovery(gh_cfg: &GithubConfig) -> Vec<(String, String)> {
    let orgs = effective_orgs(gh_cfg.org.as_deref(), &gh_cfg.orgs);
    if orgs.is_empty() {
        return Vec::new();
    }

    let http = match build_http_client(gh_cfg) {
        Ok(client) => client,
        Err(e) => {
            warn!("GitHub org-discovery: could not build HTTP client: {e}");
            return Vec::new();
        }
    };

    // Tracks pairs already emitted so an org listed twice (or overlapping
    // orgs) does not produce duplicates.
    let mut already_seen = std::collections::HashSet::new();
    let mut discovered: Vec<(String, String)> = Vec::new();

    for org in &orgs {
        info!(org = %org, "discovering repositories for GitHub org");

        let repos = match discover_org_repos(&http, org).await {
            Ok(repos) => repos,
            Err(e) => {
                warn!(
                    org = %org,
                    error = %e,
                    "org discovery failed; continuing with other orgs"
                );
                continue;
            }
        };

        info!(org = %org, count = repos.len(), "org discovery complete");
        discovered.extend(
            repos
                .into_iter()
                .filter(|pair| already_seen.insert(pair.clone())),
        );
    }

    discovered
}
/// Fetches GitHub PR reviews for stored pull requests and upserts them as reviewers.
///
/// Candidate PRs come from `pull_requests` rows with `provider = 'github'`.
/// Unless `force_refresh_prs` is set, a PR that already has at least one
/// `pr_reviewers` row with `provider = 'github'` is skipped.
///
/// Reviews are requested concurrently, with at most
/// `gh_cfg.review_fetch_concurrency` requests in flight (clamped to at least
/// one). The results are written to the database one PR at a time once all
/// fetches have completed.
///
/// The function is best-effort and never returns an error:
/// * query, row-decode and client-init failures, as well as individual upsert
///   failures, are appended to `stats.errors`;
/// * a failed fetch for one PR, including a malformed `owner/repo` slug, is only
///   logged with `warn!`, and the remaining PRs are still processed.
///
/// `stats.reviewers_fetched` is incremented once per successfully upserted review.
pub(super) async fn fetch_and_store_github_reviewers(
    db: &mut Database,
    gh_cfg: &GithubConfig,
    force_refresh_prs: bool,
    stats: &mut CollectionStats,
) {
    // `stats` is supplied by the caller and may already hold a non-zero count.
    // Remember it so the summary log reports only what this call stored.
    let reviewers_before = stats.reviewers_fetched;

    let prs: Vec<(i64, String, u64)> = {
        let conn = db.connection();
        let query = if force_refresh_prs {
            "SELECT id, repository, pr_number FROM pull_requests \
             WHERE provider = 'github' ORDER BY id"
        } else {
            "SELECT p.id, p.repository, p.pr_number \
             FROM pull_requests p \
             WHERE p.provider = 'github' \
             AND NOT EXISTS ( \
             SELECT 1 FROM pr_reviewers r \
             WHERE r.pr_id = p.id AND r.provider = 'github' \
             ) \
             ORDER BY p.id"
        };
        let mut stmt = match conn.prepare(query) {
            Ok(s) => s,
            Err(e) => {
                stats
                    .errors
                    .push(format!("GitHub reviewer query prepare failed: {e}"));
                return;
            }
        };
        let rows = match stmt.query_map([], |row| {
            Ok((
                row.get::<_, i64>(0)?,
                row.get::<_, String>(1)?,
                row.get::<_, i64>(2)?,
            ))
        }) {
            Ok(rows) => rows,
            Err(e) => {
                stats
                    .errors
                    .push(format!("GitHub reviewer query failed: {e}"));
                return;
            }
        };
        let mut pending = Vec::new();
        for row in rows {
            match row {
                // The PR number is read as a signed integer. Only non-negative
                // values convert losslessly; a bare `as u64` would wrap negatives
                // into huge PR numbers.
                Ok((pr_db_id, repository, raw_number)) if raw_number >= 0 => {
                    pending.push((pr_db_id, repository, raw_number as u64));
                }
                Ok((pr_db_id, repository, raw_number)) => {
                    stats.errors.push(format!(
                        "GitHub reviewer query: skipping PR id {pr_db_id} ({repository}) \
                         with invalid pr_number {raw_number}"
                    ));
                }
                // Report decode failures instead of silently dropping the row,
                // so a PR missing from the run can be explained.
                Err(e) => {
                    stats
                        .errors
                        .push(format!("GitHub reviewer query row decode failed: {e}"));
                }
            }
        }
        pending
    };
    if prs.is_empty() {
        return;
    }
    info!(count = prs.len(), "fetching GitHub PR reviews");
    let gh_client = match GitHubClient::new_for_reviews(gh_cfg) {
        Ok(c) => c,
        Err(e) => {
            stats
                .errors
                .push(format!("GitHub reviewer client init failed: {e}"));
            return;
        }
    };

    // A configured concurrency of 0 still means "one request at a time".
    let concurrency = (gh_cfg.review_fetch_concurrency as usize).max(1);
    type FetchResult = (i64, String, u64, Result<Vec<GitHubReview>, String>);
    // `prs` is not needed after this point, so its rows are moved into the
    // stream instead of cloning every repository string.
    let fetched: Vec<FetchResult> = futures::stream::iter(prs)
        .map(|(pr_db_id, repository, pr_number)| {
            let client_ref = &gh_client;
            async move {
                let result = match repository.split_once('/') {
                    Some((o, r)) if !o.is_empty() && !r.is_empty() => client_ref
                        .fetch_pr_reviews_for_repo(o, r, pr_number)
                        .await
                        .map_err(|e| e.to_string()),
                    _ => Err(format!(
                        "malformed repository slug '{repository}'; skipping reviewer fetch"
                    )),
                };
                (pr_db_id, repository, pr_number, result)
            }
        })
        .buffer_unordered(concurrency)
        .collect()
        .await;

    // All writes happen here, after the concurrent fetches, so the futures
    // above never touch the database connection.
    for (pr_db_id, repository, pr_number, result) in fetched {
        let reviews = match result {
            Ok(reviews) => reviews,
            Err(msg) => {
                warn!(
                    repository = %repository,
                    pr_number,
                    "GitHub reviewer fetch failed for PR: {msg}; continuing"
                );
                continue;
            }
        };
        let conn = db.connection();
        for review in &reviews {
            match upsert_github_pr_reviewer(conn, pr_db_id, review) {
                Ok(()) => stats.reviewers_fetched += 1,
                Err(e) => stats.errors.push(format!(
                    "reviewer upsert failed for {repository}#{pr_number}: {e}"
                )),
            }
        }
    }

    // Log only this call's contribution, not the caller's running total.
    let stored = stats.reviewers_fetched - reviewers_before;
    if stored > 0 {
        info!(count = stored, "stored GitHub PR reviewers");
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::collect::github::client::{GhUser, GitHubReview};
    use crate::core::config::GithubConfig;
    use crate::core::db::Database;
    use rusqlite::params;

    /// Opens a fresh in-memory database for a single test.
    fn open_db() -> Database {
        Database::open_in_memory().expect("open db")
    }

    /// Inserts a minimal open GitHub PR row and returns its rowid.
    fn seed_pr(conn: &rusqlite::Connection, repository: &str, pr_number: i64) -> i64 {
        conn.execute(
            "INSERT INTO pull_requests \
             (provider, repository, pr_number, title, author, state, created_at, commit_shas) \
             VALUES ('github', ?1, ?2, 'T', 'u', 'open', '2024-01-01T00:00:00Z', '[]')",
            params![repository, pr_number],
        )
        .expect("seed pr");
        conn.last_insert_rowid()
    }

    /// Builds a review authored by `login` with the given review `state`.
    fn make_review(login: &str, state: &str) -> GitHubReview {
        GitHubReview {
            id: 0,
            state: state.to_string(),
            user: Some(GhUser {
                login: login.to_string(),
            }),
            submitted_at: None,
        }
    }

    /// Builds a `GithubConfig` with PR and review fetching enabled and the given
    /// review-fetch concurrency.
    fn make_gh_cfg(concurrency: u32) -> GithubConfig {
        GithubConfig {
            token: None,
            org: None,
            orgs: vec![],
            repo: None,
            fetch_prs: true,
            fetch_pr_reviews: true,
            review_fetch_concurrency: concurrency,
            ticket_regex: None,
        }
    }

    /// Reviewers for several PRs, upserted through the same
    /// `upsert_github_pr_reviewer` call the fetch path uses, each land under
    /// their own `pr_id`.
    ///
    /// This is a plain synchronous test: nothing here is awaited, so no async
    /// runtime is needed.
    #[test]
    fn upsert_stores_one_reviewer_row_per_pr() {
        let db = open_db();
        let pr_ids = {
            let conn = db.connection();
            vec![
                seed_pr(conn, "acme/alpha", 1),
                seed_pr(conn, "acme/beta", 2),
                seed_pr(conn, "acme/gamma", 3),
            ]
        };
        {
            let conn = db.connection();
            upsert_github_pr_reviewer(conn, pr_ids[0], &make_review("alice", "APPROVED"))
                .expect("upsert alice");
            upsert_github_pr_reviewer(conn, pr_ids[1], &make_review("bob", "CHANGES_REQUESTED"))
                .expect("upsert bob");
            upsert_github_pr_reviewer(conn, pr_ids[2], &make_review("carol", "COMMENTED"))
                .expect("upsert carol");
        }

        let conn = db.connection();
        let total: i64 = conn
            .query_row(
                "SELECT COUNT(*) FROM pr_reviewers WHERE provider = 'github'",
                [],
                |r| r.get(0),
            )
            .expect("count");
        assert_eq!(total, 3, "all three reviewer rows must be present");

        for &pr_id in &pr_ids {
            let per_pr: i64 = conn
                .query_row(
                    "SELECT COUNT(*) FROM pr_reviewers WHERE pr_id = ?1 AND provider = 'github'",
                    params![pr_id],
                    |r| r.get(0),
                )
                .expect("per-pr count");
            assert_eq!(per_pr, 1, "PR row {pr_id} must have exactly one reviewer");
        }
    }

    /// Mirrors the clamp expression in `fetch_and_store_github_reviewers`;
    /// keep the two in sync.
    #[test]
    fn review_fetch_concurrency_clamped_to_minimum_one() {
        let cfg = make_gh_cfg(0);
        let concurrency = (cfg.review_fetch_concurrency as usize).max(1);
        assert_eq!(concurrency, 1, "0 must clamp to 1 (serial)");
        let cfg2 = make_gh_cfg(5);
        let concurrency2 = (cfg2.review_fetch_concurrency as usize).max(1);
        assert_eq!(concurrency2, 5);
    }
}