use chrono::{DateTime, TimeZone, Utc};
use clap::Args;
use regex::Regex;
use reqwest::header::{HeaderMap, HeaderValue, ACCEPT, AUTHORIZATION, LINK, USER_AGENT};
use rusqlite::params;
use serde::Deserialize;
use tracing::{debug, info, warn};
use tga::core::config::{Config, DoraConfig, RepositoryConfig};
use tga::core::db::Database;
/// `User-Agent` header value attached to every request made by the GitHub client.
const USER_AGENT_VALUE: &str = "trusty-git-analytics/0.1";
/// Base URL that every GitHub REST API path built in this file is appended to.
const GITHUB_API_BASE: &str = "https://api.github.com";
/// Value sent as the `per_page` query parameter on paginated GitHub requests.
const PAGE_SIZE: u32 = 100;
/// Name of the environment variable the GitHub token is read from.
const GITHUB_TOKEN_ENV: &str = "GITHUB_TOKEN";
// Command-line arguments for the deployment collection subcommand (invoked as
// `tga deployments collect` in the examples below).
//
// NOTE: plain `//` comments are used on purpose. `///` doc comments on a clap
// derive struct or its fields are picked up by clap as help text, which would
// change the CLI output.
#[derive(Args, Debug)]
#[command(
about = "Ingest deployment events into fact_deployments.",
long_about = "Walk the configured deployment source and persist deployment events into\n\
`fact_deployments`. Supported sources:\n\n\
git_tags -- match tags against dora.deployment_tag_pattern (default)\n\
github_releases -- paginate GitHub Releases API (requires GITHUB_TOKEN)\n\
github_actions -- paginate GitHub Actions runs (requires GITHUB_TOKEN)\n\
manual -- no-op (operator INSERTs directly into fact_deployments)\n\n\
The source is configured via `dora.deployment_source` in config.yaml.\n\
Use --source to override at runtime without editing the config file.",
after_help = "EXAMPLES:\n\
# Ingest from the configured source (usually git_tags)\n\
tga deployments collect\n\n\
# Force GitHub Releases source regardless of config\n\
tga deployments collect --source github_releases\n\n\
TIPS:\n\
- Set GITHUB_TOKEN before using github_releases or github_actions sources.\n\
- After ingestion, run `tga dora` to compute deployment frequency and lead time."
)]
pub struct DeploymentsCollectArgs {
// Optional runtime override of `dora.deployment_source`. When absent, `run`
// falls back to the value from the configuration.
#[arg(long, value_name = "SOURCE")]
pub source: Option<String>,
}
/// Counters accumulated by one ingestion pass and printed by `run`.
///
/// The field names say "tags", but the GitHub ingesters reuse the same
/// counters for releases and workflow runs.
#[derive(Debug, Default, Clone)]
struct CollectStats {
/// Number of candidate items (tags, releases or runs) looked at.
inspected_tags: usize,
/// Number of candidates that passed the source-specific filter.
matched_tags: usize,
/// Number of rows the `INSERT OR IGNORE` actually added.
inserted: usize,
/// Number of inserts that changed no row (the row was already present).
skipped: usize,
}
pub async fn run(
config: Config,
db: &mut Database,
args: DeploymentsCollectArgs,
) -> anyhow::Result<()> {
let dora = config.dora.clone().unwrap_or_default();
let source = args
.source
.clone()
.unwrap_or_else(|| dora.deployment_source.clone());
let stats = match source.as_str() {
"git_tags" => ingest_git_tags(db, &config.repositories, &dora)?,
"github_releases" => ingest_github_releases(db, &config.repositories, &dora).await?,
"github_actions" => ingest_github_actions(db, &config.repositories, &dora).await?,
"manual" => {
println!(
"deployment_source = 'manual' — no-op. INSERT into \
fact_deployments directly."
);
CollectStats::default()
}
other => {
anyhow::bail!(
"unknown deployment_source '{other}'. Expected one of: \
git_tags, github_releases, github_actions, manual."
);
}
};
println!(
"Inspected {} tag(s) across {} repo(s); {} matched the deployment pattern; \
{} inserted into fact_deployments, {} skipped (already present).",
stats.inspected_tags,
config.repositories.len(),
stats.matched_tags,
stats.inserted,
stats.skipped,
);
Ok(())
}
/// Ingests deployments from local git tags.
///
/// Every tag of every configured repository is tested against
/// `dora.deployment_tag_pattern`. Each matching tag is written to
/// `fact_deployments` with `source = 'git_tag'`, a fixed `'production'`
/// environment and `'success'` status. All inserts run inside a single
/// transaction that is committed at the end.
///
/// Repositories that cannot be opened and tags that cannot be resolved are
/// logged with `warn!` and skipped instead of failing the run.
///
/// # Errors
/// Returns an error when the tag pattern is not a valid regex, or when
/// starting the transaction, preparing the statement, executing an insert or
/// committing fails.
fn ingest_git_tags(
db: &mut Database,
repositories: &[RepositoryConfig],
dora: &DoraConfig,
) -> anyhow::Result<CollectStats> {
let mut stats = CollectStats::default();
// Compile the pattern before touching the database so a bad config fails
// fast with a message that names the offending field.
let pattern = Regex::new(&dora.deployment_tag_pattern).map_err(|e| {
anyhow::anyhow!(
"dora.deployment_tag_pattern is not a valid regex: {e} \
(pattern: {pat:?})",
pat = dora.deployment_tag_pattern
)
})?;
let conn = db.connection_mut();
let tx = conn.transaction()?;
// Inner scope: the prepared statement borrows `tx`, so it must be dropped
// before `tx.commit()` consumes the transaction.
{
// `?3` is bound to both triggered_at and completed_at: a tag carries a
// single timestamp.
let mut insert = tx.prepare(
"INSERT OR IGNORE INTO fact_deployments \
(deploy_id, repo, environment, triggered_at, completed_at, \
status, git_sha, git_tag, triggered_by_pr, source) \
VALUES (?1, ?2, 'production', ?3, ?3, 'success', ?4, ?5, NULL, 'git_tag')",
)?;
for repo_cfg in repositories {
// Display name: configured name, else the last path component, else a
// fixed placeholder.
let repo_name = repo_cfg.name.clone().unwrap_or_else(|| {
repo_cfg
.path
.file_name()
.and_then(|n| n.to_str())
.unwrap_or("(unknown)")
.to_string()
});
let repo = match git2::Repository::open(&repo_cfg.path) {
Ok(r) => r,
Err(e) => {
warn!(repo = %repo_name, error = %e, "git open failed; skipping tags");
continue;
}
};
let tags = match repo.tag_names(None) {
Ok(t) => t,
Err(e) => {
warn!(repo = %repo_name, error = %e, "tag_names failed; skipping");
continue;
}
};
// `flatten()` drops entries the tag iterator yields as `None`
// (presumably names that are not valid UTF-8 — confirm against git2).
for tag in tags.iter().flatten() {
stats.inspected_tags += 1;
if !pattern.is_match(tag) {
continue;
}
// Counted as matched even if resolution below fails, so `matched_tags`
// can exceed `inserted + skipped`.
stats.matched_tags += 1;
let refname = format!("refs/tags/{tag}");
let obj = match repo.revparse_single(&refname) {
Ok(o) => o,
Err(e) => {
warn!(repo = %repo_name, tag = %tag, error = %e, "revparse failed");
continue;
}
};
// Resolve the tag object down to the commit it points at.
let commit = match obj.peel_to_commit() {
Ok(c) => c,
Err(e) => {
warn!(repo = %repo_name, tag = %tag, error = %e, "peel failed");
continue;
}
};
let sha = commit.id().to_string();
// The deployment timestamp is the commit's time, not the time the tag was
// created. If it cannot be converted unambiguously, the current time is used.
let time = commit.time();
let triggered_at: DateTime<Utc> = Utc
.timestamp_opt(time.seconds(), 0)
.single()
.unwrap_or_else(Utc::now);
let deploy_id = format!("{repo_name}@{tag}");
let changed = insert.execute(params![
deploy_id,
repo_name,
triggered_at.to_rfc3339(),
sha,
tag,
])?;
// `INSERT OR IGNORE` reports 0 changed rows when the row was ignored.
if changed > 0 {
stats.inserted += 1;
} else {
stats.skipped += 1;
}
}
}
}
tx.commit()?;
info!(
inspected = stats.inspected_tags,
matched = stats.matched_tags,
inserted = stats.inserted,
skipped = stats.skipped,
"git-tag deployment ingestion complete"
);
Ok(stats)
}
/// The fields of a GitHub release object that this file reads.
///
/// Only `tag_name` is required; every other field falls back to its default
/// when missing from the JSON.
#[derive(Debug, Deserialize)]
struct ApiRelease {
/// Tag the release is attached to; matched against the deployment tag pattern.
tag_name: String,
/// Stored in the `git_sha` column by the release ingester.
/// NOTE(review): the ingester names this value `sha_or_branch`, so it may be
/// a branch name rather than a commit SHA — confirm against the GitHub API.
#[serde(default)]
target_commitish: Option<String>,
/// Publication timestamp; releases without one are skipped by the ingester.
#[serde(default)]
published_at: Option<DateTime<Utc>>,
/// Draft releases are not ingested.
#[serde(default)]
draft: bool,
/// Prereleases are not ingested.
#[serde(default)]
prerelease: bool,
}
/// The fields of a GitHub Actions workflow run that this file deserializes.
///
/// `dead_code` is allowed because not every deserialized field is read by the
/// code visible in this file (e.g. `head_branch`).
#[derive(Debug, Deserialize)]
#[allow(dead_code)]
struct ApiWorkflowRun {
/// Run identifier; used to build the `deploy_id` (`<repo>@run:<id>`).
id: u64,
/// Commit SHA stored in the `git_sha` column.
head_sha: String,
#[serde(default)]
head_branch: Option<String>,
/// Used as `triggered_at`; runs without it are skipped by the ingester.
#[serde(default)]
created_at: Option<DateTime<Utc>>,
/// Used as `completed_at`, falling back to `created_at` when absent.
#[serde(default)]
updated_at: Option<DateTime<Utc>>,
/// Only runs whose conclusion is exactly `"success"` are kept.
#[serde(default)]
conclusion: Option<String>,
/// Workflow name; compared against the configured workflow filter.
#[serde(default)]
name: Option<String>,
/// Workflow file path; compared against the configured workflow filter.
#[serde(default)]
path: Option<String>,
}
/// Wrapper object of a workflow-runs response page: the runs are nested under
/// the `workflow_runs` key instead of being a top-level array.
#[derive(Debug, Deserialize)]
struct ApiWorkflowRunsEnvelope {
/// Runs on this page; an absent key deserializes to an empty list.
#[serde(default)]
workflow_runs: Vec<ApiWorkflowRun>,
}
/// Builds the HTTP client used for all GitHub API calls.
///
/// The client sends the project `User-Agent` and the GitHub JSON `Accept`
/// header on every request, plus a `Bearer` authorization header when `token`
/// is provided. Requests time out after 30 seconds.
///
/// # Errors
/// Fails when the token cannot be used as a header value or when the client
/// cannot be constructed.
fn build_github_http_client(token: Option<&str>) -> anyhow::Result<reqwest::Client> {
    let mut default_headers = HeaderMap::new();
    default_headers.insert(USER_AGENT, HeaderValue::from_static(USER_AGENT_VALUE));
    default_headers.insert(
        ACCEPT,
        HeaderValue::from_static("application/vnd.github+json"),
    );

    if let Some(secret) = token {
        let bearer = format!("Bearer {secret}");
        let auth_value = HeaderValue::from_str(&bearer)
            .map_err(|e| anyhow::anyhow!("invalid GITHUB_TOKEN: {e}"))?;
        default_headers.insert(AUTHORIZATION, auth_value);
    }

    let request_timeout = std::time::Duration::from_secs(30);
    let client = reqwest::Client::builder()
        .default_headers(default_headers)
        .timeout(request_timeout)
        .build()?;
    Ok(client)
}
/// Resolves a configured repository to a GitHub `(owner, repo)` pair.
///
/// When `org` is configured and a non-empty repository name is available
/// (the configured `name`, else the last component of `path`), that pair is
/// returned. Otherwise the pair is derived from the repository's `origin`
/// remote URL. Returns `None` when neither approach works.
fn resolve_repo_to_github_slug(repo_cfg: &RepositoryConfig) -> Option<(String, String)> {
    if let Some(owner) = repo_cfg.org.as_ref() {
        let candidate = repo_cfg.name.clone().or_else(|| {
            repo_cfg
                .path
                .file_name()
                .and_then(|component| component.to_str())
                .map(str::to_string)
        });
        if let Some(name) = candidate.filter(|n| !n.is_empty()) {
            return Some((owner.clone(), name));
        }
    }

    // No explicit org (or no usable name): fall back to the git remote.
    owner_repo_from_remote(&repo_cfg.path)
}
/// Reads the `origin` remote of the git repository at `repo_path` and extracts
/// a GitHub `(owner, repo)` pair from its URL.
///
/// Returns `None` when the repository cannot be opened, has no `origin`
/// remote, the remote has no usable URL, or the URL is not recognised.
fn owner_repo_from_remote(repo_path: &std::path::Path) -> Option<(String, String)> {
    let Ok(repository) = git2::Repository::open(repo_path) else {
        return None;
    };
    let Ok(origin) = repository.find_remote("origin") else {
        return None;
    };
    let remote_url = origin.url()?;
    extract_owner_repo_from_url(remote_url)
}
/// Extracts `(owner, repo)` from a GitHub remote URL.
///
/// Recognised forms (a trailing `.git` and trailing slashes are ignored):
///
/// * scp-like SSH shorthand: `git@github.com:owner/repo`
/// * `https://`, `http://` and `ssh://` URLs whose host is `github.com`,
///   optionally with userinfo (`user@`, `user:token@`) and a numeric port
///
/// The host is taken from the URL authority only (the part between `://` and
/// the first `/`). An `@github.com/` that merely appears inside the path of
/// another host is therefore never mistaken for a GitHub URL.
///
/// Returns `None` for any other host or shape.
fn extract_owner_repo_from_url(url: &str) -> Option<(String, String)> {
    // Normalise the tail first so `.../widget.git/` and `.../widget/` parse alike.
    let trimmed = url.trim_end_matches('/');
    let cleaned = trimmed.strip_suffix(".git").unwrap_or(trimmed);

    // scp-like shorthand has no scheme, so handle it before URL parsing.
    if let Some(rest) = cleaned.strip_prefix("git@github.com:") {
        return split_owner_repo(rest);
    }

    let (scheme, after_scheme) = cleaned.split_once("://")?;
    let scheme_supported = ["https", "http", "ssh"]
        .iter()
        .any(|known| scheme.eq_ignore_ascii_case(known));
    if !scheme_supported {
        return None;
    }

    // authority = [userinfo@]host[:port]; everything after the first '/' is the path.
    let (authority, path) = after_scheme.split_once('/')?;
    let host_port = authority
        .rsplit_once('@')
        .map_or(authority, |(_userinfo, host)| host);
    // Only strip a suffix that really is a port; `host:owner` is not a port.
    let host = match host_port.rsplit_once(':') {
        Some((h, port)) if !port.is_empty() && port.bytes().all(|b| b.is_ascii_digit()) => h,
        _ => host_port,
    };
    if !host.eq_ignore_ascii_case("github.com") {
        return None;
    }
    split_owner_repo(path)
}

