use std::collections::HashMap;
use chrono::{DateTime, Utc};
use regex::Regex;
use tracing::{debug, warn};
use crate::collect::ai_attribution::AgenticMode;
use crate::core::config::Config;
use crate::core::db::Database;
use crate::report::errors::{ReportError, Result};
use crate::report::models::{ActivityWeights, ReportData, VelocitySummary};
/// Stateless entry point that loads commit and pull-request data from the
/// database and aggregates it into a [`ReportData`].
///
/// All functionality lives in associated functions (no `self`); see
/// `Aggregator::build` and `Aggregator::build_filtered`.
pub struct Aggregator;
/// A single commit joined with its resolved author identity and its
/// classification, as produced by the aggregator's commit loader.
pub(super) struct CommitRow {
    // ----- where / when -----
    /// Commit hash (`commits.sha`).
    pub(super) sha: String,
    /// Value of `commits.repository`.
    pub(super) repository: String,
    /// Commit time parsed from the stored RFC 3339 string; the loader falls
    /// back to the current time when the stored value cannot be parsed.
    pub(super) timestamp: DateTime<Utc>,

    // ----- who -----
    /// Canonical author name when the commit is linked to `authors`,
    /// otherwise the raw name stored on the commit.
    pub(super) author_name: String,
    /// Canonical author email when linked and non-empty, otherwise the raw
    /// email stored on the commit.
    pub(super) author_email: String,

    // ----- what -----
    /// Full commit message (subject plus body).
    pub(super) message: String,
    /// `true` when `commits.ticketed` is non-zero.
    pub(super) ticketed: bool,

    // ----- size -----
    /// Lines added.
    pub(super) insertions: i64,
    /// Lines removed.
    pub(super) deletions: i64,
    /// Number of files touched.
    pub(super) files_changed: i64,

    // ----- classification (None when the commit has no classification row) -----
    /// Classification category, if any.
    pub(super) category: Option<String>,
    /// Classification complexity score, if any.
    pub(super) complexity: Option<i64>,

