pub mod commits;
use anyhow::{Context, Result};
use chrono;
use rusqlite::Connection;
use serde_json::json;
use std::collections::HashMap;
use std::path::Path;
use std::process::Command;
use std::time::Instant;
use super::database;
use super::ScrapeStats;
/// Time span of one work session, assembled from its `session-*-start` and
/// `session-*-end` git tags by `parse_session_tags`.
#[derive(Debug)]
struct SessionBounds {
// Identifier taken from the tag name; truncated to its first 15 characters when a '-'
// follows them (e.g. `20251217-100000` from `20251217-100000-extra`).
session_id: String,
// Tag creation times as printed by `%(creatordate:iso-strict)`. `end_time` is `None`
// when no matching end tag was found (session still open or abandoned).
start_time: String, end_time: Option<String>, }
fn parse_session_tags() -> Result<Vec<SessionBounds>> {
let output = Command::new("git")
.args([
"tag",
"-l",
"session-*",
"--format",
"%(refname:short)|%(creatordate:iso-strict)",
])
.output()
.context("Failed to run git tag")?;
if !output.status.success() {
return Ok(Vec::new());
}
let stdout = String::from_utf8_lossy(&output.stdout);
let mut sessions: HashMap<String, (Option<String>, Option<String>)> = HashMap::new();
for line in stdout.lines() {
if line.is_empty() {
continue;
}
let parts: Vec<&str> = line.split('|').collect();
if parts.len() != 2 {
continue;
}
let tag_name = parts[0];
let timestamp = parts[1].to_string();
if let Some(rest) = tag_name.strip_prefix("session-") {
let (candidate, is_start) = if let Some(c) = rest.strip_suffix("-start") {
(c, true)
} else if let Some(c) = rest.strip_suffix("-end") {
(c, false)
} else {
continue;
};
let session_id = if candidate.len() > 15 && candidate.as_bytes().get(15) == Some(&b'-')
{
&candidate[..15]
} else {
candidate
};
let entry = sessions
.entry(session_id.to_string())
.or_insert((None, None));
if is_start {
entry.0 = Some(timestamp);
} else {
entry.1 = Some(timestamp);
}
}
}
let mut bounds: Vec<SessionBounds> = sessions
.into_iter()
.filter_map(|(session_id, (start, end))| {
start.map(|start_time| SessionBounds {
session_id,
start_time,
end_time: end,
})
})
.collect();
bounds.sort_by(|a, b| a.start_time.cmp(&b.start_time));
Ok(bounds)
}
/// Longest span (in seconds) after its start tag that a session without an end tag is
/// assumed to cover.
const MAX_SESSION_DURATION_SECS: i64 = 24 * 60 * 60;

/// Orders two RFC 3339 timestamps by the instant they denote.
///
/// git prints every date with its own UTC offset, so comparing the strings directly can
/// misorder values written with different offsets (e.g. across a DST change or between
/// authors in different zones). Falls back to plain string order if either value fails
/// to parse.
fn compare_timestamps(a: &str, b: &str) -> std::cmp::Ordering {
    match (
        chrono::DateTime::parse_from_rfc3339(a),
        chrono::DateTime::parse_from_rfc3339(b),
    ) {
        (Ok(a_dt), Ok(b_dt)) => a_dt.cmp(&b_dt),
        _ => a.cmp(b),
    }
}

