use std::collections::HashSet;
use std::sync::OnceLock;
use chrono::{DateTime, Utc};
use regex::Regex;
use rusqlite::{params, Connection};
use serde::Deserialize;
use tracing::{debug, info, warn};
use crate::collect::azdo::client::AzdoError;
use crate::core::config::AzureDevOpsConfig;
use crate::core::errors::{Result as CoreResult, TgaError};
/// Returns the shared, lazily compiled regex that recognises
/// `Merged PR <digits>:` (case-insensitive) and captures the digits as group 1.
///
/// The regex is compiled on first use and reused for the life of the process.
fn merged_pr_re() -> &'static Regex {
    static PATTERN: OnceLock<Regex> = OnceLock::new();

    // Kept as a named function so the one-time initialisation reads as a
    // plain call site rather than an inline closure.
    fn compile() -> Regex {
        Regex::new(r"(?i)Merged PR (\d+):").expect("MERGED_PR_RE is a static valid pattern")
    }

    PATTERN.get_or_init(compile)
}
/// A pull request as used by this module after conversion from the raw
/// Azure DevOps payload (see the `From<PrRaw>` impl in this file).
#[derive(Debug, Clone)]
pub struct AdoPullRequest {
/// Numeric pull request id (the payload's `pullRequestId`).
pub pr_number: i64,
/// Pull request title; empty when the payload omits it.
pub title: String,
/// Optional free-text description from the payload.
pub description: Option<String>,
/// Creator's `uniqueName`, falling back to `displayName`, or empty when
/// the payload carries neither.
pub author: String,
/// Creation timestamp (the payload's `creationDate`).
pub created_at: DateTime<Utc>,
/// Close timestamp (the payload's `closedDate`), if present.
pub closed_at: Option<DateTime<Utc>>,
/// Source ref name as sent by the API (e.g. `refs/heads/feature/widget`).
pub source_branch: String,
/// Target ref name as sent by the API (e.g. `refs/heads/main`).
pub target_branch: String,
/// Raw status string from the payload; `upsert_pr` maps `completed` and
/// `abandoned` (case-insensitively) to stored states and treats anything
/// else as open.
pub status: String,
/// Reviewers listed on the pull request, in payload order.
pub reviewers: Vec<AdoPrReviewer>,
}
/// One reviewer entry attached to an [`AdoPullRequest`].
#[derive(Debug, Clone)]
pub struct AdoPrReviewer {
/// Reviewer's `uniqueName`, or the display name when `uniqueName` is absent.
pub reviewer_id: String,
/// Reviewer's `displayName`; empty when the payload omits it.
pub display_name: String,
/// Numeric vote exactly as reported by the API (0 when omitted).
// NOTE(review): the meaning of individual vote values is defined by the
// Azure DevOps API and is not visible in this file — confirm before
// interpreting them.
pub vote: i32,
/// Mirrors the payload's `isRequired` flag (false when omitted).
pub is_required: bool,
/// Mirrors the payload's `isContainer` flag (false when omitted).
// NOTE(review): presumably marks group/team reviewers — confirm against the API docs.
pub is_container: bool,
}
pub fn extract_pr_ids<I, S>(messages: I) -> Vec<i64>
where
I: IntoIterator<Item = S>,
S: AsRef<str>,
{
let mut seen: HashSet<i64> = HashSet::new();
let re = merged_pr_re();
for msg in messages {
for cap in re.captures_iter(msg.as_ref()) {
if let Some(m) = cap.get(1) {
if let Ok(id) = m.as_str().parse::<i64>() {
seen.insert(id);
}
}
}
}
let mut out: Vec<i64> = seen.into_iter().collect();
out.sort_unstable();
out
}
/// Loads the `pr_number` of every row in `pull_requests` that belongs to
/// `provider`.
///
/// # Errors
/// Returns an error if the statement cannot be prepared or executed, or if
/// any row fails to decode as an `i64`; the first failure aborts the read.
pub fn get_existing_pr_numbers(conn: &Connection, provider: &str) -> CoreResult<HashSet<i64>> {
    let mut stmt = conn.prepare("SELECT pr_number FROM pull_requests WHERE provider = ?1")?;

    // Bound to a local (rather than returned directly) so the row iterator
    // borrowing `stmt` is dropped before `stmt` itself.
    let known = stmt
        .query_map(params![provider], |row| row.get::<_, i64>(0))
        .map_err(TgaError::from)?
        .collect::<Result<HashSet<i64>, _>>()
        .map_err(TgaError::from)?;

    Ok(known)
}
pub fn upsert_pr(conn: &Connection, pr: &AdoPullRequest) -> CoreResult<i64> {
let state = match pr.status.to_ascii_lowercase().as_str() {
"completed" => "merged",
"abandoned" => "closed",
_ => "open",
};
conn.execute(
"INSERT OR REPLACE INTO pull_requests \
(provider, pr_number, title, author, state, created_at, merged_at, commit_shas) \
VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8)",
params![
"azdo",
pr.pr_number,
pr.title,
pr.author,
state,
pr.created_at.to_rfc3339(),
pr.closed_at.map(|t| t.to_rfc3339()),
"[]",
],
)?;
let id: i64 = conn
.query_row(
"SELECT id FROM pull_requests WHERE provider = ?1 AND pr_number = ?2",
params!["azdo", pr.pr_number],
|row| row.get(0),
)
.map_err(TgaError::from)?;
Ok(id)
}
/// Writes one reviewer row for the pull request whose `pull_requests.id` is
/// `pr_db_id`, replacing any conflicting existing row.
///
/// The provider column is always `azdo`; the two boolean flags are stored
/// as integers (0 or 1).
///
/// # Errors
/// Returns an error if the SQLite statement fails.
pub fn upsert_pr_reviewer(
    conn: &Connection,
    pr_db_id: i64,
    reviewer: &AdoPrReviewer,
) -> CoreResult<()> {
    const UPSERT_REVIEWER_SQL: &str = "INSERT OR REPLACE INTO pr_reviewers \
        (pr_id, provider, reviewer_id, display_name, vote, is_required, is_container) \
        VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7)";

    let required_flag = i32::from(reviewer.is_required);
    let container_flag = i32::from(reviewer.is_container);

    conn.execute(
        UPSERT_REVIEWER_SQL,
        params![
            pr_db_id,
            "azdo",
            reviewer.reviewer_id,
            reviewer.display_name,
            reviewer.vote,
            required_flag,
            container_flag,
        ],
    )?;
    Ok(())
}
/// Fetches pull requests from the Azure DevOps REST API and persists them
/// through the SQLite helpers in this file.
pub struct AdoPrFetcher {
// Connection settings; this file reads `organization_url`, `project` and `pat`.
config: AzureDevOpsConfig,
// HTTP client built once in `new` with default headers and a request timeout.
client: reqwest::Client,
}
/// Wire-format view of a pull request JSON payload.
///
/// Field names are mapped from camelCase (e.g. `pullRequestId`). Only
/// `pullRequestId` and `creationDate` are required; every other field falls
/// back to its default when absent.
#[derive(Debug, Deserialize)]
#[serde(rename_all = "camelCase")]
struct PrRaw {
// Required: deserialization fails without it.
pull_request_id: i64,
#[serde(default)]
title: String,
#[serde(default)]
description: Option<String>,
#[serde(default)]
status: String,
// Creator identity; may be missing entirely.
#[serde(default)]
created_by: Option<IdentityRaw>,
// Required: deserialization fails without it.
creation_date: DateTime<Utc>,
#[serde(default)]
closed_date: Option<DateTime<Utc>>,
#[serde(default)]
source_ref_name: String,
#[serde(default)]
target_ref_name: String,
// Defaults to an empty list when the payload has no `reviewers` key.
#[serde(default)]
reviewers: Vec<ReviewerRaw>,
}
/// Wire-format identity object (used for a pull request's `createdBy`).
/// Both fields are optional; keys are mapped from camelCase.
#[derive(Debug, Deserialize, Default)]
#[serde(rename_all = "camelCase")]
struct IdentityRaw {
// Preferred author identifier when present (payload key `uniqueName`).
#[serde(default)]
unique_name: Option<String>,
// Fallback author identifier (payload key `displayName`).
#[serde(default)]
display_name: Option<String>,
}
/// Wire-format reviewer entry from a pull request's `reviewers` array.
/// Every field is optional in the payload; keys are mapped from camelCase.
#[derive(Debug, Deserialize)]
#[serde(rename_all = "camelCase")]
struct ReviewerRaw {
// Preferred reviewer identifier when present (payload key `uniqueName`).
#[serde(default)]
unique_name: Option<String>,
// Human-readable name; also used as the identifier when `uniqueName` is absent.
#[serde(default)]
display_name: Option<String>,
// Defaults to 0 when omitted.
#[serde(default)]
vote: i32,
// Defaults to false when omitted.
#[serde(default)]
is_required: bool,
// Defaults to false when omitted.
#[serde(default)]
is_container: bool,
}
impl From<PrRaw> for AdoPullRequest {
fn from(raw: PrRaw) -> Self {
let author = raw
.created_by
.as_ref()
.and_then(|i| i.unique_name.clone().or_else(|| i.display_name.clone()))
.unwrap_or_default();
let reviewers = raw
.reviewers
.into_iter()
.map(|r| {
let display = r.display_name.unwrap_or_default();
let id = r.unique_name.unwrap_or_else(|| display.clone());
AdoPrReviewer {
reviewer_id: id,
display_name: display,
vote: r.vote,
is_required: r.is_required,
is_container: r.is_container,
}
})
.collect();
AdoPullRequest {
pr_number: raw.pull_request_id,
title: raw.title,
description: raw.description,
author,
created_at: raw.creation_date,
closed_at: raw.closed_date,
source_branch: raw.source_ref_name,
target_branch: raw.target_ref_name,
status: raw.status,
reviewers,
}
}
}
impl AdoPrFetcher {
    /// Builds a fetcher whose HTTP client sends a `tga/<version>` user agent,
    /// accepts JSON, and times out each request after 30 seconds.
    ///
    /// # Errors
    /// Returns [`AzdoError::Request`] if the HTTP client cannot be built.
    pub fn new(config: AzureDevOpsConfig) -> std::result::Result<Self, AzdoError> {
        let mut headers = reqwest::header::HeaderMap::new();
        headers.insert(
            reqwest::header::USER_AGENT,
            reqwest::header::HeaderValue::from_static(concat!("tga/", env!("CARGO_PKG_VERSION"))),
        );
        headers.insert(
            reqwest::header::ACCEPT,
            reqwest::header::HeaderValue::from_static("application/json"),
        );
        let client = reqwest::Client::builder()
            .default_headers(headers)
            .timeout(std::time::Duration::from_secs(30))
            .build()
            .map_err(AzdoError::Request)?;
        Ok(Self { config, client })
    }

