use rusqlite::Connection;
use tracing::{debug, info, warn};
use crate::collect::azdo::errors::AzdoError;
use crate::collect::azdo::pr_fetcher::db::{
extract_pr_ids, get_existing_pr_numbers, upsert_pr, upsert_pr_reviewer,
};
use crate::collect::azdo::pr_fetcher::types::{AdoPullRequest, PrRaw};
use crate::core::config::AzureDevOpsConfig;
use crate::core::errors::Result as CoreResult;
/// Percent-encodes `s` so it can be embedded as a single URL path segment.
///
/// ASCII letters, ASCII digits and the four characters `-`, `.`, `_`, `~`
/// (the RFC 3986 "unreserved" set) are copied through unchanged. Every other
/// byte is written as `%XX` using two uppercase hexadecimal digits. The input
/// is processed byte by byte, so a multi-byte UTF-8 character yields one
/// `%XX` triplet per byte.
pub(super) fn encode_segment(s: &str) -> String {
    // Lookup table for the two hex nibbles of an escaped byte.
    const HEX_DIGITS: &[u8; 16] = b"0123456789ABCDEF";

    let mut encoded = String::with_capacity(s.len());
    for byte in s.bytes() {
        match byte {
            b'A'..=b'Z' | b'a'..=b'z' | b'0'..=b'9' | b'-' | b'.' | b'_' | b'~' => {
                encoded.push(char::from(byte));
            }
            _ => {
                encoded.push('%');
                encoded.push(char::from(HEX_DIGITS[usize::from(byte >> 4)]));
                encoded.push(char::from(HEX_DIGITS[usize::from(byte & 0x0F)]));
            }
        }
    }
    encoded
}
/// Fetches Azure DevOps pull requests over HTTP and persists them through the
/// `pr_fetcher::db` helpers.
pub struct AdoPrFetcher {
    /// Azure DevOps settings. The methods in this file read `projects()`,
    /// `organization_url` and `pat` from it.
    pub(super) config: AzureDevOpsConfig,
    /// HTTP client used for every request. When created by `AdoPrFetcher::new`
    /// it carries default `User-Agent` / `Accept` headers and a 30-second
    /// timeout.
    pub(super) client: reqwest::Client,
}
impl AdoPrFetcher {
pub fn new(config: AzureDevOpsConfig) -> std::result::Result<Self, AzdoError> {
if config.projects().is_empty() {
return Err(AzdoError::Config(
"pm.azure_devops.project (or .projects) must not be empty".into(),
));
}
let mut headers = reqwest::header::HeaderMap::new();
headers.insert(
reqwest::header::USER_AGENT,
reqwest::header::HeaderValue::from_static(concat!("tga/", env!("CARGO_PKG_VERSION"))),
);
headers.insert(
reqwest::header::ACCEPT,
reqwest::header::HeaderValue::from_static("application/json"),
);
let client = reqwest::Client::builder()
.default_headers(headers)
.timeout(std::time::Duration::from_secs(30))
.build()
.map_err(AzdoError::Request)?;
Ok(Self { config, client })
}
fn org_url(&self) -> &str {
self.config.organization_url.trim_end_matches('/')
}
pub async fn fetch_pr(
&self,
pr_id: i64,
) -> std::result::Result<Option<(AdoPullRequest, String)>, AzdoError> {
for project in self.config.projects() {
let url = format!(
"{}/{}/_apis/git/pullrequests/{pr_id}?api-version=7.1",
self.org_url(),
encode_segment(project),
);
debug!(url = %url, pr_id, project = %project, "GET ADO PR");
let resp = self
.client
.get(&url)
.basic_auth("", Some(&self.config.pat))
.send()
.await
.map_err(AzdoError::Request)?;
match resp.status().as_u16() {
200 => {
let raw: PrRaw = resp
.json()
.await
.map_err(|e| AzdoError::Parse(e.to_string()))?;
let pr: AdoPullRequest = raw.into();
return Ok(Some((pr, project.to_string())));
}
404 => {
debug!(pr_id, project = %project, "404 in project; trying next");
continue;
}
401 => return Err(AzdoError::Unauthorized),
403 => return Err(AzdoError::Forbidden),
s => {
let message = resp.text().await.unwrap_or_default();
return Err(AzdoError::Http { status: s, message });
}
}
}
Ok(None)
}
pub async fn fetch_prs(&self, ids: &[i64]) -> Vec<(AdoPullRequest, String)> {
let mut out = Vec::with_capacity(ids.len());
for &id in ids {
match self.fetch_pr(id).await {
Ok(Some(pair)) => out.push(pair),
Ok(None) => {
debug!(pr_id = id, "ADO PR not found (404), skipping");
}
Err(e) => {
warn!(pr_id = id, error = %e, "ADO PR fetch failed");
}
}
}
out
}
pub async fn run<I, S>(&self, conn: &Connection, commit_messages: I) -> CoreResult<usize>
where
I: IntoIterator<Item = S>,
S: AsRef<str>,
{
self.run_with_options(conn, commit_messages, false).await
}
pub async fn run_with_options<I, S>(
&self,
conn: &Connection,
commit_messages: I,
force_refresh: bool,
) -> CoreResult<usize>
where
I: IntoIterator<Item = S>,
S: AsRef<str>,
{
let ids = extract_pr_ids(commit_messages);
if ids.is_empty() {
info!("No 'Merged PR N:' references found; skipping ADO PR fetch");
return Ok(0);
}
let projects = self.config.projects();
let to_fetch: Vec<i64> = if force_refresh {
info!(
count = ids.len(),
"force-refresh-prs: bypassing PR-ID dedup cache"
);
ids
} else if projects.len() == 1 {
let existing = get_existing_pr_numbers(conn, "azdo", projects[0])?;
ids.into_iter()
.filter(|id| !existing.contains(id))
.collect()
} else {
debug!(
projects_len = projects.len(),
"Multi-project ADO config: skipping cross-project PR cache to avoid masking collisions"
);
ids
};
if to_fetch.is_empty() {
info!("All referenced ADO PRs already cached; skipping fetch");
return Ok(0);
}
info!(count = to_fetch.len(), "Fetching ADO PRs");
let prs = self.fetch_prs(&to_fetch).await;
let mut stored = 0usize;
for (pr, project) in &prs {
let pr_db_id = upsert_pr(conn, pr, project)?;
for reviewer in &pr.reviewers {
upsert_pr_reviewer(conn, pr_db_id, reviewer)?;
}
stored += 1;
}
info!(stored, "Persisted ADO PRs");
Ok(stored)
}
}