/// Returns the id of the session that `commit_time` falls into, if any.
///
/// `sessions` must be sorted by start time (as `parse_session_tags` returns them). They
/// are scanned newest-first, so the most recent session that started at or before the
/// commit is preferred. A commit matches a session when:
/// - the session has an end tag and the commit is at or before it (inclusive), or
/// - the session has no end tag and the commit is within `MAX_SESSION_DURATION_SECS` of
///   its start. Abandoned sessions are thereby not credited with commits made months
///   later.
fn find_session_for_commit(commit_time: &str, sessions: &[SessionBounds]) -> Option<String> {
    use std::cmp::Ordering;

    let commit_dt = chrono::DateTime::parse_from_rfc3339(commit_time).ok();
    for session in sessions.iter().rev() {
        if compare_timestamps(commit_time, &session.start_time) == Ordering::Less {
            continue;
        }
        let in_session = match &session.end_time {
            Some(end_time) => compare_timestamps(commit_time, end_time) != Ordering::Greater,
            None => match (
                commit_dt,
                chrono::DateTime::parse_from_rfc3339(&session.start_time),
            ) {
                (Some(commit_dt), Ok(start_dt)) => {
                    let secs = commit_dt.signed_duration_since(start_dt).num_seconds();
                    (0..=MAX_SESSION_DURATION_SECS).contains(&secs)
                }
                // Without parseable times the open-ended window cannot be checked.
                _ => false,
            },
        };
        if in_session {
            return Some(session.session_id.clone());
        }
    }
    None
}
/// One git tag as reported by `parse_all_tags` (fields are the raw `git tag --format`
/// output, split on '|').
#[derive(Debug)]
struct GitTag {
// `%(refname:short)`, e.g. `v1.2.0`.
name: String,
// `%(objectname:short)`: abbreviated object id. NOTE(review): not dereferenced, so for
// annotated tags this is presumably the tag object's id rather than the commit's — confirm.
sha: String,
// `%(creatordate:iso-strict)`.
date: String,
// `%(taggername)`; may be empty (presumably for lightweight tags).
tagger: String,
// `%(contents:subject)`; may itself contain '|'.
message: String,
}
/// Lists every tag in the repository with its object id, date, tagger and subject.
///
/// Each output row is split into at most five '|'-separated fields so that a subject
/// containing '|' stays intact. Rows with fewer than three fields are ignored, and
/// missing trailing fields become empty strings.
///
/// # Errors
/// Fails only if `git` cannot be spawned. A non-zero git exit yields an empty list.
fn parse_all_tags() -> Result<Vec<GitTag>> {
    let result = Command::new("git")
        .args([
            "tag",
            "-l",
            "--format",
            "%(refname:short)|%(objectname:short)|%(creatordate:iso-strict)|%(taggername)|%(contents:subject)",
        ])
        .output()
        .context("Failed to run git tag")?;
    if !result.status.success() {
        return Ok(Vec::new());
    }

    let text = String::from_utf8_lossy(&result.stdout);
    let tags = text
        .lines()
        .filter(|row| !row.is_empty())
        .filter_map(|row| {
            let fields: Vec<&str> = row.splitn(5, '|').collect();
            if fields.len() < 3 {
                return None;
            }
            let field = |idx: usize| fields.get(idx).copied().unwrap_or("").to_string();
            Some(GitTag {
                name: field(0),
                sha: field(1),
                date: field(2),
                tagger: field(3),
                message: field(4),
            })
        })
        .collect();
    Ok(tags)
}
/// Replaces the contents of `git_tags` with `tags`.
///
/// The table is cleared first, then each tag is written. Unless `skip_eventlog` is set,
/// a `git.tag` event is recorded for each tag just before its row is written.
///
/// Returns the number of tags written.
///
/// # Errors
/// Propagates the first database error.
fn insert_tags(conn: &Connection, tags: &[GitTag], skip_eventlog: bool) -> Result<usize> {
    conn.execute("DELETE FROM git_tags", [])?;
    let mut upsert = conn.prepare(
        "INSERT OR REPLACE INTO git_tags (tag_name, sha, tag_date, tagger_name, message) VALUES (?1, ?2, ?3, ?4, ?5)",
    )?;

    let record_events = !skip_eventlog;
    for tag in tags.iter() {
        if record_events {
            let payload = json!({
                "tag_name": &tag.name,
                "sha": &tag.sha,
                "tag_date": &tag.date,
                "tagger_name": &tag.tagger,
                "message": &tag.message,
            })
            .to_string();
            database::insert_event(conn, "git.tag", &tag.date, &tag.name, None, &payload)?;
        }
        upsert.execute(rusqlite::params![
            &tag.name,
            &tag.sha,
            &tag.date,
            &tag.tagger,
            &tag.message,
        ])?;
    }
    Ok(tags.len())
}
/// Lists the paths in the git index via `git ls-files`.
///
/// During an unresolved merge `git ls-files` prints a conflicted path once per index
/// stage. Those entries are adjacent because the index is path-ordered, so they are
/// collapsed with `dedup`. Without this, storing the list in a table keyed by path fails
/// on the duplicate.
///
/// # Errors
/// Fails only if `git` cannot be spawned. A non-zero git exit (e.g. outside a
/// repository) yields an empty list.
fn parse_tracked_files() -> Result<Vec<String>> {
    let output = Command::new("git")
        .args(["ls-files"])
        .output()
        .context("Failed to run git ls-files")?;
    if !output.status.success() {
        return Ok(Vec::new());
    }
    let stdout = String::from_utf8_lossy(&output.stdout);
    let mut files: Vec<String> = stdout
        .lines()
        .filter(|l| !l.is_empty())
        .map(String::from)
        .collect();
    files.dedup();
    Ok(files)
}
/// Replaces the contents of `git_tracked_files` with `files`, all with status `tracked`.
///
/// Returns the number of paths written.
///
/// # Errors
/// Propagates the first database error (including a duplicate path, since `file_path`
/// is the primary key and a plain INSERT is used).
fn insert_tracked_files(conn: &Connection, files: &[String]) -> Result<usize> {
    conn.execute("DELETE FROM git_tracked_files", [])?;
    let mut insert =
        conn.prepare("INSERT INTO git_tracked_files (file_path, status) VALUES (?1, 'tracked')")?;
    files
        .iter()
        .try_for_each(|path| insert.execute([path]).map(|_| ()))?;
    Ok(files.len())
}
/// One non-merge commit parsed from `git log --pretty=format:%H|%s|%an|%ae|%aI --numstat`.
#[derive(Debug)]
struct GitCommit {
// Full 40-hex-digit commit id (`%H`).
sha: String,
// Subject line only (`%s`), not the full body.
message: String,
// Author name (`%an`).
author_name: String,
// Author email (`%ae`).
author_email: String,
// Author date in strict ISO 8601 with the author's UTC offset (`%aI`).
timestamp: String,
// One entry per `--numstat` line following the header.
files: Vec<FileChange>,
// Set by `run` when the commit falls inside a session from the session tags; `None` otherwise.
session_id: Option<String>, }
/// A single file touched by a commit, from one `--numstat` line.
#[derive(Debug)]
struct FileChange {
// Post-rename path (rename notation is resolved by `resolve_rename_path`).
path: String,
// Heuristic derived from the line counts, not from git's status: "added" when only
// lines were added, "deleted" when only lines were removed, otherwise "modified".
change_type: String,
// Line counts; 0 when the numstat field is not a number (e.g. `-` for binary files).
lines_added: i32,
lines_removed: i32,
}
/// Creates, if missing, the tables and indexes this scraper writes to: `commits`,
/// `commit_files`, `co_changes`, `git_tracked_files` and `git_tags`.
///
/// Every statement uses `IF NOT EXISTS`, so calling this repeatedly is harmless. The
/// flip side is that existing tables are never altered: a schema change made here will
/// not reach databases created by an earlier version.
///
/// Note: `commits.branch` is declared but is not written by `insert_commits` in this
/// file, so it stays NULL.
fn create_materialized_views(conn: &Connection) -> Result<()> {
conn.execute_batch(
r#"
-- Commits view (materialized from git.commit events)
CREATE TABLE IF NOT EXISTS commits (
sha TEXT PRIMARY KEY,
message TEXT,
author_name TEXT,
author_email TEXT,
timestamp TEXT,
branch TEXT
);
-- Files changed per commit (from git.commit event data)
CREATE TABLE IF NOT EXISTS commit_files (
sha TEXT,
file_path TEXT,
change_type TEXT,
lines_added INTEGER,
lines_removed INTEGER,
PRIMARY KEY (sha, file_path)
);
-- Co-change relationships (derived from commit_files)
CREATE TABLE IF NOT EXISTS co_changes (
file_a TEXT,
file_b TEXT,
count INTEGER,
PRIMARY KEY (file_a, file_b)
);
-- Git tracked files (from git ls-files)
CREATE TABLE IF NOT EXISTS git_tracked_files (
file_path TEXT PRIMARY KEY,
status TEXT DEFAULT 'tracked'
);
-- Git tags (all tags, not just session tags)
CREATE TABLE IF NOT EXISTS git_tags (
tag_name TEXT PRIMARY KEY,
sha TEXT,
tag_date TEXT,
tagger_name TEXT,
message TEXT
);
-- Indexes for common queries
CREATE INDEX IF NOT EXISTS idx_commits_timestamp ON commits(timestamp);
CREATE INDEX IF NOT EXISTS idx_commits_author ON commits(author_email);
CREATE INDEX IF NOT EXISTS idx_commit_files_path ON commit_files(file_path);
CREATE INDEX IF NOT EXISTS idx_co_changes_count ON co_changes(count DESC);
CREATE INDEX IF NOT EXISTS idx_git_tags_date ON git_tags(tag_date);
"#,
)?;
Ok(())
}
/// Resolves git's `--numstat` rename notation to the file's new path.
///
/// Handles both forms git emits:
/// - braced: `src/{old.rs => new.rs}` → `src/new.rs`, and `src/{sub => }/f.rs` → `src/f.rs`
///   (an empty new component drops the doubled '/');
/// - plain: `old.rs => new.rs` → `new.rs`.
///
/// Paths without `=>` are returned unchanged. The closing brace is searched for only
/// after the opening one. Previously a stray `}` earlier in the path made the slice
/// bounds inverted and panicked.
fn resolve_rename_path(path: &str) -> String {
    if !path.contains("=>") {
        return path.to_string();
    }
    if let Some(open) = path.find('{') {
        if let Some(close) = path[open..].find('}').map(|offset| open + offset) {
            let prefix = &path[..open];
            let suffix = &path[close + 1..];
            let rename_part = &path[open + 1..close];
            if let Some(arrow) = rename_part.find(" => ") {
                let new_name = rename_part[arrow + 4..].trim();
                return if new_name.is_empty() {
                    // `dir/{sub => }/file` – the file moved up a level.
                    format!("{}{}", prefix, suffix.trim_start_matches('/'))
                } else {
                    format!("{}{}{}", prefix, new_name, suffix)
                };
            }
        }
    }
    if let Some(arrow) = path.find(" => ") {
        return path[arrow + 4..].to_string();
    }
    path.to_string()
}
/// Runs `git log --numstat --no-merges` and parses its output into commits, newest first.
///
/// With `since_sha`, only commits in `since_sha..HEAD` are listed (those reachable from
/// HEAD but not from `since_sha`).
///
/// # Errors
/// Fails if `git` cannot be spawned or exits non-zero (e.g. `since_sha` no longer
/// exists); the error message carries git's stderr.
fn parse_git_log(since_sha: Option<&str>) -> Result<Vec<GitCommit>> {
    let mut git = Command::new("git");
    git.arg("log")
        .arg("--pretty=format:%H|%s|%an|%ae|%aI")
        .arg("--numstat")
        .arg("--no-merges");
    if let Some(base) = since_sha {
        git.arg(format!("{}..HEAD", base));
    }

    let result = git.output().context("Failed to run git log")?;
    if result.status.success() {
        parse_git_log_output(&String::from_utf8_lossy(&result.stdout))
    } else {
        Err(anyhow::anyhow!(
            "git log failed: {}",
            String::from_utf8_lossy(&result.stderr)
        ))
    }
}
fn parse_git_log_output(output: &str) -> Result<Vec<GitCommit>> {
let mut commits = Vec::new();
let mut current_commit: Option<GitCommit> = None;
for line in output.lines() {
if line.is_empty() {
continue;
}
let parts: Vec<&str> = line.split('|').collect();
if parts.len() == 5
&& parts[0].len() == 40
&& parts[0].chars().all(|c| c.is_ascii_hexdigit())
{
if let Some(commit) = current_commit.take() {
commits.push(commit);
}
current_commit = Some(GitCommit {
sha: parts[0].to_string(),
message: parts[1].to_string(),
author_name: parts[2].to_string(),
author_email: parts[3].to_string(),
timestamp: parts[4].to_string(),
files: Vec::new(),
session_id: None, });
} else if let Some(ref mut commit) = current_commit {
let stat_parts: Vec<&str> = line.split('\t').collect();
if stat_parts.len() >= 3 {
let lines_added = stat_parts[0].parse().unwrap_or(0);
let lines_removed = stat_parts[1].parse().unwrap_or(0);
let path = resolve_rename_path(stat_parts[2]);
let change_type = if lines_added > 0 && lines_removed == 0 {
"added"
} else if lines_added == 0 && lines_removed > 0 {
"deleted"
} else {
"modified"
};
commit.files.push(FileChange {
path,
change_type: change_type.to_string(),
lines_added,
lines_removed,
});
}
}
}
if let Some(commit) = current_commit {
commits.push(commit);
}
Ok(commits)
}
fn insert_commits(conn: &Connection, commits: &[GitCommit], skip_eventlog: bool) -> Result<usize> {
let mut count = 0;
let mut commit_stmt = conn.prepare(
"INSERT OR REPLACE INTO commits (sha, message, author_name, author_email, timestamp) VALUES (?1, ?2, ?3, ?4, ?5)",
)?;
let mut file_stmt = conn.prepare(
"INSERT OR REPLACE INTO commit_files (sha, file_path, change_type, lines_added, lines_removed) VALUES (?1, ?2, ?3, ?4, ?5)",
)?;
for commit in commits {
let parsed = commits::parse_conventional(&commit.message);
let mut event_data = json!({
"sha": &commit.sha,
"message": &commit.message,
"author_name": &commit.author_name,
"author_email": &commit.author_email,
"files": commit.files.iter().map(|f| json!({
"path": &f.path,
"change_type": &f.change_type,
"lines_added": f.lines_added,
"lines_removed": f.lines_removed,
})).collect::<Vec<_>>(),
});
if parsed.has_structure() {
event_data["parsed"] = json!({
"type": &parsed.commit_type,
"scope": &parsed.scope,
"breaking": parsed.breaking,
"pr_ref": parsed.pr_ref,
"issue_refs": &parsed.issue_refs,
});
}
if let Some(ref session_id) = commit.session_id {
event_data["session_id"] = json!(session_id);
}
if !skip_eventlog {
database::insert_event(
conn,
"git.commit",
&commit.timestamp,
&commit.sha,
None, &event_data.to_string(),
)?;
}
commit_stmt.execute([
&commit.sha,
&commit.message,
&commit.author_name,
&commit.author_email,
&commit.timestamp,
])?;
for file in &commit.files {
file_stmt.execute(rusqlite::params![
&commit.sha,
&file.path,
&file.change_type,
file.lines_added,
file.lines_removed,
])?;
}
count += 1;
}
Ok(count)
}
const MAX_FILES_PER_COMMIT: usize = 50;
fn rebuild_co_changes(conn: &Connection) -> Result<usize> {
conn.execute("DELETE FROM co_changes", [])?;
let mut co_change_counts: HashMap<(String, String), i32> = HashMap::new();
let mut stmt = conn.prepare("SELECT sha, file_path FROM commit_files ORDER BY sha")?;
let mut rows = stmt.query([])?;
let mut current_sha: Option<String> = None;
let mut current_files: Vec<String> = Vec::new();
let mut skipped_commits = 0;
while let Some(row) = rows.next()? {
let sha: String = row.get(0)?;
let file_path: String = row.get(1)?;
if Some(&sha) != current_sha.as_ref() {
if current_files.len() > 1 && current_files.len() <= MAX_FILES_PER_COMMIT {
for i in 0..current_files.len() {
for j in (i + 1)..current_files.len() {
let (a, b) = if current_files[i] < current_files[j] {
(current_files[i].clone(), current_files[j].clone())
} else {
(current_files[j].clone(), current_files[i].clone())
};
*co_change_counts.entry((a, b)).or_insert(0) += 1;
}
}
} else if current_files.len() > MAX_FILES_PER_COMMIT {
skipped_commits += 1;
}
current_sha = Some(sha);
current_files.clear();
}
current_files.push(file_path);
}
if current_files.len() > 1 && current_files.len() <= MAX_FILES_PER_COMMIT {
for i in 0..current_files.len() {
for j in (i + 1)..current_files.len() {
let (a, b) = if current_files[i] < current_files[j] {
(current_files[i].clone(), current_files[j].clone())
} else {
(current_files[j].clone(), current_files[i].clone())
};
*co_change_counts.entry((a, b)).or_insert(0) += 1;
}
}
} else if current_files.len() > MAX_FILES_PER_COMMIT {
skipped_commits += 1;
}
if skipped_commits > 0 {
println!(
" Skipped {} commits with >{} files",
skipped_commits, MAX_FILES_PER_COMMIT
);
}
let mut insert_stmt =
conn.prepare("INSERT INTO co_changes (file_a, file_b, count) VALUES (?1, ?2, ?3)")?;
let count = co_change_counts.len();
for ((file_a, file_b), cnt) in co_change_counts {
insert_stmt.execute([&file_a, &file_b, &cnt.to_string()])?;
}
Ok(count)
}
/// Key under which this scraper's "last processed commit" marker is stored.
const GIT_CURSOR_KEY: &str = "git";

