use std::collections::HashSet;
use std::sync::OnceLock;
use chrono::{DateTime, Utc};
use regex::Regex;
use rusqlite::{params, Connection};
use serde::Deserialize;
use tracing::{debug, info, warn};
use crate::collect::azdo::client::AzdoError;
use crate::core::config::AzureDevOpsConfig;
use crate::core::errors::{Result as CoreResult, TgaError};
/// Returns the lazily compiled, process-wide regex that recognises
/// `Merged PR <digits>:` (case-insensitive) and captures the digits in group 1.
///
/// The pattern is compiled on the first call and reused afterwards.
fn merged_pr_re() -> &'static Regex {
    static PATTERN: OnceLock<Regex> = OnceLock::new();
    // The pattern is a compile-time constant, so a failure here is a programming error.
    let compile = || {
        Regex::new(r"(?i)Merged PR (\d+):").expect("MERGED_PR_RE is a static valid pattern")
    };
    PATTERN.get_or_init(compile)
}
/// Normalised view of an Azure DevOps pull request, built from the raw REST
/// payload (`PrRaw`) and written to the `pull_requests` table by `upsert_pr`.
#[derive(Debug, Clone)]
pub struct AdoPullRequest {
/// Pull request ID (`pullRequestId` in the API payload).
pub pr_number: i64,
/// PR title; empty when the payload omits it.
pub title: String,
/// PR description, if the payload carried one. Not persisted by `upsert_pr`.
pub description: Option<String>,
/// Creator's `uniqueName`, falling back to `displayName`; empty when neither is present.
pub author: String,
/// Value of the payload's `creationDate`.
pub created_at: DateTime<Utc>,
/// Value of the payload's `closedDate`, when present.
pub closed_at: Option<DateTime<Utc>>,
/// Value of `sourceRefName` (the fixtures in this file use the `refs/heads/...` form).
pub source_branch: String,
/// Value of `targetRefName`.
pub target_branch: String,
/// Raw status string from the API; `upsert_pr` matches it case-insensitively
/// against `completed` and `abandoned`.
pub status: String,
/// Reviewers attached to the PR, in payload order.
pub reviewers: Vec<AdoPrReviewer>,
/// `lastMergeCommit.commitId`, kept only for completed PRs whose merge
/// strategy is absent or `noFastForward`; `None` otherwise.
pub merge_commit_sha: Option<String>,
}
/// A single reviewer entry of an Azure DevOps pull request, as persisted by
/// `upsert_pr_reviewer`.
#[derive(Debug, Clone)]
pub struct AdoPrReviewer {
/// Reviewer's `uniqueName`, falling back to the display name when absent.
pub reviewer_id: String,
/// Reviewer's `displayName`; empty when the payload omits it.
pub display_name: String,
/// Raw `vote` value from the payload (defaults to 0 when missing).
/// NOTE(review): presumably ADO's approval scale (e.g. 10 = approved) — confirm.
pub vote: i32,
/// Raw `isRequired` flag from the payload.
pub is_required: bool,
/// Raw `isContainer` flag from the payload.
/// NOTE(review): presumably marks group/team reviewers — confirm against the ADO API docs.
pub is_container: bool,
}
/// Collects every PR number referenced as `Merged PR <n>:` in `messages`.
///
/// Matching is case-insensitive and may occur anywhere inside a message; a
/// single message can contribute several IDs. Digit runs that do not fit in an
/// `i64` are ignored. The result is de-duplicated and sorted ascending.
pub fn extract_pr_ids<I, S>(messages: I) -> Vec<i64>
where
    I: IntoIterator<Item = S>,
    S: AsRef<str>,
{
    let pattern = merged_pr_re();
    let mut unique: HashSet<i64> = HashSet::new();
    for message in messages {
        let text = message.as_ref();
        unique.extend(
            pattern
                .captures_iter(text)
                .filter_map(|caps| caps.get(1))
                .filter_map(|digits| digits.as_str().parse::<i64>().ok()),
        );
    }
    // Hash-set iteration order is arbitrary; sort for a deterministic result.
    let mut ids = unique.into_iter().collect::<Vec<i64>>();
    ids.sort_unstable();
    ids
}
/// Returns the set of `pr_number` values already stored in `pull_requests`
/// for the given `provider` / `repository` pair.
///
/// # Errors
/// Fails if the statement cannot be prepared or executed, or if any row
/// cannot be read as an `i64`.
pub fn get_existing_pr_numbers(
    conn: &Connection,
    provider: &str,
    repository: &str,
) -> CoreResult<HashSet<i64>> {
    const QUERY: &str =
        "SELECT pr_number FROM pull_requests WHERE provider = ?1 AND repository = ?2";
    let mut stmt = conn.prepare(QUERY)?;
    let numbers = stmt
        .query_map(params![provider, repository], |row| row.get::<_, i64>(0))
        .map_err(TgaError::from)?
        // Stops at the first row-level error, like an explicit loop with `?`.
        .collect::<Result<HashSet<i64>, _>>()
        .map_err(TgaError::from)?;
    Ok(numbers)
}
/// Inserts or replaces the `pull_requests` row for `pr` under `repository`
/// (the provider is always `"azdo"`) and returns the row's `id`.
///
/// The ADO status is mapped case-insensitively to the stored state:
/// `completed` → `merged`, `abandoned` → `closed`, anything else → `open`.
/// `commit_shas` is stored as a JSON array holding the merge commit SHA, or
/// `[]` when there is none.
///
/// `merged_at` is written only for merged PRs. `closed_at` is also set for
/// abandoned PRs, and copying it into `merged_at` would make a PR that was
/// never merged look merged to anything that reads that column.
///
/// # Errors
/// Fails if the commit list cannot be serialised or if either SQL statement fails.
pub fn upsert_pr(conn: &Connection, pr: &AdoPullRequest, repository: &str) -> CoreResult<i64> {
    let state = if pr.status.eq_ignore_ascii_case("completed") {
        "merged"
    } else if pr.status.eq_ignore_ascii_case("abandoned") {
        "closed"
    } else {
        "open"
    };
    let merged_at = if state == "merged" {
        pr.closed_at.map(|t| t.to_rfc3339())
    } else {
        None
    };
    let commit_shas = match &pr.merge_commit_sha {
        Some(sha) => serde_json::to_string(&[sha.as_str()])?,
        None => "[]".to_string(),
    };
    // NOTE(review): `INSERT OR REPLACE` removes a conflicting row before
    // inserting, so the row id is presumably not stable across calls (depends
    // on the schema); the id is therefore looked up again below.
    conn.execute(
        "INSERT OR REPLACE INTO pull_requests \
         (provider, repository, pr_number, title, author, state, created_at, merged_at, commit_shas) \
         VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9)",
        params![
            "azdo",
            repository,
            pr.pr_number,
            pr.title,
            pr.author,
            state,
            pr.created_at.to_rfc3339(),
            merged_at,
            commit_shas,
        ],
    )?;
    let id: i64 = conn
        .query_row(
            "SELECT id FROM pull_requests WHERE provider = ?1 AND repository = ?2 AND pr_number = ?3",
            params!["azdo", repository, pr.pr_number],
            |row| row.get(0),
        )
        .map_err(TgaError::from)?;
    Ok(id)
}
/// Inserts or replaces one `pr_reviewers` row linking `reviewer` to the
/// pull request row identified by `pr_db_id` (provider is always `"azdo"`).
///
/// The boolean flags are stored as the integers 0 / 1.
///
/// # Errors
/// Fails if the SQL statement cannot be executed.
pub fn upsert_pr_reviewer(
    conn: &Connection,
    pr_db_id: i64,
    reviewer: &AdoPrReviewer,
) -> CoreResult<()> {
    const UPSERT_SQL: &str = "INSERT OR REPLACE INTO pr_reviewers \
         (pr_id, provider, reviewer_id, display_name, vote, is_required, is_container) \
         VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7)";
    let required_flag = i32::from(reviewer.is_required);
    let container_flag = i32::from(reviewer.is_container);
    conn.execute(
        UPSERT_SQL,
        params![
            pr_db_id,
            "azdo",
            reviewer.reviewer_id,
            reviewer.display_name,
            reviewer.vote,
            required_flag,
            container_flag,
        ],
    )?;
    Ok(())
}
/// Fetches Azure DevOps pull requests over REST and persists them.
///
/// Construct with `AdoPrFetcher::new`, which rejects configurations that
/// list no project.
pub struct AdoPrFetcher {
// Organization URL, PAT and project list used to build and authenticate requests.
config: AzureDevOpsConfig,
// Shared HTTP client carrying the default headers and request timeout set up in `new`.
client: reqwest::Client,
}
/// Wire-format shape of a pull request as deserialized from the REST response
/// (camelCase keys). Only `pullRequestId` and `creationDate` are mandatory;
/// every other field falls back to its default when missing.
#[derive(Debug, Deserialize)]
#[serde(rename_all = "camelCase")]
struct PrRaw {
// Required: deserialization fails without `pullRequestId`.
pull_request_id: i64,
#[serde(default)]
title: String,
#[serde(default)]
description: Option<String>,
// Raw status string; compared case-insensitively against "completed" during conversion.
#[serde(default)]
status: String,
#[serde(default)]
created_by: Option<IdentityRaw>,
// Required: deserialization fails without `creationDate`.
creation_date: DateTime<Utc>,
#[serde(default)]
closed_date: Option<DateTime<Utc>>,
#[serde(default)]
source_ref_name: String,
#[serde(default)]
target_ref_name: String,
#[serde(default)]
reviewers: Vec<ReviewerRaw>,
// Source of `AdoPullRequest::merge_commit_sha` (subject to the status/strategy gate).
#[serde(default)]
last_merge_commit: Option<CommitRefRaw>,
// Top-level `mergeStrategy`; takes precedence over `completionOptions.mergeStrategy`.
#[serde(default)]
merge_strategy: Option<String>,
// Consulted for `mergeStrategy` only when the top-level field is absent.
#[serde(default)]
completion_options: Option<CompletionOptionsRaw>,
}
/// Wire-format commit reference (e.g. the payload's `lastMergeCommit` object).
/// Unknown keys such as `url` are ignored.
#[derive(Debug, Deserialize, Default)]
#[serde(rename_all = "camelCase")]
struct CommitRefRaw {
// `commitId`; may be missing or empty, both of which are treated as "no SHA" downstream.
#[serde(default)]
commit_id: Option<String>,
}
/// Wire-format `completionOptions` object; only the merge strategy is read.
#[derive(Debug, Deserialize, Default)]
#[serde(rename_all = "camelCase")]
struct CompletionOptionsRaw {
// `completionOptions.mergeStrategy`, used as a fallback for the top-level `mergeStrategy`.
#[serde(default)]
merge_strategy: Option<String>,
}
/// Wire-format identity object (used for the payload's `createdBy`).
#[derive(Debug, Deserialize, Default)]
#[serde(rename_all = "camelCase")]
struct IdentityRaw {
// `uniqueName`; preferred source for the author string.
#[serde(default)]
unique_name: Option<String>,
// `displayName`; used when `uniqueName` is absent.
#[serde(default)]
display_name: Option<String>,
}
/// Wire-format reviewer entry from the payload's `reviewers` array.
/// Every field is optional and defaults to `None` / `0` / `false`.
#[derive(Debug, Deserialize)]
#[serde(rename_all = "camelCase")]
struct ReviewerRaw {
// `uniqueName`; becomes the reviewer ID when present.
#[serde(default)]
unique_name: Option<String>,
// `displayName`; also used as the reviewer ID when `uniqueName` is absent.
#[serde(default)]
display_name: Option<String>,
#[serde(default)]
vote: i32,
#[serde(default)]
is_required: bool,
#[serde(default)]
is_container: bool,
}
impl From<PrRaw> for AdoPullRequest {
fn from(raw: PrRaw) -> Self {
let author = raw
.created_by
.as_ref()
.and_then(|i| i.unique_name.clone().or_else(|| i.display_name.clone()))
.unwrap_or_default();
let reviewers = raw
.reviewers
.into_iter()
.map(|r| {
let display = r.display_name.unwrap_or_default();
let id = r.unique_name.unwrap_or_else(|| display.clone());
AdoPrReviewer {
reviewer_id: id,
display_name: display,
vote: r.vote,
is_required: r.is_required,
is_container: r.is_container,
}
})
.collect();
let strategy_allows_merge_sha = {
let strategy = raw.merge_strategy.as_deref().or_else(|| {
raw.completion_options
.as_ref()
.and_then(|co| co.merge_strategy.as_deref())
});
match strategy {
None => true,
Some(s) => s.eq_ignore_ascii_case("noFastForward"),
}
};
let merge_commit_sha =
if raw.status.eq_ignore_ascii_case("completed") && strategy_allows_merge_sha {
raw.last_merge_commit
.and_then(|c| c.commit_id)
.filter(|s| !s.is_empty())
} else {
None
};
AdoPullRequest {
pr_number: raw.pull_request_id,
title: raw.title,
description: raw.description,
author,
created_at: raw.creation_date,
closed_at: raw.closed_date,
source_branch: raw.source_ref_name,
target_branch: raw.target_ref_name,
status: raw.status,
reviewers,
merge_commit_sha,
}
}
}
impl AdoPrFetcher {
    /// Builds a fetcher for `config`.
    ///
    /// The HTTP client sends a `tga/<crate version>` user agent, accepts JSON
    /// and applies a 30 second timeout to every request.
    ///
    /// # Errors
    /// * `AzdoError::Config` when the configuration lists no project.
    /// * `AzdoError::Request` when the HTTP client cannot be constructed.
    pub fn new(config: AzureDevOpsConfig) -> std::result::Result<Self, AzdoError> {
        if config.projects().is_empty() {
            return Err(AzdoError::Config(
                "pm.azure_devops.project (or .projects) must not be empty".into(),
            ));
        }
        let mut headers = reqwest::header::HeaderMap::new();
        headers.insert(
            reqwest::header::USER_AGENT,
            reqwest::header::HeaderValue::from_static(concat!("tga/", env!("CARGO_PKG_VERSION"))),
        );
        headers.insert(
            reqwest::header::ACCEPT,
            reqwest::header::HeaderValue::from_static("application/json"),
        );
        let client = reqwest::Client::builder()
            .default_headers(headers)
            .timeout(std::time::Duration::from_secs(30))
            .build()
            .map_err(AzdoError::Request)?;
        Ok(Self { config, client })
    }

