use crate::core::config::Config;
use crate::core::db::Database;
use tracing::{info, warn};
use crate::collect::errors::Result;
use crate::collect::git::GitCollector;
use crate::collect::github::GitHubClient;
use crate::collect::identity::IdentityResolver;
/// Summary counters produced by a single collection run.
///
/// `Default` yields all-zero counters and an empty error list, which is the
/// state a run starts from.
#[derive(Clone, Debug, Default)]
pub struct CollectionStats {
    /// Total number of commits reported as collected across all repositories.
    pub commits_collected: usize,
    /// Number of author identities linked to previously unattributed commits.
    pub authors_resolved: usize,
    /// Number of pull requests reported as stored.
    pub prs_fetched: usize,
    /// Human-readable messages for failures that did not abort the run.
    pub errors: Vec<String>,
}
/// Drives data collection using a fixed, owned configuration.
pub struct CollectionPipeline {
    /// Configuration supplied at construction time; the pipeline never
    /// mutates it.
    config: Config,
}
impl CollectionPipeline {
    /// Creates a pipeline that will run with the given configuration.
    pub fn new(config: Config) -> Self {
        Self { config }
    }

    /// Returns the configuration this pipeline was built with.
    pub fn config(&self) -> &Config {
        &self.config
    }

    /// Runs every collection stage against `db` and reports what happened.
    ///
    /// Stages, in order:
    /// 1. extract commits from each configured repository,
    /// 2. link commits that have no `author_id` to an author record,
    /// 3. fetch and store pull requests, when a GitHub section is configured
    ///    and its `fetch_prs` flag is set.
    ///
    /// # Errors
    ///
    /// Only a failure of stage 2 is returned as `Err`. Failures in stages 1
    /// and 3 are logged as warnings, appended to [`CollectionStats::errors`],
    /// and the run continues.
    pub async fn run(&self, db: &mut Database) -> Result<CollectionStats> {
        let mut stats = CollectionStats::default();
        let resolver = IdentityResolver::from_config(&self.config);

        self.collect_repositories(db, &mut stats);
        stats.authors_resolved = self.upsert_observed_authors(db, &resolver)?;
        self.collect_pull_requests(db, &mut stats).await;

        Ok(stats)
    }

    /// Logs a non-fatal failure and records it in `stats.errors`.
    ///
    /// Every recoverable failure goes through here so that the log and the
    /// returned stats always agree.
    fn record_failure(stats: &mut CollectionStats, msg: String) {
        warn!("{msg}");
        stats.errors.push(msg);
    }

    /// Collects commits from every configured repository into `db`.
    ///
    /// A repository that cannot be opened or collected is recorded as a
    /// failure and skipped; the remaining repositories are still processed.
    fn collect_repositories(&self, db: &mut Database, stats: &mut CollectionStats) {
        for repo_cfg in &self.config.repositories {
            let collector = match GitCollector::new(repo_cfg) {
                Ok(collector) => collector,
                Err(e) => {
                    Self::record_failure(
                        stats,
                        format!("failed to open repo {}: {e}", repo_cfg.path.display()),
                    );
                    continue;
                }
            };

            // Copy the name before collecting so it is available for both the
            // success log line and the failure message.
            let repo_name = collector.name().to_string();
            match collector.collect(db) {
                Ok(n) => {
                    info!(repo = %repo_name, commits = n, "extracted");
                    stats.commits_collected += n;
                }
                Err(e) => Self::record_failure(
                    stats,
                    format!("collection failed for {repo_name}: {e}"),
                ),
            }
        }
    }

    /// Fetches pull requests from GitHub and stores them in `db`.
    ///
    /// Does nothing when no GitHub section is configured or its `fetch_prs`
    /// flag is off. Client construction, fetch, and store failures are each
    /// recorded as non-fatal failures.
    async fn collect_pull_requests(&self, db: &mut Database, stats: &mut CollectionStats) {
        let gh_cfg = match &self.config.github {
            Some(cfg) if cfg.fetch_prs => cfg,
            _ => return,
        };

        let gh = match GitHubClient::new(gh_cfg) {
            Ok(gh) => gh,
            Err(e) => {
                Self::record_failure(stats, format!("GitHub client init failed: {e}"));
                return;
            }
        };

        let prs = match gh.fetch_pull_requests().await {
            Ok(prs) => prs,
            Err(e) => {
                Self::record_failure(stats, format!("PR fetch failed: {e}"));
                return;
            }
        };

        match gh.store_pull_requests(db, &prs) {
            Ok(n) => {
                info!(prs = n, "stored pull requests");
                stats.prs_fetched += n;
            }
            Err(e) => Self::record_failure(stats, format!("PR store failed: {e}")),
        }
    }

    /// Links commits without an `author_id` to an author record.
    ///
    /// Reads every distinct `(author_name, author_email)` pair from commits
    /// whose `author_id` is NULL, passes each pair to
    /// `resolver.upsert_author`, and writes the returned id back onto the
    /// matching commits.
    ///
    /// Returns the number of distinct pairs processed. If the resolver maps
    /// two pairs to the same author, both are still counted.
    ///
    /// # Errors
    ///
    /// Stops at the first query, row-decoding, upsert, or update failure and
    /// returns it. Updates already applied for earlier pairs are left in
    /// place.
    fn upsert_observed_authors(
        &self,
        db: &mut Database,
        resolver: &IdentityResolver,
    ) -> Result<usize> {
        // The statement borrows the connection, so everything that touches it
        // lives in this block; `db` is free again for the loop below.
        let pairs: Vec<(String, String)> = {
            let conn = db.connection();
            let mut stmt = conn.prepare(
                "SELECT DISTINCT author_name, author_email FROM commits WHERE author_id IS NULL",
            )?;
            let rows = stmt.query_map([], |row| {
                Ok((row.get::<_, String>(0)?, row.get::<_, String>(1)?))
            })?;
            // Bind the result before yielding it so no temporary borrowing
            // `stmt` outlives the statement.
            let collected = rows.collect::<rusqlite::Result<Vec<_>>>()?;
            collected
        };

        let mut count = 0usize;
        for (name, email) in pairs {
            let author_id = resolver.upsert_author(db, &name, &email)?;
            // `author_id IS NULL` keeps already-attributed commits untouched.
            db.connection().execute(
                "UPDATE commits SET author_id = ?1 \
                 WHERE author_id IS NULL AND author_name = ?2 AND author_email = ?3",
                rusqlite::params![author_id, name, email],
            )?;
            count += 1;
        }
        Ok(count)
    }
}