    /// Organization base URL with any trailing slashes removed, so path
    /// segments can be appended with a single `/`.
    fn org_url(&self) -> &str {
        self.config.organization_url.trim_end_matches('/')
    }

    /// Fetches a single pull request by id.
    ///
    /// Returns `Ok(None)` when the API answers 404.
    ///
    /// # Errors
    /// * [`AzdoError::Request`] — the request could not be sent.
    /// * [`AzdoError::Parse`] — a 200 response body could not be decoded.
    /// * [`AzdoError::Unauthorized`] / [`AzdoError::Forbidden`] — HTTP 401 / 403.
    /// * [`AzdoError::Http`] — any other status, with the response body as message.
    pub async fn fetch_pr(
        &self,
        pr_id: i64,
    ) -> std::result::Result<Option<AdoPullRequest>, AzdoError> {
        let url = format!(
            "{}/{}/_apis/git/pullrequests/{pr_id}?api-version=7.1",
            self.org_url(),
            encode_segment(&self.config.project),
        );
        debug!(url = %url, pr_id, "GET ADO PR");
        // The PAT is sent as the basic-auth password with an empty user name.
        let resp = self
            .client
            .get(&url)
            .basic_auth("", Some(&self.config.pat))
            .send()
            .await
            .map_err(AzdoError::Request)?;
        match resp.status().as_u16() {
            200 => {
                let raw: PrRaw = resp
                    .json()
                    .await
                    .map_err(|e| AzdoError::Parse(e.to_string()))?;
                Ok(Some(raw.into()))
            }
            404 => Ok(None),
            401 => Err(AzdoError::Unauthorized),
            403 => Err(AzdoError::Forbidden),
            s => {
                let message = resp.text().await.unwrap_or_default();
                Err(AzdoError::Http { status: s, message })
            }
        }
    }