    /// Organization base URL without trailing slashes, ready for path concatenation.
    fn org_url(&self) -> &str {
        self.config.organization_url.trim_end_matches('/')
    }

    /// Looks up pull request `pr_id`, trying each configured project in order.
    ///
    /// Returns the PR together with the name of the project that answered
    /// with 200. A 404 moves on to the next project; `Ok(None)` means every
    /// project answered 404.
    ///
    /// # Errors
    /// Any response other than 200/404 aborts the search immediately, without
    /// querying the remaining projects: 401 → `AzdoError::Unauthorized`,
    /// 403 → `AzdoError::Forbidden`, anything else → `AzdoError::Http`.
    /// Transport failures map to `AzdoError::Request`, undecodable bodies to
    /// `AzdoError::Parse`.
    pub async fn fetch_pr(
        &self,
        pr_id: i64,
    ) -> std::result::Result<Option<(AdoPullRequest, String)>, AzdoError> {
        for project in self.config.projects() {
            let url = format!(
                "{}/{}/_apis/git/pullrequests/{pr_id}?api-version=7.1",
                self.org_url(),
                encode_segment(project),
            );
            debug!(url = %url, pr_id, project = %project, "GET ADO PR");
            let resp = self
                .client
                .get(&url)
                // The PAT is sent as the basic-auth password with an empty user name.
                .basic_auth("", Some(&self.config.pat))
                .send()
                .await
                .map_err(AzdoError::Request)?;
            match resp.status().as_u16() {
                200 => {
                    let raw: PrRaw = resp
                        .json()
                        .await
                        .map_err(|e| AzdoError::Parse(e.to_string()))?;
                    let pr: AdoPullRequest = raw.into();
                    return Ok(Some((pr, project.to_string())));
                }
                404 => {
                    debug!(pr_id, project = %project, "404 in project; trying next");
                    continue;
                }
                401 => return Err(AzdoError::Unauthorized),
                403 => return Err(AzdoError::Forbidden),
                s => {
                    let message = resp.text().await.unwrap_or_default();
                    return Err(AzdoError::Http { status: s, message });
                }
            }
        }
        Ok(None)
    }

