use chrono::{DateTime, NaiveDate, TimeZone, Utc};
use tracing::{info, warn};
use crate::collect::azdo::AzureDevOpsClient;
use crate::collect::bitbucket::BitbucketClient;
use crate::collect::errors::Result;
use crate::collect::git::GitCollector;
use crate::collect::github::GitHubClient;
use crate::collect::identity::IdentityResolver;
use crate::collect::linear::LinearClient;
use crate::collect::pr_provider::PrProvider;
use crate::collect::weeks::{clamp_week_to_range, weeks_in_range};
use crate::core::config::Config;
use crate::core::db::{self, Database};
use crate::core::models::PullRequest;
/// Counters and non-fatal error messages accumulated over one pipeline run.
///
/// `Default` yields all-zero counters and an empty error list, which is the
/// state `CollectionPipeline::run` starts from.
#[derive(Debug, Clone, Default)]
pub struct CollectionStats {
    /// Sum of the commit counts reported by the git collector calls.
    pub commits_collected: usize,
    /// Number of distinct (name, email) pairs processed by the author upsert step.
    pub authors_resolved: usize,
    /// Pull requests stored, summed over every PR source.
    pub prs_fetched: usize,
    /// Linear issues persisted.
    pub linear_issues_fetched: usize,
    /// Repository-weeks that were collected during this run.
    pub weeks_collected: usize,
    /// Repository-weeks skipped because they were already recorded as collected.
    pub weeks_skipped: usize,
    /// Human-readable descriptions of failures that did not abort the run.
    pub errors: Vec<String>,
    /// Rows upserted by the reachability scan, summed over repositories.
    pub reachability_rows: usize,
}
/// Drives one collection run: git history, reachability, author resolution,
/// pull requests and work-item / issue lookups, all written to a `Database`.
///
/// Built with `new` and tuned through the `with_*` builder methods.
pub struct CollectionPipeline {
    /// Configuration the whole run is driven by.
    config: Config,
    /// When `true`, weeks already recorded as collected are collected again
    /// instead of being skipped.
    force: bool,
    /// Forwarded to `GitCollector::no_fetch` for every repository.
    no_fetch: bool,
    /// Forwarded to the Azure DevOps PR fetcher's `run_with_options`.
    force_refresh_prs: bool,
    /// When `true`, the tag / release-branch reachability scan is not run.
    skip_tag_reachability: bool,
}
impl CollectionPipeline {
/// Creates a pipeline for `config` with every optional switch turned off.
///
/// Use the `with_*` builders to enable individual switches.
pub fn new(config: Config) -> Self {
    // All flags share the same initial value: disabled.
    let disabled = false;
    Self {
        config,
        force: disabled,
        no_fetch: disabled,
        force_refresh_prs: disabled,
        skip_tag_reachability: disabled,
    }
}
pub fn with_force(mut self, force: bool) -> Self {
self.force = force;
self
}
pub fn with_no_fetch(mut self, no_fetch: bool) -> Self {
self.no_fetch = no_fetch;
self
}
pub fn with_skip_tag_reachability(mut self, skip: bool) -> Self {
self.skip_tag_reachability = skip;
self
}
pub fn with_force_refresh_prs(mut self, force_refresh_prs: bool) -> Self {
self.force_refresh_prs = force_refresh_prs;
self
}
pub fn config(&self) -> &Config {
&self.config
}
pub async fn run(&self, db: &mut Database) -> Result<CollectionStats> {
let mut stats = CollectionStats::default();
let resolver = IdentityResolver::from_config(&self.config);
for repo_cfg in &self.config.repositories {
let collector = match GitCollector::new(repo_cfg) {
Ok(c) => c.no_fetch(self.no_fetch),
Err(e) => {
let msg = format!("failed to open repo {}: {e}", repo_cfg.path.display());
warn!("{msg}");
stats.errors.push(msg);
continue;
}
};
self.collect_repo_by_week(db, &collector, &mut stats);
}
if !self.skip_tag_reachability {
self.run_reachability_scan(db, &mut stats);
} else {
info!("skipping tag/release-branch reachability scan (--skip-tag-reachability)");
}
stats.authors_resolved = self.upsert_observed_authors(db, &resolver)?;
if let Ok(unresolved) = count_unresolved_commits(db) {
if unresolved > 0 {
let msg = format!(
"WARNING: {unresolved} commits have unresolved author identities and may \
inflate developer counts. Run `tga aliases list` to review, or extend \
`developer_aliases` in the config to map missing identities."
);
warn!("{msg}");
eprintln!("{msg}");
}
}
self.fetch_and_store_prs(db, &mut stats).await;
if let Some(azdo_cfg) = self.config.azure_devops_config() {
let client = AzureDevOpsClient::new(azdo_cfg.clone());
match client.test_connection().await {
Ok(info) => info!(
user = info.user_name.as_deref().unwrap_or("?"),
org = %info.organization_url,
"Azure DevOps connection verified",
),
Err(e) => {
warn!("Azure DevOps connection failed (non-fatal): {e}");
}
}
if azdo_cfg.fetch_on_reference {
if let Err(e) = self
.fetch_and_persist_azdo_work_items(db, &client, azdo_cfg)
.await
{
stats
.errors
.push(format!("ADO work item persistence failed: {e}"));
}
}
if azdo_cfg.fetch_prs {
match self.fetch_and_persist_azdo_prs(db, azdo_cfg).await {
Ok(n) => {
info!(prs = n, "stored ADO pull requests");
stats.prs_fetched += n;
}
Err(e) => {
stats.errors.push(format!("ADO PR fetch failed: {e}"));
}
}
}
}
if let Some(linear_cfg) = &self.config.linear {
if linear_cfg.fetch_on_reference {
match LinearClient::new(linear_cfg) {
Ok(client) => {
let messages: Vec<String> = {
let conn = db.connection();
let mut stmt = match conn.prepare("SELECT message FROM commits") {
Ok(s) => s,
Err(e) => {
stats
.errors
.push(format!("Linear: query commits failed: {e}"));
return Ok(stats);
}
};
let rows = match stmt.query_map([], |row| row.get::<_, String>(0)) {
Ok(r) => r,
Err(e) => {
stats
.errors
.push(format!("Linear: read commits failed: {e}"));
return Ok(stats);
}
};
let mut out = Vec::new();
for r in rows.flatten() {
out.push(r);
}
out
};
let msg_refs: Vec<&str> = messages.iter().map(String::as_str).collect();
let issues = client
.fetch_referenced_issues(&msg_refs, &linear_cfg.team_keys)
.await;
for issue in &issues {
info!(
id = %issue.identifier,
state = %issue.state,
team = %issue.team,
"Linear issue fetched"
);
}
match client.store_issues(db, &issues) {
Ok(n) => {
info!(stored = n, "persisted linear_issues rows");
stats.linear_issues_fetched += n;
}
Err(e) => {
stats
.errors
.push(format!("Linear: store issues failed: {e}"));
}
}
}
Err(e) => {
stats.errors.push(format!("Linear client init failed: {e}"));
}
}
}
}
Ok(stats)
}
/// Runs the reachability scan for every configured repository and adds the
/// upserted row counts to `stats.reachability_rows`.
///
/// Returns immediately when both `track_tags` and `track_release_branches`
/// are off. A failing repository is logged and recorded in `stats.errors`;
/// the remaining repositories are still scanned.
fn run_reachability_scan(&self, db: &mut Database, stats: &mut CollectionStats) {
    use crate::collect::git::reachability::scan_and_persist;
    use crate::core::config::expand_path;

    let reach_cfg = &self.config.reachability;
    let tracking_enabled = reach_cfg.track_tags || reach_cfg.track_release_branches;
    if !tracking_enabled {
        info!("reachability tracking disabled by config (track_tags=false, track_release_branches=false)");
        return;
    }

    let conn = db.connection();
    for repo_cfg in &self.config.repositories {
        let repo_path = expand_path(&repo_cfg.path);
        // Label preference: configured name, then the directory name, then the full path.
        let label = match &repo_cfg.name {
            Some(configured) => configured.clone(),
            None => match repo_path.file_name().and_then(|s| s.to_str()) {
                Some(dir_name) => dir_name.to_string(),
                None => repo_path.display().to_string(),
            },
        };

        info!(repo = %label, "running reachability scan");
        let report = match scan_and_persist(&repo_path, conn, reach_cfg) {
            Ok(report) => report,
            Err(e) => {
                let msg = format!("reachability scan failed for {label}: {e}");
                warn!("{msg}");
                stats.errors.push(msg);
                continue;
            }
        };
        info!(
            repo = %label,
            rows = report.rows_upserted,
            tagged = report.tagged_commits,
            release_branch = report.release_branch_commits,
            "reachability scan complete"
        );
        stats.reachability_rows += report.rows_upserted;
    }
}
/// Builds the list of pull-request providers enabled by the configuration.
///
/// * GitHub is added when `github.fetch_prs` is set and at least one repo
///   resolves. A missing token only produces a warning; the client is still
///   created.
/// * When there is no `github` block but a repository looks like a GitHub
///   clone, a configuration hint is logged.
/// * Bitbucket is added when `bitbucket.fetch_prs` is set.
///
/// Client construction failures are appended to `stats.errors`.
fn build_pr_providers(
    &self,
    stats: &mut CollectionStats,
) -> Vec<Box<dyn PrProvider + Send + Sync>> {
    let mut providers: Vec<Box<dyn PrProvider + Send + Sync>> = Vec::new();

    match &self.config.github {
        Some(gh_cfg) if gh_cfg.fetch_prs => {
            let repos = crate::collect::github::client::resolve_github_repos(
                gh_cfg,
                &self.config.repositories,
            );
            if repos.is_empty() {
                info!(
                    "GitHub PR fetch skipped: no github.repo, no per-repo org, \
                     and no github.org resolvable from repositories[]"
                );
            } else {
                // Anonymous means neither the config nor the environment supplies a token.
                let anonymous =
                    gh_cfg.token.is_none() && std::env::var("GITHUB_TOKEN").is_err();
                if anonymous {
                    let msg = "GitHub PR fetch is enabled (github.fetch_prs=true) but \
                               no token is configured. Set `github.token` or the \
                               GITHUB_TOKEN env var to a PAT with `repo` scope (public \
                               repos only need `public_repo`); without it, GitHub \
                               rate-limits to 60 requests/hour and most PRs will be \
                               missed.";
                    warn!("{msg}");
                    eprintln!("warning: {msg}");
                    info!(
                        repo_count = repos.len(),
                        "GitHub PR fetcher will scan {} repo(s) anonymously",
                        repos.len()
                    );
                } else {
                    info!(
                        repo_count = repos.len(),
                        "GitHub PR fetcher will scan {} repo(s)",
                        repos.len()
                    );
                }
                match GitHubClient::new_for_prs(gh_cfg, repos) {
                    Ok(gh) => providers.push(Box::new(gh)),
                    Err(e) => stats.errors.push(format!("GitHub client init failed: {e}")),
                }
            }
        }
        Some(_) => {
            info!(
                "GitHub PR fetch disabled (github.fetch_prs=false). Set \
                 `github.fetch_prs: true` in your config to populate the \
                 pull_requests table."
            );
        }
        None => {
            if has_github_like_repos(&self.config.repositories) {
                let msg = "Repositories look like GitHub clones, but no `github:` config \
                           block is present. To populate the `pull_requests` table, add:\n\
                           \n\
                           github:\n \
                           token: \"${GITHUB_TOKEN}\" # PAT with `repo` scope\n \
                           fetch_prs: true\n \
                           repo: \"owner/name\" # OR `org: \"owner\"` for org-wide\n";
                info!("{msg}");
            }
        }
    }

    let bitbucket = self.config.bitbucket.as_ref().filter(|cfg| cfg.fetch_prs);
    if let Some(bb_cfg) = bitbucket {
        match BitbucketClient::new(bb_cfg) {
            Ok(bb) => providers.push(Box::new(bb)),
            Err(e) => stats
                .errors
                .push(format!("Bitbucket client init failed: {e}")),
        }
    }

    providers
}
/// Fetches pull requests from every enabled provider and stores them in `db`.
///
/// One task per provider is spawned on a `JoinSet`; results are stored as
/// the tasks finish. Join, fetch and store failures are appended to
/// `stats.errors`; stored counts are added to `stats.prs_fetched`.
async fn fetch_and_store_prs(&self, db: &mut Database, stats: &mut CollectionStats) {
    use std::sync::Arc;
    use tokio::task::JoinSet;

    let boxed = self.build_pr_providers(stats);
    if boxed.is_empty() {
        return;
    }

    // Shared ownership lets each spawned task hold its provider while this
    // function keeps the list for the store step.
    let providers: Vec<Arc<dyn PrProvider + Send + Sync>> =
        boxed.into_iter().map(Arc::from).collect();

    let mut tasks: JoinSet<(String, Result<Vec<PullRequest>>)> = JoinSet::new();
    for provider in providers.iter().cloned() {
        let name = provider.name().to_string();
        tasks.spawn(async move {
            let fetched = provider.fetch_pull_requests().await;
            (name, fetched)
        });
    }

    while let Some(joined) = tasks.join_next().await {
        let (provider_name, fetched) = match joined {
            Ok(pair) => pair,
            Err(e) => {
                stats.errors.push(format!("PR fetch task panicked: {e}"));
                continue;
            }
        };

        let prs = match fetched {
            Ok(prs) => prs,
            Err(e) => {
                stats
                    .errors
                    .push(format!("{provider_name} PR fetch failed: {e}"));
                continue;
            }
        };

        // The task only returns the provider's name, so look the provider up again.
        let Some(provider) = providers.iter().find(|p| p.name() == provider_name) else {
            stats.errors.push(format!(
                "internal: no provider registered for '{provider_name}' \
                 when storing PRs"
            ));
            continue;
        };

        match provider.store_pull_requests(db, &prs) {
            Ok(n) => {
                info!(provider = %provider_name, prs = n, "stored pull requests");
                stats.prs_fetched += n;
            }
            Err(e) => {
                stats
                    .errors
                    .push(format!("{provider_name} PR store failed: {e}"));
            }
        }
    }
}
/// Collects commits for one repository, one week at a time.
///
/// * Without a `since` bound the full history is collected in a single pass
///   (up to `until` when that is set) after a warning is emitted.
/// * Otherwise the range from `since` to `until` (today, UTC, when `until`
///   is unset) is walked week by week. Weeks already recorded as collected
///   are skipped unless `force` is set, and each collected week is recorded
///   through `db::record_collection_run`.
///
/// Nothing is returned: counts go to `stats`, failures to `stats.errors`.
fn collect_repo_by_week(
    &self,
    db: &mut Database,
    collector: &GitCollector,
    stats: &mut CollectionStats,
) {
    let repo_name = collector.name().to_string();
    let since = collector.since();
    let until = collector.until();
    let until_only = until.is_some();

    let Some(since) = since else {
        // No lower bound: a week-by-week walk is impossible, so collect in one pass.
        if until_only {
            warn!(
                repo = %repo_name,
                "until_date set without since_date — collecting full git history. \
                 Use --weeks N or set analysis.since_date in config to limit scope."
            );
        } else {
            warn!(
                repo = %repo_name,
                "no since_date or --weeks flag set — collecting full git history. \
                 Use --weeks N or set analysis.since_date in config to limit scope."
            );
        }
        eprintln!(
            "warning: [{repo_name}] no since_date / --weeks — collecting FULL git history. \
             Set analysis.since_date or pass --weeks N to limit scope."
        );

        // The error is rendered to text right away so both collector entry
        // points can share the reporting code below.
        let outcome: std::result::Result<usize, String> = match until {
            Some(u) => collector
                .collect_window(db, None, Some(u))
                .map_err(|e| e.to_string()),
            None => collector.collect(db).map_err(|e| e.to_string()),
        };
        match outcome {
            Ok(n) => {
                if until_only {
                    info!(repo = %repo_name, commits = n, "extracted (until-only)");
                } else {
                    info!(repo = %repo_name, commits = n, "extracted (unbounded)");
                }
                stats.commits_collected += n;
            }
            Err(e) => {
                let msg = format!("collection failed for {repo_name}: {e}");
                warn!("{msg}");
                stats.errors.push(msg);
            }
        }
        return;
    };

    let from = since.date_naive();
    let to = match until {
        Some(u) => u.date_naive(),
        None => Utc::now().date_naive(),
    };
    let repo_count = self.config.repositories.len();

    for week in weeks_in_range(from, to) {
        let (year, week_no, _, _) = week;

        if !self.force {
            let already_collected = match db::is_week_collected(db, &repo_name, year, week_no)
            {
                Ok(flag) => flag,
                Err(e) => {
                    let msg = format!(
                        "collection_runs lookup failed for {repo_name} W{week_no} {year}: {e}"
                    );
                    warn!("{msg}");
                    stats.errors.push(msg);
                    continue;
                }
            };
            if already_collected {
                info!("Skipping {repo_name} W{week_no} {year} — already collected");
                println!(
                    "Skipped W{week_no:02} {year}: already collected \
                     (use --force to re-collect) [{repo_name}]"
                );
                stats.weeks_skipped += 1;
                continue;
            }
        }

        // The first and last weeks are trimmed to the requested range.
        let (win_start, win_end) = clamp_week_to_range(week, from, to);
        let window_open = naive_date_start_utc(win_start);
        let window_close = naive_date_end_utc(win_end);

        let n = match collector.collect_window(db, Some(window_open), Some(window_close)) {
            Ok(n) => n,
            Err(e) => {
                let msg = format!("collection failed for {repo_name} W{week_no} {year}: {e}");
                warn!("{msg}");
                stats.errors.push(msg);
                continue;
            }
        };

        info!(
            repo = %repo_name,
            year,
            week = week_no,
            commits = n,
            "extracted week"
        );
        println!("Collected W{week_no:02} {year}: {n} commits [{repo_name}]");
        stats.commits_collected += n;
        stats.weeks_collected += 1;

        if let Err(e) = db::record_collection_run(db, &repo_name, year, week_no, n, repo_count) {
            let msg =
                format!("failed to record collection_run for {repo_name} W{week_no} {year}: {e}");
            warn!("{msg}");
            stats.errors.push(msg);
        }
    }
}
/// Resolves every distinct (name, email) pair found on commits that have no
/// `author_id` and writes the resulting id back onto those commits.
///
/// Returns the number of pairs processed.
///
/// # Errors
/// Propagates the first database or resolver failure; pairs handled before
/// that point stay updated.
fn upsert_observed_authors(
    &self,
    db: &mut Database,
    resolver: &IdentityResolver,
) -> Result<usize> {
    // Read all pairs first so the statement is released before `db` is
    // handed to the resolver.
    let unresolved: Vec<(String, String)> = {
        let conn = db.connection();
        let mut stmt = conn.prepare(
            "SELECT DISTINCT author_name, author_email FROM commits WHERE author_id IS NULL",
        )?;
        let rows = stmt.query_map([], |row| {
            let name: String = row.get(0)?;
            let email: String = row.get(1)?;
            Ok((name, email))
        })?;
        let pairs = rows.collect::<std::result::Result<Vec<_>, _>>()?;
        pairs
    };

    let processed = unresolved.len();
    for (name, email) in unresolved {
        let author_id = resolver.upsert_author(db, &name, &email)?;
        db.connection().execute(
            "UPDATE commits SET author_id = ?1 \
             WHERE author_id IS NULL AND author_name = ?2 AND author_email = ?3",
            rusqlite::params![author_id, name, email],
        )?;
    }
    Ok(processed)
}
/// Finds Azure DevOps work-item references in stored commit messages, fetches
/// those work items and persists them together with commit links.
///
/// Items and links are written in a single transaction. Only references
/// whose work item was actually returned by the API are linked.
///
/// # Errors
/// Fails when `ticket_regex` does not compile or on any database error. A
/// failing `get_work_items` call is only logged and yields `Ok(())`.
async fn fetch_and_persist_azdo_work_items(
    &self,
    db: &mut Database,
    client: &AzureDevOpsClient,
    azdo_cfg: &crate::core::config::AzureDevOpsConfig,
) -> Result<()> {
    use crate::collect::azdo::extract_work_item_refs;
    use std::collections::{BTreeSet, HashMap};

    let ticket_re = match regex::Regex::new(&azdo_cfg.ticket_regex) {
        Ok(re) => re,
        Err(e) => {
            return Err(crate::collect::CollectError::Config(format!(
                "pm.azure_devops.ticket_regex {:?} failed to compile: {e}",
                azdo_cfg.ticket_regex
            ))
            .into());
        }
    };

    let commits: Vec<(String, String)> = {
        let conn = db.connection();
        let mut stmt = conn.prepare("SELECT sha, message FROM commits")?;
        let mapped = stmt.query_map([], |row| {
            let sha: String = row.get(0)?;
            let message: String = row.get(1)?;
            Ok((sha, message))
        })?;
        let loaded = mapped.collect::<std::result::Result<Vec<_>, _>>()?;
        loaded
    };

    // Map each referencing commit to its ids, and gather the deduplicated,
    // ordered set of ids to request.
    let mut commit_refs: HashMap<String, Vec<u32>> = HashMap::new();
    let mut wanted: BTreeSet<u32> = BTreeSet::new();
    for (sha, message) in &commits {
        let found = extract_work_item_refs(&ticket_re, message);
        if found.is_empty() {
            continue;
        }
        wanted.extend(found.iter().copied());
        commit_refs.insert(sha.clone(), found);
    }

    if wanted.is_empty() {
        info!(
            pattern = %azdo_cfg.ticket_regex,
            "No work-item references found in commit messages; skipping ADO work item fetch",
        );
        return Ok(());
    }

    let ids: Vec<u32> = wanted.into_iter().collect();
    let items = match client.get_work_items(&ids).await {
        Ok(fetched) => fetched,
        Err(e) => {
            warn!("ADO get_work_items failed: {e}");
            return Ok(());
        }
    };
    info!(
        fetched = items.len(),
        commits = commit_refs.len(),
        "Fetched ADO work items for commits",
    );

    let fetched_ids: std::collections::HashSet<u32> = items.iter().map(|w| w.id).collect();
    let tx = db.connection_mut().transaction()?;

    for w in &items {
        let row = crate::core::db::WorkItemRow {
            id: w.id.to_string(),
            source: "azdo".to_string(),
            title: w.title.clone(),
            status: w.state.clone(),
            item_type: w.work_item_type.clone(),
            // No tags is stored as `None`, not as an empty string.
            tags: (!w.tags.is_empty()).then(|| w.tags.join(",")),
            project: Some(w.team_project.clone()),
            url: w.url.clone(),
            raw_json: serde_json::to_string(w).ok(),
        };
        crate::core::db::work_items::upsert_work_item(&tx, &row)?;
    }

    for (sha, ref_ids) in &commit_refs {
        for id in ref_ids.iter().filter(|id| fetched_ids.contains(*id)) {
            crate::core::db::work_items::link_commit_work_item(
                &tx,
                sha,
                &id.to_string(),
                "azdo",
            )?;
        }
    }

    tx.commit()?;
    Ok(())
}
/// Runs the Azure DevOps PR fetcher over all stored commit messages and
/// returns the count reported by the fetcher.
///
/// # Errors
/// Propagates database failures and errors from the fetcher run. A fetcher
/// that cannot be constructed is only logged and yields `Ok(0)`.
async fn fetch_and_persist_azdo_prs(
    &self,
    db: &mut Database,
    azdo_cfg: &crate::core::config::AzureDevOpsConfig,
) -> Result<usize> {
    use crate::collect::azdo::AdoPrFetcher;

    // Messages are loaded before the fetcher is built, so a database error
    // surfaces first.
    let messages: Vec<String> = {
        let conn = db.connection();
        let mut stmt = conn.prepare("SELECT message FROM commits")?;
        let rows = stmt.query_map([], |row| row.get::<_, String>(0))?;
        let loaded = rows.collect::<std::result::Result<Vec<_>, _>>()?;
        loaded
    };

    let fetcher = match AdoPrFetcher::new(azdo_cfg.clone()) {
        Ok(fetcher) => fetcher,
        Err(e) => {
            warn!("ADO PR fetcher init failed: {e}");
            return Ok(0);
        }
    };

    let message_refs = messages.iter().map(String::as_str);
    let stored = fetcher
        .run_with_options(db.connection(), message_refs, self.force_refresh_prs)
        .await?;
    Ok(stored)
}
}
/// Reports whether any configured repository has an `origin` remote whose URL
/// contains `github.com`.
///
/// This is a heuristic used only to decide whether to print a configuration
/// hint: it is a plain substring match on the remote URL. Repositories that
/// cannot be opened, have no `origin` remote, or whose remote URL is not
/// available as a string are ignored.
///
/// The configured path goes through `expand_path` before it is opened, the
/// same way the reachability scan treats it, so paths that need expansion
/// are not silently missed.
fn has_github_like_repos(repositories: &[crate::core::config::RepositoryConfig]) -> bool {
    repositories.iter().any(|repo_cfg| {
        let path = crate::core::config::expand_path(&repo_cfg.path);
        let Ok(repo) = git2::Repository::open(&path) else {
            return false;
        };
        let Ok(remote) = repo.find_remote("origin") else {
            return false;
        };
        let looks_like_github = remote
            .url()
            .map_or(false, |url| url.contains("github.com"));
        looks_like_github
    })
}
/// Returns 00:00:00 UTC on the calendar day `d`.
fn naive_date_start_utc(d: NaiveDate) -> DateTime<Utc> {
    d.and_hms_opt(0, 0, 0)
        .map(|midnight| Utc.from_utc_datetime(&midnight))
        .expect("00:00:00 is always a valid time")
}
/// Counts the commits whose `author_id` is still NULL.
///
/// # Errors
/// Returns the query failure, converted through `TgaError`.
fn count_unresolved_commits(db: &Database) -> Result<usize> {
    let sql = "SELECT COUNT(*) FROM commits WHERE author_id IS NULL";
    let total = db
        .connection()
        .query_row(sql, [], |row| row.get::<_, i64>(0))
        .map_err(crate::core::TgaError::from)?;
    Ok(total as usize)
}
/// Returns 23:59:59 UTC on the calendar day `d`, the last whole second of
/// that day.
fn naive_date_end_utc(d: NaiveDate) -> DateTime<Utc> {
    d.and_hms_opt(23, 59, 59)
        .map(|last_second| Utc.from_utc_datetime(&last_second))
        .expect("23:59:59 is always a valid time")
}