    /// Fetches the given pull requests one after another, best-effort.
    ///
    /// Ids that are not found are skipped with a debug log; ids whose fetch
    /// fails are skipped with a warning. Only successfully fetched pull
    /// requests are returned, in the order of `ids`.
    pub async fn fetch_prs(&self, ids: &[i64]) -> Vec<AdoPullRequest> {
        let mut out = Vec::with_capacity(ids.len());
        for &id in ids {
            match self.fetch_pr(id).await {
                Ok(Some(pr)) => out.push(pr),
                Ok(None) => {
                    debug!(pr_id = id, "ADO PR not found (404), skipping");
                }
                Err(e) => {
                    warn!(pr_id = id, error = %e, "ADO PR fetch failed");
                }
            }
        }
        out
    }

    /// Extracts `Merged PR N:` references from `commit_messages`, fetches
    /// every referenced pull request that is not yet stored for provider
    /// `azdo`, and persists each one together with its reviewers.
    ///
    /// Returns the number of pull requests stored by this call.
    ///
    /// Each pull request and its reviewers are written atomically: if any
    /// of its rows fails to write, none of them are kept. Otherwise the
    /// half-stored PR would count as already cached on the next run and its
    /// missing reviewers would never be fetched.
    ///
    /// # Errors
    /// Returns the first database error encountered; pull requests stored
    /// before the failing one remain persisted.
    pub async fn run<I, S>(&self, conn: &Connection, commit_messages: I) -> CoreResult<usize>
    where
        I: IntoIterator<Item = S>,
        S: AsRef<str>,
    {
        let ids = extract_pr_ids(commit_messages);
        if ids.is_empty() {
            info!("No 'Merged PR N:' references found; skipping ADO PR fetch");
            return Ok(0);
        }
        let existing = get_existing_pr_numbers(conn, "azdo")?;
        let to_fetch: Vec<i64> = ids
            .into_iter()
            .filter(|id| !existing.contains(id))
            .collect();
        if to_fetch.is_empty() {
            info!("All referenced ADO PRs already cached; skipping fetch");
            return Ok(0);
        }
        info!(count = to_fetch.len(), "Fetching ADO PRs");
        let prs = self.fetch_prs(&to_fetch).await;
        let mut stored = 0usize;
        for pr in &prs {
            Self::store_pr_atomically(conn, pr)?;
            stored += 1;
        }
        info!(stored, "Persisted ADO PRs");
        Ok(stored)
    }

