pub mod git;
pub mod github;
pub mod logging;
pub mod models;
pub mod pagination;
pub mod tools;
use agentic_config::types::GitHubServiceConfig;
use anyhow::Context;
use anyhow::Result;
use models::CommentSourceType;
use models::PrSummary;
use models::PrSummaryList;
use models::ReviewComment;
use models::ReviewCommentList;
use models::Thread;
use pagination::PaginationCache;
use pagination::QueryLock;
use pagination::make_key;
use pagination::make_pr_list_key;
use pagination::paginate_slice;
use std::sync::Arc;
use std::time::Duration;
pub use tools::build_registry;
/// Prefix that [`with_ai_prefix`] prepends to a reply body: the robot emoji
/// (U+1F916) followed by `" AI response: "`.
pub const AI_PREFIX: &str = "\u{1F916} AI response: ";
/// Stores freshly fetched `entries` in the query state, but only if the state
/// is still empty or expired once the lock has been acquired.
///
/// When the state already holds live results the fetched entries are dropped
/// and the existing results and offset are left untouched. A poisoned lock is
/// recovered rather than propagated.
fn guarded_post_fetch_reset<T>(query_lock: &Arc<QueryLock<T>>, entries: Vec<T>, page_size: usize) {
    let mut guard = match query_lock.state.lock() {
        Ok(guard) => guard,
        Err(poisoned) => poisoned.into_inner(),
    };
    // Live, non-expired results win over whatever this caller just fetched.
    let holds_live_results = !guard.is_empty() && !guard.is_expired();
    if holds_live_results {
        return;
    }
    guard.reset(entries, (), page_size);
}
/// Returns `body` with [`AI_PREFIX`] in front of it.
///
/// If `body` already starts with the prefix (ignoring leading whitespace) it
/// is returned unchanged, leading whitespace included, so the prefix is never
/// added twice.
pub fn with_ai_prefix(body: &str) -> String {
    let already_prefixed = body.trim_start().starts_with(AI_PREFIX);
    if already_prefixed {
        return body.to_owned();
    }
    let mut prefixed = String::with_capacity(AI_PREFIX.len() + body.len());
    prefixed.push_str(AI_PREFIX);
    prefixed.push_str(body);
    prefixed
}
/// Service handle for pull-request operations (listing PRs, reading review
/// comments, posting replies) against a single GitHub repository.
///
/// Both pagination caches are held behind `Arc`, so clones of this value share
/// the same cache instances.
#[derive(Clone)]
pub struct PrComments {
/// Repository owner; empty when constructed through the `disabled*` constructors.
owner: String,
/// Repository name; empty when constructed through the `disabled*` constructors.
repo: String,
/// GitHub token resolved once at construction time by `get_token`, if any.
token: Option<String>,
/// Service configuration; `total_timeout_secs` bounds each GitHub operation.
github_config: GitHubServiceConfig,
/// Pagination cache for review-comment threads, used by `get_comments`.
pager: Arc<PaginationCache<Thread>>,
/// Pagination cache for pull-request summaries, used by `list_prs`.
pr_list_pager: Arc<PaginationCache<PrSummary>>,
/// Initialization error supplied through the `disabled*` constructors; appended
/// to the message produced by `ensure_repo_configured`.
init_error: Option<String>,
}
impl PrComments {
/// Resolves a GitHub token from the environment or the GitHub CLI config.
///
/// Lookup order: `GH_TOKEN`, then `GITHUB_TOKEN`, then the `github.com` entry
/// of gh-config. Blank values are treated as "not set" at every stage, so an
/// empty `GH_TOKEN` no longer shadows a usable `GITHUB_TOKEN` or gh-config
/// token.
///
/// Returns `None` when no source yields a non-blank token. Failures to load or
/// query gh-config are logged at debug level and treated as "no token".
fn get_token() -> Option<String> {
    // Lazy chain: GITHUB_TOKEN is only read when GH_TOKEN is missing or blank.
    let env_token = ["GH_TOKEN", "GITHUB_TOKEN"]
        .iter()
        .filter_map(|name| std::env::var(name).ok())
        .find(|value| !value.trim().is_empty());
    if let Some(t) = env_token {
        tracing::debug!("Using GitHub token from environment");
        return Some(t);
    }
    let hosts = match gh_config::Hosts::load() {
        Ok(hosts) => hosts,
        Err(e) => {
            tracing::debug!("gh-config unavailable: {e}");
            return None;
        }
    };
    match hosts.retrieve_token(gh_config::GITHUB_COM) {
        Ok(Some(t)) if !t.trim().is_empty() => {
            tracing::debug!("Using GitHub token from gh-config");
            Some(t)
        }
        // Covers both a missing entry and a blank stored token.
        Ok(_) => {
            tracing::debug!("No token found in gh-config");
            None
        }
        Err(e) => {
            tracing::debug!("gh-config token retrieval failed: {e}");
            None
        }
    }
}
/// Reads the page size from the `PR_COMMENTS_PAGE_SIZE` environment variable.
///
/// Accepts integers in `1..=1000`. Any other situation (variable unset, not
/// valid Unicode, not parseable as `usize`, or out of range) yields the
/// default of 10.
fn page_size_from_env() -> usize {
    const DEFAULT_PAGE_SIZE: usize = 10;
    const MAX_PAGE_SIZE: usize = 1000;

    let raw = match std::env::var("PR_COMMENTS_PAGE_SIZE") {
        Ok(raw) => raw,
        Err(_) => return DEFAULT_PAGE_SIZE,
    };
    match raw.parse::<usize>() {
        Ok(size) if (1..=MAX_PAGE_SIZE).contains(&size) => size,
        _ => DEFAULT_PAGE_SIZE,
    }
}
/// Returns a reference to the GitHub service configuration this instance was built with.
pub fn github_config(&self) -> &GitHubServiceConfig {
&self.github_config
}
/// Creates a service for the repository detected from the ambient git
/// checkout, using the default [`GitHubServiceConfig`].
///
/// # Errors
///
/// Propagates any error from [`PrComments::with_config`].
pub fn new() -> Result<Self> {
    let default_config = GitHubServiceConfig::default();
    Self::with_config(default_config)
}
/// Creates a service for the repository detected from the ambient git
/// checkout, using the supplied configuration.
///
/// The token is resolved and fresh pagination caches are created only after
/// git detection has succeeded.
///
/// # Errors
///
/// Returns an error with context "Failed to get git information" when
/// `git::get_git_info` fails.
pub fn with_config(github_config: GitHubServiceConfig) -> Result<Self> {
    let detected = git::get_git_info().context("Failed to get git information")?;
    Ok(Self::with_repo_and_config(
        detected.owner,
        detected.repo,
        github_config,
    ))
}
/// Creates a service for an explicitly named repository, using the default
/// [`GitHubServiceConfig`]. No git detection is performed.
pub fn with_repo(owner: String, repo: String) -> Self {
    let default_config = GitHubServiceConfig::default();
    Self::with_repo_and_config(owner, repo, default_config)
}
/// Creates a service for an explicitly named repository with the supplied
/// configuration.
///
/// The token is resolved via `get_token`, both pagination caches start
/// empty, and no initialization error is recorded.
pub fn with_repo_and_config(
    owner: String,
    repo: String,
    github_config: GitHubServiceConfig,
) -> Self {
    Self {
        owner,
        repo,
        token: Self::get_token(),
        github_config,
        pager: Arc::new(PaginationCache::new()),
        pr_list_pager: Arc::new(PaginationCache::new()),
        init_error: None,
    }
}
/// Creates a service without repository context, remembering `init_error`
/// and using the default [`GitHubServiceConfig`].
///
/// See [`PrComments::disabled_with_config`].
pub fn disabled(init_error: String) -> Self {
    let default_config = GitHubServiceConfig::default();
    Self::disabled_with_config(init_error, default_config)
}
/// Creates a service without repository context (empty owner and repo),
/// remembering `init_error` so `ensure_repo_configured` can include it in its
/// error message.
///
/// The token is still resolved and the pagination caches are still created.
pub fn disabled_with_config(init_error: String, github_config: GitHubServiceConfig) -> Self {
    // Same construction as a regular instance, just with blank repository
    // coordinates and the recorded initialization error.
    let mut service = Self::with_repo_and_config(String::new(), String::new(), github_config);
    service.init_error = Some(init_error);
    service
}
/// Awaits `fut`, bounding it by the configured total GitHub timeout.
///
/// A `total_timeout_secs` of `0` disables the bound and `fut` is awaited
/// directly. `label` describes the operation and is only used in the timeout
/// error message.
///
/// # Errors
///
/// Returns the future's own error, or a "timed out" error when the deadline
/// elapses first.
async fn with_github_total_timeout<T, F>(&self, label: &str, fut: F) -> Result<T>
where
    F: std::future::Future<Output = Result<T>>,
{
    let timeout_secs = self.github_config.total_timeout_secs;
    if timeout_secs > 0 {
        let budget = Duration::from_secs(timeout_secs);
        return tokio::time::timeout(budget, fut)
            .await
            .unwrap_or_else(|_elapsed| {
                Err(anyhow::anyhow!(
                    "GitHub operation timed out after {timeout_secs}s while {label}"
                ))
            });
    }
    fut.await
}
/// Verifies that both the owner and the repository name are known.
///
/// # Errors
///
/// Returns an "invalid argument" error when either is empty. If an
/// initialization error was recorded, it is appended to the message.
fn ensure_repo_configured(&self) -> Result<()> {
    let missing_context = self.owner.is_empty() || self.repo.is_empty();
    if !missing_context {
        return Ok(());
    }
    let mut msg = String::from(
        "invalid argument: pr_comments repository context is not available.\n\
         This tool relies on ambient git repo detection. Run it inside a git checkout with a GitHub remote.",
    );
    if let Some(detail) = self.init_error.as_deref() {
        msg += "\n\nAmbient detection error: ";
        msg += detail;
    }
    Err(anyhow::anyhow!(msg))
}
/// Resolves the pull-request number to operate on.
///
/// An explicit `pr_number` is returned as-is. Otherwise the current git
/// branch is looked up and GitHub is queried (under the total timeout) for a
/// PR associated with that branch.
///
/// # Errors
///
/// Fails when git information or the current branch cannot be determined,
/// when the client cannot be created, when no PR is found for the branch, or
/// when the lookup itself fails. Lookup failures whose message contains
/// `401`, `403` or `Not Found` are rewritten with a token hint.
async fn get_pr_number(&self, pr_number: Option<u64>) -> Result<u64> {
    if let Some(explicit) = pr_number {
        return Ok(explicit);
    }
    let branch = git::get_git_info()?
        .current_branch
        .context("Could not determine current git branch")?;
    let client =
        github::GitHubClient::new(self.owner.clone(), self.repo.clone(), self.token.clone())?;
    let label = format!("looking up pull request for branch '{branch}'");
    let lookup = self
        .with_github_total_timeout(&label, async { client.get_pr_from_branch(&branch).await })
        .await;

    let err = match lookup {
        Ok(Some(found)) => return Ok(found),
        Ok(None) => {
            return Err(anyhow::anyhow!(
                "No open PR found for branch '{branch}' in {owner}/{repo}. \n\
                 Make sure you have an open PR for this branch.",
                owner = self.owner.as_str(),
                repo = self.repo.as_str()
            ));
        }
        Err(e) => e,
    };

    // Only access-style failures get the token hint; everything else is
    // passed through untouched.
    let msg = err.to_string();
    let looks_like_access_problem = ["401", "403", "Not Found"]
        .iter()
        .any(|needle| msg.contains(*needle));
    if !looks_like_access_problem {
        return Err(err);
    }
    let token_state = if self.token.is_some() {
        "Set"
    } else {
        "Not set (required for private repos)"
    };
    Err(anyhow::anyhow!(
        "Failed to access {owner}/{repo}: {msg}\n\n\
         Hint: For private repositories, ensure your GITHUB_TOKEN has the 'repo' scope.\n\
         Current token: {token_state}",
        owner = self.owner.as_str(),
        repo = self.repo.as_str()
    ))
}
}
impl PrComments {
pub async fn get_comments(
&self,
pr_number: Option<u64>,
comment_source_type: Option<CommentSourceType>,
include_resolved: Option<bool>,
) -> Result<ReviewCommentList> {
self.ensure_repo_configured()
.context("invalid argument: missing repository context")?;
let pr = self
.get_pr_number(pr_number)
.await
.context("invalid argument: failed to determine PR number")?;
let src = comment_source_type.unwrap_or_default();
let include_resolved = include_resolved.unwrap_or(false);
let page_size = Self::page_size_from_env();
let pr_url = format!(
"https://github.com/{owner}/{repo}/pull/{pr}",
owner = self.owner,
repo = self.repo
);
self.pager.sweep_expired();
let key = make_key(
&self.owner,
&self.repo,
pr,
src,
include_resolved,
page_size,
);
let query_lock = self.pager.get_or_create(&key);
let needs_fetch = {
let state = query_lock
.state
.lock()
.unwrap_or_else(std::sync::PoisonError::into_inner);
state.is_empty() || state.is_expired()
};
if needs_fetch {
let client = github::GitHubClient::new(
self.owner.clone(),
self.repo.clone(),
self.token.clone(),
)
.context("internal: failed to create GitHub client")?;
let (comments, resolution_map) = self
.with_github_total_timeout(
&format!("fetching review comments for PR #{pr}"),
async {
let comments = client.fetch_review_comments(pr).await.map_err(|e| {
let msg = e.to_string();
if msg.contains("401") || msg.contains("403") {
anyhow::anyhow!(
"{msg}\n\nHint: For private repositories, ensure your token has the 'repo' scope."
)
} else {
anyhow::anyhow!("{msg}")
}
})?;
let resolution_map = if include_resolved {
std::collections::HashMap::new()
} else {
client
.get_review_thread_resolution_status(pr)
.await
.unwrap_or_default()
};
Ok((comments, resolution_map))
},
)
.await?;
let threads = github::GitHubClient::build_threads(comments, &resolution_map);
let filtered = github::GitHubClient::filter_threads(threads, src, include_resolved);
guarded_post_fetch_reset(&query_lock, filtered, page_size);
}
let mut state = query_lock
.state
.lock()
.unwrap_or_else(std::sync::PoisonError::into_inner);
let (page_threads, has_more) =
paginate_slice(&state.results, state.next_offset, state.page_size);
state.next_offset += page_threads.len();
let total_threads = state.results.len();
let shown_threads = state.next_offset;
let comments: Vec<_> = page_threads
.into_iter()
.flat_map(|t| {
let mut cs = vec![t.parent];
cs.extend(t.replies);
cs
})
.collect();
let message = if has_more {
Some(format!(
"Showing {shown_threads} out of {total_threads} threads. Call gh_get_comments again for more."
))
} else {
None
};
if !has_more {
drop(state);
self.pager.remove_if_same(&key, &query_lock);
}
Ok(ReviewCommentList {
owner: self.owner.clone(),
repo: self.repo.clone(),
pr_number: pr,
pr_url,
comments,
shown_threads,
total_threads,
has_more,
message,
})
}
/// Returns the next page of pull requests for the configured repository.
///
/// `state` selects which PRs are listed and defaults to `"open"`. Results
/// are fetched once per (owner, repo, state, page size) key and then paged
/// out of the cache on repeated identical calls; after the last page the
/// cache entry is removed so the next call refetches.
///
/// # Errors
///
/// Fails when repository context is missing, the GitHub client cannot be
/// created, or the listing request fails or times out.
pub async fn list_prs(&self, state: Option<String>) -> Result<PrSummaryList> {
    self.ensure_repo_configured()
        .context("invalid argument: missing repository context")?;
    let state = state.unwrap_or_else(|| "open".to_string());
    let page_size = Self::page_size_from_env();

    self.pr_list_pager.sweep_expired();
    let cache_key = make_pr_list_key(&self.owner, &self.repo, &state, page_size);
    let entry = self.pr_list_pager.get_or_create(&cache_key);

    // Scoped so the guard is released before any await.
    let cache_is_stale = {
        let guard = entry
            .state
            .lock()
            .unwrap_or_else(std::sync::PoisonError::into_inner);
        guard.is_empty() || guard.is_expired()
    };
    if cache_is_stale {
        let client = github::GitHubClient::new(
            self.owner.clone(),
            self.repo.clone(),
            self.token.clone(),
        )
        .context("internal: failed to create GitHub client")?;
        let label = format!("listing pull requests for state={state}");
        let fetch = async {
            match client.list_prs(Some(state.clone())).await {
                Ok(fetched) => Ok(fetched),
                Err(e) => {
                    let msg = e.to_string();
                    let auth_failure = msg.contains("401") || msg.contains("403");
                    Err(if auth_failure {
                        anyhow::anyhow!(
                            "{msg}\n\nHint: For private repositories, ensure your GITHUB_TOKEN has the 'repo' scope."
                        )
                    } else {
                        anyhow::anyhow!("{msg}")
                    })
                }
            }
        };
        let fetched = self.with_github_total_timeout(&label, fetch).await?;
        // No-op when another caller has already populated the entry.
        guarded_post_fetch_reset(&entry, fetched, page_size);
    }

    let mut pager_state = entry
        .state
        .lock()
        .unwrap_or_else(std::sync::PoisonError::into_inner);
    let (prs, has_more) = paginate_slice(
        &pager_state.results,
        pager_state.next_offset,
        pager_state.page_size,
    );
    pager_state.next_offset += prs.len();
    let shown_prs = pager_state.next_offset;
    let total_prs = pager_state.results.len();

    let message = has_more.then(|| {
        format!(
            "Showing {shown_prs} out of {total_prs} pull requests. Call gh_get_prs again for more."
        )
    });
    if !has_more {
        // Unlock first, then evict the exhausted entry.
        drop(pager_state);
        self.pr_list_pager.remove_if_same(&cache_key, &entry);
    }

    Ok(PrSummaryList {
        owner: self.owner.clone(),
        repo: self.repo.clone(),
        state,
        prs,
        shown_prs,
        total_prs,
        has_more,
        message,
    })
}
/// Posts a reply to an existing review comment on a pull request.
///
/// The body is passed through [`with_ai_prefix`] before it is sent. When
/// `pr_number` is `None` the PR is resolved from the current git branch.
///
/// # Errors
///
/// Fails when `body` is empty or whitespace-only, repository context is
/// missing, the PR number cannot be determined, the GitHub client cannot be
/// created, or the request fails or times out. Failures whose message
/// contains `401`/`403` gain a token hint; those containing `404` are
/// reported as "not found".
pub async fn add_comment_reply(
    &self,
    pr_number: Option<u64>,
    comment_id: u64,
    body: String,
) -> Result<ReviewComment> {
    if body.trim().is_empty() {
        anyhow::bail!("invalid argument: Body cannot be empty");
    }
    self.ensure_repo_configured()
        .context("invalid argument: missing repository context")?;
    let pr = self
        .get_pr_number(pr_number)
        .await
        .context("invalid argument: failed to determine PR number")?;
    let client =
        github::GitHubClient::new(self.owner.clone(), self.repo.clone(), self.token.clone())
            .context("internal: failed to create GitHub client")?;
    let prefixed_body = with_ai_prefix(&body);

    let label = format!("posting reply to review comment {comment_id} on PR #{pr}");
    let post = async {
        match client.reply_to_comment(pr, comment_id, &prefixed_body).await {
            Ok(posted) => Ok(posted),
            Err(e) => {
                let msg = e.to_string();
                let mapped = if msg.contains("401") || msg.contains("403") {
                    anyhow::anyhow!(
                        "{msg}\n\nHint: For private repositories, ensure your token has the 'repo' scope."
                    )
                } else if msg.contains("404") {
                    anyhow::anyhow!("not found: Comment {comment_id} not found on PR #{pr}")
                } else {
                    anyhow::anyhow!("{msg}")
                };
                Err(mapped)
            }
        }
    };
    self.with_github_total_timeout(&label, post).await
}
}
// Unit tests for the prefix helper, repository-context validation, the total
// timeout wrapper, and the pagination-cache behavior the public methods rely on.
#[cfg(test)]
mod tests {
use super::*;
use std::sync::Arc;
// Builds a review-comment fixture whose id, body, line and URL derive from `id`.
fn sample_comment(id: u64) -> ReviewComment {
ReviewComment {
id,
user: "alice".into(),
is_bot: false,
body: format!("Comment {id}"),
path: "src/lib.rs".into(),
line: Some(id),
side: Some("RIGHT".into()),
created_at: "2025-01-01T00:00:00Z".into(),
updated_at: "2025-01-01T00:00:00Z".into(),
html_url: format!("https://example.com/review/{id}"),
pull_request_review_id: Some(42),
in_reply_to_id: None,
}
}
// Builds an unresolved thread fixture with a single parent comment and no replies.
fn sample_thread(id: u64) -> Thread {
Thread {
parent: sample_comment(id),
replies: vec![],
is_resolved: false,
}
}
// Builds an open pull-request summary fixture with the given number.
fn sample_pr_summary(number: u64) -> PrSummary {
PrSummary {
number,
title: format!("PR {number}"),
author: "alice".into(),
state: "open".into(),
created_at: "2025-01-01T00:00:00Z".into(),
updated_at: "2025-01-01T00:00:00Z".into(),
comment_count: 0,
review_comment_count: 0,
}
}
// Simulates two callers that both saw an empty cache before either stored
// results: the second (late) reset must not replace the results or rewind the
// offset the first caller has already advanced.
#[test]
fn late_concurrent_caller_does_not_rewind_next_offset() {
let cache: PaginationCache<Thread> = PaginationCache::new();
let key = make_key("owner", "repo", 123, CommentSourceType::All, false, 2);
let query_lock = cache.get_or_create(&key);
// Both "callers" check the state before any fetch result is stored.
let needs_fetch_a = {
let st = query_lock
.state
.lock()
.unwrap_or_else(std::sync::PoisonError::into_inner);
st.is_empty() || st.is_expired()
};
let needs_fetch_b = {
let st = query_lock
.state
.lock()
.unwrap_or_else(std::sync::PoisonError::into_inner);
st.is_empty() || st.is_expired()
};
assert!(needs_fetch_a);
assert!(needs_fetch_b);
// Caller A stores its results first.
guarded_post_fetch_reset(
&query_lock,
vec![sample_thread(1), sample_thread(2), sample_thread(3)],
2,
);
// Caller A consumes the first page.
{
let mut st = query_lock
.state
.lock()
.unwrap_or_else(std::sync::PoisonError::into_inner);
let (page, has_more) = paginate_slice(&st.results, st.next_offset, st.page_size);
st.next_offset += page.len();
assert_eq!(page.len(), 2);
assert_eq!(page[0].parent.id, 1);
assert_eq!(page[1].parent.id, 2);
assert!(has_more);
assert_eq!(st.next_offset, 2);
}
// Caller B arrives late with different data; the guarded reset must ignore it.
guarded_post_fetch_reset(
&query_lock,
vec![sample_thread(100), sample_thread(101), sample_thread(102)],
2,
);
{
let st = query_lock
.state
.lock()
.unwrap_or_else(std::sync::PoisonError::into_inner);
assert_eq!(st.next_offset, 2);
assert_eq!(st.results[0].parent.id, 1);
assert_eq!(st.results[1].parent.id, 2);
assert_eq!(st.results[2].parent.id, 3);
}
// The next page continues from caller A's data.
{
let mut st = query_lock
.state
.lock()
.unwrap_or_else(std::sync::PoisonError::into_inner);
let (page, has_more) = paginate_slice(&st.results, st.next_offset, st.page_size);
st.next_offset += page.len();
assert_eq!(page.len(), 1);
assert_eq!(page[0].parent.id, 3);
assert!(!has_more);
}
}
#[test]
fn with_ai_prefix_adds_prefix() {
let body = "This is a reply";
let prefixed = with_ai_prefix(body);
assert!(prefixed.starts_with(AI_PREFIX));
assert_eq!(prefixed, format!("{AI_PREFIX}This is a reply"));
}
#[test]
fn with_ai_prefix_avoids_double_prefix() {
let already_prefixed = format!("{AI_PREFIX}Already prefixed");
let result = with_ai_prefix(&already_prefixed);
assert_eq!(result, already_prefixed);
assert_eq!(result.matches(AI_PREFIX).count(), 1);
}
#[test]
fn with_ai_prefix_handles_empty_body() {
let body = "";
let prefixed = with_ai_prefix(body);
assert_eq!(prefixed, AI_PREFIX);
}
// A body that is already prefixed after leading whitespace is returned unchanged.
#[test]
fn with_ai_prefix_handles_leading_whitespace() {
let body_with_space = format!(" {AI_PREFIX}Already prefixed");
let result = with_ai_prefix(&body_with_space);
assert_eq!(result, body_with_space);
assert_eq!(result.matches(AI_PREFIX).count(), 1);
}
#[test]
fn ai_prefix_contains_robot_emoji() {
assert!(AI_PREFIX.contains('\u{1F916}')); }
// A disabled instance must fail validation and surface the recorded init error.
#[test]
fn ensure_repo_configured_fails_with_empty_owner_repo() {
let disabled = PrComments::disabled("test error".into());
let result = disabled.ensure_repo_configured();
assert!(result.is_err());
let err = match result {
Ok(()) => panic!("expected ensure_repo_configured to fail"),
Err(err) => err.to_string(),
};
assert!(
err.contains("repository context is not available"),
"Error should mention missing repo context"
);
assert!(
err.contains("ambient git repo detection"),
"Error should mention ambient detection"
);
assert!(
err.contains("test error"),
"Error should include the original init error"
);
}
#[test]
fn ensure_repo_configured_succeeds_with_valid_repo() {
let valid = PrComments::with_repo("owner".into(), "repo".into());
let result = valid.ensure_repo_configured();
assert!(result.is_ok());
}
// Runs on a paused tokio clock: advancing by the 1s budget must trigger the
// timeout even though the wrapped future would sleep for 5s.
#[tokio::test(start_paused = true)]
async fn github_total_timeout_expires_when_enabled() {
let svc = PrComments::with_repo_and_config(
"owner".into(),
"repo".into(),
GitHubServiceConfig {
total_timeout_secs: 1,
..Default::default()
},
);
let handle = tokio::spawn(async move {
svc.with_github_total_timeout("testing timeout", async {
tokio::time::sleep(Duration::from_secs(5)).await;
Ok(())
})
.await
});
// Let the spawned task start and register its timers before advancing time.
tokio::task::yield_now().await;
tokio::time::advance(Duration::from_secs(1)).await;
let err = handle.await.unwrap().unwrap_err();
assert!(err.to_string().contains("timed out after 1s"));
}
// A timeout of 0 disables the wrapper: the future must still be pending after
// the first yield and complete successfully once its own sleep elapses.
#[tokio::test(start_paused = true)]
async fn github_total_timeout_zero_disables_wrapper() {
let svc = PrComments::with_repo_and_config(
"owner".into(),
"repo".into(),
GitHubServiceConfig {
total_timeout_secs: 0,
..Default::default()
},
);
let handle = tokio::spawn(async move {
svc.with_github_total_timeout("testing disabled timeout", async {
tokio::time::sleep(Duration::from_secs(5)).await;
Ok(())
})
.await
});
tokio::task::yield_now().await;
assert!(
!handle.is_finished(),
"timeout=0 should not time out immediately"
);
tokio::time::advance(Duration::from_secs(5)).await;
assert!(handle.await.unwrap().is_ok());
}
// After the final page is served and the entry is removed, the same key must
// yield a brand-new, empty state.
#[test]
fn completed_comment_pagination_restarts_with_fresh_state_on_next_identical_call() {
let cache: PaginationCache<Thread> = PaginationCache::new();
let key = make_key("owner", "repo", 123, CommentSourceType::All, false, 10);
let first_lock = cache.get_or_create(&key);
{
let mut state = first_lock
.state
.lock()
.unwrap_or_else(std::sync::PoisonError::into_inner);
state.reset(vec![sample_thread(1), sample_thread(2)], (), 10);
let (page_threads, has_more) =
paginate_slice(&state.results, state.next_offset, state.page_size);
state.next_offset += page_threads.len();
assert_eq!(page_threads.len(), 2);
assert!(!has_more);
}
cache.remove_if_same(&key, &first_lock);
let restarted_lock = cache.get_or_create(&key);
assert!(!Arc::ptr_eq(&first_lock, &restarted_lock));
let restarted_state = restarted_lock
.state
.lock()
.unwrap_or_else(std::sync::PoisonError::into_inner);
assert!(restarted_state.is_empty());
assert_eq!(restarted_state.next_offset, 0);
}
// Same restart guarantee as above, for the pull-request list cache.
#[test]
fn completed_pr_list_pagination_restarts_with_fresh_state_on_next_identical_call() {
let cache: PaginationCache<PrSummary> = PaginationCache::new();
let key = make_pr_list_key("owner", "repo", "open", 10);
let first_lock = cache.get_or_create(&key);
{
let mut state = first_lock
.state
.lock()
.unwrap_or_else(std::sync::PoisonError::into_inner);
state.reset(vec![sample_pr_summary(1), sample_pr_summary(2)], (), 10);
let (page_prs, has_more) =
paginate_slice(&state.results, state.next_offset, state.page_size);
state.next_offset += page_prs.len();
assert_eq!(page_prs.len(), 2);
assert!(!has_more);
}
cache.remove_if_same(&key, &first_lock);
let restarted_lock = cache.get_or_create(&key);
assert!(!Arc::ptr_eq(&first_lock, &restarted_lock));
let restarted_state = restarted_lock
.state
.lock()
.unwrap_or_else(std::sync::PoisonError::into_inner);
assert!(restarted_state.is_empty());
assert_eq!(restarted_state.next_offset, 0);
}
}