/// Returns the SHA recorded by the previous run as the newest processed commit, if any.
fn get_last_sha(conn: &Connection) -> Result<Option<String>> {
    let key = GIT_CURSOR_KEY;
    database::get_last_processed(conn, key)
}

/// Records `sha` as the newest processed commit for the next incremental run.
fn update_last_sha(conn: &Connection, sha: &str) -> Result<()> {
    let key = GIT_CURSOR_KEY;
    database::set_last_processed(conn, key, sha)
}
/// Reports whether the current repository is a shallow clone.
///
/// Asks git directly (`git rev-parse --is-shallow-repository`). That works from
/// subdirectories and in worktrees, where `.git` is a file rather than a directory.
/// If git cannot answer (not installed, too old, or unexpected output), it falls back
/// to checking for `.git/shallow` relative to the current directory.
fn is_shallow_clone() -> bool {
    let fallback = || Path::new(".git/shallow").exists();
    match Command::new("git")
        .args(["rev-parse", "--is-shallow-repository"])
        .output()
    {
        Ok(out) if out.status.success() => {
            match String::from_utf8_lossy(&out.stdout).trim() {
                "true" => true,
                "false" => false,
                // Older gits echo the unknown flag back instead of answering.
                _ => fallback(),
            }
        }
        _ => fallback(),
    }
}
/// Builds the run summary: `items_processed`, time elapsed since `start`, and the
/// database file size in KiB (0 if the file cannot be stat'ed).
fn scrape_stats(items_processed: usize, start: Instant, db_path: &Path) -> ScrapeStats {
    ScrapeStats {
        items_processed,
        time_elapsed: start.elapsed(),
        database_size_kb: std::fs::metadata(db_path)
            .map(|m| m.len() / 1024)
            .unwrap_or(0),
    }
}