    /// Fetches every PR in `ids` sequentially, best effort.
    ///
    /// PRs that are not found and PRs whose fetch fails are logged and
    /// skipped. The one exception is a 401: the same credentials are used for
    /// every request, so the remaining IDs are not attempted and whatever was
    /// collected so far is returned.
    pub async fn fetch_prs(&self, ids: &[i64]) -> Vec<(AdoPullRequest, String)> {
        let mut out = Vec::with_capacity(ids.len());
        for (idx, &id) in ids.iter().enumerate() {
            match self.fetch_pr(id).await {
                Ok(Some(pair)) => out.push(pair),
                Ok(None) => {
                    debug!(pr_id = id, "ADO PR not found (404), skipping");
                }
                Err(AzdoError::Unauthorized) => {
                    warn!(
                        pr_id = id,
                        skipped = ids.len() - idx - 1,
                        "ADO rejected the credentials (401); aborting remaining PR fetches"
                    );
                    break;
                }
                Err(e) => {
                    warn!(pr_id = id, error = %e, "ADO PR fetch failed");
                }
            }
        }
        out
    }

    /// Same as [`run_with_options`](Self::run_with_options) with `force_refresh = false`.
    pub async fn run<I, S>(&self, conn: &Connection, commit_messages: I) -> CoreResult<usize>
    where
        I: IntoIterator<Item = S>,
        S: AsRef<str>,
    {
        self.run_with_options(conn, commit_messages, false).await
    }