/// Splits `owner/repo[/...]` into its first two path segments.
///
/// Returns `None` when either segment is missing or empty; any further
/// segments are ignored.
fn split_owner_repo(rest: &str) -> Option<(String, String)> {
    let mut parts = rest.splitn(3, '/');
    let owner = parts.next()?;
    let name = parts.next()?;
    if owner.is_empty() || name.is_empty() {
        return None;
    }
    Some((owner.to_string(), name.to_string()))
}
/// Returns the `rel="next"` URL advertised in the response's `Link` header.
///
/// Yields `None` when the header is absent, is not representable as a string,
/// or carries no "next" relation.
fn next_link(headers: &HeaderMap) -> Option<String> {
    let raw = headers.get(LINK)?;
    match raw.to_str() {
        Ok(text) => parse_next_link_value(text),
        Err(_) => None,
    }
}
/// Finds the target of the first `rel="next"` (or `rel=next`) entry in a
/// `Link` header value.
///
/// The value is split on `,` into entries and each entry on `;` into the
/// `<url>` part and its parameters. Entries without a `;` or without the
/// angle brackets are ignored.
fn parse_next_link_value(link: &str) -> Option<String> {
    link.split(',').find_map(|raw_entry| {
        let (target, params) = raw_entry.trim().split_once(';')?;
        // The target must be wrapped as `<...>`; anything else is skipped.
        let target = target.trim().strip_prefix('<')?.strip_suffix('>')?;
        let is_next = params
            .split(';')
            .map(str::trim)
            .any(|p| p == "rel=\"next\"" || p == "rel=next");
        is_next.then(|| target.to_string())
    })
}
/// Ingests deployments from GitHub Releases.
///
/// For every configured repository that resolves to a GitHub `owner/repo`,
/// all releases are fetched; published, non-draft, non-prerelease releases
/// whose tag matches `dora.deployment_tag_pattern` are written to
/// `fact_deployments` with `source = 'github_release'`.
///
/// When the token environment variable is unset, this logs a warning and
/// delegates to `ingest_git_tags` instead.
///
/// A repository whose slug cannot be resolved or whose fetch fails is logged
/// and skipped; it does not abort the run.
///
/// # Errors
/// Returns an error when the HTTP client cannot be built, the tag pattern is
/// not a valid regex, or a database operation (transaction, prepare, insert,
/// commit) fails.
async fn ingest_github_releases(
db: &mut Database,
repositories: &[RepositoryConfig],
dora: &DoraConfig,
) -> anyhow::Result<CollectStats> {
let token = std::env::var(GITHUB_TOKEN_ENV).ok();
if token.is_none() {
warn!(
"deployment_source = 'github_releases' requires {GITHUB_TOKEN_ENV} \
but it is unset. Falling back to git_tags so fact_deployments still populates."
);
return ingest_git_tags(db, repositories, dora);
}
let client = build_github_http_client(token.as_deref())?;
let pattern = Regex::new(&dora.deployment_tag_pattern).map_err(|e| {
anyhow::anyhow!(
"dora.deployment_tag_pattern is not a valid regex: {e} \
(pattern: {pat:?})",
pat = dora.deployment_tag_pattern
)
})?;
let mut stats = CollectStats::default();
let conn = db.connection_mut();
// NOTE(review): the transaction stays open while releases are fetched over
// the network inside the loop below.
let tx = conn.transaction()?;
// Inner scope: the prepared statement borrows `tx` and must be dropped
// before `tx.commit()`.
{
// `?3` (published_at) is bound to both triggered_at and completed_at.
let mut insert = tx.prepare(
"INSERT OR IGNORE INTO fact_deployments \
(deploy_id, repo, environment, triggered_at, completed_at, \
status, git_sha, git_tag, triggered_by_pr, source) \
VALUES (?1, ?2, 'production', ?3, ?3, 'success', ?4, ?5, NULL, 'github_release')",
)?;
for repo_cfg in repositories {
// Display name: configured name, else the last path component, else a
// fixed placeholder. This (not the GitHub repo name) goes into the table.
let repo_name = repo_cfg.name.clone().unwrap_or_else(|| {
repo_cfg
.path
.file_name()
.and_then(|n| n.to_str())
.unwrap_or("(unknown)")
.to_string()
});
let Some((owner, name)) = resolve_repo_to_github_slug(repo_cfg) else {
warn!(repo = %repo_name, "could not resolve owner/repo; skipping github_releases");
continue;
};
let releases = match fetch_all_releases(&client, &owner, &name).await {
Ok(v) => v,
Err(e) => {
warn!(
repo = %repo_name,
error = %e,
"GitHub releases fetch failed; continuing with remaining repos"
);
continue;
}
};
for rel in releases {
// Drafts and prereleases count as inspected but never as matched.
stats.inspected_tags += 1;
if rel.draft || rel.prerelease {
continue;
}
if !pattern.is_match(&rel.tag_name) {
continue;
}
stats.matched_tags += 1;
let Some(published_at) = rel.published_at else {
debug!(repo = %repo_name, tag = %rel.tag_name, "release has no published_at; skipping");
continue;
};
let deploy_id = format!("{repo_name}@{}", rel.tag_name);
// Written to the git_sha column; an absent value becomes an empty string.
// NOTE(review): as the name suggests this may be a branch name, not a SHA.
let sha_or_branch = rel.target_commitish.unwrap_or_default();
let changed = insert.execute(params![
deploy_id,
repo_name,
published_at.to_rfc3339(),
sha_or_branch,
rel.tag_name,
])?;
// `INSERT OR IGNORE` reports 0 changed rows when the row was ignored.
if changed > 0 {
stats.inserted += 1;
} else {
stats.skipped += 1;
}
}
}
}
tx.commit()?;
info!(
inspected = stats.inspected_tags,
matched = stats.matched_tags,
inserted = stats.inserted,
skipped = stats.skipped,
"github_releases deployment ingestion complete"
);
Ok(stats)
}
/// Downloads every release of `owner/repo`, following `Link: rel="next"`
/// pagination until no next page is advertised or a page comes back empty.
///
/// # Errors
/// Fails on a transport error, a non-success HTTP status, or a response body
/// that does not deserialize into a list of releases.
async fn fetch_all_releases(
    client: &reqwest::Client,
    owner: &str,
    repo: &str,
) -> anyhow::Result<Vec<ApiRelease>> {
    let mut collected: Vec<ApiRelease> = Vec::new();
    let mut url = format!("{GITHUB_API_BASE}/repos/{owner}/{repo}/releases?per_page={PAGE_SIZE}");

    loop {
        debug!(url = %url, "GET (github releases)");
        let response = client.get(&url).send().await?;
        // Read the pagination header before the response is consumed.
        let following = next_link(response.headers());
        let batch: Vec<ApiRelease> = response.error_for_status()?.json().await?;
        let page_was_empty = batch.is_empty();
        collected.extend(batch);

        match following {
            Some(next) if !page_was_empty => url = next,
            _ => break,
        }
    }

    Ok(collected)
}
/// Ingests deployments from GitHub Actions workflow runs.
///
/// For every configured repository that resolves to a GitHub `owner/repo`,
/// runs on `dora.production_branch` are fetched. Runs accepted by
/// `is_kept_run` (filtered by `dora.deployment_workflow`) are written to
/// `fact_deployments` with `source = 'github_actions'`.
///
/// When the token environment variable is unset, this logs a warning and
/// delegates to `ingest_git_tags` instead.
///
/// A repository whose slug cannot be resolved or whose fetch fails is logged
/// and skipped; it does not abort the run.
///
/// # Errors
/// Returns an error when the HTTP client cannot be built or a database
/// operation (transaction, prepare, insert, commit) fails.
async fn ingest_github_actions(
db: &mut Database,
repositories: &[RepositoryConfig],
dora: &DoraConfig,
) -> anyhow::Result<CollectStats> {
let token = std::env::var(GITHUB_TOKEN_ENV).ok();
if token.is_none() {
warn!(
"deployment_source = 'github_actions' requires {GITHUB_TOKEN_ENV} \
but it is unset. Falling back to git_tags so fact_deployments still populates."
);
return ingest_git_tags(db, repositories, dora);
}
let client = build_github_http_client(token.as_deref())?;
let workflow_filter = dora.deployment_workflow.clone();
let branch = dora.production_branch.clone();
let mut stats = CollectStats::default();
let conn = db.connection_mut();
// NOTE(review): the transaction stays open while runs are fetched over the
// network inside the loop below.
let tx = conn.transaction()?;
// Inner scope: the prepared statement borrows `tx` and must be dropped
// before `tx.commit()`.
{
// Unlike the tag/release ingesters, triggered_at (?3) and completed_at (?4)
// are separate values here, and git_tag is always NULL.
let mut insert = tx.prepare(
"INSERT OR IGNORE INTO fact_deployments \
(deploy_id, repo, environment, triggered_at, completed_at, \
status, git_sha, git_tag, triggered_by_pr, source) \
VALUES (?1, ?2, 'production', ?3, ?4, 'success', ?5, NULL, NULL, 'github_actions')",
)?;
for repo_cfg in repositories {
// Display name: configured name, else the last path component, else a
// fixed placeholder. This (not the GitHub repo name) goes into the table.
let repo_name = repo_cfg.name.clone().unwrap_or_else(|| {
repo_cfg
.path
.file_name()
.and_then(|n| n.to_str())
.unwrap_or("(unknown)")
.to_string()
});
let Some((owner, name)) = resolve_repo_to_github_slug(repo_cfg) else {
warn!(repo = %repo_name, "could not resolve owner/repo; skipping github_actions");
continue;
};
let runs = match fetch_all_workflow_runs(&client, &owner, &name, &branch).await {
Ok(v) => v,
Err(e) => {
warn!(
repo = %repo_name,
error = %e,
"GitHub actions runs fetch failed; continuing with remaining repos"
);
continue;
}
};
for run in runs {
stats.inspected_tags += 1;
// Keeps only successful runs of the configured workflow (if any).
if !is_kept_run(&run, workflow_filter.as_deref()) {
continue;
}
stats.matched_tags += 1;
let Some(triggered_at) = run.created_at else {
debug!(repo = %repo_name, id = run.id, "run has no created_at; skipping");
continue;
};
// Without an updated_at the run is recorded as completing when it started.
let completed_at = run.updated_at.unwrap_or(triggered_at);
let deploy_id = format!("{repo_name}@run:{}", run.id);
let changed = insert.execute(params![
deploy_id,
repo_name,
triggered_at.to_rfc3339(),
completed_at.to_rfc3339(),
run.head_sha,
])?;
// `INSERT OR IGNORE` reports 0 changed rows when the row was ignored.
if changed > 0 {
stats.inserted += 1;
} else {
stats.skipped += 1;
}
}
}
}
tx.commit()?;
info!(
inspected = stats.inspected_tags,
matched = stats.matched_tags,
inserted = stats.inserted,
skipped = stats.skipped,
"github_actions deployment ingestion complete"
);
Ok(stats)
}
/// Decides whether a workflow run counts as a deployment.
///
/// A run is kept only when its conclusion is exactly `"success"` and either
/// no (or an empty) workflow filter is given, or the filter equals the run's
/// workflow name, equals its workflow path, or is the final `/`-separated
/// component of that path.
fn is_kept_run(run: &ApiWorkflowRun, workflow_filter: Option<&str>) -> bool {
    if !matches!(run.conclusion.as_deref(), Some("success")) {
        return false;
    }

    // A missing or empty filter accepts every successful run.
    let wanted = match workflow_filter {
        None | Some("") => return true,
        Some(w) => w,
    };

    let name_matches = run.name.as_deref() == Some(wanted);
    let path_matches = run.path.as_deref().map_or(false, |p| {
        p == wanted
            || p.strip_suffix(wanted)
                .map_or(false, |head| head.ends_with('/'))
    });
    name_matches || path_matches
}
/// Downloads every successful workflow run of `owner/repo` on `branch`,
/// following `Link: rel="next"` pagination until no next page is advertised
/// or a page comes back empty.
///
/// The query string of the first request is built with URL encoding, so a
/// branch name containing characters such as `&`, `#`, `+` or a space is sent
/// as one intact `branch` parameter and cannot corrupt the request.
///
/// # Errors
/// Fails when the first URL cannot be built, on a transport error, on a
/// non-success HTTP status, or when a response body does not deserialize.
async fn fetch_all_workflow_runs(
    client: &reqwest::Client,
    owner: &str,
    repo: &str,
    branch: &str,
) -> anyhow::Result<Vec<ApiWorkflowRun>> {
    let mut out: Vec<ApiWorkflowRun> = Vec::new();

    let per_page = PAGE_SIZE.to_string();
    let first_url = reqwest::Url::parse_with_params(
        &format!("{GITHUB_API_BASE}/repos/{owner}/{repo}/actions/runs"),
        &[
            ("branch", branch),
            ("status", "success"),
            ("per_page", per_page.as_str()),
        ],
    )
    .map_err(|e| {
        anyhow::anyhow!("could not build GitHub Actions runs URL for {owner}/{repo}: {e}")
    })?;

    // Follow-up pages use the server-provided `next` URL verbatim.
    let mut next_url = Some(first_url.to_string());
    while let Some(url) = next_url {
        debug!(url = %url, "GET (github actions runs)");
        let resp = client.get(&url).send().await?;
        // Read the pagination header before the response is consumed.
        let next = next_link(resp.headers());
        let resp = resp.error_for_status()?;
        let env: ApiWorkflowRunsEnvelope = resp.json().await?;
        let n = env.workflow_runs.len();
        out.extend(env.workflow_runs);
        if next.is_none() || n == 0 {
            break;
        }
        next_url = next;
    }
    Ok(out)
}
// Unit tests for the pure helpers and for the database-level idempotence
// guarantee. None of them perform network access.
#[cfg(test)]
mod tests {
use super::*;
// An invalid tag pattern must fail with an error that names the config field.
// The empty repository list ensures the regex check is what fails.
#[test]
fn bad_deployment_tag_pattern_returns_clear_error() {
let mut db = Database::open_in_memory().expect("db");
let dora = DoraConfig {
deployment_tag_pattern: "[unclosed".into(),
..DoraConfig::default()
};
let err = ingest_git_tags(&mut db, &[], &dora).expect_err("bad regex");
let msg = format!("{err}");
assert!(
msg.contains("dora.deployment_tag_pattern"),
"error should name the field: {msg}"
);
}
// Running the same INSERT OR IGNORE twice must leave exactly one row; this is
// what makes re-ingestion safe.
#[test]
fn deploy_id_primary_key_makes_reingest_idempotent() {
let db = Database::open_in_memory().expect("db");
let conn = db.connection();
for _ in 0..2 {
conn.execute(
"INSERT OR IGNORE INTO fact_deployments \
(deploy_id, repo, environment, triggered_at, status, git_sha, git_tag, source) \
VALUES ('repo@v1.0.0', 'repo', 'production', \
'2025-01-01T00:00:00Z', 'success', 'sha', 'v1.0.0', 'git_tag')",
[],
)
.expect("insert");
}
let n: i64 = conn
.query_row("SELECT COUNT(*) FROM fact_deployments", [], |r| r.get(0))
.expect("count");
assert_eq!(n, 1, "INSERT OR IGNORE must dedupe on deploy_id PK");
}
// Covers HTTPS (with and without `.git`), scp-like SSH, `ssh://`, HTTPS with
// userinfo, a non-GitHub host and a non-URL string.
#[test]
fn extract_owner_repo_from_url_handles_common_forms() {
assert_eq!(
extract_owner_repo_from_url("https://github.com/acme/widget.git"),
Some(("acme".to_string(), "widget".to_string()))
);
assert_eq!(
extract_owner_repo_from_url("https://github.com/acme/widget"),
Some(("acme".to_string(), "widget".to_string()))
);
assert_eq!(
extract_owner_repo_from_url("git@github.com:acme/widget.git"),
Some(("acme".to_string(), "widget".to_string()))
);
assert_eq!(
extract_owner_repo_from_url("ssh://git@github.com/acme/widget.git"),
Some(("acme".to_string(), "widget".to_string()))
);
assert_eq!(
extract_owner_repo_from_url("https://user@github.com/acme/widget"),
Some(("acme".to_string(), "widget".to_string()))
);
assert!(extract_owner_repo_from_url("https://gitlab.com/acme/widget").is_none());
assert!(extract_owner_repo_from_url("nonsense").is_none());
}
// With both `org` and `name` configured, the slug comes from the config and
// the (non-repository) path is never consulted.
#[test]
fn resolve_repo_to_github_slug_prefers_explicit_org() {
let cfg = RepositoryConfig {
path: std::path::PathBuf::from("/tmp/some-dir"),
name: Some("widget".into()),
org: Some("acme".into()),
..Default::default()
};
assert_eq!(
resolve_repo_to_github_slug(&cfg),
Some(("acme".to_string(), "widget".to_string()))
);
}
// Without `org`, resolution falls back to the git remote; a path that does
// not exist cannot be opened, so the result is `None`.
#[test]
fn resolve_repo_to_github_slug_returns_none_when_unresolvable() {
let cfg = RepositoryConfig {
path: std::path::PathBuf::from("/nonexistent/path-xyz-987"),
name: None,
org: None,
..Default::default()
};
assert_eq!(resolve_repo_to_github_slug(&cfg), None);
}
// The "next" URL is extracted from a multi-entry header, and a header with
// only a "last" relation yields nothing.
#[test]
fn next_link_parses_canonical_github_header() {
let h = "<https://api.github.com/repositories/1/releases?page=2>; rel=\"next\", \
<https://api.github.com/repositories/1/releases?page=5>; rel=\"last\"";
assert_eq!(
parse_next_link_value(h).as_deref(),
Some("https://api.github.com/repositories/1/releases?page=2"),
);
let last_only = "<https://api.github.com/repositories/1/releases?page=5>; rel=\"last\"";
assert!(parse_next_link_value(last_only).is_none());
}
// A full payload (including an unknown `id` key) and a payload with only
// `tag_name` must both deserialize, the latter using field defaults.
#[test]
fn api_release_deserializes_full_and_minimal() {
let full = r#"{
"id": 1,
"tag_name": "v1.2.3",
"target_commitish": "main",
"published_at": "2025-01-01T00:00:00Z",
"draft": false,
"prerelease": false
}"#;
let r: ApiRelease = serde_json::from_str(full).expect("parses");
assert_eq!(r.tag_name, "v1.2.3");
assert_eq!(r.target_commitish.as_deref(), Some("main"));
assert!(!r.draft && !r.prerelease);
assert!(r.published_at.is_some());
let minimal = r#"{"tag_name": "v0.1.0"}"#;
let r: ApiRelease = serde_json::from_str(minimal).expect("parses");
assert_eq!(r.tag_name, "v0.1.0");
assert!(r.target_commitish.is_none());
assert!(r.published_at.is_none());
assert!(!r.draft && !r.prerelease);
}
// The envelope with a single run deserializes and exposes the run's fields.
#[test]
fn api_workflow_run_deserializes() {
let json = r#"{
"workflow_runs": [
{
"id": 999,
"head_sha": "deadbeefcafebabe",
"head_branch": "main",
"created_at": "2025-01-01T00:00:00Z",
"updated_at": "2025-01-01T00:05:00Z",
"conclusion": "success",
"name": "deploy-production",
"path": ".github/workflows/deploy-production.yml"
}
]
}"#;
let env: ApiWorkflowRunsEnvelope = serde_json::from_str(json).expect("parses");
assert_eq!(env.workflow_runs.len(), 1);
let r = &env.workflow_runs[0];
assert_eq!(r.id, 999);
assert_eq!(r.head_sha, "deadbeefcafebabe");
assert_eq!(r.conclusion.as_deref(), Some("success"));
assert_eq!(r.name.as_deref(), Some("deploy-production"));
}
// The filter matches by workflow name or by file name at the end of the
// path; any conclusion other than "success" (including none) is rejected.
#[test]
fn is_kept_run_enforces_conclusion_and_workflow_filter() {
let mut run = ApiWorkflowRun {
id: 1,
head_sha: "sha".into(),
head_branch: Some("main".into()),
created_at: None,
updated_at: None,
conclusion: Some("success".into()),
name: Some("deploy-production".into()),
path: Some(".github/workflows/deploy-production.yml".into()),
};
assert!(is_kept_run(&run, None));
assert!(is_kept_run(&run, Some("deploy-production")));
assert!(is_kept_run(&run, Some("deploy-production.yml")));
assert!(!is_kept_run(&run, Some("ci.yml")));
run.conclusion = Some("failure".into());
assert!(!is_kept_run(&run, None));
run.conclusion = None;
assert!(!is_kept_run(&run, None));
}
}