/// Scrapes git metadata into the Patina database.
///
/// Steps, in order:
/// 1. bail out early (0 items) on a shallow clone;
/// 2. ensure the tables exist, then rebuild `git_tags` and `git_tracked_files`;
/// 3. read commits from `git log`: everything when `full`, otherwise only those after
///    the last recorded SHA (a full scrape also happens when none is recorded);
/// 4. link commits to sessions from the session tags (best-effort), insert them, and
///    record the newest SHA;
/// 5. rebuild co-change pairs and the commit-message FTS index.
///
/// Event-log writes are skipped when the database belongs to a reference repo
/// (`database::is_ref_repo`).
///
/// # Errors
/// Propagates git spawn/`git log` failures and database errors.
pub fn run(full: bool) -> Result<ScrapeStats> {
    let start = Instant::now();
    let db_path = Path::new(database::PATINA_DB);
    let skip_eventlog = database::is_ref_repo(db_path);

    if is_shallow_clone() {
        println!("⚠️ Shallow clone detected - skipping git history analysis");
        println!(" (Co-change analysis requires full git history)");
        return Ok(scrape_stats(0, start, db_path));
    }

    let conn = database::initialize(db_path)?;
    create_materialized_views(&conn)?;

    let tags = parse_all_tags()?;
    let tag_count = insert_tags(&conn, &tags, skip_eventlog)?;
    if tag_count > 0 {
        println!(" Indexed {} git tags", tag_count);
    }

    let tracked = parse_tracked_files()?;
    let tracked_count = insert_tracked_files(&conn, &tracked)?;
    if tracked_count > 0 {
        println!(" Indexed {} tracked files", tracked_count);
    }

    let since_sha = if full { None } else { get_last_sha(&conn)? };
    if since_sha.is_some() {
        println!("📊 Incremental scrape from last known commit...");
    } else {
        println!("📊 Full git history scrape...");
    }

    let mut new_commits = parse_git_log(since_sha.as_deref())?;
    if new_commits.is_empty() {
        println!(" No new commits to process");
        return Ok(scrape_stats(0, start, db_path));
    }
    println!(" Found {} commits to process", new_commits.len());

    // Session tags are optional metadata; a failure to read them is not fatal.
    let session_bounds = parse_session_tags().unwrap_or_default();
    if !session_bounds.is_empty() {
        let mut linked_count = 0;
        for commit in &mut new_commits {
            if let Some(session_id) = find_session_for_commit(&commit.timestamp, &session_bounds) {
                commit.session_id = Some(session_id);
                linked_count += 1;
            }
        }
        if linked_count > 0 {
            println!(" Linked {} commits to sessions", linked_count);
        }
    }

    // Parse each message once to obtain both statistics.
    let (conventional_count, pr_ref_count) =
        new_commits
            .iter()
            .fold((0usize, 0usize), |(conventional, with_pr), c| {
                let parsed = commits::parse_conventional(&c.message);
                (
                    conventional + usize::from(parsed.has_structure()),
                    with_pr + usize::from(parsed.pr_ref.is_some()),
                )
            });

    let commit_count = insert_commits(&conn, &new_commits, skip_eventlog)?;
    if skip_eventlog {
        println!(" Inserted {} commits (direct, no eventlog)", commit_count);
    } else {
        println!(" Inserted {} commits", commit_count);
    }
    if conventional_count > 0 {
        // `new_commits` is non-empty here (checked above), so no division by zero.
        let pct = (conventional_count * 100) / new_commits.len();
        println!(
            " Parsed {} conventional commits ({}%), {} with PR refs",
            conventional_count, pct, pr_ref_count
        );
    }

    // `git log` lists newest first, so the first entry is the new high-water mark.
    if let Some(latest) = new_commits.first() {
        update_last_sha(&conn, &latest.sha)?;
    }

    let co_change_count = rebuild_co_changes(&conn)?;
    println!(" Built {} co-change relationships", co_change_count);
    let fts_count = database::populate_commits_fts5(&conn)?;
    println!(" Indexed {} commit messages for search", fts_count);

    Ok(scrape_stats(commit_count, start, db_path))
}
// Unit tests for the pure parsing and session-matching helpers. None of them spawn git
// or open the database.
#[cfg(test)]
mod tests {
use super::*;
// Two commits in `git log --pretty=format:%H|%s|%an|%ae|%aI --numstat` shape: header
// lines followed by tab-separated numstat lines, with a blank separator line.
#[test]
fn test_parse_git_log_output() {
let sample = "abc123def456abc123def456abc123def456abc1|Fix bug in parser|John Doe|john@example.com|2025-01-15T10:30:00+00:00\n5\t2\tsrc/parser.rs\n10\t0\tsrc/new_file.rs\n\ndef456abc123def456abc123def456abc123def4|Add feature|Jane Smith|jane@example.com|2025-01-14T09:00:00+00:00\n20\t5\tsrc/feature.rs";
let commits = parse_git_log_output(sample).unwrap();
assert_eq!(commits.len(), 2);
assert_eq!(commits[0].sha, "abc123def456abc123def456abc123def456abc1");
assert_eq!(commits[0].files.len(), 2);
assert_eq!(commits[0].files[0].path, "src/parser.rs");
assert_eq!(commits[0].files[0].lines_added, 5);
assert_eq!(commits[0].files[0].lines_removed, 2);
// Session linking happens later in `run`, never during parsing.
assert!(commits[0].session_id.is_none()); }
// One closed session (10:00-12:00) and one open session starting at 14:00: commits
// inside the closed window, in the gap, within the open session, and before all.
#[test]
fn test_find_session_for_commit() {
let sessions = vec![
SessionBounds {
session_id: "20251217-100000".to_string(),
start_time: "2025-12-17T10:00:00+00:00".to_string(),
end_time: Some("2025-12-17T12:00:00+00:00".to_string()),
},
SessionBounds {
session_id: "20251217-140000".to_string(),
start_time: "2025-12-17T14:00:00+00:00".to_string(),
end_time: None, },
];
assert_eq!(
find_session_for_commit("2025-12-17T11:00:00+00:00", &sessions),
Some("20251217-100000".to_string())
);
assert_eq!(
find_session_for_commit("2025-12-17T13:00:00+00:00", &sessions),
None
);
assert_eq!(
find_session_for_commit("2025-12-17T15:00:00+00:00", &sessions),
Some("20251217-140000".to_string())
);
assert_eq!(
find_session_for_commit("2025-12-17T09:00:00+00:00", &sessions),
None
);
}
// An open session only claims commits within MAX_SESSION_DURATION_SECS of its start;
// a commit months later must not be attributed to it.
#[test]
fn test_find_session_for_commit_abandoned_session() {
let sessions = vec![SessionBounds {
session_id: "20250801-100000".to_string(),
start_time: "2025-08-01T10:00:00+00:00".to_string(),
end_time: None, }];
assert_eq!(
find_session_for_commit("2025-12-17T11:00:00+00:00", &sessions),
None
);
assert_eq!(
find_session_for_commit("2025-08-01T20:00:00+00:00", &sessions),
Some("20250801-100000".to_string())
);
}
// Sanity check that chrono parses git-style ISO timestamps with differing UTC offsets
// and that a multi-month gap exceeds the open-session limit.
#[test]
fn test_chrono_parsing_real_timestamps() {
let commit_time = "2025-12-17T08:52:56-05:00";
let session_start = "2025-08-19T10:51:24-04:00";
let commit_dt = chrono::DateTime::parse_from_rfc3339(commit_time);
let start_dt = chrono::DateTime::parse_from_rfc3339(session_start);
assert!(commit_dt.is_ok(), "Failed to parse commit time");
assert!(start_dt.is_ok(), "Failed to parse session start time");
let duration = commit_dt.unwrap().signed_duration_since(start_dt.unwrap());
assert!(
duration.num_days() > 100,
"Duration should be months, got {} days",
duration.num_days()
);
assert!(
duration.num_seconds() > MAX_SESSION_DURATION_SECS,
"Duration should exceed 24h limit"
);
}
}