    /// Extracts `Merged PR <n>:` references from `commit_messages`, fetches the
    /// referenced PRs and persists them (with reviewers) through `conn`.
    ///
    /// Which IDs are fetched:
    /// * `force_refresh` — all of them;
    /// * exactly one configured project — only IDs not yet stored for that project;
    /// * several projects — all of them, because a PR number stored under one
    ///   project says nothing about the same number in another.
    ///
    /// Returns the number of PRs written.
    ///
    /// # Errors
    /// Fails on any database error; HTTP failures are handled inside `fetch_prs`.
    pub async fn run_with_options<I, S>(
        &self,
        conn: &Connection,
        commit_messages: I,
        force_refresh: bool,
    ) -> CoreResult<usize>
    where
        I: IntoIterator<Item = S>,
        S: AsRef<str>,
    {
        let ids = extract_pr_ids(commit_messages);
        if ids.is_empty() {
            info!("No 'Merged PR N:' references found; skipping ADO PR fetch");
            return Ok(0);
        }
        let projects = self.config.projects();
        let to_fetch: Vec<i64> = if force_refresh {
            info!(
                count = ids.len(),
                "force-refresh-prs: bypassing PR-ID dedup cache"
            );
            ids
        } else if projects.len() == 1 {
            let existing = get_existing_pr_numbers(conn, "azdo", projects[0])?;
            ids.into_iter()
                .filter(|id| !existing.contains(id))
                .collect()
        } else {
            debug!(
                projects_len = projects.len(),
                "Multi-project ADO config: skipping cross-project PR cache to avoid masking collisions"
            );
            ids
        };
        if to_fetch.is_empty() {
            info!("All referenced ADO PRs already cached; skipping fetch");
            return Ok(0);
        }
        info!(count = to_fetch.len(), "Fetching ADO PRs");
        let prs = self.fetch_prs(&to_fetch).await;
        let mut stored = 0usize;
        for (pr, project) in &prs {
            // Persist under the project that actually served the PR.
            let pr_db_id = upsert_pr(conn, pr, project)?;
            for reviewer in &pr.reviewers {
                upsert_pr_reviewer(conn, pr_db_id, reviewer)?;
            }
            stored += 1;
        }
        info!(stored, "Persisted ADO PRs");
        Ok(stored)
    }
}
/// Percent-encodes `s` for use as a single URL path segment.
///
/// ASCII letters, digits and `-`, `.`, `_`, `~` are copied verbatim; every
/// other byte of the UTF-8 encoding becomes `%XX` with upper-case hex digits.
fn encode_segment(s: &str) -> String {
    const HEX_UPPER: &[u8; 16] = b"0123456789ABCDEF";
    let mut encoded = String::with_capacity(s.len());
    for byte in s.bytes() {
        match byte {
            b'A'..=b'Z' | b'a'..=b'z' | b'0'..=b'9' | b'-' | b'.' | b'_' | b'~' => {
                encoded.push(char::from(byte));
            }
            _ => {
                encoded.push('%');
                encoded.push(char::from(HEX_UPPER[usize::from(byte >> 4)]));
                encoded.push(char::from(HEX_UPPER[usize::from(byte & 0x0F)]));
            }
        }
    }
    encoded
}
/// Unit and integration-style tests for the ADO pull-request collector.
///
/// Database tests run against an in-memory `Database`; HTTP tests run against
/// a local `wiremock` server whose per-route call-count expectations are
/// verified when the server is dropped.
#[cfg(test)]
mod tests {
use super::*;
use crate::core::db::Database;
use wiremock::matchers::{method, path};
use wiremock::{Mock, MockServer, ResponseTemplate};
/// Builds a config that targets `server_url` with the given project list
/// (`project` is left unset; `fetch_prs` is enabled).
fn multi_project_config(server_url: &str, projects: Vec<&str>) -> AzureDevOpsConfig {
AzureDevOpsConfig {
organization_url: server_url.to_string(),
pat: "secret-pat".into(),
project: None,
projects: projects.iter().map(|s| s.to_string()).collect(),
ticket_regex: r"AB#(\d+)".into(),
team_keys: vec![],
fetch_on_reference: true,
fetch_prs: true,
}
}
/// Minimal JSON body of a completed PR with the given ID, no reviewers and
/// no `lastMergeCommit`.
fn pr_body_json(pr_id: i64) -> serde_json::Value {
serde_json::json!({
"pullRequestId": pr_id,
"title": "feat: multi-project",
"status": "completed",
"createdBy": {
"uniqueName": "alice@contoso.com",
"displayName": "Alice"
},
"creationDate": "2024-01-15T10:30:00Z",
"closedDate": "2024-01-16T14:00:00Z",
"sourceRefName": "refs/heads/feature/x",
"targetRefName": "refs/heads/main",
"reviewers": []
})
}
/// Fixture: completed PR 12345 with one required reviewer (vote 10) and a
/// merge commit SHA.
fn sample_pr() -> AdoPullRequest {
AdoPullRequest {
pr_number: 12345,
title: "feat: add widget".into(),
description: Some("body".into()),
author: "alice@contoso.com".into(),
created_at: "2024-01-15T10:30:00Z".parse().unwrap(),
closed_at: Some("2024-01-16T14:00:00Z".parse().unwrap()),
source_branch: "refs/heads/feature/widget".into(),
target_branch: "refs/heads/main".into(),
status: "completed".into(),
reviewers: vec![AdoPrReviewer {
reviewer_id: "bob@contoso.com".into(),
display_name: "Bob".into(),
vote: 10,
is_required: true,
is_container: false,
}],
merge_commit_sha: Some("deadbeefcafef00d1234567890abcdef12345678".into()),
}
}
/// `AdoPrFetcher::new` must fail with `AzdoError::Config` when no project is configured.
#[test]
fn ado_pr_fetcher_new_rejects_empty_projects() {
let cfg = AzureDevOpsConfig {
organization_url: "http://localhost".to_string(),
pat: "secret-pat".into(),
project: None,
projects: vec![],
ticket_regex: r"AB#(\d+)".into(),
team_keys: vec![],
fetch_on_reference: true,
fetch_prs: true,
};
match AdoPrFetcher::new(cfg) {
Ok(_) => panic!("empty projects must be rejected"),
Err(AzdoError::Config(msg)) => assert!(
msg.contains("project"),
"expected message to mention project, got: {msg}"
),
Err(other) => panic!("expected AzdoError::Config, got: {other:?}"),
}
}
/// IDs are matched case-insensitively, anywhere in the message, de-duplicated
/// and returned in ascending order.
#[test]
fn extracts_unique_pr_ids() {
let messages = vec![
"Merged PR 100: do thing",
"Some other commit",
"merged pr 200: another (case-insensitive)",
"Merged PR 100: duplicate",
"Refactored: Merged PR 300: nested phrase",
];
let ids = extract_pr_ids(messages);
assert_eq!(ids, vec![100, 200, 300]);
}
/// Messages without the `Merged PR <n>:` phrase yield no IDs.
#[test]
fn ignores_non_merge_lines() {
let messages = vec!["fix: typo", "PR #42", "merge branch 'foo'"];
let ids = extract_pr_ids(messages);
assert!(ids.is_empty(), "no merge-PR pattern should match: {ids:?}");
}
/// An empty message list yields an empty result.
#[test]
fn extract_pr_ids_handles_empty_input() {
let ids: Vec<i64> = extract_pr_ids(Vec::<&str>::new());
assert!(ids.is_empty());
}
/// Upserting the same PR twice leaves exactly one row for its
/// (provider, repository, pr_number) key.
#[test]
fn upsert_pr_round_trips_basic_fields() {
let db = Database::open_in_memory().expect("db");
let conn = db.connection();
let pr = sample_pr();
let row_id = upsert_pr(conn, &pr, "MyProject").expect("first upsert");
assert!(row_id > 0);
let row_id2 = upsert_pr(conn, &pr, "MyProject").expect("second upsert");
assert!(row_id2 > 0);
let n: i64 = conn
.query_row(
"SELECT COUNT(*) FROM pull_requests \
WHERE provider = 'azdo' AND repository = 'MyProject' AND pr_number = ?1",
params![pr.pr_number],
|row| row.get(0),
)
.expect("count");
assert_eq!(
n, 1,
"should have exactly one row per (provider, repository, pr_number)"
);
}
/// Upserting reviewers twice keeps one row per reviewer and stores the vote
/// and the required flag (as 1).
#[test]
fn upsert_pr_reviewer_round_trips() {
let db = Database::open_in_memory().expect("db");
let conn = db.connection();
let pr = sample_pr();
let pr_db_id = upsert_pr(conn, &pr, "MyProject").expect("pr upsert");
for r in &pr.reviewers {
upsert_pr_reviewer(conn, pr_db_id, r).expect("reviewer upsert");
}
for r in &pr.reviewers {
upsert_pr_reviewer(conn, pr_db_id, r).expect("reviewer upsert (2)");
}
let n: i64 = conn
.query_row(
"SELECT COUNT(*) FROM pr_reviewers WHERE pr_id = ?1",
params![pr_db_id],
|row| row.get(0),
)
.expect("count");
assert_eq!(n, pr.reviewers.len() as i64);
let (vote, required): (i32, i32) = conn
.query_row(
"SELECT vote, is_required FROM pr_reviewers WHERE pr_id = ?1 AND reviewer_id = ?2",
params![pr_db_id, "bob@contoso.com"],
|row| Ok((row.get(0)?, row.get(1)?)),
)
.expect("query reviewer");
assert_eq!(vote, 10);
assert_eq!(required, 1);
}
/// The existing-ID lookup is scoped by both provider and repository.
#[test]
fn get_existing_pr_numbers_returns_persisted_ids() {
let db = Database::open_in_memory().expect("db");
let conn = db.connection();
let pr = sample_pr();
upsert_pr(conn, &pr, "MyProject").expect("upsert");
let ids = get_existing_pr_numbers(conn, "azdo", "MyProject").expect("query");
assert!(ids.contains(&pr.pr_number));
let ids_gh = get_existing_pr_numbers(conn, "github", "MyProject").expect("query gh");
assert!(
!ids_gh.contains(&pr.pr_number),
"provider scoping must hold"
);
let ids_other = get_existing_pr_numbers(conn, "azdo", "OtherProject").expect("query other");
assert!(
!ids_other.contains(&pr.pr_number),
"repository scoping must hold for #88"
);
}
/// The same PR number stored under two repositories produces two distinct rows.
#[test]
fn upsert_pr_allows_same_pr_number_in_different_repositories() {
let db = Database::open_in_memory().expect("db");
let conn = db.connection();
let pr = sample_pr();
let id_a = upsert_pr(conn, &pr, "ProjectA").expect("upsert A");
let id_b = upsert_pr(conn, &pr, "ProjectB").expect("upsert B");
assert_ne!(id_a, id_b, "different repos must produce different rows");
let total: i64 = conn
.query_row(
"SELECT COUNT(*) FROM pull_requests WHERE provider = 'azdo' AND pr_number = ?1",
params![pr.pr_number],
|row| row.get(0),
)
.expect("count");
assert_eq!(
total, 2,
"same pr_number across two repos must yield two rows"
);
}
/// A present merge SHA is stored in `commit_shas` as a one-element JSON array.
#[test]
fn upsert_pr_writes_commit_shas_when_merge_sha_present() {
let db = Database::open_in_memory().expect("db");
let conn = db.connection();
let pr = sample_pr();
upsert_pr(conn, &pr, "MyProject").expect("upsert");
let stored: String = conn
.query_row(
"SELECT commit_shas FROM pull_requests \
WHERE provider = 'azdo' AND repository = 'MyProject' AND pr_number = ?1",
params![pr.pr_number],
|row| row.get(0),
)
.expect("query");
assert_eq!(
stored,
r#"["deadbeefcafef00d1234567890abcdef12345678"]"#, "merge commit SHA must be persisted as a JSON array"
);
}
/// Without a merge SHA, `commit_shas` is stored as the empty JSON array `[]`.
#[test]
fn upsert_pr_writes_empty_commit_shas_when_no_merge_sha() {
let db = Database::open_in_memory().expect("db");
let conn = db.connection();
let mut pr = sample_pr();
pr.merge_commit_sha = None;
upsert_pr(conn, &pr, "MyProject").expect("upsert");
let stored: String = conn
.query_row(
"SELECT commit_shas FROM pull_requests \
WHERE provider = 'azdo' AND repository = 'MyProject' AND pr_number = ?1",
params![pr.pr_number],
|row| row.get(0),
)
.expect("query");
assert_eq!(stored, "[]");
}
/// A full payload (including an unknown `url` key inside `lastMergeCommit`)
/// deserializes and converts with all fields threaded through.
#[test]
fn pr_raw_deserializes_full_payload() {
let json = r#"{
"pullRequestId": 12345,
"title": "feat: add widget",
"description": "body",
"status": "completed",
"createdBy": {
"uniqueName": "alice@contoso.com",
"displayName": "Alice"
},
"creationDate": "2024-01-15T10:30:00Z",
"closedDate": "2024-01-16T14:00:00Z",
"sourceRefName": "refs/heads/feature/widget",
"targetRefName": "refs/heads/main",
"reviewers": [
{
"uniqueName": "bob@contoso.com",
"displayName": "Bob",
"vote": 10,
"isRequired": true,
"isContainer": false
}
],
"lastMergeCommit": {
"commitId": "deadbeefcafef00d1234567890abcdef12345678",
"url": "https://dev.azure.com/.../commits/deadbeef..."
}
}"#;
let raw: PrRaw = serde_json::from_str(json).expect("parse");
let pr: AdoPullRequest = raw.into();
assert_eq!(pr.pr_number, 12345);
assert_eq!(pr.title, "feat: add widget");
assert_eq!(pr.author, "alice@contoso.com");
assert_eq!(pr.status, "completed");
assert_eq!(pr.target_branch, "refs/heads/main");
assert_eq!(pr.reviewers.len(), 1);
assert_eq!(pr.reviewers[0].vote, 10);
assert!(pr.reviewers[0].is_required);
assert_eq!(
pr.merge_commit_sha.as_deref(),
Some("deadbeefcafef00d1234567890abcdef12345678"),
"lastMergeCommit.commitId should be threaded through"
);
}
/// An empty `lastMergeCommit` object and an empty `commitId` both yield no merge SHA.
#[test]
fn pr_raw_treats_empty_last_merge_commit_as_none() {
let json = r#"{
"pullRequestId": 7,
"creationDate": "2024-01-15T10:30:00Z",
"status": "completed",
"lastMergeCommit": {}
}"#;
let raw: PrRaw = serde_json::from_str(json).expect("parse");
let pr: AdoPullRequest = raw.into();
assert!(pr.merge_commit_sha.is_none());
let json = r#"{
"pullRequestId": 8,
"creationDate": "2024-01-15T10:30:00Z",
"status": "completed",
"lastMergeCommit": {"commitId": ""}
}"#;
let raw: PrRaw = serde_json::from_str(json).expect("parse");
let pr: AdoPullRequest = raw.into();
assert!(pr.merge_commit_sha.is_none());
}
/// Only a `completed` status (any letter case) lets the merge SHA through;
/// every other status drops it even with an allowed strategy.
#[test]
fn pr_raw_drops_merge_sha_for_non_completed_status() {
for status in ["active", "abandoned", "notSet", "", "ACTIVE"] {
let json = format!(
r#"{{
"pullRequestId": 42,
"creationDate": "2024-01-15T10:30:00Z",
"status": "{status}",
"mergeStrategy": "noFastForward",
"lastMergeCommit": {{"commitId": "feedfacecafef00d1234567890abcdef12345678"}}
}}"#
);
let raw: PrRaw = serde_json::from_str(&json).expect("parse");
let pr: AdoPullRequest = raw.into();
assert!(
pr.merge_commit_sha.is_none(),
"non-completed status {status:?} must not expose a merge SHA"
);
}
for status in ["completed", "Completed", "COMPLETED"] {
let json = format!(
r#"{{
"pullRequestId": 43,
"creationDate": "2024-01-15T10:30:00Z",
"status": "{status}",
"mergeStrategy": "noFastForward",
"lastMergeCommit": {{"commitId": "feedfacecafef00d1234567890abcdef12345678"}}
}}"#
);
let raw: PrRaw = serde_json::from_str(&json).expect("parse");
let pr: AdoPullRequest = raw.into();
assert_eq!(
pr.merge_commit_sha.as_deref(),
Some("feedfacecafef00d1234567890abcdef12345678"),
"completed status {status:?} should pass the gate (case-insensitive)",
);
}
}
/// `noFastForward` in any letter case keeps the merge SHA.
#[test]
fn pr_raw_emits_merge_sha_for_no_fast_forward_strategy() {
for strategy in ["noFastForward", "NOFASTFORWARD", "nofastforward"] {
let json = format!(
r#"{{
"pullRequestId": 100,
"creationDate": "2024-01-15T10:30:00Z",
"status": "completed",
"mergeStrategy": "{strategy}",
"lastMergeCommit": {{"commitId": "feedfacecafef00d1234567890abcdef12345678"}}
}}"#
);
let raw: PrRaw = serde_json::from_str(&json).expect("parse");
let pr: AdoPullRequest = raw.into();
assert_eq!(
pr.merge_commit_sha.as_deref(),
Some("feedfacecafef00d1234567890abcdef12345678"),
"noFastForward variant {strategy:?} must pass the gate",
);
}
}
/// A payload without any merge strategy keeps the merge SHA.
#[test]
fn pr_raw_emits_merge_sha_when_strategy_absent() {
let json = r#"{
"pullRequestId": 101,
"creationDate": "2024-01-15T10:30:00Z",
"status": "completed",
"lastMergeCommit": {"commitId": "feedfacecafef00d1234567890abcdef12345678"}
}"#;
let raw: PrRaw = serde_json::from_str(json).expect("parse");
let pr: AdoPullRequest = raw.into();
assert_eq!(
pr.merge_commit_sha.as_deref(),
Some("feedfacecafef00d1234567890abcdef12345678"),
"absent mergeStrategy must default to allowed (pre-#96 behavior)",
);
}
/// `squash` in any letter case drops the merge SHA.
#[test]
fn pr_raw_drops_merge_sha_for_squash_strategy() {
for strategy in ["squash", "SQUASH", "Squash"] {
let json = format!(
r#"{{
"pullRequestId": 102,
"creationDate": "2024-01-15T10:30:00Z",
"status": "completed",
"mergeStrategy": "{strategy}",
"lastMergeCommit": {{"commitId": "feedfacecafef00d1234567890abcdef12345678"}}
}}"#
);
let raw: PrRaw = serde_json::from_str(&json).expect("parse");
let pr: AdoPullRequest = raw.into();
assert!(
pr.merge_commit_sha.is_none(),
"squash variant {strategy:?} must drop the merge SHA",
);
}
}
/// The `rebase` strategy drops the merge SHA.
#[test]
fn pr_raw_drops_merge_sha_for_rebase_strategy() {
let json = r#"{
"pullRequestId": 103,
"creationDate": "2024-01-15T10:30:00Z",
"status": "completed",
"mergeStrategy": "rebase",
"lastMergeCommit": {"commitId": "feedfacecafef00d1234567890abcdef12345678"}
}"#;
let raw: PrRaw = serde_json::from_str(json).expect("parse");
let pr: AdoPullRequest = raw.into();
assert!(pr.merge_commit_sha.is_none());
}
/// The `rebaseMerge` strategy drops the merge SHA.
#[test]
fn pr_raw_drops_merge_sha_for_rebase_merge_strategy() {
let json = r#"{
"pullRequestId": 104,
"creationDate": "2024-01-15T10:30:00Z",
"status": "completed",
"mergeStrategy": "rebaseMerge",
"lastMergeCommit": {"commitId": "feedfacecafef00d1234567890abcdef12345678"}
}"#;
let raw: PrRaw = serde_json::from_str(json).expect("parse");
let pr: AdoPullRequest = raw.into();
assert!(pr.merge_commit_sha.is_none());
}
/// When the top-level strategy is absent, `completionOptions.mergeStrategy` is used.
#[test]
fn pr_raw_reads_merge_strategy_from_completion_options_fallback() {
let json = r#"{
"pullRequestId": 105,
"creationDate": "2024-01-15T10:30:00Z",
"status": "completed",
"completionOptions": {"mergeStrategy": "squash"},
"lastMergeCommit": {"commitId": "feedfacecafef00d1234567890abcdef12345678"}
}"#;
let raw: PrRaw = serde_json::from_str(json).expect("parse");
let pr: AdoPullRequest = raw.into();
assert!(
pr.merge_commit_sha.is_none(),
"squash via completionOptions fallback must drop the merge SHA",
);
}
/// A top-level strategy overrides a conflicting `completionOptions` strategy.
#[test]
fn pr_raw_prefers_top_level_merge_strategy_over_completion_options() {
let json = r#"{
"pullRequestId": 106,
"creationDate": "2024-01-15T10:30:00Z",
"status": "completed",
"mergeStrategy": "noFastForward",
"completionOptions": {"mergeStrategy": "squash"},
"lastMergeCommit": {"commitId": "feedfacecafef00d1234567890abcdef12345678"}
}"#;
let raw: PrRaw = serde_json::from_str(json).expect("parse");
let pr: AdoPullRequest = raw.into();
assert_eq!(
pr.merge_commit_sha.as_deref(),
Some("feedfacecafef00d1234567890abcdef12345678"),
"top-level mergeStrategy must take precedence over completionOptions",
);
}
/// A payload with only `pullRequestId` and `creationDate` still converts,
/// with empty/`None` defaults everywhere else.
#[test]
fn pr_raw_tolerates_missing_optional_fields() {
let json = r#"{
"pullRequestId": 7,
"creationDate": "2024-01-15T10:30:00Z"
}"#;
let raw: PrRaw = serde_json::from_str(json).expect("parse minimal");
let pr: AdoPullRequest = raw.into();
assert_eq!(pr.pr_number, 7);
assert!(pr.author.is_empty());
assert!(pr.reviewers.is_empty());
assert!(pr.closed_at.is_none());
assert!(pr.description.is_none());
}
/// `fetch_prs: true` in YAML is honoured by the config deserializer.
#[test]
fn fetch_prs_config_deserializes_with_fetch_prs_true() {
let yaml = r#"
organization_url: "https://dev.azure.com/myorg"
pat: "secret-pat"
project: "MyProject"
fetch_prs: true
"#;
let parsed: AzureDevOpsConfig =
serde_yaml::from_str(yaml).expect("should deserialize cleanly");
assert!(parsed.fetch_prs);
}
/// Omitting `fetch_prs` in YAML leaves it `false`.
#[test]
fn fetch_prs_defaults_to_false() {
let yaml = r#"
organization_url: "https://dev.azure.com/myorg"
pat: "secret-pat"
project: "MyProject"
"#;
let parsed: AzureDevOpsConfig =
serde_yaml::from_str(yaml).expect("should deserialize cleanly");
assert!(!parsed.fetch_prs, "fetch_prs default must be false");
}
/// Test-local copy of the single-project ID selection done in
/// `run_with_options`: all IDs when forcing, otherwise only uncached ones.
/// NOTE(review): the two `force_refresh_*` tests below exercise this helper,
/// not the production method — keep the two in sync.
fn select_to_fetch(
conn: &Connection,
project: &str,
ids: Vec<i64>,
force_refresh: bool,
) -> Vec<i64> {
if force_refresh {
ids
} else {
let existing = get_existing_pr_numbers(conn, "azdo", project).expect("query existing");
ids.into_iter()
.filter(|id| !existing.contains(id))
.collect()
}
}
/// Without force-refresh, an already stored PR ID is filtered out.
#[test]
fn force_refresh_false_skips_existing_pr_ids() {
let db = Database::open_in_memory().expect("db");
let conn = db.connection();
let pr = sample_pr(); upsert_pr(conn, &pr, "MyProject").expect("upsert");
let to_fetch = select_to_fetch(conn, "MyProject", vec![12345, 999], false);
assert_eq!(
to_fetch,
vec![999],
"cached PR 12345 must be skipped when force_refresh is false"
);
}
/// With force-refresh, already stored PR IDs are kept in the fetch list.
#[test]
fn force_refresh_true_re_fetches_existing_pr_ids() {
let db = Database::open_in_memory().expect("db");
let conn = db.connection();
let pr = sample_pr(); upsert_pr(conn, &pr, "MyProject").expect("upsert");
let to_fetch = select_to_fetch(conn, "MyProject", vec![12345, 999], true);
assert_eq!(
to_fetch,
vec![12345, 999],
"force_refresh must NOT skip already-cached PR IDs"
);
}
/// `abandoned` is stored as state `closed`; `active` is stored as `open`.
#[test]
fn status_maps_to_pr_state_string() {
let db = Database::open_in_memory().expect("db");
let conn = db.connection();
let mut pr = sample_pr();
pr.status = "abandoned".into();
let id = upsert_pr(conn, &pr, "MyProject").expect("upsert");
let state: String = conn
.query_row(
"SELECT state FROM pull_requests WHERE id = ?1",
params![id],
|row| row.get(0),
)
.expect("query");
assert_eq!(state, "closed");
pr.status = "active".into();
upsert_pr(conn, &pr, "MyProject").expect("upsert");
// Looked up by natural key rather than by the earlier `id`.
let state: String = conn
.query_row(
"SELECT state FROM pull_requests \
WHERE provider = 'azdo' AND repository = 'MyProject' AND pr_number = ?1",
params![pr.pr_number],
|row| row.get(0),
)
.expect("query");
assert_eq!(state, "open");
}
/// With one project, a 200 response yields the PR and that project's name.
#[tokio::test]
async fn fetch_pr_single_project_hit() {
let server = MockServer::start().await;
Mock::given(method("GET"))
.and(path("/ProjectA/_apis/git/pullrequests/100"))
.respond_with(ResponseTemplate::new(200).set_body_json(pr_body_json(100)))
.expect(1)
.mount(&server)
.await;
let cfg = multi_project_config(&server.uri(), vec!["ProjectA"]);
let fetcher = AdoPrFetcher::new(cfg).expect("fetcher");
let result = fetcher.fetch_pr(100).await.expect("fetch ok");
let (pr, project) = result.expect("PR present");
assert_eq!(pr.pr_number, 100);
assert_eq!(project, "ProjectA");
drop(server);
}
/// A 404 from the first project makes the fetcher try the next one, and the
/// reported project is the one that answered 200.
#[tokio::test]
async fn fetch_pr_falls_through_404_to_next_project() {
let server = MockServer::start().await;
Mock::given(method("GET"))
.and(path("/ProjectA/_apis/git/pullrequests/200"))
.respond_with(ResponseTemplate::new(404))
.expect(1)
.mount(&server)
.await;
Mock::given(method("GET"))
.and(path("/ProjectB/_apis/git/pullrequests/200"))
.respond_with(ResponseTemplate::new(200).set_body_json(pr_body_json(200)))
.expect(1)
.mount(&server)
.await;
let cfg = multi_project_config(&server.uri(), vec!["ProjectA", "ProjectB"]);
let fetcher = AdoPrFetcher::new(cfg).expect("fetcher");
let result = fetcher.fetch_pr(200).await.expect("fetch ok");
let (pr, project) = result.expect("PR present");
assert_eq!(pr.pr_number, 200);
assert_eq!(
project, "ProjectB",
"must report the project where PR was found"
);
drop(server);
}
/// When every project answers 404, the result is `Ok(None)`.
#[tokio::test]
async fn fetch_pr_all_projects_404_returns_none() {
let server = MockServer::start().await;
Mock::given(method("GET"))
.and(path("/A/_apis/git/pullrequests/300"))
.respond_with(ResponseTemplate::new(404))
.expect(1)
.mount(&server)
.await;
Mock::given(method("GET"))
.and(path("/B/_apis/git/pullrequests/300"))
.respond_with(ResponseTemplate::new(404))
.expect(1)
.mount(&server)
.await;
let cfg = multi_project_config(&server.uri(), vec!["A", "B"]);
let fetcher = AdoPrFetcher::new(cfg).expect("fetcher");
let result = fetcher.fetch_pr(300).await.expect("fetch ok");
assert!(result.is_none(), "all 404s must produce Ok(None)");
drop(server);
}
/// After the first 200, later projects are not queried (ProjectB expects zero calls).
#[tokio::test]
async fn fetch_pr_first_hit_wins_no_query_to_subsequent_projects() {
let server = MockServer::start().await;
Mock::given(method("GET"))
.and(path("/ProjectA/_apis/git/pullrequests/400"))
.respond_with(ResponseTemplate::new(200).set_body_json(pr_body_json(400)))
.expect(1)
.mount(&server)
.await;
Mock::given(method("GET"))
.and(path("/ProjectB/_apis/git/pullrequests/400"))
.respond_with(ResponseTemplate::new(200).set_body_json(pr_body_json(400)))
.expect(0) .mount(&server)
.await;
let cfg = multi_project_config(&server.uri(), vec!["ProjectA", "ProjectB"]);
let fetcher = AdoPrFetcher::new(cfg).expect("fetcher");
let (pr, project) = fetcher
.fetch_pr(400)
.await
.expect("fetch ok")
.expect("PR present");
assert_eq!(pr.pr_number, 400);
assert_eq!(project, "ProjectA");
drop(server);
}
/// `run` stores the PR under the project that actually served it (ProjectB),
/// not the first configured project.
#[tokio::test]
async fn run_persists_pr_under_project_where_found() {
let server = MockServer::start().await;
let pr_id: i64 = 500;
Mock::given(method("GET"))
.and(path(format!("/ProjectA/_apis/git/pullrequests/{pr_id}")))
.respond_with(ResponseTemplate::new(404))
.expect(1)
.mount(&server)
.await;
Mock::given(method("GET"))
.and(path(format!("/ProjectB/_apis/git/pullrequests/{pr_id}")))
.respond_with(ResponseTemplate::new(200).set_body_json(pr_body_json(pr_id)))
.expect(1)
.mount(&server)
.await;
let cfg = multi_project_config(&server.uri(), vec!["ProjectA", "ProjectB"]);
let fetcher = AdoPrFetcher::new(cfg).expect("fetcher");
let db = Database::open_in_memory().expect("db");
let conn = db.connection();
let commit_messages = vec![format!("Merged PR {pr_id}: feat: multi-project test")];
let stored = fetcher.run(conn, commit_messages).await.expect("run ok");
assert_eq!(stored, 1, "exactly one PR should be persisted");
let repo: String = conn
.query_row(
"SELECT repository FROM pull_requests \
WHERE provider = 'azdo' AND pr_number = ?1",
params![pr_id],
|row| row.get(0),
)
.expect("query persisted repository");
assert_eq!(
repo, "ProjectB",
"persisted repository must be the project where the PR was found"
);
drop(server);
}
/// With several projects configured, a row cached under ProjectA must not
/// suppress fetching the same PR number, which here is served by ProjectB;
/// both rows coexist afterwards.
#[tokio::test]
async fn run_multi_project_does_not_skip_cached_pr_under_other_project() {
let server = MockServer::start().await;
let pr_id: i64 = 123;
let db = Database::open_in_memory().expect("db");
let conn = db.connection();
let mut existing = sample_pr();
existing.pr_number = pr_id;
existing.title = "stale: ProjectA copy".into();
upsert_pr(conn, &existing, "ProjectA").expect("seed ProjectA row");
Mock::given(method("GET"))
.and(path(format!("/ProjectA/_apis/git/pullrequests/{pr_id}")))
.respond_with(ResponseTemplate::new(404))
.expect(1)
.mount(&server)
.await;
Mock::given(method("GET"))
.and(path(format!("/ProjectB/_apis/git/pullrequests/{pr_id}")))
.respond_with(ResponseTemplate::new(200).set_body_json(pr_body_json(pr_id)))
.expect(1)
.mount(&server)
.await;
let cfg = multi_project_config(&server.uri(), vec!["ProjectA", "ProjectB"]);
let fetcher = AdoPrFetcher::new(cfg).expect("fetcher");
let commit_messages = vec![format!("Merged PR {pr_id}: feat: collision case")];
let stored = fetcher.run(conn, commit_messages).await.expect("run ok");
assert_eq!(stored, 1, "ProjectB hit must produce one persisted PR");
let total: i64 = conn
.query_row(
"SELECT COUNT(*) FROM pull_requests \
WHERE provider = 'azdo' AND pr_number = ?1",
params![pr_id],
|row| row.get(0),
)
.expect("count rows");
assert_eq!(
total, 2,
"expected two rows for pr_number=123 (one per project)"
);
let repos: Vec<String> = {
let mut stmt = conn
.prepare(
"SELECT repository FROM pull_requests \
WHERE provider = 'azdo' AND pr_number = ?1 ORDER BY repository",
)
.expect("prepare");
let rows = stmt
.query_map(params![pr_id], |row| row.get::<_, String>(0))
.expect("query")
.map(|r| r.expect("row"))
.collect();
rows
};
assert_eq!(repos, vec!["ProjectA".to_string(), "ProjectB".to_string()]);
drop(server);
}
/// With a single project configured, an already stored PR is not re-fetched
/// (the mock expects zero calls) and `run` reports 0 stored.
#[tokio::test]
async fn run_single_project_still_uses_cache_short_circuit() {
let server = MockServer::start().await;
let pr_id: i64 = 123;
let db = Database::open_in_memory().expect("db");
let conn = db.connection();
let mut existing = sample_pr();
existing.pr_number = pr_id;
upsert_pr(conn, &existing, "ProjectA").expect("seed ProjectA row");
Mock::given(method("GET"))
.and(path(format!("/ProjectA/_apis/git/pullrequests/{pr_id}")))
.respond_with(ResponseTemplate::new(200).set_body_json(pr_body_json(pr_id)))
.expect(0)
.mount(&server)
.await;
let cfg = multi_project_config(&server.uri(), vec!["ProjectA"]);
let fetcher = AdoPrFetcher::new(cfg).expect("fetcher");
let commit_messages = vec![format!("Merged PR {pr_id}: cached case")];
let stored = fetcher.run(conn, commit_messages).await.expect("run ok");
assert_eq!(stored, 0, "cached PR must not be re-fetched");
drop(server);
}
/// A 401 from the first project surfaces as `AzdoError::Unauthorized` and
/// the next project is never queried.
#[tokio::test]
async fn fetch_pr_aborts_iteration_on_401_does_not_query_next_project() {
let server = MockServer::start().await;
let pr_id: i64 = 901;
Mock::given(method("GET"))
.and(path(format!("/ProjectA/_apis/git/pullrequests/{pr_id}")))
.respond_with(ResponseTemplate::new(401))
.expect(1)
.mount(&server)
.await;
Mock::given(method("GET"))
.and(path(format!("/ProjectB/_apis/git/pullrequests/{pr_id}")))
.respond_with(ResponseTemplate::new(200).set_body_json(pr_body_json(pr_id)))
.expect(0)
.mount(&server)
.await;
let cfg = multi_project_config(&server.uri(), vec!["ProjectA", "ProjectB"]);
let fetcher = AdoPrFetcher::new(cfg).expect("fetcher");
let err = fetcher
.fetch_pr(pr_id)
.await
.expect_err("401 must surface as Err");
assert!(
matches!(err, AzdoError::Unauthorized),
"expected AzdoError::Unauthorized, got: {err:?}"
);
drop(server);
}
/// A 500 from the first project surfaces as `AzdoError::Http { status: 500, .. }`
/// and the next project is never queried.
#[tokio::test]
async fn fetch_pr_aborts_iteration_on_500_does_not_query_next_project() {
let server = MockServer::start().await;
let pr_id: i64 = 902;
Mock::given(method("GET"))
.and(path(format!("/ProjectA/_apis/git/pullrequests/{pr_id}")))
.respond_with(ResponseTemplate::new(500))
.expect(1)
.mount(&server)
.await;
Mock::given(method("GET"))
.and(path(format!("/ProjectB/_apis/git/pullrequests/{pr_id}")))
.respond_with(ResponseTemplate::new(200).set_body_json(pr_body_json(pr_id)))
.expect(0)
.mount(&server)
.await;
let cfg = multi_project_config(&server.uri(), vec!["ProjectA", "ProjectB"]);
let fetcher = AdoPrFetcher::new(cfg).expect("fetcher");
let err = fetcher
.fetch_pr(pr_id)
.await
.expect_err("500 must surface as Err");
assert!(
matches!(err, AzdoError::Http { status: 500, .. }),
"expected AzdoError::Http {{ status: 500, .. }}, got: {err:?}"
);
drop(server);
}
}