    // ----- AI attribution -----
    /// `true` when `commits.is_ai_assisted` is non-zero.
    pub(super) is_ai_assisted: bool,
    /// Parsed agentic mode; the loader defaults to `AgenticMode::None`.
    pub(super) agentic_mode: AgenticMode,
}
/// A pull request row loaded from the `pull_requests` table.
///
/// Fields are listed in the same order as the loader's SELECT columns.
pub(super) struct PrRow {
    /// Creation time; rows whose stored value is not valid RFC 3339 are
    /// skipped by the loader.
    pub(super) created_at: DateTime<Utc>,
    /// Merge time, or `None` when unmerged or when the stored value cannot
    /// be parsed as RFC 3339.
    pub(super) merged_at: Option<DateTime<Utc>>,
    /// Raw `author` column value.
    pub(super) author: String,
    /// Raw `state` column value.
    pub(super) state: String,
}
/// Default regex sources for commit subjects treated as boilerplate:
/// merges, version bumps, lockfile updates and generated code.
///
/// Patterns beginning with `^` are anchored to the start of the subject; the
/// last two (`Generated by`, `Auto-generated`) match anywhere in it.
/// NOTE(review): presumably compiled with `compile_patterns` before being
/// passed to `is_boilerplate` — confirm at the call site.
pub(super) const DEFAULT_BOILERPLATE_PATTERNS: &[&str] = &[
r"^[Mm]erge branch",
r"^[Mm]erge pull request",
r"^[Bb]ump version",
r"^[Uu]pdate package-lock",
r"^[Uu]pdate yarn\.lock",
r"[Gg]enerated by",
r"[Aa]uto-generated",
];
/// Lines-changed threshold consulted by `is_boilerplate`: a commit changing
/// more than ten times this many lines is classified as boilerplate
/// regardless of its message.
pub(super) const BOILERPLATE_LINES_THRESHOLD: i64 = 500;
/// Returns `true` when a commit should be treated as boilerplate.
///
/// A commit is boilerplate when either:
/// - it changed more than `10 * BOILERPLATE_LINES_THRESHOLD` lines, or
/// - the first line (subject) of `message` matches any of `patterns`.
///
/// An empty `message` is matched as the empty string.
pub(super) fn is_boilerplate(message: &str, lines_changed: i64, patterns: &[Regex]) -> bool {
    // Only the subject line is tested against the patterns.
    let subject = message.lines().next().unwrap_or(message);
    // Size check first: it is cheap and short-circuits the regex scan.
    lines_changed > BOILERPLATE_LINES_THRESHOLD * 10
        || patterns.iter().any(|p| p.is_match(subject))
}
pub(super) fn compile_patterns(patterns: &[&str]) -> Vec<Regex> {
patterns
.iter()
.filter_map(|p| match Regex::new(p) {
Ok(r) => Some(r),
Err(e) => {
warn!(pattern = %p, error = %e, "skipping invalid regex pattern");
None
}
})
.collect()
}
impl Aggregator {
pub fn build(db: &Database, config: &Config) -> Result<ReportData> {
Self::build_filtered(db, config, None)
}
pub fn build_filtered(
db: &Database,
config: &Config,
author_email: Option<&str>,
) -> Result<ReportData> {
let canonical_email: Option<String> = if let Some(email) = author_email {
let resolved = Self::resolve_canonical_email(db, email)?;
Some(resolved)
} else {
None
};
let rows = Self::load_rows_filtered(db, canonical_email.as_deref())?;
let prs = Self::load_prs(db).unwrap_or_default();
let unresolved_db = if canonical_email.is_none() {
Self::count_unresolved_author_commits(db).unwrap_or(0)
} else {
0
};
let mut data = Self::aggregate(rows, prs);
data.repository_coverage = data.repositories.len();
let alias_set = configured_alias_emails(config);
let unresolved_authors = if alias_set.is_empty() {
0
} else {
data.authors
.iter()
.filter(|a| !alias_set.contains(&a.email.to_lowercase()))
.count()
};
data.unresolved_authors = unresolved_authors;
data.unresolved_author_commits = unresolved_db;
check_weekly_coverage_drift(db, &data.weekly_metrics);
match Self::persist_weekly_quality(db, &data) {
Ok(n) => {
tracing::debug!(
rows = n,
"persisted weekly quality rows to fact_weekly_quality"
);
}
Err(e) => {
tracing::warn!(
error = %e,
"WARNING: could not persist to fact_weekly_quality; \
run `tga backfill quality` to retry. Report generation continues."
);
}
}
match Self::persist_weekly_engineer(db, &data) {
Ok(n) => {
tracing::debug!(
rows = n,
"persisted weekly engineer rows to fact_weekly_engineer"
);
}
Err(e) => {
tracing::warn!(
error = %e,
"WARNING: could not persist to fact_weekly_engineer; \
report generation continues."
);
}
}
if unresolved_db > 0 {
tracing::warn!(
count = unresolved_db,
"WARNING: {unresolved_db} commits have unresolved author identities and may \
inflate developer counts. Run `tga aliases list` to review, or extend \
`developer_aliases` in the config to map missing identities."
);
}
Ok(data)
}
fn count_unresolved_author_commits(db: &Database) -> Result<usize> {
let conn = db.connection();
let n: i64 = conn
.query_row(
"SELECT COUNT(*) FROM commits WHERE author_id IS NULL",
[],
|r| r.get(0),
)
.map_err(crate::core::TgaError::from)?;
Ok(n as usize)
}
fn load_prs(db: &Database) -> Result<Vec<PrRow>> {
let conn = db.connection();
let mut stmt = conn
.prepare("SELECT created_at, merged_at, author, state FROM pull_requests")
.map_err(crate::core::TgaError::from)?;
let rows = stmt
.query_map([], |row| {
let created: String = row.get(0)?;
let merged: Option<String> = row.get(1)?;
let author: String = row.get(2)?;
let state: String = row.get(3)?;
Ok((created, merged, author, state))
})
.map_err(crate::core::TgaError::from)?;
let mut out = Vec::new();
for r in rows {
let (created_s, merged_s, author, state) = r.map_err(crate::core::TgaError::from)?;
let created_at = match DateTime::parse_from_rfc3339(&created_s) {
Ok(dt) => dt.with_timezone(&Utc),
Err(_) => continue,
};
let merged_at = merged_s
.as_deref()
.and_then(|s| DateTime::parse_from_rfc3339(s).ok())
.map(|dt| dt.with_timezone(&Utc));
out.push(PrRow {
author,
state,
created_at,
merged_at,
});
}
Ok(out)
}
fn resolve_canonical_email(db: &Database, email: &str) -> Result<String> {
let conn = db.connection();
let lower = email.to_lowercase();
let result: rusqlite::Result<String> = conn.query_row(
"SELECT canonical_email FROM authors WHERE LOWER(canonical_email) = LOWER(?1) LIMIT 1",
rusqlite::params![lower],
|row| row.get(0),
);
match result {
Ok(stored) => Ok(stored),
Err(rusqlite::Error::QueryReturnedNoRows) => Err(ReportError::Report(format!(
"no canonical identity with canonical_email '{email}' found in authors table.\n\
Run `tga aliases list` to see all canonical identities, or \
`tga aliases merge` to consolidate duplicate identities."
))),
Err(e) => Err(ReportError::Core(crate::core::TgaError::from(e))),
}
}
fn load_rows_filtered(db: &Database, author_email: Option<&str>) -> Result<Vec<CommitRow>> {
let conn = db.connection();
let sql_base = "SELECT c.sha, \
COALESCE(a.canonical_name, c.author_name) AS author_name, \
COALESCE(NULLIF(a.canonical_email, ''), c.author_email) AS author_email, \
c.timestamp, c.repository, \
c.insertions, c.deletions, c.files_changed, cl.category, \
c.message, c.ticketed, c.is_ai_assisted, cl.complexity, \
COALESCE(c.agentic_mode, 'none') AS agentic_mode \
FROM commits c \
LEFT JOIN authors a ON a.id = c.author_id \
LEFT JOIN classifications cl ON cl.id = c.classification_id";
let row_mapper = |row: &rusqlite::Row<'_>| -> rusqlite::Result<CommitRow> {
let ts_str: String = row.get(3)?;
let timestamp = DateTime::parse_from_rfc3339(&ts_str)
.map(|dt| dt.with_timezone(&Utc))
.unwrap_or_else(|_| Utc::now());
let ticketed: i64 = row.get(10).unwrap_or(0);
let is_ai_assisted: i64 = row.get(11).unwrap_or(0);
let complexity: Option<i64> = row.get(12).unwrap_or(None);
let agentic_mode_str: String = row.get(13).unwrap_or_else(|_| "none".to_string());
let agentic_mode = agentic_mode_str
.parse::<AgenticMode>()
.unwrap_or(AgenticMode::None);
Ok(CommitRow {
sha: row.get(0)?,
author_name: row.get(1)?,
author_email: row.get(2)?,
timestamp,
repository: row.get(4)?,
insertions: row.get(5)?,
deletions: row.get(6)?,
files_changed: row.get(7)?,
category: row.get(8)?,
message: row.get(9)?,
ticketed: ticketed != 0,
is_ai_assisted: is_ai_assisted != 0,
complexity,
agentic_mode,
})
};
let mut out: Vec<CommitRow> = Vec::new();
if let Some(email) = author_email {
let sql = format!(
"{sql_base} \
WHERE LOWER(COALESCE(NULLIF(a.canonical_email, ''), c.author_email)) = LOWER(?1)"
);
let mut stmt = conn.prepare(&sql).map_err(crate::core::TgaError::from)?;
let rows = stmt
.query_map(rusqlite::params![email], row_mapper)
.map_err(crate::core::TgaError::from)?;
for r in rows {
out.push(r.map_err(crate::core::TgaError::from)?);
}
} else {
let mut stmt = conn
.prepare(sql_base)
.map_err(crate::core::TgaError::from)?;
let rows = stmt
.query_map([], row_mapper)
.map_err(crate::core::TgaError::from)?;
for r in rows {
out.push(r.map_err(crate::core::TgaError::from)?);
}
}
debug!(count = out.len(), "loaded commit rows for aggregation");
Ok(out)
}
fn aggregate(rows: Vec<CommitRow>, prs: Vec<PrRow>) -> ReportData {
let generated_at = Utc::now().to_rfc3339();
let mut data = ReportData::empty(generated_at);
if rows.is_empty() {
return data;
}
let row_flags = compute_row_flags(&rows);
let acc = accumulate_rows(&rows, &row_flags);
let author_summaries = materialize_authors(acc.authors);
let repo_summaries = materialize_repositories(acc.repos);
let email_to_name: HashMap<String, String> = author_summaries
.iter()
.map(|a| (a.email.clone(), a.name.clone()))
.collect();
let abandoned_by_week_identity = build_abandoned_pr_counts(&prs);
let weekly_activity =
materialize_weekly_activity(acc.weekly, &email_to_name, &abandoned_by_week_identity);
let total_commits = rows.len();
let total_authors = author_summaries.len();
let total_weeks = acc.week_totals.len();
let weekly_metrics = build_weekly_metrics(&acc.week_totals);
let weekly_categorization = build_weekly_categorization(&acc.week_totals);
let untracked_commits = build_untracked_commits(&rows, &email_to_name);
let velocity_inputs = compute_velocity_inputs(&prs);
let velocity = Some(VelocitySummary {
pr_cycle_time_avg_hours: velocity_inputs.cycle_time_avg,
pr_cycle_time_median_hours: velocity_inputs.cycle_time_median,
pr_throughput_per_week: velocity_inputs.pr_throughput_per_week,
revision_rate: 0.0,
pr_count: velocity_inputs.pr_count,
});
let weekly_velocity = build_weekly_velocity(
&acc.week_totals,
&velocity_inputs.pr_per_week,
velocity_inputs.cycle_time_avg,
);
let dora = Some(compute_dora(
&rows,
&row_flags,
&acc.category_total,
&prs,
velocity_inputs.cycle_time_avg,
total_weeks,
acc.revert_count,
));
let quality = Some(compute_quality(
total_commits,
&acc.category_total,
acc.revert_count,
));
let weights = ActivityWeights::default();
let developer_activity = compute_developer_activity(
&author_summaries,
&acc.dev_weeks,
&acc.dev_categories,
&weights,
);
let summary = Some(build_summary(
&rows,
total_commits,
total_authors,
total_weeks,
acc.min_ts,
acc.max_ts,
));
data.total_commits = total_commits;
data.total_authors = total_authors;
data.period_start = Some(acc.min_ts.to_rfc3339());
data.period_end = Some(acc.max_ts.to_rfc3339());
data.authors = author_summaries;
data.repositories = repo_summaries;
data.weekly_activity = weekly_activity;
data.category_breakdown = acc.category_total;
data.weekly_metrics = weekly_metrics;
data.developer_activity = developer_activity;
data.summary = summary;
data.untracked_commits = untracked_commits;
data.weekly_categorization = weekly_categorization;
data.weekly_velocity = weekly_velocity;
data.dora = dora;
data.velocity = velocity;
data.quality = quality;
data.boilerplate_count = acc.boilerplate_count;
data.revert_count = acc.revert_count;
let _ = acc.dev_ticketed;
data
}
pub fn persist_weekly_quality(db: &Database, data: &ReportData) -> Result<usize> {
crate::report::persist::persist_weekly_quality(db, data)
}
pub fn persist_weekly_engineer(db: &Database, data: &ReportData) -> Result<usize> {
crate::report::persist::persist_weekly_engineer(db, data)
}
}
mod accumulate;
mod metrics;
use accumulate::{
accumulate_rows, build_abandoned_pr_counts, build_untracked_commits,
build_weekly_categorization, build_weekly_metrics, compute_row_flags, materialize_authors,
materialize_repositories, materialize_weekly_activity,
};
use metrics::{
build_summary, build_weekly_velocity, check_weekly_coverage_drift, compute_developer_activity,
compute_dora, compute_quality, compute_velocity_inputs, configured_alias_emails,
};