    /// Writes the PR row and all of its reviewer rows inside one SAVEPOINT.
    ///
    /// A savepoint (rather than `BEGIN`) is used so this works both in
    /// autocommit mode and when the caller already has a transaction open.
    fn store_pr_atomically(conn: &Connection, pr: &AdoPullRequest) -> CoreResult<()> {
        conn.execute_batch("SAVEPOINT ado_pr_store")?;
        match Self::write_pr_rows(conn, pr) {
            Ok(()) => {
                conn.execute_batch("RELEASE SAVEPOINT ado_pr_store")?;
                Ok(())
            }
            Err(err) => {
                // `ROLLBACK TO` undoes the writes but leaves the savepoint on
                // the stack, so it must still be released afterwards.
                if let Err(rollback_err) = conn.execute_batch(
                    "ROLLBACK TO SAVEPOINT ado_pr_store; RELEASE SAVEPOINT ado_pr_store",
                ) {
                    warn!(
                        pr_number = pr.pr_number,
                        error = %rollback_err,
                        "Failed to roll back partially stored ADO PR"
                    );
                }
                // Surface the original write failure, not the rollback outcome.
                Err(err)
            }
        }
    }

    /// Upserts the PR row, then one reviewer row per reviewer.
    fn write_pr_rows(conn: &Connection, pr: &AdoPullRequest) -> CoreResult<()> {
        let pr_db_id = upsert_pr(conn, pr)?;
        for reviewer in &pr.reviewers {
            upsert_pr_reviewer(conn, pr_db_id, reviewer)?;
        }
        Ok(())
    }
}
/// Percent-encodes `s` for use as a single URL path segment.
///
/// ASCII letters, digits and `-`, `.`, `_`, `~` are copied through
/// unchanged; every other byte of the UTF-8 encoding is written as `%XX`
/// with upper-case hex digits.
fn encode_segment(s: &str) -> String {
    const HEX_UPPER: &[u8; 16] = b"0123456789ABCDEF";

    let mut out = String::with_capacity(s.len());
    for &b in s.as_bytes() {
        if b.is_ascii_alphanumeric() || matches!(b, b'-' | b'.' | b'_' | b'~') {
            out.push(char::from(b));
        } else {
            // Push the two hex digits directly instead of building a
            // temporary `String` with `format!` for every escaped byte.
            out.push('%');
            out.push(char::from(HEX_UPPER[usize::from(b >> 4)]));
            out.push(char::from(HEX_UPPER[usize::from(b & 0x0F)]));
        }
    }
    out
}
// Unit tests for PR-id extraction, the SQLite persistence helpers, payload
// deserialization and the `fetch_prs` config flag. None of them performs
// HTTP requests.
#[cfg(test)]
mod tests {
use super::*;
use crate::core::db::Database;
// Fixture: a completed PR with one required reviewer who voted 10.
fn sample_pr() -> AdoPullRequest {
AdoPullRequest {
pr_number: 12345,
title: "feat: add widget".into(),
description: Some("body".into()),
author: "alice@contoso.com".into(),
created_at: "2024-01-15T10:30:00Z".parse().unwrap(),
closed_at: Some("2024-01-16T14:00:00Z".parse().unwrap()),
source_branch: "refs/heads/feature/widget".into(),
target_branch: "refs/heads/main".into(),
status: "completed".into(),
reviewers: vec![AdoPrReviewer {
reviewer_id: "bob@contoso.com".into(),
display_name: "Bob".into(),
vote: 10,
is_required: true,
is_container: false,
}],
}
}
// Ids are de-duplicated, matched case-insensitively, found mid-message,
// and returned in ascending order.
#[test]
fn extracts_unique_pr_ids() {
let messages = vec![
"Merged PR 100: do thing",
"Some other commit",
"merged pr 200: another (case-insensitive)",
"Merged PR 100: duplicate",
"Refactored: Merged PR 300: nested phrase",
];
let ids = extract_pr_ids(messages);
assert_eq!(ids, vec![100, 200, 300]);
}
// Messages without the exact `Merged PR <n>:` shape yield nothing.
#[test]
fn ignores_non_merge_lines() {
let messages = vec!["fix: typo", "PR #42", "merge branch 'foo'"];
let ids = extract_pr_ids(messages);
assert!(ids.is_empty(), "no merge-PR pattern should match: {ids:?}");
}
// An empty message list yields an empty id list.
#[test]
fn extract_pr_ids_handles_empty_input() {
let ids: Vec<i64> = extract_pr_ids(Vec::<&str>::new());
assert!(ids.is_empty());
}
// Upserting the same PR twice must leave exactly one row for it.
// Only the row count and positive ids are checked, not individual columns.
#[test]
fn upsert_pr_round_trips_basic_fields() {
let db = Database::open_in_memory().expect("db");
let conn = db.connection();
let pr = sample_pr();
let row_id = upsert_pr(conn, &pr).expect("first upsert");
assert!(row_id > 0);
let row_id2 = upsert_pr(conn, &pr).expect("second upsert");
assert!(row_id2 > 0);
let n: i64 = conn
.query_row(
"SELECT COUNT(*) FROM pull_requests WHERE provider = 'azdo' AND pr_number = ?1",
params![pr.pr_number],
|row| row.get(0),
)
.expect("count");
assert_eq!(
n, 1,
"should have exactly one row per (provider, pr_number)"
);
}
// Writing the same reviewers twice must not duplicate rows, and the
// stored vote / required flag must match the fixture.
#[test]
fn upsert_pr_reviewer_round_trips() {
let db = Database::open_in_memory().expect("db");
let conn = db.connection();
let pr = sample_pr();
let pr_db_id = upsert_pr(conn, &pr).expect("pr upsert");
for r in &pr.reviewers {
upsert_pr_reviewer(conn, pr_db_id, r).expect("reviewer upsert");
}
// Second pass exercises the replace-on-conflict path.
for r in &pr.reviewers {
upsert_pr_reviewer(conn, pr_db_id, r).expect("reviewer upsert (2)");
}
let n: i64 = conn
.query_row(
"SELECT COUNT(*) FROM pr_reviewers WHERE pr_id = ?1",
params![pr_db_id],
|row| row.get(0),
)
.expect("count");
assert_eq!(n, pr.reviewers.len() as i64);
let (vote, required): (i32, i32) = conn
.query_row(
"SELECT vote, is_required FROM pr_reviewers WHERE pr_id = ?1 AND reviewer_id = ?2",
params![pr_db_id, "bob@contoso.com"],
|row| Ok((row.get(0)?, row.get(1)?)),
)
.expect("query reviewer");
assert_eq!(vote, 10);
// Booleans are stored as integers, so `true` reads back as 1.
assert_eq!(required, 1);
}
// Stored PR numbers are returned for their own provider only.
#[test]
fn get_existing_pr_numbers_returns_persisted_ids() {
let db = Database::open_in_memory().expect("db");
let conn = db.connection();
let pr = sample_pr();
upsert_pr(conn, &pr).expect("upsert");
let ids = get_existing_pr_numbers(conn, "azdo").expect("query");
assert!(ids.contains(&pr.pr_number));
let ids_gh = get_existing_pr_numbers(conn, "github").expect("query gh");
assert!(
!ids_gh.contains(&pr.pr_number),
"provider scoping must hold"
);
}
// A payload with every field present deserializes and converts with the
// creator's `uniqueName` chosen as author.
#[test]
fn pr_raw_deserializes_full_payload() {
let json = r#"{
"pullRequestId": 12345,
"title": "feat: add widget",
"description": "body",
"status": "completed",
"createdBy": {
"uniqueName": "alice@contoso.com",
"displayName": "Alice"
},
"creationDate": "2024-01-15T10:30:00Z",
"closedDate": "2024-01-16T14:00:00Z",
"sourceRefName": "refs/heads/feature/widget",
"targetRefName": "refs/heads/main",
"reviewers": [
{
"uniqueName": "bob@contoso.com",
"displayName": "Bob",
"vote": 10,
"isRequired": true,
"isContainer": false
}
]
}"#;
let raw: PrRaw = serde_json::from_str(json).expect("parse");
let pr: AdoPullRequest = raw.into();
assert_eq!(pr.pr_number, 12345);
assert_eq!(pr.title, "feat: add widget");
assert_eq!(pr.author, "alice@contoso.com");
assert_eq!(pr.status, "completed");
assert_eq!(pr.target_branch, "refs/heads/main");
assert_eq!(pr.reviewers.len(), 1);
assert_eq!(pr.reviewers[0].vote, 10);
assert!(pr.reviewers[0].is_required);
}
// Only `pullRequestId` and `creationDate` are required; everything else
// falls back to empty / `None`.
#[test]
fn pr_raw_tolerates_missing_optional_fields() {
let json = r#"{
"pullRequestId": 7,
"creationDate": "2024-01-15T10:30:00Z"
}"#;
let raw: PrRaw = serde_json::from_str(json).expect("parse minimal");
let pr: AdoPullRequest = raw.into();
assert_eq!(pr.pr_number, 7);
assert!(pr.author.is_empty());
assert!(pr.reviewers.is_empty());
assert!(pr.closed_at.is_none());
assert!(pr.description.is_none());
}
// `fetch_prs: true` in the YAML config is honoured.
#[test]
fn fetch_prs_config_deserializes_with_fetch_prs_true() {
let yaml = r#"
organization_url: "https://dev.azure.com/myorg"
pat: "secret-pat"
project: "MyProject"
fetch_prs: true
"#;
let parsed: AzureDevOpsConfig =
serde_yaml::from_str(yaml).expect("should deserialize cleanly");
assert!(parsed.fetch_prs);
}
// Omitting `fetch_prs` leaves the flag off.
#[test]
fn fetch_prs_defaults_to_false() {
let yaml = r#"
organization_url: "https://dev.azure.com/myorg"
pat: "secret-pat"
project: "MyProject"
"#;
let parsed: AzureDevOpsConfig =
serde_yaml::from_str(yaml).expect("should deserialize cleanly");
assert!(!parsed.fetch_prs, "fetch_prs default must be false");
}
// `abandoned` is stored as `closed`; any unrecognised status such as
// `active` is stored as `open`.
#[test]
fn status_maps_to_pr_state_string() {
let db = Database::open_in_memory().expect("db");
let conn = db.connection();
let mut pr = sample_pr();
pr.status = "abandoned".into();
let id = upsert_pr(conn, &pr).expect("upsert");
let state: String = conn
.query_row(
"SELECT state FROM pull_requests WHERE id = ?1",
params![id],
|row| row.get(0),
)
.expect("query");
assert_eq!(state, "closed");
pr.status = "active".into();
upsert_pr(conn, &pr).expect("upsert");
// Looked up by (provider, pr_number) rather than by the id returned from
// the first upsert.
let state: String = conn
.query_row(
"SELECT state FROM pull_requests WHERE provider = 'azdo' AND pr_number = ?1",
params![pr.pr_number],
|row| row.get(0),
)
.expect("query");
assert_eq!(state, "open");
}
}